From 7ec2223c843e923bc002d11e72dcff72ab3149d3 Mon Sep 17 00:00:00 2001
From: Sven Mika
Date: Sun, 26 Apr 2020 23:08:13 +0200
Subject: [PATCH] [RLlib] DDPG PyTorch actor-model was missing sigmoid layer (#8188)

Fix DDPG PyTorch (missing sigmoid layer (to squash action outputs) after deterministic action outputs).
---
 rllib/agents/ddpg/ddpg_tf_policy.py    |   4 +-
 rllib/agents/ddpg/ddpg_torch_model.py  |  16 +
 rllib/agents/ddpg/ddpg_torch_policy.py |   7 +-
 rllib/agents/ddpg/tests/test_ddpg.py   | 420 ++++++++++++++++++++++++-
 rllib/agents/sac/tests/test_sac.py     |   2 +-
 rllib/utils/numpy.py                   |  26 +-
 6 files changed, 462 insertions(+), 13 deletions(-)

diff --git a/rllib/agents/ddpg/ddpg_tf_policy.py b/rllib/agents/ddpg/ddpg_tf_policy.py
index fe2cfec00..207cd4c4b 100644
--- a/rllib/agents/ddpg/ddpg_tf_policy.py
+++ b/rllib/agents/ddpg/ddpg_tf_policy.py
@@ -215,8 +215,8 @@ def ddpg_actor_critic_loss(policy, model, _, train_batch):
         twin_td_error = twin_q_t_selected - q_t_selected_target
         td_error = td_error + twin_td_error
         if use_huber:
-            errors = huber_loss(td_error, huber_threshold) \
-                + huber_loss(twin_td_error, huber_threshold)
+            errors = huber_loss(td_error, huber_threshold) + \
+                huber_loss(twin_td_error, huber_threshold)
         else:
             errors = 0.5 * tf.square(td_error) + 0.5 * tf.square(twin_td_error)
     else:
diff --git a/rllib/agents/ddpg/ddpg_torch_model.py b/rllib/agents/ddpg/ddpg_torch_model.py
index dcfb066c0..f993e5171 100644
--- a/rllib/agents/ddpg/ddpg_torch_model.py
+++ b/rllib/agents/ddpg/ddpg_torch_model.py
@@ -79,6 +79,22 @@ class DDPGTorchModel(TorchModelV2, nn.Module):
                 initializer=torch.nn.init.xavier_uniform_,
                 activation_fn=None))

+        # Use sigmoid to scale to [0,1], but also double magnitude of input to
+        # emulate behaviour of tanh activation used in DDPG and TD3 papers.
+        class _Lambda(nn.Module):
+            def forward(self, x):
+                sigmoid_out = nn.Sigmoid()(2.0 * x)
+                # Rescale to actual env policy scale
+                # (shape of sigmoid_out is [batch_size, dim_actions],
+                # so we reshape to get same dims)
+                action_range = (action_space.high - action_space.low)[None]
+                low_action = action_space.low[None]
+                actions = torch.from_numpy(action_range) * sigmoid_out + \
+                    torch.from_numpy(low_action)
+                return actions
+
+        self.policy_model.add_module("action_out_squashed", _Lambda())
+
         # Build the Q-net(s), including target Q-net(s).
         def build_q_net(name_):
             activation = get_activation_fn(
diff --git a/rllib/agents/ddpg/ddpg_torch_policy.py b/rllib/agents/ddpg/ddpg_torch_policy.py
index ab7cbeb46..519365f0e 100644
--- a/rllib/agents/ddpg/ddpg_torch_policy.py
+++ b/rllib/agents/ddpg/ddpg_torch_policy.py
@@ -163,9 +163,12 @@ def ddpg_actor_critic_loss(policy, model, _, train_batch):
 def make_ddpg_optimizers(policy, config):
     # Create separate optimizers for actor & critic losses.
     policy._actor_optimizer = torch.optim.Adam(
-        params=policy.model.policy_variables(), lr=config["actor_lr"])
+        params=policy.model.policy_variables(),
+        lr=config["actor_lr"],
+        eps=1e-7)  # to match tf.keras.optimizers.Adam's epsilon default
     policy._critic_optimizer = torch.optim.Adam(
-        params=policy.model.q_variables(), lr=config["critic_lr"])
+        params=policy.model.q_variables(), lr=config["critic_lr"],
+        eps=1e-7)  # to match tf.keras.optimizers.Adam's epsilon default
     return policy._actor_optimizer, policy._critic_optimizer


