From 3e33f6f71bb52841127464366dd7922c996d044d Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 21 Nov 2018 12:26:22 -0800 Subject: [PATCH] Fix failure handling for actor death (#3359) * Broadcast actor death, clean up dummy objects * Reduce logging and clean up state when failing a task * lint * Make actor failure test nicer, reduce node timeout --- src/ray/gcs/format/gcs.fbs | 9 + src/ray/raylet/actor_registration.cc | 7 +- src/ray/raylet/actor_registration.h | 19 +- src/ray/raylet/node_manager.cc | 250 ++++++++++++++-------- src/ray/raylet/node_manager.h | 30 +-- src/ray/raylet/task_dependency_manager.cc | 8 +- src/ray/raylet/task_dependency_manager.h | 3 +- test/actor_test.py | 17 +- test/component_failures_test.py | 7 +- 9 files changed, 231 insertions(+), 119 deletions(-) diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 1cc09925c..6414613d4 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -164,6 +164,13 @@ table TaskTableTestAndUpdate { table ClassTableData { } +enum ActorState:int { + // Actor is alive. + ALIVE = 0, + // Actor is already dead and won't be reconstructed. + DEAD +} + table ActorTableData { // The ID of the actor that was created. actor_id: string; @@ -175,6 +182,8 @@ table ActorTableData { driver_id: string; // The ID of the node manager that created the actor. node_manager_id: string; + // Current state of this actor. + state: ActorState; } table ErrorTableData { diff --git a/src/ray/raylet/actor_registration.cc b/src/ray/raylet/actor_registration.cc index 45215e004..3f4a67c1d 100644 --- a/src/ray/raylet/actor_registration.cc +++ b/src/ray/raylet/actor_registration.cc @@ -37,11 +37,12 @@ void ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id, frontier_entry.task_counter++; frontier_entry.execution_dependency = execution_dependency; execution_dependency_ = execution_dependency; + dummy_objects_.push_back(execution_dependency); } -bool ActorRegistration::IsAlive() const { return alive_; } - -void ActorRegistration::MarkDead() { alive_ = false; } +bool ActorRegistration::IsAlive() const { + return actor_table_data_.state == ActorState::ALIVE; +} std::string ActorRegistration::DebugString() const { std::stringstream result; diff --git a/src/ray/raylet/actor_registration.h b/src/ray/raylet/actor_registration.h index d5d3f8c39..faa05eb26 100644 --- a/src/ray/raylet/actor_registration.h +++ b/src/ray/raylet/actor_registration.h @@ -36,6 +36,16 @@ class ActorRegistration { ObjectID execution_dependency; }; + /// Get the actor table data. + /// + /// \return The actor table data. + const ActorTableDataT &GetTableData() const { return actor_table_data_; } + + /// Get the actor's current state (ALIVE or DEAD). + /// + /// \return The actor's current state. + const ActorState &GetState() const { return actor_table_data_.state; } + /// Get the actor's node manager location. /// /// \return The actor's node manager location. All tasks for the actor should @@ -66,6 +76,9 @@ class ActorRegistration { /// that handle. const std::unordered_map &GetFrontier() const; + /// Get all the dummy objects of this actor's tasks. + const std::vector &GetDummyObjects() const { return dummy_objects_; } + /// Extend the frontier of the actor by a single task. This should be called /// whenever the actor executes a task. /// @@ -81,10 +94,6 @@ class ActorRegistration { /// \return True if the local actor is alive and false if it is dead. bool IsAlive() const; - /// Mark the actor as dead. - /// \return Void. - void MarkDead(); - /// Returns debug string for class. /// /// \return string. @@ -104,6 +113,8 @@ class ActorRegistration { /// executed so far and which tasks may execute next, based on execution /// dependencies. This is indexed by handle. std::unordered_map frontier_; + /// All of the dummy object IDs from this actor's tasks. + std::vector dummy_objects_; }; } // namespace raylet diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 1efa3a487..a0b7526e8 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -135,9 +135,11 @@ ray::Status NodeManager::RegisterGcs() { task_lease_notification_callback, task_lease_empty_callback, nullptr)); // Register a callback for actor creation notifications. - auto actor_creation_callback = [this]( - gcs::AsyncGcsClient *client, const ActorID &actor_id, - const std::vector &data) { HandleActorCreation(actor_id, data); }; + auto actor_creation_callback = [this](gcs::AsyncGcsClient *client, + const ActorID &actor_id, + const std::vector &data) { + HandleActorStateTransition(actor_id, data.back()); + }; RAY_RETURN_NOT_OK(gcs_client_->actor_table().Subscribe( UniqueID::nil(), UniqueID::nil(), actor_creation_callback, nullptr)); @@ -397,6 +399,16 @@ void NodeManager::ClientRemoved(const ClientTableDataT &client_data) { // Remove the remote server connection. remote_server_connections_.erase(client_id); + + // For any live actors that were on the dead node, broadcast a notification + // about the actor's death + // TODO(swang): This could be very slow if there are many actors. + for (const auto &actor_entry : actor_registry_) { + if (actor_entry.second.GetNodeManagerId() == client_id && + actor_entry.second.IsAlive()) { + HandleDisconnectedActor(actor_entry.first, /*was_local=*/false); + } + } } void NodeManager::HeartbeatAdded(const ClientID &client_id, @@ -434,7 +446,7 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id, if (state != TaskState::INFEASIBLE) { // Don't unsubscribe for infeasible tasks because we never subscribed in // the first place. - task_dependency_manager_.UnsubscribeDependencies(task_id); + RAY_CHECK(task_dependency_manager_.UnsubscribeDependencies(task_id)); } // Attempt to forward the task. If this fails to forward the task, // the task will be resubmit locally. @@ -455,32 +467,61 @@ void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableDataT &heartbeat_ } } -void NodeManager::HandleActorCreation(const ActorID &actor_id, - const std::vector &data) { - RAY_LOG(DEBUG) << "Actor creation notification received: " << actor_id; +void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_local) { + RAY_LOG(DEBUG) << "Actor disconnected " << actor_id; + auto actor_entry = actor_registry_.find(actor_id); + RAY_CHECK(actor_entry != actor_registry_.end()); - // TODO(swang): In presence of failures, data may have size > 1, since the - // actor will have been created multiple times. In that case, we should - // only consider the last entry as valid. All previous entries should have - // a dead node_manager_id. - RAY_CHECK(data.size() == 1); + // Release all the dummy objects for the dead actor. + if (was_local) { + for (auto &dummy_object : actor_entry->second.GetDummyObjects()) { + HandleObjectMissing(dummy_object); + } + } + + auto new_actor_data = + std::make_shared(actor_entry->second.GetTableData()); + new_actor_data->state = ActorState::DEAD; + HandleActorStateTransition(actor_id, *new_actor_data); + ray::gcs::ActorTable::WriteCallback failure_callback = nullptr; + if (was_local) { + // The actor was local to this node, so we are the only one who should try + // to update the log. + failure_callback = [](gcs::AsyncGcsClient *client, const ActorID &id, + const ActorTableDataT &data) { + RAY_LOG(FATAL) << "Failed to update state to DEAD for actor " << id; + }; + } + // Actor reconstruction is disabled, so the actor can only go from ALIVE to + // DEAD. The DEAD entry must therefore be at the second index in the log. + RAY_CHECK_OK(gcs_client_->actor_table().AppendAt(JobID::nil(), actor_id, new_actor_data, + nullptr, failure_callback, + /*log_index=*/1)); +} + +void NodeManager::HandleActorStateTransition(const ActorID &actor_id, + const ActorTableDataT &data) { + RAY_LOG(DEBUG) << "Actor creation notification received: " << actor_id << " " + << static_cast(data.state); // Register the new actor. - ActorRegistration actor_registration(data.back()); - ClientID received_node_manager_id = actor_registration.GetNodeManagerId(); - // Extend the frontier to include the actor creation task. NOTE(swang): The - // creator of the actor is always assigned nil as the actor handle ID. - actor_registration.ExtendFrontier(ActorHandleID::nil(), - actor_registration.GetActorCreationDependency()); - auto inserted = actor_registry_.emplace(actor_id, std::move(actor_registration)); - if (!inserted.second) { - // If we weren't able to insert the actor's location, check that the - // existing entry is the same as the new one. - // TODO(swang): This is not true in the case of failures. - RAY_CHECK(received_node_manager_id == inserted.first->second.GetNodeManagerId()) - << "Actor scheduled on " << inserted.first->second.GetNodeManagerId() - << ", but received notification for " << received_node_manager_id; + ActorRegistration actor_registration(data); + // Update local registry. + auto it = actor_registry_.find(actor_id); + if (it == actor_registry_.end()) { + it = actor_registry_.emplace(actor_id, actor_registration).first; } else { + RAY_CHECK(it->second.GetNodeManagerId() == actor_registration.GetNodeManagerId()); + if (actor_registration.GetState() > it->second.GetState()) { + // The new state is later than our current state. + it->second = actor_registration; + } else { + // Our state is already at or past the update, so skip the update. + return; + } + } + + if (it->second.IsAlive()) { // The actor's location is now known. Dequeue any methods that were // submitted before the actor's location was known. // (See design_docs/task_states.rst for the state transition diagram.) @@ -505,17 +546,14 @@ void NodeManager::HandleActorCreation(const ActorID &actor_id, // empty lineage this time. SubmitTask(method, Lineage()); } - } -} - -void NodeManager::CleanUpTasksForDeadActor(const ActorID &actor_id) { - auto tasks_to_remove = local_queues_.GetTaskIdsForActor(actor_id); - auto removed_tasks = local_queues_.RemoveTasks(tasks_to_remove); - - for (auto const &task : removed_tasks) { - const TaskSpecification &spec = task.GetTaskSpecification(); - TreatTaskAsFailed(spec); - task_dependency_manager_.TaskCanceled(spec.TaskId()); + } else { + // When an actor dies, loop over all of the queued tasks for that actor + // and treat them as failed. + auto tasks_to_remove = local_queues_.GetTaskIdsForActor(actor_id); + auto removed_tasks = local_queues_.RemoveTasks(tasks_to_remove); + for (auto const &task : removed_tasks) { + TreatTaskAsFailed(task); + } } } @@ -716,12 +754,10 @@ void NodeManager::ProcessDisconnectClientMessage( // If the worker was killed intentionally, e.g., when the driver that created // the task that this worker is currently executing exits, the task for this // worker has already been removed from queue, so the following are skipped. - task_dependency_manager_.TaskCanceled(task_id); const Task &task = local_queues_.RemoveTask(task_id); - const TaskSpecification &spec = task.GetTaskSpecification(); // Handle the task failure in order to raise an exception in the // application. - TreatTaskAsFailed(spec); + TreatTaskAsFailed(task); const JobID &job_id = worker->GetAssignedDriverId(); @@ -741,15 +777,9 @@ void NodeManager::ProcessDisconnectClientMessage( // If the worker was an actor, add it to the list of dead actors. const ActorID &actor_id = worker->GetActorId(); if (!actor_id.is_nil()) { - // TODO(rkn): Consider broadcasting a message to all of the other - // node managers so that they can mark the actor as dead. - RAY_LOG(DEBUG) << "The actor with ID " << actor_id << " died."; - auto actor_entry = actor_registry_.find(actor_id); - RAY_CHECK(actor_entry != actor_registry_.end()); - actor_entry->second.MarkDead(); - // For dead actors, if there are remaining tasks for this actor, we - // should handle them. - CleanUpTasksForDeadActor(actor_id); + RAY_LOG(DEBUG) << "The actor with ID " << actor_id << " died on " + << gcs_client_->client_table().GetLocalClientId(); + HandleDisconnectedActor(actor_id, /*was_local=*/true); } const ClientID &client_id = gcs_client_->client_table().GetLocalClientId(); @@ -1021,7 +1051,8 @@ bool NodeManager::CheckDependencyManagerInvariant() const { return true; } -void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) { +void NodeManager::TreatTaskAsFailed(const Task &task) { + const TaskSpecification &spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed."; // Loop over the return IDs (except the dummy ID) and store a fake object in // the object store. @@ -1047,6 +1078,17 @@ void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) { ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id())); } } + // A task failing is equivalent to assigning and finishing the task, so clean + // up any leftover state as for any task dispatched and removed from the + // local queue. + lineage_cache_.AddReadyTask(task); + task_dependency_manager_.TaskCanceled(spec.TaskId()); + // Notify the task dependency manager that we no longer need this task's + // object dependencies. TODO(swang): Ideally, we would check the return value + // here. However, we don't know at this point if the task was in the WAITING + // or READY queue before, in which case we would not have been subscribed to + // its dependencies. + task_dependency_manager_.UnsubscribeDependencies(spec.TaskId()); } void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage, @@ -1071,27 +1113,21 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag // Check whether we know the location of the actor. const auto actor_entry = actor_registry_.find(spec.ActorId()); if (actor_entry != actor_registry_.end()) { - // We have a known location for the actor. - auto node_manager_id = actor_entry->second.GetNodeManagerId(); - if (node_manager_id == gcs_client_->client_table().GetLocalClientId()) { - // The actor is local. Check if the actor is still alive. - if (!actor_entry->second.IsAlive()) { - // Handle the fact that this actor is dead. - TreatTaskAsFailed(spec); - } else { + if (!actor_entry->second.IsAlive()) { + TreatTaskAsFailed(task); + } else { + // We have a known location for the actor. + auto node_manager_id = actor_entry->second.GetNodeManagerId(); + if (node_manager_id == gcs_client_->client_table().GetLocalClientId()) { // Queue the task for local execution, bypassing placement. EnqueuePlaceableTask(task); - } - } else { - // The actor is remote. Forward the task to the node manager that owns - // the actor. - if (gcs_client_->client_table().IsRemoved(node_manager_id)) { - // The remote node manager is dead, so handle the fact that this actor - // is also dead. - TreatTaskAsFailed(spec); } else { - // Attempt to forward the task. If this fails to forward the task, - // the task will be resubmit locally. + // If the node manager has been removed, then it must have already been + // marked as DEAD in the handler for a removed GCS client. + RAY_CHECK(!gcs_client_->client_table().IsRemoved(node_manager_id)); + // The actor is remote. Attempt to forward the task to the node manager + // that owns the actor. If this fails to forward the task, the task + // will be resubmitted locally. ForwardTaskOrResubmit(task, node_manager_id); } } @@ -1106,7 +1142,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag const std::vector &data) { if (!data.empty()) { // The actor has been created. - HandleActorCreation(actor_id, data); + HandleActorStateTransition(actor_id, data.back()); } else { // The actor has not yet been created. // TODO(swang): Set a timer for reconstructing the actor creation @@ -1245,7 +1281,7 @@ void NodeManager::HandleTaskUnblocked( worker->RemoveBlockedTaskId(current_task_id); // Unsubscribe to the objects. Any fetch or reconstruction operations to // make the objects local are canceled. - task_dependency_manager_.UnsubscribeDependencies(current_task_id); + RAY_CHECK(task_dependency_manager_.UnsubscribeDependencies(current_task_id)); local_queues_.RemoveBlockedTaskId(current_task_id); } @@ -1348,9 +1384,6 @@ bool NodeManager::AssignTask(const Task &task) { // (SetExecutionDependencies takes a non-const so copy task in a // on-const variable.) assigned_task.SetExecutionDependencies({execution_dependency}); - // Extend the frontier to include the executing task. - actor_entry->second.ExtendFrontier(spec.ActorHandleId(), - spec.ActorDummyObject()); } // We started running the task, so the task is ready to write to GCS. if (!lineage_cache_.AddReadyTask(assigned_task)) { @@ -1363,7 +1396,7 @@ bool NodeManager::AssignTask(const Task &task) { local_queues_.QueueRunningTasks(std::vector({assigned_task})); // Notify the task dependency manager that we no longer need this task's // object dependencies. - task_dependency_manager_.UnsubscribeDependencies(spec.TaskId()); + RAY_CHECK(task_dependency_manager_.UnsubscribeDependencies(spec.TaskId())); } else { RAY_LOG(WARNING) << "Failed to send task to worker, disconnecting client"; // We failed to send the task to the worker, so disconnect the worker. @@ -1398,18 +1431,26 @@ void NodeManager::FinishAssignedTask(Worker &worker) { // Publish the actor creation event to all other nodes so that methods for // the actor will be forwarded directly to this node. - auto actor_notification = std::make_shared(); - actor_notification->actor_id = actor_id.binary(); - actor_notification->actor_creation_dummy_object_id = + RAY_CHECK(actor_registry_.find(actor_id) == actor_registry_.end()); + auto actor_data = std::make_shared(); + actor_data->actor_id = actor_id.binary(); + actor_data->actor_creation_dummy_object_id = task.GetTaskSpecification().ActorDummyObject().binary(); - actor_notification->driver_id = driver_id.binary(); - actor_notification->node_manager_id = - gcs_client_->client_table().GetLocalClientId().binary(); + actor_data->driver_id = driver_id.binary(); + actor_data->node_manager_id = gcs_client_->client_table().GetLocalClientId().binary(); + actor_data->state = ActorState::ALIVE; RAY_LOG(DEBUG) << "Publishing actor creation: " << actor_id << " driver_id: " << driver_id; - RAY_CHECK_OK(gcs_client_->actor_table().Append(JobID::nil(), actor_id, - actor_notification, nullptr)); + HandleActorStateTransition(actor_id, *actor_data); + // The actor should not have been created before, so writing to the first + // index in the log should succeed. + auto failure_callback = [](gcs::AsyncGcsClient *client, const ActorID &id, + const ActorTableDataT &data) { + RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id; + }; + RAY_CHECK_OK(gcs_client_->actor_table().AppendAt( + JobID::nil(), actor_id, actor_data, nullptr, failure_callback, /*log_index=*/0)); // Resources required by an actor creation task are acquired for the // lifetime of the actor, so we do not release any resources here. @@ -1430,7 +1471,26 @@ void NodeManager::FinishAssignedTask(Worker &worker) { // removing the objects, e.g., when an actor is terminated. if (task.GetTaskSpecification().IsActorCreationTask() || task.GetTaskSpecification().IsActorTask()) { + ActorID actor_id; + ActorHandleID actor_handle_id; + if (task.GetTaskSpecification().IsActorCreationTask()) { + actor_id = task.GetTaskSpecification().ActorCreationId(); + actor_handle_id = ActorHandleID::nil(); + } else { + actor_id = task.GetTaskSpecification().ActorId(); + actor_handle_id = task.GetTaskSpecification().ActorHandleId(); + } + auto actor_entry = actor_registry_.find(actor_id); + RAY_CHECK(actor_entry != actor_registry_.end()); auto dummy_object = task.GetTaskSpecification().ActorDummyObject(); + // Extend the actor's frontier to include the executed task. + actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object); + // Mark the dummy object as locally available to indicate that the actor's + // state has changed and the next method can run. + // NOTE(swang): The dummy objects must be marked as local whenever + // ExtendFrontier is called, and vice versa, so that we can clean up the + // dummy objects properly in case the actor fails and needs to be + // reconstructed. HandleObjectLocal(dummy_object); } @@ -1447,8 +1507,6 @@ void NodeManager::FinishAssignedTask(Worker &worker) { } void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { - RAY_LOG(INFO) << "Reconstructing task " << task_id << " on client " - << gcs_client_->client_table().GetLocalClientId(); // Retrieve the task spec in order to re-execute the task. RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( JobID::nil(), task_id, @@ -1473,11 +1531,23 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { } void NodeManager::ResubmitTask(const Task &task) { - // Actor reconstruction is turned off by default right now. If this is an - // actor task, treat the task as failed and do not resubmit it. if (task.GetTaskSpecification().IsActorTask()) { - TreatTaskAsFailed(task.GetTaskSpecification()); - return; + // Actor reconstruction is turned off by default right now. + const ActorID actor_id = task.GetTaskSpecification().ActorId(); + auto it = actor_registry_.find(actor_id); + RAY_CHECK(it != actor_registry_.end()); + if (it->second.IsAlive()) { + // If the actor is still alive, then do not resubmit. + RAY_LOG(ERROR) << "The output of an actor task is required, but the actor may " + "still be alive. If the output has been evicted, the job may " + "hang."; + return; + } + // The actor is dead. The actor task will get resubmitted, at which point + // it will be treated as failed. + } else { + RAY_LOG(INFO) << "Reconstructing task " << task.GetTaskSpecification().TaskId() + << " on client " << gcs_client_->client_table().GetLocalClientId(); } // Driver tasks cannot be reconstructed. If this is a driver task, push an @@ -1581,8 +1651,8 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task, // Timer killing will receive the boost::asio::error::operation_aborted, // we only handle the timeout event. RAY_CHECK(!error); - RAY_LOG(DEBUG) << "Resubmitting task " << task_id - << " because ForwardTask failed."; + RAY_LOG(INFO) << "Resubmitting task " << task_id + << " because ForwardTask failed."; SubmitTask(task, Lineage()); }); // Remove the task from the lineage cache. The task will get added back diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 6d52b734c..7fd820a5e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -145,14 +145,17 @@ class NodeManager { /// \param task The task in question. /// \return Void. void EnqueuePlaceableTask(const Task &task); - /// This will treat the task as if it had been executed and failed. This is - /// done by looping over the task return IDs and for each ID storing an object - /// that represents a failure in the object store. When clients retrieve these - /// objects, they will raise application-level exceptions. + /// This will treat a task removed from the local queue as if it had been + /// executed and failed. This is done by looping over the task return IDs and + /// for each ID storing an object that represents a failure in the object + /// store. When clients retrieve these objects, they will raise + /// application-level exceptions. State for the task will be cleaned up as if + /// it were any other task that had been assigned, executed, and removed from + /// the local queue. /// /// \param spec The specification of the task. /// \return Void. - void TreatTaskAsFailed(const TaskSpecification &spec); + void TreatTaskAsFailed(const Task &task); /// Handle specified task's submission to the local node manager. /// /// \param task The task being submitted. @@ -258,20 +261,21 @@ class NodeManager { void KillWorker(std::shared_ptr worker); /// Methods for actor scheduling. - /// Handler for the creation of an actor, possibly on a remote node. + /// Handler for an actor state transition, for a newly created actor or an + /// actor that died. This method is idempotent and will ignore old state + /// transitions. /// /// \param actor_id The actor ID of the actor that was created. - /// \param data Data associated with the actor creation event. + /// \param data Data associated with the actor state transition. /// \return Void. - void HandleActorCreation(const ActorID &actor_id, - const std::vector &data); + void HandleActorStateTransition(const ActorID &actor_id, const ActorTableDataT &data); - /// When an actor dies, loop over all of the queued tasks for that actor and - /// treat them as failed. + /// Handler for an actor dying. The actor may be remote. /// - /// \param actor_id The actor that died. + /// \param actor_id The actor ID of the actor that died. + /// \param was_local Whether the actor was local. /// \return Void. - void CleanUpTasksForDeadActor(const ActorID &actor_id); + void HandleDisconnectedActor(const ActorID &actor_id, bool was_local); /// When a driver dies, loop over all of the queued tasks for that driver and /// treat them as failed. diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index 339f70da1..16e90d3e8 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -170,10 +170,12 @@ bool TaskDependencyManager::SubscribeDependencies( return (task_entry.num_missing_dependencies == 0); } -void TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) { +bool TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) { // Remove the task from the table of subscribed tasks. auto it = task_dependencies_.find(task_id); - RAY_CHECK(it != task_dependencies_.end()); + if (it == task_dependencies_.end()) { + return false; + } const TaskDependencies task_entry = std::move(it->second); task_dependencies_.erase(it); @@ -206,6 +208,8 @@ void TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) { for (const auto &object_id : task_entry.object_dependencies) { HandleRemoteDependencyCanceled(object_id); } + + return true; } std::vector TaskDependencyManager::GetPendingTasks() const { diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 5caae6663..bb49f4bc1 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -61,7 +61,8 @@ class TaskDependencyManager { /// then they will be canceled. /// /// \param task_id The ID of the task whose dependencies to unsubscribe from. - void UnsubscribeDependencies(const TaskID &task_id); + /// \return Whether the task was subscribed before. + bool UnsubscribeDependencies(const TaskID &task_id); /// Mark that the given task is pending execution. Any objects that it creates /// are now considered to be pending creation. If there are any subscribed diff --git a/test/actor_test.py b/test/actor_test.py index dc83d3d90..693b6cb50 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1260,7 +1260,14 @@ def test_blocking_actor_task(shutdown_only): def test_exception_raised_when_actor_node_dies(shutdown_only): - ray.worker._init(start_ray_local=True, num_local_schedulers=2, num_cpus=1) + ray.worker._init( + start_ray_local=True, + num_local_schedulers=2, + num_cpus=1, + _internal_config=json.dumps({ + "initial_reconstruction_timeout_milliseconds": 200, + "num_heartbeats_timeout": 10, + })) @ray.remote class Counter(object): @@ -1287,11 +1294,11 @@ def test_exception_raised_when_actor_node_dies(shutdown_only): ray.services.PROCESS_TYPE_PLASMA_STORE][1] process.kill() - # Submit some new actor tasks. - x_ids = [actor.inc.remote() for _ in range(5)] - - # Make sure that getting the result raises an exception. + # Submit some new actor tasks both before and after the node failure is + # detected. Make sure that getting the result raises an exception. for _ in range(10): + # Submit some new actor tasks. + x_ids = [actor.inc.remote() for _ in range(5)] for x_id in x_ids: with pytest.raises(ray.worker.RayGetError): # There is some small chance that ray.get will actually diff --git a/test/component_failures_test.py b/test/component_failures_test.py index 454e9fb53..b9d257962 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import os +import json import signal import time @@ -262,7 +263,11 @@ def _test_component_failed(component_type): num_local_schedulers=num_local_schedulers, start_ray_local=True, num_cpus=[num_workers_per_scheduler] * num_local_schedulers, - redirect_output=True) + redirect_output=True, + _internal_config=json.dumps({ + "initial_reconstruction_timeout_milliseconds": 1000, + "num_heartbeats_timeout": 10, + })) # Submit many tasks with many dependencies. @ray.remote