From f6cfc44dbd17e323433287fa65be290f3316f484 Mon Sep 17 00:00:00 2001 From: Ian Rodney Date: Wed, 10 Feb 2021 20:17:20 -0800 Subject: [PATCH] [autoscaler] run setup commands with restart_only=True (#13836) --- python/ray/autoscaler/_private/commands.py | 10 ++++++-- python/ray/autoscaler/_private/updater.py | 10 +++++++- python/ray/tests/test_autoscaler.py | 30 +++++++++++++++++++++- 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 336dca40f..d967543ff 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -646,7 +646,12 @@ def get_or_create_head_node(config: Dict[str, Any], cli_logger.print("Prepared bootstrap config") if restart_only: - setup_commands = [] + # Docker may re-launch nodes, requiring setup + # commands to be rerun. + if config.get("docker", {}).get("container_name"): + setup_commands = config["head_setup_commands"] + else: + setup_commands = [] ray_start_commands = config["head_start_ray_commands"] elif no_restart: setup_commands = config["head_setup_commands"] @@ -678,7 +683,8 @@ def get_or_create_head_node(config: Dict[str, Any], "rsync_exclude": config.get("rsync_exclude"), "rsync_filter": config.get("rsync_filter") }, - docker_config=config.get("docker")) + docker_config=config.get("docker"), + restart_only=restart_only) updater.start() updater.join() diff --git a/python/ray/autoscaler/_private/updater.py b/python/ray/autoscaler/_private/updater.py index 7256d9046..14981252c 100644 --- a/python/ray/autoscaler/_private/updater.py +++ b/python/ray/autoscaler/_private/updater.py @@ -48,6 +48,7 @@ class NodeUpdater: use_internal_ip: Wwhether the node_id belongs to an internal ip or external ip. docker_config: Docker section of autoscaler yaml + restart_only: Whether to skip setup commands & just restart ray """ def __init__(self, @@ -68,7 +69,8 @@ class NodeUpdater: rsync_options=None, process_runner=subprocess, use_internal_ip=False, - docker_config=None): + docker_config=None, + restart_only=False): self.log_prefix = "NodeUpdater: {}: ".format(node_id) use_internal_ip = (use_internal_ip @@ -106,6 +108,7 @@ class NodeUpdater: self.auth_config = auth_config self.is_head_node = is_head_node self.docker_config = docker_config + self.restart_only = restart_only def run(self): if cmd_output_util.does_allow_interactive( @@ -298,6 +301,11 @@ class NodeUpdater: sync_run_yet=False) if init_required: node_tags[TAG_RAY_RUNTIME_CONFIG] += "-invalidate" + # This ensures that `setup_commands` are not removed + self.restart_only = False + + if self.restart_only: + self.setup_commands = [] # runtime_hash will only change whenever the user restarts # or updates their cluster with `get_or_create_head_node` diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 204ed1ef8..925cb1d20 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -500,7 +500,7 @@ class AutoscalingTest(unittest.TestCase): _provider=self.provider, _runner=runner) self.waitForNodes(1) - # Init & Setup commands msut be run for Docker! + # Init & Setup commands must be run for Docker! runner.assert_has_call("1.2.3.4", "init_cmd") runner.assert_has_call("1.2.3.4", "head_setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") @@ -543,6 +543,34 @@ class AutoscalingTest(unittest.TestCase): assert first_mkdir < first_rsync assert first_rsync < first_cp + def testGetOrCreateHeadNodeFromStoppedRestartOnly(self): + self.testGetOrCreateHeadNode() + self.provider.cache_stopped = True + existing_nodes = self.provider.non_terminated_nodes({}) + assert len(existing_nodes) == 1 + self.provider.terminate_node(existing_nodes[0]) + config_path = self.write_config(SMALL_CLUSTER) + runner = MockProcessRunner() + runner.respond_to_call("json .Mounts", ["[]"]) + # Two initial calls to docker cp, + 2 more calls during run_init + runner.respond_to_call(".State.Running", + ["false", "false", "false", "false"]) + runner.respond_to_call("json .Config.Env", ["[]"]) + commands.get_or_create_head_node( + SMALL_CLUSTER, + printable_config_file=config_path, + no_restart=False, + restart_only=True, + yes=True, + override_cluster_name=None, + _provider=self.provider, + _runner=runner) + self.waitForNodes(1) + # Init & Setup commands must be run for Docker! + runner.assert_has_call("1.2.3.4", "init_cmd") + runner.assert_has_call("1.2.3.4", "head_setup_cmd") + runner.assert_has_call("1.2.3.4", "start_ray_head") + @unittest.skipIf(sys.platform == "win32", "Failing on Windows.") def testDockerFileMountsAdded(self): config = copy.deepcopy(SMALL_CLUSTER)