Fix crash in HandleObjectMissing when direct actor creation task is not found in local_queues_ (#6817)

This commit is contained in:
Zhijun Fu
2020-01-18 03:29:13 +08:00
committed by Edward Oakes
parent 30776450a3
commit 92380dd4e6
2 changed files with 51 additions and 3 deletions
+48 -3
View File
@@ -2722,17 +2722,62 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
}
}
bool NodeManager::IsDirectActorCreationTask(const TaskID &task_id) {
auto actor_id = task_id.ActorId();
if (!actor_id.IsNil() && task_id == TaskID::ForActorCreationTask(actor_id)) {
// This task ID corresponds to an actor creation task.
auto iter = actor_registry_.find(actor_id);
if (iter != actor_registry_.end() && iter->second.GetTableData().is_direct_call()) {
// This actor is direct call actor.
return true;
}
}
return false;
}
void NodeManager::HandleObjectMissing(const ObjectID &object_id) {
// Notify the task dependency manager that this object is no longer local.
const auto waiting_task_ids = task_dependency_manager_.HandleObjectMissing(object_id);
RAY_LOG(DEBUG) << "Object missing " << object_id << ", "
<< " on " << self_node_id_ << waiting_task_ids.size()
<< " tasks waiting";
std::stringstream result;
result << "Object missing " << object_id << ", "
<< " on " << self_node_id_ << ", " << waiting_task_ids.size()
<< " tasks waiting";
if (waiting_task_ids.size() > 0) {
result << ", tasks: ";
for (const auto &task_id : waiting_task_ids) {
result << task_id << " ";
}
}
RAY_LOG(DEBUG) << result.str();
// Transition any tasks that were in the runnable state and are dependent on
// this object to the waiting state.
if (!waiting_task_ids.empty()) {
std::unordered_set<TaskID> waiting_task_id_set(waiting_task_ids.begin(),
waiting_task_ids.end());
// NOTE(zhijunfu): For direct actors, the worker is initially assigned actor
// creation task ID, which will not be reset after the task finishes. And later tasks
// of this actor will reuse this task ID to require objects from plasma with
// FetchOrReconstruct, since direct actor task IDs are not known to raylet.
// To support actor reconstruction for direct actor, raylet marks actor creation task
// as completed and removes it from `local_queues_` when it receives `TaskDone`
// message from worker. This is necessary because the actor creation task will be
// re-submitted during reconstruction, if the task is not removed previously, the new
// submitted task will be marked as duplicate and thus ignored.
// So here we check for direct actor creation task explicitly to allow this case.
auto iter = waiting_task_id_set.begin();
while (iter != waiting_task_id_set.end()) {
if (IsDirectActorCreationTask(*iter)) {
RAY_LOG(DEBUG) << "Ignoring direct actor creation task " << *iter
<< " when handling object missing for " << object_id;
iter = waiting_task_id_set.erase(iter);
} else {
++iter;
}
}
// First filter out any tasks that can't be transitioned to READY. These
// are running workers or drivers, now blocked in a get.
local_queues_.FilterState(waiting_task_id_set, TaskState::RUNNING);
+3
View File
@@ -578,6 +578,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Repeat the process as long as we can schedule a task.
void NewSchedulerSchedulePendingTasks();
/// Whether a task is an direct actor creation task.
bool IsDirectActorCreationTask(const TaskID &task_id);
/// ID of this node.
ClientID self_node_id_;
boost::asio::io_service &io_service_;