From 1627f8994556a08006765092d831b5b2568fd08d Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 26 Apr 2017 15:13:39 -0700 Subject: [PATCH] Fix problem in which actors and workers running tasks are not killed by driver exit. (#490) * Augment test to verify that relevant workers and actors are killed during driver cleanup. * Fix bug in which we were only killing one worker when a driver exited. * Fix remove driver test. * Fix and augment test. --- python/ray/test/multi_node_tests.py | 35 ++- src/local_scheduler/local_scheduler.cc | 2 - test/jenkins_tests/multi_node_docker_test.py | 6 +- .../multi_node_tests/remove_driver_test.py | 210 +++++++++++++++--- test/jenkins_tests/run_multi_node_tests.sh | 3 +- 5 files changed, 208 insertions(+), 48 deletions(-) diff --git a/python/ray/test/multi_node_tests.py b/python/ray/test/multi_node_tests.py index 6b0277541..f9d1488ae 100644 --- a/python/ray/test/multi_node_tests.py +++ b/python/ray/test/multi_node_tests.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import redis import time @@ -53,34 +54,48 @@ def _wait_for_nodes_to_join(num_nodes, timeout=20): num_nodes)) -def _broadcast_event(event_name, redis_address): +def _broadcast_event(event_name, redis_address, data=None): """Broadcast an event. + This is used to synchronize drivers for the multi-node tests. + Args: event_name: The name of the event to wait for. redis_address: The address of the Redis server to use for synchronization. - - This is used to synchronize drivers for the multi-node tests. + data: Extra data to include in the broadcast (this will be returned by the + corresponding _wait_for_event call). This data must be json serializable. """ redis_host, redis_port = redis_address.split(":") redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) - redis_client.rpush(EVENT_KEY, event_name) + payload = json.dumps((event_name, data)) + redis_client.rpush(EVENT_KEY, payload) -def _wait_for_event(event_name, redis_address, extra_buffer=1): +def _wait_for_event(event_name, redis_address, extra_buffer=0): """Block until an event has been broadcast. + This is used to synchronize drivers for the multi-node tests. + Args: event_name: The name of the event to wait for. redis_address: The address of the Redis server to use for synchronization. extra_buffer: An amount of time in seconds to wait after the event. - This is used to synchronize drivers for the multi-node tests. + Returns: + The data that was passed into the corresponding _broadcast_event call. """ redis_host, redis_port = redis_address.split(":") redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) while True: - event_names = redis_client.lrange(EVENT_KEY, 0, -1) - if event_name.encode("ascii") in event_names: - break - time.sleep(extra_buffer) + event_infos = redis_client.lrange(EVENT_KEY, 0, -1) + events = dict() + for event_info in event_infos: + name, data = json.loads(event_info) + if name in events: + raise Exception("The same event {} was broadcast twice.".format(name)) + events[name] = data + if event_name in events: + # Potentially sleep a little longer and then return the event data. + time.sleep(extra_buffer) + return events[event_name] + time.sleep(0.1) diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 5ca04b371..cd99a0eac 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -746,13 +746,11 @@ void handle_driver_removed_callback(WorkerID driver_id, void *user_context) { /* This actor was created by the removed driver, so kill the actor. */ LOG_DEBUG("Killing an actor for a removed driver."); kill_worker(state, *it, false); - break; } } else if (task != NULL) { if (WorkerID_equal(TaskSpec_driver_id(Task_task_spec(task)), driver_id)) { LOG_DEBUG("Killing a worker executing a task for a removed driver."); kill_worker(state, *it, false); - break; } } diff --git a/test/jenkins_tests/multi_node_docker_test.py b/test/jenkins_tests/multi_node_docker_test.py index c7ccfac7d..9362f2bd9 100644 --- a/test/jenkins_tests/multi_node_docker_test.py +++ b/test/jenkins_tests/multi_node_docker_test.py @@ -275,6 +275,10 @@ if __name__ == "__main__": num_gpus = ([int(i) for i in args.num_gpus.split(",")] if args.num_gpus is not None else num_nodes * [0]) + # Parse the driver locations. + driver_locations = (None if args.driver_locations is None + else [int(i) for i in args.driver_locations.split(",")]) + d = DockerRunner() d.start_ray(docker_image=args.docker_image, mem_size=args.mem_size, shm_size=args.shm_size, num_nodes=num_nodes, @@ -282,7 +286,7 @@ if __name__ == "__main__": development_mode=args.development_mode) try: run_results = d.run_test(args.test_script, args.num_drivers, - driver_locations=args.driver_locations) + driver_locations=driver_locations) finally: d.stop_ray() diff --git a/test/jenkins_tests/multi_node_tests/remove_driver_test.py b/test/jenkins_tests/multi_node_tests/remove_driver_test.py index 35ffc6d65..2e71224d1 100644 --- a/test/jenkins_tests/multi_node_tests/remove_driver_test.py +++ b/test/jenkins_tests/multi_node_tests/remove_driver_test.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import os +import psutil import time import ray @@ -11,38 +12,106 @@ from ray.test.multi_node_tests import (_wait_for_nodes_to_join, _wait_for_event) # This test should be run with 5 nodes, which have 0, 1, 2, 3, and 4 GPUs for a -# total of 10 GPUs. It shoudl be run with 3 drivers. +# total of 10 GPUs. It should be run with 7 drivers. Drivers 2 through 6 must +# run on different nodes so they can check if all the relevant workers on all +# the nodes have been killed. total_num_nodes = 5 +def pid_alive(pid): + """Check if the process with this PID is alive or not. + + Args: + pid: The pid to check. + + Returns: + This returns false if the process is dead or defunct. Otherwise, it returns + true. + """ + try: + os.kill(pid, 0) + except OSError: + return False + else: + if psutil.Process(pid).status() == psutil.STATUS_ZOMBIE: + return False + else: + return True + + +def actor_event_name(driver_index, actor_index): + return "DRIVER_{}_ACTOR_{}_RUNNING".format(driver_index, actor_index) + + +def remote_function_event_name(driver_index, task_index): + return "DRIVER_{}_TASK_{}_RUNNING".format(driver_index, task_index) + + +@ray.remote +def long_running_task(driver_index, task_index, redis_address): + _broadcast_event(remote_function_event_name(driver_index, task_index), + redis_address, + data=(ray.services.get_node_ip_address(), os.getpid())) + # Loop forever. + while True: + time.sleep(100) + + +num_long_running_tasks_per_driver = 2 + + @ray.actor class Actor0(object): - def __init__(self): + def __init__(self, driver_index, actor_index, redis_address): + _broadcast_event(actor_event_name(driver_index, actor_index), + redis_address, + data=(ray.services.get_node_ip_address(), os.getpid())) assert len(ray.get_gpu_ids()) == 0 def check_ids(self): assert len(ray.get_gpu_ids()) == 0 + def long_running_method(self): + # Loop forever. + while True: + time.sleep(100) + @ray.actor(num_gpus=1) class Actor1(object): - def __init__(self): + def __init__(self, driver_index, actor_index, redis_address): + _broadcast_event(actor_event_name(driver_index, actor_index), + redis_address, + data=(ray.services.get_node_ip_address(), os.getpid())) assert len(ray.get_gpu_ids()) == 1 def check_ids(self): assert len(ray.get_gpu_ids()) == 1 + def long_running_method(self): + # Loop forever. + while True: + time.sleep(100) + @ray.actor(num_gpus=2) class Actor2(object): - def __init__(self): + def __init__(self, driver_index, actor_index, redis_address): + _broadcast_event(actor_event_name(driver_index, actor_index), + redis_address, + data=(ray.services.get_node_ip_address(), os.getpid())) assert len(ray.get_gpu_ids()) == 2 def check_ids(self): assert len(ray.get_gpu_ids()) == 2 + def long_running_method(self): + # Loop forever. + while True: + time.sleep(100) -def driver_0(redis_address): + +def driver_0(redis_address, driver_index): """The script for driver 0. This driver should create five actors that each use one GPU and some actors @@ -53,19 +122,29 @@ def driver_0(redis_address): # Wait for all the nodes to join the cluster. _wait_for_nodes_to_join(total_num_nodes) + # Start some long running task. Driver 2 will make sure the worker running + # this task has been killed. + for i in range(num_long_running_tasks_per_driver): + long_running_task.remote(driver_index, i, redis_address) + # Create some actors that require one GPU. - actors_one_gpu = [Actor1() for _ in range(5)] + actors_one_gpu = [Actor1(driver_index, i, redis_address) for i in range(5)] # Create some actors that don't require any GPUs. - actors_no_gpus = [Actor0() for _ in range(5)] + actors_no_gpus = [Actor0(driver_index, 5 + i, redis_address) + for i in range(5)] for _ in range(1000): ray.get([actor.check_ids() for actor in actors_one_gpu]) ray.get([actor.check_ids() for actor in actors_no_gpus]) + # Start a long-running method on one actor and make sure this doesn't affect + # anything. + actors_no_gpus[0].long_running_method() + _broadcast_event("DRIVER_0_DONE", redis_address) -def driver_1(redis_address): +def driver_1(redis_address, driver_index): """The script for driver 1. This driver should create one actor that uses two GPUs, three actors that @@ -77,39 +156,58 @@ def driver_1(redis_address): # Wait for all the nodes to join the cluster. _wait_for_nodes_to_join(total_num_nodes) + # Start some long running task. Driver 2 will make sure the worker running + # this task has been killed. + for i in range(num_long_running_tasks_per_driver): + long_running_task.remote(driver_index, i, redis_address) + # Create an actor that requires two GPUs. - actors_two_gpus = [Actor2() for _ in range(1)] + actors_two_gpus = [Actor2(driver_index, i, redis_address) for i in range(1)] # Create some actors that require one GPU. - actors_one_gpu = [Actor1() for _ in range(3)] + actors_one_gpu = [Actor1(driver_index, 1 + i, redis_address) + for i in range(3)] # Create some actors that don't require any GPUs. - actors_no_gpus = [Actor0() for _ in range(5)] + actors_no_gpus = [Actor0(driver_index, 1 + 3 + i, redis_address) + for i in range(5)] for _ in range(1000): ray.get([actor.check_ids() for actor in actors_two_gpus]) ray.get([actor.check_ids() for actor in actors_one_gpu]) ray.get([actor.check_ids() for actor in actors_no_gpus]) + # Start a long-running method on one actor and make sure this doesn't affect + # anything. + actors_one_gpu[0].long_running_method() + _broadcast_event("DRIVER_1_DONE", redis_address) -def driver_2(redis_address): - """The script for driver 2. +def cleanup_driver(redis_address, driver_index): + """The script for drivers 2 through 6. This driver should wait for the first two drivers to finish. Then it should create some actors that use a total of ten GPUs. """ ray.init(redis_address=redis_address) + # Only one of the cleanup drivers should create more actors. + if driver_index == 2: + # We go ahead and create some actors that don't require any GPUs. We don't + # need to wait for the other drivers to finish. We call methods on these + # actors later to make sure they haven't been killed. + actors_no_gpus = [Actor0(driver_index, i, redis_address) + for i in range(10)] + _wait_for_event("DRIVER_0_DONE", redis_address) _wait_for_event("DRIVER_1_DONE", redis_address) - def try_to_create_actor(actor_class, timeout=20): + def try_to_create_actor(actor_class, driver_index, actor_index, timeout=20): # Try to create an actor, but allow failures while we wait for the monitor # to release the resources for the removed drivers. start_time = time.time() while time.time() - start_time < timeout: try: - actor = actor_class() + actor = actor_class(driver_index, actor_index, redis_address) except Exception as e: time.sleep(0.1) else: @@ -117,23 +215,67 @@ def driver_2(redis_address): # If we are here, then we timed out while looping. raise Exception("Timed out while trying to create actor.") - # Create some actors that require two GPUs. - actors_two_gpus = [] - for _ in range(3): - actors_two_gpus.append(try_to_create_actor(Actor2)) - # Create some actors that require one GPU. - actors_one_gpu = [] - for _ in range(4): - actors_one_gpu.append(try_to_create_actor(Actor1)) - # Create some actors that don't require any GPUs. - actors_no_gpus = [Actor0() for _ in range(5)] + # Only one of the cleanup drivers should create more actors. + if driver_index == 2: + # Create some actors that require two GPUs. + actors_two_gpus = [] + for i in range(3): + actors_two_gpus.append(try_to_create_actor(Actor2, driver_index, 10 + i)) + # Create some actors that require one GPU. + actors_one_gpu = [] + for i in range(4): + actors_one_gpu.append(try_to_create_actor(Actor1, driver_index, + 10 + 3 + i)) - for _ in range(1000): - ray.get([actor.check_ids() for actor in actors_two_gpus]) - ray.get([actor.check_ids() for actor in actors_one_gpu]) - ray.get([actor.check_ids() for actor in actors_no_gpus]) + def wait_for_pid_to_exit(pid, timeout=20): + start_time = time.time() + while time.time() - start_time < timeout: + if not pid_alive(pid): + return + time.sleep(0.1) + raise Exception("Timed out while waiting for process to exit.") - _broadcast_event("DRIVER_2_DONE", redis_address) + removed_workers = 0 + + # Make sure that the PIDs for the long-running tasks from driver 0 and driver + # 1 have been killed. + for i in range(num_long_running_tasks_per_driver): + node_ip_address, pid = _wait_for_event(remote_function_event_name(0, i), + redis_address) + if node_ip_address == ray.services.get_node_ip_address(): + wait_for_pid_to_exit(pid) + removed_workers += 1 + for i in range(num_long_running_tasks_per_driver): + node_ip_address, pid = _wait_for_event(remote_function_event_name(1, i), + redis_address) + if node_ip_address == ray.services.get_node_ip_address(): + wait_for_pid_to_exit(pid) + removed_workers += 1 + # Make sure that the PIDs for the actors from driver 0 and driver 1 have been + # killed. + for i in range(10): + node_ip_address, pid = _wait_for_event(actor_event_name(0, i), + redis_address) + if node_ip_address == ray.services.get_node_ip_address(): + wait_for_pid_to_exit(pid) + removed_workers += 1 + for i in range(9): + node_ip_address, pid = _wait_for_event(actor_event_name(1, i), + redis_address) + if node_ip_address == ray.services.get_node_ip_address(): + wait_for_pid_to_exit(pid) + removed_workers += 1 + + print("{} workers/actors were removed on this node.".format(removed_workers)) + + # Only one of the cleanup drivers should create and use more actors. + if driver_index == 2: + for _ in range(1000): + ray.get([actor.check_ids() for actor in actors_two_gpus]) + ray.get([actor.check_ids() for actor in actors_one_gpu]) + ray.get([actor.check_ids() for actor in actors_no_gpus]) + + _broadcast_event("DRIVER_{}_DONE".format(driver_index), redis_address) if __name__ == "__main__": @@ -142,11 +284,11 @@ if __name__ == "__main__": print("Driver {} started at {}.".format(driver_index, time.time())) if driver_index == 0: - driver_0(redis_address) + driver_0(redis_address, driver_index) elif driver_index == 1: - driver_1(redis_address) - elif driver_index == 2: - driver_2(redis_address) + driver_1(redis_address, driver_index) + elif driver_index in [2, 3, 4, 5, 6]: + cleanup_driver(redis_address, driver_index) else: raise Exception("This code should be unreachable.") diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index ded001b71..1d352be98 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -17,7 +17,8 @@ python $ROOT_DIR/multi_node_docker_test.py \ --docker-image=$DOCKER_SHA \ --num-nodes=5 \ --num-gpus=0,1,2,3,4 \ - --num-drivers=3 \ + --num-drivers=7 \ + --driver-locations=0,1,0,1,2,3,4 \ --test-script=/ray/test/jenkins_tests/multi_node_tests/remove_driver_test.py python $ROOT_DIR/multi_node_docker_test.py \