diff --git a/rllib/agents/ddpg/tests/test_ddpg.py b/rllib/agents/ddpg/tests/test_ddpg.py
index e16009cf7..6b35eceb6 100644
--- a/rllib/agents/ddpg/tests/test_ddpg.py
+++ b/rllib/agents/ddpg/tests/test_ddpg.py
@@ -1,11 +1,19 @@
 import numpy as np
+import re
 import unittest

 import ray.rllib.agents.ddpg as ddpg
-from ray.rllib.utils.framework import try_import_tf
+from ray.rllib.agents.ddpg.ddpg_torch_policy import ddpg_actor_critic_loss as \
+    loss_torch
+from ray.rllib.agents.sac.tests.test_sac import SimpleEnv
+from ray.rllib.policy.sample_batch import SampleBatch
+from ray.rllib.utils.framework import try_import_tf, try_import_torch
+from ray.rllib.utils.numpy import fc, huber_loss, l2_loss, relu, sigmoid
 from ray.rllib.utils.test_utils import check, framework_iterator
+from ray.rllib.utils.torch_ops import convert_to_torch_tensor

 tf = try_import_tf()
+torch, _ = try_import_torch()


 class TestDDPG(unittest.TestCase):
@@ -74,6 +82,416 @@ class TestDDPG(unittest.TestCase):
                 a = trainer.compute_action(obs, explore=False)
                 check(a, deterministic_action)

