diff --git a/.gitignore b/.gitignore index 7bbc71c..72c7d1b 100644 --- a/.gitignore +++ b/.gitignore @@ -99,3 +99,7 @@ ENV/ # mypy .mypy_cache/ + +.DS_Store +.idea +log* diff --git a/README.md b/README.md index 669c4e8..3c828b7 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,76 @@ # Run-Skeleton-Run -Reason8.ai PyTorch solution for NIPS RL 2017 challenge +[Reason8.ai](https://reason8.ai) PyTorch solution for 3rd place [NIPS RL 2017 challenge](https://www.crowdai.org/challenges/nips-2017-learning-to-run/leaderboards?challenge_round_id=12). + +Additional thanks to [Michail Pavlov](https://github.com/fgvbrt) for collaboration. + +## Agent policies + +### no-flip-state-action + +![Alt Text](http://www.sheawong.com/wp-content/uploads/2013/08/keephatin.gif) + +![alt text](https://github.com/Scitator/Run-Skeleton-Run/blob/master/gifs/noflip.gif) + +### flip-state-action + +![alt text](https://github.com/Scitator/Run-Skeleton-Run/blob/master/gifs/flip.gif) + + +## How to setup environment? + +1. `sh setup_conda.sh` +2. `source activate opensim-rl` + +Would like to test baselines? (Need MPI support) +3. `sudo apt-get install openmpi-bin openmpi-doc libopenmpi-dev` +3+. `sh setup_env_mpi.sh` + +OR like DDPG agents? +3. `sh setup_env.sh` + +4. Congrats! Now you are ready to check our agents. + + +## Run DDPG agent + +``` +CUDA_VISIBLE_DEVICES="" PYTHONPATH=. python ddpg/train.py \ + --logdir ./logs_ddpg \ + --num-threads 4 \ + --ddpg-wrapper \ + --skip-frames 5 \ + --fail-reward -0.2 \ + --reward-scale 10 \ + --flip-state-action \ + --actor-layers 64-64 --actor-layer-norm --actor-parameters-noise \ + --actor-lr 0.001 --actor-lr-end 0.00001 \ + --critic-layers 64-32 --critic-layer-norm \ + --critic-lr 0.002 --critic-lr-end 0.00001 \ + --initial-epsilon 0.5 --final-epsilon 0.001 \ + --tau 0.0001 +``` + + +## Evaluate DDPG agent + +``` +CUDA_VISIBLE_DEVICES="" PYTHONPATH=./ python ddpg/submit.py \ + --restore-actor-from ./logs_ddpg/actor_state_dict.pkl \ + --restore-critic-from ./logs_ddpg/critic_state_dict.pkl \ + --restore-args-from ./logs_ddpg/args.json \ + --num-episodes 10 + +``` + + +## Run TRPO/PPO agent + +``` +CUDA_VISIBLE_DEVICES="" PYTHONPATH=. python ddpg/train.py \ + --agent ppo \ + --logdir ./logs_baseline \ + --baseline-wrapper \ + --skip-frames 5 \ + --fail-reward -0.2 \ + --reward-scale 10 +``` diff --git a/baselines/__init__.py b/baselines/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/baselines/baselines_common/__init__.py b/baselines/baselines_common/__init__.py new file mode 100644 index 0000000..a07d7df --- /dev/null +++ b/baselines/baselines_common/__init__.py @@ -0,0 +1,4 @@ +from baselines.baselines_common.console_util import * +from baselines.baselines_common.dataset import Dataset +from baselines.baselines_common.math_util import * +from baselines.baselines_common.misc_util import * diff --git a/baselines/baselines_common/cg.py b/baselines/baselines_common/cg.py new file mode 100644 index 0000000..59fda0e --- /dev/null +++ b/baselines/baselines_common/cg.py @@ -0,0 +1,38 @@ +import numpy as np + + +def cg(f_Ax, b, cg_iters=10, callback=None, verbose=False, residual_tol=1e-10): + """ + Demmel p 312 + """ + p = b.copy() + r = b.copy() + x = np.zeros_like(b) + rdotr = r.dot(r) + + fmtstr = "%10i %10.3g %10.3g" + titlestr = "%10s %10s %10s" + if verbose: + print(titlestr % ("iter", "residual norm", "soln norm")) + + for i in range(cg_iters): + if callback is not None: + callback(x) + if verbose: print(fmtstr % (i, rdotr, np.linalg.norm(x))) + z = f_Ax(p) + v = rdotr / p.dot(z) + x += v * p + r -= v * z + newrdotr = r.dot(r) + mu = newrdotr / rdotr + p = r + mu * p + + rdotr = newrdotr + if rdotr < residual_tol: + break + + if callback is not None: + callback(x) + if verbose: + print(fmtstr % (i + 1, rdotr, np.linalg.norm(x))) # pylint: disable=W0631 + return x diff --git a/baselines/baselines_common/console_util.py b/baselines/baselines_common/console_util.py new file mode 100644 index 0000000..6def0c9 --- /dev/null +++ b/baselines/baselines_common/console_util.py @@ -0,0 +1,62 @@ +from __future__ import print_function +from contextlib import contextmanager +import numpy as np +import time + + +# ================================================================ +# Misc +# ================================================================ + +def fmt_row(width, row, header=False): + out = " | ".join(fmt_item(x, width) for x in row) + if header: out = out + "\n" + "-" * len(out) + return out + + +def fmt_item(x, l): + if isinstance(x, np.ndarray): + assert x.ndim == 0 + x = x.item() + if isinstance(x, float): + rep = "%g" % x + else: + rep = str(x) + return " " * (l - len(rep)) + rep + + +color2num = dict( + gray=30, + red=31, + green=32, + yellow=33, + blue=34, + magenta=35, + cyan=36, + white=37, + crimson=38 +) + + +def colorize(string, color, bold=False, highlight=False): + attr = [] + num = color2num[color] + if highlight: num += 10 + attr.append(str(num)) + if bold: attr.append('1') + return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string) + + +MESSAGE_DEPTH = 0 + + +@contextmanager +def timed(msg): + global MESSAGE_DEPTH # pylint: disable=W0603 + print(colorize('\t' * MESSAGE_DEPTH + '=: ' + msg, color='magenta')) + tstart = time.time() + MESSAGE_DEPTH += 1 + yield + MESSAGE_DEPTH -= 1 + print(colorize('\t' * MESSAGE_DEPTH + "done in %.3f seconds" % (time.time() - tstart), + color='magenta')) diff --git a/baselines/baselines_common/dataset.py b/baselines/baselines_common/dataset.py new file mode 100644 index 0000000..85b5e55 --- /dev/null +++ b/baselines/baselines_common/dataset.py @@ -0,0 +1,63 @@ +import numpy as np + + +class Dataset(object): + def __init__(self, data_map, deterministic=False, shuffle=True): + self.data_map = data_map + self.deterministic = deterministic + self.enable_shuffle = shuffle + self.n = next(iter(data_map.values())).shape[0] + self._next_id = 0 + self.shuffle() + + def shuffle(self): + if self.deterministic: + return + perm = np.arange(self.n) + np.random.shuffle(perm) + + for key in self.data_map: + self.data_map[key] = self.data_map[key][perm] + + self._next_id = 0 + + def next_batch(self, batch_size): + if self._next_id >= self.n and self.enable_shuffle: + self.shuffle() + + cur_id = self._next_id + cur_batch_size = min(batch_size, self.n - self._next_id) + self._next_id += cur_batch_size + + data_map = dict() + for key in self.data_map: + data_map[key] = self.data_map[key][cur_id:cur_id + cur_batch_size] + return data_map + + def iterate_once(self, batch_size): + if self.enable_shuffle: self.shuffle() + + while self._next_id <= self.n - batch_size: + yield self.next_batch(batch_size) + self._next_id = 0 + + def subset(self, num_elements, deterministic=True): + data_map = dict() + for key in self.data_map: + data_map[key] = self.data_map[key][:num_elements] + return Dataset(data_map, deterministic) + + +def iterbatches(arrays, *, num_batches=None, batch_size=None, shuffle=True, + include_final_partial_batch=True): + assert (num_batches is None) != ( + batch_size is None), 'Provide num_batches or batch_size, but not both' + arrays = tuple(map(np.asarray, arrays)) + n = arrays[0].shape[0] + assert all(a.shape[0] == n for a in arrays[1:]) + inds = np.arange(n) + if shuffle: np.random.shuffle(inds) + sections = np.arange(0, n, batch_size)[1:] if num_batches is None else num_batches + for batch_inds in np.array_split(inds, sections): + if include_final_partial_batch or len(batch_inds) == batch_size: + yield tuple(a[batch_inds] for a in arrays) diff --git a/baselines/baselines_common/distributions.py b/baselines/baselines_common/distributions.py new file mode 100644 index 0000000..1d42559 --- /dev/null +++ b/baselines/baselines_common/distributions.py @@ -0,0 +1,377 @@ +import tensorflow as tf +import numpy as np +import baselines.baselines_common.tf_util as U +from tensorflow.python.ops import math_ops +from tensorflow.python.ops import nn + + +class Pd(object): + """ + A particular probability distribution + """ + + def flatparam(self): + raise NotImplementedError + + def mode(self): + raise NotImplementedError + + def neglogp(self, x): + # Usually it's easier to define the negative logprob + raise NotImplementedError + + def kl(self, other): + raise NotImplementedError + + def entropy(self): + raise NotImplementedError + + def sample(self): + raise NotImplementedError + + def logp(self, x): + return - self.neglogp(x) + + +class PdType(object): + """ + Parametrized family of probability distributions + """ + + def pdclass(self): + raise NotImplementedError + + def pdfromflat(self, flat): + return self.pdclass()(flat) + + def param_shape(self): + raise NotImplementedError + + def sample_shape(self): + raise NotImplementedError + + def sample_dtype(self): + raise NotImplementedError + + def param_placeholder(self, prepend_shape, name=None): + return tf.placeholder(dtype=tf.float32, shape=prepend_shape + self.param_shape(), name=name) + + def sample_placeholder(self, prepend_shape, name=None): + return tf.placeholder(dtype=self.sample_dtype(), shape=prepend_shape + self.sample_shape(), + name=name) + + +class CategoricalPdType(PdType): + def __init__(self, ncat): + self.ncat = ncat + + def pdclass(self): + return CategoricalPd + + def param_shape(self): + return [self.ncat] + + def sample_shape(self): + return [] + + def sample_dtype(self): + return tf.int32 + + +class MultiCategoricalPdType(PdType): + def __init__(self, low, high): + self.low = low + self.high = high + self.ncats = high - low + 1 + + def pdclass(self): + return MultiCategoricalPd + + def pdfromflat(self, flat): + return MultiCategoricalPd(self.low, self.high, flat) + + def param_shape(self): + return [sum(self.ncats)] + + def sample_shape(self): + return [len(self.ncats)] + + def sample_dtype(self): + return tf.int32 + + +class DiagGaussianPdType(PdType): + def __init__(self, size): + self.size = size + + def pdclass(self): + return DiagGaussianPd + + def param_shape(self): + return [2 * self.size] + + def sample_shape(self): + return [self.size] + + def sample_dtype(self): + return tf.float32 + + +class BernoulliPdType(PdType): + def __init__(self, size): + self.size = size + + def pdclass(self): + return BernoulliPd + + def param_shape(self): + return [self.size] + + def sample_shape(self): + return [self.size] + + def sample_dtype(self): + return tf.int32 + + +# WRONG SECOND DERIVATIVES +# class CategoricalPd(Pd): +# def __init__(self, logits): +# self.logits = logits +# self.ps = tf.nn.softmax(logits) +# @classmethod +# def fromflat(cls, flat): +# return cls(flat) +# def flatparam(self): +# return self.logits +# def mode(self): +# return U.argmax(self.logits, axis=-1) +# def logp(self, x): +# return -tf.nn.sparse_softmax_cross_entropy_with_logits(self.logits, x) +# def kl(self, other): +# return tf.nn.softmax_cross_entropy_with_logits(other.logits, self.ps) \ +# - tf.nn.softmax_cross_entropy_with_logits(self.logits, self.ps) +# def entropy(self): +# return tf.nn.softmax_cross_entropy_with_logits(self.logits, self.ps) +# def sample(self): +# u = tf.random_uniform(tf.shape(self.logits)) +# return U.argmax(self.logits - tf.log(-tf.log(u)), axis=-1) + +class CategoricalPd(Pd): + def __init__(self, logits): + self.logits = logits + + def flatparam(self): + return self.logits + + def mode(self): + return U.argmax(self.logits, axis=-1) + + def neglogp(self, x): + # return tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.logits, labels=x) + # Note: we can't use sparse_softmax_cross_entropy_with_logits because + # the implementation does not allow second-order derivatives... + one_hot_actions = tf.one_hot(x, self.logits.get_shape().as_list()[-1]) + return tf.nn.softmax_cross_entropy_with_logits( + logits=self.logits, + labels=one_hot_actions) + + def kl(self, other): + a0 = self.logits - U.max(self.logits, axis=-1, keepdims=True) + a1 = other.logits - U.max(other.logits, axis=-1, keepdims=True) + ea0 = tf.exp(a0) + ea1 = tf.exp(a1) + z0 = U.sum(ea0, axis=-1, keepdims=True) + z1 = U.sum(ea1, axis=-1, keepdims=True) + p0 = ea0 / z0 + return U.sum(p0 * (a0 - tf.log(z0) - a1 + tf.log(z1)), axis=-1) + + def entropy(self): + a0 = self.logits - U.max(self.logits, axis=-1, keepdims=True) + ea0 = tf.exp(a0) + z0 = U.sum(ea0, axis=-1, keepdims=True) + p0 = ea0 / z0 + return U.sum(p0 * (tf.log(z0) - a0), axis=-1) + + def sample(self): + u = tf.random_uniform(tf.shape(self.logits)) + return tf.argmax(self.logits - tf.log(-tf.log(u)), axis=-1) + + @classmethod + def fromflat(cls, flat): + return cls(flat) + + +class MultiCategoricalPd(Pd): + def __init__(self, low, high, flat): + self.flat = flat + self.low = tf.constant(low, dtype=tf.int32) + self.categoricals = list( + map(CategoricalPd, tf.split(flat, high - low + 1, axis=len(flat.get_shape()) - 1))) + + def flatparam(self): + return self.flat + + def mode(self): + return self.low + tf.cast(tf.stack([p.mode() for p in self.categoricals], axis=-1), + tf.int32) + + def neglogp(self, x): + return tf.add_n([p.neglogp(px) for p, px in zip( + self.categoricals, tf.unstack(x - self.low, + axis=len(x.get_shape()) - 1))]) + + def kl(self, other): + return tf.add_n([ + p.kl(q) for p, q in zip(self.categoricals, other.categoricals) + ]) + + def entropy(self): + return tf.add_n([p.entropy() for p in self.categoricals]) + + def sample(self): + return self.low + tf.cast(tf.stack([p.sample() for p in self.categoricals], axis=-1), + tf.int32) + + @classmethod + def fromflat(cls, flat): + raise NotImplementedError + + +class DiagGaussianPd(Pd): + def __init__(self, flat): + self.flat = flat + mean, logstd = tf.split(axis=len(flat.shape) - 1, num_or_size_splits=2, value=flat) + self.mean = mean + self.logstd = logstd + self.std = tf.exp(logstd) + + def flatparam(self): + return self.flat + + def mode(self): + return self.mean + + def neglogp(self, x): + return 0.5 * U.sum(tf.square((x - self.mean) / self.std), axis=-1) \ + + 0.5 * np.log(2.0 * np.pi) * tf.to_float(tf.shape(x)[-1]) \ + + U.sum(self.logstd, axis=-1) + + def kl(self, other): + assert isinstance(other, DiagGaussianPd) + return U.sum(other.logstd - self.logstd + ( + tf.square(self.std) + tf.square(self.mean - other.mean)) / ( + 2.0 * tf.square(other.std)) - 0.5, axis=-1) + + def entropy(self): + return U.sum(self.logstd + .5 * np.log(2.0 * np.pi * np.e), axis=-1) + + def sample(self): + return self.mean + self.std * tf.random_normal(tf.shape(self.mean)) + + @classmethod + def fromflat(cls, flat): + return cls(flat) + + +class BernoulliPd(Pd): + def __init__(self, logits): + self.logits = logits + self.ps = tf.sigmoid(logits) + + def flatparam(self): + return self.logits + + def mode(self): + return tf.round(self.ps) + + def neglogp(self, x): + return U.sum( + tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits, labels=tf.to_float(x)), + axis=-1) + + def kl(self, other): + return U.sum(tf.nn.sigmoid_cross_entropy_with_logits(logits=other.logits, labels=self.ps), + axis=-1) - U.sum( + tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits, labels=self.ps), axis=-1) + + def entropy(self): + return U.sum(tf.nn.sigmoid_cross_entropy_with_logits(logits=self.logits, labels=self.ps), + axis=-1) + + def sample(self): + u = tf.random_uniform(tf.shape(self.ps)) + return tf.to_float(math_ops.less(u, self.ps)) + + @classmethod + def fromflat(cls, flat): + return cls(flat) + + +def make_pdtype(ac_space): + from gym import spaces + if isinstance(ac_space, spaces.Box): + assert len(ac_space.shape) == 1 + return DiagGaussianPdType(ac_space.shape[0]) + elif isinstance(ac_space, spaces.Discrete): + return CategoricalPdType(ac_space.n) + elif isinstance(ac_space, spaces.MultiDiscrete): + return MultiCategoricalPdType(ac_space.low, ac_space.high) + elif isinstance(ac_space, spaces.MultiBinary): + return BernoulliPdType(ac_space.n) + else: + raise NotImplementedError + + +def shape_el(v, i): + maybe = v.get_shape()[i] + if maybe is not None: + return maybe + else: + return tf.shape(v)[i] + + +@U.in_session +def test_probtypes(): + np.random.seed(0) + + pdparam_diag_gauss = np.array([-.2, .3, .4, -.5, .1, -.5, .1, 0.8]) + diag_gauss = DiagGaussianPdType(pdparam_diag_gauss.size // 2) # pylint: disable=E1101 + validate_probtype(diag_gauss, pdparam_diag_gauss) + + pdparam_categorical = np.array([-.2, .3, .5]) + categorical = CategoricalPdType(pdparam_categorical.size) # pylint: disable=E1101 + validate_probtype(categorical, pdparam_categorical) + + pdparam_bernoulli = np.array([-.2, .3, .5]) + bernoulli = BernoulliPdType(pdparam_bernoulli.size) # pylint: disable=E1101 + validate_probtype(bernoulli, pdparam_bernoulli) + + +def validate_probtype(probtype, pdparam): + N = 100000 + # Check to see if mean negative log likelihood == differential entropy + Mval = np.repeat(pdparam[None, :], N, axis=0) + M = probtype.param_placeholder([N]) + X = probtype.sample_placeholder([N]) + pd = probtype.pdclass()(M) + calcloglik = U.function([X, M], pd.logp(X)) + calcent = U.function([M], pd.entropy()) + Xval = U.eval(pd.sample(), feed_dict={M: Mval}) + logliks = calcloglik(Xval, Mval) + entval_ll = - logliks.mean() # pylint: disable=E1101 + entval_ll_stderr = logliks.std() / np.sqrt(N) # pylint: disable=E1101 + entval = calcent(Mval).mean() # pylint: disable=E1101 + assert np.abs(entval - entval_ll) < 3 * entval_ll_stderr # within 3 sigmas + + # Check to see if kldiv[p,q] = - ent[p] - E_p[log q] + M2 = probtype.param_placeholder([N]) + pd2 = probtype.pdclass()(M2) + q = pdparam + np.random.randn(pdparam.size) * 0.1 + Mval2 = np.repeat(q[None, :], N, axis=0) + calckl = U.function([M, M2], pd.kl(pd2)) + klval = calckl(Mval, Mval2).mean() # pylint: disable=E1101 + logliks = calcloglik(Xval, Mval2) + klval_ll = - entval - logliks.mean() # pylint: disable=E1101 + klval_ll_stderr = logliks.std() / np.sqrt(N) # pylint: disable=E1101 + assert np.abs(klval - klval_ll) < 3 * klval_ll_stderr # within 3 sigmas diff --git a/baselines/baselines_common/math_util.py b/baselines/baselines_common/math_util.py new file mode 100644 index 0000000..476927b --- /dev/null +++ b/baselines/baselines_common/math_util.py @@ -0,0 +1,92 @@ +import numpy as np +import scipy.signal + + +def discount(x, gamma): + """ + computes discounted sums along 0th dimension of x. + + inputs + ------ + x: ndarray + gamma: float + + outputs + ------- + y: ndarray with same shape as x, satisfying + + y[t] = x[t] + gamma*x[t+1] + gamma^2*x[t+2] + ... + gamma^k x[t+k], + where k = len(x) - t - 1 + + """ + assert x.ndim >= 1 + return scipy.signal.lfilter([1], [1, -gamma], x[::-1], axis=0)[::-1] + + +def explained_variance(ypred, y): + """ + Computes fraction of variance that ypred explains about y. + Returns 1 - Var[y-ypred] / Var[y] + + interpretation: + ev=0 => might as well have predicted zero + ev=1 => perfect prediction + ev<0 => worse than just predicting zero + + """ + assert y.ndim == 1 and ypred.ndim == 1 + vary = np.var(y) + return np.nan if vary == 0 else 1 - np.var(y - ypred) / vary + + +def explained_variance_2d(ypred, y): + assert y.ndim == 2 and ypred.ndim == 2 + vary = np.var(y, axis=0) + out = 1 - np.var(y - ypred) / vary + out[vary < 1e-10] = 0 + return out + + +def ncc(ypred, y): + return np.corrcoef(ypred, y)[1, 0] + + +def flatten_arrays(arrs): + return np.concatenate([arr.flat for arr in arrs]) + + +def unflatten_vector(vec, shapes): + i = 0 + arrs = [] + for shape in shapes: + size = np.prod(shape) + arr = vec[i:i + size].reshape(shape) + arrs.append(arr) + i += size + return arrs + + +def discount_with_boundaries(X, New, gamma): + """ + X: 2d array of floats, time x features + New: 2d array of bools, indicating when a new episode has started + """ + Y = np.zeros_like(X) + T = X.shape[0] + Y[T - 1] = X[T - 1] + for t in range(T - 2, -1, -1): + Y[t] = X[t] + gamma * Y[t + 1] * (1 - New[t + 1]) + return Y + + +def test_discount_with_boundaries(): + gamma = 0.9 + x = np.array([1.0, 2.0, 3.0, 4.0], 'float32') + starts = [1.0, 0.0, 0.0, 1.0] + y = discount_with_boundaries(x, starts, gamma) + assert np.allclose(y, [ + 1 + gamma * 2 + gamma ** 2 * 3, + 2 + gamma * 3, + 3, + 4 + ]) diff --git a/baselines/baselines_common/misc_util.py b/baselines/baselines_common/misc_util.py new file mode 100644 index 0000000..4e45ce7 --- /dev/null +++ b/baselines/baselines_common/misc_util.py @@ -0,0 +1,328 @@ +import gym +import numpy as np +import os +import pickle +import random +import tempfile +import time +import zipfile + + +def zipsame(*seqs): + L = len(seqs[0]) + assert all(len(seq) == L for seq in seqs[1:]) + return zip(*seqs) + + +def unpack(seq, sizes): + """ + Unpack 'seq' into a sequence of lists, with lengths specified by 'sizes'. + None = just one bare element, not a list + + Example: + unpack([1,2,3,4,5,6], [3,None,2]) -> ([1,2,3], 4, [5,6]) + """ + seq = list(seq) + it = iter(seq) + assert sum(1 if s is None else s for s in sizes) == len(seq), "Trying to unpack %s into %s" % (seq, sizes) + for size in sizes: + if size is None: + yield it.__next__() + else: + li = [] + for _ in range(size): + li.append(it.__next__()) + yield li + + +class EzPickle(object): + """Objects that are pickled and unpickled via their constructor + arguments. + + Example usage: + + class Dog(Animal, EzPickle): + def __init__(self, furcolor, tailkind="bushy"): + Animal.__init__() + EzPickle.__init__(furcolor, tailkind) + ... + + When this object is unpickled, a new Dog will be constructed by passing the provided + furcolor and tailkind into the constructor. However, philosophers are still not sure + whether it is still the same dog. + + This is generally needed only for environments which wrap C/C++ code, such as MuJoCo + and Atari. + """ + + def __init__(self, *args, **kwargs): + self._ezpickle_args = args + self._ezpickle_kwargs = kwargs + + def __getstate__(self): + return {"_ezpickle_args": self._ezpickle_args, "_ezpickle_kwargs": self._ezpickle_kwargs} + + def __setstate__(self, d): + out = type(self)(*d["_ezpickle_args"], **d["_ezpickle_kwargs"]) + self.__dict__.update(out.__dict__) + + +def set_global_seeds(i): + try: + import tensorflow as tf + except ImportError: + pass + else: + tf.set_random_seed(i) + np.random.seed(i) + random.seed(i) + + +def pretty_eta(seconds_left): + """Print the number of seconds in human readable format. + + Examples: + 2 days + 2 hours and 37 minutes + less than a minute + + Paramters + --------- + seconds_left: int + Number of seconds to be converted to the ETA + Returns + ------- + eta: str + String representing the pretty ETA. + """ + minutes_left = seconds_left // 60 + seconds_left %= 60 + hours_left = minutes_left // 60 + minutes_left %= 60 + days_left = hours_left // 24 + hours_left %= 24 + + def helper(cnt, name): + return "{} {}{}".format(str(cnt), name, ('s' if cnt > 1 else '')) + + if days_left > 0: + msg = helper(days_left, 'day') + if hours_left > 0: + msg += ' and ' + helper(hours_left, 'hour') + return msg + if hours_left > 0: + msg = helper(hours_left, 'hour') + if minutes_left > 0: + msg += ' and ' + helper(minutes_left, 'minute') + return msg + if minutes_left > 0: + return helper(minutes_left, 'minute') + return 'less than a minute' + + +class RunningAvg(object): + def __init__(self, gamma, init_value=None): + """Keep a running estimate of a quantity. This is a bit like mean + but more sensitive to recent changes. + + Parameters + ---------- + gamma: float + Must be between 0 and 1, where 0 is the most sensitive to recent + changes. + init_value: float or None + Initial value of the estimate. If None, it will be set on the first update. + """ + self._value = init_value + self._gamma = gamma + + def update(self, new_val): + """Update the estimate. + + Parameters + ---------- + new_val: float + new observated value of estimated quantity. + """ + if self._value is None: + self._value = new_val + else: + self._value = self._gamma * self._value + (1.0 - self._gamma) * new_val + + def __float__(self): + """Get the current estimate""" + return self._value + + +class SimpleMonitor(gym.Wrapper): + def __init__(self, env): + """Adds two qunatities to info returned by every step: + + num_steps: int + Number of steps takes so far + rewards: [float] + All the cumulative rewards for the episodes completed so far. + """ + super().__init__(env) + # current episode state + self._current_reward = None + self._num_steps = None + # temporary monitor state that we do not save + self._time_offset = None + self._total_steps = None + # monitor state + self._episode_rewards = [] + self._episode_lengths = [] + self._episode_end_times = [] + + def _reset(self): + obs = self.env.reset() + # recompute temporary state if needed + if self._time_offset is None: + self._time_offset = time.time() + if len(self._episode_end_times) > 0: + self._time_offset -= self._episode_end_times[-1] + if self._total_steps is None: + self._total_steps = sum(self._episode_lengths) + # update monitor state + if self._current_reward is not None: + self._episode_rewards.append(self._current_reward) + self._episode_lengths.append(self._num_steps) + self._episode_end_times.append(time.time() - self._time_offset) + # reset episode state + self._current_reward = 0 + self._num_steps = 0 + + return obs + + def _step(self, action): + obs, rew, done, info = self.env.step(action) + self._current_reward += rew + self._num_steps += 1 + self._total_steps += 1 + info['steps'] = self._total_steps + info['rewards'] = self._episode_rewards + return (obs, rew, done, info) + + def get_state(self): + return { + 'env_id': self.env.unwrapped.spec.id, + 'episode_data': { + 'episode_rewards': self._episode_rewards, + 'episode_lengths': self._episode_lengths, + 'episode_end_times': self._episode_end_times, + 'initial_reset_time': 0, + } + } + + def set_state(self, state): + assert state['env_id'] == self.env.unwrapped.spec.id + ed = state['episode_data'] + self._episode_rewards = ed['episode_rewards'] + self._episode_lengths = ed['episode_lengths'] + self._episode_end_times = ed['episode_end_times'] + + +def boolean_flag(parser, name, default=False, help=None): + """Add a boolean flag to argparse parser. + + Parameters + ---------- + parser: argparse.Parser + parser to add the flag to + name: str + -- will enable the flag, while --no- will disable it + default: bool or None + default value of the flag + help: str + help string for the flag + """ + dest = name.replace('-', '_') + parser.add_argument("--" + name, action="store_true", default=default, dest=dest, help=help) + parser.add_argument("--no-" + name, action="store_false", dest=dest) + + +def get_wrapper_by_name(env, classname): + """Given an a gym environment possibly wrapped multiple times, returns a wrapper + of class named classname or raises ValueError if no such wrapper was applied + + Parameters + ---------- + env: gym.Env of gym.Wrapper + gym environment + classname: str + name of the wrapper + + Returns + ------- + wrapper: gym.Wrapper + wrapper named classname + """ + currentenv = env + while True: + if classname == currentenv.class_name(): + return currentenv + elif isinstance(currentenv, gym.Wrapper): + currentenv = currentenv.env + else: + raise ValueError("Couldn't find wrapper named %s" % classname) + + +def relatively_safe_pickle_dump(obj, path, compression=False): + """This is just like regular pickle dump, except from the fact that failure cases are + different: + + - It's never possible that we end up with a pickle in corrupted state. + - If a there was a different file at the path, that file will remain unchanged in the + even of failure (provided that filesystem rename is atomic). + - it is sometimes possible that we end up with useless temp file which needs to be + deleted manually (it will be removed automatically on the next function call) + + The indended use case is periodic checkpoints of experiment state, such that we never + corrupt previous checkpoints if the current one fails. + + Parameters + ---------- + obj: object + object to pickle + path: str + path to the output file + compression: bool + if true pickle will be compressed + """ + temp_storage = path + ".relatively_safe" + if compression: + # Using gzip here would be simpler, but the size is limited to 2GB + with tempfile.NamedTemporaryFile() as uncompressed_file: + pickle.dump(obj, uncompressed_file) + with zipfile.ZipFile(temp_storage, "w", compression=zipfile.ZIP_DEFLATED) as myzip: + myzip.write(uncompressed_file.name, "data") + else: + with open(temp_storage, "wb") as f: + pickle.dump(obj, f) + os.rename(temp_storage, path) + + +def pickle_load(path, compression=False): + """Unpickle a possible compressed pickle. + + Parameters + ---------- + path: str + path to the output file + compression: bool + if true assumes that pickle was compressed when created and attempts decompression. + + Returns + ------- + obj: object + the unpickled object + """ + + if compression: + with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as myzip: + with myzip.open("data") as f: + return pickle.load(f) + else: + with open(path, "rb") as f: + return pickle.load(f) diff --git a/baselines/baselines_common/mpi_adam.py b/baselines/baselines_common/mpi_adam.py new file mode 100644 index 0000000..70fb22f --- /dev/null +++ b/baselines/baselines_common/mpi_adam.py @@ -0,0 +1,85 @@ +from mpi4py import MPI +import baselines.baselines_common.tf_util as U +import tensorflow as tf +import numpy as np + + +class MpiAdam(object): + def __init__(self, var_list, *, + beta1=0.9, beta2=0.999, epsilon=1e-08, + scale_grad_by_procs=True, + comm=None): + self.var_list = var_list + self.beta1 = beta1 + self.beta2 = beta2 + self.epsilon = epsilon + self.scale_grad_by_procs = scale_grad_by_procs + size = sum(U.numel(v) for v in var_list) + self.m = np.zeros(size, 'float32') + self.v = np.zeros(size, 'float32') + + self.t = 0 + self.setfromflat = U.SetFromFlat(var_list) + self.getflat = U.GetFlat(var_list) + self.comm = MPI.COMM_WORLD if comm is None else comm + + def update(self, localg, stepsize): + if self.t % 100 == 0: + self.check_synced() + localg = localg.astype('float32') + globalg = np.zeros_like(localg) + self.comm.Allreduce(localg, globalg, op=MPI.SUM) + if self.scale_grad_by_procs: + globalg /= self.comm.Get_size() + + self.t += 1 + a = stepsize * np.sqrt(1 - self.beta2 ** self.t) / (1 - self.beta1 ** self.t) + self.m = self.beta1 * self.m + (1 - self.beta1) * globalg + self.v = self.beta2 * self.v + (1 - self.beta2) * (globalg * globalg) + step = (- a) * self.m / (np.sqrt(self.v) + self.epsilon) + self.setfromflat(self.getflat() + step) + + def sync(self): + theta = self.getflat() + self.comm.Bcast(theta, root=0) + self.setfromflat(theta) + + def check_synced(self): + if self.comm.Get_rank() == 0: # this is root + theta = self.getflat() + self.comm.Bcast(theta, root=0) + else: + thetalocal = self.getflat() + thetaroot = np.empty_like(thetalocal) + self.comm.Bcast(thetaroot, root=0) + assert (thetaroot == thetalocal).all(), (thetaroot, thetalocal) + + +@U.in_session +def test_MpiAdam(): + np.random.seed(0) + tf.set_random_seed(0) + + a = tf.Variable(np.random.randn(3).astype('float32')) + b = tf.Variable(np.random.randn(2, 5).astype('float32')) + loss = tf.reduce_sum(tf.square(a)) + tf.reduce_sum(tf.sin(b)) + + stepsize = 1e-2 + update_op = tf.train.AdamOptimizer(stepsize).minimize(loss) + do_update = U.function([], loss, updates=[update_op]) + + tf.get_default_session().run(tf.global_variables_initializer()) + for i in range(10): + print(i, do_update()) + + tf.set_random_seed(0) + tf.get_default_session().run(tf.global_variables_initializer()) + + var_list = [a, b] + lossandgrad = U.function([], [loss, U.flatgrad(loss, var_list)], updates=[update_op]) + adam = MpiAdam(var_list) + + for i in range(10): + l, g = lossandgrad() + adam.update(g, stepsize) + print(i, l) diff --git a/baselines/baselines_common/mpi_fork.py b/baselines/baselines_common/mpi_fork.py new file mode 100644 index 0000000..c92bc16 --- /dev/null +++ b/baselines/baselines_common/mpi_fork.py @@ -0,0 +1,24 @@ +import os, subprocess, sys + + +def mpi_fork(n, bind_to_core=False): + """Re-launches the current script with workers + Returns "parent" for original parent, "child" for MPI children + """ + if n <= 1: + return "child" + if os.getenv("IN_MPI") is None: + env = os.environ.copy() + env.update( + MKL_NUM_THREADS="1", + OMP_NUM_THREADS="1", + IN_MPI="1" + ) + args = ["mpirun", "-np", str(n)] + if bind_to_core: + args += ["-bind-to", "core"] + args += [sys.executable] + sys.argv + subprocess.check_call(args, env=env) + return "parent" + else: + return "child" diff --git a/baselines/baselines_common/mpi_moments.py b/baselines/baselines_common/mpi_moments.py new file mode 100644 index 0000000..0b8473a --- /dev/null +++ b/baselines/baselines_common/mpi_moments.py @@ -0,0 +1,52 @@ +from mpi4py import MPI +import numpy as np +from baselines.baselines_common import zipsame + + +def mpi_moments(x, axis=0): + x = np.asarray(x, dtype='float64') + newshape = list(x.shape) + newshape.pop(axis) + n = np.prod(newshape, dtype=int) + totalvec = np.zeros(n * 2 + 1, 'float64') + addvec = np.concatenate([x.sum(axis=axis).ravel(), + np.square(x).sum(axis=axis).ravel(), + np.array([x.shape[axis]], dtype='float64')]) + MPI.COMM_WORLD.Allreduce(addvec, totalvec, op=MPI.SUM) + sum = totalvec[:n] + sumsq = totalvec[n:2 * n] + count = totalvec[2 * n] + if count == 0: + mean = np.empty(newshape); + mean[:] = np.nan + std = np.empty(newshape); + std[:] = np.nan + else: + mean = sum / count + std = np.sqrt(np.maximum(sumsq / count - np.square(mean), 0)) + return mean, std, count + + +def test_runningmeanstd(): + comm = MPI.COMM_WORLD + np.random.seed(0) + for (triple, axis) in [ + ((np.random.randn(3), np.random.randn(4), np.random.randn(5)), 0), + ((np.random.randn(3, 2), np.random.randn(4, 2), np.random.randn(5, 2)), 0), + ((np.random.randn(2, 3), np.random.randn(2, 4), np.random.randn(2, 4)), 1), + ]: + + x = np.concatenate(triple, axis=axis) + ms1 = [x.mean(axis=axis), x.std(axis=axis), x.shape[axis]] + + ms2 = mpi_moments(triple[comm.Get_rank()], axis=axis) + + for (a1, a2) in zipsame(ms1, ms2): + print(a1, a2) + assert np.allclose(a1, a2) + print("ok!") + + +if __name__ == "__main__": + # mpirun -np 3 python