From 8e75d150f73c7c9d02842d1fc3f9b75b900ce3dd Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 19 Jul 2018 15:58:09 -0700 Subject: [PATCH] [rllib] Apex crash when compress_observations: False (#2426) We shouldn't try to decompress uncompressed data. Also, fix resource requests for ddpg + GPU. --- python/ray/rllib/agents/ddpg/apex.py | 11 +++++++++++ python/ray/rllib/agents/ddpg/ddpg.py | 2 ++ python/ray/rllib/evaluation/policy_evaluator.py | 4 ++-- python/ray/rllib/optimizers/replay_buffer.py | 6 +++--- python/ray/rllib/utils/compression.py | 6 ++++++ test/jenkins_tests/run_multi_node_tests.sh | 8 ++++++++ 6 files changed, 32 insertions(+), 5 deletions(-) diff --git a/python/ray/rllib/agents/ddpg/apex.py b/python/ray/rllib/agents/ddpg/apex.py index b35f1ea35..b114a10af 100644 --- a/python/ray/rllib/agents/ddpg/apex.py +++ b/python/ray/rllib/agents/ddpg/apex.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function from ray.rllib.agents.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG as DDPG_CONFIG +from ray.tune.trial import Resources from ray.utils import merge_dicts APEX_DDPG_DEFAULT_CONFIG = merge_dicts( @@ -16,6 +17,7 @@ APEX_DDPG_DEFAULT_CONFIG = merge_dicts( "debug": False }), "n_step": 3, + "gpu": False, "num_workers": 32, "buffer_size": 2000000, "learning_starts": 50000, @@ -40,6 +42,15 @@ class ApexDDPGAgent(DDPGAgent): _agent_name = "APEX_DDPG" _default_config = APEX_DDPG_DEFAULT_CONFIG + @classmethod + def default_resource_request(cls, config): + cf = dict(cls._default_config, **config) + return Resources( + cpu=1 + cf["optimizer"]["num_replay_buffer_shards"], + gpu=cf["gpu"] and 1 or 0, + extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"], + extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) + def update_target_if_needed(self): # Ape-X updates based on num steps trained, not sampled if self.optimizer.num_steps_trained - self.last_target_update_ts > \ diff --git a/python/ray/rllib/agents/ddpg/ddpg.py 
b/python/ray/rllib/agents/ddpg/ddpg.py index 95b6859d2..d9816da3c 100644 --- a/python/ray/rllib/agents/ddpg/ddpg.py +++ b/python/ray/rllib/agents/ddpg/ddpg.py @@ -63,6 +63,8 @@ DEFAULT_CONFIG = with_common_config({ "prioritized_replay_eps": 1e-6, # Whether to clip rewards to [-1, 1] prior to adding to the replay buffer. "clip_rewards": True, + # Whether to LZ4 compress observations + "compress_observations": False, # === Optimization === # Learning rate for adam optimizer diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index 58f121ef4..ff5d37857 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -136,8 +136,8 @@ class PolicyEvaluator(EvaluatorInterface): sample_async (bool): Whether to compute samples asynchronously in the background, which improves throughput but can cause samples to be slightly off-policy. - compress_observations (bool): If true, compress the observations - returned. + compress_observations (bool): If true, compress the observations. + They can be decompressed with rllib/utils/compression. num_envs (int): If more than one, will create multiple envs and vectorize the computation of actions. This has no effect if if the env already implements VectorEnv. 
diff --git a/python/ray/rllib/optimizers/replay_buffer.py b/python/ray/rllib/optimizers/replay_buffer.py index 6730a62b2..fecea9f9d 100644 --- a/python/ray/rllib/optimizers/replay_buffer.py +++ b/python/ray/rllib/optimizers/replay_buffer.py @@ -7,7 +7,7 @@ import random import sys from ray.rllib.optimizers.segment_tree import SumSegmentTree, MinSegmentTree -from ray.rllib.utils.compression import unpack +from ray.rllib.utils.compression import unpack_if_needed from ray.rllib.utils.window_stat import WindowStat @@ -59,10 +59,10 @@ class ReplayBuffer(object): for i in idxes: data = self._storage[i] obs_t, action, reward, obs_tp1, done = data - obses_t.append(np.array(unpack(obs_t), copy=False)) + obses_t.append(np.array(unpack_if_needed(obs_t), copy=False)) actions.append(np.array(action, copy=False)) rewards.append(reward) - obses_tp1.append(np.array(unpack(obs_tp1), copy=False)) + obses_tp1.append(np.array(unpack_if_needed(obs_tp1), copy=False)) dones.append(done) self._hit_count[i] += 1 return (np.array(obses_t), np.array(actions), np.array(rewards), diff --git a/python/ray/rllib/utils/compression.py b/python/ray/rllib/utils/compression.py index ddef7a6ab..5f28455ee 100644 --- a/python/ray/rllib/utils/compression.py +++ b/python/ray/rllib/utils/compression.py @@ -41,6 +41,12 @@ def unpack(data): return data +def unpack_if_needed(data): + if isinstance(data, bytes): + data = unpack(data) + return data + + # Intel(R) Core(TM) i7-4600U CPU @ 2.10GHz # Compression speed: 753.664 MB/s # Compression ratio: 87.4839812046 diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index d3341a2d3..bc4acbabf 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -191,6 +191,14 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 1}' +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python 
/ray/python/ray/rllib/train.py \ + --env Pendulum-v0 \ + --run APEX_DDPG \ + --ray-num-cpus 8 \ + --stop '{"training_iteration": 2}' \ + --config '{"num_workers": 2, "optimizer": {"num_replay_buffer_shards": 1}, "learning_starts": 100}' + docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ sh /ray/test/jenkins_tests/multi_node_tests/test_rllib_eval.sh