diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java index 7eeff3799..58d27fe6c 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java @@ -10,7 +10,6 @@ import io.ray.api.id.TaskId; import io.ray.api.id.UniqueId; import io.ray.api.runtimecontext.NodeInfo; import io.ray.runtime.config.RayConfig; -import io.ray.runtime.gcs.GlobalStateAccessor; import io.ray.runtime.generated.Gcs; import io.ray.runtime.generated.Gcs.ActorCheckpointIdData; import io.ray.runtime.generated.Gcs.GcsNodeInfo; @@ -96,19 +95,16 @@ public class GcsClient { } private Map getResourcesForClient(UniqueId clientId) { - final String prefix = TablePrefix.NODE_RESOURCE.toString(); - final byte[] key = ArrayUtils.addAll(prefix.getBytes(), clientId.getBytes()); - Map results = primary.hgetAll(key); - Map resources = new HashMap<>(); - for (Map.Entry entry : results.entrySet()) { - String resourceName = new String(entry.getKey()); - Gcs.ResourceTableData resourceTableData; - try { - resourceTableData = Gcs.ResourceTableData.parseFrom(entry.getValue()); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException("Received invalid protobuf data from GCS."); - } - resources.put(resourceName, resourceTableData.getResourceCapacity()); + byte[] resourceMapBytes = globalStateAccessor.getNodeResourceInfo(clientId); + Gcs.ResourceMap resourceMap; + try { + resourceMap = Gcs.ResourceMap.parseFrom(resourceMapBytes); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Received invalid protobuf data from GCS."); + } + HashMap resources = new HashMap<>(); + for (Map.Entry entry : resourceMap.getItemsMap().entrySet()) { + resources.put(entry.getKey(), entry.getValue().getResourceCapacity()); } return resources; } diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java index 257068725..a1c67b53d 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java @@ -2,6 +2,7 @@ package io.ray.runtime.gcs; import com.google.common.base.Preconditions; import io.ray.api.id.ActorId; +import io.ray.api.id.UniqueId; import java.util.List; /** @@ -65,6 +66,18 @@ public class GlobalStateAccessor { } } + /** + * @param nodeId node unique id. + * @return A map of node resource info in protobuf schema. + */ + public byte[] getNodeResourceInfo(UniqueId nodeId) { + synchronized (GlobalStateAccessor.class) { + Preconditions.checkState(globalStateAccessorNativePointer != 0, + "Get resource info by node id when global state accessor have been destroyed."); + return nativeGetNodeResourceInfo(globalStateAccessorNativePointer, nodeId.getBytes()); + } + } + /** * @return A list of actor info with ActorInfo protobuf schema. */ @@ -120,6 +133,8 @@ public class GlobalStateAccessor { private native List nativeGetAllNodeInfo(long nativePtr); + private native byte[] nativeGetNodeResourceInfo(long nativePtr, byte[] nodeId); + private native List nativeGetAllActorInfo(long nativePtr); private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId); diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index 7cfbb7f31..41068ca5e 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -13,6 +13,7 @@ from ray.core.generated.gcs_pb2 import ( TablePrefix, TablePubsub, TaskTableData, + ResourceMap, ResourceTableData, ObjectLocationInfo, PubSubMessage, @@ -33,6 +34,7 @@ __all__ = [ "TablePrefix", "TablePubsub", "TaskTableData", + "ResourceMap", "ResourceTableData", "construct_error_message", "ObjectLocationInfo", diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 42b06a8ad..5a3760459 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -4,6 +4,7 @@ from libcpp.vector cimport vector as c_vector from libcpp.memory cimport unique_ptr from ray.includes.unique_ids cimport ( CActorID, + CClientID, CObjectID, ) @@ -21,3 +22,4 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil: unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id) c_vector[c_string] GetAllActorInfo() unique_ptr[c_string] GetActorInfo(const CActorID &actor_id) + c_string GetNodeResourceInfo(const CClientID &node_id) diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index 609c16bc7..4a5f3fb5f 100644 --- a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -1,5 +1,6 @@ from ray.includes.unique_ids cimport ( CActorID, + CClientID, CObjectID, ) @@ -53,3 +54,6 @@ cdef class GlobalStateAccessor: if actor_info: return c_string(actor_info.get().data(), actor_info.get().size()) return None + + def get_node_resource_info(self, node_id): + return self.inner.get().GetNodeResourceInfo(CClientID.FromBinary(node_id.binary())) diff --git a/python/ray/state.py b/python/ray/state.py index 2372f87ec..23b2ef770 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -17,106 +17,6 @@ from ray._raylet import GlobalStateAccessor logger = logging.getLogger(__name__) -def _parse_client_table(redis_client): - """Read the client table. - - Args: - redis_client: A client to the primary Redis shard. - - Returns: - A list of information about the nodes in the cluster. - """ - NIL_CLIENT_ID = ray.ClientID.nil().binary() - message = redis_client.execute_command( - "RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("CLIENT"), "", - NIL_CLIENT_ID) - - # Handle the case where no clients are returned. This should only - # occur potentially immediately after the cluster is started. - if message is None: - return [] - - node_info = {} - gcs_entry = gcs_utils.GcsEntry.FromString(message) - - ordered_node_ids = [] - - # Since GCS entries are append-only, we override so that - # only the latest entries are kept. - for entry in gcs_entry.entries: - item = gcs_utils.GcsNodeInfo.FromString(entry) - - node_id = ray.utils.binary_to_hex(item.node_id) - - if item.state == gcs_utils.GcsNodeInfo.GcsNodeState.Value("ALIVE"): - ordered_node_ids.append(node_id) - node_info[node_id] = { - "NodeID": node_id, - "Alive": True, - "NodeManagerAddress": item.node_manager_address, - "NodeManagerHostname": item.node_manager_hostname, - "NodeManagerPort": item.node_manager_port, - "ObjectManagerPort": item.object_manager_port, - "ObjectStoreSocketName": item.object_store_socket_name, - "RayletSocketName": item.raylet_socket_name - } - - # If this node is being removed, then it must - # have previously been inserted, and - # it cannot have previously been removed. - else: - assert node_id in node_info, "node not found!" - assert node_info[node_id]["Alive"], ( - "Unexpected duplicate removal of node.") - node_info[node_id]["Alive"] = False - # Fill resource info. - for node_id in ordered_node_ids: - if node_info[node_id]["Alive"]: - resources = _parse_resource_table(redis_client, node_id) - else: - resources = {} - node_info[node_id]["Resources"] = resources - # NOTE: We return the list comprehension below instead of simply doing - # 'list(node_info.values())' in order to have the nodes appear in the order - # that they joined the cluster. Python dictionaries do not preserve - # insertion order. We could use an OrderedDict, but then we'd have to be - # sure to only insert a given node a single time (clients that die appear - # twice in the GCS log). - return [node_info[node_id] for node_id in ordered_node_ids] - - -def _parse_resource_table(redis_client, client_id): - """Read the resource table with given client id. - - Args: - redis_client: A client to the primary Redis shard. - client_id: The client ID of the node in hex. - - Returns: - A dict of resources about this node. - """ - message = redis_client.execute_command( - "RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("NODE_RESOURCE"), "", - ray.utils.hex_to_binary(client_id)) - - if message is None: - return {} - - resources = {} - gcs_entry = gcs_utils.GcsEntry.FromString(message) - entries_len = len(gcs_entry.entries) - if entries_len % 2 != 0: - raise ValueError("Invalid entry size for resource lookup: " + - str(entries_len)) - - for i in range(0, entries_len, 2): - resource_table_data = gcs_utils.ResourceTableData.FromString( - gcs_entry.entries[i + 1]) - resources[decode( - gcs_entry.entries[i])] = resource_table_data.resource_capacity - return resources - - class GlobalState: """A class used to interface with the Ray control state. @@ -356,6 +256,30 @@ class GlobalState: } return actor_info + def node_resource_table(self, node_id=None): + """Fetch and parse the node resource table info for one. + + Args: + node_id: An node ID to fetch information about. + + Returns: + Information from the node resource table. + """ + self._check_connected() + + node_id = ray.ClientID(hex_to_binary(node_id)) + node_resource_bytes = \ + self.global_state_accessor.get_node_resource_info(node_id) + if node_resource_bytes is None: + return {} + else: + node_resource_info = gcs_utils.ResourceMap.FromString( + node_resource_bytes) + return { + key: value.resource_capacity + for key, value in node_resource_info.items.items() + } + def node_table(self): """Fetch and parse the Gcs node info table. @@ -381,8 +305,7 @@ class GlobalState: "RayletSocketName": item.raylet_socket_name } node_info["alive"] = node_info["Alive"] - node_info["Resources"] = _parse_resource_table( - self.redis_client, + node_info["Resources"] = self.node_resource_table( node_info["NodeID"]) if node_info["Alive"] else {} results.append(node_info) return results diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc index 7d48e42f4..0cdc7f50d 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc @@ -76,6 +76,16 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *env, jo }); } +JNIEXPORT jbyteArray JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo( + JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jbyteArray node_id_bytes) { + auto *gcs_accessor = + reinterpret_cast(gcs_accessor_ptr); + auto node_id = JavaByteArrayToId(env, node_id_bytes); + auto node_resource_info = gcs_accessor->GetNodeResourceInfo(node_id); + return static_cast(NativeStringToJavaByteArray(env, node_resource_info)); +} + JNIEXPORT jobject JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllActorInfo( JNIEnv *env, jobject o, jlong gcs_accessor_ptr) { diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h index f05bf50be..a29420f3a 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h @@ -75,6 +75,14 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *, jobject, jlong); +/* + * Class: io_ray_runtime_gcs_GlobalStateAccessor + * Method: nativeGetNodeResourceInfo + * Signature: (J[B)[B + */ +JNIEXPORT jbyteArray JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo(JNIEnv *, jobject, + jlong, jbyteArray); /* * Class: io_ray_runtime_gcs_GlobalStateAccessor * Method: nativeGetAllActorInfo diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index b9ae57949..2a2189880 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -129,6 +129,27 @@ std::unique_ptr GlobalStateAccessor::GetObjectInfo( return object_info; } +std::string GlobalStateAccessor::GetNodeResourceInfo(const ClientID &node_id) { + rpc::ResourceMap node_resource_map; + std::promise promise; + auto on_done = + [&node_resource_map, &promise]( + const Status &status, + const boost::optional &result) { + RAY_CHECK_OK(status); + if (result) { + auto result_value = result.get(); + for (auto &data : result_value) { + (*node_resource_map.mutable_items())[data.first] = *data.second; + } + } + promise.set_value(); + }; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetResources(node_id, on_done)); + promise.get_future().get(); + return node_resource_map.SerializeAsString(); +} + std::vector GlobalStateAccessor::GetAllActorInfo() { std::vector actor_table_data; std::promise promise; diff --git a/src/ray/gcs/gcs_client/global_state_accessor.h b/src/ray/gcs/gcs_client/global_state_accessor.h index fe57cb14a..2ea4ec6df 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.h +++ b/src/ray/gcs/gcs_client/global_state_accessor.h @@ -79,6 +79,14 @@ class GlobalStateAccessor { /// protobuf function. std::unique_ptr GetObjectInfo(const ObjectID &object_id); + /// Get information of a node resource from GCS Service. + /// + /// \param node_id The ID of node to look up in the GCS Service. + /// \return node resource map info. To support multi-language, we serialize each + /// ResourceTableData and return the serialized string. Where used, it needs to be + /// deserialized with protobuf function. + std::string GetNodeResourceInfo(const ClientID &node_id); + /// Get information of all actors from GCS Service. /// /// \return All actor info. To support multi-language, we serialize each ActorTableData diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 817777a78..ff459f3a6 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -132,6 +132,42 @@ TEST_F(GlobalStateAccessorTest, TestNodeTable) { } } +TEST_F(GlobalStateAccessorTest, TestNodeResourceTable) { + int node_count = 100; + ASSERT_EQ(global_state_->GetAllNodeInfo().size(), 0); + for (int index = 0; index < node_count; ++index) { + auto node_table_data = + Mocker::GenNodeInfo(index, std::string("127.0.0.") + std::to_string(index)); + auto node_id = ClientID::FromBinary(node_table_data->node_id()); + std::promise promise; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncRegister( + *node_table_data, [&promise](Status status) { promise.set_value(status.ok()); })); + WaitReady(promise.get_future(), timeout_ms_); + ray::gcs::NodeInfoAccessor::ResourceMap resources; + rpc::ResourceTableData resource_table_data; + resource_table_data.set_resource_capacity(static_cast(index + 1) + 0.1); + resources[std::to_string(index)] = + std::make_shared(resource_table_data); + RAY_IGNORE_EXPR(gcs_client_->Nodes().AsyncUpdateResources( + node_id, resources, [](Status status) { RAY_CHECK(status.ok()); })); + } + auto node_table = global_state_->GetAllNodeInfo(); + ASSERT_EQ(node_table.size(), node_count); + for (int index = 0; index < node_count; ++index) { + rpc::GcsNodeInfo node_data; + node_data.ParseFromString(node_table[index]); + auto resource_map_str = + global_state_->GetNodeResourceInfo(ClientID::FromBinary(node_data.node_id())); + rpc::ResourceMap resource_map; + resource_map.ParseFromString(resource_map_str); + ASSERT_EQ( + static_cast( + (*resource_map.mutable_items())[std::to_string(node_data.node_manager_port())] + .resource_capacity()), + node_data.node_manager_port() + 1); + } +} + TEST_F(GlobalStateAccessorTest, TestProfileTable) { int profile_count = 100; ASSERT_EQ(global_state_->GetAllProfileInfo().size(), 0); diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index c1bee85c8..f2383bf16 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -202,8 +202,8 @@ void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request, RAY_LOG(DEBUG) << "Getting node resources, node id = " << node_id; auto iter = cluster_resources_.find(node_id); if (iter != cluster_resources_.end()) { - for (auto &resource : iter->second) { - (*reply->mutable_resources())[resource.first] = *resource.second; + for (auto &resource : iter->second.items()) { + (*reply->mutable_resources())[resource.first] = resource.second; } } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); @@ -216,23 +216,19 @@ void GcsNodeManager::HandleUpdateResources(const rpc::UpdateResourcesRequest &re ClientID node_id = ClientID::FromBinary(request.node_id()); RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id; auto iter = cluster_resources_.find(node_id); + auto to_be_updated_resources = request.resources(); if (iter != cluster_resources_.end()) { - auto to_be_updated_resources = std::make_shared(); - for (auto resource : request.resources()) { - (*to_be_updated_resources)[resource.first] = - std::make_shared(resource.second); - } - for (auto &entry : *to_be_updated_resources) { - iter->second[entry.first] = entry.second; + for (auto &entry : to_be_updated_resources) { + (*iter->second.mutable_items())[entry.first] = entry.second; } auto on_done = [this, node_id, to_be_updated_resources, reply, send_reply_callback](const Status &status) { RAY_CHECK_OK(status); rpc::NodeResourceChange node_resource_change; node_resource_change.set_node_id(node_id.Binary()); - for (auto &it : *to_be_updated_resources) { + for (auto &it : to_be_updated_resources) { (*node_resource_change.mutable_updated_resources())[it.first] = - it.second->resource_capacity(); + it.second.resource_capacity(); } RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(), node_resource_change.SerializeAsString(), @@ -242,8 +238,8 @@ void GcsNodeManager::HandleUpdateResources(const rpc::UpdateResourcesRequest &re RAY_LOG(DEBUG) << "Finished updating resources, node id = " << node_id; }; - RAY_CHECK_OK(node_info_accessor_.AsyncUpdateResources( - node_id, *to_be_updated_resources, on_done)); + RAY_CHECK_OK( + gcs_table_storage_->NodeResourceTable().Put(node_id, iter->second, on_done)); } else { GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::Invalid("Node is not exist.")); RAY_LOG(ERROR) << "Failed to update resources as node " << node_id @@ -260,7 +256,7 @@ void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &re auto iter = cluster_resources_.find(node_id); if (iter != cluster_resources_.end()) { for (auto &resource_name : resource_names) { - iter->second.erase(resource_name); + RAY_IGNORE_EXPR(iter->second.mutable_items()->erase(resource_name)); } auto on_done = [this, node_id, resource_names, reply, send_reply_callback](const Status &status) { @@ -277,7 +273,7 @@ void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &re GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; RAY_CHECK_OK( - node_info_accessor_.AsyncDeleteResources(node_id, resource_names, on_done)); + gcs_table_storage_->NodeResourceTable().Put(node_id, iter->second, on_done)); } else { GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); RAY_LOG(DEBUG) << "Finished deleting node resources, node id = " << node_id; @@ -300,8 +296,7 @@ void GcsNodeManager::AddNode(std::shared_ptr node) { if (iter == alive_nodes_.end()) { alive_nodes_.emplace(node_id, node); // Add an empty resources for this node. - RAY_CHECK( - cluster_resources_.emplace(node_id, gcs::NodeInfoAccessor::ResourceMap()).second); + RAY_CHECK(cluster_resources_.emplace(node_id, rpc::ResourceMap()).second); // Register this node to the `node_failure_detector_` which will start monitoring it. node_failure_detector_->AddNode(node_id); // Notify all listeners. diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index c2ee97b6e..8c889b8ec 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -198,7 +198,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// Dead nodes. absl::flat_hash_map> dead_nodes_; /// Cluster resources. - absl::flat_hash_map cluster_resources_; + absl::flat_hash_map cluster_resources_; /// Listeners which monitors the addition of nodes. std::vector)>> node_added_listeners_;