[rllib] streaming minibatching for IMPALA (#3402)

* mb impala

* fix

* paropt

* update

* cpu warn

* on cpu

* fix mb

* doc

* docs

* comment

* larger num

* early release

* remove grad clip

* only check loader count in multi gpu mode

* revert bad multigpu changes

* num sgd iter

* comment

* reuse optimizer

* add test

* par load test

* loosen test

* Update run_multi_node_tests.sh

* fix local mode

* Update agent.py
This commit is contained in:
Eric Liang
2018-12-19 02:23:29 -08:00
committed by GitHub
parent c4cba98c75
commit ffa6ee3ec8
8 changed files with 258 additions and 41 deletions
+1 -1
View File
@@ -130,7 +130,7 @@ COMMON_CONFIG = {
# Drop metric batches from unresponsive workers after this many seconds
"collect_metrics_timeout": 180,
# === Offline Data Input / Output ===
# === Offline Data Input / Output (Experimental) ===
# Specify how to generate experiences:
# - "sampler": generate experiences via online simulation (default)
# - a local directory or file glob expression (e.g., "/tmp/*.json")
+25 -8
View File
@@ -18,10 +18,11 @@ OPTIMIZER_SHARED_CONFIGS = [
"train_batch_size",
"replay_buffer_num_slots",
"replay_proportion",
"num_parallel_data_loaders",
"grad_clip",
"num_data_loader_buffers",
"max_sample_requests_in_flight_per_worker",
"broadcast_interval",
"num_sgd_iter",
"minibatch_buffer_size",
]
# yapf: disable
@@ -33,6 +34,17 @@ DEFAULT_CONFIG = with_common_config({
"vtrace_clip_pg_rho_threshold": 1.0,
# System params.
#
# == Overview of data flow in IMPALA ==
# 1. Policy evaluation in parallel across `num_workers` actors produces
# batches of size `sample_batch_size * num_envs_per_worker`.
# 2. If enabled, the replay buffer stores and produces batches of size
# `sample_batch_size * num_envs_per_worker`.
# 3. If enabled, the minibatch ring buffer stores and replays batches of
# size `train_batch_size` up to `num_sgd_iter` times per batch.
# 4. The learner thread executes data parallel SGD across `num_gpus` GPUs
# on batches of size `train_batch_size`.
#
"sample_batch_size": 50,
"train_batch_size": 500,
"min_iter_time_s": 10,
@@ -40,18 +52,23 @@ DEFAULT_CONFIG = with_common_config({
# number of GPUs the learner should use.
"num_gpus": 1,
# set >1 to load data into GPUs in parallel. Increases GPU memory usage
# proportionally with the number of loaders.
"num_parallel_data_loaders": 1,
# level of queuing for sampling.
"max_sample_requests_in_flight_per_worker": 2,
# max number of workers to broadcast one set of weights to
"broadcast_interval": 1,
# proportionally with the number of buffers.
"num_data_loader_buffers": 1,
# how many train batches should be retained for minibatching. This conf
# only has an effect if `num_sgd_iter > 1`.
"minibatch_buffer_size": 1,
# number of passes to make over each train batch
"num_sgd_iter": 1,
# set >0 to enable experience replay. Saved samples will be replayed with
# a p:1 proportion to new data samples.
"replay_proportion": 0.0,
# number of sample batches to store for replay. The number of transitions
# saved total will be (replay_buffer_num_slots * sample_batch_size).
"replay_buffer_num_slots": 100,
# level of queuing for sampling.
"max_sample_requests_in_flight_per_worker": 2,
# max number of workers to broadcast one set of weights to
"broadcast_interval": 1,
# Learning params.
"grad_clip": 40.0,
@@ -255,6 +255,11 @@ class PolicyEvaluator(EvaluatorInterface):
policy_dict = _validate_and_canonicalize(policy_graph, self.env)
self.policies_to_train = policies_to_train or list(policy_dict.keys())
if _has_tensorflow_graph(policy_dict):
if (ray.worker._mode() != ray.worker.LOCAL_MODE
and not ray.get_gpu_ids()):
logger.info("Creating policy evaluation worker {}".format(
worker_index) +
" on CPU (please ignore any CUDA init errors)")
with tf.Graph().as_default():
if tf_session_creator:
self.tf_sess = tf_session_creator()
@@ -42,34 +42,41 @@ class AsyncSamplesOptimizer(PolicyOptimizer):
num_envs_per_worker=1,
num_gpus=0,
lr=0.0005,
grad_clip=40,
replay_buffer_num_slots=0,
replay_proportion=0.0,
num_parallel_data_loaders=1,
num_data_loader_buffers=1,
max_sample_requests_in_flight_per_worker=2,
broadcast_interval=1):
broadcast_interval=1,
num_sgd_iter=1,
minibatch_buffer_size=1,
_fake_gpus=False):
self.learning_started = False
self.train_batch_size = train_batch_size
self.sample_batch_size = sample_batch_size
self.broadcast_interval = broadcast_interval
if num_gpus > 1 or num_parallel_data_loaders > 1:
if num_gpus > 1 or num_data_loader_buffers > 1:
logger.info(
"Enabling multi-GPU mode, {} GPUs, {} parallel loaders".format(
num_gpus, num_parallel_data_loaders))
if train_batch_size // max(1, num_gpus) % (
sample_batch_size // num_envs_per_worker) != 0:
num_gpus, num_data_loader_buffers))
if num_data_loader_buffers < minibatch_buffer_size:
raise ValueError(
"Sample batches must evenly divide across GPUs.")
"In multi-gpu mode you must have at least as many "
"parallel data loader buffers as minibatch buffers: "
"{} vs {}".format(num_data_loader_buffers,
minibatch_buffer_size))
self.learner = TFMultiGPULearner(
self.local_evaluator,
lr=lr,
num_gpus=num_gpus,
train_batch_size=train_batch_size,
grad_clip=grad_clip,
num_parallel_data_loaders=num_parallel_data_loaders)
num_data_loader_buffers=num_data_loader_buffers,
minibatch_buffer_size=minibatch_buffer_size,
num_sgd_iter=num_sgd_iter,
_fake_gpus=_fake_gpus)
else:
self.learner = LearnerThread(self.local_evaluator)
self.learner = LearnerThread(self.local_evaluator,
minibatch_buffer_size, num_sgd_iter)
self.learner.start()
assert len(self.remote_evaluators) > 0
@@ -91,8 +98,11 @@ class AsyncSamplesOptimizer(PolicyOptimizer):
self.batch_buffer = []
if replay_proportion:
assert replay_buffer_num_slots > 0
assert (replay_buffer_num_slots * sample_batch_size >
if replay_buffer_num_slots * sample_batch_size <= train_batch_size:
raise ValueError(
"Replay buffer size is too small to produce train, "
"please increase replay_buffer_num_slots.",
replay_buffer_num_slots, sample_batch_size,
train_batch_size)
self.replay_proportion = replay_proportion
self.replay_buffer_num_slots = replay_buffer_num_slots
@@ -220,12 +230,14 @@ class LearnerThread(threading.Thread):
improves overall throughput.
"""
def __init__(self, local_evaluator):
def __init__(self, local_evaluator, minibatch_buffer_size, num_sgd_iter):
threading.Thread.__init__(self)
self.learner_queue_size = WindowStat("size", 50)
self.local_evaluator = local_evaluator
self.inqueue = queue.Queue(maxsize=LEARNER_QUEUE_MAX_SIZE)
self.outqueue = queue.Queue()
self.minibatch_buffer = MinibatchBuffer(
self.inqueue, minibatch_buffer_size, num_sgd_iter)
self.queue_timer = TimerStat()
self.grad_timer = TimerStat()
self.load_timer = TimerStat()
@@ -241,7 +253,7 @@ class LearnerThread(threading.Thread):
def step(self):
with self.queue_timer:
batch = self.inqueue.get()
batch, _ = self.minibatch_buffer.get()
with self.grad_timer:
fetches = self.local_evaluator.compute_apply(batch)
@@ -260,19 +272,24 @@ class TFMultiGPULearner(LearnerThread):
num_gpus=1,
lr=0.0005,
train_batch_size=500,
grad_clip=40,
num_parallel_data_loaders=1):
num_data_loader_buffers=1,
minibatch_buffer_size=1,
num_sgd_iter=1,
_fake_gpus=False):
# Multi-GPU requires TensorFlow to function.
import tensorflow as tf
LearnerThread.__init__(self, local_evaluator)
LearnerThread.__init__(self, local_evaluator, minibatch_buffer_size,
num_sgd_iter)
self.lr = lr
self.train_batch_size = train_batch_size
if not num_gpus:
self.devices = ["/cpu:0"]
elif _fake_gpus:
self.devices = ["/cpu:{}".format(i) for i in range(num_gpus)]
else:
self.devices = ["/gpu:{}".format(i) for i in range(num_gpus)]
logger.info("TFMultiGPULearner devices {}".format(self.devices))
logger.info("TFMultiGPULearner devices {}".format(self.devices))
assert self.train_batch_size % len(self.devices) == 0
assert self.train_batch_size >= len(self.devices), "batch too small"
self.policy = self.local_evaluator.policy_map["default"]
@@ -291,7 +308,7 @@ class TFMultiGPULearner(LearnerThread):
else:
rnn_inputs = []
adam = tf.train.AdamOptimizer(self.lr)
for _ in range(num_parallel_data_loaders):
for _ in range(num_data_loader_buffers):
self.par_opt.append(
LocalSyncParallelOptimizer(
adam,
@@ -299,8 +316,7 @@ class TFMultiGPULearner(LearnerThread):
[v for _, v in self.policy._loss_inputs],
rnn_inputs,
999999, # it will get rounded down
self.policy.copy,
grad_norm_clipping=grad_clip))
self.policy.copy))
self.sess = self.local_evaluator.tf_sess
self.sess.run(tf.global_variables_initializer())
@@ -313,18 +329,22 @@ class TFMultiGPULearner(LearnerThread):
self.loader_thread = _LoaderThread(self, share_stats=(i == 0))
self.loader_thread.start()
self.minibatch_buffer = MinibatchBuffer(
self.ready_optimizers, minibatch_buffer_size, num_sgd_iter)
@override(LearnerThread)
def step(self):
assert self.loader_thread.is_alive()
with self.load_wait_timer:
opt = self.ready_optimizers.get()
opt, released = self.minibatch_buffer.get()
if released:
self.idle_optimizers.put(opt)
with self.grad_timer:
fetches = opt.optimize(self.sess, 0)
self.weights_updated = True
self.stats = fetches.get("stats", {})
self.idle_optimizers.put(opt)
self.outqueue.put(self.train_batch_size)
self.learner_queue_size.push(self.inqueue.qsize())
@@ -363,3 +383,43 @@ class _LoaderThread(threading.Thread):
[tuples[k] for k in state_keys])
s.ready_optimizers.put(opt)
class MinibatchBuffer(object):
"""Ring buffer of recent data batches for minibatch SGD."""
def __init__(self, inqueue, size, num_passes):
"""Initialize a minibatch buffer.
Arguments:
inqueue: Queue to populate the internal ring buffer from.
size: Max number of data items to buffer.
num_passes: Max num times each data item should be emitted.
"""
self.inqueue = inqueue
self.size = size
self.max_ttl = num_passes
self.cur_max_ttl = 1 # ramp up slowly to better mix the input data
self.buffers = [None] * size
self.ttl = [0] * size
self.idx = 0
def get(self):
"""Get a new batch from the internal ring buffer.
Returns:
buf: Data item saved from inqueue.
released: True if the item is now removed from the ring buffer.
"""
if self.ttl[self.idx] <= 0:
self.buffers[self.idx] = self.inqueue.get()
self.ttl[self.idx] = self.cur_max_ttl
if self.cur_max_ttl < self.max_ttl:
self.cur_max_ttl += 1
buf = self.buffers[self.idx]
self.ttl[self.idx] -= 1
released = self.ttl[self.idx] <= 0
if released:
self.buffers[self.idx] = None
self.idx = (self.idx + 1) % len(self.buffers)
return buf, released
@@ -46,7 +46,6 @@ class LocalSyncParallelOptimizer(object):
clipped.
build_graph: Function that takes the specified inputs and returns a
TF Policy Graph instance.
grad_norm_clipping: None or int stdev to clip grad norms by
"""
def __init__(self,
+140 -4
View File
@@ -2,14 +2,18 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gym
import numpy as np
import tensorflow as tf
import time
import unittest
import numpy as np
import ray
from ray.rllib.test.mock_evaluator import _MockEvaluator
from ray.rllib.optimizers import AsyncGradientsOptimizer
from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph
from ray.rllib.evaluation import SampleBatch
from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator
from ray.rllib.optimizers import AsyncGradientsOptimizer, AsyncSamplesOptimizer
from ray.rllib.test.mock_evaluator import _MockEvaluator
class AsyncOptimizerTest(unittest.TestCase):
@@ -40,5 +44,137 @@ class SampleBatchTest(unittest.TestCase):
self.assertEqual(b["b"].tolist(), [4, 5, 6, 4, 5])
class AsyncSamplesOptimizerTest(unittest.TestCase):
@classmethod
def tearDownClass(cls):
ray.shutdown()
@classmethod
def setUpClass(cls):
ray.init(num_cpus=8)
def testSimple(self):
local, remotes = self._make_evs()
optimizer = AsyncSamplesOptimizer(local, remotes, {})
self._wait_for(optimizer, 1000, 1000)
def testMultiGPU(self):
local, remotes = self._make_evs()
optimizer = AsyncSamplesOptimizer(local, remotes, {
"num_gpus": 2,
"_fake_gpus": True
})
self._wait_for(optimizer, 1000, 1000)
def testMultiGPUParallelLoad(self):
local, remotes = self._make_evs()
optimizer = AsyncSamplesOptimizer(local, remotes, {
"num_gpus": 2,
"num_data_loader_buffers": 2,
"_fake_gpus": True
})
self._wait_for(optimizer, 1000, 1000)
def testMultiplePasses(self):
local, remotes = self._make_evs()
optimizer = AsyncSamplesOptimizer(
local, remotes, {
"minibatch_buffer_size": 10,
"num_sgd_iter": 10,
"sample_batch_size": 10,
"train_batch_size": 50,
})
self._wait_for(optimizer, 1000, 10000)
self.assertLess(optimizer.stats()["num_steps_sampled"], 5000)
self.assertGreater(optimizer.stats()["num_steps_trained"], 8000)
def testReplay(self):
local, remotes = self._make_evs()
optimizer = AsyncSamplesOptimizer(
local, remotes, {
"replay_buffer_num_slots": 100,
"replay_proportion": 10,
"sample_batch_size": 10,
"train_batch_size": 10,
})
self._wait_for(optimizer, 1000, 1000)
self.assertLess(optimizer.stats()["num_steps_sampled"], 5000)
self.assertGreater(optimizer.stats()["num_steps_replayed"], 8000)
self.assertGreater(optimizer.stats()["num_steps_trained"], 8000)
def testReplayAndMultiplePasses(self):
local, remotes = self._make_evs()
optimizer = AsyncSamplesOptimizer(
local, remotes, {
"minibatch_buffer_size": 10,
"num_sgd_iter": 10,
"replay_buffer_num_slots": 100,
"replay_proportion": 10,
"sample_batch_size": 10,
"train_batch_size": 10,
})
self._wait_for(optimizer, 1000, 1000)
self.assertLess(optimizer.stats()["num_steps_sampled"], 5000)
self.assertGreater(optimizer.stats()["num_steps_replayed"], 8000)
self.assertGreater(optimizer.stats()["num_steps_trained"], 40000)
def testRejectBadConfigs(self):
local, remotes = self._make_evs()
self.assertRaises(
ValueError, lambda: AsyncSamplesOptimizer(
local, remotes,
{"num_data_loader_buffers": 2, "minibatch_buffer_size": 4}))
optimizer = AsyncSamplesOptimizer(
local, remotes, {
"num_gpus": 2,
"train_batch_size": 100,
"sample_batch_size": 50,
"_fake_gpus": True
})
self._wait_for(optimizer, 1000, 1000)
optimizer = AsyncSamplesOptimizer(
local, remotes, {
"num_gpus": 2,
"train_batch_size": 100,
"sample_batch_size": 25,
"_fake_gpus": True
})
self._wait_for(optimizer, 1000, 1000)
optimizer = AsyncSamplesOptimizer(
local, remotes, {
"num_gpus": 2,
"train_batch_size": 100,
"sample_batch_size": 74,
"_fake_gpus": True
})
self._wait_for(optimizer, 1000, 1000)
def _make_evs(self):
def make_sess():
return tf.Session(config=tf.ConfigProto(device_count={"CPU": 2}))
local = PolicyEvaluator(
env_creator=lambda _: gym.make("CartPole-v0"),
policy_graph=PPOPolicyGraph,
tf_session_creator=make_sess)
remotes = [
PolicyEvaluator.as_remote().remote(
env_creator=lambda _: gym.make("CartPole-v0"),
policy_graph=PPOPolicyGraph,
tf_session_creator=make_sess)
]
return local, remotes
def _wait_for(self, optimizer, num_steps_sampled, num_steps_trained):
start = time.time()
while time.time() - start < 30:
optimizer.step()
if optimizer.num_steps_sampled > num_steps_sampled and \
optimizer.num_steps_trained > num_steps_trained:
print("OK", optimizer.stats())
return
raise AssertionError("TIMED OUT", optimizer.stats())
if __name__ == '__main__':
unittest.main(verbosity=2)
@@ -13,7 +13,7 @@ pong-impala-fast:
num_envs_per_worker: 5
broadcast_interval: 5
max_sample_requests_in_flight_per_worker: 1
num_parallel_data_loaders: 4
num_data_loader_buffers: 4
num_gpus: 2
model:
dim: 42