[RLlib] DQN torch version. (#7597)

* Fix.

* Rollback.

* WIP.

* WIP.

* WIP.

* WIP.

* WIP.

* WIP.

* WIP.

* WIP.

* Fix.

* Fix.

* Fix.

* Fix.

* Fix.

* WIP.

* WIP.

* Fix.

* Test case fixes.

* Test case fixes and LINT.

* Test case fixes and LINT.

* Rollback.

* WIP.

* WIP.

* Test case fixes.

* Fix.

* Fix.

* Fix.

* Add regression test for DQN w/ param noise.

* Fixes and LINT.

* Fixes and LINT.

* Fixes and LINT.

* Fixes and LINT.

* Fixes and LINT.

* Comment

* Regression test case.

* WIP.

* WIP.

* LINT.

* LINT.

* WIP.

* Fix.

* Fix.

* Fix.

* LINT.

* Fix (SAC does not currently support eager).

* Fix.

* WIP.

* LINT.

* Update rllib/evaluation/sampler.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* Update rllib/evaluation/sampler.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* Update rllib/utils/exploration/exploration.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* Update rllib/utils/exploration/exploration.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* WIP.

* WIP.

* Fix.

* LINT.

* LINT.

* Fix and LINT.

* WIP.

* WIP.

* WIP.

* WIP.

* Fix.

* LINT.

* Fix.

* Fix and LINT.

* Update rllib/utils/exploration/exploration.py

* Update rllib/policy/dynamic_tf_policy.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* Update rllib/policy/dynamic_tf_policy.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* Update rllib/policy/dynamic_tf_policy.py

Co-Authored-By: Eric Liang <ekhliang@gmail.com>

* Fixes.

* WIP.

* LINT.

* Fixes and LINT.

* LINT and fixes.

* LINT.

* Move action_dist back into torch extra_action_out_fn and LINT.

* Working SimpleQ learning cartpole on both torch AND tf.

* Working Rainbow learning cartpole on tf.

* Working Rainbow learning cartpole on tf.

* WIP.

* LINT.

* LINT.

* Update docs and add torch to APEX test.

* LINT.

* Fix.

* LINT.

* Fix.

* Fix.

* Fix and docstrings.

* Fix broken RLlib tests in master.

* Split BAZEL learning tests into cartpole and pendulum (reached the 60min barrier).

* Fix error_outputs option in BAZEL for RLlib regression tests.

* Fix.

* Tune param-noise tests.

* LINT.

* Fix.

* Fix.

* test

* test

* test

* Fix.

* Fix.

* WIP.

* WIP.

* WIP.

* WIP.

* LINT.

* WIP.

Co-authored-by: Eric Liang <ekhliang@gmail.com>
Sven Mika
2020-04-06 20:56:16 +02:00
committed by GitHub
parent f63b4c1110
commit 22ccc43670
56 changed files with 1292 additions and 277 deletions
+2 -2
@@ -192,7 +192,7 @@ matrix:
- ./ci/suppress_output ./ci/travis/install-ray.sh
script:
- if [ $RAY_CI_RLLIB_AFFECTED != "1" ]; then exit; fi
- travis_wait 60 bazel test --build_tests_only --test_tag_filters=learning_tests --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors rllib/...
- travis_wait 90 bazel test --build_tests_only --test_tag_filters=learning_tests --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=streamed rllib/...
# RLlib: Learning tests with tf=1.x (from rllib/tuned_examples/regression_tests/*.yaml).
# Requested by Edi (MS): Test all learning capabilities with tf1.x
@@ -213,7 +213,7 @@ matrix:
- ./ci/suppress_output ./ci/travis/install-ray.sh
script:
- if [ $RAY_CI_RLLIB_FULL_AFFECTED != "1" ]; then exit; fi
- travis_wait 60 bazel test --build_tests_only --test_tag_filters=learning_tests --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors rllib/...
- travis_wait 90 bazel test --build_tests_only --test_tag_filters=learning_tests --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=streamed rllib/...
# RLlib: Quick Agent train.py runs (compilation & running, no(!) learning).
# Agent single tests (compilation, loss-funcs, etc..).
+2 -2
@@ -37,7 +37,7 @@ High-throughput architectures
Distributed Prioritized Experience Replay (Ape-X)
-------------------------------------------------
|tensorflow|
|pytorch| |tensorflow|
`[paper] <https://arxiv.org/abs/1803.00933>`__
`[implementation] <https://github.com/ray-project/ray/blob/master/rllib/agents/dqn/apex.py>`__
Ape-X variations of DQN, DDPG, and QMIX (`APEX_DQN <https://github.com/ray-project/ray/blob/master/rllib/agents/dqn/apex.py>`__, `APEX_DDPG <https://github.com/ray-project/ray/blob/master/rllib/agents/ddpg/apex.py>`__, `APEX_QMIX <https://github.com/ray-project/ray/blob/master/rllib/agents/qmix/apex.py>`__) use a single GPU learner and many CPU workers for experience collection. Experience collection can scale to hundreds of CPU workers due to the distributed prioritization of experience prior to storage in replay buffers.
@@ -247,7 +247,7 @@ Tuned examples: `Pendulum-v0 <https://github.com/ray-project/ray/blob/master/rll
Deep Q Networks (DQN, Rainbow, Parametric DQN)
----------------------------------------------
|tensorflow|
|pytorch| |tensorflow|
`[paper] <https://arxiv.org/abs/1312.5602>`__ `[implementation] <https://github.com/ray-project/ray/blob/master/rllib/agents/dqn/dqn.py>`__
RLlib DQN is implemented using the SyncReplayOptimizer. The algorithm can be scaled by increasing the number of workers, using the AsyncGradientsOptimizer for async DQN, or using Ape-X. Memory usage is reduced by compressing samples in the replay buffer with LZ4. All of the DQN improvements evaluated in `Rainbow <https://arxiv.org/abs/1710.02298>`__ are available, though not all are enabled by default. See also how to use `parametric-actions in DQN <rllib-models.html#variable-length-parametric-action-spaces>`__.
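As a rough illustration of the paragraph above, the Rainbow components are switched on and off through the trainer config. The keys below all appear in the DQN code touched by this commit; the values are illustrative assumptions rather than tuned settings, and `check_torch_supported` is a hypothetical helper that only mirrors the guard added in the torch policy (no distributional Q with torch yet).

```python
# Illustrative DQN config overrides. Keys are taken from the config used in
# this commit; the values are assumptions chosen for the example.
rainbow_like_config = {
    "use_pytorch": True,  # select the torch policy instead of the tf one
    "dueling": True,      # advantage/value heads
    "double_q": True,     # double Q-learning targets
    "n_step": 3,          # n-step returns
    "noisy": False,       # noisy nets
    "num_atoms": 1,       # >1 would enable distributional DQN (tf only)
    "hiddens": [256],     # layers after the advantage/value split
    "grad_clip": 40,      # replaces the deprecated "grad_norm_clipping"
}


def check_torch_supported(config):
    """Hypothetical helper mirroring the torch policy's num_atoms guard."""
    if config.get("use_pytorch") and config.get("num_atoms", 1) > 1:
        raise ValueError(
            "Torch version of DQN does not support distributional Q yet!")
    return True


check_torch_supported(rainbow_like_config)
```

With `use_pytorch` set to False, the same dict could also raise `num_atoms` above 1, since the restriction in this commit only concerns the torch policy.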
+9 -3
@@ -46,7 +46,7 @@ py_test(
tags = ["learning_tests", "learning_tests_cartpole"],
size = "enormous", # = 60min timeout
srcs = ["tests/run_regression_tests.py"],
data = glob(["tuned_examples/regression_tests/cartpole*.yaml"]),
data = glob(["tuned_examples/regression_tests/cartpole-*.yaml"]),
# Pass `BAZEL` option and the path to look for yaml regression files.
args = ["BAZEL", "tuned_examples/regression_tests"]
)
@@ -57,7 +57,7 @@ py_test(
tags = ["learning_tests", "learning_tests_pendulum"],
size = "enormous", # = 60min timeout
srcs = ["tests/run_regression_tests.py"],
data = glob(["tuned_examples/regression_tests/pendulum*.yaml"]),
data = glob(["tuned_examples/regression_tests/pendulum-*.yaml"]),
# Pass `BAZEL` option and the path to look for yaml regression files.
args = ["BAZEL", "tuned_examples/regression_tests"]
)
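The two glob changes above narrow which regression YAML files each learning test picks up. The sketch below uses Python's `fnmatch` as a stand-in for Bazel's `glob` (an approximation: the two agree for a single `*` within one path segment) and a list of hypothetical file names, only to show what the old and new patterns accept.

```python
from fnmatch import fnmatch

# Hypothetical file names, used only to show what each pattern accepts.
candidates = [
    "cartpole-dqn.yaml",
    "cartpole-simpleq-torch.yaml",
    "cartpole_extra.yaml",
    "cartpole.yaml",
]

# Old pattern: anything starting with "cartpole".
old_matches = [f for f in candidates if fnmatch(f, "cartpole*.yaml")]
# New pattern: requires the "cartpole-" prefix including the dash.
new_matches = [f for f in candidates if fnmatch(f, "cartpole-*.yaml")]

print(old_matches)
print(new_matches)
```

Under these assumptions the old pattern accepts all four names, while the new one accepts only the two with the `cartpole-` prefix.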
@@ -85,13 +85,19 @@ py_test(
srcs = ["agents/ddpg/tests/test_ddpg.py"]
)
# DQNTrainer
# DQNTrainer/SimpleQTrainer
py_test(
name = "test_dqn",
tags = ["agents_dir"],
size = "medium",
srcs = ["agents/dqn/tests/test_dqn.py"]
)
py_test(
name = "test_simple_q",
tags = ["agents_dir"],
size = "medium",
srcs = ["agents/dqn/tests/test_simple_q.py"]
)
# APEXTrainer
py_test(
+1
@@ -193,5 +193,6 @@ DDPGTrainer = GenericOffPolicyTrainer.with_updates(
name="DDPG",
default_config=DEFAULT_CONFIG,
default_policy=DDPGTFPolicy,
get_policy_class=None,
validate_config=validate_config,
)
+1 -1
@@ -3,7 +3,7 @@ import numpy as np
import ray
import ray.experimental.tf_utils
from ray.rllib.agents.dqn.dqn_policy import postprocess_nstep_and_prio
from ray.rllib.agents.dqn.dqn_tf_policy import postprocess_nstep_and_prio
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY
from ray.rllib.models import ModelCatalog
+4 -1
@@ -58,4 +58,7 @@ TD3_DEFAULT_CONFIG = DDPGTrainer.merge_trainer_configs(
})
TD3Trainer = DDPGTrainer.with_updates(
name="TD3", default_config=TD3_DEFAULT_CONFIG)
name="TD3",
default_config=TD3_DEFAULT_CONFIG,
get_policy_class=None,
)
+12 -7
@@ -1,11 +1,16 @@
from ray.rllib.agents.dqn.apex import ApexTrainer
from ray.rllib.agents.dqn.dqn import DQNTrainer, SimpleQTrainer, DEFAULT_CONFIG
from ray.rllib.utils import renamed_agent
DQNAgent = renamed_agent(DQNTrainer)
ApexAgent = renamed_agent(ApexTrainer)
from ray.rllib.agents.dqn.dqn import DQNTrainer, DEFAULT_CONFIG
from ray.rllib.agents.dqn.simple_q import SimpleQTrainer, \
DEFAULT_CONFIG as SIMPLE_Q_DEFAULT_CONFIG
from ray.rllib.agents.dqn.simple_q_tf_policy import SimpleQTFPolicy
from ray.rllib.agents.dqn.simple_q_torch_policy import SimpleQTorchPolicy
__all__ = [
"DQNAgent", "ApexAgent", "ApexTrainer", "DQNTrainer", "DEFAULT_CONFIG",
"SimpleQTrainer"
"ApexTrainer",
"DQNTrainer",
"DEFAULT_CONFIG",
"SIMPLE_Q_DEFAULT_CONFIG",
"SimpleQTFPolicy",
"SimpleQTorchPolicy",
"SimpleQTrainer",
]
@@ -6,7 +6,7 @@ from ray.rllib.utils import try_import_tf
tf = try_import_tf()
class DistributionalQModel(TFModelV2):
class DistributionalQTFModel(TFModelV2):
"""Extension of standard TFModel to provide distributional Q values.
It also supports options for noisy nets and parameter space noise.
@@ -41,10 +41,15 @@ class DistributionalQModel(TFModelV2):
"""Initialize variables of this model.
Extra model kwargs:
q_hiddens (list): defines size of hidden layers for the q head.
These will be used to postprocess the model output for the
purposes of computing Q values.
dueling (bool): whether to build the state value head for DDQN
q_hiddens (List[int]): List of layer-sizes after(!) the
Advantages(A)/Value(V)-split. Hence, each of the A- and V-
branches will have this structure of Dense layers. To define
the NN before this A/V-split, use - as always -
config["model"]["fcnet_hiddens"].
dueling (bool): Whether to build the advantage(A)/value(V) heads
for DDQN. If True, Q-values are calculated as:
Q = (A - mean[A]) + V. If False, raw NN output is interpreted
as Q-values.
num_atoms (int): if >1, enables distributional DQN
use_noisy (bool): use noisy nets
v_min (float): min value support for distributional DQN
@@ -57,7 +62,7 @@ class DistributionalQModel(TFModelV2):
should be defined in subclasses of DistributionalQModel.
"""
super(DistributionalQModel, self).__init__(
super(DistributionalQTFModel, self).__init__(
obs_space, action_space, num_outputs, model_config, name)
# setup the Q head output (i.e., model for get_q_values)
@@ -87,6 +92,7 @@ class DistributionalQModel(TFModelV2):
# Avoid postprocessing the outputs. This enables custom models
# to be used for parametric action DQN.
action_out = model_out
if use_noisy:
action_scores = self._noisy_layer(
"output",
@@ -131,14 +137,12 @@ class DistributionalQModel(TFModelV2):
state_out = self._noisy_layer("dueling_hidden_%d" % i,
state_out, q_hiddens[i],
sigma0)
elif add_layer_norm:
state_out = tf.keras.layers.Dense(
units=q_hiddens[i], activation=tf.nn.relu)(state_out)
state_out = \
tf.keras.layers.LayerNormalization()(state_out)
else:
state_out = tf.keras.layers.Dense(
units=q_hiddens[i], activation=tf.nn.relu)(state_out)
if add_layer_norm:
state_out = tf.keras.layers.LayerNormalization()(
state_out)
if use_noisy:
state_score = self._noisy_layer(
"dueling_output",
+47 -20
@@ -2,8 +2,8 @@ import logging
from ray.rllib.agents.trainer import with_common_config
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy
from ray.rllib.agents.dqn.simple_q_policy import SimpleQPolicy
from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.agents.dqn.simple_q_tf_policy import SimpleQTFPolicy
from ray.rllib.optimizers import SyncReplayOptimizer
from ray.rllib.optimizers.replay_buffer import ReplayBuffer
from ray.rllib.utils.deprecation import deprecation_warning, DEPRECATED_VALUE
@@ -30,11 +30,11 @@ DEFAULT_CONFIG = with_common_config({
"sigma0": 0.5,
# Whether to use dueling dqn
"dueling": True,
# Dense-layer setup for each the advantage branch and the value branch
# in a dueling architecture.
"hiddens": [256],
# Whether to use double dqn
"double_q": True,
# Postprocess model outputs with these hidden layers to compute the
# state and action values. See also the model config in catalog.py.
"hiddens": [256],
# N-step Q learning
"n_step": 1,
@@ -90,13 +90,13 @@ DEFAULT_CONFIG = with_common_config({
# Adam epsilon hyper parameter
"adam_epsilon": 1e-8,
# If not None, clip gradients during optimization at this value
"grad_norm_clipping": 40,
"grad_clip": 40,
# How many steps of the model to sample before learning starts.
"learning_starts": 1000,
# Update the replay buffer with this many samples at once. Note that
# this setting applies per-worker if num_workers > 1.
"rollout_fragment_length": 4,
# Size of a batched sampled from replay buffer for training. Note that
# Size of a batch sampled from replay buffer for training. Note that
# if async_updates is set, then each worker returns gradients for a
# batch of this size.
"train_batch_size": 32,
@@ -122,6 +122,7 @@ DEFAULT_CONFIG = with_common_config({
"softmax_temp": DEPRECATED_VALUE,
"soft_q": DEPRECATED_VALUE,
"parameter_noise": DEPRECATED_VALUE,
"grad_norm_clipping": DEPRECATED_VALUE,
})
# __sphinx_doc_end__
# yapf: enable
@@ -133,20 +134,27 @@ def make_policy_optimizer(workers, config):
Returns:
SyncReplayOptimizer: Used for generic off-policy Trainers.
"""
# SimpleQ does not use a PR buffer.
kwargs = {"prioritized_replay": config.get("prioritized_replay", False)}
kwargs.update(**config["optimizer"])
if "prioritized_replay" in config:
kwargs.update({
"prioritized_replay_alpha": config["prioritized_replay_alpha"],
"prioritized_replay_beta": config["prioritized_replay_beta"],
"prioritized_replay_beta_annealing_timesteps": config[
"prioritized_replay_beta_annealing_timesteps"],
"final_prioritized_replay_beta": config[
"final_prioritized_replay_beta"],
"prioritized_replay_eps": config["prioritized_replay_eps"],
})
return SyncReplayOptimizer(
workers,
# TODO(sven): Move all PR-beta decays into Schedule components.
learning_starts=config["learning_starts"],
buffer_size=config["buffer_size"],
prioritized_replay=config["prioritized_replay"],
prioritized_replay_alpha=config["prioritized_replay_alpha"],
prioritized_replay_beta=config["prioritized_replay_beta"],
prioritized_replay_beta_annealing_timesteps=config[
"prioritized_replay_beta_annealing_timesteps"],
final_prioritized_replay_beta=config["final_prioritized_replay_beta"],
prioritized_replay_eps=config["prioritized_replay_eps"],
train_batch_size=config["train_batch_size"],
**config["optimizer"])
**kwargs)
def validate_config_and_setup_param_noise(config):
@@ -154,14 +162,14 @@ def validate_config_and_setup_param_noise(config):
Rewrites rollout_fragment_length to take into account n_step truncation.
"""
# PyTorch check.
if config["use_pytorch"]:
raise ValueError("DQN does not support PyTorch yet! Use tf instead.")
# TODO(sven): Remove at some point.
# Backward compatibility of epsilon-exploration config AND beta-annealing
# fraction settings (both based on schedule_max_timesteps, which is
# deprecated).
if config.get("grad_norm_clipping", DEPRECATED_VALUE) != DEPRECATED_VALUE:
deprecation_warning("grad_norm_clipping", "grad_clip")
config["grad_clip"] = config.pop("grad_norm_clipping")
schedule_max_timesteps = None
if config.get("schedule_max_timesteps", DEPRECATED_VALUE) != \
DEPRECATED_VALUE:
@@ -309,9 +317,27 @@ def execution_plan(workers, config):
return StandardMetricsReporting(train_op, workers, config)
def get_policy_class(config):
if config["use_pytorch"]:
from ray.rllib.agents.dqn.dqn_torch_policy import DQNTorchPolicy
return DQNTorchPolicy
else:
return DQNTFPolicy
def get_simple_policy_class(config):
if config["use_pytorch"]:
from ray.rllib.agents.dqn.simple_q_torch_policy import \
SimpleQTorchPolicy
return SimpleQTorchPolicy
else:
return SimpleQTFPolicy
GenericOffPolicyTrainer = build_trainer(
name="GenericOffPolicyAlgorithm",
default_policy=None,
get_policy_class=get_policy_class,
default_config=DEFAULT_CONFIG,
validate_config=validate_config_and_setup_param_noise,
get_initial_state=get_initial_state,
@@ -324,4 +350,5 @@ GenericOffPolicyTrainer = build_trainer(
DQNTrainer = GenericOffPolicyTrainer.with_updates(
name="DQN", default_policy=DQNTFPolicy, default_config=DEFAULT_CONFIG)
SimpleQTrainer = DQNTrainer.with_updates(default_policy=SimpleQPolicy)
SimpleQTrainer = DQNTrainer.with_updates(
default_policy=SimpleQTFPolicy, get_policy_class=get_simple_policy_class)
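The `grad_norm_clipping` to `grad_clip` rename above follows a common backward-compatibility pattern: the old key defaults to a sentinel, and a user-supplied value is moved to the new key with a warning. A minimal standalone sketch, assuming a hypothetical sentinel value and a hypothetical helper name `migrate_deprecated_key` (RLlib's own `DEPRECATED_VALUE` and `deprecation_warning` are not reproduced here):

```python
import warnings

# Hypothetical sentinel; stands in for RLlib's DEPRECATED_VALUE.
DEPRECATED_VALUE = -1


def migrate_deprecated_key(config, old, new):
    """Move a user-set value from the deprecated key `old` to `new`."""
    if config.get(old, DEPRECATED_VALUE) != DEPRECATED_VALUE:
        warnings.warn(
            "`{}` is deprecated, use `{}` instead.".format(old, new),
            DeprecationWarning)
        # The value supplied under the old key wins over the new default.
        config[new] = config.pop(old)
    return config


user_config = {"grad_norm_clipping": 10, "grad_clip": 40}
migrate_deprecated_key(user_config, "grad_norm_clipping", "grad_clip")
print(user_config)
```

If the old key still holds the sentinel (the user never set it), the config is left untouched and the new key keeps its default.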
@@ -2,18 +2,19 @@ from gym.spaces import Discrete
import numpy as np
import ray
from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel
from ray.rllib.agents.dqn.simple_q_policy import TargetNetworkMixin
from ray.rllib.agents.dqn.distributional_q_tf_model import \
DistributionalQTFModel
from ray.rllib.agents.dqn.simple_q_tf_policy import TargetNetworkMixin
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.tf_action_dist import Categorical
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.tf_policy import LearningRateSchedule
from ray.rllib.policy.tf_policy_template import build_tf_policy
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.tf_action_dist import Categorical
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.utils.exploration import ParameterNoise
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.tf_ops import huber_loss, reduce_mean_ignore_inf, \
minimize_and_clip
from ray.rllib.utils import try_import_tf
from ray.rllib.utils.tf_ops import make_tf_callable
tf = try_import_tf()
@@ -138,16 +139,16 @@ def build_q_model(policy, obs_space, action_space, config):
num_outputs = action_space.n
policy.q_model = ModelCatalog.get_model_v2(
obs_space,
action_space,
num_outputs,
config["model"],
obs_space=obs_space,
action_space=action_space,
num_outputs=num_outputs,
model_config=config["model"],
framework="tf",
model_interface=DistributionalQModel,
model_interface=DistributionalQTFModel,
name=Q_SCOPE,
num_atoms=config["num_atoms"],
q_hiddens=config["hiddens"],
dueling=config["dueling"],
q_hiddens=config["hiddens"],
use_noisy=config["noisy"],
v_min=config["v_min"],
v_max=config["v_max"],
@@ -159,16 +160,16 @@ def build_q_model(policy, obs_space, action_space, config):
or config["exploration_config"]["type"] == "ParameterNoise")
policy.target_q_model = ModelCatalog.get_model_v2(
obs_space,
action_space,
num_outputs,
config["model"],
obs_space=obs_space,
action_space=action_space,
num_outputs=num_outputs,
model_config=config["model"],
framework="tf",
model_interface=DistributionalQModel,
model_interface=DistributionalQTFModel,
name=Q_TARGET_SCOPE,
num_atoms=config["num_atoms"],
q_hiddens=config["hiddens"],
dueling=config["dueling"],
q_hiddens=config["hiddens"],
use_noisy=config["noisy"],
v_min=config["v_min"],
v_max=config["v_max"],
@@ -257,12 +258,12 @@ def adam_optimizer(policy, config):
def clip_gradients(policy, optimizer, loss):
if policy.config["grad_norm_clipping"] is not None:
if policy.config["grad_clip"] is not None:
grads_and_vars = minimize_and_clip(
optimizer,
loss,
var_list=policy.q_func_vars,
clip_val=policy.config["grad_norm_clipping"])
clip_val=policy.config["grad_clip"])
else:
grads_and_vars = optimizer.compute_gradients(
loss, var_list=policy.q_func_vars)
+165
@@ -0,0 +1,165 @@
import numpy as np
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.utils import try_import_torch
torch, nn = try_import_torch()
class DQNTorchModel(TorchModelV2):
"""Extension of standard TorchModelV2 to provide dueling-Q functionality.
"""
def __init__(
self,
obs_space,
action_space,
num_outputs,
model_config,
name,
*,
dueling=False,
q_hiddens=(256, ),
dueling_activation="relu",
use_noisy=False,
sigma0=0.5,
# TODO(sven): Move `add_layer_norm` into ModelCatalog as
# generic option, then error if we use ParameterNoise as
# Exploration type and do not have any LayerNorm layers in
# the net.
add_layer_norm=False):
"""Initialize variables of this model.
Extra model kwargs:
dueling (bool): Whether to build the advantage(A)/value(V) heads
for DDQN. If True, Q-values are calculated as:
Q = (A - mean[A]) + V. If False, raw NN output is interpreted
as Q-values.
q_hiddens (List[int]): List of layer-sizes after(!) the
Advantages(A)/Value(V)-split. Hence, each of the A- and V-
branches will have this structure of Dense layers. To define
the NN before this A/V-split, use - as always -
config["model"]["fcnet_hiddens"].
dueling_activation (str): The activation to use for all dueling
layers (A- and V-branch). One of "relu", "tanh", "linear".
use_noisy (bool): use noisy nets
sigma0 (float): initial value of noisy nets
add_layer_norm (bool): Enable layer norm (for param noise).
"""
super(DQNTorchModel, self).__init__(obs_space, action_space,
num_outputs, model_config, name)
self.dueling = dueling
ins = num_outputs
# Dueling case: Build the shared (advantages and value) fc-network.
advantage_module = nn.Sequential()
value_module = None
if self.dueling:
value_module = nn.Sequential()
for i, n in enumerate(q_hiddens):
advantage_module.add_module("dueling_A_{}".format(i),
nn.Linear(ins, n))
value_module.add_module("dueling_V_{}".format(i),
nn.Linear(ins, n))
# Add activations if necessary.
if dueling_activation == "relu":
advantage_module.add_module("dueling_A_act_{}".format(i),
nn.ReLU())
value_module.add_module("dueling_V_act_{}".format(i),
nn.ReLU())
elif dueling_activation == "tanh":
advantage_module.add_module("dueling_A_act_{}".format(i),
nn.Tanh())
value_module.add_module("dueling_V_act_{}".format(i),
nn.Tanh())
# Add LayerNorm after each Dense.
if add_layer_norm:
advantage_module.add_module("LayerNorm_A_{}".format(i),
nn.LayerNorm(n))
value_module.add_module("LayerNorm_V_{}".format(i),
nn.LayerNorm(n))
ins = n
# Actual Advantages layer (nodes=num-actions) and
# value layer (nodes=1).
advantage_module.add_module("A", nn.Linear(ins, action_space.n))
value_module.add_module("V", nn.Linear(ins, 1))
# Non-dueling:
# Q-value layer (use Advantage module's outputs as Q-values).
else:
advantage_module.add_module("Q", nn.Linear(ins, action_space.n))
self.advantage_module = advantage_module
self.value_module = value_module
def get_advantages_or_q_values(self, model_out):
"""Returns distributional values for Q(s, a) given a state embedding.
Override this in your custom model to customize the Q output head.
Arguments:
model_out (Tensor): embedding from the model layers
Returns:
Tensor: Advantages if `dueling` is True, otherwise Q-values
(one entry per action).
"""
return self.advantage_module(model_out)
def get_state_value(self, model_out):
"""Returns the state value prediction for the given state embedding."""
return self.value_module(model_out)
def _noisy_layer(self, action_in, out_size, sigma0, non_linear=True):
"""
a common dense layer: y = w^{T}x + b
a noisy layer: y = (w + \\epsilon_w*\\sigma_w)^{T}x +
(b+\\epsilon_b*\\sigma_b)
where \\epsilon are random variables sampled from factorized normal
distributions and \\sigma are trainable variables which are expected to
vanish along the training procedure
"""
in_size = int(action_in.shape[1])
# torch.normal() requires mean/std arguments; torch.randn() draws
# standard-normal noise directly.
epsilon_in = torch.randn(in_size)
epsilon_out = torch.randn(out_size)
epsilon_in = self._f_epsilon(epsilon_in)
epsilon_out = self._f_epsilon(epsilon_out)
epsilon_w = torch.matmul(
torch.unsqueeze(epsilon_in, -1),
other=torch.unsqueeze(epsilon_out, 0))
epsilon_b = epsilon_out
# torch.tensor() (not torch.Tensor()) accepts dtype/requires_grad.
sigma_w = torch.tensor(
np.random.uniform(
low=-1.0 / np.sqrt(float(in_size)),
high=1.0 / np.sqrt(float(in_size)),
size=[in_size, out_size]),
dtype=torch.float32,
requires_grad=True)
# TF noise generation can be unreliable on GPU
# If generating the noise on the CPU,
# lowering sigma0 to 0.1 may be helpful
sigma_b = torch.tensor(
np.full(
shape=[out_size], fill_value=sigma0 / np.sqrt(float(in_size))),
dtype=torch.float32,
requires_grad=True)
w = torch.tensor(
np.full(
shape=[in_size, out_size],
fill_value=6 / np.sqrt(float(in_size) + float(out_size))),
dtype=torch.float32,
requires_grad=True)
b = torch.tensor(
np.zeros([out_size]), dtype=torch.float32, requires_grad=True)
action_activation = torch.matmul(action_in, w + sigma_w * epsilon_w) \
+ b + sigma_b * epsilon_b
if not non_linear:
return action_activation
return nn.functional.relu(action_activation)
def _f_epsilon(self, x):
return torch.sign(x) * torch.pow(torch.abs(x), 0.5)
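The noisy-layer formula in the docstring above, y = (w + eps_w * sigma_w)^T x + (b + eps_b * sigma_b), can be checked in isolation. The following is a NumPy sketch of factorized noise under stated assumptions: the function names and the explicit `rng` argument are hypothetical, and the sigma parameters are passed in rather than learned.

```python
import numpy as np


def f_epsilon(x):
    """Noise-shaping function f(x) = sign(x) * sqrt(|x|)."""
    return np.sign(x) * np.sqrt(np.abs(x))


def noisy_linear(x, w, b, sigma_w, sigma_b, rng):
    """Linear layer with factorized Gaussian noise on weights and bias."""
    in_size, out_size = w.shape
    # One noise vector per input unit and one per output unit.
    eps_in = f_epsilon(rng.standard_normal(in_size))
    eps_out = f_epsilon(rng.standard_normal(out_size))
    # Factorized weight noise: outer product of the two vectors.
    eps_w = np.outer(eps_in, eps_out)
    eps_b = eps_out
    return x @ (w + sigma_w * eps_w) + b + sigma_b * eps_b


rng = np.random.default_rng(0)
x = np.ones((2, 3))
w = np.full((3, 4), 0.5)
b = np.zeros(4)

# With both sigmas at zero the layer reduces to a plain dense layer.
plain = noisy_linear(x, w, b, 0.0, 0.0, rng)
noisy = noisy_linear(x, w, b, 0.1, 0.1, rng)
```

Factorizing the noise means only `in_size + out_size` random numbers are drawn per call instead of `in_size * out_size`.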
+268
@@ -0,0 +1,268 @@
from gym.spaces import Discrete
import ray
from ray.rllib.agents.dqn.dqn_tf_policy import postprocess_nstep_and_prio, \
PRIO_WEIGHTS, Q_SCOPE, Q_TARGET_SCOPE
from ray.rllib.agents.a3c.a3c_torch_policy import apply_grad_clipping
from ray.rllib.agents.dqn.dqn_torch_model import DQNTorchModel
from ray.rllib.agents.dqn.simple_q_torch_policy import TargetNetworkMixin
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.models.torch.torch_action_dist import TorchCategorical
from ray.rllib.policy.torch_policy import LearningRateSchedule
from ray.rllib.policy.torch_policy_template import build_torch_policy
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.utils.exploration.parameter_noise import ParameterNoise
from ray.rllib.utils.torch_ops import huber_loss, reduce_mean_ignore_inf
from ray.rllib.utils import try_import_torch
torch, nn = try_import_torch()
F = None
if nn:
F = torch.nn.functional
class QLoss:
def __init__(self,
q_t_selected,
q_tp1_best,
importance_weights,
rewards,
done_mask,
gamma=0.99,
n_step=1,
num_atoms=1,
v_min=-10.0,
v_max=10.0):
if num_atoms > 1:
raise ValueError("Torch version of DQN does not support "
"distributional Q yet!")
q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best
# compute RHS of bellman equation
q_t_selected_target = rewards + gamma**n_step * q_tp1_best_masked
# compute the error (potentially clipped)
self.td_error = q_t_selected - q_t_selected_target.detach()
self.loss = torch.mean(
importance_weights.float() * huber_loss(self.td_error))
self.stats = {
"mean_q": torch.mean(q_t_selected),
"min_q": torch.min(q_t_selected),
"max_q": torch.max(q_t_selected),
"td_error": self.td_error,
"mean_td_error": torch.mean(self.td_error),
}
class ComputeTDErrorMixin:
def __init__(self):
def compute_td_error(obs_t, act_t, rew_t, obs_tp1, done_mask,
importance_weights):
input_dict = self._lazy_tensor_dict({SampleBatch.CUR_OBS: obs_t})
input_dict[SampleBatch.ACTIONS] = act_t
input_dict[SampleBatch.REWARDS] = rew_t
input_dict[SampleBatch.NEXT_OBS] = obs_tp1
input_dict[SampleBatch.DONES] = done_mask
input_dict[PRIO_WEIGHTS] = importance_weights
# Do forward pass on loss to update td error attribute
build_q_losses(self, self.model, None, input_dict)
return self.q_loss.td_error
self.compute_td_error = compute_td_error
def build_q_model_and_distribution(policy, obs_space, action_space, config):
if not isinstance(action_space, Discrete):
raise UnsupportedSpaceException(
"Action space {} is not supported for DQN.".format(action_space))
if config["hiddens"]:
# try to infer the last layer size, otherwise fall back to 256
num_outputs = ([256] + config["model"]["fcnet_hiddens"])[-1]
config["model"]["no_final_linear"] = True
else:
num_outputs = action_space.n
# TODO(sven): Move option to add LayerNorm after each Dense
# generically into ModelCatalog.
add_layer_norm = (
isinstance(getattr(policy, "exploration", None), ParameterNoise)
or config["exploration_config"]["type"] == "ParameterNoise")
policy.q_model = ModelCatalog.get_model_v2(
obs_space=obs_space,
action_space=action_space,
num_outputs=num_outputs,
model_config=config["model"],
framework="torch",
model_interface=DQNTorchModel,
name=Q_SCOPE,
dueling=config["dueling"],
q_hiddens=config["hiddens"],
use_noisy=config["noisy"],
sigma0=config["sigma0"],
# TODO(sven): Move option to add LayerNorm after each Dense
# generically into ModelCatalog.
add_layer_norm=add_layer_norm)
policy.q_func_vars = policy.q_model.variables()
policy.target_q_model = ModelCatalog.get_model_v2(
obs_space=obs_space,
action_space=action_space,
num_outputs=num_outputs,
model_config=config["model"],
framework="torch",
model_interface=DQNTorchModel,
name=Q_TARGET_SCOPE,
dueling=config["dueling"],
q_hiddens=config["hiddens"],
use_noisy=config["noisy"],
sigma0=config["sigma0"],
# TODO(sven): Move option to add LayerNorm after each Dense
# generically into ModelCatalog.
add_layer_norm=add_layer_norm)
policy.target_q_func_vars = policy.target_q_model.variables()
return policy.q_model, TorchCategorical
def get_distribution_inputs_and_class(policy,
q_model,
obs_batch,
*,
explore=True,
is_training=False,
**kwargs):
q_vals = compute_q_values(policy, q_model, obs_batch, explore, is_training)
q_vals = q_vals[0] if isinstance(q_vals, tuple) else q_vals
policy.q_values = q_vals
return policy.q_values, TorchCategorical, [] # state-out
def build_q_losses(policy, model, _, train_batch):
config = policy.config
# q network evaluation
q_t = compute_q_values(
policy,
policy.q_model,
train_batch[SampleBatch.CUR_OBS],
explore=False,
is_training=False)
# target q network evaluation
q_tp1 = compute_q_values(
policy,
policy.target_q_model,
train_batch[SampleBatch.NEXT_OBS],
explore=False,
is_training=False)
# q scores for actions which we know were selected in the given state.
one_hot_selection = F.one_hot(train_batch[SampleBatch.ACTIONS],
policy.action_space.n)
q_t_selected = torch.sum(q_t * one_hot_selection, 1)
# compute estimate of best possible value starting from state at t + 1
if config["double_q"]:
q_tp1_using_online_net = compute_q_values(
policy,
policy.q_model,
train_batch[SampleBatch.NEXT_OBS],
explore=False,
is_training=False)
q_tp1_best_using_online_net = torch.argmax(q_tp1_using_online_net, 1)
q_tp1_best_one_hot_selection = F.one_hot(q_tp1_best_using_online_net,
policy.action_space.n)
q_tp1_best = torch.sum(q_tp1 * q_tp1_best_one_hot_selection, 1)
else:
q_tp1_best_one_hot_selection = F.one_hot(
torch.argmax(q_tp1, 1), policy.action_space.n)
q_tp1_best = torch.sum(q_tp1 * q_tp1_best_one_hot_selection, 1)
policy.q_loss = QLoss(q_t_selected, q_tp1_best, train_batch[PRIO_WEIGHTS],
train_batch[SampleBatch.REWARDS],
train_batch[SampleBatch.DONES].float(),
config["gamma"], config["n_step"],
config["num_atoms"], config["v_min"],
config["v_max"])
return policy.q_loss.loss
def adam_optimizer(policy, config):
return torch.optim.Adam(
policy.q_func_vars, lr=policy.cur_lr, eps=config["adam_epsilon"])
def build_q_stats(policy, batch):
return dict({
"cur_lr": policy.cur_lr,
}, **policy.q_loss.stats)
def setup_early_mixins(policy, obs_space, action_space, config):
LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"])
def after_init(policy, obs_space, action_space, config):
ComputeTDErrorMixin.__init__(policy)
TargetNetworkMixin.__init__(policy, obs_space, action_space, config)
# Move target net to device (this is done automatically for the
# policy.model, but not for any other models the policy has).
policy.target_q_model = policy.target_q_model.to(policy.device)
def compute_q_values(policy, model, obs, explore, is_training=False):
if policy.config["num_atoms"] > 1:
raise ValueError("torch DQN does not support distributional DQN yet!")
model_out, state = model({
SampleBatch.CUR_OBS: obs,
"is_training": is_training,
}, [], None)
advantages_or_q_values = model.get_advantages_or_q_values(model_out)
if policy.config["dueling"]:
state_value = model.get_state_value(model_out)
advantages_mean = reduce_mean_ignore_inf(advantages_or_q_values, 1)
advantages_centered = advantages_or_q_values - torch.unsqueeze(
advantages_mean, 1)
q_values = state_value + advantages_centered
else:
q_values = advantages_or_q_values
return q_values
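The dueling aggregation used in `compute_q_values`, Q = (A - mean[A]) + V, can be worked through on a small batch. This is a NumPy sketch with a hypothetical function name; unlike the `reduce_mean_ignore_inf` call above, it does not special-case infinite advantages.

```python
import numpy as np


def dueling_q_values(advantages, state_value):
    """Combine advantage and value heads: Q = (A - mean[A]) + V."""
    advantages = np.asarray(advantages, dtype=np.float64)
    state_value = np.asarray(state_value, dtype=np.float64)
    # Center the advantages per sample (mean over the action axis).
    centered = advantages - advantages.mean(axis=1, keepdims=True)
    # V has shape [batch, 1] and broadcasts over the action axis.
    return state_value + centered


A = [[1.0, 2.0, 3.0]]  # one sample, three actions
V = [[10.0]]
q = dueling_q_values(A, V)
print(q)  # [[ 9. 10. 11.]]
```

Centering the advantages keeps the mean Q-value equal to V, which makes the split between the two heads identifiable.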
def extra_action_out_fn(policy, input_dict, state_batches, model, action_dist):
return {"q_values": policy.q_values}
DQNTorchPolicy = build_torch_policy(
name="DQNTorchPolicy",
loss_fn=build_q_losses,
get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG,
make_model_and_action_dist=build_q_model_and_distribution,
action_distribution_fn=get_distribution_inputs_and_class,
stats_fn=build_q_stats,
postprocess_fn=postprocess_nstep_and_prio,
optimizer_fn=adam_optimizer,
extra_grad_process_fn=apply_grad_clipping,
extra_action_out_fn=extra_action_out_fn,
before_init=setup_early_mixins,
after_init=after_init,
mixins=[
TargetNetworkMixin,
ComputeTDErrorMixin,
LearningRateSchedule,
])
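The dueling branch of `compute_q_values` above combines a state value with mean-centered advantages. A minimal pure-Python sketch of that aggregation for a single state follows; the helper name `dueling_q_values` is hypothetical, and ignoring `-inf` entries when averaging is an assumption based on the name `reduce_mean_ignore_inf`.

```python
def dueling_q_values(state_value, advantages):
    """Combine V(s) and A(s, a) into Q(s, a) for one state.

    Hypothetical helper for illustration only; not part of RLlib.
    """
    # Average only over finite advantages (assumed behavior of
    # reduce_mean_ignore_inf, e.g. for masked-out actions).
    finite = [a for a in advantages if a != float("-inf")]
    mean_adv = sum(finite) / len(finite)
    # Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a))
    return [state_value + (a - mean_adv) for a in advantages]


q_values = dueling_q_values(2.0, [1.0, 3.0, -1.0])
```

Centering the advantages keeps the split between value and advantage identifiable: adding a constant to every advantage leaves the resulting Q-values unchanged.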
+87
@@ -0,0 +1,87 @@
import logging
from ray.rllib.agents.trainer import with_common_config
from ray.rllib.agents.dqn.simple_q_tf_policy import SimpleQTFPolicy
from ray.rllib.agents.dqn.dqn import DQNTrainer
logger = logging.getLogger(__name__)
# yapf: disable
# __sphinx_doc_begin__
DEFAULT_CONFIG = with_common_config({
# === Exploration Settings (Experimental) ===
"exploration_config": {
# The Exploration class to use.
"type": "EpsilonGreedy",
# Config for the Exploration class' constructor:
"initial_epsilon": 1.0,
"final_epsilon": 0.02,
"epsilon_timesteps": 10000, # Timesteps over which to anneal epsilon.
# For soft_q, use:
# "exploration_config" = {
# "type": "SoftQ"
# "temperature": [float, e.g. 1.0]
# }
},
# Switch to greedy actions in evaluation workers.
"evaluation_config": {
"explore": False,
},
# Minimum env steps to optimize for per train call. This value does
# not affect learning, only the length of iterations.
"timesteps_per_iteration": 1000,
# Update the target network every `target_network_update_freq` steps.
"target_network_update_freq": 500,
# === Replay buffer ===
# Size of the replay buffer. Note that if async_updates is set, then
# each worker will have a replay buffer of this size.
"buffer_size": 50000,
# Whether to LZ4 compress observations
"compress_observations": True,
# === Optimization ===
# Learning rate for adam optimizer
"lr": 5e-4,
# Learning rate schedule
"lr_schedule": None,
# Adam epsilon hyper parameter
"adam_epsilon": 1e-8,
# If not None, clip gradients during optimization at this value
"grad_clip": 40,
# How many steps of the model to sample before learning starts.
"learning_starts": 1000,
# Update the replay buffer with this many samples at once. Note that
# this setting applies per-worker if num_workers > 1.
"rollout_fragment_length": 4,
# Size of a batch sampled from replay buffer for training. Note that
# if async_updates is set, then each worker returns gradients for a
# batch of this size.
"train_batch_size": 32,
# === Parallelism ===
# Number of workers for collecting samples with. This only makes sense
# to increase if your environment is particularly slow to sample, or if
# you're using the Async or Ape-X optimizers.
"num_workers": 0,
# Prevent iterations from going lower than this time span
"min_iter_time_s": 1,
})
# __sphinx_doc_end__
# yapf: enable
def get_policy_class(config):
if config["use_pytorch"]:
from ray.rllib.agents.dqn.simple_q_torch_policy import \
SimpleQTorchPolicy
return SimpleQTorchPolicy
else:
return SimpleQTFPolicy
SimpleQTrainer = DQNTrainer.with_updates(
default_policy=SimpleQTFPolicy,
get_policy_class=get_policy_class,
default_config=DEFAULT_CONFIG)
@@ -4,9 +4,9 @@ from gym.spaces import Discrete
import logging
import ray
from ray.rllib.agents.dqn.simple_q_model import SimpleQModel
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_action_dist import TorchCategorical
from ray.rllib.models.tf.tf_action_dist import Categorical
from ray.rllib.utils.annotations import override
from ray.rllib.utils.error import UnsupportedSpaceException
@@ -50,31 +50,24 @@ def build_q_models(policy, obs_space, action_space, config):
raise UnsupportedSpaceException(
"Action space {} is not supported for DQN.".format(action_space))
if config["hiddens"]:
num_outputs = 256
config["model"]["no_final_linear"] = True
else:
num_outputs = action_space.n
policy.q_model = ModelCatalog.get_model_v2(
obs_space,
action_space,
num_outputs,
config["model"],
framework="tf",
name=Q_SCOPE,
model_interface=SimpleQModel,
q_hiddens=config["hiddens"])
obs_space=obs_space,
action_space=action_space,
num_outputs=action_space.n,
model_config=config["model"],
framework="torch" if config["use_pytorch"] else "tf",
name=Q_SCOPE)
policy.target_q_model = ModelCatalog.get_model_v2(
obs_space,
action_space,
num_outputs,
config["model"],
framework="tf",
name=Q_TARGET_SCOPE,
model_interface=SimpleQModel,
q_hiddens=config["hiddens"])
obs_space=obs_space,
action_space=action_space,
num_outputs=action_space.n,
model_config=config["model"],
framework="torch" if config["use_pytorch"] else "tf",
name=Q_TARGET_SCOPE)
policy.q_func_vars = policy.q_model.variables()
policy.target_q_func_vars = policy.target_q_model.variables()
return policy.q_model
@@ -84,13 +77,15 @@ def get_distribution_inputs_and_class(policy,
obs_batch,
*,
explore=True,
is_training=True,
**kwargs):
q_vals = compute_q_values(policy, q_model, obs_batch, explore)
q_vals = compute_q_values(policy, q_model, obs_batch, explore, is_training)
q_vals = q_vals[0] if isinstance(q_vals, tuple) else q_vals
policy.q_values = q_vals
policy.q_func_vars = q_model.variables()
return policy.q_values, Categorical, [] # state-outs
return policy.q_values,\
TorchCategorical if policy.config["use_pytorch"] else Categorical,\
[] # state-outs
def build_q_losses(policy, model, dist_class, train_batch):
@@ -136,21 +131,22 @@ def build_q_losses(policy, model, dist_class, train_batch):
return loss
def compute_q_values(policy, model, obs, explore):
def compute_q_values(policy, model, obs, explore, is_training=None):
model_out, _ = model({
SampleBatch.CUR_OBS: obs,
"is_training": policy._get_is_training_placeholder(),
"is_training": is_training
if is_training is not None else policy._get_is_training_placeholder(),
}, [], None)
return model.get_q_values(model_out)
return model_out
def setup_late_mixins(policy, obs_space, action_space, config):
TargetNetworkMixin.__init__(policy, obs_space, action_space, config)
SimpleQPolicy = build_tf_policy(
name="SimpleQPolicy",
SimpleQTFPolicy = build_tf_policy(
name="SimpleQTFPolicy",
get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG,
make_model=build_q_models,
action_distribution_fn=get_distribution_inputs_and_class,
+100
@@ -0,0 +1,100 @@
"""Basic example of a DQN policy without any optimizations."""
import logging
import ray
from ray.rllib.agents.dqn.simple_q_tf_policy import build_q_models, \
get_distribution_inputs_and_class, compute_q_values
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models.torch.torch_action_dist import TorchCategorical
from ray.rllib.policy.torch_policy_template import build_torch_policy
from ray.rllib.utils import try_import_torch
from ray.rllib.utils.torch_ops import huber_loss
torch, nn = try_import_torch()
F = None
if nn:
F = nn.functional
logger = logging.getLogger(__name__)
class TargetNetworkMixin:
def __init__(self, obs_space, action_space, config):
def do_update():
# Update_target_fn will be called periodically to copy Q network to
# target Q network.
assert len(self.q_func_vars) == len(self.target_q_func_vars), \
(self.q_func_vars, self.target_q_func_vars)
self.target_q_model.load_state_dict(self.q_model.state_dict())
self.update_target = do_update
def build_q_model_and_distribution(policy, obs_space, action_space, config):
return build_q_models(policy, obs_space, action_space, config), \
TorchCategorical
def build_q_losses(policy, model, dist_class, train_batch):
# q network evaluation
q_t = compute_q_values(
policy,
policy.q_model,
train_batch[SampleBatch.CUR_OBS],
explore=False,
is_training=True)
# target q network evaluation
q_tp1 = compute_q_values(
policy,
policy.target_q_model,
train_batch[SampleBatch.NEXT_OBS],
explore=False,
is_training=True)
# q scores for actions which we know were selected in the given state.
one_hot_selection = F.one_hot(train_batch[SampleBatch.ACTIONS],
policy.action_space.n)
q_t_selected = torch.sum(q_t * one_hot_selection, 1)
# compute estimate of best possible value starting from state at t + 1
dones = train_batch[SampleBatch.DONES].float()
q_tp1_best_one_hot_selection = F.one_hot(
torch.argmax(q_tp1, 1), policy.action_space.n)
q_tp1_best = torch.sum(q_tp1 * q_tp1_best_one_hot_selection, 1)
q_tp1_best_masked = (1.0 - dones) * q_tp1_best
# compute RHS of bellman equation
q_t_selected_target = (train_batch[SampleBatch.REWARDS] +
policy.config["gamma"] * q_tp1_best_masked)
# Compute the error (Square/Huber).
td_error = q_t_selected - q_t_selected_target.detach()
loss = torch.mean(huber_loss(td_error))
# save TD error as an attribute for outside access
policy.td_error = td_error
return loss
def extra_action_out_fn(policy, input_dict, state_batches, model, action_dist):
"""Adds q-values to action out dict."""
return {"q_values": policy.q_values}
def setup_late_mixins(policy, obs_space, action_space, config):
TargetNetworkMixin.__init__(policy, obs_space, action_space, config)
SimpleQTorchPolicy = build_torch_policy(
name="SimpleQPolicy",
loss_fn=build_q_losses,
get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG,
extra_action_out_fn=extra_action_out_fn,
after_init=setup_late_mixins,
make_model_and_action_dist=build_q_model_and_distribution,
mixins=[TargetNetworkMixin],
action_distribution_fn=get_distribution_inputs_and_class,
stats_fn=lambda policy, config: {"td_error": policy.td_error},
)
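The loss in `build_q_losses` above is a one-step Bellman target followed by a Huber loss on the TD error. A framework-free sketch of the same arithmetic follows; the names `huber` and `simple_q_loss` are hypothetical, and `delta=1.0` for the Huber threshold is an assumption rather than something the diff states.

```python
def huber(x, delta=1.0):
    """Huber loss of a scalar: quadratic near zero, linear in the tails."""
    if abs(x) < delta:
        return 0.5 * x * x
    return delta * (abs(x) - 0.5 * delta)


def simple_q_loss(q_t, actions, rewards, dones, q_tp1, gamma=0.99):
    """Mean Huber loss over a batch of transitions (plain Python lists)."""
    losses = []
    for q_row, action, reward, done, q_next in zip(q_t, actions, rewards,
                                                   dones, q_tp1):
        # Mask the bootstrap term for terminal transitions.
        best_next = 0.0 if done else max(q_next)
        target = reward + gamma * best_next
        # TD error: Q(s, a) minus the Bellman target.
        td_error = q_row[action] - target
        losses.append(huber(td_error))
    return sum(losses) / len(losses)


loss = simple_q_loss(
    q_t=[[1.0, 2.0], [0.5, 0.0]],
    actions=[1, 0],
    rewards=[1.0, 0.0],
    dones=[False, True],
    q_tp1=[[0.0, 2.0], [5.0, 5.0]],
    gamma=0.5)
```

In the torch code the target is detached before the subtraction, so gradients flow only through the online network's Q-value.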
+9 -5
@@ -4,6 +4,7 @@ import unittest
import ray
import ray.rllib.agents.dqn.apex as apex
from ray.rllib.utils.test_utils import framework_iterator
class TestApex(unittest.TestCase):
@@ -17,11 +18,14 @@ class TestApex(unittest.TestCase):
config = apex.APEX_DEFAULT_CONFIG.copy()
config["num_workers"] = 3
config["optimizer"]["num_replay_buffer_shards"] = 1
trainer = apex.ApexTrainer(config, env="CartPole-v0")
infos = trainer.workers.foreach_policy(
lambda p, _: p.get_exploration_info())
eps = [i["cur_epsilon"] for i in infos]
assert np.allclose(eps, [1.0, 0.016190862, 0.00065536, 2.6527108e-05])
for _ in framework_iterator(config):
trainer = apex.ApexTrainer(config, env="CartPole-v0")
infos = trainer.workers.foreach_policy(
lambda p, _: p.get_exploration_info())
eps = [i["cur_epsilon"] for i in infos]
assert np.allclose(eps,
[1.0, 0.016190862, 0.00065536, 2.6527108e-05])
if __name__ == "__main__":
+27 -21
@@ -10,13 +10,23 @@ tf = try_import_tf()
class TestDQN(unittest.TestCase):
def test_dqn_compilation(self):
"""Test whether a DQNTrainer can be built with both frameworks."""
"""Test whether a DQNTrainer can be built on all frameworks."""
config = dqn.DEFAULT_CONFIG.copy()
config["num_workers"] = 0 # Run locally.
num_iterations = 2
for _ in framework_iterator(config, frameworks=["tf", "eager"]):
for fw in framework_iterator(config):
# double-dueling DQN.
plain_config = config.copy()
trainer = dqn.DQNTrainer(config=plain_config, env="CartPole-v0")
for i in range(num_iterations):
results = trainer.train()
print(results)
# Rainbow.
# TODO(sven): Add torch once DQN-torch supports distributional-Q.
if fw == "torch":
continue
rainbow_config = config.copy()
rainbow_config["num_atoms"] = 10
rainbow_config["noisy"] = True
@@ -28,13 +38,6 @@ class TestDQN(unittest.TestCase):
results = trainer.train()
print(results)
# double-dueling DQN.
plain_config = config.copy()
trainer = dqn.DQNTrainer(config=plain_config, env="CartPole-v0")
for i in range(num_iterations):
results = trainer.train()
print(results)
def test_dqn_exploration_and_soft_q_config(self):
"""Tests whether a DQN Agent outputs exploration/softmaxed actions."""
config = dqn.DEFAULT_CONFIG.copy()
@@ -43,7 +46,7 @@ class TestDQN(unittest.TestCase):
obs = np.array(0)
# Test against all frameworks.
for _ in framework_iterator(config, ["tf", "eager"]):
for _ in framework_iterator(config):
# Default EpsilonGreedy setup.
trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0")
# Setting explore=False should always return the same action.
@@ -61,14 +64,14 @@ class TestDQN(unittest.TestCase):
# (but no epsilon exploration).
config["exploration_config"] = {
"type": "SoftQ",
"temperature": 0.001
"temperature": 0.000001
}
trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0")
# Due to the low temp, always expect the same action.
a_ = trainer.compute_action(obs)
actions = [trainer.compute_action(obs)]
for _ in range(50):
a = trainer.compute_action(obs)
check(a, a_)
actions.append(trainer.compute_action(obs))
check(np.std(actions), 0.0, decimals=3)
# Higher softmax temperature.
config["exploration_config"]["temperature"] = 1.0
@@ -104,8 +107,8 @@ class TestDQN(unittest.TestCase):
core_config["num_workers"] = 0 # Run locally.
core_config["env_config"] = {"is_slippery": False, "map_name": "4x4"}
for fw in framework_iterator(core_config, ["tf", "eager"]):
# Test against all frameworks.
for fw in framework_iterator(core_config):
config = core_config.copy()
# DQN with ParameterNoise exploration (config["explore"]=True).
@@ -115,13 +118,14 @@ class TestDQN(unittest.TestCase):
trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0")
policy = trainer.get_policy()
p_sess = getattr(policy, "_sess", None)
self.assertFalse(policy.exploration.weights_are_currently_noisy)
noise_before = self._get_current_noise(policy, fw)
check(noise_before, 0.0)
initial_weights = self._get_current_weight(policy, fw)
# Pseudo-start an episode and compare the weights before and after.
policy.exploration.on_episode_start(policy, tf_sess=policy._sess)
policy.exploration.on_episode_start(policy, tf_sess=p_sess)
self.assertFalse(policy.exploration.weights_are_currently_noisy)
noise_after_ep_start = self._get_current_noise(policy, fw)
weights_after_ep_start = self._get_current_weight(policy, fw)
@@ -162,7 +166,7 @@ class TestDQN(unittest.TestCase):
# Pseudo-end the episode and compare weights again.
# Make sure they are the original ones.
policy.exploration.on_episode_end(policy, tf_sess=policy._sess)
policy.exploration.on_episode_end(policy, tf_sess=p_sess)
weights_after_ep_end = self._get_current_weight(policy, fw)
check(current_weight - noise, weights_after_ep_end, decimals=5)
@@ -173,6 +177,7 @@ class TestDQN(unittest.TestCase):
config["explore"] = False
trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0")
policy = trainer.get_policy()
p_sess = getattr(policy, "_sess", None)
self.assertFalse(policy.exploration.weights_are_currently_noisy)
initial_weights = self._get_current_weight(policy, fw)
@@ -182,7 +187,7 @@ class TestDQN(unittest.TestCase):
# Pseudo-start an episode and compare the weights before and after
# (they should be the same).
policy.exploration.on_episode_start(policy, tf_sess=policy._sess)
policy.exploration.on_episode_start(policy, tf_sess=p_sess)
self.assertFalse(policy.exploration.weights_are_currently_noisy)
# Should be the same, as we don't do anything at the beginning of
@@ -207,7 +212,7 @@ class TestDQN(unittest.TestCase):
# Pseudo-end the episode and compare weights again.
# Make sure they are the original ones (no noise permanently
# applied throughout the episode).
policy.exploration.on_episode_end(policy, tf_sess=policy._sess)
policy.exploration.on_episode_end(policy, tf_sess=p_sess)
weights_after_episode_end = self._get_current_weight(policy, fw)
check(initial_weights, weights_after_episode_end)
# Noise should still be the same (re-sampling only happens at
@@ -232,7 +237,8 @@ class TestDQN(unittest.TestCase):
# the same action for the same input (parameter noise is
# deterministic).
policy = trainer.get_policy()
policy.exploration.on_episode_start(policy, tf_sess=policy._sess)
p_sess = getattr(policy, "_sess", None)
policy.exploration.on_episode_start(policy, tf_sess=p_sess)
a_ = trainer.compute_action(obs)
for _ in range(10):
a = trainer.compute_action(obs, explore=True)
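The SoftQ test above relies on a very low temperature making action sampling effectively deterministic. A small sketch of temperature-scaled softmax shows why; `soft_q_probs` is a hypothetical helper and does not mirror RLlib's actual SoftQ implementation.

```python
import math


def soft_q_probs(q_values, temperature=1.0):
    """Softmax over Q-values divided by a temperature."""
    assert temperature > 0.0, "temperature must be > 0.0"
    scaled = [q / temperature for q in q_values]
    # Subtract the max before exponentiating for numerical stability.
    max_scaled = max(scaled)
    exps = [math.exp(s - max_scaled) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]


q_values = [1.0, 1.1]
# Near-zero temperature: almost all probability mass on the best action.
probs_cold = soft_q_probs(q_values, temperature=0.000001)
# Temperature 1.0: both actions keep a substantial probability.
probs_warm = soft_q_probs(q_values, temperature=1.0)
```

As the temperature approaches zero, the distribution collapses onto the argmax action, which is what the `np.std(actions)` check in the test depends on.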
+102
@@ -0,0 +1,102 @@
import numpy as np
import unittest
import ray.rllib.agents.dqn as dqn
from ray.rllib.agents.dqn.simple_q_tf_policy import build_q_losses as loss_tf
from ray.rllib.agents.dqn.simple_q_torch_policy import build_q_losses as \
loss_torch
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.numpy import fc, one_hot, huber_loss
from ray.rllib.utils.test_utils import check, framework_iterator
tf = try_import_tf()
class TestSimpleQ(unittest.TestCase):
def test_simple_q_compilation(self):
"""Test whether a SimpleQTrainer can be built on all frameworks."""
config = dqn.SIMPLE_Q_DEFAULT_CONFIG.copy()
config["num_workers"] = 0 # Run locally.
for _ in framework_iterator(config):
trainer = dqn.SimpleQTrainer(config=config, env="CartPole-v0")
num_iterations = 2
for i in range(num_iterations):
results = trainer.train()
print(results)
def test_simple_q_loss_function(self):
"""Tests the Simple-Q loss function results on all frameworks."""
config = dqn.SIMPLE_Q_DEFAULT_CONFIG.copy()
# Run locally.
config["num_workers"] = 0
# Use very simple net (layer0=10 nodes, q-layer=2 nodes (2 actions)).
config["model"]["fcnet_hiddens"] = [10]
config["model"]["fcnet_activation"] = "linear"
for fw in framework_iterator(config):
# Generate Trainer and get its default Policy object.
trainer = dqn.SimpleQTrainer(config=config, env="CartPole-v0")
policy = trainer.get_policy()
# Batch of size=2.
input_ = {
SampleBatch.CUR_OBS: np.random.random(size=(2, 4)),
SampleBatch.ACTIONS: np.array([0, 1]),
SampleBatch.REWARDS: np.array([0.4, -1.23]),
SampleBatch.DONES: np.array([False, False]),
SampleBatch.NEXT_OBS: np.random.random(size=(2, 4))
}
# Get model vars for computing expected model outs (q-vals).
# 0=layer-kernel; 1=layer-bias; 2=q-val-kernel; 3=q-val-bias
vars = policy.get_weights()
if isinstance(vars, dict):
vars = list(vars.values())
vars_t = policy.target_q_func_vars
if fw == "tf":
vars_t = policy.get_session().run(vars_t)
# Q(s,a) outputs.
q_t = np.sum(
one_hot(input_[SampleBatch.ACTIONS], 2) * fc(
fc(input_[SampleBatch.CUR_OBS],
vars[0],
vars[1],
framework=fw),
vars[2],
vars[3],
framework=fw), 1)
# max[a'](Qtarget(s',a')) outputs.
q_target_tp1 = np.max(
fc(fc(
input_[SampleBatch.NEXT_OBS],
vars_t[0],
vars_t[1],
framework=fw),
vars_t[2],
vars_t[3],
framework=fw), 1)
# TD-errors (Bellman equation).
td_error = q_t - (input_[SampleBatch.REWARDS] +
config["gamma"] * q_target_tp1)
# Huber/Square loss on TD-error.
expected_loss = huber_loss(td_error).mean()
if fw == "torch":
input_ = policy._lazy_tensor_dict(input_)
# Get actual out and compare.
if fw == "tf":
out = policy.get_session().run(
policy._loss,
feed_dict=policy._get_loss_inputs_dict(
input_, shuffle=False))
else:
out = (loss_torch if fw == "torch" else
loss_tf)(policy, policy.model, None, input_)
check(out, expected_loss, decimals=1)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
+35
@@ -0,0 +1,35 @@
import unittest
import ray
import ray.rllib.agents.ppo as ppo
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.test_utils import framework_iterator
tf = try_import_tf()
class TestDDPPO(unittest.TestCase):
@classmethod
def setUpClass(cls):
ray.init()
@classmethod
def tearDownClass(cls):
ray.shutdown()
def test_ddppo_compilation(self):
"""Test whether a DDPPOTrainer can be built with the torch framework."""
config = ppo.ddppo.DEFAULT_CONFIG.copy()
config["num_gpus_per_worker"] = 0
num_iterations = 2
for _ in framework_iterator(config, "torch"):
trainer = ppo.ddppo.DDPPOTrainer(config=config, env="CartPole-v0")
for i in range(num_iterations):
trainer.train()
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
+10 -6
@@ -133,8 +133,7 @@ class TestPPO(unittest.TestCase):
[-0.5, -0.1, -0.2], dtype=np.float32),
}
for fw, sess in framework_iterator(
config, frameworks=["eager", "tf", "torch"], session=True):
for fw, sess in framework_iterator(config, session=True):
trainer = ppo.PPOTrainer(config=config, env="CartPole-v0")
policy = trainer.get_policy()
@@ -164,10 +163,15 @@ class TestPPO(unittest.TestCase):
list(policy.model.parameters())
if fw == "tf":
vars = policy.get_session().run(vars)
expected_shared_out = fc(train_batch[SampleBatch.CUR_OBS], vars[0],
vars[1])
expected_logits = fc(expected_shared_out, vars[2], vars[3])
expected_value_outs = fc(expected_shared_out, vars[4], vars[5])
expected_shared_out = fc(
train_batch[SampleBatch.CUR_OBS],
vars[0],
vars[1],
framework=fw)
expected_logits = fc(
expected_shared_out, vars[2], vars[3], framework=fw)
expected_value_outs = fc(
expected_shared_out, vars[4], vars[5], framework=fw)
kl, entropy, pg_loss, vf_loss, overall_loss = \
self._ppo_loss_helper(
+1
@@ -99,4 +99,5 @@ QMixTrainer = GenericOffPolicyTrainer.with_updates(
name="QMIX",
default_config=DEFAULT_CONFIG,
default_policy=QMixTorchPolicy,
get_policy_class=None,
make_policy_optimizer=make_sync_batch_optimizer)
+1 -1
@@ -103,4 +103,4 @@ SACTrainer = GenericOffPolicyTrainer.with_updates(
name="SAC",
default_config=DEFAULT_CONFIG,
default_policy=SACTFPolicy,
)
get_policy_class=lambda c: SACTFPolicy)
+1 -1
@@ -5,7 +5,7 @@ import ray
import ray.experimental.tf_utils
from gym.spaces import Box, Discrete
from ray.rllib.agents.ddpg.noop_model import NoopModel
from ray.rllib.agents.dqn.dqn_policy import postprocess_nstep_and_prio, \
from ray.rllib.agents.dqn.dqn_tf_policy import postprocess_nstep_and_prio, \
PRIO_WEIGHTS
from ray.rllib.agents.sac.sac_model import SACModel
from ray.rllib.models import ModelCatalog
+1
@@ -172,6 +172,7 @@ MADDPGTrainer = GenericOffPolicyTrainer.with_updates(
name="MADDPG",
default_config=DEFAULT_CONFIG,
default_policy=MADDPGTFPolicy,
get_policy_class=None,
before_init=None,
before_train_step=set_global_timestep,
make_policy_optimizer=make_optimizer,
+1 -1
@@ -1,5 +1,5 @@
import ray
from ray.rllib.agents.dqn.dqn_policy import minimize_and_clip, _adjust_nstep
from ray.rllib.agents.dqn.dqn_tf_policy import minimize_and_clip, _adjust_nstep
from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models import ModelCatalog
+4 -1
@@ -255,7 +255,10 @@ class RolloutWorker(EvaluatorInterface, ParallelIteratorWorker):
policy_config = policy_config or {}
if (tf and policy_config.get("eager")
and not policy_config.get("no_eager_on_workers")):
tf.enable_eager_execution()
# This check is necessary for certain all-framework tests that
# use tf's eager_mode() context generator.
if not tf.executing_eagerly():
tf.enable_eager_execution()
if log_level:
logging.getLogger("ray.rllib").setLevel(log_level)
+3 -2
@@ -7,7 +7,8 @@ from ray import tune
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel
from ray.rllib.agents.dqn.distributional_q_tf_model import \
DistributionalQTFModel
from ray.rllib.utils import try_import_tf
from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as MyVisionNetwork
@@ -58,7 +59,7 @@ class MyKerasModel(TFModelV2):
return {"foo": tf.constant(42.0)}
class MyKerasQModel(DistributionalQModel):
class MyKerasQModel(DistributionalQTFModel):
"""Custom model for DQN."""
def __init__(self, obs_space, action_space, num_outputs, model_config,
+1 -1
@@ -13,7 +13,7 @@ import gym
import ray
from ray.rllib.agents.dqn.dqn import DQNTrainer
from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy
from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy
from ray.rllib.tests.test_multi_agent_env import MultiCartpole
+3 -2
@@ -22,7 +22,8 @@ from gym.spaces import Box, Discrete, Dict
import ray
from ray import tune
from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel
from ray.rllib.agents.dqn.distributional_q_tf_model import \
DistributionalQTFModel
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
@@ -109,7 +110,7 @@ class ParametricActionCartpole(gym.Env):
return obs, rew, done, info
class ParametricActionsModel(DistributionalQModel, TFModelV2):
class ParametricActionsModel(DistributionalQTFModel, TFModelV2):
"""Parametric action model that handles the dot product and masking.
This assumes the outputs are logits for a single Categorical action dist.
+21 -25
@@ -1,26 +1,26 @@
from functools import partial
import gym
import logging
import numpy as np
from functools import partial
from ray.tune.registry import RLLIB_MODEL, RLLIB_PREPROCESSOR, \
RLLIB_ACTION_DIST, _global_registry
from ray.rllib.models.extra_spaces import Simplex
from ray.rllib.models.action_dist import ActionDistribution
from ray.rllib.models.torch.torch_action_dist import TorchCategorical, \
TorchMultiCategorical, TorchDiagGaussian
from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork as FCNetV2
from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as VisionNetV2
from ray.rllib.models.tf.tf_action_dist import Categorical, MultiCategorical, \
Deterministic, DiagGaussian, MultiActionDistribution, Dirichlet
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.preprocessors import get_preprocessor
from ray.rllib.models.tf.fcnet_v1 import FullyConnectedNetwork
from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork as FCNetV2
from ray.rllib.models.tf.lstm_v1 import LSTM
from ray.rllib.models.tf.modelv1_compat import make_v1_wrapper
from ray.rllib.models.tf.tf_action_dist import Categorical, MultiCategorical, \
Deterministic, DiagGaussian, MultiActionDistribution, Dirichlet
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.visionnet_v1 import VisionNetwork
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as VisionNetV2
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.torch_action_dist import TorchCategorical, \
TorchMultiCategorical, TorchDiagGaussian
from ray.rllib.utils import try_import_tf
from ray.rllib.utils.annotations import DeveloperAPI, PublicAPI
from ray.rllib.utils.error import UnsupportedSpaceException
@@ -350,8 +350,12 @@ class ModelCatalog:
if default_model:
return default_model(obs_space, action_space, num_outputs,
model_config, name)
return ModelCatalog._get_default_torch_model_v2(
v2_class = ModelCatalog._get_default_torch_model_class_v2(
obs_space, action_space, num_outputs, model_config, name)
# wrap in the requested interface
wrapper = ModelCatalog._wrap_if_needed(v2_class, model_interface)
return wrapper(obs_space, action_space, num_outputs, model_config,
name, **model_kwargs)
else:
raise NotImplementedError(
"Framework must be 'tf' or 'torch': {}".format(framework))
@@ -451,7 +455,7 @@ class ModelCatalog:
@staticmethod
def _wrap_if_needed(model_cls, model_interface):
assert issubclass(model_cls, TFModelV2), model_cls
assert issubclass(model_cls, (TFModelV2, TorchModelV2)), model_cls
if not model_interface or issubclass(model_cls, model_interface):
return model_cls
@@ -466,30 +470,22 @@ class ModelCatalog:
return wrapper
@staticmethod
def _get_default_torch_model_v2(obs_space, action_space, num_outputs,
model_config, name):
def _get_default_torch_model_class_v2(obs_space, action_space, num_outputs,
model_config, name):
from ray.rllib.models.torch.fcnet import (FullyConnectedNetwork as
PyTorchFCNet)
from ray.rllib.models.torch.visionnet import (VisionNetwork as
PyTorchVisionNet)
model_config = model_config or MODEL_DEFAULTS
if model_config.get("use_lstm"):
raise NotImplementedError(
"LSTM auto-wrapping not implemented for torch")
if isinstance(obs_space, gym.spaces.Discrete):
obs_rank = 1
if isinstance(obs_space, gym.spaces.Discrete) or \
len(obs_space.shape) <= 2:
return PyTorchFCNet
else:
obs_rank = len(obs_space.shape)
if obs_rank > 2:
return PyTorchVisionNet(obs_space, action_space, num_outputs,
model_config, name)
return PyTorchFCNet(obs_space, action_space, num_outputs, model_config,
name)
return PyTorchVisionNet
@staticmethod
def get_model(input_dict,
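The ModelCatalog change above returns a default model class rather than an instance, so the class can be wrapped in a requested interface before construction. The sketch below illustrates that pattern with stand-in classes; every name in it (`BaseModel`, `FCNet`, `VisionNet`, `QInterface`, `default_model_class`, `wrap_if_needed`) is hypothetical and simplified relative to RLlib.

```python
class BaseModel:
    def __init__(self, name):
        self.name = name


class FCNet(BaseModel):
    """Stand-in for a fully connected default model."""


class VisionNet(BaseModel):
    """Stand-in for a convolutional default model."""


class QInterface:
    """Stand-in for a model interface adding Q-specific methods."""

    def get_q_values(self, model_out):
        return model_out


def default_model_class(obs_shape):
    # Low-rank observations get the FC net, images get the vision net.
    return FCNet if len(obs_shape) <= 2 else VisionNet


def wrap_if_needed(model_cls, model_interface):
    # Nothing to do if no interface is requested or it is already satisfied.
    if not model_interface or issubclass(model_cls, model_interface):
        return model_cls

    # Build a new class that inherits from both interface and model.
    class Wrapper(model_interface, model_cls):
        pass

    Wrapper.__name__ = "{}_as_{}".format(model_cls.__name__,
                                         model_interface.__name__)
    return Wrapper


wrapped_cls = wrap_if_needed(default_model_class((4, )), QInterface)
model = wrapped_cls("q_func")
```

Returning the class first is what makes the wrapping possible: an already constructed instance could not gain the interface's methods through inheritance.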
+32 -1
@@ -84,7 +84,11 @@ class ModelV2:
raise NotImplementedError
def value_function(self):
"""Return the value function estimate for the most recent forward pass.
"""Returns the value function output for the most recent forward pass.
Note that a `forward` call has to be performed first, before this
method can return anything. Calling this method therefore does not
cause an extra forward pass through the network.
Returns:
value estimate tensor of shape [BATCH].
@@ -222,6 +226,33 @@ class ModelV2:
"""Returns a contextmanager for the current forward pass."""
return NullContextManager()
def variables(self, as_dict=False):
"""Returns the list (or a dict) of variables for this model.
Args:
as_dict(bool): Whether variables should be returned as dict-values
(using descriptive keys).
Returns:
Union[List[any],Dict[str,any]]: The list (or dict if `as_dict` is
True) of all variables of this ModelV2.
"""
raise NotImplementedError
def trainable_variables(self, as_dict=False):
"""Returns the list of trainable variables for this model.
Args:
as_dict(bool): Whether variables should be returned as dict-values
(using descriptive keys).
Returns:
Union[List[any],Dict[str,any]]: The list (or dict if `as_dict` is
True) of all trainable (tf)/requires_grad (torch) variables
of this ModelV2.
"""
raise NotImplementedError
class NullContextManager:
"""No-op context manager"""
+18 -23
@@ -1,6 +1,7 @@
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.utils.annotations import PublicAPI
from ray.rllib.utils import try_import_tf
from ray.rllib.utils.annotations import override
tf = try_import_tf()
@@ -65,7 +66,7 @@ class TFModelV2(ModelV2):
Custom models should override this instead of __call__.
Arguments:
Args:
input_dict (dict): dictionary of input tensors, including "obs",
"obs_flat", "prev_action", "prev_reward", "is_training"
state (list): list of state tensors with sizes matching those
@@ -76,24 +77,11 @@ class TFModelV2(ModelV2):
(outputs, state): The model output tensor of size
[BATCH, num_outputs]
Sample implementation for the ``MyModelClass`` example::
def forward(self, input_dict, state, seq_lens):
model_out, self._value_out = self.base_model(input_dict["obs"])
return model_out, state
"""
raise NotImplementedError
def value_function(self):
"""Return the value function estimate for the most recent forward pass.
Returns:
value estimate tensor of shape [BATCH].
Sample implementation for the ``MyModelClass`` example::
def value_function(self):
return self._value_out
Examples:
>>> def forward(self, input_dict, state, seq_lens):
>>> model_out, self._value_out = self.base_model(
... input_dict["obs"])
>>> return model_out, state
"""
raise NotImplementedError
@@ -107,10 +95,17 @@ class TFModelV2(ModelV2):
"""Register the given list of variables with this model."""
self.var_list.extend(variables)
def variables(self):
"""Returns the list of variables for this model."""
@override(ModelV2)
def variables(self, as_dict=False):
if as_dict:
return {v.name: v for v in self.var_list}
return list(self.var_list)
def trainable_variables(self):
"""Returns the list of trainable variables for this model."""
@override(ModelV2)
def trainable_variables(self, as_dict=False):
if as_dict:
return {
k: v
for k, v in self.variables(as_dict=True).items() if v.trainable
}
return [v for v in self.variables() if v.trainable]
+4 -2
@@ -46,8 +46,10 @@ class TorchCategorical(TorchDistributionWrapper):
@override(ActionDistribution)
def __init__(self, inputs, model=None, temperature=1.0):
assert temperature > 0.0, "Categorical `temperature` must be > 0.0!"
inputs /= temperature
if temperature != 1.0:
assert temperature > 0.0, \
"Categorical `temperature` must be > 0.0!"
inputs /= temperature
super().__init__(inputs, model)
self.dist = torch.distributions.categorical.Categorical(
logits=self.inputs)
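What the temperature does to a categorical distribution can be sketched in plain NumPy. This is an illustration only: `categorical_probs` is a hypothetical helper, and unlike the in-place `inputs /= temperature` above it deliberately leaves the caller's array untouched.

```python
import numpy as np


def categorical_probs(logits, temperature=1.0):
    """Return softmax(logits / temperature) without mutating `logits`."""
    assert temperature > 0.0, "Categorical `temperature` must be > 0.0!"
    # Division creates a new array, so the input is not modified.
    scaled = np.asarray(logits, dtype=np.float64) / temperature
    # Subtract the max for numerical stability before exponentiating.
    scaled = scaled - scaled.max(axis=-1, keepdims=True)
    exp = np.exp(scaled)
    return exp / exp.sum(axis=-1, keepdims=True)


logits = np.array([1.0, 2.0, 3.0])
sharp = categorical_probs(logits, temperature=0.5)  # more peaked
flat = categorical_probs(logits, temperature=2.0)   # closer to uniform
```

Temperatures below 1.0 concentrate probability on the largest logit; temperatures above 1.0 flatten the distribution.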
+21 -19
@@ -1,5 +1,5 @@
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.utils.annotations import PublicAPI
from ray.rllib.utils.annotations import override, PublicAPI
from ray.rllib.utils import try_import_torch
_, nn = try_import_torch()
@@ -55,7 +55,7 @@ class TorchModelV2(ModelV2):
Custom models should override this instead of __call__.
Arguments:
Args:
input_dict (dict): dictionary of input tensors, including "obs",
"obs_flat", "prev_action", "prev_reward", "is_training"
state (list): list of state tensors with sizes matching those
@@ -66,24 +66,26 @@ class TorchModelV2(ModelV2):
(outputs, state): The model output tensor of size
[BATCH, num_outputs]
Sample implementation for the ``MyModelClass`` example::
def forward(self, input_dict, state, seq_lens):
features = self._hidden_layers(input_dict["obs"])
self._value_out = self._value_branch(features)
return self._logits(features), state
Examples:
>>> def forward(self, input_dict, state, seq_lens):
>>> features = self._hidden_layers(input_dict["obs"])
>>> self._value_out = self._value_branch(features)
>>> return self._logits(features), state
"""
raise NotImplementedError
def value_function(self):
"""Return the value function estimate for the most recent forward pass.
@override(ModelV2)
def variables(self, as_dict=False):
if as_dict:
return self.state_dict()
return list(self.parameters())
Returns:
value estimate tensor of shape [BATCH].
Sample implementation for the ``MyModelClass`` example::
def value_function(self):
return self._value_out
"""
raise NotImplementedError
@override(ModelV2)
def trainable_variables(self, as_dict=False):
if as_dict:
return {
k: v
for k, v in self.variables(as_dict=True).items()
if v.requires_grad
}
return [v for v in self.variables() if v.requires_grad]
+6 -1
@@ -169,7 +169,12 @@ class SyncReplayOptimizer(PolicyOptimizer):
self.learner_stats[policy_id] = get_learner_stats(info)
replay_buffer = self.replay_buffers[policy_id]
if isinstance(replay_buffer, PrioritizedReplayBuffer):
td_error = info["td_error"]
# TODO(sven): This is currently structured differently for
# torch/tf. Clean up these results/info dicts across
# policies (note: fixing this in torch_policy.py will
# break e.g. DDPPO!).
td_error = info.get(
"td_error", info["learner_stats"].get("td_error"))
new_priorities = (
np.abs(td_error) + self.prioritized_replay_eps)
replay_buffer.update_priorities(
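One detail of the `td_error` fallback is worth spelling out: Python evaluates the default argument of `dict.get` before the call, so `info["learner_stats"]` is indexed even when a top-level `"td_error"` exists. A minimal sketch of a lookup that tolerates both layouts follows; `lookup_td_error` and the two example dicts are hypothetical.

```python
def lookup_td_error(info):
    """Prefer a top-level "td_error", else fall back to "learner_stats"."""
    if "td_error" in info:
        return info["td_error"]
    # .get() on both levels avoids a KeyError if "learner_stats" is absent.
    return info.get("learner_stats", {}).get("td_error")


flat_info = {"td_error": [0.5, -1.0], "learner_stats": {}}
nested_info = {"learner_stats": {"td_error": [0.25]}}

flat_result = lookup_td_error(flat_info)
nested_result = lookup_td_error(nested_info)
missing_result = lookup_td_error({})
```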
+9 -2
@@ -357,7 +357,9 @@ def build_eager_tf_policy(name,
action_distribution_fn(
self, self.model,
input_dict[SampleBatch.CUR_OBS],
explore=explore, timestep=timestep)
explore=explore,
timestep=timestep,
is_training=False)
else:
dist_class = self.dist_class
dist_inputs, state_out = self.model(
@@ -420,7 +422,11 @@ def build_eager_tf_policy(name,
# Action dist class and inputs are generated via custom function.
if action_distribution_fn:
dist_inputs, dist_class, _ = action_distribution_fn(
self, self.model, input_dict[SampleBatch.CUR_OBS])
self,
self.model,
input_dict[SampleBatch.CUR_OBS],
explore=False,
is_training=False)
action_dist = dist_class(dist_inputs, self.model)
log_likelihoods = action_dist.logp(actions)
# Default log-likelihood calculation.
@@ -560,6 +566,7 @@ def build_eager_tf_policy(name,
}
else:
fetches[LEARNER_STATS_KEY] = {}
if extra_learn_fetches_fn:
fetches.update(
{k: v
@@ -76,9 +76,11 @@ def do_test_log_likelihood(run,
layer_key[0])])
else:
expected_mean_logstd = fc(
fc(obs_batch,
vars["_hidden_layers.0._model.0.weight"]),
vars["_logits._model.0.weight"])
fc(
obs_batch,
np.transpose(
vars["_hidden_layers.0._model.0.weight"])),
np.transpose(vars["_logits._model.0.weight"]))
mean, log_std = np.split(expected_mean_logstd, 2, axis=-1)
if logp_func is None:
expected_logp = np.log(norm.pdf(a, mean, np.exp(log_std)))
+1 -1
@@ -185,7 +185,7 @@ def build_tf_policy(name,
@override(TFPolicy)
def extra_compute_grad_fetches(self):
if extra_learn_fetches_fn:
# auto-add empty learner stats dict if needed
# Auto-add empty learner stats dict if needed.
return dict({
LEARNER_STATS_KEY: {}
}, **extra_learn_fetches_fn(self))
+30 -12
@@ -7,7 +7,8 @@ from ray.rllib.policy.rnn_sequencing import pad_batch_to_sequences_of_same_size
from ray.rllib.utils.annotations import override, DeveloperAPI
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule
from ray.rllib.utils.torch_ops import convert_to_non_torch_type
from ray.rllib.utils.torch_ops import convert_to_non_torch_type, \
convert_to_torch_tensor
from ray.rllib.utils.tracking_dict import UsageTrackingDict
torch, _ = try_import_torch()
@@ -111,15 +112,18 @@ class TorchPolicy(Policy):
seq_lens = torch.ones(len(obs_batch), dtype=torch.int32)
input_dict = self._lazy_tensor_dict({
SampleBatch.CUR_OBS: obs_batch,
"is_training": False,
})
if prev_action_batch:
if prev_action_batch is not None:
input_dict[SampleBatch.PREV_ACTIONS] = prev_action_batch
if prev_reward_batch:
if prev_reward_batch is not None:
input_dict[SampleBatch.PREV_REWARDS] = prev_reward_batch
state_batches = [self._convert_to_tensor(s) for s in state_batches]
state_batches = [
self._convert_to_tensor(s) for s in (state_batches or [])
]
if self.action_sampler_fn:
dist_class = dist_inputs = None
action_dist = dist_inputs = None
state_out = []
actions, logp = self.action_sampler_fn(
self,
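The switch from `if prev_action_batch:` to `if prev_action_batch is not None:` matters once the batch is an array. A short NumPy illustration with made-up values shows why the truthiness test is unsafe and why `state_batches or []` is a reasonable guard against `None`:

```python
import numpy as np

prev_action_batch = np.array([0, 1, 0])

try:
    bool(prev_action_batch)  # what `if prev_action_batch:` does implicitly
    truthiness_ok = True
except ValueError:
    # NumPy refuses to reduce a multi-element array to a single bool.
    truthiness_ok = False

# The explicit None check works for arrays, lists and tensors alike.
has_prev_actions = prev_action_batch is not None

# `state_batches or []` guards against None before iterating.
state_batches = None
converted = [s for s in (state_batches or [])]
```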
@@ -129,12 +133,17 @@ class TorchPolicy(Policy):
timestep=timestep)
else:
# Call the exploration before_compute_actions hook.
self.exploration.before_compute_actions(timestep=timestep)
self.exploration.before_compute_actions(
explore=explore, timestep=timestep)
if self.action_distribution_fn:
dist_inputs, dist_class, state_out = \
self.action_distribution_fn(
self, self.model, input_dict[SampleBatch.CUR_OBS],
explore=explore, timestep=timestep)
self,
self.model,
input_dict[SampleBatch.CUR_OBS],
explore=explore,
timestep=timestep,
is_training=False)
else:
dist_class = self.dist_class
dist_inputs, state_out = self.model(
@@ -194,7 +203,10 @@ class TorchPolicy(Policy):
# Action dist class and inputs are generated via custom function.
if self.action_distribution_fn:
dist_inputs, dist_class, _ = self.action_distribution_fn(
self, self.model, input_dict[SampleBatch.CUR_OBS])
self,
self.model,
input_dict[SampleBatch.CUR_OBS],
explore=False)
# Default action-dist inputs calculation.
else:
dist_class = self.dist_class
@@ -242,9 +254,11 @@ class TorchPolicy(Policy):
info["allreduce_latency"] = time.time() - start
self._optimizer.step()
info.update(self.extra_grad_info(train_batch))
return {LEARNER_STATS_KEY: info}
info.update(self.extra_grad_info(train_batch))
return {
LEARNER_STATS_KEY: info
}
@override(Policy)
def compute_gradients(self, postprocessed_batch):
@@ -278,10 +292,14 @@ class TorchPolicy(Policy):
@override(Policy)
def get_weights(self):
return {k: v.cpu() for k, v in self.model.state_dict().items()}
return {
k: v.cpu().detach().numpy()
for k, v in self.model.state_dict().items()
}
@override(Policy)
def set_weights(self, weights):
weights = convert_to_torch_tensor(weights, device=self.device)
self.model.load_state_dict(weights)
@override(Policy)
+1 -1
@@ -51,7 +51,7 @@ if __name__ == "__main__":
passed = False
for i in range(3):
trials = run_experiments(experiments, resume=False, verbose=1)
trials = run_experiments(experiments, resume=False, verbose=0)
for t in trials:
if (t.last_result["episode_reward_mean"] >=
+1 -1
@@ -4,7 +4,7 @@ import unittest
import ray
from ray.rllib.agents.dqn import DQNTrainer
from ray.rllib.agents.a3c import A3CTrainer
from ray.rllib.agents.dqn.dqn_policy import _adjust_nstep
from ray.rllib.agents.dqn.dqn_tf_policy import _adjust_nstep
from ray.tune.registry import register_env
+1 -1
@@ -5,7 +5,7 @@ import unittest
import ray
from ray.rllib.agents.pg import PGTrainer
from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy
from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy
from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.optimizers import (SyncSamplesOptimizer, SyncReplayOptimizer,
AsyncGradientsOptimizer)
from ray.rllib.tests.test_rollout_worker import (MockEnv, MockEnv2, MockPolicy)
@@ -1,17 +1,18 @@
cartpole-dqn-w-param-noise:
cartpole-dqn-tf-w-param-noise:
env: CartPole-v0
run: DQN
stop:
episode_reward_mean: 150
timesteps_total: 100000
timesteps_total: 300000
config:
use_pytorch: false
exploration_config:
type: ParameterNoise
random_timesteps: 0
random_timesteps: 10000
initial_stddev: 1.0
batch_mode: complete_episodes
lr: 0.001
lr: 0.0008
num_workers: 0
model:
fcnet_hiddens: [16]
fcnet_activation: linear
fcnet_hiddens: [32, 32]
fcnet_activation: tanh
@@ -1,9 +1,10 @@
cartpole-dqn:
cartpole-dqn-tf:
env: CartPole-v0
run: DQN
stop:
episode_reward_mean: 150
timesteps_total: 50000
config:
use_pytorch: false
n_step: 3
gamma: 0.95
@@ -0,0 +1,18 @@
cartpole-dqn-torch-w-param-noise:
env: CartPole-v0
run: DQN
stop:
episode_reward_mean: 150
timesteps_total: 300000
config:
use_pytorch: true
exploration_config:
type: ParameterNoise
random_timesteps: 10000
initial_stddev: 1.0
batch_mode: complete_episodes
lr: 0.0008
num_workers: 0
model:
fcnet_hiddens: [32, 32]
fcnet_activation: tanh
@@ -0,0 +1,10 @@
cartpole-dqn-torch:
env: CartPole-v0
run: DQN
stop:
episode_reward_mean: 150
timesteps_total: 50000
config:
use_pytorch: true
n_step: 3
gamma: 0.95
@@ -0,0 +1,8 @@
cartpole-dqn-tf:
env: CartPole-v0
run: SimpleQ
stop:
episode_reward_mean: 150
timesteps_total: 50000
config:
use_pytorch: false
@@ -0,0 +1,8 @@
cartpole-dqn-torch:
env: CartPole-v0
run: SimpleQ
stop:
episode_reward_mean: 150
timesteps_total: 50000
config:
use_pytorch: true
+4
@@ -14,6 +14,8 @@ from ray.rllib.utils.policy_server import PolicyServer
from ray.rllib.utils.schedules import LinearSchedule, PiecewiseSchedule, \
PolynomialSchedule, ExponentialSchedule, ConstantSchedule
from ray.rllib.utils.test_utils import check, framework_iterator
from ray.rllib.utils.torch_ops import convert_to_non_torch_type, \
convert_to_torch_tensor
from ray.tune.utils import merge_dicts, deep_update
@@ -60,6 +62,8 @@ __all__ = [
"add_mixins",
"check",
"check_framework",
"convert_to_non_torch_type",
"convert_to_torch_tensor",
"deprecation_warning",
"fc",
"force_list",
+12 -8
@@ -4,6 +4,7 @@ import numpy as np
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.tf.tf_action_dist import Categorical
from ray.rllib.models.torch.torch_action_dist import TorchCategorical
from ray.rllib.utils.annotations import override
from ray.rllib.utils.exploration.exploration import Exploration
from ray.rllib.utils.framework import try_import_tf, try_import_torch
@@ -63,18 +64,21 @@ class ParameterNoise(Exploration):
# This excludes any variable, whose name contains "LayerNorm" (those
# are BatchNormalization layers, which should not be perturbed).
self.model_variables = [
v for v in self.model.variables() if "LayerNorm" not in v.name
v for k, v in self.model.variables(as_dict=True).items()
if "LayerNorm" not in k
]
# Our noise to be added to the weights. Each item in `self.noise`
# corresponds to one Model variable and holds the Gaussian noise to
# be added to that variable (weight).
self.noise = []
for var in self.model_variables:
name_ = var.name.split(":")[0] + "_noisy" if var.name else ""
self.noise.append(
get_variable(
np.zeros(var.shape, dtype=np.float32),
framework=self.framework,
tf_name=var.name.split(":")[0] + "_noisy"))
tf_name=name_,
torch_tensor=True))
# tf-specific ops to sample, assign and remove noise.
if self.framework == "tf" and not tf.executing_eagerly():
@@ -205,7 +209,7 @@ class ParameterNoise(Exploration):
explore=self.weights_are_currently_noisy)
# Categorical case (e.g. DQN).
if policy.dist_class is Categorical:
if policy.dist_class in (Categorical, TorchCategorical):
action_dist = softmax(fetches[SampleBatch.ACTION_DIST_INPUTS])
else: # TODO(sven): Other action-dist cases.
raise NotImplementedError
@@ -223,7 +227,7 @@ class ParameterNoise(Exploration):
explore=not self.weights_are_currently_noisy)
# Categorical case (e.g. DQN).
if policy.dist_class is Categorical:
if policy.dist_class in (Categorical, TorchCategorical):
action_dist = softmax(fetches[SampleBatch.ACTION_DIST_INPUTS])
if noisy_action_dist is None:
@@ -232,7 +236,7 @@ class ParameterNoise(Exploration):
noise_free_action_dist = action_dist
# Categorical case (e.g. DQN).
if policy.dist_class is Categorical:
if policy.dist_class in (Categorical, TorchCategorical):
# Calculate KL-divergence (DKL(clean||noisy)) according to [2].
# TODO(sven): Allow KL-divergence to be calculated by our
# Distribution classes (don't support off-graph/numpy yet).
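The KL term referred to in the comment, DKL(clean || noisy) between two categorical distributions, can be computed off-graph in NumPy. This is a sketch under assumptions, not the code from this change: the `eps` guard and the averaging over the batch are choices made for the example.

```python
import numpy as np


def softmax(x):
    x = x - np.max(x, axis=-1, keepdims=True)
    e = np.exp(x)
    return e / np.sum(e, axis=-1, keepdims=True)


def categorical_kl(clean_probs, noisy_probs, eps=1e-12):
    """Batch mean of DKL(clean || noisy) for categorical distributions."""
    # eps (an assumption of this sketch) avoids log(0) and division by zero.
    ratio = np.log((clean_probs + eps) / (noisy_probs + eps))
    return float(np.mean(np.sum(clean_probs * ratio, axis=-1)))


clean = softmax(np.array([[2.0, 0.0], [0.0, 1.0]]))
noisy = softmax(np.array([[1.5, 0.5], [0.2, 0.9]]))
kl_same = categorical_kl(clean, clean)  # identical distributions
kl_diff = categorical_kl(clean, noisy)  # perturbed distributions
```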
@@ -269,7 +273,7 @@ class ParameterNoise(Exploration):
else:
for i in range(len(self.noise)):
self.noise[i] = torch.normal(
0.0, self.stddev, size=self.noise[i].size)
0.0, self.stddev, size=self.noise[i].size())
def _tf_sample_new_noise_op(self):
added_noises = []
@@ -319,7 +323,7 @@ class ParameterNoise(Exploration):
else:
for i in range(len(self.noise)):
# Add noise to weights in-place.
torch.add_(self.model_variables[i], self.noise[i])
self.model_variables[i].add_(self.noise[i])
self.weights_are_currently_noisy = True
@@ -358,7 +362,7 @@ class ParameterNoise(Exploration):
# Removes the stored noise from the model's parameters.
for var, noise in zip(self.model_variables, self.noise):
# Remove noise from weights in-place.
torch.add_(var, -noise)
var.add_(-noise)
self.weights_are_currently_noisy = False
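The add/remove cycle that `var.add_(noise)` and `var.add_(-noise)` implement on torch parameters can be mirrored with NumPy arrays. The weights, shapes and `stddev` below are made up for the illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
weights = [np.ones((2, 3), dtype=np.float32), np.zeros(3, dtype=np.float32)]
original = [w.copy() for w in weights]
stddev = 0.1

# Sample one noise array per weight, matching its shape.
noise = [
    rng.normal(0.0, stddev, size=w.shape).astype(np.float32)
    for w in weights
]

# Add noise in-place (analogous to `var.add_(noise)`).
for w, n in zip(weights, noise):
    w += n
noisy_differs = not np.array_equal(weights[0], original[0])

# Remove exactly the stored noise again (analogous to `var.add_(-noise)`).
for w, n in zip(weights, noise):
    w -= n
```

Because the same stored noise is subtracted again, the weights return to their original values up to floating-point rounding.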
@@ -31,8 +31,8 @@ def do_test_explorations(run,
# Test all frameworks.
for fw in framework_iterator(config):
if fw == "torch" and \
run in [ddpg.DDPGTrainer, dqn.DQNTrainer, dqn.SimpleQTrainer,
impala.ImpalaTrainer, sac.SACTrainer, td3.TD3Trainer]:
run in [ddpg.DDPGTrainer, impala.ImpalaTrainer,
sac.SACTrainer, td3.TD3Trainer]:
continue
elif fw == "eager" and run in [
ddpg.DDPGTrainer, sac.SACTrainer, td3.TD3Trainer
@@ -123,8 +123,8 @@ class TestExplorations(unittest.TestCase):
expected_mean_action=0.0)
def test_simple_dqn(self):
do_test_explorations(dqn.SimpleQTrainer,
"CartPole-v0", dqn.DEFAULT_CONFIG,
do_test_explorations(dqn.SimpleQTrainer, "CartPole-v0",
dqn.SIMPLE_Q_DEFAULT_CONFIG,
np.array([0.0, 0.1, 0.0, 0.0]))
def test_dqn(self):
+19 -4
@@ -135,16 +135,26 @@ def try_import_torch(error=False):
return None, nn
def get_variable(value, framework="tf", tf_name="unnamed-variable"):
def get_variable(value,
framework="tf",
trainable=False,
tf_name="unnamed-variable",
torch_tensor=False):
"""
Args:
value (any): The initial value to use. In the non-tf case, this will
be returned as is.
framework (str): One of "tf", "torch", or None.
tf_name (str): An optional name for the variable. Only for tf.
trainable (bool): Whether the generated variable should be
trainable (tf)/requires_grad (torch) or not (default: False).
tf_name (str): For framework="tf": An optional name for the
tf.Variable.
torch_tensor (bool): For framework="torch": Whether to actually create
a torch.tensor, or just a python value (default).
Returns:
any: A framework-specific variable (tf.Variable or python primitive).
any: A framework-specific variable (tf.Variable, torch.tensor, or
python primitive).
"""
if framework == "tf":
import tensorflow as tf
@@ -153,7 +163,12 @@ def get_variable(value, framework="tf", tf_name="unnamed-variable"):
if isinstance(value, float) else tf.int32
if isinstance(value, int) else None)
return tf.compat.v1.get_variable(
tf_name, initializer=value, dtype=dtype)
tf_name, initializer=value, dtype=dtype, trainable=trainable)
elif framework == "torch" and torch_tensor is True:
import torch
var_ = torch.from_numpy(value)
var_.requires_grad = trainable
return var_
# torch or None: Return python primitive.
return value
+29 -12
@@ -83,6 +83,10 @@ def one_hot(x, depth=0, on_value=1, off_value=0):
Returns:
np.ndarray: The one-hot encoded equivalent of the input array.
"""
# Handle torch arrays properly.
if torch and isinstance(x, torch.Tensor):
x = x.numpy()
# Handle bool arrays correctly.
if x.dtype == np.bool_:
x = x.astype(np.int)
@@ -114,7 +118,7 @@ def one_hot(x, depth=0, on_value=1, off_value=0):
return out
def fc(x, weights, biases=None):
def fc(x, weights, biases=None, framework=None):
"""
Calculates the outputs of a fully-connected (dense) layer given
weights/biases and an input.
@@ -123,23 +127,29 @@ def fc(x, weights, biases=None):
x (np.ndarray): The input to the dense layer.
weights (np.ndarray): The weights matrix.
biases (Optional[np.ndarray]): The biases vector. All 0s if None.
framework (Optional[str]): An optional framework hint (to figure out,
e.g. whether to transpose torch weight matrices).
Returns:
The dense layer's output.
"""
def map_(data, transpose=False):
if torch:
if isinstance(data, torch.Tensor):
data = data.cpu().detach().numpy()
if tf and tf.executing_eagerly():
if isinstance(data, tf.Variable):
data = data.numpy()
if transpose:
data = np.transpose(data)
return data
x = map_(x)
# Torch stores matrices in transpose (faster for backprop).
if torch: # and isinstance(weights, torch.Tensor):
x = x.detach().numpy() if isinstance(x, torch.Tensor) else x
weights = np.transpose(weights.detach().numpy()) if \
isinstance(weights, torch.Tensor) else weights
biases = biases.detach().numpy() if \
isinstance(biases, torch.Tensor) else biases
if tf and tf.executing_eagerly():
x = x.numpy() if isinstance(x, tf.Variable) else x
weights = weights.numpy() if isinstance(weights, tf.Variable) else \
weights
biases = biases.numpy() if isinstance(biases, tf.Variable) else biases
weights = map_(weights, transpose=framework == "torch")
biases = map_(biases)
return np.matmul(x, weights) + (0.0 if biases is None else biases)
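The reason for the `framework` hint is that `torch.nn.Linear` stores its weight matrix as `[out_features, in_features]`, so it has to be transposed before a plain `x @ W` product. A NumPy-only sketch of that idea follows; `fc_sketch` is a hypothetical, simplified version that skips the tensor-to-NumPy conversion.

```python
import numpy as np


def fc_sketch(x, weights, biases=None, framework=None):
    """Dense layer on NumPy arrays; transposes torch-layout weights."""
    if framework == "torch":
        # torch.nn.Linear stores weights as [out_features, in_features].
        weights = np.transpose(weights)
    return np.matmul(x, weights) + (0.0 if biases is None else biases)


x = np.array([[1.0, 2.0]])              # [batch=1, in=2]
w_in_out = np.array([[1.0, 0.0, 2.0],
                     [0.0, 1.0, 3.0]])  # [in=2, out=3]
w_out_in = w_in_out.T                   # torch-style [out=3, in=2]

out_plain = fc_sketch(x, w_in_out)
out_torch = fc_sketch(x, w_out_in, framework="torch")
```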
@@ -216,3 +226,10 @@ def lstm(x,
unrolled_outputs[:, t, :] = h_states
return unrolled_outputs, (c_states, h_states)
def huber_loss(x, delta=1.0):
"""Reference: https://en.wikipedia.org/wiki/Huber_loss"""
return np.where(
np.abs(x) < delta,
np.power(x, 2.0) * 0.5, delta * (np.abs(x) - 0.5 * delta))
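A quick worked example of the NumPy `huber_loss` above, with the function repeated so the snippet runs on its own: errors smaller than `delta` are penalized quadratically, larger ones linearly.

```python
import numpy as np


def huber_loss(x, delta=1.0):
    return np.where(
        np.abs(x) < delta,
        np.power(x, 2.0) * 0.5,
        delta * (np.abs(x) - 0.5 * delta))


errors = np.array([0.5, 2.0, -3.0])
losses = huber_loss(errors)
# 0.5  -> 0.5 * 0.5**2       = 0.125 (quadratic branch)
# 2.0  -> 1.0 * (2.0 - 0.5)  = 1.5   (linear branch)
# -3.0 -> 1.0 * (3.0 - 0.5)  = 2.5   (linear branch)
```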
+3 -8
@@ -187,15 +187,10 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False):
rtol=rtol,
false=false)
if torch is not None:
# y should never be a Tensor (y=expected value).
if isinstance(y, torch.Tensor):
raise ValueError("`y` (expected value) must not be a Tensor. "
"Use numpy.ndarray instead")
if isinstance(x, torch.Tensor):
try:
x = x.numpy()
except RuntimeError:
x = x.detach().numpy()
x = x.detach().numpy()
if isinstance(y, torch.Tensor):
y = y.detach().numpy()
# Using decimals.
if atol is None and rtol is None:
+4 -1
@@ -18,11 +18,14 @@ def reduce_mean_ignore_inf(x, axis):
tf.cast(mask, tf.float32), axis))
def minimize_and_clip(optimizer, objective, var_list, clip_val=10):
def minimize_and_clip(optimizer, objective, var_list, clip_val=10.0):
"""Minimizes `objective` using `optimizer` w.r.t. the variables in
`var_list` while ensuring that the norm of the gradients for each
variable is clipped to `clip_val`.
"""
# Accidentally passing values < 0.0 will break all gradients.
assert clip_val > 0.0, clip_val
gradients = optimizer.compute_gradients(objective, var_list=var_list)
for i, (grad, var) in enumerate(gradients):
if grad is not None:
+55 -3
@@ -1,3 +1,4 @@
import numpy as np
import logging
from ray.rllib.utils.framework import try_import_torch
@@ -13,6 +14,33 @@ except (ImportError, ModuleNotFoundError) as e:
raise e
def huber_loss(x, delta=1.0):
"""Reference: https://en.wikipedia.org/wiki/Huber_loss"""
return torch.where(
torch.abs(x) < delta,
torch.pow(x, 2.0) * 0.5, delta * (torch.abs(x) - 0.5 * delta))
def reduce_mean_ignore_inf(x, axis):
"""Same as torch.mean() but ignores -inf values."""
mask = torch.ne(x, float("-inf"))
x_zeroed = torch.where(mask, x, torch.zeros_like(x))
return torch.sum(x_zeroed, axis) / torch.sum(mask.float(), axis)
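The masking trick in `reduce_mean_ignore_inf` is easier to check in NumPy. The function below is a NumPy analog written for this illustration (`reduce_mean_ignore_inf_np` is a hypothetical name), not the torch code itself.

```python
import numpy as np


def reduce_mean_ignore_inf_np(x, axis):
    """Mean along `axis` that skips -inf entries."""
    mask = x != float("-inf")
    # Replace -inf by 0 so it does not poison the sum.
    x_zeroed = np.where(mask, x, np.zeros_like(x))
    # Divide by the number of finite entries, not by the full length.
    return np.sum(x_zeroed, axis) / np.sum(mask.astype(np.float32), axis)


q_values = np.array([[1.0, 3.0, float("-inf")],
                     [2.0, float("-inf"), float("-inf")]])
means = reduce_mean_ignore_inf_np(q_values, axis=1)
```

A row consisting only of `-inf` would divide by zero in this sketch, so callers would need at least one finite entry per row.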
def minimize_and_clip(optimizer, objective, var_list, clip_val=10):
"""Minimizes `objective` using `optimizer` w.r.t. the variables in
`var_list` while ensuring that the norm of the gradients for each
variable is clipped to `clip_val`.
"""
gradients = optimizer.compute_gradients(objective, var_list=var_list)
for i, (grad, var) in enumerate(gradients):
if grad is not None:
gradients[i] = (torch.nn.utils.clip_grad_norm_(grad, clip_val),
var)
return gradients
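A note on the torch helper above: as far as I can tell it mirrors the TF version. `optimizer.compute_gradients` is a TF optimizer method, and `torch.nn.utils.clip_grad_norm_` works on parameters and returns the total norm rather than a clipped gradient. The intended per-variable clipping is sketched below in framework-neutral NumPy; `clip_by_norm` is a hypothetical name and this is not a drop-in replacement.

```python
import numpy as np


def clip_by_norm(grad, clip_val):
    """Rescale `grad` so that its L2 norm is at most `clip_val`."""
    assert clip_val > 0.0, clip_val
    norm = np.linalg.norm(grad)
    if norm <= clip_val:
        return grad
    return grad * (clip_val / norm)


small = clip_by_norm(np.array([3.0, 4.0]), clip_val=10.0)    # norm 5, kept
large = clip_by_norm(np.array([30.0, 40.0]), clip_val=10.0)  # norm 50 -> 10
```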
def sequence_mask(lengths, maxlen, dtype=None):
"""
Exact same behavior as tf.sequence_mask.
@@ -31,7 +59,7 @@ def sequence_mask(lengths, maxlen, dtype=None):
def convert_to_non_torch_type(stats):
"""Converts values in stats_dict to non-Tensor numpy or python types.
"""Converts values in `stats` to non-Tensor numpy or python types.
Args:
stats (any): Any (possibly nested) struct, the values in which will be
@@ -39,7 +67,7 @@ def convert_to_non_torch_type(stats):
being converted to numpy types.
Returns:
dict: A new dict with the same structure as stats_dict, but with all
Any: A new struct with the same structure as `stats`, but with all
values converted to non-torch Tensor types.
"""
@@ -47,8 +75,32 @@ def convert_to_non_torch_type(stats):
def mapping(item):
if isinstance(item, torch.Tensor):
return item.cpu().item() if len(item.size()) == 0 else \
item.cpu().numpy()
item.cpu().detach().numpy()
else:
return item
return tree.map_structure(mapping, stats)
def convert_to_torch_tensor(stats, device=None):
"""Converts any struct to torch.Tensors.
Args:
stats (any): Any (possibly nested) struct, the values in which will be
converted and returned as a new struct with all leaves converted
to torch tensors.
device: Optional device to move the resulting tensors to.
Returns:
Any: A new struct with the same structure as `stats`, but with all
values converted to torch Tensor types.
"""
def mapping(item):
if torch.is_tensor(item):
return item if device is None else item.to(device)
tensor = torch.from_numpy(np.asarray(item))
# Floatify all float64 tensors.
if tensor.dtype == torch.double:
tensor = tensor.float()
return tensor if device is None else tensor.to(device)
return tree.map_structure(mapping, stats)
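The structure-preserving conversion in `convert_to_torch_tensor` can be illustrated without torch or the `tree` package. Both helpers below are hypothetical stand-ins: `map_leaves` handles only dicts, lists and tuples, and NumPy float32 arrays take the place of torch tensors.

```python
import numpy as np


def map_leaves(fn, struct):
    """Minimal stand-in for tree.map_structure (dict, list, tuple only)."""
    if isinstance(struct, dict):
        return {k: map_leaves(fn, v) for k, v in struct.items()}
    if isinstance(struct, (list, tuple)):
        return type(struct)(map_leaves(fn, v) for v in struct)
    return fn(struct)


def to_float32_array(item):
    arr = np.asarray(item)
    # Mirror the "floatify" step: downcast float64 leaves to float32.
    return arr.astype(np.float32) if arr.dtype == np.float64 else arr


weights = {
    "fc": {"kernel": np.array([[0.1, 0.2]]), "bias": np.zeros(2)},
    "step": 3,
}
converted = map_leaves(to_float32_array, weights)
```

The nesting of `weights` is preserved; only the leaves change type, and integer leaves keep an integer dtype.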