From 22ccc43670dac93eb7fe81520a84cf3979d05693 Mon Sep 17 00:00:00 2001 From: Sven Mika Date: Mon, 6 Apr 2020 20:56:16 +0200 Subject: [PATCH] [RLlib] DQN torch version. (#7597) * Fix. * Rollback. * WIP. * WIP. * WIP. * WIP. * WIP. * WIP. * WIP. * WIP. * Fix. * Fix. * Fix. * Fix. * Fix. * WIP. * WIP. * Fix. * Test case fixes. * Test case fixes and LINT. * Test case fixes and LINT. * Rollback. * WIP. * WIP. * Test case fixes. * Fix. * Fix. * Fix. * Add regression test for DQN w/ param noise. * Fixes and LINT. * Fixes and LINT. * Fixes and LINT. * Fixes and LINT. * Fixes and LINT. * Comment * Regression test case. * WIP. * WIP. * LINT. * LINT. * WIP. * Fix. * Fix. * Fix. * LINT. * Fix (SAC does currently not support eager). * Fix. * WIP. * LINT. * Update rllib/evaluation/sampler.py Co-Authored-By: Eric Liang * Update rllib/evaluation/sampler.py Co-Authored-By: Eric Liang * Update rllib/utils/exploration/exploration.py Co-Authored-By: Eric Liang * Update rllib/utils/exploration/exploration.py Co-Authored-By: Eric Liang * WIP. * WIP. * Fix. * LINT. * LINT. * Fix and LINT. * WIP. * WIP. * WIP. * WIP. * Fix. * LINT. * Fix. * Fix and LINT. * Update rllib/utils/exploration/exploration.py * Update rllib/policy/dynamic_tf_policy.py Co-Authored-By: Eric Liang * Update rllib/policy/dynamic_tf_policy.py Co-Authored-By: Eric Liang * Update rllib/policy/dynamic_tf_policy.py Co-Authored-By: Eric Liang * Fixes. * WIP. * LINT. * Fixes and LINT. * LINT and fixes. * LINT. * Move action_dist back into torch extra_action_out_fn and LINT. * Working SimpleQ learning cartpole on both torch AND tf. * Working Rainbow learning cartpole on tf. * Working Rainbow learning cartpole on tf. * WIP. * LINT. * LINT. * Update docs and add torch to APEX test. * LINT. * Fix. * LINT. * Fix. * Fix. * Fix and docstrings. * Fix broken RLlib tests in master. * Split BAZEL learning tests into cartpole and pendulum (reached the 60min barrier). * Fix error_outputs option in BAZEL for RLlib regression tests. * Fix. * Tune param-noise tests. * LINT. * Fix. * Fix. * test * test * test * Fix. * Fix. * WIP. * WIP. * WIP. * WIP. * LINT. * WIP. Co-authored-by: Eric Liang --- .travis.yml | 4 +- doc/source/rllib-algorithms.rst | 4 +- rllib/BUILD | 12 +- rllib/agents/ddpg/ddpg.py | 1 + rllib/agents/ddpg/ddpg_policy.py | 2 +- rllib/agents/ddpg/td3.py | 5 +- rllib/agents/dqn/__init__.py | 19 +- ..._model.py => distributional_q_tf_model.py} | 26 +- rllib/agents/dqn/dqn.py | 67 +++-- .../dqn/{dqn_policy.py => dqn_tf_policy.py} | 39 +-- rllib/agents/dqn/dqn_torch_model.py | 165 +++++++++++ rllib/agents/dqn/dqn_torch_policy.py | 268 ++++++++++++++++++ rllib/agents/dqn/simple_q.py | 87 ++++++ ...mple_q_policy.py => simple_q_tf_policy.py} | 58 ++-- rllib/agents/dqn/simple_q_torch_policy.py | 100 +++++++ rllib/agents/dqn/tests/test_apex.py | 14 +- rllib/agents/dqn/tests/test_dqn.py | 48 ++-- rllib/agents/dqn/tests/test_simple_q.py | 102 +++++++ rllib/agents/ppo/tests/test_ddppo.py | 35 +++ rllib/agents/ppo/tests/test_ppo.py | 16 +- rllib/agents/qmix/qmix.py | 1 + rllib/agents/sac/sac.py | 2 +- rllib/agents/sac/sac_policy.py | 2 +- rllib/contrib/maddpg/maddpg.py | 1 + rllib/contrib/maddpg/maddpg_policy.py | 2 +- rllib/evaluation/rollout_worker.py | 5 +- rllib/examples/custom_keras_model.py | 5 +- rllib/examples/multi_agent_two_trainers.py | 2 +- rllib/examples/parametric_action_cartpole.py | 5 +- rllib/models/catalog.py | 46 ++- rllib/models/modelv2.py | 33 ++- rllib/models/tf/tf_modelv2.py | 41 ++- rllib/models/torch/torch_action_dist.py | 6 +- rllib/models/torch/torch_modelv2.py | 40 +-- rllib/optimizers/sync_replay_optimizer.py | 7 +- rllib/policy/eager_tf_policy.py | 11 +- .../tests/test_compute_log_likelihoods.py | 8 +- rllib/policy/tf_policy_template.py | 2 +- rllib/policy/torch_policy.py | 42 ++- rllib/tests/run_regression_tests.py | 2 +- rllib/tests/test_evaluators.py | 2 +- rllib/tests/test_multi_agent_env.py | 2 +- ....yaml => cartpole-dqn-tf-param-noise.yaml} | 13 +- ...cartpole-dqn.yaml => cartpole-dqn-tf.yaml} | 3 +- .../cartpole-dqn-torch-param-noise.yaml | 18 ++ .../regression_tests/cartpole-dqn-torch.yaml | 10 + .../regression_tests/cartpole-simpleq-tf.yaml | 8 + .../cartpole-simpleq-torch.yaml | 8 + rllib/utils/__init__.py | 4 + rllib/utils/exploration/parameter_noise.py | 20 +- .../exploration/tests/test_explorations.py | 8 +- rllib/utils/framework.py | 23 +- rllib/utils/numpy.py | 41 ++- rllib/utils/test_utils.py | 11 +- rllib/utils/tf_ops.py | 5 +- rllib/utils/torch_ops.py | 58 +++- 56 files changed, 1292 insertions(+), 277 deletions(-) rename rllib/agents/dqn/{distributional_q_model.py => distributional_q_tf_model.py} (92%) rename rllib/agents/dqn/{dqn_policy.py => dqn_tf_policy.py} (95%) create mode 100644 rllib/agents/dqn/dqn_torch_model.py create mode 100644 rllib/agents/dqn/dqn_torch_policy.py create mode 100644 rllib/agents/dqn/simple_q.py rename rllib/agents/dqn/{simple_q_policy.py => simple_q_tf_policy.py} (80%) create mode 100644 rllib/agents/dqn/simple_q_torch_policy.py create mode 100644 rllib/agents/dqn/tests/test_simple_q.py create mode 100644 rllib/agents/ppo/tests/test_ddppo.py rename rllib/tuned_examples/regression_tests/{cartpole-dqn-param-noise.yaml => cartpole-dqn-tf-param-noise.yaml} (54%) rename rllib/tuned_examples/regression_tests/{cartpole-dqn.yaml => cartpole-dqn-tf.yaml} (78%) create mode 100644 rllib/tuned_examples/regression_tests/cartpole-dqn-torch-param-noise.yaml create mode 100644 rllib/tuned_examples/regression_tests/cartpole-dqn-torch.yaml create mode 100644 rllib/tuned_examples/regression_tests/cartpole-simpleq-tf.yaml create mode 100644 rllib/tuned_examples/regression_tests/cartpole-simpleq-torch.yaml diff --git a/.travis.yml b/.travis.yml index 23b6f46b5..13d9899e9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -192,7 +192,7 @@ matrix: - ./ci/suppress_output ./ci/travis/install-ray.sh script: - if [ $RAY_CI_RLLIB_AFFECTED != "1" ]; then exit; fi - - travis_wait 60 bazel test --build_tests_only --test_tag_filters=learning_tests --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors rllib/... + - travis_wait 90 bazel test --build_tests_only --test_tag_filters=learning_tests --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=streamed rllib/... # RLlib: Learning tests with tf=1.x (from rllib/tuned_examples/regression_tests/*.yaml). # Requested by Edi (MS): Test all learning capabilities with tf1.x @@ -213,7 +213,7 @@ matrix: - ./ci/suppress_output ./ci/travis/install-ray.sh script: - if [ $RAY_CI_RLLIB_FULL_AFFECTED != "1" ]; then exit; fi - - travis_wait 60 bazel test --build_tests_only --test_tag_filters=learning_tests --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors rllib/... + - travis_wait 90 bazel test --build_tests_only --test_tag_filters=learning_tests --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=streamed rllib/... # RLlib: Quick Agent train.py runs (compilation & running, no(!) learning). # Agent single tests (compilation, loss-funcs, etc..). diff --git a/doc/source/rllib-algorithms.rst b/doc/source/rllib-algorithms.rst index 991a81d2f..a036bbb52 100644 --- a/doc/source/rllib-algorithms.rst +++ b/doc/source/rllib-algorithms.rst @@ -37,7 +37,7 @@ High-throughput architectures Distributed Prioritized Experience Replay (Ape-X) ------------------------------------------------- -|tensorflow| +|pytorch| |tensorflow| `[paper] `__ `[implementation] `__ Ape-X variations of DQN, DDPG, and QMIX (`APEX_DQN `__, `APEX_DDPG `__, `APEX_QMIX `__) use a single GPU learner and many CPU workers for experience collection. Experience collection can scale to hundreds of CPU workers due to the distributed prioritization of experience prior to storage in replay buffers. @@ -247,7 +247,7 @@ Tuned examples: `Pendulum-v0 `__ `[implementation] `__ RLlib DQN is implemented using the SyncReplayOptimizer. The algorithm can be scaled by increasing the number of workers, using the AsyncGradientsOptimizer for async DQN, or using Ape-X. Memory usage is reduced by compressing samples in the replay buffer with LZ4. All of the DQN improvements evaluated in `Rainbow `__ are available, though not all are enabled by default. See also how to use `parametric-actions in DQN `__. diff --git a/rllib/BUILD b/rllib/BUILD index f858baba9..fb813a328 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -46,7 +46,7 @@ py_test( tags = ["learning_tests", "learning_tests_cartpole"], size = "enormous", # = 60min timeout srcs = ["tests/run_regression_tests.py"], - data = glob(["tuned_examples/regression_tests/cartpole*.yaml"]), + data = glob(["tuned_examples/regression_tests/cartpole-*.yaml"]), # Pass `BAZEL` option and the path to look for yaml regression files. args = ["BAZEL", "tuned_examples/regression_tests"] ) @@ -57,7 +57,7 @@ py_test( tags = ["learning_tests", "learning_tests_pendulum"], size = "enormous", # = 60min timeout srcs = ["tests/run_regression_tests.py"], - data = glob(["tuned_examples/regression_tests/pendulum*.yaml"]), + data = glob(["tuned_examples/regression_tests/pendulum-*.yaml"]), # Pass `BAZEL` option and the path to look for yaml regression files. args = ["BAZEL", "tuned_examples/regression_tests"] ) @@ -85,13 +85,19 @@ py_test( srcs = ["agents/ddpg/tests/test_ddpg.py"] ) -# DQNTrainer +# DQNTrainer/SimpleQTrainer py_test( name = "test_dqn", tags = ["agents_dir"], size = "medium", srcs = ["agents/dqn/tests/test_dqn.py"] ) +py_test( + name = "test_simple_q", + tags = ["agents_dir"], + size = "medium", + srcs = ["agents/dqn/tests/test_simple_q.py"] +) # APEXTrainer py_test( diff --git a/rllib/agents/ddpg/ddpg.py b/rllib/agents/ddpg/ddpg.py index d22d5670c..32cbe0540 100644 --- a/rllib/agents/ddpg/ddpg.py +++ b/rllib/agents/ddpg/ddpg.py @@ -193,5 +193,6 @@ DDPGTrainer = GenericOffPolicyTrainer.with_updates( name="DDPG", default_config=DEFAULT_CONFIG, default_policy=DDPGTFPolicy, + get_policy_class=None, validate_config=validate_config, ) diff --git a/rllib/agents/ddpg/ddpg_policy.py b/rllib/agents/ddpg/ddpg_policy.py index 641f34284..34106d5b6 100644 --- a/rllib/agents/ddpg/ddpg_policy.py +++ b/rllib/agents/ddpg/ddpg_policy.py @@ -3,7 +3,7 @@ import numpy as np import ray import ray.experimental.tf_utils -from ray.rllib.agents.dqn.dqn_policy import postprocess_nstep_and_prio +from ray.rllib.agents.dqn.dqn_tf_policy import postprocess_nstep_and_prio from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY from ray.rllib.models import ModelCatalog diff --git a/rllib/agents/ddpg/td3.py b/rllib/agents/ddpg/td3.py index 2c8e7324d..531d52f46 100644 --- a/rllib/agents/ddpg/td3.py +++ b/rllib/agents/ddpg/td3.py @@ -58,4 +58,7 @@ TD3_DEFAULT_CONFIG = DDPGTrainer.merge_trainer_configs( }) TD3Trainer = DDPGTrainer.with_updates( - name="TD3", default_config=TD3_DEFAULT_CONFIG) + name="TD3", + default_config=TD3_DEFAULT_CONFIG, + get_policy_class=None, +) diff --git a/rllib/agents/dqn/__init__.py b/rllib/agents/dqn/__init__.py index 2eb78cc9a..aaf169e3c 100644 --- a/rllib/agents/dqn/__init__.py +++ b/rllib/agents/dqn/__init__.py @@ -1,11 +1,16 @@ from ray.rllib.agents.dqn.apex import ApexTrainer -from ray.rllib.agents.dqn.dqn import DQNTrainer, SimpleQTrainer, DEFAULT_CONFIG -from ray.rllib.utils import renamed_agent - -DQNAgent = renamed_agent(DQNTrainer) -ApexAgent = renamed_agent(ApexTrainer) +from ray.rllib.agents.dqn.dqn import DQNTrainer, DEFAULT_CONFIG +from ray.rllib.agents.dqn.simple_q import SimpleQTrainer, \ + DEFAULT_CONFIG as SIMPLE_Q_DEFAULT_CONFIG +from ray.rllib.agents.dqn.simple_q_tf_policy import SimpleQTFPolicy +from ray.rllib.agents.dqn.simple_q_torch_policy import SimpleQTorchPolicy __all__ = [ - "DQNAgent", "ApexAgent", "ApexTrainer", "DQNTrainer", "DEFAULT_CONFIG", - "SimpleQTrainer" + "ApexTrainer", + "DQNTrainer", + "DEFAULT_CONFIG", + "SIMPLE_Q_DEFAULT_CONFIG", + "SimpleQTFPolicy", + "SimpleQTorchPolicy", + "SimpleQTrainer", ] diff --git a/rllib/agents/dqn/distributional_q_model.py b/rllib/agents/dqn/distributional_q_tf_model.py similarity index 92% rename from rllib/agents/dqn/distributional_q_model.py rename to rllib/agents/dqn/distributional_q_tf_model.py index 5b4c329d3..c2220e0da 100644 --- a/rllib/agents/dqn/distributional_q_model.py +++ b/rllib/agents/dqn/distributional_q_tf_model.py @@ -6,7 +6,7 @@ from ray.rllib.utils import try_import_tf tf = try_import_tf() -class DistributionalQModel(TFModelV2): +class DistributionalQTFModel(TFModelV2): """Extension of standard TFModel to provide distributional Q values. It also supports options for noisy nets and parameter space noise. @@ -41,10 +41,15 @@ class DistributionalQModel(TFModelV2): """Initialize variables of this model. Extra model kwargs: - q_hiddens (list): defines size of hidden layers for the q head. - These will be used to postprocess the model output for the - purposes of computing Q values. - dueling (bool): whether to build the state value head for DDQN + q_hiddens (List[int]): List of layer-sizes after(!) the + Advantages(A)/Value(V)-split. Hence, each of the A- and V- + branches will have this structure of Dense layers. To define + the NN before this A/V-split, use - as always - + config["model"]["fcnet_hiddens"]. + dueling (bool): Whether to build the advantage(A)/value(V) heads + for DDQN. If True, Q-values are calculated as: + Q = (A - mean[A]) + V. If False, raw NN output is interpreted + as Q-values. num_atoms (int): if >1, enables distributional DQN use_noisy (bool): use noisy nets v_min (float): min value support for distributional DQN @@ -57,7 +62,7 @@ class DistributionalQModel(TFModelV2): should be defined in subclasses of DistributionalQModel. """ - super(DistributionalQModel, self).__init__( + super(DistributionalQTFModel, self).__init__( obs_space, action_space, num_outputs, model_config, name) # setup the Q head output (i.e., model for get_q_values) @@ -87,6 +92,7 @@ class DistributionalQModel(TFModelV2): # Avoid postprocessing the outputs. This enables custom models # to be used for parametric action DQN. action_out = model_out + if use_noisy: action_scores = self._noisy_layer( "output", @@ -131,14 +137,12 @@ class DistributionalQModel(TFModelV2): state_out = self._noisy_layer("dueling_hidden_%d" % i, state_out, q_hiddens[i], sigma0) - elif add_layer_norm: - state_out = tf.keras.layers.Dense( - units=q_hiddens[i], activation=tf.nn.relu)(state_out) - state_out = \ - tf.keras.layers.LayerNormalization()(state_out) else: state_out = tf.keras.layers.Dense( units=q_hiddens[i], activation=tf.nn.relu)(state_out) + if add_layer_norm: + state_out = tf.keras.layers.LayerNormalization()( + state_out) if use_noisy: state_score = self._noisy_layer( "dueling_output", diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index aa9dec8dc..7777b6d9c 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -2,8 +2,8 @@ import logging from ray.rllib.agents.trainer import with_common_config from ray.rllib.agents.trainer_template import build_trainer -from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy -from ray.rllib.agents.dqn.simple_q_policy import SimpleQPolicy +from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy +from ray.rllib.agents.dqn.simple_q_tf_policy import SimpleQTFPolicy from ray.rllib.optimizers import SyncReplayOptimizer from ray.rllib.optimizers.replay_buffer import ReplayBuffer from ray.rllib.utils.deprecation import deprecation_warning, DEPRECATED_VALUE @@ -30,11 +30,11 @@ DEFAULT_CONFIG = with_common_config({ "sigma0": 0.5, # Whether to use dueling dqn "dueling": True, + # Dense-layer setup for each the advantage branch and the value branch + # in a dueling architecture. + "hiddens": [256], # Whether to use double dqn "double_q": True, - # Postprocess model outputs with these hidden layers to compute the - # state and action values. See also the model config in catalog.py. - "hiddens": [256], # N-step Q learning "n_step": 1, @@ -90,13 +90,13 @@ DEFAULT_CONFIG = with_common_config({ # Adam epsilon hyper parameter "adam_epsilon": 1e-8, # If not None, clip gradients during optimization at this value - "grad_norm_clipping": 40, + "grad_clip": 40, # How many steps of the model to sample before learning starts. "learning_starts": 1000, # Update the replay buffer with this many samples at once. Note that # this setting applies per-worker if num_workers > 1. "rollout_fragment_length": 4, - # Size of a batched sampled from replay buffer for training. Note that + # Size of a batch sampled from replay buffer for training. Note that # if async_updates is set, then each worker returns gradients for a # batch of this size. "train_batch_size": 32, @@ -122,6 +122,7 @@ DEFAULT_CONFIG = with_common_config({ "softmax_temp": DEPRECATED_VALUE, "soft_q": DEPRECATED_VALUE, "parameter_noise": DEPRECATED_VALUE, + "grad_norm_clipping": DEPRECATED_VALUE, }) # __sphinx_doc_end__ # yapf: enable @@ -133,20 +134,27 @@ def make_policy_optimizer(workers, config): Returns: SyncReplayOptimizer: Used for generic off-policy Trainers. """ + # SimpleQ does not use a PR buffer. + kwargs = {"prioritized_replay": config.get("prioritized_replay", False)} + kwargs.update(**config["optimizer"]) + if "prioritized_replay" in config: + kwargs.update({ + "prioritized_replay_alpha": config["prioritized_replay_alpha"], + "prioritized_replay_beta": config["prioritized_replay_beta"], + "prioritized_replay_beta_annealing_timesteps": config[ + "prioritized_replay_beta_annealing_timesteps"], + "final_prioritized_replay_beta": config[ + "final_prioritized_replay_beta"], + "prioritized_replay_eps": config["prioritized_replay_eps"], + }) + return SyncReplayOptimizer( workers, # TODO(sven): Move all PR-beta decays into Schedule components. learning_starts=config["learning_starts"], buffer_size=config["buffer_size"], - prioritized_replay=config["prioritized_replay"], - prioritized_replay_alpha=config["prioritized_replay_alpha"], - prioritized_replay_beta=config["prioritized_replay_beta"], - prioritized_replay_beta_annealing_timesteps=config[ - "prioritized_replay_beta_annealing_timesteps"], - final_prioritized_replay_beta=config["final_prioritized_replay_beta"], - prioritized_replay_eps=config["prioritized_replay_eps"], train_batch_size=config["train_batch_size"], - **config["optimizer"]) + **kwargs) def validate_config_and_setup_param_noise(config): @@ -154,14 +162,14 @@ def validate_config_and_setup_param_noise(config): Rewrites rollout_fragment_length to take into account n_step truncation. """ - # PyTorch check. - if config["use_pytorch"]: - raise ValueError("DQN does not support PyTorch yet! Use tf instead.") - # TODO(sven): Remove at some point. # Backward compatibility of epsilon-exploration config AND beta-annealing # fraction settings (both based on schedule_max_timesteps, which is # deprecated). + if config.get("grad_norm_clipping", DEPRECATED_VALUE) != DEPRECATED_VALUE: + deprecation_warning("grad_norm_clipping", "grad_clip") + config["grad_clip"] = config.pop("grad_norm_clipping") + schedule_max_timesteps = None if config.get("schedule_max_timesteps", DEPRECATED_VALUE) != \ DEPRECATED_VALUE: @@ -309,9 +317,27 @@ def execution_plan(workers, config): return StandardMetricsReporting(train_op, workers, config) +def get_policy_class(config): + if config["use_pytorch"]: + from ray.rllib.agents.dqn.dqn_torch_policy import DQNTorchPolicy + return DQNTorchPolicy + else: + return DQNTFPolicy + + +def get_simple_policy_class(config): + if config["use_pytorch"]: + from ray.rllib.agents.dqn.simple_q_torch_policy import \ + SimpleQTorchPolicy + return SimpleQTorchPolicy + else: + return SimpleQTFPolicy + + GenericOffPolicyTrainer = build_trainer( name="GenericOffPolicyAlgorithm", default_policy=None, + get_policy_class=get_policy_class, default_config=DEFAULT_CONFIG, validate_config=validate_config_and_setup_param_noise, get_initial_state=get_initial_state, @@ -324,4 +350,5 @@ GenericOffPolicyTrainer = build_trainer( DQNTrainer = GenericOffPolicyTrainer.with_updates( name="DQN", default_policy=DQNTFPolicy, default_config=DEFAULT_CONFIG) -SimpleQTrainer = DQNTrainer.with_updates(default_policy=SimpleQPolicy) +SimpleQTrainer = DQNTrainer.with_updates( + default_policy=SimpleQTFPolicy, get_policy_class=get_simple_policy_class) diff --git a/rllib/agents/dqn/dqn_policy.py b/rllib/agents/dqn/dqn_tf_policy.py similarity index 95% rename from rllib/agents/dqn/dqn_policy.py rename to rllib/agents/dqn/dqn_tf_policy.py index 365ef17ad..554c785e9 100644 --- a/rllib/agents/dqn/dqn_policy.py +++ b/rllib/agents/dqn/dqn_tf_policy.py @@ -2,18 +2,19 @@ from gym.spaces import Discrete import numpy as np import ray -from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel -from ray.rllib.agents.dqn.simple_q_policy import TargetNetworkMixin +from ray.rllib.agents.dqn.distributional_q_tf_model import \ + DistributionalQTFModel +from ray.rllib.agents.dqn.simple_q_tf_policy import TargetNetworkMixin +from ray.rllib.models import ModelCatalog +from ray.rllib.models.tf.tf_action_dist import Categorical from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.tf_policy import LearningRateSchedule from ray.rllib.policy.tf_policy_template import build_tf_policy -from ray.rllib.models import ModelCatalog -from ray.rllib.models.tf.tf_action_dist import Categorical from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.utils.exploration import ParameterNoise +from ray.rllib.utils.framework import try_import_tf from ray.rllib.utils.tf_ops import huber_loss, reduce_mean_ignore_inf, \ minimize_and_clip -from ray.rllib.utils import try_import_tf from ray.rllib.utils.tf_ops import make_tf_callable tf = try_import_tf() @@ -138,16 +139,16 @@ def build_q_model(policy, obs_space, action_space, config): num_outputs = action_space.n policy.q_model = ModelCatalog.get_model_v2( - obs_space, - action_space, - num_outputs, - config["model"], + obs_space=obs_space, + action_space=action_space, + num_outputs=num_outputs, + model_config=config["model"], framework="tf", - model_interface=DistributionalQModel, + model_interface=DistributionalQTFModel, name=Q_SCOPE, num_atoms=config["num_atoms"], - q_hiddens=config["hiddens"], dueling=config["dueling"], + q_hiddens=config["hiddens"], use_noisy=config["noisy"], v_min=config["v_min"], v_max=config["v_max"], @@ -159,16 +160,16 @@ def build_q_model(policy, obs_space, action_space, config): or config["exploration_config"]["type"] == "ParameterNoise") policy.target_q_model = ModelCatalog.get_model_v2( - obs_space, - action_space, - num_outputs, - config["model"], + obs_space=obs_space, + action_space=action_space, + num_outputs=num_outputs, + model_config=config["model"], framework="tf", - model_interface=DistributionalQModel, + model_interface=DistributionalQTFModel, name=Q_TARGET_SCOPE, num_atoms=config["num_atoms"], - q_hiddens=config["hiddens"], dueling=config["dueling"], + q_hiddens=config["hiddens"], use_noisy=config["noisy"], v_min=config["v_min"], v_max=config["v_max"], @@ -257,12 +258,12 @@ def adam_optimizer(policy, config): def clip_gradients(policy, optimizer, loss): - if policy.config["grad_norm_clipping"] is not None: + if policy.config["grad_clip"] is not None: grads_and_vars = minimize_and_clip( optimizer, loss, var_list=policy.q_func_vars, - clip_val=policy.config["grad_norm_clipping"]) + clip_val=policy.config["grad_clip"]) else: grads_and_vars = optimizer.compute_gradients( loss, var_list=policy.q_func_vars) diff --git a/rllib/agents/dqn/dqn_torch_model.py b/rllib/agents/dqn/dqn_torch_model.py new file mode 100644 index 000000000..e9fb393d2 --- /dev/null +++ b/rllib/agents/dqn/dqn_torch_model.py @@ -0,0 +1,165 @@ +import numpy as np + +from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 +from ray.rllib.utils import try_import_torch + +torch, nn = try_import_torch() + + +class DQNTorchModel(TorchModelV2): + """Extension of standard TorchModelV2 to provide dueling-Q functionality. + """ + + def __init__( + self, + obs_space, + action_space, + num_outputs, + model_config, + name, + *, + dueling=False, + q_hiddens=(256, ), + dueling_activation="relu", + use_noisy=False, + sigma0=0.5, + # TODO(sven): Move `add_layer_norm` into ModelCatalog as + # generic option, then error if we use ParameterNoise as + # Exploration type and do not have any LayerNorm layers in + # the net. + add_layer_norm=False): + """Initialize variables of this model. + + Extra model kwargs: + dueling (bool): Whether to build the advantage(A)/value(V) heads + for DDQN. If True, Q-values are calculated as: + Q = (A - mean[A]) + V. If False, raw NN output is interpreted + as Q-values. + q_hiddens (List[int]): List of layer-sizes after(!) the + Advantages(A)/Value(V)-split. Hence, each of the A- and V- + branches will have this structure of Dense layers. To define + the NN before this A/V-split, use - as always - + config["model"]["fcnet_hiddens"]. + dueling_activation (str): The activation to use for all dueling + layers (A- and V-branch). One of "relu", "tanh", "linear". + use_noisy (bool): use noisy nets + sigma0 (float): initial value of noisy nets + add_layer_norm (bool): Enable layer norm (for param noise). + """ + + super(DQNTorchModel, self).__init__(obs_space, action_space, + num_outputs, model_config, name) + + self.dueling = dueling + ins = num_outputs + + # Dueling case: Build the shared (advantages and value) fc-network. + advantage_module = nn.Sequential() + value_module = None + if self.dueling: + value_module = nn.Sequential() + for i, n in enumerate(q_hiddens): + advantage_module.add_module("dueling_A_{}".format(i), + nn.Linear(ins, n)) + value_module.add_module("dueling_V_{}".format(i), + nn.Linear(ins, n)) + # Add activations if necessary. + if dueling_activation == "relu": + advantage_module.add_module("dueling_A_act_{}".format(i), + nn.ReLU()) + value_module.add_module("dueling_V_act_{}".format(i), + nn.ReLU()) + elif dueling_activation == "tanh": + advantage_module.add_module("dueling_A_act_{}".format(i), + nn.Tanh()) + value_module.add_module("dueling_V_act_{}".format(i), + nn.Tanh()) + + # Add LayerNorm after each Dense. + if add_layer_norm: + advantage_module.add_module("LayerNorm_A_{}".format(i), + nn.LayerNorm(n)) + value_module.add_module("LayerNorm_V_{}".format(i), + nn.LayerNorm(n)) + ins = n + # Actual Advantages layer (nodes=num-actions) and + # value layer (nodes=1). + advantage_module.add_module("A", nn.Linear(ins, action_space.n)) + value_module.add_module("V", nn.Linear(ins, 1)) + # Non-dueling: + # Q-value layer (use Advantage module's outputs as Q-values). + else: + advantage_module.add_module("Q", nn.Linear(ins, action_space.n)) + + self.advantage_module = advantage_module + self.value_module = value_module + + def get_advantages_or_q_values(self, model_out): + """Returns distributional values for Q(s, a) given a state embedding. + + Override this in your custom model to customize the Q output head. + + Arguments: + model_out (Tensor): embedding from the model layers + + Returns: + (action_scores, logits, dist) if num_atoms == 1, otherwise + (action_scores, z, support_logits_per_action, logits, dist) + """ + + return self.advantage_module(model_out) + + def get_state_value(self, model_out): + """Returns the state value prediction for the given state embedding.""" + + return self.value_module(model_out) + + def _noisy_layer(self, action_in, out_size, sigma0, non_linear=True): + """ + a common dense layer: y = w^{T}x + b + a noisy layer: y = (w + \\epsilon_w*\\sigma_w)^{T}x + + (b+\\epsilon_b*\\sigma_b) + where \epsilon are random variables sampled from factorized normal + distributions and \\sigma are trainable variables which are expected to + vanish along the training procedure + """ + in_size = int(action_in.shape[1]) + + epsilon_in = torch.normal(size=[in_size]) + epsilon_out = torch.normal(size=[out_size]) + epsilon_in = self._f_epsilon(epsilon_in) + epsilon_out = self._f_epsilon(epsilon_out) + epsilon_w = torch.matmul( + torch.unsqueeze(epsilon_in, -1), + other=torch.unsqueeze(epsilon_out, 0)) + epsilon_b = epsilon_out + + sigma_w = torch.Tensor( + data=np.random.uniform( + low=-1.0 / np.sqrt(float(in_size)), + high=1.0 / np.sqrt(float(in_size)), + size=[in_size, out_size]), + dtype=torch.float32, + requires_grad=True) + # TF noise generation can be unreliable on GPU + # If generating the noise on the CPU, + # lowering sigma0 to 0.1 may be helpful + sigma_b = torch.Tensor( + data=np.full( + shape=[out_size], fill_value=sigma0 / np.sqrt(float(in_size))), + requires_grad=True) + w = torch.Tensor( + data=np.full( + shape=[in_size, out_size], + fill_value=6 / np.sqrt(float(in_size) + float(out_size))), + requires_grad=True) + b = torch.Tensor(data=np.zeros([out_size]), requires_grad=True) + action_activation = torch.matmul(action_in, w + sigma_w * epsilon_w) \ + + b + sigma_b * epsilon_b + + if not non_linear: + return action_activation + return nn.functional.relu(action_activation) + + def _f_epsilon(self, x): + return torch.sign(x) * torch.pow(torch.abs(x), 0.5) diff --git a/rllib/agents/dqn/dqn_torch_policy.py b/rllib/agents/dqn/dqn_torch_policy.py new file mode 100644 index 000000000..3a60df5db --- /dev/null +++ b/rllib/agents/dqn/dqn_torch_policy.py @@ -0,0 +1,268 @@ +from gym.spaces import Discrete + +import ray +from ray.rllib.agents.dqn.dqn_tf_policy import postprocess_nstep_and_prio, \ + PRIO_WEIGHTS, Q_SCOPE, Q_TARGET_SCOPE +from ray.rllib.agents.a3c.a3c_torch_policy import apply_grad_clipping +from ray.rllib.agents.dqn.dqn_torch_model import DQNTorchModel +from ray.rllib.agents.dqn.simple_q_torch_policy import TargetNetworkMixin +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.models.catalog import ModelCatalog +from ray.rllib.models.torch.torch_action_dist import TorchCategorical +from ray.rllib.policy.torch_policy import LearningRateSchedule +from ray.rllib.policy.torch_policy_template import build_torch_policy +from ray.rllib.utils.error import UnsupportedSpaceException +from ray.rllib.utils.exploration.parameter_noise import ParameterNoise +from ray.rllib.utils.torch_ops import huber_loss, reduce_mean_ignore_inf +from ray.rllib.utils import try_import_torch + +torch, nn = try_import_torch() +F = None +if nn: + F = torch.nn.functional + + +class QLoss: + def __init__(self, + q_t_selected, + q_tp1_best, + importance_weights, + rewards, + done_mask, + gamma=0.99, + n_step=1, + num_atoms=1, + v_min=-10.0, + v_max=10.0): + + if num_atoms > 1: + raise ValueError("Torch version of DQN does not support " + "distributional Q yet!") + + q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = rewards + gamma**n_step * q_tp1_best_masked + + # compute the error (potentially clipped) + self.td_error = q_t_selected - q_t_selected_target.detach() + self.loss = torch.mean( + importance_weights.float() * huber_loss(self.td_error)) + self.stats = { + "mean_q": torch.mean(q_t_selected), + "min_q": torch.min(q_t_selected), + "max_q": torch.max(q_t_selected), + "td_error": self.td_error, + "mean_td_error": torch.mean(self.td_error), + } + + +class ComputeTDErrorMixin: + def __init__(self): + def compute_td_error(obs_t, act_t, rew_t, obs_tp1, done_mask, + importance_weights): + input_dict = self._lazy_tensor_dict({SampleBatch.CUR_OBS: obs_t}) + input_dict[SampleBatch.ACTIONS] = act_t + input_dict[SampleBatch.REWARDS] = rew_t + input_dict[SampleBatch.NEXT_OBS] = obs_tp1 + input_dict[SampleBatch.DONES] = done_mask + input_dict[PRIO_WEIGHTS] = importance_weights + + # Do forward pass on loss to update td error attribute + build_q_losses(self, self.model, None, input_dict) + + return self.q_loss.td_error + + self.compute_td_error = compute_td_error + + +def build_q_model_and_distribution(policy, obs_space, action_space, config): + + if not isinstance(action_space, Discrete): + raise UnsupportedSpaceException( + "Action space {} is not supported for DQN.".format(action_space)) + + if config["hiddens"]: + # try to infer the last layer size, otherwise fall back to 256 + num_outputs = ([256] + config["model"]["fcnet_hiddens"])[-1] + config["model"]["no_final_linear"] = True + else: + num_outputs = action_space.n + + # TODO(sven): Move option to add LayerNorm after each Dense + # generically into ModelCatalog. + add_layer_norm = ( + isinstance(getattr(policy, "exploration", None), ParameterNoise) + or config["exploration_config"]["type"] == "ParameterNoise") + + policy.q_model = ModelCatalog.get_model_v2( + obs_space=obs_space, + action_space=action_space, + num_outputs=num_outputs, + model_config=config["model"], + framework="torch", + model_interface=DQNTorchModel, + name=Q_SCOPE, + dueling=config["dueling"], + q_hiddens=config["hiddens"], + use_noisy=config["noisy"], + sigma0=config["sigma0"], + # TODO(sven): Move option to add LayerNorm after each Dense + # generically into ModelCatalog. + add_layer_norm=add_layer_norm) + + policy.q_func_vars = policy.q_model.variables() + + policy.target_q_model = ModelCatalog.get_model_v2( + obs_space=obs_space, + action_space=action_space, + num_outputs=num_outputs, + model_config=config["model"], + framework="torch", + model_interface=DQNTorchModel, + name=Q_TARGET_SCOPE, + dueling=config["dueling"], + q_hiddens=config["hiddens"], + use_noisy=config["noisy"], + sigma0=config["sigma0"], + # TODO(sven): Move option to add LayerNorm after each Dense + # generically into ModelCatalog. + add_layer_norm=add_layer_norm) + + policy.target_q_func_vars = policy.target_q_model.variables() + + return policy.q_model, TorchCategorical + + +def get_distribution_inputs_and_class(policy, + q_model, + obs_batch, + *, + explore=True, + is_training=False, + **kwargs): + q_vals = compute_q_values(policy, q_model, obs_batch, explore, is_training) + q_vals = q_vals[0] if isinstance(q_vals, tuple) else q_vals + + policy.q_values = q_vals + return policy.q_values, TorchCategorical, [] # state-out + + +def build_q_losses(policy, model, _, train_batch): + config = policy.config + # q network evaluation + q_t = compute_q_values( + policy, + policy.q_model, + train_batch[SampleBatch.CUR_OBS], + explore=False, + is_training=False) + + # target q network evalution + q_tp1 = compute_q_values( + policy, + policy.target_q_model, + train_batch[SampleBatch.NEXT_OBS], + explore=False, + is_training=False) + + # q scores for actions which we know were selected in the given state. + one_hot_selection = F.one_hot(train_batch[SampleBatch.ACTIONS], + policy.action_space.n) + q_t_selected = torch.sum(q_t * one_hot_selection, 1) + + # compute estimate of best possible value starting from state at t + 1 + if config["double_q"]: + q_tp1_using_online_net = compute_q_values( + policy, + policy.q_model, + train_batch[SampleBatch.NEXT_OBS], + explore=False, + is_training=False) + q_tp1_best_using_online_net = torch.argmax(q_tp1_using_online_net, 1) + q_tp1_best_one_hot_selection = F.one_hot(q_tp1_best_using_online_net, + policy.action_space.n) + q_tp1_best = torch.sum(q_tp1 * q_tp1_best_one_hot_selection, 1) + else: + q_tp1_best_one_hot_selection = F.one_hot( + torch.argmax(q_tp1, 1), policy.action_space.n) + q_tp1_best = torch.sum(q_tp1 * q_tp1_best_one_hot_selection, 1) + + policy.q_loss = QLoss(q_t_selected, q_tp1_best, train_batch[PRIO_WEIGHTS], + train_batch[SampleBatch.REWARDS], + train_batch[SampleBatch.DONES].float(), + config["gamma"], config["n_step"], + config["num_atoms"], config["v_min"], + config["v_max"]) + + return policy.q_loss.loss + + +def adam_optimizer(policy, config): + return torch.optim.Adam( + policy.q_func_vars, lr=policy.cur_lr, eps=config["adam_epsilon"]) + + +def build_q_stats(policy, batch): + return dict({ + "cur_lr": policy.cur_lr, + }, **policy.q_loss.stats) + + +def setup_early_mixins(policy, obs_space, action_space, config): + LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"]) + + +def after_init(policy, obs_space, action_space, config): + ComputeTDErrorMixin.__init__(policy) + TargetNetworkMixin.__init__(policy, obs_space, action_space, config) + # Move target net to device (this is done autoatically for the + # policy.model, but not for any other models the policy has). + policy.target_q_model = policy.target_q_model.to(policy.device) + + +def compute_q_values(policy, model, obs, explore, is_training=False): + if policy.config["num_atoms"] > 1: + raise ValueError("torch DQN does not support distributional DQN yet!") + + model_out, state = model({ + SampleBatch.CUR_OBS: obs, + "is_training": is_training, + }, [], None) + + advantages_or_q_values = model.get_advantages_or_q_values(model_out) + + if policy.config["dueling"]: + state_value = model.get_state_value(model_out) + advantages_mean = reduce_mean_ignore_inf(advantages_or_q_values, 1) + advantages_centered = advantages_or_q_values - torch.unsqueeze( + advantages_mean, 1) + q_values = state_value + advantages_centered + else: + q_values = advantages_or_q_values + + return q_values + + +def extra_action_out_fn(policy, input_dict, state_batches, model, action_dist): + return {"q_values": policy.q_values} + + +DQNTorchPolicy = build_torch_policy( + name="DQNTorchPolicy", + loss_fn=build_q_losses, + get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, + make_model_and_action_dist=build_q_model_and_distribution, + action_distribution_fn=get_distribution_inputs_and_class, + stats_fn=build_q_stats, + postprocess_fn=postprocess_nstep_and_prio, + optimizer_fn=adam_optimizer, + extra_grad_process_fn=apply_grad_clipping, + extra_action_out_fn=extra_action_out_fn, + before_init=setup_early_mixins, + after_init=after_init, + mixins=[ + TargetNetworkMixin, + ComputeTDErrorMixin, + LearningRateSchedule, + ]) diff --git a/rllib/agents/dqn/simple_q.py b/rllib/agents/dqn/simple_q.py new file mode 100644 index 000000000..fd7d3dd09 --- /dev/null +++ b/rllib/agents/dqn/simple_q.py @@ -0,0 +1,87 @@ +import logging + +from ray.rllib.agents.trainer import with_common_config +from ray.rllib.agents.dqn.simple_q_tf_policy import SimpleQTFPolicy +from ray.rllib.agents.dqn.dqn import DQNTrainer + +logger = logging.getLogger(__name__) + +# yapf: disable +# __sphinx_doc_begin__ +DEFAULT_CONFIG = with_common_config({ + # === Exploration Settings (Experimental) === + "exploration_config": { + # The Exploration class to use. + "type": "EpsilonGreedy", + # Config for the Exploration class' constructor: + "initial_epsilon": 1.0, + "final_epsilon": 0.02, + "epsilon_timesteps": 10000, # Timesteps over which to anneal epsilon. + + # For soft_q, use: + # "exploration_config" = { + # "type": "SoftQ" + # "temperature": [float, e.g. 1.0] + # } + }, + # Switch to greedy actions in evaluation workers. + "evaluation_config": { + "explore": False, + }, + + # Minimum env steps to optimize for per train call. This value does + # not affect learning, only the length of iterations. + "timesteps_per_iteration": 1000, + # Update the target network every `target_network_update_freq` steps. + "target_network_update_freq": 500, + # === Replay buffer === + # Size of the replay buffer. Note that if async_updates is set, then + # each worker will have a replay buffer of this size. + "buffer_size": 50000, + # Whether to LZ4 compress observations + "compress_observations": True, + + # === Optimization === + # Learning rate for adam optimizer + "lr": 5e-4, + # Learning rate schedule + "lr_schedule": None, + # Adam epsilon hyper parameter + "adam_epsilon": 1e-8, + # If not None, clip gradients during optimization at this value + "grad_clip": 40, + # How many steps of the model to sample before learning starts. + "learning_starts": 1000, + # Update the replay buffer with this many samples at once. Note that + # this setting applies per-worker if num_workers > 1. + "rollout_fragment_length": 4, + # Size of a batch sampled from replay buffer for training. Note that + # if async_updates is set, then each worker returns gradients for a + # batch of this size. + "train_batch_size": 32, + + # === Parallelism === + # Number of workers for collecting samples with. This only makes sense + # to increase if your environment is particularly slow to sample, or if + # you"re using the Async or Ape-X optimizers. + "num_workers": 0, + # Prevent iterations from going lower than this time span + "min_iter_time_s": 1, +}) +# __sphinx_doc_end__ +# yapf: enable + + +def get_policy_class(config): + if config["use_pytorch"]: + from ray.rllib.agents.dqn.simple_q_torch_policy import \ + SimpleQTorchPolicy + return SimpleQTorchPolicy + else: + return SimpleQTFPolicy + + +SimpleQTrainer = DQNTrainer.with_updates( + default_policy=SimpleQTFPolicy, + get_policy_class=get_policy_class, + default_config=DEFAULT_CONFIG) diff --git a/rllib/agents/dqn/simple_q_policy.py b/rllib/agents/dqn/simple_q_tf_policy.py similarity index 80% rename from rllib/agents/dqn/simple_q_policy.py rename to rllib/agents/dqn/simple_q_tf_policy.py index d0473ceb6..3319ab571 100644 --- a/rllib/agents/dqn/simple_q_policy.py +++ b/rllib/agents/dqn/simple_q_tf_policy.py @@ -4,9 +4,9 @@ from gym.spaces import Discrete import logging import ray -from ray.rllib.agents.dqn.simple_q_model import SimpleQModel from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models import ModelCatalog +from ray.rllib.models.torch.torch_action_dist import TorchCategorical from ray.rllib.models.tf.tf_action_dist import Categorical from ray.rllib.utils.annotations import override from ray.rllib.utils.error import UnsupportedSpaceException @@ -50,31 +50,24 @@ def build_q_models(policy, obs_space, action_space, config): raise UnsupportedSpaceException( "Action space {} is not supported for DQN.".format(action_space)) - if config["hiddens"]: - num_outputs = 256 - config["model"]["no_final_linear"] = True - else: - num_outputs = action_space.n - policy.q_model = ModelCatalog.get_model_v2( - obs_space, - action_space, - num_outputs, - config["model"], - framework="tf", - name=Q_SCOPE, - model_interface=SimpleQModel, - q_hiddens=config["hiddens"]) + obs_space=obs_space, + action_space=action_space, + num_outputs=action_space.n, + model_config=config["model"], + framework="torch" if config["use_pytorch"] else "tf", + name=Q_SCOPE) policy.target_q_model = ModelCatalog.get_model_v2( - obs_space, - action_space, - num_outputs, - config["model"], - framework="tf", - name=Q_TARGET_SCOPE, - model_interface=SimpleQModel, - q_hiddens=config["hiddens"]) + obs_space=obs_space, + action_space=action_space, + num_outputs=action_space.n, + model_config=config["model"], + framework="torch" if config["use_pytorch"] else "tf", + name=Q_TARGET_SCOPE) + + policy.q_func_vars = policy.q_model.variables() + policy.target_q_func_vars = policy.target_q_model.variables() return policy.q_model @@ -84,13 +77,15 @@ def get_distribution_inputs_and_class(policy, obs_batch, *, explore=True, + is_training=True, **kwargs): - q_vals = compute_q_values(policy, q_model, obs_batch, explore) + q_vals = compute_q_values(policy, q_model, obs_batch, explore, is_training) q_vals = q_vals[0] if isinstance(q_vals, tuple) else q_vals policy.q_values = q_vals - policy.q_func_vars = q_model.variables() - return policy.q_values, Categorical, [] # state-outs + return policy.q_values,\ + TorchCategorical if policy.config["use_pytorch"] else Categorical,\ + [] # state-outs def build_q_losses(policy, model, dist_class, train_batch): @@ -136,21 +131,22 @@ def build_q_losses(policy, model, dist_class, train_batch): return loss -def compute_q_values(policy, model, obs, explore): +def compute_q_values(policy, model, obs, explore, is_training=None): model_out, _ = model({ SampleBatch.CUR_OBS: obs, - "is_training": policy._get_is_training_placeholder(), + "is_training": is_training + if is_training is not None else policy._get_is_training_placeholder(), }, [], None) - return model.get_q_values(model_out) + return model_out def setup_late_mixins(policy, obs_space, action_space, config): TargetNetworkMixin.__init__(policy, obs_space, action_space, config) -SimpleQPolicy = build_tf_policy( - name="SimpleQPolicy", +SimpleQTFPolicy = build_tf_policy( + name="SimpleQTFPolicy", get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, make_model=build_q_models, action_distribution_fn=get_distribution_inputs_and_class, diff --git a/rllib/agents/dqn/simple_q_torch_policy.py b/rllib/agents/dqn/simple_q_torch_policy.py new file mode 100644 index 000000000..a240f721d --- /dev/null +++ b/rllib/agents/dqn/simple_q_torch_policy.py @@ -0,0 +1,100 @@ +"""Basic example of a DQN policy without any optimizations.""" + +import logging + +import ray +from ray.rllib.agents.dqn.simple_q_tf_policy import build_q_models, \ + get_distribution_inputs_and_class, compute_q_values +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.models.torch.torch_action_dist import TorchCategorical +from ray.rllib.policy.torch_policy_template import build_torch_policy +from ray.rllib.utils import try_import_torch +from ray.rllib.utils.torch_ops import huber_loss + +torch, nn = try_import_torch() +F = None +if nn: + F = nn.functional +logger = logging.getLogger(__name__) + + +class TargetNetworkMixin: + def __init__(self, obs_space, action_space, config): + def do_update(): + # Update_target_fn will be called periodically to copy Q network to + # target Q network. + assert len(self.q_func_vars) == len(self.target_q_func_vars), \ + (self.q_func_vars, self.target_q_func_vars) + self.target_q_model.load_state_dict(self.q_model.state_dict()) + + self.update_target = do_update + + +def build_q_model_and_distribution(policy, obs_space, action_space, config): + return build_q_models(policy, obs_space, action_space, config), \ + TorchCategorical + + +def build_q_losses(policy, model, dist_class, train_batch): + # q network evaluation + q_t = compute_q_values( + policy, + policy.q_model, + train_batch[SampleBatch.CUR_OBS], + explore=False, + is_training=True) + + # target q network evalution + q_tp1 = compute_q_values( + policy, + policy.target_q_model, + train_batch[SampleBatch.NEXT_OBS], + explore=False, + is_training=True) + + # q scores for actions which we know were selected in the given state. + one_hot_selection = F.one_hot(train_batch[SampleBatch.ACTIONS], + policy.action_space.n) + q_t_selected = torch.sum(q_t * one_hot_selection, 1) + + # compute estimate of best possible value starting from state at t + 1 + dones = train_batch[SampleBatch.DONES].float() + q_tp1_best_one_hot_selection = F.one_hot( + torch.argmax(q_tp1, 1), policy.action_space.n) + q_tp1_best = torch.sum(q_tp1 * q_tp1_best_one_hot_selection, 1) + q_tp1_best_masked = (1.0 - dones) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = (train_batch[SampleBatch.REWARDS] + + policy.config["gamma"] * q_tp1_best_masked) + + # Compute the error (Square/Huber). + td_error = q_t_selected - q_t_selected_target.detach() + loss = torch.mean(huber_loss(td_error)) + + # save TD error as an attribute for outside access + policy.td_error = td_error + + return loss + + +def extra_action_out_fn(policy, input_dict, state_batches, model, action_dist): + """Adds q-values to action out dict.""" + return {"q_values": policy.q_values} + + +def setup_late_mixins(policy, obs_space, action_space, config): + TargetNetworkMixin.__init__(policy, obs_space, action_space, config) + + +SimpleQTorchPolicy = build_torch_policy( + name="SimpleQPolicy", + loss_fn=build_q_losses, + get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, + extra_action_out_fn=extra_action_out_fn, + after_init=setup_late_mixins, + make_model_and_action_dist=build_q_model_and_distribution, + mixins=[TargetNetworkMixin], + action_distribution_fn=get_distribution_inputs_and_class, + stats_fn=lambda policy, config: {"td_error": policy.td_error}, +) diff --git a/rllib/agents/dqn/tests/test_apex.py b/rllib/agents/dqn/tests/test_apex.py index c840957b8..9ae6cdca9 100644 --- a/rllib/agents/dqn/tests/test_apex.py +++ b/rllib/agents/dqn/tests/test_apex.py @@ -4,6 +4,7 @@ import unittest import ray import ray.rllib.agents.dqn.apex as apex +from ray.rllib.utils.test_utils import framework_iterator class TestApex(unittest.TestCase): @@ -17,11 +18,14 @@ class TestApex(unittest.TestCase): config = apex.APEX_DEFAULT_CONFIG.copy() config["num_workers"] = 3 config["optimizer"]["num_replay_buffer_shards"] = 1 - trainer = apex.ApexTrainer(config, env="CartPole-v0") - infos = trainer.workers.foreach_policy( - lambda p, _: p.get_exploration_info()) - eps = [i["cur_epsilon"] for i in infos] - assert np.allclose(eps, [1.0, 0.016190862, 0.00065536, 2.6527108e-05]) + + for _ in framework_iterator(config): + trainer = apex.ApexTrainer(config, env="CartPole-v0") + infos = trainer.workers.foreach_policy( + lambda p, _: p.get_exploration_info()) + eps = [i["cur_epsilon"] for i in infos] + assert np.allclose(eps, + [1.0, 0.016190862, 0.00065536, 2.6527108e-05]) if __name__ == "__main__": diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py index e36a19da3..a1a7ecfd4 100644 --- a/rllib/agents/dqn/tests/test_dqn.py +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -10,13 +10,23 @@ tf = try_import_tf() class TestDQN(unittest.TestCase): def test_dqn_compilation(self): - """Test whether a DQNTrainer can be built with both frameworks.""" + """Test whether a DQNTrainer can be built on all frameworks.""" config = dqn.DEFAULT_CONFIG.copy() config["num_workers"] = 0 # Run locally. num_iterations = 2 - for _ in framework_iterator(config, frameworks=["tf", "eager"]): + for fw in framework_iterator(config): + # double-dueling DQN. + plain_config = config.copy() + trainer = dqn.DQNTrainer(config=plain_config, env="CartPole-v0") + for i in range(num_iterations): + results = trainer.train() + print(results) + # Rainbow. + # TODO(sven): Add torch once DQN-torch supports distributional-Q. + if fw == "torch": + continue rainbow_config = config.copy() rainbow_config["num_atoms"] = 10 rainbow_config["noisy"] = True @@ -28,13 +38,6 @@ class TestDQN(unittest.TestCase): results = trainer.train() print(results) - # double-dueling DQN. - plain_config = config.copy() - trainer = dqn.DQNTrainer(config=plain_config, env="CartPole-v0") - for i in range(num_iterations): - results = trainer.train() - print(results) - def test_dqn_exploration_and_soft_q_config(self): """Tests, whether a DQN Agent outputs exploration/softmaxed actions.""" config = dqn.DEFAULT_CONFIG.copy() @@ -43,7 +46,7 @@ class TestDQN(unittest.TestCase): obs = np.array(0) # Test against all frameworks. - for _ in framework_iterator(config, ["tf", "eager"]): + for _ in framework_iterator(config): # Default EpsilonGreedy setup. trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0") # Setting explore=False should always return the same action. @@ -61,14 +64,14 @@ class TestDQN(unittest.TestCase): # (but no epsilon exploration). config["exploration_config"] = { "type": "SoftQ", - "temperature": 0.001 + "temperature": 0.000001 } trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0") # Due to the low temp, always expect the same action. - a_ = trainer.compute_action(obs) + actions = [trainer.compute_action(obs)] for _ in range(50): - a = trainer.compute_action(obs) - check(a, a_) + actions.append(trainer.compute_action(obs)) + check(np.std(actions), 0.0, decimals=3) # Higher softmax temperature. config["exploration_config"]["temperature"] = 1.0 @@ -104,8 +107,8 @@ class TestDQN(unittest.TestCase): core_config["num_workers"] = 0 # Run locally. core_config["env_config"] = {"is_slippery": False, "map_name": "4x4"} - for fw in framework_iterator(core_config, ["tf", "eager"]): - + # Test against all frameworks. + for fw in framework_iterator(core_config): config = core_config.copy() # DQN with ParameterNoise exploration (config["explore"]=True). @@ -115,13 +118,14 @@ class TestDQN(unittest.TestCase): trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0") policy = trainer.get_policy() + p_sess = getattr(policy, "_sess", None) self.assertFalse(policy.exploration.weights_are_currently_noisy) noise_before = self._get_current_noise(policy, fw) check(noise_before, 0.0) initial_weights = self._get_current_weight(policy, fw) # Pseudo-start an episode and compare the weights before and after. - policy.exploration.on_episode_start(policy, tf_sess=policy._sess) + policy.exploration.on_episode_start(policy, tf_sess=p_sess) self.assertFalse(policy.exploration.weights_are_currently_noisy) noise_after_ep_start = self._get_current_noise(policy, fw) weights_after_ep_start = self._get_current_weight(policy, fw) @@ -162,7 +166,7 @@ class TestDQN(unittest.TestCase): # Pseudo-end the episode and compare weights again. # Make sure they are the original ones. - policy.exploration.on_episode_end(policy, tf_sess=policy._sess) + policy.exploration.on_episode_end(policy, tf_sess=p_sess) weights_after_ep_end = self._get_current_weight(policy, fw) check(current_weight - noise, weights_after_ep_end, decimals=5) @@ -173,6 +177,7 @@ class TestDQN(unittest.TestCase): config["explore"] = False trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0") policy = trainer.get_policy() + p_sess = getattr(policy, "_sess", None) self.assertFalse(policy.exploration.weights_are_currently_noisy) initial_weights = self._get_current_weight(policy, fw) @@ -182,7 +187,7 @@ class TestDQN(unittest.TestCase): # Pseudo-start an episode and compare the weights before and after # (they should be the same). - policy.exploration.on_episode_start(policy, tf_sess=policy._sess) + policy.exploration.on_episode_start(policy, tf_sess=p_sess) self.assertFalse(policy.exploration.weights_are_currently_noisy) # Should be the same, as we don't do anything at the beginning of @@ -207,7 +212,7 @@ class TestDQN(unittest.TestCase): # Pseudo-end the episode and compare weights again. # Make sure they are the original ones (no noise permanently # applied throughout the episode). - policy.exploration.on_episode_end(policy, tf_sess=policy._sess) + policy.exploration.on_episode_end(policy, tf_sess=p_sess) weights_after_episode_end = self._get_current_weight(policy, fw) check(initial_weights, weights_after_episode_end) # Noise should still be the same (re-sampling only happens at @@ -232,7 +237,8 @@ class TestDQN(unittest.TestCase): # the same action for the same input (parameter noise is # deterministic). policy = trainer.get_policy() - policy.exploration.on_episode_start(policy, tf_sess=policy._sess) + p_sess = getattr(policy, "_sess", None) + policy.exploration.on_episode_start(policy, tf_sess=p_sess) a_ = trainer.compute_action(obs) for _ in range(10): a = trainer.compute_action(obs, explore=True) diff --git a/rllib/agents/dqn/tests/test_simple_q.py b/rllib/agents/dqn/tests/test_simple_q.py new file mode 100644 index 000000000..d3c9d36a1 --- /dev/null +++ b/rllib/agents/dqn/tests/test_simple_q.py @@ -0,0 +1,102 @@ +import numpy as np +import unittest + +import ray.rllib.agents.dqn as dqn +from ray.rllib.agents.dqn.simple_q_tf_policy import build_q_losses as loss_tf +from ray.rllib.agents.dqn.simple_q_torch_policy import build_q_losses as \ + loss_torch +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.utils.framework import try_import_tf +from ray.rllib.utils.numpy import fc, one_hot, huber_loss +from ray.rllib.utils.test_utils import check, framework_iterator + +tf = try_import_tf() + + +class TestSimpleQ(unittest.TestCase): + def test_simple_q_compilation(self): + """Test whether a SimpleQTrainer can be built on all frameworks.""" + config = dqn.SIMPLE_Q_DEFAULT_CONFIG.copy() + config["num_workers"] = 0 # Run locally. + + for _ in framework_iterator(config): + trainer = dqn.SimpleQTrainer(config=config, env="CartPole-v0") + num_iterations = 2 + for i in range(num_iterations): + results = trainer.train() + print(results) + + def test_simple_q_loss_function(self): + """Tests the Simple-Q loss function results on all frameworks.""" + config = dqn.SIMPLE_Q_DEFAULT_CONFIG.copy() + # Run locally. + config["num_workers"] = 0 + # Use very simple net (layer0=10 nodes, q-layer=2 nodes (2 actions)). + config["model"]["fcnet_hiddens"] = [10] + config["model"]["fcnet_activation"] = "linear" + + for fw in framework_iterator(config): + # Generate Trainer and get its default Policy object. + trainer = dqn.SimpleQTrainer(config=config, env="CartPole-v0") + policy = trainer.get_policy() + # Batch of size=2. + input_ = { + SampleBatch.CUR_OBS: np.random.random(size=(2, 4)), + SampleBatch.ACTIONS: np.array([0, 1]), + SampleBatch.REWARDS: np.array([0.4, -1.23]), + SampleBatch.DONES: np.array([False, False]), + SampleBatch.NEXT_OBS: np.random.random(size=(2, 4)) + } + # Get model vars for computing expected model outs (q-vals). + # 0=layer-kernel; 1=layer-bias; 2=q-val-kernel; 3=q-val-bias + vars = policy.get_weights() + if isinstance(vars, dict): + vars = list(vars.values()) + vars_t = policy.target_q_func_vars + if fw == "tf": + vars_t = policy.get_session().run(vars_t) + + # Q(s,a) outputs. + q_t = np.sum( + one_hot(input_[SampleBatch.ACTIONS], 2) * fc( + fc(input_[SampleBatch.CUR_OBS], + vars[0], + vars[1], + framework=fw), + vars[2], + vars[3], + framework=fw), 1) + # max[a'](Qtarget(s',a')) outputs. + q_target_tp1 = np.max( + fc(fc( + input_[SampleBatch.NEXT_OBS], + vars_t[0], + vars_t[1], + framework=fw), + vars_t[2], + vars_t[3], + framework=fw), 1) + # TD-errors (Bellman equation). + td_error = q_t - config["gamma"] * input_[SampleBatch.REWARDS] + \ + q_target_tp1 + # Huber/Square loss on TD-error. + expected_loss = huber_loss(td_error).mean() + + if fw == "torch": + input_ = policy._lazy_tensor_dict(input_) + # Get actual out and compare. + if fw == "tf": + out = policy.get_session().run( + policy._loss, + feed_dict=policy._get_loss_inputs_dict( + input_, shuffle=False)) + else: + out = (loss_torch if fw == "torch" else + loss_tf)(policy, policy.model, None, input_) + check(out, expected_loss, decimals=1) + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/agents/ppo/tests/test_ddppo.py b/rllib/agents/ppo/tests/test_ddppo.py new file mode 100644 index 000000000..5e8c04cd5 --- /dev/null +++ b/rllib/agents/ppo/tests/test_ddppo.py @@ -0,0 +1,35 @@ +import unittest + +import ray +import ray.rllib.agents.ppo as ppo +from ray.rllib.utils.framework import try_import_tf +from ray.rllib.utils.test_utils import framework_iterator + +tf = try_import_tf() + + +class TestDDPPO(unittest.TestCase): + @classmethod + def setUpClass(cls): + ray.init() + + @classmethod + def tearDownClass(cls): + ray.shutdown() + + def test_ddppo_compilation(self): + """Test whether a DDPPOTrainer can be built with both frameworks.""" + config = ppo.ddppo.DEFAULT_CONFIG.copy() + config["num_gpus_per_worker"] = 0 + num_iterations = 2 + + for _ in framework_iterator(config, "torch"): + trainer = ppo.ddppo.DDPPOTrainer(config=config, env="CartPole-v0") + for i in range(num_iterations): + trainer.train() + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/agents/ppo/tests/test_ppo.py b/rllib/agents/ppo/tests/test_ppo.py index 976488551..cdc0df228 100644 --- a/rllib/agents/ppo/tests/test_ppo.py +++ b/rllib/agents/ppo/tests/test_ppo.py @@ -133,8 +133,7 @@ class TestPPO(unittest.TestCase): [-0.5, -0.1, -0.2], dtype=np.float32), } - for fw, sess in framework_iterator( - config, frameworks=["eager", "tf", "torch"], session=True): + for fw, sess in framework_iterator(config, session=True): trainer = ppo.PPOTrainer(config=config, env="CartPole-v0") policy = trainer.get_policy() @@ -164,10 +163,15 @@ class TestPPO(unittest.TestCase): list(policy.model.parameters()) if fw == "tf": vars = policy.get_session().run(vars) - expected_shared_out = fc(train_batch[SampleBatch.CUR_OBS], vars[0], - vars[1]) - expected_logits = fc(expected_shared_out, vars[2], vars[3]) - expected_value_outs = fc(expected_shared_out, vars[4], vars[5]) + expected_shared_out = fc( + train_batch[SampleBatch.CUR_OBS], + vars[0], + vars[1], + framework=fw) + expected_logits = fc( + expected_shared_out, vars[2], vars[3], framework=fw) + expected_value_outs = fc( + expected_shared_out, vars[4], vars[5], framework=fw) kl, entropy, pg_loss, vf_loss, overall_loss = \ self._ppo_loss_helper( diff --git a/rllib/agents/qmix/qmix.py b/rllib/agents/qmix/qmix.py index de403b80e..92f9090b8 100644 --- a/rllib/agents/qmix/qmix.py +++ b/rllib/agents/qmix/qmix.py @@ -99,4 +99,5 @@ QMixTrainer = GenericOffPolicyTrainer.with_updates( name="QMIX", default_config=DEFAULT_CONFIG, default_policy=QMixTorchPolicy, + get_policy_class=None, make_policy_optimizer=make_sync_batch_optimizer) diff --git a/rllib/agents/sac/sac.py b/rllib/agents/sac/sac.py index 1ac3db115..399fcd454 100644 --- a/rllib/agents/sac/sac.py +++ b/rllib/agents/sac/sac.py @@ -103,4 +103,4 @@ SACTrainer = GenericOffPolicyTrainer.with_updates( name="SAC", default_config=DEFAULT_CONFIG, default_policy=SACTFPolicy, -) + get_policy_class=lambda c: SACTFPolicy) diff --git a/rllib/agents/sac/sac_policy.py b/rllib/agents/sac/sac_policy.py index 4914a7b89..d5acecd19 100644 --- a/rllib/agents/sac/sac_policy.py +++ b/rllib/agents/sac/sac_policy.py @@ -5,7 +5,7 @@ import ray import ray.experimental.tf_utils from gym.spaces import Box, Discrete from ray.rllib.agents.ddpg.noop_model import NoopModel -from ray.rllib.agents.dqn.dqn_policy import postprocess_nstep_and_prio, \ +from ray.rllib.agents.dqn.dqn_tf_policy import postprocess_nstep_and_prio, \ PRIO_WEIGHTS from ray.rllib.agents.sac.sac_model import SACModel from ray.rllib.models import ModelCatalog diff --git a/rllib/contrib/maddpg/maddpg.py b/rllib/contrib/maddpg/maddpg.py index dfaa1cb25..ad8604588 100644 --- a/rllib/contrib/maddpg/maddpg.py +++ b/rllib/contrib/maddpg/maddpg.py @@ -172,6 +172,7 @@ MADDPGTrainer = GenericOffPolicyTrainer.with_updates( name="MADDPG", default_config=DEFAULT_CONFIG, default_policy=MADDPGTFPolicy, + get_policy_class=None, before_init=None, before_train_step=set_global_timestep, make_policy_optimizer=make_optimizer, diff --git a/rllib/contrib/maddpg/maddpg_policy.py b/rllib/contrib/maddpg/maddpg_policy.py index 4865db839..5dba875ee 100644 --- a/rllib/contrib/maddpg/maddpg_policy.py +++ b/rllib/contrib/maddpg/maddpg_policy.py @@ -1,5 +1,5 @@ import ray -from ray.rllib.agents.dqn.dqn_policy import minimize_and_clip, _adjust_nstep +from ray.rllib.agents.dqn.dqn_tf_policy import minimize_and_clip, _adjust_nstep from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models import ModelCatalog diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index 529586676..de92cae26 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -255,7 +255,10 @@ class RolloutWorker(EvaluatorInterface, ParallelIteratorWorker): policy_config = policy_config or {} if (tf and policy_config.get("eager") and not policy_config.get("no_eager_on_workers")): - tf.enable_eager_execution() + # This check is necessary for certain all-framework tests that + # use tf's eager_mode() context generator. + if not tf.executing_eagerly(): + tf.enable_eager_execution() if log_level: logging.getLogger("ray.rllib").setLevel(log_level) diff --git a/rllib/examples/custom_keras_model.py b/rllib/examples/custom_keras_model.py index 84f3d980f..f2f5dc614 100644 --- a/rllib/examples/custom_keras_model.py +++ b/rllib/examples/custom_keras_model.py @@ -7,7 +7,8 @@ from ray import tune from ray.rllib.models import ModelCatalog from ray.rllib.models.tf.misc import normc_initializer from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel +from ray.rllib.agents.dqn.distributional_q_tf_model import \ + DistributionalQTFModel from ray.rllib.utils import try_import_tf from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as MyVisionNetwork @@ -58,7 +59,7 @@ class MyKerasModel(TFModelV2): return {"foo": tf.constant(42.0)} -class MyKerasQModel(DistributionalQModel): +class MyKerasQModel(DistributionalQTFModel): """Custom model for DQN.""" def __init__(self, obs_space, action_space, num_outputs, model_config, diff --git a/rllib/examples/multi_agent_two_trainers.py b/rllib/examples/multi_agent_two_trainers.py index 7c8ca524c..994bbd3f1 100644 --- a/rllib/examples/multi_agent_two_trainers.py +++ b/rllib/examples/multi_agent_two_trainers.py @@ -13,7 +13,7 @@ import gym import ray from ray.rllib.agents.dqn.dqn import DQNTrainer -from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy +from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy from ray.rllib.agents.ppo.ppo import PPOTrainer from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy from ray.rllib.tests.test_multi_agent_env import MultiCartpole diff --git a/rllib/examples/parametric_action_cartpole.py b/rllib/examples/parametric_action_cartpole.py index dd1eb5c6f..d55b10160 100644 --- a/rllib/examples/parametric_action_cartpole.py +++ b/rllib/examples/parametric_action_cartpole.py @@ -22,7 +22,8 @@ from gym.spaces import Box, Discrete, Dict import ray from ray import tune -from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel +from ray.rllib.agents.dqn.distributional_q_tf_model import \ + DistributionalQTFModel from ray.rllib.models import ModelCatalog from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork from ray.rllib.models.tf.tf_modelv2 import TFModelV2 @@ -109,7 +110,7 @@ class ParametricActionCartpole(gym.Env): return obs, rew, done, info -class ParametricActionsModel(DistributionalQModel, TFModelV2): +class ParametricActionsModel(DistributionalQTFModel, TFModelV2): """Parametric action model that handles the dot product and masking. This assumes the outputs are logits for a single Categorical action dist. diff --git a/rllib/models/catalog.py b/rllib/models/catalog.py index 71daab5dc..65fa238d6 100644 --- a/rllib/models/catalog.py +++ b/rllib/models/catalog.py @@ -1,26 +1,26 @@ +from functools import partial import gym import logging import numpy as np -from functools import partial from ray.tune.registry import RLLIB_MODEL, RLLIB_PREPROCESSOR, \ RLLIB_ACTION_DIST, _global_registry - from ray.rllib.models.extra_spaces import Simplex from ray.rllib.models.action_dist import ActionDistribution -from ray.rllib.models.torch.torch_action_dist import TorchCategorical, \ - TorchMultiCategorical, TorchDiagGaussian -from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork as FCNetV2 -from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as VisionNetV2 -from ray.rllib.models.tf.tf_action_dist import Categorical, MultiCategorical, \ - Deterministic, DiagGaussian, MultiActionDistribution, Dirichlet +from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.models.preprocessors import get_preprocessor from ray.rllib.models.tf.fcnet_v1 import FullyConnectedNetwork +from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork as FCNetV2 from ray.rllib.models.tf.lstm_v1 import LSTM from ray.rllib.models.tf.modelv1_compat import make_v1_wrapper +from ray.rllib.models.tf.tf_action_dist import Categorical, MultiCategorical, \ + Deterministic, DiagGaussian, MultiActionDistribution, Dirichlet from ray.rllib.models.tf.tf_modelv2 import TFModelV2 from ray.rllib.models.tf.visionnet_v1 import VisionNetwork -from ray.rllib.models.modelv2 import ModelV2 +from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as VisionNetV2 +from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 +from ray.rllib.models.torch.torch_action_dist import TorchCategorical, \ + TorchMultiCategorical, TorchDiagGaussian from ray.rllib.utils import try_import_tf from ray.rllib.utils.annotations import DeveloperAPI, PublicAPI from ray.rllib.utils.error import UnsupportedSpaceException @@ -350,8 +350,12 @@ class ModelCatalog: if default_model: return default_model(obs_space, action_space, num_outputs, model_config, name) - return ModelCatalog._get_default_torch_model_v2( + v2_class = ModelCatalog._get_default_torch_model_class_v2( obs_space, action_space, num_outputs, model_config, name) + # wrap in the requested interface + wrapper = ModelCatalog._wrap_if_needed(v2_class, model_interface) + return wrapper(obs_space, action_space, num_outputs, model_config, + name, **model_kwargs) else: raise NotImplementedError( "Framework must be 'tf' or 'torch': {}".format(framework)) @@ -451,7 +455,7 @@ class ModelCatalog: @staticmethod def _wrap_if_needed(model_cls, model_interface): - assert issubclass(model_cls, TFModelV2), model_cls + assert issubclass(model_cls, (TFModelV2, TorchModelV2)), model_cls if not model_interface or issubclass(model_cls, model_interface): return model_cls @@ -466,30 +470,22 @@ class ModelCatalog: return wrapper @staticmethod - def _get_default_torch_model_v2(obs_space, action_space, num_outputs, - model_config, name): + def _get_default_torch_model_class_v2(obs_space, action_space, num_outputs, + model_config, name): from ray.rllib.models.torch.fcnet import (FullyConnectedNetwork as PyTorchFCNet) from ray.rllib.models.torch.visionnet import (VisionNetwork as PyTorchVisionNet) - model_config = model_config or MODEL_DEFAULTS - if model_config.get("use_lstm"): raise NotImplementedError( "LSTM auto-wrapping not implemented for torch") - if isinstance(obs_space, gym.spaces.Discrete): - obs_rank = 1 + if isinstance(obs_space, gym.spaces.Discrete) or \ + len(obs_space.shape) <= 2: + return PyTorchFCNet else: - obs_rank = len(obs_space.shape) - - if obs_rank > 2: - return PyTorchVisionNet(obs_space, action_space, num_outputs, - model_config, name) - - return PyTorchFCNet(obs_space, action_space, num_outputs, model_config, - name) + return PyTorchVisionNet @staticmethod def get_model(input_dict, diff --git a/rllib/models/modelv2.py b/rllib/models/modelv2.py index d3e147620..dee1308a5 100644 --- a/rllib/models/modelv2.py +++ b/rllib/models/modelv2.py @@ -84,7 +84,11 @@ class ModelV2: raise NotImplementedError def value_function(self): - """Return the value function estimate for the most recent forward pass. + """Returns the value function output for the most recent forward pass. + + Note that a `forward` call has to be performed first, before this + methods can return anything and thus that calling this method does not + cause an extra forward pass through the network. Returns: value estimate tensor of shape [BATCH]. @@ -222,6 +226,33 @@ class ModelV2: """Returns a contextmanager for the current forward pass.""" return NullContextManager() + def variables(self, as_dict=False): + """Returns the list (or a dict) of variables for this model. + + Args: + as_dict(bool): Whether variables should be returned as dict-values + (using descriptive keys). + + Returns: + Union[List[any],Dict[str,any]]: The list (or dict if `as_dict` is + True) of all variables of this ModelV2. + """ + raise NotImplementedError + + def trainable_variables(self, as_dict=False): + """Returns the list of trainable variables for this model. + + Args: + as_dict(bool): Whether variables should be returned as dict-values + (using descriptive keys). + + Returns: + Union[List[any],Dict[str,any]]: The list (or dict if `as_dict` is + True) of all trainable (tf)/requires_grad (torch) variables + of this ModelV2. + """ + raise NotImplementedError + class NullContextManager: """No-op context manager""" diff --git a/rllib/models/tf/tf_modelv2.py b/rllib/models/tf/tf_modelv2.py index 15877f30c..71b5b0109 100644 --- a/rllib/models/tf/tf_modelv2.py +++ b/rllib/models/tf/tf_modelv2.py @@ -1,6 +1,7 @@ from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.utils.annotations import PublicAPI from ray.rllib.utils import try_import_tf +from ray.rllib.utils.annotations import override tf = try_import_tf() @@ -65,7 +66,7 @@ class TFModelV2(ModelV2): Custom models should override this instead of __call__. - Arguments: + Args: input_dict (dict): dictionary of input tensors, including "obs", "obs_flat", "prev_action", "prev_reward", "is_training" state (list): list of state tensors with sizes matching those @@ -76,24 +77,11 @@ class TFModelV2(ModelV2): (outputs, state): The model output tensor of size [BATCH, num_outputs] - Sample implementation for the ``MyModelClass`` example:: - - def forward(self, input_dict, state, seq_lens): - model_out, self._value_out = self.base_model(input_dict["obs"]) - return model_out, state - """ - raise NotImplementedError - - def value_function(self): - """Return the value function estimate for the most recent forward pass. - - Returns: - value estimate tensor of shape [BATCH]. - - Sample implementation for the ``MyModelClass`` example:: - - def value_function(self): - return self._value_out + Examples: + >>> def forward(self, input_dict, state, seq_lens): + >>> model_out, self._value_out = self.base_model( + ... input_dict["obs"]) + >>> return model_out, state """ raise NotImplementedError @@ -107,10 +95,17 @@ class TFModelV2(ModelV2): """Register the given list of variables with this model.""" self.var_list.extend(variables) - def variables(self): - """Returns the list of variables for this model.""" + @override(ModelV2) + def variables(self, as_dict=False): + if as_dict: + return {v.name: v for v in self.var_list} return list(self.var_list) - def trainable_variables(self): - """Returns the list of trainable variables for this model.""" + @override(ModelV2) + def trainable_variables(self, as_dict=False): + if as_dict: + return { + k: v + for k, v in self.variables(as_dict=True).items() if v.trainable + } return [v for v in self.variables() if v.trainable] diff --git a/rllib/models/torch/torch_action_dist.py b/rllib/models/torch/torch_action_dist.py index aeb6c337b..254d91ff8 100644 --- a/rllib/models/torch/torch_action_dist.py +++ b/rllib/models/torch/torch_action_dist.py @@ -46,8 +46,10 @@ class TorchCategorical(TorchDistributionWrapper): @override(ActionDistribution) def __init__(self, inputs, model=None, temperature=1.0): - assert temperature > 0.0, "Categorical `temperature` must be > 0.0!" - inputs /= temperature + if temperature != 1.0: + assert temperature > 0.0, \ + "Categorical `temperature` must be > 0.0!" + inputs /= temperature super().__init__(inputs, model) self.dist = torch.distributions.categorical.Categorical( logits=self.inputs) diff --git a/rllib/models/torch/torch_modelv2.py b/rllib/models/torch/torch_modelv2.py index ed34fcb90..8f310c190 100644 --- a/rllib/models/torch/torch_modelv2.py +++ b/rllib/models/torch/torch_modelv2.py @@ -1,5 +1,5 @@ from ray.rllib.models.modelv2 import ModelV2 -from ray.rllib.utils.annotations import PublicAPI +from ray.rllib.utils.annotations import override, PublicAPI from ray.rllib.utils import try_import_torch _, nn = try_import_torch() @@ -55,7 +55,7 @@ class TorchModelV2(ModelV2): Custom models should override this instead of __call__. - Arguments: + Args: input_dict (dict): dictionary of input tensors, including "obs", "obs_flat", "prev_action", "prev_reward", "is_training" state (list): list of state tensors with sizes matching those @@ -66,24 +66,26 @@ class TorchModelV2(ModelV2): (outputs, state): The model output tensor of size [BATCH, num_outputs] - Sample implementation for the ``MyModelClass`` example:: - - def forward(self, input_dict, state, seq_lens): - features = self._hidden_layers(input_dict["obs"]) - self._value_out = self._value_branch(features) - return self._logits(features), state + Examples: + >>> def forward(self, input_dict, state, seq_lens): + >>> features = self._hidden_layers(input_dict["obs"]) + >>> self._value_out = self._value_branch(features) + >>> return self._logits(features), state """ raise NotImplementedError - def value_function(self): - """Return the value function estimate for the most recent forward pass. + @override(ModelV2) + def variables(self, as_dict=False): + if as_dict: + return self.state_dict() + return list(self.parameters()) - Returns: - value estimate tensor of shape [BATCH]. - - Sample implementation for the ``MyModelClass`` example:: - - def value_function(self): - return self._value_out - """ - raise NotImplementedError + @override(ModelV2) + def trainable_variables(self, as_dict=False): + if as_dict: + return { + k: v + for k, v in self.variables(as_dict=True).items() + if v.requires_grad + } + return [v for v in self.variables() if v.requires_grad] diff --git a/rllib/optimizers/sync_replay_optimizer.py b/rllib/optimizers/sync_replay_optimizer.py index fef50e778..46faa0769 100644 --- a/rllib/optimizers/sync_replay_optimizer.py +++ b/rllib/optimizers/sync_replay_optimizer.py @@ -169,7 +169,12 @@ class SyncReplayOptimizer(PolicyOptimizer): self.learner_stats[policy_id] = get_learner_stats(info) replay_buffer = self.replay_buffers[policy_id] if isinstance(replay_buffer, PrioritizedReplayBuffer): - td_error = info["td_error"] + # TODO(sven): This is currently structured differently for + # torch/tf. Clean up these results/info dicts across + # policies (note: fixing this in torch_policy.py will + # break e.g. DDPPO!). + td_error = info.get( + "td_error", info["learner_stats"].get("td_error")) new_priorities = ( np.abs(td_error) + self.prioritized_replay_eps) replay_buffer.update_priorities( diff --git a/rllib/policy/eager_tf_policy.py b/rllib/policy/eager_tf_policy.py index d4c557c4f..8ec176bec 100644 --- a/rllib/policy/eager_tf_policy.py +++ b/rllib/policy/eager_tf_policy.py @@ -357,7 +357,9 @@ def build_eager_tf_policy(name, action_distribution_fn( self, self.model, input_dict[SampleBatch.CUR_OBS], - explore=explore, timestep=timestep) + explore=explore, + timestep=timestep, + is_training=False) else: dist_class = self.dist_class dist_inputs, state_out = self.model( @@ -420,7 +422,11 @@ def build_eager_tf_policy(name, # Action dist class and inputs are generated via custom function. if action_distribution_fn: dist_inputs, dist_class, _ = action_distribution_fn( - self, self.model, input_dict[SampleBatch.CUR_OBS]) + self, + self.model, + input_dict[SampleBatch.CUR_OBS], + explore=False, + is_training=False) action_dist = dist_class(dist_inputs, self.model) log_likelihoods = action_dist.logp(actions) # Default log-likelihood calculation. @@ -560,6 +566,7 @@ def build_eager_tf_policy(name, } else: fetches[LEARNER_STATS_KEY] = {} + if extra_learn_fetches_fn: fetches.update( {k: v diff --git a/rllib/policy/tests/test_compute_log_likelihoods.py b/rllib/policy/tests/test_compute_log_likelihoods.py index b915bb49e..04951f978 100644 --- a/rllib/policy/tests/test_compute_log_likelihoods.py +++ b/rllib/policy/tests/test_compute_log_likelihoods.py @@ -76,9 +76,11 @@ def do_test_log_likelihood(run, layer_key[0])]) else: expected_mean_logstd = fc( - fc(obs_batch, - vars["_hidden_layers.0._model.0.weight"]), - vars["_logits._model.0.weight"]) + fc( + obs_batch, + np.transpose( + vars["_hidden_layers.0._model.0.weight"])), + np.transpose(vars["_logits._model.0.weight"])) mean, log_std = np.split(expected_mean_logstd, 2, axis=-1) if logp_func is None: expected_logp = np.log(norm.pdf(a, mean, np.exp(log_std))) diff --git a/rllib/policy/tf_policy_template.py b/rllib/policy/tf_policy_template.py index 02e2b18cd..c932ecde3 100644 --- a/rllib/policy/tf_policy_template.py +++ b/rllib/policy/tf_policy_template.py @@ -185,7 +185,7 @@ def build_tf_policy(name, @override(TFPolicy) def extra_compute_grad_fetches(self): if extra_learn_fetches_fn: - # auto-add empty learner stats dict if needed + # Auto-add empty learner stats dict if needed. return dict({ LEARNER_STATS_KEY: {} }, **extra_learn_fetches_fn(self)) diff --git a/rllib/policy/torch_policy.py b/rllib/policy/torch_policy.py index be464eae5..6b896028f 100644 --- a/rllib/policy/torch_policy.py +++ b/rllib/policy/torch_policy.py @@ -7,7 +7,8 @@ from ray.rllib.policy.rnn_sequencing import pad_batch_to_sequences_of_same_size from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.framework import try_import_torch from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule -from ray.rllib.utils.torch_ops import convert_to_non_torch_type +from ray.rllib.utils.torch_ops import convert_to_non_torch_type, \ + convert_to_torch_tensor from ray.rllib.utils.tracking_dict import UsageTrackingDict torch, _ = try_import_torch() @@ -111,15 +112,18 @@ class TorchPolicy(Policy): seq_lens = torch.ones(len(obs_batch), dtype=torch.int32) input_dict = self._lazy_tensor_dict({ SampleBatch.CUR_OBS: obs_batch, + "is_training": False, }) - if prev_action_batch: + if prev_action_batch is not None: input_dict[SampleBatch.PREV_ACTIONS] = prev_action_batch - if prev_reward_batch: + if prev_reward_batch is not None: input_dict[SampleBatch.PREV_REWARDS] = prev_reward_batch - state_batches = [self._convert_to_tensor(s) for s in state_batches] + state_batches = [ + self._convert_to_tensor(s) for s in (state_batches or []) + ] if self.action_sampler_fn: - dist_class = dist_inputs = None + action_dist = dist_inputs = None state_out = [] actions, logp = self.action_sampler_fn( self, @@ -129,12 +133,17 @@ class TorchPolicy(Policy): timestep=timestep) else: # Call the exploration before_compute_actions hook. - self.exploration.before_compute_actions(timestep=timestep) + self.exploration.before_compute_actions( + explore=explore, timestep=timestep) if self.action_distribution_fn: dist_inputs, dist_class, state_out = \ self.action_distribution_fn( - self, self.model, input_dict[SampleBatch.CUR_OBS], - explore=explore, timestep=timestep) + self, + self.model, + input_dict[SampleBatch.CUR_OBS], + explore=explore, + timestep=timestep, + is_training=False) else: dist_class = self.dist_class dist_inputs, state_out = self.model( @@ -194,7 +203,10 @@ class TorchPolicy(Policy): # Action dist class and inputs are generated via custom function. if self.action_distribution_fn: dist_inputs, dist_class, _ = self.action_distribution_fn( - self, self.model, input_dict[SampleBatch.CUR_OBS]) + self, + self.model, + input_dict[SampleBatch.CUR_OBS], + explore=False) # Default action-dist inputs calculation. else: dist_class = self.dist_class @@ -242,9 +254,11 @@ class TorchPolicy(Policy): info["allreduce_latency"] = time.time() - start self._optimizer.step() - info.update(self.extra_grad_info(train_batch)) - return {LEARNER_STATS_KEY: info} + info.update(self.extra_grad_info(train_batch)) + return { + LEARNER_STATS_KEY: info + } @override(Policy) def compute_gradients(self, postprocessed_batch): @@ -278,10 +292,14 @@ class TorchPolicy(Policy): @override(Policy) def get_weights(self): - return {k: v.cpu() for k, v in self.model.state_dict().items()} + return { + k: v.cpu().detach().numpy() + for k, v in self.model.state_dict().items() + } @override(Policy) def set_weights(self, weights): + weights = convert_to_torch_tensor(weights, device=self.device) self.model.load_state_dict(weights) @override(Policy) diff --git a/rllib/tests/run_regression_tests.py b/rllib/tests/run_regression_tests.py index 26eaeffc3..51a2c9d65 100644 --- a/rllib/tests/run_regression_tests.py +++ b/rllib/tests/run_regression_tests.py @@ -51,7 +51,7 @@ if __name__ == "__main__": passed = False for i in range(3): - trials = run_experiments(experiments, resume=False, verbose=1) + trials = run_experiments(experiments, resume=False, verbose=0) for t in trials: if (t.last_result["episode_reward_mean"] >= diff --git a/rllib/tests/test_evaluators.py b/rllib/tests/test_evaluators.py index 6979e403d..58edb6a6c 100644 --- a/rllib/tests/test_evaluators.py +++ b/rllib/tests/test_evaluators.py @@ -4,7 +4,7 @@ import unittest import ray from ray.rllib.agents.dqn import DQNTrainer from ray.rllib.agents.a3c import A3CTrainer -from ray.rllib.agents.dqn.dqn_policy import _adjust_nstep +from ray.rllib.agents.dqn.dqn_tf_policy import _adjust_nstep from ray.tune.registry import register_env diff --git a/rllib/tests/test_multi_agent_env.py b/rllib/tests/test_multi_agent_env.py index f0b205b65..434cd3919 100644 --- a/rllib/tests/test_multi_agent_env.py +++ b/rllib/tests/test_multi_agent_env.py @@ -5,7 +5,7 @@ import unittest import ray from ray.rllib.agents.pg import PGTrainer from ray.rllib.agents.pg.pg_tf_policy import PGTFPolicy -from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy +from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy from ray.rllib.optimizers import (SyncSamplesOptimizer, SyncReplayOptimizer, AsyncGradientsOptimizer) from ray.rllib.tests.test_rollout_worker import (MockEnv, MockEnv2, MockPolicy) diff --git a/rllib/tuned_examples/regression_tests/cartpole-dqn-param-noise.yaml b/rllib/tuned_examples/regression_tests/cartpole-dqn-tf-param-noise.yaml similarity index 54% rename from rllib/tuned_examples/regression_tests/cartpole-dqn-param-noise.yaml rename to rllib/tuned_examples/regression_tests/cartpole-dqn-tf-param-noise.yaml index 7b8233a33..50d3075f5 100644 --- a/rllib/tuned_examples/regression_tests/cartpole-dqn-param-noise.yaml +++ b/rllib/tuned_examples/regression_tests/cartpole-dqn-tf-param-noise.yaml @@ -1,17 +1,18 @@ -cartpole-dqn-w-param-noise: +cartpole-dqn-tf-w-param-noise: env: CartPole-v0 run: DQN stop: episode_reward_mean: 150 - timesteps_total: 100000 + timesteps_total: 300000 config: + use_pytorch: false exploration_config: type: ParameterNoise - random_timesteps: 0 + random_timesteps: 10000 initial_stddev: 1.0 batch_mode: complete_episodes - lr: 0.001 + lr: 0.0008 num_workers: 0 model: - fcnet_hiddens: [16] - fcnet_activation: linear + fcnet_hiddens: [32, 32] + fcnet_activation: tanh diff --git a/rllib/tuned_examples/regression_tests/cartpole-dqn.yaml b/rllib/tuned_examples/regression_tests/cartpole-dqn-tf.yaml similarity index 78% rename from rllib/tuned_examples/regression_tests/cartpole-dqn.yaml rename to rllib/tuned_examples/regression_tests/cartpole-dqn-tf.yaml index 5a6ba5033..410fafb87 100644 --- a/rllib/tuned_examples/regression_tests/cartpole-dqn.yaml +++ b/rllib/tuned_examples/regression_tests/cartpole-dqn-tf.yaml @@ -1,9 +1,10 @@ -cartpole-dqn: +cartpole-dqn-tf: env: CartPole-v0 run: DQN stop: episode_reward_mean: 150 timesteps_total: 50000 config: + use_pytorch: false n_step: 3 gamma: 0.95 diff --git a/rllib/tuned_examples/regression_tests/cartpole-dqn-torch-param-noise.yaml b/rllib/tuned_examples/regression_tests/cartpole-dqn-torch-param-noise.yaml new file mode 100644 index 000000000..1e5fcdf10 --- /dev/null +++ b/rllib/tuned_examples/regression_tests/cartpole-dqn-torch-param-noise.yaml @@ -0,0 +1,18 @@ +cartpole-dqn-torch-w-param-noise: + env: CartPole-v0 + run: DQN + stop: + episode_reward_mean: 150 + timesteps_total: 300000 + config: + use_pytorch: true + exploration_config: + type: ParameterNoise + random_timesteps: 10000 + initial_stddev: 1.0 + batch_mode: complete_episodes + lr: 0.0008 + num_workers: 0 + model: + fcnet_hiddens: [32, 32] + fcnet_activation: tanh diff --git a/rllib/tuned_examples/regression_tests/cartpole-dqn-torch.yaml b/rllib/tuned_examples/regression_tests/cartpole-dqn-torch.yaml new file mode 100644 index 000000000..ff0d48620 --- /dev/null +++ b/rllib/tuned_examples/regression_tests/cartpole-dqn-torch.yaml @@ -0,0 +1,10 @@ +cartpole-dqn-torch: + env: CartPole-v0 + run: DQN + stop: + episode_reward_mean: 150 + timesteps_total: 50000 + config: + use_pytorch: true + n_step: 3 + gamma: 0.95 diff --git a/rllib/tuned_examples/regression_tests/cartpole-simpleq-tf.yaml b/rllib/tuned_examples/regression_tests/cartpole-simpleq-tf.yaml new file mode 100644 index 000000000..16da7921f --- /dev/null +++ b/rllib/tuned_examples/regression_tests/cartpole-simpleq-tf.yaml @@ -0,0 +1,8 @@ +cartpole-dqn-tf: + env: CartPole-v0 + run: SimpleQ + stop: + episode_reward_mean: 150 + timesteps_total: 50000 + config: + use_pytorch: false diff --git a/rllib/tuned_examples/regression_tests/cartpole-simpleq-torch.yaml b/rllib/tuned_examples/regression_tests/cartpole-simpleq-torch.yaml new file mode 100644 index 000000000..dcb9128b0 --- /dev/null +++ b/rllib/tuned_examples/regression_tests/cartpole-simpleq-torch.yaml @@ -0,0 +1,8 @@ +cartpole-dqn-torch: + env: CartPole-v0 + run: SimpleQ + stop: + episode_reward_mean: 150 + timesteps_total: 50000 + config: + use_pytorch: true diff --git a/rllib/utils/__init__.py b/rllib/utils/__init__.py index 36f8a9739..3efdb877b 100644 --- a/rllib/utils/__init__.py +++ b/rllib/utils/__init__.py @@ -14,6 +14,8 @@ from ray.rllib.utils.policy_server import PolicyServer from ray.rllib.utils.schedules import LinearSchedule, PiecewiseSchedule, \ PolynomialSchedule, ExponentialSchedule, ConstantSchedule from ray.rllib.utils.test_utils import check, framework_iterator +from ray.rllib.utils.torch_ops import convert_to_non_torch_type, \ + convert_to_torch_tensor from ray.tune.utils import merge_dicts, deep_update @@ -60,6 +62,8 @@ __all__ = [ "add_mixins", "check", "check_framework", + "convert_to_non_torch_type", + "convert_to_torch_tensor", "deprecation_warning", "fc", "force_list", diff --git a/rllib/utils/exploration/parameter_noise.py b/rllib/utils/exploration/parameter_noise.py index ef0812f85..efa68a75b 100644 --- a/rllib/utils/exploration/parameter_noise.py +++ b/rllib/utils/exploration/parameter_noise.py @@ -4,6 +4,7 @@ import numpy as np from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.models.tf.tf_action_dist import Categorical +from ray.rllib.models.torch.torch_action_dist import TorchCategorical from ray.rllib.utils.annotations import override from ray.rllib.utils.exploration.exploration import Exploration from ray.rllib.utils.framework import try_import_tf, try_import_torch @@ -63,18 +64,21 @@ class ParameterNoise(Exploration): # This excludes any variable, whose name contains "LayerNorm" (those # are BatchNormalization layers, which should not be perturbed). self.model_variables = [ - v for v in self.model.variables() if "LayerNorm" not in v.name + v for k, v in self.model.variables(as_dict=True).items() + if "LayerNorm" not in k ] # Our noise to be added to the weights. Each item in `self.noise` # corresponds to one Model variable and holding the Gaussian noise to # be added to that variable (weight). self.noise = [] for var in self.model_variables: + name_ = var.name.split(":")[0] + "_noisy" if var.name else "" self.noise.append( get_variable( np.zeros(var.shape, dtype=np.float32), framework=self.framework, - tf_name=var.name.split(":")[0] + "_noisy")) + tf_name=name_, + torch_tensor=True)) # tf-specific ops to sample, assign and remove noise. if self.framework == "tf" and not tf.executing_eagerly(): @@ -205,7 +209,7 @@ class ParameterNoise(Exploration): explore=self.weights_are_currently_noisy) # Categorical case (e.g. DQN). - if policy.dist_class is Categorical: + if policy.dist_class in (Categorical, TorchCategorical): action_dist = softmax(fetches[SampleBatch.ACTION_DIST_INPUTS]) else: # TODO(sven): Other action-dist cases. raise NotImplementedError @@ -223,7 +227,7 @@ class ParameterNoise(Exploration): explore=not self.weights_are_currently_noisy) # Categorical case (e.g. DQN). - if policy.dist_class is Categorical: + if policy.dist_class in (Categorical, TorchCategorical): action_dist = softmax(fetches[SampleBatch.ACTION_DIST_INPUTS]) if noisy_action_dist is None: @@ -232,7 +236,7 @@ class ParameterNoise(Exploration): noise_free_action_dist = action_dist # Categorical case (e.g. DQN). - if policy.dist_class is Categorical: + if policy.dist_class in (Categorical, TorchCategorical): # Calculate KL-divergence (DKL(clean||noisy)) according to [2]. # TODO(sven): Allow KL-divergence to be calculated by our # Distribution classes (don't support off-graph/numpy yet). @@ -269,7 +273,7 @@ class ParameterNoise(Exploration): else: for i in range(len(self.noise)): self.noise[i] = torch.normal( - 0.0, self.stddev, size=self.noise[i].size) + 0.0, self.stddev, size=self.noise[i].size()) def _tf_sample_new_noise_op(self): added_noises = [] @@ -319,7 +323,7 @@ class ParameterNoise(Exploration): else: for i in range(len(self.noise)): # Add noise to weights in-place. - torch.add_(self.model_variables[i], self.noise[i]) + self.model_variables[i].add_(self.noise[i]) self.weights_are_currently_noisy = True @@ -358,7 +362,7 @@ class ParameterNoise(Exploration): # Removes the stored noise from the model's parameters. for var, noise in zip(self.model_variables, self.noise): # Remove noise from weights in-place. - torch.add_(var, -noise) + var.add_(-noise) self.weights_are_currently_noisy = False diff --git a/rllib/utils/exploration/tests/test_explorations.py b/rllib/utils/exploration/tests/test_explorations.py index fafef3b93..24c3c783c 100644 --- a/rllib/utils/exploration/tests/test_explorations.py +++ b/rllib/utils/exploration/tests/test_explorations.py @@ -31,8 +31,8 @@ def do_test_explorations(run, # Test all frameworks. for fw in framework_iterator(config): if fw == "torch" and \ - run in [ddpg.DDPGTrainer, dqn.DQNTrainer, dqn.SimpleQTrainer, - impala.ImpalaTrainer, sac.SACTrainer, td3.TD3Trainer]: + run in [ddpg.DDPGTrainer, impala.ImpalaTrainer, + sac.SACTrainer, td3.TD3Trainer]: continue elif fw == "eager" and run in [ ddpg.DDPGTrainer, sac.SACTrainer, td3.TD3Trainer @@ -123,8 +123,8 @@ class TestExplorations(unittest.TestCase): expected_mean_action=0.0) def test_simple_dqn(self): - do_test_explorations(dqn.SimpleQTrainer, - "CartPole-v0", dqn.DEFAULT_CONFIG, + do_test_explorations(dqn.SimpleQTrainer, "CartPole-v0", + dqn.SIMPLE_Q_DEFAULT_CONFIG, np.array([0.0, 0.1, 0.0, 0.0])) def test_dqn(self): diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index 3507860ea..342ae4eab 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -135,16 +135,26 @@ def try_import_torch(error=False): return None, nn -def get_variable(value, framework="tf", tf_name="unnamed-variable"): +def get_variable(value, + framework="tf", + trainable=False, + tf_name="unnamed-variable", + torch_tensor=False): """ Args: value (any): The initial value to use. In the non-tf case, this will be returned as is. framework (str): One of "tf", "torch", or None. - tf_name (str): An optional name for the variable. Only for tf. + trainable (bool): Whether the generated variable should be + trainable (tf)/require_grad (torch) or not (default: False). + tf_name (str): For framework="tf": An optional name for the + tf.Variable. + torch_tensor (bool): For framework="torch": Whether to actually create + a torch.tensor, or just a python value (default). Returns: - any: A framework-specific variable (tf.Variable or python primitive). + any: A framework-specific variable (tf.Variable, torch.tensor, or + python primitive). """ if framework == "tf": import tensorflow as tf @@ -153,7 +163,12 @@ def get_variable(value, framework="tf", tf_name="unnamed-variable"): if isinstance(value, float) else tf.int32 if isinstance(value, int) else None) return tf.compat.v1.get_variable( - tf_name, initializer=value, dtype=dtype) + tf_name, initializer=value, dtype=dtype, trainable=trainable) + elif framework == "torch" and torch_tensor is True: + import torch + var_ = torch.from_numpy(value) + var_.requires_grad = trainable + return var_ # torch or None: Return python primitive. return value diff --git a/rllib/utils/numpy.py b/rllib/utils/numpy.py index a2e4b31be..a40d0a6ca 100644 --- a/rllib/utils/numpy.py +++ b/rllib/utils/numpy.py @@ -83,6 +83,10 @@ def one_hot(x, depth=0, on_value=1, off_value=0): Returns: np.ndarray: The one-hot encoded equivalent of the input array. """ + # Handle torch arrays properly. + if torch and isinstance(x, torch.Tensor): + x = x.numpy() + # Handle bool arrays correctly. if x.dtype == np.bool_: x = x.astype(np.int) @@ -114,7 +118,7 @@ def one_hot(x, depth=0, on_value=1, off_value=0): return out -def fc(x, weights, biases=None): +def fc(x, weights, biases=None, framework=None): """ Calculates the outputs of a fully-connected (dense) layer given weights/biases and an input. @@ -123,23 +127,29 @@ def fc(x, weights, biases=None): x (np.ndarray): The input to the dense layer. weights (np.ndarray): The weights matrix. biases (Optional[np.ndarray]): The biases vector. All 0s if None. + framework (Optional[str]): An optional framework hint (to figure out, + e.g. whether to transpose torch weight matrices). Returns: The dense layer's output. """ + def map_(data, transpose=False): + if torch: + if isinstance(data, torch.Tensor): + data = data.cpu().detach().numpy() + if tf and tf.executing_eagerly(): + if isinstance(data, tf.Variable): + data = data.numpy() + if transpose: + data = np.transpose(data) + return data + + x = map_(x) # Torch stores matrices in transpose (faster for backprop). - if torch: # and isinstance(weights, torch.Tensor): - x = x.detach().numpy() if isinstance(x, torch.Tensor) else x - weights = np.transpose(weights.detach().numpy()) if \ - isinstance(weights, torch.Tensor) else weights - biases = biases.detach().numpy() if \ - isinstance(biases, torch.Tensor) else biases - if tf and tf.executing_eagerly(): - x = x.numpy() if isinstance(x, tf.Variable) else x - weights = weights.numpy() if isinstance(weights, tf.Variable) else \ - weights - biases = biases.numpy() if isinstance(biases, tf.Variable) else biases + weights = map_(weights, transpose=framework == "torch") + biases = map_(biases) + return np.matmul(x, weights) + (0.0 if biases is None else biases) @@ -216,3 +226,10 @@ def lstm(x, unrolled_outputs[:, t, :] = h_states return unrolled_outputs, (c_states, h_states) + + +def huber_loss(x, delta=1.0): + """Reference: https://en.wikipedia.org/wiki/Huber_loss""" + return np.where( + np.abs(x) < delta, + np.power(x, 2.0) * 0.5, delta * (np.abs(x) - 0.5 * delta)) diff --git a/rllib/utils/test_utils.py b/rllib/utils/test_utils.py index fe4e0d30f..49bf4269d 100644 --- a/rllib/utils/test_utils.py +++ b/rllib/utils/test_utils.py @@ -187,15 +187,10 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False): rtol=rtol, false=false) if torch is not None: - # y should never be a Tensor (y=expected value). - if isinstance(y, torch.Tensor): - raise ValueError("`y` (expected value) must not be a Tensor. " - "Use numpy.ndarray instead") if isinstance(x, torch.Tensor): - try: - x = x.numpy() - except RuntimeError: - x = x.detach().numpy() + x = x.detach().numpy() + if isinstance(y, torch.Tensor): + y = y.detach().numpy() # Using decimals. if atol is None and rtol is None: diff --git a/rllib/utils/tf_ops.py b/rllib/utils/tf_ops.py index f94f54ce6..bca05fb0d 100644 --- a/rllib/utils/tf_ops.py +++ b/rllib/utils/tf_ops.py @@ -18,11 +18,14 @@ def reduce_mean_ignore_inf(x, axis): tf.cast(mask, tf.float32), axis)) -def minimize_and_clip(optimizer, objective, var_list, clip_val=10): +def minimize_and_clip(optimizer, objective, var_list, clip_val=10.0): """Minimized `objective` using `optimizer` w.r.t. variables in `var_list` while ensure the norm of the gradients for each variable is clipped to `clip_val` """ + # Accidentally passing values < 0.0 will break all gradients. + assert clip_val > 0.0, clip_val + gradients = optimizer.compute_gradients(objective, var_list=var_list) for i, (grad, var) in enumerate(gradients): if grad is not None: diff --git a/rllib/utils/torch_ops.py b/rllib/utils/torch_ops.py index 3322e0878..6eaa14c27 100644 --- a/rllib/utils/torch_ops.py +++ b/rllib/utils/torch_ops.py @@ -1,3 +1,4 @@ +import numpy as np import logging from ray.rllib.utils.framework import try_import_torch @@ -13,6 +14,33 @@ except (ImportError, ModuleNotFoundError) as e: raise e +def huber_loss(x, delta=1.0): + """Reference: https://en.wikipedia.org/wiki/Huber_loss""" + return torch.where( + torch.abs(x) < delta, + torch.pow(x, 2.0) * 0.5, delta * (torch.abs(x) - 0.5 * delta)) + + +def reduce_mean_ignore_inf(x, axis): + """Same as torch.mean() but ignores -inf values.""" + mask = torch.ne(x, float("-inf")) + x_zeroed = torch.where(mask, x, torch.zeros_like(x)) + return torch.sum(x_zeroed, axis) / torch.sum(mask.float(), axis) + + +def minimize_and_clip(optimizer, objective, var_list, clip_val=10): + """Minimized `objective` using `optimizer` w.r.t. variables in + `var_list` while ensure the norm of the gradients for each + variable is clipped to `clip_val` + """ + gradients = optimizer.compute_gradients(objective, var_list=var_list) + for i, (grad, var) in enumerate(gradients): + if grad is not None: + gradients[i] = (torch.nn.utils.clip_grad_norm_(grad, clip_val), + var) + return gradients + + def sequence_mask(lengths, maxlen, dtype=None): """ Exact same behavior as tf.sequence_mask. @@ -31,7 +59,7 @@ def sequence_mask(lengths, maxlen, dtype=None): def convert_to_non_torch_type(stats): - """Converts values in stats_dict to non-Tensor numpy or python types. + """Converts values in `stats` to non-Tensor numpy or python types. Args: stats (any): Any (possibly nested) struct, the values in which will be @@ -39,7 +67,7 @@ def convert_to_non_torch_type(stats): being converted to numpy types. Returns: - dict: A new dict with the same structure as stats_dict, but with all + Any: A new struct with the same structure as `stats`, but with all values converted to non-torch Tensor types. """ @@ -47,8 +75,32 @@ def convert_to_non_torch_type(stats): def mapping(item): if isinstance(item, torch.Tensor): return item.cpu().item() if len(item.size()) == 0 else \ - item.cpu().numpy() + item.cpu().detach().numpy() else: return item return tree.map_structure(mapping, stats) + + +def convert_to_torch_tensor(stats, device=None): + """Converts any struct to torch.Tensors. + + stats (any): Any (possibly nested) struct, the values in which will be + converted and returned as a new struct with all leaves converted + to torch tensors. + + Returns: + Any: A new struct with the same structure as `stats`, but with all + values converted to torch Tensor types. + """ + + def mapping(item): + if torch.is_tensor(item): + return item if device is None else item.to(device) + tensor = torch.from_numpy(np.asarray(item)) + # Floatify all float64 tensors. + if tensor.dtype == torch.double: + tensor = tensor.float() + return tensor if device is None else tensor.to(device) + + return tree.map_structure(mapping, stats)