diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index 687f5b53d..91cfb3f56 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -301,7 +301,9 @@ Approach 1: Use the Agent API and update the environment between calls to ``trai phase = 1 else: phase = 0 - agent.optimizer.foreach_evaluator(lambda ev: ev.env.set_phase(phase)) + agent.optimizer.foreach_evaluator( + lambda ev: ev.foreach_env( + lambda env: env.set_phase(phase))) ray.init() tune.run_experiments({ @@ -335,7 +337,9 @@ Approach 2: Use the callbacks API to update the environment on new training resu else: phase = 0 agent = info["agent"] - agent.optimizer.foreach_evaluator(lambda ev: ev.env.set_phase(phase)) + agent.optimizer.foreach_evaluator( + lambda ev: ev.foreach_env( + lambda env: env.set_phase(phase))) ray.init() tune.run_experiments({ diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index 5902e63ec..4c90a905e 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -494,6 +494,16 @@ class PolicyEvaluator(EvaluatorInterface): self.policy_map[DEFAULT_POLICY_ID].compute_apply(samples)) return grad_fetch + @DeveloperAPI + def foreach_env(self, func): + """Apply the given function to each underlying env instance.""" + + envs = self.async_env.get_unwrapped() + if not envs: + return [func(self.async_env)] + else: + return [func(e) for e in envs] + @DeveloperAPI def get_policy(self, policy_id=DEFAULT_POLICY_ID): """Return policy graph for the specified id, or None. diff --git a/python/ray/rllib/test/test_policy_evaluator.py b/python/ray/rllib/test/test_policy_evaluator.py index adff6aa91..a71644d26 100644 --- a/python/ray/rllib/test/test_policy_evaluator.py +++ b/python/ray/rllib/test/test_policy_evaluator.py @@ -209,16 +209,21 @@ class TestPolicyEvaluator(unittest.TestCase): def testQueryEvaluators(self): register_env("test", lambda _: gym.make("CartPole-v0")) pg = PGAgent( - env="test", config={ + env="test", + config={ "num_workers": 2, - "sample_batch_size": 5 + "sample_batch_size": 5, + "num_envs_per_worker": 2, }) results = pg.optimizer.foreach_evaluator( lambda ev: ev.sample_batch_size) results2 = pg.optimizer.foreach_evaluator_with_index( lambda ev, i: (i, ev.sample_batch_size)) - self.assertEqual(results, [5, 5, 5]) - self.assertEqual(results2, [(0, 5), (1, 5), (2, 5)]) + results3 = pg.optimizer.foreach_evaluator( + lambda ev: ev.foreach_env(lambda env: 1)) + self.assertEqual(results, [10, 10, 10]) + self.assertEqual(results2, [(0, 10), (1, 10), (2, 10)]) + self.assertEqual(results3, [[1, 1], [1, 1], [1, 1]]) def testRewardClipping(self): # clipping on