From 1488975d1b1480655f8317a4b762ae642da63ab0 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 2 Oct 2017 10:46:21 -0700 Subject: [PATCH] =?UTF-8?q?Add=20timing=20statement=20to=20loop=20that=20c?= =?UTF-8?q?alls=20redis=5Fget=5Fcached=5Fdb=5Fclient=20be=E2=80=A6=20(#104?= =?UTF-8?q?5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add timing statement to loop that calls redis_get_cached_db_client because it has been slow in the past. * Fix linting. * Refactoring to make manager vectors into std::vector. * Fix linting. * Fixes. --- src/common/state/object_table.h | 9 +- src/common/state/redis.cc | 106 ++++++++++------ src/common/test/db_tests.cc | 12 +- src/common/test/object_table_tests.cc | 81 ++++++------ src/global_scheduler/global_scheduler.cc | 18 +-- src/local_scheduler/local_scheduler.cc | 13 +- src/plasma/plasma_manager.cc | 152 ++++++----------------- src/plasma/plasma_manager.h | 16 +-- src/plasma/test/manager_tests.cc | 33 ++--- 9 files changed, 184 insertions(+), 256 deletions(-) diff --git a/src/common/state/object_table.h b/src/common/state/object_table.h index 377115fa7..b8233620d 100644 --- a/src/common/state/object_table.h +++ b/src/common/state/object_table.h @@ -13,20 +13,19 @@ /* Callback called when the lookup completes. The callback should free * the manager_vector array, but NOT the strings they are pointing to. If there * was no entry at all for the object (the object had never been created - * before), then manager_count will be -1. + * before), then never_created will be true. */ typedef void (*object_table_lookup_done_callback)( ObjectID object_id, - int manager_count, - OWNER const char *manager_vector[], + bool never_created, + const std::vector &manager_vector, void *user_context); /* Callback called when object ObjectID is available. */ typedef void (*object_table_object_available_callback)( ObjectID object_id, int64_t data_size, - int manager_count, - OWNER const char *manager_vector[], + const std::vector &manager_vector, void *user_context); /** diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index e56b343f3..58aca1ec5 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -599,14 +599,13 @@ void redis_result_table_lookup(TableCallbackData *callback_data) { * * @param db The database handle. * @param index The index of the plasma manager. - * @param manager The pointer where the IP address of the manager gets written. - * @return Void. + * @return The IP address and port of the manager. */ -void redis_get_cached_db_client(DBHandle *db, - DBClientID db_client_id, - const char **manager) { +const std::string redis_get_cached_db_client(DBHandle *db, + DBClientID db_client_id) { auto it = db->db_client_cache.find(db_client_id); + char *manager; if (it == db->db_client_cache.end()) { /* This is a very rare case. It should happen at most once per db client. */ redisReply *reply = (redisReply *) redisCommand( @@ -617,10 +616,42 @@ void redis_get_cached_db_client(DBHandle *db, char *addr = strdup(reply->str); freeReplyObject(reply); db->db_client_cache[db_client_id] = addr; - *manager = addr; + manager = addr; } else { - *manager = it->second; + manager = it->second; } + std::string manager_address(manager); + return manager_address; +} + +const std::vector redis_get_cached_db_clients( + DBHandle *db, + const std::vector &manager_ids) { + /* We time this function because in the past this loop has taken multiple + * seconds under stressful situations on hundreds of machines causing the + * plasma manager to die (because it went too long without sending + * heartbeats). */ + int64_t start_time = current_time_ms(); + + /* Construct the manager vector from the flatbuffers object. */ + std::vector manager_vector; + + for (auto const &manager_id : manager_ids) { + const std::string manager_address = + redis_get_cached_db_client(db, manager_id); + manager_vector.push_back(manager_address); + } + + int64_t end_time = current_time_ms(); + int64_t max_time_for_loop = 1000; + if (end_time - start_time > max_time_for_loop) { + LOG_WARN( + "calling redis_get_cached_db_client in a loop in with %zu manager IDs " + "took %" PRId64 " milliseconds.", + manager_ids.size(), end_time - start_time); + } + + return manager_vector; } void redis_object_table_lookup_callback(redisAsyncContext *c, @@ -631,43 +662,41 @@ void redis_object_table_lookup_callback(redisAsyncContext *c, LOG_DEBUG("Object table lookup callback"); CHECK(reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ARRAY); + object_table_lookup_done_callback done_callback = + (object_table_lookup_done_callback) callback_data->done_callback; + ObjectID obj_id = callback_data->id; - int64_t manager_count = 0; - DBClientID *managers = NULL; - const char **manager_vector = NULL; /* Parse the Redis reply. */ if (reply->type == REDIS_REPLY_NIL) { /* The object entry did not exist. */ - manager_count = -1; - } else if (reply->type == REDIS_REPLY_ARRAY) { - manager_count = reply->elements; - if (manager_count > 0) { - managers = (DBClientID *) malloc(reply->elements * sizeof(DBClientID)); - manager_vector = (const char **) malloc(manager_count * sizeof(char *)); + if (done_callback) { + done_callback(obj_id, true, std::vector(), + callback_data->user_context); } + } else if (reply->type == REDIS_REPLY_ARRAY) { + /* Extract the manager IDs from the response into a vector. */ + std::vector manager_ids; + for (int j = 0; j < reply->elements; ++j) { CHECK(reply->element[j]->type == REDIS_REPLY_STRING); - memcpy(managers[j].id, reply->element[j]->str, sizeof(managers[j].id)); - redis_get_cached_db_client(db, managers[j], manager_vector + j); + DBClientID manager_id; + memcpy(manager_id.id, reply->element[j]->str, sizeof(manager_id.id)); + manager_ids.push_back(manager_id); + } + + const std::vector manager_vector = + redis_get_cached_db_clients(db, manager_ids); + + if (done_callback) { + done_callback(obj_id, false, manager_vector, callback_data->user_context); } } else { LOG_FATAL("Unexpected reply type from object table lookup."); } - object_table_lookup_done_callback done_callback = - (object_table_lookup_done_callback) callback_data->done_callback; - if (done_callback) { - done_callback(obj_id, manager_count, manager_vector, - callback_data->user_context); - } - /* Clean up timer and callback. */ destroy_timer_callback(callback_data->db_handle->loop, callback_data); - if (manager_count > 0) { - free(managers); - free(manager_vector); - } } void object_table_redis_subscribe_to_notifications_callback( @@ -704,22 +733,24 @@ void object_table_redis_subscribe_to_notifications_callback( /* Extract the data size. */ int64_t data_size = message->object_size(); int manager_count = message->manager_ids()->size(); - /* Construct the manager vector from the flatbuffers object. */ - const char **manager_vector = - (const char **) malloc(manager_count * sizeof(char *)); + + /* Extract the manager IDs from the response into a vector. */ + std::vector manager_ids; for (int i = 0; i < manager_count; ++i) { DBClientID manager_id = from_flatbuf(message->manager_ids()->Get(i)); - redis_get_cached_db_client(db, manager_id, &manager_vector[i]); + manager_ids.push_back(manager_id); } + const std::vector manager_vector = + redis_get_cached_db_clients(db, manager_ids); + /* Call the subscribe callback. */ ObjectTableSubscribeData *data = (ObjectTableSubscribeData *) callback_data->data; if (data->object_available_callback) { - data->object_available_callback(obj_id, data_size, manager_count, - manager_vector, data->subscribe_context); + data->object_available_callback(obj_id, data_size, manager_vector, + data->subscribe_context); } - free(manager_vector); } else if (strcmp(message_type->str, "subscribe") == 0) { /* The reply for the initial SUBSCRIBE command. */ /* Call the done callback if there is one. This code path should only be @@ -727,7 +758,8 @@ void object_table_redis_subscribe_to_notifications_callback( if (callback_data->done_callback != NULL) { object_table_lookup_done_callback done_callback = (object_table_lookup_done_callback) callback_data->done_callback; - done_callback(NIL_ID, 0, NULL, callback_data->user_context); + done_callback(NIL_ID, false, std::vector(), + callback_data->user_context); } /* If the initial SUBSCRIBE was successful, clean up the timer, but don't * destroy the callback data. */ diff --git a/src/common/test/db_tests.cc b/src/common/test/db_tests.cc index 5d54ac9a9..782cae0a1 100644 --- a/src/common/test/db_tests.cc +++ b/src/common/test/db_tests.cc @@ -38,17 +38,15 @@ const int TEST_NUMBER = 10; /* Test if entries have been written to the database. */ void lookup_done_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], + bool never_created, + const std::vector &manager_vector, void *user_context) { - CHECK(manager_count == 2); - if (!manager_vector[0] || - sscanf(manager_vector[0], "%15[0-9.]:%5[0-9]", received_addr1, + CHECK(manager_vector.size() == 2); + if (sscanf(manager_vector.at(0).c_str(), "%15[0-9.]:%5[0-9]", received_addr1, received_port1) != 2) { CHECK(0); } - if (!manager_vector[1] || - sscanf(manager_vector[1], "%15[0-9.]:%5[0-9]", received_addr2, + if (sscanf(manager_vector.at(1).c_str(), "%15[0-9.]:%5[0-9]", received_addr2, received_port2) != 2) { CHECK(0); } diff --git a/src/common/test/object_table_tests.cc b/src/common/test/object_table_tests.cc index 2598a5021..4bd2e0edb 100644 --- a/src/common/test/object_table_tests.cc +++ b/src/common/test/object_table_tests.cc @@ -145,8 +145,8 @@ const char *lookup_timeout_context = "lookup_timeout"; int lookup_failed = 0; void lookup_done_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], + bool never_created, + const std::vector &manager_vector, void *context) { /* The done callback should not be called. */ CHECK(0); @@ -226,8 +226,7 @@ int subscribe_failed = 0; void subscribe_done_callback(ObjectID object_id, int64_t data_size, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *user_context) { /* The done callback should not be called. */ CHECK(0); @@ -293,14 +292,6 @@ int64_t terminate_event_loop_callback(event_loop *loop, const char *lookup_retry_context = "lookup_retry"; int lookup_retry_succeeded = 0; -void lookup_retry_done_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], - void *context) { - CHECK(context == (void *) lookup_retry_context); - lookup_retry_succeeded = 1; -} - void lookup_retry_fail_callback(UniqueID id, void *user_context, void *user_data) { @@ -316,12 +307,12 @@ int add_retry_succeeded = 0; /* === Test add then lookup retry === */ void add_lookup_done_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], + bool never_created, + const std::vector &manager_vector, void *context) { CHECK(context == (void *) lookup_retry_context); - CHECK(manager_count == 1); - CHECK(strcmp(manager_vector[0], "127.0.0.1:11235") == 0); + CHECK(manager_vector.size() == 1); + CHECK(manager_vector.at(0) == "127.0.0.1:11235"); lookup_retry_succeeded = 1; } @@ -365,12 +356,13 @@ TEST add_lookup_test(void) { } /* === Test add, remove, then lookup === */ -void add_remove_lookup_done_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], - void *context) { +void add_remove_lookup_done_callback( + ObjectID object_id, + bool never_created, + const std::vector &manager_vector, + void *context) { CHECK(context == (void *) lookup_retry_context); - CHECK(manager_count == 0); + CHECK(manager_vector.size() == 0); lookup_retry_succeeded = 1; } @@ -440,8 +432,8 @@ void lookup_late_fail_callback(UniqueID id, } void lookup_late_done_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], + bool never_created, + const std::vector &manager_vector, void *context) { /* This function should never be called. */ CHECK(0); @@ -528,10 +520,11 @@ void subscribe_late_fail_callback(UniqueID id, subscribe_late_failed = 1; } -void subscribe_late_done_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], - void *user_context) { +void subscribe_late_done_callback( + ObjectID object_id, + bool never_created, + const std::vector &manager_vector, + void *user_context) { /* This function should never be called. */ CHECK(0); } @@ -578,10 +571,11 @@ void subscribe_success_fail_callback(UniqueID id, CHECK(0); } -void subscribe_success_done_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], - void *user_context) { +void subscribe_success_done_callback( + ObjectID object_id, + bool never_created, + const std::vector &manager_vector, + void *user_context) { RetryInfo retry = { .num_retries = 0, .timeout = 750, .fail_callback = NULL, }; @@ -590,14 +584,14 @@ void subscribe_success_done_callback(ObjectID object_id, subscribe_success_done = 1; } -void subscribe_success_object_available_callback(ObjectID object_id, - int64_t data_size, - int manager_count, - const char *manager_vector[], - void *user_context) { +void subscribe_success_object_available_callback( + ObjectID object_id, + int64_t data_size, + const std::vector &manager_vector, + void *user_context) { CHECK(user_context == (void *) subscribe_success_context); CHECK(ObjectID_equal(object_id, subscribe_id)); - CHECK(manager_count == 1); + CHECK(manager_vector.size() == 1); subscribe_success_succeeded = 1; } @@ -651,15 +645,14 @@ int subscribe_object_present_succeeded = 0; void subscribe_object_present_object_available_callback( ObjectID object_id, int64_t data_size, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *user_context) { subscribe_object_present_context_t *ctx = (subscribe_object_present_context_t *) user_context; CHECK(ctx->data_size == data_size); CHECK(strcmp(subscribe_object_present_str, ctx->teststr) == 0); subscribe_object_present_succeeded = 1; - CHECK(manager_count == 1); + CHECK(manager_vector.size() == 1); } void fatal_fail_callback(UniqueID id, void *user_context, void *user_data) { @@ -718,8 +711,7 @@ const char *subscribe_object_not_present_context = void subscribe_object_not_present_object_available_callback( ObjectID object_id, int64_t data_size, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *user_context) { /* This should not be called. */ CHECK(0); @@ -768,8 +760,7 @@ int subscribe_object_available_later_succeeded = 0; void subscribe_object_available_later_object_available_callback( ObjectID object_id, int64_t data_size, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *user_context) { subscribe_object_present_context_t *myctx = (subscribe_object_present_context_t *) user_context; @@ -777,7 +768,7 @@ void subscribe_object_available_later_object_available_callback( CHECK(strcmp(myctx->teststr, subscribe_object_available_later_context) == 0); /* Make sure the callback is only called once. */ subscribe_object_available_later_succeeded += 1; - CHECK(manager_count == 1); + CHECK(manager_vector.size() == 1); } TEST subscribe_object_available_later_test(void) { diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index d46b43214..ff3209cd8 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -278,19 +278,19 @@ void process_new_db_client(DBClient *db_client, void *user_context) { * @param user_context The user context. * @return Void. */ -void object_table_subscribe_callback(ObjectID object_id, - int64_t data_size, - int manager_count, - const char *manager_vector[], - void *user_context) { +void object_table_subscribe_callback( + ObjectID object_id, + int64_t data_size, + const std::vector &manager_vector, + void *user_context) { /* Extract global scheduler state from the callback context. */ GlobalSchedulerState *state = (GlobalSchedulerState *) user_context; char id_string[ID_STRING_SIZE]; LOG_DEBUG("object table subscribe callback for OBJECT = %s", ObjectID_to_string(object_id, id_string, ID_STRING_SIZE)); ARROW_UNUSED(id_string); - LOG_DEBUG("\tManagers<%d>:", manager_count); - for (int i = 0; i < manager_count; i++) { + LOG_DEBUG("\tManagers<%d>:", manager_vector.size()); + for (int i = 0; i < manager_vector.size(); i++) { LOG_DEBUG("\t\t%s", manager_vector[i]); } @@ -304,7 +304,7 @@ void object_table_subscribe_callback(ObjectID object_id, LOG_DEBUG("New object added to object_info_table with id = %s", ObjectID_to_string(object_id, id_string, ID_STRING_SIZE)); LOG_DEBUG("\tmanager locations:"); - for (int i = 0; i < manager_count; i++) { + for (int i = 0; i < manager_vector.size(); i++) { LOG_DEBUG("\t\t%s", manager_vector[i]); } } @@ -314,7 +314,7 @@ void object_table_subscribe_callback(ObjectID object_id, /* In all cases, replace the object location vector on each callback. */ obj_info_entry.object_locations.clear(); - for (int i = 0; i < manager_count; i++) { + for (int i = 0; i < manager_vector.size(); i++) { obj_info_entry.object_locations.push_back(std::string(manager_vector[i])); } } diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index ef1a51547..fcb2e677d 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -723,23 +723,24 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id, reconstruct_task_update_callback, state); } -void reconstruct_object_lookup_callback(ObjectID reconstruct_object_id, - int manager_count, - const char *manager_vector[], - void *user_context) { +void reconstruct_object_lookup_callback( + ObjectID reconstruct_object_id, + bool never_created, + const std::vector &manager_vector, + void *user_context) { LOG_DEBUG("Manager count was %d", manager_count); /* Only continue reconstruction if we find that the object doesn't exist on * any nodes. NOTE: This codepath is not responsible for checking if the * object table entry is up-to-date. */ LocalSchedulerState *state = (LocalSchedulerState *) user_context; /* Look up the task that created the object in the result table. */ - if (manager_count == 0) { + if (!never_created && manager_vector.size() == 0) { /* If the object was created and later evicted, we reconstruct the object * if and only if there are no other instances of the task running. */ result_table_lookup(state->db, reconstruct_object_id, NULL, reconstruct_evicted_result_lookup_callback, (void *) state); - } else if (manager_count == -1) { + } else if (never_created) { /* If the object has not been created yet, we reconstruct the object if and * only if the task that created the object failed to complete. */ result_table_lookup(state->db, reconstruct_object_id, NULL, diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index 0612d53a0..b47785f5f 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -92,15 +92,13 @@ void process_status_request(ClientConnection *client_conn, ObjectID object_id); * a remote Plasma Store. * * @param object_id ID of the object to transfer or to get its status. - * @param manager_cont Number of remote nodes object_id is stored at. - * @param manager_vector Array containing the Plasma Managers - * running at the nodes where object_id is stored. + * @param manager_vector Array containing the Plasma Managers running at the + * nodes where object_id is stored. * @param context Client connection. * @return Status of object_id as defined in plasma.h */ int request_status(ObjectID object_id, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *context); /** @@ -142,11 +140,8 @@ void process_data_request(event_loop *loop, typedef struct { /** The ID of the object we are fetching or waiting for. */ ObjectID object_id; - /** Pointer to the array containing the manager locations of this object. This - * struct owns and must free each entry. */ - char **manager_vector; - /** The number of manager locations in the array manager_vector. */ - int manager_count; + /** Vector of the addresses of the managers containing this object. */ + std::vector manager_vector; /** The next manager we should try to contact. This is set to an index in * manager_vector in the retry handler, in case the current attempt fails to * contact a manager. */ @@ -241,39 +236,6 @@ struct PlasmaManagerState { PlasmaManagerState *g_manager_state = NULL; -/* The context for fetch and wait requests. These are per client, per object. */ -struct ClientObjectRequest { - /** The ID of the object we are fetching or waiting for. */ - ObjectID object_id; - /** The client connection context, shared between other ClientObjectRequest - * structs for the same client. */ - ClientConnection *client_conn; - /** The ID for the timer that will time out the current request to the state - * database or another plasma manager. */ - int64_t timer; - /** How many retries we have left for the request. Decremented on every - * timeout. */ - int num_retries; - /** Handle for a linked list. */ - ClientObjectRequest *next; - /** Pointer to the array containing the manager locations of - * this object. */ - char **manager_vector; - /** The number of manager locations in the array manager_vector. */ - int manager_count; - /** The next manager we should try to contact. This is set to an index in - * manager_vector in the retry handler, in case the current attempt fails to - * contact a manager. */ - int next_manager; - /** Handle for the uthash table in the client connection - * context that keeps track of active object connection - * contexts. */ - UT_hash_handle active_hh; - /** Handle for the uthash table in the manager state that - * keeps track of outstanding fetch requests. */ - UT_hash_handle fetch_hh; -}; - /* Context for a client connection to another plasma manager. */ struct ClientConnection { /** Current state for this plasma manager. This is shared @@ -459,10 +421,8 @@ void update_object_wait_requests(PlasmaManagerState *manager_state, FetchRequest *create_fetch_request(PlasmaManagerState *manager_state, ObjectID object_id) { - FetchRequest *fetch_req = (FetchRequest *) malloc(sizeof(FetchRequest)); + FetchRequest *fetch_req = new FetchRequest(); fetch_req->object_id = object_id; - fetch_req->manager_count = 0; - fetch_req->manager_vector = NULL; return fetch_req; } @@ -477,14 +437,8 @@ void remove_fetch_request(PlasmaManagerState *manager_state, FetchRequest *fetch_req) { /* Remove the fetch request from the table of fetch requests. */ manager_state->fetch_requests.erase(fetch_req->object_id); - /* Free the fetch request and everything in it. */ - for (int i = 0; i < fetch_req->manager_count; ++i) { - free(fetch_req->manager_vector[i]); - } - if (fetch_req->manager_vector != NULL) { - free(fetch_req->manager_vector); - } - free(fetch_req); + /* Free the fetch request. */ + delete fetch_req; } PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name, @@ -902,13 +856,13 @@ void process_data_request(event_loop *loop, void request_transfer_from(PlasmaManagerState *manager_state, FetchRequest *fetch_req) { - CHECK(fetch_req->manager_count > 0); + CHECK(fetch_req->manager_vector.size() > 0); CHECK(fetch_req->next_manager >= 0 && - fetch_req->next_manager < fetch_req->manager_count); + fetch_req->next_manager < fetch_req->manager_vector.size()); char addr[16]; int port; - parse_ip_addr_port(fetch_req->manager_vector[fetch_req->next_manager], addr, - &port); + parse_ip_addr_port(fetch_req->manager_vector[fetch_req->next_manager].c_str(), + addr, &port); ClientConnection *manager_conn = get_manager_connection(manager_state, addr, port); @@ -940,7 +894,7 @@ void request_transfer_from(PlasmaManagerState *manager_state, /* On the next attempt, try the next manager in manager_vector. */ fetch_req->next_manager += 1; - fetch_req->next_manager %= fetch_req->manager_count; + fetch_req->next_manager %= fetch_req->manager_vector.size(); } int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { @@ -960,7 +914,7 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { for (auto it = manager_state->fetch_requests.begin(); it != manager_state->fetch_requests.end(); it++) { FetchRequest *fetch_req = it->second; - if (fetch_req->manager_count > 0) { + if (fetch_req->manager_vector.size() > 0) { request_transfer_from(manager_state, fetch_req); /* If we've tried all of the managers that we know about for this object, * add this object to the list to resend requests for. */ @@ -992,13 +946,12 @@ bool is_object_local(PlasmaManagerState *state, ObjectID object_id) { } void request_transfer(ObjectID object_id, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *context) { PlasmaManagerState *manager_state = (PlasmaManagerState *) context; /* This callback is called from object_table_subscribe, which guarantees that * the manager vector contains at least one element. */ - CHECK(manager_count >= 1); + CHECK(manager_vector.size() >= 1); auto it = manager_state->fetch_requests.find(object_id); if (is_object_local(manager_state, object_id)) { @@ -1015,25 +968,9 @@ void request_transfer(ObjectID object_id, * callback gets called. */ CHECK(fetch_req != NULL); - /* This method may be run multiple times, so if we are updating the manager - * vector, we need to free the previous manager vector. */ - if (fetch_req->manager_count != 0) { - for (int i = 0; i < fetch_req->manager_count; ++i) { - free(fetch_req->manager_vector[i]); - } - free(fetch_req->manager_vector); - } /* Update the manager vector. */ - fetch_req->manager_count = manager_count; - fetch_req->manager_vector = (char **) malloc(manager_count * sizeof(char *)); + fetch_req->manager_vector = manager_vector; fetch_req->next_manager = 0; - memset(fetch_req->manager_vector, 0, manager_count * sizeof(char *)); - for (int i = 0; i < manager_count; ++i) { - int len = strlen(manager_vector[i]); - fetch_req->manager_vector[i] = (char *) malloc(len + 1); - strncpy(fetch_req->manager_vector[i], manager_vector[i], len); - fetch_req->manager_vector[i][len] = '\0'; - } /* Wait for the object data for the default number of retries, which timeout * after a default interval. */ request_transfer_from(manager_state, fetch_req); @@ -1041,8 +978,7 @@ void request_transfer(ObjectID object_id, /* This method is only called from the tests. */ void call_request_transfer(ObjectID object_id, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *context) { PlasmaManagerState *manager_state = (PlasmaManagerState *) context; /* Check that there isn't already a fetch request for this object. */ @@ -1051,7 +987,7 @@ void call_request_transfer(ObjectID object_id, /* Create a fetch request. */ FetchRequest *fetch_req = create_fetch_request(manager_state, object_id); manager_state->fetch_requests[object_id] = fetch_req; - request_transfer(object_id, manager_count, manager_vector, context); + request_transfer(object_id, manager_vector, context); } void fatal_table_callback(ObjectID id, void *user_context, void *user_data) { @@ -1059,13 +995,12 @@ void fatal_table_callback(ObjectID id, void *user_context, void *user_data) { } void object_present_callback(ObjectID object_id, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *context) { PlasmaManagerState *manager_state = (PlasmaManagerState *) context; /* This callback is called from object_table_subscribe, which guarantees that * the manager vector contains at least one element. */ - CHECK(manager_count >= 1); + CHECK(manager_vector.size() >= 1); /* Update the in-progress remote wait requests. */ update_object_wait_requests(manager_state, object_id, @@ -1075,19 +1010,19 @@ void object_present_callback(ObjectID object_id, /* This callback is used by both fetch and wait. Therefore, it may have to * handle outstanding fetch and wait requests. */ -void object_table_subscribe_callback(ObjectID object_id, - int64_t data_size, - int manager_count, - const char *manager_vector[], - void *context) { +void object_table_subscribe_callback( + ObjectID object_id, + int64_t data_size, + const std::vector &manager_vector, + void *context) { PlasmaManagerState *manager_state = (PlasmaManagerState *) context; /* Run the callback for fetch requests if there is a fetch request. */ auto it = manager_state->fetch_requests.find(object_id); if (it != manager_state->fetch_requests.end()) { - request_transfer(object_id, manager_count, manager_vector, context); + request_transfer(object_id, manager_vector, context); } /* Run the callback for wait requests. */ - object_present_callback(object_id, manager_count, manager_vector, context); + object_present_callback(object_id, manager_vector, context); } void process_fetch_requests(ClientConnection *client_conn, @@ -1217,24 +1152,19 @@ void process_wait_request(ClientConnection *client_conn, * Check whether a non-local object is stored on any remot enote or not. * * @param object_id ID of the object whose status we require. - * @param manager_cont Number of remote nodes object_id is stored at. If - * manager_count > 0, then object_id exists on a remote node an its - * status is ObjectStatus_Remote. Otherwise, if manager_count == 0, the - * object doesn't exist in the system and its status is - * ObjectStatus_Nonexistent. - * @param manager_vector Array containing the Plasma Managers running at the - * nodes where object_id is stored. Not used; it will be eventually - * deallocated. + * @param never_created True if the object has not been created yet and false + * otherwise. + * @param manager_vector Vector containing the addresses of the Plasma Managers + * that have the object. * @param context Client connection. * @return Void. */ void request_status_done(ObjectID object_id, - int manager_count, - const char *manager_vector[], + bool never_created, + const std::vector &manager_vector, void *context) { ClientConnection *client_conn = (ClientConnection *) context; - int status = - request_status(object_id, manager_count, manager_vector, context); + int status = request_status(object_id, manager_vector, context); plasma::ObjectID object_id_copy = object_id.to_plasma_id(); handle_sigpipe( plasma::SendStatusReply(client_conn->fd, &object_id_copy, &status, 1), @@ -1242,8 +1172,7 @@ void request_status_done(ObjectID object_id, } int request_status(ObjectID object_id, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *context) { ClientConnection *client_conn = (ClientConnection *) context; @@ -1252,10 +1181,11 @@ int request_status(ObjectID object_id, return ObjectStatus_Local; } - /* Since object is not stored at the local locally, manager_count > 0 means - * that the object is stored at another remote object. Otherwise, if - * manager_count == 0, the object is not stored anywhere. */ - return (manager_count > 0 ? ObjectStatus_Remote : ObjectStatus_Nonexistent); + /* Since object is not stored at the local locally, manager_vector.size() > 0 + * means that the object is stored at another remote object. Otherwise, if + * manager_vector.size() == 0, the object is not stored anywhere. */ + return (manager_vector.size() > 0 ? ObjectStatus_Remote + : ObjectStatus_Nonexistent); } void object_table_lookup_fail_callback(ObjectID object_id, diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index ea9f96432..7eedc33a9 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -21,7 +21,6 @@ typedef struct PlasmaManagerState PlasmaManagerState; typedef struct ClientConnection ClientConnection; -typedef struct ClientObjectRequest ClientObjectRequest; /** * Initializes the plasma manager state. This connects the manager to the local @@ -182,14 +181,12 @@ typedef struct PlasmaRequestBuffer { * object. This method is only called from the tests. * * @param object_id The object ID of the object to transfer. - * @param manager_count The number of managers that have the object. * @param manager_vector The Plasma managers that have the object. * @param context The plasma manager state. * @return Void. */ void call_request_transfer(ObjectID object_id, - int manager_count, - const char *manager_vector[], + const std::vector &manager_vector, void *context); /* @@ -200,17 +197,6 @@ void call_request_transfer(ObjectID object_id, */ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context); -/** - * Clean up and free an active object context. Deregister it from the - * associated client connection and from the manager state. - * - * @param client_conn The client connection context. - * @param object_id The object ID whose context we want to delete. - * @return Void. - */ -void remove_object_request(ClientConnection *client_conn, - ClientObjectRequest *object_req); - /** * Get a connection to the remote manager at the specified address. Creates a * new connection to this manager if one doesn't already exist. diff --git a/src/plasma/test/manager_tests.cc b/src/plasma/test/manager_tests.cc index f390c63ee..b46057f0c 100644 --- a/src/plasma/test/manager_tests.cc +++ b/src/plasma/test/manager_tests.cc @@ -118,13 +118,10 @@ void destroy_plasma_mock(plasma_mock *mock) { TEST request_transfer_test(void) { plasma_mock *local_mock = init_plasma_mock(NULL); plasma_mock *remote_mock = init_plasma_mock(local_mock); - const char **manager_vector = (const char **) malloc(sizeof(char *)); - UT_string *addr = NULL; - utstring_new(addr); - utstring_printf(addr, "127.0.0.1:%d", remote_mock->port); - manager_vector[0] = utstring_body(addr); - call_request_transfer(object_id, 1, manager_vector, local_mock->state); - free(manager_vector); + std::vector manager_vector; + manager_vector.push_back(std::string("127.0.0.1:") + + std::to_string(remote_mock->port)); + call_request_transfer(object_id, manager_vector, local_mock->state); event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, test_done_handler, local_mock->state); event_loop_run(local_mock->loop); @@ -140,7 +137,6 @@ TEST request_transfer_test(void) { ASSERT(ObjectID_equal(object_id, object_id2)); free(address); /* Clean up. */ - utstring_free(addr); destroy_plasma_mock(remote_mock); destroy_plasma_mock(local_mock); PASS(); @@ -162,17 +158,14 @@ TEST request_transfer_retry_test(void) { plasma_mock *local_mock = init_plasma_mock(NULL); plasma_mock *remote_mock1 = init_plasma_mock(local_mock); plasma_mock *remote_mock2 = init_plasma_mock(local_mock); - const char **manager_vector = (const char **) malloc(sizeof(char *) * 2); - UT_string *addr0 = NULL; - utstring_new(addr0); - utstring_printf(addr0, "127.0.0.1:%d", remote_mock1->port); - manager_vector[0] = utstring_body(addr0); - UT_string *addr1 = NULL; - utstring_new(addr1); - utstring_printf(addr1, "127.0.0.1:%d", remote_mock2->port); - manager_vector[1] = utstring_body(addr1); - call_request_transfer(object_id, 2, manager_vector, local_mock->state); - free(manager_vector); + + std::vector manager_vector; + manager_vector.push_back(std::string("127.0.0.1:") + + std::to_string(remote_mock1->port)); + manager_vector.push_back(std::string("127.0.0.1:") + + std::to_string(remote_mock2->port)); + + call_request_transfer(object_id, manager_vector, local_mock->state); event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * 2, test_done_handler, local_mock->state); /* Register the fetch timeout handler. This is normally done when the plasma @@ -194,8 +187,6 @@ TEST request_transfer_retry_test(void) { free(address); ASSERT(ObjectID_equal(object_id, object_id2)); /* Clean up. */ - utstring_free(addr0); - utstring_free(addr1); destroy_plasma_mock(remote_mock2); destroy_plasma_mock(remote_mock1); destroy_plasma_mock(local_mock);