Fix bug in cluster mode where driver exits when there are tasks in the waiting queue (#4251)

This commit is contained in:
Stephanie Wang
2019-03-20 10:18:27 -07:00
committed by Robert Nishihara
parent 8ce7565530
commit 4ac9c1ed6e
2 changed files with 32 additions and 3 deletions
+29 -1
View File
@@ -8,6 +8,7 @@ import subprocess
import time
import ray
from ray.utils import _random_string
from ray.tests.utils import (run_and_get_output, run_string_as_driver,
run_string_as_driver_nonblocking)
@@ -409,7 +410,8 @@ def test_driver_exiting_when_worker_blocked(call_ray_start):
ray.init(redis_address=redis_address)
# Define a driver that creates an actor and exits.
# Define a driver that creates two tasks, one that runs forever and the
# other blocked on the first.
driver_script = """
import time
import ray
@@ -432,6 +434,32 @@ print("success")
# Make sure the first driver ran to completion.
assert "success" in out
nonexistent_id_bytes = _random_string()
nonexistent_id_hex = ray.utils.binary_to_hex(nonexistent_id_bytes)
# Define a driver that creates one task that depends on a nonexistent
# object. This task will be queued as waiting to execute.
driver_script = """
import time
import ray
ray.init(redis_address="{}")
@ray.remote
def g(x):
return
g.remote(ray.ObjectID(ray.utils.hex_to_binary("{}")))
time.sleep(1)
print("success")
""".format(redis_address, nonexistent_id_hex)
# Create some drivers and let them exit and make sure everything is
# still alive.
for _ in range(3):
out = run_string_as_driver(driver_script)
# Simulate the nonexistent dependency becoming available.
ray.worker.global_worker.put_object(
ray.ObjectID(nonexistent_id_bytes), None)
# Make sure this driver ran to completion.
assert "success" in out
@ray.remote
def f():
return 1