From 076936a7f5cd49872b1ebf44c416dd351f78c967 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 11 Mar 2018 21:14:38 -0700 Subject: [PATCH] [rllib] Switch DQN to using deepmind wrappers (#1655) * deepmind wrap * use 80x80 * respect custom prep * fix replay size * fix chekc * batch idx * Wed Mar 7 11:00:39 PST 2018 * random starts and reward clipping * Fri Mar 9 17:27:17 PST 2018 * Fri Mar 9 17:36:15 PST 2018 * Sat Mar 10 19:47:10 PST 2018 * Sat Mar 10 19:47:37 PST 2018 * Sat Mar 10 20:05:12 PST 2018 * Sat Mar 10 20:54:21 PST 2018 * Sat Mar 10 21:03:52 PST 2018 --- python/ray/rllib/dqn/common/atari_wrappers.py | 202 +++++++++++++++ python/ray/rllib/dqn/common/wrappers.py | 232 +----------------- python/ray/rllib/dqn/dqn.py | 4 + python/ray/rllib/dqn/dqn_evaluator.py | 2 +- python/ray/rllib/optimizers/apex_optimizer.py | 16 +- .../ray/rllib/optimizers/local_sync_replay.py | 8 +- python/ray/rllib/optimizers/replay_buffer.py | 13 +- .../ray/rllib/tuned_examples/pong-apex.yaml | 7 +- python/ray/tune/logger.py | 2 +- 9 files changed, 244 insertions(+), 242 deletions(-) create mode 100644 python/ray/rllib/dqn/common/atari_wrappers.py diff --git a/python/ray/rllib/dqn/common/atari_wrappers.py b/python/ray/rllib/dqn/common/atari_wrappers.py new file mode 100644 index 000000000..4726a4f99 --- /dev/null +++ b/python/ray/rllib/dqn/common/atari_wrappers.py @@ -0,0 +1,202 @@ +import numpy as np +from collections import deque +import gym +from gym import spaces +import cv2 +cv2.ocl.setUseOpenCL(False) + + +class NoopResetEnv(gym.Wrapper): + def __init__(self, env, noop_max=30, random_starts=False): + """Sample initial states by taking random number of no-ops on reset. + No-op is assumed to be action 0. 
+ """ + gym.Wrapper.__init__(self, env) + self.noop_max = noop_max + self.override_num_noops = None + self.noop_action = 0 + self.random_starts = random_starts + assert env.unwrapped.get_action_meanings()[0] == 'NOOP' + + def reset(self, **kwargs): + """ Do no-op action for a number of steps in [1, noop_max].""" + self.env.reset(**kwargs) + if self.override_num_noops is not None: + noops = self.override_num_noops + else: + noops = self.unwrapped.np_random.randint( + 1, self.noop_max + 1) + assert noops > 0 + obs = None + for _ in range(noops): + if self.random_starts: + action = np.random.randint(self.env.action_space.n) + else: + action = self.noop_action + obs, _, done, _ = self.env.step(action) + if done: + obs = self.env.reset(**kwargs) + return obs + + def step(self, ac): + return self.env.step(ac) + + +class ClipRewardEnv(gym.RewardWrapper): + def __init__(self, env): + gym.RewardWrapper.__init__(self, env) + + def reward(self, reward): + """Bin reward to {+1, 0, -1} by its sign.""" + return np.sign(reward) + + +class FireResetEnv(gym.Wrapper): + def __init__(self, env): + """Take action on reset. + + For environments that are fixed until firing.""" + gym.Wrapper.__init__(self, env) + assert env.unwrapped.get_action_meanings()[1] == 'FIRE' + assert len(env.unwrapped.get_action_meanings()) >= 3 + + def reset(self, **kwargs): + self.env.reset(**kwargs) + obs, _, done, _ = self.env.step(1) + if done: + self.env.reset(**kwargs) + obs, _, done, _ = self.env.step(2) + if done: + self.env.reset(**kwargs) + return obs + + def step(self, ac): + return self.env.step(ac) + + +class EpisodicLifeEnv(gym.Wrapper): + def __init__(self, env): + """Make end-of-life == end-of-episode, but only reset on true game over. + Done by DeepMind for the DQN and co. since it helps value estimation. 
+ """ + gym.Wrapper.__init__(self, env) + self.lives = 0 + self.was_real_done = True + + def step(self, action): + obs, reward, done, info = self.env.step(action) + self.was_real_done = done + # check current lives, make loss of life terminal, + # then update lives to handle bonus lives + lives = self.env.unwrapped.ale.lives() + if lives < self.lives and lives > 0: + # for Qbert sometimes we stay in lives == 0 condition for a few + # frames so it's important to keep lives > 0, so that we only reset + # once the environment advertises done. + done = True + self.lives = lives + return obs, reward, done, info + + def reset(self, **kwargs): + """Reset only when lives are exhausted. + This way all states are still reachable even though lives are episodic, + and the learner need not know about any of this behind-the-scenes. + """ + if self.was_real_done: + obs = self.env.reset(**kwargs) + else: + # no-op step to advance from terminal/lost life state + obs, _, _, _ = self.env.step(0) + self.lives = self.env.unwrapped.ale.lives() + return obs + + +class MaxAndSkipEnv(gym.Wrapper): + def __init__(self, env, skip=4): + """Return only every `skip`-th frame""" + gym.Wrapper.__init__(self, env) + # most recent raw observations (for max pooling across time steps) + self._obs_buffer = np.zeros( + (2,)+env.observation_space.shape, dtype=np.uint8) + self._skip = skip + + def step(self, action): + """Repeat action, sum reward, and max over last observations.""" + total_reward = 0.0 + done = None + for i in range(self._skip): + obs, reward, done, info = self.env.step(action) + if i == self._skip - 2: + self._obs_buffer[0] = obs + if i == self._skip - 1: + self._obs_buffer[1] = obs + total_reward += reward + if done: + break + # Note that the observation on the done=True frame + # doesn't matter + max_frame = self._obs_buffer.max(axis=0) + + return max_frame, total_reward, done, info + + def reset(self, **kwargs): + return self.env.reset(**kwargs) + + +class
WarpFrame(gym.ObservationWrapper): + def __init__(self, env): + """Warp frames to 80x80 grayscale (the Nature paper and later work use 84x84).""" + gym.ObservationWrapper.__init__(self, env) + self.width = 80 # in rllib we use 80 + self.height = 80 + self.observation_space = spaces.Box( + low=0, high=255, shape=(self.height, self.width, 1)) + + def observation(self, frame): + frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) + frame = cv2.resize( + frame, (self.width, self.height), interpolation=cv2.INTER_AREA) + return frame[:, :, None] + + +class FrameStack(gym.Wrapper): + def __init__(self, env, k): + """Stack k last frames.""" + gym.Wrapper.__init__(self, env) + self.k = k + self.frames = deque([], maxlen=k) + shp = env.observation_space.shape + self.observation_space = spaces.Box( + low=0, high=255, shape=(shp[0], shp[1], shp[2] * k)) + + def reset(self): + ob = self.env.reset() + for _ in range(self.k): + self.frames.append(ob) + return self._get_ob() + + def step(self, action): + ob, reward, done, info = self.env.step(action) + self.frames.append(ob) + return self._get_ob(), reward, done, info + + def _get_ob(self): + assert len(self.frames) == self.k + return np.concatenate(self.frames, axis=2) + + +def wrap_deepmind(env, random_starts): + """Configure environment for DeepMind-style Atari. + + Note that we assume reward clipping is done outside the wrapper.
+ """ + env = NoopResetEnv(env, noop_max=30, random_starts=random_starts) + if 'NoFrameskip' in env.spec.id: + env = MaxAndSkipEnv(env, skip=4) + env = EpisodicLifeEnv(env) + if 'FIRE' in env.unwrapped.get_action_meanings(): + env = FireResetEnv(env) + env = WarpFrame(env) + # env = ClipRewardEnv(env) # reward clipping is handled by DQN replay + env = FrameStack(env, 4) + return env diff --git a/python/ray/rllib/dqn/common/wrappers.py b/python/ray/rllib/dqn/common/wrappers.py index 9ac859952..3a8fd68ae 100644 --- a/python/ray/rllib/dqn/common/wrappers.py +++ b/python/ray/rllib/dqn/common/wrappers.py @@ -2,236 +2,18 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import cv2 -import gym -import numpy as np - -from collections import deque -from gym import spaces - from ray.rllib.models import ModelCatalog +from ray.rllib.dqn.common.atari_wrappers import wrap_deepmind -class NoopResetEnv(gym.Wrapper): - def __init__(self, env=None, noop_max=30): - """Sample initial states by taking random number of no-ops on reset. - No-op is assumed to be action 0. 
- """ - super(NoopResetEnv, self).__init__(env) - self.noop_max = noop_max - self.override_num_noops = None - assert env.unwrapped.get_action_meanings()[0] == 'NOOP' - - def reset(self): - """ Do no-op action for a number of steps in [1, noop_max].""" - self.env.reset() - if self.override_num_noops is not None: - noops = self.override_num_noops - else: - noops = np.random.randint(1, self.noop_max + 1) - assert noops > 0 - obs = None - for _ in range(noops): - obs, _, done, _ = self.env.step(0) - if done: - obs = self.env.reset() - return obs - - -class FireResetEnv(gym.Wrapper): - def __init__(self, env=None): - """For environments where the user need to press FIRE for the game to - start.""" - super(FireResetEnv, self).__init__(env) - assert env.unwrapped.get_action_meanings()[1] == 'FIRE' - assert len(env.unwrapped.get_action_meanings()) >= 3 - - def reset(self): - self.env.reset() - obs, _, done, _ = self.env.step(1) - if done: - self.env.reset() - obs, _, done, _ = self.env.step(2) - if done: - self.env.reset() - return obs - - -class EpisodicLifeEnv(gym.Wrapper): - def __init__(self, env=None): - """Make end-of-life == end-of-episode, but only reset on true game - over. Done by DeepMind for the DQN and co. since it helps value - estimation. - """ - super(EpisodicLifeEnv, self).__init__(env) - self.lives = 0 - self.was_real_done = True - self.was_real_reset = False - - def step(self, action): - obs, reward, done, info = self.env.step(action) - self.was_real_done = done - # check current lives, make loss of life terminal, - # then update lives to handle bonus lives - lives = self.env.unwrapped.ale.lives() - if lives < self.lives and lives > 0: - # for Qbert somtimes we stay in lives == 0 condtion for a few - # frames so its important to keep lives > 0, so that we only reset - # once the environment advertises done. - done = True - self.lives = lives - return obs, reward, done, info - - def reset(self): - """Reset only when lives are exhausted. 
- This way all states are still reachable even though lives are episodic, - and the learner need not know about any of this behind-the-scenes. - """ - if self.was_real_done: - obs = self.env.reset() - self.was_real_reset = True - else: - # no-op step to advance from terminal/lost life state - obs, _, _, _ = self.env.step(0) - self.was_real_reset = False - self.lives = self.env.unwrapped.ale.lives() - return obs - - -class MaxAndSkipEnv(gym.Wrapper): - def __init__(self, env=None, skip=4): - """Return only every `skip`-th frame""" - super(MaxAndSkipEnv, self).__init__(env) - # most recent raw observations (for max pooling across time steps) - self._obs_buffer = deque(maxlen=2) - self._skip = skip - - def step(self, action): - total_reward = 0.0 - done = None - for _ in range(self._skip): - obs, reward, done, info = self.env.step(action) - self._obs_buffer.append(obs) - total_reward += reward - if done: - break - - max_frame = np.max(np.stack(self._obs_buffer), axis=0) - - return max_frame, total_reward, done, info - - def reset(self): - """Clear past frame buffer and init. to first obs. from inner env.""" - self._obs_buffer.clear() - obs = self.env.reset() - self._obs_buffer.append(obs) - return obs - - -# TODO(ekl): switch this to use a RLlib common preprocessor -class ProcessFrame80(gym.ObservationWrapper): - def __init__(self, env=None): - super(ProcessFrame80, self).__init__(env) - self.observation_space = spaces.Box( - low=0, high=255, shape=(80, 80, 1), dtype=np.uint8) - - def observation(self, obs): - return ProcessFrame80.process(obs) - - @staticmethod - def process(frame): - if frame.size == 210 * 160 * 3: - img = np.reshape(frame, [210, 160, 3]).astype(np.float32) - elif frame.size == 250 * 160 * 3: - img = np.reshape(frame, [250, 160, 3]).astype(np.float32) - else: - assert False, "Unknown resolution." 
- img = (img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + - img[:, :, 2] * 0.114) - resized_screen = cv2.resize( - img, (80, 110), interpolation=cv2.INTER_AREA) - x_t = resized_screen[20:100, :] - x_t = np.reshape(x_t, [80, 80, 1]) - return x_t.astype(np.uint8) - - -class ClippedRewardsWrapper(gym.RewardWrapper): - def reward(self, reward): - """Change all the positive rewards to 1, negative to -1 and keep - zero.""" - return np.sign(reward) - - -class LazyFrames(object): - def __init__(self, frames): - """This object ensures that common frames between the observations are - only stored once. It exists purely to optimize memory usage which can - be huge for DQN's 1M frames replay buffers. - - This object should only be converted to numpy array before being passed - to the model. - - You'd not belive how complex the previous solution was.""" - self._frames = frames - - def __array__(self, dtype=None): - out = np.concatenate(self._frames, axis=2) - if dtype is not None: - out = out.astype(dtype) - return out - - -class FrameStack(gym.Wrapper): - def __init__(self, env, k): - """Stack k last frames. - - Returns lazy array, which is much more memory efficient. 
- - See Also - -------- - LazyFrames - """ - gym.Wrapper.__init__(self, env) - self.k = k - self.frames = deque([], maxlen=k) - shp = env.observation_space.shape - self.observation_space = spaces.Box( - low=0, high=255, shape=(shp[0], shp[1], shp[2] * k), - dtype=np.uint8) - - def reset(self): - ob = self.env.reset() - for _ in range(self.k): - self.frames.append(ob) - return self._get_ob() - - def step(self, action): - ob, reward, done, info = self.env.step(action) - self.frames.append(ob) - return self._get_ob(), reward, done, info - - def _get_ob(self): - assert len(self.frames) == self.k - return LazyFrames(list(self.frames)) - - -def wrap_dqn(registry, env, options): +def wrap_dqn(registry, env, options, random_starts): """Apply a common set of wrappers for DQN.""" is_atari = hasattr(env.unwrapped, "ale") - if is_atari: - env = EpisodicLifeEnv(env) - env = NoopResetEnv(env, noop_max=30) - if 'NoFrameskip' in env.spec.id: - env = MaxAndSkipEnv(env, skip=4) - if 'FIRE' in env.unwrapped.get_action_meanings(): - env = FireResetEnv(env) + # Override atari default to use the deepmind wrappers. + # TODO(ekl) this logic should be pushed to the catalog. + if is_atari and "custom_preprocessor" not in options: + return wrap_deepmind(env, random_starts=random_starts) - env = ModelCatalog.get_preprocessor_as_wrapper(registry, env, options) - - if is_atari: - env = FrameStack(env, 4) - env = ClippedRewardsWrapper(env) - - return env + return ModelCatalog.get_preprocessor_as_wrapper(registry, env, options) diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py index a264754c4..dc0bb3f4e 100644 --- a/python/ray/rllib/dqn/dqn.py +++ b/python/ray/rllib/dqn/dqn.py @@ -52,6 +52,8 @@ DEFAULT_CONFIG = dict( exploration_final_eps=0.02, # Update the target network every `target_network_update_freq` steps. target_network_update_freq=500, + # Whether to start with random actions instead of noops. + random_starts=True, # === Replay buffer === # Size of the replay buffer. 
Note that if async_updates is set, then @@ -65,6 +67,8 @@ DEFAULT_CONFIG = dict( prioritized_replay_beta=0.4, # Epsilon to add to the TD errors when updating priorities. prioritized_replay_eps=1e-6, + # Whether to clip rewards to {-1, 0, +1} by sign prior to adding to the replay buffer. + clip_rewards=True, # === Optimization === # Learning rate for adam optimizer diff --git a/python/ray/rllib/dqn/dqn_evaluator.py b/python/ray/rllib/dqn/dqn_evaluator.py index ae7a3a080..088840161 100644 --- a/python/ray/rllib/dqn/dqn_evaluator.py +++ b/python/ray/rllib/dqn/dqn_evaluator.py @@ -50,7 +50,7 @@ class DQNEvaluator(Evaluator): def __init__(self, registry, env_creator, config, logdir, worker_index): env = env_creator(config["env_config"]) - env = wrap_dqn(registry, env, config["model"]) + env = wrap_dqn(registry, env, config["model"], config["random_starts"]) self.env = env self.config = config diff --git a/python/ray/rllib/optimizers/apex_optimizer.py b/python/ray/rllib/optimizers/apex_optimizer.py index 78a210b8a..cadcc28f2 100644 --- a/python/ray/rllib/optimizers/apex_optimizer.py +++ b/python/ray/rllib/optimizers/apex_optimizer.py @@ -28,7 +28,7 @@ class ReplayActor(object): def __init__( self, num_shards, learning_starts, buffer_size, train_batch_size, prioritized_replay_alpha, prioritized_replay_beta, - prioritized_replay_eps): + prioritized_replay_eps, clip_rewards): self.replay_starts = learning_starts // num_shards self.buffer_size = buffer_size // num_shards self.train_batch_size = train_batch_size @@ -36,7 +36,8 @@ class ReplayActor(object): self.prioritized_replay_eps = prioritized_replay_eps self.replay_buffer = PrioritizedReplayBuffer( - buffer_size, alpha=prioritized_replay_alpha) + self.buffer_size, alpha=prioritized_replay_alpha, + clip_rewards=clip_rewards) # Metrics self.add_batch_timer = TimerStat() @@ -98,6 +99,7 @@ class GenericLearner(threading.Thread): self.queue_timer = TimerStat() self.grad_timer = TimerStat() self.daemon = True + self.weights_updated =
False def run(self): while True: @@ -111,6 +113,7 @@ class GenericLearner(threading.Thread): td_error = self.local_evaluator.compute_apply(replay) self.outqueue.put((ra, replay, td_error)) self.learner_queue_size.push(self.inqueue.qsize()) + self.weights_updated = True class ApexOptimizer(Optimizer): @@ -121,7 +124,7 @@ class ApexOptimizer(Optimizer): prioritized_replay_beta=0.4, prioritized_replay_eps=1e-6, train_batch_size=512, sample_batch_size=50, num_replay_buffer_shards=1, max_weight_sync_delay=400, - debug=False): + clip_rewards=True, debug=False): self.debug = debug self.replay_starts = learning_starts @@ -138,7 +141,7 @@ class ApexOptimizer(Optimizer): ReplayActor, [num_replay_buffer_shards, learning_starts, buffer_size, train_batch_size, prioritized_replay_alpha, - prioritized_replay_beta, prioritized_replay_eps], + prioritized_replay_beta, prioritized_replay_eps, clip_rewards], num_replay_buffer_shards) assert len(self.remote_evaluators) > 0 @@ -199,7 +202,10 @@ class ApexOptimizer(Optimizer): # Update weights if needed self.steps_since_update[ev] += self.sample_batch_size if self.steps_since_update[ev] >= self.max_weight_sync_delay: - if weights is None: + # Note that it's important to pull new weights once + # updated to avoid excessive correlation between actors + if weights is None or self.learner.weights_updated: + self.learner.weights_updated = False with self.timers["put_weights"]: weights = ray.put( self.local_evaluator.get_weights()) diff --git a/python/ray/rllib/optimizers/local_sync_replay.py b/python/ray/rllib/optimizers/local_sync_replay.py index 7222584b4..a2015e873 100644 --- a/python/ray/rllib/optimizers/local_sync_replay.py +++ b/python/ray/rllib/optimizers/local_sync_replay.py @@ -20,7 +20,7 @@ class LocalSyncReplayOptimizer(Optimizer): self, learning_starts=1000, buffer_size=10000, prioritized_replay=True, prioritized_replay_alpha=0.6, prioritized_replay_beta=0.4, prioritized_replay_eps=1e-6, - train_batch_size=32, 
sample_batch_size=4): + train_batch_size=32, sample_batch_size=4, clip_rewards=True): self.replay_starts = learning_starts self.prioritized_replay_beta = prioritized_replay_beta @@ -37,10 +37,10 @@ class LocalSyncReplayOptimizer(Optimizer): # Set up replay buffer if prioritized_replay: self.replay_buffer = PrioritizedReplayBuffer( - buffer_size, - alpha=prioritized_replay_alpha) + buffer_size, alpha=prioritized_replay_alpha, + clip_rewards=clip_rewards) else: - self.replay_buffer = ReplayBuffer(buffer_size) + self.replay_buffer = ReplayBuffer(buffer_size, clip_rewards) assert buffer_size >= self.replay_starts diff --git a/python/ray/rllib/optimizers/replay_buffer.py b/python/ray/rllib/optimizers/replay_buffer.py index 513ee7c4e..d38014ba2 100644 --- a/python/ray/rllib/optimizers/replay_buffer.py +++ b/python/ray/rllib/optimizers/replay_buffer.py @@ -12,7 +12,7 @@ from ray.rllib.utils.window_stat import WindowStat class ReplayBuffer(object): - def __init__(self, size): + def __init__(self, size, clip_rewards): """Create Prioritized Replay buffer. Parameters @@ -30,11 +30,15 @@ class ReplayBuffer(object): self._num_sampled = 0 self._evicted_hit_stats = WindowStat("evicted_hit", 1000) self._est_size_bytes = 0 + self._clip_rewards = clip_rewards def __len__(self): return len(self._storage) def add(self, obs_t, action, reward, obs_tp1, done, weight): + if self._clip_rewards: + reward = np.sign(reward) + data = (obs_t, action, reward, obs_tp1, done) self._num_added += 1 @@ -103,7 +107,7 @@ class ReplayBuffer(object): class PrioritizedReplayBuffer(ReplayBuffer): - def __init__(self, size, alpha): + def __init__(self, size, alpha, clip_rewards): """Create Prioritized Replay buffer. 
Parameters @@ -119,7 +123,7 @@ class PrioritizedReplayBuffer(ReplayBuffer): -------- ReplayBuffer.__init__ """ - super(PrioritizedReplayBuffer, self).__init__(size) + super(PrioritizedReplayBuffer, self).__init__(size, clip_rewards) assert alpha > 0 self._alpha = alpha @@ -134,6 +138,9 @@ class PrioritizedReplayBuffer(ReplayBuffer): def add(self, obs_t, action, reward, obs_tp1, done, weight): """See ReplayBuffer.store_effect""" + if self._clip_rewards: + reward = np.sign(reward) + idx = self._next_idx super(PrioritizedReplayBuffer, self).add( obs_t, action, reward, obs_tp1, done, weight) diff --git a/python/ray/rllib/tuned_examples/pong-apex.yaml b/python/ray/rllib/tuned_examples/pong-apex.yaml index 9a324a0eb..f3f00c339 100644 --- a/python/ray/rllib/tuned_examples/pong-apex.yaml +++ b/python/ray/rllib/tuned_examples/pong-apex.yaml @@ -1,5 +1,7 @@ +# This can be expected to reach 20.8 reward within an hour when using +# a V100 GPU (e.g. p3.2xl instance on AWS, and m4.4xl workers). pong-apex: - env: Pong-v0 + env: PongNoFrameskip-v4 run: APEX resources: cpu: @@ -7,8 +9,7 @@ pong-apex: gpu: 1 config: force_evaluators_remote: True # requires cluster + target_network_update_freq: 50000 num_workers: 32 lr: .0001 gamma: 0.99 - model: - grayscale: True diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index 0379c6c22..55d466b75 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -187,4 +187,4 @@ def pretty_print(result): out[k] = v cleaned = json.dumps(out, cls=_CustomEncoder) - return yaml.dump(json.loads(cleaned), default_flow_style=False) + return yaml.safe_dump(json.loads(cleaned), default_flow_style=False)