From 776b071f3bd7d83d70319709a5c595f7e3f01bd6 Mon Sep 17 00:00:00 2001 From: Adi Zimmerman Date: Mon, 11 Nov 2019 09:38:14 -0800 Subject: [PATCH] [tune] Let Search Algorithms use early stopped trials (#5651) --- python/ray/tune/suggest/ax.py | 32 ++++++++++++++-------- python/ray/tune/suggest/bayesopt.py | 16 +++++++---- python/ray/tune/suggest/hyperopt.py | 25 ++++++++++++----- python/ray/tune/suggest/nevergrad.py | 17 +++++++++--- python/ray/tune/suggest/sigopt.py | 5 +++- python/ray/tune/suggest/skopt.py | 17 +++++++++--- python/ray/tune/suggest/suggestion.py | 16 +++++++++-- python/ray/tune/tests/test_trial_runner.py | 24 ++++++++++++++++ python/ray/tune/trial_runner.py | 4 ++- 9 files changed, 120 insertions(+), 36 deletions(-) diff --git a/python/ray/tune/suggest/ax.py b/python/ray/tune/suggest/ax.py index 75b982d67..d917879aa 100644 --- a/python/ray/tune/suggest/ax.py +++ b/python/ray/tune/suggest/ax.py @@ -40,6 +40,8 @@ class AxSearch(SuggestionAlgorithm): "x3 >= x4" or "x3 + x4 >= 2". outcome_constraints (list[str]): Outcome constraints of form "metric_name >= bound", like "m1 <= 3." + use_early_stopped_trials (bool): Whether to use early terminated + trial results in the optimization process. Example: @@ -81,22 +83,28 @@ class AxSearch(SuggestionAlgorithm): result=None, error=False, early_terminated=False): - """Pass data back to Ax. + """Notification for the completion of trial. Data of form key value dictionary of metric names and values. """ - ax_trial_index = self._live_index_mapping.pop(trial_id) if result: - metric_dict = { - self._objective_name: (result[self._objective_name], 0.0) - } - outcome_names = [ - oc.metric.name for oc in - self._ax.experiment.optimization_config.outcome_constraints - ] - metric_dict.update({on: (result[on], 0.0) for on in outcome_names}) - self._ax.complete_trial( - trial_index=ax_trial_index, raw_data=metric_dict) + self._process_result(trial_id, result, early_terminated) + self._live_index_mapping.pop(trial_id) + + def _process_result(self, trial_id, result, early_terminated=False): + if early_terminated and self._use_early_stopped is False: + return + ax_trial_index = self._live_index_mapping[trial_id] + metric_dict = { + self._objective_name: (result[self._objective_name], 0.0) + } + outcome_names = [ + oc.metric.name for oc in + self._ax.experiment.optimization_config.outcome_constraints + ] + metric_dict.update({on: (result[on], 0.0) for on in outcome_names}) + self._ax.complete_trial( + trial_index=ax_trial_index, raw_data=metric_dict) def _num_live_trials(self): return len(self._live_index_mapping) diff --git a/python/ray/tune/suggest/bayesopt.py b/python/ray/tune/suggest/bayesopt.py index 65e220354..0bb35d707 100644 --- a/python/ray/tune/suggest/bayesopt.py +++ b/python/ray/tune/suggest/bayesopt.py @@ -33,6 +33,8 @@ class BayesOptSearch(SuggestionAlgorithm): provide values for the keys `kind`, `kappa`, and `xi`. random_state (int): Used to initialize BayesOpt. verbose (int): Sets verbosity level for BayesOpt packages. + use_early_stopped_trials (bool): Whether to use early terminated + trial results in the optimization process. Example: >>> space = { @@ -102,14 +104,18 @@ class BayesOptSearch(SuggestionAlgorithm): result=None, error=False, early_terminated=False): - """Passes the result to BayesOpt unless early terminated or errored""" + """Notification for the completion of trial.""" if result: - self.optimizer.register( - params=self._live_trial_mapping[trial_id], - target=self._metric_op * result[self._metric]) - + self._process_result(trial_id, result, early_terminated) del self._live_trial_mapping[trial_id] + def _process_result(self, trial_id, result, early_terminated=False): + if early_terminated and self._use_early_stopped is False: + return + self.optimizer.register( + params=self._live_trial_mapping[trial_id], + target=self._metric_op * result[self._metric]) + def _num_live_trials(self): return len(self._live_trial_mapping) diff --git a/python/ray/tune/suggest/hyperopt.py b/python/ray/tune/suggest/hyperopt.py index 0470f4751..bb6614fd3 100644 --- a/python/ray/tune/suggest/hyperopt.py +++ b/python/ray/tune/suggest/hyperopt.py @@ -53,6 +53,8 @@ class HyperOptSearch(SuggestionAlgorithm): results. Defaults to None. gamma (float in range (0,1)): parameter governing the tree parzen estimators suggestion algorithm. Defaults to 0.25. + use_early_stopped_trials (bool): Whether to use early terminated + trial results in the optimization process. Example: >>> space = { @@ -171,7 +173,7 @@ class HyperOptSearch(SuggestionAlgorithm): result=None, error=False, early_terminated=False): - """Passes the result to HyperOpt unless early terminated or errored. + """Notification for the completion of trial. The result is internally negated when interacting with HyperOpt so that HyperOpt can "maximize" this value, as it minimizes on default. @@ -183,15 +185,24 @@ class HyperOptSearch(SuggestionAlgorithm): if error: ho_trial["state"] = hpo.base.JOB_STATE_ERROR ho_trial["misc"]["error"] = (str(TuneError), "Tune Error") - elif early_terminated: + self._hpopt_trials.refresh() + else: + self._process_result(trial_id, result, early_terminated) + del self._live_trial_mapping[trial_id] + + def _process_result(self, trial_id, result, early_terminated=False): + ho_trial = self._get_hyperopt_trial(trial_id) + ho_trial["refresh_time"] = hpo.utils.coarse_utcnow() + + if early_terminated and self._use_early_stopped is False: ho_trial["state"] = hpo.base.JOB_STATE_ERROR ho_trial["misc"]["error"] = (str(TuneError), "Tune Removed") - else: - ho_trial["state"] = hpo.base.JOB_STATE_DONE - hp_result = self._to_hyperopt_result(result) - ho_trial["result"] = hp_result + return + + ho_trial["state"] = hpo.base.JOB_STATE_DONE + hp_result = self._to_hyperopt_result(result) + ho_trial["result"] = hp_result self._hpopt_trials.refresh() - del self._live_trial_mapping[trial_id] def _to_hyperopt_result(self, result): return {"loss": self._metric_op * result[self._metric], "status": "ok"} diff --git a/python/ray/tune/suggest/nevergrad.py b/python/ray/tune/suggest/nevergrad.py index 8ce3e9ba5..1685774b9 100644 --- a/python/ray/tune/suggest/nevergrad.py +++ b/python/ray/tune/suggest/nevergrad.py @@ -35,6 +35,8 @@ class NevergradSearch(SuggestionAlgorithm): metric (str): The training result objective value attribute. mode (str): One of {min, max}. Determines whether objective is minimizing or maximizing the metric attribute. + use_early_stopped_trials (bool): Whether to use early terminated + trial results in the optimization process. Example: >>> from nevergrad.optimization import optimizerlib @@ -133,16 +135,23 @@ class NevergradSearch(SuggestionAlgorithm): result=None, error=False, early_terminated=False): - """Passes the result to Nevergrad unless early terminated or errored. + """Notification for the completion of trial. The result is internally negated when interacting with Nevergrad so that Nevergrad Optimizers can "maximize" this value, as it minimizes on default. """ - ng_trial_info = self._live_trial_mapping.pop(trial_id) if result: - self._nevergrad_opt.tell(ng_trial_info, - self._metric_op * result[self._metric]) + self._process_result(trial_id, result, early_terminated) + + self._live_trial_mapping.pop(trial_id) + + def _process_result(self, trial_id, result, early_terminated=False): + if early_terminated and self._use_early_stopped is False: + return + ng_trial_info = self._live_trial_mapping[trial_id] + self._nevergrad_opt.tell(ng_trial_info, + self._metric_op * result[self._metric]) def _num_live_trials(self): return len(self._live_trial_mapping) diff --git a/python/ray/tune/suggest/sigopt.py b/python/ray/tune/suggest/sigopt.py index 030424e02..b88ab0b6b 100644 --- a/python/ray/tune/suggest/sigopt.py +++ b/python/ray/tune/suggest/sigopt.py @@ -78,6 +78,9 @@ class SigOptSearch(SuggestionAlgorithm): "`reward_attr` is deprecated and will be removed in a future " "version of Tune. " "Setting `metric={}` and `mode=max`.".format(reward_attr)) + if "use_early_stopped_trials" in kwargs: + logger.warning( + "`use_early_stopped_trials` is not used in SigOptSearch.") self._max_concurrent = max_concurrent self._metric = metric @@ -118,7 +121,7 @@ class SigOptSearch(SuggestionAlgorithm): result=None, error=False, early_terminated=False): - """Passes the result to SigOpt unless early terminated or errored. + """Notification for the completion of trial. If a trial fails, it will be reported as a failed Observation, telling the optimizer that the Suggestion led to a metric failure, which diff --git a/python/ray/tune/suggest/skopt.py b/python/ray/tune/suggest/skopt.py index 9d65df241..cba3e664c 100644 --- a/python/ray/tune/suggest/skopt.py +++ b/python/ray/tune/suggest/skopt.py @@ -70,6 +70,8 @@ class SkOptSearch(SuggestionAlgorithm): as a list so the optimiser can be told the results without needing to re-compute the trial. Must be the same length as points_to_evaluate. (See tune/examples/skopt_example.py) + use_early_stopped_trials (bool): Whether to use early terminated + trial results in the optimization process. Example: >>> from skopt import Optimizer @@ -145,16 +147,23 @@ class SkOptSearch(SuggestionAlgorithm): result=None, error=False, early_terminated=False): - """Passes the result to skopt unless early terminated or errored. + """Notification for the completion of trial. The result is internally negated when interacting with Skopt so that Skopt Optimizers can "maximize" this value, as it minimizes on default. """ - skopt_trial_info = self._live_trial_mapping.pop(trial_id) + if result: - self._skopt_opt.tell(skopt_trial_info, - self._metric_op * result[self._metric]) + self._process_result(trial_id, result, early_terminated) + self._live_trial_mapping.pop(trial_id) + + def _process_result(self, trial_id, result, early_terminated=False): + if early_terminated and self._use_early_stopped is False: + return + skopt_trial_info = self._live_trial_mapping[trial_id] + self._skopt_opt.tell(skopt_trial_info, + self._metric_op * result[self._metric]) def _num_live_trials(self): return len(self._live_trial_mapping) diff --git a/python/ray/tune/suggest/suggestion.py b/python/ray/tune/suggest/suggestion.py index 8c1144dea..0b936c438 100644 --- a/python/ray/tune/suggest/suggestion.py +++ b/python/ray/tune/suggest/suggestion.py @@ -32,13 +32,14 @@ class SuggestionAlgorithm(SearchAlgorithm): >>> better_parameters = suggester._suggest() """ - def __init__(self): + def __init__(self, use_early_stopped_trials=True): """Constructs a generator given experiment specifications. """ self._parser = make_parser() self._trial_generator = [] self._counter = 0 self._finished = False + self._use_early_stopped = use_early_stopped_trials def add_configurations(self, experiments): """Chains generator given experiment specifications. @@ -141,6 +142,7 @@ class _MockSuggestionAlgorithm(SuggestionAlgorithm): self._max_concurrent = max_concurrent self.live_trials = {} self.counter = {"result": 0, "complete": 0} + self.final_results = [] self.stall = False self.results = [] super(_MockSuggestionAlgorithm, self).__init__(**kwargs) @@ -155,6 +157,16 @@ class _MockSuggestionAlgorithm(SuggestionAlgorithm): self.counter["result"] += 1 self.results += [result] - def on_trial_complete(self, trial_id, **kwargs): + def on_trial_complete(self, + trial_id, + result=None, + error=False, + early_terminated=False): self.counter["complete"] += 1 + if result: + self._process_result(result, early_terminated) del self.live_trials[trial_id] + + def _process_result(self, result, early_terminated): + if early_terminated and self._use_early_stopped: + self.final_results += [result] diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index 984418048..9b208985e 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -2216,6 +2216,30 @@ class TrialRunnerTest(unittest.TestCase): self.assertTrue(searcher.is_finished()) self.assertTrue(runner.is_finished()) + def testSearchAlgSchedulerEarlyStop(self): + """Early termination notif to Searcher can be turned off.""" + + class _MockScheduler(FIFOScheduler): + def on_trial_result(self, *args, **kwargs): + return TrialScheduler.STOP + + ray.init(num_cpus=4, num_gpus=2) + experiment_spec = {"run": "__fake", "stop": {"training_iteration": 2}} + experiments = [Experiment.from_json("test", experiment_spec)] + searcher = _MockSuggestionAlgorithm(use_early_stopped_trials=True) + searcher.add_configurations(experiments) + runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler()) + runner.step() + runner.step() + self.assertEqual(len(searcher.final_results), 1) + + searcher = _MockSuggestionAlgorithm(use_early_stopped_trials=False) + searcher.add_configurations(experiments) + runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler()) + runner.step() + runner.step() + self.assertEqual(len(searcher.final_results), 0) + def testSearchAlgStalled(self): """Checks that runner and searcher state is maintained when stalled.""" ray.init(num_cpus=4, num_gpus=2) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index f8c7feac5..6bfe77cb6 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -465,7 +465,9 @@ class TrialRunner(object): if decision == TrialScheduler.STOP: with warn_if_slow("search_alg.on_trial_complete"): self._search_alg.on_trial_complete( - trial.trial_id, early_terminated=True) + trial.trial_id, + result=flat_result, + early_terminated=True) if not is_duplicate: trial.update_last_result(