[rllib] Misc fixes, A2C (#2679)

A bunch of minor rllib fixes:

pull in latest baselines atari wrapper changes (and use deepmind wrapper by default)
move reward clipping to policy evaluator
add a2c variant of a3c
reduce vision network fc layer size to 256 units
switch to 84x84 images
doc tweaks
print timesteps in tune status
This commit is contained in:
Eric Liang
2018-08-20 15:28:03 -07:00
committed by GitHub
parent 880ef1bd21
commit fbe6c59f72
34 changed files with 220 additions and 129 deletions
+2 -1
View File
@@ -19,7 +19,8 @@ from ray.rllib.evaluation.sample_batch import SampleBatch
def _register_all():
for key in [
"PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG", "APEX_DDPG",
"IMPALA", "__fake", "__sigmoid_fake_data", "__parameter_tuning"
"IMPALA", "A2C", "__fake", "__sigmoid_fake_data",
"__parameter_tuning"
]:
from ray.rllib.agents.agent import get_agent_class
register_trainable(key, get_agent_class(key))
+2 -1
View File
@@ -1,3 +1,4 @@
from ray.rllib.agents.a3c.a3c import A3CAgent, DEFAULT_CONFIG
from ray.rllib.agents.a3c.a2c import A2CAgent
__all__ = ["A3CAgent", "DEFAULT_CONFIG"]
__all__ = ["A2CAgent", "A3CAgent", "DEFAULT_CONFIG"]
+41
View File
@@ -0,0 +1,41 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.agents.a3c.a3c import A3CAgent, DEFAULT_CONFIG as A3C_CONFIG
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.utils import merge_dicts
from ray.tune.trial import Resources
A2C_DEFAULT_CONFIG = merge_dicts(
A3C_CONFIG,
{
"gpu": False,
"sample_batch_size": 20,
"min_iter_time_s": 10,
"optimizer": {
"timesteps_per_batch": 200,
},
},
)
class A2CAgent(A3CAgent):
"""Synchronous variant of the A3CAgent."""
_agent_name = "A2C"
_default_config = A2C_DEFAULT_CONFIG
def _make_optimizer(self):
return SyncSamplesOptimizer(self.local_evaluator,
self.remote_evaluators,
self.config["optimizer"])
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(
cpu=1,
gpu=1 if cf["gpu"] else 0,
extra_cpu=cf["num_workers"],
extra_gpu=cf["use_gpu_for_workers"] and cf["num_workers"] or 0)
+15 -12
View File
@@ -4,6 +4,7 @@ from __future__ import print_function
import pickle
import os
import time
import ray
from ray.rllib.agents.agent import Agent, with_common_config
@@ -30,6 +31,8 @@ DEFAULT_CONFIG = with_common_config({
"use_gpu_for_workers": False,
# Whether to emit extra summary stats
"summarize": False,
# Min time per iteration
"min_iter_time_s": 5,
# Workers sample async. Note that this increases the effective
# sample_batch_size by up to 5x due to async buffering of batches.
"sample_async": True,
@@ -44,7 +47,7 @@ DEFAULT_CONFIG = with_common_config({
# (Image statespace) - Each pixel
"zero_mean": False,
# (Image statespace) - Converts image to (dim, dim, C)
"dim": 80,
"dim": 84,
# (Image statespace) - Converts image shape to (C, dim, dim)
"channel_major": False,
},
@@ -56,11 +59,6 @@ DEFAULT_CONFIG = with_common_config({
"allow_growth": True,
},
},
# Arguments to pass to the rllib optimizer
"optimizer": {
# Number of gradients applied for each `train` step
"grads_per_step": 100,
},
})
@@ -93,15 +91,20 @@ class A3CAgent(Agent):
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, policy_cls, self.config["num_workers"],
{"num_gpus": 1 if self.config["use_gpu_for_workers"] else 0})
self.optimizer = AsyncGradientsOptimizer(self.local_evaluator,
self.remote_evaluators,
self.config["optimizer"])
self.optimizer = self._make_optimizer()
def _make_optimizer(self):
return AsyncGradientsOptimizer(self.local_evaluator,
self.remote_evaluators,
self.config["optimizer"])
def _train(self):
prev_steps = self.optimizer.num_steps_sampled
self.optimizer.step()
FilterManager.synchronize(self.local_evaluator.filters,
self.remote_evaluators)
start = time.time()
while time.time() - start < self.config["min_iter_time_s"]:
self.optimizer.step()
FilterManager.synchronize(self.local_evaluator.filters,
self.remote_evaluators)
result = self.optimizer.collect_metrics()
result.update(timesteps_this_iter=self.optimizer.num_steps_sampled -
prev_steps)
+7 -1
View File
@@ -31,8 +31,10 @@ COMMON_CONFIG = {
"sample_async": False,
# Which observation filter to apply to the observation
"observation_filter": "NoFilter",
# Whether to clip rewards prior to experience postprocessing
"clip_rewards": True,
# Whether to use rllib or deepmind preprocessors
"preprocessor_pref": "rllib",
"preprocessor_pref": "deepmind",
# Arguments to pass to the env creator
"env_config": {},
# Environment name can also be passed via config
@@ -136,6 +138,7 @@ class Agent(Trainable):
compress_observations=config["compress_observations"],
num_envs=config["num_envs_per_worker"],
observation_filter=config["observation_filter"],
clip_rewards=config["clip_rewards"],
env_config=config["env_config"],
model_config=config["model"],
policy_config=config,
@@ -370,6 +373,9 @@ def get_agent_class(alg):
elif alg == "A3C":
from ray.rllib.agents import a3c
return a3c.A3CAgent
elif alg == "A2C":
from ray.rllib.agents import a3c
return a3c.A2CAgent
elif alg == "BC":
from ray.rllib.agents import bc
return bc.BCAgent
+1 -1
View File
@@ -30,7 +30,7 @@ DEFAULT_CONFIG = {
# (Image statespace) - Each pixel
"zero_mean": False,
# (Image statespace) - Converts image to (dim, dim, C)
"dim": 80,
"dim": 84,
# (Image statespace) - Converts image shape to (C, dim, dim)
"channel_major": False
},
+1 -3
View File
@@ -10,7 +10,7 @@ from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule
OPTIMIZER_SHARED_CONFIGS = [
"buffer_size", "prioritized_replay", "prioritized_replay_alpha",
"prioritized_replay_beta", "prioritized_replay_eps", "sample_batch_size",
"train_batch_size", "learning_starts", "clip_rewards"
"train_batch_size", "learning_starts"
]
DEFAULT_CONFIG = with_common_config({
@@ -61,8 +61,6 @@ DEFAULT_CONFIG = with_common_config({
"prioritized_replay_beta": 0.4,
# Epsilon to add to the TD errors when updating priorities.
"prioritized_replay_eps": 1e-6,
# Whether to clip rewards to [-1, 1] prior to adding to the replay buffer.
"clip_rewards": True,
# Whether to LZ4 compress observations
"compress_observations": False,
@@ -6,7 +6,7 @@ from ray.rllib.models import ModelCatalog
from ray.rllib.env.atari_wrappers import wrap_deepmind
def wrap_dqn(env, options, random_starts):
def wrap_dqn(env, options):
"""Apply a common set of wrappers for DQN."""
is_atari = hasattr(env.unwrapped, "ale")
@@ -14,7 +14,6 @@ def wrap_dqn(env, options, random_starts):
# Override atari default to use the deepmind wrappers.
# TODO(ekl) this logic should be pushed to the catalog.
if is_atari and "custom_preprocessor" not in options:
return wrap_deepmind(
env, random_starts=random_starts, dim=options.get("dim", 80))
return wrap_deepmind(env, dim=options.get("dim", 84))
return ModelCatalog.get_preprocessor_as_wrapper(env, options)
+1 -3
View File
@@ -18,7 +18,7 @@ from ray.tune.trial import Resources
OPTIMIZER_SHARED_CONFIGS = [
"buffer_size", "prioritized_replay", "prioritized_replay_alpha",
"prioritized_replay_beta", "prioritized_replay_eps", "sample_batch_size",
"train_batch_size", "learning_starts", "clip_rewards"
"train_batch_size", "learning_starts"
]
DEFAULT_CONFIG = with_common_config({
@@ -61,8 +61,6 @@ DEFAULT_CONFIG = with_common_config({
"prioritized_replay_beta": 0.4,
# Epsilon to add to the TD errors when updating priorities.
"prioritized_replay_eps": 1e-6,
# Whether to clip rewards to [-1, 1] prior to adding to the replay buffer.
"clip_rewards": True,
# Whether to LZ4 compress observations
"compress_observations": True,
+1 -3
View File
@@ -49,12 +49,10 @@ DEFAULT_CONFIG = with_common_config({
"entropy_coeff": -0.01,
# Model and preprocessor options.
"clip_rewards": True,
"preprocessor_pref": "deepmind",
"model": {
"use_lstm": False,
"max_seq_len": 20,
"dim": 80,
"dim": 84,
},
})
@@ -137,11 +137,6 @@ class VTracePolicyGraph(TFPolicyGraph):
rs,
[1, 0] + list(range(2, 1 + int(tf.shape(tensor).shape[0]))))
if self.config["clip_rewards"]:
clipped_rewards = tf.clip_by_value(rewards, -1, 1)
else:
clipped_rewards = rewards
# Inputs are reshaped from [B * T] => [T - 1, B] for V-trace calc.
self.loss = VTraceLoss(
actions=to_batches(actions)[:-1],
@@ -151,7 +146,7 @@ class VTracePolicyGraph(TFPolicyGraph):
behaviour_logits=to_batches(behaviour_logits)[:-1],
target_logits=to_batches(self.model.outputs)[:-1],
discount=config["gamma"],
rewards=to_batches(clipped_rewards)[:-1],
rewards=to_batches(rewards)[:-1],
values=to_batches(values)[:-1],
bootstrap_value=to_batches(values)[-1],
vf_loss_coeff=self.config["vf_loss_coeff"],
+24 -17
View File
@@ -11,7 +11,7 @@ def is_atari(env):
class NoopResetEnv(gym.Wrapper):
def __init__(self, env, noop_max=30, random_starts=False):
def __init__(self, env, noop_max=30):
"""Sample initial states by taking random number of no-ops on reset.
No-op is assumed to be action 0.
"""
@@ -19,7 +19,6 @@ class NoopResetEnv(gym.Wrapper):
self.noop_max = noop_max
self.override_num_noops = None
self.noop_action = 0
self.random_starts = random_starts
assert env.unwrapped.get_action_meanings()[0] == 'NOOP'
def reset(self, **kwargs):
@@ -32,11 +31,7 @@ class NoopResetEnv(gym.Wrapper):
assert noops > 0
obs = None
for _ in range(noops):
if self.random_starts:
action = np.random.randint(self.env.action_space.n)
else:
action = self.noop_action
obs, _, done, _ = self.env.step(action)
obs, _, done, _ = self.env.step(self.noop_action)
if done:
obs = self.env.reset(**kwargs)
return obs
@@ -93,9 +88,9 @@ class EpisodicLifeEnv(gym.Wrapper):
# then update lives to handle bonus lives
lives = self.env.unwrapped.ale.lives()
if lives < self.lives and lives > 0:
# for Qbert sometimes we stay in lives == 0 condtion for a few
# frames so its important to keep lives > 0, so that we only reset
# once the environment advertises done.
# for Qbert sometimes we stay in lives == 0 condtion for a few fr
# so its important to keep lives > 0, so that we only reset once
# the environment advertises done.
done = True
self.lives = lives
return obs, reward, done, info
@@ -150,13 +145,13 @@ class WarpFrame(gym.ObservationWrapper):
def __init__(self, env, dim):
"""Warp frames to the specified size (dim x dim)."""
gym.ObservationWrapper.__init__(self, env)
self.width = dim # in rllib we use 80
self.width = dim
self.height = dim
self.observation_space = spaces.Box(
low=0,
high=255,
shape=(self.height, self.width, 1),
dtype=np.float32)
dtype=np.uint8)
def observation(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
@@ -176,7 +171,7 @@ class FrameStack(gym.Wrapper):
low=0,
high=255,
shape=(shp[0], shp[1], shp[2] * k),
dtype=np.float32)
dtype=env.observation_space.dtype)
def reset(self):
ob = self.env.reset()
@@ -194,22 +189,34 @@ class FrameStack(gym.Wrapper):
return np.concatenate(self.frames, axis=2)
def wrap_deepmind(env, random_starts=True, dim=80):
class ScaledFloatFrame(gym.ObservationWrapper):
def __init__(self, env):
gym.ObservationWrapper.__init__(self, env)
self.observation_space = gym.spaces.Box(
low=0, high=1, shape=env.observation_space.shape, dtype=np.float32)
def observation(self, observation):
# careful! This undoes the memory optimization, use
# with smaller replay buffers only.
return np.array(observation).astype(np.float32) / 255.0
def wrap_deepmind(env, dim=84):
"""Configure environment for DeepMind-style Atari.
Note that we assume reward clipping is done outside the wrapper.
Args:
random_starts (bool): Start with random actions instead of noops.
dim (int): Dimension to resize observations to (dim x dim).
"""
env = NoopResetEnv(env, noop_max=30, random_starts=random_starts)
env = NoopResetEnv(env, noop_max=30)
if 'NoFrameskip' in env.spec.id:
env = MaxAndSkipEnv(env, skip=4)
env = EpisodicLifeEnv(env)
if 'FIRE' in env.unwrapped.get_action_meanings():
env = FireResetEnv(env)
env = WarpFrame(env, dim)
# env = ClipRewardEnv(env) # reward clipping is handled by DQN replay
# env = ScaledFloatFrame(env) # TODO: use for dqn?
# env = ClipRewardEnv(env) # reward clipping is handled by policy eval
env = FrameStack(env, 4)
return env
+3 -1
View File
@@ -6,6 +6,7 @@ import numpy as np
import collections
import ray
from ray.rllib.evaluation.sample_batch import DEFAULT_POLICY_ID
def collect_metrics(local_evaluator, remote_evaluators=[]):
@@ -24,7 +25,8 @@ def collect_metrics(local_evaluator, remote_evaluators=[]):
episode_lengths.append(episode.episode_length)
episode_rewards.append(episode.episode_reward)
for (_, policy_id), reward in episode.agent_rewards.items():
policy_rewards[policy_id].append(reward)
if policy_id != DEFAULT_POLICY_ID:
policy_rewards[policy_id].append(reward)
if episode_rewards:
min_reward = min(episode_rewards)
max_reward = max(episode_rewards)
@@ -91,11 +91,12 @@ class PolicyEvaluator(EvaluatorInterface):
batch_steps=100,
batch_mode="truncate_episodes",
episode_horizon=None,
preprocessor_pref="rllib",
preprocessor_pref="deepmind",
sample_async=False,
compress_observations=False,
num_envs=1,
observation_filter="NoFilter",
clip_rewards=False,
env_config=None,
model_config=None,
policy_config=None,
@@ -147,6 +148,8 @@ class PolicyEvaluator(EvaluatorInterface):
and vectorize the computation of actions. This has no effect if
if the env already implements VectorEnv.
observation_filter (str): Name of observation filter to use.
clip_rewards (bool): Whether to clip rewards to [-1, 1] prior to
experience postprocessing.
env_config (dict): Config to pass to the env creator.
model_config (dict): Config to use when creating the policy model.
policy_config (dict): Config to pass to the policy. In the
@@ -181,7 +184,7 @@ class PolicyEvaluator(EvaluatorInterface):
preprocessor_pref == "deepmind":
def wrap(env):
return wrap_deepmind(env, dim=model_config.get("dim", 80))
return wrap_deepmind(env, dim=model_config.get("dim", 84))
else:
def wrap(env):
@@ -245,6 +248,7 @@ class PolicyEvaluator(EvaluatorInterface):
self.policy_map,
policy_mapping_fn,
self.filters,
clip_rewards,
batch_steps,
horizon=episode_horizon,
pack=pack_episodes,
@@ -256,6 +260,7 @@ class PolicyEvaluator(EvaluatorInterface):
self.policy_map,
policy_mapping_fn,
self.filters,
clip_rewards,
batch_steps,
horizon=episode_horizon,
pack=pack_episodes,
+6 -1
View File
@@ -61,14 +61,16 @@ class MultiAgentSampleBatchBuilder(object):
corresponding policy batch for the agent's policy.
"""
def __init__(self, policy_map):
def __init__(self, policy_map, clip_rewards):
"""Initialize a MultiAgentSampleBatchBuilder.
Arguments:
policy_map (dict): Maps policy ids to policy graph instances.
clip_rewards (bool): Whether to clip rewards before postprocessing.
"""
self.policy_map = policy_map
self.clip_rewards = clip_rewards
self.policy_builders = {
k: SampleBatchBuilder()
for k in policy_map.keys()
@@ -113,6 +115,9 @@ class MultiAgentSampleBatchBuilder(object):
# Apply postprocessor
post_batches = {}
if self.clip_rewards:
for _, (_, pre_batch) in pre_batches.items():
pre_batch["rewards"] = np.sign(pre_batch["rewards"])
for agent_id, (_, pre_batch) in pre_batches.items():
other_batches = pre_batches.copy()
del other_batches[agent_id]
+8 -3
View File
@@ -34,6 +34,7 @@ class SyncSampler(object):
policies,
policy_mapping_fn,
obs_filters,
clip_rewards,
num_local_steps,
horizon=None,
pack=False,
@@ -48,7 +49,7 @@ class SyncSampler(object):
self.rollout_provider = _env_runner(
self.async_vector_env, self.extra_batches.put, self.policies,
self.policy_mapping_fn, self.num_local_steps, self.horizon,
self._obs_filters, pack, tf_sess)
self._obs_filters, clip_rewards, pack, tf_sess)
self.metrics_queue = queue.Queue()
def get_data(self):
@@ -89,6 +90,7 @@ class AsyncSampler(threading.Thread):
policies,
policy_mapping_fn,
obs_filters,
clip_rewards,
num_local_steps,
horizon=None,
pack=False,
@@ -106,6 +108,7 @@ class AsyncSampler(threading.Thread):
self.policies = policies
self.policy_mapping_fn = policy_mapping_fn
self._obs_filters = obs_filters
self.clip_rewards = clip_rewards
self.daemon = True
self.pack = pack
self.tf_sess = tf_sess
@@ -121,7 +124,7 @@ class AsyncSampler(threading.Thread):
rollout_provider = _env_runner(
self.async_vector_env, self.extra_batches.put, self.policies,
self.policy_mapping_fn, self.num_local_steps, self.horizon,
self._obs_filters, self.pack, self.tf_sess)
self._obs_filters, self.clip_rewards, self.pack, self.tf_sess)
while True:
# The timeout variable exists because apparently, if one worker
# dies, the other workers won't die with it, unless the timeout is
@@ -181,6 +184,7 @@ def _env_runner(async_vector_env,
num_local_steps,
horizon,
obs_filters,
clip_rewards,
pack,
tf_sess=None):
"""This implements the common experience collection logic.
@@ -197,6 +201,7 @@ def _env_runner(async_vector_env,
horizon (int): Horizon of the episode.
obs_filters (dict): Map of policy id to filter used to process
observations for the policy.
clip_rewards (bool): Whether to clip rewards before postprocessing.
pack (bool): Whether to pack multiple episodes into each batch. This
guarantees batches will be exactly `num_local_steps` in size.
tf_sess (Session|None): Optional tensorflow session to use for batching
@@ -223,7 +228,7 @@ def _env_runner(async_vector_env,
if batch_builder_pool:
return batch_builder_pool.pop()
else:
return MultiAgentSampleBatchBuilder(policies)
return MultiAgentSampleBatchBuilder(policies, clip_rewards)
def new_episode():
return MultiAgentEpisode(policies, policy_mapping_fn,
+3 -3
View File
@@ -34,7 +34,7 @@ class AtariPixelPreprocessor(Preprocessor):
def _init(self):
self._grayscale = self._options.get("grayscale", False)
self._zero_mean = self._options.get("zero_mean", True)
self._dim = self._options.get("dim", 80)
self._dim = self._options.get("dim", 84)
self._channel_major = self._options.get("channel_major", False)
if self._grayscale:
self.shape = (self._dim, self._dim, 1)
@@ -48,8 +48,8 @@ class AtariPixelPreprocessor(Preprocessor):
def transform(self, observation):
"""Downsamples images from (210, 160, 3) by the configured factor."""
scaled = observation[25:-25, :, :]
if self._dim < 80:
scaled = cv2.resize(scaled, (80, 80))
if self._dim < 84:
scaled = cv2.resize(scaled, (84, 84))
# OpenAI: Resize by half, then down to 42x42 (essentially mipmapping).
# If we resize directly we lose pixels that, when mapped to 42x42,
# aren't close enough to the pixel boundary.
+1 -1
View File
@@ -21,7 +21,7 @@ class VisionNetwork(Model):
filters = options.get("conv_filters", [
[16, [8, 8], 4],
[32, [4, 4], 2],
[512, [10, 10], 1],
[512, [11, 11], 1],
])
layers = []
in_channels, in_size = inputs[0], inputs[1:]
+6 -6
View File
@@ -47,19 +47,19 @@ class VisionNetwork(Model):
def get_filter_config(options):
filters_80x80 = [
filters_84x84 = [
[16, [8, 8], 4],
[32, [4, 4], 2],
[512, [10, 10], 1],
[256, [11, 11], 1],
]
filters_42x42 = [
[16, [4, 4], 2],
[32, [4, 4], 2],
[512, [11, 11], 1],
[256, [11, 11], 1],
]
dim = options.get("dim", 80)
if dim == 80:
return filters_80x80
dim = options.get("dim", 84)
if dim == 84:
return filters_84x84
elif dim == 42:
return filters_42x42
else:
@@ -36,8 +36,7 @@ class ReplayActor(object):
def __init__(self, num_shards, learning_starts, buffer_size,
train_batch_size, prioritized_replay_alpha,
prioritized_replay_beta, prioritized_replay_eps,
clip_rewards):
prioritized_replay_beta, prioritized_replay_eps):
self.replay_starts = learning_starts // num_shards
self.buffer_size = buffer_size // num_shards
self.train_batch_size = train_batch_size
@@ -45,9 +44,7 @@ class ReplayActor(object):
self.prioritized_replay_eps = prioritized_replay_eps
self.replay_buffer = PrioritizedReplayBuffer(
self.buffer_size,
alpha=prioritized_replay_alpha,
clip_rewards=clip_rewards)
self.buffer_size, alpha=prioritized_replay_alpha)
# Metrics
self.add_batch_timer = TimerStat()
@@ -158,7 +155,6 @@ class AsyncReplayOptimizer(PolicyOptimizer):
sample_batch_size=50,
num_replay_buffer_shards=1,
max_weight_sync_delay=400,
clip_rewards=True,
debug=False):
self.debug = debug
@@ -171,9 +167,13 @@ class AsyncReplayOptimizer(PolicyOptimizer):
self.learner.start()
self.replay_actors = create_colocated(ReplayActor, [
num_replay_buffer_shards, learning_starts, buffer_size,
train_batch_size, prioritized_replay_alpha,
prioritized_replay_beta, prioritized_replay_eps, clip_rewards
num_replay_buffer_shards,
learning_starts,
buffer_size,
train_batch_size,
prioritized_replay_alpha,
prioritized_replay_beta,
prioritized_replay_eps,
], num_replay_buffer_shards)
# Stats
+4 -11
View File
@@ -12,7 +12,7 @@ from ray.rllib.utils.window_stat import WindowStat
class ReplayBuffer(object):
def __init__(self, size, clip_rewards):
def __init__(self, size):
"""Create Prioritized Replay buffer.
Parameters
@@ -30,15 +30,11 @@ class ReplayBuffer(object):
self._num_sampled = 0
self._evicted_hit_stats = WindowStat("evicted_hit", 1000)
self._est_size_bytes = 0
self._clip_rewards = clip_rewards
def __len__(self):
return len(self._storage)
def add(self, obs_t, action, reward, obs_tp1, done, weight):
if self._clip_rewards:
reward = np.sign(reward)
data = (obs_t, action, reward, obs_tp1, done)
self._num_added += 1
@@ -109,7 +105,7 @@ class ReplayBuffer(object):
class PrioritizedReplayBuffer(ReplayBuffer):
def __init__(self, size, alpha, clip_rewards):
def __init__(self, size, alpha):
"""Create Prioritized Replay buffer.
Parameters
@@ -125,7 +121,7 @@ class PrioritizedReplayBuffer(ReplayBuffer):
--------
ReplayBuffer.__init__
"""
super(PrioritizedReplayBuffer, self).__init__(size, clip_rewards)
super(PrioritizedReplayBuffer, self).__init__(size)
assert alpha > 0
self._alpha = alpha
@@ -140,8 +136,6 @@ class PrioritizedReplayBuffer(ReplayBuffer):
def add(self, obs_t, action, reward, obs_tp1, done, weight):
"""See ReplayBuffer.store_effect"""
if self._clip_rewards:
reward = np.sign(reward)
idx = self._next_idx
super(PrioritizedReplayBuffer, self).add(obs_t, action, reward,
@@ -155,8 +149,7 @@ class PrioritizedReplayBuffer(ReplayBuffer):
res = []
for _ in range(batch_size):
# TODO(szymon): should we ensure no repeats?
mass = random.random() * self._it_sum.sum(0,
len(self._storage) - 1)
mass = random.random() * self._it_sum.sum(0, len(self._storage))
idx = self._it_sum.find_prefixsum_idx(mass)
res.append(idx)
return res
@@ -31,8 +31,7 @@ class SyncReplayOptimizer(PolicyOptimizer):
prioritized_replay_beta=0.4,
prioritized_replay_eps=1e-6,
train_batch_size=32,
sample_batch_size=4,
clip_rewards=True):
sample_batch_size=4):
self.replay_starts = learning_starts
self.prioritized_replay_beta = prioritized_replay_beta
@@ -51,13 +50,11 @@ class SyncReplayOptimizer(PolicyOptimizer):
def new_buffer():
return PrioritizedReplayBuffer(
buffer_size,
alpha=prioritized_replay_alpha,
clip_rewards=clip_rewards)
buffer_size, alpha=prioritized_replay_alpha)
else:
def new_buffer():
return ReplayBuffer(buffer_size, clip_rewards)
return ReplayBuffer(buffer_size)
self.replay_buffers = collections.defaultdict(new_buffer)
@@ -43,6 +43,7 @@ class SyncSamplesOptimizer(PolicyOptimizer):
else:
samples.append(self.local_evaluator.sample())
samples = SampleBatch.concat_samples(samples)
self.sample_timer.push_units_processed(samples.count)
with self.grad_timer:
for i in range(self.num_sgd_iter):
@@ -64,5 +65,7 @@ class SyncSamplesOptimizer(PolicyOptimizer):
3),
"opt_peak_throughput": round(self.grad_timer.mean_throughput,
3),
"sample_peak_throughput": round(
self.sample_timer.mean_throughput, 3),
"opt_samples": round(self.grad_timer.mean_units_processed, 3),
})
+1 -1
View File
@@ -74,7 +74,7 @@ class ModelCatalogTest(unittest.TestCase):
with tf.variable_scope("test2"):
p2 = ModelCatalog.get_model(
np.zeros((10, 80, 80, 3), dtype=np.float32), 5)
np.zeros((10, 84, 84, 3), dtype=np.float32), 5)
self.assertEqual(type(p2), VisionNetwork)
def testCustomModel(self):
@@ -120,6 +120,27 @@ class TestPolicyEvaluator(unittest.TestCase):
self.assertEqual(results, [5, 5, 5])
self.assertEqual(results2, [(0, 5), (1, 5), (2, 5)])
def testRewardClipping(self):
# clipping on
ev = PolicyEvaluator(
env_creator=lambda _: MockEnv2(episode_length=10),
policy_graph=MockPolicyGraph,
clip_rewards=True,
batch_mode="complete_episodes")
self.assertEqual(max(ev.sample()["rewards"]), 1)
result = collect_metrics(ev, [])
self.assertEqual(result["episode_reward_mean"], 1000)
# clipping off
ev2 = PolicyEvaluator(
env_creator=lambda _: MockEnv2(episode_length=10),
policy_graph=MockPolicyGraph,
clip_rewards=False,
batch_mode="complete_episodes")
self.assertEqual(max(ev2.sample()["rewards"]), 100)
result2 = collect_metrics(ev2, [])
self.assertEqual(result2["episode_reward_mean"], 1000)
def testMetrics(self):
ev = PolicyEvaluator(
env_creator=lambda _: MockEnv(episode_length=10),
@@ -32,7 +32,7 @@ ACTION_SPACES_TO_TEST = {
OBSERVATION_SPACES_TO_TEST = {
"discrete": Discrete(5),
"vector": Box(0.0, 1.0, (5, ), dtype=np.float32),
"image": Box(0.0, 1.0, (80, 80, 1), dtype=np.float32),
"image": Box(0.0, 1.0, (84, 84, 1), dtype=np.float32),
"atari": Box(0.0, 1.0, (210, 160, 3), dtype=np.float32),
"atari_ram": Box(0.0, 1.0, (128, ), dtype=np.float32),
"simple_tuple": Tuple([
+5 -4
View File
@@ -9,7 +9,8 @@ import os
import yaml
from ray.tune.log_sync import get_syncer
from ray.tune.result import NODE_IP, TRAINING_ITERATION, TIME_TOTAL_S
from ray.tune.result import NODE_IP, TRAINING_ITERATION, TIME_TOTAL_S, \
TIMESTEPS_TOTAL
try:
import tensorflow as tf
@@ -132,13 +133,13 @@ class _TFLogger(Logger):
del tmp[k] # not useful to tf log these
values = to_tf_values(tmp, ["ray", "tune"])
train_stats = tf.Summary(value=values)
self._file_writer.add_summary(train_stats, result[TRAINING_ITERATION])
t = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION]
self._file_writer.add_summary(train_stats, t)
iteration_value = to_tf_values({
"training_iteration": result[TRAINING_ITERATION]
}, ["ray", "tune"])
iteration_stats = tf.Summary(value=iteration_value)
self._file_writer.add_summary(iteration_stats,
result[TRAINING_ITERATION])
self._file_writer.add_summary(iteration_stats, t)
def flush(self):
self._file_writer.flush()
+3
View File
@@ -270,6 +270,9 @@ class Trial(object):
int(self.last_result.get(TIME_TOTAL_S)))
]
if self.last_result.get("timesteps_total") is not None:
pieces.append('{} ts'.format(self.last_result["timesteps_total"]))
if self.last_result.get("episode_reward_mean") is not None:
pieces.append('{} rew'.format(
format(self.last_result["episode_reward_mean"], '.3g')))