From e7aafd7d24fdbac9a9440293ec2022ade066ebca Mon Sep 17 00:00:00 2001 From: Jack Parker-Holder Date: Tue, 27 Oct 2020 08:03:21 +0000 Subject: [PATCH] [tune] PB2 (#11466) Co-authored-by: Sumanth Ratna Co-authored-by: Amog Kamsetty Co-authored-by: Amog Kamsetty Co-authored-by: Richard Liaw --- doc/source/tune/api_docs/schedulers.rst | 40 +- doc/source/tune/examples/index.rst | 1 + doc/source/tune/examples/pb2_example.rst | 6 + doc/source/tune/examples/pb2_ppo_example.rst | 6 + python/ray/tune/BUILD | 11 +- python/ray/tune/examples/pb2_example.py | 45 +++ python/ray/tune/examples/pb2_ppo_example.py | 145 +++++++ python/ray/tune/schedulers/__init__.py | 4 +- python/ray/tune/schedulers/pb2.py | 376 +++++++++++++++++++ python/ray/tune/schedulers/pb2_utils.py | 191 ++++++++++ python/ray/tune/schedulers/pbt.py | 52 ++- python/requirements_tune.txt | 2 + 12 files changed, 859 insertions(+), 20 deletions(-) create mode 100644 doc/source/tune/examples/pb2_example.rst create mode 100644 doc/source/tune/examples/pb2_ppo_example.rst create mode 100644 python/ray/tune/examples/pb2_example.py create mode 100644 python/ray/tune/examples/pb2_ppo_example.py create mode 100644 python/ray/tune/schedulers/pb2.py create mode 100644 python/ray/tune/schedulers/pb2_utils.py diff --git a/doc/source/tune/api_docs/schedulers.rst b/doc/source/tune/api_docs/schedulers.rst index f904a0b82..ee7cd5542 100644 --- a/doc/source/tune/api_docs/schedulers.rst +++ b/doc/source/tune/api_docs/schedulers.rst @@ -16,7 +16,7 @@ All Trial Schedulers take in a ``metric``, which is a value returned in the resu Summary ------- -Tune includes distributed implementations of early stopping algorithms such as `Median Stopping Rule `__, `HyperBand `__, and `ASHA `__. Tune also includes a distributed implementation of `Population Based Training (PBT) `__. +Tune includes distributed implementations of early stopping algorithms such as `Median Stopping Rule `__, `HyperBand `__, and `ASHA `__. 
Tune also includes a distributed implementation of `Population Based Training (PBT) `__ and `Population Based Bandits (PB2) `__. .. tip:: The easiest scheduler to start with is the ``ASHAScheduler`` which will aggressively terminate low-performing trials. @@ -48,7 +48,11 @@ When using schedulers, you may face compatibility issues, as shown in the below * - :ref:`Population Based Training ` - Yes - Not Compatible - - :doc:`Link ` + - :doc:`Link ` + * - :ref:`Population Based Bandits ` + - Yes + - Not Compatible + - :doc:`Basic Example `, :doc:`PPO example ` .. _tune-scheduler-hyperband: @@ -172,6 +176,38 @@ replay utility in practice. .. autoclass:: ray.tune.schedulers.PopulationBasedTrainingReplay + +.. _tune-scheduler-pb2: + +Population Based Bandits (PB2) (tune.schedulers.PB2) +------------------------------------------------------------------- + +Tune includes a distributed implementation of `Population Based Bandits (PB2) `__. This can be enabled by setting the ``scheduler`` parameter of ``tune.run``, e.g. + +.. code-block:: python + + pb2_scheduler = PB2( + time_attr='time_total_s', + metric='mean_accuracy', + mode='max', + perturbation_interval=600.0, + hyperparam_bounds={ + "lr": [1e-5, 1e-3], + "alpha": [0.0, 1.0], + ... + }) + tune.run( ... , scheduler=pb2_scheduler) + +This code builds upon PBT, with the main difference being that instead of using random perturbations, PB2 selects new hyperparameter configurations using a Gaussian Process model. + +When the PB2 scheduler is enabled, each trial variant is treated as a member of the population. Periodically, top-performing trials are checkpointed (this requires your Trainable to support :ref:`save and restore `). Low-performing trials clone the checkpoints of top performers and perturb the configurations in the hope of discovering an even better variation. + +The primary motivation for PB2 is the ability to find promising hyperparameters with only a small population size. 
With that in mind, you can run this :doc:`PB2 PPO example ` to compare PB2 vs. PBT, with a population size of ``4`` (as in the paper). The example uses the ``BipedalWalker`` environment so does not require any additional licenses. + + +.. autoclass:: ray.tune.schedulers.PB2 + + .. _tune-scheduler-bohb: BOHB (tune.schedulers.HyperBandForBOHB) diff --git a/doc/source/tune/examples/index.rst b/doc/source/tune/examples/index.rst index aeddc7347..3808e80c3 100644 --- a/doc/source/tune/examples/index.rst +++ b/doc/source/tune/examples/index.rst @@ -18,6 +18,7 @@ General Examples - :doc:`/tune/examples/pbt_example`: Example of using a Trainable class with PopulationBasedTraining scheduler. - :doc:`/tune/examples/pbt_function`: Example of using the function API with a PopulationBasedTraining scheduler. - :doc:`/tune/examples/pbt_ppo_example`: Example of optimizing a distributed RLlib algorithm (PPO) with the PopulationBasedTraining scheduler. +- :doc:`/tune/examples/pb2_ppo_example`: Example of optimizing a distributed RLlib algorithm (PPO) with the PB2 scheduler. Uses a small population size of 4, so can train on a laptop. - :doc:`/tune/examples/logging_example`: Example of custom loggers and custom trial directory naming. Search Algorithm Examples diff --git a/doc/source/tune/examples/pb2_example.rst b/doc/source/tune/examples/pb2_example.rst new file mode 100644 index 000000000..e40926c7c --- /dev/null +++ b/doc/source/tune/examples/pb2_example.rst @@ -0,0 +1,6 @@ +:orphan: + +pb2_example +~~~~~~~~~~~ + +.. literalinclude:: /../../python/ray/tune/examples/pb2_example.py \ No newline at end of file diff --git a/doc/source/tune/examples/pb2_ppo_example.rst b/doc/source/tune/examples/pb2_ppo_example.rst new file mode 100644 index 000000000..f36903c6f --- /dev/null +++ b/doc/source/tune/examples/pb2_ppo_example.rst @@ -0,0 +1,6 @@ +:orphan: + +pb2_ppo_example +~~~~~~~~~~~~~~~ + +.. 
literalinclude:: /../../python/ray/tune/examples/pb2_ppo_example.py diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD index a5e422e88..bc3e1a2ef 100644 --- a/python/ray/tune/BUILD +++ b/python/ray/tune/BUILD @@ -80,7 +80,7 @@ py_test( py_test( name = "test_experiment_analysis_mem", - size = "small", + size = "medium", srcs = ["tests/test_experiment_analysis_mem.py"], deps = [":tune_lib"], ) @@ -520,6 +520,15 @@ py_test( args = ["--smoke-test"] ) +py_test( + name = "pb2_example", + size = "medium", + srcs = ["examples/pb2_example.py"], + deps = [":tune_lib"], + tags = ["exclusive", "example"], + args = ["--smoke-test"] +) + py_test( name = "pbt_convnet_example", size = "medium", diff --git a/python/ray/tune/examples/pb2_example.py b/python/ray/tune/examples/pb2_example.py new file mode 100644 index 000000000..927daca3a --- /dev/null +++ b/python/ray/tune/examples/pb2_example.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +import argparse + +import ray +from ray import tune +from ray.tune.schedulers import PB2 +from ray.tune.examples.pbt_function import pbt_function + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--smoke-test", action="store_true", help="Finish quickly for testing") + args, _ = parser.parse_known_args() + if args.smoke_test: + ray.init(num_cpus=2) # force pausing to happen for test + else: + ray.init() + + pbt = PB2( + time_attr="training_iteration", + metric="mean_accuracy", + mode="max", + perturbation_interval=20, + hyperparam_bounds={ + # hyperparameter bounds. 
+ "lr": [0.0001, 0.02], + }) + + tune.run( + pbt_function, + name="pbt_test", + scheduler=pbt, + verbose=False, + stop={ + "training_iteration": 30, + }, + num_samples=8, + fail_fast=True, + config={ + "lr": 0.0001, + # note: this parameter has no bounds above, so PB2 leaves it + # unperturbed; it also has no effect on training in this example + "some_other_factor": 1, + }) diff --git a/python/ray/tune/examples/pb2_ppo_example.py b/python/ray/tune/examples/pb2_ppo_example.py new file mode 100644 index 000000000..f2c4268c7 --- /dev/null +++ b/python/ray/tune/examples/pb2_ppo_example.py @@ -0,0 +1,145 @@ +import os +import random +import argparse +import pandas as pd +from datetime import datetime + +import ray +from ray.tune import run, sample_from +from ray.tune.schedulers import PB2, PopulationBasedTraining + + +# Postprocess the perturbed config to ensure it's still valid (used only with PBT). +def explore(config): + # Ensure we collect enough timesteps to do sgd. + if config["train_batch_size"] < config["sgd_minibatch_size"] * 2: + config["train_batch_size"] = config["sgd_minibatch_size"] * 2 + # Ensure lambda does not exceed 1. 
+ if config["lambda"] > 1: + config["lambda"] = 1 + config["train_batch_size"] = int(config["train_batch_size"]) + return config + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser() + parser.add_argument("--max", type=int, default=1000000) + parser.add_argument("--algo", type=str, default="PPO") + parser.add_argument("--num_workers", type=int, default=4) + parser.add_argument("--num_samples", type=int, default=4) + parser.add_argument("--t_ready", type=int, default=50000) + parser.add_argument("--seed", type=int, default=0) + parser.add_argument( + "--horizon", type=int, default=1600) # make this 1000 for other envs + parser.add_argument("--perturb", type=float, default=0.25) # if using PBT + parser.add_argument("--env_name", type=str, default="BipedalWalker-v2") + parser.add_argument( + "--criteria", type=str, + default="timesteps_total") # "training_iteration", "time_total_s" + parser.add_argument( + "--net", type=str, default="32_32" + ) # May be important to use a larger network for bigger tasks. 
+ parser.add_argument("--filename", type=str, default="") + parser.add_argument("--method", type=str, default="pb2") # ['pbt', 'pb2'] + parser.add_argument("--save_csv", type=bool, default=False) + + args = parser.parse_args() + ray.init() + + # bipedalwalker needs 1600 + if args.env_name in ["BipedalWalker-v2", "BipedalWalker-v3"]: + horizon = 1600 + else: + horizon = 1000 + + pbt = PopulationBasedTraining( + time_attr=args.criteria, + metric="episode_reward_mean", + mode="max", + perturbation_interval=args.t_ready, + resample_probability=args.perturb, + quantile_fraction=args.perturb, # copy bottom % with top % + # Specifies the search space for these hyperparams + hyperparam_mutations={ + "lambda": lambda: random.uniform(0.9, 1.0), + "clip_param": lambda: random.uniform(0.1, 0.5), + "lr": lambda: random.uniform(1e-3, 1e-5), + "train_batch_size": lambda: random.randint(1000, 60000), + }, + custom_explore_fn=explore) + + pb2 = PB2( + time_attr=args.criteria, + metric="episode_reward_mean", + mode="max", + perturbation_interval=args.t_ready, + quantile_fraction=args.perturb, # copy bottom % with top % + # Specifies the hyperparam search space + hyperparam_bounds={ + "lambda": [0.9, 1.0], + "clip_param": [0.1, 0.5], + "lr": [1e-3, 1e-5], + "train_batch_size": [1000, 60000] + }) + + methods = {"pbt": pbt, "pb2": pb2} + + timelog = str(datetime.date(datetime.now())) + "_" + str( + datetime.time(datetime.now())) + + args.dir = "{}_{}_{}_Size{}_{}_{}".format(args.algo, + args.filename, args.method, + str(args.num_samples), + args.env_name, args.criteria) + + analysis = run( + args.algo, + name="{}_{}_{}_seed{}_{}".format(timelog, args.method, args.env_name, + str(args.seed), args.filename), + scheduler=methods[args.method], + verbose=1, + num_samples=args.num_samples, + stop={args.criteria: args.max}, + config={ + "env": args.env_name, + "log_level": "INFO", + "seed": args.seed, + "kl_coeff": 1.0, + "num_gpus": 0, + "horizon": horizon, + "observation_filter": 
"MeanStdFilter", + "model": { + "fcnet_hiddens": [ + int(args.net.split("_")[0]), + int(args.net.split("_")[1]) + ], + "free_log_std": True + }, + "num_sgd_iter": 10, + "sgd_minibatch_size": 128, + "lambda": sample_from(lambda spec: random.uniform(0.9, 1.0)), + "clip_param": sample_from(lambda spec: random.uniform(0.1, 0.5)), + "lr": sample_from(lambda spec: random.uniform(1e-3, 1e-5)), + "train_batch_size": sample_from( + lambda spec: random.randint(1000, 60000)) + }) + + all_dfs = analysis.trial_dataframes + names = list(all_dfs.keys()) + + results = pd.DataFrame() + for i in range(args.num_samples): + df = all_dfs[names[i]] + df = df[[ + "timesteps_total", "episodes_total", "episode_reward_mean", + "info/learner/default_policy/cur_kl_coeff" + ]] + df["Agent"] = i + results = pd.concat([results, df]).reset_index(drop=True) + + if args.save_csv: + if not (os.path.exists("data/" + args.dir)): + os.makedirs("data/" + args.dir) + + results.to_csv("data/{}/seed{}.csv".format(args.dir, str(args.seed))) diff --git a/python/ray/tune/schedulers/__init__.py b/python/ray/tune/schedulers/__init__.py index 3bbb8cc70..9f163eb82 100644 --- a/python/ray/tune/schedulers/__init__.py +++ b/python/ray/tune/schedulers/__init__.py @@ -6,6 +6,7 @@ from ray.tune.schedulers.async_hyperband import (AsyncHyperBandScheduler, from ray.tune.schedulers.median_stopping_rule import MedianStoppingRule from ray.tune.schedulers.pbt import (PopulationBasedTraining, PopulationBasedTrainingReplay) +from ray.tune.schedulers.pb2 import PB2 def create_scheduler( @@ -37,6 +38,7 @@ def create_scheduler( "hb_bohb": HyperBandForBOHB, "pbt": PopulationBasedTraining, "pbt_replay": PopulationBasedTrainingReplay, + "pb2": PB2, } scheduler = scheduler.lower() if scheduler not in SCHEDULER_IMPORT: @@ -52,5 +54,5 @@ __all__ = [ "TrialScheduler", "HyperBandScheduler", "AsyncHyperBandScheduler", "ASHAScheduler", "MedianStoppingRule", "FIFOScheduler", "PopulationBasedTraining", "PopulationBasedTrainingReplay", - 
"HyperBandForBOHB" + "HyperBandForBOHB", "PB2" ] diff --git a/python/ray/tune/schedulers/pb2.py b/python/ray/tune/schedulers/pb2.py new file mode 100644 index 000000000..afee10092 --- /dev/null +++ b/python/ray/tune/schedulers/pb2.py @@ -0,0 +1,376 @@ +from copy import deepcopy +import logging +from typing import Dict, Optional + +import numpy as np +import pandas as pd +from ray.tune import TuneError + +from ray.tune.schedulers import PopulationBasedTraining + +try: + import GPy + _has_gpy = True +except ImportError: + _has_gpy = False + +try: + import sklearn # noqa: F401 + _has_sklearn = True +except ImportError: + _has_sklearn = False + + +def is_gpy_available(): + return _has_gpy + + +def is_sklearn_available(): + return _has_sklearn + + +if is_gpy_available(): + from ray.tune.schedulers.pb2_utils import normalize, optimize_acq, \ + select_length, UCB, standardize, TV_SquaredExp + +logger = logging.getLogger(__name__) + + +def select_config(Xraw, yraw, current, newpoint, bounds, num_f): + """Selects the next hyperparameter config to try. + + This function takes the formatted data, fits the GP model and optimizes the + UCB acquisition function to select the next point. + + Args: + Xraw (np.array): The un-normalized array of hyperparams, Time and + Reward + yraw (np.array): The un-normalized vector of reward changes. + current (list): The hyperparams of trials currently running. This is + important so we do not select the same config twice. If there is + data here then we fit a second GP including it + (with fake y labels). The GP variance doesn't depend on the y + labels so it is ok. + newpoint (np.array): The Reward and Time for the new point. + We cannot change these as they are based on the *new weights*. + bounds (dict): Bounds for the hyperparameters. Used to normalize. + num_f (int): The number of fixed params. Almost always 2 (reward+time) + + Return: + xt (np.array): A vector of new hyperparameters. 
+ """ + length = select_length(Xraw, yraw, bounds, num_f) + + Xraw = Xraw[-length:, :] + yraw = yraw[-length:] + + base_vals = np.array(list(bounds.values())).T + oldpoints = Xraw[:, :num_f] + old_lims = np.concatenate((np.max(oldpoints, axis=0), + np.min(oldpoints, axis=0))).reshape( + 2, oldpoints.shape[1]) + limits = np.concatenate((old_lims, base_vals), axis=1) + + X = normalize(Xraw, limits) + y = standardize(yraw).reshape(yraw.size, 1) + + fixed = normalize(newpoint, oldpoints) + + kernel = TV_SquaredExp( + input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1) + + try: + m = GPy.models.GPRegression(X, y, kernel) + except np.linalg.LinAlgError: + # add diagonal ** we would ideally make this something more robust... + X += np.eye(X.shape[0]) * 1e-3 + m = GPy.models.GPRegression(X, y, kernel) + + try: + m.optimize() + except np.linalg.LinAlgError: + # add diagonal ** we would ideally make this something more robust... + X += np.eye(X.shape[0]) * 1e-3 + m = GPy.models.GPRegression(X, y, kernel) + m.optimize() + + m.kern.lengthscale.fix(m.kern.lengthscale.clip(1e-5, 1)) + + if current is None: + m1 = deepcopy(m) + else: + # add the current trials to the dataset + padding = np.array([fixed for _ in range(current.shape[0])]) + current = normalize(current, base_vals) + current = np.hstack((padding, current)) + + Xnew = np.vstack((X, current)) + ypad = np.zeros(current.shape[0]) + ypad = ypad.reshape(-1, 1) + ynew = np.vstack((y, ypad)) + + # kernel = GPy.kern.RBF(input_dim=X.shape[1], variance=1., + # lengthscale=1.) + kernel = TV_SquaredExp( + input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1) + m1 = GPy.models.GPRegression(Xnew, ynew, kernel) + m1.optimize() + + xt = optimize_acq(UCB, m, m1, fixed, num_f) + + # convert back... 
+ xt = xt * (np.max(base_vals, axis=0) - np.min(base_vals, axis=0)) + np.min( + base_vals, axis=0) + + xt = xt.astype(np.float32) + return (xt) + + +def explore(data, bounds, current, base, old, config): + """Returns next hyperparameter configuration to use. + + This function primarily processes the data from completed trials + and then requests the next config from the select_config function. + It then adds the new trial to the dataframe, so that the reward change + can be computed using the new weights. + It returns the new point and the dataframe with the new entry. + """ + + df = data.sort_values(by="Time").reset_index(drop=True) + + # Group by trial ID and hyperparams. + # Compute change in timesteps and reward. + df["y"] = df.groupby(["Trial"] + list(bounds.keys()))["Reward"].diff() + df["t_change"] = df.groupby(["Trial"] + list(bounds.keys()))["Time"].diff() + + # Delete entries without positive change in t. + df = df[df["t_change"] > 0].reset_index(drop=True) + df["R_before"] = df.Reward - df.y + + # Normalize the reward change by the update size. + # For example if trials took diff lengths of time. + df["y"] = df.y / df.t_change + df = df[~df.y.isna()].reset_index(drop=True) + df = df.sort_values(by="Time").reset_index(drop=True) + + # Only use the last 1k datapoints, so the GP is not too slow. + df = df.iloc[-1000:, :].reset_index(drop=True) + + # We need this to know the T and Reward for the weights. + dfnewpoint = df[df["Trial"] == str(base)] + + if not dfnewpoint.empty: + # Now specify the dataset for the GP. + y = np.array(df.y.values) + # Meta data we keep -> episodes and reward. 
+ # (TODO: convert to curve) + t_r = df[["Time", "R_before"]] + hparams = df[bounds.keys()] + X = pd.concat([t_r, hparams], axis=1).values + newpoint = df[df["Trial"] == str(base)].iloc[-1, :][[ + "Time", "R_before" + ]].values + new = select_config( + X, y, current, newpoint, bounds, num_f=len(t_r.columns)) + + new_config = config.copy() + values = [] + for i, col in enumerate(hparams.columns): + if isinstance(config[col], int): + new_config[col] = int(new[i]) + values.append(int(new[i])) + else: + new_config[col] = new[i] + values.append(new[i]) + + new_T = df[df["Trial"] == str(base)].iloc[-1, :]["Time"] + new_Reward = df[df["Trial"] == str(base)].iloc[-1, :].Reward + + lst = [[old] + [new_T] + values + [new_Reward]] + cols = ["Trial", "Time"] + list(bounds) + ["Reward"] + new_entry = pd.DataFrame(lst, columns=cols) + + # Create an entry for the new config, with the reward from the + # copied agent. + data = pd.concat([data, new_entry]).reset_index(drop=True) + + else: + new_config = config.copy() + + return new_config, data + + +class PB2(PopulationBasedTraining): + """Implements the Population Based Bandit (PB2) algorithm. + + PB2 trains a group of models (or agents) in parallel. Periodically, poorly + performing models clone the state of the top performers, and the hyper- + parameters are re-selected using GP-bandit optimization. The GP model is + trained to predict the improvement in the next training period. + + Like PBT, PB2 adapts hyperparameters during training time. This enables + very fast hyperparameter discovery and also automatically discovers + schedules. + + This Tune PB2 implementation is built on top of Tune's PBT implementation. + It considers all trials added as part of the PB2 population. If the number + of trials exceeds the cluster capacity, they will be time-multiplexed as to + balance training progress across the population. To run multiple trials, + use `tune.run(num_samples=)`. 
+ + In {LOG_DIR}/{MY_EXPERIMENT_NAME}/, all mutations are logged in + `pb2_global.txt` and individual policy perturbations are recorded + in pb2_policy_{i}.txt. Tune logs: [target trial tag, clone trial tag, + target trial iteration, clone trial iteration, old config, new config] + on each perturbation step. + + Args: + time_attr (str): The training result attr to use for comparing time. + Note that you can pass in something non-temporal such as + `training_iteration` as a measure of progress, the only requirement + is that the attribute should increase monotonically. + metric (str): The training result objective value attribute. Stopping + procedures will use this attribute. + mode (str): One of {min, max}. Determines whether objective is + minimizing or maximizing the metric attribute. + perturbation_interval (float): Models will be considered for + perturbation at this interval of `time_attr`. Note that + perturbation incurs checkpoint overhead, so you shouldn't set this + to be too frequent. + hyperparam_bounds (dict): Hyperparameters to mutate. The format is + as follows: for each key, enter a list of the form [min, max] + representing the minimum and maximum possible hyperparam values. + quantile_fraction (float): Parameters are transferred from the top + `quantile_fraction` fraction of trials to the bottom + `quantile_fraction` fraction. Needs to be between 0 and 0.5. + Setting it to 0 essentially implies doing no exploitation at all. + log_config (bool): Whether to log the ray config of each model to + local_dir at each exploit. Allows config schedule to be + reconstructed. + require_attrs (bool): Whether to require time_attr and metric to appear + in result for every iteration. If True, error will be raised + if these values are not present in trial result. + synch (bool): If False, will use asynchronous implementation of + PBT. Trial perturbations occur every perturbation_interval for each + trial independently. 
If True, will use synchronous implementation + of PBT. Perturbations will occur only after all trials are + synced at the same time_attr every perturbation_interval. + Defaults to False. See Appendix A.1 here + https://arxiv.org/pdf/1711.09846.pdf. + + Example: + >>> pb2 = PB2( + >>> time_attr="timesteps_total", + >>> metric="episode_reward_mean", + >>> mode="max", + >>> perturbation_interval=10000, + >>> hyperparam_bounds={ + >>> # These must be continuous, currently a limitation. + >>> "factor_1": [0.0, 20.0], + >>> }) + >>> tune.run({...}, num_samples=8, scheduler=pb2) + """ + + def __init__(self, + time_attr: str = "time_total_s", + reward_attr: Optional[str] = None, + metric: Optional[str] = None, + mode: Optional[str] = None, + perturbation_interval: float = 60.0, + hyperparam_bounds: Dict = None, + quantile_fraction: float = 0.25, + log_config: bool = True, + require_attrs: bool = True, + synch: bool = False): + + if not is_gpy_available(): + raise RuntimeError("Please install GPy to use PB2.") + + if not is_sklearn_available(): + raise RuntimeError("Please install scikit-learn to use PB2.") + + hyperparam_bounds = hyperparam_bounds or {} + for value in hyperparam_bounds.values(): + if not isinstance(value, (list, tuple)) or len(value) != 2: + raise ValueError("`hyperparam_bounds` values must either be " + "a list or tuple of size 2, but got {} " + "instead".format(value)) + + if not hyperparam_bounds: + raise TuneError("`hyperparam_bounds` must be specified to use " + "PB2 scheduler.") + + super(PB2, self).__init__( + time_attr=time_attr, + reward_attr=reward_attr, + metric=metric, + mode=mode, + perturbation_interval=perturbation_interval, + hyperparam_mutations=hyperparam_bounds, + quantile_fraction=quantile_fraction, + resample_probability=0, + custom_explore_fn=explore, + log_config=log_config, + require_attrs=require_attrs, + synch=synch) + + self.last_exploration_time = 0 # when we last explored + self.data = pd.DataFrame() + 
+ self._hyperparam_bounds = hyperparam_bounds + + # Current = trials running that have already re-started after reaching + # the checkpoint. When exploring we care if these trials + # are already in or scheduled to be in the next round. + self.current = None + + def _save_trial_state(self, state, time, result, trial): + + score = super(PB2, self)._save_trial_state(state, time, result, trial) + + # Data logging for PB2. + + # Collect hyperparams names and current values for this trial. + names = [] + values = [] + for key in self._hyperparam_bounds: + names.append(str(key)) + values.append(trial.config[key]) + + # Store trial state and hyperparams in dataframe. + # this needs to be made more general. + lst = [[trial, result[self._time_attr]] + values + [score]] + cols = ["Trial", "Time"] + names + ["Reward"] + entry = pd.DataFrame(lst, columns=cols) + + self.data = pd.concat([self.data, entry]).reset_index(drop=True) + self.data.Trial = self.data.Trial.astype("str") + + def _get_new_config(self, trial, trial_to_clone): + # If we are at a new timestep, we dont want to penalise for trials + # still going. + if self.data["Time"].max() > self.last_exploration_time: + self.current = None + + new_config, data = explore(self.data, self._hyperparam_bounds, + self.current, trial_to_clone, trial, + trial_to_clone.config) + + # Important to replace the old values, since we are copying across + self.data = data.copy() + + # If the current guy being selecting is at a point that is already + # done, then append the data to the "current" which contains the + # points in the current batch. 
+ new = [new_config[key] for key in self._hyperparam_bounds] + + new = np.array(new) + new = new.reshape(1, new.size) + if self.data["Time"].max() > self.last_exploration_time: + self.last_exploration_time = self.data["Time"].max() + self.current = new.copy() + else: + self.current = np.concatenate((self.current, new), axis=0) + logger.debug(self.current) + + return (new_config) diff --git a/python/ray/tune/schedulers/pb2_utils.py b/python/ray/tune/schedulers/pb2_utils.py new file mode 100644 index 000000000..929aaed15 --- /dev/null +++ b/python/ray/tune/schedulers/pb2_utils.py @@ -0,0 +1,191 @@ +import numpy as np +from scipy.optimize import minimize + +from ray.tune.schedulers.pb2 import is_gpy_available, is_sklearn_available + +if is_gpy_available(): + import GPy + from GPy.kern import Kern + from GPy.core import Param + +if is_sklearn_available(): + from sklearn.metrics import pairwise_distances + from sklearn.metrics.pairwise import euclidean_distances + + +class TV_SquaredExp(Kern): + """ Time varying squared exponential kernel. 
+ For more info see the TV-GP-UCB paper: + http://proceedings.mlr.press/v51/bogunovic16.pdf + """ + + def __init__(self, + input_dim, + variance=1., + lengthscale=1., + epsilon=0., + active_dims=None): + super().__init__(input_dim, active_dims, "time_se") + self.variance = Param("variance", variance) + self.lengthscale = Param("lengthscale", lengthscale) + self.epsilon = Param("epsilon", epsilon) + self.link_parameters(self.variance, self.lengthscale, self.epsilon) + + def K(self, X, X2): + # time must be in the far left column + if self.epsilon > 0.5: # 0.5 + self.epsilon = 0.5 + if X2 is None: + X2 = np.copy(X) + T1 = X[:, 0].reshape(-1, 1) + T2 = X2[:, 0].reshape(-1, 1) + dists = pairwise_distances(T1, T2, "cityblock") + timekernel = (1 - self.epsilon)**(0.5 * dists) + + X = X[:, 1:] + X2 = X2[:, 1:] + + RBF = self.variance * np.exp( + -np.square(euclidean_distances(X, X2)) / self.lengthscale) + + return RBF * timekernel + + def Kdiag(self, X): + return self.variance * np.ones(X.shape[0]) + + def update_gradients_full(self, dL_dK, X, X2): + if X2 is None: + X2 = np.copy(X) + T1 = X[:, 0].reshape(-1, 1) + T2 = X2[:, 0].reshape(-1, 1) + + X = X[:, 1:] + X2 = X2[:, 1:] + dist2 = np.square(euclidean_distances(X, X2)) / self.lengthscale + + dvar = np.exp(-np.square( + (euclidean_distances(X, X2)) / self.lengthscale)) + dl = -(2 * euclidean_distances(X, X2)**2 * self.variance * + np.exp(-dist2)) * self.lengthscale**(-2) + n = pairwise_distances(T1, T2, "cityblock") / 2 + deps = -n * (1 - self.epsilon)**(n - 1) + + self.variance.gradient = np.sum(dvar * dL_dK) + self.lengthscale.gradient = np.sum(dl * dL_dK) + self.epsilon.gradient = np.sum(deps * dL_dK) + + +def normalize(data, wrt): + """ Normalize data to be in range (0,1), with respect to (wrt) boundaries, + which can be specified. + """ + return (data - np.min(wrt, axis=0)) / ( + np.max(wrt, axis=0) - np.min(wrt, axis=0)) + + +def standardize(data): + """ Standardize to be Gaussian N(0,1). Clip final values. 
+ """ + data = (data - np.mean(data, axis=0)) / (np.std(data, axis=0) + 1e-8) + return np.clip(data, -2, 2) + + +def UCB(m, m1, x, fixed, kappa=0.5): + """ UCB acquisition function. Interesting points to note: + 1) We concat with the fixed points, because we are not optimizing wrt + these. This is the Reward and Time, which we can't change. We want + to find the best hyperparameters *given* the reward and time. + 2) We use m to get the mean and m1 to get the variance. If we already + have trials running, then m1 contains this information. This reduces + the variance at points currently running, even if we don't have + their label. + Ref: https://jmlr.org/papers/volume15/desautels14a/desautels14a.pdf + + """ + + c1 = 0.2 + c2 = 0.4 + beta_t = c1 * np.log(c2 * m.X.shape[0]) + kappa = np.sqrt(beta_t) + + xtest = np.concatenate((fixed.reshape(-1, 1), np.array(x).reshape(-1, + 1))).T + + try: + preds = m.predict(xtest) + preds = m.predict(xtest) + mean = preds[0][0][0] + except ValueError: + mean = -9999 + + try: + preds = m1.predict(xtest) + var = preds[1][0][0] + except ValueError: + var = 0 + return mean + kappa * var + + +def optimize_acq(func, m, m1, fixed, num_f): + """ Optimize acquisition function.""" + + opts = {"maxiter": 200, "maxfun": 200, "disp": False} + + T = 10 + best_value = -999 + best_theta = m1.X[0, :] + + bounds = [(0, 1) for _ in range(m.X.shape[1] - num_f)] + + for ii in range(T): + x0 = np.random.uniform(0, 1, m.X.shape[1] - num_f) + + res = minimize( + lambda x: -func(m, m1, x, fixed), + x0, + bounds=bounds, + method="L-BFGS-B", + options=opts) + + val = func(m, m1, res.x, fixed) + if val > best_value: + best_value = val + best_theta = res.x + + return (np.clip(best_theta, 0, 1)) + + +def select_length(Xraw, yraw, bounds, num_f): + """Select the number of datapoints to keep, using cross validation + """ + min_len = 200 + + if Xraw.shape[0] < min_len: + return (Xraw.shape[0]) + else: + length = min_len - 10 + scores = [] + while length + 10 <= 
Xraw.shape[0]: + length += 10 + + base_vals = np.array(list(bounds.values())).T + X_len = Xraw[-length:, :] + y_len = yraw[-length:] + oldpoints = X_len[:, :num_f] + old_lims = np.concatenate((np.max(oldpoints, axis=0), + np.min(oldpoints, axis=0))).reshape( + 2, oldpoints.shape[1]) + limits = np.concatenate((old_lims, base_vals), axis=1) + + X = normalize(X_len, limits) + y = standardize(y_len).reshape(y_len.size, 1) + + kernel = TV_SquaredExp( + input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1) + m = GPy.models.GPRegression(X, y, kernel) + m.optimize(messages=True) + + scores.append(m.log_likelihood()) + idx = np.argmax(scores) + length = (idx + int((min_len / 10))) * 10 + return (length) diff --git a/python/ray/tune/schedulers/pbt.py b/python/ray/tune/schedulers/pbt.py index d1bd6c286..4df3dbbb3 100644 --- a/python/ray/tune/schedulers/pbt.py +++ b/python/ray/tune/schedulers/pbt.py @@ -243,10 +243,10 @@ class PopulationBasedTraining(FIFOScheduler): "You must use other built in primitives like" "tune.uniform, tune.loguniform, etc.") - if not hyperparam_mutations and not custom_explore_fn: - raise TuneError( - "You must specify at least one of `hyperparam_mutations` or " - "`custom_explore_fn` to use PBT.") + if not hyperparam_mutations and not custom_explore_fn: + raise TuneError( + "You must specify at least one of `hyperparam_mutations` " + "or `custom_explore_fn` to use PBT.") if quantile_fraction > 0.5 or quantile_fraction < 0: raise ValueError( @@ -378,11 +378,7 @@ class PopulationBasedTraining(FIFOScheduler): if time - state.last_perturbation_time < self._perturbation_interval: return TrialScheduler.CONTINUE # avoid checkpoint overhead - # This trial has reached its perturbation interval - score = self._metric_op * result[self._metric] - state.last_score = score - state.last_train_time = time - state.last_result = result + self._save_trial_state(state, time, result, trial) if not self._synch: state.last_perturbation_time = time @@ -433,6 
+429,25 @@ class PopulationBasedTraining(FIFOScheduler): # the paused trials. return TrialScheduler.PAUSE + def _save_trial_state(self, state: PBTTrialState, time: int, result: Dict, + trial: Trial): + """Saves necessary trial information when result is received. + Args: + state (PBTTrialState): The state object for the trial. + time (int): The current timestep of the trial. + result (dict): The trial's result dictionary. + trial (Trial): The trial object. + """ + + # This trial has reached its perturbation interval. + # Record new state in the state object. + score = self._metric_op * result[self._metric] + state.last_score = score + state.last_train_time = time + state.last_result = result + + return score + def _perturb_trial( self, trial: Trial, trial_runner: "trial_runner.TrialRunner", upper_quantile: List[Trial], lower_quantile: List[Trial]): @@ -457,6 +472,10 @@ class PopulationBasedTraining(FIFOScheduler): logger.debug("Trial {} is in lower quantile".format(trial)) trial_to_clone = random.choice(upper_quantile) assert trial is not trial_to_clone + if not self._trial_state[trial_to_clone].last_checkpoint: + logger.info("[pbt]: no checkpoint for trial." + " Skip exploit for Trial {}".format(trial)) + return self._exploit(trial_runner.trial_executor, trial, trial_to_clone) def _log_config_on_step(self, trial_state: PBTTrialState, @@ -492,6 +511,11 @@ class PopulationBasedTraining(FIFOScheduler): with open(trial_path, "a+") as f: f.write(json.dumps(policy, cls=_SafeFallbackEncoder) + "\n") + def _get_new_config(self, trial, trial_to_clone): + """Gets new config for trial by exploring trial_to_clone's config.""" + return explore(trial_to_clone.config, self._hyperparam_mutations, + self._resample_probability, self._custom_explore_fn) + def _exploit(self, trial_executor: "trial_executor.TrialExecutor", trial: Trial, trial_to_clone: Trial): """Transfers perturbed state from trial_to_clone -> trial. 
@@ -500,17 +524,13 @@ class PopulationBasedTraining(FIFOScheduler): """ trial_state = self._trial_state[trial] new_state = self._trial_state[trial_to_clone] - if not new_state.last_checkpoint: - logger.info("[pbt]: no checkpoint for trial." - " Skip exploit for Trial {}".format(trial)) - return - new_config = explore(trial_to_clone.config, self._hyperparam_mutations, - self._resample_probability, - self._custom_explore_fn) logger.info("[exploit] transferring weights from trial " "{} (score {}) -> {} (score {})".format( trial_to_clone, new_state.last_score, trial, trial_state.last_score)) + + new_config = self._get_new_config(trial, trial_to_clone) + # Only log mutated hyperparameters and not entire config. old_hparams = { k: v diff --git a/python/requirements_tune.txt b/python/requirements_tune.txt index 50979f55e..e4f8d7c39 100644 --- a/python/requirements_tune.txt +++ b/python/requirements_tune.txt @@ -4,6 +4,7 @@ ConfigSpace==0.4.10 dragonfly-opt gluoncv gym[atari] +GPy h5py hpbandster hyperopt==0.1.2 @@ -19,6 +20,7 @@ optuna pytest-remotedata>=0.3.1 pytorch-lightning pytorch-lightning-bolts +scikit-learn scikit-optimize sigopt smart_open