From d7b64ef2797370cb71b02cee6f2d9b12afe16aac Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Tue, 9 Jun 2020 12:09:39 -0700 Subject: [PATCH] [tune] BayesOpt - finish early when optimizer converges (#8808) --- python/ray/tune/BUILD | 8 ++ python/ray/tune/suggest/bayesopt.py | 79 +++++++++++++++---- python/ray/tune/suggest/suggestion.py | 18 ++++- .../test_convergence_gaussian_process.py | 47 +++++++++++ 4 files changed, 134 insertions(+), 18 deletions(-) create mode 100644 python/ray/tune/tests/test_convergence_gaussian_process.py diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD index 73075c9a4..8bfb9ddde 100644 --- a/python/ray/tune/BUILD +++ b/python/ray/tune/BUILD @@ -36,6 +36,14 @@ py_test( tags = ["exclusive"], ) +py_test( + name = "test_convergence_gaussian_process", + size = "small", + srcs = ["tests/test_convergence_gaussian_process.py"], + deps = [":tune_lib"], + tags = ["exclusive"], +) + py_test( name = "test_dependency", size = "small", diff --git a/python/ray/tune/suggest/bayesopt.py b/python/ray/tune/suggest/bayesopt.py index 63beca11f..cd0351d31 100644 --- a/python/ray/tune/suggest/bayesopt.py +++ b/python/ray/tune/suggest/bayesopt.py @@ -1,16 +1,29 @@ import copy +from collections import defaultdict import logging import pickle +import json try: # Python 3 only -- needed for lint test. import bayes_opt as byo except ImportError: byo = None from ray.tune.suggest import Searcher +from ray.tune.utils import flatten_dict logger = logging.getLogger(__name__) +def _dict_hash(config, precision): + flatconfig = flatten_dict(config) + for param, value in flatconfig.items(): + if isinstance(value, float): + flatconfig[param] = "{:.{digits}f}".format(value, digits=precision) + + hashed = json.dumps(flatconfig, sort_keys=True, default=str) + return hashed + + class BayesOptSearch(Searcher): """Uses fmfn/BayesianOptimization to optimize hyperparameters. @@ -70,6 +83,8 @@ class BayesOptSearch(Searcher): random_state=42, random_search_steps=10, verbose=0, + patience=5, + skip_duplicate=True, analysis=None, max_concurrent=None, use_early_stopped_trials=None): @@ -88,6 +103,14 @@ class BayesOptSearch(Searcher): random_search_steps (int): Number of initial random searches. This is necessary to avoid initial local overfitting of the Bayesian process. + patience (int): Must be > 0. If the optimizer suggests a set of + hyperparameters more than 'patience' times, + then the whole experiment will stop. + skip_duplicate (bool): If true, BayesOptSearch will not create + a trial with a previously seen set of hyperparameters. By + default, floating values will be reduced to a digit precision + of 5. You can override this by setting + ``searcher.repeat_float_precision``. analysis (ExperimentAnalysis): Optionally, the previous analysis to integrate. verbose (int): Sets verbosity level for BayesOpt packages. @@ -99,6 +122,13 @@ class BayesOptSearch(Searcher): " the command: `pip install bayesian-optimization`.") assert mode in ["min", "max"], "`mode` must be 'min' or 'max'!" self.max_concurrent = max_concurrent + self._config_counter = defaultdict(int) + self._patience = patience + # int: Precision at which to hash values. + self.repeat_float_precision = 5 + if self._patience <= 0: + raise ValueError("patience must be set to a value greater than 0!") + self._skip_duplicate = skip_duplicate super(BayesOptSearch, self).__init__( metric=metric, mode=mode, @@ -120,7 +150,7 @@ class BayesOptSearch(Searcher): self._metric_op = -1. self._live_trial_mapping = {} - self._cached_results = [] + self._buffered_trial_results = [] self.random_search_trials = random_search_steps self._total_random_search_trials = 0 @@ -150,24 +180,41 @@ class BayesOptSearch(Searcher): # we stop the suggestion and return None. return None + # We compute the new point to explore + config = self.optimizer.suggest(self.utility) + + config_hash = _dict_hash(config, self.repeat_float_precision) + # Check if already computed + already_seen = config_hash in self._config_counter + self._config_counter[config_hash] += 1 + top_repeats = max(self._config_counter.values()) + + # If patience is set and we've repeated a trial numerous times, + # we terminate the experiment. + if self._patience is not None and top_repeats > self._patience: + return Searcher.FINISHED + # If we have seen a value before, we'll skip it. + if already_seen and self._skip_duplicate: + logger.info("Skipping duplicated config: {}.".format(config)) + return None + # If we are still in the random search part and we are waiting for # trials to complete - if len(self._cached_results) < self.random_search_trials: + if len(self._buffered_trial_results) < self.random_search_trials: # We check if we have already maxed out the number of requested # random search trials if self._total_random_search_trials == self.random_search_trials: # If so we stop the suggestion and return None return None # Otherwise we increase the total number of rndom search trials - self._total_random_search_trials += 1 + if config: + self._total_random_search_trials += 1 - # We compute the new point to explore - new_trial = self.optimizer.suggest(self.utility) # Save the new trial to the trial mapping - self._live_trial_mapping[trial_id] = new_trial + self._live_trial_mapping[trial_id] = config # Return a deep copy of the mapping - return copy.deepcopy(new_trial) + return copy.deepcopy(config) def register_analysis(self, analysis): """Integrate the given analysis into the gaussian process. @@ -205,18 +252,18 @@ class BayesOptSearch(Searcher): return # If we don't have to execute some random search steps - if len(self._cached_results) >= self.random_search_trials: + if len(self._buffered_trial_results) >= self.random_search_trials: # we simply register the obtained result self._register_result(params, result) return # We store the results into a temporary cache - self._cached_results.append((params, result)) + self._buffered_trial_results.append((params, result)) # If the random search finished, # we update the BO with all the computer points. - if len(self._cached_results) == self.random_search_trials: - for params, result in self._cached_results: + if len(self._buffered_trial_results) == self.random_search_trials: + for params, result in self._buffered_trial_results: self._register_result(params, result) def _register_result(self, params, result): @@ -226,11 +273,13 @@ class BayesOptSearch(Searcher): def save(self, checkpoint_dir): """Storing current optimizer state.""" with open(checkpoint_dir, "wb") as f: - pickle.dump((self.optimizer, self._cached_results, - self._total_random_search_trials), f) + pickle.dump( + (self.optimizer, self._buffered_trial_results, + self._total_random_search_trials, self._config_counter), f) def restore(self, checkpoint_dir): """Restoring current optimizer state.""" with open(checkpoint_dir, "rb") as f: - (self.optimizer, self._cached_results, - self._total_random_search_trials) = pickle.load(f) + (self.optimizer, self._buffered_trial_results, + self._total_random_search_trials, + self._config_counter) = pickle.load(f) diff --git a/python/ray/tune/suggest/suggestion.py b/python/ray/tune/suggest/suggestion.py index 68d2e49b5..f535ec4ec 100644 --- a/python/ray/tune/suggest/suggestion.py +++ b/python/ray/tune/suggest/suggestion.py @@ -57,6 +57,7 @@ class Searcher: """ + FINISHED = "FINISHED" def __init__(self, metric="episode_reward_mean", @@ -120,7 +121,11 @@ class Searcher: trial_id (str): Trial ID used for subsequent notifications. Returns: - dict|None: Configuration for a trial, if possible. + dict | FINISHED | None: Configuration for a trial, if possible. + If FINISHED is returned, Tune will be notified that + no more suggestions/configurations will be provided. + If None is returned, Tune will skip the querying of the + searcher for this step. """ raise NotImplementedError @@ -172,8 +177,10 @@ class ConcurrencyLimiter(Searcher): def suggest(self, trial_id): if len(self.live_trials) >= self.max_concurrent: return - self.live_trials.add(trial_id) - return self.searcher.suggest(trial_id) + suggestion = self.searcher.suggest(trial_id) + if suggestion not in (None, Searcher.FINISHED): + self.live_trials.add(trial_id) + return suggestion def on_trial_complete(self, trial_id, result=None, error=False): if trial_id not in self.live_trials: @@ -251,6 +258,11 @@ class SearchGenerator(SearchAlgorithm): logger.debug("creating trial") trial_id = Trial.generate_id() suggested_config = self.searcher.suggest(trial_id) + if suggested_config == Searcher.FINISHED: + self._finished = True + logger.debug("Searcher has finished.") + return + if suggested_config is None: return spec = copy.deepcopy(experiment_spec) diff --git a/python/ray/tune/tests/test_convergence_gaussian_process.py b/python/ray/tune/tests/test_convergence_gaussian_process.py new file mode 100644 index 000000000..c81eff8ef --- /dev/null +++ b/python/ray/tune/tests/test_convergence_gaussian_process.py @@ -0,0 +1,47 @@ +import numpy as np + +import ray +from ray import tune +from ray.tune.suggest.bayesopt import BayesOptSearch +from ray.tune.suggest import ConcurrencyLimiter +import unittest + + +def loss(config, reporter): + x = config.get("x") + reporter(loss=x**2) # A simple function to optimize + + +class ConvergenceTest(unittest.TestCase): + """Test convergence in gaussian process.""" + + def test_convergence_gaussian_process(self): + np.random.seed(0) + ray.init(local_mode=True, num_cpus=1, num_gpus=1) + + space = { + "x": (0, 20) # This is the space of parameters to explore + } + + resources_per_trial = {"cpu": 1, "gpu": 0} + + # Following bayesian optimization + gp = BayesOptSearch( + space, metric="loss", mode="min", random_search_steps=10) + gp.repeat_float_precision = 5 + gp = ConcurrencyLimiter(gp, 1) + + # Execution of the BO. + analysis = tune.run( + loss, + # stop=EarlyStopping("loss", mode="min", patience=5), + search_alg=gp, + config={}, + num_samples=100, # Number of iterations + resources_per_trial=resources_per_trial, + raise_on_failed_trial=False, + fail_fast=True, + verbose=1) + assert len(analysis.trials) == 41 + + ray.shutdown()