diff --git a/python/ray/rllib/__init__.py b/python/ray/rllib/__init__.py index 6a45ff50a..6bbe33a53 100644 --- a/python/ray/rllib/__init__.py +++ b/python/ray/rllib/__init__.py @@ -8,7 +8,7 @@ from ray.tune.registry import register_trainable def _register_all(): - for key in ["PPO", "ES", "DQN", "A3C", "BC", "PG", "__fake", + for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "__fake", "__sigmoid_fake_data", "__parameter_tuning"]: try: from ray.rllib.agent import get_agent_class diff --git a/python/ray/rllib/agent.py b/python/ray/rllib/agent.py index a7db45c95..a17f2534d 100644 --- a/python/ray/rllib/agent.py +++ b/python/ray/rllib/agent.py @@ -231,6 +231,9 @@ def get_agent_class(alg): elif alg == "DQN": from ray.rllib import dqn return dqn.DQNAgent + elif alg == "APEX": + from ray.rllib import dqn + return dqn.ApexAgent elif alg == "A3C": from ray.rllib import a3c return a3c.A3CAgent diff --git a/python/ray/rllib/dqn/__init__.py b/python/ray/rllib/dqn/__init__.py index d42995b4e..a383adeb4 100644 --- a/python/ray/rllib/dqn/__init__.py +++ b/python/ray/rllib/dqn/__init__.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +from ray.rllib.dqn.apex import ApexAgent from ray.rllib.dqn.dqn import DQNAgent, DEFAULT_CONFIG -__all__ = ["DQNAgent", "DEFAULT_CONFIG"] +__all__ = ["ApexAgent", "DQNAgent", "DEFAULT_CONFIG"] diff --git a/python/ray/rllib/dqn/apex.py b/python/ray/rllib/dqn/apex.py new file mode 100644 index 000000000..dbc6d7229 --- /dev/null +++ b/python/ray/rllib/dqn/apex.py @@ -0,0 +1,45 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.dqn.dqn import DQNAgent, DEFAULT_CONFIG as DQN_CONFIG + +APEX_DEFAULT_CONFIG = dict(DQN_CONFIG, **dict( + optimizer_class="ApexOptimizer", + optimizer_config=dict(DQN_CONFIG["optimizer_config"], **dict( + max_weight_sync_delay=400, + num_replay_buffer_shards=4, + )), + n_step=3, + num_workers=32, + buffer_size=2000000, + learning_starts=50000, + train_batch_size=512, + sample_batch_size=50, + max_weight_sync_delay=400, + target_network_update_freq=500000, + timesteps_per_iteration=25000, + per_worker_exploration=True, + worker_side_prioritization=True, + force_evaluators_remote=False, # consider enabling for large clusters +)) + + +class ApexAgent(DQNAgent): + """DQN variant that uses the Ape-X distributed policy optimizer. + + By default, this is configured for a large single node (32 cores). For + running in a large cluster, increase `num_workers` and consider setting + `force_evaluators_remote` to move workers off of the head node. + """ + + _agent_name = "APEX" + _default_config = APEX_DEFAULT_CONFIG + + def update_target_if_needed(self): + # Ape-X updates based on num steps trained, not sampled + if self.optimizer.num_steps_trained - self.last_target_update_ts > \ + self.config["target_network_update_freq"]: + self.local_evaluator.update_target() + self.last_target_update_ts = self.optimizer.num_steps_trained + self.num_target_updates += 1 diff --git a/python/ray/rllib/dqn/common/tests/test_tf_util.py b/python/ray/rllib/dqn/common/tests/test_tf_util.py deleted file mode 100644 index b9bbdbe3e..000000000 --- a/python/ray/rllib/dqn/common/tests/test_tf_util.py +++ /dev/null @@ -1,73 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -# tests for tf_util -import tensorflow as tf -from ray.rllib.dqn.common.tf_util import ( - function, - initialize, - set_value, - single_threaded_session -) - - -def test_set_value(): - a = tf.Variable(42.) - with single_threaded_session(): - set_value(a, 5) - assert a.eval() == 5 - g = tf.get_default_graph() - g.finalize() - set_value(a, 6) - assert a.eval() == 6 - - # test the test - try: - assert a.eval() == 7 - except AssertionError: - pass - else: - assert False, "assertion should have failed" - - -def test_function(): - tf.reset_default_graph() - x = tf.placeholder(tf.int32, (), name="x") - y = tf.placeholder(tf.int32, (), name="y") - z = 3 * x + 2 * y - lin = function([x, y], z, givens={y: 0}) - - with single_threaded_session(): - initialize() - - assert lin(2) == 6 - assert lin(x=3) == 9 - assert lin(2, 2) == 10 - assert lin(x=2, y=3) == 12 - - -def test_multikwargs(): - tf.reset_default_graph() - x = tf.placeholder(tf.int32, (), name="x") - with tf.variable_scope("other"): - x2 = tf.placeholder(tf.int32, (), name="x") - z = 3 * x + 2 * x2 - - lin = function([x, x2], z, givens={x2: 0}) - with single_threaded_session(): - initialize() - assert lin(2) == 6 - assert lin(2, 2) == 10 - expt_caught = False - try: - lin(x=2) - except AssertionError: - expt_caught = True - assert expt_caught - - -if __name__ == '__main__': - test_set_value() - test_function() - test_multikwargs() diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py index 51e501c53..ac018a0e5 100644 --- a/python/ray/rllib/dqn/dqn.py +++ b/python/ray/rllib/dqn/dqn.py @@ -9,14 +9,18 @@ import numpy as np import tensorflow as tf import ray +from ray.rllib import optimizers from ray.rllib.dqn.dqn_evaluator import DQNEvaluator -from ray.rllib.dqn.dqn_replay_evaluator import DQNReplayEvaluator -from ray.rllib.optimizers import AsyncOptimizer, LocalMultiGPUOptimizer, \ - LocalSyncOptimizer +from ray.rllib.utils.actors import split_colocated from ray.rllib.agent import Agent from ray.tune.result import TrainingResult +OPTIMIZER_SHARED_CONFIGS = [ + "buffer_size", "prioritized_replay", "prioritized_replay_alpha", + "prioritized_replay_beta", "prioritized_replay_eps", "sample_batch_size", + "train_batch_size", "learning_starts"] + DEFAULT_CONFIG = dict( # === Model === # Whether to use dueling dqn @@ -46,44 +50,36 @@ DEFAULT_CONFIG = dict( exploration_fraction=0.1, # Final value of random action probability exploration_final_eps=0.02, - # How many steps of the model to sample before learning starts. - learning_starts=1000, # Update the target network every `target_network_update_freq` steps. target_network_update_freq=500, # === Replay buffer === - # Size of the replay buffer. Note that if async_updates is set, then each - # worker will have a replay buffer of this size. + # Size of the replay buffer. Note that if async_updates is set, then + # each worker will have a replay buffer of this size. buffer_size=50000, # If True prioritized replay buffer will be used. prioritized_replay=True, - # Alpha parameter for prioritized replay buffer + # Alpha parameter for prioritized replay buffer. prioritized_replay_alpha=0.6, - # Initial value of beta for prioritized replay buffer - prioritized_replay_beta0=0.4, - # Number of iterations over which beta will be annealed from initial - # value to 1.0. If set to None equals to schedule_max_timesteps - prioritized_replay_beta_iters=None, + # Beta parameter for sampling from prioritized replay buffer. + prioritized_replay_beta=0.4, # Epsilon to add to the TD errors when updating priorities. prioritized_replay_eps=1e-6, # === Optimization === # Learning rate for adam optimizer lr=5e-4, - # Update the replay buffer with this many samples at once. Note that this - # setting applies per-worker if num_workers > 1. - sample_batch_size=1, - # Size of a batched sampled from replay buffer for training. Note that if - # async_updates is set, then each worker returns gradients for a batch of - # this size. - train_batch_size=32, - # SGD minibatch size. Note that this must be << train_batch_size. This - # config has no effect if gradients_on_workres is True. - sgd_batch_size=32, # If not None, clip gradients during optimization at this value - grad_norm_clipping=10, - # Arguments to pass to the rllib optimizer - optimizer={}, + grad_norm_clipping=40, + # How many steps of the model to sample before learning starts. + learning_starts=1000, + # Update the replay buffer with this many samples at once. Note that + # this setting applies per-worker if num_workers > 1. + sample_batch_size=4, + # Size of a batched sampled from replay buffer for training. Note that + # if async_updates is set, then each worker returns gradients for a + # batch of this size. + train_batch_size=32, # Smooth the current average reward over this many previous episodes. smoothing_num_episodes=100, @@ -93,23 +89,30 @@ DEFAULT_CONFIG = dict( "device_count": {"CPU": 2}, "log_device_placement": False, "allow_soft_placement": True, + "gpu_options": { + "allow_growth": True + }, "inter_op_parallelism_threads": 1, "intra_op_parallelism_threads": 1, }, # === Parallelism === - # Number of workers for collecting samples with. Note that the typical - # setting is 1 unless your environment is particularly slow to sample. - num_workers=1, + # Number of workers for collecting samples with. This only makes sense + # to increase if your environment is particularly slow to sample, or if + # you're using the Ape-X optimizer. + num_workers=0, # Whether to allocate GPUs for workers (if > 0). num_gpus_per_worker=0, - # (Experimental) Whether to update the model asynchronously from - # workers. In this mode, gradients will be computed on workers instead of - # on the driver, and workers will each have their own replay buffer. - async_updates=False, - # (Experimental) Whether to use multiple GPUs for SGD optimization. - # Note that this only helps performance if the SGD batch size is large. - multi_gpu=False) + # Optimizer class to use. + optimizer_class="LocalSyncReplayOptimizer", + # Config to pass to the optimizer. + optimizer_config=dict(), + # Whether to use a distribution of epsilons across workers for exploration. + per_worker_exploration=False, + # Whether to compute priorities on workers. + worker_side_prioritization=False, + # Whether to force evaluator actors to be placed on remote machines. + force_evaluators_remote=False) class DQNAgent(Agent): @@ -119,85 +122,60 @@ class DQNAgent(Agent): _default_config = DEFAULT_CONFIG def _init(self): - if self.config["async_updates"]: - self.local_evaluator = DQNEvaluator( - self.registry, self.env_creator, self.config, self.logdir) - remote_cls = ray.remote( - num_cpus=1, num_gpus=self.config["num_gpus_per_worker"])( - DQNReplayEvaluator) - remote_config = dict(self.config, num_workers=1) - # In async mode, we create N remote evaluators, each with their - # own replay buffer (i.e. the replay buffer is sharded). - self.remote_evaluators = [ - remote_cls.remote( - self.registry, self.env_creator, remote_config, - self.logdir) - for _ in range(self.config["num_workers"])] - optimizer_cls = AsyncOptimizer - else: - self.local_evaluator = DQNReplayEvaluator( - self.registry, self.env_creator, self.config, self.logdir) - # No remote evaluators. If num_workers > 1, the DQNReplayEvaluator - # will internally create more workers for parallelism. This means - # there is only one replay buffer regardless of num_workers. - self.remote_evaluators = [] - if self.config["multi_gpu"]: - optimizer_cls = LocalMultiGPUOptimizer - else: - optimizer_cls = LocalSyncOptimizer + self.local_evaluator = DQNEvaluator( + self.registry, self.env_creator, self.config, self.logdir, 0) + remote_cls = ray.remote( + num_cpus=1, num_gpus=self.config["num_gpus_per_worker"])( + DQNEvaluator) + self.remote_evaluators = [ + remote_cls.remote( + self.registry, self.env_creator, self.config, self.logdir, + i) + for i in range(self.config["num_workers"])] - self.optimizer = optimizer_cls( - self.config["optimizer"], self.local_evaluator, + if self.config["force_evaluators_remote"]: + _, self.remote_evaluators = split_colocated( + self.remote_evaluators) + + for k in OPTIMIZER_SHARED_CONFIGS: + if k not in self.config["optimizer_config"]: + self.config["optimizer_config"][k] = self.config[k] + + self.optimizer = getattr(optimizers, self.config["optimizer_class"])( + self.config["optimizer_config"], self.local_evaluator, self.remote_evaluators) - self.saver = tf.train.Saver(max_to_keep=None) - self.global_timestep = 0 + self.saver = tf.train.Saver(max_to_keep=None) self.last_target_update_ts = 0 self.num_target_updates = 0 + @property + def global_timestep(self): + return self.optimizer.num_steps_sampled + + def update_target_if_needed(self): + if self.global_timestep - self.last_target_update_ts > \ + self.config["target_network_update_freq"]: + self.local_evaluator.update_target() + self.last_target_update_ts = self.global_timestep + self.num_target_updates += 1 + def _train(self): start_timestep = self.global_timestep while (self.global_timestep - start_timestep < self.config["timesteps_per_iteration"]): - if self.global_timestep < self.config["learning_starts"]: - self._populate_replay_buffer() - else: - self.optimizer.step() + self.optimizer.step() + self.update_target_if_needed() - stats = self._update_global_stats() + self.local_evaluator.set_global_timestep(self.global_timestep) + for e in self.remote_evaluators: + e.set_global_timestep.remote(self.global_timestep) - if self.global_timestep - self.last_target_update_ts > \ - self.config["target_network_update_freq"]: - self.local_evaluator.update_target() - self.last_target_update_ts = self.global_timestep - self.num_target_updates += 1 + return self._train_stats(start_timestep) - mean_100ep_reward = 0.0 - mean_100ep_length = 0.0 - num_episodes = 0 - exploration = -1 - - for s in stats: - mean_100ep_reward += s["mean_100ep_reward"] / len(stats) - mean_100ep_length += s["mean_100ep_length"] / len(stats) - num_episodes += s["num_episodes"] - exploration = s["exploration"] - - result = TrainingResult( - episode_reward_mean=mean_100ep_reward, - episode_len_mean=mean_100ep_length, - episodes_total=num_episodes, - timesteps_this_iter=self.global_timestep - start_timestep, - info=dict({ - "exploration": exploration, - "num_target_updates": self.num_target_updates, - }, **self.optimizer.stats())) - - return result - - def _update_global_stats(self): + def _train_stats(self, start_timestep): if self.remote_evaluators: stats = ray.get([ e.stats.remote() for e in self.remote_evaluators]) @@ -205,13 +183,40 @@ class DQNAgent(Agent): stats = self.local_evaluator.stats() if not isinstance(stats, list): stats = [stats] - new_timestep = sum(s["local_timestep"] for s in stats) - assert new_timestep > self.global_timestep, new_timestep - self.global_timestep = new_timestep - self.local_evaluator.set_global_timestep(self.global_timestep) - for e in self.remote_evaluators: - e.set_global_timestep.remote(self.global_timestep) - return stats + + mean_100ep_reward = 0.0 + mean_100ep_length = 0.0 + num_episodes = 0 + explorations = [] + + if self.config["per_worker_exploration"]: + # Return stats from workers with the lowest 20% of exploration + test_stats = stats[-int(max(1, len(stats)*0.2)):] + else: + test_stats = stats + + for s in test_stats: + mean_100ep_reward += s["mean_100ep_reward"] / len(test_stats) + mean_100ep_length += s["mean_100ep_length"] / len(test_stats) + + for s in stats: + num_episodes += s["num_episodes"] + explorations.append(s["exploration"]) + + opt_stats = self.optimizer.stats() + + result = TrainingResult( + episode_reward_mean=mean_100ep_reward, + episode_len_mean=mean_100ep_length, + episodes_total=num_episodes, + timesteps_this_iter=self.global_timestep - start_timestep, + info=dict({ + "min_exploration": min(explorations), + "max_exploration": max(explorations), + "num_target_updates": self.num_target_updates, + }, **opt_stats)) + + return result def _populate_replay_buffer(self): if self.remote_evaluators: @@ -233,7 +238,7 @@ class DQNAgent(Agent): extra_data = [ self.local_evaluator.save(), ray.get([e.save.remote() for e in self.remote_evaluators]), - self.global_timestep, + self.optimizer.save(), self.num_target_updates, self.last_target_update_ts] pickle.dump(extra_data, open(checkpoint_path + ".extra_data", "wb")) @@ -246,7 +251,7 @@ class DQNAgent(Agent): ray.get([ e.restore.remote(d) for (d, e) in zip(extra_data[1], self.remote_evaluators)]) - self.global_timestep = extra_data[2] + self.optimizer.restore(extra_data[2]) self.num_target_updates = extra_data[3] self.last_target_update_ts = extra_data[4] diff --git a/python/ray/rllib/dqn/dqn_evaluator.py b/python/ray/rllib/dqn/dqn_evaluator.py index 2bae4aed8..ae7a3a080 100644 --- a/python/ray/rllib/dqn/dqn_evaluator.py +++ b/python/ray/rllib/dqn/dqn_evaluator.py @@ -10,8 +10,9 @@ import ray from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.dqn import models from ray.rllib.dqn.common.wrappers import wrap_dqn -from ray.rllib.dqn.common.schedules import LinearSchedule -from ray.rllib.optimizers import SampleBatch, TFMultiGPUSupport +from ray.rllib.dqn.common.schedules import ConstantSchedule, LinearSchedule +from ray.rllib.optimizers import SampleBatch, Evaluator +from ray.rllib.utils.compression import pack def adjust_nstep(n_step, gamma, obs, actions, rewards, new_obs, dones): @@ -42,12 +43,12 @@ def adjust_nstep(n_step, gamma, obs, actions, rewards, new_obs, dones): del arr[new_len:] -class DQNEvaluator(TFMultiGPUSupport): - """The base DQN Evaluator that does not include the replay buffer. +class DQNEvaluator(Evaluator): + """The DQN Evaluator. TODO(rliaw): Support observation/reward filters?""" - def __init__(self, registry, env_creator, config, logdir): + def __init__(self, registry, env_creator, config, logdir, worker_index): env = env_creator(config["env_config"]) env = wrap_dqn(registry, env, config["model"]) self.env = env @@ -62,13 +63,19 @@ class DQNEvaluator(TFMultiGPUSupport): self.sess = tf.Session(config=tf_config) self.dqn_graph = models.DQNGraph(registry, env, config, logdir) - # Create the schedule for exploration starting from 1. - self.exploration = LinearSchedule( - schedule_timesteps=int( - config["exploration_fraction"] * - config["schedule_max_timesteps"]), - initial_p=1.0, - final_p=config["exploration_final_eps"]) + # Use either a different `eps` per worker, or a linear schedule. + if config["per_worker_exploration"]: + assert config["num_workers"] > 1, "This requires multiple workers" + self.exploration = ConstantSchedule( + 0.4 ** ( + 1 + worker_index / float(config["num_workers"] - 1) * 7)) + else: + self.exploration = LinearSchedule( + schedule_timesteps=int( + config["exploration_fraction"] * + config["schedule_max_timesteps"]), + initial_p=1.0, + final_p=config["exploration_final_eps"]) # Initialize the parameters and copy them to the target network. self.sess.run(tf.global_variables_initializer()) @@ -112,20 +119,30 @@ class DQNEvaluator(TFMultiGPUSupport): obs, actions, rewards, new_obs, dones) batch = SampleBatch({ - "obs": obs, "actions": actions, "rewards": rewards, - "new_obs": new_obs, "dones": dones, + "obs": [pack(np.array(o)) for o in obs], "actions": actions, + "rewards": rewards, + "new_obs": [pack(np.array(o)) for o in new_obs], "dones": dones, "weights": np.ones_like(rewards)}) - assert batch.count == self.config["sample_batch_size"] + assert (batch.count == self.config["sample_batch_size"]) + + # Prioritize on the worker side + if self.config["worker_side_prioritization"]: + td_errors = self.dqn_graph.compute_td_error( + self.sess, obs, batch["actions"], batch["rewards"], + new_obs, batch["dones"], batch["weights"]) + new_priorities = ( + np.abs(td_errors) + self.config["prioritized_replay_eps"]) + batch.data["weights"] = new_priorities + return batch - def compute_gradients(self, samples): - _, grad = self.dqn_graph.compute_gradients( + def compute_apply(self, samples): + if samples is None: + return None + td_error = self.dqn_graph.compute_apply( self.sess, samples["obs"], samples["actions"], samples["rewards"], samples["new_obs"], samples["dones"], samples["weights"]) - return grad - - def apply_gradients(self, grads): - self.dqn_graph.apply_gradients(self.sess, grads) + return td_error def get_weights(self): return self.variables.get_weights() @@ -133,12 +150,6 @@ class DQNEvaluator(TFMultiGPUSupport): def set_weights(self, weights): self.variables.set_weights(weights) - def tf_loss_inputs(self): - return self.dqn_graph.loss_inputs - - def build_tf_loss(self, input_placeholders): - return self.dqn_graph.build_loss(*input_placeholders) - def _step(self, global_timestep): """Takes a single step, and returns the result of the step.""" action = self.dqn_graph.act( diff --git a/python/ray/rllib/dqn/dqn_replay_evaluator.py b/python/ray/rllib/dqn/dqn_replay_evaluator.py deleted file mode 100644 index effd0bf01..000000000 --- a/python/ray/rllib/dqn/dqn_replay_evaluator.py +++ /dev/null @@ -1,154 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np - -import ray -from ray.rllib.dqn.dqn_evaluator import DQNEvaluator -from ray.rllib.dqn.common.schedules import LinearSchedule -from ray.rllib.dqn.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer -from ray.rllib.optimizers import SampleBatch - - -class DQNReplayEvaluator(DQNEvaluator): - """Wraps DQNEvaluators to provide replay buffer functionality. - - This has two modes: - If config["num_workers"] == 1: - Samples will be collected locally. - If config["num_workers"] > 1: - Samples will be collected from a number of remote workers. - """ - - def __init__(self, registry, env_creator, config, logdir): - DQNEvaluator.__init__(self, registry, env_creator, config, logdir) - - # Create extra workers if needed - if self.config["num_workers"] > 1: - remote_cls = ray.remote(num_cpus=1)(DQNEvaluator) - self.workers = [ - remote_cls.remote(registry, env_creator, config, logdir) - for _ in range(self.config["num_workers"])] - else: - self.workers = [] - - # Create the replay buffer - if config["prioritized_replay"]: - self.replay_buffer = PrioritizedReplayBuffer( - config["buffer_size"], - alpha=config["prioritized_replay_alpha"]) - prioritized_replay_beta_iters = \ - config["prioritized_replay_beta_iters"] - if prioritized_replay_beta_iters is None: - prioritized_replay_beta_iters = \ - config["schedule_max_timesteps"] - self.beta_schedule = LinearSchedule( - prioritized_replay_beta_iters, - initial_p=config["prioritized_replay_beta0"], - final_p=1.0) - else: - self.replay_buffer = ReplayBuffer(config["buffer_size"]) - self.beta_schedule = None - - self.samples_to_prioritize = None - - def sample(self, no_replay=False): - # First seed the replay buffer with a few new samples - if self.workers: - weights = ray.put(self.get_weights()) - for w in self.workers: - w.set_weights.remote(weights) - samples = ray.get([w.sample.remote() for w in self.workers]) - else: - samples = [DQNEvaluator.sample(self)] - - for s in samples: - for row in s.rows(): - self.replay_buffer.add( - row["obs"], row["actions"], row["rewards"], row["new_obs"], - row["dones"]) - - if no_replay: - return SampleBatch.concat_samples(samples) - - # Then return a batch sampled from the buffer - if self.config["prioritized_replay"]: - (obses_t, actions, rewards, obses_tp1, - dones, weights, batch_indexes) = self.replay_buffer.sample( - self.config["train_batch_size"], - beta=self.beta_schedule.value(self.global_timestep)) - self._update_priorities_if_needed() - batch = SampleBatch({ - "obs": obses_t, "actions": actions, "rewards": rewards, - "new_obs": obses_tp1, "dones": dones, "weights": weights, - "batch_indexes": batch_indexes}) - self.samples_to_prioritize = batch - else: - obses_t, actions, rewards, obses_tp1, dones = \ - self.replay_buffer.sample(self.config["train_batch_size"]) - batch = SampleBatch({ - "obs": obses_t, "actions": actions, "rewards": rewards, - "new_obs": obses_tp1, "dones": dones, - "weights": np.ones_like(rewards)}) - return batch - - def compute_gradients(self, samples): - td_errors, grad = self.dqn_graph.compute_gradients( - self.sess, samples["obs"], samples["actions"], samples["rewards"], - samples["new_obs"], samples["dones"], samples["weights"]) - if self.config["prioritized_replay"]: - new_priorities = ( - np.abs(td_errors) + self.config["prioritized_replay_eps"]) - self.replay_buffer.update_priorities( - samples["batch_indexes"], new_priorities) - self.samples_to_prioritize = None - return grad - - def _update_priorities_if_needed(self): - """Manually updates replay buffer priorities on the last batch. - - Note that this is only needed when not computing gradients on this - Evaluator (e.g. when using local multi-GPU). Otherwise, priorities - can be updated more efficiently as part of computing gradients. - """ - - if not self.samples_to_prioritize: - return - - batch = self.samples_to_prioritize - td_errors = self.dqn_graph.compute_td_error( - self.sess, batch["obs"], batch["actions"], batch["rewards"], - batch["new_obs"], batch["dones"], batch["weights"]) - - new_priorities = ( - np.abs(td_errors) + self.config["prioritized_replay_eps"]) - self.replay_buffer.update_priorities( - batch["batch_indexes"], new_priorities) - self.samples_to_prioritize = None - - def stats(self): - if self.workers: - return ray.get([s.stats.remote() for s in self.workers]) - else: - return DQNEvaluator.stats(self) - - def save(self): - return [ - DQNEvaluator.save(self), - ray.get([w.save.remote() for w in self.workers]), - self.beta_schedule, - self.replay_buffer] - - def restore(self, data): - DQNEvaluator.restore(self, data[0]) - for (w, d) in zip(self.workers, data[1]): - w.restore.remote(d) - self.beta_schedule = data[2] - self.replay_buffer = data[3] - - def set_global_timestep(self, global_timestep): - self.global_timestep = global_timestep - if self.workers: - ray.get([worker.set_global_timestep.remote(global_timestep) - for worker in self.workers]) diff --git a/python/ray/rllib/dqn/models.py b/python/ray/rllib/dqn/models.py index d071410da..6629b6126 100644 --- a/python/ray/rllib/dqn/models.py +++ b/python/ray/rllib/dqn/models.py @@ -2,6 +2,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import numpy as np + import tensorflow as tf import tensorflow.contrib.layers as layers @@ -270,10 +272,10 @@ class DQNGraph(object): td_err = sess.run( self.td_error, feed_dict={ - self.obs_t: obs_t, + self.obs_t: [np.array(ob) for ob in obs_t], self.act_t: act_t, self.rew_t: rew_t, - self.obs_tp1: obs_tp1, + self.obs_tp1: [np.array(ob) for ob in obs_tp1], self.done_mask: done_mask, self.importance_weights: importance_weights }) @@ -283,3 +285,18 @@ class DQNGraph(object): assert len(grads) == len(self.grads_and_vars) feed_dict = {ph: g for (g, ph) in zip(grads, self.grads)} sess.run(self.train_expr, feed_dict=feed_dict) + + def compute_apply( + self, sess, obs_t, act_t, rew_t, obs_tp1, done_mask, + importance_weights): + td_err, _ = sess.run( + [self.td_error, self.train_expr], + feed_dict={ + self.obs_t: obs_t, + self.act_t: act_t, + self.rew_t: rew_t, + self.obs_tp1: obs_tp1, + self.done_mask: done_mask, + self.importance_weights: importance_weights + }) + return td_err diff --git a/python/ray/rllib/optimizers/__init__.py b/python/ray/rllib/optimizers/__init__.py index a97a7e524..55bbb2670 100644 --- a/python/ray/rllib/optimizers/__init__.py +++ b/python/ray/rllib/optimizers/__init__.py @@ -1,10 +1,13 @@ +from ray.rllib.optimizers.apex_optimizer import ApexOptimizer from ray.rllib.optimizers.async import AsyncOptimizer from ray.rllib.optimizers.local_sync import LocalSyncOptimizer +from ray.rllib.optimizers.local_sync_replay import LocalSyncReplayOptimizer from ray.rllib.optimizers.multi_gpu import LocalMultiGPUOptimizer from ray.rllib.optimizers.sample_batch import SampleBatch from ray.rllib.optimizers.evaluator import Evaluator, TFMultiGPUSupport __all__ = [ - "AsyncOptimizer", "LocalSyncOptimizer", "LocalMultiGPUOptimizer", - "SampleBatch", "Evaluator", "TFMultiGPUSupport"] + "ApexOptimizer", "AsyncOptimizer", "LocalSyncOptimizer", + "LocalSyncReplayOptimizer", "LocalMultiGPUOptimizer", "SampleBatch", + "Evaluator", "TFMultiGPUSupport"] diff --git a/python/ray/rllib/optimizers/apex_optimizer.py b/python/ray/rllib/optimizers/apex_optimizer.py new file mode 100644 index 000000000..fef57ed96 --- /dev/null +++ b/python/ray/rllib/optimizers/apex_optimizer.py @@ -0,0 +1,265 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import queue +import random +import time +import threading + +import numpy as np + +import ray +from ray.rllib.optimizers.optimizer import Optimizer +from ray.rllib.optimizers.replay_buffer import PrioritizedReplayBuffer +from ray.rllib.optimizers.sample_batch import SampleBatch +from ray.rllib.utils.actors import TaskPool, create_colocated +from ray.rllib.utils.timer import TimerStat +from ray.rllib.utils.window_stat import WindowStat + +SAMPLE_QUEUE_DEPTH = 2 +REPLAY_QUEUE_DEPTH = 4 +LEARNER_QUEUE_MAX_SIZE = 16 + + +@ray.remote +class ReplayActor(object): + def __init__( + self, num_shards, learning_starts, buffer_size, train_batch_size, + prioritized_replay_alpha, prioritized_replay_beta, + prioritized_replay_eps): + self.replay_starts = learning_starts // num_shards + self.buffer_size = buffer_size // num_shards + self.train_batch_size = train_batch_size + self.prioritized_replay_beta = prioritized_replay_beta + self.prioritized_replay_eps = prioritized_replay_eps + + self.replay_buffer = PrioritizedReplayBuffer( + buffer_size, alpha=prioritized_replay_alpha) + + # Metrics + self.add_batch_timer = TimerStat() + self.replay_timer = TimerStat() + self.update_priorities_timer = TimerStat() + + def get_host(self): + return os.uname()[1] + + def add_batch(self, batch): + with self.add_batch_timer: + for row in batch.rows(): + self.replay_buffer.add( + row["obs"], row["actions"], row["rewards"], row["new_obs"], + row["dones"], row["weights"]) + + def replay(self): + with self.replay_timer: + if len(self.replay_buffer) < self.replay_starts: + return None + + (obses_t, actions, rewards, obses_tp1, + dones, weights, batch_indexes) = self.replay_buffer.sample( + self.train_batch_size, + beta=self.prioritized_replay_beta) + + batch = SampleBatch({ + "obs": obses_t, "actions": actions, "rewards": rewards, + "new_obs": obses_tp1, "dones": dones, "weights": weights, + "batch_indexes": batch_indexes}) + return batch + + def update_priorities(self, batch, td_errors): + with self.update_priorities_timer: + new_priorities = ( + np.abs(td_errors) + self.prioritized_replay_eps) + self.replay_buffer.update_priorities( + batch["batch_indexes"], new_priorities) + + def stats(self): + stat = { + "add_batch_time_ms": round( + 1000 * self.add_batch_timer.mean, 3), + "replay_time_ms": round( + 1000 * self.replay_timer.mean, 3), + "update_priorities_time_ms": round( + 1000 * self.update_priorities_timer.mean, 3), + } + stat.update(self.replay_buffer.stats()) + return stat + + +class GenericLearner(threading.Thread): + def __init__(self, local_evaluator): + threading.Thread.__init__(self) + self.learner_queue_size = WindowStat("size", 50) + self.local_evaluator = local_evaluator + self.inqueue = queue.Queue(maxsize=LEARNER_QUEUE_MAX_SIZE) + self.outqueue = queue.Queue() + self.queue_timer = TimerStat() + self.grad_timer = TimerStat() + self.daemon = True + + def run(self): + while True: + self.step() + + def step(self): + with self.queue_timer: + ra, replay = self.inqueue.get() + if replay is not None: + with self.grad_timer: + td_error = self.local_evaluator.compute_apply(replay) + self.outqueue.put((ra, replay, td_error)) + self.learner_queue_size.push(self.inqueue.qsize()) + + +class ApexOptimizer(Optimizer): + + def _init( + self, learning_starts=1000, buffer_size=10000, + prioritized_replay=True, prioritized_replay_alpha=0.6, + prioritized_replay_beta=0.4, prioritized_replay_eps=1e-6, + train_batch_size=512, sample_batch_size=50, + num_replay_buffer_shards=1, max_weight_sync_delay=400): + + self.replay_starts = learning_starts + self.prioritized_replay_beta = prioritized_replay_beta + self.prioritized_replay_eps = prioritized_replay_eps + self.train_batch_size = train_batch_size + self.sample_batch_size = sample_batch_size + self.max_weight_sync_delay = max_weight_sync_delay + + self.learner = GenericLearner(self.local_evaluator) + self.learner.start() + + self.replay_actors = create_colocated( + ReplayActor, + [num_replay_buffer_shards, learning_starts, buffer_size, + train_batch_size, prioritized_replay_alpha, + prioritized_replay_beta, prioritized_replay_eps], + num_replay_buffer_shards) + assert len(self.remote_evaluators) > 0 + + # Stats + self.timers = {k: TimerStat() for k in [ + "put_weights", "get_samples", "enqueue", "sample_processing", + "replay_processing", "update_priorities", "train", "sample"]} + self.meters = {k: WindowStat(k, 10) for k in [ + "samples_per_loop", "replays_per_loop", "reprios_per_loop", + "reweights_per_loop"]} + self.num_weight_syncs = 0 + self.learning_started = False + + # Number of worker steps since the last weight update + self.steps_since_update = {} + + # Otherwise kick of replay tasks for local gradient updates + self.replay_tasks = TaskPool() + for ra in self.replay_actors: + for _ in range(REPLAY_QUEUE_DEPTH): + self.replay_tasks.add(ra, ra.replay.remote()) + + # Kick off async background sampling + self.sample_tasks = TaskPool() + weights = self.local_evaluator.get_weights() + for ev in self.remote_evaluators: + ev.set_weights.remote(weights) + self.steps_since_update[ev] = 0 + for _ in range(SAMPLE_QUEUE_DEPTH): + self.sample_tasks.add(ev, ev.sample.remote()) + + def step(self): + start = time.time() + sample_timesteps, train_timesteps = self._step() + time_delta = time.time() - start + self.timers["sample"].push(time_delta) + self.timers["sample"].push_units_processed(sample_timesteps) + if train_timesteps > 0: + self.learning_started = True + if self.learning_started: + self.timers["train"].push(time_delta) + self.timers["train"].push_units_processed(train_timesteps) + self.num_steps_sampled += sample_timesteps + self.num_steps_trained += train_timesteps + + def _step(self): + sample_timesteps, train_timesteps = 0, 0 + weights = None + + with self.timers["sample_processing"]: + i = 0 + num_weight_syncs = 0 + for ev, sample_batch in self.sample_tasks.completed(): + i += 1 + sample_timesteps += self.sample_batch_size + + # Send the data to the replay buffer + random.choice(self.replay_actors).add_batch.remote( + sample_batch) + + # Update weights if needed + self.steps_since_update[ev] += self.sample_batch_size + if self.steps_since_update[ev] >= self.max_weight_sync_delay: + if weights is None: + with self.timers["put_weights"]: + weights = ray.put( + self.local_evaluator.get_weights()) + ev.set_weights.remote(weights) + self.num_weight_syncs += 1 + num_weight_syncs += 1 + self.steps_since_update[ev] = 0 + + # Kick off another sample request + self.sample_tasks.add(ev, ev.sample.remote()) + self.meters["samples_per_loop"].push(i) + self.meters["reweights_per_loop"].push(num_weight_syncs) + + with self.timers["replay_processing"]: + i = 0 + for ra, replay in self.replay_tasks.completed(): + i += 1 + self.replay_tasks.add(ra, ra.replay.remote()) + with self.timers["get_samples"]: + samples = ray.get(replay) + with self.timers["enqueue"]: + self.learner.inqueue.put((ra, samples)) + self.meters["replays_per_loop"].push(i) + + with self.timers["update_priorities"]: + i = 0 + while not self.learner.outqueue.empty(): + i += 1 + ra, replay, td_error = self.learner.outqueue.get() + ra.update_priorities.remote(replay, td_error) + train_timesteps += self.train_batch_size + self.meters["reprios_per_loop"].push(i) + + return sample_timesteps, train_timesteps + + def stats(self): + replay_stats = ray.get(self.replay_actors[0].stats.remote()) + timing = { + "{}_time_ms".format(k): round(1000 * self.timers[k].mean, 3) + for k in self.timers + } + timing["learner_grad_time_ms"] = round( + 1000 * self.learner.grad_timer.mean, 3) + timing["learner_dequeue_time_ms"] = round( + 1000 * self.learner.queue_timer.mean, 3) + stats = { + "replay_shard_0": replay_stats, + "timing_breakdown": timing, + "sample_throughput": round( + self.timers["sample"].mean_throughput, 3), + "train_throughput": round(self.timers["train"].mean_throughput, 3), + "num_weight_syncs": self.num_weight_syncs, + "pending_sample_tasks": self.sample_tasks.count, + "pending_replay_tasks": self.replay_tasks.count, + "learner_queue": self.learner.learner_queue_size.stats(), + "samples": self.meters["samples_per_loop"].stats(), + "replays": self.meters["replays_per_loop"].stats(), + "reprios": self.meters["reprios_per_loop"].stats(), + "reweights": self.meters["reweights_per_loop"].stats(), + } + return dict(Optimizer.stats(self), **stats) diff --git a/python/ray/rllib/optimizers/async.py b/python/ray/rllib/optimizers/async.py index b48fed3f8..b51e96ba5 100644 --- a/python/ray/rllib/optimizers/async.py +++ b/python/ray/rllib/optimizers/async.py @@ -14,11 +14,12 @@ class AsyncOptimizer(Optimizer): evaluators, sending updated weights back as needed. This pipelines the gradient computations on the remote workers. """ - def _init(self): + def _init(self, grads_per_step=100, batch_size=10): self.apply_timer = TimerStat() self.wait_timer = TimerStat() self.dispatch_timer = TimerStat() - self.grads_per_step = self.config.get("grads_per_step", 100) + self.grads_per_step = grads_per_step + self.batch_size = batch_size def step(self): weights = ray.put(self.local_evaluator.get_weights()) @@ -49,9 +50,12 @@ class AsyncOptimizer(Optimizer): gradient_queue.append((fut, e)) num_gradients += 1 + self.num_steps_sampled += self.grads_per_step * self.batch_size + self.num_steps_trained += self.grads_per_step * self.batch_size + def stats(self): - return { + return dict(Optimizer.stats(), **{ "wait_time_ms": round(1000 * self.wait_timer.mean, 3), "apply_time_ms": round(1000 * self.apply_timer.mean, 3), "dispatch_time_ms": round(1000 * self.dispatch_timer.mean, 3), - } + }) diff --git a/python/ray/rllib/optimizers/evaluator.py b/python/ray/rllib/optimizers/evaluator.py index 165010bf3..ee079fcd7 100644 --- a/python/ray/rllib/optimizers/evaluator.py +++ b/python/ray/rllib/optimizers/evaluator.py @@ -2,6 +2,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import os + class Evaluator(object): """Algorithms implement this interface to leverage RLlib optimizers. @@ -62,6 +64,22 @@ class Evaluator(object): raise NotImplementedError + def compute_apply(self, samples): + """Fused compute and apply gradients on given samples. + + Returns: + The result of calling compute_gradients(samples) + """ + + grads = self.compute_gradients(samples) + self.apply_gradients(grads) + return grads + + def get_host(self): + """Returns hostname of actor.""" + + return os.uname()[1] + class TFMultiGPUSupport(Evaluator): """The multi-GPU TF optimizer requires additional TF-specific supportt. diff --git a/python/ray/rllib/optimizers/local_sync.py b/python/ray/rllib/optimizers/local_sync.py index d14ee6fa0..0ca8cb39e 100644 --- a/python/ray/rllib/optimizers/local_sync.py +++ b/python/ray/rllib/optimizers/local_sync.py @@ -5,6 +5,7 @@ from __future__ import print_function import ray from ray.rllib.optimizers.optimizer import Optimizer from ray.rllib.optimizers.sample_batch import SampleBatch +from ray.rllib.utils.filter import RunningStat from ray.rllib.utils.timer import TimerStat @@ -16,10 +17,12 @@ class LocalSyncOptimizer(Optimizer): model weights are then broadcast to all remote evaluators. """ - def _init(self): + def _init(self, batch_size=32): self.update_weights_timer = TimerStat() self.sample_timer = TimerStat() self.grad_timer = TimerStat() + self.throughput = RunningStat() + self.batch_size = batch_size def step(self): with self.update_weights_timer: @@ -39,10 +42,16 @@ class LocalSyncOptimizer(Optimizer): with self.grad_timer: grad = self.local_evaluator.compute_gradients(samples) self.local_evaluator.apply_gradients(grad) + self.grad_timer.push_units_processed(samples.count) + + self.num_steps_sampled += samples.count + self.num_steps_trained += samples.count def stats(self): - return { + return dict(Optimizer.stats(self), **{ "sample_time_ms": round(1000 * self.sample_timer.mean, 3), "grad_time_ms": round(1000 * self.grad_timer.mean, 3), "update_time_ms": round(1000 * self.update_weights_timer.mean, 3), - } + "opt_peak_throughput": round(self.grad_timer.mean_throughput, 3), + "opt_samples": round(self.grad_timer.mean_units_processed, 3), + }) diff --git a/python/ray/rllib/optimizers/local_sync_replay.py b/python/ray/rllib/optimizers/local_sync_replay.py new file mode 100644 index 000000000..7222584b4 --- /dev/null +++ b/python/ray/rllib/optimizers/local_sync_replay.py @@ -0,0 +1,109 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + +import ray +from ray.rllib.optimizers.replay_buffer import ReplayBuffer, \ + PrioritizedReplayBuffer +from ray.rllib.optimizers.optimizer import Optimizer +from ray.rllib.optimizers.sample_batch import SampleBatch +from ray.rllib.utils.filter import RunningStat +from ray.rllib.utils.timer import TimerStat + + +class LocalSyncReplayOptimizer(Optimizer): + """Variant of the local sync optimizer that supports replay (for DQN).""" + + def _init( + self, learning_starts=1000, buffer_size=10000, + prioritized_replay=True, prioritized_replay_alpha=0.6, + prioritized_replay_beta=0.4, prioritized_replay_eps=1e-6, + train_batch_size=32, sample_batch_size=4): + + self.replay_starts = learning_starts + self.prioritized_replay_beta = prioritized_replay_beta + self.prioritized_replay_eps = prioritized_replay_eps + self.train_batch_size = train_batch_size + + # Stats + self.update_weights_timer = TimerStat() + self.sample_timer = TimerStat() + self.replay_timer = TimerStat() + self.grad_timer = TimerStat() + self.throughput = RunningStat() + + # Set up replay buffer + if prioritized_replay: + self.replay_buffer = PrioritizedReplayBuffer( + buffer_size, + alpha=prioritized_replay_alpha) + else: + self.replay_buffer = ReplayBuffer(buffer_size) + + assert buffer_size >= self.replay_starts + + def step(self): + with self.update_weights_timer: + if self.remote_evaluators: + weights = ray.put(self.local_evaluator.get_weights()) + for e in self.remote_evaluators: + e.set_weights.remote(weights) + + with self.sample_timer: + if self.remote_evaluators: + batch = SampleBatch.concat_samples( + ray.get( + [e.sample.remote() for e in self.remote_evaluators])) + else: + batch = self.local_evaluator.sample() + for row in batch.rows(): + self.replay_buffer.add( + row["obs"], row["actions"], row["rewards"], row["new_obs"], + row["dones"], row["weights"]) + + if len(self.replay_buffer) >= self.replay_starts: + self._optimize() + + self.num_steps_sampled += batch.count + + def _optimize(self): + with self.replay_timer: + if isinstance(self.replay_buffer, PrioritizedReplayBuffer): + (obses_t, actions, rewards, obses_tp1, + dones, weights, batch_indexes) = self.replay_buffer.sample( + self.train_batch_size, + beta=self.prioritized_replay_beta) + else: + (obses_t, actions, rewards, obses_tp1, + dones) = self.replay_buffer.sample( + self.train_batch_size) + weights = np.ones_like(rewards) + batch_indexes = - np.ones_like(rewards) + + samples = SampleBatch({ + "obs": obses_t, "actions": actions, "rewards": rewards, + "new_obs": obses_tp1, "dones": dones, "weights": weights, + "batch_indexes": batch_indexes}) + + with self.grad_timer: + td_error = self.local_evaluator.compute_apply(samples) + new_priorities = ( + np.abs(td_error) + self.prioritized_replay_eps) + if isinstance(self.replay_buffer, PrioritizedReplayBuffer): + self.replay_buffer.update_priorities( + samples["batch_indexes"], new_priorities) + self.grad_timer.push_units_processed(samples.count) + + self.num_steps_trained += samples.count + + def stats(self): + return dict(Optimizer.stats(self), **{ + "sample_time_ms": round(1000 * self.sample_timer.mean, 3), + "replay_time_ms": round(1000 * self.replay_timer.mean, 3), + "grad_time_ms": round(1000 * self.grad_timer.mean, 3), + "update_time_ms": round(1000 * self.update_weights_timer.mean, 3), + "opt_peak_throughput": round(self.grad_timer.mean_throughput, 3), + "opt_samples": round(self.grad_timer.mean_units_processed, 3), + }) diff --git a/python/ray/rllib/optimizers/multi_gpu.py b/python/ray/rllib/optimizers/multi_gpu.py index 6cdbf97cf..05274133d 100644 --- a/python/ray/rllib/optimizers/multi_gpu.py +++ b/python/ray/rllib/optimizers/multi_gpu.py @@ -26,9 +26,11 @@ class LocalMultiGPUOptimizer(Optimizer): the TFMultiGPUSupport API. """ - def _init(self): + def _init(self, sgd_batch_size=128, sgd_stepsize=5e-5, num_sgd_iter=10): assert isinstance(self.local_evaluator, TFMultiGPUSupport) - self.batch_size = self.config.get("sgd_batch_size", 128) + self.batch_size = sgd_batch_size + self.sgd_stepsize = sgd_stepsize + self.num_sgd_iter = num_sgd_iter gpu_ids = ray.get_gpu_ids() if not gpu_ids: self.devices = ["/cpu:0"] @@ -51,12 +53,12 @@ class LocalMultiGPUOptimizer(Optimizer): tf.get_variable_scope().reuse_variables() self.par_opt = LocalSyncParallelOptimizer( - tf.train.AdamOptimizer(self.config.get("sgd_stepsize", 5e-5)), + tf.train.AdamOptimizer(self.sgd_stepsize), self.devices, [ph for _, ph in self.loss_inputs], self.per_device_batch_size, lambda *ph: self.local_evaluator.build_tf_loss(ph), - self.config.get("logdir", os.getcwd())) + os.getcwd()) self.sess = self.local_evaluator.sess self.sess.run(tf.global_variables_initializer()) @@ -83,7 +85,7 @@ class LocalMultiGPUOptimizer(Optimizer): samples.columns([key for key, _ in self.loss_inputs])) with self.grad_timer: - for i in range(self.config.get("num_sgd_iter", 10)): + for i in range(self.num_sgd_iter): batch_index = 0 num_batches = ( int(tuples_per_device) // int(self.per_device_batch_size)) @@ -96,10 +98,13 @@ class LocalMultiGPUOptimizer(Optimizer): permutation[batch_index] * self.per_device_batch_size) batch_index += 1 + self.num_steps_sampled += samples.count + self.num_steps_trained += samples.count + def stats(self): - return { + return dict(Optimizer.stats(), **{ "sample_time_ms": round(1000 * self.sample_timer.mean, 3), "load_time_ms": round(1000 * self.load_timer.mean, 3), "grad_time_ms": round(1000 * self.grad_timer.mean, 3), "update_time_ms": round(1000 * self.update_weights_timer.mean, 3), - } + }) diff --git a/python/ray/rllib/optimizers/optimizer.py b/python/ray/rllib/optimizers/optimizer.py index 74b87453a..2be7a2a86 100644 --- a/python/ray/rllib/optimizers/optimizer.py +++ b/python/ray/rllib/optimizers/optimizer.py @@ -2,6 +2,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import ray + class Optimizer(object): """RLlib optimizers encapsulate distributed RL optimization strategies. @@ -16,20 +18,45 @@ class Optimizer(object): environment and compute model gradient updates. """ + @classmethod + def make( + cls, evaluator_cls, evaluator_args, num_workers, optimizer_config): + """Create evaluators and an optimizer instance using those evaluators. + + Args: + evaluator_cls (class): Python class of the evaluators to create. + evaluator_args (list): List of constructor args for the evaluators. + num_workers (int): Number of remote evaluators to create in + addition to a local evaluator. This can be zero or greater. + optimizer_config (dict): Keyword arguments to pass to the + optimizer class constructor. + """ + + local_evaluator = evaluator_cls(*evaluator_args) + remote_cls = ray.remote(num_cpus=1)(evaluator_cls) + remote_evaluators = [ + remote_cls.remote(*evaluator_args) + for _ in range(num_workers)] + return cls(optimizer_config, local_evaluator, remote_evaluators) + def __init__(self, config, local_evaluator, remote_evaluators): """Create an optimizer instance. Args: - config (dict): Optimizer-specific configuration data. + config (dict): Optimizer-specific arguments. local_evaluator (Evaluator): Local evaluator instance, required. - remote_evaluators (list): A list of handles to remote evaluators. - if empty, the optimizer should fall back to to using only the - local evaluator. + remote_evaluators (list): A list of Ray actor handles to remote + evaluators instances. If empty, the optimizer should fall back + to using only the local evaluator. """ self.config = config self.local_evaluator = local_evaluator self.remote_evaluators = remote_evaluators - self._init() + self._init(**config) + + # Counters that should be updated by sub-classes + self.num_steps_trained = 0 + self.num_steps_sampled = 0 def _init(self): pass @@ -42,4 +69,14 @@ class Optimizer(object): def stats(self): """Returns a dictionary of internal performance statistics.""" - return {} + return { + "num_steps_trained": self.num_steps_trained, + "num_steps_sampled": self.num_steps_sampled, + } + + def save(self): + return [self.num_steps_trained, self.num_steps_sampled] + + def restore(self, data): + self.num_steps_trained = data[0] + self.num_steps_sampled = data[1] diff --git a/python/ray/rllib/dqn/replay_buffer.py b/python/ray/rllib/optimizers/replay_buffer.py similarity index 75% rename from python/ray/rllib/dqn/replay_buffer.py rename to python/ray/rllib/optimizers/replay_buffer.py index 1d4c49b2f..513ee7c4e 100644 --- a/python/ray/rllib/dqn/replay_buffer.py +++ b/python/ray/rllib/optimizers/replay_buffer.py @@ -4,8 +4,11 @@ from __future__ import print_function import numpy as np import random +import sys -from ray.rllib.dqn.common.segment_tree import SumSegmentTree, MinSegmentTree +from ray.rllib.optimizers.segment_tree import SumSegmentTree, MinSegmentTree +from ray.rllib.utils.compression import unpack +from ray.rllib.utils.window_stat import WindowStat class ReplayBuffer(object): @@ -21,29 +24,43 @@ class ReplayBuffer(object): self._storage = [] self._maxsize = size self._next_idx = 0 + self._hit_count = np.zeros(size) + self._eviction_started = False + self._num_added = 0 + self._num_sampled = 0 + self._evicted_hit_stats = WindowStat("evicted_hit", 1000) + self._est_size_bytes = 0 def __len__(self): return len(self._storage) - def add(self, obs_t, action, reward, obs_tp1, done): + def add(self, obs_t, action, reward, obs_tp1, done, weight): data = (obs_t, action, reward, obs_tp1, done) + self._num_added += 1 if self._next_idx >= len(self._storage): self._storage.append(data) + self._est_size_bytes += sum([sys.getsizeof(d) for d in data]) else: self._storage[self._next_idx] = data + if self._next_idx + 1 >= self._maxsize: + self._eviction_started = True self._next_idx = (self._next_idx + 1) % self._maxsize + if self._eviction_started: + self._evicted_hit_stats.push(self._hit_count[self._next_idx]) + self._hit_count[self._next_idx] = 0 def _encode_sample(self, idxes): obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], [] for i in idxes: data = self._storage[i] obs_t, action, reward, obs_tp1, done = data - obses_t.append(np.array(obs_t, copy=False)) + obses_t.append(np.array(unpack(obs_t), copy=False)) actions.append(np.array(action, copy=False)) rewards.append(reward) - obses_tp1.append(np.array(obs_tp1, copy=False)) + obses_tp1.append(np.array(unpack(obs_tp1), copy=False)) dones.append(done) + self._hit_count[i] += 1 return (np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)) @@ -71,8 +88,19 @@ class ReplayBuffer(object): """ idxes = [random.randint(0, len(self._storage) - 1) for _ in range(batch_size)] + self._num_sampled += batch_size return self._encode_sample(idxes) + def stats(self): + data = { + "added_count": self._num_added, + "sampled_count": self._num_sampled, + "est_size_bytes": self._est_size_bytes, + "num_entries": len(self._storage), + } + data.update(self._evicted_hit_stats.stats()) + return data + class PrioritizedReplayBuffer(ReplayBuffer): def __init__(self, size, alpha): @@ -102,13 +130,17 @@ class PrioritizedReplayBuffer(ReplayBuffer): self._it_sum = SumSegmentTree(it_capacity) self._it_min = MinSegmentTree(it_capacity) self._max_priority = 1.0 + self._prio_change_stats = WindowStat("reprio", 1000) - def add(self, *args, **kwargs): + def add(self, obs_t, action, reward, obs_tp1, done, weight): """See ReplayBuffer.store_effect""" idx = self._next_idx - super(PrioritizedReplayBuffer, self).add(*args, **kwargs) - self._it_sum[idx] = self._max_priority ** self._alpha - self._it_min[idx] = self._max_priority ** self._alpha + super(PrioritizedReplayBuffer, self).add( + obs_t, action, reward, obs_tp1, done, weight) + if weight is None: + weight = self._max_priority + self._it_sum[idx] = weight ** self._alpha + self._it_min[idx] = weight ** self._alpha def _sample_proportional(self, batch_size): res = [] @@ -157,6 +189,7 @@ class PrioritizedReplayBuffer(ReplayBuffer): idexes in buffer of sampled experiences """ assert beta > 0 + self._num_sampled += batch_size idxes = self._sample_proportional(batch_size) @@ -191,7 +224,14 @@ class PrioritizedReplayBuffer(ReplayBuffer): for idx, priority in zip(idxes, priorities): assert priority > 0 assert 0 <= idx < len(self._storage) + delta = priority ** self._alpha - self._it_sum[idx] + self._prio_change_stats.push(delta) self._it_sum[idx] = priority ** self._alpha self._it_min[idx] = priority ** self._alpha self._max_priority = max(self._max_priority, priority) + + def stats(self): + parent = ReplayBuffer.stats(self) + parent.update(self._prio_change_stats.stats()) + return parent diff --git a/python/ray/rllib/optimizers/sample_batch.py b/python/ray/rllib/optimizers/sample_batch.py index 510234171..e3b1c0b98 100644 --- a/python/ray/rllib/optimizers/sample_batch.py +++ b/python/ray/rllib/optimizers/sample_batch.py @@ -37,7 +37,7 @@ class SampleBatch(object): def concat_samples(samples): out = {} for k in samples[0].data.keys(): - out[k] = np.concatenate([arrayify(s.data[k]) for s in samples]) + out[k] = np.concatenate([s.data[k] for s in samples]) return SampleBatch(out) def concat(self, other): diff --git a/python/ray/rllib/dqn/common/segment_tree.py b/python/ray/rllib/optimizers/segment_tree.py similarity index 100% rename from python/ray/rllib/dqn/common/segment_tree.py rename to python/ray/rllib/optimizers/segment_tree.py diff --git a/python/ray/rllib/dqn/common/tests/test_segment_tree.py b/python/ray/rllib/optimizers/tests/test_segment_tree.py similarity index 97% rename from python/ray/rllib/dqn/common/tests/test_segment_tree.py rename to python/ray/rllib/optimizers/tests/test_segment_tree.py index 13a920d92..f2a72c648 100644 --- a/python/ray/rllib/dqn/common/tests/test_segment_tree.py +++ b/python/ray/rllib/optimizers/tests/test_segment_tree.py @@ -4,7 +4,7 @@ from __future__ import print_function import numpy as np -from ray.rllib.dqn.common.segment_tree import SumSegmentTree, MinSegmentTree +from ray.rllib.optimizers.segment_tree import SumSegmentTree, MinSegmentTree def test_tree_set(): diff --git a/python/ray/rllib/pg/pg.py b/python/ray/rllib/pg/pg.py index aa82681cc..1f4b53b1f 100644 --- a/python/ray/rllib/pg/pg.py +++ b/python/ray/rllib/pg/pg.py @@ -6,7 +6,7 @@ import numpy as np import ray from ray.rllib.optimizers import LocalSyncOptimizer -from ray.rllib.pg.pg_evaluator import PGEvaluator, RemotePGEvaluator +from ray.rllib.pg.pg_evaluator import PGEvaluator from ray.rllib.agent import Agent from ray.tune.result import TrainingResult @@ -22,10 +22,7 @@ DEFAULT_CONFIG = { # Learning rate "lr": 0.0004, # Arguments to pass to the rllib optimizer - "optimizer": { - # Number of gradients applied for each `train` step - "grads_per_step": 1, - }, + "optimizer": {}, # Model parameters "model": {"fcnet_hiddens": [128, 128]}, # Arguments to pass to the env creator @@ -45,15 +42,11 @@ class PGAgent(Agent): _default_config = DEFAULT_CONFIG def _init(self): - self.local_evaluator = PGEvaluator( - self.registry, self.env_creator, self.config) - self.remote_evaluators = [ - RemotePGEvaluator.remote( - self.registry, self.env_creator, self.config) - for _ in range(self.config["num_workers"])] - self.optimizer = LocalSyncOptimizer( - self.config["optimizer"], self.local_evaluator, - self.remote_evaluators) + self.optimizer = LocalSyncOptimizer.make( + evaluator_cls=PGEvaluator, + evaluator_args=[self.registry, self.env_creator, self.config], + num_workers=self.config["num_workers"], + optimizer_config=self.config["optimizer"]) def _train(self): self.optimizer.step() @@ -61,7 +54,7 @@ class PGAgent(Agent): episode_rewards = [] episode_lengths = [] metric_lists = [a.get_completed_rollout_metrics.remote() - for a in self.remote_evaluators] + for a in self.optimizer.remote_evaluators] for metrics in metric_lists: for episode in ray.get(metrics): episode_lengths.append(episode.episode_length) @@ -79,5 +72,5 @@ class PGAgent(Agent): return result def compute_action(self, obs): - action, info = self.local_evaluator.policy.compute(obs) + action, info = self.optimizer.local_evaluator.policy.compute(obs) return action diff --git a/python/ray/rllib/pg/pg_evaluator.py b/python/ray/rllib/pg/pg_evaluator.py index 6ab6b0b67..4fa58b4bd 100644 --- a/python/ray/rllib/pg/pg_evaluator.py +++ b/python/ray/rllib/pg/pg_evaluator.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import ray from ray.rllib.models.catalog import ModelCatalog from ray.rllib.optimizers import Evaluator from ray.rllib.pg.policy import PGPolicy @@ -55,6 +54,3 @@ class PGEvaluator(Evaluator): def set_weights(self, weights): """Sets model weights.""" return self.policy.set_weights(weights) - - -RemotePGEvaluator = ray.remote(PGEvaluator) diff --git a/python/ray/rllib/test/test_supported_spaces.py b/python/ray/rllib/test/test_supported_spaces.py index b9f9ff933..109b585f8 100644 --- a/python/ray/rllib/test/test_supported_spaces.py +++ b/python/ray/rllib/test/test_supported_spaces.py @@ -122,7 +122,7 @@ class ModelSupportedSpaces(unittest.TestCase): stats) check_support( "PG", - {"num_workers": 1, "optimizer": {"grads_per_step": 1}}, + {"num_workers": 1, "optimizer": {}}, stats) num_unexpected_errors = 0 num_unexpected_success = 0 diff --git a/python/ray/rllib/tuned_examples/pong-apex.yaml b/python/ray/rllib/tuned_examples/pong-apex.yaml new file mode 100644 index 000000000..9a324a0eb --- /dev/null +++ b/python/ray/rllib/tuned_examples/pong-apex.yaml @@ -0,0 +1,14 @@ +pong-apex: + env: Pong-v0 + run: APEX + resources: + cpu: + eval: spec.config.num_workers + gpu: 1 + config: + force_evaluators_remote: True # requires cluster + num_workers: 32 + lr: .0001 + gamma: 0.99 + model: + grayscale: True diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py new file mode 100644 index 000000000..4762e58aa --- /dev/null +++ b/python/ray/rllib/utils/actors.py @@ -0,0 +1,57 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import ray + + +class TaskPool(object): + def __init__(self): + self._tasks = {} + + def add(self, worker, obj_id): + self._tasks[obj_id] = worker + + def completed(self): + pending = list(self._tasks) + if pending: + ready, _ = ray.wait(pending, num_returns=len(pending), timeout=10) + for obj_id in ready: + yield (self._tasks.pop(obj_id), obj_id) + + @property + def count(self): + return len(self._tasks) + + +def split_colocated(actors): + localhost = os.uname()[1] + hosts = ray.get([a.get_host.remote() for a in actors]) + local = [] + non_local = [] + for host, a in zip(hosts, actors): + if host == localhost: + local.append(a) + else: + non_local.append(a) + return local, non_local + + +def try_create_colocated(cls, args, count): + actors = [cls.remote(*args) for _ in range(count)] + local, _ = split_colocated(actors) + print("Got {} colocated actors of {}".format(len(local), count)) + return local + + +def create_colocated(cls, args, count): + ok = [] + i = 1 + while len(ok) < count and i < 10: + attempt = try_create_colocated(cls, args, count * i) + ok.extend(attempt) + i += 1 + if len(ok) < count: + raise Exception("Unable to create enough colocated actors, abort.") + return ok[:count] diff --git a/python/ray/rllib/utils/compression.py b/python/ray/rllib/utils/compression.py new file mode 100644 index 000000000..dcf07eb6a --- /dev/null +++ b/python/ray/rllib/utils/compression.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import base64 +import pyarrow + +try: + import snappy + SNAPPY_ENABLED = True +except ImportError: + print("WARNING: python-snappy not available, disabling sample compression") + SNAPPY_ENABLED = False + + +def pack(data): + if SNAPPY_ENABLED: + data = snappy.compress( + pyarrow.serialize(data).to_buffer().to_pybytes()) + # TODO(ekl) we shouldn't need to base64 encode this data, but this + # seems to not survive a transfer through the object store if we don't. + return base64.b64encode(data) + else: + return data + + +def unpack(data): + if SNAPPY_ENABLED: + data = base64.b64decode(data) + return pyarrow.deserialize(snappy.decompress(data)) + else: + return data diff --git a/python/ray/rllib/utils/timer.py b/python/ray/rllib/utils/timer.py index 98770b671..5dc4e328c 100644 --- a/python/ray/rllib/utils/timer.py +++ b/python/ray/rllib/utils/timer.py @@ -4,23 +4,27 @@ from __future__ import print_function import time -from ray.rllib.utils.filter import RunningStat +import numpy as np -class TimerStat(RunningStat): +class TimerStat(object): """A running stat for conveniently logging the duration of a code block. Example: - wait_timer = TimeStat() + wait_timer = TimerStat() with wait_timer: ray.wait(...) Note that this class is *not* thread-safe. """ - def __init__(self): - RunningStat.__init__(self, ()) + def __init__(self, window_size=10): + self._window_size = window_size + self._samples = [] + self._units_processed = [] self._start_time = None + self._total_time = 0.0 + self.count = 0 def __enter__(self): assert self._start_time is None, "concurrent updates not supported" @@ -28,5 +32,33 @@ class TimerStat(RunningStat): def __exit__(self, type, value, tb): assert self._start_time is not None - self.push(time.time() - self._start_time) + time_delta = time.time() - self._start_time + self.push(time_delta) self._start_time = None + + def push(self, time_delta): + self._samples.append(time_delta) + if len(self._samples) > self._window_size: + self._samples.pop(0) + self.count += 1 + self._total_time += time_delta + + def push_units_processed(self, n): + self._units_processed.append(n) + if len(self._units_processed) > self._window_size: + self._units_processed.pop(0) + + @property + def mean(self): + return np.mean(self._samples) + + @property + def mean_units_processed(self): + return float(np.mean(self._units_processed)) + + @property + def mean_throughput(self): + time_total = sum(self._samples) + if not time_total: + return 0.0 + return sum(self._units_processed) / time_total diff --git a/python/ray/rllib/utils/window_stat.py b/python/ray/rllib/utils/window_stat.py new file mode 100644 index 000000000..ed1d99c46 --- /dev/null +++ b/python/ray/rllib/utils/window_stat.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + + +class WindowStat(object): + def __init__(self, name, n): + self.name = name + self.items = [None] * n + self.idx = 0 + self.count = 0 + + def push(self, obj): + self.items[self.idx] = obj + self.idx += 1 + self.count += 1 + self.idx %= len(self.items) + + def stats(self): + if not self.count: + quantiles = [] + else: + quantiles = np.percentile( + self.items[:self.count], [0, 10, 50, 90, 100]).tolist() + return { + self.name + "_count": int(self.count), + self.name + "_mean": float(np.mean(self.items[:self.count])), + self.name + "_std": float(np.std(self.items[:self.count])), + self.name + "_quantiles": quantiles, + } diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index 5aad51cea..77c1aa5bf 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -100,7 +100,7 @@ def pretty_print(result): if v is not None: out[k] = v if yaml: - return yaml.dump(out, default_flow_style=False) + return yaml.safe_dump(out, default_flow_style=False) else: return json.dumps(out) + "\n" diff --git a/python/setup.py b/python/setup.py index 582f91937..990975da0 100644 --- a/python/setup.py +++ b/python/setup.py @@ -41,7 +41,9 @@ else: optional_ray_files += ray_ui_files extras = { - "rllib": ["tensorflow", "pyyaml", "gym[atari]", "opencv-python", "scipy"] + "rllib": [ + "tensorflow", "pyyaml", "gym[atari]", "opencv-python", + "python-snappy", "scipy"] } diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index ec0fe2103..ea3287082 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -110,14 +110,14 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 2}' \ - --config '{"async_updates": true, "num_workers": 2}' + --config '{"num_workers": 2}' docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ - --run DQN \ + --run APEX \ --stop '{"training_iteration": 2}' \ - --config '{"multi_gpu": true, "optimizer": {"sgd_batch_size": 4}}' + --config '{"num_workers": 2, "timesteps_per_iteration": 1000}' docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \