mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 22:37:36 +08:00
[rllib] Apex crash when compress_observations: False (#2426)
We shouldn't try to decompress uncompressed data. Also, fix resource requests for ddpg + GPU.
This commit is contained in:
@@ -3,6 +3,7 @@ from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
from ray.rllib.agents.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG as DDPG_CONFIG
|
||||
from ray.tune.trial import Resources
|
||||
from ray.utils import merge_dicts
|
||||
|
||||
APEX_DDPG_DEFAULT_CONFIG = merge_dicts(
|
||||
@@ -16,6 +17,7 @@ APEX_DDPG_DEFAULT_CONFIG = merge_dicts(
|
||||
"debug": False
|
||||
}),
|
||||
"n_step": 3,
|
||||
"gpu": False,
|
||||
"num_workers": 32,
|
||||
"buffer_size": 2000000,
|
||||
"learning_starts": 50000,
|
||||
@@ -40,6 +42,15 @@ class ApexDDPGAgent(DDPGAgent):
|
||||
_agent_name = "APEX_DDPG"
|
||||
_default_config = APEX_DDPG_DEFAULT_CONFIG
|
||||
|
||||
@classmethod
def default_resource_request(cls, config):
    """Build the resource request for one trial of this agent.

    Args:
        config (dict): Overrides applied on top of ``cls._default_config``.
            Nested dicts (e.g. ``"optimizer"``) may be given partially.

    Returns:
        Resources: ``cpu`` is one plus the configured number of replay
            buffer shards, ``gpu`` is 1 when the ``"gpu"`` option is
            truthy and 0 otherwise, and ``extra_cpu`` / ``extra_gpu`` are
            the per-worker CPU / GPU counts multiplied by ``num_workers``.
    """
    # Deep-merge instead of dict(defaults, **config): a shallow merge lets
    # a partial nested override such as {"optimizer": {"debug": True}}
    # replace the whole default sub-dict, which then fails the
    # "num_replay_buffer_shards" lookup below with a KeyError.
    cf = merge_dicts(cls._default_config, config)
    return Resources(
        cpu=1 + cf["optimizer"]["num_replay_buffer_shards"],
        gpu=1 if cf["gpu"] else 0,
        extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
        extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
|
||||
|
||||
def update_target_if_needed(self):
|
||||
# Ape-X updates based on num steps trained, not sampled
|
||||
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
|
||||
|
||||
@@ -63,6 +63,8 @@ DEFAULT_CONFIG = with_common_config({
|
||||
"prioritized_replay_eps": 1e-6,
|
||||
# Whether to clip rewards to [-1, 1] prior to adding to the replay buffer.
|
||||
"clip_rewards": True,
|
||||
# Whether to LZ4 compress observations
|
||||
"compress_observations": False,
|
||||
|
||||
# === Optimization ===
|
||||
# Learning rate for adam optimizer
|
||||
|
||||
@@ -136,8 +136,8 @@ class PolicyEvaluator(EvaluatorInterface):
|
||||
sample_async (bool): Whether to compute samples asynchronously in
|
||||
the background, which improves throughput but can cause samples
|
||||
to be slightly off-policy.
|
||||
compress_observations (bool): If true, compress the observations
|
||||
returned.
|
||||
compress_observations (bool): If true, compress the observations.
|
||||
They can be decompressed with rllib/utils/compression.
|
||||
num_envs (int): If more than one, will create multiple envs
|
||||
and vectorize the computation of actions. This has no effect if
|
||||
the env already implements VectorEnv.
|
||||
|
||||
@@ -7,7 +7,7 @@ import random
|
||||
import sys
|
||||
|
||||
from ray.rllib.optimizers.segment_tree import SumSegmentTree, MinSegmentTree
|
||||
from ray.rllib.utils.compression import unpack
|
||||
from ray.rllib.utils.compression import unpack_if_needed
|
||||
from ray.rllib.utils.window_stat import WindowStat
|
||||
|
||||
|
||||
@@ -59,10 +59,10 @@ class ReplayBuffer(object):
|
||||
for i in idxes:
|
||||
data = self._storage[i]
|
||||
obs_t, action, reward, obs_tp1, done = data
|
||||
obses_t.append(np.array(unpack(obs_t), copy=False))
|
||||
obses_t.append(np.array(unpack_if_needed(obs_t), copy=False))
|
||||
actions.append(np.array(action, copy=False))
|
||||
rewards.append(reward)
|
||||
obses_tp1.append(np.array(unpack(obs_tp1), copy=False))
|
||||
obses_tp1.append(np.array(unpack_if_needed(obs_tp1), copy=False))
|
||||
dones.append(done)
|
||||
self._hit_count[i] += 1
|
||||
return (np.array(obses_t), np.array(actions), np.array(rewards),
|
||||
|
||||
@@ -41,6 +41,12 @@ def unpack(data):
|
||||
return data
|
||||
|
||||
|
||||
def unpack_if_needed(data):
    """Return ``data``, running it through ``unpack`` only if it is bytes.

    Values that are not ``bytes`` instances are handed back unchanged, so
    callers can pass observations whether or not they were packed.
    """
    if not isinstance(data, bytes):
        return data
    return unpack(data)
|
||||
|
||||
|
||||
# Intel(R) Core(TM) i7-4600U CPU @ 2.10GHz
|
||||
# Compression speed: 753.664 MB/s
|
||||
# Compression ratio: 87.4839812046
|
||||
|
||||
Reference in New Issue
Block a user