Provide flag for setting redis maxclients. (#1257)

* Add flag for attempting to increase ulimit -n and the redis maxclients.

* Don't bother trying to set ulimit -n.

* Fix linting.

* Add basic test.
This commit is contained in:
Robert Nishihara
2017-11-26 18:25:55 -08:00
committed by Philipp Moritz
parent 7fc2ddbaf7
commit 0b4961b161
4 changed files with 50 additions and 6 deletions
+9 -2
View File
@@ -51,6 +51,9 @@ def cli():
@click.option("--num-redis-shards", required=False, type=int,
help=("the number of additional Redis shards to use in "
"addition to the primary Redis shard"))
@click.option("--redis-max-clients", required=False, type=int,
help=("If provided, attempt to configure Redis with this "
"maximum number of clients."))
@click.option("--object-manager-port", required=False, type=int,
help="the port to use for starting the object manager")
@click.option("--num-workers", required=False, type=int,
@@ -75,8 +78,8 @@ def cli():
@click.option("--huge-pages", is_flag=True, default=False,
help="enable support for huge pages in the object store")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
object_manager_port, num_workers, num_cpus, num_gpus,
num_custom_resource, head, no_ui, block, plasma_directory,
redis_max_clients, object_manager_port, num_workers, num_cpus,
num_gpus, num_custom_resource, head, no_ui, block, plasma_directory,
huge_pages):
# Note that we redirect stdout and stderr to /dev/null because otherwise
# attempts to print may cause exceptions if a process is started inside of
@@ -120,6 +123,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
include_webui=(not no_ui),
plasma_directory=plasma_directory,
huge_pages=huge_pages)
@@ -147,6 +151,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
if num_redis_shards is not None:
raise Exception("If --head is not passed in, --num-redis-shards "
"must not be provided.")
if redis_max_clients is not None:
raise Exception("If --head is not passed in, --redis-max-clients "
"must not be provided.")
if no_ui:
raise Exception("If --head is not passed in, the --no-ui flag is "
"not relevant.")
+23 -2
View File
@@ -328,6 +328,7 @@ def check_version_info(redis_client):
def start_redis(node_ip_address,
port=None,
num_redis_shards=1,
redis_max_clients=None,
redirect_output=False,
redirect_worker_output=False,
cleanup=True):
@@ -341,6 +342,8 @@ def start_redis(node_ip_address,
num_redis_shards (int): If provided, the number of Redis shards to
start, in addition to the primary one. The default value is one
shard.
redis_max_clients: If this is provided, Ray will attempt to configure
Redis with this maxclients number.
redirect_output (bool): True if output should be redirected to a file
and false otherwise.
redirect_worker_output (bool): True if worker output should be
@@ -359,6 +362,7 @@ def start_redis(node_ip_address,
"redis", redirect_output)
assigned_port, _ = start_redis_instance(
node_ip_address=node_ip_address, port=port,
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file, stderr_file=redis_stderr_file,
cleanup=cleanup)
if port is not None:
@@ -385,8 +389,10 @@ def start_redis(node_ip_address,
redis_stdout_file, redis_stderr_file = new_log_files(
"redis-{}".format(i), redirect_output)
redis_shard_port, _ = start_redis_instance(
node_ip_address=node_ip_address, stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file, cleanup=cleanup)
node_ip_address=node_ip_address,
redis_max_clients=redis_max_clients,
stdout_file=redis_stdout_file, stderr_file=redis_stderr_file,
cleanup=cleanup)
shard_address = address(node_ip_address, redis_shard_port)
redis_shards.append(shard_address)
# Store redis shard information in the primary redis shard.
@@ -397,6 +403,7 @@ def start_redis(node_ip_address,
def start_redis_instance(node_ip_address="127.0.0.1",
port=None,
redis_max_clients=None,
num_retries=20,
stdout_file=None,
stderr_file=None,
@@ -407,6 +414,8 @@ def start_redis_instance(node_ip_address="127.0.0.1",
node_ip_address (str): The IP address of the current node. This is only
used for recording the log filenames in Redis.
port (int): If provided, start a Redis server with this port.
redis_max_clients: If this is provided, Ray will attempt to configure
Redis with this maxclients number.
num_retries (int): The number of times to attempt to start Redis. If a
port is provided, this defaults to 1.
stdout_file: A file handle opened for writing to redirect stdout to. If
@@ -469,6 +478,10 @@ def start_redis_instance(node_ip_address="127.0.0.1",
# Configure Redis to not run in protected mode so that processes on other
# hosts can connect to it. TODO(rkn): Do this in a more secure way.
redis_client.config_set("protected-mode", "no")
# If redis_max_clients is provided, attempt to raise the number of maximum
# number of Redis clients.
if redis_max_clients is not None:
redis_client.config_set("maxclients", str(redis_max_clients))
# Increase the hard and soft limits for the redis client pubsub buffer to
# 128MB. This is a hack to make it less likely for pubsub messages to be
# dropped and for pubsub connections to therefore be killed.
@@ -858,6 +871,7 @@ def start_ray_processes(address_info=None,
num_local_schedulers=1,
object_store_memory=None,
num_redis_shards=1,
redis_max_clients=None,
worker_path=None,
cleanup=True,
redirect_output=False,
@@ -892,6 +906,8 @@ def start_ray_processes(address_info=None,
object store with.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
maxclients number.
worker_path (str): The path of the source code that will be run by the
worker.
cleanup (bool): If cleanup is true, then the processes started here
@@ -961,6 +977,7 @@ def start_ray_processes(address_info=None,
redis_address, redis_shards = start_redis(
node_ip_address, port=redis_port,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
redirect_output=True,
redirect_worker_output=redirect_output, cleanup=cleanup)
address_info["redis_address"] = redis_address
@@ -1191,6 +1208,7 @@ def start_ray_head(address_info=None,
num_gpus=None,
num_custom_resource=None,
num_redis_shards=None,
redis_max_clients=None,
include_webui=True,
plasma_directory=None,
huge_pages=False):
@@ -1228,6 +1246,8 @@ def start_ray_head(address_info=None,
num_gpus (int): number of gpus to configure the local scheduler with.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
maxclients number.
include_webui: True if the UI should be started and false otherwise.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
@@ -1257,6 +1277,7 @@ def start_ray_head(address_info=None,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
+11 -1
View File
@@ -1165,6 +1165,7 @@ def _init(address_info=None,
num_gpus=None,
num_custom_resource=None,
num_redis_shards=None,
redis_max_clients=None,
plasma_directory=None,
huge_pages=False):
"""Helper method to connect to an existing Ray cluster or start a new one.
@@ -1210,6 +1211,8 @@ def _init(address_info=None,
with.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
maxclients number.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
@@ -1273,6 +1276,7 @@ def _init(address_info=None,
num_gpus=num_gpus,
num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
plasma_directory=plasma_directory,
huge_pages=huge_pages)
else:
@@ -1293,6 +1297,9 @@ def _init(address_info=None,
if num_redis_shards is not None:
raise Exception("When connecting to an existing cluster, "
"num_redis_shards must not be provided.")
if redis_max_clients is not None:
raise Exception("When connecting to an existing cluster, "
"redis_max_clients must not be provided.")
if object_store_memory is not None:
raise Exception("When connecting to an existing cluster, "
"object_store_memory must not be provided.")
@@ -1334,7 +1341,7 @@ def _init(address_info=None,
def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_workers=None, driver_mode=SCRIPT_MODE, redirect_output=False,
num_cpus=None, num_gpus=None, num_custom_resource=None,
num_redis_shards=None,
num_redis_shards=None, redis_max_clients=None,
plasma_directory=None, huge_pages=False):
"""Connect to an existing Ray cluster or start one and connect to it.
@@ -1368,6 +1375,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
flag is experimental and is subject to changes in the future.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
maxclients number.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: Boolean flag indicating whether to start the Object
@@ -1393,6 +1402,7 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
redirect_output=redirect_output, num_cpus=num_cpus,
num_gpus=num_gpus, num_custom_resource=num_custom_resource,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
plasma_directory=plasma_directory,
huge_pages=huge_pages)