From f6ce9dfa6c100d349fd87b5742a586da61d39d7d Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 12 Feb 2017 12:39:32 -0800 Subject: [PATCH] Allow start_ray.sh to take an object manager port. (#272) * Allow start_ray.sh to take an object manager port. * Fix typo and add test. * Small cleanups. --- python/plasma/plasma.py | 23 +++++++++++++++++----- python/ray/services.py | 27 ++++++++++++++++++++++---- scripts/start_ray.py | 15 ++++++++++---- test/multi_node_test.py | 43 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 13 deletions(-) diff --git a/python/plasma/plasma.py b/python/plasma/plasma.py index 37b53fa0d..e6dc3d2ed 100644 --- a/python/plasma/plasma.py +++ b/python/plasma/plasma.py @@ -325,13 +325,21 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valg time.sleep(0.1) return plasma_store_name, pid -def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1", num_retries=20, use_valgrind=False, run_profiler=False, redirect_output=False): +def new_port(): + return random.randint(10000, 65535) + +def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1", + plasma_manager_port=None, num_retries=20, + use_valgrind=False, run_profiler=False, + redirect_output=False): """Start a plasma manager and return the ports it listens on. Args: store_name (str): The name of the plasma store socket. redis_address (str): The address of the Redis server. node_ip_address (str): The IP address of the node. + plasma_manager_port (int): The port to use for the plasma manager. If this + is not provided, a port will be generated at random. use_valgrind (bool): True if the Plasma manager should be started inside of valgrind and False otherwise. 
redirect_output (bool): True if stdout and stderr should be redirected to @@ -346,18 +354,21 @@ def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1", """ plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../core/src/plasma/plasma_manager") plasma_manager_name = "/tmp/plasma_manager{}".format(random_name()) - port = None + if plasma_manager_port is not None: + if num_retries != 1: + raise Exception("num_retries must be 1 if port is specified.") + else: + plasma_manager_port = new_port() process = None counter = 0 while counter < num_retries: if counter > 0: print("Plasma manager failed to start, retrying now.") - port = random.randint(10000, 65535) command = [plasma_manager_executable, "-s", store_name, "-m", plasma_manager_name, "-h", node_ip_address, - "-p", str(port), + "-p", str(plasma_manager_port), "-r", redis_address] with open(os.devnull, "w") as FNULL: stdout = FNULL if redirect_output else None @@ -373,6 +384,8 @@ def start_plasma_manager(store_name, redis_address, node_ip_address="127.0.0.1", time.sleep(0.1) # See if the process has terminated if process.poll() == None: - return plasma_manager_name, process, port + return plasma_manager_name, process, plasma_manager_port + # Generate a new port and try again. 
+ plasma_manager_port = new_port() counter += 1 raise Exception("Couldn't start plasma manager.") diff --git a/python/ray/services.py b/python/ray/services.py index 55aedf9aa..bd63ee201 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -218,7 +218,7 @@ def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False): counter = 0 if port is not None: if num_retries != 1: - raise Exception("Num retries must be 1 if port is specified") + raise Exception("num_retries must be 1 if port is specified.") else: port = new_port() while counter < num_retries: @@ -317,12 +317,15 @@ def start_local_scheduler(redis_address, all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) return local_scheduler_name -def start_objstore(node_ip_address, redis_address, cleanup=True, redirect_output=False, objstore_memory=None): +def start_objstore(node_ip_address, redis_address, object_manager_port=None, + cleanup=True, redirect_output=False, objstore_memory=None): """This method starts an object store process. Args: node_ip_address (str): The IP address of the node running the object store. redis_address (str): The address of the Redis instance to connect to. + object_manager_port (int): The port to use for the object manager. If this + is not provided, one will be generated randomly. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. @@ -358,7 +361,11 @@ def start_objstore(node_ip_address, redis_address, cleanup=True, redirect_output # Start the Plasma store. plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=objstore_memory, use_profiler=RUN_PLASMA_STORE_PROFILER, redirect_output=redirect_output) # Start the plasma manager. 
- plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, node_ip_address=node_ip_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output) + if object_manager_port is not None: + plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, plasma_manager_port=object_manager_port, node_ip_address=node_ip_address, num_retries=1, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output) + assert plasma_manager_port == object_manager_port + else: + plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, node_ip_address=node_ip_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output) if cleanup: all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1) all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2) @@ -500,10 +507,17 @@ def start_ray_processes(address_info=None, address_info["local_scheduler_socket_names"] = [] local_scheduler_socket_names = address_info["local_scheduler_socket_names"] + # Get the ports to use for the object managers if any are provided. + object_manager_ports = address_info["object_manager_ports"] if "object_manager_ports" in address_info else None + if not isinstance(object_manager_ports, list): + object_manager_ports = num_local_schedulers * [object_manager_ports] + assert len(object_manager_ports) == num_local_schedulers + # Start any object stores that do not yet exist. - for _ in range(num_local_schedulers - len(object_store_addresses)): + for i in range(num_local_schedulers - len(object_store_addresses)): # Start Plasma. 
object_store_address = start_objstore(node_ip_address, redis_address, + object_manager_port=object_manager_ports[i], cleanup=cleanup, redirect_output=redirect_output) object_store_addresses.append(object_store_address) @@ -570,6 +584,7 @@ def start_ray_processes(address_info=None, def start_ray_node(node_ip_address, redis_address, + object_manager_ports=None, num_workers=0, num_local_schedulers=1, worker_path=None, @@ -585,6 +600,9 @@ def start_ray_node(node_ip_address, Args: node_ip_address (str): The IP address of this node. redis_address (str): The address of the Redis server. + object_manager_ports (list): A list of the ports to use for the object + managers. There should be one per object manager being started on this + node (typically just one). num_workers (int): The number of workers to start. num_local_schedulers (int): The number of local schedulers to start. This is also the number of plasma stores and plasma managers to start. @@ -602,6 +620,7 @@ def start_ray_node(node_ip_address, """ address_info = { "redis_address": redis_address, + "object_manager_ports": object_manager_ports, } return start_ray_processes(address_info=address_info, node_ip_address=node_ip_address, diff --git a/scripts/start_ray.py b/scripts/start_ray.py index eabc16e02..31b6b67fe 100644 --- a/scripts/start_ray.py +++ b/scripts/start_ray.py @@ -11,6 +11,7 @@ parser = argparse.ArgumentParser(description="Parse addresses for the worker to parser.add_argument("--node-ip-address", required=False, type=str, help="the IP address of the worker's node") parser.add_argument("--redis-address", required=False, type=str, help="the address to use for connecting to Redis") parser.add_argument("--redis-port", required=False, type=str, help="the port to use for starting Redis") +parser.add_argument("--object-manager-port", required=False, type=int, help="the port to use for starting the object manager") parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of 
workers to start on this node") parser.add_argument("--head", action="store_true", help="provide this argument for the head node") @@ -50,10 +51,15 @@ if __name__ == "__main__": node_ip_address = args.node_ip_address print("Using IP address {} for this node.".format(node_ip_address)) + address_info = {} + # Use the provided Redis port if there is one. if args.redis_port is not None: - address_info = {"redis_address": "{}:{}".format(node_ip_address, - args.redis_port)} - else: + address_info["redis_address"] = "{}:{}".format(node_ip_address, + args.redis_port) + # Use the provided object manager port if there is one. + if args.object_manager_port is not None: + address_info["object_manager_ports"] = [args.object_manager_port] + if address_info == {}: address_info = None address_info = services.start_ray_head(address_info=address_info, @@ -91,7 +97,7 @@ if __name__ == "__main__": if args.node_ip_address is None: node_ip_address = services.get_node_ip_address(args.redis_address) else: - node_ip_addess = args.node_ip_address + node_ip_address = args.node_ip_address print("Using IP address {} for this node.".format(node_ip_address)) # Check that there aren't already Redis clients with the same IP address # connected with this Redis instance. 
This raises an exception if the Redis @@ -99,6 +105,7 @@ if __name__ == "__main__": check_no_existing_redis_clients(node_ip_address, args.redis_address) address_info = services.start_ray_node(node_ip_address=node_ip_address, redis_address=args.redis_address, + object_manager_ports=[args.object_manager_port], num_workers=args.num_workers, cleanup=False, redirect_output=True) diff --git a/test/multi_node_test.py b/test/multi_node_test.py index 8afb6cee5..cae0fd6bf 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -97,5 +97,48 @@ print("success") ray.worker.cleanup() subprocess.Popen([stop_ray_script]).wait() +class StartRayScriptTest(unittest.TestCase): + + def testCallingStartRayHead(self): + # Test that we can call start-ray.sh with various command line parameters. + # TODO(rkn): This test only tests the --head code path. We should also test + # the non-head node code path. + + # Test starting Ray with no arguments. + out = subprocess.check_output([start_ray_script, "--head"]).decode("ascii") + subprocess.Popen([stop_ray_script]).wait() + + # Test starting Ray with a number of workers specified. + subprocess.check_output([start_ray_script, "--head", "--num-workers", "20"]) + subprocess.Popen([stop_ray_script]).wait() + + # Test starting Ray with a redis port specified. + subprocess.check_output([start_ray_script, "--head", + "--redis-port", "6379"]) + subprocess.Popen([stop_ray_script]).wait() + + # Test starting Ray with a node IP address specified. + subprocess.check_output([start_ray_script, "--head", + "--node-ip-address", "127.0.0.1"]) + subprocess.Popen([stop_ray_script]).wait() + + # Test starting Ray with an object manager port specified. + subprocess.check_output([start_ray_script, "--head", + "--object-manager-port", "12345"]) + subprocess.Popen([stop_ray_script]).wait() + + # Test starting Ray with all arguments specified. 
+ subprocess.check_output([start_ray_script, "--head", + "--num-workers", "20", + "--redis-port", "6379", + "--object-manager-port", "12345"]) + subprocess.Popen([stop_ray_script]).wait() + + # Test starting Ray with invalid arguments. + with self.assertRaises(Exception): + subprocess.check_output([start_ray_script, "--head", + "--redis-address", "127.0.0.1:6379"]) + subprocess.Popen([stop_ray_script]).wait() + if __name__ == "__main__": unittest.main(verbosity=2)