[rllib] Add debug info back to PPO and fix optimizer compatibility (#2366)

This commit is contained in:
Eric Liang
2018-07-12 19:22:46 +02:00
committed by Richard Liaw
parent 8ea926c266
commit b316afeb43
14 changed files with 122 additions and 97 deletions
+2 -2
View File
@@ -80,11 +80,11 @@ class A3CAgent(Agent):
def _init(self):
if self.config["use_pytorch"]:
from ray.rllib.agents.a3c.a3c_torch_policy import \
from ray.rllib.agents.a3c.a3c_torch_policy_graph import \
A3CTorchPolicyGraph
policy_cls = A3CTorchPolicyGraph
else:
from ray.rllib.agents.a3c.a3c_tf_policy import A3CPolicyGraph
from ray.rllib.agents.a3c.a3c_tf_policy_graph import A3CPolicyGraph
policy_cls = A3CPolicyGraph
self.local_evaluator = self.make_local_evaluator(
+2 -2
View File
@@ -20,7 +20,7 @@ COMMON_CONFIG = {
# Number of steps after which the rollout gets cut
"horizon": None,
# Number of environments to evaluate vectorwise per worker.
"num_envs": 1,
"num_envs_per_worker": 1,
# Number of actors used for parallelism
"num_workers": 2,
# Default sample batch size
@@ -145,7 +145,7 @@ class Agent(Trainable):
preprocessor_pref=config["preprocessor_pref"],
sample_async=config["sample_async"],
compress_observations=config["compress_observations"],
num_envs=config["num_envs"],
num_envs=config["num_envs_per_worker"],
observation_filter=config["observation_filter"],
env_config=config["env_config"],
model_config=config["model"],
+29 -44
View File
@@ -3,14 +3,13 @@ from __future__ import division
from __future__ import print_function
import os
import numpy as np
import pickle
import ray
from ray.rllib.agents import Agent, with_common_config
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicyGraph
from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph
from ray.rllib.utils import FilterManager
from ray.rllib.optimizers.multi_gpu_optimizer import LocalMultiGPUOptimizer
from ray.rllib.optimizers import SyncSamplesOptimizer, LocalMultiGPUOptimizer
from ray.tune.trial import Resources
DEFAULT_CONFIG = with_common_config({
@@ -27,7 +26,7 @@ DEFAULT_CONFIG = with_common_config({
"num_sgd_iter": 30,
# Stepsize of SGD
"sgd_stepsize": 5e-5,
# Total SGD batch size across all devices for SGD
# Total SGD batch size across all devices (multi-gpu only)
"sgd_batchsize": 128,
# Coefficient of the value function loss
"vf_loss_coeff": 1.0,
@@ -47,6 +46,15 @@ DEFAULT_CONFIG = with_common_config({
"batch_mode": "complete_episodes",
# Which observation filter to apply to the observation
"observation_filter": "MeanStdFilter",
# Use the sync samples optimizer instead of the multi-gpu one
"simple_optimizer": False,
# Override model config
"model": {
# Use LSTM model (note: requires simple optimizer for now).
"use_lstm": False,
# Max seq length for LSTM training.
"max_seq_len": 20,
},
})
@@ -67,57 +75,34 @@ class PPOAgent(Agent):
def _init(self):
self.local_evaluator = self.make_local_evaluator(
self.env_creator, PPOTFPolicyGraph)
self.env_creator, PPOPolicyGraph)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, PPOTFPolicyGraph, self.config["num_workers"],
self.env_creator, PPOPolicyGraph, self.config["num_workers"],
{"num_cpus": self.config["num_cpus_per_worker"],
"num_gpus": self.config["num_gpus_per_worker"]})
self.optimizer = LocalMultiGPUOptimizer(
self.local_evaluator, self.remote_evaluators,
{"sgd_batch_size": self.config["sgd_batchsize"],
"sgd_stepsize": self.config["sgd_stepsize"],
"num_sgd_iter": self.config["num_sgd_iter"],
"timesteps_per_batch": self.config["timesteps_per_batch"]})
if self.config["simple_optimizer"]:
self.optimizer = SyncSamplesOptimizer(
self.local_evaluator, self.remote_evaluators,
{"num_sgd_iter": self.config["num_sgd_iter"]})
else:
self.optimizer = LocalMultiGPUOptimizer(
self.local_evaluator, self.remote_evaluators,
{"sgd_batch_size": self.config["sgd_batchsize"],
"sgd_stepsize": self.config["sgd_stepsize"],
"num_sgd_iter": self.config["num_sgd_iter"],
"timesteps_per_batch": self.config["timesteps_per_batch"],
"standardize_fields": ["advantages"]})
def _train(self):
prev_steps = self.optimizer.num_steps_sampled
def postprocess_samples(batch):
# Divide by the maximum of value.std() and 1e-4
# to guard against the case where all values are equal
value = batch["advantages"]
standardized = (value - value.mean()) / max(1e-4, value.std())
batch.data["advantages"] = standardized
batch.shuffle()
dummy = np.zeros_like(batch["advantages"])
if not self.config["use_gae"]:
batch.data["value_targets"] = dummy
batch.data["vf_preds"] = dummy
extra_fetches = self.optimizer.step(postprocess_fn=postprocess_samples)
kl = np.array(extra_fetches["kl"]).mean(axis=1)[-1]
total_loss = np.array(extra_fetches["total_loss"]).mean(axis=1)[-1]
policy_loss = np.array(extra_fetches["policy_loss"]).mean(axis=1)[-1]
vf_loss = np.array(extra_fetches["vf_loss"]).mean(axis=1)[-1]
entropy = np.array(extra_fetches["entropy"]).mean(axis=1)[-1]
newkl = self.local_evaluator.for_policy(lambda pi: pi.update_kl(kl))
info = {
"kl_divergence": kl,
"kl_coefficient": newkl,
"total_loss": total_loss,
"policy_loss": policy_loss,
"vf_loss": vf_loss,
"entropy": entropy,
}
fetches = self.optimizer.step()
self.local_evaluator.for_policy(lambda pi: pi.update_kl(fetches["kl"]))
FilterManager.synchronize(
self.local_evaluator.filters, self.remote_evaluators)
res = self.optimizer.collect_metrics()
res = res._replace(
timesteps_this_iter=self.optimizer.num_steps_sampled - prev_steps,
info=dict(info, **res.info))
info=dict(fetches, **res.info))
return res
def _stop(self):
@@ -11,7 +11,7 @@ from ray.rllib.models.catalog import ModelCatalog
class PPOLoss(object):
def __init__(
self, action_space, value_targets, advantages, actions, logprobs,
self, action_space, value_targets, advantages, actions, logits,
vf_preds, curr_action_dist, value_fn, cur_kl_coeff,
entropy_coeff=0, clip_param=0.1, vf_loss_coeff=1.0, use_gae=True):
"""Constructs the loss for Proximal Policy Objective.
@@ -24,7 +24,7 @@ class PPOLoss(object):
from previous model evaluation.
advantages (Placeholder): Placeholder for calculated advantages
from previous model evaluation.
logprobs (Placeholder): Placeholder for logits output from
logits (Placeholder): Placeholder for logits output from
previous model evaluation.
vf_preds (Placeholder): Placeholder for value function output
from previous model evaluation.
@@ -39,7 +39,7 @@ class PPOLoss(object):
use_gae (bool): If true, use the Generalized Advantage Estimator.
"""
dist_cls, _ = ModelCatalog.get_action_dist(action_space)
prev_dist = dist_cls(logprobs)
prev_dist = dist_cls(logits)
# Make loss functions.
logp_ratio = tf.exp(
curr_action_dist.logp(actions) - prev_dist.logp(actions))
@@ -60,7 +60,7 @@ class PPOLoss(object):
vf_clipped = vf_preds + tf.clip_by_value(
value_fn - vf_preds, -clip_param, clip_param)
vf_loss2 = tf.square(vf_clipped - value_targets)
vf_loss = tf.minimum(vf_loss1, vf_loss2)
vf_loss = tf.maximum(vf_loss1, vf_loss2)
self.mean_vf_loss = tf.reduce_mean(vf_loss)
loss = tf.reduce_mean(
-surrogate_loss + cur_kl_coeff*action_kl +
@@ -73,7 +73,7 @@ class PPOLoss(object):
self.loss = loss
class PPOTFPolicyGraph(TFPolicyGraph):
class PPOPolicyGraph(TFPolicyGraph):
def __init__(self, observation_space, action_space,
config, existing_inputs=None):
"""
@@ -89,46 +89,48 @@ class PPOTFPolicyGraph(TFPolicyGraph):
self.config = config
self.kl_coeff_val = self.config["kl_coeff"]
self.kl_target = self.config["kl_target"]
dist_cls, logit_dim = ModelCatalog.get_action_dist(
action_space)
dist_cls, logit_dim = ModelCatalog.get_action_dist(action_space)
if existing_inputs:
self.loss_in = existing_inputs
obs_ph, value_targets_ph, adv_ph, act_ph, \
logprobs_ph, vf_preds_ph = [ph for _, ph in existing_inputs]
logits_ph, vf_preds_ph = [ph for _, ph in existing_inputs]
else:
obs_ph = tf.placeholder(
tf.float32, name="obs", shape=(None,)+observation_space.shape)
# Targets of the value function.
value_targets_ph = tf.placeholder(
tf.float32, name="value_targets", shape=(None,))
# Advantage values in the policy gradient estimator.
adv_ph = tf.placeholder(
tf.float32, name="advantages", shape=(None,))
act_ph = ModelCatalog.get_action_placeholder(action_space)
# Log probabilities from the policy before the policy update.
logprobs_ph = tf.placeholder(
tf.float32, name="logprobs", shape=(None, logit_dim))
# Value function predictions before the policy update.
logits_ph = tf.placeholder(
tf.float32, name="logits", shape=(None, logit_dim))
vf_preds_ph = tf.placeholder(
tf.float32, name="vf_preds", shape=(None,))
value_targets_ph = tf.placeholder(
tf.float32, name="value_targets", shape=(None,))
self.loss_in = [
("obs", obs_ph),
("value_targets", value_targets_ph),
("advantages", adv_ph),
("actions", act_ph),
("logprobs", logprobs_ph),
("vf_preds", vf_preds_ph)
("logits", logits_ph),
("vf_preds", vf_preds_ph),
]
# TODO(ekl) feed RNN states in here
self.model = ModelCatalog.get_model(
obs_ph, logit_dim, self.config["model"])
# LSTM support
if not existing_inputs:
for i, ph in enumerate(self.model.state_in):
self.loss_in.append(("state_in_{}".format(i), ph))
# KL Coefficient
self.kl_coeff = tf.get_variable(
initializer=tf.constant_initializer(self.kl_coeff_val),
name="kl_coeff", shape=(), trainable=False, dtype=tf.float32)
self.logits = ModelCatalog.get_model(
obs_ph, logit_dim, self.config["model"]).outputs
self.logits = self.model.outputs
curr_action_dist = dist_cls(self.logits)
self.sampler = curr_action_dist.sample()
if self.config["use_gae"]:
@@ -137,16 +139,17 @@ class PPOTFPolicyGraph(TFPolicyGraph):
# mean parameters and standard deviation parameters and
# do not make the standard deviations free variables.
vf_config["free_log_std"] = False
vf_config["use_lstm"] = False
with tf.variable_scope("value_function"):
self.value_function = ModelCatalog.get_model(
obs_ph, 1, vf_config).outputs
self.value_function = tf.reshape(self.value_function, [-1])
else:
self.value_function = tf.constant("NA")
self.value_function = tf.zeros(shape=tf.shape(obs_ph)[:1])
self.loss_obj = PPOLoss(
action_space, value_targets_ph, adv_ph, act_ph,
logprobs_ph, vf_preds_ph,
logits_ph, vf_preds_ph,
curr_action_dist, self.value_function, self.kl_coeff,
entropy_coeff=self.config["entropy_coeff"],
clip_param=self.config["clip_param"],
@@ -158,19 +161,22 @@ class PPOTFPolicyGraph(TFPolicyGraph):
self, observation_space, action_space,
self.sess, obs_input=obs_ph,
action_sampler=self.sampler, loss=self.loss_obj.loss,
loss_inputs=self.loss_in,
is_training=self.is_training)
loss_inputs=self.loss_in, is_training=self.is_training,
state_inputs=self.model.state_in,
state_outputs=self.model.state_out, seq_lens=self.model.seq_lens)
self.sess.run(tf.global_variables_initializer())
def copy(self, existing_inputs):
"""Creates a copy of self using existing input placeholders."""
return PPOTFPolicyGraph(
return PPOPolicyGraph(
None, self.action_space, self.config,
existing_inputs=existing_inputs)
def extra_compute_action_fetches(self):
return {"vf_preds": self.value_function, "logprobs": self.logits}
return {"vf_preds": self.value_function, "logits": self.logits}
def extra_apply_grad_fetches(self):
def extra_compute_grad_fetches(self):
return {
"total_loss": self.loss_obj.loss,
"policy_loss": self.loss_obj.mean_policy_loss,
@@ -194,6 +200,12 @@ class PPOTFPolicyGraph(TFPolicyGraph):
self.config["lambda"], use_gae=self.config["use_gae"])
return batch
def optimizer(self):
return tf.train.AdamOptimizer(self.config["sgd_stepsize"])
def gradients(self, optimizer):
return optimizer.compute_gradients(
self._loss, colocate_gradients_with_ops=True)
def get_initial_state(self):
return self.model.state_init
@@ -44,6 +44,8 @@ def compute_advantages(rollout, last_r, gamma, lambda_=1.0, use_gae=True):
rewards_plus_v = np.concatenate(
[rollout["rewards"], np.array([last_r])])
traj["advantages"] = discount(rewards_plus_v, gamma)[:-1]
# TODO(ekl): support using a critic without GAE
traj["value_targets"] = np.zeros_like(traj["advantages"])
traj["advantages"] = traj["advantages"].copy().astype(np.float32)
@@ -31,7 +31,7 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
"""
def _init(self, sgd_batch_size=128, sgd_stepsize=5e-5, num_sgd_iter=10,
timesteps_per_batch=1024):
timesteps_per_batch=1024, standardize_fields=[]):
self.batch_size = sgd_batch_size
self.sgd_stepsize = sgd_stepsize
self.num_sgd_iter = num_sgd_iter
@@ -50,6 +50,7 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
self.load_timer = TimerStat()
self.grad_timer = TimerStat()
self.update_weights_timer = TimerStat()
self.standardize_fields = standardize_fields
print("LocalMultiGPUOptimizer devices", self.devices)
@@ -58,6 +59,8 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
self.policy = self.local_evaluator.policy_map["default"]
assert isinstance(self.policy, TFPolicyGraph), \
"Only TF policies are supported"
assert len(self.policy.get_initial_state()) == 0, \
"No RNN support yet for multi-gpu. Try the simple optimizer."
# per-GPU graph copies created below must share vars with the policy
# reuse is set to AUTO_REUSE because Adam nodes are created after
@@ -76,7 +79,7 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
self.sess = self.local_evaluator.tf_sess
self.sess.run(tf.global_variables_initializer())
def step(self, postprocess_fn=None):
def step(self):
with self.update_weights_timer:
if self.remote_evaluators:
weights = ray.put(self.local_evaluator.get_weights())
@@ -93,8 +96,11 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
samples = self.local_evaluator.sample()
self._check_not_multiagent(samples)
if postprocess_fn:
postprocess_fn(samples)
for field in self.standardize_fields:
value = samples[field]
standardized = (value - value.mean()) / max(1e-4, value.std())
samples[field] = standardized
samples.shuffle()
with self.load_timer:
tuples_per_device = self.par_opt.load_data(
@@ -102,26 +108,23 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
samples.columns([key for key, _ in self.policy.loss_inputs()]))
with self.grad_timer:
all_extra_fetches = defaultdict(list)
num_batches = (
int(tuples_per_device) // int(self.per_device_batch_size))
print("== sgd epochs ==")
for i in range(self.num_sgd_iter):
iter_extra_fetches = defaultdict(list)
permutation = np.random.permutation(num_batches)
for batch_index in range(num_batches):
# TODO(ekl) support ppo's debugging features, e.g.
# printing the current loss and tracing
batch_fetches = self.par_opt.optimize(
self.sess,
permutation[batch_index] * self.per_device_batch_size)
for k, v in batch_fetches.items():
iter_extra_fetches[k] += [v]
for k, v in iter_extra_fetches.items():
all_extra_fetches[k] += [v]
iter_extra_fetches[k].append(v)
print(i, _averaged(iter_extra_fetches))
self.num_steps_sampled += samples.count
self.num_steps_trained += samples.count
return all_extra_fetches
return _averaged(iter_extra_fetches)
def stats(self):
return dict(PolicyOptimizer.stats(self), **{
@@ -130,3 +133,11 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
"grad_time_ms": round(1000 * self.grad_timer.mean, 3),
"update_time_ms": round(1000 * self.update_weights_timer.mean, 3),
})
def _averaged(kv):
out = {}
for k, v in kv.items():
if v[0] is not None:
out[k] = np.mean(v)
return out
@@ -62,6 +62,9 @@ class PolicyOptimizer(object):
This should run for long enough to minimize call overheads (i.e., at
least a couple seconds), but short enough to return control
periodically to callers (i.e., at most a few tens of seconds).
Returns:
fetches (dict|None): Optional fetches from compute grads calls.
"""
raise NotImplementedError
@@ -17,11 +17,12 @@ class SyncSamplesOptimizer(PolicyOptimizer):
model weights are then broadcast to all remote evaluators.
"""
def _init(self):
def _init(self, num_sgd_iter=1):
self.update_weights_timer = TimerStat()
self.sample_timer = TimerStat()
self.grad_timer = TimerStat()
self.throughput = RunningStat()
self.num_sgd_iter = num_sgd_iter
def step(self):
with self.update_weights_timer:
@@ -39,11 +40,15 @@ class SyncSamplesOptimizer(PolicyOptimizer):
samples = self.local_evaluator.sample()
with self.grad_timer:
self.local_evaluator.compute_apply(samples)
for i in range(self.num_sgd_iter):
fetches = self.local_evaluator.compute_apply(samples)
if self.num_sgd_iter > 1:
print(i, fetches)
self.grad_timer.push_units_processed(samples.count)
self.num_steps_sampled += samples.count
self.num_steps_trained += samples.count
return fetches
def stats(self):
return dict(PolicyOptimizer.stats(self), **{
@@ -8,6 +8,6 @@ pong-apex:
target_network_update_freq: 50000
num_workers: 32
## can also enable vectorization within processes
# num_envs: 4
# num_envs_per_worker: 4
lr: .0001
gamma: 0.99