diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index f3ec607df..aba8cff2d 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -192,7 +192,8 @@ class ResourceDemandScheduler: # Add 1 to account for the head node. max_to_add = self.max_workers + 1 - sum(node_type_counts.values()) nodes_to_add_based_on_demand = get_nodes_for( - self.node_types, node_type_counts, max_to_add, unfulfilled) + self.node_types, node_type_counts, self.head_node_type, max_to_add, + unfulfilled) # Merge nodes to add based on demand and nodes to add based on # min_workers constraint. We add them because nodes to add based on # demand was calculated after the min_workers constraint was respected. @@ -447,6 +448,7 @@ class ResourceDemandScheduler: to_launch = get_nodes_for( self.node_types, node_type_counts, + self.head_node_type, max_to_add, unfulfilled, strict_spread=True) @@ -544,7 +546,7 @@ def _add_min_workers_nodes( max_node_resources, ensure_min_cluster_size) # Get the nodes to meet the unfulfilled. nodes_to_add_request_resources = get_nodes_for( - node_types, node_type_counts, max_to_add, + node_types, node_type_counts, head_node_type, max_to_add, resource_requests_unfulfilled) # Update the resources, counts and total nodes to add. for node_type in nodes_to_add_request_resources: @@ -565,6 +567,7 @@ def _add_min_workers_nodes( def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], existing_nodes: Dict[NodeType, int], + head_node_type: NodeType, max_to_add: int, resources: List[ResourceDict], strict_spread: bool = False) -> Dict[NodeType, int]: @@ -588,9 +591,13 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], while resources and sum(nodes_to_add.values()) < max_to_add: utilization_scores = [] for node_type in node_types: + max_workers_of_node_type = node_types[node_type].get( + "max_workers", 0) + if head_node_type == node_type: + # Add 1 to account for head node. + max_workers_of_node_type = max_workers_of_node_type + 1 if (existing_nodes.get(node_type, 0) + nodes_to_add.get( - node_type, 0) >= node_types[node_type].get( - "max_workers", 0)): + node_type, 0) >= max_workers_of_node_type): continue node_resources = node_types[node_type]["resources"] if strict_spread: diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 067b5f53d..2093f1e14 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -143,43 +143,100 @@ def test_bin_pack(): def test_get_nodes_packing_heuristic(): - assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \ - {"p2.8xlarge": 1} - assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \ - {"p2.8xlarge": 1} - assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \ - {"p2.xlarge": 4} - assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \ - == {"p2.8xlarge": 3} - assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \ - == {} - assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \ - {"m4.16xlarge": 3} - assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \ - == {"m4.16xlarge": 1, "m4.large": 1} + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "GPU": 8 + }]) == { + "p2.8xlarge": 1 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "GPU": 1 + }] * 6) == { + "p2.8xlarge": 1 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "GPU": 1 + }] * 4) == { + "p2.xlarge": 4 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "CPU": 32, + "GPU": 1 + }] * 3) == { + "p2.8xlarge": 3 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "CPU": 64, + "GPU": 1 + }] * 3) == {} + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "CPU": 64 + }] * 3) == { + "m4.16xlarge": 3 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "CPU": 64 + }, { + "CPU": 1 + }]) == { + "m4.16xlarge": 1, + "m4.large": 1 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "CPU": 64 + }, { + "CPU": 9 + }, { + "CPU": 9 + }]) == { + "m4.16xlarge": 1, + "m4.4xlarge": 2 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "CPU": 16 + }] * 5) == { + "m4.16xlarge": 1, + "m4.4xlarge": 1 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "CPU": 8 + }] * 10) == { + "m4.16xlarge": 1, + "m4.4xlarge": 1 + } + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "CPU": 1 + }] * 100) == { + "m4.16xlarge": 1, + "m4.4xlarge": 2, + "m4.large": 2 + } + + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, [{ + "GPU": 1 + }] + ([{ + "CPU": 1 + }] * 64)) == { + "m4.16xlarge": 1, + "p2.xlarge": 1 + } + + assert get_nodes_for(TYPES_A, {}, "empty_node", 9999, ([{ + "GPU": 1 + }] * 8) + ([{ + "CPU": 1 + }] * 64)) == { + "m4.16xlarge": 1, + "p2.8xlarge": 1 + } + assert get_nodes_for( - TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 9}, {"CPU": 9}]) == \ - {"m4.16xlarge": 1, "m4.4xlarge": 2} - assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \ - {"m4.16xlarge": 1, "m4.4xlarge": 1} - assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \ - {"m4.16xlarge": 1, "m4.4xlarge": 1} - assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \ - {"m4.16xlarge": 1, "m4.4xlarge": 2, "m4.large": 2} - assert get_nodes_for( - TYPES_A, {}, 9999, [{"GPU": 1}] + ([{"CPU": 1}] * 64)) == \ - {"m4.16xlarge": 1, "p2.xlarge": 1} - assert get_nodes_for( - TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \ - {"m4.16xlarge": 1, "p2.8xlarge": 1} - assert get_nodes_for( - TYPES_A, {}, 9999, [{ + TYPES_A, {}, "empty_node", 9999, [{ "GPU": 1 }] * 8, strict_spread=False) == { "p2.8xlarge": 1 } assert get_nodes_for( - TYPES_A, {}, 9999, [{ + TYPES_A, {}, "empty_node", 9999, [{ "GPU": 1 }] * 8, strict_spread=True) == { "p2.xlarge": 8 @@ -201,22 +258,22 @@ def test_get_nodes_respects_max_limit(): "max_workers": 99999, }, } - assert get_nodes_for(types, {}, 2, [{"CPU": 1}] * 10) == \ + assert get_nodes_for(types, {}, "empty_node", 2, [{"CPU": 1}] * 10) == \ {"m4.large": 2} - assert get_nodes_for(types, {"m4.large": 9999}, 9999, [{ + assert get_nodes_for(types, {"m4.large": 9999}, "empty_node", 9999, [{ "CPU": 1 }] * 10) == {} - assert get_nodes_for(types, {"m4.large": 0}, 9999, [{ + assert get_nodes_for(types, {"m4.large": 0}, "empty_node", 9999, [{ "CPU": 1 }] * 10) == { "m4.large": 5 } - assert get_nodes_for(types, {"m4.large": 7}, 4, [{ + assert get_nodes_for(types, {"m4.large": 7}, "m4.large", 4, [{ "CPU": 1 }] * 10) == { - "m4.large": 3 + "m4.large": 4 } - assert get_nodes_for(types, {"m4.large": 7}, 2, [{ + assert get_nodes_for(types, {"m4.large": 7}, "m4.large", 2, [{ "CPU": 1 }] * 10) == { "m4.large": 2 @@ -1355,6 +1412,7 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() + runner.respond_to_call("json .Config.Env", ["[]" for i in range(6)]) self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE: "empty_node" @@ -1379,6 +1437,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.request_resources([{"CPU": 32}] * 4) autoscaler.update() self.waitForNodes(5) + assert self.provider.mock_nodes[3].node_type == "m4.16xlarge" assert self.provider.mock_nodes[4].node_type == "m4.16xlarge"