[tune] Repeated evals (#7366)

* easyrepeat

* done

* suggest

* doc

* ok

* commit

* Apply suggestions from code review

Co-Authored-By: Ujval Misra <misraujval@gmail.com>

* Apply suggestions from code review

Co-Authored-By: Ujval Misra <misraujval@gmail.com>

* Apply suggestions from code review

* ok

* docs

Co-authored-by: Ujval Misra <misraujval@gmail.com>
This commit is contained in:
Richard Liaw
2020-03-07 11:08:23 -08:00
committed by GitHub
parent a8bda9b551
commit 115468de2c
14 changed files with 289 additions and 44 deletions
+2 -1
View File
@@ -2,11 +2,12 @@ from ray.tune.suggest.search import SearchAlgorithm
from ray.tune.suggest.basic_variant import BasicVariantGenerator
from ray.tune.suggest.suggestion import SuggestionAlgorithm
from ray.tune.suggest.variant_generator import grid_search
from ray.tune.suggest.repeater import Repeater
from ray.tune.suggest.bohb import TuneBOHB
__all__ = [
"SearchAlgorithm", "BasicVariantGenerator", "SuggestionAlgorithm",
"grid_search", "TuneBOHB"
"grid_search", "TuneBOHB", "Repeater"
]
+6 -5
View File
@@ -30,8 +30,8 @@ class AxSearch(SuggestionAlgorithm):
reported/returned by the Trainable.
max_concurrent (int): Number of maximum concurrent trials. Defaults
to 10.
minimize (bool): Whether this experiment represents a minimization
problem. Defaults to False.
mode (str): One of {min, max}. Determines whether objective is
minimizing or maximizing the metric attribute. Defaults to "max".
parameter_constraints (list[str]): Parameter constraints, such as
"x3 >= x4" or "x3 + x4 >= 2".
outcome_constraints (list[str]): Outcome constraints of form
@@ -49,7 +49,7 @@ class AxSearch(SuggestionAlgorithm):
>>> objective_name="hartmann6", max_concurrent=4)
"""
def __init__(self, ax_client, max_concurrent=10, **kwargs):
def __init__(self, ax_client, max_concurrent=10, mode="max", **kwargs):
assert ax is not None, "Ax must be installed!"
assert type(max_concurrent) is int and max_concurrent > 0
self._ax = ax_client
@@ -62,9 +62,10 @@ class AxSearch(SuggestionAlgorithm):
self._max_concurrent = max_concurrent
self._parameters = list(exp.parameters)
self._live_index_mapping = {}
super(AxSearch, self).__init__(**kwargs)
super(AxSearch, self).__init__(
metric=self._objective_name, mode=mode, **kwargs)
def _suggest(self, trial_id):
def suggest(self, trial_id):
if self._num_live_trials() >= self._max_concurrent:
return None
parameters, trial_index = self._ax.get_next_trial()
+5 -3
View File
@@ -43,7 +43,9 @@ class BasicVariantGenerator(SearchAlgorithm):
for experiment in experiment_list:
self._trial_generator = itertools.chain(
self._trial_generator,
self._generate_trials(experiment.spec, experiment.name))
self._generate_trials(
experiment.spec.get("num_samples", 1), experiment.spec,
experiment.name))
def next_trials(self):
"""Provides Trial objects to be queued into the TrialRunner.
@@ -57,7 +59,7 @@ class BasicVariantGenerator(SearchAlgorithm):
self._finished = True
return trials
def _generate_trials(self, unresolved_spec, output_path=""):
def _generate_trials(self, num_samples, unresolved_spec, output_path=""):
"""Generates Trial objects with the variant generation process.
Uses a fixed point iteration to resolve variants. All trials
@@ -71,7 +73,7 @@ class BasicVariantGenerator(SearchAlgorithm):
if "run" not in unresolved_spec:
raise TuneError("Must specify `run` in {}".format(unresolved_spec))
for _ in range(unresolved_spec.get("num_samples", 1)):
for _ in range(num_samples):
for resolved_vars, spec in generate_variants(unresolved_spec):
trial_id = "%05d" % self._counter
experiment_tag = str(self._counter)
+3 -2
View File
@@ -80,9 +80,10 @@ class BayesOptSearch(SuggestionAlgorithm):
self.utility = byo.UtilityFunction(**utility_kwargs)
super(BayesOptSearch, self).__init__(**kwargs)
super(BayesOptSearch, self).__init__(
metric=self._metric, mode=mode, **kwargs)
def _suggest(self, trial_id):
def suggest(self, trial_id):
if self._num_live_trials() >= self._max_concurrent:
return None
+3 -3
View File
@@ -71,16 +71,16 @@ class TuneBOHB(SuggestionAlgorithm):
self.trial_to_params = {}
self.running = set()
self.paused = set()
self.metric = metric
self._metric = metric
if mode == "max":
self._metric_op = -1.
elif mode == "min":
self._metric_op = 1.
bohb_config = bohb_config or {}
self.bohber = BOHB(space, **bohb_config)
super(TuneBOHB, self).__init__()
super(TuneBOHB, self).__init__(metric=self._metric, mode=mode)
def _suggest(self, trial_id):
def suggest(self, trial_id):
if len(self.running) < self._max_concurrent:
# This parameter is not used in hpbandster implementation.
config, info = self.bohber.get_config(None)
+3 -2
View File
@@ -122,9 +122,10 @@ class HyperOptSearch(SuggestionAlgorithm):
else:
self.rstate = np.random.RandomState(random_state_seed)
super(HyperOptSearch, self).__init__(**kwargs)
super(HyperOptSearch, self).__init__(
metric=self._metric, mode=mode, **kwargs)
def _suggest(self, trial_id):
def suggest(self, trial_id):
if self._num_live_trials() >= self._max_concurrent:
return None
+3 -2
View File
@@ -86,7 +86,8 @@ class NevergradSearch(SuggestionAlgorithm):
self._metric_op = 1.
self._nevergrad_opt = optimizer
self._live_trial_mapping = {}
super(NevergradSearch, self).__init__(**kwargs)
super(NevergradSearch, self).__init__(
metric=metric, mode=mode, **kwargs)
# validate parameters
if hasattr(optimizer, "instrumentation"): # added in v0.2.0
if optimizer.instrumentation.kwargs:
@@ -108,7 +109,7 @@ class NevergradSearch(SuggestionAlgorithm):
raise ValueError("len(parameters_names) must match optimizer "
"dimension for non-instrumented optimizers")
def _suggest(self, trial_id):
def suggest(self, trial_id):
if self._num_live_trials() >= self._max_concurrent:
return None
suggested_config = self._nevergrad_opt.ask()
+153
View File
@@ -0,0 +1,153 @@
import copy
import itertools
import logging
import numpy as np
from ray.tune.suggest.suggestion import SuggestionAlgorithm
from ray.tune.experiment import convert_to_experiment_list
logger = logging.getLogger(__name__)
TRIAL_INDEX = "__trial_index__"
"""str: A constant value representing the repeat index of the trial."""
class _TrialGroup:
"""Internal class for grouping trials of same parameters.
This is used when repeating trials for reducing training variance.
Args:
primary_trial_id (str): Trial ID of the "primary trial".
This trial is the one that the Searcher is aware of.
config (dict): Suggested configuration shared across all trials
in the trial group.
max_trials (int): Max number of trials to execute within this group.
"""
def __init__(self, primary_trial_id, config, max_trials=1):
assert type(config) is dict, (
"config is not a dict, got {}".format(config))
self.primary_trial_id = primary_trial_id
self.config = config
self._trials = {primary_trial_id: None}
self.max_trials = max_trials
def add(self, trial_id):
assert len(self._trials) < self.max_trials
self._trials[trial_id] = None
def full(self):
return len(self._trials) == self.max_trials
def report(self, trial_id, score):
assert trial_id in self._trials
if score is None:
raise ValueError("Internal Error: Score cannot be None.")
self._trials[trial_id] = score
def finished_reporting(self):
return None not in self._trials.values()
def scores(self):
return list(self._trials.values())
def count(self):
return len(self._trials)
class Repeater(SuggestionAlgorithm):
"""A wrapper algorithm for repeating trials of same parameters.
It is recommended that you do not run an early-stopping TrialScheduler
simultaneously.
Args:
search_alg (SearchAlgorithm): SearchAlgorithm object that the
Repeater will optimize. Note that the SearchAlgorithm
will only see 1 trial among multiple repeated trials.
The result/metric passed to the SearchAlgorithm upon
trial completion will be averaged among all repeats.
repeat (int): Number of times to generate a trial with a repeated
configuration. Defaults to 1.
set_index (bool): Sets a tune.suggest.repeater.TRIAL_INDEX in
Trainable/Function config which corresponds to the index of the
repeated trial. This can be used for seeds. Defaults to True.
"""
def __init__(self, search_alg, repeat=1, set_index=True):
self.search_alg = search_alg
self._repeat = repeat
self._set_index = set_index
self._groups = []
self._trial_id_to_group = {}
self._current_group = None
super(Repeater, self).__init__(
metric=self.search_alg.metric,
mode=self.search_alg.mode,
use_early_stopped_trials=self.search_alg._use_early_stopped)
def add_configurations(self, experiments):
"""Chains generator given experiment specifications.
Multiplies the number of trials by the repeat factor.
Arguments:
experiments (Experiment | list | dict): Experiments to run.
"""
experiment_list = convert_to_experiment_list(experiments)
for experiment in experiment_list:
self._trial_generator = itertools.chain(
self._trial_generator,
self._generate_trials(
experiment.spec.get("num_samples", 1) * self._repeat,
experiment.spec, experiment.name))
def suggest(self, trial_id):
if self._current_group is None or self._current_group.full():
config = self.search_alg.suggest(trial_id)
if config is None:
return config
self._current_group = _TrialGroup(
trial_id, copy.deepcopy(config), max_trials=self._repeat)
self._groups.append(self._current_group)
index_in_group = 0
else:
index_in_group = self._current_group.count()
self._current_group.add(trial_id)
config = self._current_group.config.copy()
if self._set_index:
config[TRIAL_INDEX] = index_in_group
self._trial_id_to_group[trial_id] = self._current_group
return config
def on_trial_complete(self, trial_id, result=None, **kwargs):
"""Stores the score for and keeps track of a completed trial.
Stores the metric of a trial as nan if any of the following conditions
are met:
1. ``result`` is empty or not provided.
2. ``result`` is provided but no metric was provided.
"""
if trial_id not in self._trial_id_to_group:
logger.error("Trial {} not in group; cannot report score. "
"Seen trials: {}".format(
trial_id, list(self._trial_id_to_group)))
trial_group = self._trial_id_to_group[trial_id]
if not result or self.search_alg.metric not in result:
score = np.nan
else:
score = result[self.search_alg.metric]
trial_group.report(trial_id, score)
if trial_group.finished_reporting():
scores = trial_group.scores()
self.search_alg.on_trial_complete(
trial_group.primary_trial_id,
result={self.search_alg.metric: np.nanmean(scores)},
**kwargs)
+2 -2
View File
@@ -95,9 +95,9 @@ class SigOptSearch(SuggestionAlgorithm):
parallel_bandwidth=self._max_concurrent,
)
super(SigOptSearch, self).__init__(**kwargs)
super(SigOptSearch, self).__init__(metric=metric, mode=mode, **kwargs)
def _suggest(self, trial_id):
def suggest(self, trial_id):
if self._num_live_trials() >= self._max_concurrent:
return None
+3 -2
View File
@@ -122,9 +122,10 @@ class SkOptSearch(SuggestionAlgorithm):
self._metric_op = 1.
self._skopt_opt = optimizer
self._live_trial_mapping = {}
super(SkOptSearch, self).__init__(**kwargs)
super(SkOptSearch, self).__init__(
metric=self._metric, mode=mode, **kwargs)
def _suggest(self, trial_id):
def suggest(self, trial_id):
if self._num_live_trials() >= self._max_concurrent:
return None
if self._initial_points:
+32 -18
View File
@@ -14,27 +14,29 @@ class SuggestionAlgorithm(SearchAlgorithm):
"""Abstract class for suggestion-based algorithms.
Custom search algorithms can extend this class easily by overriding the
`_suggest` method provide generated parameters for the trials.
`suggest` method provide generated parameters for the trials.
To track suggestions and their corresponding evaluations, the method
`_suggest` will be passed a trial_id, which will be used in
`suggest` will be passed a trial_id, which will be used in
subsequent notifications.
Example:
>>> suggester = SuggestionAlgorithm()
>>> suggester.add_configurations({ ... })
>>> new_parameters = suggester._suggest()
>>> new_parameters = suggester.suggest()
>>> suggester.on_trial_complete(trial_id, result)
>>> better_parameters = suggester._suggest()
>>> better_parameters = suggester.suggest()
"""
def __init__(self, use_early_stopped_trials=True):
"""Constructs a generator given experiment specifications.
"""
def __init__(self, metric=None, mode="max", use_early_stopped_trials=True):
"""Constructs a generator given experiment specifications."""
self._parser = make_parser()
self._trial_generator = []
self._counter = 0
self._finished = False
self._metric = metric
assert mode in ["min", "max"]
self._mode = mode
self._use_early_stopped = use_early_stopped_trials
def add_configurations(self, experiments):
@@ -47,7 +49,9 @@ class SuggestionAlgorithm(SearchAlgorithm):
for experiment in experiment_list:
self._trial_generator = itertools.chain(
self._trial_generator,
self._generate_trials(experiment.spec, experiment.name))
self._generate_trials(
experiment.spec.get("num_samples", 1), experiment.spec,
experiment.name))
def next_trials(self):
"""Provides a batch of Trial objects to be queued into the TrialRunner.
@@ -67,20 +71,20 @@ class SuggestionAlgorithm(SearchAlgorithm):
self._finished = True
return trials
def _generate_trials(self, experiment_spec, output_path=""):
"""Generates trials with configurations from `_suggest`.
def _generate_trials(self, num_samples, experiment_spec, output_path=""):
"""Generates trials with configurations from `suggest`.
Creates a trial_id that is passed into `_suggest`.
Creates a trial_id that is passed into `suggest`.
Yields:
Trial objects constructed according to `spec`
"""
if "run" not in experiment_spec:
raise TuneError("Must specify `run` in {}".format(experiment_spec))
for _ in range(experiment_spec.get("num_samples", 1)):
for _ in range(num_samples):
trial_id = Trial.generate_id()
while True:
suggested_config = self._suggest(trial_id)
suggested_config = self.suggest(trial_id)
if suggested_config is None:
yield None
else:
@@ -103,7 +107,7 @@ class SuggestionAlgorithm(SearchAlgorithm):
def is_finished(self):
return self._finished
def _suggest(self, trial_id):
def suggest(self, trial_id):
"""Queries the algorithm to retrieve the next set of parameters.
Arguments:
@@ -117,11 +121,11 @@ class SuggestionAlgorithm(SearchAlgorithm):
Example:
>>> suggester = SuggestionAlgorithm(max_concurrent=1)
>>> suggester.add_configurations({ ... })
>>> parameters_1 = suggester._suggest()
>>> parameters_2 = suggester._suggest()
>>> parameters_1 = suggester.suggest()
>>> parameters_2 = suggester.suggest()
>>> parameters_2 is None
>>> suggester.on_trial_complete(trial_id, result)
>>> parameters_2 = suggester._suggest()
>>> parameters_2 = suggester.suggest()
>>> parameters_2 is not None
"""
raise NotImplementedError
@@ -132,6 +136,16 @@ class SuggestionAlgorithm(SearchAlgorithm):
def restore(self, checkpoint_dir):
raise NotImplementedError
@property
def metric(self):
"""The training result objective value attribute."""
return self._metric
@property
def mode(self):
"""Specifies if minimizing or maximizing the metric."""
return self._mode
class _MockSuggestionAlgorithm(SuggestionAlgorithm):
def __init__(self, max_concurrent=2, **kwargs):
@@ -143,7 +157,7 @@ class _MockSuggestionAlgorithm(SuggestionAlgorithm):
self.results = []
super(_MockSuggestionAlgorithm, self).__init__(**kwargs)
def _suggest(self, trial_id):
def suggest(self, trial_id):
if len(self.live_trials) < self._max_concurrent and not self.stall:
self.live_trials[trial_id] = 1
return {"test_variable": 2}
+42 -2
View File
@@ -13,6 +13,7 @@ from ray.tune.experiment import Experiment
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.resources import Resources, json_to_resources, resources_to_json
from ray.tune.suggest.repeater import Repeater
from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm,
SuggestionAlgorithm)
@@ -252,7 +253,7 @@ class TrialRunnerTest3(unittest.TestCase):
self._finished = True
return trials
def _suggest(self, trial_id):
def suggest(self, trial_id):
return {}
ray.init(num_cpus=2)
@@ -467,9 +468,13 @@ class TrialRunnerTest3(unittest.TestCase):
class SearchAlgorithmTest(unittest.TestCase):
def tearDown(self):
ray.shutdown()
_register_all()
def testNestedSuggestion(self):
class TestSuggestion(SuggestionAlgorithm):
def _suggest(self, trial_id):
def suggest(self, trial_id):
return {"a": {"b": {"c": {"d": 4, "e": 5}}}}
alg = TestSuggestion()
@@ -478,6 +483,41 @@ class SearchAlgorithmTest(unittest.TestCase):
self.assertTrue("e=5" in trial.experiment_tag)
self.assertTrue("d=4" in trial.experiment_tag)
def _test_repeater(self, repeat):
ray.init(num_cpus=4)
class TestSuggestion(SuggestionAlgorithm):
count = 0
def suggest(self, trial_id):
return {"test_variable": 5}
def on_trial_complete(self, *args, **kwargs):
self.count += 1
alg = TestSuggestion(metric="episode_reward_mean")
repeat_alg = Repeater(alg, repeat=repeat, set_index=False)
experiment_spec = {
"run": "__fake",
"num_samples": 1,
"stop": {
"training_iteration": 1
}
}
repeat_alg.add_configurations({"test": experiment_spec})
runner = TrialRunner(search_alg=repeat_alg)
for i in range(repeat * 2):
runner.step()
trials = runner.get_trials()
self.assertEquals(len(trials), repeat)
def testRepeat1(self):
self._test_repeater(repeat=1)
def testRepeat4(self):
self._test_repeater(repeat=4)
class ResourcesTest(unittest.TestCase):
def testSubtraction(self):