diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py index 77794ea16..5b9726b94 100644 --- a/python/ray/rllib/dqn/dqn.py +++ b/python/ray/rllib/dqn/dqn.py @@ -10,6 +10,7 @@ import pickle import os import tensorflow as tf +import ray from ray.rllib.common import Agent, TrainingResult from ray.rllib.dqn import logger, models from ray.rllib.dqn.common.atari_wrappers_deprecated \ @@ -41,17 +42,15 @@ from ray.rllib.dqn.replay_buffer import ReplayBuffer, PrioritizedReplayBuffer annealed exploration_final_eps: float final value of random action probability - train_freq: int - update the model every `train_freq` steps. - batch_size: int + sample_batch_size: int + update the replay buffer with this many samples at once + num_workers: int + the number of workers to use for parallel batch sample collection + train_batch_size: int size of a batched sampled from replay buffer for training print_freq: int how often to print out training progress set to None to disable printing - checkpoint_freq: int - how often to save the model. This is so that the best version is - restored at the end of the training. If you do not wish to restore - the best version at the end of the training set this variable to None. 
learning_starts: int how many steps of the model to collect transitions for before learning starts @@ -80,16 +79,17 @@ DEFAULT_CONFIG = dict( double_q=True, hiddens=[256], model={}, + gpu_offset=0, lr=5e-4, schedule_max_timesteps=100000, timesteps_per_iteration=1000, buffer_size=50000, exploration_fraction=0.1, exploration_final_eps=0.02, - train_freq=1, - batch_size=32, + sample_batch_size=1, + num_workers=1, + train_batch_size=32, print_freq=1, - checkpoint_freq=10000, learning_starts=1000, gamma=1.0, grad_norm_clipping=10, @@ -102,22 +102,14 @@ DEFAULT_CONFIG = dict( num_cpu=16) -class DQNAgent(Agent): - def __init__(self, env_name, config, upload_dir=None): - config.update({"alg": "DQN"}) - - Agent.__init__(self, env_name, config, upload_dir=upload_dir) - - with tf.Graph().as_default(): - self._init() - - def _init(self): - config = self.config - env = gym.make(self.env_name) +class Actor(object): + def __init__(self, env_name, config, logdir): + env = gym.make(env_name) # TODO(ekl): replace this with RLlib preprocessors - if "NoFrameskip" in self.env_name: + if "NoFrameskip" in env_name: env = ScaledFloatFrame(wrap_dqn(env)) self.env = env + self.config = config num_cpu = config["num_cpu"] tf_config = tf.ConfigProto( @@ -131,11 +123,11 @@ class DQNAgent(Agent): self.replay_buffer = PrioritizedReplayBuffer( config["buffer_size"], alpha=config["prioritized_replay_alpha"]) - prioritized_replay_beta_iters = ( - config["prioritized_replay_beta_iters"]) + prioritized_replay_beta_iters = \ + config["prioritized_replay_beta_iters"] if prioritized_replay_beta_iters is None: - prioritized_replay_beta_iters = ( - config["schedule_max_timesteps"]) + prioritized_replay_beta_iters = \ + config["schedule_max_timesteps"] self.beta_schedule = LinearSchedule( prioritized_replay_beta_iters, initial_p=config["prioritized_replay_beta0"], @@ -154,135 +146,245 @@ class DQNAgent(Agent): # Initialize the parameters and copy them to the target network. 
self.sess.run(tf.global_variables_initializer()) self.dqn_graph.update_target(self.sess) + self.variables = ray.experimental.TensorFlowVariables( + tf.group(self.dqn_graph.q_tp1, self.dqn_graph.q_t), self.sess) self.episode_rewards = [0.0] self.episode_lengths = [0.0] self.saved_mean_reward = None self.obs = self.env.reset() - self.num_timesteps = 0 + self.file_writer = tf.summary.FileWriter(logdir, self.sess.graph) + + def step(self, cur_timestep): + # Take action and update exploration to the newest value + action = self.dqn_graph.act( + self.sess, np.array(self.obs)[None], + self.exploration.value(cur_timestep))[0] + new_obs, rew, done, _ = self.env.step(action) + ret = (self.obs, action, rew, new_obs, float(done)) + self.obs = new_obs + self.episode_rewards[-1] += rew + self.episode_lengths[-1] += 1 + if done: + self.obs = self.env.reset() + self.episode_rewards.append(0.0) + self.episode_lengths.append(0.0) + return ret + + def do_steps(self, num_steps, cur_timestep): + for _ in range(num_steps): + obs, action, rew, new_obs, done = self.step(cur_timestep) + self.replay_buffer.add(obs, action, rew, new_obs, done) + + def get_gradient(self, cur_timestep): + if self.config["prioritized_replay"]: + experience = self.replay_buffer.sample( + self.config["train_batch_size"], + beta=self.beta_schedule.value(cur_timestep)) + (obses_t, actions, rewards, obses_tp1, + dones, _, batch_idxes) = experience + else: + obses_t, actions, rewards, obses_tp1, dones = \ + self.replay_buffer.sample(self.config["train_batch_size"]) + batch_idxes = None + td_errors, grad = self.dqn_graph.compute_gradients( + self.sess, obses_t, actions, rewards, obses_tp1, dones, + np.ones_like(rewards)) + if self.config["prioritized_replay"]: + new_priorities = ( + np.abs(td_errors) + self.config["prioritized_replay_eps"]) + self.replay_buffer.update_priorities( + batch_idxes, new_priorities) + return grad + + def apply_gradients(self, grad): + self.dqn_graph.apply_gradients(self.sess, grad) + + def 
stats(self, num_timesteps): + mean_100ep_reward = round(np.mean(self.episode_rewards[-101:-1]), 1) + mean_100ep_length = round(np.mean(self.episode_lengths[-101:-1]), 1) + exploration = self.exploration.value(num_timesteps) + return ( + mean_100ep_reward, + mean_100ep_length, + len(self.episode_rewards), + exploration, + len(self.replay_buffer)) + + def get_weights(self): + return self.variables.get_weights() + + def set_weights(self, weights): + self.variables.set_weights(weights) + + def save(self): + return [ + self.beta_schedule, + self.exploration, + self.episode_rewards, + self.episode_lengths, + self.saved_mean_reward, + self.obs] + + def restore(self, data): + self.beta_schedule = data[0] + self.exploration = data[1] + self.episode_rewards = data[2] + self.episode_lengths = data[3] + self.saved_mean_reward = data[4] + self.obs = data[5] + + +@ray.remote +class RemoteActor(Actor): + def __init__(self, env_name, config, logdir, gpu_mask): + os.environ["CUDA_VISIBLE_DEVICES"] = gpu_mask + Actor.__init__(self, env_name, config, logdir) + + +class DQNAgent(Agent): + def __init__(self, env_name, config, upload_dir=None): + config.update({"alg": "DQN"}) + + Agent.__init__(self, env_name, config, upload_dir=upload_dir) + + with tf.Graph().as_default(): + self._init(config, env_name) + + def _init(self, config, env_name): + self.actor = Actor(env_name, config, self.logdir) + self.workers = [ + RemoteActor.remote( + env_name, config, self.logdir, + "{}".format(i + config["gpu_offset"])) + for i in range(config["num_workers"])] + + self.cur_timestep = 0 self.num_iterations = 0 - self.file_writer = tf.summary.FileWriter(self.logdir, self.sess.graph) + self.num_target_updates = 0 + self.steps_since_update = 0 + self.file_writer = tf.summary.FileWriter( + self.logdir, self.actor.sess.graph) self.saver = tf.train.Saver(max_to_keep=None) + def _update_worker_weights(self): + local_weights = self.actor.get_weights() + weights = ray.put(local_weights) + for w in self.workers:
+ w.set_weights.remote(weights) + def _train(self): config = self.config - sample_time, learn_time = 0, 0 - iter_init_timesteps = self.num_timesteps + sample_time, sync_time, learn_time, apply_time = 0, 0, 0, 0 + iter_init_timesteps = self.cur_timestep - for _ in range(config["timesteps_per_iteration"]): - self.num_timesteps += 1 + num_loop_iters = 0 + steps_per_iter = config["sample_batch_size"] * len(self.workers) + while (self.cur_timestep - iter_init_timesteps < + config["timesteps_per_iteration"]): dt = time.time() - # Take action and update exploration to the newest value - action = self.dqn_graph.act( - self.sess, np.array(self.obs)[None], - self.exploration.value(self.num_timesteps))[0] - new_obs, rew, done, _ = self.env.step(action) - # Store transition in the replay buffer. - self.replay_buffer.add(self.obs, action, rew, new_obs, float(done)) - self.obs = new_obs - - self.episode_rewards[-1] += rew - self.episode_lengths[-1] += 1 - if done: - self.obs = self.env.reset() - self.episode_rewards.append(0.0) - self.episode_lengths.append(0.0) + ray.get([ + w.do_steps.remote( + config["sample_batch_size"], self.cur_timestep) + for w in self.workers]) + num_loop_iters += 1 + self.cur_timestep += steps_per_iter + self.steps_since_update += steps_per_iter sample_time += time.time() - dt - if self.num_timesteps > config["learning_starts"] and \ - self.num_timesteps % config["train_freq"] == 0: + if self.cur_timestep > config["learning_starts"]: dt = time.time() # Minimize the error in Bellman's equation on a batch sampled # from replay buffer. 
- if config["prioritized_replay"]: - experience = self.replay_buffer.sample( - config["batch_size"], - beta=self.beta_schedule.value(self.num_timesteps)) - (obses_t, actions, rewards, obses_tp1, - dones, _, batch_idxes) = experience - else: - obses_t, actions, rewards, obses_tp1, dones = ( - self.replay_buffer.sample(config["batch_size"])) - batch_idxes = None - td_errors = self.dqn_graph.train( - self.sess, obses_t, actions, rewards, obses_tp1, dones, - np.ones_like(rewards)) - if config["prioritized_replay"]: - new_priorities = np.abs(td_errors) + ( - config["prioritized_replay_eps"]) - self.replay_buffer.update_priorities( - batch_idxes, new_priorities) + self._update_worker_weights() + sync_time += (time.time() - dt) + dt = time.time() + gradients = ray.get( + [w.get_gradient.remote(self.cur_timestep) + for w in self.workers]) learn_time += (time.time() - dt) + dt = time.time() + for grad in gradients: + self.actor.apply_gradients(grad) + apply_time += (time.time() - dt) - if self.num_timesteps > config["learning_starts"] and ( - self.num_timesteps % - config["target_network_update_freq"] == 0): + if (self.cur_timestep > config["learning_starts"] and + self.steps_since_update > + config["target_network_update_freq"]): + self.actor.dqn_graph.update_target(self.actor.sess) # Update target network periodically. 
- self.dqn_graph.update_target(self.sess) + self._update_worker_weights() + self.steps_since_update -= config["target_network_update_freq"] + self.num_target_updates += 1 - mean_100ep_reward = round(np.mean(self.episode_rewards[-101:-1]), 1) - mean_100ep_length = round(np.mean(self.episode_lengths[-101:-1]), 1) - num_episodes = len(self.episode_rewards) + mean_100ep_reward = 0.0 + mean_100ep_length = 0.0 + num_episodes = 0 + buffer_size_sum = 0 + for mean_rew, mean_len, episodes, exploration, buf_sz in ray.get( + [w.stats.remote(self.cur_timestep) for w in self.workers]): + mean_100ep_reward += mean_rew + mean_100ep_length += mean_len + num_episodes += episodes + buffer_size_sum += buf_sz + mean_100ep_reward /= len(self.workers) + mean_100ep_length /= len(self.workers) - info = { - "sample_time": sample_time, - "learn_time": learn_time, - "steps": self.num_timesteps, - "episodes": num_episodes, - "exploration": int( - 100 * self.exploration.value(self.num_timesteps)) - } + info = [ + ("mean_100ep_reward", mean_100ep_reward), + ("exploration_frac", exploration), + ("steps", self.cur_timestep), + ("episodes", num_episodes), + ("buffer_sizes_sum", buffer_size_sum), + ("target_updates", self.num_target_updates), + ("sample_time", sample_time), + ("weight_sync_time", sync_time), + ("apply_time", apply_time), + ("learn_time", learn_time), + ("samples_per_s", + num_loop_iters * np.float64(steps_per_iter) / sample_time), + ("learn_samples_per_s", + num_loop_iters * np.float64(config["train_batch_size"]) * + np.float64(config["num_workers"]) / learn_time), + ] - logger.record_tabular("sample_time", sample_time) - logger.record_tabular("learn_time", learn_time) - logger.record_tabular("steps", self.num_timesteps) - logger.record_tabular("buffer_size", len(self.replay_buffer)) - logger.record_tabular("episodes", num_episodes) - logger.record_tabular("mean 100 episode reward", mean_100ep_reward) - logger.record_tabular( - "% time spent exploring", - int(100 * 
self.exploration.value(self.num_timesteps))) + for k, v in info: + logger.record_tabular(k, v) logger.dump_tabular() result = TrainingResult( episode_reward_mean=mean_100ep_reward, episode_len_mean=mean_100ep_length, - timesteps_this_iter=self.num_timesteps - iter_init_timesteps, + timesteps_this_iter=self.cur_timestep - iter_init_timesteps, info=info) return result def _save(self): checkpoint_path = self.saver.save( - self.sess, + self.actor.sess, os.path.join(self.logdir, "checkpoint"), global_step=self.num_iterations) extra_data = [ + self.actor.save(), - self.replay_buffer, + self.actor.replay_buffer, - self.beta_schedule, - self.exploration, - self.episode_rewards, - self.episode_lengths, - self.saved_mean_reward, - self.obs, - self.num_timesteps, - self.num_iterations] + self.cur_timestep, + self.num_iterations, + self.num_target_updates, + self.steps_since_update] pickle.dump(extra_data, open(checkpoint_path + ".extra_data", "wb")) return checkpoint_path def _restore(self, checkpoint_path): - self.saver.restore(self.sess, checkpoint_path) + self.saver.restore(self.actor.sess, checkpoint_path) extra_data = pickle.load(open(checkpoint_path + ".extra_data", "rb")) - self.replay_buffer = extra_data[0] - self.beta_schedule = extra_data[1] - self.exploration = extra_data[2] - self.episode_rewards = extra_data[3] - self.episode_lengths = extra_data[4] - self.saved_mean_reward = extra_data[5] - self.obs = extra_data[6] - self.num_timesteps = extra_data[7] - self.num_iterations = extra_data[8] + self.actor.restore(extra_data[0]) + self.actor.replay_buffer = extra_data[1] + self.cur_timestep = extra_data[2] + self.num_iterations = extra_data[3] + self.num_target_updates = extra_data[4] + self.steps_since_update = extra_data[5] def compute_action(self, observation): - return self.dqn_graph.act( - self.sess, np.array(observation)[None], 0.0)[0] + return self.actor.dqn_graph.act( + self.actor.sess, np.array(observation)[None], 0.0)[0] diff --git a/python/ray/rllib/dqn/models.py b/python/ray/rllib/dqn/models.py index a4e28381f..f2bc94ff7 100644 ---
a/python/ray/rllib/dqn/models.py +++ b/python/ray/rllib/dqn/models.py @@ -70,7 +70,7 @@ def _minimize_and_clip(optimizer, objective, var_list, clip_val=10): for i, (grad, var) in enumerate(gradients): if grad is not None: gradients[i] = (tf.clip_by_norm(grad, clip_val), var) - return optimizer.apply_gradients(gradients) + return gradients def _scope_vars(scope, trainable_only=False): @@ -169,12 +169,16 @@ class DQNGraph(object): weighted_error = tf.reduce_mean(self.importance_weights * errors) # compute optimization op (potentially with gradient clipping) if config["grad_norm_clipping"] is not None: - self.optimize_expr = _minimize_and_clip( + self.grads_and_vars = _minimize_and_clip( optimizer, weighted_error, var_list=q_func_vars, clip_val=config["grad_norm_clipping"]) else: - self.optimize_expr = optimizer.minimize( + self.grads_and_vars = optimizer.compute_gradients( weighted_error, var_list=q_func_vars) + self.grads_and_vars = [ + (g, v) for (g, v) in self.grads_and_vars if g is not None] + self.grads = [g for (g, v) in self.grads_and_vars] + self.train_expr = optimizer.apply_gradients(self.grads_and_vars) # update_target_fn will be called periodically to copy Q network to # target Q network @@ -197,11 +201,11 @@ class DQNGraph(object): self.eps: eps, }) - def train( + def compute_gradients( self, sess, obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights): - td_err, _ = sess.run( - [self.td_error, self.optimize_expr], + td_err, grads = sess.run( + [self.td_error, self.grads], feed_dict={ self.obs_t: obs_t, self.act_t: act_t, @@ -210,4 +214,9 @@ class DQNGraph(object): self.done_mask: done_mask, self.importance_weights: importance_weights }) - return td_err + return td_err, grads + + def apply_gradients(self, sess, grads): + assert len(grads) == len(self.grads_and_vars) + feed_dict = {ph: g for (g, ph) in zip(grads, self.grads)} + sess.run(self.train_expr, feed_dict=feed_dict) diff --git a/python/ray/rllib/dqn/test/example-cartpole.py 
b/python/ray/rllib/dqn/test/example-cartpole.py new file mode 100755 index 000000000..080ab5f79 --- /dev/null +++ b/python/ray/rllib/dqn/test/example-cartpole.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse + +import ray +from ray.rllib.dqn import DQNAgent, DEFAULT_CONFIG + + +def main(): + parser = argparse.ArgumentParser(description="Run the DQN algorithm.") + parser.add_argument("--iterations", default=-1, type=int, + help="The number of training iterations to run.") + + args = parser.parse_args() + + config = DEFAULT_CONFIG.copy() + config.update(dict( + lr=1e-3, + schedule_max_timesteps=100000, + exploration_fraction=0.1, + exploration_final_eps=0.02, + dueling=False, + hiddens=[], + model_config=dict( + fcnet_hiddens=[64], + fcnet_activation='relu', + ))) + + ray.init() + dqn = DQNAgent("CartPole-v0", config) + + iteration = 0 + while iteration != args.iterations: + iteration += 1 + res = dqn.train() + print("current status: {}".format(res)) + + +if __name__ == "__main__": + main() diff --git a/python/ray/rllib/dqn/test/example-pong.py b/python/ray/rllib/dqn/test/example-pong.py new file mode 100755 index 000000000..24afde4a7 --- /dev/null +++ b/python/ray/rllib/dqn/test/example-pong.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse + +import ray +from ray.rllib.dqn import DQNAgent, DEFAULT_CONFIG + + +def main(): + parser = argparse.ArgumentParser(description="Run the DQN algorithm.") + parser.add_argument("--iterations", default=-1, type=int, + help="The number of training iterations to run.") + + args = parser.parse_args() + + config = DEFAULT_CONFIG.copy() + config.update(dict( + lr=1e-4, + schedule_max_timesteps=2000000, + exploration_fraction=0.1, + exploration_final_eps=0.01, + sample_batch_size=4, +
learning_starts=10000, + target_network_update_freq=1000, + gamma=0.99, + prioritized_replay=True)) + + ray.init() + dqn = DQNAgent("PongNoFrameskip-v4", config) + + iteration = 0 + while iteration != args.iterations: + iteration += 1 + res = dqn.train() + print("current status: {}".format(res)) + + +if __name__ == "__main__": + main() diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index ed50893f4..9bf440c7a 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -96,7 +96,7 @@ docker run --shm-size=10G --memory=10G $DOCKER_SHA \ --env PongNoFrameskip-v4 \ --alg DQN \ --num-iterations 2 \ - --config '{"lr": 1e-4, "schedule_max_timesteps": 2000000, "buffer_size": 10000, "exploration_fraction": 0.1, "exploration_final_eps": 0.01, "train_freq": 4, "learning_starts": 10000, "target_network_update_freq": 1000, "gamma": 0.99, "prioritized_replay": true}' + --config '{"lr": 1e-4, "schedule_max_timesteps": 2000000, "buffer_size": 10000, "exploration_fraction": 0.1, "exploration_final_eps": 0.01, "sample_batch_size": 4, "learning_starts": 10000, "target_network_update_freq": 1000, "gamma": 0.99, "prioritized_replay": true}' docker run --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \