diff --git a/ci/jenkins_tests/run_rllib_tests.sh b/ci/jenkins_tests/run_rllib_tests.sh index 13036ae7d..273c6fbcd 100644 --- a/ci/jenkins_tests/run_rllib_tests.sh +++ b/ci/jenkins_tests/run_rllib_tests.sh @@ -313,6 +313,15 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_external_multi_agent_env.py +docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + /ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_keras_model.py --run=A2C --stop=50 + +docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + /ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_keras_model.py --run=PPO --stop=50 + +docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + /ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_keras_model.py --run=DQN --stop=50 + docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/python/ray/rllib/examples/parametric_action_cartpole.py --run=PG --stop=50 @@ -395,12 +404,26 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/python/ray/rllib/examples/eager_execution.py --iters=2 +docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + /ray/ci/suppress_output /ray/python/ray/rllib/train.py \ + --env CartPole-v0 \ + --run PG \ + --stop '{"training_iteration": 1}' \ + --config '{"use_eager": true}' + +docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + /ray/ci/suppress_output /ray/python/ray/rllib/train.py \ + --env CartPole-v0 \ + --run A2C \ + --stop '{"training_iteration": 1}' \ + --config '{"use_eager": true}' + docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run PPO \ --stop '{"training_iteration": 1}' \ - --config '{"use_eager": true, "simple_optimizer": true}' + --config '{"use_eager": true}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_tf_policy.py --iters=2 diff --git a/ci/travis/format.sh b/ci/travis/format.sh index 2ef141532..9089404f6 100755 --- a/ci/travis/format.sh +++ b/ci/travis/format.sh @@ -5,6 +5,12 @@ # Cause the script to exit if a single command fails set -eo pipefail +ver=$(yapf --version) +if ! echo $ver | grep -q 0.23.0; then + echo "Wrong YAPF version installed: 0.23.0 is required, not $ver" + exit 1 +fi + # this stops git rev-parse from failing if we run this from the .git directory builtin cd "$(dirname "${BASH_SOURCE:-$0}")" @@ -17,7 +23,7 @@ if ! [[ -e "$ROOT/.git/refs/remotes/upstream" ]]; then fi # Only fetch master since that's the branch we're diffing against. -git fetch upstream master +git fetch upstream master || true YAPF_FLAGS=( '--style' "$ROOT/.style.yapf" diff --git a/doc/source/rllib-concepts.rst b/doc/source/rllib-concepts.rst index 4b00f5636..9b9cdbbb2 100644 --- a/doc/source/rllib-concepts.rst +++ b/doc/source/rllib-concepts.rst @@ -230,22 +230,22 @@ The ``choose_policy_optimizer`` function chooses which `Policy Optimizer <#polic standardize_fields=["advantages"], straggler_mitigation=config["straggler_mitigation"]) -Suppose we want to customize PPO to use an asynchronous-gradient optimization strategy similar to A3C. To do that, we could define a new function that returns ``AsyncGradientsOptimizer`` and pass in ``make_policy_optimizer=make_async_optimizer`` when building the trainer: +Suppose we want to customize PPO to use an asynchronous-gradient optimization strategy similar to A3C. To do that, we could define a new function that returns ``AsyncGradientsOptimizer`` and override the ``make_policy_optimizer`` component of ``PPOTrainer``. .. code-block:: python - from ray.rllib.agents.ppo.ppo_policy import * + from ray.rllib.agents.ppo import PPOTrainer from ray.rllib.optimizers import AsyncGradientsOptimizer - from ray.rllib.policy.tf_policy_template import build_tf_policy def make_async_optimizer(workers, config): return AsyncGradientsOptimizer(workers, grads_per_step=100) - PPOTrainer = build_trainer( - ..., + CustomTrainer = PPOTrainer.with_updates( make_policy_optimizer=make_async_optimizer) +The ``with_updates`` method that we use here is also available for Torch and TF policies built from templates. + Now let's take a look at the ``update_kl`` function. This is used to adaptively adjust the KL penalty coefficient on the PPO loss, which bounds the policy change per training step. You'll notice the code handles both single and multi-agent cases (where there are be multiple policies each with different KL coeffs): .. code-block:: python @@ -342,7 +342,69 @@ In PPO we run ``setup_mixins`` before the loss function is called (i.e., ``befor **Example 2: Deep Q Networks** -(todo) +Let's look at how to implement a different family of policies, by looking at the `SimpleQ policy definition `__: + +.. code-block:: python + + SimpleQPolicy = build_tf_policy( + name="SimpleQPolicy", + get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, + make_model=build_q_models, + action_sampler_fn=build_action_sampler, + loss_fn=build_q_losses, + extra_action_feed_fn=exploration_setting_inputs, + extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values}, + extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error}, + before_init=setup_early_mixins, + after_init=setup_late_mixins, + obs_include_prev_action_reward=False, + mixins=[ + ExplorationStateMixin, + TargetNetworkMixin, + ]) + +The biggest difference from the policy gradient policies you saw previously is that SimpleQPolicy defines its own ``make_model`` and ``action_sampler_fn``. This means that the policy builder will not internally create a model and action distribution, rather it will call ``build_q_models`` and ``build_action_sampler`` to get the output action tensors. + +The model creation function actually creates two different models for DQN: the base Q network, and also a target network. It requires each model to be of type ``SimpleQModel``, which implements a ``get_q_values()`` method. The model catalog will raise an error if you try to use a custom ModelV2 model that isn't a subclass of SimpleQModel. Similarly, the full DQN policy requires models to subclass ``DistributionalQModel``, which implements ``get_q_value_distributions()`` and ``get_state_value()``: + +.. code-block:: python + + def build_q_models(policy, obs_space, action_space, config): + ... + + policy.q_model = ModelCatalog.get_model_v2( + obs_space, + action_space, + num_outputs, + config["model"], + framework="tf", + name=Q_SCOPE, + model_interface=SimpleQModel, + q_hiddens=config["hiddens"]) + + policy.target_q_model = ModelCatalog.get_model_v2( + obs_space, + action_space, + num_outputs, + config["model"], + framework="tf", + name=Q_TARGET_SCOPE, + model_interface=SimpleQModel, + q_hiddens=config["hiddens"]) + + return policy.q_model + +The action sampler is straightforward, it just takes the q_model, runs a forward pass, and returns the argmax over the actions: + +.. code-block:: python + + def build_action_sampler(policy, q_model, input_dict, obs_space, action_space, + config): + # do max over Q values... + ... + return action, action_prob + +The remainder of DQN is similar to other algorithms. Target updates are handled by a ``after_optimizer_step`` callback that periodically copies the weights of the Q network to the target. Finally, note that you do not have to use ``build_tf_policy`` to define a TensorFlow policy. You can alternatively subclass ``Policy``, ``TFPolicy``, or ``DynamicTFPolicy`` as convenient. @@ -375,7 +437,7 @@ While RLlib runs all TF operations in graph mode, you can still leverage TensorF You can find a runnable file for the above eager execution example `here `__. -There is also experimental support for running the entire loss function in eager mode. This can be enabled with ``use_eager: True``, e.g., ``rllib train --env=CartPole-v0 --run=PPO --config='{"use_eager": true, "simple_optimizer": true}'``. However this currently only works for a couple algorithms. +There is also experimental support for running the entire loss function in eager mode. This can be enabled with ``use_eager: True``, e.g., ``rllib train --env=CartPole-v0 --run=PPO --config='{"use_eager": true}'``. However this currently only works for PG, A2C, and PPO. Building Policies in PyTorch ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/rllib-examples.rst b/doc/source/rllib-examples.rst index 604abf394..836442d57 100644 --- a/doc/source/rllib-examples.rst +++ b/doc/source/rllib-examples.rst @@ -73,7 +73,7 @@ Community Examples Example of training robotic control policies in SageMaker with RLlib. - `StarCraft2 `__: Example of training in StarCraft2 maps with RLlib / multi-agent. -- `NeuroCuts `__: +- `NeuroCuts `__: Example of building packet classification trees using RLlib / multi-agent in a bandit-like setting. - `Sequential Social Dilemma Games `__: Example of using the multi-agent API to model several `social dilemma games `__. diff --git a/doc/source/rllib-models.rst b/doc/source/rllib-models.rst index 6a05e5b1c..b1f933aee 100644 --- a/doc/source/rllib-models.rst +++ b/doc/source/rllib-models.rst @@ -184,14 +184,14 @@ You can use ``tf.layers.batch_normalization(x, training=input_dict["is_training" Custom Models (PyTorch) ----------------------- -Similarly, you can create and register custom PyTorch models for use with PyTorch-based algorithms (e.g., A2C, PG, QMIX). See these examples of `fully connected `__, `convolutional `__, and `recurrent `__ torch models. +Similarly, you can create and register custom PyTorch models for use with PyTorch-based algorithms (e.g., A2C, PG, QMIX). See these examples of `fully connected `__, `convolutional `__, and `recurrent `__ torch models. .. code-block:: python import ray from ray.rllib.agents import a3c from ray.rllib.models import ModelCatalog - from ray.rllib.models.pytorch.model import TorchModel + from ray.rllib.models.torch.model import TorchModel class CustomTorchModel(TorchModel): diff --git a/python/ray/rllib/agents/a3c/a3c_tf_policy.py b/python/ray/rllib/agents/a3c/a3c_tf_policy.py index d05f496a7..2104d0b90 100644 --- a/python/ray/rllib/agents/a3c/a3c_tf_policy.py +++ b/python/ray/rllib/agents/a3c/a3c_tf_policy.py @@ -56,7 +56,7 @@ def postprocess_advantages(policy, last_r = 0.0 else: next_state = [] - for i in range(len(policy.model.state_in)): + for i in range(len(policy.state_in)): next_state.append([sample_batch["state_out_{}".format(i)][-1]]) last_r = policy._value(sample_batch[SampleBatch.NEXT_OBS][-1], sample_batch[SampleBatch.ACTIONS][-1], @@ -79,11 +79,11 @@ class ValueNetworkMixin(object): self.get_placeholder(SampleBatch.CUR_OBS): [ob], self.get_placeholder(SampleBatch.PREV_ACTIONS): [prev_action], self.get_placeholder(SampleBatch.PREV_REWARDS): [prev_reward], - self.model.seq_lens: [1] + self.seq_lens: [1] } - assert len(args) == len(self.model.state_in), \ - (args, self.model.state_in) - for k, v in zip(self.model.state_in, args): + assert len(args) == len(self.state_in), \ + (args, self.state_in) + for k, v in zip(self.state_in, args): feed_dict[k] = v vf = self.get_session().run(self.vf, feed_dict) return vf[0] @@ -91,10 +91,11 @@ class ValueNetworkMixin(object): def stats(policy, batch_tensors): return { - "cur_lr": tf.cast(policy.cur_lr, tf.float64), + "cur_lr": tf.cast(policy.convert_to_eager(policy.cur_lr), tf.float64), "policy_loss": policy.loss.pi_loss, "policy_entropy": policy.loss.entropy, - "var_gnorm": tf.global_norm(policy.var_list), + "var_gnorm": tf.global_norm( + [policy.convert_to_eager(x) for x in policy.var_list]), "vf_loss": policy.loss.vf_loss, } diff --git a/python/ray/rllib/agents/ddpg/ddpg_policy.py b/python/ray/rllib/agents/ddpg/ddpg_policy.py index bb5fc25ef..ad9f4baf8 100644 --- a/python/ray/rllib/agents/ddpg/ddpg_policy.py +++ b/python/ray/rllib/agents/ddpg/ddpg_policy.py @@ -7,8 +7,7 @@ import numpy as np import ray import ray.experimental.tf_utils -from ray.rllib.agents.dqn.dqn_policy import (_huber_loss, _minimize_and_clip, - _scope_vars, _postprocess_dqn) +from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY from ray.rllib.models import ModelCatalog @@ -17,6 +16,7 @@ from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.policy.policy import Policy from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.utils import try_import_tf +from ray.rllib.utils.tf_ops import huber_loss, minimize_and_clip, scope_vars tf = try_import_tf() @@ -105,7 +105,7 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): with tf.variable_scope(POLICY_SCOPE) as scope: policy_out, self.policy_model = self._build_policy_network( self.cur_observations, observation_space, action_space) - self.policy_vars = _scope_vars(scope.name) + self.policy_vars = scope_vars(scope.name) # Noise vars for P network except for layer normalization vars if self.config["parameter_noise"]: @@ -154,7 +154,7 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): with tf.variable_scope(POLICY_TARGET_SCOPE) as scope: policy_tp1, _ = self._build_policy_network( self.obs_tp1, observation_space, action_space) - target_policy_vars = _scope_vars(scope.name) + target_policy_vars = scope_vars(scope.name) # Action outputs with tf.variable_scope(ACTION_SCOPE, reuse=True): @@ -179,7 +179,7 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): # Q-values for given actions & observations in given current q_t, self.q_model = self._build_q_network( self.obs_t, observation_space, action_space, self.act_t) - self.q_func_vars = _scope_vars(scope.name) + self.q_func_vars = scope_vars(scope.name) self.stats = { "mean_q": tf.reduce_mean(q_t), "max_q": tf.reduce_max(q_t), @@ -193,7 +193,7 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): with tf.variable_scope(TWIN_Q_SCOPE) as scope: twin_q_t, self.twin_q_model = self._build_q_network( self.obs_t, observation_space, action_space, self.act_t) - self.twin_q_func_vars = _scope_vars(scope.name) + self.twin_q_func_vars = scope_vars(scope.name) q_batchnorm_update_ops = list( set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops) @@ -201,13 +201,13 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): with tf.variable_scope(Q_TARGET_SCOPE) as scope: q_tp1, _ = self._build_q_network(self.obs_tp1, observation_space, action_space, policy_tp1_smoothed) - target_q_func_vars = _scope_vars(scope.name) + target_q_func_vars = scope_vars(scope.name) if self.config["twin_q"]: with tf.variable_scope(TWIN_Q_TARGET_SCOPE) as scope: twin_q_tp1, _ = self._build_q_network( self.obs_tp1, observation_space, action_space, policy_tp1_smoothed) - twin_target_q_func_vars = _scope_vars(scope.name) + twin_target_q_func_vars = scope_vars(scope.name) if self.config["twin_q"]: self.critic_loss, self.actor_loss, self.td_error \ @@ -330,12 +330,12 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): @override(TFPolicy) def gradients(self, optimizer, loss): if self.config["grad_norm_clipping"] is not None: - actor_grads_and_vars = _minimize_and_clip( + actor_grads_and_vars = minimize_and_clip( self._actor_optimizer, self.actor_loss, var_list=self.policy_vars, clip_val=self.config["grad_norm_clipping"]) - critic_grads_and_vars = _minimize_and_clip( + critic_grads_and_vars = minimize_and_clip( self._critic_optimizer, self.critic_loss, var_list=self.q_func_vars + self.twin_q_func_vars @@ -559,15 +559,15 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): twin_td_error = twin_q_t_selected - q_t_selected_target td_error = td_error + twin_td_error if use_huber: - errors = _huber_loss(td_error, huber_threshold) \ - + _huber_loss(twin_td_error, huber_threshold) + errors = huber_loss(td_error, huber_threshold) \ + + huber_loss(twin_td_error, huber_threshold) else: errors = 0.5 * tf.square(td_error) + 0.5 * tf.square( twin_td_error) else: td_error = q_t_selected - q_t_selected_target if use_huber: - errors = _huber_loss(td_error, huber_threshold) + errors = huber_loss(td_error, huber_threshold) else: errors = 0.5 * tf.square(td_error) diff --git a/python/ray/rllib/agents/dqn/__init__.py b/python/ray/rllib/agents/dqn/__init__.py index d3de8cb80..d23ea0393 100644 --- a/python/ray/rllib/agents/dqn/__init__.py +++ b/python/ray/rllib/agents/dqn/__init__.py @@ -3,12 +3,13 @@ from __future__ import division from __future__ import print_function from ray.rllib.agents.dqn.apex import ApexTrainer -from ray.rllib.agents.dqn.dqn import DQNTrainer, DEFAULT_CONFIG +from ray.rllib.agents.dqn.dqn import DQNTrainer, SimpleQTrainer, DEFAULT_CONFIG from ray.rllib.utils import renamed_agent DQNAgent = renamed_agent(DQNTrainer) ApexAgent = renamed_agent(ApexTrainer) __all__ = [ - "DQNAgent", "ApexAgent", "ApexTrainer", "DQNTrainer", "DEFAULT_CONFIG" + "DQNAgent", "ApexAgent", "ApexTrainer", "DQNTrainer", "DEFAULT_CONFIG", + "SimpleQTrainer" ] diff --git a/python/ray/rllib/agents/dqn/distributional_q_model.py b/python/ray/rllib/agents/dqn/distributional_q_model.py new file mode 100644 index 000000000..de0c0dbe6 --- /dev/null +++ b/python/ray/rllib/agents/dqn/distributional_q_model.py @@ -0,0 +1,250 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + +from ray.rllib.models.tf.tf_modelv2 import TFModelV2 +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + + +class DistributionalQModel(TFModelV2): + """Extension of standard TFModel to provide distributional Q values. + + It also supports options for noisy nets and parameter space noise. + + Note that this class by itself is not a valid model unless you + implement forward() in a subclass.""" + + def __init__(self, + obs_space, + action_space, + num_outputs, + model_config, + name, + q_hiddens=(256, ), + dueling=False, + num_atoms=1, + use_noisy=False, + v_min=-10.0, + v_max=10.0, + sigma0=0.5, + parameter_noise=False): + """Initialize variables of this model. + + Extra model kwargs: + q_hiddens (list): defines size of hidden layers for the q head. + These will be used to postprocess the model output for the + purposes of computing Q values. + dueling (bool): whether to build the state value head for DDQN + num_atoms (int): if >1, enables distributional DQN + use_noisy (bool): use noisy nets + v_min (float): min value support for distributional DQN + v_max (float): max value support for distributional DQN + sigma0 (float): initial value of noisy nets + parameter_noise (bool): enable layer norm for param noise + + Note that the core layers for forward() are not defined here, this + only defines the layers for the Q head. Those layers for forward() + should be defined in subclasses of DistributionalQModel. + """ + + super(DistributionalQModel, self).__init__( + obs_space, action_space, num_outputs, model_config, name) + + # setup the Q head output (i.e., model for get_q_values) + self.model_out = tf.keras.layers.Input( + shape=(num_outputs, ), name="model_out") + + def build_action_value(model_out): + if q_hiddens: + action_out = model_out + for i in range(len(q_hiddens)): + if use_noisy: + action_out = self._noisy_layer( + "hidden_%d" % i, action_out, q_hiddens[i], sigma0) + elif parameter_noise: + import tensorflow.contrib.layers as layers + action_out = layers.fully_connected( + action_out, + num_outputs=q_hiddens[i], + activation_fn=tf.nn.relu, + normalizer_fn=layers.layer_norm) + else: + action_out = tf.layers.dense( + action_out, + units=q_hiddens[i], + activation=tf.nn.relu, + name="hidden_%d" % i) + else: + # Avoid postprocessing the outputs. This enables custom models + # to be used for parametric action DQN. + action_out = model_out + if use_noisy: + action_scores = self._noisy_layer( + "output", + action_out, + self.action_space.n * num_atoms, + sigma0, + non_linear=False) + elif q_hiddens: + action_scores = tf.layers.dense( + action_out, + units=self.action_space.n * num_atoms, + activation=None) + else: + action_scores = model_out + if num_atoms > 1: + # Distributional Q-learning uses a discrete support z + # to represent the action value distribution + z = tf.range(num_atoms, dtype=tf.float32) + z = v_min + z * (v_max - v_min) / float(num_atoms - 1) + support_logits_per_action = tf.reshape( + tensor=action_scores, + shape=(-1, self.action_space.n, num_atoms)) + support_prob_per_action = tf.nn.softmax( + logits=support_logits_per_action) + action_scores = tf.reduce_sum( + input_tensor=z * support_prob_per_action, axis=-1) + logits = support_logits_per_action + dist = support_prob_per_action + return [ + action_scores, z, support_logits_per_action, logits, dist + ] + else: + logits = tf.expand_dims(tf.ones_like(action_scores), -1) + dist = tf.expand_dims(tf.ones_like(action_scores), -1) + return [action_scores, logits, dist] + + def build_state_score(model_out): + state_out = model_out + for i in range(len(q_hiddens)): + if use_noisy: + state_out = self._noisy_layer("dueling_hidden_%d" % i, + state_out, q_hiddens[i], + sigma0) + elif parameter_noise: + state_out = tf.contrib.layers.fully_connected( + state_out, + num_outputs=q_hiddens[i], + activation_fn=tf.nn.relu, + normalizer_fn=tf.contrib.layers.layer_norm) + else: + state_out = tf.layers.dense( + state_out, units=q_hiddens[i], activation=tf.nn.relu) + if use_noisy: + state_score = self._noisy_layer( + "dueling_output", + state_out, + num_atoms, + sigma0, + non_linear=False) + else: + state_score = tf.layers.dense( + state_out, units=num_atoms, activation=None) + return state_score + + def build_action_value_in_scope(model_out): + with tf.variable_scope( + name + "/action_value", reuse=tf.AUTO_REUSE): + return build_action_value(model_out) + + def build_state_score_in_scope(model_out): + with tf.variable_scope(name + "/state_value", reuse=tf.AUTO_REUSE): + return build_state_score(model_out) + + q_out = tf.keras.layers.Lambda(build_action_value_in_scope)( + self.model_out) + self.q_value_head = tf.keras.Model(self.model_out, q_out) + self.register_variables(self.q_value_head.variables) + + if dueling: + state_out = tf.keras.layers.Lambda(build_state_score_in_scope)( + self.model_out) + self.state_value_head = tf.keras.Model(self.model_out, state_out) + self.register_variables(self.state_value_head.variables) + + def get_q_value_distributions(self, model_out): + """Returns distributional values for Q(s, a) given a state embedding. + + Override this in your custom model to customize the Q output head. + + Arguments: + model_out (Tensor): embedding from the model layers + + Returns: + (action_scores, logits, dist) if num_atoms == 1, otherwise + (action_scores, z, support_logits_per_action, logits, dist) + """ + + return self.q_value_head(model_out) + + def get_state_value(self, model_out): + """Returns the state value prediction for the given state embedding.""" + + return self.state_value_head(model_out) + + def _noisy_layer(self, + prefix, + action_in, + out_size, + sigma0, + non_linear=True): + """ + a common dense layer: y = w^{T}x + b + a noisy layer: y = (w + \epsilon_w*\sigma_w)^{T}x + + (b+\epsilon_b*\sigma_b) + where \epsilon are random variables sampled from factorized normal + distributions and \sigma are trainable variables which are expected to + vanish along the training procedure + """ + import tensorflow.contrib.layers as layers + + in_size = int(action_in.shape[1]) + + epsilon_in = tf.random_normal(shape=[in_size]) + epsilon_out = tf.random_normal(shape=[out_size]) + epsilon_in = self._f_epsilon(epsilon_in) + epsilon_out = self._f_epsilon(epsilon_out) + epsilon_w = tf.matmul( + a=tf.expand_dims(epsilon_in, -1), b=tf.expand_dims(epsilon_out, 0)) + epsilon_b = epsilon_out + sigma_w = tf.get_variable( + name=prefix + "_sigma_w", + shape=[in_size, out_size], + dtype=tf.float32, + initializer=tf.random_uniform_initializer( + minval=-1.0 / np.sqrt(float(in_size)), + maxval=1.0 / np.sqrt(float(in_size)))) + # TF noise generation can be unreliable on GPU + # If generating the noise on the CPU, + # lowering sigma0 to 0.1 may be helpful + sigma_b = tf.get_variable( + name=prefix + "_sigma_b", + shape=[out_size], + dtype=tf.float32, # 0.5~GPU, 0.1~CPU + initializer=tf.constant_initializer( + sigma0 / np.sqrt(float(in_size)))) + + w = tf.get_variable( + name=prefix + "_fc_w", + shape=[in_size, out_size], + dtype=tf.float32, + initializer=layers.xavier_initializer()) + b = tf.get_variable( + name=prefix + "_fc_b", + shape=[out_size], + dtype=tf.float32, + initializer=tf.zeros_initializer()) + + action_activation = tf.nn.xw_plus_b(action_in, w + sigma_w * epsilon_w, + b + sigma_b * epsilon_b) + + if not non_linear: + return action_activation + return tf.nn.relu(action_activation) + + def _f_epsilon(self, x): + return tf.sign(x) * tf.sqrt(tf.abs(x)) diff --git a/python/ray/rllib/agents/dqn/dqn.py b/python/ray/rllib/agents/dqn/dqn.py index cc418907a..3541d147d 100644 --- a/python/ray/rllib/agents/dqn/dqn.py +++ b/python/ray/rllib/agents/dqn/dqn.py @@ -8,6 +8,7 @@ from ray import tune from ray.rllib.agents.trainer import with_common_config from ray.rllib.agents.trainer_template import build_trainer from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy +from ray.rllib.agents.dqn.simple_q_policy import SimpleQPolicy from ray.rllib.optimizers import SyncReplayOptimizer from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule @@ -294,3 +295,5 @@ GenericOffPolicyTrainer = build_trainer( DQNTrainer = GenericOffPolicyTrainer.with_updates( name="DQN", default_policy=DQNTFPolicy, default_config=DEFAULT_CONFIG) + +SimpleQTrainer = DQNTrainer.with_updates(default_policy=SimpleQPolicy) diff --git a/python/ray/rllib/agents/dqn/dqn_policy.py b/python/ray/rllib/agents/dqn/dqn_policy.py index 505930406..c2e22f721 100644 --- a/python/ray/rllib/agents/dqn/dqn_policy.py +++ b/python/ray/rllib/agents/dqn/dqn_policy.py @@ -7,14 +7,16 @@ import numpy as np from scipy.stats import entropy import ray -from ray.rllib.policy.policy import Policy +from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel +from ray.rllib.agents.dqn.simple_q_policy import ExplorationStateMixin, \ + TargetNetworkMixin from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models import ModelCatalog, Categorical -from ray.rllib.utils.annotations import override from ray.rllib.utils.error import UnsupportedSpaceException -from ray.rllib.policy.tf_policy import TFPolicy, \ - LearningRateSchedule +from ray.rllib.policy.tf_policy import LearningRateSchedule from ray.rllib.policy.tf_policy_template import build_tf_policy +from ray.rllib.utils.tf_ops import huber_loss, reduce_mean_ignore_inf, \ + minimize_and_clip from ray.rllib.utils import try_import_tf tf = try_import_tf() @@ -93,7 +95,7 @@ class QLoss(object): self.td_error = ( q_t_selected - tf.stop_gradient(q_t_selected_target)) self.loss = tf.reduce_mean( - importance_weights * _huber_loss(self.td_error)) + importance_weights * huber_loss(self.td_error)) self.stats = { "mean_q": tf.reduce_mean(q_t_selected), "min_q": tf.reduce_min(q_t_selected), @@ -102,180 +104,6 @@ class QLoss(object): } -class QNetwork(object): - def __init__(self, - model, - num_actions, - dueling=False, - hiddens=[256], - use_noisy=False, - num_atoms=1, - v_min=-10.0, - v_max=10.0, - sigma0=0.5, - parameter_noise=False): - self.model = model - with tf.variable_scope("action_value"): - if hiddens: - action_out = model.last_layer - for i in range(len(hiddens)): - if use_noisy: - action_out = self.noisy_layer( - "hidden_%d" % i, action_out, hiddens[i], sigma0) - elif parameter_noise: - import tensorflow.contrib.layers as layers - action_out = layers.fully_connected( - action_out, - num_outputs=hiddens[i], - activation_fn=tf.nn.relu, - normalizer_fn=layers.layer_norm) - else: - action_out = tf.layers.dense( - action_out, - units=hiddens[i], - activation=tf.nn.relu) - else: - # Avoid postprocessing the outputs. This enables custom models - # to be used for parametric action DQN. - action_out = model.outputs - if use_noisy: - action_scores = self.noisy_layer( - "output", - action_out, - num_actions * num_atoms, - sigma0, - non_linear=False) - elif hiddens: - action_scores = tf.layers.dense( - action_out, units=num_actions * num_atoms, activation=None) - else: - action_scores = model.outputs - if num_atoms > 1: - # Distributional Q-learning uses a discrete support z - # to represent the action value distribution - z = tf.range(num_atoms, dtype=tf.float32) - z = v_min + z * (v_max - v_min) / float(num_atoms - 1) - support_logits_per_action = tf.reshape( - tensor=action_scores, shape=(-1, num_actions, num_atoms)) - support_prob_per_action = tf.nn.softmax( - logits=support_logits_per_action) - action_scores = tf.reduce_sum( - input_tensor=z * support_prob_per_action, axis=-1) - self.logits = support_logits_per_action - self.dist = support_prob_per_action - else: - self.logits = tf.expand_dims(tf.ones_like(action_scores), -1) - self.dist = tf.expand_dims(tf.ones_like(action_scores), -1) - - if dueling: - with tf.variable_scope("state_value"): - state_out = model.last_layer - for i in range(len(hiddens)): - if use_noisy: - state_out = self.noisy_layer("dueling_hidden_%d" % i, - state_out, hiddens[i], - sigma0) - elif parameter_noise: - state_out = tf.contrib.layers.fully_connected( - state_out, - num_outputs=hiddens[i], - activation_fn=tf.nn.relu, - normalizer_fn=tf.contrib.layers.layer_norm) - else: - state_out = tf.layers.dense( - state_out, units=hiddens[i], activation=tf.nn.relu) - if use_noisy: - state_score = self.noisy_layer( - "dueling_output", - state_out, - num_atoms, - sigma0, - non_linear=False) - else: - state_score = tf.layers.dense( - state_out, units=num_atoms, activation=None) - if num_atoms > 1: - support_logits_per_action_mean = tf.reduce_mean( - support_logits_per_action, 1) - support_logits_per_action_centered = ( - support_logits_per_action - tf.expand_dims( - support_logits_per_action_mean, 1)) - support_logits_per_action = tf.expand_dims( - state_score, 1) + support_logits_per_action_centered - support_prob_per_action = tf.nn.softmax( - logits=support_logits_per_action) - self.value = tf.reduce_sum( - input_tensor=z * support_prob_per_action, axis=-1) - self.logits = support_logits_per_action - self.dist = support_prob_per_action - else: - action_scores_mean = _reduce_mean_ignore_inf(action_scores, 1) - action_scores_centered = action_scores - tf.expand_dims( - action_scores_mean, 1) - self.value = state_score + action_scores_centered - else: - self.value = action_scores - - def f_epsilon(self, x): - return tf.sign(x) * tf.sqrt(tf.abs(x)) - - def noisy_layer(self, prefix, action_in, out_size, sigma0, - non_linear=True): - """ - a common dense layer: y = w^{T}x + b - a noisy layer: y = (w + \epsilon_w*\sigma_w)^{T}x + - (b+\epsilon_b*\sigma_b) - where \epsilon are random variables sampled from factorized normal - distributions and \sigma are trainable variables which are expected to - vanish along the training procedure - """ - import tensorflow.contrib.layers as layers - - in_size = int(action_in.shape[1]) - - epsilon_in = tf.random_normal(shape=[in_size]) - epsilon_out = tf.random_normal(shape=[out_size]) - epsilon_in = self.f_epsilon(epsilon_in) - epsilon_out = self.f_epsilon(epsilon_out) - epsilon_w = tf.matmul( - a=tf.expand_dims(epsilon_in, -1), b=tf.expand_dims(epsilon_out, 0)) - epsilon_b = epsilon_out - sigma_w = tf.get_variable( - name=prefix + "_sigma_w", - shape=[in_size, out_size], - dtype=tf.float32, - initializer=tf.random_uniform_initializer( - minval=-1.0 / np.sqrt(float(in_size)), - maxval=1.0 / np.sqrt(float(in_size)))) - # TF noise generation can be unreliable on GPU - # If generating the noise on the CPU, - # lowering sigma0 to 0.1 may be helpful - sigma_b = tf.get_variable( - name=prefix + "_sigma_b", - shape=[out_size], - dtype=tf.float32, # 0.5~GPU, 0.1~CPU - initializer=tf.constant_initializer( - sigma0 / np.sqrt(float(in_size)))) - - w = tf.get_variable( - name=prefix + "_fc_w", - shape=[in_size, out_size], - dtype=tf.float32, - initializer=layers.xavier_initializer()) - b = tf.get_variable( - name=prefix + "_fc_b", - shape=[out_size], - dtype=tf.float32, - initializer=tf.zeros_initializer()) - - action_activation = tf.nn.xw_plus_b(action_in, w + sigma_w * epsilon_w, - b + sigma_b * epsilon_b) - - if not non_linear: - return action_activation - return tf.nn.relu(action_activation) - - class QValuePolicy(object): def __init__(self, q_values, observations, num_actions, stochastic, eps, softmax, softmax_temp): @@ -305,44 +133,6 @@ class QValuePolicy(object): self.action_prob = None -class ExplorationStateMixin(object): - def __init__(self, obs_space, action_space, config): - self.cur_epsilon = 1.0 - self.stochastic = tf.placeholder(tf.bool, (), name="stochastic") - self.eps = tf.placeholder(tf.float32, (), name="eps") - - def add_parameter_noise(self): - if self.config["parameter_noise"]: - self.sess.run(self.add_noise_op) - - def set_epsilon(self, epsilon): - self.cur_epsilon = epsilon - - @override(Policy) - def get_state(self): - return [TFPolicy.get_state(self), self.cur_epsilon] - - @override(Policy) - def set_state(self, state): - TFPolicy.set_state(self, state[0]) - self.set_epsilon(state[1]) - - -class TargetNetworkMixin(object): - def __init__(self, obs_space, action_space, config): - # update_target_fn will be called periodically to copy Q network to - # target Q network - update_target_expr = [] - assert len(self.q_func_vars) == len(self.target_q_func_vars), \ - (self.q_func_vars, self.target_q_func_vars) - for var, var_target in zip(self.q_func_vars, self.target_q_func_vars): - update_target_expr.append(var_target.assign(var)) - self.update_target_expr = tf.group(*update_target_expr) - - def update_target(self): - return self.get_session().run(self.update_target_expr) - - class ComputeTDErrorMixin(object): def compute_td_error(self, obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights): @@ -350,7 +140,7 @@ class ComputeTDErrorMixin(object): return np.zeros_like(rew_t) td_err = self.get_session().run( - self.loss.td_error, + self.q_loss.td_error, feed_dict={ self.get_placeholder(SampleBatch.CUR_OBS): [ np.array(ob) for ob in obs_t @@ -394,20 +184,64 @@ def postprocess_trajectory(policy, return _postprocess_dqn(policy, sample_batch) -def build_q_networks(policy, input_dict, observation_space, action_space, - config): +def build_q_model(policy, obs_space, action_space, config): if not isinstance(action_space, Discrete): raise UnsupportedSpaceException( "Action space {} is not supported for DQN.".format(action_space)) + if config["hiddens"]: + num_outputs = 256 + config["model"]["no_final_linear"] = True + else: + num_outputs = action_space.n + + policy.q_model = ModelCatalog.get_model_v2( + obs_space, + action_space, + num_outputs, + config["model"], + framework="tf", + model_interface=DistributionalQModel, + name=Q_SCOPE, + num_atoms=config["num_atoms"], + q_hiddens=config["hiddens"], + dueling=config["dueling"], + use_noisy=config["noisy"], + v_min=config["v_min"], + v_max=config["v_max"], + sigma0=config["sigma0"], + parameter_noise=config["parameter_noise"]) + + policy.target_q_model = ModelCatalog.get_model_v2( + obs_space, + action_space, + num_outputs, + config["model"], + framework="tf", + model_interface=DistributionalQModel, + name=Q_TARGET_SCOPE, + num_atoms=config["num_atoms"], + q_hiddens=config["hiddens"], + dueling=config["dueling"], + use_noisy=config["noisy"], + v_min=config["v_min"], + v_max=config["v_max"], + sigma0=config["sigma0"], + parameter_noise=config["parameter_noise"]) + + return policy.q_model + + +def build_q_networks(policy, q_model, input_dict, obs_space, action_space, + config): + # Action Q network - with tf.variable_scope(Q_SCOPE) as scope: - q_values, q_logits, q_dist, _ = _build_q_network( - policy, input_dict[SampleBatch.CUR_OBS], observation_space, - action_space) - policy.q_values = q_values - policy.q_func_vars = _scope_vars(scope.name) + q_values, q_logits, q_dist = _compute_q_values( + policy, q_model, input_dict[SampleBatch.CUR_OBS], obs_space, + action_space) + policy.q_values = q_values + policy.q_func_vars = q_model.variables() # Noise vars for Q network except for layer normalization vars if config["parameter_noise"]: @@ -419,7 +253,7 @@ def build_q_networks(policy, input_dict, observation_space, action_space, # Action outputs qvp = QValuePolicy(q_values, input_dict[SampleBatch.CUR_OBS], action_space.n, policy.stochastic, policy.eps, - policy.config["soft_q"], policy.config["softmax_temp"]) + config["soft_q"], config["softmax_temp"]) policy.output_actions, policy.action_prob = qvp.action, qvp.action_prob return policy.output_actions, policy.action_prob @@ -463,37 +297,36 @@ def _build_parameter_noise(policy, pnet_params): def build_q_losses(policy, batch_tensors): + config = policy.config # q network evaluation - with tf.variable_scope(Q_SCOPE, reuse=True): - prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - q_t, q_logits_t, q_dist_t, model = _build_q_network( - policy, batch_tensors[SampleBatch.CUR_OBS], - policy.observation_space, policy.action_space) - policy.q_batchnorm_update_ops = list( - set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops) + prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) + q_t, q_logits_t, q_dist_t = _compute_q_values( + policy, policy.q_model, batch_tensors[SampleBatch.CUR_OBS], + policy.observation_space, policy.action_space) + policy.q_batchnorm_update_ops = list( + set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops) # target q network evalution - with tf.variable_scope(Q_TARGET_SCOPE) as scope: - q_tp1, q_logits_tp1, q_dist_tp1, _ = _build_q_network( - policy, batch_tensors[SampleBatch.NEXT_OBS], - policy.observation_space, policy.action_space) - policy.target_q_func_vars = _scope_vars(scope.name) + q_tp1, q_logits_tp1, q_dist_tp1 = _compute_q_values( + policy, policy.target_q_model, batch_tensors[SampleBatch.NEXT_OBS], + policy.observation_space, policy.action_space) + policy.target_q_func_vars = policy.target_q_model.variables() # q scores for actions which we know were selected in the given state. - one_hot_selection = tf.one_hot(batch_tensors[SampleBatch.ACTIONS], - policy.action_space.n) + one_hot_selection = tf.one_hot( + tf.cast(batch_tensors[SampleBatch.ACTIONS], tf.int32), + policy.action_space.n) q_t_selected = tf.reduce_sum(q_t * one_hot_selection, 1) q_logits_t_selected = tf.reduce_sum( q_logits_t * tf.expand_dims(one_hot_selection, -1), 1) # compute estimate of best possible value starting from state at t + 1 - if policy.config["double_q"]: - with tf.variable_scope(Q_SCOPE, reuse=True): - q_tp1_using_online_net, q_logits_tp1_using_online_net, \ - q_dist_tp1_using_online_net, _ = _build_q_network( - policy, - batch_tensors[SampleBatch.NEXT_OBS], - policy.observation_space, policy.action_space) + if config["double_q"]: + q_tp1_using_online_net, q_logits_tp1_using_online_net, \ + q_dist_tp1_using_online_net = _compute_q_values( + policy, policy.q_model, + batch_tensors[SampleBatch.NEXT_OBS], + policy.observation_space, policy.action_space) q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1) q_tp1_best_one_hot_selection = tf.one_hot(q_tp1_best_using_online_net, policy.action_space.n) @@ -507,12 +340,14 @@ def build_q_losses(policy, batch_tensors): q_dist_tp1_best = tf.reduce_sum( q_dist_tp1 * tf.expand_dims(q_tp1_best_one_hot_selection, -1), 1) - policy.loss = _build_q_loss( + policy.q_loss = QLoss( q_t_selected, q_logits_t_selected, q_tp1_best, q_dist_tp1_best, - batch_tensors[SampleBatch.REWARDS], batch_tensors[SampleBatch.DONES], - batch_tensors[PRIO_WEIGHTS], policy.config) + batch_tensors[PRIO_WEIGHTS], batch_tensors[SampleBatch.REWARDS], + tf.cast(batch_tensors[SampleBatch.DONES], + tf.float32), config["gamma"], config["n_step"], + config["num_atoms"], config["v_min"], config["v_max"]) - return policy.loss.loss + return policy.q_loss.loss def adam_optimizer(policy, config): @@ -522,7 +357,7 @@ def adam_optimizer(policy, config): def clip_gradients(policy, optimizer, loss): if policy.config["grad_norm_clipping"] is not None: - grads_and_vars = _minimize_and_clip( + grads_and_vars = minimize_and_clip( optimizer, loss, var_list=policy.q_func_vars, @@ -544,7 +379,7 @@ def exploration_setting_inputs(policy): def build_q_stats(policy, batch_tensors): return dict({ "cur_lr": tf.cast(policy.cur_lr, tf.float64), - }, **policy.loss.stats) + }, **policy.q_loss.stats) def setup_early_mixins(policy, obs_space, action_space, config): @@ -556,33 +391,45 @@ def setup_late_mixins(policy, obs_space, action_space, config): TargetNetworkMixin.__init__(policy, obs_space, action_space, config) -def _build_q_network(policy, obs, obs_space, action_space): +def _compute_q_values(policy, model, obs, obs_space, action_space): config = policy.config - qnet = QNetwork( - ModelCatalog.get_model({ - "obs": obs, - "is_training": policy._get_is_training_placeholder(), - }, obs_space, action_space, action_space.n, config["model"]), - action_space.n, config["dueling"], config["hiddens"], config["noisy"], - config["num_atoms"], config["v_min"], config["v_max"], - config["sigma0"], config["parameter_noise"]) - return qnet.value, qnet.logits, qnet.dist, qnet.model + model_out, state = model({ + "obs": obs, + "is_training": policy._get_is_training_placeholder(), + }, [], None) + if config["num_atoms"] > 1: + (action_scores, z, support_logits_per_action, logits, + dist) = model.get_q_value_distributions(model_out) + else: + (action_scores, logits, + dist) = model.get_q_value_distributions(model_out) -def _build_q_value_policy(policy, q_values): - policy = QValuePolicy(q_values, policy.cur_observations, - policy.num_actions, policy.stochastic, policy.eps, - policy.config["soft_q"], - policy.config["softmax_temp"]) - return policy.action, policy.action_prob + if config["dueling"]: + state_score = model.get_state_value(model_out) + if config["num_atoms"] > 1: + support_logits_per_action_mean = tf.reduce_mean( + support_logits_per_action, 1) + support_logits_per_action_centered = ( + support_logits_per_action - tf.expand_dims( + support_logits_per_action_mean, 1)) + support_logits_per_action = tf.expand_dims( + state_score, 1) + support_logits_per_action_centered + support_prob_per_action = tf.nn.softmax( + logits=support_logits_per_action) + value = tf.reduce_sum( + input_tensor=z * support_prob_per_action, axis=-1) + logits = support_logits_per_action + dist = support_prob_per_action + else: + action_scores_mean = reduce_mean_ignore_inf(action_scores, 1) + action_scores_centered = action_scores - tf.expand_dims( + action_scores_mean, 1) + value = state_score + action_scores_centered + else: + value = action_scores - -def _build_q_loss(q_t_selected, q_logits_t_selected, q_tp1_best, - q_dist_tp1_best, rewards, dones, importance_weights, config): - return QLoss(q_t_selected, q_logits_t_selected, q_tp1_best, - q_dist_tp1_best, importance_weights, rewards, - tf.cast(dones, tf.float32), config["gamma"], config["n_step"], - config["num_atoms"], config["v_min"], config["v_max"]) + return value, logits, dist def _adjust_nstep(n_step, gamma, obs, actions, rewards, new_obs, dones): @@ -634,61 +481,11 @@ def _postprocess_dqn(policy, batch): return batch -def _reduce_mean_ignore_inf(x, axis): - """Same as tf.reduce_mean() but ignores -inf values.""" - mask = tf.not_equal(x, tf.float32.min) - x_zeroed = tf.where(mask, x, tf.zeros_like(x)) - return (tf.reduce_sum(x_zeroed, axis) / tf.reduce_sum( - tf.cast(mask, tf.float32), axis)) - - -def _huber_loss(x, delta=1.0): - """Reference: https://en.wikipedia.org/wiki/Huber_loss""" - return tf.where( - tf.abs(x) < delta, - tf.square(x) * 0.5, delta * (tf.abs(x) - 0.5 * delta)) - - -def _minimize_and_clip(optimizer, objective, var_list, clip_val=10): - """Minimized `objective` using `optimizer` w.r.t. variables in - `var_list` while ensure the norm of the gradients for each - variable is clipped to `clip_val` - """ - gradients = optimizer.compute_gradients(objective, var_list=var_list) - for i, (grad, var) in enumerate(gradients): - if grad is not None: - gradients[i] = (tf.clip_by_norm(grad, clip_val), var) - return gradients - - -def _scope_vars(scope, trainable_only=False): - """ - Get variables inside a scope - The scope can be specified as a string - - Parameters - ---------- - scope: str or VariableScope - scope in which the variables reside. - trainable_only: bool - whether or not to return only the variables that were marked as - trainable. - - Returns - ------- - vars: [tf.Variable] - list of variables in `scope`. - """ - return tf.get_collection( - tf.GraphKeys.TRAINABLE_VARIABLES - if trainable_only else tf.GraphKeys.VARIABLES, - scope=scope if isinstance(scope, str) else scope.name) - - DQNTFPolicy = build_tf_policy( name="DQNTFPolicy", get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, - make_action_sampler=build_q_networks, + make_model=build_q_model, + action_sampler_fn=build_q_networks, loss_fn=build_q_losses, stats_fn=build_q_stats, postprocess_fn=postprocess_trajectory, @@ -696,7 +493,7 @@ DQNTFPolicy = build_tf_policy( gradients_fn=clip_gradients, extra_action_feed_fn=exploration_setting_inputs, extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values}, - extra_learn_fetches_fn=lambda policy: {"td_error": policy.loss.td_error}, + extra_learn_fetches_fn=lambda policy: {"td_error": policy.q_loss.td_error}, update_ops_fn=lambda policy: policy.q_batchnorm_update_ops, before_init=setup_early_mixins, after_init=setup_late_mixins, diff --git a/python/ray/rllib/agents/dqn/simple_q_model.py b/python/ray/rllib/agents/dqn/simple_q_model.py new file mode 100644 index 000000000..d9815205f --- /dev/null +++ b/python/ray/rllib/agents/dqn/simple_q_model.py @@ -0,0 +1,69 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.models.tf.tf_modelv2 import TFModelV2 +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + + +class SimpleQModel(TFModelV2): + """Extension of standard TFModel to provide Q values. + + Note that this class by itself is not a valid model unless you + implement forward() in a subclass.""" + + def __init__(self, + obs_space, + action_space, + num_outputs, + model_config, + name, + q_hiddens=(256, )): + """Initialize variables of this model. + + Extra model kwargs: + q_hiddens (list): defines size of hidden layers for the q head. + These will be used to postprocess the model output for the + purposes of computing Q values. + + Note that the core layers for forward() are not defined here, this + only defines the layers for the Q head. Those layers for forward() + should be defined in subclasses of SimpleQModel. + """ + + super(SimpleQModel, self).__init__(obs_space, action_space, + num_outputs, model_config, name) + + # setup the Q head output (i.e., model for get_q_values) + self.model_out = tf.keras.layers.Input( + shape=(num_outputs, ), name="model_out") + + if q_hiddens: + last_layer = self.model_out + for i, n in enumerate(q_hiddens): + last_layer = tf.keras.layers.Dense( + n, name="q_hidden_{}".format(i), + activation=tf.nn.relu)(last_layer) + q_out = tf.keras.layers.Dense( + action_space.n, activation=None, name="q_out")(last_layer) + else: + q_out = self.model_out + + self.q_value_head = tf.keras.Model(self.model_out, q_out) + self.register_variables(self.q_value_head.variables) + + def get_q_values(self, model_out): + """Returns Q(s, a) given a feature tensor for the state. + + Override this in your custom model to customize the Q output head. + + Arguments: + model_out (Tensor): embedding from the model layers + + Returns: + action scores Q(s, a) for each action, shape [None, action_space.n] + """ + + return self.q_value_head(model_out) diff --git a/python/ray/rllib/agents/dqn/simple_q_policy.py b/python/ray/rllib/agents/dqn/simple_q_policy.py new file mode 100644 index 000000000..15ae8b976 --- /dev/null +++ b/python/ray/rllib/agents/dqn/simple_q_policy.py @@ -0,0 +1,211 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +"""Basic example of a DQN policy without any optimizations.""" + +from gym.spaces import Discrete + +import ray +from ray.rllib.agents.dqn.simple_q_model import SimpleQModel +from ray.rllib.policy.policy import Policy +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.models import ModelCatalog +from ray.rllib.utils.annotations import override +from ray.rllib.utils.error import UnsupportedSpaceException +from ray.rllib.policy.tf_policy import TFPolicy +from ray.rllib.policy.tf_policy_template import build_tf_policy +from ray.rllib.utils import try_import_tf +from ray.rllib.utils.tf_ops import huber_loss + +tf = try_import_tf() + +Q_SCOPE = "q_func" +Q_TARGET_SCOPE = "target_q_func" + + +class ExplorationStateMixin(object): + def __init__(self, obs_space, action_space, config): + self.cur_epsilon = 1.0 + self.stochastic = tf.placeholder(tf.bool, (), name="stochastic") + self.eps = tf.placeholder(tf.float32, (), name="eps") + + def add_parameter_noise(self): + if self.config["parameter_noise"]: + self.sess.run(self.add_noise_op) + + def set_epsilon(self, epsilon): + self.cur_epsilon = epsilon + + @override(Policy) + def get_state(self): + return [TFPolicy.get_state(self), self.cur_epsilon] + + @override(Policy) + def set_state(self, state): + TFPolicy.set_state(self, state[0]) + self.set_epsilon(state[1]) + + +class TargetNetworkMixin(object): + def __init__(self, obs_space, action_space, config): + # update_target_fn will be called periodically to copy Q network to + # target Q network + update_target_expr = [] + assert len(self.q_func_vars) == len(self.target_q_func_vars), \ + (self.q_func_vars, self.target_q_func_vars) + for var, var_target in zip(self.q_func_vars, self.target_q_func_vars): + update_target_expr.append(var_target.assign(var)) + self.update_target_expr = tf.group(*update_target_expr) + + def update_target(self): + return self.get_session().run(self.update_target_expr) + + +def build_q_models(policy, obs_space, action_space, config): + + if not isinstance(action_space, Discrete): + raise UnsupportedSpaceException( + "Action space {} is not supported for DQN.".format(action_space)) + + if config["hiddens"]: + num_outputs = 256 + config["model"]["no_final_linear"] = True + else: + num_outputs = action_space.n + + policy.q_model = ModelCatalog.get_model_v2( + obs_space, + action_space, + num_outputs, + config["model"], + framework="tf", + name=Q_SCOPE, + model_interface=SimpleQModel, + q_hiddens=config["hiddens"]) + + policy.target_q_model = ModelCatalog.get_model_v2( + obs_space, + action_space, + num_outputs, + config["model"], + framework="tf", + name=Q_TARGET_SCOPE, + model_interface=SimpleQModel, + q_hiddens=config["hiddens"]) + + return policy.q_model + + +def build_action_sampler(policy, q_model, input_dict, obs_space, action_space, + config): + + # Action Q network + q_values = _compute_q_values(policy, q_model, + input_dict[SampleBatch.CUR_OBS], obs_space, + action_space) + policy.q_values = q_values + policy.q_func_vars = q_model.variables() + + # Action outputs + deterministic_actions = tf.argmax(q_values, axis=1) + batch_size = tf.shape(input_dict[SampleBatch.CUR_OBS])[0] + + # Special case masked out actions (q_value ~= -inf) so that we don't + # even consider them for exploration. + random_valid_action_logits = tf.where( + tf.equal(q_values, tf.float32.min), + tf.ones_like(q_values) * tf.float32.min, tf.ones_like(q_values)) + random_actions = tf.squeeze( + tf.multinomial(random_valid_action_logits, 1), axis=1) + + chose_random = tf.random_uniform( + tf.stack([batch_size]), minval=0, maxval=1, + dtype=tf.float32) < policy.eps + stochastic_actions = tf.where(chose_random, random_actions, + deterministic_actions) + action = tf.cond(policy.stochastic, lambda: stochastic_actions, + lambda: deterministic_actions) + action_prob = None + + return action, action_prob + + +def build_q_losses(policy, batch_tensors): + # q network evaluation + q_t = _compute_q_values(policy, policy.q_model, + batch_tensors[SampleBatch.CUR_OBS], + policy.observation_space, policy.action_space) + + # target q network evalution + q_tp1 = _compute_q_values(policy, policy.target_q_model, + batch_tensors[SampleBatch.NEXT_OBS], + policy.observation_space, policy.action_space) + policy.target_q_func_vars = policy.target_q_model.variables() + + # q scores for actions which we know were selected in the given state. + one_hot_selection = tf.one_hot( + tf.cast(batch_tensors[SampleBatch.ACTIONS], tf.int32), + policy.action_space.n) + q_t_selected = tf.reduce_sum(q_t * one_hot_selection, 1) + + # compute estimate of best possible value starting from state at t + 1 + dones = tf.cast(batch_tensors[SampleBatch.DONES], tf.float32) + q_tp1_best_one_hot_selection = tf.one_hot( + tf.argmax(q_tp1, 1), policy.action_space.n) + q_tp1_best = tf.reduce_sum(q_tp1 * q_tp1_best_one_hot_selection, 1) + q_tp1_best_masked = (1.0 - dones) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = (batch_tensors[SampleBatch.REWARDS] + + policy.config["gamma"] * q_tp1_best_masked) + + # compute the error (potentially clipped) + td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) + loss = tf.reduce_mean(huber_loss(td_error)) + + # save TD error as an attribute for outside access + policy.td_error = td_error + + return loss + + +def _compute_q_values(policy, model, obs, obs_space, action_space): + input_dict = { + "obs": obs, + "is_training": policy._get_is_training_placeholder(), + } + model_out, _ = model(input_dict, [], None) + return model.get_q_values(model_out) + + +def exploration_setting_inputs(policy): + return { + policy.stochastic: True, + policy.eps: policy.cur_epsilon, + } + + +def setup_early_mixins(policy, obs_space, action_space, config): + ExplorationStateMixin.__init__(policy, obs_space, action_space, config) + + +def setup_late_mixins(policy, obs_space, action_space, config): + TargetNetworkMixin.__init__(policy, obs_space, action_space, config) + + +SimpleQPolicy = build_tf_policy( + name="SimpleQPolicy", + get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, + make_model=build_q_models, + action_sampler_fn=build_action_sampler, + loss_fn=build_q_losses, + extra_action_feed_fn=exploration_setting_inputs, + extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values}, + extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error}, + before_init=setup_early_mixins, + after_init=setup_late_mixins, + obs_include_prev_action_reward=False, + mixins=[ + ExplorationStateMixin, + TargetNetworkMixin, + ]) diff --git a/python/ray/rllib/agents/marwil/marwil_policy.py b/python/ray/rllib/agents/marwil/marwil_policy.py index add021025..8c8eafba3 100644 --- a/python/ray/rllib/agents/marwil/marwil_policy.py +++ b/python/ray/rllib/agents/marwil/marwil_policy.py @@ -11,9 +11,9 @@ from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY from ray.rllib.utils.annotations import override from ray.rllib.policy.policy import Policy from ray.rllib.policy.tf_policy import TFPolicy -from ray.rllib.agents.dqn.dqn_policy import _scope_vars from ray.rllib.utils.explained_variance import explained_variance from ray.rllib.utils import try_import_tf +from ray.rllib.utils.tf_ops import scope_vars tf = try_import_tf() @@ -103,7 +103,7 @@ class MARWILPolicy(MARWILPostprocessing, TFPolicy): }, observation_space, action_space, logit_dim, self.config["model"]) logits = self.model.outputs - self.p_func_vars = _scope_vars(scope.name) + self.p_func_vars = scope_vars(scope.name) # Action outputs action_dist = dist_cls(logits) @@ -116,7 +116,7 @@ class MARWILPolicy(MARWILPostprocessing, TFPolicy): # v network evaluation with tf.variable_scope(VALUE_SCOPE) as scope: state_values = self.model.value_function() - self.v_func_vars = _scope_vars(scope.name) + self.v_func_vars = scope_vars(scope.name) self.v_loss = self._build_value_loss(state_values, self.cum_rew_t) self.p_loss = self._build_policy_loss(state_values, self.cum_rew_t, logits, self.act_t, action_space) diff --git a/python/ray/rllib/agents/ppo/appo_policy.py b/python/ray/rllib/agents/ppo/appo_policy.py index 56e473a61..e5941283e 100644 --- a/python/ray/rllib/agents/ppo/appo_policy.py +++ b/python/ray/rllib/agents/ppo/appo_policy.py @@ -179,8 +179,8 @@ def _make_time_major(policy, tensor, drop_last=False): if isinstance(tensor, list): return [_make_time_major(policy, t, drop_last) for t in tensor] - if policy.model.state_init: - B = tf.shape(policy.model.seq_lens)[0] + if policy.state_in: + B = tf.shape(policy.seq_lens)[0] T = tf.shape(tensor)[0] // B else: # Important: chop the tensor into batches at known episode cut @@ -219,15 +219,14 @@ def build_appo_surrogate_loss(policy, batch_tensors): behaviour_logits = batch_tensors[BEHAVIOUR_LOGITS] unpacked_behaviour_logits = tf.split( behaviour_logits, output_hidden_shape, axis=1) - unpacked_outputs = tf.split( - policy.model.outputs, output_hidden_shape, axis=1) + unpacked_outputs = tf.split(policy.model_out, output_hidden_shape, axis=1) action_dist = policy.action_dist prev_action_dist = policy.dist_class(behaviour_logits) values = policy.value_function - if policy.model.state_in: - max_seq_len = tf.reduce_max(policy.model.seq_lens) - 1 - mask = tf.sequence_mask(policy.model.seq_lens, max_seq_len) + if policy.state_in: + max_seq_len = tf.reduce_max(policy.seq_lens) - 1 + mask = tf.sequence_mask(policy.seq_lens, max_seq_len) mask = tf.reshape(mask, [-1]) else: mask = tf.ones_like(rewards) @@ -316,7 +315,7 @@ def postprocess_trajectory(policy, last_r = 0.0 else: next_state = [] - for i in range(len(policy.model.state_in)): + for i in range(len(policy.state_in)): next_state.append([sample_batch["state_out_{}".format(i)][-1]]) last_r = policy.value(sample_batch["new_obs"][-1], *next_state) batch = compute_advantages( @@ -332,7 +331,7 @@ def postprocess_trajectory(policy, def add_values_and_logits(policy): - out = {BEHAVIOUR_LOGITS: policy.model.outputs} + out = {BEHAVIOUR_LOGITS: policy.model_out} if not policy.config["vtrace"]: out[SampleBatch.VF_PREDS] = policy.value_function return out @@ -367,11 +366,11 @@ class ValueNetworkMixin(object): def value(self, ob, *args): feed_dict = { self.get_placeholder(SampleBatch.CUR_OBS): [ob], - self.model.seq_lens: [1] + self.seq_lens: [1] } - assert len(args) == len(self.model.state_in), \ - (args, self.model.state_in) - for k, v in zip(self.model.state_in, args): + assert len(args) == len(self.state_in), \ + (args, self.state_in) + for k, v in zip(self.state_in, args): feed_dict[k] = v vf = self.get_session().run(self.value_function, feed_dict) return vf[0] diff --git a/python/ray/rllib/agents/ppo/ppo.py b/python/ray/rllib/agents/ppo/ppo.py index a21c3d28f..593fae1c4 100644 --- a/python/ray/rllib/agents/ppo/ppo.py +++ b/python/ray/rllib/agents/ppo/ppo.py @@ -151,6 +151,9 @@ def validate_config(config): "FYI: By default, the value function will not share layers " "with the policy model ('vf_share_layers': False).") + # auto set the model option for layer sharing + config["model"]["vf_share_layers"] = config["vf_share_layers"] + PPOTrainer = build_trainer( name="PPO", diff --git a/python/ray/rllib/agents/ppo/ppo_policy.py b/python/ray/rllib/agents/ppo/ppo_policy.py index ad79d90fa..e12da8e70 100644 --- a/python/ray/rllib/agents/ppo/ppo_policy.py +++ b/python/ray/rllib/agents/ppo/ppo_policy.py @@ -105,11 +105,10 @@ class PPOLoss(object): def ppo_surrogate_loss(policy, batch_tensors): - if policy.model.state_in: - max_seq_len = tf.reduce_max( - policy.convert_to_eager(policy.model.seq_lens)) + if policy.state_in: + max_seq_len = tf.reduce_max(policy.convert_to_eager(policy.seq_lens)) mask = tf.sequence_mask( - policy.convert_to_eager(policy.model.seq_lens), max_seq_len) + policy.convert_to_eager(policy.seq_lens), max_seq_len) mask = tf.reshape(mask, [-1]) else: mask = tf.ones_like( @@ -136,28 +135,26 @@ def ppo_surrogate_loss(policy, batch_tensors): def kl_and_loss_stats(policy, batch_tensors): - policy.explained_variance = explained_variance( - batch_tensors[Postprocessing.VALUE_TARGETS], policy.value_function) - - stats_fetches = { - "cur_kl_coeff": policy.kl_coeff, - "cur_lr": tf.cast(policy.cur_lr, tf.float64), + return { + "cur_kl_coeff": tf.cast( + policy.convert_to_eager(policy.kl_coeff), tf.float64), + "cur_lr": tf.cast(policy.convert_to_eager(policy.cur_lr), tf.float64), "total_loss": policy.loss_obj.loss, "policy_loss": policy.loss_obj.mean_policy_loss, "vf_loss": policy.loss_obj.mean_vf_loss, - "vf_explained_var": policy.explained_variance, + "vf_explained_var": explained_variance( + batch_tensors[Postprocessing.VALUE_TARGETS], + policy.convert_to_eager(policy.value_function)), "kl": policy.loss_obj.mean_kl, "entropy": policy.loss_obj.mean_entropy, } - return stats_fetches - def vf_preds_and_logits_fetches(policy): """Adds value function and logits outputs to experience batches.""" return { SampleBatch.VF_PREDS: policy.value_function, - BEHAVIOUR_LOGITS: policy.model.outputs, + BEHAVIOUR_LOGITS: policy.model_out, } @@ -172,7 +169,7 @@ def postprocess_ppo_gae(policy, last_r = 0.0 else: next_state = [] - for i in range(len(policy.model.state_in)): + for i in range(len(policy.state_in)): next_state.append([sample_batch["state_out_{}".format(i)][-1]]) last_r = policy._value(sample_batch[SampleBatch.NEXT_OBS][-1], sample_batch[SampleBatch.ACTIONS][-1], @@ -225,27 +222,7 @@ class KLCoeffMixin(object): class ValueNetworkMixin(object): def __init__(self, obs_space, action_space, config): if config["use_gae"]: - if config["vf_share_layers"]: - self.value_function = self.model.value_function() - else: - vf_config = config["model"].copy() - # Do not split the last layer of the value function into - # mean parameters and standard deviation parameters and - # do not make the standard deviations free variables. - vf_config["free_log_std"] = False - if vf_config["use_lstm"]: - vf_config["use_lstm"] = False - logger.warning( - "It is not recommended to use a LSTM model with " - "vf_share_layers=False (consider setting it to True). " - "If you want to not share layers, you can implement " - "a custom LSTM model that overrides the " - "value_function() method.") - with tf.variable_scope("value_function"): - self.value_function = ModelCatalog.get_model( - self.get_obs_input_dict(), obs_space, action_space, 1, - vf_config).outputs - self.value_function = tf.reshape(self.value_function, [-1]) + self.value_function = self.model.value_function() else: self.value_function = tf.zeros( shape=tf.shape(self.get_placeholder(SampleBatch.CUR_OBS))[:1]) @@ -255,11 +232,10 @@ class ValueNetworkMixin(object): self.get_placeholder(SampleBatch.CUR_OBS): [ob], self.get_placeholder(SampleBatch.PREV_ACTIONS): [prev_action], self.get_placeholder(SampleBatch.PREV_REWARDS): [prev_reward], - self.model.seq_lens: [1] + self.seq_lens: [1] } - assert len(args) == len(self.model.state_in), \ - (args, self.model.state_in) - for k, v in zip(self.model.state_in, args): + assert len(args) == len(self.state_in), (args, self.state_in) + for k, v in zip(self.state_in, args): feed_dict[k] = v vf = self.get_session().run(self.value_function, feed_dict) return vf[0] diff --git a/python/ray/rllib/agents/qmix/model.py b/python/ray/rllib/agents/qmix/model.py index 660a9b484..dd4377a4a 100644 --- a/python/ray/rllib/agents/qmix/model.py +++ b/python/ray/rllib/agents/qmix/model.py @@ -6,7 +6,7 @@ from torch import nn import torch.nn.functional as F from ray.rllib.models.preprocessors import get_preprocessor -from ray.rllib.models.pytorch.model import TorchModel +from ray.rllib.models.torch.model import TorchModel from ray.rllib.utils.annotations import override diff --git a/python/ray/rllib/agents/registry.py b/python/ray/rllib/agents/registry.py index aa70275ad..2d322fff9 100644 --- a/python/ray/rllib/agents/registry.py +++ b/python/ray/rllib/agents/registry.py @@ -59,6 +59,11 @@ def _import_dqn(): return dqn.DQNTrainer +def _import_simple_q(): + from ray.rllib.agents import dqn + return dqn.SimpleQTrainer + + def _import_apex(): from ray.rllib.agents import dqn return dqn.ApexTrainer @@ -97,6 +102,7 @@ ALGORITHMS = { "ES": _import_es, "ARS": _import_ars, "DQN": _import_dqn, + "SimpleQ": _import_simple_q, "APEX": _import_apex, "A3C": _import_a3c, "A2C": _import_a2c, diff --git a/python/ray/rllib/evaluation/rollout_worker.py b/python/ray/rllib/evaluation/rollout_worker.py index 3be01a429..489c21c08 100644 --- a/python/ray/rllib/evaluation/rollout_worker.py +++ b/python/ray/rllib/evaluation/rollout_worker.py @@ -299,6 +299,8 @@ class RolloutWorker(EvaluatorInterface): logger.info("Creating policy evaluation worker {}".format( worker_index) + " on CPU (please ignore any CUDA init errors)") + if not tf: + raise ImportError("Could not import tensorflow") with tf.Graph().as_default(): if tf_session_creator: self.tf_sess = tf_session_creator() diff --git a/python/ray/rllib/examples/custom_keras_model.py b/python/ray/rllib/examples/custom_keras_model.py new file mode 100644 index 000000000..ea1c5d8ff --- /dev/null +++ b/python/ray/rllib/examples/custom_keras_model.py @@ -0,0 +1,109 @@ +"""Example of using a custom ModelV2 Keras-style model. + +TODO(ekl): add this to docs once ModelV2 is fully implemented. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse + +import ray +from ray import tune +from ray.rllib.models import ModelCatalog +from ray.rllib.models.misc import normc_initializer +from ray.rllib.models.tf.tf_modelv2 import TFModelV2 +from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + +parser = argparse.ArgumentParser() +parser.add_argument("--run", type=str, default="SimpleQ") # Try PG, PPO, DQN +parser.add_argument("--stop", type=int, default=200) + + +class MyKerasModel(TFModelV2): + """Custom model for policy gradient algorithms.""" + + def __init__(self, obs_space, action_space, num_outputs, model_config, + name): + super(MyKerasModel, self).__init__(obs_space, action_space, + num_outputs, model_config, name) + self.inputs = tf.keras.layers.Input( + shape=obs_space.shape, name="observations") + layer_1 = tf.keras.layers.Dense( + 128, + name="my_layer1", + activation=tf.nn.relu, + kernel_initializer=normc_initializer(1.0))(self.inputs) + layer_out = tf.keras.layers.Dense( + num_outputs, + name="my_out", + activation=None, + kernel_initializer=normc_initializer(0.01))(layer_1) + value_out = tf.keras.layers.Dense( + 1, + name="value_out", + activation=None, + kernel_initializer=normc_initializer(0.01))(layer_1) + self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out]) + self.register_variables(self.base_model.variables) + + def forward(self, input_dict, state, seq_lens): + self.prev_input = input_dict + model_out, self._value_out = self.base_model(input_dict["obs"]) + return model_out, state + + def value_function(self): + return tf.reshape(self._value_out, [-1]) + + +class MyKerasQModel(DistributionalQModel): + """Custom model for DQN.""" + + def __init__(self, obs_space, action_space, num_outputs, model_config, + name, **kw): + super(MyKerasQModel, self).__init__( + obs_space, action_space, num_outputs, model_config, name, **kw) + + # Define the core model layers which will be used by the other + # output heads of DistributionalQModel + self.inputs = tf.keras.layers.Input( + shape=obs_space.shape, name="observations") + layer_1 = tf.keras.layers.Dense( + 128, + name="my_layer1", + activation=tf.nn.relu, + kernel_initializer=normc_initializer(1.0))(self.inputs) + layer_out = tf.keras.layers.Dense( + num_outputs, + name="my_out", + activation=tf.nn.relu, + kernel_initializer=normc_initializer(1.0))(layer_1) + self.base_model = tf.keras.Model(self.inputs, layer_out) + self.register_variables(self.base_model.variables) + + # Implement the core forward method + def forward(self, input_dict, state, seq_lens): + self.prev_input = input_dict + model_out = self.base_model(input_dict["obs"]) + return model_out, state + + +if __name__ == "__main__": + ray.init(local_mode=True) + args = parser.parse_args() + ModelCatalog.register_custom_model("keras_model", MyKerasModel) + ModelCatalog.register_custom_model("keras_q_model", MyKerasQModel) + tune.run( + args.run, + stop={"episode_reward_mean": args.stop}, + config={ + "env": "CartPole-v0", + "model": { + "custom_model": "keras_q_model" + if args.run == "DQN" else "keras_model" + }, + }) diff --git a/python/ray/rllib/examples/parametric_action_cartpole.py b/python/ray/rllib/examples/parametric_action_cartpole.py index e16e1ab75..22d45d066 100644 --- a/python/ray/rllib/examples/parametric_action_cartpole.py +++ b/python/ray/rllib/examples/parametric_action_cartpole.py @@ -173,9 +173,14 @@ if __name__ == "__main__": "observation_filter": "NoFilter", # don't filter the action list "vf_share_layers": True, # don't create duplicate value model } - elif args.run == "DQN": + elif args.run in ["SimpleQ", "DQN"]: cfg = { "hiddens": [], # important: don't postprocess the action scores + # TODO(ekl) we could support dueling if the model in this example + # was ModelV2 and only emitted -inf values on get_q_values(). + # The problem with ModelV1 is that the model outputs + # are used as state scores and hence cause blowup to inf. + "dueling": False, } else: cfg = {} # PG, IMPALA, A2C, etc. diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index a3a68c22e..62ebb691c 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -16,10 +16,13 @@ from ray.rllib.models.action_dist import (Categorical, MultiCategorical, MultiActionDistribution, Dirichlet) from ray.rllib.models.torch_action_dist import (TorchCategorical, TorchDiagGaussian) +from ray.rllib.models.tf.modelv1_compat import make_v1_wrapper from ray.rllib.models.preprocessors import get_preprocessor from ray.rllib.models.fcnet import FullyConnectedNetwork from ray.rllib.models.visionnet import VisionNetwork from ray.rllib.models.lstm import LSTM +from ray.rllib.models.modelv2 import ModelV2 +from ray.rllib.models.tf.tf_modelv2 import TFModelV2 from ray.rllib.utils.annotations import DeveloperAPI, PublicAPI from ray.rllib.utils import try_import_tf @@ -41,8 +44,12 @@ MODEL_DEFAULTS = { "fcnet_hiddens": [256, 256], # For control envs, documented in ray.rllib.models.Model "free_log_std": False, - # (deprecated) Whether to use sigmoid to squash actions to space range - "squash_to_range": False, + # Whether to skip the final linear layer used to resize the hidden layer + # outputs to size `num_outputs`. If True, then the last hidden layer + # should already match num_outputs. + "no_final_linear": False, + # Whether layers should be shared for the value function. + "vf_share_layers": False, # == LSTM == # Whether to wrap the model with a LSTM @@ -53,6 +60,9 @@ MODEL_DEFAULTS = { "lstm_cell_size": 256, # Whether to feed a_{t-1}, r_{t-1} to LSTM "lstm_use_prev_action_reward": False, + # When using modelv1 models with a modelv2 algorithm, you may have to + # define the state shape here (e.g., [256, 256]). + "state_shape": None, # == Atari == # Whether to enable framestack for Atari envs @@ -117,10 +127,6 @@ class ModelCatalog(object): "using a Tuple action space, or the multi-agent API.") if dist_type is None: dist = TorchDiagGaussian if torch else DiagGaussian - if config.get("squash_to_range"): - raise ValueError( - "The squash_to_range option is deprecated. See the " - "clip_actions agent option instead.") return dist, action_space.shape[0] * 2 elif dist_type == "deterministic": return Deterministic, action_space.shape[0] @@ -196,6 +202,90 @@ class ModelCatalog(object): raise NotImplementedError("action space {}" " not supported".format(action_space)) + @staticmethod + def get_model_v2(obs_space, + action_space, + num_outputs, + model_config, + framework="tf", + name=None, + model_interface=None, + **model_kwargs): + """Returns a suitable model compatible with given spaces and output. + + Args: + obs_space (Space): Observation space of the target gym env. This + may have an `original_space` attribute that specifies how to + unflatten the tensor into a ragged tensor. + action_space (Space): Action space of the target gym env. + num_outputs (int): The size of the output vector of the model. + framework (str): Either "tf" or "torch". + name (str): Name (scope) for the model. + model_interface (cls): Interface required for the model + model_kwargs (dict): args to pass to the ModelV2 constructor + + Returns: + model (ModelV2): Model to use for the policy. + """ + + if model_config.get("custom_model"): + model_cls = _global_registry.get(RLLIB_MODEL, + model_config["custom_model"]) + if issubclass(model_cls, ModelV2): + if model_interface and not issubclass(model_cls, + model_interface): + raise ValueError("The given model must subclass", + model_interface) + created = set() + + # Track and warn if variables were created but no registered + def track_var_creation(next_creator, **kw): + v = next_creator(**kw) + created.add(v) + return v + + with tf.variable_creator_scope(track_var_creation): + instance = model_cls(obs_space, action_space, num_outputs, + model_config, name, **model_kwargs) + registered = set(instance.variables()) + not_registered = set() + for var in created: + if var not in registered: + not_registered.add(var) + if not_registered: + raise ValueError( + "It looks like variables {} were created as part of " + "{} but does not appear in model.variables() ({}). " + "Did you forget to call model.register_variables() " + "on the variables in question?".format( + not_registered, instance, registered)) + return instance + + if framework == "tf": + legacy_model_cls = ModelCatalog.get_model + wrapper = ModelCatalog._wrap_if_needed( + make_v1_wrapper(legacy_model_cls), model_interface) + return wrapper(obs_space, action_space, num_outputs, model_config, + name, **model_kwargs) + + raise NotImplementedError("TODO: support {} models".format(framework)) + + @staticmethod + def _wrap_if_needed(model_cls, model_interface): + assert issubclass(model_cls, TFModelV2) + + if not model_interface or issubclass(model_cls, model_interface): + return model_cls + + class wrapper(model_interface, model_cls): + pass + + name = "{}_as_{}".format(model_cls.__name__, model_interface.__name__) + wrapper.__name__ = name + wrapper.__qualname__ = name + + return wrapper + @staticmethod @DeveloperAPI def get_model(input_dict, @@ -284,10 +374,10 @@ class ModelCatalog(object): Returns: model (models.Model): Neural network model. """ - from ray.rllib.models.pytorch.fcnet import (FullyConnectedNetwork as - PyTorchFCNet) - from ray.rllib.models.pytorch.visionnet import (VisionNetwork as - PyTorchVisionNet) + from ray.rllib.models.torch.fcnet import (FullyConnectedNetwork as + PyTorchFCNet) + from ray.rllib.models.torch.visionnet import (VisionNetwork as + PyTorchVisionNet) options = options or MODEL_DEFAULTS diff --git a/python/ray/rllib/models/fcnet.py b/python/ray/rllib/models/fcnet.py index c3bacbd46..76d054e3d 100644 --- a/python/ray/rllib/models/fcnet.py +++ b/python/ray/rllib/models/fcnet.py @@ -28,6 +28,16 @@ class FullyConnectedNetwork(Model): i = 1 last_layer = inputs for size in hiddens: + # skip final linear layer + if options.get("no_final_linear") and i == len(hiddens): + output = tf.layers.dense( + last_layer, + num_outputs, + kernel_initializer=normc_initializer(1.0), + activation=activation, + name="fc_out") + return output, output + label = "fc{}".format(i) last_layer = tf.layers.dense( last_layer, @@ -36,11 +46,11 @@ class FullyConnectedNetwork(Model): activation=activation, name=label) i += 1 - label = "fc_out" + output = tf.layers.dense( last_layer, num_outputs, kernel_initializer=normc_initializer(0.01), activation=None, - name=label) + name="fc_out") return output, last_layer diff --git a/python/ray/rllib/models/model.py b/python/ray/rllib/models/model.py index 901ffa802..a43b088cc 100644 --- a/python/ray/rllib/models/model.py +++ b/python/ray/rllib/models/model.py @@ -67,6 +67,7 @@ class Model(object): self.options = options self.scope = tf.get_variable_scope() self.session = tf.get_default_session() + self.input_dict = input_dict if seq_lens is not None: self.seq_lens = seq_lens else: @@ -77,6 +78,8 @@ class Model(object): if options.get("free_log_std"): assert num_outputs % 2 == 0 num_outputs = num_outputs // 2 + + ok = True try: restored = input_dict.copy() restored["obs"] = restore_original_dimensions( @@ -84,6 +87,10 @@ class Model(object): self.outputs, self.last_layer = self._build_layers_v2( restored, num_outputs, options) except NotImplementedError: + ok = False + # In TF 1.14, you cannot construct variable scopes in exception + # handlers so we have to set the OK flag and check it here: + if not ok: self.outputs, self.last_layer = self._build_layers( input_dict["obs"], num_outputs, options) @@ -192,6 +199,12 @@ class Model(object): """Deprecated: use self.custom_loss().""" return None + @classmethod + def get_initial_state(cls, obs_space, action_space, num_outputs, options): + raise NotImplementedError( + "In order to use recurrent models with ModelV2, you should define " + "the get_initial_state @classmethod on your custom model class.") + def _validate_output_shape(self): """Checks that the model has the correct number of outputs.""" try: @@ -226,6 +239,11 @@ def restore_original_dimensions(obs, obs_space, tensorlib=tf): """ if hasattr(obs_space, "original_space"): + if tensorlib == "tf": + tensorlib = tf + elif tensorlib == "torch": + import torch + tensorlib = torch return _unpack_obs(obs, obs_space.original_space, tensorlib=tensorlib) else: return obs diff --git a/python/ray/rllib/models/modelv2.py b/python/ray/rllib/models/modelv2.py new file mode 100644 index 000000000..2fca20bfb --- /dev/null +++ b/python/ray/rllib/models/modelv2.py @@ -0,0 +1,169 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.models.model import restore_original_dimensions + + +class ModelV2(object): + """Defines a Keras-style abstract network model for use with RLlib. + + Custom models should extend either TFModelV2 or TorchModelV2 instead of + this class directly. Experimental. + + Attributes: + obs_space (Space): observation space of the target gym env. This + may have an `original_space` attribute that specifies how to + unflatten the tensor into a ragged tensor. + action_space (Space): action space of the target gym env + num_outputs (int): number of output units of the model + model_config (dict): config for the model, documented in ModelCatalog + name (str): name (scope) for the model + framework (str): either "tf" or "torch" + """ + + def __init__(self, obs_space, action_space, num_outputs, model_config, + name, framework): + """Initialize the model. + + This method should create any variables used by the model. + """ + + self.obs_space = obs_space + self.action_space = action_space + self.num_outputs = num_outputs + self.model_config = model_config + self.name = name or "default_model" + self.framework = framework + self.var_list = [] + + def get_initial_state(self): + """Get the initial recurrent state values for the model. + + Returns: + list of np.array objects, if any + """ + return [] + + def forward(self, input_dict, state, seq_lens): + """Call the model with the given input tensors and state. + + Any complex observations (dicts, tuples, etc.) will be unpacked by + __call__ before being passed to forward(). To access the flattened + observation tensor, refer to input_dict["obs_flat"]. + + This method can be called any number of times. In eager execution, + each call to forward() will eagerly evaluate the model. In symbolic + execution, each call to forward creates a computation graph that + operates over the variables of this model (i.e., shares weights). + + Custom models should override this instead of __call__. + + Arguments: + input_dict (dict): dictionary of input tensors, including "obs", + "obs_flat", "prev_action", "prev_reward", "is_training" + state (list): list of state tensors with sizes matching those + returned by get_initial_state + the batch dimension + seq_lens (Tensor): 1d tensor holding input sequence lengths + + Returns: + (outputs, state): The model output tensor of size + [BATCH, num_outputs] + """ + raise NotImplementedError + + def value_function(self): + """Return the value function estimate for the most recent forward pass. + + Returns: + value estimate tensor of shape [BATCH]. + """ + raise NotImplementedError + + def custom_loss(self, policy_loss, loss_inputs): + """Override to customize the loss function used to optimize this model. + + This can be used to incorporate self-supervised losses (by defining + a loss over existing input and output tensors of this model), and + supervised losses (by defining losses over a variable-sharing copy of + this model's layers). + + You can find an runnable example in examples/custom_loss.py. + + Arguments: + policy_loss (Tensor): scalar policy loss from the policy. + loss_inputs (dict): map of input placeholders for rollout data. + + Returns: + Scalar tensor for the customized loss for this model. + """ + return policy_loss + + def metrics(self): + """Override to return custom metrics from your model. + + The stats will be reported as part of the learner stats, i.e., + info: + learner: + model: + key1: metric1 + key2: metric2 + + Returns: + Dict of string keys to scalar tensors. + """ + return {} + + def register_variables(self, variables): + """Register the given list of variables with this model.""" + self.var_list.extend(variables) + + def variables(self): + """Returns the list of variables for this model.""" + return self.var_list + + def trainable_variables(self): + """Returns the list of trainable variables for this model.""" + return self.variables() + + def __call__(self, input_dict, state, seq_lens): + """Call the model with the given input tensors and state. + + This is the method used by RLlib to execute the forward pass. It calls + forward() internally after unpacking nested observation tensors. + + Custom models should override forward() instead of __call__. + + Arguments: + input_dict (dict): dictionary of input tensors, including "obs", + "prev_action", "prev_reward", "is_training" + state (list): list of state tensors with sizes matching those + returned by get_initial_state + the batch dimension + seq_lens (Tensor): 1d tensor holding input sequence lengths + + Returns: + (outputs, state): The model output tensor of size + [BATCH, output_spec.size] or a list of tensors corresponding to + output_spec.shape_list, and a list of state tensors of + [BATCH, state_size_i]. + """ + + restored = input_dict.copy() + restored["obs"] = restore_original_dimensions( + input_dict["obs"], self.obs_space, self.framework) + restored["obs_flat"] = input_dict["obs"] + outputs, state = self.forward(restored, state, seq_lens) + + try: + shape = outputs.shape + except AttributeError: + raise ValueError("Output is not a tensor: {}".format(outputs)) + else: + if len(shape) != 2 or shape[1] != self.num_outputs: + raise ValueError( + "Expected output shape of [None, {}], got {}".format( + self.num_outputs, shape)) + if not isinstance(state, list): + raise ValueError("State output is not a list: {}".format(state)) + + return outputs, state diff --git a/python/ray/rllib/models/pytorch/__init__.py b/python/ray/rllib/models/tf/__init__.py similarity index 100% rename from python/ray/rllib/models/pytorch/__init__.py rename to python/ray/rllib/models/tf/__init__.py diff --git a/python/ray/rllib/models/tf/modelv1_compat.py b/python/ray/rllib/models/tf/modelv1_compat.py new file mode 100644 index 000000000..6bc0ac65e --- /dev/null +++ b/python/ray/rllib/models/tf/modelv1_compat.py @@ -0,0 +1,134 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging +import numpy as np + +from ray.rllib.models.modelv2 import ModelV2 +from ray.rllib.models.tf.tf_modelv2 import TFModelV2 +from ray.rllib.models.misc import linear, normc_initializer +from ray.rllib.utils.annotations import override +from ray.rllib.utils import try_import_tf +from ray.rllib.utils.tf_ops import scope_vars + +tf = try_import_tf() + +logger = logging.getLogger(__name__) + + +def make_v1_wrapper(legacy_model_cls): + class ModelV1Wrapper(TFModelV2): + """Wrapper that allows V1 models to be used as ModelV2.""" + + def __init__(self, obs_space, action_space, num_outputs, model_config, + name): + TFModelV2.__init__(self, obs_space, action_space, num_outputs, + model_config, name) + self.legacy_model_cls = legacy_model_cls + + def instance_template(input_dict, state, seq_lens): + # create a new model instance + with tf.variable_scope(self.name): + new_instance = self.legacy_model_cls( + input_dict, obs_space, action_space, num_outputs, + model_config, state, seq_lens) + return new_instance + + self.instance_template = tf.make_template("instance_template", + instance_template) + # Tracks the last v1 model created by the call to forward + self.cur_instance = None + + def vf_template(last_layer, input_dict): + with tf.variable_scope(self.variable_scope): + with tf.variable_scope("value_function"): + # Simple case: sharing the feature layer + if model_config["vf_share_layers"]: + return tf.reshape( + linear(last_layer, 1, "value_function", + normc_initializer(1.0)), [-1]) + + # Create a new separate model with no RNN state, etc. + branch_model_config = model_config.copy() + branch_model_config["free_log_std"] = False + if branch_model_config["use_lstm"]: + branch_model_config["use_lstm"] = False + logger.warning( + "It is not recommended to use a LSTM model " + "with vf_share_layers=False (consider " + "setting it to True). If you want to not " + "share layers, you can implement a custom " + "LSTM model that overrides the " + "value_function() method.") + branch_instance = legacy_model_cls( + input_dict, + obs_space, + action_space, + 1, + branch_model_config, + state_in=None, + seq_lens=None) + return tf.reshape(branch_instance.outputs, [-1]) + + self.vf_template = tf.make_template("vf_template", vf_template) + + # XXX: Try to guess the initial state size. Since the size of the + # state is known only after forward() for V1 models, it might be + # wrong. + if model_config.get("state_shape"): + self.initial_state = [ + np.zeros(s, np.float32) + for s in model_config["state_shape"] + ] + elif model_config.get("use_lstm"): + cell_size = model_config.get("lstm_cell_size", 256) + self.initial_state = [ + np.zeros(cell_size, np.float32), + np.zeros(cell_size, np.float32), + ] + else: + self.initial_state = [] + + with tf.variable_scope(self.name) as scope: + self.variable_scope = scope + + @override(ModelV2) + def get_initial_state(self): + return self.initial_state + + @override(ModelV2) + def __call__(self, input_dict, state, seq_lens): + new_instance = self.instance_template(input_dict, state, seq_lens) + if len(new_instance.state_init) != len(self.get_initial_state()): + raise ValueError( + "When using a custom recurrent ModelV1 model, you should " + "declare the state_shape in the model options. For " + "example, set 'state_shape': [256, 256] for a lstm with " + "cell size 256. The guessed state shape was {} which " + "appears to be incorrect.".format( + [s.shape[0] for s in self.get_initial_state()])) + self.cur_instance = new_instance + self.variable_scope = new_instance.scope + return new_instance.outputs, new_instance.state_out + + @override(ModelV2) + def variables(self): + return super(ModelV1Wrapper, self).variables() + scope_vars( + self.variable_scope) + + @override(ModelV2) + def custom_loss(self, policy_loss, loss_inputs): + return self.cur_instance.custom_loss(policy_loss, loss_inputs) + + @override(ModelV2) + def metrics(self): + return self.cur_instance.custom_stats() + + @override(ModelV2) + def value_function(self): + assert self.cur_instance, "must call forward first" + return self.vf_template(self.cur_instance.last_layer, + self.cur_instance.input_dict) + + return ModelV1Wrapper diff --git a/python/ray/rllib/models/tf/tf_modelv2.py b/python/ray/rllib/models/tf/tf_modelv2.py new file mode 100644 index 000000000..b2769b5ec --- /dev/null +++ b/python/ray/rllib/models/tf/tf_modelv2.py @@ -0,0 +1,23 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.models.modelv2 import ModelV2 +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + + +class TFModelV2(ModelV2): + """TF version of ModelV2.""" + + def __init__(self, obs_space, action_space, output_spec, model_config, + name): + ModelV2.__init__( + self, + obs_space, + action_space, + output_spec, + model_config, + name, + framework="tf") diff --git a/python/ray/rllib/models/torch/__init__.py b/python/ray/rllib/models/torch/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/rllib/models/pytorch/fcnet.py b/python/ray/rllib/models/torch/fcnet.py similarity index 93% rename from python/ray/rllib/models/pytorch/fcnet.py rename to python/ray/rllib/models/torch/fcnet.py index d6469f65a..68957ba11 100644 --- a/python/ray/rllib/models/pytorch/fcnet.py +++ b/python/ray/rllib/models/torch/fcnet.py @@ -6,8 +6,8 @@ import logging import numpy as np import torch.nn as nn -from ray.rllib.models.pytorch.model import TorchModel -from ray.rllib.models.pytorch.misc import normc_initializer, SlimFC, \ +from ray.rllib.models.torch.model import TorchModel +from ray.rllib.models.torch.misc import normc_initializer, SlimFC, \ _get_activation_fn from ray.rllib.utils.annotations import override diff --git a/python/ray/rllib/models/pytorch/misc.py b/python/ray/rllib/models/torch/misc.py similarity index 100% rename from python/ray/rllib/models/pytorch/misc.py rename to python/ray/rllib/models/torch/misc.py diff --git a/python/ray/rllib/models/pytorch/model.py b/python/ray/rllib/models/torch/model.py similarity index 96% rename from python/ray/rllib/models/pytorch/model.py rename to python/ray/rllib/models/torch/model.py index 48b9587a7..e06e3f1bb 100644 --- a/python/ray/rllib/models/pytorch/model.py +++ b/python/ray/rllib/models/torch/model.py @@ -9,6 +9,7 @@ from ray.rllib.models.model import restore_original_dimensions from ray.rllib.utils.annotations import PublicAPI +# TODO(ekl) rewrite using modelv2 @PublicAPI class TorchModel(nn.Module): """Defines an abstract network model for use with RLlib / PyTorch.""" @@ -31,6 +32,7 @@ class TorchModel(nn.Module): def forward(self, input_dict, hidden_state): """Wraps _forward() to unpack flattened Dict and Tuple observations.""" input_dict["obs"] = input_dict["obs"].float() # TODO(ekl): avoid cast + input_dict["obs_flat"] = input_dict["obs"] input_dict["obs"] = restore_original_dimensions( input_dict["obs"], self.obs_space, tensorlib=torch) outputs, features, vf, h = self._forward(input_dict, hidden_state) diff --git a/python/ray/rllib/models/torch/torch_modelv2.py b/python/ray/rllib/models/torch/torch_modelv2.py new file mode 100644 index 000000000..901cf91cf --- /dev/null +++ b/python/ray/rllib/models/torch/torch_modelv2.py @@ -0,0 +1,20 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.models.modelv2 import ModelV2 + + +class TorchModelV2(ModelV2): + """Torch version of ModelV2.""" + + def __init__(self, obs_space, action_space, output_spec, model_config, + name): + ModelV2.__init__( + self, + obs_space, + action_space, + output_spec, + model_config, + name, + framework="torch") diff --git a/python/ray/rllib/models/pytorch/visionnet.py b/python/ray/rllib/models/torch/visionnet.py similarity index 93% rename from python/ray/rllib/models/pytorch/visionnet.py rename to python/ray/rllib/models/torch/visionnet.py index 0b8a283df..9851b91ab 100644 --- a/python/ray/rllib/models/pytorch/visionnet.py +++ b/python/ray/rllib/models/torch/visionnet.py @@ -4,8 +4,8 @@ from __future__ import print_function import torch.nn as nn -from ray.rllib.models.pytorch.model import TorchModel -from ray.rllib.models.pytorch.misc import normc_initializer, valid_padding, \ +from ray.rllib.models.torch.model import TorchModel +from ray.rllib.models.torch.misc import normc_initializer, valid_padding, \ SlimConv2d, SlimFC from ray.rllib.models.visionnet import _get_filter_config from ray.rllib.utils.annotations import override diff --git a/python/ray/rllib/models/visionnet.py b/python/ray/rllib/models/visionnet.py index 6ad30ddb9..fabaf4c03 100644 --- a/python/ray/rllib/models/visionnet.py +++ b/python/ray/rllib/models/visionnet.py @@ -33,6 +33,19 @@ class VisionNetwork(Model): padding="same", name="conv{}".format(i)) out_size, kernel, stride = filters[-1] + + # skip final linear layer + if options.get("no_final_linear"): + fc_out = tf.layers.conv2d( + inputs, + num_outputs, + kernel, + stride, + activation=activation, + padding="valid", + name="fc_out") + return flatten(fc_out), flatten(fc_out) + fc1 = tf.layers.conv2d( inputs, out_size, diff --git a/python/ray/rllib/policy/dynamic_tf_policy.py b/python/ray/rllib/policy/dynamic_tf_policy.py index 23014553b..c66fb7f53 100644 --- a/python/ray/rllib/policy/dynamic_tf_policy.py +++ b/python/ray/rllib/policy/dynamic_tf_policy.py @@ -23,12 +23,19 @@ logger = logging.getLogger(__name__) class DynamicTFPolicy(TFPolicy): """A TFPolicy that auto-defines placeholders dynamically at runtime. + This class also supports eager execution if config["use_eager"] is True. + Eager execution is implemented using a py_function op inside graph mode. + Initialization of this class occurs in two phases. * Phase 1: the model is created and model variables are initialized. * Phase 2: a fake batch of data is created, sent to the trajectory postprocessor, and then used to create placeholders for the loss function. The loss and stats functions are initialized with these placeholders. + + Initialization defines the static graph. When using eager execution, a + corresponding imperative py_function is also generated as an embedded op + inside the static graph. """ def __init__(self, @@ -40,8 +47,10 @@ class DynamicTFPolicy(TFPolicy): update_ops_fn=None, grad_stats_fn=None, before_loss_init=None, - make_action_sampler=None, + make_model=None, + action_sampler_fn=None, existing_inputs=None, + existing_model=None, get_batch_divisibility_req=None, obs_include_prev_action_reward=True): """Initialize a dynamic TF policy. @@ -60,17 +69,32 @@ class DynamicTFPolicy(TFPolicy): overriding the update ops to run when applying gradients before_loss_init (func): optional function to run prior to loss init that takes the same arguments as __init__ - make_action_sampler (func): optional function that returns a - tuple of action and action prob tensors. The function takes - (policy, input_dict, obs_space, action_space, config) as its - arguments + make_model (func): optional function that returns a ModelV2 object + given (policy, obs_space, action_space, config). + All policy variables should be created in this function. If not + specified, a default model will be created. + action_sampler_fn (func): optional function that returns a + tuple of action and action prob tensors given + (policy, model, input_dict, obs_space, action_space, config). + If not specified, a default action distribution will be used. existing_inputs (OrderedDict): when copying a policy, this specifies an existing dict of placeholders to use instead of defining new ones + existing_model (ModelV2): when copying a policy, this specifies + an existing model to clone and share weights with get_batch_divisibility_req (func): optional function that returns the divisibility requirement for sample batches obs_include_prev_action_reward (bool): whether to include the previous action and reward in the model input + + Attributes: + config: config of the policy + model: model instance, if any + model_out: output tensors of the model + action_dist: action distribution of the model, if any + state_in: state input tensors, if any + state_out: state output tensors, if any + seq_lens: tensor of sequence lengths """ self.config = config self._loss_fn = loss_fn @@ -104,40 +128,47 @@ class DynamicTFPolicy(TFPolicy): SampleBatch.PREV_REWARDS: prev_rewards, "is_training": self._get_is_training_placeholder(), } + self.seq_lens = tf.placeholder( + dtype=tf.int32, shape=[None], name="seq_lens") - # Create the model network and action outputs - if make_action_sampler: - assert not existing_inputs, \ - "Cloning not supported with custom action sampler" - self.model = None - self.dist_class = None - self.action_dist = None - action_sampler, action_prob = make_action_sampler( - self, self.input_dict, obs_space, action_space, config) + # Setup model + self.dist_class, logit_dim = ModelCatalog.get_action_dist( + action_space, self.config["model"]) + if existing_model: + self.model = existing_model + elif make_model: + self.model = make_model(self, obs_space, action_space, config) else: - self.dist_class, logit_dim = ModelCatalog.get_action_dist( - action_space, self.config["model"]) - if existing_inputs: - existing_state_in = [ - v for k, v in existing_inputs.items() - if k.startswith("state_in_") - ] - if existing_state_in: - existing_seq_lens = existing_inputs["seq_lens"] - else: - existing_seq_lens = None - else: - existing_state_in = [] - existing_seq_lens = None - self.model = ModelCatalog.get_model( - self.input_dict, + self.model = ModelCatalog.get_model_v2( obs_space, action_space, logit_dim, self.config["model"], - state_in=existing_state_in, - seq_lens=existing_seq_lens) - self.action_dist = self.dist_class(self.model.outputs) + framework="tf") + if existing_inputs: + self.state_in = [ + v for k, v in existing_inputs.items() + if k.startswith("state_in_") + ] + if self.state_in: + self.seq_lens = existing_inputs["seq_lens"] + else: + self.state_in = [ + tf.placeholder(shape=(None, ) + s.shape, dtype=s.dtype) + for s in self.model.get_initial_state() + ] + self.model_out, self.state_out = self.model( + self.input_dict, self.state_in, self.seq_lens) + + # Setup action sampler + if action_sampler_fn: + self.action_dist = None + self.dist_class = None + action_sampler, action_prob = action_sampler_fn( + self, self.model, self.input_dict, obs_space, action_space, + config) + else: + self.action_dist = self.dist_class(self.model_out) action_sampler = self.action_dist.sample() action_prob = self.action_dist.sampled_action_prob() @@ -158,11 +189,11 @@ class DynamicTFPolicy(TFPolicy): loss=None, # dynamically initialized on run loss_inputs=[], model=self.model, - state_inputs=self.model and self.model.state_in, - state_outputs=self.model and self.model.state_out, + state_inputs=self.state_in, + state_outputs=self.state_out, prev_action_input=prev_actions, prev_reward_input=prev_rewards, - seq_lens=self.model and self.model.seq_lens, + seq_lens=self.seq_lens, max_seq_len=config["model"]["max_seq_len"], batch_divisibility_req=batch_divisibility_req) @@ -195,11 +226,6 @@ class DynamicTFPolicy(TFPolicy): def copy(self, existing_inputs): """Creates a copy of self using existing input placeholders.""" - if self.config["use_eager"]: - raise ValueError( - "eager not implemented for multi-GPU, try setting " - "`simple_optimizer: true`") - # Note that there might be RNN state inputs at the end of the list if self._state_inputs: num_state_inputs = len(self._state_inputs) + 1 @@ -227,12 +253,18 @@ class DynamicTFPolicy(TFPolicy): self.observation_space, self.action_space, self.config, - existing_inputs=input_dict) + existing_inputs=input_dict, + existing_model=self.model) loss = instance._do_loss_init(input_dict) - TFPolicy._initialize_loss( - instance, loss, [(k, existing_inputs[i]) - for i, (k, _) in enumerate(self._loss_inputs)]) + loss_inputs = [(k, existing_inputs[i]) + for i, (k, _) in enumerate(self._loss_inputs)] + + if self.config["use_eager"]: + loss, new_stats = instance._gen_eager_loss_op(loss_inputs) + instance._stats_fetches = new_stats + + TFPolicy._initialize_loss(instance, loss, loss_inputs) if instance._grad_stats_fn: instance._stats_fetches.update( instance._grad_stats_fn(instance, instance._grads)) @@ -241,7 +273,7 @@ class DynamicTFPolicy(TFPolicy): @override(Policy) def get_initial_state(self): if self.model: - return self.model.state_init + return self.model.get_initial_state() else: return [] @@ -321,31 +353,8 @@ class DynamicTFPolicy(TFPolicy): # and non-eager tensors, so losses that read non-eager tensors through # `policy` need to use `policy.convert_to_eager(tensor)`. if self.config["use_eager"]: - if not self.model: - raise ValueError("eager not implemented in this case") - graph_tensors = list(self._needs_eager_conversion) - - def gen_loss(model_outputs, *args): - # fill in the batch tensor dict with eager ensors - eager_inputs = dict( - zip([k for (k, v) in loss_inputs], - args[:len(loss_inputs)])) - # fill in the eager versions of all accessed graph tensors - self._eager_tensors = dict( - zip(graph_tensors, args[len(loss_inputs):])) - # patch the action dist to use eager mode tensors - self.action_dist.inputs = model_outputs - return self._loss_fn(self, eager_inputs) - - # TODO(ekl) also handle the stats funcs - loss = tf.py_function( - gen_loss, - # cast works around TypeError: Cannot convert provided value - # to EagerTensor. Provided value: 0.0 Requested dtype: int64 - [self.model.outputs] + [ - tf.cast(v, tf.float32) for (k, v) in loss_inputs - ] + [tf.cast(t, tf.float32) for t in graph_tensors], - tf.float32) + loss, new_stats = self._gen_eager_loss_op(loss_inputs) + self._stats_fetches = new_stats TFPolicy._initialize_loss(self, loss, loss_inputs) if self._grad_stats_fn: @@ -359,3 +368,36 @@ class DynamicTFPolicy(TFPolicy): if self._update_ops_fn: self._update_ops = self._update_ops_fn(self) return loss + + def _gen_eager_loss_op(self, loss_inputs): + graph_tensors = list(self._needs_eager_conversion) + stat_items = list(self._stats_fetches.items()) + + def gen_loss(model_outputs, *args): + # fill in the batch tensor dict with eager ensors + eager_inputs = dict( + zip([k for (k, v) in loss_inputs], args[:len(loss_inputs)])) + # fill in the eager versions of all accessed graph tensors + self._eager_tensors = dict( + zip(graph_tensors, args[len(loss_inputs):])) + # patch the action dist to use eager mode tensors + self.action_dist.inputs = model_outputs + loss = self._loss_fn(self, eager_inputs) + if self._stats_fn: + stats = self._stats_fn(self, eager_inputs) + return [loss] + [stats[k] for (k, v) in stat_items] + + eager_out = tf.py_function( + gen_loss, + # cast works around TypeError: Cannot convert provided value + # to EagerTensor. Provided value: 0.0 Requested dtype: int64 + [self.model_out] + [ + tf.cast(v, tf.float32) for (k, v) in loss_inputs + ] + [tf.cast(t, tf.float32) for t in graph_tensors], + Tout=[tf.float32] + [v.dtype for (k, v) in stat_items]) + stats = { + k: stat_tensor + for (stat_tensor, (k, v)) in zip(eager_out[1:], stat_items) + } + + return eager_out[0], stats diff --git a/python/ray/rllib/policy/tf_policy.py b/python/ray/rllib/policy/tf_policy.py index ef0de42e2..0642283e8 100644 --- a/python/ray/rllib/policy/tf_policy.py +++ b/python/ray/rllib/policy/tf_policy.py @@ -12,6 +12,7 @@ import ray.experimental.tf_utils from ray.rllib.policy.policy import Policy, LEARNER_STATS_KEY from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models.lstm import chop_into_sequences +from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.debug import log_once, summarize from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule @@ -180,7 +181,10 @@ class TFPolicy(Policy): if self.model: self._loss = self.model.custom_loss(loss, self._loss_input_dict) - self._stats_fetches.update({"model": self.model.custom_stats()}) + self._stats_fetches.update({ + "model": self.model.metrics() if isinstance( + self.model, ModelV2) else self.model.custom_stats() + }) else: self._loss = loss diff --git a/python/ray/rllib/policy/tf_policy_template.py b/python/ray/rllib/policy/tf_policy_template.py index 37828bfe1..7419240f4 100644 --- a/python/ray/rllib/policy/tf_policy_template.py +++ b/python/ray/rllib/policy/tf_policy_template.py @@ -26,7 +26,8 @@ def build_tf_policy(name, before_init=None, before_loss_init=None, after_init=None, - make_action_sampler=None, + make_model=None, + action_sampler_fn=None, mixins=None, get_batch_divisibility_req=None, obs_include_prev_action_reward=True): @@ -40,6 +41,13 @@ def build_tf_policy(name, This means that you can e.g., depend on any policy attributes created in the running of `loss_fn` in later functions such as `stats_fn`. + In eager mode (experimental), the following functions will be run + repeatedly on each eager execution: loss_fn, stats_fn + + This means that these functions should not define any variables internally, + otherwise they will fail in eager mode execution. Variable should only + be created in make_model (if defined). + Arguments: name (str): name of the policy (e.g., "PPOTFPolicy") loss_fn (func): function that returns a loss tensor the policy, @@ -73,10 +81,14 @@ def build_tf_policy(name, init that takes the same arguments as the policy constructor after_init (func): optional function to run at the end of policy init that takes the same arguments as the policy constructor - make_action_sampler (func): optional function that returns a - tuple of action and action prob tensors. The function takes - (policy, input_dict, obs_space, action_space, config) as its - arguments + make_model (func): optional function that returns a ModelV2 object + given (policy, obs_space, action_space, config). + All policy variables should be created in this function. If not + specified, a default model will be created. + action_sampler_fn (func): optional function that returns a + tuple of action and action prob tensors given + (policy, model, input_dict, obs_space, action_space, config). + If not specified, a default action distribution will be used. mixins (list): list of any class mixins for the returned policy class. These mixins will be applied in order and will have higher precedence than the DynamicTFPolicy class @@ -97,6 +109,7 @@ def build_tf_policy(name, obs_space, action_space, config, + existing_model=None, existing_inputs=None): if get_default_config: config = dict(get_default_config(), **config) @@ -123,7 +136,9 @@ def build_tf_policy(name, grad_stats_fn=grad_stats_fn, update_ops_fn=update_ops_fn, before_loss_init=before_loss_init_wrapper, - make_action_sampler=make_action_sampler, + make_model=make_model, + action_sampler_fn=action_sampler_fn, + existing_model=existing_model, existing_inputs=existing_inputs, obs_include_prev_action_reward=obs_include_prev_action_reward) diff --git a/python/ray/rllib/tests/test_lstm.py b/python/ray/rllib/tests/test_lstm.py index dd9c7ccd9..fb8e6a20b 100644 --- a/python/ray/rllib/tests/test_lstm.py +++ b/python/ray/rllib/tests/test_lstm.py @@ -182,6 +182,7 @@ class RNNSequencing(unittest.TestCase): "model": { "custom_model": "rnn", "max_seq_len": 4, + "state_shape": [3, 3], }, }) ppo.train() @@ -238,6 +239,7 @@ class RNNSequencing(unittest.TestCase): "model": { "custom_model": "rnn", "max_seq_len": 4, + "state_shape": [3, 3], }, }) ppo.train() diff --git a/python/ray/rllib/tests/test_nested_spaces.py b/python/ray/rllib/tests/test_nested_spaces.py index 0220ba017..610fad2f1 100644 --- a/python/ray/rllib/tests/test_nested_spaces.py +++ b/python/ray/rllib/tests/test_nested_spaces.py @@ -18,8 +18,8 @@ from ray.rllib.env.base_env import BaseEnv from ray.rllib.env.vector_env import VectorEnv from ray.rllib.models import ModelCatalog from ray.rllib.models.model import Model -from ray.rllib.models.pytorch.fcnet import FullyConnectedNetwork -from ray.rllib.models.pytorch.model import TorchModel +from ray.rllib.models.torch.fcnet import FullyConnectedNetwork +from ray.rllib.models.torch.model import TorchModel from ray.rllib.rollout import rollout from ray.rllib.tests.test_external_env import SimpleServing from ray.tune.registry import register_env diff --git a/python/ray/rllib/utils/tf_ops.py b/python/ray/rllib/utils/tf_ops.py new file mode 100644 index 000000000..62c73e782 --- /dev/null +++ b/python/ray/rllib/utils/tf_ops.py @@ -0,0 +1,58 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + + +def huber_loss(x, delta=1.0): + """Reference: https://en.wikipedia.org/wiki/Huber_loss""" + return tf.where( + tf.abs(x) < delta, + tf.square(x) * 0.5, delta * (tf.abs(x) - 0.5 * delta)) + + +def reduce_mean_ignore_inf(x, axis): + """Same as tf.reduce_mean() but ignores -inf values.""" + mask = tf.not_equal(x, tf.float32.min) + x_zeroed = tf.where(mask, x, tf.zeros_like(x)) + return (tf.reduce_sum(x_zeroed, axis) / tf.reduce_sum( + tf.cast(mask, tf.float32), axis)) + + +def minimize_and_clip(optimizer, objective, var_list, clip_val=10): + """Minimized `objective` using `optimizer` w.r.t. variables in + `var_list` while ensure the norm of the gradients for each + variable is clipped to `clip_val` + """ + gradients = optimizer.compute_gradients(objective, var_list=var_list) + for i, (grad, var) in enumerate(gradients): + if grad is not None: + gradients[i] = (tf.clip_by_norm(grad, clip_val), var) + return gradients + + +def scope_vars(scope, trainable_only=False): + """ + Get variables inside a scope + The scope can be specified as a string + + Parameters + ---------- + scope: str or VariableScope + scope in which the variables reside. + trainable_only: bool + whether or not to return only the variables that were marked as + trainable. + + Returns + ------- + vars: [tf.Variable] + list of variables in `scope`. + """ + return tf.get_collection( + tf.GraphKeys.TRAINABLE_VARIABLES + if trainable_only else tf.GraphKeys.VARIABLES, + scope=scope if isinstance(scope, str) else scope.name)