[rllib] Clean up evolution strategies example. (#1225)

* Remove ES observation statistics.

* Consolidate policy classes.

* Remove random stream.

* Move rollout function out of policy.

* Consolidate policy initialization.

* Replace act implementation with sess.run.

* Remove tf_utils.

* Remove variable scope.

* Remove unused imports.

* Use regular TF session.

* Use MeanStdFilter.

* Minor.

* Clarify naming.

* Update documentation.

* eps -> episodes

* Report noiseless evaluation runs.

* Clean up naming.

* Update documentation.

* Fix some bugs.

* Make it run on atari.

* Don't add action noise during evaluation runs.

* Add ES to checkpoint/restore test.

* Small cleanups and remove redundant calls to get_weights.

* Remove outdated comment.
This commit is contained in:
Robert Nishihara
2017-11-16 21:58:30 -08:00
committed by Philipp Moritz
parent eadb998643
commit 0eae917766
9 changed files with 220 additions and 702 deletions
+127 -153
View File
@@ -18,29 +18,26 @@ from ray.rllib.models import ModelCatalog
from ray.rllib.es import optimizers
from ray.rllib.es import policies
from ray.rllib.es import tabular_logger as tlogger
from ray.rllib.es import tf_util
from ray.rllib.es import utils
from ray.tune.result import TrainingResult
Result = namedtuple("Result", [
"noise_inds_n", "returns_n2", "sign_returns_n2", "lengths_n2",
"eval_return", "eval_length", "ob_sum", "ob_sumsq", "ob_count"
"noise_indices", "noisy_returns", "sign_noisy_returns", "noisy_lengths",
"eval_returns", "eval_lengths"
])
DEFAULT_CONFIG = dict(
l2coeff=0.005,
l2_coeff=0.005,
noise_stdev=0.02,
episodes_per_batch=1000,
timesteps_per_batch=10000,
calc_obstat_prob=0.01,
eval_prob=0,
snapshot_freq=0,
eval_prob=0.003,
return_proc_mode="centered_rank",
episode_cutoff_mode="env_default",
num_workers=10,
stepsize=.01)
stepsize=0.01,
observation_filter="MeanStdFilter")
@ray.remote
@@ -60,8 +57,8 @@ class SharedNoiseTable(object):
def get(self, i, dim):
return self.noise[i:i + dim]
def sample_index(self, stream, dim):
return stream.randint(0, len(self.noise) - dim + 1)
def sample_index(self, dim):
return np.random.randint(0, len(self.noise) - dim + 1)
@ray.remote
@@ -77,82 +74,63 @@ class Worker(object):
self.preprocessor = ModelCatalog.get_preprocessor(self.env)
self.sess = utils.make_session(single_threaded=True)
self.policy = policies.GenericPolicy(
self.env.observation_space, self.env.action_space,
self.preprocessor, **policy_params)
tf_util.initialize()
self.policy = policies.GenericPolicy(self.sess, self.env.action_space,
self.preprocessor,
config["observation_filter"],
**policy_params)
self.rs = np.random.RandomState()
def rollout(self, timestep_limit, add_noise=True):
    """Run one episode with this worker's policy and environment.

    Delegates to the module-level ``policies.rollout`` helper.

    Args:
        timestep_limit: Upper bound on the episode length, forwarded as is.
        add_noise: Forwarded to the helper; enables action noise.

    Returns:
        A ``(rewards, length)`` pair as produced by ``policies.rollout``.
    """
    episode_rewards, episode_length = policies.rollout(
        self.policy,
        self.env,
        timestep_limit=timestep_limit,
        add_noise=add_noise)
    return episode_rewards, episode_length
