diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py index fb97fe742..9f7e43b14 100644 --- a/python/ray/rllib/dqn/dqn.py +++ b/python/ray/rllib/dqn/dqn.py @@ -7,6 +7,7 @@ import time import numpy as np import pickle import os +import sys import tensorflow as tf import ray @@ -15,90 +16,99 @@ from ray.rllib.dqn import logger, models from ray.rllib.dqn.common.wrappers import wrap_dqn from ray.rllib.dqn.common.schedules import LinearSchedule from ray.rllib.dqn.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer +from ray.rllib.ppo.filter import RunningStat from ray.tune.result import TrainingResult -"""The default configuration dict for the DQN algorithm. - - dueling: bool - whether to use dueling dqn - double_q: bool - whether to use double dqn - hiddens: array - hidden layer sizes of the state and action value networks - model: dict - config options to pass to the model constructor - lr: float - learning rate for adam optimizer - schedule_max_timesteps: int - max num timesteps for annealing schedules - timesteps_per_iteration: int - number of env steps to optimize for before returning - buffer_size: int - size of the replay buffer - exploration_fraction: float - fraction of entire training period over which the exploration rate is - annealed - exploration_final_eps: float - final value of random action probability - sample_batch_size: int - update the replay buffer with this many samples at once - num_workers: int - the number of workers to use for parallel batch sample collection - train_batch_size: int - size of a batched sampled from replay buffer for training - print_freq: int - how often to print out training progress - set to None to disable printing - learning_starts: int - how many steps of the model to collect transitions for before learning - starts - gamma: float - discount factor - grad_norm_clipping: int or None - if not None, clip gradients during optimization at this value - target_network_update_freq: int - update the target 
network every `target_network_update_freq` steps. - prioritized_replay: True - if True prioritized replay buffer will be used. - prioritized_replay_alpha: float - alpha parameter for prioritized replay buffer - prioritized_replay_beta0: float - initial value of beta for prioritized replay buffer - prioritized_replay_beta_iters: int - number of iterations over which beta will be annealed from initial - value to 1.0. If set to None equals to schedule_max_timesteps - prioritized_replay_eps: float - epsilon to add to the TD errors when updating priorities. - num_cpu: int - number of cpus to use for training -""" DEFAULT_CONFIG = dict( + # === Model === + # Whether to use dueling dqn dueling=True, + # Whether to use double dqn double_q=True, + # Hidden layer sizes of the state and action value networks hiddens=[256], + # Config options to pass to the model constructor model={}, - gpu_offset=0, - lr=5e-4, + # Discount factor for the MDP + gamma=0.99, + + # === Exploration === + # Max num timesteps for annealing schedules. Exploration is annealed from + # 1.0 to exploration_final_eps over this number of timesteps scaled by + # exploration_fraction schedule_max_timesteps=100000, + # Number of env steps to optimize for before returning timesteps_per_iteration=1000, - buffer_size=50000, + # Fraction of entire training period over which the exploration rate is + # annealed exploration_fraction=0.1, + # Final value of random action probability exploration_final_eps=0.02, - sample_batch_size=1, - num_workers=1, - train_batch_size=32, - print_freq=1, + # How many steps of the model to sample before learning starts. learning_starts=1000, - gamma=1.0, - grad_norm_clipping=10, + # Update the target network every `target_network_update_freq` steps. target_network_update_freq=500, - prioritized_replay=False, + + # === Replay buffer === + # Size of the replay buffer. Note that if async_updates is set, then each + # worker will have a replay buffer of this size. 
+ buffer_size=50000, + # If True prioritized replay buffer will be used. + prioritized_replay=True, + # Alpha parameter for prioritized replay buffer prioritized_replay_alpha=0.6, + # Initial value of beta for prioritized replay buffer prioritized_replay_beta0=0.4, + # Number of iterations over which beta will be annealed from initial + # value to 1.0. If set to None equals to schedule_max_timesteps prioritized_replay_beta_iters=None, + # Epsilon to add to the TD errors when updating priorities. prioritized_replay_eps=1e-6, - num_cpu=16) + + # === Optimization === + # Learning rate for adam optimizer + lr=5e-4, + # Update the replay buffer with this many samples at once. Note that this + # setting applies per-worker if num_workers > 1. + sample_batch_size=1, + # Size of a batch sampled from replay buffer for training. Note that if + # async_updates is set, then each worker returns gradients for a batch of + # this size. + train_batch_size=32, + # SGD minibatch size. Note that this must be << train_batch_size. This + # config has no effect if gradients_on_workers is True. + sgd_batch_size=32, + # If not None, clip gradients during optimization at this value + grad_norm_clipping=10, + + # === Tensorflow === + # Arguments to pass to tensorflow + tf_session_args={ + "device_count": {"CPU": 2}, + "log_device_placement": False, + "allow_soft_placement": True, + "inter_op_parallelism_threads": 1, + "intra_op_parallelism_threads": 1, + }, + + # === Parallelism === + # Number of workers to collect samples with. Note that the typical + # setting is 1 unless your environment is particularly slow to sample. + num_workers=1, + # Whether to allocate GPUs for workers (if num_workers > 1). + use_gpu_for_workers=False, + # (Experimental) Whether to update the model asynchronously from + # workers. In this mode, gradients will be computed on workers instead of + # on the driver, and workers will each have their own replay buffer. 
+ async_updates=False, + # (Experimental) Whether to use multiple GPUs for SGD optimization. + # Note that this only helps performance if the SGD batch size is large. + multi_gpu_optimize=False, + # Number of SGD iterations over the data. Only applies in multi-gpu mode. + num_sgd_iter=1, + # Devices to use for parallel SGD. Only applies in multi-gpu mode. + devices=["/gpu:0"]) class Actor(object): @@ -108,12 +118,9 @@ class Actor(object): self.env = env self.config = config - num_cpu = config["num_cpu"] - tf_config = tf.ConfigProto( - inter_op_parallelism_threads=num_cpu, - intra_op_parallelism_threads=num_cpu) + tf_config = tf.ConfigProto(**config["tf_session_args"]) self.sess = tf.Session(config=tf_config) - self.dqn_graph = models.DQNGraph(env, config) + self.dqn_graph = models.DQNGraph(env, config, logdir) # Create the replay buffer if config["prioritized_replay"]: @@ -143,8 +150,13 @@ class Actor(object): # Initialize the parameters and copy them to the target network. self.sess.run(tf.global_variables_initializer()) self.dqn_graph.update_target(self.sess) + self.set_weights_time = RunningStat(()) + self.sample_time = RunningStat(()) + self.grad_time = RunningStat(()) + + # Note that workers don't need target vars to be synced self.variables = ray.experimental.TensorFlowVariables( - tf.group(self.dqn_graph.q_tp1, self.dqn_graph.q_t), self.sess) + tf.group(self.dqn_graph.q_t, self.dqn_graph.q_tp1), self.sess) self.episode_rewards = [0.0] self.episode_lengths = [0.0] @@ -153,7 +165,7 @@ class Actor(object): self.file_writer = tf.summary.FileWriter(logdir, self.sess.graph) def step(self, cur_timestep): - # Take action and update exploration to the newest value + """Takes a single step, and returns the result of the step.""" action = self.dqn_graph.act( self.sess, np.array(self.obs)[None], self.exploration.value(cur_timestep))[0] @@ -168,12 +180,27 @@ class Actor(object): self.episode_lengths.append(0.0) return ret - def do_steps(self, num_steps, cur_timestep): - 
for _ in range(num_steps): - obs, action, rew, new_obs, done = self.step(cur_timestep) - self.replay_buffer.add(obs, action, rew, new_obs, done) + def do_steps(self, num_steps, cur_timestep, store): + """Takes N steps. - def get_gradient(self, cur_timestep): + If store is True, the steps will be stored in the local replay buffer. + Otherwise, the steps will be returned. + """ + + output = [] + for _ in range(num_steps): + result = self.step(cur_timestep) + if store: + obs, action, rew, new_obs, done = result + self.replay_buffer.add(obs, action, rew, new_obs, done) + else: + output.append(result) + if not store: + return output + + def do_multi_gpu_optimize(self, cur_timestep): + """Performs N iters of multi-gpu SGD over the local replay buffer.""" + dt = time.time() if self.config["prioritized_replay"]: experience = self.replay_buffer.sample( self.config["train_batch_size"], @@ -184,6 +211,73 @@ class Actor(object): obses_t, actions, rewards, obses_tp1, dones = \ self.replay_buffer.sample(self.config["train_batch_size"]) batch_idxes = None + replay_buffer_read_time = (time.time() - dt) + dt = time.time() + tuples_per_device = self.dqn_graph.multi_gpu_optimizer.load_data( + self.sess, + [obses_t, actions, rewards, obses_tp1, dones, + np.ones_like(rewards)]) + per_device_batch_size = ( + self.dqn_graph.multi_gpu_optimizer.per_device_batch_size) + num_batches = (int(tuples_per_device) // int(per_device_batch_size)) + data_load_time = (time.time() - dt) + dt = time.time() + for _ in range(self.config["num_sgd_iter"]): + batches = list(range(num_batches)) + np.random.shuffle(batches) + for i in batches: + self.dqn_graph.multi_gpu_optimizer.optimize( + self.sess, i * per_device_batch_size) + sgd_time = (time.time() - dt) + dt = time.time() + if self.config["prioritized_replay"]: + dt = time.time() + td_errors = self.dqn_graph.compute_td_error( + self.sess, obses_t, actions, rewards, obses_tp1, dones, + np.ones_like(rewards)) + dt = time.time() + new_priorities = ( + 
np.abs(td_errors) + self.config["prioritized_replay_eps"]) + self.replay_buffer.update_priorities( + batch_idxes, new_priorities) + prioritization_time = (time.time() - dt) + return { + "replay_buffer_read_time": replay_buffer_read_time, + "data_load_time": data_load_time, + "sgd_time": sgd_time, + "prioritization_time": prioritization_time, + } + + def do_async_step(self, worker_id, cur_timestep, params, gradient_id): + """Takes steps and returns grad to apply async in the driver.""" + dt = time.time() + self.set_weights(params) + self.set_weights_time.push(time.time() - dt) + dt = time.time() + self.do_steps( + self.config["sample_batch_size"], cur_timestep, store=True) + self.sample_time.push(time.time() - dt) + if (cur_timestep > self.config["learning_starts"] and + len(self.replay_buffer) > self.config["train_batch_size"]): + dt = time.time() + gradient = self.sample_buffer_gradient(cur_timestep) + self.grad_time.push(time.time() - dt) + else: + gradient = None + return gradient, {"id": worker_id, "gradient_id": gradient_id} + + def sample_buffer_gradient(self, cur_timestep): + """Returns grad over a batch sampled from the local replay buffer.""" + if self.config["prioritized_replay"]: + experience = self.replay_buffer.sample( + self.config["sgd_batch_size"], + beta=self.beta_schedule.value(cur_timestep)) + (obses_t, actions, rewards, obses_tp1, + dones, _, batch_idxes) = experience + else: + obses_t, actions, rewards, obses_tp1, dones = \ + self.replay_buffer.sample(self.config["sgd_batch_size"]) + batch_idxes = None td_errors, grad = self.dqn_graph.compute_gradients( self.sess, obses_t, actions, rewards, obses_tp1, dones, np.ones_like(rewards)) @@ -197,6 +291,8 @@ class Actor(object): def apply_gradients(self, grad): self.dqn_graph.apply_gradients(self.sess, grad) + # TODO(ekl) return a dictionary and use that everywhere to clean up the + # bookkeeping of stats def stats(self, num_timesteps): mean_100ep_reward = round(np.mean(self.episode_rewards[-101:-1]), 
5) mean_100ep_length = round(np.mean(self.episode_lengths[-101:-1]), 5) @@ -206,7 +302,10 @@ class Actor(object): mean_100ep_length, len(self.episode_rewards), exploration, - len(self.replay_buffer)) + len(self.replay_buffer), + float(self.set_weights_time.mean), + float(self.sample_time.mean), + float(self.grad_time.mean)) def get_weights(self): return self.variables.get_weights() @@ -236,22 +335,44 @@ class Actor(object): @ray.remote class RemoteActor(Actor): - def __init__(self, env_creator, config, logdir, gpu_mask): - os.environ["CUDA_VISIBLE_DEVICES"] = gpu_mask + def __init__(self, env_creator, config, logdir): Actor.__init__(self, env_creator, config, logdir) + def stop(self): + sys.exit(0) + + +@ray.remote(num_gpus=1) +class GPURemoteActor(Actor): + def __init__(self, env_creator, config, logdir): + Actor.__init__(self, env_creator, config, logdir) + + def stop(self): + sys.exit(0) + class DQNAgent(Agent): _agent_name = "DQN" _default_config = DEFAULT_CONFIG + def stop(self): + for w in self.workers: + w.stop.remote() + def _init(self): self.actor = Actor(self.env_creator, self.config, self.logdir) - self.workers = [ - RemoteActor.remote( - self.env_creator, self.config, self.logdir, - "{}".format(i + self.config["gpu_offset"])) - for i in range(self.config["num_workers"])] + if self.config["use_gpu_for_workers"]: + remote_cls = GPURemoteActor + else: + remote_cls = RemoteActor + # Use remote workers + if self.config["num_workers"] > 1 or self.config["async_updates"]: + self.workers = [ + remote_cls.remote(self.env_creator, self.config, self.logdir) + for i in range(self.config["num_workers"])] + else: + # Use a single local worker and avoid object store overheads + self.workers = [] self.cur_timestep = 0 self.num_iterations = 0 @@ -262,52 +383,169 @@ class DQNAgent(Agent): self.saver = tf.train.Saver(max_to_keep=None) def _update_worker_weights(self): - w = self.actor.get_weights() - weights = ray.put(self.actor.get_weights()) - for w in self.workers: - 
w.set_weights.remote(weights) + if self.workers: + w = self.actor.get_weights() + weights = ray.put(self.actor.get_weights()) + for w in self.workers: + w.set_weights.remote(weights) def _train(self): + if self.config["async_updates"]: + return self._train_async() + else: + return self._train_sync() + + def _train_async(self): + apply_time = RunningStat(()) + wait_time = RunningStat(()) + gradient_lag = RunningStat(()) + iter_init_timesteps = self.cur_timestep + num_gradients_applied = 0 + gradient_list = [ + worker.do_async_step.remote( + i, self.cur_timestep, self.actor.get_weights(), + num_gradients_applied) + for i, worker in enumerate(self.workers)] + steps = self.config["sample_batch_size"] * len(gradient_list) + self.cur_timestep += steps + self.steps_since_update += steps + + while gradient_list: + dt = time.time() + gradient, info = ray.get(gradient_list[0]) + gradient_list = gradient_list[1:] + wait_time.push(time.time() - dt) + + if gradient is not None: + dt = time.time() + self.actor.apply_gradients(gradient) + apply_time.push(time.time() - dt) + gradient_lag.push(num_gradients_applied - info["gradient_id"]) + num_gradients_applied += 1 + + if (self.cur_timestep - iter_init_timesteps < + self.config["timesteps_per_iteration"]): + worker_id = info["id"] + gradient_list.append( + self.workers[info["id"]].do_async_step.remote( + worker_id, self.cur_timestep, + self.actor.get_weights(), num_gradients_applied)) + self.cur_timestep += self.config["sample_batch_size"] + self.steps_since_update += self.config["sample_batch_size"] + + if (self.cur_timestep > self.config["learning_starts"] and + self.steps_since_update > + self.config["target_network_update_freq"]): + # Update target network periodically. 
+ self.actor.dqn_graph.update_target(self.actor.sess) + self.steps_since_update -= ( + self.config["target_network_update_freq"]) + self.num_target_updates += 1 + + mean_100ep_reward = 0.0 + mean_100ep_length = 0.0 + num_episodes = 0 + buffer_size_sum = 0 + stats = ray.get( + [w.stats.remote(self.cur_timestep) for w in self.workers]) + for stat in stats: + mean_100ep_reward += stat[0] + mean_100ep_length += stat[1] + num_episodes += stat[2] + exploration = stat[3] + buffer_size_sum += stat[4] + set_weights_time = stat[5] + sample_time = stat[6] + grad_time = stat[7] + mean_100ep_reward /= self.config["num_workers"] + mean_100ep_length /= self.config["num_workers"] + + info = [ + ("mean_100ep_reward", mean_100ep_reward), + ("exploration_frac", exploration), + ("steps", self.cur_timestep), + ("episodes", num_episodes), + ("buffer_sizes_sum", buffer_size_sum), + ("target_updates", self.num_target_updates), + ("mean_set_weights_time", set_weights_time), + ("mean_sample_time", sample_time), + ("mean_grad_time", grad_time), + ("mean_apply_time", float(apply_time.mean)), + ("mean_ray_wait_time", float(wait_time.mean)), + ("gradient_lag_mean", float(gradient_lag.mean)), + ("gradient_lag_stdev", float(gradient_lag.std)), + ] + + for k, v in info: + logger.record_tabular(k, v) + logger.dump_tabular() + + result = TrainingResult( + episode_reward_mean=mean_100ep_reward, + episode_len_mean=mean_100ep_length, + timesteps_this_iter=self.cur_timestep - iter_init_timesteps, + info=info) + + return result + + def _train_sync(self): config = self.config sample_time, sync_time, learn_time, apply_time = 0, 0, 0, 0 iter_init_timesteps = self.cur_timestep num_loop_iters = 0 - steps_per_iter = config["sample_batch_size"] * len(self.workers) while (self.cur_timestep - iter_init_timesteps < config["timesteps_per_iteration"]): dt = time.time() - ray.get([ - w.do_steps.remote( - config["sample_batch_size"], self.cur_timestep) - for w in self.workers]) + if self.workers: + worker_steps = 
ray.get([ + w.do_steps.remote( + config["sample_batch_size"] // len(self.workers), + self.cur_timestep, store=False) + for w in self.workers]) + for steps in worker_steps: + for obs, action, rew, new_obs, done in steps: + self.actor.replay_buffer.add( + obs, action, rew, new_obs, done) + else: + self.actor.do_steps( + config["sample_batch_size"], self.cur_timestep, store=True) num_loop_iters += 1 - self.cur_timestep += steps_per_iter - self.steps_since_update += steps_per_iter + self.cur_timestep += config["sample_batch_size"] + self.steps_since_update += config["sample_batch_size"] sample_time += time.time() - dt if self.cur_timestep > config["learning_starts"]: + if config["multi_gpu_optimize"]: + dt = time.time() + times = self.actor.do_multi_gpu_optimize(self.cur_timestep) + if num_loop_iters <= 1: + print("Multi-GPU times", times) + learn_time += (time.time() - dt) + else: + # Minimize the error in Bellman's equation on a batch + # sampled from replay buffer. + for _ in range( + max(1, config["train_batch_size"] // + config["sgd_batch_size"])): + dt = time.time() + gradients = [ + self.actor.sample_buffer_gradient( + self.cur_timestep)] + learn_time += (time.time() - dt) + dt = time.time() + for grad in gradients: + self.actor.apply_gradients(grad) + apply_time += (time.time() - dt) dt = time.time() - # Minimize the error in Bellman's equation on a batch sampled - # from replay buffer. self._update_worker_weights() sync_time += (time.time() - dt) - dt = time.time() - gradients = ray.get( - [w.get_gradient.remote(self.cur_timestep) - for w in self.workers]) - learn_time += (time.time() - dt) - dt = time.time() - for grad in gradients: - self.actor.apply_gradients(grad) - apply_time += (time.time() - dt) if (self.cur_timestep > config["learning_starts"] and self.steps_since_update > config["target_network_update_freq"]): - self.actor.dqn_graph.update_target(self.actor.sess) # Update target network periodically. 
- self._update_worker_weights() + self.actor.dqn_graph.update_target(self.actor.sess) self.steps_since_update -= config["target_network_update_freq"] self.num_target_updates += 1 @@ -315,14 +553,21 @@ class DQNAgent(Agent): mean_100ep_length = 0.0 num_episodes = 0 buffer_size_sum = 0 + if not self.workers: + stats = self.actor.stats(self.cur_timestep) + mean_100ep_reward += stats[0] + mean_100ep_length += stats[1] + num_episodes += stats[2] + exploration = stats[3] + buffer_size_sum += stats[4] for mean_rew, mean_len, episodes, exploration, buf_sz in ray.get( [w.stats.remote(self.cur_timestep) for w in self.workers]): mean_100ep_reward += mean_rew mean_100ep_length += mean_len num_episodes += episodes buffer_size_sum += buf_sz - mean_100ep_reward /= len(self.workers) - mean_100ep_length /= len(self.workers) + mean_100ep_reward /= config["num_workers"] + mean_100ep_length /= config["num_workers"] info = [ ("mean_100ep_reward", mean_100ep_reward), @@ -336,10 +581,11 @@ class DQNAgent(Agent): ("apply_time", apply_time), ("learn_time", learn_time), ("samples_per_s", - num_loop_iters * np.float64(steps_per_iter) / sample_time), + num_loop_iters * np.float64(config["sample_batch_size"]) / + sample_time), ("learn_samples_per_s", - num_loop_iters * np.float64(config["train_batch_size"]) * - np.float64(config["num_workers"]) / learn_time), + num_loop_iters * np.float64(config["train_batch_size"]) / + learn_time), ] for k, v in info: diff --git a/python/ray/rllib/dqn/models.py b/python/ray/rllib/dqn/models.py index 28fe422b9..8a6692198 100644 --- a/python/ray/rllib/dqn/models.py +++ b/python/ray/rllib/dqn/models.py @@ -6,6 +6,7 @@ import tensorflow as tf import tensorflow.contrib.layers as layers from ray.rllib.models import ModelCatalog +from ray.rllib.parallel import LocalSyncParallelOptimizer, TOWER_SCOPE_NAME def _build_q_network(inputs, num_actions, config): @@ -97,8 +98,56 @@ def _scope_vars(scope, trainable_only=False): scope=scope if isinstance(scope, str) else 
scope.name) +class ModelAndLoss(object): + """Holds the model and loss function. + + Both graphs are necessary in order for the multi-gpu SGD implementation + to create towers on each device. + """ + + def __init__( + self, num_actions, config, + obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights): + # q network evaluation + with tf.variable_scope("q_func", reuse=True): + self.q_t = _build_q_network(obs_t, num_actions, config) + + # target q network evaluation + with tf.variable_scope("target_q_func") as scope: + self.q_tp1 = _build_q_network(obs_tp1, num_actions, config) + self.target_q_func_vars = _scope_vars(scope.name) + + # q scores for actions which we know were selected in the given state. + q_t_selected = tf.reduce_sum( + self.q_t * tf.one_hot(act_t, num_actions), 1) + + # compute estimate of best possible value starting from state at t + 1 + if config["double_q"]: + with tf.variable_scope("q_func", reuse=True): + q_tp1_using_online_net = _build_q_network( + obs_tp1, num_actions, config) + q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1) + q_tp1_best = tf.reduce_sum( + self.q_tp1 * tf.one_hot( + q_tp1_best_using_online_net, num_actions), 1) + else: + q_tp1_best = tf.reduce_max(self.q_tp1, 1) + q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = rew_t + config["gamma"] * q_tp1_best_masked + + # compute the error (potentially clipped) + self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) + errors = _huber_loss(self.td_error) + + weighted_error = tf.reduce_mean(importance_weights * errors) + + self.loss = weighted_error + + class DQNGraph(object): - def __init__(self, env, config): + def __init__(self, env, config, logdir): self.env = env num_actions = env.action_space.n optimizer = tf.train.AdamOptimizer(learning_rate=config["lr"]) @@ -110,7 +159,11 @@ class DQNGraph(object): tf.float32, shape=(None,) + env.observation_space.shape) # Action Q network - with 
tf.variable_scope("q_func") as scope: + if config["multi_gpu_optimize"]: + q_scope_name = TOWER_SCOPE_NAME + "/q_func" + else: + q_scope_name = "q_func" + with tf.variable_scope(q_scope_name) as scope: q_values = _build_q_network( self.cur_observations, num_actions, config) q_func_vars = _scope_vars(scope.name) @@ -134,39 +187,34 @@ class DQNGraph(object): self.importance_weights = tf.placeholder( tf.float32, [None], name="weight") - # q network evaluation - with tf.variable_scope("q_func", reuse=True): - self.q_t = _build_q_network(self.obs_t, num_actions, config) + def build_loss( + obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights): + return ModelAndLoss( + num_actions, config, + obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights) - # target q network evalution - with tf.variable_scope("target_q_func") as scope: - self.q_tp1 = _build_q_network(self.obs_tp1, num_actions, config) - target_q_func_vars = _scope_vars(scope.name) - - # q scores for actions which we know were selected in the given state. 
- q_t_selected = tf.reduce_sum( - self.q_t * tf.one_hot(self.act_t, num_actions), 1) - - # compute estimate of best possible value starting from state at t + 1 - if config["double_q"]: - with tf.variable_scope("q_func", reuse=True): - q_tp1_using_online_net = _build_q_network( - self.obs_tp1, num_actions, config) - q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1) - q_tp1_best = tf.reduce_sum( - self.q_tp1 * tf.one_hot( - q_tp1_best_using_online_net, num_actions), 1) + if config["multi_gpu_optimize"]: + self.multi_gpu_optimizer = LocalSyncParallelOptimizer( + optimizer, + config["devices"], + [self.obs_t, self.act_t, self.rew_t, self.obs_tp1, + self.done_mask, self.importance_weights], + int(config["sgd_batch_size"] / len(config["devices"])), + build_loss, + logdir, + grad_norm_clipping=config["grad_norm_clipping"]) + loss_obj = self.multi_gpu_optimizer.get_common_loss() else: - q_tp1_best = tf.reduce_max(self.q_tp1, 1) - q_tp1_best_masked = (1.0 - self.done_mask) * q_tp1_best + loss_obj = build_loss( + self.obs_t, self.act_t, self.rew_t, self.obs_tp1, + self.done_mask, self.importance_weights) - # compute RHS of bellman equation - q_t_selected_target = self.rew_t + config["gamma"] * q_tp1_best_masked + weighted_error = loss_obj.loss + target_q_func_vars = loss_obj.target_q_func_vars + self.q_t = loss_obj.q_t + self.q_tp1 = loss_obj.q_tp1 + self.td_error = loss_obj.td_error - # compute the error (potentially clipped) - self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) - errors = _huber_loss(self.td_error) - weighted_error = tf.reduce_mean(self.importance_weights * errors) # compute optimization op (potentially with gradient clipping) if config["grad_norm_clipping"] is not None: self.grads_and_vars = _minimize_and_clip( @@ -216,6 +264,21 @@ class DQNGraph(object): }) return td_err, grads + def compute_td_error( + self, sess, obs_t, act_t, rew_t, obs_tp1, done_mask, + importance_weights): + td_err = sess.run( + self.td_error, + 
feed_dict={ + self.obs_t: obs_t, + self.act_t: act_t, + self.rew_t: rew_t, + self.obs_tp1: obs_tp1, + self.done_mask: done_mask, + self.importance_weights: importance_weights + }) + return td_err + def apply_gradients(self, sess, grads): assert len(grads) == len(self.grads_and_vars) feed_dict = {ph: g for (g, ph) in zip(grads, self.grads)} diff --git a/python/ray/rllib/dqn/test/example-cartpole.py b/python/ray/rllib/dqn/test/example-cartpole.py deleted file mode 100755 index 080ab5f79..000000000 --- a/python/ray/rllib/dqn/test/example-cartpole.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse - -import ray -from ray.rllib.dqn import DQNAgent, DEFAULT_CONFIG - - -def main(): - parser = argparse.ArgumentParser(description="Run the DQN algorithm.") - parser.add_argument("--iterations", default=-1, type=int, - help="The number of training iterations to run.") - - args = parser.parse_args() - - config = DEFAULT_CONFIG.copy() - config.update(dict( - lr=1e-3, - schedule_max_timesteps=100000, - exploration_fraction=0.1, - exploration_final_eps=0.02, - dueling=False, - hiddens=[], - model_config=dict( - fcnet_hiddens=[64], - fcnet_activation='relu', - ))) - - ray.init() - dqn = DQNAgent("CartPole-v0", config) - - iteration = 0 - while iteration != args.iterations: - iteration += 1 - res = dqn.train() - print("current status: {}".format(res)) - - -if __name__ == "__main__": - main() diff --git a/python/ray/rllib/dqn/test/example-pong.py b/python/ray/rllib/dqn/test/example-pong.py deleted file mode 100755 index 24afde4a7..000000000 --- a/python/ray/rllib/dqn/test/example-pong.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse - -import ray -from ray.rllib.dqn import DQNAgent, DEFAULT_CONFIG - - -def 
main(): - parser = argparse.ArgumentParser(description="Run the DQN algorithm.") - parser.add_argument("--iterations", default=-1, type=int, - help="The number of training iterations to run.") - - args = parser.parse_args() - - config = DEFAULT_CONFIG.copy() - config.update(dict( - lr=1e-4, - schedule_max_timesteps=2000000, - exploration_fraction=0.1, - exploration_final_eps=0.01, - train_freq=4, - learning_starts=10000, - target_network_update_freq=1000, - gamma=0.99, - prioritized_replay=True)) - - ray.init() - dqn = DQNAgent("PongNoFrameskip-v4", config) - - iteration = 0 - while iteration != args.iterations: - iteration += 1 - res = dqn.train() - print("current status: {}".format(res)) - - -if __name__ == "__main__": - main() diff --git a/python/ray/rllib/parallel.py b/python/ray/rllib/parallel.py index 5deff3c12..e2281f663 100644 --- a/python/ray/rllib/parallel.py +++ b/python/ray/rllib/parallel.py @@ -9,6 +9,10 @@ from tensorflow.python.client import timeline import tensorflow as tf +# Variable scope in which created variables will be placed under +TOWER_SCOPE_NAME = "tower" + + class LocalSyncParallelOptimizer(object): """Optimizer that runs in parallel across multiple local devices. @@ -41,10 +45,12 @@ class LocalSyncParallelOptimizer(object): object with a 'loss' property that is a scalar Tensor. For example, ray.rllib.ppo.ProximalPolicyLoss. logdir: Directory to place debugging output in. 
+ grad_norm_clipping: None, or max norm to clip each averaged gradient to """ def __init__(self, optimizer, devices, input_placeholders, - per_device_batch_size, build_loss, logdir): + per_device_batch_size, build_loss, logdir, + grad_norm_clipping=None): self.optimizer = optimizer self.devices = devices self.batch_size = per_device_batch_size * len(devices) @@ -54,7 +60,7 @@ class LocalSyncParallelOptimizer(object): self.logdir = logdir # First initialize the shared loss network - with tf.variable_scope("tower"): + with tf.variable_scope(TOWER_SCOPE_NAME): self._shared_loss = build_loss(*input_placeholders) # Then setup the per-device loss graphs that use the shared weights @@ -67,6 +73,10 @@ class LocalSyncParallelOptimizer(object): device_placeholders)) avg = average_gradients([t.grads for t in self._towers]) + if grad_norm_clipping: + for i, (grad, var) in enumerate(avg): + if grad is not None: + avg[i] = (tf.clip_by_norm(grad, grad_norm_clipping), var) self._train_op = self.optimizer.apply_gradients(avg) def load_data(self, sess, inputs, full_trace=False): @@ -173,7 +183,7 @@ class LocalSyncParallelOptimizer(object): def _setup_device(self, device, device_input_placeholders): with tf.device(device): - with tf.variable_scope("tower", reuse=True): + with tf.variable_scope(TOWER_SCOPE_NAME, reuse=True): device_input_batches = [] device_input_slices = [] for ph in device_input_placeholders: @@ -239,6 +249,9 @@ def average_gradients(tower_grads): # below. grads.append(expanded_g) + if not grads: + continue + # Average over the 'tower' dimension. 
grad = tf.concat(axis=0, values=grads) grad = tf.reduce_mean(grad, 0) diff --git a/python/ray/rllib/tuned_examples/pong-dqn.yaml b/python/ray/rllib/tuned_examples/pong-dqn.yaml index 015698835..7a43853ab 100644 --- a/python/ray/rllib/tuned_examples/pong-dqn.yaml +++ b/python/ray/rllib/tuned_examples/pong-dqn.yaml @@ -1,4 +1,4 @@ -pong-dqn: +pong-deterministic-dqn: env: PongDeterministic-v4 alg: DQN resources: @@ -7,3 +7,50 @@ pong-dqn: stop: episode_reward_mean: 20 time_total_s: 7200 + config: + gamma: 0.99 + lr: .0001 + learning_starts: 10000 + buffer_size: 50000 + sample_batch_size: 4 + train_batch_size: 32 + schedule_max_timesteps: 2000000 + exploration_final_eps: .01 + exploration_fraction: .1 + model: + grayscale: True + zero_mean: False + dim: 42 + conv_filters: [ + [16, [4, 4], 2], + [32, [4, 4], 2], + [512, [11, 11], 1], + ] +pong-noframeskip-dqn: + env: PongNoFrameskip-v4 + alg: DQN + resources: + cpu: 1 + gpu: 1 + stop: + episode_reward_mean: 20 + time_total_s: 7200 + config: + gamma: 0.99 + lr: .0001 + learning_starts: 10000 + buffer_size: 50000 + sample_batch_size: 4 + train_batch_size: 32 + schedule_max_timesteps: 2000000 + exploration_final_eps: .01 + exploration_fraction: .1 + model: + grayscale: True + zero_mean: False + dim: 42 + conv_filters: [ + [16, [4, 4], 2], + [32, [4, 4], 2], + [512, [11, 11], 1], + ]