From 8ea1bc5ff9faa2cdea2074a1f84f3830f2c5a082 Mon Sep 17 00:00:00 2001 From: Sven Mika Date: Mon, 12 Oct 2020 22:49:11 +0200 Subject: [PATCH] [RLlib] Allow for more than 2^31 policy timesteps. (#11301) --- rllib/BUILD | 7 +++ rllib/examples/env/random_env.py | 3 +- rllib/policy/dynamic_tf_policy.py | 2 +- rllib/policy/eager_tf_policy.py | 10 ++-- rllib/policy/tf_policy.py | 2 +- rllib/tests/test_timesteps.py | 54 +++++++++++++++++++ rllib/utils/exploration/epsilon_greedy.py | 6 ++- rllib/utils/exploration/gaussian_noise.py | 6 ++- .../utils/exploration/stochastic_sampling.py | 6 ++- rllib/utils/framework.py | 9 +++- rllib/utils/schedules/piecewise_schedule.py | 5 +- 11 files changed, 96 insertions(+), 14 deletions(-) create mode 100644 rllib/tests/test_timesteps.py diff --git a/rllib/BUILD b/rllib/BUILD index a1afef2f6..2db5c73b9 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -1431,6 +1431,13 @@ py_test( args = ["TestSupportedSpacesEvolutionAlgos"] ) +py_test( + name = "tests/test_timesteps", + tags = ["tests_dir", "tests_dir_T"], + size = "small", + srcs = ["tests/test_timesteps.py"] +) + # -------------------------------------------------------------------- # examples/ directory # diff --git a/rllib/examples/env/random_env.py b/rllib/examples/env/random_env.py index 7f8a2ef18..ca650c4f2 100644 --- a/rllib/examples/env/random_env.py +++ b/rllib/examples/env/random_env.py @@ -46,12 +46,13 @@ class RandomEnv(gym.Env): self.action_space, action)) self.steps += 1 + done = False # We are done as per our max-episode-len. if self.max_episode_len is not None and \ self.steps >= self.max_episode_len: done = True # Max not reached yet -> Sample done via p_done. - else: + elif self.p_done > 0.0: done = bool( np.random.choice( [True, False], p=[self.p_done, 1.0 - self.p_done])) diff --git a/rllib/policy/dynamic_tf_policy.py b/rllib/policy/dynamic_tf_policy.py index dcec4ba33..7312a4602 100644 --- a/rllib/policy/dynamic_tf_policy.py +++ b/rllib/policy/dynamic_tf_policy.py @@ -167,7 +167,7 @@ class DynamicTFPolicy(TFPolicy): tf.float32, [None], name="prev_reward") explore = tf1.placeholder_with_default( True, (), name="is_exploring") - timestep = tf1.placeholder(tf.int32, (), name="timestep") + timestep = tf1.placeholder(tf.int64, (), name="timestep") self._input_dict = { SampleBatch.CUR_OBS: obs, diff --git a/rllib/policy/eager_tf_policy.py b/rllib/policy/eager_tf_policy.py index 2c90c959a..71dfae5fc 100644 --- a/rllib/policy/eager_tf_policy.py +++ b/rllib/policy/eager_tf_policy.py @@ -20,7 +20,7 @@ tf1, tf, tfv = try_import_tf() logger = logging.getLogger(__name__) -def _convert_to_tf(x): +def _convert_to_tf(x, dtype=None): if isinstance(x, SampleBatch): x = {k: v for k, v in x.items() if k != SampleBatch.INFOS} return tf.nest.map_structure(_convert_to_tf, x) @@ -28,8 +28,9 @@ def _convert_to_tf(x): return x if x is not None: + d = dtype x = tf.nest.map_structure( - lambda f: tf.convert_to_tensor(f) if f is not None else None, x) + lambda f: tf.convert_to_tensor(f, d) if f is not None else None, x) return x @@ -52,9 +53,10 @@ def convert_eager_inputs(func): def _func(*args, **kwargs): if tf.executing_eagerly(): args = [_convert_to_tf(x) for x in args] - # TODO(gehring): find a way to remove specific hacks + # TODO: (sven) find a way to remove key-specific hacks. kwargs = { - k: _convert_to_tf(v) + k: _convert_to_tf( + v, dtype=tf.int64 if k == "timestep" else None) for k, v in kwargs.items() if k not in {"info_batch", "episodes"} } diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py index 86cc0d178..bb4c06347 100644 --- a/rllib/policy/tf_policy.py +++ b/rllib/policy/tf_policy.py @@ -177,7 +177,7 @@ class TFPolicy(Policy): self._apply_op = None self._stats_fetches = {} self._timestep = timestep if timestep is not None else \ - tf1.placeholder(tf.int32, (), name="timestep") + tf1.placeholder(tf.int64, (), name="timestep") self._optimizer = None self._grads_and_vars = None diff --git a/rllib/tests/test_timesteps.py b/rllib/tests/test_timesteps.py new file mode 100644 index 000000000..5e5425b5c --- /dev/null +++ b/rllib/tests/test_timesteps.py @@ -0,0 +1,54 @@ +import numpy as np +import unittest + +import ray +import ray.rllib.agents.pg as pg +from ray.rllib.examples.env.random_env import RandomEnv +from ray.rllib.utils.test_utils import framework_iterator + + +class TestTimeSteps(unittest.TestCase): + @classmethod + def setUpClass(cls): + ray.init() + + @classmethod + def tearDownClass(cls): + ray.shutdown() + + def test_timesteps(self): + """Test whether a PGTrainer can be built with both frameworks.""" + config = pg.DEFAULT_CONFIG.copy() + config["num_workers"] = 0 # Run locally. + config["model"]["fcnet_hiddens"] = [1] + config["model"]["fcnet_activation"] = None + + obs = np.array(1) + obs_one_hot = np.array([[0.0, 1.0]]) + + for _ in framework_iterator(config): + trainer = pg.PGTrainer(config=config, env=RandomEnv) + policy = trainer.get_policy() + + for i in range(1, 21): + trainer.compute_action(obs) + self.assertEqual(policy.global_timestep, i) + for i in range(1, 21): + policy.compute_actions(obs_one_hot) + self.assertEqual(policy.global_timestep, i + 20) + + # Artificially set ts to 100Bio, then keep computing actions and + # train. + crazy_timesteps = int(1e11) + policy.global_timestep = crazy_timesteps + # Run for 10 more ts. + for i in range(1, 11): + policy.compute_actions(obs_one_hot) + self.assertEqual(policy.global_timestep, i + crazy_timesteps) + trainer.train() + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/utils/exploration/epsilon_greedy.py b/rllib/utils/exploration/epsilon_greedy.py index 1aeef99c1..c45f712d2 100644 --- a/rllib/utils/exploration/epsilon_greedy.py +++ b/rllib/utils/exploration/epsilon_greedy.py @@ -1,3 +1,4 @@ +import numpy as np from typing import Union, Optional from ray.rllib.models.action_dist import ActionDistribution @@ -54,7 +55,10 @@ class EpsilonGreedy(Exploration): # The current timestep value (tf-var or python int). self.last_timestep = get_variable( - 0, framework=framework, tf_name="timestep") + np.array(0, np.int64), + framework=framework, + tf_name="timestep", + dtype=np.int64) # Build the tf-info-op. if self.framework in ["tf2", "tf", "tfe"]: diff --git a/rllib/utils/exploration/gaussian_noise.py b/rllib/utils/exploration/gaussian_noise.py index 549e00ffd..4a7f53f7d 100644 --- a/rllib/utils/exploration/gaussian_noise.py +++ b/rllib/utils/exploration/gaussian_noise.py @@ -1,4 +1,5 @@ from gym.spaces import Space +import numpy as np from typing import Union, Optional from ray.rllib.models.action_dist import ActionDistribution @@ -74,7 +75,10 @@ class GaussianNoise(Exploration): # The current timestep value (tf-var or python int). self.last_timestep = get_variable( - 0, framework=self.framework, tf_name="timestep") + np.array(0, np.int64), + framework=self.framework, + tf_name="timestep", + dtype=np.int64) # Build the tf-info-op. if self.framework in ["tf2", "tf", "tfe"]: diff --git a/rllib/utils/exploration/stochastic_sampling.py b/rllib/utils/exploration/stochastic_sampling.py index 655fe7173..c40fae254 100644 --- a/rllib/utils/exploration/stochastic_sampling.py +++ b/rllib/utils/exploration/stochastic_sampling.py @@ -1,4 +1,5 @@ import gym +import numpy as np import tree from typing import Union @@ -53,7 +54,10 @@ class StochasticSampling(Exploration): # The current timestep value (tf-var or python int). self.last_timestep = get_variable( - 0, framework=self.framework, tf_name="timestep") + np.array(0, np.int64), + framework=self.framework, + tf_name="timestep", + dtype=np.int64) @override(Exploration) def get_exploration_action(self, diff --git a/rllib/utils/framework.py b/rllib/utils/framework.py index 156539178..3595399d6 100644 --- a/rllib/utils/framework.py +++ b/rllib/utils/framework.py @@ -1,4 +1,5 @@ import logging +import numpy as np import os import sys from typing import Any, Optional @@ -185,6 +186,7 @@ def get_variable(value, does not have any (e.g. if it's an initializer w/o explicit value). dtype (Optional[TensorType]): An optional dtype to use iff `value` does not have any (e.g. if it's an initializer w/o explicit value). + This should always be a numpy dtype (e.g. np.float32, np.int64). Returns: any: A framework-specific variable (tf.Variable, torch.tensor, or @@ -207,10 +209,13 @@ def get_variable(value, elif framework == "torch" and torch_tensor is True: torch, _ = try_import_torch() var_ = torch.from_numpy(value) - if dtype == torch.float32: + if dtype in [torch.float32, np.float32]: var_ = var_.float() - elif dtype == torch.int32: + elif dtype in [torch.int32, np.int32]: var_ = var_.int() + elif dtype in [torch.float64, np.float64]: + var_ = var_.double() + if device: var_ = var_.to(device) var_.requires_grad = trainable diff --git a/rllib/utils/schedules/piecewise_schedule.py b/rllib/utils/schedules/piecewise_schedule.py index 8d3f22c6a..88e54dfe7 100644 --- a/rllib/utils/schedules/piecewise_schedule.py +++ b/rllib/utils/schedules/piecewise_schedule.py @@ -65,7 +65,7 @@ class PiecewiseSchedule(Schedule): "provided!" endpoints = tf.cast( - tf.stack([e[0] for e in self.endpoints] + [-1]), tf.int32) + tf.stack([e[0] for e in self.endpoints] + [-1]), tf.int64) # Create all possible interpolation results. results_list = [] @@ -79,6 +79,7 @@ class PiecewiseSchedule(Schedule): # Return correct results tensor depending on where we find t. def _cond(i, x): + x = tf.cast(x, tf.int64) return tf.logical_not( tf.logical_or( tf.equal(endpoints[i + 1], -1), @@ -88,5 +89,5 @@ class PiecewiseSchedule(Schedule): return (i + 1, t) idx_and_t = tf.while_loop(_cond, _body, - [tf.constant(0, dtype=tf.int32), t]) + [tf.constant(0, dtype=tf.int64), t]) return results_list[idx_and_t[0]]