[rllib] ModelV2 API (#4926)

This commit is contained in:
Eric Liang
2019-07-03 15:59:47 -07:00
committed by GitHub
parent 9e0192bc0b
commit 34d054ff19
42 changed files with 1641 additions and 518 deletions
+8 -7
View File
@@ -56,7 +56,7 @@ def postprocess_advantages(policy,
last_r = 0.0
else:
next_state = []
for i in range(len(policy.model.state_in)):
for i in range(len(policy.state_in)):
next_state.append([sample_batch["state_out_{}".format(i)][-1]])
last_r = policy._value(sample_batch[SampleBatch.NEXT_OBS][-1],
sample_batch[SampleBatch.ACTIONS][-1],
@@ -79,11 +79,11 @@ class ValueNetworkMixin(object):
self.get_placeholder(SampleBatch.CUR_OBS): [ob],
self.get_placeholder(SampleBatch.PREV_ACTIONS): [prev_action],
self.get_placeholder(SampleBatch.PREV_REWARDS): [prev_reward],
self.model.seq_lens: [1]
self.seq_lens: [1]
}
assert len(args) == len(self.model.state_in), \
(args, self.model.state_in)
for k, v in zip(self.model.state_in, args):
assert len(args) == len(self.state_in), \
(args, self.state_in)
for k, v in zip(self.state_in, args):
feed_dict[k] = v
vf = self.get_session().run(self.vf, feed_dict)
return vf[0]
@@ -91,10 +91,11 @@ class ValueNetworkMixin(object):
def stats(policy, batch_tensors):
return {
"cur_lr": tf.cast(policy.cur_lr, tf.float64),
"cur_lr": tf.cast(policy.convert_to_eager(policy.cur_lr), tf.float64),
"policy_loss": policy.loss.pi_loss,
"policy_entropy": policy.loss.entropy,
"var_gnorm": tf.global_norm(policy.var_list),
"var_gnorm": tf.global_norm(
[policy.convert_to_eager(x) for x in policy.var_list]),
"vf_loss": policy.loss.vf_loss,
}
+13 -13
View File
@@ -7,8 +7,7 @@ import numpy as np
import ray
import ray.experimental.tf_utils
from ray.rllib.agents.dqn.dqn_policy import (_huber_loss, _minimize_and_clip,
_scope_vars, _postprocess_dqn)
from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY
from ray.rllib.models import ModelCatalog
@@ -17,6 +16,7 @@ from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.tf_policy import TFPolicy
from ray.rllib.utils import try_import_tf
from ray.rllib.utils.tf_ops import huber_loss, minimize_and_clip, scope_vars
tf = try_import_tf()
@@ -105,7 +105,7 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy):
with tf.variable_scope(POLICY_SCOPE) as scope:
policy_out, self.policy_model = self._build_policy_network(
self.cur_observations, observation_space, action_space)
self.policy_vars = _scope_vars(scope.name)
self.policy_vars = scope_vars(scope.name)
# Noise vars for P network except for layer normalization vars
if self.config["parameter_noise"]:
@@ -154,7 +154,7 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy):
with tf.variable_scope(POLICY_TARGET_SCOPE) as scope:
policy_tp1, _ = self._build_policy_network(
self.obs_tp1, observation_space, action_space)
target_policy_vars = _scope_vars(scope.name)
target_policy_vars = scope_vars(scope.name)
# Action outputs
with tf.variable_scope(ACTION_SCOPE, reuse=True):
@@ -179,7 +179,7 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy):
# Q-values for given actions & observations in given current
q_t, self.q_model = self._build_q_network(
self.obs_t, observation_space, action_space, self.act_t)
self.q_func_vars = _scope_vars(scope.name)
self.q_func_vars = scope_vars(scope.name)
self.stats = {
"mean_q": tf.reduce_mean(q_t),
"max_q": tf.reduce_max(q_t),
@@ -193,7 +193,7 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy):
with tf.variable_scope(TWIN_Q_SCOPE) as scope:
twin_q_t, self.twin_q_model = self._build_q_network(
self.obs_t, observation_space, action_space, self.act_t)
self.twin_q_func_vars = _scope_vars(scope.name)
self.twin_q_func_vars = scope_vars(scope.name)
q_batchnorm_update_ops = list(
set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops)
@@ -201,13 +201,13 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy):
with tf.variable_scope(Q_TARGET_SCOPE) as scope:
q_tp1, _ = self._build_q_network(self.obs_tp1, observation_space,
action_space, policy_tp1_smoothed)
target_q_func_vars = _scope_vars(scope.name)
target_q_func_vars = scope_vars(scope.name)
if self.config["twin_q"]:
with tf.variable_scope(TWIN_Q_TARGET_SCOPE) as scope:
twin_q_tp1, _ = self._build_q_network(
self.obs_tp1, observation_space, action_space,
policy_tp1_smoothed)
twin_target_q_func_vars = _scope_vars(scope.name)
twin_target_q_func_vars = scope_vars(scope.name)
if self.config["twin_q"]:
self.critic_loss, self.actor_loss, self.td_error \
@@ -330,12 +330,12 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy):
@override(TFPolicy)
def gradients(self, optimizer, loss):
if self.config["grad_norm_clipping"] is not None:
actor_grads_and_vars = _minimize_and_clip(
actor_grads_and_vars = minimize_and_clip(
self._actor_optimizer,
self.actor_loss,
var_list=self.policy_vars,
clip_val=self.config["grad_norm_clipping"])
critic_grads_and_vars = _minimize_and_clip(
critic_grads_and_vars = minimize_and_clip(
self._critic_optimizer,
self.critic_loss,
var_list=self.q_func_vars + self.twin_q_func_vars
@@ -559,15 +559,15 @@ class DDPGTFPolicy(DDPGPostprocessing, TFPolicy):
twin_td_error = twin_q_t_selected - q_t_selected_target
td_error = td_error + twin_td_error
if use_huber:
errors = _huber_loss(td_error, huber_threshold) \
+ _huber_loss(twin_td_error, huber_threshold)
errors = huber_loss(td_error, huber_threshold) \
+ huber_loss(twin_td_error, huber_threshold)
else:
errors = 0.5 * tf.square(td_error) + 0.5 * tf.square(
twin_td_error)
else:
td_error = q_t_selected - q_t_selected_target
if use_huber:
errors = _huber_loss(td_error, huber_threshold)
errors = huber_loss(td_error, huber_threshold)
else:
errors = 0.5 * tf.square(td_error)
+3 -2
View File
@@ -3,12 +3,13 @@ from __future__ import division
from __future__ import print_function
from ray.rllib.agents.dqn.apex import ApexTrainer
from ray.rllib.agents.dqn.dqn import DQNTrainer, DEFAULT_CONFIG
from ray.rllib.agents.dqn.dqn import DQNTrainer, SimpleQTrainer, DEFAULT_CONFIG
from ray.rllib.utils import renamed_agent
DQNAgent = renamed_agent(DQNTrainer)
ApexAgent = renamed_agent(ApexTrainer)
__all__ = [
"DQNAgent", "ApexAgent", "ApexTrainer", "DQNTrainer", "DEFAULT_CONFIG"
"DQNAgent", "ApexAgent", "ApexTrainer", "DQNTrainer", "DEFAULT_CONFIG",
"SimpleQTrainer"
]
@@ -0,0 +1,250 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.utils import try_import_tf
tf = try_import_tf()
class DistributionalQModel(TFModelV2):
"""Extension of standard TFModel to provide distributional Q values.
It also supports options for noisy nets and parameter space noise.
Note that this class by itself is not a valid model unless you
implement forward() in a subclass."""
def __init__(self,
obs_space,
action_space,
num_outputs,
model_config,
name,
q_hiddens=(256, ),
dueling=False,
num_atoms=1,
use_noisy=False,
v_min=-10.0,
v_max=10.0,
sigma0=0.5,
parameter_noise=False):
"""Initialize variables of this model.
Extra model kwargs:
q_hiddens (list): defines size of hidden layers for the q head.
These will be used to postprocess the model output for the
purposes of computing Q values.
dueling (bool): whether to build the state value head for DDQN
num_atoms (int): if >1, enables distributional DQN
use_noisy (bool): use noisy nets
v_min (float): min value support for distributional DQN
v_max (float): max value support for distributional DQN
sigma0 (float): initial value of noisy nets
parameter_noise (bool): enable layer norm for param noise
Note that the core layers for forward() are not defined here, this
only defines the layers for the Q head. Those layers for forward()
should be defined in subclasses of DistributionalQModel.
"""
super(DistributionalQModel, self).__init__(
obs_space, action_space, num_outputs, model_config, name)
# setup the Q head output (i.e., model for get_q_values)
self.model_out = tf.keras.layers.Input(
shape=(num_outputs, ), name="model_out")
def build_action_value(model_out):
if q_hiddens:
action_out = model_out
for i in range(len(q_hiddens)):
if use_noisy:
action_out = self._noisy_layer(
"hidden_%d" % i, action_out, q_hiddens[i], sigma0)
elif parameter_noise:
import tensorflow.contrib.layers as layers
action_out = layers.fully_connected(
action_out,
num_outputs=q_hiddens[i],
activation_fn=tf.nn.relu,
normalizer_fn=layers.layer_norm)
else:
action_out = tf.layers.dense(
action_out,
units=q_hiddens[i],
activation=tf.nn.relu,
name="hidden_%d" % i)
else:
# Avoid postprocessing the outputs. This enables custom models
# to be used for parametric action DQN.
action_out = model_out
if use_noisy:
action_scores = self._noisy_layer(
"output",
action_out,
self.action_space.n * num_atoms,
sigma0,
non_linear=False)
elif q_hiddens:
action_scores = tf.layers.dense(
action_out,
units=self.action_space.n * num_atoms,
activation=None)
else:
action_scores = model_out
if num_atoms > 1:
# Distributional Q-learning uses a discrete support z
# to represent the action value distribution
z = tf.range(num_atoms, dtype=tf.float32)
z = v_min + z * (v_max - v_min) / float(num_atoms - 1)
support_logits_per_action = tf.reshape(
tensor=action_scores,
shape=(-1, self.action_space.n, num_atoms))
support_prob_per_action = tf.nn.softmax(
logits=support_logits_per_action)
action_scores = tf.reduce_sum(
input_tensor=z * support_prob_per_action, axis=-1)
logits = support_logits_per_action
dist = support_prob_per_action
return [
action_scores, z, support_logits_per_action, logits, dist
]
else:
logits = tf.expand_dims(tf.ones_like(action_scores), -1)
dist = tf.expand_dims(tf.ones_like(action_scores), -1)
return [action_scores, logits, dist]
def build_state_score(model_out):
state_out = model_out
for i in range(len(q_hiddens)):
if use_noisy:
state_out = self._noisy_layer("dueling_hidden_%d" % i,
state_out, q_hiddens[i],
sigma0)
elif parameter_noise:
state_out = tf.contrib.layers.fully_connected(
state_out,
num_outputs=q_hiddens[i],
activation_fn=tf.nn.relu,
normalizer_fn=tf.contrib.layers.layer_norm)
else:
state_out = tf.layers.dense(
state_out, units=q_hiddens[i], activation=tf.nn.relu)
if use_noisy:
state_score = self._noisy_layer(
"dueling_output",
state_out,
num_atoms,
sigma0,
non_linear=False)
else:
state_score = tf.layers.dense(
state_out, units=num_atoms, activation=None)
return state_score
def build_action_value_in_scope(model_out):
with tf.variable_scope(
name + "/action_value", reuse=tf.AUTO_REUSE):
return build_action_value(model_out)
def build_state_score_in_scope(model_out):
with tf.variable_scope(name + "/state_value", reuse=tf.AUTO_REUSE):
return build_state_score(model_out)
q_out = tf.keras.layers.Lambda(build_action_value_in_scope)(
self.model_out)
self.q_value_head = tf.keras.Model(self.model_out, q_out)
self.register_variables(self.q_value_head.variables)
if dueling:
state_out = tf.keras.layers.Lambda(build_state_score_in_scope)(
self.model_out)
self.state_value_head = tf.keras.Model(self.model_out, state_out)
self.register_variables(self.state_value_head.variables)
def get_q_value_distributions(self, model_out):
"""Returns distributional values for Q(s, a) given a state embedding.
Override this in your custom model to customize the Q output head.
Arguments:
model_out (Tensor): embedding from the model layers
Returns:
(action_scores, logits, dist) if num_atoms == 1, otherwise
(action_scores, z, support_logits_per_action, logits, dist)
"""
return self.q_value_head(model_out)
def get_state_value(self, model_out):
"""Returns the state value prediction for the given state embedding."""
return self.state_value_head(model_out)
def _noisy_layer(self,
prefix,
action_in,
out_size,
sigma0,
non_linear=True):
"""
a common dense layer: y = w^{T}x + b
a noisy layer: y = (w + \epsilon_w*\sigma_w)^{T}x +
(b+\epsilon_b*\sigma_b)
where \epsilon are random variables sampled from factorized normal
distributions and \sigma are trainable variables which are expected to
vanish along the training procedure
"""
import tensorflow.contrib.layers as layers
in_size = int(action_in.shape[1])
epsilon_in = tf.random_normal(shape=[in_size])
epsilon_out = tf.random_normal(shape=[out_size])
epsilon_in = self._f_epsilon(epsilon_in)
epsilon_out = self._f_epsilon(epsilon_out)
epsilon_w = tf.matmul(
a=tf.expand_dims(epsilon_in, -1), b=tf.expand_dims(epsilon_out, 0))
epsilon_b = epsilon_out
sigma_w = tf.get_variable(
name=prefix + "_sigma_w",
shape=[in_size, out_size],
dtype=tf.float32,
initializer=tf.random_uniform_initializer(
minval=-1.0 / np.sqrt(float(in_size)),
maxval=1.0 / np.sqrt(float(in_size))))
# TF noise generation can be unreliable on GPU
# If generating the noise on the CPU,
# lowering sigma0 to 0.1 may be helpful
sigma_b = tf.get_variable(
name=prefix + "_sigma_b",
shape=[out_size],
dtype=tf.float32, # 0.5~GPU, 0.1~CPU
initializer=tf.constant_initializer(
sigma0 / np.sqrt(float(in_size))))
w = tf.get_variable(
name=prefix + "_fc_w",
shape=[in_size, out_size],
dtype=tf.float32,
initializer=layers.xavier_initializer())
b = tf.get_variable(
name=prefix + "_fc_b",
shape=[out_size],
dtype=tf.float32,
initializer=tf.zeros_initializer())
action_activation = tf.nn.xw_plus_b(action_in, w + sigma_w * epsilon_w,
b + sigma_b * epsilon_b)
if not non_linear:
return action_activation
return tf.nn.relu(action_activation)
def _f_epsilon(self, x):
return tf.sign(x) * tf.sqrt(tf.abs(x))
+3
View File
@@ -8,6 +8,7 @@ from ray import tune
from ray.rllib.agents.trainer import with_common_config
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy
from ray.rllib.agents.dqn.simple_q_policy import SimpleQPolicy
from ray.rllib.optimizers import SyncReplayOptimizer
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule
@@ -294,3 +295,5 @@ GenericOffPolicyTrainer = build_trainer(
DQNTrainer = GenericOffPolicyTrainer.with_updates(
name="DQN", default_policy=DQNTFPolicy, default_config=DEFAULT_CONFIG)
SimpleQTrainer = DQNTrainer.with_updates(default_policy=SimpleQPolicy)
+127 -330
View File
@@ -7,14 +7,16 @@ import numpy as np
from scipy.stats import entropy
import ray
from ray.rllib.policy.policy import Policy
from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel
from ray.rllib.agents.dqn.simple_q_policy import ExplorationStateMixin, \
TargetNetworkMixin
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models import ModelCatalog, Categorical
from ray.rllib.utils.annotations import override
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.policy.tf_policy import TFPolicy, \
LearningRateSchedule
from ray.rllib.policy.tf_policy import LearningRateSchedule
from ray.rllib.policy.tf_policy_template import build_tf_policy
from ray.rllib.utils.tf_ops import huber_loss, reduce_mean_ignore_inf, \
minimize_and_clip
from ray.rllib.utils import try_import_tf
tf = try_import_tf()
@@ -93,7 +95,7 @@ class QLoss(object):
self.td_error = (
q_t_selected - tf.stop_gradient(q_t_selected_target))
self.loss = tf.reduce_mean(
importance_weights * _huber_loss(self.td_error))
importance_weights * huber_loss(self.td_error))
self.stats = {
"mean_q": tf.reduce_mean(q_t_selected),
"min_q": tf.reduce_min(q_t_selected),
@@ -102,180 +104,6 @@ class QLoss(object):
}
class QNetwork(object):
def __init__(self,
model,
num_actions,
dueling=False,
hiddens=[256],
use_noisy=False,
num_atoms=1,
v_min=-10.0,
v_max=10.0,
sigma0=0.5,
parameter_noise=False):
self.model = model
with tf.variable_scope("action_value"):
if hiddens:
action_out = model.last_layer
for i in range(len(hiddens)):
if use_noisy:
action_out = self.noisy_layer(
"hidden_%d" % i, action_out, hiddens[i], sigma0)
elif parameter_noise:
import tensorflow.contrib.layers as layers
action_out = layers.fully_connected(
action_out,
num_outputs=hiddens[i],
activation_fn=tf.nn.relu,
normalizer_fn=layers.layer_norm)
else:
action_out = tf.layers.dense(
action_out,
units=hiddens[i],
activation=tf.nn.relu)
else:
# Avoid postprocessing the outputs. This enables custom models
# to be used for parametric action DQN.
action_out = model.outputs
if use_noisy:
action_scores = self.noisy_layer(
"output",
action_out,
num_actions * num_atoms,
sigma0,
non_linear=False)
elif hiddens:
action_scores = tf.layers.dense(
action_out, units=num_actions * num_atoms, activation=None)
else:
action_scores = model.outputs
if num_atoms > 1:
# Distributional Q-learning uses a discrete support z
# to represent the action value distribution
z = tf.range(num_atoms, dtype=tf.float32)
z = v_min + z * (v_max - v_min) / float(num_atoms - 1)
support_logits_per_action = tf.reshape(
tensor=action_scores, shape=(-1, num_actions, num_atoms))
support_prob_per_action = tf.nn.softmax(
logits=support_logits_per_action)
action_scores = tf.reduce_sum(
input_tensor=z * support_prob_per_action, axis=-1)
self.logits = support_logits_per_action
self.dist = support_prob_per_action
else:
self.logits = tf.expand_dims(tf.ones_like(action_scores), -1)
self.dist = tf.expand_dims(tf.ones_like(action_scores), -1)
if dueling:
with tf.variable_scope("state_value"):
state_out = model.last_layer
for i in range(len(hiddens)):
if use_noisy:
state_out = self.noisy_layer("dueling_hidden_%d" % i,
state_out, hiddens[i],
sigma0)
elif parameter_noise:
state_out = tf.contrib.layers.fully_connected(
state_out,
num_outputs=hiddens[i],
activation_fn=tf.nn.relu,
normalizer_fn=tf.contrib.layers.layer_norm)
else:
state_out = tf.layers.dense(
state_out, units=hiddens[i], activation=tf.nn.relu)
if use_noisy:
state_score = self.noisy_layer(
"dueling_output",
state_out,
num_atoms,
sigma0,
non_linear=False)
else:
state_score = tf.layers.dense(
state_out, units=num_atoms, activation=None)
if num_atoms > 1:
support_logits_per_action_mean = tf.reduce_mean(
support_logits_per_action, 1)
support_logits_per_action_centered = (
support_logits_per_action - tf.expand_dims(
support_logits_per_action_mean, 1))
support_logits_per_action = tf.expand_dims(
state_score, 1) + support_logits_per_action_centered
support_prob_per_action = tf.nn.softmax(
logits=support_logits_per_action)
self.value = tf.reduce_sum(
input_tensor=z * support_prob_per_action, axis=-1)
self.logits = support_logits_per_action
self.dist = support_prob_per_action
else:
action_scores_mean = _reduce_mean_ignore_inf(action_scores, 1)
action_scores_centered = action_scores - tf.expand_dims(
action_scores_mean, 1)
self.value = state_score + action_scores_centered
else:
self.value = action_scores
def f_epsilon(self, x):
return tf.sign(x) * tf.sqrt(tf.abs(x))
def noisy_layer(self, prefix, action_in, out_size, sigma0,
non_linear=True):
"""
a common dense layer: y = w^{T}x + b
a noisy layer: y = (w + \epsilon_w*\sigma_w)^{T}x +
(b+\epsilon_b*\sigma_b)
where \epsilon are random variables sampled from factorized normal
distributions and \sigma are trainable variables which are expected to
vanish along the training procedure
"""
import tensorflow.contrib.layers as layers
in_size = int(action_in.shape[1])
epsilon_in = tf.random_normal(shape=[in_size])
epsilon_out = tf.random_normal(shape=[out_size])
epsilon_in = self.f_epsilon(epsilon_in)
epsilon_out = self.f_epsilon(epsilon_out)
epsilon_w = tf.matmul(
a=tf.expand_dims(epsilon_in, -1), b=tf.expand_dims(epsilon_out, 0))
epsilon_b = epsilon_out
sigma_w = tf.get_variable(
name=prefix + "_sigma_w",
shape=[in_size, out_size],
dtype=tf.float32,
initializer=tf.random_uniform_initializer(
minval=-1.0 / np.sqrt(float(in_size)),
maxval=1.0 / np.sqrt(float(in_size))))
# TF noise generation can be unreliable on GPU
# If generating the noise on the CPU,
# lowering sigma0 to 0.1 may be helpful
sigma_b = tf.get_variable(
name=prefix + "_sigma_b",
shape=[out_size],
dtype=tf.float32, # 0.5~GPU, 0.1~CPU
initializer=tf.constant_initializer(
sigma0 / np.sqrt(float(in_size))))
w = tf.get_variable(
name=prefix + "_fc_w",
shape=[in_size, out_size],
dtype=tf.float32,
initializer=layers.xavier_initializer())
b = tf.get_variable(
name=prefix + "_fc_b",
shape=[out_size],
dtype=tf.float32,
initializer=tf.zeros_initializer())
action_activation = tf.nn.xw_plus_b(action_in, w + sigma_w * epsilon_w,
b + sigma_b * epsilon_b)
if not non_linear:
return action_activation
return tf.nn.relu(action_activation)
class QValuePolicy(object):
def __init__(self, q_values, observations, num_actions, stochastic, eps,
softmax, softmax_temp):
@@ -305,44 +133,6 @@ class QValuePolicy(object):
self.action_prob = None
class ExplorationStateMixin(object):
def __init__(self, obs_space, action_space, config):
self.cur_epsilon = 1.0
self.stochastic = tf.placeholder(tf.bool, (), name="stochastic")
self.eps = tf.placeholder(tf.float32, (), name="eps")
def add_parameter_noise(self):
if self.config["parameter_noise"]:
self.sess.run(self.add_noise_op)
def set_epsilon(self, epsilon):
self.cur_epsilon = epsilon
@override(Policy)
def get_state(self):
return [TFPolicy.get_state(self), self.cur_epsilon]
@override(Policy)
def set_state(self, state):
TFPolicy.set_state(self, state[0])
self.set_epsilon(state[1])
class TargetNetworkMixin(object):
def __init__(self, obs_space, action_space, config):
# update_target_fn will be called periodically to copy Q network to
# target Q network
update_target_expr = []
assert len(self.q_func_vars) == len(self.target_q_func_vars), \
(self.q_func_vars, self.target_q_func_vars)
for var, var_target in zip(self.q_func_vars, self.target_q_func_vars):
update_target_expr.append(var_target.assign(var))
self.update_target_expr = tf.group(*update_target_expr)
def update_target(self):
return self.get_session().run(self.update_target_expr)
class ComputeTDErrorMixin(object):
def compute_td_error(self, obs_t, act_t, rew_t, obs_tp1, done_mask,
importance_weights):
@@ -350,7 +140,7 @@ class ComputeTDErrorMixin(object):
return np.zeros_like(rew_t)
td_err = self.get_session().run(
self.loss.td_error,
self.q_loss.td_error,
feed_dict={
self.get_placeholder(SampleBatch.CUR_OBS): [
np.array(ob) for ob in obs_t
@@ -394,20 +184,64 @@ def postprocess_trajectory(policy,
return _postprocess_dqn(policy, sample_batch)
def build_q_networks(policy, input_dict, observation_space, action_space,
config):
def build_q_model(policy, obs_space, action_space, config):
if not isinstance(action_space, Discrete):
raise UnsupportedSpaceException(
"Action space {} is not supported for DQN.".format(action_space))
if config["hiddens"]:
num_outputs = 256
config["model"]["no_final_linear"] = True
else:
num_outputs = action_space.n
policy.q_model = ModelCatalog.get_model_v2(
obs_space,
action_space,
num_outputs,
config["model"],
framework="tf",
model_interface=DistributionalQModel,
name=Q_SCOPE,
num_atoms=config["num_atoms"],
q_hiddens=config["hiddens"],
dueling=config["dueling"],
use_noisy=config["noisy"],
v_min=config["v_min"],
v_max=config["v_max"],
sigma0=config["sigma0"],
parameter_noise=config["parameter_noise"])
policy.target_q_model = ModelCatalog.get_model_v2(
obs_space,
action_space,
num_outputs,
config["model"],
framework="tf",
model_interface=DistributionalQModel,
name=Q_TARGET_SCOPE,
num_atoms=config["num_atoms"],
q_hiddens=config["hiddens"],
dueling=config["dueling"],
use_noisy=config["noisy"],
v_min=config["v_min"],
v_max=config["v_max"],
sigma0=config["sigma0"],
parameter_noise=config["parameter_noise"])
return policy.q_model
def build_q_networks(policy, q_model, input_dict, obs_space, action_space,
config):
# Action Q network
with tf.variable_scope(Q_SCOPE) as scope:
q_values, q_logits, q_dist, _ = _build_q_network(
policy, input_dict[SampleBatch.CUR_OBS], observation_space,
action_space)
policy.q_values = q_values
policy.q_func_vars = _scope_vars(scope.name)
q_values, q_logits, q_dist = _compute_q_values(
policy, q_model, input_dict[SampleBatch.CUR_OBS], obs_space,
action_space)
policy.q_values = q_values
policy.q_func_vars = q_model.variables()
# Noise vars for Q network except for layer normalization vars
if config["parameter_noise"]:
@@ -419,7 +253,7 @@ def build_q_networks(policy, input_dict, observation_space, action_space,
# Action outputs
qvp = QValuePolicy(q_values, input_dict[SampleBatch.CUR_OBS],
action_space.n, policy.stochastic, policy.eps,
policy.config["soft_q"], policy.config["softmax_temp"])
config["soft_q"], config["softmax_temp"])
policy.output_actions, policy.action_prob = qvp.action, qvp.action_prob
return policy.output_actions, policy.action_prob
@@ -463,37 +297,36 @@ def _build_parameter_noise(policy, pnet_params):
def build_q_losses(policy, batch_tensors):
config = policy.config
# q network evaluation
with tf.variable_scope(Q_SCOPE, reuse=True):
prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS))
q_t, q_logits_t, q_dist_t, model = _build_q_network(
policy, batch_tensors[SampleBatch.CUR_OBS],
policy.observation_space, policy.action_space)
policy.q_batchnorm_update_ops = list(
set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops)
prev_update_ops = set(tf.get_collection(tf.GraphKeys.UPDATE_OPS))
q_t, q_logits_t, q_dist_t = _compute_q_values(
policy, policy.q_model, batch_tensors[SampleBatch.CUR_OBS],
policy.observation_space, policy.action_space)
policy.q_batchnorm_update_ops = list(
set(tf.get_collection(tf.GraphKeys.UPDATE_OPS)) - prev_update_ops)
# target q network evalution
with tf.variable_scope(Q_TARGET_SCOPE) as scope:
q_tp1, q_logits_tp1, q_dist_tp1, _ = _build_q_network(
policy, batch_tensors[SampleBatch.NEXT_OBS],
policy.observation_space, policy.action_space)
policy.target_q_func_vars = _scope_vars(scope.name)
q_tp1, q_logits_tp1, q_dist_tp1 = _compute_q_values(
policy, policy.target_q_model, batch_tensors[SampleBatch.NEXT_OBS],
policy.observation_space, policy.action_space)
policy.target_q_func_vars = policy.target_q_model.variables()
# q scores for actions which we know were selected in the given state.
one_hot_selection = tf.one_hot(batch_tensors[SampleBatch.ACTIONS],
policy.action_space.n)
one_hot_selection = tf.one_hot(
tf.cast(batch_tensors[SampleBatch.ACTIONS], tf.int32),
policy.action_space.n)
q_t_selected = tf.reduce_sum(q_t * one_hot_selection, 1)
q_logits_t_selected = tf.reduce_sum(
q_logits_t * tf.expand_dims(one_hot_selection, -1), 1)
# compute estimate of best possible value starting from state at t + 1
if policy.config["double_q"]:
with tf.variable_scope(Q_SCOPE, reuse=True):
q_tp1_using_online_net, q_logits_tp1_using_online_net, \
q_dist_tp1_using_online_net, _ = _build_q_network(
policy,
batch_tensors[SampleBatch.NEXT_OBS],
policy.observation_space, policy.action_space)
if config["double_q"]:
q_tp1_using_online_net, q_logits_tp1_using_online_net, \
q_dist_tp1_using_online_net = _compute_q_values(
policy, policy.q_model,
batch_tensors[SampleBatch.NEXT_OBS],
policy.observation_space, policy.action_space)
q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1)
q_tp1_best_one_hot_selection = tf.one_hot(q_tp1_best_using_online_net,
policy.action_space.n)
@@ -507,12 +340,14 @@ def build_q_losses(policy, batch_tensors):
q_dist_tp1_best = tf.reduce_sum(
q_dist_tp1 * tf.expand_dims(q_tp1_best_one_hot_selection, -1), 1)
policy.loss = _build_q_loss(
policy.q_loss = QLoss(
q_t_selected, q_logits_t_selected, q_tp1_best, q_dist_tp1_best,
batch_tensors[SampleBatch.REWARDS], batch_tensors[SampleBatch.DONES],
batch_tensors[PRIO_WEIGHTS], policy.config)
batch_tensors[PRIO_WEIGHTS], batch_tensors[SampleBatch.REWARDS],
tf.cast(batch_tensors[SampleBatch.DONES],
tf.float32), config["gamma"], config["n_step"],
config["num_atoms"], config["v_min"], config["v_max"])
return policy.loss.loss
return policy.q_loss.loss
def adam_optimizer(policy, config):
@@ -522,7 +357,7 @@ def adam_optimizer(policy, config):
def clip_gradients(policy, optimizer, loss):
if policy.config["grad_norm_clipping"] is not None:
grads_and_vars = _minimize_and_clip(
grads_and_vars = minimize_and_clip(
optimizer,
loss,
var_list=policy.q_func_vars,
@@ -544,7 +379,7 @@ def exploration_setting_inputs(policy):
def build_q_stats(policy, batch_tensors):
return dict({
"cur_lr": tf.cast(policy.cur_lr, tf.float64),
}, **policy.loss.stats)
}, **policy.q_loss.stats)
def setup_early_mixins(policy, obs_space, action_space, config):
@@ -556,33 +391,45 @@ def setup_late_mixins(policy, obs_space, action_space, config):
TargetNetworkMixin.__init__(policy, obs_space, action_space, config)
def _build_q_network(policy, obs, obs_space, action_space):
def _compute_q_values(policy, model, obs, obs_space, action_space):
config = policy.config
qnet = QNetwork(
ModelCatalog.get_model({
"obs": obs,
"is_training": policy._get_is_training_placeholder(),
}, obs_space, action_space, action_space.n, config["model"]),
action_space.n, config["dueling"], config["hiddens"], config["noisy"],
config["num_atoms"], config["v_min"], config["v_max"],
config["sigma0"], config["parameter_noise"])
return qnet.value, qnet.logits, qnet.dist, qnet.model
model_out, state = model({
"obs": obs,
"is_training": policy._get_is_training_placeholder(),
}, [], None)
if config["num_atoms"] > 1:
(action_scores, z, support_logits_per_action, logits,
dist) = model.get_q_value_distributions(model_out)
else:
(action_scores, logits,
dist) = model.get_q_value_distributions(model_out)
def _build_q_value_policy(policy, q_values):
policy = QValuePolicy(q_values, policy.cur_observations,
policy.num_actions, policy.stochastic, policy.eps,
policy.config["soft_q"],
policy.config["softmax_temp"])
return policy.action, policy.action_prob
if config["dueling"]:
state_score = model.get_state_value(model_out)
if config["num_atoms"] > 1:
support_logits_per_action_mean = tf.reduce_mean(
support_logits_per_action, 1)
support_logits_per_action_centered = (
support_logits_per_action - tf.expand_dims(
support_logits_per_action_mean, 1))
support_logits_per_action = tf.expand_dims(
state_score, 1) + support_logits_per_action_centered
support_prob_per_action = tf.nn.softmax(
logits=support_logits_per_action)
value = tf.reduce_sum(
input_tensor=z * support_prob_per_action, axis=-1)
logits = support_logits_per_action
dist = support_prob_per_action
else:
action_scores_mean = reduce_mean_ignore_inf(action_scores, 1)
action_scores_centered = action_scores - tf.expand_dims(
action_scores_mean, 1)
value = state_score + action_scores_centered
else:
value = action_scores
def _build_q_loss(q_t_selected, q_logits_t_selected, q_tp1_best,
q_dist_tp1_best, rewards, dones, importance_weights, config):
return QLoss(q_t_selected, q_logits_t_selected, q_tp1_best,
q_dist_tp1_best, importance_weights, rewards,
tf.cast(dones, tf.float32), config["gamma"], config["n_step"],
config["num_atoms"], config["v_min"], config["v_max"])
return value, logits, dist
def _adjust_nstep(n_step, gamma, obs, actions, rewards, new_obs, dones):
@@ -634,61 +481,11 @@ def _postprocess_dqn(policy, batch):
return batch
def _reduce_mean_ignore_inf(x, axis):
"""Same as tf.reduce_mean() but ignores -inf values."""
mask = tf.not_equal(x, tf.float32.min)
x_zeroed = tf.where(mask, x, tf.zeros_like(x))
return (tf.reduce_sum(x_zeroed, axis) / tf.reduce_sum(
tf.cast(mask, tf.float32), axis))
def _huber_loss(x, delta=1.0):
"""Reference: https://en.wikipedia.org/wiki/Huber_loss"""
return tf.where(
tf.abs(x) < delta,
tf.square(x) * 0.5, delta * (tf.abs(x) - 0.5 * delta))
def _minimize_and_clip(optimizer, objective, var_list, clip_val=10):
"""Minimized `objective` using `optimizer` w.r.t. variables in
`var_list` while ensure the norm of the gradients for each
variable is clipped to `clip_val`
"""
gradients = optimizer.compute_gradients(objective, var_list=var_list)
for i, (grad, var) in enumerate(gradients):
if grad is not None:
gradients[i] = (tf.clip_by_norm(grad, clip_val), var)
return gradients
def _scope_vars(scope, trainable_only=False):
"""
Get variables inside a scope
The scope can be specified as a string
Parameters
----------
scope: str or VariableScope
scope in which the variables reside.
trainable_only: bool
whether or not to return only the variables that were marked as
trainable.
Returns
-------
vars: [tf.Variable]
list of variables in `scope`.
"""
return tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES
if trainable_only else tf.GraphKeys.VARIABLES,
scope=scope if isinstance(scope, str) else scope.name)
DQNTFPolicy = build_tf_policy(
name="DQNTFPolicy",
get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG,
make_action_sampler=build_q_networks,
make_model=build_q_model,
action_sampler_fn=build_q_networks,
loss_fn=build_q_losses,
stats_fn=build_q_stats,
postprocess_fn=postprocess_trajectory,
@@ -696,7 +493,7 @@ DQNTFPolicy = build_tf_policy(
gradients_fn=clip_gradients,
extra_action_feed_fn=exploration_setting_inputs,
extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values},
extra_learn_fetches_fn=lambda policy: {"td_error": policy.loss.td_error},
extra_learn_fetches_fn=lambda policy: {"td_error": policy.q_loss.td_error},
update_ops_fn=lambda policy: policy.q_batchnorm_update_ops,
before_init=setup_early_mixins,
after_init=setup_late_mixins,
@@ -0,0 +1,69 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.utils import try_import_tf
tf = try_import_tf()
class SimpleQModel(TFModelV2):
"""Extension of standard TFModel to provide Q values.
Note that this class by itself is not a valid model unless you
implement forward() in a subclass."""
def __init__(self,
obs_space,
action_space,
num_outputs,
model_config,
name,
q_hiddens=(256, )):
"""Initialize variables of this model.
Extra model kwargs:
q_hiddens (list): defines size of hidden layers for the q head.
These will be used to postprocess the model output for the
purposes of computing Q values.
Note that the core layers for forward() are not defined here, this
only defines the layers for the Q head. Those layers for forward()
should be defined in subclasses of SimpleQModel.
"""
super(SimpleQModel, self).__init__(obs_space, action_space,
num_outputs, model_config, name)
# setup the Q head output (i.e., model for get_q_values)
self.model_out = tf.keras.layers.Input(
shape=(num_outputs, ), name="model_out")
if q_hiddens:
last_layer = self.model_out
for i, n in enumerate(q_hiddens):
last_layer = tf.keras.layers.Dense(
n, name="q_hidden_{}".format(i),
activation=tf.nn.relu)(last_layer)
q_out = tf.keras.layers.Dense(
action_space.n, activation=None, name="q_out")(last_layer)
else:
q_out = self.model_out
self.q_value_head = tf.keras.Model(self.model_out, q_out)
self.register_variables(self.q_value_head.variables)
def get_q_values(self, model_out):
"""Returns Q(s, a) given a feature tensor for the state.
Override this in your custom model to customize the Q output head.
Arguments:
model_out (Tensor): embedding from the model layers
Returns:
action scores Q(s, a) for each action, shape [None, action_space.n]
"""
return self.q_value_head(model_out)
@@ -0,0 +1,211 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
"""Basic example of a DQN policy without any optimizations."""
from gym.spaces import Discrete
import ray
from ray.rllib.agents.dqn.simple_q_model import SimpleQModel
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models import ModelCatalog
from ray.rllib.utils.annotations import override
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.policy.tf_policy import TFPolicy
from ray.rllib.policy.tf_policy_template import build_tf_policy
from ray.rllib.utils import try_import_tf
from ray.rllib.utils.tf_ops import huber_loss
tf = try_import_tf()
Q_SCOPE = "q_func"
Q_TARGET_SCOPE = "target_q_func"
class ExplorationStateMixin(object):
def __init__(self, obs_space, action_space, config):
self.cur_epsilon = 1.0
self.stochastic = tf.placeholder(tf.bool, (), name="stochastic")
self.eps = tf.placeholder(tf.float32, (), name="eps")
def add_parameter_noise(self):
if self.config["parameter_noise"]:
self.sess.run(self.add_noise_op)
def set_epsilon(self, epsilon):
self.cur_epsilon = epsilon
@override(Policy)
def get_state(self):
return [TFPolicy.get_state(self), self.cur_epsilon]
@override(Policy)
def set_state(self, state):
TFPolicy.set_state(self, state[0])
self.set_epsilon(state[1])
class TargetNetworkMixin(object):
def __init__(self, obs_space, action_space, config):
# update_target_fn will be called periodically to copy Q network to
# target Q network
update_target_expr = []
assert len(self.q_func_vars) == len(self.target_q_func_vars), \
(self.q_func_vars, self.target_q_func_vars)
for var, var_target in zip(self.q_func_vars, self.target_q_func_vars):
update_target_expr.append(var_target.assign(var))
self.update_target_expr = tf.group(*update_target_expr)
def update_target(self):
return self.get_session().run(self.update_target_expr)
def build_q_models(policy, obs_space, action_space, config):
if not isinstance(action_space, Discrete):
raise UnsupportedSpaceException(
"Action space {} is not supported for DQN.".format(action_space))
if config["hiddens"]:
num_outputs = 256
config["model"]["no_final_linear"] = True
else:
num_outputs = action_space.n
policy.q_model = ModelCatalog.get_model_v2(
obs_space,
action_space,
num_outputs,
config["model"],
framework="tf",
name=Q_SCOPE,
model_interface=SimpleQModel,
q_hiddens=config["hiddens"])
policy.target_q_model = ModelCatalog.get_model_v2(
obs_space,
action_space,
num_outputs,
config["model"],
framework="tf",
name=Q_TARGET_SCOPE,
model_interface=SimpleQModel,
q_hiddens=config["hiddens"])
return policy.q_model
def build_action_sampler(policy, q_model, input_dict, obs_space, action_space,
config):
# Action Q network
q_values = _compute_q_values(policy, q_model,
input_dict[SampleBatch.CUR_OBS], obs_space,
action_space)
policy.q_values = q_values
policy.q_func_vars = q_model.variables()
# Action outputs
deterministic_actions = tf.argmax(q_values, axis=1)
batch_size = tf.shape(input_dict[SampleBatch.CUR_OBS])[0]
# Special case masked out actions (q_value ~= -inf) so that we don't
# even consider them for exploration.
random_valid_action_logits = tf.where(
tf.equal(q_values, tf.float32.min),
tf.ones_like(q_values) * tf.float32.min, tf.ones_like(q_values))
random_actions = tf.squeeze(
tf.multinomial(random_valid_action_logits, 1), axis=1)
chose_random = tf.random_uniform(
tf.stack([batch_size]), minval=0, maxval=1,
dtype=tf.float32) < policy.eps
stochastic_actions = tf.where(chose_random, random_actions,
deterministic_actions)
action = tf.cond(policy.stochastic, lambda: stochastic_actions,
lambda: deterministic_actions)
action_prob = None
return action, action_prob
def build_q_losses(policy, batch_tensors):
# q network evaluation
q_t = _compute_q_values(policy, policy.q_model,
batch_tensors[SampleBatch.CUR_OBS],
policy.observation_space, policy.action_space)
# target q network evalution
q_tp1 = _compute_q_values(policy, policy.target_q_model,
batch_tensors[SampleBatch.NEXT_OBS],
policy.observation_space, policy.action_space)
policy.target_q_func_vars = policy.target_q_model.variables()
# q scores for actions which we know were selected in the given state.
one_hot_selection = tf.one_hot(
tf.cast(batch_tensors[SampleBatch.ACTIONS], tf.int32),
policy.action_space.n)
q_t_selected = tf.reduce_sum(q_t * one_hot_selection, 1)
# compute estimate of best possible value starting from state at t + 1
dones = tf.cast(batch_tensors[SampleBatch.DONES], tf.float32)
q_tp1_best_one_hot_selection = tf.one_hot(
tf.argmax(q_tp1, 1), policy.action_space.n)
q_tp1_best = tf.reduce_sum(q_tp1 * q_tp1_best_one_hot_selection, 1)
q_tp1_best_masked = (1.0 - dones) * q_tp1_best
# compute RHS of bellman equation
q_t_selected_target = (batch_tensors[SampleBatch.REWARDS] +
policy.config["gamma"] * q_tp1_best_masked)
# compute the error (potentially clipped)
td_error = q_t_selected - tf.stop_gradient(q_t_selected_target)
loss = tf.reduce_mean(huber_loss(td_error))
# save TD error as an attribute for outside access
policy.td_error = td_error
return loss
def _compute_q_values(policy, model, obs, obs_space, action_space):
input_dict = {
"obs": obs,
"is_training": policy._get_is_training_placeholder(),
}
model_out, _ = model(input_dict, [], None)
return model.get_q_values(model_out)
def exploration_setting_inputs(policy):
return {
policy.stochastic: True,
policy.eps: policy.cur_epsilon,
}
def setup_early_mixins(policy, obs_space, action_space, config):
ExplorationStateMixin.__init__(policy, obs_space, action_space, config)
def setup_late_mixins(policy, obs_space, action_space, config):
TargetNetworkMixin.__init__(policy, obs_space, action_space, config)
SimpleQPolicy = build_tf_policy(
name="SimpleQPolicy",
get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG,
make_model=build_q_models,
action_sampler_fn=build_action_sampler,
loss_fn=build_q_losses,
extra_action_feed_fn=exploration_setting_inputs,
extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values},
extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error},
before_init=setup_early_mixins,
after_init=setup_late_mixins,
obs_include_prev_action_reward=False,
mixins=[
ExplorationStateMixin,
TargetNetworkMixin,
])
@@ -11,9 +11,9 @@ from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY
from ray.rllib.utils.annotations import override
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.tf_policy import TFPolicy
from ray.rllib.agents.dqn.dqn_policy import _scope_vars
from ray.rllib.utils.explained_variance import explained_variance
from ray.rllib.utils import try_import_tf
from ray.rllib.utils.tf_ops import scope_vars
tf = try_import_tf()
@@ -103,7 +103,7 @@ class MARWILPolicy(MARWILPostprocessing, TFPolicy):
}, observation_space, action_space, logit_dim,
self.config["model"])
logits = self.model.outputs
self.p_func_vars = _scope_vars(scope.name)
self.p_func_vars = scope_vars(scope.name)
# Action outputs
action_dist = dist_cls(logits)
@@ -116,7 +116,7 @@ class MARWILPolicy(MARWILPostprocessing, TFPolicy):
# v network evaluation
with tf.variable_scope(VALUE_SCOPE) as scope:
state_values = self.model.value_function()
self.v_func_vars = _scope_vars(scope.name)
self.v_func_vars = scope_vars(scope.name)
self.v_loss = self._build_value_loss(state_values, self.cum_rew_t)
self.p_loss = self._build_policy_loss(state_values, self.cum_rew_t,
logits, self.act_t, action_space)
+12 -13
View File
@@ -179,8 +179,8 @@ def _make_time_major(policy, tensor, drop_last=False):
if isinstance(tensor, list):
return [_make_time_major(policy, t, drop_last) for t in tensor]
if policy.model.state_init:
B = tf.shape(policy.model.seq_lens)[0]
if policy.state_in:
B = tf.shape(policy.seq_lens)[0]
T = tf.shape(tensor)[0] // B
else:
# Important: chop the tensor into batches at known episode cut
@@ -219,15 +219,14 @@ def build_appo_surrogate_loss(policy, batch_tensors):
behaviour_logits = batch_tensors[BEHAVIOUR_LOGITS]
unpacked_behaviour_logits = tf.split(
behaviour_logits, output_hidden_shape, axis=1)
unpacked_outputs = tf.split(
policy.model.outputs, output_hidden_shape, axis=1)
unpacked_outputs = tf.split(policy.model_out, output_hidden_shape, axis=1)
action_dist = policy.action_dist
prev_action_dist = policy.dist_class(behaviour_logits)
values = policy.value_function
if policy.model.state_in:
max_seq_len = tf.reduce_max(policy.model.seq_lens) - 1
mask = tf.sequence_mask(policy.model.seq_lens, max_seq_len)
if policy.state_in:
max_seq_len = tf.reduce_max(policy.seq_lens) - 1
mask = tf.sequence_mask(policy.seq_lens, max_seq_len)
mask = tf.reshape(mask, [-1])
else:
mask = tf.ones_like(rewards)
@@ -316,7 +315,7 @@ def postprocess_trajectory(policy,
last_r = 0.0
else:
next_state = []
for i in range(len(policy.model.state_in)):
for i in range(len(policy.state_in)):
next_state.append([sample_batch["state_out_{}".format(i)][-1]])
last_r = policy.value(sample_batch["new_obs"][-1], *next_state)
batch = compute_advantages(
@@ -332,7 +331,7 @@ def postprocess_trajectory(policy,
def add_values_and_logits(policy):
out = {BEHAVIOUR_LOGITS: policy.model.outputs}
out = {BEHAVIOUR_LOGITS: policy.model_out}
if not policy.config["vtrace"]:
out[SampleBatch.VF_PREDS] = policy.value_function
return out
@@ -367,11 +366,11 @@ class ValueNetworkMixin(object):
def value(self, ob, *args):
feed_dict = {
self.get_placeholder(SampleBatch.CUR_OBS): [ob],
self.model.seq_lens: [1]
self.seq_lens: [1]
}
assert len(args) == len(self.model.state_in), \
(args, self.model.state_in)
for k, v in zip(self.model.state_in, args):
assert len(args) == len(self.state_in), \
(args, self.state_in)
for k, v in zip(self.state_in, args):
feed_dict[k] = v
vf = self.get_session().run(self.value_function, feed_dict)
return vf[0]
+3
View File
@@ -151,6 +151,9 @@ def validate_config(config):
"FYI: By default, the value function will not share layers "
"with the policy model ('vf_share_layers': False).")
# auto set the model option for layer sharing
config["model"]["vf_share_layers"] = config["vf_share_layers"]
PPOTrainer = build_trainer(
name="PPO",
+16 -40
View File
@@ -105,11 +105,10 @@ class PPOLoss(object):
def ppo_surrogate_loss(policy, batch_tensors):
if policy.model.state_in:
max_seq_len = tf.reduce_max(
policy.convert_to_eager(policy.model.seq_lens))
if policy.state_in:
max_seq_len = tf.reduce_max(policy.convert_to_eager(policy.seq_lens))
mask = tf.sequence_mask(
policy.convert_to_eager(policy.model.seq_lens), max_seq_len)
policy.convert_to_eager(policy.seq_lens), max_seq_len)
mask = tf.reshape(mask, [-1])
else:
mask = tf.ones_like(
@@ -136,28 +135,26 @@ def ppo_surrogate_loss(policy, batch_tensors):
def kl_and_loss_stats(policy, batch_tensors):
policy.explained_variance = explained_variance(
batch_tensors[Postprocessing.VALUE_TARGETS], policy.value_function)
stats_fetches = {
"cur_kl_coeff": policy.kl_coeff,
"cur_lr": tf.cast(policy.cur_lr, tf.float64),
return {
"cur_kl_coeff": tf.cast(
policy.convert_to_eager(policy.kl_coeff), tf.float64),
"cur_lr": tf.cast(policy.convert_to_eager(policy.cur_lr), tf.float64),
"total_loss": policy.loss_obj.loss,
"policy_loss": policy.loss_obj.mean_policy_loss,
"vf_loss": policy.loss_obj.mean_vf_loss,
"vf_explained_var": policy.explained_variance,
"vf_explained_var": explained_variance(
batch_tensors[Postprocessing.VALUE_TARGETS],
policy.convert_to_eager(policy.value_function)),
"kl": policy.loss_obj.mean_kl,
"entropy": policy.loss_obj.mean_entropy,
}
return stats_fetches
def vf_preds_and_logits_fetches(policy):
"""Adds value function and logits outputs to experience batches."""
return {
SampleBatch.VF_PREDS: policy.value_function,
BEHAVIOUR_LOGITS: policy.model.outputs,
BEHAVIOUR_LOGITS: policy.model_out,
}
@@ -172,7 +169,7 @@ def postprocess_ppo_gae(policy,
last_r = 0.0
else:
next_state = []
for i in range(len(policy.model.state_in)):
for i in range(len(policy.state_in)):
next_state.append([sample_batch["state_out_{}".format(i)][-1]])
last_r = policy._value(sample_batch[SampleBatch.NEXT_OBS][-1],
sample_batch[SampleBatch.ACTIONS][-1],
@@ -225,27 +222,7 @@ class KLCoeffMixin(object):
class ValueNetworkMixin(object):
def __init__(self, obs_space, action_space, config):
if config["use_gae"]:
if config["vf_share_layers"]:
self.value_function = self.model.value_function()
else:
vf_config = config["model"].copy()
# Do not split the last layer of the value function into
# mean parameters and standard deviation parameters and
# do not make the standard deviations free variables.
vf_config["free_log_std"] = False
if vf_config["use_lstm"]:
vf_config["use_lstm"] = False
logger.warning(
"It is not recommended to use a LSTM model with "
"vf_share_layers=False (consider setting it to True). "
"If you want to not share layers, you can implement "
"a custom LSTM model that overrides the "
"value_function() method.")
with tf.variable_scope("value_function"):
self.value_function = ModelCatalog.get_model(
self.get_obs_input_dict(), obs_space, action_space, 1,
vf_config).outputs
self.value_function = tf.reshape(self.value_function, [-1])
self.value_function = self.model.value_function()
else:
self.value_function = tf.zeros(
shape=tf.shape(self.get_placeholder(SampleBatch.CUR_OBS))[:1])
@@ -255,11 +232,10 @@ class ValueNetworkMixin(object):
self.get_placeholder(SampleBatch.CUR_OBS): [ob],
self.get_placeholder(SampleBatch.PREV_ACTIONS): [prev_action],
self.get_placeholder(SampleBatch.PREV_REWARDS): [prev_reward],
self.model.seq_lens: [1]
self.seq_lens: [1]
}
assert len(args) == len(self.model.state_in), \
(args, self.model.state_in)
for k, v in zip(self.model.state_in, args):
assert len(args) == len(self.state_in), (args, self.state_in)
for k, v in zip(self.state_in, args):
feed_dict[k] = v
vf = self.get_session().run(self.value_function, feed_dict)
return vf[0]
+1 -1
View File
@@ -6,7 +6,7 @@ from torch import nn
import torch.nn.functional as F
from ray.rllib.models.preprocessors import get_preprocessor
from ray.rllib.models.pytorch.model import TorchModel
from ray.rllib.models.torch.model import TorchModel
from ray.rllib.utils.annotations import override
+6
View File
@@ -59,6 +59,11 @@ def _import_dqn():
return dqn.DQNTrainer
def _import_simple_q():
from ray.rllib.agents import dqn
return dqn.SimpleQTrainer
def _import_apex():
from ray.rllib.agents import dqn
return dqn.ApexTrainer
@@ -97,6 +102,7 @@ ALGORITHMS = {
"ES": _import_es,
"ARS": _import_ars,
"DQN": _import_dqn,
"SimpleQ": _import_simple_q,
"APEX": _import_apex,
"A3C": _import_a3c,
"A2C": _import_a2c,
@@ -299,6 +299,8 @@ class RolloutWorker(EvaluatorInterface):
logger.info("Creating policy evaluation worker {}".format(
worker_index) +
" on CPU (please ignore any CUDA init errors)")
if not tf:
raise ImportError("Could not import tensorflow")
with tf.Graph().as_default():
if tf_session_creator:
self.tf_sess = tf_session_creator()
@@ -0,0 +1,109 @@
"""Example of using a custom ModelV2 Keras-style model.
TODO(ekl): add this to docs once ModelV2 is fully implemented.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import ray
from ray import tune
from ray.rllib.models import ModelCatalog
from ray.rllib.models.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel
from ray.rllib.utils import try_import_tf
tf = try_import_tf()
parser = argparse.ArgumentParser()
parser.add_argument("--run", type=str, default="SimpleQ") # Try PG, PPO, DQN
parser.add_argument("--stop", type=int, default=200)
class MyKerasModel(TFModelV2):
"""Custom model for policy gradient algorithms."""
def __init__(self, obs_space, action_space, num_outputs, model_config,
name):
super(MyKerasModel, self).__init__(obs_space, action_space,
num_outputs, model_config, name)
self.inputs = tf.keras.layers.Input(
shape=obs_space.shape, name="observations")
layer_1 = tf.keras.layers.Dense(
128,
name="my_layer1",
activation=tf.nn.relu,
kernel_initializer=normc_initializer(1.0))(self.inputs)
layer_out = tf.keras.layers.Dense(
num_outputs,
name="my_out",
activation=None,
kernel_initializer=normc_initializer(0.01))(layer_1)
value_out = tf.keras.layers.Dense(
1,
name="value_out",
activation=None,
kernel_initializer=normc_initializer(0.01))(layer_1)
self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out])
self.register_variables(self.base_model.variables)
def forward(self, input_dict, state, seq_lens):
self.prev_input = input_dict
model_out, self._value_out = self.base_model(input_dict["obs"])
return model_out, state
def value_function(self):
return tf.reshape(self._value_out, [-1])
class MyKerasQModel(DistributionalQModel):
"""Custom model for DQN."""
def __init__(self, obs_space, action_space, num_outputs, model_config,
name, **kw):
super(MyKerasQModel, self).__init__(
obs_space, action_space, num_outputs, model_config, name, **kw)
# Define the core model layers which will be used by the other
# output heads of DistributionalQModel
self.inputs = tf.keras.layers.Input(
shape=obs_space.shape, name="observations")
layer_1 = tf.keras.layers.Dense(
128,
name="my_layer1",
activation=tf.nn.relu,
kernel_initializer=normc_initializer(1.0))(self.inputs)
layer_out = tf.keras.layers.Dense(
num_outputs,
name="my_out",
activation=tf.nn.relu,
kernel_initializer=normc_initializer(1.0))(layer_1)
self.base_model = tf.keras.Model(self.inputs, layer_out)
self.register_variables(self.base_model.variables)
# Implement the core forward method
def forward(self, input_dict, state, seq_lens):
self.prev_input = input_dict
model_out = self.base_model(input_dict["obs"])
return model_out, state
if __name__ == "__main__":
ray.init(local_mode=True)
args = parser.parse_args()
ModelCatalog.register_custom_model("keras_model", MyKerasModel)
ModelCatalog.register_custom_model("keras_q_model", MyKerasQModel)
tune.run(
args.run,
stop={"episode_reward_mean": args.stop},
config={
"env": "CartPole-v0",
"model": {
"custom_model": "keras_q_model"
if args.run == "DQN" else "keras_model"
},
})
@@ -173,9 +173,14 @@ if __name__ == "__main__":
"observation_filter": "NoFilter", # don't filter the action list
"vf_share_layers": True, # don't create duplicate value model
}
elif args.run == "DQN":
elif args.run in ["SimpleQ", "DQN"]:
cfg = {
"hiddens": [], # important: don't postprocess the action scores
# TODO(ekl) we could support dueling if the model in this example
# was ModelV2 and only emitted -inf values on get_q_values().
# The problem with ModelV1 is that the model outputs
# are used as state scores and hence cause blowup to inf.
"dueling": False,
}
else:
cfg = {} # PG, IMPALA, A2C, etc.
+100 -10
View File
@@ -16,10 +16,13 @@ from ray.rllib.models.action_dist import (Categorical, MultiCategorical,
MultiActionDistribution, Dirichlet)
from ray.rllib.models.torch_action_dist import (TorchCategorical,
TorchDiagGaussian)
from ray.rllib.models.tf.modelv1_compat import make_v1_wrapper
from ray.rllib.models.preprocessors import get_preprocessor
from ray.rllib.models.fcnet import FullyConnectedNetwork
from ray.rllib.models.visionnet import VisionNetwork
from ray.rllib.models.lstm import LSTM
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.utils.annotations import DeveloperAPI, PublicAPI
from ray.rllib.utils import try_import_tf
@@ -41,8 +44,12 @@ MODEL_DEFAULTS = {
"fcnet_hiddens": [256, 256],
# For control envs, documented in ray.rllib.models.Model
"free_log_std": False,
# (deprecated) Whether to use sigmoid to squash actions to space range
"squash_to_range": False,
# Whether to skip the final linear layer used to resize the hidden layer
# outputs to size `num_outputs`. If True, then the last hidden layer
# should already match num_outputs.
"no_final_linear": False,
# Whether layers should be shared for the value function.
"vf_share_layers": False,
# == LSTM ==
# Whether to wrap the model with a LSTM
@@ -53,6 +60,9 @@ MODEL_DEFAULTS = {
"lstm_cell_size": 256,
# Whether to feed a_{t-1}, r_{t-1} to LSTM
"lstm_use_prev_action_reward": False,
# When using modelv1 models with a modelv2 algorithm, you may have to
# define the state shape here (e.g., [256, 256]).
"state_shape": None,
# == Atari ==
# Whether to enable framestack for Atari envs
@@ -117,10 +127,6 @@ class ModelCatalog(object):
"using a Tuple action space, or the multi-agent API.")
if dist_type is None:
dist = TorchDiagGaussian if torch else DiagGaussian
if config.get("squash_to_range"):
raise ValueError(
"The squash_to_range option is deprecated. See the "
"clip_actions agent option instead.")
return dist, action_space.shape[0] * 2
elif dist_type == "deterministic":
return Deterministic, action_space.shape[0]
@@ -196,6 +202,90 @@ class ModelCatalog(object):
raise NotImplementedError("action space {}"
" not supported".format(action_space))
@staticmethod
def get_model_v2(obs_space,
action_space,
num_outputs,
model_config,
framework="tf",
name=None,
model_interface=None,
**model_kwargs):
"""Returns a suitable model compatible with given spaces and output.
Args:
obs_space (Space): Observation space of the target gym env. This
may have an `original_space` attribute that specifies how to
unflatten the tensor into a ragged tensor.
action_space (Space): Action space of the target gym env.
num_outputs (int): The size of the output vector of the model.
framework (str): Either "tf" or "torch".
name (str): Name (scope) for the model.
model_interface (cls): Interface required for the model
model_kwargs (dict): args to pass to the ModelV2 constructor
Returns:
model (ModelV2): Model to use for the policy.
"""
if model_config.get("custom_model"):
model_cls = _global_registry.get(RLLIB_MODEL,
model_config["custom_model"])
if issubclass(model_cls, ModelV2):
if model_interface and not issubclass(model_cls,
model_interface):
raise ValueError("The given model must subclass",
model_interface)
created = set()
# Track and warn if variables were created but no registered
def track_var_creation(next_creator, **kw):
v = next_creator(**kw)
created.add(v)
return v
with tf.variable_creator_scope(track_var_creation):
instance = model_cls(obs_space, action_space, num_outputs,
model_config, name, **model_kwargs)
registered = set(instance.variables())
not_registered = set()
for var in created:
if var not in registered:
not_registered.add(var)
if not_registered:
raise ValueError(
"It looks like variables {} were created as part of "
"{} but does not appear in model.variables() ({}). "
"Did you forget to call model.register_variables() "
"on the variables in question?".format(
not_registered, instance, registered))
return instance
if framework == "tf":
legacy_model_cls = ModelCatalog.get_model
wrapper = ModelCatalog._wrap_if_needed(
make_v1_wrapper(legacy_model_cls), model_interface)
return wrapper(obs_space, action_space, num_outputs, model_config,
name, **model_kwargs)
raise NotImplementedError("TODO: support {} models".format(framework))
@staticmethod
def _wrap_if_needed(model_cls, model_interface):
assert issubclass(model_cls, TFModelV2)
if not model_interface or issubclass(model_cls, model_interface):
return model_cls
class wrapper(model_interface, model_cls):
pass
name = "{}_as_{}".format(model_cls.__name__, model_interface.__name__)
wrapper.__name__ = name
wrapper.__qualname__ = name
return wrapper
@staticmethod
@DeveloperAPI
def get_model(input_dict,
@@ -284,10 +374,10 @@ class ModelCatalog(object):
Returns:
model (models.Model): Neural network model.
"""
from ray.rllib.models.pytorch.fcnet import (FullyConnectedNetwork as
PyTorchFCNet)
from ray.rllib.models.pytorch.visionnet import (VisionNetwork as
PyTorchVisionNet)
from ray.rllib.models.torch.fcnet import (FullyConnectedNetwork as
PyTorchFCNet)
from ray.rllib.models.torch.visionnet import (VisionNetwork as
PyTorchVisionNet)
options = options or MODEL_DEFAULTS
+12 -2
View File
@@ -28,6 +28,16 @@ class FullyConnectedNetwork(Model):
i = 1
last_layer = inputs
for size in hiddens:
# skip final linear layer
if options.get("no_final_linear") and i == len(hiddens):
output = tf.layers.dense(
last_layer,
num_outputs,
kernel_initializer=normc_initializer(1.0),
activation=activation,
name="fc_out")
return output, output
label = "fc{}".format(i)
last_layer = tf.layers.dense(
last_layer,
@@ -36,11 +46,11 @@ class FullyConnectedNetwork(Model):
activation=activation,
name=label)
i += 1
label = "fc_out"
output = tf.layers.dense(
last_layer,
num_outputs,
kernel_initializer=normc_initializer(0.01),
activation=None,
name=label)
name="fc_out")
return output, last_layer
+18
View File
@@ -67,6 +67,7 @@ class Model(object):
self.options = options
self.scope = tf.get_variable_scope()
self.session = tf.get_default_session()
self.input_dict = input_dict
if seq_lens is not None:
self.seq_lens = seq_lens
else:
@@ -77,6 +78,8 @@ class Model(object):
if options.get("free_log_std"):
assert num_outputs % 2 == 0
num_outputs = num_outputs // 2
ok = True
try:
restored = input_dict.copy()
restored["obs"] = restore_original_dimensions(
@@ -84,6 +87,10 @@ class Model(object):
self.outputs, self.last_layer = self._build_layers_v2(
restored, num_outputs, options)
except NotImplementedError:
ok = False
# In TF 1.14, you cannot construct variable scopes in exception
# handlers so we have to set the OK flag and check it here:
if not ok:
self.outputs, self.last_layer = self._build_layers(
input_dict["obs"], num_outputs, options)
@@ -192,6 +199,12 @@ class Model(object):
"""Deprecated: use self.custom_loss()."""
return None
@classmethod
def get_initial_state(cls, obs_space, action_space, num_outputs, options):
raise NotImplementedError(
"In order to use recurrent models with ModelV2, you should define "
"the get_initial_state @classmethod on your custom model class.")
def _validate_output_shape(self):
"""Checks that the model has the correct number of outputs."""
try:
@@ -226,6 +239,11 @@ def restore_original_dimensions(obs, obs_space, tensorlib=tf):
"""
if hasattr(obs_space, "original_space"):
if tensorlib == "tf":
tensorlib = tf
elif tensorlib == "torch":
import torch
tensorlib = torch
return _unpack_obs(obs, obs_space.original_space, tensorlib=tensorlib)
else:
return obs
+169
View File
@@ -0,0 +1,169 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.models.model import restore_original_dimensions
class ModelV2(object):
"""Defines a Keras-style abstract network model for use with RLlib.
Custom models should extend either TFModelV2 or TorchModelV2 instead of
this class directly. Experimental.
Attributes:
obs_space (Space): observation space of the target gym env. This
may have an `original_space` attribute that specifies how to
unflatten the tensor into a ragged tensor.
action_space (Space): action space of the target gym env
num_outputs (int): number of output units of the model
model_config (dict): config for the model, documented in ModelCatalog
name (str): name (scope) for the model
framework (str): either "tf" or "torch"
"""
def __init__(self, obs_space, action_space, num_outputs, model_config,
name, framework):
"""Initialize the model.
This method should create any variables used by the model.
"""
self.obs_space = obs_space
self.action_space = action_space
self.num_outputs = num_outputs
self.model_config = model_config
self.name = name or "default_model"
self.framework = framework
self.var_list = []
def get_initial_state(self):
"""Get the initial recurrent state values for the model.
Returns:
list of np.array objects, if any
"""
return []
def forward(self, input_dict, state, seq_lens):
"""Call the model with the given input tensors and state.
Any complex observations (dicts, tuples, etc.) will be unpacked by
__call__ before being passed to forward(). To access the flattened
observation tensor, refer to input_dict["obs_flat"].
This method can be called any number of times. In eager execution,
each call to forward() will eagerly evaluate the model. In symbolic
execution, each call to forward creates a computation graph that
operates over the variables of this model (i.e., shares weights).
Custom models should override this instead of __call__.
Arguments:
input_dict (dict): dictionary of input tensors, including "obs",
"obs_flat", "prev_action", "prev_reward", "is_training"
state (list): list of state tensors with sizes matching those
returned by get_initial_state + the batch dimension
seq_lens (Tensor): 1d tensor holding input sequence lengths
Returns:
(outputs, state): The model output tensor of size
[BATCH, num_outputs]
"""
raise NotImplementedError
def value_function(self):
"""Return the value function estimate for the most recent forward pass.
Returns:
value estimate tensor of shape [BATCH].
"""
raise NotImplementedError
def custom_loss(self, policy_loss, loss_inputs):
"""Override to customize the loss function used to optimize this model.
This can be used to incorporate self-supervised losses (by defining
a loss over existing input and output tensors of this model), and
supervised losses (by defining losses over a variable-sharing copy of
this model's layers).
You can find an runnable example in examples/custom_loss.py.
Arguments:
policy_loss (Tensor): scalar policy loss from the policy.
loss_inputs (dict): map of input placeholders for rollout data.
Returns:
Scalar tensor for the customized loss for this model.
"""
return policy_loss
def metrics(self):
"""Override to return custom metrics from your model.
The stats will be reported as part of the learner stats, i.e.,
info:
learner:
model:
key1: metric1
key2: metric2
Returns:
Dict of string keys to scalar tensors.
"""
return {}
def register_variables(self, variables):
"""Register the given list of variables with this model."""
self.var_list.extend(variables)
def variables(self):
"""Returns the list of variables for this model."""
return self.var_list
def trainable_variables(self):
"""Returns the list of trainable variables for this model."""
return self.variables()
def __call__(self, input_dict, state, seq_lens):
"""Call the model with the given input tensors and state.
This is the method used by RLlib to execute the forward pass. It calls
forward() internally after unpacking nested observation tensors.
Custom models should override forward() instead of __call__.
Arguments:
input_dict (dict): dictionary of input tensors, including "obs",
"prev_action", "prev_reward", "is_training"
state (list): list of state tensors with sizes matching those
returned by get_initial_state + the batch dimension
seq_lens (Tensor): 1d tensor holding input sequence lengths
Returns:
(outputs, state): The model output tensor of size
[BATCH, output_spec.size] or a list of tensors corresponding to
output_spec.shape_list, and a list of state tensors of
[BATCH, state_size_i].
"""
restored = input_dict.copy()
restored["obs"] = restore_original_dimensions(
input_dict["obs"], self.obs_space, self.framework)
restored["obs_flat"] = input_dict["obs"]
outputs, state = self.forward(restored, state, seq_lens)
try:
shape = outputs.shape
except AttributeError:
raise ValueError("Output is not a tensor: {}".format(outputs))
else:
if len(shape) != 2 or shape[1] != self.num_outputs:
raise ValueError(
"Expected output shape of [None, {}], got {}".format(
self.num_outputs, shape))
if not isinstance(state, list):
raise ValueError("State output is not a list: {}".format(state))
return outputs, state
@@ -0,0 +1,134 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import numpy as np
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.misc import linear, normc_initializer
from ray.rllib.utils.annotations import override
from ray.rllib.utils import try_import_tf
from ray.rllib.utils.tf_ops import scope_vars
tf = try_import_tf()
logger = logging.getLogger(__name__)
def make_v1_wrapper(legacy_model_cls):
class ModelV1Wrapper(TFModelV2):
"""Wrapper that allows V1 models to be used as ModelV2."""
def __init__(self, obs_space, action_space, num_outputs, model_config,
name):
TFModelV2.__init__(self, obs_space, action_space, num_outputs,
model_config, name)
self.legacy_model_cls = legacy_model_cls
def instance_template(input_dict, state, seq_lens):
# create a new model instance
with tf.variable_scope(self.name):
new_instance = self.legacy_model_cls(
input_dict, obs_space, action_space, num_outputs,
model_config, state, seq_lens)
return new_instance
self.instance_template = tf.make_template("instance_template",
instance_template)
# Tracks the last v1 model created by the call to forward
self.cur_instance = None
def vf_template(last_layer, input_dict):
with tf.variable_scope(self.variable_scope):
with tf.variable_scope("value_function"):
# Simple case: sharing the feature layer
if model_config["vf_share_layers"]:
return tf.reshape(
linear(last_layer, 1, "value_function",
normc_initializer(1.0)), [-1])
# Create a new separate model with no RNN state, etc.
branch_model_config = model_config.copy()
branch_model_config["free_log_std"] = False
if branch_model_config["use_lstm"]:
branch_model_config["use_lstm"] = False
logger.warning(
"It is not recommended to use a LSTM model "
"with vf_share_layers=False (consider "
"setting it to True). If you want to not "
"share layers, you can implement a custom "
"LSTM model that overrides the "
"value_function() method.")
branch_instance = legacy_model_cls(
input_dict,
obs_space,
action_space,
1,
branch_model_config,
state_in=None,
seq_lens=None)
return tf.reshape(branch_instance.outputs, [-1])
self.vf_template = tf.make_template("vf_template", vf_template)
# XXX: Try to guess the initial state size. Since the size of the
# state is known only after forward() for V1 models, it might be
# wrong.
if model_config.get("state_shape"):
self.initial_state = [
np.zeros(s, np.float32)
for s in model_config["state_shape"]
]
elif model_config.get("use_lstm"):
cell_size = model_config.get("lstm_cell_size", 256)
self.initial_state = [
np.zeros(cell_size, np.float32),
np.zeros(cell_size, np.float32),
]
else:
self.initial_state = []
with tf.variable_scope(self.name) as scope:
self.variable_scope = scope
@override(ModelV2)
def get_initial_state(self):
return self.initial_state
@override(ModelV2)
def __call__(self, input_dict, state, seq_lens):
new_instance = self.instance_template(input_dict, state, seq_lens)
if len(new_instance.state_init) != len(self.get_initial_state()):
raise ValueError(
"When using a custom recurrent ModelV1 model, you should "
"declare the state_shape in the model options. For "
"example, set 'state_shape': [256, 256] for a lstm with "
"cell size 256. The guessed state shape was {} which "
"appears to be incorrect.".format(
[s.shape[0] for s in self.get_initial_state()]))
self.cur_instance = new_instance
self.variable_scope = new_instance.scope
return new_instance.outputs, new_instance.state_out
@override(ModelV2)
def variables(self):
return super(ModelV1Wrapper, self).variables() + scope_vars(
self.variable_scope)
@override(ModelV2)
def custom_loss(self, policy_loss, loss_inputs):
return self.cur_instance.custom_loss(policy_loss, loss_inputs)
@override(ModelV2)
def metrics(self):
return self.cur_instance.custom_stats()
@override(ModelV2)
def value_function(self):
assert self.cur_instance, "must call forward first"
return self.vf_template(self.cur_instance.last_layer,
self.cur_instance.input_dict)
return ModelV1Wrapper
+23
View File
@@ -0,0 +1,23 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.utils import try_import_tf
tf = try_import_tf()
class TFModelV2(ModelV2):
"""TF version of ModelV2."""
def __init__(self, obs_space, action_space, output_spec, model_config,
name):
ModelV2.__init__(
self,
obs_space,
action_space,
output_spec,
model_config,
name,
framework="tf")
@@ -6,8 +6,8 @@ import logging
import numpy as np
import torch.nn as nn
from ray.rllib.models.pytorch.model import TorchModel
from ray.rllib.models.pytorch.misc import normc_initializer, SlimFC, \
from ray.rllib.models.torch.model import TorchModel
from ray.rllib.models.torch.misc import normc_initializer, SlimFC, \
_get_activation_fn
from ray.rllib.utils.annotations import override
@@ -9,6 +9,7 @@ from ray.rllib.models.model import restore_original_dimensions
from ray.rllib.utils.annotations import PublicAPI
# TODO(ekl) rewrite using modelv2
@PublicAPI
class TorchModel(nn.Module):
"""Defines an abstract network model for use with RLlib / PyTorch."""
@@ -31,6 +32,7 @@ class TorchModel(nn.Module):
def forward(self, input_dict, hidden_state):
"""Wraps _forward() to unpack flattened Dict and Tuple observations."""
input_dict["obs"] = input_dict["obs"].float() # TODO(ekl): avoid cast
input_dict["obs_flat"] = input_dict["obs"]
input_dict["obs"] = restore_original_dimensions(
input_dict["obs"], self.obs_space, tensorlib=torch)
outputs, features, vf, h = self._forward(input_dict, hidden_state)
@@ -0,0 +1,20 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.models.modelv2 import ModelV2
class TorchModelV2(ModelV2):
"""Torch version of ModelV2."""
def __init__(self, obs_space, action_space, output_spec, model_config,
name):
ModelV2.__init__(
self,
obs_space,
action_space,
output_spec,
model_config,
name,
framework="torch")
@@ -4,8 +4,8 @@ from __future__ import print_function
import torch.nn as nn
from ray.rllib.models.pytorch.model import TorchModel
from ray.rllib.models.pytorch.misc import normc_initializer, valid_padding, \
from ray.rllib.models.torch.model import TorchModel
from ray.rllib.models.torch.misc import normc_initializer, valid_padding, \
SlimConv2d, SlimFC
from ray.rllib.models.visionnet import _get_filter_config
from ray.rllib.utils.annotations import override
+13
View File
@@ -33,6 +33,19 @@ class VisionNetwork(Model):
padding="same",
name="conv{}".format(i))
out_size, kernel, stride = filters[-1]
# skip final linear layer
if options.get("no_final_linear"):
fc_out = tf.layers.conv2d(
inputs,
num_outputs,
kernel,
stride,
activation=activation,
padding="valid",
name="fc_out")
return flatten(fc_out), flatten(fc_out)
fc1 = tf.layers.conv2d(
inputs,
out_size,
+113 -71
View File
@@ -23,12 +23,19 @@ logger = logging.getLogger(__name__)
class DynamicTFPolicy(TFPolicy):
"""A TFPolicy that auto-defines placeholders dynamically at runtime.
This class also supports eager execution if config["use_eager"] is True.
Eager execution is implemented using a py_function op inside graph mode.
Initialization of this class occurs in two phases.
* Phase 1: the model is created and model variables are initialized.
* Phase 2: a fake batch of data is created, sent to the trajectory
postprocessor, and then used to create placeholders for the loss
function. The loss and stats functions are initialized with these
placeholders.
Initialization defines the static graph. When using eager execution, a
corresponding imperative py_function is also generated as an embedded op
inside the static graph.
"""
def __init__(self,
@@ -40,8 +47,10 @@ class DynamicTFPolicy(TFPolicy):
update_ops_fn=None,
grad_stats_fn=None,
before_loss_init=None,
make_action_sampler=None,
make_model=None,
action_sampler_fn=None,
existing_inputs=None,
existing_model=None,
get_batch_divisibility_req=None,
obs_include_prev_action_reward=True):
"""Initialize a dynamic TF policy.
@@ -60,17 +69,32 @@ class DynamicTFPolicy(TFPolicy):
overriding the update ops to run when applying gradients
before_loss_init (func): optional function to run prior to loss
init that takes the same arguments as __init__
make_action_sampler (func): optional function that returns a
tuple of action and action prob tensors. The function takes
(policy, input_dict, obs_space, action_space, config) as its
arguments
make_model (func): optional function that returns a ModelV2 object
given (policy, obs_space, action_space, config).
All policy variables should be created in this function. If not
specified, a default model will be created.
action_sampler_fn (func): optional function that returns a
tuple of action and action prob tensors given
(policy, model, input_dict, obs_space, action_space, config).
If not specified, a default action distribution will be used.
existing_inputs (OrderedDict): when copying a policy, this
specifies an existing dict of placeholders to use instead of
defining new ones
existing_model (ModelV2): when copying a policy, this specifies
an existing model to clone and share weights with
get_batch_divisibility_req (func): optional function that returns
the divisibility requirement for sample batches
obs_include_prev_action_reward (bool): whether to include the
previous action and reward in the model input
Attributes:
config: config of the policy
model: model instance, if any
model_out: output tensors of the model
action_dist: action distribution of the model, if any
state_in: state input tensors, if any
state_out: state output tensors, if any
seq_lens: tensor of sequence lengths
"""
self.config = config
self._loss_fn = loss_fn
@@ -104,40 +128,47 @@ class DynamicTFPolicy(TFPolicy):
SampleBatch.PREV_REWARDS: prev_rewards,
"is_training": self._get_is_training_placeholder(),
}
self.seq_lens = tf.placeholder(
dtype=tf.int32, shape=[None], name="seq_lens")
# Create the model network and action outputs
if make_action_sampler:
assert not existing_inputs, \
"Cloning not supported with custom action sampler"
self.model = None
self.dist_class = None
self.action_dist = None
action_sampler, action_prob = make_action_sampler(
self, self.input_dict, obs_space, action_space, config)
# Setup model
self.dist_class, logit_dim = ModelCatalog.get_action_dist(
action_space, self.config["model"])
if existing_model:
self.model = existing_model
elif make_model:
self.model = make_model(self, obs_space, action_space, config)
else:
self.dist_class, logit_dim = ModelCatalog.get_action_dist(
action_space, self.config["model"])
if existing_inputs:
existing_state_in = [
v for k, v in existing_inputs.items()
if k.startswith("state_in_")
]
if existing_state_in:
existing_seq_lens = existing_inputs["seq_lens"]
else:
existing_seq_lens = None
else:
existing_state_in = []
existing_seq_lens = None
self.model = ModelCatalog.get_model(
self.input_dict,
self.model = ModelCatalog.get_model_v2(
obs_space,
action_space,
logit_dim,
self.config["model"],
state_in=existing_state_in,
seq_lens=existing_seq_lens)
self.action_dist = self.dist_class(self.model.outputs)
framework="tf")
if existing_inputs:
self.state_in = [
v for k, v in existing_inputs.items()
if k.startswith("state_in_")
]
if self.state_in:
self.seq_lens = existing_inputs["seq_lens"]
else:
self.state_in = [
tf.placeholder(shape=(None, ) + s.shape, dtype=s.dtype)
for s in self.model.get_initial_state()
]
self.model_out, self.state_out = self.model(
self.input_dict, self.state_in, self.seq_lens)
# Setup action sampler
if action_sampler_fn:
self.action_dist = None
self.dist_class = None
action_sampler, action_prob = action_sampler_fn(
self, self.model, self.input_dict, obs_space, action_space,
config)
else:
self.action_dist = self.dist_class(self.model_out)
action_sampler = self.action_dist.sample()
action_prob = self.action_dist.sampled_action_prob()
@@ -158,11 +189,11 @@ class DynamicTFPolicy(TFPolicy):
loss=None, # dynamically initialized on run
loss_inputs=[],
model=self.model,
state_inputs=self.model and self.model.state_in,
state_outputs=self.model and self.model.state_out,
state_inputs=self.state_in,
state_outputs=self.state_out,
prev_action_input=prev_actions,
prev_reward_input=prev_rewards,
seq_lens=self.model and self.model.seq_lens,
seq_lens=self.seq_lens,
max_seq_len=config["model"]["max_seq_len"],
batch_divisibility_req=batch_divisibility_req)
@@ -195,11 +226,6 @@ class DynamicTFPolicy(TFPolicy):
def copy(self, existing_inputs):
"""Creates a copy of self using existing input placeholders."""
if self.config["use_eager"]:
raise ValueError(
"eager not implemented for multi-GPU, try setting "
"`simple_optimizer: true`")
# Note that there might be RNN state inputs at the end of the list
if self._state_inputs:
num_state_inputs = len(self._state_inputs) + 1
@@ -227,12 +253,18 @@ class DynamicTFPolicy(TFPolicy):
self.observation_space,
self.action_space,
self.config,
existing_inputs=input_dict)
existing_inputs=input_dict,
existing_model=self.model)
loss = instance._do_loss_init(input_dict)
TFPolicy._initialize_loss(
instance, loss, [(k, existing_inputs[i])
for i, (k, _) in enumerate(self._loss_inputs)])
loss_inputs = [(k, existing_inputs[i])
for i, (k, _) in enumerate(self._loss_inputs)]
if self.config["use_eager"]:
loss, new_stats = instance._gen_eager_loss_op(loss_inputs)
instance._stats_fetches = new_stats
TFPolicy._initialize_loss(instance, loss, loss_inputs)
if instance._grad_stats_fn:
instance._stats_fetches.update(
instance._grad_stats_fn(instance, instance._grads))
@@ -241,7 +273,7 @@ class DynamicTFPolicy(TFPolicy):
@override(Policy)
def get_initial_state(self):
if self.model:
return self.model.state_init
return self.model.get_initial_state()
else:
return []
@@ -321,31 +353,8 @@ class DynamicTFPolicy(TFPolicy):
# and non-eager tensors, so losses that read non-eager tensors through
# `policy` need to use `policy.convert_to_eager(tensor)`.
if self.config["use_eager"]:
if not self.model:
raise ValueError("eager not implemented in this case")
graph_tensors = list(self._needs_eager_conversion)
def gen_loss(model_outputs, *args):
# fill in the batch tensor dict with eager ensors
eager_inputs = dict(
zip([k for (k, v) in loss_inputs],
args[:len(loss_inputs)]))
# fill in the eager versions of all accessed graph tensors
self._eager_tensors = dict(
zip(graph_tensors, args[len(loss_inputs):]))
# patch the action dist to use eager mode tensors
self.action_dist.inputs = model_outputs
return self._loss_fn(self, eager_inputs)
# TODO(ekl) also handle the stats funcs
loss = tf.py_function(
gen_loss,
# cast works around TypeError: Cannot convert provided value
# to EagerTensor. Provided value: 0.0 Requested dtype: int64
[self.model.outputs] + [
tf.cast(v, tf.float32) for (k, v) in loss_inputs
] + [tf.cast(t, tf.float32) for t in graph_tensors],
tf.float32)
loss, new_stats = self._gen_eager_loss_op(loss_inputs)
self._stats_fetches = new_stats
TFPolicy._initialize_loss(self, loss, loss_inputs)
if self._grad_stats_fn:
@@ -359,3 +368,36 @@ class DynamicTFPolicy(TFPolicy):
if self._update_ops_fn:
self._update_ops = self._update_ops_fn(self)
return loss
def _gen_eager_loss_op(self, loss_inputs):
graph_tensors = list(self._needs_eager_conversion)
stat_items = list(self._stats_fetches.items())
def gen_loss(model_outputs, *args):
# fill in the batch tensor dict with eager ensors
eager_inputs = dict(
zip([k for (k, v) in loss_inputs], args[:len(loss_inputs)]))
# fill in the eager versions of all accessed graph tensors
self._eager_tensors = dict(
zip(graph_tensors, args[len(loss_inputs):]))
# patch the action dist to use eager mode tensors
self.action_dist.inputs = model_outputs
loss = self._loss_fn(self, eager_inputs)
if self._stats_fn:
stats = self._stats_fn(self, eager_inputs)
return [loss] + [stats[k] for (k, v) in stat_items]
eager_out = tf.py_function(
gen_loss,
# cast works around TypeError: Cannot convert provided value
# to EagerTensor. Provided value: 0.0 Requested dtype: int64
[self.model_out] + [
tf.cast(v, tf.float32) for (k, v) in loss_inputs
] + [tf.cast(t, tf.float32) for t in graph_tensors],
Tout=[tf.float32] + [v.dtype for (k, v) in stat_items])
stats = {
k: stat_tensor
for (stat_tensor, (k, v)) in zip(eager_out[1:], stat_items)
}
return eager_out[0], stats
+5 -1
View File
@@ -12,6 +12,7 @@ import ray.experimental.tf_utils
from ray.rllib.policy.policy import Policy, LEARNER_STATS_KEY
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models.lstm import chop_into_sequences
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.utils.annotations import override, DeveloperAPI
from ray.rllib.utils.debug import log_once, summarize
from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule
@@ -180,7 +181,10 @@ class TFPolicy(Policy):
if self.model:
self._loss = self.model.custom_loss(loss, self._loss_input_dict)
self._stats_fetches.update({"model": self.model.custom_stats()})
self._stats_fetches.update({
"model": self.model.metrics() if isinstance(
self.model, ModelV2) else self.model.custom_stats()
})
else:
self._loss = loss
+21 -6
View File
@@ -26,7 +26,8 @@ def build_tf_policy(name,
before_init=None,
before_loss_init=None,
after_init=None,
make_action_sampler=None,
make_model=None,
action_sampler_fn=None,
mixins=None,
get_batch_divisibility_req=None,
obs_include_prev_action_reward=True):
@@ -40,6 +41,13 @@ def build_tf_policy(name,
This means that you can e.g., depend on any policy attributes created in
the running of `loss_fn` in later functions such as `stats_fn`.
In eager mode (experimental), the following functions will be run
repeatedly on each eager execution: loss_fn, stats_fn
This means that these functions should not define any variables internally,
otherwise they will fail in eager mode execution. Variable should only
be created in make_model (if defined).
Arguments:
name (str): name of the policy (e.g., "PPOTFPolicy")
loss_fn (func): function that returns a loss tensor the policy,
@@ -73,10 +81,14 @@ def build_tf_policy(name,
init that takes the same arguments as the policy constructor
after_init (func): optional function to run at the end of policy init
that takes the same arguments as the policy constructor
make_action_sampler (func): optional function that returns a
tuple of action and action prob tensors. The function takes
(policy, input_dict, obs_space, action_space, config) as its
arguments
make_model (func): optional function that returns a ModelV2 object
given (policy, obs_space, action_space, config).
All policy variables should be created in this function. If not
specified, a default model will be created.
action_sampler_fn (func): optional function that returns a
tuple of action and action prob tensors given
(policy, model, input_dict, obs_space, action_space, config).
If not specified, a default action distribution will be used.
mixins (list): list of any class mixins for the returned policy class.
These mixins will be applied in order and will have higher
precedence than the DynamicTFPolicy class
@@ -97,6 +109,7 @@ def build_tf_policy(name,
obs_space,
action_space,
config,
existing_model=None,
existing_inputs=None):
if get_default_config:
config = dict(get_default_config(), **config)
@@ -123,7 +136,9 @@ def build_tf_policy(name,
grad_stats_fn=grad_stats_fn,
update_ops_fn=update_ops_fn,
before_loss_init=before_loss_init_wrapper,
make_action_sampler=make_action_sampler,
make_model=make_model,
action_sampler_fn=action_sampler_fn,
existing_model=existing_model,
existing_inputs=existing_inputs,
obs_include_prev_action_reward=obs_include_prev_action_reward)
+2
View File
@@ -182,6 +182,7 @@ class RNNSequencing(unittest.TestCase):
"model": {
"custom_model": "rnn",
"max_seq_len": 4,
"state_shape": [3, 3],
},
})
ppo.train()
@@ -238,6 +239,7 @@ class RNNSequencing(unittest.TestCase):
"model": {
"custom_model": "rnn",
"max_seq_len": 4,
"state_shape": [3, 3],
},
})
ppo.train()
+2 -2
View File
@@ -18,8 +18,8 @@ from ray.rllib.env.base_env import BaseEnv
from ray.rllib.env.vector_env import VectorEnv
from ray.rllib.models import ModelCatalog
from ray.rllib.models.model import Model
from ray.rllib.models.pytorch.fcnet import FullyConnectedNetwork
from ray.rllib.models.pytorch.model import TorchModel
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork
from ray.rllib.models.torch.model import TorchModel
from ray.rllib.rollout import rollout
from ray.rllib.tests.test_external_env import SimpleServing
from ray.tune.registry import register_env
+58
View File
@@ -0,0 +1,58 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.utils import try_import_tf
tf = try_import_tf()
def huber_loss(x, delta=1.0):
"""Reference: https://en.wikipedia.org/wiki/Huber_loss"""
return tf.where(
tf.abs(x) < delta,
tf.square(x) * 0.5, delta * (tf.abs(x) - 0.5 * delta))
def reduce_mean_ignore_inf(x, axis):
"""Same as tf.reduce_mean() but ignores -inf values."""
mask = tf.not_equal(x, tf.float32.min)
x_zeroed = tf.where(mask, x, tf.zeros_like(x))
return (tf.reduce_sum(x_zeroed, axis) / tf.reduce_sum(
tf.cast(mask, tf.float32), axis))
def minimize_and_clip(optimizer, objective, var_list, clip_val=10):
"""Minimized `objective` using `optimizer` w.r.t. variables in
`var_list` while ensure the norm of the gradients for each
variable is clipped to `clip_val`
"""
gradients = optimizer.compute_gradients(objective, var_list=var_list)
for i, (grad, var) in enumerate(gradients):
if grad is not None:
gradients[i] = (tf.clip_by_norm(grad, clip_val), var)
return gradients
def scope_vars(scope, trainable_only=False):
"""
Get variables inside a scope
The scope can be specified as a string
Parameters
----------
scope: str or VariableScope
scope in which the variables reside.
trainable_only: bool
whether or not to return only the variables that were marked as
trainable.
Returns
-------
vars: [tf.Variable]
list of variables in `scope`.
"""
return tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES
if trainable_only else tf.GraphKeys.VARIABLES,
scope=scope if isinstance(scope, str) else scope.name)