diff --git a/doc/source/cluster/autoscaling.rst b/doc/source/cluster/autoscaling.rst index 0b80f0d8e..d8719712b 100644 --- a/doc/source/cluster/autoscaling.rst +++ b/doc/source/cluster/autoscaling.rst @@ -72,6 +72,7 @@ An example of configuring multiple node types is as follows `(full example) (List[ResourceDict], Dict[NodeType, int]): - """Returns node resource list and node type counts.""" + """Returns node resource list and node type counts. + + Counts the running nodes, pending nodes. + Args: + nodes: Existing nodes. + pending_nodes: Pending nodes. + Returns: + node_resources: a list of running + pending resources. + E.g., [{"CPU": 4}, {"GPU": 2}]. + node_type_counts: running + pending workers per node type. + """ node_resources = [] node_type_counts = collections.defaultdict(int) @@ -140,6 +157,40 @@ class ResourceDemandScheduler: return out +def _add_min_workers_nodes( + node_resources: List[ResourceDict], + node_type_counts: Dict[NodeType, int], + node_types: Dict[NodeType, NodeTypeConfigDict], +) -> (List[ResourceDict], Dict[NodeType, int], List[Tuple[NodeType, int]]): + """Updates resource demands to respect the min_workers constraint. + + Args: + node_resources: Resources of exisiting nodes already launched/pending. + node_type_counts: Counts of existing nodes already launched/pending. + node_types: Node types config. + + Returns: + node_resources: The updated node resources after adding min_workers + constraint per node type. + node_type_counts: The updated node counts after adding min_workers + constraint per node type. + total_nodes_to_add: The nodes to add to respect min_workers constraint. + """ + total_nodes_to_add_dict = {} + for node_type, config in node_types.items(): + existing = node_type_counts.get(node_type, 0) + target = config.get("min_workers", 0) + if existing < target: + total_nodes_to_add_dict[node_type] = target - existing + node_type_counts[node_type] = target + available = copy.deepcopy(node_types[node_type]["resources"]) + node_resources.extend( + [available] * total_nodes_to_add_dict[node_type]) + + return node_resources, node_type_counts, list( + total_nodes_to_add_dict.items()) + + def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], existing_nodes: Dict[NodeType, int], max_to_add: int, resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]: diff --git a/python/ray/autoscaler/aws/example-multi-node-type.yaml b/python/ray/autoscaler/aws/example-multi-node-type.yaml index 008337de2..a77aeacd0 100644 --- a/python/ray/autoscaler/aws/example-multi-node-type.yaml +++ b/python/ray/autoscaler/aws/example-multi-node-type.yaml @@ -17,6 +17,7 @@ available_node_types: node_config: InstanceType: m4.xlarge resources: {"CPU": 4} + min_workers: 1 max_workers: 5 cpu_16_spot: node_config: diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index 0546ba20d..172936341 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -291,6 +291,7 @@ "type": "object", "description": "Provider-specific config for the node, e.g. instance type." }, + "min_workers": {"type": "integer"}, "max_workers": {"type": "integer"}, "resources": { "type": "object", diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index c3b25fd51..3555a169a 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -4,6 +4,7 @@ import yaml import tempfile import shutil import unittest +import copy import ray from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \ @@ -13,9 +14,10 @@ from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.commands import get_or_create_head_node from ray.autoscaler._private.resource_demand_scheduler import \ - _utilization_score, \ + _utilization_score, _add_min_workers_nodes, \ get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler -from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND +from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \ + NODE_KIND_WORKER from ray.test_utils import same_elements from time import sleep @@ -164,6 +166,116 @@ def test_get_nodes_respects_max_limit(): }] * 10) == [("m4.large", 2)] +def test_add_min_workers_nodes(): + types = { + "m2.large": { + "resources": { + "CPU": 2 + }, + "min_workers": 50, + "max_workers": 100, + }, + "m4.large": { + "resources": { + "CPU": 2 + }, + "min_workers": 0, + "max_workers": 10, + }, + "gpu": { + "resources": { + "GPU": 1 + }, + "min_workers": 99999, + "max_workers": 99999, + }, + } + assert _add_min_workers_nodes([], + {}, + types) == \ + ([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999}, + [("m2.large", 50), ("gpu", 99999)]) + + assert _add_min_workers_nodes([{"CPU": 2}]*5, + {"m2.large": 5}, + types) == \ + ([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999}, + [("m2.large", 45), ("gpu", 99999)]) + + assert _add_min_workers_nodes([{"CPU": 2}]*60, + {"m2.large": 60}, + types) == \ + ([{"CPU": 2}]*60+[{"GPU": 1}]*99999, {"m2.large": 60, "gpu": 99999}, + [("gpu", 99999)]) + + assert _add_min_workers_nodes([{ + "CPU": 2 + }] * 50 + [{ + "GPU": 1 + }] * 99999, { + "m2.large": 50, + "gpu": 99999 + }, types) == ([{ + "CPU": 2 + }] * 50 + [{ + "GPU": 1 + }] * 99999, { + "m2.large": 50, + "gpu": 99999 + }, []) + + +def test_get_nodes_to_launch_with_min_workers(): + provider = MockProvider() + new_types = copy.deepcopy(TYPES_A) + new_types["p2.8xlarge"]["min_workers"] = 2 + scheduler = ResourceDemandScheduler(provider, new_types, 3) + + provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1) + + nodes = provider.non_terminated_nodes({}) + + ips = provider.non_terminated_node_ips({}) + utilizations = {ip: {"GPU": 8} for ip in ips} + + to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{ + "GPU": 8 + }], utilizations) + assert to_launch == [("p2.8xlarge", 1)] + + +def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): + provider = MockProvider() + new_types = copy.deepcopy(TYPES_A) + new_types["p2.8xlarge"]["min_workers"] = 2 + scheduler = ResourceDemandScheduler(provider, new_types, 10) + + provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1) + + nodes = provider.non_terminated_nodes({}) + + ips = provider.non_terminated_node_ips({}) + # 1 free p2.8xls + utilizations = {ip: {"GPU": 8} for ip in ips} + # 1 more on the way + pending_nodes = {"p2.8xlarge": 1} + # requires 2 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge + demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}] + to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, + utilizations) + assert to_launch == [("p2.xlarge", 1)] + + # 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand. + # 2 p2.8xlarge are running/pending. So we need 1 more p2.8xlarge only to + # meet the min_workers constraint and the demand. + new_types["p2.8xlarge"]["min_workers"] = 3 + scheduler = ResourceDemandScheduler(provider, new_types, 10) + to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, + utilizations) + # Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)] + assert to_launch == [("p2.8xlarge", 1)] + + def test_get_nodes_to_launch_limits(): provider = MockProvider() scheduler = ResourceDemandScheduler(provider, TYPES_A, 3) @@ -299,6 +411,58 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) + def testScaleUpMinWorkers(self): + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["min_workers"] = 2 + config["max_workers"] = 50 + config["idle_timeout_minutes"] = 1 + # Since config["min_workers"] > 1, the remaining worker is started + # with the default worker node type. + config["available_node_types"]["p2.8xlarge"]["min_workers"] = 1 + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 0 + autoscaler.update() + self.waitForNodes(2) + assert len(self.provider.mock_nodes) == 2 + assert { + self.provider.mock_nodes[0].node_type, + self.provider.mock_nodes[1].node_type + } == {"p2.8xlarge", "m4.large"} + self.provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + }, 2) + self.provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "m4.16xlarge", + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + }, 2) + assert len(self.provider.non_terminated_nodes({})) == 6 + # Make sure that after idle_timeout_minutes we don't kill idle + # min workers. + for node_id in self.provider.non_terminated_nodes({}): + lm.last_used_time_by_ip[self.provider.internal_ip(node_id)] = -60 + autoscaler.update() + self.waitForNodes(2) + + cnt = 0 + for id in self.provider.mock_nodes: + if self.provider.mock_nodes[id].state == "running" or \ + self.provider.mock_nodes[id].state == "pending": + assert self.provider.mock_nodes[id].node_type in { + "p2.8xlarge", "m4.large" + } + cnt += 1 + assert cnt == 2 + def testScaleUpIgnoreUsed(self): config = MULTI_WORKER_CLUSTER.copy() # Commenting out this line causes the test case to fail?!?!