diff --git a/doc/source/rllib-env.rst b/doc/source/rllib-env.rst index 161b4a3a1..3c2e13fda 100644 --- a/doc/source/rllib-env.rst +++ b/doc/source/rllib-env.rst @@ -26,6 +26,26 @@ In the high-level agent APIs, environments are identified with string names. By while True: print(trainer.train()) +Configuring Environments +------------------------ + +In the above example, note that the ``env_creator`` function takes in an ``env_config`` object. This is a dict containing options passed in through your agent. You can also access ``env_config.worker_index`` and ``env_config.vector_index`` to get the worker id and env id within the worker (if ``num_envs_per_worker > 0``). This can be useful if you want to train over an ensemble of different environments, for example: + +.. code-block:: python + + class MultiEnv(gym.Env): + def __init__(self, env_config): + # pick actual env based on worker and env indexes + self.env = gym.make( + choose_env_for(env_config.worker_index, env_config.vector_index)) + self.action_space = self.env.action_space + self.observation_space = self.env.observation_space + def reset(self): + return self.env.reset() + def step(self, action): + return self.env.step(action) + + register_env("multienv", lambda config: MultiEnv(config)) OpenAI Gym ---------- diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index 9f8b0bf1f..dd1398a05 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -108,7 +108,43 @@ Here is an example of the basic usage: checkpoint = agent.save() print("checkpoint saved at", checkpoint) -All RLlib agents implement the tune Trainable API, which means they support incremental training and checkpointing. This enables them to be easily used in experiments with Ray Tune. +.. note:: + + It's recommended that you run RLlib agents with `Tune `__, for easy experiment management and visualization of results. Just set ``"run": AGENT_NAME, "env": ENV_NAME`` in the experiment config. + +All RLlib agents are compatible with the `Tune API `__. This enables them to be easily used in experiments with `Tune `__. For example, the following code performs a simple hyperparam sweep of PPO: + +.. code-block:: python + + import ray + import ray.tune as tune + + ray.init() + tune.run_experiments({ + "my_experiment": { + "run": "PPO", + "env": "CartPole-v0", + "stop": {"episode_reward_mean": 200}, + "config": { + "num_workers": 1, + "sgd_stepsize": tune.grid_search([0.01, 0.001, 0.0001]), + }, + }, + }) + +Tune will schedule the trials to run in parallel on your Ray cluster: + +:: + + == Status == + Using FIFO scheduling algorithm. + Resources requested: 4/4 CPUs, 0/0 GPUs + Result logdir: /home/eric/ray_results/my_experiment + PENDING trials: + - PPO_CartPole-v0_2_sgd_stepsize=0.0001: PENDING + RUNNING trials: + - PPO_CartPole-v0_0_sgd_stepsize=0.01: RUNNING [pid=21940], 16 s, 4013 ts, 22 rew + - PPO_CartPole-v0_1_sgd_stepsize=0.001: RUNNING [pid=21942], 27 s, 8111 ts, 54.7 rew Accessing Global State ~~~~~~~~~~~~~~~~~~~~~~ diff --git a/python/ray/rllib/env/async_vector_env.py b/python/ray/rllib/env/async_vector_env.py index ba0d63c12..952eb1e09 100644 --- a/python/ray/rllib/env/async_vector_env.py +++ b/python/ray/rllib/env/async_vector_env.py @@ -251,7 +251,7 @@ class _MultiAgentEnvToAsync(AsyncVectorEnv): self.num_envs = num_envs self.dones = set() while len(self.envs) < self.num_envs: - self.envs.append(self.make_env()) + self.envs.append(self.make_env(len(self.envs))) for env in self.envs: assert isinstance(env, MultiAgentEnv) self.env_states = [_MultiAgentEnvState(env) for env in self.envs] diff --git a/python/ray/rllib/env/env_context.py b/python/ray/rllib/env/env_context.py index e3885b0d3..dcf8372aa 100644 --- a/python/ray/rllib/env/env_context.py +++ b/python/ray/rllib/env/env_context.py @@ -15,8 +15,15 @@ class EnvContext(dict): Attributes: worker_index (int): When there are multiple workers created, this uniquely identifies the worker the env is created in. + vector_index (int): When there are multiple envs per worker, this + uniquely identifies the env index within the worker. """ - def __init__(self, env_config, worker_index): + def __init__(self, env_config, worker_index, vector_index=0): dict.__init__(self, env_config) self.worker_index = worker_index + self.vector_index = vector_index + + def with_vector_index(self, vector_index): + return EnvContext( + self, worker_index=self.worker_index, vector_index=vector_index) diff --git a/python/ray/rllib/env/vector_env.py b/python/ray/rllib/env/vector_env.py index 28791f552..fa4f60321 100644 --- a/python/ray/rllib/env/vector_env.py +++ b/python/ray/rllib/env/vector_env.py @@ -2,9 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import queue -import threading - class VectorEnv(object): """An environment that supports batch evaluation. @@ -70,20 +67,14 @@ class _VectorizedGymEnv(VectorEnv): self.make_env = make_env self.envs = existing_envs self.num_envs = num_envs - if make_env and num_envs > 1: - self.resetter = _AsyncResetter(make_env, int(self.num_envs**0.5)) - else: - self.resetter = _SimpleResetter(make_env) while len(self.envs) < self.num_envs: - self.envs.append(self.make_env()) + self.envs.append(self.make_env(len(self.envs))) def vector_reset(self): return [e.reset() for e in self.envs] def reset_at(self, index): - new_obs, new_env = self.resetter.trade_for_resetted(self.envs[index]) - self.envs[index] = new_env - return new_obs + return self.envs[index].reset() def vector_step(self, actions): obs_batch, rew_batch, done_batch, info_batch = [], [], [], [] @@ -97,42 +88,3 @@ class _VectorizedGymEnv(VectorEnv): def get_unwrapped(self): return self.envs[0] - - -class _AsyncResetter(threading.Thread): - """Does env reset asynchronously in the background. - - This is useful since resetting an env can be 100x slower than stepping.""" - - def __init__(self, make_env, pool_size): - threading.Thread.__init__(self) - self.make_env = make_env - self.pool_size = 0 - self.to_reset = queue.Queue() - self.resetted = queue.Queue() - self.daemon = True - self.pool_size = pool_size - while self.resetted.qsize() < self.pool_size: - env = self.make_env() - obs = env.reset() - self.resetted.put((obs, env)) - self.start() - - def run(self): - while True: - env = self.to_reset.get() - obs = env.reset() - self.resetted.put((obs, env)) - - def trade_for_resetted(self, env): - self.to_reset.put(env) - new_obs, new_env = self.resetted.get(timeout=30) - return new_obs, new_env - - -class _SimpleResetter(object): - def __init__(self, make_env): - pass - - def trade_for_resetted(self, env): - return env.reset(), env diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index 4cc13852d..82b9569e0 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -190,8 +190,9 @@ class PolicyEvaluator(EvaluatorInterface): self.env = wrap(self.env) - def make_env(): - return wrap(env_creator(env_context)) + def make_env(vector_index): + return wrap( + env_creator(env_context.with_vector_index(vector_index))) self.tf_sess = None policy_dict = _validate_and_canonicalize(policy_graph, self.env) diff --git a/python/ray/rllib/test/test_multi_agent_env.py b/python/ray/rllib/test/test_multi_agent_env.py index c6ce25ed4..3150985db 100644 --- a/python/ray/rllib/test/test_multi_agent_env.py +++ b/python/ray/rllib/test/test_multi_agent_env.py @@ -160,7 +160,7 @@ class TestMultiAgentEnv(unittest.TestCase): self.assertEqual(done["__all__"], True) def testVectorizeBasic(self): - env = _MultiAgentEnvToAsync(lambda: BasicMultiAgent(2), [], 2) + env = _MultiAgentEnvToAsync(lambda v: BasicMultiAgent(2), [], 2) obs, rew, dones, _, _ = env.poll() self.assertEqual(obs, {0: {0: 0, 1: 0}, 1: {0: 0, 1: 0}}) self.assertEqual(rew, {0: {0: None, 1: None}, 1: {0: None, 1: None}}) @@ -236,7 +236,7 @@ class TestMultiAgentEnv(unittest.TestCase): }) def testVectorizeRoundRobin(self): - env = _MultiAgentEnvToAsync(lambda: RoundRobinMultiAgent(2), [], 2) + env = _MultiAgentEnvToAsync(lambda v: RoundRobinMultiAgent(2), [], 2) obs, rew, dones, _, _ = env.poll() self.assertEqual(obs, {0: {0: 0}, 1: {0: 0}}) self.assertEqual(rew, {0: {0: None}, 1: {0: None}}) diff --git a/python/ray/rllib/test/test_policy_evaluator.py b/python/ray/rllib/test/test_policy_evaluator.py index 472625fb3..0fb179781 100644 --- a/python/ray/rllib/test/test_policy_evaluator.py +++ b/python/ray/rllib/test/test_policy_evaluator.py @@ -33,8 +33,9 @@ class BadPolicyGraph(PolicyGraph): class MockEnv(gym.Env): - def __init__(self, episode_length): + def __init__(self, episode_length, config=None): self.episode_length = episode_length + self.config = config self.i = 0 self.observation_space = gym.spaces.Discrete(1) self.action_space = gym.spaces.Discrete(2) @@ -150,7 +151,7 @@ class TestPolicyEvaluator(unittest.TestCase): def testAutoVectorization(self): ev = PolicyEvaluator( - env_creator=lambda _: MockEnv(episode_length=20), + env_creator=lambda cfg: MockEnv(episode_length=20, config=cfg), policy_graph=MockPolicyGraph, batch_mode="truncate_episodes", batch_steps=16, @@ -165,6 +166,11 @@ class TestPolicyEvaluator(unittest.TestCase): self.assertEqual(batch.count, 16) result = collect_metrics(ev, []) self.assertEqual(result.episodes_total, 8) + indices = [] + for env in ev.async_env.vector_env.envs: + self.assertEqual(env.unwrapped.config.worker_index, 0) + indices.append(env.unwrapped.config.vector_index) + self.assertEqual(indices, [0, 1, 2, 3, 4, 5, 6, 7]) def testBatchDivisibilityCheck(self): self.assertRaises(