diff --git a/doc/source/rllib-dev.rst b/doc/source/rllib-dev.rst index 83237e707..6b5358f58 100644 --- a/doc/source/rllib-dev.rst +++ b/doc/source/rllib-dev.rst @@ -68,7 +68,6 @@ Algorithms share neural network models which inherit from the following class: Currently we support fully connected and convolutional TensorFlow policies on all algorithms: .. autoclass:: ray.rllib.models.FullyConnectedNetwork -.. autoclass:: ray.rllib.models.ConvolutionalNetwork A3C also supports a TensorFlow LSTM policy. diff --git a/python/ray/rllib/a3c/a3c.py b/python/ray/rllib/a3c/a3c.py index f18ebc05c..b04351be3 100644 --- a/python/ray/rllib/a3c/a3c.py +++ b/python/ray/rllib/a3c/a3c.py @@ -11,7 +11,6 @@ from ray.rllib.optimizers import AsyncOptimizer from ray.rllib.utils import FilterManager from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator, \ collect_metrics -from ray.rllib.a3c.common import get_policy_cls from ray.tune.trial import Resources DEFAULT_CONFIG = { @@ -21,8 +20,6 @@ DEFAULT_CONFIG = { "num_envs": 1, # Size of rollout batch "batch_size": 10, - # Use LSTM model - only applicable for image states - "use_lstm": False, # Use PyTorch as backend - no LSTM support "use_pytorch": False, # Which observation filter to apply to the observation @@ -47,6 +44,8 @@ DEFAULT_CONFIG = { "summarize": False, # Model and preprocessor options "model": { + # Use LSTM model - only applicable for image states. Requires TF. 
+ "use_lstm": False, # (Image statespace) - Converts image to Channels = 1 "grayscale": True, # (Image statespace) - Each pixel @@ -86,7 +85,12 @@ class A3CAgent(Agent): extra_gpu=cf["use_gpu_for_workers"] and cf["num_workers"] or 0) def _init(self): - self.policy_cls = get_policy_cls(self.config) + if self.config["use_pytorch"]: + from ray.rllib.a3c.a3c_torch_policy import A3CTorchPolicyGraph + self.policy_cls = A3CTorchPolicyGraph + else: + from ray.rllib.a3c.a3c_tf_policy import A3CPolicyGraph + self.policy_cls = A3CPolicyGraph if self.config["use_pytorch"]: session_creator = None diff --git a/python/ray/rllib/a3c/a3c_tf_policy.py b/python/ray/rllib/a3c/a3c_tf_policy.py index a23d4b9c4..cc7605965 100644 --- a/python/ray/rllib/a3c/a3c_tf_policy.py +++ b/python/ray/rllib/a3c/a3c_tf_policy.py @@ -7,90 +7,124 @@ import gym import ray from ray.rllib.utils.error import UnsupportedSpaceException -from ray.rllib.utils.process_rollout import compute_advantages +from ray.rllib.utils.postprocessing import compute_advantages from ray.rllib.utils.tf_policy_graph import TFPolicyGraph +from ray.rllib.models.misc import linear, normc_initializer +from ray.rllib.models.catalog import ModelCatalog -class A3CTFPolicyGraph(TFPolicyGraph): - """The TF policy base class.""" +class A3CLoss(object): + def __init__( + self, action_dist, actions, advantages, v_target, vf, + vf_loss_coeff=0.5, entropy_coeff=-0.01): + log_prob = action_dist.logp(actions) - def __init__(self, ob_space, action_space, config): + # The "policy gradients" loss + self.pi_loss = - tf.reduce_sum(log_prob * advantages) + + delta = vf - v_target + self.vf_loss = 0.5 * tf.reduce_sum(tf.square(delta)) + self.entropy = tf.reduce_sum(action_dist.entropy()) + self.total_loss = (self.pi_loss + + self.vf_loss * vf_loss_coeff + + self.entropy * entropy_coeff) + + +class A3CPolicyGraph(TFPolicyGraph): + def __init__(self, observation_space, action_space, config): config = dict(ray.rllib.a3c.a3c.DEFAULT_CONFIG, **config) - 
self.local_steps = 0 self.config = config - self.summarize = config.get("summarize") - - self._setup_graph(ob_space, action_space) - assert all(hasattr(self, attr) - for attr in ["vf", "logits", "x", "var_list"]) - print("Setting up loss") - self.setup_loss(action_space) - self.is_training = tf.placeholder_with_default(True, ()) self.sess = tf.get_default_session() - TFPolicyGraph.__init__( - self, ob_space, action_space, self.sess, obs_input=self.x, - action_sampler=self.action_dist.sample(), loss=self.loss, - loss_inputs=self.loss_in, is_training=self.is_training, - state_inputs=self.state_in, state_outputs=self.state_out) + # Setup the policy + self.observations = tf.placeholder( + tf.float32, [None] + list(observation_space.shape)) + dist_class, logit_dim = ModelCatalog.get_action_dist( + action_space, self.config["model"]) + self.model = ModelCatalog.get_model( + self.observations, logit_dim, self.config["model"]) + action_dist = dist_class(self.model.outputs) + self.vf = tf.reshape( + linear(self.model.last_layer, 1, "value", normc_initializer(1.0)), + [-1]) + self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, + tf.get_variable_scope().name) + is_training = tf.placeholder_with_default(True, ()) - self.sess.run(tf.global_variables_initializer()) - - if self.summarize: - bs = tf.to_float(tf.shape(self.x)[0]) - tf.summary.scalar("model/policy_graph", self.pi_loss / bs) - tf.summary.scalar("model/value_loss", self.vf_loss / bs) - tf.summary.scalar("model/entropy", self.entropy / bs) - tf.summary.scalar("model/grad_gnorm", tf.global_norm(self._grads)) - tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list)) - self.summary_op = tf.summary.merge_all() - - def _setup_graph(self, ob_space, ac_space): - raise NotImplementedError - - def setup_loss(self, action_space): + # Setup the policy loss if isinstance(action_space, gym.spaces.Box): ac_size = action_space.shape[0] - self.ac = tf.placeholder(tf.float32, [None, ac_size], name="ac") + 
actions = tf.placeholder(tf.float32, [None, ac_size], name="ac") elif isinstance(action_space, gym.spaces.Discrete): - self.ac = tf.placeholder(tf.int64, [None], name="ac") + actions = tf.placeholder(tf.int64, [None], name="ac") else: raise UnsupportedSpaceException( "Action space {} is not supported for A3C.".format( action_space)) - self.adv = tf.placeholder(tf.float32, [None], name="adv") - self.r = tf.placeholder(tf.float32, [None], name="r") + advantages = tf.placeholder(tf.float32, [None], name="advantages") + v_target = tf.placeholder(tf.float32, [None], name="v_target") + self.loss = A3CLoss( + action_dist, actions, advantages, v_target, self.vf, + self.config["vf_loss_coeff"], self.config["entropy_coeff"]) - log_prob = self.action_dist.logp(self.ac) + # Initialize TFPolicyGraph + loss_in = [ + ("obs", self.observations), + ("actions", actions), + ("advantages", advantages), + ("value_targets", v_target), + ] + for i, ph in enumerate(self.model.state_in): + loss_in.append(("state_in_{}".format(i), ph)) + self.state_in = self.model.state_in + self.state_out = self.model.state_out + TFPolicyGraph.__init__( + self, observation_space, action_space, self.sess, + obs_input=self.observations, action_sampler=action_dist.sample(), + loss=self.loss.total_loss, loss_inputs=loss_in, + is_training=is_training, state_inputs=self.state_in, + state_outputs=self.state_out) - # The "policy gradients" loss: its derivative is precisely the policy - # gradient. Notice that self.ac is a placeholder that is provided - # externally. adv will contain the advantages, as calculated in - # compute_advantages. 
- self.pi_loss = - tf.reduce_sum(log_prob * self.adv) + if self.config.get("summarize"): + bs = tf.to_float(tf.shape(self.observations)[0]) + tf.summary.scalar("model/policy_graph", self.loss.pi_loss / bs) + tf.summary.scalar("model/value_loss", self.loss.vf_loss / bs) + tf.summary.scalar("model/entropy", self.loss.entropy / bs) + tf.summary.scalar("model/grad_gnorm", tf.global_norm(self._grads)) + tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list)) + self.summary_op = tf.summary.merge_all() - delta = self.vf - self.r - self.vf_loss = 0.5 * tf.reduce_sum(tf.square(delta)) - self.entropy = tf.reduce_sum(self.action_dist.entropy()) - self.loss = (self.pi_loss + - self.vf_loss * self.config["vf_loss_coeff"] + - self.entropy * self.config["entropy_coeff"]) + self.sess.run(tf.global_variables_initializer()) + + def extra_compute_action_fetches(self): + return {"vf_preds": self.vf} + + def value(self, ob, *args): + feed_dict = {self.observations: [ob]} + assert len(args) == len(self.state_in), (args, self.state_in) + for k, v in zip(self.state_in, args): + feed_dict[k] = v + vf = self.sess.run(self.vf, feed_dict) + return vf[0] def optimizer(self): return tf.train.AdamOptimizer(self.config["lr"]) def gradients(self, optimizer): - grads = tf.gradients(self.loss, self.var_list) + grads = tf.gradients(self.loss.total_loss, self.var_list) self.grads, _ = tf.clip_by_global_norm(grads, self.config["grad_clip"]) clipped_grads = list(zip(self.grads, self.var_list)) return clipped_grads def extra_compute_grad_fetches(self): - if self.summarize: + if self.config.get("summarize"): return {"summary": self.summary_op} else: return {} + def get_initial_state(self): + return self.model.state_init + def postprocess_trajectory(self, sample_batch, other_agent_batches=None): completed = sample_batch["dones"][-1] if completed: diff --git a/python/ray/rllib/a3c/a3c_torch_policy.py b/python/ray/rllib/a3c/a3c_torch_policy.py index 79ae75c11..3813f1e20 100644 --- 
a/python/ray/rllib/a3c/a3c_torch_policy.py +++ b/python/ray/rllib/a3c/a3c_torch_policy.py @@ -2,114 +2,78 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import numpy as np -from threading import Lock - import torch import torch.nn.functional as F +from torch import nn import ray -from ray.rllib.models.pytorch.misc import var_to_np, convert_batch +from ray.rllib.models.pytorch.misc import var_to_np from ray.rllib.models.catalog import ModelCatalog -from ray.rllib.utils.process_rollout import compute_advantages -from ray.rllib.utils.policy_graph import PolicyGraph +from ray.rllib.utils.postprocessing import compute_advantages +from ray.rllib.utils.torch_policy_graph import TorchPolicyGraph -class SharedTorchPolicy(PolicyGraph): - """A simple, non-recurrent PyTorch policy example.""" +class A3CLoss(nn.Module): + def __init__(self, policy_model, vf_loss_coeff=0.5, entropy_coeff=-0.01): + nn.Module.__init__(self) + self.policy_model = policy_model + self.vf_loss_coeff = vf_loss_coeff + self.entropy_coeff = entropy_coeff - def __init__(self, obs_space, action_space, config): - config = dict(ray.rllib.a3c.a3c.DEFAULT_CONFIG, **config) - PolicyGraph.__init__(self, obs_space, action_space, config) - self.local_steps = 0 - self.config = config - self.summarize = config.get("summarize") - self.setup_graph(obs_space, action_space) - torch.set_num_threads(2) - self.lock = Lock() - - def setup_graph(self, obs_space, action_space): - _, self.logit_dim = ModelCatalog.get_action_dist( - action_space, self.config["model"]) - self._model = ModelCatalog.get_torch_model( - obs_space.shape, self.logit_dim, self.config["model"]) - self.optimizer = torch.optim.Adam( - self._model.parameters(), lr=self.config["lr"]) - - def compute_actions(self, obs, state, is_training=False): - assert not state, "RNN not supported" - with self.lock: - ob = torch.from_numpy(np.array(obs)).float() - logits, values = self._model(ob) - samples 
= F.softmax(logits, dim=1).multinomial(1).squeeze(0) - return var_to_np(samples), [], {"vf_preds": var_to_np(values)} - - def compute_gradients(self, samples): - with self.lock: - self.backward(samples) - # Note that return values are just references; - # calling zero_grad will modify the values - return [p.grad.data.numpy() for p in self._model.parameters()], {} - - def apply_gradients(self, grads): - self.optimizer.zero_grad() - for g, p in zip(grads, self._model.parameters()): - p.grad = torch.from_numpy(g) - self.optimizer.step() - return {} - - def get_weights(self): - # !! This only returns references to the data. - return self._model.state_dict() - - def set_weights(self, weights): - with self.lock: - self._model.load_state_dict(weights) - - def value(self, obs): - with self.lock: - obs = torch.from_numpy(obs).float().unsqueeze(0) - res = self._model.hidden_layers(obs) - res = self._model.value_branch(res) - res = res.squeeze() - return var_to_np(res) - - def forward(self, obs_batch, actions): - logits, values = self._model(obs_batch) + def forward(self, observations, actions, advantages, value_targets): + logits, values = self.policy_model(observations) log_probs = F.log_softmax(logits, dim=1) probs = F.softmax(logits, dim=1) action_log_probs = log_probs.gather(1, actions.view(-1, 1)) entropy = -(log_probs * probs).sum(-1).sum() - return values, action_log_probs, entropy - - def backward(self, sample_batch): - """Loss is encoded here. - - Defining a new loss function would start by rewriting this function. 
- """ - - states, actions, advs, rs = convert_batch(sample_batch) - values, action_log_probs, entropy = self.forward(states, actions) - pi_err = -advs.dot(action_log_probs.reshape(-1)) - value_err = F.mse_loss(values.reshape(-1), rs) - - self.optimizer.zero_grad() - + pi_err = -advantages.dot(action_log_probs.reshape(-1)) + value_err = F.mse_loss(values.reshape(-1), value_targets) overall_err = sum([ pi_err, - self.config["vf_loss_coeff"] * value_err, - self.config["entropy_coeff"] * entropy, + self.vf_loss_coeff * value_err, + self.entropy_coeff * entropy, ]) + return overall_err - overall_err.backward() - torch.nn.utils.clip_grad_norm_(self._model.parameters(), - self.config["grad_clip"]) + +class A3CTorchPolicyGraph(TorchPolicyGraph): + """A simple, non-recurrent PyTorch policy example.""" + + def __init__(self, obs_space, action_space, config): + config = dict(ray.rllib.a3c.a3c.DEFAULT_CONFIG, **config) + self.config = config + _, self.logit_dim = ModelCatalog.get_action_dist( + action_space, self.config["model"]) + self.model = ModelCatalog.get_torch_model( + obs_space.shape, self.logit_dim, self.config["model"]) + loss = A3CLoss( + self.model, self.config["vf_loss_coeff"], + self.config["entropy_coeff"]) + TorchPolicyGraph.__init__( + self, obs_space, action_space, self.model, loss, + loss_inputs=[ + "obs", "actions", "advantages", "value_targets"]) + + def extra_action_out(self, model_out): + return {"vf_preds": var_to_np(model_out[1])} + + def optimizer(self): + return torch.optim.Adam( + self.model.parameters(), lr=self.config["lr"]) def postprocess_trajectory(self, sample_batch, other_agent_batches=None): completed = sample_batch["dones"][-1] if completed: last_r = 0.0 else: - last_r = self.value(sample_batch["new_obs"][-1]) + last_r = self._value(sample_batch["new_obs"][-1]) return compute_advantages( sample_batch, last_r, self.config["gamma"], self.config["lambda"]) + + def _value(self, obs): + with self.lock: + obs = 
torch.from_numpy(obs).float().unsqueeze(0) + res = self.model.hidden_layers(obs) + res = self.model.value_branch(res) + res = res.squeeze() + return var_to_np(res) diff --git a/python/ray/rllib/a3c/common.py b/python/ray/rllib/a3c/common.py deleted file mode 100644 index cc2179c2f..000000000 --- a/python/ray/rllib/a3c/common.py +++ /dev/null @@ -1,16 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - - -def get_policy_cls(config): - if config["use_lstm"]: - from ray.rllib.a3c.shared_model_lstm import SharedModelLSTM - policy_cls = SharedModelLSTM - elif config["use_pytorch"]: - from ray.rllib.a3c.a3c_torch_policy import SharedTorchPolicy - policy_cls = SharedTorchPolicy - else: - from ray.rllib.a3c.shared_model import SharedModel - policy_cls = SharedModel - return policy_cls diff --git a/python/ray/rllib/a3c/shared_model.py b/python/ray/rllib/a3c/shared_model.py deleted file mode 100644 index 7b442ed61..000000000 --- a/python/ray/rllib/a3c/shared_model.py +++ /dev/null @@ -1,53 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf -from ray.rllib.models.misc import linear, normc_initializer -from ray.rllib.a3c.a3c_tf_policy import A3CTFPolicyGraph -from ray.rllib.models.catalog import ModelCatalog - - -class SharedModel(A3CTFPolicyGraph): - - def __init__(self, ob_space, ac_space, config, **kwargs): - super(SharedModel, self).__init__( - ob_space, ac_space, config, **kwargs) - - def _setup_graph(self, ob_space, ac_space): - self.x = tf.placeholder(tf.float32, [None] + list(ob_space.shape)) - dist_class, self.logit_dim = ModelCatalog.get_action_dist( - ac_space, self.config["model"]) - self._model = ModelCatalog.get_model( - self.x, self.logit_dim, self.config["model"]) - self.logits = self._model.outputs - self.action_dist = dist_class(self.logits) - self.vf = tf.reshape(linear(self._model.last_layer, 
1, "value", - normc_initializer(1.0)), [-1]) - - self.sample = self.action_dist.sample() - self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, - tf.get_variable_scope().name) - self.global_step = tf.get_variable( - "global_step", [], tf.int32, - initializer=tf.constant_initializer(0, dtype=tf.int32), - trainable=False) - - self.state_in = [] - self.state_out = [] - - def setup_loss(self, action_space): - A3CTFPolicyGraph.setup_loss(self, action_space) - self.loss_in = [ - ("obs", self.x), - ("actions", self.ac), - ("advantages", self.adv), - ("value_targets", self.r), - ] - - def extra_compute_action_fetches(self): - return {"vf_preds": self.vf} - - def value(self, ob, *args): - vf = self.sess.run(self.vf, {self.x: [ob]}) - return vf[0] diff --git a/python/ray/rllib/a3c/shared_model_lstm.py b/python/ray/rllib/a3c/shared_model_lstm.py deleted file mode 100644 index abb0186bc..000000000 --- a/python/ray/rllib/a3c/shared_model_lstm.py +++ /dev/null @@ -1,63 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf -from ray.rllib.models.misc import linear, normc_initializer -from ray.rllib.models.catalog import ModelCatalog -from ray.rllib.a3c.a3c_tf_policy import A3CTFPolicyGraph -from ray.rllib.models.lstm import LSTM - - -class SharedModelLSTM(A3CTFPolicyGraph): - - def __init__(self, ob_space, ac_space, config, **kwargs): - super(SharedModelLSTM, self).__init__( - ob_space, ac_space, config, **kwargs) - - def _setup_graph(self, ob_space, ac_space): - self.x = tf.placeholder(tf.float32, [None] + list(ob_space.shape)) - dist_class, self.logit_dim = ModelCatalog.get_action_dist( - ac_space, self.config["model"]) - self._model = LSTM(self.x, self.logit_dim, {}) - - self.state_in = self._model.state_in - self.state_out = self._model.state_out - - self.logits = self._model.outputs - self.action_dist = dist_class(self.logits) - # with tf.variable_scope("vf"): - # 
vf_model = ModelCatalog.get_model(self.x, 1) - self.vf = tf.reshape(linear(self._model.last_layer, 1, "value", - normc_initializer(1.0)), [-1]) - - self.sample = self.action_dist.sample() - self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, - tf.get_variable_scope().name) - self.global_step = tf.get_variable( - "global_step", [], tf.int32, - initializer=tf.constant_initializer(0, dtype=tf.int32), - trainable=False) - - def get_initial_state(self): - return self._model.state_init - - def setup_loss(self, action_space): - A3CTFPolicyGraph.setup_loss(self, action_space) - self.loss_in = [ - ("obs", self.x), - ("actions", self.ac), - ("advantages", self.adv), - ("value_targets", self.r), - ("state_in_0", self.state_in[0]), - ("state_in_1", self.state_in[1]), - ] - - def extra_compute_action_fetches(self): - return {"vf_preds": self.vf} - - def value(self, ob, c, h): - vf = self.sess.run(self.vf, {self.x: [ob], - self.state_in[0]: c, - self.state_in[1]: h}) - return vf[0] diff --git a/python/ray/rllib/ddpg/ddpg_policy_graph.py b/python/ray/rllib/ddpg/ddpg_policy_graph.py index a76d3fa8d..870c5bcec 100644 --- a/python/ray/rllib/ddpg/ddpg_policy_graph.py +++ b/python/ray/rllib/ddpg/ddpg_policy_graph.py @@ -22,62 +22,88 @@ Q_SCOPE = "q_func" Q_TARGET_SCOPE = "target_q_func" -def _build_p_network(inputs, dim_actions, config): - """ - map an observation (i.e., state) to an action where - each entry takes value from (0, 1) due to the sigmoid function - """ - frontend = ModelCatalog.get_model(inputs, 1, config["model"]) +class PNetwork(object): + """Maps an observations (i.e., state) to an action where each entry takes + value from (0, 1) due to the sigmoid function.""" - hiddens = config["actor_hiddens"] - action_out = frontend.last_layer - for hidden in hiddens: - action_out = layers.fully_connected( - action_out, num_outputs=hidden, activation_fn=tf.nn.relu) - # Use sigmoid layer to bound values within (0, 1) - # shape of action_scores is [batch_size, 
dim_actions] - action_scores = layers.fully_connected( - action_out, num_outputs=dim_actions, activation_fn=tf.nn.sigmoid) - - return action_scores + def __init__(self, model, dim_actions, hiddens=[64, 64]): + action_out = model.last_layer + for hidden in hiddens: + action_out = layers.fully_connected( + action_out, num_outputs=hidden, activation_fn=tf.nn.relu) + # Use sigmoid layer to bound values within (0, 1) + # shape of action_scores is [batch_size, dim_actions] + self.action_scores = layers.fully_connected( + action_out, num_outputs=dim_actions, activation_fn=tf.nn.sigmoid) -# As a stochastic policy for inference, but a deterministic policy for training -# thus ignore batch_size issue when constructing a stochastic action -def _build_action_network(p_values, low_action, high_action, stochastic, eps, - theta, sigma): - # shape is [None, dim_action] - deterministic_actions = (high_action - low_action) * p_values + low_action +class ActionNetwork(object): + """Acts as a stochastic policy for inference, but a deterministic policy + for training, thus ignoring the batch_size issue when constructing a + stochastic action.""" - exploration_sample = tf.get_variable( - name="ornstein_uhlenbeck", - dtype=tf.float32, - initializer=low_action.size * [.0], - trainable=False) - normal_sample = tf.random_normal( - shape=[low_action.size], mean=0.0, stddev=1.0) - exploration_value = tf.assign_add( - exploration_sample, - theta * (.0 - exploration_sample) + sigma * normal_sample) - stochastic_actions = deterministic_actions + eps * ( - high_action - low_action) * exploration_value + def __init__( + self, p_values, low_action, high_action, stochastic, eps, + theta=0.15, sigma=0.2): - return tf.cond(stochastic, lambda: stochastic_actions, - lambda: deterministic_actions) + # shape is [None, dim_action] + deterministic_actions = ( + (high_action - low_action) * p_values + low_action) + + exploration_sample = tf.get_variable( + name="ornstein_uhlenbeck", + dtype=tf.float32, + 
initializer=low_action.size * [.0], + trainable=False) + normal_sample = tf.random_normal( + shape=[low_action.size], mean=0.0, stddev=1.0) + exploration_value = tf.assign_add( + exploration_sample, + theta * (.0 - exploration_sample) + sigma * normal_sample) + stochastic_actions = deterministic_actions + eps * ( + high_action - low_action) * exploration_value + + self.actions = tf.cond( + stochastic, lambda: stochastic_actions, + lambda: deterministic_actions) -def _build_q_network(inputs, action_inputs, config): - frontend = ModelCatalog.get_model(inputs, 1, config["model"]) +class QNetwork(object): + def __init__(self, model, action_inputs, hiddens=[64, 64]): + q_out = tf.concat([model.last_layer, action_inputs], axis=1) + for hidden in hiddens: + q_out = layers.fully_connected( + q_out, num_outputs=hidden, activation_fn=tf.nn.relu) + self.value = layers.fully_connected( + q_out, num_outputs=1, activation_fn=None) - hiddens = config["critic_hiddens"] - q_out = tf.concat([frontend.last_layer, action_inputs], axis=1) - for hidden in hiddens: - q_out = layers.fully_connected( - q_out, num_outputs=hidden, activation_fn=tf.nn.relu) - q_scores = layers.fully_connected(q_out, num_outputs=1, activation_fn=None) +class ActorCriticLoss(object): + def __init__( + self, q_t, q_tp1, q_tp0, importance_weights, rewards, done_mask, + gamma=0.99, n_step=1, use_huber=False, huber_threshold=1.0): - return q_scores + q_t_selected = tf.squeeze(q_t, axis=len(q_t.shape) - 1) + + q_tp1_best = tf.squeeze( + input=q_tp1, axis=len(q_tp1.shape) - 1) + q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = rewards + gamma**n_step * q_tp1_best_masked + + # compute the error (potentially clipped) + self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) + if use_huber: + errors = _huber_loss(self.td_error, huber_threshold) + else: + errors = 0.5 * tf.square(self.td_error) + + self.critic_loss = 
tf.reduce_mean(importance_weights * errors) + + # for policy gradient + self.actor_loss = -1.0 * tf.reduce_mean(q_tp0) + self.total_loss = self.actor_loss + self.critic_loss class DDPGPolicyGraph(TFPolicyGraph): @@ -98,6 +124,28 @@ class DDPGPolicyGraph(TFPolicyGraph): self.critic_optimizer = tf.train.AdamOptimizer( learning_rate=config["critic_lr"]) + def _build_q_network(obs, actions): + return QNetwork( + ModelCatalog.get_model(obs, 1, config["model"]), + actions, + config["critic_hiddens"]).value + + def _build_p_network(obs): + return PNetwork( + ModelCatalog.get_model(obs, 1, config["model"]), + dim_actions, + config["actor_hiddens"]).action_scores + + def _build_action_network(p_values, stochastic, eps): + return ActionNetwork( + p_values, + low_action, + high_action, + stochastic, + eps, + config["exploration_theta"], + config["exploration_sigma"]).actions + # Action inputs self.stochastic = tf.placeholder(tf.bool, (), name="stochastic") self.eps = tf.placeholder(tf.float32, (), name="eps") @@ -106,15 +154,13 @@ class DDPGPolicyGraph(TFPolicyGraph): # Actor: P (policy) network with tf.variable_scope(P_SCOPE) as scope: - p_values = _build_p_network(self.cur_observations, - dim_actions, config) + p_values = _build_p_network(self.cur_observations) self.p_func_vars = _scope_vars(scope.name) # Action outputs with tf.variable_scope(A_SCOPE): self.output_actions = _build_action_network( - p_values, low_action, high_action, self.stochastic, self.eps, - config["exploration_theta"], config["exploration_sigma"]) + p_values, self.stochastic, self.eps) with tf.variable_scope(A_SCOPE, reuse=True): exploration_sample = tf.get_variable(name="ornstein_uhlenbeck") @@ -137,11 +183,11 @@ class DDPGPolicyGraph(TFPolicyGraph): # p network evaluation with tf.variable_scope(P_SCOPE, reuse=True) as scope: - self.p_t = _build_p_network(self.obs_t, dim_actions, config) + self.p_t = _build_p_network(self.obs_t) # target p network evaluation with tf.variable_scope(P_TARGET_SCOPE) as 
scope: - p_tp1 = _build_p_network(self.obs_tp1, dim_actions, config) + p_tp1 = _build_p_network(self.obs_tp1) target_p_func_vars = _scope_vars(scope.name) # Action outputs @@ -149,59 +195,37 @@ class DDPGPolicyGraph(TFPolicyGraph): deterministic_flag = tf.constant(value=False, dtype=tf.bool) zero_eps = tf.constant(value=.0, dtype=tf.float32) output_actions = _build_action_network( - self.p_t, low_action, high_action, deterministic_flag, - zero_eps, config["exploration_theta"], - config["exploration_sigma"]) + self.p_t, deterministic_flag, zero_eps) output_actions_estimated = _build_action_network( - p_tp1, low_action, high_action, deterministic_flag, - zero_eps, config["exploration_theta"], - config["exploration_sigma"]) + p_tp1, deterministic_flag, zero_eps) # q network evaluation with tf.variable_scope(Q_SCOPE) as scope: - q_t = _build_q_network(self.obs_t, self.act_t, config) + q_t = _build_q_network(self.obs_t, self.act_t) self.q_func_vars = _scope_vars(scope.name) with tf.variable_scope(Q_SCOPE, reuse=True): - q_tp0 = _build_q_network(self.obs_t, output_actions, config) + q_tp0 = _build_q_network(self.obs_t, output_actions) # target q network evalution with tf.variable_scope(Q_TARGET_SCOPE) as scope: - q_tp1 = _build_q_network( - self.obs_tp1, output_actions_estimated, config) + q_tp1 = _build_q_network(self.obs_tp1, output_actions_estimated) target_q_func_vars = _scope_vars(scope.name) - q_t_selected = tf.squeeze(q_t, axis=len(q_t.shape) - 1) - - q_tp1_best = tf.squeeze( - input=q_tp1, axis=len(q_tp1.shape) - 1) - q_tp1_best_masked = (1.0 - self.done_mask) * q_tp1_best - - # compute RHS of bellman equation - q_t_selected_target = ( - self.rew_t + config["gamma"]**config["n_step"] * q_tp1_best_masked) - - # compute the error (potentially clipped) - self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) - if config.get("use_huber"): - errors = _huber_loss(self.td_error, config.get("huber_threshold")) - else: - errors = 0.5 * 
tf.square(self.td_error) - - self.loss = tf.reduce_mean(self.importance_weights * errors) - - # for policy gradient - self.actor_loss = -1.0 * tf.reduce_mean(q_tp0) + self.loss = ActorCriticLoss( + q_t, q_tp1, q_tp0, self.importance_weights, self.rew_t, + self.done_mask, config["gamma"], config["n_step"], + config["use_huber"], config["huber_threshold"]) if config["l2_reg"] is not None: for var in self.p_func_vars: if "bias" not in var.name: - self.actor_loss += ( + self.loss.actor_loss += ( config["l2_reg"] * 0.5 * tf.nn.l2_loss(var)) for var in self.q_func_vars: if "bias" not in var.name: - self.loss += config["l2_reg"] * 0.5 * tf.nn.l2_loss( - var) + self.loss.critic_loss += ( + config["l2_reg"] * 0.5 * tf.nn.l2_loss(var)) # update_target_fn will be called periodically to copy Q network to # target Q network @@ -235,7 +259,7 @@ class DDPGPolicyGraph(TFPolicyGraph): TFPolicyGraph.__init__( self, observation_space, action_space, self.sess, obs_input=self.cur_observations, - action_sampler=self.output_actions, loss=self.loss, + action_sampler=self.output_actions, loss=self.loss.total_loss, loss_inputs=self.loss_inputs, is_training=self.is_training) self.sess.run(tf.global_variables_initializer()) @@ -251,19 +275,19 @@ class DDPGPolicyGraph(TFPolicyGraph): if self.config["grad_norm_clipping"] is not None: actor_grads_and_vars = _minimize_and_clip( self.actor_optimizer, - self.actor_loss, + self.loss.actor_loss, var_list=self.p_func_vars, clip_val=self.config["grad_norm_clipping"]) critic_grads_and_vars = _minimize_and_clip( self.critic_optimizer, - self.loss, + self.loss.critic_loss, var_list=self.q_func_vars, clip_val=self.config["grad_norm_clipping"]) else: actor_grads_and_vars = self.actor_optimizer.compute_gradients( - self.actor_loss, var_list=self.p_func_vars) + self.loss.actor_loss, var_list=self.p_func_vars) critic_grads_and_vars = self.critic_optimizer.compute_gradients( - self.loss, var_list=self.q_func_vars) + self.loss.critic_loss, 
var_list=self.q_func_vars) actor_grads_and_vars = [ (g, v) for (g, v) in actor_grads_and_vars if g is not None] critic_grads_and_vars = [ @@ -279,7 +303,7 @@ class DDPGPolicyGraph(TFPolicyGraph): def extra_compute_grad_fetches(self): return { - "td_error": self.td_error, + "td_error": self.loss.td_error, } def postprocess_trajectory(self, sample_batch, other_agent_batches=None): @@ -288,7 +312,7 @@ class DDPGPolicyGraph(TFPolicyGraph): def compute_td_error(self, obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights): td_err = self.sess.run( - self.td_error, + self.loss.td_error, feed_dict={ self.obs_t: [np.array(ob) for ob in obs_t], self.act_t: act_t, diff --git a/python/ray/rllib/dqn/dqn_policy_graph.py b/python/ray/rllib/dqn/dqn_policy_graph.py index 5db7bc651..ecf6ac5dc 100644 --- a/python/ray/rllib/dqn/dqn_policy_graph.py +++ b/python/ray/rllib/dqn/dqn_policy_graph.py @@ -18,6 +18,224 @@ Q_SCOPE = "q_func" Q_TARGET_SCOPE = "target_q_func" +class QNetwork(object): + def __init__(self, model, num_actions, dueling=False, hiddens=[256]): + with tf.variable_scope("action_value"): + action_out = model.last_layer + for hidden in hiddens: + action_out = layers.fully_connected( + action_out, num_outputs=hidden, activation_fn=tf.nn.relu) + action_scores = layers.fully_connected( + action_out, num_outputs=num_actions, activation_fn=None) + + if dueling: + with tf.variable_scope("state_value"): + state_out = model.last_layer + for hidden in hiddens: + state_out = layers.fully_connected( + state_out, num_outputs=hidden, + activation_fn=tf.nn.relu) + state_score = layers.fully_connected( + state_out, num_outputs=1, activation_fn=None) + action_scores_mean = tf.reduce_mean(action_scores, 1) + action_scores_centered = action_scores - tf.expand_dims( + action_scores_mean, 1) + self.value = state_score + action_scores_centered + else: + self.value = action_scores + + +class QValuePolicy(object): + def __init__(self, q_values, observations, num_actions, stochastic, eps): + 
deterministic_actions = tf.argmax(q_values, axis=1) + batch_size = tf.shape(observations)[0] + random_actions = tf.random_uniform( + tf.stack([batch_size]), minval=0, maxval=num_actions, + dtype=tf.int64) + chose_random = tf.random_uniform( + tf.stack([batch_size]), minval=0, maxval=1, dtype=tf.float32) < eps + stochastic_actions = tf.where( + chose_random, random_actions, deterministic_actions) + self.action = tf.cond( + stochastic, lambda: stochastic_actions, + lambda: deterministic_actions) + + +class QLoss(object): + def __init__( + self, q_t_selected, q_tp1_best, importance_weights, rewards, + done_mask, gamma=0.99, n_step=1): + + q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best + + # compute RHS of bellman equation + q_t_selected_target = rewards + gamma ** n_step * q_tp1_best_masked + + # compute the error (potentially clipped) + self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) + self.loss = tf.reduce_mean( + importance_weights * _huber_loss(self.td_error)) + + +class DQNPolicyGraph(TFPolicyGraph): + def __init__(self, observation_space, action_space, config): + config = dict(ray.rllib.dqn.dqn.DEFAULT_CONFIG, **config) + if not isinstance(action_space, Discrete): + raise UnsupportedSpaceException( + "Action space {} is not supported for DQN.".format( + action_space)) + + self.config = config + self.cur_epsilon = 1.0 + num_actions = action_space.n + + def _build_q_network(obs): + return QNetwork( + ModelCatalog.get_model(obs, 1, config["model"]), + num_actions, config["dueling"], config["hiddens"]).value + + # Action inputs + self.stochastic = tf.placeholder(tf.bool, (), name="stochastic") + self.eps = tf.placeholder(tf.float32, (), name="eps") + self.cur_observations = tf.placeholder( + tf.float32, shape=(None,) + observation_space.shape) + + # Action Q network + with tf.variable_scope(Q_SCOPE) as scope: + q_values = _build_q_network(self.cur_observations) + self.q_func_vars = _scope_vars(scope.name) + + # Action outputs + 
self.output_actions = QValuePolicy( + q_values, + self.cur_observations, + num_actions, + self.stochastic, + self.eps).action + + # Replay inputs + self.obs_t = tf.placeholder( + tf.float32, shape=(None,) + observation_space.shape) + self.act_t = tf.placeholder(tf.int32, [None], name="action") + self.rew_t = tf.placeholder(tf.float32, [None], name="reward") + self.obs_tp1 = tf.placeholder( + tf.float32, shape=(None,) + observation_space.shape) + self.done_mask = tf.placeholder(tf.float32, [None], name="done") + self.importance_weights = tf.placeholder( + tf.float32, [None], name="weight") + + # q network evaluation + with tf.variable_scope(Q_SCOPE, reuse=True): + q_t = _build_q_network(self.obs_t) + + # target q network evaluation + with tf.variable_scope(Q_TARGET_SCOPE) as scope: + q_tp1 = _build_q_network(self.obs_tp1) + self.target_q_func_vars = _scope_vars(scope.name) + + # q scores for actions which we know were selected in the given state. + q_t_selected = tf.reduce_sum( + q_t * tf.one_hot(self.act_t, num_actions), 1) + + # compute estimate of best possible value starting from state at t + 1 + if config["double_q"]: + with tf.variable_scope(Q_SCOPE, reuse=True): + q_tp1_using_online_net = _build_q_network(self.obs_tp1) + q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1) + q_tp1_best = tf.reduce_sum( + q_tp1 * tf.one_hot( + q_tp1_best_using_online_net, num_actions), 1) + else: + q_tp1_best = tf.reduce_max(q_tp1, 1) + + self.loss = QLoss( + q_t_selected, q_tp1_best, self.importance_weights, + self.rew_t, self.done_mask, config["gamma"], config["n_step"]) + + # update_target_fn will be called periodically to copy Q network to + # target Q network + update_target_expr = [] + for var, var_target in zip( + sorted(self.q_func_vars, key=lambda v: v.name), + sorted(self.target_q_func_vars, key=lambda v: v.name)): + update_target_expr.append(var_target.assign(var)) + self.update_target_expr = tf.group(*update_target_expr) + + # initialize TFPolicyGraph 
+ self.sess = tf.get_default_session() + self.loss_inputs = [ + ("obs", self.obs_t), + ("actions", self.act_t), + ("rewards", self.rew_t), + ("new_obs", self.obs_tp1), + ("dones", self.done_mask), + ("weights", self.importance_weights), + ] + self.is_training = tf.placeholder_with_default(True, ()) + TFPolicyGraph.__init__( + self, observation_space, action_space, self.sess, + obs_input=self.cur_observations, + action_sampler=self.output_actions, loss=self.loss.loss, + loss_inputs=self.loss_inputs, is_training=self.is_training) + self.sess.run(tf.global_variables_initializer()) + + def optimizer(self): + return tf.train.AdamOptimizer(learning_rate=self.config["lr"]) + + def gradients(self, optimizer): + if self.config["grad_norm_clipping"] is not None: + grads_and_vars = _minimize_and_clip( + optimizer, self.loss.loss, var_list=self.q_func_vars, + clip_val=self.config["grad_norm_clipping"]) + else: + grads_and_vars = optimizer.compute_gradients( + self.loss.loss, var_list=self.q_func_vars) + grads_and_vars = [ + (g, v) for (g, v) in grads_and_vars if g is not None] + return grads_and_vars + + def extra_compute_action_feed_dict(self): + return { + self.stochastic: True, + self.eps: self.cur_epsilon, + } + + def extra_compute_grad_fetches(self): + return { + "td_error": self.loss.td_error, + } + + def postprocess_trajectory(self, sample_batch, other_agent_batches=None): + return _postprocess_dqn(self, sample_batch) + + def compute_td_error( + self, obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights): + td_err = self.sess.run( + self.loss.td_error, + feed_dict={ + self.obs_t: [np.array(ob) for ob in obs_t], + self.act_t: act_t, + self.rew_t: rew_t, + self.obs_tp1: [np.array(ob) for ob in obs_tp1], + self.done_mask: done_mask, + self.importance_weights: importance_weights + }) + return td_err + + def update_target(self): + return self.sess.run(self.update_target_expr) + + def set_epsilon(self, epsilon): + self.cur_epsilon = epsilon + + def get_state(self): + 
return [TFPolicyGraph.get_state(self), self.cur_epsilon] + + def set_state(self, state): + TFPolicyGraph.set_state(self, state[0]) + self.set_epsilon(state[1]) + + def adjust_nstep(n_step, gamma, obs, actions, rewards, new_obs, dones): """Rewrites the given trajectory fragments to encode n-step rewards. @@ -46,169 +264,6 @@ def adjust_nstep(n_step, gamma, obs, actions, rewards, new_obs, dones): del arr[new_len:] -class DQNPolicyGraph(TFPolicyGraph): - def __init__(self, observation_space, action_space, config): - config = dict(ray.rllib.dqn.dqn.DEFAULT_CONFIG, **config) - if not isinstance(action_space, Discrete): - raise UnsupportedSpaceException( - "Action space {} is not supported for DQN.".format( - action_space)) - - self.config = config - self.cur_epsilon = 1.0 - num_actions = action_space.n - - # Action inputs - self.stochastic = tf.placeholder(tf.bool, (), name="stochastic") - self.eps = tf.placeholder(tf.float32, (), name="eps") - self.cur_observations = tf.placeholder( - tf.float32, shape=(None,) + observation_space.shape) - - # Action Q network - with tf.variable_scope(Q_SCOPE) as scope: - q_values = _build_q_network( - self.cur_observations, num_actions, config) - self.q_func_vars = _scope_vars(scope.name) - - # Action outputs - self.output_actions = _build_action_network( - q_values, - self.cur_observations, - num_actions, - self.stochastic, - self.eps) - - # Replay inputs - self.obs_t = tf.placeholder( - tf.float32, shape=(None,) + observation_space.shape) - self.act_t = tf.placeholder(tf.int32, [None], name="action") - self.rew_t = tf.placeholder(tf.float32, [None], name="reward") - self.obs_tp1 = tf.placeholder( - tf.float32, shape=(None,) + observation_space.shape) - self.done_mask = tf.placeholder(tf.float32, [None], name="done") - self.importance_weights = tf.placeholder( - tf.float32, [None], name="weight") - - # q network evaluation - with tf.variable_scope(Q_SCOPE, reuse=True): - q_t = _build_q_network(self.obs_t, num_actions, config) - - # 
target q network evalution - with tf.variable_scope(Q_TARGET_SCOPE) as scope: - q_tp1 = _build_q_network(self.obs_tp1, num_actions, config) - self.target_q_func_vars = _scope_vars(scope.name) - - # q scores for actions which we know were selected in the given state. - q_t_selected = tf.reduce_sum( - q_t * tf.one_hot(self.act_t, num_actions), 1) - - # compute estimate of best possible value starting from state at t + 1 - if config["double_q"]: - with tf.variable_scope(Q_SCOPE, reuse=True): - q_tp1_using_online_net = _build_q_network( - self.obs_tp1, num_actions, config) - q_tp1_best_using_online_net = tf.argmax(q_tp1_using_online_net, 1) - q_tp1_best = tf.reduce_sum( - q_tp1 * tf.one_hot( - q_tp1_best_using_online_net, num_actions), 1) - else: - q_tp1_best = tf.reduce_max(q_tp1, 1) - q_tp1_best_masked = (1.0 - self.done_mask) * q_tp1_best - - # compute RHS of bellman equation - q_t_selected_target = ( - self.rew_t + - config["gamma"] ** config["n_step"] * q_tp1_best_masked) - - # compute the error (potentially clipped) - self.td_error = q_t_selected - tf.stop_gradient(q_t_selected_target) - self.loss = tf.reduce_mean( - self.importance_weights * _huber_loss(self.td_error)) - - # update_target_fn will be called periodically to copy Q network to - # target Q network - update_target_expr = [] - for var, var_target in zip( - sorted(self.q_func_vars, key=lambda v: v.name), - sorted(self.target_q_func_vars, key=lambda v: v.name)): - update_target_expr.append(var_target.assign(var)) - self.update_target_expr = tf.group(*update_target_expr) - - # initialize TFPolicyGraph - self.sess = tf.get_default_session() - self.loss_inputs = [ - ("obs", self.obs_t), - ("actions", self.act_t), - ("rewards", self.rew_t), - ("new_obs", self.obs_tp1), - ("dones", self.done_mask), - ("weights", self.importance_weights), - ] - self.is_training = tf.placeholder_with_default(True, ()) - TFPolicyGraph.__init__( - self, observation_space, action_space, self.sess, - 
obs_input=self.cur_observations, - action_sampler=self.output_actions, loss=self.loss, - loss_inputs=self.loss_inputs, is_training=self.is_training) - self.sess.run(tf.global_variables_initializer()) - - def optimizer(self): - return tf.train.AdamOptimizer(learning_rate=self.config["lr"]) - - def gradients(self, optimizer): - if self.config["grad_norm_clipping"] is not None: - grads_and_vars = _minimize_and_clip( - optimizer, self.loss, var_list=self.q_func_vars, - clip_val=self.config["grad_norm_clipping"]) - else: - grads_and_vars = optimizer.compute_gradients( - self.loss, var_list=self.q_func_vars) - grads_and_vars = [ - (g, v) for (g, v) in grads_and_vars if g is not None] - return grads_and_vars - - def extra_compute_action_feed_dict(self): - return { - self.stochastic: True, - self.eps: self.cur_epsilon, - } - - def extra_compute_grad_fetches(self): - return { - "td_error": self.td_error, - } - - def postprocess_trajectory(self, sample_batch, other_agent_batches=None): - return _postprocess_dqn(self, sample_batch) - - def compute_td_error( - self, obs_t, act_t, rew_t, obs_tp1, done_mask, importance_weights): - td_err = self.sess.run( - self.td_error, - feed_dict={ - self.obs_t: [np.array(ob) for ob in obs_t], - self.act_t: act_t, - self.rew_t: rew_t, - self.obs_tp1: [np.array(ob) for ob in obs_tp1], - self.done_mask: done_mask, - self.importance_weights: importance_weights - }) - return td_err - - def update_target(self): - return self.sess.run(self.update_target_expr) - - def set_epsilon(self, epsilon): - self.cur_epsilon = epsilon - - def get_state(self): - return [TFPolicyGraph.get_state(self), self.cur_epsilon] - - def set_state(self, state): - TFPolicyGraph.set_state(self, state[0]) - self.set_epsilon(state[1]) - - def _postprocess_dqn(policy_graph, sample_batch): obs, actions, rewards, new_obs, dones = [ list(x) for x in sample_batch.columns( @@ -237,51 +292,6 @@ def _postprocess_dqn(policy_graph, sample_batch): return batch -def 
_build_q_network(inputs, num_actions, config): - dueling = config["dueling"] - hiddens = config["hiddens"] - frontend = ModelCatalog.get_model(inputs, 1, config["model"]) - frontend_out = frontend.last_layer - - with tf.variable_scope("action_value"): - action_out = frontend_out - for hidden in hiddens: - action_out = layers.fully_connected( - action_out, num_outputs=hidden, activation_fn=tf.nn.relu) - action_scores = layers.fully_connected( - action_out, num_outputs=num_actions, activation_fn=None) - - if dueling: - with tf.variable_scope("state_value"): - state_out = frontend_out - for hidden in hiddens: - state_out = layers.fully_connected( - state_out, num_outputs=hidden, activation_fn=tf.nn.relu) - state_score = layers.fully_connected( - state_out, num_outputs=1, activation_fn=None) - action_scores_mean = tf.reduce_mean(action_scores, 1) - action_scores_centered = action_scores - tf.expand_dims( - action_scores_mean, 1) - return state_score + action_scores_centered - else: - return action_scores - - -def _build_action_network( - q_values, observations, num_actions, stochastic, eps): - deterministic_actions = tf.argmax(q_values, axis=1) - batch_size = tf.shape(observations)[0] - random_actions = tf.random_uniform( - tf.stack([batch_size]), minval=0, maxval=num_actions, dtype=tf.int64) - chose_random = tf.random_uniform( - tf.stack([batch_size]), minval=0, maxval=1, dtype=tf.float32) < eps - stochastic_actions = tf.where( - chose_random, random_actions, deterministic_actions) - return tf.cond( - stochastic, lambda: stochastic_actions, - lambda: deterministic_actions) - - def _huber_loss(x, delta=1.0): """Reference: https://en.wikipedia.org/wiki/Huber_loss""" return tf.where( diff --git a/python/ray/rllib/models/__init__.py b/python/ray/rllib/models/__init__.py index af3ac81dc..d381985dc 100644 --- a/python/ray/rllib/models/__init__.py +++ b/python/ray/rllib/models/__init__.py @@ -3,12 +3,10 @@ from ray.rllib.models.action_dist import (ActionDistribution, 
Categorical, DiagGaussian, Deterministic) from ray.rllib.models.model import Model from ray.rllib.models.fcnet import FullyConnectedNetwork -from ray.rllib.models.convnet import ConvolutionalNetwork from ray.rllib.models.lstm import LSTM from ray.rllib.models.multiagentfcnet import MultiAgentFullyConnectedNetwork __all__ = ["ActionDistribution", "ActionDistribution", "Categorical", "DiagGaussian", "Deterministic", "ModelCatalog", "Model", - "FullyConnectedNetwork", "ConvolutionalNetwork", "LSTM", - "MultiAgentFullyConnectedNetwork"] + "FullyConnectedNetwork", "LSTM", "MultiAgentFullyConnectedNetwork"] diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index 8579d5e89..7d67e591e 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -16,6 +16,7 @@ from ray.rllib.models.action_dist import ( from ray.rllib.models.preprocessors import get_preprocessor from ray.rllib.models.fcnet import FullyConnectedNetwork from ray.rllib.models.visionnet import VisionNetwork +from ray.rllib.models.lstm import LSTM from ray.rllib.models.multiagentfcnet import MultiAgentFullyConnectedNetwork @@ -31,6 +32,7 @@ MODEL_CONFIGS = [ "free_log_std", # Documented in ray.rllib.models.Model "channel_major", # Pytorch conv requires images to be channel-major "squash_to_range", # Whether to squash the action output to space range + "use_lstm", # Whether to use a LSTM model # === Options for custom models === "custom_preprocessor", # Name of a custom preprocessor to use @@ -148,6 +150,9 @@ class ModelCatalog(object): return _global_registry.get(RLLIB_MODEL, model)( inputs, num_outputs, options) + if options.get("use_lstm"): + return LSTM(inputs, num_outputs, options) + obs_rank = len(inputs.shape) - 1 # num_outputs > 1 used to avoid hitting this with the value function diff --git a/python/ray/rllib/models/convnet.py b/python/ray/rllib/models/convnet.py deleted file mode 100644 index 4074e0ad3..000000000 --- 
a/python/ray/rllib/models/convnet.py +++ /dev/null @@ -1,23 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf - -from ray.rllib.models.model import Model -from ray.rllib.models.misc import normc_initializer, conv2d, linear - - -class ConvolutionalNetwork(Model): - """Generic convolutional network.""" - # TODO(rliaw): converge on one generic ConvNet model - def _init(self, inputs, num_outputs, options): - x = inputs - with tf.name_scope("convnet"): - for i in range(4): - x = tf.nn.elu(conv2d(x, 32, "l{}".format(i+1), [3, 3], [2, 2])) - r, c = x.shape[1].value, x.shape[2].value - x = tf.reshape(x, [-1, r*c*32]) - fc1 = linear(x, 256, "fc1") - fc2 = linear(x, num_outputs, "fc2", normc_initializer(0.01)) - return fc2, fc1 diff --git a/python/ray/rllib/models/ddpgnet.py b/python/ray/rllib/models/ddpgnet.py deleted file mode 100644 index b881e6013..000000000 --- a/python/ray/rllib/models/ddpgnet.py +++ /dev/null @@ -1,49 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf -import tensorflow.contrib.slim as slim - -from ray.rllib.models.model import Model - - -class DDPGActor(Model): - """Actor network for DDPG.""" - - def _init(self, inputs, num_outputs, options): - w_normal = tf.truncated_normal_initializer() - w_init = tf.random_uniform_initializer(minval=-0.003, maxval=0.003) - ac_bound = options["action_bound"] - - net = slim.fully_connected( - inputs, 400, activation_fn=tf.nn.relu, - weights_initializer=w_normal) - net = slim.fully_connected( - net, 300, activation_fn=tf.nn.relu, weights_initializer=w_normal) - out = slim.fully_connected( - net, num_outputs, activation_fn=tf.nn.tanh, - weights_initializer=w_init) - scaled_out = tf.multiply(out, ac_bound) - return scaled_out, net - - -class DDPGCritic(Model): - """Critic network for DDPG.""" - - def _init(self, inputs, 
num_outputs, options): - obs, action = inputs - w_normal = tf.truncated_normal_initializer() - w_init = tf.random_uniform_initializer(minval=-0.0003, maxval=0.0003) - net = slim.fully_connected( - obs, 400, activation_fn=tf.nn.relu, weights_initializer=w_normal) - t1 = slim.fully_connected( - net, 300, activation_fn=None, biases_initializer=None, - weights_initializer=w_normal) - t2 = slim.fully_connected( - action, 300, activation_fn=None, weights_initializer=w_normal) - net = tf.nn.relu(tf.add(t1, t2)) - - out = slim.fully_connected( - net, 1, activation_fn=None, weights_initializer=w_init) - return out, net diff --git a/python/ray/rllib/models/model.py b/python/ray/rllib/models/model.py index b1c5145d8..f9081751c 100644 --- a/python/ray/rllib/models/model.py +++ b/python/ray/rllib/models/model.py @@ -27,9 +27,15 @@ class Model(object): inputs (Tensor): The input placeholder for this model. outputs (Tensor): The output vector of this model. last_layer (Tensor): The network layer right before the model output. + state_init (list): List of initial recurrent state tensors (if any). + state_in (list): List of input recurrent state tensors (if any). + state_out (list): List of output recurrent state tensors (if any). 
""" def __init__(self, inputs, num_outputs, options): + self.state_init = [] + self.state_in = [] + self.state_out = [] self.inputs = inputs if options.get("free_log_std", False): assert num_outputs % 2 == 0 diff --git a/python/ray/rllib/models/pytorch/misc.py b/python/ray/rllib/models/pytorch/misc.py index cd54fc04b..7a790e43e 100644 --- a/python/ray/rllib/models/pytorch/misc.py +++ b/python/ray/rllib/models/pytorch/misc.py @@ -7,18 +7,8 @@ import numpy as np import torch -def convert_batch(trajectory): - """Convert trajectory from numpy to PT variable""" - states = torch.from_numpy(trajectory["obs"]).float() - acs = torch.from_numpy(trajectory["actions"]) - advs = torch.from_numpy( - trajectory["advantages"].copy()).float().reshape(-1) - rs = torch.from_numpy(trajectory["rewards"]).float().reshape(-1) - return states, acs, advs, rs - - def var_to_np(var): - return var.detach().numpy() + return var.cpu().detach().numpy() def normc_initializer(std=1.0): diff --git a/python/ray/rllib/optimizers/sample_batch.py b/python/ray/rllib/optimizers/sample_batch.py index 620eced0f..14584b41f 100644 --- a/python/ray/rllib/optimizers/sample_batch.py +++ b/python/ray/rllib/optimizers/sample_batch.py @@ -5,11 +5,17 @@ from __future__ import print_function import collections import numpy as np - # Defaults policy id for single agent environments DEFAULT_POLICY_ID = "default" +def to_float_array(v): + arr = np.array(v) + if arr.dtype == np.float64: + return arr.astype(np.float32) # save some memory + return arr + + class SampleBatchBuilder(object): """Util to build a SampleBatch incrementally. 
@@ -38,7 +44,8 @@ class SampleBatchBuilder(object): def build_and_reset(self): """Returns a sample batch including all previously added values.""" - batch = SampleBatch({k: np.array(v) for k, v in self.buffers.items()}) + batch = SampleBatch( + {k: to_float_array(v) for k, v in self.buffers.items()}) self.buffers.clear() self.count = 0 return batch diff --git a/python/ray/rllib/pg/pg_policy_graph.py b/python/ray/rllib/pg/pg_policy_graph.py index b2c3e1f8f..6a095ee85 100644 --- a/python/ray/rllib/pg/pg_policy_graph.py +++ b/python/ray/rllib/pg/pg_policy_graph.py @@ -6,42 +6,46 @@ import tensorflow as tf import ray from ray.rllib.models.catalog import ModelCatalog -from ray.rllib.utils.process_rollout import compute_advantages +from ray.rllib.utils.postprocessing import compute_advantages from ray.rllib.utils.tf_policy_graph import TFPolicyGraph -class PGPolicyGraph(TFPolicyGraph): +class PGLoss(object): + def __init__(self, action_dist, actions, advantages): + self.loss = -tf.reduce_mean(action_dist.logp(actions) * advantages) + +class PGPolicyGraph(TFPolicyGraph): def __init__(self, obs_space, action_space, config): config = dict(ray.rllib.pg.pg.DEFAULT_CONFIG, **config) self.config = config - # setup policy - self.x = tf.placeholder(tf.float32, shape=[None]+list(obs_space.shape)) + # Setup policy + obs = tf.placeholder(tf.float32, shape=[None]+list(obs_space.shape)) dist_class, self.logit_dim = ModelCatalog.get_action_dist( action_space, self.config["model"]) - self.model = ModelCatalog.get_model( - self.x, self.logit_dim, options=self.config["model"]) - self.dist = dist_class(self.model.outputs) # logit for each action + model = ModelCatalog.get_model( + obs, self.logit_dim, options=self.config["model"]) + action_dist = dist_class(model.outputs) # logit for each action - # setup policy loss - self.ac = ModelCatalog.get_action_placeholder(action_space) - self.adv = tf.placeholder(tf.float32, [None], name="adv") - self.loss = -tf.reduce_mean(self.dist.logp(self.ac) 
* self.adv) + # Setup policy loss + actions = ModelCatalog.get_action_placeholder(action_space) + advantages = tf.placeholder(tf.float32, [None], name="adv") + loss = PGLoss(action_dist, actions, advantages).loss - # initialize TFPolicyGraph - self.sess = tf.get_default_session() - self.loss_in = [ - ("obs", self.x), - ("actions", self.ac), - ("advantages", self.adv), + # Initialize TFPolicyGraph + sess = tf.get_default_session() + loss_in = [ + ("obs", obs), + ("actions", actions), + ("advantages", advantages), ] self.is_training = tf.placeholder_with_default(True, ()) TFPolicyGraph.__init__( - self, obs_space, action_space, self.sess, obs_input=self.x, - action_sampler=self.dist.sample(), loss=self.loss, - loss_inputs=self.loss_in, is_training=self.is_training) - self.sess.run(tf.global_variables_initializer()) + self, obs_space, action_space, sess, obs_input=obs, + action_sampler=action_dist.sample(), loss=loss, + loss_inputs=loss_in, is_training=self.is_training) + sess.run(tf.global_variables_initializer()) def postprocess_trajectory(self, sample_batch, other_agent_batches=None): return compute_advantages( diff --git a/python/ray/rllib/ppo/ppo_evaluator.py b/python/ray/rllib/ppo/ppo_evaluator.py index 2cac85cd5..46d2ae223 100644 --- a/python/ray/rllib/ppo/ppo_evaluator.py +++ b/python/ray/rllib/ppo/ppo_evaluator.py @@ -11,7 +11,7 @@ from ray.rllib.optimizers import SampleBatch, TFMultiGPUSupport from ray.rllib.models import ModelCatalog from ray.rllib.utils.sampler import SyncSampler from ray.rllib.utils.filter import get_filter, MeanStdFilter -from ray.rllib.utils.process_rollout import compute_advantages +from ray.rllib.utils.postprocessing import compute_advantages from ray.rllib.ppo.loss import ProximalPolicyGraph diff --git a/python/ray/rllib/test/test_checkpoint_restore.py b/python/ray/rllib/test/test_checkpoint_restore.py index 68eeb27ea..fe954423e 100644 --- a/python/ray/rllib/test/test_checkpoint_restore.py +++ 
b/python/ray/rllib/test/test_checkpoint_restore.py @@ -25,7 +25,7 @@ CONFIGS = { "DQN": {}, "DDPG": {"noise_scale": 0.0, "timesteps_per_iteration": 100}, "PPO": {"num_sgd_iter": 5, "timesteps_per_batch": 1000, "num_workers": 2}, - "A3C": {"use_lstm": False, "num_workers": 1}, + "A3C": {"num_workers": 1}, } diff --git a/python/ray/rllib/test/test_common_policy_evaluator.py b/python/ray/rllib/test/test_common_policy_evaluator.py index c256f2780..1f6b77956 100644 --- a/python/ray/rllib/test/test_common_policy_evaluator.py +++ b/python/ray/rllib/test/test_common_policy_evaluator.py @@ -11,7 +11,7 @@ from ray.rllib.pg import PGAgent from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator, \ collect_metrics from ray.rllib.utils.policy_graph import PolicyGraph -from ray.rllib.utils.process_rollout import compute_advantages +from ray.rllib.utils.postprocessing import compute_advantages from ray.rllib.utils.vector_env import VectorEnv from ray.tune.registry import register_env diff --git a/python/ray/rllib/tuned_examples/halfcheetah-ddpg.yaml b/python/ray/rllib/tuned_examples/halfcheetah-ddpg.yaml index e7ed26e92..08e0c05af 100644 --- a/python/ray/rllib/tuned_examples/halfcheetah-ddpg.yaml +++ b/python/ray/rllib/tuned_examples/halfcheetah-ddpg.yaml @@ -42,21 +42,6 @@ halfcheetah-ddpg: learning_starts: 500 sample_batch_size: 1 train_batch_size: 64 - smoothing_num_episodes: 10 - - # === Tensorflow === - tf_session_args: { - "device_count": { - "CPU": 2 - }, - "log_device_placement": False, - "allow_soft_placement": True, - "gpu_options": { - "allow_growth": True - }, - "inter_op_parallelism_threads": 1, - "intra_op_parallelism_threads": 1, - } # === Parallelism === num_workers: 0 diff --git a/python/ray/rllib/tuned_examples/mountaincarcontinuous-ddpg.yaml b/python/ray/rllib/tuned_examples/mountaincarcontinuous-ddpg.yaml index f3363032c..34d588a91 100644 --- a/python/ray/rllib/tuned_examples/mountaincarcontinuous-ddpg.yaml +++ 
b/python/ray/rllib/tuned_examples/mountaincarcontinuous-ddpg.yaml @@ -42,21 +42,6 @@ mountaincarcontinuous-ddpg: learning_starts: 1000 sample_batch_size: 1 train_batch_size: 64 - smoothing_num_episodes: 10 - - # === Tensorflow === - tf_session_args: { - "device_count": { - "CPU": 2 - }, - "log_device_placement": False, - "allow_soft_placement": True, - "gpu_options": { - "allow_growth": True - }, - "inter_op_parallelism_threads": 1, - "intra_op_parallelism_threads": 1, - } # === Parallelism === num_workers: 0 diff --git a/python/ray/rllib/tuned_examples/pendulum-ddpg.yaml b/python/ray/rllib/tuned_examples/pendulum-ddpg.yaml index 1c377ca84..01efc1466 100644 --- a/python/ray/rllib/tuned_examples/pendulum-ddpg.yaml +++ b/python/ray/rllib/tuned_examples/pendulum-ddpg.yaml @@ -42,21 +42,6 @@ pendulum-ddpg: learning_starts: 500 sample_batch_size: 1 train_batch_size: 64 - smoothing_num_episodes: 10 - - # === Tensorflow === - tf_session_args: { - "device_count": { - "CPU": 2 - }, - "log_device_placement": False, - "allow_soft_placement": True, - "gpu_options": { - "allow_growth": True - }, - "inter_op_parallelism_threads": 1, - "intra_op_parallelism_threads": 1, - } # === Parallelism === num_workers: 0 diff --git a/python/ray/rllib/tuned_examples/pong-a3c-pytorch.yaml b/python/ray/rllib/tuned_examples/pong-a3c-pytorch.yaml index c39516ab6..4d85eb688 100644 --- a/python/ray/rllib/tuned_examples/pong-a3c-pytorch.yaml +++ b/python/ray/rllib/tuned_examples/pong-a3c-pytorch.yaml @@ -4,7 +4,6 @@ pong-a3c-pytorch-cnn: config: num_workers: 16 batch_size: 20 - use_lstm: false use_pytorch: true vf_loss_coeff: 0.5 entropy_coeff: -0.01 @@ -15,6 +14,7 @@ pong-a3c-pytorch-cnn: observation_filter: NoFilter reward_filter: NoFilter model: + use_lstm: false channel_major: true dim: 80 grayscale: true diff --git a/python/ray/rllib/tuned_examples/pong-a3c.yaml b/python/ray/rllib/tuned_examples/pong-a3c.yaml index 029ec97f2..400401b6d 100644 --- a/python/ray/rllib/tuned_examples/pong-a3c.yaml 
+++ b/python/ray/rllib/tuned_examples/pong-a3c.yaml @@ -2,9 +2,8 @@ pong-a3c: env: PongDeterministic-v4 run: A3C config: - num_workers: 16 + num_workers: 1 batch_size: 20 - use_lstm: true use_pytorch: false vf_loss_coeff: 0.5 entropy_coeff: -0.01 @@ -15,6 +14,7 @@ pong-a3c: observation_filter: NoFilter reward_filter: NoFilter model: + use_lstm: true channel_major: false dim: 42 grayscale: true diff --git a/python/ray/rllib/utils/policy_graph.py b/python/ray/rllib/utils/policy_graph.py index d7c526401..e156bdde2 100644 --- a/python/ray/rllib/utils/policy_graph.py +++ b/python/ray/rllib/utils/policy_graph.py @@ -24,9 +24,13 @@ class PolicyGraph(object): def __init__(self, observation_space, action_space, config): """Initialize the graph. + This is the standard constructor for policy graphs. The policy graph + class you pass into CommonPolicyEvaluator will be constructed with + these arguments. + Args: - observation_space (gym.Space): Observation space of the env. - action_space (gym.Space): Action space of the env. + observation_space (gym.Space): Observation space of the policy. + action_space (gym.Space): Action space of the policy. config (dict): Policy-specific configuration data. """ diff --git a/python/ray/rllib/utils/process_rollout.py b/python/ray/rllib/utils/postprocessing.py similarity index 88% rename from python/ray/rllib/utils/process_rollout.py rename to python/ray/rllib/utils/postprocessing.py index ed7088bb1..1e2f2ebef 100644 --- a/python/ray/rllib/utils/process_rollout.py +++ b/python/ray/rllib/utils/postprocessing.py @@ -23,7 +23,8 @@ def compute_advantages(rollout, last_r, gamma, lambda_=1.0, use_gae=True): Returns: SampleBatch (SampleBatch): Object with experience from rollout and - processed rewards.""" + processed rewards. 
+ """ traj = {} trajsize = len(rollout["actions"]) @@ -37,13 +38,14 @@ def compute_advantages(rollout, last_r, gamma, lambda_=1.0, use_gae=True): # This formula for the advantage comes # "Generalized Advantage Estimation": https://arxiv.org/abs/1506.02438 traj["advantages"] = discount(delta_t, gamma * lambda_) - traj["value_targets"] = traj["advantages"] + traj["vf_preds"] + traj["value_targets"] = ( + traj["advantages"] + traj["vf_preds"]).copy().astype(np.float32) else: rewards_plus_v = np.concatenate( [rollout["rewards"], np.array([last_r])]) traj["advantages"] = discount(rewards_plus_v, gamma)[:-1] - traj["advantages"] = traj["advantages"].copy() + traj["advantages"] = traj["advantages"].copy().astype(np.float32) assert all(val.shape[0] == trajsize for val in traj.values()), \ "Rollout stacked incorrectly!" diff --git a/python/ray/rllib/utils/sampler.py b/python/ray/rllib/utils/sampler.py index ca6f4dda0..1d8509179 100644 --- a/python/ray/rllib/utils/sampler.py +++ b/python/ray/rllib/utils/sampler.py @@ -219,7 +219,7 @@ def _env_runner( else: all_done = False # At least send an empty dict if not done - actions_to_send[env_id] + actions_to_send[env_id] = {} # For each agent in the environment for agent_id, raw_obs in agent_obs.items(): diff --git a/python/ray/rllib/utils/tf_policy_graph.py b/python/ray/rllib/utils/tf_policy_graph.py index 74cf1345b..f881478e3 100644 --- a/python/ray/rllib/utils/tf_policy_graph.py +++ b/python/ray/rllib/utils/tf_policy_graph.py @@ -18,6 +18,10 @@ class TFPolicyGraph(PolicyGraph): All input and output tensors are of shape [BATCH_DIM, ...]. + Attributes: + observation_space (gym.Space): observation space of the policy. + action_space (gym.Space): action space of the policy. 
+ Examples: >>> policy = TFPolicyGraphSubclass( sess, obs_input, action_sampler, loss, loss_inputs, is_training) @@ -33,7 +37,7 @@ class TFPolicyGraph(PolicyGraph): self, observation_space, action_space, sess, obs_input, action_sampler, loss, loss_inputs, is_training, state_inputs=None, state_outputs=None): - """Initialize the policy. + """Initialize the policy graph. Arguments: observation_space (gym.Space): Observation space of the env. @@ -71,7 +75,8 @@ class TFPolicyGraph(PolicyGraph): self._loss, self._sess) assert len(self._state_inputs) == len(self._state_outputs) == \ - len(self.get_initial_state()) + len(self.get_initial_state()), \ + (self._state_inputs, self._state_outputs, self.get_initial_state()) def build_compute_actions( self, builder, obs_batch, state_batches=None, is_training=False): diff --git a/python/ray/rllib/utils/torch_policy_graph.py b/python/ray/rllib/utils/torch_policy_graph.py new file mode 100644 index 000000000..96114cc5c --- /dev/null +++ b/python/ray/rllib/utils/torch_policy_graph.py @@ -0,0 +1,104 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +from threading import Lock + +import torch +import torch.nn.functional as F + +from ray.rllib.models.pytorch.misc import var_to_np +from ray.rllib.utils.policy_graph import PolicyGraph + + +class TorchPolicyGraph(PolicyGraph): + """Template for a PyTorch policy and loss to use with RLlib. + + This is similar to TFPolicyGraph, but for PyTorch. + + Attributes: + observation_space (gym.Space): observation space of the policy. + action_space (gym.Space): action space of the policy. + lock (Lock): Lock that must be held around PyTorch ops on this graph. + This is necessary when using the async sampler. + """ + + def __init__( + self, observation_space, action_space, model, loss, loss_inputs): + """Build a policy graph from policy and loss torch modules. + + Note that module inputs will be CPU tensors. 
The model and loss modules + are responsible for moving inputs to the right device. + + Arguments: + observation_space (gym.Space): observation space of the policy. + action_space (gym.Space): action space of the policy. + model (nn.Module): PyTorch policy module. Given observations as + input, this module must return a list of outputs where the first + item is the action logits, and the remainder can be any value. + loss (nn.Module): Loss defined as a PyTorch module. The inputs for + this module are defined by the `loss_inputs` param. This module + returns a single scalar loss. + loss_inputs (list): List of SampleBatch columns that will be + passed to the loss module's forward() function when computing + the loss. For example, ["obs", "action", "advantages"]. + """ + self.observation_space = observation_space + self.action_space = action_space + self.lock = Lock() + self._model = model + self._loss = loss + self._loss_inputs = loss_inputs + self._optimizer = self.optimizer() + + def extra_action_out(self, model_out): + """Returns dict of extra info to include in experience batch. 
+ + Arguments: + model_out (list): Outputs of the policy model module.""" + return {} + + def optimizer(self): + """Custom PyTorch optimizer to use.""" + return torch.optim.Adam(self._model.parameters()) + + def compute_actions( + self, obs_batch, state_batches=None, is_training=False): + if state_batches: + raise NotImplementedError("Torch RNN support") + with self.lock: + with torch.no_grad(): + ob = torch.from_numpy(np.array(obs_batch)).float() + model_out = self._model(ob) + logits = model_out[0] # assume the first output is the logits + actions = F.softmax(logits, dim=1).multinomial(1).squeeze(0) + return var_to_np(actions), [], self.extra_action_out(model_out) + + def compute_gradients(self, postprocessed_batch): + with self.lock: + loss_in = [] + for key in self._loss_inputs: + loss_in.append(torch.from_numpy(postprocessed_batch[key])) + loss_out = self._loss(*loss_in) + self._optimizer.zero_grad() + loss_out.backward() + # Note that return values are just references; + # calling zero_grad will modify the values + grads = [var_to_np(p.grad.data) for p in self._model.parameters()] + return grads, {} + + def apply_gradients(self, gradients): + with self.lock: + for g, p in zip(gradients, self._model.parameters()): + p.grad = torch.from_numpy(g) + self._optimizer.step() + return {} + + def get_weights(self): + with self.lock: + return self._model.state_dict() + + def set_weights(self, weights): + with self.lock: + self._model.load_state_dict(weights) diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 0a29af107..2324b3f60 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -11,51 +11,6 @@ ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) DOCKER_SHA=$($ROOT_DIR/../../build-docker.sh --output-sha --no-cache) echo "Using Docker image" $DOCKER_SHA -python $ROOT_DIR/multi_node_docker_test.py \ - --docker-image=$DOCKER_SHA \ - --num-nodes=5 \ - 
--num-redis-shards=10 \ - --test-script=/ray/test/jenkins_tests/multi_node_tests/test_0.py - -python $ROOT_DIR/multi_node_docker_test.py \ - --docker-image=$DOCKER_SHA \ - --num-nodes=5 \ - --num-redis-shards=5 \ - --num-gpus=0,1,2,3,4 \ - --num-drivers=7 \ - --driver-locations=0,1,0,1,2,3,4 \ - --test-script=/ray/test/jenkins_tests/multi_node_tests/remove_driver_test.py - -python $ROOT_DIR/multi_node_docker_test.py \ - --docker-image=$DOCKER_SHA \ - --num-nodes=5 \ - --num-redis-shards=2 \ - --num-gpus=0,0,5,6,50 \ - --num-drivers=100 \ - --test-script=/ray/test/jenkins_tests/multi_node_tests/many_drivers_test.py - -python $ROOT_DIR/multi_node_docker_test.py \ - --docker-image=$DOCKER_SHA \ - --num-nodes=1 \ - --mem-size=60G \ - --shm-size=60G \ - --test-script=/ray/test/jenkins_tests/multi_node_tests/large_memory_test.py - -# Test that the example applications run. - -# docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ -# python /ray/examples/lbfgs/driver.py - -# docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ -# python /ray/examples/rl_pong/driver.py \ -# --iterations=3 - -# docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ -# python /ray/examples/hyperopt/hyperopt_simple.py - -# docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ -# python /ray/examples/hyperopt/hyperopt_adaptive.py - docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env PongDeterministic-v0 \ @@ -96,7 +51,6 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --env CartPole-v0 \ --run A3C \ --stop '{"training_iteration": 2}' \ - --config '{"use_lstm": false}' docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ @@ -151,14 +105,14 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --env PongDeterministic-v4 \ --run A3C \ --stop '{"training_iteration": 2}' \ - --config '{"num_workers": 2, "use_lstm": false, "use_pytorch": true, "model": {"grayscale": 
true, "zero_mean": false, "dim": 80, "channel_major": true}}' + --config '{"num_workers": 2, "use_pytorch": true, "model": {"use_lstm": false, "grayscale": true, "zero_mean": false, "dim": 80, "channel_major": true}}' docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v1 \ --run A3C \ --stop '{"training_iteration": 2}' \ - --config '{"num_workers": 2, "use_lstm": false, "use_pytorch": true}' + --config '{"num_workers": 2, "use_pytorch": true}' docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ @@ -260,3 +214,33 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/examples/multiagent_cartpole.py + +python $ROOT_DIR/multi_node_docker_test.py \ + --docker-image=$DOCKER_SHA \ + --num-nodes=5 \ + --num-redis-shards=10 \ + --test-script=/ray/test/jenkins_tests/multi_node_tests/test_0.py + +python $ROOT_DIR/multi_node_docker_test.py \ + --docker-image=$DOCKER_SHA \ + --num-nodes=5 \ + --num-redis-shards=5 \ + --num-gpus=0,1,2,3,4 \ + --num-drivers=7 \ + --driver-locations=0,1,0,1,2,3,4 \ + --test-script=/ray/test/jenkins_tests/multi_node_tests/remove_driver_test.py + +python $ROOT_DIR/multi_node_docker_test.py \ + --docker-image=$DOCKER_SHA \ + --num-nodes=5 \ + --num-redis-shards=2 \ + --num-gpus=0,0,5,6,50 \ + --num-drivers=100 \ + --test-script=/ray/test/jenkins_tests/multi_node_tests/many_drivers_test.py + +python $ROOT_DIR/multi_node_docker_test.py \ + --docker-image=$DOCKER_SHA \ + --num-nodes=1 \ + --mem-size=60G \ + --shm-size=60G \ + --test-script=/ray/test/jenkins_tests/multi_node_tests/large_memory_test.py