From 92380dd4e64eb0477eee2e6a8f57fb6b1116f41c Mon Sep 17 00:00:00 2001 From: Zhijun Fu <37800433+zhijunfu@users.noreply.github.com> Date: Sat, 18 Jan 2020 03:29:13 +0800 Subject: [PATCH] Fix crash in HandleObjectMissing when direct actor creation task is not found in local_queues_ (#6817) --- src/ray/raylet/node_manager.cc | 51 ++++++++++++++++++++++++++++++++-- src/ray/raylet/node_manager.h | 3 ++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d5e8605a1..1ece8f99a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2722,17 +2722,62 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { } } +bool NodeManager::IsDirectActorCreationTask(const TaskID &task_id) { + auto actor_id = task_id.ActorId(); + if (!actor_id.IsNil() && task_id == TaskID::ForActorCreationTask(actor_id)) { + // This task ID corresponds to an actor creation task. + auto iter = actor_registry_.find(actor_id); + if (iter != actor_registry_.end() && iter->second.GetTableData().is_direct_call()) { + // This actor is a direct call actor. + return true; + } + } + + return false; +} + void NodeManager::HandleObjectMissing(const ObjectID &object_id) { // Notify the task dependency manager that this object is no longer local. 
const auto waiting_task_ids = task_dependency_manager_.HandleObjectMissing(object_id); - RAY_LOG(DEBUG) << "Object missing " << object_id << ", " - << " on " << self_node_id_ << waiting_task_ids.size() - << " tasks waiting"; + std::stringstream result; + result << "Object missing " << object_id << ", " + << " on " << self_node_id_ << ", " << waiting_task_ids.size() + << " tasks waiting"; + if (waiting_task_ids.size() > 0) { + result << ", tasks: "; + for (const auto &task_id : waiting_task_ids) { + result << task_id << " "; + } + } + RAY_LOG(DEBUG) << result.str(); + // Transition any tasks that were in the runnable state and are dependent on // this object to the waiting state. if (!waiting_task_ids.empty()) { std::unordered_set waiting_task_id_set(waiting_task_ids.begin(), waiting_task_ids.end()); + + // NOTE(zhijunfu): For direct actors, the worker is initially assigned the actor + // creation task ID, which is not reset after the task finishes. Later tasks of + // this actor reuse this task ID to require objects from plasma with + // FetchOrReconstruct, since direct actor task IDs are not known to the raylet. + // To support reconstruction of direct actors, the raylet marks the actor creation + // task as completed and removes it from `local_queues_` when it receives the + // `TaskDone` message from the worker. This is necessary because the actor creation + // task is re-submitted during reconstruction; if it were not removed beforehand, the + // newly submitted task would be marked as a duplicate and thus ignored. + // So here we check for direct actor creation tasks explicitly to allow this case. 
+ auto iter = waiting_task_id_set.begin(); + while (iter != waiting_task_id_set.end()) { + if (IsDirectActorCreationTask(*iter)) { + RAY_LOG(DEBUG) << "Ignoring direct actor creation task " << *iter + << " when handling object missing for " << object_id; + iter = waiting_task_id_set.erase(iter); + } else { + ++iter; + } + } + // First filter out any tasks that can't be transitioned to READY. These // are running workers or drivers, now blocked in a get. local_queues_.FilterState(waiting_task_id_set, TaskState::RUNNING); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index e360b7460..d6bf6a492 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -578,6 +578,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Repeat the process as long as we can schedule a task. void NewSchedulerSchedulePendingTasks(); + /// Whether a task is a direct actor creation task. + bool IsDirectActorCreationTask(const TaskID &task_id); + /// ID of this node. ClientID self_node_id_; boost::asio::io_service &io_service_;