mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 04:42:11 +08:00
[tune] Let Search Algorithms use early stopped trials (#5651)
This commit is contained in:
committed by
Richard Liaw
parent
5780ec1b62
commit
776b071f3b
@@ -40,6 +40,8 @@ class AxSearch(SuggestionAlgorithm):
|
||||
"x3 >= x4" or "x3 + x4 >= 2".
|
||||
outcome_constraints (list[str]): Outcome constraints of form
|
||||
"metric_name >= bound", like "m1 <= 3".
|
||||
use_early_stopped_trials (bool): Whether to use early terminated
|
||||
trial results in the optimization process.
|
||||
|
||||
|
||||
Example:
|
||||
@@ -81,22 +83,28 @@ class AxSearch(SuggestionAlgorithm):
|
||||
result=None,
|
||||
error=False,
|
||||
early_terminated=False):
|
||||
"""Pass data back to Ax.
|
||||
"""Notification for the completion of trial.
|
||||
|
||||
Data of form key value dictionary of metric names and values.
|
||||
"""
|
||||
ax_trial_index = self._live_index_mapping.pop(trial_id)
|
||||
if result:
|
||||
metric_dict = {
|
||||
self._objective_name: (result[self._objective_name], 0.0)
|
||||
}
|
||||
outcome_names = [
|
||||
oc.metric.name for oc in
|
||||
self._ax.experiment.optimization_config.outcome_constraints
|
||||
]
|
||||
metric_dict.update({on: (result[on], 0.0) for on in outcome_names})
|
||||
self._ax.complete_trial(
|
||||
trial_index=ax_trial_index, raw_data=metric_dict)
|
||||
self._process_result(trial_id, result, early_terminated)
|
||||
self._live_index_mapping.pop(trial_id)
|
||||
|
||||
def _process_result(self, trial_id, result, early_terminated=False):
|
||||
if early_terminated and self._use_early_stopped is False:
|
||||
return
|
||||
ax_trial_index = self._live_index_mapping[trial_id]
|
||||
metric_dict = {
|
||||
self._objective_name: (result[self._objective_name], 0.0)
|
||||
}
|
||||
outcome_names = [
|
||||
oc.metric.name for oc in
|
||||
self._ax.experiment.optimization_config.outcome_constraints
|
||||
]
|
||||
metric_dict.update({on: (result[on], 0.0) for on in outcome_names})
|
||||
self._ax.complete_trial(
|
||||
trial_index=ax_trial_index, raw_data=metric_dict)
|
||||
|
||||
def _num_live_trials(self):
|
||||
return len(self._live_index_mapping)
|
||||
|
||||
@@ -33,6 +33,8 @@ class BayesOptSearch(SuggestionAlgorithm):
|
||||
provide values for the keys `kind`, `kappa`, and `xi`.
|
||||
random_state (int): Used to initialize BayesOpt.
|
||||
verbose (int): Sets verbosity level for BayesOpt packages.
|
||||
use_early_stopped_trials (bool): Whether to use early terminated
|
||||
trial results in the optimization process.
|
||||
|
||||
Example:
|
||||
>>> space = {
|
||||
@@ -102,14 +104,18 @@ class BayesOptSearch(SuggestionAlgorithm):
|
||||
result=None,
|
||||
error=False,
|
||||
early_terminated=False):
|
||||
"""Passes the result to BayesOpt unless early terminated or errored"""
|
||||
"""Notification for the completion of trial."""
|
||||
if result:
|
||||
self.optimizer.register(
|
||||
params=self._live_trial_mapping[trial_id],
|
||||
target=self._metric_op * result[self._metric])
|
||||
|
||||
self._process_result(trial_id, result, early_terminated)
|
||||
del self._live_trial_mapping[trial_id]
|
||||
|
||||
def _process_result(self, trial_id, result, early_terminated=False):
|
||||
if early_terminated and self._use_early_stopped is False:
|
||||
return
|
||||
self.optimizer.register(
|
||||
params=self._live_trial_mapping[trial_id],
|
||||
target=self._metric_op * result[self._metric])
|
||||
|
||||
def _num_live_trials(self):
|
||||
return len(self._live_trial_mapping)
|
||||
|
||||
|
||||
@@ -53,6 +53,8 @@ class HyperOptSearch(SuggestionAlgorithm):
|
||||
results. Defaults to None.
|
||||
gamma (float in range (0,1)): parameter governing the tree parzen
|
||||
estimators suggestion algorithm. Defaults to 0.25.
|
||||
use_early_stopped_trials (bool): Whether to use early terminated
|
||||
trial results in the optimization process.
|
||||
|
||||
Example:
|
||||
>>> space = {
|
||||
@@ -171,7 +173,7 @@ class HyperOptSearch(SuggestionAlgorithm):
|
||||
result=None,
|
||||
error=False,
|
||||
early_terminated=False):
|
||||
"""Passes the result to HyperOpt unless early terminated or errored.
|
||||
"""Notification for the completion of trial.
|
||||
|
||||
The result is internally negated when interacting with HyperOpt
|
||||
so that HyperOpt can "maximize" this value, as it minimizes on default.
|
||||
@@ -183,15 +185,24 @@ class HyperOptSearch(SuggestionAlgorithm):
|
||||
if error:
|
||||
ho_trial["state"] = hpo.base.JOB_STATE_ERROR
|
||||
ho_trial["misc"]["error"] = (str(TuneError), "Tune Error")
|
||||
elif early_terminated:
|
||||
self._hpopt_trials.refresh()
|
||||
else:
|
||||
self._process_result(trial_id, result, early_terminated)
|
||||
del self._live_trial_mapping[trial_id]
|
||||
|
||||
def _process_result(self, trial_id, result, early_terminated=False):
|
||||
ho_trial = self._get_hyperopt_trial(trial_id)
|
||||
ho_trial["refresh_time"] = hpo.utils.coarse_utcnow()
|
||||
|
||||
if early_terminated and self._use_early_stopped is False:
|
||||
ho_trial["state"] = hpo.base.JOB_STATE_ERROR
|
||||
ho_trial["misc"]["error"] = (str(TuneError), "Tune Removed")
|
||||
else:
|
||||
ho_trial["state"] = hpo.base.JOB_STATE_DONE
|
||||
hp_result = self._to_hyperopt_result(result)
|
||||
ho_trial["result"] = hp_result
|
||||
return
|
||||
|
||||
ho_trial["state"] = hpo.base.JOB_STATE_DONE
|
||||
hp_result = self._to_hyperopt_result(result)
|
||||
ho_trial["result"] = hp_result
|
||||
self._hpopt_trials.refresh()
|
||||
del self._live_trial_mapping[trial_id]
|
||||
|
||||
def _to_hyperopt_result(self, result):
|
||||
return {"loss": self._metric_op * result[self._metric], "status": "ok"}
|
||||
|
||||
@@ -35,6 +35,8 @@ class NevergradSearch(SuggestionAlgorithm):
|
||||
metric (str): The training result objective value attribute.
|
||||
mode (str): One of {min, max}. Determines whether objective is
|
||||
minimizing or maximizing the metric attribute.
|
||||
use_early_stopped_trials (bool): Whether to use early terminated
|
||||
trial results in the optimization process.
|
||||
|
||||
Example:
|
||||
>>> from nevergrad.optimization import optimizerlib
|
||||
@@ -133,16 +135,23 @@ class NevergradSearch(SuggestionAlgorithm):
|
||||
result=None,
|
||||
error=False,
|
||||
early_terminated=False):
|
||||
"""Passes the result to Nevergrad unless early terminated or errored.
|
||||
"""Notification for the completion of trial.
|
||||
|
||||
The result is internally negated when interacting with Nevergrad
|
||||
so that Nevergrad Optimizers can "maximize" this value,
|
||||
as it minimizes on default.
|
||||
"""
|
||||
ng_trial_info = self._live_trial_mapping.pop(trial_id)
|
||||
if result:
|
||||
self._nevergrad_opt.tell(ng_trial_info,
|
||||
self._metric_op * result[self._metric])
|
||||
self._process_result(trial_id, result, early_terminated)
|
||||
|
||||
self._live_trial_mapping.pop(trial_id)
|
||||
|
||||
def _process_result(self, trial_id, result, early_terminated=False):
|
||||
if early_terminated and self._use_early_stopped is False:
|
||||
return
|
||||
ng_trial_info = self._live_trial_mapping[trial_id]
|
||||
self._nevergrad_opt.tell(ng_trial_info,
|
||||
self._metric_op * result[self._metric])
|
||||
|
||||
def _num_live_trials(self):
|
||||
return len(self._live_trial_mapping)
|
||||
|
||||
@@ -78,6 +78,9 @@ class SigOptSearch(SuggestionAlgorithm):
|
||||
"`reward_attr` is deprecated and will be removed in a future "
|
||||
"version of Tune. "
|
||||
"Setting `metric={}` and `mode=max`.".format(reward_attr))
|
||||
if "use_early_stopped_trials" in kwargs:
|
||||
logger.warning(
|
||||
"`use_early_stopped_trials` is not used in SigOptSearch.")
|
||||
|
||||
self._max_concurrent = max_concurrent
|
||||
self._metric = metric
|
||||
@@ -118,7 +121,7 @@ class SigOptSearch(SuggestionAlgorithm):
|
||||
result=None,
|
||||
error=False,
|
||||
early_terminated=False):
|
||||
"""Passes the result to SigOpt unless early terminated or errored.
|
||||
"""Notification for the completion of trial.
|
||||
|
||||
If a trial fails, it will be reported as a failed Observation, telling
|
||||
the optimizer that the Suggestion led to a metric failure, which
|
||||
|
||||
@@ -70,6 +70,8 @@ class SkOptSearch(SuggestionAlgorithm):
|
||||
as a list so the optimiser can be told the results without
|
||||
needing to re-compute the trial. Must be the same length as
|
||||
points_to_evaluate. (See tune/examples/skopt_example.py)
|
||||
use_early_stopped_trials (bool): Whether to use early terminated
|
||||
trial results in the optimization process.
|
||||
|
||||
Example:
|
||||
>>> from skopt import Optimizer
|
||||
@@ -145,16 +147,23 @@ class SkOptSearch(SuggestionAlgorithm):
|
||||
result=None,
|
||||
error=False,
|
||||
early_terminated=False):
|
||||
"""Passes the result to skopt unless early terminated or errored.
|
||||
"""Notification for the completion of trial.
|
||||
|
||||
The result is internally negated when interacting with Skopt
|
||||
so that Skopt Optimizers can "maximize" this value,
|
||||
as it minimizes on default.
|
||||
"""
|
||||
skopt_trial_info = self._live_trial_mapping.pop(trial_id)
|
||||
|
||||
if result:
|
||||
self._skopt_opt.tell(skopt_trial_info,
|
||||
self._metric_op * result[self._metric])
|
||||
self._process_result(trial_id, result, early_terminated)
|
||||
self._live_trial_mapping.pop(trial_id)
|
||||
|
||||
def _process_result(self, trial_id, result, early_terminated=False):
|
||||
if early_terminated and self._use_early_stopped is False:
|
||||
return
|
||||
skopt_trial_info = self._live_trial_mapping[trial_id]
|
||||
self._skopt_opt.tell(skopt_trial_info,
|
||||
self._metric_op * result[self._metric])
|
||||
|
||||
def _num_live_trials(self):
|
||||
return len(self._live_trial_mapping)
|
||||
|
||||
@@ -32,13 +32,14 @@ class SuggestionAlgorithm(SearchAlgorithm):
|
||||
>>> better_parameters = suggester._suggest()
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, use_early_stopped_trials=True):
|
||||
"""Constructs a generator given experiment specifications.
|
||||
"""
|
||||
self._parser = make_parser()
|
||||
self._trial_generator = []
|
||||
self._counter = 0
|
||||
self._finished = False
|
||||
self._use_early_stopped = use_early_stopped_trials
|
||||
|
||||
def add_configurations(self, experiments):
|
||||
"""Chains generator given experiment specifications.
|
||||
@@ -141,6 +142,7 @@ class _MockSuggestionAlgorithm(SuggestionAlgorithm):
|
||||
self._max_concurrent = max_concurrent
|
||||
self.live_trials = {}
|
||||
self.counter = {"result": 0, "complete": 0}
|
||||
self.final_results = []
|
||||
self.stall = False
|
||||
self.results = []
|
||||
super(_MockSuggestionAlgorithm, self).__init__(**kwargs)
|
||||
@@ -155,6 +157,16 @@ class _MockSuggestionAlgorithm(SuggestionAlgorithm):
|
||||
self.counter["result"] += 1
|
||||
self.results += [result]
|
||||
|
||||
def on_trial_complete(self, trial_id, **kwargs):
|
||||
def on_trial_complete(self,
|
||||
trial_id,
|
||||
result=None,
|
||||
error=False,
|
||||
early_terminated=False):
|
||||
self.counter["complete"] += 1
|
||||
if result:
|
||||
self._process_result(result, early_terminated)
|
||||
del self.live_trials[trial_id]
|
||||
|
||||
def _process_result(self, result, early_terminated):
|
||||
if early_terminated and self._use_early_stopped:
|
||||
self.final_results += [result]
|
||||
|
||||
@@ -2216,6 +2216,30 @@ class TrialRunnerTest(unittest.TestCase):
|
||||
self.assertTrue(searcher.is_finished())
|
||||
self.assertTrue(runner.is_finished())
|
||||
|
||||
def testSearchAlgSchedulerEarlyStop(self):
    """Early termination notification to the searcher can be turned off."""

    class _MockScheduler(FIFOScheduler):
        def on_trial_result(self, *args, **kwargs):
            # Always ask the runner to stop the trial when it reports.
            return TrialScheduler.STOP

    ray.init(num_cpus=4, num_gpus=2)
    experiment_spec = {"run": "__fake", "stop": {"training_iteration": 2}}
    experiments = [Experiment.from_json("test", experiment_spec)]

    # With early-stopped trials enabled, the stopped trial's result is kept.
    searcher = _MockSuggestionAlgorithm(use_early_stopped_trials=True)
    searcher.add_configurations(experiments)
    runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler())
    runner.step()
    runner.step()
    self.assertEqual(len(searcher.final_results), 1)

    # With the flag disabled, the same scenario records nothing.
    searcher = _MockSuggestionAlgorithm(use_early_stopped_trials=False)
    searcher.add_configurations(experiments)
    runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler())
    runner.step()
    runner.step()
    self.assertEqual(len(searcher.final_results), 0)
|
||||
|
||||
def testSearchAlgStalled(self):
|
||||
"""Checks that runner and searcher state is maintained when stalled."""
|
||||
ray.init(num_cpus=4, num_gpus=2)
|
||||
|
||||
@@ -465,7 +465,9 @@ class TrialRunner(object):
|
||||
if decision == TrialScheduler.STOP:
|
||||
with warn_if_slow("search_alg.on_trial_complete"):
|
||||
self._search_alg.on_trial_complete(
|
||||
trial.trial_id, early_terminated=True)
|
||||
trial.trial_id,
|
||||
result=flat_result,
|
||||
early_terminated=True)
|
||||
|
||||
if not is_duplicate:
|
||||
trial.update_last_result(
|
||||
|
||||
Reference in New Issue
Block a user