From d8850eac4bedfc9465c53c3b38d440ec60eb13fb Mon Sep 17 00:00:00 2001 From: Melih Elibol Date: Thu, 1 Feb 2018 22:45:31 -0800 Subject: [PATCH] Suppress object transfer requests when object is already being received. (#1430) * added deterministic check for objects received in fetch_timeout_handler. * use receive time, in case something goes wrong after object is received. * increase timeout for removal. * indentation fix. * make log info log debug. clean up debug log. * undo unecessary changes. * changed description var. * shorten line 949. * incorporate feedback. * linting; make is_object_received function consts. * change semantics of received_objects to objects being received. added checks to both points at which objects are re-requested. updated object receive initialization accordingly. * eliminate erase on receive init. check call to request_transfer_from instead of request_transfer. * updated comments. * added todo for multiple object transfers. * linting. --- .gitignore | 1 + src/plasma/plasma_manager.cc | 54 +++++++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 9a813c64f..efdeba1bb 100644 --- a/.gitignore +++ b/.gitignore @@ -131,6 +131,7 @@ build # Gradle: .idea/**/gradle.xml .idea/**/libraries +.idea # Website /site/Gemfile.lock diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index dd622a1ef..6abbe0225 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -238,6 +238,16 @@ struct PlasmaManagerState { /** The time (in milliseconds since the Unix epoch) when the most recent * heartbeat was sent. */ int64_t previous_heartbeat_time; + /** This is the set of ObjectIDs currently being transferred to this manager. + * An ObjectID is added to this set if a shared buffer is + * successfully created for the corresponding object. + * The ObjectID is removed in process_add_object_notification, which is + * triggered by the corresponding notification from the plasma store. + * If an object transfer fails, only the ObjectID of the corresponding + * object is removed. If object transfers between managers is parallelized, + * then all objects being received from a remote manager will need to be + * removed if the connection to the remote manager fails. */ + std::unordered_set receives_in_progress; }; PlasmaManagerState *g_manager_state = NULL; @@ -534,6 +544,12 @@ void PlasmaManagerState_free(PlasmaManagerState *state) { delete state; } +bool is_receiving_or_received(const PlasmaManagerState *state, + const ObjectID &object_id) { + return state->local_available_objects.count(object_id) > 0 || + state->receives_in_progress.count(object_id) > 0; +} + event_loop *get_event_loop(PlasmaManagerState *state) { return state->loop; } @@ -546,7 +562,6 @@ void process_message(event_loop *loop, int events); int write_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) { - LOG_DEBUG("Writing data to fd %d", conn->fd); ssize_t r, s; /* Try to write one buf_size at a time. */ s = buf->data_size + buf->metadata_size - conn->cursor; @@ -642,8 +657,6 @@ void send_queued_request(event_loop *loop, } int read_object_chunk(ClientConnection *conn, PlasmaRequestBuffer *buf) { - LOG_DEBUG("Reading data from fd %d to %p", conn->fd, - buf->data + conn->cursor); ssize_t r, s; CHECK(buf != NULL); /* Try to read one buf_size at a time. */ @@ -680,6 +693,11 @@ void process_data_chunk(event_loop *loop, int err = read_object_chunk(conn, buf); auto plasma_conn = conn->manager_state->plasma_conn; if (err != 0) { + // Remove the object from the receives_in_progress set so that + // retries are processed. + // TODO(hme): Remove all ObjectIDs associated with this manager if we + // allow parallel object transfers. + conn->manager_state->receives_in_progress.erase(buf->object_id); /* Abort the object that we were trying to read from the remote plasma * manager. */ ARROW_CHECK_OK(plasma_conn->Release(buf->object_id.to_plasma_id())); @@ -865,6 +883,14 @@ void process_data_request(event_loop *loop, event_loop_remove_file(loop, client_sock); event_loop_file_handler data_chunk_handler; if (s.ok()) { + // Monitor objects that are in progress of being received. + // If a read fails while receiving this object, its + // ObjectID will be removed. If the object is successfully + // received, its ObjectID is removed by process_add_object_notification. + // If a shared buffer for the object cannot be created, + // then the receive is ignored, and the corresponding ObjectID + // is not inserted into receives_in_progress. + conn->manager_state->receives_in_progress.insert(object_id); buf->data = data->mutable_data(); data_chunk_handler = process_data_chunk; } else { @@ -946,6 +972,15 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { it != manager_state->fetch_requests.end(); it++) { FetchRequest *fetch_req = it->second; if (fetch_req->manager_vector.size() > 0) { + if (is_receiving_or_received(manager_state, fetch_req->object_id)) { + // Do nothing if the object transfer is in progress or if the object + // has already been received. + LOG_DEBUG("fetch_timeout_handler: Object in progress or received. %s", + fetch_req->object_id.hex().c_str()); + continue; + } + LOG_DEBUG("fetch_timeout_handler: Object missing. %s", + fetch_req->object_id.hex().c_str()); request_transfer_from(manager_state, fetch_req); /* If we've tried all of the managers that we know about for this object, * add this object to the list to resend requests for. */ @@ -1005,7 +1040,12 @@ void request_transfer(ObjectID object_id, fetch_req->next_manager = 0; /* Wait for the object data for the default number of retries, which timeout * after a default interval. */ - request_transfer_from(manager_state, fetch_req); + + if (!is_receiving_or_received(manager_state, object_id)) { + // Request object if it's not already being received, + // or if it has not already been received. + request_transfer_from(manager_state, fetch_req); + } } /* This method is only called from the tests. */ @@ -1037,6 +1077,7 @@ void object_table_subscribe_callback(ObjectID object_id, db_client_table_get_ip_addresses(manager_state->db, manager_ids); /* Run the callback for fetch requests if there is a fetch request. */ auto it = manager_state->fetch_requests.find(object_id); + if (it != manager_state->fetch_requests.end()) { request_transfer(object_id, managers, context); } @@ -1315,6 +1356,11 @@ void process_add_object_notification(PlasmaManagerState *state, int64_t metadata_size, unsigned char *digest) { state->local_available_objects.insert(object_id); + if (state->receives_in_progress.count(object_id) > 0) { + // This object is now locally available, so remove it from the + // receives_in_progress set. + state->receives_in_progress.erase(object_id); + } /* Add this object to the (redis) object table. */ if (state->db) {