From 330159d8bdd8f094ca78113a7ea12b4d11fa5ff5 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 22 Feb 2018 11:05:37 -0800 Subject: [PATCH] Allow setting redis shard ports through ray start (also object store memory). (#1581) * Allow passing in --object-store-memory to ray start. * Allow setting ports for the redis shards. * Reorder arguments and infer number of shards from ports. * Move code block into only the head node case. * Add test. --- python/ray/scripts/scripts.py | 39 +++++++++++++++++++++++++++-------- python/ray/services.py | 28 +++++++++++++++++++++++-- test/multi_node_test.py | 6 ++++++ 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index d6b1d6767..f13f4d8ee 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -53,8 +53,14 @@ def cli(): @click.option("--redis-max-clients", required=False, type=int, help=("If provided, attempt to configure Redis with this " "maximum number of clients.")) +@click.option("--redis-shard-ports", required=False, type=str, + help="the port to use for the Redis shards other than the " + "primary Redis shard") @click.option("--object-manager-port", required=False, type=int, help="the port to use for starting the object manager") +@click.option("--object-store-memory", required=False, type=int, + help="the maximum amount of memory (in bytes) to allow the " + "object store to use") @click.option("--num-workers", required=False, type=int, help=("The initial number of workers to start on this node, " "note that the local scheduler may start additional " @@ -81,15 +87,10 @@ def cli(): @click.option("--autoscaling-config", required=False, type=str, help="the file that contains the autoscaling config") def start(node_ip_address, redis_address, redis_port, num_redis_shards, - redis_max_clients, object_manager_port, num_workers, num_cpus, - num_gpus, resources, head, no_ui, block, plasma_directory, - huge_pages, autoscaling_config): - # Note that we redirect stdout and stderr to /dev/null because otherwise - # attempts to print may cause exceptions if a process is started inside of - # an SSH connection and the SSH connection dies. TODO(rkn): This is a - # temporary fix. We should actually redirect stdout and stderr to Redis in - # some way. - + redis_max_clients, redis_shard_ports, object_manager_port, + object_store_memory, num_workers, num_cpus, num_gpus, resources, + head, no_ui, block, plasma_directory, huge_pages, + autoscaling_config): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -113,6 +114,20 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, if head: # Start Ray on the head node. + if redis_shard_ports is not None: + redis_shard_ports = redis_shard_ports.split(",") + # Infer the number of Redis shards from the ports if the number is + # not provided. + if num_redis_shards is None: + num_redis_shards = len(redis_shard_ports) + # Check that the arguments match. + if len(redis_shard_ports) != num_redis_shards: + raise Exception("If --redis-shard-ports is provided, it must " + "have the form '6380,6381,6382', and the " + "number of ports provided must equal " + "--num-redis-shards (which is 1 if not " + "provided)") + if redis_address is not None: raise Exception("If --head is passed in, a Redis server will be " "started, so a Redis address should not be " @@ -134,6 +149,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, address_info=address_info, node_ip_address=node_ip_address, redis_port=redis_port, + redis_shard_ports=redis_shard_ports, + object_store_memory=object_store_memory, num_workers=num_workers, cleanup=False, redirect_output=True, @@ -162,6 +179,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, if redis_port is not None: raise Exception("If --head is not passed in, --redis-port is not " "allowed") + if redis_shard_ports is not None: + raise Exception("If --head is not passed in, --redis-shard-ports " + "is not allowed") if redis_address is None: raise Exception("If --head is not passed in, --redis-address must " "be provided.") @@ -200,6 +220,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_address=redis_address, object_manager_ports=[object_manager_port], num_workers=num_workers, + object_store_memory=object_store_memory, cleanup=False, redirect_output=True, resources=resources, diff --git a/python/ray/services.py b/python/ray/services.py index 086f52b46..a912f5b9c 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -369,6 +369,7 @@ def check_version_info(redis_client): def start_redis(node_ip_address, port=None, + redis_shard_ports=None, num_redis_shards=1, redis_max_clients=None, redirect_output=False, @@ -381,6 +382,8 @@ def start_redis(node_ip_address, for recording the log filenames in Redis. port (int): If provided, the primary Redis shard will be started on this port. + redis_shard_ports: A list of the ports to use for the non-primary Redis + shards. num_redis_shards (int): If provided, the number of Redis shards to start, in addition to the primary one. The default value is one shard. @@ -403,6 +406,12 @@ def start_redis(node_ip_address, redis_stdout_file, redis_stderr_file = new_log_files( "redis", redirect_output) + if redis_shard_ports is None: + redis_shard_ports = num_redis_shards * [None] + elif len(redis_shard_ports) != num_redis_shards: + raise Exception("The number of Redis shard ports does not match the " + "number of Redis shards.") + assigned_port, _ = start_redis_instance( node_ip_address=node_ip_address, port=port, redis_max_clients=redis_max_clients, @@ -425,17 +434,20 @@ def start_redis(node_ip_address, # Store version information in the primary Redis shard. _put_version_info_in_redis(redis_client) - # Start other Redis shards listening on random ports. Each Redis shard logs - # to a separate file, prefixed by "redis-". + # Start other Redis shards. Each Redis shard logs to a separate file, + # prefixed by "redis-". redis_shards = [] for i in range(num_redis_shards): redis_stdout_file, redis_stderr_file = new_log_files( "redis-{}".format(i), redirect_output) redis_shard_port, _ = start_redis_instance( node_ip_address=node_ip_address, + port=redis_shard_ports[i], redis_max_clients=redis_max_clients, stdout_file=redis_stdout_file, stderr_file=redis_stderr_file, cleanup=cleanup) + if redis_shard_ports[i] is not None: + assert redis_shard_port == redis_shard_ports[i] shard_address = address(node_ip_address, redis_shard_port) redis_shards.append(shard_address) # Store redis shard information in the primary redis shard. @@ -942,6 +954,7 @@ def start_monitor(redis_address, node_ip_address, stdout_file=None, def start_ray_processes(address_info=None, node_ip_address="127.0.0.1", redis_port=None, + redis_shard_ports=None, num_workers=None, num_local_schedulers=1, object_store_memory=None, @@ -969,6 +982,8 @@ def start_ray_processes(address_info=None, to. If None, then a random port will be chosen. If the key "redis_address" is in address_info, then this argument will be ignored. + redis_shard_ports: A list of the ports to use for the non-primary Redis + shards. num_workers (int): The number of workers to start. num_local_schedulers (int): The total number of local schedulers required. This is also the total number of object stores required. @@ -1042,6 +1057,7 @@ def start_ray_processes(address_info=None, if redis_address is None: redis_address, redis_shards = start_redis( node_ip_address, port=redis_port, + redis_shard_ports=redis_shard_ports, num_redis_shards=num_redis_shards, redis_max_clients=redis_max_clients, redirect_output=True, @@ -1203,6 +1219,7 @@ def start_ray_node(node_ip_address, object_manager_ports=None, num_workers=0, num_local_schedulers=1, + object_store_memory=None, worker_path=None, cleanup=True, redirect_output=False, @@ -1224,6 +1241,8 @@ def start_ray_node(node_ip_address, num_local_schedulers (int): The number of local schedulers to start. This is also the number of plasma stores and plasma managers to start. + object_store_memory (int): The maximum amount of memory (in bytes) to + let the plasma store use. worker_path (str): The path of the source code that will be run by the worker. cleanup (bool): If cleanup is true, then the processes started here @@ -1248,6 +1267,7 @@ def start_ray_node(node_ip_address, node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers, + object_store_memory=object_store_memory, worker_path=worker_path, include_log_monitor=True, cleanup=cleanup, @@ -1260,6 +1280,7 @@ def start_ray_node(node_ip_address, def start_ray_head(address_info=None, node_ip_address="127.0.0.1", redis_port=None, + redis_shard_ports=None, num_workers=0, num_local_schedulers=1, object_store_memory=None, @@ -1285,6 +1306,8 @@ def start_ray_head(address_info=None, to. If None, then a random port will be chosen. If the key "redis_address" is in address_info, then this argument will be ignored. + redis_shard_ports: A list of the ports to use for the non-primary Redis + shards. num_workers (int): The number of workers to start. num_local_schedulers (int): The total number of local schedulers required. This is also the total number of object stores required. @@ -1326,6 +1349,7 @@ def start_ray_head(address_info=None, address_info=address_info, node_ip_address=node_ip_address, redis_port=redis_port, + redis_shard_ports=redis_shard_ports, num_workers=num_workers, num_local_schedulers=num_local_schedulers, object_store_memory=object_store_memory, diff --git a/test/multi_node_test.py b/test/multi_node_test.py index fa80e3637..d238e55f2 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -216,6 +216,11 @@ class StartRayScriptTest(unittest.TestCase): "--redis-port", "6379"]) subprocess.Popen(["ray", "stop"]).wait() + # Test starting Ray with redis shard ports specified. + subprocess.check_output(["ray", "start", "--head", + "--redis-shard-ports", "6380,6381,6382"]) + subprocess.Popen(["ray", "stop"]).wait() + # Test starting Ray with a node IP address specified. subprocess.check_output(["ray", "start", "--head", "--node-ip-address", "127.0.0.1"]) @@ -245,6 +250,7 @@ class StartRayScriptTest(unittest.TestCase): subprocess.check_output(["ray", "start", "--head", "--num-workers", "20", "--redis-port", "6379", + "--redis-shard-ports", "6380,6381,6382", "--object-manager-port", "12345", "--num-cpus", "100", "--num-gpus", "0",