[rllib] Improve performance for small rollouts (#812)

* batch small rollouts together

* implement minimum number of samples for each task

* add total time

* fix linting

* style

* fix

* factor out parameters and document stuff

* add rollout batchsize

* address comments

* linting

* small fix
This commit is contained in:
Philipp Moritz
2017-08-05 22:13:30 -07:00
committed by Robert Nishihara
parent 64eaaaebf0
commit 0225581078
3 changed files with 92 additions and 28 deletions
+44 -2
View File
@@ -8,6 +8,7 @@ import os
from tensorflow.python import debug as tf_debug
import numpy as np
import ray
from ray.rllib.parallel import LocalSyncParallelOptimizer
@@ -16,6 +17,7 @@ from ray.rllib.policy_gradient.env import BatchedEnv
from ray.rllib.policy_gradient.loss import ProximalPolicyLoss
from ray.rllib.policy_gradient.filter import MeanStdFilter
from ray.rllib.policy_gradient.rollout import rollouts, add_advantage_values
from ray.rllib.policy_gradient.utils import flatten, concatenate
# TODO(pcm): Make sure that both observation_filter and reward_filter
# are correctly handled, i.e. (a) the values are accumulated accross
@@ -82,8 +84,8 @@ class Agent(object):
assert config["sgd_batchsize"] % len(devices) == 0, \
"Batch size must be evenly divisible by devices"
if is_remote:
self.batch_size = 1
self.per_device_batch_size = 1
self.batch_size = config["rollout_batchsize"]
self.per_device_batch_size = config["rollout_batchsize"]
else:
self.batch_size = config["sgd_batchsize"]
self.per_device_batch_size = int(self.batch_size / len(devices))
@@ -148,11 +150,51 @@ class Agent(object):
self.variables.set_weights(weights)
def compute_trajectory(self, gamma, lam, horizon):
"""Compute a single rollout on the agent and return."""
trajectory = rollouts(
self.common_policy,
self.env, horizon, self.observation_filter, self.reward_filter)
add_advantage_values(trajectory, gamma, lam, self.reward_filter)
return trajectory
def compute_steps(self, gamma, lam, horizon, min_steps_per_task=-1):
"""Compute multiple rollouts and concatenate the results.
Args:
gamma: MDP discount factor
lam: GAE(lambda) parameter
horizon: Number of steps after which a rollout gets cut
min_steps_per_task: Lower bound on the number of states to be
collected.
Returns:
states: List of states.
total_rewards: Total rewards of the trajectories.
trajectory_lengths: Lengths of the trajectories.
"""
num_steps_so_far = 0
trajectories = []
total_rewards = []
trajectory_lengths = []
while True:
trajectory = self.compute_trajectory(gamma, lam, horizon)
total_rewards.append(
trajectory["raw_rewards"].sum(axis=0).mean())
trajectory_lengths.append(
np.logical_not(trajectory["dones"]).sum(axis=0).mean())
trajectory = flatten(trajectory)
not_done = np.logical_not(trajectory["dones"])
# Filtering out states that are done. We do this because
# trajectories are batched and cut only if all the trajectories
# in the batch terminated, so we can potentially get rid of
# some of the states here.
trajectory = {key: val[not_done]
for key, val in trajectory.items()}
num_steps_so_far += trajectory["raw_rewards"].shape[0]
trajectories.append(trajectory)
if num_steps_so_far >= min_steps_per_task:
break
return concatenate(trajectories), total_rewards, trajectory_lengths
RemoteAgent = ray.remote(Agent)
@@ -18,10 +18,19 @@ from ray.rllib.policy_gradient.utils import shuffle
DEFAULT_CONFIG = {
# Discount factor of the MDP
"gamma": 0.995,
# Number of steps after which the rollout gets cut
"horizon": 2000,
# GAE(lambda) parameter
"lambda": 1.0,
# Initial coefficient for KL divergence
"kl_coeff": 0.2,
# Number of SGD iterations in each outer loop
"num_sgd_iter": 30,
# Number of outer loop iterations
"max_iterations": 1000,
# Stepsize of SGD
"sgd_stepsize": 5e-5,
# TODO(pcm): Expose the choice between gpus and cpus
# as a command line argument.
@@ -31,17 +40,34 @@ DEFAULT_CONFIG = {
"log_device_placement": False,
"allow_soft_placement": True,
},
"sgd_batchsize": 128, # total size across all devices
# Batch size for policy evaluations for rollouts
"rollout_batchsize": 1,
# Total SGD batch size across all devices for SGD
"sgd_batchsize": 128,
# Coefficient of the entropy regularizer
"entropy_coeff": 0.0,
# PPO clip parameter
"clip_param": 0.3,
# Target value for KL divergence
"kl_target": 0.01,
"model": {"free_logstd": False},
# Number of timesteps collected in each outer loop
"timesteps_per_batch": 40000,
# Each tasks performs rollouts until at least this
# number of steps is obtained
"min_steps_per_task": 1000,
# Number of actors used to collect the rollouts
"num_agents": 5,
# Dump TensorFlow timeline after this many SGD minibatches
"full_trace_nth_sgd_batch": -1,
# Whether to profile data loading
"full_trace_data_load": False,
# If this is True, the TensorFlow debugger is invoked if an Inf or NaN
# is detected
"use_tf_debugger": False,
"write_logs": True, # write checkpoints and tensorflow logging?
# If True, we write checkpoints and tensorflow logging
"write_logs": True,
# Name of the model checkpoint file
"model_checkpoint_file": "iteration-%s.ckpt"}
@@ -79,6 +105,7 @@ class PolicyGradient(Algorithm):
self.env_name, 1, self.preprocessor, self.config,
self.logdir, True)
for _ in range(config["num_agents"])]
self.start_time = time.time()
def train(self):
agents = self.agents
@@ -108,7 +135,7 @@ class PolicyGradient(Algorithm):
weights = ray.put(model.get_weights())
[a.load_weights.remote(weights) for a in agents]
trajectory, total_reward, traj_len_mean = collect_samples(
agents, config["timesteps_per_batch"], config["gamma"], 1.0, 2000)
agents, config)
print("total reward is ", total_reward)
print("trajectory length mean is ", traj_len_mean)
print("timesteps:", trajectory["dones"].shape[0])
@@ -213,6 +240,7 @@ class PolicyGradient(Algorithm):
print("load time:", load_time)
print("sgd time:", sgd_time)
print("sgd examples/s:", len(trajectory["observations"]) / sgd_time)
print("total time so far:", time.time() - self.start_time)
result = TrainingResult(
self.experiment_id.hex, j, total_reward, traj_len_mean, info)
+17 -23
View File
@@ -6,7 +6,7 @@ import numpy as np
import ray
from ray.rllib.policy_gradient.filter import NoFilter
from ray.rllib.policy_gradient.utils import flatten, concatenate
from ray.rllib.policy_gradient.utils import concatenate
def rollouts(policy, env, horizon, observation_filter=NoFilter(),
@@ -72,42 +72,36 @@ def add_advantage_values(trajectory, gamma, lam, reward_filter):
trajectory["advantages"] = advantages
@ray.remote
def compute_trajectory(policy, env, gamma, lam, horizon, observation_filter,
reward_filter):
trajectory = rollouts(policy, env, horizon, observation_filter,
reward_filter)
add_advantage_values(trajectory, gamma, lam, reward_filter)
return trajectory
def collect_samples(agents, num_timesteps, gamma, lam, horizon,
observation_filter=NoFilter(), reward_filter=NoFilter()):
def collect_samples(agents,
config,
observation_filter=NoFilter(),
reward_filter=NoFilter()):
num_timesteps_so_far = 0
trajectories = []
total_rewards = []
traj_len_means = []
trajectory_lengths = []
# This variable maps the object IDs of trajectories that are currently
# computed to the agent that they are computed on; we start some initial
# tasks here.
agent_dict = {agent.compute_trajectory.remote(gamma, lam, horizon):
agent_dict = {agent.compute_steps.remote(
config["gamma"], config["lambda"],
config["horizon"], config["min_steps_per_task"]):
agent for agent in agents}
while num_timesteps_so_far < num_timesteps:
while num_timesteps_so_far < config["timesteps_per_batch"]:
# TODO(pcm): Make wait support arbitrary iterators and remove the
# conversion to list here.
[next_trajectory], waiting_trajectories = ray.wait(
list(agent_dict.keys()))
agent = agent_dict.pop(next_trajectory)
# Start task with next trajectory and record it in the dictionary.
agent_dict[agent.compute_trajectory.remote(gamma, lam, horizon)] = (
agent_dict[agent.compute_steps.remote(
config["gamma"], config["lambda"],
config["horizon"], config["min_steps_per_task"])] = (
agent)
trajectory = flatten(ray.get(next_trajectory))
not_done = np.logical_not(trajectory["dones"])
total_rewards.append(
trajectory["raw_rewards"][not_done].sum(axis=0).mean())
traj_len_means.append(not_done.sum(axis=0).mean())
trajectory = {key: val[not_done] for key, val in trajectory.items()}
trajectory, rewards, lengths = ray.get(next_trajectory)
total_rewards.extend(rewards)
trajectory_lengths.extend(lengths)
num_timesteps_so_far += len(trajectory["dones"])
trajectories.append(trajectory)
return (concatenate(trajectories), np.mean(total_rewards),
np.mean(traj_len_means))
np.mean(trajectory_lengths))