From c57dcc82d16c3eb4d2a28dd0dd57408a453591ee Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 12 Dec 2019 19:50:51 -0800 Subject: [PATCH] Port actor creation to use direct calls (#6375) --- python/ray/_raylet.pyx | 4 +- python/ray/actor.py | 7 +- src/ray/common/task/task_spec.cc | 13 +- src/ray/core_worker/actor_handle.h | 11 ++ src/ray/core_worker/actor_manager.cc | 21 ++++ src/ray/core_worker/actor_manager.h | 40 ++++++ src/ray/core_worker/core_worker.cc | 78 +++++++----- src/ray/core_worker/core_worker.h | 10 ++ src/ray/core_worker/task_manager.cc | 56 ++++++++- src/ray/core_worker/task_manager.h | 36 ++++-- src/ray/core_worker/test/core_worker_test.cc | 20 +-- .../test/direct_actor_transport_test.cc | 29 +++-- .../test/direct_task_transport_test.cc | 7 +- src/ray/core_worker/test/task_manager_test.cc | 21 +++- .../transport/direct_actor_transport.cc | 117 +++++++++--------- .../transport/direct_actor_transport.h | 41 +++--- .../transport/direct_task_transport.cc | 29 +++-- src/ray/gcs/redis_actor_info_accessor.cc | 25 ++++ src/ray/gcs/redis_actor_info_accessor.h | 5 + src/ray/gcs/redis_gcs_client.cc | 3 + src/ray/gcs/redis_gcs_client.h | 5 + src/ray/gcs/subscription_executor.cc | 1 + src/ray/gcs/tables.h | 10 ++ src/ray/protobuf/core_worker.proto | 2 + src/ray/protobuf/gcs.proto | 10 +- src/ray/raylet/node_manager.cc | 5 +- src/ray/rpc/worker/core_worker_client.h | 16 ++- 27 files changed, 447 insertions(+), 175 deletions(-) create mode 100644 src/ray/core_worker/actor_manager.cc create mode 100644 src/ray/core_worker/actor_manager.h diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 70be8eef3..9118845f0 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -718,7 +718,9 @@ cdef CRayStatus task_execution_handler( traceback_str, job_id=None) sys.exit(1) - except SystemExit: + except SystemExit as e: + if not hasattr(e, "is_ray_terminate"): + logger.exception("SystemExit was raised from the worker") # Tell the core worker to exit as soon as the result objects # are processed. return CRayStatus.SystemExit() diff --git a/python/ray/actor.py b/python/ray/actor.py index b49c25cbf..b7b58d9be 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -6,7 +6,6 @@ import copy import inspect import logging import six -import sys import weakref from abc import ABCMeta, abstractmethod @@ -815,7 +814,11 @@ def exit_actor(): ray.disconnect() # Disconnect global state from GCS. ray.state.state.disconnect() - sys.exit(0) + # Set a flag to indicate this is an intentional actor exit. This + # reduces log verbosity. + exit = SystemExit(0) + exit.is_ray_terminate = True + raise exit assert False, "This process should have terminated." else: raise Exception("exit_actor called on a non-actor worker.") diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index b81678fa2..ca31f37f8 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -53,12 +53,23 @@ void TaskSpecification::ComputeResources() { // Task specification getter methods. TaskID TaskSpecification::TaskId() const { + if (message_->task_id().empty() /* e.g., empty proto default */) { + return TaskID::Nil(); + } return TaskID::FromBinary(message_->task_id()); } -JobID TaskSpecification::JobId() const { return JobID::FromBinary(message_->job_id()); } +JobID TaskSpecification::JobId() const { + if (message_->job_id().empty() /* e.g., empty proto default */) { + return JobID::Nil(); + } + return JobID::FromBinary(message_->job_id()); +} TaskID TaskSpecification::ParentTaskId() const { + if (message_->parent_task_id().empty() /* e.g., empty proto default */) { + return TaskID::Nil(); + } return TaskID::FromBinary(message_->parent_task_id()); } diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index a8c7ead48..f2b6bd92c 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -8,6 +8,7 @@ #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/protobuf/core_worker.pb.h" +#include "ray/protobuf/gcs.pb.h" namespace ray { @@ -52,10 +53,20 @@ class ActorHandle { /// ActorHandle and reset them in this method. void Reset(); + // Mark the actor handle as dead. + void MarkDead() { state_ = rpc::ActorTableData::DEAD; } + + // Returns whether the actor is known to be dead. + bool IsDead() const { return state_ == rpc::ActorTableData::DEAD; } + private: // Protobuf-defined persistent state of the actor handle. const ray::rpc::ActorHandle inner_; + /// The actor's state (alive or dead). This defaults to ALIVE. Once marked + /// DEAD, the actor handle can never go back to being ALIVE. + rpc::ActorTableData::ActorState state_ = rpc::ActorTableData::ALIVE; + /// The unique id of the dummy object returned by the previous task. /// TODO: This can be removed once we schedule actor tasks by task counter /// only. diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc new file mode 100644 index 000000000..dd5a9b6d7 --- /dev/null +++ b/src/ray/core_worker/actor_manager.cc @@ -0,0 +1,21 @@ +#include "ray/core_worker/actor_manager.h" +#include "ray/gcs/redis_actor_info_accessor.h" + +namespace ray { + +void ActorManager::PublishCreatedActor(const TaskSpecification &actor_creation_task, + const rpc::Address &address) { + auto actor_id = actor_creation_task.ActorCreationId(); + auto data = gcs::CreateActorTableData(actor_creation_task, address, + gcs::ActorTableData::ALIVE, 0); + RAY_CHECK_OK(global_actor_table_.Append(JobID::Nil(), actor_id, data, nullptr)); +} + +void ActorManager::PublishTerminatedActor(const TaskSpecification &actor_creation_task) { + auto actor_id = actor_creation_task.ActorCreationId(); + auto data = gcs::CreateActorTableData(actor_creation_task, rpc::Address(), + gcs::ActorTableData::DEAD, 0); + RAY_CHECK_OK(global_actor_table_.Append(JobID::Nil(), actor_id, data, nullptr)); +} + +} // namespace ray diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h new file mode 100644 index 000000000..e5ca24b4b --- /dev/null +++ b/src/ray/core_worker/actor_manager.h @@ -0,0 +1,40 @@ +#ifndef RAY_CORE_WORKER_ACTOR_MANAGER_H +#define RAY_CORE_WORKER_ACTOR_MANAGER_H + +#include "ray/core_worker/actor_handle.h" +#include "ray/gcs/redis_gcs_client.h" + +namespace ray { + +// Interface for testing. +class ActorManagerInterface { + public: + virtual void PublishCreatedActor(const TaskSpecification &actor_creation_task, + const rpc::Address &address) = 0; + + virtual void PublishTerminatedActor(const TaskSpecification &actor_creation_task) = 0; + + virtual ~ActorManagerInterface() {} +}; + +/// Class to manage lifetimes of actors that we create (actor children). +class ActorManager : public ActorManagerInterface { + public: + ActorManager(gcs::DirectActorTable &global_actor_table) + : global_actor_table_(global_actor_table) {} + + /// Called when an actor creation task that we submitted finishes. + void PublishCreatedActor(const TaskSpecification &actor_creation_task, + const rpc::Address &address) override; + + /// Called when an actor that we own can no longer be restarted. + void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override; + + private: + /// Global database of actors. + gcs::DirectActorTable &global_actor_table_; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_ACTOR_MANAGER_H diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e520c8bc5..501e34832 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -102,6 +102,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, // Initialize gcs client. gcs_client_ = std::make_shared(gcs_options); RAY_CHECK_OK(gcs_client_->Connect(io_service_)); + direct_actor_table_subscriber_ = std::unique_ptr< + gcs::SubscriptionExecutor>( + new gcs::SubscriptionExecutor( + gcs_client_->direct_actor_table())); + + actor_manager_ = + std::unique_ptr(new ActorManager(gcs_client_->direct_actor_table())); // Initialize profiler. profiler_ = std::make_shared(worker_context_, node_ip_address, @@ -138,10 +145,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, WorkerID::FromBinary(worker_context_.GetWorkerID().Binary()), (worker_type_ == ray::WorkerType::WORKER), worker_context_.GetCurrentJobID(), language_, &local_raylet_id, core_worker_server_.GetPort())); - // Unfortunately the raylet client has to be constructed after the receivers. - if (direct_task_receiver_ != nullptr) { - direct_task_receiver_->Init(*local_raylet_client_); - } // Set our own address. RAY_CHECK(!local_raylet_id.IsNil()); @@ -177,8 +180,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, }, ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_)); - task_manager_.reset( - new TaskManager(memory_store_, [this](const TaskSpecification &spec) { + task_manager_.reset(new TaskManager( + memory_store_, actor_manager_, [this](const TaskSpecification &spec) { // Retry after a delay to emulate the existing Raylet reconstruction // behaviour. TODO(ekl) backoff exponentially. RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: " @@ -229,6 +232,10 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, memory_store_, task_manager_, local_raylet_id, RayConfig::instance().worker_lease_timeout_milliseconds())); future_resolver_.reset(new FutureResolver(memory_store_, client_factory)); + // Unfortunately the raylet client has to be constructed after the receivers. + if (direct_task_receiver_ != nullptr) { + direct_task_receiver_->Init(*local_raylet_client_, client_factory, rpc_address_); + } } CoreWorker::~CoreWorker() { @@ -275,7 +282,8 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { if (actor_id_.IsNil() && task_id.IsNil()) { absl::MutexLock lock(&actor_handles_mutex_); for (const auto &handle : actor_handles_) { - RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr)); + RAY_CHECK_OK(direct_actor_table_subscriber_->AsyncUnsubscribe( + gcs_client_->client_table().GetLocalClientId(), handle.first, nullptr)); } actor_handles_.clear(); } @@ -670,7 +678,9 @@ Status CoreWorker::CreateActor(const RayFunction &function, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, 1, actor_creation_options.resources, actor_creation_options.placement_resources, - TaskTransportType::RAYLET, &return_ids); + actor_creation_options.is_direct_call ? TaskTransportType::DIRECT + : TaskTransportType::RAYLET, + &return_ids); builder.SetActorCreationTaskSpec( actor_id, actor_creation_options.max_reconstructions, actor_creation_options.dynamic_worker_options, @@ -685,13 +695,14 @@ Status CoreWorker::CreateActor(const RayFunction &function, *return_actor_id = actor_id; TaskSpecification task_spec = builder.Build(); - PinObjectReferences(task_spec, TaskTransportType::RAYLET); - // TODO(ekl) if we moved actor creation to use direct call tasks, then we won't - // need to manually resolve direct call args here. - resolver_->ResolveDependencies(task_spec, [this, task_spec]() { - RAY_CHECK_OK(local_raylet_client_->SubmitTask(task_spec)); - }); - return Status::OK(); + if (actor_creation_options.is_direct_call) { + task_manager_->AddPendingTask(task_spec, actor_creation_options.max_reconstructions); + PinObjectReferences(task_spec, TaskTransportType::DIRECT); + return direct_task_submitter_->SubmitTask(task_spec); + } else { + PinObjectReferences(task_spec, TaskTransportType::RAYLET); + return local_raylet_client_->SubmitTask(task_spec); + } } Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function, @@ -731,7 +742,13 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f if (is_direct_call) { task_manager_->AddPendingTask(task_spec); PinObjectReferences(task_spec, TaskTransportType::DIRECT); - status = direct_actor_submitter_->SubmitTask(task_spec); + if (actor_handle->IsDead()) { + auto status = Status::IOError("sent task to dead actor"); + task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED, + &status); + } else { + status = direct_actor_submitter_->SubmitTask(task_spec); + } } else { PinObjectReferences(task_spec, TaskTransportType::RAYLET); RAY_CHECK_OK(local_raylet_client_->SubmitTask(task_spec)); @@ -765,34 +782,29 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { // Register a callback to handle actor notifications. auto actor_notification_callback = [this](const ActorID &actor_id, const gcs::ActorTableData &actor_data) { - if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) { - absl::MutexLock lock(&actor_handles_mutex_); - auto it = actor_handles_.find(actor_id); - RAY_CHECK(it != actor_handles_.end()); - if (it->second->IsDirectCallActor()) { - // We have to reset the actor handle since the next instance of the - // actor will not have the last sequence number that we sent. - // TODO: Remove the check for direct calls. We do not reset for the - // raylet codepath because it tries to replay all tasks since the - // last actor checkpoint. - it->second->Reset(); - } - } else if (actor_data.state() == gcs::ActorTableData::DEAD) { + RAY_CHECK(actor_data.state() != gcs::ActorTableData::RECONSTRUCTING); + if (actor_data.state() == gcs::ActorTableData::DEAD) { + direct_actor_submitter_->DisconnectActor(actor_id, true); + + ActorHandle *actor_handle = nullptr; + RAY_CHECK_OK(GetActorHandle(actor_id, &actor_handle)); + actor_handle->MarkDead(); // We cannot erase the actor handle here because clients can still // submit tasks to dead actors. This also means we defer unsubscription, // otherwise we crash when bulk unsubscribing all actor handles. + } else { + direct_actor_submitter_->ConnectActor(actor_id, actor_data.address()); } - direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data); - RAY_LOG(INFO) << "received notification on actor, state=" << static_cast(actor_data.state()) << ", actor_id: " << actor_id << ", ip address: " << actor_data.address().ip_address() << ", port: " << actor_data.address().port(); }; - RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( - actor_id, actor_notification_callback, nullptr)); + RAY_CHECK_OK(direct_actor_table_subscriber_->AsyncSubscribe( + gcs_client_->client_table().GetLocalClientId(), actor_id, + actor_notification_callback, nullptr)); } return inserted; } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 23e2b7485..a8d03b4f4 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -4,6 +4,7 @@ #include "absl/container/flat_hash_map.h" #include "ray/common/buffer.h" #include "ray/core_worker/actor_handle.h" +#include "ray/core_worker/actor_manager.h" #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/future_resolver.h" @@ -15,6 +16,7 @@ #include "ray/core_worker/transport/direct_task_transport.h" #include "ray/core_worker/transport/raylet_transport.h" #include "ray/gcs/redis_gcs_client.h" +#include "ray/gcs/subscription_executor.h" #include "ray/raylet/raylet_client.h" #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/worker/core_worker_client.h" @@ -537,6 +539,11 @@ class CoreWorker { // Client to the GCS shared by core worker interfaces. std::shared_ptr gcs_client_; + // Client to listen to direct actor events. + std::unique_ptr< + gcs::SubscriptionExecutor> + direct_actor_table_subscriber_; + // Client to the raylet shared by core worker interfaces. This needs to be a // shared_ptr for direct calls because we can lease multiple workers through // one client, and we need to keep the connection alive until we return all @@ -568,6 +575,9 @@ class CoreWorker { // Tracks the currently pending tasks. std::shared_ptr task_manager_; + // Interface for publishing actor creation. + std::shared_ptr actor_manager_; + // Interface to submit tasks directly to other actors. std::unique_ptr direct_actor_submitter_; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 9481b902c..1be4fc4ca 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -1,7 +1,14 @@ #include "ray/core_worker/task_manager.h" +#include "ray/util/util.h" namespace ray { +// Start throttling task failure logs once we hit this threshold. +const int64_t kTaskFailureThrottlingThreshold = 50; + +// Throttle task failure logs to once this interval. +const int64_t kTaskFailureLoggingFrequencyMillis = 5000; + void TaskManager::AddPendingTask(const TaskSpecification &spec, int max_retries) { RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId(); absl::MutexLock lock(&mu_); @@ -15,13 +22,16 @@ bool TaskManager::IsTaskPending(const TaskID &task_id) const { } void TaskManager::CompletePendingTask(const TaskID &task_id, - const rpc::PushTaskReply &reply) { + const rpc::PushTaskReply &reply, + const rpc::Address *actor_addr) { RAY_LOG(DEBUG) << "Completing task " << task_id; + TaskSpecification spec; { absl::MutexLock lock(&mu_); auto it = pending_tasks_.find(task_id); RAY_CHECK(it != pending_tasks_.end()) << "Tried to complete task that was not pending " << task_id; + spec = it->second.first; pending_tasks_.erase(it); } @@ -52,9 +62,15 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, in_memory_store_->Put(RayObject(data_buffer, metadata_buffer), object_id)); } } + + if (spec.IsActorCreationTask()) { + RAY_CHECK(actor_addr != nullptr); + actor_manager_->PublishCreatedActor(spec, *actor_addr); + } } -void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) { +void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + Status *status) { // Note that this might be the __ray_terminate__ task, so we don't log // loudly with ERROR here. RAY_LOG(DEBUG) << "Task " << task_id << " failed with error " @@ -83,20 +99,52 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ << ", attempting to resubmit."; retry_task_callback_(spec); } else { - MarkPendingTaskFailed(task_id, spec.NumReturns(), error_type); + // Throttled logging of task failure errors. + { + absl::MutexLock lock(&mu_); + auto debug_str = spec.DebugString(); + if (debug_str.find("__ray_terminate__") == std::string::npos && + (num_failure_logs_ < kTaskFailureThrottlingThreshold || + (current_time_ms() - last_log_time_ms_) > + kTaskFailureLoggingFrequencyMillis)) { + if (num_failure_logs_++ == kTaskFailureThrottlingThreshold) { + RAY_LOG(ERROR) << "Too many failure logs, throttling to once every " + << kTaskFailureLoggingFrequencyMillis << " millis."; + } + last_log_time_ms_ = current_time_ms(); + if (status != nullptr) { + RAY_LOG(ERROR) << "Task failed: " << *status << ": " << spec.DebugString(); + } else { + RAY_LOG(ERROR) << "Task failed: " << spec.DebugString(); + } + } + } + MarkPendingTaskFailed(task_id, spec, error_type); } } -void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, int64_t num_returns, +void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, + const TaskSpecification &spec, rpc::ErrorType error_type) { RAY_LOG(DEBUG) << "Treat task as failed. task_id: " << task_id << ", error_type: " << ErrorType_Name(error_type); + int64_t num_returns = spec.NumReturns(); for (int i = 0; i < num_returns; i++) { const auto object_id = ObjectID::ForTaskReturn( task_id, /*index=*/i + 1, /*transport_type=*/static_cast(TaskTransportType::DIRECT)); RAY_CHECK_OK(in_memory_store_->Put(RayObject(error_type), object_id)); } + if (spec.IsActorCreationTask()) { + actor_manager_->PublishTerminatedActor(spec); + } +} + +TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const { + absl::MutexLock lock(&mu_); + auto it = pending_tasks_.find(task_id); + RAY_CHECK(it != pending_tasks_.end()); + return it->second.first; } } // namespace ray diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 2f15fcf82..870a7b351 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -7,6 +7,7 @@ #include "ray/common/id.h" #include "ray/common/task/task.h" +#include "ray/core_worker/actor_manager.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/protobuf/core_worker.pb.h" #include "ray/protobuf/gcs.pb.h" @@ -15,10 +16,11 @@ namespace ray { class TaskFinisherInterface { public: - virtual void CompletePendingTask(const TaskID &task_id, - const rpc::PushTaskReply &reply) = 0; + virtual void CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply, + const rpc::Address *actor_addr) = 0; - virtual void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) = 0; + virtual void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + Status *status = nullptr) = 0; virtual ~TaskFinisherInterface() {} }; @@ -28,8 +30,11 @@ using RetryTaskCallback = std::function; class TaskManager : public TaskFinisherInterface { public: TaskManager(std::shared_ptr in_memory_store, + std::shared_ptr actor_manager, RetryTaskCallback retry_task_callback) - : in_memory_store_(in_memory_store), retry_task_callback_(retry_task_callback) {} + : in_memory_store_(in_memory_store), + actor_manager_(actor_manager), + retry_task_callback_(retry_task_callback) {} /// Add a task that is pending execution. /// @@ -49,29 +54,44 @@ class TaskManager : public TaskFinisherInterface { /// /// \param[in] task_id ID of the pending task. /// \param[in] reply Proto response to a direct actor or task call. + /// \param[in] actor_addr Address of the created actor, or nullptr. /// \return Void. - void CompletePendingTask(const TaskID &task_id, - const rpc::PushTaskReply &reply) override; + void CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply, + const rpc::Address *actor_addr) override; /// A pending task failed. This will either retry the task or mark the task /// as failed if there are no retries left. /// /// \param[in] task_id ID of the pending task. /// \param[in] error_type The type of the specific error. - void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) override; + /// \param[in] status Optional status message. + void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + Status *status = nullptr) override; + + /// Return the spec for a pending task. + TaskSpecification GetTaskSpec(const TaskID &task_id) const; private: /// Treat a pending task as failed. The lock should not be held when calling /// this method because it may trigger callbacks in this or other classes. - void MarkPendingTaskFailed(const TaskID &task_id, int64_t num_returns, + void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec, rpc::ErrorType error_type) LOCKS_EXCLUDED(mu_); /// Used to store task results. std::shared_ptr in_memory_store_; + // Interface for publishing actor creation. + std::shared_ptr actor_manager_; + /// Called when a task should be retried. const RetryTaskCallback retry_task_callback_; + // The number of task failures we have logged total. + int64_t num_failure_logs_ GUARDED_BY(mu_) = 0; + + // The last time we logged a task failure. + int64_t last_log_time_ms_ GUARDED_BY(mu_) = 0; + /// Protects below fields. mutable absl::Mutex mu_; diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 69a41302d..24ee7875a 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -993,16 +993,18 @@ TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodes) { TestActorTask(resources, true); } -TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) { - std::unordered_map resources; - TestActorReconstruction(resources, true); -} +// TODO(ekl) support reconstruction for direct call actors +// TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) { +// std::unordered_map resources; +// TestActorReconstruction(resources, true); +//} -TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) { - std::unordered_map resources; - resources.emplace("resource1", 1); - TestActorReconstruction(resources, true); -} +// TODO(ekl) support reconstruction for direct call actors +// TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) { +// std::unordered_map resources; +// resources.emplace("resource1", 1); +// TestActorReconstruction(resources, true); +//} TEST_F(SingleNodeTest, TestDirectActorTaskLocalFailure) { std::unordered_map resources; diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 0f37fa4ab..2ac27b521 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -41,8 +41,10 @@ class MockTaskFinisher : public TaskFinisherInterface { public: MockTaskFinisher() {} - MOCK_METHOD2(CompletePendingTask, void(const TaskID &, const rpc::PushTaskReply &)); - MOCK_METHOD2(PendingTaskFailed, void(const TaskID &task_id, rpc::ErrorType error_type)); + MOCK_METHOD3(CompletePendingTask, void(const TaskID &, const rpc::PushTaskReply &, + const rpc::Address *addr)); + MOCK_METHOD3(PendingTaskFailed, + void(const TaskID &task_id, rpc::ErrorType error_type, Status *status)); }; TaskSpecification CreateActorTaskHelper(ActorID actor_id, int64_t counter) { @@ -70,32 +72,32 @@ class DirectActorTransportTest : public ::testing::Test { }; TEST_F(DirectActorTransportTest, TestSubmitTask) { + rpc::Address addr; ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); auto task = CreateActorTaskHelper(actor_id, 0); ASSERT_TRUE(submitter_.SubmitTask(task).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 0); - gcs::ActorTableData actor_data; - submitter_.HandleActorUpdate(actor_id, actor_data); + submitter_.ConnectActor(actor_id, addr); ASSERT_EQ(worker_client_->callbacks.size(), 1); task = CreateActorTaskHelper(actor_id, 1); ASSERT_TRUE(submitter_.SubmitTask(task).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 2); - EXPECT_CALL(*task_finisher_, CompletePendingTask(TaskID::Nil(), _)) + EXPECT_CALL(*task_finisher_, CompletePendingTask(TaskID::Nil(), _, _)) .Times(worker_client_->callbacks.size()); - EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _)).Times(0); + EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(0); while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask()); } } TEST_F(DirectActorTransportTest, TestDependencies) { + rpc::Address addr; ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - gcs::ActorTableData actor_data; - submitter_.HandleActorUpdate(actor_id, actor_data); + submitter_.ConnectActor(actor_id, addr); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create two tasks for the actor with different arguments. @@ -121,9 +123,9 @@ TEST_F(DirectActorTransportTest, TestDependencies) { } TEST_F(DirectActorTransportTest, TestOutOfOrderDependencies) { + rpc::Address addr; ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); - gcs::ActorTableData actor_data; - submitter_.HandleActorUpdate(actor_id, actor_data); + submitter_.ConnectActor(actor_id, addr); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create two tasks for the actor with different arguments. @@ -150,9 +152,10 @@ TEST_F(DirectActorTransportTest, TestOutOfOrderDependencies) { } TEST_F(DirectActorTransportTest, TestActorFailure) { + rpc::Address addr; ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); gcs::ActorTableData actor_data; - submitter_.HandleActorUpdate(actor_id, actor_data); + submitter_.ConnectActor(actor_id, addr); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create two tasks for the actor. @@ -163,8 +166,8 @@ TEST_F(DirectActorTransportTest, TestActorFailure) { ASSERT_EQ(worker_client_->callbacks.size(), 2); // Simulate the actor dying. All submitted tasks should get failed. - EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _)).Times(2); - EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _)).Times(0); + EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(2); + EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(0); while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); } diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 442f3947c..a8fbc20b6 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -41,10 +41,13 @@ class MockTaskFinisher : public TaskFinisherInterface { public: MockTaskFinisher() {} - void CompletePendingTask(const TaskID &, const rpc::PushTaskReply &) override { + void CompletePendingTask(const TaskID &, const rpc::PushTaskReply &, + const rpc::Address *actor_addr) override { num_tasks_complete++; } - void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) override { + + void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + Status *status) override { num_tasks_failed++; } diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index 2f8bf6090..671c36338 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -1,6 +1,7 @@ #include "gtest/gtest.h" #include "ray/common/task/task_spec.h" +#include "ray/core_worker/actor_manager.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/core_worker/task_manager.h" #include "ray/util/test_util.h" @@ -14,16 +15,32 @@ TaskSpecification CreateTaskHelper(uint64_t num_returns) { return task; } +class MockActorManager : public ActorManagerInterface { + void PublishCreatedActor(const TaskSpecification &actor_creation_task, + const rpc::Address &address) override { + num_publishes += 1; + } + + void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override { + num_terminations += 1; + } + + int num_publishes = 0; + int num_terminations = 0; +}; + class TaskManagerTest : public ::testing::Test { public: TaskManagerTest() : store_(std::shared_ptr(new CoreWorkerMemoryStore())), - manager_(store_, [this](const TaskSpecification &spec) { + actor_manager_(std::shared_ptr(new MockActorManager())), + manager_(store_, actor_manager_, [this](const TaskSpecification &spec) { num_retries_++; return Status::OK(); }) {} std::shared_ptr store_; + std::shared_ptr actor_manager_; TaskManager manager_; int num_retries_ = 0; }; @@ -41,7 +58,7 @@ TEST_F(TaskManagerTest, TestTaskSuccess) { return_object->set_object_id(return_id.Binary()); auto data = GenerateRandomBuffer(); return_object->set_data(data->Data(), data->Size()); - manager_.CompletePendingTask(spec.TaskId(), reply); + manager_.CompletePendingTask(spec.TaskId(), reply, nullptr); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); std::vector> results; diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 6a380dc75..a8b3e7ef7 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -21,7 +21,6 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe resolver_.ResolveDependencies(task_spec, [this, send_pos, task_spec]() mutable { const auto &actor_id = task_spec.ActorId(); - const auto task_id = task_spec.TaskId(); auto request = std::unique_ptr(new rpc::PushTaskRequest); // NOTE(swang): CopyFrom is needed because if we use Swap here and the task @@ -31,26 +30,20 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe absl::MutexLock lock(&mu_); - auto iter = actor_states_.find(actor_id); - if (iter == actor_states_.end() || - iter->second.state_ == ActorTableData::RECONSTRUCTING) { + auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request)); + RAY_CHECK(inserted.second); + + auto it = rpc_clients_.find(actor_id); + if (it == rpc_clients_.end()) { // Actor is not yet created, or is being reconstructed, cache the request // and submit after actor is alive. // TODO(zhijunfu): it might be possible for a user to specify an invalid // actor handle (e.g. from unpickling), in that case it might be desirable // to have a timeout to mark it as invalid if it doesn't show up in the // specified time. - auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request)); - RAY_CHECK(inserted.second); RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created."; - } else if (iter->second.state_ == ActorTableData::ALIVE) { - auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request)); - RAY_CHECK(inserted.second); - SendPendingTasks(actor_id); } else { - // Actor is dead, treat the task as failure. - RAY_CHECK(iter->second.state_ == ActorTableData::DEAD); - task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED); + SendPendingTasks(actor_id); } }); @@ -59,29 +52,30 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe return Status::OK(); } -void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate( - const ActorID &actor_id, const ActorTableData &actor_data) { +void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, + const rpc::Address &address) { absl::MutexLock lock(&mu_); - actor_states_.erase(actor_id); - actor_states_.emplace( - actor_id, ActorStateData(actor_data.state(), actor_data.address().ip_address(), - actor_data.address().port())); + // Create a new connection to the actor. + if (rpc_clients_.count(actor_id) == 0) { + rpc::WorkerAddress addr = {address.ip_address(), address.port()}; + rpc_clients_[actor_id] = + std::shared_ptr(client_factory_(addr)); + } + if (pending_requests_.count(actor_id) > 0) { + SendPendingTasks(actor_id); + } +} - if (actor_data.state() == ActorTableData::ALIVE) { - // Create a new connection to the actor. - if (rpc_clients_.count(actor_id) == 0) { - rpc::WorkerAddress addr = {actor_data.address().ip_address(), - actor_data.address().port()}; - rpc_clients_[actor_id] = - std::shared_ptr(client_factory_(addr)); - } - if (pending_requests_.count(actor_id) > 0) { - SendPendingTasks(actor_id); - } - } else { - // Remove rpc client if it's dead or being reconstructed. +void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, + bool dead) { + absl::MutexLock lock(&mu_); + if (!dead) { + // We're reconstructing the actor, so erase the client for now. The new client + // will be inserted once actor reconstruction completes. We don't erase the + // client when the actor is DEAD, so that all further tasks will be failed. rpc_clients_.erase(actor_id); - + } else { + RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id; // If there are pending requests, treat the pending tasks as failed. auto pending_it = pending_requests_.find(actor_id); if (pending_it != pending_requests_.end()) { @@ -90,16 +84,15 @@ void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate( auto request = std::move(head->second); head = pending_it->second.erase(head); auto task_id = TaskID::FromBinary(request->task_spec().task_id()); - task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED); + auto status = Status::IOError("cancelling all pending tasks of dead actor"); + task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, &status); } pending_requests_.erase(pending_it); } - - next_send_position_.erase(actor_id); - next_send_position_to_assign_.erase(actor_id); - // No need to clean up tasks that have been sent and are waiting for // replies. They will be treated as failed once the connection dies. + // We retain the sequencing information so that we can properly fail + // any tasks submitted after the actor death. } } @@ -129,9 +122,9 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask( std::move(request), [this, task_id](Status status, const rpc::PushTaskReply &reply) { if (!status.ok()) { - task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED); + task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, &status); } else { - task_finisher_->CompletePendingTask(task_id, reply); + task_finisher_->CompletePendingTask(task_id, reply, nullptr); } })); } @@ -139,20 +132,16 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask( bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const { absl::MutexLock lock(&mu_); - auto iter = actor_states_.find(actor_id); - return (iter != actor_states_.end() && iter->second.state_ == ActorTableData::ALIVE); + auto iter = rpc_clients_.find(actor_id); + return (iter != rpc_clients_.end()); } -CoreWorkerDirectTaskReceiver::CoreWorkerDirectTaskReceiver( - WorkerContext &worker_context, boost::asio::io_service &main_io_service, - const TaskHandler &task_handler, const std::function &exit_handler) - : worker_context_(worker_context), - task_handler_(task_handler), - exit_handler_(exit_handler), - task_main_io_service_(main_io_service) {} - -void CoreWorkerDirectTaskReceiver::Init(raylet::RayletClient &raylet_client) { +void CoreWorkerDirectTaskReceiver::Init(raylet::RayletClient &raylet_client, + rpc::ClientFactoryFn client_factory, + rpc::Address rpc_address) { waiter_.reset(new DependencyWaiterImpl(raylet_client)); + rpc_address_ = rpc_address; + client_factory_ = client_factory; } void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(int max_concurrency) { @@ -192,7 +181,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( rpc::SendReplyCallback send_reply_callback) { RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use"; const TaskSpecification task_spec(request.task_spec()); - RAY_LOG(DEBUG) << "Received task " << task_spec.TaskId(); + RAY_LOG(DEBUG) << "Received task " << task_spec.DebugString(); if (task_spec.IsActorTask() && !worker_context_.CurrentTaskIsDirectCall()) { send_reply_callback(Status::Invalid("This actor doesn't accept direct calls."), nullptr, nullptr); @@ -211,15 +200,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( } } - auto it = scheduling_queue_.find(task_spec.CallerId()); - if (it == scheduling_queue_.end()) { - auto result = scheduling_queue_.emplace( - task_spec.CallerId(), - std::unique_ptr(new SchedulingQueue( - task_main_io_service_, *waiter_, pool_, is_asyncio_, fiber_rate_limiter_))); - it = result.first; - } - // Only assign resources for non-actor tasks. Actor tasks inherit the resources // assigned at initial actor creation time. std::shared_ptr resource_ids; @@ -289,10 +269,25 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( } }; + // Run actor creation task immediately on the main thread, without going + // through a scheduling queue. + if (task_spec.IsActorCreationTask()) { + accept_callback(); + return; + } + auto reject_callback = [send_reply_callback]() { send_reply_callback(Status::Invalid("client cancelled stale rpc"), nullptr, nullptr); }; + auto it = scheduling_queue_.find(task_spec.CallerId()); + if (it == scheduling_queue_.end()) { + auto result = scheduling_queue_.emplace( + task_spec.CallerId(), + std::unique_ptr(new SchedulingQueue( + task_main_io_service_, *waiter_, pool_, is_asyncio_, fiber_rate_limiter_))); + it = result.first; + } it->second->Add(request.sequence_number(), request.client_processed_up_to(), accept_callback, reject_callback, dependencies); } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 7e6825496..c873a523e 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -32,18 +32,6 @@ const int kMaxReorderWaitSeconds = 30; /// In direct actor call task submitter and receiver, a task is directly submitted /// to the actor that will execute it. -/// The state data for an actor. -struct ActorStateData { - ActorStateData(gcs::ActorTableData::ActorState state, const std::string &ip, int port) - : state_(state), location_(std::make_pair(ip, port)) {} - - /// Actor's state (e.g. alive, dead, reconstrucing). - gcs::ActorTableData::ActorState state_; - - /// IP address and port that the actor is listening on. - std::pair location_; -}; - // This class is thread-safe. class CoreWorkerDirectActorTaskSubmitter { public: @@ -60,11 +48,16 @@ class CoreWorkerDirectActorTaskSubmitter { /// \return Status::Invalid if the task is not yet supported. Status SubmitTask(TaskSpecification task_spec); - /// Handle an update about an actor. + /// Create connection to actor and send all pending tasks. /// - /// \param[in] actor_id The ID of the actor whose status has changed. - /// \param[in] actor_data The actor's new status information. - void HandleActorUpdate(const ActorID &actor_id, const gcs::ActorTableData &actor_data); + /// \param[in] actor_id Actor ID. + /// \param[in] address The new address of the actor. + void ConnectActor(const ActorID &actor_id, const rpc::Address &address); + + /// Disconnect from a failed actor. + /// + /// \param[in] actor_id Actor ID. + void DisconnectActor(const ActorID &actor_id, bool dead = false); private: /// Push a task to a remote actor via the given client. @@ -102,9 +95,6 @@ class CoreWorkerDirectActorTaskSubmitter { /// Mutex to proect the various maps below. mutable absl::Mutex mu_; - /// Map from actor id to actor state. This only includes actors that we send tasks to. - absl::flat_hash_map actor_states_ GUARDED_BY(mu_); - /// Map from actor id to rpc client. This only includes actors that we send tasks to. /// We use shared_ptr to enable shared_from_this for pending client callbacks. /// @@ -429,7 +419,11 @@ class CoreWorkerDirectTaskReceiver { CoreWorkerDirectTaskReceiver(WorkerContext &worker_context, boost::asio::io_service &main_io_service, const TaskHandler &task_handler, - const std::function &exit_handler); + const std::function &exit_handler) + : worker_context_(worker_context), + task_handler_(task_handler), + exit_handler_(exit_handler), + task_main_io_service_(main_io_service) {} ~CoreWorkerDirectTaskReceiver() { fiber_shutdown_event_.Notify(); @@ -437,7 +431,8 @@ class CoreWorkerDirectTaskReceiver { } /// Initialize this receiver. This must be called prior to use. - void Init(raylet::RayletClient &client); + void Init(raylet::RayletClient &client, rpc::ClientFactoryFn client_factory, + rpc::Address rpc_address); /// Handle a `PushTask` request. /// @@ -471,6 +466,10 @@ class CoreWorkerDirectTaskReceiver { std::function exit_handler_; /// The IO event loop for running tasks on. boost::asio::io_service &task_main_io_service_; + /// Factory for producing new core worker clients. + rpc::ClientFactoryFn client_factory_; + /// Address of our RPC server. + rpc::Address rpc_address_; /// Shared waiter for dependencies required by incoming tasks. std::unique_ptr waiter_; /// Queue of pending requests per actor handle. diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index b7058250f..6a58decd6 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -42,6 +42,7 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle( auto lease_entry = worker_to_lease_client_[addr]; auto queue_entry = task_queues_.find(scheduling_key); // Return the worker if there was an error executing the previous task, + // the previous task is an actor creation task, // there are no more applicable queued tasks, or the lease is expired. if (was_error || queue_entry == task_queues_.end() || current_time_ms() > lease_entry.second) { @@ -115,10 +116,12 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded( RAY_LOG(DEBUG) << "Lease granted " << task_id; rpc::WorkerAddress addr = { reply.worker_address().ip_address(), reply.worker_address().port(), - WorkerID::FromBinary(reply.worker_address().worker_id())}; + WorkerID::FromBinary(reply.worker_address().worker_id()), + ClientID::FromBinary(reply.worker_address().raylet_id())}; AddWorkerLeaseClient(addr, std::move(lease_client)); auto resources_copy = reply.resource_mapping(); - OnWorkerIdle(addr, scheduling_key, /*error=*/false, resources_copy); + OnWorkerIdle(addr, scheduling_key, + /*error=*/false, resources_copy); } else { // The raylet redirected us to a different raylet to retry at. RequestNewWorkerIfNeeded(scheduling_key, &reply.retry_at_raylet_address()); @@ -158,6 +161,8 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( auto task_id = task_spec.TaskId(); auto request = std::unique_ptr(new rpc::PushTaskRequest); bool is_actor = task_spec.IsActorTask(); + bool is_actor_creation = task_spec.IsActorCreationTask(); + RAY_LOG(DEBUG) << "Pushing normal task " << task_spec.TaskId(); // NOTE(swang): CopyFrom is needed because if we use Swap here and the task // fails, then the task data will be gone when the TaskManager attempts to @@ -166,9 +171,10 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( request->mutable_resource_mapping()->CopyFrom(assigned_resources); auto status = client.PushNormalTask( std::move(request), - [this, task_id, is_actor, scheduling_key, addr, assigned_resources]( - Status status, const rpc::PushTaskReply &reply) { - { + [this, task_id, is_actor, is_actor_creation, scheduling_key, addr, + assigned_resources](Status status, const rpc::PushTaskReply &reply) { + // Successful actor creation leases the worker indefinitely from the raylet. + if (!status.ok() || !is_actor_creation) { absl::MutexLock lock(&mu_); OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources); } @@ -177,11 +183,13 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( // failure (e.g., by contacting the raylet). If it was a process // failure, it may have been an application-level error and it may // not make sense to retry the task. - task_finisher_->PendingTaskFailed(task_id, is_actor - ? rpc::ErrorType::ACTOR_DIED - : rpc::ErrorType::WORKER_DIED); + task_finisher_->PendingTaskFailed( + task_id, + is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED, + &status); } else { - task_finisher_->CompletePendingTask(task_id, reply); + rpc::Address proto = addr.ToProto(); + task_finisher_->CompletePendingTask(task_id, reply, &proto); } }); if (!status.ok()) { @@ -191,7 +199,8 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( OnWorkerIdle(addr, scheduling_key, /*error=*/true, assigned_resources); } task_finisher_->PendingTaskFailed( - task_id, is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED); + task_id, is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED, + &status); } } }; // namespace ray diff --git a/src/ray/gcs/redis_actor_info_accessor.cc b/src/ray/gcs/redis_actor_info_accessor.cc index c0018c8a4..a093d702f 100644 --- a/src/ray/gcs/redis_actor_info_accessor.cc +++ b/src/ray/gcs/redis_actor_info_accessor.cc @@ -7,6 +7,31 @@ namespace ray { namespace gcs { +std::shared_ptr CreateActorTableData( + const TaskSpecification &task_spec, const rpc::Address &address, + gcs::ActorTableData::ActorState state, uint64_t remaining_reconstructions) { + RAY_CHECK(task_spec.IsActorCreationTask()); + auto actor_id = task_spec.ActorCreationId(); + auto actor_info_ptr = std::make_shared(); + // Set all of the static fields for the actor. These fields will not change + // even if the actor fails or is reconstructed. + actor_info_ptr->set_actor_id(actor_id.Binary()); + actor_info_ptr->set_parent_id(task_spec.CallerId().Binary()); + actor_info_ptr->set_actor_creation_dummy_object_id( + task_spec.ActorDummyObject().Binary()); + actor_info_ptr->set_job_id(task_spec.JobId().Binary()); + actor_info_ptr->set_max_reconstructions(task_spec.MaxActorReconstructions()); + actor_info_ptr->set_is_detached(task_spec.IsDetachedActor()); + // Set the fields that change when the actor is restarted. + actor_info_ptr->set_remaining_reconstructions(remaining_reconstructions); + actor_info_ptr->set_is_direct_call(task_spec.IsDirectCall()); + actor_info_ptr->mutable_address()->CopyFrom(address); + actor_info_ptr->mutable_owner_address()->CopyFrom( + task_spec.GetMessage().caller_address()); + actor_info_ptr->set_state(state); + return actor_info_ptr; +} + RedisActorInfoAccessor::RedisActorInfoAccessor(RedisGcsClient *client_impl) : client_impl_(client_impl), actor_sub_executor_(client_impl_->actor_table()) {} diff --git a/src/ray/gcs/redis_actor_info_accessor.h b/src/ray/gcs/redis_actor_info_accessor.h index 21f66a6c8..a5c92ea85 100644 --- a/src/ray/gcs/redis_actor_info_accessor.h +++ b/src/ray/gcs/redis_actor_info_accessor.h @@ -2,6 +2,7 @@ #define RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H #include "ray/common/id.h" +#include "ray/common/task/task_spec.h" #include "ray/gcs/actor_info_accessor.h" #include "ray/gcs/callback.h" #include "ray/gcs/subscription_executor.h" @@ -11,6 +12,10 @@ namespace ray { namespace gcs { +std::shared_ptr CreateActorTableData( + const TaskSpecification &task_spec, const rpc::Address &address, + gcs::ActorTableData::ActorState state, uint64_t remaining_reconstructions); + class RedisGcsClient; /// \class RedisActorInfoAccessor diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index 339751c82..af2a408dc 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -126,6 +126,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { Attach(io_service); actor_table_.reset(new ActorTable({primary_context_}, this)); + direct_actor_table_.reset(new DirectActorTable({primary_context_}, this)); // TODO(micafan) Modify ClientTable' Constructor(remove ClientID) in future. // We will use NodeID instead of ClientID. @@ -200,6 +201,8 @@ raylet::TaskTable &RedisGcsClient::raylet_task_table() { return *raylet_task_tab ActorTable &RedisGcsClient::actor_table() { return *actor_table_; } +DirectActorTable &RedisGcsClient::direct_actor_table() { return *direct_actor_table_; } + TaskReconstructionLog &RedisGcsClient::task_reconstruction_log() { return *task_reconstruction_log_; } diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index 146d80f60..5eda50896 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -66,6 +66,10 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { ActorCheckpointTable &actor_checkpoint_table(); ActorCheckpointIdTable &actor_checkpoint_id_table(); DynamicResourceTable &resource_table(); + /// Used only for direct calls. Tasks submitted through the raylet transport + /// should use Actors(), which has a requirement on the order in which + /// entries can be appended to the log. + DirectActorTable &direct_actor_table(); // We also need something to export generic code to run on workers from the // driver (to set the PYTHONPATH) @@ -100,6 +104,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { std::unique_ptr object_table_; std::unique_ptr raylet_task_table_; std::unique_ptr actor_table_; + std::unique_ptr direct_actor_table_; std::unique_ptr task_reconstruction_log_; std::unique_ptr task_lease_table_; std::unique_ptr heartbeat_table_; diff --git a/src/ray/gcs/subscription_executor.cc b/src/ray/gcs/subscription_executor.cc index 40f0ead29..84f2b65b5 100644 --- a/src/ray/gcs/subscription_executor.cc +++ b/src/ray/gcs/subscription_executor.cc @@ -187,6 +187,7 @@ Status SubscriptionExecutor::AsyncUnsubscribe( } template class SubscriptionExecutor; +template class SubscriptionExecutor; template class SubscriptionExecutor; } // namespace gcs diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index eb2240017..d970e6263 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -671,6 +671,16 @@ class ActorTable : public Log { } }; +class DirectActorTable : public Log { + public: + DirectActorTable(const std::vector> &contexts, + RedisGcsClient *client) + : Log(contexts, client) { + pubsub_channel_ = TablePubsub::DIRECT_ACTOR_PUBSUB; + prefix_ = TablePrefix::DIRECT_ACTOR; + } +}; + class TaskReconstructionLog : public Log { public: TaskReconstructionLog(const std::vector> &contexts, diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 0e68e0882..fc75eb495 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -96,7 +96,9 @@ message DirectActorCallArgWaitCompleteReply { } message GetObjectStatusRequest { + // The owner of the object. bytes owner_id = 1; + // Wait for this object's status. bytes object_id = 2; } diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 9003bfd66..1dbdfec86 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -26,7 +26,8 @@ enum TablePrefix { ACTOR_CHECKPOINT = 15; ACTOR_CHECKPOINT_ID = 16; NODE_RESOURCE = 17; - TABLE_PREFIX_MAX = 18; + DIRECT_ACTOR = 18; + TABLE_PREFIX_MAX = 19; } // The channel that Add operations to the Table should be published on, if any. @@ -44,7 +45,8 @@ enum TablePubsub { TASK_LEASE_PUBSUB = 10; JOB_PUBSUB = 11; NODE_RESOURCE_PUBSUB = 12; - TABLE_PUBSUB_MAX = 13; + DIRECT_ACTOR_PUBSUB = 13; + TABLE_PUBSUB_MAX = 14; } enum GcsChangeMode { @@ -89,8 +91,8 @@ message ActorTableData { } // The ID of the actor that was created. bytes actor_id = 1; - // The ID of the actor that created it, null if created by non-actor task. - bytes parent_actor_id = 2; + // The ID of the caller of the actor creation task. + bytes parent_id = 2; // The dummy object ID returned by the actor creation task. If the actor // dies, then this is the object that should be reconstructed for the actor // to be recreated. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 867c65006..bc0ef2ef2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2321,7 +2321,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { worker.ResetTaskResourceIds(); const auto &spec = task.GetTaskSpecification(); - if (spec.IsActorCreationTask() || spec.IsActorTask()) { + if ((spec.IsActorCreationTask() || spec.IsActorTask()) && !spec.IsDirectCall()) { // If this was an actor or actor creation task, handle the actor's new // state. FinishAssignedActorTask(worker, task); @@ -2406,6 +2406,7 @@ std::shared_ptr NodeManager::CreateActorTableDataFromCreationTas } void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { + RAY_LOG(INFO) << "Finishing assigned actor task"; ActorID actor_id; TaskID caller_id; const TaskSpecification task_spec = task.GetTaskSpecification(); @@ -2508,7 +2509,7 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id // Notify the other node managers that the actor has been created. const ActorID actor_id = task_spec.ActorCreationId(); auto new_actor_info = CreateActorTableDataFromCreationTask(task_spec, port); - new_actor_info->set_parent_actor_id(parent_actor_id.Binary()); + new_actor_info->set_parent_id(parent_actor_id.Binary()); auto update_callback = [actor_id](Status status) { if (!status.ok()) { // Only one node at a time should succeed at creating or updating the actor. diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 1e84d1ead..e3ca41439 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -42,12 +42,21 @@ class WorkerAddress { public: template friend H AbslHashValue(H h, const WorkerAddress &w) { - return H::combine(std::move(h), w.ip_address, w.port, w.worker_id); + return H::combine(std::move(h), w.ip_address, w.port, w.worker_id, w.raylet_id); } bool operator==(const WorkerAddress &other) const { return other.ip_address == ip_address && other.port == port && - other.worker_id == worker_id; + other.worker_id == worker_id && other.raylet_id == raylet_id; + } + + rpc::Address ToProto() const { + rpc::Address addr; + addr.set_raylet_id(raylet_id.Binary()); + addr.set_ip_address(ip_address); + addr.set_port(port); + addr.set_worker_id(worker_id.Binary()); + return addr; } /// The ip address of the worker. @@ -56,6 +65,8 @@ class WorkerAddress { const int port; /// The unique id of the worker. const WorkerID worker_id; + /// The unique id of the worker raylet. + const ClientID raylet_id; }; typedef std::function(const WorkerAddress &)> @@ -184,6 +195,7 @@ class CoreWorkerClient : public std::enable_shared_from_this, *stub_, &CoreWorkerService::Stub::PrepareAsyncGetObjectStatus, request, callback); return call->GetStatus(); } + /// Send as many pending tasks as possible. This method is thread-safe. /// /// The client will guarantee no more than kMaxBytesInFlight bytes of RPCs are being