diff --git a/doc/source/example-a3c.rst b/doc/source/example-a3c.rst index 5c3b0d465..e9f0d4510 100644 --- a/doc/source/example-a3c.rst +++ b/doc/source/example-a3c.rst @@ -98,7 +98,7 @@ We use a Ray Actor to simulate the environment. self.policy.set_weights(params) rollout = self.pull_batch_from_queue() batch = process_rollout(rollout, gamma=0.99, lambda_=1.0) - gradient = self.policy.get_gradients(batch) + gradient = self.policy.compute_gradients(batch) info = {"id": self.id, "size": len(batch.a)} return gradient, info @@ -138,7 +138,7 @@ global model parameters. The main training script looks like the following. obs += info["size"] # apply update, get the weights from the model, start a new task on the same actor object - policy.model_update(gradient) + policy.apply_gradients(gradient) parameters = policy.get_weights() gradient_list.extend([agents[info["id"]].compute_gradient(parameters)]) return policy diff --git a/python/ray/rllib/a3c/a3c.py b/python/ray/rllib/a3c/a3c.py index 1c39f3c4d..6bd6c5469 100644 --- a/python/ray/rllib/a3c/a3c.py +++ b/python/ray/rllib/a3c/a3c.py @@ -4,14 +4,12 @@ from __future__ import print_function import numpy as np import pickle -import tensorflow as tf -import six.moves.queue as queue import os import ray from ray.rllib.agent import Agent -from ray.rllib.a3c.runner import RunnerThread, process_rollout from ray.rllib.a3c.envs import create_and_wrap +from ray.rllib.a3c.runner import RemoteRunner from ray.rllib.a3c.shared_model import SharedModel from ray.rllib.a3c.shared_model_lstm import SharedModelLSTM from ray.tune.result import TrainingResult @@ -24,76 +22,11 @@ DEFAULT_CONFIG = { "use_lstm": True, "model": {"grayscale": True, "zero_mean": False, - "dim": 42} + "dim": 42, + "channel_major": True} } -@ray.remote -class Runner(object): - """Actor object to start running simulation on workers. - - The gradient computation is also executed from this object. 
- """ - def __init__(self, env_creator, policy_cls, actor_id, batch_size, - preprocess_config, logdir): - env = create_and_wrap(env_creator, preprocess_config) - self.id = actor_id - # TODO(rliaw): should change this to be just env.observation_space - self.policy = policy_cls(env.observation_space.shape, env.action_space) - self.runner = RunnerThread(env, self.policy, batch_size) - self.env = env - self.logdir = logdir - self.start() - - def pull_batch_from_queue(self): - """Take a rollout from the queue of the thread runner.""" - rollout = self.runner.queue.get(timeout=600.0) - if isinstance(rollout, BaseException): - raise rollout - while not rollout.terminal: - try: - part = self.runner.queue.get_nowait() - if isinstance(part, BaseException): - raise rollout - rollout.extend(part) - except queue.Empty: - break - return rollout - - def get_completed_rollout_metrics(self): - """Returns metrics on previously completed rollouts. - - Calling this clears the queue of completed rollout metrics. 
- """ - completed = [] - while True: - try: - completed.append(self.runner.metrics_queue.get_nowait()) - except queue.Empty: - break - return completed - - def start(self): - summary_writer = tf.summary.FileWriter( - os.path.join(self.logdir, "agent_%d" % self.id)) - self.summary_writer = summary_writer - self.runner.start_runner(self.policy.sess, summary_writer) - - def compute_gradient(self, params): - self.policy.set_weights(params) - rollout = self.pull_batch_from_queue() - batch = process_rollout(rollout, gamma=0.99, lambda_=1.0) - gradient, info = self.policy.get_gradients(batch) - if "summary" in info: - self.summary_writer.add_summary( - tf.Summary.FromString(info['summary']), - self.policy.local_steps) - self.summary_writer.flush() - info = {"id": self.id, - "size": len(batch.a)} - return gradient, info - - class A3CAgent(Agent): _agent_name = "A3C" _default_config = DEFAULT_CONFIG @@ -107,9 +40,9 @@ class A3CAgent(Agent): self.policy = policy_cls( self.env.observation_space.shape, self.env.action_space) self.agents = [ - Runner.remote(self.env_creator, policy_cls, i, - self.config["batch_size"], - self.config["model"], self.logdir) + RemoteRunner.remote(self.env_creator, policy_cls, i, + self.config["batch_size"], + self.config["model"], self.logdir) for i in range(self.config["num_workers"])] self.parameters = self.policy.get_weights() @@ -122,7 +55,7 @@ class A3CAgent(Agent): while gradient_list: done_id, gradient_list = ray.wait(gradient_list) gradient, info = ray.get(done_id)[0] - self.policy.model_update(gradient) + self.policy.apply_gradients(gradient) self.parameters = self.policy.get_weights() if batches_so_far < max_batches: batches_so_far += 1 @@ -168,5 +101,5 @@ class A3CAgent(Agent): self.policy.set_weights(self.parameters) def compute_action(self, observation): - actions = self.policy.compute_actions(observation) + actions = self.policy.compute_action(observation) return actions[0] diff --git a/python/ray/rllib/a3c/common.py 
b/python/ray/rllib/a3c/common.py new file mode 100644 index 000000000..17e6e7f9b --- /dev/null +++ b/python/ray/rllib/a3c/common.py @@ -0,0 +1,37 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import scipy.signal +from collections import namedtuple + + +def discount(x, gamma): + return scipy.signal.lfilter([1], [1, -gamma], x[::-1], axis=0)[::-1] + + +def process_rollout(rollout, gamma, lambda_=1.0): + """Given a rollout, compute its returns and the advantage.""" + batch_si = np.asarray(rollout.states) + batch_a = np.asarray(rollout.actions) + rewards = np.asarray(rollout.rewards) + vpred_t = np.asarray(rollout.values + [rollout.r]) + + rewards_plus_v = np.asarray(rollout.rewards + [rollout.r]) + batch_r = discount(rewards_plus_v, gamma)[:-1] + delta_t = rewards + gamma * vpred_t[1:] - vpred_t[:-1] + # This advantage formula comes from "Generalized Advantage Estimation": + # https://arxiv.org/abs/1506.02438 + batch_adv = discount(delta_t, gamma * lambda_) + + features = rollout.features[0] + return Batch(batch_si, batch_a, batch_adv, batch_r, rollout.terminal, + features) + + +Batch = namedtuple( + "Batch", ["si", "a", "adv", "r", "terminal", "features"]) + +CompletedRollout = namedtuple( + "CompletedRollout", ["episode_length", "episode_reward"]) diff --git a/python/ray/rllib/a3c/policy.py b/python/ray/rllib/a3c/policy.py index fc9ae3392..2b01aaeb1 100644 --- a/python/ray/rllib/a3c/policy.py +++ b/python/ray/rllib/a3c/policy.py @@ -2,99 +2,29 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import tensorflow as tf -import ray -import gym - class Policy(object): """The policy base class.""" def __init__(self, ob_space, action_space, name="local", summarize=True): - self.local_steps = 0 - self.summarize = summarize - worker_device = "/job:localhost/replica:0/task:0/cpu:0" - self.g = tf.Graph() - with 
self.g.as_default(), tf.device(worker_device): - with tf.variable_scope(name): - self.setup_graph(ob_space, action_space) - assert all([hasattr(self, attr) - for attr in ["vf", "logits", "x", "var_list"]]) - print("Setting up loss") - self.setup_loss(action_space) - self.setup_gradients() - self.initialize() + pass - def setup_graph(self): + def apply_gradients(self, grads): raise NotImplementedError - def setup_loss(self, action_space): - if isinstance(action_space, gym.spaces.Box): - ac_size = action_space.shape[0] - self.ac = tf.placeholder(tf.float32, [None, ac_size], name="ac") - elif isinstance(action_space, gym.spaces.Discrete): - self.ac = tf.placeholder(tf.int64, [None], name="ac") - else: - raise NotImplemented( - "action space" + str(type(action_space)) + - "currently not supported") - self.adv = tf.placeholder(tf.float32, [None], name="adv") - self.r = tf.placeholder(tf.float32, [None], name="r") - - log_prob = self.curr_dist.logp(self.ac) - - # The "policy gradients" loss: its derivative is precisely the policy - # gradient. Notice that self.ac is a placeholder that is provided - # externally. adv will contain the advantages, as calculated in - # process_rollout. 
- self.pi_loss = - tf.reduce_sum(log_prob * self.adv) - - delta = self.vf - self.r - self.vf_loss = 0.5 * tf.reduce_sum(tf.square(delta)) - self.entropy = tf.reduce_sum(self.curr_dist.entropy()) - self.loss = self.pi_loss + 0.5 * self.vf_loss - self.entropy * 0.01 - - def setup_gradients(self): - grads = tf.gradients(self.loss, self.var_list) - self.grads, _ = tf.clip_by_global_norm(grads, 40.0) - grads_and_vars = list(zip(self.grads, self.var_list)) - opt = tf.train.AdamOptimizer(1e-4) - self._apply_gradients = opt.apply_gradients(grads_and_vars) - - def initialize(self): - if self.summarize: - bs = tf.to_float(tf.shape(self.x)[0]) - tf.summary.scalar("model/policy_loss", self.pi_loss / bs) - tf.summary.scalar("model/value_loss", self.vf_loss / bs) - tf.summary.scalar("model/entropy", self.entropy / bs) - tf.summary.scalar("model/grad_gnorm", tf.global_norm(self.grads)) - tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list)) - self.summary_op = tf.summary.merge_all() - - self.sess = tf.Session(graph=self.g, config=tf.ConfigProto( - intra_op_parallelism_threads=1, inter_op_parallelism_threads=2)) - self.variables = ray.experimental.TensorFlowVariables(self.loss, - self.sess) - self.sess.run(tf.global_variables_initializer()) - - def model_update(self, grads): - feed_dict = {self.grads[i]: grads[i] - for i in range(len(grads))} - self.sess.run(self._apply_gradients, feed_dict=feed_dict) - def get_weights(self): - weights = self.variables.get_weights() - return weights + raise NotImplementedError def set_weights(self, weights): - self.variables.set_weights(weights) + raise NotImplementedError - def get_gradients(self, batch): + def compute_gradients(self, batch): raise NotImplementedError def get_vf_loss(self): raise NotImplementedError - def compute_actions(self, observations): + def compute_action(self, observations): + """Compute action for a _single_ observation""" raise NotImplementedError def value(self, ob): diff --git 
a/python/ray/rllib/a3c/runner.py b/python/ray/rllib/a3c/runner.py index f6a2834fa..c490ad42e 100644 --- a/python/ray/rllib/a3c/runner.py +++ b/python/ray/rllib/a3c/runner.py @@ -2,182 +2,82 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from collections import namedtuple -import numpy as np +from ray.rllib.a3c.envs import create_and_wrap import tensorflow as tf import six.moves.queue as queue -import scipy.signal -import threading +from ray.rllib.a3c.runner_thread import RunnerThread +from ray.rllib.a3c.common import process_rollout +from ray.rllib.a3c.tfpolicy import TFPolicy +import ray +import os -def discount(x, gamma): - return scipy.signal.lfilter([1], [1, -gamma], x[::-1], axis=0)[::-1] +class Runner(object): + """Actor object to start running simulation on workers. - -def process_rollout(rollout, gamma, lambda_=1.0): - """Given a rollout, compute its returns and the advantage.""" - batch_si = np.asarray(rollout.states) - batch_a = np.asarray(rollout.actions) - rewards = np.asarray(rollout.rewards) - vpred_t = np.asarray(rollout.values + [rollout.r]) - - rewards_plus_v = np.asarray(rollout.rewards + [rollout.r]) - batch_r = discount(rewards_plus_v, gamma)[:-1] - delta_t = rewards + gamma * vpred_t[1:] - vpred_t[:-1] - # This formula for the advantage comes "Generalized Advantage Estimation": - # https://arxiv.org/abs/1506.02438 - batch_adv = discount(delta_t, gamma * lambda_) - - features = rollout.features[0] - return Batch(batch_si, batch_a, batch_adv, batch_r, rollout.terminal, - features) - - -Batch = namedtuple( - "Batch", ["si", "a", "adv", "r", "terminal", "features"]) - -CompletedRollout = namedtuple( - "CompletedRollout", ["episode_length", "episode_reward"]) - - -class PartialRollout(object): - """A piece of a complete rollout. - - We run our agent, and process its experience once it has processed enough - steps. + The gradient computation is also executed from this object. 
""" - def __init__(self): - self.states = [] - self.actions = [] - self.rewards = [] - self.values = [] - self.r = 0.0 - self.terminal = False - self.features = [] - - def add(self, state, action, reward, value, terminal, features): - self.states += [state] - self.actions += [action] - self.rewards += [reward] - self.values += [value] - self.terminal = terminal - self.features += [features] - - def extend(self, other): - assert not self.terminal - self.states.extend(other.states) - self.actions.extend(other.actions) - self.rewards.extend(other.rewards) - self.values.extend(other.values) - self.r = other.r - self.terminal = other.terminal - self.features.extend(other.features) - - -class RunnerThread(threading.Thread): - """This thread interacts with the environment and tells it what to do.""" - def __init__(self, env, policy, num_local_steps, visualise=False): - threading.Thread.__init__(self) - self.queue = queue.Queue(5) - self.metrics_queue = queue.Queue() - self.num_local_steps = num_local_steps + def __init__(self, env_creator, policy_cls, actor_id, batch_size, + preprocess_config, logdir): + env = create_and_wrap(env_creator, preprocess_config) + self.id = actor_id + # TODO(rliaw): should change this to be just env.observation_space + self.policy = policy_cls(env.observation_space.shape, env.action_space) + self.runner = RunnerThread(env, self.policy, batch_size) self.env = env - self.last_features = None - self.policy = policy - self.daemon = True - self.sess = None - self.summary_writer = None - self.visualise = visualise - - def start_runner(self, sess, summary_writer): - self.sess = sess - self.summary_writer = summary_writer + self.logdir = logdir self.start() - def run(self): - try: - with self.sess.as_default(): - self._run() - except BaseException as e: - self.queue.put(e) - raise e + def pull_batch_from_queue(self): + """Take a rollout from the queue of the thread runner.""" + rollout = self.runner.queue.get(timeout=600.0) + if isinstance(rollout, 
BaseException): + raise rollout + while not rollout.terminal: + try: + part = self.runner.queue.get_nowait() + if isinstance(part, BaseException): + raise rollout + rollout.extend(part) + except queue.Empty: + break + return rollout - def _run(self): - rollout_provider = env_runner( - self.env, self.policy, self.num_local_steps, - self.summary_writer, self.visualise) + def get_completed_rollout_metrics(self): + """Returns metrics on previously completed rollouts. + + Calling this clears the queue of completed rollout metrics. + """ + completed = [] while True: - # The timeout variable exists because apparently, if one worker - # dies, the other workers won't die with it, unless the timeout is - # set to some large number. This is an empirical observation. - item = next(rollout_provider) - if isinstance(item, CompletedRollout): - self.metrics_queue.put(item) - else: - self.queue.put(item, timeout=600.0) + try: + completed.append(self.runner.metrics_queue.get_nowait()) + except queue.Empty: + break + return completed + + def start(self): + summary_writer = tf.summary.FileWriter( + os.path.join(self.logdir, "agent_%d" % self.id)) + self.summary_writer = summary_writer + if isinstance(self.policy, TFPolicy): + self.runner.start_runner(self.policy.sess, summary_writer) + else: + self.runner.start_runner(tf.Session(), summary_writer) + + def compute_gradient(self, params): + self.policy.set_weights(params) + rollout = self.pull_batch_from_queue() + batch = process_rollout(rollout, gamma=0.99, lambda_=1.0) + gradient, info = self.policy.compute_gradients(batch) + if "summary" in info: + self.summary_writer.add_summary( + tf.Summary.FromString(info['summary']), + self.policy.local_steps) + self.summary_writer.flush() + info = {"id": self.id, + "size": len(batch.a)} + return gradient, info -def env_runner(env, policy, num_local_steps, summary_writer, render): - """This implements the logic of the thread runner. 
- - It continually runs the policy, and as long as the rollout exceeds a - certain length, the thread runner appends the policy to the queue. - """ - last_state = env.reset() - timestep_limit = env.spec.tags.get("wrapper_config.TimeLimit" - ".max_episode_steps") - last_features = policy.get_initial_features() - length = 0 - rewards = 0 - rollout_number = 0 - - while True: - terminal_end = False - rollout = PartialRollout() - - for _ in range(num_local_steps): - fetched = policy.compute_actions(last_state, *last_features) - action, value_, features = fetched[0], fetched[1], fetched[2:] - # Argmax to convert from one-hot. - state, reward, terminal, info = env.step(action) - if render: - env.render() - - length += 1 - rewards += reward - if length >= timestep_limit: - terminal = True - - # Collect the experience. - rollout.add(last_state, action, reward, value_, terminal, - last_features) - - last_state = state - last_features = features - - if info: - summary = tf.Summary() - for k, v in info.items(): - summary.value.add(tag=k, simple_value=float(v)) - summary_writer.add_summary(summary, rollout_number) - summary_writer.flush() - - if terminal: - terminal_end = True - yield CompletedRollout(length, rewards) - - if (length >= timestep_limit or - not env.metadata.get("semantics.autoreset")): - last_state = env.reset() - last_features = policy.get_initial_features() - rollout_number += 1 - length = 0 - rewards = 0 - break - - if not terminal_end: - rollout.r = policy.value(last_state, *last_features) - - # Once we have enough experience, yield it, and have the ThreadRunner - # place it on a queue. 
- yield rollout +RemoteRunner = ray.remote(Runner) diff --git a/python/ray/rllib/a3c/runner_thread.py b/python/ray/rllib/a3c/runner_thread.py new file mode 100644 index 000000000..c82125434 --- /dev/null +++ b/python/ray/rllib/a3c/runner_thread.py @@ -0,0 +1,151 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf +import six.moves.queue as queue +import threading +from ray.rllib.a3c.common import CompletedRollout + + +class PartialRollout(object): + """A piece of a complete rollout. + + We run our agent, and process its experience once it has processed enough + steps. + """ + def __init__(self): + self.states = [] + self.actions = [] + self.rewards = [] + self.values = [] + self.r = 0.0 + self.terminal = False + self.features = [] + + def add(self, state, action, reward, value, terminal, features): + self.states += [state] + self.actions += [action] + self.rewards += [reward] + self.values += [value] + self.terminal = terminal + self.features += [features] + + def extend(self, other): + assert not self.terminal + self.states.extend(other.states) + self.actions.extend(other.actions) + self.rewards.extend(other.rewards) + self.values.extend(other.values) + self.r = other.r + self.terminal = other.terminal + self.features.extend(other.features) + + +class RunnerThread(threading.Thread): + """This thread interacts with the environment and tells it what to do.""" + def __init__(self, env, policy, num_local_steps, visualise=False): + threading.Thread.__init__(self) + self.queue = queue.Queue(5) + self.metrics_queue = queue.Queue() + self.num_local_steps = num_local_steps + self.env = env + self.last_features = None + self.policy = policy + self.daemon = True + self.sess = None + self.summary_writer = None + self.visualise = visualise + + def start_runner(self, sess, summary_writer): + self.sess = sess + self.summary_writer = summary_writer + self.start() + + def run(self): + try: + 
with self.sess.as_default(): + self._run() + except BaseException as e: + self.queue.put(e) + raise e + + def _run(self): + rollout_provider = env_runner( + self.env, self.policy, self.num_local_steps, + self.summary_writer, self.visualise) + while True: + # The timeout variable exists because apparently, if one worker + # dies, the other workers won't die with it, unless the timeout is + # set to some large number. This is an empirical observation. + item = next(rollout_provider) + if isinstance(item, CompletedRollout): + self.metrics_queue.put(item) + else: + self.queue.put(item, timeout=600.0) + + +def env_runner(env, policy, num_local_steps, summary_writer, render): + """This implements the logic of the thread runner. + + It continually runs the policy, and as long as the rollout exceeds a + certain length, the thread runner appends the policy to the queue. + """ + last_state = env.reset() + timestep_limit = env.spec.tags.get("wrapper_config.TimeLimit" + ".max_episode_steps") + last_features = policy.get_initial_features() + length = 0 + rewards = 0 + rollout_number = 0 + + while True: + terminal_end = False + rollout = PartialRollout() + + for _ in range(num_local_steps): + fetched = policy.compute_action(last_state, *last_features) + action, value_, features = fetched[0], fetched[1], fetched[2:] + # Argmax to convert from one-hot. + state, reward, terminal, info = env.step(action) + if render: + env.render() + + length += 1 + rewards += reward + if length >= timestep_limit: + terminal = True + + # Collect the experience. 
+ rollout.add(last_state, action, reward, value_, terminal, + last_features) + + last_state = state + last_features = features + + if info: + summary = tf.Summary() + for k, v in info.items(): + summary.value.add(tag=k, simple_value=float(v)) + summary_writer.add_summary(summary, rollout_number) + summary_writer.flush() + + if terminal: + terminal_end = True + yield CompletedRollout(length, rewards) + + if (length >= timestep_limit or + not env.metadata.get("semantics.autoreset")): + last_state = env.reset() + last_features = policy.get_initial_features() + rollout_number += 1 + length = 0 + rewards = 0 + break + + if not terminal_end: + rollout.r = policy.value(last_state, *last_features) + + # Once we have enough experience, yield it, and have the ThreadRunner + # place it on a queue. + yield rollout diff --git a/python/ray/rllib/a3c/shared_model.py b/python/ray/rllib/a3c/shared_model.py index ea96beef4..bdf3900c5 100644 --- a/python/ray/rllib/a3c/shared_model.py +++ b/python/ray/rllib/a3c/shared_model.py @@ -4,11 +4,11 @@ from __future__ import print_function import tensorflow as tf from ray.rllib.models.misc import linear, normc_initializer -from ray.rllib.a3c.policy import Policy +from ray.rllib.a3c.tfpolicy import TFPolicy from ray.rllib.models.catalog import ModelCatalog -class SharedModel(Policy): +class SharedModel(TFPolicy): def __init__(self, ob_space, ac_space, **kwargs): super(SharedModel, self).__init__(ob_space, ac_space, **kwargs) @@ -31,7 +31,7 @@ class SharedModel(Policy): initializer=tf.constant_initializer(0, dtype=tf.int32), trainable=False) - def get_gradients(self, batch): + def compute_gradients(self, batch): info = {} feed_dict = { self.x: batch.si, @@ -49,13 +49,14 @@ class SharedModel(Policy): grad = self.sess.run(self.grads, feed_dict=feed_dict) return grad, info - def compute_actions(self, ob, *args): + def compute_action(self, ob, *args): action, vf = self.sess.run([self.sample, self.vf], {self.x: [ob]}) - return action[0], vf + return 
action[0], vf[0] def value(self, ob, *args): - return self.sess.run(self.vf, {self.x: [ob]})[0] + vf = self.sess.run(self.vf, {self.x: [ob]}) + return vf[0] def get_initial_features(self): return [] diff --git a/python/ray/rllib/a3c/shared_model_lstm.py b/python/ray/rllib/a3c/shared_model_lstm.py index 32369ba2f..17b304749 100644 --- a/python/ray/rllib/a3c/shared_model_lstm.py +++ b/python/ray/rllib/a3c/shared_model_lstm.py @@ -5,11 +5,11 @@ from __future__ import print_function import tensorflow as tf from ray.rllib.models.misc import linear, normc_initializer from ray.rllib.models.catalog import ModelCatalog -from ray.rllib.a3c.policy import Policy +from ray.rllib.a3c.tfpolicy import TFPolicy from ray.rllib.models.lstm import LSTM -class SharedModelLSTM(Policy): +class SharedModelLSTM(TFPolicy): def __init__(self, ob_space, ac_space, **kwargs): super(SharedModelLSTM, self).__init__(ob_space, ac_space, **kwargs) @@ -38,7 +38,7 @@ class SharedModelLSTM(Policy): initializer=tf.constant_initializer(0, dtype=tf.int32), trainable=False) - def get_gradients(self, batch): + def compute_gradients(self, batch): """Computing the gradient is actually model-dependent. 
The LSTM needs its hidden states in order to compute the gradient @@ -62,20 +62,18 @@ class SharedModelLSTM(Policy): grad = self.sess.run(self.grads, feed_dict=feed_dict) return grad, info - def compute_actions(self, ob, c, h): - output = self.sess.run([self.sample, self.vf] + self.state_out, - {self.x: [ob], - self.state_in[0]: c, - self.state_in[1]: h}) - output = list(output) - output[0] = output[0][0] - return output + def compute_action(self, ob, c, h): + action, vf, c, h = self.sess.run( + [self.sample, self.vf] + self.state_out, + {self.x: [ob], self.state_in[0]: c, self.state_in[1]: h}) + return action[0], vf[0], c, h def value(self, ob, c, h): # process_rollout is very non-intuitive due to value being a float - return self.sess.run(self.vf, {self.x: [ob], - self.state_in[0]: c, - self.state_in[1]: h})[0] + vf = self.sess.run(self.vf, {self.x: [ob], + self.state_in[0]: c, + self.state_in[1]: h}) + return vf[0] def get_initial_features(self): return self.state_init diff --git a/python/ray/rllib/a3c/tfpolicy.py b/python/ray/rllib/a3c/tfpolicy.py new file mode 100644 index 000000000..f73974b83 --- /dev/null +++ b/python/ray/rllib/a3c/tfpolicy.py @@ -0,0 +1,102 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf +import ray +import gym +from ray.rllib.a3c.policy import Policy + + +class TFPolicy(Policy): + """The policy base class.""" + def __init__(self, ob_space, action_space, name="local", summarize=True): + self.local_steps = 0 + self.summarize = summarize + worker_device = "/job:localhost/replica:0/task:0/cpu:0" + self.g = tf.Graph() + with self.g.as_default(), tf.device(worker_device): + with tf.variable_scope(name): + self.setup_graph(ob_space, action_space) + assert all([hasattr(self, attr) + for attr in ["vf", "logits", "x", "var_list"]]) + print("Setting up loss") + self.setup_loss(action_space) + self.setup_gradients() + self.initialize() + + def 
setup_graph(self): + raise NotImplementedError + + def setup_loss(self, action_space): + if isinstance(action_space, gym.spaces.Box): + ac_size = action_space.shape[0] + self.ac = tf.placeholder(tf.float32, [None, ac_size], name="ac") + elif isinstance(action_space, gym.spaces.Discrete): + self.ac = tf.placeholder(tf.int64, [None], name="ac") + else: + raise NotImplemented( + "action space" + str(type(action_space)) + + "currently not supported") + self.adv = tf.placeholder(tf.float32, [None], name="adv") + self.r = tf.placeholder(tf.float32, [None], name="r") + + log_prob = self.curr_dist.logp(self.ac) + + # The "policy gradients" loss: its derivative is precisely the policy + # gradient. Notice that self.ac is a placeholder that is provided + # externally. adv will contain the advantages, as calculated in + # process_rollout. + self.pi_loss = - tf.reduce_sum(log_prob * self.adv) + + delta = self.vf - self.r + self.vf_loss = 0.5 * tf.reduce_sum(tf.square(delta)) + self.entropy = tf.reduce_sum(self.curr_dist.entropy()) + self.loss = self.pi_loss + 0.5 * self.vf_loss - self.entropy * 0.01 + + def setup_gradients(self): + grads = tf.gradients(self.loss, self.var_list) + self.grads, _ = tf.clip_by_global_norm(grads, 40.0) + grads_and_vars = list(zip(self.grads, self.var_list)) + opt = tf.train.AdamOptimizer(1e-4) + self._apply_gradients = opt.apply_gradients(grads_and_vars) + + def initialize(self): + if self.summarize: + bs = tf.to_float(tf.shape(self.x)[0]) + tf.summary.scalar("model/policy_loss", self.pi_loss / bs) + tf.summary.scalar("model/value_loss", self.vf_loss / bs) + tf.summary.scalar("model/entropy", self.entropy / bs) + tf.summary.scalar("model/grad_gnorm", tf.global_norm(self.grads)) + tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list)) + self.summary_op = tf.summary.merge_all() + + self.sess = tf.Session(graph=self.g, config=tf.ConfigProto( + intra_op_parallelism_threads=1, inter_op_parallelism_threads=2)) + self.variables = 
ray.experimental.TensorFlowVariables(self.loss, + self.sess) + self.sess.run(tf.global_variables_initializer()) + + def apply_gradients(self, grads): + feed_dict = {self.grads[i]: grads[i] + for i in range(len(grads))} + self.sess.run(self._apply_gradients, feed_dict=feed_dict) + + def get_weights(self): + weights = self.variables.get_weights() + return weights + + def set_weights(self, weights): + self.variables.set_weights(weights) + + def compute_gradients(self, batch): + raise NotImplementedError + + def get_vf_loss(self): + raise NotImplementedError + + def compute_action(self, observations): + raise NotImplementedError + + def value(self, ob): + raise NotImplementedError diff --git a/python/ray/rllib/a3c/torchpolicy.py b/python/ray/rllib/a3c/torchpolicy.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index ecbea794a..a23db88dd 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -21,7 +21,8 @@ MODEL_CONFIGS = [ "extra_frameskip", # (int) for number of frames to skip "fcnet_activation", # Nonlinearity for fully connected net (tanh, relu) "fcnet_hiddens", # Number of hidden layers for fully connected net - "free_log_std" # Documented in ray.rllib.models.Model + "free_log_std", # Documented in ray.rllib.models.Model + "channel_major", # Pytorch conv requires images to be channel-major ] diff --git a/python/ray/rllib/models/preprocessors.py b/python/ray/rllib/models/preprocessors.py index 97ed9e5cd..93ef6a0b4 100644 --- a/python/ray/rllib/models/preprocessors.py +++ b/python/ray/rllib/models/preprocessors.py @@ -30,11 +30,16 @@ class AtariPixelPreprocessor(Preprocessor): self._grayscale = self._options.get("grayscale", False) self._zero_mean = self._options.get("zero_mean", True) self._dim = self._options.get("dim", 80) + self._pytorch = self._options.get("pytorch", False) if self._grayscale: self.shape = (self._dim, self._dim, 1) else: 
self.shape = (self._dim, self._dim, 3) + # pytorch requires (# in-channels, row dim, col dim) + if self._pytorch: + self.shape = self.shape[::-1] + # TODO(ekl) why does this need to return an extra size-1 dim (the [None]) def transform(self, observation): """Downsamples images from (210, 160, 3) by the configured factor.""" @@ -54,6 +59,8 @@ class AtariPixelPreprocessor(Preprocessor): scaled = (scaled - 128) / 128 else: scaled *= 1.0 / 255.0 + if self._pytorch: + scaled = np.reshape(scaled, self.shape) return scaled