diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 6ee5fa5ab..dd5a9b6d7 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -1,5 +1,5 @@ #include "ray/core_worker/actor_manager.h" -#include "ray/gcs/redis_accessor.h" +#include "ray/gcs/redis_actor_info_accessor.h" namespace ray { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 916265352..ed2abadd7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -206,7 +206,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, std::shared_ptr data = std::make_shared(); data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage()); - RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr)); + RAY_CHECK_OK(gcs_client_->raylet_task_table().Add(job_id, task_id, data, nullptr)); SetCurrentTaskId(task_id); } diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h deleted file mode 100644 index 2066583da..000000000 --- a/src/ray/gcs/accessor.h +++ /dev/null @@ -1,183 +0,0 @@ -#ifndef RAY_GCS_ACCESSOR_H -#define RAY_GCS_ACCESSOR_H - -#include "ray/common/id.h" -#include "ray/gcs/callback.h" -#include "ray/protobuf/gcs.pb.h" - -namespace ray { - -namespace gcs { - -/// \class ActorInfoAccessor -/// `ActorInfoAccessor` is a sub-interface of `GcsClient`. -/// This class includes all the methods that are related to accessing -/// actor information in the GCS. -class ActorInfoAccessor { - public: - virtual ~ActorInfoAccessor() = default; - - /// Get actor specification from GCS asynchronously. - /// - /// \param actor_id The ID of actor to look up in the GCS. - /// \param callback Callback that will be called after lookup finishes. - /// \return Status - virtual Status AsyncGet(const ActorID &actor_id, - const OptionalItemCallback &callback) = 0; - - /// Register an actor to GCS asynchronously. 
- /// - /// \param data_ptr The actor that will be registered to the GCS. - /// \param callback Callback that will be called after actor has been registered - /// to the GCS. - /// \return Status - virtual Status AsyncRegister(const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; - - /// Update dynamic states of actor in GCS asynchronously. - /// - /// \param actor_id ID of the actor to update. - /// \param data_ptr Data of the actor to update. - /// \param callback Callback that will be called after update finishes. - /// \return Status - /// TODO(micafan) Don't expose the whole `ActorTableData` and only allow - /// updating dynamic states. - virtual Status AsyncUpdate(const ActorID &actor_id, - const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; - - /// Subscribe to any register or update operations of actors. - /// - /// \param subscribe Callback that will be called each time when an actor is registered - /// or updated. - /// \param done Callback that will be called when subscription is complete and we - /// are ready to receive notification. - /// \return Status - virtual Status AsyncSubscribeAll( - const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; - - /// Subscribe to any update operations of an actor. - /// - /// \param actor_id The ID of actor to be subscribed to. - /// \param subscribe Callback that will be called each time when the actor is updated. - /// \param done Callback that will be called when subscription is complete. - /// \return Status - virtual Status AsyncSubscribe( - const ActorID &actor_id, - const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; - - /// Cancel subscription to an actor. - /// - /// \param actor_id The ID of the actor to be unsubscribed to. - /// \param done Callback that will be called when unsubscribe is complete. 
- /// \return Status - virtual Status AsyncUnsubscribe(const ActorID &actor_id, - const StatusCallback &done) = 0; - - protected: - ActorInfoAccessor() = default; -}; - -/// \class JobInfoAccessor -/// `JobInfoAccessor` is a sub-interface of `GcsClient`. -/// This class includes all the methods that are related to accessing -/// job information in the GCS. -class JobInfoAccessor { - public: - virtual ~JobInfoAccessor() = default; - - /// Add a job to GCS asynchronously. - /// - /// \param data_ptr The job that will be add to GCS. - /// \param callback Callback that will be called after job has been added - /// to GCS. - /// \return Status - virtual Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; - - /// Mark job as finished in GCS asynchronously. - /// - /// \param job_id ID of the job that will be make finished to GCS. - /// \param callback Callback that will be called after update finished. - /// \return Status - virtual Status AsyncMarkFinished(const JobID &job_id, - const StatusCallback &callback) = 0; - - /// Subscribe to finished jobs. - /// - /// \param subscribe Callback that will be called each time when a job finishes. - /// \param done Callback that will be called when subscription is complete. - /// \return Status - virtual Status AsyncSubscribeToFinishedJobs( - const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; - - protected: - JobInfoAccessor() = default; -}; - -/// \class TaskInfoAccessor -/// `TaskInfoAccessor` is a sub-interface of `GcsClient`. -/// This class includes all the methods that are related to accessing -/// task information in the GCS. -class TaskInfoAccessor { - public: - virtual ~TaskInfoAccessor() {} - - /// Add a task to GCS asynchronously. - /// - /// \param data_ptr The task that will be added to GCS. - /// \param callback Callback that will be called after task has been added - /// to GCS. 
- /// \return Status - virtual Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; - - /// Get task information from GCS asynchronously. - /// - /// \param task_id The ID of the task to look up in GCS. - /// \param callback Callback that is called after lookup finished. - /// \return Status - virtual Status AsyncGet(const TaskID &task_id, - const OptionalItemCallback &callback) = 0; - - /// Delete tasks from GCS asynchronously. - /// - /// \param task_ids The vector of IDs to delete from GCS. - /// \param callback Callback that is called after delete finished. - /// \return Status - // TODO(micafan) Will support callback of batch deletion in the future. - // Currently this callback will never be called. - virtual Status AsyncDelete(const std::vector &task_ids, - const StatusCallback &callback) = 0; - - /// Subscribe asynchronously to the event that the given task is added in GCS. - /// - /// \param task_id The ID of the task to be subscribed to. - /// \param subscribe Callback that will be called each time when the task is updated. - /// \param done Callback that will be called when subscription is complete. - /// \return Status - virtual Status AsyncSubscribe( - const TaskID &task_id, - const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; - - /// Cancel subscription to a task asynchronously. - /// This method is for node only (core worker shouldn't use this method). - /// - /// \param task_id The ID of the task to be unsubscribed to. - /// \param done Callback that will be called when unsubscribe is complete. 
- /// \return Status - virtual Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done) = 0; - - protected: - TaskInfoAccessor() = default; -}; - -} // namespace gcs - -} // namespace ray - -#endif // RAY_GCS_ACCESSOR_H \ No newline at end of file diff --git a/src/ray/gcs/actor_info_accessor.h b/src/ray/gcs/actor_info_accessor.h new file mode 100644 index 000000000..4a3a3109c --- /dev/null +++ b/src/ray/gcs/actor_info_accessor.h @@ -0,0 +1,87 @@ +#ifndef RAY_GCS_ACTOR_INFO_ACCESSOR_H +#define RAY_GCS_ACTOR_INFO_ACCESSOR_H + +#include "ray/common/id.h" +#include "ray/gcs/callback.h" +#include "ray/protobuf/gcs.pb.h" + +namespace ray { + +namespace gcs { + +/// \class ActorInfoAccessor +/// `ActorInfoAccessor` is a sub-interface of `GcsClient`. +/// This class includes all the methods that are related to accessing +/// actor information in the GCS. +class ActorInfoAccessor { + public: + virtual ~ActorInfoAccessor() = default; + + /// Get actor specification from GCS asynchronously. + /// + /// \param actor_id The ID of actor to look up in the GCS. + /// \param callback Callback that will be called after lookup finishes. + /// \return Status + virtual Status AsyncGet(const ActorID &actor_id, + const OptionalItemCallback &callback) = 0; + + /// Register an actor to GCS asynchronously. + /// + /// \param data_ptr The actor that will be registered to the GCS. + /// \param callback Callback that will be called after actor has been registered + /// to the GCS. + /// \return Status + virtual Status AsyncRegister(const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + /// Update dynamic states of actor in GCS asynchronously. + /// + /// \param actor_id ID of the actor to update. + /// \param data_ptr Data of the actor to update. + /// \param callback Callback that will be called after update finishes. + /// \return Status + /// TODO(micafan) Don't expose the whole `ActorTableData` and only allow + /// updating dynamic states. 
+ virtual Status AsyncUpdate(const ActorID &actor_id, + const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + /// Subscribe to any register or update operations of actors. + /// + /// \param subscribe Callback that will be called each time when an actor is registered + /// or updated. + /// \param done Callback that will be called when subscription is complete and we + /// are ready to receive notification. + /// \return Status + virtual Status AsyncSubscribeAll( + const SubscribeCallback &subscribe, + const StatusCallback &done) = 0; + + /// Subscribe to any update operations of an actor. + /// + /// \param actor_id The ID of actor to be subscribed to. + /// \param subscribe Callback that will be called each time when the actor is updated. + /// \param done Callback that will be called when subscription is complete. + /// \return Status + virtual Status AsyncSubscribe( + const ActorID &actor_id, + const SubscribeCallback &subscribe, + const StatusCallback &done) = 0; + + /// Cancel subscription to an actor. + /// + /// \param actor_id The ID of the actor to be unsubscribed to. + /// \param done Callback that will be called when unsubscribe is complete. + /// \return Status + virtual Status AsyncUnsubscribe(const ActorID &actor_id, + const StatusCallback &done) = 0; + + protected: + ActorInfoAccessor() = default; +}; + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_ACTOR_INFO_ACCESSOR_H diff --git a/src/ray/gcs/gcs_client.h b/src/ray/gcs/gcs_client.h index d71bdbbec..c5ebd8e35 100644 --- a/src/ray/gcs/gcs_client.h +++ b/src/ray/gcs/gcs_client.h @@ -6,7 +6,8 @@ #include #include #include "ray/common/status.h" -#include "ray/gcs/accessor.h" +#include "ray/gcs/actor_info_accessor.h" +#include "ray/gcs/job_info_accessor.h" #include "ray/util/logging.h" namespace ray { @@ -74,13 +75,6 @@ class GcsClient : public std::enable_shared_from_this { return *job_accessor_; } - /// Get the sub-interface for accessing task information in GCS. 
- /// This function is thread safe. - TaskInfoAccessor &Tasks() { - RAY_CHECK(task_accessor_ != nullptr); - return *task_accessor_; - } - protected: /// Constructor of GcsClient. /// @@ -94,7 +88,6 @@ class GcsClient : public std::enable_shared_from_this { std::unique_ptr actor_accessor_; std::unique_ptr job_accessor_; - std::unique_ptr task_accessor_; }; } // namespace gcs diff --git a/src/ray/gcs/job_info_accessor.h b/src/ray/gcs/job_info_accessor.h new file mode 100644 index 000000000..e24a582e9 --- /dev/null +++ b/src/ray/gcs/job_info_accessor.h @@ -0,0 +1,54 @@ +#ifndef RAY_GCS_JOB_INFO_ACCESSOR_H +#define RAY_GCS_JOB_INFO_ACCESSOR_H + +#include "ray/common/id.h" +#include "ray/gcs/callback.h" +#include "ray/protobuf/gcs.pb.h" + +namespace ray { + +namespace gcs { + +/// \class JobInfoAccessor +/// `JobInfoAccessor` is a sub-interface of `GcsClient`. +/// This class includes all the methods that are related to accessing +/// job information in the GCS. +class JobInfoAccessor { + public: + virtual ~JobInfoAccessor() = default; + + /// Add a job to GCS asynchronously. + /// + /// \param data_ptr The job that will be add to GCS. + /// \param callback Callback that will be called after job has been added + /// to GCS. + /// \return Status + virtual Status AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + /// Mark job as finished in GCS asynchronously. + /// + /// \param job_id ID of the job that will be make finished to GCS. + /// \param callback Callback that will be called after update finished. + /// \return Status + virtual Status AsyncMarkFinished(const JobID &job_id, + const StatusCallback &callback) = 0; + + /// Subscribe to finished jobs. + /// + /// \param subscribe Callback that will be called each time when a job finishes. + /// \param done Callback that will be called when subscription is complete. 
+ /// \return Status + virtual Status AsyncSubscribeToFinishedJobs( + const SubscribeCallback &subscribe, + const StatusCallback &done) = 0; + + protected: + JobInfoAccessor() = default; +}; + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_JOB_INFO_ACCESSOR_H diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h deleted file mode 100644 index 53b6668f7..000000000 --- a/src/ray/gcs/redis_accessor.h +++ /dev/null @@ -1,139 +0,0 @@ -#ifndef RAY_GCS_REDIS_ACCESSOR_H -#define RAY_GCS_REDIS_ACCESSOR_H - -#include "ray/common/id.h" -#include "ray/common/task/task_spec.h" -#include "ray/gcs/accessor.h" -#include "ray/gcs/callback.h" -#include "ray/gcs/subscription_executor.h" -#include "ray/gcs/tables.h" - -namespace ray { - -namespace gcs { - -class RedisGcsClient; - -std::shared_ptr CreateActorTableData( - const TaskSpecification &task_spec, const rpc::Address &address, - gcs::ActorTableData::ActorState state, uint64_t remaining_reconstructions); - -/// \class RedisActorInfoAccessor -/// `RedisActorInfoAccessor` is an implementation of `ActorInfoAccessor` -/// that uses Redis as the backend storage. 
-class RedisActorInfoAccessor : public ActorInfoAccessor { - public: - explicit RedisActorInfoAccessor(RedisGcsClient *client_impl); - - virtual ~RedisActorInfoAccessor() {} - - Status AsyncGet(const ActorID &actor_id, - const OptionalItemCallback &callback) override; - - Status AsyncRegister(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncUpdate(const ActorID &actor_id, - const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncSubscribeAll(const SubscribeCallback &subscribe, - const StatusCallback &done) override; - - Status AsyncSubscribe(const ActorID &actor_id, - const SubscribeCallback &subscribe, - const StatusCallback &done) override; - - Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done) override; - - private: - RedisGcsClient *client_impl_{nullptr}; - // Use a random ClientID for actor subscription. Because: - // If we use ClientID::Nil, GCS will still send all actors' updates to this GCS Client. - // Even we can filter out irrelevant updates, but there will be extra overhead. - // And because the new GCS Client will no longer hold the local ClientID, so we use - // random ClientID instead. - // TODO(micafan): Remove this random id, once GCS becomes a service. - ClientID node_id_{ClientID::FromRandom()}; - - typedef SubscriptionExecutor - ActorSubscriptionExecutor; - ActorSubscriptionExecutor actor_sub_executor_; -}; - -/// \class RedisJobInfoAccessor -/// RedisJobInfoAccessor is an implementation of `JobInfoAccessor` -/// that uses Redis as the backend storage. 
-class RedisJobInfoAccessor : public JobInfoAccessor { - public: - explicit RedisJobInfoAccessor(RedisGcsClient *client_impl); - - virtual ~RedisJobInfoAccessor() {} - - Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncMarkFinished(const JobID &job_id, const StatusCallback &callback) override; - - Status AsyncSubscribeToFinishedJobs( - const SubscribeCallback &subscribe, - const StatusCallback &done) override; - - private: - /// Append job information to GCS asynchronously. - /// - /// \param data_ptr The job information that will be appended to GCS. - /// \param callback Callback that will be called after append done. - /// \return Status - Status DoAsyncAppend(const std::shared_ptr &data_ptr, - const StatusCallback &callback); - - RedisGcsClient *client_impl_{nullptr}; - - typedef SubscriptionExecutor JobSubscriptionExecutor; - JobSubscriptionExecutor job_sub_executor_; -}; - -/// \class RedisTaskInfoAccessor -/// `RedisTaskInfoAccessor` is an implementation of `TaskInfoAccessor` -/// that uses Redis as the backend storage. -class RedisTaskInfoAccessor : public TaskInfoAccessor { - public: - explicit RedisTaskInfoAccessor(RedisGcsClient *client_impl); - - ~RedisTaskInfoAccessor() {} - - Status AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback); - - Status AsyncGet(const TaskID &task_id, - const OptionalItemCallback &callback); - - Status AsyncDelete(const std::vector &task_ids, const StatusCallback &callback); - - Status AsyncSubscribe(const TaskID &task_id, - const SubscribeCallback &subscribe, - const StatusCallback &done); - - Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done); - - private: - RedisGcsClient *client_impl_{nullptr}; - // Use a random ClientID for task subscription. Because: - // If we use ClientID::Nil, GCS will still send all tasks' updates to this GCS Client. 
- // Even we can filter out irrelevant updates, but there will be extra overhead. - // And because the new GCS Client will no longer hold the local ClientID, so we use - // random ClientID instead. - // TODO(micafan): Remove this random id, once GCS becomes a service. - ClientID subscribe_id_{ClientID::FromRandom()}; - - typedef SubscriptionExecutor - TaskSubscriptionExecutor; - TaskSubscriptionExecutor task_sub_executor_; -}; - -} // namespace gcs - -} // namespace ray - -#endif // RAY_GCS_REDIS_ACCESSOR_H \ No newline at end of file diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_actor_info_accessor.cc similarity index 56% rename from src/ray/gcs/redis_accessor.cc rename to src/ray/gcs/redis_actor_info_accessor.cc index 23de4551f..a093d702f 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_actor_info_accessor.cc @@ -1,6 +1,5 @@ -#include "ray/gcs/redis_accessor.h" +#include "ray/gcs/redis_actor_info_accessor.h" #include -#include "ray/gcs/pb_util.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/util/logging.h" @@ -44,10 +43,8 @@ Status RedisActorInfoAccessor::AsyncGet( boost::optional result; if (!data.empty()) { result = data.back(); - callback(Status::OK(), result); - } else { - callback(Status::Invalid("Actor not exist."), result); } + callback(Status::OK(), result); }; return client_impl_->actor_table().Lookup(JobID::Nil(), actor_id, on_done); @@ -135,100 +132,6 @@ Status RedisActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id, return actor_sub_executor_.AsyncUnsubscribe(node_id_, actor_id, done); } -RedisJobInfoAccessor::RedisJobInfoAccessor(RedisGcsClient *client_impl) - : client_impl_(client_impl), job_sub_executor_(client_impl->job_table()) {} - -Status RedisJobInfoAccessor::AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) { - return DoAsyncAppend(data_ptr, callback); -} - -Status RedisJobInfoAccessor::AsyncMarkFinished(const JobID &job_id, - const StatusCallback &callback) { - 
std::shared_ptr data_ptr = - CreateJobTableData(job_id, /*is_dead*/ true, /*time_stamp*/ std::time(nullptr), - /*node_manager_address*/ "", /*driver_pid*/ -1); - return DoAsyncAppend(data_ptr, callback); -} - -Status RedisJobInfoAccessor::DoAsyncAppend(const std::shared_ptr &data_ptr, - const StatusCallback &callback) { - JobTable::WriteCallback on_done = nullptr; - if (callback != nullptr) { - on_done = [callback](RedisGcsClient *client, const JobID &job_id, - const JobTableData &data) { callback(Status::OK()); }; - } - - JobID job_id = JobID::FromBinary(data_ptr->job_id()); - return client_impl_->job_table().Append(job_id, job_id, data_ptr, on_done); -} - -Status RedisJobInfoAccessor::AsyncSubscribeToFinishedJobs( - const SubscribeCallback &subscribe, const StatusCallback &done) { - RAY_CHECK(subscribe != nullptr); - auto on_subscribe = [subscribe](const JobID &job_id, const JobTableData &job_data) { - if (job_data.is_dead()) { - subscribe(job_id, job_data); - } - }; - return job_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done); -} - -RedisTaskInfoAccessor::RedisTaskInfoAccessor(RedisGcsClient *client_impl) - : client_impl_(client_impl), task_sub_executor_(client_impl->raylet_task_table()) {} - -Status RedisTaskInfoAccessor::AsyncAdd(const std::shared_ptr &data_ptr, - const StatusCallback &callback) { - raylet::TaskTable::WriteCallback on_done = nullptr; - if (callback != nullptr) { - on_done = [callback](RedisGcsClient *client, const TaskID &task_id, - const TaskTableData &data) { callback(Status::OK()); }; - } - - TaskID task_id = TaskID::FromBinary(data_ptr->task().task_spec().task_id()); - raylet::TaskTable &task_table = client_impl_->raylet_task_table(); - return task_table.Add(JobID::Nil(), task_id, data_ptr, on_done); -} - -Status RedisTaskInfoAccessor::AsyncGet( - const TaskID &task_id, const OptionalItemCallback &callback) { - RAY_CHECK(callback != nullptr); - auto on_success = [callback](RedisGcsClient *client, const TaskID &task_id, 
- const TaskTableData &data) { - boost::optional result(data); - callback(Status::OK(), result); - }; - - auto on_failure = [callback](RedisGcsClient *client, const TaskID &task_id) { - boost::optional result; - callback(Status::Invalid("Task not exist."), result); - }; - - raylet::TaskTable &task_table = client_impl_->raylet_task_table(); - return task_table.Lookup(JobID::Nil(), task_id, on_success, on_failure); -} - -Status RedisTaskInfoAccessor::AsyncDelete(const std::vector &task_ids, - const StatusCallback &callback) { - raylet::TaskTable &task_table = client_impl_->raylet_task_table(); - task_table.Delete(JobID::Nil(), task_ids); - // TODO(micafan) Always return OK here. - // Confirm if we need to handle the deletion failure and how to handle it. - return Status::OK(); -} - -Status RedisTaskInfoAccessor::AsyncSubscribe( - const TaskID &task_id, const SubscribeCallback &subscribe, - const StatusCallback &done) { - RAY_CHECK(subscribe != nullptr); - return task_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done); -} - -Status RedisTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id, - const StatusCallback &done) { - return task_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done); -} - } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/redis_actor_info_accessor.h b/src/ray/gcs/redis_actor_info_accessor.h new file mode 100644 index 000000000..a5c92ea85 --- /dev/null +++ b/src/ray/gcs/redis_actor_info_accessor.h @@ -0,0 +1,68 @@ +#ifndef RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H +#define RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H + +#include "ray/common/id.h" +#include "ray/common/task/task_spec.h" +#include "ray/gcs/actor_info_accessor.h" +#include "ray/gcs/callback.h" +#include "ray/gcs/subscription_executor.h" +#include "ray/gcs/tables.h" + +namespace ray { + +namespace gcs { + +std::shared_ptr CreateActorTableData( + const TaskSpecification &task_spec, const rpc::Address &address, + gcs::ActorTableData::ActorState state, uint64_t 
remaining_reconstructions); + +class RedisGcsClient; + +/// \class RedisActorInfoAccessor +/// `RedisActorInfoAccessor` is an implementation of `ActorInfoAccessor` +/// that uses Redis as the backend storage. +class RedisActorInfoAccessor : public ActorInfoAccessor { + public: + explicit RedisActorInfoAccessor(RedisGcsClient *client_impl); + + virtual ~RedisActorInfoAccessor() {} + + Status AsyncGet(const ActorID &actor_id, + const OptionalItemCallback &callback) override; + + Status AsyncRegister(const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; + + Status AsyncUpdate(const ActorID &actor_id, + const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; + + Status AsyncSubscribeAll(const SubscribeCallback &subscribe, + const StatusCallback &done) override; + + Status AsyncSubscribe(const ActorID &actor_id, + const SubscribeCallback &subscribe, + const StatusCallback &done) override; + + Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done) override; + + private: + RedisGcsClient *client_impl_{nullptr}; + // Use a random ClientID for actor subscription. Because: + // If we use ClientID::Nil, GCS will still send all actors' updates to this GCS Client. + // Even we can filter out irrelevant updates, but there will be extra overhead. + // And because the new GCS Client will no longer hold the local ClientID, so we use + // random ClientID instead. + // TODO(micafan): Remove this random id, once GCS becomes a service. 
+ ClientID node_id_{ClientID::FromRandom()}; + + typedef SubscriptionExecutor + ActorSubscriptionExecutor; + ActorSubscriptionExecutor actor_sub_executor_; +}; + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index 3f060e39f..af2a408dc 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -2,8 +2,9 @@ #include #include "ray/common/ray_config.h" -#include "ray/gcs/redis_accessor.h" +#include "ray/gcs/redis_actor_info_accessor.h" #include "ray/gcs/redis_context.h" +#include "ray/gcs/redis_job_info_accessor.h" static void GetRedisShards(redisContext *context, std::vector &addresses, std::vector &ports) { @@ -72,8 +73,13 @@ namespace ray { namespace gcs { -RedisGcsClient::RedisGcsClient(const GcsClientOptions &options) - : GcsClient(options), command_type_(CommandType::kRegular) {} +RedisGcsClient::RedisGcsClient(const GcsClientOptions &options) : GcsClient(options) { +#if RAY_USE_NEW_GCS + command_type_ = CommandType::kChain; +#else + command_type_ = CommandType::kRegular; +#endif +} RedisGcsClient::RedisGcsClient(const GcsClientOptions &options, CommandType command_type) : GcsClient(options), command_type_(command_type) {} @@ -144,7 +150,6 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { actor_accessor_.reset(new RedisActorInfoAccessor(this)); job_accessor_.reset(new RedisJobInfoAccessor(this)); - task_accessor_.reset(new RedisTaskInfoAccessor(this)); is_connected_ = true; diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index 7606ba83d..5eda50896 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -18,14 +18,12 @@ namespace gcs { class RedisContext; class RAY_EXPORT RedisGcsClient : public GcsClient { - // TODO(micafan) Will remove those friend class / method after we replace RedisGcsClient + // TODO(micafan) Will remove those 
friend class after we replace RedisGcsClient // with interface class GcsClient in raylet. friend class RedisActorInfoAccessor; friend class RedisJobInfoAccessor; - friend class RedisTaskInfoAccessor; friend class SubscriptionExecutorTest; friend class LogSubscribeTestHelper; - friend class TaskTableTestHelper; public: /// Constructor of RedisGcsClient. @@ -57,6 +55,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { // TODO: Some API for getting the error on the driver ObjectTable &object_table(); + raylet::TaskTable &raylet_task_table(); TaskReconstructionLog &task_reconstruction_log(); TaskLeaseTable &task_lease_table(); ClientTable &client_table(); @@ -97,8 +96,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { ActorTable &actor_table(); /// This method will be deprecated, use method Jobs() instead. JobTable &job_table(); - /// This method will be deprecated, use method Tasks() instead. - raylet::TaskTable &raylet_task_table(); // GCS command type. If CommandType::kChain, chain-replicated versions of the tables // might be used, if available. 
diff --git a/src/ray/gcs/redis_job_info_accessor.cc b/src/ray/gcs/redis_job_info_accessor.cc new file mode 100644 index 000000000..d7028c163 --- /dev/null +++ b/src/ray/gcs/redis_job_info_accessor.cc @@ -0,0 +1,50 @@ +#include "ray/gcs/redis_job_info_accessor.h" +#include "ray/gcs/pb_util.h" +#include "ray/gcs/redis_gcs_client.h" + +namespace ray { + +namespace gcs { + +RedisJobInfoAccessor::RedisJobInfoAccessor(RedisGcsClient *client_impl) + : client_impl_(client_impl), job_sub_executor_(client_impl->job_table()) {} + +Status RedisJobInfoAccessor::AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) { + return DoAsyncAppend(data_ptr, callback); +} + +Status RedisJobInfoAccessor::AsyncMarkFinished(const JobID &job_id, + const StatusCallback &callback) { + std::shared_ptr data_ptr = + CreateJobTableData(job_id, /*is_dead*/ true, /*time_stamp*/ std::time(nullptr), + /*node_manager_address*/ "", /*driver_pid*/ -1); + return DoAsyncAppend(data_ptr, callback); +} + +Status RedisJobInfoAccessor::DoAsyncAppend(const std::shared_ptr &data_ptr, + const StatusCallback &callback) { + JobTable::WriteCallback on_done = nullptr; + if (callback != nullptr) { + on_done = [callback](RedisGcsClient *client, const JobID &job_id, + const JobTableData &data) { callback(Status::OK()); }; + } + + JobID job_id = JobID::FromBinary(data_ptr->job_id()); + return client_impl_->job_table().Append(job_id, job_id, data_ptr, on_done); +} + +Status RedisJobInfoAccessor::AsyncSubscribeToFinishedJobs( + const SubscribeCallback &subscribe, const StatusCallback &done) { + RAY_CHECK(subscribe != nullptr); + auto on_subscribe = [subscribe](const JobID &job_id, const JobTableData &job_data) { + if (job_data.is_dead()) { + subscribe(job_id, job_data); + } + }; + return job_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done); +} + +} // namespace gcs + +} // namespace ray diff --git a/src/ray/gcs/redis_job_info_accessor.h b/src/ray/gcs/redis_job_info_accessor.h new 
file mode 100644 index 000000000..2fb8ebc75 --- /dev/null +++ b/src/ray/gcs/redis_job_info_accessor.h @@ -0,0 +1,53 @@ +#ifndef RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H +#define RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H + +#include "ray/common/id.h" +#include "ray/gcs/callback.h" +#include "ray/gcs/job_info_accessor.h" +#include "ray/gcs/subscription_executor.h" +#include "ray/gcs/tables.h" + +namespace ray { + +namespace gcs { + +class RedisGcsClient; + +/// \class RedisJobInfoAccessor +/// RedisJobInfoAccessor is an implementation of `JobInfoAccessor` +/// that uses Redis as the backend storage. +class RedisJobInfoAccessor : public JobInfoAccessor { + public: + explicit RedisJobInfoAccessor(RedisGcsClient *client_impl); + + virtual ~RedisJobInfoAccessor() {} + + Status AsyncAdd(const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; + + Status AsyncMarkFinished(const JobID &job_id, const StatusCallback &callback) override; + + Status AsyncSubscribeToFinishedJobs( + const SubscribeCallback &subscribe, + const StatusCallback &done) override; + + private: + /// Append job information to GCS asynchronously. + /// + /// \param data_ptr The job information that will be appended to GCS. + /// \param callback Callback that will be called after append done. 
+ /// \return Status + Status DoAsyncAppend(const std::shared_ptr &data_ptr, + const StatusCallback &callback); + + RedisGcsClient *client_impl_{nullptr}; + + typedef SubscriptionExecutor JobSubscriptionExecutor; + JobSubscriptionExecutor job_sub_executor_; +}; + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_REDIS_JOB_INFO_ACCESSOR_H diff --git a/src/ray/gcs/subscription_executor.cc b/src/ray/gcs/subscription_executor.cc index 8d5b60856..84f2b65b5 100644 --- a/src/ray/gcs/subscription_executor.cc +++ b/src/ray/gcs/subscription_executor.cc @@ -189,7 +189,6 @@ Status SubscriptionExecutor::AsyncUnsubscribe( template class SubscriptionExecutor; template class SubscriptionExecutor; template class SubscriptionExecutor; -template class SubscriptionExecutor; } // namespace gcs diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 1b357d494..ca7b8e7c5 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -306,13 +306,6 @@ Status Table::Subscribe(const JobID &job_id, const ClientID &client_id done); } -template -Status Table::Subscribe(const JobID &job_id, const ClientID &client_id, - const Callback &subscribe, - const SubscriptionCallback &done) { - return Subscribe(job_id, client_id, subscribe, /*failure*/ nullptr, done); -} - template std::string Table::DebugString() const { std::stringstream result; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index b3c52a372..d970e6263 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -311,9 +311,6 @@ class Table : private Log, using Log::RequestNotifications; using Log::CancelNotifications; - /// Expose this interface for use by subscription tools class SubscriptionExecutor. - /// In this way TaskTable() can also reuse class SubscriptionExecutor. - using Log::Subscribe; /// Add an entry to the table. This overwrites any existing data at the key. 
/// @@ -359,24 +356,6 @@ class Table : private Log, const Callback &subscribe, const FailureCallback &failure, const SubscriptionCallback &done); - /// Subscribe to any Add operations to this table. The caller may choose to - /// subscribe to all Adds, or to subscribe only to keys that it requests - /// notifications for. This may only be called once per Table instance. - /// - /// \param job_id The ID of the job. - /// \param client_id The type of update to listen to. If this is nil, then a - /// message for each Add to the table will be received. Else, only - /// messages for the given client will be received. In the latter - /// case, the client may request notifications on specific keys in the - /// table via `RequestNotifications`. - /// \param subscribe Callback that is called on each received message. If the - /// callback is called with an empty vector, then there was no data at the key. - /// \param done Callback that is called when subscription is complete and we - /// are ready to receive messages. 
- /// \return Status - Status Subscribe(const JobID &job_id, const ClientID &client_id, - const Callback &subscribe, const SubscriptionCallback &done); - void Delete(const JobID &job_id, const ID &id) { Log::Delete(job_id, id); } void Delete(const JobID &job_id, const std::vector &ids) { diff --git a/src/ray/gcs/test/accessor_test_base.h b/src/ray/gcs/test/accessor_test_base.h index 50563d7d1..0e9be39ec 100644 --- a/src/ray/gcs/test/accessor_test_base.h +++ b/src/ray/gcs/test/accessor_test_base.h @@ -7,7 +7,6 @@ #include #include #include "gtest/gtest.h" -#include "ray/gcs/redis_accessor.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/util/test_util.h" diff --git a/src/ray/gcs/test/redis_gcs_client_test.cc b/src/ray/gcs/test/redis_gcs_client_test.cc index 24f7aedfe..76dbdcbf3 100644 --- a/src/ray/gcs/test/redis_gcs_client_test.cc +++ b/src/ray/gcs/test/redis_gcs_client_test.cc @@ -87,278 +87,69 @@ class TestGcsWithChainAsio : public TestGcsWithAsio { TestGcsWithChainAsio() : TestGcsWithAsio(gcs::CommandType::kChain){}; }; -class TaskTableTestHelper { - public: - /// A helper function that creates a GCS `TaskTableData` object. - static std::shared_ptr CreateTaskTableData(const TaskID &task_id, - uint64_t num_returns = 0) { - auto data = std::make_shared(); - data->mutable_task()->mutable_task_spec()->set_task_id(task_id.Binary()); - data->mutable_task()->mutable_task_spec()->set_num_returns(num_returns); - return data; - } +/// A helper function that creates a GCS `TaskTableData` object. +std::shared_ptr CreateTaskTableData(const TaskID &task_id, + uint64_t num_returns = 0) { + auto data = std::make_shared(); + data->mutable_task()->mutable_task_spec()->set_task_id(task_id.Binary()); + data->mutable_task()->mutable_task_spec()->set_num_returns(num_returns); + return data; +} - /// A helper function that compare whether 2 `TaskTableData` objects are equal. - /// Note, this function only compares fields set by `CreateTaskTableData`. 
- static bool TaskTableDataEqual(const TaskTableData &data1, const TaskTableData &data2) { - const auto &spec1 = data1.task().task_spec(); - const auto &spec2 = data2.task().task_spec(); - return (spec1.task_id() == spec2.task_id() && - spec1.num_returns() == spec2.num_returns()); - } +/// A helper function that compares whether 2 `TaskTableData` objects are equal. +/// Note, this function only compares fields set by `CreateTaskTableData`. +bool TaskTableDataEqual(const TaskTableData &data1, const TaskTableData &data2) { + const auto &spec1 = data1.task().task_spec(); + const auto &spec2 = data2.task().task_spec(); + return (spec1.task_id() == spec2.task_id() && + spec1.num_returns() == spec2.num_returns()); +} - static void TestTableLookup(const JobID &job_id, - std::shared_ptr client) { - const auto task_id = RandomTaskId(); - const auto data = CreateTaskTableData(task_id); +void TestTableLookup(const JobID &job_id, std::shared_ptr client) { + const auto task_id = RandomTaskId(); + const auto data = CreateTaskTableData(task_id); - // Check that we added the correct task. - auto add_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id, - const TaskTableData &d) { - ASSERT_EQ(id, task_id); - ASSERT_TRUE(TaskTableDataEqual(*data, d)); - }; + // Check that we added the correct task. + auto add_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id, + const TaskTableData &d) { + ASSERT_EQ(id, task_id); + ASSERT_TRUE(TaskTableDataEqual(*data, d)); + }; - // Check that the lookup returns the added task. - auto lookup_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id, - const TaskTableData &d) { - ASSERT_EQ(id, task_id); - ASSERT_TRUE(TaskTableDataEqual(*data, d)); - test->Stop(); - }; + // Check that the lookup returns the added task. 
+ auto lookup_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id, + const TaskTableData &d) { + ASSERT_EQ(id, task_id); + ASSERT_TRUE(TaskTableDataEqual(*data, d)); + test->Stop(); + }; - // Check that the lookup does not return an empty entry. - auto failure_callback = [](gcs::RedisGcsClient *client, const TaskID &id) { - RAY_CHECK(false); - }; + // Check that the lookup does not return an empty entry. + auto failure_callback = [](gcs::RedisGcsClient *client, const TaskID &id) { + RAY_CHECK(false); + }; - // Add the task, then do a lookup. - RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback)); - RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback, - failure_callback)); - // Run the event loop. The loop will only stop if the Lookup callback is - // called (or an assertion failure). - test->Start(); - } - - static void TestTableLookupFailure(const JobID &job_id, - std::shared_ptr client) { - TaskID task_id = RandomTaskId(); - - // Check that the lookup does not return data. - auto lookup_callback = [](gcs::RedisGcsClient *client, const TaskID &id, - const TaskTableData &d) { RAY_CHECK(false); }; - - // Check that the lookup returns an empty entry. - auto failure_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id) { - ASSERT_EQ(id, task_id); - test->Stop(); - }; - - // Lookup the task. We have not done any writes, so the key should be empty. - RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback, - failure_callback)); - // Run the event loop. The loop will only stop if the failure callback is - // called (or an assertion failure). 
- test->Start(); - } - - static void TestDeleteKeysFromTable( - const JobID &job_id, std::shared_ptr client, - std::vector> &data_vector, bool stop_at_end) { - std::vector ids; - TaskID task_id; - for (auto &data : data_vector) { - task_id = RandomTaskId(); - ids.push_back(task_id); - // Check that we added the correct object entries. - auto add_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id, - const TaskTableData &d) { - ASSERT_EQ(id, task_id); - ASSERT_TRUE(TaskTableDataEqual(*data, d)); - test->IncrementNumCallbacks(); - }; - RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback)); - } - for (const auto &task_id : ids) { - auto task_lookup_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id, - const TaskTableData &data) { - ASSERT_EQ(id, task_id); - test->IncrementNumCallbacks(); - }; - RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, - task_lookup_callback, nullptr)); - } - if (ids.size() == 1) { - client->raylet_task_table().Delete(job_id, ids[0]); - } else { - client->raylet_task_table().Delete(job_id, ids); - } - auto expected_failure_callback = [](RedisGcsClient *client, const TaskID &id) { - ASSERT_TRUE(true); - test->IncrementNumCallbacks(); - }; - auto undesired_callback = [](gcs::RedisGcsClient *client, const TaskID &id, - const TaskTableData &data) { ASSERT_TRUE(false); }; - for (size_t i = 0; i < ids.size(); ++i) { - RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, undesired_callback, - expected_failure_callback)); - } - if (stop_at_end) { - auto stop_callback = [](RedisGcsClient *client, const TaskID &id) { test->Stop(); }; - RAY_CHECK_OK( - client->raylet_task_table().Lookup(job_id, ids[0], nullptr, stop_callback)); - } - } - - static void TestTableSubscribeId(const JobID &job_id, - std::shared_ptr client) { - size_t num_modifications = 3; - - // Add a table entry. - TaskID task_id1 = RandomTaskId(); - - // Add a table entry at a second key. 
- TaskID task_id2 = RandomTaskId(); - - // The callback for a notification from the table. This should only be - // received for keys that we requested notifications for. - auto notification_callback = [task_id2, num_modifications]( - gcs::RedisGcsClient *client, const TaskID &id, - const TaskTableData &data) { - // Check that we only get notifications for the requested key. - ASSERT_EQ(id, task_id2); - // Check that we get notifications in the same order as the writes. - ASSERT_TRUE( - TaskTableDataEqual(data, *CreateTaskTableData(task_id2, test->NumCallbacks()))); - test->IncrementNumCallbacks(); - if (test->NumCallbacks() == num_modifications) { - test->Stop(); - } - }; - - // The failure callback should be called once since both keys start as empty. - bool failure_notification_received = false; - auto failure_callback = [task_id2, &failure_notification_received]( - gcs::RedisGcsClient *client, const TaskID &id) { - ASSERT_EQ(id, task_id2); - // The failure notification should be the first notification received. - ASSERT_EQ(test->NumCallbacks(), 0); - failure_notification_received = true; - }; - - // The callback for subscription success. Once we've subscribed, request - // notifications for only one of the keys, then write to both keys. - auto subscribe_callback = [job_id, task_id1, task_id2, - num_modifications](gcs::RedisGcsClient *client) { - // Request notifications for one of the keys. - RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( - job_id, task_id2, client->client_table().GetLocalClientId(), nullptr)); - // Write both keys. We should only receive notifications for the key that - // we requested them for. 
- for (uint64_t i = 0; i < num_modifications; i++) { - auto data = CreateTaskTableData(task_id1, i); - RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data, nullptr)); - } - for (uint64_t i = 0; i < num_modifications; i++) { - auto data = CreateTaskTableData(task_id2, i); - RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data, nullptr)); - } - }; - - // Subscribe to notifications for this client. This allows us to request and - // receive notifications for specific keys. - RAY_CHECK_OK(client->raylet_task_table().Subscribe( - job_id, client->client_table().GetLocalClientId(), notification_callback, - failure_callback, subscribe_callback)); - // Run the event loop. The loop will only stop if the registered subscription - // callback is called for the requested key. - test->Start(); - // Check that the failure callback was called since the key was initially - // empty. - ASSERT_TRUE(failure_notification_received); - // Check that we received one notification callback for each write to the - // requested key. - ASSERT_EQ(test->NumCallbacks(), num_modifications); - } - - static void TestTableSubscribeCancel(const JobID &job_id, - std::shared_ptr client) { - // Add a table entry. - const auto task_id = RandomTaskId(); - const int num_modifications = 3; - const auto data = CreateTaskTableData(task_id, 0); - RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr)); - - // The failure callback should not be called since all keys are non-empty - // when notifications are requested. - auto failure_callback = [](gcs::RedisGcsClient *client, const TaskID &id) { - RAY_CHECK(false); - }; - - // The callback for a notification from the table. This should only be - // received for keys that we requested notifications for. 
- auto notification_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id, - const TaskTableData &data) { - ASSERT_EQ(id, task_id); - // Check that we only get notifications for the first and last writes, - // since notifications are canceled in between. - if (test->NumCallbacks() == 0) { - ASSERT_TRUE(TaskTableDataEqual(data, *CreateTaskTableData(task_id, 0))); - } else { - ASSERT_TRUE(TaskTableDataEqual( - data, *CreateTaskTableData(task_id, num_modifications - 1))); - } - test->IncrementNumCallbacks(); - if (test->NumCallbacks() == num_modifications - 1) { - test->Stop(); - } - }; - - // The callback for a notification from the table. This should only be - // received for keys that we requested notifications for. - auto subscribe_callback = [job_id, task_id](gcs::RedisGcsClient *client) { - // Request notifications, then cancel immediately. We should receive a - // notification for the current value at the key. - RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( - job_id, task_id, client->client_table().GetLocalClientId(), nullptr)); - RAY_CHECK_OK(client->raylet_task_table().CancelNotifications( - job_id, task_id, client->client_table().GetLocalClientId(), nullptr)); - // Write to the key. Since we canceled notifications, we should not receive - // a notification for these writes. - for (uint64_t i = 1; i < num_modifications; i++) { - auto data = CreateTaskTableData(task_id, i); - RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr)); - } - // Request notifications again. We should receive a notification for the - // current value at the key. - RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( - job_id, task_id, client->client_table().GetLocalClientId(), nullptr)); - }; - - // Subscribe to notifications for this client. This allows us to request and - // receive notifications for specific keys. 
- RAY_CHECK_OK(client->raylet_task_table().Subscribe( - job_id, client->client_table().GetLocalClientId(), notification_callback, - failure_callback, subscribe_callback)); - // Run the event loop. The loop will only stop if the registered subscription - // callback is called for the requested key. - test->Start(); - // Check that we received a notification callback for the first and least - // writes to the key, since notifications are canceled in between. - ASSERT_EQ(test->NumCallbacks(), 2); - } -}; + // Add the task, then do a lookup. + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback)); + RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback, + failure_callback)); + // Run the event loop. The loop will only stop if the Lookup callback is + // called (or an assertion failure). + test->Start(); +} // Convenient macro to test across {ae, asio} x {regular, chain} x {the tests}. // Undefined at the end. -#define TEST_TASK_TABLE_MACRO(FIXTURE, TEST) \ - TEST_F(FIXTURE, TEST) { \ - test = this; \ - TaskTableTestHelper::TEST(job_id_, client_); \ +#define TEST_MACRO(FIXTURE, TEST) \ + TEST_F(FIXTURE, TEST) { \ + test = this; \ + TEST(job_id_, client_); \ } -TEST_TASK_TABLE_MACRO(TestGcsWithAsio, TestTableLookup); +TEST_MACRO(TestGcsWithAsio, TestTableLookup); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAsio, TestTableLookup); +#endif void TestLogLookup(const JobID &job_id, std::shared_ptr client) { // Append some entries to the log at an object ID. @@ -405,7 +196,32 @@ TEST_F(TestGcsWithAsio, TestLogLookup) { TestLogLookup(job_id_, client_); } -TEST_TASK_TABLE_MACRO(TestGcsWithAsio, TestTableLookupFailure); +void TestTableLookupFailure(const JobID &job_id, + std::shared_ptr client) { + TaskID task_id = RandomTaskId(); + + // Check that the lookup does not return data. 
+ auto lookup_callback = [](gcs::RedisGcsClient *client, const TaskID &id, + const TaskTableData &d) { RAY_CHECK(false); }; + + // Check that the lookup returns an empty entry. + auto failure_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id) { + ASSERT_EQ(id, task_id); + test->Stop(); + }; + + // Lookup the task. We have not done any writes, so the key should be empty. + RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback, + failure_callback)); + // Run the event loop. The loop will only stop if the failure callback is + // called (or an assertion failure). + test->Start(); +} + +TEST_MACRO(TestGcsWithAsio, TestTableLookupFailure); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAsio, TestTableLookupFailure); +#endif void TestLogAppendAt(const JobID &job_id, std::shared_ptr client) { TaskID task_id = RandomTaskId(); @@ -579,6 +395,55 @@ void TestDeleteKeysFromLog( } } +void TestDeleteKeysFromTable(const JobID &job_id, + std::shared_ptr client, + std::vector> &data_vector, + bool stop_at_end) { + std::vector ids; + TaskID task_id; + for (auto &data : data_vector) { + task_id = RandomTaskId(); + ids.push_back(task_id); + // Check that we added the correct object entries. 
+ auto add_callback = [task_id, data](gcs::RedisGcsClient *client, const TaskID &id, + const TaskTableData &d) { + ASSERT_EQ(id, task_id); + ASSERT_TRUE(TaskTableDataEqual(*data, d)); + test->IncrementNumCallbacks(); + }; + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback)); + } + for (const auto &task_id : ids) { + auto task_lookup_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id, + const TaskTableData &data) { + ASSERT_EQ(id, task_id); + test->IncrementNumCallbacks(); + }; + RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, task_lookup_callback, + nullptr)); + } + if (ids.size() == 1) { + client->raylet_task_table().Delete(job_id, ids[0]); + } else { + client->raylet_task_table().Delete(job_id, ids); + } + auto expected_failure_callback = [](RedisGcsClient *client, const TaskID &id) { + ASSERT_TRUE(true); + test->IncrementNumCallbacks(); + }; + auto undesired_callback = [](gcs::RedisGcsClient *client, const TaskID &id, + const TaskTableData &data) { ASSERT_TRUE(false); }; + for (size_t i = 0; i < ids.size(); ++i) { + RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, undesired_callback, + expected_failure_callback)); + } + if (stop_at_end) { + auto stop_callback = [](RedisGcsClient *client, const TaskID &id) { test->Stop(); }; + RAY_CHECK_OK( + client->raylet_task_table().Lookup(job_id, ids[0], nullptr, stop_callback)); + } +} + void TestDeleteKeysFromSet(const JobID &job_id, std::shared_ptr client, std::vector> &data_vector) { @@ -658,21 +523,21 @@ void TestDeleteKeys(const JobID &job_id, std::shared_ptr cl std::vector> task_vector; auto AppendTaskData = [&task_vector](size_t add_count) { for (size_t i = 0; i < add_count; ++i) { - task_vector.push_back(TaskTableTestHelper::CreateTaskTableData(RandomTaskId())); + task_vector.push_back(CreateTaskTableData(RandomTaskId())); } }; AppendTaskData(1); ASSERT_EQ(task_vector.size(), 1); - TaskTableTestHelper::TestDeleteKeysFromTable(job_id, 
client, task_vector, false); + TestDeleteKeysFromTable(job_id, client, task_vector, false); AppendTaskData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2); ASSERT_GT(task_vector.size(), 1); ASSERT_LT(task_vector.size(), RayConfig::instance().maximum_gcs_deletion_batch_size()); - TaskTableTestHelper::TestDeleteKeysFromTable(job_id, client, task_vector, false); + TestDeleteKeysFromTable(job_id, client, task_vector, false); AppendTaskData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2); ASSERT_GT(task_vector.size(), RayConfig::instance().maximum_gcs_deletion_batch_size()); - TaskTableTestHelper::TestDeleteKeysFromTable(job_id, client, task_vector, true); + TestDeleteKeysFromTable(job_id, client, task_vector, true); test->Start(); ASSERT_GT(test->NumCallbacks(), @@ -976,7 +841,81 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeAll) { TestSetSubscribeAll(job_id_, client_); } -TEST_TASK_TABLE_MACRO(TestGcsWithAsio, TestTableSubscribeId); +void TestTableSubscribeId(const JobID &job_id, + std::shared_ptr client) { + size_t num_modifications = 3; + + // Add a table entry. + TaskID task_id1 = RandomTaskId(); + + // Add a table entry at a second key. + TaskID task_id2 = RandomTaskId(); + + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto notification_callback = [task_id2, num_modifications](gcs::RedisGcsClient *client, + const TaskID &id, + const TaskTableData &data) { + // Check that we only get notifications for the requested key. + ASSERT_EQ(id, task_id2); + // Check that we get notifications in the same order as the writes. + ASSERT_TRUE( + TaskTableDataEqual(data, *CreateTaskTableData(task_id2, test->NumCallbacks()))); + test->IncrementNumCallbacks(); + if (test->NumCallbacks() == num_modifications) { + test->Stop(); + } + }; + + // The failure callback should be called once since both keys start as empty. 
+ bool failure_notification_received = false; + auto failure_callback = [task_id2, &failure_notification_received]( + gcs::RedisGcsClient *client, const TaskID &id) { + ASSERT_EQ(id, task_id2); + // The failure notification should be the first notification received. + ASSERT_EQ(test->NumCallbacks(), 0); + failure_notification_received = true; + }; + + // The callback for subscription success. Once we've subscribed, request + // notifications for only one of the keys, then write to both keys. + auto subscribe_callback = [job_id, task_id1, task_id2, + num_modifications](gcs::RedisGcsClient *client) { + // Request notifications for one of the keys. + RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( + job_id, task_id2, client->client_table().GetLocalClientId(), nullptr)); + // Write both keys. We should only receive notifications for the key that + // we requested them for. + for (uint64_t i = 0; i < num_modifications; i++) { + auto data = CreateTaskTableData(task_id1, i); + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data, nullptr)); + } + for (uint64_t i = 0; i < num_modifications; i++) { + auto data = CreateTaskTableData(task_id2, i); + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data, nullptr)); + } + }; + + // Subscribe to notifications for this client. This allows us to request and + // receive notifications for specific keys. + RAY_CHECK_OK(client->raylet_task_table().Subscribe( + job_id, client->client_table().GetLocalClientId(), notification_callback, + failure_callback, subscribe_callback)); + // Run the event loop. The loop will only stop if the registered subscription + // callback is called for the requested key. + test->Start(); + // Check that the failure callback was called since the key was initially + // empty. + ASSERT_TRUE(failure_notification_received); + // Check that we received one notification callback for each write to the + // requested key. 
+ ASSERT_EQ(test->NumCallbacks(), num_modifications); +} + +TEST_MACRO(TestGcsWithAsio, TestTableSubscribeId); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeId); +#endif TEST_F(TestGcsWithAsio, TestLogSubscribeId) { test = this; @@ -1059,7 +998,77 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeId) { TestSetSubscribeId(job_id_, client_); } -TEST_TASK_TABLE_MACRO(TestGcsWithAsio, TestTableSubscribeCancel); +void TestTableSubscribeCancel(const JobID &job_id, + std::shared_ptr client) { + // Add a table entry. + const auto task_id = RandomTaskId(); + const int num_modifications = 3; + const auto data = CreateTaskTableData(task_id, 0); + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr)); + + // The failure callback should not be called since all keys are non-empty + // when notifications are requested. + auto failure_callback = [](gcs::RedisGcsClient *client, const TaskID &id) { + RAY_CHECK(false); + }; + + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto notification_callback = [task_id](gcs::RedisGcsClient *client, const TaskID &id, + const TaskTableData &data) { + ASSERT_EQ(id, task_id); + // Check that we only get notifications for the first and last writes, + // since notifications are canceled in between. + if (test->NumCallbacks() == 0) { + ASSERT_TRUE(TaskTableDataEqual(data, *CreateTaskTableData(task_id, 0))); + } else { + ASSERT_TRUE( + TaskTableDataEqual(data, *CreateTaskTableData(task_id, num_modifications - 1))); + } + test->IncrementNumCallbacks(); + if (test->NumCallbacks() == num_modifications - 1) { + test->Stop(); + } + }; + + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto subscribe_callback = [job_id, task_id](gcs::RedisGcsClient *client) { + // Request notifications, then cancel immediately. 
We should receive a + // notification for the current value at the key. + RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( + job_id, task_id, client->client_table().GetLocalClientId(), nullptr)); + RAY_CHECK_OK(client->raylet_task_table().CancelNotifications( + job_id, task_id, client->client_table().GetLocalClientId(), nullptr)); + // Write to the key. Since we canceled notifications, we should not receive + // a notification for these writes. + for (uint64_t i = 1; i < num_modifications; i++) { + auto data = CreateTaskTableData(task_id, i); + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr)); + } + // Request notifications again. We should receive a notification for the + // current value at the key. + RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( + job_id, task_id, client->client_table().GetLocalClientId(), nullptr)); + }; + + // Subscribe to notifications for this client. This allows us to request and + // receive notifications for specific keys. + RAY_CHECK_OK(client->raylet_task_table().Subscribe( + job_id, client->client_table().GetLocalClientId(), notification_callback, + failure_callback, subscribe_callback)); + // Run the event loop. The loop will only stop if the registered subscription + // callback is called for the requested key. + test->Start(); + // Check that we received a notification callback for the first and last + // writes to the key, since notifications are canceled in between. 
+ ASSERT_EQ(test->NumCallbacks(), 2); +} + +TEST_MACRO(TestGcsWithAsio, TestTableSubscribeCancel); +#if RAY_USE_NEW_GCS +TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeCancel); +#endif TEST_F(TestGcsWithAsio, TestLogSubscribeCancel) { test = this; @@ -1422,7 +1431,7 @@ TEST_F(TestGcsWithAsio, TestHashTable) { TestHashTable(job_id_, client_); } -#undef TEST_TASK_TABLE_MACRO +#undef TEST_MACRO } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/test/redis_job_info_accessor_test.cc b/src/ray/gcs/test/redis_job_info_accessor_test.cc index 68f43f506..d93f7c40d 100644 --- a/src/ray/gcs/test/redis_job_info_accessor_test.cc +++ b/src/ray/gcs/test/redis_job_info_accessor_test.cc @@ -1,3 +1,4 @@ +#include "ray/gcs/redis_job_info_accessor.h" #include #include "gtest/gtest.h" #include "ray/gcs/pb_util.h" diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index b7ed3e62f..863761440 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -1,8 +1,8 @@ #include "lineage_cache.h" -#include -#include "ray/gcs/redis_gcs_client.h" #include "ray/stats/stats.h" +#include + namespace ray { namespace raylet { @@ -152,15 +152,16 @@ const std::unordered_set &Lineage::GetChildren(const TaskID &task_id) co } } -LineageCache::LineageCache(std::shared_ptr gcs_client, +LineageCache::LineageCache(const ClientID &client_id, + gcs::TableInterface &task_storage, + gcs::PubsubInterface &task_pubsub, uint64_t max_lineage_size) - : gcs_client_(gcs_client) {} + : client_id_(client_id), task_storage_(task_storage), task_pubsub_(task_pubsub) {} /// A helper function to add some uncommitted lineage to the local cache. 
void LineageCache::AddUncommittedLineage(const TaskID &task_id, const Lineage &uncommitted_lineage) { - RAY_LOG(DEBUG) << "Adding uncommitted task " << task_id << " on " - << gcs_client_->client_table().GetLocalClientId(); + RAY_LOG(DEBUG) << "Adding uncommitted task " << task_id << " on " << client_id_; // If the entry is not found in the lineage to merge, then we stop since // there is nothing to copy into the merged lineage. auto entry = uncommitted_lineage.GetEntry(task_id); @@ -191,8 +192,7 @@ bool LineageCache::CommitTask(const Task &task) { return true; } const TaskID task_id = task.GetTaskSpecification().TaskId(); - RAY_LOG(DEBUG) << "Committing task " << task_id << " on " - << gcs_client_->client_table().GetLocalClientId(); + RAY_LOG(DEBUG) << "Committing task " << task_id << " on " << client_id_; if (lineage_.SetEntry(task, GcsStatus::UNCOMMITTED) || lineage_.GetEntry(task_id)->GetStatus() == GcsStatus::UNCOMMITTED) { @@ -275,17 +275,17 @@ void LineageCache::FlushTask(const TaskID &task_id) { RAY_CHECK(entry); RAY_CHECK(entry->GetStatus() < GcsStatus::COMMITTING); - auto task_callback = [this, task_id](Status status) { - RAY_CHECK(status.ok()); - HandleEntryCommitted(task_id); - }; + gcs::raylet::TaskTable::WriteCallback task_callback = + [this](ray::gcs::RedisGcsClient *client, const TaskID &id, + const TaskTableData &data) { HandleEntryCommitted(id); }; auto task = lineage_.GetEntry(task_id); auto task_data = std::make_shared(); task_data->mutable_task()->mutable_task_spec()->CopyFrom( task->TaskData().GetTaskSpecification().GetMessage()); task_data->mutable_task()->mutable_task_execution_spec()->CopyFrom( task->TaskData().GetTaskExecutionSpec().GetMessage()); - RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(task_data, task_callback)); + RAY_CHECK_OK(task_storage_.Add(JobID(task->TaskData().GetTaskSpecification().JobId()), + task_id, task_data, task_callback)); // We successfully wrote the task, so mark it as committing. 
// TODO(swang): Use a batched interface and write with all object entries. @@ -296,12 +296,10 @@ bool LineageCache::SubscribeTask(const TaskID &task_id) { auto inserted = subscribed_tasks_.insert(task_id); bool unsubscribed = inserted.second; if (unsubscribed) { - auto subscribe = [this](const TaskID &task_id, const TaskTableData) { - HandleEntryCommitted(task_id); - }; - // Subscribe to the task. - RAY_CHECK_OK(gcs_client_->Tasks().AsyncSubscribe(task_id, subscribe, - /*done*/ nullptr)); + // Request notifications for the task if we haven't already requested + // notifications for it. + RAY_CHECK_OK(task_pubsub_.RequestNotifications(JobID::Nil(), task_id, client_id_, + /*done*/ nullptr)); } // Return whether we were previously unsubscribed to this task and are now // subscribed. @@ -312,8 +310,10 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) { auto it = subscribed_tasks_.find(task_id); bool subscribed = (it != subscribed_tasks_.end()); if (subscribed) { - // Cancel subscribe to the task. - RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id, /*done*/ nullptr)); + // Cancel notifications for the task if we previously requested + // notifications for it. + RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::Nil(), task_id, client_id_, + /*done*/ nullptr)); subscribed_tasks_.erase(it); } // Return whether we were previously subscribed to this task and are now @@ -339,8 +339,7 @@ void LineageCache::EvictTask(const TaskID &task_id) { } // Evict the task. - RAY_LOG(DEBUG) << "Evicting task " << task_id << " on " - << gcs_client_->client_table().GetLocalClientId(); + RAY_LOG(DEBUG) << "Evicting task " << task_id << " on " << client_id_; lineage_.PopEntry(task_id); // Try to evict the children of the evict task. These are the tasks that have // a dependency on the evicted task. 
diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 7c14c6255..b41e69278 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -12,7 +12,7 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/common/task/task.h" -#include "ray/gcs/redis_gcs_client.h" +#include "ray/gcs/tables.h" namespace ray { @@ -209,8 +209,9 @@ class LineageCache { public: /// Create a lineage cache for the given task storage system. /// TODO(swang): Pass in the policy (interface?). - LineageCache(std::shared_ptr gcs_client, - uint64_t max_lineage_size); + LineageCache(const ClientID &client_id, + gcs::TableInterface &task_storage, + gcs::PubsubInterface &task_pubsub, uint64_t max_lineage_size); /// Asynchronously commit a task to the GCS. /// @@ -302,13 +303,19 @@ class LineageCache { /// was successful (whether we were subscribed). bool UnsubscribeTask(const TaskID &task_id); - /// A client connection to the GCS. - std::shared_ptr gcs_client_; + /// The client ID, used to request notifications for specific tasks. + /// TODO(swang): Move the ClientID into the generic Table implementation. + ClientID client_id_; + /// The durable storage system for task information. + gcs::TableInterface &task_storage_; + /// The pubsub storage system for task information. This can be used to + /// request notifications for the commit of a task entry. + gcs::PubsubInterface &task_pubsub_; /// All tasks and objects that we are responsible for writing back to the /// GCS, and the tasks and objects in their lineage. Lineage lineage_; - /// The tasks that we've subscribed to. - /// We will receive a notification for these tasks on commit. + /// The tasks that we've subscribed to notifications for from the pubsub + /// storage system. We will receive a notification for these tasks on commit. 
std::unordered_set subscribed_tasks_; }; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 81e87577d..8d076b49a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -102,7 +102,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, gcs_client_->client_table().GetLocalClientId(), RayConfig::instance().initial_reconstruction_timeout_milliseconds(), gcs_client_->task_lease_table()), - lineage_cache_(gcs_client_, config.max_lineage_size), + lineage_cache_(gcs_client_->client_table().GetLocalClientId(), + gcs_client_->raylet_task_table(), gcs_client_->raylet_task_table(), + config.max_lineage_size), actor_registry_(), node_manager_server_("NodeManager", config.node_manager_port), node_manager_service_(io_service, *this), @@ -138,6 +140,18 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, ray::Status NodeManager::RegisterGcs() { object_manager_.RegisterGcs(); + // Subscribe to task entry commits in the GCS. These notifications are + // forwarded to the lineage cache, which requests notifications about tasks + // that were executed remotely. 
+ const auto task_committed_callback = [this](gcs::RedisGcsClient *client, + const TaskID &task_id, + const TaskTableData &task_data) { + lineage_cache_.HandleEntryCommitted(task_id); + }; + RAY_RETURN_NOT_OK(gcs_client_->raylet_task_table().Subscribe( + JobID::Nil(), gcs_client_->client_table().GetLocalClientId(), + task_committed_callback, nullptr, nullptr)); + const auto task_lease_notification_callback = [this](gcs::RedisGcsClient *client, const TaskID &task_id, const TaskLeaseData &task_lease) { @@ -970,7 +984,7 @@ void NodeManager::ProcessClientMessage( for (const auto &object_id : object_ids) { creating_task_ids.push_back(object_id.TaskId()); } - RAY_CHECK_OK(gcs_client_->Tasks().AsyncDelete(creating_task_ids, nullptr)); + gcs_client_->raylet_task_table().Delete(JobID::Nil(), creating_task_ids); } } break; case protocol::MessageType::PrepareActorCheckpointRequest: { @@ -2423,26 +2437,27 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { auto parent_task_id = task_spec.ParentTaskId(); int port = worker.Port(); RAY_CHECK_OK( - gcs_client_->Tasks().AsyncGet( - parent_task_id, - /*callback=*/ - [this, task_spec, resumed_from_checkpoint, port, parent_task_id]( - Status status, const boost::optional &parent_task_data) { - RAY_CHECK(status.ok()); - if (parent_task_data) { - // The task was in the GCS task table. Use the stored task spec to - // get the parent actor id. 
- Task parent_task(parent_task_data->task()); - ActorID parent_actor_id = ActorID::Nil(); - if (parent_task.GetTaskSpecification().IsActorCreationTask()) { - parent_actor_id = parent_task.GetTaskSpecification().ActorCreationId(); - } else if (parent_task.GetTaskSpecification().IsActorTask()) { - parent_actor_id = parent_task.GetTaskSpecification().ActorId(); - } - FinishAssignedActorCreationTask(parent_actor_id, task_spec, - resumed_from_checkpoint, port); - return; + gcs_client_->raylet_task_table().Lookup( + JobID::Nil(), parent_task_id, + /*success_callback=*/ + [this, task_spec, resumed_from_checkpoint, port]( + ray::gcs::RedisGcsClient *client, const TaskID &parent_task_id, + const TaskTableData &parent_task_data) { + // The task was in the GCS task table. Use the stored task spec to + // get the parent actor id. + Task parent_task(parent_task_data.task()); + ActorID parent_actor_id = ActorID::Nil(); + if (parent_task.GetTaskSpecification().IsActorCreationTask()) { + parent_actor_id = parent_task.GetTaskSpecification().ActorCreationId(); + } else if (parent_task.GetTaskSpecification().IsActorTask()) { + parent_actor_id = parent_task.GetTaskSpecification().ActorId(); } + FinishAssignedActorCreationTask(parent_actor_id, task_spec, + resumed_from_checkpoint, port); + }, + /*failure_callback=*/ + [this, task_spec, resumed_from_checkpoint, port]( + ray::gcs::RedisGcsClient *client, const TaskID &parent_task_id) { // The parent task was not in the GCS task table. It should most likely be // in the lineage cache. ActorID parent_actor_id = ActorID::Nil(); @@ -2559,18 +2574,18 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id void NodeManager::HandleTaskReconstruction(const TaskID &task_id, const ObjectID &required_object_id) { // Retrieve the task spec in order to re-execute the task. 
- RAY_CHECK_OK(gcs_client_->Tasks().AsyncGet( - task_id, - /*callback=*/ - [this, required_object_id, task_id]( - Status status, const boost::optional &task_data) { - RAY_CHECK(status.ok()); - if (task_data) { - // The task was in the GCS task table. Use the stored task spec to - // re-execute the task. - ResubmitTask(Task(task_data->task()), required_object_id); - return; - } + RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( + JobID::Nil(), task_id, + /*success_callback=*/ + [this, required_object_id](ray::gcs::RedisGcsClient *client, const TaskID &task_id, + const TaskTableData &task_data) { + // The task was in the GCS task table. Use the stored task spec to + // re-execute the task. + ResubmitTask(Task(task_data.task()), required_object_id); + }, + /*failure_callback=*/ + [this, required_object_id](ray::gcs::RedisGcsClient *client, + const TaskID &task_id) { // The task was not in the GCS task table. It must therefore be in the // lineage cache. if (lineage_cache_.ContainsTask(task_id)) {