mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 07:41:09 +08:00
[tune] BayesOpt - finish early when optimizer converges (#8808)
This commit is contained in:
@@ -36,6 +36,14 @@ py_test(
|
||||
tags = ["exclusive"],
|
||||
)
|
||||
|
||||
# Runs tests/test_convergence_gaussian_process.py against the Tune library.
py_test(
    name = "test_convergence_gaussian_process",
    size = "small",
    srcs = ["tests/test_convergence_gaussian_process.py"],
    deps = [":tune_lib"],
    tags = ["exclusive"],
)
|
||||
|
||||
py_test(
|
||||
name = "test_dependency",
|
||||
size = "small",
|
||||
|
||||
@@ -1,16 +1,29 @@
|
||||
import copy
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
import pickle
|
||||
import json
|
||||
try: # Python 3 only -- needed for lint test.
|
||||
import bayes_opt as byo
|
||||
except ImportError:
|
||||
byo = None
|
||||
|
||||
from ray.tune.suggest import Searcher
|
||||
from ray.tune.utils import flatten_dict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _dict_hash(config, precision):
    """Build a deterministic string key for a configuration.

    The config is first passed through ``flatten_dict``. Every ``float``
    value of the result is rendered as a fixed-point string with
    ``precision`` decimal places, so floats that agree after rounding to
    that many digits yield the same key. The mapping is then serialized as
    JSON with sorted keys; values JSON cannot encode natively are converted
    with ``str``.

    Args:
        config (dict): Configuration to hash.
        precision (int): Number of decimal places kept for float values.

    Returns:
        str: JSON string identifying the rounded configuration.
    """
    float_format = "{:.{digits}f}"
    rounded = {
        key: (float_format.format(val, digits=precision)
              if isinstance(val, float) else val)
        for key, val in flatten_dict(config).items()
    }
    # sort_keys makes the key independent of insertion order.
    return json.dumps(rounded, sort_keys=True, default=str)
|
||||
|
||||
|
||||
class BayesOptSearch(Searcher):
|
||||
"""Uses fmfn/BayesianOptimization to optimize hyperparameters.
|
||||
|
||||
@@ -70,6 +83,8 @@ class BayesOptSearch(Searcher):
|
||||
random_state=42,
|
||||
random_search_steps=10,
|
||||
verbose=0,
|
||||
patience=5,
|
||||
skip_duplicate=True,
|
||||
analysis=None,
|
||||
max_concurrent=None,
|
||||
use_early_stopped_trials=None):
|
||||
@@ -88,6 +103,14 @@ class BayesOptSearch(Searcher):
|
||||
random_search_steps (int): Number of initial random searches.
|
||||
This is necessary to avoid initial local overfitting
|
||||
of the Bayesian process.
|
||||
patience (int): Must be > 0. If the optimizer suggests a set of
|
||||
hyperparameters more than 'patience' times,
|
||||
then the whole experiment will stop.
|
||||
skip_duplicate (bool): If true, BayesOptSearch will not create
|
||||
a trial with a previously seen set of hyperparameters. By
|
||||
default, floating values will be reduced to a digit precision
|
||||
of 5. You can override this by setting
|
||||
``searcher.repeat_float_precision``.
|
||||
analysis (ExperimentAnalysis): Optionally, the previous analysis
|
||||
to integrate.
|
||||
verbose (int): Sets verbosity level for BayesOpt packages.
|
||||
@@ -99,6 +122,13 @@ class BayesOptSearch(Searcher):
|
||||
" the command: `pip install bayesian-optimization`.")
|
||||
assert mode in ["min", "max"], "`mode` must be 'min' or 'max'!"
|
||||
self.max_concurrent = max_concurrent
|
||||
self._config_counter = defaultdict(int)
|
||||
self._patience = patience
|
||||
# int: Precision at which to hash values.
|
||||
self.repeat_float_precision = 5
|
||||
if self._patience <= 0:
|
||||
raise ValueError("patience must be set to a value greater than 0!")
|
||||
self._skip_duplicate = skip_duplicate
|
||||
super(BayesOptSearch, self).__init__(
|
||||
metric=metric,
|
||||
mode=mode,
|
||||
@@ -120,7 +150,7 @@ class BayesOptSearch(Searcher):
|
||||
self._metric_op = -1.
|
||||
|
||||
self._live_trial_mapping = {}
|
||||
self._cached_results = []
|
||||
self._buffered_trial_results = []
|
||||
self.random_search_trials = random_search_steps
|
||||
self._total_random_search_trials = 0
|
||||
|
||||
@@ -150,24 +180,41 @@ class BayesOptSearch(Searcher):
|
||||
# we stop the suggestion and return None.
|
||||
return None
|
||||
|
||||
# We compute the new point to explore
|
||||
config = self.optimizer.suggest(self.utility)
|
||||
|
||||
config_hash = _dict_hash(config, self.repeat_float_precision)
|
||||
# Check if already computed
|
||||
already_seen = config_hash in self._config_counter
|
||||
self._config_counter[config_hash] += 1
|
||||
top_repeats = max(self._config_counter.values())
|
||||
|
||||
# If patience is set and we've repeated a trial numerous times,
|
||||
# we terminate the experiment.
|
||||
if self._patience is not None and top_repeats > self._patience:
|
||||
return Searcher.FINISHED
|
||||
# If we have seen a value before, we'll skip it.
|
||||
if already_seen and self._skip_duplicate:
|
||||
logger.info("Skipping duplicated config: {}.".format(config))
|
||||
return None
|
||||
|
||||
# If we are still in the random search part and we are waiting for
|
||||
# trials to complete
|
||||
if len(self._cached_results) < self.random_search_trials:
|
||||
if len(self._buffered_trial_results) < self.random_search_trials:
|
||||
# We check if we have already maxed out the number of requested
|
||||
# random search trials
|
||||
if self._total_random_search_trials == self.random_search_trials:
|
||||
# If so we stop the suggestion and return None
|
||||
return None
|
||||
# Otherwise we increase the total number of random search trials
|
||||
self._total_random_search_trials += 1
|
||||
if config:
|
||||
self._total_random_search_trials += 1
|
||||
|
||||
# We compute the new point to explore
|
||||
new_trial = self.optimizer.suggest(self.utility)
|
||||
# Save the new trial to the trial mapping
|
||||
self._live_trial_mapping[trial_id] = new_trial
|
||||
self._live_trial_mapping[trial_id] = config
|
||||
|
||||
# Return a deep copy of the mapping
|
||||
return copy.deepcopy(new_trial)
|
||||
return copy.deepcopy(config)
|
||||
|
||||
def register_analysis(self, analysis):
|
||||
"""Integrate the given analysis into the gaussian process.
|
||||
@@ -205,18 +252,18 @@ class BayesOptSearch(Searcher):
|
||||
return
|
||||
|
||||
# If we don't have to execute some random search steps
|
||||
if len(self._cached_results) >= self.random_search_trials:
|
||||
if len(self._buffered_trial_results) >= self.random_search_trials:
|
||||
# we simply register the obtained result
|
||||
self._register_result(params, result)
|
||||
return
|
||||
|
||||
# We store the results into a temporary cache
|
||||
self._cached_results.append((params, result))
|
||||
self._buffered_trial_results.append((params, result))
|
||||
|
||||
# If the random search finished,
|
||||
# we update the BO with all the computed points.
|
||||
if len(self._cached_results) == self.random_search_trials:
|
||||
for params, result in self._cached_results:
|
||||
if len(self._buffered_trial_results) == self.random_search_trials:
|
||||
for params, result in self._buffered_trial_results:
|
||||
self._register_result(params, result)
|
||||
|
||||
def _register_result(self, params, result):
|
||||
@@ -226,11 +273,13 @@ class BayesOptSearch(Searcher):
|
||||
def save(self, checkpoint_dir):
|
||||
"""Storing current optimizer state."""
|
||||
with open(checkpoint_dir, "wb") as f:
|
||||
pickle.dump((self.optimizer, self._cached_results,
|
||||
self._total_random_search_trials), f)
|
||||
pickle.dump(
|
||||
(self.optimizer, self._buffered_trial_results,
|
||||
self._total_random_search_trials, self._config_counter), f)
|
||||
|
||||
def restore(self, checkpoint_dir):
|
||||
"""Restoring current optimizer state."""
|
||||
with open(checkpoint_dir, "rb") as f:
|
||||
(self.optimizer, self._cached_results,
|
||||
self._total_random_search_trials) = pickle.load(f)
|
||||
(self.optimizer, self._buffered_trial_results,
|
||||
self._total_random_search_trials,
|
||||
self._config_counter) = pickle.load(f)
|
||||
|
||||
@@ -57,6 +57,7 @@ class Searcher:
|
||||
|
||||
|
||||
"""
|
||||
FINISHED = "FINISHED"
|
||||
|
||||
def __init__(self,
|
||||
metric="episode_reward_mean",
|
||||
@@ -120,7 +121,11 @@ class Searcher:
|
||||
trial_id (str): Trial ID used for subsequent notifications.
|
||||
|
||||
Returns:
|
||||
dict|None: Configuration for a trial, if possible.
|
||||
dict | FINISHED | None: Configuration for a trial, if possible.
|
||||
If FINISHED is returned, Tune will be notified that
|
||||
no more suggestions/configurations will be provided.
|
||||
If None is returned, Tune will skip the querying of the
|
||||
searcher for this step.
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
@@ -172,8 +177,10 @@ class ConcurrencyLimiter(Searcher):
|
||||
def suggest(self, trial_id):
|
||||
if len(self.live_trials) >= self.max_concurrent:
|
||||
return
|
||||
self.live_trials.add(trial_id)
|
||||
return self.searcher.suggest(trial_id)
|
||||
suggestion = self.searcher.suggest(trial_id)
|
||||
if suggestion not in (None, Searcher.FINISHED):
|
||||
self.live_trials.add(trial_id)
|
||||
return suggestion
|
||||
|
||||
def on_trial_complete(self, trial_id, result=None, error=False):
|
||||
if trial_id not in self.live_trials:
|
||||
@@ -251,6 +258,11 @@ class SearchGenerator(SearchAlgorithm):
|
||||
logger.debug("creating trial")
|
||||
trial_id = Trial.generate_id()
|
||||
suggested_config = self.searcher.suggest(trial_id)
|
||||
if suggested_config == Searcher.FINISHED:
|
||||
self._finished = True
|
||||
logger.debug("Searcher has finished.")
|
||||
return
|
||||
|
||||
if suggested_config is None:
|
||||
return
|
||||
spec = copy.deepcopy(experiment_spec)
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
import numpy as np
|
||||
|
||||
import ray
|
||||
from ray import tune
|
||||
from ray.tune.suggest.bayesopt import BayesOptSearch
|
||||
from ray.tune.suggest import ConcurrencyLimiter
|
||||
import unittest
|
||||
|
||||
|
||||
def loss(config, reporter):
    """Report the square of ``config["x"]`` under the ``loss`` metric.

    Args:
        config (dict): Trial configuration; the value stored under ``"x"``
            is the point being evaluated.
        reporter (callable): Callback invoked once with ``loss=<value>``.
    """
    # A simple function to optimize.
    reporter(loss=config.get("x") ** 2)
|
||||
|
||||
|
||||
class ConvergenceTest(unittest.TestCase):
    """Test convergence in gaussian process."""

    def test_convergence_gaussian_process(self):
        """Run BayesOptSearch on ``loss`` and check the trial count.

        The experiment is given a budget of 100 samples; the test pins the
        number of trials that are actually created to 41.
        """
        np.random.seed(0)
        ray.init(local_mode=True, num_cpus=1, num_gpus=1)

        # Shut Ray down even when tune.run raises or the assertion fails, so
        # a failure here cannot leave an initialized runtime behind for
        # whatever runs next in the same process.
        try:
            space = {
                "x": (0, 20)  # This is the space of parameters to explore
            }

            resources_per_trial = {"cpu": 1, "gpu": 0}

            # Following bayesian optimization
            gp = BayesOptSearch(
                space, metric="loss", mode="min", random_search_steps=10)
            gp.repeat_float_precision = 5
            # Allow only one live trial at a time.
            gp = ConcurrencyLimiter(gp, 1)

            # Execution of the BO.
            analysis = tune.run(
                loss,
                search_alg=gp,
                config={},
                num_samples=100,  # Number of iterations
                resources_per_trial=resources_per_trial,
                raise_on_failed_trial=False,
                fail_fast=True,
                verbose=1)

            # assertEqual reports both values on failure and, unlike a bare
            # assert, is not stripped when Python runs with -O.
            self.assertEqual(len(analysis.trials), 41)
        finally:
            ray.shutdown()
|
||||
Reference in New Issue
Block a user