[rllib] Fix DQN inefficiency, and cleanup for different modes of parallelism (#1151)

* initial checkin

* flake

* dqn

* docs

* add tuned pong

* remove

* upd

* add both

* better gamma

* update

* Last nit
This commit is contained in:
Eric Liang
2017-10-29 10:52:30 -07:00
committed by Richard Liaw
parent 304c3cade4
commit 4cace0976d
6 changed files with 519 additions and 237 deletions
+361 -115
View File
@@ -7,6 +7,7 @@ import time
import numpy as np
import pickle
import os
import sys
import tensorflow as tf
import ray
@@ -15,90 +16,99 @@ from ray.rllib.dqn import logger, models
from ray.rllib.dqn.common.wrappers import wrap_dqn
from ray.rllib.dqn.common.schedules import LinearSchedule
from ray.rllib.dqn.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer
from ray.rllib.ppo.filter import RunningStat
from ray.tune.result import TrainingResult
"""The default configuration dict for the DQN algorithm.
dueling: bool
whether to use dueling dqn
double_q: bool
whether to use double dqn
hiddens: array<int>
hidden layer sizes of the state and action value networks
model: dict
config options to pass to the model constructor
lr: float
learning rate for adam optimizer
schedule_max_timesteps: int
max num timesteps for annealing schedules
timesteps_per_iteration: int
number of env steps to optimize for before returning
buffer_size: int
size of the replay buffer
exploration_fraction: float
fraction of entire training period over which the exploration rate is
annealed
exploration_final_eps: float
final value of random action probability
sample_batch_size: int
update the replay buffer with this many samples at once
num_workers: int
the number of workers to use for parallel batch sample collection
train_batch_size: int
size of a batched sampled from replay buffer for training
print_freq: int
how often to print out training progress
set to None to disable printing
learning_starts: int
how many steps of the model to collect transitions for before learning
starts
gamma: float
discount factor
grad_norm_clipping: int or None
if not None, clip gradients during optimization at this value
target_network_update_freq: int
update the target network every `target_network_update_freq` steps.
prioritized_replay: True
if True prioritized replay buffer will be used.
prioritized_replay_alpha: float
alpha parameter for prioritized replay buffer
prioritized_replay_beta0: float
initial value of beta for prioritized replay buffer
prioritized_replay_beta_iters: int
number of iterations over which beta will be annealed from initial
value to 1.0. If set to None equals to schedule_max_timesteps
prioritized_replay_eps: float
epsilon to add to the TD errors when updating priorities.
num_cpu: int
number of cpus to use for training
"""
DEFAULT_CONFIG = dict(
# === Model ===
# Whether to use dueling dqn
dueling=True,
# Whether to use double dqn
double_q=True,
# Hidden layer sizes of the state and action value networks
hiddens=[256],
# Config options to pass to the model constructor
model={},
gpu_offset=0,
lr=5e-4,
# Discount factor for the MDP
gamma=0.99,
# === Exploration ===
# Max num timesteps for annealing schedules. Exploration is annealed from
# 1.0 to exploration_final_eps over this number of timesteps scaled by
# exploration_fraction
schedule_max_timesteps=100000,
# Number of env steps to optimize for before returning
timesteps_per_iteration=1000,
buffer_size=50000,
# Fraction of entire training period over which the exploration rate is
# annealed
exploration_fraction=0.1,
# Final value of random action probability
exploration_final_eps=0.02,
sample_batch_size=1,
num_workers=1,
train_batch_size=32,
print_freq=1,
# How many steps of the model to sample before learning starts.
learning_starts=1000,
gamma=1.0,
grad_norm_clipping=10,
# Update the target network every `target_network_update_freq` steps.
target_network_update_freq=500,
prioritized_replay=False,
# === Replay buffer ===
# Size of the replay buffer. Note that if async_updates is set, then each
# worker will have a replay buffer of this size.
buffer_size=50000,
# If True prioritized replay buffer will be used.
prioritized_replay=True,
# Alpha parameter for prioritized replay buffer
prioritized_replay_alpha=0.6,
# Initial value of beta for prioritized replay buffer
prioritized_replay_beta0=0.4,
# Number of iterations over which beta will be annealed from initial
# value to 1.0. If set to None equals to schedule_max_timesteps
prioritized_replay_beta_iters=None,
# Epsilon to add to the TD errors when updating priorities.
prioritized_replay_eps=1e-6,
num_cpu=16)
# === Optimization ===
# Learning rate for adam optimizer
lr=5e-4,
# Update the replay buffer with this many samples at once. Note that this
# setting applies per-worker if num_workers > 1.
sample_batch_size=1,
# Size of a batch sampled from replay buffer for training. Note that if
# async_updates is set, then each worker returns gradients for a batch of
# this size.
train_batch_size=32,
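# SGD minibatch size. Note that this should be <= train_batch_size. This
# config has no effect if gradients_on_workers is True.
# NOTE(review): no gradients_on_workers key is defined in this config; this
# may be a stale name for async_updates -- confirm.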
sgd_batch_size=32,
# If not None, clip gradients during optimization at this value
grad_norm_clipping=10,
# === Tensorflow ===
# Arguments to pass to tensorflow
tf_session_args={
"device_count": {"CPU": 2},
"log_device_placement": False,
"allow_soft_placement": True,
"inter_op_parallelism_threads": 1,
"intra_op_parallelism_threads": 1,
},
# === Parallelism ===
# Number of workers for collecting samples with. Note that the typical
# setting is 1 unless your environment is particularly slow to sample.
num_workers=1,
# Whether to allocate GPUs for workers (if num_workers > 1).
use_gpu_for_workers=False,
# (Experimental) Whether to update the model asynchronously from
# workers. In this mode, gradients will be computed on workers instead of
# on the driver, and workers will each have their own replay buffer.
async_updates=False,
# (Experimental) Whether to use multiple GPUs for SGD optimization.
# Note that this only helps performance if the SGD batch size is large.
multi_gpu_optimize=False,
# Number of SGD iterations over the data. Only applies in multi-gpu mode.
num_sgd_iter=1,
# Devices to use for parallel SGD. Only applies in multi-gpu mode.
devices=["/gpu:0"])
class Actor(object):
@@ -108,12 +118,9 @@ class Actor(object):
self.env = env
self.config = config
num_cpu = config["num_cpu"]
tf_config = tf.ConfigProto(
inter_op_parallelism_threads=num_cpu,
intra_op_parallelism_threads=num_cpu)
tf_config = tf.ConfigProto(**config["tf_session_args"])
self.sess = tf.Session(config=tf_config)
self.dqn_graph = models.DQNGraph(env, config)
self.dqn_graph = models.DQNGraph(env, config, logdir)
# Create the replay buffer
if config["prioritized_replay"]:
@@ -143,8 +150,13 @@ class Actor(object):
# Initialize the parameters and copy them to the target network.
self.sess.run(tf.global_variables_initializer())
self.dqn_graph.update_target(self.sess)
self.set_weights_time = RunningStat(())
self.sample_time = RunningStat(())
self.grad_time = RunningStat(())
# Note that workers don't need target vars to be synced
self.variables = ray.experimental.TensorFlowVariables(
tf.group(self.dqn_graph.q_tp1, self.dqn_graph.q_t), self.sess)
tf.group(self.dqn_graph.q_t, self.dqn_graph.q_tp1), self.sess)
self.episode_rewards = [0.0]
self.episode_lengths = [0.0]
@@ -153,7 +165,7 @@ class Actor(object):
self.file_writer = tf.summary.FileWriter(logdir, self.sess.graph)
def step(self, cur_timestep):
# Take action and update exploration to the newest value
"""Takes a single step, and returns the result of the step."""
action = self.dqn_graph.act(
self.sess, np.array(self.obs)[None],
self.exploration.value(cur_timestep))[0]
@@ -168,12 +180,27 @@ class Actor(object):
self.episode_lengths.append(0.0)
return ret
def do_steps(self, num_steps, cur_timestep):
for _ in range(num_steps):
obs, action, rew, new_obs, done = self.step(cur_timestep)
self.replay_buffer.add(obs, action, rew, new_obs, done)
def do_steps(self, num_steps, cur_timestep, store):
"""Takes N steps.
def get_gradient(self, cur_timestep):
If store is True, the steps will be stored in the local replay buffer.
Otherwise, the steps will be returned.
"""
output = []
for _ in range(num_steps):
result = self.step(cur_timestep)
if store:
obs, action, rew, new_obs, done = result
self.replay_buffer.add(obs, action, rew, new_obs, done)
else:
output.append(result)
if not store:
return output
def do_multi_gpu_optimize(self, cur_timestep):
"""Performs N iters of multi-gpu SGD over the local replay buffer."""
dt = time.time()
if self.config["prioritized_replay"]:
experience = self.replay_buffer.sample(
self.config["train_batch_size"],
@@ -184,6 +211,73 @@ class Actor(object):
obses_t, actions, rewards, obses_tp1, dones = \
self.replay_buffer.sample(self.config["train_batch_size"])
batch_idxes = None
replay_buffer_read_time = (time.time() - dt)
dt = time.time()
tuples_per_device = self.dqn_graph.multi_gpu_optimizer.load_data(
self.sess,
[obses_t, actions, rewards, obses_tp1, dones,
np.ones_like(rewards)])
per_device_batch_size = (
self.dqn_graph.multi_gpu_optimizer.per_device_batch_size)
num_batches = (int(tuples_per_device) // int(per_device_batch_size))
data_load_time = (time.time() - dt)
dt = time.time()
for _ in range(self.config["num_sgd_iter"]):
batches = list(range(num_batches))
np.random.shuffle(batches)
for i in batches:
self.dqn_graph.multi_gpu_optimizer.optimize(
self.sess, i * per_device_batch_size)
sgd_time = (time.time() - dt)
dt = time.time()
if self.config["prioritized_replay"]:
dt = time.time()
td_errors = self.dqn_graph.compute_td_error(
self.sess, obses_t, actions, rewards, obses_tp1, dones,
np.ones_like(rewards))
dt = time.time()
new_priorities = (
np.abs(td_errors) + self.config["prioritized_replay_eps"])
self.replay_buffer.update_priorities(
batch_idxes, new_priorities)
prioritization_time = (time.time() - dt)
return {
"replay_buffer_read_time": replay_buffer_read_time,
"data_load_time": data_load_time,
"sgd_time": sgd_time,
"prioritization_time": prioritization_time,
}
def do_async_step(self, worker_id, cur_timestep, params, gradient_id):
    """Takes steps and returns grad to apply async in the driver.

    The worker first installs `params` as its weights, then collects
    `sample_batch_size` steps into its local replay buffer, and finally
    computes a gradient from that buffer if learning has started.

    Args:
        worker_id: identifier echoed back to the caller under key "id".
        cur_timestep: timestep used for stepping, gating and sampling.
        params: weights passed to `self.set_weights`.
        gradient_id: value echoed back under key "gradient_id".

    Returns:
        A `(gradient, info)` tuple. `gradient` is None when
        `cur_timestep` has not passed `learning_starts` or the replay
        buffer does not yet hold more than `train_batch_size` items.
    """
    cfg = self.config

    # Time the weight sync separately from sampling and gradient work.
    started = time.time()
    self.set_weights(params)
    self.set_weights_time.push(time.time() - started)

    started = time.time()
    self.do_steps(cfg["sample_batch_size"], cur_timestep, store=True)
    self.sample_time.push(time.time() - started)

    gradient = None
    past_warmup = cur_timestep > cfg["learning_starts"]
    # `and` short-circuits, so the buffer length is only inspected once
    # the warm-up period is over.
    if past_warmup and len(self.replay_buffer) > cfg["train_batch_size"]:
        started = time.time()
        gradient = self.sample_buffer_gradient(cur_timestep)
        self.grad_time.push(time.time() - started)

    return gradient, {"id": worker_id, "gradient_id": gradient_id}
def sample_buffer_gradient(self, cur_timestep):
"""Returns grad over a batch sampled from the local replay buffer."""
if self.config["prioritized_replay"]:
experience = self.replay_buffer.sample(
self.config["sgd_batch_size"],
beta=self.beta_schedule.value(cur_timestep))
(obses_t, actions, rewards, obses_tp1,
dones, _, batch_idxes) = experience
else:
obses_t, actions, rewards, obses_tp1, dones = \
self.replay_buffer.sample(self.config["sgd_batch_size"])
batch_idxes = None
td_errors, grad = self.dqn_graph.compute_gradients(
self.sess, obses_t, actions, rewards, obses_tp1, dones,
np.ones_like(rewards))
@@ -197,6 +291,8 @@ class Actor(object):
def apply_gradients(self, grad):
self.dqn_graph.apply_gradients(self.sess, grad)
# TODO(ekl) return a dictionary and use that everywhere to clean up the
# bookkeeping of stats
def stats(self, num_timesteps):
mean_100ep_reward = round(np.mean(self.episode_rewards[-101:-1]), 5)
mean_100ep_length = round(np.mean(self.episode_lengths[-101:-1]), 5)
@@ -206,7 +302,10 @@ class Actor(object):
mean_100ep_length,
len(self.episode_rewards),
exploration,
len(self.replay_buffer))
len(self.replay_buffer),
float(self.set_weights_time.mean),
float(self.sample_time.mean),
float(self.grad_time.mean))
def get_weights(self):
return self.variables.get_weights()
@@ -236,22 +335,44 @@ class Actor(object):
@ray.remote
class RemoteActor(Actor):
def __init__(self, env_creator, config, logdir, gpu_mask):
os.environ["CUDA_VISIBLE_DEVICES"] = gpu_mask
def __init__(self, env_creator, config, logdir):
Actor.__init__(self, env_creator, config, logdir)
def stop(self):
sys.exit(0)
@ray.remote(num_gpus=1)
class GPURemoteActor(Actor):
    """Remote `Actor` variant declared with `num_gpus=1` in `ray.remote`."""

    def __init__(self, env_creator, config, logdir):
        # Call the base initializer explicitly: inside a ray.remote-decorated
        # class the class name is rebound by the decorator.
        Actor.__init__(self, env_creator, config, logdir)

    def stop(self):
        """Shut this worker down by calling `sys.exit(0)`."""
        sys.exit(0)
class DQNAgent(Agent):
_agent_name = "DQN"
_default_config = DEFAULT_CONFIG
def stop(self):
for w in self.workers:
w.stop.remote()
def _init(self):
self.actor = Actor(self.env_creator, self.config, self.logdir)
self.workers = [
RemoteActor.remote(
self.env_creator, self.config, self.logdir,
"{}".format(i + self.config["gpu_offset"]))
for i in range(self.config["num_workers"])]
if self.config["use_gpu_for_workers"]:
remote_cls = GPURemoteActor
else:
remote_cls = RemoteActor
# Use remote workers
if self.config["num_workers"] > 1 or self.config["async_updates"]:
self.workers = [
remote_cls.remote(self.env_creator, self.config, self.logdir)
for i in range(self.config["num_workers"])]
else:
# Use a single local worker and avoid object store overheads
self.workers = []
self.cur_timestep = 0
self.num_iterations = 0
@@ -262,52 +383,169 @@ class DQNAgent(Agent):
self.saver = tf.train.Saver(max_to_keep=None)
def _update_worker_weights(self):
w = self.actor.get_weights()
weights = ray.put(self.actor.get_weights())
for w in self.workers:
w.set_weights.remote(weights)
if self.workers:
w = self.actor.get_weights()
weights = ray.put(self.actor.get_weights())
for w in self.workers:
w.set_weights.remote(weights)
def _train(self):
if self.config["async_updates"]:
return self._train_async()
else:
return self._train_sync()
def _train_async(self):
"""Run one training iteration in asynchronous-update mode.

Every worker is sent the driver's current weights through
`do_async_step` and returns either a gradient or None. Non-None gradients
are applied to the driver's local actor. Afterwards per-worker stats are
collected, logged through `logger`, and returned in a TrainingResult.

NOTE(review): indentation is not visible in this view, so the nesting of
the statements below (in particular whether the target-network update runs
inside the `while` loop) must be confirmed against the original file.
"""
# Driver-side accumulators for this iteration only.
apply_time = RunningStat(())
wait_time = RunningStat(())
gradient_lag = RunningStat(())
iter_init_timesteps = self.cur_timestep
num_gradients_applied = 0
# Kick off one async step per worker. Each request carries the current
# weights and the number of gradients applied so far (used as gradient_id).
gradient_list = [
worker.do_async_step.remote(
i, self.cur_timestep, self.actor.get_weights(),
num_gradients_applied)
for i, worker in enumerate(self.workers)]
# Timesteps are counted when the request is dispatched, not when it returns.
steps = self.config["sample_batch_size"] * len(gradient_list)
self.cur_timestep += steps
self.steps_since_update += steps
while gradient_list:
dt = time.time()
# Waits on the oldest outstanding request (list order), not on whichever
# request happens to finish first.
gradient, info = ray.get(gradient_list[0])
gradient_list = gradient_list[1:]
wait_time.push(time.time() - dt)
# do_async_step returns None instead of a gradient while the worker is
# still only collecting samples.
if gradient is not None:
dt = time.time()
self.actor.apply_gradients(gradient)
apply_time.push(time.time() - dt)
# info["gradient_id"] is the value of num_gradients_applied at dispatch
# time, so this difference is the number of gradients applied while the
# request was outstanding.
gradient_lag.push(num_gradients_applied - info["gradient_id"])
num_gradients_applied += 1
# Re-dispatch to the worker that just returned while this iteration's
# timestep budget has not been used up.
if (self.cur_timestep - iter_init_timesteps <
self.config["timesteps_per_iteration"]):
worker_id = info["id"]
gradient_list.append(
self.workers[info["id"]].do_async_step.remote(
worker_id, self.cur_timestep,
self.actor.get_weights(), num_gradients_applied))
self.cur_timestep += self.config["sample_batch_size"]
self.steps_since_update += self.config["sample_batch_size"]
if (self.cur_timestep > self.config["learning_starts"] and
self.steps_since_update >
self.config["target_network_update_freq"]):
# Update target network periodically.
self.actor.dqn_graph.update_target(self.actor.sess)
# Subtract rather than reset, so any overshoot past the update frequency
# carries over to the next update.
self.steps_since_update -= (
self.config["target_network_update_freq"])
self.num_target_updates += 1
mean_100ep_reward = 0.0
mean_100ep_length = 0.0
num_episodes = 0
buffer_size_sum = 0
stats = ray.get(
[w.stats.remote(self.cur_timestep) for w in self.workers])
# Rewards, lengths, episode counts and buffer sizes are summed over workers.
# NOTE(review): exploration and the three timing values are plain
# assignments, so after the loop they hold the last worker's values only,
# and they are unbound if self.workers is empty.
for stat in stats:
mean_100ep_reward += stat[0]
mean_100ep_length += stat[1]
num_episodes += stat[2]
exploration = stat[3]
buffer_size_sum += stat[4]
set_weights_time = stat[5]
sample_time = stat[6]
grad_time = stat[7]
# The averages divide by config["num_workers"], not len(self.workers).
mean_100ep_reward /= self.config["num_workers"]
mean_100ep_length /= self.config["num_workers"]
info = [
("mean_100ep_reward", mean_100ep_reward),
("exploration_frac", exploration),
("steps", self.cur_timestep),
("episodes", num_episodes),
("buffer_sizes_sum", buffer_size_sum),
("target_updates", self.num_target_updates),
("mean_set_weights_time", set_weights_time),
("mean_sample_time", sample_time),
("mean_grad_time", grad_time),
("mean_apply_time", float(apply_time.mean)),
("mean_ray_wait_time", float(wait_time.mean)),
("gradient_lag_mean", float(gradient_lag.mean)),
("gradient_lag_stdev", float(gradient_lag.std)),
]
for k, v in info:
logger.record_tabular(k, v)
logger.dump_tabular()
result = TrainingResult(
episode_reward_mean=mean_100ep_reward,
episode_len_mean=mean_100ep_length,
timesteps_this_iter=self.cur_timestep - iter_init_timesteps,
info=info)
return result
def _train_sync(self):
config = self.config
sample_time, sync_time, learn_time, apply_time = 0, 0, 0, 0
iter_init_timesteps = self.cur_timestep
num_loop_iters = 0
steps_per_iter = config["sample_batch_size"] * len(self.workers)
while (self.cur_timestep - iter_init_timesteps <
config["timesteps_per_iteration"]):
dt = time.time()
ray.get([
w.do_steps.remote(
config["sample_batch_size"], self.cur_timestep)
for w in self.workers])
if self.workers:
worker_steps = ray.get([
w.do_steps.remote(
config["sample_batch_size"] // len(self.workers),
self.cur_timestep, store=False)
for w in self.workers])
for steps in worker_steps:
for obs, action, rew, new_obs, done in steps:
self.actor.replay_buffer.add(
obs, action, rew, new_obs, done)
else:
self.actor.do_steps(
config["sample_batch_size"], self.cur_timestep, store=True)
num_loop_iters += 1
self.cur_timestep += steps_per_iter
self.steps_since_update += steps_per_iter
self.cur_timestep += config["sample_batch_size"]
self.steps_since_update += config["sample_batch_size"]
sample_time += time.time() - dt
if self.cur_timestep > config["learning_starts"]:
if config["multi_gpu_optimize"]:
dt = time.time()
times = self.actor.do_multi_gpu_optimize(self.cur_timestep)
if num_loop_iters <= 1:
print("Multi-GPU times", times)
learn_time += (time.time() - dt)
else:
# Minimize the error in Bellman's equation on a batch
# sampled from replay buffer.
for _ in range(
max(1, config["train_batch_size"] //
config["sgd_batch_size"])):
dt = time.time()
gradients = [
self.actor.sample_buffer_gradient(
self.cur_timestep)]
learn_time += (time.time() - dt)
dt = time.time()
for grad in gradients:
self.actor.apply_gradients(grad)
apply_time += (time.time() - dt)
dt = time.time()
# Minimize the error in Bellman's equation on a batch sampled
# from replay buffer.
self._update_worker_weights()
sync_time += (time.time() - dt)
dt = time.time()
gradients = ray.get(
[w.get_gradient.remote(self.cur_timestep)
for w in self.workers])
learn_time += (time.time() - dt)
dt = time.time()
for grad in gradients:
self.actor.apply_gradients(grad)
apply_time += (time.time() - dt)
if (self.cur_timestep > config["learning_starts"] and
self.steps_since_update >
config["target_network_update_freq"]):
self.actor.dqn_graph.update_target(self.actor.sess)
# Update target network periodically.
self._update_worker_weights()
self.actor.dqn_graph.update_target(self.actor.sess)
self.steps_since_update -= config["target_network_update_freq"]
self.num_target_updates += 1
@@ -315,14 +553,21 @@ class DQNAgent(Agent):
mean_100ep_length = 0.0
num_episodes = 0
buffer_size_sum = 0
if not self.workers:
stats = self.actor.stats(self.cur_timestep)
mean_100ep_reward += stats[0]
mean_100ep_length += stats[1]
num_episodes += stats[2]
exploration = stats[3]
buffer_size_sum += stats[4]
for mean_rew, mean_len, episodes, exploration, buf_sz in ray.get(
[w.stats.remote(self.cur_timestep) for w in self.workers]):
mean_100ep_reward += mean_rew
mean_100ep_length += mean_len
num_episodes += episodes
buffer_size_sum += buf_sz
mean_100ep_reward /= len(self.workers)
mean_100ep_length /= len(self.workers)
mean_100ep_reward /= config["num_workers"]
mean_100ep_length /= config["num_workers"]
info = [
("mean_100ep_reward", mean_100ep_reward),
@@ -336,10 +581,11 @@ class DQNAgent(Agent):
("apply_time", apply_time),
("learn_time", learn_time),
("samples_per_s",
num_loop_iters * np.float64(steps_per_iter) / sample_time),
num_loop_iters * np.float64(config["sample_batch_size"]) /
sample_time),
("learn_samples_per_s",
num_loop_iters * np.float64(config["train_batch_size"]) *
np.float64(config["num_workers"]) / learn_time),
num_loop_iters * np.float64(config["train_batch_size"]) /
learn_time),
]
for k, v in info:
+94 -31
View File
@@ -6,6 +6,7 @@ import tensorflow as tf
import tensorflow.contrib.layers as layers
from ray.rllib.models import ModelCatalog
from ray.rllib.parallel import LocalSyncParallelOptimizer, TOWER_SCOPE_NAME
def _build_q_network(inputs, num_actions, config):
@@ -97,8 +98,56 @@ def _scope_vars(scope, trainable_only=False):
scope=scope if isinstance(scope, str) else scope.name)
class ModelAndLoss(object):
    """Holds the model and loss function.

    Both graphs are necessary in order for the multi-gpu SGD implementation
    to create towers on each device.

    The constructor sets these attributes:
        q_t: output of the "q_func" network applied to `obs_t`.
        q_tp1: output of the "target_q_func" network applied to `obs_tp1`.
        target_q_func_vars: result of `_scope_vars` on the target scope name.
        td_error: selected Q value minus the gradient-stopped Bellman target.
        loss: mean of `importance_weights * _huber_loss(td_error)`.
    """

    def __init__(
            self, num_actions, config,
            obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights):
        # q network evaluation. reuse=True means the "q_func" variables are
        # expected to exist already in the enclosing scope.
        with tf.variable_scope("q_func", reuse=True):
            self.q_t = _build_q_network(obs_t, num_actions, config)

        # target q network evaluation
        with tf.variable_scope("target_q_func") as scope:
            self.q_tp1 = _build_q_network(obs_tp1, num_actions, config)
            self.target_q_func_vars = _scope_vars(scope.name)

        # q scores for actions which we know were selected in the given state.
        q_t_selected = tf.reduce_sum(
            self.q_t * tf.one_hot(act_t, num_actions), 1)

        # compute estimate of best possible value starting from state at t + 1
        if config["double_q"]:
            # Choose the next action with the online network, but score it
            # with the target network's Q values.
            with tf.variable_scope("q_func", reuse=True):
                q_tp1_using_online_net = _build_q_network(
                    obs_tp1, num_actions, config)
            q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1)
            q_tp1_best = tf.reduce_sum(
                self.q_tp1 * tf.one_hot(
                    q_tp1_best_using_online_net, num_actions), 1)
        else:
            q_tp1_best = tf.reduce_max(self.q_tp1, 1)
        # Zero out the bootstrap value wherever done_mask is 1.
        q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best

        # compute RHS of bellman equation
        q_t_selected_target = rew_t + config["gamma"] * q_tp1_best_masked

        # compute the error (potentially clipped)
        self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target)
        errors = _huber_loss(self.td_error)
        weighted_error = tf.reduce_mean(importance_weights * errors)

        self.loss = weighted_error
class DQNGraph(object):
def __init__(self, env, config):
def __init__(self, env, config, logdir):
self.env = env
num_actions = env.action_space.n
optimizer = tf.train.AdamOptimizer(learning_rate=config["lr"])
@@ -110,7 +159,11 @@ class DQNGraph(object):
tf.float32, shape=(None,) + env.observation_space.shape)
# Action Q network
with tf.variable_scope("q_func") as scope:
if config["multi_gpu_optimize"]:
q_scope_name = TOWER_SCOPE_NAME + "/q_func"
else:
q_scope_name = "q_func"
with tf.variable_scope(q_scope_name) as scope:
q_values = _build_q_network(
self.cur_observations, num_actions, config)
q_func_vars = _scope_vars(scope.name)
@@ -134,39 +187,34 @@ class DQNGraph(object):
self.importance_weights = tf.placeholder(
tf.float32, [None], name="weight")
# q network evaluation
with tf.variable_scope("q_func", reuse=True):
self.q_t = _build_q_network(self.obs_t, num_actions, config)
def build_loss(
obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights):
return ModelAndLoss(
num_actions, config,
obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights)
# target q network evalution
with tf.variable_scope("target_q_func") as scope:
self.q_tp1 = _build_q_network(self.obs_tp1, num_actions, config)
target_q_func_vars = _scope_vars(scope.name)
# q scores for actions which we know were selected in the given state.
q_t_selected = tf.reduce_sum(
self.q_t * tf.one_hot(self.act_t, num_actions), 1)
# compute estimate of best possible value starting from state at t + 1
if config["double_q"]:
with tf.variable_scope("q_func", reuse=True):
q_tp1_using_online_net = _build_q_network(
self.obs_tp1, num_actions, config)
q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1)
q_tp1_best = tf.reduce_sum(
self.q_tp1 * tf.one_hot(
q_tp1_best_using_online_net, num_actions), 1)
if config["multi_gpu_optimize"]:
self.multi_gpu_optimizer = LocalSyncParallelOptimizer(
optimizer,
config["devices"],
[self.obs_t, self.act_t, self.rew_t, self.obs_tp1,
self.done_mask, self.importance_weights],
int(config["sgd_batch_size"] / len(config["devices"])),
build_loss,
logdir,
grad_norm_clipping=config["grad_norm_clipping"])
loss_obj = self.multi_gpu_optimizer.get_common_loss()
else:
q_tp1_best = tf.reduce_max(self.q_tp1, 1)
q_tp1_best_masked = (1.0 - self.done_mask) * q_tp1_best
loss_obj = build_loss(
self.obs_t, self.act_t, self.rew_t, self.obs_tp1,
self.done_mask, self.importance_weights)
# compute RHS of bellman equation
q_t_selected_target = self.rew_t + config["gamma"] * q_tp1_best_masked
weighted_error = loss_obj.loss
target_q_func_vars = loss_obj.target_q_func_vars
self.q_t = loss_obj.q_t
self.q_tp1 = loss_obj.q_tp1
self.td_error = loss_obj.td_error
# compute the error (potentially clipped)
self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target)
errors = _huber_loss(self.td_error)
weighted_error = tf.reduce_mean(self.importance_weights * errors)
# compute optimization op (potentially with gradient clipping)
if config["grad_norm_clipping"] is not None:
self.grads_and_vars = _minimize_and_clip(
@@ -216,6 +264,21 @@ class DQNGraph(object):
})
return td_err, grads
def compute_td_error(
        self, sess, obs_t, act_t, rew_t, obs_tp1, done_mask,
        importance_weights):
    """Evaluate `self.td_error` for one batch of transitions.

    Args:
        sess: session whose `run` method evaluates the tensor.
        obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights:
            batch values fed to the same-named placeholders on `self`.

    Returns:
        The value `sess.run` produces for `self.td_error`.
    """
    feed_dict = {
        self.obs_t: obs_t,
        self.act_t: act_t,
        self.rew_t: rew_t,
        self.obs_tp1: obs_tp1,
        self.done_mask: done_mask,
        self.importance_weights: importance_weights,
    }
    return sess.run(self.td_error, feed_dict=feed_dict)
def apply_gradients(self, sess, grads):
assert len(grads) == len(self.grads_and_vars)
feed_dict = {ph: g for (g, ph) in zip(grads, self.grads)}
@@ -1,44 +0,0 @@
#!/usr/bin/env python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import ray
from ray.rllib.dqn import DQNAgent, DEFAULT_CONFIG
def main():
parser = argparse.ArgumentParser(description="Run the DQN algorithm.")
parser.add_argument("--iterations", default=-1, type=int,
help="The number of training iterations to run.")
args = parser.parse_args()
config = DEFAULT_CONFIG.copy()
config.update(dict(
lr=1e-3,
schedule_max_timesteps=100000,
exploration_fraction=0.1,
exploration_final_eps=0.02,
dueling=False,
hiddens=[],
model_config=dict(
fcnet_hiddens=[64],
fcnet_activation='relu',
)))
ray.init()
dqn = DQNAgent("CartPole-v0", config)
iteration = 0
while iteration != args.iterations:
iteration += 1
res = dqn.train()
print("current status: {}".format(res))
if __name__ == "__main__":
main()
-43
View File
@@ -1,43 +0,0 @@
#!/usr/bin/env python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import ray
from ray.rllib.dqn import DQNAgent, DEFAULT_CONFIG
def main():
parser = argparse.ArgumentParser(description="Run the DQN algorithm.")
parser.add_argument("--iterations", default=-1, type=int,
help="The number of training iterations to run.")
args = parser.parse_args()
config = DEFAULT_CONFIG.copy()
config.update(dict(
lr=1e-4,
schedule_max_timesteps=2000000,
exploration_fraction=0.1,
exploration_final_eps=0.01,
train_freq=4,
learning_starts=10000,
target_network_update_freq=1000,
gamma=0.99,
prioritized_replay=True))
ray.init()
dqn = DQNAgent("PongNoFrameskip-v4", config)
iteration = 0
while iteration != args.iterations:
iteration += 1
res = dqn.train()
print("current status: {}".format(res))
if __name__ == "__main__":
main()
+16 -3
View File
@@ -9,6 +9,10 @@ from tensorflow.python.client import timeline
import tensorflow as tf
# Variable scope in which created variables will be placed under
TOWER_SCOPE_NAME = "tower"
class LocalSyncParallelOptimizer(object):
"""Optimizer that runs in parallel across multiple local devices.
@@ -41,10 +45,12 @@ class LocalSyncParallelOptimizer(object):
object with a 'loss' property that is a scalar Tensor. For example,
ray.rllib.ppo.ProximalPolicyLoss.
logdir: Directory to place debugging output in.
grad_norm_clipping: None, or the max norm passed to tf.clip_by_norm for each averaged gradient
"""
def __init__(self, optimizer, devices, input_placeholders,
per_device_batch_size, build_loss, logdir):
per_device_batch_size, build_loss, logdir,
grad_norm_clipping=None):
self.optimizer = optimizer
self.devices = devices
self.batch_size = per_device_batch_size * len(devices)
@@ -54,7 +60,7 @@ class LocalSyncParallelOptimizer(object):
self.logdir = logdir
# First initialize the shared loss network
with tf.variable_scope("tower"):
with tf.variable_scope(TOWER_SCOPE_NAME):
self._shared_loss = build_loss(*input_placeholders)
# Then setup the per-device loss graphs that use the shared weights
@@ -67,6 +73,10 @@ class LocalSyncParallelOptimizer(object):
device_placeholders))
avg = average_gradients([t.grads for t in self._towers])
if grad_norm_clipping:
for i, (grad, var) in enumerate(avg):
if grad is not None:
avg[i] = (tf.clip_by_norm(grad, grad_norm_clipping), var)
self._train_op = self.optimizer.apply_gradients(avg)
def load_data(self, sess, inputs, full_trace=False):
@@ -173,7 +183,7 @@ class LocalSyncParallelOptimizer(object):
def _setup_device(self, device, device_input_placeholders):
with tf.device(device):
with tf.variable_scope("tower", reuse=True):
with tf.variable_scope(TOWER_SCOPE_NAME, reuse=True):
device_input_batches = []
device_input_slices = []
for ph in device_input_placeholders:
@@ -239,6 +249,9 @@ def average_gradients(tower_grads):
# below.
grads.append(expanded_g)
if not grads:
continue
# Average over the 'tower' dimension.
grad = tf.concat(axis=0, values=grads)
grad = tf.reduce_mean(grad, 0)
+48 -1
View File
@@ -1,4 +1,4 @@
pong-dqn:
pong-deterministic-dqn:
env: PongDeterministic-v4
alg: DQN
resources:
@@ -7,3 +7,50 @@ pong-dqn:
stop:
episode_reward_mean: 20
time_total_s: 7200
config:
gamma: 0.99
lr: .0001
learning_starts: 10000
buffer_size: 50000
sample_batch_size: 4
train_batch_size: 32
schedule_max_timesteps: 2000000
exploration_final_eps: .01
exploration_fraction: .1
model:
grayscale: True
zero_mean: False
dim: 42
conv_filters: [
[16, [4, 4], 2],
[32, [4, 4], 2],
[512, [11, 11], 1],
]
pong-noframeskip-dqn:
env: PongNoFrameskip-v4
alg: DQN
resources:
cpu: 1
gpu: 1
stop:
episode_reward_mean: 20
time_total_s: 7200
config:
gamma: 0.99
lr: .0001
learning_starts: 10000
buffer_size: 50000
sample_batch_size: 4
train_batch_size: 32
schedule_max_timesteps: 2000000
exploration_final_eps: .01
exploration_fraction: .1
model:
grayscale: True
zero_mean: False
dim: 42
conv_filters: [
[16, [4, 4], 2],
[32, [4, 4], 2],
[512, [11, 11], 1],
]