assert (
self.policy.needs_ob_stat ==
(self.config["calc_obstat_prob"] != 0))
def rollout_and_update_ob_stat(self, timestep_limit, task_ob_stat):
if (self.policy.needs_ob_stat and
self.config["calc_obstat_prob"] != 0 and
self.rs.rand() < self.config["calc_obstat_prob"]):
rollout_rews, rollout_len, obs = self.policy.rollout(
self.env, self.preprocessor, timestep_limit=timestep_limit,
save_obs=True, random_stream=self.rs)
task_ob_stat.increment(obs.sum(axis=0), np.square(obs).sum(axis=0),
len(obs))
else:
rollout_rews, rollout_len = self.policy.rollout(
self.env, self.preprocessor, timestep_limit=timestep_limit,
random_stream=self.rs)
return rollout_rews, rollout_len
def do_rollouts(self, params, ob_mean, ob_std, timestep_limit=None):
def do_rollouts(self, params, timestep_limit=None):
# Set the network weights.
self.policy.set_trainable_flat(params)
self.policy.set_weights(params)
if self.policy.needs_ob_stat:
self.policy.set_ob_stat(ob_mean, ob_std)
if self.config["eval_prob"] != 0:
raise NotImplementedError("Eval rollouts are not implemented.")
noise_inds, returns, sign_returns, lengths = [], [], [], []
# We set eps=0 because we're incrementing only.
task_ob_stat = utils.RunningStat(self.preprocessor.shape, eps=0)
noise_indices, returns, sign_returns, lengths = [], [], [], []
eval_returns, eval_lengths = [], []
# Perform some rollouts with noise.
task_tstart = time.time()
while (len(noise_inds) == 0 or
while (len(noise_indices) == 0 or
time.time() - task_tstart < self.min_task_runtime):
noise_idx = self.noise.sample_index(
self.rs, self.policy.num_params)
perturbation = self.config["noise_stdev"] * self.noise.get(
noise_idx, self.policy.num_params)
# These two sampling steps could be done in parallel on different
# actors letting us update twice as frequently.
self.policy.set_trainable_flat(params + perturbation)
rews_pos, len_pos = self.rollout_and_update_ob_stat(timestep_limit,
task_ob_stat)
if np.random.uniform() < self.config["eval_prob"]:
# Do an evaluation run with no perturbation.
self.policy.set_weights(params)
rewards, length = self.rollout(timestep_limit, add_noise=False)
eval_returns.append(rewards.sum())
eval_lengths.append(length)
else:
# Do a regular run with parameter perturbations.
noise_index = self.noise.sample_index(self.policy.num_params)
self.policy.set_trainable_flat(params - perturbation)
rews_neg, len_neg = self.rollout_and_update_ob_stat(timestep_limit,
task_ob_stat)
perturbation = self.config["noise_stdev"] * self.noise.get(
noise_index, self.policy.num_params)
noise_inds.append(noise_idx)
returns.append([rews_pos.sum(), rews_neg.sum()])
sign_returns.append(
[np.sign(rews_pos).sum(), np.sign(rews_neg).sum()])
lengths.append([len_pos, len_neg])
# These two sampling steps could be done in parallel on
# different actors letting us update twice as frequently.
self.policy.set_weights(params + perturbation)
rewards_pos, lengths_pos = self.rollout(timestep_limit)
self.policy.set_weights(params - perturbation)
rewards_neg, lengths_neg = self.rollout(timestep_limit)
noise_indices.append(noise_index)
returns.append([rewards_pos.sum(), rewards_neg.sum()])
sign_returns.append(
[np.sign(rewards_pos).sum(), np.sign(rewards_neg).sum()])
lengths.append([lengths_pos, lengths_neg])
return Result(
noise_inds_n=np.array(noise_inds),
returns_n2=np.array(returns, dtype=np.float32),
sign_returns_n2=np.array(sign_returns, dtype=np.float32),
lengths_n2=np.array(lengths, dtype=np.int32),
eval_return=None,
eval_length=None,
ob_sum=(None if task_ob_stat.count == 0 else task_ob_stat.sum),
ob_sumsq=(None if task_ob_stat.count == 0
else task_ob_stat.sumsq),
ob_count=task_ob_stat.count)
noise_indices=noise_indices,
noisy_returns=returns,
sign_noisy_returns=sign_returns,
noisy_lengths=lengths,
eval_returns=eval_returns,
eval_lengths=eval_lengths)
class ESAgent(Agent):
@@ -160,9 +138,8 @@ class ESAgent(Agent):
_default_config = DEFAULT_CONFIG
def _init(self):
policy_params = {
"ac_noise_std": 0.01
"action_noise_std": 0.01
}
env = self.env_creator()
@@ -170,11 +147,9 @@ class ESAgent(Agent):
self.sess = utils.make_session(single_threaded=False)
self.policy = policies.GenericPolicy(
env.observation_space, env.action_space, preprocessor,
**policy_params)
tf_util.initialize()
self.sess, env.action_space, preprocessor,
self.config["observation_filter"], **policy_params)
self.optimizer = optimizers.Adam(self.policy, self.config["stepsize"])
self.ob_stat = utils.RunningStat(preprocessor.shape, eps=1e-2)
# Create the shared noise table.
print("Creating shared noise table.")
@@ -192,132 +167,133 @@ class ESAgent(Agent):
self.timesteps_so_far = 0
self.tstart = time.time()
def _collect_results(self, theta_id, min_eps, min_timesteps):
num_eps, num_timesteps = 0, 0
def _collect_results(self, theta_id, min_episodes, min_timesteps):
num_episodes, num_timesteps = 0, 0
results = []
while num_eps < min_eps or num_timesteps < min_timesteps:
while num_episodes < min_episodes or num_timesteps < min_timesteps:
print(
"Collected {} episodes {} timesteps so far this iter".format(
num_eps, num_timesteps))
rollout_ids = [worker.do_rollouts.remote(
theta_id,
self.ob_stat.mean if self.policy.needs_ob_stat else None,
self.ob_stat.std if self.policy.needs_ob_stat else None)
for worker in self.workers]
num_episodes, num_timesteps))
rollout_ids = [worker.do_rollouts.remote(theta_id)
for worker in self.workers]
# Get the results of the rollouts.
for result in ray.get(rollout_ids):
results.append(result)
num_eps += result.lengths_n2.size
num_timesteps += result.lengths_n2.sum()
return results
# Update the number of episodes and the number of timesteps
# keeping in mind that result.noisy_lengths is a list of lists,
# where the inner lists have length 2.
num_episodes += sum([len(pair) for pair
in result.noisy_lengths])
num_timesteps += sum([sum(pair) for pair
in result.noisy_lengths])
return results, num_episodes, num_timesteps
def _train(self):
config = self.config
step_tstart = time.time()
theta = self.policy.get_trainable_flat()
theta = self.policy.get_weights()
assert theta.dtype == np.float32
# Put the current policy weights in the object store.
theta_id = ray.put(theta)
# Use the actors to do rollouts, note that we pass in the ID of the
# policy weights.
results = self._collect_results(
results, num_episodes, num_timesteps = self._collect_results(
theta_id,
config["episodes_per_batch"],
config["timesteps_per_batch"])
curr_task_results = []
ob_count_this_batch = 0
# Loop over the results
all_noise_indices = []
all_training_returns = []
all_training_lengths = []
all_eval_returns = []
all_eval_lengths = []
# Loop over the results.
for result in results:
assert result.eval_length is None, "We aren't doing eval rollouts."
assert result.noise_inds_n.ndim == 1
assert result.returns_n2.shape == (len(result.noise_inds_n), 2)
assert result.lengths_n2.shape == (len(result.noise_inds_n), 2)
assert result.returns_n2.dtype == np.float32
all_eval_returns += result.eval_returns
all_eval_lengths += result.eval_lengths
result_num_eps = result.lengths_n2.size
result_num_timesteps = result.lengths_n2.sum()
self.episodes_so_far += result_num_eps
self.timesteps_so_far += result_num_timesteps
all_noise_indices += result.noise_indices
all_training_returns += result.noisy_returns
all_training_lengths += result.noisy_lengths
curr_task_results.append(result)
# Update ob stats.
if self.policy.needs_ob_stat and result.ob_count > 0:
self.ob_stat.increment(
result.ob_sum, result.ob_sumsq, result.ob_count)
ob_count_this_batch += result.ob_count
assert len(all_eval_returns) == len(all_eval_lengths)
assert (len(all_noise_indices) == len(all_training_returns) ==
len(all_training_lengths))
self.episodes_so_far += num_episodes
self.timesteps_so_far += num_timesteps
# Assemble the results.
noise_inds_n = np.concatenate(
[r.noise_inds_n for r in curr_task_results])
returns_n2 = np.concatenate([r.returns_n2 for r in curr_task_results])
lengths_n2 = np.concatenate([r.lengths_n2 for r in curr_task_results])
assert (noise_inds_n.shape[0] == returns_n2.shape[0] ==
lengths_n2.shape[0])
eval_returns = np.array(all_eval_returns)
eval_lengths = np.array(all_eval_lengths)
noise_indices = np.array(all_noise_indices)
noisy_returns = np.array(all_training_returns)
noisy_lengths = np.array(all_training_lengths)
# Process the returns.
if config["return_proc_mode"] == "centered_rank":
proc_returns_n2 = utils.compute_centered_ranks(returns_n2)
proc_noisy_returns = utils.compute_centered_ranks(noisy_returns)
else:
raise NotImplementedError(config["return_proc_mode"])
# Compute and take a step.
g, count = utils.batched_weighted_sum(
proc_returns_n2[:, 0] - proc_returns_n2[:, 1],
(self.noise.get(idx, self.policy.num_params)
for idx in noise_inds_n),
proc_noisy_returns[:, 0] - proc_noisy_returns[:, 1],
(self.noise.get(index, self.policy.num_params)
for index in noise_indices),
batch_size=500)
g /= returns_n2.size
g /= noisy_returns.size
assert (
g.shape == (self.policy.num_params,) and
g.dtype == np.float32 and
count == len(noise_inds_n))
update_ratio = self.optimizer.update(-g + config["l2coeff"] * theta)
# Update ob stat (we're never running the policy in the master, but we
# might be snapshotting the policy).
if self.policy.needs_ob_stat:
self.policy.set_ob_stat(self.ob_stat.mean, self.ob_stat.std)
count == len(noise_indices))
# Compute the new weights theta.
theta, update_ratio = self.optimizer.update(
-g + config["l2_coeff"] * theta)
# Set the new weights in the local copy of the policy.
self.policy.set_weights(theta)
step_tend = time.time()
tlogger.record_tabular("EpRewMean", returns_n2.mean())
tlogger.record_tabular("EpRewStd", returns_n2.std())
tlogger.record_tabular("EpLenMean", lengths_n2.mean())
tlogger.record_tabular("EvalEpRewMean", eval_returns.mean())
tlogger.record_tabular("EvalEpRewStd", eval_returns.std())
tlogger.record_tabular("EvalEpLenMean", eval_lengths.mean())
tlogger.record_tabular(
"Norm", float(np.square(self.policy.get_trainable_flat()).sum()))
tlogger.record_tabular("EpRewMean", noisy_returns.mean())
tlogger.record_tabular("EpRewStd", noisy_returns.std())
tlogger.record_tabular("EpLenMean", noisy_lengths.mean())
tlogger.record_tabular("Norm", float(np.square(theta).sum()))
tlogger.record_tabular("GradNorm", float(np.square(g).sum()))
tlogger.record_tabular("UpdateRatio", float(update_ratio))
tlogger.record_tabular("EpisodesThisIter", lengths_n2.size)
tlogger.record_tabular("EpisodesThisIter", noisy_lengths.size)
tlogger.record_tabular("EpisodesSoFar", self.episodes_so_far)
tlogger.record_tabular("TimestepsThisIter", lengths_n2.sum())
tlogger.record_tabular("TimestepsThisIter", noisy_lengths.sum())
tlogger.record_tabular("TimestepsSoFar", self.timesteps_so_far)
tlogger.record_tabular("ObCount", ob_count_this_batch)
tlogger.record_tabular("TimeElapsedThisIter", step_tend - step_tstart)
tlogger.record_tabular("TimeElapsed", step_tend - self.tstart)
tlogger.dump_tabular()
info = {
"weights_norm": np.square(self.policy.get_trainable_flat()).sum(),
"weights_norm": np.square(theta).sum(),
"grad_norm": np.square(g).sum(),
"update_ratio": update_ratio,
"episodes_this_iter": lengths_n2.size,
"episodes_this_iter": noisy_lengths.size,
"episodes_so_far": self.episodes_so_far,
"timesteps_this_iter": lengths_n2.sum(),
"timesteps_this_iter": noisy_lengths.sum(),
"timesteps_so_far": self.timesteps_so_far,
"ob_count": ob_count_this_batch,
"time_elapsed_this_iter": step_tend - step_tstart,
"time_elapsed": step_tend - self.tstart
}
result = TrainingResult(
episode_reward_mean=returns_n2.mean(),
episode_len_mean=lengths_n2.mean(),
timesteps_this_iter=lengths_n2.sum(),
episode_reward_mean=eval_returns.mean(),
episode_len_mean=eval_lengths.mean(),
timesteps_this_iter=noisy_lengths.sum(),
info=info)
return result
@@ -325,10 +301,9 @@ class ESAgent(Agent):
def _save(self):
checkpoint_path = os.path.join(
self.logdir, "checkpoint-{}".format(self.iteration))
weights = self.policy.get_trainable_flat()
weights = self.policy.get_weights()
objects = [
weights,
self.ob_stat,
self.episodes_so_far,
self.timesteps_so_far]
# Write the checkpoint through a context manager so the file handle is
# flushed and closed deterministically instead of being leaked.
with open(checkpoint_path, "wb") as checkpoint_file:
    pickle.dump(objects, checkpoint_file)
