Add --worker-port-list option to ray start (#11481)

This commit is contained in:
Edward Oakes
2020-10-21 14:46:45 -05:00
committed by Alex Wu
parent 9d765ba740
commit dbcb368dea
11 changed files with 76 additions and 10 deletions
+3
View File
@@ -1156,6 +1156,7 @@ def start_raylet(redis_address,
object_store_memory,
min_worker_port=None,
max_worker_port=None,
worker_port_list=None,
object_manager_port=None,
redis_password=None,
metrics_agent_port=None,
@@ -1354,6 +1355,8 @@ def start_raylet(redis_address,
f"--metrics-agent-port={metrics_agent_port}",
f"--metrics_export_port={metrics_export_port}",
]
if worker_port_list is not None:
command.append(f"--worker_port_list={worker_port_list}")
if start_initial_python_workers_for_first_job:
command.append("--num_initial_python_workers_for_first_job={}".format(
resource_spec.num_cpus))
+1
View File
@@ -720,6 +720,7 @@ class Node:
object_store_memory,
min_worker_port=self._ray_params.min_worker_port,
max_worker_port=self._ray_params.max_worker_port,
worker_port_list=self._ray_params.worker_port_list,
object_manager_port=self._ray_params.object_manager_port,
redis_password=self._ray_params.redis_password,
metrics_agent_port=self._ray_params.metrics_agent_port,
+19
View File
@@ -41,6 +41,9 @@ class RayParams:
on. If not set or set to 0, random ports will be chosen.
max_worker_port (int): The highest port number that workers will bind
on. If set, min_worker_port must also be set.
worker_port_list (str): An explicit list of ports to be used for
workers (comma-separated). Overrides min_worker_port and
max_worker_port.
object_ref_seed (int): Used to seed the deterministic generation of
object refs. The same value can be used across multiple runs of the
same job in order to generate the object refs in a consistent
@@ -116,6 +119,7 @@ class RayParams:
raylet_ip_address=None,
min_worker_port=None,
max_worker_port=None,
worker_port_list=None,
object_ref_seed=None,
driver_mode=None,
redirect_worker_output=None,
@@ -163,6 +167,7 @@ class RayParams:
self.raylet_ip_address = raylet_ip_address
self.min_worker_port = min_worker_port
self.max_worker_port = max_worker_port
self.worker_port_list = worker_port_list
self.driver_mode = driver_mode
self.redirect_worker_output = redirect_worker_output
self.redirect_output = redirect_output
@@ -252,6 +257,20 @@ class RayParams:
self._check_usage()
def _check_usage(self):
if self.worker_port_list is not None:
for port_str in self.worker_port_list.split(","):
try:
port = int(port_str)
except ValueError as e:
raise ValueError(
"worker_port_list must be a comma-separated " +
"list of integers: {}".format(e)) from None
if port < 1024 or port > 65535:
raise ValueError(
"Ports in worker_port_list must be "
"between 1024 and 65535. Got: {}".format(port))
# Used primarily for testing.
if os.environ.get("RAY_USE_RANDOM_PORTS", False):
if self.min_worker_port is None and self.min_worker_port is None:
+9 -3
View File
@@ -207,6 +207,11 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
default=10999,
help="the highest port number that workers will bind on. If set, "
"'--min-worker-port' must also be set.")
@click.option(
"--worker-port-list",
required=False,
help="a comma-separated list of open ports for workers to bind on. "
"Overrides '--min-worker-port' and '--max-worker-port'.")
@click.option(
"--memory",
required=False,
@@ -357,9 +362,9 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
@add_click_options(logging_options)
def start(node_ip_address, address, port, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, gcs_server_port,
min_worker_port, max_worker_port, memory, object_store_memory,
redis_max_memory, num_cpus, num_gpus, resources, head,
include_dashboard, dashboard_host, dashboard_port, block,
min_worker_port, max_worker_port, worker_port_list, memory,
object_store_memory, redis_max_memory, num_cpus, num_gpus, resources,
head, include_dashboard, dashboard_host, dashboard_port, block,
plasma_directory, autoscaling_config, no_redirect_worker_output,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, java_worker_options, load_code_from_local,
@@ -401,6 +406,7 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
node_ip_address=node_ip_address,
min_worker_port=min_worker_port,
max_worker_port=max_worker_port,
worker_port_list=worker_port_list,
object_manager_port=object_manager_port,
node_manager_port=node_manager_port,
gcs_server_port=gcs_server_port,
+14
View File
@@ -405,6 +405,20 @@ def test_calling_start_ray_head(call_ray_stop_only):
])
check_call_ray(["stop"])
# Test starting Ray with a worker port list.
check_call_ray(["start", "--head", "--worker-port-list", "10000,10001"])
check_call_ray(["stop"])
# Test starting Ray with a non-int in the worker port list.
with pytest.raises(subprocess.CalledProcessError):
check_call_ray(["start", "--head", "--worker-port-list", "10000,a"])
check_call_ray(["stop"])
# Test starting Ray with an invalid port in the worker port list.
with pytest.raises(subprocess.CalledProcessError):
check_call_ray(["start", "--head", "--worker-port-list", "100"])
check_call_ray(["stop"])
# Test starting Ray with the number of CPUs specified.
check_call_ray(["start", "--head", "--num-cpus", "2", "--port", "0"])
check_call_ray(["stop"])