From 4fd8977eafd658056c92ffa6294027e77cdc46c0 Mon Sep 17 00:00:00 2001 From: Sven Mika Date: Thu, 25 Jun 2020 19:01:32 +0200 Subject: [PATCH] [RLlib] Minor cleanup in preparation to tf2.x support. (#9130) * WIP. * Fixes. * LINT. * Fixes. * Fixes and LINT. * WIP. --- python/ray/experimental/tf_utils.py | 4 +- rllib/BUILD | 21 ++-- rllib/agents/a3c/a3c_tf_policy.py | 7 +- rllib/agents/ddpg/ddpg.py | 9 +- rllib/agents/ddpg/ddpg_tf_policy.py | 42 +++---- rllib/agents/ddpg/ddpg_torch_policy.py | 3 +- rllib/agents/dqn/distributional_q_tf_model.py | 6 +- rllib/agents/dqn/dqn_tf_policy.py | 4 +- rllib/agents/dqn/simple_q_torch_policy.py | 2 +- rllib/agents/dyna/dyna_torch_model.py | 2 +- rllib/agents/impala/vtrace_tf_policy.py | 8 +- rllib/agents/marwil/marwil_tf_policy.py | 10 +- rllib/agents/ppo/appo_tf_policy.py | 8 +- rllib/agents/ppo/ppo_tf_policy.py | 4 +- .../qmix/tests/test_qmix.py} | 0 rllib/agents/sac/sac.py | 12 +- rllib/agents/sac/sac_tf_policy.py | 29 +++-- rllib/agents/sac/sac_torch_policy.py | 3 +- rllib/agents/sac/tests/test_sac.py | 5 +- rllib/agents/trainer.py | 24 ++-- rllib/agents/trainer_template.py | 54 ++++----- .../models/autoregressive_action_dist.py | 24 ++-- .../models/parametric_actions_model.py | 2 +- rllib/models/catalog.py | 3 +- rllib/models/tf/__init__.py | 3 +- rllib/models/tf/layers/__init__.py | 8 +- rllib/models/tf/layers/noisy_layer.py | 105 ++++++++++++++++++ rllib/models/tf/tf_action_dist.py | 29 ++--- rllib/policy/eager_tf_policy.py | 4 + rllib/policy/tf_policy_template.py | 7 ++ rllib/policy/torch_policy_template.py | 9 +- rllib/tests/test_dependency.py | 2 + rllib/tests/test_execution.py | 2 +- rllib/tests/test_rollout_worker.py | 59 ++++++---- rllib/utils/exploration/parameter_noise.py | 2 +- rllib/utils/tf_ops.py | 2 +- rllib/utils/types.py | 5 +- 37 files changed, 347 insertions(+), 176 deletions(-) rename rllib/{tests/test_avail_actions_qmix.py => agents/qmix/tests/test_qmix.py} (100%) create mode 100644 rllib/models/tf/layers/noisy_layer.py diff --git a/python/ray/experimental/tf_utils.py b/python/ray/experimental/tf_utils.py index 6bc1255a1..c528d94d6 100644 --- a/python/ray/experimental/tf_utils.py +++ b/python/ray/experimental/tf_utils.py @@ -42,8 +42,8 @@ class TensorFlowVariables: Args: output (tf.Operation, List[tf.Operation]): The tensorflow operation to extract all variables from. - sess (tf.Session): Session used for running the get and set - methods. + sess (Optional[tf.Session]): Optional tf.Session used for running + the get and set methods in tf graph mode. input_variables (List[tf.Variables]): Variables to include in the list. """ diff --git a/rllib/BUILD b/rllib/BUILD index 3691e247b..5712b1b81 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -496,7 +496,7 @@ py_test( "agents/ppo/tests/test.py"] # TODO(sven): Move down once PR 6889 merged ) -# DDPPO +# PPO: DDPPO py_test( name = "test_ddppo", tags = ["agents_dir"], @@ -504,7 +504,7 @@ py_test( srcs = ["agents/ppo/tests/test_ddppo.py"] ) -# APPO +# PPO: APPO py_test( name = "test_appo", tags = ["agents_dir"], @@ -512,7 +512,15 @@ py_test( srcs = ["agents/ppo/tests/test_appo.py"] ) -# SAC +# QMixTrainer +py_test( + name = "test_qmix", + tags = ["agents_dir"], + size = "medium", + srcs = ["agents/qmix/tests/test_qmix.py"] +) + +# SACTrainer py_test( name = "test_sac", tags = ["agents_dir"], @@ -1103,13 +1111,6 @@ py_test( srcs = ["tests/test_attention_net_learning.py"] ) -py_test( - name = "tests/test_avail_actions_qmix", - tags = ["tests_dir", "tests_dir_A"], - size = "medium", - srcs = ["tests/test_avail_actions_qmix.py"] -) - py_test( name = "tests/test_catalog", tags = ["tests_dir", "tests_dir_C"], diff --git a/rllib/agents/a3c/a3c_tf_policy.py b/rllib/agents/a3c/a3c_tf_policy.py index c0d4158c0..8c2d9146a 100644 --- a/rllib/agents/a3c/a3c_tf_policy.py +++ b/rllib/agents/a3c/a3c_tf_policy.py @@ -27,7 +27,7 @@ class A3CLoss: self.pi_loss = -tf.reduce_sum(log_prob * advantages) delta = vf - v_target - self.vf_loss = 0.5 * tf.reduce_sum(tf.square(delta)) + self.vf_loss = 0.5 * tf.reduce_sum(tf.math.square(delta)) self.entropy = tf.reduce_sum(action_dist.entropy()) self.total_loss = (self.pi_loss + self.vf_loss * vf_loss_coeff - self.entropy * entropy_coeff) @@ -90,14 +90,15 @@ def stats(policy, train_batch): "cur_lr": tf.cast(policy.cur_lr, tf.float64), "policy_loss": policy.loss.pi_loss, "policy_entropy": policy.loss.entropy, - "var_gnorm": tf.global_norm(list(policy.model.trainable_variables())), + "var_gnorm": tf.linalg.global_norm( + list(policy.model.trainable_variables())), "vf_loss": policy.loss.vf_loss, } def grad_stats(policy, train_batch, grads): return { - "grad_gnorm": tf.global_norm(grads), + "grad_gnorm": tf.linalg.global_norm(grads), "vf_explained_var": explained_variance( train_batch[Postprocessing.VALUE_TARGETS], policy.model.value_function()), diff --git a/rllib/agents/ddpg/ddpg.py b/rllib/agents/ddpg/ddpg.py index 7aa30685a..a61e7778b 100644 --- a/rllib/agents/ddpg/ddpg.py +++ b/rllib/agents/ddpg/ddpg.py @@ -157,6 +157,12 @@ DEFAULT_CONFIG = with_common_config({ def validate_config(config): + if config["model"]["custom_model"]: + logger.warning( + "Setting use_state_preprocessor=True since a custom model " + "was specified.") + config["use_state_preprocessor"] = True + # TODO(sven): Remove at some point. # Backward compatibility of noise-based exploration config. schedule_max_timesteps = None @@ -191,8 +197,7 @@ def validate_config(config): if config.get("parameter_noise", DEPRECATED_VALUE) != DEPRECATED_VALUE: deprecation_warning("parameter_noise", "exploration_config={" - "type=ParameterNoise" - "}") + "type=ParameterNoise}") if config["exploration_config"]["type"] == "ParameterNoise": if config["batch_mode"] != "complete_episodes": diff --git a/rllib/agents/ddpg/ddpg_tf_policy.py b/rllib/agents/ddpg/ddpg_tf_policy.py index a8862beff..4d9730254 100644 --- a/rllib/agents/ddpg/ddpg_tf_policy.py +++ b/rllib/agents/ddpg/ddpg_tf_policy.py @@ -15,9 +15,9 @@ from ray.rllib.models import ModelCatalog from ray.rllib.models.tf.tf_action_dist import Deterministic from ray.rllib.models.torch.torch_action_dist import TorchDeterministic from ray.rllib.utils.annotations import override -from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.policy.tf_policy_template import build_tf_policy +from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.utils.framework import try_import_tf from ray.rllib.utils.tf_ops import huber_loss, minimize_and_clip, \ make_tf_callable @@ -36,22 +36,6 @@ TWIN_Q_TARGET_SCOPE = "twin_target_critic" def build_ddpg_models(policy, observation_space, action_space, config): - if config["model"]["custom_model"]: - logger.warning( - "Setting use_state_preprocessor=True since a custom model " - "was specified.") - config["use_state_preprocessor"] = True - - if not isinstance(action_space, Box): - raise UnsupportedSpaceException( - "Action space {} is not supported for DDPG.".format(action_space)) - elif len(action_space.shape) > 1: - raise UnsupportedSpaceException( - "Action space has multiple dimensions " - "{}. ".format(action_space.shape) + - "Consider reshaping this into a single dimension, " - "using a Tuple action space, or the multi-agent API.") - if policy.config["use_state_preprocessor"]: default_model = None # catalog decides num_outputs = 256 # arbitrary @@ -157,7 +141,7 @@ def ddpg_actor_critic_loss(policy, model, _, train_batch): if policy.config["smooth_target_policy"]: target_noise_clip = policy.config["target_noise_clip"] clipped_normal_sample = tf.clip_by_value( - tf.random_normal( + tf.random.normal( tf.shape(policy_tp1), stddev=policy.config["target_noise"]), -target_noise_clip, target_noise_clip) @@ -219,15 +203,17 @@ def ddpg_actor_critic_loss(policy, model, _, train_batch): errors = huber_loss(td_error, huber_threshold) + \ huber_loss(twin_td_error, huber_threshold) else: - errors = 0.5 * tf.square(td_error) + 0.5 * tf.square(twin_td_error) + errors = 0.5 * tf.math.square(td_error) + \ + 0.5 * tf.math.square(twin_td_error) else: td_error = q_t_selected - q_t_selected_target if use_huber: errors = huber_loss(td_error, huber_threshold) else: - errors = 0.5 * tf.square(td_error) + errors = 0.5 * tf.math.square(td_error) - critic_loss = tf.reduce_mean(train_batch[PRIO_WEIGHTS] * errors) + critic_loss = tf.reduce_mean( + tf.cast(train_batch[PRIO_WEIGHTS], tf.float32) * errors) actor_loss = -tf.reduce_mean(q_t_det_policy) # Add l2-regularization if required. @@ -417,6 +403,19 @@ def setup_late_mixins(policy, obs_space, action_space, config): TargetNetworkMixin.__init__(policy, config) +def validate_spaces(pid, observation_space, action_space, config): + if not isinstance(action_space, Box): + raise UnsupportedSpaceException( + "Action space ({}) of {} is not supported for " + "DDPG.".format(action_space, pid)) + elif len(action_space.shape) > 1: + raise UnsupportedSpaceException( + "Action space ({}) of {} has multiple dimensions " + "{}. ".format(action_space, pid, action_space.shape) + + "Consider reshaping this into a single dimension, " + "using a Tuple action space, or the multi-agent API.") + + DDPGTFPolicy = build_tf_policy( name="DDPGTFPolicy", get_default_config=lambda: ray.rllib.agents.ddpg.ddpg.DEFAULT_CONFIG, @@ -429,6 +428,7 @@ DDPGTFPolicy = build_tf_policy( gradients_fn=gradients_fn, apply_gradients_fn=build_apply_op, extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error}, + validate_spaces=validate_spaces, before_init=before_init_fn, before_loss_init=setup_mid_mixins, after_init=setup_late_mixins, diff --git a/rllib/agents/ddpg/ddpg_torch_policy.py b/rllib/agents/ddpg/ddpg_torch_policy.py index 1c8d1d7a9..b3f49f102 100644 --- a/rllib/agents/ddpg/ddpg_torch_policy.py +++ b/rllib/agents/ddpg/ddpg_torch_policy.py @@ -2,7 +2,7 @@ import logging import ray from ray.rllib.agents.ddpg.ddpg_tf_policy import build_ddpg_models, \ - get_distribution_inputs_and_class + get_distribution_inputs_and_class, validate_spaces from ray.rllib.agents.dqn.dqn_tf_policy import postprocess_nstep_and_prio, \ PRIO_WEIGHTS from ray.rllib.models.torch.torch_action_dist import TorchDeterministic @@ -269,6 +269,7 @@ DDPGTorchPolicy = build_torch_policy( postprocess_fn=postprocess_nstep_and_prio, extra_grad_process_fn=gradients_fn, optimizer_fn=make_ddpg_optimizers, + validate_spaces=validate_spaces, before_init=before_init_fn, after_init=setup_late_mixins, action_distribution_fn=get_distribution_inputs_and_class, diff --git a/rllib/agents/dqn/distributional_q_tf_model.py b/rllib/agents/dqn/distributional_q_tf_model.py index a3cf1de8b..c3e936f8e 100644 --- a/rllib/agents/dqn/distributional_q_tf_model.py +++ b/rllib/agents/dqn/distributional_q_tf_model.py @@ -234,8 +234,8 @@ class DistributionalQTFModel(TFModelV2): """ in_size = int(action_in.shape[1]) - epsilon_in = tf.random_normal(shape=[in_size]) - epsilon_out = tf.random_normal(shape=[out_size]) + epsilon_in = tf.random.normal(shape=[in_size]) + epsilon_out = tf.random.normal(shape=[out_size]) epsilon_in = self._f_epsilon(epsilon_in) epsilon_out = self._f_epsilon(epsilon_out) epsilon_w = tf.matmul( @@ -279,4 +279,4 @@ class DistributionalQTFModel(TFModelV2): return tf.nn.relu(action_activation) def _f_epsilon(self, x): - return tf.sign(x) * tf.sqrt(tf.abs(x)) + return tf.math.sign(x) * tf.math.sqrt(tf.math.abs(x)) diff --git a/rllib/agents/dqn/dqn_tf_policy.py b/rllib/agents/dqn/dqn_tf_policy.py index 7838fe617..c5e13bf5e 100644 --- a/rllib/agents/dqn/dqn_tf_policy.py +++ b/rllib/agents/dqn/dqn_tf_policy.py @@ -54,11 +54,11 @@ class QLoss: r_tau = tf.clip_by_value(r_tau, v_min, v_max) b = (r_tau - v_min) / ((v_max - v_min) / float(num_atoms - 1)) lb = tf.floor(b) - ub = tf.ceil(b) + ub = tf.math.ceil(b) # indispensable judgement which is missed in most implementations # when b happens to be an integer, lb == ub, so pr_j(s', a*) will # be discarded because (ub-b) == (b-lb) == 0 - floor_equal_ceil = tf.to_float(tf.less(ub - lb, 0.5)) + floor_equal_ceil = tf.cast(tf.less(ub - lb, 0.5), tf.float32) l_project = tf.one_hot( tf.cast(lb, dtype=tf.int32), diff --git a/rllib/agents/dqn/simple_q_torch_policy.py b/rllib/agents/dqn/simple_q_torch_policy.py index da584a465..bc9336ec5 100644 --- a/rllib/agents/dqn/simple_q_torch_policy.py +++ b/rllib/agents/dqn/simple_q_torch_policy.py @@ -53,7 +53,7 @@ def build_q_losses(policy, model, dist_class, train_batch): is_training=True) # q scores for actions which we know were selected in the given state. - one_hot_selection = F.one_hot(train_batch[SampleBatch.ACTIONS], + one_hot_selection = F.one_hot(train_batch[SampleBatch.ACTIONS].long(), policy.action_space.n) q_t_selected = torch.sum(q_t * one_hot_selection, 1) diff --git a/rllib/agents/dyna/dyna_torch_model.py b/rllib/agents/dyna/dyna_torch_model.py index 6c748b7aa..b02eaa9bb 100644 --- a/rllib/agents/dyna/dyna_torch_model.py +++ b/rllib/agents/dyna/dyna_torch_model.py @@ -50,7 +50,7 @@ class DYNATorchModel(TorchModelV2, nn.Module): # One-hot the actions. actions_flat = nn.functional.one_hot( - actions, num_classes=self.action_space.n).float() + actions.long(), num_classes=self.action_space.n).float() # Push through our underlying Model. next_obs, _ = self.forward({ "obs_flat": torch.cat([observations, actions_flat], -1) diff --git a/rllib/agents/impala/vtrace_tf_policy.py b/rllib/agents/impala/vtrace_tf_policy.py index 61816ff39..a8baf9bbf 100644 --- a/rllib/agents/impala/vtrace_tf_policy.py +++ b/rllib/agents/impala/vtrace_tf_policy.py @@ -80,7 +80,7 @@ class VTraceLoss: behaviour_policy_logits=behaviour_logits, target_policy_logits=target_logits, actions=tf.unstack(actions, axis=2), - discounts=tf.to_float(~dones) * discount, + discounts=tf.cast(~dones, tf.float32) * discount, rewards=rewards, values=values, bootstrap_value=bootstrap_value, @@ -98,7 +98,7 @@ class VTraceLoss: # The baseline loss. delta = tf.boolean_mask(values - self.vtrace_returns.vs, valid_mask) - self.vf_loss = 0.5 * tf.reduce_sum(tf.square(delta)) + self.vf_loss = 0.5 * tf.reduce_sum(tf.math.square(delta)) # The entropy loss. self.entropy = tf.reduce_sum( @@ -228,7 +228,7 @@ def stats(policy, train_batch): "policy_loss": policy.loss.pi_loss, "entropy": policy.loss.entropy, "entropy_coeff": tf.cast(policy.entropy_coeff, tf.float64), - "var_gnorm": tf.global_norm(policy.model.trainable_variables()), + "var_gnorm": tf.linalg.global_norm(policy.model.trainable_variables()), "vf_loss": policy.loss.vf_loss, "vf_explained_var": explained_variance( tf.reshape(policy.loss.value_targets, [-1]), @@ -238,7 +238,7 @@ def stats(policy, train_batch): def grad_stats(policy, train_batch, grads): return { - "grad_gnorm": tf.global_norm(grads), + "grad_gnorm": tf.linalg.global_norm(grads), } diff --git a/rllib/agents/marwil/marwil_tf_policy.py b/rllib/agents/marwil/marwil_tf_policy.py index 06d5f6848..947142f1d 100644 --- a/rllib/agents/marwil/marwil_tf_policy.py +++ b/rllib/agents/marwil/marwil_tf_policy.py @@ -28,7 +28,7 @@ class ValueNetworkMixin: class ValueLoss: def __init__(self, state_values, cumulative_rewards): self.loss = 0.5 * tf.reduce_mean( - tf.square(state_values - cumulative_rewards)) + tf.math.square(state_values - cumulative_rewards)) class ReweightedImitationLoss: @@ -39,13 +39,13 @@ class ReweightedImitationLoss: # update averaged advantage norm update_adv_norm = tf.assign_add( ref=policy._ma_adv_norm, - value=1e-6 * - (tf.reduce_mean(tf.square(adv)) - policy._ma_adv_norm)) + value=1e-6 * ( + tf.reduce_mean(tf.math.square(adv)) - policy._ma_adv_norm)) # exponentially weighted advantages with tf.control_dependencies([update_adv_norm]): - exp_advs = tf.exp( - beta * tf.divide(adv, 1e-8 + tf.sqrt(policy._ma_adv_norm))) + exp_advs = tf.math.exp(beta * tf.math.divide( + adv, 1e-8 + tf.math.sqrt(policy._ma_adv_norm))) # log\pi_\theta(a|s) logprobs = action_dist.logp(actions) diff --git a/rllib/agents/ppo/appo_tf_policy.py b/rllib/agents/ppo/appo_tf_policy.py index af833a8c9..c733890ec 100644 --- a/rllib/agents/ppo/appo_tf_policy.py +++ b/rllib/agents/ppo/appo_tf_policy.py @@ -78,7 +78,7 @@ class PPOSurrogateLoss: # The baseline loss delta = values - value_targets self.value_targets = value_targets - self.vf_loss = 0.5 * reduce_mean_valid(tf.square(delta)) + self.vf_loss = 0.5 * reduce_mean_valid(tf.math.square(delta)) # The entropy loss self.entropy = reduce_mean_valid(actions_entropy) @@ -159,7 +159,7 @@ class VTraceSurrogateLoss: behaviour_policy_logits=behaviour_logits, target_policy_logits=old_policy_behaviour_logits, actions=tf.unstack(actions, axis=2), - discounts=tf.to_float(~dones) * discount, + discounts=tf.cast(~dones, tf.float32) * discount, rewards=rewards, values=values, bootstrap_value=bootstrap_value, @@ -185,7 +185,7 @@ class VTraceSurrogateLoss: # The baseline loss delta = values - self.vtrace_returns.vs self.value_targets = self.vtrace_returns.vs - self.vf_loss = 0.5 * reduce_mean_valid(tf.square(delta)) + self.vf_loss = 0.5 * reduce_mean_valid(tf.math.square(delta)) # The entropy loss self.entropy = reduce_mean_valid(actions_entropy) @@ -350,7 +350,7 @@ def stats(policy, train_batch): "cur_lr": tf.cast(policy.cur_lr, tf.float64), "policy_loss": policy.loss.pi_loss, "entropy": policy.loss.entropy, - "var_gnorm": tf.global_norm(policy.model.trainable_variables()), + "var_gnorm": tf.linalg.global_norm(policy.model.trainable_variables()), "vf_loss": policy.loss.vf_loss, "vf_explained_var": explained_variance( tf.reshape(policy.loss.value_targets, [-1]), diff --git a/rllib/agents/ppo/ppo_tf_policy.py b/rllib/agents/ppo/ppo_tf_policy.py index 54f1386ea..963d4d816 100644 --- a/rllib/agents/ppo/ppo_tf_policy.py +++ b/rllib/agents/ppo/ppo_tf_policy.py @@ -89,10 +89,10 @@ class PPOLoss: self.mean_policy_loss = reduce_mean_valid(-surrogate_loss) if use_gae: - vf_loss1 = tf.square(value_fn - value_targets) + vf_loss1 = tf.math.square(value_fn - value_targets) vf_clipped = vf_preds + tf.clip_by_value( value_fn - vf_preds, -vf_clip_param, vf_clip_param) - vf_loss2 = tf.square(vf_clipped - value_targets) + vf_loss2 = tf.math.square(vf_clipped - value_targets) vf_loss = tf.maximum(vf_loss1, vf_loss2) self.mean_vf_loss = reduce_mean_valid(vf_loss) loss = reduce_mean_valid( diff --git a/rllib/tests/test_avail_actions_qmix.py b/rllib/agents/qmix/tests/test_qmix.py similarity index 100% rename from rllib/tests/test_avail_actions_qmix.py rename to rllib/agents/qmix/tests/test_qmix.py diff --git a/rllib/agents/sac/sac.py b/rllib/agents/sac/sac.py index 52e935599..ca6b0937d 100644 --- a/rllib/agents/sac/sac.py +++ b/rllib/agents/sac/sac.py @@ -1,8 +1,12 @@ +import logging + from ray.rllib.agents.trainer import with_common_config from ray.rllib.agents.dqn.dqn import GenericOffPolicyTrainer from ray.rllib.agents.sac.sac_tf_policy import SACTFPolicy from ray.rllib.utils.deprecation import deprecation_warning, DEPRECATED_VALUE +logger = logging.getLogger(__name__) + OPTIMIZER_SHARED_CONFIGS = [ "buffer_size", "prioritized_replay", "prioritized_replay_alpha", "prioritized_replay_beta", "prioritized_replay_eps", @@ -131,6 +135,12 @@ def get_policy_class(config): def validate_config(config): + if config["model"].get("custom_model"): + logger.warning( + "Setting use_state_preprocessor=True since a custom model " + "was specified.") + config["use_state_preprocessor"] = True + if config.get("grad_norm_clipping", DEPRECATED_VALUE) != DEPRECATED_VALUE: deprecation_warning("grad_norm_clipping", "grad_clip") config["grad_clip"] = config.pop("grad_norm_clipping") @@ -154,7 +164,7 @@ def validate_config(config): SACTrainer = GenericOffPolicyTrainer.with_updates( name="SAC", default_config=DEFAULT_CONFIG, - validate_config=validate_config, default_policy=SACTFPolicy, get_policy_class=get_policy_class, + validate_config=validate_config, ) diff --git a/rllib/agents/sac/sac_tf_policy.py b/rllib/agents/sac/sac_tf_policy.py index 94cb2553b..fcf05fbd5 100644 --- a/rllib/agents/sac/sac_tf_policy.py +++ b/rllib/agents/sac/sac_tf_policy.py @@ -24,21 +24,6 @@ logger = logging.getLogger(__name__) def build_sac_model(policy, obs_space, action_space, config): - if config["model"].get("custom_model"): - logger.warning( - "Setting use_state_preprocessor=True since a custom model " - "was specified.") - config["use_state_preprocessor"] = True - if not isinstance(action_space, (Box, Discrete)): - raise UnsupportedSpaceException( - "Action space {} is not supported for SAC.".format(action_space)) - if isinstance(action_space, Box) and len(action_space.shape) > 1: - raise UnsupportedSpaceException( - "Action space has multiple dimensions " - "{}. ".format(action_space.shape) + - "Consider reshaping this into a single dimension, " - "using a Tuple action space, or the multi-agent API.") - # 2 cases: # 1) with separate state-preprocessor (before obs+action concat). # 2) no separate state-preprocessor: concat obs+actions right away. @@ -425,6 +410,19 @@ def setup_late_mixins(policy, obs_space, action_space, config): TargetNetworkMixin.__init__(policy, config) +def validate_spaces(pid, observation_space, action_space, config): + if not isinstance(action_space, (Box, Discrete)): + raise UnsupportedSpaceException( + "Action space ({}) of {} is not supported for " + "SAC.".format(action_space, pid)) + if isinstance(action_space, Box) and len(action_space.shape) > 1: + raise UnsupportedSpaceException( + "Action space ({}) of {} has multiple dimensions " + "{}. ".format(action_space, pid, action_space.shape) + + "Consider reshaping this into a single dimension, " + "using a Tuple action space, or the multi-agent API.") + + SACTFPolicy = build_tf_policy( name="SACTFPolicy", get_default_config=lambda: ray.rllib.agents.sac.sac.DEFAULT_CONFIG, @@ -439,6 +437,7 @@ SACTFPolicy = build_tf_policy( mixins=[ TargetNetworkMixin, ActorCriticOptimizerMixin, ComputeTDErrorMixin ], + validate_spaces=validate_spaces, before_init=setup_early_mixins, before_loss_init=setup_mid_mixins, after_init=setup_late_mixins, diff --git a/rllib/agents/sac/sac_torch_policy.py b/rllib/agents/sac/sac_torch_policy.py index a3444ce0a..4e405e0fc 100644 --- a/rllib/agents/sac/sac_torch_policy.py +++ b/rllib/agents/sac/sac_torch_policy.py @@ -5,7 +5,7 @@ import ray import ray.experimental.tf_utils from ray.rllib.agents.a3c.a3c_torch_policy import apply_grad_clipping from ray.rllib.agents.sac.sac_tf_policy import build_sac_model, \ - postprocess_trajectory + postprocess_trajectory, validate_spaces from ray.rllib.agents.dqn.dqn_tf_policy import PRIO_WEIGHTS from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.torch_policy_template import build_torch_policy @@ -336,6 +336,7 @@ SACTorchPolicy = build_torch_policy( postprocess_fn=postprocess_trajectory, extra_grad_process_fn=apply_grad_clipping, optimizer_fn=optimizer_fn, + validate_spaces=validate_spaces, after_init=setup_late_mixins, make_model_and_action_dist=build_sac_model_and_action_dist, mixins=[TargetNetworkMixin, ComputeTDErrorMixin], diff --git a/rllib/agents/sac/tests/test_sac.py b/rllib/agents/sac/tests/test_sac.py index 25157f4e8..134127d62 100644 --- a/rllib/agents/sac/tests/test_sac.py +++ b/rllib/agents/sac/tests/test_sac.py @@ -68,6 +68,7 @@ class TestSAC(unittest.TestCase): results = trainer.train() print(results) check_compute_single_action(trainer) + trainer.stop() def test_sac_loss_function(self): """Tests SAC loss function results across all frameworks.""" @@ -164,7 +165,7 @@ class TestSAC(unittest.TestCase): # Set all weights (of all nets) to fixed values. if weights_dict is None: - assert fw == "tf" # Start with the tf vars-dict. + assert fw in ["tf", "tfe"] # Start with the tf vars-dict. weights_dict = policy.get_weights() else: assert fw == "torch" # Then transfer that to torch Model. @@ -176,7 +177,7 @@ class TestSAC(unittest.TestCase): if fw == "tf": log_alpha = weights_dict["default_policy/log_alpha"] elif fw == "torch": - # Actually convert to torch tensors. + # Actually convert to torch tensors (by accessing everything). input_ = policy._lazy_tensor_dict(input_) input_ = {k: input_[k] for k in input_.keys()} log_alpha = policy.model.log_alpha.detach().numpy()[0] diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 6d4774892..6a99bc9c7 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -7,7 +7,7 @@ import os import pickle import time import tempfile -from typing import Callable, List, Dict, Union, Any +from typing import Callable, List, Dict, Union import ray from ray.exceptions import RayError @@ -701,9 +701,6 @@ class Trainer(Trainable): config (dict): The Trainer's config. num_workers (int): Number of remote rollout workers to create. 0 for local only. - remote_config_updates (Optional[List[dict]]): A list of config - dicts to update `config` with for each Worker (len must be - same as `num_workers`). Returns: WorkerSet: The created WorkerSet. @@ -778,9 +775,9 @@ class Trainer(Trainable): @PublicAPI def compute_action(self, observation: TensorStructType, - state: List[Any] = None, + state: List[TensorStructType] = None, prev_action: TensorStructType = None, - prev_reward: int = None, + prev_reward: float = None, info: EnvInfoDict = None, policy_id: PolicyID = DEFAULT_POLICY_ID, full_fetch: bool = False, @@ -791,16 +788,17 @@ class Trainer(Trainable): self.get_policy(policy_id) and call compute_actions() on it directly. Arguments: - observation (obj): observation from the environment. - state (list): RNN hidden state, if any. If state is not None, - then all of compute_single_action(...) is returned + observation (TensorStructType): observation from the environment. + state (List[TensorStructType]): RNN hidden state, if any. If state + is not None, then all of compute_single_action(...) is returned (computed action, rnn state(s), logits dictionary). Otherwise compute_single_action(...)[0] is returned (computed action). - prev_action (obj): previous action value, if any - prev_reward (int): previous reward, if any - info (dict): info object, if any - policy_id (str): Policy to query (only applies to multi-agent). + prev_action (TensorStructType): Previous action value, if any. + prev_reward (float): Previous reward, if any. + info (EnvInfoDict): info object, if any + policy_id (PolicyID): Policy to query (only applies to + multi-agent). full_fetch (bool): Whether to return extra action fetch results. This is always set to True if RNN state is specified. explore (bool): Whether to pick an exploitation or exploration diff --git a/rllib/agents/trainer_template.py b/rllib/agents/trainer_template.py index e94110d70..4cb8f917a 100644 --- a/rllib/agents/trainer_template.py +++ b/rllib/agents/trainer_template.py @@ -1,6 +1,6 @@ -from typing import Callable, Optional, List, Iterable import logging import time +from typing import Callable, Optional, List, Iterable from ray.rllib.agents.trainer import Trainer, COMMON_CONFIG from ray.rllib.evaluation.worker_set import WorkerSet @@ -34,20 +34,21 @@ def default_execution_plan(workers: WorkerSet, config: TrainerConfigDict): @DeveloperAPI def build_trainer( - name: str, - default_policy: Optional[Policy], - default_config: TrainerConfigDict = None, - validate_config: Callable[[TrainerConfigDict], None] = None, - get_initial_state=None, # DEPRECATED - get_policy_class: Callable[[TrainerConfigDict], Policy] = None, - before_init: Callable[[Trainer], None] = None, - make_workers=None, # DEPRECATED - make_policy_optimizer=None, # DEPRECATED - after_init: Callable[[Trainer], None] = None, - before_train_step=None, # DEPRECATED - after_optimizer_step=None, # DEPRECATED - after_train_result=None, # DEPRECATED - collect_metrics_fn=None, # DEPRECATED + name: str, + default_policy: Optional[Policy], + *, + default_config: TrainerConfigDict = None, + validate_config: Callable[[TrainerConfigDict], None] = None, + get_initial_state=None, # DEPRECATED + get_policy_class: Callable[[TrainerConfigDict], Policy] = None, + before_init: Callable[[Trainer], None] = None, + make_workers=None, # DEPRECATED + make_policy_optimizer=None, # DEPRECATED + after_init: Callable[[Trainer], None] = None, + before_train_step=None, # DEPRECATED + after_optimizer_step=None, # DEPRECATED + after_train_result=None, # DEPRECATED + collect_metrics_fn=None, # DEPRECATED before_evaluate_fn: Callable[[Trainer], None] = None, mixins: List[type] = None, execution_plan: Callable[[WorkerSet, TrainerConfigDict], Iterable[ @@ -64,19 +65,20 @@ def build_trainer( default_policy (cls): the default Policy class to use default_config (dict): The default config dict of the algorithm, otherwise uses the Trainer default config. - validate_config (func): optional callback that checks a given config - for correctness. It may mutate the config as needed. - get_policy_class (func): optional callback that takes a config and - returns the policy class to override the default with - before_init (func): optional function to run at the start of trainer - init that takes the trainer instance as argument - after_init (func): optional function to run at the end of trainer init - that takes the trainer instance as argument - before_evaluate_fn (func): callback to run before evaluation. This - takes the trainer instance as argument. + validate_config (Optional[callable]): Optional callable that takes the + config to check for correctness. It may mutate the config as + needed. + get_policy_class (Optional[callable]): Optional callable that takes a + config and returns the policy class to override the default with. + before_init (Optional[callable]): Optional callable to run at the start + of trainer init that takes the trainer instance as argument. + after_init (Optional[callable]): Optional callable to run at the end of + trainer init that takes the trainer instance as argument. + before_evaluate_fn (Optional[callable]): callback to run before + evaluation. This takes the trainer instance as argument. mixins (list): list of any class mixins for the returned trainer class. These mixins will be applied in order and will have higher - precedence than the Trainer class + precedence than the Trainer class. execution_plan (func): Setup the distributed execution workflow. Returns: diff --git a/rllib/examples/models/autoregressive_action_dist.py b/rllib/examples/models/autoregressive_action_dist.py index 324913a68..5385e0b15 100644 --- a/rllib/examples/models/autoregressive_action_dist.py +++ b/rllib/examples/models/autoregressive_action_dist.py @@ -11,29 +11,29 @@ class BinaryAutoregressiveDistribution(ActionDistribution): """Action distribution P(a1, a2) = P(a1) * P(a2 | a1)""" def deterministic_sample(self): - # first, sample a1 + # First, sample a1. a1_dist = self._a1_distribution() a1 = a1_dist.deterministic_sample() - # sample a2 conditioned on a1 + # Sample a2 conditioned on a1. a2_dist = self._a2_distribution(a1) a2 = a2_dist.deterministic_sample() self._action_logp = a1_dist.logp(a1) + a2_dist.logp(a2) - # return the action tuple + # Return the action tuple. return (a1, a2) def sample(self): - # first, sample a1 + # First, sample a1. a1_dist = self._a1_distribution() a1 = a1_dist.sample() - # sample a2 conditioned on a1 + # Sample a2 conditioned on a1. a2_dist = self._a2_distribution(a1) a2 = a2_dist.sample() self._action_logp = a1_dist.logp(a1) + a2_dist.logp(a2) - # return the action tuple + # Return the action tuple. return (a1, a2) def logp(self, actions): @@ -81,29 +81,29 @@ class TorchBinaryAutoregressiveDistribution(TorchDistributionWrapper): """Action distribution P(a1, a2) = P(a1) * P(a2 | a1)""" def deterministic_sample(self): - # first, sample a1 + # First, sample a1. a1_dist = self._a1_distribution() a1 = a1_dist.deterministic_sample() - # sample a2 conditioned on a1 + # Sample a2 conditioned on a1. a2_dist = self._a2_distribution(a1) a2 = a2_dist.deterministic_sample() self._action_logp = a1_dist.logp(a1) + a2_dist.logp(a2) - # return the action tuple + # Return the action tuple. return (a1, a2) def sample(self): - # first, sample a1 + # First, sample a1. a1_dist = self._a1_distribution() a1 = a1_dist.sample() - # sample a2 conditioned on a1 + # Sample a2 conditioned on a1. a2_dist = self._a2_distribution(a1) a2 = a2_dist.sample() self._action_logp = a1_dist.logp(a1) + a2_dist.logp(a2) - # return the action tuple + # Return the action tuple. return (a1, a2) def logp(self, actions): diff --git a/rllib/examples/models/parametric_actions_model.py b/rllib/examples/models/parametric_actions_model.py index c464cb554..f0c62935d 100644 --- a/rllib/examples/models/parametric_actions_model.py +++ b/rllib/examples/models/parametric_actions_model.py @@ -56,7 +56,7 @@ class ParametricActionsModel(DistributionalQTFModel): action_logits = tf.reduce_sum(avail_actions * intent_vector, axis=2) # Mask out invalid actions (use tf.float32.min for stability) - inf_mask = tf.maximum(tf.log(action_mask), tf.float32.min) + inf_mask = tf.maximum(tf.math.log(action_mask), tf.float32.min) return action_logits + inf_mask, state def value_function(self): diff --git a/rllib/models/catalog.py b/rllib/models/catalog.py index e6f7cc526..652c1e65e 100644 --- a/rllib/models/catalog.py +++ b/rllib/models/catalog.py @@ -416,7 +416,8 @@ class ModelCatalog: name, **model_kwargs) else: raise NotImplementedError( - "Framework must be 'tf' or 'torch': {}".format(framework)) + "`framework` must be 'tf|tfe|torch', but is " + "{}!".format(framework)) @staticmethod @DeveloperAPI diff --git a/rllib/models/tf/__init__.py b/rllib/models/tf/__init__.py index d43e8f403..86d33b39d 100644 --- a/rllib/models/tf/__init__.py +++ b/rllib/models/tf/__init__.py @@ -1,7 +1,6 @@ from ray.rllib.models.tf.tf_modelv2 import TFModelV2 from ray.rllib.models.tf.fcnet import FullyConnectedNetwork -from ray.rllib.models.tf.recurrent_net import \ - RecurrentNetwork +from ray.rllib.models.tf.recurrent_net import RecurrentNetwork from ray.rllib.models.tf.visionnet import VisionNetwork __all__ = [ diff --git a/rllib/models/tf/layers/__init__.py b/rllib/models/tf/layers/__init__.py index 5328e1667..887853895 100644 --- a/rllib/models/tf/layers/__init__.py +++ b/rllib/models/tf/layers/__init__.py @@ -1,10 +1,14 @@ from ray.rllib.models.tf.layers.gru_gate import GRUGate +from ray.rllib.models.tf.layers.noisy_layer import NoisyLayer from ray.rllib.models.tf.layers.relative_multi_head_attention import \ RelativeMultiHeadAttention from ray.rllib.models.tf.layers.skip_connection import SkipConnection from ray.rllib.models.tf.layers.multi_head_attention import MultiHeadAttention __all__ = [ - "GRUGate", "RelativeMultiHeadAttention", "SkipConnection", - "MultiHeadAttention" + "GRUGate", + "MultiHeadAttention", + "NoisyLayer", + "RelativeMultiHeadAttention", + "SkipConnection" ] diff --git a/rllib/models/tf/layers/noisy_layer.py b/rllib/models/tf/layers/noisy_layer.py new file mode 100644 index 000000000..7024c8acd --- /dev/null +++ b/rllib/models/tf/layers/noisy_layer.py @@ -0,0 +1,105 @@ +import numpy as np + +from ray.rllib.utils.framework import get_activation_fn, get_variable, \ + try_import_tf + +tf = try_import_tf() + + +class NoisyLayer(tf.keras.layers.Layer): + """A Layer that adds learnable Noise + a common dense layer: y = w^{T}x + b + a noisy layer: y = (w + \\epsilon_w*\\sigma_w)^{T}x + + (b+\\epsilon_b*\\sigma_b) + where \epsilon are random variables sampled from factorized normal + distributions and \\sigma are trainable variables which are expected to + vanish along the training procedure + """ + + def __init__(self, + prefix, + out_size, + sigma0, + activation="relu"): + """Initializes a NoisyLayer object. + + Args: + prefix: + out_size: + sigma0: + non_linear: + """ + super().__init__() + self.prefix = prefix + self.out_size = out_size + # TF noise generation can be unreliable on GPU + # If generating the noise on the CPU, + # lowering sigma0 to 0.1 may be helpful + self.sigma0 = sigma0 # 0.5~GPU, 0.1~CPU + self.activation = activation + # Variables. + self.w = None # Weight matrix. + self.b = None # Biases. + self.sigma_w = None # Noise for weight matrix + self.sigma_b = None # Noise for biases. + + def build(self, input_shape): + in_size = int(input_shape[1]) + + self.sigma_w = get_variable( + value=tf.keras.initializers.RandomUniform( + minval=-1.0 / np.sqrt(float(in_size)), + maxval=1.0 / np.sqrt(float(in_size))), + trainable=True, + tf_name=self.prefix + "_sigma_w", + shape=[in_size, self.out_size], + dtype=tf.float32 + ) + + self.sigma_b = get_variable( + value=tf.keras.initializers.Constant( + self.sigma0 / np.sqrt(float(in_size))), + trainable=True, + tf_name=self.prefix + "_sigma_b", + shape=[self.out_size], + dtype=tf.float32, + ) + + self.w = get_variable( + value=tf.keras.initializers.GlorotUniform(), + tf_name=self.prefix + "_fc_w", + trainable=True, + shape=[in_size, self.out_size], + dtype=tf.float32, + ) + + self.b = get_variable( + value=tf.keras.initializers.Zeros(), + tf_name=self.prefix + "_fc_b", + trainable=True, + shape=[self.out_size], + dtype=tf.float32, + ) + + def call(self, inputs): + in_size = int(inputs.shape[1]) + epsilon_in = tf.random.normal(shape=[in_size]) + epsilon_out = tf.random.normal(shape=[self.out_size]) + epsilon_in = self._f_epsilon(epsilon_in) + epsilon_out = self._f_epsilon(epsilon_out) + epsilon_w = tf.matmul( + a=tf.expand_dims(epsilon_in, -1), b=tf.expand_dims(epsilon_out, 0)) + epsilon_b = epsilon_out + + action_activation = tf.matmul( + inputs, + self.w + self.sigma_w * epsilon_w) + \ + self.b + self.sigma_b * epsilon_b + + fn = get_activation_fn(self.activation, framework="tf") + if fn is not None: + action_activation = fn(action_activation) + return action_activation + + def _f_epsilon(self, x): + return tf.math.sign(x) * tf.math.sqrt(tf.math.abs(x)) diff --git a/rllib/models/tf/tf_action_dist.py b/rllib/models/tf/tf_action_dist.py index 585d8eb3f..031c5ff5c 100644 --- a/rllib/models/tf/tf_action_dist.py +++ b/rllib/models/tf/tf_action_dist.py @@ -65,22 +65,23 @@ class Categorical(TFActionDistribution): @override(ActionDistribution) def entropy(self): - a0 = self.inputs - tf.reduce_max(self.inputs, axis=1, keep_dims=True) + a0 = self.inputs - tf.reduce_max(self.inputs, axis=1, keepdims=True) ea0 = tf.exp(a0) - z0 = tf.reduce_sum(ea0, axis=1, keep_dims=True) + z0 = tf.reduce_sum(ea0, axis=1, keepdims=True) p0 = ea0 / z0 - return tf.reduce_sum(p0 * (tf.log(z0) - a0), axis=1) + return tf.reduce_sum(p0 * (tf.math.log(z0) - a0), axis=1) @override(ActionDistribution) def kl(self, other): - a0 = self.inputs - tf.reduce_max(self.inputs, axis=1, keep_dims=True) - a1 = other.inputs - tf.reduce_max(other.inputs, axis=1, keep_dims=True) + a0 = self.inputs - tf.reduce_max(self.inputs, axis=1, keepdims=True) + a1 = other.inputs - tf.reduce_max(other.inputs, axis=1, keepdims=True) ea0 = tf.exp(a0) ea1 = tf.exp(a1) - z0 = tf.reduce_sum(ea0, axis=1, keep_dims=True) - z1 = tf.reduce_sum(ea1, axis=1, keep_dims=True) + z0 = tf.reduce_sum(ea0, axis=1, keepdims=True) + z1 = tf.reduce_sum(ea1, axis=1, keepdims=True) p0 = ea0 / z0 - return tf.reduce_sum(p0 * (a0 - tf.log(z0) - a1 + tf.log(z1)), axis=1) + return tf.reduce_sum( + p0 * (a0 - tf.math.log(z0) - a1 + tf.math.log(z1)), axis=1) @override(TFActionDistribution) def _build_sample_op(self): @@ -230,8 +231,9 @@ class DiagGaussian(TFActionDistribution): @override(ActionDistribution) def logp(self, x): return -0.5 * tf.reduce_sum( - tf.square((tf.to_float(x) - self.mean) / self.std), axis=1) - \ - 0.5 * np.log(2.0 * np.pi) * tf.to_float(tf.shape(x)[1]) - \ + tf.math.square((tf.cast(x, tf.float32) - self.mean) / self.std), + axis=1 + ) - 0.5 * np.log(2.0 * np.pi) * tf.cast(tf.shape(x)[1], tf.float32) - \ tf.reduce_sum(self.log_std, axis=1) @override(ActionDistribution) @@ -239,8 +241,9 @@ class DiagGaussian(TFActionDistribution): assert isinstance(other, DiagGaussian) return tf.reduce_sum( other.log_std - self.log_std + - (tf.square(self.std) + tf.square(self.mean - other.mean)) / - (2.0 * tf.square(other.std)) - 0.5, + (tf.math.square(self.std) + + tf.math.square(self.mean - other.mean)) / + (2.0 * tf.math.square(other.std)) - 0.5, axis=1) @override(ActionDistribution) @@ -250,7 +253,7 @@ class DiagGaussian(TFActionDistribution): @override(TFActionDistribution) def _build_sample_op(self): - return self.mean + self.std * tf.random_normal(tf.shape(self.mean)) + return self.mean + self.std * tf.random.normal(tf.shape(self.mean)) @staticmethod @override(ActionDistribution) diff --git a/rllib/policy/eager_tf_policy.py b/rllib/policy/eager_tf_policy.py index 8dd15584f..767f84750 100644 --- a/rllib/policy/eager_tf_policy.py +++ b/rllib/policy/eager_tf_policy.py @@ -174,6 +174,7 @@ def build_eager_tf_policy(name, grad_stats_fn=None, extra_learn_fetches_fn=None, extra_action_fetches_fn=None, + validate_spaces=None, before_init=None, before_loss_init=None, after_init=None, @@ -208,6 +209,9 @@ def build_eager_tf_policy(name, if get_default_config: config = dict(get_default_config(), **config) + if validate_spaces: + validate_spaces(self, observation_space, action_space, config) + if before_init: before_init(self, observation_space, action_space, config) diff --git a/rllib/policy/tf_policy_template.py b/rllib/policy/tf_policy_template.py index 26bfd51e2..5c1f51f03 100644 --- a/rllib/policy/tf_policy_template.py +++ b/rllib/policy/tf_policy_template.py @@ -22,6 +22,7 @@ def build_tf_policy(name, grad_stats_fn=None, extra_action_fetches_fn=None, extra_learn_fetches_fn=None, + validate_spaces=None, before_init=None, before_loss_init=None, after_init=None, @@ -73,6 +74,9 @@ def build_tf_policy(name, a dict of TF fetches given the policy object extra_learn_fetches_fn (func): optional function that returns a dict of extra values to fetch and return when learning on a batch + validate_spaces (Optional[callable]): Optional callable that takes the + Policy, observation_space, action_space, and config to check for + correctness. before_init (func): optional function to run at the beginning of policy init that takes the same arguments as the policy constructor before_loss_init (func): optional function to run prior to loss @@ -113,6 +117,9 @@ def build_tf_policy(name, if get_default_config: config = dict(get_default_config(), **config) + if validate_spaces: + validate_spaces(self, obs_space, action_space, config) + if before_init: before_init(self, obs_space, action_space, config) diff --git a/rllib/policy/torch_policy_template.py b/rllib/policy/torch_policy_template.py index 7be5855e2..0b4ffeae1 100644 --- a/rllib/policy/torch_policy_template.py +++ b/rllib/policy/torch_policy_template.py @@ -20,6 +20,7 @@ def build_torch_policy(name, extra_action_out_fn=None, extra_grad_process_fn=None, optimizer_fn=None, + validate_spaces=None, before_init=None, after_init=None, action_sampler_fn=None, @@ -48,6 +49,9 @@ def build_torch_policy(name, called after gradients are computed and returns processing info. optimizer_fn (Optional[callable]): Optional callable that returns a torch optimizer given the policy and config. + validate_spaces (Optional[callable]): Optional callable that takes the + Policy, observation_space, action_space, and config to check for + correctness. before_init (Optional[callable]): Optional callable to run at the beginning of `Policy.__init__` that takes the same arguments as the Policy constructor. @@ -94,8 +98,11 @@ def build_torch_policy(name, config = dict(get_default_config(), **config) self.config = config + if validate_spaces: + validate_spaces(self, obs_space, action_space, self.config) + if before_init: - before_init(self, obs_space, action_space, config) + before_init(self, obs_space, action_space, self.config) # Model is customized (use default action dist class). if make_model: diff --git a/rllib/tests/test_dependency.py b/rllib/tests/test_dependency.py index d195dac8d..bf2fdf153 100644 --- a/rllib/tests/test_dependency.py +++ b/rllib/tests/test_dependency.py @@ -23,3 +23,5 @@ if __name__ == "__main__": # Clean up. del os.environ["RLLIB_TEST_NO_TF_IMPORT"] + + print("ok") diff --git a/rllib/tests/test_execution.py b/rllib/tests/test_execution.py index 1022b213b..bc92ff881 100644 --- a/rllib/tests/test_execution.py +++ b/rllib/tests/test_execution.py @@ -1,5 +1,4 @@ import numpy as np -import pytest import time import gym import queue @@ -252,5 +251,6 @@ def test_store_to_replay_actor(ray_start_regular_shared): if __name__ == "__main__": + import pytest import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/tests/test_rollout_worker.py b/rllib/tests/test_rollout_worker.py index 09957e5ba..6c56c66f7 100644 --- a/rllib/tests/test_rollout_worker.py +++ b/rllib/tests/test_rollout_worker.py @@ -158,6 +158,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(batch["prev_actions"].tolist(), to_prev(batch["actions"])) self.assertGreater(batch["advantages"][0], 1) + ev.stop() def test_batch_ids(self): ev = RolloutWorker( @@ -170,6 +171,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(len(set(batch2["unroll_id"])), 1) self.assertEqual( len(set(SampleBatch.concat(batch1, batch2)["unroll_id"])), 2) + ev.stop() def test_global_vars_update(self): # Allow for Unittest run. @@ -202,10 +204,9 @@ class TestRolloutWorker(unittest.TestCase): break self.assertLess( result["info"]["learner"]["default_policy"]["cur_lr"], 0.07) + agent.stop() def test_no_step_on_init(self): - # Allow for Unittest run. - ray.init(num_cpus=5, ignore_reinit_error=True) register_env("fail", lambda _: FailOnStepEnv()) for fw in framework_iterator(frameworks=()): pg = PGTrainer( @@ -214,6 +215,7 @@ class TestRolloutWorker(unittest.TestCase): "framework": fw, }) self.assertRaises(Exception, lambda: pg.train()) + pg.stop() def test_callbacks(self): for fw in framework_iterator(frameworks=("torch", "tf")): @@ -240,10 +242,9 @@ class TestRolloutWorker(unittest.TestCase): self.assertGreater(counts["start"], 0) self.assertGreater(counts["end"], 0) self.assertGreater(counts["step"], 0) + pg.stop() def test_query_evaluators(self): - # Allow for Unittest run. - ray.init(num_cpus=5, ignore_reinit_error=True) register_env("test", lambda _: gym.make("CartPole-v0")) for fw in framework_iterator(frameworks=("torch", "tf")): pg = PGTrainer( @@ -263,6 +264,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(results, [10, 10, 10]) self.assertEqual(results2, [(0, 10), (1, 10), (2, 10)]) self.assertEqual(results3, [[1, 1], [1, 1], [1, 1]]) + pg.stop() def test_reward_clipping(self): # clipping on @@ -274,6 +276,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(max(ev.sample()["rewards"]), 1) result = collect_metrics(ev, []) self.assertEqual(result["episode_reward_mean"], 1000) + ev.stop() # clipping off ev2 = RolloutWorker( @@ -284,6 +287,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(max(ev2.sample()["rewards"]), 100) result2 = collect_metrics(ev2, []) self.assertEqual(result2["episode_reward_mean"], 1000) + ev2.stop() def test_hard_horizon(self): ev = RolloutWorker( @@ -302,6 +306,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(np.argmax(samples["obs"][4]), 0) # 3 done values. self.assertEqual(sum(samples["dones"]), 3) + ev.stop() # A gym env's max_episode_steps is smaller than Trainer's horizon. ev = RolloutWorker( @@ -322,6 +327,7 @@ class TestRolloutWorker(unittest.TestCase): False, False, False, False, False, True, False, False, False, False, False, True ]) + ev.stop() def test_soft_horizon(self): ev = RolloutWorker( @@ -336,10 +342,9 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(len(set(samples["eps_id"])), 3) # only 1 hard done value self.assertEqual(sum(samples["dones"]), 1) + ev.stop() def test_metrics(self): - # Allow for Unittest run. - ray.init(num_cpus=5, ignore_reinit_error=True) ev = RolloutWorker( env_creator=lambda _: MockEnv(episode_length=10), policy=MockPolicy, @@ -353,6 +358,7 @@ class TestRolloutWorker(unittest.TestCase): result = collect_metrics(ev, [remote_ev]) self.assertEqual(result["episodes_this_iter"], 20) self.assertEqual(result["episode_reward_mean"], 10) + ev.stop() def test_async(self): ev = RolloutWorker( @@ -363,6 +369,7 @@ class TestRolloutWorker(unittest.TestCase): for key in ["obs", "actions", "rewards", "dones", "advantages"]: self.assertIn(key, batch) self.assertGreater(batch["advantages"][0], 1) + ev.stop() def test_auto_vectorization(self): ev = RolloutWorker( @@ -386,6 +393,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(env.unwrapped.config.worker_index, 0) indices.append(env.unwrapped.config.vector_index) self.assertEqual(indices, [0, 1, 2, 3, 4, 5, 6, 7]) + ev.stop() def test_batches_larger_when_vectorized(self): ev = RolloutWorker( @@ -401,6 +409,7 @@ class TestRolloutWorker(unittest.TestCase): batch = ev.sample() result = collect_metrics(ev, []) self.assertEqual(result["episodes_this_iter"], 4) + ev.stop() def test_vector_env_support(self): ev = RolloutWorker( @@ -418,6 +427,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(batch.count, 10) result = collect_metrics(ev, []) self.assertEqual(result["episodes_this_iter"], 8) + ev.stop() def test_truncate_episodes(self): ev = RolloutWorker( @@ -427,6 +437,7 @@ class TestRolloutWorker(unittest.TestCase): batch_mode="truncate_episodes") batch = ev.sample() self.assertEqual(batch.count, 15) + ev.stop() def test_complete_episodes(self): ev = RolloutWorker( @@ -436,6 +447,7 @@ class TestRolloutWorker(unittest.TestCase): batch_mode="complete_episodes") batch = ev.sample() self.assertEqual(batch.count, 10) + ev.stop() def test_complete_episodes_packing(self): ev = RolloutWorker( @@ -448,6 +460,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual( batch["t"].tolist(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + ev.stop() def test_filter_sync(self): ev = RolloutWorker( @@ -461,6 +474,7 @@ class TestRolloutWorker(unittest.TestCase): obs_f = filters[DEFAULT_POLICY_ID] self.assertNotEqual(obs_f.rs.n, 0) self.assertNotEqual(obs_f.buffer.n, 0) + ev.stop() def test_get_filters(self): ev = RolloutWorker( @@ -476,6 +490,7 @@ class TestRolloutWorker(unittest.TestCase): obs_f2 = filters2[DEFAULT_POLICY_ID] self.assertGreaterEqual(obs_f2.rs.n, obs_f.rs.n) self.assertGreaterEqual(obs_f2.buffer.n, obs_f.buffer.n) + ev.stop() def test_sync_filter(self): ev = RolloutWorker( @@ -498,6 +513,23 @@ class TestRolloutWorker(unittest.TestCase): obs_f = filters[DEFAULT_POLICY_ID] self.assertGreaterEqual(obs_f.rs.n, 100) self.assertLessEqual(obs_f.buffer.n, 20) + ev.stop() + + def test_extra_python_envs(self): + extra_envs = {"env_key_1": "env_value_1", "env_key_2": "env_value_2"} + self.assertFalse("env_key_1" in os.environ) + self.assertFalse("env_key_2" in os.environ) + ev = RolloutWorker( + env_creator=lambda _: MockEnv(10), + policy=MockPolicy, + extra_python_environs=extra_envs) + self.assertTrue("env_key_1" in os.environ) + self.assertTrue("env_key_2" in os.environ) + ev.stop() + + # reset to original + del os.environ["env_key_1"] + del os.environ["env_key_2"] def sample_and_flush(self, ev): time.sleep(2) @@ -508,21 +540,6 @@ class TestRolloutWorker(unittest.TestCase): self.assertNotEqual(obs_f.buffer.n, 0) return obs_f - def test_extra_python_envs(self): - extra_envs = {"env_key_1": "env_value_1", "env_key_2": "env_value_2"} - self.assertFalse("env_key_1" in os.environ) - self.assertFalse("env_key_2" in os.environ) - RolloutWorker( - env_creator=lambda _: MockEnv(10), - policy=MockPolicy, - extra_python_environs=extra_envs) - self.assertTrue("env_key_1" in os.environ) - self.assertTrue("env_key_2" in os.environ) - - # reset to original - del os.environ["env_key_1"] - del os.environ["env_key_2"] - if __name__ == "__main__": import pytest diff --git a/rllib/utils/exploration/parameter_noise.py b/rllib/utils/exploration/parameter_noise.py index c6249ff7d..6654bd829 100644 --- a/rllib/utils/exploration/parameter_noise.py +++ b/rllib/utils/exploration/parameter_noise.py @@ -305,7 +305,7 @@ class ParameterNoise(Exploration): added_noises.append( tf.assign( noise, - tf.random_normal( + tf.random.normal( shape=noise.shape, stddev=self.stddev, dtype=tf.float32))) diff --git a/rllib/utils/tf_ops.py b/rllib/utils/tf_ops.py index b415b8689..abdef2fc8 100644 --- a/rllib/utils/tf_ops.py +++ b/rllib/utils/tf_ops.py @@ -13,7 +13,7 @@ def huber_loss(x, delta=1.0): """Reference: https://en.wikipedia.org/wiki/Huber_loss""" return tf.where( tf.abs(x) < delta, - tf.square(x) * 0.5, delta * (tf.abs(x) - 0.5 * delta)) + tf.math.square(x) * 0.5, delta * (tf.abs(x) - 0.5 * delta)) def reduce_mean_ignore_inf(x, axis): diff --git a/rllib/utils/types.py b/rllib/utils/types.py index a43b10497..a12e78243 100644 --- a/rllib/utils/types.py +++ b/rllib/utils/types.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Union, Tuple +from typing import Any, Dict, List, Tuple, Union import gym # Represents a fully filled out config of a Trainer class. @@ -77,3 +77,6 @@ TensorType = Any # Either a plain tensor, or a dict or tuple of tensors (or StructTensors). TensorStructType = Union[TensorType, dict, tuple] + +# A shape of a tensor. +TensorShape = Union[Tuple[int], List[int]]