Introduce file_mounts_sync_continuously cluster option (#9544)

* Separate out file_mounts contents hashing into its own separate hash

Add an option to continuously sync file_mounts from head node to worker nodes:
monitor.py will re-sync file mounts whenver contents change but will only run setup_commands if the config also changes

* add test and default value for file_mounts_sync_continuously

* format code

* Update comments

* Add param to skip setup commands when only file_mounts content changed during monitor.py's update tick

Fixed so setup commands run when ray up is run and file_mounts content changes

* Refactor so that runtime_hash retains previous behavior

runtime_hash is almost identical as before this PR. It is used to determine if setup_commands need to run
file_mounts_contents_hash is an additional hash of the file_mounts content that is used to detect when only file syncing has to occur.

Note: runtime_hash value will have changed from before the PR because we hash the hash of the contents of the file_mounts as a performance optimization

* fix issue with hashing a hash

* fix bug where trying to set contents hash when it wasn't generated

* Fix lint error

Fix bug in command_runner where check_output was no longer returning the output of the command

* clear out provider between tests to get rid of flakyness

* reduce chance of race condition from node_launcher launching a node in the middle of an autoscaler.update call
This commit is contained in:
Alan Guo
2020-07-28 00:02:08 -07:00
committed by GitHub
parent c290c308fe
commit 5831737287
13 changed files with 296 additions and 68 deletions
+33 -9
View File
@@ -13,6 +13,7 @@ from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
from ray.autoscaler.node_provider import get_node_provider
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS,
TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE,
STATUS_UP_TO_DATE, NODE_TYPE_WORKER)
from ray.autoscaler.updater import NodeUpdaterThread
@@ -142,6 +143,10 @@ class StandardAutoscaler:
self.last_update_time = now
nodes = self.workers()
# Check pending nodes immediately after fetching the number of running
# nodes to minimize chance number of pending nodes changing after
# additional nodes are launched.
num_pending = self.pending_launches.value
self.load_metrics.prune_active_ips(
[self.provider.internal_ip(node_id) for node_id in nodes])
target_workers = self.target_num_workers()
@@ -199,7 +204,6 @@ class StandardAutoscaler:
self.launch_new_node(count, instance_type=instance_type)
# Launch additional nodes of the default type, if still needed.
num_pending = self.pending_launches.value
num_workers = len(nodes) + num_pending
if num_workers < target_workers:
max_allowed = min(self.max_launch_batch,
@@ -253,19 +257,29 @@ class StandardAutoscaler:
self.recover_if_needed(node_id, now)
def reload_config(self, errors_fatal=False):
sync_continuously = False
if hasattr(self, "config"):
sync_continuously = self.config.get(
"file_mounts_sync_continuously", False)
try:
with open(self.config_path) as f:
new_config = yaml.safe_load(f.read())
validate_config(new_config)
new_launch_hash = hash_launch_conf(new_config["worker_nodes"],
new_config["auth"])
new_runtime_hash = hash_runtime_conf(new_config["file_mounts"], [
new_config["worker_setup_commands"],
new_config["worker_start_ray_commands"]
])
(new_runtime_hash,
new_file_mounts_contents_hash) = hash_runtime_conf(
new_config["file_mounts"],
[
new_config["worker_setup_commands"],
new_config["worker_start_ray_commands"],
],
generate_file_mounts_contents_hash=sync_continuously,
)
self.config = new_config
self.launch_hash = new_launch_hash
self.runtime_hash = new_runtime_hash
self.file_mounts_contents_hash = new_file_mounts_contents_hash
except Exception as e:
if errors_fatal:
raise e
@@ -312,11 +326,19 @@ class StandardAutoscaler:
return True
def files_up_to_date(self, node_id):
applied = self.provider.node_tags(node_id).get(TAG_RAY_RUNTIME_CONFIG)
if applied != self.runtime_hash:
node_tags = self.provider.node_tags(node_id)
applied_config_hash = node_tags.get(TAG_RAY_RUNTIME_CONFIG)
applied_file_mounts_contents_hash = node_tags.get(
TAG_RAY_FILE_MOUNTS_CONTENTS)
if (applied_config_hash != self.runtime_hash
or (self.file_mounts_contents_hash is not None
and self.file_mounts_contents_hash !=
applied_file_mounts_contents_hash)):
logger.info("StandardAutoscaler: "
"{}: Runtime state is {}, want {}".format(
node_id, applied, self.runtime_hash))
"{}: Runtime state is ({},{}), want ({},{})".format(
node_id, applied_config_hash,
applied_file_mounts_contents_hash,
self.runtime_hash, self.file_mounts_contents_hash))
return False
return True
@@ -345,6 +367,7 @@ class StandardAutoscaler:
ray_start_commands=with_head_node_ip(
self.config["worker_start_ray_commands"]),
runtime_hash=self.runtime_hash,
file_mounts_contents_hash=self.file_mounts_contents_hash,
process_runner=self.process_runner,
use_internal_ip=True,
docker_config=self.config.get("docker"))
@@ -385,6 +408,7 @@ class StandardAutoscaler:
setup_commands=with_head_node_ip(init_commands),
ray_start_commands=with_head_node_ip(ray_start_commands),
runtime_hash=self.runtime_hash,
file_mounts_contents_hash=self.file_mounts_contents_hash,
process_runner=self.process_runner,
use_internal_ip=True,
docker_config=self.config.get("docker"))
@@ -108,6 +108,9 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
@@ -102,6 +102,9 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
+2 -2
View File
@@ -403,9 +403,9 @@ class SSHCommandRunner(CommandRunnerInterface):
if cli_logger.verbosity > 0:
with cli_logger.indented():
start_process()
return start_process()
else:
start_process()
return start_process()
def run_rsync_up(self, source, target):
self._set_ssh_ip_if_required()
+6 -1
View File
@@ -373,6 +373,7 @@ def kill_node(config_file, yes, hard, override_cluster_name):
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
file_mounts_contents_hash="",
docker_config=config.get("docker"))
_exec(updater, "ray stop", False, False)
@@ -540,7 +541,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
# TODO(ekl) right now we always update the head node even if the
# hash matches.
# We could prompt the user for what they want to do here.
runtime_hash = hash_runtime_conf(config["file_mounts"], config)
(runtime_hash, file_mounts_contents_hash) = hash_runtime_conf(
config["file_mounts"], config)
cli_logger.old_info(
logger,
@@ -604,6 +606,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
setup_commands=init_commands,
ray_start_commands=ray_start_commands,
runtime_hash=runtime_hash,
file_mounts_contents_hash=file_mounts_contents_hash,
docker_config=config.get("docker"))
updater.start()
updater.join()
@@ -740,6 +743,7 @@ def exec_cluster(config_file: str,
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
file_mounts_contents_hash="",
docker_config=config.get("docker"))
is_docker = isinstance(updater.cmd_runner, DockerCommandRunner)
@@ -863,6 +867,7 @@ def rsync(config_file: str,
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
file_mounts_contents_hash="",
docker_config=config.get("docker"))
if down:
rsync = updater.rsync_down
@@ -109,6 +109,9 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
@@ -265,6 +265,9 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
@@ -57,6 +57,9 @@ file_mounts: {
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# Whether changes to mounted files in the head node should sync to the worker node continuously
file_mounts_sync_continuously: False
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
+4
View File
@@ -232,6 +232,10 @@
"type": "object",
"description": "Map of remote paths to local paths, e.g. {\"/tmp/data\": \"/my/local/data\"}"
},
"file_mounts_sync_continuously": {
"type": "boolean",
"description": "If enabled, file mounts will sync continously between the head node and the worker nodes. The nodes will not re-run setup commands if only the contents of the file mounts folders change."
},
"initialization_commands": {
"$ref": "#/definitions/commands",
"description": "List of commands that will be run before `setup_commands`. If docker is enabled, these commands will run outside the container and before docker is setup."
+2
View File
@@ -29,3 +29,5 @@ TAG_RAY_LAUNCH_CONFIG = "ray-launch-config"
# Hash of the node runtime config, used to determine if updates are needed
TAG_RAY_RUNTIME_CONFIG = "ray-runtime-config"
# Hash of the contents of the directories specified by the file_mounts config
TAG_RAY_FILE_MOUNTS_CONTENTS = "ray-file-mounts-contents"
+66 -45
View File
@@ -7,6 +7,7 @@ import time
from threading import Thread
from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \
TAG_RAY_FILE_MOUNTS_CONTENTS, \
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \
STATUS_SETTING_UP, STATUS_SYNCING_FILES
from ray.autoscaler.command_runner import NODE_START_WAIT_S, SSHOptions
@@ -34,6 +35,7 @@ class NodeUpdater:
setup_commands,
ray_start_commands,
runtime_hash,
file_mounts_contents_hash,
process_runner=subprocess,
use_internal_ip=False,
docker_config=None):
@@ -57,6 +59,7 @@ class NodeUpdater:
self.setup_commands = setup_commands
self.ray_start_commands = ray_start_commands
self.runtime_hash = runtime_hash
self.file_mounts_contents_hash = file_mounts_contents_hash
self.auth_config = auth_config
def run(self):
@@ -97,11 +100,15 @@ class NodeUpdater:
return
raise
self.provider.set_node_tags(
self.node_id, {
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_RUNTIME_CONFIG: self.runtime_hash
})
tags_to_set = {
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_RUNTIME_CONFIG: self.runtime_hash,
}
if self.file_mounts_contents_hash is not None:
tags_to_set[
TAG_RAY_FILE_MOUNTS_CONTENTS] = self.file_mounts_contents_hash
self.provider.set_node_tags(self.node_id, tags_to_set)
cli_logger.labeled_value("New status", STATUS_UP_TO_DATE)
self.exitcode = 0
@@ -182,7 +189,13 @@ class NodeUpdater:
node_tags = self.provider.node_tags(self.node_id)
logger.debug("Node tags: {}".format(str(node_tags)))
if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash:
# runtime_hash will only change whenever the user restarts
# or updates their cluster with `get_or_create_head_node`
if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash and (
self.file_mounts_contents_hash is None
or node_tags.get(TAG_RAY_FILE_MOUNTS_CONTENTS) ==
self.file_mounts_contents_hash):
# todo: we lie in the confirmation message since
# full setup might be cancelled here
cli_logger.print(
@@ -191,6 +204,7 @@ class NodeUpdater:
cli_logger.old_info(logger,
"{}{} already up-to-date, skip to ray start",
self.log_prefix, self.node_id)
else:
cli_logger.print(
"Updating cluster configuration.",
@@ -201,51 +215,58 @@ class NodeUpdater:
cli_logger.labeled_value("New status", STATUS_SYNCING_FILES)
self.sync_file_mounts(self.rsync_up)
# Run init commands
self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SETTING_UP})
cli_logger.labeled_value("New status", STATUS_SETTING_UP)
# Only run setup commands if runtime_hash has changed because
# we don't want to run setup_commands every time the head node
# file_mounts folders have changed.
if node_tags.get(TAG_RAY_RUNTIME_CONFIG) != self.runtime_hash:
# Run init commands
self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SETTING_UP})
cli_logger.labeled_value("New status", STATUS_SETTING_UP)
if self.initialization_commands:
with cli_logger.group(
"Running initialization commands",
_numbered=("[]", 3, 5)): # todo: fix command numbering
with LogTimer(
self.log_prefix + "Initialization commands",
show_status=True):
if self.initialization_commands:
with cli_logger.group(
"Running initialization commands",
_numbered=("[]", 3,
5)): # todo: fix command numbering
with LogTimer(
self.log_prefix + "Initialization commands",
show_status=True):
for cmd in self.initialization_commands:
self.cmd_runner.run(
cmd,
ssh_options_override=SSHOptions(
self.auth_config.get("ssh_private_key")))
else:
cli_logger.print(
"No initialization commands to run.",
_numbered=("[]", 3, 5))
for cmd in self.initialization_commands:
self.cmd_runner.run(
cmd,
ssh_options_override=SSHOptions(
self.auth_config.get(
"ssh_private_key")))
else:
cli_logger.print(
"No initialization commands to run.",
_numbered=("[]", 3, 5))
if self.setup_commands:
with cli_logger.group(
"Running setup commands",
_numbered=("[]", 4, 5)): # todo: fix command numbering
with LogTimer(
self.log_prefix + "Setup commands",
show_status=True):
if self.setup_commands:
with cli_logger.group(
"Running setup commands",
_numbered=("[]", 4,
5)): # todo: fix command numbering
with LogTimer(
self.log_prefix + "Setup commands",
show_status=True):
total = len(self.setup_commands)
for i, cmd in enumerate(self.setup_commands):
if cli_logger.verbosity == 0:
cmd_to_print = cf.bold(cmd[:30]) + "..."
else:
cmd_to_print = cf.bold(cmd)
total = len(self.setup_commands)
for i, cmd in enumerate(self.setup_commands):
if cli_logger.verbosity == 0:
cmd_to_print = cf.bold(cmd[:30]) + "..."
else:
cmd_to_print = cf.bold(cmd)
cli_logger.print(
cmd_to_print, _numbered=("()", i, total))
cli_logger.print(
cmd_to_print, _numbered=("()", i, total))
self.cmd_runner.run(cmd)
else:
cli_logger.print(
"No setup commands to run.", _numbered=("[]", 4, 5))
self.cmd_runner.run(cmd)
else:
cli_logger.print(
"No setup commands to run.", _numbered=("[]", 4, 5))
with cli_logger.group(
"Starting the Ray runtime", _numbered=("[]", 5, 5)):
+32 -11
View File
@@ -102,14 +102,27 @@ def hash_launch_conf(node_conf, auth):
_hash_cache = {}
def hash_runtime_conf(file_mounts, extra_objs):
hasher = hashlib.sha1()
def hash_runtime_conf(file_mounts,
extra_objs,
generate_file_mounts_contents_hash=False):
"""Returns two hashes, a runtime hash and file_mounts_content hash.
The runtime hash is used to determine if the configuration or file_mounts
contents have changed. It is used at launch time (ray up) to determine if
a restart is needed.
The file_mounts_content hash is used to determine if the file_mounts
contents have changed. It is used at monitor time to determine if
additional file syncing is needed.
"""
runtime_hasher = hashlib.sha1()
contents_hasher = hashlib.sha1()
def add_content_hashes(path):
def add_hash_of_file(fpath):
with open(fpath, "rb") as f:
for chunk in iter(lambda: f.read(2**20), b""):
hasher.update(chunk)
contents_hasher.update(chunk)
path = os.path.expanduser(path)
if os.path.isdir(path):
@@ -117,9 +130,9 @@ def hash_runtime_conf(file_mounts, extra_objs):
for dirpath, _, filenames in os.walk(path):
dirs.append((dirpath, sorted(filenames)))
for dirpath, filenames in sorted(dirs):
hasher.update(dirpath.encode("utf-8"))
contents_hasher.update(dirpath.encode("utf-8"))
for name in filenames:
hasher.update(name.encode("utf-8"))
contents_hasher.update(name.encode("utf-8"))
fpath = os.path.join(dirpath, name)
add_hash_of_file(fpath)
else:
@@ -128,12 +141,20 @@ def hash_runtime_conf(file_mounts, extra_objs):
conf_str = (json.dumps(file_mounts, sort_keys=True).encode("utf-8") +
json.dumps(extra_objs, sort_keys=True).encode("utf-8"))
# Important: only hash the files once. Otherwise, we can end up restarting
# workers if the files were changed and we re-hashed them.
if conf_str not in _hash_cache:
hasher.update(conf_str)
# Only generate a contents hash if generate_contents_hash is true or
# if we need to generate the runtime_hash
if conf_str not in _hash_cache or generate_file_mounts_contents_hash:
for local_path in sorted(file_mounts.values()):
add_content_hashes(local_path)
_hash_cache[conf_str] = hasher.hexdigest()
contents_hash = contents_hasher.hexdigest()
return _hash_cache[conf_str]
# Generate a new runtime_hash if its not cached
if conf_str not in _hash_cache:
runtime_hasher.update(conf_str)
runtime_hasher.update(contents_hash.encode("utf-8"))
_hash_cache[conf_str] = runtime_hasher.hexdigest()
else:
contents_hash = None
return (_hash_cache[conf_str], contents_hash)
+136
View File
@@ -47,6 +47,10 @@ class MockProcessRunner:
raise Exception("Failing command on purpose")
self.calls.append(cmd)
def check_output(self, cmd):
self.check_call(cmd)
return "command-output".encode()
def assert_has_call(self, ip, pattern):
out = ""
for cmd in self.calls:
@@ -285,6 +289,7 @@ class AutoscalingTest(unittest.TestCase):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
self.provider = None
del NODE_PROVIDERS["mock"]
shutil.rmtree(self.tmpdir)
ray.shutdown()
@@ -1083,6 +1088,137 @@ class AutoscalingTest(unittest.TestCase):
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call("172.0.0.{}".format(i), "start_ray_worker")
def testContinuousFileMounts(self):
file_mount_dir = tempfile.mkdtemp()
self.provider = MockProvider()
config = SMALL_CLUSTER.copy()
config["file_mounts"] = {"/home/test-folder": file_mount_dir}
config["file_mounts_sync_continuously"] = True
config["min_workers"] = 2
config["max_workers"] = 2
config_path = self.write_config(config)
runner = MockProcessRunner()
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
autoscaler.update()
self.waitForNodes(
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
autoscaler.update()
for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
runner.clear_history()
with open(os.path.join(file_mount_dir, "test.txt"), "wb") as temp_file:
temp_file.write("hello".encode())
autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
autoscaler.update()
self.waitForNodes(
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
autoscaler.update()
for i in [0, 1]:
runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
def testFileMountsNonContinuous(self):
file_mount_dir = tempfile.mkdtemp()
self.provider = MockProvider()
config = SMALL_CLUSTER.copy()
config["file_mounts"] = {"/home/test-folder": file_mount_dir}
config["min_workers"] = 2
config["max_workers"] = 2
config_path = self.write_config(config)
runner = MockProcessRunner()
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
autoscaler.update()
self.waitForNodes(
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
autoscaler.update()
for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
runner.clear_history()
with open(os.path.join(file_mount_dir, "test.txt"), "wb") as temp_file:
temp_file.write("hello".encode())
autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
self.waitForNodes(
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
for i in [0, 1]:
runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_not_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
# Simulate a second `ray up` call
from ray.autoscaler import util
util._hash_cache = {}
runner = MockProcessRunner()
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(2)
self.provider.finish_starting_nodes()
self.waitForNodes(
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
autoscaler.update()
for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
if __name__ == "__main__":
import sys