[tune] Support Custom Resources (#2979)

Support arbitrary resource declarations in Tune.

Fixes https://github.com/ray-project/ray/issues/2875
This commit is contained in:
Richard Liaw
2019-02-07 00:29:19 -08:00
committed by GitHub
parent a654152f9c
commit 5db1afef07
4 changed files with 287 additions and 35 deletions
+67 -19
View File
@@ -35,7 +35,9 @@ class RayTrialExecutor(TrialExecutor):
def _setup_runner(self, trial):
cls = ray.remote(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu)(trial._get_trainable_cls())
num_gpus=trial.resources.gpu,
resources=trial.resources.custom_resources)(
trial._get_trainable_cls())
trial.init_logger()
# We checkpoint metadata here to try mitigating logdir duplication
@@ -229,16 +231,37 @@ class RayTrialExecutor(TrialExecutor):
return result
def _commit_resources(self, resources):
committed = self._committed_resources
all_keys = set(resources.custom_resources).union(
set(committed.custom_resources))
custom_resources = {
k: committed.get(k) + resources.get_res_total(k)
for k in all_keys
}
self._committed_resources = Resources(
self._committed_resources.cpu + resources.cpu_total(),
self._committed_resources.gpu + resources.gpu_total())
committed.cpu + resources.cpu_total(),
committed.gpu + resources.gpu_total(),
custom_resources=custom_resources)
def _return_resources(self, resources):
committed = self._committed_resources
all_keys = set(resources.custom_resources).union(
set(committed.custom_resources))
custom_resources = {
k: committed.get(k) - resources.get_res_total(k)
for k in all_keys
}
self._committed_resources = Resources(
self._committed_resources.cpu - resources.cpu_total(),
self._committed_resources.gpu - resources.gpu_total())
assert self._committed_resources.cpu >= 0
assert self._committed_resources.gpu >= 0
committed.cpu - resources.cpu_total(),
committed.gpu - resources.gpu_total(),
custom_resources=custom_resources)
assert self._committed_resources.is_nonnegative(), (
"Resource invalid: {}".format(resources))
def _update_avail_resources(self, num_retries=5):
for i in range(num_retries):
@@ -247,28 +270,37 @@ class RayTrialExecutor(TrialExecutor):
logger.warning("Cluster resources not detected. Retrying...")
time.sleep(0.5)
num_cpus = resources["CPU"]
num_gpus = resources["GPU"]
resources = resources.copy()
num_cpus = resources.pop("CPU")
num_gpus = resources.pop("GPU")
custom_resources = resources
self._avail_resources = Resources(int(num_cpus), int(num_gpus))
self._avail_resources = Resources(
int(num_cpus), int(num_gpus), custom_resources=custom_resources)
self._resources_initialized = True
def has_resources(self, resources):
"""Returns whether this runner has at least the specified resources."""
self._update_avail_resources()
cpu_avail = self._avail_resources.cpu - self._committed_resources.cpu
gpu_avail = self._avail_resources.gpu - self._committed_resources.gpu
currently_available = Resources.subtract(self._avail_resources,
self._committed_resources)
have_space = (resources.cpu_total() <= cpu_avail
and resources.gpu_total() <= gpu_avail)
have_space = (
resources.cpu_total() <= currently_available.cpu
and resources.gpu_total() <= currently_available.gpu and all(
resources.get_res_total(res) <= currently_available.get(res)
for res in resources.custom_resources))
if have_space:
return True
can_overcommit = self._queue_trials
if (resources.cpu_total() > 0 and cpu_avail <= 0) or \
(resources.gpu_total() > 0 and gpu_avail <= 0):
if (resources.cpu_total() > 0 and currently_available.cpu <= 0) or \
(resources.gpu_total() > 0 and currently_available.gpu <= 0) or \
any((resources.get_res_total(res_name) > 0
and currently_available.get(res_name) <= 0)
for res_name in resources.custom_resources):
can_overcommit = False # requested resource is already saturated
if can_overcommit:
@@ -287,9 +319,18 @@ class RayTrialExecutor(TrialExecutor):
"""Returns a human readable message for printing to the console."""
if self._resources_initialized:
return "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
status = "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
self._committed_resources.cpu, self._avail_resources.cpu,
self._committed_resources.gpu, self._avail_resources.gpu)
customs = ", ".join([
"{}/{} {}".format(
self._committed_resources.get_res_total(name),
self._avail_resources.get_res_total(name), name)
for name in self._avail_resources.custom_resources
])
if customs:
status += " ({})".format(customs)
return status
else:
return "Resources requested: ?"
@@ -297,8 +338,15 @@ class RayTrialExecutor(TrialExecutor):
"""Returns a string describing the total resources available."""
if self._resources_initialized:
return "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
self._avail_resources.gpu)
res_str = "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
self._avail_resources.gpu)
if self._avail_resources.custom_resources:
custom = ", ".join(
"{} {}".format(
self._avail_resources.get_res_total(name), name)
for name in self._avail_resources.custom_resources)
res_str += " ({})".format(custom)
return res_str
else:
return "? CPUs, ? GPUs"
+137 -1
View File
@@ -23,7 +23,8 @@ from ray.tune.result import (DEFAULT_RESULTS_DIR, TIMESTEPS_TOTAL, DONE,
from ray.tune.logger import Logger
from ray.tune.util import pin_in_object_store, get_pinned_object
from ray.tune.experiment import Experiment
from ray.tune.trial import Trial, Resources, ExportFormat
from ray.tune.trial import (Trial, ExportFormat, Resources, resources_to_json,
json_to_resources)
from ray.tune.trial_runner import TrialRunner
from ray.tune.suggest import grid_search, BasicVariantGenerator
from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm,
@@ -736,6 +737,28 @@ class RunExperimentTest(unittest.TestCase):
for trial in trials:
self.assertEqual(trial.status, Trial.TERMINATED)
def testCustomResources(self):
ray.shutdown()
ray.init(resources={"hi": 3})
class train(Trainable):
def _train(self):
return {"timesteps_this_iter": 1, "done": True}
trials = run_experiments({
"foo": {
"run": train,
"resources_per_trial": {
"cpu": 1,
"custom_resources": {
"hi": 2
}
}
}
})
for trial in trials:
self.assertEqual(trial.status, Trial.TERMINATED)
def testCustomLogger(self):
class CustomLogger(Logger):
def on_result(self, result):
@@ -1083,6 +1106,62 @@ class TrialRunnerTest(unittest.TestCase):
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
def testCustomResources(self):
ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
runner = TrialRunner(BasicVariantGenerator())
kwargs = {
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(cpu=1, gpu=0, custom_resources={"a": 2}),
}
trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
for t in trials:
runner.add_trial(t)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[1].status, Trial.PENDING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
def testExtraCustomResources(self):
ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
runner = TrialRunner(BasicVariantGenerator())
kwargs = {
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(
cpu=1, gpu=0, extra_custom_resources={"a": 2}),
}
trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
for t in trials:
runner.add_trial(t)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[1].status, Trial.PENDING)
runner.step()
self.assertTrue(sum(t.status == Trial.RUNNING for t in trials) < 2)
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
def testCustomResources2(self):
ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
runner = TrialRunner(BasicVariantGenerator())
resource1 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 2})
self.assertTrue(runner.has_resources(resource1))
resource2 = Resources(cpu=1, gpu=0, custom_resources={"a": 2})
self.assertTrue(runner.has_resources(resource2))
resource3 = Resources(cpu=1, gpu=0, custom_resources={"a": 3})
self.assertFalse(runner.has_resources(resource3))
resource4 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 3})
self.assertFalse(runner.has_resources(resource4))
def testFractionalGpus(self):
ray.init(num_cpus=4, num_gpus=1)
runner = TrialRunner(BasicVariantGenerator())
@@ -1292,6 +1371,7 @@ class TrialRunnerTest(unittest.TestCase):
resource_mock.return_value = {"CPU": 1, "GPU": 1}
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
@@ -1878,5 +1958,61 @@ class SearchAlgorithmTest(unittest.TestCase):
self.assertTrue("d=4" in trial.experiment_tag)
class ResourcesTest(unittest.TestCase):
def testSubtraction(self):
resource_1 = Resources(
1,
0,
0,
1,
custom_resources={
"a": 1,
"b": 2
},
extra_custom_resources={
"a": 1,
"b": 1
})
resource_2 = Resources(
1,
0,
0,
1,
custom_resources={
"a": 1,
"b": 2
},
extra_custom_resources={
"a": 1,
"b": 1
})
new_res = Resources.subtract(resource_1, resource_2)
self.assertTrue(new_res.cpu == 0)
self.assertTrue(new_res.gpu == 0)
self.assertTrue(new_res.extra_cpu == 0)
self.assertTrue(new_res.extra_gpu == 0)
self.assertTrue(all(k == 0 for k in new_res.custom_resources.values()))
self.assertTrue(
all(k == 0 for k in new_res.extra_custom_resources.values()))
def testDifferentResources(self):
resource_1 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
resource_2 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2})
new_res = Resources.subtract(resource_1, resource_2)
assert "c" in new_res.custom_resources
assert "b" in new_res.custom_resources
self.assertTrue(new_res.cpu == 0)
self.assertTrue(new_res.gpu == 0)
self.assertTrue(new_res.extra_cpu == 0)
self.assertTrue(new_res.extra_gpu == 0)
self.assertTrue(new_res.get("a") == 0)
def testSerialization(self):
original = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
jsoned = resources_to_json(original)
new_resource = json_to_resources(jsoned)
self.assertEquals(original, new_resource)
if __name__ == "__main__":
unittest.main(verbosity=2)
+82 -14
View File
@@ -11,10 +11,10 @@ import json
import time
import tempfile
import os
from numbers import Number
# For compatibility under py2 to consider unicode as str
from six import string_types
from numbers import Number
import ray
from ray.tune import TuneError
@@ -38,11 +38,12 @@ def date_str():
class Resources(
namedtuple("Resources", ["cpu", "gpu", "extra_cpu", "extra_gpu"])):
namedtuple("Resources", [
"cpu", "gpu", "extra_cpu", "extra_gpu", "custom_resources",
"extra_custom_resources"
])):
"""Ray resources required to schedule a trial.
TODO: Custom resources.
Attributes:
cpu (float): Number of CPUs to allocate to the trial.
gpu (float): Number of GPUs to allocate to the trial.
@@ -50,21 +51,51 @@ class Resources(
launch additional Ray actors that use CPUs.
extra_gpu (float): Extra GPUs to reserve in case the trial needs to
launch additional Ray actors that use GPUs.
custom_resources (dict): Mapping of resource to quantity to allocate
to the trial.
extra_custom_resources (dict): Extra custom resources to reserve in
case the trial needs to launch additional Ray actors that use
any of these custom resources.
"""
__slots__ = ()
def __new__(cls, cpu, gpu, extra_cpu=0, extra_gpu=0):
for entry in [cpu, gpu, extra_cpu, extra_gpu]:
def __new__(cls,
cpu,
gpu,
extra_cpu=0,
extra_gpu=0,
custom_resources=None,
extra_custom_resources=None):
custom_resources = custom_resources or {}
extra_custom_resources = extra_custom_resources or {}
leftovers = set(custom_resources) ^ set(extra_custom_resources)
for value in leftovers:
custom_resources.setdefault(value, 0)
extra_custom_resources.setdefault(value, 0)
all_values = [cpu, gpu, extra_cpu, extra_gpu]
all_values += list(custom_resources.values())
all_values += list(extra_custom_resources.values())
assert len(custom_resources) == len(extra_custom_resources)
for entry in all_values:
assert isinstance(entry, Number), "Improper resource value."
assert entry >= 0, "Resource cannot be negative."
return super(Resources, cls).__new__(cls, cpu, gpu, extra_cpu,
extra_gpu)
return super(Resources,
cls).__new__(cls, cpu, gpu, extra_cpu, extra_gpu,
custom_resources, extra_custom_resources)
def summary_string(self):
return "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu,
self.gpu + self.extra_gpu)
summary = "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu,
self.gpu + self.extra_gpu)
custom_summary = ", ".join([
"{} {}".format(self.get_res_total(res), res)
for res in self.custom_resources
])
if custom_summary:
summary += " ({})".format(custom_summary)
return summary
def cpu_total(self):
return self.cpu + self.extra_cpu
@@ -72,6 +103,40 @@ class Resources(
def gpu_total(self):
return self.gpu + self.extra_gpu
def get_res_total(self, key):
return self.custom_resources.get(
key, 0) + self.extra_custom_resources.get(key, 0)
def get(self, key):
return self.custom_resources.get(key, 0)
def is_nonnegative(self):
all_values = [self.cpu, self.gpu, self.extra_cpu, self.extra_gpu]
all_values += list(self.custom_resources.values())
all_values += list(self.extra_custom_resources.values())
return all(v >= 0 for v in all_values)
@classmethod
def subtract(cls, original, to_remove):
cpu = original.cpu - to_remove.cpu
gpu = original.gpu - to_remove.gpu
extra_cpu = original.extra_cpu - to_remove.extra_cpu
extra_gpu = original.extra_gpu - to_remove.extra_gpu
all_resources = set(original.custom_resources).union(
set(to_remove.custom_resources))
new_custom_res = {
k: original.custom_resources.get(k, 0) -
to_remove.custom_resources.get(k, 0)
for k in all_resources
}
extra_custom_res = {
k: original.extra_custom_resources.get(k, 0) -
to_remove.extra_custom_resources.get(k, 0)
for k in all_resources
}
return Resources(cpu, gpu, extra_cpu, extra_gpu, new_custom_res,
extra_custom_res)
def json_to_resources(data):
if data is None or data == "null":
@@ -84,12 +149,13 @@ def json_to_resources(data):
"The field `{}` is no longer supported. Use `extra_cpu` "
"or `extra_gpu` instead.".format(k))
if k not in Resources._fields:
raise TuneError(
"Unknown resource type {}, must be one of {}".format(
raise ValueError(
"Unknown resource field {}, must be one of {}".format(
k, Resources._fields))
return Resources(
data.get("cpu", 1), data.get("gpu", 0), data.get("extra_cpu", 0),
data.get("extra_gpu", 0))
data.get("extra_gpu", 0), data.get("custom_resources"),
data.get("extra_custom_resources"))
def resources_to_json(resources):
@@ -100,6 +166,8 @@ def resources_to_json(resources):
"gpu": resources.gpu,
"extra_cpu": resources.extra_cpu,
"extra_gpu": resources.extra_gpu,
"custom_resources": resources.custom_resources.copy(),
"extra_custom_resources": resources.extra_custom_resources.copy()
}
+1 -1
View File
@@ -159,7 +159,7 @@ def run_experiments(experiments,
metadata_checkpoint_dir=checkpoint_dir,
launch_web_server=with_server,
server_port=server_port,
verbose=int(verbose > 1),
verbose=bool(verbose > 1),
queue_trials=queue_trials,
trial_executor=trial_executor)