[rllib] Add docs on how to use TF eager execution (#4927)

This commit is contained in:
Eric Liang
2019-06-07 16:42:37 -07:00
committed by GitHub
parent 873d45b467
commit 9e328fbe6f
10 changed files with 215 additions and 6 deletions
+3 -2
View File
@@ -41,8 +41,9 @@ def actor_critic_loss(policy, batch_tensors):
policy.loss = A3CLoss(
policy.action_dist, batch_tensors[SampleBatch.ACTIONS],
batch_tensors[Postprocessing.ADVANTAGES],
batch_tensors[Postprocessing.VALUE_TARGETS], policy.vf,
policy.config["vf_loss_coeff"], policy.config["entropy_coeff"])
batch_tensors[Postprocessing.VALUE_TARGETS],
policy.convert_to_eager(policy.vf), policy.config["vf_loss_coeff"],
policy.config["entropy_coeff"])
return policy.loss.total_loss
+6 -4
View File
@@ -106,8 +106,10 @@ class PPOLoss(object):
def ppo_surrogate_loss(policy, batch_tensors):
if policy.model.state_in:
max_seq_len = tf.reduce_max(policy.model.seq_lens)
mask = tf.sequence_mask(policy.model.seq_lens, max_seq_len)
max_seq_len = tf.reduce_max(
policy.convert_to_eager(policy.model.seq_lens))
mask = tf.sequence_mask(
policy.convert_to_eager(policy.model.seq_lens), max_seq_len)
mask = tf.reshape(mask, [-1])
else:
mask = tf.ones_like(
@@ -121,8 +123,8 @@ def ppo_surrogate_loss(policy, batch_tensors):
batch_tensors[BEHAVIOUR_LOGITS],
batch_tensors[SampleBatch.VF_PREDS],
policy.action_dist,
policy.value_function,
policy.kl_coeff,
policy.convert_to_eager(policy.value_function),
policy.convert_to_eager(policy.kl_coeff),
mask,
entropy_coeff=policy.config["entropy_coeff"],
clip_param=policy.config["clip_param"],
+3
View File
@@ -67,6 +67,9 @@ COMMON_CONFIG = {
},
# Whether to attempt to continue training if a worker crashes.
"ignore_worker_failures": False,
# Execute TF loss functions in eager mode. This is currently experimental
# and only really works with the basic PG algorithm.
"use_eager": False,
# === Policy ===
# Arguments to pass to model. See models/catalog.py for a full list of the
@@ -0,0 +1,101 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import random
import ray
from ray import tune
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.models import FullyConnectedNetwork, Model, ModelCatalog
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.tf_policy_template import build_tf_policy
from ray.rllib.utils import try_import_tf
tf = try_import_tf()
parser = argparse.ArgumentParser()
parser.add_argument("--iters", type=int, default=200)
class EagerModel(Model):
    """Custom model that embeds an eager-mode snippet in a graph-mode net.

    The network itself is an ordinary FullyConnectedNetwork. Its last layer
    is additionally routed through tf.py_function(), which executes
    `self.forward_eager` in eager mode. In this example that method only
    prints the intermediate tensor now and then for debugging, but in
    general any TF eager operation can be performed inside
    tf.py_function().
    """

    def _build_layers_v2(self, input_dict, num_outputs, options):
        """Build the FC network and attach the eager debug hook.

        Returns a tuple of (model outputs, eagerly processed feature layer).
        """
        self.fcnet = FullyConnectedNetwork(
            input_dict, self.obs_space, self.action_space, num_outputs,
            options)
        eager_features = tf.py_function(
            self.forward_eager, [self.fcnet.last_layer], tf.float32)

        # Tie the outputs to the py_function op via a control dependency so
        # that the eager snippet is part of evaluating the model outputs.
        with tf.control_dependencies([eager_features]):
            outputs = tf.identity(self.fcnet.outputs)
        return outputs, eager_features

    def forward_eager(self, feature_layer):
        """Eager-mode pass-through that occasionally prints the layer mean."""
        assert tf.executing_eagerly()
        # Only print for a small random fraction of calls to limit log spam.
        should_print = random.random() > 0.99
        if should_print:
            print("Eagerly printing the feature layer mean value",
                  tf.reduce_mean(feature_layer))
        return feature_layer
def policy_gradient_loss(policy, batch_tensors):
    """Policy gradient loss with an eagerly computed (dummy) penalty term.

    Demonstrates embedding eager execution in a custom loss: the nested
    `compute_penalty` runs inside tf.py_function(), occasionally prints the
    actions and rewards for debugging, and returns a penalty that is added
    to the loss.

    Alternatively, you can set config["use_eager"] = True, which will try to
    automatically eagerify the entire loss function. However, this only works
    if your loss doesn't reference any non-eager tensors. It also won't work
    with the multi-GPU optimizer used by PPO.
    """

    def compute_penalty(actions, rewards):
        # Executed by tf.py_function, hence in eager mode.
        assert tf.executing_eagerly()
        mean_action = tf.reduce_mean(tf.cast(actions, tf.float32))
        if random.random() > 0.9:
            print("The eagerly computed penalty is", mean_action, actions,
                  rewards)
        return mean_action

    acts = batch_tensors[SampleBatch.ACTIONS]
    rews = batch_tensors[SampleBatch.REWARDS]
    eager_penalty = tf.py_function(
        compute_penalty, [acts, rews], Tout=tf.float32)
    pg_objective = tf.reduce_mean(policy.action_dist.logp(acts) * rews)
    return eager_penalty - pg_objective
# Policy class built from the custom loss above; the resulting class is
# <class 'ray.rllib.policy.tf_policy_template.MyTFPolicy'>
MyTFPolicy = build_tf_policy(
    name="MyTFPolicy", loss_fn=policy_gradient_loss)

# Trainer class that uses the policy above by default; the resulting class is
# <class 'ray.rllib.agents.trainer_template.MyCustomTrainer'>
MyTrainer = build_trainer(
    name="MyCustomTrainer", default_policy=MyTFPolicy)
if __name__ == "__main__":
    ray.init()
    args = parser.parse_args()

    # Make the custom model available under the name used in the config.
    ModelCatalog.register_custom_model("eager_model", EagerModel)

    stop_criteria = {"training_iteration": args.iters}
    trainer_config = {
        "env": "CartPole-v0",
        "num_workers": 0,
        "model": {
            "custom_model": "eager_model",
        },
    }
    tune.run(MyTrainer, stop=stop_criteria, config=trainer_config)
@@ -167,6 +167,8 @@ class DynamicTFPolicy(TFPolicy):
batch_divisibility_req=batch_divisibility_req)
# Phase 2 init
self._needs_eager_conversion = set()
self._eager_tensors = {}
before_loss_init(self, obs_space, action_space, config)
if not existing_inputs:
self._initialize_loss()
@@ -178,10 +180,26 @@ class DynamicTFPolicy(TFPolicy):
"""
return self.input_dict
def convert_to_eager(self, tensor):
    """Convert a graph tensor accessed in the loss to an eager tensor.

    Experimental.

    When not executing eagerly, the tensor is recorded in
    `self._needs_eager_conversion` and returned unchanged. When executing
    eagerly, the tensor is looked up in `self._eager_tensors` and the
    stored value is returned.
    """
    if not tf.executing_eagerly():
        # Graph mode: remember that the loss reads this tensor.
        self._needs_eager_conversion.add(tensor)
        return tensor
    return self._eager_tensors[tensor]
@override(TFPolicy)
def copy(self, existing_inputs):
"""Creates a copy of self using existing input placeholders."""
if self.config["use_eager"]:
raise ValueError(
"eager not implemented for multi-GPU, try setting "
"`simple_optimizer: true`")
# Note that there might be RNN state inputs at the end of the list
if self._state_inputs:
num_state_inputs = len(self._state_inputs) + 1
@@ -297,6 +315,38 @@ class DynamicTFPolicy(TFPolicy):
loss = self._do_loss_init(batch_tensors)
for k in sorted(batch_tensors.accessed_keys):
loss_inputs.append((k, batch_tensors[k]))
# XXX experimental support for automatically eagerifying the loss.
# The main limitation right now is that TF doesn't support mixing eager
# and non-eager tensors, so losses that read non-eager tensors through
# `policy` need to use `policy.convert_to_eager(tensor)`.
if self.config["use_eager"]:
if not self.model:
raise ValueError("eager not implemented in this case")
graph_tensors = list(self._needs_eager_conversion)
def gen_loss(model_outputs, *args):
# fill in the batch tensor dict with eager tensors
eager_inputs = dict(
zip([k for (k, v) in loss_inputs],
args[:len(loss_inputs)]))
# fill in the eager versions of all accessed graph tensors
self._eager_tensors = dict(
zip(graph_tensors, args[len(loss_inputs):]))
# patch the action dist to use eager mode tensors
self.action_dist.inputs = model_outputs
return self._loss_fn(self, eager_inputs)
# TODO(ekl) also handle the stats funcs
loss = tf.py_function(
gen_loss,
# cast works around TypeError: Cannot convert provided value
# to EagerTensor. Provided value: 0.0 Requested dtype: int64
[self.model.outputs] + [
tf.cast(v, tf.float32) for (k, v) in loss_inputs
] + [tf.cast(t, tf.float32) for t in graph_tensors],
tf.float32)
TFPolicy._initialize_loss(self, loss, loss_inputs)
if self._grad_stats_fn:
self._stats_fetches.update(self._grad_stats_fn(self, self._grads))