@@ -336,10 +311,9 @@ class ESAgent(Agent):
def _restore(self, checkpoint_path):
# Read the checkpoint through a context manager so the file handle is closed
# as soon as the objects are loaded.
# NOTE: pickle can execute arbitrary code while loading; only restore
# checkpoints that come from a trusted source.
with open(checkpoint_path, "rb") as checkpoint_file:
    objects = pickle.load(checkpoint_file)
self.policy.set_trainable_flat(objects[0])
self.ob_stat = objects[1]
self.episodes_so_far = objects[2]
self.timesteps_so_far = objects[3]
self.policy.set_weights(objects[0])
self.episodes_so_far = objects[1]
self.timesteps_so_far = objects[2]
def compute_action(self, observation):
return self.policy.act([observation])[0]
return self.policy.compute(observation, update=False)[0]
+2 -3
View File
@@ -17,10 +17,9 @@ class Optimizer(object):
def update(self, globalg):
self.t += 1
step = self._compute_step(globalg)
theta = self.pi.get_trainable_flat()
theta = self.pi.get_weights()
ratio = np.linalg.norm(step) / np.linalg.norm(theta)
self.pi.set_trainable_flat(theta + step)
return ratio
return theta + step, ratio
def _compute_step(self, globalg):
raise NotImplementedError
+65 -170
View File
@@ -5,191 +5,86 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import pickle
import gym.spaces
import h5py
import gym
import numpy as np
import tensorflow as tf
from ray.rllib.es import tf_util as U
import ray
from ray.rllib.models import ModelCatalog
logger = logging.getLogger(__name__)
# TODO(rkn): Move these filters out of PPO to somewhere common.
from ray.rllib.ppo.filter import NoFilter, MeanStdFilter
class Policy:
def __init__(self, *args, **kwargs):
self.args, self.kwargs = args, kwargs
self.scope = self._initialize(*args, **kwargs)
self.all_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,
self.scope.name)
def rollout(policy, env, timestep_limit=None, add_noise=False):
"""Do a rollout.
self.trainable_variables = tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES, self.scope.name)
self.num_params = sum(int(np.prod(v.get_shape().as_list()))
for v in self.trainable_variables)
self._setfromflat = U.SetFromFlat(self.trainable_variables)
self._getflat = U.GetFlat(self.trainable_variables)
logger.info('Trainable variables ({} parameters)'
.format(self.num_params))
for v in self.trainable_variables:
shp = v.get_shape().as_list()
logger.info('- {} shape:{} size:{}'.format(v.name, shp,
np.prod(shp)))
logger.info('All variables')
for v in self.all_variables:
shp = v.get_shape().as_list()
logger.info('- {} shape:{} size:{}'.format(v.name, shp,
np.prod(shp)))
placeholders = [tf.placeholder(v.value().dtype,
v.get_shape().as_list())
for v in self.all_variables]
self.set_all_vars = U.function(
inputs=placeholders,
outputs=[],
updates=[tf.group(*[v.assign(p) for v, p
in zip(self.all_variables, placeholders)])]
)
def _initialize(self, *args, **kwargs):
raise NotImplementedError
def save(self, filename):
assert filename.endswith('.h5')
with h5py.File(filename, 'w') as f:
for v in self.all_variables:
f[v.name] = v.eval()
# TODO: It would be nice to avoid pickle, but it's convenient to
# pass Python objects to _initialize (like Gym spaces or numpy
# arrays).
f.attrs['name'] = type(self).__name__
f.attrs['args_and_kwargs'] = np.void(pickle.dumps((self.args,
self.kwargs),
protocol=-1))
@classmethod
def Load(cls, filename, extra_kwargs=None):
with h5py.File(filename, 'r') as f:
args, kwargs = pickle.loads(f.attrs['args_and_kwargs'].tostring())
if extra_kwargs:
kwargs.update(extra_kwargs)
policy = cls(*args, **kwargs)
policy.set_all_vars(*[f[v.name][...]
for v in policy.all_variables])
return policy
# === Rollouts/training ===
def rollout(self, env, preprocessor, render=False, timestep_limit=None,
save_obs=False, random_stream=None):
"""Do a rollout.
If random_stream is provided, the rollout will take noisy actions with
noise drawn from that stream. Otherwise, no action noise will be added.
"""
env_timestep_limit = env.spec.tags.get("wrapper_config.TimeLimit"
".max_episode_steps")
timestep_limit = (env_timestep_limit if timestep_limit is None
else min(timestep_limit, env_timestep_limit))
rews = []
t = 0
if save_obs:
obs = []
ob = preprocessor.transform(env.reset())
for _ in range(timestep_limit):
ac = self.act(ob[None], random_stream=random_stream)[0]
if save_obs:
obs.append(ob)
ob, rew, done, _ = env.step(ac)
ob = preprocessor.transform(ob)
rews.append(rew)
t += 1
if render:
env.render()
if done:
break
rews = np.array(rews, dtype=np.float32)
if save_obs:
return rews, t, np.array(obs)
return rews, t
def act(self, ob, random_stream=None):
raise NotImplementedError
def set_trainable_flat(self, x):
self._setfromflat(x)
def get_trainable_flat(self):
return self._getflat()
@property
def needs_ob_stat(self):
raise NotImplementedError
def set_ob_stat(self, ob_mean, ob_std):
raise NotImplementedError
If add_noise is True, the policy adds random noise to the actions it
computes. Otherwise, no action noise will be added.
"""
env_timestep_limit = env.spec.tags.get("wrapper_config.TimeLimit"
".max_episode_steps")
timestep_limit = (env_timestep_limit if timestep_limit is None
else min(timestep_limit, env_timestep_limit))
rews = []
t = 0
observation = env.reset()
for _ in range(timestep_limit):
ac = policy.compute(observation, add_noise=add_noise)[0]
observation, rew, done, _ = env.step(ac)
rews.append(rew)
t += 1
if done:
break
rews = np.array(rews, dtype=np.float32)
return rews, t
def bins(x, dim, num_bins, name):
scores = U.dense(x, dim * num_bins, name, U.normc_initializer(0.01))
scores_nab = tf.reshape(scores, [-1, dim, num_bins])
return tf.argmax(scores_nab, 2)
class GenericPolicy(Policy):
def _initialize(self, ob_space, ac_space, preprocessor, ac_noise_std):
self.ac_space = ac_space
self.ac_noise_std = ac_noise_std
class GenericPolicy(object):
def __init__(self, sess, action_space, preprocessor,
observation_filter, action_noise_std):
self.sess = sess
self.action_space = action_space
self.action_noise_std = action_noise_std
self.preprocessor = preprocessor
with tf.variable_scope(type(self).__name__) as scope:
# Observation normalization.
ob_mean = tf.get_variable(
'ob_mean', self.preprocessor.shape, tf.float32,
tf.constant_initializer(np.nan), trainable=False)
ob_std = tf.get_variable(
'ob_std', self.preprocessor.shape, tf.float32,
tf.constant_initializer(np.nan), trainable=False)
in_mean = tf.placeholder(tf.float32, self.preprocessor.shape)
in_std = tf.placeholder(tf.float32, self.preprocessor.shape)
self._set_ob_mean_std = U.function([in_mean, in_std], [], updates=[
tf.assign(ob_mean, in_mean),
tf.assign(ob_std, in_std),
])
# Pick the observation filter named by the configuration string.
if observation_filter == "MeanStdFilter":
    self.observation_filter = MeanStdFilter(
        self.preprocessor.shape, clip=None)
