From 65c27c70cf88094c3facc95a0566aaf4159cb46e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 13 Nov 2018 18:00:03 -0800 Subject: [PATCH] [rllib] Clean up agent resource configurations (#3296) Closes #3284 --- doc/source/rllib-training.rst | 2 +- python/ray/rllib/__init__.py | 2 +- python/ray/rllib/agents/a3c/a2c.py | 11 -- python/ray/rllib/agents/a3c/a3c.py | 16 +-- python/ray/rllib/agents/agent.py | 62 ++++++++-- python/ray/rllib/agents/ars/ars.py | 6 - python/ray/rllib/agents/bc/__init__.py | 3 - python/ray/rllib/agents/bc/bc.py | 99 ---------------- python/ray/rllib/agents/bc/bc_evaluator.py | 64 ----------- .../ray/rllib/agents/bc/experience_dataset.py | 34 ------ python/ray/rllib/agents/bc/policy.py | 106 ------------------ python/ray/rllib/agents/ddpg/apex.py | 12 +- python/ray/rllib/agents/ddpg/ddpg.py | 6 - python/ray/rllib/agents/dqn/apex.py | 12 +- python/ray/rllib/agents/dqn/dqn.py | 28 +---- python/ray/rllib/agents/es/es.py | 7 -- python/ray/rllib/agents/impala/impala.py | 15 +-- python/ray/rllib/agents/mock.py | 21 ++-- python/ray/rllib/agents/pg/pg.py | 10 +- python/ray/rllib/agents/ppo/ppo.py | 22 +--- .../ray/rllib/evaluation/policy_evaluator.py | 5 +- .../ray/rllib/examples/multiagent_cartpole.py | 6 +- .../optimizers/async_replay_optimizer.py | 8 +- .../optimizers/async_samples_optimizer.py | 6 +- .../ray/rllib/optimizers/policy_optimizer.py | 4 + .../ray/rllib/tuned_examples/atari-a2c.yaml | 2 +- .../ray/rllib/tuned_examples/atari-apex.yaml | 2 +- .../rllib/tuned_examples/atari-dist-dqn.yaml | 2 +- .../ray/rllib/tuned_examples/atari-dqn.yaml | 2 +- .../rllib/tuned_examples/atari-duel-ddqn.yaml | 2 +- python/ray/rllib/tuned_examples/pong-dqn.yaml | 2 +- python/ray/tune/ray_trial_executor.py | 13 ++- python/ray/tune/trial_executor.py | 6 +- python/ray/tune/trial_runner.py | 4 +- test/jenkins_tests/run_multi_node_tests.sh | 2 +- 35 files changed, 126 insertions(+), 478 deletions(-) delete mode 100644 python/ray/rllib/agents/bc/__init__.py delete mode 100644 python/ray/rllib/agents/bc/bc.py delete mode 100644 python/ray/rllib/agents/bc/bc_evaluator.py delete mode 100644 python/ray/rllib/agents/bc/experience_dataset.py delete mode 100644 python/ray/rllib/agents/bc/policy.py diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index 3340e021c..588ba5dcc 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -78,7 +78,7 @@ In an example below, we train A2C by specifying 8 workers through the config fla Specifying Resources ~~~~~~~~~~~~~~~~~~~~ -You can control the degree of parallelism used by setting the ``num_workers`` hyperparameter for most agents. Many agents also provide a ``num_gpus`` or ``gpu`` option. In addition, you can allocate a fraction of a GPU by setting ``gpu_fraction: f``. For example, with DQN you can pack five agents onto one GPU by setting ``gpu_fraction: 0.2``. Note that in Ray < 0.6.0 fractional GPU support requires setting the environment variable ``RAY_USE_XRAY=1``. +You can control the degree of parallelism used by setting the ``num_workers`` hyperparameter for most agents. The number of GPUs the driver should use can be set via the ``num_gpus`` option. Similarly, the resource allocation to workers can be controlled via ``num_cpus_per_worker``, ``num_gpus_per_worker``, and ``custom_resources_per_worker``. The number of GPUs can be a fractional quantity to allocate only a fraction of a GPU. For example, with DQN you can pack five agents onto one GPU by setting ``num_gpus: 0.2``. Note that in Ray < 0.6.0 fractional GPU support requires setting the environment variable ``RAY_USE_XRAY=1``. Common Parameters ~~~~~~~~~~~~~~~~~ diff --git a/python/ray/rllib/__init__.py b/python/ray/rllib/__init__.py index d888252ea..fd6ba3407 100644 --- a/python/ray/rllib/__init__.py +++ b/python/ray/rllib/__init__.py @@ -32,7 +32,7 @@ def _setup_logger(): def _register_all(): for key in [ - "PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG", "APEX_DDPG", + "PPO", "ES", "DQN", "APEX", "A3C", "PG", "DDPG", "APEX_DDPG", "IMPALA", "ARS", "A2C", "__fake", "__sigmoid_fake_data", "__parameter_tuning" ]: diff --git a/python/ray/rllib/agents/a3c/a2c.py b/python/ray/rllib/agents/a3c/a2c.py index a792d1d16..f4e7f394a 100644 --- a/python/ray/rllib/agents/a3c/a2c.py +++ b/python/ray/rllib/agents/a3c/a2c.py @@ -5,12 +5,10 @@ from __future__ import print_function from ray.rllib.agents.a3c.a3c import A3CAgent, DEFAULT_CONFIG as A3C_CONFIG from ray.rllib.optimizers import SyncSamplesOptimizer from ray.rllib.utils import merge_dicts -from ray.tune.trial import Resources A2C_DEFAULT_CONFIG = merge_dicts( A3C_CONFIG, { - "gpu": False, "sample_batch_size": 20, "min_iter_time_s": 10, "sample_async": False, @@ -28,12 +26,3 @@ class A2CAgent(A3CAgent): return SyncSamplesOptimizer(self.local_evaluator, self.remote_evaluators, self.config["optimizer"]) - - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - return Resources( - cpu=1, - gpu=cf["gpu_fraction"] if cf["gpu"] else 0, - extra_cpu=cf["num_workers"], - extra_gpu=cf["use_gpu_for_workers"] and cf["num_workers"] or 0) diff --git a/python/ray/rllib/agents/a3c/a3c.py b/python/ray/rllib/agents/a3c/a3c.py index 63310c504..ebfec99e3 100644 --- a/python/ray/rllib/agents/a3c/a3c.py +++ b/python/ray/rllib/agents/a3c/a3c.py @@ -7,8 +7,6 @@ import time from ray.rllib.agents.a3c.a3c_tf_policy_graph import A3CPolicyGraph from ray.rllib.agents.agent import Agent, with_common_config from ray.rllib.optimizers import AsyncGradientsOptimizer -from ray.rllib.utils import merge_dicts -from ray.tune.trial import Resources # yapf: disable # __sphinx_doc_begin__ @@ -29,8 +27,6 @@ DEFAULT_CONFIG = with_common_config({ "vf_loss_coeff": 0.5, # Entropy coefficient "entropy_coeff": -0.01, - # Whether to place workers on GPUs - "use_gpu_for_workers": False, # Min time per iteration "min_iter_time_s": 5, # Workers sample async. Note that this increases the effective @@ -48,15 +44,6 @@ class A3CAgent(Agent): _default_config = DEFAULT_CONFIG _policy_graph = A3CPolicyGraph - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - return Resources( - cpu=1, - gpu=0, - extra_cpu=cf["num_workers"], - extra_gpu=cf["use_gpu_for_workers"] and cf["num_workers"] or 0) - def _init(self): if self.config["use_pytorch"]: from ray.rllib.agents.a3c.a3c_torch_policy_graph import \ @@ -68,8 +55,7 @@ class A3CAgent(Agent): self.local_evaluator = self.make_local_evaluator( self.env_creator, policy_cls) self.remote_evaluators = self.make_remote_evaluators( - self.env_creator, policy_cls, self.config["num_workers"], - {"num_gpus": 1 if self.config["use_gpu_for_workers"] else 0}) + self.env_creator, policy_cls, self.config["num_workers"]) self.optimizer = self._make_optimizer() def _make_optimizer(self): diff --git a/python/ray/rllib/agents/agent.py b/python/ray/rllib/agents/agent.py index b8004e328..cebfc5d77 100644 --- a/python/ray/rllib/agents/agent.py +++ b/python/ray/rllib/agents/agent.py @@ -17,6 +17,7 @@ from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.utils import FilterManager, deep_update, merge_dicts from ray.tune.registry import ENV_CREATOR, _global_registry from ray.tune.trainable import Trainable +from ray.tune.trial import Resources from ray.tune.logger import UnifiedLogger from ray.tune.result import DEFAULT_RESULTS_DIR @@ -63,11 +64,25 @@ COMMON_CONFIG = { # Whether to use rllib or deepmind preprocessors by default "preprocessor_pref": "deepmind", + # === Resources === + # Number of actors used for parallelism + "num_workers": 2, + # Number of GPUs to allocate to the driver. Note that not all algorithms + # can take advantage of driver GPUs. This can be fraction (e.g., 0.3 GPUs). + "num_gpus": 0, + # Number of CPUs to allocate per worker. + "num_cpus_per_worker": 1, + # Number of GPUs to allocate per worker. This can be fractional. + "num_gpus_per_worker": 0, + # Any custom resources to allocate per worker. + "custom_resources_per_worker": {}, + # Number of CPUs to allocate for the driver. Note: this only takes effect + # when running in Tune. + "num_cpus_for_driver": 1, + # === Execution === # Number of environments to evaluate vectorwise per worker. "num_envs_per_worker": 1, - # Number of actors used for parallelism - "num_workers": 2, # Default sample batch size "sample_batch_size": 200, # Training batch size, if applicable. Should be >= sample_batch_size. @@ -104,8 +119,6 @@ COMMON_CONFIG = { }, # Whether to LZ4 compress observations "compress_observations": False, - # Allocate a fraction of a GPU instead of one (e.g., 0.3 GPUs) - "gpu_fraction": 1, # Drop metric batches from unresponsive workers after this many seconds "collect_metrics_timeout": 180, @@ -149,6 +162,17 @@ class Agent(Trainable): "tf_session_args", "env_config", "model", "optimizer", "multiagent" ] + @classmethod + def default_resource_request(cls, config): + cf = dict(cls._default_config, **config) + Agent._validate_config(cf) + # TODO(ekl): add custom resources here once tune supports them + return Resources( + cpu=cf["num_cpus_for_driver"], + gpu=cf["num_gpus"], + extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"], + extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) + def make_local_evaluator(self, env_creator, policy_graph): """Convenience method to return configured local evaluator.""" @@ -163,10 +187,15 @@ class Agent(Trainable): config["local_evaluator_tf_session_args"] })) - def make_remote_evaluators(self, env_creator, policy_graph, count, - remote_args): + def make_remote_evaluators(self, env_creator, policy_graph, count): """Convenience method to return a number of remote evaluators.""" + remote_args = { + "num_cpus": self.config["num_cpus_per_worker"], + "num_gpus": self.config["num_gpus_per_worker"], + "resources": self.config["custom_resources_per_worker"], + } + cls = PolicyEvaluator.as_remote(**remote_args).remote return [ self._make_evaluator(cls, env_creator, policy_graph, i + 1, @@ -212,6 +241,21 @@ class Agent(Trainable): "DEFAULT_CONFIG defined by each agent for more info.\n\n" "The config of this agent is: {}".format(config)) + @staticmethod + def _validate_config(config): + if "gpu" in config: + raise ValueError( + "The `gpu` config is deprecated, please use `num_gpus=0|1` " + "instead.") + if "gpu_fraction" in config: + raise ValueError( + "The `gpu_fraction` config is deprecated, please use " + "`num_gpus=` instead.") + if "use_gpu_for_workers" in config: + raise ValueError( + "The `use_gpu_for_workers` config is deprecated, please use " + "`num_gpus_per_worker=1` instead.") + def __init__(self, config=None, env=None, logger_creator=None): """Initialize an RLLib agent. @@ -224,6 +268,7 @@ class Agent(Trainable): """ config = config or {} + Agent._validate_config(config) # Vars to synchronize to evaluators on each train call self.global_vars = {"timestep": 0} @@ -370,6 +415,8 @@ class Agent(Trainable): if hasattr(self, "remote_evaluators"): for ev in self.remote_evaluators: ev.__ray_terminate__.remote() + if hasattr(self, "optimizer"): + self.optimizer.stop() def __getstate__(self): state = {} @@ -429,9 +476,6 @@ def get_agent_class(alg): elif alg == "A2C": from ray.rllib.agents import a3c return a3c.A2CAgent - elif alg == "BC": - from ray.rllib.agents import bc - return bc.BCAgent elif alg == "PG": from ray.rllib.agents import pg return pg.PGAgent diff --git a/python/ray/rllib/agents/ars/ars.py b/python/ray/rllib/agents/ars/ars.py index ab08dc00e..67e9ba242 100644 --- a/python/ray/rllib/agents/ars/ars.py +++ b/python/ray/rllib/agents/ars/ars.py @@ -13,7 +13,6 @@ import time import ray from ray.rllib.agents import Agent, with_common_config -from ray.tune.trial import Resources from ray.rllib.agents.ars import optimizers from ray.rllib.agents.ars import policies @@ -162,11 +161,6 @@ class ARSAgent(Agent): _agent_name = "ARS" _default_config = DEFAULT_CONFIG - @classmethod - def default_resource_request(cls, config): - cf = dict(cls._default_config, **config) - return Resources(cpu=1, gpu=0, extra_cpu=cf["num_workers"]) - def _init(self): env = self.env_creator(self.config["env_config"]) from ray.rllib import models diff --git a/python/ray/rllib/agents/bc/__init__.py b/python/ray/rllib/agents/bc/__init__.py deleted file mode 100644 index eb0f8dc2d..000000000 --- a/python/ray/rllib/agents/bc/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from ray.rllib.agents.bc.bc import BCAgent, DEFAULT_CONFIG - -__all__ = ["BCAgent", "DEFAULT_CONFIG"] diff --git a/python/ray/rllib/agents/bc/bc.py b/python/ray/rllib/agents/bc/bc.py deleted file mode 100644 index b2552bf99..000000000 --- a/python/ray/rllib/agents/bc/bc.py +++ /dev/null @@ -1,99 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import ray -from ray.rllib.agents.agent import Agent -from ray.rllib.agents.bc.bc_evaluator import BCEvaluator, \ - GPURemoteBCEvaluator, RemoteBCEvaluator -from ray.rllib.optimizers import AsyncGradientsOptimizer -from ray.rllib.utils import merge_dicts -from ray.tune.trial import Resources - -DEFAULT_CONFIG = { - # Number of workers (excluding master) - "num_workers": 1, - # Size of rollout batch - "batch_size": 100, - # Max global norm for each gradient calculated by worker - "grad_clip": 40.0, - # Learning rate - "lr": 0.0001, - # Whether to use a GPU for local optimization. - "gpu": False, - # Whether to place workers on GPUs - "use_gpu_for_workers": False, - # Model and preprocessor options - "model": { - # (Image statespace) - Converts image to Channels = 1 - "grayscale": True, - # (Image statespace) - Each pixel - "zero_mean": False, - # (Image statespace) - Converts image to (dim, dim, C) - "dim": 84, - # (Image statespace) - Converts image shape to (C, dim, dim) - "channel_major": False - }, - # Arguments to pass to the rllib optimizer - "optimizer": { - # Number of gradients applied for each `train` step - "grads_per_step": 100, - }, - # Arguments to pass to the env creator - "env_config": {}, -} - - -class BCAgent(Agent): - _agent_name = "BC" - _default_config = DEFAULT_CONFIG - _allow_unknown_configs = True - - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - if cf["use_gpu_for_workers"]: - num_gpus_per_worker = cf["gpu_fraction"] - else: - num_gpus_per_worker = 0 - return Resources( - cpu=1, - gpu=cf["gpu"] and cf["gpu_fraction"] or 0, - extra_cpu=cf["num_workers"], - extra_gpu=num_gpus_per_worker * cf["num_workers"]) - - def _init(self): - self.local_evaluator = BCEvaluator(self.env_creator, self.config, - self.logdir) - if self.config["use_gpu_for_workers"]: - remote_cls = GPURemoteBCEvaluator - else: - remote_cls = RemoteBCEvaluator - self.remote_evaluators = [ - remote_cls.remote(self.env_creator, self.config, self.logdir) - for _ in range(self.config["num_workers"]) - ] - self.optimizer = AsyncGradientsOptimizer(self.local_evaluator, - self.remote_evaluators, - self.config["optimizer"]) - - def _train(self): - self.optimizer.step() - metric_lists = [ - re.get_metrics.remote() for re in self.remote_evaluators - ] - total_samples = 0 - total_loss = 0 - for metrics in metric_lists: - for m in ray.get(metrics): - total_samples += m["num_samples"] - total_loss += m["loss"] - result = dict( - mean_loss=total_loss / total_samples, - timesteps_this_iter=total_samples, - ) - return result - - def compute_action(self, observation): - action, info = self.local_evaluator.policy.compute(observation) - return action diff --git a/python/ray/rllib/agents/bc/bc_evaluator.py b/python/ray/rllib/agents/bc/bc_evaluator.py deleted file mode 100644 index 4726b4a3c..000000000 --- a/python/ray/rllib/agents/bc/bc_evaluator.py +++ /dev/null @@ -1,64 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import pickle -from six.moves import queue - -import ray -from ray.rllib.agents.bc.experience_dataset import ExperienceDataset -from ray.rllib.agents.bc.policy import BCPolicy -from ray.rllib.evaluation.interface import EvaluatorInterface -from ray.rllib.models import ModelCatalog - - -class BCEvaluator(EvaluatorInterface): - def __init__(self, env_creator, config, logdir): - env = ModelCatalog.get_preprocessor_as_wrapper( - env_creator(config["env_config"]), config["model"]) - self.dataset = ExperienceDataset(config["dataset_path"]) - self.policy = BCPolicy(env.observation_space, env.action_space, config) - self.config = config - self.logdir = logdir - self.metrics_queue = queue.Queue() - - def sample(self): - return self.dataset.sample(self.config["batch_size"]) - - def compute_gradients(self, samples): - gradient, info = self.policy.compute_gradients(samples) - self.metrics_queue.put({ - "num_samples": info["num_samples"], - "loss": info["loss"] - }) - return gradient, {} - - def apply_gradients(self, grads): - self.policy.apply_gradients(grads) - - def get_weights(self): - return self.policy.get_weights() - - def set_weights(self, params): - self.policy.set_weights(params) - - def save(self): - weights = self.get_weights() - return pickle.dumps({"weights": weights}) - - def restore(self, objs): - objs = pickle.loads(objs) - self.set_weights(objs["weights"]) - - def get_metrics(self): - completed = [] - while True: - try: - completed.append(self.metrics_queue.get_nowait()) - except queue.Empty: - break - return completed - - -RemoteBCEvaluator = ray.remote(BCEvaluator) -GPURemoteBCEvaluator = ray.remote(num_gpus=1)(BCEvaluator) diff --git a/python/ray/rllib/agents/bc/experience_dataset.py b/python/ray/rllib/agents/bc/experience_dataset.py deleted file mode 100644 index d08284184..000000000 --- a/python/ray/rllib/agents/bc/experience_dataset.py +++ /dev/null @@ -1,34 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import itertools -import pickle - -import numpy as np - - -class ExperienceDataset(object): - def __init__(self, dataset_path): - """Create dataset of experience to imitate. - - Parameters - ---------- - dataset_path: - Path of file containing the database as pickled list of trajectories, - each trajectory being a list of steps, - each step containing the observation and action as its first two - elements. - The file must be available on each machine used by a BCEvaluator. - """ - self._dataset = list( - itertools.chain.from_iterable( - pickle.load(open(dataset_path, "rb")))) - - def sample(self, batch_size): - indexes = np.random.choice(len(self._dataset), batch_size) - samples = { - 'observations': [self._dataset[i][0] for i in indexes], - 'actions': [self._dataset[i][1] for i in indexes] - } - return samples diff --git a/python/ray/rllib/agents/bc/policy.py b/python/ray/rllib/agents/bc/policy.py deleted file mode 100644 index a504e3ec6..000000000 --- a/python/ray/rllib/agents/bc/policy.py +++ /dev/null @@ -1,106 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf -import gym - -import ray -from ray.rllib.models.catalog import ModelCatalog - - -class BCPolicy(object): - def __init__(self, obs_space, action_space, config): - self.local_steps = 0 - self.config = config - self.summarize = config.get("summarize") - self._setup_graph(obs_space, action_space) - self.setup_loss(action_space) - self.setup_gradients() - self.initialize() - - def _setup_graph(self, obs_space, ac_space): - self.x = tf.placeholder(tf.float32, [None] + list(obs_space.shape)) - dist_class, self.logit_dim = ModelCatalog.get_action_dist( - ac_space, self.config["model"]) - self._model = ModelCatalog.get_model(self.x, self.logit_dim, - self.config["model"]) - self.logits = self._model.outputs - self.curr_dist = dist_class(self.logits) - self.sample = self.curr_dist.sample() - self.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, - tf.get_variable_scope().name) - - def setup_loss(self, action_space): - if isinstance(action_space, gym.spaces.Box): - self.ac = tf.placeholder( - tf.float32, [None] + list(action_space.shape), name="ac") - elif isinstance(action_space, gym.spaces.Discrete): - self.ac = tf.placeholder(tf.int64, [None], name="ac") - else: - raise NotImplementedError("action space" + - str(type(action_space)) + - "currently not supported") - log_prob = self.curr_dist.logp(self.ac) - self.pi_loss = -tf.reduce_sum(log_prob) - self.loss = self.pi_loss - - def setup_gradients(self): - grads = tf.gradients(self.loss, self.var_list) - self.grads, _ = tf.clip_by_global_norm(grads, self.config["grad_clip"]) - grads_and_vars = list(zip(self.grads, self.var_list)) - opt = tf.train.AdamOptimizer(self.config["lr"]) - self._apply_gradients = opt.apply_gradients(grads_and_vars) - - def initialize(self): - if self.summarize: - bs = tf.to_float(tf.shape(self.x)[0]) - tf.summary.scalar("model/policy_loss", self.pi_loss / bs) - tf.summary.scalar("model/grad_gnorm", tf.global_norm(self.grads)) - tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list)) - self.summary_op = tf.summary.merge_all() - - # TODO(rliaw): Can consider exposing these parameters - self.sess = tf.Session( - graph=self.g, - config=tf.ConfigProto( - intra_op_parallelism_threads=1, - inter_op_parallelism_threads=2, - gpu_options=tf.GPUOptions(allow_growth=True))) - self.variables = ray.experimental.TensorFlowVariables( - self.loss, self.sess) - self.sess.run(tf.global_variables_initializer()) - - def compute_gradients(self, samples): - info = {} - feed_dict = { - self.x: samples["observations"], - self.ac: samples["actions"] - } - self.grads = [g for g in self.grads if g is not None] - self.local_steps += 1 - if self.summarize: - loss, grad, summ = self.sess.run( - [self.loss, self.grads, self.summary_op], feed_dict=feed_dict) - info["summary"] = summ - else: - loss, grad = self.sess.run( - [self.loss, self.grads], feed_dict=feed_dict) - info["num_samples"] = len(samples) - info["loss"] = loss - return grad, info - - def apply_gradients(self, grads): - feed_dict = {self.grads[i]: grads[i] for i in range(len(grads))} - self.sess.run(self._apply_gradients, feed_dict=feed_dict) - - def get_weights(self): - weights = self.variables.get_weights() - return weights - - def set_weights(self, weights): - self.variables.set_weights(weights) - - def compute(self, ob, *args): - action = self.sess.run(self.sample, {self.x: [ob]}) - return action, None diff --git a/python/ray/rllib/agents/ddpg/apex.py b/python/ray/rllib/agents/ddpg/apex.py index e809ac865..c1699364a 100644 --- a/python/ray/rllib/agents/ddpg/apex.py +++ b/python/ray/rllib/agents/ddpg/apex.py @@ -4,7 +4,6 @@ from __future__ import print_function from ray.rllib.agents.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG as DDPG_CONFIG from ray.rllib.utils import merge_dicts -from ray.tune.trial import Resources APEX_DDPG_DEFAULT_CONFIG = merge_dicts( DDPG_CONFIG, # see also the options in ddpg.py, which are also supported @@ -17,7 +16,7 @@ APEX_DDPG_DEFAULT_CONFIG = merge_dicts( "debug": False }), "n_step": 3, - "gpu": False, + "num_gpus": 0, "num_workers": 32, "buffer_size": 2000000, "learning_starts": 50000, @@ -43,15 +42,6 @@ class ApexDDPGAgent(DDPGAgent): _agent_name = "APEX_DDPG" _default_config = APEX_DDPG_DEFAULT_CONFIG - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - return Resources( - cpu=1, - gpu=cf["gpu"] and cf["gpu_fraction"] or 0, - extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"], - extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) - def update_target_if_needed(self): # Ape-X updates based on num steps trained, not sampled if self.optimizer.num_steps_trained - self.last_target_update_ts > \ diff --git a/python/ray/rllib/agents/ddpg/ddpg.py b/python/ray/rllib/agents/ddpg/ddpg.py index ed58718b4..b3dded59d 100644 --- a/python/ray/rllib/agents/ddpg/ddpg.py +++ b/python/ray/rllib/agents/ddpg/ddpg.py @@ -90,16 +90,10 @@ DEFAULT_CONFIG = with_common_config({ "train_batch_size": 256, # === Parallelism === - # Whether to use a GPU for local optimization. - "gpu": False, # Number of workers for collecting samples with. This only makes sense # to increase if your environment is particularly slow to sample, or if # you"re using the Async or Ape-X optimizers. "num_workers": 0, - # Whether to allocate GPUs for workers (if > 0). - "num_gpus_per_worker": 0, - # Whether to allocate CPUs for workers (if > 0). - "num_cpus_per_worker": 1, # Optimizer class to use. "optimizer_class": "SyncReplayOptimizer", # Whether to use a distribution of epsilons across workers for exploration. diff --git a/python/ray/rllib/agents/dqn/apex.py b/python/ray/rllib/agents/dqn/apex.py index 585bdeea8..a6738d661 100644 --- a/python/ray/rllib/agents/dqn/apex.py +++ b/python/ray/rllib/agents/dqn/apex.py @@ -4,7 +4,6 @@ from __future__ import print_function from ray.rllib.agents.dqn.dqn import DQNAgent, DEFAULT_CONFIG as DQN_CONFIG from ray.rllib.utils import merge_dicts -from ray.tune.trial import Resources # yapf: disable # __sphinx_doc_begin__ @@ -19,7 +18,7 @@ APEX_DEFAULT_CONFIG = merge_dicts( "debug": False }), "n_step": 3, - "gpu": True, + "num_gpus": 1, "num_workers": 32, "buffer_size": 2000000, "learning_starts": 50000, @@ -46,15 +45,6 @@ class ApexAgent(DQNAgent): _agent_name = "APEX" _default_config = APEX_DEFAULT_CONFIG - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - return Resources( - cpu=1, - gpu=cf["gpu"] and cf["gpu_fraction"] or 0, - extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"], - extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) - def update_target_if_needed(self): # Ape-X updates based on num steps trained, not sampled if self.optimizer.num_steps_trained - self.last_target_update_ts > \ diff --git a/python/ray/rllib/agents/dqn/dqn.py b/python/ray/rllib/agents/dqn/dqn.py index c26102161..29003b4a3 100644 --- a/python/ray/rllib/agents/dqn/dqn.py +++ b/python/ray/rllib/agents/dqn/dqn.py @@ -8,9 +8,7 @@ from ray.rllib import optimizers from ray.rllib.agents.agent import Agent, with_common_config from ray.rllib.agents.dqn.dqn_policy_graph import DQNPolicyGraph from ray.rllib.evaluation.metrics import collect_metrics -from ray.rllib.utils import merge_dicts from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule -from ray.tune.trial import Resources OPTIMIZER_SHARED_CONFIGS = [ "buffer_size", "prioritized_replay", "prioritized_replay_alpha", @@ -42,8 +40,6 @@ DEFAULT_CONFIG = with_common_config({ "hiddens": [256], # N-step Q learning "n_step": 1, - # Whether to use rllib or deepmind preprocessors - "preprocessor_pref": "deepmind", # === Exploration === # Max num timesteps for annealing schedules. Exploration is annealed from @@ -98,16 +94,10 @@ DEFAULT_CONFIG = with_common_config({ "train_batch_size": 32, # === Parallelism === - # Whether to use a GPU for local optimization. - "gpu": False, # Number of workers for collecting samples with. This only makes sense # to increase if your environment is particularly slow to sample, or if # you"re using the Async or Ape-X optimizers. "num_workers": 0, - # Whether to allocate GPUs for workers (if > 0). - "num_gpus_per_worker": 0, - # Whether to allocate CPUs for workers (if > 0). - "num_cpus_per_worker": 1, # Optimizer class to use. "optimizer_class": "SyncReplayOptimizer", # Whether to use a distribution of epsilons across workers for exploration. @@ -128,15 +118,6 @@ class DQNAgent(Agent): _default_config = DEFAULT_CONFIG _policy_graph = DQNPolicyGraph - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - return Resources( - cpu=1, - gpu=cf["gpu"] and cf["gpu_fraction"] or 0, - extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"], - extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) - def _init(self): # Update effective batch size to include n-step adjusted_batch_size = max(self.config["sample_batch_size"], @@ -163,12 +144,9 @@ class DQNAgent(Agent): self.env_creator, self._policy_graph) def create_remote_evaluators(): - return self.make_remote_evaluators( - self.env_creator, self._policy_graph, - self.config["num_workers"], { - "num_cpus": self.config["num_cpus_per_worker"], - "num_gpus": self.config["num_gpus_per_worker"] - }) + return self.make_remote_evaluators(self.env_creator, + self._policy_graph, + self.config["num_workers"]) if self.config["optimizer_class"] != "AsyncReplayOptimizer": self.remote_evaluators = create_remote_evaluators() diff --git a/python/ray/rllib/agents/es/es.py b/python/ray/rllib/agents/es/es.py index 9d9d5e240..550296812 100644 --- a/python/ray/rllib/agents/es/es.py +++ b/python/ray/rllib/agents/es/es.py @@ -12,12 +12,10 @@ import time import ray from ray.rllib.agents import Agent, with_common_config -from ray.tune.trial import Resources from ray.rllib.agents.es import optimizers from ray.rllib.agents.es import policies from ray.rllib.agents.es import utils -from ray.rllib.utils import merge_dicts from ray.rllib.utils import FilterManager logger = logging.getLogger(__name__) @@ -169,11 +167,6 @@ class ESAgent(Agent): _agent_name = "ES" _default_config = DEFAULT_CONFIG - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - return Resources(cpu=1, gpu=0, extra_cpu=cf["num_workers"]) - def _init(self): policy_params = {"action_noise_std": 0.01} diff --git a/python/ray/rllib/agents/impala/impala.py b/python/ray/rllib/agents/impala/impala.py index c48ea1c58..b9665e9bf 100644 --- a/python/ray/rllib/agents/impala/impala.py +++ b/python/ray/rllib/agents/impala/impala.py @@ -8,7 +8,6 @@ from ray.rllib.agents.a3c.a3c_tf_policy_graph import A3CPolicyGraph from ray.rllib.agents.impala.vtrace_policy_graph import VTracePolicyGraph from ray.rllib.agents.agent import Agent, with_common_config from ray.rllib.optimizers import AsyncSamplesOptimizer -from ray.tune.trial import Resources OPTIMIZER_SHARED_CONFIGS = [ "lr", @@ -36,8 +35,6 @@ DEFAULT_CONFIG = with_common_config({ "train_batch_size": 500, "min_iter_time_s": 10, "num_workers": 2, - "num_cpus_per_worker": 1, - "num_gpus_per_worker": 0, # number of GPUs the learner should use. "num_gpus": 1, # set >1 to load data into GPUs in parallel. Increases GPU memory usage @@ -77,15 +74,6 @@ class ImpalaAgent(Agent): _default_config = DEFAULT_CONFIG _policy_graph = VTracePolicyGraph - @classmethod - def default_resource_request(cls, config): - cf = dict(cls._default_config, **config) - return Resources( - cpu=1, - gpu=cf["num_gpus"] and cf["num_gpus"] * cf["gpu_fraction"] or 0, - extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"], - extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) - def _init(self): for k in OPTIMIZER_SHARED_CONFIGS: if k not in self.config["optimizer"]: @@ -97,8 +85,7 @@ class ImpalaAgent(Agent): self.local_evaluator = self.make_local_evaluator( self.env_creator, policy_cls) self.remote_evaluators = self.make_remote_evaluators( - self.env_creator, policy_cls, self.config["num_workers"], - {"num_cpus": 1}) + self.env_creator, policy_cls, self.config["num_workers"]) self.optimizer = AsyncSamplesOptimizer(self.local_evaluator, self.remote_evaluators, self.config["optimizer"]) diff --git a/python/ray/rllib/agents/mock.py b/python/ray/rllib/agents/mock.py index 526ec146a..f4bf90991 100644 --- a/python/ray/rllib/agents/mock.py +++ b/python/ray/rllib/agents/mock.py @@ -6,18 +6,19 @@ import os import pickle import numpy as np -from ray.rllib.agents.agent import Agent +from ray.rllib.agents.agent import Agent, with_common_config class _MockAgent(Agent): """Mock agent for use in tests""" _agent_name = "MockAgent" - _default_config = { + _default_config = with_common_config({ "mock_error": False, "persistent_error": False, - "test_variable": 1 - } + "test_variable": 1, + "num_workers": 0, + }) def _init(self): self.info = None @@ -59,13 +60,14 @@ class _SigmoidFakeData(_MockAgent): This can be helpful for evaluating early stopping algorithms.""" _agent_name = "SigmoidFakeData" - _default_config = { + _default_config = with_common_config({ "width": 100, "height": 100, "offset": 0, "iter_time": 10, "iter_timesteps": 1, - } + "num_workers": 0, + }) def _train(self): i = max(0, self.iteration - self.config["offset"]) @@ -82,13 +84,14 @@ class _SigmoidFakeData(_MockAgent): class _ParameterTuningAgent(_MockAgent): _agent_name = "ParameterTuningAgent" - _default_config = { + _default_config = with_common_config({ "reward_amt": 10, "dummy_param": 10, "dummy_param2": 15, "iter_time": 10, - "iter_timesteps": 1 - } + "iter_timesteps": 1, + "num_workers": 0, + }) def _train(self): return dict( diff --git a/python/ray/rllib/agents/pg/pg.py b/python/ray/rllib/agents/pg/pg.py index 055a8faf4..925cbc1a1 100644 --- a/python/ray/rllib/agents/pg/pg.py +++ b/python/ray/rllib/agents/pg/pg.py @@ -5,8 +5,6 @@ from __future__ import print_function from ray.rllib.agents.agent import Agent, with_common_config from ray.rllib.agents.pg.pg_policy_graph import PGPolicyGraph from ray.rllib.optimizers import SyncSamplesOptimizer -from ray.rllib.utils import merge_dicts -from ray.tune.trial import Resources # yapf: disable # __sphinx_doc_begin__ @@ -31,17 +29,11 @@ class PGAgent(Agent): _default_config = DEFAULT_CONFIG _policy_graph = PGPolicyGraph - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - return Resources(cpu=1, gpu=0, extra_cpu=cf["num_workers"]) - def _init(self): self.local_evaluator = self.make_local_evaluator( self.env_creator, self._policy_graph) self.remote_evaluators = self.make_remote_evaluators( - self.env_creator, self._policy_graph, self.config["num_workers"], - {}) + self.env_creator, self._policy_graph, self.config["num_workers"]) self.optimizer = SyncSamplesOptimizer(self.local_evaluator, self.remote_evaluators, self.config["optimizer"]) diff --git a/python/ray/rllib/agents/ppo/ppo.py b/python/ray/rllib/agents/ppo/ppo.py index df379536a..c6c166445 100644 --- a/python/ray/rllib/agents/ppo/ppo.py +++ b/python/ray/rllib/agents/ppo/ppo.py @@ -6,9 +6,7 @@ import logging from ray.rllib.agents import Agent, with_common_config from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph -from ray.rllib.utils import merge_dicts from ray.rllib.optimizers import SyncSamplesOptimizer, LocalMultiGPUOptimizer -from ray.tune.trial import Resources logger = logging.getLogger(__name__) @@ -47,12 +45,6 @@ DEFAULT_CONFIG = with_common_config({ "vf_clip_param": 10.0, # Target value for KL divergence "kl_target": 0.01, - # Number of GPUs to use for SGD - "num_gpus": 0, - # Whether to allocate GPUs for workers (if > 0). - "num_gpus_per_worker": 0, - # Whether to allocate CPUs for workers (if > 0). - "num_cpus_per_worker": 1, # Whether to rollout "complete_episodes" or "truncate_episodes" "batch_mode": "truncate_episodes", # Which observation filter to apply to the observation @@ -71,24 +63,12 @@ class PPOAgent(Agent): _default_config = DEFAULT_CONFIG _policy_graph = PPOPolicyGraph - @classmethod - def default_resource_request(cls, config): - cf = merge_dicts(cls._default_config, config) - return Resources( - cpu=1, - gpu=cf["num_gpus"], - extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"], - extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"]) - def _init(self): self._validate_config() self.local_evaluator = self.make_local_evaluator( self.env_creator, self._policy_graph) self.remote_evaluators = self.make_remote_evaluators( - self.env_creator, self._policy_graph, self.config["num_workers"], { - "num_cpus": self.config["num_cpus_per_worker"], - "num_gpus": self.config["num_gpus_per_worker"] - }) + self.env_creator, self._policy_graph, self.config["num_workers"]) if self.config["simple_optimizer"]: self.optimizer = SyncSamplesOptimizer( self.local_evaluator, self.remote_evaluators, { diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index 3344ce464..db5f7ee88 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -81,8 +81,9 @@ class PolicyEvaluator(EvaluatorInterface): """ @classmethod - def as_remote(cls, num_cpus=None, num_gpus=None): - return ray.remote(num_cpus=num_cpus, num_gpus=num_gpus)(cls) + def as_remote(cls, num_cpus=None, num_gpus=None, resources=None): + return ray.remote( + num_cpus=num_cpus, num_gpus=num_gpus, resources=resources)(cls) def __init__(self, env_creator, diff --git a/python/ray/rllib/examples/multiagent_cartpole.py b/python/ray/rllib/examples/multiagent_cartpole.py index 002e1fa98..87a8eb928 100644 --- a/python/ray/rllib/examples/multiagent_cartpole.py +++ b/python/ray/rllib/examples/multiagent_cartpole.py @@ -47,6 +47,8 @@ class CustomModel1(Model): auxiliary_name_scope=False): last_layer = slim.fully_connected( input_dict["obs"], 64, activation_fn=tf.nn.relu, scope="fc1") + last_layer = slim.fully_connected( + last_layer, 64, activation_fn=tf.nn.relu, scope="fc2") output = slim.fully_connected( last_layer, num_outputs, activation_fn=None, scope="fc_out") return output, last_layer @@ -61,6 +63,8 @@ class CustomModel2(Model): auxiliary_name_scope=False): last_layer = slim.fully_connected( input_dict["obs"], 64, activation_fn=tf.nn.relu, scope="fc1") + last_layer = slim.fully_connected( + last_layer, 64, activation_fn=tf.nn.relu, scope="fc2") output = slim.fully_connected( last_layer, num_outputs, activation_fn=None, scope="fc_out") return output, last_layer @@ -84,7 +88,7 @@ if __name__ == "__main__": "model": { "custom_model": ["model1", "model2"][i % 2], }, - "gamma": random.choice([0.5, 0.8, 0.9, 0.95, 0.99]), + "gamma": random.choice([0.95, 0.99]), } return (PPOPolicyGraph, obs_space, act_space, config) diff --git a/python/ray/rllib/optimizers/async_replay_optimizer.py b/python/ray/rllib/optimizers/async_replay_optimizer.py index 70779f9e9..3cd5a16ad 100644 --- a/python/ray/rllib/optimizers/async_replay_optimizer.py +++ b/python/ray/rllib/optimizers/async_replay_optimizer.py @@ -134,9 +134,10 @@ class LearnerThread(threading.Thread): self.grad_timer = TimerStat() self.daemon = True self.weights_updated = False + self.stopped = False def run(self): - while True: + while not self.stopped: self.step() def step(self): @@ -299,6 +300,11 @@ class AsyncReplayOptimizer(PolicyOptimizer): return sample_timesteps, train_timesteps + def stop(self): + for r in self.replay_actors: + r.__ray_terminate__.remote() + self.learner.stopped = True + def stats(self): replay_stats = ray.get(self.replay_actors[0].stats.remote(self.debug)) timing = { diff --git a/python/ray/rllib/optimizers/async_samples_optimizer.py b/python/ray/rllib/optimizers/async_samples_optimizer.py index 5ad6bd809..e0ff26ed2 100644 --- a/python/ray/rllib/optimizers/async_samples_optimizer.py +++ b/python/ray/rllib/optimizers/async_samples_optimizer.py @@ -49,9 +49,10 @@ class LearnerThread(threading.Thread): self.daemon = True self.weights_updated = False self.stats = {} + self.stopped = False def run(self): - while True: + while not self.stopped: self.step() def step(self): @@ -329,6 +330,9 @@ class AsyncSamplesOptimizer(PolicyOptimizer): return sample_timesteps, train_timesteps + def stop(self): + self.learner.stopped = True + def stats(self): timing = { "{}_time_ms".format(k): round(1000 * self.timers[k].mean, 3) diff --git a/python/ray/rllib/optimizers/policy_optimizer.py b/python/ray/rllib/optimizers/policy_optimizer.py index 3a0d3cf80..3f958c4d6 100644 --- a/python/ray/rllib/optimizers/policy_optimizer.py +++ b/python/ray/rllib/optimizers/policy_optimizer.py @@ -143,6 +143,10 @@ class PolicyOptimizer(object): ]) return local_result + remote_results + def stop(self): + """Release any resources used by this optimizer.""" + pass + @staticmethod def _check_not_multiagent(sample_batch): if isinstance(sample_batch, MultiAgentBatch): diff --git a/python/ray/rllib/tuned_examples/atari-a2c.yaml b/python/ray/rllib/tuned_examples/atari-a2c.yaml index 53d1937cd..42ea11963 100644 --- a/python/ray/rllib/tuned_examples/atari-a2c.yaml +++ b/python/ray/rllib/tuned_examples/atari-a2c.yaml @@ -13,7 +13,7 @@ atari-a2c: clip_rewards: True num_workers: 5 num_envs_per_worker: 5 - gpu: true + num_gpus: 1 lr_schedule: [ [0, 0.0007], [20000000, 0.000000000001], diff --git a/python/ray/rllib/tuned_examples/atari-apex.yaml b/python/ray/rllib/tuned_examples/atari-apex.yaml index 23b1f19c1..e24e347dd 100644 --- a/python/ray/rllib/tuned_examples/atari-apex.yaml +++ b/python/ray/rllib/tuned_examples/atari-apex.yaml @@ -23,7 +23,7 @@ apex: prioritized_replay_alpha: 0.5 beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 - gpu: true + num_gpus: 1 # APEX num_workers: 8 diff --git a/python/ray/rllib/tuned_examples/atari-dist-dqn.yaml b/python/ray/rllib/tuned_examples/atari-dist-dqn.yaml index d71932986..57cd5635d 100644 --- a/python/ray/rllib/tuned_examples/atari-dist-dqn.yaml +++ b/python/ray/rllib/tuned_examples/atari-dist-dqn.yaml @@ -27,5 +27,5 @@ basic-dqn: prioritized_replay_alpha: 0.5 beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 - gpu: true + num_gpus: 1 timesteps_per_iteration: 10000 diff --git a/python/ray/rllib/tuned_examples/atari-dqn.yaml b/python/ray/rllib/tuned_examples/atari-dqn.yaml index 492901787..264ddfd27 100644 --- a/python/ray/rllib/tuned_examples/atari-dqn.yaml +++ b/python/ray/rllib/tuned_examples/atari-dqn.yaml @@ -29,5 +29,5 @@ atari-basic-dqn: prioritized_replay_alpha: 0.5 beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 - gpu: true + num_gpus: 1 timesteps_per_iteration: 10000 diff --git a/python/ray/rllib/tuned_examples/atari-duel-ddqn.yaml b/python/ray/rllib/tuned_examples/atari-duel-ddqn.yaml index 61ed3120d..be59d15ba 100644 --- a/python/ray/rllib/tuned_examples/atari-duel-ddqn.yaml +++ b/python/ray/rllib/tuned_examples/atari-duel-ddqn.yaml @@ -27,5 +27,5 @@ dueling-ddqn: prioritized_replay_alpha: 0.5 beta_annealing_fraction: 1.0 final_prioritized_replay_beta: 1.0 - gpu: true + num_gpus: 1 timesteps_per_iteration: 10000 diff --git a/python/ray/rllib/tuned_examples/pong-dqn.yaml b/python/ray/rllib/tuned_examples/pong-dqn.yaml index a0d39cc3d..2c3e5a877 100644 --- a/python/ray/rllib/tuned_examples/pong-dqn.yaml +++ b/python/ray/rllib/tuned_examples/pong-dqn.yaml @@ -6,7 +6,7 @@ pong-deterministic-dqn: episode_reward_mean: 20 time_total_s: 7200 config: - gpu: True + num_gpus: 1 gamma: 0.99 lr: .0001 learning_starts: 10000 diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 4a216a60d..2db878714 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -29,6 +29,8 @@ class RayTrialExecutor(TrialExecutor): self._avail_resources = Resources(cpu=0, gpu=0) self._committed_resources = Resources(cpu=0, gpu=0) self._resources_initialized = False + if ray.is_initialized(): + self._update_avail_resources() def _setup_runner(self, trial): cls = ray.remote( @@ -257,7 +259,16 @@ class RayTrialExecutor(TrialExecutor): self._committed_resources.cpu, self._avail_resources.cpu, self._committed_resources.gpu, self._avail_resources.gpu) else: - return "" + return "Resources requested: ?" + + def resource_string(self): + """Returns a string describing the total resources available.""" + + if self._resources_initialized: + return "{} CPUs, {} GPUs".format(self._avail_resources.cpu, + self._avail_resources.gpu) + else: + return "? CPUs, ? GPUs" def on_step_begin(self): """Before step() called, update the available resources.""" diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py index b961d12b8..61f272b6a 100644 --- a/python/ray/tune/trial_executor.py +++ b/python/ray/tune/trial_executor.py @@ -158,7 +158,11 @@ class TrialExecutor(object): def debug_string(self): """Returns a human readable message for printing to the console.""" - pass + raise NotImplementedError + + def resource_string(self): + """Returns a string describing the total resources available.""" + raise NotImplementedError def restore(self, trial, checkpoint=None): """Restores training state from a checkpoint. diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 8a86ae500..01fa510c4 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -122,13 +122,13 @@ class TrialRunner(object): if not self.has_resources(trial.resources): raise TuneError( ("Insufficient cluster resources to launch trial: " - "trial requested {} but the cluster summary: {} " + "trial requested {} but the cluster has only {}. " "Pass `queue_trials=True` in " "ray.tune.run_experiments() or on the command " "line to queue trials until the cluster scales " "up. {}").format( trial.resources.summary_string(), - self.trial_executor.debug_string(), + self.trial_executor.resource_string(), trial._get_trainable_cls().resource_help( trial.config))) elif trial.status == Trial.PAUSED: diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index b160f73bc..77875d59c 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -99,7 +99,7 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --env CartPole-v0 \ --run APEX \ --stop '{"training_iteration": 2}' \ - --config '{"num_workers": 2, "timesteps_per_iteration": 1000, "gpu": false, "min_iter_time_s": 1}' + --config '{"num_workers": 2, "timesteps_per_iteration": 1000, "num_gpus": 0, "min_iter_time_s": 1}' docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \