[rllib] Also refactor DQN to use shared RLlib models (#730)

* wip

* works with cartpole

* lint

* fix pg

* comment

* action dist rename

* preprocessor

* fix test

* typo

* fix the action[0] nonsense

* revert

* satisfy the lint

* wip

* works with cartpole

* lint

* fix pg

* comment

* action dist rename

* preprocessor

* fix test

* typo

* fix the action[0] nonsense

* revert

* satisfy the lint

* Minor indentation changes.

* fix merge

* add humanoid

* initial dqn refactor

* remove tfutil

* fix calls

* fix tf errors 1

* closer

* runs now

* lint

* tensorboard graph

* fix linting

* more 4 space

* fix

* fix linT

* more lint

* oops

* es parity

* remove example.py

* fix training bug

* add cartpole demo

* try fixing cartpole

* allow model options, configure cartpole

* debug

* simplify

* no dueling

* avoid out of file handles

* Test dqn in jenkins.

* Minor formatting.

* fix issue

* fix another

* Fix problem in which we log to a directory that hasn't been created.
This commit is contained in:
Eric Liang
2017-07-26 12:29:00 -07:00
committed by Philipp Moritz
parent 8ad9ced99b
commit b6a18cb39b
13 changed files with 399 additions and 1302 deletions
-284
View File
@@ -1,284 +0,0 @@
"""Deep Q learning graph
The functions in this file can are used to create the following functions:
======= act ========
Function to chose an action given an observation
Parameters
----------
observation: object
Observation that can be feed into the output of make_obs_ph
stochastic: bool
if set to False all the actions are always deterministic
(default False)
update_eps_ph: float
update epsilon a new value, if negative not update happens
(default: no update)
Returns
-------
Tensor of dtype tf.int64 and shape (BATCH_SIZE,) with an action to be
performed for every element of the batch.
======= train =======
Function that takes a transition (s,a,r,s') and optimizes Bellman
equation's error:
td_error = Q(s,a) - (r + gamma * max_a' Q(s', a'))
loss = huber_loss[td_error]
Parameters
----------
obs_t: object
a batch of observations
action: np.array
actions that were selected upon seeing obs_t.
dtype must be int32 and shape must be (batch_size,)
reward: np.array
immediate reward attained after executing those actions
dtype must be float32 and shape must be (batch_size,)
obs_tp1: object
observations that followed obs_t
done: np.array
1 if obs_t was the last observation in the episode and 0 otherwise
obs_tp1 gets ignored, but must be of the valid shape.
dtype must be float32 and shape must be (batch_size,)
weight: np.array
imporance weights for every element of the batch (gradient is
multiplied by the importance weight) dtype must be float32 and shape
must be (batch_size,)
Returns
-------
td_error: np.array
a list of differences between Q(s,a) and the target in Bellman's
equation. dtype is float32 and shape is (batch_size,)
======= update_target ========
copy the parameters from optimized Q function to the target Q function.
In Q learning we actually optimize the following error:
Q(s,a) - (r + gamma * max_a' Q'(s', a'))
Where Q' is lagging behind Q to stablize the learning. For example for
Atari
Q' is set to Q once every 10000 updates training steps.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from ray.rllib.dqn.common import tf_util as U
def build_act(make_obs_ph, q_func, num_actions, scope="deepq", reuse=None):
"""Creates the act function:
Parameters
----------
make_obs_ph: str -> tf.placeholder or TfInput
a function that take a name and creates a placeholder of input with
that name
q_func: (tf.Variable, int, str, bool) -> tf.Variable
the model that takes the following inputs:
observation_in: object
the output of observation placeholder
num_actions: int
number of actions
scope: str
reuse: bool
should be passed to outer variable scope
and returns a tensor of shape (batch_size, num_actions) with values of
every action.
num_actions: int
number of actions.
scope: str or VariableScope
optional scope for variable_scope.
reuse: bool or None
whether or not the variables should be reused. To be able to reuse the
scope must be given.
Returns
-------
act: (tf.Variable, bool, float) -> tf.Variable
function to select and action given observation.
` See the top of the file for details.
"""
with tf.variable_scope(scope, reuse=reuse):
observations_ph = U.ensure_tf_input(make_obs_ph("observation"))
stochastic_ph = tf.placeholder(tf.bool, (), name="stochastic")
update_eps_ph = tf.placeholder(tf.float32, (), name="update_eps")
eps = tf.get_variable(
"eps", (), initializer=tf.constant_initializer(0))
q_values = q_func(observations_ph.get(), num_actions, scope="q_func")
deterministic_actions = tf.argmax(q_values, axis=1)
batch_size = tf.shape(observations_ph.get())[0]
random_actions = tf.random_uniform(
tf.stack([batch_size]), minval=0, maxval=num_actions,
dtype=tf.int64)
chose_random = tf.random_uniform(
tf.stack([batch_size]), minval=0, maxval=1, dtype=tf.float32) < eps
stochastic_actions = tf.where(
chose_random, random_actions, deterministic_actions)
output_actions = tf.cond(
stochastic_ph, lambda: stochastic_actions,
lambda: deterministic_actions)
update_eps_expr = eps.assign(
tf.cond(update_eps_ph >= 0, lambda: update_eps_ph, lambda: eps))
act = U.function(
inputs=[observations_ph, stochastic_ph, update_eps_ph],
outputs=output_actions,
givens={update_eps_ph: -1.0, stochastic_ph: True},
updates=[update_eps_expr])
return act
def build_train(
make_obs_ph, q_func, num_actions, optimizer, grad_norm_clipping=None,
gamma=1.0, double_q=True, scope="deepq", reuse=None):
"""Creates the train function:
Parameters
----------
make_obs_ph: str -> tf.placeholder or TfInput
a function that takes a name and creates a placeholder of input with
that name
q_func: (tf.Variable, int, str, bool) -> tf.Variable
the model that takes the following inputs:
observation_in: object
the output of observation placeholder
num_actions: int
number of actions
scope: str
reuse: bool
should be passed to outer variable scope
and returns a tensor of shape (batch_size, num_actions) with values of
every action.
num_actions: int
number of actions
reuse: bool
whether or not to reuse the graph variables
optimizer: tf.train.Optimizer
optimizer to use for the Q-learning objective.
grad_norm_clipping: float or None
clip gradient norms to this value. If None no clipping is performed.
gamma: float
discount rate.
double_q: bool
if true will use Double Q Learning (https://arxiv.org/abs/1509.06461).
In general it is a good idea to keep it enabled.
scope: str or VariableScope
optional scope for variable_scope.
reuse: bool or None
whether or not the variables should be reused. To be able to reuse the
scope must be given.
Returns
-------
act: (tf.Variable, bool, float) -> tf.Variable
function to select and action given observation.
` See the top of the file for details.
train: (object, np.array, np.array, object, np.array, np.array) -> np.array
optimize the error in Bellman's equation.
` See the top of the file for details.
update_target: () -> ()
copy the parameters from optimized Q function to the target Q function.
` See the top of the file for details.
debug: {str: function}
a bunch of functions to print debug data like q_values.
"""
act_f = build_act(make_obs_ph, q_func, num_actions, scope=scope,
reuse=reuse)
with tf.variable_scope(scope, reuse=reuse):
# set up placeholders
obs_t_input = U.ensure_tf_input(make_obs_ph("obs_t"))
act_t_ph = tf.placeholder(tf.int32, [None], name="action")
rew_t_ph = tf.placeholder(tf.float32, [None], name="reward")
obs_tp1_input = U.ensure_tf_input(make_obs_ph("obs_tp1"))
done_mask_ph = tf.placeholder(tf.float32, [None], name="done")
importance_weights_ph = tf.placeholder(tf.float32, [None],
name="weight")
# q network evaluation
q_t = q_func(
obs_t_input.get(), num_actions, scope="q_func",
reuse=True) # reuse parameters from act
q_func_vars = U.scope_vars(U.absolute_scope_name("q_func"))
# target q network evalution
q_tp1 = q_func(obs_tp1_input.get(), num_actions, scope="target_q_func")
target_q_func_vars = U.scope_vars(
U.absolute_scope_name("target_q_func"))
# q scores for actions which we know were selected in the given state.
q_t_selected = tf.reduce_sum(q_t * tf.one_hot(act_t_ph, num_actions),
1)
# compute estimate of best possible value starting from state at t + 1
if double_q:
q_tp1_using_online_net = q_func(
obs_tp1_input.get(), num_actions, scope="q_func", reuse=True)
q_tp1_best_using_online_net = tf.arg_max(q_tp1_using_online_net, 1)
q_tp1_best = tf.reduce_sum(
q_tp1 * tf.one_hot(q_tp1_best_using_online_net, num_actions),
1)
else:
q_tp1_best = tf.reduce_max(q_tp1, 1)
q_tp1_best_masked = (1.0 - done_mask_ph) * q_tp1_best
# compute RHS of bellman equation
q_t_selected_target = rew_t_ph + gamma * q_tp1_best_masked
# compute the error (potentially clipped)
td_error = q_t_selected - tf.stop_gradient(q_t_selected_target)
errors = U.huber_loss(td_error)
weighted_error = tf.reduce_mean(importance_weights_ph * errors)
# compute optimization op (potentially with gradient clipping)
if grad_norm_clipping is not None:
optimize_expr = U.minimize_and_clip(
optimizer, weighted_error, var_list=q_func_vars,
clip_val=grad_norm_clipping)
else:
optimize_expr = optimizer.minimize(weighted_error,
var_list=q_func_vars)
# update_target_fn will be called periodically to copy Q network to
# target Q network
update_target_expr = []
for var, var_target in zip(
sorted(q_func_vars, key=lambda v: v.name),
sorted(target_q_func_vars, key=lambda v: v.name)):
update_target_expr.append(var_target.assign(var))
update_target_expr = tf.group(*update_target_expr)
# Create callable functions
train = U.function(
inputs=[
obs_t_input,
act_t_ph,
rew_t_ph,
obs_tp1_input,
done_mask_ph,
importance_weights_ph
],
outputs=td_error,
updates=[optimize_expr])
update_target = U.function([], [], updates=[update_target_expr])
q_values = U.function([obs_t_input], q_t)
return act_f, train, update_target, {'q_values': q_values}
@@ -126,13 +126,14 @@ class MaxAndSkipEnv(gym.Wrapper):
return obs
class ProcessFrame84(gym.ObservationWrapper):
# TODO(ekl): switch this to use a RLlib common preprocessor
class ProcessFrame80(gym.ObservationWrapper):
def __init__(self, env=None):
super(ProcessFrame84, self).__init__(env)
self.observation_space = spaces.Box(low=0, high=255, shape=(84, 84, 1))
super(ProcessFrame80, self).__init__(env)
self.observation_space = spaces.Box(low=0, high=255, shape=(80, 80, 1))
def _observation(self, obs):
return ProcessFrame84.process(obs)
return ProcessFrame80.process(obs)
@staticmethod
def process(frame):
@@ -144,10 +145,10 @@ class ProcessFrame84(gym.ObservationWrapper):
assert False, "Unknown resolution."
img = (img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 +
img[:, :, 2] * 0.114)
resized_screen = cv2.resize(img, (84, 110),
interpolation=cv2.INTER_AREA)
x_t = resized_screen[18:102, :]
x_t = np.reshape(x_t, [84, 84, 1])
resized_screen = cv2.resize(
img, (80, 110), interpolation=cv2.INTER_AREA)
x_t = resized_screen[20:100, :]
x_t = np.reshape(x_t, [80, 80, 1])
return x_t.astype(np.uint8)
@@ -225,7 +226,7 @@ def wrap_dqn(env):
env = MaxAndSkipEnv(env, skip=4)
if 'FIRE' in env.unwrapped.get_action_meanings():
env = FireResetEnv(env)
env = ProcessFrame84(env)
env = ProcessFrame80(env)
env = FrameStack(env, 4)
env = ClippedRewardsWrapper(env)
return env
@@ -234,7 +235,7 @@ def wrap_dqn(env):
class A2cProcessFrame(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.observation_space = spaces.Box(low=0, high=255, shape=(84, 84, 1))
self.observation_space = spaces.Box(low=0, high=255, shape=(80, 80, 1))
def _step(self, action):
ob, reward, done, info = self.env.step(action)
@@ -246,5 +247,5 @@ class A2cProcessFrame(gym.Wrapper):
@staticmethod
def process(frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (84, 84), interpolation=cv2.INTER_AREA)
return frame.reshape(84, 84, 1)
frame = cv2.resize(frame, (80, 80), interpolation=cv2.INTER_AREA)
return frame.reshape(80, 80, 1)
-793
View File
@@ -1,793 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf # pylint: ignore-module
import builtins
import functools
import copy
import os
import collections
# ================================================================
# Make consistent with numpy
# ================================================================
clip = tf.clip_by_value
def sum(x, axis=None, keepdims=False):
axis = None if axis is None else [axis]
return tf.reduce_sum(x, axis=axis, keep_dims=keepdims)
def mean(x, axis=None, keepdims=False):
axis = None if axis is None else [axis]
return tf.reduce_mean(x, axis=axis, keep_dims=keepdims)
def var(x, axis=None, keepdims=False):
meanx = mean(x, axis=axis, keepdims=keepdims)
return mean(tf.square(x - meanx), axis=axis, keepdims=keepdims)
def std(x, axis=None, keepdims=False):
return tf.sqrt(var(x, axis=axis, keepdims=keepdims))
def max(x, axis=None, keepdims=False):
axis = None if axis is None else [axis]
return tf.reduce_max(x, axis=axis, keep_dims=keepdims)
def min(x, axis=None, keepdims=False):
axis = None if axis is None else [axis]
return tf.reduce_min(x, axis=axis, keep_dims=keepdims)
def concatenate(arrs, axis=0):
return tf.concat(axis=axis, values=arrs)
def argmax(x, axis=None):
return tf.argmax(x, axis=axis)
def switch(condition, then_expression, else_expression):
"""Switches between two operations depending on a scalar value (int or
bool). Note that both `then_expression` and `else_expression`
should be symbolic tensors of the *same shape*.
# Arguments
condition: scalar tensor.
then_expression: TensorFlow operation.
else_expression: TensorFlow operation.
"""
x_shape = copy.copy(then_expression.get_shape())
x = tf.cond(tf.cast(condition, 'bool'),
lambda: then_expression, lambda: else_expression)
x.set_shape(x_shape)
return x
# ================================================================
# Extras
# ================================================================
def l2loss(params):
if len(params) == 0:
return tf.constant(0.0)
else:
return tf.add_n([sum(tf.square(p)) for p in params])
def lrelu(x, leak=0.2):
f1 = 0.5 * (1 + leak)
f2 = 0.5 * (1 - leak)
return f1 * x + f2 * abs(x)
def categorical_sample_logits(X):
# https://github.com/tensorflow/tensorflow/issues/456
U = tf.random_uniform(tf.shape(X))
return argmax(X - tf.log(-tf.log(U)), axis=1)
# ================================================================
# Inputs
# ================================================================
def is_placeholder(x):
return type(x) is tf.Tensor and len(x.op.inputs) == 0
class TfInput(object):
def __init__(self, name="(unnamed)"):
"""Generalized Tensorflow placeholder. The main differences are:
- possibly uses multiple placeholders internally and returns multiple
values
- can apply light postprocessing to the value feed to placeholder.
"""
self.name = name
def get(self):
"""Return the tf variable(s) representing the possibly postprocessed
value of placeholder(s).
"""
raise NotImplemented()
def make_feed_dict(data):
"""Given data input it to the placeholder(s)."""
raise NotImplemented()
class PlacholderTfInput(TfInput):
def __init__(self, placeholder):
"""Wrapper for regular tensorflow placeholder."""
super().__init__(placeholder.name)
self._placeholder = placeholder
def get(self):
return self._placeholder
def make_feed_dict(self, data):
return {self._placeholder: data}
class BatchInput(PlacholderTfInput):
def __init__(self, shape, dtype=tf.float32, name=None):
"""Creates a placeholder for a batch of tensors of a given shape and
dtype
Parameters
----------
shape: [int]
shape of a single elemenet of the batch
dtype: tf.dtype
number representation used for tensor contents
name: str
name of the underlying placeholder
"""
super().__init__(tf.placeholder(dtype, [None] + list(shape),
name=name))
class Uint8Input(PlacholderTfInput):
def __init__(self, shape, name=None):
"""Takes input in uint8 format which is cast to float32 and divided by
255 before passing it to the model.
On GPU this ensures lower data transfer times.
Parameters
----------
shape: [int]
shape of the tensor.
name: str
name of the underlying placeholder
"""
super().__init__(tf.placeholder(tf.uint8, [None] + list(shape),
name=name))
self._shape = shape
self._output = tf.cast(super().get(), tf.float32) / 255.0
def get(self):
return self._output
def ensure_tf_input(thing):
"""Takes either tf.placeholder of TfInput and outputs equivalent TfInput"""
if isinstance(thing, TfInput):
return thing
elif is_placeholder(thing):
return PlacholderTfInput(thing)
else:
raise ValueError("Must be a placeholder or TfInput")
# ================================================================
# Mathematical utils
# ================================================================
def huber_loss(x, delta=1.0):
"""Reference: https://en.wikipedia.org/wiki/Huber_loss"""
return tf.where(
tf.abs(x) < delta,
tf.square(x) * 0.5,
delta * (tf.abs(x) - 0.5 * delta))
# ================================================================
# Optimizer utils
# ================================================================
def minimize_and_clip(optimizer, objective, var_list, clip_val=10):
"""Minimized `objective` using `optimizer` w.r.t. variables in
`var_list` while ensure the norm of the gradients for each
variable is clipped to `clip_val`
"""
gradients = optimizer.compute_gradients(objective, var_list=var_list)
for i, (grad, var) in enumerate(gradients):
if grad is not None:
gradients[i] = (tf.clip_by_norm(grad, clip_val), var)
return optimizer.apply_gradients(gradients)
# ================================================================
# Global session
# ================================================================
def get_session():
"""Returns recently made Tensorflow session"""
return tf.get_default_session()
def make_session(num_cpu):
"""Returns a session that will use <num_cpu> CPU's only"""
tf_config = tf.ConfigProto(
inter_op_parallelism_threads=num_cpu,
intra_op_parallelism_threads=num_cpu)
return tf.Session(config=tf_config)
def single_threaded_session():
"""Returns a session which will only use a single CPU"""
return make_session(1)
ALREADY_INITIALIZED = set()
def initialize():
"""Initialize all the uninitialized variables in the global scope."""
new_variables = set(tf.global_variables()) - ALREADY_INITIALIZED
get_session().run(tf.variables_initializer(new_variables))
ALREADY_INITIALIZED.update(new_variables)
def eval(expr, feed_dict=None):
if feed_dict is None:
feed_dict = {}
return get_session().run(expr, feed_dict=feed_dict)
VALUE_SETTERS = collections.OrderedDict()
def set_value(v, val):
global VALUE_SETTERS
if v in VALUE_SETTERS:
set_op, set_endpoint = VALUE_SETTERS[v]
else:
set_endpoint = tf.placeholder(v.dtype)
set_op = v.assign(set_endpoint)
VALUE_SETTERS[v] = (set_op, set_endpoint)
get_session().run(set_op, feed_dict={set_endpoint: val})
# ================================================================
# Saving variables
# ================================================================
def load_state(fname):
saver = tf.train.Saver()
saver.restore(get_session(), fname)
def save_state(fname):
os.makedirs(os.path.dirname(fname), exist_ok=True)
saver = tf.train.Saver()
saver.save(get_session(), fname)
# ================================================================
# Model components
# ================================================================
def normc_initializer(std=1.0):
# pylint: disable=W0613
def _initializer(shape, dtype=None, partition_info=None):
out = np.random.randn(*shape).astype(np.float32)
out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
return tf.constant(out)
return _initializer
def conv2d(
x, num_filters, name, filter_size=(3, 3), stride=(1, 1), pad="SAME",
dtype=tf.float32, collections=None, summary_tag=None):
with tf.variable_scope(name):
stride_shape = [1, stride[0], stride[1], 1]
filter_shape = [
filter_size[0], filter_size[1], int(x.get_shape()[3]), num_filters]
# there are "num input feature maps * filter height * filter width"
# inputs to each hidden unit
fan_in = intprod(filter_shape[:3])
# each unit in the lower layer receives a gradient from:
# "num output feature maps * filter height * filter width" /
# pooling size
fan_out = intprod(filter_shape[:2]) * num_filters
# initialize weights with random weights
w_bound = np.sqrt(6. / (fan_in + fan_out))
w = tf.get_variable(
"W", filter_shape, dtype,
tf.random_uniform_initializer(-w_bound, w_bound),
collections=collections)
b = tf.get_variable(
"b", [1, 1, 1, num_filters], initializer=tf.zeros_initializer(),
collections=collections)
if summary_tag is not None:
tf.summary.image(
summary_tag,
tf.transpose(tf.reshape(w, [filter_size[0],
filter_size[1], -1, 1]),
[2, 0, 1, 3]),
max_images=10)
return tf.nn.conv2d(x, w, stride_shape, pad) + b
def dense(x, size, name, weight_init=None, bias=True):
w = tf.get_variable(name + "/w", [x.get_shape()[1], size],
initializer=weight_init)
ret = tf.matmul(x, w)
if bias:
b = tf.get_variable(
name + "/b", [size], initializer=tf.zeros_initializer())
return ret + b
else:
return ret
def wndense(x, size, name, init_scale=1.0):
v = tf.get_variable(
name + "/V", [int(x.get_shape()[1]), size],
initializer=tf.random_normal_initializer(0, 0.05))
g = tf.get_variable(
name + "/g", [size], initializer=tf.constant_initializer(init_scale))
b = tf.get_variable(
name + "/b", [size], initializer=tf.constant_initializer(0.0))
# use weight normalization (Salimans & Kingma, 2016)
x = tf.matmul(x, v)
scaler = g / tf.sqrt(sum(tf.square(v), axis=0, keepdims=True))
return tf.reshape(scaler, [1, size]) * x + tf.reshape(b, [1, size])
def densenobias(x, size, name, weight_init=None):
return dense(x, size, name, weight_init=weight_init, bias=False)
def dropout(x, pkeep, phase=None, mask=None):
mask = tf.floor(
pkeep + tf.random_uniform(tf.shape(x))) if mask is None else mask
if phase is None:
return mask * x
else:
return switch(phase, mask * x, pkeep * x)
# ================================================================
# Theano-like Function
# ================================================================
def function(inputs, outputs, updates=None, givens=None):
"""Just like Theano function. Take a bunch of tensorflow placeholders and
expressions computed based on those placeholders and produces f(inputs) ->
outputs. Function f takes values to be fed to the input's placeholders and
produces the values of the expressions in outputs.
Input values can be passed in the same order as inputs or can be provided
as kwargs based on placeholder name (passed to constructor or accessible
via placeholder.op.name).
Example:
x = tf.placeholder(tf.int32, (), name="x")
y = tf.placeholder(tf.int32, (), name="y")
z = 3 * x + 2 * y
lin = function([x, y], z, givens={y: 0})
with single_threaded_session():
initialize()
assert lin(2) == 6
assert lin(x=3) == 9
assert lin(2, 2) == 10
assert lin(x=2, y=3) == 12
Parameters
----------
inputs: [tf.placeholder or TfInput]
list of input arguments
outputs: [tf.Variable] or tf.Variable
list of outputs or a single output to be returned from function. Returned
value will also have the same shape.
"""
if isinstance(outputs, list):
return _Function(inputs, outputs, updates, givens=givens)
elif isinstance(outputs, (dict, collections.OrderedDict)):
f = _Function(inputs, outputs.values(), updates, givens=givens)
return lambda *args, **kwargs: type(outputs)(
zip(outputs.keys(), f(*args, **kwargs)))
else:
f = _Function(inputs, [outputs], updates, givens=givens)
return lambda *args, **kwargs: f(*args, **kwargs)[0]
class _Function(object):
def __init__(self, inputs, outputs, updates, givens, check_nan=False):
for inpt in inputs:
if not issubclass(type(inpt), TfInput):
assert len(inpt.op.inputs) == 0, (
"inputs should all be placeholders of "
"ray.rllib.dqn.common.TfInput")
self.inputs = inputs
updates = updates or []
self.update_group = tf.group(*updates)
self.outputs_update = list(outputs) + [self.update_group]
self.givens = {} if givens is None else givens
self.check_nan = check_nan
def _feed_input(self, feed_dict, inpt, value):
if issubclass(type(inpt), TfInput):
feed_dict.update(inpt.make_feed_dict(value))
elif is_placeholder(inpt):
feed_dict[inpt] = value
def __call__(self, *args, **kwargs):
assert len(args) <= len(self.inputs), "Too many arguments provided"
feed_dict = {}
# Update the args
for inpt, value in zip(self.inputs, args):
self._feed_input(feed_dict, inpt, value)
# Update the kwargs
kwargs_passed_inpt_names = set()
for inpt in self.inputs[len(args):]:
inpt_name = inpt.name.split(':')[0]
inpt_name = inpt_name.split('/')[-1]
assert inpt_name not in kwargs_passed_inpt_names, (
"this function has two arguments with "
"the same name \"{}\", " +
"so kwargs cannot be used.".format(inpt_name))
if inpt_name in kwargs:
kwargs_passed_inpt_names.add(inpt_name)
self._feed_input(feed_dict, inpt, kwargs.pop(inpt_name))
else:
assert inpt in self.givens, "Missing argument " + inpt_name
assert len(kwargs) == 0, \
"Function got extra arguments " + str(list(kwargs.keys()))
# Update feed dict with givens.
for inpt in self.givens:
feed_dict[inpt] = feed_dict.get(inpt, self.givens[inpt])
results = get_session().run(self.outputs_update,
feed_dict=feed_dict)[:-1]
if self.check_nan:
if any(np.isnan(r).any() for r in results):
raise RuntimeError("Nan detected")
return results
def mem_friendly_function(nondata_inputs, data_inputs, outputs, batch_size):
if isinstance(outputs, list):
return _MemFriendlyFunction(
nondata_inputs, data_inputs, outputs, batch_size)
else:
f = _MemFriendlyFunction(
nondata_inputs, data_inputs, [outputs], batch_size)
return lambda *inputs: f(*inputs)[0]
class _MemFriendlyFunction(object):
def __init__(self, nondata_inputs, data_inputs, outputs, batch_size):
self.nondata_inputs = nondata_inputs
self.data_inputs = data_inputs
self.outputs = list(outputs)
self.batch_size = batch_size
def __call__(self, *inputvals):
assert len(inputvals) == (len(self.nondata_inputs) +
len(self.data_inputs))
nondata_vals = inputvals[0:len(self.nondata_inputs)]
data_vals = inputvals[len(self.nondata_inputs):]
feed_dict = dict(zip(self.nondata_inputs, nondata_vals))
n = data_vals[0].shape[0]
for v in data_vals[1:]:
assert v.shape[0] == n
for i_start in range(0, n, self.batch_size):
slice_vals = [
v[i_start:builtins.min(i_start + self.batch_size, n)]
for v in data_vals]
for (var, val) in zip(self.data_inputs, slice_vals):
feed_dict[var] = val
results = tf.get_default_session().run(self.outputs,
feed_dict=feed_dict)
if i_start == 0:
sum_results = results
else:
for i in range(len(results)):
sum_results[i] = sum_results[i] + results[i]
for i in range(len(results)):
sum_results[i] = sum_results[i] / n
return sum_results
# ================================================================
# Modules
# ================================================================
class Module(object):
def __init__(self, name):
self.name = name
self.first_time = True
self.scope = None
self.cache = {}
def __call__(self, *args):
if args in self.cache:
print("(%s) retrieving value from cache" % (self.name,))
return self.cache[args]
with tf.variable_scope(self.name, reuse=not self.first_time):
scope = tf.get_variable_scope().name
if self.first_time:
self.scope = scope
print("(%s) running function for the first time" %
(self.name,))
else:
assert self.scope == scope, \
"Tried calling function with a different scope"
print("(%s) running function on new inputs" % (self.name,))
self.first_time = False
out = self._call(*args)
self.cache[args] = out
return out
def _call(self, *args):
raise NotImplementedError
@property
def trainable_variables(self):
assert self.scope is not None, \
"need to call module once before getting variables"
return tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, self.scope)
@property
def variables(self):
assert self.scope is not None, \
"need to call module once before getting variables"
return tf.get_collection(tf.GraphKeys.VARIABLES, self.scope)
def module(name):
@functools.wraps
def wrapper(f):
class WrapperModule(Module):
def _call(self, *args):
return f(*args)
return WrapperModule(name)
return wrapper
# ================================================================
# Graph traversal
# ================================================================
VARIABLES = {}
def get_parents(node):
return node.op.inputs
def topsorted(outputs):
"""
Topological sort via non-recursive depth-first search
"""
assert isinstance(outputs, (list, tuple))
marks = {}
out = []
stack = [] # pylint: disable=W0621
# i: node
# jidx = number of children visited so far from that node
# marks: state of each node, which is one of
# 0: haven't visited
# 1: have visited, but not done visiting children
# 2: done visiting children
for x in outputs:
stack.append((x, 0))
while stack:
(i, jidx) = stack.pop()
if jidx == 0:
m = marks.get(i, 0)
if m == 0:
marks[i] = 1
elif m == 1:
raise ValueError("not a dag")
else:
continue
ps = get_parents(i)
if jidx == len(ps):
marks[i] = 2
out.append(i)
else:
stack.append((i, jidx + 1))
j = ps[jidx]
stack.append((j, 0))
return out
# ================================================================
# Flat vectors
# ================================================================
def var_shape(x):
out = x.get_shape().as_list()
assert all(isinstance(a, int) for a in out), \
"shape function assumes that shape is fully known"
return out
def numel(x):
return intprod(var_shape(x))
def intprod(x):
return int(np.prod(x))
def flatgrad(loss, var_list):
grads = tf.gradients(loss, var_list)
return tf.concat(axis=0, values=[
tf.reshape(grad if grad is not None else tf.zeros_like(v), [numel(v)])
for (v, grad) in zip(var_list, grads)
])
class SetFromFlat(object):
def __init__(self, var_list, dtype=tf.float32):
assigns = []
shapes = list(map(var_shape, var_list))
total_size = np.sum([intprod(shape) for shape in shapes])
self.theta = theta = tf.placeholder(dtype, [total_size])
start = 0
assigns = []
for (shape, v) in zip(shapes, var_list):
size = intprod(shape)
assigns.append(
tf.assign(v, tf.reshape(theta[start:start + size], shape)))
start += size
self.op = tf.group(*assigns)
def __call__(self, theta):
get_session().run(self.op, feed_dict={self.theta: theta})
class GetFlat(object):
def __init__(self, var_list):
self.op = tf.concat(
axis=0, values=[tf.reshape(v, [numel(v)]) for v in var_list])
def __call__(self):
return get_session().run(self.op)
# ================================================================
# Misc
# ================================================================
def fancy_slice_2d(X, inds0, inds1):
"""
like numpy X[inds0, inds1]
XXX this implementation is bad
"""
inds0 = tf.cast(inds0, tf.int64)
inds1 = tf.cast(inds1, tf.int64)
shape = tf.cast(tf.shape(X), tf.int64)
ncols = shape[1]
Xflat = tf.reshape(X, [-1])
return tf.gather(Xflat, inds0 * ncols + inds1)
# ================================================================
# Scopes
# ================================================================
def scope_vars(scope, trainable_only=False):
"""
Get variables inside a scope
The scope can be specified as a string
Parameters
----------
scope: str or VariableScope
scope in which the variables reside.
trainable_only: bool
whether or not to return only the variables that were marked as
trainable.
Returns
-------
vars: [tf.Variable]
list of variables in `scope`.
"""
return tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES
if trainable_only else tf.GraphKeys.VARIABLES,
scope=scope if isinstance(scope, str) else scope.name)
def scope_name():
"""Returns the name of current scope as a string, e.g. deepq/q_func"""
return tf.get_variable_scope().name
def absolute_scope_name(relative_scope_name):
"""Appends parent scope name to `relative_scope_name`"""
return scope_name() + "/" + relative_scope_name
def lengths_to_mask(lengths_b, max_length):
"""
Turns a vector of lengths into a boolean mask
Args:
lengths_b: an integer vector of lengths
max_length: maximum length to fill the mask
Returns:
a boolean array of shape (batch_size, max_length)
row[i] consists of True repeated lengths_b[i] times, followed by False
"""
lengths_b = tf.convert_to_tensor(lengths_b)
assert lengths_b.get_shape().ndims == 1
mask_bt = tf.expand_dims(
tf.range(max_length), 0) < tf.expand_dims(lengths_b, 1)
return mask_bt
def in_session(f):
@functools.wraps(f)
def newfunc(*args, **kwargs):
with tf.Session():
f(*args, **kwargs)
return newfunc
_PLACEHOLDER_CACHE = {} # name -> (placeholder, dtype, shape)
def get_placeholder(name, dtype, shape):
if name in _PLACEHOLDER_CACHE:
out, dtype1, shape1 = _PLACEHOLDER_CACHE[name]
assert dtype1 == dtype and shape1 == shape
return out
else:
out = tf.placeholder(dtype=dtype, shape=shape, name=name)
_PLACEHOLDER_CACHE[name] = (out, dtype, shape)
return out
def get_placeholder_cached(name):
return _PLACEHOLDER_CACHE[name][0]
def flattenallbut0(x):
return tf.reshape(x, [-1, intprod(x.get_shape().as_list()[1:])])
def reset():
global _PLACEHOLDER_CACHE
global VARIABLES
_PLACEHOLDER_CACHE = {}
VARIABLES = {}
tf.reset_default_graph()
+92 -80
View File
@@ -9,63 +9,75 @@ import numpy as np
import tensorflow as tf
from ray.rllib.common import Algorithm, TrainingResult
from ray.rllib.dqn.build_graph import build_train
from ray.rllib.dqn import logger, models
from ray.rllib.dqn.common.atari_wrappers_deprecated \
import wrap_dqn, ScaledFloatFrame
from ray.rllib.dqn.common import tf_util as U
from ray.rllib.dqn.common.schedules import LinearSchedule
from ray.rllib.dqn.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer
"""The default configuration dict for the DQN algorithm.
lr: float
learning rate for adam optimizer
schedule_max_timesteps: int
max num timesteps for annealing schedules
timesteps_per_iteration: int
number of env steps to optimize for before returning
buffer_size: int
size of the replay buffer
exploration_fraction: float
fraction of entire training period over which the exploration rate is
annealed
exploration_final_eps: float
final value of random action probability
train_freq: int
update the model every `train_freq` steps.
batch_size: int
size of a batched sampled from replay buffer for training
print_freq: int
how often to print out training progress
set to None to disable printing
checkpoint_freq: int
how often to save the model. This is so that the best version is restored
at the end of the training. If you do not wish to restore the best version
at the end of the training set this variable to None.
learning_starts: int
how many steps of the model to collect transitions for before learning
starts
gamma: float
discount factor
target_network_update_freq: int
update the target network every `target_network_update_freq` steps.
prioritized_replay: True
if True prioritized replay buffer will be used.
prioritized_replay_alpha: float
alpha parameter for prioritized replay buffer
prioritized_replay_beta0: float
initial value of beta for prioritized replay buffer
prioritized_replay_beta_iters: int
number of iterations over which beta will be annealed from initial value
to 1.0. If set to None equals to schedule_max_timesteps
prioritized_replay_eps: float
epsilon to add to the TD errors when updating priorities.
num_cpu: int
number of cpus to use for training
dueling: bool
whether to use dueling dqn
double_q: bool
whether to use double dqn
hiddens: array<int>
hidden layer sizes of the state and action value networks
model_config: dict
config options to pass to the model constructor
lr: float
learning rate for adam optimizer
schedule_max_timesteps: int
max num timesteps for annealing schedules
timesteps_per_iteration: int
number of env steps to optimize for before returning
buffer_size: int
size of the replay buffer
exploration_fraction: float
fraction of entire training period over which the exploration rate is
annealed
exploration_final_eps: float
final value of random action probability
train_freq: int
update the model every `train_freq` steps.
batch_size: int
size of a batched sampled from replay buffer for training
print_freq: int
how often to print out training progress
set to None to disable printing
checkpoint_freq: int
how often to save the model. This is so that the best version is
restored at the end of the training. If you do not wish to restore
the best version at the end of the training set this variable to None.
learning_starts: int
how many steps of the model to collect transitions for before learning
starts
gamma: float
discount factor
grad_norm_clipping: int or None
if not None, clip gradients during optimization at this value
target_network_update_freq: int
update the target network every `target_network_update_freq` steps.
prioritized_replay: True
if True prioritized replay buffer will be used.
prioritized_replay_alpha: float
alpha parameter for prioritized replay buffer
prioritized_replay_beta0: float
initial value of beta for prioritized replay buffer
prioritized_replay_beta_iters: int
number of iterations over which beta will be annealed from initial
value to 1.0. If set to None equals to schedule_max_timesteps
prioritized_replay_eps: float
epsilon to add to the TD errors when updating priorities.
num_cpu: int
number of cpus to use for training
"""
DEFAULT_CONFIG = dict(
dueling=True,
double_q=True,
hiddens=[256],
model_config={},
lr=5e-4,
schedule_max_timesteps=100000,
timesteps_per_iteration=1000,
@@ -78,6 +90,7 @@ DEFAULT_CONFIG = dict(
checkpoint_freq=10000,
learning_starts=1000,
gamma=1.0,
grad_norm_clipping=10,
target_network_update_freq=500,
prioritized_replay=False,
prioritized_replay_alpha=0.6,
@@ -92,24 +105,18 @@ class DQN(Algorithm):
config.update({"alg": "DQN"})
Algorithm.__init__(self, env_name, config, upload_dir=upload_dir)
env = gym.make(env_name)
env = ScaledFloatFrame(wrap_dqn(env))
# TODO(ekl): replace this with RLlib preprocessors
if "NoFrameskip" in env_name:
env = ScaledFloatFrame(wrap_dqn(env))
self.env = env
model = models.cnn_to_mlp(
convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)],
hiddens=[256], dueling=True)
sess = U.make_session(num_cpu=config["num_cpu"])
sess.__enter__()
def make_obs_ph(name):
return U.BatchInput(env.observation_space.shape, name=name)
num_cpu = config["num_cpu"]
tf_config = tf.ConfigProto(
inter_op_parallelism_threads=num_cpu,
intra_op_parallelism_threads=num_cpu)
self.sess = tf.Session(config=tf_config)
self.dqn_graph = models.DQNGraph(env, config)
self.act, self.optimize, self.update_target, self.debug = build_train(
make_obs_ph=make_obs_ph,
q_func=model,
num_actions=env.action_space.n,
optimizer=tf.train.AdamOptimizer(learning_rate=config["lr"]),
gamma=config["gamma"],
grad_norm_clipping=10)
# Create the replay buffer
if config["prioritized_replay"]:
self.replay_buffer = PrioritizedReplayBuffer(
@@ -136,8 +143,8 @@ class DQN(Algorithm):
final_p=config["exploration_final_eps"])
# Initialize the parameters and copy them to the target network.
U.initialize()
self.update_target()
self.sess.run(tf.global_variables_initializer())
self.dqn_graph.update_target(self.sess)
self.episode_rewards = [0.0]
self.episode_lengths = [0.0]
@@ -145,18 +152,19 @@ class DQN(Algorithm):
self.obs = self.env.reset()
self.num_timesteps = 0
self.num_iterations = 0
self.file_writer = tf.summary.FileWriter(self.logdir, self.sess.graph)
def train(self):
config = self.config
sample_time, learn_time = 0, 0
for t in range(config["timesteps_per_iteration"]):
for _ in range(config["timesteps_per_iteration"]):
self.num_timesteps += 1
dt = time.time()
# Take action and update exploration to the newest value
action = self.act(
np.array(self.obs)[None],
update_eps=self.exploration.value(t))[0]
action = self.dqn_graph.act(
self.sess, np.array(self.obs)[None],
self.exploration.value(self.num_timesteps))[0]
new_obs, rew, done, _ = self.env.step(action)
# Store transition in the replay buffer.
self.replay_buffer.add(self.obs, action, rew, new_obs, float(done))
@@ -177,28 +185,29 @@ class DQN(Algorithm):
# from replay buffer.
if config["prioritized_replay"]:
experience = self.replay_buffer.sample(
config["batch_size"], beta=self.beta_schedule.value(t))
config["batch_size"],
beta=self.beta_schedule.value(self.num_timesteps))
(obses_t, actions, rewards, obses_tp1,
dones, _, batch_idxes) = experience
else:
obses_t, actions, rewards, obses_tp1, dones = \
self.replay_buffer.sample(config["batch_size"])
obses_t, actions, rewards, obses_tp1, dones = (
self.replay_buffer.sample(config["batch_size"]))
batch_idxes = None
td_errors = self.optimize(
obses_t, actions, rewards, obses_tp1, dones,
td_errors = self.dqn_graph.train(
self.sess, obses_t, actions, rewards, obses_tp1, dones,
np.ones_like(rewards))
if config["prioritized_replay"]:
new_priorities = (np.abs(td_errors) +
config["prioritized_replay_eps"])
self.replay_buffer.update_priorities(batch_idxes,
new_priorities)
new_priorities = np.abs(td_errors) + (
config["prioritized_replay_eps"])
self.replay_buffer.update_priorities(
batch_idxes, new_priorities)
learn_time += (time.time() - dt)
if (self.num_timesteps > config["learning_starts"] and
if self.num_timesteps > config["learning_starts"] and (
self.num_timesteps %
config["target_network_update_freq"] == 0):
# Update target network periodically.
self.update_target()
self.dqn_graph.update_target(self.sess)
mean_100ep_reward = round(np.mean(self.episode_rewards[-101:-1]), 1)
mean_100ep_length = round(np.mean(self.episode_lengths[-101:-1]), 1)
@@ -209,16 +218,19 @@ class DQN(Algorithm):
"learn_time": learn_time,
"steps": self.num_timesteps,
"episodes": num_episodes,
"exploration": int(100 * self.exploration.value(t))
"exploration": int(
100 * self.exploration.value(self.num_timesteps))
}
logger.record_tabular("sample_time", sample_time)
logger.record_tabular("learn_time", learn_time)
logger.record_tabular("steps", self.num_timesteps)
logger.record_tabular("buffer_size", len(self.replay_buffer))
logger.record_tabular("episodes", num_episodes)
logger.record_tabular("mean 100 episode reward", mean_100ep_reward)
logger.record_tabular(
"% time spent exploring", int(100 * self.exploration.value(t)))
"% time spent exploring",
int(100 * self.exploration.value(self.num_timesteps)))
logger.dump_tabular()
res = TrainingResult(
+48
View File
@@ -0,0 +1,48 @@
#!/usr/bin/env python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import tensorflow as tf
import ray
from ray.rllib.dqn import DQN, DEFAULT_CONFIG
def main():
parser = argparse.ArgumentParser(description="Run the A3C algorithm.")
parser.add_argument("--iterations", default=-1, type=int,
help="The number of training iterations to run.")
args = parser.parse_args()
config = DEFAULT_CONFIG.copy()
config.update(dict(
lr=1e-3,
schedule_max_timesteps=100000,
exploration_fraction=0.1,
exploration_final_eps=0.02,
dueling=False,
hiddens=[],
model_config=dict(
fcnet_hiddens=[64],
fcnet_activation=tf.nn.relu
)))
# Currently Ray is not used in this example, but we need to call ray.init
# to create the directory in which logging will occur. TODO(rkn): Fix this.
ray.init()
dqn = DQN("CartPole-v0", config)
iteration = 0
while iteration != args.iterations:
iteration += 1
res = dqn.train()
print("current status: {}".format(res))
if __name__ == "__main__":
main()
+10 -2
View File
@@ -140,11 +140,15 @@ record_tabular = logkv
dump_tabular = dumpkvs
def log(*args, level=INFO):
def log(*args, **kwargs):
"""
Write the sequence of args, with no separators, to the console and output
files (if you've configured an output file).
"""
if "level" in kwargs:
level = kwargs["level"]
else:
level = INFO
Logger.CURRENT.log(*args, level=level)
@@ -216,7 +220,11 @@ class Logger(object):
fmt.writekvs(self.name2val)
self.name2val.clear()
def log(self, *args, level=INFO):
def log(self, *args, **kwargs):
if "level" in kwargs:
level = kwargs["level"]
else:
level = INFO
if self.level <= level:
self._do_log(args)
+192 -77
View File
@@ -5,94 +5,209 @@ from __future__ import print_function
import tensorflow as tf
import tensorflow.contrib.layers as layers
from ray.rllib.models import ModelCatalog
def _mlp(hiddens, inpt, num_actions, scope, reuse=False):
with tf.variable_scope(scope, reuse=reuse):
out = inpt
def _build_q_network(inputs, num_actions, config):
dueling = config["dueling"]
hiddens = config["hiddens"]
frontend = ModelCatalog.get_model(inputs, 1, config["model_config"])
frontend_out = frontend.last_layer
with tf.variable_scope("action_value"):
action_out = frontend_out
for hidden in hiddens:
out = layers.fully_connected(
out, num_outputs=hidden, activation_fn=tf.nn.relu)
out = layers.fully_connected(
out, num_outputs=num_actions, activation_fn=None)
return out
action_out = layers.fully_connected(
action_out, num_outputs=hidden, activation_fn=tf.nn.relu)
action_scores = layers.fully_connected(
action_out, num_outputs=num_actions, activation_fn=None)
def mlp(hiddens=[]):
"""This model takes as input an observation and returns values of all
actions.
Parameters
----------
hiddens: [int]
list of sizes of hidden layers
Returns
-------
q_func: function
q_function for DQN algorithm.
"""
return lambda *args, **kwargs: _mlp(hiddens, *args, **kwargs)
def _cnn_to_mlp(
convs, hiddens, dueling, inpt, num_actions, scope, reuse=False):
with tf.variable_scope(scope, reuse=reuse):
out = inpt
with tf.variable_scope("convnet"):
for num_outputs, kernel_size, stride in convs:
out = layers.convolution2d(
out,
num_outputs=num_outputs,
kernel_size=kernel_size,
stride=stride,
activation_fn=tf.nn.relu)
out = layers.flatten(out)
with tf.variable_scope("action_value"):
action_out = out
if dueling:
with tf.variable_scope("state_value"):
state_out = frontend_out
for hidden in hiddens:
action_out = layers.fully_connected(
action_out, num_outputs=hidden, activation_fn=tf.nn.relu)
action_scores = layers.fully_connected(
action_out, num_outputs=num_actions, activation_fn=None)
if dueling:
with tf.variable_scope("state_value"):
state_out = out
for hidden in hiddens:
state_out = layers.fully_connected(
state_out, num_outputs=hidden,
activation_fn=tf.nn.relu)
state_score = layers.fully_connected(
state_out, num_outputs=1, activation_fn=None)
action_scores_mean = tf.reduce_mean(action_scores, 1)
action_scores_centered = action_scores - tf.expand_dims(
action_scores_mean, 1)
return state_score + action_scores_centered
else:
return action_scores
return out
state_out = layers.fully_connected(
state_out, num_outputs=hidden, activation_fn=tf.nn.relu)
state_score = layers.fully_connected(
state_out, num_outputs=1, activation_fn=None)
action_scores_mean = tf.reduce_mean(action_scores, 1)
action_scores_centered = action_scores - tf.expand_dims(
action_scores_mean, 1)
return state_score + action_scores_centered
else:
return action_scores
def cnn_to_mlp(convs, hiddens, dueling=False):
"""This model takes an observation and returns values for all actions.
def _build_action_network(
q_values, observations, num_actions, stochastic, eps):
deterministic_actions = tf.argmax(q_values, axis=1)
batch_size = tf.shape(observations)[0]
random_actions = tf.random_uniform(
tf.stack([batch_size]), minval=0, maxval=num_actions, dtype=tf.int64)
chose_random = tf.random_uniform(
tf.stack([batch_size]), minval=0, maxval=1, dtype=tf.float32) < eps
stochastic_actions = tf.where(
chose_random, random_actions, deterministic_actions)
return tf.cond(
stochastic, lambda: stochastic_actions,
lambda: deterministic_actions)
def _huber_loss(x, delta=1.0):
"""Reference: https://en.wikipedia.org/wiki/Huber_loss"""
return tf.where(
tf.abs(x) < delta,
tf.square(x) * 0.5,
delta * (tf.abs(x) - 0.5 * delta))
def _minimize_and_clip(optimizer, objective, var_list, clip_val=10):
"""Minimized `objective` using `optimizer` w.r.t. variables in
`var_list` while ensure the norm of the gradients for each
variable is clipped to `clip_val`
"""
gradients = optimizer.compute_gradients(objective, var_list=var_list)
for i, (grad, var) in enumerate(gradients):
if grad is not None:
gradients[i] = (tf.clip_by_norm(grad, clip_val), var)
return optimizer.apply_gradients(gradients)
def _scope_vars(scope, trainable_only=False):
"""
Get variables inside a scope
The scope can be specified as a string
Parameters
----------
convs: [(int, int int)]
list of convolutional layers in form of
(num_outputs, kernel_size, stride)
hiddens: [int]
list of sizes of hidden layers
dueling: bool
if true double the output MLP to compute a baseline
for action scores
scope: str or VariableScope
scope in which the variables reside.
trainable_only: bool
whether or not to return only the variables that were marked as
trainable.
Returns
-------
q_func: function
q_function for DQN algorithm.
vars: [tf.Variable]
list of variables in `scope`.
"""
return tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES
if trainable_only else tf.GraphKeys.VARIABLES,
scope=scope if isinstance(scope, str) else scope.name)
return lambda *args, **kwargs: _cnn_to_mlp(
convs, hiddens, dueling, *args, **kwargs)
class DQNGraph(object):
def __init__(self, env, config):
self.env = env
num_actions = env.action_space.n
optimizer = tf.train.AdamOptimizer(learning_rate=config["lr"])
# Action inputs
self.stochastic = tf.placeholder(tf.bool, (), name="stochastic")
self.eps = tf.placeholder(tf.float32, (), name="eps")
self.cur_observations = tf.placeholder(
tf.float32, shape=(None,) + env.observation_space.shape)
# Action Q network
with tf.variable_scope("q_func") as scope:
q_values = _build_q_network(
self.cur_observations, num_actions, config)
q_func_vars = _scope_vars(scope.name)
# Action outputs
self.output_actions = _build_action_network(
q_values,
self.cur_observations,
num_actions,
self.stochastic,
self.eps)
# Replay inputs
self.obs_t = tf.placeholder(
tf.float32, shape=(None,) + env.observation_space.shape)
self.act_t = tf.placeholder(tf.int32, [None], name="action")
self.rew_t = tf.placeholder(tf.float32, [None], name="reward")
self.obs_tp1 = tf.placeholder(
tf.float32, shape=(None,) + env.observation_space.shape)
self.done_mask = tf.placeholder(tf.float32, [None], name="done")
self.importance_weights = tf.placeholder(
tf.float32, [None], name="weight")
# q network evaluation
with tf.variable_scope("q_func", reuse=True):
self.q_t = _build_q_network(self.obs_t, num_actions, config)
# target q network evalution
with tf.variable_scope("target_q_func") as scope:
self.q_tp1 = _build_q_network(self.obs_tp1, num_actions, config)
target_q_func_vars = _scope_vars(scope.name)
# q scores for actions which we know were selected in the given state.
q_t_selected = tf.reduce_sum(
self.q_t * tf.one_hot(self.act_t, num_actions), 1)
# compute estimate of best possible value starting from state at t + 1
if config["double_q"]:
with tf.variable_scope("q_func", reuse=True):
q_tp1_using_online_net = _build_q_network(
self.obs_tp1, num_actions, config)
q_tp1_best_using_online_net = tf.arg_max(q_tp1_using_online_net, 1)
q_tp1_best = tf.reduce_sum(
self.q_tp1 * tf.one_hot(
q_tp1_best_using_online_net, num_actions), 1)
else:
q_tp1_best = tf.reduce_max(self.q_tp1, 1)
q_tp1_best_masked = (1.0 - self.done_mask) * q_tp1_best
# compute RHS of bellman equation
q_t_selected_target = self.rew_t + config["gamma"] * q_tp1_best_masked
# compute the error (potentially clipped)
self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target)
errors = _huber_loss(self.td_error)
weighted_error = tf.reduce_mean(self.importance_weights * errors)
# compute optimization op (potentially with gradient clipping)
if config["grad_norm_clipping"] is not None:
self.optimize_expr = _minimize_and_clip(
optimizer, weighted_error, var_list=q_func_vars,
clip_val=config["grad_norm_clipping"])
else:
self.optimize_expr = optimizer.minimize(
weighted_error, var_list=q_func_vars)
# update_target_fn will be called periodically to copy Q network to
# target Q network
update_target_expr = []
for var, var_target in zip(
sorted(q_func_vars, key=lambda v: v.name),
sorted(target_q_func_vars, key=lambda v: v.name)):
update_target_expr.append(var_target.assign(var))
self.update_target_expr = tf.group(*update_target_expr)
def update_target(self, sess):
return sess.run(self.update_target_expr)
def act(self, sess, obs, eps, stochastic=True):
return sess.run(
self.output_actions,
feed_dict={
self.cur_observations: obs,
self.stochastic: stochastic,
self.eps: eps,
})
def train(
self, sess, obs_t, act_t, rew_t, obs_tp1, done_mask,
importance_weights):
td_err, _ = sess.run(
[self.td_error, self.optimize_expr],
feed_dict={
self.obs_t: obs_t,
self.act_t: act_t,
self.rew_t: rew_t,
self.obs_tp1: obs_tp1,
self.done_mask: done_mask,
self.importance_weights: importance_weights
})
return td_err
-25
View File
@@ -1,25 +0,0 @@
#!/usr/bin/env python
"""Demonstrates the RLlib algorithm API through a simple bakeoff."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
import ray.rllib.evolution_strategies as es
import ray.rllib.policy_gradient as pg
if __name__ == "__main__":
ray.init()
# TODO(ekl): get the algorithms working on a common set of envs
env_name = "CartPole-v0"
alg1 = es.EvolutionStrategies(env_name, es.DEFAULT_CONFIG)
alg2 = pg.PolicyGradient(env_name, pg.DEFAULT_CONFIG)
while True:
r1 = alg1.train()
r2 = alg2.train()
print("evolution strategies: {}".format(r1))
print("policy gradient: {}".format(r2))
+7 -3
View File
@@ -44,23 +44,27 @@ class ModelCatalog(object):
"Unsupported args: {} {}".format(action_space, dist_type))
@staticmethod
def get_model(inputs, num_outputs):
def get_model(inputs, num_outputs, options=None):
"""Returns a suitable model conforming to given input and output specs.
Args:
inputs (Tensor): The input tensor to the model.
num_outputs (int): The size of the output vector of the model.
options (dict): Optional args to pass to the model constructor.
Returns:
model (Model): Neural network model.
"""
if options is None:
options = {}
obs_rank = len(inputs.get_shape()) - 1
if obs_rank > 1:
return VisionNetwork(inputs, num_outputs)
return VisionNetwork(inputs, num_outputs, options)
return FullyConnectedNetwork(inputs, num_outputs)
return FullyConnectedNetwork(inputs, num_outputs, options)
@staticmethod
def get_preprocessor(env_name):
+19 -13
View File
@@ -21,17 +21,23 @@ def normc_initializer(std=1.0):
class FullyConnectedNetwork(Model):
"""Generic fully connected network."""
def _init(self, inputs, num_outputs):
def _init(self, inputs, num_outputs, options):
hiddens = options.get("fcnet_hiddens", [256, 256])
activation = options.get("fcnet_activation", tf.nn.tanh)
print("Constructing fcnet {} {}".format(hiddens, activation))
with tf.name_scope("fc_net"):
fc1 = slim.fully_connected(
inputs, 256, weights_initializer=normc_initializer(1.0),
activation_fn=tf.nn.tanh,
scope="fc1")
fc2 = slim.fully_connected(
fc1, 256, weights_initializer=normc_initializer(1.0),
activation_fn=tf.nn.tanh,
scope="fc2")
fc3 = slim.fully_connected(
fc2, num_outputs, weights_initializer=normc_initializer(0.01),
activation_fn=None, scope="fc3")
return fc3
i = 1
last_layer = inputs
for size in hiddens:
last_layer = slim.fully_connected(
last_layer, size,
weights_initializer=normc_initializer(1.0),
activation_fn=activation,
scope="fc{}".format(i))
i += 1
output = slim.fully_connected(
last_layer, num_outputs,
weights_initializer=normc_initializer(0.01),
activation_fn=None, scope="fc_out")
return output, last_layer
+12 -11
View File
@@ -9,20 +9,21 @@ class Model(object):
Models convert input tensors to a number of output features. These features
can then be interpreted by ActionDistribution classes to determine
e.g. agent action values.
The last layer of the network can also be retrieved if the algorithm
needs to further post-processing (e.g. Actor and Critic networks in A3C).
Attributes:
inputs (Tensor): The input placeholder for this model.
outputs (Tensor): The output vector of this model.
last_layer (Tensor): The network layer right before the model output.
"""
def __init__(self, inputs, num_outputs):
def __init__(self, inputs, num_outputs, options):
self.inputs = inputs
self.outputs = self._init(inputs, num_outputs)
self.outputs, self.last_layer = self._init(
inputs, num_outputs, options)
def _init(self):
"""Initializes the model given self.inputs and self.num_outputs."""
"""Builds and returns the output and last layer of the network."""
raise NotImplementedError
def inputs(self):
"""Returns the input placeholder for this model."""
return self.inputs
def outputs(self):
"""Returns the output tensor of this model."""
return self.outputs
+2 -2
View File
@@ -11,7 +11,7 @@ from ray.rllib.models.model import Model
class VisionNetwork(Model):
"""Generic vision network."""
def _init(self, inputs, num_outputs):
def _init(self, inputs, num_outputs, options):
with tf.name_scope("vision_net"):
conv1 = slim.conv2d(inputs, 16, [8, 8], 4, scope="conv1")
conv2 = slim.conv2d(conv1, 32, [4, 4], 2, scope="conv2")
@@ -19,4 +19,4 @@ class VisionNetwork(Model):
conv2, 512, [10, 10], padding="VALID", scope="fc1")
fc2 = slim.conv2d(fc1, num_outputs, [1, 1], activation_fn=None,
normalizer_fn=None, scope="fc2")
return tf.squeeze(fc2, [1, 2])
return tf.squeeze(fc2, [1, 2]), tf.squeeze(fc1, [1, 2])