[rllib] Implement GAE for PPO (#849)

* make information available for GAE

* buggy version of GAE estimator

* fix

* add more logging and reweight losses

* fix logging

* fix loss

* adapt advantage calculation

* update gae

* standardize returns

* don't normalize td lambda ret

* fix

* don't standardize advantages

* do standardization earlier

* different standardization

* initializer

* drop into the debugger

* fix tensorflow broadcasting bug

* vf clipping

* don't standardize tdlambdaret

* different standardization

* use huber loss for value function

* refactor -- first half

* it runs

* fix

* update

* documentation

* linting and tests

* fix linting

* naming

* fix

* linting

* fix

* remove prefix madness

* fixes

* fix

* add value function example

* fix linting

* remove newline
This commit is contained in:
Philipp Moritz
2017-08-23 20:35:47 -07:00
committed by Robert Nishihara
parent c943ecaa42
commit 791bee343f
6 changed files with 184 additions and 47 deletions
+57 -20
View File
@@ -16,7 +16,8 @@ from ray.rllib.models import ModelCatalog
from ray.rllib.policy_gradient.env import BatchedEnv
from ray.rllib.policy_gradient.loss import ProximalPolicyLoss
from ray.rllib.policy_gradient.filter import MeanStdFilter
from ray.rllib.policy_gradient.rollout import rollouts, add_advantage_values
from ray.rllib.policy_gradient.rollout import (
rollouts, add_return_values, add_advantage_values)
from ray.rllib.policy_gradient.utils import flatten, concatenate
# TODO(pcm): Make sure that both observation_filter and reward_filter
@@ -51,19 +52,25 @@ class Agent(object):
config_proto = tf.ConfigProto(**config["tf_session_args"])
self.preprocessor = self.env.preprocessor
self.sess = tf.Session(config=config_proto)
if config["use_tf_debugger"] and not is_remote:
if config["tf_debug_inf_or_nan"] and not is_remote:
self.sess = tf_debug.LocalCLIDebugWrapperSession(self.sess)
self.sess.add_tensor_filter(
"has_inf_or_nan", tf_debug.has_inf_or_nan)
# Defines the training inputs.
# Defines the training inputs:
# The coefficient of the KL penalty.
self.kl_coeff = tf.placeholder(
name="newkl", shape=(), dtype=tf.float32)
# The shape of the preprocessed observations.
self.preprocessor_shape = self.preprocessor.transform_shape(
self.env.observation_space.shape)
# The input observations.
self.observations = tf.placeholder(
tf.float32, shape=(None,) + self.preprocessor_shape)
# Targets of the value function.
self.returns = tf.placeholder(tf.float32, shape=(None,))
# Advantage values in the policy gradient estimator.
self.advantages = tf.placeholder(tf.float32, shape=(None,))
action_space = self.env.action_space
@@ -78,8 +85,11 @@ class Agent(object):
"currently not supported")
self.distribution_class, self.logit_dim = ModelCatalog.get_action_dist(
action_space)
# Log probabilities from the policy before the policy update.
self.prev_logits = tf.placeholder(
tf.float32, shape=(None, self.logit_dim))
# Value function predictions before the policy update.
self.prev_vf_preds = tf.placeholder(tf.float32, shape=(None,))
assert config["sgd_batchsize"] % len(devices) == 0, \
"Batch size must be evenly divisible by devices"
@@ -90,18 +100,18 @@ class Agent(object):
self.batch_size = config["sgd_batchsize"]
self.per_device_batch_size = int(self.batch_size / len(devices))
def build_loss(obs, advs, acts, plog):
def build_loss(obs, rets, advs, acts, plog, pvf_preds):
return ProximalPolicyLoss(
self.env.observation_space, self.env.action_space,
obs, advs, acts, plog, self.logit_dim,
obs, rets, advs, acts, plog, pvf_preds, self.logit_dim,
self.kl_coeff, self.distribution_class, self.config,
self.sess)
self.par_opt = LocalSyncParallelOptimizer(
tf.train.AdamOptimizer(self.config["sgd_stepsize"]),
self.devices,
[self.observations, self.advantages, self.actions,
self.prev_logits],
[self.observations, self.returns, self.advantages,
self.actions, self.prev_logits, self.prev_vf_preds],
self.per_device_batch_size,
build_loss,
self.logdir)
@@ -110,12 +120,20 @@ class Agent(object):
with tf.name_scope("test_outputs"):
policies = self.par_opt.get_device_losses()
self.mean_loss = tf.reduce_mean(
tf.stack(values=[policy.loss for policy in policies]), 0)
tf.stack(values=[
policy.loss for policy in policies]), 0)
self.mean_policy_loss = tf.reduce_mean(
tf.stack(values=[
policy.mean_policy_loss for policy in policies]), 0)
self.mean_vf_loss = tf.reduce_mean(
tf.stack(values=[
policy.mean_vf_loss for policy in policies]), 0)
self.mean_kl = tf.reduce_mean(
tf.stack(values=[policy.mean_kl for policy in policies]), 0)
tf.stack(values=[
policy.mean_kl for policy in policies]), 0)
self.mean_entropy = tf.reduce_mean(
tf.stack(
values=[policy.mean_entropy for policy in policies]), 0)
tf.stack(values=[
policy.mean_entropy for policy in policies]), 0)
# References to the model weights
self.common_policy = self.par_opt.get_common_loss()
@@ -127,20 +145,36 @@ class Agent(object):
self.sess.run(tf.global_variables_initializer())
def load_data(self, trajectories, full_trace):
return self.par_opt.load_data(
self.sess,
[trajectories["observations"],
trajectories["advantages"],
trajectories["actions"].squeeze(),
trajectories["logprobs"]],
full_trace=full_trace)
if self.config["use_gae"]:
return self.par_opt.load_data(
self.sess,
[trajectories["observations"],
trajectories["td_lambda_returns"],
trajectories["advantages"],
trajectories["actions"].squeeze(),
trajectories["logprobs"],
trajectories["vf_preds"]],
full_trace=full_trace)
else:
dummy = np.zeros((trajectories["observations"].shape[0],))
return self.par_opt.load_data(
self.sess,
[trajectories["observations"],
dummy,
trajectories["returns"],
trajectories["actions"].squeeze(),
trajectories["logprobs"],
dummy],
full_trace=full_trace)
def run_sgd_minibatch(
self, batch_index, kl_coeff, full_trace, file_writer):
return self.par_opt.optimize(
self.sess,
batch_index,
extra_ops=[self.mean_loss, self.mean_kl, self.mean_entropy],
extra_ops=[
self.mean_loss, self.mean_policy_loss, self.mean_vf_loss,
self.mean_kl, self.mean_entropy],
extra_feed_dict={self.kl_coeff: kl_coeff},
file_writer=file_writer if full_trace else None)
@@ -155,7 +189,10 @@ class Agent(object):
trajectory = rollouts(
self.common_policy,
self.env, horizon, self.observation_filter, self.reward_filter)
add_advantage_values(trajectory, gamma, lam, self.reward_filter)
if self.config["use_gae"]:
add_advantage_values(trajectory, gamma, lam, self.reward_filter)
else:
add_return_values(trajectory, gamma, self.reward_filter)
return trajectory
def compute_steps(self, gamma, lam, horizon, min_steps_per_task=-1):
+46 -5
View File
@@ -12,7 +12,8 @@ class ProximalPolicyLoss(object):
def __init__(
self, observation_space, action_space,
observations, advantages, actions, prev_logits, logit_dim,
observations, returns, advantages, actions,
prev_logits, prev_vf_preds, logit_dim,
kl_coeff, distribution_class, config, sess):
assert (isinstance(action_space, gym.spaces.Discrete) or
isinstance(action_space, gym.spaces.Box))
@@ -26,6 +27,17 @@ class ProximalPolicyLoss(object):
self.curr_dist = distribution_class(self.curr_logits)
self.sampler = self.curr_dist.sample()
if config["use_gae"]:
vf_config = config["model"].copy()
# Do not split the last layer of the value function into
# mean parameters and standard deviation parameters and
# do not make the standard deviations free variables.
vf_config["free_logstd"] = False
with tf.variable_scope("value_function"):
self.value_function = ModelCatalog.get_model(
observations, 1, vf_config).outputs
self.value_function = tf.reshape(self.value_function, [-1])
# Make loss functions.
self.ratio = tf.exp(self.curr_dist.logp(actions) -
self.prev_dist.logp(actions))
@@ -37,12 +49,41 @@ class ProximalPolicyLoss(object):
self.surr2 = tf.clip_by_value(self.ratio, 1 - config["clip_param"],
1 + config["clip_param"]) * advantages
self.surr = tf.minimum(self.surr1, self.surr2)
self.loss = tf.reduce_mean(-self.surr + kl_coeff * self.kl -
config["entropy_coeff"] * self.entropy)
self.mean_policy_loss = tf.reduce_mean(-self.surr)
if config["use_gae"]:
# We use a huber loss here to be more robust against outliers,
# which seem to occur when the rollouts get longer (the variance
# scales superlinearly with the length of the rollout)
self.vf_loss1 = tf.square(self.value_function - returns)
vf_clipped = prev_vf_preds + tf.clip_by_value(
self.value_function - prev_vf_preds,
-config["clip_param"], config["clip_param"])
self.vf_loss2 = tf.square(vf_clipped - returns)
self.vf_loss = tf.minimum(self.vf_loss1, self.vf_loss2)
self.mean_vf_loss = tf.reduce_mean(self.vf_loss)
self.loss = tf.reduce_mean(
-self.surr + kl_coeff * self.kl +
config["vf_loss_coeff"] * self.vf_loss -
config["entropy_coeff"] * self.entropy)
else:
self.mean_vf_loss = tf.constant(0.0)
self.loss = tf.reduce_mean(
-self.surr +
kl_coeff * self.kl -
config["entropy_coeff"] * self.entropy)
self.sess = sess
def compute_actions(self, observations):
return self.sess.run([self.sampler, self.curr_logits],
if config["use_gae"]:
self.policy_results = [
self.sampler, self.curr_logits, self.value_function]
else:
self.policy_results = [
self.sampler, self.curr_logits, tf.constant("NA")]
def compute(self, observations):
return self.sess.run(self.policy_results,
feed_dict={self.observations: observations})
def loss(self):
@@ -7,6 +7,7 @@ import time
import numpy as np
import tensorflow as tf
from tensorflow.python import debug as tf_debug
import ray
from ray.rllib.common import Algorithm, TrainingResult
@@ -20,6 +21,9 @@ DEFAULT_CONFIG = {
"gamma": 0.995,
# Number of steps after which the rollout gets cut
"horizon": 2000,
# If true, use the Generalized Advantage Estimator (GAE)
# with a value function, see https://arxiv.org/pdf/1506.02438.pdf.
"use_gae": True,
# GAE(lambda) parameter
"lambda": 1.0,
# Initial coefficient for KL divergence
@@ -40,6 +44,8 @@ DEFAULT_CONFIG = {
"rollout_batchsize": 1,
# Total SGD batch size across all devices for SGD
"sgd_batchsize": 128,
# Coefficient of the value function loss
"vf_loss_coeff": 1.0,
# Coefficient of the entropy regularizer
"entropy_coeff": 0.0,
# PPO clip parameter
@@ -58,9 +64,11 @@ DEFAULT_CONFIG = {
"full_trace_nth_sgd_batch": -1,
# Whether to profile data loading
"full_trace_data_load": False,
# Outer loop iteration index when we drop into the TensorFlow debugger
"tf_debug_iteration": -1,
# If this is True, the TensorFlow debugger is invoked if an Inf or NaN
# is detected
"use_tf_debugger": False,
"tf_debug_inf_or_nan": False,
# If True, we write checkpoints and tensorflow logging
"write_logs": True,
# Name of the model checkpoint file
@@ -125,13 +133,22 @@ class PolicyGradient(Algorithm):
simple_value=traj_len_mean)])
file_writer.add_summary(traj_stats, self.global_step)
self.global_step += 1
trajectory["advantages"] = ((trajectory["advantages"] -
trajectory["advantages"].mean()) /
trajectory["advantages"].std())
def standardized(value):
# Divide by the maximum of value.std() and 1e-4
# to guard against the case where all values are equal
return (value - value.mean()) / max(1e-4, value.std())
if config["use_gae"]:
trajectory["advantages"] = standardized(trajectory["advantages"])
else:
trajectory["returns"] = standardized(trajectory["returns"])
rollouts_end = time.time()
print("Computing policy (iterations=" + str(config["num_sgd_iter"]) +
", stepsize=" + str(config["sgd_stepsize"]) + "):")
names = ["iter", "loss", "kl", "entropy"]
names = [
"iter", "total loss", "policy loss", "vf loss", "kl", "entropy"]
print(("{:>15}" * len(names)).format(*names))
trajectory = shuffle(trajectory)
shuffle_end = time.time()
@@ -148,26 +165,35 @@ class PolicyGradient(Algorithm):
batch_index = 0
num_batches = (
int(tuples_per_device) // int(model.per_device_batch_size))
loss, kl, entropy = [], [], []
loss, policy_loss, vf_loss, kl, entropy = [], [], [], [], []
permutation = np.random.permutation(num_batches)
# Prepare to drop into the debugger
if j == config["tf_debug_iteration"]:
model.sess = tf_debug.LocalCLIDebugWrapperSession(model.sess)
while batch_index < num_batches:
full_trace = (
i == 0 and j == 0 and
batch_index == config["full_trace_nth_sgd_batch"])
batch_loss, batch_kl, batch_entropy = model.run_sgd_minibatch(
permutation[batch_index] * model.per_device_batch_size,
self.kl_coeff, full_trace,
file_writer if write_tf_logs else None)
batch_loss, batch_policy_loss, batch_vf_loss, batch_kl, \
batch_entropy = model.run_sgd_minibatch(
permutation[batch_index] * model.per_device_batch_size,
self.kl_coeff, full_trace,
file_writer if write_tf_logs else None)
loss.append(batch_loss)
policy_loss.append(batch_policy_loss)
vf_loss.append(batch_vf_loss)
kl.append(batch_kl)
entropy.append(batch_entropy)
batch_index += 1
loss = np.mean(loss)
policy_loss = np.mean(policy_loss)
vf_loss = np.mean(vf_loss)
kl = np.mean(kl)
entropy = np.mean(entropy)
sgd_end = time.time()
print(
"{:>15}{:15.5e}{:15.5e}{:15.5e}".format(i, loss, kl, entropy))
"{:>15}{:15.5e}{:15.5e}{:15.5e}{:15.5e}{:15.5e}".format(
i, loss, policy_loss, vf_loss, kl, entropy))
values = []
if i == config["num_sgd_iter"] - 1:
+34 -10
View File
@@ -33,14 +33,16 @@ def rollouts(policy, env, horizon, observation_filter=NoFilter(),
observation = observation_filter(env.reset())
done = np.array(env.batchsize * [False])
t = 0
observations = []
raw_rewards = [] # Empirical rewards
actions = []
logprobs = []
dones = []
observations = [] # Filtered observations
raw_rewards = [] # Empirical rewards
actions = [] # Actions sampled by the policy
logprobs = [] # Last layer of the policy network
vf_preds = [] # Value function predictions
dones = [] # Has this rollout terminated?
while not done.all() and t < horizon:
action, logprob = policy.compute_actions(observation)
while True:
action, logprob, vfpred = policy.compute(observation)
vf_preds.append(vfpred)
observations.append(observation[None])
actions.append(action[None])
logprobs.append(logprob[None])
@@ -49,27 +51,49 @@ def rollouts(policy, env, horizon, observation_filter=NoFilter(),
raw_rewards.append(raw_reward[None])
dones.append(done[None])
t += 1
if done.all() or t >= horizon:
break
return {"observations": np.vstack(observations),
"raw_rewards": np.vstack(raw_rewards),
"actions": np.vstack(actions),
"logprobs": np.vstack(logprobs),
"vf_preds": np.vstack(vf_preds),
"dones": np.vstack(dones)}
def add_return_values(trajectory, gamma, reward_filter):
rewards = trajectory["raw_rewards"]
dones = trajectory["dones"]
returns = np.zeros_like(rewards)
last_return = np.zeros(rewards.shape[1], dtype="float32")
for t in reversed(range(len(rewards) - 1)):
last_return = rewards[t, :] * (1 - dones[t, :]) + gamma * last_return
returns[t, :] = last_return
reward_filter(returns[t, :])
trajectory["returns"] = returns
def add_advantage_values(trajectory, gamma, lam, reward_filter):
rewards = trajectory["raw_rewards"]
vf_preds = trajectory["vf_preds"]
dones = trajectory["dones"]
advantages = np.zeros_like(rewards)
last_advantage = np.zeros(rewards.shape[1], dtype="float32")
for t in reversed(range(len(rewards))):
delta = rewards[t, :] * (1 - dones[t, :])
last_advantage = delta + gamma * lam * last_advantage
for t in reversed(range(len(rewards) - 1)):
delta = rewards[t, :] * (1 - dones[t, :]) + \
gamma * vf_preds[t+1, :] * (1 - dones[t+1, :]) - vf_preds[t, :]
last_advantage = \
delta + gamma * lam * last_advantage * (1 - dones[t+1, :])
advantages[t, :] = last_advantage
reward_filter(advantages[t, :])
trajectory["advantages"] = advantages
trajectory["td_lambda_returns"] = \
trajectory["advantages"] + trajectory["vf_preds"]
def collect_samples(agents,
+3 -1
View File
@@ -6,7 +6,9 @@ python train.py --env CartPole-v1 --config '{"kl_coeff": 1.0, "num_sgd_iter": 20
python train.py --env Walker2d-v1 --config '{"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_agents": 64}' --alg PolicyGradient --upload-dir s3://bucketname/
python train.py --env Humanoid-v1 --config '{"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_agents": 64, "model": {"free_logstd": true}}' --alg PolicyGradient --upload-dir s3://bucketname/
python train.py --env Humanoid-v1 --config '{"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_agents": 64, "model": {"free_logstd": true}, "use_gae": false}' --alg PolicyGradient --upload-dir s3://bucketname/
python train.py --env Humanoid-v1 --config '{"lambda": 0.95, "clip_param": 0.2, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "horizon": 5000, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_agents": 64, "model": {"free_logstd": true}, "write_logs": false}' --alg PolicyGradient --upload-dir s3://bucketname/
python train.py --env PongNoFrameskip-v0 --alg DQN --upload-dir s3://bucketname/
python train.py --env PongDeterministic-v0 --alg A3C --upload-dir s3://bucketname/