[tune] Improve the tune Python API and variant generation (#1154)

* new variant gen

* wip

* Sat Oct 21 18:21:34 PDT 2017

* update

* comment

* fix

* update

* update readme

* fix

* Update README.rst

* Update README.rst

* fix repeat

* update

* note on restore
This commit is contained in:
Eric Liang
2017-11-06 23:41:17 -08:00
committed by Richard Liaw
parent 6222ec3bd7
commit 52888e4c6f
18 changed files with 636 additions and 317 deletions
+39 -53
View File
@@ -1,80 +1,66 @@
#!/usr/bin/env python
"""The main command line interface to RLlib.
Arguments may either be specified on the command line or in JSON/YAML
files. Additionally, the file-based interface supports hyperparameter
exploration through grid or random search, though both interfaces allow
for the concurrent execution of multiple trials on Ray.
Single-trial example:
./train.py --alg=DQN --env=CartPole-v0
Hyperparameter grid search example:
./train.py -f tuned_examples/cartpole-grid-search-example.yaml
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import sys
import yaml
import ray
from ray.tune.config_parser import make_parser, parse_to_trials
from ray.tune.trial_scheduler import MedianStoppingRule
from ray.tune.trial_runner import TrialRunner
from ray.tune.trial import Trial
from ray.tune.config_parser import make_parser, resources_to_json
from ray.tune.tune import run_experiments
parser = make_parser("Train a reinforcement learning agent.")
EXAMPLE_USAGE = """
Training example:
./train.py --alg DQN --env CartPole-v0
# Extends the base parser defined in ray/tune/config_parser, to add some
# RLlib specific arguments. For more arguments, see the configuration
# defined there.
Grid search example:
./train.py -f tuned_examples/cartpole-grid-search-example.yaml
"""
parser = make_parser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Train a reinforcement learning agent.",
epilog=EXAMPLE_USAGE)
# See also the base parser definition in ray/tune/config_parser.py
parser.add_argument("--redis-address", default=None, type=str,
help="The Redis address of the cluster.")
parser.add_argument("--num-cpus", default=None, type=int,
help="Number of CPUs to allocate to Ray.")
parser.add_argument("--num-gpus", default=None, type=int,
help="Number of GPUs to allocate to Ray.")
parser.add_argument("--restore", default=None, type=str,
help="If specified, restore from this checkpoint.")
parser.add_argument("-f", "--config-file", default=None, type=str,
help="If specified, use config options from this file.")
def main(argv):
args = parser.parse_args(argv)
runner = TrialRunner(MedianStoppingRule())
if __name__ == "__main__":
args = parser.parse_args(sys.argv[1:])
if args.config_file:
with open(args.config_file) as f:
config = yaml.load(f)
for trial in parse_to_trials(config):
runner.add_trial(trial)
experiments = yaml.load(f)
else:
runner.add_trial(
Trial(
args.env, args.alg, args.config, args.local_dir, None,
args.resources, args.stop, args.checkpoint_freq, args.restore,
args.upload_dir))
ray.init(
redis_address=args.redis_address, num_cpus=args.num_cpus,
num_gpus=args.num_gpus)
experiments = {
"default": { # i.e. log to /tmp/ray/default
"alg": args.alg,
"env": args.env,
"resources": resources_to_json(args.resources),
"stop": args.stop,
"config": args.config,
"restore": args.restore,
"repeat": args.repeat,
}
}
while not runner.is_finished():
runner.step()
print(runner.debug_string())
for exp in experiments.values():
if not exp.get("alg"):
parser.error("the following arguments are required: --alg")
if not exp.get("env"):
parser.error("the following arguments are required: --env")
for trial in runner.get_trials():
if trial.status != Trial.TERMINATED:
print("Exit 1")
sys.exit(1)
print("Exit 0")
if __name__ == "__main__":
main(sys.argv[1:])
run_experiments(
experiments, redis_address=args.redis_address,
num_cpus=args.num_cpus, num_gpus=args.num_gpus)
@@ -1,7 +1,6 @@
cartpole-ppo:
env: CartPole-v0
alg: PPO
num_trials: 6
stop:
episode_reward_mean: 200
time_total_s: 180
@@ -1,10 +1,9 @@
hopper-ppo:
env: Hopper-v1
alg: PPO
num_trials: 1
resources:
cpu: 64
gpu: 4
driver_cpu_limit: 4
driver_gpu_limit: 4
cpu: 64
gpu: 4
driver_cpu_limit: 4
driver_gpu_limit: 4
config: {"gamma": 0.995, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 160000, "num_workers": 64}
@@ -2,9 +2,9 @@ humanoid-es:
env: Humanoid-v1
alg: ES
resources:
cpu: 100
driver_cpu_limit: 4
cpu: 100
driver_cpu_limit: 4
stop:
episode_reward_mean: 6000
config:
num_workers: 100
num_workers: 100
@@ -1,12 +1,11 @@
humanoid-ppo-gae:
env: Humanoid-v1
alg: PPO
num_trials: 1
stop:
episode_reward_mean: 6000
resources:
cpu: 64
gpu: 4
driver_cpu_limit: 4
cpu: 64
gpu: 4
driver_cpu_limit: 4
config: {"lambda": 0.95, "clip_param": 0.2, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "horizon": 5000, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64, "model": {"free_log_std": true}, "write_logs": false}
@@ -1,9 +1,8 @@
humanoid-ppo:
env: Humanoid-v1
alg: PPO
num_trials: 1
stop:
episode_reward_mean: 6000
episode_reward_mean: 6000
resources:
cpu: 64
gpu: 4
@@ -2,8 +2,8 @@ pong-a3c:
env: PongDeterministic-v4
alg: A3C
resources:
cpu: 16
driver_cpu_limit: 1
cpu: 16
driver_cpu_limit: 1
config:
num_workers: 16
num_batches_per_iteration: 1000
@@ -2,8 +2,8 @@ pong-deterministic-dqn:
env: PongDeterministic-v4
alg: DQN
resources:
cpu: 1
gpu: 1
cpu: 1
gpu: 1
stop:
episode_reward_mean: 20
time_total_s: 7200
@@ -1,9 +1,8 @@
walker2d-v1-ppo:
env: Walker2d-v1
alg: PPO
num_trials: 1
resources:
cpu: 64
gpu: 4
driver_cpu_limit: 4
cpu: 64
gpu: 4
driver_cpu_limit: 4
config: {"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64}
+96 -61
View File
@@ -1,11 +1,11 @@
Parallel hyperparameter evaluation with Ray
===========================================
Parallel hyperparameter search with Ray
=======================================
Using ray.tune for deep neural network training
Using ray.tune with existing training scripts
-----------------------------------------------
With only a couple changes, you can parallelize evaluation of any existing
Python script with Ray.tune.
With only a couple changes, you can adapt any existing script for parallel
hyperparameter search with Ray.tune.
First, you must define a ``train(config, status_reporter)`` function in your
script. This will be the entry point which Ray will call into.
@@ -24,7 +24,7 @@ Second, you should periodically report training status by passing a
def train(config, status_reporter):
for step in range(1000):
# do a training iteration
... # do an optimization step, etc.
status_reporter.report(TrainingResult(
timesteps_total=step, # required
mean_loss=train_loss, # optional
@@ -40,25 +40,12 @@ For example:
./tune.py -f examples/tune_mnist_ray.yaml
The YAML or JSON file passed to ``tune.py`` specifies the configuration of the
trials to launch. For example, the following YAML describes a grid search over
activation functions.
trials to launch. You can also use ray.tune programmatically, e.g. the above
example also defines a main() using tune APIs that can be run directly:
.. code:: yaml
.. code:: bash
tune_mnist:
env: mnist
alg: script
num_trials: 10
resources:
cpu: 1
stop:
mean_accuracy: 0.99
time_total_s: 600
config:
script_file_path: examples/tune_mnist_ray.py
script_entrypoint: train
activation:
grid_search: ['relu', 'elu', 'tanh']
python examples/tune_mnist_ray.py
When run, ``./tune.py`` will schedule the trials on Ray, creating a new local
Ray cluster if an existing cluster address is not specified. Incremental
@@ -70,47 +57,22 @@ metrics using Tensorboard:
== Status ==
Resources used: 4/4 CPUs, 0/0 GPUs
Tensorboard logdir: /tmp/ray/tune_mnist
- script_mnist_0_activation=relu: RUNNING [pid=27708], 16 s, 20 ts, 0.46 acc
- script_mnist_1_activation=elu: RUNNING [pid=27709], 16 s, 20 ts, 0.54 acc
- script_mnist_2_activation=tanh: RUNNING [pid=27711], 18 s, 20 ts, 0.74 acc
- script_mnist_3_activation=relu: RUNNING [pid=27713], 12 s, 10 ts, 0.22 acc
- script_mnist_4_activation=elu: PENDING
- script_mnist_5_activation=tanh: PENDING
- script_mnist_6_activation=relu: PENDING
- script_mnist_7_activation=elu: PENDING
- script_mnist_8_activation=tanh: PENDING
- script_mnist_9_activation=relu: PENDING
- script_custom_0_activation=relu: RUNNING [pid=27708], 16 s, 20 ts, 0.46 acc
- script_custom_1_activation=elu: RUNNING [pid=27709], 16 s, 20 ts, 0.54 acc
- script_custom_2_activation=tanh: RUNNING [pid=27711], 18 s, 20 ts, 0.74 acc
- script_custom_3_activation=relu: RUNNING [pid=27713], 12 s, 10 ts, 0.22 acc
- script_custom_4_activation=elu: PENDING
- script_custom_5_activation=tanh: PENDING
- script_custom_6_activation=relu: PENDING
- script_custom_7_activation=elu: PENDING
- script_custom_8_activation=tanh: PENDING
- script_custom_9_activation=relu: PENDING
Note that if your script requires GPUs, you should specify the number of gpus
required per trial in the ``resources`` section. Additionally, Ray should be
initialized with the ``--num-gpus`` argument (you can also pass this argument
to ``tune.py``).
Using ray.tune as a library
---------------------------
Ray.tune can also be called programmatically from Python code. This allows for
finer-grained control over trial setup and scheduling. Some examples of
calling ray.tune programmatically include:
- ``python/ray/tune/examples/tune_mnist_ray.py``
- ``python/ray/rllib/train.py``
Using ray.tune with Ray RLlib
-----------------------------
Another way to use ray.tune is through RLlib's ``python/ray/rllib/train.py``
script. This script allows you to select between different RL algorithms with
the ``--alg`` option. For example, to train pong with the A3C algorithm, run:
- ``./train.py --env=PongDeterministic-v4 --alg=A3C --num-trials=8 --stop '{"time_total_s": 3200}' --resources '{"cpu": 8}' --config '{"num_workers": 8}'``
or
- ``./train.py -f tuned_examples/pong-a3c.yaml``
You can find more RLlib examples in ``python/ray/rllib/tuned_examples``.
Specifying search parameters
----------------------------
@@ -125,7 +87,7 @@ expression.
cartpole-ppo:
env: CartPole-v0
alg: PPO
num_trials: 6
repeat: 2
stop:
episode_reward_mean: 200
time_total_s: 180
@@ -134,9 +96,82 @@ expression.
driver_cpu_limit: 1 # of the 5 CPUs, only 1 is used by the driver
config:
num_workers: 4
num_sgd_iter:
grid_search: [1, 4]
timesteps_per_batch:
grid_search: [4000, 40000]
sgd_batchsize:
grid_search: [128, 256, 512]
num_sgd_iter:
eval: spec.config.sgd_batchsize * 2
lr:
eval: random.uniform(1e-4, 1e-3)
When using the Python API, the above is equivalent to the following program:
.. code:: python
import random
import ray
from ray.tune.result import TrainingResult
from ray.tune.trial_runner import TrialRunner
from ray.tune.variant_generator import grid_search, generate_trials
runner = TrialRunner()
spec = {
"env": "CartPole-v0",
"alg": "PPO",
"repeat": 2,
"stop": {
"episode_reward_mean": 200,
"time_total_s": 180,
},
"resources": {
"cpu": 4,
},
"config": {
"num_workers": 4,
"timesteps_per_batch": grid_search([4000, 40000]),
"sgd_batchsize": grid_search([128, 256, 512]),
"num_sgd_iter": lambda spec: spec.config.sgd_batchsize * 2,
"lr": lambda spec: random.uniform(1e-4, 1e-3),
},
}
for trial in generate_trials(spec):
runner.add_trial(trial)
ray.init()
while not runner.is_finished():
runner.step()
print(runner.debug_string())
Note that conditional dependencies between variables can be expressed by
variable references, e.g. ``spec.config.sgd_batchsize`` in the above example.
It is also possible to combine grid search and lambda functions by having
a lambda function return a grid search object or vice versa.
Using ray.tune as a library
---------------------------
Ray.tune's Python API allows for finer-grained control over trial setup and
scheduling. Some more examples of calling ray.tune programmatically include:
- ``python/ray/tune/examples/tune_mnist_ray.py`` (see the main function)
- ``python/ray/rllib/train.py``
- ``python/ray/rllib/tune.py``
Using ray.tune with Ray RLlib
-----------------------------
Another way to use ray.tune is through RLlib's ``python/ray/rllib/train.py``
script. This script allows you to select between different RL algorithms with
the ``--alg`` option. For example, to train pong with the A3C algorithm, run:
- ``./train.py --env=PongDeterministic-v4 --alg=A3C --stop '{"time_total_s": 3200}' --resources '{"cpu": 8}' --config '{"num_workers": 8}'``
or
- ``./train.py -f tuned_examples/pong-a3c.yaml``
You can find more RLlib examples in ``python/ray/rllib/tuned_examples``.
+25 -104
View File
@@ -5,36 +5,43 @@ from __future__ import print_function
import argparse
import json
import numpy as np
import os
import random
from ray.tune.trial import Trial, Resources
from ray.tune.trial import Resources
def _resource_json(data):
values = json.loads(data)
def json_to_resources(data):
if type(data) is str:
data = json.loads(data)
return Resources(
values.get('cpu', 0), values.get('gpu', 0),
values.get('driver_cpu_limit'), values.get('driver_gpu_limit'))
data.get("cpu", 0), data.get("gpu", 0),
data.get("driver_cpu_limit"), data.get("driver_gpu_limit"))
def make_parser(description):
def resources_to_json(resources):
return {
"cpu": resources.cpu,
"gpu": resources.gpu,
"driver_cpu_limit": resources.driver_cpu_limit,
"driver_gpu_limit": resources.driver_gpu_limit,
}
def make_parser(**kwargs):
"""Returns a base argument parser for the ray.tune tool."""
parser = argparse.ArgumentParser(description=(description))
parser = argparse.ArgumentParser(**kwargs)
parser.add_argument("--alg", default="PPO", type=str,
parser.add_argument("--alg", default=None, type=str,
help="The learning algorithm to train.")
parser.add_argument("--stop", default="{}", type=json.loads,
help="The stopping criteria, specified in JSON.")
parser.add_argument("--config", default="{}", type=json.loads,
help="The config of the algorithm, specified in JSON.")
parser.add_argument("--resources", default='{"cpu": 1}',
type=_resource_json,
type=json_to_resources,
help="Amount of resources to allocate per trial.")
parser.add_argument("--num-trials", default=1, type=int,
help="Number of trials to evaluate.")
parser.add_argument("--repeat", default=1, type=int,
help="Number of times to repeat each trial.")
parser.add_argument("--local-dir", default="/tmp/ray", type=str,
help="Local dir to save training results to.")
parser.add_argument("--upload-dir", default=None, type=str,
@@ -42,98 +49,12 @@ def make_parser(description):
parser.add_argument("--checkpoint-freq", default=None, type=int,
help="How many iterations between checkpoints.")
# Note: this currently only makes sense when running a single trial
parser.add_argument("--restore", default=None, type=str,
help="If specified, restore from this checkpoint.")
# TODO(ekl) environments are RL specific
parser.add_argument("--env", default=None, type=str,
help="The gym environment to use.")
return parser
def parse_to_trials(config):
"""Parses a json config to the number of trials specified by the config.
The input config is a mapping from experiment names to an argument
dictionary describing a set of trials. These args include the parser args
documented in make_parser().
"""
def resolve(agent_cfg, resolved_vars, i):
assert type(agent_cfg) == dict
cfg = agent_cfg.copy()
for p, val in cfg.items():
if type(val) == dict and "eval" in val:
cfg[p] = eval(val["eval"], {
"random": random,
"np": np,
}, {
"_i": i,
})
resolved_vars[p] = True
return cfg, resolved_vars
def to_argv(config):
argv = []
for k, v in config.items():
argv.append("--{}".format(k.replace("_", "-")))
if type(v) is str:
argv.append(v)
else:
argv.append(json.dumps(v))
return argv
def param_str(config, resolved_vars):
return "_".join(
[k + "=" + str(v) for k, v in sorted(config.items())
if resolved_vars.get(k)])
parser = make_parser("Ray hyperparameter tuning tool")
trials = []
for experiment_name, exp_cfg in config.items():
args = parser.parse_args(to_argv(exp_cfg))
grid_search = _GridSearchGenerator(args.config)
for i in range(args.num_trials):
next_cfg, resolved_vars = grid_search.next()
resolved, resolved_vars = resolve(next_cfg, resolved_vars, i)
if resolved_vars:
experiment_tag = "{}_{}".format(
i, param_str(resolved, resolved_vars))
else:
experiment_tag = str(i)
trials.append(Trial(
args.env, args.alg, resolved,
os.path.join(args.local_dir, experiment_name), experiment_tag,
args.resources, args.stop, args.checkpoint_freq, None,
args.upload_dir))
return trials
class _GridSearchGenerator(object):
"""Generator that implements grid search over a set of value lists."""
def __init__(self, agent_cfg):
self.cfg = agent_cfg
self.grid_values = []
for p, val in sorted(agent_cfg.items()):
if type(val) == dict and "grid_search" in val:
assert type(val["grid_search"] == list)
self.grid_values.append((p, val["grid_search"]))
self.value_indices = [0] * len(self.grid_values)
def next(self):
cfg = self.cfg.copy()
resolved_vars = {}
for i, (k, values) in enumerate(self.grid_values):
idx = self.value_indices[i]
cfg[k] = values[idx]
resolved_vars[k] = True
if self.grid_values:
self._increment(0)
return cfg, resolved_vars
def _increment(self, i):
self.value_indices[i] += 1
if self.value_indices[i] >= len(self.grid_values[i][1]):
self.value_indices[i] = 0
if i + 1 < len(self.value_indices):
self._increment(i + 1)
+15 -13
View File
@@ -35,8 +35,8 @@ import os
import ray
from ray.tune.result import TrainingResult
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.variant_generator import grid_search, generate_trials
from tensorflow.examples.tutorials.mnist import input_data
@@ -203,18 +203,20 @@ def train(config={'activation': 'relu'}, reporter=None):
if __name__ == '__main__':
runner = TrialRunner()
for act in ['relu', 'elu', 'tanh']:
runner.add_trial(
Trial(
'mnist', 'script',
stopping_criterion={
'mean_accuracy': 0.99, 'time_total_s': 600},
config={
'script_file_path': os.path.abspath(__file__),
'script_min_iter_time_s': 1,
'activation': act,
},
experiment_tag='act={}'.format(act)))
spec = {
'stop': {
'mean_accuracy': 0.99,
'time_total_s': 600,
},
'config': {
'script_file_path': os.path.abspath(__file__),
'script_min_iter_time_s': 1,
'activation': grid_search(['relu', 'elu', 'tanh']),
},
}
for trial in generate_trials(spec):
runner.add_trial(trial)
ray.init()
+1 -3
View File
@@ -1,7 +1,5 @@
tune_mnist:
env: mnist
alg: script
num_trials: 10
repeat: 2
resources:
cpu: 1
stop:
+6
View File
@@ -7,6 +7,7 @@ import os
import sys
import time
import threading
import traceback
from ray.rllib.agent import Agent
@@ -79,6 +80,7 @@ class _RunnerThread(threading.Thread):
self._entrypoint(*self._entrypoint_args)
except Exception as e:
self._status_reporter.set_error(e)
print("Runner thread raised: {}".format(traceback.format_exc()))
raise e
@@ -122,6 +124,10 @@ class ScriptRunner(Agent):
self._runner.start()
def train(self):
if not self._initialize_ok:
raise ValueError(
"Agent initialization failed, see previous errors")
poll_start = time.time()
result = self._status_reporter._get_and_clear_status()
while result is None or \
+9 -6
View File
@@ -40,6 +40,7 @@ class TrialRunner(object):
self._running = {}
self._avail_resources = Resources(cpu=0, gpu=0)
self._committed_resources = Resources(cpu=0, gpu=0)
self._resources_initialized = False
def is_finished(self):
"""Returns whether all trials have finished running."""
@@ -92,12 +93,13 @@ class TrialRunner(object):
messages = ["== Status =="]
messages.append(self._scheduler_alg.debug_string())
messages.append(
"Resources used: {}/{} CPUs, {}/{} GPUs".format(
self._committed_resources.cpu,
self._avail_resources.cpu,
self._committed_resources.gpu,
self._avail_resources.gpu))
if self._resources_initialized:
messages.append(
"Resources used: {}/{} CPUs, {}/{} GPUs".format(
self._committed_resources.cpu,
self._avail_resources.cpu,
self._committed_resources.gpu,
self._avail_resources.gpu))
for local_dir in sorted(set([t.local_dir for t in self._trials])):
messages.append("Tensorboard logdir: {}".format(local_dir))
for t in self._trials:
@@ -210,3 +212,4 @@ class TrialRunner(object):
num_cpus = sum(ls['NumCPUs'] for ls in local_schedulers)
num_gpus = sum(ls['NumGPUs'] for ls in local_schedulers)
self._avail_resources = Resources(int(num_cpus), int(num_gpus))
self._resources_initialized = True
+59 -6
View File
@@ -1,14 +1,67 @@
#!/usr/bin/env python
"""Command-line tool for tuning hyperparameters with Ray.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import sys
import yaml
import ray
from ray.tune.trial_runner import TrialRunner
from ray.tune.trial import Trial
from ray.tune.variant_generator import generate_trials
EXAMPLE_USAGE = """
MNIST tuning example:
./tune.py -f examples/tune_mnist_ray.yaml
"""
from ray.rllib import train
import sys
# TODO(ekl) right now this is a thin wrapper around the rllib training script,
# however in the future we should have a separate command line tool here.
train.main(sys.argv[1:] + ['--alg=script'])
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Tune hyperparameters with Ray.",
epilog=EXAMPLE_USAGE)
# See also the base parser definition in ray/tune/config_parser.py
parser.add_argument("--redis-address", default=None, type=str,
help="The Redis address of the cluster.")
parser.add_argument("--num-cpus", default=None, type=int,
help="Number of CPUs to allocate to Ray.")
parser.add_argument("--num-gpus", default=None, type=int,
help="Number of GPUs to allocate to Ray.")
parser.add_argument("-f", "--config-file", required=True, type=str,
help="Read experiment options from this JSON/YAML file.")
def run_experiments(experiments, **ray_args):
runner = TrialRunner()
for name, spec in experiments.items():
for trial in generate_trials(spec, name):
runner.add_trial(trial)
print(runner.debug_string())
ray.init(**ray_args)
while not runner.is_finished():
runner.step()
print(runner.debug_string())
for trial in runner.get_trials():
if trial.status != Trial.TERMINATED:
print("Exit 1")
sys.exit(1)
print("Exit 0")
if __name__ == "__main__":
args = parser.parse_args(sys.argv[1:])
with open(args.config_file) as f:
experiments = yaml.load(f)
run_experiments(
experiments, redis_address=args.redis_address,
num_cpus=args.num_cpus, num_gpus=args.num_gpus)
+275
View File
@@ -0,0 +1,275 @@
import copy
import json
import numpy
import os
import random
import types
from ray.tune.trial import Trial
from ray.tune.config_parser import make_parser, json_to_resources
def generate_trials(unresolved_spec, output_path=''):
"""Wraps `generate_variants()` to return a Trial object for each variant.
See also: generate_variants()
Arguments:
unresolved_spec (dict): Experiment spec conforming to the argument
schema defined in `ray.tune.config_parser`.
output_path (str): Path where to store experiment outputs.
"""
def to_argv(config):
argv = []
for k, v in config.items():
argv.append("--{}".format(k.replace("_", "-")))
if isinstance(v, str):
argv.append(v)
else:
argv.append(json.dumps(v))
return argv
parser = make_parser()
i = 0
for _ in range(unresolved_spec.get("repeat", 1)):
for resolved_vars, spec in generate_variants(unresolved_spec):
args = parser.parse_args(to_argv(spec))
if resolved_vars:
experiment_tag = "{}_{}".format(i, resolved_vars)
else:
experiment_tag = str(i)
i += 1
yield Trial(
env_creator=spec.get("env", lambda: None),
alg=spec.get("alg", "script"),
config=spec.get("config", {}),
local_dir=os.path.join(args.local_dir, output_path),
experiment_tag=experiment_tag,
resources=json_to_resources(spec.get("resources", {})),
stopping_criterion=spec.get("stop", {}),
checkpoint_freq=args.checkpoint_freq,
restore_path=spec.get("restore"),
upload_dir=args.upload_dir)
def generate_variants(unresolved_spec):
"""Generates variants from a spec (dict) with unresolved values.
There are two types of unresolved values:
Grid search: These define a grid search over values. For example, the
following grid search values in a spec will produce six distinct
variants in combination:
"activation": grid_search(["relu", "tanh"])
"learning_rate": grid_search([1e-3, 1e-4, 1e-5])
Lambda functions: These are evaluated to produce a concrete value, and
can express dependencies or conditional distributions between values.
They can also be used to express random search (e.g., by calling
into the `random` or `np` module).
"cpu": lambda spec: spec.config.num_workers
"batch_size": lambda spec: random.uniform(1, 1000)
It is also possible to nest the two, e.g. have a lambda function
return a grid search or vice versa, as long as there are no cyclic
dependencies between unresolved values.
Finally, to support defining specs in plain JSON / YAML, grid search
and lambda functions can also be defined alternatively as follows:
"activation": {"grid_search": ["relu", "tanh"]}
"cpu": {"eval": "spec.config.num_workers"}
"""
for resolved_vars, spec in _generate_variants(unresolved_spec):
assert not _unresolved_values(spec)
yield _format_vars(resolved_vars), spec
def grid_search(values):
"""Convenience method for specifying grid search over a value."""
return {"grid_search": values}
_STANDARD_IMPORTS = {
"random": random,
"np": numpy,
}
_MAX_RESOLUTION_PASSES = 20
def _format_vars(resolved_vars):
out = []
for path, value in sorted(resolved_vars.items()):
if path[0] in ["alg", "env", "resources"]:
continue # these settings aren't usually search parameters
pieces = []
last_string = True
for k in path[::-1]:
if isinstance(k, int):
pieces.append(str(k))
elif last_string:
last_string = False
pieces.append(k)
pieces.reverse()
out.append("_".join(pieces) + "=" + str(value))
return ",".join(out)
def _generate_variants(spec):
spec = copy.deepcopy(spec)
unresolved = _unresolved_values(spec)
if not unresolved:
yield {}, spec
return
grid_vars = []
lambda_vars = []
for path, value in unresolved.items():
if isinstance(value, types.FunctionType):
lambda_vars.append((path, value))
else:
grid_vars.append((path, value))
grid_vars.sort()
grid_search = _grid_search_generator(spec, grid_vars)
for resolved_spec in grid_search:
resolved_vars = _resolve_lambda_vars(resolved_spec, lambda_vars)
for resolved, spec in _generate_variants(resolved_spec):
for path, value in grid_vars:
resolved_vars[path] = _get_value(spec, path)
for k, v in resolved.items():
if (k in resolved_vars and v != resolved_vars[k] and
_is_resolved(resolved_vars[k])):
raise ValueError(
"The variable `{}` could not be unambiguously "
"resolved to a single value. Consider simplifying "
"your variable dependencies.".format(k))
resolved_vars[k] = v
yield resolved_vars, spec
def _assign_value(spec, path, value):
for k in path[:-1]:
spec = spec[k]
spec[path[-1]] = value
def _get_value(spec, path):
for k in path:
spec = spec[k]
return spec
def _resolve_lambda_vars(spec, lambda_vars):
resolved = {}
error = True
num_passes = 0
while error and num_passes < _MAX_RESOLUTION_PASSES:
num_passes += 1
error = False
for path, fn in lambda_vars:
try:
value = fn(_UnresolvedAccessGuard(spec))
except RecursiveDependencyError as e:
error = e
else:
_assign_value(spec, path, value)
resolved[path] = value
if error:
raise error
return resolved
def _grid_search_generator(unresolved_spec, grid_vars):
value_indices = [0] * len(grid_vars)
def increment(i):
value_indices[i] += 1
if value_indices[i] >= len(grid_vars[i][1]):
value_indices[i] = 0
if i + 1 < len(value_indices):
return increment(i + 1)
else:
return True
return False
if not grid_vars:
yield unresolved_spec
return
while value_indices[-1] < len(grid_vars[-1][1]):
spec = copy.deepcopy(unresolved_spec)
for i, (path, values) in enumerate(grid_vars):
_assign_value(spec, path, values[value_indices[i]])
yield spec
if grid_vars:
done = increment(0)
if done:
break
def _is_resolved(v):
resolved, _ = _try_resolve(v)
return resolved
def _try_resolve(v):
if isinstance(v, types.FunctionType):
# Lambda function
return False, v
elif isinstance(v, dict) and len(v) == 1 and "eval" in v:
# Lambda function in eval syntax
return False, lambda spec: eval(
v["eval"], _STANDARD_IMPORTS, {"spec": spec})
elif isinstance(v, dict) and len(v) == 1 and "grid_search" in v:
# Grid search values
grid_values = v["grid_search"]
assert isinstance(grid_values, list), \
"Grid search expected list of values, got: {}".format(
grid_values)
return False, grid_values
return True, v
def _unresolved_values(spec):
found = {}
for k, v in spec.items():
resolved, v = _try_resolve(v)
if not resolved:
found[(k,)] = v
elif isinstance(v, dict):
# Recurse into a dict
for (path, value) in _unresolved_values(v).items():
found[(k,) + path] = value
elif isinstance(v, list):
# Recurse into a list
for i, elem in enumerate(v):
for (path, value) in _unresolved_values({i: elem}).items():
found[(k,) + path] = value
return found
class _UnresolvedAccessGuard(dict):
def __init__(self, *args, **kwds):
super(_UnresolvedAccessGuard, self).__init__(*args, **kwds)
self.__dict__ = self
def __getattribute__(self, item):
value = dict.__getattribute__(self, item)
if not _is_resolved(value):
raise RecursiveDependencyError(
"`{}` recursively depends on {}".format(item, value))
elif isinstance(value, dict):
return _UnresolvedAccessGuard(value)
else:
return value
class RecursiveDependencyError(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)