diff --git a/python/ray/rllib/agent.py b/python/ray/rllib/agent.py index e7ba9ddd9..f1ac6573c 100644 --- a/python/ray/rllib/agent.py +++ b/python/ray/rllib/agent.py @@ -142,7 +142,10 @@ class Agent(object): start = time.time() result = self._train() self._iteration += 1 - time_this_iter = time.time() - start + if result.time_this_iter_s is not None: + time_this_iter = result.time_this_iter_s + else: + time_this_iter = time.time() - start assert result.timesteps_this_iter is not None @@ -340,6 +343,30 @@ class _MockAgent(Agent): return self.info +class _SigmoidFakeData(_MockAgent): + """Agent that returns sigmoid learning curves. + + This can be helpful for evaluating early stopping algorithms.""" + + _agent_name = "SigmoidFakeData" + _default_config = { + "width": 100, + "height": 100, + "offset": 0, + "iter_time": 10, + "iter_timesteps": 1, + } + + def _train(self): + i = max(0, self.iteration - self.config["offset"]) + v = np.tanh(float(i) / self.config["width"]) + v *= self.config["height"] + return TrainingResult( + episode_reward_mean=v, episode_len_mean=v, + timesteps_this_iter=self.config["iter_timesteps"], + time_this_iter_s=self.config["iter_time"], info={}) + + def get_agent_class(alg): """Returns the class of an known agent given its name.""" @@ -360,6 +387,8 @@ def get_agent_class(alg): return script_runner.ScriptRunner elif alg == "__fake": return _MockAgent + elif alg == "__sigmoid_fake_data": + return _SigmoidFakeData else: raise Exception( ("Unknown algorithm {}, check --alg argument. Valid choices " + diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py index ced7876a1..17cf2f43d 100755 --- a/python/ray/rllib/train.py +++ b/python/ray/rllib/train.py @@ -9,7 +9,7 @@ import sys import yaml from ray.tune.config_parser import make_parser, resources_to_json -from ray.tune.tune import run_experiments +from ray.tune.tune import make_scheduler, run_experiments EXAMPLE_USAGE = """ @@ -18,6 +18,8 @@ Training example: Grid search example: ./train.py -f tuned_examples/cartpole-grid-search-example.yaml + +Note that -f overrides all other trial-specific command-line options. """ @@ -33,6 +35,8 @@ parser.add_argument("--num-cpus", default=None, type=int, help="Number of CPUs to allocate to Ray.") parser.add_argument("--num-gpus", default=None, type=int, help="Number of GPUs to allocate to Ray.") +parser.add_argument("--experiment-name", default="default", type=str, + help="Name of experiment dir.") parser.add_argument("-f", "--config-file", default=None, type=str, help="If specified, use config options from this file.") @@ -43,15 +47,19 @@ if __name__ == "__main__": with open(args.config_file) as f: experiments = yaml.load(f) else: + # Note: keep this in sync with tune/config_parser.py experiments = { - "default": { # i.e. log to /tmp/ray/default + args.experiment_name: { # i.e. log to /tmp/ray/default "alg": args.alg, + "checkpoint_freq": args.checkpoint_freq, + "local_dir": args.local_dir, "env": args.env, "resources": resources_to_json(args.resources), "stop": args.stop, "config": args.config, "restore": args.restore, "repeat": args.repeat, + "upload_dir": args.upload_dir, } } @@ -62,5 +70,6 @@ if __name__ == "__main__": parser.error("the following arguments are required: --env") run_experiments( - experiments, redis_address=args.redis_address, + experiments, scheduler=make_scheduler(args), + redis_address=args.redis_address, num_cpus=args.num_cpus, num_gpus=args.num_gpus) diff --git a/python/ray/tune/config_parser.py b/python/ray/tune/config_parser.py index c4c3daabe..a70309b02 100644 --- a/python/ray/tune/config_parser.py +++ b/python/ray/tune/config_parser.py @@ -31,6 +31,7 @@ def make_parser(**kwargs): parser = argparse.ArgumentParser(**kwargs) + # Note: keep this in sync with rllib/train.py parser.add_argument("--alg", default=None, type=str, help="The learning algorithm to train.") parser.add_argument("--stop", default="{}", type=json.loads, @@ -44,10 +45,14 @@ def make_parser(**kwargs): help="Number of times to repeat each trial.") parser.add_argument("--local-dir", default="/tmp/ray", type=str, help="Local dir to save training results to.") - parser.add_argument("--upload-dir", default=None, type=str, + parser.add_argument("--upload-dir", default="", type=str, help="URI to upload training results to.") - parser.add_argument("--checkpoint-freq", default=None, type=int, + parser.add_argument("--checkpoint-freq", default=0, type=int, help="How many iterations between checkpoints.") + parser.add_argument("--scheduler", default="FIFO", type=str, + help="FIFO, MedianStopping, or HyperBand") + parser.add_argument("--scheduler-config", default="{}", type=json.loads, + help="Config options to pass to the scheduler.") # Note: this currently only makes sense when running a single trial parser.add_argument("--restore", default=None, type=str, diff --git a/python/ray/tune/hyperband.py b/python/ray/tune/hyperband.py index 8b98bac81..6b545d318 100644 --- a/python/ray/tune/hyperband.py +++ b/python/ray/tune/hyperband.py @@ -32,7 +32,7 @@ class HyperBandScheduler(FIFOScheduler): and band and will spill over to new brackets/bands accordingly. """ - def __init__(self, max_iter, eta=3): + def __init__(self, max_iter=200, eta=3): """ args: max_iter (int): maximum iterations per configuration diff --git a/python/ray/tune/median_stopping_rule.py b/python/ray/tune/median_stopping_rule.py new file mode 100644 index 000000000..8dfc5ddb2 --- /dev/null +++ b/python/ray/tune/median_stopping_rule.py @@ -0,0 +1,100 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections +import numpy as np + +from ray.tune.trial_scheduler import FIFOScheduler, TrialScheduler + + +class MedianStoppingRule(FIFOScheduler): + """Implements the median stopping rule as described in the Vizier paper: + + https://research.google.com/pubs/pub46180.html + + Args: + time_attr (str): The TrainingResult attr to use for comparing time. + Note that you can pass in something non-temporal such as + `training_iteration` as a measure of progress, the only requirement + is that the attribute should increase monotonically. + reward_attr (str): The TrainingResult objective value attribute. As + with `time_attr`, this may refer to any objective value that + is supposed to increase with time. + grace_period (float): Only stop trials at least this old in time. + The units are the same as the attribute named by `time_attr`. + min_samples_required (int): Min samples to compute median over. + hard_stop (bool): If false, pauses trials instead of stopping + them. When all other trials are complete, paused trials will be + resumed and allowed to run FIFO. + """ + + def __init__( + self, time_attr='time_total_s', reward_attr='episode_reward_mean', + grace_period=60.0, min_samples_required=3, hard_stop=True): + FIFOScheduler.__init__(self) + self._stopped_trials = set() + self._completed_trials = set() + self._results = collections.defaultdict(list) + self._grace_period = grace_period + self._min_samples_required = min_samples_required + self._reward_attr = reward_attr + self._time_attr = time_attr + self._hard_stop = hard_stop + + def on_trial_result(self, trial_runner, trial, result): + """Callback for early stopping. + + This stopping rule stops a running trial if the trial's best objective + value by step `t` is strictly worse than the median of the running + averages of all completed trials' objectives reported up to step `t`. + """ + + if trial in self._stopped_trials: + assert not self._hard_stop + return TrialScheduler.CONTINUE # fall back to FIFO + + time = getattr(result, self._time_attr) + self._results[trial].append(result) + median_result = self._get_median_result(time) + best_result = self._best_result(trial) + print("Trial {} best res={} vs median res={} at t={}".format( + trial, best_result, median_result, time)) + if best_result < median_result and time > self._grace_period: + print("MedianStoppingRule: early stopping {}".format(trial)) + self._stopped_trials.add(trial) + if self._hard_stop: + return TrialScheduler.STOP + else: + return TrialScheduler.PAUSE + else: + return TrialScheduler.CONTINUE + + def on_trial_complete(self, trial_runner, trial, result): + self._results[trial].append(result) + self._completed_trials.add(trial) + + def debug_string(self): + return "Using MedianStoppingRule: num_stopped={}.".format( + len(self._stopped_trials)) + + def _get_median_result(self, time): + scores = [] + for trial in self._completed_trials: + scores.append(self._running_result(trial, time)) + if len(scores) >= self._min_samples_required: + return np.median(scores) + else: + return float('-inf') + + def _running_result(self, trial, t_max=float('inf')): + results = self._results[trial] + # TODO(ekl) we could do interpolation to be more precise, but for now + # assume len(results) is large and the time diffs are roughly equal + return np.mean( + [getattr(r, self._reward_attr) + for r in results if getattr(r, self._time_attr) <= t_max]) + + def _best_result(self, trial): + results = self._results[trial] + return max([getattr(r, self._reward_attr) for r in results]) diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index cc7a40e5e..02f3b4655 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -46,7 +46,8 @@ TrainingResult = namedtuple("TrainingResult", [ # (Auto-filled) Number of timesteps in the simulator in this iteration. "timesteps_this_iter", - # (Auto-filled) Time in seconds this iteration took to run. + # (Auto-filled) Time in seconds this iteration took to run. This may be + # overriden in order to override the system-computed time difference. "time_this_iter_s", # (Auto-filled) Accumulated time in seconds for this entire experiment. diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index c77d7a182..013cbc004 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -60,7 +60,7 @@ class Trial(object): def __init__( self, env_creator, alg, config={}, local_dir='/tmp/ray', experiment_tag=None, resources=Resources(cpu=1, gpu=0), - stopping_criterion={}, checkpoint_freq=None, + stopping_criterion={}, checkpoint_freq=0, restore_path=None, upload_dir=None): """Initialize a new trial. @@ -179,7 +179,7 @@ class Trial(object): def should_checkpoint(self): """Whether this trial is due for checkpointing.""" - if self.checkpoint_freq is None: + if not self.checkpoint_freq: return False return self.last_result.training_iteration % self.checkpoint_freq == 0 diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index e64de9e2d..2c8b2a385 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import os import ray import time import traceback @@ -42,9 +43,21 @@ class TrialRunner(object): self._committed_resources = Resources(cpu=0, gpu=0) self._resources_initialized = False + # For debugging, it may be useful to halt trials after some time has + # elapsed. TODO(ekl) consider exposing this in the API. + self._global_time_limit = float( + os.environ.get("TRIALRUNNER_WALLTIME_LIMIT", float('inf'))) + self._total_time = 0 + def is_finished(self): """Returns whether all trials have finished running.""" + if self._total_time > self._global_time_limit: + print( + "Exceeded global time limit {} / {}".format( + self._total_time, self._global_time_limit)) + return True + for t in self._trials: if t.status in [Trial.PENDING, Trial.RUNNING, Trial.PAUSED]: return False @@ -148,6 +161,7 @@ class TrialRunner(object): result = ray.get(result_id) print("result", result) trial.last_result = result + self._total_time += result.time_this_iter_s if trial.should_stop(result): self._scheduler_alg.on_trial_complete(self, trial, result) diff --git a/python/ray/tune/trial_scheduler.py b/python/ray/tune/trial_scheduler.py index d8d932ea1..7ac0d3dce 100644 --- a/python/ray/tune/trial_scheduler.py +++ b/python/ray/tune/trial_scheduler.py @@ -1,9 +1,6 @@ from __future__ import absolute_import from __future__ import division -import collections -import numpy as np - from ray.tune.trial import Trial @@ -73,89 +70,11 @@ class FIFOScheduler(TrialScheduler): if (trial.status == Trial.PENDING and trial_runner.has_resources(trial.resources)): return trial + for trial in trial_runner.get_trials(): + if (trial.status == Trial.PAUSED and + trial_runner.has_resources(trial.resources)): + return trial return None def debug_string(self): return "Using FIFO scheduling algorithm." - - -# TODO(ekl) expose this in the command line API -class MedianStoppingRule(FIFOScheduler): - """Implements the median stopping rule as described in the Vizier paper: - - https://research.google.com/pubs/pub46180.html - - Args: - time_attr (str): The TrainingResult attr to use for comparing time. - Note that you can pass in something non-temporal such as - `training_iteration` as a measure of progress, the only requirement - is that the attribute should increase monotonically. - reward_attr (str): The TrainingResult objective value attribute. As - with `time_attr`, this may refer to any objective value that - is supposed to increase with time. - grace_period (float): Only stop trials at least this old in time. - The units are the same as the attribute named by `time_attr`. - min_samples_required (int): Min samples to compute median over. - """ - - def __init__( - self, time_attr='time_total_s', reward_attr='episode_reward_mean', - grace_period=60.0, min_samples_required=3): - FIFOScheduler.__init__(self) - self._completed_trials = set() - self._results = collections.defaultdict(list) - self._grace_period = grace_period - self._min_samples_required = min_samples_required - self._reward_attr = reward_attr - self._time_attr = time_attr - self._num_stopped = 0 - - def on_trial_result(self, trial_runner, trial, result): - """Callback for early stopping. - - This stopping rule stops a running trial if the trial's best objective - value by step `t` is strictly worse than the median of the running - averages of all completed trials' objectives reported up to step `t`. - """ - - time = getattr(result, self._time_attr) - self._results[trial].append(result) - median_result = self._get_median_result(time) - best_result = self._best_result(trial) - print("Trial {} best res={} vs median res={} at t={}".format( - trial, best_result, median_result, time)) - if best_result < median_result and time > self._grace_period: - print("MedianStoppingRule: early stopping {}".format(trial)) - self._num_stopped += 1 - return TrialScheduler.STOP - else: - return TrialScheduler.CONTINUE - - def on_trial_complete(self, trial_runner, trial, result): - self._results[trial].append(result) - self._completed_trials.add(trial) - - def debug_string(self): - return "Using MedianStoppingRule: num_stopped={}.".format( - self._num_stopped) - - def _get_median_result(self, time): - scores = [] - for trial in self._completed_trials: - scores.append(self._running_result(trial, time)) - if len(scores) >= self._min_samples_required: - return np.median(scores) - else: - return float('-inf') - - def _running_result(self, trial, t_max=float('inf')): - results = self._results[trial] - # TODO(ekl) we could do interpolation to be more precise, but for now - # assume len(results) is large and the time diffs are roughly equal - return np.mean( - [getattr(r, self._reward_attr) - for r in results if getattr(r, self._time_attr) <= t_max]) - - def _best_result(self, trial): - results = self._results[trial] - return max([getattr(r, self._reward_attr) for r in results]) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index d5bfd6716..0a2fcaa24 100755 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -5,12 +5,16 @@ from __future__ import division from __future__ import print_function import argparse +import json import sys import yaml import ray -from ray.tune.trial_runner import TrialRunner +from ray.tune.hyperband import HyperBandScheduler +from ray.tune.median_stopping_rule import MedianStoppingRule from ray.tune.trial import Trial +from ray.tune.trial_runner import TrialRunner +from ray.tune.trial_scheduler import FIFOScheduler from ray.tune.variant_generator import generate_trials @@ -32,12 +36,33 @@ parser.add_argument("--num-cpus", default=None, type=int, help="Number of CPUs to allocate to Ray.") parser.add_argument("--num-gpus", default=None, type=int, help="Number of GPUs to allocate to Ray.") +parser.add_argument("--scheduler", default="FIFO", type=str, + help="FIFO, MedianStopping, or HyperBand") +parser.add_argument("--scheduler-config", default="{}", type=json.loads, + help="Config options to pass to the scheduler.") parser.add_argument("-f", "--config-file", required=True, type=str, help="Read experiment options from this JSON/YAML file.") -def run_experiments(experiments, **ray_args): - runner = TrialRunner() +SCHEDULERS = { + "FIFO": FIFOScheduler, + "MedianStopping": MedianStoppingRule, + "HyperBand": HyperBandScheduler, +} + + +def make_scheduler(args): + if args.scheduler in SCHEDULERS: + return SCHEDULERS[args.scheduler](**args.scheduler_config) + else: + assert False, "Unknown scheduler: {}, should be one of {}".format( + args.scheduler, SCHEDULERS.keys()) + + +def run_experiments(experiments, scheduler=None, **ray_args): + if scheduler is None: + scheduler = make_scheduler(args) + runner = TrialRunner(scheduler) for name, spec in experiments.items(): for trial in generate_trials(spec, name): @@ -63,5 +88,5 @@ if __name__ == "__main__": with open(args.config_file) as f: experiments = yaml.load(f) run_experiments( - experiments, redis_address=args.redis_address, + experiments, make_scheduler(args), redis_address=args.redis_address, num_cpus=args.num_cpus, num_gpus=args.num_gpus) diff --git a/test/trial_scheduler_test.py b/test/trial_scheduler_test.py index 1839e967e..38deab8c6 100644 --- a/test/trial_scheduler_test.py +++ b/test/trial_scheduler_test.py @@ -4,10 +4,11 @@ from __future__ import print_function import unittest +from ray.tune.hyperband import HyperBandScheduler +from ray.tune.median_stopping_rule import MedianStoppingRule from ray.tune.result import TrainingResult from ray.tune.trial import Trial -from ray.tune.trial_scheduler import MedianStoppingRule, TrialScheduler -from ray.tune.hyperband import HyperBandScheduler +from ray.tune.trial_scheduler import TrialScheduler def result(t, rew): @@ -95,6 +96,20 @@ class EarlyStoppingSuite(unittest.TestCase): rule.on_trial_result(None, t3, result(2, 260)), TrialScheduler.STOP) + def testMedianStoppingSoftStop(self): + rule = MedianStoppingRule( + grace_period=0, min_samples_required=1, hard_stop=False) + t1, t2 = self.basicSetup(rule) + rule.on_trial_complete(None, t1, result(10, 1000)) + rule.on_trial_complete(None, t2, result(10, 1000)) + t3 = Trial("t3", "PPO") + self.assertEqual( + rule.on_trial_result(None, t3, result(1, 260)), + TrialScheduler.CONTINUE) + self.assertEqual( + rule.on_trial_result(None, t3, result(2, 260)), + TrialScheduler.PAUSE) + def testAlternateMetrics(self): def result2(t, rew): return TrainingResult(training_iteration=t, neg_mean_loss=rew)