diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index 603049cd4..934487cfd 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -146,8 +146,13 @@ def merge_dicts(d1, d2): return merged -def deep_update(original, new_dict, new_keys_allowed, whitelist): +def deep_update(original, + new_dict, + new_keys_allowed=False, + whitelist=None, + override_all_if_type_changes=None): """Updates original dict with values from new_dict recursively. + If new key is introduced in new_dict, then if new_keys_allowed is not True, an error will be thrown. Further, for sub-dicts, if the key is in the whitelist, then new subkeys can be introduced. @@ -156,19 +161,35 @@ def deep_update(original, new_dict, new_keys_allowed, whitelist): original (dict): Dictionary with default values. new_dict (dict): Dictionary with values to be updated new_keys_allowed (bool): Whether new keys are allowed. - whitelist (list): List of keys that correspond to dict values - where new subkeys can be introduced. This is only at - the top level. + whitelist (Optional[List[str]]): List of keys that correspond to dict + values where new subkeys can be introduced. This is only at the top + level. + override_all_if_type_changes(Optional[List[str]]): List of top level + keys with value=dict, for which we always simply override the + entire value (dict), iff the "type" key in that value dict changes. """ + whitelist = whitelist or [] + override_all_if_type_changes = override_all_if_type_changes or [] + for k, value in new_dict.items(): - if k not in original: - if not new_keys_allowed: - raise Exception("Unknown config parameter `{}` ".format(k)) + if k not in original and not new_keys_allowed: + raise Exception("Unknown config parameter `{}` ".format(k)) + + # Both orginal value and new one are dicts. if isinstance(original.get(k), dict) and isinstance(value, dict): - if k in whitelist: - deep_update(original[k], value, True, []) + # Check old type vs old one. If different, override entire value. + if k in override_all_if_type_changes and \ + "type" in value and "type" in original[k] and \ + value["type"] != original[k]["type"]: + original[k] = value + # Whitelisted key -> ok to add new subkeys. + elif k in whitelist: + deep_update(original[k], value, True) + # Non-whitelisted key. else: - deep_update(original[k], value, new_keys_allowed, []) + deep_update(original[k], value, new_keys_allowed) + # Original value not a dict OR new value not a dict: + # Override entire value. else: original[k] = value return original diff --git a/rllib/BUILD b/rllib/BUILD index 4a7ff8b5d..5b6dfa3c6 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -62,7 +62,7 @@ py_test( py_test( name = "test_dqn", tags = ["agents_dir"], - size = "small", + size = "medium", srcs = ["agents/dqn/tests/test_dqn.py"] ) @@ -78,7 +78,7 @@ py_test( py_test( name = "test_ppo", tags = ["agents_dir"], - size = "small", + size = "medium", srcs = ["agents/ppo/tests/test_ppo.py", "agents/ppo/tests/test.py"] # TODO(sven): Move down once PR 6889 merged ) @@ -815,6 +815,7 @@ py_test( name = "test_framework_agnostic_components", tags = ["utils"], size = "small", + data = glob(["utils/tests/**"]), srcs = ["utils/tests/test_framework_agnostic_components.py"] ) @@ -902,12 +903,12 @@ py_test( srcs = ["tests/test_evaluators.py"] ) -#py_test( -# name = "tests/test_explorations", -# tags = ["tests_dir", "tests_dir_E"], -# size = "medium", -# srcs = ["tests/test_explorations.py"] -#) +py_test( + name = "tests/test_explorations", + tags = ["tests_dir", "tests_dir_E"], + size = "medium", + srcs = ["tests/test_explorations.py"] +) py_test( name = "tests/test_external_env", @@ -1190,18 +1191,25 @@ py_test( ) py_test( - name = "examples/multiagent_cartpole", + name = "examples/multi_agent_cartpole", tags = ["examples", "examples_M"], size = "medium", - srcs = ["examples/multiagent_cartpole.py"], + srcs = ["examples/multi_agent_cartpole.py"], args = ["--num-iters=2", "--num-cpus=4"] ) py_test( - name = "examples/multiagent_two_trainers", + name = "examples/multi_agent_custom_policy", + tags = ["examples", "examples_M_xxx"], + size = "medium", + srcs = ["examples/multi_agent_custom_policy.py"], +) + +py_test( + name = "examples/multi_agent_two_trainers", tags = ["examples", "examples_M"], size = "medium", - srcs = ["examples/multiagent_two_trainers.py"], + srcs = ["examples/multi_agent_two_trainers.py"], args = ["--num-iters=2"] ) diff --git a/rllib/agents/a3c/a3c.py b/rllib/agents/a3c/a3c.py index 9aca1ddf0..7a1980c82 100644 --- a/rllib/agents/a3c/a3c.py +++ b/rllib/agents/a3c/a3c.py @@ -1,8 +1,12 @@ +import logging + from ray.rllib.agents.a3c.a3c_tf_policy import A3CTFPolicy from ray.rllib.agents.trainer import with_common_config from ray.rllib.agents.trainer_template import build_trainer from ray.rllib.optimizers import AsyncGradientsOptimizer +logger = logging.getLogger(__name__) + # yapf: disable # __sphinx_doc_begin__ DEFAULT_CONFIG = with_common_config({ @@ -12,7 +16,6 @@ DEFAULT_CONFIG = with_common_config({ # If true, use the Generalized Advantage Estimator (GAE) # with a value function, see https://arxiv.org/pdf/1506.02438.pdf. "use_gae": True, - # Size of rollout batch "sample_batch_size": 10, # GAE(gamma) parameter @@ -50,7 +53,8 @@ def validate_config(config): if config["entropy_coeff"] < 0: raise DeprecationWarning("entropy_coeff must be >= 0") if config["sample_async"] and config["use_pytorch"]: - raise ValueError( + config["sample_async"] = False + logger.warning( "The sample_async option is not supported with use_pytorch: " "Multithreading can be lead to crashes if used with pytorch.") diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index 188447ffd..a135c2f7a 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -6,7 +6,7 @@ from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy from ray.rllib.agents.dqn.simple_q_policy import SimpleQPolicy from ray.rllib.optimizers import SyncReplayOptimizer from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID -from ray.rllib.utils.deprecation import deprecation_warning +from ray.rllib.utils.deprecation import deprecation_warning, DEPRECATED_VALUE from ray.rllib.utils.exploration import PerWorkerEpsilonGreedy logger = logging.getLogger(__name__) @@ -36,18 +36,25 @@ DEFAULT_CONFIG = with_common_config({ "n_step": 1, # === Exploration Settings (Experimental) === - "exploration": { - # The Exploration class to use. In the simplest case, this is the name - # (str) of any class present in the `rllib.utils.exploration` package. - # You can also provide the python class directly or the full location - # of your class (e.g. "ray.rllib.utils.exploration.epsilon_greedy. - # EpsilonGreedy"). + "exploration_config": { + # The Exploration class to use. "type": "EpsilonGreedy", # Config for the Exploration class' constructor: "initial_epsilon": 1.0, "final_epsilon": 0.02, "epsilon_timesteps": 10000, # Timesteps over which to anneal epsilon. + + # For soft_q, use: + # "exploration_config" = { + # "type": "SoftQ" + # "temperature": [float, e.g. 1.0] + # } }, + # Switch to greedy actions in evaluation workers. + "evaluation_config": { + "explore": False, + }, + # TODO(sven): Make Exploration class for parameter noise. # If True parameter space noise will be used for exploration # See https://blog.openai.com/better-exploration-with-parameter-noise/ @@ -58,11 +65,6 @@ DEFAULT_CONFIG = with_common_config({ "timesteps_per_iteration": 1000, # Update the target network every `target_network_update_freq` steps. "target_network_update_freq": 500, - # Use softmax for sampling actions. Required for off policy estimation. - "soft_q": False, - # Softmax temperature. Q values are divided by this value prior to softmax. - # Softmax approaches argmax as the temperature drops to zero. - "softmax_temp": 1.0, # === Replay buffer === # Size of the replay buffer. Note that if async_updates is set, then # each worker will have a replay buffer of this size. @@ -114,11 +116,13 @@ DEFAULT_CONFIG = with_common_config({ # DEPRECATED VALUES (set to -1 to indicate they have not been overwritten # by user's config). If we don't set them here, we will get an error # from the config-key checker. - "schedule_max_timesteps": -1, - "exploration_final_eps": -1, - "exploration_fraction": -1, - "beta_annealing_fraction": -1, - "per_worker_exploration": -1, + "schedule_max_timesteps": DEPRECATED_VALUE, + "exploration_final_eps": DEPRECATED_VALUE, + "exploration_fraction": DEPRECATED_VALUE, + "beta_annealing_fraction": DEPRECATED_VALUE, + "per_worker_exploration": DEPRECATED_VALUE, + "softmax_temp": DEPRECATED_VALUE, + "soft_q": DEPRECATED_VALUE, }) # __sphinx_doc_end__ # yapf: enable @@ -155,50 +159,69 @@ def validate_config_and_setup_param_noise(config): if config["use_pytorch"]: raise ValueError("DQN does not support PyTorch yet! Use tf instead.") - # Update effective batch size to include n-step - adjusted_batch_size = max(config["sample_batch_size"], - config.get("n_step", 1)) - config["sample_batch_size"] = adjusted_batch_size - # TODO(sven): Remove at some point. # Backward compatibility of epsilon-exploration config AND beta-annealing # fraction settings (both based on schedule_max_timesteps, which is # deprecated). schedule_max_timesteps = None - if "schedule_max_timesteps" in config and \ - config["schedule_max_timesteps"] > 0: + if config.get("schedule_max_timesteps", DEPRECATED_VALUE) != \ + DEPRECATED_VALUE: deprecation_warning( - "schedule_max_timesteps", "exploration.epsilon_timesteps AND " + "schedule_max_timesteps", + "exploration_config.epsilon_timesteps AND " "prioritized_replay_beta_annealing_timesteps") schedule_max_timesteps = config["schedule_max_timesteps"] - if "exploration_final_eps" in config and \ - config["exploration_final_eps"] > 0: + if config.get("exploration_final_eps", DEPRECATED_VALUE) != \ + DEPRECATED_VALUE: deprecation_warning("exploration_final_eps", - "exploration.final_epsilon") - if isinstance(config["exploration"], dict): - config["exploration"]["final_epsilon"] = \ + "exploration_config.final_epsilon") + if isinstance(config["exploration_config"], dict): + config["exploration_config"]["final_epsilon"] = \ config.pop("exploration_final_eps") - if "exploration_fraction" in config and config["exploration_fraction"] > 0: + if config.get("exploration_fraction", DEPRECATED_VALUE) != \ + DEPRECATED_VALUE: assert schedule_max_timesteps is not None deprecation_warning("exploration_fraction", - "exploration.epsilon_timesteps") - if isinstance(config["exploration"], dict): - config["exploration"]["epsilon_timesteps"] = config.pop( + "exploration_config.epsilon_timesteps") + if isinstance(config["exploration_config"], dict): + config["exploration_config"]["epsilon_timesteps"] = config.pop( "exploration_fraction") * schedule_max_timesteps - if "beta_annealing_fraction" in config and \ - config["beta_annealing_fraction"] > 0: + if config.get("beta_annealing_fraction", DEPRECATED_VALUE) != \ + DEPRECATED_VALUE: assert schedule_max_timesteps is not None deprecation_warning( "beta_annealing_fraction (decimal)", "prioritized_replay_beta_annealing_timesteps (int)") config["prioritized_replay_beta_annealing_timesteps"] = config.pop( "beta_annealing_fraction") * schedule_max_timesteps - if "per_worker_exploration" in config and \ - config["per_worker_exploration"] != -1: + if config.get("per_worker_exploration", DEPRECATED_VALUE) != \ + DEPRECATED_VALUE: deprecation_warning("per_worker_exploration", - "exploration.type=PerWorkerEpsilonGreedy") - if isinstance(config["exploration"], dict): - config["exploration"]["type"] = PerWorkerEpsilonGreedy + "exploration_config.type=PerWorkerEpsilonGreedy") + if isinstance(config["exploration_config"], dict): + config["exploration_config"]["type"] = PerWorkerEpsilonGreedy + if config.get("softmax_temp", DEPRECATED_VALUE) != DEPRECATED_VALUE: + deprecation_warning( + "soft_q", "exploration_config={" + "type=StochasticSampling, temperature=[float]" + "}") + if config.get("softmax_temp", 1.0) < 0.00001: + logger.warning("softmax temp very low: Clipped it to 0.00001.") + config["softmax_temperature"] = 0.00001 + if config.get("soft_q", DEPRECATED_VALUE) != DEPRECATED_VALUE: + deprecation_warning( + "soft_q", "exploration_config={" + "type=StochasticSampling, temperature=[float]" + "}") + config["exploration_config"] = { + "type": "SoftQ", + "temperature": config.get("softmax_temp", 1.0) + } + + # Update effective batch size to include n-step + adjusted_batch_size = max(config["sample_batch_size"], + config.get("n_step", 1)) + config["sample_batch_size"] = adjusted_batch_size # Setup parameter noise. if config.get("parameter_noise", False): diff --git a/rllib/agents/dqn/dqn_policy.py b/rllib/agents/dqn/dqn_policy.py index 2b0d7956e..11034faaa 100644 --- a/rllib/agents/dqn/dqn_policy.py +++ b/rllib/agents/dqn/dqn_policy.py @@ -203,32 +203,28 @@ def build_q_model(policy, obs_space, action_space, config): def sample_action_from_q_network(policy, q_model, input_dict, obs_space, - action_space, config): + action_space, explore, config, timestep): + # Action Q network. - q_values, q_logits, q_dist = _compute_q_values( - policy, q_model, input_dict[SampleBatch.CUR_OBS], obs_space, - action_space) - policy.q_values = q_values + q_vals = _compute_q_values(policy, q_model, + input_dict[SampleBatch.CUR_OBS], obs_space, + action_space) + + policy.q_values = q_vals[0] if isinstance(q_vals, tuple) else q_vals policy.q_func_vars = q_model.variables() - # Noise vars for Q network except for layer normalization vars + policy.output_actions, policy.action_logp = \ + policy.exploration.get_exploration_action( + policy.q_values, q_model, Categorical, explore, timestep) + + # Noise vars for Q network except for layer normalization vars. if config["parameter_noise"]: _build_parameter_noise( policy, [var for var in policy.q_func_vars if "LayerNorm" not in var.name]) policy.action_probs = tf.nn.softmax(policy.q_values) - # TODO(sven): Move soft_q logic to different Exploration child-component. - action_log_prob = None - if config["soft_q"]: - action_dist = Categorical(q_values / config["softmax_temp"]) - policy.output_actions = action_dist.sample() - action_log_prob = action_dist.sampled_action_logp() - policy.action_prob = tf.exp(action_log_prob) - else: - policy.output_actions = tf.argmax(q_values, axis=1) - policy.action_prob = None - return policy.output_actions, action_log_prob + return policy.output_actions, policy.action_logp def _build_parameter_noise(policy, pnet_params): diff --git a/rllib/agents/dqn/simple_q_policy.py b/rllib/agents/dqn/simple_q_policy.py index 07f892455..903212c0d 100644 --- a/rllib/agents/dqn/simple_q_policy.py +++ b/rllib/agents/dqn/simple_q_policy.py @@ -7,6 +7,7 @@ import ray from ray.rllib.agents.dqn.simple_q_model import SimpleQModel from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models import ModelCatalog +from ray.rllib.models.tf.tf_action_dist import Categorical from ray.rllib.utils.annotations import override from ray.rllib.utils.error import UnsupportedSpaceException from ray.rllib.policy.tf_policy import TFPolicy @@ -87,18 +88,22 @@ def build_q_models(policy, obs_space, action_space, config): return policy.q_model -def sample_action_from_q_network(policy, q_model, input_dict, obs_space, - action_space, config): +def simple_sample_action_from_q_network(policy, q_model, input_dict, obs_space, + action_space, explore, config, + timestep): + # Action Q network. + q_vals = _compute_q_values(policy, q_model, + input_dict[SampleBatch.CUR_OBS], obs_space, + action_space) - # Action Q network - q_values = _compute_q_values(policy, q_model, - input_dict[SampleBatch.CUR_OBS], obs_space, - action_space) - policy.q_values = q_values + policy.q_values = q_vals[0] if isinstance(q_vals, tuple) else q_vals policy.q_func_vars = q_model.variables() - # Action outputs. - return tf.argmax(q_values, axis=1), None + policy.output_actions, policy.action_logp = \ + policy.exploration.get_exploration_action( + policy.q_values, q_model, Categorical, explore, timestep) + + return policy.output_actions, policy.action_logp def build_q_losses(policy, model, dist_class, train_batch): @@ -161,7 +166,7 @@ SimpleQPolicy = build_tf_policy( name="SimpleQPolicy", get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, make_model=build_q_models, - action_sampler_fn=sample_action_from_q_network, + action_sampler_fn=simple_sample_action_from_q_network, loss_fn=build_q_losses, extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values}, extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error}, diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py index 29f34d56c..bbd4730f1 100644 --- a/rllib/agents/dqn/tests/test_dqn.py +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -1,7 +1,9 @@ +import numpy as np import unittest import ray.rllib.agents.dqn as dqn from ray.rllib.utils.framework import try_import_tf +from ray.rllib.utils.test_utils import check tf = try_import_tf() @@ -11,12 +13,91 @@ class TestDQN(unittest.TestCase): """Test whether a DQNTrainer can be built with both frameworks.""" config = dqn.DEFAULT_CONFIG.copy() config["num_workers"] = 0 # Run locally. - config["eager"] = True # tf. + config["eager"] = True trainer = dqn.DQNTrainer(config=config, env="CartPole-v0") - num_iterations = 2 for i in range(num_iterations): results = trainer.train() print(results) + + config["eager"] = False + trainer = dqn.DQNTrainer(config=config, env="CartPole-v0") + num_iterations = 2 + for i in range(num_iterations): + results = trainer.train() + print(results) + + def test_dqn_exploration_and_soft_q_config(self): + """Tests, whether a DQN Agent outputs exploration/softmaxed actions.""" + config = dqn.DEFAULT_CONFIG.copy() + config["num_workers"] = 0 # Run locally. + config["env_config"] = {"is_slippery": False, "map_name": "4x4"} + obs = np.array(0) + + # Test against all frameworks. + for fw in ["tf", "eager", "torch"]: + if fw == "torch": + continue + + config["eager"] = True if fw == "eager" else False + config["use_pytorch"] = True if fw == "torch" else False + + # Default EpsilonGreedy setup. + trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0") + # Setting explore=False should always return the same action. + a_ = trainer.compute_action(obs, explore=False) + for _ in range(50): + a = trainer.compute_action(obs, explore=False) + check(a, a_) + # explore=None (default: explore) should return different actions. + actions = [] + for _ in range(50): + actions.append(trainer.compute_action(obs)) + check(np.std(actions), 0.0, false=True) + + # Low softmax temperature. Behaves like argmax + # (but no epsilon exploration). + config["exploration_config"] = { + "type": "SoftQ", + "temperature": 0.0 + } + trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0") + # Due to the low temp, always expect the same action. + a_ = trainer.compute_action(obs) + for _ in range(50): + a = trainer.compute_action(obs) + check(a, a_) + + # Higher softmax temperature. + config["exploration_config"]["temperature"] = 1.0 + trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0") + + # Even with the higher temperature, if we set explore=False, we + # should expect the same actions always. + a_ = trainer.compute_action(obs, explore=False) + for _ in range(50): + a = trainer.compute_action(obs, explore=False) + check(a, a_) + + # Due to the higher temp, expect different actions avg'ing + # around 1.5. + actions = [] + for _ in range(300): + actions.append(trainer.compute_action(obs)) + check(np.std(actions), 0.0, false=True) + + # With Random exploration. + config["exploration_config"] = {"type": "Random"} + config["explore"] = True + trainer = dqn.DQNTrainer(config=config, env="FrozenLake-v0") + actions = [] + for _ in range(300): + actions.append(trainer.compute_action(obs)) + check(np.std(actions), 0.0, false=True) + + +if __name__ == "__main__": + import unittest + unittest.main(verbosity=1) diff --git a/rllib/agents/impala/impala.py b/rllib/agents/impala/impala.py index 2168ac487..fa59b4a14 100644 --- a/rllib/agents/impala/impala.py +++ b/rllib/agents/impala/impala.py @@ -15,7 +15,6 @@ DEFAULT_CONFIG = with_common_config({ "vtrace": True, "vtrace_clip_rho_threshold": 1.0, "vtrace_clip_pg_rho_threshold": 1.0, - # System params. # # == Overview of data flow in IMPALA == @@ -95,8 +94,7 @@ def validate_config(config): # PyTorch check. if config["use_pytorch"]: raise ValueError( - "IMPALA does not support PyTorch yet! Use tf instead." - ) + "IMPALA does not support PyTorch yet! Use tf instead.") if config["entropy_coeff"] < 0: raise DeprecationWarning("entropy_coeff must be >= 0") diff --git a/rllib/agents/pg/pg_torch_policy.py b/rllib/agents/pg/pg_torch_policy.py index 4f6e8f405..f63b76cb1 100644 --- a/rllib/agents/pg/pg_torch_policy.py +++ b/rllib/agents/pg/pg_torch_policy.py @@ -17,8 +17,7 @@ def pg_torch_loss(policy, model, dist_class, train_batch): # policy.pi_err = -train_batch[Postprocessing.ADVANTAGES].dot( # log_probs.reshape(-1)) / len(log_probs) policy.pi_err = -torch.mean( - log_probs * train_batch[Postprocessing.ADVANTAGES] - ) + log_probs * train_batch[Postprocessing.ADVANTAGES]) return policy.pi_err diff --git a/rllib/agents/pg/tests/test_pg.py b/rllib/agents/pg/tests/test_pg.py index 281d0289f..9f3e1b2fa 100644 --- a/rllib/agents/pg/tests/test_pg.py +++ b/rllib/agents/pg/tests/test_pg.py @@ -98,3 +98,8 @@ class TestPG(unittest.TestCase): expected_logp.detach().numpy() * train_batch[Postprocessing.ADVANTAGES].numpy()) check(results.detach().numpy(), expected_loss, decimals=4) + + +if __name__ == "__main__": + import unittest + unittest.main(verbosity=1) diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index 25f990656..4de31a3e2 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -19,7 +19,6 @@ DEFAULT_CONFIG = with_common_config({ # If true, use the Generalized Advantage Estimator (GAE) # with a value function, see https://arxiv.org/pdf/1506.02438.pdf. "use_gae": True, - # The GAE(lambda) parameter. "lambda": 1.0, # Initial coefficient for KL divergence. diff --git a/rllib/agents/ppo/ppo_torch_policy.py b/rllib/agents/ppo/ppo_torch_policy.py index 92bfb3e6a..ad9ae27dc 100644 --- a/rllib/agents/ppo/ppo_torch_policy.py +++ b/rllib/agents/ppo/ppo_torch_policy.py @@ -163,8 +163,6 @@ def vf_preds_and_logits_fetches(policy, input_dict, state_batches, model, return { SampleBatch.VF_PREDS: policy.model.value_function().cpu().numpy(), BEHAVIOUR_LOGITS: policy.model.last_output().cpu().numpy(), - ACTION_LOGP: action_dist.logp( - input_dict[SampleBatch.ACTIONS]).cpu().numpy(), } diff --git a/rllib/agents/ppo/tests/test_ppo.py b/rllib/agents/ppo/tests/test_ppo.py index 4e8f753cb..ca75d395d 100644 --- a/rllib/agents/ppo/tests/test_ppo.py +++ b/rllib/agents/ppo/tests/test_ppo.py @@ -14,9 +14,12 @@ from ray.rllib.models.torch.torch_modelv2 import TorchModelV2 from ray.rllib.models.torch.torch_action_dist import TorchCategorical from ray.rllib.policy.policy import ACTION_LOGP from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.utils.framework import try_import_tf from ray.rllib.utils.numpy import fc from ray.rllib.utils.test_utils import check +tf = try_import_tf() + class TestPPO(unittest.TestCase): @@ -41,6 +44,48 @@ class TestPPO(unittest.TestCase): for i in range(num_iterations): trainer.train() + def test_ppo_exploration_setup(self): + """Tests, whether PPO runs with different exploration setups.""" + config = ppo.DEFAULT_CONFIG.copy() + config["num_workers"] = 0 # Run locally. + config["env_config"] = {"is_slippery": False, "map_name": "4x4"} + obs = np.array(0) + + # Test against all frameworks. + for fw in ["tf", "eager", "torch"]: + config["eager"] = True if fw == "eager" else False + config["use_pytorch"] = True if fw == "torch" else False + + # Default Agent should be setup with StochasticSampling. + trainer = ppo.PPOTrainer(config=config, env="FrozenLake-v0") + # explore=False, always expect the same (deterministic) action. + a_ = trainer.compute_action( + obs, + explore=False, + prev_action=np.array(2), + prev_reward=np.array(1.0)) + # Test whether this is really the argmax action over the logits. + if fw != "tf": + last_out = trainer.get_policy().model.last_output() + check(a_, np.argmax(last_out.numpy(), 1)[0]) + for _ in range(50): + a = trainer.compute_action( + obs, + explore=False, + prev_action=np.array(2), + prev_reward=np.array(1.0)) + check(a, a_) + + # With explore=True (default), expect stochastic actions. + actions = [] + for _ in range(300): + actions.append( + trainer.compute_action( + obs, + prev_action=np.array(2), + prev_reward=np.array(1.0))) + check(np.mean(actions), 1.5, atol=0.2) + def test_ppo_loss_function(self): """Tests the PPO loss function math.""" config = ppo.DEFAULT_CONFIG.copy() @@ -95,11 +140,11 @@ class TestPPO(unittest.TestCase): policy, policy.model, Categorical, train_batch, expected_logits, expected_value_outs ) - check(kl, policy.loss_obj.mean_kl) - check(entropy, policy.loss_obj.mean_entropy) - check(np.mean(-pg_loss), policy.loss_obj.mean_policy_loss) - check(np.mean(vf_loss), policy.loss_obj.mean_vf_loss, decimals=4) - check(policy.loss_obj.loss.numpy(), overall_loss, decimals=4) + check(policy.loss_obj.mean_kl, kl) + check(policy.loss_obj.mean_entropy, entropy) + check(policy.loss_obj.mean_policy_loss, np.mean(-pg_loss)) + check(policy.loss_obj.mean_vf_loss, np.mean(vf_loss), decimals=4) + check(policy.loss_obj.loss, overall_loss, decimals=4) # Torch. config["use_pytorch"] = True @@ -123,16 +168,11 @@ class TestPPO(unittest.TestCase): policy.model.last_output(), policy.model.value_function().detach().numpy() ) - check(kl, policy.loss_obj.mean_kl.detach().numpy()) - check(entropy, policy.loss_obj.mean_entropy.detach().numpy()) - check( - np.mean(-pg_loss), - policy.loss_obj.mean_policy_loss.detach().numpy()) - check( - np.mean(vf_loss), - policy.loss_obj.mean_vf_loss.detach().numpy(), - decimals=4) - check(policy.loss_obj.loss.detach().numpy(), overall_loss, decimals=4) + check(policy.loss_obj.mean_kl, kl) + check(policy.loss_obj.mean_entropy, entropy) + check(policy.loss_obj.mean_policy_loss, np.mean(-pg_loss)) + check(policy.loss_obj.mean_vf_loss, np.mean(vf_loss), decimals=4) + check(policy.loss_obj.loss, overall_loss, decimals=4) def _ppo_loss_helper(self, policy, model, dist_class, train_batch, logits, vf_outs): @@ -180,3 +220,8 @@ class TestPPO(unittest.TestCase): policy.config["vf_loss_coeff"] * vf_loss - policy.entropy_coeff * entropy) return kl, entropy, pg_loss, vf_loss, overall_loss + + +if __name__ == "__main__": + import unittest + unittest.main(verbosity=1) diff --git a/rllib/agents/qmix/qmix_policy.py b/rllib/agents/qmix/qmix_policy.py index 2464ca4d7..463840716 100644 --- a/rllib/agents/qmix/qmix_policy.py +++ b/rllib/agents/qmix/qmix_policy.py @@ -10,13 +10,14 @@ import ray from ray.rllib.agents.qmix.mixers import VDNMixer, QMixer from ray.rllib.agents.qmix.model import RNNModel, _get_size from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY -from ray.rllib.policy.policy import TupleActions, Policy +from ray.rllib.policy.policy import Policy from ray.rllib.policy.rnn_sequencing import chop_into_sequences from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models.catalog import ModelCatalog from ray.rllib.models.model import _unpack_obs from ray.rllib.env.constants import GROUP_REWARDS from ray.rllib.utils.annotations import override +from ray.rllib.utils.tuple_actions import TupleActions logger = logging.getLogger(__name__) @@ -155,9 +156,11 @@ class QMixTorchPolicy(Policy): dict space with an action_mask key, e.g. {"obs": ob, "action_mask": mask}. The mask space must be `Box(0, 1, (n_actions,))`. """ + def __init__(self, obs_space, action_space, config): _validate(obs_space, action_space) config = dict(ray.rllib.agents.qmix.qmix.DEFAULT_CONFIG, **config) + self.framework = "torch" super().__init__(obs_space, action_space, config) self.n_agents = len(obs_space.original_space.spaces) self.n_actions = action_space.spaces[0].n @@ -251,7 +254,9 @@ class QMixTorchPolicy(Policy): prev_reward_batch=None, info_batch=None, episodes=None, + explore=None, **kwargs): + explore = explore if explore is not None else self.config["explore"] obs_batch, action_mask, _ = self._unpack_observation(obs_batch) # We need to ensure we do not use the env global state # to compute actions @@ -271,7 +276,8 @@ class QMixTorchPolicy(Policy): masked_q_values[avail == 0.0] = -float("inf") # epsilon-greedy action selector random_numbers = th.rand_like(q_values[:, :, 0]) - pick_random = (random_numbers < self.cur_epsilon).long() + pick_random = (random_numbers < (self.cur_epsilon + if explore else 0.0)).long() random_actions = Categorical(avail).sample().long() actions = (pick_random * random_actions + (1 - pick_random) * masked_q_values.argmax(dim=2)) diff --git a/rllib/agents/sac/sac.py b/rllib/agents/sac/sac.py index d5339b60f..839cd1de1 100644 --- a/rllib/agents/sac/sac.py +++ b/rllib/agents/sac/sac.py @@ -38,7 +38,6 @@ DEFAULT_CONFIG = with_common_config({ "no_done_at_end": True, # N-step target updates "n_step": 1, - # === Evaluation === # The evaluation stats will be reported under the "evaluation" metric key. "evaluation_interval": 1, @@ -46,13 +45,11 @@ DEFAULT_CONFIG = with_common_config({ "evaluation_num_episodes": 1, # Extra configuration that disables exploration. "evaluation_config": { - "exploration_enabled": False, + "explore": False, }, - # === Exploration === # Number of env steps to optimize for before returning "timesteps_per_iteration": 100, - "exploration_enabled": True, # === Replay buffer === # Size of the replay buffer. Note that if async_updates is set, then @@ -103,13 +100,6 @@ DEFAULT_CONFIG = with_common_config({ "worker_side_prioritization": False, # Prevent iterations from going lower than this time span. "min_iter_time_s": 1, - - # DEPRECATED: - "per_worker_exploration": -1, - "exploration_fraction": -1, - "schedule_max_timesteps": -1, - "exploration_initial_eps": -1, - "exploration_final_eps": -1, }) # __sphinx_doc_end__ # yapf: enable diff --git a/rllib/agents/sac/sac_policy.py b/rllib/agents/sac/sac_policy.py index 306b4a7d2..4ccb5cfc6 100644 --- a/rllib/agents/sac/sac_policy.py +++ b/rllib/agents/sac/sac_policy.py @@ -87,7 +87,7 @@ def postprocess_trajectory(policy, def build_action_output(policy, model, input_dict, obs_space, action_space, - config): + explore, config, timestep): model_out, _ = model({ "obs": input_dict[SampleBatch.CUR_OBS], "is_training": policy._get_is_training_placeholder(), @@ -116,13 +116,17 @@ def build_action_output(policy, model, input_dict, obs_space, action_space, "normalize_actions"] else unsquash_actions( squashed_deterministic_actions) - actions = tf.cond(policy.stochastic, lambda: stochastic_actions, - lambda: deterministic_actions) + actions = tf.cond( + tf.constant(explore) if isinstance(explore, bool) else explore, + true_fn=lambda: stochastic_actions, + false_fn=lambda: deterministic_actions) + logp = tf.cond( + tf.constant(explore) if isinstance(explore, bool) else explore, + true_fn=lambda: log_pis, + false_fn=lambda: tf.zeros_like(log_pis)) - action_probabilities = tf.cond(policy.stochastic, lambda: log_pis, - lambda: tf.zeros_like(log_pis)) - policy.output_actions = actions - return actions, action_probabilities + policy.output_actions, policy.action_logp = actions, logp + return policy.output_actions, policy.action_logp def actor_critic_loss(policy, model, _, train_batch): @@ -317,19 +321,6 @@ def stats(policy, train_batch): } -class ExplorationStateMixin: - def __init__(self, obs_space, action_space, config): - self.stochastic = tf.get_variable( - initializer=tf.constant_initializer(config["exploration_enabled"]), - name="stochastic", - shape=(), - trainable=False, - dtype=tf.bool) - - def set_epsilon(self, epsilon): - pass - - class ActorCriticOptimizerMixin: def __init__(self, config): # create global step for counting the number of update operations @@ -401,7 +392,6 @@ class TargetNetworkMixin: def setup_early_mixins(policy, obs_space, action_space, config): - ExplorationStateMixin.__init__(policy, obs_space, action_space, config) ActorCriticOptimizerMixin.__init__(policy, config) @@ -425,8 +415,7 @@ SACTFPolicy = build_tf_policy( apply_gradients_fn=apply_gradients, extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error}, mixins=[ - TargetNetworkMixin, ExplorationStateMixin, ActorCriticOptimizerMixin, - ComputeTDErrorMixin + TargetNetworkMixin, ActorCriticOptimizerMixin, ComputeTDErrorMixin ], before_init=setup_early_mixins, before_loss_init=setup_mid_mixins, diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 597186f8c..f43d33df9 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -166,10 +166,20 @@ COMMON_CONFIG = { "no_eager_on_workers": False, # === Exploration Settings === + # Default exploration behavior, iff `explore`=None is passed into + # compute_action(s). + # Set to False for no exploration behavior (e.g., for evaluation). + "explore": True, # Provide a dict specifying the Exploration object's config. - # Set to False or None for no exploration behavior (e.g., for evaluation). - "exploration": False, - + "exploration_config": { + # The Exploration class to use. In the simplest case, this is the name + # (str) of any class present in the `rllib.utils.exploration` package. + # You can also provide the python class directly or the full location + # of your class (e.g. "ray.rllib.utils.exploration.epsilon_greedy. + # EpsilonGreedy"). + "type": "StochasticSampling", + # Add constructor kwargs here (if any). + }, # === Evaluation Settings === # Evaluate with every `evaluation_interval` training iterations. # The evaluation stats will be reported under the "evaluation" metric key. @@ -182,13 +192,14 @@ COMMON_CONFIG = { # Internal flag that is set to True for evaluation workers. "in_evaluation": False, # Typical usage is to pass extra args to evaluation env creator - # and to disable exploration by computing deterministic actions + # and to disable exploration by computing deterministic actions. + # IMPORTANT NOTE: Policy gradient algorithms are able to find the optimal + # policy, even if this is a stochastic one. Setting "explore=False" here + # will result in the evaluation workers not using this optimal policy! "evaluation_config": { # Example: overriding env_config, exploration, etc: # "env_config": {...}, - # "exploration_fraction": 0, - # "exploration_final_eps": 0, - "exploration": False + # "explore": False }, # Number of parallel workers to use for evaluation. Note that this is set # to zero by default, which means evaluation will be run in the trainer @@ -371,14 +382,21 @@ class Trainer(Trainable): config (obj): Algorithm-specific configuration data. logdir (str): Directory in which training outputs should be placed. """ - + # Whether to allow unknown top-level config keys. _allow_unknown_configs = False + + # List of top-level keys with value=dict, for which new sub-keys are + # allowed to be added to the value dict. _allow_unknown_subkeys = [ "tf_session_args", "local_tf_session_args", "env_config", "model", "optimizer", "multiagent", "custom_resources_per_worker", - "evaluation_config" + "evaluation_config", "exploration_config" ] + # List of top level keys with value=dict, for which we always override the + # entire value (dict), iff the "type" key in that value dict changes. + _override_all_subkeys_if_type_changes = ["exploration_config"] + @PublicAPI def __init__(self, config=None, env=None, logger_creator=None): """Initialize an RLLib trainer. @@ -532,11 +550,12 @@ class Trainer(Trainable): else: self.env_creator = lambda env_config: None - # Merge the supplied config with the class default + # Merge the supplied config with the class default. merged_config = copy.deepcopy(self._default_config) merged_config = deep_update(merged_config, config, self._allow_unknown_configs, - self._allow_unknown_subkeys) + self._allow_unknown_subkeys, + self._override_all_subkeys_if_type_changes) self.raw_user_config = config self.config = merged_config @@ -713,7 +732,7 @@ class Trainer(Trainable): info=None, policy_id=DEFAULT_POLICY_ID, full_fetch=False, - explore=True): + explore=None): """Computes an action for the specified policy on the local Worker. Note that you can also access the policy object through @@ -732,7 +751,8 @@ class Trainer(Trainable): policy_id (str): Policy to query (only applies to multi-agent). full_fetch (bool): Whether to return extra action fetch results. This is always set to True if RNN state is specified. - explore (bool): Whether to pick an action using exploration or not. + explore (bool): Whether to pick an exploitation or exploration + action (default: None -> use self.config["explore"]). Returns: any: The computed action if full_fetch=False, or diff --git a/rllib/evaluation/sampler.py b/rllib/evaluation/sampler.py index 81e949ab4..b49370aa7 100644 --- a/rllib/evaluation/sampler.py +++ b/rllib/evaluation/sampler.py @@ -9,13 +9,14 @@ from ray.rllib.evaluation.episode import MultiAgentEpisode, _flatten_action from ray.rllib.evaluation.rollout_metrics import RolloutMetrics from ray.rllib.evaluation.sample_batch_builder import \ MultiAgentSampleBatchBuilder -from ray.rllib.policy.policy import TupleActions, clip_action +from ray.rllib.policy.policy import clip_action from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.env.base_env import BaseEnv, ASYNC_RESET_RETURN from ray.rllib.env.atari_wrappers import get_wrapper_by_cls, MonitorEnv from ray.rllib.offline import InputReader from ray.rllib.utils.annotations import override from ray.rllib.utils.debug import log_once, summarize +from ray.rllib.utils.tuple_actions import TupleActions from ray.rllib.utils.tf_run_builder import TFRunBuilder logger = logging.getLogger(__name__) diff --git a/rllib/examples/autoregressive_action_dist.py b/rllib/examples/autoregressive_action_dist.py index 943e50539..3a720e6fc 100644 --- a/rllib/examples/autoregressive_action_dist.py +++ b/rllib/examples/autoregressive_action_dist.py @@ -21,7 +21,7 @@ from ray.rllib.models import ModelCatalog from ray.rllib.models.tf.tf_action_dist import Categorical, ActionDistribution from ray.rllib.models.tf.misc import normc_initializer from ray.rllib.models.tf.tf_modelv2 import TFModelV2 -from ray.rllib.policy.policy import TupleActions +from ray.rllib.utils.tuple_actions import TupleActions from ray.rllib.utils import try_import_tf tf = try_import_tf() @@ -67,6 +67,19 @@ class BinaryAutoregressiveOutput(ActionDistribution): def required_model_output_shape(self, model_config): return 16 # controls model output feature vector size + def deterministic_sample(self): + # first, sample a1 + a1_dist = self._a1_distribution() + a1 = a1_dist.deterministic_sample() + + # sample a2 conditioned on a1 + a2_dist = self._a2_distribution(a1) + a2 = a2_dist.deterministic_sample() + self._action_logp = a1_dist.logp(a1) + a2_dist.logp(a2) + + # return the action tuple + return TupleActions([a1, a2]) + def sample(self): # first, sample a1 a1_dist = self._a1_distribution() diff --git a/rllib/examples/multiagent_cartpole.py b/rllib/examples/multi_agent_cartpole.py similarity index 100% rename from rllib/examples/multiagent_cartpole.py rename to rllib/examples/multi_agent_cartpole.py diff --git a/rllib/examples/multiagent_custom_policy.py b/rllib/examples/multi_agent_custom_policy.py similarity index 100% rename from rllib/examples/multiagent_custom_policy.py rename to rllib/examples/multi_agent_custom_policy.py diff --git a/rllib/examples/multiagent_two_trainers.py b/rllib/examples/multi_agent_two_trainers.py similarity index 98% rename from rllib/examples/multiagent_two_trainers.py rename to rllib/examples/multi_agent_two_trainers.py index 05a7685c3..7c8ca524c 100644 --- a/rllib/examples/multiagent_two_trainers.py +++ b/rllib/examples/multi_agent_two_trainers.py @@ -54,7 +54,7 @@ if __name__ == "__main__": "policy_mapping_fn": policy_mapping_fn, "policies_to_train": ["ppo_policy"], }, - "exploration": False, + "explore": False, # disable filters, otherwise we would need to synchronize those # as well to the DQN agent "observation_filter": "NoFilter", diff --git a/rllib/examples/rock_paper_scissors_multiagent.py b/rllib/examples/rock_paper_scissors_multiagent.py index 614eccd2d..11f4a2909 100644 --- a/rllib/examples/rock_paper_scissors_multiagent.py +++ b/rllib/examples/rock_paper_scissors_multiagent.py @@ -103,6 +103,7 @@ class AlwaysSameHeuristic(Policy): class BeatLastHeuristic(Policy): """Play the move that would beat the last move of the opponent.""" + def compute_actions(self, obs_batch, state_batches=None, diff --git a/rllib/models/action_dist.py b/rllib/models/action_dist.py index e6e21dde2..5ee4f2e7c 100644 --- a/rllib/models/action_dist.py +++ b/rllib/models/action_dist.py @@ -29,6 +29,15 @@ class ActionDistribution: """Draw a sample from the action distribution.""" raise NotImplementedError + @DeveloperAPI + def deterministic_sample(self): + """ + Get the deterministic "sampling" output from the distribution. + This is usually the max likelihood output, i.e. mean for Normal, argmax + for Categorical, etc.. + """ + raise NotImplementedError + @DeveloperAPI def sampled_action_logp(self): """Returns the log probability of the last sampled action.""" diff --git a/rllib/models/catalog.py b/rllib/models/catalog.py index ba4542040..4fd864fde 100644 --- a/rllib/models/catalog.py +++ b/rllib/models/catalog.py @@ -7,13 +7,13 @@ from ray.tune.registry import RLLIB_MODEL, RLLIB_PREPROCESSOR, \ RLLIB_ACTION_DIST, _global_registry from ray.rllib.models.extra_spaces import Simplex +from ray.rllib.models.action_dist import ActionDistribution from ray.rllib.models.torch.torch_action_dist import (TorchCategorical, TorchDiagGaussian) from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork as FCNetV2 from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as VisionNetV2 -from ray.rllib.models.tf.tf_action_dist import ( - Categorical, MultiCategorical, Deterministic, DiagGaussian, - MultiActionDistribution, Dirichlet) +from ray.rllib.models.tf.tf_action_dist import Categorical, MultiCategorical, \ + Deterministic, DiagGaussian, MultiActionDistribution, Dirichlet from ray.rllib.models.preprocessors import get_preprocessor from ray.rllib.models.tf.fcnet_v1 import FullyConnectedNetwork from ray.rllib.models.tf.lstm_v1 import LSTM @@ -24,7 +24,6 @@ from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.utils import try_import_tf from ray.rllib.utils.annotations import DeveloperAPI, PublicAPI from ray.rllib.utils.error import UnsupportedSpaceException -from ray.rllib.utils.deprecation import deprecation_warning tf = try_import_tf() @@ -77,8 +76,9 @@ MODEL_DEFAULTS = { # === Options for custom models === # Name of a custom model to use "custom_model": None, - # Name of a custom action distribution to use + # Name of a custom action distribution to use. "custom_action_dist": None, + # Extra options to pass to the custom classes "custom_options": {}, # Custom preprocessors are deprecated. Please use a wrapper class around @@ -109,35 +109,36 @@ class ModelCatalog: def get_action_dist(action_space, config, dist_type=None, - torch=None, - framework="tf"): + framework="tf", + **kwargs): """Returns a distribution class and size for the given action space. Args: action_space (Space): Action space of the target gym env. - config (dict): Optional model config. + config (Optional[dict]): Optional model config. dist_type (Optional[str]): Identifier of the action distribution. - torch (bool): Obsoleted: Whether to return PyTorch Model and - distribution (use framework="torch" instead). framework (str): One of "tf" or "torch". + kwargs (dict): Optional kwargs to pass on to the Distribution's + constructor. Returns: dist_class (ActionDistribution): Python class of the distribution. dist_dim (int): The size of the input vector to the distribution. """ - # Obsoleted parameter `torch`: - if torch is not None: - deprecation_warning("`torch` parameter", "`framework`='tf|torch'") - framework = "torch" if torch else "tf" - dist = None config = config or MODEL_DEFAULTS + # Custom distribution given. if config.get("custom_action_dist"): action_dist_name = config["custom_action_dist"] logger.debug( "Using custom action distribution {}".format(action_dist_name)) dist = _global_registry.get(RLLIB_ACTION_DIST, action_dist_name) - + # Dist_type is given directly as a class. + elif type(dist_type) is type and \ + issubclass(dist_type, ActionDistribution) and \ + dist_type is not MultiActionDistribution: + dist = dist_type + # Box space -> DiagGaussian OR Deterministic. elif isinstance(action_space, gym.spaces.Box): if len(action_space.shape) > 1: raise UnsupportedSpaceException( @@ -146,13 +147,17 @@ class ModelCatalog: "Consider reshaping this into a single dimension, " "using a custom action distribution, " "using a Tuple action space, or the multi-agent API.") + # TODO(sven): Check for bounds and return SquashedNormal, etc.. if dist_type is None: dist = DiagGaussian if framework == "tf" else TorchDiagGaussian elif dist_type == "deterministic": dist = Deterministic + # Discrete Space -> Categorical. elif isinstance(action_space, gym.spaces.Discrete): dist = Categorical if framework == "tf" else TorchCategorical - elif isinstance(action_space, gym.spaces.Tuple): + # Tuple Space -> MultiAction. + elif dist_type is MultiActionDistribution or \ + isinstance(action_space, gym.spaces.Tuple): if framework == "torch": # TODO(sven): implement raise NotImplementedError( @@ -166,15 +171,17 @@ class ModelCatalog: input_lens.append(action_size) return partial( MultiActionDistribution, - child_distributions=child_dist, action_space=action_space, + child_distributions=child_dist, input_lens=input_lens), sum(input_lens) + # Simplex -> Dirichlet. elif isinstance(action_space, Simplex): if framework == "torch": # TODO(sven): implement raise NotImplementedError( "Simplex action spaces not supported for torch.") dist = Dirichlet + # MultiDiscrete -> MultiCategorical. elif isinstance(action_space, gym.spaces.MultiDiscrete): if framework == "torch": # TODO(sven): implement @@ -182,11 +189,13 @@ class ModelCatalog: "MultiDiscrete action spaces not supported for Pytorch.") return partial(MultiCategorical, input_lens=action_space.nvec), \ int(sum(action_space.nvec)) + # Dict -> TODO(sven) elif isinstance(action_space, gym.spaces.Dict): # TODO(sven): implement raise NotImplementedError( "Dict action spaces are not supported, consider using " "gym.spaces.Tuple instead") + # Unknown type -> Error. else: raise NotImplementedError("Unsupported args: {} {}".format( action_space, dist_type)) @@ -231,7 +240,7 @@ class ModelCatalog: @staticmethod @DeveloperAPI - def get_action_placeholder(action_space): + def get_action_placeholder(action_space, name=None): """Returns an action placeholder consistent with the action space Args: @@ -242,7 +251,7 @@ class ModelCatalog: dtype, shape = ModelCatalog.get_action_shape(action_space) - return tf.placeholder(dtype, shape=shape, name="action") + return tf.placeholder(dtype, shape=shape, name=(name or "action")) @staticmethod @DeveloperAPI @@ -250,7 +259,7 @@ class ModelCatalog: action_space, num_outputs, model_config, - framework, + framework="tf", name="default_model", model_interface=None, default_model=None, @@ -263,7 +272,7 @@ class ModelCatalog: unflatten the tensor into a ragged tensor. action_space (Space): Action space of the target gym env. num_outputs (int): The size of the output vector of the model. - framework (str): Either "tf" or "torch". + framework (str): One of "tf" or "torch". name (str): Name (scope) for the model. model_interface (cls): Interface required for the model default_model (cls): Override the default class for the model. This diff --git a/rllib/models/tests/test_distributions.py b/rllib/models/tests/test_distributions.py index 52caf04cd..f6bdf818e 100644 --- a/rllib/models/tests/test_distributions.py +++ b/rllib/models/tests/test_distributions.py @@ -23,3 +23,8 @@ class TestDistributions(unittest.TestCase): counts[sample] += 1.0 probs = np.exp(z) / np.sum(np.exp(z)) self.assertTrue(np.sum(np.abs(probs - counts / num_samples)) <= 0.01) + + +if __name__ == "__main__": + import unittest + unittest.main(verbosity=1) diff --git a/rllib/models/tf/tf_action_dist.py b/rllib/models/tf/tf_action_dist.py index ed6858e2e..8f21fd708 100644 --- a/rllib/models/tf/tf_action_dist.py +++ b/rllib/models/tf/tf_action_dist.py @@ -2,9 +2,9 @@ import numpy as np import functools from ray.rllib.models.action_dist import ActionDistribution -from ray.rllib.policy.policy import TupleActions from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils import try_import_tf +from ray.rllib.utils.tuple_actions import TupleActions tf = try_import_tf() @@ -42,8 +42,15 @@ class Categorical(TFActionDistribution): """Categorical distribution for discrete action spaces.""" @DeveloperAPI - def __init__(self, inputs, model=None): - super().__init__(inputs, model) + def __init__(self, inputs, model=None, temperature=1.0): + temperature = max(0.0001, temperature) # clamp for stability reasons + # Allow softmax formula w/ temperature != 1.0: + # Divide inputs by temperature. + super().__init__(inputs / temperature, model) + + @override(ActionDistribution) + def deterministic_sample(self): + return tf.math.argmax(self.inputs, axis=1) @override(ActionDistribution) def logp(self, x): @@ -52,12 +59,11 @@ class Categorical(TFActionDistribution): @override(ActionDistribution) def entropy(self): - a0 = self.inputs - tf.reduce_max( - self.inputs, reduction_indices=[1], keep_dims=True) + a0 = self.inputs - tf.reduce_max(self.inputs, axis=1, keep_dims=True) ea0 = tf.exp(a0) - z0 = tf.reduce_sum(ea0, reduction_indices=[1], keep_dims=True) + z0 = tf.reduce_sum(ea0, axis=1, keep_dims=True) p0 = ea0 / z0 - return tf.reduce_sum(p0 * (tf.log(z0) - a0), reduction_indices=[1]) + return tf.reduce_sum(p0 * (tf.log(z0) - a0), axis=1) @override(ActionDistribution) def kl(self, other): @@ -92,6 +98,10 @@ class MultiCategorical(TFActionDistribution): ] self.sample_op = self._build_sample_op() + @override(ActionDistribution) + def deterministic_sample(self): + return tf.math.argmax(self.inputs, axis=-1) + @override(ActionDistribution) def logp(self, actions): # If tensor is provided, unstack it into list @@ -143,12 +153,16 @@ class DiagGaussian(TFActionDistribution): self.std = tf.exp(log_std) super().__init__(inputs, model) + @override(ActionDistribution) + def deterministic_sample(self): + return self.mean + @override(ActionDistribution) def logp(self, x): - return (-0.5 * tf.reduce_sum( - tf.square((x - self.mean) / self.std), reduction_indices=[1]) - - 0.5 * np.log(2.0 * np.pi) * tf.to_float(tf.shape(x)[1]) - - tf.reduce_sum(self.log_std, reduction_indices=[1])) + return -0.5 * tf.reduce_sum( + tf.square((x - self.mean) / self.std), axis=1) - \ + 0.5 * np.log(2.0 * np.pi) * tf.to_float(tf.shape(x)[1]) - \ + tf.reduce_sum(self.log_std, axis=1) @override(ActionDistribution) def kl(self, other): @@ -157,13 +171,12 @@ class DiagGaussian(TFActionDistribution): other.log_std - self.log_std + (tf.square(self.std) + tf.square(self.mean - other.mean)) / (2.0 * tf.square(other.std)) - 0.5, - reduction_indices=[1]) + axis=1) @override(ActionDistribution) def entropy(self): return tf.reduce_sum( - self.log_std + .5 * np.log(2.0 * np.pi * np.e), - reduction_indices=[1]) + self.log_std + .5 * np.log(2.0 * np.pi * np.e), axis=1) @override(TFActionDistribution) def _build_sample_op(self): @@ -181,6 +194,10 @@ class Deterministic(TFActionDistribution): This is similar to DiagGaussian with standard deviation zero. """ + @override(ActionDistribution) + def deterministic_sample(self): + return self.inputs + @override(TFActionDistribution) def sampled_action_logp(self): return 0.0 @@ -251,6 +268,11 @@ class MultiActionDistribution(TFActionDistribution): def sample(self): return TupleActions([s.sample() for s in self.child_distributions]) + @override(ActionDistribution) + def deterministic_sample(self): + return TupleActions( + [s.deterministic_sample() for s in self.child_distributions]) + @override(TFActionDistribution) def sampled_action_logp(self): p = self.child_distributions[0].sampled_action_logp() diff --git a/rllib/models/torch/torch_action_dist.py b/rllib/models/torch/torch_action_dist.py index 9cdaec6c0..a1f7748e9 100644 --- a/rllib/models/torch/torch_action_dist.py +++ b/rllib/models/torch/torch_action_dist.py @@ -10,6 +10,11 @@ torch, nn = try_import_torch() class TorchDistributionWrapper(ActionDistribution): """Wrapper class for torch.distributions.""" + def __init_(self, inputs): + super().__init__(inputs) + # Store the last sample here. + self.last_sample = None + @override(ActionDistribution) def logp(self, actions): return self.dist.log_prob(actions) @@ -24,7 +29,13 @@ class TorchDistributionWrapper(ActionDistribution): @override(ActionDistribution) def sample(self): - return self.dist.sample() + self.last_sample = self.dist.sample() + return self.last_sample + + @override(ActionDistribution) + def sampled_action_logp(self): + assert self.last_sample is not None + return self.logp(self.last_sample) class TorchCategorical(TorchDistributionWrapper): @@ -32,8 +43,13 @@ class TorchCategorical(TorchDistributionWrapper): @override(ActionDistribution) def __init__(self, inputs, model): + super().__init__(inputs, model) self.dist = torch.distributions.categorical.Categorical(logits=inputs) + @override(ActionDistribution) + def deterministic_sample(self): + return self.dist.probs.argmax(dim=1) + @staticmethod @override(ActionDistribution) def required_model_output_shape(action_space, model_config): @@ -45,9 +61,14 @@ class TorchDiagGaussian(TorchDistributionWrapper): @override(ActionDistribution) def __init__(self, inputs, model): + super().__init__(inputs, model) mean, log_std = torch.chunk(inputs, 2, dim=1) self.dist = torch.distributions.normal.Normal(mean, torch.exp(log_std)) + @override(ActionDistribution) + def deterministic_sample(self): + return self.dist.mean + @override(TorchDistributionWrapper) def logp(self, actions): return TorchDistributionWrapper.logp(self, actions).sum(-1) diff --git a/rllib/offline/is_estimator.py b/rllib/offline/is_estimator.py index 7ea542b30..6c4ce9192 100644 --- a/rllib/offline/is_estimator.py +++ b/rllib/offline/is_estimator.py @@ -8,9 +8,6 @@ class ImportanceSamplingEstimator(OffPolicyEstimator): Step-wise IS estimator described in https://arxiv.org/pdf/1511.03722.pdf""" - def __init__(self, policy, gamma): - OffPolicyEstimator.__init__(self, policy, gamma) - @override(OffPolicyEstimator) def estimate(self, batch): self.check_can_estimate_for(batch) diff --git a/rllib/offline/off_policy_estimator.py b/rllib/offline/off_policy_estimator.py index 7af8e039d..82c49b898 100644 --- a/rllib/offline/off_policy_estimator.py +++ b/rllib/offline/off_policy_estimator.py @@ -57,13 +57,17 @@ class OffPolicyEstimator: if k.startswith("state_in_"): num_state_inputs += 1 state_keys = ["state_in_{}".format(i) for i in range(num_state_inputs)] + + # TODO(sven): This is wrong. The info["action_prob"] needs to refer + # to the old action (from the batch). It might be the action-prob of + # a different action (as the policy has changed). + # https://github.com/ray-project/ray/issues/7107 _, _, info = self.policy.compute_actions( obs_batch=batch["obs"], state_batches=[batch[k] for k in state_keys], prev_action_batch=batch.data.get("prev_action"), prev_reward_batch=batch.data.get("prev_reward"), - info_batch=batch.data.get("info"), - explore=False) # switch off any exploration + info_batch=batch.data.get("info")) if "action_prob" not in info: raise ValueError( "Off-policy estimation is not possible unless the policy " diff --git a/rllib/offline/wis_estimator.py b/rllib/offline/wis_estimator.py index 695c3dc35..d4d36490c 100644 --- a/rllib/offline/wis_estimator.py +++ b/rllib/offline/wis_estimator.py @@ -9,7 +9,7 @@ class WeightedImportanceSamplingEstimator(OffPolicyEstimator): Step-wise WIS estimator in https://arxiv.org/pdf/1511.03722.pdf""" def __init__(self, policy, gamma): - OffPolicyEstimator.__init__(self, policy, gamma) + super().__init__(policy, gamma) self.filter_values = [] self.filter_counts = [] diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index 2dbf9881f..37c3814f8 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -8,8 +8,7 @@ from ray.rllib.policy.policy import Policy from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.models.catalog import ModelCatalog -from ray.rllib.utils.annotations import override -from ray.rllib.utils import try_import_tf +from ray.rllib.utils import try_import_tf, override from ray.rllib.utils.debug import log_once, summarize from ray.rllib.utils.tracking_dict import UsageTrackingDict @@ -85,6 +84,7 @@ class DynamicTFPolicy(TFPolicy): previous action and reward in the model input """ self.config = config + self.framework = "tf" self._loss_fn = loss_fn self._stats_fn = stats_fn self._grad_stats_fn = grad_stats_fn @@ -105,10 +105,12 @@ class DynamicTFPolicy(TFPolicy): name="observation") if self._obs_include_prev_action_reward: prev_actions = ModelCatalog.get_action_placeholder( - action_space) + action_space, "prev_action") prev_rewards = tf.placeholder( tf.float32, [None], name="prev_reward") + explore = tf.placeholder_with_default(False, (), name="is_exploring") + self._input_dict = { SampleBatch.CUR_OBS: obs, SampleBatch.PREV_ACTIONS: prev_actions, @@ -157,16 +159,25 @@ class DynamicTFPolicy(TFPolicy): model_out, self._state_out = self.model(self._input_dict, self._state_in, self._seq_lens) + # Create the Exploration object to use for this Policy. + self.exploration = self._create_exploration(action_space, config) + timestep = tf.placeholder(tf.int32, (), name="timestep") + # Setup custom action sampler. if action_sampler_fn: action_sampler, action_logp = action_sampler_fn( self, self.model, self._input_dict, obs_space, action_space, - config) - # Default action sampler. + explore, config, timestep) + # Create a default action sampler. else: - action_dist = self.dist_class(model_out, self.model) - action_sampler = action_dist.sample() - action_logp = action_dist.sampled_action_logp() + # Using an exporation setup. + action_sampler, action_logp = \ + self.exploration.get_exploration_action( + model_out, + self.model, + action_dist_class=self.dist_class, + explore=explore, + timestep=timestep) # Phase 1 init sess = tf.get_default_session() or tf.Session() @@ -192,7 +203,9 @@ class DynamicTFPolicy(TFPolicy): prev_reward_input=prev_rewards, seq_lens=self._seq_lens, max_seq_len=config["model"]["max_seq_len"], - batch_divisibility_req=batch_divisibility_req) + batch_divisibility_req=batch_divisibility_req, + explore=explore, + timestep=timestep) # Phase 2 init. if before_loss_init is not None: diff --git a/rllib/policy/eager_tf_policy.py b/rllib/policy/eager_tf_policy.py index 9be9f880d..8ade4a83f 100644 --- a/rllib/policy/eager_tf_policy.py +++ b/rllib/policy/eager_tf_policy.py @@ -113,7 +113,7 @@ def traced_eager_policy(eager_policy_cls): prev_reward_batch=None, info_batch=None, episodes=None, - explore=True, + explore=None, timestep=None, **kwargs): @@ -195,6 +195,7 @@ def build_eager_tf_policy(name, class eager_policy_cls(base): def __init__(self, observation_space, action_space, config): assert tf.executing_eagerly() + self.framework = "tf" Policy.__init__(self, observation_space, action_space, config) self._is_training = False self._loss_initialized = False @@ -296,9 +297,13 @@ def build_eager_tf_policy(name, prev_reward_batch=None, info_batch=None, episodes=None, - explore=True, + explore=None, timestep=None, **kwargs): + + explore = explore if explore is not None else \ + self.config["explore"] + # TODO: remove python side effect to cull sources of bugs. self._is_training = False self._state_in = state_batches @@ -325,25 +330,20 @@ def build_eager_tf_policy(name, model_out, state_out = self.model(input_dict, state_batches, seq_lens) - if self.dist_class: - action_dist = self.dist_class(model_out, self.model) - action = action_dist.sample() - logp = action_dist.sampled_action_logp() - else: + # Custom sampler fn given (which may handle self.exploration). + if action_sampler_fn is not None: action, logp = action_sampler_fn( self, self.model, input_dict, self.observation_space, - self.action_space, self.config) - - # Override `action` with exploration action. - if explore and self.exploration: - action = self.exploration.get_exploration_action( - action, + self.action_space, explore, self.config, timestep) + # Use Exploration object. + else: + action, logp = self.exploration.get_exploration_action( + model_out, self.model, - action_dist=self.dist_class, - explore=True, + action_dist_class=self.dist_class, + explore=explore, timestep=timestep if timestep is not None else self.global_timestep) - logp = None extra_fetches = {} if logp is not None: @@ -523,7 +523,7 @@ def build_eager_tf_policy(name, dummy_batch["seq_lens"] = tf.convert_to_tensor( np.array([1], dtype=np.int32)) - # for IMPALA which expects a certain sample batch size + # for IMPALA which expects a certain sample batch size. def tile_to(tensor, n): return tf.tile(tensor, [n] + [1 for _ in tensor.shape.as_list()[1:]]) diff --git a/rllib/policy/policy.py b/rllib/policy/policy.py index f84bb2df6..7d5fe0722 100644 --- a/rllib/policy/policy.py +++ b/rllib/policy/policy.py @@ -1,5 +1,4 @@ from abc import ABCMeta, abstractmethod -from collections import namedtuple import gym import numpy as np @@ -15,16 +14,6 @@ ACTION_PROB = "action_prob" ACTION_LOGP = "action_logp" -class TupleActions(namedtuple("TupleActions", ["batches"])): - """Used to return tuple actions as a list of batches per tuple element.""" - - def __new__(cls, batches): - return super(TupleActions, cls).__new__(cls, batches) - - def numpy(self): - return TupleActions([b.numpy() for b in self.batches]) - - @DeveloperAPI class Policy(metaclass=ABCMeta): """An agent policy and loss, i.e., a TFPolicy or other subclass. @@ -56,27 +45,17 @@ class Policy(metaclass=ABCMeta): observation_space (gym.Space): Observation space of the policy. action_space (gym.Space): Action space of the policy. config (dict): Policy-specific configuration data. + exploration (Exploration): The exploration object to use for + computing actions. """ self.observation_space = observation_space self.action_space = action_space self.config = config + self.exploration = self._create_exploration(action_space, config) # The global timestep, broadcast down from time to time from the # driver. self.global_timestep = 0 - # Create the Exploration object to use for this Policy. - self.exploration = from_config( - Exploration, - config.get("exploration"), - action_space=self.action_space, - num_workers=self.config.get("num_workers"), - worker_index=self.config.get("worker_index"), - framework="torch" if self.config.get("use_pytorch") else "tf") - - # The default sampling behavior for actions if not explicitly given - # in calls to `compute_actions`. - self.deterministic = config.get("deterministic", False) - @abstractmethod @DeveloperAPI def compute_actions(self, @@ -86,7 +65,7 @@ class Policy(metaclass=ABCMeta): prev_reward_batch=None, info_batch=None, episodes=None, - explore=True, + explore=None, timestep=None, **kwargs): """Computes actions for the current policy. @@ -103,8 +82,8 @@ class Policy(metaclass=ABCMeta): episodes (list): MultiAgentEpisode for each obs in obs_batch. This provides access to all of the internal episode state, which may be useful for model-based or multiagent algorithms. - explore (bool): Whether we should use exploration - (e.g. when training) or not (for inference/evaluation). + explore (bool): Whether to pick an exploitation or exploration + action (default: None -> use self.config["explore"]). timestep (int): The current (sampling) time step. kwargs: forward compatibility placeholder @@ -127,7 +106,7 @@ class Policy(metaclass=ABCMeta): info=None, episode=None, clip_actions=False, - explore=True, + explore=None, timestep=None, **kwargs): """Unbatched version of compute_actions. @@ -142,8 +121,8 @@ class Policy(metaclass=ABCMeta): internal episode state, which may be useful for model-based or multi-agent algorithms. clip_actions (bool): should the action be clipped - explore (bool): Whether we should use exploration (i.e. when - training) or not (e.g. for inference/evaluation). + explore (bool): Whether to pick an exploitation or exploration + action (default: None -> use self.config["explore"]). timestep (int): The current (sampling) time step. kwargs: forward compatibility placeholder @@ -276,8 +255,7 @@ class Policy(metaclass=ABCMeta): Returns: any: Serializable information on the `self.exploration` object. """ - if isinstance(self.exploration, Exploration): - return self.exploration.get_info() + return self.exploration.get_info() @DeveloperAPI def get_exploration_state(self): @@ -288,8 +266,7 @@ class Policy(metaclass=ABCMeta): Returns: any: Serializable copy or view of the current exploration state. """ - if isinstance(self.exploration, Exploration): - raise NotImplementedError + raise NotImplementedError @DeveloperAPI def set_exploration_state(self, exploration_state): @@ -299,8 +276,7 @@ class Policy(metaclass=ABCMeta): exploration_state (any): Serializable copy or view of the new exploration state. """ - if isinstance(self.exploration, Exploration): - raise NotImplementedError + raise NotImplementedError @DeveloperAPI def is_recurrent(self): @@ -372,6 +348,25 @@ class Policy(metaclass=ABCMeta): """ raise NotImplementedError + def _create_exploration(self, action_space, config): + """Creates the Policy's Exploration object. + + This method only exists b/c some Trainers do not use TfPolicy nor + TorchPolicy, but inherit directly from Policy. Others inherit from + TfPolicy w/o using DynamicTfPolicy. + TODO(sven): unify these cases.""" + exploration = from_config( + Exploration, + config.get("exploration_config", {"type": "StochasticSampling"}), + action_space=action_space, + num_workers=config.get("num_workers"), + worker_index=config.get("worker_index"), + framework=getattr(self, "framework", "tf")) + # If config is further passed around, it'll contain an already + # instantiated object. + config["exploration_config"] = exploration + return exploration + def clip_action(action, space): """ diff --git a/rllib/policy/tests/test_policy.py b/rllib/policy/tests/test_policy.py index af5a952d6..481410137 100644 --- a/rllib/policy/tests/test_policy.py +++ b/rllib/policy/tests/test_policy.py @@ -15,8 +15,7 @@ class TestPolicy(Policy): prev_action_batch=None, prev_reward_batch=None, episodes=None, - deterministic=None, - explore=True, + explore=None, timestep=None, **kwargs): return [random.choice([0, 1])] * len(obs_batch), [], {} diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py index 1a051ca1e..cc4b6dbf6 100644 --- a/rllib/policy/tf_policy.py +++ b/rllib/policy/tf_policy.py @@ -66,7 +66,9 @@ class TFPolicy(Policy): seq_lens=None, max_seq_len=20, batch_divisibility_req=1, - update_ops=None): + update_ops=None, + explore=None, + timestep=None): """Initialize the policy. Arguments: @@ -102,7 +104,11 @@ class TFPolicy(Policy): update_ops (list): override the batchnorm update ops to run when applying gradients. Otherwise we run all update ops found in the current variable scope. + explore (Tensor): Placeholder for `explore` parameter into + call to Exploration.get_exploration_action. + timestep (Tensor): Placeholder for the global sampling timestep. """ + self.framework = "tf" super().__init__(observation_space, action_space, config) self.model = model self._sess = sess @@ -111,8 +117,8 @@ class TFPolicy(Policy): self._prev_reward_input = prev_reward_input self._action = action_sampler self._is_training = self._get_is_training_placeholder() - self._is_exploring = tf.placeholder_with_default( - True, (), name="is_exploring") + self._is_exploring = explore if explore is not None else \ + tf.placeholder_with_default(True, (), name="is_exploring") self._action_logp = action_logp self._action_prob = (tf.exp(self._action_logp) if self._action_logp is not None else None) @@ -124,7 +130,9 @@ class TFPolicy(Policy): self._update_ops = update_ops self._stats_fetches = {} self._loss_input_dict = None - self._timestep = tf.placeholder(tf.int32, (), name="timestep") + self.exploration_info = self.exploration.get_info() + self._timestep = timestep if timestep is not None else \ + tf.placeholder(tf.int32, (), name="timestep") if loss is not None: self._initialize_loss(loss, loss_inputs) @@ -144,17 +152,6 @@ class TFPolicy(Policy): raise ValueError( "seq_lens tensor must be given if state inputs are defined") - # Apply the post-forward-pass exploration if applicable. - # And store the `get_state` op. - self._exploration_action = None - if self.exploration: - self._exploration_action = self.exploration.get_exploration_action( - self._action, - self.model, - action_dist=self.dist_class, - explore=self._is_exploring, - timestep=self._timestep) - def variables(self): """Return the list of all savable variables for this policy.""" return self.model.variables() @@ -241,9 +238,10 @@ class TFPolicy(Policy): prev_reward_batch=None, info_batch=None, episodes=None, - explore=True, + explore=None, timestep=None, **kwargs): + explore = explore if explore is not None else self.config["explore"] builder = TFRunBuilder(self._sess, "compute_actions") fetches = self._build_compute_actions( builder, @@ -255,8 +253,7 @@ class TFPolicy(Policy): timestep=timestep if timestep is not None else self.global_timestep) # Execute session run to get action (and other fetches). - ret = builder.get(fetches) - return ret[:3] + return builder.get(fetches) @override(Policy) def compute_gradients(self, postprocessed_batch): @@ -282,7 +279,7 @@ class TFPolicy(Policy): @override(Policy) def get_exploration_info(self): if isinstance(self.exploration, Exploration): - return self._sess.run(self.exploration.get_info()) + return self._sess.run(self.exploration_info) @override(Policy) def get_weights(self): @@ -464,8 +461,9 @@ class TFPolicy(Policy): prev_action_batch=None, prev_reward_batch=None, episodes=None, - explore=True, + explore=None, timestep=None): + explore = explore if explore is not None else self.config["explore"] state_batches = state_batches or [] if len(self._state_inputs) != len(state_batches): @@ -487,16 +485,8 @@ class TFPolicy(Policy): if timestep is not None: builder.add_feed_dict({self._timestep: timestep}) builder.add_feed_dict(dict(zip(self._state_inputs, state_batches))) - # Get an exploration action. - if explore and self.exploration: - fetches = builder.add_fetches( - [self._exploration_action] + self._state_outputs + - [self.extra_compute_action_fetches()]) - # Do not explore. - else: - fetches = builder.add_fetches( - [self._action] + self._state_outputs + - [self.extra_compute_action_fetches()]) + fetches = builder.add_fetches([self._action] + self._state_outputs + + [self.extra_compute_action_fetches()]) return fetches[0], fetches[1:-1], fetches[-1] def _build_compute_gradients(self, builder, postprocessed_batch): diff --git a/rllib/policy/torch_policy.py b/rllib/policy/torch_policy.py index 819fc3f20..e0782aab1 100644 --- a/rllib/policy/torch_policy.py +++ b/rllib/policy/torch_policy.py @@ -1,7 +1,8 @@ import numpy as np import time -from ray.rllib.policy.policy import Policy, LEARNER_STATS_KEY +from ray.rllib.policy.policy import Policy, LEARNER_STATS_KEY, ACTION_PROB, \ + ACTION_LOGP from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils import try_import_torch from ray.rllib.utils.annotations import override, DeveloperAPI @@ -19,9 +20,9 @@ class TorchPolicy(Policy): Attributes: observation_space (gym.Space): observation space of the policy. action_space (gym.Space): action space of the policy. - config (dict): config of the policy - model (TorchModel): Torch model instance - dist_class (type): Torch action distribution class + config (dict): config of the policy. + model (TorchModel): Torch model instance. + dist_class (type): Torch action distribution class. """ def __init__(self, observation_space, action_space, config, model, loss, @@ -43,6 +44,7 @@ class TorchPolicy(Policy): action_distribution_class (ActionDistribution): Class for action distribution. """ + self.framework = "torch" super().__init__(observation_space, action_space, config) self.device = (torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")) @@ -63,9 +65,12 @@ class TorchPolicy(Policy): prev_reward_batch=None, info_batch=None, episodes=None, - explore=True, + explore=None, timestep=None, **kwargs): + + explore = explore if explore is not None else self.config["explore"] + with torch.no_grad(): input_dict = self._lazy_tensor_dict({ SampleBatch.CUR_OBS: obs_batch, @@ -78,20 +83,23 @@ class TorchPolicy(Policy): model_out = self.model(input_dict, state_batches, self._convert_to_tensor([1])) logits, state = model_out - action_dist = self.dist_class(logits, self.model) - # Try our Exploration, if any. - if self.exploration: - actions = self.exploration.get_action( - model_out, self.model, action_dist, explore, timestep - if timestep is not None else self.global_timestep) - else: - actions = action_dist.sample() - + action_dist = None + actions, logp = \ + self.exploration.get_exploration_action( + logits, self.model, self.dist_class, explore, + timestep if timestep is not None else + self.global_timestep) input_dict[SampleBatch.ACTIONS] = actions + extra_action_out = self.extra_action_out(input_dict, state_batches, + self.model, action_dist) + if logp is not None: + extra_action_out.update({ + ACTION_PROB: torch.exp(logp), + ACTION_LOGP: logp + }) return (actions.cpu().numpy(), [h.cpu().numpy() for h in state], - self.extra_action_out(input_dict, state_batches, - self.model, action_dist)) + extra_action_out) @override(Policy) def learn_on_batch(self, postprocessed_batch): diff --git a/rllib/tests/test_dependency.py b/rllib/tests/test_dependency.py index 840c31a8f..0c99aaec5 100644 --- a/rllib/tests/test_dependency.py +++ b/rllib/tests/test_dependency.py @@ -4,7 +4,7 @@ import os import sys if __name__ == "__main__": - # + # Do not import tf for testing purposes. os.environ["RLLIB_TEST_NO_TF_IMPORT"] = "1" from ray.rllib.agents.a3c import A2CTrainer diff --git a/rllib/tests/test_explorations.py b/rllib/tests/test_explorations.py new file mode 100644 index 000000000..f92fdec5b --- /dev/null +++ b/rllib/tests/test_explorations.py @@ -0,0 +1,149 @@ +import numpy as np +import unittest + +import ray +import ray.rllib.agents.a3c as a3c +import ray.rllib.agents.dqn as dqn +import ray.rllib.agents.impala as impala +import ray.rllib.agents.pg as pg +import ray.rllib.agents.ppo as ppo +import ray.rllib.agents.sac as sac +from ray.rllib.utils import check + + +def test_explorations(run, + env, + config, + dummy_obs, + prev_a=None, + expected_mean_action=None): + """Calls an Agent's `compute_actions` with different `explore` options.""" + + config = config.copy() + if run not in [a3c.A3CTrainer]: + config["num_workers"] = 0 + + # Test all frameworks. + for fw in ["torch", "eager", "tf"]: + if fw == "torch" and \ + run in [dqn.DQNTrainer, dqn.SimpleQTrainer, + impala.ImpalaTrainer, sac.SACTrainer]: + continue + print("Testing {} in framework={}".format(run, fw)) + config["eager"] = True if fw == "eager" else False + config["use_pytorch"] = True if fw == "torch" else False + + # Test for both the default Agent's exploration AND the `Random` + # exploration class. + for exploration in [None]: # , "Random"]: + if exploration == "Random": + config["exploration_config"] = {"type": "Random"} + + trainer = run(config=config, env=env) + + # Make sure all actions drawn are the same, given same + # observations. + actions = [] + for _ in range(100): + actions.append( + trainer.compute_action( + observation=dummy_obs, + explore=False, + prev_action=prev_a, + prev_reward=1.0 if prev_a is not None else None)) + check(actions[-1], actions[0]) + + # Make sure actions drawn are different (around some mean value), + # given constant observations. + actions = [] + for _ in range(100): + actions.append( + trainer.compute_action( + observation=dummy_obs, + explore=True, + prev_action=prev_a, + prev_reward=1.0 if prev_a is not None else None)) + check( + np.mean(actions), + expected_mean_action + if expected_mean_action is not None else 0.5, + atol=0.3) + # Check that the stddev is not 0.0 (values differ). + check(np.std(actions), 0.0, false=True) + + +class TestExplorations(unittest.TestCase): + """ + Tests all Exploration components and the deterministic flag for + compute_action calls. + """ + ray.init(ignore_reinit_error=True) + + def test_a2c(self): + test_explorations( + a3c.A2CTrainer, + "CartPole-v0", + a3c.DEFAULT_CONFIG, + np.array([0.0, 0.1, 0.0, 0.0]), + prev_a=np.array(1)) + + def test_a3c(self): + test_explorations( + a3c.A3CTrainer, + "CartPole-v0", + a3c.DEFAULT_CONFIG, + np.array([0.0, 0.1, 0.0, 0.0]), + prev_a=np.array(1)) + + def test_simple_dqn(self): + test_explorations(dqn.SimpleQTrainer, "CartPole-v0", + dqn.DEFAULT_CONFIG, np.array([0.0, 0.1, 0.0, 0.0])) + + def test_dqn(self): + test_explorations(dqn.DQNTrainer, "CartPole-v0", dqn.DEFAULT_CONFIG, + np.array([0.0, 0.1, 0.0, 0.0])) + + def test_impala(self): + test_explorations( + impala.ImpalaTrainer, + "CartPole-v0", + impala.DEFAULT_CONFIG, + np.array([0.0, 0.1, 0.0, 0.0]), + prev_a=np.array([0])) + + def test_pg(self): + test_explorations( + pg.PGTrainer, + "CartPole-v0", + pg.DEFAULT_CONFIG, + np.array([0.0, 0.1, 0.0, 0.0]), + prev_a=np.array([1])) + + def test_ppo_discr(self): + test_explorations( + ppo.PPOTrainer, + "CartPole-v0", + ppo.DEFAULT_CONFIG, + np.array([0.0, 0.1, 0.0, 0.0]), + prev_a=np.array([0])) + + def test_ppo_cont(self): + test_explorations( + ppo.PPOTrainer, + "Pendulum-v0", + ppo.DEFAULT_CONFIG, + np.array([0.0, 0.1, 0.0]), + prev_a=np.array([0]), + expected_mean_action=0.0) + + def test_sac(self): + test_explorations( + sac.SACTrainer, + "Pendulum-v0", + sac.DEFAULT_CONFIG, + np.array([0.0, 0.1, 0.0]), + expected_mean_action=0.0) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/rllib/tests/test_external_env.py b/rllib/tests/test_external_env.py index 2b98ac465..a9fb05c36 100644 --- a/rllib/tests/test_external_env.py +++ b/rllib/tests/test_external_env.py @@ -160,7 +160,8 @@ class TestExternalEnv(unittest.TestCase): "test3", lambda _: PartOffPolicyServing( gym.make("CartPole-v0"), off_pol_frac=0.2)) dqn = DQNTrainer( - env="test3", config={"exploration": { + env="test3", + config={"exploration_config": { "epsilon_timesteps": 100 }}) for i in range(100): diff --git a/rllib/tests/test_multi_agent_env.py b/rllib/tests/test_multi_agent_env.py index 075c761b9..9c05cc657 100644 --- a/rllib/tests/test_multi_agent_env.py +++ b/rllib/tests/test_multi_agent_env.py @@ -339,6 +339,8 @@ class TestMultiAgentEnv(unittest.TestCase): list(range(25)) * 6) def testMultiAgentSampleSyncRemote(self): + # Allow to be run via Unittest. + ray.init(num_cpus=4, ignore_reinit_error=True) act_space = gym.spaces.Discrete(2) obs_space = gym.spaces.Discrete(2) ev = RolloutWorker( @@ -356,6 +358,8 @@ class TestMultiAgentEnv(unittest.TestCase): self.assertEqual(batch.count, 200) def testMultiAgentSampleAsyncRemote(self): + # Allow to be run via Unittest. + ray.init(num_cpus=4, ignore_reinit_error=True) act_space = gym.spaces.Discrete(2) obs_space = gym.spaces.Discrete(2) ev = RolloutWorker( @@ -628,6 +632,8 @@ class TestMultiAgentEnv(unittest.TestCase): self._testWithOptimizer(SyncSamplesOptimizer) def test_multi_agent_async_gradients_optimizer(self): + # Allow to be run via Unittest. + ray.init(num_cpus=4, ignore_reinit_error=True) self._testWithOptimizer(AsyncGradientsOptimizer) def test_multi_agent_replay_optimizer(self): diff --git a/rllib/tests/test_rollout_worker.py b/rllib/tests/test_rollout_worker.py index 44f5e1bc1..726146954 100644 --- a/rllib/tests/test_rollout_worker.py +++ b/rllib/tests/test_rollout_worker.py @@ -24,7 +24,7 @@ class MockPolicy(TestPolicy): prev_action_batch=None, prev_reward_batch=None, episodes=None, - explore=True, + explore=None, timestep=None, **kwargs): return [random.choice([0, 1])] * len(obs_batch), [], {} @@ -45,7 +45,7 @@ class BadPolicy(MockPolicy): prev_action_batch=None, prev_reward_batch=None, episodes=None, - explore=True, + explore=None, timestep=None, **kwargs): raise Exception("intentional error") @@ -159,6 +159,7 @@ class TestRolloutWorker(unittest.TestCase): len(set(SampleBatch.concat(batch1, batch2)["unroll_id"])), 2) def test_global_vars_update(self): + # Allow for Unittest run. ray.init(num_cpus=5, ignore_reinit_error=True) agent = A2CTrainer( env="CartPole-v0", @@ -179,6 +180,7 @@ class TestRolloutWorker(unittest.TestCase): result["info"]["num_steps_trained"])) def test_no_step_on_init(self): + # Allow for Unittest run. ray.init(num_cpus=5, ignore_reinit_error=True) register_env("fail", lambda _: FailOnStepEnv()) pg = PGTrainer(env="fail", config={"num_workers": 1}) @@ -209,6 +211,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertLess(counts["step"], 400) def test_query_evaluators(self): + # Allow for Unittest run. ray.init(num_cpus=5, ignore_reinit_error=True) register_env("test", lambda _: gym.make("CartPole-v0")) pg = PGTrainer( @@ -277,6 +280,7 @@ class TestRolloutWorker(unittest.TestCase): self.assertEqual(sum(samples["dones"]), 1) def test_metrics(self): + # Allow for Unittest run. ray.init(num_cpus=5, ignore_reinit_error=True) ev = RolloutWorker( env_creator=lambda _: MockEnv(episode_length=10), diff --git a/rllib/tests/test_supported_spaces.py b/rllib/tests/test_supported_spaces.py index f4784e439..d6f44d409 100644 --- a/rllib/tests/test_supported_spaces.py +++ b/rllib/tests/test_supported_spaces.py @@ -1,18 +1,17 @@ -import unittest -import traceback - import gym from gym.spaces import Box, Discrete, Tuple, Dict, MultiDiscrete from gym.envs.registration import EnvSpec import numpy as np import sys +import unittest +import traceback import ray from ray.rllib.agents.registry import get_agent_class from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork as FCNetV2 from ray.rllib.models.tf.visionnet_v2 import VisionNetwork as VisionNetV2 -from ray.rllib.tests.test_multi_agent_env import (MultiCartpole, - MultiMountainCar) +from ray.rllib.tests.test_multi_agent_env import MultiCartpole, \ + MultiMountainCar from ray.rllib.utils.error import UnsupportedSpaceException from ray.tune.registry import register_env @@ -73,9 +72,11 @@ def check_support(alg, config, stats, check_bounds=False, name=None): covered_a = set() covered_o = set() config["log_level"] = "ERROR" + first_error = None for a_name, action_space in ACTION_SPACES_TO_TEST.items(): for o_name, obs_space in OBSERVATION_SPACES_TO_TEST.items(): - print("=== Testing", alg, action_space, obs_space, "===") + print("=== Testing {} A={} S={} ===".format( + alg, action_space, obs_space)) stub_env = make_stub_env(action_space, obs_space, check_bounds) register_env("stub_env", lambda c: stub_env()) stat = "ok" @@ -85,7 +86,7 @@ def check_support(alg, config, stats, check_bounds=False, name=None): stat = "skip" # speed up tests by avoiding full grid else: a = get_agent_class(alg)(config=config, env="stub_env") - if alg not in ["DDPG", "ES", "ARS"]: + if alg not in ["DDPG", "ES", "ARS", "SAC"]: if o_name in ["atari", "image"]: assert isinstance(a.get_policy().model, VisionNetV2) @@ -100,6 +101,7 @@ def check_support(alg, config, stats, check_bounds=False, name=None): stat = "ERROR" print(e) print(traceback.format_exc()) + first_error = first_error if first_error is not None else e finally: if a: try: @@ -111,6 +113,10 @@ def check_support(alg, config, stats, check_bounds=False, name=None): print() stats[name or alg, a_name, o_name] = stat + # If anything happened, raise error. + if first_error is not None: + raise first_error + def check_support_multiagent(alg, config): register_env("multi_mountaincar", lambda _: MultiMountainCar(2)) @@ -127,30 +133,15 @@ def check_support_multiagent(alg, config): class ModelSupportedSpaces(unittest.TestCase): + stats = {} + def setUp(self): - ray.init(num_cpus=4) + ray.init(num_cpus=4, ignore_reinit_error=True) def tearDown(self): ray.shutdown() - def testAll(self): - stats = {} - check_support("IMPALA", {"num_gpus": 0}, stats) - check_support("APPO", {"num_gpus": 0, "vtrace": False}, stats) - check_support( - "APPO", { - "num_gpus": 0, - "vtrace": True - }, stats, name="APPO-vt") - check_support( - "DDPG", { - "exploration_ou_noise_scale": 100.0, - "timesteps_per_iteration": 1, - "use_state_preprocessor": True, - }, - stats, - check_bounds=True) - check_support("DQN", {"timesteps_per_iteration": 1}, stats) + def test_a3c(self): check_support( "A3C", { "num_workers": 1, @@ -158,8 +149,54 @@ class ModelSupportedSpaces(unittest.TestCase): "grads_per_step": 1 } }, - stats, + self.stats, check_bounds=True) + + def test_appo(self): + check_support("APPO", {"num_gpus": 0, "vtrace": False}, self.stats) + check_support( + "APPO", { + "num_gpus": 0, + "vtrace": True + }, + self.stats, + name="APPO-vt") + + def test_ars(self): + check_support( + "ARS", { + "num_workers": 1, + "noise_size": 10000000, + "num_rollouts": 1, + "rollouts_used": 1 + }, self.stats) + + def test_ddpg(self): + check_support( + "DDPG", { + "exploration_ou_noise_scale": 100.0, + "timesteps_per_iteration": 1, + "use_state_preprocessor": True, + }, + self.stats, + check_bounds=True) + + def test_dqn(self): + check_support("DQN", {"timesteps_per_iteration": 1}, self.stats) + + def test_es(self): + check_support( + "ES", { + "num_workers": 1, + "noise_size": 10000000, + "episodes_per_batch": 1, + "train_batch_size": 1 + }, self.stats) + + def test_impala(self): + check_support("IMPALA", {"num_gpus": 0}, self.stats) + + def test_ppo(self): check_support( "PPO", { "num_workers": 1, @@ -168,38 +205,40 @@ class ModelSupportedSpaces(unittest.TestCase): "sample_batch_size": 10, "sgd_minibatch_size": 1, }, - stats, + self.stats, check_bounds=True) - check_support( - "ES", { - "num_workers": 1, - "noise_size": 10000000, - "episodes_per_batch": 1, - "train_batch_size": 1 - }, stats) - check_support( - "ARS", { - "num_workers": 1, - "noise_size": 10000000, - "num_rollouts": 1, - "rollouts_used": 1 - }, stats) + + def test_pg(self): check_support( "PG", { "num_workers": 1, "optimizer": {} }, - stats, + self.stats, check_bounds=True) - num_unexpected_errors = 0 - for (alg, a_name, o_name), stat in sorted(stats.items()): - if stat not in ["ok", "unsupported", "skip"]: - num_unexpected_errors += 1 - print(alg, "action_space", a_name, "obs_space", o_name, "result", - stat) - self.assertEqual(num_unexpected_errors, 0) - def testMultiAgent(self): + def test_sac(self): + check_support("SAC", {}, self.stats, check_bounds=True) + + # def testAll(self): + + # num_unexpected_errors = 0 + # for (alg, a_name, o_name), stat in sorted(self.stats.items()): + # if stat not in ["ok", "unsupported", "skip"]: + # num_unexpected_errors += 1 + # print(alg, "action_space", a_name, "obs_space", o_name, "result", + # stat) + # self.assertEqual(num_unexpected_errors, 0) + + def test_a3c_multiagent(self): + check_support_multiagent("A3C", { + "num_workers": 1, + "optimizer": { + "grads_per_step": 1 + } + }) + + def test_apex_multiagent(self): check_support_multiagent( "APEX", { "num_workers": 2, @@ -209,6 +248,8 @@ class ModelSupportedSpaces(unittest.TestCase): "learning_starts": 1000, "target_network_update_freq": 100, }) + + def test_apex_ddpg_multiagent(self): check_support_multiagent( "APEX_DDPG", { "num_workers": 2, @@ -219,14 +260,23 @@ class ModelSupportedSpaces(unittest.TestCase): "target_network_update_freq": 100, "use_state_preprocessor": True, }) - check_support_multiagent("IMPALA", {"num_gpus": 0}) - check_support_multiagent("DQN", {"timesteps_per_iteration": 1}) - check_support_multiagent("A3C", { - "num_workers": 1, - "optimizer": { - "grads_per_step": 1 - } + + def test_ddpg_multiagent(self): + check_support_multiagent("DDPG", { + "timesteps_per_iteration": 1, + "use_state_preprocessor": True, }) + + def test_dqn_multiagent(self): + check_support_multiagent("DQN", {"timesteps_per_iteration": 1}) + + def test_impala_multiagent(self): + check_support_multiagent("IMPALA", {"num_gpus": 0}) + + def test_pg_multiagent(self): + check_support_multiagent("PG", {"num_workers": 1, "optimizer": {}}) + + def test_ppo_multiagent(self): check_support_multiagent( "PPO", { "num_workers": 1, @@ -235,11 +285,6 @@ class ModelSupportedSpaces(unittest.TestCase): "sample_batch_size": 10, "sgd_minibatch_size": 1, }) - check_support_multiagent("PG", {"num_workers": 1, "optimizer": {}}) - check_support_multiagent("DDPG", { - "timesteps_per_iteration": 1, - "use_state_preprocessor": True, - }) if __name__ == "__main__": diff --git a/rllib/utils/deprecation.py b/rllib/utils/deprecation.py index 252a72a04..8f3828b6a 100644 --- a/rllib/utils/deprecation.py +++ b/rllib/utils/deprecation.py @@ -2,6 +2,11 @@ import logging logger = logging.getLogger(__name__) +# A constant to use for any configuration that should be deprecated +# (to check, whether this config has actually been assigned a proper value or +# not). +DEPRECATED_VALUE = -1 + def deprecation_warning(old, new=None, error=None): """ diff --git a/rllib/utils/exploration/__init__.py b/rllib/utils/exploration/__init__.py index 5b89b9ae2..c9222bf40 100644 --- a/rllib/utils/exploration/__init__.py +++ b/rllib/utils/exploration/__init__.py @@ -2,5 +2,16 @@ from ray.rllib.utils.exploration.exploration import Exploration from ray.rllib.utils.exploration.epsilon_greedy import EpsilonGreedy from ray.rllib.utils.exploration.per_worker_epsilon_greedy import \ PerWorkerEpsilonGreedy +from ray.rllib.utils.exploration.random import Random +from ray.rllib.utils.exploration.soft_q import SoftQ +from ray.rllib.utils.exploration.stochastic_sampling import \ + StochasticSampling -__all__ = ["Exploration", "EpsilonGreedy", "PerWorkerEpsilonGreedy"] +__all__ = [ + "Exploration", + "EpsilonGreedy", + "PerWorkerEpsilonGreedy", + "Random", + "SoftQ", + "StochasticSampling", +] diff --git a/rllib/utils/exploration/epsilon_greedy.py b/rllib/utils/exploration/epsilon_greedy.py index 1bef8f3aa..d6f088c95 100644 --- a/rllib/utils/exploration/epsilon_greedy.py +++ b/rllib/utils/exploration/epsilon_greedy.py @@ -1,4 +1,3 @@ -import gym import numpy as np from ray.rllib.utils.annotations import override @@ -45,7 +44,6 @@ class EpsilonGreedy(Exploration): """ # For now, require Discrete action space (may loosen this restriction # in the future). - assert isinstance(action_space, gym.spaces.Discrete) assert framework is not None super().__init__( action_space=action_space, @@ -65,32 +63,24 @@ class EpsilonGreedy(Exploration): @override(Exploration) def get_exploration_action(self, - action, - model=None, - action_dist=None, + model_output, + model, + action_dist_class, explore=True, timestep=None): - # TODO(sven): This is hardcoded. Put a meaningful error, in case model - # API is not as required. - if not hasattr(model, "q_value_head"): - return action - q_values = model.q_value_head(model.last_output()) - if isinstance(q_values, list): - q_values = q_values[0] - if self.framework == "tf": - return self._get_tf_exploration_action_op(action, explore, - timestep, q_values) - else: - return self._get_torch_exploration_action(action, explore, - timestep, q_values) - def _get_tf_exploration_action_op(self, action, explore, timestep, - q_values): + if self.framework == "tf": + return self._get_tf_exploration_action_op(model_output, explore, + timestep) + else: + return self._get_torch_exploration_action(model_output, explore, + timestep) + + def _get_tf_exploration_action_op(self, model_output, explore, timestep): """Tf method to produce the tf op for an epsilon exploration action. Args: - action (tf.Tensor): The already sampled action (non-exploratory - case) as tf op. + model_output (tf.Tensor): Returns: tf.Tensor: The tf exploration-action op. @@ -99,13 +89,16 @@ class EpsilonGreedy(Exploration): self.epsilon_schedule(timestep if timestep is not None else self.last_timestep)) - batch_size = tf.shape(action)[0] + # Get the exploit action as the one with the highest logit value. + exploit_action = tf.argmax(model_output, axis=1) - # Maske out actions with q-value=-inf so that we don't + batch_size = tf.shape(model_output)[0] + # Mask out actions with q-value=-inf so that we don't # even consider them for exploration. random_valid_action_logits = tf.where( - tf.equal(q_values, tf.float32.min), - tf.ones_like(q_values) * tf.float32.min, tf.ones_like(q_values)) + tf.equal(model_output, tf.float32.min), + tf.ones_like(model_output) * tf.float32.min, + tf.ones_like(model_output)) random_actions = tf.squeeze( tf.multinomial(random_valid_action_logits, 1), axis=1) @@ -114,26 +107,26 @@ class EpsilonGreedy(Exploration): minval=0, maxval=1, dtype=epsilon.dtype) \ < epsilon - exploration_action = tf.cond( + action = tf.cond( pred=tf.constant(explore, dtype=tf.bool) if isinstance(explore, bool) else explore, - true_fn=lambda: tf.where(chose_random, random_actions, action), - false_fn=lambda: action, - ) + true_fn=( + lambda: tf.where(chose_random, random_actions, exploit_action) + ), + false_fn=lambda: exploit_action) + # Increment `last_timestep` by 1 (or set to `timestep`). assign_op = \ tf.assign_add(self.last_timestep, 1) if timestep is None else \ tf.assign(self.last_timestep, timestep) with tf.control_dependencies([assign_op]): - return exploration_action + return action, tf.zeros_like(action, dtype=tf.float32) - def _get_torch_exploration_action(self, action, explore, timestep, - q_values): + def _get_torch_exploration_action(self, model_output, explore, timestep): """Torch method to produce an epsilon exploration action. Args: - action (torch.Tensor): The already sampled action (non-exploratory - case). + model_output (torch.Tensor): Returns: torch.Tensor: The exploration-action. @@ -142,28 +135,32 @@ class EpsilonGreedy(Exploration): self.last_timestep = timestep if timestep is not None else \ self.last_timestep + 1 + _, exploit_action = torch.max(model_output, 1) + action_logp = torch.zeros_like(exploit_action) + + # Explore. if explore: # Get the current epsilon. epsilon = self.epsilon_schedule(self.last_timestep) - - batch_size = q_values.size()[0] + batch_size = model_output.size()[0] # Mask out actions, whose Q-values are -inf, so that we don't # even consider them for exploration. random_valid_action_logits = torch.where( - q_values == float("-inf"), - torch.ones_like(q_values) * float("-inf"), - torch.ones_like(q_values)) - + model_output == float("-inf"), + torch.ones_like(model_output) * float("-inf"), + torch.ones_like(model_output)) + # A random action. random_actions = torch.squeeze( torch.multinomial(random_valid_action_logits, 1), axis=1) - - return torch.where( + # Pick either random or greedy. + action = torch.where( torch.empty((batch_size, )).uniform_() < epsilon, - random_actions, action) + random_actions, exploit_action) - # Return the original action. + return action, action_logp + # Return the deterministic "sample" (argmax) over the logits. else: - return action + return exploit_action, action_logp @override(Exploration) def get_info(self): diff --git a/rllib/utils/exploration/exploration.py b/rllib/utils/exploration/exploration.py index a0823c65a..ed163ee7e 100644 --- a/rllib/utils/exploration/exploration.py +++ b/rllib/utils/exploration/exploration.py @@ -1,12 +1,14 @@ -from ray.rllib.utils.framework import check_framework +from ray.rllib.utils.framework import check_framework, try_import_tf + +tf = try_import_tf() class Exploration: - """Implements an env-exploration strategy for Policies. + """Implements an exploration strategy for Policies. - An Exploration takes the predicted actions or action values from the agent, - and selects the action to actually apply to the environment using some - predefined exploration schema. + An Exploration takes model outputs, a distribution, and a timestep from + the agent and computes an action to apply to the environment using an + implemented exploration schema. """ def __init__(self, @@ -29,23 +31,24 @@ class Exploration: self.framework = check_framework(framework) def get_exploration_action(self, - action, - model=None, - action_dist=None, + model_output, + model, + action_dist_class, explore=True, timestep=None): - """Returns an action for exploration purposes. + """Returns a (possibly) exploratory action. - Given the Model's output and action distribution, returns an - exploration action (as opposed to the original model calculated - action). + Given the Model's logits outputs and action distribution, returns an + exploratory action. Args: - action (any): The already sampled action (non-exploratory case). + model_output (any): The raw output coming from the model + (e.g. q-values or PG-logits). model (ModelV2): The Model object. - action_dist: The ActionDistribution class. - explore (bool): Whether to explore or not (this could be a tf - placeholder). + action_dist_class: The ActionDistribution class. + explore (bool): True: "Normal" exploration behavior. + False: Suppress all exploratory behavior and return + a deterministic action. timestep (int): The current sampling time step. If None, the component should try to use an internal counter, which it then increments by 1. If provided, will set the internal @@ -85,7 +88,8 @@ class Exploration: Returns: any: A description of the Exploration (not necessarily its state). """ - return None + if self.framework == "tf": + return tf.no_op() def get_state(self): """Returns the current exploration state. diff --git a/rllib/utils/exploration/random.py b/rllib/utils/exploration/random.py new file mode 100644 index 000000000..c4342df78 --- /dev/null +++ b/rllib/utils/exploration/random.py @@ -0,0 +1,74 @@ +from gym.spaces import Discrete + +from ray.rllib.utils.annotations import override +from ray.rllib.utils.exploration.exploration import Exploration +from ray.rllib.utils.framework import try_import_tf, try_import_torch, \ + tf_function +from ray.rllib.utils.tuple_actions import TupleActions + +tf = try_import_tf() +torch, _ = try_import_torch() + + +class Random(Exploration): + """A random action selector (deterministic/greedy for explore=False). + + If explore=True, returns actions randomly from `self.action_space` (via + Space.sample()). + If explore=False, returns the greedy/max-likelihood action. + """ + + def __init__(self, action_space, framework="tf", **kwargs): + """Initialize a Random Exploration object. + + Args: + action_space (Space): The gym action space used by the environment. + framework (Optional[str]): One of None, "tf", "torch". + """ + assert isinstance(action_space, Discrete) + super().__init__( + action_space=action_space, framework=framework, **kwargs) + + @override(Exploration) + def get_exploration_action(self, + model_output, + model, + action_dist_class, + explore=True, + timestep=None): + # Instantiate the distribution object. + action_dist = action_dist_class(model_output, model) + + if self.framework == "tf": + return self._get_tf_exploration_action_op(action_dist, explore, + timestep) + else: + return self._get_torch_exploration_action(action_dist, explore, + timestep) + + @tf_function(tf) + def _get_tf_exploration_action_op(self, action_dist, explore, timestep): + if explore: + action = self.action_space.sample() + # Will be unnecessary, once we support batch/time-aware Spaces. + action = tf.expand_dims(tf.cast(action, dtype=tf.int32), 0) + else: + action = tf.cast( + action_dist.deterministic_sample(), dtype=tf.int32) + # TODO(sven): Move into (deterministic_)sample(logp=True|False) + if isinstance(action, TupleActions): + batch_size = tf.shape(action[0][0])[0] + else: + batch_size = tf.shape(action)[0] + logp = tf.zeros(shape=(batch_size, ), dtype=tf.float32) + return action, logp + + def _get_torch_exploration_action(self, action_dist, explore, timestep): + if explore: + # Unsqueeze will be unnecessary, once we support batch/time-aware + # Spaces. + action = torch.IntTensor(self.action_space.sample()).unsqueeze(0) + else: + action = torch.IntTensor(action_dist.deterministic_sample()) + logp = torch.zeros((action.size()[0], ), dtype=torch.float32) + return action, logp diff --git a/rllib/utils/exploration/soft_q.py b/rllib/utils/exploration/soft_q.py new file mode 100644 index 000000000..c89862e8f --- /dev/null +++ b/rllib/utils/exploration/soft_q.py @@ -0,0 +1,29 @@ +from gym.spaces import Discrete + +from ray.rllib.utils.exploration.stochastic_sampling import StochasticSampling + + +class SoftQ(StochasticSampling): + """Special case of StochasticSampling w/ Categorical and temperature param. + + Returns a stochastic sample from a Categorical parameterized by the model + output divided by the temperature. Returns the argmax iff explore=False. + """ + + def __init__(self, action_space, temperature=1.0, framework="tf", + **kwargs): + """Initializes a SoftQ Exploration object. + + Args: + action_space (Space): The gym action space used by the environment. + temperature (Schedule): The temperature to divide model outputs by + before creating the Categorical distribution to sample from. + framework (Optional[str]): One of None, "tf", "torch". + kwargs (dict): Passed on to super constructor. + """ + assert isinstance(action_space, Discrete) + super().__init__( + action_space=action_space, + static_params=dict(temperature=temperature), + framework=framework, + **kwargs) diff --git a/rllib/utils/exploration/stochastic_sampling.py b/rllib/utils/exploration/stochastic_sampling.py new file mode 100644 index 000000000..23f00db97 --- /dev/null +++ b/rllib/utils/exploration/stochastic_sampling.py @@ -0,0 +1,101 @@ +from ray.rllib.models.catalog import ModelCatalog +from ray.rllib.utils.annotations import override +from ray.rllib.utils.exploration.exploration import Exploration +from ray.rllib.utils.framework import try_import_tf, try_import_torch, \ + tf_function +from ray.rllib.utils.tuple_actions import TupleActions + +tf = try_import_tf() +torch, _ = try_import_torch() + + +class StochasticSampling(Exploration): + """An exploration that simply samples from a distribution. + + The sampling can be made deterministic by passing explore=False into + the call to `get_exploration_action`. + Also allows for scheduled parameters for the distributions, such as + lowering stddev, temperature, etc.. over time. + """ + + def __init__(self, + action_space, + framework="tf", + static_params=None, + time_dependent_params=None, + **kwargs): + """Initializes a StochasticSampling Exploration object. + + Args: + action_space (Space): The gym action space used by the environment. + framework (Optional[str]): One of None, "tf", "torch". + static_params (Optional[dict]): Parameters to be passed as-is into + the action distribution class' constructor. + time_dependent_params (dict): Parameters to be evaluated based on + `timestep` and then passed into the action distribution + class' constructor. + """ + assert framework is not None + super().__init__( + action_space=action_space, framework=framework, **kwargs) + + self.static_params = static_params or {} + + # TODO(sven): Support scheduled params whose values depend on timestep + # and that will be passed into the distribution's c'tor. + self.time_dependent_params = time_dependent_params or {} + + @override(Exploration) + def get_exploration_action(self, + model_output, + model, + action_dist_class, + explore=True, + timestep=None): + kwargs = self.static_params.copy() + + # TODO(sven): create schedules for these via easy-config patterns + # These can be used anywhere in configs, where schedules are wanted: + # e.g. lr=[0.003, 0.00001, 100k] <- linear anneal from 0.003, to + # 0.00001 over 100k ts. + # if self.time_dependent_params: + # for k, v in self.time_dependent_params: + # kwargs[k] = v(timestep) + constructor, _ = ModelCatalog.get_action_dist( + self.action_space, + None, + action_dist_class, + framework=self.framework) + action_dist = constructor(model_output, model, **kwargs) + + if self.framework == "torch": + return self._get_torch_exploration_action(action_dist, explore) + else: + return self._get_tf_exploration_action_op(action_dist, explore) + + @staticmethod + @tf_function(tf) + def _get_tf_exploration_action_op(action_dist, explore): + if explore: + action = action_dist.sample() + # TODO(sven): Change `sample` to accept `sample(logp=True|False)` + logp = action_dist.sampled_action_logp() + else: + action = action_dist.deterministic_sample() + # TODO(sven): Move into (deterministic_)sample(logp=True|False) + if isinstance(action, TupleActions): + batch_size = tf.shape(action[0][0])[0] + else: + batch_size = tf.shape(action)[0] + logp = tf.zeros(shape=(batch_size, ), dtype=tf.float32) + return action, logp + + @staticmethod + def _get_torch_exploration_action(action_dist, explore): + if explore: + action = action_dist.sample() + logp = action_dist.sampled_action_logp() + else: + action = action_dist.deterministic_sample() + logp = torch.zeros((action.size()[0], ), dtype=torch.float32) + return action, logp diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index 89f9bc56f..bd94f0fc5 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -57,6 +57,22 @@ def try_import_tf(error=False): return None +def tf_function(tf_module): + """Conditional decorator for @tf.function. + + Use @tf_function(tf) instead to avoid errors if tf is not installed.""" + + # The actual decorator to use (pass in `tf` (which could be None)). + def decorator(func): + # If tf not installed -> return function as is (won't be used anyways). + if tf_module is None: + return func + # If tf installed, return @tf.function-decorated function. + return tf_module.function(func) + + return decorator + + def try_import_tfp(error=False): """ Args: diff --git a/rllib/utils/schedules/tests/test_schedules.py b/rllib/utils/schedules/tests/test_schedules.py index e4e2a3ecf..b5bd7db17 100644 --- a/rllib/utils/schedules/tests/test_schedules.py +++ b/rllib/utils/schedules/tests/test_schedules.py @@ -111,3 +111,8 @@ class TestSchedules(unittest.TestCase): for t, e in zip(ts, expected): out = piecewise(t) check(out, e, decimals=4) + + +if __name__ == "__main__": + import unittest + unittest.main(verbosity=1) diff --git a/rllib/utils/sgd.py b/rllib/utils/sgd.py index 04ce2b662..5c23c9979 100644 --- a/rllib/utils/sgd.py +++ b/rllib/utils/sgd.py @@ -109,7 +109,7 @@ def do_minibatch_sgd(samples, policies, local_worker, num_sgd_iter, MultiAgentBatch({ policy_id: minibatch }, minibatch.count)))[policy_id] - for k, v in batch_fetches[LEARNER_STATS_KEY].items(): + for k, v in batch_fetches.get(LEARNER_STATS_KEY, {}).items(): iter_extra_fetches[k].append(v) logger.debug("{} {}".format(i, averaged(iter_extra_fetches))) fetches[policy_id] = averaged(iter_extra_fetches) diff --git a/rllib/utils/test_utils.py b/rllib/utils/test_utils.py index 7ea734b4f..0afaac2c9 100644 --- a/rllib/utils/test_utils.py +++ b/rllib/utils/test_utils.py @@ -1,8 +1,9 @@ import numpy as np -from ray.rllib.utils.framework import try_import_tf +from ray.rllib.utils.framework import try_import_tf, try_import_torch tf = try_import_tf() +torch, _ = try_import_torch() def check(x, y, decimals=5, atol=None, rtol=None, false=False): @@ -84,7 +85,7 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False): except AssertionError as e: if false is False: raise e - # Everything else (assume numeric). + # Everything else (assume numeric or tf/torch.Tensor). else: if tf is not None: # y should never be a Tensor (y=expected value). @@ -95,7 +96,7 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False): # In eager mode, numpyize tensors. if tf.executing_eagerly(): x = x.numpy() - # Otherwise, ??? + # Otherwise, use a quick tf-session. else: with tf.Session() as sess: x = sess.run(x) @@ -106,6 +107,16 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False): atol=atol, rtol=rtol, false=false) + if torch is not None: + # y should never be a Tensor (y=expected value). + if isinstance(y, torch.Tensor): + raise ValueError("`y` (expected value) must not be a Tensor. " + "Use numpy.ndarray instead") + if isinstance(x, torch.Tensor): + try: + x = x.numpy() + except RuntimeError: + x = x.detach().numpy() # Using decimals. if atol is None and rtol is None: diff --git a/rllib/utils/tests/test_framework_agnostic_components.py b/rllib/utils/tests/test_framework_agnostic_components.py index 25252c52f..82fa4a66e 100644 --- a/rllib/utils/tests/test_framework_agnostic_components.py +++ b/rllib/utils/tests/test_framework_agnostic_components.py @@ -1,5 +1,7 @@ from abc import ABCMeta, abstractmethod from gym.spaces import Discrete +import numpy as np +from pathlib import Path import unittest from ray.rllib.utils.exploration.exploration import Exploration @@ -23,6 +25,11 @@ class TestFrameWorkAgnosticComponents(unittest.TestCase): # Switch on eager for testing purposes. tf.enable_eager_execution() + # Bazel makes it hard to find files specified in `args` (and `data`). + # Use the true absolute path. + script_dir = Path(__file__).parent + abs_path = script_dir.absolute() + # Try to create from an abstract class w/o default constructor. # Expect None. test = from_config({ @@ -37,16 +44,18 @@ class TestFrameWorkAgnosticComponents(unittest.TestCase): check(component.prop_d, "non_default") # Create a tf Component from json file. - component = from_config("dummy_config.json") + config_file = str(abs_path.joinpath("dummy_config.json")) + component = from_config(config_file) check(component.prop_c, "default") check(component.prop_d, 4) # default check(component.add(3.3).numpy(), 5.3) # prop_b == 2.0 # Create a torch Component from yaml file. - component = from_config("dummy_config.yml") + config_file = str(abs_path.joinpath("dummy_config.yml")) + component = from_config(config_file) check(component.prop_a, "something else") check(component.prop_d, 3) - check(component.add(1.2), torch.Tensor([2.2])) # prop_b == 1.0 + check(component.add(1.2), np.array([2.2])) # prop_b == 1.0 # Create tf Component from json-string (e.g. on command line). component = from_config( @@ -79,7 +88,7 @@ class TestFrameWorkAgnosticComponents(unittest.TestCase): "prop_a: B\nprop_b: -1.5\nprop_c: non-default\nframework: torch") check(component.prop_a, "B") check(component.prop_d, 4) # default - check(component.add(-5.1), torch.Tensor([-6.6])) # prop_b == -1.5 + check(component.add(-5.1), np.array([-6.6])) # prop_b == -1.5 class DummyComponent: @@ -122,3 +131,8 @@ class AbstractDummyComponent(DummyComponent, metaclass=ABCMeta): @abstractmethod def some_abstract_method(self): raise NotImplementedError + + +if __name__ == "__main__": + import unittest + unittest.main(verbosity=1) diff --git a/rllib/utils/tuple_actions.py b/rllib/utils/tuple_actions.py new file mode 100644 index 000000000..c2d9f299c --- /dev/null +++ b/rllib/utils/tuple_actions.py @@ -0,0 +1,11 @@ +from collections import namedtuple + + +class TupleActions(namedtuple("TupleActions", ["batches"])): + """Used to return tuple actions as a list of batches per tuple element.""" + + def __new__(cls, batches): + return super(TupleActions, cls).__new__(cls, batches) + + def numpy(self): + return TupleActions([b.numpy() for b in self.batches])