diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index ed047aae9..26913ef0c 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -51,6 +51,9 @@ def cli(): @click.option("--num-redis-shards", required=False, type=int, help=("the number of additional Redis shards to use in " "addition to the primary Redis shard")) +@click.option("--redis-max-clients", required=False, type=int, + help=("If provided, attempt to configure Redis with this " + "maximum number of clients.")) @click.option("--object-manager-port", required=False, type=int, help="the port to use for starting the object manager") @click.option("--num-workers", required=False, type=int, @@ -75,8 +78,8 @@ def cli(): @click.option("--huge-pages", is_flag=True, default=False, help="enable support for huge pages in the object store") def start(node_ip_address, redis_address, redis_port, num_redis_shards, - object_manager_port, num_workers, num_cpus, num_gpus, - num_custom_resource, head, no_ui, block, plasma_directory, + redis_max_clients, object_manager_port, num_workers, num_cpus, + num_gpus, num_custom_resource, head, no_ui, block, plasma_directory, huge_pages): # Note that we redirect stdout and stderr to /dev/null because otherwise # attempts to print may cause exceptions if a process is started inside of @@ -120,6 +123,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, num_gpus=num_gpus, num_custom_resource=num_custom_resource, num_redis_shards=num_redis_shards, + redis_max_clients=redis_max_clients, include_webui=(not no_ui), plasma_directory=plasma_directory, huge_pages=huge_pages) @@ -147,6 +151,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, if num_redis_shards is not None: raise Exception("If --head is not passed in, --num-redis-shards " "must not be provided.") + if redis_max_clients is not None: + raise Exception("If --head is not passed in, --redis-max-clients " + "must not be provided.") if 
no_ui: raise Exception("If --head is not passed in, the --no-ui flag is " "not relevant.") diff --git a/python/ray/services.py b/python/ray/services.py index 588e867b7..38e19d472 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -328,6 +328,7 @@ def check_version_info(redis_client): def start_redis(node_ip_address, port=None, num_redis_shards=1, + redis_max_clients=None, redirect_output=False, redirect_worker_output=False, cleanup=True): @@ -341,6 +342,8 @@ def start_redis(node_ip_address, num_redis_shards (int): If provided, the number of Redis shards to start, in addition to the primary one. The default value is one shard. + redis_max_clients: If this is provided, Ray will attempt to configure + Redis with this maxclients number. redirect_output (bool): True if output should be redirected to a file and false otherwise. redirect_worker_output (bool): True if worker output should be @@ -359,6 +362,7 @@ def start_redis(node_ip_address, "redis", redirect_output) assigned_port, _ = start_redis_instance( node_ip_address=node_ip_address, port=port, + redis_max_clients=redis_max_clients, stdout_file=redis_stdout_file, stderr_file=redis_stderr_file, cleanup=cleanup) if port is not None: @@ -385,8 +389,10 @@ def start_redis(node_ip_address, redis_stdout_file, redis_stderr_file = new_log_files( "redis-{}".format(i), redirect_output) redis_shard_port, _ = start_redis_instance( - node_ip_address=node_ip_address, stdout_file=redis_stdout_file, - stderr_file=redis_stderr_file, cleanup=cleanup) + node_ip_address=node_ip_address, + redis_max_clients=redis_max_clients, + stdout_file=redis_stdout_file, stderr_file=redis_stderr_file, + cleanup=cleanup) shard_address = address(node_ip_address, redis_shard_port) redis_shards.append(shard_address) # Store redis shard information in the primary redis shard. 
@@ -397,6 +403,7 @@ def start_redis(node_ip_address, def start_redis_instance(node_ip_address="127.0.0.1", port=None, + redis_max_clients=None, num_retries=20, stdout_file=None, stderr_file=None, @@ -407,6 +414,8 @@ def start_redis_instance(node_ip_address="127.0.0.1", node_ip_address (str): The IP address of the current node. This is only used for recording the log filenames in Redis. port (int): If provided, start a Redis server with this port. + redis_max_clients: If this is provided, Ray will attempt to configure + Redis with this maxclients number. num_retries (int): The number of times to attempt to start Redis. If a port is provided, this defaults to 1. stdout_file: A file handle opened for writing to redirect stdout to. If @@ -469,6 +478,10 @@ def start_redis_instance(node_ip_address="127.0.0.1", # Configure Redis to not run in protected mode so that processes on other # hosts can connect to it. TODO(rkn): Do this in a more secure way. redis_client.config_set("protected-mode", "no") + # If redis_max_clients is provided, attempt to set the maximum number of + # Redis clients. + if redis_max_clients is not None: + redis_client.config_set("maxclients", str(redis_max_clients)) # Increase the hard and soft limits for the redis client pubsub buffer to # 128MB. This is a hack to make it less likely for pubsub messages to be # dropped and for pubsub connections to therefore be killed. @@ -858,6 +871,7 @@ def start_ray_processes(address_info=None, num_local_schedulers=1, object_store_memory=None, num_redis_shards=1, + redis_max_clients=None, worker_path=None, cleanup=True, redirect_output=False, @@ -892,6 +906,8 @@ def start_ray_processes(address_info=None, object store with. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. + redis_max_clients: If provided, attempt to configure Redis with this + maxclients number. worker_path (str): The path of the source code that will be run by the worker. 
cleanup (bool): If cleanup is true, then the processes started here @@ -961,6 +977,7 @@ def start_ray_processes(address_info=None, redis_address, redis_shards = start_redis( node_ip_address, port=redis_port, num_redis_shards=num_redis_shards, + redis_max_clients=redis_max_clients, redirect_output=True, redirect_worker_output=redirect_output, cleanup=cleanup) address_info["redis_address"] = redis_address @@ -1191,6 +1208,7 @@ def start_ray_head(address_info=None, num_gpus=None, num_custom_resource=None, num_redis_shards=None, + redis_max_clients=None, include_webui=True, plasma_directory=None, huge_pages=False): @@ -1228,6 +1246,8 @@ def start_ray_head(address_info=None, num_gpus (int): number of gpus to configure the local scheduler with. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. + redis_max_clients: If provided, attempt to configure Redis with this + maxclients number. include_webui: True if the UI should be started and false otherwise. plasma_directory: A directory where the Plasma memory mapped files will be created. @@ -1257,6 +1277,7 @@ def start_ray_head(address_info=None, num_gpus=num_gpus, num_custom_resource=num_custom_resource, num_redis_shards=num_redis_shards, + redis_max_clients=redis_max_clients, plasma_directory=plasma_directory, huge_pages=huge_pages) diff --git a/python/ray/worker.py b/python/ray/worker.py index 10831880f..50473868a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1165,6 +1165,7 @@ def _init(address_info=None, num_gpus=None, num_custom_resource=None, num_redis_shards=None, + redis_max_clients=None, plasma_directory=None, huge_pages=False): """Helper method to connect to an existing Ray cluster or start a new one. @@ -1210,6 +1211,8 @@ def _init(address_info=None, with. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. + redis_max_clients: If provided, attempt to configure Redis with this + maxclients number. 
plasma_directory: A directory where the Plasma memory mapped files will be created. huge_pages: Boolean flag indicating whether to start the Object @@ -1273,6 +1276,7 @@ def _init(address_info=None, num_gpus=num_gpus, num_custom_resource=num_custom_resource, num_redis_shards=num_redis_shards, + redis_max_clients=redis_max_clients, plasma_directory=plasma_directory, huge_pages=huge_pages) else: @@ -1293,6 +1297,9 @@ def _init(address_info=None, if num_redis_shards is not None: raise Exception("When connecting to an existing cluster, " "num_redis_shards must not be provided.") + if redis_max_clients is not None: + raise Exception("When connecting to an existing cluster, " + "redis_max_clients must not be provided.") if object_store_memory is not None: raise Exception("When connecting to an existing cluster, " "object_store_memory must not be provided.") @@ -1334,7 +1341,7 @@ def _init(address_info=None, def init(redis_address=None, node_ip_address=None, object_id_seed=None, num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False, num_cpus=None, num_gpus=None, num_custom_resource=None, - num_redis_shards=None, + num_redis_shards=None, redis_max_clients=None, plasma_directory=None, huge_pages=False): """Connect to an existing Ray cluster or start one and connect to it. @@ -1368,6 +1375,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, flag is experimental and is subject to changes in the future. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. + redis_max_clients: If provided, attempt to configure Redis with this + maxclients number. plasma_directory: A directory where the Plasma memory mapped files will be created. 
huge_pages: Boolean flag indicating whether to start the Object @@ -1393,6 +1402,7 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, redirect_output=redirect_output, num_cpus=num_cpus, num_gpus=num_gpus, num_custom_resource=num_custom_resource, num_redis_shards=num_redis_shards, + redis_max_clients=redis_max_clients, plasma_directory=plasma_directory, huge_pages=huge_pages) diff --git a/test/multi_node_test.py b/test/multi_node_test.py index fc84ffd71..3f7871634 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -193,13 +193,19 @@ class StartRayScriptTest(unittest.TestCase): "--num-gpus", "100"]) subprocess.Popen(["ray", "stop"]).wait() + # Test starting Ray with the max redis clients specified. + subprocess.check_output(["ray", "start", "--head", + "--redis-max-clients", "100"]) + subprocess.Popen(["ray", "stop"]).wait() + # Test starting Ray with all arguments specified. subprocess.check_output(["ray", "start", "--head", "--num-workers", "20", "--redis-port", "6379", "--object-manager-port", "12345", "--num-cpus", "100", - "--num-gpus", "0"]) + "--num-gpus", "0", + "--redis-max-clients", "100"]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with invalid arguments.