diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 5cd4e441a..3ae962a83 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -294,10 +294,16 @@ class NodeUpdater: node_tags = self.provider.node_tags(self.node_id) logger.debug("Node tags: {}".format(str(node_tags))) + if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash: + # When resuming from a stopped instance the runtime_hash may be the + # same, but the container will not be started. + self.cmd_runner.run_init( + as_head=self.is_head_node, file_mounts=self.file_mounts) + # runtime_hash will only change whenever the user restarts # or updates their cluster with `get_or_create_head_node` if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash and ( - self.file_mounts_contents_hash is None + not self.file_mounts_contents_hash or node_tags.get(TAG_RAY_FILE_MOUNTS_CONTENTS) == self.file_mounts_contents_hash): # todo: we lie in the confirmation message since @@ -310,11 +316,6 @@ class NodeUpdater: "{}{} already up-to-date, skip to ray start", self.log_prefix, self.node_id) - # When resuming from a stopped instance the runtime_hash may be the - # same, but the container will not be started. - self.cmd_runner.run_init( - as_head=self.is_head_node, file_mounts=self.file_mounts) - else: cli_logger.print( "Updating cluster configuration.", diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index ab19d070b..5862d6171 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -1110,7 +1110,10 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("172.0.0.1", "start_ray_worker") def testSetupCommandsWithStoppedNodeCaching(self): + file_mount_dir = tempfile.mkdtemp() config = SMALL_CLUSTER.copy() + config["file_mounts"] = {"/root/test-folder": file_mount_dir} + config["file_mounts_sync_continuously"] = True config["min_workers"] = 1 config["max_workers"] = 1 config_path = self.write_config(config) @@ -1133,6 +1136,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("172.0.0.0", "setup_cmd") runner.assert_has_call("172.0.0.0", "worker_setup_cmd") runner.assert_has_call("172.0.0.0", "start_ray_worker") + runner.assert_has_call("172.0.0.0", "docker run") # Check the node was indeed reused self.provider.terminate_node(0) @@ -1147,6 +1151,25 @@ class AutoscalingTest(unittest.TestCase): runner.assert_not_has_call("172.0.0.0", "setup_cmd") runner.assert_not_has_call("172.0.0.0", "worker_setup_cmd") runner.assert_has_call("172.0.0.0", "start_ray_worker") + runner.assert_has_call("172.0.0.0", "docker run") + + with open(f"{file_mount_dir}/new_file", "w") as f: + f.write("abcdefgh") + + # Check that run_init happens when file_mounts have updated + self.provider.terminate_node(0) + autoscaler.update() + self.waitForNodes(1) + runner.clear_history() + self.provider.finish_starting_nodes() + autoscaler.update() + self.waitForNodes( + 1, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + runner.assert_not_has_call("172.0.0.0", "init_cmd") + runner.assert_not_has_call("172.0.0.0", "setup_cmd") + runner.assert_not_has_call("172.0.0.0", "worker_setup_cmd") + runner.assert_has_call("172.0.0.0", "start_ray_worker") + runner.assert_has_call("172.0.0.0", "docker run") runner.clear_history() autoscaler.update()