Add timing statement to loop that calls redis_get_cached_db_client be… (#1045)

* Add timing statement to loop that calls redis_get_cached_db_client because it has been slow in the past.

* Fix linting.

* Refactoring to make manager vectors into std::vector.

* Fix linting.

* Fixes.
This commit is contained in:
Robert Nishihara
2017-10-02 10:46:21 -07:00
committed by Philipp Moritz
parent a31d138f21
commit 1488975d1b
9 changed files with 184 additions and 256 deletions
+4 -5
View File
@@ -13,20 +13,19 @@
/* Callback called when the lookup completes. The callback should free
* the manager_vector array, but NOT the strings they are pointing to. If there
* was no entry at all for the object (the object had never been created
* before), then manager_count will be -1.
* before), then never_created will be true.
*/
typedef void (*object_table_lookup_done_callback)(
ObjectID object_id,
int manager_count,
OWNER const char *manager_vector[],
bool never_created,
const std::vector<std::string> &manager_vector,
void *user_context);
/* Callback called when object ObjectID is available. */
typedef void (*object_table_object_available_callback)(
ObjectID object_id,
int64_t data_size,
int manager_count,
OWNER const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *user_context);
/**
+69 -37
View File
@@ -599,14 +599,13 @@ void redis_result_table_lookup(TableCallbackData *callback_data) {
*
* @param db The database handle.
* @param index The index of the plasma manager.
* @param manager The pointer where the IP address of the manager gets written.
* @return Void.
* @return The IP address and port of the manager.
*/
void redis_get_cached_db_client(DBHandle *db,
DBClientID db_client_id,
const char **manager) {
const std::string redis_get_cached_db_client(DBHandle *db,
DBClientID db_client_id) {
auto it = db->db_client_cache.find(db_client_id);
char *manager;
if (it == db->db_client_cache.end()) {
/* This is a very rare case. It should happen at most once per db client. */
redisReply *reply = (redisReply *) redisCommand(
@@ -617,10 +616,42 @@ void redis_get_cached_db_client(DBHandle *db,
char *addr = strdup(reply->str);
freeReplyObject(reply);
db->db_client_cache[db_client_id] = addr;
*manager = addr;
manager = addr;
} else {
*manager = it->second;
manager = it->second;
}
std::string manager_address(manager);
return manager_address;
}
const std::vector<std::string> redis_get_cached_db_clients(
DBHandle *db,
const std::vector<DBClientID> &manager_ids) {
/* We time this function because in the past this loop has taken multiple
* seconds under stressful situations on hundreds of machines causing the
* plasma manager to die (because it went too long without sending
* heartbeats). */
int64_t start_time = current_time_ms();
/* Construct the manager vector from the flatbuffers object. */
std::vector<std::string> manager_vector;
for (auto const &manager_id : manager_ids) {
const std::string manager_address =
redis_get_cached_db_client(db, manager_id);
manager_vector.push_back(manager_address);
}
int64_t end_time = current_time_ms();
int64_t max_time_for_loop = 1000;
if (end_time - start_time > max_time_for_loop) {
LOG_WARN(
"calling redis_get_cached_db_client in a loop in with %zu manager IDs "
"took %" PRId64 " milliseconds.",
manager_ids.size(), end_time - start_time);
}
return manager_vector;
}
void redis_object_table_lookup_callback(redisAsyncContext *c,
@@ -631,43 +662,41 @@ void redis_object_table_lookup_callback(redisAsyncContext *c,
LOG_DEBUG("Object table lookup callback");
CHECK(reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ARRAY);
object_table_lookup_done_callback done_callback =
(object_table_lookup_done_callback) callback_data->done_callback;
ObjectID obj_id = callback_data->id;
int64_t manager_count = 0;
DBClientID *managers = NULL;
const char **manager_vector = NULL;
/* Parse the Redis reply. */
if (reply->type == REDIS_REPLY_NIL) {
/* The object entry did not exist. */
manager_count = -1;
} else if (reply->type == REDIS_REPLY_ARRAY) {
manager_count = reply->elements;
if (manager_count > 0) {
managers = (DBClientID *) malloc(reply->elements * sizeof(DBClientID));
manager_vector = (const char **) malloc(manager_count * sizeof(char *));
if (done_callback) {
done_callback(obj_id, true, std::vector<std::string>(),
callback_data->user_context);
}
} else if (reply->type == REDIS_REPLY_ARRAY) {
/* Extract the manager IDs from the response into a vector. */
std::vector<DBClientID> manager_ids;
for (int j = 0; j < reply->elements; ++j) {
CHECK(reply->element[j]->type == REDIS_REPLY_STRING);
memcpy(managers[j].id, reply->element[j]->str, sizeof(managers[j].id));
redis_get_cached_db_client(db, managers[j], manager_vector + j);
DBClientID manager_id;
memcpy(manager_id.id, reply->element[j]->str, sizeof(manager_id.id));
manager_ids.push_back(manager_id);
}
const std::vector<std::string> manager_vector =
redis_get_cached_db_clients(db, manager_ids);
if (done_callback) {
done_callback(obj_id, false, manager_vector, callback_data->user_context);
}
} else {
LOG_FATAL("Unexpected reply type from object table lookup.");
}
object_table_lookup_done_callback done_callback =
(object_table_lookup_done_callback) callback_data->done_callback;
if (done_callback) {
done_callback(obj_id, manager_count, manager_vector,
callback_data->user_context);
}
/* Clean up timer and callback. */
destroy_timer_callback(callback_data->db_handle->loop, callback_data);
if (manager_count > 0) {
free(managers);
free(manager_vector);
}
}
void object_table_redis_subscribe_to_notifications_callback(
@@ -704,22 +733,24 @@ void object_table_redis_subscribe_to_notifications_callback(
/* Extract the data size. */
int64_t data_size = message->object_size();
int manager_count = message->manager_ids()->size();
/* Construct the manager vector from the flatbuffers object. */
const char **manager_vector =
(const char **) malloc(manager_count * sizeof(char *));
/* Extract the manager IDs from the response into a vector. */
std::vector<DBClientID> manager_ids;
for (int i = 0; i < manager_count; ++i) {
DBClientID manager_id = from_flatbuf(message->manager_ids()->Get(i));
redis_get_cached_db_client(db, manager_id, &manager_vector[i]);
manager_ids.push_back(manager_id);
}
const std::vector<std::string> manager_vector =
redis_get_cached_db_clients(db, manager_ids);
/* Call the subscribe callback. */
ObjectTableSubscribeData *data =
(ObjectTableSubscribeData *) callback_data->data;
if (data->object_available_callback) {
data->object_available_callback(obj_id, data_size, manager_count,
manager_vector, data->subscribe_context);
data->object_available_callback(obj_id, data_size, manager_vector,
data->subscribe_context);
}
free(manager_vector);
} else if (strcmp(message_type->str, "subscribe") == 0) {
/* The reply for the initial SUBSCRIBE command. */
/* Call the done callback if there is one. This code path should only be
@@ -727,7 +758,8 @@ void object_table_redis_subscribe_to_notifications_callback(
if (callback_data->done_callback != NULL) {
object_table_lookup_done_callback done_callback =
(object_table_lookup_done_callback) callback_data->done_callback;
done_callback(NIL_ID, 0, NULL, callback_data->user_context);
done_callback(NIL_ID, false, std::vector<std::string>(),
callback_data->user_context);
}
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
* destroy the callback data. */
+5 -7
View File
@@ -38,17 +38,15 @@ const int TEST_NUMBER = 10;
/* Test if entries have been written to the database. */
void lookup_done_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
bool never_created,
const std::vector<std::string> &manager_vector,
void *user_context) {
CHECK(manager_count == 2);
if (!manager_vector[0] ||
sscanf(manager_vector[0], "%15[0-9.]:%5[0-9]", received_addr1,
CHECK(manager_vector.size() == 2);
if (sscanf(manager_vector.at(0).c_str(), "%15[0-9.]:%5[0-9]", received_addr1,
received_port1) != 2) {
CHECK(0);
}
if (!manager_vector[1] ||
sscanf(manager_vector[1], "%15[0-9.]:%5[0-9]", received_addr2,
if (sscanf(manager_vector.at(1).c_str(), "%15[0-9.]:%5[0-9]", received_addr2,
received_port2) != 2) {
CHECK(0);
}
+36 -45
View File
@@ -145,8 +145,8 @@ const char *lookup_timeout_context = "lookup_timeout";
int lookup_failed = 0;
void lookup_done_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
bool never_created,
const std::vector<std::string> &manager_vector,
void *context) {
/* The done callback should not be called. */
CHECK(0);
@@ -226,8 +226,7 @@ int subscribe_failed = 0;
void subscribe_done_callback(ObjectID object_id,
int64_t data_size,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *user_context) {
/* The done callback should not be called. */
CHECK(0);
@@ -293,14 +292,6 @@ int64_t terminate_event_loop_callback(event_loop *loop,
const char *lookup_retry_context = "lookup_retry";
int lookup_retry_succeeded = 0;
void lookup_retry_done_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
void *context) {
CHECK(context == (void *) lookup_retry_context);
lookup_retry_succeeded = 1;
}
void lookup_retry_fail_callback(UniqueID id,
void *user_context,
void *user_data) {
@@ -316,12 +307,12 @@ int add_retry_succeeded = 0;
/* === Test add then lookup retry === */
void add_lookup_done_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
bool never_created,
const std::vector<std::string> &manager_vector,
void *context) {
CHECK(context == (void *) lookup_retry_context);
CHECK(manager_count == 1);
CHECK(strcmp(manager_vector[0], "127.0.0.1:11235") == 0);
CHECK(manager_vector.size() == 1);
CHECK(manager_vector.at(0) == "127.0.0.1:11235");
lookup_retry_succeeded = 1;
}
@@ -365,12 +356,13 @@ TEST add_lookup_test(void) {
}
/* === Test add, remove, then lookup === */
void add_remove_lookup_done_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
void *context) {
void add_remove_lookup_done_callback(
ObjectID object_id,
bool never_created,
const std::vector<std::string> &manager_vector,
void *context) {
CHECK(context == (void *) lookup_retry_context);
CHECK(manager_count == 0);
CHECK(manager_vector.size() == 0);
lookup_retry_succeeded = 1;
}
@@ -440,8 +432,8 @@ void lookup_late_fail_callback(UniqueID id,
}
void lookup_late_done_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
bool never_created,
const std::vector<std::string> &manager_vector,
void *context) {
/* This function should never be called. */
CHECK(0);
@@ -528,10 +520,11 @@ void subscribe_late_fail_callback(UniqueID id,
subscribe_late_failed = 1;
}
void subscribe_late_done_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
void *user_context) {
void subscribe_late_done_callback(
ObjectID object_id,
bool never_created,
const std::vector<std::string> &manager_vector,
void *user_context) {
/* This function should never be called. */
CHECK(0);
}
@@ -578,10 +571,11 @@ void subscribe_success_fail_callback(UniqueID id,
CHECK(0);
}
void subscribe_success_done_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
void *user_context) {
void subscribe_success_done_callback(
ObjectID object_id,
bool never_created,
const std::vector<std::string> &manager_vector,
void *user_context) {
RetryInfo retry = {
.num_retries = 0, .timeout = 750, .fail_callback = NULL,
};
@@ -590,14 +584,14 @@ void subscribe_success_done_callback(ObjectID object_id,
subscribe_success_done = 1;
}
void subscribe_success_object_available_callback(ObjectID object_id,
int64_t data_size,
int manager_count,
const char *manager_vector[],
void *user_context) {
void subscribe_success_object_available_callback(
ObjectID object_id,
int64_t data_size,
const std::vector<std::string> &manager_vector,
void *user_context) {
CHECK(user_context == (void *) subscribe_success_context);
CHECK(ObjectID_equal(object_id, subscribe_id));
CHECK(manager_count == 1);
CHECK(manager_vector.size() == 1);
subscribe_success_succeeded = 1;
}
@@ -651,15 +645,14 @@ int subscribe_object_present_succeeded = 0;
void subscribe_object_present_object_available_callback(
ObjectID object_id,
int64_t data_size,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *user_context) {
subscribe_object_present_context_t *ctx =
(subscribe_object_present_context_t *) user_context;
CHECK(ctx->data_size == data_size);
CHECK(strcmp(subscribe_object_present_str, ctx->teststr) == 0);
subscribe_object_present_succeeded = 1;
CHECK(manager_count == 1);
CHECK(manager_vector.size() == 1);
}
void fatal_fail_callback(UniqueID id, void *user_context, void *user_data) {
@@ -718,8 +711,7 @@ const char *subscribe_object_not_present_context =
void subscribe_object_not_present_object_available_callback(
ObjectID object_id,
int64_t data_size,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *user_context) {
/* This should not be called. */
CHECK(0);
@@ -768,8 +760,7 @@ int subscribe_object_available_later_succeeded = 0;
void subscribe_object_available_later_object_available_callback(
ObjectID object_id,
int64_t data_size,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *user_context) {
subscribe_object_present_context_t *myctx =
(subscribe_object_present_context_t *) user_context;
@@ -777,7 +768,7 @@ void subscribe_object_available_later_object_available_callback(
CHECK(strcmp(myctx->teststr, subscribe_object_available_later_context) == 0);
/* Make sure the callback is only called once. */
subscribe_object_available_later_succeeded += 1;
CHECK(manager_count == 1);
CHECK(manager_vector.size() == 1);
}
TEST subscribe_object_available_later_test(void) {
+9 -9
View File
@@ -278,19 +278,19 @@ void process_new_db_client(DBClient *db_client, void *user_context) {
* @param user_context The user context.
* @return Void.
*/
void object_table_subscribe_callback(ObjectID object_id,
int64_t data_size,
int manager_count,
const char *manager_vector[],
void *user_context) {
void object_table_subscribe_callback(
ObjectID object_id,
int64_t data_size,
const std::vector<std::string> &manager_vector,
void *user_context) {
/* Extract global scheduler state from the callback context. */
GlobalSchedulerState *state = (GlobalSchedulerState *) user_context;
char id_string[ID_STRING_SIZE];
LOG_DEBUG("object table subscribe callback for OBJECT = %s",
ObjectID_to_string(object_id, id_string, ID_STRING_SIZE));
ARROW_UNUSED(id_string);
LOG_DEBUG("\tManagers<%d>:", manager_count);
for (int i = 0; i < manager_count; i++) {
LOG_DEBUG("\tManagers<%d>:", manager_vector.size());
for (int i = 0; i < manager_vector.size(); i++) {
LOG_DEBUG("\t\t%s", manager_vector[i]);
}
@@ -304,7 +304,7 @@ void object_table_subscribe_callback(ObjectID object_id,
LOG_DEBUG("New object added to object_info_table with id = %s",
ObjectID_to_string(object_id, id_string, ID_STRING_SIZE));
LOG_DEBUG("\tmanager locations:");
for (int i = 0; i < manager_count; i++) {
for (int i = 0; i < manager_vector.size(); i++) {
LOG_DEBUG("\t\t%s", manager_vector[i]);
}
}
@@ -314,7 +314,7 @@ void object_table_subscribe_callback(ObjectID object_id,
/* In all cases, replace the object location vector on each callback. */
obj_info_entry.object_locations.clear();
for (int i = 0; i < manager_count; i++) {
for (int i = 0; i < manager_vector.size(); i++) {
obj_info_entry.object_locations.push_back(std::string(manager_vector[i]));
}
}
+7 -6
View File
@@ -723,23 +723,24 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
reconstruct_task_update_callback, state);
}
void reconstruct_object_lookup_callback(ObjectID reconstruct_object_id,
int manager_count,
const char *manager_vector[],
void *user_context) {
void reconstruct_object_lookup_callback(
ObjectID reconstruct_object_id,
bool never_created,
const std::vector<std::string> &manager_vector,
void *user_context) {
LOG_DEBUG("Manager count was %d", manager_count);
/* Only continue reconstruction if we find that the object doesn't exist on
* any nodes. NOTE: This codepath is not responsible for checking if the
* object table entry is up-to-date. */
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
/* Look up the task that created the object in the result table. */
if (manager_count == 0) {
if (!never_created && manager_vector.size() == 0) {
/* If the object was created and later evicted, we reconstruct the object
* if and only if there are no other instances of the task running. */
result_table_lookup(state->db, reconstruct_object_id, NULL,
reconstruct_evicted_result_lookup_callback,
(void *) state);
} else if (manager_count == -1) {
} else if (never_created) {
/* If the object has not been created yet, we reconstruct the object if and
* only if the task that created the object failed to complete. */
result_table_lookup(state->db, reconstruct_object_id, NULL,
+41 -111
View File
@@ -92,15 +92,13 @@ void process_status_request(ClientConnection *client_conn, ObjectID object_id);
* a remote Plasma Store.
*
* @param object_id ID of the object to transfer or to get its status.
* @param manager_cont Number of remote nodes object_id is stored at.
* @param manager_vector Array containing the Plasma Managers
* running at the nodes where object_id is stored.
* @param manager_vector Array containing the Plasma Managers running at the
* nodes where object_id is stored.
* @param context Client connection.
* @return Status of object_id as defined in plasma.h
*/
int request_status(ObjectID object_id,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *context);
/**
@@ -142,11 +140,8 @@ void process_data_request(event_loop *loop,
typedef struct {
/** The ID of the object we are fetching or waiting for. */
ObjectID object_id;
/** Pointer to the array containing the manager locations of this object. This
* struct owns and must free each entry. */
char **manager_vector;
/** The number of manager locations in the array manager_vector. */
int manager_count;
/** Vector of the addresses of the managers containing this object. */
std::vector<std::string> manager_vector;
/** The next manager we should try to contact. This is set to an index in
* manager_vector in the retry handler, in case the current attempt fails to
* contact a manager. */
@@ -241,39 +236,6 @@ struct PlasmaManagerState {
PlasmaManagerState *g_manager_state = NULL;
/* The context for fetch and wait requests. These are per client, per object. */
struct ClientObjectRequest {
/** The ID of the object we are fetching or waiting for. */
ObjectID object_id;
/** The client connection context, shared between other ClientObjectRequest
* structs for the same client. */
ClientConnection *client_conn;
/** The ID for the timer that will time out the current request to the state
* database or another plasma manager. */
int64_t timer;
/** How many retries we have left for the request. Decremented on every
* timeout. */
int num_retries;
/** Handle for a linked list. */
ClientObjectRequest *next;
/** Pointer to the array containing the manager locations of
* this object. */
char **manager_vector;
/** The number of manager locations in the array manager_vector. */
int manager_count;
/** The next manager we should try to contact. This is set to an index in
* manager_vector in the retry handler, in case the current attempt fails to
* contact a manager. */
int next_manager;
/** Handle for the uthash table in the client connection
* context that keeps track of active object connection
* contexts. */
UT_hash_handle active_hh;
/** Handle for the uthash table in the manager state that
* keeps track of outstanding fetch requests. */
UT_hash_handle fetch_hh;
};
/* Context for a client connection to another plasma manager. */
struct ClientConnection {
/** Current state for this plasma manager. This is shared
@@ -459,10 +421,8 @@ void update_object_wait_requests(PlasmaManagerState *manager_state,
FetchRequest *create_fetch_request(PlasmaManagerState *manager_state,
ObjectID object_id) {
FetchRequest *fetch_req = (FetchRequest *) malloc(sizeof(FetchRequest));
FetchRequest *fetch_req = new FetchRequest();
fetch_req->object_id = object_id;
fetch_req->manager_count = 0;
fetch_req->manager_vector = NULL;
return fetch_req;
}
@@ -477,14 +437,8 @@ void remove_fetch_request(PlasmaManagerState *manager_state,
FetchRequest *fetch_req) {
/* Remove the fetch request from the table of fetch requests. */
manager_state->fetch_requests.erase(fetch_req->object_id);
/* Free the fetch request and everything in it. */
for (int i = 0; i < fetch_req->manager_count; ++i) {
free(fetch_req->manager_vector[i]);
}
if (fetch_req->manager_vector != NULL) {
free(fetch_req->manager_vector);
}
free(fetch_req);
/* Free the fetch request. */
delete fetch_req;
}
PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name,
@@ -902,13 +856,13 @@ void process_data_request(event_loop *loop,
void request_transfer_from(PlasmaManagerState *manager_state,
FetchRequest *fetch_req) {
CHECK(fetch_req->manager_count > 0);
CHECK(fetch_req->manager_vector.size() > 0);
CHECK(fetch_req->next_manager >= 0 &&
fetch_req->next_manager < fetch_req->manager_count);
fetch_req->next_manager < fetch_req->manager_vector.size());
char addr[16];
int port;
parse_ip_addr_port(fetch_req->manager_vector[fetch_req->next_manager], addr,
&port);
parse_ip_addr_port(fetch_req->manager_vector[fetch_req->next_manager].c_str(),
addr, &port);
ClientConnection *manager_conn =
get_manager_connection(manager_state, addr, port);
@@ -940,7 +894,7 @@ void request_transfer_from(PlasmaManagerState *manager_state,
/* On the next attempt, try the next manager in manager_vector. */
fetch_req->next_manager += 1;
fetch_req->next_manager %= fetch_req->manager_count;
fetch_req->next_manager %= fetch_req->manager_vector.size();
}
int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) {
@@ -960,7 +914,7 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) {
for (auto it = manager_state->fetch_requests.begin();
it != manager_state->fetch_requests.end(); it++) {
FetchRequest *fetch_req = it->second;
if (fetch_req->manager_count > 0) {
if (fetch_req->manager_vector.size() > 0) {
request_transfer_from(manager_state, fetch_req);
/* If we've tried all of the managers that we know about for this object,
* add this object to the list to resend requests for. */
@@ -992,13 +946,12 @@ bool is_object_local(PlasmaManagerState *state, ObjectID object_id) {
}
void request_transfer(ObjectID object_id,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *context) {
PlasmaManagerState *manager_state = (PlasmaManagerState *) context;
/* This callback is called from object_table_subscribe, which guarantees that
* the manager vector contains at least one element. */
CHECK(manager_count >= 1);
CHECK(manager_vector.size() >= 1);
auto it = manager_state->fetch_requests.find(object_id);
if (is_object_local(manager_state, object_id)) {
@@ -1015,25 +968,9 @@ void request_transfer(ObjectID object_id,
* callback gets called. */
CHECK(fetch_req != NULL);
/* This method may be run multiple times, so if we are updating the manager
* vector, we need to free the previous manager vector. */
if (fetch_req->manager_count != 0) {
for (int i = 0; i < fetch_req->manager_count; ++i) {
free(fetch_req->manager_vector[i]);
}
free(fetch_req->manager_vector);
}
/* Update the manager vector. */
fetch_req->manager_count = manager_count;
fetch_req->manager_vector = (char **) malloc(manager_count * sizeof(char *));
fetch_req->manager_vector = manager_vector;
fetch_req->next_manager = 0;
memset(fetch_req->manager_vector, 0, manager_count * sizeof(char *));
for (int i = 0; i < manager_count; ++i) {
int len = strlen(manager_vector[i]);
fetch_req->manager_vector[i] = (char *) malloc(len + 1);
strncpy(fetch_req->manager_vector[i], manager_vector[i], len);
fetch_req->manager_vector[i][len] = '\0';
}
/* Wait for the object data for the default number of retries, which timeout
* after a default interval. */
request_transfer_from(manager_state, fetch_req);
@@ -1041,8 +978,7 @@ void request_transfer(ObjectID object_id,
/* This method is only called from the tests. */
void call_request_transfer(ObjectID object_id,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *context) {
PlasmaManagerState *manager_state = (PlasmaManagerState *) context;
/* Check that there isn't already a fetch request for this object. */
@@ -1051,7 +987,7 @@ void call_request_transfer(ObjectID object_id,
/* Create a fetch request. */
FetchRequest *fetch_req = create_fetch_request(manager_state, object_id);
manager_state->fetch_requests[object_id] = fetch_req;
request_transfer(object_id, manager_count, manager_vector, context);
request_transfer(object_id, manager_vector, context);
}
void fatal_table_callback(ObjectID id, void *user_context, void *user_data) {
@@ -1059,13 +995,12 @@ void fatal_table_callback(ObjectID id, void *user_context, void *user_data) {
}
void object_present_callback(ObjectID object_id,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *context) {
PlasmaManagerState *manager_state = (PlasmaManagerState *) context;
/* This callback is called from object_table_subscribe, which guarantees that
* the manager vector contains at least one element. */
CHECK(manager_count >= 1);
CHECK(manager_vector.size() >= 1);
/* Update the in-progress remote wait requests. */
update_object_wait_requests(manager_state, object_id,
@@ -1075,19 +1010,19 @@ void object_present_callback(ObjectID object_id,
/* This callback is used by both fetch and wait. Therefore, it may have to
* handle outstanding fetch and wait requests. */
void object_table_subscribe_callback(ObjectID object_id,
int64_t data_size,
int manager_count,
const char *manager_vector[],
void *context) {
void object_table_subscribe_callback(
ObjectID object_id,
int64_t data_size,
const std::vector<std::string> &manager_vector,
void *context) {
PlasmaManagerState *manager_state = (PlasmaManagerState *) context;
/* Run the callback for fetch requests if there is a fetch request. */
auto it = manager_state->fetch_requests.find(object_id);
if (it != manager_state->fetch_requests.end()) {
request_transfer(object_id, manager_count, manager_vector, context);
request_transfer(object_id, manager_vector, context);
}
/* Run the callback for wait requests. */
object_present_callback(object_id, manager_count, manager_vector, context);
object_present_callback(object_id, manager_vector, context);
}
void process_fetch_requests(ClientConnection *client_conn,
@@ -1217,24 +1152,19 @@ void process_wait_request(ClientConnection *client_conn,
* Check whether a non-local object is stored on any remot enote or not.
*
* @param object_id ID of the object whose status we require.
* @param manager_cont Number of remote nodes object_id is stored at. If
* manager_count > 0, then object_id exists on a remote node an its
* status is ObjectStatus_Remote. Otherwise, if manager_count == 0, the
* object doesn't exist in the system and its status is
* ObjectStatus_Nonexistent.
* @param manager_vector Array containing the Plasma Managers running at the
* nodes where object_id is stored. Not used; it will be eventually
* deallocated.
* @param never_created True if the object has not been created yet and false
* otherwise.
* @param manager_vector Vector containing the addresses of the Plasma Managers
* that have the object.
* @param context Client connection.
* @return Void.
*/
void request_status_done(ObjectID object_id,
int manager_count,
const char *manager_vector[],
bool never_created,
const std::vector<std::string> &manager_vector,
void *context) {
ClientConnection *client_conn = (ClientConnection *) context;
int status =
request_status(object_id, manager_count, manager_vector, context);
int status = request_status(object_id, manager_vector, context);
plasma::ObjectID object_id_copy = object_id.to_plasma_id();
handle_sigpipe(
plasma::SendStatusReply(client_conn->fd, &object_id_copy, &status, 1),
@@ -1242,8 +1172,7 @@ void request_status_done(ObjectID object_id,
}
int request_status(ObjectID object_id,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *context) {
ClientConnection *client_conn = (ClientConnection *) context;
@@ -1252,10 +1181,11 @@ int request_status(ObjectID object_id,
return ObjectStatus_Local;
}
/* Since object is not stored at the local locally, manager_count > 0 means
* that the object is stored at another remote object. Otherwise, if
* manager_count == 0, the object is not stored anywhere. */
return (manager_count > 0 ? ObjectStatus_Remote : ObjectStatus_Nonexistent);
/* Since object is not stored at the local locally, manager_vector.size() > 0
* means that the object is stored at another remote object. Otherwise, if
* manager_vector.size() == 0, the object is not stored anywhere. */
return (manager_vector.size() > 0 ? ObjectStatus_Remote
: ObjectStatus_Nonexistent);
}
void object_table_lookup_fail_callback(ObjectID object_id,
+1 -15
View File
@@ -21,7 +21,6 @@
typedef struct PlasmaManagerState PlasmaManagerState;
typedef struct ClientConnection ClientConnection;
typedef struct ClientObjectRequest ClientObjectRequest;
/**
* Initializes the plasma manager state. This connects the manager to the local
@@ -182,14 +181,12 @@ typedef struct PlasmaRequestBuffer {
* object. This method is only called from the tests.
*
* @param object_id The object ID of the object to transfer.
* @param manager_count The number of managers that have the object.
* @param manager_vector The Plasma managers that have the object.
* @param context The plasma manager state.
* @return Void.
*/
void call_request_transfer(ObjectID object_id,
int manager_count,
const char *manager_vector[],
const std::vector<std::string> &manager_vector,
void *context);
/*
@@ -200,17 +197,6 @@ void call_request_transfer(ObjectID object_id,
*/
int fetch_timeout_handler(event_loop *loop, timer_id id, void *context);
/**
* Clean up and free an active object context. Deregister it from the
* associated client connection and from the manager state.
*
* @param client_conn The client connection context.
* @param object_id The object ID whose context we want to delete.
* @return Void.
*/
void remove_object_request(ClientConnection *client_conn,
ClientObjectRequest *object_req);
/**
* Get a connection to the remote manager at the specified address. Creates a
* new connection to this manager if one doesn't already exist.
+12 -21
View File
@@ -118,13 +118,10 @@ void destroy_plasma_mock(plasma_mock *mock) {
TEST request_transfer_test(void) {
plasma_mock *local_mock = init_plasma_mock(NULL);
plasma_mock *remote_mock = init_plasma_mock(local_mock);
const char **manager_vector = (const char **) malloc(sizeof(char *));
UT_string *addr = NULL;
utstring_new(addr);
utstring_printf(addr, "127.0.0.1:%d", remote_mock->port);
manager_vector[0] = utstring_body(addr);
call_request_transfer(object_id, 1, manager_vector, local_mock->state);
free(manager_vector);
std::vector<std::string> manager_vector;
manager_vector.push_back(std::string("127.0.0.1:") +
std::to_string(remote_mock->port));
call_request_transfer(object_id, manager_vector, local_mock->state);
event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, test_done_handler,
local_mock->state);
event_loop_run(local_mock->loop);
@@ -140,7 +137,6 @@ TEST request_transfer_test(void) {
ASSERT(ObjectID_equal(object_id, object_id2));
free(address);
/* Clean up. */
utstring_free(addr);
destroy_plasma_mock(remote_mock);
destroy_plasma_mock(local_mock);
PASS();
@@ -162,17 +158,14 @@ TEST request_transfer_retry_test(void) {
plasma_mock *local_mock = init_plasma_mock(NULL);
plasma_mock *remote_mock1 = init_plasma_mock(local_mock);
plasma_mock *remote_mock2 = init_plasma_mock(local_mock);
const char **manager_vector = (const char **) malloc(sizeof(char *) * 2);
UT_string *addr0 = NULL;
utstring_new(addr0);
utstring_printf(addr0, "127.0.0.1:%d", remote_mock1->port);
manager_vector[0] = utstring_body(addr0);
UT_string *addr1 = NULL;
utstring_new(addr1);
utstring_printf(addr1, "127.0.0.1:%d", remote_mock2->port);
manager_vector[1] = utstring_body(addr1);
call_request_transfer(object_id, 2, manager_vector, local_mock->state);
free(manager_vector);
std::vector<std::string> manager_vector;
manager_vector.push_back(std::string("127.0.0.1:") +
std::to_string(remote_mock1->port));
manager_vector.push_back(std::string("127.0.0.1:") +
std::to_string(remote_mock2->port));
call_request_transfer(object_id, manager_vector, local_mock->state);
event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * 2, test_done_handler,
local_mock->state);
/* Register the fetch timeout handler. This is normally done when the plasma
@@ -194,8 +187,6 @@ TEST request_transfer_retry_test(void) {
free(address);
ASSERT(ObjectID_equal(object_id, object_id2));
/* Clean up. */
utstring_free(addr0);
utstring_free(addr1);
destroy_plasma_mock(remote_mock2);
destroy_plasma_mock(remote_mock1);
destroy_plasma_mock(local_mock);