diff --git a/doc/source/tune-package-ref.rst b/doc/source/tune-package-ref.rst index 7b7221385..c83451720 100644 --- a/doc/source/tune-package-ref.rst +++ b/doc/source/tune-package-ref.rst @@ -38,6 +38,8 @@ ray.tune.suggest :private-members: :show-inheritance: +.. autoclass:: ray.tune.suggest.Repeater + ray.tune.track -------------- diff --git a/doc/source/tune-searchalg.rst b/doc/source/tune-searchalg.rst index fc4526307..8f2bf90a1 100644 --- a/doc/source/tune-searchalg.rst +++ b/doc/source/tune-searchalg.rst @@ -33,6 +33,34 @@ By default, Tune uses the `default search space and variant generation process < Note that other search algorithms will not necessarily extend this class and may require a different search space declaration than the default Tune format. + +Repeated Evaluations +-------------------- + +Use ``ray.tune.suggest.Repeater`` to average over multiple evaluations of the same +hyperparameter configurations. This is useful in cases where the evaluated +training procedure has high variance (i.e., in reinforcement learning). + +By default, ``Repeater`` will take in a ``repeat`` parameter and a ``search_alg``. +The ``search_alg`` will suggest new configurations to try, and the ``Repeater`` +will run ``repeat`` trials of the configuration. It will then average the +``search_alg.metric`` from the final results of each repeated trial. + +See `Repeater `_ docstring for more details. + +.. code-block:: python + + from ray.tune.suggest import Repeater + + search_alg = BayesOpt(...) + re_search_alg = Repeater(search_alg, repeat=10) + tune.run(trainable, search_alg=re_search_alg) + +.. note:: This does not apply for grid search and random search. +.. warning:: It is recommended to not use ``Repeater`` with a TrialScheduler. + Early termination can negatively affect the average reported metric. + + BayesOpt Search --------------- @@ -241,11 +269,11 @@ If you are interested in implementing or contributing a new Search Algorithm, th Model-Based Suggestion Algorithms ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Often times, hyperparameter search algorithms are model-based and may be quite simple to implement. For this, one can extend the following abstract class and implement ``on_trial_result``, ``on_trial_complete``, and ``_suggest``. The abstract class will take care of Tune-specific boilerplate such as creating Trials and queuing trials: +Often times, hyperparameter search algorithms are model-based and may be quite simple to implement. For this, one can extend the following abstract class and implement ``on_trial_result``, ``on_trial_complete``, and ``suggest``. The abstract class will take care of Tune-specific boilerplate such as creating Trials and queuing trials: .. autoclass:: ray.tune.suggest.SuggestionAlgorithm :show-inheritance: :noindex: - .. automethod:: ray.tune.suggest.SuggestionAlgorithm._suggest + .. automethod:: ray.tune.suggest.SuggestionAlgorithm.suggest :noindex: diff --git a/python/ray/tune/suggest/__init__.py b/python/ray/tune/suggest/__init__.py index a182f6b1a..24ebdb897 100644 --- a/python/ray/tune/suggest/__init__.py +++ b/python/ray/tune/suggest/__init__.py @@ -2,11 +2,12 @@ from ray.tune.suggest.search import SearchAlgorithm from ray.tune.suggest.basic_variant import BasicVariantGenerator from ray.tune.suggest.suggestion import SuggestionAlgorithm from ray.tune.suggest.variant_generator import grid_search +from ray.tune.suggest.repeater import Repeater from ray.tune.suggest.bohb import TuneBOHB __all__ = [ "SearchAlgorithm", "BasicVariantGenerator", "SuggestionAlgorithm", - "grid_search", "TuneBOHB" + "grid_search", "TuneBOHB", "Repeater" ] diff --git a/python/ray/tune/suggest/ax.py b/python/ray/tune/suggest/ax.py index 92071995d..d1fcb1d48 100644 --- a/python/ray/tune/suggest/ax.py +++ b/python/ray/tune/suggest/ax.py @@ -30,8 +30,8 @@ class AxSearch(SuggestionAlgorithm): reported/returned by the Trainable. max_concurrent (int): Number of maximum concurrent trials. Defaults to 10. - minimize (bool): Whether this experiment represents a minimization - problem. Defaults to False. + mode (str): One of {min, max}. Determines whether objective is + minimizing or maximizing the metric attribute. Defaults to "max". parameter_constraints (list[str]): Parameter constraints, such as "x3 >= x4" or "x3 + x4 >= 2". outcome_constraints (list[str]): Outcome constraints of form @@ -49,7 +49,7 @@ class AxSearch(SuggestionAlgorithm): >>> objective_name="hartmann6", max_concurrent=4) """ - def __init__(self, ax_client, max_concurrent=10, **kwargs): + def __init__(self, ax_client, max_concurrent=10, mode="max", **kwargs): assert ax is not None, "Ax must be installed!" assert type(max_concurrent) is int and max_concurrent > 0 self._ax = ax_client @@ -62,9 +62,10 @@ class AxSearch(SuggestionAlgorithm): self._max_concurrent = max_concurrent self._parameters = list(exp.parameters) self._live_index_mapping = {} - super(AxSearch, self).__init__(**kwargs) + super(AxSearch, self).__init__( + metric=self._objective_name, mode=mode, **kwargs) - def _suggest(self, trial_id): + def suggest(self, trial_id): if self._num_live_trials() >= self._max_concurrent: return None parameters, trial_index = self._ax.get_next_trial() diff --git a/python/ray/tune/suggest/basic_variant.py b/python/ray/tune/suggest/basic_variant.py index a40a52c02..3216c9b9e 100644 --- a/python/ray/tune/suggest/basic_variant.py +++ b/python/ray/tune/suggest/basic_variant.py @@ -43,7 +43,9 @@ class BasicVariantGenerator(SearchAlgorithm): for experiment in experiment_list: self._trial_generator = itertools.chain( self._trial_generator, - self._generate_trials(experiment.spec, experiment.name)) + self._generate_trials( + experiment.spec.get("num_samples", 1), experiment.spec, + experiment.name)) def next_trials(self): """Provides Trial objects to be queued into the TrialRunner. @@ -57,7 +59,7 @@ class BasicVariantGenerator(SearchAlgorithm): self._finished = True return trials - def _generate_trials(self, unresolved_spec, output_path=""): + def _generate_trials(self, num_samples, unresolved_spec, output_path=""): """Generates Trial objects with the variant generation process. Uses a fixed point iteration to resolve variants. All trials @@ -71,7 +73,7 @@ class BasicVariantGenerator(SearchAlgorithm): if "run" not in unresolved_spec: raise TuneError("Must specify `run` in {}".format(unresolved_spec)) - for _ in range(unresolved_spec.get("num_samples", 1)): + for _ in range(num_samples): for resolved_vars, spec in generate_variants(unresolved_spec): trial_id = "%05d" % self._counter experiment_tag = str(self._counter) diff --git a/python/ray/tune/suggest/bayesopt.py b/python/ray/tune/suggest/bayesopt.py index be39f9a45..50737086d 100644 --- a/python/ray/tune/suggest/bayesopt.py +++ b/python/ray/tune/suggest/bayesopt.py @@ -80,9 +80,10 @@ class BayesOptSearch(SuggestionAlgorithm): self.utility = byo.UtilityFunction(**utility_kwargs) - super(BayesOptSearch, self).__init__(**kwargs) + super(BayesOptSearch, self).__init__( + metric=self._metric, mode=mode, **kwargs) - def _suggest(self, trial_id): + def suggest(self, trial_id): if self._num_live_trials() >= self._max_concurrent: return None diff --git a/python/ray/tune/suggest/bohb.py b/python/ray/tune/suggest/bohb.py index 9972d920a..32b37f9cc 100644 --- a/python/ray/tune/suggest/bohb.py +++ b/python/ray/tune/suggest/bohb.py @@ -71,16 +71,16 @@ class TuneBOHB(SuggestionAlgorithm): self.trial_to_params = {} self.running = set() self.paused = set() - self.metric = metric + self._metric = metric if mode == "max": self._metric_op = -1. elif mode == "min": self._metric_op = 1. bohb_config = bohb_config or {} self.bohber = BOHB(space, **bohb_config) - super(TuneBOHB, self).__init__() + super(TuneBOHB, self).__init__(metric=self._metric, mode=mode) - def _suggest(self, trial_id): + def suggest(self, trial_id): if len(self.running) < self._max_concurrent: # This parameter is not used in hpbandster implementation. config, info = self.bohber.get_config(None) diff --git a/python/ray/tune/suggest/hyperopt.py b/python/ray/tune/suggest/hyperopt.py index bd9bee72b..7dc434823 100644 --- a/python/ray/tune/suggest/hyperopt.py +++ b/python/ray/tune/suggest/hyperopt.py @@ -122,9 +122,10 @@ class HyperOptSearch(SuggestionAlgorithm): else: self.rstate = np.random.RandomState(random_state_seed) - super(HyperOptSearch, self).__init__(**kwargs) + super(HyperOptSearch, self).__init__( + metric=self._metric, mode=mode, **kwargs) - def _suggest(self, trial_id): + def suggest(self, trial_id): if self._num_live_trials() >= self._max_concurrent: return None diff --git a/python/ray/tune/suggest/nevergrad.py b/python/ray/tune/suggest/nevergrad.py index be9a36447..f83ec552c 100644 --- a/python/ray/tune/suggest/nevergrad.py +++ b/python/ray/tune/suggest/nevergrad.py @@ -86,7 +86,8 @@ class NevergradSearch(SuggestionAlgorithm): self._metric_op = 1. self._nevergrad_opt = optimizer self._live_trial_mapping = {} - super(NevergradSearch, self).__init__(**kwargs) + super(NevergradSearch, self).__init__( + metric=metric, mode=mode, **kwargs) # validate parameters if hasattr(optimizer, "instrumentation"): # added in v0.2.0 if optimizer.instrumentation.kwargs: @@ -108,7 +109,7 @@ class NevergradSearch(SuggestionAlgorithm): raise ValueError("len(parameters_names) must match optimizer " "dimension for non-instrumented optimizers") - def _suggest(self, trial_id): + def suggest(self, trial_id): if self._num_live_trials() >= self._max_concurrent: return None suggested_config = self._nevergrad_opt.ask() diff --git a/python/ray/tune/suggest/repeater.py b/python/ray/tune/suggest/repeater.py new file mode 100644 index 000000000..38c8b127c --- /dev/null +++ b/python/ray/tune/suggest/repeater.py @@ -0,0 +1,153 @@ +import copy +import itertools +import logging +import numpy as np + +from ray.tune.suggest.suggestion import SuggestionAlgorithm +from ray.tune.experiment import convert_to_experiment_list + +logger = logging.getLogger(__name__) + +TRIAL_INDEX = "__trial_index__" +"""str: A constant value representing the repeat index of the trial.""" + + +class _TrialGroup: + """Internal class for grouping trials of same parameters. + + This is used when repeating trials for reducing training variance. + + Args: + primary_trial_id (str): Trial ID of the "primary trial". + This trial is the one that the Searcher is aware of. + config (dict): Suggested configuration shared across all trials + in the trial group. + max_trials (int): Max number of trials to execute within this group. + + """ + + def __init__(self, primary_trial_id, config, max_trials=1): + assert type(config) is dict, ( + "config is not a dict, got {}".format(config)) + self.primary_trial_id = primary_trial_id + self.config = config + self._trials = {primary_trial_id: None} + self.max_trials = max_trials + + def add(self, trial_id): + assert len(self._trials) < self.max_trials + self._trials[trial_id] = None + + def full(self): + return len(self._trials) == self.max_trials + + def report(self, trial_id, score): + assert trial_id in self._trials + if score is None: + raise ValueError("Internal Error: Score cannot be None.") + self._trials[trial_id] = score + + def finished_reporting(self): + return None not in self._trials.values() + + def scores(self): + return list(self._trials.values()) + + def count(self): + return len(self._trials) + + +class Repeater(SuggestionAlgorithm): + """A wrapper algorithm for repeating trials of same parameters. + + It is recommended that you do not run an early-stopping TrialScheduler + simultaneously. + + Args: + search_alg (SearchAlgorithm): SearchAlgorithm object that the + Repeater will optimize. Note that the SearchAlgorithm + will only see 1 trial among multiple repeated trials. + The result/metric passed to the SearchAlgorithm upon + trial completion will be averaged among all repeats. + repeat (int): Number of times to generate a trial with a repeated + configuration. Defaults to 1. + set_index (bool): Sets a tune.suggest.repeater.TRIAL_INDEX in + Trainable/Function config which corresponds to the index of the + repeated trial. This can be used for seeds. Defaults to True. + + """ + + def __init__(self, search_alg, repeat=1, set_index=True): + self.search_alg = search_alg + self._repeat = repeat + self._set_index = set_index + self._groups = [] + self._trial_id_to_group = {} + self._current_group = None + super(Repeater, self).__init__( + metric=self.search_alg.metric, + mode=self.search_alg.mode, + use_early_stopped_trials=self.search_alg._use_early_stopped) + + def add_configurations(self, experiments): + """Chains generator given experiment specifications. + + Multiplies the number of trials by the repeat factor. + + Arguments: + experiments (Experiment | list | dict): Experiments to run. + """ + experiment_list = convert_to_experiment_list(experiments) + for experiment in experiment_list: + self._trial_generator = itertools.chain( + self._trial_generator, + self._generate_trials( + experiment.spec.get("num_samples", 1) * self._repeat, + experiment.spec, experiment.name)) + + def suggest(self, trial_id): + if self._current_group is None or self._current_group.full(): + config = self.search_alg.suggest(trial_id) + if config is None: + return config + self._current_group = _TrialGroup( + trial_id, copy.deepcopy(config), max_trials=self._repeat) + self._groups.append(self._current_group) + index_in_group = 0 + else: + index_in_group = self._current_group.count() + self._current_group.add(trial_id) + + config = self._current_group.config.copy() + if self._set_index: + config[TRIAL_INDEX] = index_in_group + self._trial_id_to_group[trial_id] = self._current_group + return config + + def on_trial_complete(self, trial_id, result=None, **kwargs): + """Stores the score for and keeps track of a completed trial. + + Stores the metric of a trial as nan if any of the following conditions + are met: + + 1. ``result`` is empty or not provided. + 2. ``result`` is provided but no metric was provided. + + """ + if trial_id not in self._trial_id_to_group: + logger.error("Trial {} not in group; cannot report score. " + "Seen trials: {}".format( + trial_id, list(self._trial_id_to_group))) + trial_group = self._trial_id_to_group[trial_id] + if not result or self.search_alg.metric not in result: + score = np.nan + else: + score = result[self.search_alg.metric] + trial_group.report(trial_id, score) + + if trial_group.finished_reporting(): + scores = trial_group.scores() + self.search_alg.on_trial_complete( + trial_group.primary_trial_id, + result={self.search_alg.metric: np.nanmean(scores)}, + **kwargs) diff --git a/python/ray/tune/suggest/sigopt.py b/python/ray/tune/suggest/sigopt.py index 0f825e78e..1aa9baf5f 100644 --- a/python/ray/tune/suggest/sigopt.py +++ b/python/ray/tune/suggest/sigopt.py @@ -95,9 +95,9 @@ class SigOptSearch(SuggestionAlgorithm): parallel_bandwidth=self._max_concurrent, ) - super(SigOptSearch, self).__init__(**kwargs) + super(SigOptSearch, self).__init__(metric=metric, mode=mode, **kwargs) - def _suggest(self, trial_id): + def suggest(self, trial_id): if self._num_live_trials() >= self._max_concurrent: return None diff --git a/python/ray/tune/suggest/skopt.py b/python/ray/tune/suggest/skopt.py index e0d9b8577..4456488fe 100644 --- a/python/ray/tune/suggest/skopt.py +++ b/python/ray/tune/suggest/skopt.py @@ -122,9 +122,10 @@ class SkOptSearch(SuggestionAlgorithm): self._metric_op = 1. self._skopt_opt = optimizer self._live_trial_mapping = {} - super(SkOptSearch, self).__init__(**kwargs) + super(SkOptSearch, self).__init__( + metric=self._metric, mode=mode, **kwargs) - def _suggest(self, trial_id): + def suggest(self, trial_id): if self._num_live_trials() >= self._max_concurrent: return None if self._initial_points: diff --git a/python/ray/tune/suggest/suggestion.py b/python/ray/tune/suggest/suggestion.py index 51f621970..f4fc11b88 100644 --- a/python/ray/tune/suggest/suggestion.py +++ b/python/ray/tune/suggest/suggestion.py @@ -14,27 +14,29 @@ class SuggestionAlgorithm(SearchAlgorithm): """Abstract class for suggestion-based algorithms. Custom search algorithms can extend this class easily by overriding the - `_suggest` method provide generated parameters for the trials. + `suggest` method provide generated parameters for the trials. To track suggestions and their corresponding evaluations, the method - `_suggest` will be passed a trial_id, which will be used in + `suggest` will be passed a trial_id, which will be used in subsequent notifications. Example: >>> suggester = SuggestionAlgorithm() >>> suggester.add_configurations({ ... }) - >>> new_parameters = suggester._suggest() + >>> new_parameters = suggester.suggest() >>> suggester.on_trial_complete(trial_id, result) - >>> better_parameters = suggester._suggest() + >>> better_parameters = suggester.suggest() """ - def __init__(self, use_early_stopped_trials=True): - """Constructs a generator given experiment specifications. - """ + def __init__(self, metric=None, mode="max", use_early_stopped_trials=True): + """Constructs a generator given experiment specifications.""" self._parser = make_parser() self._trial_generator = [] self._counter = 0 self._finished = False + self._metric = metric + assert mode in ["min", "max"] + self._mode = mode self._use_early_stopped = use_early_stopped_trials def add_configurations(self, experiments): @@ -47,7 +49,9 @@ class SuggestionAlgorithm(SearchAlgorithm): for experiment in experiment_list: self._trial_generator = itertools.chain( self._trial_generator, - self._generate_trials(experiment.spec, experiment.name)) + self._generate_trials( + experiment.spec.get("num_samples", 1), experiment.spec, + experiment.name)) def next_trials(self): """Provides a batch of Trial objects to be queued into the TrialRunner. @@ -67,20 +71,20 @@ class SuggestionAlgorithm(SearchAlgorithm): self._finished = True return trials - def _generate_trials(self, experiment_spec, output_path=""): - """Generates trials with configurations from `_suggest`. + def _generate_trials(self, num_samples, experiment_spec, output_path=""): + """Generates trials with configurations from `suggest`. - Creates a trial_id that is passed into `_suggest`. + Creates a trial_id that is passed into `suggest`. Yields: Trial objects constructed according to `spec` """ if "run" not in experiment_spec: raise TuneError("Must specify `run` in {}".format(experiment_spec)) - for _ in range(experiment_spec.get("num_samples", 1)): + for _ in range(num_samples): trial_id = Trial.generate_id() while True: - suggested_config = self._suggest(trial_id) + suggested_config = self.suggest(trial_id) if suggested_config is None: yield None else: @@ -103,7 +107,7 @@ class SuggestionAlgorithm(SearchAlgorithm): def is_finished(self): return self._finished - def _suggest(self, trial_id): + def suggest(self, trial_id): """Queries the algorithm to retrieve the next set of parameters. Arguments: @@ -117,11 +121,11 @@ class SuggestionAlgorithm(SearchAlgorithm): Example: >>> suggester = SuggestionAlgorithm(max_concurrent=1) >>> suggester.add_configurations({ ... }) - >>> parameters_1 = suggester._suggest() - >>> parameters_2 = suggester._suggest() + >>> parameters_1 = suggester.suggest() + >>> parameters_2 = suggester.suggest() >>> parameters_2 is None >>> suggester.on_trial_complete(trial_id, result) - >>> parameters_2 = suggester._suggest() + >>> parameters_2 = suggester.suggest() >>> parameters_2 is not None """ raise NotImplementedError @@ -132,6 +136,16 @@ class SuggestionAlgorithm(SearchAlgorithm): def restore(self, checkpoint_dir): raise NotImplementedError + @property + def metric(self): + """The training result objective value attribute.""" + return self._metric + + @property + def mode(self): + """Specifies if minimizing or maximizing the metric.""" + return self._mode + class _MockSuggestionAlgorithm(SuggestionAlgorithm): def __init__(self, max_concurrent=2, **kwargs): @@ -143,7 +157,7 @@ class _MockSuggestionAlgorithm(SuggestionAlgorithm): self.results = [] super(_MockSuggestionAlgorithm, self).__init__(**kwargs) - def _suggest(self, trial_id): + def suggest(self, trial_id): if len(self.live_trials) < self._max_concurrent and not self.stall: self.live_trials[trial_id] = 1 return {"test_variable": 2} diff --git a/python/ray/tune/tests/test_trial_runner_3.py b/python/ray/tune/tests/test_trial_runner_3.py index 1732b5886..b837d8845 100644 --- a/python/ray/tune/tests/test_trial_runner_3.py +++ b/python/ray/tune/tests/test_trial_runner_3.py @@ -13,6 +13,7 @@ from ray.tune.experiment import Experiment from ray.tune.trial import Trial from ray.tune.trial_runner import TrialRunner from ray.tune.resources import Resources, json_to_resources, resources_to_json +from ray.tune.suggest.repeater import Repeater from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm, SuggestionAlgorithm) @@ -252,7 +253,7 @@ class TrialRunnerTest3(unittest.TestCase): self._finished = True return trials - def _suggest(self, trial_id): + def suggest(self, trial_id): return {} ray.init(num_cpus=2) @@ -467,9 +468,13 @@ class TrialRunnerTest3(unittest.TestCase): class SearchAlgorithmTest(unittest.TestCase): + def tearDown(self): + ray.shutdown() + _register_all() + def testNestedSuggestion(self): class TestSuggestion(SuggestionAlgorithm): - def _suggest(self, trial_id): + def suggest(self, trial_id): return {"a": {"b": {"c": {"d": 4, "e": 5}}}} alg = TestSuggestion() @@ -478,6 +483,41 @@ class SearchAlgorithmTest(unittest.TestCase): self.assertTrue("e=5" in trial.experiment_tag) self.assertTrue("d=4" in trial.experiment_tag) + def _test_repeater(self, repeat): + ray.init(num_cpus=4) + + class TestSuggestion(SuggestionAlgorithm): + count = 0 + + def suggest(self, trial_id): + return {"test_variable": 5} + + def on_trial_complete(self, *args, **kwargs): + self.count += 1 + + alg = TestSuggestion(metric="episode_reward_mean") + repeat_alg = Repeater(alg, repeat=repeat, set_index=False) + experiment_spec = { + "run": "__fake", + "num_samples": 1, + "stop": { + "training_iteration": 1 + } + } + repeat_alg.add_configurations({"test": experiment_spec}) + runner = TrialRunner(search_alg=repeat_alg) + for i in range(repeat * 2): + runner.step() + + trials = runner.get_trials() + self.assertEquals(len(trials), repeat) + + def testRepeat1(self): + self._test_repeater(repeat=1) + + def testRepeat4(self): + self._test_repeater(repeat=4) + class ResourcesTest(unittest.TestCase): def testSubtraction(self):