[GCS] Add async register nodes to GCS Client (#6742)

This commit is contained in:
micafan
2020-01-09 10:51:22 +08:00
committed by Zhijun Fu
parent 69c5a2bc3c
commit 1211e6a1fc
6 changed files with 35 additions and 16 deletions
+4 -2
View File
@@ -333,11 +333,13 @@ class NodeInfoAccessor {
/// \return GcsNodeInfo
virtual const rpc::GcsNodeInfo &GetSelfInfo() const = 0;
/// Register node to GCS synchronously.
/// Register a node to GCS asynchronously.
///
/// \param node_info The information of node to register to GCS.
/// \param callback Callback that will be called when registration is complete.
/// \return Status
virtual Status Register(const rpc::GcsNodeInfo &node_info) = 0;
virtual Status AsyncRegister(const rpc::GcsNodeInfo &node_info,
const StatusCallback &callback) = 0;
/// Cancel registration of a node to GCS asynchronously.
///
@@ -9,11 +9,19 @@ void DefaultNodeInfoHandler::HandleRegisterNode(
rpc::SendReplyCallback send_reply_callback) {
ClientID node_id = ClientID::FromBinary(request.node_info().node_id());
RAY_LOG(DEBUG) << "Registering node info, node id = " << node_id;
Status status = gcs_client_.Nodes().Register(request.node_info());
auto on_done = [node_id, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to register node info: " << status.ToString()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
};
Status status = gcs_client_.Nodes().AsyncRegister(request.node_info(), on_done);
if (!status.ok()) {
RAY_LOG(DEBUG) << "Failed to register node info, node id = " << node_id;
on_done(status);
}
send_reply_callback(status, nullptr, nullptr);
RAY_LOG(DEBUG) << "Finished registering node info, node id = " << node_id;
}
+8 -2
View File
@@ -420,9 +420,15 @@ const GcsNodeInfo &RedisNodeInfoAccessor::GetSelfInfo() const {
return client_table.GetLocalClient();
}
Status RedisNodeInfoAccessor::Register(const GcsNodeInfo &node_info) {
Status RedisNodeInfoAccessor::AsyncRegister(const GcsNodeInfo &node_info,
const StatusCallback &callback) {
ClientTable::WriteCallback on_done = nullptr;
if (callback != nullptr) {
on_done = [callback](RedisGcsClient *client, const ClientID &id,
const GcsNodeInfo &data) { callback(Status::OK()); };
}
ClientTable &client_table = client_impl_->client_table();
return client_table.Register(node_info);
return client_table.MarkConnected(node_info, on_done);
}
Status RedisNodeInfoAccessor::AsyncUnregister(const ClientID &node_id,
+2 -1
View File
@@ -227,7 +227,8 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
const GcsNodeInfo &GetSelfInfo() const override;
Status Register(const GcsNodeInfo &node_info) override;
Status AsyncRegister(const GcsNodeInfo &node_info,
const StatusCallback &callback) override;
Status AsyncUnregister(const ClientID &node_id,
const StatusCallback &callback) override;
+3 -2
View File
@@ -640,10 +640,11 @@ Status ClientTable::Disconnect() {
return status;
}
ray::Status ClientTable::Register(const GcsNodeInfo &node_info) {
ray::Status ClientTable::MarkConnected(const GcsNodeInfo &node_info,
const WriteCallback &done) {
RAY_CHECK(node_info.state() == GcsNodeInfo::ALIVE);
auto node_info_ptr = std::make_shared<GcsNodeInfo>(node_info);
return SyncAppend(JobID::Nil(), client_log_key_, node_info_ptr);
return Append(JobID::Nil(), client_log_key_, node_info_ptr, done);
}
ray::Status ClientTable::MarkDisconnected(const ClientID &dead_node_id,
+7 -6
View File
@@ -894,16 +894,17 @@ class ClientTable : public Log<ClientID, GcsNodeInfo> {
/// \return Status
ray::Status Disconnect();
/// Register a new client to the GCS.
/// Mark a new node as connected to GCS asynchronously.
///
/// \param node_info Information about the client.
/// \param node_info Information about the node.
/// \param done Callback that is called once the node has been marked to connected.
/// \return Status
ray::Status Register(const GcsNodeInfo &node_info);
ray::Status MarkConnected(const GcsNodeInfo &node_info, const WriteCallback &done);
/// Mark a different client as disconnected. The client ID should never be
/// reused for a new client.
/// Mark a different node as disconnected. The client ID should never be
/// reused for a new node.
///
/// \param dead_node_id The ID of the client to mark as dead.
/// \param dead_node_id The ID of the node to mark as dead.
/// \param done Callback that is called once the node has been marked to
/// disconnected.
/// \return Status