[Autoscaler] Demand autoscaler take into account utilized resources (#10464)

This commit is contained in:
Alex Wu
2020-09-06 20:54:44 -07:00
committed by GitHub
parent c5e9bafe15
commit 8906c1a59f
5 changed files with 97 additions and 21 deletions
+7 -2
View File
@@ -199,11 +199,15 @@ class StandardAutoscaler:
self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
resource_demand_vector))
resource_demand_vector,
self.load_metrics.get_resource_utilization()))
# TODO(ekl) also enforce max launch concurrency here?
for node_type, count in to_launch:
self.launch_new_node(count, node_type=node_type)
num_pending = self.pending_launches.value
nodes = self.workers()
# Launch additional nodes of the default type, if still needed.
num_workers = len(nodes) + num_pending
if num_workers < target_workers:
@@ -509,7 +513,8 @@ class StandardAutoscaler:
tmp += "\n"
if self.resource_demand_scheduler:
tmp += self.resource_demand_scheduler.debug_string(
nodes, self.pending_launches.breakdown())
nodes, self.pending_launches.breakdown(),
self.load_metrics.get_resource_utilization())
if _internal_kv_initialized():
_internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True)
logger.info(tmp)
+9 -4
View File
@@ -16,13 +16,14 @@ class LoadMetrics:
can be removed.
"""
def __init__(self):
def __init__(self, local_ip=None):
self.last_used_time_by_ip = {}
self.last_heartbeat_time_by_ip = {}
self.static_resources_by_ip = {}
self.dynamic_resources_by_ip = {}
self.resource_load_by_ip = {}
self.local_ip = services.get_node_ip_address()
self.local_ip = services.get_node_ip_address(
) if local_ip is None else local_ip
self.waiting_bundles = []
self.infeasible_bundles = []
@@ -98,7 +99,10 @@ class LoadMetrics:
"""
return self.static_resources_by_ip.values()
def get_resource_usage(self):
def get_resource_utilization(self):
return self.dynamic_resources_by_ip
def _get_resource_usage(self):
num_nodes = len(self.static_resources_by_ip)
nodes_used = 0.0
num_nonidle = 0
@@ -145,7 +149,8 @@ class LoadMetrics:
["{}: {}".format(k, v) for k, v in sorted(self._info().items())])
def _info(self):
nodes_used, resources_used, resources_total = self.get_resource_usage()
nodes_used, resources_used, resources_total = self._get_resource_usage(
)
now = time.time()
idle_times = [now - t for t in self.last_used_time_by_ip.values()]
@@ -39,11 +39,10 @@ class ResourceDemandScheduler:
self.node_types = node_types
self.max_workers = max_workers
# TODO(ekl) take into account existing utilization of node resources. We
# should subtract these from node resources prior to running bin packing.
def get_nodes_to_launch(self, nodes: List[NodeID],
pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict]
resource_demands: List[ResourceDict],
usage_by_ip: Dict[str, ResourceDict]
) -> List[Tuple[NodeType, int]]:
"""Given resource demands, return node types to add to the cluster.
@@ -64,7 +63,8 @@ class ResourceDemandScheduler:
return []
node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes)
nodes, pending_nodes, usage_by_ip)
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
@@ -72,34 +72,43 @@ class ResourceDemandScheduler:
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
nodes = get_nodes_for(self.node_types, node_type_counts,
self.max_workers - len(nodes), unfulfilled)
nodes = get_nodes_for(
self.node_types, node_type_counts,
self.max_workers - len(nodes) - sum(pending_nodes.values()),
unfulfilled)
logger.info("Node requests: {}".format(nodes))
return nodes
def calculate_node_resources(
self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int]
self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int],
usage_by_ip: Dict[str, ResourceDict]
) -> (List[ResourceDict], Dict[NodeType, int]):
"""Returns node resource list and node type counts."""
node_resources = []
node_type_counts = collections.defaultdict(int)
def add_node(node_type):
def add_node(node_type, existing_resource_usages=None):
if node_type not in self.node_types:
raise RuntimeError("Missing entry for node_type {} in "
"available_node_types config: {}".format(
node_type, self.node_types))
# Careful not to include the same dict object multiple times.
node_resources.append(
copy.deepcopy(self.node_types[node_type]["resources"]))
available = copy.deepcopy(self.node_types[node_type]["resources"])
if existing_resource_usages:
for resource, used in existing_resource_usages.items():
available[resource] -= used
node_resources.append(available)
node_type_counts[node_type] += 1
for node_id in nodes:
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
add_node(node_type)
ip = self.provider.internal_ip(node_id)
resources = usage_by_ip.get(ip, {})
add_node(node_type, resources)
for node_type, count in pending_nodes.items():
for _ in range(count):
@@ -108,9 +117,11 @@ class ResourceDemandScheduler:
return node_resources, node_type_counts
def debug_string(self, nodes: List[NodeID],
pending_nodes: Dict[NodeID, int]) -> str:
pending_nodes: Dict[NodeID, int],
usage_by_ip: Dict[str, ResourceDict]) -> str:
print(f"{usage_by_ip}")
node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes)
nodes, pending_nodes, usage_by_ip)
out = "Worker node types:"
for node_type, count in node_type_counts.items():
+1 -1
View File
@@ -76,7 +76,7 @@ def setup_monitor(address):
def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
while True:
monitor.process_messages()
resource_usage = monitor.load_metrics.get_resource_usage()
resource_usage = monitor.load_metrics._get_resource_usage()
if "memory" in resource_usage[1]:
del resource_usage[1]["memory"]
@@ -14,7 +14,7 @@ from ray.autoscaler.node_provider import NODE_PROVIDERS
from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND
from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
get_bin_pack_residual, get_nodes_for
get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler
from ray.test_utils import same_elements
from time import sleep
@@ -163,6 +163,23 @@ def test_get_nodes_respects_max_limit():
}] * 10) == [("m4.large", 2)]
def test_get_nodes_to_launch_limits():
provider = MockProvider()
scheduler = ResourceDemandScheduler(provider, TYPES_A, 3)
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2)
nodes = provider.non_terminated_nodes({})
ips = provider.non_terminated_node_ips({})
utilizations = {ip: {"GPU": 8} for ip in ips}
to_launch = scheduler.get_nodes_to_launch(nodes, {"p2.8xlarge": 1}, [{
"GPU": 8
}] * 2, utilizations)
assert to_launch == []
class LoadMetricsTest(unittest.TestCase):
def testResourceDemandVector(self):
lm = LoadMetrics()
@@ -260,6 +277,44 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2)
def testScaleUpIgnoreUsed(self):
config = MULTI_WORKER_CLUSTER.copy()
# Commenting out this line causes the test case to fail?!?!
config["min_workers"] = 0
config["target_utilization_fraction"] = 1.0
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: "p2.xlarge"
}, 1)
head_ip = self.provider.non_terminated_node_ips({})[0]
self.provider.finish_starting_nodes()
runner = MockProcessRunner()
lm = LoadMetrics(local_ip=head_ip)
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(1)
lm.update(head_ip, {"CPU": 4, "GPU": 1}, {}, {})
self.waitForNodes(1)
lm.update(
head_ip, {
"CPU": 4,
"GPU": 1
}, {"GPU": 1}, {},
waiting_bundles=[{
"GPU": 1
}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.xlarge"
def testRequestBundlesAccountsForHeadNode(self):
config = MULTI_WORKER_CLUSTER.copy()
config["head_node_type"] = "p2.8xlarge"