From c957ed58edfbfb9d9574e8ea1c73d12c1002d7c0 Mon Sep 17 00:00:00 2001 From: Sven Mika Date: Tue, 21 Jan 2020 08:06:50 +0100 Subject: [PATCH] [RLlib] Implement PPO torch version. (#6826) --- ci/jenkins_tests/run_rllib_tests.sh | 12 +- doc/source/rllib-concepts.rst | 2 +- rllib/agents/ppo/__init__.py | 5 +- rllib/agents/ppo/appo_policy.py | 2 +- rllib/agents/ppo/ppo.py | 27 ++- .../ppo/{ppo_policy.py => ppo_tf_policy.py} | 40 ++-- rllib/agents/ppo/ppo_torch_policy.py | 223 ++++++++++++++++++ rllib/agents/ppo/{test => tests}/test.py | 0 rllib/agents/ppo/tests/test_ppo.py | 182 ++++++++++++++ rllib/evaluation/postprocessing.py | 5 +- rllib/examples/centralized_critic.py | 5 +- rllib/examples/multiagent_two_trainers.py | 2 +- rllib/models/catalog.py | 57 +++-- rllib/policy/policy.py | 4 +- rllib/policy/torch_policy.py | 3 +- rllib/policy/torch_policy_template.py | 7 +- rllib/tests/test_env_with_subprocess.py | 4 +- rllib/tests/test_legacy.py | 11 - rllib/tests/test_multi_agent_env.py | 4 +- rllib/tests/test_optimizers.py | 2 +- rllib/tests/test_rollout_worker.py | 4 +- .../compact-regression-test.yaml | 26 +- ...cartpole-ppo.yaml => cartpole-ppo-tf.yaml} | 2 +- rllib/utils/deprecation.py | 11 +- rllib/utils/explained_variance.py | 8 +- 25 files changed, 555 insertions(+), 93 deletions(-) rename rllib/agents/ppo/{ppo_policy.py => ppo_tf_policy.py} (91%) create mode 100644 rllib/agents/ppo/ppo_torch_policy.py rename rllib/agents/ppo/{test => tests}/test.py (100%) create mode 100644 rllib/agents/ppo/tests/test_ppo.py delete mode 100644 rllib/tests/test_legacy.py rename rllib/tuned_examples/regression_tests/{cartpole-ppo.yaml => cartpole-ppo-tf.yaml} (92%) diff --git a/ci/jenkins_tests/run_rllib_tests.sh b/ci/jenkins_tests/run_rllib_tests.sh index f86ec51c2..98cbd0c83 100755 --- a/ci/jenkins_tests/run_rllib_tests.sh +++ b/ci/jenkins_tests/run_rllib_tests.sh @@ -34,6 +34,9 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --stop 
'{"training_iteration": 1}' \ --config '{"num_workers": 2}' +docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + /ray/ci/suppress_output python /ray/rllib/agents/ppo/tests/test_ppo.py + docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output /ray/rllib/train.py \ --env CartPole-v1 \ @@ -305,9 +308,6 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/rllib/tests/test_dependency.py -docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_legacy.py - docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/rllib/tests/test_io.py @@ -365,9 +365,6 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/rllib/tests/test_supported_spaces.py -docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - /ray/ci/suppress_output python /ray/rllib/tests/test_env_with_subprocess.py - docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output /ray/rllib/tests/test_rollout.sh @@ -493,3 +490,6 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/rllib/examples/custom_keras_rnn_model.py --run=PPO --stop=50 --env=RepeatInitialEnv + +docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + /ray/ci/suppress_output python /ray/rllib/tests/test_env_with_subprocess.py diff --git a/doc/source/rllib-concepts.rst b/doc/source/rllib-concepts.rst index 505b69c50..00fc9daf2 100644 --- 
a/doc/source/rllib-concepts.rst +++ b/doc/source/rllib-concepts.rst @@ -537,7 +537,7 @@ You can use the ``with_updates`` method on Trainers and Policy objects built wit .. code-block:: python from ray.rllib.agents.ppo import PPOTrainer - from ray.rllib.agents.ppo.ppo_policy import PPOTFPolicy + from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy CustomPolicy = PPOTFPolicy.with_updates( name="MyCustomPPOTFPolicy", diff --git a/rllib/agents/ppo/__init__.py b/rllib/agents/ppo/__init__.py index a3d492baf..239008743 100644 --- a/rllib/agents/ppo/__init__.py +++ b/rllib/agents/ppo/__init__.py @@ -1,7 +1,4 @@ from ray.rllib.agents.ppo.ppo import PPOTrainer, DEFAULT_CONFIG from ray.rllib.agents.ppo.appo import APPOTrainer -from ray.rllib.utils import renamed_agent -PPOAgent = renamed_agent(PPOTrainer) - -__all__ = ["PPOAgent", "APPOTrainer", "PPOTrainer", "DEFAULT_CONFIG"] +__all__ = ["APPOTrainer", "PPOTrainer", "DEFAULT_CONFIG"] diff --git a/rllib/agents/ppo/appo_policy.py b/rllib/agents/ppo/appo_policy.py index 46094b69f..dcfecdd89 100644 --- a/rllib/agents/ppo/appo_policy.py +++ b/rllib/agents/ppo/appo_policy.py @@ -16,7 +16,7 @@ from ray.rllib.evaluation.postprocessing import compute_advantages from ray.rllib.utils import try_import_tf from ray.rllib.policy.tf_policy_template import build_tf_policy from ray.rllib.policy.tf_policy import LearningRateSchedule, TFPolicy -from ray.rllib.agents.ppo.ppo_policy import KLCoeffMixin, ValueNetworkMixin +from ray.rllib.agents.ppo.ppo_tf_policy import KLCoeffMixin, ValueNetworkMixin from ray.rllib.models import ModelCatalog from ray.rllib.utils.annotations import override from ray.rllib.utils.explained_variance import explained_variance diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index 9e96a7f8b..363705afe 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -1,12 +1,13 @@ import logging from ray.rllib.agents import with_common_config -from ray.rllib.agents.ppo.ppo_policy import 
PPOTFPolicy +from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy from ray.rllib.agents.trainer_template import build_trainer from ray.rllib.optimizers import SyncSamplesOptimizer, LocalMultiGPUOptimizer from ray.rllib.utils import try_import_tf tf = try_import_tf() + logger = logging.getLogger(__name__) # yapf: disable @@ -63,6 +64,8 @@ DEFAULT_CONFIG = with_common_config({ # usually slower, but you might want to try it if you run into issues with # the default optimizer. "simple_optimizer": False, + # Use PyTorch as framework? + "use_pytorch": False }) # __sphinx_doc_end__ # yapf: enable @@ -138,23 +141,18 @@ def warn_about_bad_reward_scales(trainer, result): "This means that it will take more than " "{} iterations for your value ".format(rew_scale) + "function to converge. If this is not intended, consider " - "increasing `vf_clip_param`." - ) + "increasing `vf_clip_param`.") def validate_config(config): - # PyTorch check. - if config["use_pytorch"]: - raise ValueError("PPO does not support PyTorch yet! Use tf instead.") if config["entropy_coeff"] < 0: raise DeprecationWarning("entropy_coeff must be >= 0") if isinstance(config["entropy_coeff"], int): config["entropy_coeff"] = float(config["entropy_coeff"]) if config["sgd_minibatch_size"] > config["train_batch_size"]: raise ValueError( - "Minibatch size {} must be <= train batch size {}.". - format(config["sgd_minibatch_size"], config["train_batch_size"]) - ) + "Minibatch size {} must be <= train batch size {}.".format( + config["sgd_minibatch_size"], config["train_batch_size"])) if config["batch_mode"] == "truncate_episodes" and not config["use_gae"]: raise ValueError( "Episode truncation is not supported without a value " @@ -168,14 +166,23 @@ def validate_config(config): logger.warning( "Using the simple minibatch optimizer. 
This will significantly " "reduce performance, consider simple_optimizer=False.") - elif tf and tf.executing_eagerly(): + elif config["use_pytorch"] or (tf and tf.executing_eagerly()): config["simple_optimizer"] = True # multi-gpu not supported +def get_policy_class(config): + if config.get("use_pytorch") is True: + from ray.rllib.agents.ppo.ppo_torch_policy import PPOTorchPolicy + return PPOTorchPolicy + else: + return PPOTFPolicy + + PPOTrainer = build_trainer( name="PPO", default_config=DEFAULT_CONFIG, default_policy=PPOTFPolicy, + get_policy_class=get_policy_class, make_policy_optimizer=choose_policy_optimizer, validate_config=validate_config, after_optimizer_step=update_kl, diff --git a/rllib/agents/ppo/ppo_policy.py b/rllib/agents/ppo/ppo_tf_policy.py similarity index 91% rename from rllib/agents/ppo/ppo_policy.py rename to rllib/agents/ppo/ppo_tf_policy.py index ef611ab92..e3d643a3e 100644 --- a/rllib/agents/ppo/ppo_policy.py +++ b/rllib/agents/ppo/ppo_tf_policy.py @@ -1,11 +1,13 @@ import logging import ray +from ray.rllib.agents.impala.vtrace_policy import BEHAVIOUR_LOGITS from ray.rllib.evaluation.postprocessing import compute_advantages, \ Postprocessing from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.policy import ACTION_LOGP from ray.rllib.policy.tf_policy import LearningRateSchedule, \ - EntropyCoeffSchedule, ACTION_LOGP + EntropyCoeffSchedule from ray.rllib.policy.tf_policy_template import build_tf_policy from ray.rllib.utils.explained_variance import explained_variance from ray.rllib.utils.tf_ops import make_tf_callable @@ -15,13 +17,9 @@ tf = try_import_tf() logger = logging.getLogger(__name__) -# Frozen logits of the policy that computed the action -BEHAVIOUR_LOGITS = "behaviour_logits" - class PPOLoss: def __init__(self, - action_space, dist_class, model, value_targets, @@ -38,12 +36,10 @@ class PPOLoss: clip_param=0.1, vf_clip_param=0.1, vf_loss_coeff=1.0, - use_gae=True, - model_config=None): + use_gae=True): 
"""Constructs the loss for Proximal Policy Objective. Arguments: - action_space: Environment observation space specification. dist_class: action distribution class for logits. value_targets (Placeholder): Placeholder for target values; used for GAE. @@ -53,27 +49,32 @@ class PPOLoss: from previous model evaluation. prev_logits (Placeholder): Placeholder for logits output from previous model evaluation. - prev_actions_logp (Placeholder): Placeholder for prob output from - previous model evaluation. + prev_actions_logp (Placeholder): Placeholder for action prob output + from the previous (before update) Model evaluation. vf_preds (Placeholder): Placeholder for value function output - from previous model evaluation. + from the previous (before update) Model evaluation. curr_action_dist (ActionDistribution): ActionDistribution of the current model. value_fn (Tensor): Current value function output Tensor. cur_kl_coeff (Variable): Variable holding the current PPO KL coefficient. - valid_mask (Tensor): A bool mask of valid input elements (#2992). + valid_mask (Optional[tf.Tensor]): An optional bool mask of valid + input elements (for max-len padded sequences (RNNs)). entropy_coeff (float): Coefficient of the entropy regularizer. clip_param (float): Clip parameter vf_clip_param (float): Clip parameter for the value function vf_loss_coeff (float): Coefficient of the value function loss use_gae (bool): If true, use the Generalized Advantage Estimator. - model_config (dict): (Optional) model config for use in specifying - action distributions. """ + if valid_mask is not None: - def reduce_mean_valid(t): - return tf.reduce_mean(tf.boolean_mask(t, valid_mask)) + def reduce_mean_valid(t): + return tf.reduce_mean(tf.boolean_mask(t, valid_mask)) + + else: + + def reduce_mean_valid(t): + return tf.reduce_mean(t) prev_dist = dist_class(prev_logits, model) # Make loss functions. 
@@ -112,16 +113,13 @@ def ppo_surrogate_loss(policy, model, dist_class, train_batch): logits, state = model.from_batch(train_batch) action_dist = dist_class(logits, model) + mask = None if state: max_seq_len = tf.reduce_max(train_batch["seq_lens"]) mask = tf.sequence_mask(train_batch["seq_lens"], max_seq_len) mask = tf.reshape(mask, [-1]) - else: - mask = tf.ones_like( - train_batch[Postprocessing.ADVANTAGES], dtype=tf.bool) policy.loss_obj = PPOLoss( - policy.action_space, dist_class, model, train_batch[Postprocessing.VALUE_TARGETS], @@ -139,7 +137,7 @@ def ppo_surrogate_loss(policy, model, dist_class, train_batch): vf_clip_param=policy.config["vf_clip_param"], vf_loss_coeff=policy.config["vf_loss_coeff"], use_gae=policy.config["use_gae"], - model_config=policy.config["model"]) + ) return policy.loss_obj.loss diff --git a/rllib/agents/ppo/ppo_torch_policy.py b/rllib/agents/ppo/ppo_torch_policy.py new file mode 100644 index 000000000..92bd3c8fb --- /dev/null +++ b/rllib/agents/ppo/ppo_torch_policy.py @@ -0,0 +1,223 @@ +import logging + +import ray +from ray.rllib.agents.impala.vtrace_policy import BEHAVIOUR_LOGITS +from ray.rllib.agents.a3c.a3c_torch_policy import apply_grad_clipping +from ray.rllib.agents.ppo.ppo_tf_policy import postprocess_ppo_gae, \ + setup_config +from ray.rllib.evaluation.postprocessing import Postprocessing +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.policy import ACTION_LOGP +from ray.rllib.policy.torch_policy import EntropyCoeffSchedule, \ + LearningRateSchedule +from ray.rllib.policy.torch_policy_template import build_torch_policy +from ray.rllib.utils.explained_variance import explained_variance +from ray.rllib.utils.torch_ops import sequence_mask +from ray.rllib.utils import try_import_torch + +torch, nn = try_import_torch() + +logger = logging.getLogger(__name__) + + +class PPOLoss: + def __init__(self, + dist_class, + model, + value_targets, + advantages, + actions, + prev_logits, + prev_actions_logp, 
+ vf_preds, + curr_action_dist, + value_fn, + cur_kl_coeff, + valid_mask, + entropy_coeff=0, + clip_param=0.1, + vf_clip_param=0.1, + vf_loss_coeff=1.0, + use_gae=True): + """Constructs the loss for Proximal Policy Objective. + + Arguments: + dist_class: action distribution class for logits. + value_targets (Placeholder): Placeholder for target values; used + for GAE. + actions (Placeholder): Placeholder for actions taken + from previous model evaluation. + advantages (Placeholder): Placeholder for calculated advantages + from previous model evaluation. + prev_logits (Placeholder): Placeholder for logits output from + previous model evaluation. + prev_actions_logp (Placeholder): Placeholder for prob output from + previous model evaluation. + vf_preds (Placeholder): Placeholder for value function output + from previous model evaluation. + curr_action_dist (ActionDistribution): ActionDistribution + of the current model. + value_fn (Tensor): Current value function output Tensor. + cur_kl_coeff (Variable): Variable holding the current PPO KL + coefficient. + valid_mask (Tensor): A bool mask of valid input elements (#2992). + entropy_coeff (float): Coefficient of the entropy regularizer. + clip_param (float): Clip parameter + vf_clip_param (float): Clip parameter for the value function + vf_loss_coeff (float): Coefficient of the value function loss + use_gae (bool): If true, use the Generalized Advantage Estimator. + """ + + def reduce_mean_valid(t): + return torch.mean(t * valid_mask) + + prev_dist = dist_class(prev_logits, model) + # Make loss functions. 
+ logp_ratio = torch.exp( + curr_action_dist.logp(actions) - prev_actions_logp) + action_kl = prev_dist.kl(curr_action_dist) + self.mean_kl = reduce_mean_valid(action_kl) + + curr_entropy = curr_action_dist.entropy() + self.mean_entropy = reduce_mean_valid(curr_entropy) + + surrogate_loss = torch.min( + advantages * logp_ratio, + advantages * torch.clamp(logp_ratio, 1 - clip_param, + 1 + clip_param)) + self.mean_policy_loss = reduce_mean_valid(-surrogate_loss) + + if use_gae: + vf_loss1 = torch.pow(value_fn - value_targets, 2.0) + vf_clipped = vf_preds + torch.clamp(value_fn - vf_preds, + -vf_clip_param, vf_clip_param) + vf_loss2 = torch.pow(vf_clipped - value_targets, 2.0) + vf_loss = torch.max(vf_loss1, vf_loss2) + self.mean_vf_loss = reduce_mean_valid(vf_loss) + loss = reduce_mean_valid( + -surrogate_loss + cur_kl_coeff * action_kl + + vf_loss_coeff * vf_loss - entropy_coeff * curr_entropy) + else: + self.mean_vf_loss = 0.0 + loss = reduce_mean_valid(-surrogate_loss + + cur_kl_coeff * action_kl - + entropy_coeff * curr_entropy) + self.loss = loss + + +def ppo_surrogate_loss(policy, model, dist_class, train_batch): + logits, state = model.from_batch(train_batch) + action_dist = dist_class(logits, model) + + if state: + max_seq_len = torch.max(train_batch["seq_lens"]) + mask = sequence_mask(train_batch["seq_lens"], max_seq_len) + mask = torch.reshape(mask, [-1]) + else: + mask = torch.ones_like( + train_batch[Postprocessing.ADVANTAGES], dtype=torch.bool) + + policy.loss_obj = PPOLoss( + dist_class, + model, + train_batch[Postprocessing.VALUE_TARGETS], + train_batch[Postprocessing.ADVANTAGES], + train_batch[SampleBatch.ACTIONS], + train_batch[BEHAVIOUR_LOGITS], + train_batch[ACTION_LOGP], + train_batch[SampleBatch.VF_PREDS], + action_dist, + model.value_function(), + policy.kl_coeff, + mask, + entropy_coeff=policy.entropy_coeff, + clip_param=policy.config["clip_param"], + vf_clip_param=policy.config["vf_clip_param"], + vf_loss_coeff=policy.config["vf_loss_coeff"], 
+ use_gae=policy.config["use_gae"], + ) + + return policy.loss_obj.loss + + +def kl_and_loss_stats(policy, train_batch): + return { + "cur_kl_coeff": policy.kl_coeff, + "cur_lr": policy.cur_lr, + "total_loss": policy.loss_obj.loss.cpu().detach().numpy(), + "policy_loss": policy.loss_obj.mean_policy_loss.cpu().detach().numpy(), + "vf_loss": policy.loss_obj.mean_vf_loss.cpu().detach().numpy(), + "vf_explained_var": explained_variance( + train_batch[Postprocessing.VALUE_TARGETS], + policy.model.value_function(), + framework="torch").cpu().detach().numpy(), + "kl": policy.loss_obj.mean_kl.cpu().detach().numpy(), + "entropy": policy.loss_obj.mean_entropy.cpu().detach().numpy(), + "entropy_coeff": policy.entropy_coeff, + } + + +def vf_preds_and_logits_fetches(policy, input_dict, state_batches, model, + action_dist): + """Adds value function and logits outputs to experience train_batches.""" + return { + SampleBatch.VF_PREDS: policy.model.value_function(), + BEHAVIOUR_LOGITS: policy.model.last_output().numpy(), + ACTION_LOGP: action_dist.logp(input_dict[SampleBatch.ACTIONS]) + } + + +class KLCoeffMixin: + def __init__(self, config): + # KL Coefficient. 
+ self.kl_coeff = config["kl_coeff"] + self.kl_target = config["kl_target"] + + def update_kl(self, sampled_kl): + if sampled_kl > 2.0 * self.kl_target: + self.kl_coeff *= 1.5 + elif sampled_kl < 0.5 * self.kl_target: + self.kl_coeff *= 0.5 + return self.kl_coeff + + +class ValueNetworkMixin: + def __init__(self, obs_space, action_space, config): + if config["use_gae"]: + + def value(ob, prev_action, prev_reward, *state): + model_out, _ = self.model({ + SampleBatch.CUR_OBS: torch.Tensor([ob]), + SampleBatch.PREV_ACTIONS: torch.Tensor([prev_action]), + SampleBatch.PREV_REWARDS: torch.Tensor([prev_reward]), + "is_training": False, + }, [torch.Tensor([s]) for s in state], torch.Tensor([1])) + return self.model.value_function()[0] + + else: + + def value(ob, prev_action, prev_reward, *state): + return 0.0 + + self._value = value + + +def setup_mixins(policy, obs_space, action_space, config): + ValueNetworkMixin.__init__(policy, obs_space, action_space, config) + KLCoeffMixin.__init__(policy, config) + EntropyCoeffSchedule.__init__(policy, config["entropy_coeff"], + config["entropy_coeff_schedule"]) + LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"]) + + +PPOTorchPolicy = build_torch_policy( + name="PPOTorchPolicy", + get_default_config=lambda: ray.rllib.agents.ppo.ppo.DEFAULT_CONFIG, + loss_fn=ppo_surrogate_loss, + stats_fn=kl_and_loss_stats, + extra_action_out_fn=vf_preds_and_logits_fetches, + postprocess_fn=postprocess_ppo_gae, + extra_grad_process_fn=apply_grad_clipping, + before_init=setup_config, + after_init=setup_mixins, + mixins=[KLCoeffMixin, ValueNetworkMixin]) diff --git a/rllib/agents/ppo/test/test.py b/rllib/agents/ppo/tests/test.py similarity index 100% rename from rllib/agents/ppo/test/test.py rename to rllib/agents/ppo/tests/test.py diff --git a/rllib/agents/ppo/tests/test_ppo.py b/rllib/agents/ppo/tests/test_ppo.py new file mode 100644 index 000000000..6f0b5a977 --- /dev/null +++ b/rllib/agents/ppo/tests/test_ppo.py @@ -0,0 
+1,182 @@ +import numpy as np +import unittest + +import ray +from ray.rllib.agents.impala.vtrace_policy import BEHAVIOUR_LOGITS +import ray.rllib.agents.ppo as ppo +from ray.rllib.agents.ppo.ppo_tf_policy import postprocess_ppo_gae as \ + postprocess_ppo_gae_tf, ppo_surrogate_loss as ppo_surrogate_loss_tf +from ray.rllib.agents.ppo.ppo_torch_policy import postprocess_ppo_gae as \ + postprocess_ppo_gae_torch, ppo_surrogate_loss as ppo_surrogate_loss_torch +from ray.rllib.evaluation.postprocessing import Postprocessing +from ray.rllib.models.tf.tf_action_dist import Categorical +from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 +from ray.rllib.models.torch.torch_action_dist import TorchCategorical +from ray.rllib.policy.policy import ACTION_LOGP +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.utils.numpy import fc +from ray.rllib.utils.test_utils import check + + +class TestPPO(unittest.TestCase): + + ray.init() + + def test_ppo_compilation(self): + """Test whether a PPOTrainer can be built with both frameworks.""" + config = ppo.DEFAULT_CONFIG.copy() + config["num_workers"] = 0 # Run locally. + + # tf. + trainer = ppo.PPOTrainer(config=config, env="CartPole-v0") + + num_iterations = 2 + for i in range(num_iterations): + trainer.train() + + # Torch. + config["use_pytorch"] = True + config["simple_optimizer"] = True + trainer = ppo.PPOTrainer(config=config, env="CartPole-v0") + for i in range(num_iterations): + trainer.train() + + def test_ppo_loss_function(self): + """Tests the PPO loss function math.""" + config = ppo.DEFAULT_CONFIG.copy() + config["num_workers"] = 0 # Run locally. + config["eager"] = True + config["gamma"] = 0.99 + config["model"]["fcnet_hiddens"] = [10] + config["model"]["fcnet_activation"] = "linear" + + # Fake CartPole episode of n time steps. 
+ train_batch = { + SampleBatch.CUR_OBS: np.array( + [[0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8], + [0.9, 1.0, 1.1, 1.2]], + dtype=np.float32), + SampleBatch.ACTIONS: np.array([0, 1, 1]), + SampleBatch.REWARDS: np.array([1.0, -1.0, .5], dtype=np.float32), + SampleBatch.DONES: np.array([False, False, True]), + SampleBatch.VF_PREDS: np.array([0.5, 0.6, 0.7], dtype=np.float32), + BEHAVIOUR_LOGITS: np.array( + [[-2., 0.5], [-3., -0.3], [-0.1, 2.5]], dtype=np.float32), + ACTION_LOGP: np.array([-0.5, -0.1, -0.2], dtype=np.float32) + } + + # tf. + trainer = ppo.PPOTrainer(config=config, env="CartPole-v0") + policy = trainer.get_policy() + + # Post-process (calculate simple (non-GAE) advantages) and attach to + # train_batch dict. + # A = [0.99^2 * 0.5 + 0.99 * -1.0 + 1.0, 0.99 * 0.5 - 1.0, 0.5] = + # [0.50005, -0.505, 0.5] + train_batch = postprocess_ppo_gae_tf(policy, train_batch) + # Check Advantage values. + check(train_batch[Postprocessing.VALUE_TARGETS], + [0.50005, -0.505, 0.5]) + + # Calculate actual PPO loss (results are stored in policy.loss_obj) for + # tf. + ppo_surrogate_loss_tf(policy, policy.model, Categorical, train_batch) + + vars = policy.model.trainable_variables() + expected_logits = fc( + fc(train_batch[SampleBatch.CUR_OBS], vars[0].numpy(), + vars[1].numpy()), vars[4].numpy(), vars[5].numpy()) + expected_value_outs = fc( + fc(train_batch[SampleBatch.CUR_OBS], vars[2].numpy(), + vars[3].numpy()), vars[6].numpy(), vars[7].numpy()) + + kl, entropy, pg_loss, vf_loss, overall_loss = \ + self._ppo_loss_helper( + policy, policy.model, Categorical, train_batch, + expected_logits, expected_value_outs + ) + check(kl, policy.loss_obj.mean_kl) + check(entropy, policy.loss_obj.mean_entropy) + check(np.mean(-pg_loss), policy.loss_obj.mean_policy_loss) + check(np.mean(vf_loss), policy.loss_obj.mean_vf_loss, decimals=4) + check(policy.loss_obj.loss.numpy(), overall_loss, decimals=4) + + # Torch. 
+ config["use_pytorch"] = True + trainer = ppo.PPOTrainer(config=config, env="CartPole-v0") + policy = trainer.get_policy() + train_batch = postprocess_ppo_gae_torch(policy, train_batch) + train_batch = policy._lazy_tensor_dict(train_batch) + + # Check Advantage values. + check(train_batch[Postprocessing.VALUE_TARGETS], + [0.50005, -0.505, 0.5]) + + # Calculate actual PPO loss (results are stored in policy.loss_obj) + # for torch. + ppo_surrogate_loss_torch(policy, policy.model, TorchCategorical, + train_batch) + + kl, entropy, pg_loss, vf_loss, overall_loss = \ + self._ppo_loss_helper( + policy, policy.model, TorchCategorical, train_batch, + policy.model._last_output, + policy.model.value_function().detach().numpy() + ) + check(kl, policy.loss_obj.mean_kl.detach().numpy()) + check(entropy, policy.loss_obj.mean_entropy.detach().numpy()) + check( + np.mean(-pg_loss), + policy.loss_obj.mean_policy_loss.detach().numpy()) + check( + np.mean(vf_loss), + policy.loss_obj.mean_vf_loss.detach().numpy(), + decimals=4) + check(policy.loss_obj.loss.detach().numpy(), overall_loss, decimals=4) + + def _ppo_loss_helper(self, policy, model, dist_class, train_batch, logits, + vf_outs): + """ + Calculates the expected PPO loss (components) given Policy, + Model, distribution, some batch, logits & vf outputs, using numpy. + """ + # Calculate expected PPO loss results. + dist = dist_class(logits, policy.model) + dist_prev = dist_class(train_batch[BEHAVIOUR_LOGITS], policy.model) + expected_logp = dist.logp(train_batch[SampleBatch.ACTIONS]) + if isinstance(model, TorchModelV2): + expected_rho = np.exp(expected_logp.detach().numpy() - + train_batch.get(ACTION_LOGP)) + # KL(prev vs current action dist)-loss component. + kl = np.mean(dist_prev.kl(dist).detach().numpy()) + # Entropy-loss component. + entropy = np.mean(dist.entropy().detach().numpy()) + else: + expected_rho = np.exp(expected_logp - train_batch[ACTION_LOGP]) + # KL(prev vs current action dist)-loss component.
+ kl = np.mean(dist_prev.kl(dist)) + # Entropy-loss component. + entropy = np.mean(dist.entropy()) + + # Policy loss component. + pg_loss = np.minimum( + train_batch.get(Postprocessing.ADVANTAGES) * expected_rho, + train_batch.get(Postprocessing.ADVANTAGES) * np.clip( + expected_rho, 1 - policy.config["clip_param"], + 1 + policy.config["clip_param"])) + + # Value function loss component. + vf_loss1 = np.power( + vf_outs - train_batch.get(Postprocessing.VALUE_TARGETS), 2.0) + vf_clipped = train_batch.get(SampleBatch.VF_PREDS) + np.clip( + vf_outs - train_batch.get(SampleBatch.VF_PREDS), + -policy.config["vf_clip_param"], policy.config["vf_clip_param"]) + vf_loss2 = np.power( + vf_clipped - train_batch.get(Postprocessing.VALUE_TARGETS), 2.0) + vf_loss = np.maximum(vf_loss1, vf_loss2) + + # Overall loss. + overall_loss = np.mean(-pg_loss + policy.kl_coeff * kl + + policy.config["vf_loss_coeff"] * vf_loss - + policy.entropy_coeff * entropy) + return kl, entropy, pg_loss, vf_loss, overall_loss diff --git a/rllib/evaluation/postprocessing.py b/rllib/evaluation/postprocessing.py index acb201781..62c6365ec 100644 --- a/rllib/evaluation/postprocessing.py +++ b/rllib/evaluation/postprocessing.py @@ -17,7 +17,8 @@ class Postprocessing: @DeveloperAPI def compute_advantages(rollout, last_r, gamma=0.9, lambda_=1.0, use_gae=True): - """Given a rollout, compute its value targets and the advantage. + """ + Given a rollout, compute its value targets and the advantage. 
Args: rollout (SampleBatch): SampleBatch of a single trajectory @@ -43,7 +44,7 @@ def compute_advantages(rollout, last_r, gamma=0.9, lambda_=1.0, use_gae=True): np.array([last_r])]) delta_t = ( traj[SampleBatch.REWARDS] + gamma * vpred_t[1:] - vpred_t[:-1]) - # This formula for the advantage comes + # This formula for the advantage comes from: # "Generalized Advantage Estimation": https://arxiv.org/abs/1506.02438 traj[Postprocessing.ADVANTAGES] = discount(delta_t, gamma * lambda_) traj[Postprocessing.VALUE_TARGETS] = ( diff --git a/rllib/examples/centralized_critic.py b/rllib/examples/centralized_critic.py index 9a5fdb54e..8679a3ef1 100644 --- a/rllib/examples/centralized_critic.py +++ b/rllib/examples/centralized_critic.py @@ -17,9 +17,10 @@ import numpy as np from gym.spaces import Discrete from ray import tune +from ray.rllib.agents.impala.vtrace_policy import BEHAVIOUR_LOGITS from ray.rllib.agents.ppo.ppo import PPOTrainer -from ray.rllib.agents.ppo.ppo_policy import PPOTFPolicy, KLCoeffMixin, \ - PPOLoss, BEHAVIOUR_LOGITS +from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy, KLCoeffMixin, \ + PPOLoss from ray.rllib.evaluation.postprocessing import compute_advantages, \ Postprocessing from ray.rllib.examples.twostep_game import TwoStepGame diff --git a/rllib/examples/multiagent_two_trainers.py b/rllib/examples/multiagent_two_trainers.py index be3b102b4..f3580faf4 100644 --- a/rllib/examples/multiagent_two_trainers.py +++ b/rllib/examples/multiagent_two_trainers.py @@ -15,7 +15,7 @@ import ray from ray.rllib.agents.dqn.dqn import DQNTrainer from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy from ray.rllib.agents.ppo.ppo import PPOTrainer -from ray.rllib.agents.ppo.ppo_policy import PPOTFPolicy +from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy from ray.rllib.tests.test_multi_agent_env import MultiCartpole from ray.tune.logger import pretty_print from ray.tune.registry import register_env diff --git a/rllib/models/catalog.py 
b/rllib/models/catalog.py index 76c669cc2..f55f13b95 100644 --- a/rllib/models/catalog.py +++ b/rllib/models/catalog.py @@ -24,6 +24,7 @@ from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.utils import try_import_tf from ray.rllib.utils.annotations import DeveloperAPI, PublicAPI from ray.rllib.utils.error import UnsupportedSpaceException +from ray.rllib.utils.deprecation import deprecation_warning tf = try_import_tf() @@ -105,20 +106,31 @@ class ModelCatalog: @staticmethod @DeveloperAPI - def get_action_dist(action_space, config, dist_type=None, torch=False): - """Returns action distribution class and size for the given action space. + def get_action_dist( + action_space, config, dist_type=None, torch=None, + framework="tf" + ): + """ + Returns action distribution class and size for the given action space. Args: action_space (Space): Action space of the target gym env. config (dict): Optional model config. dist_type (str): Optional identifier of the action distribution. - torch (bool): Optional whether to return PyTorch distribution. + torch (bool): Obsoleted: Whether to return PyTorch Model and + distribution (use framework="torch" instead). + framework (str): One of "tf" or "torch". Returns: dist_class (ActionDistribution): Python class of the distribution. dist_dim (int): The size of the input vector to the distribution. 
""" + # Obsoleted parameter `torch`: + if torch is not None: + deprecation_warning("`torch` parameter", "`framework`='tf|torch'") + framework = "torch" if torch else "tf" + dist = None config = config or MODEL_DEFAULTS if config.get("custom_action_dist"): action_dist_name = config["custom_action_dist"] @@ -135,15 +147,17 @@ class ModelCatalog: "using a custom action distribution, " "using a Tuple action space, or the multi-agent API.") if dist_type is None: - dist = TorchDiagGaussian if torch else DiagGaussian + dist = DiagGaussian if framework == "tf" else TorchDiagGaussian elif dist_type == "deterministic": dist = Deterministic elif isinstance(action_space, gym.spaces.Discrete): - dist = TorchCategorical if torch else Categorical + dist = Categorical if framework == "tf" else TorchCategorical elif isinstance(action_space, gym.spaces.Tuple): - if torch: - raise NotImplementedError("Tuple action spaces not supported " - "for Pytorch.") + if framework == "torch": + # TODO(sven): implement + raise NotImplementedError( + "Tuple action spaces not supported for Pytorch." + ) child_dist = [] input_lens = [] for action in action_space.spaces: @@ -157,26 +171,33 @@ class ModelCatalog: action_space=action_space, input_lens=input_lens), sum(input_lens) elif isinstance(action_space, Simplex): - if torch: - raise NotImplementedError("Simplex action spaces not " - "supported for Pytorch.") + if framework == "torch": + # TODO(sven): implement + raise NotImplementedError( + "Simplex action spaces not supported for Pytorch." + ) dist = Dirichlet elif isinstance(action_space, gym.spaces.MultiDiscrete): - if torch: - raise NotImplementedError("MultiDiscrete action spaces not " - "supported for Pytorch.") + if framework == "torch": + # TODO(sven): implement + raise NotImplementedError( + "MultiDiscrete action spaces not supported for Pytorch." 
+ ) return partial(MultiCategorical, input_lens=action_space.nvec), \ int(sum(action_space.nvec)) elif isinstance(action_space, gym.spaces.Dict): + # TODO(sven): implement raise NotImplementedError( "Dict action spaces are not supported, consider using " - "gym.spaces.Tuple instead") + "gym.spaces.Tuple instead" + ) + else: + raise NotImplementedError( + "Unsupported args: {} {}".format(action_space, dist_type) + ) return dist, dist.required_model_output_shape(action_space, config) - raise NotImplementedError("Unsupported args: {} {}".format( - action_space, dist_type)) - @staticmethod @DeveloperAPI def get_action_shape(action_space): diff --git a/rllib/policy/policy.py b/rllib/policy/policy.py index 37e1ac2b5..081d8f384 100644 --- a/rllib/policy/policy.py +++ b/rllib/policy/policy.py @@ -221,7 +221,7 @@ class Policy(metaclass=ABCMeta): Returns: weights (obj): Serializable copy or view of model weights """ - pass + raise NotImplementedError @DeveloperAPI def set_weights(self, weights): @@ -230,7 +230,7 @@ class Policy(metaclass=ABCMeta): Arguments: weights (obj): Serializable copy or view of model weights """ - pass + raise NotImplementedError @DeveloperAPI def num_state_tensors(self): diff --git a/rllib/policy/torch_policy.py b/rllib/policy/torch_policy.py index ab9efa692..80d99af3f 100644 --- a/rllib/policy/torch_policy.py +++ b/rllib/policy/torch_policy.py @@ -72,9 +72,10 @@ class TorchPolicy(Policy): logits, state = model_out action_dist = self.dist_class(logits, self.model) actions = action_dist.sample() + input_dict[SampleBatch.ACTIONS] = actions return (actions.cpu().numpy(), [h.cpu().numpy() for h in state], self.extra_action_out(input_dict, state_batches, - self.model)) + self.model, action_dist)) @override(Policy) def learn_on_batch(self, postprocessed_batch): diff --git a/rllib/policy/torch_policy_template.py b/rllib/policy/torch_policy_template.py index 2ed1495a9..4e08b43b5 100644 --- a/rllib/policy/torch_policy_template.py +++ 
b/rllib/policy/torch_policy_template.py @@ -1,6 +1,7 @@ from ray.rllib.policy.policy import Policy from ray.rllib.policy.torch_policy import TorchPolicy from ray.rllib.models.catalog import ModelCatalog +from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 from ray.rllib.utils import add_mixins from ray.rllib.utils.annotations import override, DeveloperAPI @@ -67,9 +68,13 @@ def build_torch_policy(name, if make_model_and_action_dist: self.model, self.dist_class = make_model_and_action_dist( self, obs_space, action_space, config) + # Make sure, we passed in a correct Model factory. + assert isinstance(self.model, TorchModelV2), \ + "ERROR: TorchPolicy::make_model_and_action_dist must " \ + "return a TorchModelV2 object!" else: self.dist_class, logit_dim = ModelCatalog.get_action_dist( - action_space, self.config["model"], torch=True) + action_space, self.config["model"], framework="torch") self.model = ModelCatalog.get_model_v2( obs_space, action_space, diff --git a/rllib/tests/test_env_with_subprocess.py b/rllib/tests/test_env_with_subprocess.py index 3daacb438..3a4d92a18 100644 --- a/rllib/tests/test_env_with_subprocess.py +++ b/rllib/tests/test_env_with_subprocess.py @@ -30,11 +30,11 @@ class EnvWithSubprocess(gym.Env): self.subproc = subprocess.Popen(UNIQUE_CMD.split(" "), shell=False) self.config = config # Exit handler should be called + atexit.register(lambda: self.subproc.kill()) if config.worker_index == 0: atexit.register(lambda: os.unlink(UNIQUE_FILE_0)) else: atexit.register(lambda: os.unlink(UNIQUE_FILE_1)) - atexit.register(lambda: self.subproc.kill()) def close(self): if self.config.worker_index == 0: @@ -76,7 +76,7 @@ if __name__ == "__main__": }, }, }) - time.sleep(5.0) + time.sleep(10.0) leaked = leaked_processes() assert not leaked, "LEAKED PROCESSES: {}".format(leaked) assert not os.path.exists(UNIQUE_FILE_0), "atexit handler not called" diff --git a/rllib/tests/test_legacy.py b/rllib/tests/test_legacy.py deleted file mode 100644 index 
5cbb428c4..000000000 --- a/rllib/tests/test_legacy.py +++ /dev/null @@ -1,11 +0,0 @@ -from ray.rllib.agents.ppo import PPOAgent -from ray import tune -import ray - -if __name__ == "__main__": - ray.init() - # Test legacy *Agent classes work (renamed to Trainer) - tune.run( - PPOAgent, - config={"env": "CartPole-v0"}, - stop={"training_iteration": 2}) diff --git a/rllib/tests/test_multi_agent_env.py b/rllib/tests/test_multi_agent_env.py index 89d90de9d..cc0db3322 100644 --- a/rllib/tests/test_multi_agent_env.py +++ b/rllib/tests/test_multi_agent_env.py @@ -10,7 +10,7 @@ from ray.rllib.optimizers import (SyncSamplesOptimizer, SyncReplayOptimizer, AsyncGradientsOptimizer) from ray.rllib.tests.test_rollout_worker import (MockEnv, MockEnv2, MockPolicy) from ray.rllib.evaluation.rollout_worker import RolloutWorker -from ray.rllib.policy.policy import Policy +from ray.rllib.policy.tests.test_policy import TestPolicy from ray.rllib.evaluation.metrics import collect_metrics from ray.rllib.evaluation.worker_set import WorkerSet from ray.rllib.env.base_env import _MultiAgentEnvToBaseEnv @@ -441,7 +441,7 @@ class TestMultiAgentEnv(unittest.TestCase): def test_custom_rnn_state_values(self): h = {"some": {"arbitrary": "structure", "here": [1, 2, 3]}} - class StatefulPolicy(Policy): + class StatefulPolicy(TestPolicy): def compute_actions(self, obs_batch, state_batches=None, diff --git a/rllib/tests/test_optimizers.py b/rllib/tests/test_optimizers.py index 870a352d3..c9037f6d0 100644 --- a/rllib/tests/test_optimizers.py +++ b/rllib/tests/test_optimizers.py @@ -5,7 +5,7 @@ import unittest import ray from ray.rllib.agents.ppo import PPOTrainer -from ray.rllib.agents.ppo.ppo_policy import PPOTFPolicy +from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy from ray.rllib.evaluation import SampleBatch from ray.rllib.evaluation.rollout_worker import RolloutWorker from ray.rllib.evaluation.worker_set import WorkerSet diff --git a/rllib/tests/test_rollout_worker.py 
b/rllib/tests/test_rollout_worker.py index fc5914f5a..f6bb2bac4 100644 --- a/rllib/tests/test_rollout_worker.py +++ b/rllib/tests/test_rollout_worker.py @@ -10,14 +10,14 @@ from ray.rllib.agents.pg import PGTrainer from ray.rllib.agents.a3c import A2CTrainer from ray.rllib.evaluation.rollout_worker import RolloutWorker from ray.rllib.evaluation.metrics import collect_metrics -from ray.rllib.policy.policy import Policy +from ray.rllib.policy.tests.test_policy import TestPolicy from ray.rllib.evaluation.postprocessing import compute_advantages from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID, SampleBatch from ray.rllib.env.vector_env import VectorEnv from ray.tune.registry import register_env -class MockPolicy(Policy): +class MockPolicy(TestPolicy): def compute_actions(self, obs_batch, state_batches=None, diff --git a/rllib/tuned_examples/compact-regression-test.yaml b/rllib/tuned_examples/compact-regression-test.yaml index 8a53120ac..0b29990e2 100644 --- a/rllib/tuned_examples/compact-regression-test.yaml +++ b/rllib/tuned_examples/compact-regression-test.yaml @@ -22,7 +22,7 @@ atari-impala: [20000000, 0.000000000001], ] num_gpus: 1 -atari-ppo: +atari-ppo-tf: env: BreakoutNoFrameskip-v4 run: PPO num_samples: 4 @@ -45,6 +45,30 @@ atari-ppo: observation_filter: NoFilter vf_share_layers: true num_gpus: 1 +atari-ppo-torch: + env: BreakoutNoFrameskip-v4 + run: PPO + num_samples: 4 + stop: + time_total_s: 3600 + config: + use_pytorch: true, + lambda: 0.95 + kl_coeff: 0.5 + clip_rewards: True + clip_param: 0.1 + vf_clip_param: 10.0 + entropy_coeff: 0.01 + train_batch_size: 5000 + sample_batch_size: 100 + sgd_minibatch_size: 500 + num_sgd_iter: 10 + num_workers: 10 + num_envs_per_worker: 5 + batch_mode: truncate_episodes + observation_filter: NoFilter + vf_share_layers: true + num_gpus: 1 apex: env: BreakoutNoFrameskip-v4 run: APEX diff --git a/rllib/tuned_examples/regression_tests/cartpole-ppo.yaml b/rllib/tuned_examples/regression_tests/cartpole-ppo-tf.yaml 
similarity index 92% rename from rllib/tuned_examples/regression_tests/cartpole-ppo.yaml rename to rllib/tuned_examples/regression_tests/cartpole-ppo-tf.yaml index d34b35280..09bf5dafd 100644 --- a/rllib/tuned_examples/regression_tests/cartpole-ppo.yaml +++ b/rllib/tuned_examples/regression_tests/cartpole-ppo-tf.yaml @@ -1,4 +1,4 @@ -cartpole-ppo: +cartpole-ppo-tf: env: CartPole-v0 run: PPO stop: diff --git a/rllib/utils/deprecation.py b/rllib/utils/deprecation.py index fd6628925..9f3accdfe 100644 --- a/rllib/utils/deprecation.py +++ b/rllib/utils/deprecation.py @@ -5,9 +5,16 @@ logger = logging.getLogger(__name__) def deprecation_warning(old, new=None): + """ + Logs a deprecation warning via the `logger` object. + + Args: + old (str): A description of the "thing" that is to be deprecated. + new (Optional[str]): A description of the new "thing" that replaces it. + """ logger.warning( - "DeprecationWarning: `{}` has been deprecated.".format(old) + - (" Use `{}` instead." if new else "") + + "DeprecationWarning: `{}` has been deprecated.{}". + format(old, (" Use `{}` instead.".format(new) if new else "")) + " This will raise an error in the future!" ) diff --git a/rllib/utils/explained_variance.py b/rllib/utils/explained_variance.py index 6db6c0d9b..555396a78 100644 --- a/rllib/utils/explained_variance.py +++ b/rllib/utils/explained_variance.py @@ -12,4 +12,10 @@ def explained_variance(y, pred, framework="tf"): else: y_var = torch.var(y, dim=[0]) diff_var = torch.var(y - pred, dim=[0]) - return max(-1.0, 1 - (diff_var / y_var)) + min_ = torch.Tensor([-1.0]) + return torch.max( + min_.to( + device=torch.device("cuda") + ) if torch.cuda.is_available() else min_, + 1 - (diff_var / y_var) + )