+    def test_ddpg_loss_function(self):
+        """Tests DDPG loss function results across all frameworks."""
+        config = ddpg.DEFAULT_CONFIG.copy()
+        # Run locally.
+        config["num_workers"] = 0
+        config["learning_starts"] = 0
+        config["twin_q"] = True
+        config["use_huber"] = True
+        config["huber_threshold"] = 1.0
+        config["gamma"] = 0.99
+        # Make this small (seems to introduce errors).
+        config["l2_reg"] = 1e-10
+        config["prioritized_replay"] = False
+        # Use very simple nets.
+        config["actor_hiddens"] = [10]
+        config["critic_hiddens"] = [10]
+        # Make sure, timing differences do not affect trainer.train().
+        config["min_iter_time_s"] = 0
+        config["timesteps_per_iteration"] = 100
+
+        map_ = {
+            # Normal net.
+            "default_policy/actor_hidden_0/kernel": "policy_model.action_0."
+            "_model.0.weight",
+            "default_policy/actor_hidden_0/bias": "policy_model.action_0."
+            "_model.0.bias",
+            "default_policy/actor_out/kernel": "policy_model.action_out."
+            "_model.0.weight",
+            "default_policy/actor_out/bias": "policy_model.action_out."
+            "_model.0.bias",
+            "default_policy/sequential/q_hidden_0/kernel": "q_model.q_hidden_0"
+            "._model.0.weight",
+            "default_policy/sequential/q_hidden_0/bias": "q_model.q_hidden_0."
+            "_model.0.bias",
+            "default_policy/sequential/q_out/kernel": "q_model.q_out._model."
+            "0.weight",
+            "default_policy/sequential/q_out/bias": "q_model.q_out._model."
+            "0.bias",
+            # -- twin.
+            "default_policy/sequential_1/twin_q_hidden_0/kernel": "twin_"
+            "q_model.twin_q_hidden_0._model.0.weight",
+            "default_policy/sequential_1/twin_q_hidden_0/bias": "twin_"
+            "q_model.twin_q_hidden_0._model.0.bias",
+            "default_policy/sequential_1/twin_q_out/kernel": "twin_"
+            "q_model.twin_q_out._model.0.weight",
+            "default_policy/sequential_1/twin_q_out/bias": "twin_"
+            "q_model.twin_q_out._model.0.bias",
+            # Target net.
+            "default_policy/actor_hidden_0_1/kernel": "policy_model.action_0."
+            "_model.0.weight",
+            "default_policy/actor_hidden_0_1/bias": "policy_model.action_0."
+            "_model.0.bias",
+            "default_policy/actor_out_1/kernel": "policy_model.action_out."
+            "_model.0.weight",
+            "default_policy/actor_out_1/bias": "policy_model.action_out._model"
+            ".0.bias",
+            "default_policy/sequential_2/q_hidden_0/kernel": "q_model."
+            "q_hidden_0._model.0.weight",
+            "default_policy/sequential_2/q_hidden_0/bias": "q_model."
+            "q_hidden_0._model.0.bias",
+            "default_policy/sequential_2/q_out/kernel": "q_model."
+            "q_out._model.0.weight",
+            "default_policy/sequential_2/q_out/bias": "q_model."
+            "q_out._model.0.bias",
+            # -- twin.
+            "default_policy/sequential_3/twin_q_hidden_0/kernel": "twin_"
+            "q_model.twin_q_hidden_0._model.0.weight",
+            "default_policy/sequential_3/twin_q_hidden_0/bias": "twin_"
+            "q_model.twin_q_hidden_0._model.0.bias",
+            "default_policy/sequential_3/twin_q_out/kernel": "twin_"
+            "q_model.twin_q_out._model.0.weight",
+            "default_policy/sequential_3/twin_q_out/bias": "twin_"
+            "q_model.twin_q_out._model.0.bias",
+        }
+
+        env = SimpleEnv
+        batch_size = 100
+        if env is SimpleEnv:
+            obs_size = (batch_size, 1)
+            actions = np.random.random(size=(batch_size, 1))
+        elif env == "CartPole-v0":
+            obs_size = (batch_size, 4)
+            actions = np.random.randint(0, 2, size=(batch_size, ))
+        else:
+            obs_size = (batch_size, 3)
+            actions = np.random.random(size=(batch_size, 1))
+
+        # Batch of size=n.
+        input_ = self._get_batch_helper(obs_size, actions, batch_size)
+
+        # Simply compare loss values AND grads of all frameworks with each
+        # other.
+        prev_fw_loss = weights_dict = None
+        expect_c, expect_a, expect_t = None, None, None
+        # History of tf-updated NN-weights over n training steps.
+        tf_updated_weights = []
+        # History of input batches used.
+        tf_inputs = []
+        for fw, sess in framework_iterator(
+                config, frameworks=("tf", "torch"), session=True):
+            # Generate Trainer and get its default Policy object.
+            trainer = ddpg.DDPGTrainer(config=config, env=env)
+            policy = trainer.get_policy()
+            p_sess = None
+            if sess:
+                p_sess = policy.get_session()
+
+            # Set all weights (of all nets) to fixed values.
+            if weights_dict is None:
+                assert fw == "tf"  # Start with the tf vars-dict.
+                weights_dict = policy.get_weights()
+            else:
+                assert fw == "torch"  # Then transfer that to torch Model.
+                model_dict = self._translate_weights_to_torch(
+                    weights_dict, map_)
+                policy.model.load_state_dict(model_dict)
+                policy.target_model.load_state_dict(model_dict)
+
+            if fw == "torch":
+                # Actually convert to torch tensors.
+                input_ = policy._lazy_tensor_dict(input_)
+                input_ = {k: input_[k] for k in input_.keys()}
+
+            # Only run the expectation once, should be the same anyways
+            # for all frameworks.
+            if expect_c is None:
+                expect_c, expect_a, expect_t = \
+                    self._ddpg_loss_helper(
+                        input_, weights_dict, sorted(weights_dict.keys()), fw,
+                        gamma=config["gamma"],
+                        huber_threshold=config["huber_threshold"],
+                        l2_reg=config["l2_reg"],
+                        sess=sess)
+
+            # Get actual outs and compare to expectation AND previous
+            # framework. c=critic, a=actor, t=td-error.
+            if fw == "tf":
+                c, a, t, tf_c_grads, tf_a_grads = \
+                    p_sess.run([
+                        policy.critic_loss,
+                        policy.actor_loss,
+                        policy.td_error,
+                        policy._critic_optimizer.compute_gradients(
+                            policy.critic_loss,
+                            policy.model.q_variables()),
+                        policy._actor_optimizer.compute_gradients(
+                            policy.actor_loss,
+                            policy.model.policy_variables())],
+                        feed_dict=policy._get_loss_inputs_dict(
+                            input_, shuffle=False))
+                # Check pure loss values.
+                check(c, expect_c)
+                check(a, expect_a)
+                check(t, expect_t)
+
+                tf_c_grads = [g for g, v in tf_c_grads]
+                tf_a_grads = [g for g, v in tf_a_grads]
+
+            elif fw == "torch":
+                loss_torch(policy, policy.model, None, input_)
+                c, a, t = policy.critic_loss, policy.actor_loss, \
+                    policy.td_error
+                # Check pure loss values.
+                check(c, expect_c)
+                check(a, expect_a)
+                check(t, expect_t)
+
+                # Test actor gradients.
+                policy._actor_optimizer.zero_grad()
+                assert all(v.grad is None for v in policy.model.q_variables())
+                assert all(
+                    v.grad is None for v in policy.model.policy_variables())
+                a.backward()
+                # `actor_loss` depends on Q-net vars
+                # (but not twin-Q-net vars!).
+                assert not any(v.grad is None
+                               for v in policy.model.q_variables()[:4])
+                assert all(
+                    v.grad is None for v in policy.model.q_variables()[4:])
+                assert not all(
+                    torch.mean(v.grad) == 0
+                    for v in policy.model.policy_variables())
+                assert not all(
+                    torch.min(v.grad) == 0
+                    for v in policy.model.policy_variables())
+                # Compare with tf ones.
+                torch_a_grads = [
+                    v.grad for v in policy.model.policy_variables()
+                ]
+                for tf_g, torch_g in zip(tf_a_grads, torch_a_grads):
+                    if tf_g.shape != torch_g.shape:
+                        check(tf_g, np.transpose(torch_g))
+                    else:
+                        check(tf_g, torch_g)
+
+                # Test critic gradients.
+                policy._critic_optimizer.zero_grad()
+                assert all(
+                    v.grad is None or torch.mean(v.grad) == 0.0
+                    for v in policy.model.q_variables())
+                assert all(
+                    v.grad is None or torch.min(v.grad) == 0.0
+                    for v in policy.model.q_variables())
+                c.backward()
+                assert not all(
+                    torch.mean(v.grad) == 0
+                    for v in policy.model.q_variables())
+                assert not all(
+                    torch.min(v.grad) == 0 for v in policy.model.q_variables())
+                # Compare with tf ones.
+                torch_c_grads = [v.grad for v in policy.model.q_variables()]
+                for tf_g, torch_g in zip(tf_c_grads, torch_c_grads):
+                    if tf_g.shape != torch_g.shape:
+                        check(tf_g, np.transpose(torch_g))
+                    else:
+                        check(tf_g, torch_g)
+                # Compare (unchanged(!) actor grads) with tf ones.
+                torch_a_grads = [
+                    v.grad for v in policy.model.policy_variables()
+                ]
+                for tf_g, torch_g in zip(tf_a_grads, torch_a_grads):
+                    if tf_g.shape != torch_g.shape:
+                        check(tf_g, np.transpose(torch_g))
+                    else:
+                        check(tf_g, torch_g)
+
+            # Store this framework's losses in prev_fw_loss to compare with
+            # next framework's outputs.
+            if prev_fw_loss is not None:
+                check(c, prev_fw_loss[0])
+                check(a, prev_fw_loss[1])
+                check(t, prev_fw_loss[2])
+
+            prev_fw_loss = (c, a, t)
+
+            # Update weights from our batch (n times).
+            for update_iteration in range(10):
+                print("train iteration {}".format(update_iteration))
+                if fw == "tf":
+                    in_ = self._get_batch_helper(obs_size, actions, batch_size)
+                    tf_inputs.append(in_)
+                    # Set a fake-batch to use
+                    # (instead of sampling from replay buffer).
+                    trainer.optimizer._fake_batch = in_
+                    trainer.train()
+                    updated_weights = policy.get_weights()
+                    # Net must have changed.
+                    if tf_updated_weights:
+                        check(
+                            updated_weights[
+                                "default_policy/actor_hidden_0/kernel"],
+                            tf_updated_weights[-1][
+                                "default_policy/actor_hidden_0/kernel"],
+                            false=True)
+                    tf_updated_weights.append(updated_weights)
+
+                # Compare with updated tf-weights. Must all be the same.
+                else:
+                    tf_weights = tf_updated_weights[update_iteration]
+                    in_ = tf_inputs[update_iteration]
+                    # Set a fake-batch to use
+                    # (instead of sampling from replay buffer).
+                    trainer.optimizer._fake_batch = in_
+                    trainer.train()
+                    # Compare updated model and target weights.
+                    for tf_key in tf_weights.keys():
+                        tf_var = tf_weights[tf_key]
+                        # Target model.
+                        if re.search(
+                                "actor_out_1|actor_hidden_0_1|sequential_"
+                                "[23]", tf_key):
+                            torch_var = policy.target_model.state_dict()[map_[
+                                tf_key]]
+                        # Model.
+                        else:
+                            torch_var = policy.model.state_dict()[map_[tf_key]]
+                        if tf_var.shape != torch_var.shape:
+                            check(tf_var, np.transpose(torch_var), rtol=0.07)
+                        else:
+                            check(tf_var, torch_var, rtol=0.07)
+
+    def _get_batch_helper(self, obs_size, actions, batch_size):
+        return {
+            SampleBatch.CUR_OBS: np.random.random(size=obs_size),
+            SampleBatch.ACTIONS: actions,
+            SampleBatch.REWARDS: np.random.random(size=(batch_size, )),
+            SampleBatch.DONES: np.random.choice(
+                [True, False], size=(batch_size, )),
+            SampleBatch.NEXT_OBS: np.random.random(size=obs_size),
+            "weights": np.ones(shape=(batch_size, )),
+        }
+
+    def _ddpg_loss_helper(self, train_batch, weights, ks, fw, gamma,
+                          huber_threshold, l2_reg, sess):
+        """Emulates DDPG loss functions for tf and torch."""
+        model_out_t = train_batch[SampleBatch.CUR_OBS]
+        target_model_out_tp1 = train_batch[SampleBatch.NEXT_OBS]
+        # get_policy_output
+        policy_t = sigmoid(2.0 * fc(
+            relu(
+                fc(model_out_t, weights[ks[1]], weights[ks[0]], framework=fw)),
+            weights[ks[5]], weights[ks[4]]))
+        # Get policy output for t+1 (target model).
+        policy_tp1 = sigmoid(2.0 * fc(
+            relu(
+                fc(target_model_out_tp1,
+                   weights[ks[3]],
+                   weights[ks[2]],
+                   framework=fw)), weights[ks[7]], weights[ks[6]]))
+        # Assume no smooth target policy.
+        policy_tp1_smoothed = policy_tp1
+
+        # Q-values for the actually selected actions.
+        # get_q_values
+        q_t = fc(
+            relu(
+                fc(np.concatenate(
+                    [model_out_t, train_batch[SampleBatch.ACTIONS]], -1),
+                   weights[ks[9]],
+                   weights[ks[8]],
+                   framework=fw)),
+            weights[ks[11]],
+            weights[ks[10]],
+            framework=fw)
+        twin_q_t = fc(
+            relu(
+                fc(np.concatenate(
+                    [model_out_t, train_batch[SampleBatch.ACTIONS]], -1),
+                   weights[ks[13]],
+                   weights[ks[12]],
+                   framework=fw)),
+            weights[ks[15]],
+            weights[ks[14]],
+            framework=fw)
+
+        # Q-values for current policy in given current state.
+        # get_q_values
+        q_t_det_policy = fc(
+            relu(
+                fc(np.concatenate([model_out_t, policy_t], -1),
+                   weights[ks[9]],
+                   weights[ks[8]],
+                   framework=fw)),
+            weights[ks[11]],
+            weights[ks[10]],
+            framework=fw)
+
+        # Target q network evaluation.
+        # target_model.get_q_values
+        q_tp1 = fc(
+            relu(
+                fc(np.concatenate([target_model_out_tp1, policy_tp1_smoothed],
+                                  -1),
+                   weights[ks[17]],
+                   weights[ks[16]],
+                   framework=fw)),
+            weights[ks[19]],
+            weights[ks[18]],
+            framework=fw)
+        twin_q_tp1 = fc(
+            relu(
+                fc(np.concatenate([target_model_out_tp1, policy_tp1_smoothed],
+                                  -1),
+                   weights[ks[21]],
+                   weights[ks[20]],
+                   framework=fw)),
+            weights[ks[23]],
+            weights[ks[22]],
+            framework=fw)
+
+        q_t_selected = np.squeeze(q_t, axis=-1)
+        twin_q_t_selected = np.squeeze(twin_q_t, axis=-1)
+        q_tp1 = np.minimum(q_tp1, twin_q_tp1)
+        q_tp1_best = np.squeeze(q_tp1, axis=-1)
+
+        dones = train_batch[SampleBatch.DONES]
+        rewards = train_batch[SampleBatch.REWARDS]
+        if fw == "torch":
+            dones = dones.float().numpy()
+            rewards = rewards.numpy()
+
+        q_tp1_best_masked = (1.0 - dones) * q_tp1_best
+        q_t_selected_target = rewards + gamma * q_tp1_best_masked
+
+        td_error = q_t_selected - q_t_selected_target
+        twin_td_error = twin_q_t_selected - q_t_selected_target
+        td_error = td_error + twin_td_error
+        errors = huber_loss(td_error, huber_threshold) + \
+            huber_loss(twin_td_error, huber_threshold)
+
+        critic_loss = np.mean(errors)
+        actor_loss = -np.mean(q_t_det_policy)
+        # Add l2-regularization if required.
+        for name, var in weights.items():
+            if re.match("default_policy/actor_(hidden_0|out)/kernel", name):
+                actor_loss += (l2_reg * l2_loss(var))
+            elif re.match("default_policy/sequential(_1)?/\\w+/kernel", name):
+                critic_loss += (l2_reg * l2_loss(var))
+
+        return critic_loss, actor_loss, td_error
+
+    def _translate_weights_to_torch(self, weights_dict, map_):
+        model_dict = {
+            map_[k]: convert_to_torch_tensor(
+                np.transpose(v) if re.search("kernel", k) else v)
+            for k, v in weights_dict.items() if re.search(
+                "default_policy/(actor_(hidden_0|out)|sequential(_1)?)/", k)
+        }
+        return model_dict
+

 if __name__ == "__main__":
     import pytest
