From 2cbe29a7fa8bc63d57f97b03ba726676425e160b Mon Sep 17 00:00:00 2001 From: Sven Mika Date: Wed, 19 Aug 2020 17:49:50 +0200 Subject: [PATCH] [RLlib] Curiosity minor fixes, do-overs, and testing. (#10143) --- python/requirements.txt | 1 + rllib/BUILD | 7 + rllib/agents/ppo/ppo_torch_policy.py | 4 +- rllib/agents/trainer.py | 3 +- rllib/evaluation/sample_batch_builder.py | 5 +- rllib/models/catalog.py | 8 +- rllib/models/torch/fcnet.py | 5 +- rllib/models/torch/misc.py | 13 +- rllib/models/torch/recurrent_net.py | 2 +- rllib/models/torch/visionnet.py | 13 +- rllib/policy/torch_policy.py | 38 +- rllib/policy/torch_policy_template.py | 5 +- rllib/utils/exploration/curiosity.py | 484 +++++++++--------- rllib/utils/exploration/exploration.py | 110 ++-- .../utils/exploration/tests/test_curiosity.py | 185 +++++-- rllib/utils/spaces/space_utils.py | 2 +- rllib/utils/typing.py | 8 + 17 files changed, 533 insertions(+), 360 deletions(-) diff --git a/python/requirements.txt b/python/requirements.txt index 0736046f4..55b753594 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -46,6 +46,7 @@ dataclasses dask[complete] feather-format gym +gym-minigrid kubernetes lxml mypy diff --git a/rllib/BUILD b/rllib/BUILD index 9f9737405..222fa482a 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -1073,6 +1073,13 @@ py_test( # Tag: utils # -------------------------------------------------------------------- +py_test( + name = "test_curiosity", + tags = ["utils"], + size = "large", + srcs = ["utils/exploration/tests/test_curiosity.py"] +) + py_test( name = "test_explorations", tags = ["utils"], diff --git a/rllib/agents/ppo/ppo_torch_policy.py b/rllib/agents/ppo/ppo_torch_policy.py index 3bab45ac7..c415f7b8a 100644 --- a/rllib/agents/ppo/ppo_torch_policy.py +++ b/rllib/agents/ppo/ppo_torch_policy.py @@ -74,9 +74,7 @@ class PPOLoss: return torch.sum(t[valid_mask]) / num_valid else: - - def reduce_mean_valid(t): - return torch.mean(t) + reduce_mean_valid = torch.mean prev_dist = dist_class(prev_logits, model) # Make loss functions. diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index b21c2fff7..dc7c73ce5 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -573,7 +573,8 @@ class Trainer(Trainable): # Try gym. else: import gym # soft dependency - self.env_creator = lambda env_config: gym.make(env) + self.env_creator = \ + lambda env_config: gym.make(env, **env_config) else: self.env_creator = lambda env_config: None diff --git a/rllib/evaluation/sample_batch_builder.py b/rllib/evaluation/sample_batch_builder.py index ec7164883..c725a9248 100644 --- a/rllib/evaluation/sample_batch_builder.py +++ b/rllib/evaluation/sample_batch_builder.py @@ -196,13 +196,14 @@ class MultiAgentSampleBatchBuilder: raise ValueError( "Batches sent to postprocessing must only contain steps " "from a single trajectory.", pre_batch) - post_batches[agent_id] = policy.postprocess_trajectory( - pre_batch, other_batches, episode) # Call the Policy's Exploration's postprocess method. + post_batches[agent_id] = pre_batch if getattr(policy, "exploration", None) is not None: policy.exploration.postprocess_trajectory( policy, post_batches[agent_id], getattr(policy, "_sess", None)) + post_batches[agent_id] = policy.postprocess_trajectory( + post_batches[agent_id], other_batches, episode) if log_once("after_post"): logger.info( diff --git a/rllib/models/catalog.py b/rllib/models/catalog.py index 75a377a68..37df032a0 100644 --- a/rllib/models/catalog.py +++ b/rllib/models/catalog.py @@ -37,14 +37,14 @@ logger = logging.getLogger(__name__) # __sphinx_doc_begin__ MODEL_DEFAULTS: ModelConfigDict = { # === Built-in options === + # Number of hidden layers for fully connected net + "fcnet_hiddens": [256, 256], + # Nonlinearity for fully connected net (tanh, relu) + "fcnet_activation": "tanh", # Filter config. List of [out_channels, kernel, stride] for each filter "conv_filters": None, # Nonlinearity for built-in convnet "conv_activation": "relu", - # Nonlinearity for fully connected net (tanh, relu) - "fcnet_activation": "tanh", - # Number of hidden layers for fully connected net - "fcnet_hiddens": [256, 256], # For DiagGaussian action distributions, make the second half of the model # outputs floating bias variables instead of state-dependent. This only # has an effect is using the default fully connected net. diff --git a/rllib/models/torch/fcnet.py b/rllib/models/torch/fcnet.py index 747fc221a..4080c9318 100644 --- a/rllib/models/torch/fcnet.py +++ b/rllib/models/torch/fcnet.py @@ -5,7 +5,7 @@ from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 from ray.rllib.models.torch.misc import SlimFC, AppendBiasLayer, \ normc_initializer from ray.rllib.utils.annotations import override -from ray.rllib.utils.framework import get_activation_fn, try_import_torch +from ray.rllib.utils.framework import try_import_torch torch, nn = try_import_torch() @@ -21,8 +21,7 @@ class FullyConnectedNetwork(TorchModelV2, nn.Module): model_config, name) nn.Module.__init__(self) - activation = get_activation_fn( - model_config.get("fcnet_activation"), framework="torch") + activation = model_config.get("fcnet_activation") hiddens = model_config.get("fcnet_hiddens") no_final_linear = model_config.get("no_final_linear") self.vf_share_layers = model_config.get("vf_share_layers") diff --git a/rllib/models/torch/misc.py b/rllib/models/torch/misc.py index 61a5ac972..b7f352434 100644 --- a/rllib/models/torch/misc.py +++ b/rllib/models/torch/misc.py @@ -68,20 +68,26 @@ class SlimConv2d(nn.Module): bias_init=0): super(SlimConv2d, self).__init__() layers = [] + # Padding layer. if padding: layers.append(nn.ZeroPad2d(padding)) + # Actual Conv2D layer (including correct initialization logic). conv = nn.Conv2d(in_channels, out_channels, kernel, stride) if initializer: if initializer == "default": initializer = nn.init.xavier_uniform_ initializer(conv.weight) nn.init.constant_(conv.bias, bias_init) - layers.append(conv) - if activation_fn: + # Activation function (if any; default=ReLu). + if isinstance(activation_fn, str): if activation_fn == "default": activation_fn = nn.ReLU + else: + activation_fn = get_activation_fn(activation_fn, "torch") + if activation_fn is not None: layers.append(activation_fn()) + # Put everything in sequence. self._model = nn.Sequential(*layers) def forward(self, x): @@ -100,16 +106,19 @@ class SlimFC(nn.Module): bias_init=0.0): super(SlimFC, self).__init__() layers = [] + # Actual Conv2D layer (including correct initialization logic). linear = nn.Linear(in_size, out_size, bias=use_bias) if initializer: initializer(linear.weight) if use_bias is True: nn.init.constant_(linear.bias, bias_init) layers.append(linear) + # Activation function (if any; default=None (linear)). if isinstance(activation_fn, str): activation_fn = get_activation_fn(activation_fn, "torch") if activation_fn is not None: layers.append(activation_fn()) + # Put everything in sequence. self._model = nn.Sequential(*layers) def forward(self, x): diff --git a/rllib/models/torch/recurrent_net.py b/rllib/models/torch/recurrent_net.py index 27980e5ac..1684d11c6 100644 --- a/rllib/models/torch/recurrent_net.py +++ b/rllib/models/torch/recurrent_net.py @@ -139,7 +139,7 @@ class LSTMWrapper(RecurrentNetwork, nn.Module): wrapped_out, torch.reshape(input_dict[SampleBatch.PREV_ACTIONS].float(), [-1, self.action_dim]), - torch.reshape(input_dict[SampleBatch.PREV_REWARDS], + torch.reshape(input_dict[SampleBatch.PREV_REWARDS].float(), [-1, 1]), ], dim=1) diff --git a/rllib/models/torch/visionnet.py b/rllib/models/torch/visionnet.py index ae54f0835..bdd03720f 100644 --- a/rllib/models/torch/visionnet.py +++ b/rllib/models/torch/visionnet.py @@ -5,7 +5,7 @@ from ray.rllib.models.torch.misc import normc_initializer, same_padding, \ SlimConv2d, SlimFC from ray.rllib.models.tf.visionnet_v1 import _get_filter_config from ray.rllib.utils.annotations import override -from ray.rllib.utils.framework import get_activation_fn, try_import_torch +from ray.rllib.utils.framework import try_import_torch _, nn = try_import_torch() @@ -22,8 +22,7 @@ class VisionNetwork(TorchModelV2, nn.Module): model_config, name) nn.Module.__init__(self) - activation = get_activation_fn( - self.model_config.get("conv_activation"), framework="torch") + activation = self.model_config.get("conv_activation") filters = self.model_config["conv_filters"] no_final_linear = self.model_config.get("no_final_linear") vf_share_layers = self.model_config.get("vf_share_layers") @@ -101,7 +100,10 @@ class VisionNetwork(TorchModelV2, nn.Module): self._value_branch_separate = self._value_branch = None if vf_share_layers: self._value_branch = SlimFC( - out_channels, 1, initializer=normc_initializer(0.01)) + out_channels, + 1, + initializer=normc_initializer(0.01), + activation_fn=None) else: vf_layers = [] (w, h, in_channels) = obs_space.shape @@ -136,7 +138,8 @@ class VisionNetwork(TorchModelV2, nn.Module): out_channels=1, kernel=1, stride=1, - padding=None)) + padding=None, + activation_fn=None)) self._value_branch_separate = nn.Sequential(*vf_layers) # Holds the current "base" output (before logits layer). diff --git a/rllib/policy/torch_policy.py b/rllib/policy/torch_policy.py index e8f11e413..280064a4e 100644 --- a/rllib/policy/torch_policy.py +++ b/rllib/policy/torch_policy.py @@ -2,7 +2,7 @@ import functools import gym import numpy as np import time -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple, Type, Union import ray from ray.rllib.models.modelv2 import ModelV2 @@ -47,15 +47,21 @@ class TorchPolicy(Policy): config: TrainerConfigDict, *, model: ModelV2, - loss: Callable[[Policy, ModelV2, type, SampleBatch], TensorType], - action_distribution_class: TorchDistributionWrapper, - action_sampler_fn: Callable[[TensorType, List[TensorType]], Tuple[ - TensorType, TensorType]] = None, + loss: Callable[[ + Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch + ], Union[TensorType, List[TensorType]]], + action_distribution_class: Type[TorchDistributionWrapper], + action_sampler_fn: Optional[Callable[[ + TensorType, List[TensorType] + ], Tuple[TensorType, TensorType]]] = None, action_distribution_fn: Optional[Callable[[ Policy, ModelV2, TensorType, TensorType, TensorType - ], Tuple[TensorType, type, List[TensorType]]]] = None, + ], Tuple[TensorType, Type[TorchDistributionWrapper], List[ + TensorType]]]] = None, max_seq_len: int = 20, - get_batch_divisibility_req: Optional[int] = None): + get_batch_divisibility_req: Optional[Callable[[Policy], + int]] = None, + ): """Build a policy from policy and loss torch modules. Note that model will be placed on GPU device if CUDA_VISIBLE_DEVICES @@ -69,11 +75,11 @@ class TorchPolicy(Policy): model (ModelV2): PyTorch policy module. Given observations as input, this module must return a list of outputs where the first item is action logits, and the rest can be any value. - loss (Callable[[Policy, ModelV2, type, SampleBatch], TensorType]): - Function that takes (policy, model, dist_class, train_batch) - and returns a single scalar loss. - action_distribution_class (TorchDistributionWrapper): Class for - a torch action distribution. + loss (Callable[[Policy, ModelV2, Type[TorchDistributionWrapper], + SampleBatch], Union[TensorType, List[TensorType]]]): Callable + that returns a single scalar loss or a list of loss terms. + action_distribution_class (Type[TorchDistributionWrapper]): Class + for a torch action distribution. action_sampler_fn (Callable[[TensorType, List[TensorType]], Tuple[TensorType, TensorType]]): A callable returning a sampled action and its log-likelihood given Policy, ModelV2, @@ -337,15 +343,21 @@ class TorchPolicy(Policy): batch_divisibility_req=self.batch_divisibility_req) train_batch = self._lazy_tensor_dict(postprocessed_batch) + + # Calculate the actual policy loss. loss_out = force_list( self._loss(self, self.model, self.dist_class, train_batch)) + # Call Model's custom-loss with Policy loss outputs and train_batch. if self.model: loss_out = self.model.custom_loss(loss_out, train_batch) - # Modifies the loss as specified by the Exploration strategy. + + # Give Exploration component that chance to modify the loss (or add + # its own terms). if hasattr(self, "exploration"): loss_out = self.exploration.get_exploration_loss( loss_out, train_batch) + assert len(loss_out) == len(self._optimizers) # assert not any(torch.isnan(l) for l in loss_out) fetches = self.extra_compute_grad_fetches() diff --git a/rllib/policy/torch_policy_template.py b/rllib/policy/torch_policy_template.py index a7a0c2001..13ed32024 100644 --- a/rllib/policy/torch_policy_template.py +++ b/rllib/policy/torch_policy_template.py @@ -298,9 +298,8 @@ def build_torch_policy( optimizers = TorchPolicy.optimizer(self) optimizers = force_list(optimizers) if hasattr(self, "exploration"): - exploration_optimizers = force_list( - self.exploration.get_exploration_optimizer(self.config)) - optimizers.extend(exploration_optimizers) + optimizers = self.exploration.get_exploration_optimizer( + optimizers) return optimizers @override(TorchPolicy) diff --git a/rllib/utils/exploration/curiosity.py b/rllib/utils/exploration/curiosity.py index 4191366d4..e0acf29ea 100644 --- a/rllib/utils/exploration/curiosity.py +++ b/rllib/utils/exploration/curiosity.py @@ -1,268 +1,286 @@ -""" -Curiosity-driven Exploration by Self-supervised Prediction - Pathak, Agrawal, -Efros, and Darrell - UC Berkeley - ICML 2017. - -This implements the curiosty-based loss function from -https://arxiv.org/pdf/1705.05363.pdf. We learn a simplified model of the -environment based on three networks: - 1) embedding states into latent space (the "features" network) - 2) predicting the next embedded state, given a state and action (the - "forwards" network) - 3) predicting the action, given two consecutive embedded state (the - "inverse" network) - -If the agent was unable to successfully predict the state-action-next_state -sequence, we modify the standard reward with a penalty. Therefore, if a state -transition was unexpected, the agent becomes "curious" and further explores -this transition. - -This is tailored for sparse reward environments, as it generates an intrinsic -reward. -""" -from gym.spaces import Space -from typing import Union, Optional +from gym.spaces import Discrete, Space +from typing import Optional, Tuple, Union from ray.rllib.models.action_dist import ActionDistribution +from ray.rllib.models.catalog import ModelCatalog +from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.models.torch.misc import SlimFC +from ray.rllib.models.torch.torch_action_dist import TorchCategorical +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.utils.annotations import override from ray.rllib.utils.exploration.exploration import Exploration from ray.rllib.utils.framework import try_import_torch, TensorType from ray.rllib.utils.from_config import from_config -from ray.rllib.utils.typing import SampleBatchType, TrainerConfigDict +from ray.rllib.utils.typing import FromConfigSpec, ModelConfigDict, \ + SampleBatchType torch, nn = try_import_torch() - -# TODO: (tanay) how to test if action space is discrete -""" -Example Configuration - -config = ppo.DEFAULT_CONFIG -env = "CartPole-v0" -config["framework"] = "torch" -config["exploration_config"] = { - "type": "ray.rllib.utils.exploration.curiosity_exploration.Curiosity", - "forward_net_hiddens": [64], - "inverse_net_hiddens": [32,4], - "feature_net_hiddens": [16,8], - "feature_dim": 8, - "forward_activation": "relu", - "inverse_activation": "relu", - "feature_activation": "relu", - "submodule": "EpsilonGreedy", -} -trainer = ppo.PPOTrainer(config=config, env=env) -trainer.train() -""" +F = None +if nn is not None: + F = nn.functional class Curiosity(Exploration): - def __init__(self, action_space: Space, *, framework: str, **kwargs): - """ + """Implementation of: + [1] Curiosity-driven Exploration by Self-supervised Prediction + Pathak, Agrawal, Efros, and Darrell - UC Berkeley - ICML 2017. + https://arxiv.org/pdf/1705.05363.pdf + + Learns a simplified model of the environment based on three networks: + 1) Embedding observations into latent space ("feature" network). + 2) Predicting the action, given two consecutive embedded observations + ("inverse" network). + 3) Predicting the next embedded obs, given an obs and action + ("forward" network). + + The less the agent is able to predict the actually observed next feature + vector, given obs and action (through the forwards network), the larger the + "intrinsic reward", which will be added to the extrinsic reward. + Therefore, if a state transition was unexpected, the agent becomes + "curious" and will further explore this transition leading to better + exploration in sparse rewards environments. + """ + + def __init__(self, + action_space: Space, + *, + framework: str, + model: ModelV2, + feature_dim: int = 288, + feature_net_config: Optional[ModelConfigDict] = None, + inverse_net_hiddens: Tuple[int] = (256, ), + inverse_net_activation: str = "relu", + forward_net_hiddens: Tuple[int] = (256, ), + forward_net_activation: str = "relu", + beta: float = 0.2, + eta: float = 1.0, + lr: float = 1e-3, + sub_exploration: Optional[FromConfigSpec] = None, + **kwargs): + """Initializes a Curiosity object. + + Uses as defaults the hyperparameters described in [1]. + Args: - action_space (Space): The action space in which to explore. - framework (str): One of "tf" or "torch". Currently only torch is - supported. + feature_dim (int): The dimensionality of the feature (phi) + vectors. + feature_net_config (Optional[ModelConfigDict]): Optional model + configuration for the feature network, producing feature + vectors (phi) from observations. This can be used to configure + fcnet- or conv_net setups to properly process any observation + space. + inverse_net_hiddens (Tuple[int]): Tuple of the layer sizes of the + inverse (action predicting) NN head (on top of the feature + outputs for phi and phi'). + inverse_net_activation (str): Activation specifier for the inverse + net. + forward_net_hiddens (Tuple[int]): Tuple of the layer sizes of the + forward (phi' predicting) NN head. + forward_net_activation (str): Activation specifier for the forward + net. + beta (float): Weight for the forward loss (over the inverse loss, + which gets weight=1.0-beta) in the common loss term. + eta (float): Weight for intrinsic rewards before being added to + extrinsic ones. + lr (float): The learning rate for the curiosity-specific + optimizer, optimizing feature-, inverse-, and forward nets. + sub_exploration (Optional[FromConfigSpec]): The config dict for + the underlying Exploration to use (e.g. epsilon-greedy for + DQN). If None, uses the FromSpecDict provided in the Policy's + default config. """ if framework != "torch": - raise NotImplementedError("only torch is currently supported for " - "curiosity") - - # Parse the curiosity-specific arguments - # If it was not specified in the config, assign the given default - def extract_from_kwargs(key, default): - if key in kwargs: - temp = kwargs[key] - del kwargs[key] - return temp - else: - return default - - # Casts a single int to a list, else leaves it unchanged - def cast_to_list(l): - if type(l) == int: - return [l] - else: - return l - - submodule_type = extract_from_kwargs("submodule", "StochasticSampling") - self.feature_dim = extract_from_kwargs("feature_dim", 32) - - forward_activation = extract_from_kwargs("forward_activation", nn.ReLU) - inverse_activation = extract_from_kwargs("inverse_activation", nn.ReLU) - feature_activation = extract_from_kwargs("feature_activation", nn.ReLU) - - feature_net_hiddens = cast_to_list( - extract_from_kwargs("feature_net_hiddens", [64])) - inverse_net_hiddens = cast_to_list( - extract_from_kwargs("inverse_net_hiddens", [64])) - forward_net_hiddens = cast_to_list( - extract_from_kwargs("forward_net_hiddens", [64])) + raise ValueError("Only torch is currently supported for Curiosity") + elif not isinstance(action_space, Discrete): + raise ValueError( + "Only Discrete action spaces supported for Curiosity so far.") super().__init__( - action_space=action_space, framework=framework, **kwargs) + action_space, model=model, framework=framework, **kwargs) - # TODO: what should this look like for multidimensional obs spaces - self.obs_space_dim = kwargs["model"].obs_space.shape[0] - # TODO can we always assume 1 - self.action_space_dim = 1 + self.feature_dim = feature_dim + if feature_net_config is None: + feature_net_config = self.policy_config["model"].copy() + self.feature_net_config = feature_net_config + self.inverse_net_hiddens = inverse_net_hiddens + self.inverse_net_activation = inverse_net_activation + self.forward_net_hiddens = forward_net_hiddens + self.forward_net_activation = forward_net_activation - # Given a list of layer dimensions, create a FC ReLU net. - # If layer_dims is [4,8,6] we'll have a two layer net: 4->8 and 8->6 - def create_fc_net(layer_dims, activation): - layers = [] - for i in range(len(layer_dims) - 1): - layers.append( - SlimFC( - in_size=layer_dims[i], - out_size=layer_dims[i + 1], - use_bias=False, - activation_fn=activation)) - return nn.Sequential(*layers) + self.beta = beta + self.eta = eta + self.lr = lr + # TODO: (sven) if sub_exploration is None, use Trainer's default + # Exploration config. + if sub_exploration is None: + raise NotImplementedError + self.sub_exploration = sub_exploration - # List of dimension of each layer. Appends the hidden dims. - feature_dims = [self.obs_space_dim - ] + feature_net_hiddens + [self.feature_dim] - inverse_dims = [2 * self.feature_dim - ] + inverse_net_hiddens + [self.action_space_dim] - forward_dims = [self.feature_dim + self.action_space_dim] + \ - forward_net_hiddens + [self.feature_dim] + # Creates modules/layers inside the actual ModelV2. + self._curiosity_feature_net = ModelCatalog.get_model_v2( + self.model.obs_space, + self.action_space, + self.feature_dim, + model_config=self.feature_net_config, + framework=self.framework, + name="feature_net", + ) - # Creates actual models - self.feature_model = create_fc_net(feature_dims, feature_activation) - self.inverse_model = create_fc_net(inverse_dims, inverse_activation) - self.forward_model = create_fc_net(forward_dims, forward_activation) + self._curiosity_inverse_fcnet = self._create_fc_net( + [2 * self.feature_dim] + list(self.inverse_net_hiddens) + + [self.action_space.n], self.inverse_net_activation) - # Convenient reductions - self.criterion = torch.nn.MSELoss(reduction="none") - self.criterion_reduced = torch.nn.MSELoss(reduction="sum") + self._curiosity_forward_fcnet = self._create_fc_net( + [self.feature_dim + self.action_space.n + ] + list(forward_net_hiddens) + [self.feature_dim], + self.forward_net_activation) # This is only used to select the correct action self.exploration_submodule = from_config( cls=Exploration, - config={ - "type": submodule_type, - "action_space": action_space, - "framework": framework, - "policy_config": self.policy_config, - "model": self.model, - "num_workers": self.num_workers, - "worker_index": self.worker_index - }) + config=self.sub_exploration, + action_space=self.action_space, + framework=self.framework, + policy_config=self.policy_config, + model=self.model, + num_workers=self.num_workers, + worker_index=self.worker_index, + ) + @override(Exploration) def get_exploration_action(self, *, action_distribution: ActionDistribution, timestep: Union[int, TensorType], explore: bool = True): + # Simply delegate to sub-Exploration module. + return self.exploration_submodule.get_exploration_action( + action_distribution=action_distribution, + timestep=timestep, + explore=explore) + + @override(Exploration) + def get_exploration_optimizer(self, optimizers): + feature_params = list(self._curiosity_feature_net.parameters()) + inverse_params = list(self._curiosity_inverse_fcnet.parameters()) + forward_params = list(self._curiosity_forward_fcnet.parameters()) + + # Now that the Policy's own optimizer(s) have been created (from + # the Model parameters (IMPORTANT: w/o(!) the curiosity params), + # we can add our curiosity sub-modules to the Policy's Model. + self.model._curiosity_feature_net = \ + self._curiosity_feature_net.to(self.device) + self.model._curiosity_inverse_fcnet = \ + self._curiosity_inverse_fcnet.to(self.device) + self.model._curiosity_forward_fcnet = \ + self._curiosity_forward_fcnet.to(self.device) + + # Add the Adam for curiosity NN updating to the Policy's optimizers. + return optimizers + [ + torch.optim.Adam( + forward_params + inverse_params + feature_params, lr=self.lr) + ] + + @override(Exploration) + def postprocess_trajectory(self, policy, sample_batch, tf_sess=None): + """Calculates phi values (obs, obs', and predicted obs') and ri. + + Stores calculated phi, phi' and predicted phi' as well as the intrinsic + rewards in the batch for loss processing by the policy. """ - Returns the action to take next + batch_size = sample_batch[SampleBatch.OBS].shape[0] + phis, _ = self.model._curiosity_feature_net({ + SampleBatch.OBS: torch.cat([ + torch.from_numpy(sample_batch[SampleBatch.OBS]), + torch.from_numpy(sample_batch[SampleBatch.NEXT_OBS]) + ]) + }) + phi, next_phi = phis[:batch_size], phis[batch_size:] + + # Detach phi from graph (should not backpropagate through feature net + # for forward-loss). + predicted_next_phi = self.model._curiosity_forward_fcnet( + torch.cat( + [ + phi.detach(), + F.one_hot( + torch.from_numpy( + sample_batch[SampleBatch.ACTIONS]).long(), + num_classes=self.action_space.n).float() + ], + dim=-1)) + + # Forward loss term (predicted phi', given phi and action vs actually + # observed phi'). + forward_l2_norm_sqared = 0.5 * torch.sum( + torch.pow(predicted_next_phi - next_phi, 2.0), dim=-1) + # Scale forward loss by eta hyper-parameter. + sample_batch[SampleBatch.REWARDS] = \ + sample_batch[SampleBatch.REWARDS] + \ + self.eta * forward_l2_norm_sqared.detach().cpu().numpy() + return sample_batch + + @override(Exploration) + def get_exploration_loss(self, policy_loss, train_batch: SampleBatchType): + """Adds the loss for the inverse and forward models to policy_loss. + """ + batch_size = train_batch[SampleBatch.OBS].shape[0] + phis, _ = self.model._curiosity_feature_net({ + SampleBatch.OBS: torch.cat( + [ + train_batch[SampleBatch.OBS], + train_batch[SampleBatch.NEXT_OBS] + ], + dim=0) + }) + phi, next_phi = phis[:batch_size], phis[batch_size:] + # Inverse loss term (prediced action that led from phi to phi' vs + # actual action taken). + phi_next_phi = torch.cat([phi, next_phi], dim=-1) + dist_inputs = self.model._curiosity_inverse_fcnet(phi_next_phi) + action_dist = TorchCategorical(dist_inputs, self.model) + # Neg log(p); p=probability of observed action given the inverse-NN + # predicted action distribution. + inverse_loss = -action_dist.logp(train_batch[SampleBatch.ACTIONS]) + inverse_loss = torch.mean(inverse_loss) + + # Forward loss term has already been calculated during train batch pre- + # processing (just have to weight with beta here). + predicted_next_phi = self.model._curiosity_forward_fcnet( + torch.cat( + [ + phi, + F.one_hot( + train_batch[SampleBatch.ACTIONS].long(), + num_classes=self.action_space.n).float() + ], + dim=-1)) + forward_loss = torch.mean(0.5 * torch.sum( + torch.pow(predicted_next_phi - next_phi, 2.0), dim=-1)) + + # Append our loss to the policy loss(es). + return policy_loss + [ + (1.0 - self.beta) * inverse_loss + self.beta * forward_loss + ] + + def _create_fc_net(self, layer_dims, activation): + """Given a list of layer dimensions (incl. input-dim), creates FC-net. Args: - action_distribution (ActionDistribution): The probabilistic - distribution we sample actions from - timestep (Union[int, TensorType]): - explore (bool): If true, uses the submodule strategy to select the - next action + layer_dims (Tuple[int]): Tuple of layer dims, including the input + dimension. + activation (str): An activation specifier string (e.g. "relu"). + + + Examples: + If layer_dims is [4,8,6] we'll have a two layer net: 4->8 and 8->6. """ - return self.exploration_submodule.get_exploration_action( - action_distribution=action_distribution, timestep=timestep) - - def get_exploration_loss(self, policy_loss, sample_batch: SampleBatchType): - """ - Returns the intrinsic reward associated to the explorations strategy - policy_loss (TensorType): The loss from the policy, not associated - to the exploration strategy, which we will modify - sample_batch (SampleBatchType): The SampleBatch of observations, to - which we will associate an intrinsic loss. - """ - - # Cast to torch tensors, to be fed into the model - obs_list = sample_batch["obs"].float() - next_obs_list = sample_batch["new_obs"].float() - emb_next_obs_list = self._get_latent_vector(next_obs_list).float() - actions_list = sample_batch["actions"].float() - - actions_pred = self._predict_action(obs_list, next_obs_list) - embedding_pred = self._predict_next_obs(obs_list, actions_list) - - # L2 losses for predicted action and next state - embedding_loss = self.criterion_reduced(emb_next_obs_list, - embedding_pred) - actions_loss = self.criterion_reduced( - actions_pred.squeeze(1), actions_list) - return policy_loss + [embedding_loss + actions_loss] - - def _get_latent_vector(self, obs: TensorType) -> TensorType: - """ - Returns the embedded vector phi(state) - obs (TensorType): a batch of states - """ - return self.feature_model(obs) - - def get_exploration_optimizers(self, config: TrainerConfigDict): - """Returns optimizer (or list) for environmental dynamics networks. - """ - forward_params = list(self.forward_model.parameters()) - inverse_params = list(self.inverse_model.parameters()) - feature_params = list(self.feature_model.parameters()) - - return torch.optim.Adam( - forward_params + inverse_params + feature_params, lr=1e-3) - - def postprocess_trajectory(self, - policy, - sample_batch: SampleBatchType, - tf_sess: Optional["tf.Session"] = None): - """Calculates intrinsic rewards and adds them to "rewards" in batch. - - Calculations are based on difference between predicted and actually - observed next observations. - """ - - # Extract the relevant data from the SampleBatch, and cast to Tensors - obs_list = torch.from_numpy(sample_batch["obs"]).float() - next_obs_list = torch.from_numpy(sample_batch["new_obs"]).float() - emb_next_obs_list = self._get_latent_vector(next_obs_list).float() - actions_list = torch.from_numpy(sample_batch["actions"]).float() - - # Equation (2) in paper. - actions_pred = self._predict_action(obs_list, next_obs_list) - embedding_pred = self._predict_next_obs(obs_list, actions_list) - - # A vector of L2 losses corresponding to each observation, - # Equation (7) in paper. - embedding_loss = torch.sum( - self.criterion(emb_next_obs_list, embedding_pred), dim=-1) - - # Equation (3) in paper. TODO discrete action space - actions_loss = self.criterion(actions_pred.squeeze(1), actions_list) - - # Modifies environment rewards by subtracting intrinsic rewards - sample_batch["rewards"] = sample_batch["rewards"] - \ - embedding_loss.clone().detach().numpy() - \ - actions_loss.clone().detach().numpy() - - def _predict_action(self, obs: TensorType, next_obs: TensorType): - """ - Returns the predicted action, given two states. This is the inverse - dynamics model. - - obs (TensorType): Observed state at time t. - next_obs (TensorType): Observed state at time t+1 - """ - return self.inverse_model( - torch.cat( - (self._get_latent_vector(obs), - self._get_latent_vector(next_obs)), - axis=-1)) - - # raw obs (not embedded) - def _predict_next_obs(self, obs: TensorType, action: TensorType): - """ - Returns the predicted next state, given an action and state. - - obs (TensorType): Observed state at time t. - action (TensorType): Action taken at time t - """ - return self.forward_model( - torch.cat( - (self._get_latent_vector(obs), action.unsqueeze(1)), dim=-1)) + layers = [] + for i in range(len(layer_dims) - 1): + act = activation if i < len(layer_dims) - 2 else None + layers.append( + SlimFC( + in_size=layer_dims[i], + out_size=layer_dims[i + 1], + activation_fn=act)) + return nn.Sequential(*layers) diff --git a/rllib/utils/exploration/exploration.py b/rllib/utils/exploration/exploration.py index 7a7725143..2206c829b 100644 --- a/rllib/utils/exploration/exploration.py +++ b/rllib/utils/exploration/exploration.py @@ -1,12 +1,17 @@ from gym.spaces import Space -from typing import Union +from typing import List, Optional, Union, TYPE_CHECKING from ray.rllib.models.action_dist import ActionDistribution from ray.rllib.models.modelv2 import ModelV2 +from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.annotations import DeveloperAPI from ray.rllib.utils.framework import try_import_torch, TensorType +from ray.rllib.utils.typing import LocalOptimizer, TrainerConfigDict -torch, nn = try_import_torch() +if TYPE_CHECKING: + from ray.rllib.policy.policy import Policy + +_, nn = try_import_torch() @DeveloperAPI @@ -19,13 +24,13 @@ class Exploration: """ def __init__(self, action_space: Space, *, framework: str, - policy_config: dict, model: ModelV2, num_workers: int, - worker_index: int): + policy_config: TrainerConfigDict, model: ModelV2, + num_workers: int, worker_index: int): """ Args: action_space (Space): The action space in which to explore. framework (str): One of "tf" or "torch". - policy_config (dict): The Policy's config dict. + policy_config (TrainerConfigDict): The Policy's config dict. model (ModelV2): The Policy's model. num_workers (int): The overall number of workers used. worker_index (int): The index of the worker using this class. @@ -45,17 +50,20 @@ class Exploration: self.device = params[0].device @DeveloperAPI - def before_compute_actions(self, - *, - timestep=None, - explore=None, - tf_sess=None, - **kwargs): + def before_compute_actions( + self, + *, + timestep: Optional[Union[TensorType, int]] = None, + explore: Optional[Union[TensorType, bool]] = None, + tf_sess: Optional["tf.Session"] = None, + **kwargs): """Hook for preparations before policy.compute_actions() is called. Args: - timestep (Optional[TensorType]): An optional timestep tensor. - explore (Optional[TensorType]): An optional explore boolean flag. + timestep (Optional[Union[TensorType, int]]): An optional timestep + tensor. + explore (Optional[Union[TensorType, bool]]): An optional explore + boolean flag. tf_sess (Optional[tf.Session]): The tf-session object to use. **kwargs: Forward compatibility kwargs. """ @@ -65,7 +73,7 @@ class Exploration: def get_exploration_action(self, *, action_distribution: ActionDistribution, - timestep: Union[int, TensorType], + timestep: Union[TensorType, int], explore: bool = True): """Returns a (possibly) exploratory action and its log-likelihood. @@ -76,11 +84,11 @@ class Exploration: action_distribution (ActionDistribution): The instantiated ActionDistribution object to work with when creating exploration actions. - timestep (int|TensorType): The current sampling time step. It can - be a tensor for TF graph mode, otherwise an integer. - explore (bool): True: "Normal" exploration behavior. - False: Suppress all exploratory behavior and return - a deterministic action. + timestep (Union[TensorType, int]): The current sampling time step. + It can be a tensor for TF graph mode, otherwise an integer. + explore (Union[TensorType, bool]): True: "Normal" exploration + behavior. False: Suppress all exploratory behavior and return + a deterministic action. Returns: Tuple: @@ -90,28 +98,6 @@ class Exploration: """ pass - @DeveloperAPI - def get_exploration_loss(self, policy_loss, sample_batch): - """Modifies the policy loss with a loss associated to the exploration - strategy. - - Args: - policy_loss (TODO): Loss from the Policy - sample_batch (SampleBatch): The SampleBatch object to post-process. - """ - return policy_loss - - @DeveloperAPI - def get_exploration_optimizer(self, config=None): - """ - Returns: an optimizer for the loss from get_exploration_loss (in case - the exploration strategy has trainable components) - - Args: - config: configuration for an optimizer - """ - return [] - @DeveloperAPI def on_episode_start(self, policy, @@ -147,7 +133,10 @@ class Exploration: pass @DeveloperAPI - def postprocess_trajectory(self, policy, sample_batch, tf_sess=None): + def postprocess_trajectory(self, + policy: "Policy", + sample_batch, + tf_sess=None): """Handles post-processing of done episode trajectories. Changes the given batch in place. This callback is invoked by the @@ -160,6 +149,43 @@ class Exploration: """ return sample_batch + @DeveloperAPI + def get_exploration_optimizer(self, optimizers: List[LocalOptimizer]): + """May add optimizer(s) to the Policy's own `optimizers`. + + The number of optimizers (Policy's plus Exploration's optimizers) must + match the number of loss terms produced by the Policy's loss function + and the Exploration component's loss terms. + + Args: + optimizers (List[LocalOptimizer]): The list of the Policy's + local optimizers. + + Returns: + List[LocalOptimizer]: The updated list of local optimizers to use + on the different loss terms. + """ + return optimizers + + @DeveloperAPI + def get_exploration_loss(self, policy_loss: List[TensorType], + train_batch: SampleBatch): + """May add loss term(s) to the Policy's own loss(es). + + Args: + policy_loss (List[TensorType]): Loss(es) already calculated by the + Policy's own loss function and maybe the Model's custom loss. + train_batch (SampleBatch): The training data to calculate the + loss(es) for. This train data has already gone through + this Exploration's `preprocess_train_batch()` method. + + Returns: + List[TensorType]: The updated list of loss terms. + This may be the original Policy loss(es), altered, and/or new + loss terms added to it. + """ + return policy_loss + @DeveloperAPI def get_info(self, sess=None): """Returns a description of the current exploration state. diff --git a/rllib/utils/exploration/tests/test_curiosity.py b/rllib/utils/exploration/tests/test_curiosity.py index 14a061f85..b050b60c8 100644 --- a/rllib/utils/exploration/tests/test_curiosity.py +++ b/rllib/utils/exploration/tests/test_curiosity.py @@ -1,67 +1,158 @@ +import gym +import gym_minigrid import numpy as np import ray import sys import unittest -from ray.rllib.utils import check import ray.rllib.agents.ppo as ppo +from ray.rllib.utils.test_utils import framework_iterator +from ray.rllib.utils.numpy import one_hot +from ray.tune import register_env + + +class OneHotWrapper(gym.core.ObservationWrapper): + def __init__(self, env): + super().__init__(env) + self.observation_space = gym.spaces.Box( + # 11=objects; 6=colors; 3=states + # +4: direction + 0.0, + 1.0, + shape=(49 * (11 + 6 + 3) + 4, ), + dtype=np.float32) + self.init_x = None + self.init_y = None + self.x_positions = [] + self.y_positions = [] + + def observation(self, obs): + # Debug output: max-x/y positions to watch exploration progress. + if self.step_count == 0: + if self.x_positions: + # max_diff = max( + # np.sqrt((np.array(self.x_positions) - self.init_x) ** 2 + ( + # np.array(self.y_positions) - self.init_y) ** 2)) + # print("After reset: max delta-x/y={}".format(max_diff)) + self.x_positions = [] + self.y_positions = [] + self.init_x = self.agent_pos[0] + self.init_y = self.agent_pos[1] + + # Are we carrying the key? + if self.carrying is not None: + print("Carrying KEY!!") + + self.x_positions.append(self.agent_pos[0]) + self.y_positions.append(self.agent_pos[1]) + + # One-hot the last dim into 11, 6, 3 one-hot vectors, then flatten. + objects = one_hot(obs[:, :, 0], depth=11) + colors = one_hot(obs[:, :, 1], depth=6) + states = one_hot(obs[:, :, 2], depth=3) + # Is the door we see open? + for x in range(7): + for y in range(7): + if objects[x, y, 4] == 1.0 and states[x, y, 0] == 1.0: + print("Door OPEN!!") + + all_ = np.concatenate([objects, colors, states], -1) + ret = np.reshape(all_, (-1, )) + direction = one_hot( + np.array(self.agent_dir), depth=4).astype(np.float32) + return np.concatenate([ret, direction]) + + +def env_maker(config): + name = config.get("name", "MiniGrid-Empty-5x5-v0") + env = gym.make(name) + # Only use image portion of observation (discard goal and direction). + env = gym_minigrid.wrappers.ImgObsWrapper(env) + env = OneHotWrapper(env) + return env + + +register_env("mini-grid", env_maker) +CONV_FILTERS = [[16, [11, 11], 3], [32, [9, 9], 3], [64, [5, 5], 3]] class TestCuriosity(unittest.TestCase): - - # Sets up a single ray environment for every test. - @classmethod def setUpClass(cls): - ray.init(local_mode=True) + ray.init() @classmethod def tearDownClass(cls): ray.shutdown() - def test_no_curiosity(self): - config = ppo.DEFAULT_CONFIG - env = "CartPole-v0" - dummy_obs = np.array([0.0, 0.1, 0.0, 0.0]) - prev_a = np.array(0) - config["framework"] = "torch" - config["exploration_config"] = {"type": "ParameterNoise"} - - trainer = ppo.PPOTrainer(config=config, env=env) - trainer.train() - - # Make sure all actions drawn are the same, given same - # observations. Tests the explorations API. - - actions = [] - for _ in range(5): - actions.append( - trainer.compute_action( - observation=dummy_obs, - explore=False, - prev_action=prev_a, - prev_reward=1.0 if prev_a is not None else None)) - check(actions[-1], actions[0]) - print(actions) - - def test_curiosity(self): - config = ppo.DEFAULT_CONFIG - - env = "CartPole-v0" - config["framework"] = "torch" - config["exploration_config"] = { - "type": "ray.rllib.utils.exploration.curiosity.Curiosity", - "forward_net_hiddens": [64], - "inverse_net_hiddens": [32, 4], - "feature_net_hiddens": [16, 8], - "feature_dim": 8, - "forward_activation": "relu", - "inverse_activation": "relu", - "feature_activation": "relu", - "submodule": "EpsilonGreedy", + def test_curiosity_on_large_frozen_lake(self): + config = ppo.DEFAULT_CONFIG.copy() + # A very largeĀ frozen-lake that's hard for a random policy to solve + # due to 0.0 feedback. + config["env"] = "FrozenLake-v0" + config["env_config"] = { + "desc": [ + "SFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFF", + "FFFFFFFFFFFFFFFG", + ], + "is_slippery": False } - trainer = ppo.PPOTrainer(config=config, env=env) - trainer.train() + # Limit horizon to make it really hard for non-curious agent to reach + # the goal state. + config["horizon"] = 40 + config["num_workers"] = 0 # local only + config["train_batch_size"] = 512 + config["num_sgd_iter"] = 10 + + num_iterations = 30 + for _ in framework_iterator(config, frameworks="torch"): + # W/ Curiosity. + config["exploration_config"] = { + "type": "Curiosity", + "feature_dim": 128, + "eta": 0.05, + "sub_exploration": { + "type": "StochasticSampling", + } + } + trainer = ppo.PPOTrainer(config=config) + rewards_w = 0.0 + for _ in range(num_iterations): + result = trainer.train() + rewards_w += result["episode_reward_mean"] + print(result) + rewards_w /= num_iterations + trainer.stop() + + # W/o Curiosity. + config["exploration_config"] = { + "type": "StochasticSampling", + } + trainer = ppo.PPOTrainer(config=config) + rewards_wo = 0.0 + for _ in range(num_iterations): + result = trainer.train() + rewards_wo += result["episode_reward_mean"] + print(result) + rewards_wo /= num_iterations + trainer.stop() + + self.assertTrue(rewards_wo == 0.0) + self.assertGreater(rewards_w, 0.1) if __name__ == "__main__": diff --git a/rllib/utils/spaces/space_utils.py b/rllib/utils/spaces/space_utils.py index 06acd8869..ff0463983 100644 --- a/rllib/utils/spaces/space_utils.py +++ b/rllib/utils/spaces/space_utils.py @@ -67,7 +67,7 @@ def flatten_to_single_ndarray(input_): """Returns a single np.ndarray given a list/tuple of np.ndarrays. Args: - input_ (Union[List[np.ndarray],np.ndarray]): The list of ndarrays or + input_ (Union[List[np.ndarray], np.ndarray]): The list of ndarrays or a single ndarray. Returns: diff --git a/rllib/utils/typing.py b/rllib/utils/typing.py index f52f5850c..96a8893ef 100644 --- a/rllib/utils/typing.py +++ b/rllib/utils/typing.py @@ -19,6 +19,10 @@ EnvConfigDict = dict # the model catalog. ModelConfigDict = dict +# Objects that can be created through the `from_config()` util method +# need a config dict with a "type" key, a class path (str), or a type directly. +FromConfigSpec = Union[Dict[str, Any], type, str] + # Represents a BaseEnv, MultiAgentEnv, ExternalEnv, ExternalMultiAgentEnv, # VectorEnv, or gym.Env. EnvType = Any @@ -61,6 +65,10 @@ FileType = Any # Represents the result dict returned by Trainer.train(). ResultDict = dict +# A tf or torch local optimizer object. +LocalOptimizer = Union["tf.keras.optimizers.Optimizer", + "torch.optim.Optimizer"] + # Dict of tensors returned by compute gradients on the policy, e.g., # {"td_error": [...], "learner_stats": {"vf_loss": ..., ...}}, for multi-agent, # {"policy1": {"learner_stats": ..., }, "policy2": ...}.