From 12261c2a19cd198c123158f4e45cd22c9a940913 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Thu, 10 Sep 2020 15:59:24 -0700 Subject: [PATCH] [Autoscaler] Cluster sync expand user (#10705) Co-authored-by: Richard Liaw --- python/ray/autoscaler/updater.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 8b69714cc..573c71661 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -78,17 +78,28 @@ class NodeUpdater: self.process_runner = process_runner self.node_id = node_id self.provider = provider + # Some node providers don't specify empty structures as + # defaults. Better to be defensive. + file_mounts = file_mounts or {} self.file_mounts = { remote: os.path.expanduser(local) for remote, local in file_mounts.items() } + self.initialization_commands = initialization_commands self.setup_commands = setup_commands self.ray_start_commands = ray_start_commands self.node_resources = node_resources self.runtime_hash = runtime_hash self.file_mounts_contents_hash = file_mounts_contents_hash - self.cluster_synced_files = cluster_synced_files + # TODO (Alex): This makes the assumption that $HOME on the head and + # worker nodes is the same. Also note that `cluster_synced_files` is + # set on the head -> worker updaters only (so `expanduser` is only run + # on the head node). + cluster_synced_files = cluster_synced_files or [] + self.cluster_synced_files = [ + os.path.expanduser(path) for path in cluster_synced_files + ] self.auth_config = auth_config self.is_head_node = is_head_node @@ -166,6 +177,8 @@ class NodeUpdater: def do_sync(remote_path, local_path, allow_non_existing_paths=False): if allow_non_existing_paths and not os.path.exists(local_path): + cli_logger.print("sync: {} does not exist. Skipping.", + local_path) # Ignore missing source files. In the future we should support # the --delete-missing-args command to delete files that have # been removed @@ -199,17 +212,21 @@ class NodeUpdater: _numbered=("[]", previous_steps + 1, total_steps)): for remote_path, local_path in self.file_mounts.items(): do_sync(remote_path, local_path) + previous_steps += 1 if self.cluster_synced_files: with cli_logger.group( "Processing worker file mounts", - _numbered=("[]", previous_steps + 2, total_steps)): + _numbered=("[]", previous_steps + 1, total_steps)): + cli_logger.print("synced files: {}", + str(self.cluster_synced_files)) for path in self.cluster_synced_files: do_sync(path, path, allow_non_existing_paths=True) + previous_steps += 1 else: cli_logger.print( "No worker file mounts to sync", - _numbered=("[]", previous_steps + 2, total_steps)) + _numbered=("[]", previous_steps + 1, total_steps)) def wait_ready(self, deadline): with cli_logger.group(