[Autoscaler] Placement group autoscaling (#11243)

This commit is contained in:
Alex Wu
2020-10-14 13:11:46 -07:00
committed by GitHub
parent aefcf901d3
commit 7466ce82df
5 changed files with 455 additions and 58 deletions
+5 -2
View File
@@ -211,11 +211,14 @@ class StandardAutoscaler:
if self.resource_demand_scheduler:
resource_demand_vector = self.resource_demand_vector + \
self.load_metrics.get_resource_demand_vector()
to_launch = (self.resource_demand_scheduler.get_nodes_to_launch(
pending_placement_groups = \
self.load_metrics.get_pending_placement_groups()
to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
resource_demand_vector,
self.load_metrics.get_resource_utilization()))
self.load_metrics.get_resource_utilization(),
pending_placement_groups)
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
+18 -8
View File
@@ -1,9 +1,11 @@
import logging
import time
from typing import Dict, List
import numpy as np
import ray._private.services as services
from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES
from ray.gcs_utils import PlacementGroupTableData
logger = logging.getLogger(__name__)
@@ -26,16 +28,18 @@ class LoadMetrics:
) if local_ip is None else local_ip
self.waiting_bundles = []
self.infeasible_bundles = []
self.pending_placement_groups = []
def update(self,
ip,
static_resources,
update_dynamic_resources,
dynamic_resources,
update_resource_load,
resource_load,
waiting_bundles=None,
infeasible_bundles=None):
ip: str,
static_resources: Dict[str, Dict],
update_dynamic_resources: bool,
dynamic_resources: Dict[str, Dict],
update_resource_load: bool,
resource_load: Dict[str, Dict],
waiting_bundles: List[Dict[str, float]] = None,
infeasible_bundles: List[Dict[str, float]] = None,
pending_placement_groups: List[PlacementGroupTableData] = None):
# If light heartbeat enabled, only resources changed will be received.
# We should update the changed part and compare static_resources with
# dynamic_resources using those updated.
@@ -50,6 +54,8 @@ class LoadMetrics:
waiting_bundles = []
if not infeasible_bundles:
infeasible_bundles = []
if not pending_placement_groups:
pending_placement_groups = []
# We are not guaranteed to have a corresponding dynamic resource
# for every static resource because dynamic resources are based on
@@ -69,6 +75,7 @@ class LoadMetrics:
self.last_heartbeat_time_by_ip[ip] = now
self.waiting_bundles = waiting_bundles
self.infeasible_bundles = infeasible_bundles
self.pending_placement_groups = pending_placement_groups
def mark_active(self, ip):
assert ip is not None, "IP should be known at this time"
@@ -159,6 +166,9 @@ class LoadMetrics:
def get_resource_demand_vector(self):
    """Return the waiting bundles followed by the infeasible bundles.

    Returns:
        The concatenation ``self.waiting_bundles + self.infeasible_bundles``.
    """
    waiting = self.waiting_bundles
    infeasible = self.infeasible_bundles
    return waiting + infeasible
def get_pending_placement_groups(self):
    """Return the pending placement groups currently stored on this object.

    Returns:
        The value of ``self.pending_placement_groups``. The attribute itself
        is returned, not a copy.
    """
    pending = self.pending_placement_groups
    return pending
def info_string(self):
    """Render ``self._info()`` as a bulleted, key-sorted multi-line string.

    Returns:
        One " - key: value" entry per item of ``self._info()``, sorted by
        item and separated by newlines.
    """
    entries = sorted(self._info().items())
    rendered = ["{}: {}".format(key, value) for key, value in entries]
    return " - " + "\n - ".join(rendered)
@@ -11,9 +11,12 @@ import copy
import numpy as np
import logging
import collections
from numbers import Number
from typing import List, Dict
from ray.autoscaler.node_provider import NodeProvider
from ray.gcs_utils import PlacementGroupTableData
from ray.core.generated.common_pb2 import PlacementStrategy
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, \
STATUS_UPDATE_FAILED, STATUS_UP_TO_DATE, TAG_RAY_NODE_STATUS
@@ -26,7 +29,7 @@ NodeType = str
NodeTypeConfigDict = str
# e.g., {"GPU": 1}.
ResourceDict = str
ResourceDict = Dict[str, Number]
# e.g., IP address of the node.
NodeID = str
@@ -43,15 +46,19 @@ class ResourceDemandScheduler:
def get_nodes_to_launch(
self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict],
usage_by_ip: Dict[str, ResourceDict]) -> Dict[NodeType, int]:
usage_by_ip: Dict[str, ResourceDict],
pending_placement_groups: List[PlacementGroupTableData]
) -> Dict[NodeType, int]:
"""Given resource demands, return node types to add to the cluster.
This method:
(1) calculates the resources present in the cluster.
(2) calculates the remaining nodes to add to respect min_workers
constraint per node type.
(3) calculates the unfulfilled resource bundles.
(4) calculates which nodes need to be launched to fulfill all
constraint per node type.
(3) for each strict spread placement group, reserve space on
available nodes and launch new nodes if necessary.
(4) calculates the unfulfilled resource bundles.
(5) calculates which nodes need to be launched to fulfill all
the bundle requests, subject to max_worker constraints.
Args:
@@ -61,31 +68,44 @@ class ResourceDemandScheduler:
usage_by_ip: Mapping from ip to available resources.
"""
node_resources: List[ResourceDict]
node_type_counts: Dict[NodeType, int]
node_resources, node_type_counts = \
self.calculate_node_resources(nodes, pending_nodes, usage_by_ip)
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
# Step 2: add nodes to add to satisfy min_workers for each type
node_resources, node_type_counts, min_workers_nodes_to_add = \
_add_min_workers_nodes(
node_resources, node_type_counts, self.node_types)
nodes_to_add_based_on_demand = {}
if resource_demands is not None:
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
unfulfilled = get_bin_pack_residual(node_resources,
resource_demands)
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
max_to_add = self.max_workers - sum(node_type_counts.values())
nodes_to_add_based_on_demand = get_nodes_for(
self.node_types, node_type_counts, max_to_add, unfulfilled)
# Step 3: add nodes for strict spread groups
logger.info(f"Placement group demands: {pending_placement_groups}")
placement_group_demand_vector, strict_spreads = \
placement_groups_to_resource_demands(pending_placement_groups)
resource_demands.extend(placement_group_demand_vector)
placement_group_nodes_to_add, node_resources, node_type_counts = \
self.reserve_and_allocate_spread(
strict_spreads, node_resources, node_type_counts)
# Step 4/5: add nodes for pending tasks, actors, and non-strict spread
# groups
unfulfilled, _ = get_bin_pack_residual(node_resources,
resource_demands)
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
max_to_add = self.max_workers - sum(node_type_counts.values())
nodes_to_add_based_on_demand = get_nodes_for(
self.node_types, node_type_counts, max_to_add, unfulfilled)
# Merge nodes to add based on demand and nodes to add based on
# min_workers constraint. We add them because nodes to add based on
# demand was calculated after the min_workers constraint was respected.
total_nodes_to_add = {}
for node_type in self.node_types:
nodes_to_add = nodes_to_add_based_on_demand.get(node_type, 0) + \
min_workers_nodes_to_add.get(node_type, 0)
nodes_to_add = (min_workers_nodes_to_add.get(
node_type, 0) + placement_group_nodes_to_add.get(node_type, 0)
+ nodes_to_add_based_on_demand.get(node_type, 0))
if nodes_to_add > 0:
total_nodes_to_add[node_type] = nodes_to_add
@@ -220,6 +240,53 @@ class ResourceDemandScheduler:
return node_resources, node_type_counts
def reserve_and_allocate_spread(self,
                                strict_spreads: List[List[ResourceDict]],
                                node_resources: List[ResourceDict],
                                node_type_counts: Dict[NodeType, int]):
    """For each strict spread, attempt to reserve as much space as possible
    on the existing nodes, then allocate new nodes for the unfulfilled
    portion.

    Args:
        strict_spreads (List[List[ResourceDict]]): A list of placement
            groups which must be spread out.
        node_resources (List[ResourceDict]): Available node resources in
            the cluster.
        node_type_counts (Dict[NodeType, int]): The amount of each type of
            node pending or in the cluster. Updated in place with the
            nodes chosen for launch.

    Returns:
        Dict[NodeType, int]: Nodes to add.
        List[ResourceDict]: The updated node_resources after the method.
        Dict[NodeType, int]: The updated node_type_counts.
    """
    to_add = collections.defaultdict(int)
    for bundles in strict_spreads:
        # Try to pack as many bundles of this group as possible on existing
        # nodes. The remaining will be allocated on new nodes.
        unfulfilled, node_resources = get_bin_pack_residual(
            node_resources, bundles, strict_spread=True)
        max_to_add = self.max_workers - sum(node_type_counts.values())
        # Allocate new nodes for the remaining bundles that don't fit.
        to_launch = get_nodes_for(
            self.node_types,
            node_type_counts,
            max_to_add,
            unfulfilled,
            strict_spread=True)
        _inplace_add(node_type_counts, to_launch)
        _inplace_add(to_add, to_launch)
        new_node_resources = _node_type_counts_to_node_resources(
            self.node_types, to_launch)
        # Update node resources to include newly launched nodes and their
        # bundles.
        unfulfilled, including_reserved = get_bin_pack_residual(
            new_node_resources, unfulfilled, strict_spread=True)
        if unfulfilled:
            # get_nodes_for() stops early when the max_workers budget is
            # exhausted or when no node type can hold a bundle. That is a
            # capacity/configuration limit rather than a programming error,
            # so report it and keep going instead of raising.
            logger.warning(
                "Unable to allocate nodes for all bundles of a strict "
                "spread placement group. Unfulfilled bundles: {}".format(
                    unfulfilled))
        node_resources += including_reserved
    return to_add, node_resources, node_type_counts
def debug_string(self, nodes: List[NodeID],
pending_nodes: Dict[NodeID, int],
usage_by_ip: Dict[str, ResourceDict]) -> str:
@@ -235,6 +302,19 @@ class ResourceDemandScheduler:
return out
def _node_type_counts_to_node_resources(
node_types: Dict[NodeType, NodeTypeConfigDict],
node_type_counts: Dict[NodeType, int]) -> List[ResourceDict]:
"""Converts a node_type_counts dict into a list of node_resources."""
resources = []
for node_type, count in node_type_counts.items():
# Be careful, each entry in the list must be deep copied!
resources += [
node_types[node_type]["resources"].copy() for _ in range(count)
]
return resources
def _add_min_workers_nodes(
node_resources: List[ResourceDict],
node_type_counts: Dict[NodeType, int],
@@ -269,8 +349,10 @@ def _add_min_workers_nodes(
def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
existing_nodes: Dict[NodeType, int], max_to_add: int,
resources: List[ResourceDict]) -> Dict[NodeType, int]:
existing_nodes: Dict[NodeType, int],
max_to_add: int,
resources: List[ResourceDict],
strict_spread: bool = False) -> Dict[NodeType, int]:
"""Determine nodes to add given resource demands and constraints.
Args:
@@ -279,12 +361,14 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
This sets constraints on the number of new nodes to add.
max_to_add: global constraint on nodes to add.
resources: resource demands to fulfill.
strict_spread: If true, each element in `resources` must be placed on a
different node.
Returns:
Dict of count to add for each node type.
"""
nodes_to_add = collections.defaultdict(int)
allocated_resources = []
while resources and sum(nodes_to_add.values()) < max_to_add:
utilization_scores = []
@@ -293,12 +377,21 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
node_type, 0) >= node_types[node_type]["max_workers"]):
continue
node_resources = node_types[node_type]["resources"]
score = _utilization_score(node_resources, resources)
if strict_spread:
# If handling strict spread, only one bundle can be placed on
# the node.
score = _utilization_score(node_resources, [resources[0]])
else:
score = _utilization_score(node_resources, resources)
if score is not None:
utilization_scores.append((score, node_type))
# Give up, no feasible node.
if not utilization_scores:
# TODO (Alex): We will hit this case every time a placement group
# starts up because placement groups are scheduled via custom
# resources. This will behave properly with the current utilization
# score heuristic, but it's a little dangerous and misleading.
logger.info(
"No feasible node type to add for {}".format(resources))
break
@@ -306,10 +399,14 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
utilization_scores = sorted(utilization_scores, reverse=True)
best_node_type = utilization_scores[0][1]
nodes_to_add[best_node_type] += 1
allocated_resources.append(node_types[best_node_type]["resources"])
residual = get_bin_pack_residual(allocated_resources[-1:], resources)
assert len(residual) < len(resources), (resources, residual)
resources = residual
if strict_spread:
resources = resources[1:]
else:
allocated_resource = node_types[best_node_type]["resources"]
residual, _ = get_bin_pack_residual([allocated_resource],
resources)
assert len(residual) < len(resources), (resources, residual)
resources = residual
return nodes_to_add
@@ -336,9 +433,9 @@ def _utilization_score(node_resources: ResourceDict,
return (min(util_by_resources), np.mean(util_by_resources))
def get_bin_pack_residual(
node_resources: List[ResourceDict],
resource_demands: List[ResourceDict]) -> List[ResourceDict]:
def get_bin_pack_residual(node_resources: List[ResourceDict],
resource_demands: List[ResourceDict],
strict_spread: bool = False) -> List[ResourceDict]:
"""Return a subset of resource_demands that cannot fit in the cluster.
TODO(ekl): this currently does not guarantee the resources will be packed
@@ -349,26 +446,37 @@ def get_bin_pack_residual(
node_resources (List[ResourceDict]): List of resources per node.
resource_demands (List[ResourceDict]): List of resource bundles that
need to be bin packed onto the nodes.
strict_spread (bool): If true, each element in resource_demands must be
placed on a different entry in `node_resources`.
Returns:
List[ResourceDict] the residual list resources that do not fit.
"""
unfulfilled = []
# A most naive bin packing algorithm.
nodes = copy.deepcopy(node_resources)
used = []
for demand in resource_demands:
found = False
for node in nodes:
node = None
for i in range(len(nodes)):
node = nodes[i]
if _fits(node, demand):
_inplace_subtract(node, demand)
found = True
# In the strict_spread case, we can't reuse nodes.
if strict_spread:
used.append(node)
del nodes[i]
break
if not found:
if found and node:
_inplace_subtract(node, demand)
else:
unfulfilled.append(demand)
return unfulfilled
return unfulfilled, nodes + used
def _fits(node: ResourceDict, resources: ResourceDict) -> bool:
@@ -383,3 +491,54 @@ def _inplace_subtract(node: ResourceDict, resources: ResourceDict) -> None:
assert k in node, (k, node)
node[k] -= v
assert node[k] >= 0.0, (node, k, v)
def _inplace_add(a: collections.defaultdict, b: Dict) -> None:
"""Generically adds values in `b` to `a`.
a[k] should be defined for all k in b.keys()"""
for k, v in b.items():
a[k] += v
def placement_groups_to_resource_demands(
pending_placement_groups: List[PlacementGroupTableData]):
"""Preprocess placement group requests into regular resource demand vectors
when possible. The policy is:
* STRICT_PACK - Convert to a single bundle.
* PACK - Flatten into a resource demand vector.
* STRICT_SPREAD - Cannot be converted.
* SPREAD - Flatten into a resource demand vector.
Args:
pending_placement_groups (List[PlacementGroupData]): List of
PlacementGroupLoad's.
Returns:
List[ResourceDict]: The placement groups which were converted to a
resource demand vector.
List[List[ResourceDict]]: The placement groups which should be strictly
spread.
"""
resource_demand_vector = []
unconverted = []
for placement_group in pending_placement_groups:
shapes = [
dict(bundle.unit_resources) for bundle in placement_group.bundles
]
if (placement_group.strategy == PlacementStrategy.PACK
or placement_group.strategy == PlacementStrategy.SPREAD):
resource_demand_vector.extend(shapes)
elif placement_group.strategy == PlacementStrategy.STRICT_PACK:
combined = collections.defaultdict(float)
for shape in shapes:
for label, quantity in shape.items():
combined[label] += quantity
resource_demand_vector.append(combined)
elif (placement_group.strategy == PlacementStrategy.STRICT_SPREAD):
unconverted.append(shapes)
else:
logger.error(
f"Unknown placement group request type: {placement_group}. "
f"Please file a bug report "
f"https://github.com/ray-project/ray/issues/new.")
return resource_demand_vector, unconverted
+5 -1
View File
@@ -143,6 +143,9 @@ class Monitor:
waiting_bundles, infeasible_bundles = \
parse_resource_demands(message.resource_load_by_shape)
pending_placement_groups = list(
message.placement_group_load.placement_group_data)
# Update the load metrics for this raylet.
client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
ip = self.raylet_id_to_ip_map.get(client_id)
@@ -154,7 +157,8 @@ class Monitor:
self.load_metrics.update(
ip, total_resources, update_available_resources,
available_resources, update_resource_load, resource_load,
waiting_bundles, infeasible_bundles)
waiting_bundles, infeasible_bundles,
pending_placement_groups)
else:
logger.warning(
f"Monitor: could not find ip for client {client_id}")
@@ -17,6 +17,8 @@ from ray.autoscaler._private.commands import get_or_create_head_node
from ray.autoscaler._private.resource_demand_scheduler import \
_utilization_score, _add_min_workers_nodes, \
get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler
from ray.gcs_utils import PlacementGroupTableData
from ray.core.generated.common_pb2 import Bundle, PlacementStrategy
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \
NODE_KIND_WORKER, TAG_RAY_NODE_STATUS, \
STATUS_UP_TO_DATE, STATUS_UNINITIALIZED
@@ -94,15 +96,44 @@ def test_util_score():
def test_bin_pack():
assert get_bin_pack_residual([], [{"GPU": 2}, {"GPU": 2}]) == \
assert get_bin_pack_residual([], [{"GPU": 2}, {"GPU": 2}])[0] == \
[{"GPU": 2}, {"GPU": 2}]
assert get_bin_pack_residual([{"GPU": 2}], [{"GPU": 2}, {"GPU": 2}]) == \
[{"GPU": 2}]
assert get_bin_pack_residual([{"GPU": 4}], [{"GPU": 2}, {"GPU": 2}]) == []
assert get_bin_pack_residual([{"GPU": 2}], [{"GPU": 2}, {"GPU": 2}])[0] \
== [{"GPU": 2}]
assert get_bin_pack_residual([{
"GPU": 4
}], [{
"GPU": 2
}, {
"GPU": 2
}])[0] == []
arg = [{"GPU": 2}, {"GPU": 2, "CPU": 2}]
assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}]) == []
assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}])[0] == []
arg = [{"CPU": 2}, {"GPU": 2}]
assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}]) == [{"GPU": 2}]
assert get_bin_pack_residual(arg, [{
"GPU": 2
}, {
"GPU": 2
}])[0] == [{
"GPU": 2
}]
arg = [{"GPU": 3}]
assert get_bin_pack_residual(
arg, [{
"GPU": 1
}, {
"GPU": 1
}], strict_spread=False)[0] == []
assert get_bin_pack_residual(
arg, [{
"GPU": 1
}, {
"GPU": 1
}], strict_spread=True) == ([{
"GPU": 1
}], [{
"GPU": 2
}])
def test_get_nodes_packing_heuristic():
@@ -135,6 +166,18 @@ def test_get_nodes_packing_heuristic():
assert get_nodes_for(
TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \
{"m4.16xlarge": 1, "p2.8xlarge": 1}
assert get_nodes_for(
TYPES_A, {}, 9999, [{
"GPU": 1
}] * 8, strict_spread=False) == {
"p2.8xlarge": 1
}
assert get_nodes_for(
TYPES_A, {}, 9999, [{
"GPU": 1
}] * 8, strict_spread=True) == {
"p2.xlarge": 8
}
def test_get_nodes_respects_max_limit():
@@ -248,7 +291,7 @@ def test_get_nodes_to_launch_with_min_workers():
to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{
"GPU": 8
}], utilizations)
}], utilizations, [])
assert to_launch == {"p2.8xlarge": 1}
@@ -270,7 +313,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
# requires 2 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge
demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}]
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations)
utilizations, [])
assert to_launch == {"p2.xlarge": 1}
# 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand.
@@ -279,7 +322,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
new_types["p2.8xlarge"]["min_workers"] = 3
scheduler = ResourceDemandScheduler(provider, new_types, 10)
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations)
utilizations, [])
# Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)]
assert to_launch == {"p2.8xlarge": 1}
@@ -297,7 +340,7 @@ def test_get_nodes_to_launch_limits():
to_launch = scheduler.get_nodes_to_launch(nodes, {"p2.8xlarge": 1}, [{
"GPU": 8
}] * 2, utilizations)
}] * 2, utilizations, [])
assert to_launch == {}
@@ -317,11 +360,101 @@ def test_calculate_node_resources():
# requires 4 p2.8xls (only 3 are in cluster/pending)
demands = [{"GPU": 8}] * (len(utilizations) + 2)
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations)
utilizations, [])
assert to_launch == {"p2.8xlarge": 1}
class TestPlacementGroupScaling:
    """Checks get_nodes_to_launch() when placement groups are pending."""

    @staticmethod
    def _idle_cluster(node_count):
        """Create a scheduler over `node_count` p2.8xlarge nodes.

        Returns the scheduler and the provider's non-terminated nodes.
        """
        provider = MockProvider()
        scheduler = ResourceDemandScheduler(provider, TYPES_A, 10)
        provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"},
                             node_count)
        return scheduler, provider.non_terminated_nodes({})

    @staticmethod
    def _pending_group(strategy, bundle_count):
        """Create a PENDING placement group of `bundle_count` 2-GPU bundles."""
        return PlacementGroupTableData(
            state=PlacementGroupTableData.PENDING,
            strategy=strategy,
            bundles=[Bundle(unit_resources={"GPU": 2})] * bundle_count)

    def test_strategies(self):
        # At this point our cluster has 2 p2.8xlarge instances (16 GPUs) and
        # is fully idle.
        scheduler, nodes = self._idle_cluster(2)

        resource_demands = [{"GPU": 4}] * 2
        pending_placement_groups = [
            # Requires a new node (only uses 2 GPUs on it though).
            self._pending_group(PlacementStrategy.STRICT_SPREAD, 3),
            # Requires a new node (uses the whole node).
            self._pending_group(PlacementStrategy.STRICT_PACK, 4),
            # Fits across the machines that strict spread runs on.
            self._pending_group(PlacementStrategy.PACK, 2),
            # Fits across the machines that strict spread runs on.
            self._pending_group(PlacementStrategy.SPREAD, 2),
        ]
        to_launch = scheduler.get_nodes_to_launch(
            nodes, {}, resource_demands, {}, pending_placement_groups)
        assert to_launch == {"p2.8xlarge": 2}

    def test_many_strict_spreads(self):
        # At this point our cluster has 2 p2.8xlarge instances (16 GPUs) and
        # is fully idle.
        scheduler, nodes = self._idle_cluster(2)

        resource_demands = [{"GPU": 1}] * 6
        # A single group requires a new node (only uses 2 GPUs on it though).
        # Each placement group will take up 2 GPUs per node, but the distinct
        # placement groups should still reuse the same nodes.
        pending_placement_groups = [
            self._pending_group(PlacementStrategy.STRICT_SPREAD, 3)
        ] * 3
        to_launch = scheduler.get_nodes_to_launch(
            nodes, {}, resource_demands, {}, pending_placement_groups)
        assert to_launch == {"p2.8xlarge": 1}

    def test_packing(self):
        # At this point our cluster has 1 p2.8xlarge instance (8 GPUs) and is
        # fully idle.
        scheduler, nodes = self._idle_cluster(1)

        resource_demands = [{"GPU": 1}] * 2
        pending_placement_groups = [
            self._pending_group(PlacementStrategy.STRICT_PACK, 3)
        ]
        # The 2 resource demand gpus should still be packed onto the same node
        # as the 6 GPU placement group.
        to_launch = scheduler.get_nodes_to_launch(
            nodes, {}, resource_demands, {}, pending_placement_groups)
        assert to_launch == {}
def test_get_concurrent_resource_demand_to_launch():
node_types = copy.deepcopy(TYPES_A)
node_types["p2.8xlarge"]["min_workers"] = 1
@@ -441,7 +574,7 @@ def test_get_nodes_to_launch_max_launch_concurrency():
scheduler = ResourceDemandScheduler(provider, new_types, 30)
to_launch = scheduler.get_nodes_to_launch([], {}, [], [])
to_launch = scheduler.get_nodes_to_launch([], {}, [], [], [])
# Respects min_workers despite concurrency limitation.
assert to_launch == {"p2.8xlarge": 4}
@@ -456,7 +589,7 @@ def test_get_nodes_to_launch_max_launch_concurrency():
# requires 41 p2.8xls (currently 1 pending, 1 launching, 0 running}
demands = [{"GPU": 8}] * (len(utilizations) + 40)
to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands,
utilizations)
utilizations, [])
# Enforces max launch to 5 when < 5 running. 2 are pending/launching.
assert to_launch == {"p2.8xlarge": 3}
@@ -471,7 +604,7 @@ def test_get_nodes_to_launch_max_launch_concurrency():
# requires 17 p2.8xls (currently 1 pending, 1 launching, 8 running}
demands = [{"GPU": 8}] * (len(utilizations) + 15)
to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands,
utilizations)
utilizations, [])
# We are allowed to launch up to 8 more since 8 are running.
# We already have 2 pending/launching, so only 6 remain.
assert to_launch == {"p2.8xlarge": 6}
@@ -496,6 +629,25 @@ class LoadMetricsTest(unittest.TestCase):
"GPU": 1
}])
def testPlacementGroupLoad(self):
    """update() stores the pending placement groups it is handed."""
    pending_placement_groups = [
        PlacementGroupTableData(
            state=PlacementGroupTableData.RESCHEDULING,
            strategy=strategy,
            bundles=([Bundle(unit_resources={"GPU": 2})] * 2))
        for strategy in (PlacementStrategy.PACK, PlacementStrategy.SPREAD)
    ]
    lm = LoadMetrics()
    lm.update(
        "1.1.1.1", {},
        True, {},
        True, {},
        pending_placement_groups=pending_placement_groups)
    # The getter must hand back exactly what the last update() received.
    stored = lm.get_pending_placement_groups()
    assert stored == pending_placement_groups
class AutoscalingTest(unittest.TestCase):
def setUp(self):
@@ -577,6 +729,75 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2)
def testPlacementGroup(self):
    """A heartbeat carrying pending placement groups scales the cluster up.

    Note this is mostly an integration test. See
    TestPlacementGroupScaling for more comprehensive tests.
    """
    config = copy.deepcopy(MULTI_WORKER_CLUSTER)
    config["min_workers"] = 0
    config["max_workers"] = 999
    config_path = self.write_config(config)
    self.provider = MockProvider()
    runner = MockProcessRunner()
    lm = LoadMetrics()
    autoscaler = StandardAutoscaler(
        config_path,
        lm,
        max_failures=0,
        process_runner=runner,
        update_interval_s=0)
    self.provider.create_node({}, {
        TAG_RAY_NODE_KIND: "head",
        TAG_RAY_USER_NODE_TYPE: "m4.4xlarge"
    }, 1)
    head_ip = self.provider.non_terminated_node_ips({})[0]
    assert len(self.provider.non_terminated_nodes({})) == 1
    autoscaler.update()
    self.waitForNodes(1)

    pending_placement_groups = [
        PlacementGroupTableData(
            state=PlacementGroupTableData.RESCHEDULING,
            strategy=PlacementStrategy.STRICT_SPREAD,
            bundles=[Bundle(unit_resources={"GPU": 2})] * 3),
        PlacementGroupTableData(
            state=PlacementGroupTableData.RESCHEDULING,
            strategy=PlacementStrategy.PACK,
            bundles=([Bundle(unit_resources={"GPU": 2})] * 5)),
    ]
    # Since placement groups are implemented with custom resources, this is
    # an example of the accompanying resource demands. Note the resource
    # demand autoscaler will be unable to fulfill these demands, but we
    # should still handle the other infeasible/waiting bundles.
    placement_group_resource_demands = [{
        "GPU_group_0_6c2506ac733bc37496295b02c4fad446": 0.0101,
        "GPU_group_6c2506ac733bc37496295b02c4fad446": 0.0101
    }]
    lm.update(
        head_ip, {"CPU": 16},
        True, {"CPU": 16},
        False, {},
        infeasible_bundles=placement_group_resource_demands,
        waiting_bundles=[{
            "GPU": 8
        }],
        pending_placement_groups=pending_placement_groups)
    autoscaler.update()
    # The head node plus four newly launched workers.
    self.waitForNodes(5)

    for i in range(1, 5):
        assert self.provider.mock_nodes[i].node_type == "p2.8xlarge"
def testScaleUpMinWorkers(self):
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["min_workers"] = 2