[RLLib] DDPG (#1685)

This commit is contained in:
alvkao58
2018-04-11 15:08:39 -07:00
committed by Richard Liaw
parent 74162d1492
commit 15a668dd12
17 changed files with 586 additions and 15 deletions
+2 -2
View File
@@ -8,8 +8,8 @@ from ray.tune.registry import register_trainable
def _register_all():
for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "__fake",
"__sigmoid_fake_data", "__parameter_tuning"]:
for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG",
"__fake", "__sigmoid_fake_data", "__parameter_tuning"]:
from ray.rllib.agent import get_agent_class
register_trainable(key, get_agent_class(key))
+1 -1
View File
@@ -38,7 +38,7 @@ class SharedModel(TFPolicy):
def compute_gradients(self, samples):
info = {}
feed_dict = {
self.x: samples["observations"],
self.x: samples["obs"],
self.ac: samples["actions"],
self.adv: samples["advantages"],
self.r: samples["value_targets"],
+1 -1
View File
@@ -57,7 +57,7 @@ class SharedModelLSTM(TFPolicy):
"""
features = samples["features"][0]
feed_dict = {
self.x: samples["observations"],
self.x: samples["obs"],
self.ac: samples["actions"],
self.adv: samples["advantages"],
self.r: samples["value_targets"],
+3
View File
@@ -243,6 +243,9 @@ def get_agent_class(alg):
elif alg == "PG":
from ray.rllib import pg
return pg.PGAgent
elif alg == "DDPG":
from ray.rllib import ddpg
return ddpg.DDPGAgent
elif alg == "script":
from ray.tune import script_runner
return script_runner.ScriptRunner
+3
View File
@@ -0,0 +1,3 @@
from ray.rllib.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG
__all__ = ["DDPGAgent", "DEFAULT_CONFIG"]
+112
View File
@@ -0,0 +1,112 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
from ray.rllib.agent import Agent
from ray.rllib.ddpg.ddpg_evaluator import DDPGEvaluator, RemoteDDPGEvaluator
from ray.rllib.optimizers import LocalSyncReplayOptimizer
from ray.tune.result import TrainingResult
DEFAULT_CONFIG = {
# Actor learning rate
"actor_lr": 0.0001,
# Critic learning rate
"critic_lr": 0.001,
# Arguments to pass in to env creator
"env_config": {},
# MDP Discount factor
"gamma": 0.99,
# Number of steps after which the rollout gets cut
"horizon": 500,
# Whether to include parameter noise
"noise_add": True,
# Linear decay of exploration policy
"noise_epsilon": 0.0002,
# Parameters for noise process
"noise_parameters": {
"mu": 0,
"sigma": 0.2,
"theta": 0.15,
},
# Number of local steps taken for each call to sample
"num_local_steps": 1,
# Number of workers (excluding master)
"num_workers": 0,
"optimizer": {
# Replay buffer size
"buffer_size": 10000,
# Number of steps in warm-up phase before learning starts
"learning_starts": 500,
# Whether to clip rewards
"clip_rewards": False,
# Whether to use prioritized replay
"prioritized_replay": False,
# Size of batch sampled from replay buffer
"train_batch_size": 64,
},
# Controls how fast target networks move
"tau": 0.001,
# Number of steps taken per training iteration
"train_steps": 600,
}
class DDPGAgent(Agent):
    """Agent that trains a DDPG policy through a replay-based optimizer.

    A local evaluator is always created; ``config["num_workers"]`` remote
    evaluators are created next to it and handed to the optimizer.
    """

    _agent_name = "DDPG"
    _default_config = DEFAULT_CONFIG

    def _init(self):
        """Create the evaluators and the optimizer that drives them."""
        self.local_evaluator = DDPGEvaluator(
            self.registry, self.env_creator, self.config)
        self.remote_evaluators = [
            RemoteDDPGEvaluator.remote(
                self.registry, self.env_creator, self.config)
            for _ in range(self.config["num_workers"])]
        self.optimizer = LocalSyncReplayOptimizer(
            self.config["optimizer"], self.local_evaluator,
            self.remote_evaluators)

    def _train(self):
        """Run ``config["train_steps"]`` optimizer steps.

        Returns:
            TrainingResult summarizing the episodes completed meanwhile.
        """
        for _ in range(self.config["train_steps"]):
            self.optimizer.step()
            # Only move the target networks once the optimizer reports
            # that some training has actually happened.
            if self.optimizer.num_steps_trained > 0:
                self.local_evaluator.update_target()

        return self._fetch_metrics()

    def _fetch_metrics(self):
        """Collect completed-episode metrics and fold them into a result.

        Metrics come from the remote evaluators when there are any,
        otherwise from the local evaluator.

        Returns:
            TrainingResult with the mean episode reward and length (NaN if
            no episode completed) and the summed episode lengths.
        """
        if self.config["num_workers"] > 0:
            pending = [
                evaluator.get_completed_rollout_metrics.remote()
                for evaluator in self.remote_evaluators]
            # A single ray.get waits on all workers at once instead of
            # blocking on them one after another.
            episodes = [
                episode
                for worker_episodes in ray.get(pending)
                for episode in worker_episodes]
        else:
            episodes = list(
                self.local_evaluator.get_completed_rollout_metrics())

        episode_rewards = [episode.episode_reward for episode in episodes]
        episode_lengths = [episode.episode_length for episode in episodes]

        if episodes:
            avg_reward = np.mean(episode_rewards)
            avg_length = np.mean(episode_lengths)
        else:
            # np.mean([]) also yields NaN but emits a RuntimeWarning on
            # every iteration in which no episode finished.
            avg_reward = float("nan")
            avg_length = float("nan")

        result = TrainingResult(
            episode_reward_mean=avg_reward,
            episode_len_mean=avg_length,
            timesteps_this_iter=int(np.sum(episode_lengths)),
            info={})

        return result
+75
View File
@@ -0,0 +1,75 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
from ray.rllib.ddpg.models import DDPGModel
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.optimizers import PolicyEvaluator
from ray.rllib.utils.filter import NoFilter
from ray.rllib.utils.process_rollout import process_rollout
from ray.rllib.utils.sampler import SyncSampler
class DDPGEvaluator(PolicyEvaluator):
    """Policy evaluator that samples with, and updates, a DDPGModel."""

    def __init__(self, registry, env_creator, config):
        """Build the wrapped env, the model pair and the sampler.

        Args:
            registry: Passed through to the model catalog and the model.
            env_creator: Callable invoked with ``config["env_config"]``.
            config: Agent configuration dictionary.
        """
        self.env = ModelCatalog.get_preprocessor_as_wrapper(
            registry, env_creator(config["env_config"]))

        # Contains both the online model and the target model.
        self.model = DDPGModel(registry, self.env, config)

        self.sampler = SyncSampler(
            self.env, self.model.model, NoFilter(),
            config["num_local_steps"], horizon=config["horizon"])

    def sample(self):
        """Returns a batch of samples."""
        rollout = self.sampler.get_data()
        rollout.data["weights"] = np.ones_like(rollout.data["rewards"])

        # Rollouts are post-processed with gamma=1.0 (no discounting);
        # config["gamma"] is deliberately not used here.
        samples = process_rollout(
            rollout, NoFilter(),
            gamma=1.0, use_gae=False)

        return samples

    def update_target(self):
        """Updates target critic and target actor."""
        self.model.update_target()

    def compute_gradients(self, samples):
        """Returns ((critic gradients, actor gradients), info)."""
        return self.model.compute_gradients(samples)

    def apply_gradients(self, grads):
        """Applies gradients to evaluator weights."""
        self.model.apply_gradients(grads)

    def compute_apply(self, samples):
        """Computes gradients on ``samples`` and applies them right away.

        Returns:
            The info dict produced by ``compute_gradients``. Callers bind
            this return value, so it is forwarded instead of dropped.
        """
        grads, info = self.compute_gradients(samples)
        self.apply_gradients(grads)
        return info

    def get_weights(self):
        """Returns model weights."""
        return self.model.get_weights()

    def set_weights(self, weights):
        """Sets model weights."""
        self.model.set_weights(weights)

    def get_completed_rollout_metrics(self):
        """Returns metrics on previously completed rollouts.

        Calling this clears the queue of completed rollout metrics.
        """
        return self.sampler.get_metrics()


RemoteDDPGEvaluator = ray.remote(DDPGEvaluator)
+241
View File
@@ -0,0 +1,241 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from ray.experimental.tfutils import TensorFlowVariables
from ray.rllib.models.ddpgnet import DDPGActor, DDPGCritic
from ray.rllib.ddpg.random_process import OrnsteinUhlenbeckProcess
class DDPGModel(object):
    """Online and target DDPG actor-critic networks in one TF session.

    ``self.model`` is the network that is trained; ``self.target_model``
    is only used to build critic targets and is moved towards the online
    network by ``update_target``.
    """

    def __init__(self, registry, env, config):
        """Build both networks, their update ops, and initialize them.

        Args:
            registry: Passed through to DDPGActorCritic.
            env: Environment providing observation and action spaces.
            config: Agent configuration; "critic_lr", "actor_lr" and
                "tau" are read here.
        """
        self.config = config
        self.sess = tf.Session()

        with tf.variable_scope("model"):
            self.model = DDPGActorCritic(
                registry, env, self.config, self.sess)
        with tf.variable_scope("target_model"):
            self.target_model = DDPGActorCritic(
                registry, env, self.config, self.sess)
        self._setup_gradients()
        self._setup_target_updates()

        self.initialize()
        self._initialize_target_weights()

    def initialize(self):
        """Run the global variable initializer in this model's session."""
        self.sess.run(tf.global_variables_initializer())

    def _initialize_target_weights(self):
        """Set initial target weights to match model weights."""
        a_updates = []
        for var, target_var in zip(
                self.model.actor_var_list,
                self.target_model.actor_var_list):
            a_updates.append(tf.assign(target_var, var))
        actor_updates = tf.group(*a_updates)

        c_updates = []
        for var, target_var in zip(
                self.model.critic_var_list,
                self.target_model.critic_var_list):
            c_updates.append(tf.assign(target_var, var))
        critic_updates = tf.group(*c_updates)
        self.sess.run([actor_updates, critic_updates])

    def _gradient_update(self, objective, var_list, learning_rate):
        """Return (gradient tensors, Adam apply op) for ``objective``.

        Variables without a gradient are dropped here, once, so that the
        returned tensor list always lines up with the values produced by
        ``compute_gradients`` and consumed by ``apply_gradients``.
        """
        grads_and_vars = [
            (grad, var)
            for grad, var in zip(tf.gradients(objective, var_list), var_list)
            if grad is not None]
        optimizer = tf.train.AdamOptimizer(learning_rate)
        apply_op = optimizer.apply_gradients(grads_and_vars)
        return [grad for grad, _ in grads_and_vars], apply_op

    def _setup_gradients(self):
        """Setup critic and actor gradients."""
        self.critic_grads, self._apply_c_gradients = self._gradient_update(
            self.model.critic_loss, self.model.critic_var_list,
            self.config["critic_lr"])

        # The actor ascends the critic's value of its own action, hence
        # the negated objective.
        self.actor_grads, self._apply_a_gradients = self._gradient_update(
            -self.model.cn_for_loss, self.model.actor_var_list,
            self.config["actor_lr"])

    def compute_gradients(self, samples):
        """Returns gradients w.r.t. samples.

        Args:
            samples: Batch providing "obs", "new_obs", "actions",
                "rewards" and "dones".

        Returns:
            ((critic gradients, actor gradients), info) where info is an
            empty dict.
        """
        # actor gradients
        actor_actions = self.sess.run(
            self.model.output_action,
            feed_dict={self.model.obs: samples["obs"]}
        )

        actor_feed_dict = {
            self.model.obs: samples["obs"],
            self.model.output_action: actor_actions,
        }
        actor_grad = self.sess.run(
            self.actor_grads, feed_dict=actor_feed_dict)

        # feed samples into target actor
        target_Q_act = self.sess.run(
            self.target_model.output_action,
            feed_dict={self.target_model.obs: samples["new_obs"]}
        )
        target_Q_dict = {
            self.target_model.obs: samples["new_obs"],
            self.target_model.act: target_Q_act,
        }

        target_Q = self.sess.run(
            self.target_model.critic_eval, feed_dict=target_Q_dict)

        # A terminal transition has no successor state to bootstrap from:
        # zero its next-state value so the critic target is the reward.
        not_done = 1.0 - np.asarray(
            samples["dones"], dtype=np.float32).reshape(-1, 1)
        target_Q = target_Q * not_done

        # critic gradients
        critic_feed_dict = {
            self.model.obs: samples["obs"],
            self.model.act: samples["actions"],
            self.model.reward: samples["rewards"],
            self.model.target_Q: target_Q,
        }
        critic_grad = self.sess.run(
            self.critic_grads, feed_dict=critic_feed_dict)
        return (critic_grad, actor_grad), {}

    def apply_gradients(self, grads):
        """Applies gradients to evaluator weights.

        Args:
            grads: (critic gradients, actor gradients) as returned by
                ``compute_gradients``; the values are fed in place of the
                gradient tensors when running the Adam update ops.
        """
        c_grads, a_grads = grads
        critic_feed_dict = dict(zip(self.critic_grads, c_grads))
        self.sess.run(self._apply_c_gradients, feed_dict=critic_feed_dict)
        actor_feed_dict = dict(zip(self.actor_grads, a_grads))
        self.sess.run(self._apply_a_gradients, feed_dict=actor_feed_dict)

    def get_weights(self):
        """Returns model weights, target model weights."""
        return self.model.get_weights(), self.target_model.get_weights()

    def set_weights(self, weights):
        """Sets model and target model weights."""
        model_weights, target_model_weights = weights
        self.model.set_weights(model_weights)
        self.target_model.set_weights(target_model_weights)

    def _setup_target_updates(self):
        """Set up target actor and critic updates."""
        a_updates = []
        tau = self.config["tau"]
        for var, target_var in zip(
                self.model.actor_var_list,
                self.target_model.actor_var_list):
            a_updates.append(tf.assign(
                target_var, tau * var + (1. - tau) * target_var))
        actor_updates = tf.group(*a_updates)

        c_updates = []
        for var, target_var in zip(
                self.model.critic_var_list,
                self.target_model.critic_var_list):
            c_updates.append(tf.assign(
                target_var, tau * var + (1. - tau) * target_var))
        critic_updates = tf.group(*c_updates)
        self.target_updates = [actor_updates, critic_updates]

    def update_target(self):
        """Updates target critic and target actor."""
        self.sess.run(self.target_updates)
class DDPGActorCritic(object):
    """Actor and critic networks over shared observation/action inputs.

    Besides building the graph, the object acts as the sampling policy
    through ``compute``, optionally adding Ornstein-Uhlenbeck noise to the
    actions it returns.
    """

    other_output = []
    is_recurrent = False

    def __init__(self, registry, env, config, sess):
        """Build placeholders, both networks, the critic loss and noise.

        Args:
            registry: Unused here; kept for interface compatibility.
            env: Environment providing observation and action spaces.
            config: Agent configuration; "gamma", "noise_add",
                "noise_parameters" and "noise_epsilon" are read.
            sess: TensorFlow session used to run the networks.
        """
        self.config = config
        self.sess = sess

        obs_space = env.observation_space
        ac_space = env.action_space

        self.obs_size = int(np.prod(obs_space.shape))
        self.obs = tf.placeholder(tf.float32, [None, self.obs_size])
        self.ac_size = int(np.prod(ac_space.shape))
        self.act = tf.placeholder(tf.float32, [None, self.ac_size])
        self.action_bound = env.action_space.high
        # TODO: change action_bound to make more general

        self._setup_actor_network(obs_space, ac_space)
        self._setup_critic_network(obs_space, ac_space)
        self._setup_critic_loss(ac_space)

        with tf.variable_scope("critic"):
            self.critic_var_list = tf.get_collection(
                tf.GraphKeys.TRAINABLE_VARIABLES,
                tf.get_variable_scope().name
            )
            self.critic_vars = TensorFlowVariables(self.critic_loss,
                                                   self.sess)

        with tf.variable_scope("actor"):
            self.actor_var_list = tf.get_collection(
                tf.GraphKeys.TRAINABLE_VARIABLES,
                tf.get_variable_scope().name
            )
            self.actor_vars = TensorFlowVariables(self.output_action,
                                                  self.sess)

        if self.config["noise_add"]:
            params = self.config["noise_parameters"]
            self.rand_process = OrnsteinUhlenbeckProcess(
                size=self.ac_size,
                theta=params["theta"],
                mu=params["mu"],
                sigma=params["sigma"])
            # Scale applied to the noise; decays towards 0 in compute().
            self.epsilon = 1.0

    def _setup_critic_loss(self, action_space):
        """Sets up critic loss."""
        self.target_Q = tf.placeholder(
            tf.float32, [None, 1], name="target_q")

        # compare critic eval to critic_target (squared loss)
        self.reward = tf.placeholder(tf.float32, [None], name="reward")
        self.critic_target = tf.expand_dims(self.reward, 1) + \
            self.config['gamma'] * self.target_Q
        self.critic_loss = tf.reduce_mean(tf.square(
            self.critic_target - self.critic_eval))

    def _setup_critic_network(self, obs_space, ac_space):
        """Sets up Q network."""
        with tf.variable_scope("critic", reuse=tf.AUTO_REUSE):
            self.critic_network = DDPGCritic((self.obs, self.act), 1, {})
            self.critic_eval = self.critic_network.outputs

        # Same critic weights, evaluated on the actor's own action; this
        # output is what the actor is trained to maximize.
        with tf.variable_scope("critic", reuse=True):
            self.cn_for_loss = DDPGCritic(
                (self.obs, self.output_action), 1, {}).outputs

    def _setup_actor_network(self, obs_space, ac_space):
        """Sets up actor network."""
        with tf.variable_scope("actor", reuse=tf.AUTO_REUSE):
            self.actor_network = DDPGActor(
                self.obs, self.ac_size,
                options={"action_bound": self.action_bound})
            self.output_action = self.actor_network.outputs

    def get_weights(self):
        """Returns critic weights, actor weights."""
        return self.critic_vars.get_weights(), self.actor_vars.get_weights()

    def set_weights(self, weights):
        """Sets critic and actor weights."""
        critic_weights, actor_weights = weights
        self.critic_vars.set_weights(critic_weights)
        self.actor_vars.set_weights(actor_weights)

    def compute(self, ob):
        """Returns action, given state.

        When ``config["noise_add"]`` is set, exploration noise scaled by
        ``self.epsilon`` is added and ``self.epsilon`` is then reduced by
        ``config["noise_epsilon"]``, never dropping below 0.

        Returns:
            (action for the single observation, empty info dict).
        """
        flattened_ob = np.reshape(ob, [-1, np.prod(ob.shape)])
        action = self.sess.run(self.output_action, {self.obs: flattened_ob})
        if self.config["noise_add"]:
            action += self.epsilon * self.rand_process.sample()
            if self.epsilon > 0:
                # Clamp at zero: a plain subtraction overshoots and would
                # leave a small negative scale, i.e. sign-flipped noise
                # on every later action.
                self.epsilon = max(
                    0.0, self.epsilon - self.config["noise_epsilon"])
        return action[0], {}

    def value(self, *args):
        """Placeholder value estimate; always 0."""
        return 0
+63
View File
@@ -0,0 +1,63 @@
# [reference]
# https://github.com/matthiasplappert/keras-rl/blob/master/rl/random.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
class RandomProcess(object):
    """Base class of the noise processes; it keeps no state of its own."""

    def reset_states(self):
        """Reset internal state. Nothing to do for the base class."""
        return None


class AnnealedGaussianProcess(RandomProcess):
    """Noise process whose sigma is annealed linearly with the step count.

    Sigma follows ``m * n_steps + c`` and never falls below
    ``sigma_min``. With ``sigma_min=None`` the slope is zero, so sigma
    stays at its initial value.
    """

    def __init__(self, mu, sigma, sigma_min, n_steps_annealing):
        self.mu = mu
        self.sigma = sigma
        self.n_steps = 0

        # The schedule always starts from the initial sigma.
        self.c = sigma
        if sigma_min is None:
            self.m = 0.
            self.sigma_min = sigma
        else:
            self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
            self.sigma_min = sigma_min

    @property
    def current_sigma(self):
        """Sigma for the current step count, floored at ``sigma_min``."""
        scheduled = self.m * float(self.n_steps) + self.c
        return max(self.sigma_min, scheduled)


# Based on
# http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab
class OrnsteinUhlenbeckProcess(AnnealedGaussianProcess):
    """Discretized Ornstein-Uhlenbeck process.

    Each sample pulls the previous value towards ``mu`` at rate ``theta``
    and adds Gaussian noise scaled by the current sigma.
    """

    def __init__(self, theta, mu=0., sigma=1., dt=1e-2,
                 x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
        super(OrnsteinUhlenbeckProcess, self).__init__(
            mu=mu,
            sigma=sigma,
            sigma_min=sigma_min,
            n_steps_annealing=n_steps_annealing)
        self.theta = theta
        self.dt = dt
        self.x0 = x0
        self.size = size
        self.reset_states()

    def sample(self):
        """Advance the process by one step and return the new value."""
        drift = self.theta * (self.mu - self.x_prev) * self.dt
        diffusion = self.current_sigma * np.sqrt(self.dt) * \
            np.random.normal(size=self.size)
        x = self.x_prev + drift + diffusion

        self.x_prev = x
        self.n_steps += 1
        return x

    def reset_states(self):
        """Restart from ``x0``, or from zeros when no ``x0`` was given."""
        if self.x0 is not None:
            self.x_prev = self.x0
        else:
            self.x_prev = np.zeros(self.size)
+49
View File
@@ -0,0 +1,49 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import tensorflow.contrib.slim as slim
from ray.rllib.models.model import Model
class DDPGActor(Model):
    """Actor network for DDPG."""

    def _init(self, inputs, num_outputs, options):
        """Build two ReLU hidden layers and a scaled tanh output layer.

        Args:
            inputs: Input tensor of the network.
            num_outputs: Width of the output layer.
            options: Must contain "action_bound", the factor the tanh
                output is multiplied with.

        Returns:
            (scaled output tensor, last hidden layer tensor).
        """
        hidden_init = tf.truncated_normal_initializer()
        output_init = tf.random_uniform_initializer(
            minval=-0.003, maxval=0.003)
        action_scale = options["action_bound"]

        # Layers are created in a fixed order (400, then 300 units) so
        # the automatically assigned variable names stay the same.
        hidden = inputs
        for width in (400, 300):
            hidden = slim.fully_connected(
                hidden, width, activation_fn=tf.nn.relu,
                weights_initializer=hidden_init)

        squashed = slim.fully_connected(
            hidden, num_outputs, activation_fn=tf.nn.tanh,
            weights_initializer=output_init)
        return tf.multiply(squashed, action_scale), hidden
class DDPGCritic(Model):
    """Critic network for DDPG."""

    def _init(self, inputs, num_outputs, options):
        """Build the Q network over an (observation, action) pair.

        The observation passes through one ReLU layer; its 300-unit
        projection is then summed with a 300-unit projection of the
        action before the final linear layer.

        Args:
            inputs: Tuple ``(obs, action)`` of input tensors.
            num_outputs: Width of the final linear layer.
            options: Unused.

        Returns:
            (output tensor, last hidden layer tensor).
        """
        obs, action = inputs
        w_normal = tf.truncated_normal_initializer()
        w_init = tf.random_uniform_initializer(
            minval=-0.0003, maxval=0.0003)
        net = slim.fully_connected(
            obs, 400, activation_fn=tf.nn.relu,
            weights_initializer=w_normal)
        # No bias on the observation branch: the action branch below
        # already contributes one to the sum.
        t1 = slim.fully_connected(
            net, 300, activation_fn=None, biases_initializer=None,
            weights_initializer=w_normal)
        t2 = slim.fully_connected(
            action, 300, activation_fn=None, weights_initializer=w_normal)
        net = tf.nn.relu(tf.add(t1, t2))
        # Honour the requested width instead of a hard-coded 1.
        out = slim.fully_connected(
            net, num_outputs, activation_fn=None,
            weights_initializer=w_init)
        return out, net
@@ -85,17 +85,17 @@ class LocalSyncReplayOptimizer(PolicyOptimizer):
self.train_batch_size)
weights = np.ones_like(rewards)
batch_indexes = - np.ones_like(rewards)
samples = SampleBatch({
"obs": obses_t, "actions": actions, "rewards": rewards,
"new_obs": obses_tp1, "dones": dones, "weights": weights,
"batch_indexes": batch_indexes})
with self.grad_timer:
td_error = self.local_evaluator.compute_apply(samples)["td_error"]
new_priorities = (
np.abs(td_error) + self.prioritized_replay_eps)
info = self.local_evaluator.compute_apply(samples)
if isinstance(self.replay_buffer, PrioritizedReplayBuffer):
td_error = info["td_error"]
new_priorities = (
np.abs(td_error) + self.prioritized_replay_eps)
self.replay_buffer.update_priorities(
samples["batch_indexes"], new_priorities)
self.grad_timer.push_units_processed(samples.count)
+1 -1
View File
@@ -59,7 +59,7 @@ class PGPolicy():
def compute_gradients(self, samples):
info = {}
feed_dict = {
self.x: samples["observations"],
self.x: samples["obs"],
self.ac: samples["actions"],
self.adv: samples["advantages"],
}
+1 -1
View File
@@ -223,7 +223,7 @@ class PPOAgent(Agent):
"shuffle_time": shuffle_time,
"load_time": load_time,
"sgd_time": sgd_time,
"sample_throughput": len(samples["observations"]) / sgd_time
"sample_throughput": len(samples["obs"]) / sgd_time
}
FilterManager.synchronize(
+1 -1
View File
@@ -139,7 +139,7 @@ class PPOEvaluator(PolicyEvaluator):
dummy = np.zeros_like(trajectories["advantages"])
return self.par_opt.load_data(
self.sess,
[trajectories["observations"],
[trajectories["obs"],
trajectories["value_targets"] if use_gae else dummy,
trajectories["advantages"],
trajectories["actions"],
@@ -0,0 +1,10 @@
pendulum-ddpg:
env: Pendulum-v0
run: DDPG
stop:
episode_reward_mean: -100
time_total_s: 600
trial_resources:
cpu: 1
config:
num_workers: 1
+5 -4
View File
@@ -20,7 +20,7 @@ class PartialRollout(object):
last_r (float): Value of next state. Used for bootstrapping.
"""
fields = ["observations", "actions", "rewards", "terminal", "features"]
fields = ["obs", "actions", "rewards", "new_obs", "dones", "features"]
def __init__(self, extra_fields=None):
"""Initializers internals. Maintains a `last_r` field
@@ -54,7 +54,7 @@ class PartialRollout(object):
Returns:
terminal (bool): if rollout has terminated."""
return self.data["terminal"][-1]
return self.data["dones"][-1]
CompletedRollout = namedtuple(
@@ -232,11 +232,12 @@ def _env_runner(env, policy, num_local_steps, horizon, obs_filter):
action = np.concatenate(action, axis=0).flatten()
# Collect the experience.
rollout.add(observations=last_observation,
rollout.add(obs=last_observation,
actions=action,
rewards=reward,
terminal=terminal,
dones=terminal,
features=last_features,
new_obs=observation,
**pi_info)
last_observation = observation