From 52e5c9b22d93ec5b73567f1ac88b9b0152eb5fbf Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 13 Oct 2019 20:31:42 -0700 Subject: [PATCH] [tune] CPU-Only Head Node support (#5900) * trialqueue * add tests --- python/ray/tune/ray_trial_executor.py | 21 +++--- python/ray/tune/tests/test_cluster.py | 54 +++++++++++++++ .../ray/tune/tests/test_ray_trial_executor.py | 67 +++++++++++++++++++ python/ray/tune/tests/test_trial_runner.py | 3 - 4 files changed, 129 insertions(+), 16 deletions(-) diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 5e8c1f42a..b3ce9dd34 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -45,6 +45,9 @@ class RayTrialExecutor(TrialExecutor): ray_auto_init=False, refresh_period=RESOURCE_REFRESH_PERIOD): super(RayTrialExecutor, self).__init__(queue_trials) + # Check for if we are launching a trial without resources in kick off + # autoscaler. + self._trial_queued = False self._running = {} # Since trial resume after paused should not run # trial.train.remote(), thus no more new remote object id generated. @@ -454,22 +457,14 @@ class RayTrialExecutor(TrialExecutor): for res in resources.custom_resources)) if have_space: + # The assumption right now is that we block all trials if one + # trial is queued. + self._trial_queued = False return True - can_overcommit = self._queue_trials - - if ((resources.cpu_total() > 0 and currently_available.cpu <= 0) - or (resources.gpu_total() > 0 and currently_available.gpu <= 0) - or - (resources.memory_total() > 0 and currently_available.memory <= 0) - or (resources.object_store_memory_total() > 0 - and currently_available.object_store_memory <= 0) or any( - (resources.get_res_total(res_name) > 0 - and currently_available.get(res_name) <= 0) - for res_name in resources.custom_resources)): - can_overcommit = False # requested resource is already saturated - + can_overcommit = self._queue_trials and not self._trial_queued if can_overcommit: + self._trial_queued = True logger.warning( "Allowing trial to start even though the " "cluster does not have enough free resources. Trial actors " diff --git a/python/ray/tune/tests/test_cluster.py b/python/ray/tune/tests/test_cluster.py index ae287cfda..a6ff605d9 100644 --- a/python/ray/tune/tests/test_cluster.py +++ b/python/ray/tune/tests/test_cluster.py @@ -16,8 +16,10 @@ from ray.rllib import _register_all from ray.tests.cluster_utils import Cluster from ray.tests.utils import run_string_as_driver_nonblocking from ray.tune.error import TuneError +from ray.tune.ray_trial_executor import RayTrialExecutor from ray.tune.experiment import Experiment from ray.tune.trial import Trial +from ray.tune.resources import Resources from ray.tune.trial_runner import TrialRunner from ray.tune.suggest import BasicVariantGenerator @@ -156,6 +158,58 @@ def test_remove_node_before_result(start_connected_emptyhead_cluster): runner.step() +def test_queue_trials(start_connected_emptyhead_cluster): + """Tests explicit oversubscription for autoscaling. + + Tune oversubscribes a trial when `queue_trials=True`, but + does not block other trials from running. + """ + cluster = start_connected_emptyhead_cluster + runner = TrialRunner() + + def create_trial(cpu, gpu=0): + kwargs = { + "resources": Resources(cpu=cpu, gpu=gpu), + "stopping_criterion": { + "training_iteration": 3 + } + } + return Trial("__fake", **kwargs) + + runner.add_trial(create_trial(cpu=1)) + with pytest.raises(TuneError): + runner.step() # run 1 + + del runner + + executor = RayTrialExecutor(queue_trials=True) + runner = TrialRunner(trial_executor=executor) + cluster.add_node(num_cpus=2) + cluster.wait_for_nodes() + + cpu_only = create_trial(cpu=1) + runner.add_trial(cpu_only) + runner.step() # add cpu_only trial + + gpu_trial = create_trial(cpu=1, gpu=1) + runner.add_trial(gpu_trial) + runner.step() # queue gpu_trial + + # This tests that the cpu_only trial should bypass the queued trial. + for i in range(3): + runner.step() + assert cpu_only.status == Trial.TERMINATED + assert gpu_trial.status == Trial.RUNNING + + # Scale up + cluster.add_node(num_cpus=1, num_gpus=1) + cluster.wait_for_nodes() + + for i in range(3): + runner.step() + assert gpu_trial.status == Trial.TERMINATED + + def test_trial_migration(start_connected_emptyhead_cluster): """Removing a node while cluster has space should migrate trial. diff --git a/python/ray/tune/tests/test_ray_trial_executor.py b/python/ray/tune/tests/test_ray_trial_executor.py index 1acc5d3ff..12e49f505 100644 --- a/python/ray/tune/tests/test_ray_trial_executor.py +++ b/python/ray/tune/tests/test_ray_trial_executor.py @@ -3,6 +3,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import unittest import ray @@ -13,6 +14,7 @@ from ray.tune.registry import _global_registry, TRAINABLE_CLASS from ray.tune.suggest import BasicVariantGenerator from ray.tune.trial import Trial, Checkpoint from ray.tune.resources import Resources +from ray.tests.cluster_utils import Cluster class RayTrialExecutorTest(unittest.TestCase): @@ -112,6 +114,71 @@ class RayTrialExecutorTest(unittest.TestCase): return suggester.next_trials() +class RayExecutorQueueTest(unittest.TestCase): + def setUp(self): + self.trial_executor = RayTrialExecutor( + queue_trials=True, refresh_period=0) + self.cluster = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "num_cpus": 1, + "_internal_config": json.dumps({ + "num_heartbeats_timeout": 10 + }) + }) + # Pytest doesn't play nicely with imports + _register_all() + + def tearDown(self): + ray.shutdown() + self.cluster.shutdown() + _register_all() # re-register the evicted objects + + def testQueueTrial(self): + """Tests that reset handles NotImplemented properly.""" + + def create_trial(cpu, gpu=0): + return Trial("__fake", resources=Resources(cpu=cpu, gpu=gpu)) + + cpu_only = create_trial(1, 0) + self.assertTrue(self.trial_executor.has_resources(cpu_only.resources)) + self.trial_executor.start_trial(cpu_only) + + gpu_only = create_trial(0, 1) + self.assertTrue(self.trial_executor.has_resources(gpu_only.resources)) + + def testHeadBlocking(self): + def create_trial(cpu, gpu=0): + return Trial("__fake", resources=Resources(cpu=cpu, gpu=gpu)) + + gpu_trial = create_trial(1, 1) + self.assertTrue(self.trial_executor.has_resources(gpu_trial.resources)) + self.trial_executor.start_trial(gpu_trial) + + # TODO(rliaw): This behavior is probably undesirable, but right now + # trials with different resource requirements is not often used. + cpu_only_trial = create_trial(1, 0) + self.assertFalse( + self.trial_executor.has_resources(cpu_only_trial.resources)) + + self.cluster.add_node(num_cpus=1, num_gpus=1) + self.cluster.wait_for_nodes() + + self.assertTrue( + self.trial_executor.has_resources(cpu_only_trial.resources)) + self.trial_executor.start_trial(cpu_only_trial) + + cpu_only_trial2 = create_trial(1, 0) + self.assertTrue( + self.trial_executor.has_resources(cpu_only_trial2.resources)) + self.trial_executor.start_trial(cpu_only_trial2) + + cpu_only_trial3 = create_trial(1, 0) + self.assertFalse( + self.trial_executor.has_resources(cpu_only_trial3.resources)) + + class LocalModeExecutorTest(RayTrialExecutorTest): def setUp(self): self.trial_executor = RayTrialExecutor(queue_trials=False) diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index 2d62db3bb..64c5fbea1 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -270,9 +270,6 @@ class TrainableFunctionApiTest(unittest.TestCase): self.assertEqual(f(1, 0, True).status, Trial.TERMINATED) self.assertEqual(f(1, 0, True).status, Trial.TERMINATED) - # Infeasible even with queueing enabled (no gpus) - self.assertRaises(TuneError, lambda: f(1, 1, True)) - # Too large resource request self.assertRaises(TuneError, lambda: f(100, 100, False)) self.assertRaises(TuneError, lambda: f(0, 100, False))