From f17ad5d060fd167509f703deb02f282427fc56ad Mon Sep 17 00:00:00 2001 From: Ameer Haj Ali Date: Tue, 29 Sep 2020 15:32:56 -0700 Subject: [PATCH] [autoscaler] resource demand scheduler get_nodes_to_launch should return Dict not List (#11118) --- python/ray/autoscaler/_private/autoscaler.py | 2 +- .../_private/resource_demand_scheduler.py | 33 +++++----- .../tests/test_resource_demand_scheduler.py | 60 ++++++++++--------- 3 files changed, 52 insertions(+), 43 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 3bca70ffa..56996a2f4 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -216,7 +216,7 @@ class StandardAutoscaler: resource_demand_vector, self.load_metrics.get_resource_utilization())) # TODO(ekl) also enforce max launch concurrency here? - for node_type, count in to_launch: + for node_type, count in to_launch.items(): self.launch_new_node(count, node_type=node_type) num_pending = self.pending_launches.value diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 346a5192c..cd7839b40 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -11,7 +11,7 @@ import copy import numpy as np import logging import collections -from typing import List, Dict, Tuple +from typing import List, Dict from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED @@ -39,11 +39,10 @@ class ResourceDemandScheduler: self.node_types = node_types self.max_workers = max_workers - def get_nodes_to_launch(self, nodes: List[NodeID], - pending_nodes: Dict[NodeType, int], - resource_demands: List[ResourceDict], - usage_by_ip: Dict[str, ResourceDict] - ) -> List[Tuple[NodeType, int]]: + def get_nodes_to_launch( + self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int], + resource_demands: List[ResourceDict], + usage_by_ip: Dict[str, ResourceDict]) -> Dict[NodeType, int]: """Given resource demands, return node types to add to the cluster. This method: @@ -68,7 +67,7 @@ class ResourceDemandScheduler: _add_min_workers_nodes( node_resources, node_type_counts, self.node_types) - nodes_to_add_based_on_demand = [] + nodes_to_add_based_on_demand = {} if resource_demands is not None: logger.info("Cluster resources: {}".format(node_resources)) logger.info("Node counts: {}".format(node_type_counts)) @@ -82,8 +81,13 @@ class ResourceDemandScheduler: # Merge nodes to add based on demand and nodes to add based on # min_workers constraint. We add them because nodes to add based on # demand was calculated after the min_workers constraint was respected. - total_nodes_to_add = nodes_to_add_based_on_demand + \ - min_workers_nodes_to_add + total_nodes_to_add = {} + for node_type in self.node_types: + nodes_to_add = nodes_to_add_based_on_demand.get(node_type, 0) + \ + min_workers_nodes_to_add.get(node_type, 0) + if nodes_to_add > 0: + total_nodes_to_add[node_type] = nodes_to_add + logger.info("Node requests: {}".format(total_nodes_to_add)) return total_nodes_to_add @@ -161,7 +165,7 @@ def _add_min_workers_nodes( node_resources: List[ResourceDict], node_type_counts: Dict[NodeType, int], node_types: Dict[NodeType, NodeTypeConfigDict], -) -> (List[ResourceDict], Dict[NodeType, int], List[Tuple[NodeType, int]]): +) -> (List[ResourceDict], Dict[NodeType, int], Dict[NodeType, int]): """Updates resource demands to respect the min_workers constraint. Args: @@ -187,13 +191,12 @@ def _add_min_workers_nodes( node_resources.extend( [available] * total_nodes_to_add_dict[node_type]) - return node_resources, node_type_counts, list( - total_nodes_to_add_dict.items()) + return node_resources, node_type_counts, total_nodes_to_add_dict def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], existing_nodes: Dict[NodeType, int], max_to_add: int, - resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]: + resources: List[ResourceDict]) -> Dict[NodeType, int]: """Determine nodes to add given resource demands and constraints. Args: @@ -204,7 +207,7 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], resources: resource demands to fulfill. Returns: - List of nodes types and count to add. + Dict of count to add for each node type. """ nodes_to_add = collections.defaultdict(int) allocated_resources = [] @@ -234,7 +237,7 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], assert len(residual) < len(resources), (resources, residual) resources = residual - return list(nodes_to_add.items()) + return nodes_to_add def _utilization_score(node_resources: ResourceDict, diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 3555a169a..17569defd 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -105,34 +105,34 @@ def test_bin_pack(): def test_get_nodes_packing_heuristic(): assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \ - [("p2.8xlarge", 1)] + {"p2.8xlarge": 1} assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \ - [("p2.8xlarge", 1)] + {"p2.8xlarge": 1} assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \ - [("p2.xlarge", 4)] + {"p2.xlarge": 4} assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \ - == [("p2.8xlarge", 3)] + == {"p2.8xlarge": 3} assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \ - == [] + == {} assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \ - [("m4.16xlarge", 3)] + {"m4.16xlarge": 3} assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \ - == [("m4.16xlarge", 1), ("m4.large", 1)] + == {"m4.16xlarge": 1, "m4.large": 1} assert get_nodes_for( TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 9}, {"CPU": 9}]) == \ - [("m4.16xlarge", 1), ("m4.4xlarge", 2)] + {"m4.16xlarge": 1, "m4.4xlarge": 2} assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \ - [("m4.16xlarge", 1), ("m4.4xlarge", 1)] + {"m4.16xlarge": 1, "m4.4xlarge": 1} assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \ - [("m4.16xlarge", 1), ("m4.4xlarge", 1)] + {"m4.16xlarge": 1, "m4.4xlarge": 1} assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \ - [("m4.16xlarge", 1), ("m4.4xlarge", 2), ("m4.large", 2)] + {"m4.16xlarge": 1, "m4.4xlarge": 2, "m4.large": 2} assert get_nodes_for( TYPES_A, {}, 9999, [{"GPU": 1}] + ([{"CPU": 1}] * 64)) == \ - [("m4.16xlarge", 1), ("p2.xlarge", 1)] + {"m4.16xlarge": 1, "p2.xlarge": 1} assert get_nodes_for( TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \ - [("m4.16xlarge", 1), ("p2.8xlarge", 1)] + {"m4.16xlarge": 1, "p2.8xlarge": 1} def test_get_nodes_respects_max_limit(): @@ -151,19 +151,25 @@ def test_get_nodes_respects_max_limit(): }, } assert get_nodes_for(types, {}, 2, [{"CPU": 1}] * 10) == \ - [("m4.large", 2)] + {"m4.large": 2} assert get_nodes_for(types, {"m4.large": 9999}, 9999, [{ "CPU": 1 - }] * 10) == [] + }] * 10) == {} assert get_nodes_for(types, {"m4.large": 0}, 9999, [{ "CPU": 1 - }] * 10) == [("m4.large", 5)] + }] * 10) == { + "m4.large": 5 + } assert get_nodes_for(types, {"m4.large": 7}, 4, [{ "CPU": 1 - }] * 10) == [("m4.large", 3)] + }] * 10) == { + "m4.large": 3 + } assert get_nodes_for(types, {"m4.large": 7}, 2, [{ "CPU": 1 - }] * 10) == [("m4.large", 2)] + }] * 10) == { + "m4.large": 2 + } def test_add_min_workers_nodes(): @@ -194,19 +200,19 @@ def test_add_min_workers_nodes(): {}, types) == \ ([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999}, - [("m2.large", 50), ("gpu", 99999)]) + {"m2.large": 50, "gpu": 99999}) assert _add_min_workers_nodes([{"CPU": 2}]*5, {"m2.large": 5}, types) == \ ([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999}, - [("m2.large", 45), ("gpu", 99999)]) + {"m2.large": 45, "gpu": 99999}) assert _add_min_workers_nodes([{"CPU": 2}]*60, {"m2.large": 60}, types) == \ ([{"CPU": 2}]*60+[{"GPU": 1}]*99999, {"m2.large": 60, "gpu": 99999}, - [("gpu", 99999)]) + {"gpu": 99999}) assert _add_min_workers_nodes([{ "CPU": 2 @@ -222,7 +228,7 @@ def test_add_min_workers_nodes(): }] * 99999, { "m2.large": 50, "gpu": 99999 - }, []) + }, {}) def test_get_nodes_to_launch_with_min_workers(): @@ -241,7 +247,7 @@ def test_get_nodes_to_launch_with_min_workers(): to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{ "GPU": 8 }], utilizations) - assert to_launch == [("p2.8xlarge", 1)] + assert to_launch == {"p2.8xlarge": 1} def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): @@ -263,7 +269,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}] to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, utilizations) - assert to_launch == [("p2.xlarge", 1)] + assert to_launch == {"p2.xlarge": 1} # 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand. # 2 p2.8xlarge are running/pending. So we need 1 more p2.8xlarge only to @@ -273,7 +279,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, utilizations) # Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)] - assert to_launch == [("p2.8xlarge", 1)] + assert to_launch == {"p2.8xlarge": 1} def test_get_nodes_to_launch_limits(): @@ -290,7 +296,7 @@ def test_get_nodes_to_launch_limits(): to_launch = scheduler.get_nodes_to_launch(nodes, {"p2.8xlarge": 1}, [{ "GPU": 8 }] * 2, utilizations) - assert to_launch == [] + assert to_launch == {} def test_calculate_node_resources(): @@ -311,7 +317,7 @@ def test_calculate_node_resources(): to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, utilizations) - assert to_launch == [("p2.8xlarge", 1)] + assert to_launch == {"p2.8xlarge": 1} class LoadMetricsTest(unittest.TestCase):