diff --git a/python/ray/rllib/a3c/LSTM.py b/python/ray/rllib/a3c/LSTM.py index 4ddb0cb56..96a5398d8 100644 --- a/python/ray/rllib/a3c/LSTM.py +++ b/python/ray/rllib/a3c/LSTM.py @@ -22,6 +22,7 @@ class LSTMPolicy(Policy): In this A3C implementation, both the Critic and the Actor share the model. """ + num_actions = ac_space.n self.x = x = tf.placeholder(tf.float32, [None] + list(ob_space)) for i in range(4): @@ -54,12 +55,12 @@ class LSTMPolicy(Policy): time_major=False) lstm_c, lstm_h = lstm_state x = tf.reshape(lstm_outputs, [-1, size]) - self.logits = linear(x, ac_space, "action", + self.logits = linear(x, num_actions, "action", normalized_columns_initializer(0.01)) self.vf = tf.reshape(linear(x, 1, "value", normalized_columns_initializer(1.0)), [-1]) self.state_out = [lstm_c[:1, :], lstm_h[:1, :]] - self.sample = categorical_sample(self.logits, ac_space)[0, :] + self.sample = categorical_sample(self.logits, num_actions)[0, :] self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, tf.get_variable_scope().name) self.global_step = tf.get_variable( @@ -81,16 +82,24 @@ class LSTMPolicy(Policy): self.state_in[0]: batch.features[0], self.state_in[1]: batch.features[1] } + info = {} self.local_steps += 1 - return self.sess.run(self.grads, feed_dict=feed_dict) + if self.summarize: + grad, summ = self.sess.run([self.grads, self.summary_op], + feed_dict=feed_dict) + info['summary'] = summ + else: + grad = self.sess.run(self.grads, feed_dict=feed_dict) + return grad, info - def act(self, ob, c, h): + def compute_actions(self, ob, c, h): return self.sess.run([self.sample, self.vf] + self.state_out, {self.x: [ob], self.state_in[0]: c, self.state_in[1]: h}) def value(self, ob, c, h): + # process_rollout is very non-intuitive due to value being a float return self.sess.run(self.vf, {self.x: [ob], self.state_in[0]: c, self.state_in[1]: h})[0] diff --git a/python/ray/rllib/a3c/a3c.py b/python/ray/rllib/a3c/a3c.py index f0b909e4f..e19b75120 100644 --- a/python/ray/rllib/a3c/a3c.py +++ b/python/ray/rllib/a3c/a3c.py @@ -8,15 +8,16 @@ import six.moves.queue as queue import os import ray -from ray.rllib.a3c.LSTM import LSTMPolicy from ray.rllib.a3c.runner import RunnerThread, process_rollout from ray.rllib.a3c.envs import create_env from ray.rllib.common import Algorithm, TrainingResult +from ray.rllib.a3c.shared_model import SharedModel DEFAULT_CONFIG = { "num_workers": 4, "num_batches_per_iteration": 100, + "batch_size": 10 } @@ -26,17 +27,15 @@ class Runner(object): The gradient computation is also executed from this object. """ - def __init__(self, env_name, actor_id, logdir, start=True): + def __init__(self, env_name, policy_cls, actor_id, batch_size, logdir): env = create_env(env_name) self.id = actor_id - num_actions = env.action_space.n - self.policy = LSTMPolicy(env.observation_space.shape, num_actions, - actor_id) - self.runner = RunnerThread(env, self.policy, 20) + # TODO(rliaw): should change this to be just env.observation_space + self.policy = policy_cls(env.observation_space.shape, env.action_space) + self.runner = RunnerThread(env, self.policy, batch_size) self.env = env self.logdir = logdir - if start: - self.start() + self.start() def pull_batch_from_queue(self): """Take a rollout from the queue of the thread runner.""" @@ -76,21 +75,28 @@ class Runner(object): self.policy.set_weights(params) rollout = self.pull_batch_from_queue() batch = process_rollout(rollout, gamma=0.99, lambda_=1.0) - gradient = self.policy.get_gradients(batch) + gradient, info = self.policy.get_gradients(batch) + if "summary" in info: + self.summary_writer.add_summary( + tf.Summary.FromString(info['summary']), + self.policy.local_steps) + self.summary_writer.flush() info = {"id": self.id, "size": len(batch.a)} return gradient, info class A3C(Algorithm): - def __init__(self, env_name, config, upload_dir=None): + def __init__(self, env_name, config, + policy_cls=SharedModel, upload_dir=None): config.update({"alg": "A3C"}) Algorithm.__init__(self, env_name, config, upload_dir=upload_dir) self.env = create_env(env_name) - self.policy = LSTMPolicy( - self.env.observation_space.shape, self.env.action_space.n, 0) + self.policy = policy_cls( + self.env.observation_space.shape, self.env.action_space) self.agents = [ - Runner.remote(env_name, i, self.logdir) + Runner.remote(env_name, policy_cls, i, + config["batch_size"], self.logdir) for i in range(config["num_workers"])] self.parameters = self.policy.get_weights() self.iteration = 0 @@ -124,7 +130,9 @@ class A3C(Algorithm): for episode in ray.get(metrics): episode_lengths.append(episode.episode_length) episode_rewards.append(episode.episode_reward) + avg_reward = np.mean(episode_rewards) if episode_rewards else None + avg_length = np.mean(episode_lengths) if episode_lengths else None res = TrainingResult( self.experiment_id.hex, self.iteration, - np.mean(episode_rewards), np.mean(episode_lengths), dict()) + avg_reward, avg_length, dict()) return res diff --git a/python/ray/rllib/a3c/envs.py b/python/ray/rllib/a3c/envs.py index 95f26f544..cd01901f4 100644 --- a/python/ray/rllib/a3c/envs.py +++ b/python/ray/rllib/a3c/envs.py @@ -15,8 +15,9 @@ logger.setLevel(logging.INFO) def create_env(env_id): env = gym.make(env_id) - env = AtariProcessing(env) - env = Diagnostic(env) + if hasattr(env.env, "ale"): + env = AtariProcessing(env) + env = Diagnostic(env) return env @@ -34,6 +35,17 @@ def _process_frame42(frame): return frame +def _process_frame80(frame): + frame = frame[34:(34 + 160), :160] + # Resize by half, then down to 80x80. + frame = cv2.resize(frame, (80, 80)) + frame = frame.mean(2) + frame = frame.astype(np.float32) + frame *= (1.0 / 255.0) + frame = np.reshape(frame, [80, 80, 1]) + return frame + + class AtariProcessing(gym.ObservationWrapper): def __init__(self, env=None): super(AtariProcessing, self).__init__(env) diff --git a/python/ray/rllib/a3c/example.py b/python/ray/rllib/a3c/example.py index de66693e1..8b794ecf4 100755 --- a/python/ray/rllib/a3c/example.py +++ b/python/ray/rllib/a3c/example.py @@ -12,11 +12,11 @@ from ray.rllib.a3c import A3C, DEFAULT_CONFIG if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run the A3C algorithm.") - parser.add_argument("--environment", default="PongDeterministic-v3", + parser.add_argument("--environment", default="PongDeterministic-v4", type=str, help="The gym environment to use.") parser.add_argument("--redis-address", default=None, type=str, help="The Redis address of the cluster.") - parser.add_argument("--num-workers", default=4, type=int, + parser.add_argument("--num-workers", default=16, type=int, help="The number of A3C workers to use.") parser.add_argument("--iterations", default=-1, type=int, help="The number of training iterations to run.") diff --git a/python/ray/rllib/a3c/policy.py b/python/ray/rllib/a3c/policy.py index f5335c404..055948973 100644 --- a/python/ray/rllib/a3c/policy.py +++ b/python/ray/rllib/a3c/policy.py @@ -9,8 +9,9 @@ import ray class Policy(object): """The policy base class.""" - def __init__(self, ob_space, ac_space, task, name="local"): + def __init__(self, ob_space, ac_space, name="local", summarize=True): self.local_steps = 0 + self.summarize = summarize worker_device = "/job:localhost/replica:0/task:0/cpu:0" self.g = tf.Graph() with self.g.as_default(), tf.device(worker_device): @@ -25,7 +26,8 @@ class Policy(object): def setup_graph(self): raise NotImplementedError - def setup_loss(self, num_actions, summarize=True): + def setup_loss(self, ac_space): + num_actions = ac_space.n self.ac = tf.placeholder(tf.float32, [None, num_actions], name="ac") self.adv = tf.placeholder(tf.float32, [None], name="adv") self.r = tf.placeholder(tf.float32, [None], name="r") @@ -42,7 +44,6 @@ class Policy(object): # loss of value function vf_loss = 0.5 * tf.reduce_sum(tf.square(self.vf - self.r)) - vf_loss = tf.Print(vf_loss, [vf_loss], "Value Fn Loss") entropy = - tf.reduce_sum(prob_tf * log_prob_tf) bs = tf.to_float(tf.shape(self.x)[0]) @@ -55,11 +56,12 @@ class Policy(object): opt = tf.train.AdamOptimizer(1e-4) self._apply_gradients = opt.apply_gradients(grads_and_vars) - if summarize: + if self.summarize: tf.summary.scalar("model/policy_loss", pi_loss / bs) tf.summary.scalar("model/value_loss", vf_loss / bs) tf.summary.scalar("model/entropy", entropy / bs) - tf.summary.image("model/state", self.x) + tf.summary.scalar("model/grad_gnorm", tf.global_norm(self.grads)) + tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list)) self.summary_op = tf.summary.merge_all() def initialize(self): @@ -87,7 +89,7 @@ class Policy(object): def get_vf_loss(self): raise NotImplementedError - def act(self, ob): + def compute_actions(self, observations): raise NotImplementedError def value(self, ob): diff --git a/python/ray/rllib/a3c/runner.py b/python/ray/rllib/a3c/runner.py index 2119f419c..7a9338f7d 100644 --- a/python/ray/rllib/a3c/runner.py +++ b/python/ray/rllib/a3c/runner.py @@ -136,7 +136,7 @@ def env_runner(env, policy, num_local_steps, summary_writer, render): rollout = PartialRollout() for _ in range(num_local_steps): - fetched = policy.act(last_state, *last_features) + fetched = policy.compute_actions(last_state, *last_features) action, value_, features = fetched[0], fetched[1], fetched[2:] # Argmax to convert from one-hot. state, reward, terminal, info = env.step(action.argmax()) diff --git a/python/ray/rllib/a3c/shared_model.py b/python/ray/rllib/a3c/shared_model.py new file mode 100644 index 000000000..cd0793d52 --- /dev/null +++ b/python/ray/rllib/a3c/shared_model.py @@ -0,0 +1,60 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf +from ray.rllib.a3c.policy import ( + categorical_sample, linear, + normalized_columns_initializer, Policy) + +from ray.rllib.models.catalog import ModelCatalog + + +class SharedModel(Policy): + def __init__(self, ob_space, ac_space, **kwargs): + super(SharedModel, self).__init__(ob_space, ac_space, **kwargs) + + def setup_graph(self, ob_space, ac_space): + num_actions = ac_space.n + self.x = tf.placeholder(tf.float32, [None] + list(ob_space)) + dist_class, dist_dim = ModelCatalog.get_action_dist(ac_space) + self._model = ModelCatalog.ConvolutionalNetwork(self.x, dist_dim) + self.logits = self._model.outputs + self.vf = tf.reshape(linear(self._model.last_layer, 1, "value", + normalized_columns_initializer(1.0)), [-1]) + + self.sample = categorical_sample(self.logits, num_actions)[0, :] + self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, + tf.get_variable_scope().name) + self.global_step = tf.get_variable( + "global_step", [], tf.int32, + initializer=tf.constant_initializer(0, dtype=tf.int32), + trainable=False) + + def get_gradients(self, batch): + info = {} + feed_dict = { + self.x: batch.si, + self.ac: batch.a, + self.adv: batch.adv, + self.r: batch.r, + } + + self.local_steps += 1 + if self.summarize: + grad, summ = self.sess.run([self.grads, self.summary_op], + feed_dict=feed_dict) + info['summary'] = summ + else: + grad = self.sess.run(self.grads, feed_dict=feed_dict) + return grad, info + + def compute_actions(self, ob, *args): + return self.sess.run([self.sample, self.vf], + {self.x: [ob]}) + + def value(self, ob, *args): + return self.sess.run(self.vf, {self.x: [ob]})[0] + + def get_initial_features(self): + return [] diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index a5548643f..af0ec0538 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -8,6 +8,7 @@ from ray.rllib.models.action_dist import ( Categorical, Deterministic, DiagGaussian) from ray.rllib.models.fcnet import FullyConnectedNetwork from ray.rllib.models.visionnet import VisionNetwork +from ray.rllib.models.convnet import ConvolutionalNetwork class ModelCatalog(object): @@ -67,6 +68,10 @@ class ModelCatalog(object): return FullyConnectedNetwork(inputs, num_outputs, options) + @staticmethod + def ConvolutionalNetwork(inputs, num_outputs, options=None): + return ConvolutionalNetwork(inputs, num_outputs, options) + @staticmethod def get_preprocessor(env_name): """Returns a suitable processor for the given environment. diff --git a/python/ray/rllib/models/convnet.py b/python/ray/rllib/models/convnet.py new file mode 100644 index 000000000..ece6f17b3 --- /dev/null +++ b/python/ray/rllib/models/convnet.py @@ -0,0 +1,57 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf +import numpy as np + +from ray.rllib.models.model import Model +from ray.rllib.models.misc import normc_initializer + + +def conv2d(x, num_filters, name, filter_size=(3, 3), stride=(1, 1), pad="SAME", + dtype=tf.float32, collections=None): + with tf.variable_scope(name): + stride_shape = [1, stride[0], stride[1], 1] + filter_shape = [filter_size[0], filter_size[1], int(x.get_shape()[3]), + num_filters] + + # There are "num input feature maps * filter height * filter width" + # inputs to each hidden unit. + fan_in = np.prod(filter_shape[:3]) + # Each unit in the lower layer receives a gradient from: "num output + # feature maps * filter height * filter width" / pooling size. + fan_out = np.prod(filter_shape[:2]) * num_filters + # Initialize weights with random weights. + w_bound = np.sqrt(6 / (fan_in + fan_out)) + + w = tf.get_variable("W", filter_shape, dtype, + tf.random_uniform_initializer(-w_bound, w_bound), + collections=collections) + b = tf.get_variable("b", [1, 1, 1, num_filters], + initializer=tf.constant_initializer(0.0), + collections=collections) + return tf.nn.conv2d(x, w, stride_shape, pad) + b + + +def linear(x, size, name, initializer=None, bias_init=0): + w = tf.get_variable(name + "/w", [x.get_shape()[1], size], + initializer=initializer) + b = tf.get_variable(name + "/b", [size], + initializer=tf.constant_initializer(bias_init)) + return tf.matmul(x, w) + b + + +class ConvolutionalNetwork(Model): + """Generic convolutional network.""" + + def _init(self, inputs, num_outputs, options): + x = inputs + with tf.name_scope("convnet"): + for i in range(4): + x = tf.nn.elu(conv2d(x, 32, "l{}".format(i+1), [3, 3], [2, 2])) + r, c = x.shape[1].value, x.shape[2].value + x = tf.reshape(x, [-1, r*c*32]) + fc1 = linear(x, 256, "fc1") + fc2 = linear(x, num_outputs, "fc2", normc_initializer(0.01)) + return fc2, fc1 diff --git a/python/ray/rllib/models/misc.py b/python/ray/rllib/models/misc.py new file mode 100644 index 000000000..0044a021e --- /dev/null +++ b/python/ray/rllib/models/misc.py @@ -0,0 +1,15 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf + +import numpy as np + + +def normc_initializer(std=1.0): + def _initializer(shape, dtype=None, partition_info=None): + out = np.random.randn(*shape).astype(np.float32) + out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True)) + return tf.constant(out) + return _initializer