[tune] Add command line support for choosing early stopping schedulers (#1209)

* command line support

* add checkpoint freq

* fix other flags

* fix

* docs

* doc
This commit is contained in:
Eric Liang
2017-11-12 12:05:18 -08:00
committed by Richard Liaw
parent afdc87323f
commit 7c38f964b7
11 changed files with 218 additions and 101 deletions
+30 -1
View File
@@ -142,7 +142,10 @@ class Agent(object):
start = time.time()
result = self._train()
self._iteration += 1
time_this_iter = time.time() - start
if result.time_this_iter_s is not None:
time_this_iter = result.time_this_iter_s
else:
time_this_iter = time.time() - start
assert result.timesteps_this_iter is not None
@@ -340,6 +343,30 @@ class _MockAgent(Agent):
return self.info
class _SigmoidFakeData(_MockAgent):
"""Agent that returns sigmoid learning curves.
This can be helpful for evaluating early stopping algorithms."""
_agent_name = "SigmoidFakeData"
_default_config = {
"width": 100,
"height": 100,
"offset": 0,
"iter_time": 10,
"iter_timesteps": 1,
}
def _train(self):
i = max(0, self.iteration - self.config["offset"])
v = np.tanh(float(i) / self.config["width"])
v *= self.config["height"]
return TrainingResult(
episode_reward_mean=v, episode_len_mean=v,
timesteps_this_iter=self.config["iter_timesteps"],
time_this_iter_s=self.config["iter_time"], info={})
def get_agent_class(alg):
"""Returns the class of an known agent given its name."""
@@ -360,6 +387,8 @@ def get_agent_class(alg):
return script_runner.ScriptRunner
elif alg == "__fake":
return _MockAgent
elif alg == "__sigmoid_fake_data":
return _SigmoidFakeData
else:
raise Exception(
("Unknown algorithm {}, check --alg argument. Valid choices " +
+12 -3
View File
@@ -9,7 +9,7 @@ import sys
import yaml
from ray.tune.config_parser import make_parser, resources_to_json
from ray.tune.tune import run_experiments
from ray.tune.tune import make_scheduler, run_experiments
EXAMPLE_USAGE = """
@@ -18,6 +18,8 @@ Training example:
Grid search example:
./train.py -f tuned_examples/cartpole-grid-search-example.yaml
Note that -f overrides all other trial-specific command-line options.
"""
@@ -33,6 +35,8 @@ parser.add_argument("--num-cpus", default=None, type=int,
help="Number of CPUs to allocate to Ray.")
parser.add_argument("--num-gpus", default=None, type=int,
help="Number of GPUs to allocate to Ray.")
parser.add_argument("--experiment-name", default="default", type=str,
help="Name of experiment dir.")
parser.add_argument("-f", "--config-file", default=None, type=str,
help="If specified, use config options from this file.")
@@ -43,15 +47,19 @@ if __name__ == "__main__":
with open(args.config_file) as f:
experiments = yaml.load(f)
else:
# Note: keep this in sync with tune/config_parser.py
experiments = {
"default": { # i.e. log to /tmp/ray/default
args.experiment_name: { # i.e. log to /tmp/ray/default
"alg": args.alg,
"checkpoint_freq": args.checkpoint_freq,
"local_dir": args.local_dir,
"env": args.env,
"resources": resources_to_json(args.resources),
"stop": args.stop,
"config": args.config,
"restore": args.restore,
"repeat": args.repeat,
"upload_dir": args.upload_dir,
}
}
@@ -62,5 +70,6 @@ if __name__ == "__main__":
parser.error("the following arguments are required: --env")
run_experiments(
experiments, redis_address=args.redis_address,
experiments, scheduler=make_scheduler(args),
redis_address=args.redis_address,
num_cpus=args.num_cpus, num_gpus=args.num_gpus)
+7 -2
View File
@@ -31,6 +31,7 @@ def make_parser(**kwargs):
parser = argparse.ArgumentParser(**kwargs)
# Note: keep this in sync with rllib/train.py
parser.add_argument("--alg", default=None, type=str,
help="The learning algorithm to train.")
parser.add_argument("--stop", default="{}", type=json.loads,
@@ -44,10 +45,14 @@ def make_parser(**kwargs):
help="Number of times to repeat each trial.")
parser.add_argument("--local-dir", default="/tmp/ray", type=str,
help="Local dir to save training results to.")
parser.add_argument("--upload-dir", default=None, type=str,
parser.add_argument("--upload-dir", default="", type=str,
help="URI to upload training results to.")
parser.add_argument("--checkpoint-freq", default=None, type=int,
parser.add_argument("--checkpoint-freq", default=0, type=int,
help="How many iterations between checkpoints.")
parser.add_argument("--scheduler", default="FIFO", type=str,
help="FIFO, MedianStopping, or HyperBand")
parser.add_argument("--scheduler-config", default="{}", type=json.loads,
help="Config options to pass to the scheduler.")
# Note: this currently only makes sense when running a single trial
parser.add_argument("--restore", default=None, type=str,
+1 -1
View File
@@ -32,7 +32,7 @@ class HyperBandScheduler(FIFOScheduler):
and band and will spill over to new brackets/bands accordingly.
"""
def __init__(self, max_iter, eta=3):
def __init__(self, max_iter=200, eta=3):
"""
args:
max_iter (int): maximum iterations per configuration
+100
View File
@@ -0,0 +1,100 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import numpy as np
from ray.tune.trial_scheduler import FIFOScheduler, TrialScheduler
class MedianStoppingRule(FIFOScheduler):
"""Implements the median stopping rule as described in the Vizier paper:
https://research.google.com/pubs/pub46180.html
Args:
time_attr (str): The TrainingResult attr to use for comparing time.
Note that you can pass in something non-temporal such as
`training_iteration` as a measure of progress, the only requirement
is that the attribute should increase monotonically.
reward_attr (str): The TrainingResult objective value attribute. As
with `time_attr`, this may refer to any objective value that
is supposed to increase with time.
grace_period (float): Only stop trials at least this old in time.
The units are the same as the attribute named by `time_attr`.
min_samples_required (int): Min samples to compute median over.
hard_stop (bool): If false, pauses trials instead of stopping
them. When all other trials are complete, paused trials will be
resumed and allowed to run FIFO.
"""
def __init__(
self, time_attr='time_total_s', reward_attr='episode_reward_mean',
grace_period=60.0, min_samples_required=3, hard_stop=True):
FIFOScheduler.__init__(self)
self._stopped_trials = set()
self._completed_trials = set()
self._results = collections.defaultdict(list)
self._grace_period = grace_period
self._min_samples_required = min_samples_required
self._reward_attr = reward_attr
self._time_attr = time_attr
self._hard_stop = hard_stop
def on_trial_result(self, trial_runner, trial, result):
"""Callback for early stopping.
This stopping rule stops a running trial if the trial's best objective
value by step `t` is strictly worse than the median of the running
averages of all completed trials' objectives reported up to step `t`.
"""
if trial in self._stopped_trials:
assert not self._hard_stop
return TrialScheduler.CONTINUE # fall back to FIFO
time = getattr(result, self._time_attr)
self._results[trial].append(result)
median_result = self._get_median_result(time)
best_result = self._best_result(trial)
print("Trial {} best res={} vs median res={} at t={}".format(
trial, best_result, median_result, time))
if best_result < median_result and time > self._grace_period:
print("MedianStoppingRule: early stopping {}".format(trial))
self._stopped_trials.add(trial)
if self._hard_stop:
return TrialScheduler.STOP
else:
return TrialScheduler.PAUSE
else:
return TrialScheduler.CONTINUE
def on_trial_complete(self, trial_runner, trial, result):
self._results[trial].append(result)
self._completed_trials.add(trial)
def debug_string(self):
return "Using MedianStoppingRule: num_stopped={}.".format(
len(self._stopped_trials))
def _get_median_result(self, time):
scores = []
for trial in self._completed_trials:
scores.append(self._running_result(trial, time))
if len(scores) >= self._min_samples_required:
return np.median(scores)
else:
return float('-inf')
def _running_result(self, trial, t_max=float('inf')):
results = self._results[trial]
# TODO(ekl) we could do interpolation to be more precise, but for now
# assume len(results) is large and the time diffs are roughly equal
return np.mean(
[getattr(r, self._reward_attr)
for r in results if getattr(r, self._time_attr) <= t_max])
def _best_result(self, trial):
results = self._results[trial]
return max([getattr(r, self._reward_attr) for r in results])
+2 -1
View File
@@ -46,7 +46,8 @@ TrainingResult = namedtuple("TrainingResult", [
# (Auto-filled) Number of timesteps in the simulator in this iteration.
"timesteps_this_iter",
# (Auto-filled) Time in seconds this iteration took to run.
# (Auto-filled) Time in seconds this iteration took to run. This may be
# overriden in order to override the system-computed time difference.
"time_this_iter_s",
# (Auto-filled) Accumulated time in seconds for this entire experiment.
+2 -2
View File
@@ -60,7 +60,7 @@ class Trial(object):
def __init__(
self, env_creator, alg, config={}, local_dir='/tmp/ray',
experiment_tag=None, resources=Resources(cpu=1, gpu=0),
stopping_criterion={}, checkpoint_freq=None,
stopping_criterion={}, checkpoint_freq=0,
restore_path=None, upload_dir=None):
"""Initialize a new trial.
@@ -179,7 +179,7 @@ class Trial(object):
def should_checkpoint(self):
"""Whether this trial is due for checkpointing."""
if self.checkpoint_freq is None:
if not self.checkpoint_freq:
return False
return self.last_result.training_iteration % self.checkpoint_freq == 0
+14
View File
@@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import ray
import time
import traceback
@@ -42,9 +43,21 @@ class TrialRunner(object):
self._committed_resources = Resources(cpu=0, gpu=0)
self._resources_initialized = False
# For debugging, it may be useful to halt trials after some time has
# elapsed. TODO(ekl) consider exposing this in the API.
self._global_time_limit = float(
os.environ.get("TRIALRUNNER_WALLTIME_LIMIT", float('inf')))
self._total_time = 0
def is_finished(self):
"""Returns whether all trials have finished running."""
if self._total_time > self._global_time_limit:
print(
"Exceeded global time limit {} / {}".format(
self._total_time, self._global_time_limit))
return True
for t in self._trials:
if t.status in [Trial.PENDING, Trial.RUNNING, Trial.PAUSED]:
return False
@@ -148,6 +161,7 @@ class TrialRunner(object):
result = ray.get(result_id)
print("result", result)
trial.last_result = result
self._total_time += result.time_this_iter_s
if trial.should_stop(result):
self._scheduler_alg.on_trial_complete(self, trial, result)
+4 -85
View File
@@ -1,9 +1,6 @@
from __future__ import absolute_import
from __future__ import division
import collections
import numpy as np
from ray.tune.trial import Trial
@@ -73,89 +70,11 @@ class FIFOScheduler(TrialScheduler):
if (trial.status == Trial.PENDING and
trial_runner.has_resources(trial.resources)):
return trial
for trial in trial_runner.get_trials():
if (trial.status == Trial.PAUSED and
trial_runner.has_resources(trial.resources)):
return trial
return None
def debug_string(self):
return "Using FIFO scheduling algorithm."
# TODO(ekl) expose this in the command line API
class MedianStoppingRule(FIFOScheduler):
"""Implements the median stopping rule as described in the Vizier paper:
https://research.google.com/pubs/pub46180.html
Args:
time_attr (str): The TrainingResult attr to use for comparing time.
Note that you can pass in something non-temporal such as
`training_iteration` as a measure of progress, the only requirement
is that the attribute should increase monotonically.
reward_attr (str): The TrainingResult objective value attribute. As
with `time_attr`, this may refer to any objective value that
is supposed to increase with time.
grace_period (float): Only stop trials at least this old in time.
The units are the same as the attribute named by `time_attr`.
min_samples_required (int): Min samples to compute median over.
"""
def __init__(
self, time_attr='time_total_s', reward_attr='episode_reward_mean',
grace_period=60.0, min_samples_required=3):
FIFOScheduler.__init__(self)
self._completed_trials = set()
self._results = collections.defaultdict(list)
self._grace_period = grace_period
self._min_samples_required = min_samples_required
self._reward_attr = reward_attr
self._time_attr = time_attr
self._num_stopped = 0
def on_trial_result(self, trial_runner, trial, result):
"""Callback for early stopping.
This stopping rule stops a running trial if the trial's best objective
value by step `t` is strictly worse than the median of the running
averages of all completed trials' objectives reported up to step `t`.
"""
time = getattr(result, self._time_attr)
self._results[trial].append(result)
median_result = self._get_median_result(time)
best_result = self._best_result(trial)
print("Trial {} best res={} vs median res={} at t={}".format(
trial, best_result, median_result, time))
if best_result < median_result and time > self._grace_period:
print("MedianStoppingRule: early stopping {}".format(trial))
self._num_stopped += 1
return TrialScheduler.STOP
else:
return TrialScheduler.CONTINUE
def on_trial_complete(self, trial_runner, trial, result):
self._results[trial].append(result)
self._completed_trials.add(trial)
def debug_string(self):
return "Using MedianStoppingRule: num_stopped={}.".format(
self._num_stopped)
def _get_median_result(self, time):
scores = []
for trial in self._completed_trials:
scores.append(self._running_result(trial, time))
if len(scores) >= self._min_samples_required:
return np.median(scores)
else:
return float('-inf')
def _running_result(self, trial, t_max=float('inf')):
results = self._results[trial]
# TODO(ekl) we could do interpolation to be more precise, but for now
# assume len(results) is large and the time diffs are roughly equal
return np.mean(
[getattr(r, self._reward_attr)
for r in results if getattr(r, self._time_attr) <= t_max])
def _best_result(self, trial):
results = self._results[trial]
return max([getattr(r, self._reward_attr) for r in results])
+29 -4
View File
@@ -5,12 +5,16 @@ from __future__ import division
from __future__ import print_function
import argparse
import json
import sys
import yaml
import ray
from ray.tune.trial_runner import TrialRunner
from ray.tune.hyperband import HyperBandScheduler
from ray.tune.median_stopping_rule import MedianStoppingRule
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.trial_scheduler import FIFOScheduler
from ray.tune.variant_generator import generate_trials
@@ -32,12 +36,33 @@ parser.add_argument("--num-cpus", default=None, type=int,
help="Number of CPUs to allocate to Ray.")
parser.add_argument("--num-gpus", default=None, type=int,
help="Number of GPUs to allocate to Ray.")
parser.add_argument("--scheduler", default="FIFO", type=str,
help="FIFO, MedianStopping, or HyperBand")
parser.add_argument("--scheduler-config", default="{}", type=json.loads,
help="Config options to pass to the scheduler.")
parser.add_argument("-f", "--config-file", required=True, type=str,
help="Read experiment options from this JSON/YAML file.")
def run_experiments(experiments, **ray_args):
runner = TrialRunner()
SCHEDULERS = {
"FIFO": FIFOScheduler,
"MedianStopping": MedianStoppingRule,
"HyperBand": HyperBandScheduler,
}
def make_scheduler(args):
if args.scheduler in SCHEDULERS:
return SCHEDULERS[args.scheduler](**args.scheduler_config)
else:
assert False, "Unknown scheduler: {}, should be one of {}".format(
args.scheduler, SCHEDULERS.keys())
def run_experiments(experiments, scheduler=None, **ray_args):
if scheduler is None:
scheduler = make_scheduler(args)
runner = TrialRunner(scheduler)
for name, spec in experiments.items():
for trial in generate_trials(spec, name):
@@ -63,5 +88,5 @@ if __name__ == "__main__":
with open(args.config_file) as f:
experiments = yaml.load(f)
run_experiments(
experiments, redis_address=args.redis_address,
experiments, make_scheduler(args), redis_address=args.redis_address,
num_cpus=args.num_cpus, num_gpus=args.num_gpus)