diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index ff110369f..c812a88e2 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -132,7 +132,9 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( Status UnblockIfNeeded(const std::shared_ptr &client, const WorkerContext &ctx) { if (ctx.CurrentTaskIsDirectCall()) { - if (ctx.ShouldReleaseResourcesOnBlockingCalls()) { + // NOTE: for direct call actors, we still need to issue an unblock IPC to release + // get subscriptions, even if the worker isn't blocked. + if (ctx.ShouldReleaseResourcesOnBlockingCalls() || ctx.CurrentActorIsDirectCall()) { return client->NotifyDirectCallTaskUnblocked(); } else { return Status::OK(); // We don't need to release resources. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fed1a7827..9cd0743fa 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2097,10 +2097,19 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr &w return; } - if (!worker || worker->GetAssignedTaskId().IsNil() || !worker->IsBlocked()) { + if (!worker || worker->GetAssignedTaskId().IsNil()) { return; // The worker may have died or is no longer processing the task. } TaskID task_id = worker->GetAssignedTaskId(); + + // First, always release task dependencies. This ensures we don't leak resources even + // if we don't need to unblock the worker below. + task_dependency_manager_.UnsubscribeGetDependencies(task_id); + + if (!worker->IsBlocked()) { + return; // Don't need to unblock the worker. + } + Task task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING); const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); const ResourceSet cpu_resources = required_resources.GetNumCpus(); @@ -2121,7 +2130,6 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr &w << cluster_resource_map_[self_node_id_].GetAvailableResources().ToString(); } worker->MarkUnblocked(); - task_dependency_manager_.UnsubscribeGetDependencies(task_id); } void NodeManager::AsyncResolveObjects(