From dbcb368dea20a178eab524efa1eaa18d27e36d51 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 21 Oct 2020 14:46:45 -0500 Subject: [PATCH] Add --worker-port-list option to ray start (#11481) --- python/ray/_private/services.py | 3 +++ python/ray/node.py | 1 + python/ray/parameter.py | 19 +++++++++++++++++++ python/ray/scripts/scripts.py | 12 +++++++++--- python/ray/tests/test_multi_node.py | 14 ++++++++++++++ src/ray/raylet/main.cc | 15 +++++++++++++-- src/ray/raylet/node_manager.cc | 4 ++-- src/ray/raylet/node_manager.h | 3 +++ src/ray/raylet/worker_pool.cc | 10 ++++++++-- src/ray/raylet/worker_pool.h | 3 +++ src/ray/raylet/worker_pool_test.cc | 2 +- 11 files changed, 76 insertions(+), 10 deletions(-) diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 60c8e8885..430aab5ce 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1156,6 +1156,7 @@ def start_raylet(redis_address, object_store_memory, min_worker_port=None, max_worker_port=None, + worker_port_list=None, object_manager_port=None, redis_password=None, metrics_agent_port=None, @@ -1354,6 +1355,8 @@ def start_raylet(redis_address, f"--metrics-agent-port={metrics_agent_port}", f"--metrics_export_port={metrics_export_port}", ] + if worker_port_list is not None: + command.append(f"--worker_port_list={worker_port_list}") if start_initial_python_workers_for_first_job: command.append("--num_initial_python_workers_for_first_job={}".format( resource_spec.num_cpus)) diff --git a/python/ray/node.py b/python/ray/node.py index 51dc143bd..86d0c21d8 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -720,6 +720,7 @@ class Node: object_store_memory, min_worker_port=self._ray_params.min_worker_port, max_worker_port=self._ray_params.max_worker_port, + worker_port_list=self._ray_params.worker_port_list, object_manager_port=self._ray_params.object_manager_port, redis_password=self._ray_params.redis_password, metrics_agent_port=self._ray_params.metrics_agent_port, diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 811d9539a..52da19f04 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -41,6 +41,9 @@ class RayParams: on. If not set or set to 0, random ports will be chosen. max_worker_port (int): The highest port number that workers will bind on. If set, min_worker_port must also be set. + worker_port_list (str): An explicit list of ports to be used for + workers (comma-separated). Overrides min_worker_port and + max_worker_port. object_ref_seed (int): Used to seed the deterministic generation of object refs. The same value can be used across multiple runs of the same job in order to generate the object refs in a consistent @@ -116,6 +119,7 @@ class RayParams: raylet_ip_address=None, min_worker_port=None, max_worker_port=None, + worker_port_list=None, object_ref_seed=None, driver_mode=None, redirect_worker_output=None, @@ -163,6 +167,7 @@ class RayParams: self.raylet_ip_address = raylet_ip_address self.min_worker_port = min_worker_port self.max_worker_port = max_worker_port + self.worker_port_list = worker_port_list self.driver_mode = driver_mode self.redirect_worker_output = redirect_worker_output self.redirect_output = redirect_output @@ -252,6 +257,20 @@ class RayParams: self._check_usage() def _check_usage(self): + if self.worker_port_list is not None: + for port_str in self.worker_port_list.split(","): + try: + port = int(port_str) + except ValueError as e: + raise ValueError( + "worker_port_list must be a comma-separated " + + "list of integers: {}".format(e)) from None + + if port < 1024 or port > 65535: + raise ValueError( + "Ports in worker_port_list must be " + "between 1024 and 65535. Got: {}".format(port)) + # Used primarily for testing. if os.environ.get("RAY_USE_RANDOM_PORTS", False): if self.min_worker_port is None and self.min_worker_port is None: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index ddb0c3cb8..606183053 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -207,6 +207,11 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): default=10999, help="the highest port number that workers will bind on. If set, " "'--min-worker-port' must also be set.") +@click.option( + "--worker-port-list", + required=False, + help="a comma-separated list of open ports for workers to bind on. " + "Overrides '--min-worker-port' and '--max-worker-port'.") @click.option( "--memory", required=False, @@ -357,9 +362,9 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): @add_click_options(logging_options) def start(node_ip_address, address, port, redis_password, redis_shard_ports, object_manager_port, node_manager_port, gcs_server_port, - min_worker_port, max_worker_port, memory, object_store_memory, - redis_max_memory, num_cpus, num_gpus, resources, head, - include_dashboard, dashboard_host, dashboard_port, block, + min_worker_port, max_worker_port, worker_port_list, memory, + object_store_memory, redis_max_memory, num_cpus, num_gpus, resources, + head, include_dashboard, dashboard_host, dashboard_port, block, plasma_directory, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, java_worker_options, load_code_from_local, @@ -401,6 +406,7 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, node_ip_address=node_ip_address, min_worker_port=min_worker_port, max_worker_port=max_worker_port, + worker_port_list=worker_port_list, object_manager_port=object_manager_port, node_manager_port=node_manager_port, gcs_server_port=gcs_server_port, diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index c9cb3cfbd..cb206112d 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -405,6 +405,20 @@ def test_calling_start_ray_head(call_ray_stop_only): ]) check_call_ray(["stop"]) + # Test starting Ray with a worker port list. + check_call_ray(["start", "--head", "--worker-port-list", "10000,10001"]) + check_call_ray(["stop"]) + + # Test starting Ray with a non-int in the worker port list. + with pytest.raises(subprocess.CalledProcessError): + check_call_ray(["start", "--head", "--worker-port-list", "10000,a"]) + check_call_ray(["stop"]) + + # Test starting Ray with an invalid port in the worker port list. + with pytest.raises(subprocess.CalledProcessError): + check_call_ray(["start", "--head", "--worker-port-list", "100"]) + check_call_ray(["stop"]) + # Test starting Ray with the number of CPUs specified. check_call_ray(["start", "--head", "--num-cpus", "2", "--port", "0"]) check_call_ray(["stop"]) diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 474d594f4..4ee33551b 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -36,6 +36,8 @@ DEFINE_int32(min_worker_port, 0, "The lowest port that workers' gRPC servers will bind on."); DEFINE_int32(max_worker_port, 0, "The highest port that workers' gRPC servers will bind on."); +DEFINE_string(worker_port_list, "", + "An explicit list of ports that workers' gRPC servers will bind on."); DEFINE_int32(num_initial_workers, 0, "Number of initial workers."); DEFINE_int32(num_initial_python_workers_for_first_job, 0, "Number of initial Python workers for the first job."); @@ -75,6 +77,7 @@ int main(int argc, char *argv[]) { const int redis_port = static_cast(FLAGS_redis_port); const int min_worker_port = static_cast(FLAGS_min_worker_port); const int max_worker_port = static_cast(FLAGS_max_worker_port); + const std::string worker_port_list = FLAGS_worker_port_list; const int num_initial_workers = static_cast(FLAGS_num_initial_workers); const int num_initial_python_workers_for_first_job = static_cast(FLAGS_num_initial_python_workers_for_first_job); @@ -150,6 +153,15 @@ int main(int argc, char *argv[]) { RayConfig::instance().initialize(raylet_config); + // Parse the worker port list. + std::istringstream worker_port_list_string(worker_port_list); + std::string worker_port; + std::vector worker_ports; + + while (std::getline(worker_port_list_string, worker_port, ',')) { + worker_ports.push_back(std::stoi(worker_port)); + } + // Parse the resource list. std::istringstream resource_string(static_resource_list); std::string resource_name; @@ -157,8 +169,6 @@ int main(int argc, char *argv[]) { while (std::getline(resource_string, resource_name, ',')) { RAY_CHECK(std::getline(resource_string, resource_quantity, ',')); - // TODO(rkn): The line below could throw an exception. What should we do - // about this? static_resource_conf[resource_name] = std::stod(resource_quantity); } auto num_cpus_it = static_resource_conf.find("CPU"); @@ -180,6 +190,7 @@ int main(int argc, char *argv[]) { node_manager_config.maximum_startup_concurrency = maximum_startup_concurrency; node_manager_config.min_worker_port = min_worker_port; node_manager_config.max_worker_port = max_worker_port; + node_manager_config.worker_ports = worker_ports; if (!python_worker_command.empty()) { node_manager_config.worker_commands.emplace( diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9b73cbd76..369edaf08 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -142,8 +142,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self io_service, config.num_initial_workers, config.num_workers_soft_limit, config.num_initial_python_workers_for_first_job, config.maximum_startup_concurrency, config.min_worker_port, - config.max_worker_port, gcs_client_, config.worker_commands, - config.raylet_config, + config.max_worker_port, config.worker_ports, gcs_client_, + config.worker_commands, config.raylet_config, /*starting_worker_timeout_callback=*/ [this]() { this->DispatchTasks(this->local_queues_.GetReadyTasksByClass()); }), scheduling_policy_(local_queues_), diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 49b0ae385..0d8836c70 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -65,6 +65,9 @@ struct NodeManagerConfig { /// The highest port number that workers started will bind on. /// If this is not set to 0, min_worker_port must also not be set to 0. int max_worker_port; + /// An explicit list of open ports that workers started will bind + /// on. This takes precedence over min_worker_port and max_worker_port. + std::vector worker_ports; /// The initial number of workers to create. int num_initial_workers; /// The soft limit of the number of workers. diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 8bf0a44af..2d32e79da 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -58,7 +58,8 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, int num_workers_soft_limit, int num_initial_python_workers_for_first_job, int maximum_startup_concurrency, int min_worker_port, - int max_worker_port, std::shared_ptr gcs_client, + int max_worker_port, const std::vector &worker_ports, + std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands, const std::unordered_map &raylet_config, std::function starting_worker_timeout_callback) @@ -114,7 +115,12 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, RAY_CHECK(!state.worker_command.empty()) << "Worker command must not be empty."; } // Initialize free ports list with all ports in the specified range. - if (min_worker_port != 0) { + if (!worker_ports.empty()) { + free_ports_ = std::unique_ptr>(new std::queue()); + for (int port : worker_ports) { + free_ports_->push(port); + } + } else if (min_worker_port != 0) { if (max_worker_port == 0) { max_worker_port = 65535; // Maximum valid port number. } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index fce840368..981493a2d 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -83,6 +83,8 @@ class WorkerPool : public WorkerPoolInterface { /// If this is set to 0, workers will bind on random ports. /// \param max_worker_port The highest port number that workers started will bind on. /// If this is not set to 0, min_worker_port must also not be set to 0. + /// \param worker_ports An explicit list of open ports that workers started will bind + /// on. This takes precedence over min_worker_port and max_worker_port. /// \param worker_commands The commands used to start the worker process, grouped by /// language. /// \param raylet_config The raylet config list of this node. @@ -91,6 +93,7 @@ class WorkerPool : public WorkerPoolInterface { WorkerPool(boost::asio::io_service &io_service, int num_workers, int num_workers_soft_limit, int num_initial_python_workers_for_first_job, int maximum_startup_concurrency, int min_worker_port, int max_worker_port, + const std::vector &worker_ports, std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands, const std::unordered_map &raylet_config, diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 47f2cbdd7..219c4a444 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -34,7 +34,7 @@ class WorkerPoolMock : public WorkerPool { public: explicit WorkerPoolMock(boost::asio::io_service &io_service, const WorkerCommandMap &worker_commands) - : WorkerPool(io_service, 0, 0, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, nullptr, + : WorkerPool(io_service, 0, 0, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr, worker_commands, {}, []() {}), last_worker_process_() { states_by_lang_[ray::Language::JAVA].num_workers_per_process =