elif observation_filter == "NoFilter":
    self.observation_filter = NoFilter()
else:
    # Report the offending value itself rather than the parameter's name.
    raise Exception("Unknown observation_filter: " +
                    str(observation_filter))
inputs = tf.placeholder(
tf.float32, [None] + list(self.preprocessor.shape))
self.inputs = tf.placeholder(
tf.float32, [None] + list(self.preprocessor.shape))
# TODO(ekl): we should do clipping in a standard RLlib preprocessor
clipped_inputs = tf.clip_by_value(
(inputs - ob_mean) / ob_std, -5.0, 5.0)
# Policy network.
dist_class, dist_dim = ModelCatalog.get_action_dist(
self.action_space, dist_type="deterministic")
model = ModelCatalog.get_model(self.inputs, dist_dim)
dist = dist_class(model.outputs)
self.sampler = dist.sample()
# Policy network.
dist_class, dist_dim = ModelCatalog.get_action_dist(
self.ac_space, dist_type='deterministic')
model = ModelCatalog.get_model(clipped_inputs, dist_dim)
dist = dist_class(model.outputs)
self._act = U.function([inputs], dist.sample())
return scope
self.variables = ray.experimental.TensorFlowVariables(
model.outputs, self.sess)
def act(self, ob, random_stream=None):
a = self._act(ob)
if not isinstance(self.ac_space, gym.spaces.Discrete) and \
random_stream is not None and self.ac_noise_std != 0:
a += random_stream.randn(*a.shape) * self.ac_noise_std
return a
self.num_params = sum([np.prod(variable.shape.as_list())
for _, variable
in self.variables.variables.items()])
self.sess.run(tf.global_variables_initializer())
@property
def needs_ob_stat(self):
return True
def compute(self, observation, add_noise=False, update=True):
    """Compute the policy's action for a single observation.

    Args:
        observation: Raw observation; it is passed through the
            preprocessor and then the observation filter.
        add_noise: If True and the action space is a ``gym.spaces.Box``,
            Gaussian noise scaled by ``action_noise_std`` is added.
        update: Forwarded to the observation filter's ``update`` argument.

    Returns:
        The result of running the sampler op on a batch of one
        observation (index ``[0]`` to get the single action).
    """
    preprocessed = self.preprocessor.transform(observation)
    # Add a leading batch axis before filtering and feeding the network.
    filtered = self.observation_filter(preprocessed[None], update=update)
    action = self.sess.run(self.sampler,
                           feed_dict={self.inputs: filtered})
    if add_noise and isinstance(self.action_space, gym.spaces.Box):
        noise = np.random.randn(*action.shape)
        action += noise * self.action_noise_std
    return action
