From fae99ecb8e8d750bddcb3674f720f068541dc15d Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 17 Feb 2020 18:35:25 -0800 Subject: [PATCH] [core] Make sure to unsubscribe get dependencies for direct task calls. (#7201) * fix * remove assert --- .../store_provider/plasma_store_provider.cc | 4 +++- src/ray/raylet/node_manager.cc | 12 ++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index ff110369f..c812a88e2 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -132,7 +132,9 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( Status UnblockIfNeeded(const std::shared_ptr &client, const WorkerContext &ctx) { if (ctx.CurrentTaskIsDirectCall()) { - if (ctx.ShouldReleaseResourcesOnBlockingCalls()) { + // NOTE: for direct call actors, we still need to issue an unblock IPC to release + // get subscriptions, even if the worker isn't blocked. + if (ctx.ShouldReleaseResourcesOnBlockingCalls() || ctx.CurrentActorIsDirectCall()) { return client->NotifyDirectCallTaskUnblocked(); } else { return Status::OK(); // We don't need to release resources. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fed1a7827..9cd0743fa 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2097,10 +2097,19 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr &w return; } - if (!worker || worker->GetAssignedTaskId().IsNil() || !worker->IsBlocked()) { + if (!worker || worker->GetAssignedTaskId().IsNil()) { return; // The worker may have died or is no longer processing the task. } TaskID task_id = worker->GetAssignedTaskId(); + + // First, always release task dependencies. This ensures we don't leak resources even + // if we don't need to unblock the worker below. + task_dependency_manager_.UnsubscribeGetDependencies(task_id); + + if (!worker->IsBlocked()) { + return; // Don't need to unblock the worker. + } + Task task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING); const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); const ResourceSet cpu_resources = required_resources.GetNumCpus(); @@ -2121,7 +2130,6 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr &w << cluster_resource_map_[self_node_id_].GetAvailableResources().ToString(); } worker->MarkUnblocked(); - task_dependency_manager_.UnsubscribeGetDependencies(task_id); } void NodeManager::AsyncResolveObjects(