From 19bbf1eb4da1c486e2714a9a113ab1322f05e52f Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 4 Sep 2019 21:39:22 -0700 Subject: [PATCH] [rllib] Revert [rllib] Port DDPG to the build_tf_policy pattern (#5626) --- python/ray/ray_constants.py | 2 +- rllib/agents/ddpg/ddpg.py | 4 +- rllib/agents/ddpg/ddpg_model.py | 262 ------- rllib/agents/ddpg/ddpg_policy.py | 1069 ++++++++++++++++------------- rllib/agents/dqn/dqn.py | 5 +- rllib/agents/sac/sac_policy.py | 53 +- rllib/tests/test_eager_support.py | 24 - 7 files changed, 658 insertions(+), 761 deletions(-) delete mode 100644 rllib/agents/ddpg/ddpg_model.py diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index e6d37eea8..c61cfed2e 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -54,7 +54,7 @@ PICKLE_OBJECT_WARNING_SIZE = 10**7 # The maximum resource quantity that is allowed. TODO(rkn): This could be # relaxed, but the current implementation of the node manager will be slower # for large resource quantities due to bookkeeping of specific resource IDs. -MAX_RESOURCE_QUANTITY = 10000 +MAX_RESOURCE_QUANTITY = 20000 # Each memory "resource" counts as this many bytes of memory. MEMORY_RESOURCE_UNIT_BYTES = 50 * 1024 * 1024 diff --git a/rllib/agents/ddpg/ddpg.py b/rllib/agents/ddpg/ddpg.py index 9deeb06cc..a6b42f1ca 100644 --- a/rllib/agents/ddpg/ddpg.py +++ b/rllib/agents/ddpg/ddpg.py @@ -41,7 +41,7 @@ DEFAULT_CONFIG = with_common_config({ # === Model === # Apply a state preprocessor with spec given by the "model" config option # (like other RL algorithms). This is mostly useful if you have a weird - # observation shape, like an image. Auto-enabled if a custom model is set. + # observation shape, like an image. Disabled by default. "use_state_preprocessor": False, # Postprocess the policy network model output with these hidden layers. If # use_state_preprocessor is False, then these will be the *only* hidden @@ -173,7 +173,7 @@ def make_exploration_schedule(config, worker_index): if config["per_worker_exploration"]: assert config["num_workers"] > 1, "This requires multiple workers" if worker_index >= 0: - # Exploration constants from the Ape-X paper + # FIXME: what do magic constants mean? (0.4, 7) max_index = float(config["num_workers"] - 1) exponent = 1 + worker_index / max_index * 7 return ConstantSchedule(0.4**exponent) diff --git a/rllib/agents/ddpg/ddpg_model.py b/rllib/agents/ddpg/ddpg_model.py deleted file mode 100644 index f71079640..000000000 --- a/rllib/agents/ddpg/ddpg_model.py +++ /dev/null @@ -1,262 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np - -from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.utils import try_import_tf - -tf = try_import_tf() - - -class DDPGModel(TFModelV2): - """Extension of standard TFModel for DDPG. - - Data flow: - obs -> forward() -> model_out - model_out -> get_policy_output() -> pi(s) - model_out, actions -> get_q_values() -> Q(s, a) - model_out, actions -> get_twin_q_values() -> Q_twin(s, a) - - Note that this class by itself is not a valid model unless you - implement forward() in a subclass.""" - - def __init__(self, - obs_space, - action_space, - num_outputs, - model_config, - name, - actor_hidden_activation="relu", - actor_hiddens=(400, 300), - critic_hidden_activation="relu", - critic_hiddens=(400, 300), - parameter_noise=False, - twin_q=False, - exploration_ou_sigma=0.2): - """Initialize variables of this model. - - Extra model kwargs: - actor_hidden_activation (str): activation for actor network - actor_hiddens (list): hidden layers sizes for actor network - critic_hidden_activation (str): activation for critic network - critic_hiddens (list): hidden layers sizes for critic network - parameter_noise (bool): use param noise exploration - twin_q (bool): build twin Q networks - exploration_ou_sigma (float): ou noise sigma for exploration - - Note that the core layers for forward() are not defined here, this - only defines the layers for the output heads. Those layers for - forward() should be defined in subclasses of DDPGModel. - """ - - super(DDPGModel, self).__init__(obs_space, action_space, num_outputs, - model_config, name) - self.exploration_ou_sigma = exploration_ou_sigma - - self.action_dim = np.product(action_space.shape) - self.model_out = tf.keras.layers.Input( - shape=(num_outputs, ), name="model_out") - self.actions = tf.keras.layers.Input( - shape=(self.action_dim, ), name="actions") - - def build_action_net(action_out): - assert action_out.dtype == tf.float32 - activation = getattr(tf.nn, actor_hidden_activation) - i = 0 - for hidden in actor_hiddens: - if parameter_noise: - import tensorflow.contrib.layers as layers - action_out = layers.fully_connected( - action_out, - num_outputs=hidden, - activation_fn=activation, - normalizer_fn=layers.layer_norm) - else: - action_out = tf.layers.dense( - action_out, - units=hidden, - activation=activation, - name="action_hidden_{}".format(i)) - i += 1 - return tf.layers.dense( - action_out, - units=self.action_dim, - activation=None, - name="action_out") - - action_scope = name + "/action_net" - # Save the scope object, since in eager we will execute this - # path repeatedly and there is no guarantee it will always be run - # in the same original scope. - with tf.variable_scope(action_scope) as action_scope_handle: - pass - - # TODO(ekl) use keras layers instead of variable scopes - if tf.executing_eagerly(): - # Have to use a variable store to reuse variables in eager mode - import tensorflow.contrib as tfc - store = tfc.eager.EagerVariableStore() - - def build_action_net_scope(model_out): - with store.as_default(): - with tf.variable_scope( - action_scope_handle, reuse=tf.AUTO_REUSE): - return build_action_net(model_out) - else: - - def build_action_net_scope(model_out): - with tf.variable_scope( - action_scope_handle, reuse=tf.AUTO_REUSE): - return build_action_net(model_out) - - pi_out = tf.keras.layers.Lambda(build_action_net_scope)(self.model_out) - self.action_net = tf.keras.Model(self.model_out, pi_out) - self.register_variables(self.action_net.variables) - - # Noise vars for P network except for layer normalization vars - if parameter_noise: - assert not tf.executing_eagerly(), "eager p noise not implemented" - with tf.variable_scope(action_scope_handle, reuse=tf.AUTO_REUSE): - self._build_parameter_noise([ - var for var in self.action_net.variables - if "LayerNorm" not in var.name - ]) - - def build_q_net(name, model_out, actions): - q_out = tf.keras.layers.Concatenate(axis=1)([model_out, actions]) - activation = getattr(tf.nn, critic_hidden_activation) - for i, n in enumerate(critic_hiddens): - q_out = tf.keras.layers.Dense( - n, - name="{}_hidden_{}".format(name, i), - activation=activation)(q_out) - q_out = tf.keras.layers.Dense( - 1, activation=None, name="{}_out".format(name))(q_out) - return tf.keras.Model([model_out, actions], q_out) - - self.q_net = build_q_net("q", self.model_out, self.actions) - self.register_variables(self.q_net.variables) - - if twin_q: - self.twin_q_net = build_q_net("twin_q", self.model_out, - self.actions) - self.register_variables(self.twin_q_net.variables) - else: - self.twin_q_net = None - - def get_policy_output(self, model_out): - """Return the (unscaled) output of the policy network. - - This returns the unscaled outputs of pi(s). - - Arguments: - model_out (Tensor): obs embeddings from the model layers, of shape - [BATCH_SIZE, num_outputs]. - - Returns: - tensor of shape [BATCH_SIZE, action_dim] with range [-inf, inf]. - """ - return self.action_net(model_out) - - def get_q_values(self, model_out, actions): - """Return the Q estimates for the most recent forward pass. - - This implements Q(s, a). - - Arguments: - model_out (Tensor): obs embeddings from the model layers, of shape - [BATCH_SIZE, num_outputs]. - actions (Tensor): action values that correspond with the most - recent batch of observations passed through forward(), of shape - [BATCH_SIZE, action_dim]. - - Returns: - tensor of shape [BATCH_SIZE]. - """ - return self.q_net([model_out, actions]) - - def get_twin_q_values(self, model_out, actions): - """Same as get_q_values but using the twin Q net. - - This implements the twin Q(s, a). - - Arguments: - model_out (Tensor): obs embeddings from the model layers, of shape - [BATCH_SIZE, num_outputs]. - actions (Tensor): action values that correspond with the most - recent batch of observations passed through forward(), of shape - [BATCH_SIZE, action_dim]. - - Returns: - tensor of shape [BATCH_SIZE]. - """ - return self.twin_q_net([model_out, actions]) - - def policy_variables(self): - """Return the list of variables for the policy net.""" - - return list(self.action_net.variables) - - def q_variables(self): - """Return the list of variables for Q / twin Q nets.""" - - return self.q_net.variables + (self.twin_q_net.variables - if self.twin_q_net else []) - - def update_action_noise(self, session, distance_in_action_space, - exploration_ou_sigma, cur_noise_scale): - """Update the model action noise settings. - - This is called internally by the DDPG policy.""" - - self.pi_distance = distance_in_action_space - if (distance_in_action_space < exploration_ou_sigma * cur_noise_scale): - # multiplying the sampled OU noise by noise scale is - # equivalent to multiplying the sigma of OU by noise scale - self.parameter_noise_sigma_val *= 1.01 - else: - self.parameter_noise_sigma_val /= 1.01 - self.parameter_noise_sigma.load( - self.parameter_noise_sigma_val, session=session) - - def _build_parameter_noise(self, pnet_params): - assert pnet_params - self.parameter_noise_sigma_val = self.exploration_ou_sigma - self.parameter_noise_sigma = tf.get_variable( - initializer=tf.constant_initializer( - self.parameter_noise_sigma_val), - name="parameter_noise_sigma", - shape=(), - trainable=False, - dtype=tf.float32) - self.parameter_noise = [] - # No need to add any noise on LayerNorm parameters - for var in pnet_params: - noise_var = tf.get_variable( - name=var.name.split(":")[0] + "_noise", - shape=var.shape, - initializer=tf.constant_initializer(.0), - trainable=False) - self.parameter_noise.append(noise_var) - remove_noise_ops = list() - for var, var_noise in zip(pnet_params, self.parameter_noise): - remove_noise_ops.append(tf.assign_add(var, -var_noise)) - self.remove_noise_op = tf.group(*tuple(remove_noise_ops)) - generate_noise_ops = list() - for var_noise in self.parameter_noise: - generate_noise_ops.append( - tf.assign( - var_noise, - tf.random_normal( - shape=var_noise.shape, - stddev=self.parameter_noise_sigma))) - with tf.control_dependencies(generate_noise_ops): - add_noise_ops = list() - for var, var_noise in zip(pnet_params, self.parameter_noise): - add_noise_ops.append(tf.assign_add(var, var_noise)) - self.add_noise_op = tf.group(*tuple(add_noise_ops)) - self.pi_distance = None - self.register_variables(self.parameter_noise) - self.register_variables([self.parameter_noise_sigma]) diff --git a/rllib/agents/ddpg/ddpg_policy.py b/rllib/agents/ddpg/ddpg_policy.py index a7617adb7..95e4bd121 100644 --- a/rllib/agents/ddpg/ddpg_policy.py +++ b/rllib/agents/ddpg/ddpg_policy.py @@ -4,414 +4,393 @@ from __future__ import print_function from gym.spaces import Box import numpy as np -import logging import ray import ray.experimental.tf_utils -from ray.rllib.agents.ddpg.ddpg_model import DDPGModel -from ray.rllib.agents.ddpg.noop_model import NoopModel -from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn, PRIO_WEIGHTS +from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.policy.tf_policy_template import build_tf_policy +from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY from ray.rllib.models import ModelCatalog from ray.rllib.utils.annotations import override from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.policy.policy import Policy from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.utils import try_import_tf -from ray.rllib.utils.tf_ops import huber_loss, minimize_and_clip, \ - make_tf_callable +from ray.rllib.utils.tf_ops import huber_loss, minimize_and_clip, scope_vars tf = try_import_tf() -logger = logging.getLogger(__name__) + +ACTION_SCOPE = "action" +POLICY_SCOPE = "policy" +POLICY_TARGET_SCOPE = "target_policy" +Q_SCOPE = "critic" +Q_TARGET_SCOPE = "target_critic" +TWIN_Q_SCOPE = "twin_critic" +TWIN_Q_TARGET_SCOPE = "twin_target_critic" + +# Importance sampling weights for prioritized replay +PRIO_WEIGHTS = "weights" -def build_ddpg_model(policy, obs_space, action_space, config): - if config["model"]["custom_model"]: - logger.warning( - "Setting use_state_preprocessor=True since a custom model " - "was specified.") - config["use_state_preprocessor"] = True - if not isinstance(action_space, Box): - raise UnsupportedSpaceException( - "Action space {} is not supported for DDPG.".format(action_space)) - if len(action_space.shape) > 1: - raise UnsupportedSpaceException( - "Action space has multiple dimensions " - "{}. ".format(action_space.shape) + - "Consider reshaping this into a single dimension, " - "using a Tuple action space, or the multi-agent API.") +class DDPGPostprocessing(object): + """Implements n-step learning and param noise adjustments.""" - if config["use_state_preprocessor"]: - default_model = None # catalog decides - num_outputs = 256 # arbitrary - config["model"]["no_final_linear"] = True - else: - default_model = NoopModel - num_outputs = int(np.product(obs_space.shape)) - - policy.model = ModelCatalog.get_model_v2( - obs_space, - action_space, - num_outputs, - config["model"], - framework="tf", - model_interface=DDPGModel, - default_model=default_model, - name="ddpg_model", - actor_hidden_activation=config["actor_hidden_activation"], - actor_hiddens=config["actor_hiddens"], - critic_hidden_activation=config["critic_hidden_activation"], - critic_hiddens=config["critic_hiddens"], - parameter_noise=config["parameter_noise"], - twin_q=config["twin_q"]) - - policy.target_model = ModelCatalog.get_model_v2( - obs_space, - action_space, - num_outputs, - config["model"], - framework="tf", - model_interface=DDPGModel, - default_model=default_model, - name="target_ddpg_model", - actor_hidden_activation=config["actor_hidden_activation"], - actor_hiddens=config["actor_hiddens"], - critic_hidden_activation=config["critic_hidden_activation"], - critic_hiddens=config["critic_hiddens"], - parameter_noise=config["parameter_noise"], - twin_q=config["twin_q"]) - - return policy.model - - -def postprocess_trajectory(policy, - sample_batch, - other_agent_batches=None, - episode=None): - if policy.config["parameter_noise"]: - policy.adjust_param_noise_sigma(sample_batch) - return _postprocess_dqn(policy, sample_batch) - - -def build_action_output(policy, model, input_dict, obs_space, action_space, - config): - model_out, _ = model({ - "obs": input_dict[SampleBatch.CUR_OBS], - "is_training": policy._get_is_training_placeholder(), - }, [], None) - action_out = model.get_policy_output(model_out) - - # Use sigmoid to scale to [0,1], but also double magnitude of input to - # emulate behaviour of tanh activation used in DDPG and TD3 papers. - sigmoid_out = tf.nn.sigmoid(2 * action_out) - # Rescale to actual env policy scale - # (shape of sigmoid_out is [batch_size, dim_actions], so we reshape to - # get same dims) - action_range = (action_space.high - action_space.low)[None] - low_action = action_space.low[None] - deterministic_actions = action_range * sigmoid_out + low_action - - noise_type = config["exploration_noise_type"] - action_low = action_space.low - action_high = action_space.high - action_range = action_space.high - action_low - - def compute_stochastic_actions(): - def make_noisy_actions(): - # shape of deterministic_actions is [None, dim_action] - if noise_type == "gaussian": - # add IID Gaussian noise for exploration, TD3-style - normal_sample = policy.noise_scale * tf.random_normal( - tf.shape(deterministic_actions), - stddev=config["exploration_gaussian_sigma"]) - stochastic_actions = tf.clip_by_value( - deterministic_actions + normal_sample, - action_low * tf.ones_like(deterministic_actions), - action_high * tf.ones_like(deterministic_actions)) - elif noise_type == "ou": - # add OU noise for exploration, DDPG-style - zero_acts = action_low.size * [.0] - exploration_sample = tf.get_variable( - name="ornstein_uhlenbeck", - dtype=tf.float32, - initializer=zero_acts, - trainable=False) - normal_sample = tf.random_normal( - shape=[action_low.size], mean=0.0, stddev=1.0) - ou_new = config["exploration_ou_theta"] \ - * -exploration_sample \ - + config["exploration_ou_sigma"] * normal_sample - exploration_value = tf.assign_add(exploration_sample, ou_new) - base_scale = config["exploration_ou_noise_scale"] - noise = policy.noise_scale * base_scale \ - * exploration_value * action_range - stochastic_actions = tf.clip_by_value( - deterministic_actions + noise, - action_low * tf.ones_like(deterministic_actions), - action_high * tf.ones_like(deterministic_actions)) + @override(Policy) + def postprocess_trajectory(self, + sample_batch, + other_agent_batches=None, + episode=None): + if self.config["parameter_noise"]: + # adjust the sigma of parameter space noise + states, noisy_actions = [ + list(x) for x in sample_batch.columns( + [SampleBatch.CUR_OBS, SampleBatch.ACTIONS]) + ] + self.sess.run(self.remove_noise_op) + clean_actions = self.sess.run( + self.output_actions, + feed_dict={ + self.cur_observations: states, + self.stochastic: False, + self.noise_scale: .0, + self.pure_exploration_phase: False, + }) + distance_in_action_space = np.sqrt( + np.mean(np.square(clean_actions - noisy_actions))) + self.pi_distance = distance_in_action_space + if distance_in_action_space < \ + self.config["exploration_ou_sigma"] * self.cur_noise_scale: + # multiplying the sampled OU noise by noise scale is + # equivalent to multiplying the sigma of OU by noise scale + self.parameter_noise_sigma_val *= 1.01 else: - raise ValueError( - "Unknown noise type '%s' (try 'ou' or 'gaussian')" % - noise_type) - return stochastic_actions + self.parameter_noise_sigma_val /= 1.01 + self.parameter_noise_sigma.load( + self.parameter_noise_sigma_val, session=self.sess) - def make_uniform_random_actions(): - # pure random exploration option - uniform_random_actions = tf.random_uniform( - tf.shape(deterministic_actions)) - # rescale uniform random actions according to action range - tf_range = tf.constant(action_range[None], dtype="float32") - tf_low = tf.constant(action_low[None], dtype="float32") - uniform_random_actions = uniform_random_actions * tf_range \ - + tf_low - return uniform_random_actions - - stochastic_actions = tf.cond( - # need to condition on noise_scale > 0 because zeroing - # noise_scale is how a worker signals no noise should be used - # (this is ugly and should be fixed by adding an "eval_mode" - # config flag or something) - tf.logical_and(policy.pure_exploration_phase, - policy.noise_scale > 0), - true_fn=make_uniform_random_actions, - false_fn=make_noisy_actions) - return stochastic_actions - - enable_stochastic = tf.logical_and(policy.stochastic, - not config["parameter_noise"]) - actions = tf.cond(enable_stochastic, compute_stochastic_actions, - lambda: deterministic_actions) - policy.output_actions = actions - return actions, None + return _postprocess_dqn(self, sample_batch) -def actor_critic_loss(policy, model, _, train_batch): - model_out_t, _ = model({ - "obs": train_batch[SampleBatch.CUR_OBS], - "is_training": policy._get_is_training_placeholder(), - }, [], None) +class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): + def __init__(self, observation_space, action_space, config): + config = dict(ray.rllib.agents.ddpg.ddpg.DEFAULT_CONFIG, **config) + if not isinstance(action_space, Box): + raise UnsupportedSpaceException( + "Action space {} is not supported for DDPG.".format( + action_space)) + if len(action_space.shape) > 1: + raise UnsupportedSpaceException( + "Action space has multiple dimensions " + "{}. ".format(action_space.shape) + + "Consider reshaping this into a single dimension, " + "using a Tuple action space, or the multi-agent API.") - model_out_tp1, _ = model({ - "obs": train_batch[SampleBatch.NEXT_OBS], - "is_training": policy._get_is_training_placeholder(), - }, [], None) - - target_model_out_tp1, _ = policy.target_model({ - "obs": train_batch[SampleBatch.NEXT_OBS], - "is_training": policy._get_is_training_placeholder(), - }, [], None) - - policy_t = model.get_policy_output(model_out_t) - policy_tp1 = model.get_policy_output(model_out_tp1) - - if policy.config["smooth_target_policy"]: - target_noise_clip = policy.config["target_noise_clip"] - clipped_normal_sample = tf.clip_by_value( - tf.random_normal( - tf.shape(policy_tp1), stddev=policy.config["target_noise"]), - -target_noise_clip, target_noise_clip) - policy_tp1_smoothed = tf.clip_by_value( - policy_tp1 + clipped_normal_sample, - policy.action_space.low * tf.ones_like(policy_tp1), - policy.action_space.high * tf.ones_like(policy_tp1)) - else: - policy_tp1_smoothed = policy_tp1 - - # q network evaluation - q_t = model.get_q_values(model_out_t, train_batch[SampleBatch.ACTIONS]) - if policy.config["twin_q"]: - twin_q_t = model.get_twin_q_values(model_out_t, - train_batch[SampleBatch.ACTIONS]) - - # Q-values for current policy (no noise) in given current state - q_t_det_policy = model.get_q_values(model_out_t, policy_t) - - # target q network evaluation - q_tp1 = policy.target_model.get_q_values(target_model_out_tp1, - policy_tp1_smoothed) - if policy.config["twin_q"]: - twin_q_tp1 = policy.target_model.get_twin_q_values( - target_model_out_tp1, policy_tp1_smoothed) - - q_t_selected = tf.squeeze(q_t, axis=len(q_t.shape) - 1) - if policy.config["twin_q"]: - twin_q_t_selected = tf.squeeze(twin_q_t, axis=len(q_t.shape) - 1) - q_tp1 = tf.minimum(q_tp1, twin_q_tp1) - - q_tp1_best = tf.squeeze(input=q_tp1, axis=len(q_tp1.shape) - 1) - q_tp1_best_masked = ( - 1.0 - tf.cast(train_batch[SampleBatch.DONES], tf.float32)) * q_tp1_best - - # compute RHS of bellman equation - q_t_selected_target = tf.stop_gradient( - train_batch[SampleBatch.REWARDS] + - policy.config["gamma"]**policy.config["n_step"] * q_tp1_best_masked) - - # compute the error (potentially clipped) - if policy.config["twin_q"]: - td_error = q_t_selected - q_t_selected_target - twin_td_error = twin_q_t_selected - q_t_selected_target - td_error = td_error + twin_td_error - if policy.config["use_huber"]: - errors = huber_loss(td_error, policy.config["huber_threshold"]) \ - + huber_loss(twin_td_error, policy.config["huber_threshold"]) - else: - errors = 0.5 * tf.square(td_error) + 0.5 * tf.square(twin_td_error) - else: - td_error = q_t_selected - q_t_selected_target - if policy.config["use_huber"]: - errors = huber_loss(td_error, policy.config["huber_threshold"]) - else: - errors = 0.5 * tf.square(td_error) - - critic_loss = model.custom_loss( - tf.reduce_mean( - tf.cast(train_batch[PRIO_WEIGHTS], tf.float32) * errors), - train_batch) - actor_loss = -tf.reduce_mean(q_t_det_policy) - - if policy.config["l2_reg"] is not None: - for var in model.policy_variables(): - if "bias" not in var.name: - actor_loss += policy.config["l2_reg"] * tf.nn.l2_loss(var) - for var in model.q_variables(): - if "bias" not in var.name: - critic_loss += policy.config["l2_reg"] * tf.nn.l2_loss(var) - - # save for stats function - policy.q_t = q_t - policy.td_error = td_error - policy.actor_loss = actor_loss - policy.critic_loss = critic_loss - - # in a custom apply op we handle the losses separately, but return them - # combined in one loss for now - return actor_loss + critic_loss - - -def gradients(policy, optimizer, loss): - if policy.config["grad_norm_clipping"] is not None: - actor_grads_and_vars = minimize_and_clip( - optimizer, - policy.actor_loss, - var_list=policy.model.policy_variables(), - clip_val=policy.config["grad_norm_clipping"]) - critic_grads_and_vars = minimize_and_clip( - optimizer, - policy.critic_loss, - var_list=policy.model.q_variables(), - clip_val=policy.config["grad_norm_clipping"]) - else: - actor_grads_and_vars = optimizer.compute_gradients( - policy.actor_loss, var_list=policy.model.policy_variables()) - critic_grads_and_vars = optimizer.compute_gradients( - policy.critic_loss, var_list=policy.model.q_variables()) - # save these for later use in build_apply_op - policy._actor_grads_and_vars = [(g, v) for (g, v) in actor_grads_and_vars - if g is not None] - policy._critic_grads_and_vars = [(g, v) for (g, v) in critic_grads_and_vars - if g is not None] - grads_and_vars = ( - policy._actor_grads_and_vars + policy._critic_grads_and_vars) - return grads_and_vars - - -def apply_gradients(policy, optimizer, grads_and_vars): - # for policy gradient, update policy net one time v.s. - # update critic net `policy_delay` time(s) - should_apply_actor_opt = tf.equal( - tf.mod(policy.global_step, policy.config["policy_delay"]), 0) - - def make_apply_op(): - return policy._actor_optimizer.apply_gradients( - policy._actor_grads_and_vars) - - actor_op = tf.cond( - should_apply_actor_opt, - true_fn=make_apply_op, - false_fn=lambda: tf.no_op()) - critic_op = policy._critic_optimizer.apply_gradients( - policy._critic_grads_and_vars) - - # increment global step & apply ops - with tf.control_dependencies([tf.assign_add(policy.global_step, 1)]): - return tf.group(actor_op, critic_op) - - -def stats(policy, train_batch): - return { - "td_error": tf.reduce_mean(policy.td_error), - "actor_loss": tf.reduce_mean(policy.actor_loss), - "critic_loss": tf.reduce_mean(policy.critic_loss), - "mean_q": tf.reduce_mean(policy.q_t), - "max_q": tf.reduce_max(policy.q_t), - "min_q": tf.reduce_min(policy.q_t), - } - - -class ExplorationStateMixin(object): - def __init__(self, obs_space, action_space, config): + self.config = config self.cur_noise_scale = 1.0 self.cur_pure_exploration_phase = False - self.stochastic = tf.get_variable( - initializer=tf.constant_initializer(True), - name="stochastic", - shape=(), - trainable=False, - dtype=tf.bool) - self.noise_scale = tf.get_variable( - initializer=tf.constant_initializer(self.cur_noise_scale), - name="noise_scale", - shape=(), - trainable=False, - dtype=tf.float32) - self.pure_exploration_phase = tf.get_variable( - initializer=tf.constant_initializer( - self.cur_pure_exploration_phase), - name="pure_exploration_phase", - shape=(), - trainable=False, - dtype=tf.bool) + self.dim_actions = action_space.shape[0] + self.low_action = action_space.low + self.high_action = action_space.high - def add_parameter_noise(self): + # create global step for counting the number of update operations + self.global_step = tf.train.get_or_create_global_step() + + # use separate optimizers for actor & critic + self._actor_optimizer = tf.train.AdamOptimizer( + learning_rate=self.config["actor_lr"]) + self._critic_optimizer = tf.train.AdamOptimizer( + learning_rate=self.config["critic_lr"]) + + # Action inputs + self.stochastic = tf.placeholder(tf.bool, (), name="stochastic") + self.noise_scale = tf.placeholder(tf.float32, (), name="noise_scale") + self.pure_exploration_phase = tf.placeholder( + tf.bool, (), name="pure_exploration_phase") + self.cur_observations = tf.placeholder( + tf.float32, + shape=(None, ) + observation_space.shape, + name="cur_obs") + + with tf.variable_scope(POLICY_SCOPE) as scope: + policy_out, self.policy_model = self._build_policy_network( + self.cur_observations, observation_space, action_space) + self.policy_vars = scope_vars(scope.name) + + # Noise vars for P network except for layer normalization vars if self.config["parameter_noise"]: - self.get_session().run(self.model.add_noise_op) + self._build_parameter_noise([ + var for var in self.policy_vars if "LayerNorm" not in var.name + ]) - def adjust_param_noise_sigma(self, sample_batch): - assert not tf.executing_eagerly(), "eager not supported with p noise" - # adjust the sigma of parameter space noise - states, noisy_actions = [ - list(x) for x in sample_batch.columns( - [SampleBatch.CUR_OBS, SampleBatch.ACTIONS]) + # Action outputs + with tf.variable_scope(ACTION_SCOPE): + self.output_actions = self._add_exploration_noise( + policy_out, self.stochastic, self.noise_scale, + self.pure_exploration_phase, action_space) + + if self.config["smooth_target_policy"]: + self.reset_noise_op = tf.no_op() + else: + with tf.variable_scope(ACTION_SCOPE, reuse=True): + exploration_sample = tf.get_variable(name="ornstein_uhlenbeck") + self.reset_noise_op = tf.assign(exploration_sample, + self.dim_actions * [.0]) + + # Replay inputs + self.obs_t = tf.placeholder( + tf.float32, + shape=(None, ) + observation_space.shape, + name="observation") + self.act_t = tf.placeholder( + tf.float32, shape=(None, ) + action_space.shape, name="action") + self.rew_t = tf.placeholder(tf.float32, [None], name="reward") + self.obs_tp1 = tf.placeholder( + tf.float32, shape=(None, ) + observation_space.shape) + self.done_mask = tf.placeholder(tf.float32, [None], name="done") + self.importance_weights = tf.placeholder( + tf.float32, [None], name="weight") + + # policy network evaluation + with tf.variable_scope(POLICY_SCOPE, reuse=True) as scope: + prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) + self.policy_t, _ = self._build_policy_network( + self.obs_t, observation_space, action_space) + policy_batchnorm_update_ops = list( + set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - + prev_update_ops) + + # target policy network evaluation + with tf.variable_scope(POLICY_TARGET_SCOPE) as scope: + policy_tp1, _ = self._build_policy_network( + self.obs_tp1, observation_space, action_space) + target_policy_vars = scope_vars(scope.name) + + # Action outputs + with tf.variable_scope(ACTION_SCOPE, reuse=True): + if config["smooth_target_policy"]: + target_noise_clip = self.config["target_noise_clip"] + clipped_normal_sample = tf.clip_by_value( + tf.random_normal( + tf.shape(policy_tp1), + stddev=self.config["target_noise"]), + -target_noise_clip, target_noise_clip) + policy_tp1_smoothed = tf.clip_by_value( + policy_tp1 + clipped_normal_sample, + action_space.low * tf.ones_like(policy_tp1), + action_space.high * tf.ones_like(policy_tp1)) + else: + # no smoothing, just use deterministic actions + policy_tp1_smoothed = policy_tp1 + + # q network evaluation + prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) + with tf.variable_scope(Q_SCOPE) as scope: + # Q-values for given actions & observations in given current + q_t, self.q_model = self._build_q_network( + self.obs_t, observation_space, action_space, self.act_t) + self.q_func_vars = scope_vars(scope.name) + self.stats = { + "mean_q": tf.reduce_mean(q_t), + "max_q": tf.reduce_max(q_t), + "min_q": tf.reduce_min(q_t), + } + with tf.variable_scope(Q_SCOPE, reuse=True): + # Q-values for current policy (no noise) in given current state + q_t_det_policy, _ = self._build_q_network( + self.obs_t, observation_space, action_space, self.policy_t) + if self.config["twin_q"]: + with tf.variable_scope(TWIN_Q_SCOPE) as scope: + twin_q_t, self.twin_q_model = self._build_q_network( + self.obs_t, observation_space, action_space, self.act_t) + self.twin_q_func_vars = scope_vars(scope.name) + q_batchnorm_update_ops = list( + set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops) + + # target q network evaluation + with tf.variable_scope(Q_TARGET_SCOPE) as scope: + q_tp1, _ = self._build_q_network(self.obs_tp1, observation_space, + action_space, policy_tp1_smoothed) + target_q_func_vars = scope_vars(scope.name) + if self.config["twin_q"]: + with tf.variable_scope(TWIN_Q_TARGET_SCOPE) as scope: + twin_q_tp1, _ = self._build_q_network( + self.obs_tp1, observation_space, action_space, + policy_tp1_smoothed) + twin_target_q_func_vars = scope_vars(scope.name) + + if self.config["twin_q"]: + self.critic_loss, self.actor_loss, self.td_error \ + = self._build_actor_critic_loss( + q_t, q_tp1, q_t_det_policy, twin_q_t=twin_q_t, + twin_q_tp1=twin_q_tp1) + else: + self.critic_loss, self.actor_loss, self.td_error \ + = self._build_actor_critic_loss( + q_t, q_tp1, q_t_det_policy) + + if config["l2_reg"] is not None: + for var in self.policy_vars: + if "bias" not in var.name: + self.actor_loss += (config["l2_reg"] * tf.nn.l2_loss(var)) + for var in self.q_func_vars: + if "bias" not in var.name: + self.critic_loss += (config["l2_reg"] * tf.nn.l2_loss(var)) + if self.config["twin_q"]: + for var in self.twin_q_func_vars: + if "bias" not in var.name: + self.critic_loss += ( + config["l2_reg"] * tf.nn.l2_loss(var)) + + # update_target_fn will be called periodically to copy Q network to + # target Q network + self.tau_value = config.get("tau") + self.tau = tf.placeholder(tf.float32, (), name="tau") + update_target_expr = [] + for var, var_target in zip( + sorted(self.q_func_vars, key=lambda v: v.name), + sorted(target_q_func_vars, key=lambda v: v.name)): + update_target_expr.append( + var_target.assign(self.tau * var + + (1.0 - self.tau) * var_target)) + if self.config["twin_q"]: + for var, var_target in zip( + sorted(self.twin_q_func_vars, key=lambda v: v.name), + sorted(twin_target_q_func_vars, key=lambda v: v.name)): + update_target_expr.append( + var_target.assign(self.tau * var + + (1.0 - self.tau) * var_target)) + for var, var_target in zip( + sorted(self.policy_vars, key=lambda v: v.name), + sorted(target_policy_vars, key=lambda v: v.name)): + update_target_expr.append( + var_target.assign(self.tau * var + + (1.0 - self.tau) * var_target)) + self.update_target_expr = tf.group(*update_target_expr) + + self.sess = tf.get_default_session() + self.loss_inputs = [ + (SampleBatch.CUR_OBS, self.obs_t), + (SampleBatch.ACTIONS, self.act_t), + (SampleBatch.REWARDS, self.rew_t), + (SampleBatch.NEXT_OBS, self.obs_tp1), + (SampleBatch.DONES, self.done_mask), + (PRIO_WEIGHTS, self.importance_weights), ] - self.get_session().run(self.model.remove_noise_op) - clean_actions = self.get_session().run( - self.output_actions, - feed_dict={ - self.get_placeholder(SampleBatch.CUR_OBS): states, - self.stochastic: False, - self.noise_scale: .0, - self.pure_exploration_phase: False, - }) - distance_in_action_space = np.sqrt( - np.mean(np.square(clean_actions - noisy_actions))) - self.model.update_action_noise( - self.get_session(), distance_in_action_space, - self.config["exploration_ou_sigma"], self.cur_noise_scale) + input_dict = dict(self.loss_inputs) - def set_epsilon(self, epsilon): - # set_epsilon is called by optimizer to anneal exploration as - # necessary, and to turn it off during evaluation. The "epsilon" part - # is a carry-over from DQN, which uses epsilon-greedy exploration - # rather than adding action noise to the output of a policy network. - self.cur_noise_scale = epsilon - self.noise_scale.load(self.cur_noise_scale, self.get_session()) + if self.config["use_state_preprocessor"]: + # Model self-supervised losses + self.actor_loss = self.policy_model.custom_loss( + self.actor_loss, input_dict) + self.critic_loss = self.q_model.custom_loss( + self.critic_loss, input_dict) + if self.config["twin_q"]: + self.critic_loss = self.twin_q_model.custom_loss( + self.critic_loss, input_dict) - def set_pure_exploration_phase(self, pure_exploration_phase): - self.cur_pure_exploration_phase = pure_exploration_phase - self.pure_exploration_phase.load(self.cur_pure_exploration_phase, - self.get_session()) + TFPolicy.__init__( + self, + observation_space, + action_space, + self.sess, + obs_input=self.cur_observations, + action_sampler=self.output_actions, + loss=self.actor_loss + self.critic_loss, + loss_inputs=self.loss_inputs, + update_ops=q_batchnorm_update_ops + policy_batchnorm_update_ops) + self.sess.run(tf.global_variables_initializer()) + + # Note that this encompasses both the policy and Q-value networks and + # their corresponding target networks + self.variables = ray.experimental.tf_utils.TensorFlowVariables( + tf.group(q_t_det_policy, q_tp1), self.sess) + + # Hard initial update + self.update_target(tau=1.0) + + @override(TFPolicy) + def optimizer(self): + # we don't use this because we have two separate optimisers + return None + + @override(TFPolicy) + def build_apply_op(self, optimizer, grads_and_vars): + # for policy gradient, update policy net one time v.s. + # update critic net `policy_delay` time(s) + should_apply_actor_opt = tf.equal( + tf.mod(self.global_step, self.config["policy_delay"]), 0) + + def make_apply_op(): + return self._actor_optimizer.apply_gradients( + self._actor_grads_and_vars) + + actor_op = tf.cond( + should_apply_actor_opt, + true_fn=make_apply_op, + false_fn=lambda: tf.no_op()) + critic_op = self._critic_optimizer.apply_gradients( + self._critic_grads_and_vars) + # increment global step & apply ops + with tf.control_dependencies([tf.assign_add(self.global_step, 1)]): + return tf.group(actor_op, critic_op) + + @override(TFPolicy) + def gradients(self, optimizer, loss): + if self.config["grad_norm_clipping"] is not None: + actor_grads_and_vars = minimize_and_clip( + self._actor_optimizer, + self.actor_loss, + var_list=self.policy_vars, + clip_val=self.config["grad_norm_clipping"]) + critic_grads_and_vars = minimize_and_clip( + self._critic_optimizer, + self.critic_loss, + var_list=self.q_func_vars + self.twin_q_func_vars + if self.config["twin_q"] else self.q_func_vars, + clip_val=self.config["grad_norm_clipping"]) + else: + actor_grads_and_vars = self._actor_optimizer.compute_gradients( + self.actor_loss, var_list=self.policy_vars) + if self.config["twin_q"]: + critic_vars = self.q_func_vars + self.twin_q_func_vars + else: + critic_vars = self.q_func_vars + critic_grads_and_vars = self._critic_optimizer.compute_gradients( + self.critic_loss, var_list=critic_vars) + # save these for later use in build_apply_op + self._actor_grads_and_vars = [(g, v) for (g, v) in actor_grads_and_vars + if g is not None] + self._critic_grads_and_vars = [(g, v) + for (g, v) in critic_grads_and_vars + if g is not None] + grads_and_vars = self._actor_grads_and_vars \ + + self._critic_grads_and_vars + return grads_and_vars + + @override(TFPolicy) + def extra_compute_action_feed_dict(self): + return { + # FIXME: what about turning off exploration? Isn't that a good + # idea? + self.stochastic: True, + self.noise_scale: self.cur_noise_scale, + self.pure_exploration_phase: self.cur_pure_exploration_phase, + } + + @override(TFPolicy) + def extra_compute_grad_fetches(self): + return { + "td_error": self.td_error, + LEARNER_STATS_KEY: self.stats, + } + + @override(TFPolicy) + def get_weights(self): + return self.variables.get_weights() + + @override(TFPolicy) + def set_weights(self, weights): + self.variables.set_weights(weights) @override(Policy) def get_state(self): @@ -426,97 +405,253 @@ class ExplorationStateMixin(object): self.set_epsilon(state[1]) self.set_pure_exploration_phase(state[2]) + def _build_q_network(self, obs, obs_space, action_space, actions): + if self.config["use_state_preprocessor"]: + q_model = ModelCatalog.get_model({ + "obs": obs, + "is_training": self._get_is_training_placeholder(), + }, obs_space, action_space, 1, self.config["model"]) + q_out = tf.concat([q_model.last_layer, actions], axis=1) + else: + q_model = None + q_out = tf.concat([obs, actions], axis=1) -class TargetNetworkMixin(object): - def __init__(self, config): - @make_tf_callable(self.get_session()) - def update_target_fn(tau): - tau = tf.convert_to_tensor(tau, dtype=tf.float32) - update_target_expr = [] - model_vars = self.model.trainable_variables() - target_model_vars = self.target_model.trainable_variables() - assert len(model_vars) == len(target_model_vars), \ - (model_vars, target_model_vars) - for var, var_target in zip(model_vars, target_model_vars): - update_target_expr.append( - var_target.assign(tau * var + (1.0 - tau) * var_target)) - logger.debug("Update target op {}".format(var_target)) - return tf.group(*update_target_expr) + activation = getattr(tf.nn, self.config["critic_hidden_activation"]) + for hidden in self.config["critic_hiddens"]: + q_out = tf.layers.dense(q_out, units=hidden, activation=activation) + q_values = tf.layers.dense(q_out, units=1, activation=None) - # Hard initial update - self._do_update = update_target_fn - self.update_target(tau=1.0) + return q_values, q_model + + def _build_policy_network(self, obs, obs_space, action_space): + if self.config["use_state_preprocessor"]: + model = ModelCatalog.get_model({ + "obs": obs, + "is_training": self._get_is_training_placeholder(), + }, obs_space, action_space, 1, self.config["model"]) + action_out = model.last_layer + else: + model = None + action_out = obs + + activation = getattr(tf.nn, self.config["actor_hidden_activation"]) + for hidden in self.config["actor_hiddens"]: + if self.config["parameter_noise"]: + import tensorflow.contrib.layers as layers + action_out = layers.fully_connected( + action_out, + num_outputs=hidden, + activation_fn=activation, + normalizer_fn=layers.layer_norm) + else: + action_out = tf.layers.dense( + action_out, units=hidden, activation=activation) + action_out = tf.layers.dense( + action_out, units=self.dim_actions, activation=None) + + # Use sigmoid to scale to [0,1], but also double magnitude of input to + # emulate behaviour of tanh activation used in DDPG and TD3 papers. + sigmoid_out = tf.nn.sigmoid(2 * action_out) + # Rescale to actual env policy scale + # (shape of sigmoid_out is [batch_size, dim_actions], so we reshape to + # get same dims) + action_range = (action_space.high - action_space.low)[None] + low_action = action_space.low[None] + actions = action_range * sigmoid_out + low_action + + return actions, model + + def _add_exploration_noise(self, deterministic_actions, + should_be_stochastic, noise_scale, + enable_pure_exploration, action_space): + noise_type = self.config["exploration_noise_type"] + action_low = action_space.low + action_high = action_space.high + action_range = action_space.high - action_low + + def compute_stochastic_actions(): + def make_noisy_actions(): + # shape of deterministic_actions is [None, dim_action] + if noise_type == "gaussian": + # add IID Gaussian noise for exploration, TD3-style + normal_sample = noise_scale * tf.random_normal( + tf.shape(deterministic_actions), + stddev=self.config["exploration_gaussian_sigma"]) + stochastic_actions = tf.clip_by_value( + deterministic_actions + normal_sample, + action_low * tf.ones_like(deterministic_actions), + action_high * tf.ones_like(deterministic_actions)) + elif noise_type == "ou": + # add OU noise for exploration, DDPG-style + zero_acts = action_low.size * [.0] + exploration_sample = tf.get_variable( + name="ornstein_uhlenbeck", + dtype=tf.float32, + initializer=zero_acts, + trainable=False) + normal_sample = tf.random_normal( + shape=[action_low.size], mean=0.0, stddev=1.0) + ou_new = self.config["exploration_ou_theta"] \ + * -exploration_sample \ + + self.config["exploration_ou_sigma"] * normal_sample + exploration_value = tf.assign_add(exploration_sample, + ou_new) + base_scale = self.config["exploration_ou_noise_scale"] + noise = noise_scale * base_scale \ + * exploration_value * action_range + stochastic_actions = tf.clip_by_value( + deterministic_actions + noise, + action_low * tf.ones_like(deterministic_actions), + action_high * tf.ones_like(deterministic_actions)) + else: + raise ValueError( + "Unknown noise type '%s' (try 'ou' or 'gaussian')" % + noise_type) + return stochastic_actions + + def make_uniform_random_actions(): + # pure random exploration option + uniform_random_actions = tf.random_uniform( + tf.shape(deterministic_actions)) + # rescale uniform random actions according to action range + tf_range = tf.constant(action_range[None], dtype="float32") + tf_low = tf.constant(action_low[None], dtype="float32") + uniform_random_actions = uniform_random_actions * tf_range \ + + tf_low + return uniform_random_actions + + stochastic_actions = tf.cond( + # need to condition on noise_scale > 0 because zeroing + # noise_scale is how a worker signals no noise should be used + # (this is ugly and should be fixed by adding an "eval_mode" + # config flag or something) + tf.logical_and(enable_pure_exploration, noise_scale > 0), + true_fn=make_uniform_random_actions, + false_fn=make_noisy_actions) + return stochastic_actions + + enable_stochastic = tf.logical_and(should_be_stochastic, + not self.config["parameter_noise"]) + actions = tf.cond(enable_stochastic, compute_stochastic_actions, + lambda: deterministic_actions) + return actions + + def _build_actor_critic_loss(self, + q_t, + q_tp1, + q_t_det_policy, + twin_q_t=None, + twin_q_tp1=None): + twin_q = self.config["twin_q"] + gamma = self.config["gamma"] + n_step = self.config["n_step"] + use_huber = self.config["use_huber"] + huber_threshold = self.config["huber_threshold"] + + q_t_selected = tf.squeeze(q_t, axis=len(q_t.shape) - 1) + if twin_q: + twin_q_t_selected = tf.squeeze(twin_q_t, axis=len(q_t.shape) - 1) + q_tp1 = tf.minimum(q_tp1, twin_q_tp1) + + q_tp1_best = tf.squeeze(input=q_tp1, axis=len(q_tp1.shape) - 1) + q_tp1_best_masked = (1.0 - self.done_mask) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = tf.stop_gradient( + self.rew_t + gamma**n_step * q_tp1_best_masked) + + # compute the error (potentially clipped) + if twin_q: + td_error = q_t_selected - q_t_selected_target + twin_td_error = twin_q_t_selected - q_t_selected_target + td_error = td_error + twin_td_error + if use_huber: + errors = huber_loss(td_error, huber_threshold) \ + + huber_loss(twin_td_error, huber_threshold) + else: + errors = 0.5 * tf.square(td_error) + 0.5 * tf.square( + twin_td_error) + else: + td_error = q_t_selected - q_t_selected_target + if use_huber: + errors = huber_loss(td_error, huber_threshold) + else: + errors = 0.5 * tf.square(td_error) + + critic_loss = tf.reduce_mean(self.importance_weights * errors) + actor_loss = -tf.reduce_mean(q_t_det_policy) + return critic_loss, actor_loss, td_error + + def _build_parameter_noise(self, pnet_params): + self.parameter_noise_sigma_val = self.config["exploration_ou_sigma"] + self.parameter_noise_sigma = tf.get_variable( + initializer=tf.constant_initializer( + self.parameter_noise_sigma_val), + name="parameter_noise_sigma", + shape=(), + trainable=False, + dtype=tf.float32) + self.parameter_noise = list() + # No need to add any noise on LayerNorm parameters + for var in pnet_params: + noise_var = tf.get_variable( + name=var.name.split(":")[0] + "_noise", + shape=var.shape, + initializer=tf.constant_initializer(.0), + trainable=False) + self.parameter_noise.append(noise_var) + remove_noise_ops = list() + for var, var_noise in zip(pnet_params, self.parameter_noise): + remove_noise_ops.append(tf.assign_add(var, -var_noise)) + self.remove_noise_op = tf.group(*tuple(remove_noise_ops)) + generate_noise_ops = list() + for var_noise in self.parameter_noise: + generate_noise_ops.append( + tf.assign( + var_noise, + tf.random_normal( + shape=var_noise.shape, + stddev=self.parameter_noise_sigma))) + with tf.control_dependencies(generate_noise_ops): + add_noise_ops = list() + for var, var_noise in zip(pnet_params, self.parameter_noise): + add_noise_ops.append(tf.assign_add(var, var_noise)) + self.add_noise_op = tf.group(*tuple(add_noise_ops)) + self.pi_distance = None + + def compute_td_error(self, obs_t, act_t, rew_t, obs_tp1, done_mask, + importance_weights): + td_err = self.sess.run( + self.td_error, + feed_dict={ + self.obs_t: [np.array(ob) for ob in obs_t], + self.act_t: act_t, + self.rew_t: rew_t, + self.obs_tp1: [np.array(ob) for ob in obs_tp1], + self.done_mask: done_mask, + self.importance_weights: importance_weights + }) + return td_err + + def reset_noise(self, sess): + sess.run(self.reset_noise_op) + + def add_parameter_noise(self): + if self.config["parameter_noise"]: + self.sess.run(self.add_noise_op) # support both hard and soft sync def update_target(self, tau=None): - self._do_update(np.float32(tau or self.config.get("tau"))) + tau = tau or self.tau_value + return self.sess.run( + self.update_target_expr, feed_dict={self.tau: tau}) + def set_epsilon(self, epsilon): + # set_epsilon is called by optimizer to anneal exploration as + # necessary, and to turn it off during evaluation. The "epsilon" part + # is a carry-over from DQN, which uses epsilon-greedy exploration + # rather than adding action noise to the output of a policy network. + self.cur_noise_scale = epsilon -class ActorCriticOptimizerMixin(object): - def __init__(self, config): - # create global step for counting the number of update operations - self.global_step = tf.train.get_or_create_global_step() - - # use separate optimizers for actor & critic - self._actor_optimizer = tf.train.AdamOptimizer( - learning_rate=config["actor_lr"]) - self._critic_optimizer = tf.train.AdamOptimizer( - learning_rate=config["critic_lr"]) - - -class ComputeTDErrorMixin(object): - def __init__(self): - @make_tf_callable(self.get_session(), dynamic_shape=True) - def compute_td_error(obs_t, act_t, rew_t, obs_tp1, done_mask, - importance_weights): - if not self.loss_initialized(): - return tf.zeros_like(rew_t) - - # Do forward pass on loss to update td error attribute - actor_critic_loss( - self, self.model, None, { - SampleBatch.CUR_OBS: tf.convert_to_tensor(obs_t), - SampleBatch.ACTIONS: tf.convert_to_tensor(act_t), - SampleBatch.REWARDS: tf.convert_to_tensor(rew_t), - SampleBatch.NEXT_OBS: tf.convert_to_tensor(obs_tp1), - SampleBatch.DONES: tf.convert_to_tensor(done_mask), - PRIO_WEIGHTS: tf.convert_to_tensor(importance_weights), - }) - - return self.td_error - - self.compute_td_error = compute_td_error - - -def setup_early_mixins(policy, obs_space, action_space, config): - ExplorationStateMixin.__init__(policy, obs_space, action_space, config) - ActorCriticOptimizerMixin.__init__(policy, config) - - -def setup_mid_mixins(policy, obs_space, action_space, config): - ComputeTDErrorMixin.__init__(policy) - - -def setup_late_mixins(policy, obs_space, action_space, config): - TargetNetworkMixin.__init__(policy, config) - - -DDPGTFPolicy = build_tf_policy( - name="DDPGTFPolicy", - get_default_config=lambda: ray.rllib.agents.ddpg.ddpg.DEFAULT_CONFIG, - make_model=build_ddpg_model, - postprocess_fn=postprocess_trajectory, - action_sampler_fn=build_action_output, - loss_fn=actor_critic_loss, - stats_fn=stats, - gradients_fn=gradients, - apply_gradients_fn=apply_gradients, - extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error}, - mixins=[ - TargetNetworkMixin, ExplorationStateMixin, ActorCriticOptimizerMixin, - ComputeTDErrorMixin - ], - before_init=setup_early_mixins, - before_loss_init=setup_mid_mixins, - after_init=setup_late_mixins, - obs_include_prev_action_reward=False) + def set_pure_exploration_phase(self, pure_exploration_phase): + self.cur_pure_exploration_phase = pure_exploration_phase diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index f339b512d..761feea62 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -184,8 +184,9 @@ def check_config_and_setup_param_noise(config): # between noisy policy and original policy policies = info["policy"] episode = info["episode"] - episode.custom_metrics["policy_distance"] = policies[ - DEFAULT_POLICY_ID].model.pi_distance + model = policies[DEFAULT_POLICY_ID].model + if hasattr(model, "pi_distance"): + episode.custom_metrics["policy_distance"] = model.pi_distance if end_callback: end_callback(info) diff --git a/rllib/agents/sac/sac_policy.py b/rllib/agents/sac/sac_policy.py index 8fb4e36e6..b7ad09c53 100644 --- a/rllib/agents/sac/sac_policy.py +++ b/rllib/agents/sac/sac_policy.py @@ -10,15 +10,13 @@ import ray import ray.experimental.tf_utils from ray.rllib.agents.sac.sac_model import SACModel from ray.rllib.agents.ddpg.noop_model import NoopModel -from ray.rllib.agents.ddpg.ddpg_policy import ComputeTDErrorMixin, \ - TargetNetworkMixin from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn, PRIO_WEIGHTS from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.tf_policy_template import build_tf_policy from ray.rllib.models import ModelCatalog from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.utils import try_import_tf, try_import_tfp -from ray.rllib.utils.tf_ops import minimize_and_clip +from ray.rllib.utils.tf_ops import minimize_and_clip, make_tf_callable tf = try_import_tf() tfp = try_import_tfp() @@ -287,6 +285,55 @@ class ActorCriticOptimizerMixin(object): learning_rate=config["optimization"]["entropy_learning_rate"]) +class ComputeTDErrorMixin(object): + def __init__(self): + @make_tf_callable(self.get_session(), dynamic_shape=True) + def compute_td_error(obs_t, act_t, rew_t, obs_tp1, done_mask, + importance_weights): + if not self.loss_initialized(): + return tf.zeros_like(rew_t) + + # Do forward pass on loss to update td error attribute + actor_critic_loss( + self, self.model, None, { + SampleBatch.CUR_OBS: tf.convert_to_tensor(obs_t), + SampleBatch.ACTIONS: tf.convert_to_tensor(act_t), + SampleBatch.REWARDS: tf.convert_to_tensor(rew_t), + SampleBatch.NEXT_OBS: tf.convert_to_tensor(obs_tp1), + SampleBatch.DONES: tf.convert_to_tensor(done_mask), + PRIO_WEIGHTS: tf.convert_to_tensor(importance_weights), + }) + + return self.td_error + + self.compute_td_error = compute_td_error + + +class TargetNetworkMixin(object): + def __init__(self, config): + @make_tf_callable(self.get_session()) + def update_target_fn(tau): + tau = tf.convert_to_tensor(tau, dtype=tf.float32) + update_target_expr = [] + model_vars = self.model.trainable_variables() + target_model_vars = self.target_model.trainable_variables() + assert len(model_vars) == len(target_model_vars), \ + (model_vars, target_model_vars) + for var, var_target in zip(model_vars, target_model_vars): + update_target_expr.append( + var_target.assign(tau * var + (1.0 - tau) * var_target)) + logger.debug("Update target op {}".format(var_target)) + return tf.group(*update_target_expr) + + # Hard initial update + self._do_update = update_target_fn + self.update_target(tau=1.0) + + # support both hard and soft sync + def update_target(self, tau=None): + self._do_update(np.float32(tau or self.config.get("tau"))) + + def setup_early_mixins(policy, obs_space, action_space, config): ExplorationStateMixin.__init__(policy, obs_space, action_space, config) ActorCriticOptimizerMixin.__init__(policy, config) diff --git a/rllib/tests/test_eager_support.py b/rllib/tests/test_eager_support.py index 9bcb7212b..359926a2a 100644 --- a/rllib/tests/test_eager_support.py +++ b/rllib/tests/test_eager_support.py @@ -56,30 +56,6 @@ class TestEagerSupport(unittest.TestCase): "timesteps_per_iteration": 100 }) - def testDDPG(self): - check_support("DDPG", { - "num_workers": 0, - "learning_starts": 0, - "timesteps_per_iteration": 10 - }) - - def testTD3(self): - check_support("TD3", { - "num_workers": 0, - "learning_starts": 0, - "timesteps_per_iteration": 10 - }) - - def testAPEX_DDPG(self): - check_support( - "APEX_DDPG", { - "num_workers": 2, - "learning_starts": 0, - "num_gpus": 0, - "min_iter_time_s": 1, - "timesteps_per_iteration": 100 - }) - def testSAC(self): check_support("SAC", { "num_workers": 0,