Rainbow-style extensions for the RLlib DQN agent (review notes).

This text precedes the first "diff --git" header and is not part of any hunk.

What the patch does, per file:
  * agents/dqn/dqn.py
      - adds the config keys num_atoms, v_min, v_max, noisy, sigma0,
        beta_annealing_fraction, final_prioritized_replay_beta, adam_epsilon;
      - adds schedule_max_timesteps, beta_annealing_fraction and
        final_prioritized_replay_beta to OPTIMIZER_SHARED_CONFIGS, but skips
        copying them into the optimizer config when the agent name is
        not "DQN".
  * agents/dqn/dqn_policy_graph.py
      - QNetwork: optional noisy layers (use_noisy / sigma0) and, when
        num_atoms > 1, a softmax over num_atoms support points between
        v_min and v_max; exposes .logits and .dist next to .value;
      - QLoss: when num_atoms > 1, projects the shifted support onto the
        fixed atoms and uses softmax cross entropy as both td_error and loss;
        otherwise keeps the Huber loss on the TD error;
      - DQNPolicyGraph: threads logits/dist through, and passes
        config["adam_epsilon"] to AdamOptimizer.
  * optimizers/sync_replay_optimizer.py
      - prioritized_replay_beta becomes a LinearSchedule that is evaluated
        at self.num_steps_trained on every prioritized sample.
  * tuned_examples/pong-rainbow.yaml
      - new example configuration.

Changes made during review:
  * noisy_layer's docstring is now a raw string (r"""), because it contains
    "\epsilon" and "\sigma", which are invalid escape sequences in a normal
    string literal. Line counts are unchanged; the post-image blob id on the
    "index" line of dqn_policy_graph.py no longer reflects this one-character
    edit.
  * The diff was received flattened; line breaks were restored and checked
    against every hunk header's line counts. Leading whitespace on context
    and removed lines is reconstructed -- TODO confirm against the target
    tree (or apply with --ignore-whitespace).
  * The spelling "evalution" is on a context line and is intentionally kept
    so that the hunk still matches the target file.

