From 428516056abe41f135133e732a8d44af6ce9a234 Mon Sep 17 00:00:00 2001 From: Sven Mika Date: Wed, 15 Apr 2020 13:25:16 +0200 Subject: [PATCH] [RLlib] SAC Torch (incl. Atari learning) (#7984) * Policy-classes cleanup and torch/tf unification. - Make Policy abstract. - Add `action_dist` to call to `extra_action_out_fn` (necessary for PPO torch). - Move some methods and vars to base Policy (from TFPolicy): num_state_tensors, ACTION_PROB, ACTION_LOGP and some more. * Fix `clip_action` import from Policy (should probably be moved into utils altogether). * - Move `is_recurrent()` and `num_state_tensors()` into TFPolicy (from DynamicTFPolicy). - Add config to all Policy c'tor calls (as 3rd arg after obs and action spaces). * Add `config` to c'tor call to TFPolicy. * Add missing `config` to c'tor call to TFPolicy in marvil_policy.py. * Fix test_rollout_worker.py::MockPolicy and BadPolicy classes (Policy base class is now abstract). * Fix LINT errors in Policy classes. * Implement StatefulPolicy abstract methods in test cases: test_multi_agent_env.py. * policy.py LINT errors. * Create a simple TestPolicy to sub-class from when testing Policies (reduces code in some test cases). * policy.py - Remove abstractmethod from `apply_gradients` and `compute_gradients` (these are not required iff `learn_on_batch` implemented). - Fix docstring of `num_state_tensors`. * Make QMIX torch Policy a child of TorchPolicy (instead of Policy). * QMixPolicy add empty implementations of abstract Policy methods. * Store Policy's config in self.config in base Policy c'tor. * - Make only compute_actions in base Policy's an abstractmethod and provide pass implementation to all other methods if not defined. - Fix state_batches=None (most Policies don't have internal states). * Cartpole tf learning. * Cartpole tf AND torch learning (in ~ same ts). * Cartpole tf AND torch learning (in ~ same ts). 2 * Cartpole tf (torch syntax-broken) learning (in ~ same ts). 3 * Cartpole tf AND torch learning (in ~ same ts). 4 * Cartpole tf AND torch learning (in ~ same ts). 5 * Cartpole tf AND torch learning (in ~ same ts). 6 * Cartpole tf AND torch learning (in ~ same ts). Pendulum tf learning. * WIP. * WIP. * SAC torch learning Pendulum. * WIP. * SAC torch and tf learning Pendulum and Cartpole after cleanup. * WIP. * LINT. * LINT. * SAC: Move policy.target_model to policy.device as well. * Fixes and cleanup. * Fix data-format of tf keras Conv2d layers (broken for some tf-versions which have data_format="channels_first" as default). * Fixes and LINT. * Fixes and LINT. * Fix and LINT. * WIP. * Test fixes and LINT. * Fixes and LINT. Co-authored-by: Sven Mika --- python/ray/tune/logger.py | 2 +- rllib/agents/a3c/a3c_torch_policy.py | 8 +- rllib/agents/dqn/dqn.py | 4 +- rllib/agents/dqn/dqn_tf_policy.py | 6 +- rllib/agents/dqn/dqn_torch_policy.py | 4 +- rllib/agents/dqn/tests/test_simple_q.py | 16 +- rllib/agents/pg/tests/test_pg.py | 26 +- rllib/agents/ppo/tests/test_ppo.py | 9 +- rllib/agents/sac/__init__.py | 4 +- rllib/agents/sac/sac.py | 68 ++- .../sac/{sac_model.py => sac_tf_model.py} | 24 +- .../sac/{sac_policy.py => sac_tf_policy.py} | 125 ++--- rllib/agents/sac/sac_torch_model.py | 208 ++++++++ rllib/agents/sac/sac_torch_policy.py | 343 +++++++++++++ rllib/agents/sac/tests/test_sac.py | 472 +++++++++++++++++- rllib/agents/trainer.py | 7 - rllib/agents/trainer_template.py | 3 + .../alpha_zero/core/alpha_zero_policy.py | 7 +- rllib/env/vector_env.py | 4 +- rllib/models/catalog.py | 66 ++- rllib/models/tests/test_distributions.py | 102 +++- rllib/models/tf/fcnet_v1.py | 4 +- rllib/models/tf/fcnet_v2.py | 53 +- rllib/models/tf/misc.py | 6 - rllib/models/tf/tf_action_dist.py | 25 +- rllib/models/tf/visionnet_v1.py | 4 +- rllib/models/tf/visionnet_v2.py | 16 +- rllib/models/torch/fcnet.py | 65 ++- rllib/models/torch/misc.py | 37 +- rllib/models/torch/torch_action_dist.py | 114 +++++ rllib/models/torch/visionnet.py | 23 +- rllib/optimizers/sync_replay_optimizer.py | 19 +- rllib/policy/dynamic_tf_policy.py | 8 +- .../tests/test_compute_log_likelihoods.py | 31 +- rllib/policy/torch_policy.py | 139 ++++-- rllib/policy/torch_policy_template.py | 27 +- rllib/tests/test_catalog.py | 29 +- rllib/tests/test_dependency_torch.py | 3 + rllib/tuned_examples/atari-sac.yaml | 48 ++ rllib/tuned_examples/halfcheetah-sac.yaml | 18 +- rllib/tuned_examples/mspacman-sac.yaml | 43 ++ rllib/tuned_examples/pendulum-sac.yaml | 72 +-- ...cartpole-sac.yaml => cartpole-sac-tf.yaml} | 9 +- .../regression_tests/cartpole-sac-torch.yaml | 17 + ...pendulum-sac.yaml => pendulum-sac-tf.yaml} | 11 +- .../regression_tests/pendulum-sac-torch.yaml | 13 + rllib/utils/exploration/epsilon_greedy.py | 3 +- rllib/utils/exploration/exploration.py | 8 +- rllib/utils/exploration/gaussian_noise.py | 7 +- .../exploration/ornstein_uhlenbeck_noise.py | 33 +- rllib/utils/exploration/parameter_noise.py | 5 +- rllib/utils/exploration/random.py | 18 +- rllib/utils/framework.py | 65 ++- rllib/utils/numpy.py | 4 +- rllib/utils/torch_ops.py | 4 + 55 files changed, 2031 insertions(+), 458 deletions(-) rename rllib/agents/sac/{sac_model.py => sac_tf_model.py} (90%) rename rllib/agents/sac/{sac_policy.py => sac_tf_policy.py} (82%) create mode 100644 rllib/agents/sac/sac_torch_model.py create mode 100644 rllib/agents/sac/sac_torch_policy.py create mode 100644 rllib/tuned_examples/atari-sac.yaml create mode 100644 rllib/tuned_examples/mspacman-sac.yaml rename rllib/tuned_examples/regression_tests/{cartpole-sac.yaml => cartpole-sac-tf.yaml} (76%) create mode 100644 rllib/tuned_examples/regression_tests/cartpole-sac-torch.yaml rename rllib/tuned_examples/regression_tests/{pendulum-sac.yaml => pendulum-sac-tf.yaml} (55%) create mode 100644 rllib/tuned_examples/regression_tests/pendulum-sac-torch.yaml diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index 0c7bd4410..d7544c997 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -215,7 +215,7 @@ class TBXLogger(Logger): valid_result[full_attr] = value self._file_writer.add_scalar( full_attr, value, global_step=step) - elif type(value) is list and len(value) > 0: + elif type(value) in [list, np.ndarray] and len(value) > 0: valid_result[full_attr] = value try: self._file_writer.add_histogram( diff --git a/rllib/agents/a3c/a3c_torch_policy.py b/rllib/agents/a3c/a3c_torch_policy.py index 2168e7d11..5babc3e8b 100644 --- a/rllib/agents/a3c/a3c_torch_policy.py +++ b/rllib/agents/a3c/a3c_torch_policy.py @@ -55,12 +55,12 @@ def model_value_predictions(policy, input_dict, state_batches, model, return {SampleBatch.VF_PREDS: model.value_function()} -def apply_grad_clipping(policy): +def apply_grad_clipping(policy, optimizer, loss): info = {} if policy.config["grad_clip"]: - total_norm = nn.utils.clip_grad_norm_(policy.model.parameters(), - policy.config["grad_clip"]) - info["grad_gnorm"] = total_norm + for param_group in optimizer.param_groups: + info["grad_gnorm"] = nn.utils.clip_grad_norm_( + param_group["params"], policy.config["grad_clip"]) return info diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index 00a940a6b..1698c944f 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -159,7 +159,7 @@ def make_policy_optimizer(workers, config): **kwargs) -def validate_config_and_setup_param_noise(config): +def validate_config(config): """Checks and updates the config based on settings. Rewrites rollout_fragment_length to take into account n_step truncation. @@ -341,7 +341,7 @@ GenericOffPolicyTrainer = build_trainer( default_policy=None, get_policy_class=get_policy_class, default_config=DEFAULT_CONFIG, - validate_config=validate_config_and_setup_param_noise, + validate_config=validate_config, get_initial_state=get_initial_state, make_policy_optimizer=make_policy_optimizer, before_train_step=update_worker_exploration, diff --git a/rllib/agents/dqn/dqn_tf_policy.py b/rllib/agents/dqn/dqn_tf_policy.py index 554c785e9..7838fe617 100644 --- a/rllib/agents/dqn/dqn_tf_policy.py +++ b/rllib/agents/dqn/dqn_tf_policy.py @@ -184,16 +184,16 @@ def build_q_model(policy, obs_space, action_space, config): def get_distribution_inputs_and_class(policy, - q_model, + model, obs_batch, *, explore=True, **kwargs): - q_vals = compute_q_values(policy, q_model, obs_batch, explore) + q_vals = compute_q_values(policy, model, obs_batch, explore) q_vals = q_vals[0] if isinstance(q_vals, tuple) else q_vals policy.q_values = q_vals - policy.q_func_vars = q_model.variables() + policy.q_func_vars = model.variables() return policy.q_values, Categorical, [] # state-out diff --git a/rllib/agents/dqn/dqn_torch_policy.py b/rllib/agents/dqn/dqn_torch_policy.py index 3a60df5db..16b2148e6 100644 --- a/rllib/agents/dqn/dqn_torch_policy.py +++ b/rllib/agents/dqn/dqn_torch_policy.py @@ -135,13 +135,13 @@ def build_q_model_and_distribution(policy, obs_space, action_space, config): def get_distribution_inputs_and_class(policy, - q_model, + model, obs_batch, *, explore=True, is_training=False, **kwargs): - q_vals = compute_q_values(policy, q_model, obs_batch, explore, is_training) + q_vals = compute_q_values(policy, model, obs_batch, explore, is_training) q_vals = q_vals[0] if isinstance(q_vals, tuple) else q_vals policy.q_values = q_vals diff --git a/rllib/agents/dqn/tests/test_simple_q.py b/rllib/agents/dqn/tests/test_simple_q.py index d3c9d36a1..17ea36749 100644 --- a/rllib/agents/dqn/tests/test_simple_q.py +++ b/rllib/agents/dqn/tests/test_simple_q.py @@ -60,21 +60,21 @@ class TestSimpleQ(unittest.TestCase): q_t = np.sum( one_hot(input_[SampleBatch.ACTIONS], 2) * fc( fc(input_[SampleBatch.CUR_OBS], - vars[0], - vars[1], + vars[0 if fw != "torch" else 2], + vars[1 if fw != "torch" else 3], framework=fw), - vars[2], - vars[3], + vars[2 if fw != "torch" else 0], + vars[3 if fw != "torch" else 1], framework=fw), 1) # max[a'](Qtarget(s',a')) outputs. q_target_tp1 = np.max( fc(fc( input_[SampleBatch.NEXT_OBS], - vars_t[0], - vars_t[1], + vars_t[0 if fw != "torch" else 2], + vars_t[1 if fw != "torch" else 3], framework=fw), - vars_t[2], - vars_t[3], + vars_t[2 if fw != "torch" else 0], + vars_t[3 if fw != "torch" else 1], framework=fw), 1) # TD-errors (Bellman equation). td_error = q_t - config["gamma"] * input_[SampleBatch.REWARDS] + \ diff --git a/rllib/agents/pg/tests/test_pg.py b/rllib/agents/pg/tests/test_pg.py index b4b1c800f..6c56d4ad7 100644 --- a/rllib/agents/pg/tests/test_pg.py +++ b/rllib/agents/pg/tests/test_pg.py @@ -91,14 +91,24 @@ class TestPG(unittest.TestCase): train_batch=train_batch) # Calculate expected results. - expected_logits = fc( - fc(train_batch[SampleBatch.CUR_OBS], - vars[0], - vars[1], - framework=fw), - vars[2], - vars[3], - framework=fw) + if fw != "torch": + expected_logits = fc( + fc(train_batch[SampleBatch.CUR_OBS], + vars[0], + vars[1], + framework=fw), + vars[2], + vars[3], + framework=fw) + else: + expected_logits = fc( + fc(train_batch[SampleBatch.CUR_OBS], + vars[2], + vars[3], + framework=fw), + vars[0], + vars[1], + framework=fw) expected_logp = dist_cls(expected_logits, policy.model).logp( train_batch[SampleBatch.ACTIONS]) if sess: diff --git a/rllib/agents/ppo/tests/test_ppo.py b/rllib/agents/ppo/tests/test_ppo.py index cdc0df228..d7a65f664 100644 --- a/rllib/agents/ppo/tests/test_ppo.py +++ b/rllib/agents/ppo/tests/test_ppo.py @@ -165,11 +165,14 @@ class TestPPO(unittest.TestCase): vars = policy.get_session().run(vars) expected_shared_out = fc( train_batch[SampleBatch.CUR_OBS], - vars[0], - vars[1], + vars[0 if fw != "torch" else 2], + vars[1 if fw != "torch" else 3], framework=fw) expected_logits = fc( - expected_shared_out, vars[2], vars[3], framework=fw) + expected_shared_out, + vars[2 if fw != "torch" else 0], + vars[3 if fw != "torch" else 1], + framework=fw) expected_value_outs = fc( expected_shared_out, vars[4], vars[5], framework=fw) diff --git a/rllib/agents/sac/__init__.py b/rllib/agents/sac/__init__.py index 673192004..260b4f781 100644 --- a/rllib/agents/sac/__init__.py +++ b/rllib/agents/sac/__init__.py @@ -1,8 +1,10 @@ from ray.rllib.agents.sac.sac import SACTrainer, DEFAULT_CONFIG -from ray.rllib.agents.sac.sac_policy import SACTFPolicy +from ray.rllib.agents.sac.sac_tf_policy import SACTFPolicy +from ray.rllib.agents.sac.sac_torch_policy import SACTorchPolicy __all__ = [ "SACTFPolicy", + "SACTorchPolicy", "SACTrainer", "DEFAULT_CONFIG", ] diff --git a/rllib/agents/sac/sac.py b/rllib/agents/sac/sac.py index 399fcd454..d92cda6e7 100644 --- a/rllib/agents/sac/sac.py +++ b/rllib/agents/sac/sac.py @@ -1,6 +1,7 @@ from ray.rllib.agents.trainer import with_common_config from ray.rllib.agents.dqn.dqn import GenericOffPolicyTrainer -from ray.rllib.agents.sac.sac_policy import SACTFPolicy +from ray.rllib.agents.sac.sac_tf_policy import SACTFPolicy +from ray.rllib.utils.deprecation import deprecation_warning, DEPRECATED_VALUE OPTIMIZER_SHARED_CONFIGS = [ "buffer_size", "prioritized_replay", "prioritized_replay_alpha", @@ -14,15 +15,19 @@ DEFAULT_CONFIG = with_common_config({ # === Model === "twin_q": True, "use_state_preprocessor": False, - # RLlib model options for the Q function + # RLlib model options for the Q function(s). "Q_model": { - "hidden_activation": "relu", - "hidden_layer_sizes": (256, 256), + "fcnet_activation": "relu", + "fcnet_hiddens": [256, 256], + "hidden_activation": DEPRECATED_VALUE, + "hidden_layer_sizes": DEPRECATED_VALUE, }, - # RLlib model options for the policy function + # RLlib model options for the policy function. "policy_model": { - "hidden_activation": "relu", - "hidden_layer_sizes": (256, 256), + "fcnet_activation": "relu", + "fcnet_hiddens": [256, 256], + "hidden_activation": DEPRECATED_VALUE, + "hidden_layer_sizes": DEPRECATED_VALUE, }, # Unsquash actions to the upper and lower bounds of env's action space. # Ignored for discrete action spaces. @@ -67,7 +72,7 @@ DEFAULT_CONFIG = with_common_config({ "entropy_learning_rate": 3e-4, }, # If not None, clip gradients during optimization at this value. - "grad_norm_clipping": None, + "grad_clip": None, # How many steps of the model to sample before learning starts. "learning_starts": 1500, # Update the replay buffer with this many samples at once. Note that this @@ -95,12 +100,57 @@ DEFAULT_CONFIG = with_common_config({ "worker_side_prioritization": False, # Prevent iterations from going lower than this time span. "min_iter_time_s": 1, + + # Whether the loss should be calculated deterministically (w/o the + # stochastic action sampling step). True only useful for cont. actions and + # for debugging! + "_deterministic_loss": False, + # Use a Beta-distribution instead of a SquashedGaussian for bounded, + # continuous action spaces (not recommended, for debugging only). + "_use_beta_distribution": False, + + # DEPRECATED VALUES (set to -1 to indicate they have not been overwritten + # by user's config). If we don't set them here, we will get an error + # from the config-key checker. + "grad_norm_clipping": DEPRECATED_VALUE, }) # __sphinx_doc_end__ # yapf: enable + +def get_policy_class(config): + if config.get("use_pytorch") is True: + from ray.rllib.agents.sac.sac_torch_policy import SACTorchPolicy + return SACTorchPolicy + else: + return SACTFPolicy + + +def validate_config(config): + if config.get("grad_norm_clipping", DEPRECATED_VALUE) != DEPRECATED_VALUE: + deprecation_warning("grad_norm_clipping", "grad_clip") + config["grad_clip"] = config.pop("grad_norm_clipping") + + # Use same keys as for standard Trainer "model" config. + for model in ["Q_model", "policy_model"]: + if config[model].get("hidden_activation", DEPRECATED_VALUE) != \ + DEPRECATED_VALUE: + deprecation_warning( + "{}.hidden_activation".format(model), + "{}.fcnet_activation".format(model), + error=True) + if config[model].get("hidden_layer_sizes", DEPRECATED_VALUE) != \ + DEPRECATED_VALUE: + deprecation_warning( + "{}.hidden_layer_sizes".format(model), + "{}.fcnet_hiddens".format(model), + error=True) + + SACTrainer = GenericOffPolicyTrainer.with_updates( name="SAC", default_config=DEFAULT_CONFIG, default_policy=SACTFPolicy, - get_policy_class=lambda c: SACTFPolicy) + get_policy_class=get_policy_class, + validate_config=validate_config, +) diff --git a/rllib/agents/sac/sac_model.py b/rllib/agents/sac/sac_tf_model.py similarity index 90% rename from rllib/agents/sac/sac_model.py rename to rllib/agents/sac/sac_tf_model.py index d5dd0676d..7079c5f17 100644 --- a/rllib/agents/sac/sac_model.py +++ b/rllib/agents/sac/sac_tf_model.py @@ -2,14 +2,12 @@ from gym.spaces import Discrete import numpy as np from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.utils import try_import_tf +from ray.rllib.utils.framework import try_import_tf tf = try_import_tf() -SCALE_DIAG_MIN_MAX = (-20, 2) - -class SACModel(TFModelV2): +class SACTFModel(TFModelV2): """Extension of standard TFModel for SAC. Data flow: @@ -32,7 +30,8 @@ class SACModel(TFModelV2): critic_hidden_activation="relu", critic_hiddens=(256, 256), twin_q=False, - initial_alpha=1.0): + initial_alpha=1.0, + target_entropy=None): """Initialize variables of this model. Extra model kwargs: @@ -48,20 +47,20 @@ class SACModel(TFModelV2): only defines the layers for the output heads. Those layers for forward() should be defined in subclasses of SACModel. """ - super(SACModel, self).__init__(obs_space, action_space, num_outputs, - model_config, name) - self.discrete = False + super(SACTFModel, self).__init__(obs_space, action_space, num_outputs, + model_config, name) if isinstance(action_space, Discrete): self.action_dim = action_space.n self.discrete = True action_outs = q_outs = self.action_dim else: self.action_dim = np.product(action_space.shape) + self.discrete = False action_outs = 2 * self.action_dim q_outs = 1 self.model_out = tf.keras.layers.Input( - shape=(num_outputs, ), name="model_out") + shape=(self.num_outputs, ), name="model_out") self.action_model = tf.keras.Sequential([ tf.keras.layers.Dense( units=hidden, @@ -118,6 +117,13 @@ class SACModel(TFModelV2): self.log_alpha = tf.Variable( np.log(initial_alpha), dtype=tf.float32, name="log_alpha") self.alpha = tf.exp(self.log_alpha) + # Auto-calculate the target entropy. + if target_entropy is None or target_entropy == "auto": + if self.discrete: + target_entropy = -action_space.n + else: + target_entropy = -np.prod(action_space.shape) + self.target_entropy = target_entropy self.register_variables([self.log_alpha]) diff --git a/rllib/agents/sac/sac_policy.py b/rllib/agents/sac/sac_tf_policy.py similarity index 82% rename from rllib/agents/sac/sac_policy.py rename to rllib/agents/sac/sac_tf_policy.py index 8a1c95338..0f611f2b4 100644 --- a/rllib/agents/sac/sac_policy.py +++ b/rllib/agents/sac/sac_tf_policy.py @@ -1,21 +1,20 @@ +from gym.spaces import Box, Discrete import logging -import numpy as np import ray import ray.experimental.tf_utils -from gym.spaces import Box, Discrete -from ray.rllib.agents.ddpg.noop_model import NoopModel from ray.rllib.agents.ddpg.ddpg_policy import ComputeTDErrorMixin, \ TargetNetworkMixin from ray.rllib.agents.dqn.dqn_tf_policy import postprocess_nstep_and_prio -from ray.rllib.agents.sac.sac_model import SACModel +from ray.rllib.agents.sac.sac_tf_model import SACTFModel +from ray.rllib.agents.sac.sac_torch_model import SACTorchModel from ray.rllib.models import ModelCatalog from ray.rllib.models.tf.tf_action_dist import (Categorical, SquashedGaussian, DiagGaussian) from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.tf_policy_template import build_tf_policy -from ray.rllib.utils import try_import_tf, try_import_tfp from ray.rllib.utils.error import UnsupportedSpaceException +from ray.rllib.utils.framework import try_import_tf, try_import_tfp from ray.rllib.utils.tf_ops import minimize_and_clip tf = try_import_tf() @@ -25,7 +24,7 @@ logger = logging.getLogger(__name__) def build_sac_model(policy, obs_space, action_space, config): - if config["model"]["custom_model"]: + if config["model"].get("custom_model"): logger.warning( "Setting use_state_preprocessor=True since a custom model " "was specified.") @@ -40,45 +39,49 @@ def build_sac_model(policy, obs_space, action_space, config): "Consider reshaping this into a single dimension, " "using a Tuple action space, or the multi-agent API.") + # 2 cases: + # 1) with separate state-preprocessor (before obs+action concat). + # 2) no separate state-preprocessor: concat obs+actions right away. if config["use_state_preprocessor"]: - default_model = None # catalog decides - num_outputs = 256 # arbitrary - config["model"]["no_final_linear"] = True + num_outputs = 256 # Flatten last Conv2D to this many nodes. else: - default_model = NoopModel - num_outputs = int(np.product(obs_space.shape)) + config["model"]["fcnet_hiddens"] = [] + num_outputs = 0 + # Force-ignore any additionally provided hidden layer sizes. + # Everything should be configured using SAC's "Q_model" and "policy_model" + # settings. policy.model = ModelCatalog.get_model_v2( - obs_space, - action_space, - num_outputs, - config["model"], - framework="tf", - model_interface=SACModel, - default_model=default_model, + obs_space=obs_space, + action_space=action_space, + num_outputs=num_outputs, + model_config=config["model"], + framework="torch" if config["use_pytorch"] else "tf", + model_interface=SACTorchModel if config["use_pytorch"] else SACTFModel, name="sac_model", - actor_hidden_activation=config["policy_model"]["hidden_activation"], - actor_hiddens=config["policy_model"]["hidden_layer_sizes"], - critic_hidden_activation=config["Q_model"]["hidden_activation"], - critic_hiddens=config["Q_model"]["hidden_layer_sizes"], + actor_hidden_activation=config["policy_model"]["fcnet_activation"], + actor_hiddens=config["policy_model"]["fcnet_hiddens"], + critic_hidden_activation=config["Q_model"]["fcnet_activation"], + critic_hiddens=config["Q_model"]["fcnet_hiddens"], twin_q=config["twin_q"], - initial_alpha=config["initial_alpha"]) + initial_alpha=config["initial_alpha"], + target_entropy=config["target_entropy"]) policy.target_model = ModelCatalog.get_model_v2( - obs_space, - action_space, - num_outputs, - config["model"], - framework="tf", - model_interface=SACModel, - default_model=default_model, + obs_space=obs_space, + action_space=action_space, + num_outputs=num_outputs, + model_config=config["model"], + framework="torch" if config["use_pytorch"] else "tf", + model_interface=SACTorchModel if config["use_pytorch"] else SACTFModel, name="target_sac_model", - actor_hidden_activation=config["policy_model"]["hidden_activation"], - actor_hiddens=config["policy_model"]["hidden_layer_sizes"], - critic_hidden_activation=config["Q_model"]["hidden_activation"], - critic_hiddens=config["Q_model"]["hidden_layer_sizes"], + actor_hidden_activation=config["policy_model"]["fcnet_activation"], + actor_hiddens=config["policy_model"]["fcnet_hiddens"], + critic_hidden_activation=config["Q_model"]["fcnet_activation"], + critic_hiddens=config["Q_model"]["fcnet_hiddens"], twin_q=config["twin_q"], - initial_alpha=config["initial_alpha"]) + initial_alpha=config["initial_alpha"], + target_entropy=config["target_entropy"]) return policy.model @@ -91,6 +94,9 @@ def postprocess_trajectory(policy, def get_dist_class(config, action_space): + assert config["_use_beta_distribution"] is False, \ + "Beta-distr. not supported for tf!" + if isinstance(action_space, Discrete): action_dist_class = Categorical else: @@ -117,6 +123,9 @@ def get_distribution_inputs_and_class(policy, def sac_actor_critic_loss(policy, model, _, train_batch): + # Should be True only for debugging purposes (e.g. test cases)! + deterministic = policy.config["_deterministic_loss"] + model_out_t, _ = model({ "obs": train_batch[SampleBatch.CUR_OBS], "is_training": policy._get_is_training_placeholder(), @@ -168,12 +177,14 @@ def sac_actor_critic_loss(policy, model, _, train_batch): action_dist_class = get_dist_class(policy.config, policy.action_space) action_dist_t = action_dist_class( model.get_policy_output(model_out_t), policy.model) - policy_t = action_dist_t.sample() - log_pis_t = tf.expand_dims(action_dist_t.sampled_action_logp(), -1) + policy_t = action_dist_t.sample() if not deterministic else \ + action_dist_t.deterministic_sample() + log_pis_t = tf.expand_dims(action_dist_t.logp(policy_t), -1) action_dist_tp1 = action_dist_class( model.get_policy_output(model_out_tp1), policy.model) - policy_tp1 = action_dist_tp1.sample() - log_pis_tp1 = tf.expand_dims(action_dist_tp1.sampled_action_logp(), -1) + policy_tp1 = action_dist_tp1.sample() if not deterministic else \ + action_dist_tp1.deterministic_sample() + log_pis_tp1 = tf.expand_dims(action_dist_tp1.logp(policy_tp1), -1) # Q-values for the actually selected actions. q_t = model.get_q_values(model_out_t, train_batch[SampleBatch.ACTIONS]) @@ -195,11 +206,12 @@ def sac_actor_critic_loss(policy, model, _, train_batch): if policy.config["twin_q"]: twin_q_tp1 = policy.target_model.get_twin_q_values( target_model_out_tp1, policy_tp1) + # Take min over both twin-NNs. + q_tp1 = tf.reduce_min((q_tp1, twin_q_tp1), axis=0) q_t_selected = tf.squeeze(q_t, axis=len(q_t.shape) - 1) if policy.config["twin_q"]: twin_q_t_selected = tf.squeeze(twin_q_t, axis=len(q_t.shape) - 1) - q_tp1 = tf.reduce_min((q_tp1, twin_q_tp1), axis=0) q_tp1 -= model.alpha * log_pis_tp1 q_tp1_best = tf.squeeze(input=q_tp1, axis=len(q_tp1.shape) - 1) @@ -232,15 +244,6 @@ def sac_actor_critic_loss(policy, model, _, train_batch): predictions=twin_q_t_selected, weights=0.5)) - # Auto-calculate the target entropy. - if policy.config["target_entropy"] == "auto": - if model.discrete: - target_entropy = np.array(-policy.action_space.n, dtype=np.float32) - else: - target_entropy = -np.prod(policy.action_space.shape) - else: - target_entropy = policy.config["target_entropy"] - # Alpha- and actor losses. # Note: In the papers, alpha is used directly, here we take the log. # Discrete case: Multiply the action probs as weights with the original @@ -250,7 +253,7 @@ def sac_actor_critic_loss(policy, model, _, train_batch): tf.reduce_sum( tf.multiply( tf.stop_gradient(policy_t), -model.log_alpha * - tf.stop_gradient(log_pis_t + target_entropy)), + tf.stop_gradient(log_pis_t + model.target_entropy)), axis=-1)) actor_loss = tf.reduce_mean( tf.reduce_sum( @@ -262,17 +265,19 @@ def sac_actor_critic_loss(policy, model, _, train_batch): axis=-1)) else: alpha_loss = -tf.reduce_mean( - model.log_alpha * tf.stop_gradient(log_pis_t + target_entropy)) + model.log_alpha * + tf.stop_gradient(log_pis_t + model.target_entropy)) actor_loss = tf.reduce_mean(model.alpha * log_pis_t - q_t_det_policy) # save for stats function + policy.policy_t = policy_t policy.q_t = q_t policy.td_error = td_error policy.actor_loss = actor_loss policy.critic_loss = critic_loss policy.alpha_loss = alpha_loss policy.alpha_value = model.alpha - policy.target_entropy = target_entropy + policy.target_entropy = model.target_entropy # in a custom apply op we handle the losses separately, but return them # combined in one loss for now @@ -280,12 +285,12 @@ def sac_actor_critic_loss(policy, model, _, train_batch): def gradients(policy, optimizer, loss): - if policy.config["grad_norm_clipping"]: + if policy.config["grad_clip"]: actor_grads_and_vars = minimize_and_clip( optimizer, # isn't optimizer not well defined here (which one)? policy.actor_loss, var_list=policy.model.policy_variables(), - clip_val=policy.config["grad_norm_clipping"]) + clip_val=policy.config["grad_clip"]) if policy.config["twin_q"]: q_variables = policy.model.q_variables() half_cutoff = len(q_variables) // 2 @@ -294,23 +299,23 @@ def gradients(policy, optimizer, loss): optimizer, policy.critic_loss[0], var_list=q_variables[:half_cutoff], - clip_val=policy.config["grad_norm_clipping"]) + clip_val=policy.config["grad_clip"]) critic_grads_and_vars += minimize_and_clip( optimizer, policy.critic_loss[1], var_list=q_variables[half_cutoff:], - clip_val=policy.config["grad_norm_clipping"]) + clip_val=policy.config["grad_clip"]) else: critic_grads_and_vars = minimize_and_clip( optimizer, policy.critic_loss[0], var_list=policy.model.q_variables(), - clip_val=policy.config["grad_norm_clipping"]) + clip_val=policy.config["grad_clip"]) alpha_grads_and_vars = minimize_and_clip( optimizer, policy.alpha_loss, var_list=[policy.model.log_alpha], - clip_val=policy.config["grad_norm_clipping"]) + clip_val=policy.config["grad_clip"]) else: actor_grads_and_vars = policy._actor_optimizer.compute_gradients( policy.actor_loss, var_list=policy.model.policy_variables()) @@ -366,7 +371,9 @@ def apply_gradients(policy, optimizer, grads_and_vars): def stats(policy, train_batch): return { - "td_error": tf.reduce_mean(policy.td_error), + # "policy_t": policy.policy_t, + # "td_error": policy.td_error, + "mean_td_error": tf.reduce_mean(policy.td_error), "actor_loss": tf.reduce_mean(policy.actor_loss), "critic_loss": tf.reduce_mean(policy.critic_loss), "alpha_loss": tf.reduce_mean(policy.alpha_loss), diff --git a/rllib/agents/sac/sac_torch_model.py b/rllib/agents/sac/sac_torch_model.py new file mode 100644 index 000000000..67e0a7826 --- /dev/null +++ b/rllib/agents/sac/sac_torch_model.py @@ -0,0 +1,208 @@ +from gym.spaces import Discrete +import numpy as np + +from ray.rllib.models.torch.misc import SlimFC +from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 +from ray.rllib.utils.framework import get_activation_fn, try_import_torch + +torch, nn = try_import_torch() + + +class SACTorchModel(TorchModelV2): + """Extension of standard TorchModelV2 for SAC. + + Data flow: + obs -> forward() -> model_out + model_out -> get_policy_output() -> pi(s) + model_out, actions -> get_q_values() -> Q(s, a) + model_out, actions -> get_twin_q_values() -> Q_twin(s, a) + + Note that this class by itself is not a valid model unless you + implement forward() in a subclass.""" + + def __init__(self, + obs_space, + action_space, + num_outputs, + model_config, + name, + actor_hidden_activation="relu", + actor_hiddens=(256, 256), + critic_hidden_activation="relu", + critic_hiddens=(256, 256), + twin_q=False, + initial_alpha=1.0, + target_entropy=None): + """Initialize variables of this model. + + Extra model kwargs: + actor_hidden_activation (str): activation for actor network + actor_hiddens (list): hidden layers sizes for actor network + critic_hidden_activation (str): activation for critic network + critic_hiddens (list): hidden layers sizes for critic network + twin_q (bool): build twin Q networks. + initial_alpha (float): The initial value for the to-be-optimized + alpha parameter (default: 1.0). + target_entropy (Optional[float]): An optional fixed value for the + SAC alpha loss term. None or "auto" for automatic calculation + of this value according to [1] (cont. actions) or [2] + (discrete actions). + + Note that the core layers for forward() are not defined here, this + only defines the layers for the output heads. Those layers for + forward() should be defined in subclasses of SACModel. + """ + super(SACTorchModel, self).__init__(obs_space, action_space, + num_outputs, model_config, name) + + if isinstance(action_space, Discrete): + self.action_dim = action_space.n + self.discrete = True + self.action_outs = q_outs = self.action_dim + self.action_ins = None # No action inputs for the discrete case. + else: + self.action_dim = np.product(action_space.shape) + self.discrete = False + self.action_outs = 2 * self.action_dim + self.action_ins = self.action_dim + q_outs = 1 + + # Build the policy network. + self.action_model = nn.Sequential() + ins = self.num_outputs + self.obs_ins = ins + activation = get_activation_fn( + actor_hidden_activation, framework="torch") + for i, n in enumerate(actor_hiddens): + self.action_model.add_module( + "action_{}".format(i), + SlimFC( + ins, + n, + initializer=torch.nn.init.xavier_uniform_, + activation_fn=activation)) + ins = n + self.action_model.add_module( + "action_out", + SlimFC( + ins, + self.action_outs, + initializer=torch.nn.init.xavier_uniform_, + activation_fn=None)) + + # Build the Q-net(s), including target Q-net(s). + def build_q_net(name_): + activation = get_activation_fn( + critic_hidden_activation, framework="torch") + # For continuous actions: Feed obs and actions (concatenated) + # through the NN. For discrete actions, only obs. + q_net = nn.Sequential() + ins = self.obs_ins + (0 if self.discrete else self.action_ins) + for i, n in enumerate(critic_hiddens): + q_net.add_module( + "{}_hidden_{}".format(name_, i), + SlimFC( + ins, + n, + initializer=torch.nn.init.xavier_uniform_, + activation_fn=activation)) + ins = n + + q_net.add_module( + "{}_out".format(name_), + SlimFC( + ins, + q_outs, + initializer=torch.nn.init.xavier_uniform_, + activation_fn=None)) + return q_net + + self.q_net = build_q_net("q") + if twin_q: + self.twin_q_net = build_q_net("twin_q") + else: + self.twin_q_net = None + + self.log_alpha = torch.tensor( + data=[np.log(initial_alpha)], + dtype=torch.float32, + requires_grad=True) + + # Auto-calculate the target entropy. + if target_entropy is None or target_entropy == "auto": + # See hyperparams in [2] (README.md). + if self.discrete: + target_entropy = 0.98 * np.array( + -np.log(1.0 / action_space.n), dtype=np.float32) + # See [1] (README.md). + else: + target_entropy = -np.prod(action_space.shape) + + self.target_entropy = torch.tensor( + data=[target_entropy], dtype=torch.float32, requires_grad=False) + + def get_q_values(self, model_out, actions=None): + """Return the Q estimates for the most recent forward pass. + + This implements Q(s, a). + + Arguments: + model_out (Tensor): obs embeddings from the model layers, of shape + [BATCH_SIZE, num_outputs]. + actions (Optional[Tensor]): Actions to return the Q-values for. + Shape: [BATCH_SIZE, action_dim]. If None (discrete action + case), return Q-values for all actions. + + Returns: + tensor of shape [BATCH_SIZE]. + """ + if actions is not None: + return self.q_net(torch.cat([model_out, actions], -1)) + else: + return self.q_net(model_out) + + def get_twin_q_values(self, model_out, actions=None): + """Same as get_q_values but using the twin Q net. + + This implements the twin Q(s, a). + + Arguments: + model_out (Tensor): obs embeddings from the model layers, of shape + [BATCH_SIZE, num_outputs]. + actions (Optional[Tensor]): Actions to return the Q-values for. + Shape: [BATCH_SIZE, action_dim]. If None (discrete action + case), return Q-values for all actions. + + Returns: + tensor of shape [BATCH_SIZE]. + """ + if actions is not None: + return self.twin_q_net(torch.cat([model_out, actions], -1)) + else: + return self.twin_q_net(model_out) + + def get_policy_output(self, model_out): + """Return the action output for the most recent forward pass. + + This outputs the support for pi(s). For continuous action spaces, this + is the action directly. For discrete, is is the mean / std dev. + + Arguments: + model_out (Tensor): obs embeddings from the model layers, of shape + [BATCH_SIZE, num_outputs]. + + Returns: + tensor of shape [BATCH_SIZE, action_out_size] + """ + return self.action_model(model_out) + + def policy_variables(self): + """Return the list of variables for the policy net.""" + + return list(self.action_model.parameters()) + + def q_variables(self): + """Return the list of variables for Q / twin Q nets.""" + + return list(self.q_net.parameters()) + \ + (list(self.twin_q_net.parameters()) if self.twin_q_net else []) diff --git a/rllib/agents/sac/sac_torch_policy.py b/rllib/agents/sac/sac_torch_policy.py new file mode 100644 index 000000000..503bc4979 --- /dev/null +++ b/rllib/agents/sac/sac_torch_policy.py @@ -0,0 +1,343 @@ +from gym.spaces import Discrete +import logging + +import ray +import ray.experimental.tf_utils +from ray.rllib.agents.a3c.a3c_torch_policy import apply_grad_clipping +from ray.rllib.agents.sac.sac_tf_policy import build_sac_model, \ + postprocess_trajectory +from ray.rllib.agents.dqn.dqn_tf_policy import PRIO_WEIGHTS +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.torch_policy_template import build_torch_policy +from ray.rllib.models.torch.torch_action_dist import ( + TorchCategorical, TorchSquashedGaussian, TorchDiagGaussian, TorchBeta) +from ray.rllib.utils import try_import_torch + +torch, nn = try_import_torch() +F = nn.functional + +logger = logging.getLogger(__name__) + + +def build_sac_model_and_action_dist(policy, obs_space, action_space, config): + model = build_sac_model(policy, obs_space, action_space, config) + action_dist_class = get_dist_class(config, action_space) + return model, action_dist_class + + +def get_dist_class(config, action_space): + if isinstance(action_space, Discrete): + return TorchCategorical + else: + if config["normalize_actions"]: + return TorchSquashedGaussian if \ + not config["_use_beta_distribution"] else TorchBeta + else: + return TorchDiagGaussian + + +def action_distribution_fn(policy, + model, + obs_batch, + *, + state_batches=None, + seq_lens=None, + prev_action_batch=None, + prev_reward_batch=None, + explore=None, + timestep=None, + is_training=None): + model_out, _ = model({ + "obs": obs_batch, + "is_training": is_training, + }, [], None) + distribution_inputs = model.get_policy_output(model_out) + action_dist_class = get_dist_class(policy.config, policy.action_space) + + return distribution_inputs, action_dist_class, [] + + +def actor_critic_loss(policy, model, _, train_batch): + # Should be True only for debugging purposes (e.g. test cases)! + deterministic = policy.config["_deterministic_loss"] + + model_out_t, _ = model({ + "obs": train_batch[SampleBatch.CUR_OBS], + "is_training": True, + }, [], None) + + model_out_tp1, _ = model({ + "obs": train_batch[SampleBatch.NEXT_OBS], + "is_training": True, + }, [], None) + + target_model_out_tp1, _ = policy.target_model({ + "obs": train_batch[SampleBatch.NEXT_OBS], + "is_training": True, + }, [], None) + + alpha = torch.exp(model.log_alpha) + + # Discrete case. + if model.discrete: + # Get all action probs directly from pi and form their logp. + log_pis_t = F.log_softmax(model.get_policy_output(model_out_t), dim=-1) + policy_t = torch.exp(log_pis_t) + log_pis_tp1 = F.log_softmax(model.get_policy_output(model_out_tp1), -1) + policy_tp1 = torch.exp(log_pis_tp1) + # Q-values. + q_t = model.get_q_values(model_out_t) + # Target Q-values. + q_tp1 = policy.target_model.get_q_values(target_model_out_tp1) + if policy.config["twin_q"]: + twin_q_t = model.get_twin_q_values(model_out_t) + twin_q_tp1 = policy.target_model.get_twin_q_values( + target_model_out_tp1) + q_tp1 = torch.min(q_tp1, twin_q_tp1) + q_tp1 -= alpha * log_pis_tp1 + + # Actually selected Q-values (from the actions batch). + one_hot = F.one_hot( + train_batch[SampleBatch.ACTIONS], num_classes=q_t.size()[-1]) + q_t_selected = torch.sum(q_t * one_hot, dim=-1) + if policy.config["twin_q"]: + twin_q_t_selected = torch.sum(twin_q_t * one_hot, dim=-1) + # Discrete case: "Best" means weighted by the policy (prob) outputs. + q_tp1_best = torch.sum(torch.mul(policy_tp1, q_tp1), dim=-1) + q_tp1_best_masked = \ + (1.0 - train_batch[SampleBatch.DONES].float()) * \ + q_tp1_best + # Continuous actions case. + else: + # Sample single actions from distribution. + action_dist_class = get_dist_class(policy.config, policy.action_space) + action_dist_t = action_dist_class( + model.get_policy_output(model_out_t), policy.model) + policy_t = action_dist_t.sample() if not deterministic else \ + action_dist_t.deterministic_sample() + log_pis_t = torch.unsqueeze(action_dist_t.logp(policy_t), -1) + action_dist_tp1 = action_dist_class( + model.get_policy_output(model_out_tp1), policy.model) + policy_tp1 = action_dist_tp1.sample() if not deterministic else \ + action_dist_tp1.deterministic_sample() + log_pis_tp1 = torch.unsqueeze(action_dist_tp1.logp(policy_tp1), -1) + + # Q-values for the actually selected actions. + q_t = model.get_q_values(model_out_t, train_batch[SampleBatch.ACTIONS]) + if policy.config["twin_q"]: + twin_q_t = model.get_twin_q_values( + model_out_t, train_batch[SampleBatch.ACTIONS]) + + # Q-values for current policy in given current state. + q_t_det_policy = model.get_q_values(model_out_t, policy_t) + if policy.config["twin_q"]: + twin_q_t_det_policy = model.get_twin_q_values( + model_out_t, policy_t) + q_t_det_policy = torch.min(q_t_det_policy, twin_q_t_det_policy) + + # Target q network evaluation. + q_tp1 = policy.target_model.get_q_values(target_model_out_tp1, + policy_tp1) + if policy.config["twin_q"]: + twin_q_tp1 = policy.target_model.get_twin_q_values( + target_model_out_tp1, policy_tp1) + # Take min over both twin-NNs. + q_tp1 = torch.min(q_tp1, twin_q_tp1) + + q_t_selected = torch.squeeze(q_t, dim=-1) + if policy.config["twin_q"]: + twin_q_t_selected = torch.squeeze(twin_q_t, dim=-1) + q_tp1 -= alpha * log_pis_tp1 + + q_tp1_best = torch.squeeze(input=q_tp1, dim=-1) + q_tp1_best_masked = (1.0 - train_batch[SampleBatch.DONES].float()) * \ + q_tp1_best + + assert policy.config["n_step"] == 1, "TODO(hartikainen) n_step > 1" + + # compute RHS of bellman equation + q_t_selected_target = ( + train_batch[SampleBatch.REWARDS] + + (policy.config["gamma"]**policy.config["n_step"]) * q_tp1_best_masked + ).detach() + + # Compute the TD-error (potentially clipped). + base_td_error = torch.abs(q_t_selected - q_t_selected_target) + if policy.config["twin_q"]: + twin_td_error = torch.abs(twin_q_t_selected - q_t_selected_target) + td_error = 0.5 * (base_td_error + twin_td_error) + else: + td_error = base_td_error + + critic_loss = [ + 0.5 * torch.mean(torch.pow(q_t_selected_target - q_t_selected, 2.0)) + ] + if policy.config["twin_q"]: + critic_loss.append(0.5 * torch.mean( + torch.pow(q_t_selected_target - twin_q_t_selected, 2.0))) + + # Alpha- and actor losses. + # Note: In the papers, alpha is used directly, here we take the log. + # Discrete case: Multiply the action probs as weights with the original + # loss terms (no expectations needed). + if model.discrete: + weighted_log_alpha_loss = policy_t.detach() * ( + -model.log_alpha * (log_pis_t + model.target_entropy).detach()) + # Sum up weighted terms and mean over all batch items. + alpha_loss = torch.mean(torch.sum(weighted_log_alpha_loss, dim=-1)) + # Actor loss. + actor_loss = torch.mean( + torch.sum( + torch.mul( + # NOTE: No stop_grad around policy output here + # (compare with q_t_det_policy for continuous case). + policy_t, + alpha.detach() * log_pis_t - q_t.detach()), + dim=-1)) + else: + alpha_loss = -torch.mean(model.log_alpha * + (log_pis_t + model.target_entropy).detach()) + # Note: Do not detach q_t_det_policy here b/c is depends partly + # on the policy vars (policy sample pushed through Q-net). + # However, we must make sure `actor_loss` is not used to update + # the Q-net(s)' variables. + actor_loss = torch.mean(alpha.detach() * log_pis_t - q_t_det_policy) + + # Save for stats function. + policy.q_t = q_t + policy.policy_t = policy_t + policy.log_pis_t = log_pis_t + policy.td_error = td_error + policy.actor_loss = actor_loss + policy.critic_loss = critic_loss + policy.alpha_loss = alpha_loss + policy.log_alpha_value = model.log_alpha + policy.alpha_value = alpha + policy.target_entropy = model.target_entropy + + # Return all loss terms corresponding to our optimizers. + return tuple([policy.actor_loss] + policy.critic_loss + + [policy.alpha_loss]) + + +def stats(policy, train_batch): + return { + "td_error": policy.td_error, + "mean_td_error": torch.mean(policy.td_error), + "actor_loss": torch.mean(policy.actor_loss), + "critic_loss": torch.mean(torch.stack(policy.critic_loss)), + "alpha_loss": torch.mean(policy.alpha_loss), + "alpha_value": torch.mean(policy.alpha_value), + "log_alpha_value": torch.mean(policy.log_alpha_value), + "target_entropy": policy.target_entropy, + "policy_t": torch.mean(policy.policy_t), + "mean_q": torch.mean(policy.q_t), + "max_q": torch.max(policy.q_t), + "min_q": torch.min(policy.q_t), + } + + +def optimizer_fn(policy, config): + """Creates all necessary optimizers for SAC learning. + + The 3 or 4 (twin_q=True) optimizers returned here correspond to the + number of loss terms returned by the loss function. + """ + policy.actor_optim = torch.optim.Adam( + params=policy.model.policy_variables(), + lr=config["optimization"]["actor_learning_rate"], + eps=1e-7, # to match tf.keras.optimizers.Adam's epsilon default + ) + + critic_split = len(policy.model.q_variables()) + if config["twin_q"]: + critic_split //= 2 + + policy.critic_optims = [ + torch.optim.Adam( + params=policy.model.q_variables()[:critic_split], + lr=config["optimization"]["critic_learning_rate"], + eps=1e-7, # to match tf.keras.optimizers.Adam's epsilon default + ) + ] + if config["twin_q"]: + policy.critic_optims.append( + torch.optim.Adam( + params=policy.model.q_variables()[critic_split:], + lr=config["optimization"]["critic_learning_rate"], + eps=1e-7, # to match tf.keras.optimizers.Adam's eps default + )) + policy.alpha_optim = torch.optim.Adam( + params=[policy.model.log_alpha], + lr=config["optimization"]["entropy_learning_rate"], + eps=1e-7, # to match tf.keras.optimizers.Adam's epsilon default + ) + + return tuple([policy.actor_optim] + policy.critic_optims + + [policy.alpha_optim]) + + +class ComputeTDErrorMixin: + def __init__(self): + def compute_td_error(obs_t, act_t, rew_t, obs_tp1, done_mask, + importance_weights): + input_dict = self._lazy_tensor_dict({ + SampleBatch.CUR_OBS: obs_t, + SampleBatch.ACTIONS: act_t, + SampleBatch.REWARDS: rew_t, + SampleBatch.NEXT_OBS: obs_tp1, + SampleBatch.DONES: done_mask, + PRIO_WEIGHTS: importance_weights, + }) + # Do forward pass on loss to update td errors attribute + # (one TD-error value per item in batch to update PR weights). + actor_critic_loss(self, self.model, None, input_dict) + + # Self.td_error is set within actor_critic_loss call. + return self.td_error + + self.compute_td_error = compute_td_error + + +class TargetNetworkMixin: + def __init__(self): + # Hard initial update from Q-net(s) to target Q-net(s). + self.update_target(tau=1.0) + + def update_target(self, tau=None): + # Update_target_fn will be called periodically to copy Q network to + # target Q network, using (soft) tau-synching. + tau = tau or self.config.get("tau") + model_state_dict = self.model.state_dict() + # Support partial (soft) synching. + # If tau == 1.0: Full sync from Q-model to target Q-model. + if tau != 1.0: + target_state_dict = self.target_model.state_dict() + model_state_dict = { + k: tau * model_state_dict[k] + (1 - tau) * v + for k, v in target_state_dict.items() + } + self.target_model.load_state_dict(model_state_dict) + + +def setup_late_mixins(policy, obs_space, action_space, config): + policy.target_model = policy.target_model.to(policy.device) + policy.model.log_alpha = policy.model.log_alpha.to(policy.device) + policy.model.target_entropy = policy.model.target_entropy.to(policy.device) + ComputeTDErrorMixin.__init__(policy) + TargetNetworkMixin.__init__(policy) + + +SACTorchPolicy = build_torch_policy( + name="SACTorchPolicy", + loss_fn=actor_critic_loss, + get_default_config=lambda: ray.rllib.agents.sac.sac.DEFAULT_CONFIG, + stats_fn=stats, + postprocess_fn=postprocess_trajectory, + extra_grad_process_fn=apply_grad_clipping, + optimizer_fn=optimizer_fn, + after_init=setup_late_mixins, + make_model_and_action_dist=build_sac_model_and_action_dist, + mixins=[TargetNetworkMixin, ComputeTDErrorMixin], + action_distribution_fn=action_distribution_fn, +) diff --git a/rllib/agents/sac/tests/test_sac.py b/rllib/agents/sac/tests/test_sac.py index 1cc814e15..c0f7b7f89 100644 --- a/rllib/agents/sac/tests/test_sac.py +++ b/rllib/agents/sac/tests/test_sac.py @@ -1,33 +1,487 @@ +from gym import Env +from gym.spaces import Box +import numpy as np +import re import unittest -import ray import ray.rllib.agents.sac as sac -from ray.rllib.utils.framework import try_import_tf -from ray.rllib.utils.test_utils import framework_iterator +from ray.rllib.agents.sac.sac_torch_policy import actor_critic_loss as \ + loss_torch +from ray.rllib.models.tf.tf_action_dist import SquashedGaussian +from ray.rllib.models.torch.torch_action_dist import TorchSquashedGaussian +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.utils.framework import try_import_tf, try_import_torch +from ray.rllib.utils.numpy import fc, relu +from ray.rllib.utils.test_utils import check, framework_iterator +from ray.rllib.utils.torch_ops import convert_to_torch_tensor tf = try_import_tf() +torch, _ = try_import_torch() + + +class SimpleEnv(Env): + def __init__(self, config): + self.action_space = Box(0.0, 1.0, (1, )) + self.observation_space = Box(0.0, 1.0, (1, )) + self.max_steps = config.get("max_steps", 100) + self.state = None + self.steps = None + + def reset(self): + self.state = self.observation_space.sample() + self.steps = 0 + return self.state + + def step(self, action): + self.steps += 1 + # Reward is 1.0 - (action - state). + [r] = 1.0 - np.abs(action - self.state) + d = self.steps >= self.max_steps + self.state = self.observation_space.sample() + return self.state, r, d, {} class TestSAC(unittest.TestCase): def test_sac_compilation(self): """Test whether an SACTrainer can be built with all frameworks.""" - ray.init() config = sac.DEFAULT_CONFIG.copy() config["num_workers"] = 0 # Run locally. + config["twin_q"] = True + config["soft_horizon"] = True + config["clip_actions"] = False + config["normalize_actions"] = True + config["learning_starts"] = 0 + config["prioritized_replay"] = True num_iterations = 1 - - # eager (discrete and cont. actions). - for _ in framework_iterator(config, ["tf", "eager"]): + for _ in framework_iterator(config, ("tf", "torch")): + # Test for different env types (discrete w/ and w/o image, + cont). for env in [ - "CartPole-v0", - "Pendulum-v0", + "Pendulum-v0", "MsPacmanNoFrameskip-v4", "CartPole-v0" ]: print("Env={}".format(env)) + config["use_state_preprocessor"] = \ + env == "MsPacmanNoFrameskip-v4" trainer = sac.SACTrainer(config=config, env=env) for i in range(num_iterations): results = trainer.train() print(results) + def test_sac_loss_function(self): + """Tests SAC function results across all frameworks.""" + config = sac.DEFAULT_CONFIG.copy() + # Run locally. + config["num_workers"] = 0 + config["learning_starts"] = 0 + config["twin_q"] = False + config["gamma"] = 0.99 + # Switch on deterministic loss so we can compare the loss values. + config["_deterministic_loss"] = True + # Use very simple nets. + config["Q_model"]["fcnet_hiddens"] = [10] + config["policy_model"]["fcnet_hiddens"] = [10] + # Make sure, timing differences do not affect trainer.train(). + config["min_iter_time_s"] = 0 + + map_ = { + # Normal net. + "default_policy/sequential/action_1/kernel": "action_model." + "action_0._model.0.weight", + "default_policy/sequential/action_1/bias": "action_model." + "action_0._model.0.bias", + "default_policy/sequential/action_out/kernel": "action_model." + "action_out._model.0.weight", + "default_policy/sequential/action_out/bias": "action_model." + "action_out._model.0.bias", + "default_policy/sequential_1/q_hidden_0/kernel": "q_net." + "q_hidden_0._model.0.weight", + "default_policy/sequential_1/q_hidden_0/bias": "q_net." + "q_hidden_0._model.0.bias", + "default_policy/sequential_1/q_out/kernel": "q_net." + "q_out._model.0.weight", + "default_policy/sequential_1/q_out/bias": "q_net." + "q_out._model.0.bias", + "default_policy/value_out/kernel": "_value_branch." + "_model.0.weight", + "default_policy/value_out/bias": "_value_branch." + "_model.0.bias", + # Target net. + "default_policy/sequential_2/action_1/kernel": "action_model." + "action_0._model.0.weight", + "default_policy/sequential_2/action_1/bias": "action_model." + "action_0._model.0.bias", + "default_policy/sequential_2/action_out/kernel": "action_model." + "action_out._model.0.weight", + "default_policy/sequential_2/action_out/bias": "action_model." + "action_out._model.0.bias", + "default_policy/sequential_3/q_hidden_0/kernel": "q_net." + "q_hidden_0._model.0.weight", + "default_policy/sequential_3/q_hidden_0/bias": "q_net." + "q_hidden_0._model.0.bias", + "default_policy/sequential_3/q_out/kernel": "q_net." + "q_out._model.0.weight", + "default_policy/sequential_3/q_out/bias": "q_net." + "q_out._model.0.bias", + "default_policy/value_out_1/kernel": "_value_branch." + "_model.0.weight", + "default_policy/value_out_1/bias": "_value_branch." + "_model.0.bias", + } + + env = SimpleEnv + batch_size = 100 + if env is SimpleEnv: + obs_size = (batch_size, 1) + actions = np.random.random(size=(batch_size, 1)) + elif env == "CartPole-v0": + obs_size = (batch_size, 4) + actions = np.random.randint(0, 2, size=(batch_size, )) + else: + obs_size = (batch_size, 3) + actions = np.random.random(size=(batch_size, 1)) + + # Batch of size=n. + input_ = self._get_batch_helper(obs_size, actions, batch_size) + + # Simply compare loss values AND grads of all frameworks with each + # other. + prev_fw_loss = weights_dict = None + expect_c, expect_a, expect_e, expect_t = None, None, None, None + # History of tf-updated NN-weights over n training steps. + tf_updated_weights = [] + # History of input batches used. + tf_inputs = [] + for fw, sess in framework_iterator( + config, frameworks=("tf", "torch"), session=True): + # Generate Trainer and get its default Policy object. + trainer = sac.SACTrainer(config=config, env=env) + policy = trainer.get_policy() + p_sess = None + if sess: + p_sess = policy.get_session() + + # Set all weights (of all nets) to fixed values. + if weights_dict is None: + assert fw == "tf" # Start with the tf vars-dict. + weights_dict = policy.get_weights() + else: + assert fw == "torch" # Then transfer that to torch Model. + model_dict = self._translate_weights_to_torch( + weights_dict, map_) + policy.model.load_state_dict(model_dict) + policy.target_model.load_state_dict(model_dict) + + if fw == "tf": + log_alpha = weights_dict["default_policy/log_alpha"] + elif fw == "torch": + # Actually convert to torch tensors. + input_ = policy._lazy_tensor_dict(input_) + input_ = {k: input_[k] for k in input_.keys()} + log_alpha = policy.model.log_alpha.detach().numpy()[0] + + # Only run the expectation once, should be the same anyways + # for all frameworks. + if expect_c is None: + expect_c, expect_a, expect_e, expect_t = \ + self._sac_loss_helper(input_, weights_dict, + sorted(weights_dict.keys()), + log_alpha, fw, + gamma=config["gamma"], sess=sess) + + # Get actual outs and compare to expectation AND previous + # framework. c=critic, a=actor, e=entropy, t=td-error. + if fw == "tf": + c, a, e, t, tf_c_grads, tf_a_grads, tf_e_grads = \ + p_sess.run([ + policy.critic_loss, + policy.actor_loss, + policy.alpha_loss, + policy.td_error, + policy.optimizer().compute_gradients( + policy.critic_loss[0], + policy.model.q_variables()), + policy.optimizer().compute_gradients( + policy.actor_loss, + policy.model.policy_variables()), + policy.optimizer().compute_gradients( + policy.alpha_loss, policy.model.log_alpha)], + feed_dict=policy._get_loss_inputs_dict( + input_, shuffle=False)) + tf_c_grads = [g for g, v in tf_c_grads] + tf_a_grads = [g for g, v in tf_a_grads] + tf_e_grads = [g for g, v in tf_e_grads] + + elif fw == "torch": + loss_torch(policy, policy.model, None, input_) + c, a, e, t = policy.critic_loss, policy.actor_loss, \ + policy.alpha_loss, policy.td_error + + # Test actor gradients. + policy.actor_optim.zero_grad() + assert all(v.grad is None for v in policy.model.q_variables()) + assert all( + v.grad is None for v in policy.model.policy_variables()) + assert policy.model.log_alpha.grad is None + a.backward() + # `actor_loss` depends on Q-net vars (but these grads must + # be ignored and overridden in critic_loss.backward!). + assert not any(v.grad is None + for v in policy.model.q_variables()) + assert not all( + torch.mean(v.grad) == 0 + for v in policy.model.policy_variables()) + assert not all( + torch.min(v.grad) == 0 + for v in policy.model.policy_variables()) + assert policy.model.log_alpha.grad is None + # Compare with tf ones. + torch_a_grads = [ + v.grad for v in policy.model.policy_variables() + ] + for tf_g, torch_g in zip(tf_a_grads, torch_a_grads): + if tf_g.shape != torch_g.shape: + check(tf_g, np.transpose(torch_g)) + else: + check(tf_g, torch_g) + + # Test critic gradients. + policy.critic_optims[0].zero_grad() + assert all( + torch.mean(v.grad) == 0.0 + for v in policy.model.q_variables()) + assert all( + torch.min(v.grad) == 0.0 + for v in policy.model.q_variables()) + assert policy.model.log_alpha.grad is None + c[0].backward() + assert not all( + torch.mean(v.grad) == 0 + for v in policy.model.q_variables()) + assert not all( + torch.min(v.grad) == 0 for v in policy.model.q_variables()) + assert policy.model.log_alpha.grad is None + # Compare with tf ones. + torch_c_grads = [v.grad for v in policy.model.q_variables()] + for tf_g, torch_g in zip(tf_c_grads, torch_c_grads): + if tf_g.shape != torch_g.shape: + check(tf_g, np.transpose(torch_g)) + else: + check(tf_g, torch_g) + # Compare (unchanged(!) actor grads) with tf ones. + torch_a_grads = [ + v.grad for v in policy.model.policy_variables() + ] + for tf_g, torch_g in zip(tf_a_grads, torch_a_grads): + if tf_g.shape != torch_g.shape: + check(tf_g, np.transpose(torch_g)) + else: + check(tf_g, torch_g) + + # Test alpha gradient. + policy.alpha_optim.zero_grad() + assert policy.model.log_alpha.grad is None + e.backward() + assert policy.model.log_alpha.grad is not None + check(policy.model.log_alpha.grad, tf_e_grads) + + check(c, expect_c) + check(a, expect_a) + check(e, expect_e) + check(t, expect_t) + + # Store this framework's losses in prev_fw_loss to compare with + # next framework's outputs. + if prev_fw_loss is not None: + check(c, prev_fw_loss[0]) + check(a, prev_fw_loss[1]) + check(e, prev_fw_loss[2]) + check(t, prev_fw_loss[3]) + + prev_fw_loss = (c, a, e, t) + + # Update weights from our batch (n times). + for update_iteration in range(10): + print("train iteration {}".format(update_iteration)) + if fw == "tf": + in_ = self._get_batch_helper(obs_size, actions, batch_size) + tf_inputs.append(in_) + # Set a fake-batch to use + # (instead of sampling from replay buffer). + trainer.optimizer._fake_batch = in_ + trainer.train() + updated_weights = policy.get_weights() + # Net must have changed. + if tf_updated_weights: + check( + updated_weights[ + "default_policy/sequential/action_1/kernel"], + tf_updated_weights[-1][ + "default_policy/sequential/action_1/kernel"], + false=True) + tf_updated_weights.append(updated_weights) + + # Compare with updated tf-weights. Must all be the same. + else: + tf_weights = tf_updated_weights[update_iteration] + in_ = tf_inputs[update_iteration] + # Set a fake-batch to use + # (instead of sampling from replay buffer). + trainer.optimizer._fake_batch = in_ + trainer.train() + # Compare updated model. + for tf_key in sorted(tf_weights.keys())[2:10]: + tf_var = tf_weights[tf_key] + torch_var = policy.model.state_dict()[map_[tf_key]] + if tf_var.shape != torch_var.shape: + check(tf_var, np.transpose(torch_var), rtol=0.01) + else: + check(tf_var, torch_var, rtol=0.01) + # And alpha. + check(policy.model.log_alpha, + tf_weights["default_policy/log_alpha"]) + # Compare target nets. + for tf_key in sorted(tf_weights.keys())[10:18]: + tf_var = tf_weights[tf_key] + torch_var = policy.target_model.state_dict()[map_[ + tf_key]] + if tf_var.shape != torch_var.shape: + check(tf_var, np.transpose(torch_var), rtol=0.01) + else: + check(tf_var, torch_var, rtol=0.01) + + def _get_batch_helper(self, obs_size, actions, batch_size): + return { + SampleBatch.CUR_OBS: np.random.random(size=obs_size), + SampleBatch.ACTIONS: actions, + SampleBatch.REWARDS: np.random.random(size=(batch_size, )), + SampleBatch.DONES: np.random.choice( + [True, False], size=(batch_size, )), + SampleBatch.NEXT_OBS: np.random.random(size=obs_size) + } + + def _sac_loss_helper(self, train_batch, weights, ks, log_alpha, fw, gamma, + sess): + """Emulates SAC loss functions for tf and torch.""" + # ks: + # 0=log_alpha + # 1=target log-alpha (not used) + + # 2=action hidden bias + # 3=action hidden kernel + # 4=action out bias + # 5=action out kernel + + # 6=Q hidden bias + # 7=Q hidden kernel + # 8=Q out bias + # 9=Q out kernel + + # 14=target Q hidden bias + # 15=target Q hidden kernel + # 16=target Q out bias + # 17=target Q out kernel + alpha = np.exp(log_alpha) + cls = TorchSquashedGaussian if fw == "torch" else SquashedGaussian + model_out_t = train_batch[SampleBatch.CUR_OBS] + model_out_tp1 = train_batch[SampleBatch.NEXT_OBS] + target_model_out_tp1 = train_batch[SampleBatch.NEXT_OBS] + + # get_policy_output + action_dist_t = cls( + fc( + relu( + fc(model_out_t, + weights[ks[3]], + weights[ks[2]], + framework=fw)), weights[ks[5]], weights[ks[4]]), None) + policy_t = action_dist_t.deterministic_sample() + log_pis_t = action_dist_t.logp(policy_t) + if sess: + log_pis_t = sess.run(log_pis_t) + policy_t = sess.run(policy_t) + log_pis_t = np.expand_dims(log_pis_t, -1) + + # Get policy output for t+1. + action_dist_tp1 = cls( + fc( + relu( + fc(model_out_tp1, + weights[ks[3]], + weights[ks[2]], + framework=fw)), weights[ks[5]], weights[ks[4]]), None) + policy_tp1 = action_dist_tp1.deterministic_sample() + log_pis_tp1 = action_dist_tp1.logp(policy_tp1) + if sess: + log_pis_tp1 = sess.run(log_pis_tp1) + policy_tp1 = sess.run(policy_tp1) + log_pis_tp1 = np.expand_dims(log_pis_tp1, -1) + + # Q-values for the actually selected actions. + # get_q_values + q_t = fc( + relu( + fc(np.concatenate( + [model_out_t, train_batch[SampleBatch.ACTIONS]], -1), + weights[ks[7]], + weights[ks[6]], + framework=fw)), + weights[ks[9]], + weights[ks[8]], + framework=fw) + + # Q-values for current policy in given current state. + # get_q_values + q_t_det_policy = fc( + relu( + fc(np.concatenate([model_out_t, policy_t], -1), + weights[ks[7]], + weights[ks[6]], + framework=fw)), + weights[ks[9]], + weights[ks[8]], + framework=fw) + + # Target q network evaluation. + # target_model.get_q_values + q_tp1 = fc( + relu( + fc(np.concatenate([target_model_out_tp1, policy_tp1], -1), + weights[ks[15]], + weights[ks[14]], + framework=fw)), + weights[ks[17]], + weights[ks[16]], + framework=fw) + + q_t_selected = np.squeeze(q_t, axis=-1) + q_tp1 -= alpha * log_pis_tp1 + q_tp1_best = np.squeeze(q_tp1, axis=-1) + dones = train_batch[SampleBatch.DONES] + rewards = train_batch[SampleBatch.REWARDS] + if fw == "torch": + dones = dones.float().numpy() + rewards = rewards.numpy() + q_tp1_best_masked = (1.0 - dones) * q_tp1_best + q_t_selected_target = rewards + gamma * q_tp1_best_masked + base_td_error = np.abs(q_t_selected - q_t_selected_target) + td_error = base_td_error + critic_loss = [ + 0.5 * np.mean(np.power(q_t_selected_target - q_t_selected, 2.0)) + ] + target_entropy = -np.prod((1, )) + alpha_loss = -np.mean(log_alpha * (log_pis_t + target_entropy)) + actor_loss = np.mean(alpha * log_pis_t - q_t_det_policy) + + return critic_loss, actor_loss, alpha_loss, td_error + + def _translate_weights_to_torch(self, weights_dict, map_): + model_dict = { + map_[k]: convert_to_torch_tensor( + np.transpose(v) if re.search("kernel", k) else v) + for k, v in weights_dict.items() + if re.search("(sequential(/|_1)|value_out/)", k) + } + return model_dict + if __name__ == "__main__": import pytest diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 5f47f3c8a..072b4a072 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -678,13 +678,6 @@ class Trainer(Trainable): Note that this default implementation does not do anything beyond merging evaluation_config with the normal trainer config. """ - if not self.config["evaluation_config"]: - raise ValueError( - "No evaluation_config specified. It doesn't make sense " - "to enable evaluation without specifying any config " - "overrides, since the results will be the " - "same as reported during normal policy evaluation.") - self._before_evaluate() # Broadcast the new policy weights to all evaluation workers. diff --git a/rllib/agents/trainer_template.py b/rllib/agents/trainer_template.py index c2714e3c8..b2601d76b 100644 --- a/rllib/agents/trainer_template.py +++ b/rllib/agents/trainer_template.py @@ -146,8 +146,10 @@ def build_trainer(name, prev_steps = self.optimizer.num_steps_sampled start = time.time() + optimizer_steps_this_iter = 0 while True: fetches = self.optimizer.step() + optimizer_steps_this_iter += 1 if after_optimizer_step: after_optimizer_step(self, fetches) if (time.time() - start >= self.config["min_iter_time_s"] @@ -160,6 +162,7 @@ def build_trainer(name, else: res = self.collect_metrics() res.update( + optimizer_steps_this_iter=optimizer_steps_this_iter, timesteps_this_iter=self.optimizer.num_steps_sampled - prev_steps, info=res.get("info", {})) diff --git a/rllib/contrib/alpha_zero/core/alpha_zero_policy.py b/rllib/contrib/alpha_zero/core/alpha_zero_policy.py index 46f24c92e..3fab613af 100644 --- a/rllib/contrib/alpha_zero/core/alpha_zero_policy.py +++ b/rllib/contrib/alpha_zero/core/alpha_zero_policy.py @@ -116,11 +116,12 @@ class AlphaZeroPolicy(TorchPolicy): loss_out, policy_loss, value_loss = self._loss( self, self.model, self.dist_class, train_batch) - self._optimizer.zero_grad() + self._optimizers[0].zero_grad() loss_out.backward() - grad_process_info = self.extra_grad_process() - self._optimizer.step() + grad_process_info = self.extra_grad_process(self._optimizers[0], + loss_out) + self._optimizers[0].step() grad_info = self.extra_grad_info(train_batch) grad_info.update(grad_process_info) diff --git a/rllib/env/vector_env.py b/rllib/env/vector_env.py index 4528c9922..db2292128 100644 --- a/rllib/env/vector_env.py +++ b/rllib/env/vector_env.py @@ -106,8 +106,8 @@ class _VectorizedGymEnv(VectorEnv): obs, r, done, info = self.envs[i].step(actions[i]) if not np.isscalar(r) or not np.isreal(r) or not np.isfinite(r): raise ValueError( - "Reward should be finite scalar, got {} ({})".format( - r, type(r))) + "Reward should be finite scalar, got {} ({}). " + "Actions={}.".format(r, type(r), actions[i])) if type(info) is not dict: raise ValueError("Info should be a dict, got {} ({})".format( info, type(info))) diff --git a/rllib/models/catalog.py b/rllib/models/catalog.py index 65fa238d6..9c843bff6 100644 --- a/rllib/models/catalog.py +++ b/rllib/models/catalog.py @@ -10,14 +10,12 @@ from ray.rllib.models.action_dist import ActionDistribution from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.models.preprocessors import get_preprocessor from ray.rllib.models.tf.fcnet_v1 import FullyConnectedNetwork -from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork as FCNetV2 from ray.rllib.models.tf.lstm_v1 import LSTM from ray.rllib.models.tf.modelv1_compat import make_v1_wrapper from ray.rllib.models.tf.tf_action_dist import Categorical, MultiCategorical, \ Deterministic, DiagGaussian, MultiActionDistribution, Dirichlet from ray.rllib.models.tf.tf_modelv2 import TFModelV2 from ray.rllib.models.tf.visionnet_v1 import VisionNetwork -from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as VisionNetV2 from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 from ray.rllib.models.torch.torch_action_dist import TorchCategorical, \ TorchMultiCategorical, TorchDiagGaussian @@ -331,8 +329,8 @@ class ModelCatalog: v2_class = None # try to get a default v2 model if not model_config.get("custom_model"): - v2_class = default_model or ModelCatalog._get_v2_model( - obs_space, model_config) + v2_class = default_model or ModelCatalog._get_v2_model_class( + obs_space, model_config, framework=framework) # fallback to a default v1 model if v2_class is None: if tf.executing_eagerly(): @@ -347,12 +345,10 @@ class ModelCatalog: return wrapper(obs_space, action_space, num_outputs, model_config, name, **model_kwargs) elif framework == "torch": - if default_model: - return default_model(obs_space, action_space, num_outputs, - model_config, name) - v2_class = ModelCatalog._get_default_torch_model_class_v2( - obs_space, action_space, num_outputs, model_config, name) - # wrap in the requested interface + v2_class = \ + default_model or ModelCatalog._get_v2_model_class( + obs_space, model_config, framework=framework) + # Wrap in the requested interface. wrapper = ModelCatalog._wrap_if_needed(v2_class, model_interface) return wrapper(obs_space, action_space, num_outputs, model_config, name, **model_kwargs) @@ -469,24 +465,6 @@ class ModelCatalog: return wrapper - @staticmethod - def _get_default_torch_model_class_v2(obs_space, action_space, num_outputs, - model_config, name): - from ray.rllib.models.torch.fcnet import (FullyConnectedNetwork as - PyTorchFCNet) - from ray.rllib.models.torch.visionnet import (VisionNetwork as - PyTorchVisionNet) - model_config = model_config or MODEL_DEFAULTS - if model_config.get("use_lstm"): - raise NotImplementedError( - "LSTM auto-wrapping not implemented for torch") - - if isinstance(obs_space, gym.spaces.Discrete) or \ - len(obs_space.shape) <= 2: - return PyTorchFCNet - else: - return PyTorchVisionNet - @staticmethod def get_model(input_dict, obs_space, @@ -544,17 +522,29 @@ class ModelCatalog: num_outputs, options) @staticmethod - def _get_v2_model(obs_space, options): - options = options or MODEL_DEFAULTS - obs_rank = len(obs_space.shape) + def _get_v2_model_class(obs_space, model_config, framework="tf"): + model_config = model_config or MODEL_DEFAULTS + if framework == "torch": + from ray.rllib.models.torch.fcnet import (FullyConnectedNetwork as + FCNet) + from ray.rllib.models.torch.visionnet import (VisionNetwork as + VisionNet) + if model_config.get("use_lstm"): + raise NotImplementedError( + "LSTM auto-wrapping not implemented for torch") + else: + from ray.rllib.models.tf.fcnet_v2 import \ + FullyConnectedNetwork as FCNet + from ray.rllib.models.tf.visionnet_v2 import \ + VisionNetwork as VisionNet - if options.get("use_lstm"): - return None # TODO: default LSTM v2 not implemented - - if obs_rank > 2: - return VisionNetV2 - - return FCNetV2 + # Discrete/1D obs-spaces. + if isinstance(obs_space, gym.spaces.Discrete) or \ + len(obs_space.shape) <= 2: + return FCNet + # Default Conv2D net. + else: + return VisionNet @staticmethod def get_torch_model(obs_space, diff --git a/rllib/models/tests/test_distributions.py b/rllib/models/tests/test_distributions.py index d23123371..ebd3525ac 100644 --- a/rllib/models/tests/test_distributions.py +++ b/rllib/models/tests/test_distributions.py @@ -1,13 +1,15 @@ import numpy as np from gym.spaces import Box -from scipy.stats import norm +from scipy.stats import norm, beta import unittest from ray.rllib.models.tf.tf_action_dist import Categorical, MultiCategorical, \ SquashedGaussian, GumbelSoftmax -from ray.rllib.models.torch.torch_action_dist import TorchMultiCategorical +from ray.rllib.models.torch.torch_action_dist import TorchMultiCategorical, \ + TorchSquashedGaussian, TorchBeta from ray.rllib.utils import try_import_tf, try_import_torch -from ray.rllib.utils.numpy import MIN_LOG_NN_OUTPUT, MAX_LOG_NN_OUTPUT, softmax +from ray.rllib.utils.numpy import MIN_LOG_NN_OUTPUT, MAX_LOG_NN_OUTPUT, \ + softmax, SMALL_NUMBER from ray.rllib.utils.test_utils import check, framework_iterator tf = try_import_tf() @@ -97,17 +99,17 @@ class TestDistributions(unittest.TestCase): check(out, expected_entropy) def test_squashed_gaussian(self): - """Tests the SquashedGaussia ActionDistribution (tf-eager only).""" - for fw, sess in framework_iterator( - frameworks=["tf", "eager"], session=True): - input_space = Box(-1.0, 1.0, shape=(200, 10)) - low, high = -2.0, 1.0 + """Tests the SquashedGaussian ActionDistribution for all frameworks.""" + input_space = Box(-2.0, 2.0, shape=(200, 10)) + low, high = -2.0, 1.0 + + for fw, sess in framework_iterator(session=True): + cls = SquashedGaussian if fw != "torch" else TorchSquashedGaussian # Batch of size=n and deterministic. inputs = input_space.sample() means, _ = np.split(inputs, 2, axis=-1) - squashed_distribution = SquashedGaussian( - inputs, {}, low=low, high=high) + squashed_distribution = cls(inputs, {}, low=low, high=high) expected = ((np.tanh(means) + 1.0) / 2.0) * (high - low) + low # Sample n times, expect always mean value (deterministic draw). out = squashed_distribution.deterministic_sample() @@ -116,41 +118,50 @@ class TestDistributions(unittest.TestCase): # Batch of size=n and non-deterministic -> expect roughly the mean. inputs = input_space.sample() means, log_stds = np.split(inputs, 2, axis=-1) - squashed_distribution = SquashedGaussian( - inputs, {}, low=low, high=high) + squashed_distribution = cls(inputs, {}, low=low, high=high) expected = ((np.tanh(means) + 1.0) / 2.0) * (high - low) + low values = squashed_distribution.sample() if sess: values = sess.run(values) + else: + values = values.numpy() self.assertTrue(np.max(values) < high) self.assertTrue(np.min(values) > low) check(np.mean(values), expected.mean(), decimals=1) # Test log-likelihood outputs. - sampled_action_logp = squashed_distribution.logp(values) + sampled_action_logp = squashed_distribution.logp( + values if fw != "torch" else torch.Tensor(values)) if sess: sampled_action_logp = sess.run(sampled_action_logp) + else: + sampled_action_logp = sampled_action_logp.numpy() # Convert to parameters for distr. stds = np.exp( np.clip(log_stds, MIN_LOG_NN_OUTPUT, MAX_LOG_NN_OUTPUT)) # Unsquash values, then get log-llh from regular gaussian. - unsquashed_values = np.arctanh((values - low) / - (high - low) * 2.0 - 1.0) - log_prob_unsquashed = \ - np.sum(np.log(norm.pdf(unsquashed_values, means, stds)), -1) + # atanh_in = np.clip((values - low) / (high - low) * 2.0 - 1.0, + # -1.0 + SMALL_NUMBER, 1.0 - SMALL_NUMBER) + atanh_in = (values - low) / (high - low) * 2.0 - 1.0 + unsquashed_values = np.arctanh(atanh_in) + log_prob_unsquashed = np.sum( + np.log( + norm.pdf(unsquashed_values, means, stds) + SMALL_NUMBER), + -1) log_prob = log_prob_unsquashed - \ np.sum(np.log(1 - np.tanh(unsquashed_values) ** 2), axis=-1) - check(np.mean(sampled_action_logp), np.mean(log_prob), rtol=0.01) + check(np.sum(sampled_action_logp), np.sum(log_prob), rtol=0.05) # NN output. means = np.array([[0.1, 0.2, 0.3, 0.4, 50.0], [-0.1, -0.2, -0.3, -0.4, -1.0]]) log_stds = np.array([[0.8, -0.2, 0.3, -1.0, 2.0], [0.7, -0.3, 0.4, -0.9, 2.0]]) - squashed_distribution = SquashedGaussian( - np.concatenate([means, log_stds], axis=-1), {}, + squashed_distribution = cls( + inputs=np.concatenate([means, log_stds], axis=-1), + model={}, low=low, high=high) # Convert to parameters for distr. @@ -168,10 +179,57 @@ class TestDistributions(unittest.TestCase): np.sum(np.log(1 - np.tanh(unsquashed_values) ** 2), axis=-1) - outs = squashed_distribution.logp(values) + outs = squashed_distribution.logp(values if fw != "torch" else + torch.Tensor(values)) if sess: outs = sess.run(outs) - check(outs, log_prob) + check(outs, log_prob, decimals=4) + + def test_beta(self): + input_space = Box(-2.0, 1.0, shape=(200, 10)) + low, high = -1.0, 2.0 + plain_beta_value_space = Box(0.0, 1.0, shape=(200, 5)) + + for fw, sess in framework_iterator(frameworks="torch", session=True): + cls = TorchBeta + inputs = input_space.sample() + beta_distribution = cls(inputs, {}, low=low, high=high) + + inputs = beta_distribution.inputs + alpha, beta_ = np.split(inputs.numpy(), 2, axis=-1) + + # Mean for a Beta distribution: 1 / [1 + (beta/alpha)] + expected = (1.0 / (1.0 + beta_ / alpha)) * (high - low) + low + # Sample n times, expect always mean value (deterministic draw). + out = beta_distribution.deterministic_sample() + check(out, expected, rtol=0.01) + + # Batch of size=n and non-deterministic -> expect roughly the mean. + values = beta_distribution.sample() + if sess: + values = sess.run(values) + else: + values = values.numpy() + self.assertTrue(np.max(values) <= high) + self.assertTrue(np.min(values) >= low) + + check(np.mean(values), expected.mean(), decimals=1) + + # Test log-likelihood outputs (against scipy). + inputs = input_space.sample() + beta_distribution = cls(inputs, {}, low=low, high=high) + inputs = beta_distribution.inputs + alpha, beta_ = np.split(inputs.numpy(), 2, axis=-1) + + values = plain_beta_value_space.sample() + values_scaled = values * (high - low) + low + out = beta_distribution.logp(torch.Tensor(values_scaled)) + check( + out, + np.sum(np.log(beta.pdf(values, alpha, beta_)), -1), + rtol=0.001) + + # TODO(sven): Test entropy outputs (against scipy). def test_gumbel_softmax(self): """Tests the GumbelSoftmax ActionDistribution (tf-eager only).""" diff --git a/rllib/models/tf/fcnet_v1.py b/rllib/models/tf/fcnet_v1.py index d2d182b36..c117bcf45 100644 --- a/rllib/models/tf/fcnet_v1.py +++ b/rllib/models/tf/fcnet_v1.py @@ -1,7 +1,7 @@ from ray.rllib.models.model import Model -from ray.rllib.models.tf.misc import normc_initializer, get_activation_fn +from ray.rllib.models.tf.misc import normc_initializer from ray.rllib.utils.annotations import override -from ray.rllib.utils import try_import_tf +from ray.rllib.utils.framework import get_activation_fn, try_import_tf tf = try_import_tf() diff --git a/rllib/models/tf/fcnet_v2.py b/rllib/models/tf/fcnet_v2.py index 5f69b3b1b..5a1e2dd7b 100644 --- a/rllib/models/tf/fcnet_v2.py +++ b/rllib/models/tf/fcnet_v2.py @@ -1,8 +1,8 @@ import numpy as np +from ray.rllib.models.tf.misc import normc_initializer from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.models.tf.misc import normc_initializer, get_activation_fn -from ray.rllib.utils import try_import_tf +from ray.rllib.utils.framework import get_activation_fn, try_import_tf tf = try_import_tf() @@ -23,37 +23,46 @@ class FullyConnectedNetwork(TFModelV2): # we are using obs_flat, so take the flattened shape as input inputs = tf.keras.layers.Input( shape=(np.product(obs_space.shape), ), name="observations") - last_layer = inputs + last_layer = layer_out = inputs i = 1 - if no_final_linear: - # the last layer is adjusted to be of size num_outputs - for size in hiddens[:-1]: - last_layer = tf.keras.layers.Dense( - size, - name="fc_{}".format(i), - activation=activation, - kernel_initializer=normc_initializer(1.0))(last_layer) - i += 1 + # Create layers 0 to second-last. + for size in hiddens[:-1]: + last_layer = tf.keras.layers.Dense( + size, + name="fc_{}".format(i), + activation=activation, + kernel_initializer=normc_initializer(1.0))(last_layer) + i += 1 + + # The last layer is adjusted to be of size num_outputs, but it's a + # layer with activation. + if no_final_linear and self.num_outputs: layer_out = tf.keras.layers.Dense( - num_outputs, + self.num_outputs, name="fc_out", activation=activation, kernel_initializer=normc_initializer(1.0))(last_layer) + # Finish the layers with the provided sizes (`hiddens`), plus - + # iff num_outputs > 0 - a last linear layer of size num_outputs. else: - # the last layer is a linear to size num_outputs - for size in hiddens: + if len(hiddens) > 0: last_layer = tf.keras.layers.Dense( - size, + hiddens[-1], name="fc_{}".format(i), activation=activation, kernel_initializer=normc_initializer(1.0))(last_layer) - i += 1 - layer_out = tf.keras.layers.Dense( - num_outputs, - name="fc_out", - activation=None, - kernel_initializer=normc_initializer(0.01))(last_layer) + if self.num_outputs: + layer_out = tf.keras.layers.Dense( + self.num_outputs, + name="fc_out", + activation=None, + kernel_initializer=normc_initializer(0.01))(last_layer) + # Adjust self.num_outputs to be the number of nodes in the last + # layer. + else: + self.num_outputs = ( + [np.product(obs_space.shape)] + hiddens[-1:-1])[-1] if not vf_share_layers: # build a parallel set of hidden layers for the value net diff --git a/rllib/models/tf/misc.py b/rllib/models/tf/misc.py index 8416f1c15..bddbdeba8 100644 --- a/rllib/models/tf/misc.py +++ b/rllib/models/tf/misc.py @@ -13,12 +13,6 @@ def normc_initializer(std=1.0): return _initializer -def get_activation_fn(name): - if name == "linear": - return None - return getattr(tf.nn, name) - - def conv2d(x, num_filters, name, diff --git a/rllib/models/tf/tf_action_dist.py b/rllib/models/tf/tf_action_dist.py index 892ecf029..16288aecc 100644 --- a/rllib/models/tf/tf_action_dist.py +++ b/rllib/models/tf/tf_action_dist.py @@ -19,6 +19,7 @@ class TFActionDistribution(ActionDistribution): def __init__(self, inputs, model): super().__init__(inputs, model) self.sample_op = self._build_sample_op() + self.sampled_action_logp_op = self.logp(self.sample_op) @DeveloperAPI def _build_sample_op(self): @@ -37,7 +38,7 @@ class TFActionDistribution(ActionDistribution): @override(ActionDistribution) def sampled_action_logp(self): """Returns the log probability of the sampled action.""" - return self.logp(self.sample_op) + return self.sampled_action_logp_op class Categorical(TFActionDistribution): @@ -99,6 +100,7 @@ class MultiCategorical(TFActionDistribution): for input_ in tf.split(inputs, input_lens, axis=1) ] self.sample_op = self._build_sample_op() + self.sampled_action_logp_op = self.logp(self.sample_op) @override(ActionDistribution) def deterministic_sample(self): @@ -270,28 +272,17 @@ class SquashedGaussian(TFActionDistribution): (excluding this value). """ assert tfp is not None - loc, log_scale = tf.split(inputs, 2, axis=-1) + mean, log_std = tf.split(inputs, 2, axis=-1) # Clip `scale` values (coming from NN) to reasonable values. - log_scale = tf.clip_by_value(log_scale, MIN_LOG_NN_OUTPUT, - MAX_LOG_NN_OUTPUT) - scale = tf.exp(log_scale) - self.distr = tfp.distributions.Normal(loc=loc, scale=scale) + log_std = tf.clip_by_value(log_std, MIN_LOG_NN_OUTPUT, + MAX_LOG_NN_OUTPUT) + std = tf.exp(log_std) + self.distr = tfp.distributions.Normal(loc=mean, scale=std) assert np.all(np.less(low, high)) self.low = low self.high = high super().__init__(inputs, model) - @override(TFActionDistribution) - def sampled_action_logp(self): - unsquashed_values = self._unsquash(self.sample_op) - log_prob = tf.reduce_sum( - self.distr.log_prob(unsquashed_values), axis=-1) - unsquashed_values_tanhd = tf.math.tanh(unsquashed_values) - log_prob -= tf.math.reduce_sum( - tf.math.log(1 - unsquashed_values_tanhd**2 + SMALL_NUMBER), - axis=-1) - return log_prob - @override(ActionDistribution) def deterministic_sample(self): mean = self.distr.mean() diff --git a/rllib/models/tf/visionnet_v1.py b/rllib/models/tf/visionnet_v1.py index 35e5a8b14..73c848183 100644 --- a/rllib/models/tf/visionnet_v1.py +++ b/rllib/models/tf/visionnet_v1.py @@ -1,7 +1,7 @@ from ray.rllib.models.model import Model -from ray.rllib.models.tf.misc import get_activation_fn, flatten +from ray.rllib.models.tf.misc import flatten from ray.rllib.utils.annotations import override -from ray.rllib.utils import try_import_tf +from ray.rllib.utils.framework import get_activation_fn, try_import_tf tf = try_import_tf() diff --git a/rllib/models/tf/visionnet_v2.py b/rllib/models/tf/visionnet_v2.py index 729cab291..4887d04b7 100644 --- a/rllib/models/tf/visionnet_v2.py +++ b/rllib/models/tf/visionnet_v2.py @@ -1,7 +1,7 @@ from ray.rllib.models.tf.tf_modelv2 import TFModelV2 from ray.rllib.models.tf.visionnet_v1 import _get_filter_config -from ray.rllib.models.tf.misc import normc_initializer, get_activation_fn -from ray.rllib.utils import try_import_tf +from ray.rllib.models.tf.misc import normc_initializer +from ray.rllib.utils.framework import get_activation_fn, try_import_tf tf = try_import_tf() @@ -33,18 +33,23 @@ class VisionNetwork(TFModelV2): strides=(stride, stride), activation=activation, padding="same", + data_format="channels_last", name="conv{}".format(i))(last_layer) out_size, kernel, stride = filters[-1] + + # No final linear: Last layer is a Conv2D and uses num_outputs. if no_final_linear: - # the last layer is adjusted to be of size num_outputs last_layer = tf.keras.layers.Conv2D( num_outputs, kernel, strides=(stride, stride), activation=activation, padding="valid", + data_format="channels_last", name="conv_out")(last_layer) conv_out = last_layer + # Finish network normally (w/o overriding last layer size with + # `num_outputs`), then add another linear one of size `num_outputs`. else: last_layer = tf.keras.layers.Conv2D( out_size, @@ -52,11 +57,13 @@ class VisionNetwork(TFModelV2): strides=(stride, stride), activation=activation, padding="valid", + data_format="channels_last", name="conv{}".format(i + 1))(last_layer) conv_out = tf.keras.layers.Conv2D( num_outputs, [1, 1], activation=None, padding="same", + data_format="channels_last", name="conv_out")(last_layer) # Build the value layers @@ -78,6 +85,7 @@ class VisionNetwork(TFModelV2): strides=(stride, stride), activation=activation, padding="same", + data_format="channels_last", name="conv_value_{}".format(i))(last_layer) out_size, kernel, stride = filters[-1] last_layer = tf.keras.layers.Conv2D( @@ -86,11 +94,13 @@ class VisionNetwork(TFModelV2): strides=(stride, stride), activation=activation, padding="valid", + data_format="channels_last", name="conv_value_{}".format(i + 1))(last_layer) last_layer = tf.keras.layers.Conv2D( 1, [1, 1], activation=None, padding="same", + data_format="channels_last", name="conv_value_out")(last_layer) value_out = tf.keras.layers.Lambda( lambda x: tf.squeeze(x, axis=[1, 2]))(last_layer) diff --git a/rllib/models/torch/fcnet.py b/rllib/models/torch/fcnet.py index cea59c384..dccfdf212 100644 --- a/rllib/models/torch/fcnet.py +++ b/rllib/models/torch/fcnet.py @@ -2,9 +2,9 @@ import logging import numpy as np from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 -from ray.rllib.models.torch.misc import normc_initializer, SlimFC, \ - _get_activation_fn +from ray.rllib.models.torch.misc import SlimFC, normc_initializer from ray.rllib.utils.annotations import override +from ray.rllib.utils.framework import get_activation_fn from ray.rllib.utils import try_import_torch _, nn = try_import_torch() @@ -21,39 +21,76 @@ class FullyConnectedNetwork(TorchModelV2, nn.Module): model_config, name) nn.Module.__init__(self) + activation = get_activation_fn( + model_config.get("fcnet_activation"), framework="torch") hiddens = model_config.get("fcnet_hiddens") - activation = _get_activation_fn(model_config.get("fcnet_activation")) + no_final_linear = model_config.get("no_final_linear") + + # TODO(sven): implement case: vf_shared_layers = False. + # vf_share_layers = model_config.get("vf_share_layers") + logger.debug("Constructing fcnet {} {}".format(hiddens, activation)) layers = [] - last_layer_size = np.product(obs_space.shape) - for size in hiddens: + prev_layer_size = np.product(obs_space.shape) + self._logits = None + + # Create layers 0 to second-last. + for size in hiddens[:-1]: layers.append( SlimFC( - in_size=last_layer_size, + in_size=prev_layer_size, out_size=size, initializer=normc_initializer(1.0), activation_fn=activation)) - last_layer_size = size + prev_layer_size = size + + # The last layer is adjusted to be of size num_outputs, but it's a + # layer with activation. + if no_final_linear and self.num_outputs: + layers.append( + SlimFC( + in_size=prev_layer_size, + out_size=self.num_outputs, + initializer=normc_initializer(1.0), + activation_fn=activation)) + prev_layer_size = self.num_outputs + # Finish the layers with the provided sizes (`hiddens`), plus - + # iff num_outputs > 0 - a last linear layer of size num_outputs. + else: + if len(hiddens) > 0: + layers.append( + SlimFC( + in_size=prev_layer_size, + out_size=hiddens[-1], + initializer=normc_initializer(1.0), + activation_fn=activation)) + prev_layer_size = hiddens[-1] + if self.num_outputs: + self._logits = SlimFC( + in_size=hiddens[-1], + out_size=self.num_outputs, + initializer=normc_initializer(0.01), + activation_fn=None) + else: + self.num_outputs = ( + [np.product(obs_space.shape)] + hiddens[-1:-1])[-1] self._hidden_layers = nn.Sequential(*layers) - self._logits = SlimFC( - in_size=last_layer_size, - out_size=num_outputs, - initializer=normc_initializer(0.01), - activation_fn=None) + # TODO(sven): Implement non-shared value branch. self._value_branch = SlimFC( - in_size=last_layer_size, + in_size=prev_layer_size, out_size=1, initializer=normc_initializer(1.0), activation_fn=None) + # Holds the current value output. self._cur_value = None @override(TorchModelV2) def forward(self, input_dict, state, seq_lens): obs = input_dict["obs_flat"] features = self._hidden_layers(obs.reshape(obs.shape[0], -1)) - logits = self._logits(features) + logits = self._logits(features) if self._logits else features self._cur_value = self._value_branch(features).squeeze(1) return logits, state diff --git a/rllib/models/torch/misc.py b/rllib/models/torch/misc.py index edec3a148..d86368a80 100644 --- a/rllib/models/torch/misc.py +++ b/rllib/models/torch/misc.py @@ -48,40 +48,35 @@ def valid_padding(in_size, filter_size, stride_size): return padding, output -def _get_activation_fn(name): - if name == "tanh": - return nn.Tanh - elif name == "relu": - return nn.ReLU - elif name == "linear": - return None - else: - raise ValueError("Unknown activation: {}".format(name)) - - class SlimConv2d(nn.Module): """Simple mock of tf.slim Conv2d""" - def __init__(self, - in_channels, - out_channels, - kernel, - stride, - padding, - initializer=nn.init.xavier_uniform_, - activation_fn=nn.ReLU, - bias_init=0): + def __init__( + self, + in_channels, + out_channels, + kernel, + stride, + padding, + # Defaulting these to nn.[..] will break soft torch import. + initializer="default", + activation_fn="default", + bias_init=0): super(SlimConv2d, self).__init__() layers = [] if padding: layers.append(nn.ZeroPad2d(padding)) conv = nn.Conv2d(in_channels, out_channels, kernel, stride) if initializer: + if initializer == "default": + initializer = nn.init.xavier_uniform_ initializer(conv.weight) nn.init.constant_(conv.bias, bias_init) layers.append(conv) if activation_fn: + if activation_fn == "default": + activation_fn = nn.ReLU layers.append(activation_fn()) self._model = nn.Sequential(*layers) @@ -97,7 +92,7 @@ class SlimFC(nn.Module): out_size, initializer=None, activation_fn=None, - bias_init=0): + bias_init=0.0): super(SlimFC, self).__init__() layers = [] linear = nn.Linear(in_size, out_size) diff --git a/rllib/models/torch/torch_action_dist.py b/rllib/models/torch/torch_action_dist.py index 414860fbc..a4210d695 100644 --- a/rllib/models/torch/torch_action_dist.py +++ b/rllib/models/torch/torch_action_dist.py @@ -1,8 +1,12 @@ +from math import log import numpy as np from ray.rllib.models.action_dist import ActionDistribution from ray.rllib.utils.annotations import override from ray.rllib.utils import try_import_torch +from ray.rllib.utils.numpy import SMALL_NUMBER, MIN_LOG_NN_OUTPUT, \ + MAX_LOG_NN_OUTPUT +from ray.rllib.utils.torch_ops import atanh torch, nn = try_import_torch() @@ -159,6 +163,116 @@ class TorchDiagGaussian(TorchDistributionWrapper): return np.prod(action_space.shape) * 2 +class TorchSquashedGaussian(TorchDistributionWrapper): + """A tanh-squashed Gaussian distribution defined by: mean, std, low, high. + + The distribution will never return low or high exactly, but + `low`+SMALL_NUMBER or `high`-SMALL_NUMBER respectively. + """ + + def __init__(self, inputs, model, low=-1.0, high=1.0): + """Parameterizes the distribution via `inputs`. + + Args: + low (float): The lowest possible sampling value + (excluding this value). + high (float): The highest possible sampling value + (excluding this value). + """ + super().__init__(inputs, model) + # Split inputs into mean and log(std). + mean, log_std = torch.chunk(self.inputs, 2, dim=-1) + # Clip `scale` values (coming from NN) to reasonable values. + log_std = torch.clamp(log_std, MIN_LOG_NN_OUTPUT, MAX_LOG_NN_OUTPUT) + std = torch.exp(log_std) + self.dist = torch.distributions.normal.Normal(mean, std) + assert np.all(np.less(low, high)) + self.low = low + self.high = high + + @override(ActionDistribution) + def deterministic_sample(self): + self.last_sample = self._squash(self.dist.mean) + return self.last_sample + + @override(TorchDistributionWrapper) + def sample(self): + # Use the reparameterization version of `dist.sample` to allow for + # the results to be backprop'able e.g. in a loss term. + normal_sample = self.dist.rsample() + self.last_sample = self._squash(normal_sample) + return self.last_sample + + @override(ActionDistribution) + def logp(self, x): + unsquashed_values = self._unsquash(x) + log_prob = torch.sum(self.dist.log_prob(unsquashed_values), dim=-1) + unsquashed_values_tanhd = torch.tanh(unsquashed_values) + log_prob -= torch.sum( + torch.log(1 - unsquashed_values_tanhd**2 + SMALL_NUMBER), dim=-1) + return log_prob + + def _squash(self, raw_values): + # Make sure raw_values are not too high/low (such that tanh would + # return exactly 1.0/-1.0, which would lead to +/-inf log-probs). + return (torch.clamp( + torch.tanh(raw_values), + -1.0 + SMALL_NUMBER, + 1.0 - SMALL_NUMBER) + 1.0) / 2.0 * (self.high - self.low) + \ + self.low + + def _unsquash(self, values): + return atanh((values - self.low) / (self.high - self.low) * 2.0 - 1.0) + + +class TorchBeta(TorchDistributionWrapper): + """ + A Beta distribution is defined on the interval [0, 1] and parameterized by + shape parameters alpha and beta (also called concentration parameters). + + PDF(x; alpha, beta) = x**(alpha - 1) (1 - x)**(beta - 1) / Z + with Z = Gamma(alpha) Gamma(beta) / Gamma(alpha + beta) + and Gamma(n) = (n - 1)! + """ + + def __init__(self, inputs, model, low=0.0, high=1.0): + super().__init__(inputs, model) + # Stabilize input parameters (possibly coming from a linear layer). + self.inputs = torch.clamp(self.inputs, log(SMALL_NUMBER), + -log(SMALL_NUMBER)) + self.inputs = torch.log(torch.exp(self.inputs) + 1.0) + 1.0 + self.low = low + self.high = high + alpha, beta = torch.chunk(self.inputs, 2, dim=-1) + # Note: concentration0==beta, concentration1=alpha (!) + self.dist = torch.distributions.Beta( + concentration1=alpha, concentration0=beta) + + @override(ActionDistribution) + def deterministic_sample(self): + self.last_sample = self._squash(self.dist.mean) + return self.last_sample + + @override(TorchDistributionWrapper) + def sample(self): + # Use the reparameterization version of `dist.sample` to allow for + # the results to be backprop'able e.g. in a loss term. + normal_sample = self.dist.rsample() + self.last_sample = self._squash(normal_sample) + return self.last_sample + + @override(ActionDistribution) + def logp(self, x): + unsquashed_values = self._unsquash(x) + return torch.sum(self.dist.log_prob(unsquashed_values), dim=-1) + + def _squash(self, raw_values): + return raw_values * (self.high - self.low) + self.low + + def _unsquash(self, values): + return (values - self.low) / (self.high - self.low) + + class TorchDeterministic(TorchDistributionWrapper): """Action distribution that returns the input values directly. diff --git a/rllib/models/torch/visionnet.py b/rllib/models/torch/visionnet.py index dc9c53321..7dd55b4b5 100644 --- a/rllib/models/torch/visionnet.py +++ b/rllib/models/torch/visionnet.py @@ -3,6 +3,7 @@ from ray.rllib.models.torch.misc import normc_initializer, valid_padding, \ SlimConv2d, SlimFC from ray.rllib.models.tf.visionnet_v1 import _get_filter_config from ray.rllib.utils.annotations import override +from ray.rllib.utils.framework import get_activation_fn from ray.rllib.utils import try_import_torch _, nn = try_import_torch() @@ -17,24 +18,40 @@ class VisionNetwork(TorchModelV2, nn.Module): model_config, name) nn.Module.__init__(self) + activation = get_activation_fn( + model_config.get("conv_activation"), framework="torch") filters = model_config.get("conv_filters") if not filters: filters = _get_filter_config(obs_space.shape) - layers = [] + # no_final_linear = model_config.get("no_final_linear") + # vf_share_layers = model_config.get("vf_share_layers") + layers = [] (w, h, in_channels) = obs_space.shape in_size = [w, h] for out_channels, kernel, stride in filters[:-1]: padding, out_size = valid_padding(in_size, kernel, [stride, stride]) layers.append( - SlimConv2d(in_channels, out_channels, kernel, stride, padding)) + SlimConv2d( + in_channels, + out_channels, + kernel, + stride, + padding, + activation_fn=activation)) in_channels = out_channels in_size = out_size out_channels, kernel, stride = filters[-1] layers.append( - SlimConv2d(in_channels, out_channels, kernel, stride, None)) + SlimConv2d( + in_channels, + out_channels, + kernel, + stride, + None, + activation_fn=activation)) self._convs = nn.Sequential(*layers) self._logits = SlimFC( diff --git a/rllib/optimizers/sync_replay_optimizer.py b/rllib/optimizers/sync_replay_optimizer.py index 46faa0769..1d8304350 100644 --- a/rllib/optimizers/sync_replay_optimizer.py +++ b/rllib/optimizers/sync_replay_optimizer.py @@ -101,6 +101,13 @@ class SyncReplayOptimizer(PolicyOptimizer): logger.warning("buffer_size={} < replay_starts={}".format( buffer_size, self.replay_starts)) + # If set, will use this batch for stepping/updating, instead of + # sampling from the replay buffer. Actual sampling from the env + # (and adding collected experiences to the replay will still happen + # normally). + # After self.step(), self.fake_batch must be set again. + self._fake_batch = None + @override(PolicyOptimizer) def step(self): with self.update_weights_timer: @@ -156,7 +163,13 @@ class SyncReplayOptimizer(PolicyOptimizer): }) def _optimize(self): - samples = self._replay() + if self._fake_batch: + fake_batch = SampleBatch(self._fake_batch) + samples = MultiAgentBatch({ + DEFAULT_POLICY_ID: fake_batch + }, fake_batch.count) + else: + samples = self._replay() with self.grad_timer: if self.before_learn_on_batch: @@ -173,8 +186,8 @@ class SyncReplayOptimizer(PolicyOptimizer): # torch/tf. Clean up these results/info dicts across # policies (note: fixing this in torch_policy.py will # break e.g. DDPPO!). - td_error = info.get( - "td_error", info["learner_stats"].get("td_error")) + td_error = info.get("td_error", + info["learner_stats"].get("td_error")) new_priorities = ( np.abs(td_error) + self.prioritized_replay_eps) replay_buffer.update_priorities( diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index 9853f066d..bf5f8ca70 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -228,10 +228,10 @@ class DynamicTFPolicy(TFPolicy): batch_divisibility_req = 1 super().__init__( - obs_space, - action_space, - config, - sess, + observation_space=obs_space, + action_space=action_space, + config=config, + sess=sess, obs_input=obs, action_input=action_input, # for logp calculations sampled_action=sampled_action, diff --git a/rllib/policy/tests/test_compute_log_likelihoods.py b/rllib/policy/tests/test_compute_log_likelihoods.py index 04951f978..0e5b13bb8 100644 --- a/rllib/policy/tests/test_compute_log_likelihoods.py +++ b/rllib/policy/tests/test_compute_log_likelihoods.py @@ -18,7 +18,8 @@ def do_test_log_likelihood(run, config, prev_a=None, continuous=False, - layer_key=("fc", (0, 4)), + layer_key=("fc", (0, 4), ("_hidden_layers.0.", + "_logits.")), logp_func=None): config = config.copy() # Run locally. @@ -37,7 +38,7 @@ def do_test_log_likelihood(run, # Test against all frameworks. for fw in framework_iterator(config): - if run in [dqn.DQNTrainer, sac.SACTrainer] and fw == "torch": + if run in [sac.SACTrainer] and fw == "eager": continue trainer = run(config=config, env=env) @@ -76,11 +77,11 @@ def do_test_log_likelihood(run, layer_key[0])]) else: expected_mean_logstd = fc( - fc( - obs_batch, - np.transpose( - vars["_hidden_layers.0._model.0.weight"])), - np.transpose(vars["_logits._model.0.weight"])) + fc(obs_batch, + vars["{}_model.0.weight".format(layer_key[2][0])], + framework=fw), + vars["{}_model.0.weight".format(layer_key[2][1])], + framework=fw) mean, log_std = np.split(expected_mean_logstd, 2, axis=-1) if logp_func is None: expected_logp = np.log(norm.pdf(a, mean, np.exp(log_std))) @@ -124,7 +125,7 @@ class TestComputeLogLikelihood(unittest.TestCase): config, prev_a, continuous=True, - layer_key=("fc", (0, 2))) + layer_key=("fc", (0, 2), ("_hidden_layers.0.", "_logits."))) def test_pg_discr(self): """Tests PG's (cont. actions) compute_log_likelihoods method.""" @@ -148,8 +149,8 @@ class TestComputeLogLikelihood(unittest.TestCase): def test_sac_cont(self): """Tests SAC's (cont. actions) compute_log_likelihoods method.""" config = sac.DEFAULT_CONFIG.copy() - config["policy_model"]["hidden_layer_sizes"] = [10] - config["policy_model"]["hidden_activation"] = "linear" + config["policy_model"]["fcnet_hiddens"] = [10] + config["policy_model"]["fcnet_activation"] = "linear" prev_a = np.array([0.0]) # SAC cont uses a squashed normal distribution. Implement it's logp @@ -170,21 +171,23 @@ class TestComputeLogLikelihood(unittest.TestCase): config, prev_a, continuous=True, - layer_key=("sequential/action", (0, 2)), + layer_key=("sequential/action", (0, 2), + ("action_model.action_0.", "action_model.action_out.")), logp_func=logp_func) def test_sac_discr(self): """Tests SAC's (discrete actions) compute_log_likelihoods method.""" config = sac.DEFAULT_CONFIG.copy() - config["policy_model"]["hidden_layer_sizes"] = [10] - config["policy_model"]["hidden_activation"] = "linear" + config["policy_model"]["fcnet_hiddens"] = [10] + config["policy_model"]["fcnet_activation"] = "linear" prev_a = np.array(0) do_test_log_likelihood( sac.SACTrainer, config, prev_a, - layer_key=("sequential/action", (0, 2))) + layer_key=("sequential/action", (0, 2), + ("action_model.action_0.", "action_model.action_out."))) if __name__ == "__main__": diff --git a/rllib/policy/torch_policy.py b/rllib/policy/torch_policy.py index cc77ab76f..861654b0a 100644 --- a/rllib/policy/torch_policy.py +++ b/rllib/policy/torch_policy.py @@ -4,6 +4,7 @@ import time from ray.rllib.policy.policy import Policy, LEARNER_STATS_KEY from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.rnn_sequencing import pad_batch_to_sequences_of_same_size +from ray.rllib.utils import force_list from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.framework import try_import_torch from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule @@ -79,7 +80,7 @@ class TorchPolicy(Policy): self.exploration = self._create_exploration() self.unwrapped_model = model # used to support DistributedDataParallel self._loss = loss - self._optimizer = self.optimizer() + self._optimizers = force_list(self.optimizer()) self.dist_class = action_distribution_class self.action_sampler_fn = action_sampler_fn @@ -203,10 +204,11 @@ class TorchPolicy(Policy): # Action dist class and inputs are generated via custom function. if self.action_distribution_fn: dist_inputs, dist_class, _ = self.action_distribution_fn( - self, - self.model, - input_dict[SampleBatch.CUR_OBS], - explore=False) + policy=self, + model=self.model, + obs_batch=input_dict[SampleBatch.CUR_OBS], + explore=False, + is_training=False) # Default action-dist inputs calculation. else: dist_class = self.dist_class @@ -227,57 +229,73 @@ class TorchPolicy(Policy): batch_divisibility_req=self.batch_divisibility_req) train_batch = self._lazy_tensor_dict(postprocessed_batch) - loss_out = self._loss(self, self.model, self.dist_class, train_batch) - self._optimizer.zero_grad() - loss_out.backward() + loss_out = force_list( + self._loss(self, self.model, self.dist_class, train_batch)) + assert len(loss_out) == len(self._optimizers) - info = {} - info.update(self.extra_grad_process()) + # Loop through all optimizers. + grad_info = {"allreduce_latency": 0.0} + for i, opt in enumerate(self._optimizers): + # Erase gradients in all vars of this optimizer. + opt.zero_grad() + # Recompute gradients of loss over all variables. + loss_out[i].backward(retain_graph=(i < len(self._optimizers) - 1)) - if self.distributed_world_size: - grads = [] - for p in self.model.parameters(): - if p.grad is not None: - grads.append(p.grad) - start = time.time() - if torch.cuda.is_available(): - # Sadly, allreduce_coalesced does not work with CUDA yet. - for g in grads: - torch.distributed.all_reduce( - g, op=torch.distributed.ReduceOp.SUM) - else: - torch.distributed.all_reduce_coalesced( - grads, op=torch.distributed.ReduceOp.SUM) - for p in self.model.parameters(): - if p.grad is not None: - p.grad /= self.distributed_world_size - info["allreduce_latency"] = time.time() - start + grad_info.update(self.extra_grad_process(opt, loss_out[i])) - self._optimizer.step() + if self.distributed_world_size: + grads = [] + for param_group in opt.param_groups: + for p in param_group["params"]: + if p.grad is not None: + grads.append(p.grad) - info.update(self.extra_grad_info(train_batch)) - return { - LEARNER_STATS_KEY: info - } + start = time.time() + if torch.cuda.is_available(): + # Sadly, allreduce_coalesced does not work with CUDA yet. + for g in grads: + torch.distributed.all_reduce( + g, op=torch.distributed.ReduceOp.SUM) + else: + torch.distributed.all_reduce_coalesced( + grads, op=torch.distributed.ReduceOp.SUM) + + for param_group in opt.param_groups: + for p in param_group["params"]: + if p.grad is not None: + p.grad /= self.distributed_world_size + + grad_info["allreduce_latency"] += time.time() - start + + # Step the optimizer. + opt.step() + + grad_info["allreduce_latency"] /= len(self._optimizers) + grad_info.update(self.extra_grad_info(train_batch)) + return {LEARNER_STATS_KEY: grad_info} @override(Policy) def compute_gradients(self, postprocessed_batch): train_batch = self._lazy_tensor_dict(postprocessed_batch) + loss_out = force_list( + self._loss(self, self.model, self.dist_class, train_batch)) + assert len(loss_out) == len(self._optimizers) - loss_out = self._loss(self, self.model, self.dist_class, train_batch) - self._optimizer.zero_grad() - loss_out.backward() - - grad_process_info = self.extra_grad_process() - - # Note that return values are just references; - # calling zero_grad will modify the values + grad_process_info = {} grads = [] - for p in self.model.parameters(): - if p.grad is not None: - grads.append(p.grad.data.cpu().numpy()) - else: - grads.append(None) + for i, opt in enumerate(self._optimizers): + opt.zero_grad() + loss_out[i].backward() + grad_process_info = self.extra_grad_process(opt, loss_out[i]) + + # Note that return values are just references; + # calling zero_grad will modify the values + for param_group in opt.param_groups: + for p in param_group["params"]: + if p.grad is not None: + grads.append(p.grad.data.cpu().numpy()) + else: + grads.append(None) grad_info = self.extra_grad_info(train_batch) grad_info.update(grad_process_info) @@ -285,10 +303,13 @@ class TorchPolicy(Policy): @override(Policy) def apply_gradients(self, gradients): + # TODO(sven): Not supported for multiple optimizers yet. + assert len(self._optimizers) == 1 for g, p in zip(gradients, self.model.parameters()): if g is not None: p.grad = torch.from_numpy(g).to(self.device) - self._optimizer.step() + + self._optimizers[0].step() @override(Policy) def get_weights(self): @@ -314,9 +335,20 @@ class TorchPolicy(Policy): def get_initial_state(self): return [s.numpy() for s in self.model.get_initial_state()] - def extra_grad_process(self): - """Allow subclass to do extra processing on gradients and - return processing info.""" + def extra_grad_process(self, optimizer, loss): + """Called after each optimizer.zero_grad() + loss.backward() call. + + Called for each self._optimizers/loss-value pair. + Allows for gradient processing before optimizer.step() is called. + E.g. for gradient clipping. + + Args: + optimizer (torch.optim.Optimizer): A torch optimizer object. + loss (torch.Tensor): The loss tensor associated with the optimizer. + + Returns: + dict: An info dict. + """ return {} def extra_action_out(self, input_dict, state_batches, model, action_dist): @@ -394,9 +426,10 @@ class LearningRateSchedule: @override(TorchPolicy) def optimizer(self): - for p in self._optimizer.param_groups: - p["lr"] = self.cur_lr - return self._optimizer + for opt in self._optimizers: + for p in opt.param_groups: + p["lr"] = self.cur_lr + return self._optimizers @DeveloperAPI diff --git a/rllib/policy/torch_policy_template.py b/rllib/policy/torch_policy_template.py index 65320835d..fa8ba5c84 100644 --- a/rllib/policy/torch_policy_template.py +++ b/rllib/policy/torch_policy_template.py @@ -25,6 +25,7 @@ def build_torch_policy(name, action_sampler_fn=None, action_distribution_fn=None, make_model_and_action_dist=None, + apply_gradients_fn=None, mixins=None, get_batch_divisibility_req=None): """Helper function for creating a torch policy at runtime. @@ -59,6 +60,8 @@ def build_torch_policy(name, arguments as policy init and returns a tuple of model instance and torch action distribution class. If not specified, the default model and action dist from the catalog will be used + apply_gradients_fn (Optional[callable]): An optional callable that + takes a grads list and applies these to the Model's parameters. mixins (list): list of any class mixins for the returned policy class. These mixins will be applied in order and will have higher precedence than the TorchPolicy class @@ -101,9 +104,9 @@ def build_torch_policy(name, TorchPolicy.__init__( self, - obs_space, - action_space, - config, + observation_space=obs_space, + action_space=action_space, + config=config, model=self.model, loss=loss_fn, action_distribution_class=dist_class, @@ -135,11 +138,23 @@ def build_torch_policy(name, return sample_batch @override(TorchPolicy) - def extra_grad_process(self): + def extra_grad_process(self, optimizer, loss): + """Called after optimizer.zero_grad() and loss.backward() calls. + + Allows for gradient processing before optimizer.step() is called. + E.g. for gradient clipping. + """ if extra_grad_process_fn: - return extra_grad_process_fn(self) + return extra_grad_process_fn(self, optimizer, loss) else: - return TorchPolicy.extra_grad_process(self) + return TorchPolicy.extra_grad_process(self, optimizer, loss) + + @override(TorchPolicy) + def apply_gradients(self, gradients): + if apply_gradients_fn: + apply_gradients_fn(self, gradients) + else: + TorchPolicy.apply_gradients(self, gradients) @override(TorchPolicy) def extra_action_out(self, input_dict, state_batches, model, diff --git a/rllib/tests/test_catalog.py b/rllib/tests/test_catalog.py index c5ac6cee5..1a2b33533 100644 --- a/rllib/tests/test_catalog.py +++ b/rllib/tests/test_catalog.py @@ -4,14 +4,15 @@ import numpy as np import unittest import ray -from ray.rllib.models import ModelCatalog, MODEL_DEFAULTS +from ray.rllib.models import ModelCatalog, MODEL_DEFAULTS, ActionDistribution from ray.rllib.models.model import Model from ray.rllib.models.tf.tf_action_dist import TFActionDistribution from ray.rllib.models.preprocessors import (NoPreprocessor, OneHotPreprocessor, Preprocessor) from ray.rllib.models.tf.fcnet_v1 import FullyConnectedNetwork from ray.rllib.models.tf.visionnet_v1 import VisionNetwork -from ray.rllib.utils import try_import_tf +from ray.rllib.utils.annotations import override +from ray.rllib.utils.framework import try_import_tf tf = try_import_tf() @@ -32,6 +33,16 @@ class CustomModel(Model): class CustomActionDistribution(TFActionDistribution): + def __init__(self, inputs, model): + # Store our output shape. + custom_options = model.model_config["custom_options"] + if "output_dim" in custom_options: + self.output_shape = tf.concat( + [tf.shape(inputs)[:1], custom_options["output_dim"]], axis=0) + else: + self.output_shape = tf.shape(inputs) + super().__init__(inputs, model) + @staticmethod def required_model_output_shape(action_space, model_config=None): custom_options = model_config["custom_options"] or {} @@ -39,15 +50,13 @@ class CustomActionDistribution(TFActionDistribution): return custom_options.get("output_dim") return action_space.shape + @override(TFActionDistribution) def _build_sample_op(self): - custom_options = self.model.model_config["custom_options"] - if "output_dim" in custom_options: - output_shape = tf.concat( - [tf.shape(self.inputs)[:1], custom_options["output_dim"]], - axis=0) - else: - output_shape = tf.shape(self.inputs) - return tf.random_uniform(output_shape) + return tf.random_uniform(self.output_shape) + + @override(ActionDistribution) + def logp(self, x): + return tf.zeros(self.output_shape) class ModelCatalogTest(unittest.TestCase): diff --git a/rllib/tests/test_dependency_torch.py b/rllib/tests/test_dependency_torch.py index 59198bdc1..4dd89be35 100755 --- a/rllib/tests/test_dependency_torch.py +++ b/rllib/tests/test_dependency_torch.py @@ -20,3 +20,6 @@ if __name__ == "__main__": trainer.train() assert "torch" not in sys.modules, "Torch should not be imported" + + # Clean up. + del os.environ["RLLIB_TEST_NO_TORCH_IMPORT"] diff --git a/rllib/tuned_examples/atari-sac.yaml b/rllib/tuned_examples/atari-sac.yaml new file mode 100644 index 000000000..e87594f7b --- /dev/null +++ b/rllib/tuned_examples/atari-sac.yaml @@ -0,0 +1,48 @@ +# Run e.g. on a g3.16xlarge (4 GPUs) with `num_gpus=1` (1 for each trial; +# MsPacman torch + tf; Pong torch + tf). +# Uses the hyperparameters published in [2] (see rllib/agents/sac/README.md). +atari-sac-tf-and-torch: + env: + grid_search: + - MsPacmanNoFrameskip-v4 + - PongNoFrameskip-v4 + run: SAC + stop: + timesteps_total: 20000000 + config: + # Works for both torch and tf. + use_pytorch: + grid_search: [false, true] + gamma: 0.99 + # state-preprocessor=Our default Atari Conv2D-net. + use_state_preprocessor: true + Q_model: + hidden_activation: relu + hidden_layer_sizes: [512] + policy_model: + hidden_activation: relu + hidden_layer_sizes: [512] + # Do hard syncs. + # Soft-syncs seem to work less reliably for discrete action spaces. + tau: 1.0 + target_network_update_freq: 8000 + # auto = 0.98 * -log(1/|A|) + target_entropy: auto + clip_rewards: 1.0 + no_done_at_end: False + n_step: 1 + rollout_fragment_length: 1 + prioritized_replay: true + train_batch_size: 64 + timesteps_per_iteration: 4 + # Paper uses 20k random timesteps, which is not exactly the same, but + # seems to work nevertheless. We use 100k here for the longer Atari + # runs (DQN style: filling up the buffer a bit before learning). + learning_starts: 100000 + optimization: + actor_learning_rate: 0.0003 + critic_learning_rate: 0.0003 + entropy_learning_rate: 0.0003 + num_workers: 0 + num_gpus: 1 + metrics_smoothing_episodes: 5 diff --git a/rllib/tuned_examples/halfcheetah-sac.yaml b/rllib/tuned_examples/halfcheetah-sac.yaml index fe05eb5ef..56824baf4 100644 --- a/rllib/tuned_examples/halfcheetah-sac.yaml +++ b/rllib/tuned_examples/halfcheetah-sac.yaml @@ -6,19 +6,19 @@ halfcheetah_sac: episode_reward_mean: 9000 config: horizon: 1000 - soft_horizon: False + soft_horizon: false Q_model: - hidden_activation: relu - hidden_layer_sizes: [256, 256] + fcnet_activation: relu + fcnet_hiddens: [256, 256] policy_model: - hidden_activation: relu - hidden_layer_sizes: [256, 256] + fcnet_activation: relu + fcnet_hiddens: [256, 256] tau: 0.005 target_entropy: auto - no_done_at_end: True + no_done_at_end: true n_step: 1 rollout_fragment_length: 1 - prioritized_replay: False + prioritized_replay: true train_batch_size: 256 target_network_update_freq: 1 timesteps_per_iteration: 1000 @@ -29,8 +29,8 @@ halfcheetah_sac: entropy_learning_rate: 0.0003 num_workers: 0 num_gpus: 0 - clip_actions: False - normalize_actions: True + clip_actions: false + normalize_actions: true evaluation_interval: 1 metrics_smoothing_episodes: 5 diff --git a/rllib/tuned_examples/mspacman-sac.yaml b/rllib/tuned_examples/mspacman-sac.yaml new file mode 100644 index 000000000..4b70aa4f5 --- /dev/null +++ b/rllib/tuned_examples/mspacman-sac.yaml @@ -0,0 +1,43 @@ +# Our implementation of SAC discrete can reach up +# to ~750 reward in 40k timesteps. Run e.g. on a g3.4xlarge with `num_gpus=1`. +# Uses the hyperparameters published in [2] (see rllib/agents/sac/README.md). +mspacman-sac-tf: + env: MsPacmanNoFrameskip-v4 + run: SAC + stop: + episode_reward_mean: 800 + timesteps_total: 100000 + config: + use_pytorch: false + gamma: 0.99 + # state-preprocessor=Our default Atari Conv2D-net. + use_state_preprocessor: true + Q_model: + hidden_activation: relu + hidden_layer_sizes: [512] + policy_model: + hidden_activation: relu + hidden_layer_sizes: [512] + # Do hard syncs. + # Soft-syncs seem to work less reliably for discrete action spaces. + tau: 1.0 + target_network_update_freq: 8000 + # paper uses: 0.98 * -log(1/|A|) + target_entropy: 1.755 + clip_rewards: 1.0 + no_done_at_end: False + n_step: 1 + rollout_fragment_length: 1 + prioritized_replay: true + train_batch_size: 64 + timesteps_per_iteration: 4 + # Paper uses 20k random timesteps, which is not exactly the same, but + # seems to work nevertheless. + learning_starts: 20000 + optimization: + actor_learning_rate: 0.0003 + critic_learning_rate: 0.0003 + entropy_learning_rate: 0.0003 + num_workers: 0 + num_gpus: 0 + metrics_smoothing_episodes: 5 diff --git a/rllib/tuned_examples/pendulum-sac.yaml b/rllib/tuned_examples/pendulum-sac.yaml index 85442f9ee..6d2d7296c 100644 --- a/rllib/tuned_examples/pendulum-sac.yaml +++ b/rllib/tuned_examples/pendulum-sac.yaml @@ -1,36 +1,36 @@ -# Pendulum SAC can attain -150+ reward in 6-7k -# Configurations are the similar to original softlearning/sac codebase -pendulum_sac: - env: Pendulum-v0 - run: SAC - stop: - episode_reward_mean: -150 - config: - horizon: 200 - soft_horizon: False - Q_model: - hidden_activation: relu - hidden_layer_sizes: [256, 256] - policy_model: - hidden_activation: relu - hidden_layer_sizes: [256, 256] - tau: 0.005 - target_entropy: auto - no_done_at_end: True - n_step: 1 - rollout_fragment_length: 1 - prioritized_replay: False - train_batch_size: 256 - target_network_update_freq: 1 - timesteps_per_iteration: 1000 - learning_starts: 256 - optimization: - actor_learning_rate: 0.0003 - critic_learning_rate: 0.0003 - entropy_learning_rate: 0.0003 - num_workers: 0 - num_gpus: 0 - clip_actions: False - normalize_actions: True - evaluation_interval: 1 - metrics_smoothing_episodes: 5 +# Pendulum SAC can attain -150+ reward in 6-7k +# Configurations are the similar to original softlearning/sac codebase +pendulum_sac: + env: Pendulum-v0 + run: SAC + stop: + episode_reward_mean: -150 + config: + horizon: 200 + soft_horizon: False + Q_model: + fcnet_activation: relu + fcnet_hiddens: [256, 256] + policy_model: + fcnet_activation: relu + fcnet_hiddens: [256, 256] + tau: 0.005 + target_entropy: auto + no_done_at_end: True + n_step: 1 + rollout_fragment_length: 1 + prioritized_replay: False + train_batch_size: 256 + target_network_update_freq: 1 + timesteps_per_iteration: 1000 + learning_starts: 256 + optimization: + actor_learning_rate: 0.0003 + critic_learning_rate: 0.0003 + entropy_learning_rate: 0.0003 + num_workers: 0 + num_gpus: 0 + clip_actions: False + normalize_actions: True + evaluation_interval: 1 + metrics_smoothing_episodes: 5 diff --git a/rllib/tuned_examples/regression_tests/cartpole-sac.yaml b/rllib/tuned_examples/regression_tests/cartpole-sac-tf.yaml similarity index 76% rename from rllib/tuned_examples/regression_tests/cartpole-sac.yaml rename to rllib/tuned_examples/regression_tests/cartpole-sac-tf.yaml index bfb35eb88..8a9c030d4 100644 --- a/rllib/tuned_examples/regression_tests/cartpole-sac.yaml +++ b/rllib/tuned_examples/regression_tests/cartpole-sac-tf.yaml @@ -1,10 +1,11 @@ -cartpole-sac: +cartpole-sac-tf: env: CartPole-v0 run: SAC stop: episode_reward_mean: 150 - timesteps_total: 100000 + timesteps_total: 50000 config: + use_pytorch: false gamma: 0.95 no_done_at_end: false target_network_update_freq: 32 @@ -16,5 +17,5 @@ cartpole-sac: critic_learning_rate: 0.005 entropy_learning_rate: 0.0001 # grad_norm_clipping: 40.0 - evaluation_config: - explore: true + # evaluation_config: + # explore: true diff --git a/rllib/tuned_examples/regression_tests/cartpole-sac-torch.yaml b/rllib/tuned_examples/regression_tests/cartpole-sac-torch.yaml new file mode 100644 index 000000000..1f7259bf8 --- /dev/null +++ b/rllib/tuned_examples/regression_tests/cartpole-sac-torch.yaml @@ -0,0 +1,17 @@ +cartpole-sac-torch: + env: CartPole-v0 + run: SAC + stop: + episode_reward_mean: 150 + timesteps_total: 50000 + config: + use_pytorch: true + gamma: 0.95 + no_done_at_end: false + target_network_update_freq: 32 + tau: 1.0 + train_batch_size: 32 + optimization: + actor_learning_rate: 0.005 + critic_learning_rate: 0.005 + entropy_learning_rate: 0.0001 diff --git a/rllib/tuned_examples/regression_tests/pendulum-sac.yaml b/rllib/tuned_examples/regression_tests/pendulum-sac-tf.yaml similarity index 55% rename from rllib/tuned_examples/regression_tests/pendulum-sac.yaml rename to rllib/tuned_examples/regression_tests/pendulum-sac-tf.yaml index 8f2a17050..14230cf53 100644 --- a/rllib/tuned_examples/regression_tests/pendulum-sac.yaml +++ b/rllib/tuned_examples/regression_tests/pendulum-sac-tf.yaml @@ -1,12 +1,13 @@ -pendulum-sac: +pendulum-sac-tf: env: Pendulum-v0 run: SAC stop: episode_reward_mean: -300 # note that evaluation perf is higher timesteps_total: 10000 config: - soft_horizon: True - clip_actions: False - normalize_actions: True + use_pytorch: false + soft_horizon: true + clip_actions: false + normalize_actions: true metrics_smoothing_episodes: 5 - no_done_at_end: True + no_done_at_end: true diff --git a/rllib/tuned_examples/regression_tests/pendulum-sac-torch.yaml b/rllib/tuned_examples/regression_tests/pendulum-sac-torch.yaml new file mode 100644 index 000000000..eddb06357 --- /dev/null +++ b/rllib/tuned_examples/regression_tests/pendulum-sac-torch.yaml @@ -0,0 +1,13 @@ +pendulum-sac-torch: + env: Pendulum-v0 + run: SAC + stop: + episode_reward_mean: -300 # note that evaluation perf is higher + timesteps_total: 10000 + config: + use_pytorch: true + soft_horizon: true + clip_actions: false + normalize_actions: true + metrics_smoothing_episodes: 5 + no_done_at_end: true diff --git a/rllib/utils/exploration/epsilon_greedy.py b/rllib/utils/exploration/epsilon_greedy.py index 4b038ee43..3b1d286c6 100644 --- a/rllib/utils/exploration/epsilon_greedy.py +++ b/rllib/utils/exploration/epsilon_greedy.py @@ -140,7 +140,8 @@ class EpsilonGreedy(Exploration): torch.multinomial(random_valid_action_logits, 1), axis=1) # Pick either random or greedy. action = torch.where( - torch.empty((batch_size, )).uniform_() < epsilon, + torch.empty( + (batch_size, )).uniform_().to(self.device) < epsilon, random_actions, exploit_action) return action, action_logp diff --git a/rllib/utils/exploration/exploration.py b/rllib/utils/exploration/exploration.py index c31172e39..50d08b9eb 100644 --- a/rllib/utils/exploration/exploration.py +++ b/rllib/utils/exploration/exploration.py @@ -1,13 +1,13 @@ from gym.spaces import Space from typing import Union -from ray.rllib.utils.framework import check_framework, try_import_tf, \ +from ray.rllib.utils.framework import check_framework, try_import_torch, \ TensorType from ray.rllib.models.action_dist import ActionDistribution from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.utils.annotations import DeveloperAPI -tf = try_import_tf() +torch, nn = try_import_torch() @DeveloperAPI @@ -37,6 +37,10 @@ class Exploration: self.num_workers = num_workers self.worker_index = worker_index self.framework = check_framework(framework) + # The device on which the Model has been placed. + # This Exploration will be on the same device. + self.device = None if not isinstance(self.model, nn.Module) else \ + next(self.model.parameters()).device @DeveloperAPI def before_compute_actions(self, diff --git a/rllib/utils/exploration/gaussian_noise.py b/rllib/utils/exploration/gaussian_noise.py index deaf68d99..50c98f579 100644 --- a/rllib/utils/exploration/gaussian_noise.py +++ b/rllib/utils/exploration/gaussian_noise.py @@ -136,13 +136,13 @@ class GaussianNoise(Exploration): if self.last_timestep <= self.random_timesteps: action, _ = \ self.random_exploration.get_torch_exploration_action( - action_dist, True) + action_dist, explore=True) # Take a Gaussian sample with our stddev (mean=0.0) and scale it. else: det_actions = action_dist.deterministic_sample() scale = self.scale_schedule(self.last_timestep) gaussian_sample = scale * torch.normal( - mean=0.0, stddev=self.stddev, size=det_actions.size()) + mean=torch.zeros(det_actions.size()), std=self.stddev) action = torch.clamp( det_actions + gaussian_sample, self.action_space.low * torch.ones_like(det_actions), @@ -152,7 +152,8 @@ class GaussianNoise(Exploration): action = action_dist.deterministic_sample() # Logp=always zero. - logp = torch.zeros(shape=(action.size()[0], ), dtype=torch.float32) + logp = torch.zeros( + (action.size()[0], ), dtype=torch.float32, device=self.device) return action, logp diff --git a/rllib/utils/exploration/ornstein_uhlenbeck_noise.py b/rllib/utils/exploration/ornstein_uhlenbeck_noise.py index 7b582eef2..877390a1c 100644 --- a/rllib/utils/exploration/ornstein_uhlenbeck_noise.py +++ b/rllib/utils/exploration/ornstein_uhlenbeck_noise.py @@ -1,3 +1,5 @@ +import numpy as np + from ray.rllib.utils.annotations import override from ray.rllib.utils.exploration.gaussian_noise import GaussianNoise from ray.rllib.utils.framework import try_import_tf, try_import_torch, \ @@ -73,9 +75,11 @@ class OrnsteinUhlenbeckNoise(GaussianNoise): # The current OU-state value (gets updated each time, an eploration # action is computed). self.ou_state = get_variable( - self.action_space.low.size * [.0], + np.array(self.action_space.low.size * [.0], dtype=np.float32), framework=self.framework, - tf_name="ou_state") + tf_name="ou_state", + torch_tensor=True, + device=self.device) @override(GaussianNoise) def _get_tf_exploration_action_op(self, action_dist, explore, timestep): @@ -135,31 +139,34 @@ class OrnsteinUhlenbeckNoise(GaussianNoise): if explore: # Random exploration phase. if self.last_timestep <= self.random_timesteps: - action = self.random_exploration.get_torch_exploration_action( - action_dist, True) + action, _ = \ + self.random_exploration.get_torch_exploration_action( + action_dist, explore=True) # Apply base-scaled and time-annealed scaled OU-noise to # deterministic actions. else: det_actions = action_dist.deterministic_sample() scale = self.scale_schedule(self.last_timestep) gaussian_sample = scale * torch.normal( - mean=0.0, stddev=1.0, size=det_actions.size()) - + mean=torch.zeros(self.ou_state.size()), std=1.0) \ + .to(self.device) ou_new = self.ou_theta * -self.ou_state + \ self.ou_sigma * gaussian_sample self.ou_state += ou_new - noise = scale * self.ou_base_scale * self.ou_state * \ - (self.action_space.high - self.action_space.low) - action = torch.clamp( - det_actions + noise, - self.action_space.low * torch.ones_like(det_actions), - self.action_space.high * torch.ones_like(det_actions)) + high_low = torch.from_numpy(self.action_space.high - + self.action_space.low).to( + self.device) + noise = scale * self.ou_base_scale * self.ou_state * high_low + action = torch.clamp(det_actions + noise, + self.action_space.low[0], + self.action_space.high[0]) # No exploration -> Return deterministic actions. else: action = action_dist.deterministic_sample() # Logp=always zero. - logp = torch.zeros(shape=(action.size()[0], ), dtype=torch.float32) + logp = torch.zeros( + (action.size()[0], ), dtype=torch.float32, device=self.device) return action, logp diff --git a/rllib/utils/exploration/parameter_noise.py b/rllib/utils/exploration/parameter_noise.py index 5416f9a51..15f237013 100644 --- a/rllib/utils/exploration/parameter_noise.py +++ b/rllib/utils/exploration/parameter_noise.py @@ -79,7 +79,8 @@ class ParameterNoise(Exploration): np.zeros(var.shape, dtype=np.float32), framework=self.framework, tf_name=name_, - torch_tensor=True)) + torch_tensor=True, + device=self.device)) # tf-specific ops to sample, assign and remove noise. if self.framework == "tf" and not tf.executing_eagerly(): @@ -298,7 +299,7 @@ class ParameterNoise(Exploration): else: for i in range(len(self.noise)): self.noise[i] = torch.normal( - 0.0, self.stddev, size=self.noise[i].size()) + mean=torch.zeros(self.noise[i].size()), std=self.stddev) def _tf_sample_new_noise_op(self): added_noises = [] diff --git a/rllib/utils/exploration/random.py b/rllib/utils/exploration/random.py index c38a8f55a..30a7d3f44 100644 --- a/rllib/utils/exploration/random.py +++ b/rllib/utils/exploration/random.py @@ -1,4 +1,5 @@ from gym.spaces import Discrete, MultiDiscrete, Tuple +import numpy as np from typing import Union from ray.rllib.models.action_dist import ActionDistribution @@ -7,6 +8,7 @@ from ray.rllib.utils.exploration.exploration import Exploration from ray.rllib.utils.framework import try_import_tf, try_import_torch, \ TensorType from ray.rllib.utils.tuple_actions import TupleActions +from ray.rllib.utils import force_tuple tf = try_import_tf() torch, _ = try_import_torch() @@ -81,15 +83,19 @@ class Random(Exploration): return action, logp def get_torch_exploration_action(self, action_dist, explore): - tensor_fn = torch.LongTensor if \ - type(self.action_space) in [Discrete, MultiDiscrete] else \ - torch.FloatTensor if explore: # Unsqueeze will be unnecessary, once we support batch/time-aware # Spaces. a = self.action_space.sample() - action = tensor_fn([a] if isinstance(a, int) else a) + req = force_tuple( + action_dist.required_model_output_shape( + self.action_space, self.model.model_config)) + # Add a batch dimension. + if len(action_dist.inputs.shape) == len(req) + 1: + a = np.expand_dims(a, 0) + action = torch.from_numpy(a).to(self.device) else: - action = tensor_fn(action_dist.deterministic_sample()) - logp = torch.zeros((action.size()[0], ), dtype=torch.float32) + action = action_dist.deterministic_sample() + logp = torch.zeros( + (action.size()[0], ), dtype=torch.float32, device=self.device) return action, logp diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index 3de14c6f1..aee50b320 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -1,5 +1,6 @@ import logging import os +import sys from typing import Any logger = logging.getLogger(__name__) @@ -38,17 +39,36 @@ def try_import_tf(error=False): Returns: The tf module (either from tf2.0.compat.v1 OR as tf1.x. """ - # TODO(sven): Make sure, these are reset after each test case - # that uses them. + # Make sure, these are reset after each test case + # that uses them: del os.environ["RLLIB_TEST_NO_TF_IMPORT"] if "RLLIB_TEST_NO_TF_IMPORT" in os.environ: logger.warning("Not importing TensorFlow for test purposes") return None + if "TF_CPP_MIN_LOG_LEVEL" not in os.environ: + os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" + + # Try to reuse already imported tf module. This will avoid going through + # the initial import steps below and thereby switching off v2_behavior + # (switching off v2 behavior twice breaks all-framework tests for eager). + if "tensorflow" in sys.modules: + tf_module = sys.modules["tensorflow"] + # Try "reducing" tf to tf.compat.v1. + try: + tf_module = tf_module.compat.v1 + # No compat.v1 -> return tf as is. + except AttributeError: + pass + return tf_module + + # Just in case. We should not go through the below twice. + assert "tensorflow" not in sys.modules + try: - if "TF_CPP_MIN_LOG_LEVEL" not in os.environ: - os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" + # Try "reducing" tf to tf.compat.v1. import tensorflow.compat.v1 as tf tf.logging.set_verbosity(tf.logging.ERROR) + # Disable v2 eager mode. tf.disable_v2_behavior() return tf except ImportError: @@ -144,7 +164,8 @@ def get_variable(value, framework="tf", trainable=False, tf_name="unnamed-variable", - torch_tensor=False): + torch_tensor=False, + device=None): """ Args: value (any): The initial value to use. In the non-tf case, this will @@ -171,13 +192,45 @@ def get_variable(value, tf_name, initializer=value, dtype=dtype, trainable=trainable) elif framework == "torch" and torch_tensor is True: torch, _ = try_import_torch() - var_ = torch.from_numpy(value) + var_ = torch.from_numpy(value).to(device) var_.requires_grad = trainable return var_ # torch or None: Return python primitive. return value +def get_activation_fn(name, framework="tf"): + """ + Returns a framework specific activation function, given a name string. + + Args: + name (str): One of "relu" (default), "tanh", or "linear". + framework (str): One of "tf" or "torch". + + Returns: + A framework-specific activtion function. e.g. tf.nn.tanh or + torch.nn.ReLU. Returns None for name="linear". + """ + if framework == "torch": + _, nn = try_import_torch() + if name == "linear": + return None + elif name == "relu": + return nn.ReLU + elif name == "tanh": + return nn.Tanh + else: + if name == "linear": + return None + tf = try_import_tf() + fn = getattr(tf.nn, name, None) + if fn is not None: + return fn + + raise ValueError("Unknown activation ({}) for framework={}!".format( + name, framework)) + + # This call should never happen inside a module's functions/classes # as it would re-disable tf-eager. tf = try_import_tf() diff --git a/rllib/utils/numpy.py b/rllib/utils/numpy.py index a40d0a6ca..19f3c9ff2 100644 --- a/rllib/utils/numpy.py +++ b/rllib/utils/numpy.py @@ -147,7 +147,9 @@ def fc(x, weights, biases=None, framework=None): x = map_(x) # Torch stores matrices in transpose (faster for backprop). - weights = map_(weights, transpose=framework == "torch") + transpose = (framework == "torch" and (x.shape[1] != weights.shape[0] + and x.shape[1] == weights.shape[1])) + weights = map_(weights, transpose=transpose) biases = map_(biases) return np.matmul(x, weights) + (0.0 if biases is None else biases) diff --git a/rllib/utils/torch_ops.py b/rllib/utils/torch_ops.py index 87ce51a4e..b04a779b4 100644 --- a/rllib/utils/torch_ops.py +++ b/rllib/utils/torch_ops.py @@ -104,3 +104,7 @@ def convert_to_torch_tensor(stats, device=None): return tensor if device is None else tensor.to(device) return tree.map_structure(mapping, stats) + + +def atanh(x): + return 0.5 * torch.log((1 + x) / (1 - x))