diff --git a/doc/source/rllib-algorithms.rst b/doc/source/rllib-algorithms.rst index b0cbeb8e5..06b813f05 100644 --- a/doc/source/rllib-algorithms.rst +++ b/doc/source/rllib-algorithms.rst @@ -13,22 +13,24 @@ Tuned examples: `PongNoFrameskip-v4 `__ `[implementation] `__ -RLlib's A3C uses the AsyncGradientsOptimizer to apply gradients computed remotely on policy evaluation actors. It scales to up to 16-32 worker processes, depending on the environment. Both a TensorFlow (LSTM), and PyTorch version are available. Note that if you have a GPU, `IMPALA <#importance-weighted-actor-learner-architecture>`__ probably will perform better than A3C. +RLlib's A3C uses the AsyncGradientsOptimizer to apply gradients computed remotely on policy evaluation actors. It scales up to 16-32 worker processes, depending on the environment. Both a TensorFlow (LSTM) and a PyTorch version are available. -Tuned examples: `PongDeterministic-v4 `__, `PyTorch version `__ +Note that if you have a GPU, `IMPALA <#importance-weighted-actor-learner-architecture>`__ will probably perform better than A3C. You can also use the synchronous variant of A3C, `A2C `__. -Deep Deterministic Policy Gradients ------------------------------------ +Tuned examples: `PongDeterministic-v4 `__, `A2C variant `__, `PyTorch version `__ + +Deep Deterministic Policy Gradients (DDPG) +------------------------------------------ `[paper] `__ `[implementation] `__ DDPG is implemented similarly to DQN (below). The algorithm can be scaled by increasing the number of workers, switching to AsyncGradientsOptimizer, or using Ape-X. Tuned examples: `Pendulum-v0 `__, `MountainCarContinuous-v0 `__, `HalfCheetah-v2 `__ -Deep Q Networks ---------------- +Deep Q Networks (DQN) +--------------------- `[paper] `__ `[implementation] `__ RLlib DQN is implemented using the SyncReplayOptimizer. The algorithm can be scaled by increasing the number of workers, using the AsyncGradientsOptimizer for async DQN, or using Ape-X. 
Memory usage is reduced by compressing samples in the replay buffer with LZ4. @@ -47,8 +49,8 @@ Tuned examples: `Humanoid-v1 `__ `[implementation] `__ @@ -67,8 +69,8 @@ Policy Gradients Tuned examples: `CartPole-v0 `__ -Proximal Policy Optimization ----------------------------- +Proximal Policy Optimization (PPO) +---------------------------------- `[paper] `__ `[implementation] `__ PPO's clipped objective supports multiple SGD passes over the same batch of experiences. RLlib's multi-GPU optimizer pins that data in GPU memory to avoid unnecessary transfers from host memory, substantially improving performance over a naive implementation. RLlib's PPO scales out using multiple workers for experience collection, and also with multiple GPUs for SGD. diff --git a/doc/source/rllib-env.rst b/doc/source/rllib-env.rst index 3c2e13fda..953c48005 100644 --- a/doc/source/rllib-env.rst +++ b/doc/source/rllib-env.rst @@ -136,12 +136,12 @@ Here is a simple `example training script 1``. -Serving -------- +Agent-Driven +------------ -In many situations, it does not make sense for an environment to be "stepped" by RLlib. For example, if a policy is to be used in a web serving system, then it is more natural to instead *query* a service that serves policy decisions, and for that service to learn from experience over time. +In many situations, it does not make sense for an environment to be "stepped" by RLlib. For example, if a policy is to be used in a web serving system, then it is more natural for an agent to query a service that serves policy decisions, and for that service to learn from experience over time. -RLlib provides the `ServingEnv `__ class for this purpose. Unlike other envs, ServingEnv runs as its own thread of control. At any point, that thread can query the current policy for decisions via ``self.get_action()`` and reports rewards via ``self.log_returns()``. This can be done for multiple concurrent episodes as well. 
+RLlib provides the `ServingEnv `__ class for this purpose. Unlike other envs, ServingEnv has its own thread of control. At any point, agents on that thread can query the current policy for decisions via ``self.get_action()`` and report rewards via ``self.log_returns()``. This can be done for multiple concurrent episodes as well. For example, ServingEnv can be used to implement a simple REST policy `server `__ that learns over time using RLlib. In this example RLlib runs with ``num_workers=0`` to avoid port allocation issues, but in principle this could be scaled by increasing ``num_workers``. diff --git a/doc/source/rllib-models.rst b/doc/source/rllib-models.rst index 70b5ed756..a234ba002 100644 --- a/doc/source/rllib-models.rst +++ b/doc/source/rllib-models.rst @@ -15,7 +15,7 @@ RLlib picks default models based on a simple heuristic: a `vision network `__. More generally, RLlib supports the use of recurrent models for its algorithms (A3C, PG out of the box), and RNN support is built into its policy evaluation utilities. -For preprocessors, RLlib tries to pick one of its built-in preprocessor based on the environment's observation space. Discrete observations are one-hot encoded, Atari observations downscaled, and Tuple observations flattened (there isn't native tuple support yet, but you can reshape the flattened observation in a custom model). Note that for Atari, DQN defaults to using the `DeepMind preprocessors `__, which are also used by the OpenAI baselines library. +For preprocessors, RLlib tries to pick one of its built-in preprocessors based on the environment's observation space. Discrete observations are one-hot encoded, Atari observations downscaled, and Tuple observations flattened (there isn't native tuple support yet, but you can reshape the flattened observation in a custom model). Note that for Atari, RLlib defaults to using the `DeepMind preprocessors `__, which are also used by the OpenAI baselines library. 
Custom Models diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index e40f734b9..a9596ba4b 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -36,7 +36,7 @@ The ``train.py`` script has a number of options you can show by running The most important options are for choosing the environment with ``--env`` (any OpenAI gym environment including ones registered by the user can be used) and for choosing the algorithm with ``--run`` -(available options are ``PPO``, ``PG``, ``A3C``, ``IMPALA``, ``ES``, ``DDPG``, ``DQN``, ``APEX``, and ``APEX_DDPG``). +(available options are ``PPO``, ``PG``, ``A2C``, ``A3C``, ``IMPALA``, ``ES``, ``DDPG``, ``DQN``, ``APEX``, and ``APEX_DDPG``). Specifying Parameters ~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/rllib.rst b/doc/source/rllib.rst index 29f8acf5c..ba0b2dd38 100644 --- a/doc/source/rllib.rst +++ b/doc/source/rllib.rst @@ -36,20 +36,20 @@ Environments * `OpenAI Gym `__ * `Vectorized `__ * `Multi-Agent `__ -* `Serving (Agent-oriented) `__ +* `Serving (Agent driven) `__ * `Offline Data Ingest `__ * `Batch Asynchronous `__ Algorithms ---------- -* `Ape-X Distributed Prioritized Experience Replay `__ -* `Asynchronous Advantage Actor-Critic `__ -* `Deep Deterministic Policy Gradients `__ -* `Deep Q Networks `__ +* `Ape-X Distributed Prioritized Experience Replay `__ +* `Advantage Actor-Critic (A2C, A3C) `__ +* `Deep Deterministic Policy Gradients (DDPG) `__ +* `Deep Q Networks (DQN) `__ * `Evolution Strategies `__ -* `Importance Weighted Actor-Learner Architecture `__ +* `Importance Weighted Actor-Learner Architecture (IMPALA) `__ * `Policy Gradients `__ -* `Proximal Policy Optimization `__ +* `Proximal Policy Optimization (PPO) `__ Models and Preprocessors ------------------------ diff --git a/python/ray/rllib/__init__.py b/python/ray/rllib/__init__.py index 4d6575fcf..8a172df32 100644 --- a/python/ray/rllib/__init__.py +++ b/python/ray/rllib/__init__.py @@ -19,7 +19,8 @@ from 
ray.rllib.evaluation.sample_batch import SampleBatch def _register_all(): for key in [ "PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG", "APEX_DDPG", - "IMPALA", "__fake", "__sigmoid_fake_data", "__parameter_tuning" + "IMPALA", "A2C", "__fake", "__sigmoid_fake_data", + "__parameter_tuning" ]: from ray.rllib.agents.agent import get_agent_class register_trainable(key, get_agent_class(key)) diff --git a/python/ray/rllib/agents/a3c/__init__.py b/python/ray/rllib/agents/a3c/__init__.py index e4ab31764..7d4303263 100644 --- a/python/ray/rllib/agents/a3c/__init__.py +++ b/python/ray/rllib/agents/a3c/__init__.py @@ -1,3 +1,4 @@ from ray.rllib.agents.a3c.a3c import A3CAgent, DEFAULT_CONFIG +from ray.rllib.agents.a3c.a2c import A2CAgent -__all__ = ["A3CAgent", "DEFAULT_CONFIG"] +__all__ = ["A2CAgent", "A3CAgent", "DEFAULT_CONFIG"] diff --git a/python/ray/rllib/agents/a3c/a2c.py b/python/ray/rllib/agents/a3c/a2c.py new file mode 100644 index 000000000..860cbf574 --- /dev/null +++ b/python/ray/rllib/agents/a3c/a2c.py @@ -0,0 +1,41 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.agents.a3c.a3c import A3CAgent, DEFAULT_CONFIG as A3C_CONFIG +from ray.rllib.optimizers import SyncSamplesOptimizer +from ray.rllib.utils import merge_dicts +from ray.tune.trial import Resources + +A2C_DEFAULT_CONFIG = merge_dicts( + A3C_CONFIG, + { + "gpu": False, + "sample_batch_size": 20, + "min_iter_time_s": 10, + "optimizer": { + "timesteps_per_batch": 200, + }, + }, +) + + +class A2CAgent(A3CAgent): + """Synchronous variant of the A3CAgent.""" + + _agent_name = "A2C" + _default_config = A2C_DEFAULT_CONFIG + + def _make_optimizer(self): + return SyncSamplesOptimizer(self.local_evaluator, + self.remote_evaluators, + self.config["optimizer"]) + + @classmethod + def default_resource_request(cls, config): + cf = merge_dicts(cls._default_config, config) + return Resources( + cpu=1, + gpu=1 if cf["gpu"] else 0, + 
extra_cpu=cf["num_workers"], + extra_gpu=cf["use_gpu_for_workers"] and cf["num_workers"] or 0) diff --git a/python/ray/rllib/agents/a3c/a3c.py b/python/ray/rllib/agents/a3c/a3c.py index c8bff7430..6c4c1928a 100644 --- a/python/ray/rllib/agents/a3c/a3c.py +++ b/python/ray/rllib/agents/a3c/a3c.py @@ -4,6 +4,7 @@ from __future__ import print_function import pickle import os +import time import ray from ray.rllib.agents.agent import Agent, with_common_config @@ -30,6 +31,8 @@ DEFAULT_CONFIG = with_common_config({ "use_gpu_for_workers": False, # Whether to emit extra summary stats "summarize": False, + # Min time per iteration + "min_iter_time_s": 5, # Workers sample async. Note that this increases the effective # sample_batch_size by up to 5x due to async buffering of batches. "sample_async": True, @@ -44,7 +47,7 @@ DEFAULT_CONFIG = with_common_config({ # (Image statespace) - Each pixel "zero_mean": False, # (Image statespace) - Converts image to (dim, dim, C) - "dim": 80, + "dim": 84, # (Image statespace) - Converts image shape to (C, dim, dim) "channel_major": False, }, @@ -56,11 +59,6 @@ DEFAULT_CONFIG = with_common_config({ "allow_growth": True, }, }, - # Arguments to pass to the rllib optimizer - "optimizer": { - # Number of gradients applied for each `train` step - "grads_per_step": 100, - }, }) @@ -93,15 +91,20 @@ class A3CAgent(Agent): self.remote_evaluators = self.make_remote_evaluators( self.env_creator, policy_cls, self.config["num_workers"], {"num_gpus": 1 if self.config["use_gpu_for_workers"] else 0}) - self.optimizer = AsyncGradientsOptimizer(self.local_evaluator, - self.remote_evaluators, - self.config["optimizer"]) + self.optimizer = self._make_optimizer() + + def _make_optimizer(self): + return AsyncGradientsOptimizer(self.local_evaluator, + self.remote_evaluators, + self.config["optimizer"]) def _train(self): prev_steps = self.optimizer.num_steps_sampled - self.optimizer.step() - FilterManager.synchronize(self.local_evaluator.filters, - 
self.remote_evaluators) + start = time.time() + while time.time() - start < self.config["min_iter_time_s"]: + self.optimizer.step() + FilterManager.synchronize(self.local_evaluator.filters, + self.remote_evaluators) result = self.optimizer.collect_metrics() result.update(timesteps_this_iter=self.optimizer.num_steps_sampled - prev_steps) diff --git a/python/ray/rllib/agents/agent.py b/python/ray/rllib/agents/agent.py index 2e330f9e1..64726956f 100644 --- a/python/ray/rllib/agents/agent.py +++ b/python/ray/rllib/agents/agent.py @@ -31,8 +31,10 @@ COMMON_CONFIG = { "sample_async": False, # Which observation filter to apply to the observation "observation_filter": "NoFilter", + # Whether to clip rewards prior to experience postprocessing + "clip_rewards": True, # Whether to use rllib or deepmind preprocessors - "preprocessor_pref": "rllib", + "preprocessor_pref": "deepmind", # Arguments to pass to the env creator "env_config": {}, # Environment name can also be passed via config @@ -136,6 +138,7 @@ class Agent(Trainable): compress_observations=config["compress_observations"], num_envs=config["num_envs_per_worker"], observation_filter=config["observation_filter"], + clip_rewards=config["clip_rewards"], env_config=config["env_config"], model_config=config["model"], policy_config=config, @@ -370,6 +373,9 @@ def get_agent_class(alg): elif alg == "A3C": from ray.rllib.agents import a3c return a3c.A3CAgent + elif alg == "A2C": + from ray.rllib.agents import a3c + return a3c.A2CAgent elif alg == "BC": from ray.rllib.agents import bc return bc.BCAgent diff --git a/python/ray/rllib/agents/bc/bc.py b/python/ray/rllib/agents/bc/bc.py index 1930b8f61..c23b31a09 100644 --- a/python/ray/rllib/agents/bc/bc.py +++ b/python/ray/rllib/agents/bc/bc.py @@ -30,7 +30,7 @@ DEFAULT_CONFIG = { # (Image statespace) - Each pixel "zero_mean": False, # (Image statespace) - Converts image to (dim, dim, C) - "dim": 80, + "dim": 84, # (Image statespace) - Converts image shape to (C, dim, dim) 
"channel_major": False }, diff --git a/python/ray/rllib/agents/ddpg/ddpg.py b/python/ray/rllib/agents/ddpg/ddpg.py index e43bc3651..b475e297a 100644 --- a/python/ray/rllib/agents/ddpg/ddpg.py +++ b/python/ray/rllib/agents/ddpg/ddpg.py @@ -10,7 +10,7 @@ from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule OPTIMIZER_SHARED_CONFIGS = [ "buffer_size", "prioritized_replay", "prioritized_replay_alpha", "prioritized_replay_beta", "prioritized_replay_eps", "sample_batch_size", - "train_batch_size", "learning_starts", "clip_rewards" + "train_batch_size", "learning_starts" ] DEFAULT_CONFIG = with_common_config({ @@ -61,8 +61,6 @@ DEFAULT_CONFIG = with_common_config({ "prioritized_replay_beta": 0.4, # Epsilon to add to the TD errors when updating priorities. "prioritized_replay_eps": 1e-6, - # Whether to clip rewards to [-1, 1] prior to adding to the replay buffer. - "clip_rewards": True, # Whether to LZ4 compress observations "compress_observations": False, diff --git a/python/ray/rllib/agents/dqn/common/wrappers.py b/python/ray/rllib/agents/dqn/common/wrappers.py index 84a8321bb..eb6a6c0d5 100644 --- a/python/ray/rllib/agents/dqn/common/wrappers.py +++ b/python/ray/rllib/agents/dqn/common/wrappers.py @@ -6,7 +6,7 @@ from ray.rllib.models import ModelCatalog from ray.rllib.env.atari_wrappers import wrap_deepmind -def wrap_dqn(env, options, random_starts): +def wrap_dqn(env, options): """Apply a common set of wrappers for DQN.""" is_atari = hasattr(env.unwrapped, "ale") @@ -14,7 +14,6 @@ def wrap_dqn(env, options, random_starts): # Override atari default to use the deepmind wrappers. # TODO(ekl) this logic should be pushed to the catalog. 
if is_atari and "custom_preprocessor" not in options: - return wrap_deepmind( - env, random_starts=random_starts, dim=options.get("dim", 80)) + return wrap_deepmind(env, dim=options.get("dim", 84)) return ModelCatalog.get_preprocessor_as_wrapper(env, options) diff --git a/python/ray/rllib/agents/dqn/dqn.py b/python/ray/rllib/agents/dqn/dqn.py index d88509b57..71efe15e8 100644 --- a/python/ray/rllib/agents/dqn/dqn.py +++ b/python/ray/rllib/agents/dqn/dqn.py @@ -18,7 +18,7 @@ from ray.tune.trial import Resources OPTIMIZER_SHARED_CONFIGS = [ "buffer_size", "prioritized_replay", "prioritized_replay_alpha", "prioritized_replay_beta", "prioritized_replay_eps", "sample_batch_size", - "train_batch_size", "learning_starts", "clip_rewards" + "train_batch_size", "learning_starts" ] DEFAULT_CONFIG = with_common_config({ @@ -61,8 +61,6 @@ DEFAULT_CONFIG = with_common_config({ "prioritized_replay_beta": 0.4, # Epsilon to add to the TD errors when updating priorities. "prioritized_replay_eps": 1e-6, - # Whether to clip rewards to [-1, 1] prior to adding to the replay buffer. - "clip_rewards": True, # Whether to LZ4 compress observations "compress_observations": True, diff --git a/python/ray/rllib/agents/impala/impala.py b/python/ray/rllib/agents/impala/impala.py index 99322d1d3..6f9a23388 100644 --- a/python/ray/rllib/agents/impala/impala.py +++ b/python/ray/rllib/agents/impala/impala.py @@ -49,12 +49,10 @@ DEFAULT_CONFIG = with_common_config({ "entropy_coeff": -0.01, # Model and preprocessor options. 
- "clip_rewards": True, - "preprocessor_pref": "deepmind", "model": { "use_lstm": False, "max_seq_len": 20, - "dim": 80, + "dim": 84, }, }) diff --git a/python/ray/rllib/agents/impala/vtrace_policy_graph.py b/python/ray/rllib/agents/impala/vtrace_policy_graph.py index bd55d5329..9e3bb654e 100644 --- a/python/ray/rllib/agents/impala/vtrace_policy_graph.py +++ b/python/ray/rllib/agents/impala/vtrace_policy_graph.py @@ -137,11 +137,6 @@ class VTracePolicyGraph(TFPolicyGraph): rs, [1, 0] + list(range(2, 1 + int(tf.shape(tensor).shape[0])))) - if self.config["clip_rewards"]: - clipped_rewards = tf.clip_by_value(rewards, -1, 1) - else: - clipped_rewards = rewards - # Inputs are reshaped from [B * T] => [T - 1, B] for V-trace calc. self.loss = VTraceLoss( actions=to_batches(actions)[:-1], @@ -151,7 +146,7 @@ class VTracePolicyGraph(TFPolicyGraph): behaviour_logits=to_batches(behaviour_logits)[:-1], target_logits=to_batches(self.model.outputs)[:-1], discount=config["gamma"], - rewards=to_batches(clipped_rewards)[:-1], + rewards=to_batches(rewards)[:-1], values=to_batches(values)[:-1], bootstrap_value=to_batches(values)[-1], vf_loss_coeff=self.config["vf_loss_coeff"], diff --git a/python/ray/rllib/env/atari_wrappers.py b/python/ray/rllib/env/atari_wrappers.py index 76f5d4f01..c1f3b05c8 100644 --- a/python/ray/rllib/env/atari_wrappers.py +++ b/python/ray/rllib/env/atari_wrappers.py @@ -11,7 +11,7 @@ def is_atari(env): class NoopResetEnv(gym.Wrapper): - def __init__(self, env, noop_max=30, random_starts=False): + def __init__(self, env, noop_max=30): """Sample initial states by taking random number of no-ops on reset. No-op is assumed to be action 0. 
""" @@ -19,7 +19,6 @@ class NoopResetEnv(gym.Wrapper): self.noop_max = noop_max self.override_num_noops = None self.noop_action = 0 - self.random_starts = random_starts assert env.unwrapped.get_action_meanings()[0] == 'NOOP' def reset(self, **kwargs): @@ -32,11 +31,7 @@ class NoopResetEnv(gym.Wrapper): assert noops > 0 obs = None for _ in range(noops): - if self.random_starts: - action = np.random.randint(self.env.action_space.n) - else: - action = self.noop_action - obs, _, done, _ = self.env.step(action) + obs, _, done, _ = self.env.step(self.noop_action) if done: obs = self.env.reset(**kwargs) return obs @@ -93,9 +88,9 @@ class EpisodicLifeEnv(gym.Wrapper): # then update lives to handle bonus lives lives = self.env.unwrapped.ale.lives() if lives < self.lives and lives > 0: - # for Qbert sometimes we stay in lives == 0 condtion for a few - # frames so its important to keep lives > 0, so that we only reset - # once the environment advertises done. + # for Qbert sometimes we stay in lives == 0 condition for a few + # frames, so it's important to keep lives > 0, so that we only reset + # once the environment advertises done. 
done = True self.lives = lives return obs, reward, done, info @@ -150,13 +145,13 @@ class WarpFrame(gym.ObservationWrapper): def __init__(self, env, dim): """Warp frames to the specified size (dim x dim).""" gym.ObservationWrapper.__init__(self, env) - self.width = dim # in rllib we use 80 + self.width = dim self.height = dim self.observation_space = spaces.Box( low=0, high=255, shape=(self.height, self.width, 1), - dtype=np.float32) + dtype=np.uint8) def observation(self, frame): frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) @@ -176,7 +171,7 @@ class FrameStack(gym.Wrapper): low=0, high=255, shape=(shp[0], shp[1], shp[2] * k), - dtype=np.float32) + dtype=env.observation_space.dtype) def reset(self): ob = self.env.reset() @@ -194,22 +189,34 @@ class FrameStack(gym.Wrapper): return np.concatenate(self.frames, axis=2) -def wrap_deepmind(env, random_starts=True, dim=80): +class ScaledFloatFrame(gym.ObservationWrapper): + def __init__(self, env): + gym.ObservationWrapper.__init__(self, env) + self.observation_space = gym.spaces.Box( + low=0, high=1, shape=env.observation_space.shape, dtype=np.float32) + + def observation(self, observation): + # careful! This undoes the memory optimization, use + # with smaller replay buffers only. + return np.array(observation).astype(np.float32) / 255.0 + + +def wrap_deepmind(env, dim=84): """Configure environment for DeepMind-style Atari. Note that we assume reward clipping is done outside the wrapper. Args: - random_starts (bool): Start with random actions instead of noops. dim (int): Dimension to resize observations to (dim x dim). 
""" - env = NoopResetEnv(env, noop_max=30, random_starts=random_starts) + env = NoopResetEnv(env, noop_max=30) if 'NoFrameskip' in env.spec.id: env = MaxAndSkipEnv(env, skip=4) env = EpisodicLifeEnv(env) if 'FIRE' in env.unwrapped.get_action_meanings(): env = FireResetEnv(env) env = WarpFrame(env, dim) - # env = ClipRewardEnv(env) # reward clipping is handled by DQN replay + # env = ScaledFloatFrame(env) # TODO: use for dqn? + # env = ClipRewardEnv(env) # reward clipping is handled by policy eval env = FrameStack(env, 4) return env diff --git a/python/ray/rllib/evaluation/metrics.py b/python/ray/rllib/evaluation/metrics.py index fbfeb1699..cdc7f84b8 100644 --- a/python/ray/rllib/evaluation/metrics.py +++ b/python/ray/rllib/evaluation/metrics.py @@ -6,6 +6,7 @@ import numpy as np import collections import ray +from ray.rllib.evaluation.sample_batch import DEFAULT_POLICY_ID def collect_metrics(local_evaluator, remote_evaluators=[]): @@ -24,7 +25,8 @@ def collect_metrics(local_evaluator, remote_evaluators=[]): episode_lengths.append(episode.episode_length) episode_rewards.append(episode.episode_reward) for (_, policy_id), reward in episode.agent_rewards.items(): - policy_rewards[policy_id].append(reward) + if policy_id != DEFAULT_POLICY_ID: + policy_rewards[policy_id].append(reward) if episode_rewards: min_reward = min(episode_rewards) max_reward = max(episode_rewards) diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index 3f789a4a1..75f45cc4d 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -91,11 +91,12 @@ class PolicyEvaluator(EvaluatorInterface): batch_steps=100, batch_mode="truncate_episodes", episode_horizon=None, - preprocessor_pref="rllib", + preprocessor_pref="deepmind", sample_async=False, compress_observations=False, num_envs=1, observation_filter="NoFilter", + clip_rewards=False, env_config=None, model_config=None, 
policy_config=None, @@ -147,6 +148,8 @@ class PolicyEvaluator(EvaluatorInterface): and vectorize the computation of actions. This has no effect if if the env already implements VectorEnv. observation_filter (str): Name of observation filter to use. + clip_rewards (bool): Whether to clip rewards to [-1, 1] prior to + experience postprocessing. env_config (dict): Config to pass to the env creator. model_config (dict): Config to use when creating the policy model. policy_config (dict): Config to pass to the policy. In the @@ -181,7 +184,7 @@ class PolicyEvaluator(EvaluatorInterface): preprocessor_pref == "deepmind": def wrap(env): - return wrap_deepmind(env, dim=model_config.get("dim", 80)) + return wrap_deepmind(env, dim=model_config.get("dim", 84)) else: def wrap(env): @@ -245,6 +248,7 @@ class PolicyEvaluator(EvaluatorInterface): self.policy_map, policy_mapping_fn, self.filters, + clip_rewards, batch_steps, horizon=episode_horizon, pack=pack_episodes, @@ -256,6 +260,7 @@ class PolicyEvaluator(EvaluatorInterface): self.policy_map, policy_mapping_fn, self.filters, + clip_rewards, batch_steps, horizon=episode_horizon, pack=pack_episodes, diff --git a/python/ray/rllib/evaluation/sample_batch.py b/python/ray/rllib/evaluation/sample_batch.py index 0ecc56609..f8f88a4aa 100644 --- a/python/ray/rllib/evaluation/sample_batch.py +++ b/python/ray/rllib/evaluation/sample_batch.py @@ -61,14 +61,16 @@ class MultiAgentSampleBatchBuilder(object): corresponding policy batch for the agent's policy. """ - def __init__(self, policy_map): + def __init__(self, policy_map, clip_rewards): """Initialize a MultiAgentSampleBatchBuilder. Arguments: policy_map (dict): Maps policy ids to policy graph instances. + clip_rewards (bool): Whether to clip rewards before postprocessing. 
""" self.policy_map = policy_map + self.clip_rewards = clip_rewards self.policy_builders = { k: SampleBatchBuilder() for k in policy_map.keys() @@ -113,6 +115,9 @@ class MultiAgentSampleBatchBuilder(object): # Apply postprocessor post_batches = {} + if self.clip_rewards: + for _, (_, pre_batch) in pre_batches.items(): + pre_batch["rewards"] = np.sign(pre_batch["rewards"]) for agent_id, (_, pre_batch) in pre_batches.items(): other_batches = pre_batches.copy() del other_batches[agent_id] diff --git a/python/ray/rllib/evaluation/sampler.py b/python/ray/rllib/evaluation/sampler.py index f88e1cdae..0c7716697 100644 --- a/python/ray/rllib/evaluation/sampler.py +++ b/python/ray/rllib/evaluation/sampler.py @@ -34,6 +34,7 @@ class SyncSampler(object): policies, policy_mapping_fn, obs_filters, + clip_rewards, num_local_steps, horizon=None, pack=False, @@ -48,7 +49,7 @@ class SyncSampler(object): self.rollout_provider = _env_runner( self.async_vector_env, self.extra_batches.put, self.policies, self.policy_mapping_fn, self.num_local_steps, self.horizon, - self._obs_filters, pack, tf_sess) + self._obs_filters, clip_rewards, pack, tf_sess) self.metrics_queue = queue.Queue() def get_data(self): @@ -89,6 +90,7 @@ class AsyncSampler(threading.Thread): policies, policy_mapping_fn, obs_filters, + clip_rewards, num_local_steps, horizon=None, pack=False, @@ -106,6 +108,7 @@ class AsyncSampler(threading.Thread): self.policies = policies self.policy_mapping_fn = policy_mapping_fn self._obs_filters = obs_filters + self.clip_rewards = clip_rewards self.daemon = True self.pack = pack self.tf_sess = tf_sess @@ -121,7 +124,7 @@ class AsyncSampler(threading.Thread): rollout_provider = _env_runner( self.async_vector_env, self.extra_batches.put, self.policies, self.policy_mapping_fn, self.num_local_steps, self.horizon, - self._obs_filters, self.pack, self.tf_sess) + self._obs_filters, self.clip_rewards, self.pack, self.tf_sess) while True: # The timeout variable exists because apparently, if one 
worker # dies, the other workers won't die with it, unless the timeout is @@ -181,6 +184,7 @@ def _env_runner(async_vector_env, num_local_steps, horizon, obs_filters, + clip_rewards, pack, tf_sess=None): """This implements the common experience collection logic. @@ -197,6 +201,7 @@ def _env_runner(async_vector_env, horizon (int): Horizon of the episode. obs_filters (dict): Map of policy id to filter used to process observations for the policy. + clip_rewards (bool): Whether to clip rewards before postprocessing. pack (bool): Whether to pack multiple episodes into each batch. This guarantees batches will be exactly `num_local_steps` in size. tf_sess (Session|None): Optional tensorflow session to use for batching @@ -223,7 +228,7 @@ def _env_runner(async_vector_env, if batch_builder_pool: return batch_builder_pool.pop() else: - return MultiAgentSampleBatchBuilder(policies) + return MultiAgentSampleBatchBuilder(policies, clip_rewards) def new_episode(): return MultiAgentEpisode(policies, policy_mapping_fn, diff --git a/python/ray/rllib/models/preprocessors.py b/python/ray/rllib/models/preprocessors.py index 46404ae08..c400dd980 100644 --- a/python/ray/rllib/models/preprocessors.py +++ b/python/ray/rllib/models/preprocessors.py @@ -34,7 +34,7 @@ class AtariPixelPreprocessor(Preprocessor): def _init(self): self._grayscale = self._options.get("grayscale", False) self._zero_mean = self._options.get("zero_mean", True) - self._dim = self._options.get("dim", 80) + self._dim = self._options.get("dim", 84) self._channel_major = self._options.get("channel_major", False) if self._grayscale: self.shape = (self._dim, self._dim, 1) @@ -48,8 +48,8 @@ class AtariPixelPreprocessor(Preprocessor): def transform(self, observation): """Downsamples images from (210, 160, 3) by the configured factor.""" scaled = observation[25:-25, :, :] - if self._dim < 80: - scaled = cv2.resize(scaled, (80, 80)) + if self._dim < 84: + scaled = cv2.resize(scaled, (84, 84)) # OpenAI: Resize by half, then 
down to 42x42 (essentially mipmapping).
         # If we resize directly we lose pixels that, when mapped to 42x42,
         # aren't close enough to the pixel boundary.
diff --git a/python/ray/rllib/models/pytorch/visionnet.py b/python/ray/rllib/models/pytorch/visionnet.py
index 067e1659e..94ac8291d 100644
--- a/python/ray/rllib/models/pytorch/visionnet.py
+++ b/python/ray/rllib/models/pytorch/visionnet.py
@@ -21,7 +21,7 @@ class VisionNetwork(Model):
         filters = options.get("conv_filters", [
             [16, [8, 8], 4],
             [32, [4, 4], 2],
-            [512, [10, 10], 1],
+            [512, [11, 11], 1],
         ])
         layers = []
         in_channels, in_size = inputs[0], inputs[1:]
diff --git a/python/ray/rllib/models/visionnet.py b/python/ray/rllib/models/visionnet.py
index c3b633dbe..805d2e9e5 100644
--- a/python/ray/rllib/models/visionnet.py
+++ b/python/ray/rllib/models/visionnet.py
@@ -47,19 +47,19 @@ class VisionNetwork(Model):
 
 
 def get_filter_config(options):
-    filters_80x80 = [
+    filters_84x84 = [
         [16, [8, 8], 4],
         [32, [4, 4], 2],
-        [512, [10, 10], 1],
+        [256, [11, 11], 1],
     ]
     filters_42x42 = [
         [16, [4, 4], 2],
         [32, [4, 4], 2],
-        [512, [11, 11], 1],
+        [256, [11, 11], 1],
     ]
-    dim = options.get("dim", 80)
-    if dim == 80:
-        return filters_80x80
+    dim = options.get("dim", 84)
+    if dim == 84:
+        return filters_84x84
     elif dim == 42:
         return filters_42x42
     else:
diff --git a/python/ray/rllib/optimizers/async_replay_optimizer.py b/python/ray/rllib/optimizers/async_replay_optimizer.py
index 99dfc735f..3ed5f37d3 100644
--- a/python/ray/rllib/optimizers/async_replay_optimizer.py
+++ b/python/ray/rllib/optimizers/async_replay_optimizer.py
@@ -36,8 +36,7 @@ class ReplayActor(object):
 
     def __init__(self, num_shards, learning_starts, buffer_size,
                  train_batch_size, prioritized_replay_alpha,
-                 prioritized_replay_beta, prioritized_replay_eps,
-                 clip_rewards):
+                 prioritized_replay_beta, prioritized_replay_eps):
         self.replay_starts = learning_starts // num_shards
         self.buffer_size = buffer_size // num_shards
         self.train_batch_size = train_batch_size
@@ -45,9 +44,7 @@ class ReplayActor(object):
         self.prioritized_replay_eps = prioritized_replay_eps
 
         self.replay_buffer = PrioritizedReplayBuffer(
-            self.buffer_size,
-            alpha=prioritized_replay_alpha,
-            clip_rewards=clip_rewards)
+            self.buffer_size, alpha=prioritized_replay_alpha)
 
         # Metrics
         self.add_batch_timer = TimerStat()
@@ -158,7 +155,6 @@ class AsyncReplayOptimizer(PolicyOptimizer):
               sample_batch_size=50,
               num_replay_buffer_shards=1,
               max_weight_sync_delay=400,
-              clip_rewards=True,
               debug=False):
 
         self.debug = debug
@@ -171,9 +167,13 @@ class AsyncReplayOptimizer(PolicyOptimizer):
         self.learner.start()
 
         self.replay_actors = create_colocated(ReplayActor, [
-            num_replay_buffer_shards, learning_starts, buffer_size,
-            train_batch_size, prioritized_replay_alpha,
-            prioritized_replay_beta, prioritized_replay_eps, clip_rewards
+            num_replay_buffer_shards,
+            learning_starts,
+            buffer_size,
+            train_batch_size,
+            prioritized_replay_alpha,
+            prioritized_replay_beta,
+            prioritized_replay_eps,
         ], num_replay_buffer_shards)
 
         # Stats
diff --git a/python/ray/rllib/optimizers/replay_buffer.py b/python/ray/rllib/optimizers/replay_buffer.py
index fecea9f9d..77d954345 100644
--- a/python/ray/rllib/optimizers/replay_buffer.py
+++ b/python/ray/rllib/optimizers/replay_buffer.py
@@ -12,7 +12,7 @@ from ray.rllib.utils.window_stat import WindowStat
 
 
 class ReplayBuffer(object):
-    def __init__(self, size, clip_rewards):
+    def __init__(self, size):
         """Create Prioritized Replay buffer.
 
         Parameters
@@ -30,15 +30,11 @@ class ReplayBuffer(object):
         self._num_sampled = 0
         self._evicted_hit_stats = WindowStat("evicted_hit", 1000)
         self._est_size_bytes = 0
-        self._clip_rewards = clip_rewards
 
     def __len__(self):
         return len(self._storage)
 
     def add(self, obs_t, action, reward, obs_tp1, done, weight):
-        if self._clip_rewards:
-            reward = np.sign(reward)
-
         data = (obs_t, action, reward, obs_tp1, done)
         self._num_added += 1
 
@@ -109,7 +105,7 @@ class ReplayBuffer(object):
 
 
 class PrioritizedReplayBuffer(ReplayBuffer):
-    def __init__(self, size, alpha, clip_rewards):
+    def __init__(self, size, alpha):
         """Create Prioritized Replay buffer.
 
         Parameters
@@ -125,7 +121,7 @@ class PrioritizedReplayBuffer(ReplayBuffer):
         --------
         ReplayBuffer.__init__
         """
-        super(PrioritizedReplayBuffer, self).__init__(size, clip_rewards)
+        super(PrioritizedReplayBuffer, self).__init__(size)
         assert alpha > 0
         self._alpha = alpha
 
@@ -140,8 +136,6 @@ class PrioritizedReplayBuffer(ReplayBuffer):
 
     def add(self, obs_t, action, reward, obs_tp1, done, weight):
         """See ReplayBuffer.store_effect"""
-        if self._clip_rewards:
-            reward = np.sign(reward)
 
         idx = self._next_idx
         super(PrioritizedReplayBuffer, self).add(obs_t, action, reward,
@@ -155,8 +149,7 @@ class PrioritizedReplayBuffer(ReplayBuffer):
         res = []
         for _ in range(batch_size):
             # TODO(szymon): should we ensure no repeats?
-            mass = random.random() * self._it_sum.sum(0,
-                                                      len(self._storage) - 1)
+            mass = random.random() * self._it_sum.sum(0, len(self._storage))
             idx = self._it_sum.find_prefixsum_idx(mass)
             res.append(idx)
         return res
diff --git a/python/ray/rllib/optimizers/sync_replay_optimizer.py b/python/ray/rllib/optimizers/sync_replay_optimizer.py
index 834994cd7..900f009dd 100644
--- a/python/ray/rllib/optimizers/sync_replay_optimizer.py
+++ b/python/ray/rllib/optimizers/sync_replay_optimizer.py
@@ -31,8 +31,7 @@ class SyncReplayOptimizer(PolicyOptimizer):
               prioritized_replay_beta=0.4,
               prioritized_replay_eps=1e-6,
               train_batch_size=32,
-              sample_batch_size=4,
-              clip_rewards=True):
+              sample_batch_size=4):
 
         self.replay_starts = learning_starts
         self.prioritized_replay_beta = prioritized_replay_beta
@@ -51,13 +50,11 @@ class SyncReplayOptimizer(PolicyOptimizer):
 
             def new_buffer():
                 return PrioritizedReplayBuffer(
-                    buffer_size,
-                    alpha=prioritized_replay_alpha,
-                    clip_rewards=clip_rewards)
+                    buffer_size, alpha=prioritized_replay_alpha)
         else:
 
             def new_buffer():
-                return ReplayBuffer(buffer_size, clip_rewards)
+                return ReplayBuffer(buffer_size)
 
         self.replay_buffers = collections.defaultdict(new_buffer)
 
diff --git a/python/ray/rllib/optimizers/sync_samples_optimizer.py b/python/ray/rllib/optimizers/sync_samples_optimizer.py
index 7af87fcd3..8da88e7f3 100644
--- a/python/ray/rllib/optimizers/sync_samples_optimizer.py
+++ b/python/ray/rllib/optimizers/sync_samples_optimizer.py
@@ -43,6 +43,7 @@ class SyncSamplesOptimizer(PolicyOptimizer):
                 else:
                     samples.append(self.local_evaluator.sample())
             samples = SampleBatch.concat_samples(samples)
+            self.sample_timer.push_units_processed(samples.count)
 
         with self.grad_timer:
             for i in range(self.num_sgd_iter):
@@ -64,5 +65,7 @@ class SyncSamplesOptimizer(PolicyOptimizer):
                                         3),
                 "opt_peak_throughput": round(self.grad_timer.mean_throughput,
                                              3),
+                "sample_peak_throughput": round(
+                    self.sample_timer.mean_throughput, 3),
                 "opt_samples": round(self.grad_timer.mean_units_processed, 3),
             })
diff --git a/python/ray/rllib/test/test_catalog.py b/python/ray/rllib/test/test_catalog.py
index 454c9255c..e3dc1e782 100644
--- a/python/ray/rllib/test/test_catalog.py
+++ b/python/ray/rllib/test/test_catalog.py
@@ -74,7 +74,7 @@ class ModelCatalogTest(unittest.TestCase):
 
         with tf.variable_scope("test2"):
             p2 = ModelCatalog.get_model(
-                np.zeros((10, 80, 80, 3), dtype=np.float32), 5)
+                np.zeros((10, 84, 84, 3), dtype=np.float32), 5)
             self.assertEqual(type(p2), VisionNetwork)
 
     def testCustomModel(self):
diff --git a/python/ray/rllib/test/test_policy_evaluator.py b/python/ray/rllib/test/test_policy_evaluator.py
index 1aa559df6..b8e1a9a3a 100644
--- a/python/ray/rllib/test/test_policy_evaluator.py
+++ b/python/ray/rllib/test/test_policy_evaluator.py
@@ -120,6 +120,27 @@ class TestPolicyEvaluator(unittest.TestCase):
         self.assertEqual(results, [5, 5, 5])
         self.assertEqual(results2, [(0, 5), (1, 5), (2, 5)])
 
+    def testRewardClipping(self):
+        # clipping on
+        ev = PolicyEvaluator(
+            env_creator=lambda _: MockEnv2(episode_length=10),
+            policy_graph=MockPolicyGraph,
+            clip_rewards=True,
+            batch_mode="complete_episodes")
+        self.assertEqual(max(ev.sample()["rewards"]), 1)
+        result = collect_metrics(ev, [])
+        self.assertEqual(result["episode_reward_mean"], 1000)
+
+        # clipping off
+        ev2 = PolicyEvaluator(
+            env_creator=lambda _: MockEnv2(episode_length=10),
+            policy_graph=MockPolicyGraph,
+            clip_rewards=False,
+            batch_mode="complete_episodes")
+        self.assertEqual(max(ev2.sample()["rewards"]), 100)
+        result2 = collect_metrics(ev2, [])
+        self.assertEqual(result2["episode_reward_mean"], 1000)
+
     def testMetrics(self):
         ev = PolicyEvaluator(
             env_creator=lambda _: MockEnv(episode_length=10),
diff --git a/python/ray/rllib/test/test_supported_spaces.py b/python/ray/rllib/test/test_supported_spaces.py
index 39ce73807..1205d7615 100644
--- a/python/ray/rllib/test/test_supported_spaces.py
+++ b/python/ray/rllib/test/test_supported_spaces.py
@@ -32,7 +32,7 @@ ACTION_SPACES_TO_TEST = {
 OBSERVATION_SPACES_TO_TEST = {
     "discrete": Discrete(5),
     "vector": Box(0.0, 1.0, (5, ), dtype=np.float32),
-    "image": Box(0.0, 1.0, (80, 80, 1), dtype=np.float32),
+    "image": Box(0.0, 1.0, (84, 84, 1), dtype=np.float32),
     "atari": Box(0.0, 1.0, (210, 160, 3), dtype=np.float32),
     "atari_ram": Box(0.0, 1.0, (128, ), dtype=np.float32),
     "simple_tuple": Tuple([
diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py
index 3aeeaed52..2ce7ca2cf 100644
--- a/python/ray/tune/logger.py
+++ b/python/ray/tune/logger.py
@@ -9,7 +9,8 @@ import os
 import yaml
 
 from ray.tune.log_sync import get_syncer
-from ray.tune.result import NODE_IP, TRAINING_ITERATION, TIME_TOTAL_S
+from ray.tune.result import NODE_IP, TRAINING_ITERATION, TIME_TOTAL_S, \
+    TIMESTEPS_TOTAL
 
 try:
     import tensorflow as tf
@@ -132,13 +133,13 @@ class _TFLogger(Logger):
                 del tmp[k]  # not useful to tf log these
         values = to_tf_values(tmp, ["ray", "tune"])
         train_stats = tf.Summary(value=values)
-        self._file_writer.add_summary(train_stats, result[TRAINING_ITERATION])
+        t = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION]
+        self._file_writer.add_summary(train_stats, t)
         iteration_value = to_tf_values({
             "training_iteration": result[TRAINING_ITERATION]
         }, ["ray", "tune"])
         iteration_stats = tf.Summary(value=iteration_value)
-        self._file_writer.add_summary(iteration_stats,
-                                      result[TRAINING_ITERATION])
+        self._file_writer.add_summary(iteration_stats, t)
 
     def flush(self):
         self._file_writer.flush()
diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py
index be8c37e32..ddcc29637 100644
--- a/python/ray/tune/trial.py
+++ b/python/ray/tune/trial.py
@@ -270,6 +270,9 @@ class Trial(object):
                 int(self.last_result.get(TIME_TOTAL_S)))
         ]
 
+        if self.last_result.get("timesteps_total") is not None:
+            pieces.append('{} ts'.format(self.last_result["timesteps_total"]))
+
         if self.last_result.get("episode_reward_mean") is not None:
             pieces.append('{} rew'.format(
                 format(self.last_result["episode_reward_mean"], '.3g')))
diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh
index a1a77177a..255fed38e 100755
--- a/test/jenkins_tests/run_multi_node_tests.sh
+++ b/test/jenkins_tests/run_multi_node_tests.sh
@@ -16,7 +16,14 @@ docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \
     --env PongDeterministic-v0 \
     --run A3C \
     --stop '{"training_iteration": 2}' \
-    --config '{"num_workers": 16}'
+    --config '{"num_workers": 2}'
+
+docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \
+    python /ray/python/ray/rllib/train.py \
+    --env PongDeterministic-v0 \
+    --run A2C \
+    --stop '{"training_iteration": 2}' \
+    --config '{"num_workers": 2}'
 
 docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \
     python /ray/python/ray/rllib/train.py \
@@ -51,14 +58,14 @@ docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \
     --env Pendulum-v0 \
     --run ES \
     --stop '{"training_iteration": 2}' \
-    --config '{"stepsize": 0.01, "episodes_per_batch": 20, "timesteps_per_batch": 100}'
+    --config '{"stepsize": 0.01, "episodes_per_batch": 20, "timesteps_per_batch": 100, "num_workers": 2}'
 
 docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \
     python /ray/python/ray/rllib/train.py \
     --env Pong-v0 \
     --run ES \
     --stop '{"training_iteration": 2}' \
-    --config '{"stepsize": 0.01, "episodes_per_batch": 20, "timesteps_per_batch": 100}'
+    --config '{"stepsize": 0.01, "episodes_per_batch": 20, "timesteps_per_batch": 100, "num_workers": 2}'
 
 docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \
     python /ray/python/ray/rllib/train.py \
@@ -276,7 +283,7 @@ docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \
     --env PongDeterministic-v4 \
     --run A3C \
     --stop '{"training_iteration": 2}' \
-    --config '{"num_workers": 2, "use_pytorch": true, "model": {"use_lstm": false, "grayscale": true, "zero_mean": false, "dim": 80, "channel_major": true}}'
+    --config '{"num_workers": 2, "use_pytorch": true, "model": {"use_lstm": false, "grayscale": true, "zero_mean": false, "dim": 84, "channel_major": true}, "preprocessor_pref": "rllib"}'
 
 docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \
     python /ray/python/ray/rllib/train.py \