From 0225581078e787327cf883152c806d2888cf7d89 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 5 Aug 2017 22:13:30 -0700 Subject: [PATCH] [rllib] Improve performance for small rollouts (#812) * batch small rollouts together * implement minimum number of samples for each task * add total time * fix linting * style * fix * factor out parameters and document stuff * add rollout batchsize * address comments * linting * small fix --- python/ray/rllib/policy_gradient/agent.py | 46 ++++++++++++++++++- .../rllib/policy_gradient/policy_gradient.py | 34 ++++++++++++-- python/ray/rllib/policy_gradient/rollout.py | 40 +++++++--------- 3 files changed, 92 insertions(+), 28 deletions(-) diff --git a/python/ray/rllib/policy_gradient/agent.py b/python/ray/rllib/policy_gradient/agent.py index 85de0d06f..8bda9106f 100644 --- a/python/ray/rllib/policy_gradient/agent.py +++ b/python/ray/rllib/policy_gradient/agent.py @@ -8,6 +8,7 @@ import os from tensorflow.python import debug as tf_debug +import numpy as np import ray from ray.rllib.parallel import LocalSyncParallelOptimizer @@ -16,6 +17,7 @@ from ray.rllib.policy_gradient.env import BatchedEnv from ray.rllib.policy_gradient.loss import ProximalPolicyLoss from ray.rllib.policy_gradient.filter import MeanStdFilter from ray.rllib.policy_gradient.rollout import rollouts, add_advantage_values +from ray.rllib.policy_gradient.utils import flatten, concatenate # TODO(pcm): Make sure that both observation_filter and reward_filter # are correctly handled, i.e. 
(a) the values are accumulated accross @@ -82,8 +84,8 @@ class Agent(object): assert config["sgd_batchsize"] % len(devices) == 0, \ "Batch size must be evenly divisible by devices" if is_remote: - self.batch_size = 1 - self.per_device_batch_size = 1 + self.batch_size = config["rollout_batchsize"] + self.per_device_batch_size = config["rollout_batchsize"] else: self.batch_size = config["sgd_batchsize"] self.per_device_batch_size = int(self.batch_size / len(devices)) @@ -148,11 +150,51 @@ class Agent(object): self.variables.set_weights(weights) def compute_trajectory(self, gamma, lam, horizon): + """Compute a single rollout on the agent and return.""" trajectory = rollouts( self.common_policy, self.env, horizon, self.observation_filter, self.reward_filter) add_advantage_values(trajectory, gamma, lam, self.reward_filter) return trajectory + def compute_steps(self, gamma, lam, horizon, min_steps_per_task=-1): + """Compute multiple rollouts and concatenate the results. + + Args: + gamma: MDP discount factor + lam: GAE(lambda) parameter + horizon: Number of steps after which a rollout gets cut + min_steps_per_task: Lower bound on the number of states to be + collected. + + Returns: + states: List of states. + total_rewards: Total rewards of the trajectories. + trajectory_lengths: Lengths of the trajectories. + """ + num_steps_so_far = 0 + trajectories = [] + total_rewards = [] + trajectory_lengths = [] + while True: + trajectory = self.compute_trajectory(gamma, lam, horizon) + total_rewards.append( + trajectory["raw_rewards"].sum(axis=0).mean()) + trajectory_lengths.append( + np.logical_not(trajectory["dones"]).sum(axis=0).mean()) + trajectory = flatten(trajectory) + not_done = np.logical_not(trajectory["dones"]) + # Filtering out states that are done. We do this because + # trajectories are batched and cut only if all the trajectories + # in the batch terminated, so we can potentially get rid of + # some of the states here. 
+ trajectory = {key: val[not_done] + for key, val in trajectory.items()} + num_steps_so_far += trajectory["raw_rewards"].shape[0] + trajectories.append(trajectory) + if num_steps_so_far >= min_steps_per_task: + break + return concatenate(trajectories), total_rewards, trajectory_lengths + RemoteAgent = ray.remote(Agent) diff --git a/python/ray/rllib/policy_gradient/policy_gradient.py b/python/ray/rllib/policy_gradient/policy_gradient.py index 47af6eacf..4f62d38e2 100644 --- a/python/ray/rllib/policy_gradient/policy_gradient.py +++ b/python/ray/rllib/policy_gradient/policy_gradient.py @@ -18,10 +18,19 @@ from ray.rllib.policy_gradient.utils import shuffle DEFAULT_CONFIG = { + # Discount factor of the MDP "gamma": 0.995, + # Number of steps after which the rollout gets cut + "horizon": 2000, + # GAE(lambda) parameter + "lambda": 1.0, + # Initial coefficient for KL divergence "kl_coeff": 0.2, + # Number of SGD iterations in each outer loop "num_sgd_iter": 30, + # Number of outer loop iterations "max_iterations": 1000, + # Stepsize of SGD "sgd_stepsize": 5e-5, # TODO(pcm): Expose the choice between gpus and cpus # as a command line argument. 
@@ -31,17 +40,34 @@ DEFAULT_CONFIG = { "log_device_placement": False, "allow_soft_placement": True, }, - "sgd_batchsize": 128, # total size across all devices + # Batch size for policy evaluations for rollouts + "rollout_batchsize": 1, + # Total SGD batch size across all devices for SGD + "sgd_batchsize": 128, + # Coefficient of the entropy regularizer "entropy_coeff": 0.0, + # PPO clip parameter "clip_param": 0.3, + # Target value for KL divergence "kl_target": 0.01, "model": {"free_logstd": False}, + # Number of timesteps collected in each outer loop "timesteps_per_batch": 40000, + # Each task performs rollouts until at least this + # number of steps is obtained + "min_steps_per_task": 1000, + # Number of actors used to collect the rollouts "num_agents": 5, + # Dump TensorFlow timeline after this many SGD minibatches "full_trace_nth_sgd_batch": -1, + # Whether to profile data loading "full_trace_data_load": False, + # If this is True, the TensorFlow debugger is invoked if an Inf or NaN + # is detected "use_tf_debugger": False, - "write_logs": True, # write checkpoints and tensorflow logging? 
+ # If True, we write checkpoints and tensorflow logging + "write_logs": True, + # Name of the model checkpoint file "model_checkpoint_file": "iteration-%s.ckpt"} @@ -79,6 +105,7 @@ class PolicyGradient(Algorithm): self.env_name, 1, self.preprocessor, self.config, self.logdir, True) for _ in range(config["num_agents"])] + self.start_time = time.time() def train(self): agents = self.agents @@ -108,7 +135,7 @@ class PolicyGradient(Algorithm): weights = ray.put(model.get_weights()) [a.load_weights.remote(weights) for a in agents] trajectory, total_reward, traj_len_mean = collect_samples( - agents, config["timesteps_per_batch"], config["gamma"], 1.0, 2000) + agents, config) print("total reward is ", total_reward) print("trajectory length mean is ", traj_len_mean) print("timesteps:", trajectory["dones"].shape[0]) @@ -213,6 +240,7 @@ class PolicyGradient(Algorithm): print("load time:", load_time) print("sgd time:", sgd_time) print("sgd examples/s:", len(trajectory["observations"]) / sgd_time) + print("total time so far:", time.time() - self.start_time) result = TrainingResult( self.experiment_id.hex, j, total_reward, traj_len_mean, info) diff --git a/python/ray/rllib/policy_gradient/rollout.py b/python/ray/rllib/policy_gradient/rollout.py index 6c4f0804c..d4911dde0 100644 --- a/python/ray/rllib/policy_gradient/rollout.py +++ b/python/ray/rllib/policy_gradient/rollout.py @@ -6,7 +6,7 @@ import numpy as np import ray from ray.rllib.policy_gradient.filter import NoFilter -from ray.rllib.policy_gradient.utils import flatten, concatenate +from ray.rllib.policy_gradient.utils import concatenate def rollouts(policy, env, horizon, observation_filter=NoFilter(), @@ -72,42 +72,36 @@ def add_advantage_values(trajectory, gamma, lam, reward_filter): trajectory["advantages"] = advantages -@ray.remote -def compute_trajectory(policy, env, gamma, lam, horizon, observation_filter, - reward_filter): - trajectory = rollouts(policy, env, horizon, observation_filter, - reward_filter) - 
add_advantage_values(trajectory, gamma, lam, reward_filter) - return trajectory - - -def collect_samples(agents, num_timesteps, gamma, lam, horizon, - observation_filter=NoFilter(), reward_filter=NoFilter()): +def collect_samples(agents, + config, + observation_filter=NoFilter(), + reward_filter=NoFilter()): num_timesteps_so_far = 0 trajectories = [] total_rewards = [] - traj_len_means = [] + trajectory_lengths = [] # This variable maps the object IDs of trajectories that are currently # computed to the agent that they are computed on; we start some initial # tasks here. - agent_dict = {agent.compute_trajectory.remote(gamma, lam, horizon): + agent_dict = {agent.compute_steps.remote( + config["gamma"], config["lambda"], + config["horizon"], config["min_steps_per_task"]): agent for agent in agents} - while num_timesteps_so_far < num_timesteps: + while num_timesteps_so_far < config["timesteps_per_batch"]: # TODO(pcm): Make wait support arbitrary iterators and remove the # conversion to list here. [next_trajectory], waiting_trajectories = ray.wait( list(agent_dict.keys())) agent = agent_dict.pop(next_trajectory) # Start task with next trajectory and record it in the dictionary. 
- agent_dict[agent.compute_trajectory.remote(gamma, lam, horizon)] = ( + agent_dict[agent.compute_steps.remote( + config["gamma"], config["lambda"], + config["horizon"], config["min_steps_per_task"])] = ( agent) - trajectory = flatten(ray.get(next_trajectory)) - not_done = np.logical_not(trajectory["dones"]) - total_rewards.append( - trajectory["raw_rewards"][not_done].sum(axis=0).mean()) - traj_len_means.append(not_done.sum(axis=0).mean()) - trajectory = {key: val[not_done] for key, val in trajectory.items()} + trajectory, rewards, lengths = ray.get(next_trajectory) + total_rewards.extend(rewards) + trajectory_lengths.extend(lengths) num_timesteps_so_far += len(trajectory["dones"]) trajectories.append(trajectory) return (concatenate(trajectories), np.mean(total_rewards), - np.mean(traj_len_means)) + np.mean(trajectory_lengths))