Adds a push_id to every push in the object manager (#4407)

This commit is contained in:
William Ma
2019-04-03 17:12:06 -07:00
committed by Robert Nishihara
parent c2349cf12d
commit 4b25810994
3 changed files with 22 additions and 14 deletions
@@ -34,6 +34,9 @@ enum MessageType:int {
}
table PushRequestMessage {
// The push ID to allow the receiver to differentiate different push attempts
// from the same sender.
push_id: string;
// The object ID being transferred.
object_id: string;
// The index of the chunk being transferred.
+13 -9
View File
@@ -407,16 +407,18 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
static_cast<uint64_t>(object_info.data_size + object_info.metadata_size);
uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size);
uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size);
UniqueID push_id = UniqueID::from_random();
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
send_service_.post([this, client_id, object_id, data_size, metadata_size,
send_service_.post([this, push_id, client_id, object_id, data_size, metadata_size,
chunk_index, connection_info]() {
double start_time = current_sys_time_seconds();
// NOTE: When this callback executes, it's possible that the object
// will have already been evicted. It's also possible that the
// object could be in the process of being transferred to this
// object manager from another object manager.
ray::Status status = ExecuteSendObject(
client_id, object_id, data_size, metadata_size, chunk_index, connection_info);
ray::Status status =
ExecuteSendObject(push_id, client_id, object_id, data_size, metadata_size,
chunk_index, connection_info);
double end_time = current_sys_time_seconds();
// Notify the main thread that we have finished sending the chunk.
@@ -435,8 +437,8 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
}
ray::Status ObjectManager::ExecuteSendObject(
const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
const UniqueID &push_id, const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &connection_info) {
RAY_LOG(DEBUG) << "ExecuteSendObject on " << client_id_ << " to " << client_id
<< " of object " << object_id << " chunk " << chunk_index;
@@ -449,7 +451,8 @@ ray::Status ObjectManager::ExecuteSendObject(
}
if (conn != nullptr) {
status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn);
status = SendObjectHeaders(push_id, object_id, data_size, metadata_size, chunk_index,
conn);
if (!status.ok()) {
RAY_CHECK(status.IsIOError())
<< "Failed to contact remote object manager during Push";
@@ -459,7 +462,8 @@ ray::Status ObjectManager::ExecuteSendObject(
return status;
}
ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id,
ray::Status ObjectManager::SendObjectHeaders(const UniqueID &push_id,
const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
std::shared_ptr<SenderConnection> &conn) {
@@ -479,7 +483,8 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id,
// Create buffer.
flatbuffers::FlatBufferBuilder fbb;
auto message = object_manager_protocol::CreatePushRequestMessage(
fbb, to_flatbuf(fbb, object_id), chunk_index, data_size, metadata_size);
fbb, to_flatbuf(fbb, push_id), to_flatbuf(fbb, object_id), chunk_index, data_size,
metadata_size);
fbb.Finish(message);
status = conn->WriteMessage(
static_cast<int64_t>(object_manager_protocol::MessageType::PushRequest),
@@ -827,7 +832,6 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr<TcpClientConnection> &con
HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
});
});
}
+6 -5
View File
@@ -308,16 +308,17 @@ class ObjectManager : public ObjectManagerInterface {
/// Begin executing a send.
/// Executes on send_service_ thread pool.
ray::Status ExecuteSendObject(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
ray::Status ExecuteSendObject(const UniqueID &push_id, const ClientID &client_id,
const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &connection_info);
/// This method synchronously sends the object id and object size
/// to the remote object manager.
/// Executes on send_service_ thread pool.
ray::Status SendObjectHeaders(const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
ray::Status SendObjectHeaders(const UniqueID &push_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
std::shared_ptr<SenderConnection> &conn);
/// This method initiates the actual object transfer.