[rllib] Merge DDPG and DDPG2 implementations (#2202)

* removed ddpg2

* removed ddpg2 from codebase

* added tests used in ddpg vs ddpg2 comparison

* added notes about training timesteps to yaml files

* removed ddpg2 yaml files

* removed unnecessary configs from yaml files

* removed unnecessary configs from yaml files

* moved pendulum, mountaincarcontinuous, and halfcheetah tests to tuned_examples

* moved pendulum, mountaincarcontinuous, and halfcheetah tests to tuned_examples

* added more configuration details to yaml files

* removed random starts from halfcheetah
This commit is contained in:
andrewztan
2018-06-09 16:46:23 -07:00
committed by Richard Liaw
parent 3b5e700fd7
commit 1475600c81
14 changed files with 185 additions and 520 deletions
+1 -1
View File
@@ -13,7 +13,7 @@ RLlib includes the following reference algorithms:
- Deep Q Networks (`DQN <https://github.com/ray-project/ray/tree/master/python/ray/rllib/dqn>`__).
- Deep Deterministic Policy Gradients (`DDPG <https://github.com/ray-project/ray/tree/master/python/ray/rllib/ddpg>`__, `DDPG2 <https://github.com/ray-project/ray/tree/master/python/ray/rllib/ddpg2>`__).
- Deep Deterministic Policy Gradients (`DDPG <https://github.com/ray-project/ray/tree/master/python/ray/rllib/ddpg>`__).
- Ape-X Distributed Prioritized Experience Replay, including both `DQN <https://github.com/ray-project/ray/blob/master/python/ray/rllib/dqn/apex.py>`__ and `DDPG <https://github.com/ray-project/ray/blob/master/python/ray/rllib/ddpg/apex.py>`__ variants.
+1 -1
View File
@@ -14,7 +14,7 @@ from ray.rllib.optimizers.sample_batch import SampleBatch
def _register_all():
for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG",
"DDPG2", "APEX_DDPG", "__fake", "__sigmoid_fake_data",
"APEX_DDPG", "__fake", "__sigmoid_fake_data",
"__parameter_tuning"]:
from ray.rllib.agent import get_agent_class
register_trainable(key, get_agent_class(key))
+1 -4
View File
@@ -231,10 +231,7 @@ class _ParameterTuningAgent(_MockAgent):
def get_agent_class(alg):
"""Returns the class of a known agent given its name."""
if alg == "DDPG2":
from ray.rllib import ddpg2
return ddpg2.DDPG2Agent
elif alg == "DDPG":
if alg == "DDPG":
from ray.rllib import ddpg
return ddpg.DDPGAgent
elif alg == "APEX_DDPG":
-1
View File
@@ -1 +0,0 @@
Alternate DDPG implementation. See also https://github.com/ray-project/ray/tree/master/python/ray/rllib/ddpg.
-3
View File
@@ -1,3 +0,0 @@
from ray.rllib.ddpg2.ddpg import DDPG2Agent, DEFAULT_CONFIG
__all__ = ["DDPG2Agent", "DEFAULT_CONFIG"]
-112
View File
@@ -1,112 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
from ray.rllib.agent import Agent
from ray.rllib.ddpg2.ddpg_evaluator import DDPGEvaluator, RemoteDDPGEvaluator
from ray.rllib.optimizers import LocalSyncReplayOptimizer
from ray.tune.result import TrainingResult
DEFAULT_CONFIG = {
# Actor learning rate
"actor_lr": 0.0001,
# Critic learning rate
"critic_lr": 0.001,
# Arguments to pass in to env creator
"env_config": {},
# MDP Discount factor
"gamma": 0.99,
# Number of steps after which the rollout gets cut
"horizon": 500,
# Whether to include parameter noise
"noise_add": True,
# Linear decay of exploration policy
"noise_epsilon": 0.0002,
# Parameters for noise process
"noise_parameters": {
"mu": 0,
"sigma": 0.2,
"theta": 0.15,
},
# Number of local steps taken for each call to sample
"num_local_steps": 1,
# Number of workers (excluding master)
"num_workers": 0,
"optimizer": {
# Replay buffer size
"buffer_size": 10000,
# Number of steps in warm-up phase before learning starts
"learning_starts": 500,
# Whether to clip rewards
"clip_rewards": False,
# Whether to use prioritized replay
"prioritized_replay": False,
# Size of batch sampled from replay buffer
"train_batch_size": 64,
},
# Controls how fast target networks move
"tau": 0.001,
# Number of steps taken per training iteration
"train_steps": 600,
}
class DDPG2Agent(Agent):
_agent_name = "DDPG2"
_default_config = DEFAULT_CONFIG
def _init(self):
self.local_evaluator = DDPGEvaluator(
self.registry, self.env_creator, self.config)
self.remote_evaluators = [
RemoteDDPGEvaluator.remote(
self.registry, self.env_creator, self.config)
for _ in range(self.config["num_workers"])]
self.optimizer = LocalSyncReplayOptimizer(
self.config["optimizer"], self.local_evaluator,
self.remote_evaluators)
def _train(self):
for _ in range(self.config["train_steps"]):
self.optimizer.step()
# update target
if self.optimizer.num_steps_trained > 0:
self.local_evaluator.update_target()
# generate training result
return self._fetch_metrics()
def _fetch_metrics(self):
episode_rewards = []
episode_lengths = []
if self.config["num_workers"] > 0:
metric_lists = [a.get_completed_rollout_metrics.remote()
for a in self.remote_evaluators]
for metrics in metric_lists:
for episode in ray.get(metrics):
episode_lengths.append(episode.episode_length)
episode_rewards.append(episode.episode_reward)
else:
metrics = self.local_evaluator.get_completed_rollout_metrics()
for episode in metrics:
episode_lengths.append(episode.episode_length)
episode_rewards.append(episode.episode_reward)
avg_reward = (np.mean(episode_rewards))
avg_length = (np.mean(episode_lengths))
timesteps = np.sum(episode_lengths)
result = TrainingResult(
episode_reward_mean=avg_reward,
episode_len_mean=avg_length,
timesteps_this_iter=timesteps,
info={})
return result
-73
View File
@@ -1,73 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
from ray.rllib.ddpg2.models import DDPGModel
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.optimizers import PolicyEvaluator
from ray.rllib.utils.filter import NoFilter
from ray.rllib.utils.process_rollout import compute_advantages
from ray.rllib.utils.sampler import SyncSampler
class DDPGEvaluator(PolicyEvaluator):
def __init__(self, registry, env_creator, config):
self.env = ModelCatalog.get_preprocessor_as_wrapper(
registry, env_creator(config["env_config"]))
# contains model, target_model
self.model = DDPGModel(registry, self.env, config)
self.sampler = SyncSampler(
self.env, self.model.model, NoFilter(),
config["num_local_steps"], horizon=config["horizon"])
def sample(self):
"""Returns a batch of samples."""
rollout = self.sampler.get_data()
rollout.data["weights"] = np.ones_like(rollout.data["rewards"])
# since each sample is one step, no discounting needs to be applied;
# this does not involve config["gamma"]
samples = compute_advantages(rollout, 0.0, gamma=1.0, use_gae=False)
return samples
def update_target(self):
"""Updates target critic and target actor."""
self.model.update_target()
def compute_gradients(self, samples):
"""Returns critic, actor gradients."""
return self.model.compute_gradients(samples)
def apply_gradients(self, grads):
"""Applies gradients to evaluator weights."""
self.model.apply_gradients(grads)
def compute_apply(self, samples):
grads, _ = self.compute_gradients(samples)
self.apply_gradients(grads)
def get_weights(self):
"""Returns model weights."""
return self.model.get_weights()
def set_weights(self, weights):
"""Sets model weights."""
self.model.set_weights(weights)
def get_completed_rollout_metrics(self):
"""Returns metrics on previously completed rollouts.
Calling this clears the queue of completed rollout metrics.
"""
return self.sampler.get_metrics()
RemoteDDPGEvaluator = ray.remote(DDPGEvaluator)
-244
View File
@@ -1,244 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from ray.experimental.tfutils import TensorFlowVariables
from ray.rllib.models.ddpgnet import DDPGActor, DDPGCritic
from ray.rllib.ddpg2.random_process import OrnsteinUhlenbeckProcess
class DDPGModel():
def __init__(self, registry, env, config):
self.config = config
self.sess = tf.Session()
with tf.variable_scope("model"):
self.model = DDPGActorCritic(
registry, env, self.config, self.sess)
with tf.variable_scope("target_model"):
self.target_model = DDPGActorCritic(
registry, env, self.config, self.sess)
self._setup_gradients()
self._setup_target_updates()
self.initialize()
self._initialize_target_weights()
def initialize(self):
self.sess.run(tf.global_variables_initializer())
def _initialize_target_weights(self):
"""Set initial target weights to match model weights."""
a_updates = []
for var, target_var in zip(
self.model.actor_var_list, self.target_model.actor_var_list):
a_updates.append(tf.assign(target_var, var))
actor_updates = tf.group(*a_updates)
c_updates = []
for var, target_var in zip(
self.model.critic_var_list, self.target_model.critic_var_list):
c_updates.append(tf.assign(target_var, var))
critic_updates = tf.group(*c_updates)
self.sess.run([actor_updates, critic_updates])
def _setup_gradients(self):
"""Setup critic and actor gradients."""
self.critic_grads = tf.gradients(
self.model.critic_loss, self.model.critic_var_list)
c_grads_and_vars = list(zip(
self.critic_grads, self.model.critic_var_list))
c_opt = tf.train.AdamOptimizer(self.config["critic_lr"])
self._apply_c_gradients = c_opt.apply_gradients(c_grads_and_vars)
self.actor_grads = tf.gradients(
-self.model.cn_for_loss, self.model.actor_var_list)
a_grads_and_vars = list(zip(
self.actor_grads, self.model.actor_var_list))
a_opt = tf.train.AdamOptimizer(self.config["actor_lr"])
self._apply_a_gradients = a_opt.apply_gradients(a_grads_and_vars)
def compute_gradients(self, samples):
""" Returns gradient w.r.t. samples."""
# actor gradients
actor_actions = self.sess.run(
self.model.output_action,
feed_dict={self.model.obs: samples["obs"]}
)
actor_feed_dict = {
self.model.obs: samples["obs"],
self.model.output_action: actor_actions,
}
self.actor_grads = [g for g in self.actor_grads if g is not None]
actor_grad = self.sess.run(self.actor_grads, feed_dict=actor_feed_dict)
# feed samples into target actor
target_Q_act = self.sess.run(
self.target_model.output_action,
feed_dict={self.target_model.obs: samples["new_obs"]}
)
target_Q_dict = {
self.target_model.obs: samples["new_obs"],
self.target_model.act: target_Q_act,
}
target_Q = self.sess.run(
self.target_model.critic_eval, feed_dict=target_Q_dict)
# critic gradients
critic_feed_dict = {
self.model.obs: samples["obs"],
self.model.act: samples["actions"],
self.model.reward: samples["rewards"],
self.model.target_Q: target_Q,
}
self.critic_grads = [g for g in self.critic_grads if g is not None]
critic_grad = self.sess.run(
self.critic_grads, feed_dict=critic_feed_dict)
return (critic_grad, actor_grad), {}
def apply_gradients(self, grads):
"""Applies gradients to evaluator weights."""
c_grads, a_grads = grads
critic_feed_dict = dict(zip(self.critic_grads, c_grads))
self.sess.run(self._apply_c_gradients, feed_dict=critic_feed_dict)
actor_feed_dict = dict(zip(self.actor_grads, a_grads))
self.sess.run(self._apply_a_gradients, feed_dict=actor_feed_dict)
def get_weights(self):
"""Returns model weights, target model weights."""
return self.model.get_weights(), self.target_model.get_weights()
def set_weights(self, weights):
"""Sets model and target model weights."""
model_weights, target_model_weights = weights
self.model.set_weights(model_weights)
self.target_model.set_weights(target_model_weights)
def _setup_target_updates(self):
"""Set up target actor and critic updates."""
a_updates = []
tau = self.config["tau"]
for var, target_var in zip(
self.model.actor_var_list, self.target_model.actor_var_list):
a_updates.append(tf.assign(
target_var, tau * var + (1. - tau) * target_var))
actor_updates = tf.group(*a_updates)
c_updates = []
for var, target_var in zip(
self.model.critic_var_list, self.target_model.critic_var_list):
c_updates.append(tf.assign(
target_var, tau * var + (1. - tau) * target_var))
critic_updates = tf.group(*c_updates)
self.target_updates = [actor_updates, critic_updates]
def update_target(self):
"""Updates target critic and target actor."""
self.sess.run(self.target_updates)
class DDPGActorCritic():
other_output = []
is_recurrent = False
def __init__(self, registry, env, config, sess):
self.config = config
self.sess = sess
obs_space = env.observation_space
ac_space = env.action_space
self.obs_size = int(np.prod(obs_space.shape))
self.obs = tf.placeholder(tf.float32, [None, self.obs_size])
self.ac_size = int(np.prod(ac_space.shape))
self.act = tf.placeholder(tf.float32, [None, self.ac_size])
self.action_bound = env.action_space.high
# TODO: change action_bound to make more general
self._setup_actor_network(obs_space, ac_space)
self._setup_critic_network(obs_space, ac_space)
self._setup_critic_loss(ac_space)
with tf.variable_scope("critic"):
self.critic_var_list = tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES,
tf.get_variable_scope().name
)
self.critic_vars = TensorFlowVariables(self.critic_loss,
self.sess)
with tf.variable_scope("actor"):
self.actor_var_list = tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES,
tf.get_variable_scope().name
)
self.actor_vars = TensorFlowVariables(self.output_action,
self.sess)
if (self.config["noise_add"]):
params = self.config["noise_parameters"]
self.rand_process = OrnsteinUhlenbeckProcess(size=self.ac_size,
theta=params["theta"],
mu=params["mu"],
sigma=params["sigma"])
self.epsilon = 1.0
def _setup_critic_loss(self, action_space):
"""Sets up critic loss."""
self.target_Q = tf.placeholder(tf.float32, [None, 1], name="target_q")
# compare critic eval to critic_target (squared loss)
self.reward = tf.placeholder(tf.float32, [None], name="reward")
self.critic_target = tf.expand_dims(self.reward, 1) + \
self.config['gamma'] * self.target_Q
self.critic_loss = tf.reduce_mean(tf.square(
self.critic_target - self.critic_eval))
def _setup_critic_network(self, obs_space, ac_space):
"""Sets up Q network."""
with tf.variable_scope("critic", reuse=tf.AUTO_REUSE):
self.critic_network = DDPGCritic((self.obs, self.act), 1, {})
self.critic_eval = self.critic_network.outputs
with tf.variable_scope("critic", reuse=True):
self.cn_for_loss = DDPGCritic(
(self.obs, self.output_action), 1, {}).outputs
def _setup_actor_network(self, obs_space, ac_space):
"""Sets up actor network."""
with tf.variable_scope("actor", reuse=tf.AUTO_REUSE):
self.actor_network = DDPGActor(
self.obs, self.ac_size,
options={"action_bound": self.action_bound})
self.output_action = self.actor_network.outputs
def get_weights(self):
"""Returns critic weights, actor weights."""
return self.critic_vars.get_weights(), self.actor_vars.get_weights()
def set_weights(self, weights):
"""Sets critic and actor weights."""
critic_weights, actor_weights = weights
self.critic_vars.set_weights(critic_weights)
self.actor_vars.set_weights(actor_weights)
def compute_single_action(self, ob, h, is_training):
"""Returns action, given state."""
flattened_ob = np.reshape(ob, [-1, np.prod(ob.shape)])
action = self.sess.run(self.output_action, {self.obs: flattened_ob})
if (self.config["noise_add"]):
action += self.epsilon * self.rand_process.sample()
if (self.epsilon > 0):
self.epsilon -= self.config["noise_epsilon"]
return action[0], [], {}
def value(self, *args):
return 0
def get_initial_state(self):
return []
-63
View File
@@ -1,63 +0,0 @@
# [reference]
# https://github.com/matthiasplappert/keras-rl/blob/master/rl/random.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
class RandomProcess(object):
def reset_states(self):
pass
class AnnealedGaussianProcess(RandomProcess):
def __init__(self, mu, sigma, sigma_min, n_steps_annealing):
self.mu = mu
self.sigma = sigma
self.n_steps = 0
if sigma_min is not None:
self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
self.c = sigma
self.sigma_min = sigma_min
else:
self.m = 0.
self.c = sigma
self.sigma_min = sigma
@property
def current_sigma(self):
sigma = max(self.sigma_min, self.m * float(self.n_steps) + self.c)
return sigma
# Based on
# http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab
class OrnsteinUhlenbeckProcess(AnnealedGaussianProcess):
def __init__(self, theta, mu=0., sigma=1., dt=1e-2,
x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
super(OrnsteinUhlenbeckProcess, self).__init__(
mu=mu,
sigma=sigma,
sigma_min=sigma_min,
n_steps_annealing=n_steps_annealing)
self.theta = theta
self.mu = mu
self.dt = dt
self.x0 = x0
self.size = size
self.reset_states()
def sample(self):
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
self.current_sigma * np.sqrt(self.dt) * \
np.random.normal(size=self.size)
self.x_prev = x
self.n_steps += 1
return x
def reset_states(self):
self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)
@@ -0,0 +1,67 @@
# This configuration can expect to reach 2000 reward in 150k-200k timesteps
halfcheetah-ddpg:
env: HalfCheetah-v2
run: DDPG
stop:
episode_reward_mean: 2000
time_total_s: 5400 # 90 minutes
config:
# === Model ===
actor_hiddens: [64, 64]
critic_hiddens: [64, 64]
n_step: 1
model: {}
gamma: 0.99
env_config: {}
# === Exploration ===
schedule_max_timesteps: 100000
timesteps_per_iteration: 1000
exploration_fraction: 0.1
exploration_final_eps: 0.02
noise_scale: 0.1
exploration_theta: 0.15
exploration_sigma: 0.2
target_network_update_freq: 0
tau: 0.001
# === Replay buffer ===
buffer_size: 10000
prioritized_replay: True
prioritized_replay_alpha: 0.6
prioritized_replay_beta: 0.4
prioritized_replay_eps: 0.000001
clip_rewards: False
# === Optimization ===
actor_lr: 0.0001
critic_lr: 0.001
use_huber: False
huber_threshold: 1.0
l2_reg: 0.000001
learning_starts: 500
sample_batch_size: 1
train_batch_size: 64
smoothing_num_episodes: 10
# === Tensorflow ===
tf_session_args: {
"device_count": {
"CPU": 2
},
"log_device_placement": False,
"allow_soft_placement": True,
"gpu_options": {
"allow_growth": True
},
"inter_op_parallelism_threads": 1,
"intra_op_parallelism_threads": 1,
}
# === Parallelism ===
num_workers: 0
num_gpus_per_worker: 0
optimizer_class: "LocalSyncReplayOptimizer"
optimizer_config: {}
per_worker_exploration: False
worker_side_prioritization: False
@@ -1,19 +1,67 @@
# can expect improvement to 90 reward in ~12-24k timesteps
# This configuration can expect to reach 90 reward in 10k-20k timesteps
mountaincarcontinuous-ddpg:
env: MountainCarContinuous-v0
run: DDPG
stop:
episode_reward_mean: 90
time_total_s: 600 # 10 minutes
config:
n_step: 3
# === Model ===
actor_hiddens: [32, 64]
critic_hiddens: [64, 64]
noise_scale: 0.75
n_step: 3
model: {}
gamma: 0.99
env_config: {}
# === Exploration ===
schedule_max_timesteps: 100000
timesteps_per_iteration: 1000
exploration_fraction: 0.4
exploration_final_eps: 0.02
noise_scale: 0.75
exploration_theta: 0.15
exploration_sigma: 0.2
target_network_update_freq: 0
tau: 0.01
l2_reg: 0.00001
# === Replay buffer ===
buffer_size: 50000
prioritized_replay: False
prioritized_replay_alpha: 0.6
prioritized_replay_beta: 0.4
prioritized_replay_eps: 0.000001
clip_rewards: False
# === Optimization ===
actor_lr: 0.0001
critic_lr: 0.001
use_huber: False
huber_threshold: 1.0
l2_reg: 0.00001
learning_starts: 1000
#model:
# fcnet_hiddens: []
sample_batch_size: 1
train_batch_size: 64
smoothing_num_episodes: 10
# === Tensorflow ===
tf_session_args: {
"device_count": {
"CPU": 2
},
"log_device_placement": False,
"allow_soft_placement": True,
"gpu_options": {
"allow_growth": True
},
"inter_op_parallelism_threads": 1,
"intra_op_parallelism_threads": 1,
}
# === Parallelism ===
num_workers: 0
num_gpus_per_worker: 0
optimizer_class: "LocalSyncReplayOptimizer"
optimizer_config: {}
per_worker_exploration: False
worker_side_prioritization: False
@@ -1,10 +1,67 @@
# can expect improvement to -160 reward in ~30k timesteps
# This configuration can expect to reach -160 reward in 10k-20k timesteps
pendulum-ddpg:
env: Pendulum-v0
run: DDPG
stop:
episode_reward_mean: -160
time_total_s: 600 # 10 minutes
config:
use_huber: True
clip_rewards: False
# === Model ===
actor_hiddens: [64, 64]
critic_hiddens: [64, 64]
n_step: 1
model: {}
gamma: 0.99
env_config: {}
# === Exploration ===
schedule_max_timesteps: 100000
timesteps_per_iteration: 600
exploration_fraction: 0.1
exploration_final_eps: 0.02
noise_scale: 0.1
exploration_theta: 0.15
exploration_sigma: 0.2
target_network_update_freq: 0
tau: 0.001
# === Replay buffer ===
buffer_size: 10000
prioritized_replay: True
prioritized_replay_alpha: 0.6
prioritized_replay_beta: 0.4
prioritized_replay_eps: 0.000001
clip_rewards: False
# === Optimization ===
actor_lr: 0.0001
critic_lr: 0.001
use_huber: True
huber_threshold: 1.0
l2_reg: 0.000001
learning_starts: 500
sample_batch_size: 1
train_batch_size: 64
smoothing_num_episodes: 10
# === Tensorflow ===
tf_session_args: {
"device_count": {
"CPU": 2
},
"log_device_placement": False,
"allow_soft_placement": True,
"gpu_options": {
"allow_growth": True
},
"inter_op_parallelism_threads": 1,
"intra_op_parallelism_threads": 1,
}
# === Parallelism ===
num_workers: 0
num_gpus_per_worker: 0
optimizer_class: "LocalSyncReplayOptimizer"
optimizer_config: {}
per_worker_exploration: False
worker_side_prioritization: False
@@ -1,8 +0,0 @@
pendulum-ddpg2:
env: Pendulum-v0
run: DDPG2
stop:
episode_reward_mean: -100
time_total_s: 600
config:
num_workers: 1