From fd854ff090c29d82d7827c01704b76befcc83618 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 28 Oct 2018 17:28:41 -0700 Subject: [PATCH] =?UTF-8?q?Allow=20the=20node=20manager=20port=20and=20obj?= =?UTF-8?q?ect=20manager=20port=20to=20be=20set=20through=E2=80=A6=20(#313?= =?UTF-8?q?0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Allow the node manager port and object manager port to be set through ray start. * Linting * Fix Java test * Address comments. --- .../org/ray/runtime/runner/RunManager.java | 2 + python/ray/scripts/scripts.py | 26 ++++----- python/ray/services.py | 54 +++++++++++++++++-- src/ray/object_manager/object_manager.h | 4 ++ src/ray/raylet/main.cc | 24 +++++---- src/ray/raylet/node_manager.h | 8 +++ src/ray/raylet/raylet.cc | 9 ++-- test/multi_node_test.py | 9 ++-- 8 files changed, 104 insertions(+), 32 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 940d0e78b..8a5aab376 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -179,6 +179,8 @@ public class RunManager { rayConfig.rayletExecutablePath, rayConfig.rayletSocketName, rayConfig.objectStoreSocketName, + "0", // The object manager port. + "0", // The node manager port. 
rayConfig.nodeIp, rayConfig.getRedisIp(), rayConfig.getRedisPort().toString(), diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index fceaabcd2..7f7a7999d 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -104,6 +104,11 @@ def cli(logging_level, logging_format): required=False, type=int, help="the port to use for starting the object manager") +@click.option( + "--node-manager-port", + required=False, + type=int, + help="the port to use for starting the node manager") @click.option( "--object-store-memory", required=False, @@ -190,11 +195,11 @@ def cli(logging_level, logging_format): help="manually specify the root temporary dir of the Ray process") def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, - object_manager_port, object_store_memory, num_workers, num_cpus, - num_gpus, resources, head, no_ui, block, plasma_directory, - huge_pages, autoscaling_config, no_redirect_worker_output, - no_redirect_output, plasma_store_socket_name, raylet_socket_name, - temp_dir): + object_manager_port, node_manager_port, object_store_memory, + num_workers, num_cpus, num_gpus, resources, head, no_ui, block, + plasma_directory, huge_pages, autoscaling_config, + no_redirect_worker_output, no_redirect_output, + plasma_store_socket_name, raylet_socket_name, temp_dir): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -243,15 +248,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, logger.info("Using IP address {} for this node." .format(node_ip_address)) - address_info = {} - # Use the provided object manager port if there is one. 
- if object_manager_port is not None: - address_info["object_manager_ports"] = [object_manager_port] - if address_info == {}: - address_info = None - address_info = services.start_ray_head( - address_info=address_info, + object_manager_ports=[object_manager_port], + node_manager_ports=[node_manager_port], node_ip_address=node_ip_address, redis_port=redis_port, redis_shard_ports=redis_shard_ports, @@ -337,6 +336,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, node_ip_address=node_ip_address, redis_address=redis_address, object_manager_ports=[object_manager_port], + node_manager_ports=[node_manager_port], num_workers=num_workers, object_store_memory=object_store_memory, redis_password=redis_password, diff --git a/python/ray/services.py b/python/ray/services.py index d0ddd2b32..651cbb053 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -849,6 +849,8 @@ def start_raylet(redis_address, plasma_store_name, worker_path, resources=None, + object_manager_port=None, + node_manager_port=None, num_workers=0, use_valgrind=False, use_profiler=False, @@ -867,6 +869,13 @@ def start_raylet(redis_address, raylet_name (str): The name of the raylet socket to create. worker_path (str): The path of the script to use when the local scheduler starts up new workers. + resources: The resources that this raylet has. + object_manager_port (int): The port to use for the object manager. If + this is not provided, we will use 0 and the object manager will + choose its own port. + node_manager_port (int): The port to use for the node manager. If + this is not provided, we will use 0 and the node manager will + choose its own port. use_valgrind (bool): True if the raylet should be started inside of valgrind. If this is True, use_profiler must be False. 
use_profiler (bool): True if the raylet should be started inside @@ -915,10 +924,21 @@ def start_raylet(redis_address, if redis_password: start_worker_command += " --redis-password {}".format(redis_password) + # If the object manager port is None, then use 0 to cause the object + # manager to choose its own port. + if object_manager_port is None: + object_manager_port = 0 + # If the node manager port is None, then use 0 to cause the node manager + # to choose its own port. + if node_manager_port is None: + node_manager_port = 0 + command = [ RAYLET_EXECUTABLE, raylet_name, plasma_store_name, + str(object_manager_port), + str(node_manager_port), node_ip_address, gcs_ip_address, gcs_port, @@ -1159,6 +1179,8 @@ def start_raylet_monitor(redis_address, def start_ray_processes(address_info=None, + object_manager_ports=None, + node_manager_ports=None, node_ip_address="127.0.0.1", redis_port=None, redis_shard_ports=None, @@ -1188,6 +1210,12 @@ def start_ray_processes(address_info=None, address_info (dict): A dictionary with address information for processes that have already been started. If provided, address_info will be modified to include processes that are newly started. + object_manager_ports (list): A list of the ports to use for the object + managers. There should be one per object manager being started on + this node (typically just one). + node_manager_ports (list): A list of the ports to use for the node + managers. There should be one per node manager being started on + this node (typically just one). node_ip_address (str): The IP address of this node. redis_port (int): The port that the primary Redis shard should listen to. If None, then a random port will be chosen. If the key @@ -1341,11 +1369,14 @@ def start_ray_processes(address_info=None, raylet_socket_names = address_info["raylet_socket_names"] # Get the ports to use for the object managers if any are provided. 
- object_manager_ports = (address_info["object_manager_ports"] if - "object_manager_ports" in address_info else None) if not isinstance(object_manager_ports, list): + assert object_manager_ports is None or num_local_schedulers == 1 object_manager_ports = num_local_schedulers * [object_manager_ports] assert len(object_manager_ports) == num_local_schedulers + if not isinstance(node_manager_ports, list): + assert node_manager_ports is None or num_local_schedulers == 1 + node_manager_ports = num_local_schedulers * [node_manager_ports] + assert len(node_manager_ports) == num_local_schedulers # Start any object stores that do not yet exist. for i in range(num_local_schedulers - len(object_store_addresses)): @@ -1378,6 +1409,8 @@ def start_ray_processes(address_info=None, raylet_socket_name or get_raylet_socket_name(), object_store_addresses[i], worker_path, + object_manager_port=object_manager_ports[i], + node_manager_port=node_manager_ports[i], resources=resources[i], num_workers=workers_per_local_scheduler[i], stdout_file=raylet_stdout_file, @@ -1402,6 +1435,7 @@ def start_ray_processes(address_info=None, def start_ray_node(node_ip_address, redis_address, object_manager_ports=None, + node_manager_ports=None, num_workers=0, num_local_schedulers=1, object_store_memory=None, @@ -1427,6 +1461,9 @@ def start_ray_node(node_ip_address, object_manager_ports (list): A list of the ports to use for the object managers. There should be one per object manager being started on this node (typically just one). + node_manager_ports (list): A list of the ports to use for the node + managers. There should be one per node manager being started on + this node (typically just one). num_workers (int): The number of workers to start. num_local_schedulers (int): The number of local schedulers to start. 
This is also the number of plasma stores and plasma managers to @@ -1463,10 +1500,11 @@ def start_ray_node(node_ip_address, """ address_info = { "redis_address": redis_address, - "object_manager_ports": object_manager_ports } return start_ray_processes( address_info=address_info, + object_manager_ports=object_manager_ports, + node_manager_ports=node_manager_ports, node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers, @@ -1486,6 +1524,8 @@ def start_ray_node(node_ip_address, def start_ray_head(address_info=None, + object_manager_ports=None, + node_manager_ports=None, node_ip_address="127.0.0.1", redis_port=None, redis_shard_ports=None, @@ -1514,6 +1554,12 @@ def start_ray_head(address_info=None, address_info (dict): A dictionary with address information for processes that have already been started. If provided, address_info will be modified to include processes that are newly started. + object_manager_ports (list): A list of the ports to use for the object + managers. There should be one per object manager being started on + this node (typically just one). + node_manager_ports (list): A list of the ports to use for the node + managers. There should be one per node manager being started on + this node (typically just one). node_ip_address (str): The IP address of this node. redis_port (int): The port that the primary Redis shard should listen to. If None, then a random port will be chosen. 
If the key @@ -1570,6 +1616,8 @@ def start_ray_head(address_info=None, num_redis_shards = 1 if num_redis_shards is None else num_redis_shards return start_ray_processes( address_info=address_info, + object_manager_ports=object_manager_ports, + node_manager_ports=node_manager_ports, node_ip_address=node_ip_address, redis_port=redis_port, redis_shard_ports=redis_shard_ports, diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index aaf40dd08..243e29d5b 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -29,6 +29,10 @@ namespace ray { struct ObjectManagerConfig { + /// The port that the object manager should use to listen for connections + /// from other object managers. If this is 0, the object manager will choose + /// its own port. + int object_manager_port; /// The time in milliseconds to wait before retrying a pull /// that fails due to client id lookup. uint pull_timeout_ms; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index cadc0113b..3e2483ee6 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -20,19 +20,21 @@ int main(int argc, char *argv[]) { ray::RayLogLevel::INFO, /*log_dir=*/""); ray::RayLog::InstallFailureSignalHandler(); - RAY_CHECK(argc == 11 || argc == 12); + RAY_CHECK(argc == 13 || argc == 14); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); - const std::string node_ip_address = std::string(argv[3]); - const std::string redis_address = std::string(argv[4]); - int redis_port = std::stoi(argv[5]); - int num_initial_workers = std::stoi(argv[6]); - int maximum_startup_concurrency = std::stoi(argv[7]); - const std::string static_resource_list = std::string(argv[8]); - const std::string python_worker_command = std::string(argv[9]); - const std::string java_worker_command = std::string(argv[10]); - const std::string redis_password = (argc == 12 ? 
std::string(argv[11]) : ""); + int object_manager_port = std::stoi(argv[3]); + int node_manager_port = std::stoi(argv[4]); + const std::string node_ip_address = std::string(argv[5]); + const std::string redis_address = std::string(argv[6]); + int redis_port = std::stoi(argv[7]); + int num_initial_workers = std::stoi(argv[8]); + int maximum_startup_concurrency = std::stoi(argv[9]); + const std::string static_resource_list = std::string(argv[10]); + const std::string python_worker_command = std::string(argv[11]); + const std::string java_worker_command = std::string(argv[12]); + const std::string redis_password = (argc == 14 ? std::string(argv[13]) : ""); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; @@ -51,6 +53,7 @@ int main(int argc, char *argv[]) { ray::raylet::ResourceSet(std::move(static_resource_conf)); RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: " << node_manager_config.resource_config.ToString(); + node_manager_config.node_manager_port = node_manager_port; node_manager_config.num_initial_workers = num_initial_workers; node_manager_config.num_workers_per_process = RayConfig::instance().num_workers_per_process(); @@ -76,6 +79,7 @@ int main(int argc, char *argv[]) { // Configuration for the object manager. ray::ObjectManagerConfig object_manager_config; + object_manager_config.object_manager_port = object_manager_port; object_manager_config.store_socket_name = store_socket_name; object_manager_config.pull_timeout_ms = RayConfig::instance().object_manager_pull_timeout_ms(); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 2e5d7605f..483f701d3 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -23,15 +23,23 @@ namespace ray { namespace raylet { struct NodeManagerConfig { + /// The node's resource configuration. ResourceSet resource_config; + /// The port to use for listening to incoming connections. 
If this is 0 then + /// the node manager will choose its own port. + int node_manager_port; + /// The initial number of workers to create. int num_initial_workers; + /// The number of workers per process. int num_workers_per_process; /// The maximum number of workers that can be started concurrently by a /// worker pool. int maximum_startup_concurrency; /// The commands used to start the worker process, grouped by language. std::unordered_map<Language, std::vector<std::string>> worker_commands; + /// The time between heartbeats in milliseconds. uint64_t heartbeat_period_ms; + /// the maximum lineage size. uint64_t max_lineage_size; /// The store socket name. std::string store_socket_name; diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 11b54b65b..679b18052 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -24,10 +24,13 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)), socket_(main_service), object_manager_acceptor_( - main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), + main_service, + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), + object_manager_config.object_manager_port)), object_manager_socket_(main_service), - node_manager_acceptor_( - main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), + node_manager_acceptor_(main_service, boost::asio::ip::tcp::endpoint( + boost::asio::ip::tcp::v4(), + node_manager_config.node_manager_port)), node_manager_socket_(main_service) { // Start listening for clients. 
DoAccept(); diff --git a/test/multi_node_test.py b/test/multi_node_test.py index 8d3be724e..a9b54d874 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -285,9 +285,12 @@ def test_calling_start_ray_head(): ["ray", "start", "--head", "--node-ip-address", "127.0.0.1"]) subprocess.Popen(["ray", "stop"]).wait() - # Test starting Ray with an object manager port specified. - run_and_get_output( - ["ray", "start", "--head", "--object-manager-port", "12345"]) + # Test starting Ray with the object manager and node manager ports + # specified. + run_and_get_output([ + "ray", "start", "--head", "--object-manager-port", "12345", + "--node-manager-port", "54321" + ]) subprocess.Popen(["ray", "stop"]).wait() # Test starting Ray with the number of CPUs specified.