From f8f6f342f662029d9e100d21cf074aaced757ba8 Mon Sep 17 00:00:00 2001 From: Alan Guo Date: Fri, 31 Jul 2020 14:33:27 -0700 Subject: [PATCH] [autoscaler] Create worker_file_mounts config (#9762) --- python/ray/autoscaler/autoscaler.py | 2 + python/ray/autoscaler/aws/example-full.yaml | 9 ++- python/ray/autoscaler/azure/example-full.yaml | 15 ++-- python/ray/autoscaler/commands.py | 4 +- python/ray/autoscaler/gcp/example-full.yaml | 9 ++- .../autoscaler/kubernetes/example-full.yaml | 9 ++- python/ray/autoscaler/local/example-full.yaml | 9 ++- python/ray/autoscaler/ray-schema.json | 4 ++ python/ray/autoscaler/tags.py | 2 + python/ray/autoscaler/updater.py | 71 ++++++++++++------- python/ray/autoscaler/util.py | 34 ++++++--- python/ray/tests/test_autoscaler.py | 1 + 12 files changed, 126 insertions(+), 43 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 3b2ac0f68..feac22065 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -270,6 +270,7 @@ class StandardAutoscaler: (new_runtime_hash, new_file_mounts_contents_hash) = hash_runtime_conf( new_config["file_mounts"], + new_config["cluster_synced_files"], [ new_config["worker_setup_commands"], new_config["worker_start_ray_commands"], @@ -409,6 +410,7 @@ class StandardAutoscaler: ray_start_commands=with_head_node_ip(ray_start_commands), runtime_hash=self.runtime_hash, file_mounts_contents_hash=self.file_mounts_contents_hash, + cluster_synced_files=self.config["cluster_synced_files"], process_runner=self.process_runner, use_internal_ip=True, docker_config=self.config.get("docker")) diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml index 042c3e497..794cc87ca 100644 --- a/python/ray/autoscaler/aws/example-full.yaml +++ b/python/ray/autoscaler/aws/example-full.yaml @@ -108,7 +108,14 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } -# Whether changes to mounted files in the head node should sync to the worker node continuously +# Files or directories to copy from the head node to the worker nodes. The format is a +# list of paths. The same path on the head node will be copied to the worker node. +# This behavior is a subset of the file_mounts behavior. In the vast majority of cases +# you should just use file_mounts. Only use this if you know what you're doing! +cluster_synced_files: [] + +# Whether changes to directories in file_mounts or cluster_synced_files in the head node +# should sync to the worker node continuously file_mounts_sync_continuously: False # List of commands that will be run before `setup_commands`. If docker is diff --git a/python/ray/autoscaler/azure/example-full.yaml b/python/ray/autoscaler/azure/example-full.yaml index 69267bd31..c188d4184 100644 --- a/python/ray/autoscaler/azure/example-full.yaml +++ b/python/ray/autoscaler/azure/example-full.yaml @@ -55,7 +55,7 @@ provider: location: westus2 resource_group: ray-cluster # set subscription id otherwise the default from az cli will be used - # subscription_id: 00000000-0000-0000-0000-000000000000 + # subscription_id: 00000000-0000-0000-0000-000000000000 # How Ray will authenticate with newly launched nodes. auth: @@ -67,7 +67,7 @@ auth: # More specific customization to node configurations can be made using the ARM template azure-vm-template.json file # See documentation here: https://docs.microsoft.com/en-us/azure/templates/microsoft.compute/2019-03-01/virtualmachines -# Changes to the local file will be used during deployment of the head node, however worker nodes deployment occurs +# Changes to the local file will be used during deployment of the head node, however worker nodes deployment occurs # on the head node, so changes to the template must be included in the wheel file used in setup_commands section below # Provider-specific config for the head node, e.g. instance type. @@ -89,7 +89,7 @@ worker_nodes: imageOffer: ubuntu-1804 imageSku: 1804-gen2 imageVersion: 20.02.01 - # optionally set priority to use Spot instances + # optionally set priority to use Spot instances priority: Spot # set a maximum price for spot instances if desired # billingProfile: @@ -102,7 +102,14 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } -# Whether changes to mounted files in the head node should sync to the worker node continuously +# Files or directories to copy from the head node to the worker nodes. The format is a +# list of paths. The same path on the head node will be copied to the worker node. +# This behavior is a subset of the file_mounts behavior. In the vast majority of cases +# you should just use file_mounts. Only use this if you know what you're doing! +cluster_synced_files: [] + +# Whether changes to directories in file_mounts or cluster_synced_files in the head node +# should sync to the worker node continuously file_mounts_sync_continuously: False # List of commands that will be run before `setup_commands`. If docker is diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index b9bc16b3e..22a6ac1e7 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -541,8 +541,10 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, # TODO(ekl) right now we always update the head node even if the # hash matches. # We could prompt the user for what they want to do here. + # No need to pass in cluster_sync_files because we use this + # hash to set up the head node (runtime_hash, file_mounts_contents_hash) = hash_runtime_conf( - config["file_mounts"], config) + config["file_mounts"], None, config) cli_logger.old_info( logger, diff --git a/python/ray/autoscaler/gcp/example-full.yaml b/python/ray/autoscaler/gcp/example-full.yaml index 092c2babd..b5da92226 100644 --- a/python/ray/autoscaler/gcp/example-full.yaml +++ b/python/ray/autoscaler/gcp/example-full.yaml @@ -109,7 +109,14 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } -# Whether changes to mounted files in the head node should sync to the worker node continuously +# Files or directories to copy from the head node to the worker nodes. The format is a +# list of paths. The same path on the head node will be copied to the worker node. +# This behavior is a subset of the file_mounts behavior. In the vast majority of cases +# you should just use file_mounts. Only use this if you know what you're doing! +cluster_synced_files: [] + +# Whether changes to directories in file_mounts or cluster_synced_files in the head node +# should sync to the worker node continuously file_mounts_sync_continuously: False # List of commands that will be run before `setup_commands`. If docker is diff --git a/python/ray/autoscaler/kubernetes/example-full.yaml b/python/ray/autoscaler/kubernetes/example-full.yaml index 8df1add59..eeb91fd15 100644 --- a/python/ray/autoscaler/kubernetes/example-full.yaml +++ b/python/ray/autoscaler/kubernetes/example-full.yaml @@ -265,7 +265,14 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } -# Whether changes to mounted files in the head node should sync to the worker node continuously +# Files or directories to copy from the head node to the worker nodes. The format is a +# list of paths. The same path on the head node will be copied to the worker node. +# This behavior is a subset of the file_mounts behavior. In the vast majority of cases +# you should just use file_mounts. Only use this if you know what you're doing! +cluster_synced_files: [] + +# Whether changes to directories in file_mounts or cluster_synced_files in the head node +# should sync to the worker node continuously file_mounts_sync_continuously: False # List of commands that will be run before `setup_commands`. If docker is diff --git a/python/ray/autoscaler/local/example-full.yaml b/python/ray/autoscaler/local/example-full.yaml index 35e24e2a1..489fb3e9b 100644 --- a/python/ray/autoscaler/local/example-full.yaml +++ b/python/ray/autoscaler/local/example-full.yaml @@ -57,7 +57,14 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } -# Whether changes to mounted files in the head node should sync to the worker node continuously +# Files or directories to copy from the head node to the worker nodes. The format is a +# list of paths. The same path on the head node will be copied to the worker node. +# This behavior is a subset of the file_mounts behavior. In the vast majority of cases +# you should just use file_mounts. Only use this if you know what you're doing! +cluster_synced_files: [] + +# Whether changes to directories in file_mounts or cluster_synced_files in the head node +# should sync to the worker node continuously file_mounts_sync_continuously: False # List of commands that will be run before `setup_commands`. If docker is diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index 3b2d9098f..bb2f10f3b 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -232,6 +232,10 @@ "type": "object", "description": "Map of remote paths to local paths, e.g. {\"/tmp/data\": \"/my/local/data\"}" }, + "cluster_synced_files": { + "type": "array", + "description": "List of paths on the head node which should sync to the worker nodes, e.g. [\"/some/data/somehwere\"]" + }, "file_mounts_sync_continuously": { "type": "boolean", "description": "If enabled, file mounts will sync continously between the head node and the worker nodes. The nodes will not re-run setup commands if only the contents of the file mounts folders change." diff --git a/python/ray/autoscaler/tags.py b/python/ray/autoscaler/tags.py index 99f201d0e..d24ac3a73 100644 --- a/python/ray/autoscaler/tags.py +++ b/python/ray/autoscaler/tags.py @@ -30,4 +30,6 @@ TAG_RAY_LAUNCH_CONFIG = "ray-launch-config" # Hash of the node runtime config, used to determine if updates are needed TAG_RAY_RUNTIME_CONFIG = "ray-runtime-config" # Hash of the contents of the directories specified by the file_mounts config +# if the node is a worker, this also hashes content of the directories +# specified by the cluster_synced_files config TAG_RAY_FILE_MOUNTS_CONTENTS = "ray-file-mounts-contents" diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 130a2c4e9..0bd832c1e 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -36,6 +36,7 @@ class NodeUpdater: ray_start_commands, runtime_hash, file_mounts_contents_hash, + cluster_synced_files=None, process_runner=subprocess, use_internal_ip=False, docker_config=None): @@ -60,6 +61,7 @@ class NodeUpdater: self.ray_start_commands = ray_start_commands self.runtime_hash = runtime_hash self.file_mounts_contents_hash = file_mounts_contents_hash + self.cluster_synced_files = cluster_synced_files self.auth_config = auth_config def run(self): @@ -120,31 +122,50 @@ class NodeUpdater: "~/ray_bootstrap_key.pem", "~/ray_bootstrap_config.yaml" ] + def do_sync(remote_path, local_path, allow_non_existing_paths=False): + if allow_non_existing_paths and not os.path.exists(local_path): + # Ignore missing source files. In the future we should support + # the --delete-missing-args command to delete files that have + # been removed + return + + assert os.path.exists(local_path), local_path + + if os.path.isdir(local_path): + if not local_path.endswith("/"): + local_path += "/" + if not remote_path.endswith("/"): + remote_path += "/" + + with LogTimer(self.log_prefix + + "Synced {} to {}".format(local_path, remote_path)): + self.cmd_runner.run("mkdir -p {}".format( + os.path.dirname(remote_path))) + sync_cmd(local_path, remote_path) + + if remote_path not in nolog_paths: + # todo: timed here? + cli_logger.print("{} from {}", cf.bold(remote_path), + cf.bold(local_path)) + # Rsync file mounts with cli_logger.group( - "Processing file mounts", _numbered=("[]", 2, 5)): + "Processing file mounts", _numbered=("[]", 2, 6)): for remote_path, local_path in self.file_mounts.items(): - assert os.path.exists(local_path), local_path - if os.path.isdir(local_path): - if not local_path.endswith("/"): - local_path += "/" - if not remote_path.endswith("/"): - remote_path += "/" + do_sync(remote_path, local_path) - with LogTimer(self.log_prefix + "Synced {} to {}".format( - local_path, remote_path)): - self.cmd_runner.run("mkdir -p {}".format( - os.path.dirname(remote_path))) - sync_cmd(local_path, remote_path) - - if remote_path not in nolog_paths: - # todo: timed here? - cli_logger.print("{} from {}", cf.bold(remote_path), - cf.bold(local_path)) + if self.cluster_synced_files: + with cli_logger.group( + "Processing worker file mounts", _numbered=("[]", 3, 6)): + for path in self.cluster_synced_files: + do_sync(path, path, allow_non_existing_paths=True) + else: + cli_logger.print( + "No worker file mounts to sync", _numbered=("[]", 3, 6)) def wait_ready(self, deadline): with cli_logger.group( - "Waiting for SSH to become available", _numbered=("[]", 1, 5)): + "Waiting for SSH to become available", _numbered=("[]", 1, 6)): with LogTimer(self.log_prefix + "Got remote shell"): cli_logger.old_info(logger, "{}Waiting for remote shell...", self.log_prefix) @@ -227,8 +248,8 @@ class NodeUpdater: if self.initialization_commands: with cli_logger.group( "Running initialization commands", - _numbered=("[]", 3, - 5)): # todo: fix command numbering + _numbered=("[]", 4, + 6)): # todo: fix command numbering with LogTimer( self.log_prefix + "Initialization commands", show_status=True): @@ -242,13 +263,13 @@ class NodeUpdater: else: cli_logger.print( "No initialization commands to run.", - _numbered=("[]", 3, 5)) + _numbered=("[]", 4, 6)) if self.setup_commands: with cli_logger.group( "Running setup commands", - _numbered=("[]", 4, - 5)): # todo: fix command numbering + _numbered=("[]", 5, + 6)): # todo: fix command numbering with LogTimer( self.log_prefix + "Setup commands", show_status=True): @@ -266,10 +287,10 @@ class NodeUpdater: self.cmd_runner.run(cmd) else: cli_logger.print( - "No setup commands to run.", _numbered=("[]", 4, 5)) + "No setup commands to run.", _numbered=("[]", 5, 6)) with cli_logger.group( - "Starting the Ray runtime", _numbered=("[]", 5, 5)): + "Starting the Ray runtime", _numbered=("[]", 6, 6)): with LogTimer( self.log_prefix + "Ray start commands", show_status=True): for cmd in self.ray_start_commands: diff --git a/python/ray/autoscaler/util.py b/python/ray/autoscaler/util.py index d9c646d40..d436be426 100644 --- a/python/ray/autoscaler/util.py +++ b/python/ray/autoscaler/util.py @@ -103,6 +103,7 @@ _hash_cache = {} def hash_runtime_conf(file_mounts, + cluster_synced_files, extra_objs, generate_file_mounts_contents_hash=False): """Returns two hashes, a runtime hash and file_mounts_content hash. @@ -111,20 +112,22 @@ def hash_runtime_conf(file_mounts, contents have changed. It is used at launch time (ray up) to determine if a restart is needed. - The file_mounts_content hash is used to determine if the file_mounts - contents have changed. It is used at monitor time to determine if - additional file syncing is needed. + The file_mounts_content hash is used to determine if the file_mounts or + cluster_synced_files contents have changed. It is used at monitor time to + determine if additional file syncing is needed. """ runtime_hasher = hashlib.sha1() contents_hasher = hashlib.sha1() - def add_content_hashes(path): + def add_content_hashes(path, allow_non_existing_paths: bool = False): def add_hash_of_file(fpath): with open(fpath, "rb") as f: for chunk in iter(lambda: f.read(2**20), b""): contents_hasher.update(chunk) path = os.path.expanduser(path) + if allow_non_existing_paths and not os.path.exists(path): + return if os.path.isdir(path): dirs = [] for dirpath, _, filenames in os.walk(path): @@ -146,15 +149,28 @@ def hash_runtime_conf(file_mounts, if conf_str not in _hash_cache or generate_file_mounts_contents_hash: for local_path in sorted(file_mounts.values()): add_content_hashes(local_path) - contents_hash = contents_hasher.hexdigest() + head_node_contents_hash = contents_hasher.hexdigest() # Generate a new runtime_hash if its not cached + # The runtime hash does not depend on the cluster_synced_files hash + # because we do not want to restart nodes only if cluster_synced_files + # contents have changed. if conf_str not in _hash_cache: runtime_hasher.update(conf_str) - runtime_hasher.update(contents_hash.encode("utf-8")) + runtime_hasher.update(head_node_contents_hash.encode("utf-8")) _hash_cache[conf_str] = runtime_hasher.hexdigest() - else: - contents_hash = None + # Add cluster_synced_files to the file_mounts_content hash + if cluster_synced_files is not None: + for local_path in sorted(cluster_synced_files): + # For cluster_synced_files, we let the path be non-existant + # because its possible that the source directory gets set up + # anytime over the life of the head node. + add_content_hashes(local_path, allow_non_existing_paths=True) - return (_hash_cache[conf_str], contents_hash) + file_mounts_contents_hash = contents_hasher.hexdigest() + + else: + file_mounts_contents_hash = None + + return (_hash_cache[conf_str], file_mounts_contents_hash) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index b6b6a40d3..6c115f58c 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -190,6 +190,7 @@ SMALL_CLUSTER = { "TestProp": 2, }, "file_mounts": {}, + "cluster_synced_files": [], "initialization_commands": ["init_cmd"], "setup_commands": ["setup_cmd"], "head_setup_commands": ["head_setup_cmd"],