diff --git a/python/ray/rllib/agents/dqn/dqn.py b/python/ray/rllib/agents/dqn/dqn.py
index 71efe15e8..d2873e602 100644
--- a/python/ray/rllib/agents/dqn/dqn.py
+++ b/python/ray/rllib/agents/dqn/dqn.py
@@ -17,12 +17,24 @@ from ray.tune.trial import Resources
 
 OPTIMIZER_SHARED_CONFIGS = [
     "buffer_size", "prioritized_replay", "prioritized_replay_alpha",
-    "prioritized_replay_beta", "prioritized_replay_eps", "sample_batch_size",
-    "train_batch_size", "learning_starts"
+    "prioritized_replay_beta", "schedule_max_timesteps",
+    "beta_annealing_fraction", "final_prioritized_replay_beta",
+    "prioritized_replay_eps", "sample_batch_size", "train_batch_size",
+    "learning_starts"
 ]
 
 DEFAULT_CONFIG = with_common_config({
     # === Model ===
+    # Number of atoms for representing the distribution of return. When
+    # this is greater than 1, distributional Q-learning is used.
+    # the discrete supports are bounded by v_min and v_max
+    "num_atoms": 1,
+    "v_min": -10.0,
+    "v_max": 10.0,
+    # Whether to use noisy network
+    "noisy": False,
+    # control the initial value of noisy nets
+    "sigma0": 0.5,
     # Whether to use dueling dqn
     "dueling": True,
     # Whether to use double dqn
@@ -59,6 +71,11 @@ DEFAULT_CONFIG = with_common_config({
     "prioritized_replay_alpha": 0.6,
     # Beta parameter for sampling from prioritized replay buffer.
     "prioritized_replay_beta": 0.4,
+    # Fraction of entire training period over which the beta parameter is
+    # annealed
+    "beta_annealing_fraction": 0.2,
+    # Final value of beta
+    "final_prioritized_replay_beta": 0.4,
     # Epsilon to add to the TD errors when updating priorities.
     "prioritized_replay_eps": 1e-6,
     # Whether to LZ4 compress observations
@@ -67,6 +84,8 @@ DEFAULT_CONFIG = with_common_config({
     # === Optimization ===
     # Learning rate for adam optimizer
     "lr": 5e-4,
+    # Adam epsilon hyper parameter
+    "adam_epsilon": 1e-8,
     # If not None, clip gradients during optimization at this value
     "grad_norm_clipping": 40,
     # How many steps of the model to sample before learning starts.
@@ -130,6 +149,12 @@ class DQNAgent(Agent):
         ]
 
         for k in OPTIMIZER_SHARED_CONFIGS:
+            if self._agent_name != "DQN" and k in [
+                    "schedule_max_timesteps", "beta_annealing_fraction",
+                    "final_prioritized_replay_beta"
+            ]:
+                # only Rainbow needs annealing prioritized_replay_beta
+                continue
             if k not in self.config["optimizer"]:
                 self.config["optimizer"][k] = self.config[k]
 
@@ -149,6 +174,7 @@ class DQNAgent(Agent):
         else:
             # Hack to workaround https://github.com/ray-project/ray/issues/2541
             self.remote_evaluators = None
+
         self.optimizer = getattr(optimizers, self.config["optimizer_class"])(
             self.local_evaluator, self.remote_evaluators,
             self.config["optimizer"])
diff --git a/python/ray/rllib/agents/dqn/dqn_policy_graph.py b/python/ray/rllib/agents/dqn/dqn_policy_graph.py
index 6dccdede1..b9fd6468f 100644
--- a/python/ray/rllib/agents/dqn/dqn_policy_graph.py
+++ b/python/ray/rllib/agents/dqn/dqn_policy_graph.py
@@ -18,32 +18,158 @@ Q_TARGET_SCOPE = "target_q_func"
 
 
 class QNetwork(object):
-    def __init__(self, model, num_actions, dueling=False, hiddens=[256]):
+    def __init__(self,
+                 model,
+                 num_actions,
+                 dueling=False,
+                 hiddens=[256],
+                 use_noisy=False,
+                 num_atoms=1,
+                 v_min=-10.0,
+                 v_max=10.0,
+                 sigma0=0.5):
         with tf.variable_scope("action_value"):
             action_out = model.last_layer
-            for hidden in hiddens:
-                action_out = layers.fully_connected(
-                    action_out, num_outputs=hidden, activation_fn=tf.nn.relu)
-            action_scores = layers.fully_connected(
-                action_out, num_outputs=num_actions, activation_fn=None)
+            for i in range(len(hiddens)):
+                if use_noisy:
+                    action_out = self.noisy_layer("hidden_%d" % i, action_out,
+                                                  hiddens[i], sigma0)
+                else:
+                    action_out = layers.fully_connected(
+                        action_out,
+                        num_outputs=hiddens[i],
+                        activation_fn=tf.nn.relu)
+            if use_noisy:
+                action_scores = self.noisy_layer(
+                    "output",
+                    action_out,
+                    num_actions * num_atoms,
+                    sigma0,
+                    non_linear=False)
+            else:
+                action_scores = layers.fully_connected(
+                    action_out,
+                    num_outputs=num_actions * num_atoms,
+                    activation_fn=None)
+            if num_atoms > 1:
+                # Distributional Q-learning uses a discrete support z
+                # to represent the action value distribution
+                z = tf.range(num_atoms, dtype=tf.float32)
+                z = v_min + z * (v_max - v_min) / float(num_atoms - 1)
+                support_logits_per_action = tf.reshape(
+                    tensor=action_scores, shape=(-1, num_actions, num_atoms))
+                support_prob_per_action = tf.nn.softmax(
+                    logits=support_logits_per_action)
+                action_scores = tf.reduce_sum(
+                    input_tensor=z * support_prob_per_action, axis=-1)
+                self.logits = support_logits_per_action
+                self.dist = support_prob_per_action
+            else:
+                self.logits = tf.expand_dims(tf.ones_like(action_scores), -1)
+                self.dist = tf.expand_dims(tf.ones_like(action_scores), -1)
 
         if dueling:
             with tf.variable_scope("state_value"):
                 state_out = model.last_layer
-                for hidden in hiddens:
-                    state_out = layers.fully_connected(
+                for i in range(len(hiddens)):
+                    if use_noisy:
+                        state_out = self.noisy_layer("dueling_hidden_%d" % i,
+                                                     state_out, hiddens[i],
+                                                     sigma0)
+                    else:
+                        state_out = layers.fully_connected(
+                            state_out,
+                            num_outputs=hiddens[i],
+                            activation_fn=tf.nn.relu)
+                if use_noisy:
+                    state_score = self.noisy_layer(
+                        "dueling_output",
                         state_out,
-                        num_outputs=hidden,
-                        activation_fn=tf.nn.relu)
-                state_score = layers.fully_connected(
-                    state_out, num_outputs=1, activation_fn=None)
-            action_scores_mean = tf.reduce_mean(action_scores, 1)
-            action_scores_centered = action_scores - tf.expand_dims(
-                action_scores_mean, 1)
-            self.value = state_score + action_scores_centered
+                        num_atoms,
+                        sigma0,
+                        non_linear=False)
+                else:
+                    state_score = layers.fully_connected(
+                        state_out, num_outputs=num_atoms, activation_fn=None)
+            if num_atoms > 1:
+                support_logits_per_action_mean = tf.reduce_mean(
+                    support_logits_per_action, 1)
+                support_logits_per_action_centered = (
+                    support_logits_per_action - tf.expand_dims(
+                        support_logits_per_action_mean, 1))
+                support_logits_per_action = tf.expand_dims(
+                    state_score, 1) + support_logits_per_action_centered
+                support_prob_per_action = tf.nn.softmax(
+                    logits=support_logits_per_action)
+                self.value = tf.reduce_sum(
+                    input_tensor=z * support_prob_per_action, axis=-1)
+                self.logits = support_logits_per_action
+                self.dist = support_prob_per_action
+            else:
+                action_scores_mean = tf.reduce_mean(action_scores, 1)
+                action_scores_centered = action_scores - tf.expand_dims(
+                    action_scores_mean, 1)
+                self.value = state_score + action_scores_centered
         else:
             self.value = action_scores
 
+    def f_epsilon(self, x):
+        return tf.sign(x) * tf.sqrt(tf.abs(x))
+
+    def noisy_layer(self, prefix, action_in, out_size, sigma0,
+                    non_linear=True):
+        r"""
+        a common dense layer: y = w^{T}x + b
+        a noisy layer: y = (w + \epsilon_w*\sigma_w)^{T}x +
+            (b+\epsilon_b*\sigma_b)
+        where \epsilon are random variables sampled from factorized normal
+        distributions and \sigma are trainable variables which are expected to
+        vanish along the training procedure
+        """
+        in_size = int(action_in.shape[1])
+
+        epsilon_in = tf.random_normal(shape=[in_size])
+        epsilon_out = tf.random_normal(shape=[out_size])
+        epsilon_in = self.f_epsilon(epsilon_in)
+        epsilon_out = self.f_epsilon(epsilon_out)
+        epsilon_w = tf.matmul(
+            a=tf.expand_dims(epsilon_in, -1), b=tf.expand_dims(epsilon_out, 0))
+        epsilon_b = epsilon_out
+        sigma_w = tf.get_variable(
+            name=prefix + "_sigma_w",
+            shape=[in_size, out_size],
+            dtype=tf.float32,
+            initializer=tf.random_uniform_initializer(
+                minval=-1.0 / np.sqrt(float(in_size)),
+                maxval=1.0 / np.sqrt(float(in_size))))
+        # TF noise generation can be unreliable on GPU
+        # If generating the noise on the CPU,
+        # lowering sigma0 to 0.1 may be helpful
+        sigma_b = tf.get_variable(
+            name=prefix + "_sigma_b",
+            shape=[out_size],
+            dtype=tf.float32,  # 0.5~GPU, 0.1~CPU
+            initializer=tf.constant_initializer(
+                sigma0 / np.sqrt(float(in_size))))
+
+        w = tf.get_variable(
+            name=prefix + "_fc_w",
+            shape=[in_size, out_size],
+            dtype=tf.float32,
+            initializer=layers.xavier_initializer())
+        b = tf.get_variable(
+            name=prefix + "_fc_b",
+            shape=[out_size],
+            dtype=tf.float32,
+            initializer=tf.zeros_initializer())
+
+        action_activation = tf.nn.xw_plus_b(action_in, w + sigma_w * epsilon_w,
+                                            b + sigma_b * epsilon_b)
+
+        if not non_linear:
+            return action_activation
+        return tf.nn.relu(action_activation)
+
 
 class QValuePolicy(object):
     def __init__(self, q_values, observations, num_actions, stochastic, eps):
@@ -65,21 +191,67 @@ class QValuePolicy(object):
 class QLoss(object):
     def __init__(self,
                  q_t_selected,
+                 q_logits_t_selected,
                  q_tp1_best,
+                 q_dist_tp1_best,
                  importance_weights,
                  rewards,
                  done_mask,
                  gamma=0.99,
-                 n_step=1):
-        q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best
+                 n_step=1,
+                 num_atoms=1,
+                 v_min=-10.0,
+                 v_max=10.0):
 
-        # compute RHS of bellman equation
-        q_t_selected_target = rewards + gamma**n_step * q_tp1_best_masked
+        if num_atoms > 1:
+            # Distributional Q-learning which corresponds to an entropy loss
 
-        # compute the error (potentially clipped)
-        self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target)
-        self.loss = tf.reduce_mean(
-            importance_weights * _huber_loss(self.td_error))
+            z = tf.range(num_atoms, dtype=tf.float32)
+            z = v_min + z * (v_max - v_min) / float(num_atoms - 1)
+
+            # (batch_size, 1) * (1, num_atoms) = (batch_size, num_atoms)
+            r_tau = tf.expand_dims(
+                rewards, -1) + gamma**n_step * tf.expand_dims(
+                    1.0 - done_mask, -1) * tf.expand_dims(z, 0)
+            r_tau = tf.clip_by_value(r_tau, v_min, v_max)
+            b = (r_tau - v_min) / ((v_max - v_min) / float(num_atoms - 1))
+            lb = tf.floor(b)
+            ub = tf.ceil(b)
+            # indispensable judgement which is missed in most implementations
+            # when b happens to be an integer, lb == ub, so pr_j(s', a*) will
+            # be discarded because (ub-b) == (b-lb) == 0
+            floor_equal_ceil = tf.to_float(tf.less(ub - lb, 0.5))
+
+            l_project = tf.one_hot(
+                tf.cast(lb, dtype=tf.int32),
+                num_atoms)  # (batch_size, num_atoms, num_atoms)
+            u_project = tf.one_hot(
+                tf.cast(ub, dtype=tf.int32),
+                num_atoms)  # (batch_size, num_atoms, num_atoms)
+            ml_delta = q_dist_tp1_best * (ub - b + floor_equal_ceil)
+            mu_delta = q_dist_tp1_best * (b - lb)
+            ml_delta = tf.reduce_sum(
+                l_project * tf.expand_dims(ml_delta, -1), axis=1)
+            mu_delta = tf.reduce_sum(
+                u_project * tf.expand_dims(mu_delta, -1), axis=1)
+            m = ml_delta + mu_delta
+
+            # Rainbow paper claims that using this cross entropy loss for
+            # priority is robust and insensitive to `prioritized_replay_alpha`
+            self.td_error = tf.nn.softmax_cross_entropy_with_logits(
+                labels=m, logits=q_logits_t_selected)
+            self.loss = tf.reduce_mean(self.td_error * importance_weights)
+        else:
+            q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best
+
+            # compute RHS of bellman equation
+            q_t_selected_target = rewards + gamma**n_step * q_tp1_best_masked
+
+            # compute the error (potentially clipped)
+            self.td_error = (
+                q_t_selected - tf.stop_gradient(q_t_selected_target))
+            self.loss = tf.reduce_mean(
+                importance_weights * _huber_loss(self.td_error))
 
 
 class DQNPolicyGraph(TFPolicyGraph):
@@ -102,7 +274,8 @@ class DQNPolicyGraph(TFPolicyGraph):
 
         # Action Q network
         with tf.variable_scope(Q_SCOPE) as scope:
-            q_values = self._build_q_network(self.cur_observations)
+            q_values, q_logits, q_dist = self._build_q_network(
+                self.cur_observations)
             self.q_func_vars = _scope_vars(scope.name)
 
         # Action outputs
@@ -121,29 +294,43 @@ class DQNPolicyGraph(TFPolicyGraph):
 
         # q network evaluation
         with tf.variable_scope(Q_SCOPE, reuse=True):
-            q_t = self._build_q_network(self.obs_t)
+            q_t, q_logits_t, q_dist_t = self._build_q_network(self.obs_t)
 
         # target q network evalution
         with tf.variable_scope(Q_TARGET_SCOPE) as scope:
-            q_tp1 = self._build_q_network(self.obs_tp1)
+            q_tp1, q_logits_tp1, q_dist_tp1 = self._build_q_network(
+                self.obs_tp1)
             self.target_q_func_vars = _scope_vars(scope.name)
 
         # q scores for actions which we know were selected in the given state.
-        q_t_selected = tf.reduce_sum(
-            q_t * tf.one_hot(self.act_t, self.num_actions), 1)
+        one_hot_selection = tf.one_hot(self.act_t, self.num_actions)
+        q_t_selected = tf.reduce_sum(q_t * one_hot_selection, 1)
+        q_logits_t_selected = tf.reduce_sum(
+            q_logits_t * tf.expand_dims(one_hot_selection, -1), 1)
 
         # compute estimate of best possible value starting from state at t + 1
         if config["double_q"]:
             with tf.variable_scope(Q_SCOPE, reuse=True):
-                q_tp1_using_online_net = self._build_q_network(self.obs_tp1)
+                q_tp1_using_online_net, q_logits_tp1_using_online_net, \
+                    q_dist_tp1_using_online_net = self._build_q_network(
+                        self.obs_tp1)
             q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1)
-            q_tp1_best = tf.reduce_sum(
-                q_tp1 * tf.one_hot(q_tp1_best_using_online_net,
-                                   self.num_actions), 1)
+            q_tp1_best_one_hot_selection = tf.one_hot(
+                q_tp1_best_using_online_net, self.num_actions)
+            q_tp1_best = tf.reduce_sum(q_tp1 * q_tp1_best_one_hot_selection, 1)
+            q_dist_tp1_best = tf.reduce_sum(
+                q_dist_tp1 * tf.expand_dims(q_tp1_best_one_hot_selection, -1),
+                1)
         else:
-            q_tp1_best = tf.reduce_max(q_tp1, 1)
+            q_tp1_best_one_hot_selection = tf.one_hot(
+                tf.argmax(q_tp1, 1), self.num_actions)
+            q_tp1_best = tf.reduce_sum(q_tp1 * q_tp1_best_one_hot_selection, 1)
+            q_dist_tp1_best = tf.reduce_sum(
+                q_dist_tp1 * tf.expand_dims(q_tp1_best_one_hot_selection, -1),
+                1)
 
-        self.loss = self._build_q_loss(q_t_selected, q_tp1_best)
+        self.loss = self._build_q_loss(q_t_selected, q_logits_t_selected,
+                                       q_tp1_best, q_dist_tp1_best)
 
         # update_target_fn will be called periodically to copy Q network to
         # target Q network
@@ -176,22 +363,29 @@ class DQNPolicyGraph(TFPolicyGraph):
         self.sess.run(tf.global_variables_initializer())
 
     def _build_q_network(self, obs):
-        return QNetwork(
-            ModelCatalog.get_model(obs, 1,
-                                   self.config["model"]), self.num_actions,
-            self.config["dueling"], self.config["hiddens"]).value
+        qnet = QNetwork(
+            ModelCatalog.get_model(obs, 1, self.config["model"]),
+            self.num_actions, self.config["dueling"], self.config["hiddens"],
+            self.config["noisy"], self.config["num_atoms"],
+            self.config["v_min"], self.config["v_max"], self.config["sigma0"])
+        return qnet.value, qnet.logits, qnet.dist
 
     def _build_q_value_policy(self, q_values):
         return QValuePolicy(q_values, self.cur_observations, self.num_actions,
                             self.stochastic, self.eps).action
 
-    def _build_q_loss(self, q_t_selected, q_tp1_best):
-        return QLoss(q_t_selected, q_tp1_best, self.importance_weights,
-                     self.rew_t, self.done_mask, self.config["gamma"],
-                     self.config["n_step"])
+    def _build_q_loss(self, q_t_selected, q_logits_t_selected, q_tp1_best,
+                      q_dist_tp1_best):
+        return QLoss(q_t_selected, q_logits_t_selected, q_tp1_best,
+                     q_dist_tp1_best, self.importance_weights, self.rew_t,
+                     self.done_mask, self.config["gamma"],
+                     self.config["n_step"], self.config["num_atoms"],
+                     self.config["v_min"], self.config["v_max"])
 
     def optimizer(self):
-        return tf.train.AdamOptimizer(learning_rate=self.config["lr"])
+        return tf.train.AdamOptimizer(
+            learning_rate=self.config["lr"],
+            epsilon=self.config["adam_epsilon"])
 
     def gradients(self, optimizer):
         if self.config["grad_norm_clipping"] is not None:
diff --git a/python/ray/rllib/optimizers/sync_replay_optimizer.py b/python/ray/rllib/optimizers/sync_replay_optimizer.py
index 900f009dd..2993f883f 100644
--- a/python/ray/rllib/optimizers/sync_replay_optimizer.py
+++ b/python/ray/rllib/optimizers/sync_replay_optimizer.py
@@ -14,6 +14,7 @@ from ray.rllib.evaluation.sample_batch import SampleBatch, DEFAULT_POLICY_ID, \
 from ray.rllib.utils.compression import pack_if_needed
 from ray.rllib.utils.filter import RunningStat
 from ray.rllib.utils.timer import TimerStat
+from ray.rllib.utils.schedules import LinearSchedule
 
 
 class SyncReplayOptimizer(PolicyOptimizer):
@@ -29,12 +30,20 @@ class SyncReplayOptimizer(PolicyOptimizer):
               prioritized_replay=True,
               prioritized_replay_alpha=0.6,
               prioritized_replay_beta=0.4,
+              schedule_max_timesteps=100000,
+              beta_annealing_fraction=0.2,
+              final_prioritized_replay_beta=0.4,
               prioritized_replay_eps=1e-6,
               train_batch_size=32,
               sample_batch_size=4):
 
         self.replay_starts = learning_starts
-        self.prioritized_replay_beta = prioritized_replay_beta
+        # linearly annealing beta used in Rainbow paper
+        self.prioritized_replay_beta = LinearSchedule(
+            schedule_timesteps=int(
+                schedule_max_timesteps * beta_annealing_fraction),
+            initial_p=prioritized_replay_beta,
+            final_p=final_prioritized_replay_beta)
         self.prioritized_replay_eps = prioritized_replay_eps
         self.train_batch_size = train_batch_size
 
@@ -122,7 +131,8 @@ class SyncReplayOptimizer(PolicyOptimizer):
                     (obses_t, actions, rewards, obses_tp1, dones, weights,
                      batch_indexes) = replay_buffer.sample(
                          self.train_batch_size,
-                         beta=self.prioritized_replay_beta)
+                         beta=self.prioritized_replay_beta.value(
+                             self.num_steps_trained))
                 else:
                     (obses_t, actions, rewards, obses_tp1,
                      dones) = replay_buffer.sample(self.train_batch_size)
diff --git a/python/ray/rllib/tuned_examples/pong-rainbow.yaml b/python/ray/rllib/tuned_examples/pong-rainbow.yaml
new file mode 100644
index 000000000..5226d57fe
--- /dev/null
+++ b/python/ray/rllib/tuned_examples/pong-rainbow.yaml
@@ -0,0 +1,29 @@
+pong-deterministic-rainbow:
+    env: PongDeterministic-v4
+    run: DQN
+    stop:
+        episode_reward_mean: 20
+    config:
+        num_atoms: 51
+        noisy: True
+        gamma: 0.99
+        lr: .0001
+        hiddens: [512]
+        learning_starts: 10000
+        buffer_size: 50000
+        sample_batch_size: 4
+        train_batch_size: 32
+        schedule_max_timesteps: 2000000
+        exploration_final_eps: 0.0
+        exploration_fraction: .000001
+        target_network_update_freq: 500
+        prioritized_replay: True
+        prioritized_replay_alpha: 0.5
+        beta_annealing_fraction: 0.2
+        final_prioritized_replay_beta: 1.0
+        n_step: 3
+        gpu: True
+        model:
+          grayscale: True
+          zero_mean: False
+          dim: 42