[rllib] Clean up agent resource configurations (#3296)

Closes #3284
This commit is contained in:
Eric Liang
2018-11-13 18:00:03 -08:00
committed by Richard Liaw
parent d4fad222e1
commit 65c27c70cf
35 changed files with 126 additions and 478 deletions
+1 -1
View File
@@ -32,7 +32,7 @@ def _setup_logger():
def _register_all():
for key in [
"PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG", "APEX_DDPG",
"PPO", "ES", "DQN", "APEX", "A3C", "PG", "DDPG", "APEX_DDPG",
"IMPALA", "ARS", "A2C", "__fake", "__sigmoid_fake_data",
"__parameter_tuning"
]:
-11
View File
@@ -5,12 +5,10 @@ from __future__ import print_function
from ray.rllib.agents.a3c.a3c import A3CAgent, DEFAULT_CONFIG as A3C_CONFIG
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.utils import merge_dicts
from ray.tune.trial import Resources
A2C_DEFAULT_CONFIG = merge_dicts(
A3C_CONFIG,
{
"gpu": False,
"sample_batch_size": 20,
"min_iter_time_s": 10,
"sample_async": False,
@@ -28,12 +26,3 @@ class A2CAgent(A3CAgent):
return SyncSamplesOptimizer(self.local_evaluator,
self.remote_evaluators,
self.config["optimizer"])
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(
cpu=1,
gpu=cf["gpu_fraction"] if cf["gpu"] else 0,
extra_cpu=cf["num_workers"],
extra_gpu=cf["use_gpu_for_workers"] and cf["num_workers"] or 0)
+1 -15
View File
@@ -7,8 +7,6 @@ import time
from ray.rllib.agents.a3c.a3c_tf_policy_graph import A3CPolicyGraph
from ray.rllib.agents.agent import Agent, with_common_config
from ray.rllib.optimizers import AsyncGradientsOptimizer
from ray.rllib.utils import merge_dicts
from ray.tune.trial import Resources
# yapf: disable
# __sphinx_doc_begin__
@@ -29,8 +27,6 @@ DEFAULT_CONFIG = with_common_config({
"vf_loss_coeff": 0.5,
# Entropy coefficient
"entropy_coeff": -0.01,
# Whether to place workers on GPUs
"use_gpu_for_workers": False,
# Min time per iteration
"min_iter_time_s": 5,
# Workers sample async. Note that this increases the effective
@@ -48,15 +44,6 @@ class A3CAgent(Agent):
_default_config = DEFAULT_CONFIG
_policy_graph = A3CPolicyGraph
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(
cpu=1,
gpu=0,
extra_cpu=cf["num_workers"],
extra_gpu=cf["use_gpu_for_workers"] and cf["num_workers"] or 0)
def _init(self):
if self.config["use_pytorch"]:
from ray.rllib.agents.a3c.a3c_torch_policy_graph import \
@@ -68,8 +55,7 @@ class A3CAgent(Agent):
self.local_evaluator = self.make_local_evaluator(
self.env_creator, policy_cls)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, policy_cls, self.config["num_workers"],
{"num_gpus": 1 if self.config["use_gpu_for_workers"] else 0})
self.env_creator, policy_cls, self.config["num_workers"])
self.optimizer = self._make_optimizer()
def _make_optimizer(self):
+53 -9
View File
@@ -17,6 +17,7 @@ from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.utils import FilterManager, deep_update, merge_dicts
from ray.tune.registry import ENV_CREATOR, _global_registry
from ray.tune.trainable import Trainable
from ray.tune.trial import Resources
from ray.tune.logger import UnifiedLogger
from ray.tune.result import DEFAULT_RESULTS_DIR
@@ -63,11 +64,25 @@ COMMON_CONFIG = {
# Whether to use rllib or deepmind preprocessors by default
"preprocessor_pref": "deepmind",
# === Resources ===
# Number of actors used for parallelism
"num_workers": 2,
# Number of GPUs to allocate to the driver. Note that not all algorithms
# can take advantage of driver GPUs. This can be fraction (e.g., 0.3 GPUs).
"num_gpus": 0,
# Number of CPUs to allocate per worker.
"num_cpus_per_worker": 1,
# Number of GPUs to allocate per worker. This can be fractional.
"num_gpus_per_worker": 0,
# Any custom resources to allocate per worker.
"custom_resources_per_worker": {},
# Number of CPUs to allocate for the driver. Note: this only takes effect
# when running in Tune.
"num_cpus_for_driver": 1,
# === Execution ===
# Number of environments to evaluate vectorwise per worker.
"num_envs_per_worker": 1,
# Number of actors used for parallelism
"num_workers": 2,
# Default sample batch size
"sample_batch_size": 200,
# Training batch size, if applicable. Should be >= sample_batch_size.
@@ -104,8 +119,6 @@ COMMON_CONFIG = {
},
# Whether to LZ4 compress observations
"compress_observations": False,
# Allocate a fraction of a GPU instead of one (e.g., 0.3 GPUs)
"gpu_fraction": 1,
# Drop metric batches from unresponsive workers after this many seconds
"collect_metrics_timeout": 180,
@@ -149,6 +162,17 @@ class Agent(Trainable):
"tf_session_args", "env_config", "model", "optimizer", "multiagent"
]
@classmethod
def default_resource_request(cls, config):
cf = dict(cls._default_config, **config)
Agent._validate_config(cf)
# TODO(ekl): add custom resources here once tune supports them
return Resources(
cpu=cf["num_cpus_for_driver"],
gpu=cf["num_gpus"],
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def make_local_evaluator(self, env_creator, policy_graph):
"""Convenience method to return configured local evaluator."""
@@ -163,10 +187,15 @@ class Agent(Trainable):
config["local_evaluator_tf_session_args"]
}))
def make_remote_evaluators(self, env_creator, policy_graph, count,
remote_args):
def make_remote_evaluators(self, env_creator, policy_graph, count):
"""Convenience method to return a number of remote evaluators."""
remote_args = {
"num_cpus": self.config["num_cpus_per_worker"],
"num_gpus": self.config["num_gpus_per_worker"],
"resources": self.config["custom_resources_per_worker"],
}
cls = PolicyEvaluator.as_remote(**remote_args).remote
return [
self._make_evaluator(cls, env_creator, policy_graph, i + 1,
@@ -212,6 +241,21 @@ class Agent(Trainable):
"DEFAULT_CONFIG defined by each agent for more info.\n\n"
"The config of this agent is: {}".format(config))
@staticmethod
def _validate_config(config):
if "gpu" in config:
raise ValueError(
"The `gpu` config is deprecated, please use `num_gpus=0|1` "
"instead.")
if "gpu_fraction" in config:
raise ValueError(
"The `gpu_fraction` config is deprecated, please use "
"`num_gpus=<fraction>` instead.")
if "use_gpu_for_workers" in config:
raise ValueError(
"The `use_gpu_for_workers` config is deprecated, please use "
"`num_gpus_per_worker=1` instead.")
def __init__(self, config=None, env=None, logger_creator=None):
"""Initialize an RLLib agent.
@@ -224,6 +268,7 @@ class Agent(Trainable):
"""
config = config or {}
Agent._validate_config(config)
# Vars to synchronize to evaluators on each train call
self.global_vars = {"timestep": 0}
@@ -370,6 +415,8 @@ class Agent(Trainable):
if hasattr(self, "remote_evaluators"):
for ev in self.remote_evaluators:
ev.__ray_terminate__.remote()
if hasattr(self, "optimizer"):
self.optimizer.stop()
def __getstate__(self):
state = {}
@@ -429,9 +476,6 @@ def get_agent_class(alg):
elif alg == "A2C":
from ray.rllib.agents import a3c
return a3c.A2CAgent
elif alg == "BC":
from ray.rllib.agents import bc
return bc.BCAgent
elif alg == "PG":
from ray.rllib.agents import pg
return pg.PGAgent
-6
View File
@@ -13,7 +13,6 @@ import time
import ray
from ray.rllib.agents import Agent, with_common_config
from ray.tune.trial import Resources
from ray.rllib.agents.ars import optimizers
from ray.rllib.agents.ars import policies
@@ -162,11 +161,6 @@ class ARSAgent(Agent):
_agent_name = "ARS"
_default_config = DEFAULT_CONFIG
@classmethod
def default_resource_request(cls, config):
cf = dict(cls._default_config, **config)
return Resources(cpu=1, gpu=0, extra_cpu=cf["num_workers"])
def _init(self):
env = self.env_creator(self.config["env_config"])
from ray.rllib import models
-3
View File
@@ -1,3 +0,0 @@
from ray.rllib.agents.bc.bc import BCAgent, DEFAULT_CONFIG
__all__ = ["BCAgent", "DEFAULT_CONFIG"]
-99
View File
@@ -1,99 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
from ray.rllib.agents.agent import Agent
from ray.rllib.agents.bc.bc_evaluator import BCEvaluator, \
GPURemoteBCEvaluator, RemoteBCEvaluator
from ray.rllib.optimizers import AsyncGradientsOptimizer
from ray.rllib.utils import merge_dicts
from ray.tune.trial import Resources
DEFAULT_CONFIG = {
# Number of workers (excluding master)
"num_workers": 1,
# Size of rollout batch
"batch_size": 100,
# Max global norm for each gradient calculated by worker
"grad_clip": 40.0,
# Learning rate
"lr": 0.0001,
# Whether to use a GPU for local optimization.
"gpu": False,
# Whether to place workers on GPUs
"use_gpu_for_workers": False,
# Model and preprocessor options
"model": {
# (Image statespace) - Converts image to Channels = 1
"grayscale": True,
# (Image statespace) - Each pixel
"zero_mean": False,
# (Image statespace) - Converts image to (dim, dim, C)
"dim": 84,
# (Image statespace) - Converts image shape to (C, dim, dim)
"channel_major": False
},
# Arguments to pass to the rllib optimizer
"optimizer": {
# Number of gradients applied for each `train` step
"grads_per_step": 100,
},
# Arguments to pass to the env creator
"env_config": {},
}
class BCAgent(Agent):
_agent_name = "BC"
_default_config = DEFAULT_CONFIG
_allow_unknown_configs = True
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
if cf["use_gpu_for_workers"]:
num_gpus_per_worker = cf["gpu_fraction"]
else:
num_gpus_per_worker = 0
return Resources(
cpu=1,
gpu=cf["gpu"] and cf["gpu_fraction"] or 0,
extra_cpu=cf["num_workers"],
extra_gpu=num_gpus_per_worker * cf["num_workers"])
def _init(self):
self.local_evaluator = BCEvaluator(self.env_creator, self.config,
self.logdir)
if self.config["use_gpu_for_workers"]:
remote_cls = GPURemoteBCEvaluator
else:
remote_cls = RemoteBCEvaluator
self.remote_evaluators = [
remote_cls.remote(self.env_creator, self.config, self.logdir)
for _ in range(self.config["num_workers"])
]
self.optimizer = AsyncGradientsOptimizer(self.local_evaluator,
self.remote_evaluators,
self.config["optimizer"])
def _train(self):
self.optimizer.step()
metric_lists = [
re.get_metrics.remote() for re in self.remote_evaluators
]
total_samples = 0
total_loss = 0
for metrics in metric_lists:
for m in ray.get(metrics):
total_samples += m["num_samples"]
total_loss += m["loss"]
result = dict(
mean_loss=total_loss / total_samples,
timesteps_this_iter=total_samples,
)
return result
def compute_action(self, observation):
action, info = self.local_evaluator.policy.compute(observation)
return action
@@ -1,64 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pickle
from six.moves import queue
import ray
from ray.rllib.agents.bc.experience_dataset import ExperienceDataset
from ray.rllib.agents.bc.policy import BCPolicy
from ray.rllib.evaluation.interface import EvaluatorInterface
from ray.rllib.models import ModelCatalog
class BCEvaluator(EvaluatorInterface):
def __init__(self, env_creator, config, logdir):
env = ModelCatalog.get_preprocessor_as_wrapper(
env_creator(config["env_config"]), config["model"])
self.dataset = ExperienceDataset(config["dataset_path"])
self.policy = BCPolicy(env.observation_space, env.action_space, config)
self.config = config
self.logdir = logdir
self.metrics_queue = queue.Queue()
def sample(self):
return self.dataset.sample(self.config["batch_size"])
def compute_gradients(self, samples):
gradient, info = self.policy.compute_gradients(samples)
self.metrics_queue.put({
"num_samples": info["num_samples"],
"loss": info["loss"]
})
return gradient, {}
def apply_gradients(self, grads):
self.policy.apply_gradients(grads)
def get_weights(self):
return self.policy.get_weights()
def set_weights(self, params):
self.policy.set_weights(params)
def save(self):
weights = self.get_weights()
return pickle.dumps({"weights": weights})
def restore(self, objs):
objs = pickle.loads(objs)
self.set_weights(objs["weights"])
def get_metrics(self):
completed = []
while True:
try:
completed.append(self.metrics_queue.get_nowait())
except queue.Empty:
break
return completed
RemoteBCEvaluator = ray.remote(BCEvaluator)
GPURemoteBCEvaluator = ray.remote(num_gpus=1)(BCEvaluator)
@@ -1,34 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import itertools
import pickle
import numpy as np
class ExperienceDataset(object):
def __init__(self, dataset_path):
"""Create dataset of experience to imitate.
Parameters
----------
dataset_path:
Path of file containing the database as pickled list of trajectories,
each trajectory being a list of steps,
each step containing the observation and action as its first two
elements.
The file must be available on each machine used by a BCEvaluator.
"""
self._dataset = list(
itertools.chain.from_iterable(
pickle.load(open(dataset_path, "rb"))))
def sample(self, batch_size):
indexes = np.random.choice(len(self._dataset), batch_size)
samples = {
'observations': [self._dataset[i][0] for i in indexes],
'actions': [self._dataset[i][1] for i in indexes]
}
return samples
-106
View File
@@ -1,106 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import gym
import ray
from ray.rllib.models.catalog import ModelCatalog
class BCPolicy(object):
def __init__(self, obs_space, action_space, config):
self.local_steps = 0
self.config = config
self.summarize = config.get("summarize")
self._setup_graph(obs_space, action_space)
self.setup_loss(action_space)
self.setup_gradients()
self.initialize()
def _setup_graph(self, obs_space, ac_space):
self.x = tf.placeholder(tf.float32, [None] + list(obs_space.shape))
dist_class, self.logit_dim = ModelCatalog.get_action_dist(
ac_space, self.config["model"])
self._model = ModelCatalog.get_model(self.x, self.logit_dim,
self.config["model"])
self.logits = self._model.outputs
self.curr_dist = dist_class(self.logits)
self.sample = self.curr_dist.sample()
self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
tf.get_variable_scope().name)
def setup_loss(self, action_space):
if isinstance(action_space, gym.spaces.Box):
self.ac = tf.placeholder(
tf.float32, [None] + list(action_space.shape), name="ac")
elif isinstance(action_space, gym.spaces.Discrete):
self.ac = tf.placeholder(tf.int64, [None], name="ac")
else:
raise NotImplementedError("action space" +
str(type(action_space)) +
"currently not supported")
log_prob = self.curr_dist.logp(self.ac)
self.pi_loss = -tf.reduce_sum(log_prob)
self.loss = self.pi_loss
def setup_gradients(self):
grads = tf.gradients(self.loss, self.var_list)
self.grads, _ = tf.clip_by_global_norm(grads, self.config["grad_clip"])
grads_and_vars = list(zip(self.grads, self.var_list))
opt = tf.train.AdamOptimizer(self.config["lr"])
self._apply_gradients = opt.apply_gradients(grads_and_vars)
def initialize(self):
if self.summarize:
bs = tf.to_float(tf.shape(self.x)[0])
tf.summary.scalar("model/policy_loss", self.pi_loss / bs)
tf.summary.scalar("model/grad_gnorm", tf.global_norm(self.grads))
tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list))
self.summary_op = tf.summary.merge_all()
# TODO(rliaw): Can consider exposing these parameters
self.sess = tf.Session(
graph=self.g,
config=tf.ConfigProto(
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=2,
gpu_options=tf.GPUOptions(allow_growth=True)))
self.variables = ray.experimental.TensorFlowVariables(
self.loss, self.sess)
self.sess.run(tf.global_variables_initializer())
def compute_gradients(self, samples):
info = {}
feed_dict = {
self.x: samples["observations"],
self.ac: samples["actions"]
}
self.grads = [g for g in self.grads if g is not None]
self.local_steps += 1
if self.summarize:
loss, grad, summ = self.sess.run(
[self.loss, self.grads, self.summary_op], feed_dict=feed_dict)
info["summary"] = summ
else:
loss, grad = self.sess.run(
[self.loss, self.grads], feed_dict=feed_dict)
info["num_samples"] = len(samples)
info["loss"] = loss
return grad, info
def apply_gradients(self, grads):
feed_dict = {self.grads[i]: grads[i] for i in range(len(grads))}
self.sess.run(self._apply_gradients, feed_dict=feed_dict)
def get_weights(self):
weights = self.variables.get_weights()
return weights
def set_weights(self, weights):
self.variables.set_weights(weights)
def compute(self, ob, *args):
action = self.sess.run(self.sample, {self.x: [ob]})
return action, None
+1 -11
View File
@@ -4,7 +4,6 @@ from __future__ import print_function
from ray.rllib.agents.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG as DDPG_CONFIG
from ray.rllib.utils import merge_dicts
from ray.tune.trial import Resources
APEX_DDPG_DEFAULT_CONFIG = merge_dicts(
DDPG_CONFIG, # see also the options in ddpg.py, which are also supported
@@ -17,7 +16,7 @@ APEX_DDPG_DEFAULT_CONFIG = merge_dicts(
"debug": False
}),
"n_step": 3,
"gpu": False,
"num_gpus": 0,
"num_workers": 32,
"buffer_size": 2000000,
"learning_starts": 50000,
@@ -43,15 +42,6 @@ class ApexDDPGAgent(DDPGAgent):
_agent_name = "APEX_DDPG"
_default_config = APEX_DDPG_DEFAULT_CONFIG
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(
cpu=1,
gpu=cf["gpu"] and cf["gpu_fraction"] or 0,
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def update_target_if_needed(self):
# Ape-X updates based on num steps trained, not sampled
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
-6
View File
@@ -90,16 +90,10 @@ DEFAULT_CONFIG = with_common_config({
"train_batch_size": 256,
# === Parallelism ===
# Whether to use a GPU for local optimization.
"gpu": False,
# Number of workers for collecting samples with. This only makes sense
# to increase if your environment is particularly slow to sample, or if
# you"re using the Async or Ape-X optimizers.
"num_workers": 0,
# Whether to allocate GPUs for workers (if > 0).
"num_gpus_per_worker": 0,
# Whether to allocate CPUs for workers (if > 0).
"num_cpus_per_worker": 1,
# Optimizer class to use.
"optimizer_class": "SyncReplayOptimizer",
# Whether to use a distribution of epsilons across workers for exploration.
+1 -11
View File
@@ -4,7 +4,6 @@ from __future__ import print_function
from ray.rllib.agents.dqn.dqn import DQNAgent, DEFAULT_CONFIG as DQN_CONFIG
from ray.rllib.utils import merge_dicts
from ray.tune.trial import Resources
# yapf: disable
# __sphinx_doc_begin__
@@ -19,7 +18,7 @@ APEX_DEFAULT_CONFIG = merge_dicts(
"debug": False
}),
"n_step": 3,
"gpu": True,
"num_gpus": 1,
"num_workers": 32,
"buffer_size": 2000000,
"learning_starts": 50000,
@@ -46,15 +45,6 @@ class ApexAgent(DQNAgent):
_agent_name = "APEX"
_default_config = APEX_DEFAULT_CONFIG
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(
cpu=1,
gpu=cf["gpu"] and cf["gpu_fraction"] or 0,
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def update_target_if_needed(self):
# Ape-X updates based on num steps trained, not sampled
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
+3 -25
View File
@@ -8,9 +8,7 @@ from ray.rllib import optimizers
from ray.rllib.agents.agent import Agent, with_common_config
from ray.rllib.agents.dqn.dqn_policy_graph import DQNPolicyGraph
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.utils import merge_dicts
from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule
from ray.tune.trial import Resources
OPTIMIZER_SHARED_CONFIGS = [
"buffer_size", "prioritized_replay", "prioritized_replay_alpha",
@@ -42,8 +40,6 @@ DEFAULT_CONFIG = with_common_config({
"hiddens": [256],
# N-step Q learning
"n_step": 1,
# Whether to use rllib or deepmind preprocessors
"preprocessor_pref": "deepmind",
# === Exploration ===
# Max num timesteps for annealing schedules. Exploration is annealed from
@@ -98,16 +94,10 @@ DEFAULT_CONFIG = with_common_config({
"train_batch_size": 32,
# === Parallelism ===
# Whether to use a GPU for local optimization.
"gpu": False,
# Number of workers for collecting samples with. This only makes sense
# to increase if your environment is particularly slow to sample, or if
# you"re using the Async or Ape-X optimizers.
"num_workers": 0,
# Whether to allocate GPUs for workers (if > 0).
"num_gpus_per_worker": 0,
# Whether to allocate CPUs for workers (if > 0).
"num_cpus_per_worker": 1,
# Optimizer class to use.
"optimizer_class": "SyncReplayOptimizer",
# Whether to use a distribution of epsilons across workers for exploration.
@@ -128,15 +118,6 @@ class DQNAgent(Agent):
_default_config = DEFAULT_CONFIG
_policy_graph = DQNPolicyGraph
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(
cpu=1,
gpu=cf["gpu"] and cf["gpu_fraction"] or 0,
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def _init(self):
# Update effective batch size to include n-step
adjusted_batch_size = max(self.config["sample_batch_size"],
@@ -163,12 +144,9 @@ class DQNAgent(Agent):
self.env_creator, self._policy_graph)
def create_remote_evaluators():
return self.make_remote_evaluators(
self.env_creator, self._policy_graph,
self.config["num_workers"], {
"num_cpus": self.config["num_cpus_per_worker"],
"num_gpus": self.config["num_gpus_per_worker"]
})
return self.make_remote_evaluators(self.env_creator,
self._policy_graph,
self.config["num_workers"])
if self.config["optimizer_class"] != "AsyncReplayOptimizer":
self.remote_evaluators = create_remote_evaluators()
-7
View File
@@ -12,12 +12,10 @@ import time
import ray
from ray.rllib.agents import Agent, with_common_config
from ray.tune.trial import Resources
from ray.rllib.agents.es import optimizers
from ray.rllib.agents.es import policies
from ray.rllib.agents.es import utils
from ray.rllib.utils import merge_dicts
from ray.rllib.utils import FilterManager
logger = logging.getLogger(__name__)
@@ -169,11 +167,6 @@ class ESAgent(Agent):
_agent_name = "ES"
_default_config = DEFAULT_CONFIG
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(cpu=1, gpu=0, extra_cpu=cf["num_workers"])
def _init(self):
policy_params = {"action_noise_std": 0.01}
+1 -14
View File
@@ -8,7 +8,6 @@ from ray.rllib.agents.a3c.a3c_tf_policy_graph import A3CPolicyGraph
from ray.rllib.agents.impala.vtrace_policy_graph import VTracePolicyGraph
from ray.rllib.agents.agent import Agent, with_common_config
from ray.rllib.optimizers import AsyncSamplesOptimizer
from ray.tune.trial import Resources
OPTIMIZER_SHARED_CONFIGS = [
"lr",
@@ -36,8 +35,6 @@ DEFAULT_CONFIG = with_common_config({
"train_batch_size": 500,
"min_iter_time_s": 10,
"num_workers": 2,
"num_cpus_per_worker": 1,
"num_gpus_per_worker": 0,
# number of GPUs the learner should use.
"num_gpus": 1,
# set >1 to load data into GPUs in parallel. Increases GPU memory usage
@@ -77,15 +74,6 @@ class ImpalaAgent(Agent):
_default_config = DEFAULT_CONFIG
_policy_graph = VTracePolicyGraph
@classmethod
def default_resource_request(cls, config):
cf = dict(cls._default_config, **config)
return Resources(
cpu=1,
gpu=cf["num_gpus"] and cf["num_gpus"] * cf["gpu_fraction"] or 0,
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def _init(self):
for k in OPTIMIZER_SHARED_CONFIGS:
if k not in self.config["optimizer"]:
@@ -97,8 +85,7 @@ class ImpalaAgent(Agent):
self.local_evaluator = self.make_local_evaluator(
self.env_creator, policy_cls)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, policy_cls, self.config["num_workers"],
{"num_cpus": 1})
self.env_creator, policy_cls, self.config["num_workers"])
self.optimizer = AsyncSamplesOptimizer(self.local_evaluator,
self.remote_evaluators,
self.config["optimizer"])
+12 -9
View File
@@ -6,18 +6,19 @@ import os
import pickle
import numpy as np
from ray.rllib.agents.agent import Agent
from ray.rllib.agents.agent import Agent, with_common_config
class _MockAgent(Agent):
"""Mock agent for use in tests"""
_agent_name = "MockAgent"
_default_config = {
_default_config = with_common_config({
"mock_error": False,
"persistent_error": False,
"test_variable": 1
}
"test_variable": 1,
"num_workers": 0,
})
def _init(self):
self.info = None
@@ -59,13 +60,14 @@ class _SigmoidFakeData(_MockAgent):
This can be helpful for evaluating early stopping algorithms."""
_agent_name = "SigmoidFakeData"
_default_config = {
_default_config = with_common_config({
"width": 100,
"height": 100,
"offset": 0,
"iter_time": 10,
"iter_timesteps": 1,
}
"num_workers": 0,
})
def _train(self):
i = max(0, self.iteration - self.config["offset"])
@@ -82,13 +84,14 @@ class _SigmoidFakeData(_MockAgent):
class _ParameterTuningAgent(_MockAgent):
_agent_name = "ParameterTuningAgent"
_default_config = {
_default_config = with_common_config({
"reward_amt": 10,
"dummy_param": 10,
"dummy_param2": 15,
"iter_time": 10,
"iter_timesteps": 1
}
"iter_timesteps": 1,
"num_workers": 0,
})
def _train(self):
return dict(
+1 -9
View File
@@ -5,8 +5,6 @@ from __future__ import print_function
from ray.rllib.agents.agent import Agent, with_common_config
from ray.rllib.agents.pg.pg_policy_graph import PGPolicyGraph
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.utils import merge_dicts
from ray.tune.trial import Resources
# yapf: disable
# __sphinx_doc_begin__
@@ -31,17 +29,11 @@ class PGAgent(Agent):
_default_config = DEFAULT_CONFIG
_policy_graph = PGPolicyGraph
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(cpu=1, gpu=0, extra_cpu=cf["num_workers"])
def _init(self):
self.local_evaluator = self.make_local_evaluator(
self.env_creator, self._policy_graph)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, self._policy_graph, self.config["num_workers"],
{})
self.env_creator, self._policy_graph, self.config["num_workers"])
self.optimizer = SyncSamplesOptimizer(self.local_evaluator,
self.remote_evaluators,
self.config["optimizer"])
+1 -21
View File
@@ -6,9 +6,7 @@ import logging
from ray.rllib.agents import Agent, with_common_config
from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph
from ray.rllib.utils import merge_dicts
from ray.rllib.optimizers import SyncSamplesOptimizer, LocalMultiGPUOptimizer
from ray.tune.trial import Resources
logger = logging.getLogger(__name__)
@@ -47,12 +45,6 @@ DEFAULT_CONFIG = with_common_config({
"vf_clip_param": 10.0,
# Target value for KL divergence
"kl_target": 0.01,
# Number of GPUs to use for SGD
"num_gpus": 0,
# Whether to allocate GPUs for workers (if > 0).
"num_gpus_per_worker": 0,
# Whether to allocate CPUs for workers (if > 0).
"num_cpus_per_worker": 1,
# Whether to rollout "complete_episodes" or "truncate_episodes"
"batch_mode": "truncate_episodes",
# Which observation filter to apply to the observation
@@ -71,24 +63,12 @@ class PPOAgent(Agent):
_default_config = DEFAULT_CONFIG
_policy_graph = PPOPolicyGraph
@classmethod
def default_resource_request(cls, config):
cf = merge_dicts(cls._default_config, config)
return Resources(
cpu=1,
gpu=cf["num_gpus"],
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def _init(self):
self._validate_config()
self.local_evaluator = self.make_local_evaluator(
self.env_creator, self._policy_graph)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, self._policy_graph, self.config["num_workers"], {
"num_cpus": self.config["num_cpus_per_worker"],
"num_gpus": self.config["num_gpus_per_worker"]
})
self.env_creator, self._policy_graph, self.config["num_workers"])
if self.config["simple_optimizer"]:
self.optimizer = SyncSamplesOptimizer(
self.local_evaluator, self.remote_evaluators, {
@@ -81,8 +81,9 @@ class PolicyEvaluator(EvaluatorInterface):
"""
@classmethod
def as_remote(cls, num_cpus=None, num_gpus=None):
return ray.remote(num_cpus=num_cpus, num_gpus=num_gpus)(cls)
def as_remote(cls, num_cpus=None, num_gpus=None, resources=None):
return ray.remote(
num_cpus=num_cpus, num_gpus=num_gpus, resources=resources)(cls)
def __init__(self,
env_creator,
@@ -47,6 +47,8 @@ class CustomModel1(Model):
auxiliary_name_scope=False):
last_layer = slim.fully_connected(
input_dict["obs"], 64, activation_fn=tf.nn.relu, scope="fc1")
last_layer = slim.fully_connected(
last_layer, 64, activation_fn=tf.nn.relu, scope="fc2")
output = slim.fully_connected(
last_layer, num_outputs, activation_fn=None, scope="fc_out")
return output, last_layer
@@ -61,6 +63,8 @@ class CustomModel2(Model):
auxiliary_name_scope=False):
last_layer = slim.fully_connected(
input_dict["obs"], 64, activation_fn=tf.nn.relu, scope="fc1")
last_layer = slim.fully_connected(
last_layer, 64, activation_fn=tf.nn.relu, scope="fc2")
output = slim.fully_connected(
last_layer, num_outputs, activation_fn=None, scope="fc_out")
return output, last_layer
@@ -84,7 +88,7 @@ if __name__ == "__main__":
"model": {
"custom_model": ["model1", "model2"][i % 2],
},
"gamma": random.choice([0.5, 0.8, 0.9, 0.95, 0.99]),
"gamma": random.choice([0.95, 0.99]),
}
return (PPOPolicyGraph, obs_space, act_space, config)
@@ -134,9 +134,10 @@ class LearnerThread(threading.Thread):
self.grad_timer = TimerStat()
self.daemon = True
self.weights_updated = False
self.stopped = False
def run(self):
while True:
while not self.stopped:
self.step()
def step(self):
@@ -299,6 +300,11 @@ class AsyncReplayOptimizer(PolicyOptimizer):
return sample_timesteps, train_timesteps
def stop(self):
for r in self.replay_actors:
r.__ray_terminate__.remote()
self.learner.stopped = True
def stats(self):
replay_stats = ray.get(self.replay_actors[0].stats.remote(self.debug))
timing = {
@@ -49,9 +49,10 @@ class LearnerThread(threading.Thread):
self.daemon = True
self.weights_updated = False
self.stats = {}
self.stopped = False
def run(self):
while True:
while not self.stopped:
self.step()
def step(self):
@@ -329,6 +330,9 @@ class AsyncSamplesOptimizer(PolicyOptimizer):
return sample_timesteps, train_timesteps
def stop(self):
self.learner.stopped = True
def stats(self):
timing = {
"{}_time_ms".format(k): round(1000 * self.timers[k].mean, 3)
@@ -143,6 +143,10 @@ class PolicyOptimizer(object):
])
return local_result + remote_results
def stop(self):
"""Release any resources used by this optimizer."""
pass
@staticmethod
def _check_not_multiagent(sample_batch):
if isinstance(sample_batch, MultiAgentBatch):
@@ -13,7 +13,7 @@ atari-a2c:
clip_rewards: True
num_workers: 5
num_envs_per_worker: 5
gpu: true
num_gpus: 1
lr_schedule: [
[0, 0.0007],
[20000000, 0.000000000001],
@@ -23,7 +23,7 @@ apex:
prioritized_replay_alpha: 0.5
beta_annealing_fraction: 1.0
final_prioritized_replay_beta: 1.0
gpu: true
num_gpus: 1
# APEX
num_workers: 8
@@ -27,5 +27,5 @@ basic-dqn:
prioritized_replay_alpha: 0.5
beta_annealing_fraction: 1.0
final_prioritized_replay_beta: 1.0
gpu: true
num_gpus: 1
timesteps_per_iteration: 10000
@@ -29,5 +29,5 @@ atari-basic-dqn:
prioritized_replay_alpha: 0.5
beta_annealing_fraction: 1.0
final_prioritized_replay_beta: 1.0
gpu: true
num_gpus: 1
timesteps_per_iteration: 10000
@@ -27,5 +27,5 @@ dueling-ddqn:
prioritized_replay_alpha: 0.5
beta_annealing_fraction: 1.0
final_prioritized_replay_beta: 1.0
gpu: true
num_gpus: 1
timesteps_per_iteration: 10000
@@ -6,7 +6,7 @@ pong-deterministic-dqn:
episode_reward_mean: 20
time_total_s: 7200
config:
gpu: True
num_gpus: 1
gamma: 0.99
lr: .0001
learning_starts: 10000
+12 -1
View File
@@ -29,6 +29,8 @@ class RayTrialExecutor(TrialExecutor):
self._avail_resources = Resources(cpu=0, gpu=0)
self._committed_resources = Resources(cpu=0, gpu=0)
self._resources_initialized = False
if ray.is_initialized():
self._update_avail_resources()
def _setup_runner(self, trial):
cls = ray.remote(
@@ -257,7 +259,16 @@ class RayTrialExecutor(TrialExecutor):
self._committed_resources.cpu, self._avail_resources.cpu,
self._committed_resources.gpu, self._avail_resources.gpu)
else:
return ""
return "Resources requested: ?"
def resource_string(self):
"""Returns a string describing the total resources available."""
if self._resources_initialized:
return "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
self._avail_resources.gpu)
else:
return "? CPUs, ? GPUs"
def on_step_begin(self):
"""Before step() called, update the available resources."""
+5 -1
View File
@@ -158,7 +158,11 @@ class TrialExecutor(object):
def debug_string(self):
"""Returns a human readable message for printing to the console."""
pass
raise NotImplementedError
def resource_string(self):
"""Returns a string describing the total resources available."""
raise NotImplementedError
def restore(self, trial, checkpoint=None):
"""Restores training state from a checkpoint.
+2 -2
View File
@@ -122,13 +122,13 @@ class TrialRunner(object):
if not self.has_resources(trial.resources):
raise TuneError(
("Insufficient cluster resources to launch trial: "
"trial requested {} but the cluster summary: {} "
"trial requested {} but the cluster has only {}. "
"Pass `queue_trials=True` in "
"ray.tune.run_experiments() or on the command "
"line to queue trials until the cluster scales "
"up. {}").format(
trial.resources.summary_string(),
self.trial_executor.debug_string(),
self.trial_executor.resource_string(),
trial._get_trainable_cls().resource_help(
trial.config)))
elif trial.status == Trial.PAUSED: