diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 43ea7b2b6..179881de3 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -450,7 +450,8 @@ class StandardAutoscaler: process_runner=self.process_runner, use_internal_ip=True, is_head_node=False, - docker_config=self.config.get("docker")) + docker_config=self.config.get("docker"), + node_resources=self._node_resources(node_id)) updater.start() self.updaters[node_id] = updater diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index c3a7dffe1..a88015a94 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -473,7 +473,7 @@ def warn_about_bad_start_command(start_commands: List[str]) -> None: def get_or_create_head_node(config: Dict[str, Any], - config_file: str, + printable_config_file: str, no_restart: bool, restart_only: bool, yes: bool, @@ -487,7 +487,6 @@ def get_or_create_head_node(config: Dict[str, Any], config["cluster_name"])) config = copy.deepcopy(config) - config_file = os.path.abspath(config_file) head_node_tags = { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, } @@ -531,13 +530,18 @@ def get_or_create_head_node(config: Dict[str, Any], _abort=True) cli_logger.newline() - # TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync) head_node_config = copy.deepcopy(config["head_node"]) + head_node_resources = None if "head_node_type" in config: - head_node_tags[TAG_RAY_USER_NODE_TYPE] = config["head_node_type"] - head_node_config.update(config["available_node_types"][config[ - "head_node_type"]]["node_config"]) + head_node_type = config["head_node_type"] + head_node_tags[TAG_RAY_USER_NODE_TYPE] = head_node_type + head_config = config["available_node_types"][head_node_type] + head_node_config.update(head_config["node_config"]) + + # Not necessary to keep in sync with node_launcher.py + # Keep in sync with autoscaler.py _node_resources + head_node_resources = head_config.get("resources") launch_hash = hash_launch_conf(head_node_config, config["auth"]) if head_node is None or provider.node_tags(head_node).get( @@ -659,6 +663,7 @@ def get_or_create_head_node(config: Dict[str, Any], runtime_hash=runtime_hash, file_mounts_contents_hash=file_mounts_contents_hash, is_head_node=True, + node_resources=head_node_resources, rsync_options={ "rsync_exclude": config.get("rsync_exclude"), "rsync_filter": config.get("rsync_filter") @@ -688,13 +693,15 @@ def get_or_create_head_node(config: Dict[str, Any], cli_logger.newline() with cli_logger.group("Useful commands"): + printable_config_file = os.path.abspath(printable_config_file) cli_logger.print("Monitor autoscaling with") cli_logger.print( - cf.bold(" ray exec {}{} {}"), config_file, modifiers, + cf.bold(" ray exec {}{} {}"), printable_config_file, modifiers, quote(monitor_str)) cli_logger.print("Connect to a terminal on the cluster head:") - cli_logger.print(cf.bold(" ray attach {}{}"), config_file, modifiers) + cli_logger.print( + cf.bold(" ray attach {}{}"), printable_config_file, modifiers) remote_shell_str = updater.cmd_runner.remote_shell_command_str() cli_logger.print("Get a remote shell to the cluster manually:") @@ -1024,7 +1031,7 @@ def _get_worker_nodes(config: Dict[str, Any], def _get_head_node(config: Dict[str, Any], - config_file: str, + printable_config_file: str, override_cluster_name: Optional[str], create_if_needed: bool = False) -> str: provider = _get_node_provider(config["provider"], config["cluster_name"]) @@ -1039,13 +1046,16 @@ def _get_head_node(config: Dict[str, Any], elif create_if_needed: get_or_create_head_node( config, - config_file, + printable_config_file=printable_config_file, restart_only=False, no_restart=False, yes=True, override_cluster_name=override_cluster_name) return _get_head_node( - config, config_file, override_cluster_name, create_if_needed=False) + config, + printable_config_file, + override_cluster_name, + create_if_needed=False) else: raise RuntimeError("Head node of cluster ({}) not found!".format( config["cluster_name"])) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 87b4c8add..0965744b9 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -393,7 +393,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", ["[]"]) commands.get_or_create_head_node( SMALL_CLUSTER, - config_path, + printable_config_file=config_path, no_restart=False, restart_only=False, yes=True, diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt index 2e70f7aa6..f64150fac 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt @@ -72,9 +72,9 @@ .+\.py.*Full command is `ssh.+` .+\.py.*NodeUpdater: i-.+: Setup commands succeeded \[LogTimer=.+\] .+\.py.*\[6/6\] Starting the Ray runtime -.+\.py.*Running `ray stop` +.+\.py.*Running `export RAY_OVERRIDE_RESOURCES='{"CPU":1}';ray stop` .+\.py.*Full command is `ssh.+` -.+\.py.*Running `ray start --head --autoscaling-config=~/ray_bootstrap_config\.yaml` +.+\.py.*Running `export RAY_OVERRIDE_RESOURCES='{"CPU":1}';ray start --head --autoscaling-config=~/ray_bootstrap_config\.yaml` .+\.py.*Full command is `ssh.+` .+\.py.*NodeUpdater: i-.+: Ray start commands succeeded \[LogTimer=.+\] .+\.py.*NodeUpdater: i-.+: Applied config .+ \[LogTimer=.+\] diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 8be955c11..db9fa6b5b 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -1005,7 +1005,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", ["[]"]) get_or_create_head_node( MULTI_WORKER_CLUSTER, - config_path, + printable_config_file=config_path, no_restart=False, restart_only=False, yes=True, @@ -1025,6 +1025,37 @@ class AutoscalingTest(unittest.TestCase): self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE), "empty_node") + def testGetOrCreateMultiNodeTypeCustomHeadResources(self): + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["available_node_types"]["empty_node"]["resources"] = { + "empty_resource_name": 1000 + } + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + get_or_create_head_node( + config, + printable_config_file=config_path, + no_restart=False, + restart_only=False, + yes=True, + override_cluster_name=None, + _provider=self.provider, + _runner=runner) + self.waitForNodes(1) + runner.assert_has_call("1.2.3.4", "init_cmd") + runner.assert_has_call("1.2.3.4", "setup_cmd") + runner.assert_has_call("1.2.3.4", "start_ray_head") + runner.assert_has_call("1.2.3.4", "empty_resource_name") + self.assertEqual(self.provider.mock_nodes[0].node_type, "empty_node") + self.assertEqual( + self.provider.mock_nodes[0].node_config.get("FooProperty"), 42) + self.assertEqual( + self.provider.mock_nodes[0].node_config.get("TestProp"), 1) + self.assertEqual( + self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE), + "empty_node") + def testScaleUpMinSanity(self): config = copy.deepcopy(MULTI_WORKER_CLUSTER) config["available_node_types"]["m4.large"]["min_workers"] = \