mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 19:32:11 +08:00
[Core] Lazy create node manager clients, and destroy them (#11928)
This commit is contained in:
@@ -668,11 +668,9 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Initialize a rpc client to the new node manager.
|
||||
std::unique_ptr<rpc::NodeManagerClient> client(
|
||||
new rpc::NodeManagerClient(node_info.node_manager_address(),
|
||||
node_info.node_manager_port(), client_call_manager_));
|
||||
remote_node_manager_clients_.emplace(node_id, std::move(client));
|
||||
// Store address of the new node manager for rpc requests.
|
||||
remote_node_manager_addresses_[node_id] =
|
||||
std::make_pair(node_info.node_manager_address(), node_info.node_manager_port());
|
||||
|
||||
// Fetch resource info for the remote client and update cluster resource map.
|
||||
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetResources(
|
||||
@@ -723,10 +721,10 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the node manager client.
|
||||
const auto client_entry = remote_node_manager_clients_.find(node_id);
|
||||
if (client_entry != remote_node_manager_clients_.end()) {
|
||||
remote_node_manager_clients_.erase(client_entry);
|
||||
// Remove the node manager address.
|
||||
const auto client_entry = remote_node_manager_addresses_.find(node_id);
|
||||
if (client_entry != remote_node_manager_addresses_.end()) {
|
||||
remote_node_manager_addresses_.erase(client_entry);
|
||||
}
|
||||
|
||||
// Notify the object directory that the client has been removed so that it
|
||||
@@ -3003,8 +3001,8 @@ std::string NodeManager::DebugString() const {
|
||||
result << "\n- num restarting actors: " << statistical_data.restarting_actors;
|
||||
result << "\n- num dead actors: " << statistical_data.dead_actors;
|
||||
|
||||
result << "\nRemote node manager clients: ";
|
||||
for (const auto &entry : remote_node_manager_clients_) {
|
||||
result << "\nRemote node managers: ";
|
||||
for (const auto &entry : remote_node_manager_addresses_) {
|
||||
result << "\n" << entry.first;
|
||||
}
|
||||
|
||||
@@ -3247,7 +3245,7 @@ void NodeManager::HandleFormatGlobalMemoryInfo(
|
||||
auto local_reply = std::make_shared<rpc::GetNodeStatsReply>();
|
||||
local_request->set_include_memory_info(true);
|
||||
|
||||
unsigned int num_nodes = remote_node_manager_clients_.size() + 1;
|
||||
unsigned int num_nodes = remote_node_manager_addresses_.size() + 1;
|
||||
rpc::GetNodeStatsRequest stats_req;
|
||||
stats_req.set_include_memory_info(true);
|
||||
|
||||
@@ -3261,8 +3259,10 @@ void NodeManager::HandleFormatGlobalMemoryInfo(
|
||||
};
|
||||
|
||||
// Fetch from remote nodes.
|
||||
for (const auto &entry : remote_node_manager_clients_) {
|
||||
entry.second->GetNodeStats(
|
||||
for (const auto &entry : remote_node_manager_addresses_) {
|
||||
std::unique_ptr<rpc::NodeManagerClient> client(new rpc::NodeManagerClient(
|
||||
entry.second.first, entry.second.second, client_call_manager_));
|
||||
client->GetNodeStats(
|
||||
stats_req, [replies, store_reply](const ray::Status &status,
|
||||
const rpc::GetNodeStatsReply &r) {
|
||||
if (!status.ok()) {
|
||||
|
||||
@@ -765,9 +765,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
|
||||
/// copies), freed, and/or spilled.
|
||||
LocalObjectManager local_object_manager_;
|
||||
|
||||
/// Map from node ids to clients of the remote node managers.
|
||||
std::unordered_map<NodeID, std::unique_ptr<rpc::NodeManagerClient>>
|
||||
remote_node_manager_clients_;
|
||||
/// Map from node ids to addresses of the remote node managers.
|
||||
absl::flat_hash_map<NodeID, std::pair<std::string, int32_t>>
|
||||
remote_node_manager_addresses_;
|
||||
|
||||
/// Map of workers leased out to direct call clients.
|
||||
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> leased_workers_;
|
||||
|
||||
Reference in New Issue
Block a user