From b6a18cb39bc3b8a51a66d3dad28cdd6bde34f470 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 26 Jul 2017 12:29:00 -0700 Subject: [PATCH] [rllib] Also refactor DQN to use shared RLlib models (#730) * wip * works with cartpole * lint * fix pg * comment * action dist rename * preprocessor * fix test * typo * fix the action[0] nonsense * revert * satisfy the lint * wip * works with cartpole * lint * fix pg * comment * action dist rename * preprocessor * fix test * typo * fix the action[0] nonsense * revert * satisfy the lint * Minor indentation changes. * fix merge * add humanoid * initial dqn refactor * remove tfutil * fix calls * fix tf errors 1 * closer * runs now * lint * tensorboard graph * fix linting * more 4 space * fix * fix linT * more lint * oops * es parity * remove example.py * fix training bug * add cartpole demo * try fixing cartpole * allow model options, configure cartpole * debug * simplify * no dueling * avoid out of file handles * Test dqn in jenkins. * Minor formatting. * fix issue * fix another * Fix problem in which we log to a directory that hasn't been created. --- python/ray/rllib/dqn/build_graph.py | 284 ------- .../dqn/common/atari_wrappers_deprecated.py | 25 +- python/ray/rllib/dqn/common/tf_util.py | 793 ------------------ python/ray/rllib/dqn/dqn.py | 172 ++-- python/ray/rllib/dqn/example-cartpole.py | 48 ++ python/ray/rllib/dqn/logger.py | 12 +- python/ray/rllib/dqn/models.py | 269 ++++-- python/ray/rllib/example.py | 25 - python/ray/rllib/models/catalog.py | 10 +- python/ray/rllib/models/fcnet.py | 32 +- python/ray/rllib/models/model.py | 23 +- python/ray/rllib/models/visionnet.py | 4 +- test/jenkins_tests/run_multi_node_tests.sh | 4 + 13 files changed, 399 insertions(+), 1302 deletions(-) delete mode 100644 python/ray/rllib/dqn/build_graph.py delete mode 100644 python/ray/rllib/dqn/common/tf_util.py create mode 100755 python/ray/rllib/dqn/example-cartpole.py delete mode 100755 python/ray/rllib/example.py diff --git a/python/ray/rllib/dqn/build_graph.py b/python/ray/rllib/dqn/build_graph.py deleted file mode 100644 index a3e4decc8..000000000 --- a/python/ray/rllib/dqn/build_graph.py +++ /dev/null @@ -1,284 +0,0 @@ -"""Deep Q learning graph - -The functions in this file can are used to create the following functions: - -======= act ======== - - Function to chose an action given an observation - - Parameters - ---------- - observation: object - Observation that can be feed into the output of make_obs_ph - stochastic: bool - if set to False all the actions are always deterministic - (default False) - update_eps_ph: float - update epsilon a new value, if negative not update happens - (default: no update) - - Returns - ------- - Tensor of dtype tf.int64 and shape (BATCH_SIZE,) with an action to be - performed for every element of the batch. - - -======= train ======= - - Function that takes a transition (s,a,r,s') and optimizes Bellman - equation's error: - - td_error = Q(s,a) - (r + gamma * max_a' Q(s', a')) - loss = huber_loss[td_error] - - Parameters - ---------- - obs_t: object - a batch of observations - action: np.array - actions that were selected upon seeing obs_t. - dtype must be int32 and shape must be (batch_size,) - reward: np.array - immediate reward attained after executing those actions - dtype must be float32 and shape must be (batch_size,) - obs_tp1: object - observations that followed obs_t - done: np.array - 1 if obs_t was the last observation in the episode and 0 otherwise - obs_tp1 gets ignored, but must be of the valid shape. - dtype must be float32 and shape must be (batch_size,) - weight: np.array - imporance weights for every element of the batch (gradient is - multiplied by the importance weight) dtype must be float32 and shape - must be (batch_size,) - - Returns - ------- - td_error: np.array - a list of differences between Q(s,a) and the target in Bellman's - equation. dtype is float32 and shape is (batch_size,) - -======= update_target ======== - - copy the parameters from optimized Q function to the target Q function. - In Q learning we actually optimize the following error: - - Q(s,a) - (r + gamma * max_a' Q'(s', a')) - - Where Q' is lagging behind Q to stablize the learning. For example for - Atari - - Q' is set to Q once every 10000 updates training steps. - -""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf -from ray.rllib.dqn.common import tf_util as U - - -def build_act(make_obs_ph, q_func, num_actions, scope="deepq", reuse=None): - """Creates the act function: - - Parameters - ---------- - make_obs_ph: str -> tf.placeholder or TfInput - a function that take a name and creates a placeholder of input with - that name - q_func: (tf.Variable, int, str, bool) -> tf.Variable - the model that takes the following inputs: - observation_in: object - the output of observation placeholder - num_actions: int - number of actions - scope: str - reuse: bool - should be passed to outer variable scope - and returns a tensor of shape (batch_size, num_actions) with values of - every action. - num_actions: int - number of actions. - scope: str or VariableScope - optional scope for variable_scope. - reuse: bool or None - whether or not the variables should be reused. To be able to reuse the - scope must be given. - - Returns - ------- - act: (tf.Variable, bool, float) -> tf.Variable - function to select and action given observation. - ` See the top of the file for details. - """ - with tf.variable_scope(scope, reuse=reuse): - observations_ph = U.ensure_tf_input(make_obs_ph("observation")) - stochastic_ph = tf.placeholder(tf.bool, (), name="stochastic") - update_eps_ph = tf.placeholder(tf.float32, (), name="update_eps") - - eps = tf.get_variable( - "eps", (), initializer=tf.constant_initializer(0)) - - q_values = q_func(observations_ph.get(), num_actions, scope="q_func") - deterministic_actions = tf.argmax(q_values, axis=1) - - batch_size = tf.shape(observations_ph.get())[0] - random_actions = tf.random_uniform( - tf.stack([batch_size]), minval=0, maxval=num_actions, - dtype=tf.int64) - chose_random = tf.random_uniform( - tf.stack([batch_size]), minval=0, maxval=1, dtype=tf.float32) < eps - stochastic_actions = tf.where( - chose_random, random_actions, deterministic_actions) - - output_actions = tf.cond( - stochastic_ph, lambda: stochastic_actions, - lambda: deterministic_actions) - update_eps_expr = eps.assign( - tf.cond(update_eps_ph >= 0, lambda: update_eps_ph, lambda: eps)) - - act = U.function( - inputs=[observations_ph, stochastic_ph, update_eps_ph], - outputs=output_actions, - givens={update_eps_ph: -1.0, stochastic_ph: True}, - updates=[update_eps_expr]) - return act - - -def build_train( - make_obs_ph, q_func, num_actions, optimizer, grad_norm_clipping=None, - gamma=1.0, double_q=True, scope="deepq", reuse=None): - """Creates the train function: - - Parameters - ---------- - make_obs_ph: str -> tf.placeholder or TfInput - a function that takes a name and creates a placeholder of input with - that name - q_func: (tf.Variable, int, str, bool) -> tf.Variable - the model that takes the following inputs: - observation_in: object - the output of observation placeholder - num_actions: int - number of actions - scope: str - reuse: bool - should be passed to outer variable scope - and returns a tensor of shape (batch_size, num_actions) with values of - every action. - num_actions: int - number of actions - reuse: bool - whether or not to reuse the graph variables - optimizer: tf.train.Optimizer - optimizer to use for the Q-learning objective. - grad_norm_clipping: float or None - clip gradient norms to this value. If None no clipping is performed. - gamma: float - discount rate. - double_q: bool - if true will use Double Q Learning (https://arxiv.org/abs/1509.06461). - In general it is a good idea to keep it enabled. - scope: str or VariableScope - optional scope for variable_scope. - reuse: bool or None - whether or not the variables should be reused. To be able to reuse the - scope must be given. - - Returns - ------- - act: (tf.Variable, bool, float) -> tf.Variable - function to select and action given observation. - ` See the top of the file for details. - train: (object, np.array, np.array, object, np.array, np.array) -> np.array - optimize the error in Bellman's equation. - ` See the top of the file for details. - update_target: () -> () - copy the parameters from optimized Q function to the target Q function. - ` See the top of the file for details. - debug: {str: function} - a bunch of functions to print debug data like q_values. - """ - act_f = build_act(make_obs_ph, q_func, num_actions, scope=scope, - reuse=reuse) - - with tf.variable_scope(scope, reuse=reuse): - # set up placeholders - obs_t_input = U.ensure_tf_input(make_obs_ph("obs_t")) - act_t_ph = tf.placeholder(tf.int32, [None], name="action") - rew_t_ph = tf.placeholder(tf.float32, [None], name="reward") - obs_tp1_input = U.ensure_tf_input(make_obs_ph("obs_tp1")) - done_mask_ph = tf.placeholder(tf.float32, [None], name="done") - importance_weights_ph = tf.placeholder(tf.float32, [None], - name="weight") - - # q network evaluation - q_t = q_func( - obs_t_input.get(), num_actions, scope="q_func", - reuse=True) # reuse parameters from act - q_func_vars = U.scope_vars(U.absolute_scope_name("q_func")) - - # target q network evalution - q_tp1 = q_func(obs_tp1_input.get(), num_actions, scope="target_q_func") - target_q_func_vars = U.scope_vars( - U.absolute_scope_name("target_q_func")) - - # q scores for actions which we know were selected in the given state. - q_t_selected = tf.reduce_sum(q_t * tf.one_hot(act_t_ph, num_actions), - 1) - - # compute estimate of best possible value starting from state at t + 1 - if double_q: - q_tp1_using_online_net = q_func( - obs_tp1_input.get(), num_actions, scope="q_func", reuse=True) - q_tp1_best_using_online_net = tf.arg_max(q_tp1_using_online_net, 1) - q_tp1_best = tf.reduce_sum( - q_tp1 * tf.one_hot(q_tp1_best_using_online_net, num_actions), - 1) - else: - q_tp1_best = tf.reduce_max(q_tp1, 1) - q_tp1_best_masked = (1.0 - done_mask_ph) * q_tp1_best - - # compute RHS of bellman equation - q_t_selected_target = rew_t_ph + gamma * q_tp1_best_masked - - # compute the error (potentially clipped) - td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) - errors = U.huber_loss(td_error) - weighted_error = tf.reduce_mean(importance_weights_ph * errors) - # compute optimization op (potentially with gradient clipping) - if grad_norm_clipping is not None: - optimize_expr = U.minimize_and_clip( - optimizer, weighted_error, var_list=q_func_vars, - clip_val=grad_norm_clipping) - else: - optimize_expr = optimizer.minimize(weighted_error, - var_list=q_func_vars) - - # update_target_fn will be called periodically to copy Q network to - # target Q network - update_target_expr = [] - for var, var_target in zip( - sorted(q_func_vars, key=lambda v: v.name), - sorted(target_q_func_vars, key=lambda v: v.name)): - update_target_expr.append(var_target.assign(var)) - update_target_expr = tf.group(*update_target_expr) - - # Create callable functions - train = U.function( - inputs=[ - obs_t_input, - act_t_ph, - rew_t_ph, - obs_tp1_input, - done_mask_ph, - importance_weights_ph - ], - outputs=td_error, - updates=[optimize_expr]) - update_target = U.function([], [], updates=[update_target_expr]) - - q_values = U.function([obs_t_input], q_t) - - return act_f, train, update_target, {'q_values': q_values} diff --git a/python/ray/rllib/dqn/common/atari_wrappers_deprecated.py b/python/ray/rllib/dqn/common/atari_wrappers_deprecated.py index 805744789..37d4125d4 100644 --- a/python/ray/rllib/dqn/common/atari_wrappers_deprecated.py +++ b/python/ray/rllib/dqn/common/atari_wrappers_deprecated.py @@ -126,13 +126,14 @@ class MaxAndSkipEnv(gym.Wrapper): return obs -class ProcessFrame84(gym.ObservationWrapper): +# TODO(ekl): switch this to use a RLlib common preprocessor +class ProcessFrame80(gym.ObservationWrapper): def __init__(self, env=None): - super(ProcessFrame84, self).__init__(env) - self.observation_space = spaces.Box(low=0, high=255, shape=(84, 84, 1)) + super(ProcessFrame80, self).__init__(env) + self.observation_space = spaces.Box(low=0, high=255, shape=(80, 80, 1)) def _observation(self, obs): - return ProcessFrame84.process(obs) + return ProcessFrame80.process(obs) @staticmethod def process(frame): @@ -144,10 +145,10 @@ class ProcessFrame84(gym.ObservationWrapper): assert False, "Unknown resolution." img = (img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114) - resized_screen = cv2.resize(img, (84, 110), - interpolation=cv2.INTER_AREA) - x_t = resized_screen[18:102, :] - x_t = np.reshape(x_t, [84, 84, 1]) + resized_screen = cv2.resize( + img, (80, 110), interpolation=cv2.INTER_AREA) + x_t = resized_screen[20:100, :] + x_t = np.reshape(x_t, [80, 80, 1]) return x_t.astype(np.uint8) @@ -225,7 +226,7 @@ def wrap_dqn(env): env = MaxAndSkipEnv(env, skip=4) if 'FIRE' in env.unwrapped.get_action_meanings(): env = FireResetEnv(env) - env = ProcessFrame84(env) + env = ProcessFrame80(env) env = FrameStack(env, 4) env = ClippedRewardsWrapper(env) return env @@ -234,7 +235,7 @@ def wrap_dqn(env): class A2cProcessFrame(gym.Wrapper): def __init__(self, env): gym.Wrapper.__init__(self, env) - self.observation_space = spaces.Box(low=0, high=255, shape=(84, 84, 1)) + self.observation_space = spaces.Box(low=0, high=255, shape=(80, 80, 1)) def _step(self, action): ob, reward, done, info = self.env.step(action) @@ -246,5 +247,5 @@ class A2cProcessFrame(gym.Wrapper): @staticmethod def process(frame): frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) - frame = cv2.resize(frame, (84, 84), interpolation=cv2.INTER_AREA) - return frame.reshape(84, 84, 1) + frame = cv2.resize(frame, (80, 80), interpolation=cv2.INTER_AREA) + return frame.reshape(80, 80, 1) diff --git a/python/ray/rllib/dqn/common/tf_util.py b/python/ray/rllib/dqn/common/tf_util.py deleted file mode 100644 index bbc718730..000000000 --- a/python/ray/rllib/dqn/common/tf_util.py +++ /dev/null @@ -1,793 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np -import tensorflow as tf # pylint: ignore-module -import builtins -import functools -import copy -import os -import collections - - -# ================================================================ -# Make consistent with numpy -# ================================================================ - -clip = tf.clip_by_value - - -def sum(x, axis=None, keepdims=False): - axis = None if axis is None else [axis] - return tf.reduce_sum(x, axis=axis, keep_dims=keepdims) - - -def mean(x, axis=None, keepdims=False): - axis = None if axis is None else [axis] - return tf.reduce_mean(x, axis=axis, keep_dims=keepdims) - - -def var(x, axis=None, keepdims=False): - meanx = mean(x, axis=axis, keepdims=keepdims) - return mean(tf.square(x - meanx), axis=axis, keepdims=keepdims) - - -def std(x, axis=None, keepdims=False): - return tf.sqrt(var(x, axis=axis, keepdims=keepdims)) - - -def max(x, axis=None, keepdims=False): - axis = None if axis is None else [axis] - return tf.reduce_max(x, axis=axis, keep_dims=keepdims) - - -def min(x, axis=None, keepdims=False): - axis = None if axis is None else [axis] - return tf.reduce_min(x, axis=axis, keep_dims=keepdims) - - -def concatenate(arrs, axis=0): - return tf.concat(axis=axis, values=arrs) - - -def argmax(x, axis=None): - return tf.argmax(x, axis=axis) - - -def switch(condition, then_expression, else_expression): - """Switches between two operations depending on a scalar value (int or - bool). Note that both `then_expression` and `else_expression` - should be symbolic tensors of the *same shape*. - - # Arguments - condition: scalar tensor. - then_expression: TensorFlow operation. - else_expression: TensorFlow operation. - """ - x_shape = copy.copy(then_expression.get_shape()) - x = tf.cond(tf.cast(condition, 'bool'), - lambda: then_expression, lambda: else_expression) - x.set_shape(x_shape) - return x - -# ================================================================ -# Extras -# ================================================================ - - -def l2loss(params): - if len(params) == 0: - return tf.constant(0.0) - else: - return tf.add_n([sum(tf.square(p)) for p in params]) - - -def lrelu(x, leak=0.2): - f1 = 0.5 * (1 + leak) - f2 = 0.5 * (1 - leak) - return f1 * x + f2 * abs(x) - - -def categorical_sample_logits(X): - # https://github.com/tensorflow/tensorflow/issues/456 - U = tf.random_uniform(tf.shape(X)) - return argmax(X - tf.log(-tf.log(U)), axis=1) - - -# ================================================================ -# Inputs -# ================================================================ - - -def is_placeholder(x): - return type(x) is tf.Tensor and len(x.op.inputs) == 0 - - -class TfInput(object): - def __init__(self, name="(unnamed)"): - """Generalized Tensorflow placeholder. The main differences are: - - possibly uses multiple placeholders internally and returns multiple - values - - can apply light postprocessing to the value feed to placeholder. - """ - self.name = name - - def get(self): - """Return the tf variable(s) representing the possibly postprocessed - value of placeholder(s). - """ - raise NotImplemented() - - def make_feed_dict(data): - """Given data input it to the placeholder(s).""" - raise NotImplemented() - - -class PlacholderTfInput(TfInput): - def __init__(self, placeholder): - """Wrapper for regular tensorflow placeholder.""" - super().__init__(placeholder.name) - self._placeholder = placeholder - - def get(self): - return self._placeholder - - def make_feed_dict(self, data): - return {self._placeholder: data} - - -class BatchInput(PlacholderTfInput): - def __init__(self, shape, dtype=tf.float32, name=None): - """Creates a placeholder for a batch of tensors of a given shape and - dtype - - Parameters - ---------- - shape: [int] - shape of a single elemenet of the batch - dtype: tf.dtype - number representation used for tensor contents - name: str - name of the underlying placeholder - """ - super().__init__(tf.placeholder(dtype, [None] + list(shape), - name=name)) - - -class Uint8Input(PlacholderTfInput): - def __init__(self, shape, name=None): - """Takes input in uint8 format which is cast to float32 and divided by - 255 before passing it to the model. - - On GPU this ensures lower data transfer times. - - Parameters - ---------- - shape: [int] - shape of the tensor. - name: str - name of the underlying placeholder - """ - - super().__init__(tf.placeholder(tf.uint8, [None] + list(shape), - name=name)) - self._shape = shape - self._output = tf.cast(super().get(), tf.float32) / 255.0 - - def get(self): - return self._output - - -def ensure_tf_input(thing): - """Takes either tf.placeholder of TfInput and outputs equivalent TfInput""" - if isinstance(thing, TfInput): - return thing - elif is_placeholder(thing): - return PlacholderTfInput(thing) - else: - raise ValueError("Must be a placeholder or TfInput") - -# ================================================================ -# Mathematical utils -# ================================================================ - - -def huber_loss(x, delta=1.0): - """Reference: https://en.wikipedia.org/wiki/Huber_loss""" - return tf.where( - tf.abs(x) < delta, - tf.square(x) * 0.5, - delta * (tf.abs(x) - 0.5 * delta)) - -# ================================================================ -# Optimizer utils -# ================================================================ - - -def minimize_and_clip(optimizer, objective, var_list, clip_val=10): - """Minimized `objective` using `optimizer` w.r.t. variables in - `var_list` while ensure the norm of the gradients for each - variable is clipped to `clip_val` - """ - gradients = optimizer.compute_gradients(objective, var_list=var_list) - for i, (grad, var) in enumerate(gradients): - if grad is not None: - gradients[i] = (tf.clip_by_norm(grad, clip_val), var) - return optimizer.apply_gradients(gradients) - - -# ================================================================ -# Global session -# ================================================================ - -def get_session(): - """Returns recently made Tensorflow session""" - return tf.get_default_session() - - -def make_session(num_cpu): - """Returns a session that will use CPU's only""" - tf_config = tf.ConfigProto( - inter_op_parallelism_threads=num_cpu, - intra_op_parallelism_threads=num_cpu) - return tf.Session(config=tf_config) - - -def single_threaded_session(): - """Returns a session which will only use a single CPU""" - return make_session(1) - - -ALREADY_INITIALIZED = set() - - -def initialize(): - """Initialize all the uninitialized variables in the global scope.""" - new_variables = set(tf.global_variables()) - ALREADY_INITIALIZED - get_session().run(tf.variables_initializer(new_variables)) - ALREADY_INITIALIZED.update(new_variables) - - -def eval(expr, feed_dict=None): - if feed_dict is None: - feed_dict = {} - return get_session().run(expr, feed_dict=feed_dict) - - -VALUE_SETTERS = collections.OrderedDict() - - -def set_value(v, val): - global VALUE_SETTERS - if v in VALUE_SETTERS: - set_op, set_endpoint = VALUE_SETTERS[v] - else: - set_endpoint = tf.placeholder(v.dtype) - set_op = v.assign(set_endpoint) - VALUE_SETTERS[v] = (set_op, set_endpoint) - get_session().run(set_op, feed_dict={set_endpoint: val}) - - -# ================================================================ -# Saving variables -# ================================================================ - - -def load_state(fname): - saver = tf.train.Saver() - saver.restore(get_session(), fname) - - -def save_state(fname): - os.makedirs(os.path.dirname(fname), exist_ok=True) - saver = tf.train.Saver() - saver.save(get_session(), fname) - -# ================================================================ -# Model components -# ================================================================ - - -def normc_initializer(std=1.0): - # pylint: disable=W0613 - def _initializer(shape, dtype=None, partition_info=None): - out = np.random.randn(*shape).astype(np.float32) - out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True)) - return tf.constant(out) - return _initializer - - -def conv2d( - x, num_filters, name, filter_size=(3, 3), stride=(1, 1), pad="SAME", - dtype=tf.float32, collections=None, summary_tag=None): - with tf.variable_scope(name): - stride_shape = [1, stride[0], stride[1], 1] - filter_shape = [ - filter_size[0], filter_size[1], int(x.get_shape()[3]), num_filters] - - # there are "num input feature maps * filter height * filter width" - # inputs to each hidden unit - fan_in = intprod(filter_shape[:3]) - # each unit in the lower layer receives a gradient from: - # "num output feature maps * filter height * filter width" / - # pooling size - fan_out = intprod(filter_shape[:2]) * num_filters - # initialize weights with random weights - w_bound = np.sqrt(6. / (fan_in + fan_out)) - - w = tf.get_variable( - "W", filter_shape, dtype, - tf.random_uniform_initializer(-w_bound, w_bound), - collections=collections) - b = tf.get_variable( - "b", [1, 1, 1, num_filters], initializer=tf.zeros_initializer(), - collections=collections) - - if summary_tag is not None: - tf.summary.image( - summary_tag, - tf.transpose(tf.reshape(w, [filter_size[0], - filter_size[1], -1, 1]), - [2, 0, 1, 3]), - max_images=10) - - return tf.nn.conv2d(x, w, stride_shape, pad) + b - - -def dense(x, size, name, weight_init=None, bias=True): - w = tf.get_variable(name + "/w", [x.get_shape()[1], size], - initializer=weight_init) - ret = tf.matmul(x, w) - if bias: - b = tf.get_variable( - name + "/b", [size], initializer=tf.zeros_initializer()) - return ret + b - else: - return ret - - -def wndense(x, size, name, init_scale=1.0): - v = tf.get_variable( - name + "/V", [int(x.get_shape()[1]), size], - initializer=tf.random_normal_initializer(0, 0.05)) - g = tf.get_variable( - name + "/g", [size], initializer=tf.constant_initializer(init_scale)) - b = tf.get_variable( - name + "/b", [size], initializer=tf.constant_initializer(0.0)) - - # use weight normalization (Salimans & Kingma, 2016) - x = tf.matmul(x, v) - scaler = g / tf.sqrt(sum(tf.square(v), axis=0, keepdims=True)) - return tf.reshape(scaler, [1, size]) * x + tf.reshape(b, [1, size]) - - -def densenobias(x, size, name, weight_init=None): - return dense(x, size, name, weight_init=weight_init, bias=False) - - -def dropout(x, pkeep, phase=None, mask=None): - mask = tf.floor( - pkeep + tf.random_uniform(tf.shape(x))) if mask is None else mask - if phase is None: - return mask * x - else: - return switch(phase, mask * x, pkeep * x) - - -# ================================================================ -# Theano-like Function -# ================================================================ - - -def function(inputs, outputs, updates=None, givens=None): - """Just like Theano function. Take a bunch of tensorflow placeholders and - expressions computed based on those placeholders and produces f(inputs) -> - outputs. Function f takes values to be fed to the input's placeholders and - produces the values of the expressions in outputs. - - Input values can be passed in the same order as inputs or can be provided - as kwargs based on placeholder name (passed to constructor or accessible - via placeholder.op.name). - - Example: - x = tf.placeholder(tf.int32, (), name="x") - y = tf.placeholder(tf.int32, (), name="y") - z = 3 * x + 2 * y - lin = function([x, y], z, givens={y: 0}) - - with single_threaded_session(): - initialize() - - assert lin(2) == 6 - assert lin(x=3) == 9 - assert lin(2, 2) == 10 - assert lin(x=2, y=3) == 12 - - Parameters - ---------- - inputs: [tf.placeholder or TfInput] - list of input arguments - outputs: [tf.Variable] or tf.Variable - list of outputs or a single output to be returned from function. Returned - value will also have the same shape. - """ - if isinstance(outputs, list): - return _Function(inputs, outputs, updates, givens=givens) - elif isinstance(outputs, (dict, collections.OrderedDict)): - f = _Function(inputs, outputs.values(), updates, givens=givens) - return lambda *args, **kwargs: type(outputs)( - zip(outputs.keys(), f(*args, **kwargs))) - else: - f = _Function(inputs, [outputs], updates, givens=givens) - return lambda *args, **kwargs: f(*args, **kwargs)[0] - - -class _Function(object): - def __init__(self, inputs, outputs, updates, givens, check_nan=False): - for inpt in inputs: - if not issubclass(type(inpt), TfInput): - assert len(inpt.op.inputs) == 0, ( - "inputs should all be placeholders of " - "ray.rllib.dqn.common.TfInput") - self.inputs = inputs - updates = updates or [] - self.update_group = tf.group(*updates) - self.outputs_update = list(outputs) + [self.update_group] - self.givens = {} if givens is None else givens - self.check_nan = check_nan - - def _feed_input(self, feed_dict, inpt, value): - if issubclass(type(inpt), TfInput): - feed_dict.update(inpt.make_feed_dict(value)) - elif is_placeholder(inpt): - feed_dict[inpt] = value - - def __call__(self, *args, **kwargs): - assert len(args) <= len(self.inputs), "Too many arguments provided" - feed_dict = {} - # Update the args - for inpt, value in zip(self.inputs, args): - self._feed_input(feed_dict, inpt, value) - # Update the kwargs - kwargs_passed_inpt_names = set() - for inpt in self.inputs[len(args):]: - inpt_name = inpt.name.split(':')[0] - inpt_name = inpt_name.split('/')[-1] - assert inpt_name not in kwargs_passed_inpt_names, ( - "this function has two arguments with " - "the same name \"{}\", " + - "so kwargs cannot be used.".format(inpt_name)) - if inpt_name in kwargs: - kwargs_passed_inpt_names.add(inpt_name) - self._feed_input(feed_dict, inpt, kwargs.pop(inpt_name)) - else: - assert inpt in self.givens, "Missing argument " + inpt_name - assert len(kwargs) == 0, \ - "Function got extra arguments " + str(list(kwargs.keys())) - # Update feed dict with givens. - for inpt in self.givens: - feed_dict[inpt] = feed_dict.get(inpt, self.givens[inpt]) - results = get_session().run(self.outputs_update, - feed_dict=feed_dict)[:-1] - if self.check_nan: - if any(np.isnan(r).any() for r in results): - raise RuntimeError("Nan detected") - return results - - -def mem_friendly_function(nondata_inputs, data_inputs, outputs, batch_size): - if isinstance(outputs, list): - return _MemFriendlyFunction( - nondata_inputs, data_inputs, outputs, batch_size) - else: - f = _MemFriendlyFunction( - nondata_inputs, data_inputs, [outputs], batch_size) - return lambda *inputs: f(*inputs)[0] - - -class _MemFriendlyFunction(object): - def __init__(self, nondata_inputs, data_inputs, outputs, batch_size): - self.nondata_inputs = nondata_inputs - self.data_inputs = data_inputs - self.outputs = list(outputs) - self.batch_size = batch_size - - def __call__(self, *inputvals): - assert len(inputvals) == (len(self.nondata_inputs) + - len(self.data_inputs)) - nondata_vals = inputvals[0:len(self.nondata_inputs)] - data_vals = inputvals[len(self.nondata_inputs):] - feed_dict = dict(zip(self.nondata_inputs, nondata_vals)) - n = data_vals[0].shape[0] - for v in data_vals[1:]: - assert v.shape[0] == n - for i_start in range(0, n, self.batch_size): - slice_vals = [ - v[i_start:builtins.min(i_start + self.batch_size, n)] - for v in data_vals] - for (var, val) in zip(self.data_inputs, slice_vals): - feed_dict[var] = val - results = tf.get_default_session().run(self.outputs, - feed_dict=feed_dict) - if i_start == 0: - sum_results = results - else: - for i in range(len(results)): - sum_results[i] = sum_results[i] + results[i] - for i in range(len(results)): - sum_results[i] = sum_results[i] / n - return sum_results - -# ================================================================ -# Modules -# ================================================================ - - -class Module(object): - def __init__(self, name): - self.name = name - self.first_time = True - self.scope = None - self.cache = {} - - def __call__(self, *args): - if args in self.cache: - print("(%s) retrieving value from cache" % (self.name,)) - return self.cache[args] - with tf.variable_scope(self.name, reuse=not self.first_time): - scope = tf.get_variable_scope().name - if self.first_time: - self.scope = scope - print("(%s) running function for the first time" % - (self.name,)) - else: - assert self.scope == scope, \ - "Tried calling function with a different scope" - print("(%s) running function on new inputs" % (self.name,)) - self.first_time = False - out = self._call(*args) - self.cache[args] = out - return out - - def _call(self, *args): - raise NotImplementedError - - @property - def trainable_variables(self): - assert self.scope is not None, \ - "need to call module once before getting variables" - return tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, self.scope) - - @property - def variables(self): - assert self.scope is not None, \ - "need to call module once before getting variables" - return tf.get_collection(tf.GraphKeys.VARIABLES, self.scope) - - -def module(name): - @functools.wraps - def wrapper(f): - class WrapperModule(Module): - def _call(self, *args): - return f(*args) - return WrapperModule(name) - return wrapper - -# ================================================================ -# Graph traversal -# ================================================================ - - -VARIABLES = {} - - -def get_parents(node): - return node.op.inputs - - -def topsorted(outputs): - """ - Topological sort via non-recursive depth-first search - """ - assert isinstance(outputs, (list, tuple)) - marks = {} - out = [] - stack = [] # pylint: disable=W0621 - # i: node - # jidx = number of children visited so far from that node - # marks: state of each node, which is one of - # 0: haven't visited - # 1: have visited, but not done visiting children - # 2: done visiting children - for x in outputs: - stack.append((x, 0)) - while stack: - (i, jidx) = stack.pop() - if jidx == 0: - m = marks.get(i, 0) - if m == 0: - marks[i] = 1 - elif m == 1: - raise ValueError("not a dag") - else: - continue - ps = get_parents(i) - if jidx == len(ps): - marks[i] = 2 - out.append(i) - else: - stack.append((i, jidx + 1)) - j = ps[jidx] - stack.append((j, 0)) - return out - - -# ================================================================ -# Flat vectors -# ================================================================ - -def var_shape(x): - out = x.get_shape().as_list() - assert all(isinstance(a, int) for a in out), \ - "shape function assumes that shape is fully known" - return out - - -def numel(x): - return intprod(var_shape(x)) - - -def intprod(x): - return int(np.prod(x)) - - -def flatgrad(loss, var_list): - grads = tf.gradients(loss, var_list) - return tf.concat(axis=0, values=[ - tf.reshape(grad if grad is not None else tf.zeros_like(v), [numel(v)]) - for (v, grad) in zip(var_list, grads) - ]) - - -class SetFromFlat(object): - def __init__(self, var_list, dtype=tf.float32): - assigns = [] - shapes = list(map(var_shape, var_list)) - total_size = np.sum([intprod(shape) for shape in shapes]) - - self.theta = theta = tf.placeholder(dtype, [total_size]) - start = 0 - assigns = [] - for (shape, v) in zip(shapes, var_list): - size = intprod(shape) - assigns.append( - tf.assign(v, tf.reshape(theta[start:start + size], shape))) - start += size - self.op = tf.group(*assigns) - - def __call__(self, theta): - get_session().run(self.op, feed_dict={self.theta: theta}) - - -class GetFlat(object): - def __init__(self, var_list): - self.op = tf.concat( - axis=0, values=[tf.reshape(v, [numel(v)]) for v in var_list]) - - def __call__(self): - return get_session().run(self.op) - -# ================================================================ -# Misc -# ================================================================ - - -def fancy_slice_2d(X, inds0, inds1): - """ - like numpy X[inds0, inds1] - XXX this implementation is bad - """ - inds0 = tf.cast(inds0, tf.int64) - inds1 = tf.cast(inds1, tf.int64) - shape = tf.cast(tf.shape(X), tf.int64) - ncols = shape[1] - Xflat = tf.reshape(X, [-1]) - return tf.gather(Xflat, inds0 * ncols + inds1) - - -# ================================================================ -# Scopes -# ================================================================ - - -def scope_vars(scope, trainable_only=False): - """ - Get variables inside a scope - The scope can be specified as a string - - Parameters - ---------- - scope: str or VariableScope - scope in which the variables reside. - trainable_only: bool - whether or not to return only the variables that were marked as - trainable. - - Returns - ------- - vars: [tf.Variable] - list of variables in `scope`. - """ - return tf.get_collection( - tf.GraphKeys.TRAINABLE_VARIABLES - if trainable_only else tf.GraphKeys.VARIABLES, - scope=scope if isinstance(scope, str) else scope.name) - - -def scope_name(): - """Returns the name of current scope as a string, e.g. deepq/q_func""" - return tf.get_variable_scope().name - - -def absolute_scope_name(relative_scope_name): - """Appends parent scope name to `relative_scope_name`""" - return scope_name() + "/" + relative_scope_name - - -def lengths_to_mask(lengths_b, max_length): - """ - Turns a vector of lengths into a boolean mask - - Args: - lengths_b: an integer vector of lengths - max_length: maximum length to fill the mask - - Returns: - a boolean array of shape (batch_size, max_length) - row[i] consists of True repeated lengths_b[i] times, followed by False - """ - lengths_b = tf.convert_to_tensor(lengths_b) - assert lengths_b.get_shape().ndims == 1 - mask_bt = tf.expand_dims( - tf.range(max_length), 0) < tf.expand_dims(lengths_b, 1) - return mask_bt - - -def in_session(f): - @functools.wraps(f) - def newfunc(*args, **kwargs): - with tf.Session(): - f(*args, **kwargs) - return newfunc - - -_PLACEHOLDER_CACHE = {} # name -> (placeholder, dtype, shape) - - -def get_placeholder(name, dtype, shape): - if name in _PLACEHOLDER_CACHE: - out, dtype1, shape1 = _PLACEHOLDER_CACHE[name] - assert dtype1 == dtype and shape1 == shape - return out - else: - out = tf.placeholder(dtype=dtype, shape=shape, name=name) - _PLACEHOLDER_CACHE[name] = (out, dtype, shape) - return out - - -def get_placeholder_cached(name): - return _PLACEHOLDER_CACHE[name][0] - - -def flattenallbut0(x): - return tf.reshape(x, [-1, intprod(x.get_shape().as_list()[1:])]) - - -def reset(): - global _PLACEHOLDER_CACHE - global VARIABLES - _PLACEHOLDER_CACHE = {} - VARIABLES = {} - tf.reset_default_graph() diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py index b67e6dd90..0e2202226 100644 --- a/python/ray/rllib/dqn/dqn.py +++ b/python/ray/rllib/dqn/dqn.py @@ -9,63 +9,75 @@ import numpy as np import tensorflow as tf from ray.rllib.common import Algorithm, TrainingResult -from ray.rllib.dqn.build_graph import build_train from ray.rllib.dqn import logger, models from ray.rllib.dqn.common.atari_wrappers_deprecated \ import wrap_dqn, ScaledFloatFrame -from ray.rllib.dqn.common import tf_util as U from ray.rllib.dqn.common.schedules import LinearSchedule from ray.rllib.dqn.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer """The default configuration dict for the DQN algorithm. - lr: float - learning rate for adam optimizer - schedule_max_timesteps: int - max num timesteps for annealing schedules - timesteps_per_iteration: int - number of env steps to optimize for before returning - buffer_size: int - size of the replay buffer - exploration_fraction: float - fraction of entire training period over which the exploration rate is - annealed - exploration_final_eps: float - final value of random action probability - train_freq: int - update the model every `train_freq` steps. - batch_size: int - size of a batched sampled from replay buffer for training - print_freq: int - how often to print out training progress - set to None to disable printing - checkpoint_freq: int - how often to save the model. This is so that the best version is restored - at the end of the training. If you do not wish to restore the best version - at the end of the training set this variable to None. - learning_starts: int - how many steps of the model to collect transitions for before learning - starts - gamma: float - discount factor - target_network_update_freq: int - update the target network every `target_network_update_freq` steps. - prioritized_replay: True - if True prioritized replay buffer will be used. - prioritized_replay_alpha: float - alpha parameter for prioritized replay buffer - prioritized_replay_beta0: float - initial value of beta for prioritized replay buffer - prioritized_replay_beta_iters: int - number of iterations over which beta will be annealed from initial value - to 1.0. If set to None equals to schedule_max_timesteps - prioritized_replay_eps: float - epsilon to add to the TD errors when updating priorities. - num_cpu: int - number of cpus to use for training + dueling: bool + whether to use dueling dqn + double_q: bool + whether to use double dqn + hiddens: array + hidden layer sizes of the state and action value networks + model_config: dict + config options to pass to the model constructor + lr: float + learning rate for adam optimizer + schedule_max_timesteps: int + max num timesteps for annealing schedules + timesteps_per_iteration: int + number of env steps to optimize for before returning + buffer_size: int + size of the replay buffer + exploration_fraction: float + fraction of entire training period over which the exploration rate is + annealed + exploration_final_eps: float + final value of random action probability + train_freq: int + update the model every `train_freq` steps. + batch_size: int + size of a batched sampled from replay buffer for training + print_freq: int + how often to print out training progress + set to None to disable printing + checkpoint_freq: int + how often to save the model. This is so that the best version is + restored at the end of the training. If you do not wish to restore + the best version at the end of the training set this variable to None. + learning_starts: int + how many steps of the model to collect transitions for before learning + starts + gamma: float + discount factor + grad_norm_clipping: int or None + if not None, clip gradients during optimization at this value + target_network_update_freq: int + update the target network every `target_network_update_freq` steps. + prioritized_replay: True + if True prioritized replay buffer will be used. + prioritized_replay_alpha: float + alpha parameter for prioritized replay buffer + prioritized_replay_beta0: float + initial value of beta for prioritized replay buffer + prioritized_replay_beta_iters: int + number of iterations over which beta will be annealed from initial + value to 1.0. If set to None equals to schedule_max_timesteps + prioritized_replay_eps: float + epsilon to add to the TD errors when updating priorities. + num_cpu: int + number of cpus to use for training """ DEFAULT_CONFIG = dict( + dueling=True, + double_q=True, + hiddens=[256], + model_config={}, lr=5e-4, schedule_max_timesteps=100000, timesteps_per_iteration=1000, @@ -78,6 +90,7 @@ DEFAULT_CONFIG = dict( checkpoint_freq=10000, learning_starts=1000, gamma=1.0, + grad_norm_clipping=10, target_network_update_freq=500, prioritized_replay=False, prioritized_replay_alpha=0.6, @@ -92,24 +105,18 @@ class DQN(Algorithm): config.update({"alg": "DQN"}) Algorithm.__init__(self, env_name, config, upload_dir=upload_dir) env = gym.make(env_name) - env = ScaledFloatFrame(wrap_dqn(env)) + # TODO(ekl): replace this with RLlib preprocessors + if "NoFrameskip" in env_name: + env = ScaledFloatFrame(wrap_dqn(env)) self.env = env - model = models.cnn_to_mlp( - convs=[(32, 8, 4), (64, 4, 2), (64, 3, 1)], - hiddens=[256], dueling=True) - sess = U.make_session(num_cpu=config["num_cpu"]) - sess.__enter__() - def make_obs_ph(name): - return U.BatchInput(env.observation_space.shape, name=name) + num_cpu = config["num_cpu"] + tf_config = tf.ConfigProto( + inter_op_parallelism_threads=num_cpu, + intra_op_parallelism_threads=num_cpu) + self.sess = tf.Session(config=tf_config) + self.dqn_graph = models.DQNGraph(env, config) - self.act, self.optimize, self.update_target, self.debug = build_train( - make_obs_ph=make_obs_ph, - q_func=model, - num_actions=env.action_space.n, - optimizer=tf.train.AdamOptimizer(learning_rate=config["lr"]), - gamma=config["gamma"], - grad_norm_clipping=10) # Create the replay buffer if config["prioritized_replay"]: self.replay_buffer = PrioritizedReplayBuffer( @@ -136,8 +143,8 @@ class DQN(Algorithm): final_p=config["exploration_final_eps"]) # Initialize the parameters and copy them to the target network. - U.initialize() - self.update_target() + self.sess.run(tf.global_variables_initializer()) + self.dqn_graph.update_target(self.sess) self.episode_rewards = [0.0] self.episode_lengths = [0.0] @@ -145,18 +152,19 @@ class DQN(Algorithm): self.obs = self.env.reset() self.num_timesteps = 0 self.num_iterations = 0 + self.file_writer = tf.summary.FileWriter(self.logdir, self.sess.graph) def train(self): config = self.config sample_time, learn_time = 0, 0 - for t in range(config["timesteps_per_iteration"]): + for _ in range(config["timesteps_per_iteration"]): self.num_timesteps += 1 dt = time.time() # Take action and update exploration to the newest value - action = self.act( - np.array(self.obs)[None], - update_eps=self.exploration.value(t))[0] + action = self.dqn_graph.act( + self.sess, np.array(self.obs)[None], + self.exploration.value(self.num_timesteps))[0] new_obs, rew, done, _ = self.env.step(action) # Store transition in the replay buffer. self.replay_buffer.add(self.obs, action, rew, new_obs, float(done)) @@ -177,28 +185,29 @@ class DQN(Algorithm): # from replay buffer. if config["prioritized_replay"]: experience = self.replay_buffer.sample( - config["batch_size"], beta=self.beta_schedule.value(t)) + config["batch_size"], + beta=self.beta_schedule.value(self.num_timesteps)) (obses_t, actions, rewards, obses_tp1, dones, _, batch_idxes) = experience else: - obses_t, actions, rewards, obses_tp1, dones = \ - self.replay_buffer.sample(config["batch_size"]) + obses_t, actions, rewards, obses_tp1, dones = ( + self.replay_buffer.sample(config["batch_size"])) batch_idxes = None - td_errors = self.optimize( - obses_t, actions, rewards, obses_tp1, dones, + td_errors = self.dqn_graph.train( + self.sess, obses_t, actions, rewards, obses_tp1, dones, np.ones_like(rewards)) if config["prioritized_replay"]: - new_priorities = (np.abs(td_errors) + - config["prioritized_replay_eps"]) - self.replay_buffer.update_priorities(batch_idxes, - new_priorities) + new_priorities = np.abs(td_errors) + ( + config["prioritized_replay_eps"]) + self.replay_buffer.update_priorities( + batch_idxes, new_priorities) learn_time += (time.time() - dt) - if (self.num_timesteps > config["learning_starts"] and + if self.num_timesteps > config["learning_starts"] and ( self.num_timesteps % config["target_network_update_freq"] == 0): # Update target network periodically. - self.update_target() + self.dqn_graph.update_target(self.sess) mean_100ep_reward = round(np.mean(self.episode_rewards[-101:-1]), 1) mean_100ep_length = round(np.mean(self.episode_lengths[-101:-1]), 1) @@ -209,16 +218,19 @@ class DQN(Algorithm): "learn_time": learn_time, "steps": self.num_timesteps, "episodes": num_episodes, - "exploration": int(100 * self.exploration.value(t)) + "exploration": int( + 100 * self.exploration.value(self.num_timesteps)) } logger.record_tabular("sample_time", sample_time) logger.record_tabular("learn_time", learn_time) logger.record_tabular("steps", self.num_timesteps) + logger.record_tabular("buffer_size", len(self.replay_buffer)) logger.record_tabular("episodes", num_episodes) logger.record_tabular("mean 100 episode reward", mean_100ep_reward) logger.record_tabular( - "% time spent exploring", int(100 * self.exploration.value(t))) + "% time spent exploring", + int(100 * self.exploration.value(self.num_timesteps))) logger.dump_tabular() res = TrainingResult( diff --git a/python/ray/rllib/dqn/example-cartpole.py b/python/ray/rllib/dqn/example-cartpole.py new file mode 100755 index 000000000..780a25569 --- /dev/null +++ b/python/ray/rllib/dqn/example-cartpole.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import tensorflow as tf + +import ray +from ray.rllib.dqn import DQN, DEFAULT_CONFIG + + +def main(): + parser = argparse.ArgumentParser(description="Run the A3C algorithm.") + parser.add_argument("--iterations", default=-1, type=int, + help="The number of training iterations to run.") + + args = parser.parse_args() + + config = DEFAULT_CONFIG.copy() + config.update(dict( + lr=1e-3, + schedule_max_timesteps=100000, + exploration_fraction=0.1, + exploration_final_eps=0.02, + dueling=False, + hiddens=[], + model_config=dict( + fcnet_hiddens=[64], + fcnet_activation=tf.nn.relu + ))) + + # Currently Ray is not used in this example, but we need to call ray.init + # to create the directory in which logging will occur. TODO(rkn): Fix this. + ray.init() + + dqn = DQN("CartPole-v0", config) + + iteration = 0 + while iteration != args.iterations: + iteration += 1 + res = dqn.train() + print("current status: {}".format(res)) + + +if __name__ == "__main__": + main() diff --git a/python/ray/rllib/dqn/logger.py b/python/ray/rllib/dqn/logger.py index ea9957c4a..25bf8bb12 100644 --- a/python/ray/rllib/dqn/logger.py +++ b/python/ray/rllib/dqn/logger.py @@ -140,11 +140,15 @@ record_tabular = logkv dump_tabular = dumpkvs -def log(*args, level=INFO): +def log(*args, **kwargs): """ Write the sequence of args, with no separators, to the console and output files (if you've configured an output file). """ + if "level" in kwargs: + level = kwargs["level"] + else: + level = INFO Logger.CURRENT.log(*args, level=level) @@ -216,7 +220,11 @@ class Logger(object): fmt.writekvs(self.name2val) self.name2val.clear() - def log(self, *args, level=INFO): + def log(self, *args, **kwargs): + if "level" in kwargs: + level = kwargs["level"] + else: + level = INFO if self.level <= level: self._do_log(args) diff --git a/python/ray/rllib/dqn/models.py b/python/ray/rllib/dqn/models.py index fa8917156..c3f4a08eb 100644 --- a/python/ray/rllib/dqn/models.py +++ b/python/ray/rllib/dqn/models.py @@ -5,94 +5,209 @@ from __future__ import print_function import tensorflow as tf import tensorflow.contrib.layers as layers +from ray.rllib.models import ModelCatalog -def _mlp(hiddens, inpt, num_actions, scope, reuse=False): - with tf.variable_scope(scope, reuse=reuse): - out = inpt + +def _build_q_network(inputs, num_actions, config): + dueling = config["dueling"] + hiddens = config["hiddens"] + frontend = ModelCatalog.get_model(inputs, 1, config["model_config"]) + frontend_out = frontend.last_layer + + with tf.variable_scope("action_value"): + action_out = frontend_out for hidden in hiddens: - out = layers.fully_connected( - out, num_outputs=hidden, activation_fn=tf.nn.relu) - out = layers.fully_connected( - out, num_outputs=num_actions, activation_fn=None) - return out + action_out = layers.fully_connected( + action_out, num_outputs=hidden, activation_fn=tf.nn.relu) + action_scores = layers.fully_connected( + action_out, num_outputs=num_actions, activation_fn=None) - -def mlp(hiddens=[]): - """This model takes as input an observation and returns values of all - actions. - - Parameters - ---------- - hiddens: [int] - list of sizes of hidden layers - - Returns - ------- - q_func: function - q_function for DQN algorithm. - """ - return lambda *args, **kwargs: _mlp(hiddens, *args, **kwargs) - - -def _cnn_to_mlp( - convs, hiddens, dueling, inpt, num_actions, scope, reuse=False): - - with tf.variable_scope(scope, reuse=reuse): - out = inpt - with tf.variable_scope("convnet"): - for num_outputs, kernel_size, stride in convs: - out = layers.convolution2d( - out, - num_outputs=num_outputs, - kernel_size=kernel_size, - stride=stride, - activation_fn=tf.nn.relu) - out = layers.flatten(out) - with tf.variable_scope("action_value"): - action_out = out + if dueling: + with tf.variable_scope("state_value"): + state_out = frontend_out for hidden in hiddens: - action_out = layers.fully_connected( - action_out, num_outputs=hidden, activation_fn=tf.nn.relu) - action_scores = layers.fully_connected( - action_out, num_outputs=num_actions, activation_fn=None) - - if dueling: - with tf.variable_scope("state_value"): - state_out = out - for hidden in hiddens: - state_out = layers.fully_connected( - state_out, num_outputs=hidden, - activation_fn=tf.nn.relu) - state_score = layers.fully_connected( - state_out, num_outputs=1, activation_fn=None) - action_scores_mean = tf.reduce_mean(action_scores, 1) - action_scores_centered = action_scores - tf.expand_dims( - action_scores_mean, 1) - return state_score + action_scores_centered - else: - return action_scores - return out + state_out = layers.fully_connected( + state_out, num_outputs=hidden, activation_fn=tf.nn.relu) + state_score = layers.fully_connected( + state_out, num_outputs=1, activation_fn=None) + action_scores_mean = tf.reduce_mean(action_scores, 1) + action_scores_centered = action_scores - tf.expand_dims( + action_scores_mean, 1) + return state_score + action_scores_centered + else: + return action_scores -def cnn_to_mlp(convs, hiddens, dueling=False): - """This model takes an observation and returns values for all actions. +def _build_action_network( + q_values, observations, num_actions, stochastic, eps): + deterministic_actions = tf.argmax(q_values, axis=1) + batch_size = tf.shape(observations)[0] + random_actions = tf.random_uniform( + tf.stack([batch_size]), minval=0, maxval=num_actions, dtype=tf.int64) + chose_random = tf.random_uniform( + tf.stack([batch_size]), minval=0, maxval=1, dtype=tf.float32) < eps + stochastic_actions = tf.where( + chose_random, random_actions, deterministic_actions) + return tf.cond( + stochastic, lambda: stochastic_actions, + lambda: deterministic_actions) + + +def _huber_loss(x, delta=1.0): + """Reference: https://en.wikipedia.org/wiki/Huber_loss""" + return tf.where( + tf.abs(x) < delta, + tf.square(x) * 0.5, + delta * (tf.abs(x) - 0.5 * delta)) + + +def _minimize_and_clip(optimizer, objective, var_list, clip_val=10): + """Minimized `objective` using `optimizer` w.r.t. variables in + `var_list` while ensure the norm of the gradients for each + variable is clipped to `clip_val` + """ + gradients = optimizer.compute_gradients(objective, var_list=var_list) + for i, (grad, var) in enumerate(gradients): + if grad is not None: + gradients[i] = (tf.clip_by_norm(grad, clip_val), var) + return optimizer.apply_gradients(gradients) + + +def _scope_vars(scope, trainable_only=False): + """ + Get variables inside a scope + The scope can be specified as a string Parameters ---------- - convs: [(int, int int)] - list of convolutional layers in form of - (num_outputs, kernel_size, stride) - hiddens: [int] - list of sizes of hidden layers - dueling: bool - if true double the output MLP to compute a baseline - for action scores + scope: str or VariableScope + scope in which the variables reside. + trainable_only: bool + whether or not to return only the variables that were marked as + trainable. Returns ------- - q_func: function - q_function for DQN algorithm. + vars: [tf.Variable] + list of variables in `scope`. """ + return tf.get_collection( + tf.GraphKeys.TRAINABLE_VARIABLES + if trainable_only else tf.GraphKeys.VARIABLES, + scope=scope if isinstance(scope, str) else scope.name) - return lambda *args, **kwargs: _cnn_to_mlp( - convs, hiddens, dueling, *args, **kwargs) + +class DQNGraph(object): + def __init__(self, env, config): + self.env = env + num_actions = env.action_space.n + optimizer = tf.train.AdamOptimizer(learning_rate=config["lr"]) + + # Action inputs + self.stochastic = tf.placeholder(tf.bool, (), name="stochastic") + self.eps = tf.placeholder(tf.float32, (), name="eps") + self.cur_observations = tf.placeholder( + tf.float32, shape=(None,) + env.observation_space.shape) + + # Action Q network + with tf.variable_scope("q_func") as scope: + q_values = _build_q_network( + self.cur_observations, num_actions, config) + q_func_vars = _scope_vars(scope.name) + + # Action outputs + self.output_actions = _build_action_network( + q_values, + self.cur_observations, + num_actions, + self.stochastic, + self.eps) + + # Replay inputs + self.obs_t = tf.placeholder( + tf.float32, shape=(None,) + env.observation_space.shape) + self.act_t = tf.placeholder(tf.int32, [None], name="action") + self.rew_t = tf.placeholder(tf.float32, [None], name="reward") + self.obs_tp1 = tf.placeholder( + tf.float32, shape=(None,) + env.observation_space.shape) + self.done_mask = tf.placeholder(tf.float32, [None], name="done") + self.importance_weights = tf.placeholder( + tf.float32, [None], name="weight") + + # q network evaluation + with tf.variable_scope("q_func", reuse=True): + self.q_t = _build_q_network(self.obs_t, num_actions, config) + + # target q network evalution + with tf.variable_scope("target_q_func") as scope: + self.q_tp1 = _build_q_network(self.obs_tp1, num_actions, config) + target_q_func_vars = _scope_vars(scope.name) + + # q scores for actions which we know were selected in the given state. + q_t_selected = tf.reduce_sum( + self.q_t * tf.one_hot(self.act_t, num_actions), 1) + + # compute estimate of best possible value starting from state at t + 1 + if config["double_q"]: + with tf.variable_scope("q_func", reuse=True): + q_tp1_using_online_net = _build_q_network( + self.obs_tp1, num_actions, config) + q_tp1_best_using_online_net = tf.arg_max(q_tp1_using_online_net, 1) + q_tp1_best = tf.reduce_sum( + self.q_tp1 * tf.one_hot( + q_tp1_best_using_online_net, num_actions), 1) + else: + q_tp1_best = tf.reduce_max(self.q_tp1, 1) + q_tp1_best_masked = (1.0 - self.done_mask) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = self.rew_t + config["gamma"] * q_tp1_best_masked + + # compute the error (potentially clipped) + self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) + errors = _huber_loss(self.td_error) + weighted_error = tf.reduce_mean(self.importance_weights * errors) + # compute optimization op (potentially with gradient clipping) + if config["grad_norm_clipping"] is not None: + self.optimize_expr = _minimize_and_clip( + optimizer, weighted_error, var_list=q_func_vars, + clip_val=config["grad_norm_clipping"]) + else: + self.optimize_expr = optimizer.minimize( + weighted_error, var_list=q_func_vars) + + # update_target_fn will be called periodically to copy Q network to + # target Q network + update_target_expr = [] + for var, var_target in zip( + sorted(q_func_vars, key=lambda v: v.name), + sorted(target_q_func_vars, key=lambda v: v.name)): + update_target_expr.append(var_target.assign(var)) + self.update_target_expr = tf.group(*update_target_expr) + + def update_target(self, sess): + return sess.run(self.update_target_expr) + + def act(self, sess, obs, eps, stochastic=True): + return sess.run( + self.output_actions, + feed_dict={ + self.cur_observations: obs, + self.stochastic: stochastic, + self.eps: eps, + }) + + def train( + self, sess, obs_t, act_t, rew_t, obs_tp1, done_mask, + importance_weights): + td_err, _ = sess.run( + [self.td_error, self.optimize_expr], + feed_dict={ + self.obs_t: obs_t, + self.act_t: act_t, + self.rew_t: rew_t, + self.obs_tp1: obs_tp1, + self.done_mask: done_mask, + self.importance_weights: importance_weights + }) + return td_err diff --git a/python/ray/rllib/example.py b/python/ray/rllib/example.py deleted file mode 100755 index 03ce00d55..000000000 --- a/python/ray/rllib/example.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python -"""Demonstrates the RLlib algorithm API through a simple bakeoff.""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import ray -import ray.rllib.evolution_strategies as es -import ray.rllib.policy_gradient as pg - - -if __name__ == "__main__": - ray.init() - - # TODO(ekl): get the algorithms working on a common set of envs - env_name = "CartPole-v0" - alg1 = es.EvolutionStrategies(env_name, es.DEFAULT_CONFIG) - alg2 = pg.PolicyGradient(env_name, pg.DEFAULT_CONFIG) - - while True: - r1 = alg1.train() - r2 = alg2.train() - print("evolution strategies: {}".format(r1)) - print("policy gradient: {}".format(r2)) diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index 5874c70cc..1c1de23cf 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -44,23 +44,27 @@ class ModelCatalog(object): "Unsupported args: {} {}".format(action_space, dist_type)) @staticmethod - def get_model(inputs, num_outputs): + def get_model(inputs, num_outputs, options=None): """Returns a suitable model conforming to given input and output specs. Args: inputs (Tensor): The input tensor to the model. num_outputs (int): The size of the output vector of the model. + options (dict): Optional args to pass to the model constructor. Returns: model (Model): Neural network model. """ + if options is None: + options = {} + obs_rank = len(inputs.get_shape()) - 1 if obs_rank > 1: - return VisionNetwork(inputs, num_outputs) + return VisionNetwork(inputs, num_outputs, options) - return FullyConnectedNetwork(inputs, num_outputs) + return FullyConnectedNetwork(inputs, num_outputs, options) @staticmethod def get_preprocessor(env_name): diff --git a/python/ray/rllib/models/fcnet.py b/python/ray/rllib/models/fcnet.py index 9237bbb67..34d599aea 100644 --- a/python/ray/rllib/models/fcnet.py +++ b/python/ray/rllib/models/fcnet.py @@ -21,17 +21,23 @@ def normc_initializer(std=1.0): class FullyConnectedNetwork(Model): """Generic fully connected network.""" - def _init(self, inputs, num_outputs): + def _init(self, inputs, num_outputs, options): + hiddens = options.get("fcnet_hiddens", [256, 256]) + activation = options.get("fcnet_activation", tf.nn.tanh) + print("Constructing fcnet {} {}".format(hiddens, activation)) + with tf.name_scope("fc_net"): - fc1 = slim.fully_connected( - inputs, 256, weights_initializer=normc_initializer(1.0), - activation_fn=tf.nn.tanh, - scope="fc1") - fc2 = slim.fully_connected( - fc1, 256, weights_initializer=normc_initializer(1.0), - activation_fn=tf.nn.tanh, - scope="fc2") - fc3 = slim.fully_connected( - fc2, num_outputs, weights_initializer=normc_initializer(0.01), - activation_fn=None, scope="fc3") - return fc3 + i = 1 + last_layer = inputs + for size in hiddens: + last_layer = slim.fully_connected( + last_layer, size, + weights_initializer=normc_initializer(1.0), + activation_fn=activation, + scope="fc{}".format(i)) + i += 1 + output = slim.fully_connected( + last_layer, num_outputs, + weights_initializer=normc_initializer(0.01), + activation_fn=None, scope="fc_out") + return output, last_layer diff --git a/python/ray/rllib/models/model.py b/python/ray/rllib/models/model.py index 483923c82..56cf5e866 100644 --- a/python/ray/rllib/models/model.py +++ b/python/ray/rllib/models/model.py @@ -9,20 +9,21 @@ class Model(object): Models convert input tensors to a number of output features. These features can then be interpreted by ActionDistribution classes to determine e.g. agent action values. + + The last layer of the network can also be retrieved if the algorithm + needs to further post-processing (e.g. Actor and Critic networks in A3C). + + Attributes: + inputs (Tensor): The input placeholder for this model. + outputs (Tensor): The output vector of this model. + last_layer (Tensor): The network layer right before the model output. """ - def __init__(self, inputs, num_outputs): + def __init__(self, inputs, num_outputs, options): self.inputs = inputs - self.outputs = self._init(inputs, num_outputs) + self.outputs, self.last_layer = self._init( + inputs, num_outputs, options) def _init(self): - """Initializes the model given self.inputs and self.num_outputs.""" + """Builds and returns the output and last layer of the network.""" raise NotImplementedError - - def inputs(self): - """Returns the input placeholder for this model.""" - return self.inputs - - def outputs(self): - """Returns the output tensor of this model.""" - return self.outputs diff --git a/python/ray/rllib/models/visionnet.py b/python/ray/rllib/models/visionnet.py index 3d4c43132..126b7610e 100644 --- a/python/ray/rllib/models/visionnet.py +++ b/python/ray/rllib/models/visionnet.py @@ -11,7 +11,7 @@ from ray.rllib.models.model import Model class VisionNetwork(Model): """Generic vision network.""" - def _init(self, inputs, num_outputs): + def _init(self, inputs, num_outputs, options): with tf.name_scope("vision_net"): conv1 = slim.conv2d(inputs, 16, [8, 8], 4, scope="conv1") conv2 = slim.conv2d(conv1, 32, [4, 4], 2, scope="conv2") @@ -19,4 +19,4 @@ class VisionNetwork(Model): conv2, 512, [10, 10], padding="VALID", scope="fc1") fc2 = slim.conv2d(fc1, num_outputs, [1, 1], activation_fn=None, normalizer_fn=None, scope="fc2") - return tf.squeeze(fc2, [1, 2]) + return tf.squeeze(fc2, [1, 2]), tf.squeeze(fc1, [1, 2]) diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index df5102901..094474662 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -69,3 +69,7 @@ docker run --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/evolution_strategies/example.py \ --env-name=Pendulum-v0 \ --iterations=2 + +docker run --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/dqn/example-cartpole.py \ + --iterations=2