[autoscaler] Also account for head node resources in multi node type autoscaling (#10230)

This commit is contained in:
Eric Liang
2020-08-24 10:26:22 -07:00
committed by GitHub
parent f051c2852e
commit 4761eacc3e
2 changed files with 51 additions and 9 deletions
+4 -4
View File
@@ -198,12 +198,12 @@ class StandardAutoscaler:
# First let the resource demand scheduler launch nodes, if enabled.
if self.resource_demand_scheduler and self.resource_demand_vector:
# TODO(ekl) include head node in the node list
instances = (self.resource_demand_scheduler.get_nodes_to_launch(
nodes, self.pending_launches.breakdown(),
to_launch = (self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
self.resource_demand_vector))
# TODO(ekl) also enforce max launch concurrency here?
for node_type, count in instances:
for node_type, count in to_launch:
self.launch_new_node(count, node_type=node_type)
# Launch additional nodes of the default type, if still needed.
@@ -12,17 +12,22 @@ from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler.load_metrics import LoadMetrics
from ray.autoscaler.node_provider import NODE_PROVIDERS
from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND
from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
get_bin_pack_residual, get_nodes_for
from time import sleep
TYPES_A = {
"m4.large": {
"empty_node": {
"node_config": {
"FooProperty": 42,
},
"resources": {},
"max_workers": 0,
},
"m4.large": {
"node_config": {},
"resources": {
"CPU": 2
},
@@ -63,7 +68,7 @@ TYPES_A = {
MULTI_WORKER_CLUSTER = dict(
SMALL_CLUSTER, **{
"available_node_types": TYPES_A,
"head_node_type": "m4.large",
"head_node_type": "empty_node",
"worker_default_node_type": "m4.large",
})
@@ -211,14 +216,14 @@ class AutoscalingTest(unittest.TestCase):
runner.assert_has_call("1.2.3.4", "init_cmd")
runner.assert_has_call("1.2.3.4", "head_setup_cmd")
runner.assert_has_call("1.2.3.4", "start_ray_head")
self.assertEqual(self.provider.mock_nodes[0].node_type, "m4.large")
self.assertEqual(self.provider.mock_nodes[0].node_type, "empty_node")
self.assertEqual(
self.provider.mock_nodes[0].node_config.get("FooProperty"), 42)
self.assertEqual(
self.provider.mock_nodes[0].node_config.get("TestProp"), 1)
self.assertEqual(
self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE),
"m4.large")
"empty_node")
def testScaleUpMinSanity(self):
config_path = self.write_config(MULTI_WORKER_CLUSTER)
@@ -236,6 +241,43 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2)
def testRequestBundlesAccountsForHeadNode(self):
config = MULTI_WORKER_CLUSTER.copy()
config["head_node_type"] = "p2.8xlarge"
config["min_workers"] = 0
config["max_workers"] = 50
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_KIND: "head"
}, 1)
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 1
# These requests fit on the head node.
autoscaler.update()
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(1)
assert len(self.provider.mock_nodes) == 1
autoscaler.request_resources([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(1)
# This request requires an additional worker node.
autoscaler.request_resources([{"GPU": 8}] * 2)
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
def testRequestBundles(self):
config = MULTI_WORKER_CLUSTER.copy()
config["min_workers"] = 0