mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 13:47:22 +08:00
@@ -45,6 +45,9 @@ class RayTrialExecutor(TrialExecutor):
|
||||
ray_auto_init=False,
|
||||
refresh_period=RESOURCE_REFRESH_PERIOD):
|
||||
super(RayTrialExecutor, self).__init__(queue_trials)
|
||||
# Check for if we are launching a trial without resources in kick off
|
||||
# autoscaler.
|
||||
self._trial_queued = False
|
||||
self._running = {}
|
||||
# Since trial resume after paused should not run
|
||||
# trial.train.remote(), thus no more new remote object id generated.
|
||||
@@ -454,22 +457,14 @@ class RayTrialExecutor(TrialExecutor):
|
||||
for res in resources.custom_resources))
|
||||
|
||||
if have_space:
|
||||
# The assumption right now is that we block all trials if one
|
||||
# trial is queued.
|
||||
self._trial_queued = False
|
||||
return True
|
||||
|
||||
can_overcommit = self._queue_trials
|
||||
|
||||
if ((resources.cpu_total() > 0 and currently_available.cpu <= 0)
|
||||
or (resources.gpu_total() > 0 and currently_available.gpu <= 0)
|
||||
or
|
||||
(resources.memory_total() > 0 and currently_available.memory <= 0)
|
||||
or (resources.object_store_memory_total() > 0
|
||||
and currently_available.object_store_memory <= 0) or any(
|
||||
(resources.get_res_total(res_name) > 0
|
||||
and currently_available.get(res_name) <= 0)
|
||||
for res_name in resources.custom_resources)):
|
||||
can_overcommit = False # requested resource is already saturated
|
||||
|
||||
can_overcommit = self._queue_trials and not self._trial_queued
|
||||
if can_overcommit:
|
||||
self._trial_queued = True
|
||||
logger.warning(
|
||||
"Allowing trial to start even though the "
|
||||
"cluster does not have enough free resources. Trial actors "
|
||||
|
||||
@@ -16,8 +16,10 @@ from ray.rllib import _register_all
|
||||
from ray.tests.cluster_utils import Cluster
|
||||
from ray.tests.utils import run_string_as_driver_nonblocking
|
||||
from ray.tune.error import TuneError
|
||||
from ray.tune.ray_trial_executor import RayTrialExecutor
|
||||
from ray.tune.experiment import Experiment
|
||||
from ray.tune.trial import Trial
|
||||
from ray.tune.resources import Resources
|
||||
from ray.tune.trial_runner import TrialRunner
|
||||
from ray.tune.suggest import BasicVariantGenerator
|
||||
|
||||
@@ -156,6 +158,58 @@ def test_remove_node_before_result(start_connected_emptyhead_cluster):
|
||||
runner.step()
|
||||
|
||||
|
||||
def test_queue_trials(start_connected_emptyhead_cluster):
|
||||
"""Tests explicit oversubscription for autoscaling.
|
||||
|
||||
Tune oversubscribes a trial when `queue_trials=True`, but
|
||||
does not block other trials from running.
|
||||
"""
|
||||
cluster = start_connected_emptyhead_cluster
|
||||
runner = TrialRunner()
|
||||
|
||||
def create_trial(cpu, gpu=0):
|
||||
kwargs = {
|
||||
"resources": Resources(cpu=cpu, gpu=gpu),
|
||||
"stopping_criterion": {
|
||||
"training_iteration": 3
|
||||
}
|
||||
}
|
||||
return Trial("__fake", **kwargs)
|
||||
|
||||
runner.add_trial(create_trial(cpu=1))
|
||||
with pytest.raises(TuneError):
|
||||
runner.step() # run 1
|
||||
|
||||
del runner
|
||||
|
||||
executor = RayTrialExecutor(queue_trials=True)
|
||||
runner = TrialRunner(trial_executor=executor)
|
||||
cluster.add_node(num_cpus=2)
|
||||
cluster.wait_for_nodes()
|
||||
|
||||
cpu_only = create_trial(cpu=1)
|
||||
runner.add_trial(cpu_only)
|
||||
runner.step() # add cpu_only trial
|
||||
|
||||
gpu_trial = create_trial(cpu=1, gpu=1)
|
||||
runner.add_trial(gpu_trial)
|
||||
runner.step() # queue gpu_trial
|
||||
|
||||
# This tests that the cpu_only trial should bypass the queued trial.
|
||||
for i in range(3):
|
||||
runner.step()
|
||||
assert cpu_only.status == Trial.TERMINATED
|
||||
assert gpu_trial.status == Trial.RUNNING
|
||||
|
||||
# Scale up
|
||||
cluster.add_node(num_cpus=1, num_gpus=1)
|
||||
cluster.wait_for_nodes()
|
||||
|
||||
for i in range(3):
|
||||
runner.step()
|
||||
assert gpu_trial.status == Trial.TERMINATED
|
||||
|
||||
|
||||
def test_trial_migration(start_connected_emptyhead_cluster):
|
||||
"""Removing a node while cluster has space should migrate trial.
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import json
|
||||
import unittest
|
||||
|
||||
import ray
|
||||
@@ -13,6 +14,7 @@ from ray.tune.registry import _global_registry, TRAINABLE_CLASS
|
||||
from ray.tune.suggest import BasicVariantGenerator
|
||||
from ray.tune.trial import Trial, Checkpoint
|
||||
from ray.tune.resources import Resources
|
||||
from ray.tests.cluster_utils import Cluster
|
||||
|
||||
|
||||
class RayTrialExecutorTest(unittest.TestCase):
|
||||
@@ -112,6 +114,71 @@ class RayTrialExecutorTest(unittest.TestCase):
|
||||
return suggester.next_trials()
|
||||
|
||||
|
||||
class RayExecutorQueueTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.trial_executor = RayTrialExecutor(
|
||||
queue_trials=True, refresh_period=0)
|
||||
self.cluster = Cluster(
|
||||
initialize_head=True,
|
||||
connect=True,
|
||||
head_node_args={
|
||||
"num_cpus": 1,
|
||||
"_internal_config": json.dumps({
|
||||
"num_heartbeats_timeout": 10
|
||||
})
|
||||
})
|
||||
# Pytest doesn't play nicely with imports
|
||||
_register_all()
|
||||
|
||||
def tearDown(self):
|
||||
ray.shutdown()
|
||||
self.cluster.shutdown()
|
||||
_register_all() # re-register the evicted objects
|
||||
|
||||
def testQueueTrial(self):
|
||||
"""Tests that reset handles NotImplemented properly."""
|
||||
|
||||
def create_trial(cpu, gpu=0):
|
||||
return Trial("__fake", resources=Resources(cpu=cpu, gpu=gpu))
|
||||
|
||||
cpu_only = create_trial(1, 0)
|
||||
self.assertTrue(self.trial_executor.has_resources(cpu_only.resources))
|
||||
self.trial_executor.start_trial(cpu_only)
|
||||
|
||||
gpu_only = create_trial(0, 1)
|
||||
self.assertTrue(self.trial_executor.has_resources(gpu_only.resources))
|
||||
|
||||
def testHeadBlocking(self):
|
||||
def create_trial(cpu, gpu=0):
|
||||
return Trial("__fake", resources=Resources(cpu=cpu, gpu=gpu))
|
||||
|
||||
gpu_trial = create_trial(1, 1)
|
||||
self.assertTrue(self.trial_executor.has_resources(gpu_trial.resources))
|
||||
self.trial_executor.start_trial(gpu_trial)
|
||||
|
||||
# TODO(rliaw): This behavior is probably undesirable, but right now
|
||||
# trials with different resource requirements is not often used.
|
||||
cpu_only_trial = create_trial(1, 0)
|
||||
self.assertFalse(
|
||||
self.trial_executor.has_resources(cpu_only_trial.resources))
|
||||
|
||||
self.cluster.add_node(num_cpus=1, num_gpus=1)
|
||||
self.cluster.wait_for_nodes()
|
||||
|
||||
self.assertTrue(
|
||||
self.trial_executor.has_resources(cpu_only_trial.resources))
|
||||
self.trial_executor.start_trial(cpu_only_trial)
|
||||
|
||||
cpu_only_trial2 = create_trial(1, 0)
|
||||
self.assertTrue(
|
||||
self.trial_executor.has_resources(cpu_only_trial2.resources))
|
||||
self.trial_executor.start_trial(cpu_only_trial2)
|
||||
|
||||
cpu_only_trial3 = create_trial(1, 0)
|
||||
self.assertFalse(
|
||||
self.trial_executor.has_resources(cpu_only_trial3.resources))
|
||||
|
||||
|
||||
class LocalModeExecutorTest(RayTrialExecutorTest):
|
||||
def setUp(self):
|
||||
self.trial_executor = RayTrialExecutor(queue_trials=False)
|
||||
|
||||
@@ -270,9 +270,6 @@ class TrainableFunctionApiTest(unittest.TestCase):
|
||||
self.assertEqual(f(1, 0, True).status, Trial.TERMINATED)
|
||||
self.assertEqual(f(1, 0, True).status, Trial.TERMINATED)
|
||||
|
||||
# Infeasible even with queueing enabled (no gpus)
|
||||
self.assertRaises(TuneError, lambda: f(1, 1, True))
|
||||
|
||||
# Too large resource request
|
||||
self.assertRaises(TuneError, lambda: f(100, 100, False))
|
||||
self.assertRaises(TuneError, lambda: f(0, 100, False))
|
||||
|
||||
Reference in New Issue
Block a user