diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 1e7252b31..a963f6b15 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -8,6 +8,7 @@ import subprocess import time import ray +from ray.utils import _random_string from ray.tests.utils import (run_and_get_output, run_string_as_driver, run_string_as_driver_nonblocking) @@ -409,7 +410,8 @@ def test_driver_exiting_when_worker_blocked(call_ray_start): ray.init(redis_address=redis_address) - # Define a driver that creates an actor and exits. + # Define a driver that creates two tasks, one that runs forever and the + # other blocked on the first. driver_script = """ import time import ray @@ -432,6 +434,32 @@ print("success") # Make sure the first driver ran to completion. assert "success" in out + nonexistent_id_bytes = _random_string() + nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes) + # Define a driver that creates one task that depends on a nonexistent + # object. This task will be queued as waiting to execute. + driver_script = """ +import time +import ray +ray.init(redis_address="{}") +@ray.remote +def g(x): + return +g.remote(ray.ObjectID(ray.utils.hex_to_binary("{}"))) +time.sleep(1) +print("success") +""".format(redis_address, nonexistent_id_hex) + + # Create some drivers and let them exit and make sure everything is + # still alive. + for _ in range(3): + out = run_string_as_driver(driver_script) + # Simulate the nonexistent dependency becoming available. + ray.worker.global_worker.put_object( + ray.ObjectID(nonexistent_id_bytes), None) + # Make sure the first driver ran to completion. + assert "success" in out + @ray.remote def f(): return 1 diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f94ddaeb1..72211ddd9 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -592,9 +592,10 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id, void NodeManager::CleanUpTasksForDeadDriver(const DriverID &driver_id) { auto tasks_to_remove = local_queues_.GetTaskIdsForDriver(driver_id); - local_queues_.RemoveTasks(tasks_to_remove); - task_dependency_manager_.RemoveTasksAndRelatedObjects(tasks_to_remove); + // NOTE(swang): SchedulingQueue::RemoveTasks modifies its argument so we must + // call it last. + local_queues_.RemoveTasks(tasks_to_remove); } void NodeManager::ProcessNewClient(LocalClientConnection &client) {