From 6e1c3ea824e87c9ba9d6f137ffb41c3070ff480a Mon Sep 17 00:00:00 2001 From: Sven Mika Date: Tue, 11 Feb 2020 00:22:07 +0100 Subject: [PATCH] [RLlib] Exploration API (+EpsilonGreedy sub-class). (#6974) --- ci/jenkins_tests/run_rllib_tests.sh | 6 +- python/ray/tune/logger.py | 2 +- python/ray/tune/utils/util.py | 2 +- rllib/agents/ddpg/ddpg.py | 21 +- rllib/agents/ddpg/ddpg_policy.py | 4 +- rllib/agents/dqn/apex.py | 2 +- rllib/agents/dqn/dqn.py | 229 +++++++++--------- rllib/agents/dqn/dqn_policy.py | 72 ++---- rllib/agents/dqn/simple_q_policy.py | 58 +---- rllib/agents/dqn/tests/test_dqn.py | 22 ++ rllib/agents/pg/tests/__init__.py | 0 rllib/agents/pg/tests/test_pg.py | 46 ++-- rllib/agents/ppo/tests/test_ppo.py | 2 +- rllib/agents/sac/sac.py | 14 +- rllib/agents/sac/sac_policy.py | 5 +- rllib/agents/trainer.py | 101 +++++--- rllib/agents/trainer_template.py | 4 +- rllib/evaluation/rollout_worker.py | 6 + rllib/evaluation/sampler.py | 28 ++- rllib/evaluation/worker_set.py | 52 +++- rllib/examples/multiagent_two_trainers.py | 6 +- rllib/models/catalog.py | 31 +-- rllib/models/tf/tf_action_dist.py | 12 +- rllib/offline/off_policy_estimator.py | 3 +- rllib/optimizers/sync_replay_optimizer.py | 43 ++-- rllib/policy/dynamic_tf_policy.py | 8 +- rllib/policy/eager_tf_policy.py | 31 ++- rllib/policy/policy.py | 88 ++++++- rllib/policy/tests/test_policy.py | 3 +- rllib/policy/tf_policy.py | 95 ++++++-- rllib/policy/torch_policy.py | 23 +- rllib/tests/test_catalog.py | 4 +- rllib/tests/test_dependency.py | 11 +- rllib/tests/test_external_env.py | 5 +- rllib/tests/test_multi_agent_env.py | 5 +- rllib/tests/test_rollout_worker.py | 4 + rllib/tuned_examples/atari-apex.yaml | 9 +- rllib/tuned_examples/atari-dist-dqn.yaml | 8 +- rllib/tuned_examples/atari-dqn.yaml | 8 +- rllib/tuned_examples/atari-duel-ddqn.yaml | 8 +- .../compact-regression-test.yaml | 16 +- rllib/tuned_examples/pong-dqn.yaml | 6 +- rllib/tuned_examples/pong-rainbow.yaml | 8 +- rllib/utils/__init__.py | 3 +- 
rllib/utils/deprecation.py | 26 +- rllib/utils/exploration/__init__.py | 6 + rllib/utils/exploration/epsilon_greedy.py | 199 +++++++++++++++ rllib/utils/exploration/exploration.py | 132 ++++++++++ .../exploration/per_worker_epsilon_greedy.py | 52 ++++ rllib/utils/framework.py | 32 ++- rllib/utils/from_config.py | 52 ++-- rllib/utils/schedules/piecewise_schedule.py | 4 +- rllib/utils/schedules/schedule.py | 3 - rllib/utils/schedules/tests/test_schedules.py | 2 +- .../test_framework_agnostic_components.py | 32 ++- rllib/utils/torch_ops.py | 5 +- 56 files changed, 1163 insertions(+), 496 deletions(-) create mode 100644 rllib/agents/dqn/tests/test_dqn.py delete mode 100644 rllib/agents/pg/tests/__init__.py create mode 100644 rllib/utils/exploration/__init__.py create mode 100644 rllib/utils/exploration/epsilon_greedy.py create mode 100644 rllib/utils/exploration/exploration.py create mode 100644 rllib/utils/exploration/per_worker_epsilon_greedy.py diff --git a/ci/jenkins_tests/run_rllib_tests.sh b/ci/jenkins_tests/run_rllib_tests.sh index 45302c7b7..54aaf777b 100755 --- a/ci/jenkins_tests/run_rllib_tests.sh +++ b/ci/jenkins_tests/run_rllib_tests.sh @@ -111,7 +111,7 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 1}' \ - --config '{"lr": 1e-3, "schedule_max_timesteps": 100000, "exploration_fraction": 0.1, "exploration_final_eps": 0.02, "dueling": false, "hiddens": [], "model": {"fcnet_hiddens": [64], "fcnet_activation": "relu"}}' + --config '{"lr": 1e-3, "exploration": {"epsilon_timesteps": 10000, "final_epsilon": 0.02}, "dueling": false, "hiddens": [], "model": {"fcnet_hiddens": [64], "fcnet_activation": "relu"}}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output /ray/rllib/train.py \ @@ -145,7 +145,7 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ --env PongDeterministic-v4 \ --run DQN \ 
--stop '{"training_iteration": 1}' \ - --config '{"lr": 1e-4, "schedule_max_timesteps": 2000000, "buffer_size": 10000, "exploration_fraction": 0.1, "exploration_final_eps": 0.01, "sample_batch_size": 4, "learning_starts": 10000, "target_network_update_freq": 1000, "gamma": 0.99, "prioritized_replay": true}' + --config '{"lr": 1e-4, "exploration": {"epsilon_timesteps": 200000, "final_epsilon": 0.01}, "buffer_size": 10000, "sample_batch_size": 4, "learning_starts": 10000, "target_network_update_freq": 1000, "gamma": 0.99, "prioritized_replay": true}' docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output /ray/rllib/train.py \ @@ -298,7 +298,7 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/rllib/tests/test_local.py - + docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/rllib/tests/test_reproducibility.py diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index f41b133bb..5c9ddd1a4 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -214,7 +214,7 @@ class TBXLogger(Logger): full_attr, value, global_step=step) # In case TensorboardX still doesn't think it's a valid value # (e.g. `[[]]`), warn and move on. 
- except ValueError: + except (ValueError, TypeError): logger.warning( "You are trying to log an invalid value ({}={}) " "via {}!".format(full_attr, value, diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index 78147b1ec..603049cd4 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -164,7 +164,7 @@ def deep_update(original, new_dict, new_keys_allowed, whitelist): if k not in original: if not new_keys_allowed: raise Exception("Unknown config parameter `{}` ".format(k)) - if isinstance(original.get(k), dict): + if isinstance(original.get(k), dict) and isinstance(value, dict): if k in whitelist: deep_update(original[k], value, True, []) else: diff --git a/rllib/agents/ddpg/ddpg.py b/rllib/agents/ddpg/ddpg.py index 89ca5f40f..c30a88c54 100644 --- a/rllib/agents/ddpg/ddpg.py +++ b/rllib/agents/ddpg/ddpg.py @@ -1,6 +1,5 @@ from ray.rllib.agents.trainer import with_common_config -from ray.rllib.agents.dqn.dqn import GenericOffPolicyTrainer, \ - update_worker_explorations +from ray.rllib.agents.dqn.dqn import GenericOffPolicyTrainer from ray.rllib.agents.ddpg.ddpg_policy import DDPGTFPolicy from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule @@ -109,9 +108,8 @@ DEFAULT_CONFIG = with_common_config({ "prioritized_replay_alpha": 0.6, # Beta parameter for sampling from prioritized replay buffer. "prioritized_replay_beta": 0.4, - # Fraction of entire training period over which the beta parameter is - # annealed - "beta_annealing_fraction": 0.2, + # Time steps over which the beta parameter is annealed. + "prioritized_replay_beta_annealing_timesteps": 20000, # Final value of beta "final_prioritized_replay_beta": 0.4, # Epsilon to add to the TD errors when updating priorities. 
@@ -196,6 +194,19 @@ def setup_ddpg_exploration(trainer): ] +def update_worker_explorations(trainer): + global_timestep = trainer.optimizer.num_steps_sampled + exp_vals = [trainer.exploration0.value(global_timestep)] + trainer.workers.local_worker().foreach_trainable_policy( + lambda p, _: p.set_epsilon(exp_vals[0])) + for i, e in enumerate(trainer.workers.remote_workers()): + exp_val = trainer.explorations[i].value(global_timestep) + e.foreach_trainable_policy.remote(lambda p, _: p.set_epsilon(exp_val)) + exp_vals.append(exp_val) + trainer.train_start_timestep = global_timestep + trainer.exploration_infos = exp_vals + + def add_pure_exploration_phase(trainer): global_timestep = trainer.optimizer.num_steps_sampled pure_expl_steps = trainer.config["pure_exploration_steps"] diff --git a/rllib/agents/ddpg/ddpg_policy.py b/rllib/agents/ddpg/ddpg_policy.py index 6d23e8099..f226e013e 100644 --- a/rllib/agents/ddpg/ddpg_policy.py +++ b/rllib/agents/ddpg/ddpg_policy.py @@ -3,7 +3,7 @@ import numpy as np import ray import ray.experimental.tf_utils -from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn +from ray.rllib.agents.dqn.dqn_policy import postprocess_nstep_and_prio from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY from ray.rllib.models import ModelCatalog @@ -64,7 +64,7 @@ class DDPGPostprocessing: self.parameter_noise_sigma.load( self.parameter_noise_sigma_val, session=self.sess) - return _postprocess_dqn(self, sample_batch) + return postprocess_nstep_and_prio(self, sample_batch) class DDPGTFPolicy(DDPGPostprocessing, TFPolicy): diff --git a/rllib/agents/dqn/apex.py b/rllib/agents/dqn/apex.py index 61774436a..ac2c91c21 100644 --- a/rllib/agents/dqn/apex.py +++ b/rllib/agents/dqn/apex.py @@ -22,7 +22,7 @@ APEX_DEFAULT_CONFIG = merge_dicts( "sample_batch_size": 50, "target_network_update_freq": 500000, "timesteps_per_iteration": 25000, - "per_worker_exploration": True, + "exploration": {"type": 
"PerWorkerEpsilonGreedy"}, "worker_side_prioritization": True, "min_iter_time_s": 30, }, diff --git a/rllib/agents/dqn/dqn.py b/rllib/agents/dqn/dqn.py index 1709e1ee2..188447ffd 100644 --- a/rllib/agents/dqn/dqn.py +++ b/rllib/agents/dqn/dqn.py @@ -6,7 +6,8 @@ from ray.rllib.agents.dqn.dqn_policy import DQNTFPolicy from ray.rllib.agents.dqn.simple_q_policy import SimpleQPolicy from ray.rllib.optimizers import SyncReplayOptimizer from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID -from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule +from ray.rllib.utils.deprecation import deprecation_warning +from ray.rllib.utils.exploration import PerWorkerEpsilonGreedy logger = logging.getLogger(__name__) @@ -34,21 +35,27 @@ DEFAULT_CONFIG = with_common_config({ # N-step Q learning "n_step": 1, - # === Exploration === - # Max num timesteps for annealing schedules. Exploration is annealed from - # 1.0 to exploration_fraction over this number of timesteps scaled by - # exploration_fraction - "schedule_max_timesteps": 100000, + # === Exploration Settings (Experimental) === + "exploration": { + # The Exploration class to use. In the simplest case, this is the name + # (str) of any class present in the `rllib.utils.exploration` package. + # You can also provide the python class directly or the full location + # of your class (e.g. "ray.rllib.utils.exploration.epsilon_greedy. + # EpsilonGreedy"). + "type": "EpsilonGreedy", + # Config for the Exploration class' constructor: + "initial_epsilon": 1.0, + "final_epsilon": 0.02, + "epsilon_timesteps": 10000, # Timesteps over which to anneal epsilon. + }, + # TODO(sven): Make Exploration class for parameter noise. + # If True parameter space noise will be used for exploration + # See https://blog.openai.com/better-exploration-with-parameter-noise/ + "parameter_noise": False, + # Minimum env steps to optimize for per train call. This value does # not affect learning, only the length of iterations. 
"timesteps_per_iteration": 1000, - # Fraction of entire training period over which the exploration rate is - # annealed - "exploration_fraction": 0.1, - # Initial value of random action probability. - "exploration_initial_eps": 1.0, - # Final value of random action probability. - "exploration_final_eps": 0.02, # Update the target network every `target_network_update_freq` steps. "target_network_update_freq": 500, # Use softmax for sampling actions. Required for off policy estimation. @@ -56,15 +63,6 @@ DEFAULT_CONFIG = with_common_config({ # Softmax temperature. Q values are divided by this value prior to softmax. # Softmax approaches argmax as the temperature drops to zero. "softmax_temp": 1.0, - # If True parameter space noise will be used for exploration - # See https://blog.openai.com/better-exploration-with-parameter-noise/ - "parameter_noise": False, - # Extra configuration that disables exploration. - "evaluation_config": { - "exploration_fraction": 0, - "exploration_final_eps": 0, - }, - # === Replay buffer === # Size of the replay buffer. Note that if async_updates is set, then # each worker will have a replay buffer of this size. @@ -75,11 +73,10 @@ DEFAULT_CONFIG = with_common_config({ "prioritized_replay_alpha": 0.6, # Beta parameter for sampling from prioritized replay buffer. "prioritized_replay_beta": 0.4, - # Fraction of entire training period over which the beta parameter is - # annealed - "beta_annealing_fraction": 0.2, - # Final value of beta + # Final value of beta (by default, we use constant beta=0.4). "final_prioritized_replay_beta": 0.4, + # Time steps over which the beta parameter is annealed. + "prioritized_replay_beta_annealing_timesteps": 20000, # Epsilon to add to the TD errors when updating priorities. 
"prioritized_replay_eps": 1e-6, # Whether to LZ4 compress observations @@ -109,41 +106,51 @@ DEFAULT_CONFIG = with_common_config({ # to increase if your environment is particularly slow to sample, or if # you"re using the Async or Ape-X optimizers. "num_workers": 0, - # Whether to use a distribution of epsilons across workers for exploration. - "per_worker_exploration": False, # Whether to compute priorities on workers. "worker_side_prioritization": False, # Prevent iterations from going lower than this time span "min_iter_time_s": 1, + + # DEPRECATED VALUES (set to -1 to indicate they have not been overwritten + # by user's config). If we don't set them here, we will get an error + # from the config-key checker. + "schedule_max_timesteps": -1, + "exploration_final_eps": -1, + "exploration_fraction": -1, + "beta_annealing_fraction": -1, + "per_worker_exploration": -1, }) # __sphinx_doc_end__ # yapf: enable -def make_optimizer(workers, config): +def make_policy_optimizer(workers, config): + """Create the single process DQN policy optimizer. + + Returns: + SyncReplayOptimizer: Used for generic off-policy Trainers. + """ return SyncReplayOptimizer( workers, + # TODO(sven): Move all PR-beta decays into Schedule components. 
learning_starts=config["learning_starts"], buffer_size=config["buffer_size"], prioritized_replay=config["prioritized_replay"], prioritized_replay_alpha=config["prioritized_replay_alpha"], prioritized_replay_beta=config["prioritized_replay_beta"], - schedule_max_timesteps=config["schedule_max_timesteps"], - beta_annealing_fraction=config["beta_annealing_fraction"], + prioritized_replay_beta_annealing_timesteps=config[ + "prioritized_replay_beta_annealing_timesteps"], final_prioritized_replay_beta=config["final_prioritized_replay_beta"], prioritized_replay_eps=config["prioritized_replay_eps"], train_batch_size=config["train_batch_size"], - sample_batch_size=config["sample_batch_size"], **config["optimizer"]) -def check_config_and_setup_param_noise(config): - """Update the config based on settings. +def validate_config_and_setup_param_noise(config): + """Checks and updates the config based on settings. - Rewrites sample_batch_size to take into account n_step truncation, and also - adds the necessary callbacks to support parameter space noise exploration. + Rewrites sample_batch_size to take into account n_step truncation. """ - # PyTorch check. if config["use_pytorch"]: raise ValueError("DQN does not support PyTorch yet! Use tf instead.") @@ -153,18 +160,56 @@ def check_config_and_setup_param_noise(config): config.get("n_step", 1)) config["sample_batch_size"] = adjusted_batch_size + # TODO(sven): Remove at some point. + # Backward compatibility of epsilon-exploration config AND beta-annealing + # fraction settings (both based on schedule_max_timesteps, which is + # deprecated). 
+ schedule_max_timesteps = None + if "schedule_max_timesteps" in config and \ + config["schedule_max_timesteps"] > 0: + deprecation_warning( + "schedule_max_timesteps", "exploration.epsilon_timesteps AND " + "prioritized_replay_beta_annealing_timesteps") + schedule_max_timesteps = config["schedule_max_timesteps"] + if "exploration_final_eps" in config and \ + config["exploration_final_eps"] > 0: + deprecation_warning("exploration_final_eps", + "exploration.final_epsilon") + if isinstance(config["exploration"], dict): + config["exploration"]["final_epsilon"] = \ + config.pop("exploration_final_eps") + if "exploration_fraction" in config and config["exploration_fraction"] > 0: + assert schedule_max_timesteps is not None + deprecation_warning("exploration_fraction", + "exploration.epsilon_timesteps") + if isinstance(config["exploration"], dict): + config["exploration"]["epsilon_timesteps"] = config.pop( + "exploration_fraction") * schedule_max_timesteps + if "beta_annealing_fraction" in config and \ + config["beta_annealing_fraction"] > 0: + assert schedule_max_timesteps is not None + deprecation_warning( + "beta_annealing_fraction (decimal)", + "prioritized_replay_beta_annealing_timesteps (int)") + config["prioritized_replay_beta_annealing_timesteps"] = config.pop( + "beta_annealing_fraction") * schedule_max_timesteps + if "per_worker_exploration" in config and \ + config["per_worker_exploration"] != -1: + deprecation_warning("per_worker_exploration", + "exploration.type=PerWorkerEpsilonGreedy") + if isinstance(config["exploration"], dict): + config["exploration"]["type"] = PerWorkerEpsilonGreedy + + # Setup parameter noise. 
if config.get("parameter_noise", False): if config["batch_mode"] != "complete_episodes": raise ValueError("Exploration with parameter space noise requires " "batch_mode to be complete_episodes.") if config.get("noisy", False): - raise ValueError( - "Exploration with parameter space noise and noisy network " - "cannot be used at the same time.") - if config["callbacks"]["on_episode_start"]: - start_callback = config["callbacks"]["on_episode_start"] - else: - start_callback = None + raise ValueError("Exploration with parameter space noise and " + "noisy network cannot be used at the same time.") + + start_callback = config["callbacks"].get("on_episode_start") def on_episode_start(info): # as a callback function to sample and pose parameter space @@ -172,14 +217,12 @@ def check_config_and_setup_param_noise(config): policies = info["policy"] for pi in policies.values(): pi.add_parameter_noise() - if start_callback: + if start_callback is not None: start_callback(info) config["callbacks"]["on_episode_start"] = on_episode_start - if config["callbacks"]["on_episode_end"]: - end_callback = config["callbacks"]["on_episode_end"] - else: - end_callback = None + + end_callback = config["callbacks"].get("on_episode_end") def on_episode_end(info): # as a callback function to monitor the distance @@ -189,7 +232,7 @@ def check_config_and_setup_param_noise(config): model = policies[DEFAULT_POLICY_ID].model if hasattr(model, "pi_distance"): episode.custom_metrics["policy_distance"] = model.pi_distance - if end_callback: + if end_callback is not None: end_callback(info) config["callbacks"]["on_episode_end"] = on_episode_end @@ -202,64 +245,37 @@ def get_initial_state(config): } -def make_exploration_schedule(config, worker_index): - # Use either a different `eps` per worker, or a linear schedule. 
- if config["per_worker_exploration"]: - assert config["num_workers"] > 1, \ - "This requires multiple workers" - if worker_index >= 0: - # Exploration constants from the Ape-X paper - exponent = ( - 1 + worker_index / float(config["num_workers"] - 1) * 7) - return ConstantSchedule(0.4**exponent) - else: - # local ev should have zero exploration so that eval rollouts - # run properly - return ConstantSchedule(0.0) +# TODO(sven): Move this to generic Trainer/Policy. Every Algo should do this. +def update_worker_exploration(trainer): + """Records the sampled-timestep count and collects exploration infos. - return PiecewiseSchedule( - endpoints=[ - (0, config["exploration_initial_eps"]), - (int(config["exploration_fraction"] * - config["schedule_max_timesteps"]), - config["exploration_final_eps"]), - ], - outside_value=config["exploration_final_eps"]) + Only queries `get_exploration_info()` on each trainable policy. - -def setup_exploration(trainer): - trainer.exploration0 = make_exploration_schedule(trainer.config, -1) - trainer.explorations = [ - make_exploration_schedule(trainer.config, i) - for i in range(trainer.config["num_workers"]) - ] - - -def update_worker_explorations(trainer): + Args: + trainer (Trainer): The Trainer object for the DQN. + """ + # Store some data for metrics after learning. global_timestep = trainer.optimizer.num_steps_sampled - exp_vals = [trainer.exploration0.value(global_timestep)] - trainer.workers.local_worker().foreach_trainable_policy( - lambda p, _: p.set_epsilon(exp_vals[0])) - for i, e in enumerate(trainer.workers.remote_workers()): - exp_val = trainer.explorations[i].value(global_timestep) - e.foreach_trainable_policy.remote(lambda p, _: p.set_epsilon(exp_val)) - exp_vals.append(exp_val) trainer.train_start_timestep = global_timestep - trainer.cur_exp_vals = exp_vals + + # Get all current exploration-infos (from Policies, which cache this info). 
+ trainer.exploration_infos = trainer.workers.foreach_trainable_policy( + lambda p, _: p.get_exploration_info()) -def add_trainer_metrics(trainer, result): +def after_train_result(trainer, result): + """Add some DQN specific metrics to results.""" global_timestep = trainer.optimizer.num_steps_sampled result.update( timesteps_this_iter=global_timestep - trainer.train_start_timestep, info=dict({ - "min_exploration": min(trainer.cur_exp_vals), - "max_exploration": max(trainer.cur_exp_vals), + "exploration_infos": trainer.exploration_infos, "num_target_updates": trainer.state["num_target_updates"], }, **trainer.optimizer.stats())) def update_target_if_needed(trainer, fetches): + """Update the target network in configured intervals.""" global_timestep = trainer.optimizer.num_steps_sampled if global_timestep - trainer.state["last_target_update_ts"] > \ trainer.config["target_network_update_freq"]: @@ -269,35 +285,16 @@ def update_target_if_needed(trainer, fetches): trainer.state["num_target_updates"] += 1 -def collect_metrics(trainer): - if trainer.config["per_worker_exploration"]: - # Only collect metrics from the third of workers with lowest eps - result = trainer.collect_metrics( - selected_workers=trainer.workers.remote_workers()[ - -len(trainer.workers.remote_workers()) // 3:]) - else: - result = trainer.collect_metrics() - return result - - -def disable_exploration(trainer): - trainer.evaluation_workers.local_worker().foreach_trainable_policy( - lambda p, _: p.set_epsilon(0)) - - GenericOffPolicyTrainer = build_trainer( name="GenericOffPolicyAlgorithm", default_policy=None, default_config=DEFAULT_CONFIG, - validate_config=check_config_and_setup_param_noise, + validate_config=validate_config_and_setup_param_noise, get_initial_state=get_initial_state, - make_policy_optimizer=make_optimizer, - before_init=setup_exploration, - before_train_step=update_worker_explorations, + make_policy_optimizer=make_policy_optimizer, + before_train_step=update_worker_exploration, 
after_optimizer_step=update_target_if_needed, - after_train_result=add_trainer_metrics, - collect_metrics_fn=collect_metrics, - before_evaluate_fn=disable_exploration) + after_train_result=after_train_result) DQNTrainer = GenericOffPolicyTrainer.with_updates( name="DQN", default_policy=DQNTFPolicy, default_config=DEFAULT_CONFIG) diff --git a/rllib/agents/dqn/dqn_policy.py b/rllib/agents/dqn/dqn_policy.py index 34af3839f..2b0d7956e 100644 --- a/rllib/agents/dqn/dqn_policy.py +++ b/rllib/agents/dqn/dqn_policy.py @@ -4,8 +4,8 @@ from scipy.stats import entropy import ray from ray.rllib.agents.dqn.distributional_q_model import DistributionalQModel -from ray.rllib.agents.dqn.simple_q_policy import ExplorationStateMixin, \ - TargetNetworkMixin +from ray.rllib.agents.dqn.simple_q_policy import TargetNetworkMixin, \ + ParameterNoiseMixin from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models import ModelCatalog from ray.rllib.models.tf.tf_action_dist import Categorical @@ -103,34 +103,6 @@ class QLoss: } -class QValuePolicy: - def __init__(self, q_values, observations, num_actions, cur_epsilon, - softmax, softmax_temp, model_config): - if softmax: - action_dist = Categorical(q_values / softmax_temp) - self.action = action_dist.sample() - self.action_prob = tf.exp(action_dist.sampled_action_logp()) - return - - deterministic_actions = tf.argmax(q_values, axis=1) - batch_size = tf.shape(observations)[0] - - # Special case masked out actions (q_value ~= -inf) so that we don't - # even consider them for exploration. 
- random_valid_action_logits = tf.where( - tf.equal(q_values, tf.float32.min), - tf.ones_like(q_values) * tf.float32.min, tf.ones_like(q_values)) - random_actions = tf.squeeze( - tf.multinomial(random_valid_action_logits, 1), axis=1) - - chose_random = tf.random_uniform( - tf.stack([batch_size]), minval=0, maxval=1, - dtype=tf.float32) < cur_epsilon - self.action = tf.where(chose_random, random_actions, - deterministic_actions) - self.action_prob = None - - class ComputeTDErrorMixin: def __init__(self): @make_tf_callable(self.get_session(), dynamic_shape=True) @@ -177,7 +149,7 @@ def postprocess_trajectory(policy, policy.parameter_noise_sigma.load( policy.parameter_noise_sigma_val, session=policy.get_session()) - return _postprocess_dqn(policy, sample_batch) + return postprocess_nstep_and_prio(policy, sample_batch) def build_q_model(policy, obs_space, action_space, config): @@ -230,10 +202,9 @@ def build_q_model(policy, obs_space, action_space, config): return policy.q_model -def build_q_networks(policy, q_model, input_dict, obs_space, action_space, - config): - - # Action Q network +def sample_action_from_q_network(policy, q_model, input_dict, obs_space, + action_space, config): + # Action Q network. 
q_values, q_logits, q_dist = _compute_q_values( policy, q_model, input_dict[SampleBatch.CUR_OBS], obs_space, action_space) @@ -247,16 +218,17 @@ def build_q_networks(policy, q_model, input_dict, obs_space, action_space, [var for var in policy.q_func_vars if "LayerNorm" not in var.name]) policy.action_probs = tf.nn.softmax(policy.q_values) - # Action outputs - qvp = QValuePolicy(q_values, input_dict[SampleBatch.CUR_OBS], - action_space.n, policy.cur_epsilon, config["soft_q"], - config["softmax_temp"], config["model"]) - policy.output_actions, policy.action_prob = qvp.action, qvp.action_prob - - actions = policy.output_actions - action_prob = (tf.log(policy.action_prob) - if policy.action_prob is not None else None) - return actions, action_prob + # TODO(sven): Move soft_q logic to different Exploration child-component. + action_log_prob = None + if config["soft_q"]: + action_dist = Categorical(q_values / config["softmax_temp"]) + policy.output_actions = action_dist.sample() + action_log_prob = action_dist.sampled_action_logp() + policy.action_prob = tf.exp(action_log_prob) + else: + policy.output_actions = tf.argmax(q_values, axis=1) + policy.action_prob = None + return policy.output_actions, action_log_prob def _build_parameter_noise(policy, pnet_params): @@ -374,7 +346,7 @@ def build_q_stats(policy, batch): def setup_early_mixins(policy, obs_space, action_space, config): LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"]) - ExplorationStateMixin.__init__(policy, obs_space, action_space, config) + ParameterNoiseMixin.__init__(policy, obs_space, action_space, config) def setup_mid_mixins(policy, obs_space, action_space, config): @@ -451,7 +423,7 @@ def _adjust_nstep(n_step, gamma, obs, actions, rewards, new_obs, dones): rewards[i] += gamma**j * rewards[i + j] -def _postprocess_dqn(policy, batch): +def postprocess_nstep_and_prio(policy, batch, other_agent=None, episode=None): # N-step Q adjustments if policy.config["n_step"] > 1: 
_adjust_nstep(policy.config["n_step"], policy.config["gamma"], @@ -479,10 +451,10 @@ DQNTFPolicy = build_tf_policy( name="DQNTFPolicy", get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, make_model=build_q_model, - action_sampler_fn=build_q_networks, + action_sampler_fn=sample_action_from_q_network, loss_fn=build_q_losses, stats_fn=build_q_stats, - postprocess_fn=postprocess_trajectory, + postprocess_fn=postprocess_nstep_and_prio, optimizer_fn=adam_optimizer, gradients_fn=clip_gradients, extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values}, @@ -492,7 +464,7 @@ DQNTFPolicy = build_tf_policy( after_init=setup_late_mixins, obs_include_prev_action_reward=False, mixins=[ - ExplorationStateMixin, + ParameterNoiseMixin, TargetNetworkMixin, ComputeTDErrorMixin, LearningRateSchedule, diff --git a/rllib/agents/dqn/simple_q_policy.py b/rllib/agents/dqn/simple_q_policy.py index b60c8e95e..07f892455 100644 --- a/rllib/agents/dqn/simple_q_policy.py +++ b/rllib/agents/dqn/simple_q_policy.py @@ -5,7 +5,6 @@ import logging import ray from ray.rllib.agents.dqn.simple_q_model import SimpleQModel -from ray.rllib.policy.policy import Policy from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models import ModelCatalog from ray.rllib.utils.annotations import override @@ -22,35 +21,14 @@ Q_SCOPE = "q_func" Q_TARGET_SCOPE = "target_q_func" -class ExplorationStateMixin: +class ParameterNoiseMixin: def __init__(self, obs_space, action_space, config): - # Python value, should always be same as the TF variable - self.cur_epsilon_value = 1.0 - self.cur_epsilon = tf.get_variable( - initializer=tf.constant_initializer(self.cur_epsilon_value), - name="eps", - shape=(), - trainable=False, - dtype=tf.float32) + pass def add_parameter_noise(self): if self.config["parameter_noise"]: self.sess.run(self.add_noise_op) - def set_epsilon(self, epsilon): - self.cur_epsilon_value = epsilon - self.cur_epsilon.load( - self.cur_epsilon_value, 
session=self.get_session()) - - @override(Policy) - def get_state(self): - return [TFPolicy.get_state(self), self.cur_epsilon_value] - - @override(Policy) - def set_state(self, state): - TFPolicy.set_state(self, state[0]) - self.set_epsilon(state[1]) - class TargetNetworkMixin: def __init__(self, obs_space, action_space, config): @@ -109,8 +87,8 @@ def build_q_models(policy, obs_space, action_space, config): return policy.q_model -def build_action_sampler(policy, q_model, input_dict, obs_space, action_space, - config): +def sample_action_from_q_network(policy, q_model, input_dict, obs_space, + action_space, config): # Action Q network q_values = _compute_q_values(policy, q_model, @@ -119,26 +97,8 @@ def build_action_sampler(policy, q_model, input_dict, obs_space, action_space, policy.q_values = q_values policy.q_func_vars = q_model.variables() - # Action outputs - deterministic_actions = tf.argmax(q_values, axis=1) - batch_size = tf.shape(input_dict[SampleBatch.CUR_OBS])[0] - - # Special case masked out actions (q_value ~= -inf) so that we don't - # even consider them for exploration. - random_valid_action_logits = tf.where( - tf.equal(q_values, tf.float32.min), - tf.ones_like(q_values) * tf.float32.min, tf.ones_like(q_values)) - random_actions = tf.squeeze( - tf.multinomial(random_valid_action_logits, 1), axis=1) - - chose_random = tf.random_uniform( - tf.stack([batch_size]), minval=0, maxval=1, - dtype=tf.float32) < policy.cur_epsilon - stochastic_actions = tf.where(chose_random, random_actions, - deterministic_actions) - action_logp = None - - return stochastic_actions, action_logp + # Action outputs. 
+ return tf.argmax(q_values, axis=1), None def build_q_losses(policy, model, dist_class, train_batch): @@ -190,7 +150,7 @@ def _compute_q_values(policy, model, obs, obs_space, action_space): def setup_early_mixins(policy, obs_space, action_space, config): - ExplorationStateMixin.__init__(policy, obs_space, action_space, config) + ParameterNoiseMixin.__init__(policy, obs_space, action_space, config) def setup_late_mixins(policy, obs_space, action_space, config): @@ -201,7 +161,7 @@ SimpleQPolicy = build_tf_policy( name="SimpleQPolicy", get_default_config=lambda: ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, make_model=build_q_models, - action_sampler_fn=build_action_sampler, + action_sampler_fn=sample_action_from_q_network, loss_fn=build_q_losses, extra_action_fetches_fn=lambda policy: {"q_values": policy.q_values}, extra_learn_fetches_fn=lambda policy: {"td_error": policy.td_error}, @@ -209,6 +169,6 @@ SimpleQPolicy = build_tf_policy( after_init=setup_late_mixins, obs_include_prev_action_reward=False, mixins=[ - ExplorationStateMixin, + ParameterNoiseMixin, TargetNetworkMixin, ]) diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py new file mode 100644 index 000000000..29f34d56c --- /dev/null +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -0,0 +1,22 @@ +import unittest + +import ray.rllib.agents.dqn as dqn +from ray.rllib.utils.framework import try_import_tf + +tf = try_import_tf() + + +class TestDQN(unittest.TestCase): + def test_dqn_compilation(self): + """Test whether a DQNTrainer can be built and trained with tf.""" + config = dqn.DEFAULT_CONFIG.copy() + config["num_workers"] = 0 # Run locally. + config["eager"] = True + + # tf. 
+ trainer = dqn.DQNTrainer(config=config, env="CartPole-v0") + + num_iterations = 2 + for i in range(num_iterations): + results = trainer.train() + print(results) diff --git a/rllib/agents/pg/tests/__init__.py b/rllib/agents/pg/tests/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/rllib/agents/pg/tests/test_pg.py b/rllib/agents/pg/tests/test_pg.py index 0971843a2..281d0289f 100644 --- a/rllib/agents/pg/tests/test_pg.py +++ b/rllib/agents/pg/tests/test_pg.py @@ -41,13 +41,11 @@ class TestPG(unittest.TestCase): config["model"]["fcnet_hiddens"] = [10] config["model"]["fcnet_activation"] = "linear" - # Fake CartPole episode of n timesteps. + # Fake CartPole episode of n time steps. train_batch = { - SampleBatch.CUR_OBS: np.array([ - [0.1, 0.2, 0.3, 0.4], - [0.5, 0.6, 0.7, 0.8], - [0.9, 1.0, 1.1, 1.2] - ]), + SampleBatch.CUR_OBS: np.array([[0.1, 0.2, 0.3, + 0.4], [0.5, 0.6, 0.7, 0.8], + [0.9, 1.0, 1.1, 1.2]]), SampleBatch.ACTIONS: np.array([0, 1, 1]), SampleBatch.REWARDS: np.array([1.0, 1.0, 1.0]), SampleBatch.DONES: np.array([False, False, True]) @@ -68,24 +66,19 @@ class TestPG(unittest.TestCase): # Actual loss results. results = pg.pg_tf_loss( - policy, policy.model, dist_class=Categorical, - train_batch=train_batch - ) + policy, + policy.model, + dist_class=Categorical, + train_batch=train_batch) # Calculate expected results. expected_logits = fc( - fc( - train_batch[SampleBatch.CUR_OBS], - vars[0].numpy(), vars[1].numpy() - ), - vars[2].numpy(), vars[3].numpy() - ) + fc(train_batch[SampleBatch.CUR_OBS], vars[0].numpy(), + vars[1].numpy()), vars[2].numpy(), vars[3].numpy()) expected_logp = Categorical(expected_logits, policy.model).logp( - train_batch[SampleBatch.ACTIONS] - ) + train_batch[SampleBatch.ACTIONS]) expected_loss = -np.mean( - expected_logp * train_batch[Postprocessing.ADVANTAGES] - ) + expected_logp * train_batch[Postprocessing.ADVANTAGES]) check(results.numpy(), expected_loss, decimals=4) # Torch. 
@@ -94,15 +87,14 @@ class TestPG(unittest.TestCase): policy = trainer.get_policy() train_batch = policy._lazy_tensor_dict(train_batch) results = pg.pg_torch_loss( - policy, policy.model, dist_class=TorchCategorical, - train_batch=train_batch - ) - expected_logits = policy.model._last_output + policy, + policy.model, + dist_class=TorchCategorical, + train_batch=train_batch) + expected_logits = policy.model.last_output() expected_logp = TorchCategorical(expected_logits, policy.model).logp( - train_batch[SampleBatch.ACTIONS] - ) + train_batch[SampleBatch.ACTIONS]) expected_loss = -np.mean( expected_logp.detach().numpy() * - train_batch[Postprocessing.ADVANTAGES].numpy() - ) + train_batch[Postprocessing.ADVANTAGES].numpy()) check(results.detach().numpy(), expected_loss, decimals=4) diff --git a/rllib/agents/ppo/tests/test_ppo.py b/rllib/agents/ppo/tests/test_ppo.py index 6f0b5a977..4e8f753cb 100644 --- a/rllib/agents/ppo/tests/test_ppo.py +++ b/rllib/agents/ppo/tests/test_ppo.py @@ -120,7 +120,7 @@ class TestPPO(unittest.TestCase): kl, entropy, pg_loss, vf_loss, overall_loss = \ self._ppo_loss_helper( policy, policy.model, TorchCategorical, train_batch, - policy.model._last_output, + policy.model.last_output(), policy.model.value_function().detach().numpy() ) check(kl, policy.loss_obj.mean_kl.detach().numpy()) diff --git a/rllib/agents/sac/sac.py b/rllib/agents/sac/sac.py index 5adc216e3..d5339b60f 100644 --- a/rllib/agents/sac/sac.py +++ b/rllib/agents/sac/sac.py @@ -64,7 +64,7 @@ DEFAULT_CONFIG = with_common_config({ "prioritized_replay_alpha": 0.6, "prioritized_replay_beta": 0.4, "prioritized_replay_eps": 1e-6, - "beta_annealing_fraction": 0.2, + "prioritized_replay_beta_annealing_timesteps": 20000, "final_prioritized_replay_beta": 0.4, "compress_observations": False, @@ -104,12 +104,12 @@ DEFAULT_CONFIG = with_common_config({ # Prevent iterations from going lower than this time span. 
"min_iter_time_s": 1, - # TODO(ekl) these are unused; remove them from sac config - "per_worker_exploration": False, - "exploration_fraction": 0.1, - "schedule_max_timesteps": 100000, - "exploration_initial_eps": 1.0, - "exploration_final_eps": 0.02, + # DEPRECATED: + "per_worker_exploration": -1, + "exploration_fraction": -1, + "schedule_max_timesteps": -1, + "exploration_initial_eps": -1, + "exploration_final_eps": -1, }) # __sphinx_doc_end__ # yapf: enable diff --git a/rllib/agents/sac/sac_policy.py b/rllib/agents/sac/sac_policy.py index 73ed56d01..306b4a7d2 100644 --- a/rllib/agents/sac/sac_policy.py +++ b/rllib/agents/sac/sac_policy.py @@ -6,7 +6,8 @@ import ray import ray.experimental.tf_utils from ray.rllib.agents.sac.sac_model import SACModel from ray.rllib.agents.ddpg.noop_model import NoopModel -from ray.rllib.agents.dqn.dqn_policy import _postprocess_dqn, PRIO_WEIGHTS +from ray.rllib.agents.dqn.dqn_policy import postprocess_nstep_and_prio, \ + PRIO_WEIGHTS from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.policy.tf_policy_template import build_tf_policy @@ -82,7 +83,7 @@ def postprocess_trajectory(policy, sample_batch, other_agent_batches=None, episode=None): - return _postprocess_dqn(policy, sample_batch) + return postprocess_nstep_and_prio(policy, sample_batch) def build_action_output(policy, model, input_dict, obs_space, action_space, diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index ffdb29a07..619caca14 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -15,10 +15,10 @@ from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID from ray.rllib.evaluation.metrics import collect_metrics from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.evaluation.worker_set import WorkerSet +from ray.rllib.utils import FilterManager, deep_update, merge_dicts, \ + try_import_tf from ray.rllib.utils.annotations import override, 
PublicAPI, DeveloperAPI -from ray.rllib.utils import FilterManager, deep_update, merge_dicts from ray.rllib.utils.memory import ray_get_and_free -from ray.rllib.utils import try_import_tf from ray.tune.registry import ENV_CREATOR, register_env, _global_registry from ray.tune.trainable import Trainable from ray.tune.trial import ExportFormat @@ -165,6 +165,11 @@ COMMON_CONFIG = { # only has an effect if eager is enabled. "no_eager_on_workers": False, + # === Exploration Settings === + # Provide a dict specifying the Exploration object's config. + # Set to False or None for no exploration behavior (e.g., for evaluation). + "exploration": False, + # === Evaluation Settings === # Evaluate with every `evaluation_interval` training iterations. # The evaluation stats will be reported under the "evaluation" metric key. @@ -174,7 +179,6 @@ COMMON_CONFIG = { # Number of episodes to run per evaluation period. If using multiple # evaluation workers, we will run at least this many episodes total. "evaluation_num_episodes": 10, - # Extra arguments to pass to evaluation workers. # Typical usage is to pass extra args to evaluation env creator # and to disable exploration by computing deterministic actions "evaluation_config": { @@ -182,6 +186,7 @@ COMMON_CONFIG = { # "env_config": {...}, # "exploration_fraction": 0, # "exploration_final_eps": 0, + "exploration": False }, # Number of parallel workers to use for evaluation. Note that this is set # to zero by default, which means evaluation will be run in the trainer @@ -301,7 +306,7 @@ COMMON_CONFIG = { "input_evaluation": ["is", "wis"], # Whether to run postprocess_trajectory() on the trajectory fragments from # offline inputs. Note that postprocessing will be done using the *current* - # policy, not the *behaviour* policy, which is typically undesirable for + # policy, not the *behavior* policy, which is typically undesirable for # on-policy algorithms. 
"postprocess_inputs": False, # If positive, input batches will be shuffled via a sliding window buffer @@ -419,7 +424,7 @@ class Trainer(Trainable): logger_creator = default_logger_creator - Trainable.__init__(self, config, logger_creator) + super().__init__(config, logger_creator) @classmethod @override(Trainable) @@ -556,7 +561,7 @@ class Trainer(Trainable): with get_scope(): self._init(self.config, self.env_creator) - # Evaluation related + # Evaluation setup. if self.config.get("evaluation_interval"): # Update env_config with evaluation settings: extra_config = copy.deepcopy(self.config["evaluation_config"]) @@ -566,6 +571,7 @@ class Trainer(Trainable): }) logger.debug( "using evaluation_config: {}".format(extra_config)) + self.evaluation_workers = self._make_workers( self.env_creator, self._policy, @@ -594,6 +600,26 @@ class Trainer(Trainable): @DeveloperAPI def _make_workers(self, env_creator, policy, config, num_workers): + """Default factory method for a WorkerSet running under this Trainer. + + Override this method by passing a custom `make_workers` into + `build_trainer`. + + Args: + env_creator (callable): A function that return and Env given an env + config. + policy (class): The Policy class to use for creating the policies + of the workers. + config (dict): The Trainer's config. + num_workers (int): Number of remote rollout workers to create. + 0 for local only. + remote_config_updates (Optional[List[dict]]): A list of config + dicts to update `config` with for each Worker (len must be + same as `num_workers`). + + Returns: + WorkerSet: The created WorkerSet. 
+ """ return WorkerSet( env_creator, policy, @@ -604,7 +630,6 @@ class Trainer(Trainable): @DeveloperAPI def _init(self, config, env_creator): """Subclasses should override this for custom initialization.""" - raise NotImplementedError @DeveloperAPI @@ -614,7 +639,6 @@ class Trainer(Trainable): Note that this default implementation does not do anything beyond merging evaluation_config with the normal trainer config. """ - if not self.config["evaluation_config"]: raise ValueError( "No evaluation_config specified. It doesn't make sense " @@ -677,8 +701,9 @@ class Trainer(Trainable): prev_reward=None, info=None, policy_id=DEFAULT_POLICY_ID, - full_fetch=False): - """Computes an action for the specified policy. + full_fetch=False, + explore=True): + """Computes an action for the specified policy on the local Worker. Note that you can also access the policy object through self.get_policy(policy_id) and call compute_actions() on it directly. @@ -686,58 +711,57 @@ class Trainer(Trainable): Arguments: observation (obj): observation from the environment. state (list): RNN hidden state, if any. If state is not None, - then all of compute_single_action(...) is returned - (computed action, rnn state, logits dictionary). - Otherwise compute_single_action(...)[0] is - returned (computed action). + then all of compute_single_action(...) is returned + (computed action, rnn state(s), logits dictionary). + Otherwise compute_single_action(...)[0] is returned + (computed action). prev_action (obj): previous action value, if any prev_reward (int): previous reward, if any info (dict): info object, if any - policy_id (str): policy to query (only applies to multi-agent). - full_fetch (bool): whether to return extra action fetch results. - This is always set to true if RNN state is specified. + policy_id (str): Policy to query (only applies to multi-agent). + full_fetch (bool): Whether to return extra action fetch results. + This is always set to True if RNN state is specified. 
+ explore (bool): Whether to pick an action using exploration or not. Returns: - Just the computed action if full_fetch=False, or the full output - of policy.compute_actions() otherwise. + any: The computed action if full_fetch=False, or + tuple: The full output of policy.compute_actions() if + full_fetch=True or we have an RNN-based Policy. """ - if state is None: state = [] preprocessed = self.workers.local_worker().preprocessors[ policy_id].transform(observation) filtered_obs = self.workers.local_worker().filters[policy_id]( preprocessed, update=False) - if state: - return self.get_policy(policy_id).compute_single_action( - filtered_obs, - state, - prev_action, - prev_reward, - info, - clip_actions=self.config["clip_actions"]) - res = self.get_policy(policy_id).compute_single_action( + + # Figure out the current (sample) time step and pass it into Policy. + timestep = self.optimizer.num_steps_sampled \ + if self._has_policy_optimizer() else None + + result = self.get_policy(policy_id).compute_single_action( filtered_obs, state, prev_action, prev_reward, info, - clip_actions=self.config["clip_actions"]) - if full_fetch: - return res + clip_actions=self.config["clip_actions"], + explore=explore, + timestep=timestep) + + if state or full_fetch: + return result else: - return res[0] # backwards compatibility + return result[0] # backwards compatibility @property def _name(self): """Subclasses should override this to declare their name.""" - raise NotImplementedError @property def _default_config(self): """Subclasses should override this to declare their default config.""" - raise NotImplementedError @PublicAPI @@ -747,7 +771,6 @@ class Trainer(Trainable): Arguments: policy_id (str): id of policy to return. """ - return self.workers.local_worker().get_policy(policy_id) @PublicAPI @@ -890,6 +913,12 @@ class Trainer(Trainable): self.optimizer.reset(healthy_workers) def _has_policy_optimizer(self): + """Whether this Trainer has a PolicyOptimizer as `optimizer` property. 
+ + Returns: + bool: True if this Trainer holds a PolicyOptimizer object in + property `self.optimizer`. + """ return hasattr(self, "optimizer") and isinstance( self.optimizer, PolicyOptimizer) diff --git a/rllib/agents/trainer_template.py b/rllib/agents/trainer_template.py index 6006a31fc..761ba77ce 100644 --- a/rllib/agents/trainer_template.py +++ b/rllib/agents/trainer_template.py @@ -33,8 +33,8 @@ def build_trainer(name, Arguments: name (str): name of the trainer (e.g., "PPO") default_policy (cls): the default Policy class to use - default_config (dict): the default config dict of the algorithm, - otherwises uses the Trainer default config + default_config (dict): The default config dict of the algorithm, + otherwise uses the Trainer default config. validate_config (func): optional callback that checks a given config for correctness. It may mutate the config as needed. get_initial_state (func): optional function that returns the initial diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index 676a2e9b7..6ec4bf6c6 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -131,6 +131,7 @@ class RolloutWorker(EvaluatorInterface): model_config=None, policy_config=None, worker_index=0, + num_workers=0, monitor_path=None, log_dir=None, log_level=None, @@ -202,6 +203,8 @@ class RolloutWorker(EvaluatorInterface): worker_index (int): For remote workers, this should be set to a non-zero and unique value. This index is passed to created envs through EnvContext so that envs can be configured per worker. + num_workers (int): For remote workers, how many workers altogether + have been created? monitor_path (str): Write out episode stats and videos to this directory if specified. log_dir (str): Directory where logs can be placed. 
@@ -255,6 +258,7 @@ class RolloutWorker(EvaluatorInterface): self.policy_config = policy_config self.callbacks = callbacks or {} self.worker_index = worker_index + self.num_workers = num_workers model_config = model_config or {} policy_mapping_fn = (policy_mapping_fn or (lambda agent_id: DEFAULT_POLICY_ID)) @@ -777,6 +781,8 @@ class RolloutWorker(EvaluatorInterface): conf) in sorted(policy_dict.items()): logger.debug("Creating policy for {}".format(name)) merged_conf = merge_dicts(policy_config, conf) + merged_conf["num_workers"] = self.num_workers + merged_conf["worker_index"] = self.worker_index if self.preprocessing_enabled: preprocessor = ModelCatalog.get_preprocessor_for_space( obs_space, merged_conf.get("model")) diff --git a/rllib/evaluation/sampler.py b/rllib/evaluation/sampler.py index 41de80405..81e949ab4 100644 --- a/rllib/evaluation/sampler.py +++ b/rllib/evaluation/sampler.py @@ -9,7 +9,7 @@ from ray.rllib.evaluation.episode import MultiAgentEpisode, _flatten_action from ray.rllib.evaluation.rollout_metrics import RolloutMetrics from ray.rllib.evaluation.sample_batch_builder import \ MultiAgentSampleBatchBuilder -from ray.rllib.policy.policy import TupleActions +from ray.rllib.policy.policy import TupleActions, clip_action from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.env.base_env import BaseEnv, ASYNC_RESET_RETURN from ray.rllib.env.atari_wrappers import get_wrapper_by_cls, MonitorEnv @@ -17,7 +17,6 @@ from ray.rllib.offline import InputReader from ray.rllib.utils.annotations import override from ray.rllib.utils.debug import log_once, summarize from ray.rllib.utils.tf_run_builder import TFRunBuilder -from ray.rllib.policy.policy import clip_action logger = logging.getLogger(__name__) @@ -531,11 +530,16 @@ def _do_policy_eval(tf_sess, to_eval, policies, active_episodes): TFPolicy.compute_actions.__code__): rnn_in_cols = _to_column_format(rnn_in) # TODO(ekl): how can we make info batch available to TF code? 
+ # TODO(sven): Return dict from _build_compute_actions. + # it's becoming more and more unclear otherwise, what's where in + # the return tuple. pending_fetches[policy_id] = policy._build_compute_actions( - builder, [t.obs for t in eval_data], - rnn_in_cols, + builder, + obs_batch=[t.obs for t in eval_data], + state_batches=rnn_in_cols, prev_action_batch=[t.prev_action for t in eval_data], - prev_reward_batch=[t.prev_reward for t in eval_data]) + prev_reward_batch=[t.prev_reward for t in eval_data], + timestep=policy.global_timestep) else: # TODO(sven): Does this work for LSTM torch? rnn_in_cols = [ @@ -544,11 +548,12 @@ def _do_policy_eval(tf_sess, to_eval, policies, active_episodes): ] eval_results[policy_id] = policy.compute_actions( [t.obs for t in eval_data], - rnn_in_cols, + state_batches=rnn_in_cols, prev_action_batch=[t.prev_action for t in eval_data], prev_reward_batch=[t.prev_reward for t in eval_data], info_batch=[t.info for t in eval_data], - episodes=[active_episodes[t.env_id] for t in eval_data]) + episodes=[active_episodes[t.env_id] for t in eval_data], + timestep=policy.global_timestep) if builder: for k, v in pending_fetches.items(): eval_results[k] = builder.get(v) @@ -578,7 +583,7 @@ def _process_policy_eval_results(to_eval, eval_results, active_episodes, for policy_id, eval_data in to_eval.items(): rnn_in_cols = _to_column_format([t.rnn_state for t in eval_data]) - actions, rnn_out_cols, pi_info_cols = eval_results[policy_id] + actions, rnn_out_cols, pi_info_cols = eval_results[policy_id][:3] if len(rnn_in_cols) != len(rnn_out_cols): raise ValueError("Length of RNN in did not match RNN out, got: " "{} vs {}".format(rnn_in_cols, rnn_out_cols)) @@ -650,6 +655,13 @@ def _to_column_format(rnn_state_rows): def _get_or_raise(mapping, policy_id): + """Returns a Policy object under key `policy_id` in `mapping`. + + Throws an error if `policy_id` cannot be found. + + Returns: + Policy: The found Policy object. 
+ """ if policy_id not in mapping: raise ValueError( "Could not find policy for agent: agent policy id `{}` not " diff --git a/rllib/evaluation/worker_set.py b/rllib/evaluation/worker_set.py index a1798951f..83a224297 100644 --- a/rllib/evaluation/worker_set.py +++ b/rllib/evaluation/worker_set.py @@ -61,7 +61,7 @@ class WorkerSet: # Create a number of remote workers self._remote_workers = [] - self.add_workers(num_workers) + self.add_workers(self._num_workers) def local_worker(self): """Return the local rollout worker.""" @@ -72,7 +72,12 @@ class WorkerSet: return self._remote_workers def add_workers(self, num_workers): - """Create and add a number of remote workers to this worker set.""" + """Creates and add a number of remote workers to this worker set. + + Args: + num_workers (int): The number of remote Workers to add to this + WorkerSet. + """ remote_args = { "num_cpus": self._remote_config["num_cpus_per_worker"], "num_gpus": self._remote_config["num_gpus_per_worker"], @@ -113,7 +118,6 @@ class WorkerSet: The index will be passed as the second arg to the given function. """ - local_result = [func(self.local_worker(), 0)] remote_results = ray_get_and_free([ w.apply.remote(func, i + 1) @@ -121,6 +125,47 @@ class WorkerSet: ]) return local_result + remote_results + @DeveloperAPI + def foreach_policy(self, func): + """Apply the given function to each worker's (policy, policy_id) tuple. + + Args: + func (callable): A function - taking a Policy and its ID - that is + called on all workers' Policies. + + Returns: + List[any]: The list of return values of func over all workers' + policies. 
+ """ + local_results = self.local_worker().foreach_policy(func) + remote_results = [] + for worker in self.remote_workers(): + res = ray_get_and_free( + worker.apply.remote(lambda w: w.foreach_policy(func))) + remote_results.extend(res) + return local_results + remote_results + + @DeveloperAPI + def foreach_trainable_policy(self, func): + """Apply `func` to all workers' Policies iff in `policies_to_train`. + + Args: + func (callable): A function - taking a Policy and its ID - that is + called on all workers' Policies in `worker.policies_to_train`. + + Returns: + List[any]: The list of n return values of all + `func([trainable policy], [ID])`-calls. + """ + local_results = self.local_worker().foreach_trainable_policy(func) + remote_results = [] + for worker in self.remote_workers(): + res = ray_get_and_free( + worker.apply.remote( + lambda w: w.foreach_trainable_policy(func))) + remote_results.extend(res) + return local_results + remote_results + @staticmethod def _from_existing(local_worker, remote_workers=None): workers = WorkerSet(None, None, {}, _setup=False) @@ -200,6 +245,7 @@ class WorkerSet: model_config=config["model"], policy_config=config, worker_index=worker_index, + num_workers=self._num_workers, monitor_path=self._logdir if config["monitor"] else None, log_dir=self._logdir, log_level=config["log_level"], diff --git a/rllib/examples/multiagent_two_trainers.py b/rllib/examples/multiagent_two_trainers.py index f3580faf4..05a7685c3 100644 --- a/rllib/examples/multiagent_two_trainers.py +++ b/rllib/examples/multiagent_two_trainers.py @@ -54,6 +54,7 @@ if __name__ == "__main__": "policy_mapping_fn": policy_mapping_fn, "policies_to_train": ["ppo_policy"], }, + "exploration": False, # disable filters, otherwise we would need to synchronize those # as well to the DQN agent "observation_filter": "NoFilter", @@ -71,11 +72,6 @@ if __name__ == "__main__": "n_step": 3, }) - # disable DQN exploration when used by the PPO trainer - ppo_trainer.workers.foreach_worker( 
- lambda ev: ev.for_policy( - lambda pi: pi.set_epsilon(0.0), policy_id="dqn_policy")) - # You should see both the printed X and Y approach 200 as this trains: # info: # policy_reward_mean: diff --git a/rllib/models/catalog.py b/rllib/models/catalog.py index f55f13b95..ba4542040 100644 --- a/rllib/models/catalog.py +++ b/rllib/models/catalog.py @@ -106,17 +106,17 @@ class ModelCatalog: @staticmethod @DeveloperAPI - def get_action_dist( - action_space, config, dist_type=None, torch=None, - framework="tf" - ): - """ - Returns action distribution class and size for the given action space. + def get_action_dist(action_space, + config, + dist_type=None, + torch=None, + framework="tf"): + """Returns a distribution class and size for the given action space. Args: action_space (Space): Action space of the target gym env. config (dict): Optional model config. - dist_type (str): Optional identifier of the action distribution. + dist_type (Optional[str]): Identifier of the action distribution. torch (bool): Obsoleted: Whether to return PyTorch Model and distribution (use framework="torch" instead). framework (str): One of "tf" or "torch". @@ -156,8 +156,7 @@ class ModelCatalog: if framework == "torch": # TODO(sven): implement raise NotImplementedError( - "Tuple action spaces not supported for Pytorch." - ) + "Tuple action spaces not supported for Pytorch.") child_dist = [] input_lens = [] for action in action_space.spaces: @@ -174,27 +173,23 @@ class ModelCatalog: if framework == "torch": # TODO(sven): implement raise NotImplementedError( - "Simplex action spaces not supported for Pytorch." - ) + "Simplex action spaces not supported for torch.") dist = Dirichlet elif isinstance(action_space, gym.spaces.MultiDiscrete): if framework == "torch": # TODO(sven): implement raise NotImplementedError( - "MultiDiscrete action spaces not supported for Pytorch." 
- ) + "MultiDiscrete action spaces not supported for Pytorch.") return partial(MultiCategorical, input_lens=action_space.nvec), \ int(sum(action_space.nvec)) elif isinstance(action_space, gym.spaces.Dict): # TODO(sven): implement raise NotImplementedError( "Dict action spaces are not supported, consider using " - "gym.spaces.Tuple instead" - ) + "gym.spaces.Tuple instead") else: - raise NotImplementedError( - "Unsupported args: {} {}".format(action_space, dist_type) - ) + raise NotImplementedError("Unsupported args: {} {}".format( + action_space, dist_type)) return dist, dist.required_model_output_shape(action_space, config) diff --git a/rllib/models/tf/tf_action_dist.py b/rllib/models/tf/tf_action_dist.py index f52faa1d2..5f25e4516 100644 --- a/rllib/models/tf/tf_action_dist.py +++ b/rllib/models/tf/tf_action_dist.py @@ -15,7 +15,7 @@ class TFActionDistribution(ActionDistribution): @DeveloperAPI def __init__(self, inputs, model): - super(TFActionDistribution, self).__init__(inputs, model) + super().__init__(inputs, model) self.sample_op = self._build_sample_op() @DeveloperAPI @@ -43,7 +43,7 @@ class Categorical(TFActionDistribution): @DeveloperAPI def __init__(self, inputs, model=None): - super(Categorical, self).__init__(inputs, model) + super().__init__(inputs, model) @override(ActionDistribution) def logp(self, x): @@ -142,7 +142,7 @@ class DiagGaussian(TFActionDistribution): self.mean = mean self.log_std = log_std self.std = tf.exp(log_std) - TFActionDistribution.__init__(self, inputs, model) + super().__init__(inputs, model) @override(ActionDistribution) def logp(self, x): @@ -281,12 +281,12 @@ class Dirichlet(TFActionDistribution): validate_args=True, allow_nan_stats=False, ) - TFActionDistribution.__init__(self, concentration, model) + super().__init__(concentration, model) @override(ActionDistribution) def logp(self, x): - # Support of Dirichlet are positive real numbers. 
x is already be - # an array of positive number, but we clip to avoid zeros due to + # Support of Dirichlet are positive real numbers. x is already + # an array of positive numbers, but we clip to avoid zeros due to # numerical errors. x = tf.maximum(x, self.epsilon) x = x / tf.reduce_sum(x, axis=-1, keepdims=True) diff --git a/rllib/offline/off_policy_estimator.py b/rllib/offline/off_policy_estimator.py index 093d050d7..7af8e039d 100644 --- a/rllib/offline/off_policy_estimator.py +++ b/rllib/offline/off_policy_estimator.py @@ -62,7 +62,8 @@ class OffPolicyEstimator: state_batches=[batch[k] for k in state_keys], prev_action_batch=batch.data.get("prev_action"), prev_reward_batch=batch.data.get("prev_reward"), - info_batch=batch.data.get("info")) + info_batch=batch.data.get("info"), + explore=False) # switch off any exploration if "action_prob" not in info: raise ValueError( "Off-policy estimation is not possible unless the policy " diff --git a/rllib/optimizers/sync_replay_optimizer.py b/rllib/optimizers/sync_replay_optimizer.py index 966ba35a7..6db44d27e 100644 --- a/rllib/optimizers/sync_replay_optimizer.py +++ b/rllib/optimizers/sync_replay_optimizer.py @@ -25,24 +25,24 @@ class SyncReplayOptimizer(PolicyOptimizer): "td_error" array in the info return of compute_gradients(). 
This error term will be used for sample prioritization.""" - def __init__(self, - workers, - learning_starts=1000, - buffer_size=10000, - prioritized_replay=True, - prioritized_replay_alpha=0.6, - prioritized_replay_beta=0.4, - prioritized_replay_eps=1e-6, - schedule_max_timesteps=100000, - beta_annealing_fraction=0.2, - final_prioritized_replay_beta=0.4, - train_batch_size=32, - sample_batch_size=4, - before_learn_on_batch=None, - synchronize_sampling=False): + def __init__( + self, + workers, + learning_starts=1000, + buffer_size=10000, + prioritized_replay=True, + prioritized_replay_alpha=0.6, + prioritized_replay_beta=0.4, + prioritized_replay_eps=1e-6, + final_prioritized_replay_beta=0.4, + train_batch_size=32, + before_learn_on_batch=None, + synchronize_sampling=False, + prioritized_replay_beta_annealing_timesteps=100000 * 0.2, + ): """Initialize an sync replay optimizer. - Arguments: + Args: workers (WorkerSet): all workers learning_starts (int): wait until this many steps have been sampled before starting optimization. @@ -51,25 +51,24 @@ class SyncReplayOptimizer(PolicyOptimizer): prioritized_replay_alpha (float): replay alpha hyperparameter prioritized_replay_beta (float): replay beta hyperparameter prioritized_replay_eps (float): replay eps hyperparameter - schedule_max_timesteps (int): number of timesteps in the schedule - beta_annealing_fraction (float): fraction of schedule to anneal - beta over - final_prioritized_replay_beta (float): final value of beta + final_prioritized_replay_beta (float): Final value of beta. train_batch_size (int): size of batches to learn on - sample_batch_size (int): size of batches to sample from workers before_learn_on_batch (function): callback to run before passing the sampled batch to learn on synchronize_sampling (bool): whether to sample the experiences for all policies with the same indices (used in MADDPG). + prioritized_replay_beta_annealing_timesteps (int): The timestep at + which PR-beta annealing should end. 
""" PolicyOptimizer.__init__(self, workers) self.replay_starts = learning_starts + # Linearly annealing beta used in Rainbow paper, stopping at # `final_prioritized_replay_beta`. self.prioritized_replay_beta = PiecewiseSchedule( endpoints=[(0, prioritized_replay_beta), - (schedule_max_timesteps * beta_annealing_fraction, + (prioritized_replay_beta_annealing_timesteps, final_prioritized_replay_beta)], outside_value=final_prioritized_replay_beta) self.prioritized_replay_eps = prioritized_replay_eps diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index c18c4c617..2dbf9881f 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -115,6 +115,7 @@ class DynamicTFPolicy(TFPolicy): SampleBatch.PREV_REWARDS: prev_rewards, "is_training": self._get_is_training_placeholder(), } + # Placeholder for RNN time-chunk valid lengths. self._seq_lens = tf.placeholder( dtype=tf.int32, shape=[None], name="seq_lens") @@ -156,11 +157,12 @@ class DynamicTFPolicy(TFPolicy): model_out, self._state_out = self.model(self._input_dict, self._state_in, self._seq_lens) - # Setup action sampler + # Setup custom action sampler. if action_sampler_fn: action_sampler, action_logp = action_sampler_fn( self, self.model, self._input_dict, obs_space, action_space, config) + # Default action sampler. 
else: action_dist = self.dist_class(model_out, self.model) action_sampler = action_dist.sample() @@ -172,8 +174,8 @@ class DynamicTFPolicy(TFPolicy): batch_divisibility_req = get_batch_divisibility_req(self) else: batch_divisibility_req = 1 - TFPolicy.__init__( - self, + + super().__init__( obs_space, action_space, config, diff --git a/rllib/policy/eager_tf_policy.py b/rllib/policy/eager_tf_policy.py index 2a213e8f3..9be9f880d 100644 --- a/rllib/policy/eager_tf_policy.py +++ b/rllib/policy/eager_tf_policy.py @@ -113,6 +113,8 @@ def traced_eager_policy(eager_policy_cls): prev_reward_batch=None, info_batch=None, episodes=None, + explore=True, + timestep=None, **kwargs): obs_batch = tf.convert_to_tensor(obs_batch) @@ -127,7 +129,7 @@ def traced_eager_policy(eager_policy_cls): return self._traced_compute_actions( obs_batch, state_batches, prev_action_batch, prev_reward_batch, - info_batch, episodes, **kwargs) + info_batch, episodes, explore, timestep, **kwargs) @override(Policy) @convert_eager_inputs @@ -294,8 +296,9 @@ def build_eager_tf_policy(name, prev_reward_batch=None, info_batch=None, episodes=None, + explore=True, + timestep=None, **kwargs): - # TODO: remove python side effect to cull sources of bugs. self._is_training = False self._state_in = state_batches @@ -331,15 +334,30 @@ def build_eager_tf_policy(name, self, self.model, input_dict, self.observation_space, self.action_space, self.config) - fetches = {} + # Override `action` with exploration action. 
+ if explore and self.exploration: + action = self.exploration.get_exploration_action( + action, + self.model, + action_dist=self.dist_class, + explore=True, + timestep=timestep + if timestep is not None else self.global_timestep) + logp = None + + extra_fetches = {} if logp is not None: - fetches.update({ + extra_fetches.update({ ACTION_PROB: tf.exp(logp), ACTION_LOGP: logp, }) if extra_action_fetches_fn: - fetches.update(extra_action_fetches_fn(self)) - return action, state_out, fetches + extra_fetches.update(extra_action_fetches_fn(self)) + + # Increase our global sampling timestep counter by 1. + self.global_timestep += 1 + + return action, state_out, extra_fetches @override(Policy) def apply_gradients(self, gradients): @@ -364,6 +382,7 @@ def build_eager_tf_policy(name, """Return the list of all savable variables for this policy.""" return self.model.variables() + @override(Policy) def is_recurrent(self): return len(self._state_in) > 0 diff --git a/rllib/policy/policy.py b/rllib/policy/policy.py index 9518a6d5d..f84bb2df6 100644 --- a/rllib/policy/policy.py +++ b/rllib/policy/policy.py @@ -4,6 +4,8 @@ import gym import numpy as np from ray.rllib.utils.annotations import DeveloperAPI +from ray.rllib.utils.exploration.exploration import Exploration +from ray.rllib.utils.from_config import from_config # By convention, metrics from optimizing the loss can be reported in the # `grad_info` dict returned by learn_on_batch() / compute_grads() via this key. @@ -58,6 +60,22 @@ class Policy(metaclass=ABCMeta): self.observation_space = observation_space self.action_space = action_space self.config = config + # The global timestep, broadcast down from time to time from the + # driver. + self.global_timestep = 0 + + # Create the Exploration object to use for this Policy. 
+ self.exploration = from_config( + Exploration, + config.get("exploration"), + action_space=self.action_space, + num_workers=self.config.get("num_workers"), + worker_index=self.config.get("worker_index"), + framework="torch" if self.config.get("use_pytorch") else "tf") + + # The default sampling behavior for actions if not explicitly given + # in calls to `compute_actions`. + self.deterministic = config.get("deterministic", False) @abstractmethod @DeveloperAPI @@ -68,6 +86,8 @@ class Policy(metaclass=ABCMeta): prev_reward_batch=None, info_batch=None, episodes=None, + explore=True, + timestep=None, **kwargs): """Computes actions for the current policy. @@ -83,6 +103,9 @@ class Policy(metaclass=ABCMeta): episodes (list): MultiAgentEpisode for each obs in obs_batch. This provides access to all of the internal episode state, which may be useful for model-based or multiagent algorithms. + explore (bool): Whether we should use exploration + (e.g. when training) or not (for inference/evaluation). + timestep (int): The current (sampling) time step. kwargs: forward compatibility placeholder Returns: @@ -104,6 +127,8 @@ class Policy(metaclass=ABCMeta): info=None, episode=None, clip_actions=False, + explore=True, + timestep=None, **kwargs): """Unbatched version of compute_actions. @@ -117,6 +142,9 @@ class Policy(metaclass=ABCMeta): internal episode state, which may be useful for model-based or multi-agent algorithms. clip_actions (bool): should the action be clipped + explore (bool): Whether we should use exploration (i.e. when + training) or not (e.g. for inference/evaluation). + timestep (int): The current (sampling) time step. 
kwargs: forward compatibility placeholder Returns: @@ -146,7 +174,10 @@ class Policy(metaclass=ABCMeta): prev_action_batch=prev_action_batch, prev_reward_batch=prev_reward_batch, info_batch=info_batch, - episodes=episodes) + episodes=episodes, + explore=explore, + timestep=timestep) + if clip_actions: action = clip_action(action, self.action_space) @@ -237,10 +268,55 @@ class Policy(metaclass=ABCMeta): raise NotImplementedError @DeveloperAPI - def num_state_tensors(self): - """ + def get_exploration_info(self): + """Returns the current exploration information of this policy. + + This information depends on the policy's Exploration object. + Returns: - int: The number of RNN hidden states kept by this Policy's Model. + any: Serializable information on the `self.exploration` object. + """ + if isinstance(self.exploration, Exploration): + return self.exploration.get_info() + + @DeveloperAPI + def get_exploration_state(self): + """Returns the current exploration state of this policy. + + This state depends on the policy's Exploration object. + + Returns: + any: Serializable copy or view of the current exploration state. + """ + if isinstance(self.exploration, Exploration): + raise NotImplementedError + + @DeveloperAPI + def set_exploration_state(self, exploration_state): + """Sets the current exploration state of this Policy. + + Arguments: + exploration_state (any): Serializable copy or view of the new + exploration state. + """ + if isinstance(self.exploration, Exploration): + raise NotImplementedError + + @DeveloperAPI + def is_recurrent(self): + """Whether this Policy holds a recurrent Model. + + Returns: + bool: True if this Policy has-a RNN-based Model. + """ + return 0 + + @DeveloperAPI + def num_state_tensors(self): + """The number of internal states needed by the RNN-Model of the Policy. + + Returns: + int: The number of RNN internal states kept by this Policy's Model. 
""" return 0 @@ -274,7 +350,9 @@ class Policy(metaclass=ABCMeta): Arguments: global_vars (dict): Global variables broadcast from the driver. """ - pass + # Store the current global time step (sum over all policies' sample + # steps). + self.global_timestep = global_vars["timestep"] @DeveloperAPI def export_model(self, export_dir): diff --git a/rllib/policy/tests/test_policy.py b/rllib/policy/tests/test_policy.py index a34d75f18..af5a952d6 100644 --- a/rllib/policy/tests/test_policy.py +++ b/rllib/policy/tests/test_policy.py @@ -8,6 +8,7 @@ class TestPolicy(Policy): A dummy Policy that returns a random (batched) int for compute_actions and implements all other abstract methods of Policy with "pass". """ + def compute_actions(self, obs_batch, state_batches=None, @@ -16,6 +17,6 @@ class TestPolicy(Policy): episodes=None, deterministic=None, explore=True, - time_step=None, + timestep=None, **kwargs): return [random.choice([0, 1])] * len(obs_batch), [], {} diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py index 0d9522129..1a051ca1e 100644 --- a/rllib/policy/tf_policy.py +++ b/rllib/policy/tf_policy.py @@ -12,6 +12,7 @@ from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.debug import log_once, summarize +from ray.rllib.utils.exploration.exploration import Exploration from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule from ray.rllib.utils.tf_run_builder import TFRunBuilder from ray.rllib.utils import try_import_tf @@ -71,13 +72,14 @@ class TFPolicy(Policy): Arguments: observation_space (gym.Space): Observation space of the env. action_space (gym.Space): Action space of the env. - sess (Session): TensorFlow session to use. - obs_input (Tensor): input placeholder for observations, of shape + config (dict): The Policy config dict. + sess (Session): The TensorFlow session to use. 
+ obs_input (Tensor): Input placeholder for observations, of shape [BATCH_SIZE, obs...]. action_sampler (Tensor): Tensor for sampling an action, of shape [BATCH_SIZE, action...] - loss (Tensor): scalar policy loss output tensor. - loss_inputs (list): a (name, placeholder) tuple for each loss + loss (Tensor): Scalar policy loss output tensor. + loss_inputs (list): A (name, placeholder) tuple for each loss input argument. Each placeholder name must correspond to a SampleBatch column key returned by postprocess_trajectory(), and has shape [BATCH_SIZE, data...]. These keys will be read @@ -90,10 +92,10 @@ class TFPolicy(Policy): state_outputs (list): list of RNN state output Tensors. prev_action_input (Tensor): placeholder for previous actions prev_reward_input (Tensor): placeholder for previous rewards - seq_lens (Tensor): placeholder for RNN sequence lengths, of shape + seq_lens (Tensor): Placeholder for RNN sequence lengths, of shape [NUM_SEQUENCES]. Note that NUM_SEQUENCES << BATCH_SIZE. See policy/rnn_sequencing.py for more information. - max_seq_len (int): max sequence length for LSTM training. + max_seq_len (int): Max sequence length for LSTM training. batch_divisibility_req (int): pad all agent experiences batches to multiples of this value. This only has an effect if not using a LSTM model. @@ -101,14 +103,16 @@ class TFPolicy(Policy): applying gradients. Otherwise we run all update ops found in the current variable scope. 
""" - super(TFPolicy, self).__init__(observation_space, action_space, config) + super().__init__(observation_space, action_space, config) self.model = model self._sess = sess self._obs_input = obs_input self._prev_action_input = prev_action_input self._prev_reward_input = prev_reward_input - self._sampler = action_sampler + self._action = action_sampler self._is_training = self._get_is_training_placeholder() + self._is_exploring = tf.placeholder_with_default( + True, (), name="is_exploring") self._action_logp = action_logp self._action_prob = (tf.exp(self._action_logp) if self._action_logp is not None else None) @@ -120,6 +124,7 @@ class TFPolicy(Policy): self._update_ops = update_ops self._stats_fetches = {} self._loss_input_dict = None + self._timestep = tf.placeholder(tf.int32, (), name="timestep") if loss is not None: self._initialize_loss(loss, loss_inputs) @@ -139,6 +144,17 @@ class TFPolicy(Policy): raise ValueError( "seq_lens tensor must be given if state inputs are defined") + # Apply the post-forward-pass exploration if applicable. + # And store the `get_state` op. + self._exploration_action = None + if self.exploration: + self._exploration_action = self.exploration.get_exploration_action( + self._action, + self.model, + action_dist=self.dist_class, + explore=self._is_exploring, + timestep=self._timestep) + def variables(self): """Return the list of all savable variables for this policy.""" return self.model.variables() @@ -149,7 +165,6 @@ class TFPolicy(Policy): If the loss has not been initialized and a loss input placeholder is requested, an error is raised. """ - obs_inputs = { SampleBatch.CUR_OBS: self._obs_input, SampleBatch.PREV_ACTIONS: self._prev_action_input, @@ -191,11 +206,12 @@ class TFPolicy(Policy): if g is not None ] self._grads = [g for (g, v) in self._grads_and_vars] + + # TODO(sven/ekl): Deprecate support for v1 models. 
if hasattr(self, "model") and isinstance(self.model, ModelV2): self._variables = ray.experimental.tf_utils.TensorFlowVariables( [], self._sess, self.variables()) else: - # TODO(ekl) deprecate support for v1 models self._variables = ray.experimental.tf_utils.TensorFlowVariables( self._loss, self._sess) @@ -225,12 +241,22 @@ class TFPolicy(Policy): prev_reward_batch=None, info_batch=None, episodes=None, + explore=True, + timestep=None, **kwargs): builder = TFRunBuilder(self._sess, "compute_actions") - fetches = self._build_compute_actions(builder, obs_batch, - state_batches, prev_action_batch, - prev_reward_batch) - return builder.get(fetches) + fetches = self._build_compute_actions( + builder, + obs_batch, + state_batches, + prev_action_batch, + prev_reward_batch, + explore=explore, + timestep=timestep + if timestep is not None else self.global_timestep) + # Execute session run to get action (and other fetches). + ret = builder.get(fetches) + return ret[:3] @override(Policy) def compute_gradients(self, postprocessed_batch): @@ -253,6 +279,11 @@ class TFPolicy(Policy): fetches = self._build_learn_on_batch(builder, postprocessed_batch) return builder.get(fetches) + @override(Policy) + def get_exploration_info(self): + if isinstance(self.exploration, Exploration): + return self._sess.run(self.exploration.get_info()) + @override(Policy) def get_weights(self): return self._variables.get_weights() @@ -293,6 +324,7 @@ class TFPolicy(Policy): Optional, only required to work with the multi-GPU optimizer.""" raise NotImplementedError + @override(Policy) def is_recurrent(self): return len(self._state_inputs) > 0 @@ -311,13 +343,11 @@ class TFPolicy(Policy): By default we only return action probability info (if present). 
""" + ret = {} if self._action_logp is not None: - return { - ACTION_PROB: self._action_prob, - ACTION_LOGP: self._action_logp, - } - else: - return {} + ret[ACTION_PROB] = self._action_prob + ret[ACTION_LOGP] = self._action_logp + return ret @DeveloperAPI def extra_compute_grad_feed_dict(self): @@ -358,7 +388,8 @@ class TFPolicy(Policy): This can be called safely before __init__ has run. """ if not hasattr(self, "_is_training"): - self._is_training = tf.placeholder_with_default(False, ()) + self._is_training = tf.placeholder_with_default( + False, (), name="is_training") return self._is_training def _debug_vars(self): @@ -413,7 +444,7 @@ class TFPolicy(Policy): # build output signatures output_signature = self._extra_output_signature_def() output_signature["actions"] = \ - tf.saved_model.utils.build_tensor_info(self._sampler) + tf.saved_model.utils.build_tensor_info(self._action) for state_output in self._state_outputs: output_signature[state_output.name] = \ tf.saved_model.utils.build_tensor_info(state_output) @@ -432,7 +463,10 @@ class TFPolicy(Policy): state_batches=None, prev_action_batch=None, prev_reward_batch=None, - episodes=None): + episodes=None, + explore=True, + timestep=None): + state_batches = state_batches or [] if len(self._state_inputs) != len(state_batches): raise ValueError( @@ -449,9 +483,20 @@ class TFPolicy(Policy): prev_reward_batch is not None: builder.add_feed_dict({self._prev_reward_input: prev_reward_batch}) builder.add_feed_dict({self._is_training: False}) + builder.add_feed_dict({self._is_exploring: explore}) + if timestep is not None: + builder.add_feed_dict({self._timestep: timestep}) builder.add_feed_dict(dict(zip(self._state_inputs, state_batches))) - fetches = builder.add_fetches([self._sampler] + self._state_outputs + - [self.extra_compute_action_fetches()]) + # Get an exploration action. 
+ if explore and self.exploration: + fetches = builder.add_fetches( + [self._exploration_action] + self._state_outputs + + [self.extra_compute_action_fetches()]) + # Do not explore. + else: + fetches = builder.add_fetches( + [self._action] + self._state_outputs + + [self.extra_compute_action_fetches()]) return fetches[0], fetches[1:-1], fetches[-1] def _build_compute_gradients(self, builder, postprocessed_batch): diff --git a/rllib/policy/torch_policy.py b/rllib/policy/torch_policy.py index e69484ba9..a68bc133d 100644 --- a/rllib/policy/torch_policy.py +++ b/rllib/policy/torch_policy.py @@ -43,8 +43,7 @@ class TorchPolicy(Policy): action_distribution_class (ActionDistribution): Class for action distribution. """ - super(TorchPolicy, self).__init__(observation_space, action_space, - config) + super().__init__(observation_space, action_space, config) self.device = (torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")) self.model = model.to(self.device) @@ -64,6 +63,8 @@ class TorchPolicy(Policy): prev_reward_batch=None, info_batch=None, episodes=None, + explore=True, + timestep=None, **kwargs): with torch.no_grad(): input_dict = self._lazy_tensor_dict({ @@ -76,8 +77,16 @@ class TorchPolicy(Policy): model_out = self.model(input_dict, state_batches, [1]) logits, state = model_out action_dist = self.dist_class(logits, self.model) - actions = action_dist.sample() + # Try our Exploration, if any. 
+ if self.exploration: + actions = self.exploration.get_action( + model_out, self.model, action_dist, explore, timestep + if timestep is not None else self.global_timestep) + else: + actions = action_dist.sample() + input_dict[SampleBatch.ACTIONS] = actions + return (actions.cpu().numpy(), [h.cpu().numpy() for h in state], self.extra_action_out(input_dict, state_batches, self.model, action_dist)) @@ -149,6 +158,10 @@ class TorchPolicy(Policy): def set_weights(self, weights): self.model.load_state_dict(weights) + @override(Policy) + def is_recurrent(self): + return len(self.model.get_initial_state()) > 0 + @override(Policy) def num_state_tensors(self): return len(self.model.get_initial_state()) @@ -217,7 +230,7 @@ class TorchPolicy(Policy): @DeveloperAPI -class LearningRateSchedule(object): +class LearningRateSchedule: """Mixin for TFPolicy that adds a learning rate schedule.""" @DeveloperAPI @@ -242,7 +255,7 @@ class LearningRateSchedule(object): @DeveloperAPI -class EntropyCoeffSchedule(object): +class EntropyCoeffSchedule: """Mixin for TorchPolicy that adds entropy coeff decay.""" @DeveloperAPI diff --git a/rllib/tests/test_catalog.py b/rllib/tests/test_catalog.py index b8287ad65..14aef8c96 100644 --- a/rllib/tests/test_catalog.py +++ b/rllib/tests/test_catalog.py @@ -118,7 +118,9 @@ class ModelCatalogTest(unittest.TestCase): class Model(): pass - ray.init(object_store_memory=1000 * 1024 * 1024) + ray.init( + object_store_memory=1000 * 1024 * 1024, + ignore_reinit_error=True) # otherwise fails sometimes locally # registration ModelCatalog.register_custom_action_dist("test", CustomActionDistribution) diff --git a/rllib/tests/test_dependency.py b/rllib/tests/test_dependency.py index fcb844644..840c31a8f 100644 --- a/rllib/tests/test_dependency.py +++ b/rllib/tests/test_dependency.py @@ -3,11 +3,13 @@ import os import sys -os.environ["RLLIB_TEST_NO_TF_IMPORT"] = "1" - if __name__ == "__main__": + # + os.environ["RLLIB_TEST_NO_TF_IMPORT"] = "1" + from 
ray.rllib.agents.a3c import A2CTrainer - assert "tensorflow" not in sys.modules, "TF initially present" + assert "tensorflow" not in sys.modules, \ + "TF initially present, when it shouldn't." # note: no ray.init(), to test it works without Ray trainer = A2CTrainer( @@ -18,3 +20,6 @@ if __name__ == "__main__": trainer.train() assert "tensorflow" not in sys.modules, "TF should not be imported" + + # Clean up. + del os.environ["RLLIB_TEST_NO_TF_IMPORT"] diff --git a/rllib/tests/test_external_env.py b/rllib/tests/test_external_env.py index c9aff809c..2b98ac465 100644 --- a/rllib/tests/test_external_env.py +++ b/rllib/tests/test_external_env.py @@ -159,7 +159,10 @@ class TestExternalEnv(unittest.TestCase): register_env( "test3", lambda _: PartOffPolicyServing( gym.make("CartPole-v0"), off_pol_frac=0.2)) - dqn = DQNTrainer(env="test3", config={"exploration_fraction": 0.001}) + dqn = DQNTrainer( + env="test3", config={"exploration": { + "epsilon_timesteps": 100 + }}) for i in range(100): result = dqn.train() print("Iteration {}, reward {}, timesteps {}".format( diff --git a/rllib/tests/test_multi_agent_env.py b/rllib/tests/test_multi_agent_env.py index cc0db3322..075c761b9 100644 --- a/rllib/tests/test_multi_agent_env.py +++ b/rllib/tests/test_multi_agent_env.py @@ -448,6 +448,8 @@ class TestMultiAgentEnv(unittest.TestCase): prev_action_batch=None, prev_reward_batch=None, episodes=None, + explore=True, + timestep=None, **kwargs): return [0] * len(obs_batch), [[h] * len(obs_batch)], {} @@ -605,9 +607,6 @@ class TestMultiAgentEnv(unittest.TestCase): workers = WorkerSet._from_existing(worker, remote_workers) optimizer = optimizer_cls(workers) for i in range(200): - worker.foreach_policy(lambda p, _: p.set_epsilon( - max(0.02, 1 - i * .02)) - if isinstance(p, DQNTFPolicy) else None) optimizer.step() result = collect_metrics(worker, remote_workers) if i % 20 == 0: diff --git a/rllib/tests/test_rollout_worker.py b/rllib/tests/test_rollout_worker.py index 8cc52f3dc..908a7c4f0 
100644 --- a/rllib/tests/test_rollout_worker.py +++ b/rllib/tests/test_rollout_worker.py @@ -24,6 +24,8 @@ class MockPolicy(TestPolicy): prev_action_batch=None, prev_reward_batch=None, episodes=None, + explore=True, + timestep=None, **kwargs): return [random.choice([0, 1])] * len(obs_batch), [], {} @@ -43,6 +45,8 @@ class BadPolicy(MockPolicy): prev_action_batch=None, prev_reward_batch=None, episodes=None, + explore=True, + timestep=None, **kwargs): raise Exception("intentional error") diff --git a/rllib/tuned_examples/atari-apex.yaml b/rllib/tuned_examples/atari-apex.yaml index e24e347dd..4785c4b30 100644 --- a/rllib/tuned_examples/atari-apex.yaml +++ b/rllib/tuned_examples/atari-apex.yaml @@ -17,12 +17,13 @@ apex: adam_epsilon: .00015 hiddens: [512] buffer_size: 1000000 - schedule_max_timesteps: 2000000 - exploration_final_eps: 0.01 - exploration_fraction: .1 + exploration: + final_epsilon: 0.01 + epsilon_timesteps: 200000 prioritized_replay_alpha: 0.5 - beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 + final_prioritized_replay_beta_annealing_timesteps: 2000000 + num_gpus: 1 # APEX diff --git a/rllib/tuned_examples/atari-dist-dqn.yaml b/rllib/tuned_examples/atari-dist-dqn.yaml index 94f5783ed..ab2480a04 100644 --- a/rllib/tuned_examples/atari-dist-dqn.yaml +++ b/rllib/tuned_examples/atari-dist-dqn.yaml @@ -21,11 +21,11 @@ atari-dist-dqn: buffer_size: 1000000 sample_batch_size: 4 train_batch_size: 32 - schedule_max_timesteps: 2000000 - exploration_final_eps: 0.01 - exploration_fraction: .1 + exploration: + epsilon_timesteps: 200000 + final_epsilon: 0.01 prioritized_replay_alpha: 0.5 - beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 + prioritized_replay_beta_annealing_timesteps: 2000000 num_gpus: 0.2 timesteps_per_iteration: 10000 diff --git a/rllib/tuned_examples/atari-dqn.yaml b/rllib/tuned_examples/atari-dqn.yaml index b8731bb05..6ebaf4906 100644 --- a/rllib/tuned_examples/atari-dqn.yaml +++ b/rllib/tuned_examples/atari-dqn.yaml 
@@ -23,11 +23,11 @@ atari-basic-dqn: buffer_size: 1000000 sample_batch_size: 4 train_batch_size: 32 - schedule_max_timesteps: 2000000 - exploration_final_eps: 0.01 - exploration_fraction: .1 + exploration: + epsilon_timesteps: 200000 + final_epsilon: 0.01 prioritized_replay_alpha: 0.5 - beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 + prioritized_replay_beta_annealing_timesteps: 2000000 num_gpus: 0.2 timesteps_per_iteration: 10000 diff --git a/rllib/tuned_examples/atari-duel-ddqn.yaml b/rllib/tuned_examples/atari-duel-ddqn.yaml index b5a13162b..93cfd1e0d 100644 --- a/rllib/tuned_examples/atari-duel-ddqn.yaml +++ b/rllib/tuned_examples/atari-duel-ddqn.yaml @@ -23,11 +23,11 @@ dueling-ddqn: buffer_size: 1000000 sample_batch_size: 4 train_batch_size: 32 - schedule_max_timesteps: 2000000 - exploration_final_eps: 0.01 - exploration_fraction: .1 + exploration: + epsilon_timesteps: 200000 + final_epsilon: 0.01 prioritized_replay_alpha: 0.5 - beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 + prioritized_replay_beta_annealing_timesteps: 2000000 num_gpus: 0.2 timesteps_per_iteration: 10000 diff --git a/rllib/tuned_examples/compact-regression-test.yaml b/rllib/tuned_examples/compact-regression-test.yaml index 0b29990e2..1a10e24b8 100644 --- a/rllib/tuned_examples/compact-regression-test.yaml +++ b/rllib/tuned_examples/compact-regression-test.yaml @@ -85,12 +85,12 @@ apex: adam_epsilon: .00015 hiddens: [512] buffer_size: 1000000 - schedule_max_timesteps: 2000000 - exploration_final_eps: 0.01 - exploration_fraction: .1 + exploration: + epsilon_timesteps: 200000 + final_epsilon: 0.01 prioritized_replay_alpha: 0.5 - beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 + prioritized_replay_beta_annealing_timesteps: 2000000 num_gpus: 1 num_workers: 8 num_envs_per_worker: 8 @@ -135,11 +135,11 @@ atari-basic-dqn: buffer_size: 1000000 sample_batch_size: 4 train_batch_size: 32 - schedule_max_timesteps: 2000000 - exploration_final_eps: 0.01 - 
exploration_fraction: .1 + exploration: + epsilon_timesteps: 200000 + final_epsilon: 0.01 prioritized_replay_alpha: 0.5 - beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 + prioritized_replay_beta_annealing_timesteps: 2000000 num_gpus: 0.2 timesteps_per_iteration: 10000 diff --git a/rllib/tuned_examples/pong-dqn.yaml b/rllib/tuned_examples/pong-dqn.yaml index 2c3e5a877..a9b709eed 100644 --- a/rllib/tuned_examples/pong-dqn.yaml +++ b/rllib/tuned_examples/pong-dqn.yaml @@ -13,9 +13,9 @@ pong-deterministic-dqn: buffer_size: 50000 sample_batch_size: 4 train_batch_size: 32 - schedule_max_timesteps: 2000000 - exploration_final_eps: .01 - exploration_fraction: .1 + exploration: + epsilon_timesteps: 200000 + final_epsilon: .01 model: grayscale: True zero_mean: False diff --git a/rllib/tuned_examples/pong-rainbow.yaml b/rllib/tuned_examples/pong-rainbow.yaml index 5226d57fe..e76694cd3 100644 --- a/rllib/tuned_examples/pong-rainbow.yaml +++ b/rllib/tuned_examples/pong-rainbow.yaml @@ -13,14 +13,14 @@ pong-deterministic-rainbow: buffer_size: 50000 sample_batch_size: 4 train_batch_size: 32 - schedule_max_timesteps: 2000000 - exploration_final_eps: 0.0 - exploration_fraction: .000001 + exploration: + epsilon_timesteps: 2 + final_epsilon: 0.0 target_network_update_freq: 500 prioritized_replay: True prioritized_replay_alpha: 0.5 - beta_annealing_fraction: 0.2 final_prioritized_replay_beta: 1.0 + prioritized_replay_beta_annealing_timesteps: 400000 n_step: 3 gpu: True model: diff --git a/rllib/utils/__init__.py b/rllib/utils/__init__.py index 1129dc662..cd16f8c81 100644 --- a/rllib/utils/__init__.py +++ b/rllib/utils/__init__.py @@ -2,7 +2,7 @@ from functools import partial from ray.rllib.utils.annotations import override, PublicAPI, DeveloperAPI from ray.rllib.utils.framework import try_import_tf, try_import_tfp, \ - try_import_torch + try_import_torch, check_framework from ray.rllib.utils.deprecation import deprecation_warning, renamed_agent, \ renamed_class, 
renamed_function from ray.rllib.utils.filter_manager import FilterManager @@ -59,6 +59,7 @@ force_tuple = partial(force_list, to_tuple=True) __all__ = [ "add_mixins", "check", + "check_framework", "deprecation_warning", "fc", "force_list", diff --git a/rllib/utils/deprecation.py b/rllib/utils/deprecation.py index 9f3accdfe..252a72a04 100644 --- a/rllib/utils/deprecation.py +++ b/rllib/utils/deprecation.py @@ -1,22 +1,28 @@ import logging - logger = logging.getLogger(__name__) -def deprecation_warning(old, new=None): +def deprecation_warning(old, new=None, error=None): """ - Logs a deprecation warning via the `logger` object. + Logs (via the `logger` object) or throws a deprecation warning/error. Args: old (str): A description of the "thing" that is to be deprecated. new (Optional[str]): A description of the new "thing" that replaces it. + error (Optional[bool,Exception]): Whether or which exception to throw. + If True, throw ValueError. """ - logger.warning( - "DeprecationWarning: `{}` has been deprecated.{}". - format(old, (" Use `{}` instead.".format(new) if new else "")) + - " This will raise an error in the future!" - ) + msg = "`{}` has been deprecated.{}".format( + old, (" Use `{}` instead.".format(new) if new else "")) + + if error is True: + raise ValueError(msg) + elif error and issubclass(error, Exception): + raise error(msg) + else: + logger.warning("DeprecationWarning: " + msg + + " This will raise an error in the future!") def renamed_class(cls, old_name): @@ -64,6 +70,6 @@ def renamed_function(func, old_name): def moved_function(func): new_location = func.__module__ - deprecation_warning("import {}".format(func.__name__), "import {}". 
- format(new_location)) + deprecation_warning("import {}".format(func.__name__), + "import {}".format(new_location)) return func diff --git a/rllib/utils/exploration/__init__.py b/rllib/utils/exploration/__init__.py new file mode 100644 index 000000000..5b89b9ae2 --- /dev/null +++ b/rllib/utils/exploration/__init__.py @@ -0,0 +1,6 @@ +from ray.rllib.utils.exploration.exploration import Exploration +from ray.rllib.utils.exploration.epsilon_greedy import EpsilonGreedy +from ray.rllib.utils.exploration.per_worker_epsilon_greedy import \ + PerWorkerEpsilonGreedy + +__all__ = ["Exploration", "EpsilonGreedy", "PerWorkerEpsilonGreedy"] diff --git a/rllib/utils/exploration/epsilon_greedy.py b/rllib/utils/exploration/epsilon_greedy.py new file mode 100644 index 000000000..1bef8f3aa --- /dev/null +++ b/rllib/utils/exploration/epsilon_greedy.py @@ -0,0 +1,199 @@ +import gym +import numpy as np + +from ray.rllib.utils.annotations import override +from ray.rllib.utils.exploration.exploration import Exploration +from ray.rllib.utils.framework import try_import_tf, try_import_torch, \ + get_variable +from ray.rllib.utils.schedules import PiecewiseSchedule + +tf = try_import_tf() +torch, _ = try_import_torch() + + +class EpsilonGreedy(Exploration): + """Epsilon-greedy Exploration class that produces exploration actions. + + When given a Model's output and a current epsilon value (based on some + Schedule), it produces a random action (if rand(1) < eps) or + uses the model-computed one (if rand(1) >= eps). + """ + + def __init__(self, + action_space, + initial_epsilon=1.0, + final_epsilon=0.05, + epsilon_timesteps=int(1e5), + num_workers=None, + worker_index=None, + epsilon_schedule=None, + framework="tf"): + """ + + Args: + action_space (Space): The gym action space used by the environment. + initial_epsilon (float): The initial epsilon value to use. + final_epsilon (float): The final epsilon value to use. 
+ epsilon_timesteps (int): The time step after which epsilon should + always be `final_epsilon`. + num_workers (Optional[int]): The overall number of workers used. + worker_index (Optional[int]): The index of the Worker using this + Exploration. + epsilon_schedule (Optional[Schedule]): An optional Schedule object + to use (instead of constructing one from the given parameters). + framework (Optional[str]): One of None, "tf", "torch". + """ + # For now, require Discrete action space (may loosen this restriction + # in the future). + assert isinstance(action_space, gym.spaces.Discrete) + assert framework is not None + super().__init__( + action_space=action_space, + num_workers=num_workers, + worker_index=worker_index, + framework=framework) + + self.epsilon_schedule = epsilon_schedule or PiecewiseSchedule( + endpoints=[(0, initial_epsilon), + (epsilon_timesteps, final_epsilon)], + outside_value=final_epsilon, + framework=self.framework) + + # The current timestep value (tf-var or python int). + self.last_timestep = get_variable( + 0, framework=framework, tf_name="timestep") + + @override(Exploration) + def get_exploration_action(self, + action, + model=None, + action_dist=None, + explore=True, + timestep=None): + # TODO(sven): This is hardcoded. Put a meaningful error, in case model + # API is not as required. + if not hasattr(model, "q_value_head"): + return action + q_values = model.q_value_head(model.last_output()) + if isinstance(q_values, list): + q_values = q_values[0] + if self.framework == "tf": + return self._get_tf_exploration_action_op(action, explore, + timestep, q_values) + else: + return self._get_torch_exploration_action(action, explore, + timestep, q_values) + + def _get_tf_exploration_action_op(self, action, explore, timestep, + q_values): + """Tf method to produce the tf op for an epsilon exploration action. + + Args: + action (tf.Tensor): The already sampled action (non-exploratory + case) as tf op. 
+ + Returns: + tf.Tensor: The tf exploration-action op. + """ + epsilon = tf.convert_to_tensor( + self.epsilon_schedule(timestep if timestep is not None else + self.last_timestep)) + + batch_size = tf.shape(action)[0] + + # Mask out actions with q-value=-inf so that we don't + # even consider them for exploration. + random_valid_action_logits = tf.where( + tf.equal(q_values, tf.float32.min), + tf.ones_like(q_values) * tf.float32.min, tf.ones_like(q_values)) + random_actions = tf.squeeze( + tf.multinomial(random_valid_action_logits, 1), axis=1) + + chose_random = tf.random_uniform( + tf.stack([batch_size]), + minval=0, maxval=1, dtype=epsilon.dtype) \ + < epsilon + + exploration_action = tf.cond( + pred=tf.constant(explore, dtype=tf.bool) + if isinstance(explore, bool) else explore, + true_fn=lambda: tf.where(chose_random, random_actions, action), + false_fn=lambda: action, + ) + # Increment `last_timestep` by 1 (or set to `timestep`). + assign_op = \ + tf.assign_add(self.last_timestep, 1) if timestep is None else \ + tf.assign(self.last_timestep, timestep) + with tf.control_dependencies([assign_op]): + return exploration_action + + def _get_torch_exploration_action(self, action, explore, timestep, + q_values): + """Torch method to produce an epsilon exploration action. + + Args: + action (torch.Tensor): The already sampled action (non-exploratory + case). + + Returns: + torch.Tensor: The exploration-action. + """ + # Set last time step or (if not given) increase by one. + self.last_timestep = timestep if timestep is not None else \ + self.last_timestep + 1 + + if explore: + # Get the current epsilon. + epsilon = self.epsilon_schedule(self.last_timestep) + + batch_size = q_values.size()[0] + # Mask out actions, whose Q-values are -inf, so that we don't + # even consider them for exploration.
+ random_valid_action_logits = torch.where( + q_values == float("-inf"), + torch.ones_like(q_values) * float("-inf"), + torch.ones_like(q_values)) + + random_actions = torch.squeeze( + torch.multinomial(random_valid_action_logits, 1), axis=1) + + return torch.where( + torch.empty((batch_size, )).uniform_() < epsilon, + random_actions, action) + + # Return the original action. + else: + return action + + @override(Exploration) + def get_info(self): + """Returns the current epsilon value. + + Returns: + Union[float,tf.Tensor[float]]: The current epsilon value. + """ + return self.epsilon_schedule(self.last_timestep) + + @override(Exploration) + def get_state(self): + return [self.last_timestep] + + @override(Exploration) + def set_state(self, state): + if self.framework == "tf" and tf.executing_eagerly() is False: + update_op = tf.assign(self.last_timestep, state) + with tf.control_dependencies([update_op]): + return tf.no_op() + self.last_timestep = state + + @override(Exploration) + def reset_state(self): + return self.set_state(0) + + @classmethod + @override(Exploration) + def merge_states(cls, exploration_objects): + timesteps = [e.get_state() for e in exploration_objects] + if exploration_objects[0].framework == "tf": + return tf.reduce_sum(timesteps) + return np.sum(timesteps) diff --git a/rllib/utils/exploration/exploration.py b/rllib/utils/exploration/exploration.py new file mode 100644 index 000000000..a0823c65a --- /dev/null +++ b/rllib/utils/exploration/exploration.py @@ -0,0 +1,132 @@ +from ray.rllib.utils.framework import check_framework + + +class Exploration: + """Implements an env-exploration strategy for Policies. + + An Exploration takes the predicted actions or action values from the agent, + and selects the action to actually apply to the environment using some + predefined exploration schema. 
+ """ + + def __init__(self, + action_space=None, + num_workers=None, + worker_index=None, + framework="tf"): + """ + Args: + action_space (Optional[gym.spaces.Space]): The action space in + which to explore. + num_workers (Optional[int]): The overall number of workers used. + worker_index (Optional[int]): The index of the Worker using this + Exploration. + framework (str): One of "tf" or "torch". + """ + self.action_space = action_space + self.num_workers = num_workers + self.worker_index = worker_index + self.framework = check_framework(framework) + + def get_exploration_action(self, + action, + model=None, + action_dist=None, + explore=True, + timestep=None): + """Returns an action for exploration purposes. + + Given the Model's output and action distribution, returns an + exploration action (as opposed to the original model calculated + action). + + Args: + action (any): The already sampled action (non-exploratory case). + model (ModelV2): The Model object. + action_dist: The ActionDistribution class. + explore (bool): Whether to explore or not (this could be a tf + placeholder). + timestep (int): The current sampling time step. If None, the + component should try to use an internal counter, which it + then increments by 1. If provided, will set the internal + counter to the given value. + + Returns: + any: The chosen exploration action or a tf-op to fetch the + exploration action from the graph. + """ + pass + + def get_loss_exploration_term(self, + model_output, + model=None, + action_dist=None, + action_sample=None): + """Returns an extra loss term to be added to a loss. + + Args: + model_output (any): The Model's output Tensor(s). + model (ModelV2): The Model object. + action_dist: The ActionDistribution object resulting from + `model_output`. TODO: Or the class? + action_sample (any): An optional action sample. + + Returns: + any: The extra loss term to add to the loss. + """ + pass # TODO(sven): implement for some example Exploration class. 
+ + def get_info(self): + """Returns a description of the current exploration state. + + This is not necessarily the state itself (and cannot be used in + set_state!), but rather useful (e.g. debugging) information. + + Returns: + any: A description of the Exploration (not necessarily its state). + """ + return None + + def get_state(self): + """Returns the current exploration state. + + Returns: + List[any]: The current state (or a tf-op thereof). + """ + return [] + + def set_state(self, state): + """Sets the current state of the Exploration to the given value. + + Or returns a tf op that will do the set. + + Args: + state (List[any]): The new state to set. + + Returns: + Union[None,tf.op]: If framework=tf, the op that handles the update. + """ + pass + + def reset_state(self): + """Resets the exploration's state. + + Returns: + Union[None,tf.op]: If framework=tf, the op that handles the reset. + """ + pass + + @classmethod + def merge_states(cls, exploration_objects): + """Returns the merged states of all exploration_objects as a value. + + Or a tf.Tensor (whose execution will trigger the merge). + + Args: + exploration_objects (List[Exploration]): All Exploration objects, + whose states have to be merged somehow. + + Returns: + The merged value or a tf.op to execute. + """ + pass diff --git a/rllib/utils/exploration/per_worker_epsilon_greedy.py b/rllib/utils/exploration/per_worker_epsilon_greedy.py new file mode 100644 index 000000000..73a257a73 --- /dev/null +++ b/rllib/utils/exploration/per_worker_epsilon_greedy.py @@ -0,0 +1,52 @@ +from ray.rllib.utils.exploration.epsilon_greedy import EpsilonGreedy +from ray.rllib.utils.schedules import ConstantSchedule + + +class PerWorkerEpsilonGreedy(EpsilonGreedy): + """A per-worker epsilon-greedy class for distributed algorithms. + + Sets the epsilon schedules of individual workers to a constant: + 0.4 ^ (1 + [worker-index] / float([num-workers] - 1) * 7) + See Ape-X paper. 
+ """ + + def __init__(self, + action_space, + initial_epsilon=1.0, + final_epsilon=0.1, + epsilon_timesteps=int(1e5), + num_workers=0, + worker_index=0, + framework="tf"): + """ + Args: + action_space (Space): The gym action space used by the environment. + initial_epsilon (float): The initial epsilon value to use. + final_epsilon (float): The final epsilon value to use. + epsilon_timesteps (int): The time step after which epsilon should + always be `final_epsilon`. + num_workers (Optional[int]): The overall number of workers used. + worker_index (Optional[int]): The index of the Worker using this + Exploration. + framework (Optional[str]): One of None, "tf", "torch". + """ + epsilon_schedule = None + # Use a fixed, different epsilon per worker. See: Ape-X paper. + if num_workers > 0: + if worker_index >= 0: + exponent = (1 + worker_index / float(num_workers - 1) * 7) + epsilon_schedule = ConstantSchedule(0.4**exponent) + # Local worker should have zero exploration so that eval + # rollouts run properly. + else: + epsilon_schedule = ConstantSchedule(0.0) + + super().__init__( + action_space=action_space, + initial_epsilon=initial_epsilon, + final_epsilon=final_epsilon, + epsilon_timesteps=epsilon_timesteps, + num_workers=num_workers, + worker_index=worker_index, + framework=framework, + epsilon_schedule=epsilon_schedule) diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index b63ee1ee8..89f9bc56f 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -16,9 +16,11 @@ def check_framework(framework="tf"): str: The input framework string. 
""" if framework == "tf": - try_import_tf(error=True) + if tf is None: + raise ImportError("Could not import tensorflow.") elif framework == "torch": - try_import_torch(error=True) + if torch is None: + raise ImportError("Could not import torch.") else: assert framework is None return framework @@ -32,6 +34,8 @@ def try_import_tf(error=False): Returns: The tf module (either from tf2.0.compat.v1 OR as tf1.x. """ + # TODO(sven): Make sure, these are reset after each test case + # that uses them. if "RLLIB_TEST_NO_TF_IMPORT" in os.environ: logger.warning("Not importing TensorFlow for test purposes") return None @@ -95,3 +99,27 @@ def try_import_torch(error=False): if error: raise e return None, None + + +def get_variable(value, framework="tf", tf_name="unnamed-variable"): + """ + Args: + value (any): The initial value to use. In the non-tf case, this will + be returned as is. + framework (str): One of "tf", "torch", or None. + tf_name (str): An optional name for the variable. Only for tf. + + Returns: + any: A framework-specific variable (tf.Variable or python primitive). + """ + if framework == "tf": + import tensorflow as tf + return tf.compat.v1.get_variable(tf_name, initializer=value) + # torch or None: Return python primitive. + return value + + +# This call should never happen inside a module's functions/classes +# as it would re-disable tf-eager. +tf = try_import_tf() +torch, _ = try_import_torch() diff --git a/rllib/utils/from_config.py b/rllib/utils/from_config.py index f6dacef2c..594f7b0f4 100644 --- a/rllib/utils/from_config.py +++ b/rllib/utils/from_config.py @@ -41,9 +41,9 @@ def from_config(cls, config=None, **kwargs): filename. Keyword Args: - kwargs (any): Optional possibility to pass the c'tor arguments in + kwargs (any): Optional possibility to pass the constructor arguments in here and use `config` as the type-only info. 
Then we can call - this like: from_config([type]?, [**kwargs for c'tor]) + this like: from_config([type]?, [**kwargs for constructor]) If `config` is already a dict, then `kwargs` will be merged with `config` (overwriting keys in `config`) after "type" has been popped out of `config`. @@ -76,7 +76,7 @@ def from_config(cls, config=None, **kwargs): ctor_kwargs = config # Give kwargs priority over things defined in config dict. # This way, one can pass a generic `spec` and then override single - # c'tor parameters via the kwargs in the call to `from_config`. + # constructor parameters via the kwargs in the call to `from_config`. ctor_kwargs.update(kwargs) else: type_ = config @@ -96,11 +96,12 @@ def from_config(cls, config=None, **kwargs): cls.__default_constructor__ is not None and \ ctor_args == [] and \ ( - not hasattr(cls.__bases__[0], "__default_constructor__") - or - cls.__bases__[0].__default_constructor__ is None or - cls.__bases__[0].__default_constructor__ is not - cls.__default_constructor__ + not hasattr(cls.__bases__[0], + "__default_constructor__") + or + cls.__bases__[0].__default_constructor__ is None or + cls.__bases__[0].__default_constructor__ is not + cls.__default_constructor__ ): constructor = cls.__default_constructor__ # Default constructor's keywords into ctor_kwargs. @@ -108,7 +109,7 @@ def from_config(cls, config=None, **kwargs): kwargs = merge_dicts(ctor_kwargs, constructor.keywords) constructor = partial(constructor.func, **kwargs) ctor_kwargs = {} # erase to avoid duplicate kwarg error - # No default constructor -> Try cls itself as c'tor. + # No default constructor -> Try cls itself as constructor. else: constructor = cls # Try the __type_registry__ of this class. @@ -140,6 +141,7 @@ def from_config(cls, config=None, **kwargs): else: return obj + # Test for absolute module.class specifier. 
if type_.find(".") != -1: module_name, function_name = type_.rsplit(".", 1) try: @@ -147,11 +149,27 @@ def from_config(cls, config=None, **kwargs): constructor = getattr(module, function_name) except (ModuleNotFoundError, ImportError): pass + # If constructor still not found, try attaching cls' module, + # then look for type_ in there. + if constructor is None: + try: + module = importlib.import_module(cls.__module__) + constructor = getattr(module, type_) + except (ModuleNotFoundError, ImportError, AttributeError): + # Try the package as well. + try: + package_name = importlib.import_module( + cls.__module__).__package__ + module = __import__(package_name, fromlist=[type_]) + constructor = getattr(module, type_) + except (ModuleNotFoundError, ImportError, AttributeError): + pass if constructor is None: raise ValueError( "String specifier ({}) in `from_config` must be a " - "filename, a module+class, or a key into " - "{}.__type_registry__!".format(type_, cls.__name__)) + "filename, a module+class, a class within '{}', or a key " + "into {}.__type_registry__!".format( + type_, cls.__module__, cls.__name__)) if not constructor: raise TypeError( @@ -201,14 +219,10 @@ def from_file(cls, filename, *args, **kwargs): def lookup_type(cls, type_): if cls is not None and hasattr(cls, "__type_registry__") and \ - isinstance(cls.__type_registry__, dict) and \ - ( - type_ in cls.__type_registry__ or ( - isinstance(type_, str) and - re.sub("[\\W_]", "", type_.lower()) - in cls.__type_registry__ - ) - ): + isinstance(cls.__type_registry__, dict) and ( + type_ in cls.__type_registry__ or ( + isinstance(type_, str) and + re.sub("[\\W_]", "", type_.lower()) in cls.__type_registry__)): available_class_for_type = cls.__type_registry__.get(type_) if available_class_for_type is None: available_class_for_type = \ diff --git a/rllib/utils/schedules/piecewise_schedule.py b/rllib/utils/schedules/piecewise_schedule.py index d03c3b0d1..0a194b4d7 100644 --- 
a/rllib/utils/schedules/piecewise_schedule.py +++ b/rllib/utils/schedules/piecewise_schedule.py @@ -19,7 +19,7 @@ class PiecewiseSchedule(Schedule): between two values. E.g. t=400 and endpoints=[(0, 20.0),(500, 30.0)] - output=20.0 + 0.8 * 10.0 = 28.0 + output=20.0 + 0.8 * (30.0 - 20.0) = 28.0 NOTE: All the values for time must be sorted in an increasing order. @@ -28,7 +28,7 @@ class PiecewiseSchedule(Schedule): (0.0=only left value, 1.0=only right value), which is the fraction of distance from left endpoint to right endpoint. - outside_value (Optional[float]): If t_pct in call to `value` is + outside_value (Optional[float]): If t in call to `value` is outside of all the intervals in `endpoints` this value is returned. If None then an AssertionError is raised when outside value is requested. diff --git a/rllib/utils/schedules/schedule.py b/rllib/utils/schedules/schedule.py index 48776d274..63e3a0f2b 100644 --- a/rllib/utils/schedules/schedule.py +++ b/rllib/utils/schedules/schedule.py @@ -23,9 +23,6 @@ class Schedule(metaclass=ABCMeta): """ def __init__(self, framework=None): - # TODO(sven): replace with .tf_value() / torch_value() methods that - # can be applied late binding, so no need to set framework during - # construction. self.framework = check_framework(framework) @abstractmethod diff --git a/rllib/utils/schedules/tests/test_schedules.py b/rllib/utils/schedules/tests/test_schedules.py index 5d6481202..e4e2a3ecf 100644 --- a/rllib/utils/schedules/tests/test_schedules.py +++ b/rllib/utils/schedules/tests/test_schedules.py @@ -11,7 +11,7 @@ tf = try_import_tf() class TestSchedules(unittest.TestCase): """ - Tests all time-step/time-percentage dependent Schedule classes. + Tests all time-step dependent Schedule classes. 
""" def test_constant_schedule(self): diff --git a/rllib/utils/tests/test_framework_agnostic_components.py b/rllib/utils/tests/test_framework_agnostic_components.py index 1cf7a1ea0..25252c52f 100644 --- a/rllib/utils/tests/test_framework_agnostic_components.py +++ b/rllib/utils/tests/test_framework_agnostic_components.py @@ -1,9 +1,11 @@ from abc import ABCMeta, abstractmethod +from gym.spaces import Discrete import unittest +from ray.rllib.utils.exploration.exploration import Exploration +from ray.rllib.utils.framework import try_import_tf, try_import_torch from ray.rllib.utils.from_config import from_config from ray.rllib.utils.test_utils import check -from ray.rllib.utils.framework import try_import_tf, try_import_torch tf = try_import_tf() tf.enable_eager_execution() @@ -55,6 +57,21 @@ class TestFrameWorkAgnosticComponents(unittest.TestCase): check(component.prop_d, 4) # default check(component.add(-1.1).numpy(), -2.1) # prop_b == -1.0 + # Test recognizing default module path. + component = from_config( + DummyComponent, '{"type": "NonAbstractChildOfDummyComponent", ' + '"prop_a": "A", "prop_b": -1.0, "prop_c": "non-default"}') + check(component.prop_a, "A") + check(component.prop_d, 4) # default + check(component.add(-1.1).numpy(), -2.1) # prop_b == -1.0 + + # Test recognizing default package path. + component = from_config(Exploration, { + "type": "EpsilonGreedy", + "action_space": Discrete(2) + }) + check(component.epsilon_schedule.outside_value, 0.05) # default + # Create torch Component from yaml-string. component = from_config( "type: ray.rllib.utils.tests." @@ -66,9 +83,9 @@ class TestFrameWorkAgnosticComponents(unittest.TestCase): class DummyComponent: - """ - A simple DummyComponent that can be used for testing framework-agnostic - logic. Implements a simple `add()` method for adding a value to + """A simple class that can be used for testing framework-agnostic logic. + + Implements a simple `add()` method for adding a value to `self.prop_b`. 
""" @@ -94,9 +111,12 @@ class DummyComponent: return tf.add(self.prop_b, value) +class NonAbstractChildOfDummyComponent(DummyComponent): + pass + + class AbstractDummyComponent(DummyComponent, metaclass=ABCMeta): - """ - Used for testing `from_config()`. + """Used for testing `from_config()`. """ @abstractmethod diff --git a/rllib/utils/torch_ops.py b/rllib/utils/torch_ops.py index 58edd9f91..981147290 100644 --- a/rllib/utils/torch_ops.py +++ b/rllib/utils/torch_ops.py @@ -7,12 +7,13 @@ def sequence_mask(lengths, maxlen, dtype=torch.bool): """ Exact same behavior as tf.sequence_mask. Thanks to Dimitris Papatheodorou - (https://discuss.pytorch.org/t/pytorch-equivalent-for-tf-sequence-mask/39036). + (https://discuss.pytorch.org/t/pytorch-equivalent-for-tf-sequence-mask/ + 39036). """ if maxlen is None: maxlen = lengths.max() - mask = ~(torch.ones((len(lengths), maxlen)).cumsum(dim=1).t() > lengths).\ + mask = ~(torch.ones((len(lengths), maxlen)).cumsum(dim=1).t() > lengths). \ t() mask.type(dtype)