mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 18:44:07 +08:00
Fall back to random port instead of default port for non-primary Redis shards; attempt to cluster Redis shard ports close to each other. (#13847)
This commit is contained in:
@@ -829,6 +829,13 @@ def start_redis(node_ip_address,
|
||||
redis_modules = [REDIS_MODULE]
|
||||
|
||||
redis_stdout_file, redis_stderr_file = redirect_files[0]
|
||||
# If no port is given, fallback to default Redis port for the primary
|
||||
# shard.
|
||||
if port is None:
|
||||
port = ray_constants.DEFAULT_PORT
|
||||
num_retries = 20
|
||||
else:
|
||||
num_retries = 1
|
||||
# Start the primary Redis shard.
|
||||
port, p = _start_redis_instance(
|
||||
redis_executable,
|
||||
@@ -836,6 +843,7 @@ def start_redis(node_ip_address,
|
||||
port=port,
|
||||
password=password,
|
||||
redis_max_clients=redis_max_clients,
|
||||
num_retries=num_retries,
|
||||
# Below we use None to indicate no limit on the memory of the
|
||||
# primary Redis shard.
|
||||
redis_max_memory=None,
|
||||
@@ -869,17 +877,29 @@ def start_redis(node_ip_address,
|
||||
# Start other Redis shards. Each Redis shard logs to a separate file,
|
||||
# prefixed by "redis-<shard number>".
|
||||
redis_shards = []
|
||||
# Attempt to start the other Redis shards port range right after the
|
||||
# primary Redis shard port.
|
||||
last_shard_port = port
|
||||
for i in range(num_redis_shards):
|
||||
redis_stdout_file, redis_stderr_file = redirect_files[i + 1]
|
||||
redis_executable = REDIS_EXECUTABLE
|
||||
redis_modules = [REDIS_MODULE]
|
||||
redis_shard_port = redis_shard_ports[i]
|
||||
# If no shard port is given, try to start this shard's Redis instance
|
||||
# on the port right after the last shard's port.
|
||||
if redis_shard_port is None:
|
||||
redis_shard_port = last_shard_port + 1
|
||||
num_retries = 20
|
||||
else:
|
||||
num_retries = 1
|
||||
|
||||
redis_shard_port, p = _start_redis_instance(
|
||||
redis_executable,
|
||||
modules=redis_modules,
|
||||
port=redis_shard_ports[i],
|
||||
port=redis_shard_port,
|
||||
password=password,
|
||||
redis_max_clients=redis_max_clients,
|
||||
num_retries=num_retries,
|
||||
redis_max_memory=redis_max_memory,
|
||||
stdout_file=redis_stdout_file,
|
||||
stderr_file=redis_stderr_file,
|
||||
@@ -890,13 +910,14 @@ def start_redis(node_ip_address,
|
||||
redis_shards.append(shard_address)
|
||||
# Store redis shard information in the primary redis shard.
|
||||
primary_redis_client.rpush("RedisShards", shard_address)
|
||||
last_shard_port = redis_shard_port
|
||||
|
||||
return redis_address, redis_shards, processes
|
||||
|
||||
|
||||
def _start_redis_instance(executable,
|
||||
modules,
|
||||
port=None,
|
||||
port,
|
||||
redis_max_clients=None,
|
||||
num_retries=20,
|
||||
stdout_file=None,
|
||||
@@ -907,20 +928,19 @@ def _start_redis_instance(executable,
|
||||
"""Start a single Redis server.
|
||||
|
||||
Notes:
|
||||
If "port" is not None, then we will only use this port and try
|
||||
only once. Otherwise, we will first try the default redis port,
|
||||
and if it is unavailable, we will try random ports with
|
||||
maximum retries of "num_retries".
|
||||
We will initially try to start the Redis instance at the given port,
|
||||
and then try at most `num_retries - 1` times to start the Redis
|
||||
instance at successive random ports.
|
||||
|
||||
Args:
|
||||
executable (str): Full path of the redis-server executable.
|
||||
modules (list of str): A list of pathnames, pointing to the redis
|
||||
module(s) that will be loaded in this redis server.
|
||||
port (int): If provided, start a Redis server with this port.
|
||||
port (int): Try to start a Redis server at this port.
|
||||
redis_max_clients: If this is provided, Ray will attempt to configure
|
||||
Redis with this maxclients number.
|
||||
num_retries (int): The number of times to attempt to start Redis. If a
|
||||
port is provided, this defaults to 1.
|
||||
num_retries (int): The number of times to attempt to start Redis at
|
||||
successive ports.
|
||||
stdout_file: A file handle opened for writing to redirect stdout to. If
|
||||
no redirection should happen, then this should be None.
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If
|
||||
@@ -943,13 +963,6 @@ def _start_redis_instance(executable,
|
||||
for module in modules:
|
||||
assert os.path.isfile(module)
|
||||
counter = 0
|
||||
if port is not None:
|
||||
# If a port is specified, then try only once to connect.
|
||||
# This ensures that we will use the given port.
|
||||
num_retries = 1
|
||||
else:
|
||||
port = ray_constants.DEFAULT_PORT
|
||||
|
||||
load_module_args = []
|
||||
for module in modules:
|
||||
load_module_args += ["--loadmodule", module]
|
||||
|
||||
@@ -17,9 +17,12 @@ class RayParams:
|
||||
raylet, a plasma store, a plasma manager, and some workers.
|
||||
It will also kill these processes when Python exits.
|
||||
redis_port (int): The port that the primary Redis shard should listen
|
||||
to. If None, then a random port will be chosen.
|
||||
to. If None, then it will fall back to
|
||||
ray.ray_constants.DEFAULT_PORT, or a random port if the default is
|
||||
not available.
|
||||
redis_shard_ports: A list of the ports to use for the non-primary Redis
|
||||
shards.
|
||||
shards. If None, then it will fall back to the ports right after
|
||||
redis_port, or random ports if those are not available.
|
||||
num_cpus (int): Number of CPUs to configure the raylet with.
|
||||
num_gpus (int): Number of GPUs to configure the raylet with.
|
||||
resources: A dictionary mapping the name of a resource to the quantity
|
||||
|
||||
Reference in New Issue
Block a user