[rllib] PPO onto new RLlib APIs (#2270)

This commit is contained in:
Richard Liaw
2018-06-28 09:49:08 -07:00
committed by GitHub
parent b197c0c404
commit b4dff9f933
9 changed files with 324 additions and 410 deletions
@@ -43,6 +43,7 @@ if __name__ == '__main__':
ray.init(num_cpus=num_cpus, redirect_output=True)
config["num_workers"] = num_cpus
config["timesteps_per_batch"] = 10
config["sgd_batchsize"] = 10
config["num_sgd_iter"] = 10
config["gamma"] = 0.999
config["horizon"] = horizon
+35 -31
View File
@@ -34,47 +34,48 @@ class LocalSyncParallelOptimizer(object):
Args:
optimizer: Delegate TensorFlow optimizer object.
devices: List of the names of TensorFlow devices to parallelize over.
input_placeholders: List of inputs for the loss function. Tensors of
these shapes will be passed to build_loss() in order to define the
per-device loss ops.
input_placeholders: List of (name, input_placeholder)
for the loss function. Tensors of these shapes will be passed
to build_graph() in order to define the per-device loss ops.
per_device_batch_size: Number of tuples to optimize over at a time per
device. In each call to `optimize()`,
`len(devices) * per_device_batch_size` tuples of data will be
processed.
build_loss: Function that takes the specified inputs and returns an
object with a 'loss' property that is a scalar Tensor. For example,
ray.rllib.ppo.ProximalPolicyGraph.
build_graph: Function that takes the specified inputs and returns a
TF Policy Graph instance.
logdir: Directory to place debugging output in.
grad_norm_clipping: None or int stdev to clip grad norms by
"""
def __init__(self, optimizer, devices, input_placeholders,
per_device_batch_size, build_loss, logdir,
per_device_batch_size, build_graph, logdir,
grad_norm_clipping=None):
# TODO(rliaw): remove logdir
self.optimizer = optimizer
self.devices = devices
self.batch_size = per_device_batch_size * len(devices)
self.per_device_batch_size = per_device_batch_size
self.input_placeholders = input_placeholders
self.build_loss = build_loss
self.loss_inputs = input_placeholders
self.build_graph = build_graph
self.logdir = logdir
# First initialize the shared loss network
with tf.name_scope(TOWER_SCOPE_NAME):
self._shared_loss = build_loss(*input_placeholders)
self._shared_loss = build_graph(input_placeholders)
# Then setup the per-device loss graphs that use the shared weights
self._batch_index = tf.placeholder(tf.int32)
# Split on the CPU in case the data doesn't fit in GPU memory.
with tf.device("/cpu:0"):
names, placeholders = zip(*input_placeholders)
data_splits = zip(
*[tf.split(ph, len(devices)) for ph in input_placeholders])
*[tf.split(ph, len(devices)) for ph in placeholders])
self._towers = []
for device, device_placeholders in zip(self.devices, data_splits):
self._towers.append(self._setup_device(device,
device_placeholders))
self._towers.append(
self._setup_device(device, zip(names, device_placeholders)))
avg = average_gradients([t.grads for t in self._towers])
if grad_norm_clipping:
@@ -103,8 +104,8 @@ class LocalSyncParallelOptimizer(object):
"""
feed_dict = {}
assert len(self.input_placeholders) == len(inputs)
for ph, arr in zip(self.input_placeholders, inputs):
assert len(self.loss_inputs) == len(inputs)
for (name, ph), arr in zip(self.loss_inputs, inputs):
truncated_arr = make_divisible_by(arr, self.batch_size)
feed_dict[ph] = truncated_arr
truncated_len = len(truncated_arr)
@@ -135,8 +136,7 @@ class LocalSyncParallelOptimizer(object):
assert tuples_per_device % self.per_device_batch_size == 0
return tuples_per_device
def optimize(self, sess, batch_index, extra_ops=[], extra_feed_dict={},
file_writer=None):
def optimize(self, sess, batch_index, file_writer=None):
"""Run a single step of SGD.
Runs a SGD step over a slice of the preloaded batch with size given by
@@ -151,15 +151,12 @@ class LocalSyncParallelOptimizer(object):
batch_index: Offset into the preloaded data. This value must be
between `0` and `tuples_per_device`. The amount of data to
process is always fixed to `per_device_batch_size`.
extra_ops: Extra ops to run with this step (e.g. for metrics).
extra_feed_dict: Extra args to feed into this session run.
file_writer: If specified, tf metrics will be written out using
this.
Returns:
The outputs of extra_ops evaluated over the batch.
"""
if file_writer:
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
else:
@@ -167,9 +164,17 @@ class LocalSyncParallelOptimizer(object):
run_metadata = tf.RunMetadata()
feed_dict = {self._batch_index: batch_index}
feed_dict.update(extra_feed_dict)
for tower in self._towers:
feed_dict.update(tower.loss_graph.extra_compute_grad_feed_dict())
feed_dict.update(tower.loss_graph.extra_apply_grad_feed_dict())
fetches = {"train": self._train_op}
for tower in self._towers:
fetches.update(tower.loss_graph.extra_compute_grad_fetches())
fetches.update(tower.loss_graph.extra_apply_grad_fetches())
outs = sess.run(
[self._train_op] + extra_ops,
fetches,
feed_dict=feed_dict,
options=run_options,
run_metadata=run_metadata)
@@ -182,20 +187,20 @@ class LocalSyncParallelOptimizer(object):
file_writer.add_run_metadata(
run_metadata, "sgd_train_{}".format(batch_index))
return outs[1:]
return outs
def get_common_loss(self):
return self._shared_loss
def get_device_losses(self):
return [t.loss_object for t in self._towers]
return [t.loss_graph for t in self._towers]
def _setup_device(self, device, device_input_placeholders):
with tf.device(device):
with tf.name_scope(TOWER_SCOPE_NAME):
device_input_batches = []
device_input_slices = []
for ph in device_input_placeholders:
for name, ph in device_input_placeholders:
current_batch = tf.Variable(
ph, trainable=False, validate_shape=False,
collections=[])
@@ -206,19 +211,18 @@ class LocalSyncParallelOptimizer(object):
([self.per_device_batch_size] + [-1] *
len(ph.shape[1:])))
current_slice.set_shape(ph.shape)
device_input_slices.append(current_slice)
device_loss_obj = self.build_loss(*device_input_slices)
device_grads = self.optimizer.compute_gradients(
device_loss_obj.loss, colocate_gradients_with_ops=True)
device_input_slices.append((name, current_slice))
graph_obj = self.build_graph(device_input_slices)
device_grads = graph_obj.gradients(self.optimizer)
return Tower(
tf.group(*[batch.initializer
for batch in device_input_batches]),
device_grads,
device_loss_obj)
graph_obj)
# Each tower is a copy of the loss graph pinned to a specific device.
Tower = namedtuple("Tower", ["init_op", "grads", "loss_object"])
Tower = namedtuple("Tower", ["init_op", "grads", "loss_graph"])
def make_divisible_by(array, n):
@@ -3,13 +3,14 @@ from __future__ import division
from __future__ import print_function
import numpy as np
from collections import defaultdict
import os
import tensorflow as tf
import ray
from ray.rllib.optimizers.policy_evaluator import TFMultiGPUSupport
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
from ray.rllib.utils.timer import TimerStat
@@ -21,13 +22,16 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
A number of SGD passes are then taken over the in-memory data. For more
details, see `multi_gpu_impl.LocalSyncParallelOptimizer`.
This optimizer is Tensorflow-specific and require evaluators to implement
the TFMultiGPUSupport API.
This optimizer is Tensorflow-specific and require the underlying
PolicyGraph to be a TFPolicyGraph instance that support `.copy()`.
Note that all replicas of the TFPolicyGraph will merge their
extra_compute_grad and apply_grad feed_dicts and fetches. This
may result in unexpected behavior.
"""
def _init(self, sgd_batch_size=128, sgd_stepsize=5e-5, num_sgd_iter=10,
timesteps_per_batch=1024):
assert isinstance(self.local_evaluator, TFMultiGPUSupport)
self.batch_size = sgd_batch_size
self.sgd_stepsize = sgd_stepsize
self.num_sgd_iter = num_sgd_iter
@@ -50,29 +54,29 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
print("LocalMultiGPUOptimizer devices", self.devices)
print("LocalMultiGPUOptimizer batch size", self.batch_size)
# List of (feature name, feature placeholder) tuples
self.loss_inputs = self.local_evaluator.tf_loss_inputs()
assert set(self.local_evaluator.policy_map.keys()) == {"default"}, \
"Multi-agent is not supported"
self.policy = self.local_evaluator.policy_map["default"]
assert isinstance(self.policy, TFPolicyGraph), \
"Only TF policies are supported"
# per-GPU graph copies created below must share vars with the policy
main_thread_scope = tf.get_variable_scope()
# reuse is set to AUTO_REUSE because Adam nodes are created after
# all of the device copies are created.
with tf.variable_scope(main_thread_scope, reuse=tf.AUTO_REUSE):
self.par_opt = LocalSyncParallelOptimizer(
tf.train.AdamOptimizer(self.sgd_stepsize),
self.devices,
[ph for _, ph in self.loss_inputs],
self.per_device_batch_size,
lambda *ph: self.local_evaluator.build_tf_loss(ph),
os.getcwd())
with self.local_evaluator.tf_sess.graph.as_default():
with self.local_evaluator.tf_sess.as_default():
main_scope = tf.get_variable_scope()
with tf.variable_scope(main_scope, reuse=tf.AUTO_REUSE):
self.par_opt = LocalSyncParallelOptimizer(
tf.train.AdamOptimizer(self.sgd_stepsize),
self.devices,
self.policy.loss_inputs(),
self.per_device_batch_size,
self.policy.copy,
os.getcwd())
# TODO(rliaw): Find more elegant solution for this
if hasattr(self.local_evaluator, "init_extra_ops"):
self.local_evaluator.init_extra_ops(
self.par_opt.get_device_losses())
self.sess = self.local_evaluator.sess
self.sess.run(tf.global_variables_initializer())
self.sess = self.local_evaluator.tf_sess
self.sess.run(tf.global_variables_initializer())
def step(self, postprocess_fn=None):
with self.update_weights_timer:
@@ -96,27 +100,26 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
with self.load_timer:
tuples_per_device = self.par_opt.load_data(
self.local_evaluator.sess,
samples.columns([key for key, _ in self.loss_inputs]))
self.sess,
samples.columns([key for key, _ in self.policy.loss_inputs()]))
with self.grad_timer:
all_extra_fetches = []
model = self.local_evaluator
all_extra_fetches = defaultdict(list)
num_batches = (
int(tuples_per_device) // int(self.per_device_batch_size))
for i in range(self.num_sgd_iter):
iter_extra_fetches = []
iter_extra_fetches = defaultdict(list)
permutation = np.random.permutation(num_batches)
for batch_index in range(num_batches):
# TODO(ekl) support ppo's debugging features, e.g.
# printing the current loss and tracing
batch_fetches = self.par_opt.optimize(
self.sess,
permutation[batch_index] * self.per_device_batch_size,
extra_ops=model.extra_apply_grad_fetches(),
extra_feed_dict=model.extra_apply_grad_feed_dict())
iter_extra_fetches += [batch_fetches]
all_extra_fetches += [iter_extra_fetches]
permutation[batch_index] * self.per_device_batch_size)
for k, v in batch_fetches.items():
iter_extra_fetches[k] += [v]
for k, v in iter_extra_fetches.items():
all_extra_fetches[k] += [v]
self.num_steps_sampled += samples.count
self.num_steps_trained += samples.count
-98
View File
@@ -1,98 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from ray.rllib.models import ModelCatalog
class ProximalPolicyGraph(object):
other_output = ["vf_preds", "logprobs"]
is_recurrent = False
def __init__(
self, observation_space, action_space,
observations, value_targets, advantages, actions,
prev_logits, prev_vf_preds, logit_dim,
kl_coeff, distribution_class, config, sess):
self.prev_dist = distribution_class(prev_logits)
# Saved so that we can compute actions given different observations
self.observations = observations
self.curr_logits = ModelCatalog.get_model(
observations, logit_dim, config["model"]).outputs
self.curr_dist = distribution_class(self.curr_logits)
self.sampler = self.curr_dist.sample()
if config["use_gae"]:
vf_config = config["model"].copy()
# Do not split the last layer of the value function into
# mean parameters and standard deviation parameters and
# do not make the standard deviations free variables.
vf_config["free_log_std"] = False
with tf.variable_scope("value_function"):
self.value_function = ModelCatalog.get_model(
observations, 1, vf_config).outputs
self.value_function = tf.reshape(self.value_function, [-1])
# Make loss functions.
self.ratio = tf.exp(self.curr_dist.logp(actions) -
self.prev_dist.logp(actions))
self.kl = self.prev_dist.kl(self.curr_dist)
self.mean_kl = tf.reduce_mean(self.kl)
self.entropy = self.curr_dist.entropy()
self.mean_entropy = tf.reduce_mean(self.entropy)
self.surr1 = self.ratio * advantages
self.surr2 = tf.clip_by_value(self.ratio, 1 - config["clip_param"],
1 + config["clip_param"]) * advantages
self.surr = tf.minimum(self.surr1, self.surr2)
self.mean_policy_loss = tf.reduce_mean(-self.surr)
if config["use_gae"]:
# We use a huber loss here to be more robust against outliers,
# which seem to occur when the rollouts get longer (the variance
# scales superlinearly with the length of the rollout)
self.vf_loss1 = tf.square(self.value_function - value_targets)
vf_clipped = prev_vf_preds + tf.clip_by_value(
self.value_function - prev_vf_preds,
-config["clip_param"], config["clip_param"])
self.vf_loss2 = tf.square(vf_clipped - value_targets)
self.vf_loss = tf.minimum(self.vf_loss1, self.vf_loss2)
self.mean_vf_loss = tf.reduce_mean(self.vf_loss)
self.loss = tf.reduce_mean(
-self.surr + kl_coeff * self.kl +
config["vf_loss_coeff"] * self.vf_loss -
config["entropy_coeff"] * self.entropy)
else:
self.mean_vf_loss = tf.constant(0.0)
self.loss = tf.reduce_mean(
-self.surr +
kl_coeff * self.kl -
config["entropy_coeff"] * self.entropy)
self.sess = sess
if config["use_gae"]:
self.policy_results = [
self.sampler, self.curr_logits, self.value_function]
else:
self.policy_results = [
self.sampler, self.curr_logits, tf.constant("NA")]
def compute_actions(self, observations, features, is_training=False):
action, logprobs, vf = self.sess.run(
self.policy_results,
feed_dict={self.observations: observations})
return action, [], {"vf_preds": vf, "logprobs": logprobs}
def postprocess_trajectory(self, batch, other_agent_batches=None):
return batch
def get_initial_state(self):
return []
def loss(self):
return self.loss
+51 -48
View File
@@ -8,11 +8,12 @@ import pickle
import tensorflow as tf
import ray
from ray.tune.result import TrainingResult
from ray.tune.trial import Resources
from ray.rllib.agent import Agent
from ray.rllib.utils.common_policy_evaluator import (
CommonPolicyEvaluator, collect_metrics)
from ray.rllib.utils import FilterManager
from ray.rllib.ppo.ppo_evaluator import PPOEvaluator
from ray.rllib.ppo.ppo_tf_policy import PPOTFPolicyGraph
from ray.rllib.optimizers.multi_gpu_optimizer import LocalMultiGPUOptimizer
DEFAULT_CONFIG = {
@@ -89,6 +90,7 @@ DEFAULT_CONFIG = {
class PPOAgent(Agent):
_agent_name = "PPO"
_default_config = DEFAULT_CONFIG
_default_policy_graph = PPOTFPolicyGraph
@classmethod
def default_resource_request(cls, config):
@@ -100,15 +102,32 @@ class PPOAgent(Agent):
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def _init(self):
self.global_step = 0
self.local_evaluator = PPOEvaluator(
self.env_creator, self.config, self.logdir, False)
RemotePPOEvaluator = ray.remote(
def session_creator():
return tf.Session(
config=tf.ConfigProto(**self.config["tf_session_args"]))
self.local_evaluator = CommonPolicyEvaluator(
self.env_creator,
self._default_policy_graph,
tf_session_creator=session_creator,
batch_mode="complete_episodes",
observation_filter=self.config["observation_filter"],
env_config=self.config["env_config"],
model_config=self.config["model"],
policy_config=self.config
)
RemoteEvaluator = CommonPolicyEvaluator.as_remote(
num_cpus=self.config["num_cpus_per_worker"],
num_gpus=self.config["num_gpus_per_worker"])(PPOEvaluator)
num_gpus=self.config["num_gpus_per_worker"])
self.remote_evaluators = [
RemotePPOEvaluator.remote(
self.env_creator, self.config, self.logdir, True)
RemoteEvaluator.remote(
self.env_creator,
self._default_policy_graph,
batch_mode="complete_episodes",
observation_filter=self.config["observation_filter"],
env_config=self.config["env_config"],
model_config=self.config["model"],
policy_config=self.config
)
for _ in range(self.config["num_workers"])]
self.optimizer = LocalMultiGPUOptimizer(
@@ -116,9 +135,11 @@ class PPOAgent(Agent):
"sgd_stepsize": self.config["sgd_stepsize"],
"num_sgd_iter": self.config["num_sgd_iter"],
"timesteps_per_batch": self.config["timesteps_per_batch"]},
self.local_evaluator, self.remote_evaluators,)
self.local_evaluator, self.remote_evaluators)
self.saver = tf.train.Saver(max_to_keep=None)
# TODO(rliaw): Push into Policy Graph
with self.local_evaluator.tf_sess.graph.as_default():
self.saver = tf.train.Saver()
def _train(self):
def postprocess_samples(batch):
@@ -133,48 +154,29 @@ class PPOAgent(Agent):
batch.data["value_targets"] = dummy
batch.data["vf_preds"] = dummy
extra_fetches = self.optimizer.step(postprocess_fn=postprocess_samples)
kl = np.array(extra_fetches["kl"]).mean(axis=1)[-1]
total_loss = np.array(extra_fetches["total_loss"]).mean(axis=1)[-1]
policy_loss = np.array(extra_fetches["policy_loss"]).mean(axis=1)[-1]
vf_loss = np.array(extra_fetches["vf_loss"]).mean(axis=1)[-1]
entropy = np.array(extra_fetches["entropy"]).mean(axis=1)[-1]
final_metrics = np.array(extra_fetches).mean(axis=1)[-1, :].tolist()
total_loss, policy_loss, vf_loss, kl, entropy = final_metrics
self.local_evaluator.update_kl(kl)
newkl = self.local_evaluator.for_policy(lambda pi: pi.update_kl(kl))
info = {
"kl_divergence": kl,
"kl_coefficient": newkl,
"total_loss": total_loss,
"policy_loss": policy_loss,
"vf_loss": vf_loss,
"kl_divergence": kl,
"entropy": entropy,
"kl_coefficient": self.local_evaluator.kl_coeff_val,
}
FilterManager.synchronize(
self.local_evaluator.filters, self.remote_evaluators)
res = self._fetch_metrics_from_remote_evaluators()
res = collect_metrics(self.local_evaluator, self.remote_evaluators)
res = res._replace(info=info)
return res
def _fetch_metrics_from_remote_evaluators(self):
episode_rewards = []
episode_lengths = []
metric_lists = [a.get_completed_rollout_metrics.remote()
for a in self.remote_evaluators]
for metrics in metric_lists:
for episode in ray.get(metrics):
episode_lengths.append(episode.episode_length)
episode_rewards.append(episode.episode_reward)
avg_reward = (
np.mean(episode_rewards) if episode_rewards else float('nan'))
avg_length = (
np.mean(episode_lengths) if episode_lengths else float('nan'))
timesteps = np.sum(episode_lengths) if episode_lengths else 0
result = TrainingResult(
episode_reward_mean=avg_reward,
episode_len_mean=avg_length,
timesteps_this_iter=timesteps)
return result
def _stop(self):
# workaround for https://github.com/ray-project/ray/issues/1516
for ev in self.remote_evaluators:
@@ -182,29 +184,30 @@ class PPOAgent(Agent):
def _save(self, checkpoint_dir):
checkpoint_path = self.saver.save(
self.local_evaluator.sess,
self.local_evaluator.tf_sess,
os.path.join(checkpoint_dir, "checkpoint"),
global_step=self.iteration)
agent_state = ray.get(
[a.save.remote() for a in self.remote_evaluators])
extra_data = [
self.local_evaluator.save(),
self.global_step,
agent_state]
pickle.dump(extra_data, open(checkpoint_path + ".extra_data", "wb"))
return checkpoint_path
def _restore(self, checkpoint_path):
self.saver.restore(self.local_evaluator.sess, checkpoint_path)
self.saver.restore(self.local_evaluator.tf_sess, checkpoint_path)
extra_data = pickle.load(open(checkpoint_path + ".extra_data", "rb"))
self.local_evaluator.restore(extra_data[0])
self.global_step = extra_data[1]
ray.get([
a.restore.remote(o)
for (a, o) in zip(self.remote_evaluators, extra_data[2])])
for (a, o) in zip(self.remote_evaluators, extra_data[1])])
def compute_action(self, observation):
observation = self.local_evaluator.obs_filter(
def compute_action(self, observation, state=None):
if state is None:
state = []
obs = self.local_evaluator.filters["default"](
observation, update=False)
return self.local_evaluator.common_policy.compute_actions(
[observation], [], False)[0][0]
return self.local_evaluator.for_policy(
lambda p: p.compute_single_action(
obs, state, is_training=False)[0])
-201
View File
@@ -1,201 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pickle
import tensorflow as tf
from collections import OrderedDict
import ray
from ray.rllib.optimizers import SampleBatch, TFMultiGPUSupport
from ray.rllib.models import ModelCatalog
from ray.rllib.utils.sampler import SyncSampler
from ray.rllib.utils.filter import get_filter, MeanStdFilter
from ray.rllib.utils.postprocessing import compute_advantages
from ray.rllib.ppo.loss import ProximalPolicyGraph
class PPOEvaluator(TFMultiGPUSupport):
"""
Runner class that holds the simulator environment and the policy.
Initializes the tensorflow graphs for both training and evaluation.
One common policy graph is initialized on '/cpu:0' and holds all the shared
network weights. When run as a remote agent, only this graph is used.
"""
def __init__(self, env_creator, config, logdir, is_remote):
self.config = config
self.logdir = logdir
self.env = ModelCatalog.get_preprocessor_as_wrapper(
env_creator(config["env_config"]), config["model"])
if is_remote:
config_proto = tf.ConfigProto()
else:
config_proto = tf.ConfigProto(**config["tf_session_args"])
self.sess = tf.Session(config=config_proto)
self.kl_coeff_val = self.config["kl_coeff"]
self.kl_target = self.config["kl_target"]
# Defines the training inputs:
# The coefficient of the KL penalty.
self.kl_coeff = tf.placeholder(
name="newkl", shape=(), dtype=tf.float32)
# The input observations.
self.observations = tf.placeholder(
tf.float32, shape=(None,) + self.env.observation_space.shape)
# Targets of the value function.
self.value_targets = tf.placeholder(tf.float32, shape=(None,))
# Advantage values in the policy gradient estimator.
self.advantages = tf.placeholder(tf.float32, shape=(None,))
action_space = self.env.action_space
self.actions = ModelCatalog.get_action_placeholder(action_space)
self.distribution_class, self.logit_dim = ModelCatalog.get_action_dist(
action_space, config["model"])
# Log probabilities from the policy before the policy update.
self.prev_logits = tf.placeholder(
tf.float32, shape=(None, self.logit_dim))
# Value function predictions before the policy update.
self.prev_vf_preds = tf.placeholder(tf.float32, shape=(None,))
self.inputs = [
("obs", self.observations),
("value_targets", self.value_targets),
("advantages", self.advantages),
("actions", self.actions),
("logprobs", self.prev_logits),
("vf_preds", self.prev_vf_preds)
]
self.common_policy = self.build_tf_loss([ph for _, ph in self.inputs])
# References to the model weights
self.variables = ray.experimental.TensorFlowVariables(
self.common_policy.loss, self.sess)
self.obs_filter = get_filter(
config["observation_filter"], self.env.observation_space.shape)
self.rew_filter = MeanStdFilter((), clip=5.0)
self.filters = {"obs_filter": self.obs_filter,
"rew_filter": self.rew_filter}
self.sampler = SyncSampler(
self.env, {"default": self.common_policy}, lambda _: "default",
{"default": self.obs_filter}, self.config["horizon"],
self.config["horizon"])
def tf_loss_inputs(self):
return self.inputs
def build_tf_loss(self, input_placeholders):
obs, vtargets, advs, acts, plog, pvf_preds = input_placeholders
return ProximalPolicyGraph(
self.env.observation_space, self.env.action_space,
obs, vtargets, advs, acts, plog, pvf_preds, self.logit_dim,
self.kl_coeff, self.distribution_class, self.config,
self.sess)
def init_extra_ops(self, device_losses):
self.extra_ops = OrderedDict()
with tf.name_scope("test_outputs"):
policies = device_losses
self.extra_ops["loss"] = tf.reduce_mean(
tf.stack(values=[
policy.loss for policy in policies]), 0)
self.extra_ops["policy_loss"] = tf.reduce_mean(
tf.stack(values=[
policy.mean_policy_loss for policy in policies]), 0)
self.extra_ops["vf_loss"] = tf.reduce_mean(
tf.stack(values=[
policy.mean_vf_loss for policy in policies]), 0)
self.extra_ops["kl"] = tf.reduce_mean(
tf.stack(values=[
policy.mean_kl for policy in policies]), 0)
self.extra_ops["entropy"] = tf.reduce_mean(
tf.stack(values=[
policy.mean_entropy for policy in policies]), 0)
def extra_apply_grad_fetches(self):
return list(self.extra_ops.values())
def extra_apply_grad_feed_dict(self):
return {self.kl_coeff: self.kl_coeff_val}
def update_kl(self, sampled_kl):
if sampled_kl > 2.0 * self.kl_target:
self.kl_coeff_val *= 1.5
elif sampled_kl < 0.5 * self.kl_target:
self.kl_coeff_val *= 0.5
def save(self):
filters = self.get_filters(flush_after=True)
return pickle.dumps({
"filters": filters,
"kl_coeff_val": self.kl_coeff_val,
"kl_target": self.kl_target,
})
def restore(self, objs):
objs = pickle.loads(objs)
self.sync_filters(objs["filters"])
self.kl_coeff_val = objs["kl_coeff_val"]
self.kl_target = objs["kl_target"]
def get_weights(self):
return self.variables.get_weights()
def set_weights(self, weights):
self.variables.set_weights(weights)
def sample(self):
"""Returns experience samples from this Evaluator. Observation
filter and reward filters are flushed here.
Returns:
SampleBatch: A columnar batch of experiences.
"""
num_steps_so_far = 0
all_samples = []
while num_steps_so_far < self.config["min_steps_per_task"]:
rollout = self.sampler.get_data()
last_r = 0.0 # note: not needed since we don't truncate rollouts
samples = compute_advantages(
rollout, last_r, self.config["gamma"],
self.config["lambda"], use_gae=self.config["use_gae"])
num_steps_so_far += samples.count
all_samples.append(samples)
return SampleBatch.concat_samples(all_samples)
def get_completed_rollout_metrics(self):
"""Returns metrics on previously completed rollouts.
Calling this clears the queue of completed rollout metrics.
"""
return self.sampler.get_metrics()
def sync_filters(self, new_filters):
"""Changes self's filter to given and rebases any accumulated delta.
Args:
new_filters (dict): Filters with new state to update local copy.
"""
assert all(k in new_filters for k in self.filters)
for k in self.filters:
self.filters[k].sync(new_filters[k])
def get_filters(self, flush_after=False):
"""Returns a snapshot of filters.
Args:
flush_after (bool): Clears the filter buffer state.
Returns:
return_filters (dict): Dict for serializable filters
"""
return_filters = {}
for k, f in self.filters.items():
return_filters[k] = f.as_serializable()
if flush_after:
f.clear_buffer()
return return_filters
+198
View File
@@ -0,0 +1,198 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.utils.postprocessing import compute_advantages
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
class PPOLoss(object):
def __init__(
self, action_space, value_targets, advantages, actions, logprobs,
vf_preds, curr_action_dist, value_fn, cur_kl_coeff,
entropy_coeff=0, clip_param=0.1, vf_loss_coeff=1.0, use_gae=True):
"""Constructs the loss for Proximal Policy Objective.
Arguments:
action_space: Environment observation space specification.
value_targets (Placeholder): Placeholder for target values; used
for GAE.
actions (Placeholder): Placeholder for actions taken
from previous model evaluation.
advantages (Placeholder): Placeholder for calculated advantages
from previous model evaluation.
logprobs (Placeholder): Placeholder for logits output from
previous model evaluation.
vf_preds (Placeholder): Placeholder for value function output
from previous model evaluation.
curr_action_dist (ActionDistribution): ActionDistribution
of the current model.
value_fn (Tensor): Current value function output Tensor.
cur_kl_coeff (Variable): Variable holding the current PPO KL
coefficient.
entropy_coeff (float): Coefficient of the entropy regularizer.
clip_param (float): Clip parameter
vf_loss_coeff (float): Coefficient of the value function loss
use_gae (bool): If true, use the Generalized Advantage Estimator.
"""
dist_cls, _ = ModelCatalog.get_action_dist(action_space)
prev_dist = dist_cls(logprobs)
# Make loss functions.
logp_ratio = tf.exp(
curr_action_dist.logp(actions) - prev_dist.logp(actions))
action_kl = prev_dist.kl(curr_action_dist)
self.mean_kl = tf.reduce_mean(action_kl)
curr_entropy = curr_action_dist.entropy()
self.mean_entropy = tf.reduce_mean(curr_entropy)
surrogate_loss = tf.minimum(
advantages * logp_ratio,
advantages * tf.clip_by_value(
logp_ratio, 1 - clip_param, 1 + clip_param))
self.mean_policy_loss = tf.reduce_mean(-surrogate_loss)
if use_gae:
vf_loss1 = tf.square(value_fn - value_targets)
vf_clipped = vf_preds + tf.clip_by_value(
value_fn - vf_preds, -clip_param, clip_param)
vf_loss2 = tf.square(vf_clipped - value_targets)
vf_loss = tf.minimum(vf_loss1, vf_loss2)
self.mean_vf_loss = tf.reduce_mean(vf_loss)
loss = tf.reduce_mean(
-surrogate_loss + cur_kl_coeff*action_kl +
vf_loss_coeff*vf_loss - entropy_coeff*curr_entropy)
else:
self.mean_vf_loss = tf.constant(0.0)
loss = tf.reduce_mean(
-surrogate_loss + cur_kl_coeff*action_kl -
entropy_coeff*curr_entropy)
self.loss = loss
class PPOTFPolicyGraph(TFPolicyGraph):
def __init__(self, observation_space, action_space,
config, existing_inputs=None):
"""
Arguments:
observation_space: Environment observation space specification.
action_space: Environment action space specification.
config (dict): Configuration values for PPO graph.
existing_inputs (list): Optional list of tuples that specify the
placeholders upon which the graph should be built upon.
"""
self.sess = tf.get_default_session()
self.action_space = action_space
self.config = config
self.kl_coeff_val = self.config["kl_coeff"]
self.kl_target = self.config["kl_target"]
dist_cls, logit_dim = ModelCatalog.get_action_dist(
action_space)
if existing_inputs:
self.loss_in = existing_inputs
obs_ph, value_targets_ph, adv_ph, act_ph, \
logprobs_ph, vf_preds_ph = [ph for _, ph in existing_inputs]
else:
obs_ph = tf.placeholder(
tf.float32, name="obs", shape=(None,)+observation_space.shape)
# Targets of the value function.
value_targets_ph = tf.placeholder(
tf.float32, name="value_targets", shape=(None,))
# Advantage values in the policy gradient estimator.
adv_ph = tf.placeholder(
tf.float32, name="advantages", shape=(None,))
act_ph = ModelCatalog.get_action_placeholder(action_space)
# Log probabilities from the policy before the policy update.
logprobs_ph = tf.placeholder(
tf.float32, name="logprobs", shape=(None, logit_dim))
# Value function predictions before the policy update.
vf_preds_ph = tf.placeholder(
tf.float32, name="vf_preds", shape=(None,))
self.loss_in = [
("obs", obs_ph),
("value_targets", value_targets_ph),
("advantages", adv_ph),
("actions", act_ph),
("logprobs", logprobs_ph),
("vf_preds", vf_preds_ph)
]
# KL Coefficient
self.kl_coeff = tf.get_variable(
initializer=tf.constant_initializer(self.kl_coeff_val),
name="kl_coeff", shape=(), trainable=False, dtype=tf.float32)
self.logits = ModelCatalog.get_model(
obs_ph, logit_dim, self.config["model"]).outputs
curr_action_dist = dist_cls(self.logits)
self.sampler = curr_action_dist.sample()
if self.config["use_gae"]:
vf_config = self.config["model"].copy()
# Do not split the last layer of the value function into
# mean parameters and standard deviation parameters and
# do not make the standard deviations free variables.
vf_config["free_log_std"] = False
with tf.variable_scope("value_function"):
self.value_function = ModelCatalog.get_model(
obs_ph, 1, vf_config).outputs
self.value_function = tf.reshape(self.value_function, [-1])
else:
self.value_function = tf.constant("NA")
self.loss_obj = PPOLoss(
action_space, value_targets_ph, adv_ph, act_ph,
logprobs_ph, vf_preds_ph,
curr_action_dist, self.value_function, self.kl_coeff,
entropy_coeff=self.config["entropy_coeff"],
clip_param=self.config["clip_param"],
vf_loss_coeff=self.config["kl_target"],
use_gae=self.config["use_gae"])
self.is_training = tf.placeholder_with_default(True, ())
TFPolicyGraph.__init__(
self, observation_space, action_space,
self.sess, obs_input=obs_ph,
action_sampler=self.sampler, loss=self.loss_obj.loss,
loss_inputs=self.loss_in,
is_training=self.is_training)
def copy(self, existing_inputs):
"""Creates a copy of self using existing input placeholders."""
return PPOTFPolicyGraph(
None, self.action_space, self.config,
existing_inputs=existing_inputs)
def extra_compute_action_fetches(self):
return {"vf_preds": self.value_function, "logprobs": self.logits}
def extra_apply_grad_fetches(self):
return {
"total_loss": self.loss_obj.loss,
"policy_loss": self.loss_obj.mean_policy_loss,
"vf_loss": self.loss_obj.mean_vf_loss,
"kl": self.loss_obj.mean_kl,
"entropy": self.loss_obj.mean_entropy
}
def update_kl(self, sampled_kl):
if sampled_kl > 2.0 * self.kl_target:
self.kl_coeff_val *= 1.5
elif sampled_kl < 0.5 * self.kl_target:
self.kl_coeff_val *= 0.5
self.kl_coeff.load(self.kl_coeff_val, session=self.sess)
return self.kl_coeff_val
def postprocess_trajectory(self, sample_batch, other_agent_batches=None):
last_r = 0.0
batch = compute_advantages(
sample_batch, last_r, self.config["gamma"],
self.config["lambda"], use_gae=self.config["use_gae"])
return batch
def gradients(self, optimizer):
return optimizer.compute_gradients(
self._loss, colocate_gradients_with_ops=True)
@@ -192,6 +192,7 @@ class CommonPolicyEvaluator(PolicyEvaluator):
env_context = EnvContext(env_config or {}, worker_index)
policy_config = policy_config or {}
self.policy_config = policy_config
model_config = model_config or {}
policy_mapping_fn = (
policy_mapping_fn or (lambda agent_id: DEFAULT_POLICY_ID))
@@ -210,3 +210,6 @@ class TFPolicyGraph(PolicyGraph):
def gradients(self, optimizer):
return optimizer.compute_gradients(self._loss)
def loss_inputs(self):
return self._loss_inputs