[GCS]Add heartbeat methods to NodeInfoAccessor (#6604)

This commit is contained in:
micafan
2019-12-31 14:17:35 +08:00
committed by Hao Chen
parent 65acb54553
commit cdc1ce4ebf
8 changed files with 134 additions and 20 deletions
+39 -1
View File
@@ -321,6 +321,44 @@ class NodeInfoAccessor {
/// \return Whether the node is removed.
virtual bool IsRemoved(const ClientID &node_id) const = 0;
/// Report heartbeat of a node to GCS asynchronously.
///
/// \param data_ptr The heartbeat that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.
/// \return Status
// TODO(micafan) NodeStateAccessor will call this method to report heartbeat.
virtual Status AsyncReportHeartbeat(
const std::shared_ptr<rpc::HeartbeatTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Subscribe to the heartbeat of each node from GCS.
///
/// \param subscribe Callback that will be called each time when heartbeat is updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeHeartbeat(
const SubscribeCallback<ClientID, rpc::HeartbeatTableData> &subscribe,
const StatusCallback &done) = 0;
/// Report state of all nodes to GCS asynchronously.
///
/// \param data_ptr The heartbeats that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.
/// \return Status
virtual Status AsyncReportBatchHeartbeat(
const std::shared_ptr<rpc::HeartbeatBatchTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Subscribe batched state of all nodes from GCS.
///
/// \param subscribe Callback that will be called each time when batch heartbeat is
/// updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeBatchHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
const StatusCallback &done) = 0;
protected:
NodeInfoAccessor() = default;
};
@@ -329,4 +367,4 @@ class NodeInfoAccessor {
} // namespace ray
#endif // RAY_GCS_ACCESSOR_H
#endif // RAY_GCS_ACCESSOR_H
+5
View File
@@ -39,6 +39,11 @@ using MultiItemCallback =
template <typename ID, typename Data>
using SubscribeCallback = std::function<void(const ID &id, const Data &result)>;
/// This callback is used to receive a single item from GCS.
/// \param result The item returned by GCS.
template <typename Data>
using ItemCallback = std::function<void(const Data &result)>;
} // namespace gcs
} // namespace ray
+52 -1
View File
@@ -293,7 +293,9 @@ Status RedisObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &obje
}
RedisNodeInfoAccessor::RedisNodeInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl) {}
: client_impl_(client_impl),
heartbeat_sub_executor_(client_impl->heartbeat_table()),
heartbeat_batch_sub_executor_(client_impl->heartbeat_batch_table()) {}
Status RedisNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info) {
ClientTable &client_table = client_impl_->client_table();
@@ -377,6 +379,55 @@ bool RedisNodeInfoAccessor::IsRemoved(const ClientID &node_id) const {
ClientTable &client_table = client_impl_->client_table();
return client_table.IsRemoved(node_id);
}
Status RedisNodeInfoAccessor::AsyncReportHeartbeat(
const std::shared_ptr<HeartbeatTableData> &data_ptr, const StatusCallback &callback) {
HeartbeatTable::WriteCallback on_done = nullptr;
if (callback != nullptr) {
on_done = [callback](RedisGcsClient *client, const ClientID &node_id,
const HeartbeatTableData &data) { callback(Status::OK()); };
}
ClientID node_id = ClientID::FromBinary(data_ptr->client_id());
HeartbeatTable &heartbeat_table = client_impl_->heartbeat_table();
return heartbeat_table.Add(JobID::Nil(), node_id, data_ptr, on_done);
}
Status RedisNodeInfoAccessor::AsyncSubscribeHeartbeat(
const SubscribeCallback<ClientID, HeartbeatTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const ClientID &node_id,
const HeartbeatTableData &data) {
subscribe(node_id, data);
};
return heartbeat_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done);
}
Status RedisNodeInfoAccessor::AsyncReportBatchHeartbeat(
const std::shared_ptr<HeartbeatBatchTableData> &data_ptr,
const StatusCallback &callback) {
HeartbeatBatchTable::WriteCallback on_done = nullptr;
if (callback != nullptr) {
on_done = [callback](RedisGcsClient *client, const ClientID &node_id,
const HeartbeatBatchTableData &data) { callback(Status::OK()); };
}
HeartbeatBatchTable &hb_batch_table = client_impl_->heartbeat_batch_table();
return hb_batch_table.Add(JobID::Nil(), ClientID::Nil(), data_ptr, on_done);
}
Status RedisNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
const ItemCallback<HeartbeatBatchTableData> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const ClientID &node_id,
const HeartbeatBatchTableData &data) {
subscribe(data);
};
return heartbeat_batch_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe,
done);
}
} // namespace gcs
+24 -1
View File
@@ -208,12 +208,35 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
bool IsRemoved(const ClientID &node_id) const override;
Status AsyncReportHeartbeat(const std::shared_ptr<HeartbeatTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncSubscribeHeartbeat(
const SubscribeCallback<ClientID, HeartbeatTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncReportBatchHeartbeat(
const std::shared_ptr<HeartbeatBatchTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncSubscribeBatchHeartbeat(
const ItemCallback<HeartbeatBatchTableData> &subscribe,
const StatusCallback &done) override;
private:
RedisGcsClient *client_impl_{nullptr};
typedef SubscriptionExecutor<ClientID, HeartbeatTableData, HeartbeatTable>
HeartbeatSubscriptionExecutor;
HeartbeatSubscriptionExecutor heartbeat_sub_executor_;
typedef SubscriptionExecutor<ClientID, HeartbeatBatchTableData, HeartbeatBatchTable>
HeartbeatBatchSubscriptionExecutor;
HeartbeatBatchSubscriptionExecutor heartbeat_batch_sub_executor_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_REDIS_ACCESSOR_H
#endif // RAY_GCS_REDIS_ACCESSOR_H
+3 -3
View File
@@ -62,8 +62,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
// TODO: Some API for getting the error on the driver
TaskReconstructionLog &task_reconstruction_log();
TaskLeaseTable &task_lease_table();
HeartbeatTable &heartbeat_table();
HeartbeatBatchTable &heartbeat_batch_table();
ErrorTable &error_table();
ProfileTable &profile_table();
ActorCheckpointTable &actor_checkpoint_table();
@@ -101,8 +99,10 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
JobTable &job_table();
/// This method will be deprecated, use method Objects() instead
ObjectTable &object_table();
/// This method will be deprecated, use method Nodes() instead.
/// The following three methods will be deprecated, use method Nodes() instead.
ClientTable &client_table();
HeartbeatTable &heartbeat_table();
HeartbeatBatchTable &heartbeat_batch_table();
/// This method will be deprecated, use method Tasks() instead.
raylet::TaskTable &raylet_task_table();
+3
View File
@@ -189,6 +189,9 @@ template class SubscriptionExecutor<ActorID, ActorTableData, DirectActorTable>;
template class SubscriptionExecutor<JobID, JobTableData, JobTable>;
template class SubscriptionExecutor<TaskID, TaskTableData, raylet::TaskTable>;
template class SubscriptionExecutor<ObjectID, ObjectChangeNotification, ObjectTable>;
template class SubscriptionExecutor<ClientID, HeartbeatTableData, HeartbeatTable>;
template class SubscriptionExecutor<ClientID, HeartbeatBatchTableData,
HeartbeatBatchTable>;
} // namespace gcs
+3 -5
View File
@@ -30,12 +30,11 @@ void Monitor::HandleHeartbeat(const ClientID &node_id,
}
void Monitor::Start() {
const auto heartbeat_callback = [this](gcs::RedisGcsClient *client, const ClientID &id,
const auto heartbeat_callback = [this](const ClientID &id,
const HeartbeatTableData &heartbeat_data) {
HandleHeartbeat(id, heartbeat_data);
};
RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(
JobID::Nil(), ClientID::Nil(), heartbeat_callback, nullptr, nullptr));
RAY_CHECK_OK(gcs_client_.Nodes().AsyncSubscribeHeartbeat(heartbeat_callback, nullptr));
Tick();
}
@@ -88,8 +87,7 @@ void Monitor::Tick() {
for (const auto &heartbeat : heartbeat_buffer_) {
batch->add_batch()->CopyFrom(heartbeat.second);
}
RAY_CHECK_OK(gcs_client_.heartbeat_batch_table().Add(JobID::Nil(), ClientID::Nil(),
batch, nullptr));
RAY_CHECK_OK(gcs_client_.Nodes().AsyncReportBatchHeartbeat(batch, nullptr));
heartbeat_buffer_.clear();
}
+5 -9
View File
@@ -211,14 +211,11 @@ ray::Status NodeManager::RegisterGcs() {
// Subscribe to heartbeat batches from the monitor.
const auto &heartbeat_batch_added =
[this](gcs::RedisGcsClient *client, const ClientID &id,
const HeartbeatBatchTableData &heartbeat_batch) {
[this](const HeartbeatBatchTableData &heartbeat_batch) {
HeartbeatBatchAdded(heartbeat_batch);
};
RAY_RETURN_NOT_OK(gcs_client_->heartbeat_batch_table().Subscribe(
JobID::Nil(), ClientID::Nil(), heartbeat_batch_added,
/*subscribe_callback=*/nullptr,
/*done_callback=*/nullptr));
RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeBatchHeartbeat(
heartbeat_batch_added, /*done*/ nullptr));
// Subscribe to job updates.
const auto job_subscribe_handler = [this](const JobID &job_id,
@@ -297,7 +294,6 @@ void NodeManager::Heartbeat() {
}
last_heartbeat_at_ms_ = now_ms;
auto &heartbeat_table = gcs_client_->heartbeat_table();
auto heartbeat_data = std::make_shared<HeartbeatTableData>();
SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
heartbeat_data->set_client_id(self_node_id_.Binary());
@@ -338,8 +334,8 @@ void NodeManager::Heartbeat() {
}
}
ray::Status status = heartbeat_table.Add(JobID::Nil(), self_node_id_, heartbeat_data,
/*success_callback=*/nullptr);
ray::Status status = gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data,
/*done*/ nullptr);
RAY_CHECK_OK_PREPEND(status, "Heartbeat failed");
if (debug_dump_period_ > 0 &&