diff --git a/python/ray/rllib/policy_gradient/agent.py b/python/ray/rllib/policy_gradient/agent.py index ddddc3270..18b0f38db 100644 --- a/python/ray/rllib/policy_gradient/agent.py +++ b/python/ray/rllib/policy_gradient/agent.py @@ -16,7 +16,8 @@ from ray.rllib.models import ModelCatalog from ray.rllib.policy_gradient.env import BatchedEnv from ray.rllib.policy_gradient.loss import ProximalPolicyLoss from ray.rllib.policy_gradient.filter import MeanStdFilter -from ray.rllib.policy_gradient.rollout import rollouts, add_advantage_values +from ray.rllib.policy_gradient.rollout import ( + rollouts, add_return_values, add_advantage_values) from ray.rllib.policy_gradient.utils import flatten, concatenate # TODO(pcm): Make sure that both observation_filter and reward_filter @@ -51,19 +52,25 @@ class Agent(object): config_proto = tf.ConfigProto(**config["tf_session_args"]) self.preprocessor = self.env.preprocessor self.sess = tf.Session(config=config_proto) - if config["use_tf_debugger"] and not is_remote: + if config["tf_debug_inf_or_nan"] and not is_remote: self.sess = tf_debug.LocalCLIDebugWrapperSession(self.sess) self.sess.add_tensor_filter( "has_inf_or_nan", tf_debug.has_inf_or_nan) - # Defines the training inputs. + # Defines the training inputs: + # The coefficient of the KL penalty. self.kl_coeff = tf.placeholder( name="newkl", shape=(), dtype=tf.float32) + # The shape of the preprocessed observations. self.preprocessor_shape = self.preprocessor.transform_shape( self.env.observation_space.shape) + # The input observations. self.observations = tf.placeholder( tf.float32, shape=(None,) + self.preprocessor_shape) + # Targets of the value function. + self.returns = tf.placeholder(tf.float32, shape=(None,)) + # Advantage values in the policy gradient estimator. self.advantages = tf.placeholder(tf.float32, shape=(None,)) action_space = self.env.action_space @@ -78,8 +85,11 @@ class Agent(object): "currently not supported") self.distribution_class, self.logit_dim = ModelCatalog.get_action_dist( action_space) + # Log probabilities from the policy before the policy update. self.prev_logits = tf.placeholder( tf.float32, shape=(None, self.logit_dim)) + # Value function predictions before the policy update. + self.prev_vf_preds = tf.placeholder(tf.float32, shape=(None,)) assert config["sgd_batchsize"] % len(devices) == 0, \ "Batch size must be evenly divisible by devices" @@ -90,18 +100,18 @@ class Agent(object): self.batch_size = config["sgd_batchsize"] self.per_device_batch_size = int(self.batch_size / len(devices)) - def build_loss(obs, advs, acts, plog): + def build_loss(obs, rets, advs, acts, plog, pvf_preds): return ProximalPolicyLoss( self.env.observation_space, self.env.action_space, - obs, advs, acts, plog, self.logit_dim, + obs, rets, advs, acts, plog, pvf_preds, self.logit_dim, self.kl_coeff, self.distribution_class, self.config, self.sess) self.par_opt = LocalSyncParallelOptimizer( tf.train.AdamOptimizer(self.config["sgd_stepsize"]), self.devices, - [self.observations, self.advantages, self.actions, - self.prev_logits], + [self.observations, self.returns, self.advantages, + self.actions, self.prev_logits, self.prev_vf_preds], self.per_device_batch_size, build_loss, self.logdir) @@ -110,12 +120,20 @@ class Agent(object): with tf.name_scope("test_outputs"): policies = self.par_opt.get_device_losses() self.mean_loss = tf.reduce_mean( - tf.stack(values=[policy.loss for policy in policies]), 0) + tf.stack(values=[ + policy.loss for policy in policies]), 0) + self.mean_policy_loss = tf.reduce_mean( + tf.stack(values=[ + policy.mean_policy_loss for policy in policies]), 0) + self.mean_vf_loss = tf.reduce_mean( + tf.stack(values=[ + policy.mean_vf_loss for policy in policies]), 0) self.mean_kl = tf.reduce_mean( - tf.stack(values=[policy.mean_kl for policy in policies]), 0) + tf.stack(values=[ + policy.mean_kl for policy in policies]), 0) self.mean_entropy = tf.reduce_mean( - tf.stack( - values=[policy.mean_entropy for policy in policies]), 0) + tf.stack(values=[ + policy.mean_entropy for policy in policies]), 0) # References to the model weights self.common_policy = self.par_opt.get_common_loss() @@ -127,20 +145,36 @@ class Agent(object): self.sess.run(tf.global_variables_initializer()) def load_data(self, trajectories, full_trace): - return self.par_opt.load_data( - self.sess, - [trajectories["observations"], - trajectories["advantages"], - trajectories["actions"].squeeze(), - trajectories["logprobs"]], - full_trace=full_trace) + if self.config["use_gae"]: + return self.par_opt.load_data( + self.sess, + [trajectories["observations"], + trajectories["td_lambda_returns"], + trajectories["advantages"], + trajectories["actions"].squeeze(), + trajectories["logprobs"], + trajectories["vf_preds"]], + full_trace=full_trace) + else: + dummy = np.zeros((trajectories["observations"].shape[0],)) + return self.par_opt.load_data( + self.sess, + [trajectories["observations"], + dummy, + trajectories["returns"], + trajectories["actions"].squeeze(), + trajectories["logprobs"], + dummy], + full_trace=full_trace) def run_sgd_minibatch( self, batch_index, kl_coeff, full_trace, file_writer): return self.par_opt.optimize( self.sess, batch_index, - extra_ops=[self.mean_loss, self.mean_kl, self.mean_entropy], + extra_ops=[ + self.mean_loss, self.mean_policy_loss, self.mean_vf_loss, + self.mean_kl, self.mean_entropy], extra_feed_dict={self.kl_coeff: kl_coeff}, file_writer=file_writer if full_trace else None) @@ -155,7 +189,10 @@ class Agent(object): trajectory = rollouts( self.common_policy, self.env, horizon, self.observation_filter, self.reward_filter) - add_advantage_values(trajectory, gamma, lam, self.reward_filter) + if self.config["use_gae"]: + add_advantage_values(trajectory, gamma, lam, self.reward_filter) + else: + add_return_values(trajectory, gamma, self.reward_filter) return trajectory def compute_steps(self, gamma, lam, horizon, min_steps_per_task=-1): diff --git a/python/ray/rllib/policy_gradient/loss.py b/python/ray/rllib/policy_gradient/loss.py index dfb82d071..8c15e68ff 100644 --- a/python/ray/rllib/policy_gradient/loss.py +++ b/python/ray/rllib/policy_gradient/loss.py @@ -12,7 +12,8 @@ class ProximalPolicyLoss(object): def __init__( self, observation_space, action_space, - observations, advantages, actions, prev_logits, logit_dim, + observations, returns, advantages, actions, + prev_logits, prev_vf_preds, logit_dim, kl_coeff, distribution_class, config, sess): assert (isinstance(action_space, gym.spaces.Discrete) or isinstance(action_space, gym.spaces.Box)) @@ -26,6 +27,17 @@ class ProximalPolicyLoss(object): self.curr_dist = distribution_class(self.curr_logits) self.sampler = self.curr_dist.sample() + if config["use_gae"]: + vf_config = config["model"].copy() + # Do not split the last layer of the value function into + # mean parameters and standard deviation parameters and + # do not make the standard deviations free variables. + vf_config["free_logstd"] = False + with tf.variable_scope("value_function"): + self.value_function = ModelCatalog.get_model( + observations, 1, vf_config).outputs + self.value_function = tf.reshape(self.value_function, [-1]) + # Make loss functions. self.ratio = tf.exp(self.curr_dist.logp(actions) - self.prev_dist.logp(actions)) @@ -37,12 +49,41 @@ class ProximalPolicyLoss(object): self.surr2 = tf.clip_by_value(self.ratio, 1 - config["clip_param"], 1 + config["clip_param"]) * advantages self.surr = tf.minimum(self.surr1, self.surr2) - self.loss = tf.reduce_mean(-self.surr + kl_coeff * self.kl - - config["entropy_coeff"] * self.entropy) + self.mean_policy_loss = tf.reduce_mean(-self.surr) + + if config["use_gae"]: + # We use a huber loss here to be more robust against outliers, + # which seem to occur when the rollouts get longer (the variance + # scales superlinearly with the length of the rollout) + self.vf_loss1 = tf.square(self.value_function - returns) + vf_clipped = prev_vf_preds + tf.clip_by_value( + self.value_function - prev_vf_preds, + -config["clip_param"], config["clip_param"]) + self.vf_loss2 = tf.square(vf_clipped - returns) + self.vf_loss = tf.minimum(self.vf_loss1, self.vf_loss2) + self.mean_vf_loss = tf.reduce_mean(self.vf_loss) + self.loss = tf.reduce_mean( + -self.surr + kl_coeff * self.kl + + config["vf_loss_coeff"] * self.vf_loss - + config["entropy_coeff"] * self.entropy) + else: + self.mean_vf_loss = tf.constant(0.0) + self.loss = tf.reduce_mean( + -self.surr + + kl_coeff * self.kl - + config["entropy_coeff"] * self.entropy) + self.sess = sess - def compute_actions(self, observations): - return self.sess.run([self.sampler, self.curr_logits], + if config["use_gae"]: + self.policy_results = [ + self.sampler, self.curr_logits, self.value_function] + else: + self.policy_results = [ + self.sampler, self.curr_logits, tf.constant("NA")] + + def compute(self, observations): + return self.sess.run(self.policy_results, feed_dict={self.observations: observations}) def loss(self): diff --git a/python/ray/rllib/policy_gradient/policy_gradient.py b/python/ray/rllib/policy_gradient/policy_gradient.py index 6cb9d22f5..8361185c7 100644 --- a/python/ray/rllib/policy_gradient/policy_gradient.py +++ b/python/ray/rllib/policy_gradient/policy_gradient.py @@ -7,6 +7,7 @@ import time import numpy as np import tensorflow as tf +from tensorflow.python import debug as tf_debug import ray from ray.rllib.common import Algorithm, TrainingResult @@ -20,6 +21,9 @@ DEFAULT_CONFIG = { "gamma": 0.995, # Number of steps after which the rollout gets cut "horizon": 2000, + # If true, use the Generalized Advantage Estimator (GAE) + # with a value function, see https://arxiv.org/pdf/1506.02438.pdf. + "use_gae": True, # GAE(lambda) parameter "lambda": 1.0, # Initial coefficient for KL divergence @@ -40,6 +44,8 @@ DEFAULT_CONFIG = { "rollout_batchsize": 1, # Total SGD batch size across all devices for SGD "sgd_batchsize": 128, + # Coefficient of the value function loss + "vf_loss_coeff": 1.0, # Coefficient of the entropy regularizer "entropy_coeff": 0.0, # PPO clip parameter @@ -58,9 +64,11 @@ DEFAULT_CONFIG = { "full_trace_nth_sgd_batch": -1, # Whether to profile data loading "full_trace_data_load": False, + # Outer loop iteration index when we drop into the TensorFlow debugger + "tf_debug_iteration": -1, # If this is True, the TensorFlow debugger is invoked if an Inf or NaN # is detected - "use_tf_debugger": False, + "tf_debug_inf_or_nan": False, # If True, we write checkpoints and tensorflow logging "write_logs": True, # Name of the model checkpoint file @@ -125,13 +133,22 @@ class PolicyGradient(Algorithm): simple_value=traj_len_mean)]) file_writer.add_summary(traj_stats, self.global_step) self.global_step += 1 - trajectory["advantages"] = ((trajectory["advantages"] - - trajectory["advantages"].mean()) / - trajectory["advantages"].std()) + + def standardized(value): + # Divide by the maximum of value.std() and 1e-4 + # to guard against the case where all values are equal + return (value - value.mean()) / max(1e-4, value.std()) + + if config["use_gae"]: + trajectory["advantages"] = standardized(trajectory["advantages"]) + else: + trajectory["returns"] = standardized(trajectory["returns"]) + rollouts_end = time.time() print("Computing policy (iterations=" + str(config["num_sgd_iter"]) + ", stepsize=" + str(config["sgd_stepsize"]) + "):") - names = ["iter", "loss", "kl", "entropy"] + names = [ + "iter", "total loss", "policy loss", "vf loss", "kl", "entropy"] print(("{:>15}" * len(names)).format(*names)) trajectory = shuffle(trajectory) shuffle_end = time.time() @@ -148,26 +165,35 @@ class PolicyGradient(Algorithm): batch_index = 0 num_batches = ( int(tuples_per_device) // int(model.per_device_batch_size)) - loss, kl, entropy = [], [], [] + loss, policy_loss, vf_loss, kl, entropy = [], [], [], [], [] permutation = np.random.permutation(num_batches) + # Prepare to drop into the debugger + if j == config["tf_debug_iteration"]: + model.sess = tf_debug.LocalCLIDebugWrapperSession(model.sess) while batch_index < num_batches: full_trace = ( i == 0 and j == 0 and batch_index == config["full_trace_nth_sgd_batch"]) - batch_loss, batch_kl, batch_entropy = model.run_sgd_minibatch( - permutation[batch_index] * model.per_device_batch_size, - self.kl_coeff, full_trace, - file_writer if write_tf_logs else None) + batch_loss, batch_policy_loss, batch_vf_loss, batch_kl, \ + batch_entropy = model.run_sgd_minibatch( + permutation[batch_index] * model.per_device_batch_size, + self.kl_coeff, full_trace, + file_writer if write_tf_logs else None) loss.append(batch_loss) + policy_loss.append(batch_policy_loss) + vf_loss.append(batch_vf_loss) kl.append(batch_kl) entropy.append(batch_entropy) batch_index += 1 loss = np.mean(loss) + policy_loss = np.mean(policy_loss) + vf_loss = np.mean(vf_loss) kl = np.mean(kl) entropy = np.mean(entropy) sgd_end = time.time() print( - "{:>15}{:15.5e}{:15.5e}{:15.5e}".format(i, loss, kl, entropy)) + "{:>15}{:15.5e}{:15.5e}{:15.5e}{:15.5e}{:15.5e}".format( + i, loss, policy_loss, vf_loss, kl, entropy)) values = [] if i == config["num_sgd_iter"] - 1: diff --git a/python/ray/rllib/policy_gradient/rollout.py b/python/ray/rllib/policy_gradient/rollout.py index d4911dde0..b44666c3f 100644 --- a/python/ray/rllib/policy_gradient/rollout.py +++ b/python/ray/rllib/policy_gradient/rollout.py @@ -33,14 +33,16 @@ def rollouts(policy, env, horizon, observation_filter=NoFilter(), observation = observation_filter(env.reset()) done = np.array(env.batchsize * [False]) t = 0 - observations = [] - raw_rewards = [] # Empirical rewards - actions = [] - logprobs = [] - dones = [] + observations = [] # Filtered observations + raw_rewards = [] # Empirical rewards + actions = [] # Actions sampled by the policy + logprobs = [] # Last layer of the policy network + vf_preds = [] # Value function predictions + dones = [] # Has this rollout terminated? - while not done.all() and t < horizon: - action, logprob = policy.compute_actions(observation) + while True: + action, logprob, vfpred = policy.compute(observation) + vf_preds.append(vfpred) observations.append(observation[None]) actions.append(action[None]) logprobs.append(logprob[None]) @@ -49,27 +51,49 @@ def rollouts(policy, env, horizon, observation_filter=NoFilter(), raw_rewards.append(raw_reward[None]) dones.append(done[None]) t += 1 + if done.all() or t >= horizon: + break return {"observations": np.vstack(observations), "raw_rewards": np.vstack(raw_rewards), "actions": np.vstack(actions), "logprobs": np.vstack(logprobs), + "vf_preds": np.vstack(vf_preds), "dones": np.vstack(dones)} +def add_return_values(trajectory, gamma, reward_filter): + rewards = trajectory["raw_rewards"] + dones = trajectory["dones"] + returns = np.zeros_like(rewards) + last_return = np.zeros(rewards.shape[1], dtype="float32") + + for t in reversed(range(len(rewards) - 1)): + last_return = rewards[t, :] * (1 - dones[t, :]) + gamma * last_return + returns[t, :] = last_return + reward_filter(returns[t, :]) + + trajectory["returns"] = returns + + def add_advantage_values(trajectory, gamma, lam, reward_filter): rewards = trajectory["raw_rewards"] + vf_preds = trajectory["vf_preds"] dones = trajectory["dones"] advantages = np.zeros_like(rewards) last_advantage = np.zeros(rewards.shape[1], dtype="float32") - for t in reversed(range(len(rewards))): - delta = rewards[t, :] * (1 - dones[t, :]) - last_advantage = delta + gamma * lam * last_advantage + for t in reversed(range(len(rewards) - 1)): + delta = rewards[t, :] * (1 - dones[t, :]) + \ + gamma * vf_preds[t+1, :] * (1 - dones[t+1, :]) - vf_preds[t, :] + last_advantage = \ + delta + gamma * lam * last_advantage * (1 - dones[t+1, :]) advantages[t, :] = last_advantage reward_filter(advantages[t, :]) trajectory["advantages"] = advantages + trajectory["td_lambda_returns"] = \ + trajectory["advantages"] + trajectory["vf_preds"] def collect_samples(agents, diff --git a/python/ray/rllib/test.sh b/python/ray/rllib/test.sh index f721dc4c4..162c53137 100755 --- a/python/ray/rllib/test.sh +++ b/python/ray/rllib/test.sh @@ -6,7 +6,9 @@ python train.py --env CartPole-v1 --config '{"kl_coeff": 1.0, "num_sgd_iter": 20 python train.py --env Walker2d-v1 --config '{"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_agents": 64}' --alg PolicyGradient --upload-dir s3://bucketname/ -python train.py --env Humanoid-v1 --config '{"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_agents": 64, "model": {"free_logstd": true}}' --alg PolicyGradient --upload-dir s3://bucketname/ +python train.py --env Humanoid-v1 --config '{"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_agents": 64, "model": {"free_logstd": true}, "use_gae": false}' --alg PolicyGradient --upload-dir s3://bucketname/ + +python train.py --env Humanoid-v1 --config '{"lambda": 0.95, "clip_param": 0.2, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": 1e-4, "sgd_batchsize": 32768, "horizon": 5000, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_agents": 64, "model": {"free_logstd": true}, "write_logs": false}' --alg PolicyGradient --upload-dir s3://bucketname/ python train.py --env PongNoFrameskip-v0 --alg DQN --upload-dir s3://bucketname/ python train.py --env PongDeterministic-v0 --alg A3C --upload-dir s3://bucketname/ diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index ebf5016c1..c51fa836b 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -70,6 +70,13 @@ docker run --shm-size=10G --memory=10G $DOCKER_SHA \ --num-iterations 2 \ --config '{"kl_coeff": 1.0, "num_sgd_iter": 10, "sgd_stepsize": 1e-4, "sgd_batchsize": 64, "timesteps_per_batch": 2000, "num_agents": 1}' +docker run --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/train.py \ + --env CartPole-v1 \ + --alg PolicyGradient \ + --num-iterations 2 \ + --config '{"kl_coeff": 1.0, "num_sgd_iter": 10, "sgd_stepsize": 1e-4, "sgd_batchsize": 64, "timesteps_per_batch": 2000, "num_agents": 1, "use_gae": false}' + docker run --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env Pendulum-v0 \