diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index e074595c1..fc9f4e7f3 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -102,7 +102,6 @@ def test_worker_stats(shutdown_only): # Check that the rest of the processes are workers, 1 for each CPU. assert len(reply.workers_stats) == num_cpus + 1 views = [view.view_name for view in reply.view_data] - assert "redis_latency" in views assert "local_available_resource" in views # Check that all processes are Python. pids = [worker.pid for worker in reply.workers_stats] diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index a622254fa..c61cbb0de 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -392,7 +392,8 @@ class NodeInfoAccessor { /// Subscribe to node addition and removal events from GCS and cache those information. /// /// \param subscribe Callback that will be called if a node is - /// added or a node is removed. + /// added or a node is removed. The callback needs to be idempotent because it will also + /// be called for existing nodes. /// \param done Callback that will be called when subscription is complete. 
/// \return Status virtual Status AsyncSubscribeToNodeChange( diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 443429c8c..bf266abce 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -399,33 +399,53 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange( const StatusCallback &done) { RAY_LOG(DEBUG) << "Subscribing node change."; RAY_CHECK(subscribe != nullptr); - ClientTable &client_table = client_impl_->GetRedisGcsClient().client_table(); - auto status = client_table.SubscribeToNodeChange(subscribe, done); + RAY_CHECK(node_change_callback_ == nullptr); + node_change_callback_ = subscribe; + + auto on_subscribe = [this](const std::string &id, const std::string &data) { + GcsNodeInfo node_info; + node_info.ParseFromString(data); + HandleNotification(node_info); + }; + + auto on_done = [this, subscribe, done](const Status &status) { + // Get nodes from GCS Service. 
+ auto callback = [this, subscribe, done]( + const Status &status, + const std::vector &node_info_list) { + for (auto &node_info : node_info_list) { + HandleNotification(node_info); + } + if (done) { + done(status); + } + }; + RAY_CHECK_OK(AsyncGetAll(callback)); + }; + + auto status = + client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, on_done); RAY_LOG(DEBUG) << "Finished subscribing node change."; return status; } boost::optional ServiceBasedNodeInfoAccessor::Get( const ClientID &node_id) const { - GcsNodeInfo node_info; - ClientTable &client_table = client_impl_->GetRedisGcsClient().client_table(); - bool found = client_table.GetClient(node_id, &node_info); - boost::optional optional_node; - if (found) { - optional_node = std::move(node_info); + RAY_CHECK(!node_id.IsNil()); + auto entry = node_cache_.find(node_id); + if (entry != node_cache_.end()) { + return entry->second; } - return optional_node; + return boost::none; } const std::unordered_map &ServiceBasedNodeInfoAccessor::GetAll() const { - ClientTable &client_table = client_impl_->GetRedisGcsClient().client_table(); - return client_table.GetAllClients(); + return node_cache_; } bool ServiceBasedNodeInfoAccessor::IsRemoved(const ClientID &node_id) const { - ClientTable &client_table = client_impl_->GetRedisGcsClient().client_table(); - return client_table.IsRemoved(node_id); + return removed_nodes_.count(node_id) == 1; } Status ServiceBasedNodeInfoAccessor::AsyncGetResources( @@ -567,6 +587,45 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat( return status; } +void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) { + ClientID node_id = ClientID::FromBinary(node_info.node_id()); + bool is_alive = (node_info.state() == GcsNodeInfo::ALIVE); + auto entry = node_cache_.find(node_id); + bool is_notif_new; + if (entry == node_cache_.end()) { + // If the entry is not in the cache, then the notification is new. 
+ is_notif_new = true; + } else { + // If the entry is in the cache, then the notification is new only if the + // node was alive and is now dead. + bool was_alive = (entry->second.state() == GcsNodeInfo::ALIVE); + is_notif_new = was_alive && !is_alive; + // Once a node with a given ID has been removed, it should never be added + // again. If the entry was in the cache and the node was deleted, check + // that this new notification is not an insertion. + if (!was_alive) { + RAY_CHECK(!is_alive) + << "Notification for addition of a node that was already removed:" << node_id; + } + } + + // Add the notification to our cache. + RAY_LOG(INFO) << "Received notification for node id = " << node_id + << ", IsAlive = " << is_alive; + node_cache_[node_id] = node_info; + + // If the notification is new, call registered callback. + if (is_notif_new) { + if (is_alive) { + RAY_CHECK(removed_nodes_.find(node_id) == removed_nodes_.end()); + } else { + removed_nodes_.insert(node_id); + } + GcsNodeInfo &cache_data = node_cache_[node_id]; + node_change_callback_(node_id, cache_data); + } +} + ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl) {} diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 38d0c0c4b..8e713d007 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -170,8 +170,13 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { const StatusCallback &done) override; private: + void HandleNotification(const GcsNodeInfo &node_info); + ServiceBasedGcsClient *client_impl_; + using NodeChangeCallback = + std::function; + typedef SubscriptionExecutor DynamicResourceSubscriptionExecutor; DynamicResourceSubscriptionExecutor resource_sub_executor_; @@ -188,6 +193,14 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { ClientID 
local_node_id_; Sequencer sequencer_; + + /// The callback to call when a new node is added or a node is removed. + NodeChangeCallback node_change_callback_{nullptr}; + + /// A cache for information about all nodes. + std::unordered_map node_cache_; + /// The set of removed nodes. + std::unordered_set removed_nodes_; }; /// \class ServiceBasedTaskInfoAccessor diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 939c7d9aa..45eb1b8de 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -100,18 +100,25 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() { ////////////////////////////////////////////////////////////////////////////////////////// GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service, gcs::NodeInfoAccessor &node_info_accessor, - gcs::ErrorInfoAccessor &error_info_accessor) + gcs::ErrorInfoAccessor &error_info_accessor, + std::shared_ptr &gcs_pub_sub) : node_info_accessor_(node_info_accessor), error_info_accessor_(error_info_accessor), node_failure_detector_(new NodeFailureDetector( - io_service, node_info_accessor, [this](const ClientID &node_id) { + io_service, node_info_accessor, + [this](const ClientID &node_id) { if (auto node = RemoveNode(node_id, /* is_intended = */ false)) { node->set_state(rpc::GcsNodeInfo::DEAD); RAY_CHECK(dead_nodes_.emplace(node_id, node).second); - RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, nullptr)); + auto on_done = [this, node_id, node](const Status &status) { + RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), + node->SerializeAsString(), nullptr)); + }; + RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, on_done)); // TODO(Shanly): Remove node resources from resource table. } - })) { + })), + gcs_pub_sub_(gcs_pub_sub) { // TODO(Shanly): Load node info list from storage synchronously. // TODO(Shanly): Load cluster resources from storage synchronously. 
} @@ -122,9 +129,12 @@ void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, ClientID node_id = ClientID::FromBinary(request.node_info().node_id()); RAY_LOG(INFO) << "Registering node info, node id = " << node_id; AddNode(std::make_shared(request.node_info())); - auto on_done = [node_id, reply, send_reply_callback](Status status) { + auto on_done = [this, node_id, request, reply, + send_reply_callback](const Status &status) { RAY_CHECK_OK(status); RAY_LOG(INFO) << "Finished registering node info, node id = " << node_id; + RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), + request.node_info().SerializeAsString(), nullptr)); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; RAY_CHECK_OK(node_info_accessor_.AsyncRegister(request.node_info(), on_done)); @@ -135,14 +145,17 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ rpc::SendReplyCallback send_reply_callback) { ClientID node_id = ClientID::FromBinary(request.node_id()); RAY_LOG(INFO) << "Unregistering node info, node id = " << node_id; - auto on_done = [node_id, request, reply, send_reply_callback](Status status) { - RAY_CHECK_OK(status); - RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; if (auto node = RemoveNode(node_id, /* is_intended = */ true)) { node->set_state(rpc::GcsNodeInfo::DEAD); RAY_CHECK(dead_nodes_.emplace(node_id, node).second); + + auto on_done = [this, node_id, node, reply, send_reply_callback](Status status) { + RAY_CHECK_OK(status); + RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id; + RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), + node->SerializeAsString(), nullptr)); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }; RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, on_done)); // TODO(Shanly): Remove node resources from resource table. 
} diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index b38b917c6..9f9bea1c7 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -22,6 +22,7 @@ #include #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" +#include "ray/gcs/pubsub/gcs_pub_sub.h" namespace ray { namespace gcs { @@ -39,7 +40,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// when detecting the death of nodes. explicit GcsNodeManager(boost::asio::io_service &io_service, gcs::NodeInfoAccessor &node_info_accessor, - gcs::ErrorInfoAccessor &error_info_accessor); + gcs::ErrorInfoAccessor &error_info_accessor, + std::shared_ptr &gcs_pub_sub); /// Handle register rpc request come from raylet. void HandleRegisterNode(const rpc::RegisterNodeRequest &request, @@ -196,6 +198,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// Listeners which monitors the removal of nodes. std::vector)>> node_removed_listeners_; + /// A publisher for publishing gcs messages. + std::shared_ptr gcs_pub_sub_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index b89853a0e..6f4f7e3b3 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -40,12 +40,12 @@ void GcsServer::Start() { // Init backend client. InitBackendClient(); - // Init gcs node_manager. - InitGcsNodeManager(); - // Init gcs pub sub instance. gcs_pub_sub_ = std::make_shared(redis_gcs_client_->GetRedisClient()); + // Init gcs node_manager. + InitGcsNodeManager(); + // Init gcs detector. 
gcs_redis_failure_detector_ = std::make_shared( main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); }); @@ -128,8 +128,9 @@ void GcsServer::InitBackendClient() { void GcsServer::InitGcsNodeManager() { RAY_CHECK(redis_gcs_client_ != nullptr); - gcs_node_manager_ = std::make_shared( - main_service_, redis_gcs_client_->Nodes(), redis_gcs_client_->Errors()); + gcs_node_manager_ = + std::make_shared(main_service_, redis_gcs_client_->Nodes(), + redis_gcs_client_->Errors(), gcs_pub_sub_); } void GcsServer::InitGcsActorManager() { diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index c2f04e265..f333e5143 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -26,7 +26,7 @@ class GcsActorSchedulerTest : public ::testing::Test { raylet_client_ = std::make_shared(); worker_client_ = std::make_shared(); gcs_node_manager_ = std::make_shared( - io_service_, node_info_accessor_, error_info_accessor_); + io_service_, node_info_accessor_, error_info_accessor_, gcs_pub_sub_); gcs_actor_scheduler_ = std::make_shared( io_service_, actor_info_accessor_, *gcs_node_manager_, /*schedule_failure_handler=*/ @@ -55,6 +55,7 @@ class GcsActorSchedulerTest : public ::testing::Test { std::shared_ptr gcs_actor_scheduler_; std::vector> success_actors_; std::vector> failure_actors_; + std::shared_ptr gcs_pub_sub_; }; TEST_F(GcsActorSchedulerTest, TestScheduleFailedWithZeroNode) { diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index b0e387d46..38e55f8dd 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -19,13 +19,17 @@ #include "gtest/gtest.h" namespace ray { -class GcsNodeManagerTest : public ::testing::Test {}; +class GcsNodeManagerTest : public ::testing::Test { + 
protected: + std::shared_ptr gcs_pub_sub_; +}; TEST_F(GcsNodeManagerTest, TestManagement) { boost::asio::io_service io_service; auto node_info_accessor = GcsServerMocker::MockedNodeInfoAccessor(); auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor(); - gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor); + gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor, + gcs_pub_sub_); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = ClientID::FromBinary(node->node_id()); @@ -41,7 +45,8 @@ TEST_F(GcsNodeManagerTest, TestListener) { boost::asio::io_service io_service; auto node_info_accessor = GcsServerMocker::MockedNodeInfoAccessor(); auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor(); - gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor); + gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor, + gcs_pub_sub_); // Test AddNodeAddedListener. 
int node_count = 1000; std::vector> added_nodes; diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index 55ae26afe..4169cbaee 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -26,6 +26,7 @@ namespace ray { namespace gcs { #define JOB_CHANNEL "JOB" +#define NODE_CHANNEL "NODE" #define WORKER_FAILURE_CHANNEL "WORKER_FAILURE" #define OBJECT_CHANNEL "OBJECT" #define TASK_CHANNEL "TASK" diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 7e90d03ca..156b1747e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -514,6 +514,12 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) { const ClientID node_id = ClientID::FromBinary(node_info.node_id()); RAY_LOG(DEBUG) << "[NodeAdded] Received callback from client id " << node_id; + if (1 == cluster_resource_map_.count(node_id)) { + RAY_LOG(DEBUG) << "Received notification of a new node that already exists: " + << node_id; + return; + } + if (node_id == self_node_id_) { // We got a notification for ourselves, so we are connected to the GCS now. // Save this NodeManager's resource information in the cluster resource map. @@ -521,13 +527,6 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) { return; } - auto entry = remote_node_manager_clients_.find(node_id); - if (entry != remote_node_manager_clients_.end()) { - RAY_LOG(DEBUG) << "Received notification of a new client that already exists: " - << node_id; - return; - } - // Initialize a rpc client to the new node manager. std::unique_ptr client( new rpc::NodeManagerClient(node_info.node_manager_address(), @@ -565,15 +564,16 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) { // not be necessary. // Remove the client from the resource map. 
- cluster_resource_map_.erase(node_id); + if (0 == cluster_resource_map_.erase(node_id)) { + RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id + << "."; + return; + } // Remove the node manager client. const auto client_entry = remote_node_manager_clients_.find(node_id); if (client_entry != remote_node_manager_clients_.end()) { remote_node_manager_clients_.erase(client_entry); - } else { - RAY_LOG(WARNING) << "Received NodeRemoved callback for an unknown client " << node_id - << "."; } // For any live actors that were on the dead node, broadcast a notification