From b663bc6d6716b9faa1d6ec9792249ba1dab69fae Mon Sep 17 00:00:00 2001 From: ZhuSenlin Date: Thu, 12 Mar 2020 22:13:56 +0800 Subject: [PATCH] Use gcs server to replace raylet monitor when RAY_GCS_SERVICE_ENABLED=true (#7166) --- python/ray/node.py | 4 +- python/ray/tests/test_component_failures_3.py | 19 ++- python/ray/tests/test_multinode_failures_2.py | 23 +++- python/ray/tests/test_tempfile.py | 8 +- src/ray/core_worker/test/core_worker_test.cc | 8 +- .../gcs/gcs_client/service_based_accessor.cc | 27 ++-- .../test/service_based_gcs_client_test.cc | 22 --- src/ray/gcs/gcs_server/gcs_node_manager.cc | 127 ++++++++++++++++++ src/ray/gcs/gcs_server/gcs_node_manager.h | 68 ++++++++++ src/ray/gcs/gcs_server/gcs_server.cc | 10 +- src/ray/gcs/gcs_server/gcs_server.h | 11 +- .../gcs/gcs_server/node_info_handler_impl.cc | 45 +------ .../gcs/gcs_server/node_info_handler_impl.h | 12 +- .../gcs_server/test/gcs_server_rpc_test.cc | 15 --- src/ray/protobuf/gcs_service.proto | 3 - src/ray/rpc/gcs_server/gcs_rpc_client.h | 4 - src/ray/rpc/gcs_server/gcs_rpc_server.h | 5 - 17 files changed, 269 insertions(+), 142 deletions(-) create mode 100644 src/ray/gcs/gcs_server/gcs_node_manager.cc create mode 100644 src/ray/gcs/gcs_server/gcs_node_manager.h diff --git a/python/ray/node.py b/python/ray/node.py index 79fb2528e..b42d16eaf 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -620,9 +620,11 @@ class Node: if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, None): self.start_gcs_server() + else: + self.start_raylet_monitor() self.start_monitor() - self.start_raylet_monitor() + if self._ray_params.include_webui: self.start_dashboard(require_webui=True) elif self._ray_params.include_webui is None: diff --git a/python/ray/tests/test_component_failures_3.py b/python/ray/tests/test_component_failures_3.py index 0c8b030e5..8f082312b 100644 --- a/python/ray/tests/test_component_failures_3.py +++ b/python/ray/tests/test_component_failures_3.py @@ -82,10 +82,10 @@ def 
test_driver_lives_sequential(ray_start_regular): ray.worker._global_node.kill_plasma_store() ray.worker._global_node.kill_log_monitor() ray.worker._global_node.kill_monitor() - ray.worker._global_node.kill_raylet_monitor() - if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, None): ray.worker._global_node.kill_gcs_server() + else: + ray.worker._global_node.kill_raylet_monitor() # If the driver can reach the tearDown method, then it is still alive. @@ -97,14 +97,11 @@ def test_driver_lives_parallel(ray_start_regular): all_processes = ray.worker._global_node.all_processes if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, None): - process_infos = ( - all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + - all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] + - all_processes[ray_constants.PROCESS_TYPE_RAYLET] + - all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] + - all_processes[ray_constants.PROCESS_TYPE_MONITOR] + - all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR]) - assert len(process_infos) == 6 + process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + + all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] + + all_processes[ray_constants.PROCESS_TYPE_RAYLET] + + all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] + + all_processes[ray_constants.PROCESS_TYPE_MONITOR]) else: process_infos = ( all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + @@ -112,7 +109,7 @@ def test_driver_lives_parallel(ray_start_regular): all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] + all_processes[ray_constants.PROCESS_TYPE_MONITOR] + all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR]) - assert len(process_infos) == 5 + assert len(process_infos) == 5 # Kill all the components in parallel. 
for process_info in process_infos: diff --git a/python/ray/tests/test_multinode_failures_2.py b/python/ray/tests/test_multinode_failures_2.py index 6284bbcae..b29b77847 100644 --- a/python/ray/tests/test_multinode_failures_2.py +++ b/python/ray/tests/test_multinode_failures_2.py @@ -132,7 +132,10 @@ def test_driver_lives_sequential(ray_start_regular): ray.worker._global_node.kill_plasma_store() ray.worker._global_node.kill_log_monitor() ray.worker._global_node.kill_monitor() - ray.worker._global_node.kill_raylet_monitor() + if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, None): + ray.worker._global_node.kill_gcs_server() + else: + ray.worker._global_node.kill_raylet_monitor() # If the driver can reach the tearDown method, then it is still alive. @@ -142,11 +145,19 @@ def test_driver_lives_sequential(ray_start_regular): reason="Hanging with new GCS API.") def test_driver_lives_parallel(ray_start_regular): all_processes = ray.worker._global_node.all_processes - process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + - all_processes[ray_constants.PROCESS_TYPE_RAYLET] + - all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] + - all_processes[ray_constants.PROCESS_TYPE_MONITOR] + - all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR]) + if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, None): + process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + + all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] + + all_processes[ray_constants.PROCESS_TYPE_RAYLET] + + all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] + + all_processes[ray_constants.PROCESS_TYPE_MONITOR]) + else: + process_infos = ( + all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + + all_processes[ray_constants.PROCESS_TYPE_RAYLET] + + all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] + + all_processes[ray_constants.PROCESS_TYPE_MONITOR] + + all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR]) assert len(process_infos) == 5 # Kill all 
the components in parallel. diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index bc88aefd3..6acd7dc01 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -101,13 +101,15 @@ def test_raylet_tempfiles(shutdown_only): log_files = set(os.listdir(node.get_logs_dir_path())) log_files_expected = { "log_monitor.out", "log_monitor.err", "plasma_store.out", - "plasma_store.err", "monitor.out", "monitor.err", "raylet_monitor.out", - "raylet_monitor.err", "redis-shard_0.out", "redis-shard_0.err", - "redis.out", "redis.err", "raylet.out", "raylet.err" + "plasma_store.err", "monitor.out", "monitor.err", "redis-shard_0.out", + "redis-shard_0.err", "redis.out", "redis.err", "raylet.out", + "raylet.err" } if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, None): log_files_expected.update({"gcs_server.out", "gcs_server.err"}) + else: + log_files_expected.update({"raylet_monitor.out", "raylet_monitor.err"}) assert log_files.issuperset(log_files_expected) diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 822700f74..6d597b893 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -108,13 +108,13 @@ class CoreWorkerTest : public ::testing::Test { store_socket = StartStore(); } - // core worker test relies on node resources. It's important that one raylet can - // receive the heartbeat from another. So starting raylet monitor is required here. - raylet_monitor_pid_ = StartRayletMonitor("127.0.0.1"); - // start gcs server if (getenv("RAY_GCS_SERVICE_ENABLED") != nullptr) { gcs_server_pid_ = StartGcsServer("127.0.0.1"); + } else { + // core worker test relies on node resources. It's important that one raylet can + // receive the heartbeat from another. So starting raylet monitor is required here. + raylet_monitor_pid_ = StartRayletMonitor("127.0.0.1"); } // start raylet on each node. 
Assign each node with different resources so that diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 9d0260ec8..35817a22f 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -507,30 +507,19 @@ Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat( Status ServiceBasedNodeInfoAccessor::AsyncSubscribeHeartbeat( const SubscribeCallback &subscribe, const StatusCallback &done) { - RAY_LOG(DEBUG) << "Subscribing heartbeat."; - RAY_CHECK(subscribe != nullptr); - auto status = - heartbeat_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done); - RAY_LOG(DEBUG) << "Finished subscribing heartbeat."; - return status; + const std::string error_msg = + "Unsupported method of AsyncSubscribeHeartbeat in ServiceBasedNodeInfoAccessor."; + RAY_LOG(FATAL) << error_msg; + return Status::Invalid(error_msg); } Status ServiceBasedNodeInfoAccessor::AsyncReportBatchHeartbeat( const std::shared_ptr &data_ptr, const StatusCallback &callback) { - RAY_LOG(DEBUG) << "Reporting batch heartbeat, batch size = " << data_ptr->batch_size(); - rpc::ReportBatchHeartbeatRequest request; - request.mutable_heartbeat_batch()->CopyFrom(*data_ptr); - client_impl_->GetGcsRpcClient().ReportBatchHeartbeat( - request, [data_ptr, callback](const Status &status, - const rpc::ReportBatchHeartbeatReply &reply) { - if (callback) { - callback(status); - } - RAY_LOG(DEBUG) << "Finished reporting batch heartbeat, status = " << status - << ", batch size = " << data_ptr->batch_size(); - }); - return Status::OK(); + const std::string error_msg = + "Unsupported method of AsyncReportBatchHeartbeat in ServiceBasedNodeInfoAccessor."; + RAY_LOG(FATAL) << error_msg; + return Status::Invalid(error_msg); } Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat( diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc 
b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 4ef04a527..2dab31b13 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -225,14 +225,6 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest { return WaitReady(promise.get_future(), timeout_ms_); } - bool ReportBatchHeartbeat( - const std::shared_ptr batch_heartbeat) { - std::promise promise; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportBatchHeartbeat( - batch_heartbeat, [&promise](Status status) { promise.set_value(status.ok()); })); - return WaitReady(promise.get_future(), timeout_ms_); - } - bool AddTask(const std::shared_ptr task) { std::promise promise; RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd( @@ -563,14 +555,6 @@ TEST_F(ServiceBasedGcsGcsClientTest, TestNodeResources) { } TEST_F(ServiceBasedGcsGcsClientTest, TestNodeHeartbeat) { - int heartbeat_count = 0; - auto heartbeat_subscribe = [&heartbeat_count](const ClientID &id, - const gcs::HeartbeatTableData &result) { - ++heartbeat_count; - }; - RAY_CHECK_OK( - gcs_client_->Nodes().AsyncSubscribeHeartbeat(heartbeat_subscribe, nullptr)); - int heartbeat_batch_count = 0; auto heartbeat_batch_subscribe = [&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) { @@ -584,12 +568,6 @@ TEST_F(ServiceBasedGcsGcsClientTest, TestNodeHeartbeat) { auto heartbeat = std::make_shared(); heartbeat->set_client_id(node_id.Binary()); ASSERT_TRUE(ReportHeartbeat(heartbeat)); - WaitPendingDone(heartbeat_count, 1); - - // Report batch heartbeat - auto batch_heartbeat = std::make_shared(); - batch_heartbeat->add_batch()->set_client_id(node_id.Binary()); - ASSERT_TRUE(ReportBatchHeartbeat(batch_heartbeat)); WaitPendingDone(heartbeat_batch_count, 1); } diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc new file mode 100644 index 000000000..74ee2f6ca --- /dev/null +++ 
b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -0,0 +1,127 @@ +#include "gcs_node_manager.h" +#include +#include +#include +#include "ray/gcs/redis_gcs_client.h" + +namespace ray { +namespace gcs { + +GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service, + std::shared_ptr gcs_client) + : client_call_manager_(io_service), + gcs_client_(std::move(gcs_client)), + num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()), + heartbeat_timer_(io_service) { + Start(); +} + +void GcsNodeManager::HandleHeartbeat(const ClientID &node_id, + rpc::HeartbeatTableData &&heartbeat_data) { + heartbeats_[node_id] = num_heartbeats_timeout_; + heartbeat_buffer_[node_id] = heartbeat_data; +} + +void GcsNodeManager::Start() { + RAY_LOG(INFO) << "Starting gcs node manager."; + const auto lookup_callback = [this](Status status, + const std::vector &node_info_list) { + for (const auto &node_info : node_info_list) { + if (node_info.state() != rpc::GcsNodeInfo::DEAD) { + // If there're any existing alive clients in client table, add them to + // our `heartbeats_` cache. Thus, if they died before monitor starts, + // we can also detect their death. + // Use `emplace` instead of `operator []` because we just want to add this + // client to `heartbeats_` only if it has not yet received heartbeat event. + // Besides, it is not necessary to add an empty `HeartbeatTableData` + // to `heartbeat_buffer_` as it doesn't make sense to broadcast an empty + // message to the cluster and it's ok to add it when actually receive + // its heartbeat event. + heartbeats_.emplace(ClientID::FromBinary(node_info.node_id()), + num_heartbeats_timeout_); + } + } + Tick(); + }; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(lookup_callback)); +} + +/// A periodic timer that checks for timed out clients. 
+void GcsNodeManager::Tick() { + DetectDeadNodes(); + SendBatchedHeartbeat(); + ScheduleTick(); +} + +void GcsNodeManager::DetectDeadNodes() { + for (auto it = heartbeats_.begin(); it != heartbeats_.end();) { + it->second = it->second - 1; + if (it->second == 0) { + if (dead_nodes_.count(it->first) == 0) { + auto node_id = it->first; + RAY_LOG(WARNING) << "Node timed out: " << node_id; + auto lookup_callback = [this, node_id](Status status, + const std::vector &all_node) { + RAY_CHECK_OK(status); + bool marked = false; + for (const auto &node : all_node) { + if (node_id.Binary() == node.node_id() && node.state() == GcsNodeInfo::DEAD) { + // The node has been marked dead by itself. + marked = true; + break; + } + } + if (!marked) { + RAY_CHECK_OK(gcs_client_->Nodes().AsyncUnregister(node_id, nullptr)); + // Broadcast a warning to all of the drivers indicating that the node + // has been marked as dead. + // TODO(rkn): Define this constant somewhere else. + std::string type = "node_removed"; + std::ostringstream error_message; + error_message << "The node with node ID " << node_id + << " has been marked dead because the monitor" + << " has missed too many heartbeats from it."; + auto error_data_ptr = + gcs::CreateErrorTableData(type, error_message.str(), current_time_ms()); + RAY_CHECK_OK( + gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); + } + }; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(lookup_callback)); + dead_nodes_.insert(node_id); + } + it = heartbeats_.erase(it); + } else { + it++; + } + } +} + +void GcsNodeManager::SendBatchedHeartbeat() { + if (!heartbeat_buffer_.empty()) { + auto batch = std::make_shared(); + for (const auto &heartbeat : heartbeat_buffer_) { + batch->add_batch()->CopyFrom(heartbeat.second); + } + RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportBatchHeartbeat(batch, nullptr)); + heartbeat_buffer_.clear(); + } +} + +void GcsNodeManager::ScheduleTick() { + auto heartbeat_period = boost::posix_time::milliseconds( + 
RayConfig::instance().raylet_heartbeat_timeout_milliseconds()); + heartbeat_timer_.expires_from_now(heartbeat_period); + heartbeat_timer_.async_wait([this](const boost::system::error_code &error) { + if (error == boost::system::errc::operation_canceled) { + // `operation_canceled` is set when `heartbeat_timer_` is canceled or destroyed. + // The Monitor's lifetime may be shorter than that of its user (e.g. gcs_server). + return; + } + RAY_CHECK(!error) << "Checking heartbeat failed with error: " << error.message(); + Tick(); + }); +} + +} // namespace gcs +} // namespace ray \ No newline at end of file diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h new file mode 100644 index 000000000..fe0df104a --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -0,0 +1,68 @@ +#ifndef RAY_GCS_NODE_MANAGER_H +#define RAY_GCS_NODE_MANAGER_H + +#include +#include +#include + +namespace ray { + +namespace gcs { +class RedisGcsClient; +/// GcsNodeManager is responsible for managing and monitoring nodes. +class GcsNodeManager { + public: + /// Create a GcsNodeManager. + /// + /// \param io_service The event loop to run the monitor on. + /// \param gcs_client The client of gcs to access/pub/sub data. + explicit GcsNodeManager(boost::asio::io_service &io_service, + std::shared_ptr gcs_client); + + /// Handle a heartbeat from a Raylet. + /// + /// \param node_id The client ID of the Raylet that sent the heartbeat. + /// \param heartbeat_data The heartbeat sent by the client. + void HandleHeartbeat(const ClientID &node_id, rpc::HeartbeatTableData &&heartbeat_data); + + protected: + /// Listen for heartbeats from Raylets and mark Raylets + /// that do not send a heartbeat within a given period as dead. + void Start(); + + /// A periodic timer that fires on every heartbeat period. Raylets that have + /// not sent a heartbeat within the last num_heartbeats_timeout ticks will be + /// marked as dead in the client table.
+ void Tick(); + + /// Check whether any raylet has been inactive (no heartbeat) for a period of time. + /// If any is found, mark it as dead. + void DetectDeadNodes(); + + /// Send any buffered heartbeats as a single publish. + void SendBatchedHeartbeat(); + + /// Schedule another tick after a short time. + void ScheduleTick(); + + private: + rpc::ClientCallManager client_call_manager_; + /// A client to the GCS, through which heartbeats are received. + std::shared_ptr gcs_client_; + /// The number of heartbeats that can be missed before a node is removed. + int64_t num_heartbeats_timeout_; + /// A timer that ticks every raylet_heartbeat_timeout_milliseconds milliseconds. + boost::asio::deadline_timer heartbeat_timer_; + /// For each Raylet that we receive a heartbeat from, the number of ticks + /// that may pass before the Raylet will be declared dead. + std::unordered_map heartbeats_; + /// The Raylets that have been marked as dead in gcs. + std::unordered_set dead_nodes_; + /// A buffer containing heartbeats received from node managers in the last tick. + std::unordered_map heartbeat_buffer_; +}; + +} // namespace gcs +} // namespace ray + +#endif // RAY_GCS_NODE_MANAGER_H diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 76a99208b..aad7e05eb 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -15,6 +15,7 @@ #include "gcs_server.h" #include "actor_info_handler_impl.h" #include "error_info_handler_impl.h" +#include "gcs_node_manager.h" #include "job_info_handler_impl.h" #include "node_info_handler_impl.h" #include "object_info_handler_impl.h" @@ -36,6 +37,9 @@ void GcsServer::Start() { // Init backend client. InitBackendClient(); + // Init gcs node_manager + InitGcsNodeManager(); + // Register rpc service.
job_info_handler_ = InitJobInfoHandler(); job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *job_info_handler_)); @@ -105,6 +109,10 @@ void GcsServer::InitBackendClient() { RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status; } +void GcsServer::InitGcsNodeManager() { + gcs_node_manager_ = std::make_shared(main_service_, redis_gcs_client_); +} + std::unique_ptr GcsServer::InitJobInfoHandler() { return std::unique_ptr( new rpc::DefaultJobInfoHandler(*redis_gcs_client_)); @@ -117,7 +125,7 @@ std::unique_ptr GcsServer::InitActorInfoHandler() { std::unique_ptr GcsServer::InitNodeInfoHandler() { return std::unique_ptr( - new rpc::DefaultNodeInfoHandler(*redis_gcs_client_)); + new rpc::DefaultNodeInfoHandler(*redis_gcs_client_, *gcs_node_manager_)); } std::unique_ptr GcsServer::InitObjectInfoHandler() { diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 00523c49c..9a8ea226d 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -32,6 +32,8 @@ struct GcsServerConfig { bool is_test = false; }; +class GcsNodeManager; + /// The GcsServer will take over all requests from ServiceBasedGcsClient and transparent /// transmit the command to the backend reliable storage for the time being. /// In the future, GCS server's main responsibility is to manage meta data @@ -56,11 +58,16 @@ class GcsServer { bool IsStarted() const { return is_started_; } protected: - /// Initialize the backend storage client + /// Initialize the backend storage client. /// The gcs server is just the proxy between the gcs client and reliable storage /// for the time being, so we need a backend client to connect to the storage. virtual void InitBackendClient(); + /// Initialize the gcs node manager. + /// The gcs node manager is responsible for managing and monitoring all nodes in the + /// cluster. 
+ virtual void InitGcsNodeManager(); + /// The job info handler virtual std::unique_ptr InitJobInfoHandler(); @@ -99,6 +106,8 @@ class GcsServer { rpc::GrpcServer rpc_server_; /// The main io service to drive event posted from grpc threads. boost::asio::io_context main_service_; + /// The gcs node manager. + std::shared_ptr gcs_node_manager_; /// Job info handler and service std::unique_ptr job_info_handler_; std::unique_ptr job_info_service_; diff --git a/src/ray/gcs/gcs_server/node_info_handler_impl.cc b/src/ray/gcs/gcs_server/node_info_handler_impl.cc index 11a88c3ff..417082e5e 100644 --- a/src/ray/gcs/gcs_server/node_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/node_info_handler_impl.cc @@ -88,50 +88,13 @@ void DefaultNodeInfoHandler::HandleReportHeartbeat( SendReplyCallback send_reply_callback) { ClientID node_id = ClientID::FromBinary(request.heartbeat().client_id()); RAY_LOG(DEBUG) << "Reporting heartbeat, node id = " << node_id; - - auto on_done = [node_id, reply, send_reply_callback](Status status) { - if (!status.ok()) { - RAY_LOG(ERROR) << "Failed to report heartbeat: " << status.ToString() - << ", node id = " << node_id; - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; - - auto heartbeat_data = std::make_shared(); - heartbeat_data->CopyFrom(request.heartbeat()); - Status status = gcs_client_.Nodes().AsyncReportHeartbeat(heartbeat_data, on_done); - if (!status.ok()) { - on_done(status); - } + rpc::HeartbeatTableData heartbeat_data; + heartbeat_data.CopyFrom(request.heartbeat()); + gcs_node_manager_.HandleHeartbeat(node_id, std::move(heartbeat_data)); + send_reply_callback(Status::OK(), nullptr, nullptr); RAY_LOG(DEBUG) << "Finished reporting heartbeat, node id = " << node_id; } -void DefaultNodeInfoHandler::HandleReportBatchHeartbeat( - const ReportBatchHeartbeatRequest &request, ReportBatchHeartbeatReply *reply, - SendReplyCallback send_reply_callback) { - RAY_LOG(DEBUG) << "Reporting batch heartbeat, batch size = " - << 
request.heartbeat_batch().batch_size(); - - auto on_done = [&request, reply, send_reply_callback](Status status) { - if (!status.ok()) { - RAY_LOG(ERROR) << "Failed to report batch heartbeat: " << status.ToString() - << ", batch size = " << request.heartbeat_batch().batch_size(); - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; - - auto heartbeat_batch_data = std::make_shared(); - heartbeat_batch_data->CopyFrom(request.heartbeat_batch()); - Status status = - gcs_client_.Nodes().AsyncReportBatchHeartbeat(heartbeat_batch_data, on_done); - if (!status.ok()) { - on_done(status); - } - - RAY_LOG(DEBUG) << "Finished reporting batch heartbeat, batch size = " - << request.heartbeat_batch().batch_size(); -} - void DefaultNodeInfoHandler::HandleGetResources(const GetResourcesRequest &request, GetResourcesReply *reply, SendReplyCallback send_reply_callback) { diff --git a/src/ray/gcs/gcs_server/node_info_handler_impl.h b/src/ray/gcs/gcs_server/node_info_handler_impl.h index 720eba3c2..94fff9890 100644 --- a/src/ray/gcs/gcs_server/node_info_handler_impl.h +++ b/src/ray/gcs/gcs_server/node_info_handler_impl.h @@ -15,17 +15,20 @@ #ifndef RAY_GCS_NODE_INFO_HANDLER_IMPL_H #define RAY_GCS_NODE_INFO_HANDLER_IMPL_H +#include "gcs_node_manager.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" namespace ray { + namespace rpc { /// This implementation class of `NodeInfoHandler`. 
class DefaultNodeInfoHandler : public rpc::NodeInfoHandler { public: - explicit DefaultNodeInfoHandler(gcs::RedisGcsClient &gcs_client) - : gcs_client_(gcs_client) {} + explicit DefaultNodeInfoHandler(gcs::RedisGcsClient &gcs_client, + gcs::GcsNodeManager &gcs_node_manager) + : gcs_client_(gcs_client), gcs_node_manager_(gcs_node_manager) {} void HandleRegisterNode(const RegisterNodeRequest &request, RegisterNodeReply *reply, SendReplyCallback send_reply_callback) override; @@ -42,10 +45,6 @@ class DefaultNodeInfoHandler : public rpc::NodeInfoHandler { ReportHeartbeatReply *reply, SendReplyCallback send_reply_callback) override; - void HandleReportBatchHeartbeat(const ReportBatchHeartbeatRequest &request, - ReportBatchHeartbeatReply *reply, - SendReplyCallback send_reply_callback) override; - void HandleGetResources(const GetResourcesRequest &request, GetResourcesReply *reply, SendReplyCallback send_reply_callback) override; @@ -59,6 +58,7 @@ class DefaultNodeInfoHandler : public rpc::NodeInfoHandler { private: gcs::RedisGcsClient &gcs_client_; + gcs::GcsNodeManager &gcs_node_manager_; }; } // namespace rpc diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index 21b595ca6..c7ad75447 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -209,17 +209,6 @@ class GcsServerTest : public RedisServiceManagerForTest { return WaitReady(promise.get_future(), timeout_ms_); } - bool ReportBatchHeartbeat(const rpc::ReportBatchHeartbeatRequest &request) { - std::promise promise; - client_->ReportBatchHeartbeat( - request, - [&promise](const Status &status, const rpc::ReportBatchHeartbeatReply &reply) { - RAY_CHECK_OK(status); - promise.set_value(true); - }); - return WaitReady(promise.get_future(), timeout_ms_); - } - bool UpdateResources(const rpc::UpdateResourcesRequest &request) { std::promise promise; client_->UpdateResources(request, 
[&promise](const Status &status, @@ -536,10 +525,6 @@ TEST_F(GcsServerTest, TestNodeInfo) { rpc::ReportHeartbeatRequest report_heartbeat_request; report_heartbeat_request.mutable_heartbeat()->set_client_id(node_id.Binary()); ASSERT_TRUE(ReportHeartbeat(report_heartbeat_request)); - rpc::ReportBatchHeartbeatRequest report_batch_heartbeat_request; - report_batch_heartbeat_request.mutable_heartbeat_batch()->add_batch()->set_client_id( - node_id.Binary()); - ASSERT_TRUE(ReportBatchHeartbeat(report_batch_heartbeat_request)); // Unregister node info rpc::UnregisterNodeRequest unregister_node_info_request; diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index b7d2aa994..43b5885d2 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -200,9 +200,6 @@ service NodeInfoGcsService { rpc GetAllNodeInfo(GetAllNodeInfoRequest) returns (GetAllNodeInfoReply); // Report heartbeat of a node to GCS Service. rpc ReportHeartbeat(ReportHeartbeatRequest) returns (ReportHeartbeatReply); - // Report batch heartbeat to GCS Service. - rpc ReportBatchHeartbeat(ReportBatchHeartbeatRequest) - returns (ReportBatchHeartbeatReply); // Get node's resources from GCS Service. rpc GetResources(GetResourcesRequest) returns (GetResourcesReply); // Update resources of a node in GCS Service. diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 98b85db90..04ea45ee8 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -132,10 +132,6 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportHeartbeat, node_info_grpc_client_, ) - /// Report batch heartbeat to GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportBatchHeartbeat, - node_info_grpc_client_, ) - /// Get node's resources from GCS Service. 
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetResources, node_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index e621249b1..8e1c84ecb 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -170,10 +170,6 @@ class NodeInfoGcsServiceHandler { ReportHeartbeatReply *reply, SendReplyCallback send_reply_callback) = 0; - virtual void HandleReportBatchHeartbeat(const ReportBatchHeartbeatRequest &request, - ReportBatchHeartbeatReply *reply, - SendReplyCallback send_reply_callback) = 0; - virtual void HandleGetResources(const GetResourcesRequest &request, GetResourcesReply *reply, SendReplyCallback send_reply_callback) = 0; @@ -207,7 +203,6 @@ class NodeInfoGrpcService : public GrpcService { NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode); NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo); NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat); - NODE_INFO_SERVICE_RPC_HANDLER(ReportBatchHeartbeat); NODE_INFO_SERVICE_RPC_HANDLER(GetResources); NODE_INFO_SERVICE_RPC_HANDLER(UpdateResources); NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources);