diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 0b88e7042..589198bc9 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -321,6 +321,44 @@ class NodeInfoAccessor { /// \return Whether the node is removed. virtual bool IsRemoved(const ClientID &node_id) const = 0; + /// Report heartbeat of a node to GCS asynchronously. + /// + /// \param data_ptr The heartbeat that will be reported to GCS. + /// \param callback Callback that will be called after the report finishes. + /// \return Status + // TODO(micafan) NodeStateAccessor will call this method to report heartbeat. + virtual Status AsyncReportHeartbeat( + const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + /// Subscribe to the heartbeat of each node from GCS. + /// + /// \param subscribe Callback that will be called each time a heartbeat is updated. + /// \param done Callback that will be called when subscription is complete. + /// \return Status + virtual Status AsyncSubscribeHeartbeat( + const SubscribeCallback &subscribe, + const StatusCallback &done) = 0; + + /// Report the batched heartbeats of all nodes to GCS asynchronously. + /// + /// \param data_ptr The heartbeats that will be reported to GCS. + /// \param callback Callback that will be called after the report finishes. + /// \return Status + virtual Status AsyncReportBatchHeartbeat( + const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + /// Subscribe to the batched heartbeats of all nodes from GCS. + /// + /// \param subscribe Callback that will be called each time the batch heartbeat is + /// updated. + /// \param done Callback that will be called when subscription is complete. 
+ /// \return Status + virtual Status AsyncSubscribeBatchHeartbeat( + const ItemCallback &subscribe, + const StatusCallback &done) = 0; + protected: NodeInfoAccessor() = default; }; @@ -329,4 +367,4 @@ class NodeInfoAccessor { } // namespace ray -#endif // RAY_GCS_ACCESSOR_H \ No newline at end of file +#endif // RAY_GCS_ACCESSOR_H diff --git a/src/ray/gcs/callback.h b/src/ray/gcs/callback.h index cdf6867b1..78590ffcd 100644 --- a/src/ray/gcs/callback.h +++ b/src/ray/gcs/callback.h @@ -39,6 +39,11 @@ using MultiItemCallback = template using SubscribeCallback = std::function; +/// This callback is used to receive a single item from GCS. +/// \param result The item returned by GCS. +template +using ItemCallback = std::function; + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index d99b2a087..9e5f280a2 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -293,7 +293,9 @@ Status RedisObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &obje } RedisNodeInfoAccessor::RedisNodeInfoAccessor(RedisGcsClient *client_impl) - : client_impl_(client_impl) {} + : client_impl_(client_impl), + heartbeat_sub_executor_(client_impl->heartbeat_table()), + heartbeat_batch_sub_executor_(client_impl->heartbeat_batch_table()) {} Status RedisNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info) { ClientTable &client_table = client_impl_->client_table(); @@ -377,6 +379,55 @@ bool RedisNodeInfoAccessor::IsRemoved(const ClientID &node_id) const { ClientTable &client_table = client_impl_->client_table(); return client_table.IsRemoved(node_id); } +Status RedisNodeInfoAccessor::AsyncReportHeartbeat( + const std::shared_ptr &data_ptr, const StatusCallback &callback) { + HeartbeatTable::WriteCallback on_done = nullptr; + if (callback != nullptr) { + on_done = [callback](RedisGcsClient *client, const ClientID &node_id, + const HeartbeatTableData &data) { callback(Status::OK()); }; + 
} + + ClientID node_id = ClientID::FromBinary(data_ptr->client_id()); + HeartbeatTable &heartbeat_table = client_impl_->heartbeat_table(); + return heartbeat_table.Add(JobID::Nil(), node_id, data_ptr, on_done); +} + +Status RedisNodeInfoAccessor::AsyncSubscribeHeartbeat( + const SubscribeCallback &subscribe, + const StatusCallback &done) { + RAY_CHECK(subscribe != nullptr); + auto on_subscribe = [subscribe](const ClientID &node_id, + const HeartbeatTableData &data) { + subscribe(node_id, data); + }; + + return heartbeat_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done); +} + +Status RedisNodeInfoAccessor::AsyncReportBatchHeartbeat( + const std::shared_ptr &data_ptr, + const StatusCallback &callback) { + HeartbeatBatchTable::WriteCallback on_done = nullptr; + if (callback != nullptr) { + on_done = [callback](RedisGcsClient *client, const ClientID &node_id, + const HeartbeatBatchTableData &data) { callback(Status::OK()); }; + } + + HeartbeatBatchTable &hb_batch_table = client_impl_->heartbeat_batch_table(); + return hb_batch_table.Add(JobID::Nil(), ClientID::Nil(), data_ptr, on_done); +} + +Status RedisNodeInfoAccessor::AsyncSubscribeBatchHeartbeat( + const ItemCallback &subscribe, const StatusCallback &done) { + RAY_CHECK(subscribe != nullptr); + auto on_subscribe = [subscribe](const ClientID &node_id, + const HeartbeatBatchTableData &data) { + subscribe(data); + }; + + return heartbeat_batch_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, + done); +} } // namespace gcs diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index 55e55667a..285d063d9 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -208,12 +208,35 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { bool IsRemoved(const ClientID &node_id) const override; + Status AsyncReportHeartbeat(const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; + + Status AsyncSubscribeHeartbeat( + const 
SubscribeCallback &subscribe, + const StatusCallback &done) override; + + Status AsyncReportBatchHeartbeat( + const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; + + Status AsyncSubscribeBatchHeartbeat( + const ItemCallback &subscribe, + const StatusCallback &done) override; + private: RedisGcsClient *client_impl_{nullptr}; + + typedef SubscriptionExecutor + HeartbeatSubscriptionExecutor; + HeartbeatSubscriptionExecutor heartbeat_sub_executor_; + + typedef SubscriptionExecutor + HeartbeatBatchSubscriptionExecutor; + HeartbeatBatchSubscriptionExecutor heartbeat_batch_sub_executor_; }; } // namespace gcs } // namespace ray -#endif // RAY_GCS_REDIS_ACCESSOR_H \ No newline at end of file +#endif // RAY_GCS_REDIS_ACCESSOR_H diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index e9fd78d1c..005f03a03 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -62,8 +62,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { // TODO: Some API for getting the error on the driver TaskReconstructionLog &task_reconstruction_log(); TaskLeaseTable &task_lease_table(); - HeartbeatTable &heartbeat_table(); - HeartbeatBatchTable &heartbeat_batch_table(); ErrorTable &error_table(); ProfileTable &profile_table(); ActorCheckpointTable &actor_checkpoint_table(); @@ -101,8 +99,10 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { JobTable &job_table(); /// This method will be deprecated, use method Objects() instead ObjectTable &object_table(); - /// This method will be deprecated, use method Nodes() instead. + /// The following three methods will be deprecated, use method Nodes() instead. ClientTable &client_table(); + HeartbeatTable &heartbeat_table(); + HeartbeatBatchTable &heartbeat_batch_table(); /// This method will be deprecated, use method Tasks() instead. 
raylet::TaskTable &raylet_task_table(); diff --git a/src/ray/gcs/subscription_executor.cc b/src/ray/gcs/subscription_executor.cc index a5a46a071..32c5214c0 100644 --- a/src/ray/gcs/subscription_executor.cc +++ b/src/ray/gcs/subscription_executor.cc @@ -189,6 +189,9 @@ template class SubscriptionExecutor; template class SubscriptionExecutor; template class SubscriptionExecutor; template class SubscriptionExecutor; +template class SubscriptionExecutor; +template class SubscriptionExecutor; } // namespace gcs diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc index bcda7001e..af367669f 100644 --- a/src/ray/raylet/monitor.cc +++ b/src/ray/raylet/monitor.cc @@ -30,12 +30,11 @@ void Monitor::HandleHeartbeat(const ClientID &node_id, } void Monitor::Start() { - const auto heartbeat_callback = [this](gcs::RedisGcsClient *client, const ClientID &id, + const auto heartbeat_callback = [this](const ClientID &id, const HeartbeatTableData &heartbeat_data) { HandleHeartbeat(id, heartbeat_data); }; - RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe( - JobID::Nil(), ClientID::Nil(), heartbeat_callback, nullptr, nullptr)); + RAY_CHECK_OK(gcs_client_.Nodes().AsyncSubscribeHeartbeat(heartbeat_callback, nullptr)); Tick(); } @@ -88,8 +87,7 @@ void Monitor::Tick() { for (const auto &heartbeat : heartbeat_buffer_) { batch->add_batch()->CopyFrom(heartbeat.second); } - RAY_CHECK_OK(gcs_client_.heartbeat_batch_table().Add(JobID::Nil(), ClientID::Nil(), - batch, nullptr)); + RAY_CHECK_OK(gcs_client_.Nodes().AsyncReportBatchHeartbeat(batch, nullptr)); heartbeat_buffer_.clear(); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 07b6785d4..55374b5a7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -211,14 +211,11 @@ ray::Status NodeManager::RegisterGcs() { // Subscribe to heartbeat batches from the monitor. 
const auto &heartbeat_batch_added = - [this](gcs::RedisGcsClient *client, const ClientID &id, - const HeartbeatBatchTableData &heartbeat_batch) { + [this](const HeartbeatBatchTableData &heartbeat_batch) { HeartbeatBatchAdded(heartbeat_batch); }; - RAY_RETURN_NOT_OK(gcs_client_->heartbeat_batch_table().Subscribe( - JobID::Nil(), ClientID::Nil(), heartbeat_batch_added, - /*subscribe_callback=*/nullptr, - /*done_callback=*/nullptr)); + RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeBatchHeartbeat( + heartbeat_batch_added, /*done=*/ nullptr)); // Subscribe to job updates. const auto job_subscribe_handler = [this](const JobID &job_id, @@ -297,7 +294,6 @@ void NodeManager::Heartbeat() { } last_heartbeat_at_ms_ = now_ms; - auto &heartbeat_table = gcs_client_->heartbeat_table(); auto heartbeat_data = std::make_shared(); SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; heartbeat_data->set_client_id(self_node_id_.Binary()); @@ -338,8 +334,8 @@ void NodeManager::Heartbeat() { } } - ray::Status status = heartbeat_table.Add(JobID::Nil(), self_node_id_, heartbeat_data, - /*success_callback=*/nullptr); + ray::Status status = gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, + /*callback=*/ nullptr); RAY_CHECK_OK_PREPEND(status, "Heartbeat failed"); if (debug_dump_period_ > 0 &&