diff --git a/python/ray/rllib/common.py b/python/ray/rllib/common.py index de0c57d0a..c30a8267b 100644 --- a/python/ray/rllib/common.py +++ b/python/ray/rllib/common.py @@ -8,6 +8,7 @@ import sys import tempfile import uuid import smart_open + if sys.version_info[0] == 2: import cStringIO as StringIO elif sys.version_info[0] == 3: @@ -60,9 +61,9 @@ class Algorithm(object): you should create a new algorithm instance for each training session. Attributes: - env_name (str): Name of the OpenAI gym environment to train against. - config (obj): Algorithm-specific configuration data. - logdir (str): Directory in which training outputs should be placed. + env_name (str): Name of the OpenAI gym environment to train against. + config (obj): Algorithm-specific configuration data. + logdir (str): Directory in which training outputs should be placed. TODO(ekl): support checkpoint / restore of training state. """ @@ -71,11 +72,11 @@ class Algorithm(object): """Initialize an RLLib algorithm. Args: - env_name (str): The name of the OpenAI gym environment to use. - config (obj): Algorithm-specific configuration data. - upload_dir (str): Root directory into which the output directory - should be placed. Can be local like file:///tmp/ray/ or on S3 - like s3://bucketname/. + env_name (str): The name of the OpenAI gym environment to use. + config (obj): Algorithm-specific configuration data. + upload_dir (str): Root directory into which the output directory + should be placed. Can be local like file:///tmp/ray/ or on S3 + like s3://bucketname/. """ upload_dir = "file:///tmp/ray" if upload_dir is None else upload_dir self.experiment_id = uuid.uuid4() @@ -88,8 +89,8 @@ class Algorithm(object): self.__class__.__name__, datetime.today().strftime("%Y-%m-%d_%H-%M-%S")) if upload_dir.startswith("file"): - self.logdir = "file://" + tempfile.mkdtemp(prefix=prefix, - dir="/tmp/ray") + self.logdir = "file://" + tempfile.mkdtemp( + prefix=prefix, dir="/tmp/ray") else: self.logdir = os.path.join(upload_dir, prefix) log_path = os.path.join(self.logdir, "config.json") @@ -103,7 +104,7 @@ class Algorithm(object): """Runs one logical iteration of training. Returns: - A TrainingResult that describes training progress. + A TrainingResult that describes training progress. """ raise NotImplementedError diff --git a/python/ray/rllib/evolution_strategies/evolution_strategies.py b/python/ray/rllib/evolution_strategies/evolution_strategies.py index 21062dc19..537338036 100644 --- a/python/ray/rllib/evolution_strategies/evolution_strategies.py +++ b/python/ray/rllib/evolution_strategies/evolution_strategies.py @@ -73,15 +73,15 @@ class Worker(object): self.env = gym.make(env_name) self.sess = utils.make_session(single_threaded=True) - self.policy = policies.MujocoPolicy(self.env.observation_space, - self.env.action_space, - **policy_params) + self.policy = policies.GenericPolicy( + self.env.observation_space, self.env.action_space, **policy_params) tf_util.initialize() self.rs = np.random.RandomState() - assert self.policy.needs_ob_stat == (self.config["calc_obstat_prob"] != - 0) + assert ( + self.policy.needs_ob_stat == + (self.config["calc_obstat_prob"] != 0)) def rollout_and_update_ob_stat(self, timestep_limit, task_ob_stat): if (self.policy.needs_ob_stat and @@ -109,15 +109,15 @@ class Worker(object): noise_inds, returns, sign_returns, lengths = [], [], [], [] # We set eps=0 because we're incrementing only. - task_ob_stat = utils.RunningStat(self.env.observation_space.shape, - eps=0) + task_ob_stat = utils.RunningStat( + self.env.observation_space.shape, eps=0) # Perform some rollouts with noise. task_tstart = time.time() while (len(noise_inds) == 0 or time.time() - task_tstart < self.min_task_runtime): - noise_idx = self.noise.sample_index(self.rs, - self.policy.num_params) + noise_idx = self.noise.sample_index( + self.rs, self.policy.num_params) perturbation = self.config["noise_stdev"] * self.noise.get( noise_idx, self.policy.num_params) @@ -133,8 +133,8 @@ class Worker(object): noise_inds.append(noise_idx) returns.append([rews_pos.sum(), rews_neg.sum()]) - sign_returns.append([np.sign(rews_pos).sum(), - np.sign(rews_neg).sum()]) + sign_returns.append( + [np.sign(rews_pos).sum(), np.sign(rews_neg).sum()]) lengths.append([len_pos, len_neg]) return Result( @@ -157,13 +157,17 @@ class EvolutionStrategies(Algorithm): Algorithm.__init__(self, env_name, config, upload_dir=upload_dir) policy_params = { - "ac_bins": "continuous:", - "ac_noise_std": 0.01, - "nonlin_type": "tanh", - "hidden_dims": [256, 256], - "connection_type": "ff" + "ac_noise_std": 0.01 } + env = gym.make(env_name) + utils.make_session(single_threaded=False) + self.policy = policies.GenericPolicy( + env.observation_space, env.action_space, **policy_params) + tf_util.initialize() + self.optimizer = optimizers.Adam(self.policy, config["stepsize"]) + self.ob_stat = utils.RunningStat(env.observation_space.shape, eps=1e-2) + # Create the shared noise table. print("Creating shared noise table.") noise_id = create_shared_noise.remote() @@ -171,17 +175,9 @@ class EvolutionStrategies(Algorithm): # Create the actors. print("Creating actors.") - self.workers = [Worker.remote(config, policy_params, env_name, - noise_id) - for _ in range(config["num_workers"])] - - env = gym.make(env_name) - utils.make_session(single_threaded=False) - self.policy = policies.MujocoPolicy( - env.observation_space, env.action_space, **policy_params) - tf_util.initialize() - self.optimizer = optimizers.Adam(self.policy, config["stepsize"]) - self.ob_stat = utils.RunningStat(env.observation_space.shape, eps=1e-2) + self.workers = [ + Worker.remote(config, policy_params, env_name, noise_id) + for _ in range(config["num_workers"])] self.episodes_so_far = 0 self.timesteps_so_far = 0 @@ -241,13 +237,13 @@ class EvolutionStrategies(Algorithm): curr_task_results.append(result) # Update ob stats. if self.policy.needs_ob_stat and result.ob_count > 0: - self.ob_stat.increment(result.ob_sum, result.ob_sumsq, - result.ob_count) + self.ob_stat.increment( + result.ob_sum, result.ob_sumsq, result.ob_count) ob_count_this_batch += result.ob_count # Assemble the results. - noise_inds_n = np.concatenate([r.noise_inds_n for - r in curr_task_results]) + noise_inds_n = np.concatenate( + [r.noise_inds_n for r in curr_task_results]) returns_n2 = np.concatenate([r.returns_n2 for r in curr_task_results]) lengths_n2 = np.concatenate([r.lengths_n2 for r in curr_task_results]) assert (noise_inds_n.shape[0] == returns_n2.shape[0] == @@ -265,9 +261,10 @@ class EvolutionStrategies(Algorithm): for idx in noise_inds_n), batch_size=500) g /= returns_n2.size - assert (g.shape == (self.policy.num_params,) and - g.dtype == np.float32 and - count == len(noise_inds_n)) + assert ( + g.shape == (self.policy.num_params,) and + g.dtype == np.float32 and + count == len(noise_inds_n)) update_ratio = self.optimizer.update(-g + config["l2coeff"] * theta) # Update ob stat (we're never running the policy in the master, but we diff --git a/python/ray/rllib/evolution_strategies/policies.py b/python/ray/rllib/evolution_strategies/policies.py index 20273130f..db6c2716e 100644 --- a/python/ray/rllib/evolution_strategies/policies.py +++ b/python/ray/rllib/evolution_strategies/policies.py @@ -8,11 +8,13 @@ from __future__ import print_function import logging import pickle +import gym.spaces import h5py import numpy as np import tensorflow as tf from ray.rllib.evolution_strategies import tf_util as U +from ray.rllib.models import ModelCatalog logger = logging.getLogger(__name__) @@ -137,24 +139,10 @@ def bins(x, dim, num_bins, name): return tf.argmax(scores_nab, 2) -class MujocoPolicy(Policy): - def _initialize(self, ob_space, ac_space, ac_bins, ac_noise_std, - nonlin_type, hidden_dims, connection_type): +class GenericPolicy(Policy): + def _initialize(self, ob_space, ac_space, ac_noise_std): self.ac_space = ac_space - self.ac_bins = ac_bins self.ac_noise_std = ac_noise_std - self.hidden_dims = hidden_dims - self.connection_type = connection_type - - assert len(ob_space.shape) == len(self.ac_space.shape) == 1 - assert (np.all(np.isfinite(self.ac_space.low)) and - np.all(np.isfinite(self.ac_space.high))), ("Action bounds " - "required") - - self.nonlin = {'tanh': tf.tanh, - 'relu': tf.nn.relu, - 'lrelu': U.lrelu, - 'elu': tf.nn.elu}[nonlin_type] with tf.variable_scope(type(self).__name__) as scope: # Observation normalization. @@ -171,72 +159,24 @@ class MujocoPolicy(Policy): tf.assign(ob_std, in_std), ]) + inputs = tf.placeholder(tf.float32, [None] + list(ob_space.shape)) + + # TODO(ekl): we should do clipping in a standard RLlib preprocessor + clipped_inputs = tf.clip_by_value( + (inputs - ob_mean) / ob_std, -5.0, 5.0) + # Policy network. - o = tf.placeholder(tf.float32, [None] + list(ob_space.shape)) - a = self._make_net(tf.clip_by_value((o - ob_mean) / ob_std, - -5.0, 5.0)) - self._act = U.function([o], a) + dist_class, dist_dim = ModelCatalog.get_action_dist( + self.ac_space, dist_type='deterministic') + model = ModelCatalog.get_model(clipped_inputs, dist_dim) + dist = dist_class(model.outputs) + self._act = U.function([inputs], dist.sample()) return scope - def _make_net(self, o): - # Process observation. - if self.connection_type == 'ff': - x = o - for ilayer, hd in enumerate(self.hidden_dims): - x = self.nonlin(U.dense(x, hd, 'l{}'.format(ilayer), - U.normc_initializer(1.0))) - else: - raise NotImplementedError(self.connection_type) - - # Map to action. - adim = self.ac_space.shape[0] - ahigh = self.ac_space.high - alow = self.ac_space.low - assert isinstance(self.ac_bins, str) - ac_bin_mode, ac_bin_arg = self.ac_bins.split(':') - - if ac_bin_mode == 'uniform': - # Uniformly spaced bins, from ac_space.low to ac_space.high. - num_ac_bins = int(ac_bin_arg) - aidx_na = bins(x, adim, num_ac_bins, 'out') - ac_range_1a = (ahigh - alow)[None, :] - a = (1. / (num_ac_bins - 1.) * tf.to_float(aidx_na) * ac_range_1a + - alow[None, :]) - - elif ac_bin_mode == 'custom': - # Custom bins specified as a list of values from -1 to 1. - # The bins are rescaled to ac_space.low to ac_space.high. - acvals_k = np.array(list(map(float, ac_bin_arg.split(','))), - dtype=np.float32) - logger.info('Custom action values: ' + ' '.join('{:.3f}'.format(x) - for x in acvals_k)) - assert (acvals_k.ndim == 1 and acvals_k[0] == -1 and - acvals_k[-1] == 1) - acvals_ak = ((ahigh - alow)[:, None] / - (acvals_k[-1] - acvals_k[0]) * - (acvals_k - acvals_k[0])[None, :] + alow[:, None]) - - aidx_na = bins(x, adim, len(acvals_k), - 'out') # Values in [0, k-1]. - a = tf.gather_nd( - acvals_ak, - tf.concat([ - tf.tile(np.arange(adim)[None, :, None], - [tf.shape(aidx_na)[0], 1, 1]), - 2, - tf.expand_dims(aidx_na, -1) - ]) # (n, a, 2) - ) # (n, a) - elif ac_bin_mode == 'continuous': - a = U.dense(x, adim, 'out', U.normc_initializer(0.01)) - else: - raise NotImplementedError(ac_bin_mode) - - return a - def act(self, ob, random_stream=None): a = self._act(ob) - if random_stream is not None and self.ac_noise_std != 0: + if not isinstance(self.ac_space, gym.spaces.Discrete) and \ + random_stream is not None and self.ac_noise_std != 0: a += random_stream.randn(*a.shape) * self.ac_noise_std return a diff --git a/python/ray/rllib/models/README.txt b/python/ray/rllib/models/README.txt new file mode 100644 index 000000000..d8bd20da3 --- /dev/null +++ b/python/ray/rllib/models/README.txt @@ -0,0 +1 @@ +Shared neural network models for RLlib. diff --git a/python/ray/rllib/models/__init__.py b/python/ray/rllib/models/__init__.py new file mode 100644 index 000000000..7ed71e5e6 --- /dev/null +++ b/python/ray/rllib/models/__init__.py @@ -0,0 +1,3 @@ +from ray.rllib.models.catalog import ModelCatalog + +__all__ = ["ModelCatalog"] diff --git a/python/ray/rllib/policy_gradient/distributions.py b/python/ray/rllib/models/action_dist.py similarity index 58% rename from python/ray/rllib/policy_gradient/distributions.py rename to python/ray/rllib/models/action_dist.py index 915531c51..d1dc8687f 100644 --- a/python/ray/rllib/policy_gradient/distributions.py +++ b/python/ray/rllib/models/action_dist.py @@ -6,16 +6,38 @@ import tensorflow as tf import numpy as np -class Categorical(object): - def __init__(self, logits): - self.logits = logits +class ActionDistribution(object): + """The policy action distribution of an agent. + + Args: + inputs (Tensor): The input vector to compute samples from. + """ + + def __init__(self, inputs): + self.inputs = inputs + + def logp(self, x): + raise NotImplementedError + + def kl(self, other): + raise NotImplementedError + + def entropy(self): + raise NotImplementedError + + def sample(self): + raise NotImplementedError + + +class Categorical(ActionDistribution): + """Categorical distribution for discrete action spaces.""" def logp(self, x): return -tf.nn.sparse_softmax_cross_entropy_with_logits( - logits=self.logits, labels=x) + logits=self.inputs, labels=x) def entropy(self): - a0 = self.logits - tf.reduce_max(self.logits, reduction_indices=[1], + a0 = self.inputs - tf.reduce_max(self.inputs, reduction_indices=[1], keep_dims=True) ea0 = tf.exp(a0) z0 = tf.reduce_sum(ea0, reduction_indices=[1], keep_dims=True) @@ -23,9 +45,9 @@ class Categorical(object): return tf.reduce_sum(p0 * (tf.log(z0) - a0), reduction_indices=[1]) def kl(self, other): - a0 = self.logits - tf.reduce_max(self.logits, reduction_indices=[1], + a0 = self.inputs - tf.reduce_max(self.inputs, reduction_indices=[1], keep_dims=True) - a1 = other.logits - tf.reduce_max(other.logits, reduction_indices=[1], + a1 = other.inputs - tf.reduce_max(other.inputs, reduction_indices=[1], keep_dims=True) ea0 = tf.exp(a0) ea1 = tf.exp(a1) @@ -36,13 +58,19 @@ class Categorical(object): reduction_indices=[1]) def sample(self): - return tf.multinomial(self.logits, 1) + return tf.multinomial(self.inputs, 1)[0] -class DiagGaussian(object): - def __init__(self, flat): - self.flat = flat - mean, logstd = tf.split(flat, 2, axis=1) +class DiagGaussian(ActionDistribution): + """Action distribution where each vector element is a gaussian. + + The first half of the input vector defines the gaussian means, and the + second half the gaussian standard deviations. + """ + + def __init__(self, inputs): + ActionDistribution.__init__(self, inputs) + mean, logstd = tf.split(inputs, 2, axis=1) self.mean = mean self.logstd = logstd self.std = tf.exp(logstd) @@ -67,3 +95,13 @@ class DiagGaussian(object): def sample(self): return self.mean + self.std * tf.random_normal(tf.shape(self.mean)) + + +class Deterministic(ActionDistribution): + """Action distribution that returns the input values directly. + + This is similar to DiagGaussian with standard deviation zero. + """ + + def sample(self): + return self.inputs diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py new file mode 100644 index 000000000..5874c70cc --- /dev/null +++ b/python/ray/rllib/models/catalog.py @@ -0,0 +1,76 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import gym + +from ray.rllib.models.action_dist import ( + Categorical, Deterministic, DiagGaussian) +from ray.rllib.models.fcnet import FullyConnectedNetwork +from ray.rllib.models.visionnet import VisionNetwork + + +class ModelCatalog(object): + """Registry of default models and action distributions for envs. + + Example: + dist_class, dist_dim = ModelCatalog.get_action_dist(env.action_space) + model = ModelCatalog.get_model(inputs, dist_dim) + dist = dist_class(model.outputs) + action_op = dist.sample() + """ + + @staticmethod + def get_action_dist(action_space, dist_type=None): + """Returns action distribution class and size for the given action space. + + Args: + action_space (Space): Action space of the target gym env. + + Returns: + dist_class (ActionDistribution): Python class of the distribution. + dist_dim (int): The size of the input vector to the distribution. + """ + + if isinstance(action_space, gym.spaces.Box): + if dist_type is None: + return DiagGaussian, action_space.shape[0] * 2 + elif dist_type == 'deterministic': + return Deterministic, action_space.shape[0] + elif isinstance(action_space, gym.spaces.Discrete): + return Categorical, action_space.n + + raise NotImplementedError( + "Unsupported args: {} {}".format(action_space, dist_type)) + + @staticmethod + def get_model(inputs, num_outputs): + """Returns a suitable model conforming to given input and output specs. + + Args: + inputs (Tensor): The input tensor to the model. + num_outputs (int): The size of the output vector of the model. + + Returns: + model (Model): Neural network model. + """ + + obs_rank = len(inputs.get_shape()) - 1 + + if obs_rank > 1: + return VisionNetwork(inputs, num_outputs) + + return FullyConnectedNetwork(inputs, num_outputs) + + @staticmethod + def get_preprocessor(env_name): + """Returns a suitable processor for the given environment. + + Args: + env_name (str): The name of the environment. + + Returns: + preprocessor (Preprocessor): Preprocessor for the env observations. + """ + + raise NotImplementedError diff --git a/python/ray/rllib/models/fcnet.py b/python/ray/rllib/models/fcnet.py new file mode 100644 index 000000000..9237bbb67 --- /dev/null +++ b/python/ray/rllib/models/fcnet.py @@ -0,0 +1,37 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf +import tensorflow.contrib.slim as slim + +import numpy as np + +from ray.rllib.models.model import Model + + +def normc_initializer(std=1.0): + def _initializer(shape, dtype=None, partition_info=None): + out = np.random.randn(*shape).astype(np.float32) + out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True)) + return tf.constant(out) + return _initializer + + +class FullyConnectedNetwork(Model): + """Generic fully connected network.""" + + def _init(self, inputs, num_outputs): + with tf.name_scope("fc_net"): + fc1 = slim.fully_connected( + inputs, 256, weights_initializer=normc_initializer(1.0), + activation_fn=tf.nn.tanh, + scope="fc1") + fc2 = slim.fully_connected( + fc1, 256, weights_initializer=normc_initializer(1.0), + activation_fn=tf.nn.tanh, + scope="fc2") + fc3 = slim.fully_connected( + fc2, num_outputs, weights_initializer=normc_initializer(0.01), + activation_fn=None, scope="fc3") + return fc3 diff --git a/python/ray/rllib/models/model.py b/python/ray/rllib/models/model.py new file mode 100644 index 000000000..483923c82 --- /dev/null +++ b/python/ray/rllib/models/model.py @@ -0,0 +1,28 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + + +class Model(object): + """Defines an abstract network model for use with RLlib. + + Models convert input tensors to a number of output features. These features + can then be interpreted by ActionDistribution classes to determine + e.g. agent action values. + """ + + def __init__(self, inputs, num_outputs): + self.inputs = inputs + self.outputs = self._init(inputs, num_outputs) + + def _init(self): + """Initializes the model given self.inputs and self.num_outputs.""" + raise NotImplementedError + + def inputs(self): + """Returns the input placeholder for this model.""" + return self.inputs + + def outputs(self): + """Returns the output tensor of this model.""" + return self.outputs diff --git a/python/ray/rllib/models/preprocessor.py b/python/ray/rllib/models/preprocessor.py new file mode 100644 index 000000000..a9bd75a3d --- /dev/null +++ b/python/ray/rllib/models/preprocessor.py @@ -0,0 +1,14 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + + +# TODO(ekl) implement common preprocessors +class Preprocessor(object): + def output_shape(self): + """Returns the new output shape, or None if unchanged.""" + raise NotImplementedError + + def preprocess(self, observation): + """Returns the preprocessed observation.""" + raise NotImplementedError diff --git a/python/ray/rllib/models/visionnet.py b/python/ray/rllib/models/visionnet.py new file mode 100644 index 000000000..3d4c43132 --- /dev/null +++ b/python/ray/rllib/models/visionnet.py @@ -0,0 +1,22 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf +import tensorflow.contrib.slim as slim + +from ray.rllib.models.model import Model + + +class VisionNetwork(Model): + """Generic vision network.""" + + def _init(self, inputs, num_outputs): + with tf.name_scope("vision_net"): + conv1 = slim.conv2d(inputs, 16, [8, 8], 4, scope="conv1") + conv2 = slim.conv2d(conv1, 32, [4, 4], 2, scope="conv2") + fc1 = slim.conv2d( + conv2, 512, [10, 10], padding="VALID", scope="fc1") + fc2 = slim.conv2d(fc1, num_outputs, [1, 1], activation_fn=None, + normalizer_fn=None, scope="fc2") + return tf.squeeze(fc2, [1, 2]) diff --git a/python/ray/rllib/policy_gradient/agent.py b/python/ray/rllib/policy_gradient/agent.py index ef1499c0b..85de0d06f 100644 --- a/python/ray/rllib/policy_gradient/agent.py +++ b/python/ray/rllib/policy_gradient/agent.py @@ -11,7 +11,7 @@ from tensorflow.python import debug as tf_debug import ray from ray.rllib.parallel import LocalSyncParallelOptimizer -from ray.rllib.policy_gradient.distributions import Categorical, DiagGaussian +from ray.rllib.models import ModelCatalog from ray.rllib.policy_gradient.env import BatchedEnv from ray.rllib.policy_gradient.loss import ProximalPolicyLoss from ray.rllib.policy_gradient.filter import MeanStdFilter @@ -33,8 +33,8 @@ class Agent(object): network weights. When run as a remote agent, only this graph is used. """ - def __init__(self, name, batchsize, preprocessor, config, logdir, - is_remote): + def __init__( + self, name, batchsize, preprocessor, config, logdir, is_remote): if is_remote: os.environ["CUDA_VISIBLE_DEVICES"] = "" devices = ["/cpu:0"] @@ -54,37 +54,30 @@ class Agent(object): self.sess = tf.Session(config=config_proto) if config["use_tf_debugger"] and not is_remote: self.sess = tf_debug.LocalCLIDebugWrapperSession(self.sess) - self.sess.add_tensor_filter("has_inf_or_nan", - tf_debug.has_inf_or_nan) + self.sess.add_tensor_filter( + "has_inf_or_nan", tf_debug.has_inf_or_nan) # Defines the training inputs. - self.kl_coeff = tf.placeholder(name="newkl", shape=(), - dtype=tf.float32) - self.observations = tf.placeholder(tf.float32, - shape=(None,) + preprocessor.shape) + self.kl_coeff = tf.placeholder( + name="newkl", shape=(), dtype=tf.float32) + self.observations = tf.placeholder( + tf.float32, shape=(None,) + preprocessor.shape) self.advantages = tf.placeholder(tf.float32, shape=(None,)) action_space = self.env.action_space if isinstance(action_space, gym.spaces.Box): - # The first half of the dimensions are the means, the second half - # are the standard deviations. - self.action_dim = action_space.shape[0] - self.action_shape = (self.action_dim,) - self.logit_dim = 2 * self.action_dim - self.actions = tf.placeholder(tf.float32, - shape=(None, self.action_dim)) - self.distribution_class = DiagGaussian + self.actions = tf.placeholder( + tf.float32, shape=(None, action_space.shape[0])) elif isinstance(action_space, gym.spaces.Discrete): - self.action_dim = action_space.n - self.action_shape = () - self.logit_dim = self.action_dim self.actions = tf.placeholder(tf.int64, shape=(None,)) - self.distribution_class = Categorical else: - raise NotImplemented("action space" + str(type(action_space)) + - "currently not supported") - self.prev_logits = tf.placeholder(tf.float32, - shape=(None, self.logit_dim)) + raise NotImplemented( + "action space" + str(type(action_space)) + + "currently not supported") + self.distribution_class, self.logit_dim = ModelCatalog.get_action_dist( + action_space) + self.prev_logits = tf.placeholder( + tf.float32, shape=(None, self.logit_dim)) assert config["sgd_batchsize"] % len(devices) == 0, \ "Batch size must be evenly divisible by devices" @@ -99,7 +92,8 @@ class Agent(object): return ProximalPolicyLoss( self.env.observation_space, self.env.action_space, obs, advs, acts, plog, self.logit_dim, - self.kl_coeff, self.distribution_class, self.config, self.sess) + self.kl_coeff, self.distribution_class, self.config, + self.sess) self.par_opt = LocalSyncParallelOptimizer( tf.train.AdamOptimizer(self.config["sgd_stepsize"]), @@ -118,14 +112,13 @@ class Agent(object): self.mean_kl = tf.reduce_mean( tf.stack(values=[policy.mean_kl for policy in policies]), 0) self.mean_entropy = tf.reduce_mean( - tf.stack(values=[policy.mean_entropy for policy in policies]), - 0) + tf.stack( + values=[policy.mean_entropy for policy in policies]), 0) # References to the model weights self.common_policy = self.par_opt.get_common_loss() self.variables = ray.experimental.TensorFlowVariables( - self.common_policy.loss, - self.sess) + self.common_policy.loss, self.sess) self.observation_filter = MeanStdFilter(preprocessor.shape, clip=None) self.reward_filter = MeanStdFilter((), clip=5.0) self.sess.run(tf.global_variables_initializer()) @@ -139,8 +132,8 @@ class Agent(object): trajectories["logprobs"]], full_trace=full_trace) - def run_sgd_minibatch(self, batch_index, kl_coeff, full_trace, - file_writer): + def run_sgd_minibatch( + self, batch_index, kl_coeff, full_trace, file_writer): return self.par_opt.optimize( self.sess, batch_index, diff --git a/python/ray/rllib/policy_gradient/env.py b/python/ray/rllib/policy_gradient/env.py index 74795b8f4..7dcaf2f9b 100644 --- a/python/ray/rllib/policy_gradient/env.py +++ b/python/ray/rllib/policy_gradient/env.py @@ -55,8 +55,7 @@ class BatchedEnv(object): observations.append(np.zeros(self.shape)) rewards.append(0.0) continue - observation, reward, done, info = self.envs[i].step( - action if len(action) > 1 else action[0]) + observation, reward, done, info = self.envs[i].step(action) if render: self.envs[0].render() observations.append(self.preprocessor(observation)) diff --git a/python/ray/rllib/policy_gradient/loss.py b/python/ray/rllib/policy_gradient/loss.py index 211128727..ef40f577a 100644 --- a/python/ray/rllib/policy_gradient/loss.py +++ b/python/ray/rllib/policy_gradient/loss.py @@ -4,8 +4,8 @@ from __future__ import print_function import gym.spaces import tensorflow as tf -from ray.rllib.policy_gradient.models.visionnet import vision_net -from ray.rllib.policy_gradient.models.fcnet import fc_net + +from ray.rllib.models import ModelCatalog class ProximalPolicyLoss(object): @@ -21,11 +21,8 @@ class ProximalPolicyLoss(object): # Saved so that we can compute actions given different observations self.observations = observations - if len(observation_space.shape) > 1: - self.curr_logits = vision_net(observations, num_classes=logit_dim) - else: - assert len(observation_space.shape) == 1 - self.curr_logits = fc_net(observations, num_classes=logit_dim) + self.curr_logits = ModelCatalog.get_model( + observations, logit_dim).outputs self.curr_dist = distribution_class(self.curr_logits) self.sampler = self.curr_dist.sample() diff --git a/python/ray/rllib/policy_gradient/models/__init__.py b/python/ray/rllib/policy_gradient/models/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/python/ray/rllib/policy_gradient/models/fcnet.py b/python/ray/rllib/policy_gradient/models/fcnet.py deleted file mode 100644 index 057b2004d..000000000 --- a/python/ray/rllib/policy_gradient/models/fcnet.py +++ /dev/null @@ -1,38 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf -import tensorflow.contrib.slim as slim - -import numpy as np - - -def normc_initializer(std=1.0): - def _initializer(shape, dtype=None, partition_info=None): - out = np.random.randn(*shape).astype(np.float32) - out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True)) - return tf.constant(out) - return _initializer - - -def fc_net(inputs, num_classes=10, logstd=False): - with tf.name_scope("fc_net"): - fc1 = slim.fully_connected(inputs, 128, - weights_initializer=normc_initializer(1.0), - scope="fc1") - fc2 = slim.fully_connected(fc1, 128, - weights_initializer=normc_initializer(1.0), - scope="fc2") - fc3 = slim.fully_connected(fc2, 128, - weights_initializer=normc_initializer(1.0), - scope="fc3") - fc4 = slim.fully_connected(fc3, num_classes, - weights_initializer=normc_initializer(0.01), - activation_fn=None, scope="fc4") - if logstd: - logstd = tf.get_variable(name="logstd", shape=[num_classes], - initializer=tf.zeros_initializer) - return tf.concat(1, [fc4, logstd]) - else: - return fc4 diff --git a/python/ray/rllib/policy_gradient/models/visionnet.py b/python/ray/rllib/policy_gradient/models/visionnet.py deleted file mode 100644 index 2b03b2ff7..000000000 --- a/python/ray/rllib/policy_gradient/models/visionnet.py +++ /dev/null @@ -1,16 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf -import tensorflow.contrib.slim as slim - - -def vision_net(inputs, num_classes=10): - with tf.name_scope("vision_net"): - conv1 = slim.conv2d(inputs, 16, [8, 8], 4, scope="conv1") - conv2 = slim.conv2d(conv1, 32, [4, 4], 2, scope="conv2") - fc1 = slim.conv2d(conv2, 512, [10, 10], padding="VALID", scope="fc1") - fc2 = slim.conv2d(fc1, num_classes, [1, 1], activation_fn=None, - normalizer_fn=None, scope="fc2") - return tf.squeeze(fc2, [1, 2]) diff --git a/python/ray/rllib/policy_gradient/policy_gradient.py b/python/ray/rllib/policy_gradient/policy_gradient.py index 760ca5320..df2e2b0ee 100644 --- a/python/ray/rllib/policy_gradient/policy_gradient.py +++ b/python/ray/rllib/policy_gradient/policy_gradient.py @@ -48,8 +48,7 @@ class PolicyGradient(Algorithm): Algorithm.__init__(self, env_name, config, upload_dir=upload_dir) - # TODO(ekl): The preprocessor should be associated with the env - # elsewhere. + # TODO(ekl): preprocessor should be associated with the env elsewhere if self.env_name == "Pong-v0": preprocessor = AtariPixelPreprocessor() elif self.env_name == "Pong-ram-v3": @@ -58,6 +57,8 @@ class PolicyGradient(Algorithm): preprocessor = NoPreprocessor() elif self.env_name == "Walker2d-v1": preprocessor = NoPreprocessor() + elif self.env_name == "Humanoid-v1": + preprocessor = NoPreprocessor() else: preprocessor = AtariPixelPreprocessor() @@ -93,8 +94,8 @@ class PolicyGradient(Algorithm): if config["model_checkpoint_file"]: checkpoint_path = saver.save( model.sess, - os.path.join(self.logdir, - config["model_checkpoint_file"] % j)) + os.path.join( + self.logdir, config["model_checkpoint_file"] % j)) print("Checkpoint saved in file: %s" % checkpoint_path) checkpointing_end = time.time() weights = ray.put(model.get_weights()) @@ -135,8 +136,8 @@ class PolicyGradient(Algorithm): for i in range(config["num_sgd_iter"]): sgd_start = time.time() batch_index = 0 - num_batches = (int(tuples_per_device) // - int(model.per_device_batch_size)) + num_batches = ( + int(tuples_per_device) // int(model.per_device_batch_size)) loss, kl, entropy = [], [], [] permutation = np.random.permutation(num_batches) while batch_index < num_batches: @@ -155,8 +156,8 @@ class PolicyGradient(Algorithm): kl = np.mean(kl) entropy = np.mean(entropy) sgd_end = time.time() - print("{:>15}{:15.5e}{:15.5e}{:15.5e}".format(i, loss, kl, - entropy)) + print( + "{:>15}{:15.5e}{:15.5e}{:15.5e}".format(i, loss, kl, entropy)) values = [] if i == config["num_sgd_iter"] - 1: diff --git a/python/ray/rllib/policy_gradient/test/test.py b/python/ray/rllib/policy_gradient/test/test.py index 325ee437f..a9ff78d4e 100644 --- a/python/ray/rllib/policy_gradient/test/test.py +++ b/python/ray/rllib/policy_gradient/test/test.py @@ -7,11 +7,12 @@ import numpy as np import tensorflow as tf from numpy.testing import assert_allclose -from ray.rllib.policy_gradient.distributions import Categorical +from ray.rllib.models.action_dist import Categorical from ray.rllib.policy_gradient.utils import flatten, concatenate -class DistibutionsTest(unittest.TestCase): +# TODO(ekl): move to rllib/models dir +class DistributionsTest(unittest.TestCase): def testCategorical(self): num_samples = 100000