[rllib] Revert [rllib] Port DDPG to the build_tf_policy pattern (#5626)

This commit is contained in:
Eric Liang
2019-09-04 21:39:22 -07:00
committed by Philipp Moritz
parent f38bb288e2
commit 19bbf1eb4d
7 changed files with 658 additions and 761 deletions
+1 -1
View File
@@ -54,7 +54,7 @@ PICKLE_OBJECT_WARNING_SIZE = 10**7
# The maximum resource quantity that is allowed. TODO(rkn): This could be
# relaxed, but the current implementation of the node manager will be slower
# for large resource quantities due to bookkeeping of specific resource IDs.
MAX_RESOURCE_QUANTITY = 10000
MAX_RESOURCE_QUANTITY = 20000
# Each memory "resource" counts as this many bytes of memory.
MEMORY_RESOURCE_UNIT_BYTES = 50 * 1024 * 1024
+2 -2
View File
@@ -41,7 +41,7 @@ DEFAULT_CONFIG = with_common_config({
# === Model ===
# Apply a state preprocessor with spec given by the "model" config option
# (like other RL algorithms). This is mostly useful if you have a weird
# observation shape, like an image. Auto-enabled if a custom model is set.
# observation shape, like an image. Disabled by default.
"use_state_preprocessor": False,
# Postprocess the policy network model output with these hidden layers. If
# use_state_preprocessor is False, then these will be the *only* hidden
@@ -173,7 +173,7 @@ def make_exploration_schedule(config, worker_index):
if config["per_worker_exploration"]:
assert config["num_workers"] > 1, "This requires multiple workers"
if worker_index >= 0:
# Exploration constants from the Ape-X paper
# FIXME: what do magic constants mean? (0.4, 7)
max_index = float(config["num_workers"] - 1)
exponent = 1 + worker_index / max_index * 7
return ConstantSchedule(0.4**exponent)
-262
View File
@@ -1,262 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.utils import try_import_tf
tf = try_import_tf()
class DDPGModel(TFModelV2):
    """Extension of standard TFModel for DDPG.

    Data flow:
        obs -> forward() -> model_out
        model_out -> get_policy_output() -> pi(s)
        model_out, actions -> get_q_values() -> Q(s, a)
        model_out, actions -> get_twin_q_values() -> Q_twin(s, a)

    Note that this class by itself is not a valid model unless you
    implement forward() in a subclass."""

    def __init__(self,
                 obs_space,
                 action_space,
                 num_outputs,
                 model_config,
                 name,
                 actor_hidden_activation="relu",
                 actor_hiddens=(400, 300),
                 critic_hidden_activation="relu",
                 critic_hiddens=(400, 300),
                 parameter_noise=False,
                 twin_q=False,
                 exploration_ou_sigma=0.2):
        """Initialize variables of this model.

        Extra model kwargs:
            actor_hidden_activation (str): activation for actor network,
                looked up by name on `tf.nn`
            actor_hiddens (list): hidden layers sizes for actor network
            critic_hidden_activation (str): activation for critic network,
                looked up by name on `tf.nn`
            critic_hiddens (list): hidden layers sizes for critic network
            parameter_noise (bool): use param noise exploration
            twin_q (bool): build twin Q networks
            exploration_ou_sigma (float): ou noise sigma for exploration;
                also the initial value of the parameter noise sigma

        Note that the core layers for forward() are not defined here, this
        only defines the layers for the output heads. Those layers for
        forward() should be defined in subclasses of DDPGModel.
        """
        super(DDPGModel, self).__init__(obs_space, action_space, num_outputs,
                                        model_config, name)

        self.exploration_ou_sigma = exploration_ou_sigma
        # np.prod replaces the np.product alias, which NumPy 2.0 removed.
        self.action_dim = np.prod(action_space.shape)
        self.model_out = tf.keras.layers.Input(
            shape=(num_outputs, ), name="model_out")
        self.actions = tf.keras.layers.Input(
            shape=(self.action_dim, ), name="actions")

        def build_action_net(action_out):
            """Stack the actor hidden layers and the linear action head."""
            assert action_out.dtype == tf.float32
            activation = getattr(tf.nn, actor_hidden_activation)
            for i, hidden in enumerate(actor_hiddens):
                if parameter_noise:
                    # Layer-normalized layers are used on the param-noise
                    # path; their LayerNorm vars are excluded from noise
                    # further below.
                    import tensorflow.contrib.layers as layers
                    action_out = layers.fully_connected(
                        action_out,
                        num_outputs=hidden,
                        activation_fn=activation,
                        normalizer_fn=layers.layer_norm)
                else:
                    action_out = tf.layers.dense(
                        action_out,
                        units=hidden,
                        activation=activation,
                        name="action_hidden_{}".format(i))
            return tf.layers.dense(
                action_out,
                units=self.action_dim,
                activation=None,
                name="action_out")

        action_scope = name + "/action_net"

        # Save the scope object, since in eager we will execute this
        # path repeatedly and there is no guarantee it will always be run
        # in the same original scope.
        with tf.variable_scope(action_scope) as action_scope_handle:
            pass

        # TODO(ekl) use keras layers instead of variable scopes
        if tf.executing_eagerly():
            # Have to use a variable store to reuse variables in eager mode
            import tensorflow.contrib as tfc
            store = tfc.eager.EagerVariableStore()

            def build_action_net_scope(model_out):
                with store.as_default():
                    with tf.variable_scope(
                            action_scope_handle, reuse=tf.AUTO_REUSE):
                        return build_action_net(model_out)
        else:

            def build_action_net_scope(model_out):
                with tf.variable_scope(
                        action_scope_handle, reuse=tf.AUTO_REUSE):
                    return build_action_net(model_out)

        pi_out = tf.keras.layers.Lambda(build_action_net_scope)(self.model_out)
        self.action_net = tf.keras.Model(self.model_out, pi_out)
        self.register_variables(self.action_net.variables)

        # Noise vars for P network except for layer normalization vars
        if parameter_noise:
            assert not tf.executing_eagerly(), "eager p noise not implemented"
            with tf.variable_scope(action_scope_handle, reuse=tf.AUTO_REUSE):
                self._build_parameter_noise([
                    var for var in self.action_net.variables
                    if "LayerNorm" not in var.name
                ])

        def build_q_net(name, model_out, actions):
            """Build a Keras critic mapping (model_out, actions) -> Q."""
            q_out = tf.keras.layers.Concatenate(axis=1)([model_out, actions])
            activation = getattr(tf.nn, critic_hidden_activation)
            for i, n in enumerate(critic_hiddens):
                q_out = tf.keras.layers.Dense(
                    n,
                    name="{}_hidden_{}".format(name, i),
                    activation=activation)(q_out)
            q_out = tf.keras.layers.Dense(
                1, activation=None, name="{}_out".format(name))(q_out)
            return tf.keras.Model([model_out, actions], q_out)

        self.q_net = build_q_net("q", self.model_out, self.actions)
        self.register_variables(self.q_net.variables)

        if twin_q:
            self.twin_q_net = build_q_net("twin_q", self.model_out,
                                          self.actions)
            self.register_variables(self.twin_q_net.variables)
        else:
            self.twin_q_net = None

    def get_policy_output(self, model_out):
        """Return the (unscaled) output of the policy network.

        This returns the unscaled outputs of pi(s).

        Arguments:
            model_out (Tensor): obs embeddings from the model layers, of shape
                [BATCH_SIZE, num_outputs].

        Returns:
            tensor of shape [BATCH_SIZE, action_dim] with range [-inf, inf].
        """
        return self.action_net(model_out)

    def get_q_values(self, model_out, actions):
        """Return the Q estimates for the most recent forward pass.

        This implements Q(s, a).

        Arguments:
            model_out (Tensor): obs embeddings from the model layers, of shape
                [BATCH_SIZE, num_outputs].
            actions (Tensor): action values that correspond with the most
                recent batch of observations passed through forward(), of shape
                [BATCH_SIZE, action_dim].

        Returns:
            tensor of shape [BATCH_SIZE, 1] (output of the final
            single-unit dense layer of the Q net).
        """
        return self.q_net([model_out, actions])

    def get_twin_q_values(self, model_out, actions):
        """Same as get_q_values but using the twin Q net.

        This implements the twin Q(s, a). Only usable when the model was
        built with twin_q=True; otherwise `self.twin_q_net` is None and
        this call fails.

        Arguments:
            model_out (Tensor): obs embeddings from the model layers, of shape
                [BATCH_SIZE, num_outputs].
            actions (Tensor): action values that correspond with the most
                recent batch of observations passed through forward(), of shape
                [BATCH_SIZE, action_dim].

        Returns:
            tensor of shape [BATCH_SIZE, 1] (output of the final
            single-unit dense layer of the twin Q net).
        """
        return self.twin_q_net([model_out, actions])

    def policy_variables(self):
        """Return the list of variables for the policy net."""
        return list(self.action_net.variables)

    def q_variables(self):
        """Return the list of variables for Q / twin Q nets."""
        return self.q_net.variables + (self.twin_q_net.variables
                                       if self.twin_q_net else [])

    def update_action_noise(self, session, distance_in_action_space,
                            exploration_ou_sigma, cur_noise_scale):
        """Update the model action noise settings.

        This is called internally by the DDPG policy. It relies on the
        attributes created by _build_parameter_noise(), which is only run
        when the model was built with parameter_noise=True.

        Arguments:
            session: session in which the new sigma value is loaded into
                the `parameter_noise_sigma` variable.
            distance_in_action_space: measured policy distance; stored as
                `self.pi_distance` and compared against the threshold.
            exploration_ou_sigma (float): OU noise sigma used for the
                threshold.
            cur_noise_scale (float): current noise scale used for the
                threshold.
        """
        self.pi_distance = distance_in_action_space
        if (distance_in_action_space < exploration_ou_sigma * cur_noise_scale):
            # multiplying the sampled OU noise by noise scale is
            # equivalent to multiplying the sigma of OU by noise scale
            self.parameter_noise_sigma_val *= 1.01
        else:
            self.parameter_noise_sigma_val /= 1.01
        self.parameter_noise_sigma.load(
            self.parameter_noise_sigma_val, session=session)

    def _build_parameter_noise(self, pnet_params):
        """Create the noise variables and add/remove ops for param noise.

        Sets `parameter_noise_sigma(_val)`, `parameter_noise`,
        `remove_noise_op`, `add_noise_op` and `pi_distance` on the model and
        registers the new variables.

        Arguments:
            pnet_params (list): non-empty list of policy net variables that
                should receive noise (the caller filters out LayerNorm vars).
        """
        assert pnet_params
        self.parameter_noise_sigma_val = self.exploration_ou_sigma
        self.parameter_noise_sigma = tf.get_variable(
            initializer=tf.constant_initializer(
                self.parameter_noise_sigma_val),
            name="parameter_noise_sigma",
            shape=(),
            trainable=False,
            dtype=tf.float32)

        # One zero-initialized, non-trainable noise variable per parameter.
        # No need to add any noise on LayerNorm parameters
        self.parameter_noise = [
            tf.get_variable(
                name=var.name.split(":")[0] + "_noise",
                shape=var.shape,
                initializer=tf.constant_initializer(.0),
                trainable=False) for var in pnet_params
        ]

        # Subtracting the stored noise restores the un-noised parameters.
        remove_noise_ops = [
            tf.assign_add(var, -var_noise)
            for var, var_noise in zip(pnet_params, self.parameter_noise)
        ]
        self.remove_noise_op = tf.group(*remove_noise_ops)

        generate_noise_ops = [
            tf.assign(
                var_noise,
                tf.random_normal(
                    shape=var_noise.shape,
                    stddev=self.parameter_noise_sigma))
            for var_noise in self.parameter_noise
        ]
        # Fresh noise must be sampled before it is added to the parameters.
        with tf.control_dependencies(generate_noise_ops):
            add_noise_ops = [
                tf.assign_add(var, var_noise)
                for var, var_noise in zip(pnet_params, self.parameter_noise)
            ]
            self.add_noise_op = tf.group(*add_noise_ops)

        self.pi_distance = None
        self.register_variables(self.parameter_noise)
        self.register_variables([self.parameter_noise_sigma])
File diff suppressed because it is too large Load Diff
+3 -2
View File
@@ -184,8 +184,9 @@ def check_config_and_setup_param_noise(config):
# between noisy policy and original policy
policies = info["policy"]
episode = info["episode"]
episode.custom_metrics["policy_distance"] = policies[
DEFAULT_POLICY_ID].model.pi_distance
model = policies[DEFAULT_POLICY_ID].model
if hasattr(model, "pi_distance"):
episode.custom_metrics["policy_distance"] = model.pi_distance
if end_callback:
end_callback(info)
+50 -3
View File
@@ -10,15 +10,13 @@ import ray
import ray.experimental.tf_utils
from ray.rllib.agents.sac.sac_model import SACModel
from ray.rllib.agents.ddpg.noop_model import NoopModel
from ray.rllib.agents.ddpg.ddpg_policy import ComputeTDErrorMixin, \
TargetNetworkMixin
from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn, PRIO_WEIGHTS
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.tf_policy_template import build_tf_policy
from ray.rllib.models import ModelCatalog
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.utils import try_import_tf, try_import_tfp
from ray.rllib.utils.tf_ops import minimize_and_clip
from ray.rllib.utils.tf_ops import minimize_and_clip, make_tf_callable
tf = try_import_tf()
tfp = try_import_tfp()
@@ -287,6 +285,55 @@ class ActorCriticOptimizerMixin(object):
learning_rate=config["optimization"]["entropy_learning_rate"])
class ComputeTDErrorMixin(object):
    """Mixin that installs a `compute_td_error` callable on the policy.

    The callable re-runs `actor_critic_loss` on the given transition batch
    and returns `self.td_error` afterwards (presumably refreshed by the loss
    as a side effect -- confirm against `actor_critic_loss`).
    """

    def __init__(self):
        # Build the wrapper first, then apply it to the closure below.
        as_tf_callable = make_tf_callable(
            self.get_session(), dynamic_shape=True)

        def compute_td_error(obs_t, act_t, rew_t, obs_tp1, done_mask,
                             importance_weights):
            if self.loss_initialized():
                # Do forward pass on loss to update td error attribute
                train_batch = {
                    SampleBatch.CUR_OBS: tf.convert_to_tensor(obs_t),
                    SampleBatch.ACTIONS: tf.convert_to_tensor(act_t),
                    SampleBatch.REWARDS: tf.convert_to_tensor(rew_t),
                    SampleBatch.NEXT_OBS: tf.convert_to_tensor(obs_tp1),
                    SampleBatch.DONES: tf.convert_to_tensor(done_mask),
                    PRIO_WEIGHTS: tf.convert_to_tensor(importance_weights),
                }
                actor_critic_loss(self, self.model, None, train_batch)
                return self.td_error

            # Loss not built yet: report zero error, shaped like the rewards.
            return tf.zeros_like(rew_t)

        self.compute_td_error = as_tf_callable(compute_td_error)
class TargetNetworkMixin(object):
    """Mixin that syncs `self.target_model` towards `self.model`.

    Each target variable is updated as
    `tau * var + (1 - tau) * var_target`, so tau=1.0 is a hard copy and
    smaller values give a soft update.
    """

    def __init__(self, config):
        """Build the update callable and do an initial hard sync.

        Arguments:
            config (dict): accepted for interface compatibility; this
                constructor does not read it (update_target() reads
                `self.config` instead).
        """

        @make_tf_callable(self.get_session())
        def update_target_fn(tau):
            tau = tf.convert_to_tensor(tau, dtype=tf.float32)
            update_target_expr = []
            model_vars = self.model.trainable_variables()
            target_model_vars = self.target_model.trainable_variables()
            # Variables are paired positionally, so both lists must line up.
            assert len(model_vars) == len(target_model_vars), \
                (model_vars, target_model_vars)
            for var, var_target in zip(model_vars, target_model_vars):
                update_target_expr.append(
                    var_target.assign(tau * var + (1.0 - tau) * var_target))
                logger.debug("Update target op {}".format(var_target))
            return tf.group(*update_target_expr)

        # Hard initial update
        self._do_update = update_target_fn
        self.update_target(tau=1.0)

    # support both hard and soft sync
    def update_target(self, tau=None):
        """Run the target update with the given mixing factor.

        Arguments:
            tau (float): mixing factor. If None, `self.config["tau"]` is
                used. An explicit 0.0 is honored rather than being treated
                as "not given".
        """
        if tau is None:
            tau = self.config.get("tau")
        self._do_update(np.float32(tau))
def setup_early_mixins(policy, obs_space, action_space, config):
ExplorationStateMixin.__init__(policy, obs_space, action_space, config)
ActorCriticOptimizerMixin.__init__(policy, config)
-24
View File
@@ -56,30 +56,6 @@ class TestEagerSupport(unittest.TestCase):
"timesteps_per_iteration": 100
})
def testDDPG(self):
    # Minimal DDPG config handed to the shared check_support helper.
    ddpg_config = {
        "num_workers": 0,
        "learning_starts": 0,
        "timesteps_per_iteration": 10,
    }
    check_support("DDPG", ddpg_config)
def testTD3(self):
    # Minimal TD3 config handed to the shared check_support helper.
    td3_config = {
        "num_workers": 0,
        "learning_starts": 0,
        "timesteps_per_iteration": 10,
    }
    check_support("TD3", td3_config)
def testAPEX_DDPG(self):
    # APEX_DDPG config handed to the shared check_support helper.
    apex_ddpg_config = {
        "num_workers": 2,
        "learning_starts": 0,
        "num_gpus": 0,
        "min_iter_time_s": 1,
        "timesteps_per_iteration": 100,
    }
    check_support("APEX_DDPG", apex_ddpg_config)
def testSAC(self):
check_support("SAC", {
"num_workers": 0,