From 2d81edfcdc7bcd1e71c1a987496515038f5558e8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 29 Jun 2017 08:49:56 -0700 Subject: [PATCH] [rllib] Move a3c implementation from examples/ to python/ray/rllib/ (#698) * rllib v0 * fix imports * lint * comments * update docs * a3c wip * a3c wip * report stats * update doc * name is too long * fix small bug * propagate exception on error * fetch metrics * fix lint --- doc/source/example-a3c.rst | 6 +- examples/a3c/driver.py | 83 ------------ examples/a3c/misc.py | 33 ----- {examples => python/ray/rllib}/a3c/LSTM.py | 6 +- python/ray/rllib/a3c/__init__.py | 3 + python/ray/rllib/a3c/a3c.py | 126 +++++++++++++++++++ {examples => python/ray/rllib}/a3c/envs.py | 0 python/ray/rllib/a3c/example.py | 32 +++++ {examples => python/ray/rllib}/a3c/policy.py | 0 {examples => python/ray/rllib}/a3c/runner.py | 43 +++++-- 10 files changed, 199 insertions(+), 133 deletions(-) delete mode 100644 examples/a3c/driver.py delete mode 100644 examples/a3c/misc.py rename {examples => python/ray/rllib}/a3c/LSTM.py (96%) create mode 100644 python/ray/rllib/a3c/__init__.py create mode 100644 python/ray/rllib/a3c/a3c.py rename {examples => python/ray/rllib}/a3c/envs.py (100%) create mode 100755 python/ray/rllib/a3c/example.py rename {examples => python/ray/rllib}/a3c/policy.py (100%) rename {examples => python/ray/rllib}/a3c/runner.py (82%) diff --git a/doc/source/example-a3c.rst b/doc/source/example-a3c.rst index 553889ff4..5e7f0c6ca 100644 --- a/doc/source/example-a3c.rst +++ b/doc/source/example-a3c.rst @@ -25,7 +25,7 @@ You can run the code with .. code-block:: bash - python ray/examples/a3c/driver.py [num_workers] + python/ray/rllib/a3c/example.py --num-workers=N Reinforcement Learning ---------------------- @@ -153,6 +153,6 @@ workers, we can train the agent in around 25 minutes. 
You can visualize performance by running :code:`tensorboard --logdir [directory]` in a separate screen, where -:code:`[directory]` is defaulted to :code:`./results/`. If you are running +:code:`[directory]` is defaulted to :code:`/tmp/ray/a3c/`. If you are running multiple experiments, be sure to vary the directory to which Tensorflow saves -its progress (found in :code:`driver.py`). +its progress (found in :code:`a3c.py`). diff --git a/examples/a3c/driver.py b/examples/a3c/driver.py deleted file mode 100644 index 74af71ea8..000000000 --- a/examples/a3c/driver.py +++ /dev/null @@ -1,83 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import ray -from runner import RunnerThread, process_rollout -from LSTM import LSTMPolicy -import tensorflow as tf -import six.moves.queue as queue -import sys -import os -from envs import create_env - - -@ray.remote -class Runner(object): - """Actor object to start running simulation on workers. - - The gradient computation is also executed from this object. 
- """ - def __init__(self, env_name, actor_id, logdir="results/", start=True): - env = create_env(env_name) - self.id = actor_id - num_actions = env.action_space.n - self.policy = LSTMPolicy(env.observation_space.shape, num_actions, - actor_id) - self.runner = RunnerThread(env, self.policy, 20) - self.env = env - self.logdir = logdir - if start: - self.start() - - def pull_batch_from_queue(self): - """Take a rollout from the queue of the thread runner.""" - rollout = self.runner.queue.get(timeout=600.0) - while not rollout.terminal: - try: - rollout.extend(self.runner.queue.get_nowait()) - except queue.Empty: - break - return rollout - - def start(self): - summary_writer = tf.summary.FileWriter( - os.path.join(self.logdir, "agent_%d" % self.id)) - self.summary_writer = summary_writer - self.runner.start_runner(self.policy.sess, summary_writer) - - def compute_gradient(self, params): - self.policy.set_weights(params) - rollout = self.pull_batch_from_queue() - batch = process_rollout(rollout, gamma=0.99, lambda_=1.0) - gradient = self.policy.get_gradients(batch) - info = {"id": self.id, - "size": len(batch.a)} - return gradient, info - - -def train(num_workers, env_name="PongDeterministic-v3"): - env = create_env(env_name) - policy = LSTMPolicy(env.observation_space.shape, env.action_space.n, 0) - agents = [Runner.remote(env_name, i) for i in range(num_workers)] - parameters = policy.get_weights() - gradient_list = [agent.compute_gradient.remote(parameters) - for agent in agents] - steps = 0 - obs = 0 - while True: - done_id, gradient_list = ray.wait(gradient_list) - gradient, info = ray.get(done_id)[0] - policy.model_update(gradient) - parameters = policy.get_weights() - steps += 1 - obs += info["size"] - gradient_list.extend( - [agents[info["id"]].compute_gradient.remote(parameters)]) - return policy - - -if __name__ == "__main__": - num_workers = int(sys.argv[1]) - ray.init(num_cpus=num_workers) - train(num_workers) diff --git a/examples/a3c/misc.py 
b/examples/a3c/misc.py deleted file mode 100644 index ab915ce0f..000000000 --- a/examples/a3c/misc.py +++ /dev/null @@ -1,33 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from datetime import datetime -import cProfile -import io -import pstats - - -def timestamp(): - return datetime.now().timestamp() - - -def time_string(): - return datetime.now().strftime("%Y%m%d_%H_%M_%f") - - -class Profiler(object): - def __init__(self): - self.pr = cProfile.Profile() - pass - - def __enter__(self): - self.pr.enable() - - def __exit__(self, type, value, traceback): - self.pr.disable() - s = io.StringIO() - sortby = "cumtime" - ps = pstats.Stats(self.pr, stream=s).sort_stats(sortby) - ps.print_stats(.2) - print(s.getvalue()) diff --git a/examples/a3c/LSTM.py b/python/ray/rllib/a3c/LSTM.py similarity index 96% rename from examples/a3c/LSTM.py rename to python/ray/rllib/a3c/LSTM.py index 2b8d79f3d..4aac0982e 100644 --- a/examples/a3c/LSTM.py +++ b/python/ray/rllib/a3c/LSTM.py @@ -6,8 +6,10 @@ import numpy as np import tensorflow as tf import tensorflow.contrib.rnn as rnn import distutils.version -from policy import (categorical_sample, conv2d, linear, flatten, - normalized_columns_initializer, Policy) + +from ray.rllib.a3c.policy import ( + categorical_sample, conv2d, linear, flatten, + normalized_columns_initializer, Policy) use_tf100_api = (distutils.version.LooseVersion(tf.VERSION) >= distutils.version.LooseVersion("1.0.0")) diff --git a/python/ray/rllib/a3c/__init__.py b/python/ray/rllib/a3c/__init__.py new file mode 100644 index 000000000..6df6b2cc6 --- /dev/null +++ b/python/ray/rllib/a3c/__init__.py @@ -0,0 +1,3 @@ +from ray.rllib.a3c.a3c import A3C, DEFAULT_CONFIG + +__all__ = ["A3C", "DEFAULT_CONFIG"] diff --git a/python/ray/rllib/a3c/a3c.py b/python/ray/rllib/a3c/a3c.py new file mode 100644 index 000000000..f47ec3962 --- /dev/null +++ b/python/ray/rllib/a3c/a3c.py @@ -0,0 +1,126 @@ +from 
__future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import tensorflow as tf +import six.moves.queue as queue +import os + +import ray +from ray.rllib.a3c.LSTM import LSTMPolicy +from ray.rllib.a3c.runner import RunnerThread, process_rollout +from ray.rllib.a3c.envs import create_env +from ray.rllib.common import Algorithm, TrainingResult + + +DEFAULT_CONFIG = { + "num_workers": 4, + "num_batches_per_iteration": 100, +} + + +@ray.remote +class Runner(object): + """Actor object to start running simulation on workers. + + The gradient computation is also executed from this object. + """ + def __init__(self, env_name, actor_id, logdir="/tmp/ray/a3c/", start=True): + env = create_env(env_name) + self.id = actor_id + num_actions = env.action_space.n + self.policy = LSTMPolicy(env.observation_space.shape, num_actions, + actor_id) + self.runner = RunnerThread(env, self.policy, 20) + self.env = env + self.logdir = logdir + if start: + self.start() + + def pull_batch_from_queue(self): + """Take a rollout from the queue of the thread runner.""" + rollout = self.runner.queue.get(timeout=600.0) + if isinstance(rollout, BaseException): + raise rollout + while not rollout.terminal: + try: + part = self.runner.queue.get_nowait() + if isinstance(part, BaseException): + raise part + rollout.extend(part) + except queue.Empty: + break + return rollout + + def get_completed_rollout_metrics(self): + """Returns metrics on previously completed rollouts. + + Calling this clears the queue of completed rollout metrics.
+ """ + completed = [] + while True: + try: + completed.append(self.runner.metrics_queue.get_nowait()) + except queue.Empty: + break + return completed + + def start(self): + summary_writer = tf.summary.FileWriter( + os.path.join(self.logdir, "agent_%d" % self.id)) + self.summary_writer = summary_writer + self.runner.start_runner(self.policy.sess, summary_writer) + + def compute_gradient(self, params): + self.policy.set_weights(params) + rollout = self.pull_batch_from_queue() + batch = process_rollout(rollout, gamma=0.99, lambda_=1.0) + gradient = self.policy.get_gradients(batch) + info = {"id": self.id, + "size": len(batch.a)} + return gradient, info + + +class A3C(Algorithm): + def __init__(self, env_name, config): + Algorithm.__init__(self, env_name, config) + self.env = create_env(env_name) + self.policy = LSTMPolicy( + self.env.observation_space.shape, self.env.action_space.n, 0) + self.agents = [ + Runner.remote(env_name, i) for i in range(config["num_workers"])] + self.parameters = self.policy.get_weights() + self.iteration = 0 + + def train(self): + gradient_list = [ + agent.compute_gradient.remote(self.parameters) + for agent in self.agents] + max_batches = self.config["num_batches_per_iteration"] + batches_so_far = len(gradient_list) + while gradient_list: + done_id, gradient_list = ray.wait(gradient_list) + gradient, info = ray.get(done_id)[0] + self.policy.model_update(gradient) + self.parameters = self.policy.get_weights() + if batches_so_far < max_batches: + batches_so_far += 1 + gradient_list.extend( + [self.agents[info["id"]].compute_gradient.remote(self.parameters)]) + res = self.fetch_metrics_from_workers() + self.iteration += 1 + return res + + def fetch_metrics_from_workers(self): + episode_rewards = [] + episode_lengths = [] + metric_lists = [ + a.get_completed_rollout_metrics.remote() for a in self.agents] + for metrics in metric_lists: + for episode in ray.get(metrics): + episode_lengths.append(episode.episode_length) + 
episode_rewards.append(episode.episode_reward) + res = TrainingResult( + self.iteration, np.mean(episode_rewards), np.mean(episode_lengths)) + return res diff --git a/examples/a3c/envs.py b/python/ray/rllib/a3c/envs.py similarity index 100% rename from examples/a3c/envs.py rename to python/ray/rllib/a3c/envs.py diff --git a/python/ray/rllib/a3c/example.py b/python/ray/rllib/a3c/example.py new file mode 100755 index 000000000..34529882e --- /dev/null +++ b/python/ray/rllib/a3c/example.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse + +import ray +from ray.rllib.a3c import A3C, DEFAULT_CONFIG + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run the A3C algorithm.") + parser.add_argument("--environment", default="PongDeterministic-v3", + type=str, help="The gym environment to use.") + parser.add_argument("--redis-address", default=None, type=str, + help="The Redis address of the cluster.") + parser.add_argument("--num-workers", default=4, type=int, + help="The number of A3C workers to use>") + + args = parser.parse_args() + ray.init(redis_address=args.redis_address, num_cpus=args.num_workers) + + config = DEFAULT_CONFIG.copy() + config["num_workers"] = args.num_workers + + a3c = A3C(args.environment, config) + + while True: + res = a3c.train() + print("current status: {}".format(res)) diff --git a/examples/a3c/policy.py b/python/ray/rllib/a3c/policy.py similarity index 100% rename from examples/a3c/policy.py rename to python/ray/rllib/a3c/policy.py diff --git a/examples/a3c/runner.py b/python/ray/rllib/a3c/runner.py similarity index 82% rename from examples/a3c/runner.py rename to python/ray/rllib/a3c/runner.py index f787b79ea..7c8dd182f 100644 --- a/examples/a3c/runner.py +++ b/python/ray/rllib/a3c/runner.py @@ -33,7 +33,11 @@ def process_rollout(rollout, gamma, lambda_=1.0): features) -Batch = 
namedtuple("Batch", ["si", "a", "adv", "r", "terminal", "features"]) +Batch = namedtuple( + "Batch", ["si", "a", "adv", "r", "terminal", "features"]) + +CompletedRollout = namedtuple( + "CompletedRollout", ["episode_length", "episode_reward"]) class PartialRollout(object): @@ -75,6 +79,7 @@ class RunnerThread(threading.Thread): def __init__(self, env, policy, num_local_steps, visualise=False): threading.Thread.__init__(self) self.queue = queue.Queue(5) + self.metrics_queue = queue.Queue() self.num_local_steps = num_local_steps self.env = env self.last_features = None @@ -90,26 +95,37 @@ class RunnerThread(threading.Thread): self.start() def run(self): - with self.sess.as_default(): - self._run() + try: + with self.sess.as_default(): + self._run() + except BaseException as e: + self.queue.put(e) + raise e def _run(self): - rollout_provider = env_runner(self.env, self.policy, self.num_local_steps, - self.summary_writer, self.visualise) + rollout_provider = env_runner( + self.env, self.policy, self.num_local_steps, + self.summary_writer, self.visualise) while True: # The timeout variable exists because apparently, if one worker dies, the # other workers won't die with it, unless the timeout is set to some # large number. This is an empirical observation. - self.queue.put(next(rollout_provider), timeout=600.0) + item = next(rollout_provider) + if isinstance(item, CompletedRollout): + self.metrics_queue.put(item) + else: + self.queue.put(item, timeout=600.0) def env_runner(env, policy, num_local_steps, summary_writer, render): - """This impleents the logic of the thread runner. + """This implements the logic of the thread runner. It continually runs the policy, and as long as the rollout exceeds a certain length, the thread runner appends the policy to the queue. 
""" last_state = env.reset() + timestep_limit = env.spec.tags.get("wrapper_config.TimeLimit" + ".max_episode_steps") last_features = policy.get_initial_features() length = 0 rewards = 0 @@ -127,10 +143,13 @@ def env_runner(env, policy, num_local_steps, summary_writer, render): if render: env.render() - # Collect the experience. - rollout.add(last_state, action, reward, value_, terminal, last_features) length += 1 rewards += reward + if length >= timestep_limit: + terminal = True + + # Collect the experience. + rollout.add(last_state, action, reward, value_, terminal, last_features) last_state = state last_features = features @@ -142,10 +161,10 @@ def env_runner(env, policy, num_local_steps, summary_writer, render): summary_writer.add_summary(summary, rollout_number) summary_writer.flush() - timestep_limit = env.spec.tags.get("wrapper_config.TimeLimit" - ".max_episode_steps") - if terminal or length >= timestep_limit: + if terminal: terminal_end = True + yield CompletedRollout(length, rewards) + if length >= timestep_limit or not env.metadata.get("semantics" ".autoreset"): last_state = env.reset()