diff --git a/.gitignore b/.gitignore index 98eb3766a..6a911dad4 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ /python/ray/pyarrow_files/pyarrow/ /python/build /python/dist +/python/flatbuffers-1.7.1/ /src/common/thirdparty/redis /src/thirdparty/arrow /flatbuffers-1.7.1/ diff --git a/doc/source/rllib.rst b/doc/source/rllib.rst index 2cf21abcc..98aec1523 100644 --- a/doc/source/rllib.rst +++ b/doc/source/rllib.rst @@ -119,7 +119,7 @@ some number of iterations of the algorithm, save and load the state of training and evaluate the current policy. All agents inherit from a common base class: -.. autoclass:: ray.rllib.common.Agent +.. autoclass:: ray.rllib.agent.Agent :members: Models diff --git a/python/ray/rllib/a3c/a3c.py b/python/ray/rllib/a3c/a3c.py index 65f4c77d4..1c39f3c4d 100644 --- a/python/ray/rllib/a3c/a3c.py +++ b/python/ray/rllib/a3c/a3c.py @@ -9,11 +9,12 @@ import six.moves.queue as queue import os import ray +from ray.rllib.agent import Agent from ray.rllib.a3c.runner import RunnerThread, process_rollout from ray.rllib.a3c.envs import create_and_wrap -from ray.rllib.common import Agent, TrainingResult from ray.rllib.a3c.shared_model import SharedModel from ray.rllib.a3c.shared_model_lstm import SharedModelLSTM +from ray.tune.result import TrainingResult DEFAULT_CONFIG = { diff --git a/python/ray/rllib/common.py b/python/ray/rllib/agent.py similarity index 77% rename from python/ray/rllib/common.py rename to python/ray/rllib/agent.py index 10798232c..d844e689f 100644 --- a/python/ray/rllib/common.py +++ b/python/ray/rllib/agent.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from collections import namedtuple from datetime import datetime import json @@ -16,6 +15,7 @@ import time import uuid import tensorflow as tf +from ray.tune.result import TrainingResult if sys.version_info[0] == 2: import cStringIO as StringIO @@ -26,39 +26,6 @@ logger = 
logging.getLogger(__name__) logger.setLevel(logging.INFO) -TrainingResult = namedtuple("TrainingResult", [ - # Unique string identifier for this experiment. This id is preserved - # across checkpoint / restore calls. - "experiment_id", - - # The index of this training iteration, e.g. call to train(). - "training_iteration", - - # The mean episode reward reported during this iteration. - "episode_reward_mean", - - # The mean episode length reported during this iteration. - "episode_len_mean", - - # Agent-specific metadata to report for this iteration. - "info", - - # Number of timesteps in the simulator in this iteration. - "timesteps_this_iter", - - # Accumulated timesteps for this entire experiment. - "timesteps_total", - - # Time in seconds this iteration took to run. - "time_this_iter_s", - - # Accumulated time in seconds for this entire experiment. - "time_total_s", -]) - -TrainingResult.__new__.__defaults__ = (None,) * len(TrainingResult._fields) - - class Agent(object): """All RLlib agents extend this base class. @@ -71,6 +38,8 @@ class Agent(object): logdir (str): Directory in which training outputs should be placed. 
""" + _allow_unknown_configs = False + def __init__( self, env_creator, config, local_dir='/tmp/ray', upload_dir=None, agent_id=None): @@ -97,11 +66,12 @@ class Agent(object): self.env_creator = env_creator self.config = self._default_config.copy() - for k in config.keys(): - if k not in self.config: - raise Exception( - "Unknown agent config `{}`, " - "all agent configs: {}".format(k, self.config.keys())) + if not self._allow_unknown_configs: + for k in config.keys(): + if k not in self.config: + raise Exception( + "Unknown agent config `{}`, " + "all agent configs: {}".format(k, self.config.keys())) self.config.update(config) self.config.update({ "agent_id": agent_id, @@ -112,7 +82,7 @@ class Agent(object): logdir_suffix = "{}_{}_{}".format( env_name, - self.__class__.__name__, + self._agent_name, agent_id or datetime.today().strftime("%Y-%m-%d_%H-%M-%S")) if not os.path.exists(local_dir): @@ -128,12 +98,12 @@ class Agent(object): # TODO(ekl) consider inlining config into the result jsons config_out = os.path.join(self.logdir, "config.json") with open(config_out, "w") as f: - json.dump(self.config, f, sort_keys=True, cls=RLLibEncoder) + json.dump(self.config, f, sort_keys=True, cls=_Encoder) logger.info( - "%s algorithm created with logdir '%s' and upload uri '%s'", + "%s agent created with logdir '%s' and upload uri '%s'", self.__class__.__name__, self.logdir, log_upload_uri) - self._result_logger = RLLibLogger( + self._result_logger = _Logger( os.path.join(self.logdir, "result.json"), log_upload_uri and os.path.join(log_upload_uri, "result.json")) self._file_writer = tf.summary.FileWriter(self.logdir) @@ -162,6 +132,8 @@ class Agent(object): self._iteration += 1 time_this_iter = time.time() - start + assert result.timesteps_this_iter is not None + self._time_total += time_this_iter self._timesteps_total += result.timesteps_this_iter @@ -170,10 +142,9 @@ class Agent(object): training_iteration=self._iteration, timesteps_total=self._timesteps_total, 
time_this_iter_s=time_this_iter, - time_total_s=self._time_total) - - for field in result: - assert field is not None, result + time_total_s=self._time_total, + pid=os.getpid(), + hostname=os.uname()[1]) self._log_result(result) @@ -184,18 +155,18 @@ class Agent(object): # We need to use a custom json serializer class so that NaNs get # encoded as null as required by Athena. - json.dump(result._asdict(), self._result_logger, cls=RLLibEncoder) + json.dump(result._asdict(), self._result_logger, cls=_Encoder) self._result_logger.write("\n") - train_stats = tf.Summary(value=[ - tf.Summary.Value( - tag="rllib/time_this_iter_s", - simple_value=result.time_this_iter_s), - tf.Summary.Value( - tag="rllib/episode_reward_mean", - simple_value=result.episode_reward_mean), - tf.Summary.Value( - tag="rllib/episode_len_mean", - simple_value=result.episode_len_mean)]) + attrs_to_log = [ + "time_this_iter_s", "mean_loss", "mean_accuracy", + "episode_reward_mean", "episode_len_mean"] + values = [] + for attr in attrs_to_log: + if getattr(result, attr) is not None: + values.append(tf.Summary.Value( + tag="ray/tune/{}".format(attr), + simple_value=getattr(result, attr))) + train_stats = tf.Summary(value=values) self._file_writer.add_summary(train_stats, result.training_iteration) def save(self): @@ -269,10 +240,10 @@ class Agent(object): raise NotImplementedError -class RLLibEncoder(json.JSONEncoder): +class _Encoder(json.JSONEncoder): def __init__(self, nan_str="null", **kwargs): - super(RLLibEncoder, self).__init__(**kwargs) + super(_Encoder, self).__init__(**kwargs) self.nan_str = nan_str def iterencode(self, o, _one_shot=False): @@ -299,7 +270,7 @@ class RLLibEncoder(json.JSONEncoder): return int(value) -class RLLibLogger(object): +class _Logger(object): """Writing small amounts of data to S3 with real-time updates. 
""" @@ -322,3 +293,44 @@ class RLLibLogger(object): with self.smart_open(self.uri, "w") as f: self.result_buffer.write(b) f.write(self.result_buffer.getvalue()) + + +class _MockAgent(Agent): + """Mock agent for use in tests""" + + _agent_name = "MockAgent" + _default_config = {} + + def _init(self): + pass + + def _train(self): + return TrainingResult( + episode_reward_mean=10, episode_len_mean=10, + timesteps_this_iter=10, info={}) + + +def get_agent_class(alg): + """Returns the class of an known agent given its name.""" + + if alg == "PPO": + from ray.rllib import ppo + return ppo.PPOAgent + elif alg == "ES": + from ray.rllib import es + return es.ESAgent + elif alg == "DQN": + from ray.rllib import dqn + return dqn.DQNAgent + elif alg == "A3C": + from ray.rllib import a3c + return a3c.A3CAgent + elif alg == "script": + from ray.tune import script_runner + return script_runner.ScriptRunner + elif alg == "__fake": + return _MockAgent + else: + raise Exception( + ("Unknown algorithm {}, check --alg argument. 
Valid choices " + + "are PPO, ES, DQN, and A3C.").format(alg)) diff --git a/python/ray/rllib/agents.py b/python/ray/rllib/agents.py deleted file mode 100644 index 9ff144fee..000000000 --- a/python/ray/rllib/agents.py +++ /dev/null @@ -1,43 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from ray.rllib.common import Agent, TrainingResult - - -class _MockAgent(Agent): - """Mock agent for use in tests""" - - _agent_name = "MockAgent" - _default_config = {} - - def _init(self): - pass - - def _train(self): - return TrainingResult( - episode_reward_mean=10, episode_len_mean=10, - timesteps_this_iter=10, info={}) - - -def get_agent_class(alg): - """Returns the class of an known agent given its name.""" - - if alg == "PPO": - from ray.rllib import ppo - return ppo.PPOAgent - elif alg == "ES": - from ray.rllib import es - return es.ESAgent - elif alg == "DQN": - from ray.rllib import dqn - return dqn.DQNAgent - elif alg == "A3C": - from ray.rllib import a3c - return a3c.A3CAgent - elif alg == "__fake": - return _MockAgent - else: - raise Exception( - ("Unknown algorithm {}, check --alg argument. Valid choices " + - "are PPO, ES, DQN, and A3C.").format(alg)) diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py index 76697e444..b05883231 100644 --- a/python/ray/rllib/dqn/dqn.py +++ b/python/ray/rllib/dqn/dqn.py @@ -10,11 +10,12 @@ import os import tensorflow as tf import ray -from ray.rllib.common import Agent, TrainingResult +from ray.rllib.agent import Agent from ray.rllib.dqn import logger, models from ray.rllib.dqn.common.wrappers import wrap_dqn from ray.rllib.dqn.common.schedules import LinearSchedule from ray.rllib.dqn.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer +from ray.tune.result import TrainingResult """The default configuration dict for the DQN algorithm. 
diff --git a/python/ray/rllib/es/es.py b/python/ray/rllib/es/es.py index 5945b0601..d93a3d942 100644 --- a/python/ray/rllib/es/es.py +++ b/python/ray/rllib/es/es.py @@ -12,7 +12,7 @@ import pickle import time import ray -from ray.rllib.common import Agent, TrainingResult +from ray.rllib.agent import Agent from ray.rllib.models import ModelCatalog from ray.rllib.es import optimizers @@ -20,6 +20,7 @@ from ray.rllib.es import policies from ray.rllib.es import tabular_logger as tlogger from ray.rllib.es import tf_util from ray.rllib.es import utils +from ray.tune.result import TrainingResult Result = namedtuple("Result", [ diff --git a/python/ray/rllib/ppo/ppo.py b/python/ray/rllib/ppo/ppo.py index ae6aaf160..f0518dc02 100644 --- a/python/ray/rllib/ppo/ppo.py +++ b/python/ray/rllib/ppo/ppo.py @@ -11,7 +11,8 @@ import tensorflow as tf from tensorflow.python import debug as tf_debug import ray -from ray.rllib.common import Agent, TrainingResult +from ray.rllib.agent import Agent +from ray.tune.result import TrainingResult from ray.rllib.ppo.runner import Runner, RemoteRunner from ray.rllib.ppo.rollout import collect_samples from ray.rllib.ppo.utils import shuffle diff --git a/python/ray/rllib/test/test_checkpoint_restore.py b/python/ray/rllib/test/test_checkpoint_restore.py index cdf7c140f..33ac3628a 100755 --- a/python/ray/rllib/test/test_checkpoint_restore.py +++ b/python/ray/rllib/test/test_checkpoint_restore.py @@ -8,7 +8,7 @@ import numpy as np import ray import random -from ray.rllib.agents import get_agent_class +from ray.rllib.agent import get_agent_class def get_mean_action(alg, obs): diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py index 7f80dff08..5c3bbef2f 100755 --- a/python/ray/rllib/train.py +++ b/python/ray/rllib/train.py @@ -34,14 +34,18 @@ parser = make_parser("Train a reinforcement learning agent.") # defined there. 
parser.add_argument("--redis-address", default=None, type=str, help="The Redis address of the cluster.") +parser.add_argument("--num-cpus", default=None, type=int, + help="Number of CPUs to allocate to Ray.") +parser.add_argument("--num-gpus", default=None, type=int, + help="Number of GPUs to allocate to Ray.") parser.add_argument("--restore", default=None, type=str, help="If specified, restore from this checkpoint.") parser.add_argument("-f", "--config-file", default=None, type=str, help="If specified, use config options from this file.") -if __name__ == "__main__": - args = parser.parse_args() +def main(argv): + args = parser.parse_args(argv) runner = TrialRunner() if args.config_file: @@ -56,7 +60,9 @@ if __name__ == "__main__": args.resources, args.stop, args.checkpoint_freq, args.restore, args.upload_dir)) - ray.init(redis_address=args.redis_address) + ray.init( + redis_address=args.redis_address, num_cpus=args.num_cpus, + num_gpus=args.num_gpus) while not runner.is_finished(): runner.step() @@ -64,4 +70,11 @@ if __name__ == "__main__": for trial in runner.get_trials(): if trial.status != Trial.TERMINATED: + print("Exit 1") sys.exit(1) + + print("Exit 0") + + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/python/ray/rllib/tuned_examples/hopper-ppo.yaml b/python/ray/rllib/tuned_examples/hopper-ppo.yaml index 3b69610d3..b20a434a4 100644 --- a/python/ray/rllib/tuned_examples/hopper-ppo.yaml +++ b/python/ray/rllib/tuned_examples/hopper-ppo.yaml @@ -5,4 +5,4 @@ hopper-ppo: resources: cpu: 64 gpu: 4 - config: {"gamma": 0.995, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 160000, "num_workers": 64} + config: {"gamma": 0.995, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "devices": ["/gpu:0", 
"/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 160000, "num_workers": 64} diff --git a/python/ray/rllib/tuned_examples/humanoid-ppo-gae.yaml b/python/ray/rllib/tuned_examples/humanoid-ppo-gae.yaml index 4ec83fa76..61e43e738 100644 --- a/python/ray/rllib/tuned_examples/humanoid-ppo-gae.yaml +++ b/python/ray/rllib/tuned_examples/humanoid-ppo-gae.yaml @@ -7,5 +7,5 @@ humanoid-ppo-gae: resources: cpu: 64 gpu: 4 - config: {"lambda": 0.95, "clip_param": 0.2, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "horizon": 5000, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64, "model": {"free_log_std": true}, "write_logs": false} + config: {"lambda": 0.95, "clip_param": 0.2, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "horizon": 5000, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64, "model": {"free_log_std": true}, "write_logs": false} diff --git a/python/ray/rllib/tuned_examples/humanoid-ppo.yaml b/python/ray/rllib/tuned_examples/humanoid-ppo.yaml index 4993c39c5..99f9dfa91 100644 --- a/python/ray/rllib/tuned_examples/humanoid-ppo.yaml +++ b/python/ray/rllib/tuned_examples/humanoid-ppo.yaml @@ -7,4 +7,4 @@ humanoid-ppo: resources: cpu: 64 gpu: 4 - config: {"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64, "model": 
{"free_log_std": true}, "use_gae": false} + config: {"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64, "model": {"free_log_std": true}, "use_gae": false} diff --git a/python/ray/rllib/tuned_examples/walker2d-ppo.yaml b/python/ray/rllib/tuned_examples/walker2d-ppo.yaml index 66a5ebf3b..f94d3370b 100644 --- a/python/ray/rllib/tuned_examples/walker2d-ppo.yaml +++ b/python/ray/rllib/tuned_examples/walker2d-ppo.yaml @@ -5,4 +5,4 @@ walker2d-v1-ppo: resources: cpu: 64 gpu: 4 - config: {"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64} + config: {"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64} diff --git a/python/ray/tune/README.rst b/python/ray/tune/README.rst index e83788878..9716c9b16 100644 --- a/python/ray/tune/README.rst +++ b/python/ray/tune/README.rst @@ -1,24 +1,123 @@ -Ray.tune: Fast hyperparameter search -==================================== +Parallel hyperparameter evaluation with Ray +=========================================== -Using ray.tune with RLlib -------------------------- +Using ray.tune for deep neural network training +----------------------------------------------- -One way to use ray.tune is through RLlib's train.py script. The train.py script -supports two modes. 
For example, to run multiple concurrent trials of Pong: +With only a couple of changes, you can parallelize evaluation of any existing +Python script with Ray.tune. -- Inline args: ``./train.py --env=Pong-v0 --alg=PPO --num_trials=8 --stop '{"time_total_s": 3200}' --resources '{"cpu": 8, "gpu": 2}' --config '{"num_workers": 8, "sgd_num_iter": 10}'`` +First, you must define a ``train(config, status_reporter)`` function in your +script. This will be the entry point which Ray will call into. -- File-based: ``./train.py -f tune-pong.yaml`` +.. code:: python -Both delegate scheduling of trials to the ray.tune TrialRunner class. -Additionally, the file-based mode supports hyper-parameter tuning -(currently just grid and random search). + def train(config, status_reporter): + pass -To specify search parameters, variables in the `config` section may be set to -different values for each trial. You can either specify `grid_search: ` +Second, you should periodically report training status by passing a +``TrainingResult`` tuple to ``status_reporter.report()``. + +.. code:: python + + from ray.tune.result import TrainingResult + + def train(config, status_reporter): + for step in range(1000): + # do a training iteration + status_reporter.report(TrainingResult( + timesteps_total=step, # required + mean_loss=train_loss, # optional + mean_accuracy=train_accuracy # optional + )) + +You can then launch a hyperparameter tuning run by running ``tune.py``. +For example: + +.. code:: bash + + cd python/ray/tune + ./tune.py -f examples/tune_mnist_ray.yaml + +The YAML or JSON file passed to ``tune.py`` specifies the configuration of the +trials to launch. For example, the following YAML describes a grid search over +activation functions. + +.. 
code:: yaml + + tune_mnist: + env: mnist + alg: script + num_trials: 10 + resources: + cpu: 1 + stop: + mean_accuracy: 0.99 + time_total_s: 600 + config: + script_file_path: examples/tune_mnist_ray.py + script_entrypoint: train + activation: + grid_search: ['relu', 'elu', 'tanh'] + +When run, ``./tune.py`` will schedule the trials on Ray, creating a new local +Ray cluster if an existing cluster address is not specified. Incremental +status will be reported on the command line, and you can also view the reported +metrics using Tensorboard: + +.. code:: text + + == Status == + Resources used: 4/4 CPUs, 0/0 GPUs + Tensorboard logdir: /tmp/ray/tune_mnist + - script_mnist_0_activation=relu: RUNNING [pid=27708], 16 s, 20 ts, 0.46 acc + - script_mnist_1_activation=elu: RUNNING [pid=27709], 16 s, 20 ts, 0.54 acc + - script_mnist_2_activation=tanh: RUNNING [pid=27711], 18 s, 20 ts, 0.74 acc + - script_mnist_3_activation=relu: RUNNING [pid=27713], 12 s, 10 ts, 0.22 acc + - script_mnist_4_activation=elu: PENDING + - script_mnist_5_activation=tanh: PENDING + - script_mnist_6_activation=relu: PENDING + - script_mnist_7_activation=elu: PENDING + - script_mnist_8_activation=tanh: PENDING + - script_mnist_9_activation=relu: PENDING + +Note that if your script requires GPUs, you should specify the number of gpus +required per trial in the ``resources`` section. Additionally, Ray should be +initialized with the ``--num-gpus`` argument (you can also pass this argument +to ``tune.py``). + +Using ray.tune as a library +--------------------------- + +Ray.tune can also be called programmatically from Python code. This allows for +finer-grained control over trial setup and scheduling. Some examples of +calling ray.tune programmatically include: + +- ``python/ray/tune/examples/tune_mnist_ray.py`` +- ``python/ray/rllib/train.py`` + +Using ray.tune with Ray RLlib +----------------------------- + +Another way to use ray.tune is through RLlib's ``python/ray/rllib/train.py`` +script. 
This script allows you to select between different RL algorithms with +the ``--alg`` option. For example, to train Pong with the A3C algorithm, run: + +- ``./train.py --env=PongDeterministic-v4 --alg=A3C --num-trials=8 --stop '{"time_total_s": 3200}' --resources '{"cpu": 8}' --config '{"num_workers": 8}'`` + +or + +- ``./train.py -f tuned_examples/pong-a3c.yaml`` + +You can find more RLlib examples in ``python/ray/rllib/tuned_examples``. + +Specifying search parameters +---------------------------- + +To specify search parameters, variables in the ``config`` section may be set to +different values for each trial. You can either specify ``grid_search: <list>`` in place of a concrete value to specify a grid search across the list of -values, or `eval: ` for values to be sampled from the given Python +values, or ``eval: <str>`` for values to be sampled from the given Python expression. .. code:: yaml @@ -40,15 +139,3 @@ expression. grid_search: [128, 256, 512] lr: eval: random.uniform(1e-4, 1e-3) - -See ray/rllib/tuned_examples for more examples of configs in YAML form. 
- -Using ray.tune to run custom scripts ------------------------------------- - -TODO - -Using ray.tune as a library ---------------------------- - -TODO diff --git a/python/ray/tune/config_parser.py b/python/ray/tune/config_parser.py index caac2d1af..9f4135841 100644 --- a/python/ray/tune/config_parser.py +++ b/python/ray/tune/config_parser.py @@ -8,7 +8,6 @@ import json import numpy as np import os import random -import sys from ray.tune.trial import Trial, Resources @@ -32,13 +31,13 @@ def make_parser(description): parser.add_argument("--resources", default='{"cpu": 1}', type=_resource_json, help="Amount of resources to allocate per trial.") - parser.add_argument("--num_trials", default=1, type=int, + parser.add_argument("--num-trials", default=1, type=int, help="Number of trials to evaluate.") - parser.add_argument("--local_dir", default="/tmp/ray", type=str, + parser.add_argument("--local-dir", default="/tmp/ray", type=str, help="Local dir to save training results to.") - parser.add_argument("--upload_dir", default=None, type=str, + parser.add_argument("--upload-dir", default=None, type=str, help="URI to upload training results to.") - parser.add_argument("--checkpoint_freq", default=sys.maxsize, type=int, + parser.add_argument("--checkpoint-freq", default=None, type=int, help="How many iterations between checkpoints.") # TODO(ekl) environments are RL specific @@ -73,7 +72,7 @@ def parse_to_trials(config): def to_argv(config): argv = [] for k, v in config.items(): - argv.append("--{}".format(k)) + argv.append("--{}".format(k.replace("_", "-"))) if type(v) is str: argv.append(v) else: diff --git a/python/ray/tune/examples/tune_mnist_ray.py b/python/ray/tune/examples/tune_mnist_ray.py new file mode 100755 index 000000000..2a10070d7 --- /dev/null +++ b/python/ray/tune/examples/tune_mnist_ray.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python +# +# Copyright 2015 The TensorFlow Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +"""A deep MNIST classifier using convolutional layers. + +See extensive documentation at +https://www.tensorflow.org/get_started/mnist/pros +""" +# Disable linter warnings to maintain consistency with tutorial. +# pylint: disable=invalid-name +# pylint: disable=g-bad-import-order + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import sys +import tempfile +import os + +import ray +from ray.tune.result import TrainingResult +from ray.tune.trial import Trial +from ray.tune.trial_runner import TrialRunner + +from tensorflow.examples.tutorials.mnist import input_data + +import tensorflow as tf + +FLAGS = None +status_reporter = None # used to report training status back to Ray +activation_fn = None # e.g. tf.nn.relu + + +def deepnn(x): + """deepnn builds the graph for a deep net for classifying digits. + + Args: + x: an input tensor with the dimensions (N_examples, 784), where 784 is + the number of pixels in a standard MNIST image. + + Returns: + A tuple (y, keep_prob). y is a tensor of shape (N_examples, 10), with + values equal to the logits of classifying the digit into one of 10 + classes (the digits 0-9). keep_prob is a scalar placeholder for the + probability of dropout. + """ + # Reshape to use within a convolutional neural net. 
+ # Last dimension is for "features" - there is only one here, since images + # are grayscale -- it would be 3 for an RGB image, 4 for RGBA, etc. + with tf.name_scope('reshape'): + x_image = tf.reshape(x, [-1, 28, 28, 1]) + + # First convolutional layer - maps one grayscale image to 32 feature maps. + with tf.name_scope('conv1'): + W_conv1 = weight_variable([5, 5, 1, 32]) + b_conv1 = bias_variable([32]) + h_conv1 = activation_fn(conv2d(x_image, W_conv1) + b_conv1) + + # Pooling layer - downsamples by 2X. + with tf.name_scope('pool1'): + h_pool1 = max_pool_2x2(h_conv1) + + # Second convolutional layer -- maps 32 feature maps to 64. + with tf.name_scope('conv2'): + W_conv2 = weight_variable([5, 5, 32, 64]) + b_conv2 = bias_variable([64]) + h_conv2 = activation_fn(conv2d(h_pool1, W_conv2) + b_conv2) + + # Second pooling layer. + with tf.name_scope('pool2'): + h_pool2 = max_pool_2x2(h_conv2) + + # Fully connected layer 1 -- after 2 round of downsampling, our 28x28 image + # is down to 7x7x64 feature maps -- maps this to 1024 features. + with tf.name_scope('fc1'): + W_fc1 = weight_variable([7 * 7 * 64, 1024]) + b_fc1 = bias_variable([1024]) + + h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64]) + h_fc1 = activation_fn(tf.matmul(h_pool2_flat, W_fc1) + b_fc1) + + # Dropout - controls the complexity of the model, prevents co-adaptation of + # features. 
+ with tf.name_scope('dropout'): + keep_prob = tf.placeholder(tf.float32) + h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob) + + # Map the 1024 features to 10 classes, one for each digit + with tf.name_scope('fc2'): + W_fc2 = weight_variable([1024, 10]) + b_fc2 = bias_variable([10]) + + y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2 + return y_conv, keep_prob + + +def conv2d(x, W): + """conv2d returns a 2d convolution layer with full stride.""" + return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME') + + +def max_pool_2x2(x): + """max_pool_2x2 downsamples a feature map by 2X.""" + return tf.nn.max_pool( + x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME') + + +def weight_variable(shape): + """weight_variable generates a weight variable of a given shape.""" + initial = tf.truncated_normal(shape, stddev=0.1) + return tf.Variable(initial) + + +def bias_variable(shape): + """bias_variable generates a bias variable of a given shape.""" + initial = tf.constant(0.1, shape=shape) + return tf.Variable(initial) + + +def main(_): + # Import data + mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True) + + # Create the model + x = tf.placeholder(tf.float32, [None, 784]) + + # Define loss and optimizer + y_ = tf.placeholder(tf.float32, [None, 10]) + + # Build the graph for the deep net + y_conv, keep_prob = deepnn(x) + + with tf.name_scope('loss'): + cross_entropy = tf.nn.softmax_cross_entropy_with_logits( + labels=y_, logits=y_conv) + cross_entropy = tf.reduce_mean(cross_entropy) + + with tf.name_scope('adam_optimizer'): + train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy) + + with tf.name_scope('accuracy'): + correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1)) + correct_prediction = tf.cast(correct_prediction, tf.float32) + accuracy = tf.reduce_mean(correct_prediction) + + graph_location = tempfile.mkdtemp() + print('Saving graph to: %s' % graph_location) + train_writer = tf.summary.FileWriter(graph_location) + 
train_writer.add_graph(tf.get_default_graph()) + + with tf.Session() as sess: + sess.run(tf.global_variables_initializer()) + for i in range(20000): + batch = mnist.train.next_batch(50) + if i % 10 == 0: + train_accuracy = accuracy.eval(feed_dict={ + x: batch[0], y_: batch[1], keep_prob: 1.0}) + + # !!! Report status to ray.tune !!! + if status_reporter: + status_reporter.report(TrainingResult( + timesteps_total=i, + mean_accuracy=train_accuracy)) + + print('step %d, training accuracy %g' % (i, train_accuracy)) + train_step.run( + feed_dict={x: batch[0], y_: batch[1], keep_prob: 0.5}) + + print('test accuracy %g' % accuracy.eval(feed_dict={ + x: mnist.test.images, y_: mnist.test.labels, keep_prob: 1.0})) + + +# !!! Entrypoint for ray.tune !!! +def train(config={'activation': 'relu'}, reporter=None): + global FLAGS, status_reporter, activation_fn + status_reporter = reporter + activation_fn = getattr(tf.nn, config['activation']) + parser = argparse.ArgumentParser() + parser.add_argument( + '--data_dir', type=str, default='/tmp/tensorflow/mnist/input_data', + help='Directory for storing input data') + FLAGS, unparsed = parser.parse_known_args() + tf.app.run(main=main, argv=[sys.argv[0]] + unparsed) + + +# !!! Example of using the ray.tune Python API !!! 
+if __name__ == '__main__': + runner = TrialRunner() + + for act in ['relu', 'elu', 'tanh']: + runner.add_trial( + Trial( + 'mnist', 'script', + stopping_criterion={ + 'mean_accuracy': 0.99, 'time_total_s': 600}, + config={ + 'script_file_path': os.path.abspath(__file__), + 'script_min_iter_time_s': 1, + 'activation': act, + }, + agent_id='act={}'.format(act))) + + ray.init() + + while not runner.is_finished(): + runner.step() + print(runner.debug_string()) diff --git a/python/ray/tune/examples/tune_mnist_ray.yaml b/python/ray/tune/examples/tune_mnist_ray.yaml new file mode 100644 index 000000000..8ba414e6f --- /dev/null +++ b/python/ray/tune/examples/tune_mnist_ray.yaml @@ -0,0 +1,15 @@ +tune_mnist: + env: mnist + alg: script + num_trials: 10 + resources: + cpu: 1 + stop: + mean_accuracy: 0.99 + time_total_s: 600 + config: + script_file_path: examples/tune_mnist_ray.py + script_entrypoint: train + script_min_iter_time_s: 1 + activation: + grid_search: ['relu', 'elu', 'tanh'] diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py new file mode 100644 index 000000000..cc7a40e5e --- /dev/null +++ b/python/ray/tune/result.py @@ -0,0 +1,62 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from collections import namedtuple + +""" +When using ray.tune with custom training scripts, you must periodically report +training status back to Ray by calling status_reporter.report(result). + +Most of the fields are optional, the only required one is timesteps_total. + +In RLlib, the supplied algorithms fill in TrainingResult for you. +""" + + +TrainingResult = namedtuple("TrainingResult", [ + # (Required) Accumulated timesteps for this entire experiment. + "timesteps_total", + + # (Optional) Custom metadata to report for this iteration. + "info", + + # (Optional) The mean episode reward if applicable. + "episode_reward_mean", + + # (Optional) The mean episode length if applicable. 
+ "episode_len_mean", + + # (Optional) The current training accuracy if applicable. + "mean_accuracy", + + # (Optional) The current training loss if applicable. + "mean_loss", + + # (Auto-filled) The negated current training loss. + "neg_mean_loss", + + # (Auto-filled) Unique string identifier for this experiment. This id is + # preserved across checkpoint / restore calls. + "experiment_id", + + # (Auto-filled) The index of this training iteration, e.g. call to train(). + "training_iteration", + + # (Auto-filled) Number of timesteps in the simulator in this iteration. + "timesteps_this_iter", + + # (Auto-filled) Time in seconds this iteration took to run. + "time_this_iter_s", + + # (Auto-filled) Accumulated time in seconds for this entire experiment. + "time_total_s", + + # (Auto-filled) The pid of the training process. + "pid", + + # (Auto-filled) The hostname of the machine hosting the training process. + "hostname", +]) + +TrainingResult.__new__.__defaults__ = (None,) * len(TrainingResult._fields) diff --git a/python/ray/tune/script_runner.py b/python/ray/tune/script_runner.py new file mode 100644 index 000000000..30ad56ba9 --- /dev/null +++ b/python/ray/tune/script_runner.py @@ -0,0 +1,162 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import importlib +import os +import sys +import time +import threading + +from ray.rllib.agent import Agent + + +class StatusReporter(object): + """Object passed into your main() that you can report status through.""" + + def __init__(self): + self._latest_result = None + self._lock = threading.Lock() + self._error = None + + def report(self, result): + """Report updated training status. + + Args: + result (TrainingResult): Latest training result status. You must + at least define `timesteps_total`, but probably want to report + some of the other metrics as well. 
+ """ + + with self._lock: + self._latest_result = result + + def set_error(self, error): + """Report an error. + + Args: + error (obj): Error object or string. + """ + + self._error = error + + def _get_and_clear_status(self): + if self._error: + raise Exception("Error running script: " + str(self._error)) + with self._lock: + res = self._latest_result + self._latest_result = None + return res + + def _stop(self): + self._error = "Agent stopped" + + +DEFAULT_CONFIG = { + # path of the script to run + "script_file_path": "/path/to/file.py", + + # name of train function in the file, e.g. train(config, status_reporter) + "script_entrypoint": "train", + + # batch results to at least this granularity + "script_min_iter_time_s": 5, +} + + +class _RunnerThread(threading.Thread): + """Supervisor thread that runs your script.""" + + def __init__(self, entrypoint, config, status_reporter): + self._entrypoint = entrypoint + self._entrypoint_args = [config, status_reporter] + self._status_reporter = status_reporter + threading.Thread.__init__(self) + self.daemon = True + + def run(self): + try: + self._entrypoint(*self._entrypoint_args) + except Exception as e: + self._status_reporter.set_error(e) + raise e + + +class ScriptRunner(Agent): + """Agent that runs a user script returning training results.""" + + _agent_name = "script" + _default_config = DEFAULT_CONFIG + _allow_unknown_configs = True + + def _init(self): + # strong assumption here that we're in a new process + file_path = os.path.expanduser(self.config["script_file_path"]) + sys.path.insert(0, os.path.dirname(file_path)) + if hasattr(importlib, "util"): + # Python 3.4+ + spec = importlib.util.spec_from_file_location( + "external_file", file_path) + external_file = importlib.util.module_from_spec(spec) + spec.loader.exec_module(external_file) + elif hasattr(importlib, "machinery"): + # Python 3.3 + from importlib.machinery import SourceFileLoader + external_file = SourceFileLoader( + "external_file", 
file_path).load_module() + else: + # Python 2.x + import imp + external_file = imp.load_source("external_file", file_path) + if not external_file: + raise Exception( + "Unable to import file at {}".format( + self.config["script_file_path"])) + entrypoint = getattr(external_file, self.config["script_entrypoint"]) + self._status_reporter = StatusReporter() + self._runner = _RunnerThread( + entrypoint, self.config, self._status_reporter) + self._start_time = time.time() + self._last_reported_time = self._start_time + self._last_reported_timestep = 0 + self._runner.start() + + def train(self): + poll_start = time.time() + result = self._status_reporter._get_and_clear_status() + while result is None or \ + time.time() - poll_start < \ + self.config["script_min_iter_time_s"]: + time.sleep(1) + result = self._status_reporter._get_and_clear_status() + + now = time.time() + + # Include the negative loss to use as a stopping condition + if result.mean_loss is not None: + neg_loss = -result.mean_loss + else: + neg_loss = result.neg_mean_loss + + result = result._replace( + experiment_id=self._experiment_id, + neg_mean_loss=neg_loss, + training_iteration=self.iteration, + time_this_iter_s=now - self._last_reported_time, + timesteps_this_iter=( + result.timesteps_total - self._last_reported_timestep), + time_total_s=now - self._start_time, + pid=os.getpid(), + hostname=os.uname()[1]) + + if result.timesteps_total: + self._last_reported_timestep = result.timesteps_total + self._last_reported_time = now + self._iteration += 1 + self._log_result(result) + + return result + + def stop(self): + self._status_reporter._stop() + Agent.stop(self) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index fe3a65e4e..d92c8a9c5 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -2,12 +2,12 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import sys import traceback import ray +import os from 
collections import namedtuple -from ray.rllib.agents import get_agent_class +from ray.rllib.agent import get_agent_class # Ray resources required to schedule a Trial @@ -24,15 +24,15 @@ class Trial(object): On error it transitions to ERROR, otherwise TERMINATED on success. """ - PENDING = 'PENDING' - RUNNING = 'RUNNING' - TERMINATED = 'TERMINATED' - ERROR = 'ERROR' + PENDING = "PENDING" + RUNNING = "RUNNING" + TERMINATED = "TERMINATED" + ERROR = "ERROR" def __init__( self, env_creator, alg, config={}, local_dir='/tmp/ray', agent_id=None, resources=Resources(cpu=1, gpu=0), - stopping_criterion={}, checkpoint_freq=sys.maxsize, + stopping_criterion={}, checkpoint_freq=None, restore_path=None, upload_dir=None): """Initialize a new trial. @@ -61,6 +61,7 @@ class Trial(object): self.checkpoint_path = None self.agent = None self.status = Trial.PENDING + self.location = None def start(self): """Starts this trial. @@ -135,12 +136,33 @@ class Trial(object): if self.last_result is None: return self.status - return '{}, {} s, {} ts, {} itrs, {} rew'.format( - self.status, - int(self.last_result.time_total_s), - int(self.last_result.timesteps_total), - self.last_result.training_iteration, - round(self.last_result.episode_reward_mean, 1)) + + def location_string(hostname, pid): + if hostname == os.uname()[1]: + return 'pid={}'.format(pid) + else: + return '{} pid={}'.format(hostname, pid) + + pieces = [ + '{} [{}]'.format( + self.status, location_string( + self.last_result.hostname, self.last_result.pid)), + '{} s'.format(int(self.last_result.time_total_s)), + '{} ts'.format(int(self.last_result.timesteps_total))] + + if self.last_result.episode_reward_mean is not None: + pieces.append('{} rew'.format( + format(self.last_result.episode_reward_mean, '.3g'))) + + if self.last_result.mean_loss is not None: + pieces.append('{} loss'.format( + format(self.last_result.mean_loss, '.3g'))) + + if self.last_result.mean_accuracy is not None: + pieces.append('{} acc'.format( + 
format(self.last_result.mean_accuracy, '.3g'))) + + return ', '.join(pieces) def checkpoint(self): """Synchronously checkpoints the state of this trial. diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 5cffd0cd9..4a9b6ef69 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -63,7 +63,7 @@ class TrialRunner(object): if trial.status == Trial.PENDING: assert self._has_resources(trial.resources), \ ("Insufficient cluster resources to launch trial", - trial.resources) + (trial.resources, self._avail_resources)) assert False, "Called step when all trials finished?" def get_trials(self): @@ -87,9 +87,11 @@ class TrialRunner(object): messages = ["== Status =="] messages.append( - "Available: {}".format(self._avail_resources)) - messages.append( - "Committed: {}".format(self._committed_resources)) + "Resources used: {}/{} CPUs, {}/{} GPUs".format( + self._committed_resources.cpu, + self._avail_resources.cpu, + self._committed_resources.gpu, + self._avail_resources.gpu)) for local_dir in sorted(set([t.local_dir for t in self._trials])): messages.append("Tensorboard logdir: {}".format(local_dir)) for t in self._trials: diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py new file mode 100755 index 000000000..86a051436 --- /dev/null +++ b/python/ray/tune/tune.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python + +"""Command-line tool for tuning hyperparameters with Ray. + +MNIST tuning example: + ./tune.py -f examples/tune_mnist_ray.yaml +""" + +from ray.rllib import train +import sys + +# TODO(ekl) right now this is a thin wrapper around the rllib training script, +# however in the future we should have a separate command line tool here. 
+train.main(sys.argv[1:] + ['--alg=script']) diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 08c679df9..2b082ba88 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -114,3 +114,9 @@ docker run --shm-size=10G --memory=10G $DOCKER_SHA \ docker run --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_checkpoint_restore.py + +docker run --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/tune/tune.py \ + --env mnist \ + --stop '{"training_iteration": 2}' \ + --config '{"script_file_path": "/ray/python/ray/tune/examples/tune_mnist_ray.py", "activation": "relu"}'