diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 472f9e496..51f19ea1f 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -97,15 +97,9 @@ class Monitor: message = ray.gcs_utils.HeartbeatBatchTableData.FromString( heartbeat_data) for heartbeat_message in message.batch: - resource_load = dict( - zip(heartbeat_message.resource_load_label, - heartbeat_message.resource_load_capacity)) - total_resources = dict( - zip(heartbeat_message.resources_total_label, - heartbeat_message.resources_total_capacity)) - available_resources = dict( - zip(heartbeat_message.resources_available_label, - heartbeat_message.resources_available_capacity)) + resource_load = dict(heartbeat_message.resource_load) + total_resources = dict(heartbeat_message.resources_total) + available_resources = dict(heartbeat_message.resources_available) for resource in total_resources: available_resources.setdefault(resource, 0.0) diff --git a/python/ray/state.py b/python/ray/state.py index 98d222e01..a095edb47 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -703,12 +703,9 @@ class GlobalState: heartbeat_data = pub_message.data message = gcs_utils.HeartbeatTableData.FromString(heartbeat_data) # Calculate available resources for this client - num_resources = len(message.resources_available_label) dynamic_resources = {} - for i in range(num_resources): - resource_id = message.resources_available_label[i] - dynamic_resources[resource_id] = ( - message.resources_available_capacity[i]) + for resource_id, capacity in message.resources_available.items(): + dynamic_resources[resource_id] = capacity # Update available resources for this client client_id = ray.utils.binary_to_hex(message.client_id) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 24b1aba46..a3299e7bd 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -50,9 +50,9 @@ void GcsNodeManager::NodeFailureDetector::HandleHeartbeat( iter->second = num_heartbeats_timeout_; if (!light_heartbeat_enabled_ || heartbeat_data.should_global_gc() || - heartbeat_data.resources_available_label_size() > 0 || - heartbeat_data.resources_total_label_size() > 0 || - heartbeat_data.resource_load_label_size() > 0) { + heartbeat_data.resources_available_size() > 0 || + heartbeat_data.resources_total_size() > 0 || + heartbeat_data.resource_load_size() > 0) { heartbeat_buffer_[node_id] = heartbeat_data; } } diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index fe8b19874..467bf895d 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -221,20 +221,16 @@ message GcsNodeInfo { message HeartbeatTableData { // Node manager client id bytes client_id = 1; - // TODO(hchen): Define the following resources in map format. // Resource capacity currently available on this node manager. - repeated string resources_available_label = 2; - repeated double resources_available_capacity = 3; + map resources_available = 2; // Total resource capacity configured for this node manager. - repeated string resources_total_label = 4; - repeated double resources_total_capacity = 5; + map resources_total = 3; // Aggregate outstanding resource load on this node manager. - repeated string resource_load_label = 6; - repeated double resource_load_capacity = 7; + map resource_load = 4; // Object IDs that are in use by workers on this node manager's node. - repeated bytes active_object_id = 8; + repeated bytes active_object_id = 5; // Whether this node manager is requesting global GC. - bool should_global_gc = 9; + bool should_global_gc = 6; } message HeartbeatBatchTableData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b5862bba7..cbbefc267 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -351,8 +351,8 @@ void NodeManager::Heartbeat() { local_resources.GetAvailableResources())) { for (const auto &resource_pair : local_resources.GetAvailableResources().GetResourceMap()) { - heartbeat_data->add_resources_available_label(resource_pair.first); - heartbeat_data->add_resources_available_capacity(resource_pair.second); + (*heartbeat_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; } last_heartbeat_resources_.SetAvailableResources( ResourceSet(local_resources.GetAvailableResources())); @@ -362,8 +362,8 @@ void NodeManager::Heartbeat() { local_resources.GetTotalResources())) { for (const auto &resource_pair : local_resources.GetTotalResources().GetResourceMap()) { - heartbeat_data->add_resources_total_label(resource_pair.first); - heartbeat_data->add_resources_total_capacity(resource_pair.second); + (*heartbeat_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; } last_heartbeat_resources_.SetTotalResources( ResourceSet(local_resources.GetTotalResources())); @@ -374,8 +374,8 @@ void NodeManager::Heartbeat() { local_resources.GetLoadResources())) { for (const auto &resource_pair : local_resources.GetLoadResources().GetResourceMap()) { - heartbeat_data->add_resource_load_label(resource_pair.first); - heartbeat_data->add_resource_load_capacity(resource_pair.second); + (*heartbeat_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; } last_heartbeat_resources_.SetLoadResources( ResourceSet(local_resources.GetLoadResources())); @@ -384,16 +384,16 @@ void NodeManager::Heartbeat() { // If light heartbeat disabled, we send whole resources information every time. for (const auto &resource_pair : local_resources.GetAvailableResources().GetResourceMap()) { - heartbeat_data->add_resources_available_label(resource_pair.first); - heartbeat_data->add_resources_available_capacity(resource_pair.second); + (*heartbeat_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; } last_heartbeat_resources_.SetAvailableResources( ResourceSet(local_resources.GetAvailableResources())); for (const auto &resource_pair : local_resources.GetTotalResources().GetResourceMap()) { - heartbeat_data->add_resources_total_label(resource_pair.first); - heartbeat_data->add_resources_total_capacity(resource_pair.second); + (*heartbeat_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; } last_heartbeat_resources_.SetTotalResources( ResourceSet(local_resources.GetTotalResources())); @@ -401,8 +401,8 @@ void NodeManager::Heartbeat() { local_resources.SetLoadResources(local_queues_.GetResourceLoad()); for (const auto &resource_pair : local_resources.GetLoadResources().GetResourceMap()) { - heartbeat_data->add_resource_load_label(resource_pair.first); - heartbeat_data->add_resource_load_capacity(resource_pair.second); + (*heartbeat_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; } last_heartbeat_resources_.SetLoadResources( ResourceSet(local_resources.GetLoadResources())); @@ -800,37 +800,26 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id, // If light heartbeat enabled, we update remote resources only when related resources // map in heartbeat is not empty. if (light_heartbeat_enabled_) { - if (heartbeat_data.resources_total_label_size() > 0) { - ResourceSet remote_total( - VectorFromProtobuf(heartbeat_data.resources_total_label()), - VectorFromProtobuf(heartbeat_data.resources_total_capacity())); + if (heartbeat_data.resources_total_size() > 0) { + ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total())); remote_resources.SetTotalResources(std::move(remote_total)); } - if (heartbeat_data.resources_available_label_size() > 0) { - ResourceSet remote_available( - VectorFromProtobuf(heartbeat_data.resources_available_label()), - VectorFromProtobuf(heartbeat_data.resources_available_capacity())); + if (heartbeat_data.resources_available_size() > 0) { + ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available())); remote_resources.SetAvailableResources(std::move(remote_available)); } - if (heartbeat_data.resource_load_label_size() > 0) { - ResourceSet remote_load( - VectorFromProtobuf(heartbeat_data.resource_load_label()), - VectorFromProtobuf(heartbeat_data.resource_load_capacity())); + if (heartbeat_data.resource_load_size() > 0) { + ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load())); // Extract the load information and save it locally. remote_resources.SetLoadResources(std::move(remote_load)); } } else { // If light heartbeat disabled, we update remote resources every time. - ResourceSet remote_total( - VectorFromProtobuf(heartbeat_data.resources_total_label()), - VectorFromProtobuf(heartbeat_data.resources_total_capacity())); + ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total())); remote_resources.SetTotalResources(std::move(remote_total)); - ResourceSet remote_available( - VectorFromProtobuf(heartbeat_data.resources_available_label()), - VectorFromProtobuf(heartbeat_data.resources_available_capacity())); + ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available())); remote_resources.SetAvailableResources(std::move(remote_available)); - ResourceSet remote_load(VectorFromProtobuf(heartbeat_data.resource_load_label()), - VectorFromProtobuf(heartbeat_data.resource_load_capacity())); + ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load())); // Extract the load information and save it locally. remote_resources.SetLoadResources(std::move(remote_load)); }