From 7248d5f4ae340fbcc68b0d346656e842d78f511b Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Tue, 3 Nov 2020 21:05:00 -0800 Subject: [PATCH] Revert "[tune] PB2 (#11466)" (#11795) This reverts commit e7aafd7d24fdbac9a9440293ec2022ade066ebca. --- doc/source/tune/api_docs/schedulers.rst | 40 +- doc/source/tune/examples/index.rst | 1 - doc/source/tune/examples/pb2_example.rst | 6 - doc/source/tune/examples/pb2_ppo_example.rst | 6 - python/ray/tune/BUILD | 11 +- python/ray/tune/examples/pb2_example.py | 45 --- python/ray/tune/examples/pb2_ppo_example.py | 145 ------- python/ray/tune/schedulers/__init__.py | 4 +- python/ray/tune/schedulers/pb2.py | 376 ------------------- python/ray/tune/schedulers/pb2_utils.py | 191 ---------- python/ray/tune/schedulers/pbt.py | 52 +-- python/requirements_tune.txt | 2 - 12 files changed, 20 insertions(+), 859 deletions(-) delete mode 100644 doc/source/tune/examples/pb2_example.rst delete mode 100644 doc/source/tune/examples/pb2_ppo_example.rst delete mode 100644 python/ray/tune/examples/pb2_example.py delete mode 100644 python/ray/tune/examples/pb2_ppo_example.py delete mode 100644 python/ray/tune/schedulers/pb2.py delete mode 100644 python/ray/tune/schedulers/pb2_utils.py diff --git a/doc/source/tune/api_docs/schedulers.rst b/doc/source/tune/api_docs/schedulers.rst index ee7cd5542..f904a0b82 100644 --- a/doc/source/tune/api_docs/schedulers.rst +++ b/doc/source/tune/api_docs/schedulers.rst @@ -16,7 +16,7 @@ All Trial Schedulers take in a ``metric``, which is a value returned in the resu Summary ------- -Tune includes distributed implementations of early stopping algorithms such as `Median Stopping Rule `__, `HyperBand `__, and `ASHA `__. Tune also includes a distributed implementation of `Population Based Training (PBT) `__ and `Population Based Bandits (PB2) `__. +Tune includes distributed implementations of early stopping algorithms such as `Median Stopping Rule `__, `HyperBand `__, and `ASHA `__. Tune also includes a distributed implementation of `Population Based Training (PBT) `__. .. tip:: The easiest scheduler to start with is the ``ASHAScheduler`` which will aggressively terminate low-performing trials. @@ -48,11 +48,7 @@ When using schedulers, you may face compatibility issues, as shown in the below * - :ref:`Population Based Training ` - Yes - Not Compatible - - :doc:`Link ` - * - :ref:`Population Based Bandits ` - - Yes - - Not Compatible - - :doc:`Basic Example `, :doc:`PPO example ` + - :doc:`Link ` .. _tune-scheduler-hyperband: @@ -176,38 +172,6 @@ replay utility in practice. .. autoclass:: ray.tune.schedulers.PopulationBasedTrainingReplay - -.. _tune-scheduler-pb2: - -Population Based Bandits (PB2) (tune.schedulers.PB2) -------------------------------------------------------------------- - -Tune includes a distributed implementation of `Population Based Bandits (PB2) `__. This can be enabled by setting the ``scheduler`` parameter of ``tune.run``, e.g. - -.. code-block:: python - - pb2_scheduler = PB2( - time_attr='time_total_s', - metric='mean_accuracy', - mode='max', - perturbation_interval=600.0, - hyperparam_bounds={ - "lr": [1e-3, 1e-5], - "alpha": [0.0, 1.0], - ... - }) - tune.run( ... , scheduler=pb2_scheduler) - -This code builds upon PBT, with the main difference being that instead of using random perturbations, PB2 selects new hyperparameter configurations using a Gaussian Process model. - -When the PB2 scheduler is enabled, each trial variant is treated as a member of the population. Periodically, top-performing trials are checkpointed (this requires your Trainable to support :ref:`save and restore `). Low-performing trials clone the checkpoints of top performers and perturb the configurations in the hope of discovering an even better variation. - -The primary motivation for PB2 is the ability to find promising hyperparamters with only a small population size. With that in mind, you can run this :doc:`PB2 PPO example ` to compare PB2 vs. PBT, with a population size of ``4`` (as in the paper). The example uses the ``BipedalWalker`` environment so does not require any additional licenses. - - -.. autoclass:: ray.tune.schedulers.PB2 - - .. _tune-scheduler-bohb: BOHB (tune.schedulers.HyperBandForBOHB) diff --git a/doc/source/tune/examples/index.rst b/doc/source/tune/examples/index.rst index 3808e80c3..aeddc7347 100644 --- a/doc/source/tune/examples/index.rst +++ b/doc/source/tune/examples/index.rst @@ -18,7 +18,6 @@ General Examples - :doc:`/tune/examples/pbt_example`: Example of using a Trainable class with PopulationBasedTraining scheduler. - :doc:`/tune/examples/pbt_function`: Example of using the function API with a PopulationBasedTraining scheduler. - :doc:`/tune/examples/pbt_ppo_example`: Example of optimizing a distributed RLlib algorithm (PPO) with the PopulationBasedTraining scheduler. -- :doc:`/tune/examples/pb2_ppo_example`: Example of optimizing a distributed RLlib algorithm (PPO) with the PB2 scheduler. Uses a small population size of 4, so can train on a laptop. - :doc:`/tune/examples/logging_example`: Example of custom loggers and custom trial directory naming. Search Algorithm Examples diff --git a/doc/source/tune/examples/pb2_example.rst b/doc/source/tune/examples/pb2_example.rst deleted file mode 100644 index e40926c7c..000000000 --- a/doc/source/tune/examples/pb2_example.rst +++ /dev/null @@ -1,6 +0,0 @@ -:orphan: - -pb2_example -~~~~~~~~~~~ - -.. literalinclude:: /../../python/ray/tune/examples/pb2_example.py \ No newline at end of file diff --git a/doc/source/tune/examples/pb2_ppo_example.rst b/doc/source/tune/examples/pb2_ppo_example.rst deleted file mode 100644 index f36903c6f..000000000 --- a/doc/source/tune/examples/pb2_ppo_example.rst +++ /dev/null @@ -1,6 +0,0 @@ -:orphan: - -pb2_ppo_example -~~~~~~~~~~~~~~~ - -.. literalinclude:: /../../python/ray/tune/examples/pb2_ppo_example.py diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD index b9c7e36b5..e86348f16 100644 --- a/python/ray/tune/BUILD +++ b/python/ray/tune/BUILD @@ -80,7 +80,7 @@ py_test( py_test( name = "test_experiment_analysis_mem", - size = "medium", + size = "small", srcs = ["tests/test_experiment_analysis_mem.py"], deps = [":tune_lib"], ) @@ -520,15 +520,6 @@ py_test( args = ["--smoke-test"] ) -py_test( - name = "pb2_example", - size = "medium", - srcs = ["examples/pb2_example.py"], - deps = [":tune_lib"], - tags = ["exclusive", "example"], - args = ["--smoke-test"] -) - py_test( name = "pbt_convnet_example", size = "small", diff --git a/python/ray/tune/examples/pb2_example.py b/python/ray/tune/examples/pb2_example.py deleted file mode 100644 index 927daca3a..000000000 --- a/python/ray/tune/examples/pb2_example.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python - -import argparse - -import ray -from ray import tune -from ray.tune.schedulers import PB2 -from ray.tune.examples.pbt_function import pbt_function - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "--smoke-test", action="store_true", help="Finish quickly for testing") - args, _ = parser.parse_known_args() - if args.smoke_test: - ray.init(num_cpus=2) # force pausing to happen for test - else: - ray.init() - - pbt = PB2( - time_attr="training_iteration", - metric="mean_accuracy", - mode="max", - perturbation_interval=20, - hyperparam_bounds={ - # hyperparameter bounds. - "lr": [0.0001, 0.02], - }) - - tune.run( - pbt_function, - name="pbt_test", - scheduler=pbt, - verbose=False, - stop={ - "training_iteration": 30, - }, - num_samples=8, - fail_fast=True, - config={ - "lr": 0.0001, - # note: this parameter is perturbed but has no effect on - # the model training in this example - "some_other_factor": 1, - }) diff --git a/python/ray/tune/examples/pb2_ppo_example.py b/python/ray/tune/examples/pb2_ppo_example.py deleted file mode 100644 index f2c4268c7..000000000 --- a/python/ray/tune/examples/pb2_ppo_example.py +++ /dev/null @@ -1,145 +0,0 @@ -import os -import random -import argparse -import pandas as pd -from datetime import datetime - -import ray -from ray.tune import run, sample_from -from ray.tune.schedulers import PB2, PopulationBasedTraining - - -# Postprocess the perturbed config to ensure it's still valid used if PBT. -def explore(config): - # Ensure we collect enough timesteps to do sgd. - if config["train_batch_size"] < config["sgd_minibatch_size"] * 2: - config["train_batch_size"] = config["sgd_minibatch_size"] * 2 - # Ensure we run at least one sgd iter. - if config["lambda"] > 1: - config["lambda"] = 1 - config["train_batch_size"] = int(config["train_batch_size"]) - return config - - -if __name__ == "__main__": - - parser = argparse.ArgumentParser() - parser.add_argument("--max", type=int, default=1000000) - parser.add_argument("--algo", type=str, default="PPO") - parser.add_argument("--num_workers", type=int, default=4) - parser.add_argument("--num_samples", type=int, default=4) - parser.add_argument("--t_ready", type=int, default=50000) - parser.add_argument("--seed", type=int, default=0) - parser.add_argument( - "--horizon", type=int, default=1600) # make this 1000 for other envs - parser.add_argument("--perturb", type=float, default=0.25) # if using PBT - parser.add_argument("--env_name", type=str, default="BipedalWalker-v2") - parser.add_argument( - "--criteria", type=str, - default="timesteps_total") # "training_iteration", "time_total_s" - parser.add_argument( - "--net", type=str, default="32_32" - ) # May be important to use a larger network for bigger tasks. - parser.add_argument("--filename", type=str, default="") - parser.add_argument("--method", type=str, default="pb2") # ['pbt', 'pb2'] - parser.add_argument("--save_csv", type=bool, default=False) - - args = parser.parse_args() - ray.init() - - # bipedalwalker needs 1600 - if args.env_name in ["BipedalWalker-v2", "BipedalWalker-v3"]: - horizon = 1600 - else: - horizon = 1000 - - pbt = PopulationBasedTraining( - time_attr=args.criteria, - metric="episode_reward_mean", - mode="max", - perturbation_interval=args.t_ready, - resample_probability=args.perturb, - quantile_fraction=args.perturb, # copy bottom % with top % - # Specifies the search space for these hyperparams - hyperparam_mutations={ - "lambda": lambda: random.uniform(0.9, 1.0), - "clip_param": lambda: random.uniform(0.1, 0.5), - "lr": lambda: random.uniform(1e-3, 1e-5), - "train_batch_size": lambda: random.randint(1000, 60000), - }, - custom_explore_fn=explore) - - pb2 = PB2( - time_attr=args.criteria, - metric="episode_reward_mean", - mode="max", - perturbation_interval=args.t_ready, - quantile_fraction=args.perturb, # copy bottom % with top % - # Specifies the hyperparam search space - hyperparam_bounds={ - "lambda": [0.9, 1.0], - "clip_param": [0.1, 0.5], - "lr": [1e-3, 1e-5], - "train_batch_size": [1000, 60000] - }) - - methods = {"pbt": pbt, "pb2": pb2} - - timelog = str(datetime.date(datetime.now())) + "_" + str( - datetime.time(datetime.now())) - - args.dir = "{}_{}_{}_Size{}_{}_{}".format(args.algo, - args.filename, args.method, - str(args.num_samples), - args.env_name, args.criteria) - - analysis = run( - args.algo, - name="{}_{}_{}_seed{}_{}".format(timelog, args.method, args.env_name, - str(args.seed), args.filename), - scheduler=methods[args.method], - verbose=1, - num_samples=args.num_samples, - stop={args.criteria: args.max}, - config={ - "env": args.env_name, - "log_level": "INFO", - "seed": args.seed, - "kl_coeff": 1.0, - "num_gpus": 0, - "horizon": horizon, - "observation_filter": "MeanStdFilter", - "model": { - "fcnet_hiddens": [ - int(args.net.split("_")[0]), - int(args.net.split("_")[1]) - ], - "free_log_std": True - }, - "num_sgd_iter": 10, - "sgd_minibatch_size": 128, - "lambda": sample_from(lambda spec: random.uniform(0.9, 1.0)), - "clip_param": sample_from(lambda spec: random.uniform(0.1, 0.5)), - "lr": sample_from(lambda spec: random.uniform(1e-3, 1e-5)), - "train_batch_size": sample_from( - lambda spec: random.randint(1000, 60000)) - }) - - all_dfs = analysis.trial_dataframes - names = list(all_dfs.keys()) - - results = pd.DataFrame() - for i in range(args.num_samples): - df = all_dfs[names[i]] - df = df[[ - "timesteps_total", "episodes_total", "episode_reward_mean", - "info/learner/default_policy/cur_kl_coeff" - ]] - df["Agent"] = i - results = pd.concat([results, df]).reset_index(drop=True) - - if args.save_csv: - if not (os.path.exists("data/" + args.dir)): - os.makedirs("data/" + args.dir) - - results.to_csv("data/{}/seed{}.csv".format(args.dir, str(args.seed))) diff --git a/python/ray/tune/schedulers/__init__.py b/python/ray/tune/schedulers/__init__.py index 9f163eb82..3bbb8cc70 100644 --- a/python/ray/tune/schedulers/__init__.py +++ b/python/ray/tune/schedulers/__init__.py @@ -6,7 +6,6 @@ from ray.tune.schedulers.async_hyperband import (AsyncHyperBandScheduler, from ray.tune.schedulers.median_stopping_rule import MedianStoppingRule from ray.tune.schedulers.pbt import (PopulationBasedTraining, PopulationBasedTrainingReplay) -from ray.tune.schedulers.pb2 import PB2 def create_scheduler( @@ -38,7 +37,6 @@ def create_scheduler( "hb_bohb": HyperBandForBOHB, "pbt": PopulationBasedTraining, "pbt_replay": PopulationBasedTrainingReplay, - "pb2": PB2, } scheduler = scheduler.lower() if scheduler not in SCHEDULER_IMPORT: @@ -54,5 +52,5 @@ __all__ = [ "TrialScheduler", "HyperBandScheduler", "AsyncHyperBandScheduler", "ASHAScheduler", "MedianStoppingRule", "FIFOScheduler", "PopulationBasedTraining", "PopulationBasedTrainingReplay", - "HyperBandForBOHB", "PB2" + "HyperBandForBOHB" ] diff --git a/python/ray/tune/schedulers/pb2.py b/python/ray/tune/schedulers/pb2.py deleted file mode 100644 index afee10092..000000000 --- a/python/ray/tune/schedulers/pb2.py +++ /dev/null @@ -1,376 +0,0 @@ -from copy import deepcopy -import logging -from typing import Dict, Optional - -import numpy as np -import pandas as pd -from ray.tune import TuneError - -from ray.tune.schedulers import PopulationBasedTraining - -try: - import GPy - _has_gpy = True -except ImportError: - _has_gpy = False - -try: - import sklearn # noqa: F401 - _has_sklearn = True -except ImportError: - _has_sklearn = False - - -def is_gpy_available(): - return _has_gpy - - -def is_sklearn_available(): - return _has_sklearn - - -if is_gpy_available(): - from ray.tune.schedulers.pb2_utils import normalize, optimize_acq, \ - select_length, UCB, standardize, TV_SquaredExp - -logger = logging.getLogger(__name__) - - -def select_config(Xraw, yraw, current, newpoint, bounds, num_f): - """Selects the next hyperparameter config to try. - - This function takes the formatted data, fits the GP model and optimizes the - UCB acquisition function to select the next point. - - Args: - Xraw (np.array): The un-normalized array of hyperparams, Time and - Reward - yraw (np.array): The un-normalized vector of reward changes. - current (list): The hyperparams of trials currently running. This is - important so we do not select the same config twice. If there is - data here then we fit a second GP including it - (with fake y labels). The GP variance doesn't depend on the y - labels so it is ok. - newpoint (np.array): The Reward and Time for the new point. - We cannot change these as they are based on the *new weights*. - bounds (dict): Bounds for the hyperparameters. Used to normalize. - num_f (int): The number of fixed params. Almost always 2 (reward+time) - - Return: - xt (np.array): A vector of new hyperparameters. - """ - length = select_length(Xraw, yraw, bounds, num_f) - - Xraw = Xraw[-length:, :] - yraw = yraw[-length:] - - base_vals = np.array(list(bounds.values())).T - oldpoints = Xraw[:, :num_f] - old_lims = np.concatenate((np.max(oldpoints, axis=0), - np.min(oldpoints, axis=0))).reshape( - 2, oldpoints.shape[1]) - limits = np.concatenate((old_lims, base_vals), axis=1) - - X = normalize(Xraw, limits) - y = standardize(yraw).reshape(yraw.size, 1) - - fixed = normalize(newpoint, oldpoints) - - kernel = TV_SquaredExp( - input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1) - - try: - m = GPy.models.GPRegression(X, y, kernel) - except np.linalg.LinAlgError: - # add diagonal ** we would ideally make this something more robust... - X += np.eye(X.shape[0]) * 1e-3 - m = GPy.models.GPRegression(X, y, kernel) - - try: - m.optimize() - except np.linalg.LinAlgError: - # add diagonal ** we would ideally make this something more robust... - X += np.eye(X.shape[0]) * 1e-3 - m = GPy.models.GPRegression(X, y, kernel) - m.optimize() - - m.kern.lengthscale.fix(m.kern.lengthscale.clip(1e-5, 1)) - - if current is None: - m1 = deepcopy(m) - else: - # add the current trials to the dataset - padding = np.array([fixed for _ in range(current.shape[0])]) - current = normalize(current, base_vals) - current = np.hstack((padding, current)) - - Xnew = np.vstack((X, current)) - ypad = np.zeros(current.shape[0]) - ypad = ypad.reshape(-1, 1) - ynew = np.vstack((y, ypad)) - - # kernel = GPy.kern.RBF(input_dim=X.shape[1], variance=1., - # lengthscale=1.) - kernel = TV_SquaredExp( - input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1) - m1 = GPy.models.GPRegression(Xnew, ynew, kernel) - m1.optimize() - - xt = optimize_acq(UCB, m, m1, fixed, num_f) - - # convert back... - xt = xt * (np.max(base_vals, axis=0) - np.min(base_vals, axis=0)) + np.min( - base_vals, axis=0) - - xt = xt.astype(np.float32) - return (xt) - - -def explore(data, bounds, current, base, old, config): - """Returns next hyperparameter configuration to use. - - This function primarily processes the data from completed trials - and then requests the next config from the select_config function. - It then adds the new trial to the dataframe, so that the reward change - can be computed using the new weights. - It returns the new point and the dataframe with the new entry. - """ - - df = data.sort_values(by="Time").reset_index(drop=True) - - # Group by trial ID and hyperparams. - # Compute change in timesteps and reward. - df["y"] = df.groupby(["Trial"] + list(bounds.keys()))["Reward"].diff() - df["t_change"] = df.groupby(["Trial"] + list(bounds.keys()))["Time"].diff() - - # Delete entries without positive change in t. - df = df[df["t_change"] > 0].reset_index(drop=True) - df["R_before"] = df.Reward - df.y - - # Normalize the reward change by the update size. - # For example if trials took diff lengths of time. - df["y"] = df.y / df.t_change - df = df[~df.y.isna()].reset_index(drop=True) - df = df.sort_values(by="Time").reset_index(drop=True) - - # Only use the last 1k datapoints, so the GP is not too slow. - df = df.iloc[-1000:, :].reset_index(drop=True) - - # We need this to know the T and Reward for the weights. - dfnewpoint = df[df["Trial"] == str(base)] - - if not dfnewpoint.empty: - # N ow specify the dataset for the GP. - y = np.array(df.y.values) - # Meta data we keep -> episodes and reward. - # (TODO: convert to curve) - t_r = df[["Time", "R_before"]] - hparams = df[bounds.keys()] - X = pd.concat([t_r, hparams], axis=1).values - newpoint = df[df["Trial"] == str(base)].iloc[-1, :][[ - "Time", "R_before" - ]].values - new = select_config( - X, y, current, newpoint, bounds, num_f=len(t_r.columns)) - - new_config = config.copy() - values = [] - for i, col in enumerate(hparams.columns): - if isinstance(config[col], int): - new_config[col] = int(new[i]) - values.append(int(new[i])) - else: - new_config[col] = new[i] - values.append(new[i]) - - new_T = df[df["Trial"] == str(base)].iloc[-1, :]["Time"] - new_Reward = df[df["Trial"] == str(base)].iloc[-1, :].Reward - - lst = [[old] + [new_T] + values + [new_Reward]] - cols = ["Trial", "Time"] + list(bounds) + ["Reward"] - new_entry = pd.DataFrame(lst, columns=cols) - - # Create an entry for the new config, with the reward from the - # copied agent. - data = pd.concat([data, new_entry]).reset_index(drop=True) - - else: - new_config = config.copy() - - return new_config, data - - -class PB2(PopulationBasedTraining): - """Implements the Population Based Bandit (PB2) algorithm. - - PB2 trains a group of models (or agents) in parallel. Periodically, poorly - performing models clone the state of the top performers, and the hyper- - parameters are re-selected using GP-bandit optimization. The GP model is - trained to predict the improvement in the next training period. - - Like PBT, PB2 adapts hyperparameters during training time. This enables - very fast hyperparameter discovery and also automatically discovers - schedules. - - This Tune PB2 implementation is built on top of Tune's PBT implementation. - It considers all trials added as part of the PB2 population. If the number - of trials exceeds the cluster capacity, they will be time-multiplexed as to - balance training progress across the population. To run multiple trials, - use `tune.run(num_samples=)`. - - In {LOG_DIR}/{MY_EXPERIMENT_NAME}/, all mutations are logged in - `pb2_global.txt` and individual policy perturbations are recorded - in pb2_policy_{i}.txt. Tune logs: [target trial tag, clone trial tag, - target trial iteration, clone trial iteration, old config, new config] - on each perturbation step. - - Args: - time_attr (str): The training result attr to use for comparing time. - Note that you can pass in something non-temporal such as - `training_iteration` as a measure of progress, the only requirement - is that the attribute should increase monotonically. - metric (str): The training result objective value attribute. Stopping - procedures will use this attribute. - mode (str): One of {min, max}. Determines whether objective is - minimizing or maximizing the metric attribute. - perturbation_interval (float): Models will be considered for - perturbation at this interval of `time_attr`. Note that - perturbation incurs checkpoint overhead, so you shouldn't set this - to be too frequent. - hyperparam_bounds (dict): Hyperparameters to mutate. The format is - as follows: for each key, enter a list of the form [min, max] - representing the minimum and maximum possible hyperparam values. - quantile_fraction (float): Parameters are transferred from the top - `quantile_fraction` fraction of trials to the bottom - `quantile_fraction` fraction. Needs to be between 0 and 0.5. - Setting it to 0 essentially implies doing no exploitation at all. - log_config (bool): Whether to log the ray config of each model to - local_dir at each exploit. Allows config schedule to be - reconstructed. - require_attrs (bool): Whether to require time_attr and metric to appear - in result for every iteration. If True, error will be raised - if these values are not present in trial result. - synch (bool): If False, will use asynchronous implementation of - PBT. Trial perturbations occur every perturbation_interval for each - trial independently. If True, will use synchronous implementation - of PBT. Perturbations will occur only after all trials are - synced at the same time_attr every perturbation_interval. - Defaults to False. See Appendix A.1 here - https://arxiv.org/pdf/1711.09846.pdf. - - Example: - >>> pb2 = PB2( - >>> time_attr="timesteps_total", - >>> metric="episode_reward_mean", - >>> mode="max", - >>> perturbation_interval=10000, - >>> hyperparam_mutations={ - >>> # These must be continuous, currently a limitation. - >>> "factor_1": lambda: random.uniform(0.0, 20.0), - >>> }) - >>> tune.run({...}, num_samples=8, scheduler=pb2) - """ - - def __init__(self, - time_attr: str = "time_total_s", - reward_attr: Optional[str] = None, - metric: Optional[str] = None, - mode: Optional[str] = None, - perturbation_interval: float = 60.0, - hyperparam_bounds: Dict = None, - quantile_fraction: float = 0.25, - log_config: bool = True, - require_attrs: bool = True, - synch: bool = False): - - if not is_gpy_available(): - raise RuntimeError("Please install GPy to use PB2.") - - if not is_sklearn_available(): - raise RuntimeError("Please install scikit-learn to use PB2.") - - hyperparam_bounds = hyperparam_bounds or {} - for value in hyperparam_bounds.values(): - if not isinstance(value, (list, tuple)) or len(value) != 2: - raise ValueError("`hyperparam_bounds` values must either be " - "a list or tuple of size 2, but got {} " - "instead".format(value)) - - if not hyperparam_bounds: - raise TuneError("`hyperparam_bounds` must be specified to use " - "PB2 scheduler.") - - super(PB2, self).__init__( - time_attr=time_attr, - reward_attr=reward_attr, - metric=metric, - mode=mode, - perturbation_interval=perturbation_interval, - hyperparam_mutations=hyperparam_bounds, - quantile_fraction=quantile_fraction, - resample_probability=0, - custom_explore_fn=explore, - log_config=log_config, - require_attrs=require_attrs, - synch=synch) - - self.last_exploration_time = 0 # when we last explored - self.data = pd.DataFrame() - - self._hyperparam_bounds = hyperparam_bounds - - # Current = trials running that have already re-started after reaching - # the checkpoint. When exploring we care if these trials - # are already in or scheduled to be in the next round. - self.current = None - - def _save_trial_state(self, state, time, result, trial): - - score = super(PB2, self)._save_trial_state(state, time, result, trial) - - # Data logging for PB2. - - # Collect hyperparams names and current values for this trial. - names = [] - values = [] - for key in self._hyperparam_bounds: - names.append(str(key)) - values.append(trial.config[key]) - - # Store trial state and hyperparams in dataframe. - # this needs to be made more general. - lst = [[trial, result[self._time_attr]] + values + [score]] - cols = ["Trial", "Time"] + names + ["Reward"] - entry = pd.DataFrame(lst, columns=cols) - - self.data = pd.concat([self.data, entry]).reset_index(drop=True) - self.data.Trial = self.data.Trial.astype("str") - - def _get_new_config(self, trial, trial_to_clone): - # If we are at a new timestep, we dont want to penalise for trials - # still going. - if self.data["Time"].max() > self.last_exploration_time: - self.current = None - - new_config, data = explore(self.data, self._hyperparam_bounds, - self.current, trial_to_clone, trial, - trial_to_clone.config) - - # Important to replace the old values, since we are copying across - self.data = data.copy() - - # If the current guy being selecting is at a point that is already - # done, then append the data to the "current" which contains the - # points in the current batch. - new = [new_config[key] for key in self._hyperparam_bounds] - - new = np.array(new) - new = new.reshape(1, new.size) - if self.data["Time"].max() > self.last_exploration_time: - self.last_exploration_time = self.data["Time"].max() - self.current = new.copy() - else: - self.current = np.concatenate((self.current, new), axis=0) - logger.debug(self.current) - - return (new_config) diff --git a/python/ray/tune/schedulers/pb2_utils.py b/python/ray/tune/schedulers/pb2_utils.py deleted file mode 100644 index 929aaed15..000000000 --- a/python/ray/tune/schedulers/pb2_utils.py +++ /dev/null @@ -1,191 +0,0 @@ -import numpy as np -from scipy.optimize import minimize - -from ray.tune.schedulers.pb2 import is_gpy_available, is_sklearn_available - -if is_gpy_available(): - import GPy - from GPy.kern import Kern - from GPy.core import Param - -if is_sklearn_available(): - from sklearn.metrics import pairwise_distances - from sklearn.metrics.pairwise import euclidean_distances - - -class TV_SquaredExp(Kern): - """ Time varying squared exponential kernel. - For more info see the TV-GP-UCB paper: - http://proceedings.mlr.press/v51/bogunovic16.pdf - """ - - def __init__(self, - input_dim, - variance=1., - lengthscale=1., - epsilon=0., - active_dims=None): - super().__init__(input_dim, active_dims, "time_se") - self.variance = Param("variance", variance) - self.lengthscale = Param("lengthscale", lengthscale) - self.epsilon = Param("epsilon", epsilon) - self.link_parameters(self.variance, self.lengthscale, self.epsilon) - - def K(self, X, X2): - # time must be in the far left column - if self.epsilon > 0.5: # 0.5 - self.epsilon = 0.5 - if X2 is None: - X2 = np.copy(X) - T1 = X[:, 0].reshape(-1, 1) - T2 = X2[:, 0].reshape(-1, 1) - dists = pairwise_distances(T1, T2, "cityblock") - timekernel = (1 - self.epsilon)**(0.5 * dists) - - X = X[:, 1:] - X2 = X2[:, 1:] - - RBF = self.variance * np.exp( - -np.square(euclidean_distances(X, X2)) / self.lengthscale) - - return RBF * timekernel - - def Kdiag(self, X): - return self.variance * np.ones(X.shape[0]) - - def update_gradients_full(self, dL_dK, X, X2): - if X2 is None: - X2 = np.copy(X) - T1 = X[:, 0].reshape(-1, 1) - T2 = X2[:, 0].reshape(-1, 1) - - X = X[:, 1:] - X2 = X2[:, 1:] - dist2 = np.square(euclidean_distances(X, X2)) / self.lengthscale - - dvar = np.exp(-np.square( - (euclidean_distances(X, X2)) / self.lengthscale)) - dl = -(2 * euclidean_distances(X, X2)**2 * self.variance * - np.exp(-dist2)) * self.lengthscale**(-2) - n = pairwise_distances(T1, T2, "cityblock") / 2 - deps = -n * (1 - self.epsilon)**(n - 1) - - self.variance.gradient = np.sum(dvar * dL_dK) - self.lengthscale.gradient = np.sum(dl * dL_dK) - self.epsilon.gradient = np.sum(deps * dL_dK) - - -def normalize(data, wrt): - """ Normalize data to be in range (0,1), with respect to (wrt) boundaries, - which can be specified. - """ - return (data - np.min(wrt, axis=0)) / ( - np.max(wrt, axis=0) - np.min(wrt, axis=0)) - - -def standardize(data): - """ Standardize to be Gaussian N(0,1). Clip final values. - """ - data = (data - np.mean(data, axis=0)) / (np.std(data, axis=0) + 1e-8) - return np.clip(data, -2, 2) - - -def UCB(m, m1, x, fixed, kappa=0.5): - """ UCB acquisition function. Interesting points to note: - 1) We concat with the fixed points, because we are not optimizing wrt - these. This is the Reward and Time, which we can't change. We want - to find the best hyperparameters *given* the reward and time. - 2) We use m to get the mean and m1 to get the variance. If we already - have trials running, then m1 contains this information. This reduces - the variance at points currently running, even if we don't have - their label. - Ref: https://jmlr.org/papers/volume15/desautels14a/desautels14a.pdf - - """ - - c1 = 0.2 - c2 = 0.4 - beta_t = c1 * np.log(c2 * m.X.shape[0]) - kappa = np.sqrt(beta_t) - - xtest = np.concatenate((fixed.reshape(-1, 1), np.array(x).reshape(-1, - 1))).T - - try: - preds = m.predict(xtest) - preds = m.predict(xtest) - mean = preds[0][0][0] - except ValueError: - mean = -9999 - - try: - preds = m1.predict(xtest) - var = preds[1][0][0] - except ValueError: - var = 0 - return mean + kappa * var - - -def optimize_acq(func, m, m1, fixed, num_f): - """ Optimize acquisition function.""" - - opts = {"maxiter": 200, "maxfun": 200, "disp": False} - - T = 10 - best_value = -999 - best_theta = m1.X[0, :] - - bounds = [(0, 1) for _ in range(m.X.shape[1] - num_f)] - - for ii in range(T): - x0 = np.random.uniform(0, 1, m.X.shape[1] - num_f) - - res = minimize( - lambda x: -func(m, m1, x, fixed), - x0, - bounds=bounds, - method="L-BFGS-B", - options=opts) - - val = func(m, m1, res.x, fixed) - if val > best_value: - best_value = val - best_theta = res.x - - return (np.clip(best_theta, 0, 1)) - - -def select_length(Xraw, yraw, bounds, num_f): - """Select the number of datapoints to keep, using cross validation - """ - min_len = 200 - - if Xraw.shape[0] < min_len: - return (Xraw.shape[0]) - else: - length = min_len - 10 - scores = [] - while length + 10 <= Xraw.shape[0]: - length += 10 - - base_vals = np.array(list(bounds.values())).T - X_len = Xraw[-length:, :] - y_len = yraw[-length:] - oldpoints = X_len[:, :num_f] - old_lims = np.concatenate((np.max(oldpoints, axis=0), - np.min(oldpoints, axis=0))).reshape( - 2, oldpoints.shape[1]) - limits = np.concatenate((old_lims, base_vals), axis=1) - - X = normalize(X_len, limits) - y = standardize(y_len).reshape(y_len.size, 1) - - kernel = TV_SquaredExp( - input_dim=X.shape[1], variance=1., lengthscale=1., epsilon=0.1) - m = GPy.models.GPRegression(X, y, kernel) - m.optimize(messages=True) - - scores.append(m.log_likelihood()) - idx = np.argmax(scores) - length = (idx + int((min_len / 10))) * 10 - return (length) diff --git a/python/ray/tune/schedulers/pbt.py b/python/ray/tune/schedulers/pbt.py index 24a0d97cc..e5b5f0ff4 100644 --- a/python/ray/tune/schedulers/pbt.py +++ b/python/ray/tune/schedulers/pbt.py @@ -243,10 +243,10 @@ class PopulationBasedTraining(FIFOScheduler): "You must use other built in primitives like" "tune.uniform, tune.loguniform, etc.") - if not hyperparam_mutations and not custom_explore_fn: - raise TuneError( - "You must specify at least one of `hyperparam_mutations` " - "or `custom_explore_fn` to use PBT.") + if not hyperparam_mutations and not custom_explore_fn: + raise TuneError( + "You must specify at least one of `hyperparam_mutations` or " + "`custom_explore_fn` to use PBT.") if quantile_fraction > 0.5 or quantile_fraction < 0: raise ValueError( @@ -378,7 +378,11 @@ class PopulationBasedTraining(FIFOScheduler): if time - state.last_perturbation_time < self._perturbation_interval: return TrialScheduler.CONTINUE # avoid checkpoint overhead - self._save_trial_state(state, time, result, trial) + # This trial has reached its perturbation interval + score = self._metric_op * result[self._metric] + state.last_score = score + state.last_train_time = time + state.last_result = result if not self._synch: state.last_perturbation_time = time @@ -429,25 +433,6 @@ class PopulationBasedTraining(FIFOScheduler): # the paused trials. return TrialScheduler.PAUSE - def _save_trial_state(self, state: PBTTrialState, time: int, result: Dict, - trial: Trial): - """Saves necessary trial information when result is received. - Args: - state (PBTTrialState): The state object for the trial. - time (int): The current timestep of the trial. - result (dict): The trial's result dictionary. - trial (dict): The trial object. - """ - - # This trial has reached its perturbation interval. - # Record new state in the state object. - score = self._metric_op * result[self._metric] - state.last_score = score - state.last_train_time = time - state.last_result = result - - return score - def _perturb_trial( self, trial: Trial, trial_runner: "trial_runner.TrialRunner", upper_quantile: List[Trial], lower_quantile: List[Trial]): @@ -472,10 +457,6 @@ class PopulationBasedTraining(FIFOScheduler): logger.debug("Trial {} is in lower quantile".format(trial)) trial_to_clone = random.choice(upper_quantile) assert trial is not trial_to_clone - if not self._trial_state[trial_to_clone].last_checkpoint: - logger.info("[pbt]: no checkpoint for trial." - " Skip exploit for Trial {}".format(trial)) - return self._exploit(trial_runner.trial_executor, trial, trial_to_clone) def _log_config_on_step(self, trial_state: PBTTrialState, @@ -511,11 +492,6 @@ class PopulationBasedTraining(FIFOScheduler): with open(trial_path, "a+") as f: f.write(json.dumps(policy, cls=SafeFallbackEncoder) + "\n") - def _get_new_config(self, trial, trial_to_clone): - """Gets new config for trial by exploring trial_to_clone's config.""" - return explore(trial_to_clone.config, self._hyperparam_mutations, - self._resample_probability, self._custom_explore_fn) - def _exploit(self, trial_executor: "trial_executor.TrialExecutor", trial: Trial, trial_to_clone: Trial): """Transfers perturbed state from trial_to_clone -> trial. @@ -524,13 +500,17 @@ class PopulationBasedTraining(FIFOScheduler): """ trial_state = self._trial_state[trial] new_state = self._trial_state[trial_to_clone] + if not new_state.last_checkpoint: + logger.info("[pbt]: no checkpoint for trial." + " Skip exploit for Trial {}".format(trial)) + return + new_config = explore(trial_to_clone.config, self._hyperparam_mutations, + self._resample_probability, + self._custom_explore_fn) logger.info("[exploit] transferring weights from trial " "{} (score {}) -> {} (score {})".format( trial_to_clone, new_state.last_score, trial, trial_state.last_score)) - - new_config = self._get_new_config(trial, trial_to_clone) - # Only log mutated hyperparameters and not entire config. old_hparams = { k: v diff --git a/python/requirements_tune.txt b/python/requirements_tune.txt index c87eafa9b..9c31bd3a7 100644 --- a/python/requirements_tune.txt +++ b/python/requirements_tune.txt @@ -4,7 +4,6 @@ ConfigSpace==0.4.10 dragonfly-opt gluoncv gym[atari] -GPy h5py hpbandster hyperopt==0.1.2 @@ -20,7 +19,6 @@ optuna pytest-remotedata>=0.3.1 pytorch-lightning==1.0.3 pytorch-lightning-bolts -scikit-learn scikit-optimize sigopt smart_open