[rllib] [docs] Cleanup RLlib API and make docs consistent with upcoming blog post (#1708)

* wip

* more work

* fix apex

* docs

* apex doc

* pool comment

* clean up

* make wrap stack pluggable

* Mon Mar 12 21:45:50 PDT 2018

* clean up comment

* table

* Mon Mar 12 22:51:57 PDT 2018

* Mon Mar 12 22:53:05 PDT 2018

* Mon Mar 12 22:55:03 PDT 2018

* Mon Mar 12 22:56:18 PDT 2018

* Mon Mar 12 22:59:54 PDT 2018

* Update apex_optimizer.py

* Update index.rst

* Update README.rst

* Update README.rst

* comments

* Wed Mar 14 19:01:02 PDT 2018
This commit is contained in:
Eric Liang
2018-03-15 15:57:31 -07:00
committed by GitHub
parent c19c2a4e60
commit 882a649f0c
27 changed files with 282 additions and 128 deletions
+4 -3
View File
@@ -1,13 +1,14 @@
from ray.rllib.optimizers.apex_optimizer import ApexOptimizer
from ray.rllib.optimizers.async import AsyncOptimizer
from ray.rllib.optimizers.async_optimizer import AsyncOptimizer
from ray.rllib.optimizers.local_sync import LocalSyncOptimizer
from ray.rllib.optimizers.local_sync_replay import LocalSyncReplayOptimizer
from ray.rllib.optimizers.multi_gpu import LocalMultiGPUOptimizer
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.optimizers.evaluator import Evaluator, TFMultiGPUSupport
from ray.rllib.optimizers.policy_evaluator import PolicyEvaluator, \
TFMultiGPUSupport
__all__ = [
"ApexOptimizer", "AsyncOptimizer", "LocalSyncOptimizer",
"LocalSyncReplayOptimizer", "LocalMultiGPUOptimizer", "SampleBatch",
"Evaluator", "TFMultiGPUSupport"]
"PolicyEvaluator", "TFMultiGPUSupport"]
+32 -14
View File
@@ -1,3 +1,7 @@
"""Implements Distributed Prioritized Experience Replay.
https://arxiv.org/abs/1803.00933"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
@@ -11,7 +15,7 @@ import threading
import numpy as np
import ray
from ray.rllib.optimizers.optimizer import Optimizer
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.replay_buffer import PrioritizedReplayBuffer
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.utils.actors import TaskPool, create_colocated
@@ -25,6 +29,11 @@ LEARNER_QUEUE_MAX_SIZE = 16
@ray.remote
class ReplayActor(object):
"""A replay buffer shard.
Ray actors are single-threaded, so for scalability multiple replay actors
may be created to increase parallelism."""
def __init__(
self, num_shards, learning_starts, buffer_size, train_batch_size,
prioritized_replay_alpha, prioritized_replay_beta,
@@ -89,7 +98,15 @@ class ReplayActor(object):
return stat
class GenericLearner(threading.Thread):
class LearnerThread(threading.Thread):
"""Background thread that updates the local model from replay data.
The learner thread communicates with the main thread through Queues. This
is needed since Ray operations can only be run on the main thread. In
addition, moving heavyweight gradient ops session runs off the main thread
improves overall throughput.
"""
def __init__(self, local_evaluator):
threading.Thread.__init__(self)
self.learner_queue_size = WindowStat("size", 50)
@@ -110,13 +127,22 @@ class GenericLearner(threading.Thread):
ra, replay = self.inqueue.get()
if replay is not None:
with self.grad_timer:
td_error = self.local_evaluator.compute_apply(replay)
td_error = self.local_evaluator.compute_apply(replay)[
"td_error"]
self.outqueue.put((ra, replay, td_error))
self.learner_queue_size.push(self.inqueue.qsize())
self.weights_updated = True
class ApexOptimizer(Optimizer):
class ApexOptimizer(PolicyOptimizer):
"""Main event loop of the Ape-X optimizer.
This class coordinates the data transfers between the learner thread,
remote evaluators (Ape-X actors), and replay buffer actors.
This optimizer requires that policy evaluators return an additional
"td_error" array in the info return of compute_gradients(). This error
term will be used for sample prioritization."""
def _init(
self, learning_starts=1000, buffer_size=10000,
@@ -134,7 +160,7 @@ class ApexOptimizer(Optimizer):
self.sample_batch_size = sample_batch_size
self.max_weight_sync_delay = max_weight_sync_delay
self.learner = GenericLearner(self.local_evaluator)
self.learner = LearnerThread(self.local_evaluator)
self.learner.start()
self.replay_actors = create_colocated(
@@ -189,10 +215,7 @@ class ApexOptimizer(Optimizer):
weights = None
with self.timers["sample_processing"]:
i = 0
num_weight_syncs = 0
for ev, sample_batch in self.sample_tasks.completed():
i += 1
sample_timesteps += self.sample_batch_size
# Send the data to the replay buffer
@@ -211,16 +234,13 @@ class ApexOptimizer(Optimizer):
self.local_evaluator.get_weights())
ev.set_weights.remote(weights)
self.num_weight_syncs += 1
num_weight_syncs += 1
self.steps_since_update[ev] = 0
# Kick off another sample request
self.sample_tasks.add(ev, ev.sample.remote())
with self.timers["replay_processing"]:
i = 0
for ra, replay in self.replay_tasks.completed():
i += 1
self.replay_tasks.add(ra, ra.replay.remote())
with self.timers["get_samples"]:
samples = ray.get(replay)
@@ -228,9 +248,7 @@ class ApexOptimizer(Optimizer):
self.learner.inqueue.put((ra, samples))
with self.timers["update_priorities"]:
i = 0
while not self.learner.outqueue.empty():
i += 1
ra, replay, td_error = self.learner.outqueue.get()
ra.update_priorities.remote(replay["batch_indexes"], td_error)
train_timesteps += self.train_batch_size
@@ -262,4 +280,4 @@ class ApexOptimizer(Optimizer):
}
if self.debug:
stats.update(debug_stats)
return dict(Optimizer.stats(self), **stats)
return dict(PolicyOptimizer.stats(self), **stats)
@@ -3,11 +3,11 @@ from __future__ import division
from __future__ import print_function
import ray
from ray.rllib.optimizers.optimizer import Optimizer
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.utils.timer import TimerStat
class AsyncOptimizer(Optimizer):
class AsyncOptimizer(PolicyOptimizer):
"""An asynchronous RL optimizer, e.g. for implementing A3C.
This optimizer asynchronously pulls and applies gradients from remote
@@ -37,7 +37,7 @@ class AsyncOptimizer(Optimizer):
while gradient_queue:
with self.wait_timer:
fut, e = gradient_queue.pop(0)
gradient = ray.get(fut)
gradient, _ = ray.get(fut)
if gradient is not None:
with self.apply_timer:
@@ -54,7 +54,7 @@ class AsyncOptimizer(Optimizer):
self.num_steps_trained += self.grads_per_step * self.batch_size
def stats(self):
return dict(Optimizer.stats(), **{
return dict(PolicyOptimizer.stats(), **{
"wait_time_ms": round(1000 * self.wait_timer.mean, 3),
"apply_time_ms": round(1000 * self.apply_timer.mean, 3),
"dispatch_time_ms": round(1000 * self.dispatch_timer.mean, 3),
+4 -4
View File
@@ -3,13 +3,13 @@ from __future__ import division
from __future__ import print_function
import ray
from ray.rllib.optimizers.optimizer import Optimizer
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.utils.filter import RunningStat
from ray.rllib.utils.timer import TimerStat
class LocalSyncOptimizer(Optimizer):
class LocalSyncOptimizer(PolicyOptimizer):
"""A simple synchronous RL optimizer.
In each step, this optimizer pulls samples from a number of remote
@@ -40,7 +40,7 @@ class LocalSyncOptimizer(Optimizer):
samples = self.local_evaluator.sample()
with self.grad_timer:
grad = self.local_evaluator.compute_gradients(samples)
grad, _ = self.local_evaluator.compute_gradients(samples)
self.local_evaluator.apply_gradients(grad)
self.grad_timer.push_units_processed(samples.count)
@@ -48,7 +48,7 @@ class LocalSyncOptimizer(Optimizer):
self.num_steps_trained += samples.count
def stats(self):
return dict(Optimizer.stats(self), **{
return dict(PolicyOptimizer.stats(self), **{
"sample_time_ms": round(1000 * self.sample_timer.mean, 3),
"grad_time_ms": round(1000 * self.grad_timer.mean, 3),
"update_time_ms": round(1000 * self.update_weights_timer.mean, 3),
@@ -7,14 +7,18 @@ import numpy as np
import ray
from ray.rllib.optimizers.replay_buffer import ReplayBuffer, \
PrioritizedReplayBuffer
from ray.rllib.optimizers.optimizer import Optimizer
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.utils.filter import RunningStat
from ray.rllib.utils.timer import TimerStat
class LocalSyncReplayOptimizer(Optimizer):
"""Variant of the local sync optimizer that supports replay (for DQN)."""
class LocalSyncReplayOptimizer(PolicyOptimizer):
"""Variant of the local sync optimizer that supports replay (for DQN).
This optimizer requires that policy evaluators return an additional
"td_error" array in the info return of compute_gradients(). This error
term will be used for sample prioritization."""
def _init(
self, learning_starts=1000, buffer_size=10000,
@@ -88,7 +92,7 @@ class LocalSyncReplayOptimizer(Optimizer):
"batch_indexes": batch_indexes})
with self.grad_timer:
td_error = self.local_evaluator.compute_apply(samples)
td_error = self.local_evaluator.compute_apply(samples)["td_error"]
new_priorities = (
np.abs(td_error) + self.prioritized_replay_eps)
if isinstance(self.replay_buffer, PrioritizedReplayBuffer):
@@ -99,7 +103,7 @@ class LocalSyncReplayOptimizer(Optimizer):
self.num_steps_trained += samples.count
def stats(self):
return dict(Optimizer.stats(self), **{
return dict(PolicyOptimizer.stats(self), **{
"sample_time_ms": round(1000 * self.sample_timer.mean, 3),
"replay_time_ms": round(1000 * self.replay_timer.mean, 3),
"grad_time_ms": round(1000 * self.grad_timer.mean, 3),
+4 -4
View File
@@ -7,14 +7,14 @@ import os
import tensorflow as tf
import ray
from ray.rllib.optimizers.evaluator import TFMultiGPUSupport
from ray.rllib.optimizers.optimizer import Optimizer
from ray.rllib.optimizers.policy_evaluator import TFMultiGPUSupport
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer
from ray.rllib.utils.timer import TimerStat
class LocalMultiGPUOptimizer(Optimizer):
class LocalMultiGPUOptimizer(PolicyOptimizer):
"""A synchronous optimizer that uses multiple local GPUs.
Samples are pulled synchronously from multiple remote evaluators,
@@ -102,7 +102,7 @@ class LocalMultiGPUOptimizer(Optimizer):
self.num_steps_trained += samples.count
def stats(self):
return dict(Optimizer.stats(), **{
return dict(PolicyOptimizer.stats(), **{
"sample_time_ms": round(1000 * self.sample_timer.mean, 3),
"load_time_ms": round(1000 * self.load_timer.mean, 3),
"grad_time_ms": round(1000 * self.grad_timer.mean, 3),
@@ -5,22 +5,26 @@ from __future__ import print_function
import os
class Evaluator(object):
"""Algorithms implement this interface to leverage RLlib optimizers.
class PolicyEvaluator(object):
"""Algorithms implement this interface to leverage policy optimizers.
Any algorithm that implements Evaluator can plug in any RLLib optimizer,
e.g. async SGD, local multi-GPU SGD, etc.
Policy evaluators are the "data plane" of an algorithm.
Any algorithm that implements Evaluator can plug in any PolicyOptimizer,
e.g. async SGD, Ape-X, local multi-GPU SGD, etc.
"""
def sample(self):
"""Returns experience samples from this Evaluator.
"""Returns a batch of experience sampled from this evaluator.
This method must be implemented by subclasses.
Returns:
SampleBatch: A columnar batch of experiences.
SampleBatch: A columnar batch of experiences (e.g., tensors).
Examples:
>>> print(ev.sample())
SampleBatch({"a": [1, 2, 3], "b": [4, 5, 6]})
SampleBatch({"obs": [1, 2, 3], "action": [0, 1, 0], ...})
"""
raise NotImplementedError
@@ -28,18 +32,27 @@ class Evaluator(object):
def compute_gradients(self, samples):
"""Returns a gradient computed w.r.t the specified samples.
This method must be implemented by subclasses.
Returns:
object: A gradient that can be applied on a compatible evaluator.
info: dictionary of extra metadata.
Examples:
>>> batch = ev.sample()
>>> grads, info = ev2.compute_gradients(samples)
"""
raise NotImplementedError
def apply_gradients(self, grads):
"""Applies the given gradients to this Evaluator's weights.
"""Applies the given gradients to this evaluator's weights.
This method must be implemented by subclasses.
Examples:
>>> samples = ev1.sample()
>>> grads = ev2.compute_gradients(samples)
>>> grads, info = ev2.compute_gradients(samples)
>>> ev1.apply_gradients(grads)
"""
@@ -48,8 +61,14 @@ class Evaluator(object):
def get_weights(self):
"""Returns the model weights of this Evaluator.
This method must be implemented by subclasses.
Returns:
object: weights that can be set on a compatible evaluator.
info: dictionary of extra metadata.
Examples:
>>> weights = ev1.get_weights()
"""
raise NotImplementedError
@@ -57,6 +76,8 @@ class Evaluator(object):
def set_weights(self, weights):
"""Sets the model weights of this Evaluator.
This method must be implemented by subclasses.
Examples:
>>> weights = ev1.get_weights()
>>> ev2.set_weights(weights)
@@ -65,27 +86,31 @@ class Evaluator(object):
raise NotImplementedError
def compute_apply(self, samples):
"""Fused compute and apply gradients on given samples.
"""Fused compute gradients and apply gradients call.
Returns:
The result of calling compute_gradients(samples)
info: dictionary of extra metadata from compute_gradients().
Examples:
>>> batch = ev.sample()
>>> ev.compute_apply(samples)
"""
grads = self.compute_gradients(samples)
grads, info = self.compute_gradients(samples)
self.apply_gradients(grads)
return grads
return info
def get_host(self):
"""Returns hostname of actor."""
"""Returns the hostname of the process running this evaluator."""
return os.uname()[1]
class TFMultiGPUSupport(Evaluator):
"""The multi-GPU TF optimizer requires additional TF-specific supportt.
class TFMultiGPUSupport(PolicyEvaluator):
"""The multi-GPU TF optimizer requires additional TF-specific support.
Attributes:
sess (Session) the tensorflow session associated with this evaluator
sess (Session): the tensorflow session associated with this evaluator.
"""
def tf_loss_inputs(self):
@@ -102,7 +127,7 @@ class TFMultiGPUSupport(Evaluator):
>>> print(ev.tf_loss_inputs())
[("action", action_placeholder), ("reward", reward_placeholder)]
>>> print(ev.sample().data.keys())
>>> print(ev.sample()[0].data.keys())
["action", "reward"]
"""
@@ -5,17 +5,27 @@ from __future__ import print_function
import ray
class Optimizer(object):
"""RLlib optimizers encapsulate distributed RL optimization strategies.
class PolicyOptimizer(object):
"""Policy optimizers encapsulate distributed RL optimization strategies.
Policy optimizers serve as the "control plane" of algorithms.
For example, AsyncOptimizer is used for A3C, and LocalMultiGPUOptimizer is
used for PPO. These optimizers are all pluggable, and it is possible
to mix and match as needed.
In order for an algorithm to use an RLlib optimizer, it must implement
the Evaluator interface and pass a number of Evaluators to its Optimizer
of choice. The Optimizer uses these Evaluators to sample from the
environment and compute model gradient updates.
the PolicyEvaluator interface and pass a PolicyEvaluator class or set of
PolicyEvaluators to its PolicyOptimizer of choice. The PolicyOptimizer
uses these Evaluators to sample from the environment and compute model
gradient updates.
Attributes:
config (dict): The JSON configuration passed to this optimizer.
local_evaluator (PolicyEvaluator): The embedded evaluator instance.
remote_evaluators (list): List of remote evaluator replicas, or [].
num_steps_trained (int): Number of timesteps trained on so far.
num_steps_sampled (int): Number of timesteps sampled so far.
"""
@classmethod
@@ -59,10 +69,17 @@ class Optimizer(object):
self.num_steps_sampled = 0
def _init(self):
"""Subclasses should prefer overriding this instead of __init__."""
pass
def step(self):
"""Takes a logical optimization step."""
"""Takes a logical optimization step.
This should run for long enough to minimize call overheads (i.e., at
least a couple seconds), but short enough to return control
periodically to callers (i.e., at most a few tens of seconds).
"""
raise NotImplementedError
@@ -75,8 +92,12 @@ class Optimizer(object):
}
def save(self):
"""Returns a serializable object representing the optimizer state."""
return [self.num_steps_trained, self.num_steps_sampled]
def restore(self, data):
"""Restores optimizer state from the given data object."""
self.num_steps_trained = data[0]
self.num_steps_sampled = data[1]