@property
def needs_ref_batch(self):
return False
def set_weights(self, x):
self.variables.set_flat(x)
def set_ob_stat(self, ob_mean, ob_std):
self._set_ob_mean_std(ob_mean, ob_std)
def get_weights(self):
return self.variables.get_flat()
-292
View File
@@ -1,292 +0,0 @@
# Code in this file is copied and adapted from
# https://github.com/openai/evolution-strategies-starter.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
import functools
import os
# Tensorflow must be at least version 1.0.0 for the example to work.
if int(tf.__version__.split(".")[0]) < 1:
raise Exception("Your Tensorflow version is less than 1.0.0. Please "
"update Tensorflow to the latest version.")
# ================================================================
# Import all names into common namespace
# ================================================================
clip = tf.clip_by_value
# Make consistent with numpy
def sum(x, axis=None, keepdims=False):
return tf.reduce_sum(x, reduction_indices=None if axis is None else [axis],
keep_dims=keepdims)
def mean(x, axis=None, keepdims=False):
return tf.reduce_mean(x, reduction_indices=(None if axis is None
else [axis]),
keep_dims=keepdims)
def var(x, axis=None, keepdims=False):
meanx = mean(x, axis=axis, keepdims=keepdims)
return mean(tf.square(x - meanx), axis=axis, keepdims=keepdims)
def std(x, axis=None, keepdims=False):
return tf.sqrt(var(x, axis=axis, keepdims=keepdims))
def max(x, axis=None, keepdims=False):
return tf.reduce_max(x, reduction_indices=None if axis is None else [axis],
keep_dims=keepdims)
def min(x, axis=None, keepdims=False):
return tf.reduce_min(x, reduction_indices=None if axis is None else [axis],
keep_dims=keepdims)
def concatenate(arrs, axis=0):
return tf.concat(arrs, axis)
def argmax(x, axis=None):
return tf.argmax(x, dimension=axis)
# Extras
def l2loss(params):
if len(params) == 0:
return tf.constant(0.0)
else:
return tf.add_n([sum(tf.square(p)) for p in params])
def lrelu(x, leak=0.2):
    """Leaky ReLU written as a weighted sum of ``x`` and ``abs(x)``.

    For non-negative ``x`` the result is ``x``; for negative ``x`` it is
    ``leak * x``.
    """
    identity_weight = 0.5 * (1 + leak)
    magnitude_weight = 0.5 * (1 - leak)
    return identity_weight * x + magnitude_weight * abs(x)
