From fcdf410ae1bb5071e7d92174eace7d79be2d4ef9 Mon Sep 17 00:00:00 2001 From: Sven Mika Date: Sat, 11 Jul 2020 22:06:35 +0200 Subject: [PATCH] [RLlib] Tf2.x native. (#8752) --- rllib/BUILD | 12 ++-- rllib/agents/a3c/tests/test_a2c.py | 2 +- rllib/agents/ddpg/ddpg_tf_policy.py | 8 +-- rllib/agents/ddpg/tests/test_apex_ddpg.py | 2 +- rllib/agents/ddpg/tests/test_ddpg.py | 8 ++- rllib/agents/dqn/common/__init__.py | 0 rllib/agents/dqn/dqn_tf_policy.py | 8 ++- rllib/agents/dqn/learner_thread.py | 6 ++ rllib/agents/dqn/tests/test_apex_dqn.py | 2 +- rllib/agents/dqn/tests/test_dqn.py | 9 ++- rllib/agents/impala/tests/test_impala.py | 2 +- rllib/agents/impala/vtrace_tf.py | 16 ++--- rllib/agents/impala/vtrace_tf_policy.py | 16 +++-- rllib/agents/marwil/marwil_tf_policy.py | 35 ++++++---- rllib/agents/pg/pg_tf_policy.py | 2 +- rllib/agents/pg/tests/test_pg.py | 10 +-- rllib/agents/ppo/ppo.py | 5 +- rllib/agents/ppo/ppo_tf_policy.py | 12 ++-- rllib/agents/ppo/tests/test_ppo.py | 8 +-- rllib/agents/sac/sac_tf_policy.py | 67 +++++++++++++------ rllib/agents/sac/tests/test_sac.py | 65 +++++++++++++++--- rllib/agents/trainer.py | 4 +- rllib/evaluation/rollout_worker.py | 4 +- rllib/execution/learner_thread.py | 6 ++ rllib/execution/train_ops.py | 20 ++++-- rllib/models/catalog.py | 2 +- rllib/models/preprocessors.py | 1 + rllib/policy/eager_tf_policy.py | 2 +- rllib/policy/tests/OBSOLETE_test_policy.py | 35 ---------- rllib/policy/tests/__init__.py | 0 rllib/policy/tf_policy.py | 10 +-- rllib/tests/run_regression_tests.py | 18 +++-- rllib/tests/test_catalog.py | 26 ++++--- rllib/tests/test_eager_support.py | 29 +++++++- rllib/tests/test_multi_agent_pendulum.py | 2 +- rllib/tests/test_supported_multi_agent.py | 5 +- rllib/tests/test_supported_spaces.py | 2 +- rllib/utils/exploration/epsilon_greedy.py | 8 ++- rllib/utils/exploration/gaussian_noise.py | 17 +++-- .../exploration/ornstein_uhlenbeck_noise.py | 21 ++++-- .../exploration/tests/test_explorations.py | 2 +- .../exploration/tests/test_parameter_noise.py | 2 +- rllib/utils/test_utils.py | 34 +++++++--- rllib/utils/tf_ops.py | 15 +++-- scripts | 1 - 45 files changed, 359 insertions(+), 202 deletions(-) delete mode 100644 rllib/agents/dqn/common/__init__.py delete mode 100644 rllib/policy/tests/OBSOLETE_test_policy.py delete mode 100644 rllib/policy/tests/__init__.py delete mode 120000 scripts diff --git a/rllib/BUILD b/rllib/BUILD index fae2838fb..db350f045 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -42,7 +42,7 @@ # A2C/A3C py_test( - name = "regression_test_a2c_cartpole_tf", + name = "run_regression_tests_cartpole_a2c_tf", main = "tests/run_regression_tests.py", tags = ["learning_tests_tf", "learning_tests_cartpole"], size = "medium", @@ -52,7 +52,7 @@ py_test( ) py_test( - name = "regression_test_a2c_cartpole_torch", + name = "run_regression_tests_cartpole_a2c_torch", main = "tests/run_regression_tests.py", tags = ["learning_tests_torch", "learning_tests_cartpole"], size = "medium", @@ -62,7 +62,7 @@ py_test( ) py_test( - name = "regression_test_a3c_cartpole_tf", + name = "run_regression_tests_cartpole_a3c_tf", main = "tests/run_regression_tests.py", tags = ["learning_tests_tf", "learning_tests_cartpole"], size = "medium", @@ -72,7 +72,7 @@ py_test( ) py_test( - name = "regression_test_a3c_cartpole_torch", + name = "run_regression_tests_cartpole_a3c_torch", main = "tests/run_regression_tests.py", tags = ["learning_tests_torch", "learning_tests_cartpole"], size = "medium", @@ -401,7 +401,7 @@ py_test( py_test( name = "test_apex_ddpg", tags = ["agents_dir"], - size = "small", + size = "medium", srcs = ["agents/ddpg/tests/test_apex_ddpg.py"] ) @@ -455,7 +455,7 @@ py_test( py_test( name = "test_impala", tags = ["agents_dir"], - size = "medium", + size = "large", srcs = ["agents/impala/tests/test_impala.py"] ) py_test( diff --git a/rllib/agents/a3c/tests/test_a2c.py b/rllib/agents/a3c/tests/test_a2c.py index 08abcee8a..9924755eb 100644 --- a/rllib/agents/a3c/tests/test_a2c.py +++ b/rllib/agents/a3c/tests/test_a2c.py @@ -25,7 +25,7 @@ class TestA2C(unittest.TestCase): # Test against all frameworks. for fw in framework_iterator(config): - config["sample_async"] = fw in ["tf", "tfe"] + config["sample_async"] = fw in ["tf", "tfe", "tf2"] for env in ["PongDeterministic-v0"]: trainer = a3c.A2CTrainer(config=config, env=env) for i in range(num_iterations): diff --git a/rllib/agents/ddpg/ddpg_tf_policy.py b/rllib/agents/ddpg/ddpg_tf_policy.py index dc42a2af2..71f715170 100644 --- a/rllib/agents/ddpg/ddpg_tf_policy.py +++ b/rllib/agents/ddpg/ddpg_tf_policy.py @@ -236,7 +236,7 @@ def ddpg_actor_critic_loss(policy, model, _, train_batch): def make_ddpg_optimizers(policy, config): # Create separate optimizers for actor & critic losses. - if tfv == 2 and config["framework"] == "tfe": + if policy.config["framework"] in ["tf2", "tfe"]: policy._actor_optimizer = tf.keras.optimizers.Adam( learning_rate=config["actor_lr"]) policy._critic_optimizer = tf.keras.optimizers.Adam( @@ -266,7 +266,7 @@ def build_apply_op(policy, optimizer, grads_and_vars): critic_op = policy._critic_optimizer.apply_gradients( policy._critic_grads_and_vars) # Increment global step & apply ops. - if tfv == 2 and policy.config["framework"] == "tfe": + if policy.config["framework"] in ["tf2", "tfe"]: policy.global_step.assign_add(1) return tf.no_op() else: @@ -275,7 +275,7 @@ def build_apply_op(policy, optimizer, grads_and_vars): def gradients_fn(policy, optimizer, loss): - if policy.config["framework"] == "tfe": + if policy.config["framework"] in ["tf2", "tfe"]: tape = optimizer.tape pol_weights = policy.model.policy_variables() actor_grads_and_vars = list(zip(tape.gradient( @@ -318,7 +318,7 @@ def build_ddpg_stats(policy, batch): def before_init_fn(policy, obs_space, action_space, config): # Create global step for counting the number of update operations. - if tfv == 2 and config["framework"] == "tfe": + if config["framework"] in ["tf2", "tfe"]: policy.global_step = get_variable(0, tf_name="global_step") else: policy.global_step = tf1.train.get_or_create_global_step() diff --git a/rllib/agents/ddpg/tests/test_apex_ddpg.py b/rllib/agents/ddpg/tests/test_apex_ddpg.py index 7df44c1ad..d0b215054 100644 --- a/rllib/agents/ddpg/tests/test_apex_ddpg.py +++ b/rllib/agents/ddpg/tests/test_apex_ddpg.py @@ -24,7 +24,7 @@ class TestApexDDPG(unittest.TestCase): config["learning_starts"] = 0 config["optimizer"]["num_replay_buffer_shards"] = 1 num_iterations = 1 - for _ in framework_iterator(config, frameworks=("tf", "torch")): + for _ in framework_iterator(config): plain_config = config.copy() trainer = apex_ddpg.ApexDDPGTrainer( config=plain_config, env="Pendulum-v0") diff --git a/rllib/agents/ddpg/tests/test_ddpg.py b/rllib/agents/ddpg/tests/test_ddpg.py index a3a1180d4..49c52e942 100644 --- a/rllib/agents/ddpg/tests/test_ddpg.py +++ b/rllib/agents/ddpg/tests/test_ddpg.py @@ -9,12 +9,13 @@ from ray.rllib.agents.ddpg.ddpg_torch_policy import ddpg_actor_critic_loss as \ from ray.rllib.agents.sac.tests.test_sac import SimpleEnv from ray.rllib.execution.replay_buffer import LocalReplayBuffer from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.utils.framework import try_import_torch +from ray.rllib.utils.framework import try_import_tf, try_import_torch from ray.rllib.utils.numpy import fc, huber_loss, l2_loss, relu, sigmoid from ray.rllib.utils.test_utils import check, check_compute_single_action, \ framework_iterator from ray.rllib.utils.torch_ops import convert_to_torch_tensor +tf1, tf, tfv = try_import_tf() torch, _ = try_import_torch() @@ -404,14 +405,15 @@ class TestDDPG(unittest.TestCase): policy_t = sigmoid(2.0 * fc( relu( fc(model_out_t, weights[ks[1]], weights[ks[0]], framework=fw)), - weights[ks[5]], weights[ks[4]])) + weights[ks[5]], weights[ks[4]], framework=fw)) # Get policy output for t+1 (target model). policy_tp1 = sigmoid(2.0 * fc( relu( fc(target_model_out_tp1, weights[ks[3]], weights[ks[2]], - framework=fw)), weights[ks[7]], weights[ks[6]])) + framework=fw)), + weights[ks[7]], weights[ks[6]], framework=fw)) # Assume no smooth target policy. policy_tp1_smoothed = policy_tp1 diff --git a/rllib/agents/dqn/common/__init__.py b/rllib/agents/dqn/common/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/rllib/agents/dqn/dqn_tf_policy.py b/rllib/agents/dqn/dqn_tf_policy.py index 7faaf6da9..982b66dcd 100644 --- a/rllib/agents/dqn/dqn_tf_policy.py +++ b/rllib/agents/dqn/dqn_tf_policy.py @@ -253,8 +253,12 @@ def build_q_losses(policy, model, _, train_batch): def adam_optimizer(policy, config): - return tf1.train.AdamOptimizer( - learning_rate=policy.cur_lr, epsilon=config["adam_epsilon"]) + if policy.config["framework"] in ["tf2", "tfe"]: + return tf.keras.optimizers.Adam( + learning_rate=policy.cur_lr, epsilon=config["adam_epsilon"]) + else: + return tf1.train.AdamOptimizer( + learning_rate=policy.cur_lr, epsilon=config["adam_epsilon"]) def clip_gradients(policy, optimizer, loss): diff --git a/rllib/agents/dqn/learner_thread.py b/rllib/agents/dqn/learner_thread.py index 306b75fa4..57d73aa7a 100644 --- a/rllib/agents/dqn/learner_thread.py +++ b/rllib/agents/dqn/learner_thread.py @@ -3,11 +3,14 @@ from six.moves import queue from ray.rllib.evaluation.metrics import get_learner_stats from ray.rllib.policy.policy import LEARNER_STATS_KEY +from ray.rllib.utils.framework import try_import_tf from ray.rllib.utils.timer import TimerStat from ray.rllib.utils.window_stat import WindowStat LEARNER_QUEUE_MAX_SIZE = 16 +tf1, tf, tfv = try_import_tf() + class LearnerThread(threading.Thread): """Background thread that updates the local model from replay data. @@ -33,6 +36,9 @@ class LearnerThread(threading.Thread): self.stats = {} def run(self): + # Switch on eager mode if configured. + if self.local_worker.policy_config.get("framework") in ["tf2", "tfe"]: + tf1.enable_eager_execution() while not self.stopped: self.step() diff --git a/rllib/agents/dqn/tests/test_apex_dqn.py b/rllib/agents/dqn/tests/test_apex_dqn.py index bb4fc9d4a..bf0e818bc 100644 --- a/rllib/agents/dqn/tests/test_apex_dqn.py +++ b/rllib/agents/dqn/tests/test_apex_dqn.py @@ -22,7 +22,7 @@ class TestApexDQN(unittest.TestCase): config["timesteps_per_iteration"] = 100 config["min_iter_time_s"] = 1 config["optimizer"]["num_replay_buffer_shards"] = 1 - for _ in framework_iterator(config, frameworks=("torch", "tf")): + for _ in framework_iterator(config): trainer = apex.ApexTrainer(config=config, env="CartPole-v0") trainer.train() trainer.stop() diff --git a/rllib/agents/dqn/tests/test_dqn.py b/rllib/agents/dqn/tests/test_dqn.py index 287b46c77..87db3eaa8 100644 --- a/rllib/agents/dqn/tests/test_dqn.py +++ b/rllib/agents/dqn/tests/test_dqn.py @@ -3,12 +3,9 @@ import unittest import ray import ray.rllib.agents.dqn as dqn -from ray.rllib.utils.framework import try_import_tf from ray.rllib.utils.test_utils import check, check_compute_single_action, \ framework_iterator -tf1, tf, tfv = try_import_tf() - class TestDQN(unittest.TestCase): @classmethod @@ -34,6 +31,7 @@ class TestDQN(unittest.TestCase): print(results) check_compute_single_action(trainer) + trainer.stop() # Rainbow. # TODO(sven): Add torch once DQN-torch supports distributional-Q. @@ -51,6 +49,7 @@ class TestDQN(unittest.TestCase): print(results) check_compute_single_action(trainer) + trainer.stop() def test_dqn_exploration_and_soft_q_config(self): """Tests, whether a DQN Agent outputs exploration/softmaxed actions.""" @@ -73,6 +72,7 @@ class TestDQN(unittest.TestCase): for _ in range(50): actions.append(trainer.compute_action(obs)) check(np.std(actions), 0.0, false=True) + trainer.stop() # Low softmax temperature. Behaves like argmax # (but no epsilon exploration). @@ -86,6 +86,7 @@ class TestDQN(unittest.TestCase): for _ in range(50): actions.append(trainer.compute_action(obs)) check(np.std(actions), 0.0, decimals=3) + trainer.stop() # Higher softmax temperature. config["exploration_config"]["temperature"] = 1.0 @@ -104,6 +105,7 @@ class TestDQN(unittest.TestCase): for _ in range(300): actions.append(trainer.compute_action(obs)) check(np.std(actions), 0.0, false=True) + trainer.stop() # With Random exploration. config["exploration_config"] = {"type": "Random"} @@ -113,6 +115,7 @@ class TestDQN(unittest.TestCase): for _ in range(300): actions.append(trainer.compute_action(obs)) check(np.std(actions), 0.0, false=True) + trainer.stop() if __name__ == "__main__": diff --git a/rllib/agents/impala/tests/test_impala.py b/rllib/agents/impala/tests/test_impala.py index e9885ac66..c8bcef886 100644 --- a/rllib/agents/impala/tests/test_impala.py +++ b/rllib/agents/impala/tests/test_impala.py @@ -23,7 +23,7 @@ class TestIMPALA(unittest.TestCase): config = impala.DEFAULT_CONFIG.copy() num_iterations = 1 - for _ in framework_iterator(config, frameworks=("tf", "torch")): + for _ in framework_iterator(config): local_cfg = config.copy() for env in ["Pendulum-v0", "CartPole-v0"]: print("Env={}".format(env)) diff --git a/rllib/agents/impala/vtrace_tf.py b/rllib/agents/impala/vtrace_tf.py index fb612c57e..ab184a236 100644 --- a/rllib/agents/impala/vtrace_tf.py +++ b/rllib/agents/impala/vtrace_tf.py @@ -222,12 +222,10 @@ def multi_from_logits(behaviour_policy_logits, behaviour_policy_logits[i].shape.assert_has_rank(3) target_policy_logits[i].shape.assert_has_rank(3) - with tf1.name_scope( - name, - values=[ - behaviour_policy_logits, target_policy_logits, actions, - discounts, rewards, values, bootstrap_value - ]): + with tf1.name_scope(name, values=[ + behaviour_policy_logits, target_policy_logits, actions, + discounts, rewards, values, bootstrap_value + ]): target_action_log_probs = multi_log_probs_from_logits_and_actions( target_policy_logits, actions, dist_class, model) @@ -332,9 +330,9 @@ def from_importance_weights(log_rhos, if clip_pg_rho_threshold is not None: clip_pg_rho_threshold.shape.assert_has_rank(0) - with tf1.name_scope( - name, - values=[log_rhos, discounts, rewards, values, bootstrap_value]): + with tf1.name_scope(name, values=[ + log_rhos, discounts, rewards, values, bootstrap_value + ]): rhos = tf.math.exp(log_rhos) if clip_rho_threshold is not None: clipped_rhos = tf.minimum( diff --git a/rllib/agents/impala/vtrace_tf_policy.py b/rllib/agents/impala/vtrace_tf_policy.py index 0237772bc..a6e11a940 100644 --- a/rllib/agents/impala/vtrace_tf_policy.py +++ b/rllib/agents/impala/vtrace_tf_policy.py @@ -253,11 +253,19 @@ def postprocess_trajectory(policy, def choose_optimizer(policy, config): if policy.config["opt_type"] == "adam": - return tf1.train.AdamOptimizer(policy.cur_lr) + if policy.config["framework"] in ["tf2", "tfe"]: + return tf.keras.optimizers.Adam(policy.cur_lr) + else: + return tf1.train.AdamOptimizer(policy.cur_lr) else: - return tf1.train.RMSPropOptimizer( - policy.cur_lr, - config["decay"], config["momentum"], config["epsilon"]) + if tfv == 2: + return tf.keras.optimizers.RMSprop( + policy.cur_lr, config["decay"], config["momentum"], + config["epsilon"]) + else: + return tf1.train.RMSPropOptimizer( + policy.cur_lr, config["decay"], config["momentum"], + config["epsilon"]) def clip_gradients(policy, optimizer, loss): diff --git a/rllib/agents/marwil/marwil_tf_policy.py b/rllib/agents/marwil/marwil_tf_policy.py index cb00f88c0..a148025a1 100644 --- a/rllib/agents/marwil/marwil_tf_policy.py +++ b/rllib/agents/marwil/marwil_tf_policy.py @@ -3,7 +3,7 @@ from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.evaluation.postprocessing import compute_advantages, \ Postprocessing from ray.rllib.policy.tf_policy_template import build_tf_policy -from ray.rllib.utils.framework import try_import_tf +from ray.rllib.utils.framework import try_import_tf, get_variable from ray.rllib.utils.tf_ops import explained_variance, make_tf_callable tf1, tf, tfv = try_import_tf() @@ -36,16 +36,27 @@ class ReweightedImitationLoss: action_dist, beta): # advantage estimation adv = cumulative_rewards - state_values + # update averaged advantage norm - update_adv_norm = tf1.assign_add( - ref=policy._ma_adv_norm, - value=1e-6 * ( + if policy.config["framework"] in ["tf2", "tfe"]: + policy._ma_adv_norm.assign_add( + 1e-6 * (tf.reduce_mean( + tf.math.square(adv)) - policy._ma_adv_norm)) + # Exponentially weighted advantages. + exp_advs = tf.math.exp( + beta * tf.math.divide( + adv, 1e-8 + tf.math.sqrt(policy._ma_adv_norm))) + else: + update_adv_norm = tf1.assign_add( + ref=policy._ma_adv_norm, + value=1e-6 * ( tf.reduce_mean(tf.math.square(adv)) - policy._ma_adv_norm)) - # exponentially weighted advantages - with tf1.control_dependencies([update_adv_norm]): - exp_advs = tf.math.exp(beta * tf.math.divide( - adv, 1e-8 + tf.math.sqrt(policy._ma_adv_norm))) + # exponentially weighted advantages + with tf1.control_dependencies([update_adv_norm]): + exp_advs = tf.math.exp( + beta * tf.math.divide( + adv, 1e-8 + tf.math.sqrt(policy._ma_adv_norm))) # log\pi_\theta(a|s) logprobs = action_dist.logp(actions) @@ -125,10 +136,10 @@ def setup_mixins(policy, obs_space, action_space, config): ValueNetworkMixin.__init__(policy) # Set up a tf-var for the moving avg (do this here to make it work with # eager mode). - policy._ma_adv_norm = tf1.get_variable( - name="moving_average_of_advantage_norm", - dtype=tf.float32, - initializer=100.0, + policy._ma_adv_norm = get_variable( + 100.0, + framework="tf", + tf_name="moving_average_of_advantage_norm", trainable=False) diff --git a/rllib/agents/pg/pg_tf_policy.py b/rllib/agents/pg/pg_tf_policy.py index 88ccc2ac7..944556281 100644 --- a/rllib/agents/pg/pg_tf_policy.py +++ b/rllib/agents/pg/pg_tf_policy.py @@ -27,7 +27,7 @@ def pg_tf_loss(policy, model, dist_class, train_batch): action_dist = dist_class(logits, model) return -tf.reduce_mean( action_dist.logp(train_batch[SampleBatch.ACTIONS]) * - train_batch[Postprocessing.ADVANTAGES]) + tf.cast(train_batch[Postprocessing.ADVANTAGES], dtype=tf.float32)) PGTFPolicy = build_tf_policy( diff --git a/rllib/agents/pg/tests/test_pg.py b/rllib/agents/pg/tests/test_pg.py index fbd154cad..3f4cb23cb 100644 --- a/rllib/agents/pg/tests/test_pg.py +++ b/rllib/agents/pg/tests/test_pg.py @@ -27,7 +27,7 @@ class TestPG(unittest.TestCase): for _ in framework_iterator(config): trainer = pg.PGTrainer(config=config, env="CartPole-v0") for i in range(num_iterations): - trainer.train() + print(trainer.train()) check_compute_single_action( trainer, include_prev_action_reward=True) @@ -56,7 +56,7 @@ class TestPG(unittest.TestCase): trainer = pg.PGTrainer(config=config, env="CartPole-v0") policy = trainer.get_policy() vars = policy.model.trainable_variables() - if fw == "tf": + if sess: vars = policy.get_session().run(vars) # Post-process (calculate simple (non-GAE) advantages) and attach @@ -71,13 +71,15 @@ class TestPG(unittest.TestCase): check(train_batch[Postprocessing.ADVANTAGES], [2.9701, 1.99, 1.0]) # Actual loss results. - if fw == "tf": + if sess: results = policy.get_session().run( policy._loss, feed_dict=policy._get_loss_inputs_dict( train_batch, shuffle=False)) else: - results = (pg.pg_tf_loss if fw == "tfe" else pg.pg_torch_loss)( + results = ( + pg.pg_tf_loss if fw in ["tf2", "tfe"] else pg.pg_torch_loss + )( policy, policy.model, dist_class=dist_cls, diff --git a/rllib/agents/ppo/ppo.py b/rllib/agents/ppo/ppo.py index 62aa1ba7b..80fba85aa 100644 --- a/rllib/agents/ppo/ppo.py +++ b/rllib/agents/ppo/ppo.py @@ -134,7 +134,7 @@ def validate_config(config): "Using the simple minibatch optimizer. This will significantly " "reduce performance, consider simple_optimizer=False.") # Multi-gpu not supported for PyTorch and tf-eager. - elif config["framework"] in ["tfe", "torch"]: + elif config["framework"] in ["tf2", "tfe", "torch"]: config["simple_optimizer"] = True @@ -192,7 +192,8 @@ def execution_plan(workers, config): num_envs_per_worker=config["num_envs_per_worker"], train_batch_size=config["train_batch_size"], shuffle_sequences=config["shuffle_sequences"], - _fake_gpus=config["_fake_gpus"])) + _fake_gpus=config["_fake_gpus"], + framework=config.get("framework"))) # Update KL after each round of training. train_op = train_op.for_each(lambda t: t[1]).for_each(UpdateKL(workers)) diff --git a/rllib/agents/ppo/ppo_tf_policy.py b/rllib/agents/ppo/ppo_tf_policy.py index f5af4281f..01218e5d5 100644 --- a/rllib/agents/ppo/ppo_tf_policy.py +++ b/rllib/agents/ppo/ppo_tf_policy.py @@ -7,7 +7,7 @@ from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.tf_policy import LearningRateSchedule, \ EntropyCoeffSchedule from ray.rllib.policy.tf_policy_template import build_tf_policy -from ray.rllib.utils.framework import try_import_tf +from ray.rllib.utils.framework import try_import_tf, get_variable from ray.rllib.utils.tf_ops import explained_variance, make_tf_callable tf1, tf, tfv = try_import_tf() @@ -206,12 +206,10 @@ class KLCoeffMixin: # KL Coefficient self.kl_coeff_val = config["kl_coeff"] self.kl_target = config["kl_target"] - self.kl_coeff = tf1.get_variable( - initializer=tf.constant_initializer(self.kl_coeff_val), - name="kl_coeff", - shape=(), - trainable=False, - dtype=tf.float32) + self.kl_coeff = get_variable( + float(self.kl_coeff_val), + tf_name="kl_coeff", + trainable=False) def update_kl(self, sampled_kl): if sampled_kl > 2.0 * self.kl_target: diff --git a/rllib/agents/ppo/tests/test_ppo.py b/rllib/agents/ppo/tests/test_ppo.py index b1dec4e5a..13aa9a5d5 100644 --- a/rllib/agents/ppo/tests/test_ppo.py +++ b/rllib/agents/ppo/tests/test_ppo.py @@ -93,10 +93,10 @@ class TestPPO(unittest.TestCase): learnt = False for i in range(num_iterations): results = trainer.train() + print(results) if results["episode_reward_mean"] > 150: learnt = True break - print(results) assert learnt, "PPO multi-GPU (with fake-GPUs) did not learn CartPole!" trainer.stop() @@ -180,7 +180,7 @@ class TestPPO(unittest.TestCase): init_std = get_value() assert init_std == 0.0, init_std - if fw in ["tf", "tfe"]: + if fw in ["tf2", "tf", "tfe"]: batch = postprocess_ppo_gae_tf(policy, FAKE_BATCH) else: batch = postprocess_ppo_gae_torch(policy, FAKE_BATCH) @@ -222,7 +222,7 @@ class TestPPO(unittest.TestCase): # to train_batch dict. # A = [0.99^2 * 0.5 + 0.99 * -1.0 + 1.0, 0.99 * 0.5 - 1.0, 0.5] = # [0.50005, -0.505, 0.5] - if fw == "tf" or fw == "tfe": + if fw in ["tf2", "tf", "tfe"]: train_batch = postprocess_ppo_gae_tf(policy, FAKE_BATCH) else: train_batch = postprocess_ppo_gae_torch(policy, FAKE_BATCH) @@ -233,7 +233,7 @@ class TestPPO(unittest.TestCase): [0.50005, -0.505, 0.5]) # Calculate actual PPO loss. - if fw == "tfe": + if fw in ["tf2", "tfe"]: ppo_surrogate_loss_tf(policy, policy.model, Categorical, train_batch) elif fw == "torch": diff --git a/rllib/agents/sac/sac_tf_policy.py b/rllib/agents/sac/sac_tf_policy.py index 84ddf6153..3c52eba99 100644 --- a/rllib/agents/sac/sac_tf_policy.py +++ b/rllib/agents/sac/sac_tf_policy.py @@ -14,7 +14,8 @@ from ray.rllib.models.tf.tf_action_dist import Beta, Categorical, \ from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy.tf_policy_template import build_tf_policy from ray.rllib.utils.error import UnsupportedSpaceException -from ray.rllib.utils.framework import try_import_tf, try_import_tfp +from ray.rllib.utils.framework import get_variable, try_import_tf, \ + try_import_tfp tf1, tf, tfv = try_import_tf() tfp = try_import_tfp() @@ -277,7 +278,7 @@ def sac_actor_critic_loss(policy, model, _, train_batch): def gradients_fn(policy, optimizer, loss): # Eager: Use GradientTape. - if policy.config["framework"] == "tfe": + if policy.config["framework"] in ["tf2", "tfe"]: tape = optimizer.tape pol_weights = policy.model.policy_variables() actor_grads_and_vars = list(zip(tape.gradient( @@ -355,10 +356,14 @@ def apply_gradients(policy, optimizer, grads_and_vars): policy._critic_optimizer[0].apply_gradients(cgrads) ] - alpha_apply_ops = policy._alpha_optimizer.apply_gradients( - policy._alpha_grads_and_vars, - global_step=tf1.train.get_or_create_global_step()) - return tf.group([actor_apply_ops, alpha_apply_ops] + critic_apply_ops) + if policy.config["framework"] in ["tf2", "tfe"]: + policy._alpha_optimizer.apply_gradients(policy._alpha_grads_and_vars) + return + else: + alpha_apply_ops = policy._alpha_optimizer.apply_gradients( + policy._alpha_grads_and_vars, + global_step=tf1.train.get_or_create_global_step()) + return tf.group([actor_apply_ops, alpha_apply_ops] + critic_apply_ops) def stats(policy, train_batch): @@ -379,22 +384,40 @@ def stats(policy, train_batch): class ActorCriticOptimizerMixin: def __init__(self, config): - # create global step for counting the number of update operations - self.global_step = tf1.train.get_or_create_global_step() - - # use separate optimizers for actor & critic - self._actor_optimizer = tf1.train.AdamOptimizer( - learning_rate=config["optimization"]["actor_learning_rate"]) - self._critic_optimizer = [ - tf1.train.AdamOptimizer( - learning_rate=config["optimization"]["critic_learning_rate"]) - ] - if config["twin_q"]: - self._critic_optimizer.append( - tf1.train.AdamOptimizer(learning_rate=config["optimization"][ - "critic_learning_rate"])) - self._alpha_optimizer = tf1.train.AdamOptimizer( - learning_rate=config["optimization"]["entropy_learning_rate"]) + # - Create global step for counting the number of update operations. + # - Use separate optimizers for actor & critic. + if config["framework"] in ["tf2", "tfe"]: + self.global_step = get_variable(0, tf_name="global_step") + self._actor_optimizer = tf.keras.optimizers.Adam( + learning_rate=config["optimization"]["actor_learning_rate"]) + self._critic_optimizer = [ + tf.keras.optimizers.Adam( + learning_rate=config["optimization"][ + "critic_learning_rate"]) + ] + if config["twin_q"]: + self._critic_optimizer.append( + tf.keras.optimizers.Adam( + learning_rate=config["optimization"][ + "critic_learning_rate"])) + self._alpha_optimizer = tf.keras.optimizers.Adam( + learning_rate=config["optimization"]["entropy_learning_rate"]) + else: + self.global_step = tf1.train.get_or_create_global_step() + self._actor_optimizer = tf1.train.AdamOptimizer( + learning_rate=config["optimization"]["actor_learning_rate"]) + self._critic_optimizer = [ + tf1.train.AdamOptimizer( + learning_rate=config["optimization"][ + "critic_learning_rate"]) + ] + if config["twin_q"]: + self._critic_optimizer.append( + tf1.train.AdamOptimizer( + learning_rate=config["optimization"][ + "critic_learning_rate"])) + self._alpha_optimizer = tf1.train.AdamOptimizer( + learning_rate=config["optimization"]["entropy_learning_rate"]) def setup_early_mixins(policy, obs_space, action_space, config): diff --git a/rllib/agents/sac/tests/test_sac.py b/rllib/agents/sac/tests/test_sac.py index 785167e31..542b55482 100644 --- a/rllib/agents/sac/tests/test_sac.py +++ b/rllib/agents/sac/tests/test_sac.py @@ -5,18 +5,20 @@ import re import unittest import ray.rllib.agents.sac as sac +from ray.rllib.agents.sac.sac_tf_policy import sac_actor_critic_loss as tf_loss from ray.rllib.agents.sac.sac_torch_policy import actor_critic_loss as \ loss_torch from ray.rllib.models.tf.tf_action_dist import SquashedGaussian from ray.rllib.models.torch.torch_action_dist import TorchSquashedGaussian from ray.rllib.execution.replay_buffer import LocalReplayBuffer from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.utils.framework import try_import_torch +from ray.rllib.utils.framework import try_import_tf, try_import_torch from ray.rllib.utils.numpy import fc, relu from ray.rllib.utils.test_utils import check, check_compute_single_action, \ framework_iterator from ray.rllib.utils.torch_ops import convert_to_torch_tensor +tf1, tf, tfv = try_import_tf() torch, _ = try_import_torch() @@ -166,6 +168,10 @@ class TestSAC(unittest.TestCase): if weights_dict is None: assert fw in ["tf", "tfe"] # Start with the tf vars-dict. weights_dict = policy.get_weights() + if fw == "tfe": + log_alpha = weights_dict[10] + weights_dict = self._translate_tfe_weights( + weights_dict, map_) else: assert fw == "torch" # Then transfer that to torch Model. model_dict = self._translate_weights_to_torch( @@ -213,6 +219,16 @@ class TestSAC(unittest.TestCase): tf_a_grads = [g for g, v in tf_a_grads] tf_e_grads = [g for g, v in tf_e_grads] + elif fw == "tfe": + with tf.GradientTape() as tape: + tf_loss(policy, policy.model, None, input_) + c, a, e, t = policy.critic_loss, policy.actor_loss, \ + policy.alpha_loss, policy.td_error + vars = tape.watched_variables() + tf_c_grads = tape.gradient(c[0], vars[6:10]) + tf_a_grads = tape.gradient(a, vars[2:6]) + tf_e_grads = tape.gradient(e, vars[10]) + elif fw == "torch": loss_torch(policy, policy.model, None, input_) c, a, e, t = policy.critic_loss, policy.actor_loss, \ @@ -447,15 +463,27 @@ class TestSAC(unittest.TestCase): # Target q network evaluation. # target_model.get_q_values - q_tp1 = fc( - relu( - fc(np.concatenate([target_model_out_tp1, policy_tp1], -1), - weights[ks[15]], - weights[ks[14]], - framework=fw)), - weights[ks[17]], - weights[ks[16]], - framework=fw) + if fw == "tf": + q_tp1 = fc( + relu( + fc(np.concatenate([target_model_out_tp1, policy_tp1], -1), + weights[ks[15]], + weights[ks[14]], + framework=fw)), + weights[ks[17]], + weights[ks[16]], + framework=fw) + else: + assert fw == "tfe" + q_tp1 = fc( + relu( + fc(np.concatenate([target_model_out_tp1, policy_tp1], -1), + weights[ks[7]], + weights[ks[6]], + framework=fw)), + weights[ks[9]], + weights[ks[8]], + framework=fw) q_t_selected = np.squeeze(q_t, axis=-1) q_tp1 -= alpha * log_pis_tp1 @@ -487,6 +515,23 @@ class TestSAC(unittest.TestCase): } return model_dict + def _translate_tfe_weights(self, weights_dict, map_): + model_dict = { + "default_policy/log_alpha": None, + "default_policy/log_alpha_target": None, + "default_policy/sequential/action_1/kernel": weights_dict[2], + "default_policy/sequential/action_1/bias": weights_dict[3], + "default_policy/sequential/action_out/kernel": weights_dict[4], + "default_policy/sequential/action_out/bias": weights_dict[5], + "default_policy/sequential_1/q_hidden_0/kernel": weights_dict[6], + "default_policy/sequential_1/q_hidden_0/bias": weights_dict[7], + "default_policy/sequential_1/q_out/kernel": weights_dict[8], + "default_policy/sequential_1/q_out/bias": weights_dict[9], + "default_policy/value_out/kernel": weights_dict[0], + "default_policy/value_out/bias": weights_dict[1], + } + return model_dict + if __name__ == "__main__": import pytest diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 0ada135db..31688cd37 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -582,7 +582,9 @@ class Trainer(Trainable): self.config.pop("eager") # Enable eager/tracing support. - if tf1 and self.config["framework"] == "tfe": + if tf1 and self.config["framework"] in ["tf2", "tfe"]: + if self.config["framework"] == "tf2" and tfv < 2: + raise ValueError("`framework`=tf2, but tf-version is < 2.0!") if not tf1.executing_eagerly(): tf1.enable_eager_execution() logger.info("Executing eagerly, with eager_tracing={}".format( diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index 157249976..561e67668 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -283,7 +283,7 @@ class RolloutWorker(ParallelIteratorWorker): ParallelIteratorWorker.__init__(self, gen_rollouts, False) policy_config: TrainerConfigDict = policy_config or {} - if (tf1 and policy_config.get("framework") == "tfe" + if (tf1 and policy_config.get("framework") in ["tf2", "tfe"] and not policy_config.get("no_eager_on_workers") # This eager check is necessary for certain all-framework tests # that use tf's eager_mode() context generator. @@ -959,7 +959,7 @@ class RolloutWorker(ParallelIteratorWorker): if tf1 and tf1.executing_eagerly(): if hasattr(cls, "as_eager"): cls = cls.as_eager() - if policy_config["eager_tracing"]: + if policy_config.get("eager_tracing"): cls = cls.with_tracing() elif not issubclass(cls, TFPolicy): pass # could be some other type of policy diff --git a/rllib/execution/learner_thread.py b/rllib/execution/learner_thread.py index 34ac936e8..dd9b4f469 100644 --- a/rllib/execution/learner_thread.py +++ b/rllib/execution/learner_thread.py @@ -5,9 +5,12 @@ from six.moves import queue from ray.rllib.evaluation.metrics import get_learner_stats from ray.rllib.execution.minibatch_buffer import MinibatchBuffer +from ray.rllib.utils.framework import try_import_tf from ray.rllib.utils.timer import TimerStat from ray.rllib.utils.window_stat import WindowStat +tf1, tf, tfv = try_import_tf() + class LearnerThread(threading.Thread): """Background thread that updates the local model from sample trajectories. @@ -55,6 +58,9 @@ class LearnerThread(threading.Thread): self.num_steps = 0 def run(self): + # Switch on eager mode if configured. + if self.local_worker.policy_config.get("framework") in ["tf2", "tfe"]: + tf1.enable_eager_execution() while not self.stopped: self.step() diff --git a/rllib/execution/train_ops.py b/rllib/execution/train_ops.py index 05cb942c7..5a3fd0cf0 100644 --- a/rllib/execution/train_ops.py +++ b/rllib/execution/train_ops.py @@ -107,12 +107,14 @@ class TrainTFMultiGPU: train_batch_size: int, shuffle_sequences: bool, policies: List[PolicyID] = frozenset([]), - _fake_gpus: bool = False): + _fake_gpus: bool = False, + framework: str = "tf"): self.workers = workers self.policies = policies or workers.local_worker().policies_to_train self.num_sgd_iter = num_sgd_iter self.sgd_minibatch_size = sgd_minibatch_size self.shuffle_sequences = shuffle_sequences + self.framework = framework # Collect actual devices to use. if not num_gpus: @@ -136,8 +138,10 @@ class TrainTFMultiGPU: with self.workers.local_worker().tf_sess.graph.as_default(): with self.workers.local_worker().tf_sess.as_default(): for policy_id in self.policies: - policy = self.workers.local_worker().get_policy(policy_id) - with tf1.variable_scope(policy_id, reuse=tf1.AUTO_REUSE): + policy = self.workers.local_worker().get_policy( + policy_id) + with tf1.variable_scope( + policy_id, reuse=tf1.AUTO_REUSE): if policy._state_inputs: rnn_inputs = policy._state_inputs + [ policy._seq_lens @@ -146,10 +150,12 @@ class TrainTFMultiGPU: rnn_inputs = [] self.optimizers[policy_id] = ( LocalSyncParallelOptimizer( - policy._optimizer, self.devices, - [v - for _, v in policy._loss_inputs], rnn_inputs, - self.per_device_batch_size, policy.copy)) + policy._optimizer, + self.devices, + [v for _, v in policy._loss_inputs], + rnn_inputs, + self.per_device_batch_size, + policy.copy)) self.sess = self.workers.local_worker().tf_sess self.sess.run(tf1.global_variables_initializer()) diff --git a/rllib/models/catalog.py b/rllib/models/catalog.py index d6ed0882b..425bd623e 100644 --- a/rllib/models/catalog.py +++ b/rllib/models/catalog.py @@ -370,7 +370,7 @@ class ModelCatalog: "used, however you specified a custom model {}".format( model_cls)) - if framework in ["tf", "tfe"]: + if framework in ["tf", "tfe", "tf2"]: v2_class = None # Try to get a default v2 model. if not model_config.get("custom_model"): diff --git a/rllib/models/preprocessors.py b/rllib/models/preprocessors.py index 0b1a30e36..7c67a34cd 100644 --- a/rllib/models/preprocessors.py +++ b/rllib/models/preprocessors.py @@ -57,6 +57,7 @@ class Preprocessor: observation = np.array(observation) try: if not self._obs_space.contains(observation): + print() raise ValueError( "Observation outside expected value range", self._obs_space, observation) diff --git a/rllib/policy/eager_tf_policy.py b/rllib/policy/eager_tf_policy.py index 3ef4a1aa0..a8bf265e2 100644 --- a/rllib/policy/eager_tf_policy.py +++ b/rllib/policy/eager_tf_policy.py @@ -266,7 +266,7 @@ def build_eager_tf_policy(name, if optimizer_fn: self._optimizer = optimizer_fn(self, config) else: - self._optimizer = tf1.train.AdamOptimizer(config["lr"]) + self._optimizer = tf.keras.optimizers.Adam(config["lr"]) if after_init: after_init(self, observation_space, action_space, config) diff --git a/rllib/policy/tests/OBSOLETE_test_policy.py b/rllib/policy/tests/OBSOLETE_test_policy.py deleted file mode 100644 index 89f68d5dc..000000000 --- a/rllib/policy/tests/OBSOLETE_test_policy.py +++ /dev/null @@ -1,35 +0,0 @@ -import numpy as np -import random - -from ray.rllib.policy.policy import Policy -from ray.rllib.utils.annotations import override - - -class TestPolicy(Policy): - """A dummy Policy that returns a random (batched) int for compute_actions. - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.exploration = self._create_exploration() - - @override(Policy) - def compute_actions(self, - obs_batch, - state_batches=None, - prev_action_batch=None, - prev_reward_batch=None, - episodes=None, - explore=None, - timestep=None, - **kwargs): - return np.array([random.choice([0, 1])] * len(obs_batch)), [], {} - - @override(Policy) - def compute_log_likelihoods(self, - actions, - obs_batch, - state_batches=None, - prev_action_batch=None, - prev_reward_batch=None): - return np.array([random.random()] * len(obs_batch)) diff --git a/rllib/policy/tests/__init__.py b/rllib/policy/tests/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py index c813a65f5..84154a43f 100644 --- a/rllib/policy/tf_policy.py +++ b/rllib/policy/tf_policy.py @@ -14,7 +14,7 @@ from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.models.modelv2 import ModelV2 from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.debug import summarize -from ray.rllib.utils.framework import try_import_tf +from ray.rllib.utils.framework import try_import_tf, get_variable from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule from ray.rllib.utils.tf_run_builder import TFRunBuilder from ray.rllib.utils.types import ModelGradients, TensorType, TrainerConfigDict @@ -816,7 +816,8 @@ class LearningRateSchedule: @DeveloperAPI def __init__(self, lr, lr_schedule): - self.cur_lr = tf1.get_variable("lr", initializer=lr, trainable=False) + self.cur_lr = tf1.get_variable( + "lr", initializer=lr, trainable=False) if lr_schedule is None: self.lr_schedule = ConstantSchedule(lr, framework=None) else: @@ -841,8 +842,9 @@ class EntropyCoeffSchedule: @DeveloperAPI def __init__(self, entropy_coeff, entropy_coeff_schedule): - self.entropy_coeff = tf1.get_variable( - "entropy_coeff", initializer=entropy_coeff, trainable=False) + self.entropy_coeff = get_variable( + entropy_coeff, framework="tf", tf_name="entropy_coeff", + trainable=False) if entropy_coeff_schedule is None: self.entropy_coeff_schedule = ConstantSchedule( diff --git a/rllib/tests/run_regression_tests.py b/rllib/tests/run_regression_tests.py index 6d8ddd9da..e633be6cd 100644 --- a/rllib/tests/run_regression_tests.py +++ b/rllib/tests/run_regression_tests.py @@ -39,17 +39,23 @@ parser.add_argument( if __name__ == "__main__": args = parser.parse_args() - # Bazel regression test mode: Get path to look for yaml files from argv[2]. + # Bazel regression test mode: Get path to look for yaml files. # Get the path or single file to use. rllib_dir = Path(__file__).parent.parent print("rllib dir={}".format(rllib_dir)) - if not os.path.isdir(os.path.join(rllib_dir, args.yaml_dir)): + abs_yaml_path = os.path.join(rllib_dir, args.yaml_dir) + # Single file given. + if os.path.isfile(abs_yaml_path): + yaml_files = [abs_yaml_path] + # Given path/file does not exist. + elif not os.path.isdir(abs_yaml_path): raise ValueError("yaml-dir ({}) not found!".format(args.yaml_dir)) - - yaml_files = rllib_dir.rglob(args.yaml_dir + "/*.yaml") - yaml_files = sorted( - map(lambda path: str(path.absolute()), yaml_files), reverse=True) + # Path given -> Get all yaml files in there via rglob. + else: + yaml_files = rllib_dir.rglob(args.yaml_dir + "/*.yaml") + yaml_files = sorted( + map(lambda path: str(path.absolute()), yaml_files), reverse=True) print("Will run the following regression tests:") for yaml_file in yaml_files: diff --git a/rllib/tests/test_catalog.py b/rllib/tests/test_catalog.py index 32bfec194..6298bf20d 100644 --- a/rllib/tests/test_catalog.py +++ b/rllib/tests/test_catalog.py @@ -102,21 +102,19 @@ class ModelCatalogTest(unittest.TestCase): def test_default_models(self): ray.init(object_store_memory=1000 * 1024 * 1024) - with tf1.variable_scope("test1"): - p1 = ModelCatalog.get_model_v2( - obs_space=Box(0, 1, shape=(3,), dtype=np.float32), - action_space=Discrete(5), - num_outputs=5, - model_config={}) - self.assertEqual(type(p1), FullyConnectedNetwork) + p1 = ModelCatalog.get_model_v2( + obs_space=Box(0, 1, shape=(3, ), dtype=np.float32), + action_space=Discrete(5), + num_outputs=5, + model_config={}) + self.assertEqual(type(p1), FullyConnectedNetwork) - with tf1.variable_scope("test2"): - p2 = ModelCatalog.get_model_v2( - obs_space=Box(0, 1, shape=(84, 84, 3), dtype=np.float32), - action_space=Discrete(5), - num_outputs=5, - model_config={}) - self.assertEqual(type(p2), VisionNetwork) + p2 = ModelCatalog.get_model_v2( + obs_space=Box(0, 1, shape=(84, 84, 3), dtype=np.float32), + action_space=Discrete(5), + num_outputs=5, + model_config={}) + self.assertEqual(type(p2), VisionNetwork) def test_custom_model(self): ray.init(object_store_memory=1000 * 1024 * 1024) diff --git a/rllib/tests/test_eager_support.py b/rllib/tests/test_eager_support.py index 7bda4a003..55b7cf97c 100644 --- a/rllib/tests/test_eager_support.py +++ b/rllib/tests/test_eager_support.py @@ -3,6 +3,9 @@ import unittest import ray from ray import tune from ray.rllib.agents.registry import get_agent_class +from ray.rllib.utils.framework import try_import_tf + +tf1, tf, tfv = try_import_tf() def check_support(alg, config, test_eager=False, test_trace=True): @@ -40,6 +43,22 @@ class TestEagerSupportPG(unittest.TestCase): def tearDown(self): ray.shutdown() + def test_simple_q(self): + check_support("SimpleQ", {"num_workers": 0, "learning_starts": 0}) + + def test_dqn(self): + check_support("DQN", {"num_workers": 0, "learning_starts": 0}) + + def test_ddpg(self): + check_support("DDPG", {"num_workers": 0}) + + # TODO(sven): Add these once APEX_DDPG supports eager. + # def test_apex_ddpg(self): + # check_support("APEX_DDPG", {"num_workers": 1}) + + def test_td3(self): + check_support("TD3", {"num_workers": 0}) + def test_a2c(self): check_support("A2C", {"num_workers": 0}) @@ -100,10 +119,16 @@ class TestEagerSupportOffPolicy(unittest.TestCase): if __name__ == "__main__": - import pytest import sys + # Don't test anything for version 2.x (all tests are eager anyways). + # TODO: (sven) remove entire file in the future. + if tfv == 2: + print("\tskip due to tf==2.x") + sys.exit(0) + # One can specify the specific TestCase class to run. # None for all unittest.TestCase classes in this file. - class_ = sys.argv[1] if len(sys.argv) > 0 else None + import pytest + class_ = sys.argv[1] if len(sys.argv) > 1 else None sys.exit(pytest.main( ["-v", __file__ + ("" if class_ is None else "::" + class_)])) diff --git a/rllib/tests/test_multi_agent_pendulum.py b/rllib/tests/test_multi_agent_pendulum.py index 0cc3fba7b..7afa9f624 100644 --- a/rllib/tests/test_multi_agent_pendulum.py +++ b/rllib/tests/test_multi_agent_pendulum.py @@ -46,7 +46,7 @@ class TestMultiAgentPendulum(unittest.TestCase): "framework": fw, }, } - }) + }, verbose=1) if trials[0].last_result["episode_reward_mean"] < -300.0: raise ValueError("Did not get to -200 reward", trials[0].last_result) diff --git a/rllib/tests/test_supported_multi_agent.py b/rllib/tests/test_supported_multi_agent.py index abc5f1cf7..be4681a19 100644 --- a/rllib/tests/test_supported_multi_agent.py +++ b/rllib/tests/test_supported_multi_agent.py @@ -15,7 +15,8 @@ def check_support_multiagent(alg, config): lambda _: MultiAgentCartPole({"num_agents": 2})) config["log_level"] = "ERROR" for fw in framework_iterator(config): - if fw == "tfe" and alg in ["A3C", "APEX", "APEX_DDPG", "IMPALA"]: + if fw in ["tf2", "tfe"] and \ + alg in ["A3C", "APEX", "APEX_DDPG", "IMPALA"]: continue if alg in ["DDPG", "APEX_DDPG", "SAC"]: a = get_agent_class(alg)( @@ -123,6 +124,6 @@ if __name__ == "__main__": import sys # One can specify the specific TestCase class to run. # None for all unittest.TestCase classes in this file. - class_ = sys.argv[1] if len(sys.argv) > 0 else None + class_ = sys.argv[1] if len(sys.argv) > 1 else None sys.exit(pytest.main( ["-v", __file__ + ("" if class_ is None else "::" + class_)])) diff --git a/rllib/tests/test_supported_spaces.py b/rllib/tests/test_supported_spaces.py index fe541ef24..0dbfddefe 100644 --- a/rllib/tests/test_supported_spaces.py +++ b/rllib/tests/test_supported_spaces.py @@ -202,6 +202,6 @@ if __name__ == "__main__": # One can specify the specific TestCase class to run. # None for all unittest.TestCase classes in this file. - class_ = sys.argv[1] if len(sys.argv) > 0 else None + class_ = sys.argv[1] if len(sys.argv) > 1 else None sys.exit(pytest.main( ["-v", __file__ + ("" if class_ is None else "::" + class_)])) diff --git a/rllib/utils/exploration/epsilon_greedy.py b/rllib/utils/exploration/epsilon_greedy.py index 77c13ea67..8323f2c08 100644 --- a/rllib/utils/exploration/epsilon_greedy.py +++ b/rllib/utils/exploration/epsilon_greedy.py @@ -111,9 +111,13 @@ class EpsilonGreedy(Exploration): ), false_fn=lambda: exploit_action) - assign_op = tf1.assign(self.last_timestep, timestep) - with tf1.control_dependencies([assign_op]): + if self.framework in ["tf2", "tfe"]: + self.last_timestep = timestep return action, tf.zeros_like(action, dtype=tf.float32) + else: + assign_op = tf1.assign(self.last_timestep, timestep) + with tf1.control_dependencies([assign_op]): + return action, tf.zeros_like(action, dtype=tf.float32) def _get_torch_exploration_action(self, q_values, explore, timestep): """Torch method to produce an epsilon exploration action. diff --git a/rllib/utils/exploration/gaussian_noise.py b/rllib/utils/exploration/gaussian_noise.py index c2b8f674f..ebfd36f32 100644 --- a/rllib/utils/exploration/gaussian_noise.py +++ b/rllib/utils/exploration/gaussian_noise.py @@ -72,7 +72,7 @@ class GaussianNoise(Exploration): 0, framework=self.framework, tf_name="timestep") # Build the tf-info-op. - if self.framework == "tf": + if self.framework in ["tf", "tfe"]: self._tf_info_op = self.get_info() @override(Exploration) @@ -123,11 +123,18 @@ class GaussianNoise(Exploration): logp = tf.zeros(shape=(batch_size,), dtype=tf.float32) # Increment `last_timestep` by 1 (or set to `timestep`). - assign_op = ( - tf1.assign_add(self.last_timestep, 1) if timestep is None else - tf1.assign(self.last_timestep, timestep)) - with tf1.control_dependencies([assign_op]): + if self.framework in ["tf2", "tfe"]: + if timestep is None: + self.last_timestep.assign_add(1) + else: + self.last_timestep.assign(timestep) return action, logp + else: + assign_op = ( + tf1.assign_add(self.last_timestep, 1) if timestep is None else + tf1.assign(self.last_timestep, timestep)) + with tf1.control_dependencies([assign_op]): + return action, logp def _get_torch_exploration_action(self, action_dist, explore, timestep): # Set last timestep or (if not given) increase by one. diff --git a/rllib/utils/exploration/ornstein_uhlenbeck_noise.py b/rllib/utils/exploration/ornstein_uhlenbeck_noise.py index 227a74990..47e69140c 100644 --- a/rllib/utils/exploration/ornstein_uhlenbeck_noise.py +++ b/rllib/utils/exploration/ornstein_uhlenbeck_noise.py @@ -95,7 +95,11 @@ class OrnsteinUhlenbeckNoise(GaussianNoise): shape=[self.action_space.low.size], stddev=self.stddev) ou_new = self.ou_theta * -self.ou_state + \ self.ou_sigma * gaussian_sample - ou_state_new = tf1.assign_add(self.ou_state, ou_new) + if self.framework in ["tf2", "tfe"]: + self.ou_state.assign_add(ou_new) + ou_state_new = self.ou_state + else: + ou_state_new = tf1.assign_add(self.ou_state, ou_new) high_m_low = self.action_space.high - self.action_space.low high_m_low = tf.where( tf.math.is_inf(high_m_low), tf.ones_like(high_m_low), high_m_low) @@ -125,11 +129,18 @@ class OrnsteinUhlenbeckNoise(GaussianNoise): logp = tf.zeros(shape=(batch_size,), dtype=tf.float32) # Increment `last_timestep` by 1 (or set to `timestep`). - assign_op = ( - tf1.assign_add(self.last_timestep, 1) if timestep is None else - tf1.assign(self.last_timestep, timestep)) - with tf1.control_dependencies([assign_op, ou_state_new]): + if self.framework in ["tf2", "tfe"]: + if timestep is None: + self.last_timestep.assign_add(1) + else: + self.last_timestep = timestep return action, logp + else: + assign_op = ( + tf1.assign_add(self.last_timestep, 1) if timestep is None else + tf1.assign(self.last_timestep, timestep)) + with tf1.control_dependencies([assign_op, ou_state_new]): + return action, logp @override(GaussianNoise) def _get_torch_exploration_action(self, action_dist, explore, timestep): diff --git a/rllib/utils/exploration/tests/test_explorations.py b/rllib/utils/exploration/tests/test_explorations.py index 8ca6c3d8d..99520b57d 100644 --- a/rllib/utils/exploration/tests/test_explorations.py +++ b/rllib/utils/exploration/tests/test_explorations.py @@ -27,7 +27,7 @@ def do_test_explorations(run, core_config["num_workers"] = 0 # Test all frameworks. - for fw in framework_iterator(core_config): + for _ in framework_iterator(core_config): print("Agent={}".format(run)) # Test for both the default Agent's exploration AND the `Random` diff --git a/rllib/utils/exploration/tests/test_parameter_noise.py b/rllib/utils/exploration/tests/test_parameter_noise.py index 8483d56f7..9d7e7bcb4 100644 --- a/rllib/utils/exploration/tests/test_parameter_noise.py +++ b/rllib/utils/exploration/tests/test_parameter_noise.py @@ -189,7 +189,7 @@ class TestParameterNoise(unittest.TestCase): def _get_current_weight(self, policy, fw): weights = policy.get_weights() - key = 0 if fw == "tfe" else list(weights.keys())[0] + key = 0 if fw in ["tf2", "tfe"] else list(weights.keys())[0] return weights[key][0][0] diff --git a/rllib/utils/test_utils.py b/rllib/utils/test_utils.py index 444381c65..9826c6a0a 100644 --- a/rllib/utils/test_utils.py +++ b/rllib/utils/test_utils.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) def framework_iterator(config=None, - frameworks=("tf", "tfe", "torch"), + frameworks=("tf2", "tf", "tfe", "torch"), session=False): """An generator that allows for looping through n frameworks for testing. @@ -29,18 +29,23 @@ def framework_iterator(config=None, config (Optional[dict]): An optional config dict to alter in place depending on the iteration. frameworks (Tuple[str]): A list/tuple of the frameworks to be tested. - Allowed are: "tf", "tfe", "torch", and None. + Allowed are: "tf2", "tf", "tfe", "torch", and None. session (bool): If True and only in the tf-case: Enter a tf.Session() and yield that as second return value (otherwise yield (fw, None)). Yields: str: If enter_session is False: - The current framework ("tf", "tfe", "torch") used. + The current framework ("tf2", "tf", "tfe", "torch") used. Tuple(str, Union[None,tf.Session]: If enter_session is True: A tuple of the current fw and the tf.Session if fw="tf". """ config = config or {} - frameworks = [frameworks] if isinstance(frameworks, str) else frameworks + frameworks = [frameworks] if isinstance(frameworks, str) else \ + list(frameworks) + + # Both tf2 and tfe present -> remove "tfe" or "tf2" depending on version. + if "tf2" in frameworks and "tfe" in frameworks: + frameworks.remove("tfe" if tfv == 2 else "tf2") for fw in frameworks: # Skip non-installed frameworks. @@ -53,10 +58,14 @@ def framework_iterator(config=None, "installed)!".format(fw)) continue elif fw == "tfe" and not eager_mode: - logger.warning("framework_iterator skipping eager (could not " + logger.warning("framework_iterator skipping tf-eager (could not " "import `eager_mode` from tensorflow.python)!") continue - assert fw in ["tf", "tfe", "torch", None] + elif fw == "tf2" and tfv != 2: + logger.warning( + "framework_iterator skipping tf2.x (tf version is < 2.0)!") + continue + assert fw in ["tf2", "tf", "tfe", "torch", None] # Do we need a test session? sess = None @@ -69,10 +78,12 @@ def framework_iterator(config=None, config["framework"] = fw eager_ctx = None - if fw == "tfe": + # Enable eager mode for tf2 and tfe. + if fw in ["tf2", "tfe"]: eager_ctx = eager_mode() eager_ctx.__enter__() assert tf1.executing_eagerly() + # Make sure, eager mode is off. elif fw == "tf": assert not tf1.executing_eagerly() @@ -169,8 +180,13 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False): if tf1 is not None: # y should never be a Tensor (y=expected value). if isinstance(y, tf1.Tensor): - raise ValueError("`y` (expected value) must not be a Tensor. " - "Use numpy.ndarray instead") + # In eager mode, numpyize tensors. + if tf.executing_eagerly(): + y = y.numpy() + else: + raise ValueError( + "`y` (expected value) must not be a Tensor. " + "Use numpy.ndarray instead") if isinstance(x, tf1.Tensor): # In eager mode, numpyize tensors. if tf1.executing_eagerly(): diff --git a/rllib/utils/tf_ops.py b/rllib/utils/tf_ops.py index c6d55fa0f..9db6a145d 100644 --- a/rllib/utils/tf_ops.py +++ b/rllib/utils/tf_ops.py @@ -32,11 +32,18 @@ def minimize_and_clip(optimizer, objective, var_list, clip_val=10.0): # Accidentally passing values < 0.0 will break all gradients. assert clip_val > 0.0, clip_val - gradients = optimizer.compute_gradients(objective, var_list=var_list) - for i, (grad, var) in enumerate(gradients): + if tf.executing_eagerly(): + tape = optimizer.tape + grads_and_vars = list(zip(list( + tape.gradient(objective, var_list)), var_list)) + else: + grads_and_vars = optimizer.compute_gradients( + objective, var_list=var_list) + + for i, (grad, var) in enumerate(grads_and_vars): if grad is not None: - gradients[i] = (tf.clip_by_norm(grad, clip_val), var) - return gradients + grads_and_vars[i] = (tf.clip_by_norm(grad, clip_val), var) + return grads_and_vars def make_tf_callable(session_or_none, dynamic_shape=False): diff --git a/scripts b/scripts deleted file mode 120000 index 8f67c5cc2..000000000 --- a/scripts +++ /dev/null @@ -1 +0,0 @@ -ci/travis \ No newline at end of file