From 0eae9177669aae82db948faf2e86653880642844 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 16 Nov 2017 21:58:30 -0800 Subject: [PATCH] [rllib] Clean up evolution strategies example. (#1225) * Remove ES observation statistics. * Consolidate policy classes. * Remove random stream. * Move rollout function out of policy. * Consolidate policy initialization. * Replace act implementation with sess.run. * Remove tf_utils. * Remove variable scope. * Remove unused imports. * Use regular TF session. * Use MeanStdFilter. * Minor. * Clarify naming. * Update documentation. * eps -> episodes * Report noiseless evaluation runs. * Clean up naming. * Update documentation. * Fix some bugs. * Make it run on atari. * Don't add action noise during evaluation runs. * Add ES to checkpoint/restore test. * Small cleanups and remove redundant calls to get_weights. * Remove outdated comment. --- doc/source/example-evolution-strategies.rst | 18 +- python/ray/rllib/es/es.py | 280 ++++++++--------- python/ray/rllib/es/optimizers.py | 5 +- python/ray/rllib/es/policies.py | 235 ++++---------- python/ray/rllib/es/tf_util.py | 292 ------------------ python/ray/rllib/es/utils.py | 34 +- python/ray/rllib/es/viz.py | 43 --- .../ray/rllib/test/test_checkpoint_restore.py | 8 +- test/jenkins_tests/run_multi_node_tests.sh | 7 + 9 files changed, 220 insertions(+), 702 deletions(-) delete mode 100644 python/ray/rllib/es/tf_util.py delete mode 100644 python/ray/rllib/es/viz.py diff --git a/doc/source/example-evolution-strategies.rst b/doc/source/example-evolution-strategies.rst index 25dfb515a..e2e2fd113 100644 --- a/doc/source/example-evolution-strategies.rst +++ b/doc/source/example-evolution-strategies.rst @@ -20,6 +20,16 @@ on the ``Humanoid-v1`` gym environment. python/ray/rllib/train.py --env=Humanoid-v1 --alg=ES +To train a policy on a cluster (e.g., using 900 workers), run the following. + +.. 
code-block:: bash + + python ray/python/ray/rllib/train.py \ + --env=Humanoid-v1 \ + --alg=ES \ + --redis-address=<redis-address> \ + --config='{"num_workers": 900, "episodes_per_batch": 10000, "timesteps_per_batch": 100000}' + At the heart of this example, we define a ``Worker`` class. These workers have a method ``do_rollouts``, which will be used to perform simulate randomly perturbed policies in a given environment. @@ -34,14 +44,12 @@ perturbed policies in a given environment. # Details omitted. def do_rollouts(self, params): - # Set the network weights. - self.policy.set_trainable_flat(params) perturbation = # Generate a random perturbation to the policy. - self.policy.set_trainable_flat(params + perturbation) + self.policy.set_weights(params + perturbation) # Do rollout with the perturbed policy. - self.policy.set_trainable_flat(params - perturbation) + self.policy.set_weights(params - perturbation) # Do rollout with the perturbed policy. # Return the rewards. @@ -60,7 +68,7 @@ and use the rewards from the rollouts to update the policy. while True: # Get the current policy weights. - theta = policy.get_trainable_flat() + theta = policy.get_weights() # Put the current policy weights in the object store. 
theta_id = ray.put(theta) # Use the actors to do rollouts, note that we pass in the ID of the policy diff --git a/python/ray/rllib/es/es.py b/python/ray/rllib/es/es.py index a25f89f7a..68f01b509 100644 --- a/python/ray/rllib/es/es.py +++ b/python/ray/rllib/es/es.py @@ -18,29 +18,26 @@ from ray.rllib.models import ModelCatalog from ray.rllib.es import optimizers from ray.rllib.es import policies from ray.rllib.es import tabular_logger as tlogger -from ray.rllib.es import tf_util from ray.rllib.es import utils from ray.tune.result import TrainingResult Result = namedtuple("Result", [ - "noise_inds_n", "returns_n2", "sign_returns_n2", "lengths_n2", - "eval_return", "eval_length", "ob_sum", "ob_sumsq", "ob_count" + "noise_indices", "noisy_returns", "sign_noisy_returns", "noisy_lengths", + "eval_returns", "eval_lengths" ]) DEFAULT_CONFIG = dict( - l2coeff=0.005, + l2_coeff=0.005, noise_stdev=0.02, episodes_per_batch=1000, timesteps_per_batch=10000, - calc_obstat_prob=0.01, - eval_prob=0, - snapshot_freq=0, + eval_prob=0.003, return_proc_mode="centered_rank", - episode_cutoff_mode="env_default", num_workers=10, - stepsize=.01) + stepsize=0.01, + observation_filter="MeanStdFilter") @ray.remote @@ -60,8 +57,8 @@ class SharedNoiseTable(object): def get(self, i, dim): return self.noise[i:i + dim] - def sample_index(self, stream, dim): - return stream.randint(0, len(self.noise) - dim + 1) + def sample_index(self, dim): + return np.random.randint(0, len(self.noise) - dim + 1) @ray.remote @@ -77,82 +74,63 @@ class Worker(object): self.preprocessor = ModelCatalog.get_preprocessor(self.env) self.sess = utils.make_session(single_threaded=True) - self.policy = policies.GenericPolicy( - self.env.observation_space, self.env.action_space, - self.preprocessor, **policy_params) - tf_util.initialize() + self.policy = policies.GenericPolicy(self.sess, self.env.action_space, + self.preprocessor, + config["observation_filter"], + **policy_params) - self.rs = np.random.RandomState() + def 
rollout(self, timestep_limit, add_noise=True): + rollout_rewards, rollout_length = policies.rollout( + self.policy, self.env, timestep_limit=timestep_limit, + add_noise=add_noise) + return rollout_rewards, rollout_length - assert ( - self.policy.needs_ob_stat == - (self.config["calc_obstat_prob"] != 0)) - - def rollout_and_update_ob_stat(self, timestep_limit, task_ob_stat): - if (self.policy.needs_ob_stat and - self.config["calc_obstat_prob"] != 0 and - self.rs.rand() < self.config["calc_obstat_prob"]): - rollout_rews, rollout_len, obs = self.policy.rollout( - self.env, self.preprocessor, timestep_limit=timestep_limit, - save_obs=True, random_stream=self.rs) - task_ob_stat.increment(obs.sum(axis=0), np.square(obs).sum(axis=0), - len(obs)) - else: - rollout_rews, rollout_len = self.policy.rollout( - self.env, self.preprocessor, timestep_limit=timestep_limit, - random_stream=self.rs) - return rollout_rews, rollout_len - - def do_rollouts(self, params, ob_mean, ob_std, timestep_limit=None): + def do_rollouts(self, params, timestep_limit=None): # Set the network weights. - self.policy.set_trainable_flat(params) + self.policy.set_weights(params) - if self.policy.needs_ob_stat: - self.policy.set_ob_stat(ob_mean, ob_std) - - if self.config["eval_prob"] != 0: - raise NotImplementedError("Eval rollouts are not implemented.") - - noise_inds, returns, sign_returns, lengths = [], [], [], [] - # We set eps=0 because we're incrementing only. - task_ob_stat = utils.RunningStat(self.preprocessor.shape, eps=0) + noise_indices, returns, sign_returns, lengths = [], [], [], [] + eval_returns, eval_lengths = [], [] # Perform some rollouts with noise. 
task_tstart = time.time() - while (len(noise_inds) == 0 or + while (len(noise_indices) == 0 or time.time() - task_tstart < self.min_task_runtime): - noise_idx = self.noise.sample_index( - self.rs, self.policy.num_params) - perturbation = self.config["noise_stdev"] * self.noise.get( - noise_idx, self.policy.num_params) - # These two sampling steps could be done in parallel on different - # actors letting us update twice as frequently. - self.policy.set_trainable_flat(params + perturbation) - rews_pos, len_pos = self.rollout_and_update_ob_stat(timestep_limit, - task_ob_stat) + if np.random.uniform() < self.config["eval_prob"]: + # Do an evaluation run with no perturbation. + self.policy.set_weights(params) + rewards, length = self.rollout(timestep_limit, add_noise=False) + eval_returns.append(rewards.sum()) + eval_lengths.append(length) + else: + # Do a regular run with parameter perturbations. + noise_index = self.noise.sample_index(self.policy.num_params) - self.policy.set_trainable_flat(params - perturbation) - rews_neg, len_neg = self.rollout_and_update_ob_stat(timestep_limit, - task_ob_stat) + perturbation = self.config["noise_stdev"] * self.noise.get( + noise_index, self.policy.num_params) - noise_inds.append(noise_idx) - returns.append([rews_pos.sum(), rews_neg.sum()]) - sign_returns.append( - [np.sign(rews_pos).sum(), np.sign(rews_neg).sum()]) - lengths.append([len_pos, len_neg]) + # These two sampling steps could be done in parallel on + # different actors letting us update twice as frequently. 
+ self.policy.set_weights(params + perturbation) + rewards_pos, lengths_pos = self.rollout(timestep_limit) + + self.policy.set_weights(params - perturbation) + rewards_neg, lengths_neg = self.rollout(timestep_limit) + + noise_indices.append(noise_index) + returns.append([rewards_pos.sum(), rewards_neg.sum()]) + sign_returns.append( + [np.sign(rewards_pos).sum(), np.sign(rewards_neg).sum()]) + lengths.append([lengths_pos, lengths_neg]) return Result( - noise_inds_n=np.array(noise_inds), - returns_n2=np.array(returns, dtype=np.float32), - sign_returns_n2=np.array(sign_returns, dtype=np.float32), - lengths_n2=np.array(lengths, dtype=np.int32), - eval_return=None, - eval_length=None, - ob_sum=(None if task_ob_stat.count == 0 else task_ob_stat.sum), - ob_sumsq=(None if task_ob_stat.count == 0 - else task_ob_stat.sumsq), - ob_count=task_ob_stat.count) + noise_indices=noise_indices, + noisy_returns=returns, + sign_noisy_returns=sign_returns, + noisy_lengths=lengths, + eval_returns=eval_returns, + eval_lengths=eval_lengths) class ESAgent(Agent): @@ -160,9 +138,8 @@ class ESAgent(Agent): _default_config = DEFAULT_CONFIG def _init(self): - policy_params = { - "ac_noise_std": 0.01 + "action_noise_std": 0.01 } env = self.env_creator() @@ -170,11 +147,9 @@ class ESAgent(Agent): self.sess = utils.make_session(single_threaded=False) self.policy = policies.GenericPolicy( - env.observation_space, env.action_space, preprocessor, - **policy_params) - tf_util.initialize() + self.sess, env.action_space, preprocessor, + self.config["observation_filter"], **policy_params) self.optimizer = optimizers.Adam(self.policy, self.config["stepsize"]) - self.ob_stat = utils.RunningStat(preprocessor.shape, eps=1e-2) # Create the shared noise table. 
print("Creating shared noise table.") @@ -192,132 +167,133 @@ class ESAgent(Agent): self.timesteps_so_far = 0 self.tstart = time.time() - def _collect_results(self, theta_id, min_eps, min_timesteps): - num_eps, num_timesteps = 0, 0 + def _collect_results(self, theta_id, min_episodes, min_timesteps): + num_episodes, num_timesteps = 0, 0 results = [] - while num_eps < min_eps or num_timesteps < min_timesteps: + while num_episodes < min_episodes or num_timesteps < min_timesteps: print( "Collected {} episodes {} timesteps so far this iter".format( - num_eps, num_timesteps)) - rollout_ids = [worker.do_rollouts.remote( - theta_id, - self.ob_stat.mean if self.policy.needs_ob_stat else None, - self.ob_stat.std if self.policy.needs_ob_stat else None) - for worker in self.workers] + num_episodes, num_timesteps)) + rollout_ids = [worker.do_rollouts.remote(theta_id) + for worker in self.workers] # Get the results of the rollouts. for result in ray.get(rollout_ids): results.append(result) - num_eps += result.lengths_n2.size - num_timesteps += result.lengths_n2.sum() - return results + # Update the number of episodes and the number of timesteps + # keeping in mind that result.noisy_lengths is a list of lists, + # where the inner lists have length 2. + num_episodes += sum([len(pair) for pair + in result.noisy_lengths]) + num_timesteps += sum([sum(pair) for pair + in result.noisy_lengths]) + return results, num_episodes, num_timesteps def _train(self): config = self.config step_tstart = time.time() - theta = self.policy.get_trainable_flat() + theta = self.policy.get_weights() assert theta.dtype == np.float32 # Put the current policy weights in the object store. theta_id = ray.put(theta) # Use the actors to do rollouts, note that we pass in the ID of the # policy weights. 
- results = self._collect_results( + results, num_episodes, num_timesteps = self._collect_results( theta_id, config["episodes_per_batch"], config["timesteps_per_batch"]) - curr_task_results = [] - ob_count_this_batch = 0 - # Loop over the results + all_noise_indices = [] + all_training_returns = [] + all_training_lengths = [] + all_eval_returns = [] + all_eval_lengths = [] + + # Loop over the results. for result in results: - assert result.eval_length is None, "We aren't doing eval rollouts." - assert result.noise_inds_n.ndim == 1 - assert result.returns_n2.shape == (len(result.noise_inds_n), 2) - assert result.lengths_n2.shape == (len(result.noise_inds_n), 2) - assert result.returns_n2.dtype == np.float32 + all_eval_returns += result.eval_returns + all_eval_lengths += result.eval_lengths - result_num_eps = result.lengths_n2.size - result_num_timesteps = result.lengths_n2.sum() - self.episodes_so_far += result_num_eps - self.timesteps_so_far += result_num_timesteps + all_noise_indices += result.noise_indices + all_training_returns += result.noisy_returns + all_training_lengths += result.noisy_lengths - curr_task_results.append(result) - # Update ob stats. - if self.policy.needs_ob_stat and result.ob_count > 0: - self.ob_stat.increment( - result.ob_sum, result.ob_sumsq, result.ob_count) - ob_count_this_batch += result.ob_count + assert len(all_eval_returns) == len(all_eval_lengths) + assert (len(all_noise_indices) == len(all_training_returns) == + len(all_training_lengths)) + + self.episodes_so_far += num_episodes + self.timesteps_so_far += num_timesteps # Assemble the results. 
- noise_inds_n = np.concatenate( - [r.noise_inds_n for r in curr_task_results]) - returns_n2 = np.concatenate([r.returns_n2 for r in curr_task_results]) - lengths_n2 = np.concatenate([r.lengths_n2 for r in curr_task_results]) - assert (noise_inds_n.shape[0] == returns_n2.shape[0] == - lengths_n2.shape[0]) + eval_returns = np.array(all_eval_returns) + eval_lengths = np.array(all_eval_lengths) + noise_indices = np.array(all_noise_indices) + noisy_returns = np.array(all_training_returns) + noisy_lengths = np.array(all_training_lengths) + # Process the returns. if config["return_proc_mode"] == "centered_rank": - proc_returns_n2 = utils.compute_centered_ranks(returns_n2) + proc_noisy_returns = utils.compute_centered_ranks(noisy_returns) else: raise NotImplementedError(config["return_proc_mode"]) # Compute and take a step. g, count = utils.batched_weighted_sum( - proc_returns_n2[:, 0] - proc_returns_n2[:, 1], - (self.noise.get(idx, self.policy.num_params) - for idx in noise_inds_n), + proc_noisy_returns[:, 0] - proc_noisy_returns[:, 1], + (self.noise.get(index, self.policy.num_params) + for index in noise_indices), batch_size=500) - g /= returns_n2.size + g /= noisy_returns.size assert ( g.shape == (self.policy.num_params,) and g.dtype == np.float32 and - count == len(noise_inds_n)) - update_ratio = self.optimizer.update(-g + config["l2coeff"] * theta) - - # Update ob stat (we're never running the policy in the master, but we - # might be snapshotting the policy). - if self.policy.needs_ob_stat: - self.policy.set_ob_stat(self.ob_stat.mean, self.ob_stat.std) + count == len(noise_indices)) + # Compute the new weights theta. + theta, update_ratio = self.optimizer.update( + -g + config["l2_coeff"] * theta) + # Set the new weights in the local copy of the policy. 
+ self.policy.set_weights(theta) step_tend = time.time() - tlogger.record_tabular("EpRewMean", returns_n2.mean()) - tlogger.record_tabular("EpRewStd", returns_n2.std()) - tlogger.record_tabular("EpLenMean", lengths_n2.mean()) + tlogger.record_tabular("EvalEpRewMean", eval_returns.mean()) + tlogger.record_tabular("EvalEpRewStd", eval_returns.std()) + tlogger.record_tabular("EvalEpLenMean", eval_lengths.mean()) - tlogger.record_tabular( - "Norm", float(np.square(self.policy.get_trainable_flat()).sum())) + tlogger.record_tabular("EpRewMean", noisy_returns.mean()) + tlogger.record_tabular("EpRewStd", noisy_returns.std()) + tlogger.record_tabular("EpLenMean", noisy_lengths.mean()) + + tlogger.record_tabular("Norm", float(np.square(theta).sum())) tlogger.record_tabular("GradNorm", float(np.square(g).sum())) tlogger.record_tabular("UpdateRatio", float(update_ratio)) - tlogger.record_tabular("EpisodesThisIter", lengths_n2.size) + tlogger.record_tabular("EpisodesThisIter", noisy_lengths.size) tlogger.record_tabular("EpisodesSoFar", self.episodes_so_far) - tlogger.record_tabular("TimestepsThisIter", lengths_n2.sum()) + tlogger.record_tabular("TimestepsThisIter", noisy_lengths.sum()) tlogger.record_tabular("TimestepsSoFar", self.timesteps_so_far) - tlogger.record_tabular("ObCount", ob_count_this_batch) - tlogger.record_tabular("TimeElapsedThisIter", step_tend - step_tstart) tlogger.record_tabular("TimeElapsed", step_tend - self.tstart) tlogger.dump_tabular() info = { - "weights_norm": np.square(self.policy.get_trainable_flat()).sum(), + "weights_norm": np.square(theta).sum(), "grad_norm": np.square(g).sum(), "update_ratio": update_ratio, - "episodes_this_iter": lengths_n2.size, + "episodes_this_iter": noisy_lengths.size, "episodes_so_far": self.episodes_so_far, - "timesteps_this_iter": lengths_n2.sum(), + "timesteps_this_iter": noisy_lengths.sum(), "timesteps_so_far": self.timesteps_so_far, - "ob_count": ob_count_this_batch, "time_elapsed_this_iter": step_tend - step_tstart, 
"time_elapsed": step_tend - self.tstart } result = TrainingResult( - episode_reward_mean=returns_n2.mean(), - episode_len_mean=lengths_n2.mean(), - timesteps_this_iter=lengths_n2.sum(), + episode_reward_mean=eval_returns.mean(), + episode_len_mean=eval_lengths.mean(), + timesteps_this_iter=noisy_lengths.sum(), info=info) return result @@ -325,10 +301,9 @@ class ESAgent(Agent): def _save(self): checkpoint_path = os.path.join( self.logdir, "checkpoint-{}".format(self.iteration)) - weights = self.policy.get_trainable_flat() + weights = self.policy.get_weights() objects = [ weights, - self.ob_stat, self.episodes_so_far, self.timesteps_so_far] pickle.dump(objects, open(checkpoint_path, "wb")) @@ -336,10 +311,9 @@ class ESAgent(Agent): def _restore(self, checkpoint_path): objects = pickle.load(open(checkpoint_path, "rb")) - self.policy.set_trainable_flat(objects[0]) - self.ob_stat = objects[1] - self.episodes_so_far = objects[2] - self.timesteps_so_far = objects[3] + self.policy.set_weights(objects[0]) + self.episodes_so_far = objects[1] + self.timesteps_so_far = objects[2] def compute_action(self, observation): - return self.policy.act([observation])[0] + return self.policy.compute(observation, update=False)[0] diff --git a/python/ray/rllib/es/optimizers.py b/python/ray/rllib/es/optimizers.py index 66a0ca69d..f5ef4e109 100644 --- a/python/ray/rllib/es/optimizers.py +++ b/python/ray/rllib/es/optimizers.py @@ -17,10 +17,9 @@ class Optimizer(object): def update(self, globalg): self.t += 1 step = self._compute_step(globalg) - theta = self.pi.get_trainable_flat() + theta = self.pi.get_weights() ratio = np.linalg.norm(step) / np.linalg.norm(theta) - self.pi.set_trainable_flat(theta + step) - return ratio + return theta + step, ratio def _compute_step(self, globalg): raise NotImplementedError diff --git a/python/ray/rllib/es/policies.py b/python/ray/rllib/es/policies.py index 4d445937c..c07f73248 100644 --- a/python/ray/rllib/es/policies.py +++ 
b/python/ray/rllib/es/policies.py @@ -5,191 +5,86 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import logging -import pickle - -import gym.spaces -import h5py +import gym import numpy as np import tensorflow as tf -from ray.rllib.es import tf_util as U +import ray from ray.rllib.models import ModelCatalog - -logger = logging.getLogger(__name__) +# TODO(rkn): Move these filters out of PPO to somewhere common. +from ray.rllib.ppo.filter import NoFilter, MeanStdFilter -class Policy: - def __init__(self, *args, **kwargs): - self.args, self.kwargs = args, kwargs - self.scope = self._initialize(*args, **kwargs) - self.all_variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, - self.scope.name) +def rollout(policy, env, timestep_limit=None, add_noise=False): + """Do a rollout. - self.trainable_variables = tf.get_collection( - tf.GraphKeys.TRAINABLE_VARIABLES, self.scope.name) - self.num_params = sum(int(np.prod(v.get_shape().as_list())) - for v in self.trainable_variables) - self._setfromflat = U.SetFromFlat(self.trainable_variables) - self._getflat = U.GetFlat(self.trainable_variables) - - logger.info('Trainable variables ({} parameters)' - .format(self.num_params)) - for v in self.trainable_variables: - shp = v.get_shape().as_list() - logger.info('- {} shape:{} size:{}'.format(v.name, shp, - np.prod(shp))) - logger.info('All variables') - for v in self.all_variables: - shp = v.get_shape().as_list() - logger.info('- {} shape:{} size:{}'.format(v.name, shp, - np.prod(shp))) - - placeholders = [tf.placeholder(v.value().dtype, - v.get_shape().as_list()) - for v in self.all_variables] - self.set_all_vars = U.function( - inputs=placeholders, - outputs=[], - updates=[tf.group(*[v.assign(p) for v, p - in zip(self.all_variables, placeholders)])] - ) - - def _initialize(self, *args, **kwargs): - raise NotImplementedError - - def save(self, filename): - assert filename.endswith('.h5') - with 
h5py.File(filename, 'w') as f: - for v in self.all_variables: - f[v.name] = v.eval() - # TODO: It would be nice to avoid pickle, but it's convenient to - # pass Python objects to _initialize (like Gym spaces or numpy - # arrays). - f.attrs['name'] = type(self).__name__ - f.attrs['args_and_kwargs'] = np.void(pickle.dumps((self.args, - self.kwargs), - protocol=-1)) - - @classmethod - def Load(cls, filename, extra_kwargs=None): - with h5py.File(filename, 'r') as f: - args, kwargs = pickle.loads(f.attrs['args_and_kwargs'].tostring()) - if extra_kwargs: - kwargs.update(extra_kwargs) - policy = cls(*args, **kwargs) - policy.set_all_vars(*[f[v.name][...] - for v in policy.all_variables]) - return policy - - # === Rollouts/training === - - def rollout(self, env, preprocessor, render=False, timestep_limit=None, - save_obs=False, random_stream=None): - """Do a rollout. - - If random_stream is provided, the rollout will take noisy actions with - noise drawn from that stream. Otherwise, no action noise will be added. 
- """ - env_timestep_limit = env.spec.tags.get("wrapper_config.TimeLimit" - ".max_episode_steps") - timestep_limit = (env_timestep_limit if timestep_limit is None - else min(timestep_limit, env_timestep_limit)) - rews = [] - t = 0 - if save_obs: - obs = [] - ob = preprocessor.transform(env.reset()) - for _ in range(timestep_limit): - ac = self.act(ob[None], random_stream=random_stream)[0] - if save_obs: - obs.append(ob) - ob, rew, done, _ = env.step(ac) - ob = preprocessor.transform(ob) - rews.append(rew) - t += 1 - if render: - env.render() - if done: - break - rews = np.array(rews, dtype=np.float32) - if save_obs: - return rews, t, np.array(obs) - return rews, t - - def act(self, ob, random_stream=None): - raise NotImplementedError - - def set_trainable_flat(self, x): - self._setfromflat(x) - - def get_trainable_flat(self): - return self._getflat() - - @property - def needs_ob_stat(self): - raise NotImplementedError - - def set_ob_stat(self, ob_mean, ob_std): - raise NotImplementedError + If add_noise is True, the rollout will take noisy actions, with the + noise added by the policy. Otherwise, no action noise will be added. 
+ """ + env_timestep_limit = env.spec.tags.get("wrapper_config.TimeLimit" + ".max_episode_steps") + timestep_limit = (env_timestep_limit if timestep_limit is None + else min(timestep_limit, env_timestep_limit)) + rews = [] + t = 0 + observation = env.reset() + for _ in range(timestep_limit): + ac = policy.compute(observation, add_noise=add_noise)[0] + observation, rew, done, _ = env.step(ac) + rews.append(rew) + t += 1 + if done: + break + rews = np.array(rews, dtype=np.float32) + return rews, t -def bins(x, dim, num_bins, name): - scores = U.dense(x, dim * num_bins, name, U.normc_initializer(0.01)) - scores_nab = tf.reshape(scores, [-1, dim, num_bins]) - return tf.argmax(scores_nab, 2) - - -class GenericPolicy(Policy): - def _initialize(self, ob_space, ac_space, preprocessor, ac_noise_std): - self.ac_space = ac_space - self.ac_noise_std = ac_noise_std +class GenericPolicy(object): + def __init__(self, sess, action_space, preprocessor, + observation_filter, action_noise_std): + self.sess = sess + self.action_space = action_space + self.action_noise_std = action_noise_std self.preprocessor = preprocessor - with tf.variable_scope(type(self).__name__) as scope: - # Observation normalization. 
- ob_mean = tf.get_variable( - 'ob_mean', self.preprocessor.shape, tf.float32, - tf.constant_initializer(np.nan), trainable=False) - ob_std = tf.get_variable( - 'ob_std', self.preprocessor.shape, tf.float32, - tf.constant_initializer(np.nan), trainable=False) - in_mean = tf.placeholder(tf.float32, self.preprocessor.shape) - in_std = tf.placeholder(tf.float32, self.preprocessor.shape) - self._set_ob_mean_std = U.function([in_mean, in_std], [], updates=[ - tf.assign(ob_mean, in_mean), - tf.assign(ob_std, in_std), - ]) + if observation_filter == "MeanStdFilter": + self.observation_filter = MeanStdFilter( + self.preprocessor.shape, clip=None) + elif observation_filter == "NoFilter": + self.observation_filter = NoFilter() + else: + raise Exception("Unknown observation_filter: " + + str("observation_filter")) - inputs = tf.placeholder( - tf.float32, [None] + list(self.preprocessor.shape)) + self.inputs = tf.placeholder( + tf.float32, [None] + list(self.preprocessor.shape)) - # TODO(ekl): we should do clipping in a standard RLlib preprocessor - clipped_inputs = tf.clip_by_value( - (inputs - ob_mean) / ob_std, -5.0, 5.0) + # Policy network. + dist_class, dist_dim = ModelCatalog.get_action_dist( + self.action_space, dist_type="deterministic") + model = ModelCatalog.get_model(self.inputs, dist_dim) + dist = dist_class(model.outputs) + self.sampler = dist.sample() - # Policy network. 
- dist_class, dist_dim = ModelCatalog.get_action_dist( - self.ac_space, dist_type='deterministic') - model = ModelCatalog.get_model(clipped_inputs, dist_dim) - dist = dist_class(model.outputs) - self._act = U.function([inputs], dist.sample()) - return scope + self.variables = ray.experimental.TensorFlowVariables( + model.outputs, self.sess) - def act(self, ob, random_stream=None): - a = self._act(ob) - if not isinstance(self.ac_space, gym.spaces.Discrete) and \ - random_stream is not None and self.ac_noise_std != 0: - a += random_stream.randn(*a.shape) * self.ac_noise_std - return a + self.num_params = sum([np.prod(variable.shape.as_list()) + for _, variable + in self.variables.variables.items()]) + self.sess.run(tf.global_variables_initializer()) - @property - def needs_ob_stat(self): - return True + def compute(self, observation, add_noise=False, update=True): + observation = self.preprocessor.transform(observation) + observation = self.observation_filter(observation[None], update=update) + action = self.sess.run(self.sampler, + feed_dict={self.inputs: observation}) + if add_noise and isinstance(self.action_space, gym.spaces.Box): + action += np.random.randn(*action.shape) * self.action_noise_std + return action - @property - def needs_ref_batch(self): - return False + def set_weights(self, x): + self.variables.set_flat(x) - def set_ob_stat(self, ob_mean, ob_std): - self._set_ob_mean_std(ob_mean, ob_std) + def get_weights(self): + return self.variables.get_flat() diff --git a/python/ray/rllib/es/tf_util.py b/python/ray/rllib/es/tf_util.py deleted file mode 100644 index 5a7cc63fb..000000000 --- a/python/ray/rllib/es/tf_util.py +++ /dev/null @@ -1,292 +0,0 @@ -# Code in this file is copied and adapted from -# https://github.com/openai/evolution-strategies-starter. 
- -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np -import tensorflow as tf -import functools -import os - -# Tensorflow must be at least version 1.0.0 for the example to work. -if int(tf.__version__.split(".")[0]) < 1: - raise Exception("Your Tensorflow version is less than 1.0.0. Please " - "update Tensorflow to the latest version.") - -# ================================================================ -# Import all names into common namespace -# ================================================================ - -clip = tf.clip_by_value - -# Make consistent with numpy - - -def sum(x, axis=None, keepdims=False): - return tf.reduce_sum(x, reduction_indices=None if axis is None else [axis], - keep_dims=keepdims) - - -def mean(x, axis=None, keepdims=False): - return tf.reduce_mean(x, reduction_indices=(None if axis is None - else [axis]), - keep_dims=keepdims) - - -def var(x, axis=None, keepdims=False): - meanx = mean(x, axis=axis, keepdims=keepdims) - return mean(tf.square(x - meanx), axis=axis, keepdims=keepdims) - - -def std(x, axis=None, keepdims=False): - return tf.sqrt(var(x, axis=axis, keepdims=keepdims)) - - -def max(x, axis=None, keepdims=False): - return tf.reduce_max(x, reduction_indices=None if axis is None else [axis], - keep_dims=keepdims) - - -def min(x, axis=None, keepdims=False): - return tf.reduce_min(x, reduction_indices=None if axis is None else [axis], - keep_dims=keepdims) - - -def concatenate(arrs, axis=0): - return tf.concat(arrs, axis) - - -def argmax(x, axis=None): - return tf.argmax(x, dimension=axis) - -# Extras - - -def l2loss(params): - if len(params) == 0: - return tf.constant(0.0) - else: - return tf.add_n([sum(tf.square(p)) for p in params]) - - -def lrelu(x, leak=0.2): - f1 = 0.5 * (1 + leak) - f2 = 0.5 * (1 - leak) - return f1 * x + f2 * abs(x) - - -def categorical_sample_logits(X): - # https://github.com/tensorflow/tensorflow/issues/456 - U = 
tf.random_uniform(tf.shape(X)) - return argmax(X - tf.log(-tf.log(U)), axis=1) - -# Global session - - -def get_session(): - return tf.get_default_session() - - -def single_threaded_session(): - tf_config = tf.ConfigProto(inter_op_parallelism_threads=1, - intra_op_parallelism_threads=1) - return tf.Session(config=tf_config) - - -ALREADY_INITIALIZED = set() - - -def initialize(): - new_variables = set(tf.global_variables()) - ALREADY_INITIALIZED - get_session().run(tf.variables_initializer(new_variables)) - ALREADY_INITIALIZED.update(new_variables) - - -def eval(expr, feed_dict=None): - if feed_dict is None: - feed_dict = {} - return get_session().run(expr, feed_dict=feed_dict) - - -def set_value(v, val): - get_session().run(v.assign(val)) - - -def load_state(fname): - saver = tf.train.Saver() - saver.restore(get_session(), fname) - - -def save_state(fname): - os.makedirs(os.path.dirname(fname), exist_ok=True) - saver = tf.train.Saver() - saver.save(get_session(), fname) - -# Model components - - -def normc_initializer(std=1.0): - def _initializer(shape, dtype=None, partition_info=None): - out = np.random.randn(*shape).astype(np.float32) - out *= std / np.sqrt(np.square(out).sum(axis=0, keepdims=True)) - return tf.constant(out) - return _initializer - - -def dense(x, size, name, weight_init=None, bias=True): - w = tf.get_variable(name + "/w", [x.get_shape()[1], size], - initializer=weight_init) - ret = tf.matmul(x, w) - if bias: - b = tf.get_variable(name + "/b", [size], - initializer=tf.zeros_initializer()) - return ret + b - else: - return ret - -# Basic Stuff - - -def function(inputs, outputs, updates=None, givens=None): - if isinstance(outputs, list): - return _Function(inputs, outputs, updates, givens=givens) - elif isinstance(outputs, dict): - f = _Function(inputs, outputs.values(), updates, givens=givens) - return lambda *inputs: dict(zip(outputs.keys(), f(*inputs))) - else: - f = _Function(inputs, [outputs], updates, givens=givens) - return lambda *inputs: 
f(*inputs)[0] - - -class _Function(object): - def __init__(self, inputs, outputs, updates, givens, check_nan=False): - assert all(len(i.op.inputs) == 0 for i in inputs), ("inputs should " - "all be " - "placeholders") - self.inputs = inputs - updates = updates or [] - self.update_group = tf.group(*updates) - self.outputs_update = list(outputs) + [self.update_group] - self.givens = {} if givens is None else givens - self.check_nan = check_nan - - def __call__(self, *inputvals): - assert len(inputvals) == len(self.inputs) - feed_dict = dict(zip(self.inputs, inputvals)) - feed_dict.update(self.givens) - results = get_session().run(self.outputs_update, - feed_dict=feed_dict)[:-1] - if self.check_nan: - if any(np.isnan(r).any() for r in results): - raise RuntimeError("Nan detected") - return results - - -# Graph traversal - -VARIABLES = {} - -# Flat vectors - - -def var_shape(x): - out = [k.value for k in x.get_shape()] - assert all(isinstance(a, int) for a in out), ("shape function assumes " - "that shape is fully known") - return out - - -def numel(x): - return intprod(var_shape(x)) - - -def intprod(x): - return int(np.prod(x)) - - -def flatgrad(loss, var_list): - grads = tf.gradients(loss, var_list) - return tf.concat([tf.reshape(grad, [numel(v)], 0) - for (v, grad) in zip(var_list, grads)]) - - -class SetFromFlat(object): - def __init__(self, var_list, dtype=tf.float32): - assigns = [] - shapes = list(map(var_shape, var_list)) - total_size = np.sum([intprod(shape) for shape in shapes]) - - self.theta = theta = tf.placeholder(dtype, [total_size]) - start = 0 - assigns = [] - for (shape, v) in zip(shapes, var_list): - size = intprod(shape) - assigns.append(tf.assign(v, tf.reshape(theta[start:start + size], - shape))) - start += size - assert start == total_size - self.op = tf.group(*assigns) - - def __call__(self, theta): - get_session().run(self.op, feed_dict={self.theta: theta}) - - -class GetFlat(object): - def __init__(self, var_list): - self.op = 
tf.concat([tf.reshape(v, [numel(v)]) for v in var_list], 0) - - def __call__(self): - return get_session().run(self.op) - -# Misc - - -def scope_vars(scope, trainable_only): - """Get variables inside a scope. The scope can be specified as a string.""" - return tf.get_collection((tf.GraphKeys.TRAINABLE_VARIABLES - if trainable_only - else tf.GraphKeys.GLOBAL_VARIABLES), - scope=(scope if isinstance(scope, str) - else scope.name)) - - -def in_session(f): - @functools.wraps(f) - def newfunc(*args, **kwargs): - with tf.Session(): - f(*args, **kwargs) - return newfunc - - -# A mapping from name -> (placeholder, dtype, shape). -_PLACEHOLDER_CACHE = {} - - -def get_placeholder(name, dtype, shape): - print("calling get_placeholder", name) - if name in _PLACEHOLDER_CACHE: - out, dtype1, shape1 = _PLACEHOLDER_CACHE[name] - assert dtype1 == dtype and shape1 == shape - return out - else: - out = tf.placeholder(dtype=dtype, shape=shape, name=name) - _PLACEHOLDER_CACHE[name] = (out, dtype, shape) - return out - - -def get_placeholder_cached(name): - return _PLACEHOLDER_CACHE[name][0] - - -def flattenallbut0(x): - return tf.reshape(x, [-1, intprod(x.get_shape().as_list()[1:])]) - - -def reset(): - global _PLACEHOLDER_CACHE - global VARIABLES - _PLACEHOLDER_CACHE = {} - VARIABLES = {} - tf.reset_default_graph() diff --git a/python/ray/rllib/es/utils.py b/python/ray/rllib/es/utils.py index 733badf72..6ea5d31ac 100644 --- a/python/ray/rllib/es/utils.py +++ b/python/ray/rllib/es/utils.py @@ -30,10 +30,9 @@ def compute_centered_ranks(x): def make_session(single_threaded): if not single_threaded: - return tf.InteractiveSession() - return tf.InteractiveSession( - config=tf.ConfigProto(inter_op_parallelism_threads=1, - intra_op_parallelism_threads=1)) + return tf.Session() + return tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1, + intra_op_parallelism_threads=1)) def itergroups(items, group_size): @@ -58,30 +57,3 @@ def batched_weighted_sum(weights, vecs, batch_size): 
np.asarray(batch_vecs, dtype=np.float32)) num_items_summed += len(batch_weights) return total, num_items_summed - - -class RunningStat(object): - def __init__(self, shape, eps): - self.sum = np.zeros(shape, dtype=np.float32) - self.sumsq = np.full(shape, eps, dtype=np.float32) - self.count = eps - - def increment(self, s, ssq, c): - self.sum += s - self.sumsq += ssq - self.count += c - - @property - def mean(self): - return self.sum / self.count - - @property - def std(self): - return np.sqrt(np.maximum( - self.sumsq / self.count - np.square(self.mean), 1e-2)) - - def set_from_init(self, init_mean, init_std, init_count): - self.sum[:] = init_mean * init_count - self.sumsq[:] = (np.square(init_mean) + - np.square(init_std)) * init_count - self.count = init_count diff --git a/python/ray/rllib/es/viz.py b/python/ray/rllib/es/viz.py deleted file mode 100644 index d208b9d16..000000000 --- a/python/ray/rllib/es/viz.py +++ /dev/null @@ -1,43 +0,0 @@ -# Code in this file is copied and adapted from -# https://github.com/openai/evolution-strategies-starter. 
- -import click - - -@click.command() -@click.argument("env_id") -@click.argument("policy_file") -@click.option("--record", is_flag=True) -@click.option("--stochastic", is_flag=True) -@click.option("--extra_kwargs") -def main(env_id, policy_file, record, stochastic, extra_kwargs): - import gym - from gym import wrappers - import tensorflow as tf - from policies import MujocoPolicy - import numpy as np - - env = gym.make(env_id) - if record: - import uuid - env = wrappers.Monitor(env, "/tmp/" + str(uuid.uuid4()), force=True) - - if extra_kwargs: - import json - extra_kwargs = json.loads(extra_kwargs) - - with tf.Session(): - pi = MujocoPolicy.Load(policy_file, extra_kwargs=extra_kwargs) - while True: - rews, t = pi.rollout(env, render=True, - random_stream=(np.random if stochastic - else None)) - print("return={:.4f} len={}".format(rews.sum(), t)) - - if record: - env.close() - return - - -if __name__ == "__main__": - main() diff --git a/python/ray/rllib/test/test_checkpoint_restore.py b/python/ray/rllib/test/test_checkpoint_restore.py index 33ac3628a..e8b262d3e 100755 --- a/python/ray/rllib/test/test_checkpoint_restore.py +++ b/python/ray/rllib/test/test_checkpoint_restore.py @@ -6,7 +6,6 @@ from __future__ import print_function import numpy as np import ray -import random from ray.rllib.agent import get_agent_class @@ -21,13 +20,13 @@ def get_mean_action(alg, obs): ray.init() CONFIGS = { + "ES": {"episodes_per_batch": 10, "timesteps_per_batch": 100}, "DQN": {}, "PPO": {"num_sgd_iter": 5, "timesteps_per_batch": 1000}, "A3C": {"use_lstm": False}, } -# https://github.com/ray-project/ray/issues/1062 for enabling ES test as well -for name in ["DQN", "PPO", "A3C"]: +for name in ["ES", "DQN", "PPO", "A3C"]: cls = get_agent_class(name) alg1 = cls("CartPole-v0", CONFIGS[name]) alg2 = cls("CartPole-v0", CONFIGS[name]) @@ -40,8 +39,7 @@ for name in ["DQN", "PPO", "A3C"]: alg2.restore(alg1.save()) for _ in range(10): - obs = [ - random.random(), random.random(), 
random.random(), random.random()] + obs = np.random.uniform(size=4) a1 = get_mean_action(alg1, obs) a2 = get_mean_action(alg2, obs) print("Checking computed actions", alg1, obs, a1, a2) diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 73971c933..5098500c3 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -84,6 +84,13 @@ docker run --shm-size=10G --memory=10G $DOCKER_SHA \ --stop '{"training_iteration": 2}' \ --config '{"stepsize": 0.01}' +docker run --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/train.py \ + --env Pong-v0 \ + --alg ES \ + --stop '{"training_iteration": 2}' \ + --config '{"stepsize": 0.01}' + docker run --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \