diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index d838c6be1..41b45e22c 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -159,11 +159,15 @@ class ResourceDemandScheduler: node_resources, node_type_counts, self.node_types, self.max_workers, self.head_node_type, ensure_min_cluster_size) - # Step 3: add nodes for strict spread groups - logger.debug(f"Placement group demands: {pending_placement_groups}") + # Step 3: get resource demands of placement groups and return the + # groups that should be strictly spread. + logger.info(f"Placement group demands: {pending_placement_groups}") placement_group_demand_vector, strict_spreads = \ placement_groups_to_resource_demands(pending_placement_groups) - resource_demands.extend(placement_group_demand_vector) + # Place placement groups demand vector at the beginning of the resource + # demands vector to make it consistent (results in the same types of + # nodes to add) with pg_demands_nodes_max_launch_limit calculated later + resource_demands = placement_group_demand_vector + resource_demands if self.is_legacy_yaml() and \ not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]: @@ -179,18 +183,32 @@ class ResourceDemandScheduler: return self._legacy_worker_node_to_launch( nodes, launching_nodes, node_resources, resource_demands + request_resources_demands) - placement_group_nodes_to_add, node_resources, node_type_counts = \ + + spread_pg_nodes_to_add, node_resources, node_type_counts = \ self.reserve_and_allocate_spread( strict_spreads, node_resources, node_type_counts) + # Calculate the nodes to add for bypassing max launch limit for + # placement groups and spreads. + unfulfilled_placement_groups_demands, _ = get_bin_pack_residual( + node_resources, placement_group_demand_vector) + # Add 1 to account for the head node. 
+ max_to_add = self.max_workers + 1 - sum(node_type_counts.values()) + pg_demands_nodes_max_launch_limit = get_nodes_for( + self.node_types, node_type_counts, self.head_node_type, max_to_add, + unfulfilled_placement_groups_demands) + placement_groups_nodes_max_limit = { + node_type: spread_pg_nodes_to_add.get(node_type, 0) + + pg_demands_nodes_max_launch_limit.get(node_type, 0) + for node_type in self.node_types + } + # Step 4/5: add nodes for pending tasks, actors, and non-strict spread # groups unfulfilled, _ = get_bin_pack_residual(node_resources, resource_demands) logger.debug("Resource demands: {}".format(resource_demands)) logger.debug("Unfulfilled demands: {}".format(unfulfilled)) - # Add 1 to account for the head node. - max_to_add = self.max_workers + 1 - sum(node_type_counts.values()) nodes_to_add_based_on_demand = get_nodes_for( self.node_types, node_type_counts, self.head_node_type, max_to_add, unfulfilled) @@ -201,15 +219,16 @@ class ResourceDemandScheduler: for node_type in self.node_types: nodes_to_add = (adjusted_min_workers.get( - node_type, 0) + placement_group_nodes_to_add.get(node_type, 0) - + nodes_to_add_based_on_demand.get(node_type, 0)) + node_type, 0) + spread_pg_nodes_to_add.get(node_type, 0) + + nodes_to_add_based_on_demand.get(node_type, 0)) if nodes_to_add > 0: total_nodes_to_add[node_type] = nodes_to_add # Limit the number of concurrent launches total_nodes_to_add = self._get_concurrent_resource_demand_to_launch( total_nodes_to_add, unused_resources_by_ip.keys(), nodes, - launching_nodes, adjusted_min_workers) + launching_nodes, adjusted_min_workers, + placement_groups_nodes_max_limit) logger.debug("Node requests: {}".format(total_nodes_to_add)) return total_nodes_to_add @@ -288,6 +307,7 @@ class ResourceDemandScheduler: non_terminated_nodes: List[NodeID], pending_launches_nodes: Dict[NodeType, int], adjusted_min_workers: Dict[NodeType, int], + placement_group_nodes: Dict[NodeType, int], ) -> Dict[NodeType, int]: """Updates the max 
concurrent resources to launch for each node type. @@ -311,6 +331,8 @@ class ResourceDemandScheduler: min_workers and request_resources(). This overrides the launch limits since the user is hinting to immediately scale up to this size. + placement_group_nodes: Nodes to launch for placement groups. + This overrides the launch concurrency limits. Returns: Dict[NodeType, int]: Maximum number of nodes to launch for each node type. @@ -333,8 +355,9 @@ class ResourceDemandScheduler: max_allowed_pending_nodes - total_pending_nodes, # Allow more nodes if this is to respect min_workers or - # request_resources(). - adjusted_min_workers.get(node_type, 0)) + # request_resources() or placement groups. + adjusted_min_workers.get(node_type, 0) + + placement_group_nodes.get(node_type, 0)) if upper_bound > 0: updated_nodes_to_launch[node_type] = min( diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index a4bfe7393..4b2027af1 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -766,7 +766,8 @@ def test_get_concurrent_resource_demand_to_launch(): # Sanity check. updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {}, {}) + scheduler._get_concurrent_resource_demand_to_launch( + {}, [], [], {}, {}, {}) assert updated_to_launch == {} provider.create_node({}, { @@ -785,11 +786,38 @@ def test_get_concurrent_resource_demand_to_launch(): connected_nodes = [] # All the non_terminated_nodes are not connected yet. updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes, {}) + pending_launches_nodes, {}, {}) # Note: we have 2 pending/launching gpus, 3 pending/launching cpus, # 0 running gpu, and 0 running cpus. assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2} + # Test min_workers bypass max launch limit. 
+ updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, + connected_nodes, + non_terminated_nodes, + pending_launches_nodes, + adjusted_min_workers={"m4.large": 40}, + placement_group_nodes={}) + assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40} + # Test placement groups bypass max launch limit. + updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, + connected_nodes, + non_terminated_nodes, + pending_launches_nodes, {}, + placement_group_nodes={"m4.large": 40}) + assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40} + # Test combining min_workers and placement groups bypass max launch limit. + updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, + connected_nodes, + non_terminated_nodes, + pending_launches_nodes, + adjusted_min_workers={"m4.large": 25}, + placement_group_nodes={"m4.large": 15}) + assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40} + # This starts the min workers only, so we have no more pending workers. # The workers here are either running (connected) or in # pending_launches_nodes (i.e., launching). @@ -798,7 +826,7 @@ def test_get_concurrent_resource_demand_to_launch(): ] updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes, {}) + pending_launches_nodes, {}, {}) # Note that here we have 1 launching gpu, 1 launching cpu, # 1 running gpu, and 2 running cpus. assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4} @@ -819,7 +847,7 @@ def test_get_concurrent_resource_demand_to_launch(): pending_launches_nodes = {} # No pending launches updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes, {}) + pending_launches_nodes, {}, {}) # Note: we have 5 pending cpus. So we are not allowed to start any. # Still only 2 running cpus. 
assert updated_to_launch == {} @@ -830,7 +858,7 @@ def test_get_concurrent_resource_demand_to_launch(): ] updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes, {}) + pending_launches_nodes, {}, {}) # Note: that here we have 7 running cpus and nothing pending/launching. assert updated_to_launch == {"m4.large": 7} @@ -846,7 +874,7 @@ def test_get_concurrent_resource_demand_to_launch(): pending_launches_nodes = {"m4.large": 1} updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes, {}) + pending_launches_nodes, {}, {}) # Note: we have 8 pending/launching cpus and only 7 running. # So we should not launch anything (8 < 7). assert updated_to_launch == {} @@ -857,24 +885,90 @@ def test_get_concurrent_resource_demand_to_launch(): ] updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes, {}) + pending_launches_nodes, {}, {}) # Note: that here we have 14 running cpus and 1 launching. assert updated_to_launch == {"m4.large": 13} +def test_get_nodes_to_launch_max_launch_concurrency_placement_groups(): + provider = MockProvider() + new_types = copy.deepcopy(TYPES_A) + new_types["p2.8xlarge"]["min_workers"] = 10 + new_types["p2.8xlarge"]["max_workers"] = 40 + + scheduler = ResourceDemandScheduler( + provider, new_types, 50, head_node_type=None) + + pending_placement_groups = [ + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.PACK, + bundles=([Bundle(unit_resources={"GPU": 8})] * 25)) + ] + # placement groups should bypass max launch limit. + # Note that 25 = max(placement group resources=25, min_workers=10). 
+ to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, + pending_placement_groups, {}) + assert to_launch == {"p2.8xlarge": 25} + + pending_placement_groups = [ + # Requires 25 p2.8xlarge nodes. + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.STRICT_SPREAD, + bundles=([Bundle(unit_resources={"GPU": 2})] * 25)), + # Requires 5 additional nodes (total 30). + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.PACK, + bundles=([Bundle(unit_resources={"GPU": 6})] * 30)) + ] + + to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, + pending_placement_groups, {}) + # Test that combining spreads and normal placement group demands bypasses + # launch limit. + assert to_launch == {"p2.8xlarge": 30} + + pending_placement_groups = [ + # Requires 25 p2.8xlarge nodes. + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.STRICT_SPREAD, + bundles=([Bundle(unit_resources={"GPU": 2})] * 25)), + # Requires 35 additional nodes (total 60). + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.PACK, + bundles=([Bundle(unit_resources={"GPU": 6})] * 60)) + ] + + to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, + pending_placement_groups, {}) + # make sure it still respects max_workers of p2.8xlarge. + assert to_launch == {"p2.8xlarge": 40} + + scheduler.node_types["p2.8xlarge"]["max_workers"] = 60 + to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, + pending_placement_groups, {}) + # make sure it still respects global max_workers constraint. 
+ # 50 + 1 is global max_workers + head node. + assert to_launch == {"p2.8xlarge": 51} + + def test_get_nodes_to_launch_max_launch_concurrency(): provider = MockProvider() new_types = copy.deepcopy(TYPES_A) - new_types["p2.8xlarge"]["min_workers"] = 4 + new_types["p2.8xlarge"]["min_workers"] = 10 new_types["p2.8xlarge"]["max_workers"] = 40 scheduler = ResourceDemandScheduler( provider, new_types, 30, head_node_type=None) to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [], {}) - # Respects min_workers despite concurrency limitation. - assert to_launch == {"p2.8xlarge": 4} - + # Respects min_workers despite max launch limit. + assert to_launch == {"p2.8xlarge": 10} + scheduler.node_types["p2.8xlarge"]["min_workers"] = 4 provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED