From ffa6ee3ec8056aa1dbaef7b7569d802a5f0ec4b9 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 19 Dec 2018 02:23:29 -0800 Subject: [PATCH] [rllib] streaming minibatching for IMPALA (#3402) * mb impala * fix * paropt * update * cpu warn * on cpu * fix mb * doc * docs * comment * larger num * early release * remove grad clip * only check loader count in multi gpu mode * revert bad multigpu changes * num sgd iter * comment * reuse optimizer * add test * par load test * loosen test * Update run_multi_node_tests.sh * fix local mode * Update agent.py --- python/ray/rllib/agents/agent.py | 2 +- python/ray/rllib/agents/impala/impala.py | 33 +++- .../ray/rllib/evaluation/policy_evaluator.py | 5 + .../optimizers/async_samples_optimizer.py | 108 ++++++++++--- python/ray/rllib/optimizers/multi_gpu_impl.py | 1 - python/ray/rllib/test/test_optimizers.py | 144 +++++++++++++++++- .../tuned_examples/pong-impala-fast.yaml | 2 +- test/jenkins_tests/run_multi_node_tests.sh | 4 +- 8 files changed, 258 insertions(+), 41 deletions(-) diff --git a/python/ray/rllib/agents/agent.py b/python/ray/rllib/agents/agent.py index 04420b71e..69839b9a5 100644 --- a/python/ray/rllib/agents/agent.py +++ b/python/ray/rllib/agents/agent.py @@ -130,7 +130,7 @@ COMMON_CONFIG = { # Drop metric batches from unresponsive workers after this many seconds "collect_metrics_timeout": 180, - # === Offline Data Input / Output === + # === Offline Data Input / Output (Experimental) === # Specify how to generate experiences: # - "sampler": generate experiences via online simulation (default) # - a local directory or file glob expression (e.g., "/tmp/*.json") diff --git a/python/ray/rllib/agents/impala/impala.py b/python/ray/rllib/agents/impala/impala.py index aa789387f..bab04f482 100644 --- a/python/ray/rllib/agents/impala/impala.py +++ b/python/ray/rllib/agents/impala/impala.py @@ -18,10 +18,11 @@ OPTIMIZER_SHARED_CONFIGS = [ "train_batch_size", "replay_buffer_num_slots", "replay_proportion", - "num_parallel_data_loaders", - "grad_clip", + "num_data_loader_buffers", "max_sample_requests_in_flight_per_worker", "broadcast_interval", + "num_sgd_iter", + "minibatch_buffer_size", ] # yapf: disable @@ -33,6 +34,17 @@ DEFAULT_CONFIG = with_common_config({ "vtrace_clip_pg_rho_threshold": 1.0, # System params. + # + # == Overview of data flow in IMPALA == + # 1. Policy evaluation in parallel across `num_workers` actors produces + # batches of size `sample_batch_size * num_envs_per_worker`. + # 2. If enabled, the replay buffer stores and produces batches of size + # `sample_batch_size * num_envs_per_worker`. + # 3. If enabled, the minibatch ring buffer stores and replays batches of + # size `train_batch_size` up to `num_sgd_iter` times per batch. + # 4. The learner thread executes data parallel SGD across `num_gpus` GPUs + # on batches of size `train_batch_size`. + # "sample_batch_size": 50, "train_batch_size": 500, "min_iter_time_s": 10, @@ -40,18 +52,23 @@ DEFAULT_CONFIG = with_common_config({ # number of GPUs the learner should use. "num_gpus": 1, # set >1 to load data into GPUs in parallel. Increases GPU memory usage - # proportionally with the number of loaders. - "num_parallel_data_loaders": 1, - # level of queuing for sampling. - "max_sample_requests_in_flight_per_worker": 2, - # max number of workers to broadcast one set of weights to - "broadcast_interval": 1, + # proportionally with the number of buffers. + "num_data_loader_buffers": 1, + # how many train batches should be retained for minibatching. This conf + # only has an effect if `num_sgd_iter > 1`. + "minibatch_buffer_size": 1, + # number of passes to make over each train batch + "num_sgd_iter": 1, # set >0 to enable experience replay. Saved samples will be replayed with # a p:1 proportion to new data samples. "replay_proportion": 0.0, # number of sample batches to store for replay. The number of transitions # saved total will be (replay_buffer_num_slots * sample_batch_size). "replay_buffer_num_slots": 100, + # level of queuing for sampling. + "max_sample_requests_in_flight_per_worker": 2, + # max number of workers to broadcast one set of weights to + "broadcast_interval": 1, # Learning params. "grad_clip": 40.0, diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index e51f0912e..f2e78551f 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -255,6 +255,11 @@ class PolicyEvaluator(EvaluatorInterface): policy_dict = _validate_and_canonicalize(policy_graph, self.env) self.policies_to_train = policies_to_train or list(policy_dict.keys()) if _has_tensorflow_graph(policy_dict): + if (ray.worker._mode() != ray.worker.LOCAL_MODE + and not ray.get_gpu_ids()): + logger.info("Creating policy evaluation worker {}".format( + worker_index) + + " on CPU (please ignore any CUDA init errors)") with tf.Graph().as_default(): if tf_session_creator: self.tf_sess = tf_session_creator() diff --git a/python/ray/rllib/optimizers/async_samples_optimizer.py b/python/ray/rllib/optimizers/async_samples_optimizer.py index ad0d86dfc..9322e1fcd 100644 --- a/python/ray/rllib/optimizers/async_samples_optimizer.py +++ b/python/ray/rllib/optimizers/async_samples_optimizer.py @@ -42,34 +42,41 @@ class AsyncSamplesOptimizer(PolicyOptimizer): num_envs_per_worker=1, num_gpus=0, lr=0.0005, - grad_clip=40, replay_buffer_num_slots=0, replay_proportion=0.0, - num_parallel_data_loaders=1, + num_data_loader_buffers=1, max_sample_requests_in_flight_per_worker=2, - broadcast_interval=1): + broadcast_interval=1, + num_sgd_iter=1, + minibatch_buffer_size=1, + _fake_gpus=False): self.learning_started = False self.train_batch_size = train_batch_size self.sample_batch_size = sample_batch_size self.broadcast_interval = broadcast_interval - if num_gpus > 1 or num_parallel_data_loaders > 1: + if num_gpus > 1 or num_data_loader_buffers > 1: logger.info( "Enabling multi-GPU mode, {} GPUs, {} parallel loaders".format( - num_gpus, num_parallel_data_loaders)) - if train_batch_size // max(1, num_gpus) % ( - sample_batch_size // num_envs_per_worker) != 0: + num_gpus, num_data_loader_buffers)) + if num_data_loader_buffers < minibatch_buffer_size: raise ValueError( - "Sample batches must evenly divide across GPUs.") + "In multi-gpu mode you must have at least as many " + "parallel data loader buffers as minibatch buffers: " + "{} vs {}".format(num_data_loader_buffers, + minibatch_buffer_size)) self.learner = TFMultiGPULearner( self.local_evaluator, lr=lr, num_gpus=num_gpus, train_batch_size=train_batch_size, - grad_clip=grad_clip, - num_parallel_data_loaders=num_parallel_data_loaders) + num_data_loader_buffers=num_data_loader_buffers, + minibatch_buffer_size=minibatch_buffer_size, + num_sgd_iter=num_sgd_iter, + _fake_gpus=_fake_gpus) else: - self.learner = LearnerThread(self.local_evaluator) + self.learner = LearnerThread(self.local_evaluator, + minibatch_buffer_size, num_sgd_iter) self.learner.start() assert len(self.remote_evaluators) > 0 @@ -91,8 +98,11 @@ class AsyncSamplesOptimizer(PolicyOptimizer): self.batch_buffer = [] if replay_proportion: - assert replay_buffer_num_slots > 0 - assert (replay_buffer_num_slots * sample_batch_size > + if replay_buffer_num_slots * sample_batch_size <= train_batch_size: + raise ValueError( + "Replay buffer size is too small to produce train, " + "please increase replay_buffer_num_slots.", + replay_buffer_num_slots, sample_batch_size, train_batch_size) self.replay_proportion = replay_proportion self.replay_buffer_num_slots = replay_buffer_num_slots @@ -220,12 +230,14 @@ class LearnerThread(threading.Thread): improves overall throughput. """ - def __init__(self, local_evaluator): + def __init__(self, local_evaluator, minibatch_buffer_size, num_sgd_iter): threading.Thread.__init__(self) self.learner_queue_size = WindowStat("size", 50) self.local_evaluator = local_evaluator self.inqueue = queue.Queue(maxsize=LEARNER_QUEUE_MAX_SIZE) self.outqueue = queue.Queue() + self.minibatch_buffer = MinibatchBuffer( + self.inqueue, minibatch_buffer_size, num_sgd_iter) self.queue_timer = TimerStat() self.grad_timer = TimerStat() self.load_timer = TimerStat() @@ -241,7 +253,7 @@ class LearnerThread(threading.Thread): def step(self): with self.queue_timer: - batch = self.inqueue.get() + batch, _ = self.minibatch_buffer.get() with self.grad_timer: fetches = self.local_evaluator.compute_apply(batch) @@ -260,19 +272,24 @@ class TFMultiGPULearner(LearnerThread): num_gpus=1, lr=0.0005, train_batch_size=500, - grad_clip=40, - num_parallel_data_loaders=1): + num_data_loader_buffers=1, + minibatch_buffer_size=1, + num_sgd_iter=1, + _fake_gpus=False): # Multi-GPU requires TensorFlow to function. import tensorflow as tf - LearnerThread.__init__(self, local_evaluator) + LearnerThread.__init__(self, local_evaluator, minibatch_buffer_size, + num_sgd_iter) self.lr = lr self.train_batch_size = train_batch_size if not num_gpus: self.devices = ["/cpu:0"] + elif _fake_gpus: + self.devices = ["/cpu:{}".format(i) for i in range(num_gpus)] else: self.devices = ["/gpu:{}".format(i) for i in range(num_gpus)] - logger.info("TFMultiGPULearner devices {}".format(self.devices)) + logger.info("TFMultiGPULearner devices {}".format(self.devices)) assert self.train_batch_size % len(self.devices) == 0 assert self.train_batch_size >= len(self.devices), "batch too small" self.policy = self.local_evaluator.policy_map["default"] @@ -291,7 +308,7 @@ class TFMultiGPULearner(LearnerThread): else: rnn_inputs = [] adam = tf.train.AdamOptimizer(self.lr) - for _ in range(num_parallel_data_loaders): + for _ in range(num_data_loader_buffers): self.par_opt.append( LocalSyncParallelOptimizer( adam, @@ -299,8 +316,7 @@ class TFMultiGPULearner(LearnerThread): [v for _, v in self.policy._loss_inputs], rnn_inputs, 999999, # it will get rounded down - self.policy.copy, - grad_norm_clipping=grad_clip)) + self.policy.copy)) self.sess = self.local_evaluator.tf_sess self.sess.run(tf.global_variables_initializer()) @@ -313,18 +329,22 @@ class TFMultiGPULearner(LearnerThread): self.loader_thread = _LoaderThread(self, share_stats=(i == 0)) self.loader_thread.start() + self.minibatch_buffer = MinibatchBuffer( + self.ready_optimizers, minibatch_buffer_size, num_sgd_iter) + @override(LearnerThread) def step(self): assert self.loader_thread.is_alive() with self.load_wait_timer: - opt = self.ready_optimizers.get() + opt, released = self.minibatch_buffer.get() + if released: + self.idle_optimizers.put(opt) with self.grad_timer: fetches = opt.optimize(self.sess, 0) self.weights_updated = True self.stats = fetches.get("stats", {}) - self.idle_optimizers.put(opt) self.outqueue.put(self.train_batch_size) self.learner_queue_size.push(self.inqueue.qsize()) @@ -363,3 +383,43 @@ class _LoaderThread(threading.Thread): [tuples[k] for k in state_keys]) s.ready_optimizers.put(opt) + + +class MinibatchBuffer(object): + """Ring buffer of recent data batches for minibatch SGD.""" + + def __init__(self, inqueue, size, num_passes): + """Initialize a minibatch buffer. + + Arguments: + inqueue: Queue to populate the internal ring buffer from. + size: Max number of data items to buffer. + num_passes: Max num times each data item should be emitted. + """ + self.inqueue = inqueue + self.size = size + self.max_ttl = num_passes + self.cur_max_ttl = 1 # ramp up slowly to better mix the input data + self.buffers = [None] * size + self.ttl = [0] * size + self.idx = 0 + + def get(self): + """Get a new batch from the internal ring buffer. + + Returns: + buf: Data item saved from inqueue. + released: True if the item is now removed from the ring buffer. + """ + if self.ttl[self.idx] <= 0: + self.buffers[self.idx] = self.inqueue.get() + self.ttl[self.idx] = self.cur_max_ttl + if self.cur_max_ttl < self.max_ttl: + self.cur_max_ttl += 1 + buf = self.buffers[self.idx] + self.ttl[self.idx] -= 1 + released = self.ttl[self.idx] <= 0 + if released: + self.buffers[self.idx] = None + self.idx = (self.idx + 1) % len(self.buffers) + return buf, released diff --git a/python/ray/rllib/optimizers/multi_gpu_impl.py b/python/ray/rllib/optimizers/multi_gpu_impl.py index c548b20cc..0a03df41c 100644 --- a/python/ray/rllib/optimizers/multi_gpu_impl.py +++ b/python/ray/rllib/optimizers/multi_gpu_impl.py @@ -46,7 +46,6 @@ class LocalSyncParallelOptimizer(object): clipped. build_graph: Function that takes the specified inputs and returns a TF Policy Graph instance. - grad_norm_clipping: None or int stdev to clip grad norms by """ def __init__(self, diff --git a/python/ray/rllib/test/test_optimizers.py b/python/ray/rllib/test/test_optimizers.py index 6a5022d36..ee08dcfdb 100644 --- a/python/ray/rllib/test/test_optimizers.py +++ b/python/ray/rllib/test/test_optimizers.py @@ -2,14 +2,18 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import gym +import numpy as np +import tensorflow as tf +import time import unittest -import numpy as np - import ray -from ray.rllib.test.mock_evaluator import _MockEvaluator -from ray.rllib.optimizers import AsyncGradientsOptimizer +from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph from ray.rllib.evaluation import SampleBatch +from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator +from ray.rllib.optimizers import AsyncGradientsOptimizer, AsyncSamplesOptimizer +from ray.rllib.test.mock_evaluator import _MockEvaluator class AsyncOptimizerTest(unittest.TestCase): @@ -40,5 +44,137 @@ class SampleBatchTest(unittest.TestCase): self.assertEqual(b["b"].tolist(), [4, 5, 6, 4, 5]) +class AsyncSamplesOptimizerTest(unittest.TestCase): + @classmethod + def tearDownClass(cls): + ray.shutdown() + + @classmethod + def setUpClass(cls): + ray.init(num_cpus=8) + + def testSimple(self): + local, remotes = self._make_evs() + optimizer = AsyncSamplesOptimizer(local, remotes, {}) + self._wait_for(optimizer, 1000, 1000) + + def testMultiGPU(self): + local, remotes = self._make_evs() + optimizer = AsyncSamplesOptimizer(local, remotes, { + "num_gpus": 2, + "_fake_gpus": True + }) + self._wait_for(optimizer, 1000, 1000) + + def testMultiGPUParallelLoad(self): + local, remotes = self._make_evs() + optimizer = AsyncSamplesOptimizer(local, remotes, { + "num_gpus": 2, + "num_data_loader_buffers": 2, + "_fake_gpus": True + }) + self._wait_for(optimizer, 1000, 1000) + + def testMultiplePasses(self): + local, remotes = self._make_evs() + optimizer = AsyncSamplesOptimizer( + local, remotes, { + "minibatch_buffer_size": 10, + "num_sgd_iter": 10, + "sample_batch_size": 10, + "train_batch_size": 50, + }) + self._wait_for(optimizer, 1000, 10000) + self.assertLess(optimizer.stats()["num_steps_sampled"], 5000) + self.assertGreater(optimizer.stats()["num_steps_trained"], 8000) + + def testReplay(self): + local, remotes = self._make_evs() + optimizer = AsyncSamplesOptimizer( + local, remotes, { + "replay_buffer_num_slots": 100, + "replay_proportion": 10, + "sample_batch_size": 10, + "train_batch_size": 10, + }) + self._wait_for(optimizer, 1000, 1000) + self.assertLess(optimizer.stats()["num_steps_sampled"], 5000) + self.assertGreater(optimizer.stats()["num_steps_replayed"], 8000) + self.assertGreater(optimizer.stats()["num_steps_trained"], 8000) + + def testReplayAndMultiplePasses(self): + local, remotes = self._make_evs() + optimizer = AsyncSamplesOptimizer( + local, remotes, { + "minibatch_buffer_size": 10, + "num_sgd_iter": 10, + "replay_buffer_num_slots": 100, + "replay_proportion": 10, + "sample_batch_size": 10, + "train_batch_size": 10, + }) + self._wait_for(optimizer, 1000, 1000) + self.assertLess(optimizer.stats()["num_steps_sampled"], 5000) + self.assertGreater(optimizer.stats()["num_steps_replayed"], 8000) + self.assertGreater(optimizer.stats()["num_steps_trained"], 40000) + + def testRejectBadConfigs(self): + local, remotes = self._make_evs() + self.assertRaises( + ValueError, lambda: AsyncSamplesOptimizer( + local, remotes, + {"num_data_loader_buffers": 2, "minibatch_buffer_size": 4})) + optimizer = AsyncSamplesOptimizer( + local, remotes, { + "num_gpus": 2, + "train_batch_size": 100, + "sample_batch_size": 50, + "_fake_gpus": True + }) + self._wait_for(optimizer, 1000, 1000) + optimizer = AsyncSamplesOptimizer( + local, remotes, { + "num_gpus": 2, + "train_batch_size": 100, + "sample_batch_size": 25, + "_fake_gpus": True + }) + self._wait_for(optimizer, 1000, 1000) + optimizer = AsyncSamplesOptimizer( + local, remotes, { + "num_gpus": 2, + "train_batch_size": 100, + "sample_batch_size": 74, + "_fake_gpus": True + }) + self._wait_for(optimizer, 1000, 1000) + + def _make_evs(self): + def make_sess(): + return tf.Session(config=tf.ConfigProto(device_count={"CPU": 2})) + + local = PolicyEvaluator( + env_creator=lambda _: gym.make("CartPole-v0"), + policy_graph=PPOPolicyGraph, + tf_session_creator=make_sess) + remotes = [ + PolicyEvaluator.as_remote().remote( + env_creator=lambda _: gym.make("CartPole-v0"), + policy_graph=PPOPolicyGraph, + tf_session_creator=make_sess) + ] + return local, remotes + + def _wait_for(self, optimizer, num_steps_sampled, num_steps_trained): + start = time.time() + while time.time() - start < 30: + optimizer.step() + if optimizer.num_steps_sampled > num_steps_sampled and \ + optimizer.num_steps_trained > num_steps_trained: + print("OK", optimizer.stats()) + return + raise AssertionError("TIMED OUT", optimizer.stats()) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/python/ray/rllib/tuned_examples/pong-impala-fast.yaml b/python/ray/rllib/tuned_examples/pong-impala-fast.yaml index 3c29f4e0c..2162e08d3 100644 --- a/python/ray/rllib/tuned_examples/pong-impala-fast.yaml +++ b/python/ray/rllib/tuned_examples/pong-impala-fast.yaml @@ -13,7 +13,7 @@ pong-impala-fast: num_envs_per_worker: 5 broadcast_interval: 5 max_sample_requests_in_flight_per_worker: 1 - num_parallel_data_loaders: 4 + num_data_loader_buffers: 4 num_gpus: 2 model: dim: 42 diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 75239e47c..84371615d 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -214,14 +214,14 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 2}' \ - --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "num_parallel_data_loaders": 2, "replay_proportion": 1.0}' + --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "num_data_loader_buffers": 2, "replay_proportion": 1.0}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 2}' \ - --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "num_parallel_data_loaders": 2, "replay_proportion": 1.0, "model": {"use_lstm": true}}' + --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "num_data_loader_buffers": 2, "replay_proportion": 1.0, "model": {"use_lstm": true}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \