mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:22:39 +08:00
[autoscaler] Allow users to disable the cluster config cache (#8117)
* [autoscaler] Remove autoscaler config cache.
* [autoscaler] Add flag allowing users to explicitly disable the config cache.
This commit is contained in:
@@ -86,7 +86,7 @@ def request_resources(num_cpus=None, bundles=None):
|
||||
|
||||
def create_or_update_cluster(config_file, override_min_workers,
|
||||
override_max_workers, no_restart, restart_only,
|
||||
yes, override_cluster_name):
|
||||
yes, override_cluster_name, no_config_cache):
|
||||
"""Create or updates an autoscaling Ray cluster from a config json."""
|
||||
config = yaml.safe_load(open(config_file).read())
|
||||
if override_min_workers is not None:
|
||||
@@ -95,19 +95,19 @@ def create_or_update_cluster(config_file, override_min_workers,
|
||||
config["max_workers"] = override_max_workers
|
||||
if override_cluster_name is not None:
|
||||
config["cluster_name"] = override_cluster_name
|
||||
config = _bootstrap_config(config)
|
||||
config = _bootstrap_config(config, no_config_cache)
|
||||
get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
|
||||
override_cluster_name)
|
||||
|
||||
|
||||
def _bootstrap_config(config):
|
||||
def _bootstrap_config(config, no_config_cache=False):
|
||||
config = prepare_config(config)
|
||||
|
||||
hasher = hashlib.sha1()
|
||||
hasher.update(json.dumps([config], sort_keys=True).encode("utf-8"))
|
||||
cache_key = os.path.join(tempfile.gettempdir(),
|
||||
"ray-config-{}".format(hasher.hexdigest()))
|
||||
if os.path.exists(cache_key):
|
||||
if os.path.exists(cache_key) and not no_config_cache:
|
||||
logger.info("Using cached config at {}".format(cache_key))
|
||||
return json.loads(open(cache_key).read())
|
||||
validate_config(config)
|
||||
@@ -119,8 +119,9 @@ def _bootstrap_config(config):
|
||||
|
||||
bootstrap_config, _ = importer()
|
||||
resolved_config = bootstrap_config(config)
|
||||
with open(cache_key, "w") as f:
|
||||
f.write(json.dumps(resolved_config))
|
||||
if not no_config_cache:
|
||||
with open(cache_key, "w") as f:
|
||||
f.write(json.dumps(resolved_config))
|
||||
return resolved_config
|
||||
|
||||
|
||||
|
||||
@@ -168,7 +168,7 @@ class SessionRunner:
|
||||
raise click.ClickException(
|
||||
"Docker support in session is currently not implemented.")
|
||||
|
||||
def create_cluster(self):
|
||||
def create_cluster(self, no_config_cache):
|
||||
"""Create a cluster that will run the session."""
|
||||
create_or_update_cluster(
|
||||
config_file=self.project_definition.cluster_yaml(),
|
||||
@@ -178,6 +178,7 @@ class SessionRunner:
|
||||
restart_only=False,
|
||||
yes=True,
|
||||
override_cluster_name=self.session_name,
|
||||
no_config_cache=no_config_cache,
|
||||
)
|
||||
|
||||
def sync_files(self):
|
||||
@@ -351,7 +352,12 @@ def stop(name):
|
||||
"the command in the project config"),
|
||||
is_flag=True)
|
||||
@click.option("--name", help="A name to tag the session with.", default=None)
|
||||
def session_start(command, args, shell, name):
|
||||
@click.option(
|
||||
"--no-config-cache",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Disable the local cluster config cache.")
|
||||
def session_start(command, args, shell, name, no_config_cache):
|
||||
project_definition = load_project_or_throw()
|
||||
|
||||
if not name:
|
||||
@@ -375,7 +381,7 @@ def session_start(command, args, shell, name):
|
||||
for run in session_runs:
|
||||
runner = SessionRunner(session_name=run["name"])
|
||||
logger.info("[1/{}] Creating cluster".format(run["num_steps"]))
|
||||
runner.create_cluster()
|
||||
runner.create_cluster(no_config_cache)
|
||||
logger.info("[2/{}] Syncing the project".format(run["num_steps"]))
|
||||
runner.sync_files()
|
||||
logger.info("[3/{}] Setting up environment".format(run["num_steps"]))
|
||||
|
||||
@@ -634,6 +634,11 @@ def stop(force, verbose):
|
||||
default=False,
|
||||
help=("Whether to skip running setup commands and only restart Ray. "
|
||||
"This cannot be used with 'no-restart'."))
|
||||
@click.option(
|
||||
"--no-config-cache",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Disable the local cluster config cache.")
|
||||
@click.option(
|
||||
"--min-workers",
|
||||
required=False,
|
||||
@@ -657,7 +662,7 @@ def stop(force, verbose):
|
||||
default=False,
|
||||
help="Don't ask for confirmation.")
|
||||
def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
|
||||
yes, cluster_name):
|
||||
yes, cluster_name, no_config_cache):
|
||||
"""Create or update a Ray cluster."""
|
||||
if restart_only or no_restart:
|
||||
assert restart_only != no_restart, "Cannot set both 'restart_only' " \
|
||||
@@ -673,7 +678,8 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
|
||||
except urllib.error.HTTPError as e:
|
||||
logger.info("Error downloading file: ", e)
|
||||
create_or_update_cluster(cluster_config_file, min_workers, max_workers,
|
||||
no_restart, restart_only, yes, cluster_name)
|
||||
no_restart, restart_only, yes, cluster_name,
|
||||
no_config_cache)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@@ -889,7 +895,7 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
|
||||
|
||||
if start:
|
||||
create_or_update_cluster(cluster_config_file, None, None, False, False,
|
||||
True, cluster_name)
|
||||
True, cluster_name, False)
|
||||
target = os.path.basename(script)
|
||||
target = os.path.join("~", target)
|
||||
rsync(cluster_config_file, script, target, cluster_name, down=False)
|
||||
|
||||
Reference in New Issue
Block a user