From fe23f23680ea334b204f448bc9bafbf7a02cdaeb Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Fri, 11 Sep 2020 14:13:20 -0700 Subject: [PATCH] [tune/rllib] revert removal of queue-trials (#10744) --- python/ray/tune/ray_trial_executor.py | 10 +--------- python/ray/tune/tests/test_api.py | 24 +++++++++++------------- python/ray/tune/trial_executor.py | 24 ++++++++++++++++-------- python/ray/tune/tune.py | 16 +++++++++------- rllib/train.py | 6 +----- 5 files changed, 38 insertions(+), 42 deletions(-) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index dd30b6d78..4aaf19e03 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -12,7 +12,6 @@ import ray from ray.exceptions import GetTimeoutError from ray import ray_constants from ray.resource_spec import ResourceSpec -from ray.tune.cluster_info import is_ray_cluster from ray.tune.durable_trainable import DurableTrainable from ray.tune.error import AbortTrialExecution, TuneError from ray.tune.logger import NoopLogger @@ -136,17 +135,10 @@ class RayTrialExecutor(TrialExecutor): """An implementation of TrialExecutor based on Ray.""" def __init__(self, - queue_trials=None, + queue_trials=False, reuse_actors=False, ray_auto_init=None, refresh_period=RESOURCE_REFRESH_PERIOD): - if queue_trials is None: - if os.environ.get("TUNE_DISABLE_QUEUE_TRIALS") == "1": - logger.info("'TUNE_DISABLE_QUEUE_TRIALS=1' detected.") - queue_trials = False - elif is_ray_cluster(): - queue_trials = True - if ray_auto_init is None: if os.environ.get("TUNE_DISABLE_AUTO_INIT") == "1": logger.info("'TUNE_DISABLE_AUTO_INIT=1' detected.") diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py index 3dc3d9fb2..f362967ad 100644 --- a/python/ray/tune/tests/test_api.py +++ b/python/ray/tune/tests/test_api.py @@ -245,19 +245,17 @@ class TrainableFunctionApiTest(unittest.TestCase): register_trainable("B", B) def f(cpus, gpus, queue_trials): - if not queue_trials: - os.environ["TUNE_DISABLE_QUEUE_TRIALS"] = "1" - else: - os.environ.pop("TUNE_DISABLE_QUEUE_TRIALS", None) - return run_experiments({ - "foo": { - "run": "B", - "config": { - "cpu": cpus, - "gpu": gpus, - }, - } - })[0] + return run_experiments( + { + "foo": { + "run": "B", + "config": { + "cpu": cpus, + "gpu": gpus, + }, + } + }, + queue_trials=queue_trials)[0] # Should all succeed self.assertEqual(f(0, 0, False).status, Trial.TERMINATED) diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py index d8ef607e5..99bd62341 100644 --- a/python/ray/tune/trial_executor.py +++ b/python/ray/tune/trial_executor.py @@ -3,6 +3,7 @@ import logging from ray.tune.trial import Trial, Checkpoint from ray.tune.error import TuneError +from ray.tune.cluster_info import is_ray_cluster logger = logging.getLogger(__name__) @@ -162,15 +163,22 @@ class TrialExecutor: for trial in trial_runner.get_trials(): if trial.status == Trial.PENDING: if not self.has_resources(trial.resources): + resource_string = trial.resources.summary_string() + trial_resource_help_msg = trial.get_trainable_cls( + ).resource_help(trial.config) + autoscaling_msg = "" + if is_ray_cluster(): + autoscaling_msg = ( + "Pass `queue_trials=True` in ray.tune.run() or " + "on the command line to queue trials until the " + "cluster scales up or resources become available. " + ) raise TuneError( - ("Insufficient cluster resources to launch trial: " - "trial requested {} but the cluster has only {}. " - "This error should not occur if running on an " - "autoscaling cluster. {}").format( - trial.resources.summary_string(), - self.resource_string(), - trial.get_trainable_cls().resource_help( - trial.config))) + "Insufficient cluster resources to launch trial: " + f"trial requested {resource_string}, but the cluster " + f"has only {self.resource_string()}. " + f"{autoscaling_msg}" + f"{trial_resource_help_msg} ") elif trial.status == Trial.PAUSED: raise TuneError("There are paused trials, but no more pending " "trials with sufficient resources.") diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 075ba5c69..2e0a2a1e8 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -95,13 +95,13 @@ def run( restore=None, server_port=None, resume=False, + queue_trials=False, reuse_actors=False, trial_executor=None, raise_on_failed_trial=True, # Deprecated args ray_auto_init=None, run_errored_only=None, - queue_trials=None, global_checkpoint_period=None, with_server=None, upload_dir=None, @@ -246,6 +246,10 @@ def run( ERRORED trials upon resume - previous trial artifacts will be left untouched. If resume is set but checkpoint does not exist, ValueError will be thrown. + queue_trials (bool): Whether to queue trials when the cluster does + not currently have enough resources to launch one. This should + be set to True when running on an autoscaling cluster to enable + automatic scale-up. reuse_actors (bool): Whether to reuse actors between different trials when possible. This can drastically speed up experiments that start and stop actors often (e.g., PBT in time-multiplexing mode). This @@ -264,11 +268,6 @@ def run( if global_checkpoint_period: raise ValueError("global_checkpoint_period is deprecated. Set env var " "'TUNE_GLOBAL_CHECKPOINT_S' instead.") - if queue_trials: - raise ValueError( - "queue_trials is deprecated. " - "Set env var 'TUNE_DISABLE_QUEUE_TRIALS=1' instead to " - "disable queuing behavior.") if ray_auto_init: raise ValueError("ray_auto_init is deprecated. " "Set env var 'TUNE_DISABLE_AUTO_INIT=1' instead or " @@ -294,7 +293,7 @@ def run( set_sync_periods(sync_config) trial_executor = trial_executor or RayTrialExecutor( - reuse_actors=reuse_actors) + reuse_actors=reuse_actors, queue_trials=queue_trials) if isinstance(run_or_experiment, list): experiments = run_or_experiment else: @@ -443,6 +442,7 @@ def run_experiments(experiments, verbose=2, progress_reporter=None, resume=False, + queue_trials=False, reuse_actors=False, trial_executor=None, raise_on_failed_trial=True, @@ -472,6 +472,7 @@ def run_experiments(experiments, verbose=verbose, progress_reporter=progress_reporter, resume=resume, + queue_trials=queue_trials, reuse_actors=reuse_actors, trial_executor=trial_executor, raise_on_failed_trial=raise_on_failed_trial, @@ -485,6 +486,7 @@ def run_experiments(experiments, verbose=verbose, progress_reporter=progress_reporter, resume=resume, + queue_trials=queue_trials, reuse_actors=reuse_actors, trial_executor=trial_executor, raise_on_failed_trial=raise_on_failed_trial, diff --git a/rllib/train.py b/rllib/train.py index 2229e9de7..f16205943 100755 --- a/rllib/train.py +++ b/rllib/train.py @@ -205,15 +205,11 @@ def run(args, parser): num_gpus=args.ray_num_gpus, local_mode=args.local_mode) - if not args.queue_trials: - # TODO: this should be eventually removed as an arg - # because it is already autodetected on an autoscaling cluster. - os.environ["TUNE_DISABLE_QUEUE_TRIALS"] = "1" - run_experiments( experiments, scheduler=_make_scheduler(args), resume=args.resume, + queue_trials=args.queue_trials, verbose=verbose, concurrent=True)