diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 5497384c3..4f578989f 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -333,11 +333,13 @@ class NodeInfoAccessor { /// \return GcsNodeInfo virtual const rpc::GcsNodeInfo &GetSelfInfo() const = 0; - /// Register node to GCS synchronously. + /// Register a node to GCS asynchronously. /// /// \param node_info The information of node to register to GCS. + /// \param callback Callback that will be called when registration is complete. /// \return Status - virtual Status Register(const rpc::GcsNodeInfo &node_info) = 0; + virtual Status AsyncRegister(const rpc::GcsNodeInfo &node_info, + const StatusCallback &callback) = 0; /// Cancel registration of a node to GCS asynchronously. /// diff --git a/src/ray/gcs/gcs_server/node_info_handler_impl.cc b/src/ray/gcs/gcs_server/node_info_handler_impl.cc index 82e1cb586..eb231d7d0 100644 --- a/src/ray/gcs/gcs_server/node_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/node_info_handler_impl.cc @@ -9,11 +9,19 @@ void DefaultNodeInfoHandler::HandleRegisterNode( rpc::SendReplyCallback send_reply_callback) { ClientID node_id = ClientID::FromBinary(request.node_info().node_id()); RAY_LOG(DEBUG) << "Registering node info, node id = " << node_id; - Status status = gcs_client_.Nodes().Register(request.node_info()); + + auto on_done = [node_id, send_reply_callback](Status status) { + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to register node info: " << status.ToString() + << ", node id = " << node_id; + } + send_reply_callback(status, nullptr, nullptr); + }; + + Status status = gcs_client_.Nodes().AsyncRegister(request.node_info(), on_done); if (!status.ok()) { - RAY_LOG(DEBUG) << "Failed to register node info, node id = " << node_id; + on_done(status); } - send_reply_callback(status, nullptr, nullptr); RAY_LOG(DEBUG) << "Finished registering node info, node id = " << node_id; } diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index 97dd00cbf..b31005621 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -420,9 +420,15 @@ const GcsNodeInfo &RedisNodeInfoAccessor::GetSelfInfo() const { return client_table.GetLocalClient(); } -Status RedisNodeInfoAccessor::Register(const GcsNodeInfo &node_info) { +Status RedisNodeInfoAccessor::AsyncRegister(const GcsNodeInfo &node_info, + const StatusCallback &callback) { + ClientTable::WriteCallback on_done = nullptr; + if (callback != nullptr) { + on_done = [callback](RedisGcsClient *client, const ClientID &id, + const GcsNodeInfo &data) { callback(Status::OK()); }; + } ClientTable &client_table = client_impl_->client_table(); - return client_table.Register(node_info); + return client_table.MarkConnected(node_info, on_done); } Status RedisNodeInfoAccessor::AsyncUnregister(const ClientID &node_id, diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index ce28f4710..f4583f2c8 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -227,7 +227,8 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { const GcsNodeInfo &GetSelfInfo() const override; - Status Register(const GcsNodeInfo &node_info) override; + Status AsyncRegister(const GcsNodeInfo &node_info, + const StatusCallback &callback) override; Status AsyncUnregister(const ClientID &node_id, const StatusCallback &callback) override; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index b28731114..d9c3d40bf 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -640,10 +640,11 @@ Status ClientTable::Disconnect() { return status; } -ray::Status ClientTable::Register(const GcsNodeInfo &node_info) { +ray::Status ClientTable::MarkConnected(const GcsNodeInfo &node_info, + const WriteCallback &done) { RAY_CHECK(node_info.state() == GcsNodeInfo::ALIVE); auto node_info_ptr = std::make_shared(node_info); - return SyncAppend(JobID::Nil(), client_log_key_, node_info_ptr); + return Append(JobID::Nil(), client_log_key_, node_info_ptr, done); } ray::Status ClientTable::MarkDisconnected(const ClientID &dead_node_id, diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 62c722ef3..05c9f35a0 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -894,16 +894,17 @@ class ClientTable : public Log { /// \return Status ray::Status Disconnect(); - /// Register a new client to the GCS. + /// Mark a new node as connected to GCS asynchronously. /// - /// \param node_info Information about the client. + /// \param node_info Information about the node. + /// \param done Callback that is called once the node has been marked to connected. /// \return Status - ray::Status Register(const GcsNodeInfo &node_info); + ray::Status MarkConnected(const GcsNodeInfo &node_info, const WriteCallback &done); - /// Mark a different client as disconnected. The client ID should never be - /// reused for a new client. + /// Mark a different node as disconnected. The client ID should never be + /// reused for a new node. /// - /// \param dead_node_id The ID of the client to mark as dead. + /// \param dead_node_id The ID of the node to mark as dead. /// \param done Callback that is called once the node has been marked to /// disconnected. /// \return Status