From d2bba596ab35cc771786fe26538ad7a25cf0e652 Mon Sep 17 00:00:00 2001 From: Zhijun Fu <37800433+zhijunfu@users.noreply.github.com> Date: Thu, 26 Dec 2019 10:59:50 +0800 Subject: [PATCH] Fix actor reconstruction with direct call (#6570) --- .../ray/api/test/ActorReconstructionTest.java | 7 --- python/ray/tests/test_actor.py | 1 - python/ray/tests/test_component_failures_3.py | 22 ++++++++- src/ray/core_worker/context.cc | 8 +++- src/ray/core_worker/core_worker.cc | 33 +++++++------ src/ray/core_worker/core_worker.h | 14 ------ src/ray/core_worker/task_manager.cc | 8 ---- src/ray/core_worker/test/core_worker_test.cc | 20 ++++---- .../transport/direct_actor_transport.cc | 10 ++++ .../transport/direct_actor_transport.h | 2 + .../transport/direct_task_transport.cc | 5 +- .../transport/direct_task_transport.h | 8 +++- src/ray/gcs/redis_accessor.cc | 3 +- src/ray/raylet/node_manager.cc | 47 ++++++++++++------- src/ray/raylet/node_manager.h | 5 +- src/ray/raylet/task_dependency_manager.cc | 31 +++++++++++- 16 files changed, 140 insertions(+), 84 deletions(-) diff --git a/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java b/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java index 00cc1e385..43ccfe0ff 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java @@ -47,9 +47,6 @@ public class ActorReconstructionTest extends BaseTest { public void testActorReconstruction() throws InterruptedException, IOException { TestUtils.skipTestUnderSingleProcess(); - // TODO (kfstorm): Actor reconstruction is currently not supporeted in direct actor call mode. - // Will re-enable the test once the issue got fixed. - TestUtils.skipTestIfDirectActorCallEnabled(); ActorCreationOptions options = new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions(); RayActor actor = Ray.createActor(Counter::new, options); @@ -130,10 +127,6 @@ public class ActorReconstructionTest extends BaseTest { public void testActorCheckpointing() throws IOException, InterruptedException { TestUtils.skipTestUnderSingleProcess(); - // TODO (kfstorm): In direct actor call mode, the actor creation task is not pushed to raylet. - // But to save an actor checkpoint, raylet needs to know about this the actor. Will re-enable - // the test once the issue got fixed. - TestUtils.skipTestIfDirectActorCallEnabled(); ActorCreationOptions options = new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions(); RayActor actor = Ray.createActor(CheckpointableCounter::new, options); diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index a398b77c2..198d15b50 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -836,7 +836,6 @@ def test_actor_init_fails(ray_start_cluster_head): assert results == [1 for actor in actors] -@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no ft yet") def test_reconstruction_suppression(ray_start_cluster_head): cluster = ray_start_cluster_head num_nodes = 5 diff --git a/python/ray/tests/test_component_failures_3.py b/python/ray/tests/test_component_failures_3.py index 625eed573..db6622ce2 100644 --- a/python/ray/tests/test_component_failures_3.py +++ b/python/ray/tests/test_component_failures_3.py @@ -12,6 +12,8 @@ import pytest import ray import ray.ray_constants as ray_constants +RAY_FORCE_DIRECT = ray_constants.direct_call_enabled() + @pytest.mark.parametrize( "ray_start_cluster", [{ @@ -29,13 +31,16 @@ def test_actor_creation_node_failure(ray_start_cluster): def __init__(self, death_probability): self.death_probability = death_probability + def get_probability(self): + return self.death_probability + def ping(self): # Exit process with some probability. exit_chance = np.random.rand() if exit_chance < self.death_probability: sys.exit(-1) - num_children = 50 + num_children = 25 # Children actors will die about half the time. death_probability = 0.5 @@ -58,6 +63,21 @@ def test_actor_creation_node_failure(ray_start_cluster): ray.get(out) except ray.exceptions.RayActorError: children[i] = Child.remote(death_probability) + + if (RAY_FORCE_DIRECT): + children_out = [ + child.get_probability.remote() for child in children + ] + # Wait for new created actors to finish creation before + # removing a node. This is needed because right now we don't + # support reconstructing actors that died in the process of + # being created. + ready, _ = ray.wait( + children_out, + num_returns=len(children_out), + timeout=5 * 60.0) + assert len(ready) == len(children_out) + # Remove a node. Any actor creation tasks that were forwarded to this # node must be reconstructed. cluster.remove_node(cluster.list_all_nodes()[-1]) diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 325656feb..8cd7015f8 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -125,7 +125,13 @@ bool WorkerContext::CurrentThreadIsMain() const { } bool WorkerContext::ShouldReleaseResourcesOnBlockingCalls() const { - return !CurrentActorIsDirectCall() && CurrentThreadIsMain(); + // Check if we need to release resources when we block: + // - Driver doesn't acquire resources and thus doesn't need to release. + // - We only support lifetime resources for direct actors, which can be + // acquired when the actor is created, per call resources are not supported, + // thus we don't need to release resources for direct actor call. + return worker_type_ != WorkerType::DRIVER && !CurrentActorIsDirectCall() && + CurrentThreadIsMain(); } bool WorkerContext::CurrentActorIsDirectCall() const { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 1405e5095..f2deefdda 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -101,13 +101,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, // Initialize gcs client. gcs_client_ = std::make_shared(gcs_options); RAY_CHECK_OK(gcs_client_->Connect(io_service_)); - direct_actor_table_subscriber_ = std::unique_ptr< - gcs::SubscriptionExecutor>( - new gcs::SubscriptionExecutor( - gcs_client_->direct_actor_table())); - - actor_manager_ = - std::unique_ptr(new ActorManager(gcs_client_->direct_actor_table())); // Initialize profiler. profiler_ = std::make_shared(worker_context_, node_ip_address, @@ -194,8 +187,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_)); task_manager_.reset(new TaskManager( - memory_store_, reference_counter_, actor_manager_, - [this](const TaskSpecification &spec) { + memory_store_, reference_counter_, nullptr, [this](const TaskSpecification &spec) { // Retry after a delay to emulate the existing Raylet reconstruction // behaviour. TODO(ekl) backoff exponentially. RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: " @@ -301,8 +293,7 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { if (actor_id_.IsNil() && task_id.IsNil()) { absl::MutexLock lock(&actor_handles_mutex_); for (const auto &handle : actor_handles_) { - RAY_CHECK_OK(direct_actor_table_subscriber_->AsyncUnsubscribe( - subscribe_id_, handle.first, nullptr)); + RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr)); } actor_handles_.clear(); } @@ -749,8 +740,20 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { // Register a callback to handle actor notifications. auto actor_notification_callback = [this](const ActorID &actor_id, const gcs::ActorTableData &actor_data) { - RAY_CHECK(actor_data.state() != gcs::ActorTableData::RECONSTRUCTING); - if (actor_data.state() == gcs::ActorTableData::DEAD) { + if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) { + absl::MutexLock lock(&actor_handles_mutex_); + auto it = actor_handles_.find(actor_id); + RAY_CHECK(it != actor_handles_.end()); + if (it->second->IsDirectCallActor()) { + // We have to reset the actor handle since the next instance of the + // actor will not have the last sequence number that we sent. + // TODO: Remove the check for direct calls. We do not reset for the + // raylet codepath because it tries to replay all tasks since the + // last actor checkpoint. + it->second->Reset(); + } + direct_actor_submitter_->DisconnectActor(actor_id, false); + } else if (actor_data.state() == gcs::ActorTableData::DEAD) { direct_actor_submitter_->DisconnectActor(actor_id, true); ActorHandle *actor_handle = nullptr; @@ -772,8 +775,8 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { << ClientID::FromBinary(actor_data.address().raylet_id()); }; - RAY_CHECK_OK(direct_actor_table_subscriber_->AsyncSubscribe( - subscribe_id_, actor_id, actor_notification_callback, nullptr)); + RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( + actor_id, actor_notification_callback, nullptr)); } return inserted; } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index a2feae095..63c24f3bd 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -586,17 +586,6 @@ class CoreWorker { // Client to the GCS shared by core worker interfaces. std::shared_ptr gcs_client_; - /// This is temporary fake node id that is used only by - /// `direct_actor_table_subscriber_ `. - /// TODO(micafan): remove `direct_actor_table_subscriber_` and - /// use `GcsClient` for actor subscription. - ClientID subscribe_id_{ClientID::FromRandom()}; - - // Client to listen to direct actor events. - std::unique_ptr< - gcs::SubscriptionExecutor> - direct_actor_table_subscriber_; - // Client to the raylet shared by core worker interfaces. This needs to be a // shared_ptr for direct calls because we can lease multiple workers through // one client, and we need to keep the connection alive until we return all @@ -628,9 +617,6 @@ class CoreWorker { // Tracks the currently pending tasks. std::shared_ptr task_manager_; - // Interface for publishing actor creation. - std::shared_ptr actor_manager_; - // Interface to submit tasks directly to other actors. std::unique_ptr direct_actor_submitter_; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 7d905bffe..323c82159 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -101,11 +101,6 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, } } - if (spec.IsActorCreationTask()) { - RAY_CHECK(actor_addr != nullptr); - actor_manager_->PublishCreatedActor(spec, *actor_addr); - } - ShutdownIfNeeded(); } @@ -208,9 +203,6 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, /*transport_type=*/static_cast(TaskTransportType::DIRECT)); RAY_CHECK_OK(in_memory_store_->Put(RayObject(error_type), object_id)); } - if (spec.IsActorCreationTask()) { - actor_manager_->PublishTerminatedActor(spec); - } } TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const { diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 8253492f2..69a41302d 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -993,18 +993,16 @@ TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodes) { TestActorTask(resources, true); } -// TODO(ekl) re-enable once reconstruction is implemented -// TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) { -// std::unordered_map resources; -// TestActorReconstruction(resources, true); -//} +TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) { + std::unordered_map resources; + TestActorReconstruction(resources, true); +} -// TODO(ekl) re-enable once reconstruction is implemented -// TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) { -// std::unordered_map resources; -// resources.emplace("resource1", 1); -// TestActorReconstruction(resources, true); -//} +TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) { + std::unordered_map resources; + resources.emplace("resource1", 1); + TestActorReconstruction(resources, true); +} TEST_F(SingleNodeTest, TestDirectActorTaskLocalFailure) { std::unordered_map resources; diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 9f49e444a..e1a6ba335 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -174,6 +174,7 @@ void CoreWorkerDirectTaskReceiver::Init(raylet::RayletClient &raylet_client, waiter_.reset(new DependencyWaiterImpl(raylet_client)); rpc_address_ = rpc_address; client_factory_ = client_factory; + local_raylet_client_ = raylet_client; } void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(int max_concurrency) { @@ -283,6 +284,15 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( } } } + + if (task_spec.IsActorCreationTask()) { + RAY_LOG(INFO) << "Actor creation task finished, task_id: " << task_spec.TaskId() + << ", actor_id: " << task_spec.ActorCreationId(); + // Tell raylet that an actor creation task has finished execution, so that + // raylet can publish actor creation event to GCS, and mark this worker as + // actor, thus if this worker dies later raylet will reconstruct the actor. + RAY_CHECK_OK(local_raylet_client_->TaskDone()); + } } if (status.IsSystemExit()) { // Don't allow the worker to be reused, even though the reply status is OK. diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 8fd2c3c35..d1839942c 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -509,6 +509,8 @@ class CoreWorkerDirectTaskReceiver { /// The fiber semaphore used to limit the number of concurrent fibers /// running at once. std::shared_ptr fiber_rate_limiter_; + + boost::optional local_raylet_client_; }; } // namespace ray diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index a3cfe9ce0..7359c1883 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -11,8 +11,9 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { absl::MutexLock lock(&mu_); // Note that the dependencies in the task spec are mutated to only contain // plasma dependencies after ResolveDependencies finishes. - const SchedulingKey scheduling_key(task_spec.GetSchedulingClass(), - task_spec.GetDependencies()); + const SchedulingKey scheduling_key( + task_spec.GetSchedulingClass(), task_spec.GetDependencies(), + task_spec.IsActorCreationTask() ? task_spec.ActorCreationId() : ActorID::Nil()); auto it = task_queues_.find(scheduling_key); if (it == task_queues_.end()) { it = task_queues_.emplace(scheduling_key, std::deque()).first; diff --git a/src/ray/core_worker/transport/direct_task_transport.h b/src/ray/core_worker/transport/direct_task_transport.h index 2b8f6d36d..aec5918e9 100644 --- a/src/ray/core_worker/transport/direct_task_transport.h +++ b/src/ray/core_worker/transport/direct_task_transport.h @@ -25,8 +25,12 @@ typedef std::function(const std::string &i // (encapsulated in SchedulingClass) to defer resource allocation decisions to the raylet // and ensure fairness between different tasks, as well as plasma task dependencies as // a performance optimization because the raylet will fetch plasma dependencies to the -// scheduled worker. -using SchedulingKey = std::pair>; +// scheduled worker. It's also keyed on actor ID to ensure the actor creation task +// would always request a new worker lease. We need this to let raylet know about +// direct actor creation task, and reconstruct the actor if it dies. Otherwise if +// the actor creation task just reuses an existing worker, then raylet will not +// be aware of the actor and is not able to manage it. +using SchedulingKey = std::tuple, ActorID>; // This class is thread-safe. class CoreWorkerDirectTaskSubmitter { diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index 999cbb7b1..249885bcb 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -85,7 +85,8 @@ Status RedisActorInfoAccessor::AsyncUpdate( // RECONSTRUCTING or DEAD entries have an odd index. log_length += 1; } - + RAY_LOG(DEBUG) << "AsyncUpdate actor state to " << data_ptr->state() + << ", actor id: " << actor_id << ", log_length: " << log_length; auto on_success = [callback](RedisGcsClient *client, const ActorID &actor_id, const ActorTableData &data) { // If we successfully appended a record to the GCS table of the actor that diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 266fd335c..d55b061a8 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -692,8 +692,6 @@ void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableData &heartbeat_b HeartbeatAdded(client_id, heartbeat_data); } - RAY_LOG(DEBUG) << "Total active object IDs received: " << active_object_ids.size(); - // Refresh the active object IDs in plasma to prevent them from being evicted. std::vector plasma_ids; plasma_ids.reserve(active_object_ids.size()); @@ -1058,7 +1056,8 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca if (was_local && !status.ok()) { // If the disconnected actor was local, only this node will try to update actor // state. So the update shouldn't fail. - RAY_LOG(FATAL) << "Failed to update state for actor " << actor_id; + RAY_LOG(FATAL) << "Failed to update state for actor " << actor_id + << ", status: " << status.ToString(); } }; auto actor_notification = std::make_shared(new_actor_info); @@ -1510,6 +1509,18 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques rpc::Task task_message; task_message.mutable_task_spec()->CopyFrom(request.resource_spec()); Task task(task_message); + bool is_actor_creation_task = task.GetTaskSpecification().IsActorCreationTask(); + ActorID actor_id = ActorID::Nil(); + if (is_actor_creation_task) { + actor_id = task.GetTaskSpecification().ActorCreationId(); + + // Save the actor creation task spec to GCS, which is needed to + // reconstruct the actor when raylet detect it dies. + std::shared_ptr data = std::make_shared(); + data->mutable_task()->mutable_task_spec()->CopyFrom( + task.GetTaskSpecification().GetMessage()); + RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr)); + } if (new_scheduler_enabled_) { auto request_resources = @@ -2204,7 +2215,7 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & } RAY_LOG(DEBUG) << "Assigning task " << spec.TaskId() << " to worker with pid " - << worker->Pid(); + << worker->Pid() << ", worker id: " << worker->WorkerId(); flatbuffers::FlatBufferBuilder fbb; // Resource accounting: acquire resources for the assigned task. @@ -2235,6 +2246,9 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & spec.IsActorCreationTask() ? worker->GetLifetimeResourceIds() : worker->GetTaskResourceIds()); post_assign_callbacks->push_back([this, worker, task_id]() { + RAY_LOG(DEBUG) << "Finished assigning task " << task_id << " to worker " + << worker->WorkerId(); + FinishAssignTask(worker, task_id, /*success=*/true); }); } else { @@ -2276,7 +2290,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { worker.ResetTaskResourceIds(); const auto &spec = task.GetTaskSpecification(); - if ((spec.IsActorCreationTask() || spec.IsActorTask()) && !spec.IsDirectCall()) { + if ((spec.IsActorCreationTask() || spec.IsActorTask())) { // If this was an actor or actor creation task, handle the actor's new // state. FinishAssignedActorTask(worker, task); @@ -2305,7 +2319,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { } std::shared_ptr NodeManager::CreateActorTableDataFromCreationTask( - const TaskSpecification &task_spec, int port) { + const TaskSpecification &task_spec, int port, const WorkerID &worker_id) { RAY_CHECK(task_spec.IsActorCreationTask()); auto actor_id = task_spec.ActorCreationId(); auto actor_entry = actor_registry_.find(actor_id); @@ -2355,6 +2369,7 @@ std::shared_ptr NodeManager::CreateActorTableDataFromCreationTas gcs_client_->Nodes().GetSelfInfo().node_manager_address()); actor_info_ptr->mutable_address()->set_port(port); actor_info_ptr->mutable_address()->set_raylet_id(self_node_id_.Binary()); + actor_info_ptr->mutable_address()->set_worker_id(worker_id.Binary()); actor_info_ptr->set_state(ActorTableData::ALIVE); return actor_info_ptr; } @@ -2387,11 +2402,12 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { // Lookup the parent actor id. auto parent_task_id = task_spec.ParentTaskId(); int port = worker.Port(); + auto worker_id = worker.WorkerId(); RAY_CHECK_OK( gcs_client_->Tasks().AsyncGet( parent_task_id, /*callback=*/ - [this, task_spec, resumed_from_checkpoint, port, parent_task_id]( + [this, task_spec, resumed_from_checkpoint, port, parent_task_id, worker_id]( Status status, const boost::optional &parent_task_data) { if (parent_task_data) { // The task was in the GCS task table. Use the stored task spec to @@ -2404,7 +2420,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { parent_actor_id = parent_task.GetTaskSpecification().ActorId(); } FinishAssignedActorCreationTask(parent_actor_id, task_spec, - resumed_from_checkpoint, port); + resumed_from_checkpoint, port, worker_id); return; } // The parent task was not in the GCS task table. It should most likely be @@ -2429,7 +2445,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { << "ray.init(redis_max_memory=)."; } FinishAssignedActorCreationTask(parent_actor_id, task_spec, - resumed_from_checkpoint, port); + resumed_from_checkpoint, port, worker_id); })); } else { auto actor_entry = actor_registry_.find(actor_id); @@ -2456,11 +2472,11 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id, const TaskSpecification &task_spec, - bool resumed_from_checkpoint, - int port) { + bool resumed_from_checkpoint, int port, + const WorkerID &worker_id) { // Notify the other node managers that the actor has been created. const ActorID actor_id = task_spec.ActorCreationId(); - auto new_actor_info = CreateActorTableDataFromCreationTask(task_spec, port); + auto new_actor_info = CreateActorTableDataFromCreationTask(task_spec, port, worker_id); new_actor_info->set_parent_id(parent_actor_id.Binary()); auto update_callback = [actor_id](Status status) { if (!status.ok()) { @@ -2557,11 +2573,6 @@ void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object RAY_LOG(DEBUG) << "Attempting to resubmit task " << task.GetTaskSpecification().TaskId(); - if (task.GetTaskSpecification().IsDirectCall()) { - TreatTaskAsFailed(task, ErrorType::OBJECT_UNRECONSTRUCTABLE); - return; - } - // Actors should only be recreated if the first initialization failed or if // the most recent instance of the actor failed. if (task.GetTaskSpecification().IsActorCreationTask()) { @@ -2823,6 +2834,7 @@ void NodeManager::ForwardTask( void NodeManager::FinishAssignTask(const std::shared_ptr &worker, const TaskID &task_id, bool success) { + RAY_LOG(DEBUG) << "FinishAssignTask: " << task_id; // Remove the ASSIGNED task from the READY queue. Task assigned_task; TaskState state; @@ -2831,7 +2843,6 @@ void NodeManager::FinishAssignTask(const std::shared_ptr &worker, return; } RAY_CHECK(state == TaskState::READY); - if (success) { auto spec = assigned_task.GetTaskSpecification(); // We successfully assigned the task to the worker. diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 115a16819..31d861832 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -234,7 +234,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// actor. /// \param worker The port that the actor is listening on. std::shared_ptr CreateActorTableDataFromCreationTask( - const TaskSpecification &task_spec, int port); + const TaskSpecification &task_spec, int port, const WorkerID &worker_id); /// Handle a worker finishing an assigned actor task or actor creation task. /// \param worker The worker that finished the task. /// \param task The actor task or actor creation task. @@ -252,7 +252,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void FinishAssignedActorCreationTask(const ActorID &parent_actor_id, const TaskSpecification &task_spec, - bool resumed_from_checkpoint, int port); + bool resumed_from_checkpoint, int port, + const WorkerID &worker_id); /// Make a placement decision for placeable tasks given the resource_map /// provided. This will perform task state transitions and task forwarding. /// diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index b77b93244..258e4f5db 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -310,8 +310,37 @@ std::vector TaskDependencyManager::GetPendingTasks() const { void TaskDependencyManager::TaskPending(const Task &task) { // Direct tasks are not tracked by the raylet. + // NOTE(zhijunfu): Direct tasks are not tracked by the raylet, + // but we still need raylet to reconstruct the actors. + // For direct actor creation task: + // - Initially the caller leases a worker from raylet and + // then pushes actor creation task directly to the worker, + // thus it doesn't need task lease. And actually if we + // acquire a lease in this case and forget to cancel it, + // the lease would never expire which will prevent the + // actor from being reconstructed; + // - When a direct actor is reconstructed, raylet resubmits + // the task, and the task can be forwarded to another raylet, + // and eventually assigned to a worker. In this case we need + // the task lease to make sure there's only one raylet can + // resubmit the task. if (task.GetTaskSpecification().IsDirectCall()) { - return; + // We can use `OnDispatch` to differeniate whether this task is + // a worker lease request. + // For direct actor creation task: + // - when it's submitted by core worker, we guarantee that + // we always request a new worker lease, in that case + // `OnDispatch` is overriden to an actual callback. + // - when it's resubmitted by raylet because of reconstruction, + // `OnDispatch` will not be overriden and thus is nullptr. + if (task.GetTaskSpecification().IsActorCreationTask() && + task.OnDispatch() == nullptr) { + // This is an actor creation task, and it's being reconstructed, + // in this case we still need the task lease. Note that we don't + // require task lease for direct actor creation task. + } else { + return; + } } TaskID task_id = task.GetTaskSpecification().TaskId();