def categorical_sample_logits(X):
# https://github.com/tensorflow/tensorflow/issues/456
U = tf.random_uniform(tf.shape(X))
return argmax(X - tf.log(-tf.log(U)), axis=1)
# Global session
def get_session():
return tf.get_default_session()
def single_threaded_session():
tf_config = tf.ConfigProto(inter_op_parallelism_threads=1,
intra_op_parallelism_threads=1)
return tf.Session(config=tf_config)
ALREADY_INITIALIZED = set()
def initialize():
new_variables = set(tf.global_variables()) - ALREADY_INITIALIZED
get_session().run(tf.variables_initializer(new_variables))
ALREADY_INITIALIZED.update(new_variables)
def eval(expr, feed_dict=None):
if feed_dict is None:
feed_dict = {}
return get_session().run(expr, feed_dict=feed_dict)
def set_value(v, val):
get_session().run(v.assign(val))
def load_state(fname):
saver = tf.train.Saver()
saver.restore(get_session(), fname)
def save_state(fname):
os.makedirs(os.path.dirname(fname), exist_ok=True)
saver = tf.train.Saver()
saver.save(get_session(), fname)
# Model components
def normc_initializer(std=1.0):
def _initializer(shape, dtype=None, partition_info=None):
out = np.random.randn(*shape).astype(np.float32)
out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
return tf.constant(out)
return _initializer
def dense(x, size, name, weight_init=None, bias=True):
w = tf.get_variable(name + "/w", [x.get_shape()[1], size],
initializer=weight_init)
ret = tf.matmul(x, w)
if bias:
b = tf.get_variable(name + "/b", [size],
initializer=tf.zeros_initializer())
return ret + b
else:
return ret
# Basic Stuff
def function(inputs, outputs, updates=None, givens=None):
if isinstance(outputs, list):
return _Function(inputs, outputs, updates, givens=givens)
elif isinstance(outputs, dict):
f = _Function(inputs, outputs.values(), updates, givens=givens)
return lambda *inputs: dict(zip(outputs.keys(), f(*inputs)))
else:
f = _Function(inputs, [outputs], updates, givens=givens)
return lambda *inputs: f(*inputs)[0]
class _Function(object):
def __init__(self, inputs, outputs, updates, givens, check_nan=False):
assert all(len(i.op.inputs) == 0 for i in inputs), ("inputs should "
"all be "
"placeholders")
self.inputs = inputs
updates = updates or []
self.update_group = tf.group(*updates)
self.outputs_update = list(outputs) + [self.update_group]
self.givens = {} if givens is None else givens
self.check_nan = check_nan
def __call__(self, *inputvals):
assert len(inputvals) == len(self.inputs)
feed_dict = dict(zip(self.inputs, inputvals))
feed_dict.update(self.givens)
results = get_session().run(self.outputs_update,
feed_dict=feed_dict)[:-1]
if self.check_nan:
if any(np.isnan(r).any() for r in results):
raise RuntimeError("Nan detected")
return results
# Graph traversal
VARIABLES = {}
# Flat vectors
def var_shape(x):
out = [k.value for k in x.get_shape()]
assert all(isinstance(a, int) for a in out), ("shape function assumes "
"that shape is fully known")
return out
def numel(x):
return intprod(var_shape(x))
def intprod(x):
    """Return the product of the entries of ``x`` as a Python int."""
    product = np.prod(x)
    return int(product)
