diff --git a/python/ray/test/test_utils.py b/python/ray/test/test_utils.py index 204f20c6a..14d627fb4 100644 --- a/python/ray/test/test_utils.py +++ b/python/ray/test/test_utils.py @@ -37,14 +37,19 @@ def _wait_for_nodes_to_join(num_nodes, timeout=20): ready = True # Check that for each node, a local scheduler and a plasma manager # are present. - for ip_address, clients in client_table.items(): - client_types = [client["ClientType"] for client in clients] - if "local_scheduler" not in client_types: - ready = False - if "plasma_manager" not in client_types: - ready = False - if ready: + if ray.global_state.use_raylet: + # In raylet mode, this is a list of map. + # The GCS info will appear as a whole instead of part by part. return + else: + for ip_address, clients in client_table.items(): + client_types = [client["ClientType"] for client in clients] + if "local_scheduler" not in client_types: + ready = False + if "plasma_manager" not in client_types: + ready = False + if ready: + return if num_ready_nodes > num_nodes: # Too many nodes have joined. Something must be wrong. raise Exception("{} nodes have joined the cluster, but we were " diff --git a/python/ray/worker.py b/python/ray/worker.py index 1c3714284..2e3ca09a5 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1411,7 +1411,12 @@ def get_address_info_from_redis_helper(redis_address, (client_node_ip_address == "127.0.0.1" and redis_ip_address == ray.services.get_node_ip_address())): raylets.append(client) - + # Make sure that at least one raylet has started locally. + # This handles a race condition where Redis has started but + # the raylet has not connected. + if len(raylets) == 0: + raise Exception( + "Redis has started but no raylets have registered yet.") object_store_addresses = [ services.ObjectStoreAddress( name=ray.utils.decode(raylet.ObjectStoreSocketName()), diff --git a/test/jenkins_tests/multi_node_docker_test.py b/test/jenkins_tests/multi_node_docker_test.py index bb2a7d6d5..3b3a13fa3 100644 --- a/test/jenkins_tests/multi_node_docker_test.py +++ b/test/jenkins_tests/multi_node_docker_test.py @@ -3,8 +3,9 @@ from __future__ import division from __future__ import print_function import argparse -import numpy as np +import datetime import os +import random import re import signal import subprocess @@ -32,7 +33,15 @@ def wait_for_output(proc): Returns: A tuple of the stdout and stderr of the process as strings. """ - stdout_data, stderr_data = proc.communicate() + try: + # NOTE: This test must be run with Python 3. + stdout_data, stderr_data = proc.communicate(timeout=200) + except subprocess.TimeoutExpired: + # Timeout: kill the process. + # Get the remaining message from PIPE for debugging purpose. + print("Killing process because it timed out.") + proc.kill() + stdout_data, stderr_data = proc.communicate() if stdout_data is not None: try: @@ -71,11 +80,12 @@ class DockerRunner(object): head node. """ - def __init__(self): + def __init__(self, use_raylet): """Initialize the DockerRunner.""" self.head_container_id = None self.worker_container_ids = [] self.head_container_ip = None + self.use_raylet = use_raylet def _get_container_id(self, stdout_data): """Parse the docker container ID from stdout_data. @@ -139,6 +149,8 @@ class DockerRunner(object): "--num-cpus={}".format(num_cpus), "--num-gpus={}".format(num_gpus), "--no-ui" ]) + if self.use_raylet: + command.append("--use-raylet") print("Starting head node with command:{}".format(command)) proc = subprocess.Popen( @@ -165,6 +177,8 @@ class DockerRunner(object): "--redis-address={:s}:6379".format(self.head_container_ip), "--num-cpus={}".format(num_cpus), "--num-gpus={}".format(num_gpus) ]) + if self.use_raylet: + command.append("--use-raylet") print("Starting worker node with command:{}".format(command)) proc = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -289,13 +303,16 @@ class DockerRunner(object): Raises: Exception: An exception is raised if the timeout expires. """ + print("Multi-node docker test started at: {}".format( + datetime.datetime.now())) all_container_ids = ( [self.head_container_id] + self.worker_container_ids) if driver_locations is None: driver_locations = [ - np.random.randint(0, len(all_container_ids)) - for _ in range(num_drivers) + random.randrange(0, len(all_container_ids)) + for i in range(num_drivers) ] + print("driver_locations: {}".format(driver_locations)) # Define a signal handler and set an alarm to go off in # timeout_seconds. @@ -308,13 +325,19 @@ class DockerRunner(object): # Start the different drivers. driver_processes = [] + if self.use_raylet: + use_raylet_env = 1 + else: + use_raylet_env = 0 for i in range(len(driver_locations)): # Get the container ID to run the ith driver in. container_id = all_container_ids[driver_locations[i]] command = [ - "docker", "exec", container_id, "/bin/bash", "-c", - ("RAY_REDIS_ADDRESS={}:6379 RAY_DRIVER_INDEX={} python " - "{}".format(self.head_container_ip, i, test_script)) + "docker", "exec", container_id, "/bin/bash", + "-c", ("RAY_REDIS_ADDRESS={}:6379 RAY_DRIVER_INDEX={} " + "RAY_USE_XRAY={} python {}".format( + self.head_container_ip, i, use_raylet_env, + test_script)) ] print("Starting driver with command {}.".format(test_script)) # Start the driver. @@ -322,7 +345,6 @@ class DockerRunner(object): command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) driver_processes.append(p) - # Wait for the drivers to finish. results = [] for p in driver_processes: stdout_data, stderr_data = wait_for_output(p) @@ -337,7 +359,8 @@ class DockerRunner(object): # Disable the alarm. signal.alarm(0) - + print("Multi-node docker test ended at: {}".format( + datetime.datetime.now())) return results @@ -381,6 +404,8 @@ if __name__ == "__main__": "--development-mode", action="store_true", help="use local copies of the test scripts") + parser.add_argument( + "--use-raylet", action="store_true", help="use raylet mode in Docker") args = parser.parse_args() # Parse the number of CPUs and GPUs to use for each worker. @@ -394,7 +419,7 @@ if __name__ == "__main__": driver_locations = (None if args.driver_locations is None else [int(i) for i in args.driver_locations.split(",")]) - d = DockerRunner() + d = DockerRunner(args.use_raylet) d.start_ray( docker_image=args.docker_image, mem_size=args.mem_size, diff --git a/test/jenkins_tests/multi_node_tests/remove_driver_test.py b/test/jenkins_tests/multi_node_tests/remove_driver_test.py index d18afed6e..4b61634b3 100644 --- a/test/jenkins_tests/multi_node_tests/remove_driver_test.py +++ b/test/jenkins_tests/multi_node_tests/remove_driver_test.py @@ -208,14 +208,9 @@ def cleanup_driver(redis_address, driver_index): # Only one of the cleanup drivers should create more actors. if driver_index == 2: - # Create some actors that require two GPUs. - actors_two_gpus = [] - for i in range(3): - actors_two_gpus.append( - try_to_create_actor(Actor2, driver_index, 10 + i)) # Create some actors that require one GPU. actors_one_gpu = [] - for i in range(4): + for i in range(10): actors_one_gpu.append( try_to_create_actor(Actor1, driver_index, 10 + 3 + i)) @@ -256,7 +251,6 @@ def cleanup_driver(redis_address, driver_index): # Only one of the cleanup drivers should create and use more actors. if driver_index == 2: for _ in range(1000): - ray.get([actor.check_ids.remote() for actor in actors_two_gpus]) ray.get([actor.check_ids.remote() for actor in actors_one_gpu]) ray.get([actor.check_ids.remote() for actor in actors_no_gpus]) diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 51e27602d..db362bcf6 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -285,13 +285,14 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/examples/multiagent_two_trainers.py --num-iters=2 -python $ROOT_DIR/multi_node_docker_test.py \ +python3 $ROOT_DIR/multi_node_docker_test.py \ --docker-image=$DOCKER_SHA \ --num-nodes=5 \ --num-redis-shards=10 \ + --use-raylet \ --test-script=/ray/test/jenkins_tests/multi_node_tests/test_0.py -python $ROOT_DIR/multi_node_docker_test.py \ +python3 $ROOT_DIR/multi_node_docker_test.py \ --docker-image=$DOCKER_SHA \ --num-nodes=5 \ --num-redis-shards=5 \ @@ -300,17 +301,19 @@ python $ROOT_DIR/multi_node_docker_test.py \ --driver-locations=0,1,0,1,2,3,4 \ --test-script=/ray/test/jenkins_tests/multi_node_tests/remove_driver_test.py -python $ROOT_DIR/multi_node_docker_test.py \ +python3 $ROOT_DIR/multi_node_docker_test.py \ --docker-image=$DOCKER_SHA \ --num-nodes=5 \ --num-redis-shards=2 \ --num-gpus=0,0,5,6,50 \ --num-drivers=100 \ + --use-raylet \ --test-script=/ray/test/jenkins_tests/multi_node_tests/many_drivers_test.py -python $ROOT_DIR/multi_node_docker_test.py \ +python3 $ROOT_DIR/multi_node_docker_test.py \ --docker-image=$DOCKER_SHA \ --num-nodes=1 \ --mem-size=60G \ --shm-size=60G \ - --test-script=/ray/test/jenkins_tests/multi_node_tests/large_memory_test.py + --use-raylet \ + --test-script=/ray/test/jenkins_tests/multi_node_tests/large_memory_test.py \ No newline at end of file