diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 8292583ac..3b2ac0f68 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -13,6 +13,7 @@ from ray.experimental.internal_kv import _internal_kv_put, \ _internal_kv_initialized from ray.autoscaler.node_provider import get_node_provider from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, + TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE, STATUS_UP_TO_DATE, NODE_TYPE_WORKER) from ray.autoscaler.updater import NodeUpdaterThread @@ -142,6 +143,10 @@ class StandardAutoscaler: self.last_update_time = now nodes = self.workers() + # Check pending nodes immediately after fetching the number of running + # nodes to minimize the chance of the pending-node count changing after + # additional nodes are launched. + num_pending = self.pending_launches.value self.load_metrics.prune_active_ips( [self.provider.internal_ip(node_id) for node_id in nodes]) target_workers = self.target_num_workers() @@ -199,7 +204,6 @@ class StandardAutoscaler: self.launch_new_node(count, instance_type=instance_type) # Launch additional nodes of the default type, if still needed. 
- num_pending = self.pending_launches.value num_workers = len(nodes) + num_pending if num_workers < target_workers: max_allowed = min(self.max_launch_batch, @@ -253,19 +257,29 @@ class StandardAutoscaler: self.recover_if_needed(node_id, now) def reload_config(self, errors_fatal=False): + sync_continuously = False + if hasattr(self, "config"): + sync_continuously = self.config.get( + "file_mounts_sync_continuously", False) try: with open(self.config_path) as f: new_config = yaml.safe_load(f.read()) validate_config(new_config) new_launch_hash = hash_launch_conf(new_config["worker_nodes"], new_config["auth"]) - new_runtime_hash = hash_runtime_conf(new_config["file_mounts"], [ - new_config["worker_setup_commands"], - new_config["worker_start_ray_commands"] - ]) + (new_runtime_hash, + new_file_mounts_contents_hash) = hash_runtime_conf( + new_config["file_mounts"], + [ + new_config["worker_setup_commands"], + new_config["worker_start_ray_commands"], + ], + generate_file_mounts_contents_hash=sync_continuously, + ) self.config = new_config self.launch_hash = new_launch_hash self.runtime_hash = new_runtime_hash + self.file_mounts_contents_hash = new_file_mounts_contents_hash except Exception as e: if errors_fatal: raise e @@ -312,11 +326,19 @@ class StandardAutoscaler: return True def files_up_to_date(self, node_id): - applied = self.provider.node_tags(node_id).get(TAG_RAY_RUNTIME_CONFIG) - if applied != self.runtime_hash: + node_tags = self.provider.node_tags(node_id) + applied_config_hash = node_tags.get(TAG_RAY_RUNTIME_CONFIG) + applied_file_mounts_contents_hash = node_tags.get( + TAG_RAY_FILE_MOUNTS_CONTENTS) + if (applied_config_hash != self.runtime_hash + or (self.file_mounts_contents_hash is not None + and self.file_mounts_contents_hash != + applied_file_mounts_contents_hash)): logger.info("StandardAutoscaler: " - "{}: Runtime state is {}, want {}".format( - node_id, applied, self.runtime_hash)) + "{}: Runtime state is ({},{}), want ({},{})".format( + node_id, 
applied_config_hash, + applied_file_mounts_contents_hash, + self.runtime_hash, self.file_mounts_contents_hash)) return False return True @@ -345,6 +367,7 @@ class StandardAutoscaler: ray_start_commands=with_head_node_ip( self.config["worker_start_ray_commands"]), runtime_hash=self.runtime_hash, + file_mounts_contents_hash=self.file_mounts_contents_hash, process_runner=self.process_runner, use_internal_ip=True, docker_config=self.config.get("docker")) @@ -385,6 +408,7 @@ class StandardAutoscaler: setup_commands=with_head_node_ip(init_commands), ray_start_commands=with_head_node_ip(ray_start_commands), runtime_hash=self.runtime_hash, + file_mounts_contents_hash=self.file_mounts_contents_hash, process_runner=self.process_runner, use_internal_ip=True, docker_config=self.config.get("docker")) diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml index 2586cc8aa..042c3e497 100644 --- a/python/ray/autoscaler/aws/example-full.yaml +++ b/python/ray/autoscaler/aws/example-full.yaml @@ -108,6 +108,9 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } +# Whether changes to mounted files in the head node should sync to the worker node continuously +file_mounts_sync_continuously: False + # List of commands that will be run before `setup_commands`. If docker is # enabled, these commands will run outside the container and before docker # is setup. diff --git a/python/ray/autoscaler/azure/example-full.yaml b/python/ray/autoscaler/azure/example-full.yaml index 3537c9401..69267bd31 100644 --- a/python/ray/autoscaler/azure/example-full.yaml +++ b/python/ray/autoscaler/azure/example-full.yaml @@ -102,6 +102,9 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } +# Whether changes to mounted files in the head node should sync to the worker node continuously +file_mounts_sync_continuously: False + # List of commands that will be run before `setup_commands`. 
If docker is # enabled, these commands will run outside the container and before docker # is setup. diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index 015b63c16..fb4d95de4 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -403,9 +403,9 @@ class SSHCommandRunner(CommandRunnerInterface): if cli_logger.verbosity > 0: with cli_logger.indented(): - start_process() + return start_process() else: - start_process() + return start_process() def run_rsync_up(self, source, target): self._set_ssh_ip_if_required() diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 504fef5d1..b9bc16b3e 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -373,6 +373,7 @@ def kill_node(config_file, yes, hard, override_cluster_name): setup_commands=[], ray_start_commands=[], runtime_hash="", + file_mounts_contents_hash="", docker_config=config.get("docker")) _exec(updater, "ray stop", False, False) @@ -540,7 +541,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, # TODO(ekl) right now we always update the head node even if the # hash matches. # We could prompt the user for what they want to do here. 
- runtime_hash = hash_runtime_conf(config["file_mounts"], config) + (runtime_hash, file_mounts_contents_hash) = hash_runtime_conf( + config["file_mounts"], config) cli_logger.old_info( logger, @@ -604,6 +606,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, setup_commands=init_commands, ray_start_commands=ray_start_commands, runtime_hash=runtime_hash, + file_mounts_contents_hash=file_mounts_contents_hash, docker_config=config.get("docker")) updater.start() updater.join() @@ -740,6 +743,7 @@ def exec_cluster(config_file: str, setup_commands=[], ray_start_commands=[], runtime_hash="", + file_mounts_contents_hash="", docker_config=config.get("docker")) is_docker = isinstance(updater.cmd_runner, DockerCommandRunner) @@ -863,6 +867,7 @@ def rsync(config_file: str, setup_commands=[], ray_start_commands=[], runtime_hash="", + file_mounts_contents_hash="", docker_config=config.get("docker")) if down: rsync = updater.rsync_down diff --git a/python/ray/autoscaler/gcp/example-full.yaml b/python/ray/autoscaler/gcp/example-full.yaml index 5864775ed..092c2babd 100644 --- a/python/ray/autoscaler/gcp/example-full.yaml +++ b/python/ray/autoscaler/gcp/example-full.yaml @@ -109,6 +109,9 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } +# Whether changes to mounted files in the head node should sync to the worker node continuously +file_mounts_sync_continuously: False + # List of commands that will be run before `setup_commands`. If docker is # enabled, these commands will run outside the container and before docker # is setup. 
diff --git a/python/ray/autoscaler/kubernetes/example-full.yaml b/python/ray/autoscaler/kubernetes/example-full.yaml index d7188b689..8df1add59 100644 --- a/python/ray/autoscaler/kubernetes/example-full.yaml +++ b/python/ray/autoscaler/kubernetes/example-full.yaml @@ -265,6 +265,9 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } +# Whether changes to mounted files in the head node should sync to the worker node continuously +file_mounts_sync_continuously: False + # List of commands that will be run before `setup_commands`. If docker is # enabled, these commands will run outside the container and before docker # is setup. diff --git a/python/ray/autoscaler/local/example-full.yaml b/python/ray/autoscaler/local/example-full.yaml index e674f68ce..35e24e2a1 100644 --- a/python/ray/autoscaler/local/example-full.yaml +++ b/python/ray/autoscaler/local/example-full.yaml @@ -57,6 +57,9 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } +# Whether changes to mounted files in the head node should sync to the worker node continuously +file_mounts_sync_continuously: False + # List of commands that will be run before `setup_commands`. If docker is # enabled, these commands will run outside the container and before docker # is setup. diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index 2c3fead8e..3b2d9098f 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -232,6 +232,10 @@ "type": "object", "description": "Map of remote paths to local paths, e.g. {\"/tmp/data\": \"/my/local/data\"}" }, + "file_mounts_sync_continuously": { + "type": "boolean", + "description": "If enabled, file mounts will sync continuously between the head node and the worker nodes. The nodes will not re-run setup commands if only the contents of the file mounts folders change." 
+ }, "initialization_commands": { "$ref": "#/definitions/commands", "description": "List of commands that will be run before `setup_commands`. If docker is enabled, these commands will run outside the container and before docker is setup." diff --git a/python/ray/autoscaler/tags.py b/python/ray/autoscaler/tags.py index 6a49e9e9b..99f201d0e 100644 --- a/python/ray/autoscaler/tags.py +++ b/python/ray/autoscaler/tags.py @@ -29,3 +29,5 @@ TAG_RAY_LAUNCH_CONFIG = "ray-launch-config" # Hash of the node runtime config, used to determine if updates are needed TAG_RAY_RUNTIME_CONFIG = "ray-runtime-config" +# Hash of the contents of the directories specified by the file_mounts config +TAG_RAY_FILE_MOUNTS_CONTENTS = "ray-file-mounts-contents" diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 82c72bccd..130a2c4e9 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -7,6 +7,7 @@ import time from threading import Thread from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \ + TAG_RAY_FILE_MOUNTS_CONTENTS, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \ STATUS_SETTING_UP, STATUS_SYNCING_FILES from ray.autoscaler.command_runner import NODE_START_WAIT_S, SSHOptions @@ -34,6 +35,7 @@ class NodeUpdater: setup_commands, ray_start_commands, runtime_hash, + file_mounts_contents_hash, process_runner=subprocess, use_internal_ip=False, docker_config=None): @@ -57,6 +59,7 @@ class NodeUpdater: self.setup_commands = setup_commands self.ray_start_commands = ray_start_commands self.runtime_hash = runtime_hash + self.file_mounts_contents_hash = file_mounts_contents_hash self.auth_config = auth_config def run(self): @@ -97,11 +100,15 @@ class NodeUpdater: return raise - self.provider.set_node_tags( - self.node_id, { - TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, - TAG_RAY_RUNTIME_CONFIG: self.runtime_hash - }) + tags_to_set = { + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + 
TAG_RAY_RUNTIME_CONFIG: self.runtime_hash, + } + if self.file_mounts_contents_hash is not None: + tags_to_set[ + TAG_RAY_FILE_MOUNTS_CONTENTS] = self.file_mounts_contents_hash + + self.provider.set_node_tags(self.node_id, tags_to_set) cli_logger.labeled_value("New status", STATUS_UP_TO_DATE) self.exitcode = 0 @@ -182,7 +189,13 @@ class NodeUpdater: node_tags = self.provider.node_tags(self.node_id) logger.debug("Node tags: {}".format(str(node_tags))) - if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash: + + # runtime_hash will only change whenever the user restarts + # or updates their cluster with `get_or_create_head_node` + if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash and ( + self.file_mounts_contents_hash is None + or node_tags.get(TAG_RAY_FILE_MOUNTS_CONTENTS) == + self.file_mounts_contents_hash): # todo: we lie in the confirmation message since # full setup might be cancelled here cli_logger.print( @@ -191,6 +204,7 @@ class NodeUpdater: cli_logger.old_info(logger, "{}{} already up-to-date, skip to ray start", self.log_prefix, self.node_id) + else: cli_logger.print( "Updating cluster configuration.", @@ -201,51 +215,58 @@ class NodeUpdater: cli_logger.labeled_value("New status", STATUS_SYNCING_FILES) self.sync_file_mounts(self.rsync_up) - # Run init commands - self.provider.set_node_tags( - self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SETTING_UP}) - cli_logger.labeled_value("New status", STATUS_SETTING_UP) + # Only run setup commands if runtime_hash has changed because + # we don't want to run setup_commands every time the head node + # file_mounts folders have changed. 
+ if node_tags.get(TAG_RAY_RUNTIME_CONFIG) != self.runtime_hash: + # Run init commands + self.provider.set_node_tags( + self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SETTING_UP}) + cli_logger.labeled_value("New status", STATUS_SETTING_UP) - if self.initialization_commands: - with cli_logger.group( - "Running initialization commands", - _numbered=("[]", 3, 5)): # todo: fix command numbering - with LogTimer( - self.log_prefix + "Initialization commands", - show_status=True): + if self.initialization_commands: + with cli_logger.group( + "Running initialization commands", + _numbered=("[]", 3, + 5)): # todo: fix command numbering + with LogTimer( + self.log_prefix + "Initialization commands", + show_status=True): - for cmd in self.initialization_commands: - self.cmd_runner.run( - cmd, - ssh_options_override=SSHOptions( - self.auth_config.get("ssh_private_key"))) - else: - cli_logger.print( - "No initialization commands to run.", - _numbered=("[]", 3, 5)) + for cmd in self.initialization_commands: + self.cmd_runner.run( + cmd, + ssh_options_override=SSHOptions( + self.auth_config.get( + "ssh_private_key"))) + else: + cli_logger.print( + "No initialization commands to run.", + _numbered=("[]", 3, 5)) - if self.setup_commands: - with cli_logger.group( - "Running setup commands", - _numbered=("[]", 4, 5)): # todo: fix command numbering - with LogTimer( - self.log_prefix + "Setup commands", - show_status=True): + if self.setup_commands: + with cli_logger.group( + "Running setup commands", + _numbered=("[]", 4, + 5)): # todo: fix command numbering + with LogTimer( + self.log_prefix + "Setup commands", + show_status=True): - total = len(self.setup_commands) - for i, cmd in enumerate(self.setup_commands): - if cli_logger.verbosity == 0: - cmd_to_print = cf.bold(cmd[:30]) + "..." - else: - cmd_to_print = cf.bold(cmd) + total = len(self.setup_commands) + for i, cmd in enumerate(self.setup_commands): + if cli_logger.verbosity == 0: + cmd_to_print = cf.bold(cmd[:30]) + "..." 
+ else: + cmd_to_print = cf.bold(cmd) - cli_logger.print( - cmd_to_print, _numbered=("()", i, total)) + cli_logger.print( + cmd_to_print, _numbered=("()", i, total)) - self.cmd_runner.run(cmd) - else: - cli_logger.print( - "No setup commands to run.", _numbered=("[]", 4, 5)) + self.cmd_runner.run(cmd) + else: + cli_logger.print( + "No setup commands to run.", _numbered=("[]", 4, 5)) with cli_logger.group( "Starting the Ray runtime", _numbered=("[]", 5, 5)): diff --git a/python/ray/autoscaler/util.py b/python/ray/autoscaler/util.py index 47fedd959..d9c646d40 100644 --- a/python/ray/autoscaler/util.py +++ b/python/ray/autoscaler/util.py @@ -102,14 +102,27 @@ def hash_launch_conf(node_conf, auth): _hash_cache = {} -def hash_runtime_conf(file_mounts, extra_objs): - hasher = hashlib.sha1() +def hash_runtime_conf(file_mounts, + extra_objs, + generate_file_mounts_contents_hash=False): + """Returns two hashes, a runtime hash and file_mounts_content hash. + + The runtime hash is used to determine if the configuration or file_mounts + contents have changed. It is used at launch time (ray up) to determine if + a restart is needed. + + The file_mounts_content hash is used to determine if the file_mounts + contents have changed. It is used at monitor time to determine if + additional file syncing is needed. 
+ """ + runtime_hasher = hashlib.sha1() + contents_hasher = hashlib.sha1() def add_content_hashes(path): def add_hash_of_file(fpath): with open(fpath, "rb") as f: for chunk in iter(lambda: f.read(2**20), b""): - hasher.update(chunk) + contents_hasher.update(chunk) path = os.path.expanduser(path) if os.path.isdir(path): @@ -117,9 +130,9 @@ def hash_runtime_conf(file_mounts, extra_objs): for dirpath, _, filenames in os.walk(path): dirs.append((dirpath, sorted(filenames))) for dirpath, filenames in sorted(dirs): - hasher.update(dirpath.encode("utf-8")) + contents_hasher.update(dirpath.encode("utf-8")) for name in filenames: - hasher.update(name.encode("utf-8")) + contents_hasher.update(name.encode("utf-8")) fpath = os.path.join(dirpath, name) add_hash_of_file(fpath) else: @@ -128,12 +141,20 @@ def hash_runtime_conf(file_mounts, extra_objs): conf_str = (json.dumps(file_mounts, sort_keys=True).encode("utf-8") + json.dumps(extra_objs, sort_keys=True).encode("utf-8")) - # Important: only hash the files once. Otherwise, we can end up restarting - # workers if the files were changed and we re-hashed them. 
- if conf_str not in _hash_cache: - hasher.update(conf_str) + # Only generate a contents hash if generate_file_mounts_contents_hash is + # true or if we need to generate the runtime_hash + if conf_str not in _hash_cache or generate_file_mounts_contents_hash: for local_path in sorted(file_mounts.values()): add_content_hashes(local_path) - _hash_cache[conf_str] = hasher.hexdigest() + contents_hash = contents_hasher.hexdigest() - return _hash_cache[conf_str] + # Generate a new runtime_hash if it's not cached + if conf_str not in _hash_cache: + runtime_hasher.update(conf_str) + runtime_hasher.update(contents_hash.encode("utf-8")) + _hash_cache[conf_str] = runtime_hasher.hexdigest() + + else: + contents_hash = None + + return (_hash_cache[conf_str], contents_hash) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 0d8882f8f..b6b6a40d3 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -47,6 +47,10 @@ class MockProcessRunner: raise Exception("Failing command on purpose") self.calls.append(cmd) + def check_output(self, cmd): + self.check_call(cmd) + return "command-output".encode() + def assert_has_call(self, ip, pattern): out = "" for cmd in self.calls: @@ -285,6 +289,7 @@ class AutoscalingTest(unittest.TestCase): self.tmpdir = tempfile.mkdtemp() def tearDown(self): + self.provider = None del NODE_PROVIDERS["mock"] shutil.rmtree(self.tmpdir) ray.shutdown() @@ -1083,6 +1088,137 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd") runner.assert_has_call("172.0.0.{}".format(i), "start_ray_worker") + def testContinuousFileMounts(self): + file_mount_dir = tempfile.mkdtemp() + + self.provider = MockProvider() + config = SMALL_CLUSTER.copy() + config["file_mounts"] = {"/home/test-folder": file_mount_dir} + config["file_mounts_sync_continuously"] = True + config["min_workers"] = 2 + config["max_workers"] = 2 + config_path = self.write_config(config) + 
runner = MockProcessRunner() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + + autoscaler.update() + self.waitForNodes(2) + self.provider.finish_starting_nodes() + autoscaler.update() + self.waitForNodes( + 2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + autoscaler.update() + + for i in [0, 1]: + runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_has_call( + "172.0.0.{}".format(i), + "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( + file_mount_dir, i)) + + runner.clear_history() + + with open(os.path.join(file_mount_dir, "test.txt"), "wb") as temp_file: + temp_file.write("hello".encode()) + + autoscaler.update() + self.waitForNodes(2) + self.provider.finish_starting_nodes() + autoscaler.update() + self.waitForNodes( + 2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + autoscaler.update() + + for i in [0, 1]: + runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_has_call( + "172.0.0.{}".format(i), + "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( + file_mount_dir, i)) + + def testFileMountsNonContinuous(self): + file_mount_dir = tempfile.mkdtemp() + + self.provider = MockProvider() + config = SMALL_CLUSTER.copy() + config["file_mounts"] = {"/home/test-folder": file_mount_dir} + config["min_workers"] = 2 + config["max_workers"] = 2 + config_path = self.write_config(config) + runner = MockProcessRunner() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + + autoscaler.update() + self.waitForNodes(2) + self.provider.finish_starting_nodes() + autoscaler.update() + self.waitForNodes( + 2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + autoscaler.update() + + for i in [0, 1]: + runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_has_call( + "172.0.0.{}".format(i), + "{}/ 
ubuntu@172.0.0.{}:/home/test-folder/".format( + file_mount_dir, i)) + + runner.clear_history() + + with open(os.path.join(file_mount_dir, "test.txt"), "wb") as temp_file: + temp_file.write("hello".encode()) + + autoscaler.update() + self.waitForNodes(2) + self.provider.finish_starting_nodes() + self.waitForNodes( + 2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + + for i in [0, 1]: + runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_not_has_call( + "172.0.0.{}".format(i), + "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( + file_mount_dir, i)) + + # Simulate a second `ray up` call + from ray.autoscaler import util + util._hash_cache = {} + runner = MockProcessRunner() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + + autoscaler.update() + self.waitForNodes(2) + self.provider.finish_starting_nodes() + self.waitForNodes( + 2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + autoscaler.update() + + for i in [0, 1]: + runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_has_call( + "172.0.0.{}".format(i), + "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( + file_mount_dir, i)) + if __name__ == "__main__": import sys