def flatgrad(loss, var_list):
    """Return the gradient of ``loss`` w.r.t. ``var_list`` as one flat tensor.

    Each per-variable gradient is reshaped to a vector and the vectors are
    concatenated along axis 0, in the order of ``var_list``.
    """
    grads = tf.gradients(loss, var_list)
    # The concatenation axis belongs to tf.concat, not to tf.reshape (where
    # it would have been interpreted as the op name).
    return tf.concat([tf.reshape(grad, [numel(v)])
                      for (v, grad) in zip(var_list, grads)], 0)
class SetFromFlat(object):
def __init__(self, var_list, dtype=tf.float32):
assigns = []
shapes = list(map(var_shape, var_list))
total_size = np.sum([intprod(shape) for shape in shapes])
self.theta = theta = tf.placeholder(dtype, [total_size])
start = 0
assigns = []
for (shape, v) in zip(shapes, var_list):
size = intprod(shape)
assigns.append(tf.assign(v, tf.reshape(theta[start:start + size],
shape)))
start += size
assert start == total_size
self.op = tf.group(*assigns)
def __call__(self, theta):
get_session().run(self.op, feed_dict={self.theta: theta})
class GetFlat(object):
def __init__(self, var_list):
self.op = tf.concat([tf.reshape(v, [numel(v)]) for v in var_list], 0)
def __call__(self):
return get_session().run(self.op)
# Misc
def scope_vars(scope, trainable_only):
"""Get variables inside a scope. The scope can be specified as a string."""
return tf.get_collection((tf.GraphKeys.TRAINABLE_VARIABLES
if trainable_only
else tf.GraphKeys.GLOBAL_VARIABLES),
scope=(scope if isinstance(scope, str)
else scope.name))
def in_session(f):
    """Decorator that runs ``f`` inside a fresh ``tf.Session`` context.

    The session is closed when ``f`` returns or raises. The wrapped
    function's return value is passed back to the caller.
    """
    @functools.wraps(f)
    def newfunc(*args, **kwargs):
        with tf.Session():
            # Propagate the result instead of silently dropping it.
            return f(*args, **kwargs)
    return newfunc
