[RLlib] Allow for more than 2^31 policy timesteps. (#11301)

This commit is contained in:
Sven Mika
2020-10-12 22:49:11 +02:00
committed by GitHub
parent f5e2cda68a
commit 8ea1bc5ff9
11 changed files with 96 additions and 14 deletions
+7
View File
@@ -1431,6 +1431,13 @@ py_test(
args = ["TestSupportedSpacesEvolutionAlgos"]
)
# Unit-test target for tests/test_timesteps.py (declared size: "small").
py_test(
name = "tests/test_timesteps",
tags = ["tests_dir", "tests_dir_T"],
size = "small",
srcs = ["tests/test_timesteps.py"]
)
# --------------------------------------------------------------------
# examples/ directory
#
+2 -1
View File
@@ -46,12 +46,13 @@ class RandomEnv(gym.Env):
self.action_space, action))
self.steps += 1
done = False
# We are done as per our max-episode-len.
if self.max_episode_len is not None and \
self.steps >= self.max_episode_len:
done = True
# Max not reached yet -> Sample done via p_done.
else:
elif self.p_done > 0.0:
done = bool(
np.random.choice(
[True, False], p=[self.p_done, 1.0 - self.p_done]))
+1 -1
View File
@@ -167,7 +167,7 @@ class DynamicTFPolicy(TFPolicy):
tf.float32, [None], name="prev_reward")
explore = tf1.placeholder_with_default(
True, (), name="is_exploring")
timestep = tf1.placeholder(tf.int32, (), name="timestep")
timestep = tf1.placeholder(tf.int64, (), name="timestep")
self._input_dict = {
SampleBatch.CUR_OBS: obs,
+6 -4
View File
@@ -20,7 +20,7 @@ tf1, tf, tfv = try_import_tf()
logger = logging.getLogger(__name__)
def _convert_to_tf(x):
def _convert_to_tf(x, dtype=None):
if isinstance(x, SampleBatch):
x = {k: v for k, v in x.items() if k != SampleBatch.INFOS}
return tf.nest.map_structure(_convert_to_tf, x)
@@ -28,8 +28,9 @@ def _convert_to_tf(x):
return x
if x is not None:
d = dtype
x = tf.nest.map_structure(
lambda f: tf.convert_to_tensor(f) if f is not None else None, x)
lambda f: tf.convert_to_tensor(f, d) if f is not None else None, x)
return x
@@ -52,9 +53,10 @@ def convert_eager_inputs(func):
def _func(*args, **kwargs):
if tf.executing_eagerly():
args = [_convert_to_tf(x) for x in args]
# TODO(gehring): find a way to remove specific hacks
# TODO: (sven) find a way to remove key-specific hacks.
kwargs = {
k: _convert_to_tf(v)
k: _convert_to_tf(
v, dtype=tf.int64 if k == "timestep" else None)
for k, v in kwargs.items()
if k not in {"info_batch", "episodes"}
}
+1 -1
View File
@@ -177,7 +177,7 @@ class TFPolicy(Policy):
self._apply_op = None
self._stats_fetches = {}
self._timestep = timestep if timestep is not None else \
tf1.placeholder(tf.int32, (), name="timestep")
tf1.placeholder(tf.int64, (), name="timestep")
self._optimizer = None
self._grads_and_vars = None
+54
View File
@@ -0,0 +1,54 @@
import numpy as np
import unittest
import ray
import ray.rllib.agents.pg as pg
from ray.rllib.examples.env.random_env import RandomEnv
from ray.rllib.utils.test_utils import framework_iterator
class TestTimeSteps(unittest.TestCase):
    """Checks that a policy's `global_timestep` counter is kept correctly.

    In particular, the counter must keep working for values that no longer
    fit into a signed 32-bit integer (> 2^31).
    """

    @classmethod
    def setUpClass(cls):
        ray.init()

    @classmethod
    def tearDownClass(cls):
        ray.shutdown()

    def test_timesteps(self):
        """Test that `global_timestep` counts correctly, even beyond 2^31."""
        config = pg.DEFAULT_CONFIG.copy()
        config["num_workers"] = 0  # Run locally.
        # `DEFAULT_CONFIG.copy()` is shallow: build a fresh "model" dict
        # instead of writing into the one shared with `pg.DEFAULT_CONFIG`.
        config["model"] = dict(
            config["model"], fcnet_hiddens=[1], fcnet_activation=None)

        # Single observation for `Trainer.compute_action()` and its batched
        # one-hot counterpart for `Policy.compute_actions()`.
        obs = np.array(1)
        obs_one_hot = np.array([[0.0, 1.0]])

        for _ in framework_iterator(config):
            trainer = pg.PGTrainer(config=config, env=RandomEnv)
            try:
                policy = trainer.get_policy()

                # Each computed action must advance the counter by one,
                # no matter whether we go through the trainer ...
                for i in range(1, 21):
                    trainer.compute_action(obs)
                    self.assertEqual(policy.global_timestep, i)
                # ... or through the policy directly.
                for i in range(1, 21):
                    policy.compute_actions(obs_one_hot)
                    self.assertEqual(policy.global_timestep, i + 20)

                # Artificially set the timestep to 100 billion (far beyond
                # the int32 range), then keep computing actions and train.
                crazy_timesteps = int(1e11)
                policy.global_timestep = crazy_timesteps
                # Run for 10 more ts.
                for i in range(1, 11):
                    policy.compute_actions(obs_one_hot)
                    self.assertEqual(policy.global_timestep,
                                     i + crazy_timesteps)
                trainer.train()
            finally:
                # Release this trainer's resources before the next
                # framework's trainer is built.
                trainer.stop()
if __name__ == "__main__":
    import sys

    import pytest

    # Run only this file's tests (verbosely) and hand pytest's result code
    # back to the calling process.
    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)
+5 -1
View File
@@ -1,3 +1,4 @@
import numpy as np
from typing import Union, Optional
from ray.rllib.models.action_dist import ActionDistribution
@@ -54,7 +55,10 @@ class EpsilonGreedy(Exploration):
# The current timestep value (tf-var or python int).
self.last_timestep = get_variable(
0, framework=framework, tf_name="timestep")
np.array(0, np.int64),
framework=framework,
tf_name="timestep",
dtype=np.int64)
# Build the tf-info-op.
if self.framework in ["tf2", "tf", "tfe"]:
+5 -1
View File
@@ -1,4 +1,5 @@
from gym.spaces import Space
import numpy as np
from typing import Union, Optional
from ray.rllib.models.action_dist import ActionDistribution
@@ -74,7 +75,10 @@ class GaussianNoise(Exploration):
# The current timestep value (tf-var or python int).
self.last_timestep = get_variable(
0, framework=self.framework, tf_name="timestep")
np.array(0, np.int64),
framework=self.framework,
tf_name="timestep",
dtype=np.int64)
# Build the tf-info-op.
if self.framework in ["tf2", "tf", "tfe"]:
@@ -1,4 +1,5 @@
import gym
import numpy as np
import tree
from typing import Union
@@ -53,7 +54,10 @@ class StochasticSampling(Exploration):
# The current timestep value (tf-var or python int).
self.last_timestep = get_variable(
0, framework=self.framework, tf_name="timestep")
np.array(0, np.int64),
framework=self.framework,
tf_name="timestep",
dtype=np.int64)
@override(Exploration)
def get_exploration_action(self,
+7 -2
View File
@@ -1,4 +1,5 @@
import logging
import numpy as np
import os
import sys
from typing import Any, Optional
@@ -185,6 +186,7 @@ def get_variable(value,
does not have any (e.g. if it's an initializer w/o explicit value).
dtype (Optional[TensorType]): An optional dtype to use iff `value` does
not have any (e.g. if it's an initializer w/o explicit value).
This should always be a numpy dtype (e.g. np.float32, np.int64).
Returns:
any: A framework-specific variable (tf.Variable, torch.tensor, or
@@ -207,10 +209,13 @@ def get_variable(value,
elif framework == "torch" and torch_tensor is True:
torch, _ = try_import_torch()
var_ = torch.from_numpy(value)
if dtype == torch.float32:
if dtype in [torch.float32, np.float32]:
var_ = var_.float()
elif dtype == torch.int32:
elif dtype in [torch.int32, np.int32]:
var_ = var_.int()
elif dtype in [torch.float64, np.float64]:
var_ = var_.double()
if device:
var_ = var_.to(device)
var_.requires_grad = trainable
+3 -2
View File
@@ -65,7 +65,7 @@ class PiecewiseSchedule(Schedule):
"provided!"
endpoints = tf.cast(
tf.stack([e[0] for e in self.endpoints] + [-1]), tf.int32)
tf.stack([e[0] for e in self.endpoints] + [-1]), tf.int64)
# Create all possible interpolation results.
results_list = []
@@ -79,6 +79,7 @@ class PiecewiseSchedule(Schedule):
# Return correct results tensor depending on where we find t.
def _cond(i, x):
x = tf.cast(x, tf.int64)
return tf.logical_not(
tf.logical_or(
tf.equal(endpoints[i + 1], -1),
@@ -88,5 +89,5 @@ class PiecewiseSchedule(Schedule):
return (i + 1, t)
idx_and_t = tf.while_loop(_cond, _body,
[tf.constant(0, dtype=tf.int32), t])
[tf.constant(0, dtype=tf.int64), t])
return results_list[idx_and_t[0]]