diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index c61cbb0de..e4988f729 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -463,7 +463,7 @@ class NodeInfoAccessor { /// \param done Callback that will be called when subscription is complete. /// \return Status virtual Status AsyncSubscribeToResources( - const SubscribeCallback &subscribe, + const ItemCallback &subscribe, const StatusCallback &done) = 0; /// Report heartbeat of a node to GCS asynchronously. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 7b29a1802..1e55b2a2c 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -282,8 +282,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID( ServiceBasedNodeInfoAccessor::ServiceBasedNodeInfoAccessor( ServiceBasedGcsClient *client_impl) - : client_impl_(client_impl), - resource_sub_executor_(client_impl->GetRedisGcsClient().resource_table()) {} + : client_impl_(client_impl) {} Status ServiceBasedNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info) { auto node_id = ClientID::FromBinary(local_node_info.node_id()); @@ -522,12 +521,18 @@ Status ServiceBasedNodeInfoAccessor::AsyncDeleteResources( } Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources( - const SubscribeCallback &subscribe, - const StatusCallback &done) { + const ItemCallback &subscribe, const StatusCallback &done) { RAY_LOG(DEBUG) << "Subscribing node resources change."; RAY_CHECK(subscribe != nullptr); - auto status = - resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done); + + auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { + rpc::NodeResourceChange node_resource_change; + node_resource_change.ParseFromString(data); + subscribe(node_resource_change); + }; + + auto status = client_impl_->GetGcsPubSub().SubscribeAll(NODE_RESOURCE_CHANNEL, + 
on_subscribe, done); RAY_LOG(DEBUG) << "Finished subscribing node resources change."; return status; } diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 6ce285fe7..117ebc30f 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -150,9 +150,8 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { const std::vector &resource_names, const StatusCallback &callback) override; - Status AsyncSubscribeToResources( - const SubscribeCallback &subscribe, - const StatusCallback &done) override; + Status AsyncSubscribeToResources(const ItemCallback &subscribe, + const StatusCallback &done) override; Status AsyncReportHeartbeat(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; @@ -177,10 +176,6 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { using NodeChangeCallback = std::function; - typedef SubscriptionExecutor - DynamicResourceSubscriptionExecutor; - DynamicResourceSubscriptionExecutor resource_sub_executor_; - GcsNodeInfo local_node_info_; ClientID local_node_id_; diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 013dc6b03..c994a69da 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -225,9 +225,7 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest { return WaitReady(promise.get_future(), timeout_ms_); } - bool SubscribeToResources( - const gcs::SubscribeCallback - &subscribe) { + bool SubscribeToResources(const gcs::ItemCallback &subscribe) { std::promise promise; RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToResources( subscribe, [&promise](Status status) { promise.set_value(status.ok()); })); @@ -613,12 +611,11 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeResources) { // Subscribe to 
node resource changes. std::atomic add_count(0); std::atomic remove_count(0); - auto on_subscribe = [&add_count, &remove_count]( - const ClientID &id, - const gcs::ResourceChangeNotification &notification) { - if (notification.IsAdded()) { + auto on_subscribe = [&add_count, + &remove_count](const rpc::NodeResourceChange &notification) { + if (0 == notification.deleted_resources_size()) { ++add_count; - } else if (notification.IsRemoved()) { + } else { ++remove_count; } }; diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index cc14fea82..fffbe54a6 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -228,9 +228,19 @@ void GcsNodeManager::HandleUpdateResources(const rpc::UpdateResourcesRequest &re for (auto &entry : *to_be_updated_resources) { iter->second[entry.first] = entry.second; } - auto on_done = [node_id, to_be_updated_resources, reply, - send_reply_callback](Status status) { + auto on_done = [this, node_id, to_be_updated_resources, reply, + send_reply_callback](const Status &status) { RAY_CHECK_OK(status); + rpc::NodeResourceChange node_resource_change; + node_resource_change.set_node_id(node_id.Binary()); + for (auto &it : *to_be_updated_resources) { + (*node_resource_change.mutable_updated_resources())[it.first] = + it.second->resource_capacity(); + } + RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(), + node_resource_change.SerializeAsString(), + nullptr)); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); RAY_LOG(DEBUG) << "Finished updating resources, node id = " << node_id; }; @@ -255,8 +265,18 @@ void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &re for (auto &resource_name : resource_names) { iter->second.erase(resource_name); } - auto on_done = [reply, send_reply_callback](Status status) { + auto on_done = [this, node_id, resource_names, reply, + send_reply_callback](const Status &status) { 
RAY_CHECK_OK(status); + rpc::NodeResourceChange node_resource_change; + node_resource_change.set_node_id(node_id.Binary()); + for (const auto &resource_name : resource_names) { + node_resource_change.add_deleted_resources(resource_name); + } + RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(), + node_resource_change.SerializeAsString(), + nullptr)); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; RAY_CHECK_OK( diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index b1ef49365..53ad39373 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -308,8 +308,7 @@ struct GcsServerMocker { } Status AsyncSubscribeToResources( - const gcs::SubscribeCallback - &subscribe, + const gcs::ItemCallback &subscribe, const gcs::StatusCallback &done) override { return Status::NotImplemented(""); } diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index 8356dfdd6..e31399cb1 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -27,6 +27,7 @@ namespace gcs { #define JOB_CHANNEL "JOB" #define NODE_CHANNEL "NODE" +#define NODE_RESOURCE_CHANNEL "NODE_RESOURCE" #define WORKER_FAILURE_CHANNEL "WORKER_FAILURE" #define OBJECT_CHANNEL "OBJECT" #define TASK_CHANNEL "TASK" diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index d1f53378d..3f1d41008 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -711,10 +711,25 @@ Status RedisNodeInfoAccessor::AsyncDeleteResources( } Status RedisNodeInfoAccessor::AsyncSubscribeToResources( - const SubscribeCallback &subscribe, - const StatusCallback &done) { + const ItemCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); - return resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done); + auto on_subscribe = [subscribe](const 
ClientID &id, + const ResourceChangeNotification &result) { + rpc::NodeResourceChange node_resource_change; + node_resource_change.set_node_id(id.Binary()); + if (result.IsAdded()) { + for (auto &it : result.GetData()) { + (*node_resource_change.mutable_updated_resources())[it.first] = + it.second->resource_capacity(); + } + } else { + for (auto &it : result.GetData()) { + node_resource_change.add_deleted_resources(it.first); + } + } + subscribe(node_resource_change); + }; + return resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done); } RedisErrorInfoAccessor::RedisErrorInfoAccessor(RedisGcsClient *client_impl) diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index b50f244b7..7561c2f0c 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -319,9 +319,8 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { const std::vector &resource_names, const StatusCallback &callback) override; - Status AsyncSubscribeToResources( - const SubscribeCallback &subscribe, - const StatusCallback &done) override; + Status AsyncSubscribeToResources(const ItemCallback &subscribe, + const StatusCallback &done) override; Status AsyncReportHeartbeat(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; diff --git a/src/ray/gcs/test/redis_node_info_accessor_test.cc b/src/ray/gcs/test/redis_node_info_accessor_test.cc index f4193d570..0dd2d3dfe 100644 --- a/src/ray/gcs/test/redis_node_info_accessor_test.cc +++ b/src/ray/gcs/test/redis_node_info_accessor_test.cc @@ -126,15 +126,15 @@ TEST_F(NodeDynamicResourceTest, Subscribe) { } WaitPendingDone(wait_pending_timeout_); - auto subscribe = [this](const ClientID &id, - const ResourceChangeNotification &notification) { + auto subscribe = [this](const rpc::NodeResourceChange &notification) { + auto id = ClientID::FromBinary(notification.node_id()); RAY_LOG(INFO) << "receive client id=" << id; auto it = id_to_resource_map_.find(id); ASSERT_TRUE(it != 
id_to_resource_map_.end()); - if (notification.IsAdded()) { - ASSERT_EQ(notification.GetData().size(), it->second.size()); + if (0 == notification.deleted_resources_size()) { + ASSERT_EQ(notification.updated_resources_size(), it->second.size()); } else { - ASSERT_EQ(notification.GetData().size(), resource_to_delete_.size()); + ASSERT_EQ(notification.deleted_resources_size(), resource_to_delete_.size()); } --sub_pending_count_; }; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index ab972e585..bbdf50039 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -317,6 +317,16 @@ message ObjectLocationChange { ObjectTableData data = 2; } +// A notification message about one node's resources being changed. +message NodeResourceChange { + // ID of the node whose resources have changed. + bytes node_id = 1; + // Labels of the updated resources and their latest capacities. + map updated_resources = 2; + // Labels of the resources that were deleted. + repeated string deleted_resources = 3; +} + message PubSubMessage { bytes id = 1; bytes data = 2; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index eee8809a7..9483a7895 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -218,22 +218,17 @@ ray::Status NodeManager::RegisterGcs() { RAY_CHECK_OK(status); // Subscribe to resource changes. 
const auto &resources_changed = - [this](const ClientID &id, - const gcs::ResourceChangeNotification &resource_notification) { - if (resource_notification.IsAdded()) { - ResourceSet resource_set; - for (auto &entry : resource_notification.GetData()) { - resource_set.AddOrUpdateResource(entry.first, - entry.second->resource_capacity()); - } + [this](const rpc::NodeResourceChange &resource_notification) { + auto id = ClientID::FromBinary(resource_notification.node_id()); + if (resource_notification.updated_resources_size() != 0) { + ResourceSet resource_set( + MapFromProtobuf(resource_notification.updated_resources())); ResourceCreateUpdated(id, resource_set); - } else { - RAY_CHECK(resource_notification.IsRemoved()); - std::vector resource_names; - for (auto &entry : resource_notification.GetData()) { - resource_names.push_back(entry.first); - } - ResourceDeleted(id, resource_names); + } + + if (resource_notification.deleted_resources_size() != 0) { + ResourceDeleted( + id, VectorFromProtobuf(resource_notification.deleted_resources())); } }; RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToResources(