[autoscaler] resource demand scheduler get_nodes_to_launch should return Dict not List (#11118)

This commit is contained in:
Ameer Haj Ali
2020-09-29 15:32:56 -07:00
committed by GitHub
parent b8f344f695
commit f17ad5d060
3 changed files with 52 additions and 43 deletions
+1 -1
View File
@@ -216,7 +216,7 @@ class StandardAutoscaler:
resource_demand_vector,
self.load_metrics.get_resource_utilization()))
# TODO(ekl) also enforce max launch concurrency here?
for node_type, count in to_launch:
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
num_pending = self.pending_launches.value
@@ -11,7 +11,7 @@ import copy
import numpy as np
import logging
import collections
from typing import List, Dict, Tuple
from typing import List, Dict
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED
@@ -39,11 +39,10 @@ class ResourceDemandScheduler:
self.node_types = node_types
self.max_workers = max_workers
def get_nodes_to_launch(self, nodes: List[NodeID],
pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict],
usage_by_ip: Dict[str, ResourceDict]
) -> List[Tuple[NodeType, int]]:
def get_nodes_to_launch(
self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict],
usage_by_ip: Dict[str, ResourceDict]) -> Dict[NodeType, int]:
"""Given resource demands, return node types to add to the cluster.
This method:
@@ -68,7 +67,7 @@ class ResourceDemandScheduler:
_add_min_workers_nodes(
node_resources, node_type_counts, self.node_types)
nodes_to_add_based_on_demand = []
nodes_to_add_based_on_demand = {}
if resource_demands is not None:
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
@@ -82,8 +81,13 @@ class ResourceDemandScheduler:
# Merge nodes to add based on demand and nodes to add based on
# min_workers constraint. We add them because nodes to add based on
# demand was calculated after the min_workers constraint was respected.
total_nodes_to_add = nodes_to_add_based_on_demand + \
min_workers_nodes_to_add
total_nodes_to_add = {}
for node_type in self.node_types:
nodes_to_add = nodes_to_add_based_on_demand.get(node_type, 0) + \
min_workers_nodes_to_add.get(node_type, 0)
if nodes_to_add > 0:
total_nodes_to_add[node_type] = nodes_to_add
logger.info("Node requests: {}".format(total_nodes_to_add))
return total_nodes_to_add
@@ -161,7 +165,7 @@ def _add_min_workers_nodes(
node_resources: List[ResourceDict],
node_type_counts: Dict[NodeType, int],
node_types: Dict[NodeType, NodeTypeConfigDict],
) -> (List[ResourceDict], Dict[NodeType, int], List[Tuple[NodeType, int]]):
) -> (List[ResourceDict], Dict[NodeType, int], Dict[NodeType, int]):
"""Updates resource demands to respect the min_workers constraint.
Args:
@@ -187,13 +191,12 @@ def _add_min_workers_nodes(
node_resources.extend(
[available] * total_nodes_to_add_dict[node_type])
return node_resources, node_type_counts, list(
total_nodes_to_add_dict.items())
return node_resources, node_type_counts, total_nodes_to_add_dict
def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
existing_nodes: Dict[NodeType, int], max_to_add: int,
resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]:
resources: List[ResourceDict]) -> Dict[NodeType, int]:
"""Determine nodes to add given resource demands and constraints.
Args:
@@ -204,7 +207,7 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
resources: resource demands to fulfill.
Returns:
List of nodes types and count to add.
Dict of count to add for each node type.
"""
nodes_to_add = collections.defaultdict(int)
allocated_resources = []
@@ -234,7 +237,7 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
assert len(residual) < len(resources), (resources, residual)
resources = residual
return list(nodes_to_add.items())
return nodes_to_add
def _utilization_score(node_resources: ResourceDict,
@@ -105,34 +105,34 @@ def test_bin_pack():
def test_get_nodes_packing_heuristic():
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \
[("p2.8xlarge", 1)]
{"p2.8xlarge": 1}
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \
[("p2.8xlarge", 1)]
{"p2.8xlarge": 1}
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \
[("p2.xlarge", 4)]
{"p2.xlarge": 4}
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \
== [("p2.8xlarge", 3)]
== {"p2.8xlarge": 3}
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \
== []
== {}
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \
[("m4.16xlarge", 3)]
{"m4.16xlarge": 3}
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \
== [("m4.16xlarge", 1), ("m4.large", 1)]
== {"m4.16xlarge": 1, "m4.large": 1}
assert get_nodes_for(
TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 9}, {"CPU": 9}]) == \
[("m4.16xlarge", 1), ("m4.4xlarge", 2)]
{"m4.16xlarge": 1, "m4.4xlarge": 2}
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \
[("m4.16xlarge", 1), ("m4.4xlarge", 1)]
{"m4.16xlarge": 1, "m4.4xlarge": 1}
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \
[("m4.16xlarge", 1), ("m4.4xlarge", 1)]
{"m4.16xlarge": 1, "m4.4xlarge": 1}
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \
[("m4.16xlarge", 1), ("m4.4xlarge", 2), ("m4.large", 2)]
{"m4.16xlarge": 1, "m4.4xlarge": 2, "m4.large": 2}
assert get_nodes_for(
TYPES_A, {}, 9999, [{"GPU": 1}] + ([{"CPU": 1}] * 64)) == \
[("m4.16xlarge", 1), ("p2.xlarge", 1)]
{"m4.16xlarge": 1, "p2.xlarge": 1}
assert get_nodes_for(
TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \
[("m4.16xlarge", 1), ("p2.8xlarge", 1)]
{"m4.16xlarge": 1, "p2.8xlarge": 1}
def test_get_nodes_respects_max_limit():
@@ -151,19 +151,25 @@ def test_get_nodes_respects_max_limit():
},
}
assert get_nodes_for(types, {}, 2, [{"CPU": 1}] * 10) == \
[("m4.large", 2)]
{"m4.large": 2}
assert get_nodes_for(types, {"m4.large": 9999}, 9999, [{
"CPU": 1
}] * 10) == []
}] * 10) == {}
assert get_nodes_for(types, {"m4.large": 0}, 9999, [{
"CPU": 1
}] * 10) == [("m4.large", 5)]
}] * 10) == {
"m4.large": 5
}
assert get_nodes_for(types, {"m4.large": 7}, 4, [{
"CPU": 1
}] * 10) == [("m4.large", 3)]
}] * 10) == {
"m4.large": 3
}
assert get_nodes_for(types, {"m4.large": 7}, 2, [{
"CPU": 1
}] * 10) == [("m4.large", 2)]
}] * 10) == {
"m4.large": 2
}
def test_add_min_workers_nodes():
@@ -194,19 +200,19 @@ def test_add_min_workers_nodes():
{},
types) == \
([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999},
[("m2.large", 50), ("gpu", 99999)])
{"m2.large": 50, "gpu": 99999})
assert _add_min_workers_nodes([{"CPU": 2}]*5,
{"m2.large": 5},
types) == \
([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999},
[("m2.large", 45), ("gpu", 99999)])
{"m2.large": 45, "gpu": 99999})
assert _add_min_workers_nodes([{"CPU": 2}]*60,
{"m2.large": 60},
types) == \
([{"CPU": 2}]*60+[{"GPU": 1}]*99999, {"m2.large": 60, "gpu": 99999},
[("gpu", 99999)])
{"gpu": 99999})
assert _add_min_workers_nodes([{
"CPU": 2
@@ -222,7 +228,7 @@ def test_add_min_workers_nodes():
}] * 99999, {
"m2.large": 50,
"gpu": 99999
}, [])
}, {})
def test_get_nodes_to_launch_with_min_workers():
@@ -241,7 +247,7 @@ def test_get_nodes_to_launch_with_min_workers():
to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{
"GPU": 8
}], utilizations)
assert to_launch == [("p2.8xlarge", 1)]
assert to_launch == {"p2.8xlarge": 1}
def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
@@ -263,7 +269,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}]
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations)
assert to_launch == [("p2.xlarge", 1)]
assert to_launch == {"p2.xlarge": 1}
# 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand.
# 2 p2.8xlarge are running/pending. So we need 1 more p2.8xlarge only to
@@ -273,7 +279,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations)
# Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)]
assert to_launch == [("p2.8xlarge", 1)]
assert to_launch == {"p2.8xlarge": 1}
def test_get_nodes_to_launch_limits():
@@ -290,7 +296,7 @@ def test_get_nodes_to_launch_limits():
to_launch = scheduler.get_nodes_to_launch(nodes, {"p2.8xlarge": 1}, [{
"GPU": 8
}] * 2, utilizations)
assert to_launch == []
assert to_launch == {}
def test_calculate_node_resources():
@@ -311,7 +317,7 @@ def test_calculate_node_resources():
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations)
assert to_launch == [("p2.8xlarge", 1)]
assert to_launch == {"p2.8xlarge": 1}
class LoadMetricsTest(unittest.TestCase):