diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index cf9cc2e6d..e840d20f0 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -3,6 +3,7 @@ from ray.core.generated.gcs_pb2 import ( ActorCheckpointIdData, ActorTableData, GcsNodeInfo, + AvailableResources, JobTableData, JobConfig, ErrorTableData, @@ -26,6 +27,7 @@ __all__ = [ "ActorCheckpointIdData", "ActorTableData", "GcsNodeInfo", + "AvailableResources", "JobTableData", "JobConfig", "ErrorTableData", diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 34d34e97b..058aa8514 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -19,6 +19,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil: void Disconnect() c_vector[c_string] GetAllJobInfo() c_vector[c_string] GetAllNodeInfo() + c_vector[c_string] GetAllAvailableResources() c_vector[c_string] GetAllProfileInfo() c_vector[c_string] GetAllObjectInfo() unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id) diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index db9e66fbd..216d3dd82 100644 --- a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -51,6 +51,12 @@ cdef class GlobalStateAccessor: result = self.inner.get().GetAllNodeInfo() return result + def get_all_available_resources(self): + cdef c_vector[c_string] result + with nogil: + result = self.inner.get().GetAllAvailableResources() + return result + def get_profile_table(self): cdef c_vector[c_string] result with nogil: diff --git a/python/ray/state.py b/python/ray/state.py index 579df4179..64e749870 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -760,41 +760,26 @@ class GlobalState: """Returns a dictionary mapping node id to avaiable resources.""" available_resources_by_id = {} - subscribe_client = self.redis_client.pubsub( - ignore_subscribe_messages=True) - subscribe_client.psubscribe(gcs_utils.XRAY_HEARTBEAT_PATTERN) - - client_ids = self._live_client_ids() - - while set(available_resources_by_id.keys()) != client_ids: - # Parse client message - raw_message = subscribe_client.get_message() - if (raw_message is None or raw_message["pattern"] != - gcs_utils.XRAY_HEARTBEAT_PATTERN): - continue - data = raw_message["data"] - pub_message = gcs_utils.PubSubMessage.FromString(data) - heartbeat_data = pub_message.data - message = gcs_utils.HeartbeatTableData.FromString(heartbeat_data) - # Calculate available resources for this client + all_available_resources = \ + self.global_state_accessor.get_all_available_resources() + for available_resource in all_available_resources: + message = ray.gcs_utils.AvailableResources.FromString( + available_resource) + # Calculate available resources for this node. dynamic_resources = {} - for resource_id, capacity in message.resources_available.items(): + for resource_id, capacity in \ + message.resources_available.items(): dynamic_resources[resource_id] = capacity + # Update available resources for this node. + node_id = ray.utils.binary_to_hex(message.node_id) + available_resources_by_id[node_id] = dynamic_resources - # Update available resources for this client - client_id = ray.utils.binary_to_hex(message.client_id) - available_resources_by_id[client_id] = dynamic_resources - - # Update clients in cluster - client_ids = self._live_client_ids() - - # Remove disconnected clients - for client_id in list(available_resources_by_id.keys()): - if client_id not in client_ids: - del available_resources_by_id[client_id] - - # Close the pubsub clients to avoid leaking file descriptors. - subscribe_client.close() + # Update nodes in cluster. + node_ids = self._live_client_ids() + # Remove disconnected nodes. + for node_id in available_resources_by_id.keys(): + if node_id not in node_ids: + del available_resources_by_id[node_id] return available_resources_by_id @@ -814,7 +799,7 @@ class GlobalState: available_resources_by_id = self._available_resources_per_node() - # Calculate total available resources + # Calculate total available resources. total_available_resources = defaultdict(int) for available_resources in available_resources_by_id.values(): for resource_id, num_available in available_resources.items(): diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 6b114e7ac..e76b77620 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -695,12 +695,16 @@ def test_accelerator_type_api(shutdown_only): @ray.remote(accelerator_type=v100) def decorated_func(quantity): - return ray.available_resources()[resource_name] < quantity + wait_for_condition( + lambda: ray.available_resources()[resource_name] < quantity) + return True assert ray.get(decorated_func.remote(quantity)) def via_options_func(quantity): - return ray.available_resources()[resource_name] < quantity + wait_for_condition( + lambda: ray.available_resources()[resource_name] < quantity) + return True assert ray.get( ray.remote(via_options_func).options( @@ -725,13 +729,15 @@ def test_accelerator_type_api(shutdown_only): # Avoid a race condition where the actor hasn't been initialized and # claimed the resources yet. ray.get(decorated_actor.initialized.remote()) - assert ray.available_resources()[resource_name] < quantity + wait_for_condition( + lambda: ray.available_resources()[resource_name] < quantity) quantity = ray.available_resources()[resource_name] with_options = ray.remote(ActorWithOptions).options( accelerator_type=v100).remote() ray.get(with_options.initialized.remote()) - assert ray.available_resources()[resource_name] < quantity + wait_for_condition( + lambda: ray.available_resources()[resource_name] < quantity) def test_detect_docker_cpus(): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 72cb90c42..a27736d38 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -219,8 +219,12 @@ def test_many_fractional_resources(shutdown_only): stop_time = time.time() + 10 correct_available_resources = False while time.time() < stop_time: - if (ray.available_resources()["CPU"] == 2.0 + available_resources = ray.available_resources() + if ("CPU" in available_resources + and ray.available_resources()["CPU"] == 2.0 + and "GPU" in available_resources and ray.available_resources()["GPU"] == 2.0 + and "Custom" in available_resources and ray.available_resources()["Custom"] == 2.0): correct_available_resources = True break @@ -346,6 +350,9 @@ def test_ray_options(shutdown_only): @ray.remote( num_cpus=2, num_gpus=3, memory=150 * 2**20, resources={"custom1": 1}) def foo(): + import time + # Sleep for a heartbeat period to ensure resources changing reported. + time.sleep(0.1) return ray.available_resources() ray.init(num_cpus=10, num_gpus=10, resources={"custom1": 2}) diff --git a/python/ray/tests/test_dynres.py b/python/ray/tests/test_dynres.py index 1470f8919..a67da6416 100644 --- a/python/ray/tests/test_dynres.py +++ b/python/ray/tests/test_dynres.py @@ -647,7 +647,9 @@ def test_release_cpus_when_actor_creation_task_blocking(shutdown_only): return False def assert_available_resources(): - return 1 == ray.available_resources()["CPU"] + available_resources = ray.available_resources() + return "CPU" in available_resources and 1 == ray.available_resources( + )["CPU"] result = wait_until(assert_available_resources, 1000) assert result is True diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 5f36dd79a..ef7c6c1dc 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -538,6 +538,13 @@ class NodeInfoAccessor { virtual Status AsyncGetResources(const NodeID &node_id, const OptionalItemCallback &callback) = 0; + /// Get available resources of all nodes from GCS asynchronously. + /// + /// \param callback Callback that will be called after lookup finishes. + /// \return Status + virtual Status AsyncGetAllAvailableResources( + const MultiItemCallback &callback) = 0; + /// Update resources of node in GCS asynchronously. /// /// \param node_id The ID of node to update dynamic resources. diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index f59f55937..3d427d7b6 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -148,6 +148,16 @@ std::string GlobalStateAccessor::GetNodeResourceInfo(const NodeID &node_id) { return node_resource_map.SerializeAsString(); } +std::vector GlobalStateAccessor::GetAllAvailableResources() { + std::vector available_resources; + std::promise promise; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAllAvailableResources( + TransformForMultiItemCallback(available_resources, + promise))); + promise.get_future().get(); + return available_resources; +} + std::string GlobalStateAccessor::GetInternalConfig() { rpc::StoredConfig config_proto; std::promise promise; diff --git a/src/ray/gcs/gcs_client/global_state_accessor.h b/src/ray/gcs/gcs_client/global_state_accessor.h index 1786e0877..8ce72b156 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.h +++ b/src/ray/gcs/gcs_client/global_state_accessor.h @@ -86,6 +86,13 @@ class GlobalStateAccessor { /// deserialized with protobuf function. std::string GetNodeResourceInfo(const NodeID &node_id); + /// Get available resources of all nodes. + /// + /// \return available resources of all nodes. To support multi-language, we serialize + /// each AvailableResources and return the serialized string. Where used, it needs to be + /// deserialized with protobuf function. + std::vector GetAllAvailableResources(); + /// Get internal config from GCS Service. /// /// \return map of internal config keys and values. It is stored as a StoredConfig proto diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 07c4d6400..345322f0e 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -638,6 +638,21 @@ Status ServiceBasedNodeInfoAccessor::AsyncGetResources( return Status::OK(); } +Status ServiceBasedNodeInfoAccessor::AsyncGetAllAvailableResources( + const MultiItemCallback &callback) { + rpc::GetAllAvailableResourcesRequest request; + client_impl_->GetGcsRpcClient().GetAllAvailableResources( + request, + [callback](const Status &status, const rpc::GetAllAvailableResourcesReply &reply) { + std::vector result = + VectorFromProtobuf(reply.resources_list()); + callback(status, result); + RAY_LOG(DEBUG) << "Finished getting available resources of all nodes, status = " + << status; + }); + return Status::OK(); +} + Status ServiceBasedNodeInfoAccessor::AsyncUpdateResources( const NodeID &node_id, const ResourceMap &resources, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Updating node resources, node id = " << node_id; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index fcd8e1e20..dd9ae406e 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -182,6 +182,9 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { Status AsyncGetResources(const NodeID &node_id, const OptionalItemCallback &callback) override; + Status AsyncGetAllAvailableResources( + const MultiItemCallback &callback) override; + Status AsyncUpdateResources(const NodeID &node_id, const ResourceMap &resources, const StatusCallback &callback) override; diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 3d43ddc4f..5206cd048 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -331,6 +331,20 @@ class ServiceBasedGcsClientTest : public ::testing::Test { return WaitReady(promise.get_future(), timeout_ms_); } + std::vector GetAllAvailableResources() { + std::promise promise; + std::vector resources; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAllAvailableResources( + [&resources, &promise](Status status, + const std::vector &result) { + EXPECT_TRUE(!result.empty()); + resources.assign(result.begin(), result.end()); + promise.set_value(status.ok()); + })); + EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_)); + return resources; + } + bool SubscribeTask( const TaskID &task_id, const gcs::SubscribeCallback &subscribe) { @@ -732,6 +746,38 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeHeartbeat) { WaitForExpectedCount(heartbeat_batch_count, 1); } +TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResources) { + // Subscribe batched state of all nodes from GCS. + std::atomic heartbeat_batch_count(0); + auto on_subscribe = + [&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) { + ++heartbeat_batch_count; + }; + ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe)); + + // Register node. + auto node_info = Mocker::GenNodeInfo(); + RAY_CHECK(RegisterNode(*node_info)); + + // Report heartbeat of a node to GCS. + NodeID node_id = NodeID::FromBinary(node_info->node_id()); + auto heartbeat = std::make_shared(); + heartbeat->set_client_id(node_id.Binary()); + // Set this flag because GCS won't publish unchanged heartbeat. + heartbeat->set_should_global_gc(true); + (*heartbeat->mutable_resources_available())["CPU"] = 1.0; + (*heartbeat->mutable_resources_available())["GPU"] = 10.0; + ASSERT_TRUE(ReportHeartbeat(heartbeat)); + WaitForExpectedCount(heartbeat_batch_count, 1); + + // Assert get all available resources right. + std::vector resources = GetAllAvailableResources(); + EXPECT_EQ(resources.size(), 1); + EXPECT_EQ(resources[0].resources_available_size(), 2); + EXPECT_EQ((*resources[0].mutable_resources_available())["CPU"], 1.0); + EXPECT_EQ((*resources[0].mutable_resources_available())["GPU"], 10.0); +} + TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) { JobID job_id = JobID::FromInt(1); TaskID task_id = TaskID::ForDriverTask(job_id); diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 1166a8eca..f314772ad 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -354,6 +354,21 @@ void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(), get_system_config)); } +void GcsNodeManager::HandleGetAllAvailableResources( + const rpc::GetAllAvailableResourcesRequest &request, + rpc::GetAllAvailableResourcesReply *reply, + rpc::SendReplyCallback send_reply_callback) { + for (const auto &iter : GetClusterRealtimeResources()) { + rpc::AvailableResources resource; + resource.set_node_id(iter.first.Binary()); + for (auto res : iter.second->GetResourceAmountMap()) { + (*resource.mutable_resources_available())[res.first] = res.second.ToDouble(); + } + reply->add_resources_list()->CopyFrom(resource); + } + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); +} + std::shared_ptr GcsNodeManager::GetNode( const ray::NodeID &node_id) const { auto iter = alive_nodes_.find(node_id); diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 26a8b7904..c94a26ebb 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -79,16 +79,22 @@ class GcsNodeManager : public rpc::NodeInfoHandler { rpc::DeleteResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle setting internal config. + /// Handle set internal config. void HandleSetInternalConfig(const rpc::SetInternalConfigRequest &request, rpc::SetInternalConfigReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle getting internal config. + /// Handle get internal config. void HandleGetInternalConfig(const rpc::GetInternalConfigRequest &request, rpc::GetInternalConfigReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle get available resources of all nodes. + void HandleGetAllAvailableResources( + const rpc::GetAllAvailableResourcesRequest &request, + rpc::GetAllAvailableResourcesReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Add an alive node. /// /// \param node The info of the node to be added. diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index e94b152fd..9d93ec2c5 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -356,6 +356,11 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { Status AsyncGetResources(const NodeID &node_id, const OptionalItemCallback &callback) override; + Status AsyncGetAllAvailableResources( + const MultiItemCallback &callback) override { + return Status::NotImplemented("AsyncGetAllAvailableResources not implemented"); + } + Status AsyncUpdateResources(const NodeID &node_id, const ResourceMap &resources, const StatusCallback &callback) override; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 2bc54e865..110376f13 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -222,6 +222,13 @@ message ResourceTableData { double resource_capacity = 1; } +message AvailableResources { + // Node manager client id. + bytes node_id = 1; + // Resource capacity currently available on this node manager. + map resources_available = 2; +} + message GcsNodeInfo { // State of a node. enum GcsNodeState { diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 141e2b526..ac048e4cf 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -263,6 +263,14 @@ message GetInternalConfigReply { StoredConfig config = 2; } +message GetAllAvailableResourcesRequest { +} + +message GetAllAvailableResourcesReply { + GcsStatus status = 1; + repeated AvailableResources resources_list = 2; +} + // Service for node info access. service NodeInfoGcsService { // Register a node to GCS Service. @@ -283,6 +291,9 @@ service NodeInfoGcsService { rpc SetInternalConfig(SetInternalConfigRequest) returns (SetInternalConfigReply); // Get cluster internal config. rpc GetInternalConfig(GetInternalConfigRequest) returns (GetInternalConfigReply); + // Get available resources of all nodes. + rpc GetAllAvailableResources(GetAllAvailableResourcesRequest) + returns (GetAllAvailableResourcesReply); } message GetObjectLocationsRequest { diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 37b5557f3..79e964b89 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -190,6 +190,10 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetInternalConfig, node_info_grpc_client_, ) + /// Get available resources of all nodes from the GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllAvailableResources, + node_info_grpc_client_, ) + /// Get object's locations from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations, object_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index dc15a35c2..3a601a44a 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -215,6 +215,11 @@ class NodeInfoGcsServiceHandler { virtual void HandleGetInternalConfig(const GetInternalConfigRequest &request, GetInternalConfigReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetAllAvailableResources( + const rpc::GetAllAvailableResourcesRequest &request, + rpc::GetAllAvailableResourcesReply *reply, + rpc::SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeInfoGcsService`. @@ -242,6 +247,7 @@ class NodeInfoGrpcService : public GrpcService { NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources); NODE_INFO_SERVICE_RPC_HANDLER(SetInternalConfig); NODE_INFO_SERVICE_RPC_HANDLER(GetInternalConfig); + NODE_INFO_SERVICE_RPC_HANDLER(GetAllAvailableResources); } private: