mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 23:54:34 +08:00
[tune] PB2 (#11466)
Co-authored-by: Sumanth Ratna <sumanthratna@gmail.com> Co-authored-by: Amog Kamsetty <amogkamsetty@yahoo.com> Co-authored-by: Amog Kamsetty <amogkam@users.noreply.github.com> Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
committed by
GitHub
parent
349c3ec86b
commit
e7aafd7d24
+10
-1
@@ -80,7 +80,7 @@ py_test(
|
||||
|
||||
py_test(
|
||||
name = "test_experiment_analysis_mem",
|
||||
size = "small",
|
||||
size = "medium",
|
||||
srcs = ["tests/test_experiment_analysis_mem.py"],
|
||||
deps = [":tune_lib"],
|
||||
)
|
||||
@@ -520,6 +520,15 @@ py_test(
|
||||
args = ["--smoke-test"]
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "pb2_example",
|
||||
size = "medium",
|
||||
srcs = ["examples/pb2_example.py"],
|
||||
deps = [":tune_lib"],
|
||||
tags = ["exclusive", "example"],
|
||||
args = ["--smoke-test"]
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "pbt_convnet_example",
|
||||
size = "medium",
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import argparse
|
||||
|
||||
import ray
|
||||
from ray import tune
|
||||
from ray.tune.schedulers import PB2
|
||||
from ray.tune.examples.pbt_function import pbt_function
|
||||
|
||||
if __name__ == "__main__":
    # Minimal PB2 demo: tune the learning rate of `pbt_function`.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing")
    args, _ = cli.parse_known_args()

    # With only two CPUs the eight trials cannot all run at once, which
    # forces pausing to happen for the smoke test.
    init_kwargs = {"num_cpus": 2} if args.smoke_test else {}
    ray.init(**init_kwargs)

    # Hyperparameter bounds explored by PB2.
    lr_bounds = {"lr": [0.0001, 0.02]}
    scheduler = PB2(
        time_attr="training_iteration",
        metric="mean_accuracy",
        mode="max",
        perturbation_interval=20,
        hyperparam_bounds=lr_bounds)

    initial_config = {
        "lr": 0.0001,
        # note: this parameter is perturbed but has no effect on
        # the model training in this example
        "some_other_factor": 1,
    }
    stopping_criteria = {"training_iteration": 30}

    tune.run(
        pbt_function,
        name="pbt_test",
        scheduler=scheduler,
        verbose=False,
        stop=stopping_criteria,
        num_samples=8,
        fail_fast=True,
        config=initial_config)
|
||||
@@ -0,0 +1,145 @@
|
||||
import os
|
||||
import random
|
||||
import argparse
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
|
||||
import ray
|
||||
from ray.tune import run, sample_from
|
||||
from ray.tune.schedulers import PB2, PopulationBasedTraining
|
||||
|
||||
|
||||
# Post-process a perturbed config so that it is still valid (used with PBT).
def explore(config):
    """Repair a perturbed config in place and return it.

    Args:
        config (dict): Must contain "train_batch_size",
            "sgd_minibatch_size" and "lambda".

    Returns:
        dict: The same dict object, with "train_batch_size" raised to at
        least twice "sgd_minibatch_size" and cast to int, and "lambda"
        capped at 1.
    """
    # Make sure enough timesteps are collected to do sgd.
    smallest_batch = config["sgd_minibatch_size"] * 2
    if config["train_batch_size"] < smallest_batch:
        config["train_batch_size"] = smallest_batch

    # `lambda` may not exceed 1.
    if config["lambda"] > 1:
        config["lambda"] = 1

    # Perturbation may have produced a float batch size.
    config["train_batch_size"] = int(config["train_batch_size"])
    return config
|
||||
|
||||
|
||||
if __name__ == "__main__":

    def _str2bool(value):
        """Parse a command-line boolean.

        `type=bool` would turn every non-empty string (including "False")
        into True, so the flag value is compared against known spellings.
        """
        return str(value).strip().lower() in ("1", "true", "t", "yes", "y")

    parser = argparse.ArgumentParser()
    parser.add_argument("--max", type=int, default=1000000)
    parser.add_argument("--algo", type=str, default="PPO")
    parser.add_argument("--num_workers", type=int, default=4)
    parser.add_argument("--num_samples", type=int, default=4)
    parser.add_argument("--t_ready", type=int, default=50000)
    parser.add_argument("--seed", type=int, default=0)
    parser.add_argument(
        "--horizon", type=int, default=1600)  # make this 1000 for other envs
    parser.add_argument("--perturb", type=float, default=0.25)  # if using PBT
    parser.add_argument("--env_name", type=str, default="BipedalWalker-v2")
    parser.add_argument(
        "--criteria", type=str,
        default="timesteps_total")  # "training_iteration", "time_total_s"
    parser.add_argument(
        "--net", type=str, default="32_32"
    )  # May be important to use a larger network for bigger tasks.
    parser.add_argument("--filename", type=str, default="")
    parser.add_argument("--method", type=str, default="pb2")  # ['pbt', 'pb2']
    parser.add_argument("--save_csv", type=_str2bool, default=False)

    args = parser.parse_args()
    ray.init()

    # bipedalwalker needs 1600
    # NOTE(review): the --horizon argument is parsed but not consulted here;
    # the horizon is derived from the environment name only.
    if args.env_name in ["BipedalWalker-v2", "BipedalWalker-v3"]:
        horizon = 1600
    else:
        horizon = 1000

    pbt = PopulationBasedTraining(
        time_attr=args.criteria,
        metric="episode_reward_mean",
        mode="max",
        perturbation_interval=args.t_ready,
        resample_probability=args.perturb,
        quantile_fraction=args.perturb,  # copy bottom % with top %
        # Specifies the search space for these hyperparams
        hyperparam_mutations={
            "lambda": lambda: random.uniform(0.9, 1.0),
            "clip_param": lambda: random.uniform(0.1, 0.5),
            "lr": lambda: random.uniform(1e-5, 1e-3),
            "train_batch_size": lambda: random.randint(1000, 60000),
        },
        custom_explore_fn=explore)

    pb2 = PB2(
        time_attr=args.criteria,
        metric="episode_reward_mean",
        mode="max",
        perturbation_interval=args.t_ready,
        quantile_fraction=args.perturb,  # copy bottom % with top %
        # Specifies the hyperparam search space as [min, max] pairs.
        hyperparam_bounds={
            "lambda": [0.9, 1.0],
            "clip_param": [0.1, 0.5],
            "lr": [1e-5, 1e-3],
            "train_batch_size": [1000, 60000]
        })

    methods = {"pbt": pbt, "pb2": pb2}

    # Take a single timestamp so the date and time parts are consistent.
    now = datetime.now()
    timelog = str(now.date()) + "_" + str(now.time())

    args.dir = "{}_{}_{}_Size{}_{}_{}".format(args.algo,
                                              args.filename, args.method,
                                              str(args.num_samples),
                                              args.env_name, args.criteria)

    # "--net 32_32" -> two hidden layer widths.
    net_widths = args.net.split("_")

    analysis = run(
        args.algo,
        name="{}_{}_{}_seed{}_{}".format(timelog, args.method, args.env_name,
                                         str(args.seed), args.filename),
        scheduler=methods[args.method],
        verbose=1,
        num_samples=args.num_samples,
        stop={args.criteria: args.max},
        config={
            "env": args.env_name,
            "log_level": "INFO",
            "seed": args.seed,
            "kl_coeff": 1.0,
            "num_gpus": 0,
            "horizon": horizon,
            "observation_filter": "MeanStdFilter",
            "model": {
                "fcnet_hiddens": [int(net_widths[0]),
                                  int(net_widths[1])],
                "free_log_std": True
            },
            "num_sgd_iter": 10,
            "sgd_minibatch_size": 128,
            "lambda": sample_from(lambda spec: random.uniform(0.9, 1.0)),
            "clip_param": sample_from(lambda spec: random.uniform(0.1, 0.5)),
            "lr": sample_from(lambda spec: random.uniform(1e-5, 1e-3)),
            "train_batch_size": sample_from(
                lambda spec: random.randint(1000, 60000))
        })

    all_dfs = analysis.trial_dataframes

    # Collect the per-trial frames first and concatenate once.
    frames = []
    for i, name in enumerate(list(all_dfs)[:args.num_samples]):
        # .copy() so that adding "Agent" does not write into a slice of the
        # trial dataframe (avoids pandas' chained-assignment warning).
        df = all_dfs[name][[
            "timesteps_total", "episodes_total", "episode_reward_mean",
            "info/learner/default_policy/cur_kl_coeff"
        ]].copy()
        df["Agent"] = i
        frames.append(df)

    if frames:
        results = pd.concat(frames).reset_index(drop=True)
    else:
        results = pd.DataFrame()

    if args.save_csv:
        out_dir = os.path.join("data", args.dir)
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(out_dir, exist_ok=True)
        results.to_csv(
            os.path.join(out_dir, "seed{}.csv".format(str(args.seed))))
|
||||
@@ -6,6 +6,7 @@ from ray.tune.schedulers.async_hyperband import (AsyncHyperBandScheduler,
|
||||
from ray.tune.schedulers.median_stopping_rule import MedianStoppingRule
|
||||
from ray.tune.schedulers.pbt import (PopulationBasedTraining,
|
||||
PopulationBasedTrainingReplay)
|
||||
from ray.tune.schedulers.pb2 import PB2
|
||||
|
||||
|
||||
def create_scheduler(
|
||||
@@ -37,6 +38,7 @@ def create_scheduler(
|
||||
"hb_bohb": HyperBandForBOHB,
|
||||
"pbt": PopulationBasedTraining,
|
||||
"pbt_replay": PopulationBasedTrainingReplay,
|
||||
"pb2": PB2,
|
||||
}
|
||||
scheduler = scheduler.lower()
|
||||
if scheduler not in SCHEDULER_IMPORT:
|
||||
@@ -52,5 +54,5 @@ __all__ = [
|
||||
"TrialScheduler", "HyperBandScheduler", "AsyncHyperBandScheduler",
|
||||
"ASHAScheduler", "MedianStoppingRule", "FIFOScheduler",
|
||||
"PopulationBasedTraining", "PopulationBasedTrainingReplay",
|
||||
"HyperBandForBOHB"
|
||||
"HyperBandForBOHB", "PB2"
|
||||
]
|
||||
|
||||
@@ -0,0 +1,376 @@
|
||||
from copy import deepcopy
|
||||
import logging
|
||||
from typing import Dict, Optional
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from ray.tune import TuneError
|
||||
|
||||
from ray.tune.schedulers import PopulationBasedTraining
|
||||
|
||||
try:
|
||||
import GPy
|
||||
_has_gpy = True
|
||||
except ImportError:
|
||||
_has_gpy = False
|
||||
|
||||
try:
|
||||
import sklearn # noqa: F401
|
||||
_has_sklearn = True
|
||||
except ImportError:
|
||||
_has_sklearn = False
|
||||
|
||||
|
||||
def is_gpy_available():
    """Return True if the module-level `import GPy` attempt succeeded."""
    return _has_gpy
|
||||
|
||||
|
||||
def is_sklearn_available():
    """Return True if the module-level `import sklearn` attempt succeeded."""
    return _has_sklearn
|
||||
|
||||
|
||||
if is_gpy_available():
|
||||
from ray.tune.schedulers.pb2_utils import normalize, optimize_acq, \
|
||||
select_length, UCB, standardize, TV_SquaredExp
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def select_config(Xraw, yraw, current, newpoint, bounds, num_f):
    """Selects the next hyperparameter config to try.

    This function takes the formatted data, fits the GP model and optimizes the
    UCB acquisition function to select the next point.

    Args:
        Xraw (np.array): The un-normalized data. The first `num_f` columns
            are the fixed values (Time and Reward), the remaining columns
            are the hyperparams, in the order of `bounds`.
        yraw (np.array): The un-normalized vector of reward changes.
        current (np.array): The hyperparams of trials currently running, one
            row per trial, or None. This is important so we do not select
            the same config twice. If there is data here then we fit a
            second GP including it (with fake y labels). The GP variance
            doesn't depend on the y labels so it is ok.
        newpoint (np.array): The Reward and Time for the new point.
            We cannot change these as they are based on the *new weights*.
        bounds (dict): Bounds for the hyperparameters. Used to normalize.
        num_f (int): The number of fixed params. Almost always 2 (reward+time)

    Return:
        xt (np.array): A float32 vector of new hyperparameters, mapped back
            from the unit interval to the range given by `bounds`.
    """
    # Keep only the most recent `length` rows for the GP fit.
    length = select_length(Xraw, yraw, bounds, num_f)

    Xraw = Xraw[-length:, :]
    yraw = yraw[-length:]

    # Row-stacked limits: observed max/min for the fixed columns, followed
    # by the user-supplied bounds for the hyperparameter columns.
    base_vals = np.array(list(bounds.values())).T
    oldpoints = Xraw[:, :num_f]
    old_lims = np.concatenate((np.max(oldpoints, axis=0),
                               np.min(oldpoints, axis=0))).reshape(
                                   2, oldpoints.shape[1])
    limits = np.concatenate((old_lims, base_vals), axis=1)

    X = normalize(Xraw, limits)
    y = standardize(yraw).reshape(yraw.size, 1)

    # The new point's fixed values, scaled by the observed fixed columns.
    fixed = normalize(newpoint, oldpoints)

    kernel = TV_SquaredExp(
        input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1)

    # NOTE(review): in both fallbacks below np.eye(X.shape[0]) is square
    # (rows x rows) while X is rows x columns, so the in-place add only
    # broadcasts when those sizes match -- confirm the intended jitter.
    try:
        m = GPy.models.GPRegression(X, y, kernel)
    except np.linalg.LinAlgError:
        # add diagonal ** we would ideally make this something more robust...
        X += np.eye(X.shape[0]) * 1e-3
        m = GPy.models.GPRegression(X, y, kernel)

    try:
        m.optimize()
    except np.linalg.LinAlgError:
        # add diagonal ** we would ideally make this something more robust...
        X += np.eye(X.shape[0]) * 1e-3
        m = GPy.models.GPRegression(X, y, kernel)
        m.optimize()

    # Clip the fitted lengthscale to [1e-5, 1] and fix it at that value.
    m.kern.lengthscale.fix(m.kern.lengthscale.clip(1e-5, 1))

    if current is None:
        # No in-flight trials: the variance model is a copy of the mean model.
        m1 = deepcopy(m)
    else:
        # add the current trials to the dataset
        padding = np.array([fixed for _ in range(current.shape[0])])
        current = normalize(current, base_vals)
        current = np.hstack((padding, current))

        Xnew = np.vstack((X, current))
        # Placeholder labels (zeros) for the trials still running.
        ypad = np.zeros(current.shape[0])
        ypad = ypad.reshape(-1, 1)
        ynew = np.vstack((y, ypad))

        # kernel = GPy.kern.RBF(input_dim=X.shape[1], variance=1.,
        # lengthscale=1.)
        kernel = TV_SquaredExp(
            input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1)
        m1 = GPy.models.GPRegression(Xnew, ynew, kernel)
        m1.optimize()

    xt = optimize_acq(UCB, m, m1, fixed, num_f)

    # convert back...
    xt = xt * (np.max(base_vals, axis=0) - np.min(base_vals, axis=0)) + np.min(
        base_vals, axis=0)

    xt = xt.astype(np.float32)
    return (xt)
|
||||
|
||||
|
||||
def explore(data, bounds, current, base, old, config):
    """Returns next hyperparameter configuration to use.

    This function primarily processes the data from completed trials
    and then requests the next config from the select_config function.
    It then adds the new trial to the dataframe, so that the reward change
    can be computed using the new weights.
    It returns the new point and the dataframe with the new entry.

    Args:
        data (pd.DataFrame): One row per recorded result with the columns
            "Trial", "Time", one column per key of `bounds`, and "Reward".
        bounds (dict): Hyperparameter name -> bounds pair.
        current (np.array): Hyperparams already selected in this round, or
            None. Passed through to select_config.
        base: The trial whose latest row supplies Time and Reward for the
            new point; matched against the "Trial" column via str(base).
        old: Value written to the "Trial" column of the appended row.
        config (dict): Config to copy and overwrite with the new values.

    Returns:
        tuple: (new_config, data). If no usable rows exist for `base`,
        new_config is an unmodified copy of `config` and `data` is returned
        unchanged.
    """
    hparam_names = list(bounds)
    group_keys = ["Trial"] + hparam_names

    df = data.sort_values(by="Time").reset_index(drop=True)

    # Group by trial ID and hyperparams.
    # Compute change in timesteps and reward.
    df["y"] = df.groupby(group_keys)["Reward"].diff()
    df["t_change"] = df.groupby(group_keys)["Time"].diff()

    # Delete entries without positive change in t.
    df = df[df["t_change"] > 0].reset_index(drop=True)
    df["R_before"] = df.Reward - df.y

    # Normalize the reward change by the update size.
    # For example if trials took diff lengths of time.
    df["y"] = df.y / df.t_change
    df = df[~df.y.isna()].reset_index(drop=True)
    df = df.sort_values(by="Time").reset_index(drop=True)

    # Only use the last 1k datapoints, so the GP is not too slow.
    df = df.iloc[-1000:, :].reset_index(drop=True)

    # We need this to know the T and Reward for the weights.
    dfnewpoint = df[df["Trial"] == str(base)]

    if dfnewpoint.empty:
        return config.copy(), data

    # Latest usable row of the base trial; computed once and reused.
    base_row = dfnewpoint.iloc[-1, :]

    # Now specify the dataset for the GP.
    y = np.array(df.y.values)
    # Meta data we keep -> episodes and reward.
    # (TODO: convert to curve)
    t_r = df[["Time", "R_before"]]
    # Index with a list: a dict_keys view is not a reliable column indexer.
    hparams = df[hparam_names]
    X = pd.concat([t_r, hparams], axis=1).values
    newpoint = base_row[["Time", "R_before"]].values
    new = select_config(
        X, y, current, newpoint, bounds, num_f=len(t_r.columns))

    new_config = config.copy()
    values = []
    for i, col in enumerate(hparams.columns):
        # Keep integer hyperparameters integral.
        value = int(new[i]) if isinstance(config[col], int) else new[i]
        new_config[col] = value
        values.append(value)

    new_T = base_row["Time"]
    new_Reward = base_row.Reward

    lst = [[old] + [new_T] + values + [new_Reward]]
    cols = ["Trial", "Time"] + hparam_names + ["Reward"]
    new_entry = pd.DataFrame(lst, columns=cols)

    # Create an entry for the new config, with the reward from the
    # copied agent.
    data = pd.concat([data, new_entry]).reset_index(drop=True)

    return new_config, data
|
||||
|
||||
|
||||
class PB2(PopulationBasedTraining):
    """Implements the Population Based Bandit (PB2) algorithm.

    PB2 trains a group of models (or agents) in parallel. Periodically, poorly
    performing models clone the state of the top performers, and the hyper-
    parameters are re-selected using GP-bandit optimization. The GP model is
    trained to predict the improvement in the next training period.

    Like PBT, PB2 adapts hyperparameters during training time. This enables
    very fast hyperparameter discovery and also automatically discovers
    schedules.

    This Tune PB2 implementation is built on top of Tune's PBT implementation.
    It considers all trials added as part of the PB2 population. If the number
    of trials exceeds the cluster capacity, they will be time-multiplexed as to
    balance training progress across the population. To run multiple trials,
    use `tune.run(num_samples=<int>)`.

    In {LOG_DIR}/{MY_EXPERIMENT_NAME}/, all mutations are logged in
    `pb2_global.txt` and individual policy perturbations are recorded
    in pb2_policy_{i}.txt. Tune logs: [target trial tag, clone trial tag,
    target trial iteration, clone trial iteration, old config, new config]
    on each perturbation step.

    Args:
        time_attr (str): The training result attr to use for comparing time.
            Note that you can pass in something non-temporal such as
            `training_iteration` as a measure of progress, the only requirement
            is that the attribute should increase monotonically.
        reward_attr (str): Forwarded unchanged to the PBT base class.
        metric (str): The training result objective value attribute. Stopping
            procedures will use this attribute.
        mode (str): One of {min, max}. Determines whether objective is
            minimizing or maximizing the metric attribute.
        perturbation_interval (float): Models will be considered for
            perturbation at this interval of `time_attr`. Note that
            perturbation incurs checkpoint overhead, so you shouldn't set this
            to be too frequent.
        hyperparam_bounds (dict): Hyperparameters to mutate. The format is
            as follows: for each key, enter a list of the form [min, max]
            representing the minimum and maximum possible hyperparam values.
        quantile_fraction (float): Parameters are transferred from the top
            `quantile_fraction` fraction of trials to the bottom
            `quantile_fraction` fraction. Needs to be between 0 and 0.5.
            Setting it to 0 essentially implies doing no exploitation at all.
        log_config (bool): Whether to log the ray config of each model to
            local_dir at each exploit. Allows config schedule to be
            reconstructed.
        require_attrs (bool): Whether to require time_attr and metric to appear
            in result for every iteration. If True, error will be raised
            if these values are not present in trial result.
        synch (bool): If False, will use asynchronous implementation of
            PBT. Trial perturbations occur every perturbation_interval for each
            trial independently. If True, will use synchronous implementation
            of PBT. Perturbations will occur only after all trials are
            synced at the same time_attr every perturbation_interval.
            Defaults to False. See Appendix A.1 here
            https://arxiv.org/pdf/1711.09846.pdf.

    Raises:
        RuntimeError: If GPy or scikit-learn is not installed.
        ValueError: If a `hyperparam_bounds` value is not a list or tuple
            of size 2.
        TuneError: If `hyperparam_bounds` is empty or not given.

    Example:
        >>> pb2 = PB2(
        >>>     time_attr="timesteps_total",
        >>>     metric="episode_reward_mean",
        >>>     mode="max",
        >>>     perturbation_interval=10000,
        >>>     hyperparam_bounds={
        >>>         # These must be continuous, currently a limitation.
        >>>         "factor_1": [0.0, 20.0],
        >>>     })
        >>> tune.run({...}, num_samples=8, scheduler=pb2)
    """

    def __init__(self,
                 time_attr: str = "time_total_s",
                 reward_attr: Optional[str] = None,
                 metric: Optional[str] = None,
                 mode: Optional[str] = None,
                 perturbation_interval: float = 60.0,
                 hyperparam_bounds: Dict = None,
                 quantile_fraction: float = 0.25,
                 log_config: bool = True,
                 require_attrs: bool = True,
                 synch: bool = False):

        if not is_gpy_available():
            raise RuntimeError("Please install GPy to use PB2.")

        if not is_sklearn_available():
            raise RuntimeError("Please install scikit-learn to use PB2.")

        hyperparam_bounds = hyperparam_bounds or {}
        for value in hyperparam_bounds.values():
            if not isinstance(value, (list, tuple)) or len(value) != 2:
                raise ValueError("`hyperparam_bounds` values must either be "
                                 "a list or tuple of size 2, but got {} "
                                 "instead".format(value))

        if not hyperparam_bounds:
            raise TuneError("`hyperparam_bounds` must be specified to use "
                            "PB2 scheduler.")

        # The bounds are handed to PBT as its mutation spec; resampling is
        # disabled because new values come from the GP-bandit step instead.
        super().__init__(
            time_attr=time_attr,
            reward_attr=reward_attr,
            metric=metric,
            mode=mode,
            perturbation_interval=perturbation_interval,
            hyperparam_mutations=hyperparam_bounds,
            quantile_fraction=quantile_fraction,
            resample_probability=0,
            custom_explore_fn=explore,
            log_config=log_config,
            require_attrs=require_attrs,
            synch=synch)

        self.last_exploration_time = 0  # when we last explored
        # One row per saved trial result: Trial, Time, <hyperparams>, Reward.
        self.data = pd.DataFrame()

        self._hyperparam_bounds = hyperparam_bounds

        # Current = trials running that have already re-started after reaching
        # the checkpoint. When exploring we care if these trials
        # are already in or scheduled to be in the next round.
        self.current = None

    def _save_trial_state(self, state, time, result, trial):
        """Record the result in the PBT state and in the PB2 dataframe.

        Returns:
            The score computed by the base class, so that this override
            keeps the base method's return contract.
        """
        score = super()._save_trial_state(state, time, result, trial)

        # Data logging for PB2.

        # Collect hyperparams names and current values for this trial.
        names = [str(key) for key in self._hyperparam_bounds]
        values = [trial.config[key] for key in self._hyperparam_bounds]

        # Store trial state and hyperparams in dataframe.
        # this needs to be made more general.
        lst = [[trial, result[self._time_attr]] + values + [score]]
        cols = ["Trial", "Time"] + names + ["Reward"]
        entry = pd.DataFrame(lst, columns=cols)

        self.data = pd.concat([self.data, entry]).reset_index(drop=True)
        # Trials are stored by their string form; explore() matches on it.
        self.data.Trial = self.data.Trial.astype("str")

        return score

    def _get_new_config(self, trial, trial_to_clone):
        """Select a new config for `trial` based on `trial_to_clone`.

        Runs the GP-bandit `explore` step on the collected data and tracks
        the configs already handed out in the current round in
        `self.current`.
        """
        # If we are at a new timestep, we dont want to penalise for trials
        # still going.
        if self.data["Time"].max() > self.last_exploration_time:
            self.current = None

        new_config, data = explore(self.data, self._hyperparam_bounds,
                                   self.current, trial_to_clone, trial,
                                   trial_to_clone.config)

        # Important to replace the old values, since we are copying across
        self.data = data.copy()

        # If the current guy being selecting is at a point that is already
        # done, then append the data to the "current" which contains the
        # points in the current batch.
        new = np.array([new_config[key] for key in self._hyperparam_bounds])
        new = new.reshape(1, new.size)

        latest_time = self.data["Time"].max()
        if latest_time > self.last_exploration_time:
            self.last_exploration_time = latest_time
            self.current = new.copy()
        elif self.current is None:
            # Nothing recorded for this round yet; concatenating with None
            # would raise, so start the batch with this config.
            self.current = new.copy()
        else:
            self.current = np.concatenate((self.current, new), axis=0)
            logger.debug(self.current)

        return new_config
|
||||
@@ -0,0 +1,191 @@
|
||||
import numpy as np
|
||||
from scipy.optimize import minimize
|
||||
|
||||
from ray.tune.schedulers.pb2 import is_gpy_available, is_sklearn_available
|
||||
|
||||
if is_gpy_available():
|
||||
import GPy
|
||||
from GPy.kern import Kern
|
||||
from GPy.core import Param
|
||||
|
||||
if is_sklearn_available():
|
||||
from sklearn.metrics import pairwise_distances
|
||||
from sklearn.metrics.pairwise import euclidean_distances
|
||||
|
||||
|
||||
class TV_SquaredExp(Kern):
    """ Time varying squared exponential kernel.
        For more info see the TV-GP-UCB paper:
        http://proceedings.mlr.press/v51/bogunovic16.pdf

    The kernel value is the product of a squared-exponential term over all
    columns but the first and a time term `(1 - epsilon) ** (0.5 * |t - t'|)`
    over the first column.
    """

    def __init__(self,
                 input_dim,
                 variance=1.,
                 lengthscale=1.,
                 epsilon=0.,
                 active_dims=None):
        super().__init__(input_dim, active_dims, "time_se")
        # Register the three hyperparameters with GPy so they get optimized.
        self.variance = Param("variance", variance)
        self.lengthscale = Param("lengthscale", lengthscale)
        self.epsilon = Param("epsilon", epsilon)
        self.link_parameters(self.variance, self.lengthscale, self.epsilon)

    def K(self, X, X2):
        """Return the kernel matrix between the rows of X and X2.

        If X2 is None, X is compared against a copy of itself.
        """
        # time must be in the far left column
        # Cap epsilon at 0.5.
        if self.epsilon > 0.5:  # 0.5
            self.epsilon = 0.5
        if X2 is None:
            X2 = np.copy(X)
        T1 = X[:, 0].reshape(-1, 1)
        T2 = X2[:, 0].reshape(-1, 1)
        # Absolute time differences between every pair of rows.
        dists = pairwise_distances(T1, T2, "cityblock")
        timekernel = (1 - self.epsilon)**(0.5 * dists)

        # Drop the time column; the rest feeds the squared-exponential term.
        X = X[:, 1:]
        X2 = X2[:, 1:]

        RBF = self.variance * np.exp(
            -np.square(euclidean_distances(X, X2)) / self.lengthscale)

        return RBF * timekernel

    def Kdiag(self, X):
        """Return the diagonal of K(X, X): `variance` for every row."""
        return self.variance * np.ones(X.shape[0])

    def update_gradients_full(self, dL_dK, X, X2):
        """Set the `.gradient` of variance, lengthscale and epsilon.

        Each gradient is the sum over all entries of a per-parameter term
        multiplied elementwise by dL_dK.
        """
        if X2 is None:
            X2 = np.copy(X)
        T1 = X[:, 0].reshape(-1, 1)
        T2 = X2[:, 0].reshape(-1, 1)

        X = X[:, 1:]
        X2 = X2[:, 1:]
        dist2 = np.square(euclidean_distances(X, X2)) / self.lengthscale

        # NOTE(review): dvar squares (distance / lengthscale), whereas K() and
        # dist2 divide the squared distance by lengthscale; the terms below
        # also omit the complementary time / RBF factor of K(). Confirm
        # whether this is intentional.
        dvar = np.exp(-np.square(
            (euclidean_distances(X, X2)) / self.lengthscale))
        dl = -(2 * euclidean_distances(X, X2)**2 * self.variance *
               np.exp(-dist2)) * self.lengthscale**(-2)
        # Half the absolute time difference: the exponent used in K().
        n = pairwise_distances(T1, T2, "cityblock") / 2
        deps = -n * (1 - self.epsilon)**(n - 1)

        self.variance.gradient = np.sum(dvar * dL_dK)
        self.lengthscale.gradient = np.sum(dl * dL_dK)
        self.epsilon.gradient = np.sum(deps * dL_dK)
|
||||
|
||||
|
||||
def normalize(data, wrt):
    """Rescale `data` column-wise using the min and max of `wrt`.

    Values equal to the column minimum of `wrt` map to 0 and values equal
    to its column maximum map to 1.
    """
    lower = np.min(wrt, axis=0)
    span = np.max(wrt, axis=0) - lower
    return (data - lower) / span
|
||||
|
||||
|
||||
def standardize(data):
    """Shift to zero mean, scale to unit std, then clip to [-2, 2].

    A small constant (1e-8) is added to the std before dividing.
    """
    centered = data - np.mean(data, axis=0)
    scaled = centered / (np.std(data, axis=0) + 1e-8)
    return np.clip(scaled, -2, 2)
|
||||
|
||||
|
||||
def UCB(m, m1, x, fixed, kappa=0.5):
    """ UCB acquisition function. Interesting points to note:
    1) We concat with the fixed points, because we are not optimizing wrt
       these. This is the Reward and Time, which we can't change. We want
       to find the best hyperparameters *given* the reward and time.
    2) We use m to get the mean and m1 to get the variance. If we already
       have trials running, then m1 contains this information. This reduces
       the variance at points currently running, even if we don't have
       their label.
       Ref: https://jmlr.org/papers/volume15/desautels14a/desautels14a.pdf

    Args:
        m: Model providing the predictive mean; must expose `X` and
            `predict(xtest)`.
        m1: Model providing the predictive variance via `predict(xtest)`.
        x: Candidate hyperparameters.
        fixed: Fixed inputs placed in front of `x`.
        kappa: Kept for interface compatibility. The value passed in is not
            used; the exploration weight is always derived from the number
            of rows in `m.X`.

    Returns:
        mean + kappa * var. If a prediction raises ValueError, the mean
        falls back to -9999 and the variance to 0.
    """
    c1 = 0.2
    c2 = 0.4
    beta_t = c1 * np.log(c2 * m.X.shape[0])
    # With very few data points c2 * n < 1, so beta_t is negative and its
    # square root would be NaN; clamp at zero (no exploration bonus).
    kappa = np.sqrt(max(beta_t, 0.0))

    # Single test row: [fixed..., x...].
    xtest = np.concatenate((np.asarray(fixed).reshape(-1, 1),
                            np.asarray(x).reshape(-1, 1))).T

    try:
        mean = m.predict(xtest)[0][0][0]
    except ValueError:
        mean = -9999

    try:
        var = m1.predict(xtest)[1][0][0]
    except ValueError:
        var = 0
    return mean + kappa * var
|
||||
|
||||
|
||||
def optimize_acq(func, m, m1, fixed, num_f):
    """ Optimize acquisition function.

    Runs L-BFGS-B from several random starting points in the unit cube and
    keeps the point with the highest acquisition value.

    Args:
        func: Acquisition function called as `func(m, m1, x, fixed)`.
        m: Model passed to `func`; `m.X.shape[1] - num_f` gives the number
            of hyperparameters being optimized.
        m1: Second model passed to `func`; its first row supplies the
            fallback point.
        fixed: Fixed inputs passed through to `func`.
        num_f (int): Number of leading fixed columns in `m.X` / `m1.X`.

    Returns:
        np.array: Best hyperparameter vector found, clipped to [0, 1]. If no
        restart scores above -999, the hyperparameter part of the first row
        of `m1.X` is returned.
    """
    opts = {"maxiter": 200, "maxfun": 200, "disp": False}

    n_restarts = 10
    n_hparams = m.X.shape[1] - num_f
    best_value = -999
    # Fallback must live in hyperparameter space, so drop the fixed columns;
    # callers rescale the result against the hyperparameter bounds only.
    best_theta = m1.X[0, num_f:]

    bounds = [(0, 1) for _ in range(n_hparams)]

    for _ in range(n_restarts):
        x0 = np.random.uniform(0, 1, n_hparams)

        # minimize() minimizes, so negate the acquisition value.
        res = minimize(
            lambda x: -func(m, m1, x, fixed),
            x0,
            bounds=bounds,
            method="L-BFGS-B",
            options=opts)

        val = func(m, m1, res.x, fixed)
        if val > best_value:
            best_value = val
            best_theta = res.x

    return (np.clip(best_theta, 0, 1))
|
||||
|
||||
|
||||
def select_length(Xraw, yraw, bounds, num_f):
    """Choose how many of the most recent datapoints to keep.

    With fewer than 200 rows everything is kept. Otherwise a GP is fitted
    on the last 200, 210, 220, ... rows (up to the number available) and
    the window size with the highest log likelihood is returned.
    """
    min_len = 200
    step = 10
    n_rows = Xraw.shape[0]

    if n_rows < min_len:
        return n_rows

    # The hyperparameter limits do not depend on the window size.
    base_vals = np.array(list(bounds.values())).T

    candidates = list(range(min_len, n_rows + 1, step))
    scores = []
    for window in candidates:
        X_len = Xraw[-window:, :]
        y_len = yraw[-window:]

        # Limits: observed max/min for the fixed columns, then the bounds.
        oldpoints = X_len[:, :num_f]
        old_lims = np.concatenate((np.max(oldpoints, axis=0),
                                   np.min(oldpoints, axis=0))).reshape(
                                       2, oldpoints.shape[1])
        limits = np.concatenate((old_lims, base_vals), axis=1)

        X = normalize(X_len, limits)
        y = standardize(y_len).reshape(y_len.size, 1)

        kernel = TV_SquaredExp(
            input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1)
        m = GPy.models.GPRegression(X, y, kernel)
        m.optimize(messages=True)

        scores.append(m.log_likelihood())

    return candidates[int(np.argmax(scores))]
|
||||
@@ -243,10 +243,10 @@ class PopulationBasedTraining(FIFOScheduler):
|
||||
"You must use other built in primitives like"
|
||||
"tune.uniform, tune.loguniform, etc.")
|
||||
|
||||
if not hyperparam_mutations and not custom_explore_fn:
|
||||
raise TuneError(
|
||||
"You must specify at least one of `hyperparam_mutations` or "
|
||||
"`custom_explore_fn` to use PBT.")
|
||||
if not hyperparam_mutations and not custom_explore_fn:
|
||||
raise TuneError(
|
||||
"You must specify at least one of `hyperparam_mutations` "
|
||||
"or `custom_explore_fn` to use PBT.")
|
||||
|
||||
if quantile_fraction > 0.5 or quantile_fraction < 0:
|
||||
raise ValueError(
|
||||
@@ -378,11 +378,7 @@ class PopulationBasedTraining(FIFOScheduler):
|
||||
if time - state.last_perturbation_time < self._perturbation_interval:
|
||||
return TrialScheduler.CONTINUE # avoid checkpoint overhead
|
||||
|
||||
# This trial has reached its perturbation interval
|
||||
score = self._metric_op * result[self._metric]
|
||||
state.last_score = score
|
||||
state.last_train_time = time
|
||||
state.last_result = result
|
||||
self._save_trial_state(state, time, result, trial)
|
||||
|
||||
if not self._synch:
|
||||
state.last_perturbation_time = time
|
||||
@@ -433,6 +429,25 @@ class PopulationBasedTraining(FIFOScheduler):
|
||||
# the paused trials.
|
||||
return TrialScheduler.PAUSE
|
||||
|
||||
def _save_trial_state(self, state: PBTTrialState, time: int, result: Dict,
                      trial: Trial):
    """Saves necessary trial information when result is received.

    Args:
        state (PBTTrialState): The state object for the trial.
        time (int): The current timestep of the trial.
        result (dict): The trial's result dictionary.
        trial (Trial): The trial object. Not read by this implementation.

    Returns:
        The score stored on `state`: the metric value from `result`
        multiplied by `self._metric_op`.
    """

    # This trial has reached its perturbation interval.
    # Record new state in the state object.
    score = self._metric_op * result[self._metric]
    state.last_score = score
    state.last_train_time = time
    state.last_result = result

    return score
|
||||
|
||||
def _perturb_trial(
|
||||
self, trial: Trial, trial_runner: "trial_runner.TrialRunner",
|
||||
upper_quantile: List[Trial], lower_quantile: List[Trial]):
|
||||
@@ -457,6 +472,10 @@ class PopulationBasedTraining(FIFOScheduler):
|
||||
logger.debug("Trial {} is in lower quantile".format(trial))
|
||||
trial_to_clone = random.choice(upper_quantile)
|
||||
assert trial is not trial_to_clone
|
||||
if not self._trial_state[trial_to_clone].last_checkpoint:
|
||||
logger.info("[pbt]: no checkpoint for trial."
|
||||
" Skip exploit for Trial {}".format(trial))
|
||||
return
|
||||
self._exploit(trial_runner.trial_executor, trial, trial_to_clone)
|
||||
|
||||
def _log_config_on_step(self, trial_state: PBTTrialState,
|
||||
@@ -492,6 +511,11 @@ class PopulationBasedTraining(FIFOScheduler):
|
||||
with open(trial_path, "a+") as f:
|
||||
f.write(json.dumps(policy, cls=_SafeFallbackEncoder) + "\n")
|
||||
|
||||
def _get_new_config(self, trial, trial_to_clone):
    """Gets new config for trial by exploring trial_to_clone's config.

    `trial` itself is not read here; only `trial_to_clone.config` and the
    scheduler's mutation settings are passed to `explore`.
    """
    return explore(trial_to_clone.config, self._hyperparam_mutations,
                   self._resample_probability, self._custom_explore_fn)
|
||||
|
||||
def _exploit(self, trial_executor: "trial_executor.TrialExecutor",
|
||||
trial: Trial, trial_to_clone: Trial):
|
||||
"""Transfers perturbed state from trial_to_clone -> trial.
|
||||
@@ -500,17 +524,13 @@ class PopulationBasedTraining(FIFOScheduler):
|
||||
"""
|
||||
trial_state = self._trial_state[trial]
|
||||
new_state = self._trial_state[trial_to_clone]
|
||||
if not new_state.last_checkpoint:
|
||||
logger.info("[pbt]: no checkpoint for trial."
|
||||
" Skip exploit for Trial {}".format(trial))
|
||||
return
|
||||
new_config = explore(trial_to_clone.config, self._hyperparam_mutations,
|
||||
self._resample_probability,
|
||||
self._custom_explore_fn)
|
||||
logger.info("[exploit] transferring weights from trial "
|
||||
"{} (score {}) -> {} (score {})".format(
|
||||
trial_to_clone, new_state.last_score, trial,
|
||||
trial_state.last_score))
|
||||
|
||||
new_config = self._get_new_config(trial, trial_to_clone)
|
||||
|
||||
# Only log mutated hyperparameters and not entire config.
|
||||
old_hparams = {
|
||||
k: v
|
||||
|
||||
@@ -4,6 +4,7 @@ ConfigSpace==0.4.10
|
||||
dragonfly-opt
|
||||
gluoncv
|
||||
gym[atari]
|
||||
GPy
|
||||
h5py
|
||||
hpbandster
|
||||
hyperopt==0.1.2
|
||||
@@ -19,6 +20,7 @@ optuna
|
||||
pytest-remotedata>=0.3.1
|
||||
pytorch-lightning
|
||||
pytorch-lightning-bolts
|
||||
scikit-learn
|
||||
scikit-optimize
|
||||
sigopt
|
||||
smart_open
|
||||
|
||||
Reference in New Issue
Block a user