diff --git a/doc/source/rllib-concepts.rst b/doc/source/rllib-concepts.rst index f752279cb..68c160c91 100644 --- a/doc/source/rllib-concepts.rst +++ b/doc/source/rllib-concepts.rst @@ -17,7 +17,7 @@ Policy Evaluation Given an environment and policy graph, policy evaluation produces `batches `__ of experiences. This is your classic "environment interaction loop". Efficient policy evaluation can be burdensome to get right, especially when leveraging vectorization, RNNs, or when operating in a multi-agent environment. RLlib provides a `PolicyEvaluator `__ class that manages all of this, and this class is used in most RLlib algorithms. -You can also use policy evaluation standalone to produce batches of experiences. This can be done by calling ``ev.sample()`` on an evaluator instance, or ``ev.sample.remote()`` in parallel on evaluator instances created as Ray actors (see ``PolicyEvalutor.as_remote()``). +You can also use policy evaluation standalone to produce batches of experiences. This can be done by calling ``ev.sample()`` on an evaluator instance, or ``ev.sample.remote()`` in parallel on evaluator instances created as Ray actors (see ``PolicyEvaluator.as_remote()``). Policy Optimization ------------------- diff --git a/doc/source/rllib-models.rst b/doc/source/rllib-models.rst index 6efc4abb8..a2a9233ef 100644 --- a/doc/source/rllib-models.rst +++ b/doc/source/rllib-models.rst @@ -15,7 +15,7 @@ RLlib picks default models based on a simple heuristic: a `vision network `__. More generally, RLlib supports the use of recurrent models for its policy gradient algorithms (A3C, PPO, PG, IMPALA), and RNN support is built into its policy evaluation utilities. -For preprocessors, RLlib tries to pick one of its built-in preprocessor based on the environment's observation space. 
Discrete observations are one-hot encoded, Atari observations downscaled, and Tuple observations flattened (there isn't native tuple support yet, but you can reshape the flattened observation in a custom model). Note that for Atari, RLlib defaults to using the `DeepMind preprocessors `__, which are also used by the OpenAI baselines library. +For preprocessors, RLlib tries to pick one of its built-in preprocessors based on the environment's observation space. Discrete observations are one-hot encoded, Atari observations downscaled, and Tuple and Dict observations flattened (these are unflattened and accessible via the ``input_dict`` parameter in custom models). Note that for Atari, RLlib defaults to using the `DeepMind preprocessors `__, which are also used by the OpenAI baselines library. Built-in Model Parameters ~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -30,7 +30,7 @@ The following is a list of the built-in model hyperparameters: Custom Models ------------- -Custom models should subclass the common RLlib `model class `__ and override the ``_build_layers`` method. This method takes in a tensor input (observation), and returns a feature layer and float vector of the specified output size. The model can then be registered and used in place of a built-in model: +Custom models should subclass the common RLlib `model class `__ and override the ``_build_layers_v2`` method. This method takes in a dict of tensor inputs (the observation ``obs``, ``prev_actions``, and ``prev_rewards``), and returns a feature layer and float vector of the specified output size. The model can then be registered and used in place of a built-in model: .. code-block:: python @@ -39,9 +39,38 @@ Custom models should subclass the common RLlib `model class >> print(input_dict) + {'prev_actions': , + 'prev_rewards': , + 'obs': OrderedDict([ + ('sensors', OrderedDict([ + ('front_cam', [ + , + ]), + ('position', ), + ('velocity', )]))])} + """ + + layer1 = slim.fully_connected(input_dict["obs"], 64, ...) 
+ layer2 = slim.fully_connected(layer1, 64, ...) ... return layerN, layerN_minus_1 @@ -55,12 +84,12 @@ Custom models should subclass the common RLlib `model class `__ and associated `training scripts `__. The ``CarlaModel`` class defined there operates over a composite (Tuple) observation space including both images and scalar measurements. +For a full example of a custom model in code, see the `Carla RLlib model `__ and associated `training scripts `__. You can also reference the `unit tests `__ for Tuple and Dict spaces, which show how to access nested observation fields. Custom Preprocessors -------------------- -Similarly, custom preprocessors should subclass the RLlib `preprocessor class `__ and be registered in the model catalog: +Similarly, custom preprocessors should subclass the RLlib `preprocessor class `__ and be registered in the model catalog. Note that you can alternatively use `gym wrapper classes `__ around your environment instead of preprocessors. .. code-block:: python @@ -69,8 +98,8 @@ Similarly, custom preprocessors should subclass the RLlib `preprocessor class rllib.AsyncVectorEnv rllib.ServingEnv => rllib.AsyncVectorEnv + Attributes: + action_space (gym.Space): Action space. This must be defined for + single-agent envs. Multi-agent envs can set this to None. + observation_space (gym.Space): Observation space. This must be defined + for single-agent envs. Multi-agent envs can set this to None. 
+ Examples: >>> env = MyAsyncVectorEnv() >>> obs, rewards, dones, infos, off_policy_actions = env.poll() @@ -142,8 +148,14 @@ def _with_dummy_agent_id(env_id_to_values, dummy_id=_DUMMY_AGENT_ID): class _ServingEnvToAsync(AsyncVectorEnv): """Internal adapter of ServingEnv to AsyncVectorEnv.""" - def __init__(self, serving_env): + def __init__(self, serving_env, preprocessor=None): self.serving_env = serving_env + self.prep = preprocessor + self.action_space = serving_env.action_space + if preprocessor: + self.observation_space = preprocessor.observation_space + else: + self.observation_space = serving_env.observation_space serving_env.start() def poll(self): @@ -168,7 +180,10 @@ class _ServingEnvToAsync(AsyncVectorEnv): if episode.cur_done: del self.serving_env._episodes[eid] if data: - all_obs[eid] = data["obs"] + if self.prep: + all_obs[eid] = self.prep.transform(data["obs"]) + else: + all_obs[eid] = data["obs"] all_rewards[eid] = data["reward"] all_dones[eid] = data["done"] all_infos[eid] = data["info"] @@ -196,6 +211,8 @@ class _VectorEnvToAsync(AsyncVectorEnv): def __init__(self, vector_env): self.vector_env = vector_env + self.action_space = vector_env.action_space + self.observation_space = vector_env.observation_space self.num_envs = vector_env.num_envs self.new_obs = self.vector_env.vector_reset() self.cur_rewards = [None for _ in range(self.num_envs)] diff --git a/python/ray/rllib/env/serving_env.py b/python/ray/rllib/env/serving_env.py index 0c1e3ec0d..528cae266 100644 --- a/python/ray/rllib/env/serving_env.py +++ b/python/ray/rllib/env/serving_env.py @@ -25,6 +25,10 @@ class ServingEnv(threading.Thread): This env is thread-safe, but individual episodes must be executed serially. + Attributes: + action_space (gym.Space): Action space. + observation_space (gym.Space): Observation space. 
+ Examples: >>> register_env("my_env", lambda config: YourServingEnv(config)) >>> agent = DQNAgent(env="my_env") @@ -57,10 +61,12 @@ class ServingEnv(threading.Thread): """Override this to implement the run loop. Your loop should continuously: - 1. Call self.start_episode() - 2. Call self.get_action() or self.log_action() - 3. Call self.log_returns() - 4. Call self.end_episode() + 1. Call self.start_episode(episode_id) + 2. Call self.get_action(episode_id, obs) + -or- + self.log_action(episode_id, obs, action) + 3. Call self.log_returns(episode_id, reward) + 4. Call self.end_episode(episode_id, obs) 5. Wait if nothing to do. Multiple episodes may be started at the same time. diff --git a/python/ray/rllib/env/vector_env.py b/python/ray/rllib/env/vector_env.py index 7fb5b1605..8d2289cf4 100644 --- a/python/ray/rllib/env/vector_env.py +++ b/python/ray/rllib/env/vector_env.py @@ -69,6 +69,8 @@ class _VectorizedGymEnv(VectorEnv): self.num_envs = num_envs while len(self.envs) < self.num_envs: self.envs.append(self.make_env(len(self.envs))) + self.action_space = self.envs[0].action_space + self.observation_space = self.envs[0].observation_space def vector_reset(self): return [e.reset() for e in self.envs] diff --git a/python/ray/rllib/evaluation/episode.py b/python/ray/rllib/evaluation/episode.py index fc99d79fb..ebd6ea784 100644 --- a/python/ray/rllib/evaluation/episode.py +++ b/python/ray/rllib/evaluation/episode.py @@ -54,6 +54,8 @@ class MultiAgentEpisode(object): self._agent_to_last_obs = {} self._agent_to_last_action = {} self._agent_to_last_pi_info = {} + self._agent_to_prev_action = {} + self._agent_reward_history = defaultdict(list) def policy_for(self, agent_id): """Returns the policy graph for the specified agent. 
@@ -72,19 +74,33 @@ class MultiAgentEpisode(object): return self._agent_to_last_obs.get(agent_id) def last_action_for(self, agent_id): - """Returns the last action for the specified agent.""" + """Returns the last action for the specified agent, or zeros.""" - action = self._agent_to_last_action[agent_id] - # Concatenate tuple actions - if isinstance(action, list): - expanded = [] - for a in action: - if len(a.shape) == 1: - expanded.append(np.expand_dims(a, 1)) - else: - expanded.append(a) - action = np.concatenate(expanded, axis=1).flatten() - return action + if agent_id in self._agent_to_last_action: + return _flatten_action(self._agent_to_last_action[agent_id]) + else: + policy = self._policies[self.policy_for(agent_id)] + flat = _flatten_action(policy.action_space.sample()) + return np.zeros_like(flat) + + def prev_action_for(self, agent_id): + """Returns the previous action for the specified agent.""" + + if agent_id in self._agent_to_prev_action: + return _flatten_action(self._agent_to_prev_action[agent_id]) + else: + # We're at t=0, so return all zeros. + return np.zeros_like(self.last_action_for(agent_id)) + + def prev_reward_for(self, agent_id): + """Returns the previous reward for the specified agent.""" + + history = self._agent_reward_history[agent_id] + if len(history) >= 2: + return history[-2] + else: + # We're at t=0, so there is no previous reward, just return zero. 
+ return 0.0 def rnn_state_for(self, agent_id): """Returns the last RNN state for the specified agent.""" @@ -105,6 +121,7 @@ class MultiAgentEpisode(object): self.agent_rewards[agent_id, self.policy_for(agent_id)] += reward self.total_reward += reward + self._agent_reward_history[agent_id].append(reward) def _set_rnn_state(self, agent_id, rnn_state): self._agent_to_rnn_state[agent_id] = rnn_state @@ -117,3 +134,16 @@ class MultiAgentEpisode(object): def _set_last_pi_info(self, agent_id, pi_info): self._agent_to_last_pi_info[agent_id] = pi_info + + +def _flatten_action(action): + # Concatenate tuple actions + if isinstance(action, list) or isinstance(action, tuple): + expanded = [] + for a in action: + if not hasattr(a, "shape") or len(a.shape) == 0: + expanded.append(np.expand_dims(a, 0)) + else: + expanded.append(a) + action = np.concatenate(expanded, axis=0).flatten() + return action diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index 548b65806..db88eb759 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -11,8 +11,6 @@ from ray.rllib.models import ModelCatalog from ray.rllib.env.async_vector_env import AsyncVectorEnv from ray.rllib.env.atari_wrappers import wrap_deepmind, is_atari from ray.rllib.env.env_context import EnvContext -from ray.rllib.env.serving_env import ServingEnv -from ray.rllib.env.vector_env import VectorEnv from ray.rllib.env.multi_agent_env import MultiAgentEnv from ray.rllib.evaluation.interface import EvaluatorInterface from ray.rllib.evaluation.sample_batch import MultiAgentBatch, \ @@ -179,11 +177,15 @@ class PolicyEvaluator(EvaluatorInterface): self.compress_observations = compress_observations self.env = env_creator(env_context) - if isinstance(self.env, VectorEnv) or \ - isinstance(self.env, ServingEnv) or \ - isinstance(self.env, MultiAgentEnv) or \ + if isinstance(self.env, MultiAgentEnv) or \ 
isinstance(self.env, AsyncVectorEnv): + if model_config.get("custom_preprocessor"): + raise ValueError( + "Custom preprocessors are not supported for env types " + "MultiAgentEnv and AsyncVectorEnv. Please preprocess " + "observations in your env directly.") + def wrap(env): return env # we can't auto-wrap these env types elif is_atari(self.env) and \ @@ -294,6 +296,18 @@ class PolicyEvaluator(EvaluatorInterface): merged_conf = policy_config.copy() merged_conf.update(conf) with tf.variable_scope(name): + if isinstance(obs_space, gym.spaces.Dict): + raise ValueError( + "Found raw Dict space as input to policy graph. " + "Please preprocess your environment observations " + "with DictFlatteningPreprocessor and set the " + "obs space to `preprocessor.observation_space`.") + elif isinstance(obs_space, gym.spaces.Tuple): + raise ValueError( + "Found raw Tuple space as input to policy graph. " + "Please preprocess your environment observations " + "with TupleFlatteningPreprocessor and set the " + "obs space to `preprocessor.observation_space`.") policy_map[name] = cls(obs_space, act_space, merged_conf) return policy_map diff --git a/python/ray/rllib/evaluation/policy_graph.py b/python/ray/rllib/evaluation/policy_graph.py index 925fa70aa..b2d154f48 100644 --- a/python/ray/rllib/evaluation/policy_graph.py +++ b/python/ray/rllib/evaluation/policy_graph.py @@ -40,6 +40,8 @@ class PolicyGraph(object): def compute_actions(self, obs_batch, state_batches, + prev_action_batch=None, + prev_reward_batch=None, is_training=False, episodes=None): """Compute actions for the current policy. 
@@ -47,6 +49,8 @@ class PolicyGraph(object): Arguments: obs_batch (np.ndarray): batch of observations state_batches (list): list of RNN state input batches, if any + prev_action_batch (np.ndarray): batch of previous action values + prev_reward_batch (np.ndarray): batch of previous rewards is_training (bool): whether we are training the policy episodes (list): MultiAgentEpisode for each obs in obs_batch. This provides access to all of the internal episode state, @@ -65,6 +69,8 @@ class PolicyGraph(object): def compute_single_action(self, obs, state, + prev_action_batch=None, + prev_reward_batch=None, is_training=False, episode=None): """Unbatched version of compute_actions. @@ -72,6 +78,8 @@ class PolicyGraph(object): Arguments: obs (obj): single observation state_batches (list): list of RNN state inputs, if any + prev_action_batch (np.ndarray): batch of previous action values + prev_reward_batch (np.ndarray): batch of previous rewards is_training (bool): whether we are training the policy episode (MultiAgentEpisode): this provides access to all of the internal episode state, which may be useful for model-based or diff --git a/python/ray/rllib/evaluation/sampler.py b/python/ray/rllib/evaluation/sampler.py index 64999d363..2f90dcbab 100644 --- a/python/ray/rllib/evaluation/sampler.py +++ b/python/ray/rllib/evaluation/sampler.py @@ -3,22 +3,25 @@ from __future__ import division from __future__ import print_function from collections import defaultdict, namedtuple +import numpy as np import six.moves.queue as queue import threading -from ray.rllib.evaluation.episode import MultiAgentEpisode +from ray.rllib.evaluation.episode import MultiAgentEpisode, _flatten_action from ray.rllib.evaluation.sample_batch import MultiAgentSampleBatchBuilder, \ MultiAgentBatch from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph from ray.rllib.env.async_vector_env import AsyncVectorEnv from ray.rllib.env.atari_wrappers import get_wrapper_by_cls, MonitorEnv +from 
ray.rllib.models.action_dist import TupleActions from ray.rllib.utils.tf_run_builder import TFRunBuilder RolloutMetrics = namedtuple( "RolloutMetrics", ["episode_length", "episode_reward", "agent_rewards"]) -PolicyEvalData = namedtuple("PolicyEvalData", - ["env_id", "agent_id", "obs", "rnn_state"]) +PolicyEvalData = namedtuple( + "PolicyEvalData", + ["env_id", "agent_id", "obs", "rnn_state", "prev_action", "prev_reward"]) class SyncSampler(object): @@ -281,7 +284,9 @@ def _env_runner(async_vector_env, if not agent_done: to_eval[policy_id].append( PolicyEvalData(env_id, agent_id, filtered_obs, - episode.rnn_state_for(agent_id))) + episode.rnn_state_for(agent_id), + episode.last_action_for(agent_id), + rewards[env_id][agent_id] or 0.0)) last_observation = episode.last_observation_for(agent_id) episode._set_last_observation(agent_id, filtered_obs) @@ -297,6 +302,8 @@ def _env_runner(async_vector_env, obs=last_observation, actions=episode.last_action_for(agent_id), rewards=rewards[env_id][agent_id], + prev_actions=episode.prev_action_for(agent_id), + prev_rewards=episode.prev_reward_for(agent_id), dones=agent_done, infos=infos[env_id][agent_id], new_obs=filtered_obs, @@ -326,12 +333,17 @@ def _env_runner(async_vector_env, episode = active_episodes[env_id] for agent_id, raw_obs in resetted_obs.items(): policy_id = episode.policy_for(agent_id) + policy = _get_or_raise(policies, policy_id) filtered_obs = _get_or_raise(obs_filters, policy_id)(raw_obs) episode._set_last_observation(agent_id, filtered_obs) to_eval[policy_id].append( - PolicyEvalData(env_id, agent_id, filtered_obs, - episode.rnn_state_for(agent_id))) + PolicyEvalData( + env_id, agent_id, filtered_obs, + episode.rnn_state_for(agent_id), + np.zeros_like( + _flatten_action( + policy.action_space.sample())), 0.0)) # Batch eval policy actions if possible if tf_sess: @@ -350,11 +362,15 @@ def _env_runner(async_vector_env, pending_fetches[policy_id] = policy.build_compute_actions( builder, [t.obs for t in 
eval_data], rnn_in, + prev_action_batch=[t.prev_action for t in eval_data], + prev_reward_batch=[t.prev_reward for t in eval_data], is_training=True) else: eval_results[policy_id] = policy.compute_actions( [t.obs for t in eval_data], rnn_in, + prev_action_batch=[t.prev_action for t in eval_data], + prev_reward_batch=[t.prev_reward for t in eval_data], is_training=True, episodes=[active_episodes[t.env_id] for t in eval_data]) if builder: @@ -374,6 +390,7 @@ def _env_runner(async_vector_env, for f_i, column in enumerate(rnn_out_cols): pi_info_cols["state_out_{}".format(f_i)] = column # Save output rows + actions = _unbatch_tuple_actions(actions) for i, action in enumerate(actions): env_id = eval_data[i].env_id agent_id = eval_data[i].agent_id @@ -413,6 +430,19 @@ def _fetch_atari_metrics(async_vector_env): return atari_out +def _unbatch_tuple_actions(action_batch): + # convert list of batches -> batch of lists + if isinstance(action_batch, TupleActions): + out = [] + for j in range(len(action_batch.batches[0])): + out.append([ + action_batch.batches[i][j] + for i in range(len(action_batch.batches)) + ]) + return out + return action_batch + + def _to_column_format(rnn_state_rows): num_cols = len(rnn_state_rows[0]) return [[row[i] for row in rnn_state_rows] for i in range(num_cols)] diff --git a/python/ray/rllib/evaluation/tf_policy_graph.py b/python/ray/rllib/evaluation/tf_policy_graph.py index 09a84981e..a7b34c2ce 100644 --- a/python/ray/rllib/evaluation/tf_policy_graph.py +++ b/python/ray/rllib/evaluation/tf_policy_graph.py @@ -46,6 +46,8 @@ class TFPolicyGraph(PolicyGraph): loss_inputs, state_inputs=None, state_outputs=None, + prev_action_input=None, + prev_reward_input=None, seq_lens=None, max_seq_len=20): """Initialize the policy graph. @@ -65,6 +67,8 @@ class TFPolicyGraph(PolicyGraph): and has shape [BATCH_SIZE, data...]. state_inputs (list): list of RNN state input Tensors. state_outputs (list): list of RNN state output Tensors. 
+ prev_action_input (Tensor): placeholder for previous actions + prev_reward_input (Tensor): placeholder for previous rewards seq_lens (Tensor): placeholder for RNN sequence lengths, of shape [NUM_SEQUENCES]. Note that NUM_SEQUENCES << BATCH_SIZE. See models/lstm.py for more information. @@ -75,6 +79,8 @@ class TFPolicyGraph(PolicyGraph): self.action_space = action_space self._sess = sess self._obs_input = obs_input + self._prev_action_input = prev_action_input + self._prev_reward_input = prev_reward_input self._sampler = action_sampler self._loss = loss self._loss_inputs = loss_inputs @@ -112,6 +118,8 @@ class TFPolicyGraph(PolicyGraph): builder, obs_batch, state_batches=None, + prev_action_batch=None, + prev_reward_batch=None, is_training=False, episodes=None): state_batches = state_batches or [] @@ -121,6 +129,10 @@ class TFPolicyGraph(PolicyGraph): builder.add_feed_dict({self._obs_input: obs_batch}) if state_batches: builder.add_feed_dict({self._seq_lens: np.ones(len(obs_batch))}) + if self._prev_action_input is not None and prev_action_batch: + builder.add_feed_dict({self._prev_action_input: prev_action_batch}) + if self._prev_reward_input is not None and prev_reward_batch: + builder.add_feed_dict({self._prev_reward_input: prev_reward_batch}) builder.add_feed_dict({self._is_training: is_training}) builder.add_feed_dict(dict(zip(self._state_inputs, state_batches))) fetches = builder.add_fetches([self._sampler] + self._state_outputs + @@ -130,11 +142,14 @@ class TFPolicyGraph(PolicyGraph): def compute_actions(self, obs_batch, state_batches=None, + prev_action_batch=None, + prev_reward_batch=None, is_training=False, episodes=None): builder = TFRunBuilder(self._sess, "compute_actions") fetches = self.build_compute_actions(builder, obs_batch, state_batches, - is_training) + prev_action_batch, + prev_reward_batch, is_training) return builder.get(fetches) def _get_loss_inputs_dict(self, batch): diff --git a/python/ray/rllib/evaluation/torch_policy_graph.py 
b/python/ray/rllib/evaluation/torch_policy_graph.py index 741357f3a..cb990c36f 100644 --- a/python/ray/rllib/evaluation/torch_policy_graph.py +++ b/python/ray/rllib/evaluation/torch_policy_graph.py @@ -70,6 +70,8 @@ class TorchPolicyGraph(PolicyGraph): def compute_actions(self, obs_batch, state_batches=None, + prev_action_batch=None, + prev_reward_batch=None, is_training=False, episodes=None): if state_batches: diff --git a/python/ray/rllib/examples/carla/models.py b/python/ray/rllib/examples/carla/models.py index fd20cd0c0..3f8cc0c5b 100644 --- a/python/ray/rllib/examples/carla/models.py +++ b/python/ray/rllib/examples/carla/models.py @@ -20,6 +20,7 @@ class CarlaModel(Model): further fully connected layers. """ + # TODO(ekl): use build_layers_v2 for native dict space support def _build_layers(self, inputs, num_outputs, options): # Parse options image_shape = options["custom_options"]["image_shape"] diff --git a/python/ray/rllib/examples/cartpole_lstm.py b/python/ray/rllib/examples/cartpole_lstm.py index 67fd35d28..ddc89c47e 100644 --- a/python/ray/rllib/examples/cartpole_lstm.py +++ b/python/ray/rllib/examples/cartpole_lstm.py @@ -14,6 +14,7 @@ import numpy as np parser = argparse.ArgumentParser() parser.add_argument("--stop", type=int, default=200) +parser.add_argument("--use-prev-action-reward", action="store_true") parser.add_argument("--run", type=str, default="PPO") @@ -183,10 +184,13 @@ if __name__ == "__main__": "stop": { "episode_reward_mean": args.stop }, - "config": dict(configs[args.run], **{ - "model": { - "use_lstm": True, - }, - }), + "config": dict( + configs[args.run], **{ + "model": { + "use_lstm": True, + "lstm_use_prev_action_reward": args. 
+ use_prev_action_reward, + }, + }), } }) diff --git a/python/ray/rllib/models/action_dist.py b/python/ray/rllib/models/action_dist.py index b0cfe4141..91b8d2fce 100644 --- a/python/ray/rllib/models/action_dist.py +++ b/python/ray/rllib/models/action_dist.py @@ -2,9 +2,11 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +from collections import namedtuple +import distutils.version + import tensorflow as tf import numpy as np -import distutils.version use_tf150_api = (distutils.version.LooseVersion(tf.VERSION) >= distutils.version.LooseVersion("1.5.0")) @@ -225,4 +227,8 @@ class MultiActionDistribution(ActionDistribution): def sample(self): """Draw a sample from the action distribution.""" - return [[s.sample() for s in self.child_distributions]] + + return TupleActions([s.sample() for s in self.child_distributions]) + + +TupleActions = namedtuple("TupleActions", ["batches"]) diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index d2038f55f..1c55cb79e 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -10,6 +10,9 @@ from functools import partial from ray.tune.registry import RLLIB_MODEL, RLLIB_PREPROCESSOR, \ _global_registry +from ray.rllib.env.async_vector_env import _ServingEnvToAsync +from ray.rllib.env.serving_env import ServingEnv +from ray.rllib.env.vector_env import VectorEnv from ray.rllib.models.action_dist import ( Categorical, Deterministic, DiagGaussian, MultiActionDistribution, squash_to_range) @@ -41,6 +44,8 @@ MODEL_DEFAULTS = { "max_seq_len": 20, # Size of the LSTM cell "lstm_cell_size": 256, + # Whether to feed a_{t-1}, r_{t-1} to LSTM + "lstm_use_prev_action_reward": False, # == Atari == # Whether to enable framestack for Atari envs @@ -133,10 +138,6 @@ class ModelCatalog(object): action_placeholder (Tensor): A placeholder for the actions """ - # TODO(ekl) are list spaces valid? 
- if isinstance(action_space, list): - action_space = gym.spaces.Tuple(action_space) - if isinstance(action_space, gym.spaces.Box): return tf.placeholder( tf.float32, shape=(None, action_space.shape[0]), name="action") @@ -160,11 +161,18 @@ class ModelCatalog(object): " not supported".format(action_space)) @staticmethod - def get_model(inputs, num_outputs, options, state_in=None, seq_lens=None): + def get_model(input_dict, + obs_space, + num_outputs, + options, + state_in=None, + seq_lens=None): """Returns a suitable model conforming to given input and output specs. Args: - inputs (Tensor): The input tensor to the model. + input_dict (dict): Dict of input tensors to the model, including + the observation under the "obs" key. + obs_space (Space): Observation space of the target gym env. num_outputs (int): The size of the output vector of the model. options (dict): Optional args to pass to the model constructor. state_in (list): Optional RNN state in tensors. @@ -174,34 +182,40 @@ class ModelCatalog(object): model (Model): Neural network model. 
""" + assert isinstance(input_dict, dict) options = options or MODEL_DEFAULTS - model = ModelCatalog._get_model(inputs, num_outputs, options, state_in, - seq_lens) + model = ModelCatalog._get_model(input_dict, obs_space, num_outputs, + options, state_in, seq_lens) if options.get("use_lstm"): - model = LSTM(model.last_layer, num_outputs, options, state_in, + copy = dict(input_dict) + copy["obs"] = model.last_layer + model = LSTM(copy, obs_space, num_outputs, options, state_in, seq_lens) return model @staticmethod - def _get_model(inputs, num_outputs, options, state_in, seq_lens): + def _get_model(input_dict, obs_space, num_outputs, options, state_in, + seq_lens): if options.get("custom_model"): model = options["custom_model"] print("Using custom model {}".format(model)) return _global_registry.get(RLLIB_MODEL, model)( - inputs, + input_dict, + obs_space, num_outputs, options, state_in=state_in, seq_lens=seq_lens) - obs_rank = len(inputs.shape) - 1 + obs_rank = len(input_dict["obs"].shape) - 1 if obs_rank > 1: - return VisionNetwork(inputs, num_outputs, options) + return VisionNetwork(input_dict, obs_space, num_outputs, options) - return FullyConnectedNetwork(inputs, num_outputs, options) + return FullyConnectedNetwork(input_dict, obs_space, num_outputs, + options) @staticmethod def get_torch_model(input_shape, num_outputs, options=None): @@ -243,7 +257,7 @@ class ModelCatalog(object): """Returns a suitable processor for the given environment. Args: - env (gym.Env): The gym environment to preprocess. + env (gym.Env|VectorEnv|ServingEnv): The environment to wrap. options (dict): Options to pass to the preprocessor. Returns: @@ -269,16 +283,23 @@ class ModelCatalog(object): """Returns a preprocessor as a gym observation wrapper. Args: - env (gym.Env): The gym environment to wrap. + env (gym.Env|VectorEnv|ServingEnv): The environment to wrap. options (dict): Options to pass to the preprocessor. Returns: - wrapper (gym.ObservationWrapper): Preprocessor in wrapper form. 
+ env (RLlib env): Wrapped environment """ options = options or MODEL_DEFAULTS preprocessor = ModelCatalog.get_preprocessor(env, options) - return _RLlibPreprocessorWrapper(env, preprocessor) + if isinstance(env, gym.Env): + return _RLlibPreprocessorWrapper(env, preprocessor) + elif isinstance(env, VectorEnv): + return _RLlibVectorPreprocessorWrapper(env, preprocessor) + elif isinstance(env, ServingEnv): + return _ServingEnvToAsync(env, preprocessor) + else: + raise ValueError("Don't know how to wrap {}".format(env)) @staticmethod def register_custom_preprocessor(preprocessor_name, preprocessor_class): @@ -314,10 +335,32 @@ class _RLlibPreprocessorWrapper(gym.ObservationWrapper): def __init__(self, env, preprocessor): super(_RLlibPreprocessorWrapper, self).__init__(env) self.preprocessor = preprocessor - - from gym.spaces.box import Box - self.observation_space = Box( - -1.0, 1.0, preprocessor.shape, dtype=np.float32) + self.observation_space = preprocessor.observation_space def observation(self, observation): return self.preprocessor.transform(observation) + + +class _RLlibVectorPreprocessorWrapper(VectorEnv): + """Preprocessing wrapper for vector envs.""" + + def __init__(self, env, preprocessor): + self.env = env + self.prep = preprocessor + self.action_space = env.action_space + self.observation_space = preprocessor.observation_space + self.num_envs = env.num_envs + + def vector_reset(self): + return [self.prep.transform(obs) for obs in self.env.vector_reset()] + + def reset_at(self, index): + return self.prep.transform(self.env.reset_at(index)) + + def vector_step(self, actions): + obs, rewards, dones, infos = self.env.vector_step(actions) + obs = [self.prep.transform(o) for o in obs] + return obs, rewards, dones, infos + + def get_unwrapped(self): + return self.env.get_unwrapped() diff --git a/python/ray/rllib/models/fcnet.py b/python/ray/rllib/models/fcnet.py index e703fb0a0..5a759fd59 100644 --- a/python/ray/rllib/models/fcnet.py +++ 
b/python/ray/rllib/models/fcnet.py @@ -13,6 +13,12 @@ class FullyConnectedNetwork(Model): """Generic fully connected network.""" def _build_layers(self, inputs, num_outputs, options): + """Process the flattened inputs. + + Note that dict inputs will be flattened into a vector. To define a + model that processes the components separately, use _build_layers_v2(). + """ + hiddens = options.get("fcnet_hiddens") activation = get_activation_fn(options.get("fcnet_activation")) diff --git a/python/ray/rllib/models/lstm.py b/python/ray/rllib/models/lstm.py index b8dea3ede..5f3bdc8b7 100644 --- a/python/ray/rllib/models/lstm.py +++ b/python/ray/rllib/models/lstm.py @@ -9,6 +9,10 @@ the LSTM cell, we reshape the input to add the expected time dimension. During postprocessing, we dynamically pad the experience batches so that this reshaping is possible. +Note that this padding strategy only works out if we assume zero inputs don't +meaningfully affect the loss function. This happens to be true for all the +current algorithms: https://github.com/ray-project/ray/issues/2992 + See the add_time_dimension() and chop_into_sequences() functions below for more info. """ @@ -134,9 +138,24 @@ class LSTM(Model): self.seq_lens. See add_time_dimension() for more information. 
""" - def _build_layers(self, inputs, num_outputs, options): + def _build_layers_v2(self, input_dict, num_outputs, options): cell_size = options.get("lstm_cell_size") - last_layer = add_time_dimension(inputs, self.seq_lens) + if options.get("lstm_use_prev_action_reward"): + action_dim = int( + np.product( + input_dict["prev_actions"].get_shape().as_list()[1:])) + features = tf.concat( + [ + input_dict["obs"], + tf.reshape( + tf.cast(input_dict["prev_actions"], tf.float32), + [-1, action_dim]), + tf.reshape(input_dict["prev_rewards"], [-1, 1]), + ], + axis=1) + else: + features = input_dict["obs"] + last_layer = add_time_dimension(features, self.seq_lens) # Setup the LSTM cell lstm = rnn.BasicLSTMCell(cell_size, state_is_tuple=True) diff --git a/python/ray/rllib/models/model.py b/python/ray/rllib/models/model.py index 168f29c74..fb8ea6687 100644 --- a/python/ray/rllib/models/model.py +++ b/python/ray/rllib/models/model.py @@ -2,8 +2,13 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +from collections import OrderedDict + +import gym import tensorflow as tf +from ray.rllib.models.preprocessors import get_preprocessor + class Model(object): """Defines an abstract network model for use with RLlib. @@ -16,12 +21,12 @@ class Model(object): needs to further post-processing (e.g. Actor and Critic networks in A3C). Attributes: - inputs (Tensor): The input placeholder for this model, of shape - [BATCH_SIZE, ...]. + input_dict (dict): Dictionary of input tensors, including "obs", + "prev_action", "prev_reward". outputs (Tensor): The output vector of this model, of shape [BATCH_SIZE, num_outputs]. - last_layer (Tensor): The network layer right before the model output, - of shape [BATCH_SIZE, N]. + last_layer (Tensor): The feature layer right before the model output, + of shape [BATCH_SIZE, f]. state_init (list): List of initial recurrent state tensors (if any). 
state_in (list): List of input recurrent state tensors (if any). state_out (list): List of output recurrent state tensors (if any). @@ -38,12 +43,13 @@ class Model(object): """ def __init__(self, - inputs, + input_dict, + obs_space, num_outputs, options, state_in=None, seq_lens=None): - self.inputs = inputs + assert isinstance(input_dict, dict), input_dict # Default attribute values for the non-RNN case self.state_init = [] @@ -58,8 +64,26 @@ class Model(object): if options.get("free_log_std"): assert num_outputs % 2 == 0 num_outputs = num_outputs // 2 - self.outputs, self.last_layer = self._build_layers( - inputs, num_outputs, options) + try: + self.outputs, self.last_layer = self._build_layers_v2( + _restore_original_dimensions(input_dict, obs_space), + num_outputs, options) + except NotImplementedError: + self.outputs, self.last_layer = self._build_layers( + input_dict["obs"], num_outputs, options) + + # Validate the output shape + try: + out = tf.convert_to_tensor(self.outputs) + shape = out.shape.as_list() + except Exception: + raise ValueError("Output is not a tensor: {}".format(self.outputs)) + else: + if len(shape) != 2 or shape[1] != num_outputs: + raise ValueError( + "Expected output shape of [None, {}], got {}".format( + num_outputs, shape)) + if options.get("free_log_std", False): log_std = tf.get_variable( name="log_std", @@ -68,6 +92,80 @@ class Model(object): self.outputs = tf.concat( [self.outputs, 0.0 * self.outputs + log_std], 1) - def _build_layers(self): - """Builds and returns the output and last layer of the network.""" + def _build_layers(self, inputs, num_outputs, options): + """Builds and returns the output and last layer of the network. + + Deprecated: use _build_layers_v2 instead, which has better support + for dict and tuple spaces. + """ raise NotImplementedError + + def _build_layers_v2(self, input_dict, num_outputs, options): + """Define the layers of a custom model. 
+ + Arguments: + input_dict (dict): Dictionary of input tensors, including "obs", + "prev_actions", "prev_rewards". + num_outputs (int): Output tensor must be of size + [BATCH_SIZE, num_outputs]. + options (dict): Model options. + + Returns: + (outputs, feature_layer): Tensors of size [BATCH_SIZE, num_outputs] + and [BATCH_SIZE, desired_feature_size]. + + When using dict or tuple observation spaces, you can access + the nested sub-observation batches here as well: + + Examples: + >>> print(input_dict) + {'prev_actions': , + 'prev_rewards': , + 'obs': OrderedDict([ + ('sensors', OrderedDict([ + ('front_cam', [ + , + ]), + ('position', ), + ('velocity', )]))])} + """ + raise NotImplementedError + + +def _restore_original_dimensions(input_dict, obs_space): + if hasattr(obs_space, "original_space"): + return dict( + input_dict, + obs=_unpack_obs(input_dict["obs"], obs_space.original_space)) + return input_dict + + +def _unpack_obs(obs, space): + if (isinstance(space, gym.spaces.Dict) + or isinstance(space, gym.spaces.Tuple)): + prep = get_preprocessor(space)(space) + if len(obs.shape) != 2 or obs.shape[1] != prep.shape[0]: + raise ValueError( + "Expected flattened obs shape of [None, {}], got {}".format( + prep.shape[0], obs.shape)) + assert len(prep.preprocessors) == len(space.spaces), \ + (len(prep.preprocessors), len(space.spaces)) + offset = 0 + if isinstance(space, gym.spaces.Tuple): + u = [] + for p, v in zip(prep.preprocessors, space.spaces): + obs_slice = obs[:, offset:offset + p.size] + offset += p.size + u.append( + _unpack_obs( + tf.reshape(obs_slice, [-1] + list(p.shape)), v)) + else: + u = OrderedDict() + for p, (k, v) in zip(prep.preprocessors, space.spaces.items()): + obs_slice = obs[:, offset:offset + p.size] + offset += p.size + u[k] = _unpack_obs( + tf.reshape(obs_slice, [-1] + list(p.shape)), v) + return u + else: + return obs diff --git a/python/ray/rllib/models/preprocessors.py b/python/ray/rllib/models/preprocessors.py index cd72d1922..b7e084062
100644 --- a/python/ray/rllib/models/preprocessors.py +++ b/python/ray/rllib/models/preprocessors.py @@ -16,19 +16,34 @@ class Preprocessor(object): shape (obj): Shape of the preprocessed output. """ - def __init__(self, obs_space, options): + def __init__(self, obs_space, options=None): legacy_patch_shapes(obs_space) self._obs_space = obs_space - self._options = options - self._init() + self._options = options or {} + self.shape = self._init_shape(obs_space, self._options) - def _init(self): - pass + def _init_shape(self, obs_space, options): + """Returns the shape after preprocessing.""" + raise NotImplementedError def transform(self, observation): """Returns the preprocessed observation.""" raise NotImplementedError + @property + def size(self): + return int(np.product(self.shape)) + + @property + def observation_space(self): + obs_space = gym.spaces.Box(-1.0, 1.0, self.shape, dtype=np.float32) + # Stash the unwrapped space so that we can unwrap dict and tuple spaces + # automatically in model.py + if (isinstance(self, TupleFlatteningPreprocessor) + or isinstance(self, DictFlatteningPreprocessor)): + obs_space.original_space = self._obs_space + return obs_space + class GenericPixelPreprocessor(Preprocessor): """Generic image preprocessor. @@ -37,19 +52,20 @@ class GenericPixelPreprocessor(Preprocessor): instead for deepmind-style Atari preprocessing.
""" - def _init(self): - self._grayscale = self._options.get("grayscale") - self._zero_mean = self._options.get("zero_mean") - self._dim = self._options.get("dim") - self._channel_major = self._options.get("channel_major") + def _init_shape(self, obs_space, options): + self._grayscale = options.get("grayscale") + self._zero_mean = options.get("zero_mean") + self._dim = options.get("dim") + self._channel_major = options.get("channel_major") if self._grayscale: - self.shape = (self._dim, self._dim, 1) + shape = (self._dim, self._dim, 1) else: - self.shape = (self._dim, self._dim, 3) + shape = (self._dim, self._dim, 3) # channel_major requires (# in-channels, row dim, col dim) if self._channel_major: - self.shape = self.shape[-1:] + self.shape[:-1] + shape = shape[-1:] + shape[:-1] + return shape def transform(self, observation): """Downsamples images from (210, 160, 3) by the configured factor.""" @@ -75,16 +91,16 @@ class GenericPixelPreprocessor(Preprocessor): class AtariRamPreprocessor(Preprocessor): - def _init(self): - self.shape = (128, ) + def _init_shape(self, obs_space, options): + return (128, ) def transform(self, observation): return (observation - 128) / 128 class OneHotPreprocessor(Preprocessor): - def _init(self): - self.shape = (self._obs_space.n, ) + def _init_shape(self, obs_space, options): + return (self._obs_space.n, ) def transform(self, observation): arr = np.zeros(self._obs_space.n) @@ -93,8 +109,8 @@ class OneHotPreprocessor(Preprocessor): class NoPreprocessor(Preprocessor): - def _init(self): - self.shape = self._obs_space.shape + def _init_shape(self, obs_space, options): + return self._obs_space.shape def transform(self, observation): return observation @@ -103,11 +119,10 @@ class NoPreprocessor(Preprocessor): class TupleFlatteningPreprocessor(Preprocessor): """Preprocesses each tuple element, then flattens it all into a vector. 
- If desired, the vector output can be unpacked via tf.reshape() within a - custom model to handle each component separately. + RLlib models will unpack the flattened output before _build_layers_v2(). """ - def _init(self): + def _init_shape(self, obs_space, options): assert isinstance(self._obs_space, gym.spaces.Tuple) size = 0 self.preprocessors = [] @@ -116,17 +131,43 @@ class TupleFlatteningPreprocessor(Preprocessor): print("Creating sub-preprocessor for", space) preprocessor = get_preprocessor(space)(space, self._options) self.preprocessors.append(preprocessor) - size += np.product(preprocessor.shape) - self.shape = (size, ) + size += preprocessor.size + return (size, ) def transform(self, observation): assert len(observation) == len(self.preprocessors), observation return np.concatenate([ - np.reshape(p.transform(o), [np.product(p.shape)]) + np.reshape(p.transform(o), [p.size]) for (o, p) in zip(observation, self.preprocessors) ]) +class DictFlatteningPreprocessor(Preprocessor): + """Preprocesses each dict value, then flattens it all into a vector. + + RLlib models will unpack the flattened output before _build_layers_v2(). 
+ """ + + def _init_shape(self, obs_space, options): + assert isinstance(self._obs_space, gym.spaces.Dict) + size = 0 + self.preprocessors = [] + for space in self._obs_space.spaces.values(): + print("Creating sub-preprocessor for", space) + preprocessor = get_preprocessor(space)(space, self._options) + self.preprocessors.append(preprocessor) + size += preprocessor.size + return (size, ) + + def transform(self, observation): + assert len(observation) == len(self.preprocessors), \ + (len(observation), len(self.preprocessors)) + return np.concatenate([ + np.reshape(p.transform(o), [p.size]) + for (o, p) in zip(observation.values(), self.preprocessors) + ]) + + def get_preprocessor(space): """Returns an appropriate preprocessor class for the given space.""" @@ -141,6 +182,8 @@ def get_preprocessor(space): preprocessor = AtariRamPreprocessor elif isinstance(space, gym.spaces.Tuple): preprocessor = TupleFlatteningPreprocessor + elif isinstance(space, gym.spaces.Dict): + preprocessor = DictFlatteningPreprocessor else: preprocessor = NoPreprocessor diff --git a/python/ray/rllib/models/visionnet.py b/python/ray/rllib/models/visionnet.py index 902addb6a..4105af7dd 100644 --- a/python/ray/rllib/models/visionnet.py +++ b/python/ray/rllib/models/visionnet.py @@ -12,7 +12,8 @@ from ray.rllib.models.misc import get_activation_fn, flatten class VisionNetwork(Model): """Generic vision network.""" - def _build_layers(self, inputs, num_outputs, options): + def _build_layers_v2(self, input_dict, num_outputs, options): + inputs = input_dict["obs"] filters = options.get("conv_filters") if not filters: filters = get_filter_config(options) diff --git a/python/ray/rllib/test/test_catalog.py b/python/ray/rllib/test/test_catalog.py index 62468e123..852a02fc4 100644 --- a/python/ray/rllib/test/test_catalog.py +++ b/python/ray/rllib/test/test_catalog.py @@ -15,16 +15,18 @@ from ray.rllib.models.visionnet import VisionNetwork class CustomPreprocessor(Preprocessor): - pass + def 
_init_shape(self, obs_space, options): + return None class CustomPreprocessor2(Preprocessor): - pass + def _init_shape(self, obs_space, options): + return None class CustomModel(Model): def _build_layers(self, *args): - return None, None + return tf.constant([[0] * 5]), None class ModelCatalogTest(unittest.TestCase): @@ -69,20 +71,24 @@ class ModelCatalogTest(unittest.TestCase): ray.init() with tf.variable_scope("test1"): - p1 = ModelCatalog.get_model( - np.zeros((10, 3), dtype=np.float32), 5, {}) + p1 = ModelCatalog.get_model({ + "obs": np.zeros((10, 3), dtype=np.float32) + }, Box(0, 1, shape=(3, ), dtype=np.float32), 5, {}) self.assertEqual(type(p1), FullyConnectedNetwork) with tf.variable_scope("test2"): - p2 = ModelCatalog.get_model( - np.zeros((10, 84, 84, 3), dtype=np.float32), 5, {}) + p2 = ModelCatalog.get_model({ + "obs": np.zeros((10, 84, 84, 3), dtype=np.float32) + }, Box(0, 1, shape=(84, 84, 3), dtype=np.float32), 5, {}) self.assertEqual(type(p2), VisionNetwork) def testCustomModel(self): ray.init() ModelCatalog.register_custom_model("foo", CustomModel) - p1 = ModelCatalog.get_model( - tf.constant([1, 2, 3]), 5, {"custom_model": "foo"}) + p1 = ModelCatalog.get_model({ + "obs": tf.constant([1, 2, 3]) + }, Box(0, 1, shape=(3, ), dtype=np.float32), 5, + {"custom_model": "foo"}) self.assertEqual(str(type(p1)), str(CustomModel)) diff --git a/python/ray/rllib/test/test_multi_agent_env.py b/python/ray/rllib/test/test_multi_agent_env.py index 493b338cf..31f3103f5 100644 --- a/python/ray/rllib/test/test_multi_agent_env.py +++ b/python/ray/rllib/test/test_multi_agent_env.py @@ -314,6 +314,8 @@ class TestMultiAgentEnv(unittest.TestCase): def compute_actions(self, obs_batch, state_batches, + prev_action_batch=None, + prev_reward_batch=None, is_training=False, episodes=None): return [0] * len(obs_batch), [[h] * len(obs_batch)], {} @@ -337,6 +339,8 @@ class TestMultiAgentEnv(unittest.TestCase): def compute_actions(self, obs_batch, state_batches, + 
prev_action_batch=None, + prev_reward_batch=None, is_training=False, episodes=None): # Pretend we did a model-based rollout and want to return diff --git a/python/ray/rllib/test/test_nested_spaces.py b/python/ray/rllib/test/test_nested_spaces.py new file mode 100644 index 000000000..f7f5f5981 --- /dev/null +++ b/python/ray/rllib/test/test_nested_spaces.py @@ -0,0 +1,249 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pickle + +from gym import spaces +from gym.envs.registration import EnvSpec +import gym +import tensorflow.contrib.slim as slim +import tensorflow as tf +import unittest + +import ray +from ray.rllib.agents.pg import PGAgent +from ray.rllib.env.async_vector_env import AsyncVectorEnv +from ray.rllib.env.vector_env import VectorEnv +from ray.rllib.models import ModelCatalog +from ray.rllib.models.model import Model +from ray.rllib.test.test_serving_env import SimpleServing +from ray.tune.registry import register_env + +DICT_SPACE = spaces.Dict({ + "sensors": spaces.Dict({ + "position": spaces.Box(low=-100, high=100, shape=(3, )), + "velocity": spaces.Box(low=-1, high=1, shape=(3, )), + "front_cam": spaces.Tuple( + (spaces.Box(low=0, high=1, shape=(10, 10, 3)), + spaces.Box(low=0, high=1, shape=(10, 10, 3)))), + "rear_cam": spaces.Box(low=0, high=1, shape=(10, 10, 3)), + }), + "inner_state": spaces.Dict({ + "charge": spaces.Discrete(100), + "job_status": spaces.Dict({ + "task": spaces.Discrete(5), + "progress": spaces.Box(low=0, high=100, shape=()), + }) + }) +}) + +DICT_SAMPLES = [DICT_SPACE.sample() for _ in range(10)] + +TUPLE_SPACE = spaces.Tuple([ + spaces.Box(low=-100, high=100, shape=(3, )), + spaces.Tuple((spaces.Box(low=0, high=1, shape=(10, 10, 3)), + spaces.Box(low=0, high=1, shape=(10, 10, 3)))), + spaces.Discrete(5), +]) + +TUPLE_SAMPLES = [TUPLE_SPACE.sample() for _ in range(10)] + + +def one_hot(i, n): + out = [0.0] * n + out[i] = 1.0 + return out + + +class 
NestedDictEnv(gym.Env): + def __init__(self): + self.action_space = spaces.Discrete(2) + self.observation_space = DICT_SPACE + self._spec = EnvSpec("NestedDictEnv-v0") + self.steps = 0 + + def reset(self): + self.steps = 0 + return DICT_SAMPLES[0] + + def step(self, action): + self.steps += 1 + return DICT_SAMPLES[self.steps], 1, self.steps >= 5, {} + + +class NestedTupleEnv(gym.Env): + def __init__(self): + self.action_space = spaces.Discrete(2) + self.observation_space = TUPLE_SPACE + self._spec = EnvSpec("NestedTupleEnv-v0") + self.steps = 0 + + def reset(self): + self.steps = 0 + return TUPLE_SAMPLES[0] + + def step(self, action): + self.steps += 1 + return TUPLE_SAMPLES[self.steps], 1, self.steps >= 5, {} + + +class InvalidModel(Model): + def _build_layers_v2(self, input_dict, num_outputs, options): + return "not", "valid" + + +class DictSpyModel(Model): + capture_index = 0 + + def _build_layers_v2(self, input_dict, num_outputs, options): + def spy(pos, front_cam, task): + # TF runs this function in an isolated context, so we have to use + # redis to communicate back to our suite + ray.experimental.internal_kv._internal_kv_put( + "d_spy_in_{}".format(DictSpyModel.capture_index), + pickle.dumps((pos, front_cam, task))) + DictSpyModel.capture_index += 1 + return 0 + + spy_fn = tf.py_func( + spy, [ + input_dict["obs"]["sensors"]["position"], + input_dict["obs"]["sensors"]["front_cam"][0], + input_dict["obs"]["inner_state"]["job_status"]["task"] + ], + tf.int64, + stateful=True) + + with tf.control_dependencies([spy_fn]): + output = slim.fully_connected( + input_dict["obs"]["sensors"]["position"], num_outputs) + return output, output + + +class TupleSpyModel(Model): + capture_index = 0 + + def _build_layers_v2(self, input_dict, num_outputs, options): + def spy(pos, cam, task): + # TF runs this function in an isolated context, so we have to use + # redis to communicate back to our suite + ray.experimental.internal_kv._internal_kv_put( + 
"t_spy_in_{}".format(TupleSpyModel.capture_index), + pickle.dumps((pos, cam, task))) + TupleSpyModel.capture_index += 1 + return 0 + + spy_fn = tf.py_func( + spy, [ + input_dict["obs"][0], + input_dict["obs"][1][0], + input_dict["obs"][2], + ], + tf.int64, + stateful=True) + + with tf.control_dependencies([spy_fn]): + output = slim.fully_connected(input_dict["obs"][0], num_outputs) + return output, output + + +class NestedSpacesTest(unittest.TestCase): + def testInvalidModel(self): + ModelCatalog.register_custom_model("invalid", InvalidModel) + self.assertRaises(ValueError, lambda: PGAgent( + env="CartPole-v0", config={ + "model": { + "custom_model": "invalid", + }, + })) + + def doTestNestedDict(self, make_env): + ModelCatalog.register_custom_model("composite", DictSpyModel) + register_env("nested", make_env) + pg = PGAgent( + env="nested", + config={ + "num_workers": 0, + "sample_batch_size": 5, + "model": { + "custom_model": "composite", + }, + }) + pg.train() + + # Check that the model sees the correct reconstructed observations + for i in range(4): + seen = pickle.loads( + ray.experimental.internal_kv._internal_kv_get( + "d_spy_in_{}".format(i))) + pos_i = DICT_SAMPLES[i]["sensors"]["position"].tolist() + cam_i = DICT_SAMPLES[i]["sensors"]["front_cam"][0].tolist() + task_i = one_hot( + DICT_SAMPLES[i]["inner_state"]["job_status"]["task"], 5) + self.assertEqual(seen[0][0].tolist(), pos_i) + self.assertEqual(seen[1][0].tolist(), cam_i) + self.assertEqual(seen[2][0].tolist(), task_i) + + def doTestNestedTuple(self, make_env): + ModelCatalog.register_custom_model("composite2", TupleSpyModel) + register_env("nested2", make_env) + pg = PGAgent( + env="nested2", + config={ + "num_workers": 0, + "sample_batch_size": 5, + "model": { + "custom_model": "composite2", + }, + }) + pg.train() + + # Check that the model sees the correct reconstructed observations + for i in range(4): + seen = pickle.loads( + ray.experimental.internal_kv._internal_kv_get( + 
"t_spy_in_{}".format(i))) + pos_i = TUPLE_SAMPLES[i][0].tolist() + cam_i = TUPLE_SAMPLES[i][1][0].tolist() + task_i = one_hot(TUPLE_SAMPLES[i][2], 5) + self.assertEqual(seen[0][0].tolist(), pos_i) + self.assertEqual(seen[1][0].tolist(), cam_i) + self.assertEqual(seen[2][0].tolist(), task_i) + + def testNestedDictGym(self): + self.doTestNestedDict(lambda _: NestedDictEnv()) + + def testNestedDictVector(self): + self.doTestNestedDict( + lambda _: VectorEnv.wrap(lambda i: NestedDictEnv())) + + def testNestedDictServing(self): + self.doTestNestedDict(lambda _: SimpleServing(NestedDictEnv())) + + def testNestedDictAsync(self): + self.assertRaisesRegexp( + ValueError, "Found raw Dict space.*", + lambda: self.doTestNestedDict( + lambda _: AsyncVectorEnv.wrap_async(NestedDictEnv()))) + + def testNestedTupleGym(self): + self.doTestNestedTuple(lambda _: NestedTupleEnv()) + + def testNestedTupleVector(self): + self.doTestNestedTuple( + lambda _: VectorEnv.wrap(lambda i: NestedTupleEnv())) + + def testNestedTupleServing(self): + self.doTestNestedTuple(lambda _: SimpleServing(NestedTupleEnv())) + + def testNestedTupleAsync(self): + self.assertRaisesRegexp( + ValueError, "Found raw Tuple space.*", + lambda: self.doTestNestedTuple( + lambda _: AsyncVectorEnv.wrap_async(NestedTupleEnv()))) + + +if __name__ == "__main__": + ray.init(num_cpus=5) + unittest.main(verbosity=2) diff --git a/python/ray/rllib/test/test_policy_evaluator.py b/python/ray/rllib/test/test_policy_evaluator.py index c4c2baf6e..b90613603 100644 --- a/python/ray/rllib/test/test_policy_evaluator.py +++ b/python/ray/rllib/test/test_policy_evaluator.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import gym +import numpy as np import time import unittest @@ -21,6 +22,8 @@ class MockPolicyGraph(PolicyGraph): def compute_actions(self, obs_batch, state_batches, + prev_action_batch=None, + prev_reward_batch=None, is_training=False, episodes=None): return [0] * len(obs_batch), [], 
{} @@ -33,6 +36,8 @@ class BadPolicyGraph(PolicyGraph): def compute_actions(self, obs_batch, state_batches, + prev_action_batch=None, + prev_reward_batch=None, is_training=False, episodes=None): raise Exception("intentional error") @@ -107,8 +112,23 @@ class TestPolicyEvaluator(unittest.TestCase): env_creator=lambda _: gym.make("CartPole-v0"), policy_graph=MockPolicyGraph) batch = ev.sample() - for key in ["obs", "actions", "rewards", "dones", "advantages"]: + for key in [ + "obs", "actions", "rewards", "dones", "advantages", + "prev_rewards", "prev_actions" + ]: self.assertIn(key, batch) + + def to_prev(vec): + out = np.zeros_like(vec) + for i, v in enumerate(vec): + if i + 1 < len(out) and not batch["dones"][i]: + out[i + 1] = v + return out.tolist() + + self.assertEqual(batch["prev_rewards"].tolist(), + to_prev(batch["rewards"])) + self.assertEqual(batch["prev_actions"].tolist(), + to_prev(batch["actions"])) self.assertGreater(batch["advantages"][0], 1) def testGlobalVarsUpdate(self): diff --git a/python/ray/rllib/test/test_supported_spaces.py b/python/ray/rllib/test/test_supported_spaces.py index 20ef872ae..4f1aee012 100644 --- a/python/ray/rllib/test/test_supported_spaces.py +++ b/python/ray/rllib/test/test_supported_spaces.py @@ -2,7 +2,7 @@ import unittest import traceback import gym -from gym.spaces import Box, Discrete, Tuple +from gym.spaces import Box, Discrete, Tuple, Dict from gym.envs.registration import EnvSpec import numpy as np import sys @@ -14,33 +14,28 @@ from ray.tune.registry import register_env ACTION_SPACES_TO_TEST = { "discrete": Discrete(5), - "vector": Box(0.0, 1.0, (5, ), dtype=np.float32), - "simple_tuple": Tuple([ - Box(0.0, 1.0, (5, ), dtype=np.float32), - Box(0.0, 1.0, (5, ), dtype=np.float32) - ]), - "mixed_tuple": Tuple( + "vector": Box(-1.0, 1.0, (5, ), dtype=np.float32), + "tuple": Tuple( [Discrete(2), Discrete(3), - Box(0.0, 1.0, (5, ), dtype=np.float32)]), + Box(-1.0, 1.0, (5, ), dtype=np.float32)]), } 
OBSERVATION_SPACES_TO_TEST = { "discrete": Discrete(5), - "vector": Box(0.0, 1.0, (5, ), dtype=np.float32), - "image": Box(0.0, 1.0, (84, 84, 1), dtype=np.float32), - "atari": Box(0.0, 1.0, (210, 160, 3), dtype=np.float32), - "atari_ram": Box(0.0, 1.0, (128, ), dtype=np.float32), - "simple_tuple": Tuple([ - Box(0.0, 1.0, (5, ), dtype=np.float32), - Box(0.0, 1.0, (5, ), dtype=np.float32) - ]), - "mixed_tuple": Tuple( - [Discrete(10), Box(0.0, 1.0, (5, ), dtype=np.float32)]), + "vector": Box(-1.0, 1.0, (5, ), dtype=np.float32), + "image": Box(-1.0, 1.0, (84, 84, 1), dtype=np.float32), + "atari": Box(-1.0, 1.0, (210, 160, 3), dtype=np.float32), + "tuple": Tuple([Discrete(10), + Box(-1.0, 1.0, (5, ), dtype=np.float32)]), + "dict": Dict({ + "task": Discrete(10), + "position": Box(-1.0, 1.0, (5, ), dtype=np.float32), + }), } -def make_stub_env(action_space, obs_space): +def make_stub_env(action_space, obs_space, check_action_bounds): class StubEnv(gym.Env): def __init__(self): self.action_space = action_space @@ -52,16 +47,23 @@ def make_stub_env(action_space, obs_space): return sample def step(self, action): + if check_action_bounds and not self.action_space.contains(action): + raise ValueError("Illegal action for {}: {}".format( + self.action_space, action)) + if (isinstance(self.action_space, Tuple) + and len(action) != len(self.action_space.spaces)): + raise ValueError("Illegal action for {}: {}".format( + self.action_space, action)) return self.observation_space.sample(), 1, True, {} return StubEnv -def check_support(alg, config, stats): +def check_support(alg, config, stats, check_bounds=False): for a_name, action_space in ACTION_SPACES_TO_TEST.items(): for o_name, obs_space in OBSERVATION_SPACES_TO_TEST.items(): print("=== Testing", alg, action_space, obs_space, "===") - stub_env = make_stub_env(action_space, obs_space) + stub_env = make_stub_env(action_space, obs_space, check_bounds) register_env("stub_env", lambda c: stub_env()) stat = "ok" a = None @@ -105,8 
+107,13 @@ class ModelSupportedSpaces(unittest.TestCase): "num_sgd_iter": 1, "train_batch_size": 10, "sample_batch_size": 10, - "sgd_minibatch_size": 1 - }, stats) + "sgd_minibatch_size": 1, + "model": { + "squash_to_range": True + }, + }, + stats, + check_bounds=True) check_support( "ES", { "num_workers": 1, diff --git a/python/ray/rllib/utils/tf_run_builder.py b/python/ray/rllib/utils/tf_run_builder.py index 030642ae5..2ea3ba7b8 100644 --- a/python/ray/rllib/utils/tf_run_builder.py +++ b/python/ray/rllib/utils/tf_run_builder.py @@ -26,7 +26,8 @@ class TFRunBuilder(object): def add_feed_dict(self, feed_dict): assert not self._executed for k in feed_dict: - assert k not in self.feed_dict + if k in self.feed_dict: + raise ValueError("Key added twice: {}".format(k)) self.feed_dict.update(feed_dict) def add_fetches(self, fetches): diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 0a6db03b9..821fefa7b 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -246,6 +246,9 @@ docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_policy_evaluator.py +docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/test/test_nested_spaces.py + docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_serving_env.py @@ -314,6 +317,9 @@ docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/examples/cartpole_lstm.py --run=IMPALA --stop=100 +docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/examples/cartpole_lstm.py --stop=200 
--use-prev-action-reward + docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/experimental/sgd/test_sgd.py --num-iters=2
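
Reviewer note: the flatten-then-unpack round trip that this change introduces (the flattening preprocessors on the way in, `_unpack_obs` on the way out) can be illustrated with a small pure-Python sketch. This is not RLlib code and uses neither gym nor TensorFlow. Describing a space as a nested dict/tuple whose leaves are plain integer vector lengths is an assumption made for illustration, and `flat_size`, `flatten`, and `unpack` are hypothetical helper names.

```python
from collections import OrderedDict


def flat_size(space):
    """Number of scalars in a (hypothetical) nested space description."""
    if isinstance(space, dict):
        return sum(flat_size(s) for s in space.values())
    if isinstance(space, tuple):
        return sum(flat_size(s) for s in space)
    return space  # leaf: an int giving the vector length


def flatten(obs, space):
    """Concatenate the leaves of one observation into a flat list."""
    if isinstance(space, dict):
        return [x for k, s in space.items() for x in flatten(obs[k], s)]
    if isinstance(space, tuple):
        return [x for o, s in zip(obs, space) for x in flatten(o, s)]
    assert len(obs) == space, (len(obs), space)
    return list(obs)


def unpack(batch, space):
    """Rebuild the nested structure from a batch of flat rows.

    Mirrors the offset-and-slice idea: each child consumes the next
    flat_size(child) columns of every row.
    """
    if not isinstance(space, (dict, tuple)):
        return batch  # leaf: rows already have the right width
    children = space.items() if isinstance(space, dict) else enumerate(space)
    out, offset = OrderedDict(), 0
    for key, sub in children:
        size = flat_size(sub)
        out[key] = unpack([row[offset:offset + size] for row in batch], sub)
        offset += size
    return out if isinstance(space, dict) else list(out.values())


# Hypothetical space: a 3-vector, a tuple of two 2-vectors, and a 1-vector.
SPACE = OrderedDict([("position", 3), ("cams", (2, 2)), ("task", 1)])
obs = {
    "position": [1.0, 2.0, 3.0],
    "cams": ([4.0, 5.0], [6.0, 7.0]),
    "task": [8.0],
}
flat = flatten(obs, SPACE)      # one flat row of 8 scalars
nested = unpack([flat], SPACE)  # batch of size 1, structure restored
```

The sketch keeps leaves one-dimensional, so it needs no reshape step. The code in this diff additionally reshapes each slice back to the sub-preprocessor's shape before recursing.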