mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 20:06:52 +08:00
[core] Make sure to unsubscribe get dependencies for direct task calls. (#7201)
* fix * remove assert
This commit is contained in:
@@ -132,7 +132,9 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
|
||||
Status UnblockIfNeeded(const std::shared_ptr<raylet::RayletClient> &client,
|
||||
const WorkerContext &ctx) {
|
||||
if (ctx.CurrentTaskIsDirectCall()) {
|
||||
if (ctx.ShouldReleaseResourcesOnBlockingCalls()) {
|
||||
// NOTE: for direct call actors, we still need to issue an unblock IPC to release
|
||||
// get subscriptions, even if the worker isn't blocked.
|
||||
if (ctx.ShouldReleaseResourcesOnBlockingCalls() || ctx.CurrentActorIsDirectCall()) {
|
||||
return client->NotifyDirectCallTaskUnblocked();
|
||||
} else {
|
||||
return Status::OK(); // We don't need to release resources.
|
||||
|
||||
@@ -2097,10 +2097,19 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr<Worker> &w
|
||||
return;
|
||||
}
|
||||
|
||||
if (!worker || worker->GetAssignedTaskId().IsNil() || !worker->IsBlocked()) {
|
||||
if (!worker || worker->GetAssignedTaskId().IsNil()) {
|
||||
return; // The worker may have died or is no longer processing the task.
|
||||
}
|
||||
TaskID task_id = worker->GetAssignedTaskId();
|
||||
|
||||
// First, always release task dependencies. This ensures we don't leak resources even
|
||||
// if we don't need to unblock the worker below.
|
||||
task_dependency_manager_.UnsubscribeGetDependencies(task_id);
|
||||
|
||||
if (!worker->IsBlocked()) {
|
||||
return; // Don't need to unblock the worker.
|
||||
}
|
||||
|
||||
Task task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING);
|
||||
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
|
||||
const ResourceSet cpu_resources = required_resources.GetNumCpus();
|
||||
@@ -2121,7 +2130,6 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr<Worker> &w
|
||||
<< cluster_resource_map_[self_node_id_].GetAvailableResources().ToString();
|
||||
}
|
||||
worker->MarkUnblocked();
|
||||
task_dependency_manager_.UnsubscribeGetDependencies(task_id);
|
||||
}
|
||||
|
||||
void NodeManager::AsyncResolveObjects(
|
||||
|
||||
Reference in New Issue
Block a user