From ebdf8ba3fa70256c6523183937a524a93a340e17 Mon Sep 17 00:00:00 2001 From: Ameer Haj Ali Date: Wed, 4 Nov 2020 22:05:48 +0200 Subject: [PATCH] [autoscaler] Support legacy cluster configs with the new resource demand scheduler (#11751) --- python/ray/autoscaler/_private/autoscaler.py | 9 +- .../ray/autoscaler/_private/load_metrics.py | 13 +- .../_private/resource_demand_scheduler.py | 89 +++++++++- python/ray/autoscaler/_private/util.py | 35 +++- python/ray/autoscaler/tags.py | 4 + python/ray/tests/test_autoscaler.py | 2 - python/ray/tests/test_autoscaler_yaml.py | 4 +- .../test_cli_patterns/test_ray_attach.txt | 2 + .../tests/test_cli_patterns/test_ray_exec.txt | 2 + .../test_cli_patterns/test_ray_submit.txt | 4 + .../tests/test_cli_patterns/test_ray_up.txt | 2 + .../test_cli_patterns/test_ray_up_config.yaml | 6 +- .../test_cli_patterns/test_ray_up_record.txt | 2 + .../tests/test_resource_demand_scheduler.py | 161 ++++++++++++++++-- 14 files changed, 300 insertions(+), 35 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 4d48cc7d6..1fb973bd1 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -218,7 +218,8 @@ class StandardAutoscaler: self.pending_launches.breakdown(), resource_demand_vector, self.load_metrics.get_resource_utilization(), - pending_placement_groups) + pending_placement_groups, + self.load_metrics.get_static_node_resources_by_ip()) for node_type, count in to_launch.items(): self.launch_new_node(count, node_type=node_type) @@ -300,9 +301,9 @@ class StandardAutoscaler: if TAG_RAY_USER_NODE_TYPE in tags: node_type = tags[TAG_RAY_USER_NODE_TYPE] node_type_counts[node_type] += 1 - if node_type_counts[node_type] <= \ - self.available_node_types[node_type].get( - "min_workers", 0): + min_workers = self.available_node_types[node_type].get( + "min_workers", 0) + if node_type_counts[node_type] <= min_workers: return True 
return False diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index 00368d1ea..777ee1073 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -6,6 +6,8 @@ import numpy as np import ray._private.services as services from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES from ray.gcs_utils import PlacementGroupTableData +from ray.autoscaler._private.resource_demand_scheduler import \ + NodeIP, ResourceDict logger = logging.getLogger(__name__) @@ -104,7 +106,7 @@ class LoadMetrics: return self._info()["NumNodesConnected"] def get_node_resources(self): - """Return a list of node resources (static resource sizes. + """Return a list of node resources (static resource sizes). Example: >>> metrics.get_node_resources() @@ -112,6 +114,15 @@ class LoadMetrics: """ return self.static_resources_by_ip.values() + def get_static_node_resources_by_ip(self) -> Dict[NodeIP, ResourceDict]: + """Return a dict of node resources for every node ip. 
+ + Example: + >>> lm.get_static_node_resources_by_ip() + {127.0.0.1: {"CPU": 1}, 127.0.0.2: {"CPU": 4, "GPU": 8}} + """ + return self.static_resources_by_ip + def get_resource_utilization(self): return self.dynamic_resources_by_ip diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 865b2e215..15944c1d8 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -17,11 +17,13 @@ from typing import List, Dict from ray.autoscaler.node_provider import NodeProvider from ray.gcs_utils import PlacementGroupTableData from ray.core.generated.common_pb2 import PlacementStrategy -from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED +from ray.autoscaler.tags import (TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, + NODE_TYPE_LEGACY_WORKER, NODE_KIND_WORKER, + NODE_TYPE_LEGACY_HEAD, TAG_RAY_NODE_KIND) logger = logging.getLogger(__name__) -# e.g., m4.16xlarge. +# e.g., cpu_4_ondemand. NodeType = str # e.g., {"resources": ..., "max_workers": ...}. @@ -42,14 +44,19 @@ class ResourceDemandScheduler: node_types: Dict[NodeType, NodeTypeConfigDict], max_workers: int): self.provider = provider - self.node_types = node_types + self.node_types = copy.deepcopy(node_types) self.max_workers = max_workers + # is_legacy_yaml tracks if the cluster configs was originally without + # available_node_types and was autofilled with available_node_types. 
+ self.is_legacy_yaml = (NODE_TYPE_LEGACY_HEAD in node_types + and NODE_TYPE_LEGACY_WORKER in node_types) def get_nodes_to_launch( self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int], resource_demands: List[ResourceDict], usage_by_ip: Dict[NodeIP, ResourceDict], - pending_placement_groups: List[PlacementGroupTableData] + pending_placement_groups: List[PlacementGroupTableData], + static_node_resources: Dict[NodeIP, ResourceDict] ) -> Dict[NodeType, int]: """Given resource demands, return node types to add to the cluster. @@ -68,7 +75,13 @@ class ResourceDemandScheduler: pending_nodes: Summary of node types currently being launched. resource_demands: Vector of resource demands from the scheduler. usage_by_ip: Mapping from ip to available resources. + pending_placement_groups: Placement group demands. + static_node_resources: Mapping from ip to static node resources. """ + if self.is_legacy_yaml: + # When using legacy yaml files we need to infer the head & worker + # node resources from the static node resources from LoadMetrics. + self._infer_legacy_node_resources_if_needed(static_node_resources) node_resources: List[ResourceDict] node_type_counts: Dict[NodeType, int] @@ -87,6 +100,13 @@ class ResourceDemandScheduler: placement_group_demand_vector, strict_spreads = \ placement_groups_to_resource_demands(pending_placement_groups) resource_demands.extend(placement_group_demand_vector) + + if self.is_legacy_yaml and \ + not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]: + # Need to launch worker nodes to later infer their + # resources. 
+ return self._legacy_worker_node_to_launch( + nodes, pending_nodes, node_resources, resource_demands) placement_group_nodes_to_add, node_resources, node_type_counts = \ self.reserve_and_allocate_spread( strict_spreads, node_resources, node_type_counts) @@ -118,6 +138,67 @@ class ResourceDemandScheduler: logger.info("Node requests: {}".format(total_nodes_to_add)) return total_nodes_to_add + def _legacy_worker_node_to_launch( + self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int], + node_resources: List[ResourceDict], + resource_demands: List[ResourceDict]) -> Dict[NodeType, int]: + """Get worker nodes to launch when resources missing in legacy yamls. + + If there is unfulfilled demand and we don't know the resources of the + workers, it returns max(1, min_workers) worker nodes from which we + later calculate the node resources. + """ + if self.max_workers == 0: + return {} + elif pending_nodes or len(nodes) > 1: + # If we are already launching a worker node. + # If first worker node fails this will never launch more nodes. + return {} + else: + unfulfilled, _ = get_bin_pack_residual(node_resources, + resource_demands) + if self.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] > 0 or \ + unfulfilled: + return { + NODE_TYPE_LEGACY_WORKER: max( + 1, self.node_types[NODE_TYPE_LEGACY_WORKER][ + "min_workers"]) + } + else: + return {} + + def _infer_legacy_node_resources_if_needed( + self, static_node_resources: Dict[NodeIP, ResourceDict] + ) -> (bool, Dict[NodeType, int]): + """Infers node resources for legacy config files. + + Updates the resources of the head and worker node types in + self.node_types. + Args: + static_node_resources: Mapping from ip to static node resources. + """ + # We fill the head node resources only once. + if not self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"]: + assert len(static_node_resources) == 1 # Only the head node. 
+ self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = next( + iter(static_node_resources.values())) + # We fill the worker node resources only once. + if not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]: + if len(static_node_resources) > 1: + # Set the node_types here as we already launched a worker node + # from which we directly get the node_resources. + worker_nodes = self.provider.non_terminated_nodes( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + worker_node_ips = [ + self.provider.internal_ip(node_id) + for node_id in worker_nodes + ] + for ip in worker_node_ips: + if ip in static_node_resources: + self.node_types[NODE_TYPE_LEGACY_WORKER][ + "resources"] = static_node_resources[ip] + assert self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"] + def _get_concurrent_resource_demand_to_launch( self, to_launch: Dict[NodeType, int], connected_nodes: List[NodeIP], non_terminated_nodes: List[NodeID], diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index 6aa6a7bb6..7b5314d88 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -12,6 +12,7 @@ import ray._private.services as services from ray.autoscaler._private.providers import _get_default_config, \ _NODE_PROVIDERS from ray.autoscaler._private.docker import validate_docker_config +from ray.autoscaler.tags import NODE_TYPE_LEGACY_WORKER, NODE_TYPE_LEGACY_HEAD REQUIRED, OPTIONAL = True, False RAY_SCHEMA_PATH = os.path.join( @@ -98,15 +99,45 @@ def prepare_config(config): return with_defaults +def rewrite_legacy_yaml_to_available_node_types( + config: Dict[str, Any]) -> Dict[str, Any]: + if "available_node_types" in config: + return config + else: + # TODO(ameer/ekl/alex): we can also rewrite here many other fields + # that include initialization/setup/start commands and ImageId. 
+ config["available_node_types"] = { + NODE_TYPE_LEGACY_HEAD: { + "node_config": config["head_node"], + "resources": {}, + "min_workers": 0, + "max_workers": 0, + }, + NODE_TYPE_LEGACY_WORKER: { + "node_config": config["worker_nodes"], + "resources": {}, + "min_workers": config["min_workers"], + "max_workers": config["max_workers"], + }, + } + config["head_node_type"] = NODE_TYPE_LEGACY_HEAD + config["worker_default_node_type"] = NODE_TYPE_LEGACY_WORKER + return config + + def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]: defaults = _get_default_config(config["provider"]) defaults.update(config) defaults["auth"] = defaults.get("auth", {}) + defaults = rewrite_legacy_yaml_to_available_node_types(defaults) try: defaults = _fillout_available_node_types_resources(defaults) - except Exception: - logger.exception("Failed to autodetect node resources.") + except ValueError: + # When the user uses a wrong instance type. raise + except Exception: + # When the user is using e.g., staroid, but it is not installed. + logger.exception("Failed to autodetect node resources.") return defaults diff --git a/python/ray/autoscaler/tags.py b/python/ray/autoscaler/tags.py index 2326f54e3..8ad263d1c 100644 --- a/python/ray/autoscaler/tags.py +++ b/python/ray/autoscaler/tags.py @@ -13,6 +13,10 @@ NODE_KIND_UNMANAGED = "unmanaged" # Tag for user defined node types (e.g., m4xl_spot). This is used for multi # node type clusters. TAG_RAY_USER_NODE_TYPE = "ray-user-node-type" +# Tag for autofilled node types for legacy cluster yamls without multi +# node type defined in the cluster configs. +NODE_TYPE_LEGACY_HEAD = "ray-legacy-head-node-type" +NODE_TYPE_LEGACY_WORKER = "ray-legacy-worker-node-type" # Tag that reports the current state of the node (e.g. 
Updating, Up-to-date) TAG_RAY_NODE_STATUS = "ray-node-status" diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 0fcee585c..6029eee16 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -1053,8 +1053,6 @@ class AutoscalingTest(unittest.TestCase): def testReportsConfigFailures(self): config = copy.deepcopy(SMALL_CLUSTER) - config["provider"]["type"] = "external" - config = prepare_config(config) config["provider"]["type"] = "mock" config_path = self.write_config(config) self.provider = MockProvider() diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index 6f70bba70..27e2a7bb5 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -23,10 +23,8 @@ CONFIG_PATHS += recursive_fnmatch( class AutoscalingConfigTest(unittest.TestCase): def testValidateDefaultConfig(self): for config_path in CONFIG_PATHS: - if ("aws/example-multi-node-type.yaml" in config_path - or "staroid/example-multi-node-type.yaml" in config_path): + if "aws/example-multi-node-type.yaml" in config_path: # aws is tested in testValidateDefaultConfigAWSMultiNodeTypes. - # staroid fails as it requires an installation of staroid. continue with open(config_path) as f: config = yaml.safe_load(f) diff --git a/python/ray/tests/test_cli_patterns/test_ray_attach.txt b/python/ray/tests/test_cli_patterns/test_ray_attach.txt index e743b90cd..f895234a1 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_attach.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_attach.txt @@ -1,3 +1,5 @@ +Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. +Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. 
Checking AWS environment settings Fetched IP: .+ ubuntu@ip-.+:~\$ exit diff --git a/python/ray/tests/test_cli_patterns/test_ray_exec.txt b/python/ray/tests/test_cli_patterns/test_ray_exec.txt index 975ba3b52..1720747dd 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_exec.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_exec.txt @@ -1,3 +1,5 @@ +Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. +Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. Checking AWS environment settings Fetched IP: .+ This is a test! diff --git a/python/ray/tests/test_cli_patterns/test_ray_submit.txt b/python/ray/tests/test_cli_patterns/test_ray_submit.txt index efc900092..61d3e722e 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_submit.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_submit.txt @@ -1,5 +1,9 @@ +Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. +Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. Checking AWS environment settings Fetched IP: .+ +Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. +Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. Checking AWS environment settings Fetched IP: .+ This is a test! diff --git a/python/ray/tests/test_cli_patterns/test_ray_up.txt b/python/ray/tests/test_cli_patterns/test_ray_up.txt index 48bc59c71..7e52609b3 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up.txt @@ -1,5 +1,7 @@ Cluster: test-cli +Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. +Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. 
Checking AWS environment settings AWS config IAM Profile: .+ \[default\] diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_config.yaml b/python/ray/tests/test_cli_patterns/test_ray_up_config.yaml index 14f1b924d..1410708f5 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up_config.yaml +++ b/python/ray/tests/test_cli_patterns/test_ray_up_config.yaml @@ -5,7 +5,7 @@ file_mounts: ~/tests: . head_node: ImageId: latest_dlami - InstanceType: t3a.small + InstanceType: t1.micro head_setup_commands: - echo head head_start_ray_commands: @@ -30,9 +30,9 @@ setup_commands: target_utilization_fraction: 0.9 worker_nodes: ImageId: latest_dlami - InstanceType: t3a.small + InstanceType: t1.micro worker_setup_commands: - echo worker worker_start_ray_commands: - ray stop - - ray start --address=$RAY_HEAD_IP \ No newline at end of file + - ray start --address=$RAY_HEAD_IP diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt index 2e70f7aa6..5370014ad 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt @@ -1,4 +1,6 @@ .+\.py.*Cluster: test-cli +.+\.py.*Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. +.+\.py.*Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. .+\.py.*Checking AWS environment settings .+\.py.*Creating new IAM instance profile ray-autoscaler-v1 for use as the default\. .+\.py.*Creating new IAM role ray-autoscaler-v1 for use as the default instance role\. 
diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 7b0748f55..376910afe 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -7,6 +7,8 @@ import unittest import copy import ray +from ray.autoscaler._private.util import \ + rewrite_legacy_yaml_to_available_node_types from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \ MockProcessRunner from ray.autoscaler._private.providers import (_NODE_PROVIDERS, @@ -21,7 +23,9 @@ from ray.gcs_utils import PlacementGroupTableData from ray.core.generated.common_pb2 import Bundle, PlacementStrategy from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \ NODE_KIND_WORKER, TAG_RAY_NODE_STATUS, \ - STATUS_UP_TO_DATE, STATUS_UNINITIALIZED + STATUS_UP_TO_DATE, STATUS_UNINITIALIZED, \ + NODE_KIND_HEAD, NODE_TYPE_LEGACY_WORKER, \ + NODE_TYPE_LEGACY_HEAD from ray.test_utils import same_elements from ray.autoscaler._private.constants import \ AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE @@ -293,7 +297,7 @@ def test_get_nodes_to_launch_with_min_workers(): to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{ "GPU": 8 - }], utilizations, []) + }], utilizations, [], {}) assert to_launch == {"p2.8xlarge": 1} @@ -315,7 +319,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): # requires 2 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}] to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, - utilizations, []) + utilizations, [], {}) assert to_launch == {"p2.xlarge": 1} # 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand. 
@@ -324,7 +328,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): new_types["p2.8xlarge"]["min_workers"] = 3 scheduler = ResourceDemandScheduler(provider, new_types, 10) to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, - utilizations, []) + utilizations, [], {}) # Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)] assert to_launch == {"p2.8xlarge": 1} @@ -342,7 +346,7 @@ def test_get_nodes_to_launch_limits(): to_launch = scheduler.get_nodes_to_launch(nodes, {"p2.8xlarge": 1}, [{ "GPU": 8 - }] * 2, utilizations, []) + }] * 2, utilizations, [], {}) assert to_launch == {} @@ -362,7 +366,7 @@ def test_calculate_node_resources(): # requires 4 p2.8xls (only 3 are in cluster/pending) demands = [{"GPU": 8}] * (len(utilizations) + 2) to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, - utilizations, []) + utilizations, [], {}) assert to_launch == {"p2.8xlarge": 1} @@ -403,7 +407,7 @@ def test_backlog_queue_impact_on_binpacking_time(): AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE t1 = time.time() to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, demands, - usage_by_ip, []) + usage_by_ip, [], {}) t2 = time.time() assert t2 - t1 < time_to_assert print("The time took to launch", to_launch, @@ -511,8 +515,8 @@ class TestPlacementGroupScaling: strategy=PlacementStrategy.SPREAD, bundles=([Bundle(unit_resources={"GPU": 2})] * 2)), ] - to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands, - {}, pending_placement_groups) + to_launch = scheduler.get_nodes_to_launch( + nodes, {}, resource_demands, {}, pending_placement_groups, {}) assert to_launch == {"p2.8xlarge": 2} def test_many_strict_spreads(self): @@ -535,8 +539,8 @@ class TestPlacementGroupScaling: # Each placement group will take up 2 GPUs per node, but the distinct # placement groups should still reuse the same nodes. 
pending_placement_groups = pending_placement_groups * 3 - to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands, - {}, pending_placement_groups) + to_launch = scheduler.get_nodes_to_launch( + nodes, {}, resource_demands, {}, pending_placement_groups, {}) assert to_launch == {"p2.8xlarge": 1} def test_packing(self): @@ -557,8 +561,8 @@ class TestPlacementGroupScaling: ] # The 2 resource demand gpus should still be packed onto the same node # as the 6 GPU placement group. - to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands, - {}, pending_placement_groups) + to_launch = scheduler.get_nodes_to_launch( + nodes, {}, resource_demands, {}, pending_placement_groups, {}) assert to_launch == {} @@ -679,7 +683,7 @@ def test_get_nodes_to_launch_max_launch_concurrency(): scheduler = ResourceDemandScheduler(provider, new_types, 30) - to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, []) + to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [], {}) # Respects min_workers despite concurrency limitation. assert to_launch == {"p2.8xlarge": 4} @@ -698,7 +702,7 @@ def test_get_nodes_to_launch_max_launch_concurrency(): # requires 41 p2.8xls (currently 1 pending, 1 launching, 0 running} demands = [{"GPU": 8}] * (len(utilizations) + 40) to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands, - utilizations, []) + utilizations, [], {}) # Enforces max launch to 5 when < 5 running. 2 are pending/launching. assert to_launch == {"p2.8xlarge": 3} @@ -715,12 +719,137 @@ def test_get_nodes_to_launch_max_launch_concurrency(): # Requires additional 17 p2.8xls (now 1 pending, 1 launching, 8 running} demands = [{"GPU": 8}] * (len(utilizations) + 15) to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands, - utilizations, []) + utilizations, [], {}) # We are allowed to launch up to 8 more since 8 are running. # We already have 2 pending/launching, so only 6 remain. 
assert to_launch == {"p2.8xlarge": 6} +def test_rewrite_legacy_yaml_to_available_node_types(): + cluster_config = copy.deepcopy(SMALL_CLUSTER) # Legacy cluster_config. + cluster_config = rewrite_legacy_yaml_to_available_node_types( + cluster_config) + assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD][ + "max_workers"] == 0 + assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD][ + "min_workers"] == 0 + assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD][ + "node_config"] == SMALL_CLUSTER["head_node"] + + assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][ + "node_config"] == SMALL_CLUSTER["worker_nodes"] + assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][ + "max_workers"] == SMALL_CLUSTER["max_workers"] + assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][ + "min_workers"] == SMALL_CLUSTER["min_workers"] + + +def test_handle_legacy_cluster_config_yaml(): + provider = MockProvider() + head_resources = {"CPU": 8, "GPU": 1} + worker_resources = {"CPU": 32, "GPU": 8} + cluster_config = copy.deepcopy(SMALL_CLUSTER) # Legacy cluster_config. + cluster_config = rewrite_legacy_yaml_to_available_node_types( + cluster_config) + scheduler = ResourceDemandScheduler( + provider, cluster_config["available_node_types"], 0) + provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + }, 1) + head_ip = provider.non_terminated_node_ips({})[0] + head_node_id = provider.non_terminated_nodes({})[0] + to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [], + {head_ip: head_resources}) + assert to_launch == {} # Should always be empty with max_workers = 0. 
+ + scheduler.max_workers = 30 + min_workers = scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] + scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] = 0 + to_launch = scheduler.get_nodes_to_launch([head_node_id], {}, [], {}, [], + {head_ip: head_resources}) + assert to_launch == { + } # Since the resource demand does not require adding nodes. + to_launch = scheduler.get_nodes_to_launch([head_node_id], {}, + [head_resources], {}, [], + {head_ip: head_resources}) + assert to_launch == { + } # Since the resource demand does not require adding nodes. + + scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] = min_workers + # Returns min_workers when min_workers>0. + to_launch = scheduler.get_nodes_to_launch([head_node_id], {}, + [head_resources], {}, [], + {head_ip: head_resources}) + assert to_launch == {NODE_TYPE_LEGACY_WORKER: min_workers} + + provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_WORKER + }, min_workers) + nodes = provider.non_terminated_nodes({}) + to_launch = scheduler.get_nodes_to_launch(nodes, {}, [head_resources], {}, + [], {head_ip: head_resources}) + assert to_launch == {} # A node is running, at some point it'll connect. + pending_launches = {NODE_TYPE_LEGACY_WORKER: 4} + to_launch = scheduler.get_nodes_to_launch([], pending_launches, + [head_resources], {}, [], + {head_ip: head_resources}) + assert to_launch == {} # A node is launching, at some point it'll connect. + + # Now assume that we already launched/connected the nodes. 
+ ips = provider.non_terminated_node_ips({}) + lm = LoadMetrics() + worker_ips = [] + for ip in ips: + if ip == head_ip: + lm.update(ip, head_resources, head_resources, {}) + else: + lm.update(ip, worker_resources, worker_resources, {}) + worker_ips.append(ip) + + assert not scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["resources"] + to_launch = scheduler.get_nodes_to_launch( + nodes, {}, [], {}, [], lm.get_static_node_resources_by_ip()) + assert scheduler.node_types[NODE_TYPE_LEGACY_WORKER][ + "resources"] == worker_resources + assert to_launch == {} + utilizations = {ip: worker_resources for ip in worker_ips} + utilizations[head_ip] = head_resources + # Requires 4 nodes since worker resources are bigger than head resources. + demands = [worker_resources] * (len(utilizations) + 3) + to_launch = scheduler.get_nodes_to_launch( + nodes, {}, demands, utilizations, [], + lm.get_static_node_resources_by_ip()) + # 4 nodes are necessary to meet resource demand, but we never exceed + # max_workers. + assert to_launch == {} + scheduler.max_workers = 10 + to_launch = scheduler.get_nodes_to_launch( + nodes, {}, demands, utilizations, [], + lm.get_static_node_resources_by_ip()) + # 4 nodes are necessary to meet resource demand, but we never exceed + # max_workers. + assert to_launch == {} + scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["max_workers"] = 10 + to_launch = scheduler.get_nodes_to_launch( + nodes, {}, demands, utilizations, [], + lm.get_static_node_resources_by_ip()) + # 4 nodes are necessary to meet resource demand. + assert to_launch == {NODE_TYPE_LEGACY_WORKER: 4} + to_launch = scheduler.get_nodes_to_launch(nodes, pending_launches, demands, + utilizations, [], + lm.get_node_resources()) + # 0 because there are 4 pending launches and we only need 4. 
+ assert to_launch == {} + to_launch = scheduler.get_nodes_to_launch(nodes, pending_launches, + demands * 2, utilizations, [], + lm.get_node_resources()) + # 1 because there are 4 pending launches and we only allow a max of 5. + assert to_launch == {NODE_TYPE_LEGACY_WORKER: 1} + + class LoadMetricsTest(unittest.TestCase): def testResourceDemandVector(self): lm = LoadMetrics()