Expose internal config parameters for starting Ray (#3246)

## What do these changes do?

This PR exposes the CL option for using a config parameter. This is important for certain tests (i.e., FT tests that removing nodes) to run quickly.

Note that this is bad practice and should be replaced with GFLAGS or some equivalent as soon as possible.

#3239 depends on this.

TODO:
 - [x] Add documentation to method arguments before merging.
 - [x] Add test to verify this works?

## Related issue number
This commit is contained in:
Richard Liaw
2018-11-07 21:46:02 -08:00
committed by GitHub
parent 43df405d07
commit 0bab8ed95c
10 changed files with 253 additions and 35 deletions
+11 -3
View File
@@ -193,13 +193,19 @@ def cli(logging_level, logging_format):
"--temp-dir",
default=None,
help="manually specify the root temporary dir of the Ray process")
@click.option(
"--internal-config",
default=None,
type=str,
help="Do NOT use this. This is for debugging/development purposes ONLY.")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, object_store_memory,
num_workers, num_cpus, num_gpus, resources, head, no_ui, block,
plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir):
plasma_store_socket_name, raylet_socket_name, temp_dir,
internal_config):
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
@@ -269,7 +275,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
autoscaling_config=autoscaling_config,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
temp_dir=temp_dir,
_internal_config=internal_config)
logger.info(address_info)
logger.info(
"\nStarted Ray on this node. You can add additional nodes to "
@@ -348,7 +355,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
huge_pages=huge_pages,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
temp_dir=temp_dir,
_internal_config=internal_config)
logger.info(address_info)
logger.info("\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
+38 -14
View File
@@ -860,6 +860,7 @@ def start_raylet(redis_address,
stdout_file=None,
stderr_file=None,
cleanup=True,
config=None,
redis_password=None):
"""Start a raylet, which is a combined local scheduler and object manager.
@@ -890,11 +891,16 @@ def start_raylet(redis_address,
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then this process will be killed by serices.cleanup() when the
Python process that imported services exits.
config (dict|None): Optional Raylet configuration that will
override defaults in RayConfig.
redis_password (str): The password of the redis server.
Returns:
The raylet socket name.
"""
config = config or {}
config_str = ",".join(["{},{}".format(*kv) for kv in config.items()])
if use_valgrind and use_profiler:
raise Exception("Cannot use valgrind and profiler at the same time.")
@@ -906,11 +912,8 @@ def start_raylet(redis_address,
1, min(multiprocessing.cpu_count(), static_resources["CPU"]))
# Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
resource_argument = ",".join([
"{},{}".format(resource_name, resource_value)
for resource_name, resource_value in zip(static_resources.keys(),
static_resources.values())
])
resource_argument = ",".join(
["{},{}".format(*kv) for kv in static_resources.items()])
gcs_ip_address, gcs_port = redis_address.split(":")
@@ -948,6 +951,7 @@ def start_raylet(redis_address,
str(num_workers),
str(maximum_startup_concurrency),
resource_argument,
config_str,
start_worker_command,
"", # Worker command for Java, not needed for Python.
redis_password or "",
@@ -1209,7 +1213,8 @@ def start_raylet_monitor(redis_address,
stdout_file=None,
stderr_file=None,
cleanup=True,
redis_password=None):
redis_password=None,
config=None):
"""Run a process to monitor the other processes.
Args:
@@ -1223,10 +1228,14 @@ def start_raylet_monitor(redis_address,
Python process that imported services exits. This is True by
default.
redis_password (str): The password of the redis server.
config (dict|None): Optional configuration that will
override defaults in RayConfig.
"""
gcs_ip_address, gcs_port = redis_address.split(":")
redis_password = redis_password or ""
command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port]
config = config or {}
config_str = ",".join(["{},{}".format(*kv) for kv in config.items()])
command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port, config_str]
if redis_password:
command += [redis_password]
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
@@ -1259,7 +1268,8 @@ def start_ray_processes(address_info=None,
autoscaling_config=None,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
temp_dir=None,
_internal_config=None):
"""Helper method to start Ray processes.
Args:
@@ -1324,6 +1334,8 @@ def start_ray_processes(address_info=None,
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
Returns:
A dictionary of the address information for the processes that were
@@ -1335,6 +1347,8 @@ def start_ray_processes(address_info=None,
logger.info("Process STDOUT and STDERR is being redirected to {}.".format(
get_logs_dir_path()))
config = json.loads(_internal_config) if _internal_config else None
if resources is None:
resources = {}
if not isinstance(resources, list):
@@ -1395,7 +1409,8 @@ def start_ray_processes(address_info=None,
stdout_file=monitor_stdout_file,
stderr_file=monitor_stderr_file,
cleanup=cleanup,
redis_password=redis_password)
redis_password=redis_password,
config=config)
if redis_shards == []:
# Get redis shards from primary redis instance.
redis_ip_address, redis_port = redis_address.split(":")
@@ -1473,7 +1488,8 @@ def start_ray_processes(address_info=None,
stdout_file=raylet_stdout_file,
stderr_file=raylet_stderr_file,
cleanup=cleanup,
redis_password=redis_password))
redis_password=redis_password,
config=config))
# Try to start the web UI.
if include_webui:
@@ -1506,7 +1522,8 @@ def start_ray_node(node_ip_address,
huge_pages=False,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
temp_dir=None,
_internal_config=None):
"""Start the Ray processes for a single node.
This assumes that the Ray processes on some master node have already been
@@ -1550,6 +1567,8 @@ def start_ray_node(node_ip_address,
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
Returns:
A dictionary of the address information for the processes that were
@@ -1577,7 +1596,8 @@ def start_ray_node(node_ip_address,
huge_pages=huge_pages,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
temp_dir=temp_dir,
_internal_config=_internal_config)
def start_ray_head(address_info=None,
@@ -1604,7 +1624,8 @@ def start_ray_head(address_info=None,
autoscaling_config=None,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
temp_dir=None,
_internal_config=None):
"""Start Ray in local mode.
Args:
@@ -1665,6 +1686,8 @@ def start_ray_head(address_info=None,
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
Returns:
A dictionary of the address information for the processes that were
@@ -1697,4 +1720,5 @@ def start_ray_head(address_info=None,
autoscaling_config=autoscaling_config,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
temp_dir=temp_dir,
_internal_config=_internal_config)
+4 -2
View File
@@ -111,15 +111,17 @@ class Cluster(object):
assert not node.any_processes_alive(), (
"There are zombie processes left over after killing.")
def wait_for_nodes(self, retries=20):
def wait_for_nodes(self, retries=30):
"""Waits for all nodes to be registered with global state.
By default, waits for 3 seconds.
Args:
retries (int): Number of times to retry checking client table.
"""
for i in range(retries):
if not ray.is_initialized() or not self._check_registered_nodes():
time.sleep(0.3)
time.sleep(0.1)
else:
break
+14 -3
View File
@@ -1293,7 +1293,8 @@ def _init(address_info=None,
driver_id=None,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
temp_dir=None,
_internal_config=None):
"""Helper method to connect to an existing Ray cluster or start a new one.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1355,6 +1356,8 @@ def _init(address_info=None,
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
Returns:
Address information about the started processes.
@@ -1427,7 +1430,8 @@ def _init(address_info=None,
include_webui=include_webui,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
temp_dir=temp_dir,
_internal_config=_internal_config)
else:
if redis_address is None:
raise Exception("When connecting to an existing cluster, "
@@ -1468,6 +1472,9 @@ def _init(address_info=None,
if raylet_socket_name is not None:
raise Exception("When connecting to an existing cluster, "
"raylet_socket_name must not be provided.")
if _internal_config is not None:
raise Exception("When connecting to an existing cluster, "
"_internal_config must not be provided.")
# Get the node IP address if one is not provided.
if node_ip_address is None:
@@ -1530,6 +1537,7 @@ def init(redis_address=None,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None,
_internal_config=None,
use_raylet=None):
"""Connect to an existing Ray cluster or start one and connect to it.
@@ -1601,6 +1609,8 @@ def init(redis_address=None,
used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
Returns:
Address information about the started processes.
@@ -1658,7 +1668,8 @@ def init(redis_address=None,
driver_id=driver_id,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir)
temp_dir=temp_dir,
_internal_config=_internal_config)
for hook in _post_init_hooks:
hook()
return ret