mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 21:38:18 +08:00
[rllib] Code for Supporting Shared Models (#775)
* Code for Supporting Shared Models * Running (with vnet modification) - needs to be tested for performance * Small fix for jenkins * Linting * linting * Summaries * Small refactoring + generalized to more domains * Addressing changes * Addressing changes * Update envs.py * Addressing changes * convnet * final touches * Merge - new model * final linting * Changing iterations back * Policy option removed, fixed small things * Nits * nit * Linting * Linting
This commit is contained in:
committed by
Philipp Moritz
parent
df65e87fc7
commit
c30fdb4ab0
@@ -22,6 +22,7 @@ class LSTMPolicy(Policy):
|
||||
In this A3C implementation, both the Critic and the Actor share the
|
||||
model.
|
||||
"""
|
||||
num_actions = ac_space.n
|
||||
self.x = x = tf.placeholder(tf.float32, [None] + list(ob_space))
|
||||
|
||||
for i in range(4):
|
||||
@@ -54,12 +55,12 @@ class LSTMPolicy(Policy):
|
||||
time_major=False)
|
||||
lstm_c, lstm_h = lstm_state
|
||||
x = tf.reshape(lstm_outputs, [-1, size])
|
||||
self.logits = linear(x, ac_space, "action",
|
||||
self.logits = linear(x, num_actions, "action",
|
||||
normalized_columns_initializer(0.01))
|
||||
self.vf = tf.reshape(linear(x, 1, "value",
|
||||
normalized_columns_initializer(1.0)), [-1])
|
||||
self.state_out = [lstm_c[:1, :], lstm_h[:1, :]]
|
||||
self.sample = categorical_sample(self.logits, ac_space)[0, :]
|
||||
self.sample = categorical_sample(self.logits, num_actions)[0, :]
|
||||
self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
|
||||
tf.get_variable_scope().name)
|
||||
self.global_step = tf.get_variable(
|
||||
@@ -81,16 +82,24 @@ class LSTMPolicy(Policy):
|
||||
self.state_in[0]: batch.features[0],
|
||||
self.state_in[1]: batch.features[1]
|
||||
}
|
||||
info = {}
|
||||
self.local_steps += 1
|
||||
return self.sess.run(self.grads, feed_dict=feed_dict)
|
||||
if self.summarize:
|
||||
grad, summ = self.sess.run([self.grads, self.summary_op],
|
||||
feed_dict=feed_dict)
|
||||
info['summary'] = summ
|
||||
else:
|
||||
grad = self.sess.run(self.grads, feed_dict=feed_dict)
|
||||
return grad, info
|
||||
|
||||
def act(self, ob, c, h):
|
||||
def compute_actions(self, ob, c, h):
|
||||
return self.sess.run([self.sample, self.vf] + self.state_out,
|
||||
{self.x: [ob],
|
||||
self.state_in[0]: c,
|
||||
self.state_in[1]: h})
|
||||
|
||||
def value(self, ob, c, h):
|
||||
# process_rollout is very non-intuitive due to value being a float
|
||||
return self.sess.run(self.vf, {self.x: [ob],
|
||||
self.state_in[0]: c,
|
||||
self.state_in[1]: h})[0]
|
||||
|
||||
+22
-14
@@ -8,15 +8,16 @@ import six.moves.queue as queue
|
||||
import os
|
||||
|
||||
import ray
|
||||
from ray.rllib.a3c.LSTM import LSTMPolicy
|
||||
from ray.rllib.a3c.runner import RunnerThread, process_rollout
|
||||
from ray.rllib.a3c.envs import create_env
|
||||
from ray.rllib.common import Algorithm, TrainingResult
|
||||
from ray.rllib.a3c.shared_model import SharedModel
|
||||
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"num_workers": 4,
|
||||
"num_batches_per_iteration": 100,
|
||||
"batch_size": 10
|
||||
}
|
||||
|
||||
|
||||
@@ -26,17 +27,15 @@ class Runner(object):
|
||||
|
||||
The gradient computation is also executed from this object.
|
||||
"""
|
||||
def __init__(self, env_name, actor_id, logdir, start=True):
|
||||
def __init__(self, env_name, policy_cls, actor_id, batch_size, logdir):
|
||||
env = create_env(env_name)
|
||||
self.id = actor_id
|
||||
num_actions = env.action_space.n
|
||||
self.policy = LSTMPolicy(env.observation_space.shape, num_actions,
|
||||
actor_id)
|
||||
self.runner = RunnerThread(env, self.policy, 20)
|
||||
# TODO(rliaw): should change this to be just env.observation_space
|
||||
self.policy = policy_cls(env.observation_space.shape, env.action_space)
|
||||
self.runner = RunnerThread(env, self.policy, batch_size)
|
||||
self.env = env
|
||||
self.logdir = logdir
|
||||
if start:
|
||||
self.start()
|
||||
self.start()
|
||||
|
||||
def pull_batch_from_queue(self):
|
||||
"""Take a rollout from the queue of the thread runner."""
|
||||
@@ -76,21 +75,28 @@ class Runner(object):
|
||||
self.policy.set_weights(params)
|
||||
rollout = self.pull_batch_from_queue()
|
||||
batch = process_rollout(rollout, gamma=0.99, lambda_=1.0)
|
||||
gradient = self.policy.get_gradients(batch)
|
||||
gradient, info = self.policy.get_gradients(batch)
|
||||
if "summary" in info:
|
||||
self.summary_writer.add_summary(
|
||||
tf.Summary.FromString(info['summary']),
|
||||
self.policy.local_steps)
|
||||
self.summary_writer.flush()
|
||||
info = {"id": self.id,
|
||||
"size": len(batch.a)}
|
||||
return gradient, info
|
||||
|
||||
|
||||
class A3C(Algorithm):
|
||||
def __init__(self, env_name, config, upload_dir=None):
|
||||
def __init__(self, env_name, config,
|
||||
policy_cls=SharedModel, upload_dir=None):
|
||||
config.update({"alg": "A3C"})
|
||||
Algorithm.__init__(self, env_name, config, upload_dir=upload_dir)
|
||||
self.env = create_env(env_name)
|
||||
self.policy = LSTMPolicy(
|
||||
self.env.observation_space.shape, self.env.action_space.n, 0)
|
||||
self.policy = policy_cls(
|
||||
self.env.observation_space.shape, self.env.action_space)
|
||||
self.agents = [
|
||||
Runner.remote(env_name, i, self.logdir)
|
||||
Runner.remote(env_name, policy_cls, i,
|
||||
config["batch_size"], self.logdir)
|
||||
for i in range(config["num_workers"])]
|
||||
self.parameters = self.policy.get_weights()
|
||||
self.iteration = 0
|
||||
@@ -124,7 +130,9 @@ class A3C(Algorithm):
|
||||
for episode in ray.get(metrics):
|
||||
episode_lengths.append(episode.episode_length)
|
||||
episode_rewards.append(episode.episode_reward)
|
||||
avg_reward = np.mean(episode_rewards) if episode_rewards else None
|
||||
avg_length = np.mean(episode_lengths) if episode_lengths else None
|
||||
res = TrainingResult(
|
||||
self.experiment_id.hex, self.iteration,
|
||||
np.mean(episode_rewards), np.mean(episode_lengths), dict())
|
||||
avg_reward, avg_length, dict())
|
||||
return res
|
||||
|
||||
@@ -15,8 +15,9 @@ logger.setLevel(logging.INFO)
|
||||
|
||||
def create_env(env_id):
|
||||
env = gym.make(env_id)
|
||||
env = AtariProcessing(env)
|
||||
env = Diagnostic(env)
|
||||
if hasattr(env.env, "ale"):
|
||||
env = AtariProcessing(env)
|
||||
env = Diagnostic(env)
|
||||
return env
|
||||
|
||||
|
||||
@@ -34,6 +35,17 @@ def _process_frame42(frame):
|
||||
return frame
|
||||
|
||||
|
||||
def _process_frame80(frame):
|
||||
frame = frame[34:(34 + 160), :160]
|
||||
# Resize by half, then down to 80x80.
|
||||
frame = cv2.resize(frame, (80, 80))
|
||||
frame = frame.mean(2)
|
||||
frame = frame.astype(np.float32)
|
||||
frame *= (1.0 / 255.0)
|
||||
frame = np.reshape(frame, [80, 80, 1])
|
||||
return frame
|
||||
|
||||
|
||||
class AtariProcessing(gym.ObservationWrapper):
|
||||
def __init__(self, env=None):
|
||||
super(AtariProcessing, self).__init__(env)
|
||||
|
||||
@@ -12,11 +12,11 @@ from ray.rllib.a3c import A3C, DEFAULT_CONFIG
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Run the A3C algorithm.")
|
||||
parser.add_argument("--environment", default="PongDeterministic-v3",
|
||||
parser.add_argument("--environment", default="PongDeterministic-v4",
|
||||
type=str, help="The gym environment to use.")
|
||||
parser.add_argument("--redis-address", default=None, type=str,
|
||||
help="The Redis address of the cluster.")
|
||||
parser.add_argument("--num-workers", default=4, type=int,
|
||||
parser.add_argument("--num-workers", default=16, type=int,
|
||||
help="The number of A3C workers to use.")
|
||||
parser.add_argument("--iterations", default=-1, type=int,
|
||||
help="The number of training iterations to run.")
|
||||
|
||||
@@ -9,8 +9,9 @@ import ray
|
||||
|
||||
class Policy(object):
|
||||
"""The policy base class."""
|
||||
def __init__(self, ob_space, ac_space, task, name="local"):
|
||||
def __init__(self, ob_space, ac_space, name="local", summarize=True):
|
||||
self.local_steps = 0
|
||||
self.summarize = summarize
|
||||
worker_device = "/job:localhost/replica:0/task:0/cpu:0"
|
||||
self.g = tf.Graph()
|
||||
with self.g.as_default(), tf.device(worker_device):
|
||||
@@ -25,7 +26,8 @@ class Policy(object):
|
||||
def setup_graph(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def setup_loss(self, num_actions, summarize=True):
|
||||
def setup_loss(self, ac_space):
|
||||
num_actions = ac_space.n
|
||||
self.ac = tf.placeholder(tf.float32, [None, num_actions], name="ac")
|
||||
self.adv = tf.placeholder(tf.float32, [None], name="adv")
|
||||
self.r = tf.placeholder(tf.float32, [None], name="r")
|
||||
@@ -42,7 +44,6 @@ class Policy(object):
|
||||
|
||||
# loss of value function
|
||||
vf_loss = 0.5 * tf.reduce_sum(tf.square(self.vf - self.r))
|
||||
vf_loss = tf.Print(vf_loss, [vf_loss], "Value Fn Loss")
|
||||
entropy = - tf.reduce_sum(prob_tf * log_prob_tf)
|
||||
|
||||
bs = tf.to_float(tf.shape(self.x)[0])
|
||||
@@ -55,11 +56,12 @@ class Policy(object):
|
||||
opt = tf.train.AdamOptimizer(1e-4)
|
||||
self._apply_gradients = opt.apply_gradients(grads_and_vars)
|
||||
|
||||
if summarize:
|
||||
if self.summarize:
|
||||
tf.summary.scalar("model/policy_loss", pi_loss / bs)
|
||||
tf.summary.scalar("model/value_loss", vf_loss / bs)
|
||||
tf.summary.scalar("model/entropy", entropy / bs)
|
||||
tf.summary.image("model/state", self.x)
|
||||
tf.summary.scalar("model/grad_gnorm", tf.global_norm(self.grads))
|
||||
tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list))
|
||||
self.summary_op = tf.summary.merge_all()
|
||||
|
||||
def initialize(self):
|
||||
@@ -87,7 +89,7 @@ class Policy(object):
|
||||
def get_vf_loss(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def act(self, ob):
|
||||
def compute_actions(self, observations):
|
||||
raise NotImplementedError
|
||||
|
||||
def value(self, ob):
|
||||
|
||||
@@ -136,7 +136,7 @@ def env_runner(env, policy, num_local_steps, summary_writer, render):
|
||||
rollout = PartialRollout()
|
||||
|
||||
for _ in range(num_local_steps):
|
||||
fetched = policy.act(last_state, *last_features)
|
||||
fetched = policy.compute_actions(last_state, *last_features)
|
||||
action, value_, features = fetched[0], fetched[1], fetched[2:]
|
||||
# Argmax to convert from one-hot.
|
||||
state, reward, terminal, info = env.step(action.argmax())
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import tensorflow as tf
|
||||
from ray.rllib.a3c.policy import (
|
||||
categorical_sample, linear,
|
||||
normalized_columns_initializer, Policy)
|
||||
|
||||
from ray.rllib.models.catalog import ModelCatalog
|
||||
|
||||
|
||||
class SharedModel(Policy):
|
||||
def __init__(self, ob_space, ac_space, **kwargs):
|
||||
super(SharedModel, self).__init__(ob_space, ac_space, **kwargs)
|
||||
|
||||
def setup_graph(self, ob_space, ac_space):
|
||||
num_actions = ac_space.n
|
||||
self.x = tf.placeholder(tf.float32, [None] + list(ob_space))
|
||||
dist_class, dist_dim = ModelCatalog.get_action_dist(ac_space)
|
||||
self._model = ModelCatalog.ConvolutionalNetwork(self.x, dist_dim)
|
||||
self.logits = self._model.outputs
|
||||
self.vf = tf.reshape(linear(self._model.last_layer, 1, "value",
|
||||
normalized_columns_initializer(1.0)), [-1])
|
||||
|
||||
self.sample = categorical_sample(self.logits, num_actions)[0, :]
|
||||
self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
|
||||
tf.get_variable_scope().name)
|
||||
self.global_step = tf.get_variable(
|
||||
"global_step", [], tf.int32,
|
||||
initializer=tf.constant_initializer(0, dtype=tf.int32),
|
||||
trainable=False)
|
||||
|
||||
def get_gradients(self, batch):
|
||||
info = {}
|
||||
feed_dict = {
|
||||
self.x: batch.si,
|
||||
self.ac: batch.a,
|
||||
self.adv: batch.adv,
|
||||
self.r: batch.r,
|
||||
}
|
||||
|
||||
self.local_steps += 1
|
||||
if self.summarize:
|
||||
grad, summ = self.sess.run([self.grads, self.summary_op],
|
||||
feed_dict=feed_dict)
|
||||
info['summary'] = summ
|
||||
else:
|
||||
grad = self.sess.run(self.grads, feed_dict=feed_dict)
|
||||
return grad, info
|
||||
|
||||
def compute_actions(self, ob, *args):
|
||||
return self.sess.run([self.sample, self.vf],
|
||||
{self.x: [ob]})
|
||||
|
||||
def value(self, ob, *args):
|
||||
return self.sess.run(self.vf, {self.x: [ob]})[0]
|
||||
|
||||
def get_initial_features(self):
|
||||
return []
|
||||
@@ -8,6 +8,7 @@ from ray.rllib.models.action_dist import (
|
||||
Categorical, Deterministic, DiagGaussian)
|
||||
from ray.rllib.models.fcnet import FullyConnectedNetwork
|
||||
from ray.rllib.models.visionnet import VisionNetwork
|
||||
from ray.rllib.models.convnet import ConvolutionalNetwork
|
||||
|
||||
|
||||
class ModelCatalog(object):
|
||||
@@ -67,6 +68,10 @@ class ModelCatalog(object):
|
||||
|
||||
return FullyConnectedNetwork(inputs, num_outputs, options)
|
||||
|
||||
@staticmethod
|
||||
def ConvolutionalNetwork(inputs, num_outputs, options=None):
|
||||
return ConvolutionalNetwork(inputs, num_outputs, options)
|
||||
|
||||
@staticmethod
|
||||
def get_preprocessor(env_name):
|
||||
"""Returns a suitable processor for the given environment.
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import tensorflow as tf
|
||||
import numpy as np
|
||||
|
||||
from ray.rllib.models.model import Model
|
||||
from ray.rllib.models.misc import normc_initializer
|
||||
|
||||
|
||||
def conv2d(x, num_filters, name, filter_size=(3, 3), stride=(1, 1), pad="SAME",
|
||||
dtype=tf.float32, collections=None):
|
||||
with tf.variable_scope(name):
|
||||
stride_shape = [1, stride[0], stride[1], 1]
|
||||
filter_shape = [filter_size[0], filter_size[1], int(x.get_shape()[3]),
|
||||
num_filters]
|
||||
|
||||
# There are "num input feature maps * filter height * filter width"
|
||||
# inputs to each hidden unit.
|
||||
fan_in = np.prod(filter_shape[:3])
|
||||
# Each unit in the lower layer receives a gradient from: "num output
|
||||
# feature maps * filter height * filter width" / pooling size.
|
||||
fan_out = np.prod(filter_shape[:2]) * num_filters
|
||||
# Initialize weights with random weights.
|
||||
w_bound = np.sqrt(6 / (fan_in + fan_out))
|
||||
|
||||
w = tf.get_variable("W", filter_shape, dtype,
|
||||
tf.random_uniform_initializer(-w_bound, w_bound),
|
||||
collections=collections)
|
||||
b = tf.get_variable("b", [1, 1, 1, num_filters],
|
||||
initializer=tf.constant_initializer(0.0),
|
||||
collections=collections)
|
||||
return tf.nn.conv2d(x, w, stride_shape, pad) + b
|
||||
|
||||
|
||||
def linear(x, size, name, initializer=None, bias_init=0):
|
||||
w = tf.get_variable(name + "/w", [x.get_shape()[1], size],
|
||||
initializer=initializer)
|
||||
b = tf.get_variable(name + "/b", [size],
|
||||
initializer=tf.constant_initializer(bias_init))
|
||||
return tf.matmul(x, w) + b
|
||||
|
||||
|
||||
class ConvolutionalNetwork(Model):
|
||||
"""Generic convolutional network."""
|
||||
|
||||
def _init(self, inputs, num_outputs, options):
|
||||
x = inputs
|
||||
with tf.name_scope("convnet"):
|
||||
for i in range(4):
|
||||
x = tf.nn.elu(conv2d(x, 32, "l{}".format(i+1), [3, 3], [2, 2]))
|
||||
r, c = x.shape[1].value, x.shape[2].value
|
||||
x = tf.reshape(x, [-1, r*c*32])
|
||||
fc1 = linear(x, 256, "fc1")
|
||||
fc2 = linear(x, num_outputs, "fc2", normc_initializer(0.01))
|
||||
return fc2, fc1
|
||||
@@ -0,0 +1,15 @@
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import tensorflow as tf
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def normc_initializer(std=1.0):
|
||||
def _initializer(shape, dtype=None, partition_info=None):
|
||||
out = np.random.randn(*shape).astype(np.float32)
|
||||
out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
|
||||
return tf.constant(out)
|
||||
return _initializer
|
||||
Reference in New Issue
Block a user