From 4b25810994f7ddd6798fa9a6b7511eb3ce62e2ca Mon Sep 17 00:00:00 2001 From: William Ma <12377941+williamma12@users.noreply.github.com> Date: Wed, 3 Apr 2019 17:12:06 -0700 Subject: [PATCH] Adds a `push_id` to every push in the object manager (#4407) --- .../object_manager/format/object_manager.fbs | 3 +++ src/ray/object_manager/object_manager.cc | 22 +++++++++++-------- src/ray/object_manager/object_manager.h | 11 +++++----- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/ray/object_manager/format/object_manager.fbs b/src/ray/object_manager/format/object_manager.fbs index dbafa3744..73c28b7be 100644 --- a/src/ray/object_manager/format/object_manager.fbs +++ b/src/ray/object_manager/format/object_manager.fbs @@ -34,6 +34,9 @@ enum MessageType:int { } table PushRequestMessage { + // The push ID to allow the receiver to differentiate different push attempts + // from the same sender. + push_id: string; // The object ID being transferred. object_id: string; // The index of the chunk being transferred. 
diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 29338b165..71ff1ead1 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -407,16 +407,18 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { static_cast<uint64_t>(object_info.data_size + object_info.metadata_size); uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size); uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); + UniqueID push_id = UniqueID::from_random(); for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { - send_service_.post([this, client_id, object_id, data_size, metadata_size, + send_service_.post([this, push_id, client_id, object_id, data_size, metadata_size, chunk_index, connection_info]() { double start_time = current_sys_time_seconds(); // NOTE: When this callback executes, it's possible that the object // will have already been evicted. It's also possible that the // object could be in the process of being transferred to this // object manager from another object manager. - ray::Status status = ExecuteSendObject( - client_id, object_id, data_size, metadata_size, chunk_index, connection_info); + ray::Status status = + ExecuteSendObject(push_id, client_id, object_id, data_size, metadata_size, + chunk_index, connection_info); double end_time = current_sys_time_seconds(); // Notify the main thread that we have finished sending the chunk. 
@@ -435,8 +437,8 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { } ray::Status ObjectManager::ExecuteSendObject( - const ClientID &client_id, const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, + const UniqueID &push_id, const ClientID &client_id, const ObjectID &object_id, + uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, const RemoteConnectionInfo &connection_info) { RAY_LOG(DEBUG) << "ExecuteSendObject on " << client_id_ << " to " << client_id << " of object " << object_id << " chunk " << chunk_index; @@ -449,7 +451,8 @@ ray::Status ObjectManager::ExecuteSendObject( } if (conn != nullptr) { - status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn); + status = SendObjectHeaders(push_id, object_id, data_size, metadata_size, chunk_index, + conn); if (!status.ok()) { RAY_CHECK(status.IsIOError()) << "Failed to contact remote object manager during Push"; @@ -459,7 +462,8 @@ ray::Status ObjectManager::ExecuteSendObject( return status; } -ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id, +ray::Status ObjectManager::SendObjectHeaders(const UniqueID &push_id, + const ObjectID &object_id, uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, std::shared_ptr &conn) { @@ -479,7 +483,8 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id, // Create buffer. 
flatbuffers::FlatBufferBuilder fbb; auto message = object_manager_protocol::CreatePushRequestMessage( - fbb, to_flatbuf(fbb, object_id), chunk_index, data_size, metadata_size); + fbb, to_flatbuf(fbb, push_id), to_flatbuf(fbb, object_id), chunk_index, data_size, + metadata_size); fbb.Finish(message); status = conn->WriteMessage( static_cast(object_manager_protocol::MessageType::PushRequest), @@ -827,7 +832,6 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr &con HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, status); }); - }); } diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 29c75c57a..b905f6c2c 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -308,16 +308,17 @@ class ObjectManager : public ObjectManagerInterface { /// Begin executing a send. /// Executes on send_service_ thread pool. - ray::Status ExecuteSendObject(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, + ray::Status ExecuteSendObject(const UniqueID &push_id, const ClientID &client_id, + const ObjectID &object_id, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, const RemoteConnectionInfo &connection_info); /// This method synchronously sends the object id and object size /// to the remote object manager. /// Executes on send_service_ thread pool. - ray::Status SendObjectHeaders(const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, + ray::Status SendObjectHeaders(const UniqueID &push_id, const ObjectID &object_id, + uint64_t data_size, uint64_t metadata_size, + uint64_t chunk_index, std::shared_ptr &conn); /// This method initiates the actual object transfer.