From 92286660e4bb4da8cc5e866fe2ff19ad8de2445a Mon Sep 17 00:00:00 2001 From: Tao Wang Date: Thu, 12 Nov 2020 00:51:40 +0800 Subject: [PATCH] [Core] Lazy create node manager clients, and destroy them (#11928) --- src/ray/raylet/node_manager.cc | 28 ++++++++++++++-------------- src/ray/raylet/node_manager.h | 6 +++--- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b70c58cb5..4a4f0c2e1 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -668,11 +668,9 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) { return; } - // Initialize a rpc client to the new node manager. - std::unique_ptr client( - new rpc::NodeManagerClient(node_info.node_manager_address(), - node_info.node_manager_port(), client_call_manager_)); - remote_node_manager_clients_.emplace(node_id, std::move(client)); + // Store address of the new node manager for rpc requests. + remote_node_manager_addresses_[node_id] = + std::make_pair(node_info.node_manager_address(), node_info.node_manager_port()); // Fetch resource info for the remote client and update cluster resource map. RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetResources( @@ -723,10 +721,10 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) { } } - // Remove the node manager client. - const auto client_entry = remote_node_manager_clients_.find(node_id); - if (client_entry != remote_node_manager_clients_.end()) { - remote_node_manager_clients_.erase(client_entry); + // Remove the node manager address. 
+ const auto client_entry = remote_node_manager_addresses_.find(node_id); + if (client_entry != remote_node_manager_addresses_.end()) { + remote_node_manager_addresses_.erase(client_entry); } // Notify the object directory that the client has been removed so that it @@ -3003,8 +3001,8 @@ std::string NodeManager::DebugString() const { result << "\n- num restarting actors: " << statistical_data.restarting_actors; result << "\n- num dead actors: " << statistical_data.dead_actors; - result << "\nRemote node manager clients: "; - for (const auto &entry : remote_node_manager_clients_) { + result << "\nRemote node managers: "; + for (const auto &entry : remote_node_manager_addresses_) { result << "\n" << entry.first; } @@ -3247,7 +3245,7 @@ void NodeManager::HandleFormatGlobalMemoryInfo( auto local_reply = std::make_shared(); local_request->set_include_memory_info(true); - unsigned int num_nodes = remote_node_manager_clients_.size() + 1; + unsigned int num_nodes = remote_node_manager_addresses_.size() + 1; rpc::GetNodeStatsRequest stats_req; stats_req.set_include_memory_info(true); @@ -3261,8 +3259,10 @@ void NodeManager::HandleFormatGlobalMemoryInfo( }; // Fetch from remote nodes. - for (const auto &entry : remote_node_manager_clients_) { - entry.second->GetNodeStats( + for (const auto &entry : remote_node_manager_addresses_) { + std::unique_ptr client(new rpc::NodeManagerClient( + entry.second.first, entry.second.second, client_call_manager_)); + client->GetNodeStats( stats_req, [replies, store_reply](const ray::Status &status, const rpc::GetNodeStatsReply &r) { if (!status.ok()) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d7ea0909f..d911155d2 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -765,9 +765,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// copies), freed, and/or spilled. 
LocalObjectManager local_object_manager_; - /// Map from node ids to clients of the remote node managers. - std::unordered_map> - remote_node_manager_clients_; + /// Map from node ids to addresses of the remote node managers. + absl::flat_hash_map> + remote_node_manager_addresses_; /// Map of workers leased out to direct call clients. std::unordered_map> leased_workers_;