# A mapping from name -> (placeholder, dtype, shape).
_PLACEHOLDER_CACHE = {}
def get_placeholder(name, dtype, shape):
print("calling get_placeholder", name)
if name in _PLACEHOLDER_CACHE:
out, dtype1, shape1 = _PLACEHOLDER_CACHE[name]
assert dtype1 == dtype and shape1 == shape
return out
else:
out = tf.placeholder(dtype=dtype, shape=shape, name=name)
_PLACEHOLDER_CACHE[name] = (out, dtype, shape)
return out
def get_placeholder_cached(name):
return _PLACEHOLDER_CACHE[name][0]
def flattenallbut0(x):
return tf.reshape(x, [-1, intprod(x.get_shape().as_list()[1:])])
def reset():
global _PLACEHOLDER_CACHE
global VARIABLES
_PLACEHOLDER_CACHE = {}
VARIABLES = {}
tf.reset_default_graph()
+3 -31
View File
@@ -30,10 +30,9 @@ def compute_centered_ranks(x):
def make_session(single_threaded):
if not single_threaded:
return tf.InteractiveSession()
return tf.InteractiveSession(
config=tf.ConfigProto(inter_op_parallelism_threads=1,
intra_op_parallelism_threads=1))
return tf.Session()
return tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1,
intra_op_parallelism_threads=1))
def itergroups(items, group_size):
@@ -58,30 +57,3 @@ def batched_weighted_sum(weights, vecs, batch_size):
np.asarray(batch_vecs, dtype=np.float32))
num_items_summed += len(batch_weights)
return total, num_items_summed
class RunningStat(object):
    """Accumulates a sum, a sum of squares and a count.

    The mean and standard deviation are derived from those three values.
    ``eps`` seeds both the sum of squares and the count.
    """

    def __init__(self, shape, eps):
        self.sum = np.zeros(shape, dtype=np.float32)
        self.sumsq = np.full(shape, eps, dtype=np.float32)
        self.count = eps

    def increment(self, s, ssq, c):
        """Add a partial sum, sum of squares and count to the totals."""
        self.sum += s
        self.sumsq += ssq
        self.count += c

    @property
    def mean(self):
        """Accumulated sum divided by the accumulated count."""
        return self.sum / self.count

    @property
    def std(self):
        """Standard deviation, with the variance floored at 1e-2."""
        variance = self.sumsq / self.count - np.square(self.mean)
        floored_variance = np.maximum(variance, 1e-2)
        return np.sqrt(floored_variance)

    def set_from_init(self, init_mean, init_std, init_count):
        """Overwrite the totals so they match the given mean, std and count."""
        second_moment = np.square(init_mean) + np.square(init_std)
        self.sum[:] = init_mean * init_count
        self.sumsq[:] = second_moment * init_count
        self.count = init_count
-43
View File
@@ -1,43 +0,0 @@
# Code in this file is copied and adapted from
# https://github.com/openai/evolution-strategies-starter.
import click
@click.command()
@click.argument("env_id")
@click.argument("policy_file")
@click.option("--record", is_flag=True)
@click.option("--stochastic", is_flag=True)
@click.option("--extra_kwargs")
def main(env_id, policy_file, record, stochastic, extra_kwargs):
import gym
from gym import wrappers
import tensorflow as tf
from policies import MujocoPolicy
import numpy as np
env = gym.make(env_id)
if record:
import uuid
env = wrappers.Monitor(env, "/tmp/" + str(uuid.uuid4()), force=True)
if extra_kwargs:
import json
extra_kwargs = json.loads(extra_kwargs)
with tf.Session():
pi = MujocoPolicy.Load(policy_file, extra_kwargs=extra_kwargs)
while True:
rews, t = pi.rollout(env, render=True,
random_stream=(np.random if stochastic
else None))
print("return={:.4f} len={}".format(rews.sum(), t))
if record:
env.close()
return
if __name__ == "__main__":
main()
@@ -6,7 +6,6 @@ from __future__ import print_function
import numpy as np
import ray
import random
from ray.rllib.agent import get_agent_class
@@ -21,13 +20,13 @@ def get_mean_action(alg, obs):
ray.init()
CONFIGS = {
"ES": {"episodes_per_batch": 10, "timesteps_per_batch": 100},
"DQN": {},
"PPO": {"num_sgd_iter": 5, "timesteps_per_batch": 1000},
"A3C": {"use_lstm": False},
}
# https://github.com/ray-project/ray/issues/1062 for enabling ES test as well
for name in ["DQN", "PPO", "A3C"]:
for name in ["ES", "DQN", "PPO", "A3C"]:
cls = get_agent_class(name)
alg1 = cls("CartPole-v0", CONFIGS[name])
alg2 = cls("CartPole-v0", CONFIGS[name])
@@ -40,8 +39,7 @@ for name in ["DQN", "PPO", "A3C"]:
alg2.restore(alg1.save())
for _ in range(10):
obs = [
random.random(), random.random(), random.random(), random.random()]
obs = np.random.uniform(size=4)
a1 = get_mean_action(alg1, obs)
a2 = get_mean_action(alg2, obs)
print("Checking computed actions", alg1, obs, a1, a2)