From 5f351a05fe2e18a331e452fd0a8a8b09d5b561ec Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Mon, 4 May 2020 10:23:55 +0800 Subject: [PATCH] GCS adapts to task table pub sub (#8210) --- src/ray/gcs/accessor.h | 7 +--- .../gcs/gcs_client/service_based_accessor.cc | 42 ++++++++++++++----- .../gcs/gcs_client/service_based_accessor.h | 9 +--- .../test/service_based_gcs_client_test.cc | 16 +++---- src/ray/gcs/gcs_server/gcs_server.cc | 2 +- .../gcs/gcs_server/task_info_handler_impl.cc | 35 +++++++++------- .../gcs/gcs_server/task_info_handler_impl.h | 7 +++- src/ray/gcs/pubsub/gcs_pub_sub.h | 1 + src/ray/gcs/redis_accessor.cc | 10 ++--- src/ray/gcs/redis_accessor.h | 5 +-- src/ray/raylet/lineage_cache.cc | 2 +- src/ray/raylet/lineage_cache_test.cc | 2 +- src/ray/raylet/reconstruction_policy.cc | 3 +- src/ray/raylet/reconstruction_policy_test.cc | 3 +- 14 files changed, 77 insertions(+), 67 deletions(-) diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 9ad76c9aa..4a983931b 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -229,9 +229,8 @@ class TaskInfoAccessor { /// Cancel subscription to a task asynchronously. /// /// \param task_id The ID of the task to be unsubscribed to. - /// \param done Callback that will be called when unsubscribe is complete. /// \return Status - virtual Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done) = 0; + virtual Status AsyncUnsubscribe(const TaskID &task_id) = 0; /// Add a task lease to GCS asynchronously. /// @@ -257,10 +256,8 @@ class TaskInfoAccessor { /// Cancel subscription to a task lease asynchronously. /// /// \param task_id The ID of the task to be unsubscribed to. - /// \param done Callback that will be called when unsubscribe is complete. /// \return Status - virtual Status AsyncUnsubscribeTaskLease(const TaskID &task_id, - const StatusCallback &done) = 0; + virtual Status AsyncUnsubscribeTaskLease(const TaskID &task_id) = 0; /// Attempt task reconstruction to GCS asynchronously. /// diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 00d66ecf3..6dcd469cc 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -571,7 +571,6 @@ ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl), subscribe_id_(ClientID::FromRandom()), - task_sub_executor_(client_impl->GetRedisGcsClient().raylet_task_table()), task_lease_sub_executor_(client_impl->GetRedisGcsClient().task_lease_table()) {} Status ServiceBasedTaskInfoAccessor::AsyncAdd( @@ -601,8 +600,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncGet( client_impl_->GetGcsRpcClient().GetTask( request, [task_id, callback](const Status &status, const rpc::GetTaskReply &reply) { if (reply.has_task_data()) { - TaskTableData task_table_data(reply.task_data()); - callback(status, task_table_data); + callback(status, reply.task_data()); } else { callback(status, boost::none); } @@ -636,16 +634,38 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribe( const StatusCallback &done) { RAY_LOG(DEBUG) << "Subscribing task, task id = " << task_id; RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id; - auto status = - task_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done); + auto on_subscribe = [task_id, subscribe](const std::string &id, + const std::string &data) { + TaskTableData task_data; + task_data.ParseFromString(data); + subscribe(task_id, task_data); + }; + auto on_done = [this, task_id, subscribe, done](const Status &status) { + if (status.ok()) { + auto callback = [task_id, subscribe, done]( + const Status &status, + const boost::optional &result) { + if (result) { + subscribe(task_id, *result); + } + if (done) { + done(status); + } + }; + RAY_CHECK_OK(AsyncGet(task_id, callback)); + } else if (done) { + done(status); + } + }; + auto status = client_impl_->GetGcsPubSub().Subscribe(TASK_CHANNEL, task_id.Hex(), + on_subscribe, on_done); RAY_LOG(DEBUG) << "Finished subscribing task, task id = " << task_id; return status; } -Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id, - const StatusCallback &done) { +Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) { RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id; - auto status = task_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done); + auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex()); RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id; return status; } @@ -683,10 +703,10 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease( return status; } -Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease( - const TaskID &task_id, const StatusCallback &done) { +Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) { RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id; - auto status = task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done); + auto status = + task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, nullptr); RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id; return status; } diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 6e6b8e09b..f36c70ad2 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -212,7 +212,7 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor { const SubscribeCallback &subscribe, const StatusCallback &done) override; - Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done) override; + Status AsyncUnsubscribe(const TaskID &task_id) override; Status AsyncAddTaskLease(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; @@ -222,8 +222,7 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor { const SubscribeCallback> &subscribe, const StatusCallback &done) override; - Status AsyncUnsubscribeTaskLease(const TaskID &task_id, - const StatusCallback &done) override; + Status AsyncUnsubscribeTaskLease(const TaskID &task_id) override; Status AttemptTaskReconstruction( const std::shared_ptr &data_ptr, @@ -234,10 +233,6 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor { ClientID subscribe_id_; - typedef SubscriptionExecutor - TaskSubscriptionExecutor; - TaskSubscriptionExecutor task_sub_executor_; - typedef SubscriptionExecutor, TaskLeaseTable> TaskLeaseSubscriptionExecutor; TaskLeaseSubscriptionExecutor task_lease_sub_executor_; diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 28a355506..013dc6b03 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -293,11 +293,9 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest { return WaitReady(promise.get_future(), timeout_ms_); } - bool UnsubscribeTask(const TaskID &task_id) { + void UnsubscribeTask(const TaskID &task_id) { std::promise promise; - RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe( - task_id, [&promise](Status status) { promise.set_value(status.ok()); })); - return WaitReady(promise.get_future(), timeout_ms_); + RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id)); } bool AddTask(const std::shared_ptr task) { @@ -340,11 +338,9 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest { return WaitReady(promise.get_future(), timeout_ms_); } - bool UnsubscribeTaskLease(const TaskID &task_id) { + void UnsubscribeTaskLease(const TaskID &task_id) { std::promise promise; - RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribeTaskLease( - task_id, [&promise](Status status) { promise.set_value(status.ok()); })); - return WaitReady(promise.get_future(), timeout_ms_); + RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribeTaskLease(task_id)); } bool AddTaskLease(const std::shared_ptr task_lease) { @@ -688,7 +684,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) { ASSERT_TRUE(get_task_result.task().task_spec().job_id() == job_id.Binary()); // Cancel subscription to a task. - ASSERT_TRUE(UnsubscribeTask(task_id)); + UnsubscribeTask(task_id); // Add a task to GCS again. ASSERT_TRUE(AddTask(task_table_data)); @@ -717,7 +713,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) { WaitPendingDone(task_lease_count, 2); // Cancel subscription to a task lease. - ASSERT_TRUE(UnsubscribeTaskLease(task_id)); + UnsubscribeTaskLease(task_id); // Add a task lease to GCS again. ASSERT_TRUE(AddTaskLease(task_lease)); diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index f2be23a12..b26a78f18 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -167,7 +167,7 @@ void GcsServer::StoreGcsServerAddressInRedis() { std::unique_ptr GcsServer::InitTaskInfoHandler() { return std::unique_ptr( - new rpc::DefaultTaskInfoHandler(*redis_gcs_client_)); + new rpc::DefaultTaskInfoHandler(*redis_gcs_client_, gcs_pub_sub_)); } std::unique_ptr GcsServer::InitStatsHandler() { diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.cc b/src/ray/gcs/gcs_server/task_info_handler_impl.cc index 25b167404..54e6cd729 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.cc @@ -25,20 +25,24 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request, RAY_LOG(DEBUG) << "Adding task, job id = " << job_id << ", task id = " << task_id; auto task_table_data = std::make_shared(); task_table_data->CopyFrom(request.task_data()); - auto on_done = [job_id, task_id, request, reply, send_reply_callback](Status status) { + auto on_done = [this, job_id, task_id, task_table_data, request, reply, + send_reply_callback](const Status &status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add task, job id = " << job_id << ", task id = " << task_id; + } else { + RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_CHANNEL, task_id.Hex(), + task_table_data->SerializeAsString(), nullptr)); + RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id + << ", task id = " << task_id; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Tasks().AsyncAdd(task_table_data, on_done); if (!status.ok()) { on_done(status); } - RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id - << ", task id = " << task_id; } void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request, @@ -48,23 +52,20 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request, RAY_LOG(DEBUG) << "Getting task, job id = " << task_id.JobId() << ", task id = " << task_id; auto on_done = [task_id, request, reply, send_reply_callback]( - Status status, const boost::optional &result) { + const Status &status, const boost::optional &result) { if (status.ok()) { RAY_DCHECK(result); reply->mutable_task_data()->CopyFrom(*result); - } else { - RAY_LOG(ERROR) << "Failed to get task, job id = " << task_id.JobId() - << ", task id = " << task_id; } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + RAY_LOG(DEBUG) << "Finished getting task, job id = " << task_id.JobId() + << ", task id = " << task_id << ", status = " << status.ToString(); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); }; Status status = gcs_client_.Tasks().AsyncGet(task_id, on_done); if (!status.ok()) { on_done(status, boost::none); } - RAY_LOG(DEBUG) << "Finished getting task, job id = " << task_id.JobId() - << ", task id = " << task_id; } void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request, @@ -127,12 +128,18 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction( << ", node id = " << node_id; auto task_reconstruction_data = std::make_shared(); task_reconstruction_data->CopyFrom(request.task_reconstruction()); - auto on_done = [task_id, node_id, request, reply, send_reply_callback](Status status) { + auto on_done = [task_id, node_id, request, reply, + send_reply_callback](const Status &status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to reconstruct task, job id = " << task_id.JobId() << ", task id = " << task_id << ", reconstructions num = " << request.task_reconstruction().num_reconstructions() << ", node id = " << node_id; + } else { + RAY_LOG(DEBUG) << "Finished reconstructing task, job id = " << task_id.JobId() + << ", task id = " << task_id << ", reconstructions num = " + << request.task_reconstruction().num_reconstructions() + << ", node id = " << node_id; } GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; @@ -142,10 +149,6 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction( if (!status.ok()) { on_done(status); } - RAY_LOG(DEBUG) << "Finished reconstructing task, job id = " << task_id.JobId() - << ", task id = " << task_id << ", reconstructions num = " - << request.task_reconstruction().num_reconstructions() - << ", node id = " << node_id; } } // namespace rpc diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.h b/src/ray/gcs/gcs_server/task_info_handler_impl.h index 06fa9fc87..040f877d6 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.h +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.h @@ -15,6 +15,7 @@ #ifndef RAY_GCS_TASK_INFO_HANDLER_IMPL_H #define RAY_GCS_TASK_INFO_HANDLER_IMPL_H +#include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" @@ -24,8 +25,9 @@ namespace rpc { /// This implementation class of `TaskInfoHandler`. class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { public: - explicit DefaultTaskInfoHandler(gcs::RedisGcsClient &gcs_client) - : gcs_client_(gcs_client) {} + explicit DefaultTaskInfoHandler(gcs::RedisGcsClient &gcs_client, + std::shared_ptr &gcs_pub_sub) + : gcs_client_(gcs_client), gcs_pub_sub_(gcs_pub_sub) {} void HandleAddTask(const AddTaskRequest &request, AddTaskReply *reply, SendReplyCallback send_reply_callback) override; @@ -45,6 +47,7 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { private: gcs::RedisGcsClient &gcs_client_; + std::shared_ptr &gcs_pub_sub_; }; } // namespace rpc diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index ffcb9ed81..f4cf93509 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -28,6 +28,7 @@ namespace gcs { #define JOB_CHANNEL "JOB" #define WORKER_FAILURE_CHANNEL "WORKER_FAILURE" #define OBJECT_CHANNEL "OBJECT" +#define TASK_CHANNEL "TASK" /// \class GcsPubSub /// diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index 84c36e00c..4d15ce913 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -389,9 +389,8 @@ Status RedisTaskInfoAccessor::AsyncSubscribe( return task_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done); } -Status RedisTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id, - const StatusCallback &done) { - return task_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done); +Status RedisTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) { + return task_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, nullptr); } Status RedisTaskInfoAccessor::AsyncAddTaskLease( @@ -414,9 +413,8 @@ Status RedisTaskInfoAccessor::AsyncSubscribeTaskLease( return task_lease_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done); } -Status RedisTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id, - const StatusCallback &done) { - return task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done); +Status RedisTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) { + return task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, nullptr); } Status RedisTaskInfoAccessor::AttemptTaskReconstruction( diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index a3d59cffa..d0f700406 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -195,7 +195,7 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor { const SubscribeCallback &subscribe, const StatusCallback &done) override; - Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done) override; + Status AsyncUnsubscribe(const TaskID &task_id) override; Status AsyncAddTaskLease(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; @@ -205,8 +205,7 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor { const SubscribeCallback> &subscribe, const StatusCallback &done) override; - Status AsyncUnsubscribeTaskLease(const TaskID &task_id, - const StatusCallback &done) override; + Status AsyncUnsubscribeTaskLease(const TaskID &task_id) override; Status AttemptTaskReconstruction( const std::shared_ptr &data_ptr, diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index f99700318..44c585ede 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -321,7 +321,7 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) { bool subscribed = (it != subscribed_tasks_.end()); if (subscribed) { // Cancel subscribe to the task. - RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id, /*done*/ nullptr)); + RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id)); subscribed_tasks_.erase(it); } // Return whether we were previously subscribed to this task and are now diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 013095742..4a7a34b98 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -98,7 +98,7 @@ class MockTaskInfoAccessor : public gcs::RedisTaskInfoAccessor { return ray::Status::OK(); } - Status AsyncUnsubscribe(const TaskID &task_id, const gcs::StatusCallback &done) { + Status AsyncUnsubscribe(const TaskID &task_id) { subscribed_tasks_.erase(task_id); return ray::Status::OK(); } diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index 874f59e58..ae488864a 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -237,8 +237,7 @@ void ReconstructionPolicy::Cancel(const ObjectID &object_id) { if (it->second.created_objects.empty()) { // Cancel notifications for the task lease if we were subscribed to them. if (it->second.subscribed) { - RAY_CHECK_OK( - gcs_client_->Tasks().AsyncUnsubscribeTaskLease(task_id, /*done*/ nullptr)); + RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribeTaskLease(task_id)); } listening_tasks_.erase(it); } diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index 2e7edb398..4899f3443 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -128,8 +128,7 @@ class MockTaskInfoAccessor : public gcs::RedisTaskInfoAccessor { return ray::Status::OK(); } - Status AsyncUnsubscribeTaskLease(const TaskID &task_id, - const gcs::StatusCallback &done) override { + Status AsyncUnsubscribeTaskLease(const TaskID &task_id) override { subscribed_tasks_.erase(task_id); return ray::Status::OK(); }