[autoscaler] Create worker_file_mounts config (#9762)

This commit is contained in:
Alan Guo
2020-07-31 14:33:27 -07:00
committed by SangBin Cho
parent 0a6847c5dc
commit f8f6f342f6
12 changed files with 126 additions and 43 deletions
+2
View File
@@ -270,6 +270,7 @@ class StandardAutoscaler:
(new_runtime_hash,
new_file_mounts_contents_hash) = hash_runtime_conf(
new_config["file_mounts"],
new_config["cluster_synced_files"],
[
new_config["worker_setup_commands"],
new_config["worker_start_ray_commands"],
@@ -409,6 +410,7 @@ class StandardAutoscaler:
ray_start_commands=with_head_node_ip(ray_start_commands),
runtime_hash=self.runtime_hash,
file_mounts_contents_hash=self.file_mounts_contents_hash,
cluster_synced_files=self.config["cluster_synced_files"],
process_runner=self.process_runner,
use_internal_ip=True,
docker_config=self.config.get("docker"))
+8 -1
View File
@@ -108,7 +108,14 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
+11 -4
View File
@@ -55,7 +55,7 @@ provider:
location: westus2
resource_group: ray-cluster
# set subscription id otherwise the default from az cli will be used
# subscription_id: 00000000-0000-0000-0000-000000000000
# subscription_id: 00000000-0000-0000-0000-000000000000
# How Ray will authenticate with newly launched nodes.
auth:
@@ -67,7 +67,7 @@ auth:
# More specific customization to node configurations can be made using the ARM template azure-vm-template.json file
# See documentation here: https://docs.microsoft.com/en-us/azure/templates/microsoft.compute/2019-03-01/virtualmachines
# Changes to the local file will be used during deployment of the head node, however worker nodes deployment occurs
# Changes to the local file will be used during deployment of the head node, however worker nodes deployment occurs
# on the head node, so changes to the template must be included in the wheel file used in setup_commands section below
# Provider-specific config for the head node, e.g. instance type.
@@ -89,7 +89,7 @@ worker_nodes:
imageOffer: ubuntu-1804
imageSku: 1804-gen2
imageVersion: 20.02.01
# optionally set priority to use Spot instances
# optionally set priority to use Spot instances
priority: Spot
# set a maximum price for spot instances if desired
# billingProfile:
@@ -102,7 +102,14 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
+3 -1
View File
@@ -541,8 +541,10 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
# TODO(ekl) right now we always update the head node even if the
# hash matches.
# We could prompt the user for what they want to do here.
# No need to pass in cluster_sync_files because we use this
# hash to set up the head node
(runtime_hash, file_mounts_contents_hash) = hash_runtime_conf(
config["file_mounts"], config)
config["file_mounts"], None, config)
cli_logger.old_info(
logger,
+8 -1
View File
@@ -109,7 +109,14 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
@@ -265,7 +265,14 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
@@ -57,7 +57,14 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
+4
View File
@@ -232,6 +232,10 @@
"type": "object",
"description": "Map of remote paths to local paths, e.g. {\"/tmp/data\": \"/my/local/data\"}"
},
"cluster_synced_files": {
"type": "array",
"description": "List of paths on the head node which should sync to the worker nodes, e.g. [\"/some/data/somehwere\"]"
},
"file_mounts_sync_continuously": {
"type": "boolean",
"description": "If enabled, file mounts will sync continously between the head node and the worker nodes. The nodes will not re-run setup commands if only the contents of the file mounts folders change."
+2
View File
@@ -30,4 +30,6 @@ TAG_RAY_LAUNCH_CONFIG = "ray-launch-config"
# Hash of the node runtime config, used to determine if updates are needed
TAG_RAY_RUNTIME_CONFIG = "ray-runtime-config"
# Hash of the contents of the directories specified by the file_mounts config
# if the node is a worker, this also hashes content of the directories
# specified by the cluster_synced_files config
TAG_RAY_FILE_MOUNTS_CONTENTS = "ray-file-mounts-contents"
+46 -25
View File
@@ -36,6 +36,7 @@ class NodeUpdater:
ray_start_commands,
runtime_hash,
file_mounts_contents_hash,
cluster_synced_files=None,
process_runner=subprocess,
use_internal_ip=False,
docker_config=None):
@@ -60,6 +61,7 @@ class NodeUpdater:
self.ray_start_commands = ray_start_commands
self.runtime_hash = runtime_hash
self.file_mounts_contents_hash = file_mounts_contents_hash
self.cluster_synced_files = cluster_synced_files
self.auth_config = auth_config
def run(self):
@@ -120,31 +122,50 @@ class NodeUpdater:
"~/ray_bootstrap_key.pem", "~/ray_bootstrap_config.yaml"
]
def do_sync(remote_path, local_path, allow_non_existing_paths=False):
if allow_non_existing_paths and not os.path.exists(local_path):
# Ignore missing source files. In the future we should support
# the --delete-missing-args command to delete files that have
# been removed
return
assert os.path.exists(local_path), local_path
if os.path.isdir(local_path):
if not local_path.endswith("/"):
local_path += "/"
if not remote_path.endswith("/"):
remote_path += "/"
with LogTimer(self.log_prefix +
"Synced {} to {}".format(local_path, remote_path)):
self.cmd_runner.run("mkdir -p {}".format(
os.path.dirname(remote_path)))
sync_cmd(local_path, remote_path)
if remote_path not in nolog_paths:
# todo: timed here?
cli_logger.print("{} from {}", cf.bold(remote_path),
cf.bold(local_path))
# Rsync file mounts
with cli_logger.group(
"Processing file mounts", _numbered=("[]", 2, 5)):
"Processing file mounts", _numbered=("[]", 2, 6)):
for remote_path, local_path in self.file_mounts.items():
assert os.path.exists(local_path), local_path
if os.path.isdir(local_path):
if not local_path.endswith("/"):
local_path += "/"
if not remote_path.endswith("/"):
remote_path += "/"
do_sync(remote_path, local_path)
with LogTimer(self.log_prefix + "Synced {} to {}".format(
local_path, remote_path)):
self.cmd_runner.run("mkdir -p {}".format(
os.path.dirname(remote_path)))
sync_cmd(local_path, remote_path)
if remote_path not in nolog_paths:
# todo: timed here?
cli_logger.print("{} from {}", cf.bold(remote_path),
cf.bold(local_path))
if self.cluster_synced_files:
with cli_logger.group(
"Processing worker file mounts", _numbered=("[]", 3, 6)):
for path in self.cluster_synced_files:
do_sync(path, path, allow_non_existing_paths=True)
else:
cli_logger.print(
"No worker file mounts to sync", _numbered=("[]", 3, 6))
def wait_ready(self, deadline):
with cli_logger.group(
"Waiting for SSH to become available", _numbered=("[]", 1, 5)):
"Waiting for SSH to become available", _numbered=("[]", 1, 6)):
with LogTimer(self.log_prefix + "Got remote shell"):
cli_logger.old_info(logger, "{}Waiting for remote shell...",
self.log_prefix)
@@ -227,8 +248,8 @@ class NodeUpdater:
if self.initialization_commands:
with cli_logger.group(
"Running initialization commands",
_numbered=("[]", 3,
5)): # todo: fix command numbering
_numbered=("[]", 4,
6)): # todo: fix command numbering
with LogTimer(
self.log_prefix + "Initialization commands",
show_status=True):
@@ -242,13 +263,13 @@ class NodeUpdater:
else:
cli_logger.print(
"No initialization commands to run.",
_numbered=("[]", 3, 5))
_numbered=("[]", 4, 6))
if self.setup_commands:
with cli_logger.group(
"Running setup commands",
_numbered=("[]", 4,
5)): # todo: fix command numbering
_numbered=("[]", 5,
6)): # todo: fix command numbering
with LogTimer(
self.log_prefix + "Setup commands",
show_status=True):
@@ -266,10 +287,10 @@ class NodeUpdater:
self.cmd_runner.run(cmd)
else:
cli_logger.print(
"No setup commands to run.", _numbered=("[]", 4, 5))
"No setup commands to run.", _numbered=("[]", 5, 6))
with cli_logger.group(
"Starting the Ray runtime", _numbered=("[]", 5, 5)):
"Starting the Ray runtime", _numbered=("[]", 6, 6)):
with LogTimer(
self.log_prefix + "Ray start commands", show_status=True):
for cmd in self.ray_start_commands:
+25 -9
View File
@@ -103,6 +103,7 @@ _hash_cache = {}
def hash_runtime_conf(file_mounts,
cluster_synced_files,
extra_objs,
generate_file_mounts_contents_hash=False):
"""Returns two hashes, a runtime hash and file_mounts_content hash.
@@ -111,20 +112,22 @@ def hash_runtime_conf(file_mounts,
contents have changed. It is used at launch time (ray up) to determine if
a restart is needed.
The file_mounts_content hash is used to determine if the file_mounts
contents have changed. It is used at monitor time to determine if
additional file syncing is needed.
The file_mounts_content hash is used to determine if the file_mounts or
cluster_synced_files contents have changed. It is used at monitor time to
determine if additional file syncing is needed.
"""
runtime_hasher = hashlib.sha1()
contents_hasher = hashlib.sha1()
def add_content_hashes(path):
def add_content_hashes(path, allow_non_existing_paths: bool = False):
def add_hash_of_file(fpath):
with open(fpath, "rb") as f:
for chunk in iter(lambda: f.read(2**20), b""):
contents_hasher.update(chunk)
path = os.path.expanduser(path)
if allow_non_existing_paths and not os.path.exists(path):
return
if os.path.isdir(path):
dirs = []
for dirpath, _, filenames in os.walk(path):
@@ -146,15 +149,28 @@ def hash_runtime_conf(file_mounts,
if conf_str not in _hash_cache or generate_file_mounts_contents_hash:
for local_path in sorted(file_mounts.values()):
add_content_hashes(local_path)
contents_hash = contents_hasher.hexdigest()
head_node_contents_hash = contents_hasher.hexdigest()
# Generate a new runtime_hash if its not cached
# The runtime hash does not depend on the cluster_synced_files hash
# because we do not want to restart nodes only if cluster_synced_files
# contents have changed.
if conf_str not in _hash_cache:
runtime_hasher.update(conf_str)
runtime_hasher.update(contents_hash.encode("utf-8"))
runtime_hasher.update(head_node_contents_hash.encode("utf-8"))
_hash_cache[conf_str] = runtime_hasher.hexdigest()
else:
contents_hash = None
# Add cluster_synced_files to the file_mounts_content hash
if cluster_synced_files is not None:
for local_path in sorted(cluster_synced_files):
# For cluster_synced_files, we let the path be non-existant
# because its possible that the source directory gets set up
# anytime over the life of the head node.
add_content_hashes(local_path, allow_non_existing_paths=True)
return (_hash_cache[conf_str], contents_hash)
file_mounts_contents_hash = contents_hasher.hexdigest()
else:
file_mounts_contents_hash = None
return (_hash_cache[conf_str], file_mounts_contents_hash)
+1
View File
@@ -190,6 +190,7 @@ SMALL_CLUSTER = {
"TestProp": 2,
},
"file_mounts": {},
"cluster_synced_files": [],
"initialization_commands": ["init_cmd"],
"setup_commands": ["setup_cmd"],
"head_setup_commands": ["head_setup_cmd"],