Add ability to specify worker and driver ports (#7833)

This commit is contained in:
Edward Oakes
2020-04-16 13:49:25 -05:00
committed by GitHub
parent d5f517b2f5
commit 9f751ff8c4
20 changed files with 351 additions and 108 deletions
+2
View File
@@ -77,6 +77,8 @@ class Cluster:
"num_cpus": 1,
"num_gpus": 0,
"object_store_memory": 150 * 1024 * 1024, # 150 MiB
"min_worker_port": 0,
"max_worker_port": 0,
}
if "_internal_config" in node_args:
node_args["_internal_config"] = json.loads(
+2
View File
@@ -586,6 +586,8 @@ class Node:
self._temp_dir,
self._session_dir,
self.get_resource_spec(),
self._ray_params.min_worker_port,
self._ray_params.max_worker_port,
self._ray_params.object_manager_port,
self._ray_params.redis_password,
use_valgrind=use_valgrind,
+34
View File
@@ -1,4 +1,5 @@
import logging
import os
import numpy as np
@@ -35,6 +36,10 @@ class RayParams:
node_ip_address (str): The IP address of the node that we are on.
raylet_ip_address (str): The IP address of the raylet that this node
connects to.
min_worker_port (int): The lowest port number that workers will bind
on. If not set or set to 0, random ports will be chosen.
max_worker_port (int): The highest port number that workers will bind
on. If set, min_worker_port must also be set.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
same job in order to generate the object IDs in a consistent
@@ -98,6 +103,8 @@ class RayParams:
node_manager_port=None,
node_ip_address=None,
raylet_ip_address=None,
min_worker_port=None,
max_worker_port=None,
object_id_seed=None,
driver_mode=None,
redirect_worker_output=None,
@@ -135,6 +142,8 @@ class RayParams:
self.node_manager_port = node_manager_port
self.node_ip_address = node_ip_address
self.raylet_ip_address = raylet_ip_address
self.min_worker_port = min_worker_port
self.max_worker_port = max_worker_port
self.driver_mode = driver_mode
self.redirect_worker_output = redirect_worker_output
self.redirect_output = redirect_output
@@ -189,6 +198,31 @@ class RayParams:
self._check_usage()
def _check_usage(self):
# Used primarily for testing.
if os.environ.get("RAY_USE_RANDOM_PORTS", False):
if self.min_worker_port is None and self.min_worker_port is None:
self.min_worker_port = 0
self.max_worker_port = 0
if self.min_worker_port is not None:
if self.min_worker_port != 0 and (self.min_worker_port < 1024
or self.min_worker_port > 65535):
raise ValueError("min_worker_port must be 0 or an integer "
"between 1024 and 65535.")
if self.max_worker_port is not None:
if self.min_worker_port is None:
raise ValueError("If max_worker_port is set, min_worker_port "
"must also be set.")
elif self.max_worker_port != 0:
if self.max_worker_port < 1024 or self.max_worker_port > 65535:
raise ValueError(
"max_worker_port must be 0 or an integer between "
"1024 and 65535.")
elif self.max_worker_port <= self.min_worker_port:
raise ValueError("max_worker_port must be higher than "
"min_worker_port.")
if self.resources is not None:
assert "CPU" not in self.resources, (
"'CPU' should not be included in the resource dictionary. Use "
+21 -4
View File
@@ -158,6 +158,20 @@ def dashboard(cluster_config_file, cluster_name, port):
required=False,
type=int,
help="the port to use for starting the node manager")
@click.option(
"--min-worker-port",
required=False,
type=int,
default=10000,
help="the lowest port number that workers will bind on. If not set, "
"random ports will be chosen.")
@click.option(
"--max-worker-port",
required=False,
type=int,
default=10999,
help="the highest port number that workers will bind on. If set, "
"'--min-worker-port' must also be set.")
@click.option(
"--memory",
required=False,
@@ -277,10 +291,11 @@ def dashboard(cluster_config_file, cluster_name, port):
help="Specify whether load code from local file or GCS serialization.")
def start(node_ip_address, redis_address, address, redis_port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port, memory,
object_store_memory, redis_max_memory, num_cpus, num_gpus, resources,
head, include_webui, webui_host, block, plasma_directory, huge_pages,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
redis_shard_ports, object_manager_port, node_manager_port,
min_worker_port, max_worker_port, memory, object_store_memory,
redis_max_memory, num_cpus, num_gpus, resources, head, include_webui,
webui_host, block, plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
java_worker_options, load_code_from_local, internal_config):
if redis_address is not None:
@@ -308,6 +323,8 @@ def start(node_ip_address, redis_address, address, redis_port,
redirect_output = None if not no_redirect_output else True
ray_params = ray.parameter.RayParams(
node_ip_address=node_ip_address,
min_worker_port=min_worker_port,
max_worker_port=max_worker_port,
object_manager_port=object_manager_port,
node_manager_port=node_manager_port,
memory=memory,
+14
View File
@@ -1223,6 +1223,8 @@ def start_raylet(redis_address,
temp_dir,
session_dir,
resource_spec,
min_worker_port=None,
max_worker_port=None,
object_manager_port=None,
redis_password=None,
use_valgrind=False,
@@ -1251,6 +1253,10 @@ def start_raylet(redis_address,
resource_spec (ResourceSpec): Resources for this raylet.
object_manager_port: The port to use for the object manager. If this is
None, then the object manager will choose its own port.
min_worker_port (int): The lowest port number that workers will bind
on. If not set, random ports will be chosen.
max_worker_port (int): The highest port number that workers will bind
on. If set, min_worker_port must also be set.
redis_password: The password to use when connecting to Redis.
use_valgrind (bool): True if the raylet should be started inside
of valgrind. If this is True, use_profiler must be False.
@@ -1328,6 +1334,12 @@ def start_raylet(redis_address,
if object_manager_port is None:
object_manager_port = 0
if min_worker_port is None:
min_worker_port = 0
if max_worker_port is None:
max_worker_port = 0
if load_code_from_local:
start_worker_command += ["--load-code-from-local"]
@@ -1336,6 +1348,8 @@ def start_raylet(redis_address,
"--raylet_socket_name={}".format(raylet_name),
"--store_socket_name={}".format(plasma_store_name),
"--object_manager_port={}".format(object_manager_port),
"--min_worker_port={}".format(min_worker_port),
"--max_worker_port={}".format(max_worker_port),
"--node_manager_port={}".format(node_manager_port),
"--node_ip_address={}".format(node_ip_address),
"--redis_address={}".format(gcs_ip_address),
+7
View File
@@ -376,6 +376,13 @@ def test_calling_start_ray_head(call_ray_stop_only):
])
subprocess.check_output(["ray", "stop"])
# Test starting Ray with the worker port range specified.
subprocess.check_output([
"ray", "start", "--head", "--min-worker-port", "12345",
"--max-worker-port", "12346"
])
subprocess.check_output(["ray", "stop"])
# Test starting Ray with the number of CPUs specified.
subprocess.check_output(["ray", "start", "--head", "--num-cpus", "2"])
subprocess.check_output(["ray", "stop"])