diff --git a/rllib/agents/sac/tests/test_sac.py b/rllib/agents/sac/tests/test_sac.py
index d454b1907..041912dd8 100644
--- a/rllib/agents/sac/tests/test_sac.py
+++ b/rllib/agents/sac/tests/test_sac.py
@@ -67,7 +67,7 @@ class TestSAC(unittest.TestCase):
                 print(results)

     def test_sac_loss_function(self):
-        """Tests SAC function results across all frameworks."""
+        """Tests SAC loss function results across all frameworks."""
         config = sac.DEFAULT_CONFIG.copy()
         # Run locally.
         config["num_workers"] = 0
diff --git a/rllib/utils/numpy.py b/rllib/utils/numpy.py
index 19f3c9ff2..650d711f4 100644
--- a/rllib/utils/numpy.py
+++ b/rllib/utils/numpy.py
@@ -15,6 +15,25 @@ MIN_LOG_NN_OUTPUT = -20
 MAX_LOG_NN_OUTPUT = 2


+def huber_loss(x, delta=1.0):
+    """Reference: https://en.wikipedia.org/wiki/Huber_loss"""
+    return np.where(
+        np.abs(x) < delta,
+        np.power(x, 2.0) * 0.5, delta * (np.abs(x) - 0.5 * delta))
+
+
+def l2_loss(x):
+    """Computes half the L2 norm of a tensor (w/o the sqrt): sum(x**2) / 2
+
+    Args:
+        x (np.ndarray): The input tensor.
+
+    Returns:
+        The l2-loss output according to the above formula given `x`.
+    """
+    return np.sum(np.square(x)) / 2.0
+
+
 def sigmoid(x, derivative=False):
     """
     Returns the sigmoid function applied to x.
@@ -228,10 +247,3 @@ def lstm(x,
         unrolled_outputs[:, t, :] = h_states

     return unrolled_outputs, (c_states, h_states)
-
-
-def huber_loss(x, delta=1.0):
-    """Reference: https://en.wikipedia.org/wiki/Huber_loss"""
-    return np.where(
-        np.abs(x) < delta,
-        np.power(x, 2.0) * 0.5, delta * (np.abs(x) - 0.5 * delta))