From b4dff9f933ce03619fc22b3d6456a6531eed4dd1 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 28 Jun 2018 09:49:08 -0700 Subject: [PATCH] [rllib] PPO onto new RLlib APIs (#2270) --- .../legacy_multiagent/multiagent_pendulum.py | 1 + python/ray/rllib/optimizers/multi_gpu_impl.py | 66 +++--- .../rllib/optimizers/multi_gpu_optimizer.py | 67 +++--- python/ray/rllib/ppo/loss.py | 98 --------- python/ray/rllib/ppo/ppo.py | 99 ++++----- python/ray/rllib/ppo/ppo_evaluator.py | 201 ------------------ python/ray/rllib/ppo/ppo_tf_policy.py | 198 +++++++++++++++++ .../rllib/utils/common_policy_evaluator.py | 1 + python/ray/rllib/utils/tf_policy_graph.py | 3 + 9 files changed, 324 insertions(+), 410 deletions(-) delete mode 100644 python/ray/rllib/ppo/loss.py delete mode 100644 python/ray/rllib/ppo/ppo_evaluator.py create mode 100644 python/ray/rllib/ppo/ppo_tf_policy.py diff --git a/python/ray/rllib/examples/legacy_multiagent/multiagent_pendulum.py b/python/ray/rllib/examples/legacy_multiagent/multiagent_pendulum.py index d4cf8e5bf..baf5bc29a 100644 --- a/python/ray/rllib/examples/legacy_multiagent/multiagent_pendulum.py +++ b/python/ray/rllib/examples/legacy_multiagent/multiagent_pendulum.py @@ -43,6 +43,7 @@ if __name__ == '__main__': ray.init(num_cpus=num_cpus, redirect_output=True) config["num_workers"] = num_cpus config["timesteps_per_batch"] = 10 + config["sgd_batchsize"] = 10 config["num_sgd_iter"] = 10 config["gamma"] = 0.999 config["horizon"] = horizon diff --git a/python/ray/rllib/optimizers/multi_gpu_impl.py b/python/ray/rllib/optimizers/multi_gpu_impl.py index cefe52309..6a694e1e5 100644 --- a/python/ray/rllib/optimizers/multi_gpu_impl.py +++ b/python/ray/rllib/optimizers/multi_gpu_impl.py @@ -34,47 +34,48 @@ class LocalSyncParallelOptimizer(object): Args: optimizer: Delegate TensorFlow optimizer object. devices: List of the names of TensorFlow devices to parallelize over. - input_placeholders: List of inputs for the loss function. Tensors of - these shapes will be passed to build_loss() in order to define the - per-device loss ops. + input_placeholders: List of (name, input_placeholder) + for the loss function. Tensors of these shapes will be passed + to build_graph() in order to define the per-device loss ops. per_device_batch_size: Number of tuples to optimize over at a time per device. In each call to `optimize()`, `len(devices) * per_device_batch_size` tuples of data will be processed. - build_loss: Function that takes the specified inputs and returns an - object with a 'loss' property that is a scalar Tensor. For example, - ray.rllib.ppo.ProximalPolicyGraph. + build_graph: Function that takes the specified inputs and returns a + TF Policy Graph instance. logdir: Directory to place debugging output in. grad_norm_clipping: None or int stdev to clip grad norms by """ def __init__(self, optimizer, devices, input_placeholders, - per_device_batch_size, build_loss, logdir, + per_device_batch_size, build_graph, logdir, grad_norm_clipping=None): + # TODO(rliaw): remove logdir self.optimizer = optimizer self.devices = devices self.batch_size = per_device_batch_size * len(devices) self.per_device_batch_size = per_device_batch_size - self.input_placeholders = input_placeholders - self.build_loss = build_loss + self.loss_inputs = input_placeholders + self.build_graph = build_graph self.logdir = logdir # First initialize the shared loss network with tf.name_scope(TOWER_SCOPE_NAME): - self._shared_loss = build_loss(*input_placeholders) + self._shared_loss = build_graph(input_placeholders) # Then setup the per-device loss graphs that use the shared weights self._batch_index = tf.placeholder(tf.int32) # Split on the CPU in case the data doesn't fit in GPU memory. with tf.device("/cpu:0"): + names, placeholders = zip(*input_placeholders) data_splits = zip( - *[tf.split(ph, len(devices)) for ph in input_placeholders]) + *[tf.split(ph, len(devices)) for ph in placeholders]) self._towers = [] for device, device_placeholders in zip(self.devices, data_splits): - self._towers.append(self._setup_device(device, - device_placeholders)) + self._towers.append( + self._setup_device(device, zip(names, device_placeholders))) avg = average_gradients([t.grads for t in self._towers]) if grad_norm_clipping: @@ -103,8 +104,8 @@ class LocalSyncParallelOptimizer(object): """ feed_dict = {} - assert len(self.input_placeholders) == len(inputs) - for ph, arr in zip(self.input_placeholders, inputs): + assert len(self.loss_inputs) == len(inputs) + for (name, ph), arr in zip(self.loss_inputs, inputs): truncated_arr = make_divisible_by(arr, self.batch_size) feed_dict[ph] = truncated_arr truncated_len = len(truncated_arr) @@ -135,8 +136,7 @@ class LocalSyncParallelOptimizer(object): assert tuples_per_device % self.per_device_batch_size == 0 return tuples_per_device - def optimize(self, sess, batch_index, extra_ops=[], extra_feed_dict={}, - file_writer=None): + def optimize(self, sess, batch_index, file_writer=None): """Run a single step of SGD. Runs a SGD step over a slice of the preloaded batch with size given by @@ -151,15 +151,12 @@ class LocalSyncParallelOptimizer(object): batch_index: Offset into the preloaded data. This value must be between `0` and `tuples_per_device`. The amount of data to process is always fixed to `per_device_batch_size`. - extra_ops: Extra ops to run with this step (e.g. for metrics). - extra_feed_dict: Extra args to feed into this session run. file_writer: If specified, tf metrics will be written out using this. Returns: The outputs of extra_ops evaluated over the batch. """ - if file_writer: run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE) else: @@ -167,9 +164,17 @@ class LocalSyncParallelOptimizer(object): run_metadata = tf.RunMetadata() feed_dict = {self._batch_index: batch_index} - feed_dict.update(extra_feed_dict) + for tower in self._towers: + feed_dict.update(tower.loss_graph.extra_compute_grad_feed_dict()) + feed_dict.update(tower.loss_graph.extra_apply_grad_feed_dict()) + + fetches = {"train": self._train_op} + for tower in self._towers: + fetches.update(tower.loss_graph.extra_compute_grad_fetches()) + fetches.update(tower.loss_graph.extra_apply_grad_fetches()) + outs = sess.run( - [self._train_op] + extra_ops, + fetches, feed_dict=feed_dict, options=run_options, run_metadata=run_metadata) @@ -182,20 +187,20 @@ class LocalSyncParallelOptimizer(object): file_writer.add_run_metadata( run_metadata, "sgd_train_{}".format(batch_index)) - return outs[1:] + return outs def get_common_loss(self): return self._shared_loss def get_device_losses(self): - return [t.loss_object for t in self._towers] + return [t.loss_graph for t in self._towers] def _setup_device(self, device, device_input_placeholders): with tf.device(device): with tf.name_scope(TOWER_SCOPE_NAME): device_input_batches = [] device_input_slices = [] - for ph in device_input_placeholders: + for name, ph in device_input_placeholders: current_batch = tf.Variable( ph, trainable=False, validate_shape=False, collections=[]) @@ -206,19 +211,18 @@ class LocalSyncParallelOptimizer(object): ([self.per_device_batch_size] + [-1] * len(ph.shape[1:]))) current_slice.set_shape(ph.shape) - device_input_slices.append(current_slice) - device_loss_obj = self.build_loss(*device_input_slices) - device_grads = self.optimizer.compute_gradients( - device_loss_obj.loss, colocate_gradients_with_ops=True) + device_input_slices.append((name, current_slice)) + graph_obj = self.build_graph(device_input_slices) + device_grads = graph_obj.gradients(self.optimizer) return Tower( tf.group(*[batch.initializer for batch in device_input_batches]), device_grads, - device_loss_obj) + graph_obj) # Each tower is a copy of the loss graph pinned to a specific device. -Tower = namedtuple("Tower", ["init_op", "grads", "loss_object"]) +Tower = namedtuple("Tower", ["init_op", "grads", "loss_graph"]) def make_divisible_by(array, n): diff --git a/python/ray/rllib/optimizers/multi_gpu_optimizer.py b/python/ray/rllib/optimizers/multi_gpu_optimizer.py index aa3a82072..c2f24367c 100644 --- a/python/ray/rllib/optimizers/multi_gpu_optimizer.py +++ b/python/ray/rllib/optimizers/multi_gpu_optimizer.py @@ -3,13 +3,14 @@ from __future__ import division from __future__ import print_function import numpy as np +from collections import defaultdict import os import tensorflow as tf import ray -from ray.rllib.optimizers.policy_evaluator import TFMultiGPUSupport from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer +from ray.rllib.utils.tf_policy_graph import TFPolicyGraph from ray.rllib.utils.timer import TimerStat @@ -21,13 +22,16 @@ class LocalMultiGPUOptimizer(PolicyOptimizer): A number of SGD passes are then taken over the in-memory data. For more details, see `multi_gpu_impl.LocalSyncParallelOptimizer`. - This optimizer is Tensorflow-specific and require evaluators to implement - the TFMultiGPUSupport API. + This optimizer is Tensorflow-specific and require the underlying + PolicyGraph to be a TFPolicyGraph instance that support `.copy()`. + + Note that all replicas of the TFPolicyGraph will merge their + extra_compute_grad and apply_grad feed_dicts and fetches. This + may result in unexpected behavior. """ def _init(self, sgd_batch_size=128, sgd_stepsize=5e-5, num_sgd_iter=10, timesteps_per_batch=1024): - assert isinstance(self.local_evaluator, TFMultiGPUSupport) self.batch_size = sgd_batch_size self.sgd_stepsize = sgd_stepsize self.num_sgd_iter = num_sgd_iter @@ -50,29 +54,29 @@ class LocalMultiGPUOptimizer(PolicyOptimizer): print("LocalMultiGPUOptimizer devices", self.devices) print("LocalMultiGPUOptimizer batch size", self.batch_size) - # List of (feature name, feature placeholder) tuples - self.loss_inputs = self.local_evaluator.tf_loss_inputs() + assert set(self.local_evaluator.policy_map.keys()) == {"default"}, \ + "Multi-agent is not supported" + self.policy = self.local_evaluator.policy_map["default"] + assert isinstance(self.policy, TFPolicyGraph), \ + "Only TF policies are supported" # per-GPU graph copies created below must share vars with the policy - main_thread_scope = tf.get_variable_scope() # reuse is set to AUTO_REUSE because Adam nodes are created after # all of the device copies are created. - with tf.variable_scope(main_thread_scope, reuse=tf.AUTO_REUSE): - self.par_opt = LocalSyncParallelOptimizer( - tf.train.AdamOptimizer(self.sgd_stepsize), - self.devices, - [ph for _, ph in self.loss_inputs], - self.per_device_batch_size, - lambda *ph: self.local_evaluator.build_tf_loss(ph), - os.getcwd()) + with self.local_evaluator.tf_sess.graph.as_default(): + with self.local_evaluator.tf_sess.as_default(): + main_scope = tf.get_variable_scope() + with tf.variable_scope(main_scope, reuse=tf.AUTO_REUSE): + self.par_opt = LocalSyncParallelOptimizer( + tf.train.AdamOptimizer(self.sgd_stepsize), + self.devices, + self.policy.loss_inputs(), + self.per_device_batch_size, + self.policy.copy, + os.getcwd()) - # TODO(rliaw): Find more elegant solution for this - if hasattr(self.local_evaluator, "init_extra_ops"): - self.local_evaluator.init_extra_ops( - self.par_opt.get_device_losses()) - - self.sess = self.local_evaluator.sess - self.sess.run(tf.global_variables_initializer()) + self.sess = self.local_evaluator.tf_sess + self.sess.run(tf.global_variables_initializer()) def step(self, postprocess_fn=None): with self.update_weights_timer: @@ -96,27 +100,26 @@ class LocalMultiGPUOptimizer(PolicyOptimizer): with self.load_timer: tuples_per_device = self.par_opt.load_data( - self.local_evaluator.sess, - samples.columns([key for key, _ in self.loss_inputs])) + self.sess, + samples.columns([key for key, _ in self.policy.loss_inputs()])) with self.grad_timer: - all_extra_fetches = [] - model = self.local_evaluator + all_extra_fetches = defaultdict(list) num_batches = ( int(tuples_per_device) // int(self.per_device_batch_size)) for i in range(self.num_sgd_iter): - iter_extra_fetches = [] + iter_extra_fetches = defaultdict(list) permutation = np.random.permutation(num_batches) for batch_index in range(num_batches): # TODO(ekl) support ppo's debugging features, e.g. # printing the current loss and tracing batch_fetches = self.par_opt.optimize( self.sess, - permutation[batch_index] * self.per_device_batch_size, - extra_ops=model.extra_apply_grad_fetches(), - extra_feed_dict=model.extra_apply_grad_feed_dict()) - iter_extra_fetches += [batch_fetches] - all_extra_fetches += [iter_extra_fetches] + permutation[batch_index] * self.per_device_batch_size) + for k, v in batch_fetches.items(): + iter_extra_fetches[k] += [v] + for k, v in iter_extra_fetches.items(): + all_extra_fetches[k] += [v] self.num_steps_sampled += samples.count self.num_steps_trained += samples.count diff --git a/python/ray/rllib/ppo/loss.py b/python/ray/rllib/ppo/loss.py deleted file mode 100644 index e40e03cb3..000000000 --- a/python/ray/rllib/ppo/loss.py +++ /dev/null @@ -1,98 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf - -from ray.rllib.models import ModelCatalog - - -class ProximalPolicyGraph(object): - - other_output = ["vf_preds", "logprobs"] - is_recurrent = False - - def __init__( - self, observation_space, action_space, - observations, value_targets, advantages, actions, - prev_logits, prev_vf_preds, logit_dim, - kl_coeff, distribution_class, config, sess): - self.prev_dist = distribution_class(prev_logits) - - # Saved so that we can compute actions given different observations - self.observations = observations - - self.curr_logits = ModelCatalog.get_model( - observations, logit_dim, config["model"]).outputs - self.curr_dist = distribution_class(self.curr_logits) - self.sampler = self.curr_dist.sample() - - if config["use_gae"]: - vf_config = config["model"].copy() - # Do not split the last layer of the value function into - # mean parameters and standard deviation parameters and - # do not make the standard deviations free variables. - vf_config["free_log_std"] = False - with tf.variable_scope("value_function"): - self.value_function = ModelCatalog.get_model( - observations, 1, vf_config).outputs - self.value_function = tf.reshape(self.value_function, [-1]) - - # Make loss functions. - self.ratio = tf.exp(self.curr_dist.logp(actions) - - self.prev_dist.logp(actions)) - self.kl = self.prev_dist.kl(self.curr_dist) - self.mean_kl = tf.reduce_mean(self.kl) - self.entropy = self.curr_dist.entropy() - self.mean_entropy = tf.reduce_mean(self.entropy) - self.surr1 = self.ratio * advantages - self.surr2 = tf.clip_by_value(self.ratio, 1 - config["clip_param"], - 1 + config["clip_param"]) * advantages - self.surr = tf.minimum(self.surr1, self.surr2) - self.mean_policy_loss = tf.reduce_mean(-self.surr) - - if config["use_gae"]: - # We use a huber loss here to be more robust against outliers, - # which seem to occur when the rollouts get longer (the variance - # scales superlinearly with the length of the rollout) - self.vf_loss1 = tf.square(self.value_function - value_targets) - vf_clipped = prev_vf_preds + tf.clip_by_value( - self.value_function - prev_vf_preds, - -config["clip_param"], config["clip_param"]) - self.vf_loss2 = tf.square(vf_clipped - value_targets) - self.vf_loss = tf.minimum(self.vf_loss1, self.vf_loss2) - self.mean_vf_loss = tf.reduce_mean(self.vf_loss) - self.loss = tf.reduce_mean( - -self.surr + kl_coeff * self.kl + - config["vf_loss_coeff"] * self.vf_loss - - config["entropy_coeff"] * self.entropy) - else: - self.mean_vf_loss = tf.constant(0.0) - self.loss = tf.reduce_mean( - -self.surr + - kl_coeff * self.kl - - config["entropy_coeff"] * self.entropy) - - self.sess = sess - - if config["use_gae"]: - self.policy_results = [ - self.sampler, self.curr_logits, self.value_function] - else: - self.policy_results = [ - self.sampler, self.curr_logits, tf.constant("NA")] - - def compute_actions(self, observations, features, is_training=False): - action, logprobs, vf = self.sess.run( - self.policy_results, - feed_dict={self.observations: observations}) - return action, [], {"vf_preds": vf, "logprobs": logprobs} - - def postprocess_trajectory(self, batch, other_agent_batches=None): - return batch - - def get_initial_state(self): - return [] - - def loss(self): - return self.loss diff --git a/python/ray/rllib/ppo/ppo.py b/python/ray/rllib/ppo/ppo.py index 5ae3e8dbf..dfd2c594d 100644 --- a/python/ray/rllib/ppo/ppo.py +++ b/python/ray/rllib/ppo/ppo.py @@ -8,11 +8,12 @@ import pickle import tensorflow as tf import ray -from ray.tune.result import TrainingResult from ray.tune.trial import Resources from ray.rllib.agent import Agent +from ray.rllib.utils.common_policy_evaluator import ( + CommonPolicyEvaluator, collect_metrics) from ray.rllib.utils import FilterManager -from ray.rllib.ppo.ppo_evaluator import PPOEvaluator +from ray.rllib.ppo.ppo_tf_policy import PPOTFPolicyGraph from ray.rllib.optimizers.multi_gpu_optimizer import LocalMultiGPUOptimizer DEFAULT_CONFIG = { @@ -89,6 +90,7 @@ DEFAULT_CONFIG = { class PPOAgent(Agent): _agent_name = "PPO" _default_config = DEFAULT_CONFIG + _default_policy_graph = PPOTFPolicyGraph @classmethod def default_resource_request(cls, config): @@ -100,15 +102,32 @@ class PPOAgent(Agent): extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) def _init(self): - self.global_step = 0 - self.local_evaluator = PPOEvaluator( - self.env_creator, self.config, self.logdir, False) - RemotePPOEvaluator = ray.remote( + def session_creator(): + return tf.Session( + config=tf.ConfigProto(**self.config["tf_session_args"])) + self.local_evaluator = CommonPolicyEvaluator( + self.env_creator, + self._default_policy_graph, + tf_session_creator=session_creator, + batch_mode="complete_episodes", + observation_filter=self.config["observation_filter"], + env_config=self.config["env_config"], + model_config=self.config["model"], + policy_config=self.config + ) + RemoteEvaluator = CommonPolicyEvaluator.as_remote( num_cpus=self.config["num_cpus_per_worker"], - num_gpus=self.config["num_gpus_per_worker"])(PPOEvaluator) + num_gpus=self.config["num_gpus_per_worker"]) self.remote_evaluators = [ - RemotePPOEvaluator.remote( - self.env_creator, self.config, self.logdir, True) + RemoteEvaluator.remote( + self.env_creator, + self._default_policy_graph, + batch_mode="complete_episodes", + observation_filter=self.config["observation_filter"], + env_config=self.config["env_config"], + model_config=self.config["model"], + policy_config=self.config + ) for _ in range(self.config["num_workers"])] self.optimizer = LocalMultiGPUOptimizer( @@ -116,9 +135,11 @@ class PPOAgent(Agent): "sgd_stepsize": self.config["sgd_stepsize"], "num_sgd_iter": self.config["num_sgd_iter"], "timesteps_per_batch": self.config["timesteps_per_batch"]}, - self.local_evaluator, self.remote_evaluators,) + self.local_evaluator, self.remote_evaluators) - self.saver = tf.train.Saver(max_to_keep=None) + # TODO(rliaw): Push into Policy Graph + with self.local_evaluator.tf_sess.graph.as_default(): + self.saver = tf.train.Saver() def _train(self): def postprocess_samples(batch): @@ -133,48 +154,29 @@ class PPOAgent(Agent): batch.data["value_targets"] = dummy batch.data["vf_preds"] = dummy extra_fetches = self.optimizer.step(postprocess_fn=postprocess_samples) + kl = np.array(extra_fetches["kl"]).mean(axis=1)[-1] + total_loss = np.array(extra_fetches["total_loss"]).mean(axis=1)[-1] + policy_loss = np.array(extra_fetches["policy_loss"]).mean(axis=1)[-1] + vf_loss = np.array(extra_fetches["vf_loss"]).mean(axis=1)[-1] + entropy = np.array(extra_fetches["entropy"]).mean(axis=1)[-1] - final_metrics = np.array(extra_fetches).mean(axis=1)[-1, :].tolist() - total_loss, policy_loss, vf_loss, kl, entropy = final_metrics - self.local_evaluator.update_kl(kl) + newkl = self.local_evaluator.for_policy(lambda pi: pi.update_kl(kl)) info = { + "kl_divergence": kl, + "kl_coefficient": newkl, "total_loss": total_loss, "policy_loss": policy_loss, "vf_loss": vf_loss, - "kl_divergence": kl, "entropy": entropy, - "kl_coefficient": self.local_evaluator.kl_coeff_val, } FilterManager.synchronize( self.local_evaluator.filters, self.remote_evaluators) - res = self._fetch_metrics_from_remote_evaluators() + res = collect_metrics(self.local_evaluator, self.remote_evaluators) res = res._replace(info=info) return res - def _fetch_metrics_from_remote_evaluators(self): - episode_rewards = [] - episode_lengths = [] - metric_lists = [a.get_completed_rollout_metrics.remote() - for a in self.remote_evaluators] - for metrics in metric_lists: - for episode in ray.get(metrics): - episode_lengths.append(episode.episode_length) - episode_rewards.append(episode.episode_reward) - avg_reward = ( - np.mean(episode_rewards) if episode_rewards else float('nan')) - avg_length = ( - np.mean(episode_lengths) if episode_lengths else float('nan')) - timesteps = np.sum(episode_lengths) if episode_lengths else 0 - - result = TrainingResult( - episode_reward_mean=avg_reward, - episode_len_mean=avg_length, - timesteps_this_iter=timesteps) - - return result - def _stop(self): # workaround for https://github.com/ray-project/ray/issues/1516 for ev in self.remote_evaluators: @@ -182,29 +184,30 @@ class PPOAgent(Agent): def _save(self, checkpoint_dir): checkpoint_path = self.saver.save( - self.local_evaluator.sess, + self.local_evaluator.tf_sess, os.path.join(checkpoint_dir, "checkpoint"), global_step=self.iteration) agent_state = ray.get( [a.save.remote() for a in self.remote_evaluators]) extra_data = [ self.local_evaluator.save(), - self.global_step, agent_state] pickle.dump(extra_data, open(checkpoint_path + ".extra_data", "wb")) return checkpoint_path def _restore(self, checkpoint_path): - self.saver.restore(self.local_evaluator.sess, checkpoint_path) + self.saver.restore(self.local_evaluator.tf_sess, checkpoint_path) extra_data = pickle.load(open(checkpoint_path + ".extra_data", "rb")) self.local_evaluator.restore(extra_data[0]) - self.global_step = extra_data[1] ray.get([ a.restore.remote(o) - for (a, o) in zip(self.remote_evaluators, extra_data[2])]) + for (a, o) in zip(self.remote_evaluators, extra_data[1])]) - def compute_action(self, observation): - observation = self.local_evaluator.obs_filter( + def compute_action(self, observation, state=None): + if state is None: + state = [] + obs = self.local_evaluator.filters["default"]( observation, update=False) - return self.local_evaluator.common_policy.compute_actions( - [observation], [], False)[0][0] + return self.local_evaluator.for_policy( + lambda p: p.compute_single_action( + obs, state, is_training=False)[0]) diff --git a/python/ray/rllib/ppo/ppo_evaluator.py b/python/ray/rllib/ppo/ppo_evaluator.py deleted file mode 100644 index 46d2ae223..000000000 --- a/python/ray/rllib/ppo/ppo_evaluator.py +++ /dev/null @@ -1,201 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import pickle -import tensorflow as tf -from collections import OrderedDict - -import ray -from ray.rllib.optimizers import SampleBatch, TFMultiGPUSupport -from ray.rllib.models import ModelCatalog -from ray.rllib.utils.sampler import SyncSampler -from ray.rllib.utils.filter import get_filter, MeanStdFilter -from ray.rllib.utils.postprocessing import compute_advantages -from ray.rllib.ppo.loss import ProximalPolicyGraph - - -class PPOEvaluator(TFMultiGPUSupport): - """ - Runner class that holds the simulator environment and the policy. - - Initializes the tensorflow graphs for both training and evaluation. - One common policy graph is initialized on '/cpu:0' and holds all the shared - network weights. When run as a remote agent, only this graph is used. - """ - - def __init__(self, env_creator, config, logdir, is_remote): - self.config = config - self.logdir = logdir - self.env = ModelCatalog.get_preprocessor_as_wrapper( - env_creator(config["env_config"]), config["model"]) - if is_remote: - config_proto = tf.ConfigProto() - else: - config_proto = tf.ConfigProto(**config["tf_session_args"]) - self.sess = tf.Session(config=config_proto) - self.kl_coeff_val = self.config["kl_coeff"] - self.kl_target = self.config["kl_target"] - - # Defines the training inputs: - # The coefficient of the KL penalty. - self.kl_coeff = tf.placeholder( - name="newkl", shape=(), dtype=tf.float32) - - # The input observations. - self.observations = tf.placeholder( - tf.float32, shape=(None,) + self.env.observation_space.shape) - # Targets of the value function. - self.value_targets = tf.placeholder(tf.float32, shape=(None,)) - # Advantage values in the policy gradient estimator. - self.advantages = tf.placeholder(tf.float32, shape=(None,)) - - action_space = self.env.action_space - self.actions = ModelCatalog.get_action_placeholder(action_space) - self.distribution_class, self.logit_dim = ModelCatalog.get_action_dist( - action_space, config["model"]) - # Log probabilities from the policy before the policy update. - self.prev_logits = tf.placeholder( - tf.float32, shape=(None, self.logit_dim)) - # Value function predictions before the policy update. - self.prev_vf_preds = tf.placeholder(tf.float32, shape=(None,)) - - self.inputs = [ - ("obs", self.observations), - ("value_targets", self.value_targets), - ("advantages", self.advantages), - ("actions", self.actions), - ("logprobs", self.prev_logits), - ("vf_preds", self.prev_vf_preds) - ] - self.common_policy = self.build_tf_loss([ph for _, ph in self.inputs]) - - # References to the model weights - self.variables = ray.experimental.TensorFlowVariables( - self.common_policy.loss, self.sess) - self.obs_filter = get_filter( - config["observation_filter"], self.env.observation_space.shape) - self.rew_filter = MeanStdFilter((), clip=5.0) - self.filters = {"obs_filter": self.obs_filter, - "rew_filter": self.rew_filter} - self.sampler = SyncSampler( - self.env, {"default": self.common_policy}, lambda _: "default", - {"default": self.obs_filter}, self.config["horizon"], - self.config["horizon"]) - - def tf_loss_inputs(self): - return self.inputs - - def build_tf_loss(self, input_placeholders): - obs, vtargets, advs, acts, plog, pvf_preds = input_placeholders - return ProximalPolicyGraph( - self.env.observation_space, self.env.action_space, - obs, vtargets, advs, acts, plog, pvf_preds, self.logit_dim, - self.kl_coeff, self.distribution_class, self.config, - self.sess) - - def init_extra_ops(self, device_losses): - self.extra_ops = OrderedDict() - with tf.name_scope("test_outputs"): - policies = device_losses - self.extra_ops["loss"] = tf.reduce_mean( - tf.stack(values=[ - policy.loss for policy in policies]), 0) - self.extra_ops["policy_loss"] = tf.reduce_mean( - tf.stack(values=[ - policy.mean_policy_loss for policy in policies]), 0) - self.extra_ops["vf_loss"] = tf.reduce_mean( - tf.stack(values=[ - policy.mean_vf_loss for policy in policies]), 0) - self.extra_ops["kl"] = tf.reduce_mean( - tf.stack(values=[ - policy.mean_kl for policy in policies]), 0) - self.extra_ops["entropy"] = tf.reduce_mean( - tf.stack(values=[ - policy.mean_entropy for policy in policies]), 0) - - def extra_apply_grad_fetches(self): - return list(self.extra_ops.values()) - - def extra_apply_grad_feed_dict(self): - return {self.kl_coeff: self.kl_coeff_val} - - def update_kl(self, sampled_kl): - if sampled_kl > 2.0 * self.kl_target: - self.kl_coeff_val *= 1.5 - elif sampled_kl < 0.5 * self.kl_target: - self.kl_coeff_val *= 0.5 - - def save(self): - filters = self.get_filters(flush_after=True) - return pickle.dumps({ - "filters": filters, - "kl_coeff_val": self.kl_coeff_val, - "kl_target": self.kl_target, - - }) - - def restore(self, objs): - objs = pickle.loads(objs) - self.sync_filters(objs["filters"]) - self.kl_coeff_val = objs["kl_coeff_val"] - self.kl_target = objs["kl_target"] - - def get_weights(self): - return self.variables.get_weights() - - def set_weights(self, weights): - self.variables.set_weights(weights) - - def sample(self): - """Returns experience samples from this Evaluator. Observation - filter and reward filters are flushed here. - - Returns: - SampleBatch: A columnar batch of experiences. - """ - num_steps_so_far = 0 - all_samples = [] - - while num_steps_so_far < self.config["min_steps_per_task"]: - rollout = self.sampler.get_data() - last_r = 0.0 # note: not needed since we don't truncate rollouts - samples = compute_advantages( - rollout, last_r, self.config["gamma"], - self.config["lambda"], use_gae=self.config["use_gae"]) - num_steps_so_far += samples.count - all_samples.append(samples) - return SampleBatch.concat_samples(all_samples) - - def get_completed_rollout_metrics(self): - """Returns metrics on previously completed rollouts. - - Calling this clears the queue of completed rollout metrics. - """ - return self.sampler.get_metrics() - - def sync_filters(self, new_filters): - """Changes self's filter to given and rebases any accumulated delta. - - Args: - new_filters (dict): Filters with new state to update local copy. - """ - assert all(k in new_filters for k in self.filters) - for k in self.filters: - self.filters[k].sync(new_filters[k]) - - def get_filters(self, flush_after=False): - """Returns a snapshot of filters. - - Args: - flush_after (bool): Clears the filter buffer state. - - Returns: - return_filters (dict): Dict for serializable filters - """ - return_filters = {} - for k, f in self.filters.items(): - return_filters[k] = f.as_serializable() - if flush_after: - f.clear_buffer() - return return_filters diff --git a/python/ray/rllib/ppo/ppo_tf_policy.py b/python/ray/rllib/ppo/ppo_tf_policy.py new file mode 100644 index 000000000..3fd8b06fc --- /dev/null +++ b/python/ray/rllib/ppo/ppo_tf_policy.py @@ -0,0 +1,198 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf + +from ray.rllib.models.catalog import ModelCatalog +from ray.rllib.utils.postprocessing import compute_advantages +from ray.rllib.utils.tf_policy_graph import TFPolicyGraph + + +class PPOLoss(object): + def __init__( + self, action_space, value_targets, advantages, actions, logprobs, + vf_preds, curr_action_dist, value_fn, cur_kl_coeff, + entropy_coeff=0, clip_param=0.1, vf_loss_coeff=1.0, use_gae=True): + """Constructs the loss for Proximal Policy Objective. + + Arguments: + action_space: Environment observation space specification. + value_targets (Placeholder): Placeholder for target values; used + for GAE. + actions (Placeholder): Placeholder for actions taken + from previous model evaluation. + advantages (Placeholder): Placeholder for calculated advantages + from previous model evaluation. + logprobs (Placeholder): Placeholder for logits output from + previous model evaluation. + vf_preds (Placeholder): Placeholder for value function output + from previous model evaluation. + curr_action_dist (ActionDistribution): ActionDistribution + of the current model. + value_fn (Tensor): Current value function output Tensor. + cur_kl_coeff (Variable): Variable holding the current PPO KL + coefficient. + entropy_coeff (float): Coefficient of the entropy regularizer. + clip_param (float): Clip parameter + vf_loss_coeff (float): Coefficient of the value function loss + use_gae (bool): If true, use the Generalized Advantage Estimator. + """ + dist_cls, _ = ModelCatalog.get_action_dist(action_space) + prev_dist = dist_cls(logprobs) + # Make loss functions. + logp_ratio = tf.exp( + curr_action_dist.logp(actions) - prev_dist.logp(actions)) + action_kl = prev_dist.kl(curr_action_dist) + self.mean_kl = tf.reduce_mean(action_kl) + + curr_entropy = curr_action_dist.entropy() + self.mean_entropy = tf.reduce_mean(curr_entropy) + + surrogate_loss = tf.minimum( + advantages * logp_ratio, + advantages * tf.clip_by_value( + logp_ratio, 1 - clip_param, 1 + clip_param)) + self.mean_policy_loss = tf.reduce_mean(-surrogate_loss) + + if use_gae: + vf_loss1 = tf.square(value_fn - value_targets) + vf_clipped = vf_preds + tf.clip_by_value( + value_fn - vf_preds, -clip_param, clip_param) + vf_loss2 = tf.square(vf_clipped - value_targets) + vf_loss = tf.minimum(vf_loss1, vf_loss2) + self.mean_vf_loss = tf.reduce_mean(vf_loss) + loss = tf.reduce_mean( + -surrogate_loss + cur_kl_coeff*action_kl + + vf_loss_coeff*vf_loss - entropy_coeff*curr_entropy) + else: + self.mean_vf_loss = tf.constant(0.0) + loss = tf.reduce_mean( + -surrogate_loss + cur_kl_coeff*action_kl - + entropy_coeff*curr_entropy) + self.loss = loss + + +class PPOTFPolicyGraph(TFPolicyGraph): + def __init__(self, observation_space, action_space, + config, existing_inputs=None): + """ + Arguments: + observation_space: Environment observation space specification. + action_space: Environment action space specification. + config (dict): Configuration values for PPO graph. + existing_inputs (list): Optional list of tuples that specify the + placeholders upon which the graph should be built upon. + """ + self.sess = tf.get_default_session() + self.action_space = action_space + self.config = config + self.kl_coeff_val = self.config["kl_coeff"] + self.kl_target = self.config["kl_target"] + dist_cls, logit_dim = ModelCatalog.get_action_dist( + action_space) + + if existing_inputs: + self.loss_in = existing_inputs + obs_ph, value_targets_ph, adv_ph, act_ph, \ + logprobs_ph, vf_preds_ph = [ph for _, ph in existing_inputs] + else: + obs_ph = tf.placeholder( + tf.float32, name="obs", shape=(None,)+observation_space.shape) + # Targets of the value function. + value_targets_ph = tf.placeholder( + tf.float32, name="value_targets", shape=(None,)) + # Advantage values in the policy gradient estimator. + adv_ph = tf.placeholder( + tf.float32, name="advantages", shape=(None,)) + act_ph = ModelCatalog.get_action_placeholder(action_space) + # Log probabilities from the policy before the policy update. + logprobs_ph = tf.placeholder( + tf.float32, name="logprobs", shape=(None, logit_dim)) + # Value function predictions before the policy update. + vf_preds_ph = tf.placeholder( + tf.float32, name="vf_preds", shape=(None,)) + self.loss_in = [ + ("obs", obs_ph), + ("value_targets", value_targets_ph), + ("advantages", adv_ph), + ("actions", act_ph), + ("logprobs", logprobs_ph), + ("vf_preds", vf_preds_ph) + ] + + # KL Coefficient + self.kl_coeff = tf.get_variable( + initializer=tf.constant_initializer(self.kl_coeff_val), + name="kl_coeff", shape=(), trainable=False, dtype=tf.float32) + + self.logits = ModelCatalog.get_model( + obs_ph, logit_dim, self.config["model"]).outputs + curr_action_dist = dist_cls(self.logits) + self.sampler = curr_action_dist.sample() + if self.config["use_gae"]: + vf_config = self.config["model"].copy() + # Do not split the last layer of the value function into + # mean parameters and standard deviation parameters and + # do not make the standard deviations free variables. + vf_config["free_log_std"] = False + with tf.variable_scope("value_function"): + self.value_function = ModelCatalog.get_model( + obs_ph, 1, vf_config).outputs + self.value_function = tf.reshape(self.value_function, [-1]) + else: + self.value_function = tf.constant("NA") + + self.loss_obj = PPOLoss( + action_space, value_targets_ph, adv_ph, act_ph, + logprobs_ph, vf_preds_ph, + curr_action_dist, self.value_function, self.kl_coeff, + entropy_coeff=self.config["entropy_coeff"], + clip_param=self.config["clip_param"], + vf_loss_coeff=self.config["kl_target"], + use_gae=self.config["use_gae"]) + self.is_training = tf.placeholder_with_default(True, ()) + + TFPolicyGraph.__init__( + self, observation_space, action_space, + self.sess, obs_input=obs_ph, + action_sampler=self.sampler, loss=self.loss_obj.loss, + loss_inputs=self.loss_in, + is_training=self.is_training) + + def copy(self, existing_inputs): + """Creates a copy of self using existing input placeholders.""" + return PPOTFPolicyGraph( + None, self.action_space, self.config, + existing_inputs=existing_inputs) + + def extra_compute_action_fetches(self): + return {"vf_preds": self.value_function, "logprobs": self.logits} + + def extra_apply_grad_fetches(self): + return { + "total_loss": self.loss_obj.loss, + "policy_loss": self.loss_obj.mean_policy_loss, + "vf_loss": self.loss_obj.mean_vf_loss, + "kl": self.loss_obj.mean_kl, + "entropy": self.loss_obj.mean_entropy + } + + def update_kl(self, sampled_kl): + if sampled_kl > 2.0 * self.kl_target: + self.kl_coeff_val *= 1.5 + elif sampled_kl < 0.5 * self.kl_target: + self.kl_coeff_val *= 0.5 + self.kl_coeff.load(self.kl_coeff_val, session=self.sess) + return self.kl_coeff_val + + def postprocess_trajectory(self, sample_batch, other_agent_batches=None): + last_r = 0.0 + batch = compute_advantages( + sample_batch, last_r, self.config["gamma"], + self.config["lambda"], use_gae=self.config["use_gae"]) + return batch + + def gradients(self, optimizer): + return optimizer.compute_gradients( + self._loss, colocate_gradients_with_ops=True) diff --git a/python/ray/rllib/utils/common_policy_evaluator.py b/python/ray/rllib/utils/common_policy_evaluator.py index 14d374d1a..c25ad30b0 100644 --- a/python/ray/rllib/utils/common_policy_evaluator.py +++ b/python/ray/rllib/utils/common_policy_evaluator.py @@ -192,6 +192,7 @@ class CommonPolicyEvaluator(PolicyEvaluator): env_context = EnvContext(env_config or {}, worker_index) policy_config = policy_config or {} + self.policy_config = policy_config model_config = model_config or {} policy_mapping_fn = ( policy_mapping_fn or (lambda agent_id: DEFAULT_POLICY_ID)) diff --git a/python/ray/rllib/utils/tf_policy_graph.py b/python/ray/rllib/utils/tf_policy_graph.py index 3f72bcb22..23e6bf02a 100644 --- a/python/ray/rllib/utils/tf_policy_graph.py +++ b/python/ray/rllib/utils/tf_policy_graph.py @@ -210,3 +210,6 @@ class TFPolicyGraph(PolicyGraph): def gradients(self, optimizer): return optimizer.compute_gradients(self._loss) + + def loss_inputs(self): + return self._loss_inputs