From 7466ce82df32c47364e1a67e6aa29c68362eb801 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Wed, 14 Oct 2020 13:11:46 -0700 Subject: [PATCH] [Autoscaler] Placement group autoscaling (#11243) --- python/ray/autoscaler/_private/autoscaler.py | 7 +- .../ray/autoscaler/_private/load_metrics.py | 26 +- .../_private/resource_demand_scheduler.py | 225 +++++++++++++--- python/ray/monitor.py | 6 +- .../tests/test_resource_demand_scheduler.py | 249 +++++++++++++++++- 5 files changed, 455 insertions(+), 58 deletions(-) diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 45d573a5f..37da490ed 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -211,11 +211,14 @@ class StandardAutoscaler: if self.resource_demand_scheduler: resource_demand_vector = self.resource_demand_vector + \ self.load_metrics.get_resource_demand_vector() - to_launch = (self.resource_demand_scheduler.get_nodes_to_launch( + pending_placement_groups = \ + self.load_metrics.get_pending_placement_groups() + to_launch = self.resource_demand_scheduler.get_nodes_to_launch( self.provider.non_terminated_nodes(tag_filters={}), self.pending_launches.breakdown(), resource_demand_vector, - self.load_metrics.get_resource_utilization())) + self.load_metrics.get_resource_utilization(), + pending_placement_groups) for node_type, count in to_launch.items(): self.launch_new_node(count, node_type=node_type) diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index 4823ddf1b..060a56690 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -1,9 +1,11 @@ import logging import time +from typing import Dict, List import numpy as np import ray._private.services as services from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES +from ray.gcs_utils import PlacementGroupTableData logger = logging.getLogger(__name__) @@ -26,16 +28,18 @@ class LoadMetrics: ) if local_ip is None else local_ip self.waiting_bundles = [] self.infeasible_bundles = [] + self.pending_placement_groups = [] def update(self, - ip, - static_resources, - update_dynamic_resources, - dynamic_resources, - update_resource_load, - resource_load, - waiting_bundles=None, - infeasible_bundles=None): + ip: str, + static_resources: Dict[str, Dict], + update_dynamic_resources: bool, + dynamic_resources: Dict[str, Dict], + update_resource_load: bool, + resource_load: Dict[str, Dict], + waiting_bundles: List[Dict[str, float]] = None, + infeasible_bundles: List[Dict[str, float]] = None, + pending_placement_groups: List[PlacementGroupTableData] = None): # If light heartbeat enabled, only resources changed will be received. # We should update the changed part and compare static_resources with # dynamic_resources using those updated. @@ -50,6 +54,8 @@ class LoadMetrics: waiting_bundles = [] if not infeasible_bundles: infeasible_bundles = [] + if not pending_placement_groups: + pending_placement_groups = [] # We are not guaranteed to have a corresponding dynamic resource # for every static resource because dynamic resources are based on @@ -69,6 +75,7 @@ class LoadMetrics: self.last_heartbeat_time_by_ip[ip] = now self.waiting_bundles = waiting_bundles self.infeasible_bundles = infeasible_bundles + self.pending_placement_groups = pending_placement_groups def mark_active(self, ip): assert ip is not None, "IP should be known at this time" @@ -159,6 +166,9 @@ class LoadMetrics: def get_resource_demand_vector(self): return self.waiting_bundles + self.infeasible_bundles + def get_pending_placement_groups(self): + return self.pending_placement_groups + def info_string(self): return " - " + "\n - ".join( ["{}: {}".format(k, v) for k, v in sorted(self._info().items())]) diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index dafe2afe5..a5edcd001 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -11,9 +11,12 @@ import copy import numpy as np import logging import collections +from numbers import Number from typing import List, Dict from ray.autoscaler.node_provider import NodeProvider +from ray.gcs_utils import PlacementGroupTableData +from ray.core.generated.common_pb2 import PlacementStrategy from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, \ STATUS_UPDATE_FAILED, STATUS_UP_TO_DATE, TAG_RAY_NODE_STATUS @@ -26,7 +29,7 @@ NodeType = str NodeTypeConfigDict = str # e.g., {"GPU": 1}. -ResourceDict = str +ResourceDict = Dict[str, Number] # e.g., IP address of the node. NodeID = str @@ -43,15 +46,19 @@ class ResourceDemandScheduler: def get_nodes_to_launch( self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int], resource_demands: List[ResourceDict], - usage_by_ip: Dict[str, ResourceDict]) -> Dict[NodeType, int]: + usage_by_ip: Dict[str, ResourceDict], + pending_placement_groups: List[PlacementGroupTableData] + ) -> Dict[NodeType, int]: """Given resource demands, return node types to add to the cluster. This method: (1) calculates the resources present in the cluster. (2) calculates the remaining nodes to add to respect min_workers - constraint per node type. - (3) calculates the unfulfilled resource bundles. - (4) calculates which nodes need to be launched to fulfill all + constraint per node type. + (3) for each strict spread placement group, reserve space on + available nodes and launch new nodes if necessary. + (4) calculates the unfulfilled resource bundles. + (5) calculates which nodes need to be launched to fulfill all the bundle requests, subject to max_worker constraints. Args: @@ -61,31 +68,44 @@ class ResourceDemandScheduler: usage_by_ip: Mapping from ip to available resources. """ + node_resources: List[ResourceDict] + node_type_counts: Dict[NodeType, int] node_resources, node_type_counts = \ self.calculate_node_resources(nodes, pending_nodes, usage_by_ip) + logger.info("Cluster resources: {}".format(node_resources)) + logger.info("Node counts: {}".format(node_type_counts)) + # Step 2: add nodes to add to satisfy min_workers for each type node_resources, node_type_counts, min_workers_nodes_to_add = \ _add_min_workers_nodes( node_resources, node_type_counts, self.node_types) - nodes_to_add_based_on_demand = {} - if resource_demands is not None: - logger.info("Cluster resources: {}".format(node_resources)) - logger.info("Node counts: {}".format(node_type_counts)) - unfulfilled = get_bin_pack_residual(node_resources, - resource_demands) - logger.info("Resource demands: {}".format(resource_demands)) - logger.info("Unfulfilled demands: {}".format(unfulfilled)) - max_to_add = self.max_workers - sum(node_type_counts.values()) - nodes_to_add_based_on_demand = get_nodes_for( - self.node_types, node_type_counts, max_to_add, unfulfilled) + # Step 3: add nodes for strict spread groups + logger.info(f"Placement group demands: {pending_placement_groups}") + placement_group_demand_vector, strict_spreads = \ + placement_groups_to_resource_demands(pending_placement_groups) + resource_demands.extend(placement_group_demand_vector) + placement_group_nodes_to_add, node_resources, node_type_counts = \ + self.reserve_and_allocate_spread( + strict_spreads, node_resources, node_type_counts) + + # Step 4/5: add nodes for pending tasks, actors, and non-strict spread + # groups + unfulfilled, _ = get_bin_pack_residual(node_resources, + resource_demands) + logger.info("Resource demands: {}".format(resource_demands)) + logger.info("Unfulfilled demands: {}".format(unfulfilled)) + max_to_add = self.max_workers - sum(node_type_counts.values()) + nodes_to_add_based_on_demand = get_nodes_for( + self.node_types, node_type_counts, max_to_add, unfulfilled) # Merge nodes to add based on demand and nodes to add based on # min_workers constraint. We add them because nodes to add based on # demand was calculated after the min_workers constraint was respected. total_nodes_to_add = {} for node_type in self.node_types: - nodes_to_add = nodes_to_add_based_on_demand.get(node_type, 0) + \ - min_workers_nodes_to_add.get(node_type, 0) + nodes_to_add = (min_workers_nodes_to_add.get( + node_type, 0) + placement_group_nodes_to_add.get(node_type, 0) + + nodes_to_add_based_on_demand.get(node_type, 0)) if nodes_to_add > 0: total_nodes_to_add[node_type] = nodes_to_add @@ -220,6 +240,53 @@ class ResourceDemandScheduler: return node_resources, node_type_counts + def reserve_and_allocate_spread(self, + strict_spreads: List[List[ResourceDict]], + node_resources: List[ResourceDict], + node_type_counts: Dict[NodeType, int]): + """For each strict spread, attempt to reserve as much space as possible + on the node, then allocate new nodes for the unfulfilled portion. + + Args: + strict_spreads (List[List[ResourceDict]]): A list of placement + groups which must be spread out. + node_resources (List[ResourceDict]): Available node resources in + the cluster. + node_type_counts (Dict[NodeType, int]): The amount of each type of + node pending or in the cluster. + + Returns: + Dict[NodeType, int]: Nodes to add. + List[ResourceDict]: The updated node_resources after the method. + Dict[NodeType, int]: The updated node_type_counts. + + """ + to_add = collections.defaultdict(int) + for bundles in strict_spreads: + # Try to pack as many bundles of this group as possible on existing + # nodes. The remaining will be allocated on new nodes. + unfulfilled, node_resources = get_bin_pack_residual( + node_resources, bundles, strict_spread=True) + max_to_add = self.max_workers - sum(node_type_counts.values()) + # Allocate new nodes for the remaining bundles that don't fit. + to_launch = get_nodes_for( + self.node_types, + node_type_counts, + max_to_add, + unfulfilled, + strict_spread=True) + _inplace_add(node_type_counts, to_launch) + _inplace_add(to_add, to_launch) + new_node_resources = _node_type_counts_to_node_resources( + self.node_types, to_launch) + # Update node resources to include newly launched nodes and their + # bundles. + unfulfilled, including_reserved = get_bin_pack_residual( + new_node_resources, unfulfilled, strict_spread=True) + assert not unfulfilled + node_resources += including_reserved + return to_add, node_resources, node_type_counts + def debug_string(self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int], usage_by_ip: Dict[str, ResourceDict]) -> str: @@ -235,6 +302,19 @@ class ResourceDemandScheduler: return out +def _node_type_counts_to_node_resources( + node_types: Dict[NodeType, NodeTypeConfigDict], + node_type_counts: Dict[NodeType, int]) -> List[ResourceDict]: + """Converts a node_type_counts dict into a list of node_resources.""" + resources = [] + for node_type, count in node_type_counts.items(): + # Be careful, each entry in the list must be deep copied! + resources += [ + node_types[node_type]["resources"].copy() for _ in range(count) + ] + return resources + + def _add_min_workers_nodes( node_resources: List[ResourceDict], node_type_counts: Dict[NodeType, int], @@ -269,8 +349,10 @@ def _add_min_workers_nodes( def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], - existing_nodes: Dict[NodeType, int], max_to_add: int, - resources: List[ResourceDict]) -> Dict[NodeType, int]: + existing_nodes: Dict[NodeType, int], + max_to_add: int, + resources: List[ResourceDict], + strict_spread: bool = False) -> Dict[NodeType, int]: """Determine nodes to add given resource demands and constraints. Args: @@ -279,12 +361,14 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], This sets constraints on the number of new nodes to add. max_to_add: global constraint on nodes to add. resources: resource demands to fulfill. + strict_spread: If true, each element in `resources` must be placed on a + different node. Returns: Dict of count to add for each node type. + """ nodes_to_add = collections.defaultdict(int) - allocated_resources = [] while resources and sum(nodes_to_add.values()) < max_to_add: utilization_scores = [] @@ -293,12 +377,21 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], node_type, 0) >= node_types[node_type]["max_workers"]): continue node_resources = node_types[node_type]["resources"] - score = _utilization_score(node_resources, resources) + if strict_spread: + # If handling strict spread, only one bundle can be placed on + # the node. + score = _utilization_score(node_resources, [resources[0]]) + else: + score = _utilization_score(node_resources, resources) if score is not None: utilization_scores.append((score, node_type)) # Give up, no feasible node. if not utilization_scores: + # TODO (Alex): We will hit this case every time a placement group + # starts up because placement groups are scheduled via custom + # resources. This will behave properly with the current utilization + # score heuristic, but it's a little dangerous and misleading. logger.info( "No feasible node type to add for {}".format(resources)) break @@ -306,10 +399,14 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], utilization_scores = sorted(utilization_scores, reverse=True) best_node_type = utilization_scores[0][1] nodes_to_add[best_node_type] += 1 - allocated_resources.append(node_types[best_node_type]["resources"]) - residual = get_bin_pack_residual(allocated_resources[-1:], resources) - assert len(residual) < len(resources), (resources, residual) - resources = residual + if strict_spread: + resources = resources[1:] + else: + allocated_resource = node_types[best_node_type]["resources"] + residual, _ = get_bin_pack_residual([allocated_resource], + resources) + assert len(residual) < len(resources), (resources, residual) + resources = residual return nodes_to_add @@ -336,9 +433,9 @@ def _utilization_score(node_resources: ResourceDict, return (min(util_by_resources), np.mean(util_by_resources)) -def get_bin_pack_residual( - node_resources: List[ResourceDict], - resource_demands: List[ResourceDict]) -> List[ResourceDict]: +def get_bin_pack_residual(node_resources: List[ResourceDict], + resource_demands: List[ResourceDict], + strict_spread: bool = False) -> List[ResourceDict]: """Return a subset of resource_demands that cannot fit in the cluster. TODO(ekl): this currently does not guarantee the resources will be packed @@ -349,26 +446,37 @@ def get_bin_pack_residual( node_resources (List[ResourceDict]): List of resources per node. resource_demands (List[ResourceDict]): List of resource bundles that need to be bin packed onto the nodes. + strict_spread (bool): If true, each element in resource_demands must be + placed on a different entry in `node_resources`. Returns: List[ResourceDict] the residual list resources that do not fit. + """ unfulfilled = [] # A most naive bin packing algorithm. nodes = copy.deepcopy(node_resources) + used = [] for demand in resource_demands: found = False - for node in nodes: + node = None + for i in range(len(nodes)): + node = nodes[i] if _fits(node, demand): - _inplace_subtract(node, demand) found = True + # In the strict_spread case, we can't reuse nodes. + if strict_spread: + used.append(node) + del nodes[i] break - if not found: + if found and node: + _inplace_subtract(node, demand) + else: unfulfilled.append(demand) - return unfulfilled + return unfulfilled, nodes + used def _fits(node: ResourceDict, resources: ResourceDict) -> bool: @@ -383,3 +491,54 @@ def _inplace_subtract(node: ResourceDict, resources: ResourceDict) -> None: assert k in node, (k, node) node[k] -= v assert node[k] >= 0.0, (node, k, v) + + +def _inplace_add(a: collections.defaultdict, b: Dict) -> None: + """Generically adds values in `b` to `a`. + a[k] should be defined for all k in b.keys()""" + for k, v in b.items(): + a[k] += v + + +def placement_groups_to_resource_demands( + pending_placement_groups: List[PlacementGroupTableData]): + """Preprocess placement group requests into regular resource demand vectors + when possible. The policy is: + * STRICT_PACK - Convert to a single bundle. + * PACK - Flatten into a resource demand vector. + * STRICT_SPREAD - Cannot be converted. + * SPREAD - Flatten into a resource demand vector. + + Args: + pending_placement_groups (List[PlacementGroupData]): List of + PlacementGroupLoad's. + + Returns: + List[ResourceDict]: The placement groups which were converted to a + resource demand vector. + List[List[ResourceDict]]: The placement groups which should be strictly + spread. + """ + resource_demand_vector = [] + unconverted = [] + for placement_group in pending_placement_groups: + shapes = [ + dict(bundle.unit_resources) for bundle in placement_group.bundles + ] + if (placement_group.strategy == PlacementStrategy.PACK + or placement_group.strategy == PlacementStrategy.SPREAD): + resource_demand_vector.extend(shapes) + elif placement_group.strategy == PlacementStrategy.STRICT_PACK: + combined = collections.defaultdict(float) + for shape in shapes: + for label, quantity in shape.items(): + combined[label] += quantity + resource_demand_vector.append(combined) + elif (placement_group.strategy == PlacementStrategy.STRICT_SPREAD): + unconverted.append(shapes) + else: + logger.error( + f"Unknown placement group request type: {placement_group}. " + f"Please file a bug report " + f"https://github.com/ray-project/ray/issues/new.") + return resource_demand_vector, unconverted diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 0464f221d..f352c49a2 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -143,6 +143,9 @@ class Monitor: waiting_bundles, infeasible_bundles = \ parse_resource_demands(message.resource_load_by_shape) + pending_placement_groups = list( + message.placement_group_load.placement_group_data) + # Update the load metrics for this raylet. client_id = ray.utils.binary_to_hex(heartbeat_message.client_id) ip = self.raylet_id_to_ip_map.get(client_id) @@ -154,7 +157,8 @@ class Monitor: self.load_metrics.update( ip, total_resources, update_available_resources, available_resources, update_resource_load, resource_load, - waiting_bundles, infeasible_bundles) + waiting_bundles, infeasible_bundles, + pending_placement_groups) else: logger.warning( f"Monitor: could not find ip for client {client_id}") diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 33c822b84..067d0c8f0 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -17,6 +17,8 @@ from ray.autoscaler._private.commands import get_or_create_head_node from ray.autoscaler._private.resource_demand_scheduler import \ _utilization_score, _add_min_workers_nodes, \ get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler +from ray.gcs_utils import PlacementGroupTableData +from ray.core.generated.common_pb2 import Bundle, PlacementStrategy from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \ NODE_KIND_WORKER, TAG_RAY_NODE_STATUS, \ STATUS_UP_TO_DATE, STATUS_UNINITIALIZED @@ -94,15 +96,44 @@ def test_util_score(): def test_bin_pack(): - assert get_bin_pack_residual([], [{"GPU": 2}, {"GPU": 2}]) == \ + assert get_bin_pack_residual([], [{"GPU": 2}, {"GPU": 2}])[0] == \ [{"GPU": 2}, {"GPU": 2}] - assert get_bin_pack_residual([{"GPU": 2}], [{"GPU": 2}, {"GPU": 2}]) == \ - [{"GPU": 2}] - assert get_bin_pack_residual([{"GPU": 4}], [{"GPU": 2}, {"GPU": 2}]) == [] + assert get_bin_pack_residual([{"GPU": 2}], [{"GPU": 2}, {"GPU": 2}])[0] \ + == [{"GPU": 2}] + assert get_bin_pack_residual([{ + "GPU": 4 + }], [{ + "GPU": 2 + }, { + "GPU": 2 + }])[0] == [] arg = [{"GPU": 2}, {"GPU": 2, "CPU": 2}] - assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}]) == [] + assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}])[0] == [] arg = [{"CPU": 2}, {"GPU": 2}] - assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}]) == [{"GPU": 2}] + assert get_bin_pack_residual(arg, [{ + "GPU": 2 + }, { + "GPU": 2 + }])[0] == [{ + "GPU": 2 + }] + arg = [{"GPU": 3}] + assert get_bin_pack_residual( + arg, [{ + "GPU": 1 + }, { + "GPU": 1 + }], strict_spread=False)[0] == [] + assert get_bin_pack_residual( + arg, [{ + "GPU": 1 + }, { + "GPU": 1 + }], strict_spread=True) == ([{ + "GPU": 1 + }], [{ + "GPU": 2 + }]) def test_get_nodes_packing_heuristic(): @@ -135,6 +166,18 @@ def test_get_nodes_packing_heuristic(): assert get_nodes_for( TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \ {"m4.16xlarge": 1, "p2.8xlarge": 1} + assert get_nodes_for( + TYPES_A, {}, 9999, [{ + "GPU": 1 + }] * 8, strict_spread=False) == { + "p2.8xlarge": 1 + } + assert get_nodes_for( + TYPES_A, {}, 9999, [{ + "GPU": 1 + }] * 8, strict_spread=True) == { + "p2.xlarge": 8 + } def test_get_nodes_respects_max_limit(): @@ -248,7 +291,7 @@ def test_get_nodes_to_launch_with_min_workers(): to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{ "GPU": 8 - }], utilizations) + }], utilizations, []) assert to_launch == {"p2.8xlarge": 1} @@ -270,7 +313,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): # requires 2 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}] to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, - utilizations) + utilizations, []) assert to_launch == {"p2.xlarge": 1} # 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand. @@ -279,7 +322,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): new_types["p2.8xlarge"]["min_workers"] = 3 scheduler = ResourceDemandScheduler(provider, new_types, 10) to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, - utilizations) + utilizations, []) # Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)] assert to_launch == {"p2.8xlarge": 1} @@ -297,7 +340,7 @@ def test_get_nodes_to_launch_limits(): to_launch = scheduler.get_nodes_to_launch(nodes, {"p2.8xlarge": 1}, [{ "GPU": 8 - }] * 2, utilizations) + }] * 2, utilizations, []) assert to_launch == {} @@ -317,11 +360,101 @@ def test_calculate_node_resources(): # requires 4 p2.8xls (only 3 are in cluster/pending) demands = [{"GPU": 8}] * (len(utilizations) + 2) to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, - utilizations) + utilizations, []) assert to_launch == {"p2.8xlarge": 1} +class TestPlacementGroupScaling: + def test_strategies(self): + provider = MockProvider() + scheduler = ResourceDemandScheduler(provider, TYPES_A, 10) + + provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) + # At this point our cluster has 2 p2.8xlarge instances (16 GPUs) and is + # fully idle. + nodes = provider.non_terminated_nodes({}) + + resource_demands = [{"GPU": 4}] * 2 + pending_placement_groups = [ + # Requires a new node (only uses 2 GPUs on it though). + PlacementGroupTableData( + state=PlacementGroupTableData.PENDING, + strategy=PlacementStrategy.STRICT_SPREAD, + bundles=[ + Bundle(unit_resources={"GPU": 2}), + Bundle(unit_resources={"GPU": 2}), + Bundle(unit_resources={"GPU": 2}) + ]), + # Requires a new node (uses the whole node). + PlacementGroupTableData( + state=PlacementGroupTableData.PENDING, + strategy=PlacementStrategy.STRICT_PACK, + bundles=([Bundle(unit_resources={"GPU": 2})] * 4)), + # Fits across the machines that strict spread. + PlacementGroupTableData( + # runs on. + state=PlacementGroupTableData.PENDING, + strategy=PlacementStrategy.PACK, + bundles=([Bundle(unit_resources={"GPU": 2})] * 2)), + # Fits across the machines that strict spread. + PlacementGroupTableData( + # runs on. + state=PlacementGroupTableData.PENDING, + strategy=PlacementStrategy.SPREAD, + bundles=([Bundle(unit_resources={"GPU": 2})] * 2)), + ] + to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands, + {}, pending_placement_groups) + assert to_launch == {"p2.8xlarge": 2} + + def test_many_strict_spreads(self): + provider = MockProvider() + scheduler = ResourceDemandScheduler(provider, TYPES_A, 10) + + provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) + # At this point our cluster has 2 p2.8xlarge instances (16 GPUs) and is + # fully idle. + nodes = provider.non_terminated_nodes({}) + + resource_demands = [{"GPU": 1}] * 6 + pending_placement_groups = [ + # Requires a new node (only uses 2 GPUs on it though). + PlacementGroupTableData( + state=PlacementGroupTableData.PENDING, + strategy=PlacementStrategy.STRICT_SPREAD, + bundles=[Bundle(unit_resources={"GPU": 2})] * 3), + ] + # Each placement group will take up 2 GPUs per node, but the distinct + # placement groups should still reuse the same nodes. + pending_placement_groups = pending_placement_groups * 3 + to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands, + {}, pending_placement_groups) + assert to_launch == {"p2.8xlarge": 1} + + def test_packing(self): + provider = MockProvider() + scheduler = ResourceDemandScheduler(provider, TYPES_A, 10) + + provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1) + # At this point our cluster has 1 p2.8xlarge instances (8 GPUs) and is + # fully idle. + nodes = provider.non_terminated_nodes({}) + + resource_demands = [{"GPU": 1}] * 2 + pending_placement_groups = [ + PlacementGroupTableData( + state=PlacementGroupTableData.PENDING, + strategy=PlacementStrategy.STRICT_PACK, + bundles=[Bundle(unit_resources={"GPU": 2})] * 3), + ] + # The 2 resource demand gpus should still be packed onto the same node + # as the 6 GPU placement group. + to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands, + {}, pending_placement_groups) + assert to_launch == {} + + def test_get_concurrent_resource_demand_to_launch(): node_types = copy.deepcopy(TYPES_A) node_types["p2.8xlarge"]["min_workers"] = 1 @@ -441,7 +574,7 @@ def test_get_nodes_to_launch_max_launch_concurrency(): scheduler = ResourceDemandScheduler(provider, new_types, 30) - to_launch = scheduler.get_nodes_to_launch([], {}, [], []) + to_launch = scheduler.get_nodes_to_launch([], {}, [], [], []) # Respects min_workers despite concurrency limitation. assert to_launch == {"p2.8xlarge": 4} @@ -456,7 +589,7 @@ def test_get_nodes_to_launch_max_launch_concurrency(): # requires 41 p2.8xls (currently 1 pending, 1 launching, 0 running} demands = [{"GPU": 8}] * (len(utilizations) + 40) to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands, - utilizations) + utilizations, []) # Enforces max launch to 5 when < 5 running. 2 are pending/launching. assert to_launch == {"p2.8xlarge": 3} @@ -471,7 +604,7 @@ def test_get_nodes_to_launch_max_launch_concurrency(): # requires 17 p2.8xls (currently 1 pending, 1 launching, 8 running} demands = [{"GPU": 8}] * (len(utilizations) + 15) to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands, - utilizations) + utilizations, []) # We are allowed to launch up to 8 more since 8 are running. # We already have 2 pending/launching, so only 6 remain. assert to_launch == {"p2.8xlarge": 6} @@ -496,6 +629,25 @@ class LoadMetricsTest(unittest.TestCase): "GPU": 1 }]) + def testPlacementGroupLoad(self): + lm = LoadMetrics() + pending_placement_groups = [ + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.PACK, + bundles=([Bundle(unit_resources={"GPU": 2})] * 2)), + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.SPREAD, + bundles=([Bundle(unit_resources={"GPU": 2})] * 2)), + ] + lm.update( + "1.1.1.1", {}, + True, {}, + True, {}, + pending_placement_groups=pending_placement_groups) + assert lm.get_pending_placement_groups() == pending_placement_groups + class AutoscalingTest(unittest.TestCase): def setUp(self): @@ -577,6 +729,75 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) + def testPlacementGroup(self): + # Note this is mostly an integration test. See + # testPlacementGroupScaling for more comprehensive tests. + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["min_workers"] = 0 + config["max_workers"] = 999 + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: "head", + TAG_RAY_USER_NODE_TYPE: "m4.4xlarge" + }, 1) + head_ip = self.provider.non_terminated_node_ips({})[0] + assert len(self.provider.non_terminated_nodes({})) == 1 + autoscaler.update() + self.waitForNodes(1) + + pending_placement_groups = [ + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.STRICT_SPREAD, + bundles=[Bundle(unit_resources={"GPU": 2})] * 3), + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.PACK, + bundles=([Bundle(unit_resources={"GPU": 2})] * 5)), + ] + # Since placement groups are implemented with custom resources, this is + # an example of the accompanying resource demands. Note the resource + # demand autoscaler will be unable to fulfill these demands, but we + # should still handle the other infeasible/waiting bundles. + placement_group_resource_demands = [{ + "GPU_group_0_6c2506ac733bc37496295b02c4fad446": 0.0101, + "GPU_group_6c2506ac733bc37496295b02c4fad446": 0.0101 + }] + lm.update( + head_ip, {"CPU": 16}, + True, {"CPU": 16}, + False, {}, + infeasible_bundles=placement_group_resource_demands, + waiting_bundles=[{ + "GPU": 8 + }], + pending_placement_groups=pending_placement_groups) + autoscaler.update() + self.waitForNodes(5) + + for i in range(1, 5): + assert self.provider.mock_nodes[i].node_type == "p2.8xlarge" + + pending_placement_groups = [ + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.STRICT_PACK, + bundles=([Bundle(unit_resources={"GPU": 2})] * 4)), + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.SPREAD, + bundles=([Bundle(unit_resources={"GPU": 2})] * 2)), + ] + def testScaleUpMinWorkers(self): config = copy.deepcopy(MULTI_WORKER_CLUSTER) config["min_workers"] = 2