[rllib] Pull out shared models for evolution strategies and policy gradient. (#719)

* wip

* works with cartpole

* lint

* fix pg

* comment

* action dist rename

* preprocessor

* fix test

* typo

* fix the action[0] nonsense

* revert

* satisfy the lint

* wip

* works with cartpole

* lint

* fix pg

* comment

* action dist rename

* preprocessor

* fix test

* typo

* fix the action[0] nonsense

* revert

* satisfy the lint

* Minor indentation changes.

* fix merge

* add humanoid

* fix linting

* more 4 space

* fix

* fix linT

* oops

* es parity
This commit is contained in:
Eric Liang
2017-07-17 01:58:54 -07:00
committed by Philipp Moritz
parent 8fc7dc3ed4
commit 420013774c
19 changed files with 333 additions and 239 deletions
+12 -11
View File
@@ -8,6 +8,7 @@ import sys
import tempfile
import uuid
import smart_open
if sys.version_info[0] == 2:
import cStringIO as StringIO
elif sys.version_info[0] == 3:
@@ -60,9 +61,9 @@ class Algorithm(object):
you should create a new algorithm instance for each training session.
Attributes:
env_name (str): Name of the OpenAI gym environment to train against.
config (obj): Algorithm-specific configuration data.
logdir (str): Directory in which training outputs should be placed.
env_name (str): Name of the OpenAI gym environment to train against.
config (obj): Algorithm-specific configuration data.
logdir (str): Directory in which training outputs should be placed.
TODO(ekl): support checkpoint / restore of training state.
"""
@@ -71,11 +72,11 @@ class Algorithm(object):
"""Initialize an RLLib algorithm.
Args:
env_name (str): The name of the OpenAI gym environment to use.
config (obj): Algorithm-specific configuration data.
upload_dir (str): Root directory into which the output directory
should be placed. Can be local like file:///tmp/ray/ or on S3
like s3://bucketname/.
env_name (str): The name of the OpenAI gym environment to use.
config (obj): Algorithm-specific configuration data.
upload_dir (str): Root directory into which the output directory
should be placed. Can be local like file:///tmp/ray/ or on S3
like s3://bucketname/.
"""
upload_dir = "file:///tmp/ray" if upload_dir is None else upload_dir
self.experiment_id = uuid.uuid4()
@@ -88,8 +89,8 @@ class Algorithm(object):
self.__class__.__name__,
datetime.today().strftime("%Y-%m-%d_%H-%M-%S"))
if upload_dir.startswith("file"):
self.logdir = "file://" + tempfile.mkdtemp(prefix=prefix,
dir="/tmp/ray")
self.logdir = "file://" + tempfile.mkdtemp(
prefix=prefix, dir="/tmp/ray")
else:
self.logdir = os.path.join(upload_dir, prefix)
log_path = os.path.join(self.logdir, "config.json")
@@ -103,7 +104,7 @@ class Algorithm(object):
"""Runs one logical iteration of training.
Returns:
A TrainingResult that describes training progress.
A TrainingResult that describes training progress.
"""
raise NotImplementedError
@@ -73,15 +73,15 @@ class Worker(object):
self.env = gym.make(env_name)
self.sess = utils.make_session(single_threaded=True)
self.policy = policies.MujocoPolicy(self.env.observation_space,
self.env.action_space,
**policy_params)
self.policy = policies.GenericPolicy(
self.env.observation_space, self.env.action_space, **policy_params)
tf_util.initialize()
self.rs = np.random.RandomState()
assert self.policy.needs_ob_stat == (self.config["calc_obstat_prob"] !=
0)
assert (
self.policy.needs_ob_stat ==
(self.config["calc_obstat_prob"] != 0))
def rollout_and_update_ob_stat(self, timestep_limit, task_ob_stat):
if (self.policy.needs_ob_stat and
@@ -109,15 +109,15 @@ class Worker(object):
noise_inds, returns, sign_returns, lengths = [], [], [], []
# We set eps=0 because we're incrementing only.
task_ob_stat = utils.RunningStat(self.env.observation_space.shape,
eps=0)
task_ob_stat = utils.RunningStat(
self.env.observation_space.shape, eps=0)
# Perform some rollouts with noise.
task_tstart = time.time()
while (len(noise_inds) == 0 or
time.time() - task_tstart < self.min_task_runtime):
noise_idx = self.noise.sample_index(self.rs,
self.policy.num_params)
noise_idx = self.noise.sample_index(
self.rs, self.policy.num_params)
perturbation = self.config["noise_stdev"] * self.noise.get(
noise_idx, self.policy.num_params)
@@ -133,8 +133,8 @@ class Worker(object):
noise_inds.append(noise_idx)
returns.append([rews_pos.sum(), rews_neg.sum()])
sign_returns.append([np.sign(rews_pos).sum(),
np.sign(rews_neg).sum()])
sign_returns.append(
[np.sign(rews_pos).sum(), np.sign(rews_neg).sum()])
lengths.append([len_pos, len_neg])
return Result(
@@ -157,13 +157,17 @@ class EvolutionStrategies(Algorithm):
Algorithm.__init__(self, env_name, config, upload_dir=upload_dir)
policy_params = {
"ac_bins": "continuous:",
"ac_noise_std": 0.01,
"nonlin_type": "tanh",
"hidden_dims": [256, 256],
"connection_type": "ff"
"ac_noise_std": 0.01
}
env = gym.make(env_name)
utils.make_session(single_threaded=False)
self.policy = policies.GenericPolicy(
env.observation_space, env.action_space, **policy_params)
tf_util.initialize()
self.optimizer = optimizers.Adam(self.policy, config["stepsize"])
self.ob_stat = utils.RunningStat(env.observation_space.shape, eps=1e-2)
# Create the shared noise table.
print("Creating shared noise table.")
noise_id = create_shared_noise.remote()
@@ -171,17 +175,9 @@ class EvolutionStrategies(Algorithm):
# Create the actors.
print("Creating actors.")
self.workers = [Worker.remote(config, policy_params, env_name,
noise_id)
for _ in range(config["num_workers"])]
env = gym.make(env_name)
utils.make_session(single_threaded=False)
self.policy = policies.MujocoPolicy(
env.observation_space, env.action_space, **policy_params)
tf_util.initialize()
self.optimizer = optimizers.Adam(self.policy, config["stepsize"])
self.ob_stat = utils.RunningStat(env.observation_space.shape, eps=1e-2)
self.workers = [
Worker.remote(config, policy_params, env_name, noise_id)
for _ in range(config["num_workers"])]
self.episodes_so_far = 0
self.timesteps_so_far = 0
@@ -241,13 +237,13 @@ class EvolutionStrategies(Algorithm):
curr_task_results.append(result)
# Update ob stats.
if self.policy.needs_ob_stat and result.ob_count > 0:
self.ob_stat.increment(result.ob_sum, result.ob_sumsq,
result.ob_count)
self.ob_stat.increment(
result.ob_sum, result.ob_sumsq, result.ob_count)
ob_count_this_batch += result.ob_count
# Assemble the results.
noise_inds_n = np.concatenate([r.noise_inds_n for
r in curr_task_results])
noise_inds_n = np.concatenate(
[r.noise_inds_n for r in curr_task_results])
returns_n2 = np.concatenate([r.returns_n2 for r in curr_task_results])
lengths_n2 = np.concatenate([r.lengths_n2 for r in curr_task_results])
assert (noise_inds_n.shape[0] == returns_n2.shape[0] ==
@@ -265,9 +261,10 @@ class EvolutionStrategies(Algorithm):
for idx in noise_inds_n),
batch_size=500)
g /= returns_n2.size
assert (g.shape == (self.policy.num_params,) and
g.dtype == np.float32 and
count == len(noise_inds_n))
assert (
g.shape == (self.policy.num_params,) and
g.dtype == np.float32 and
count == len(noise_inds_n))
update_ratio = self.optimizer.update(-g + config["l2coeff"] * theta)
# Update ob stat (we're never running the policy in the master, but we
@@ -8,11 +8,13 @@ from __future__ import print_function
import logging
import pickle
import gym.spaces
import h5py
import numpy as np
import tensorflow as tf
from ray.rllib.evolution_strategies import tf_util as U
from ray.rllib.models import ModelCatalog
logger = logging.getLogger(__name__)
@@ -137,24 +139,10 @@ def bins(x, dim, num_bins, name):
return tf.argmax(scores_nab, 2)
class MujocoPolicy(Policy):
def _initialize(self, ob_space, ac_space, ac_bins, ac_noise_std,
nonlin_type, hidden_dims, connection_type):
class GenericPolicy(Policy):
def _initialize(self, ob_space, ac_space, ac_noise_std):
self.ac_space = ac_space
self.ac_bins = ac_bins
self.ac_noise_std = ac_noise_std
self.hidden_dims = hidden_dims
self.connection_type = connection_type
assert len(ob_space.shape) == len(self.ac_space.shape) == 1
assert (np.all(np.isfinite(self.ac_space.low)) and
np.all(np.isfinite(self.ac_space.high))), ("Action bounds "
"required")
self.nonlin = {'tanh': tf.tanh,
'relu': tf.nn.relu,
'lrelu': U.lrelu,
'elu': tf.nn.elu}[nonlin_type]
with tf.variable_scope(type(self).__name__) as scope:
# Observation normalization.
@@ -171,72 +159,24 @@ class MujocoPolicy(Policy):
tf.assign(ob_std, in_std),
])
inputs = tf.placeholder(tf.float32, [None] + list(ob_space.shape))
# TODO(ekl): we should do clipping in a standard RLlib preprocessor
clipped_inputs = tf.clip_by_value(
(inputs - ob_mean) / ob_std, -5.0, 5.0)
# Policy network.
o = tf.placeholder(tf.float32, [None] + list(ob_space.shape))
a = self._make_net(tf.clip_by_value((o - ob_mean) / ob_std,
-5.0, 5.0))
self._act = U.function([o], a)
dist_class, dist_dim = ModelCatalog.get_action_dist(
self.ac_space, dist_type='deterministic')
model = ModelCatalog.get_model(clipped_inputs, dist_dim)
dist = dist_class(model.outputs)
self._act = U.function([inputs], dist.sample())
return scope
def _make_net(self, o):
# Process observation.
if self.connection_type == 'ff':
x = o
for ilayer, hd in enumerate(self.hidden_dims):
x = self.nonlin(U.dense(x, hd, 'l{}'.format(ilayer),
U.normc_initializer(1.0)))
else:
raise NotImplementedError(self.connection_type)
# Map to action.
adim = self.ac_space.shape[0]
ahigh = self.ac_space.high
alow = self.ac_space.low
assert isinstance(self.ac_bins, str)
ac_bin_mode, ac_bin_arg = self.ac_bins.split(':')
if ac_bin_mode == 'uniform':
# Uniformly spaced bins, from ac_space.low to ac_space.high.
num_ac_bins = int(ac_bin_arg)
aidx_na = bins(x, adim, num_ac_bins, 'out')
ac_range_1a = (ahigh - alow)[None, :]
a = (1. / (num_ac_bins - 1.) * tf.to_float(aidx_na) * ac_range_1a +
alow[None, :])
elif ac_bin_mode == 'custom':
# Custom bins specified as a list of values from -1 to 1.
# The bins are rescaled to ac_space.low to ac_space.high.
acvals_k = np.array(list(map(float, ac_bin_arg.split(','))),
dtype=np.float32)
logger.info('Custom action values: ' + ' '.join('{:.3f}'.format(x)
for x in acvals_k))
assert (acvals_k.ndim == 1 and acvals_k[0] == -1 and
acvals_k[-1] == 1)
acvals_ak = ((ahigh - alow)[:, None] /
(acvals_k[-1] - acvals_k[0]) *
(acvals_k - acvals_k[0])[None, :] + alow[:, None])
aidx_na = bins(x, adim, len(acvals_k),
'out') # Values in [0, k-1].
a = tf.gather_nd(
acvals_ak,
tf.concat([
tf.tile(np.arange(adim)[None, :, None],
[tf.shape(aidx_na)[0], 1, 1]),
2,
tf.expand_dims(aidx_na, -1)
]) # (n, a, 2)
) # (n, a)
elif ac_bin_mode == 'continuous':
a = U.dense(x, adim, 'out', U.normc_initializer(0.01))
else:
raise NotImplementedError(ac_bin_mode)
return a
def act(self, ob, random_stream=None):
a = self._act(ob)
if random_stream is not None and self.ac_noise_std != 0:
if not isinstance(self.ac_space, gym.spaces.Discrete) and \
random_stream is not None and self.ac_noise_std != 0:
a += random_stream.randn(*a.shape) * self.ac_noise_std
return a
+1
View File
@@ -0,0 +1 @@
Shared neural network models for RLlib.
+3
View File
@@ -0,0 +1,3 @@
from ray.rllib.models.catalog import ModelCatalog
__all__ = ["ModelCatalog"]
@@ -6,16 +6,38 @@ import tensorflow as tf
import numpy as np
class Categorical(object):
def __init__(self, logits):
self.logits = logits
class ActionDistribution(object):
"""The policy action distribution of an agent.
Args:
inputs (Tensor): The input vector to compute samples from.
"""
def __init__(self, inputs):
self.inputs = inputs
def logp(self, x):
raise NotImplementedError
def kl(self, other):
raise NotImplementedError
def entropy(self):
raise NotImplementedError
def sample(self):
raise NotImplementedError
class Categorical(ActionDistribution):
"""Categorical distribution for discrete action spaces."""
def logp(self, x):
return -tf.nn.sparse_softmax_cross_entropy_with_logits(
logits=self.logits, labels=x)
logits=self.inputs, labels=x)
def entropy(self):
a0 = self.logits - tf.reduce_max(self.logits, reduction_indices=[1],
a0 = self.inputs - tf.reduce_max(self.inputs, reduction_indices=[1],
keep_dims=True)
ea0 = tf.exp(a0)
z0 = tf.reduce_sum(ea0, reduction_indices=[1], keep_dims=True)
@@ -23,9 +45,9 @@ class Categorical(object):
return tf.reduce_sum(p0 * (tf.log(z0) - a0), reduction_indices=[1])
def kl(self, other):
a0 = self.logits - tf.reduce_max(self.logits, reduction_indices=[1],
a0 = self.inputs - tf.reduce_max(self.inputs, reduction_indices=[1],
keep_dims=True)
a1 = other.logits - tf.reduce_max(other.logits, reduction_indices=[1],
a1 = other.inputs - tf.reduce_max(other.inputs, reduction_indices=[1],
keep_dims=True)
ea0 = tf.exp(a0)
ea1 = tf.exp(a1)
@@ -36,13 +58,19 @@ class Categorical(object):
reduction_indices=[1])
def sample(self):
return tf.multinomial(self.logits, 1)
return tf.multinomial(self.inputs, 1)[0]
class DiagGaussian(object):
def __init__(self, flat):
self.flat = flat
mean, logstd = tf.split(flat, 2, axis=1)
class DiagGaussian(ActionDistribution):
"""Action distribution where each vector element is a gaussian.
The first half of the input vector defines the gaussian means, and the
second half the gaussian standard deviations.
"""
def __init__(self, inputs):
ActionDistribution.__init__(self, inputs)
mean, logstd = tf.split(inputs, 2, axis=1)
self.mean = mean
self.logstd = logstd
self.std = tf.exp(logstd)
@@ -67,3 +95,13 @@ class DiagGaussian(object):
def sample(self):
return self.mean + self.std * tf.random_normal(tf.shape(self.mean))
class Deterministic(ActionDistribution):
"""Action distribution that returns the input values directly.
This is similar to DiagGaussian with standard deviation zero.
"""
def sample(self):
return self.inputs
+76
View File
@@ -0,0 +1,76 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gym
from ray.rllib.models.action_dist import (
Categorical, Deterministic, DiagGaussian)
from ray.rllib.models.fcnet import FullyConnectedNetwork
from ray.rllib.models.visionnet import VisionNetwork
class ModelCatalog(object):
"""Registry of default models and action distributions for envs.
Example:
dist_class, dist_dim = ModelCatalog.get_action_dist(env.action_space)
model = ModelCatalog.get_model(inputs, dist_dim)
dist = dist_class(model.outputs)
action_op = dist.sample()
"""
@staticmethod
def get_action_dist(action_space, dist_type=None):
"""Returns action distribution class and size for the given action space.
Args:
action_space (Space): Action space of the target gym env.
Returns:
dist_class (ActionDistribution): Python class of the distribution.
dist_dim (int): The size of the input vector to the distribution.
"""
if isinstance(action_space, gym.spaces.Box):
if dist_type is None:
return DiagGaussian, action_space.shape[0] * 2
elif dist_type == 'deterministic':
return Deterministic, action_space.shape[0]
elif isinstance(action_space, gym.spaces.Discrete):
return Categorical, action_space.n
raise NotImplementedError(
"Unsupported args: {} {}".format(action_space, dist_type))
@staticmethod
def get_model(inputs, num_outputs):
"""Returns a suitable model conforming to given input and output specs.
Args:
inputs (Tensor): The input tensor to the model.
num_outputs (int): The size of the output vector of the model.
Returns:
model (Model): Neural network model.
"""
obs_rank = len(inputs.get_shape()) - 1
if obs_rank > 1:
return VisionNetwork(inputs, num_outputs)
return FullyConnectedNetwork(inputs, num_outputs)
@staticmethod
def get_preprocessor(env_name):
"""Returns a suitable processor for the given environment.
Args:
env_name (str): The name of the environment.
Returns:
preprocessor (Preprocessor): Preprocessor for the env observations.
"""
raise NotImplementedError
+37
View File
@@ -0,0 +1,37 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import tensorflow.contrib.slim as slim
import numpy as np
from ray.rllib.models.model import Model
def normc_initializer(std=1.0):
def _initializer(shape, dtype=None, partition_info=None):
out = np.random.randn(*shape).astype(np.float32)
out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
return tf.constant(out)
return _initializer
class FullyConnectedNetwork(Model):
"""Generic fully connected network."""
def _init(self, inputs, num_outputs):
with tf.name_scope("fc_net"):
fc1 = slim.fully_connected(
inputs, 256, weights_initializer=normc_initializer(1.0),
activation_fn=tf.nn.tanh,
scope="fc1")
fc2 = slim.fully_connected(
fc1, 256, weights_initializer=normc_initializer(1.0),
activation_fn=tf.nn.tanh,
scope="fc2")
fc3 = slim.fully_connected(
fc2, num_outputs, weights_initializer=normc_initializer(0.01),
activation_fn=None, scope="fc3")
return fc3
+28
View File
@@ -0,0 +1,28 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
class Model(object):
"""Defines an abstract network model for use with RLlib.
Models convert input tensors to a number of output features. These features
can then be interpreted by ActionDistribution classes to determine
e.g. agent action values.
"""
def __init__(self, inputs, num_outputs):
self.inputs = inputs
self.outputs = self._init(inputs, num_outputs)
def _init(self):
"""Initializes the model given self.inputs and self.num_outputs."""
raise NotImplementedError
def inputs(self):
"""Returns the input placeholder for this model."""
return self.inputs
def outputs(self):
"""Returns the output tensor of this model."""
return self.outputs
+14
View File
@@ -0,0 +1,14 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# TODO(ekl) implement common preprocessors
class Preprocessor(object):
def output_shape(self):
"""Returns the new output shape, or None if unchanged."""
raise NotImplementedError
def preprocess(self, observation):
"""Returns the preprocessed observation."""
raise NotImplementedError
+22
View File
@@ -0,0 +1,22 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import tensorflow.contrib.slim as slim
from ray.rllib.models.model import Model
class VisionNetwork(Model):
"""Generic vision network."""
def _init(self, inputs, num_outputs):
with tf.name_scope("vision_net"):
conv1 = slim.conv2d(inputs, 16, [8, 8], 4, scope="conv1")
conv2 = slim.conv2d(conv1, 32, [4, 4], 2, scope="conv2")
fc1 = slim.conv2d(
conv2, 512, [10, 10], padding="VALID", scope="fc1")
fc2 = slim.conv2d(fc1, num_outputs, [1, 1], activation_fn=None,
normalizer_fn=None, scope="fc2")
return tf.squeeze(fc2, [1, 2])
+25 -32
View File
@@ -11,7 +11,7 @@ from tensorflow.python import debug as tf_debug
import ray
from ray.rllib.parallel import LocalSyncParallelOptimizer
from ray.rllib.policy_gradient.distributions import Categorical, DiagGaussian
from ray.rllib.models import ModelCatalog
from ray.rllib.policy_gradient.env import BatchedEnv
from ray.rllib.policy_gradient.loss import ProximalPolicyLoss
from ray.rllib.policy_gradient.filter import MeanStdFilter
@@ -33,8 +33,8 @@ class Agent(object):
network weights. When run as a remote agent, only this graph is used.
"""
def __init__(self, name, batchsize, preprocessor, config, logdir,
is_remote):
def __init__(
self, name, batchsize, preprocessor, config, logdir, is_remote):
if is_remote:
os.environ["CUDA_VISIBLE_DEVICES"] = ""
devices = ["/cpu:0"]
@@ -54,37 +54,30 @@ class Agent(object):
self.sess = tf.Session(config=config_proto)
if config["use_tf_debugger"] and not is_remote:
self.sess = tf_debug.LocalCLIDebugWrapperSession(self.sess)
self.sess.add_tensor_filter("has_inf_or_nan",
tf_debug.has_inf_or_nan)
self.sess.add_tensor_filter(
"has_inf_or_nan", tf_debug.has_inf_or_nan)
# Defines the training inputs.
self.kl_coeff = tf.placeholder(name="newkl", shape=(),
dtype=tf.float32)
self.observations = tf.placeholder(tf.float32,
shape=(None,) + preprocessor.shape)
self.kl_coeff = tf.placeholder(
name="newkl", shape=(), dtype=tf.float32)
self.observations = tf.placeholder(
tf.float32, shape=(None,) + preprocessor.shape)
self.advantages = tf.placeholder(tf.float32, shape=(None,))
action_space = self.env.action_space
if isinstance(action_space, gym.spaces.Box):
# The first half of the dimensions are the means, the second half
# are the standard deviations.
self.action_dim = action_space.shape[0]
self.action_shape = (self.action_dim,)
self.logit_dim = 2 * self.action_dim
self.actions = tf.placeholder(tf.float32,
shape=(None, self.action_dim))
self.distribution_class = DiagGaussian
self.actions = tf.placeholder(
tf.float32, shape=(None, action_space.shape[0]))
elif isinstance(action_space, gym.spaces.Discrete):
self.action_dim = action_space.n
self.action_shape = ()
self.logit_dim = self.action_dim
self.actions = tf.placeholder(tf.int64, shape=(None,))
self.distribution_class = Categorical
else:
raise NotImplemented("action space" + str(type(action_space)) +
"currently not supported")
self.prev_logits = tf.placeholder(tf.float32,
shape=(None, self.logit_dim))
raise NotImplemented(
"action space" + str(type(action_space)) +
"currently not supported")
self.distribution_class, self.logit_dim = ModelCatalog.get_action_dist(
action_space)
self.prev_logits = tf.placeholder(
tf.float32, shape=(None, self.logit_dim))
assert config["sgd_batchsize"] % len(devices) == 0, \
"Batch size must be evenly divisible by devices"
@@ -99,7 +92,8 @@ class Agent(object):
return ProximalPolicyLoss(
self.env.observation_space, self.env.action_space,
obs, advs, acts, plog, self.logit_dim,
self.kl_coeff, self.distribution_class, self.config, self.sess)
self.kl_coeff, self.distribution_class, self.config,
self.sess)
self.par_opt = LocalSyncParallelOptimizer(
tf.train.AdamOptimizer(self.config["sgd_stepsize"]),
@@ -118,14 +112,13 @@ class Agent(object):
self.mean_kl = tf.reduce_mean(
tf.stack(values=[policy.mean_kl for policy in policies]), 0)
self.mean_entropy = tf.reduce_mean(
tf.stack(values=[policy.mean_entropy for policy in policies]),
0)
tf.stack(
values=[policy.mean_entropy for policy in policies]), 0)
# References to the model weights
self.common_policy = self.par_opt.get_common_loss()
self.variables = ray.experimental.TensorFlowVariables(
self.common_policy.loss,
self.sess)
self.common_policy.loss, self.sess)
self.observation_filter = MeanStdFilter(preprocessor.shape, clip=None)
self.reward_filter = MeanStdFilter((), clip=5.0)
self.sess.run(tf.global_variables_initializer())
@@ -139,8 +132,8 @@ class Agent(object):
trajectories["logprobs"]],
full_trace=full_trace)
def run_sgd_minibatch(self, batch_index, kl_coeff, full_trace,
file_writer):
def run_sgd_minibatch(
self, batch_index, kl_coeff, full_trace, file_writer):
return self.par_opt.optimize(
self.sess,
batch_index,
+1 -2
View File
@@ -55,8 +55,7 @@ class BatchedEnv(object):
observations.append(np.zeros(self.shape))
rewards.append(0.0)
continue
observation, reward, done, info = self.envs[i].step(
action if len(action) > 1 else action[0])
observation, reward, done, info = self.envs[i].step(action)
if render:
self.envs[0].render()
observations.append(self.preprocessor(observation))
+4 -7
View File
@@ -4,8 +4,8 @@ from __future__ import print_function
import gym.spaces
import tensorflow as tf
from ray.rllib.policy_gradient.models.visionnet import vision_net
from ray.rllib.policy_gradient.models.fcnet import fc_net
from ray.rllib.models import ModelCatalog
class ProximalPolicyLoss(object):
@@ -21,11 +21,8 @@ class ProximalPolicyLoss(object):
# Saved so that we can compute actions given different observations
self.observations = observations
if len(observation_space.shape) > 1:
self.curr_logits = vision_net(observations, num_classes=logit_dim)
else:
assert len(observation_space.shape) == 1
self.curr_logits = fc_net(observations, num_classes=logit_dim)
self.curr_logits = ModelCatalog.get_model(
observations, logit_dim).outputs
self.curr_dist = distribution_class(self.curr_logits)
self.sampler = self.curr_dist.sample()
@@ -1,38 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import tensorflow.contrib.slim as slim
import numpy as np
def normc_initializer(std=1.0):
def _initializer(shape, dtype=None, partition_info=None):
out = np.random.randn(*shape).astype(np.float32)
out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
return tf.constant(out)
return _initializer
def fc_net(inputs, num_classes=10, logstd=False):
with tf.name_scope("fc_net"):
fc1 = slim.fully_connected(inputs, 128,
weights_initializer=normc_initializer(1.0),
scope="fc1")
fc2 = slim.fully_connected(fc1, 128,
weights_initializer=normc_initializer(1.0),
scope="fc2")
fc3 = slim.fully_connected(fc2, 128,
weights_initializer=normc_initializer(1.0),
scope="fc3")
fc4 = slim.fully_connected(fc3, num_classes,
weights_initializer=normc_initializer(0.01),
activation_fn=None, scope="fc4")
if logstd:
logstd = tf.get_variable(name="logstd", shape=[num_classes],
initializer=tf.zeros_initializer)
return tf.concat(1, [fc4, logstd])
else:
return fc4
@@ -1,16 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import tensorflow.contrib.slim as slim
def vision_net(inputs, num_classes=10):
with tf.name_scope("vision_net"):
conv1 = slim.conv2d(inputs, 16, [8, 8], 4, scope="conv1")
conv2 = slim.conv2d(conv1, 32, [4, 4], 2, scope="conv2")
fc1 = slim.conv2d(conv2, 512, [10, 10], padding="VALID", scope="fc1")
fc2 = slim.conv2d(fc1, num_classes, [1, 1], activation_fn=None,
normalizer_fn=None, scope="fc2")
return tf.squeeze(fc2, [1, 2])
@@ -48,8 +48,7 @@ class PolicyGradient(Algorithm):
Algorithm.__init__(self, env_name, config, upload_dir=upload_dir)
# TODO(ekl): The preprocessor should be associated with the env
# elsewhere.
# TODO(ekl): preprocessor should be associated with the env elsewhere
if self.env_name == "Pong-v0":
preprocessor = AtariPixelPreprocessor()
elif self.env_name == "Pong-ram-v3":
@@ -58,6 +57,8 @@ class PolicyGradient(Algorithm):
preprocessor = NoPreprocessor()
elif self.env_name == "Walker2d-v1":
preprocessor = NoPreprocessor()
elif self.env_name == "Humanoid-v1":
preprocessor = NoPreprocessor()
else:
preprocessor = AtariPixelPreprocessor()
@@ -93,8 +94,8 @@ class PolicyGradient(Algorithm):
if config["model_checkpoint_file"]:
checkpoint_path = saver.save(
model.sess,
os.path.join(self.logdir,
config["model_checkpoint_file"] % j))
os.path.join(
self.logdir, config["model_checkpoint_file"] % j))
print("Checkpoint saved in file: %s" % checkpoint_path)
checkpointing_end = time.time()
weights = ray.put(model.get_weights())
@@ -135,8 +136,8 @@ class PolicyGradient(Algorithm):
for i in range(config["num_sgd_iter"]):
sgd_start = time.time()
batch_index = 0
num_batches = (int(tuples_per_device) //
int(model.per_device_batch_size))
num_batches = (
int(tuples_per_device) // int(model.per_device_batch_size))
loss, kl, entropy = [], [], []
permutation = np.random.permutation(num_batches)
while batch_index < num_batches:
@@ -155,8 +156,8 @@ class PolicyGradient(Algorithm):
kl = np.mean(kl)
entropy = np.mean(entropy)
sgd_end = time.time()
print("{:>15}{:15.5e}{:15.5e}{:15.5e}".format(i, loss, kl,
entropy))
print(
"{:>15}{:15.5e}{:15.5e}{:15.5e}".format(i, loss, kl, entropy))
values = []
if i == config["num_sgd_iter"] - 1:
@@ -7,11 +7,12 @@ import numpy as np
import tensorflow as tf
from numpy.testing import assert_allclose
from ray.rllib.policy_gradient.distributions import Categorical
from ray.rllib.models.action_dist import Categorical
from ray.rllib.policy_gradient.utils import flatten, concatenate
class DistibutionsTest(unittest.TestCase):
# TODO(ekl): move to rllib/models dir
class DistributionsTest(unittest.TestCase):
def testCategorical(self):
num_samples = 100000