mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 17:11:05 +08:00
[New scheduler] Also unsubscribe get dependencies on unblock
This commit is contained in:
@@ -2181,6 +2181,15 @@ void NodeManager::HandleDirectCallTaskBlocked(
|
||||
|
||||
void NodeManager::HandleDirectCallTaskUnblocked(
|
||||
const std::shared_ptr<WorkerInterface> &worker) {
|
||||
if (!worker || worker->GetAssignedTaskId().IsNil()) {
|
||||
return; // The worker may have died or is no longer processing the task.
|
||||
}
|
||||
TaskID task_id = worker->GetAssignedTaskId();
|
||||
|
||||
// First, always release task dependencies. This ensures we don't leak resources even
|
||||
// if we don't need to unblock the worker below.
|
||||
task_dependency_manager_.UnsubscribeGetDependencies(task_id);
|
||||
|
||||
if (new_scheduler_enabled_) {
|
||||
// Important: avoid double unblocking if the unblock RPC finishes after task end.
|
||||
if (!worker || !worker->IsBlocked()) {
|
||||
@@ -2200,15 +2209,6 @@ void NodeManager::HandleDirectCallTaskUnblocked(
|
||||
return;
|
||||
}
|
||||
|
||||
if (!worker || worker->GetAssignedTaskId().IsNil()) {
|
||||
return; // The worker may have died or is no longer processing the task.
|
||||
}
|
||||
TaskID task_id = worker->GetAssignedTaskId();
|
||||
|
||||
// First, always release task dependencies. This ensures we don't leak resources even
|
||||
// if we don't need to unblock the worker below.
|
||||
task_dependency_manager_.UnsubscribeGetDependencies(task_id);
|
||||
|
||||
if (!worker->IsBlocked()) {
|
||||
return; // Don't need to unblock the worker.
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user