[Autoscaler] Cluster sync expand user (#10705)

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Alex Wu
2020-09-10 15:59:24 -07:00
committed by GitHub
parent fb0ae148e6
commit 12261c2a19
+20 -3
View File
@@ -78,17 +78,28 @@ class NodeUpdater:
self.process_runner = process_runner
self.node_id = node_id
self.provider = provider
# Some node providers don't specify empty structures as
# defaults. Better to be defensive.
file_mounts = file_mounts or {}
self.file_mounts = {
remote: os.path.expanduser(local)
for remote, local in file_mounts.items()
}
self.initialization_commands = initialization_commands
self.setup_commands = setup_commands
self.ray_start_commands = ray_start_commands
self.node_resources = node_resources
self.runtime_hash = runtime_hash
self.file_mounts_contents_hash = file_mounts_contents_hash
self.cluster_synced_files = cluster_synced_files
# TODO (Alex): This makes the assumption that $HOME on the head and
# worker nodes is the same. Also note that `cluster_synced_files` is
# set on the head -> worker updaters only (so `expanduser` is only run
# on the head node).
cluster_synced_files = cluster_synced_files or []
self.cluster_synced_files = [
os.path.expanduser(path) for path in cluster_synced_files
]
self.auth_config = auth_config
self.is_head_node = is_head_node
@@ -166,6 +177,8 @@ class NodeUpdater:
def do_sync(remote_path, local_path, allow_non_existing_paths=False):
if allow_non_existing_paths and not os.path.exists(local_path):
cli_logger.print("sync: {} does not exist. Skipping.",
local_path)
# Ignore missing source files. In the future we should support
# the --delete-missing-args command to delete files that have
# been removed
@@ -199,17 +212,21 @@ class NodeUpdater:
_numbered=("[]", previous_steps + 1, total_steps)):
for remote_path, local_path in self.file_mounts.items():
do_sync(remote_path, local_path)
previous_steps += 1
if self.cluster_synced_files:
with cli_logger.group(
"Processing worker file mounts",
_numbered=("[]", previous_steps + 2, total_steps)):
_numbered=("[]", previous_steps + 1, total_steps)):
cli_logger.print("synced files: {}",
str(self.cluster_synced_files))
for path in self.cluster_synced_files:
do_sync(path, path, allow_non_existing_paths=True)
previous_steps += 1
else:
cli_logger.print(
"No worker file mounts to sync",
_numbered=("[]", previous_steps + 2, total_steps))
_numbered=("[]", previous_steps + 1, total_steps))
def wait_ready(self, deadline):
with cli_logger.group(