From 15a668dd126664bb507294f4301c6b6d08e16d22 Mon Sep 17 00:00:00 2001 From: alvkao58 Date: Wed, 11 Apr 2018 15:08:39 -0700 Subject: [PATCH] [RLLib] DDPG (#1685) --- python/ray/rllib/__init__.py | 4 +- python/ray/rllib/a3c/shared_model.py | 2 +- python/ray/rllib/a3c/shared_model_lstm.py | 2 +- python/ray/rllib/agent.py | 3 + python/ray/rllib/ddpg/__init__.py | 3 + python/ray/rllib/ddpg/ddpg.py | 112 ++++++++ python/ray/rllib/ddpg/ddpg_evaluator.py | 75 ++++++ python/ray/rllib/ddpg/models.py | 241 ++++++++++++++++++ python/ray/rllib/ddpg/random_process.py | 63 +++++ python/ray/rllib/models/ddpgnet.py | 49 ++++ .../ray/rllib/optimizers/local_sync_replay.py | 8 +- python/ray/rllib/pg/policy.py | 2 +- python/ray/rllib/ppo/ppo.py | 2 +- python/ray/rllib/ppo/ppo_evaluator.py | 2 +- .../regression_tests/pendulum-ddpg.yaml | 10 + python/ray/rllib/utils/sampler.py | 9 +- test/jenkins_tests/run_multi_node_tests.sh | 14 + 17 files changed, 586 insertions(+), 15 deletions(-) create mode 100644 python/ray/rllib/ddpg/__init__.py create mode 100644 python/ray/rllib/ddpg/ddpg.py create mode 100644 python/ray/rllib/ddpg/ddpg_evaluator.py create mode 100644 python/ray/rllib/ddpg/models.py create mode 100644 python/ray/rllib/ddpg/random_process.py create mode 100644 python/ray/rllib/models/ddpgnet.py create mode 100644 python/ray/rllib/tuned_examples/regression_tests/pendulum-ddpg.yaml diff --git a/python/ray/rllib/__init__.py b/python/ray/rllib/__init__.py index 1815c6108..140cf5908 100644 --- a/python/ray/rllib/__init__.py +++ b/python/ray/rllib/__init__.py @@ -8,8 +8,8 @@ from ray.tune.registry import register_trainable def _register_all(): - for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "__fake", - "__sigmoid_fake_data", "__parameter_tuning"]: + for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG", + "__fake", "__sigmoid_fake_data", "__parameter_tuning"]: from ray.rllib.agent import get_agent_class register_trainable(key, get_agent_class(key)) diff --git a/python/ray/rllib/a3c/shared_model.py b/python/ray/rllib/a3c/shared_model.py index fb323b952..8209be159 100644 --- a/python/ray/rllib/a3c/shared_model.py +++ b/python/ray/rllib/a3c/shared_model.py @@ -38,7 +38,7 @@ class SharedModel(TFPolicy): def compute_gradients(self, samples): info = {} feed_dict = { - self.x: samples["observations"], + self.x: samples["obs"], self.ac: samples["actions"], self.adv: samples["advantages"], self.r: samples["value_targets"], diff --git a/python/ray/rllib/a3c/shared_model_lstm.py b/python/ray/rllib/a3c/shared_model_lstm.py index aea1bb65f..37f71e490 100644 --- a/python/ray/rllib/a3c/shared_model_lstm.py +++ b/python/ray/rllib/a3c/shared_model_lstm.py @@ -57,7 +57,7 @@ class SharedModelLSTM(TFPolicy): """ features = samples["features"][0] feed_dict = { - self.x: samples["observations"], + self.x: samples["obs"], self.ac: samples["actions"], self.adv: samples["advantages"], self.r: samples["value_targets"], diff --git a/python/ray/rllib/agent.py b/python/ray/rllib/agent.py index ccf24513c..63f8ecf40 100644 --- a/python/ray/rllib/agent.py +++ b/python/ray/rllib/agent.py @@ -243,6 +243,9 @@ def get_agent_class(alg): elif alg == "PG": from ray.rllib import pg return pg.PGAgent + elif alg == "DDPG": + from ray.rllib import ddpg + return ddpg.DDPGAgent elif alg == "script": from ray.tune import script_runner return script_runner.ScriptRunner diff --git a/python/ray/rllib/ddpg/__init__.py b/python/ray/rllib/ddpg/__init__.py new file mode 100644 index 000000000..004e0f128 --- /dev/null +++ b/python/ray/rllib/ddpg/__init__.py @@ -0,0 +1,3 @@ +from ray.rllib.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG + +__all__ = ["DDPGAgent", "DEFAULT_CONFIG"] diff --git a/python/ray/rllib/ddpg/ddpg.py b/python/ray/rllib/ddpg/ddpg.py new file mode 100644 index 000000000..fc7901383 --- /dev/null +++ b/python/ray/rllib/ddpg/ddpg.py @@ -0,0 +1,112 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + +import ray +from ray.rllib.agent import Agent +from ray.rllib.ddpg.ddpg_evaluator import DDPGEvaluator, RemoteDDPGEvaluator +from ray.rllib.optimizers import LocalSyncReplayOptimizer +from ray.tune.result import TrainingResult + +DEFAULT_CONFIG = { + # Actor learning rate + "actor_lr": 0.0001, + # Critic learning rate + "critic_lr": 0.001, + # Arguments to pass in to env creator + "env_config": {}, + # MDP Discount factor + "gamma": 0.99, + # Number of steps after which the rollout gets cut + "horizon": 500, + + # Whether to include parameter noise + "noise_add": True, + # Linear decay of exploration policy + "noise_epsilon": 0.0002, + # Parameters for noise process + "noise_parameters": { + "mu": 0, + "sigma": 0.2, + "theta": 0.15, + }, + + # Number of local steps taken for each call to sample + "num_local_steps": 1, + # Number of workers (excluding master) + "num_workers": 0, + + "optimizer": { + # Replay buffer size + "buffer_size": 10000, + # Number of steps in warm-up phase before learning starts + "learning_starts": 500, + # Whether to clip rewards + "clip_rewards": False, + # Whether to use prioritized replay + "prioritized_replay": False, + # Size of batch sampled from replay buffer + "train_batch_size": 64, + }, + + # Controls how fast target networks move + "tau": 0.001, + # Number of steps taken per training iteration + "train_steps": 600, +} + + +class DDPGAgent(Agent): + _agent_name = "DDPG" + _default_config = DEFAULT_CONFIG + + def _init(self): + self.local_evaluator = DDPGEvaluator( + self.registry, self.env_creator, self.config) + self.remote_evaluators = [ + RemoteDDPGEvaluator.remote( + self.registry, self.env_creator, self.config) + for _ in range(self.config["num_workers"])] + self.optimizer = LocalSyncReplayOptimizer( + self.config["optimizer"], self.local_evaluator, + self.remote_evaluators) + + def _train(self): + for _ in range(self.config["train_steps"]): + self.optimizer.step() + # update target + if self.optimizer.num_steps_trained > 0: + self.local_evaluator.update_target() + + # generate training result + return self._fetch_metrics() + + def _fetch_metrics(self): + episode_rewards = [] + episode_lengths = [] + if self.config["num_workers"] > 0: + metric_lists = [a.get_completed_rollout_metrics.remote() + for a in self.remote_evaluators] + for metrics in metric_lists: + for episode in ray.get(metrics): + episode_lengths.append(episode.episode_length) + episode_rewards.append(episode.episode_reward) + else: + metrics = self.local_evaluator.get_completed_rollout_metrics() + for episode in metrics: + episode_lengths.append(episode.episode_length) + episode_rewards.append(episode.episode_reward) + + avg_reward = (np.mean(episode_rewards)) + avg_length = (np.mean(episode_lengths)) + timesteps = np.sum(episode_lengths) + + result = TrainingResult( + episode_reward_mean=avg_reward, + episode_len_mean=avg_length, + timesteps_this_iter=timesteps, + info={}) + + return result diff --git a/python/ray/rllib/ddpg/ddpg_evaluator.py b/python/ray/rllib/ddpg/ddpg_evaluator.py new file mode 100644 index 000000000..dda3c3479 --- /dev/null +++ b/python/ray/rllib/ddpg/ddpg_evaluator.py @@ -0,0 +1,75 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + +import ray +from ray.rllib.ddpg.models import DDPGModel +from ray.rllib.models.catalog import ModelCatalog +from ray.rllib.optimizers import PolicyEvaluator +from ray.rllib.utils.filter import NoFilter +from ray.rllib.utils.process_rollout import process_rollout +from ray.rllib.utils.sampler import SyncSampler + + +class DDPGEvaluator(PolicyEvaluator): + + def __init__(self, registry, env_creator, config): + self.env = ModelCatalog.get_preprocessor_as_wrapper( + registry, env_creator(config["env_config"])) + + # contains model, target_model + self.model = DDPGModel(registry, self.env, config) + + self.sampler = SyncSampler( + self.env, self.model.model, NoFilter(), + config["num_local_steps"], horizon=config["horizon"]) + + def sample(self): + """Returns a batch of samples.""" + + rollout = self.sampler.get_data() + rollout.data["weights"] = np.ones_like(rollout.data["rewards"]) + + # since each sample is one step, no discounting needs to be applied; + # this does not involve config["gamma"] + samples = process_rollout( + rollout, NoFilter(), + gamma=1.0, use_gae=False) + + return samples + + def update_target(self): + """Updates target critic and target actor.""" + self.model.update_target() + + def compute_gradients(self, samples): + """Returns critic, actor gradients.""" + return self.model.compute_gradients(samples) + + def apply_gradients(self, grads): + """Applies gradients to evaluator weights.""" + self.model.apply_gradients(grads) + + def compute_apply(self, samples): + grads, _ = self.compute_gradients(samples) + self.apply_gradients(grads) + + def get_weights(self): + """Returns model weights.""" + return self.model.get_weights() + + def set_weights(self, weights): + """Sets model weights.""" + self.model.set_weights(weights) + + def get_completed_rollout_metrics(self): + """Returns metrics on previously completed rollouts. + + Calling this clears the queue of completed rollout metrics. + """ + return self.sampler.get_metrics() + + +RemoteDDPGEvaluator = ray.remote(DDPGEvaluator) diff --git a/python/ray/rllib/ddpg/models.py b/python/ray/rllib/ddpg/models.py new file mode 100644 index 000000000..20a661a97 --- /dev/null +++ b/python/ray/rllib/ddpg/models.py @@ -0,0 +1,241 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import tensorflow as tf + +from ray.experimental.tfutils import TensorFlowVariables +from ray.rllib.models.ddpgnet import DDPGActor, DDPGCritic +from ray.rllib.ddpg.random_process import OrnsteinUhlenbeckProcess + + +class DDPGModel(): + def __init__(self, registry, env, config): + self.config = config + self.sess = tf.Session() + + with tf.variable_scope("model"): + self.model = DDPGActorCritic( + registry, env, self.config, self.sess) + with tf.variable_scope("target_model"): + self.target_model = DDPGActorCritic( + registry, env, self.config, self.sess) + self._setup_gradients() + self._setup_target_updates() + + self.initialize() + self._initialize_target_weights() + + def initialize(self): + self.sess.run(tf.global_variables_initializer()) + + def _initialize_target_weights(self): + """Set initial target weights to match model weights.""" + a_updates = [] + for var, target_var in zip( + self.model.actor_var_list, self.target_model.actor_var_list): + a_updates.append(tf.assign(target_var, var)) + actor_updates = tf.group(*a_updates) + + c_updates = [] + for var, target_var in zip( + self.model.critic_var_list, self.target_model.critic_var_list): + c_updates.append(tf.assign(target_var, var)) + critic_updates = tf.group(*c_updates) + self.sess.run([actor_updates, critic_updates]) + + def _setup_gradients(self): + """Setup critic and actor gradients.""" + self.critic_grads = tf.gradients( + self.model.critic_loss, self.model.critic_var_list) + c_grads_and_vars = list(zip( + self.critic_grads, self.model.critic_var_list)) + c_opt = tf.train.AdamOptimizer(self.config["critic_lr"]) + self._apply_c_gradients = c_opt.apply_gradients(c_grads_and_vars) + + self.actor_grads = tf.gradients( + -self.model.cn_for_loss, self.model.actor_var_list) + a_grads_and_vars = list(zip( + self.actor_grads, self.model.actor_var_list)) + a_opt = tf.train.AdamOptimizer(self.config["actor_lr"]) + self._apply_a_gradients = a_opt.apply_gradients(a_grads_and_vars) + + def compute_gradients(self, samples): + """ Returns gradient w.r.t. samples.""" + # actor gradients + actor_actions = self.sess.run( + self.model.output_action, + feed_dict={self.model.obs: samples["obs"]} + ) + + actor_feed_dict = { + self.model.obs: samples["obs"], + self.model.output_action: actor_actions, + } + self.actor_grads = [g for g in self.actor_grads if g is not None] + actor_grad = self.sess.run(self.actor_grads, feed_dict=actor_feed_dict) + + # feed samples into target actor + target_Q_act = self.sess.run( + self.target_model.output_action, + feed_dict={self.target_model.obs: samples["new_obs"]} + ) + target_Q_dict = { + self.target_model.obs: samples["new_obs"], + self.target_model.act: target_Q_act, + } + + target_Q = self.sess.run( + self.target_model.critic_eval, feed_dict=target_Q_dict) + + # critic gradients + critic_feed_dict = { + self.model.obs: samples["obs"], + self.model.act: samples["actions"], + self.model.reward: samples["rewards"], + self.model.target_Q: target_Q, + } + self.critic_grads = [g for g in self.critic_grads if g is not None] + critic_grad = self.sess.run( + self.critic_grads, feed_dict=critic_feed_dict) + return (critic_grad, actor_grad), {} + + def apply_gradients(self, grads): + """Applies gradients to evaluator weights.""" + c_grads, a_grads = grads + critic_feed_dict = dict(zip(self.critic_grads, c_grads)) + self.sess.run(self._apply_c_gradients, feed_dict=critic_feed_dict) + actor_feed_dict = dict(zip(self.actor_grads, a_grads)) + self.sess.run(self._apply_a_gradients, feed_dict=actor_feed_dict) + + def get_weights(self): + """Returns model weights, target model weights.""" + return self.model.get_weights(), self.target_model.get_weights() + + def set_weights(self, weights): + """Sets model and target model weights.""" + model_weights, target_model_weights = weights + self.model.set_weights(model_weights) + self.target_model.set_weights(target_model_weights) + + def _setup_target_updates(self): + """Set up target actor and critic updates.""" + a_updates = [] + tau = self.config["tau"] + for var, target_var in zip( + self.model.actor_var_list, self.target_model.actor_var_list): + a_updates.append(tf.assign( + target_var, tau * var + (1. - tau) * target_var)) + actor_updates = tf.group(*a_updates) + + c_updates = [] + for var, target_var in zip( + self.model.critic_var_list, self.target_model.critic_var_list): + c_updates.append(tf.assign( + target_var, tau * var + (1. - tau) * target_var)) + critic_updates = tf.group(*c_updates) + self.target_updates = [actor_updates, critic_updates] + + def update_target(self): + """Updates target critic and target actor.""" + self.sess.run(self.target_updates) + + +class DDPGActorCritic(): + other_output = [] + is_recurrent = False + + def __init__(self, registry, env, config, sess): + self.config = config + self.sess = sess + + obs_space = env.observation_space + ac_space = env.action_space + + self.obs_size = int(np.prod(obs_space.shape)) + self.obs = tf.placeholder(tf.float32, [None, self.obs_size]) + self.ac_size = int(np.prod(ac_space.shape)) + self.act = tf.placeholder(tf.float32, [None, self.ac_size]) + self.action_bound = env.action_space.high + # TODO: change action_bound to make more general + + self._setup_actor_network(obs_space, ac_space) + self._setup_critic_network(obs_space, ac_space) + self._setup_critic_loss(ac_space) + + with tf.variable_scope("critic"): + self.critic_var_list = tf.get_collection( + tf.GraphKeys.TRAINABLE_VARIABLES, + tf.get_variable_scope().name + ) + self.critic_vars = TensorFlowVariables(self.critic_loss, + self.sess) + + with tf.variable_scope("actor"): + self.actor_var_list = tf.get_collection( + tf.GraphKeys.TRAINABLE_VARIABLES, + tf.get_variable_scope().name + ) + self.actor_vars = TensorFlowVariables(self.output_action, + self.sess) + + if (self.config["noise_add"]): + params = self.config["noise_parameters"] + self.rand_process = OrnsteinUhlenbeckProcess(size=self.ac_size, + theta=params["theta"], + mu=params["mu"], + sigma=params["sigma"]) + self.epsilon = 1.0 + + def _setup_critic_loss(self, action_space): + """Sets up critic loss.""" + self.target_Q = tf.placeholder(tf.float32, [None, 1], name="target_q") + + # compare critic eval to critic_target (squared loss) + self.reward = tf.placeholder(tf.float32, [None], name="reward") + self.critic_target = tf.expand_dims(self.reward, 1) + \ + self.config['gamma'] * self.target_Q + self.critic_loss = tf.reduce_mean(tf.square( + self.critic_target - self.critic_eval)) + + def _setup_critic_network(self, obs_space, ac_space): + """Sets up Q network.""" + with tf.variable_scope("critic", reuse=tf.AUTO_REUSE): + self.critic_network = DDPGCritic((self.obs, self.act), 1, {}) + self.critic_eval = self.critic_network.outputs + + with tf.variable_scope("critic", reuse=True): + self.cn_for_loss = DDPGCritic( + (self.obs, self.output_action), 1, {}).outputs + + def _setup_actor_network(self, obs_space, ac_space): + """Sets up actor network.""" + with tf.variable_scope("actor", reuse=tf.AUTO_REUSE): + self.actor_network = DDPGActor( + self.obs, self.ac_size, + options={"action_bound": self.action_bound}) + self.output_action = self.actor_network.outputs + + def get_weights(self): + """Returns critic weights, actor weights.""" + return self.critic_vars.get_weights(), self.actor_vars.get_weights() + + def set_weights(self, weights): + """Sets critic and actor weights.""" + critic_weights, actor_weights = weights + self.critic_vars.set_weights(critic_weights) + self.actor_vars.set_weights(actor_weights) + + def compute(self, ob): + """Returns action, given state.""" + flattened_ob = np.reshape(ob, [-1, np.prod(ob.shape)]) + action = self.sess.run(self.output_action, {self.obs: flattened_ob}) + if (self.config["noise_add"]): + action += self.epsilon * self.rand_process.sample() + if (self.epsilon > 0): + self.epsilon -= self.config["noise_epsilon"] + return action[0], {} + + def value(self, *args): + return 0 diff --git a/python/ray/rllib/ddpg/random_process.py b/python/ray/rllib/ddpg/random_process.py new file mode 100644 index 000000000..0a969fd00 --- /dev/null +++ b/python/ray/rllib/ddpg/random_process.py @@ -0,0 +1,63 @@ +# [reference] +# https://github.com/matthiasplappert/keras-rl/blob/master/rl/random.py + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + + +class RandomProcess(object): + def reset_states(self): + pass + + +class AnnealedGaussianProcess(RandomProcess): + def __init__(self, mu, sigma, sigma_min, n_steps_annealing): + self.mu = mu + self.sigma = sigma + self.n_steps = 0 + + if sigma_min is not None: + self.m = -float(sigma - sigma_min) / float(n_steps_annealing) + self.c = sigma + self.sigma_min = sigma_min + else: + self.m = 0. + self.c = sigma + self.sigma_min = sigma + + @property + def current_sigma(self): + sigma = max(self.sigma_min, self.m * float(self.n_steps) + self.c) + return sigma + + +# Based on +# http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab +class OrnsteinUhlenbeckProcess(AnnealedGaussianProcess): + def __init__(self, theta, mu=0., sigma=1., dt=1e-2, + x0=None, size=1, sigma_min=None, n_steps_annealing=1000): + super(OrnsteinUhlenbeckProcess, self).__init__( + mu=mu, + sigma=sigma, + sigma_min=sigma_min, + n_steps_annealing=n_steps_annealing) + self.theta = theta + self.mu = mu + self.dt = dt + self.x0 = x0 + self.size = size + self.reset_states() + + def sample(self): + x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \ + self.current_sigma * np.sqrt(self.dt) * \ + np.random.normal(size=self.size) + self.x_prev = x + self.n_steps += 1 + return x + + def reset_states(self): + self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size) diff --git a/python/ray/rllib/models/ddpgnet.py b/python/ray/rllib/models/ddpgnet.py new file mode 100644 index 000000000..b881e6013 --- /dev/null +++ b/python/ray/rllib/models/ddpgnet.py @@ -0,0 +1,49 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf +import tensorflow.contrib.slim as slim + +from ray.rllib.models.model import Model + + +class DDPGActor(Model): + """Actor network for DDPG.""" + + def _init(self, inputs, num_outputs, options): + w_normal = tf.truncated_normal_initializer() + w_init = tf.random_uniform_initializer(minval=-0.003, maxval=0.003) + ac_bound = options["action_bound"] + + net = slim.fully_connected( + inputs, 400, activation_fn=tf.nn.relu, + weights_initializer=w_normal) + net = slim.fully_connected( + net, 300, activation_fn=tf.nn.relu, weights_initializer=w_normal) + out = slim.fully_connected( + net, num_outputs, activation_fn=tf.nn.tanh, + weights_initializer=w_init) + scaled_out = tf.multiply(out, ac_bound) + return scaled_out, net + + +class DDPGCritic(Model): + """Critic network for DDPG.""" + + def _init(self, inputs, num_outputs, options): + obs, action = inputs + w_normal = tf.truncated_normal_initializer() + w_init = tf.random_uniform_initializer(minval=-0.0003, maxval=0.0003) + net = slim.fully_connected( + obs, 400, activation_fn=tf.nn.relu, weights_initializer=w_normal) + t1 = slim.fully_connected( + net, 300, activation_fn=None, biases_initializer=None, + weights_initializer=w_normal) + t2 = slim.fully_connected( + action, 300, activation_fn=None, weights_initializer=w_normal) + net = tf.nn.relu(tf.add(t1, t2)) + + out = slim.fully_connected( + net, 1, activation_fn=None, weights_initializer=w_init) + return out, net diff --git a/python/ray/rllib/optimizers/local_sync_replay.py b/python/ray/rllib/optimizers/local_sync_replay.py index 30af1f704..5ba8b6f9a 100644 --- a/python/ray/rllib/optimizers/local_sync_replay.py +++ b/python/ray/rllib/optimizers/local_sync_replay.py @@ -85,17 +85,17 @@ class LocalSyncReplayOptimizer(PolicyOptimizer): self.train_batch_size) weights = np.ones_like(rewards) batch_indexes = - np.ones_like(rewards) - samples = SampleBatch({ "obs": obses_t, "actions": actions, "rewards": rewards, "new_obs": obses_tp1, "dones": dones, "weights": weights, "batch_indexes": batch_indexes}) with self.grad_timer: - td_error = self.local_evaluator.compute_apply(samples)["td_error"] - new_priorities = ( - np.abs(td_error) + self.prioritized_replay_eps) + info = self.local_evaluator.compute_apply(samples) if isinstance(self.replay_buffer, PrioritizedReplayBuffer): + td_error = info["td_error"] + new_priorities = ( + np.abs(td_error) + self.prioritized_replay_eps) self.replay_buffer.update_priorities( samples["batch_indexes"], new_priorities) self.grad_timer.push_units_processed(samples.count) diff --git a/python/ray/rllib/pg/policy.py b/python/ray/rllib/pg/policy.py index 18b2cd71c..cc53eebcb 100644 --- a/python/ray/rllib/pg/policy.py +++ b/python/ray/rllib/pg/policy.py @@ -59,7 +59,7 @@ class PGPolicy(): def compute_gradients(self, samples): info = {} feed_dict = { - self.x: samples["observations"], + self.x: samples["obs"], self.ac: samples["actions"], self.adv: samples["advantages"], } diff --git a/python/ray/rllib/ppo/ppo.py b/python/ray/rllib/ppo/ppo.py index 7cbb4569a..784bc2743 100644 --- a/python/ray/rllib/ppo/ppo.py +++ b/python/ray/rllib/ppo/ppo.py @@ -223,7 +223,7 @@ class PPOAgent(Agent): "shuffle_time": shuffle_time, "load_time": load_time, "sgd_time": sgd_time, - "sample_throughput": len(samples["observations"]) / sgd_time + "sample_throughput": len(samples["obs"]) / sgd_time } FilterManager.synchronize( diff --git a/python/ray/rllib/ppo/ppo_evaluator.py b/python/ray/rllib/ppo/ppo_evaluator.py index f012d1c24..434feb094 100644 --- a/python/ray/rllib/ppo/ppo_evaluator.py +++ b/python/ray/rllib/ppo/ppo_evaluator.py @@ -139,7 +139,7 @@ class PPOEvaluator(PolicyEvaluator): dummy = np.zeros_like(trajectories["advantages"]) return self.par_opt.load_data( self.sess, - [trajectories["observations"], + [trajectories["obs"], trajectories["value_targets"] if use_gae else dummy, trajectories["advantages"], trajectories["actions"], diff --git a/python/ray/rllib/tuned_examples/regression_tests/pendulum-ddpg.yaml b/python/ray/rllib/tuned_examples/regression_tests/pendulum-ddpg.yaml new file mode 100644 index 000000000..b25180ff0 --- /dev/null +++ b/python/ray/rllib/tuned_examples/regression_tests/pendulum-ddpg.yaml @@ -0,0 +1,10 @@ +pendulum-ddpg: + env: Pendulum-v0 + run: DDPG + stop: + episode_reward_mean: -100 + time_total_s: 600 + trial_resources: + cpu: 1 + config: + num_workers: 1 diff --git a/python/ray/rllib/utils/sampler.py b/python/ray/rllib/utils/sampler.py index 86be66106..c522806d6 100644 --- a/python/ray/rllib/utils/sampler.py +++ b/python/ray/rllib/utils/sampler.py @@ -20,7 +20,7 @@ class PartialRollout(object): last_r (float): Value of next state. Used for bootstrapping. """ - fields = ["observations", "actions", "rewards", "terminal", "features"] + fields = ["obs", "actions", "rewards", "new_obs", "dones", "features"] def __init__(self, extra_fields=None): """Initializers internals. Maintains a `last_r` field @@ -54,7 +54,7 @@ class PartialRollout(object): Returns: terminal (bool): if rollout has terminated.""" - return self.data["terminal"][-1] + return self.data["dones"][-1] CompletedRollout = namedtuple( @@ -232,11 +232,12 @@ def _env_runner(env, policy, num_local_steps, horizon, obs_filter): action = np.concatenate(action, axis=0).flatten() # Collect the experience. - rollout.add(observations=last_observation, + rollout.add(obs=last_observation, actions=action, rewards=reward, - terminal=terminal, + dones=terminal, features=last_features, + new_obs=observation, **pi_info) last_observation = observation diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 70ee6cf55..0bf09e092 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -181,6 +181,20 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --stop '{"training_iteration": 2}' \ --config '{"batch_size": 500, "num_workers": 1}' +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/train.py \ + --env Pendulum-v0 \ + --run DDPG \ + --stop '{"training_iteration": 2}' \ + --config '{"num_workers": 1}' + +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/train.py \ + --env MountainCarContinuous-v0 \ + --run DDPG \ + --stop '{"training_iteration": 2}' \ + --config '{"num_workers": 1}' + docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ sh /ray/test/jenkins_tests/multi_node_tests/test_rllib_eval.sh