mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 04:03:14 +08:00
[autoscaler] Fixes max_workers bug. (#13008)
This commit is contained in:
@@ -192,7 +192,8 @@ class ResourceDemandScheduler:
|
||||
# Add 1 to account for the head node.
|
||||
max_to_add = self.max_workers + 1 - sum(node_type_counts.values())
|
||||
nodes_to_add_based_on_demand = get_nodes_for(
|
||||
self.node_types, node_type_counts, max_to_add, unfulfilled)
|
||||
self.node_types, node_type_counts, self.head_node_type, max_to_add,
|
||||
unfulfilled)
|
||||
# Merge nodes to add based on demand and nodes to add based on
|
||||
# min_workers constraint. We add them because nodes to add based on
|
||||
# demand was calculated after the min_workers constraint was respected.
|
||||
@@ -447,6 +448,7 @@ class ResourceDemandScheduler:
|
||||
to_launch = get_nodes_for(
|
||||
self.node_types,
|
||||
node_type_counts,
|
||||
self.head_node_type,
|
||||
max_to_add,
|
||||
unfulfilled,
|
||||
strict_spread=True)
|
||||
@@ -544,7 +546,7 @@ def _add_min_workers_nodes(
|
||||
max_node_resources, ensure_min_cluster_size)
|
||||
# Get the nodes to meet the unfulfilled.
|
||||
nodes_to_add_request_resources = get_nodes_for(
|
||||
node_types, node_type_counts, max_to_add,
|
||||
node_types, node_type_counts, head_node_type, max_to_add,
|
||||
resource_requests_unfulfilled)
|
||||
# Update the resources, counts and total nodes to add.
|
||||
for node_type in nodes_to_add_request_resources:
|
||||
@@ -565,6 +567,7 @@ def _add_min_workers_nodes(
|
||||
|
||||
def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
|
||||
existing_nodes: Dict[NodeType, int],
|
||||
head_node_type: NodeType,
|
||||
max_to_add: int,
|
||||
resources: List[ResourceDict],
|
||||
strict_spread: bool = False) -> Dict[NodeType, int]:
|
||||
@@ -588,9 +591,13 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
|
||||
while resources and sum(nodes_to_add.values()) < max_to_add:
|
||||
utilization_scores = []
|
||||
for node_type in node_types:
|
||||
max_workers_of_node_type = node_types[node_type].get(
|
||||
"max_workers", 0)
|
||||
if head_node_type == node_type:
|
||||
# Add 1 to account for head node.
|
||||
max_workers_of_node_type = max_workers_of_node_type + 1
|
||||
if (existing_nodes.get(node_type, 0) + nodes_to_add.get(
|
||||
node_type, 0) >= node_types[node_type].get(
|
||||
"max_workers", 0)):
|
||||
node_type, 0) >= max_workers_of_node_type):
|
||||
continue
|
||||
node_resources = node_types[node_type]["resources"]
|
||||
if strict_spread:
|
||||
|
||||
@@ -143,43 +143,100 @@ def test_bin_pack():
|
||||
|
||||
|
||||
def test_get_nodes_packing_heuristic():
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \
|
||||
{"p2.8xlarge": 1}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \
|
||||
{"p2.8xlarge": 1}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \
|
||||
{"p2.xlarge": 4}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \
|
||||
== {"p2.8xlarge": 3}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \
|
||||
== {}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \
|
||||
{"m4.16xlarge": 3}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \
|
||||
== {"m4.16xlarge": 1, "m4.large": 1}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"GPU": 8
|
||||
}]) == {
|
||||
"p2.8xlarge": 1
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"GPU": 1
|
||||
}] * 6) == {
|
||||
"p2.8xlarge": 1
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"GPU": 1
|
||||
}] * 4) == {
|
||||
"p2.xlarge": 4
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"CPU": 32,
|
||||
"GPU": 1
|
||||
}] * 3) == {
|
||||
"p2.8xlarge": 3
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"CPU": 64,
|
||||
"GPU": 1
|
||||
}] * 3) == {}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"CPU": 64
|
||||
}] * 3) == {
|
||||
"m4.16xlarge": 3
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"CPU": 64
|
||||
}, {
|
||||
"CPU": 1
|
||||
}]) == {
|
||||
"m4.16xlarge": 1,
|
||||
"m4.large": 1
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"CPU": 64
|
||||
}, {
|
||||
"CPU": 9
|
||||
}, {
|
||||
"CPU": 9
|
||||
}]) == {
|
||||
"m4.16xlarge": 1,
|
||||
"m4.4xlarge": 2
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"CPU": 16
|
||||
}] * 5) == {
|
||||
"m4.16xlarge": 1,
|
||||
"m4.4xlarge": 1
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"CPU": 8
|
||||
}] * 10) == {
|
||||
"m4.16xlarge": 1,
|
||||
"m4.4xlarge": 1
|
||||
}
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"CPU": 1
|
||||
}] * 100) == {
|
||||
"m4.16xlarge": 1,
|
||||
"m4.4xlarge": 2,
|
||||
"m4.large": 2
|
||||
}
|
||||
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{
|
||||
"GPU": 1
|
||||
}] + ([{
|
||||
"CPU": 1
|
||||
}] * 64)) == {
|
||||
"m4.16xlarge": 1,
|
||||
"p2.xlarge": 1
|
||||
}
|
||||
|
||||
assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, ([{
|
||||
"GPU": 1
|
||||
}] * 8) + ([{
|
||||
"CPU": 1
|
||||
}] * 64)) == {
|
||||
"m4.16xlarge": 1,
|
||||
"p2.8xlarge": 1
|
||||
}
|
||||
|
||||
assert get_nodes_for(
|
||||
TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 9}, {"CPU": 9}]) == \
|
||||
{"m4.16xlarge": 1, "m4.4xlarge": 2}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \
|
||||
{"m4.16xlarge": 1, "m4.4xlarge": 1}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \
|
||||
{"m4.16xlarge": 1, "m4.4xlarge": 1}
|
||||
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \
|
||||
{"m4.16xlarge": 1, "m4.4xlarge": 2, "m4.large": 2}
|
||||
assert get_nodes_for(
|
||||
TYPES_A, {}, 9999, [{"GPU": 1}] + ([{"CPU": 1}] * 64)) == \
|
||||
{"m4.16xlarge": 1, "p2.xlarge": 1}
|
||||
assert get_nodes_for(
|
||||
TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \
|
||||
{"m4.16xlarge": 1, "p2.8xlarge": 1}
|
||||
assert get_nodes_for(
|
||||
TYPES_A, {}, 9999, [{
|
||||
TYPES_A, {}, "empty_node", 9999, [{
|
||||
"GPU": 1
|
||||
}] * 8, strict_spread=False) == {
|
||||
"p2.8xlarge": 1
|
||||
}
|
||||
assert get_nodes_for(
|
||||
TYPES_A, {}, 9999, [{
|
||||
TYPES_A, {}, "empty_node", 9999, [{
|
||||
"GPU": 1
|
||||
}] * 8, strict_spread=True) == {
|
||||
"p2.xlarge": 8
|
||||
@@ -201,22 +258,22 @@ def test_get_nodes_respects_max_limit():
|
||||
"max_workers": 99999,
|
||||
},
|
||||
}
|
||||
assert get_nodes_for(types, {}, 2, [{"CPU": 1}] * 10) == \
|
||||
assert get_nodes_for(types, {}, "empty_node", 2, [{"CPU": 1}] * 10) == \
|
||||
{"m4.large": 2}
|
||||
assert get_nodes_for(types, {"m4.large": 9999}, 9999, [{
|
||||
assert get_nodes_for(types, {"m4.large": 9999}, "empty_node", 9999, [{
|
||||
"CPU": 1
|
||||
}] * 10) == {}
|
||||
assert get_nodes_for(types, {"m4.large": 0}, 9999, [{
|
||||
assert get_nodes_for(types, {"m4.large": 0}, "empty_node", 9999, [{
|
||||
"CPU": 1
|
||||
}] * 10) == {
|
||||
"m4.large": 5
|
||||
}
|
||||
assert get_nodes_for(types, {"m4.large": 7}, 4, [{
|
||||
assert get_nodes_for(types, {"m4.large": 7}, "m4.large", 4, [{
|
||||
"CPU": 1
|
||||
}] * 10) == {
|
||||
"m4.large": 3
|
||||
"m4.large": 4
|
||||
}
|
||||
assert get_nodes_for(types, {"m4.large": 7}, 2, [{
|
||||
assert get_nodes_for(types, {"m4.large": 7}, "m4.large", 2, [{
|
||||
"CPU": 1
|
||||
}] * 10) == {
|
||||
"m4.large": 2
|
||||
@@ -1355,6 +1412,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(6)])
|
||||
self.provider.create_node({}, {
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
|
||||
TAG_RAY_USER_NODE_TYPE: "empty_node"
|
||||
@@ -1379,6 +1437,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.request_resources([{"CPU": 32}] * 4)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(5)
|
||||
|
||||
assert self.provider.mock_nodes[3].node_type == "m4.16xlarge"
|
||||
assert self.provider.mock_nodes[4].node_type == "m4.16xlarge"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user