diff --git a/python/ray/rllib/agents/ddpg/ddpg.py b/python/ray/rllib/agents/ddpg/ddpg.py index a6b42f1ca..9deeb06cc 100644 --- a/python/ray/rllib/agents/ddpg/ddpg.py +++ b/python/ray/rllib/agents/ddpg/ddpg.py @@ -41,7 +41,7 @@ DEFAULT_CONFIG = with_common_config({ # === Model === # Apply a state preprocessor with spec given by the "model" config option # (like other RL algorithms). This is mostly useful if you have a weird - # observation shape, like an image. Disabled by default. + # observation shape, like an image. Auto-enabled if a custom model is set. "use_state_preprocessor": False, # Postprocess the policy network model output with these hidden layers. If # use_state_preprocessor is False, then these will be the *only* hidden @@ -173,7 +173,7 @@ def make_exploration_schedule(config, worker_index): if config["per_worker_exploration"]: assert config["num_workers"] > 1, "This requires multiple workers" if worker_index >= 0: - # FIXME: what do magic constants mean? (0.4, 7) + # Exploration constants from the Ape-X paper max_index = float(config["num_workers"] - 1) exponent = 1 + worker_index / max_index * 7 return ConstantSchedule(0.4**exponent) diff --git a/python/ray/rllib/agents/ddpg/ddpg_model.py b/python/ray/rllib/agents/ddpg/ddpg_model.py new file mode 100644 index 000000000..4c1ecba0e --- /dev/null +++ b/python/ray/rllib/agents/ddpg/ddpg_model.py @@ -0,0 +1,246 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + +from ray.rllib.models.tf.tf_modelv2 import TFModelV2 +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + + +class DDPGModel(TFModelV2): + """Extension of standard TFModel for DDPG. + + Data flow: + obs -> forward() -> model_out + model_out -> get_policy_output() -> pi(s) + model_out, actions -> get_q_values() -> Q(s, a) + model_out, actions -> get_twin_q_values() -> Q_twin(s, a) + + Note that this class by itself is not a valid model unless you + implement forward() in a subclass.""" + + def __init__(self, + obs_space, + action_space, + num_outputs, + model_config, + name, + actor_hidden_activation="relu", + actor_hiddens=(400, 300), + critic_hidden_activation="relu", + critic_hiddens=(400, 300), + parameter_noise=False, + twin_q=False, + exploration_ou_sigma=0.2): + """Initialize variables of this model. + + Extra model kwargs: + actor_hidden_activation (str): activation for actor network + actor_hiddens (list): hidden layers sizes for actor network + critic_hidden_activation (str): activation for critic network + critic_hiddens (list): hidden layers sizes for critic network + parameter_noise (bool): use param noise exploration + twin_q (bool): build twin Q networks + exploration_ou_sigma (float): ou noise sigma for exploration + + Note that the core layers for forward() are not defined here, this + only defines the layers for the output heads. Those layers for + forward() should be defined in subclasses of DDPGModel. + """ + + super(DDPGModel, self).__init__(obs_space, action_space, num_outputs, + model_config, name) + self.exploration_ou_sigma = exploration_ou_sigma + + self.action_dim = np.product(action_space.shape) + self.model_out = tf.keras.layers.Input( + shape=(num_outputs, ), name="model_out") + self.actions = tf.keras.layers.Input( + shape=(self.action_dim, ), name="actions") + + def build_action_net(action_out): + activation = getattr(tf.nn, actor_hidden_activation) + i = 0 + for hidden in actor_hiddens: + if parameter_noise: + import tensorflow.contrib.layers as layers + action_out = layers.fully_connected( + action_out, + num_outputs=hidden, + activation_fn=activation, + normalizer_fn=layers.layer_norm) + else: + action_out = tf.layers.dense( + action_out, + units=hidden, + activation=activation, + name="action_hidden_{}".format(i)) + i += 1 + return tf.layers.dense( + action_out, + units=self.action_dim, + activation=None, + name="action_out") + + action_scope = name + "/action_net" + + # TODO(ekl) use keras layers instead of variable scopes + def build_action_net_scope(model_out): + with tf.variable_scope(action_scope, reuse=tf.AUTO_REUSE): + return build_action_net(model_out) + + pi_out = tf.keras.layers.Lambda(build_action_net_scope)(self.model_out) + self.action_net = tf.keras.Model(self.model_out, pi_out) + self.register_variables(self.action_net.variables) + + # Noise vars for P network except for layer normalization vars + if parameter_noise: + with tf.variable_scope(action_scope, reuse=tf.AUTO_REUSE): + self._build_parameter_noise([ + var for var in self.action_net.variables + if "LayerNorm" not in var.name + ]) + + def build_q_net(name, model_out, actions): + q_out = tf.keras.layers.Concatenate(axis=1)([model_out, actions]) + activation = getattr(tf.nn, critic_hidden_activation) + for i, n in enumerate(critic_hiddens): + q_out = tf.keras.layers.Dense( + n, + name="{}_hidden_{}".format(name, i), + activation=activation)(q_out) + q_out = tf.keras.layers.Dense( + 1, activation=None, name="{}_out".format(name))(q_out) + return tf.keras.Model([model_out, actions], q_out) + + self.q_net = build_q_net("q", self.model_out, self.actions) + self.register_variables(self.q_net.variables) + + if twin_q: + self.twin_q_net = build_q_net("twin_q", self.model_out, + self.actions) + self.register_variables(self.twin_q_net.variables) + else: + self.twin_q_net = None + + def forward(self, input_dict, state, seq_lens): + """This generates the model_out tensor input. + + You must implement this as documented in modelv2.py.""" + raise NotImplementedError + + def get_policy_output(self, model_out): + """Return the (unscaled) output of the policy network. + + This returns the unscaled outputs of pi(s). + + Arguments: + model_out (Tensor): obs embeddings from the model layers, of shape + [BATCH_SIZE, num_outputs]. + + Returns: + tensor of shape [BATCH_SIZE, action_dim] with range [-inf, inf]. + """ + return self.action_net(model_out) + + def get_q_values(self, model_out, actions): + """Return the Q estimates for the most recent forward pass. + + This implements Q(s, a). + + Arguments: + model_out (Tensor): obs embeddings from the model layers, of shape + [BATCH_SIZE, num_outputs]. + actions (Tensor): action values that correspond with the most + recent batch of observations passed through forward(), of shape + [BATCH_SIZE, action_dim]. + + Returns: + tensor of shape [BATCH_SIZE]. + """ + return self.q_net([model_out, actions]) + + def get_twin_q_values(self, model_out, actions): + """Same as get_q_values but using the twin Q net. + + This implements the twin Q(s, a). + + Arguments: + model_out (Tensor): obs embeddings from the model layers, of shape + [BATCH_SIZE, num_outputs]. + actions (Tensor): action values that correspond with the most + recent batch of observations passed through forward(), of shape + [BATCH_SIZE, action_dim]. + + Returns: + tensor of shape [BATCH_SIZE]. + """ + return self.twin_q_net([model_out, actions]) + + def policy_variables(self): + """Return the list of variables for the policy net.""" + + return list(self.action_net.variables) + + def q_variables(self): + """Return the list of variables for Q / twin Q nets.""" + + return self.q_net.variables + (self.twin_q_net.variables + if self.twin_q_net else []) + + def update_action_noise(self, session, distance_in_action_space, + exploration_ou_sigma, cur_noise_scale): + """Update the model action noise settings. + + This is called internally by the DDPG policy.""" + + self.pi_distance = distance_in_action_space + if (distance_in_action_space < exploration_ou_sigma * cur_noise_scale): + # multiplying the sampled OU noise by noise scale is + # equivalent to multiplying the sigma of OU by noise scale + self.parameter_noise_sigma_val *= 1.01 + else: + self.parameter_noise_sigma_val /= 1.01 + self.parameter_noise_sigma.load( + self.parameter_noise_sigma_val, session=session) + + def _build_parameter_noise(self, pnet_params): + assert pnet_params + self.parameter_noise_sigma_val = self.exploration_ou_sigma + self.parameter_noise_sigma = tf.get_variable( + initializer=tf.constant_initializer( + self.parameter_noise_sigma_val), + name="parameter_noise_sigma", + shape=(), + trainable=False, + dtype=tf.float32) + self.parameter_noise = [] + # No need to add any noise on LayerNorm parameters + for var in pnet_params: + noise_var = tf.get_variable( + name=var.name.split(":")[0] + "_noise", + shape=var.shape, + initializer=tf.constant_initializer(.0), + trainable=False) + self.parameter_noise.append(noise_var) + remove_noise_ops = list() + for var, var_noise in zip(pnet_params, self.parameter_noise): + remove_noise_ops.append(tf.assign_add(var, -var_noise)) + self.remove_noise_op = tf.group(*tuple(remove_noise_ops)) + generate_noise_ops = list() + for var_noise in self.parameter_noise: + generate_noise_ops.append( + tf.assign( + var_noise, + tf.random_normal( + shape=var_noise.shape, + stddev=self.parameter_noise_sigma))) + with tf.control_dependencies(generate_noise_ops): + add_noise_ops = list() + for var, var_noise in zip(pnet_params, self.parameter_noise): + add_noise_ops.append(tf.assign_add(var, var_noise)) + self.add_noise_op = tf.group(*tuple(add_noise_ops)) + self.pi_distance = None diff --git a/python/ray/rllib/agents/ddpg/ddpg_policy.py b/python/ray/rllib/agents/ddpg/ddpg_policy.py index 95e4bd121..a9fd52be9 100644 --- a/python/ray/rllib/agents/ddpg/ddpg_policy.py +++ b/python/ray/rllib/agents/ddpg/ddpg_policy.py @@ -4,393 +4,401 @@ from __future__ import print_function from gym.spaces import Box import numpy as np +import logging import ray import ray.experimental.tf_utils -from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn +from ray.rllib.agents.ddpg.ddpg_model import DDPGModel +from ray.rllib.agents.ddpg.noop_model import NoopModel +from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn, PRIO_WEIGHTS from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY +from ray.rllib.policy.tf_policy_template import build_tf_policy from ray.rllib.models import ModelCatalog from ray.rllib.utils.annotations import override from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.policy.policy import Policy from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.utils import try_import_tf -from ray.rllib.utils.tf_ops import huber_loss, minimize_and_clip, scope_vars +from ray.rllib.utils.tf_ops import huber_loss, minimize_and_clip tf = try_import_tf() - -ACTION_SCOPE = "action" -POLICY_SCOPE = "policy" -POLICY_TARGET_SCOPE = "target_policy" -Q_SCOPE = "critic" -Q_TARGET_SCOPE = "target_critic" -TWIN_Q_SCOPE = "twin_critic" -TWIN_Q_TARGET_SCOPE = "twin_target_critic" - -# Importance sampling weights for prioritized replay -PRIO_WEIGHTS = "weights" +logger = logging.getLogger(__name__) -class DDPGPostprocessing(object): - """Implements n-step learning and param noise adjustments.""" +def build_ddpg_model(policy, obs_space, action_space, config): + if config["model"]["custom_model"]: + logger.warning( + "Setting use_state_preprocessor=True since a custom model " + "was specified.") + config["use_state_preprocessor"] = True + if not isinstance(action_space, Box): + raise UnsupportedSpaceException( + "Action space {} is not supported for DDPG.".format(action_space)) + if len(action_space.shape) > 1: + raise UnsupportedSpaceException( + "Action space has multiple dimensions " + "{}. ".format(action_space.shape) + + "Consider reshaping this into a single dimension, " + "using a Tuple action space, or the multi-agent API.") - @override(Policy) - def postprocess_trajectory(self, - sample_batch, - other_agent_batches=None, - episode=None): - if self.config["parameter_noise"]: - # adjust the sigma of parameter space noise - states, noisy_actions = [ - list(x) for x in sample_batch.columns( - [SampleBatch.CUR_OBS, SampleBatch.ACTIONS]) - ] - self.sess.run(self.remove_noise_op) - clean_actions = self.sess.run( - self.output_actions, - feed_dict={ - self.cur_observations: states, - self.stochastic: False, - self.noise_scale: .0, - self.pure_exploration_phase: False, - }) - distance_in_action_space = np.sqrt( - np.mean(np.square(clean_actions - noisy_actions))) - self.pi_distance = distance_in_action_space - if distance_in_action_space < \ - self.config["exploration_ou_sigma"] * self.cur_noise_scale: - # multiplying the sampled OU noise by noise scale is - # equivalent to multiplying the sigma of OU by noise scale - self.parameter_noise_sigma_val *= 1.01 + if config["use_state_preprocessor"]: + default_model = None # catalog decides + num_outputs = 256 # arbitrary + config["model"]["no_final_linear"] = True + else: + default_model = NoopModel + num_outputs = int(np.product(obs_space.shape)) + + policy.model = ModelCatalog.get_model_v2( + obs_space, + action_space, + num_outputs, + config["model"], + framework="tf", + model_interface=DDPGModel, + default_model=default_model, + name="ddpg_model", + actor_hidden_activation=config["actor_hidden_activation"], + actor_hiddens=config["actor_hiddens"], + critic_hidden_activation=config["critic_hidden_activation"], + critic_hiddens=config["critic_hiddens"], + parameter_noise=config["parameter_noise"], + twin_q=config["twin_q"]) + + policy.target_model = ModelCatalog.get_model_v2( + obs_space, + action_space, + num_outputs, + config["model"], + framework="tf", + model_interface=DDPGModel, + default_model=default_model, + name="target_ddpg_model", + actor_hidden_activation=config["actor_hidden_activation"], + actor_hiddens=config["actor_hiddens"], + critic_hidden_activation=config["critic_hidden_activation"], + critic_hiddens=config["critic_hiddens"], + parameter_noise=config["parameter_noise"], + twin_q=config["twin_q"]) + + return policy.model + + +def postprocess_trajectory(policy, + sample_batch, + other_agent_batches=None, + episode=None): + if policy.config["parameter_noise"]: + policy.adjust_param_noise_sigma(sample_batch) + return _postprocess_dqn(policy, sample_batch) + + +def exploration_setting_inputs(policy): + return { + policy.stochastic: True, + policy.noise_scale: policy.cur_noise_scale, + policy.pure_exploration_phase: policy.cur_pure_exploration_phase, + } + + +def build_action_output(policy, model, input_dict, obs_space, action_space, + config): + model_out, _ = model({ + "obs": input_dict[SampleBatch.CUR_OBS], + "is_training": policy._get_is_training_placeholder(), + }, [], None) + action_out = model.get_policy_output(model_out) + + # Use sigmoid to scale to [0,1], but also double magnitude of input to + # emulate behaviour of tanh activation used in DDPG and TD3 papers. + sigmoid_out = tf.nn.sigmoid(2 * action_out) + # Rescale to actual env policy scale + # (shape of sigmoid_out is [batch_size, dim_actions], so we reshape to + # get same dims) + action_range = (action_space.high - action_space.low)[None] + low_action = action_space.low[None] + deterministic_actions = action_range * sigmoid_out + low_action + + noise_type = config["exploration_noise_type"] + action_low = action_space.low + action_high = action_space.high + action_range = action_space.high - action_low + + def compute_stochastic_actions(): + def make_noisy_actions(): + # shape of deterministic_actions is [None, dim_action] + if noise_type == "gaussian": + # add IID Gaussian noise for exploration, TD3-style + normal_sample = policy.noise_scale * tf.random_normal( + tf.shape(deterministic_actions), + stddev=config["exploration_gaussian_sigma"]) + stochastic_actions = tf.clip_by_value( + deterministic_actions + normal_sample, + action_low * tf.ones_like(deterministic_actions), + action_high * tf.ones_like(deterministic_actions)) + elif noise_type == "ou": + # add OU noise for exploration, DDPG-style + zero_acts = action_low.size * [.0] + exploration_sample = tf.get_variable( + name="ornstein_uhlenbeck", + dtype=tf.float32, + initializer=zero_acts, + trainable=False) + normal_sample = tf.random_normal( + shape=[action_low.size], mean=0.0, stddev=1.0) + ou_new = config["exploration_ou_theta"] \ + * -exploration_sample \ + + config["exploration_ou_sigma"] * normal_sample + exploration_value = tf.assign_add(exploration_sample, ou_new) + base_scale = config["exploration_ou_noise_scale"] + noise = policy.noise_scale * base_scale \ + * exploration_value * action_range + stochastic_actions = tf.clip_by_value( + deterministic_actions + noise, + action_low * tf.ones_like(deterministic_actions), + action_high * tf.ones_like(deterministic_actions)) else: - self.parameter_noise_sigma_val /= 1.01 - self.parameter_noise_sigma.load( - self.parameter_noise_sigma_val, session=self.sess) + raise ValueError( + "Unknown noise type '%s' (try 'ou' or 'gaussian')" % + noise_type) + return stochastic_actions - return _postprocess_dqn(self, sample_batch) + def make_uniform_random_actions(): + # pure random exploration option + uniform_random_actions = tf.random_uniform( + tf.shape(deterministic_actions)) + # rescale uniform random actions according to action range + tf_range = tf.constant(action_range[None], dtype="float32") + tf_low = tf.constant(action_low[None], dtype="float32") + uniform_random_actions = uniform_random_actions * tf_range \ + + tf_low + return uniform_random_actions + + stochastic_actions = tf.cond( + # need to condition on noise_scale > 0 because zeroing + # noise_scale is how a worker signals no noise should be used + # (this is ugly and should be fixed by adding an "eval_mode" + # config flag or something) + tf.logical_and(policy.pure_exploration_phase, + policy.noise_scale > 0), + true_fn=make_uniform_random_actions, + false_fn=make_noisy_actions) + return stochastic_actions + + enable_stochastic = tf.logical_and(policy.stochastic, + not config["parameter_noise"]) + actions = tf.cond(enable_stochastic, compute_stochastic_actions, + lambda: deterministic_actions) + policy.output_actions = actions + return actions, None -class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): - def __init__(self, observation_space, action_space, config): - config = dict(ray.rllib.agents.ddpg.ddpg.DEFAULT_CONFIG, **config) - if not isinstance(action_space, Box): - raise UnsupportedSpaceException( - "Action space {} is not supported for DDPG.".format( - action_space)) - if len(action_space.shape) > 1: - raise UnsupportedSpaceException( - "Action space has multiple dimensions " - "{}. ".format(action_space.shape) + - "Consider reshaping this into a single dimension, " - "using a Tuple action space, or the multi-agent API.") +def actor_critic_loss(policy, batch_tensors): + model_out_t, _ = policy.model({ + "obs": batch_tensors[SampleBatch.CUR_OBS], + "is_training": policy._get_is_training_placeholder(), + }, [], None) - self.config = config + model_out_tp1, _ = policy.model({ + "obs": batch_tensors[SampleBatch.NEXT_OBS], + "is_training": policy._get_is_training_placeholder(), + }, [], None) + + target_model_out_tp1, _ = policy.target_model({ + "obs": batch_tensors[SampleBatch.NEXT_OBS], + "is_training": policy._get_is_training_placeholder(), + }, [], None) + + policy_t = policy.model.get_policy_output(model_out_t) + policy_tp1 = policy.model.get_policy_output(model_out_tp1) + + if policy.config["smooth_target_policy"]: + target_noise_clip = policy.config["target_noise_clip"] + clipped_normal_sample = tf.clip_by_value( + tf.random_normal( + tf.shape(policy_tp1), stddev=policy.config["target_noise"]), + -target_noise_clip, target_noise_clip) + policy_tp1_smoothed = tf.clip_by_value( + policy_tp1 + clipped_normal_sample, + policy.action_space.low * tf.ones_like(policy_tp1), + policy.action_space.high * tf.ones_like(policy_tp1)) + else: + policy_tp1_smoothed = policy_tp1 + + # q network evaluation + q_t = policy.model.get_q_values(model_out_t, + batch_tensors[SampleBatch.ACTIONS]) + if policy.config["twin_q"]: + twin_q_t = policy.model.get_twin_q_values( + model_out_t, batch_tensors[SampleBatch.ACTIONS]) + + # Q-values for current policy (no noise) in given current state + q_t_det_policy = policy.model.get_q_values(model_out_t, policy_t) + + # target q network evaluation + q_tp1 = policy.target_model.get_q_values(target_model_out_tp1, + policy_tp1_smoothed) + if policy.config["twin_q"]: + twin_q_tp1 = policy.target_model.get_twin_q_values( + target_model_out_tp1, policy_tp1_smoothed) + + q_t_selected = tf.squeeze(q_t, axis=len(q_t.shape) - 1) + if policy.config["twin_q"]: + twin_q_t_selected = tf.squeeze(twin_q_t, axis=len(q_t.shape) - 1) + q_tp1 = tf.minimum(q_tp1, twin_q_tp1) + + q_tp1_best = tf.squeeze(input=q_tp1, axis=len(q_tp1.shape) - 1) + q_tp1_best_masked = (1.0 - tf.cast(batch_tensors[SampleBatch.DONES], + tf.float32)) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = tf.stop_gradient( + batch_tensors[SampleBatch.REWARDS] + + policy.config["gamma"]**policy.config["n_step"] * q_tp1_best_masked) + + # compute the error (potentially clipped) + if policy.config["twin_q"]: + td_error = q_t_selected - q_t_selected_target + twin_td_error = twin_q_t_selected - q_t_selected_target + td_error = td_error + twin_td_error + if policy.config["use_huber"]: + errors = huber_loss(td_error, policy.config["huber_threshold"]) \ + + huber_loss(twin_td_error, policy.config["huber_threshold"]) + else: + errors = 0.5 * tf.square(td_error) + 0.5 * tf.square(twin_td_error) + else: + td_error = q_t_selected - q_t_selected_target + if policy.config["use_huber"]: + errors = huber_loss(td_error, policy.config["huber_threshold"]) + else: + errors = 0.5 * tf.square(td_error) + + critic_loss = policy.model.custom_loss( + tf.reduce_mean(batch_tensors[PRIO_WEIGHTS] * errors), batch_tensors) + actor_loss = -tf.reduce_mean(q_t_det_policy) + + if policy.config["l2_reg"] is not None: + for var in policy.model.policy_variables(): + if "bias" not in var.name: + actor_loss += policy.config["l2_reg"] * tf.nn.l2_loss(var) + for var in policy.model.q_variables(): + if "bias" not in var.name: + critic_loss += policy.config["l2_reg"] * tf.nn.l2_loss(var) + + # save for stats function + policy.q_t = q_t + policy.td_error = td_error + policy.actor_loss = actor_loss + policy.critic_loss = critic_loss + + # in a custom apply op we handle the losses separately, but return them + # combined in one loss for now + return actor_loss + critic_loss + + +def gradients(policy, optimizer, loss): + if policy.config["grad_norm_clipping"] is not None: + actor_grads_and_vars = minimize_and_clip( + policy._actor_optimizer, + policy.actor_loss, + var_list=policy.model.policy_variables(), + clip_val=policy.config["grad_norm_clipping"]) + critic_grads_and_vars = minimize_and_clip( + policy._critic_optimizer, + policy.critic_loss, + var_list=policy.model.q_variables(), + clip_val=policy.config["grad_norm_clipping"]) + else: + actor_grads_and_vars = policy._actor_optimizer.compute_gradients( + policy.actor_loss, var_list=policy.model.policy_variables()) + critic_grads_and_vars = policy._critic_optimizer.compute_gradients( + policy.critic_loss, var_list=policy.model.q_variables()) + # save these for later use in build_apply_op + policy._actor_grads_and_vars = [(g, v) for (g, v) in actor_grads_and_vars + if g is not None] + policy._critic_grads_and_vars = [(g, v) for (g, v) in critic_grads_and_vars + if g is not None] + grads_and_vars = ( + policy._actor_grads_and_vars + policy._critic_grads_and_vars) + return grads_and_vars + + +def apply_gradients(policy, optimizer, grads_and_vars): + # for policy gradient, update policy net one time v.s. + # update critic net `policy_delay` time(s) + should_apply_actor_opt = tf.equal( + tf.mod(policy.global_step, policy.config["policy_delay"]), 0) + + def make_apply_op(): + return policy._actor_optimizer.apply_gradients( + policy._actor_grads_and_vars) + + actor_op = tf.cond( + should_apply_actor_opt, + true_fn=make_apply_op, + false_fn=lambda: tf.no_op()) + critic_op = policy._critic_optimizer.apply_gradients( + policy._critic_grads_and_vars) + + # increment global step & apply ops + with tf.control_dependencies([tf.assign_add(policy.global_step, 1)]): + return tf.group(actor_op, critic_op) + + +def stats(policy, batch_tensors): + return { + "td_error": tf.reduce_mean(policy.td_error), + "actor_loss": tf.reduce_mean(policy.actor_loss), + "critic_loss": tf.reduce_mean(policy.critic_loss), + "mean_q": tf.reduce_mean(policy.q_t), + "max_q": tf.reduce_max(policy.q_t), + "min_q": tf.reduce_min(policy.q_t), + } + + +class ExplorationStateMixin(object): + def __init__(self, obs_space, action_space, config): self.cur_noise_scale = 1.0 self.cur_pure_exploration_phase = False - self.dim_actions = action_space.shape[0] - self.low_action = action_space.low - self.high_action = action_space.high - - # create global step for counting the number of update operations - self.global_step = tf.train.get_or_create_global_step() - - # use separate optimizers for actor & critic - self._actor_optimizer = tf.train.AdamOptimizer( - learning_rate=self.config["actor_lr"]) - self._critic_optimizer = tf.train.AdamOptimizer( - learning_rate=self.config["critic_lr"]) - - # Action inputs self.stochastic = tf.placeholder(tf.bool, (), name="stochastic") self.noise_scale = tf.placeholder(tf.float32, (), name="noise_scale") self.pure_exploration_phase = tf.placeholder( tf.bool, (), name="pure_exploration_phase") - self.cur_observations = tf.placeholder( - tf.float32, - shape=(None, ) + observation_space.shape, - name="cur_obs") - with tf.variable_scope(POLICY_SCOPE) as scope: - policy_out, self.policy_model = self._build_policy_network( - self.cur_observations, observation_space, action_space) - self.policy_vars = scope_vars(scope.name) - - # Noise vars for P network except for layer normalization vars + def add_parameter_noise(self): if self.config["parameter_noise"]: - self._build_parameter_noise([ - var for var in self.policy_vars if "LayerNorm" not in var.name - ]) + self.get_session().run(self.model.add_noise_op) - # Action outputs - with tf.variable_scope(ACTION_SCOPE): - self.output_actions = self._add_exploration_noise( - policy_out, self.stochastic, self.noise_scale, - self.pure_exploration_phase, action_space) - - if self.config["smooth_target_policy"]: - self.reset_noise_op = tf.no_op() - else: - with tf.variable_scope(ACTION_SCOPE, reuse=True): - exploration_sample = tf.get_variable(name="ornstein_uhlenbeck") - self.reset_noise_op = tf.assign(exploration_sample, - self.dim_actions * [.0]) - - # Replay inputs - self.obs_t = tf.placeholder( - tf.float32, - shape=(None, ) + observation_space.shape, - name="observation") - self.act_t = tf.placeholder( - tf.float32, shape=(None, ) + action_space.shape, name="action") - self.rew_t = tf.placeholder(tf.float32, [None], name="reward") - self.obs_tp1 = tf.placeholder( - tf.float32, shape=(None, ) + observation_space.shape) - self.done_mask = tf.placeholder(tf.float32, [None], name="done") - self.importance_weights = tf.placeholder( - tf.float32, [None], name="weight") - - # policy network evaluation - with tf.variable_scope(POLICY_SCOPE, reuse=True) as scope: - prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - self.policy_t, _ = self._build_policy_network( - self.obs_t, observation_space, action_space) - policy_batchnorm_update_ops = list( - set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - - prev_update_ops) - - # target policy network evaluation - with tf.variable_scope(POLICY_TARGET_SCOPE) as scope: - policy_tp1, _ = self._build_policy_network( - self.obs_tp1, observation_space, action_space) - target_policy_vars = scope_vars(scope.name) - - # Action outputs - with tf.variable_scope(ACTION_SCOPE, reuse=True): - if config["smooth_target_policy"]: - target_noise_clip = self.config["target_noise_clip"] - clipped_normal_sample = tf.clip_by_value( - tf.random_normal( - tf.shape(policy_tp1), - stddev=self.config["target_noise"]), - -target_noise_clip, target_noise_clip) - policy_tp1_smoothed = tf.clip_by_value( - policy_tp1 + clipped_normal_sample, - action_space.low * tf.ones_like(policy_tp1), - action_space.high * tf.ones_like(policy_tp1)) - else: - # no smoothing, just use deterministic actions - policy_tp1_smoothed = policy_tp1 - - # q network evaluation - prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - with tf.variable_scope(Q_SCOPE) as scope: - # Q-values for given actions & observations in given current - q_t, self.q_model = self._build_q_network( - self.obs_t, observation_space, action_space, self.act_t) - self.q_func_vars = scope_vars(scope.name) - self.stats = { - "mean_q": tf.reduce_mean(q_t), - "max_q": tf.reduce_max(q_t), - "min_q": tf.reduce_min(q_t), - } - with tf.variable_scope(Q_SCOPE, reuse=True): - # Q-values for current policy (no noise) in given current state - q_t_det_policy, _ = self._build_q_network( - self.obs_t, observation_space, action_space, self.policy_t) - if self.config["twin_q"]: - with tf.variable_scope(TWIN_Q_SCOPE) as scope: - twin_q_t, self.twin_q_model = self._build_q_network( - self.obs_t, observation_space, action_space, self.act_t) - self.twin_q_func_vars = scope_vars(scope.name) - q_batchnorm_update_ops = list( - set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops) - - # target q network evaluation - with tf.variable_scope(Q_TARGET_SCOPE) as scope: - q_tp1, _ = self._build_q_network(self.obs_tp1, observation_space, - action_space, policy_tp1_smoothed) - target_q_func_vars = scope_vars(scope.name) - if self.config["twin_q"]: - with tf.variable_scope(TWIN_Q_TARGET_SCOPE) as scope: - twin_q_tp1, _ = self._build_q_network( - self.obs_tp1, observation_space, action_space, - policy_tp1_smoothed) - twin_target_q_func_vars = scope_vars(scope.name) - - if self.config["twin_q"]: - self.critic_loss, self.actor_loss, self.td_error \ - = self._build_actor_critic_loss( - q_t, q_tp1, q_t_det_policy, twin_q_t=twin_q_t, - twin_q_tp1=twin_q_tp1) - else: - self.critic_loss, self.actor_loss, self.td_error \ - = self._build_actor_critic_loss( - q_t, q_tp1, q_t_det_policy) - - if config["l2_reg"] is not None: - for var in self.policy_vars: - if "bias" not in var.name: - self.actor_loss += (config["l2_reg"] * tf.nn.l2_loss(var)) - for var in self.q_func_vars: - if "bias" not in var.name: - self.critic_loss += (config["l2_reg"] * tf.nn.l2_loss(var)) - if self.config["twin_q"]: - for var in self.twin_q_func_vars: - if "bias" not in var.name: - self.critic_loss += ( - config["l2_reg"] * tf.nn.l2_loss(var)) - - # update_target_fn will be called periodically to copy Q network to - # target Q network - self.tau_value = config.get("tau") - self.tau = tf.placeholder(tf.float32, (), name="tau") - update_target_expr = [] - for var, var_target in zip( - sorted(self.q_func_vars, key=lambda v: v.name), - sorted(target_q_func_vars, key=lambda v: v.name)): - update_target_expr.append( - var_target.assign(self.tau * var + - (1.0 - self.tau) * var_target)) - if self.config["twin_q"]: - for var, var_target in zip( - sorted(self.twin_q_func_vars, key=lambda v: v.name), - sorted(twin_target_q_func_vars, key=lambda v: v.name)): - update_target_expr.append( - var_target.assign(self.tau * var + - (1.0 - self.tau) * var_target)) - for var, var_target in zip( - sorted(self.policy_vars, key=lambda v: v.name), - sorted(target_policy_vars, key=lambda v: v.name)): - update_target_expr.append( - var_target.assign(self.tau * var + - (1.0 - self.tau) * var_target)) - self.update_target_expr = tf.group(*update_target_expr) - - self.sess = tf.get_default_session() - self.loss_inputs = [ - (SampleBatch.CUR_OBS, self.obs_t), - (SampleBatch.ACTIONS, self.act_t), - (SampleBatch.REWARDS, self.rew_t), - (SampleBatch.NEXT_OBS, self.obs_tp1), - (SampleBatch.DONES, self.done_mask), - (PRIO_WEIGHTS, self.importance_weights), + def adjust_param_noise_sigma(self, sample_batch): + # adjust the sigma of parameter space noise + states, noisy_actions = [ + list(x) for x in sample_batch.columns( + [SampleBatch.CUR_OBS, SampleBatch.ACTIONS]) ] - input_dict = dict(self.loss_inputs) + self.get_session().run(self.model.remove_noise_op) + clean_actions = self.get_session().run( + self.output_actions, + feed_dict={ + self.get_placeholder(SampleBatch.CUR_OBS): states, + self.stochastic: False, + self.noise_scale: .0, + self.pure_exploration_phase: False, + }) + distance_in_action_space = np.sqrt( + np.mean(np.square(clean_actions - noisy_actions))) + self.model.update_action_noise( + self.get_session(), distance_in_action_space, + self.config["exploration_ou_sigma"], self.cur_noise_scale) - if self.config["use_state_preprocessor"]: - # Model self-supervised losses - self.actor_loss = self.policy_model.custom_loss( - self.actor_loss, input_dict) - self.critic_loss = self.q_model.custom_loss( - self.critic_loss, input_dict) - if self.config["twin_q"]: - self.critic_loss = self.twin_q_model.custom_loss( - self.critic_loss, input_dict) + def set_epsilon(self, epsilon): + # set_epsilon is called by optimizer to anneal exploration as + # necessary, and to turn it off during evaluation. The "epsilon" part + # is a carry-over from DQN, which uses epsilon-greedy exploration + # rather than adding action noise to the output of a policy network. + self.cur_noise_scale = epsilon - TFPolicy.__init__( - self, - observation_space, - action_space, - self.sess, - obs_input=self.cur_observations, - action_sampler=self.output_actions, - loss=self.actor_loss + self.critic_loss, - loss_inputs=self.loss_inputs, - update_ops=q_batchnorm_update_ops + policy_batchnorm_update_ops) - self.sess.run(tf.global_variables_initializer()) - - # Note that this encompasses both the policy and Q-value networks and - # their corresponding target networks - self.variables = ray.experimental.tf_utils.TensorFlowVariables( - tf.group(q_t_det_policy, q_tp1), self.sess) - - # Hard initial update - self.update_target(tau=1.0) - - @override(TFPolicy) - def optimizer(self): - # we don't use this because we have two separate optimisers - return None - - @override(TFPolicy) - def build_apply_op(self, optimizer, grads_and_vars): - # for policy gradient, update policy net one time v.s. - # update critic net `policy_delay` time(s) - should_apply_actor_opt = tf.equal( - tf.mod(self.global_step, self.config["policy_delay"]), 0) - - def make_apply_op(): - return self._actor_optimizer.apply_gradients( - self._actor_grads_and_vars) - - actor_op = tf.cond( - should_apply_actor_opt, - true_fn=make_apply_op, - false_fn=lambda: tf.no_op()) - critic_op = self._critic_optimizer.apply_gradients( - self._critic_grads_and_vars) - # increment global step & apply ops - with tf.control_dependencies([tf.assign_add(self.global_step, 1)]): - return tf.group(actor_op, critic_op) - - @override(TFPolicy) - def gradients(self, optimizer, loss): - if self.config["grad_norm_clipping"] is not None: - actor_grads_and_vars = minimize_and_clip( - self._actor_optimizer, - self.actor_loss, - var_list=self.policy_vars, - clip_val=self.config["grad_norm_clipping"]) - critic_grads_and_vars = minimize_and_clip( - self._critic_optimizer, - self.critic_loss, - var_list=self.q_func_vars + self.twin_q_func_vars - if self.config["twin_q"] else self.q_func_vars, - clip_val=self.config["grad_norm_clipping"]) - else: - actor_grads_and_vars = self._actor_optimizer.compute_gradients( - self.actor_loss, var_list=self.policy_vars) - if self.config["twin_q"]: - critic_vars = self.q_func_vars + self.twin_q_func_vars - else: - critic_vars = self.q_func_vars - critic_grads_and_vars = self._critic_optimizer.compute_gradients( - self.critic_loss, var_list=critic_vars) - # save these for later use in build_apply_op - self._actor_grads_and_vars = [(g, v) for (g, v) in actor_grads_and_vars - if g is not None] - self._critic_grads_and_vars = [(g, v) - for (g, v) in critic_grads_and_vars - if g is not None] - grads_and_vars = self._actor_grads_and_vars \ - + self._critic_grads_and_vars - return grads_and_vars - - @override(TFPolicy) - def extra_compute_action_feed_dict(self): - return { - # FIXME: what about turning off exploration? Isn't that a good - # idea? - self.stochastic: True, - self.noise_scale: self.cur_noise_scale, - self.pure_exploration_phase: self.cur_pure_exploration_phase, - } - - @override(TFPolicy) - def extra_compute_grad_fetches(self): - return { - "td_error": self.td_error, - LEARNER_STATS_KEY: self.stats, - } - - @override(TFPolicy) - def get_weights(self): - return self.variables.get_weights() - - @override(TFPolicy) - def set_weights(self, weights): - self.variables.set_weights(weights) + def set_pure_exploration_phase(self, pure_exploration_phase): + self.cur_pure_exploration_phase = pure_exploration_phase @override(Policy) def get_state(self): @@ -405,253 +413,95 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): self.set_epsilon(state[1]) self.set_pure_exploration_phase(state[2]) - def _build_q_network(self, obs, obs_space, action_space, actions): - if self.config["use_state_preprocessor"]: - q_model = ModelCatalog.get_model({ - "obs": obs, - "is_training": self._get_is_training_placeholder(), - }, obs_space, action_space, 1, self.config["model"]) - q_out = tf.concat([q_model.last_layer, actions], axis=1) - else: - q_model = None - q_out = tf.concat([obs, actions], axis=1) - activation = getattr(tf.nn, self.config["critic_hidden_activation"]) - for hidden in self.config["critic_hiddens"]: - q_out = tf.layers.dense(q_out, units=hidden, activation=activation) - q_values = tf.layers.dense(q_out, units=1, activation=None) +class TargetNetworkMixin(object): + def __init__(self, config): + # update_target_fn will be called periodically to copy Q network to + # target Q network + self.tau_value = config.get("tau") + self.tau = tf.placeholder(tf.float32, (), name="tau") + update_target_expr = [] + model_vars = self.model.trainable_variables() + target_model_vars = self.target_model.trainable_variables() + assert len(model_vars) == len(target_model_vars), \ + (model_vars, target_model_vars) + for var, var_target in zip(model_vars, target_model_vars): + update_target_expr.append( + var_target.assign(self.tau * var + + (1.0 - self.tau) * var_target)) + logger.debug("Update target op {}".format(var_target)) + self.update_target_expr = tf.group(*update_target_expr) - return q_values, q_model - - def _build_policy_network(self, obs, obs_space, action_space): - if self.config["use_state_preprocessor"]: - model = ModelCatalog.get_model({ - "obs": obs, - "is_training": self._get_is_training_placeholder(), - }, obs_space, action_space, 1, self.config["model"]) - action_out = model.last_layer - else: - model = None - action_out = obs - - activation = getattr(tf.nn, self.config["actor_hidden_activation"]) - for hidden in self.config["actor_hiddens"]: - if self.config["parameter_noise"]: - import tensorflow.contrib.layers as layers - action_out = layers.fully_connected( - action_out, - num_outputs=hidden, - activation_fn=activation, - normalizer_fn=layers.layer_norm) - else: - action_out = tf.layers.dense( - action_out, units=hidden, activation=activation) - action_out = tf.layers.dense( - action_out, units=self.dim_actions, activation=None) - - # Use sigmoid to scale to [0,1], but also double magnitude of input to - # emulate behaviour of tanh activation used in DDPG and TD3 papers. - sigmoid_out = tf.nn.sigmoid(2 * action_out) - # Rescale to actual env policy scale - # (shape of sigmoid_out is [batch_size, dim_actions], so we reshape to - # get same dims) - action_range = (action_space.high - action_space.low)[None] - low_action = action_space.low[None] - actions = action_range * sigmoid_out + low_action - - return actions, model - - def _add_exploration_noise(self, deterministic_actions, - should_be_stochastic, noise_scale, - enable_pure_exploration, action_space): - noise_type = self.config["exploration_noise_type"] - action_low = action_space.low - action_high = action_space.high - action_range = action_space.high - action_low - - def compute_stochastic_actions(): - def make_noisy_actions(): - # shape of deterministic_actions is [None, dim_action] - if noise_type == "gaussian": - # add IID Gaussian noise for exploration, TD3-style - normal_sample = noise_scale * tf.random_normal( - tf.shape(deterministic_actions), - stddev=self.config["exploration_gaussian_sigma"]) - stochastic_actions = tf.clip_by_value( - deterministic_actions + normal_sample, - action_low * tf.ones_like(deterministic_actions), - action_high * tf.ones_like(deterministic_actions)) - elif noise_type == "ou": - # add OU noise for exploration, DDPG-style - zero_acts = action_low.size * [.0] - exploration_sample = tf.get_variable( - name="ornstein_uhlenbeck", - dtype=tf.float32, - initializer=zero_acts, - trainable=False) - normal_sample = tf.random_normal( - shape=[action_low.size], mean=0.0, stddev=1.0) - ou_new = self.config["exploration_ou_theta"] \ - * -exploration_sample \ - + self.config["exploration_ou_sigma"] * normal_sample - exploration_value = tf.assign_add(exploration_sample, - ou_new) - base_scale = self.config["exploration_ou_noise_scale"] - noise = noise_scale * base_scale \ - * exploration_value * action_range - stochastic_actions = tf.clip_by_value( - deterministic_actions + noise, - action_low * tf.ones_like(deterministic_actions), - action_high * tf.ones_like(deterministic_actions)) - else: - raise ValueError( - "Unknown noise type '%s' (try 'ou' or 'gaussian')" % - noise_type) - return stochastic_actions - - def make_uniform_random_actions(): - # pure random exploration option - uniform_random_actions = tf.random_uniform( - tf.shape(deterministic_actions)) - # rescale uniform random actions according to action range - tf_range = tf.constant(action_range[None], dtype="float32") - tf_low = tf.constant(action_low[None], dtype="float32") - uniform_random_actions = uniform_random_actions * tf_range \ - + tf_low - return uniform_random_actions - - stochastic_actions = tf.cond( - # need to condition on noise_scale > 0 because zeroing - # noise_scale is how a worker signals no noise should be used - # (this is ugly and should be fixed by adding an "eval_mode" - # config flag or something) - tf.logical_and(enable_pure_exploration, noise_scale > 0), - true_fn=make_uniform_random_actions, - false_fn=make_noisy_actions) - return stochastic_actions - - enable_stochastic = tf.logical_and(should_be_stochastic, - not self.config["parameter_noise"]) - actions = tf.cond(enable_stochastic, compute_stochastic_actions, - lambda: deterministic_actions) - return actions - - def _build_actor_critic_loss(self, - q_t, - q_tp1, - q_t_det_policy, - twin_q_t=None, - twin_q_tp1=None): - twin_q = self.config["twin_q"] - gamma = self.config["gamma"] - n_step = self.config["n_step"] - use_huber = self.config["use_huber"] - huber_threshold = self.config["huber_threshold"] - - q_t_selected = tf.squeeze(q_t, axis=len(q_t.shape) - 1) - if twin_q: - twin_q_t_selected = tf.squeeze(twin_q_t, axis=len(q_t.shape) - 1) - q_tp1 = tf.minimum(q_tp1, twin_q_tp1) - - q_tp1_best = tf.squeeze(input=q_tp1, axis=len(q_tp1.shape) - 1) - q_tp1_best_masked = (1.0 - self.done_mask) * q_tp1_best - - # compute RHS of bellman equation - q_t_selected_target = tf.stop_gradient( - self.rew_t + gamma**n_step * q_tp1_best_masked) - - # compute the error (potentially clipped) - if twin_q: - td_error = q_t_selected - q_t_selected_target - twin_td_error = twin_q_t_selected - q_t_selected_target - td_error = td_error + twin_td_error - if use_huber: - errors = huber_loss(td_error, huber_threshold) \ - + huber_loss(twin_td_error, huber_threshold) - else: - errors = 0.5 * tf.square(td_error) + 0.5 * tf.square( - twin_td_error) - else: - td_error = q_t_selected - q_t_selected_target - if use_huber: - errors = huber_loss(td_error, huber_threshold) - else: - errors = 0.5 * tf.square(td_error) - - critic_loss = tf.reduce_mean(self.importance_weights * errors) - actor_loss = -tf.reduce_mean(q_t_det_policy) - return critic_loss, actor_loss, td_error - - def _build_parameter_noise(self, pnet_params): - self.parameter_noise_sigma_val = self.config["exploration_ou_sigma"] - self.parameter_noise_sigma = tf.get_variable( - initializer=tf.constant_initializer( - self.parameter_noise_sigma_val), - name="parameter_noise_sigma", - shape=(), - trainable=False, - dtype=tf.float32) - self.parameter_noise = list() - # No need to add any noise on LayerNorm parameters - for var in pnet_params: - noise_var = tf.get_variable( - name=var.name.split(":")[0] + "_noise", - shape=var.shape, - initializer=tf.constant_initializer(.0), - trainable=False) - self.parameter_noise.append(noise_var) - remove_noise_ops = list() - for var, var_noise in zip(pnet_params, self.parameter_noise): - remove_noise_ops.append(tf.assign_add(var, -var_noise)) - self.remove_noise_op = tf.group(*tuple(remove_noise_ops)) - generate_noise_ops = list() - for var_noise in self.parameter_noise: - generate_noise_ops.append( - tf.assign( - var_noise, - tf.random_normal( - shape=var_noise.shape, - stddev=self.parameter_noise_sigma))) - with tf.control_dependencies(generate_noise_ops): - add_noise_ops = list() - for var, var_noise in zip(pnet_params, self.parameter_noise): - add_noise_ops.append(tf.assign_add(var, var_noise)) - self.add_noise_op = tf.group(*tuple(add_noise_ops)) - self.pi_distance = None - - def compute_td_error(self, obs_t, act_t, rew_t, obs_tp1, done_mask, - importance_weights): - td_err = self.sess.run( - self.td_error, - feed_dict={ - self.obs_t: [np.array(ob) for ob in obs_t], - self.act_t: act_t, - self.rew_t: rew_t, - self.obs_tp1: [np.array(ob) for ob in obs_tp1], - self.done_mask: done_mask, - self.importance_weights: importance_weights - }) - return td_err - - def reset_noise(self, sess): - sess.run(self.reset_noise_op) - - def add_parameter_noise(self): - if self.config["parameter_noise"]: - self.sess.run(self.add_noise_op) + # Hard initial update + self.update_target(tau=1.0) # support both hard and soft sync def update_target(self, tau=None): tau = tau or self.tau_value - return self.sess.run( + return self.get_session().run( self.update_target_expr, feed_dict={self.tau: tau}) - def set_epsilon(self, epsilon): - # set_epsilon is called by optimizer to anneal exploration as - # necessary, and to turn it off during evaluation. The "epsilon" part - # is a carry-over from DQN, which uses epsilon-greedy exploration - # rather than adding action noise to the output of a policy network. - self.cur_noise_scale = epsilon - def set_pure_exploration_phase(self, pure_exploration_phase): - self.cur_pure_exploration_phase = pure_exploration_phase +class ActorCriticOptimizerMixin(object): + def __init__(self, config): + # create global step for counting the number of update operations + self.global_step = tf.train.get_or_create_global_step() + + # use separate optimizers for actor & critic + self._actor_optimizer = tf.train.AdamOptimizer( + learning_rate=config["actor_lr"]) + self._critic_optimizer = tf.train.AdamOptimizer( + learning_rate=config["critic_lr"]) + + +class ComputeTDErrorMixin(object): + def compute_td_error(self, obs_t, act_t, rew_t, obs_tp1, done_mask, + importance_weights): + if not self.loss_initialized(): + return np.zeros_like(rew_t) + + td_err = self.get_session().run( + self.td_error, + feed_dict={ + self.get_placeholder(SampleBatch.CUR_OBS): [ + np.array(ob) for ob in obs_t + ], + self.get_placeholder(SampleBatch.ACTIONS): act_t, + self.get_placeholder(SampleBatch.REWARDS): rew_t, + self.get_placeholder(SampleBatch.NEXT_OBS): [ + np.array(ob) for ob in obs_tp1 + ], + self.get_placeholder(SampleBatch.DONES): done_mask, + self.get_placeholder(PRIO_WEIGHTS): importance_weights + }) + return td_err + + +def setup_early_mixins(policy, obs_space, action_space, config): + ExplorationStateMixin.__init__(policy, obs_space, action_space, config) + ActorCriticOptimizerMixin.__init__(policy, config) + + +def setup_late_mixins(policy, obs_space, action_space, config): + TargetNetworkMixin.__init__(policy, config) + + +DDPGTFPolicy = build_tf_policy( + name="DDPGTFPolicy", + get_default_config=lambda: ray.rllib.agents.ddpg.ddpg.DEFAULT_CONFIG, + make_model=build_ddpg_model, + postprocess_fn=postprocess_trajectory, + extra_action_feed_fn=exploration_setting_inputs, + action_sampler_fn=build_action_output, + loss_fn=actor_critic_loss, + stats_fn=stats, + gradients_fn=gradients, + apply_gradients_fn=apply_gradients, + extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error}, + mixins=[ + TargetNetworkMixin, ExplorationStateMixin, ActorCriticOptimizerMixin, + ComputeTDErrorMixin + ], + before_init=setup_early_mixins, + after_init=setup_late_mixins, + obs_include_prev_action_reward=False) diff --git a/python/ray/rllib/agents/ddpg/noop_model.py b/python/ray/rllib/agents/ddpg/noop_model.py new file mode 100644 index 000000000..d81dcf2b3 --- /dev/null +++ b/python/ray/rllib/agents/ddpg/noop_model.py @@ -0,0 +1,20 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.models import Model +from ray.rllib.utils.annotations import override +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + + +class NoopModel(Model): + """Trivial model that just returns the obs flattened. + + This is the model used if use_state_preprocessor=False.""" + + @override(Model) + def _build_layers_v2(self, input_dict, num_outputs, options): + out = tf.reshape(input_dict["obs"], [-1, num_outputs]) + return out, out diff --git a/python/ray/rllib/agents/dqn/distributional_q_model.py b/python/ray/rllib/agents/dqn/distributional_q_model.py index adede90f9..70724dd30 100644 --- a/python/ray/rllib/agents/dqn/distributional_q_model.py +++ b/python/ray/rllib/agents/dqn/distributional_q_model.py @@ -15,6 +15,11 @@ class DistributionalQModel(TFModelV2): It also supports options for noisy nets and parameter space noise. + Data flow: + obs -> forward() -> model_out + model_out -> get_q_value_distributions() -> Q(s, a) atoms + model_out -> get_state_value() -> V(s) + Note that this class by itself is not a valid model unless you implement forward() in a subclass.""" diff --git a/python/ray/rllib/agents/dqn/dqn.py b/python/ray/rllib/agents/dqn/dqn.py index 3541d147d..cb862c05c 100644 --- a/python/ray/rllib/agents/dqn/dqn.py +++ b/python/ray/rllib/agents/dqn/dqn.py @@ -187,7 +187,7 @@ def check_config_and_setup_param_noise(config): policies = info["policy"] episode = info["episode"] episode.custom_metrics["policy_distance"] = policies[ - DEFAULT_POLICY_ID].pi_distance + DEFAULT_POLICY_ID].model.pi_distance if end_callback: end_callback(info) @@ -207,6 +207,7 @@ def make_exploration_schedule(config, worker_index): assert config["num_workers"] > 1, \ "This requires multiple workers" if worker_index >= 0: + # Exploration constants from the Ape-X paper exponent = ( 1 + worker_index / float(config["num_workers"] - 1) * 7) return ConstantSchedule(0.4**exponent) diff --git a/python/ray/rllib/agents/dqn/dqn_policy.py b/python/ray/rllib/agents/dqn/dqn_policy.py index 0865b24d5..76159ad81 100644 --- a/python/ray/rllib/agents/dqn/dqn_policy.py +++ b/python/ray/rllib/agents/dqn/dqn_policy.py @@ -300,12 +300,9 @@ def _build_parameter_noise(policy, pnet_params): def build_q_losses(policy, batch_tensors): config = policy.config # q network evaluation - prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) q_t, q_logits_t, q_dist_t = _compute_q_values( policy, policy.q_model, batch_tensors[SampleBatch.CUR_OBS], policy.observation_space, policy.action_space) - policy.q_batchnorm_update_ops = list( - set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops) # target q network evalution q_tp1, q_logits_tp1, q_dist_tp1 = _compute_q_values( @@ -495,7 +492,6 @@ DQNTFPolicy = build_tf_policy( extra_action_feed_fn=exploration_setting_inputs, extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values}, extra_learn_fetches_fn=lambda policy: {"td_error": policy.q_loss.td_error}, - update_ops_fn=lambda policy: policy.q_batchnorm_update_ops, before_init=setup_early_mixins, after_init=setup_late_mixins, obs_include_prev_action_reward=False, diff --git a/python/ray/rllib/agents/dqn/simple_q_model.py b/python/ray/rllib/agents/dqn/simple_q_model.py index 3efec7921..2cf68c3cf 100644 --- a/python/ray/rllib/agents/dqn/simple_q_model.py +++ b/python/ray/rllib/agents/dqn/simple_q_model.py @@ -13,6 +13,10 @@ tf = try_import_tf() class SimpleQModel(TFModelV2): """Extension of standard TFModel to provide Q values. + Data flow: + obs -> forward() -> model_out + model_out -> get_q_values() -> Q(s, a) + Note that this class by itself is not a valid model unless you implement forward() in a subclass.""" diff --git a/python/ray/rllib/evaluation/rollout_worker.py b/python/ray/rllib/evaluation/rollout_worker.py index 2e1269e74..044f19fea 100644 --- a/python/ray/rllib/evaluation/rollout_worker.py +++ b/python/ray/rllib/evaluation/rollout_worker.py @@ -301,6 +301,15 @@ class RolloutWorker(EvaluatorInterface): if seed is not None: np.random.seed(seed) random.seed(seed) + if not hasattr(self.env, "seed"): + raise ValueError("Env doesn't support env.seed(): {}".format( + self.env)) + self.env.seed(seed) + try: + import torch + torch.manual_seed(seed) + except ImportError: + logger.info("Could not seed torch") if _has_tensorflow_graph(policy_dict): if (ray.is_initialized() and ray.worker._mode() != ray.worker.LOCAL_MODE diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index 4d3723c82..4bdda4ecf 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -211,6 +211,7 @@ class ModelCatalog(object): framework="tf", name=None, model_interface=None, + default_model=None, **model_kwargs): """Returns a suitable model compatible with given spaces and output. @@ -223,6 +224,8 @@ class ModelCatalog(object): framework (str): Either "tf" or "torch". name (str): Name (scope) for the model. model_interface (cls): Interface required for the model + default_model (cls): Override the default class for the model. This + only has an effect when not using a custom model model_kwargs (dict): args to pass to the ModelV2 constructor Returns: @@ -263,7 +266,7 @@ class ModelCatalog(object): return instance if framework == "tf": - legacy_model_cls = ModelCatalog.get_model + legacy_model_cls = default_model or ModelCatalog.get_model wrapper = ModelCatalog._wrap_if_needed( make_v1_wrapper(legacy_model_cls), model_interface) return wrapper(obs_space, action_space, num_outputs, model_config, diff --git a/python/ray/rllib/models/modelv2.py b/python/ray/rllib/models/modelv2.py index 4705532b1..a5162be37 100644 --- a/python/ray/rllib/models/modelv2.py +++ b/python/ray/rllib/models/modelv2.py @@ -9,7 +9,11 @@ class ModelV2(object): """Defines a Keras-style abstract network model for use with RLlib. Custom models should extend either TFModelV2 or TorchModelV2 instead of - this class directly. Experimental. + this class directly. + + Data flow: + obs -> forward() -> model_out + value_function() -> V(s) Attributes: obs_space (Space): observation space of the target gym env. This diff --git a/python/ray/rllib/models/tf/modelv1_compat.py b/python/ray/rllib/models/tf/modelv1_compat.py index eb3dbe5fa..b55a6cf98 100644 --- a/python/ray/rllib/models/tf/modelv1_compat.py +++ b/python/ray/rllib/models/tf/modelv1_compat.py @@ -50,6 +50,9 @@ def make_v1_wrapper(legacy_model_cls): # Tracks branches created so far self.branches_created = set() + # Tracks update ops + self._update_ops = None + with tf.variable_scope(self.name) as scope: self.variable_scope = scope @@ -68,9 +71,14 @@ def make_v1_wrapper(legacy_model_cls): else: # create a new model instance with tf.variable_scope(self.name): + prev_update_ops = set( + tf.get_collection(tf.GraphKeys.UPDATE_OPS)) new_instance = self.legacy_model_cls( input_dict, self.obs_space, self.action_space, self.num_outputs, self.model_config, state, seq_lens) + self._update_ops = list( + set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - + prev_update_ops) if len(new_instance.state_init) != len(self.get_initial_state()): raise ValueError( "When using a custom recurrent ModelV1 model, you should " @@ -83,6 +91,13 @@ def make_v1_wrapper(legacy_model_cls): self.variable_scope = new_instance.scope return new_instance.outputs, new_instance.state_out + @override(TFModelV2) + def update_ops(self): + if self._update_ops is None: + raise ValueError( + "Cannot get update ops before wrapped v1 model init") + return list(self._update_ops) + @override(ModelV2) def variables(self): var_list = super(ModelV1Wrapper, self).variables() diff --git a/python/ray/rllib/models/tf/tf_modelv2.py b/python/ray/rllib/models/tf/tf_modelv2.py index 34cbbe40d..26fb117d6 100644 --- a/python/ray/rllib/models/tf/tf_modelv2.py +++ b/python/ray/rllib/models/tf/tf_modelv2.py @@ -21,3 +21,9 @@ class TFModelV2(ModelV2): model_config, name, framework="tf") + + def update_ops(self): + """Return the list of update ops for this model. + + For example, this should include any BatchNorm update ops.""" + return [] diff --git a/python/ray/rllib/policy/dynamic_tf_policy.py b/python/ray/rllib/policy/dynamic_tf_policy.py index 256fb1a6c..d7a68c064 100644 --- a/python/ray/rllib/policy/dynamic_tf_policy.py +++ b/python/ray/rllib/policy/dynamic_tf_policy.py @@ -39,7 +39,6 @@ class DynamicTFPolicy(TFPolicy): config, loss_fn, stats_fn=None, - update_ops_fn=None, grad_stats_fn=None, before_loss_init=None, make_model=None, @@ -60,8 +59,6 @@ class DynamicTFPolicy(TFPolicy): TF fetches given the policy and batch input tensors grad_stats_fn (func): optional function that returns a dict of TF fetches given the policy and loss gradient tensors - update_ops_fn (func): optional function that returns a list - overriding the update ops to run when applying gradients before_loss_init (func): optional function to run prior to loss init that takes the same arguments as __init__ make_model (func): optional function that returns a ModelV2 object @@ -95,7 +92,6 @@ class DynamicTFPolicy(TFPolicy): self._loss_fn = loss_fn self._stats_fn = stats_fn self._grad_stats_fn = grad_stats_fn - self._update_ops_fn = update_ops_fn self._obs_include_prev_action_reward = obs_include_prev_action_reward # Setup standard placeholders @@ -127,8 +123,14 @@ class DynamicTFPolicy(TFPolicy): dtype=tf.int32, shape=[None], name="seq_lens") # Setup model - self.dist_class, logit_dim = ModelCatalog.get_action_dist( - action_space, self.config["model"]) + if action_sampler_fn: + if not make_model: + raise ValueError( + "make_model is required if action_sampler_fn is given") + self.dist_class = None + else: + self.dist_class, logit_dim = ModelCatalog.get_action_dist( + action_space, self.config["model"]) if existing_model: self.model = existing_model elif make_model: @@ -158,7 +160,6 @@ class DynamicTFPolicy(TFPolicy): # Setup action sampler if action_sampler_fn: self.action_dist = None - self.dist_class = None action_sampler, action_prob = action_sampler_fn( self, self.model, self.input_dict, obs_space, action_space, config) @@ -335,6 +336,6 @@ class DynamicTFPolicy(TFPolicy): loss = self._loss_fn(self, batch_tensors) if self._stats_fn: self._stats_fetches.update(self._stats_fn(self, batch_tensors)) - if self._update_ops_fn: - self._update_ops = self._update_ops_fn(self) + # override the update ops to be those of the model + self._update_ops = self.model.update_ops() return loss diff --git a/python/ray/rllib/policy/tf_policy.py b/python/ray/rllib/policy/tf_policy.py index c5a53abbc..b46506474 100644 --- a/python/ray/rllib/policy/tf_policy.py +++ b/python/ray/rllib/policy/tf_policy.py @@ -202,7 +202,7 @@ class TFPolicy(Policy): self._update_ops = tf.get_collection( tf.GraphKeys.UPDATE_OPS, scope=tf.get_variable_scope().name) if self._update_ops: - logger.debug("Update ops to run on apply gradient: {}".format( + logger.info("Update ops to run on apply gradient: {}".format( self._update_ops)) with tf.control_dependencies(self._update_ops): self._apply_op = self.build_apply_op(self._optimizer, diff --git a/python/ray/rllib/policy/tf_policy_template.py b/python/ray/rllib/policy/tf_policy_template.py index c75e618e0..e44a6dac5 100644 --- a/python/ray/rllib/policy/tf_policy_template.py +++ b/python/ray/rllib/policy/tf_policy_template.py @@ -15,9 +15,9 @@ def build_tf_policy(name, get_default_config=None, postprocess_fn=None, stats_fn=None, - update_ops_fn=None, optimizer_fn=None, gradients_fn=None, + apply_gradients_fn=None, grad_stats_fn=None, extra_action_fetches_fn=None, extra_action_feed_fn=None, @@ -35,8 +35,9 @@ def build_tf_policy(name, Functions will be run in this order to initialize the policy: 1. Placeholder setup: postprocess_fn - 2. Loss init: loss_fn, stats_fn, update_ops_fn - 3. Optimizer init: optimizer_fn, gradients_fn, grad_stats_fn + 2. Loss init: loss_fn, stats_fn + 3. Optimizer init: optimizer_fn, gradients_fn, apply_gradients_fn, + grad_stats_fn This means that you can e.g., depend on any policy attributes created in the running of `loss_fn` in later functions such as `stats_fn`. @@ -58,13 +59,13 @@ def build_tf_policy(name, that takes the same args as Policy.postprocess_trajectory() stats_fn (func): optional function that returns a dict of TF fetches given the policy and batch input tensors - update_ops_fn (func): optional function that returns a list overriding - the update ops to run when applying gradients optimizer_fn (func): optional function that returns a tf.Optimizer given the policy and config gradients_fn (func): optional function that returns a list of gradients - given a tf optimizer and loss tensor. If not specified, this + given (policy, optimizer, loss). If not specified, this defaults to optimizer.compute_gradients(loss) + apply_gradients_fn (func): optional function that returns an apply + gradients op given (policy, optimizer, grads_and_vars) grad_stats_fn (func): optional function that returns a dict of TF fetches given the policy and loss gradient tensors extra_action_fetches_fn (func): optional function that returns @@ -134,7 +135,6 @@ def build_tf_policy(name, loss_fn, stats_fn=stats_fn, grad_stats_fn=grad_stats_fn, - update_ops_fn=update_ops_fn, before_loss_init=before_loss_init_wrapper, make_model=make_model, action_sampler_fn=action_sampler_fn, @@ -170,6 +170,13 @@ def build_tf_policy(name, else: return TFPolicy.gradients(self, optimizer, loss) + @override(TFPolicy) + def build_apply_op(self, optimizer, grads_and_vars): + if apply_gradients_fn: + return apply_gradients_fn(self, optimizer, grads_and_vars) + else: + return TFPolicy.build_apply_op(self, optimizer, grads_and_vars) + @override(TFPolicy) def extra_compute_action_fetches(self): return dict(