[autoscaler] Support min_workers for multi node type (#11041)

* prepare for head node

* move command runner interface outside _private

* remove space

* Eric

* flake

* min_workers in multi node type

* fixing edge cases

* eric not idle

* fix target_workers to consider min_workers of node types

* idle timeout

* minor

* minor fix

* test

* lint

* eric v2

* eric 3

* min_workers constraint before bin packing

* Update resource_demand_scheduler.py

* Revert "Update resource_demand_scheduler.py"

This reverts commit 818a63a2c86d8437b3ef21c5035d701c1d1127b5.

* reducing diff

Co-authored-by: Ameer Haj Ali <ameerhajali@ameers-mbp.lan>
Co-authored-by: Alex Wu <alex@anyscale.io>
Co-authored-by: Alex Wu <itswu.alex@gmail.com>
This commit is contained in:
Ameer Haj Ali
2020-09-28 22:02:01 -07:00
committed by GitHub
parent 0a6164ab15
commit 0d36e4c025
6 changed files with 286 additions and 36 deletions
+43 -12
View File
@@ -1,5 +1,5 @@
from collections import defaultdict, namedtuple
from typing import Any, Optional
from typing import Any, Optional, Dict
import copy
import logging
import math
@@ -9,6 +9,7 @@ import subprocess
import threading
import time
import yaml
import collections
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
@@ -21,7 +22,7 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
from ray.autoscaler._private.updater import NodeUpdaterThread
from ray.autoscaler._private.node_launcher import NodeLauncher
from ray.autoscaler._private.resource_demand_scheduler import \
ResourceDemandScheduler
ResourceDemandScheduler, NodeType, NodeID
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
@@ -167,7 +168,13 @@ class StandardAutoscaler:
horizon = now - (60 * self.config["idle_timeout_minutes"])
nodes_to_terminate = []
node_type_counts = collections.defaultdict(int)
for node_id in nodes:
# Make sure to not kill idle node types if the number of workers
# of that type is lower/equal to the min_workers of that type.
if self._keep_min_worker_of_node_type(node_id, node_type_counts):
continue
node_ip = self.provider.internal_ip(node_id)
if (node_ip in last_used and last_used[node_ip] < horizon) and \
(len(nodes) - len(nodes_to_terminate)
@@ -203,16 +210,14 @@ class StandardAutoscaler:
if self.resource_demand_scheduler:
resource_demand_vector = self.resource_demand_vector + \
self.load_metrics.get_resource_demand_vector()
if resource_demand_vector:
to_launch = (
self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
resource_demand_vector,
self.load_metrics.get_resource_utilization()))
# TODO(ekl) also enforce max launch concurrency here?
for node_type, count in to_launch:
self.launch_new_node(count, node_type=node_type)
to_launch = (self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
resource_demand_vector,
self.load_metrics.get_resource_utilization()))
# TODO(ekl) also enforce max launch concurrency here?
for node_type, count in to_launch:
self.launch_new_node(count, node_type=node_type)
num_pending = self.pending_launches.value
nodes = self.workers()
@@ -274,6 +279,32 @@ class StandardAutoscaler:
for node_id in nodes:
self.recover_if_needed(node_id, now)
def _keep_min_worker_of_node_type(self, node_id: NodeID,
node_type_counts: Dict[NodeType, int]):
"""Returns if workers of node_type should be terminated.
Receives the counters of running nodes so far and determines if idle
node_id should be terminated or not. It also updates the counters
(node_type_counts), which is returned by reference.
Args:
node_type_counts(Dict[NodeType, int]): The non_terminated node
types counted so far.
Returns:
bool: if workers of node_types should be terminated or not.
"""
if self.resource_demand_scheduler:
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
node_type_counts[node_type] += 1
if node_type_counts[node_type] <= \
self.available_node_types[node_type].get(
"min_workers", 0):
return True
return False
def _node_resources(self, node_id):
node_type = self.provider.node_tags(node_id).get(
TAG_RAY_USER_NODE_TYPE)
@@ -48,8 +48,10 @@ class ResourceDemandScheduler:
This method:
(1) calculates the resources present in the cluster.
(2) calculates the unfulfilled resource bundles.
(3) calculates which nodes need to be launched to fulfill all
(2) calculates the remaining nodes to add to respect min_workers
constraint per node type.
(3) calculates the unfulfilled resource bundles.
(4) calculates which nodes need to be launched to fulfill all
the bundle requests, subject to max_worker constraints.
Args:
@@ -59,32 +61,47 @@ class ResourceDemandScheduler:
usage_by_ip: Mapping from ip to available resources.
"""
if resource_demands is None:
logger.info("No resource demands")
return []
node_resources, node_type_counts = \
self.calculate_node_resources(nodes, pending_nodes, usage_by_ip)
node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes, usage_by_ip)
node_resources, node_type_counts, min_workers_nodes_to_add = \
_add_min_workers_nodes(
node_resources, node_type_counts, self.node_types)
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
unfulfilled = get_bin_pack_residual(node_resources, resource_demands)
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
nodes = get_nodes_for(
self.node_types, node_type_counts,
self.max_workers - len(nodes) - sum(pending_nodes.values()),
unfulfilled)
logger.info("Node requests: {}".format(nodes))
return nodes
nodes_to_add_based_on_demand = []
if resource_demands is not None:
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
unfulfilled = get_bin_pack_residual(node_resources,
resource_demands)
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
max_to_add = self.max_workers - sum(node_type_counts.values())
nodes_to_add_based_on_demand = get_nodes_for(
self.node_types, node_type_counts, max_to_add, unfulfilled)
# Merge nodes to add based on demand and nodes to add based on
# min_workers constraint. We add them because nodes to add based on
# demand was calculated after the min_workers constraint was respected.
total_nodes_to_add = nodes_to_add_based_on_demand + \
min_workers_nodes_to_add
logger.info("Node requests: {}".format(total_nodes_to_add))
return total_nodes_to_add
def calculate_node_resources(
self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int],
usage_by_ip: Dict[str, ResourceDict]
) -> (List[ResourceDict], Dict[NodeType, int]):
"""Returns node resource list and node type counts."""
"""Returns node resource list and node type counts.
Counts the running nodes, pending nodes.
Args:
nodes: Existing nodes.
pending_nodes: Pending nodes.
Returns:
node_resources: a list of running + pending resources.
E.g., [{"CPU": 4}, {"GPU": 2}].
node_type_counts: running + pending workers per node type.
"""
node_resources = []
node_type_counts = collections.defaultdict(int)
@@ -140,6 +157,40 @@ class ResourceDemandScheduler:
return out
def _add_min_workers_nodes(
node_resources: List[ResourceDict],
node_type_counts: Dict[NodeType, int],
node_types: Dict[NodeType, NodeTypeConfigDict],
) -> (List[ResourceDict], Dict[NodeType, int], List[Tuple[NodeType, int]]):
"""Updates resource demands to respect the min_workers constraint.
Args:
node_resources: Resources of exisiting nodes already launched/pending.
node_type_counts: Counts of existing nodes already launched/pending.
node_types: Node types config.
Returns:
node_resources: The updated node resources after adding min_workers
constraint per node type.
node_type_counts: The updated node counts after adding min_workers
constraint per node type.
total_nodes_to_add: The nodes to add to respect min_workers constraint.
"""
total_nodes_to_add_dict = {}
for node_type, config in node_types.items():
existing = node_type_counts.get(node_type, 0)
target = config.get("min_workers", 0)
if existing < target:
total_nodes_to_add_dict[node_type] = target - existing
node_type_counts[node_type] = target
available = copy.deepcopy(node_types[node_type]["resources"])
node_resources.extend(
[available] * total_nodes_to_add_dict[node_type])
return node_resources, node_type_counts, list(
total_nodes_to_add_dict.items())
def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
existing_nodes: Dict[NodeType, int], max_to_add: int,
resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]:
@@ -17,6 +17,7 @@ available_node_types:
node_config:
InstanceType: m4.xlarge
resources: {"CPU": 4}
min_workers: 1
max_workers: 5
cpu_16_spot:
node_config:
+1
View File
@@ -291,6 +291,7 @@
"type": "object",
"description": "Provider-specific config for the node, e.g. instance type."
},
"min_workers": {"type": "integer"},
"max_workers": {"type": "integer"},
"resources": {
"type": "object",
@@ -4,6 +4,7 @@ import yaml
import tempfile
import shutil
import unittest
import copy
import ray
from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \
@@ -13,9 +14,10 @@ from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.commands import get_or_create_head_node
from ray.autoscaler._private.resource_demand_scheduler import \
_utilization_score, \
_utilization_score, _add_min_workers_nodes, \
get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \
NODE_KIND_WORKER
from ray.test_utils import same_elements
from time import sleep
@@ -164,6 +166,116 @@ def test_get_nodes_respects_max_limit():
}] * 10) == [("m4.large", 2)]
def test_add_min_workers_nodes():
types = {
"m2.large": {
"resources": {
"CPU": 2
},
"min_workers": 50,
"max_workers": 100,
},
"m4.large": {
"resources": {
"CPU": 2
},
"min_workers": 0,
"max_workers": 10,
},
"gpu": {
"resources": {
"GPU": 1
},
"min_workers": 99999,
"max_workers": 99999,
},
}
assert _add_min_workers_nodes([],
{},
types) == \
([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999},
[("m2.large", 50), ("gpu", 99999)])
assert _add_min_workers_nodes([{"CPU": 2}]*5,
{"m2.large": 5},
types) == \
([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999},
[("m2.large", 45), ("gpu", 99999)])
assert _add_min_workers_nodes([{"CPU": 2}]*60,
{"m2.large": 60},
types) == \
([{"CPU": 2}]*60+[{"GPU": 1}]*99999, {"m2.large": 60, "gpu": 99999},
[("gpu", 99999)])
assert _add_min_workers_nodes([{
"CPU": 2
}] * 50 + [{
"GPU": 1
}] * 99999, {
"m2.large": 50,
"gpu": 99999
}, types) == ([{
"CPU": 2
}] * 50 + [{
"GPU": 1
}] * 99999, {
"m2.large": 50,
"gpu": 99999
}, [])
def test_get_nodes_to_launch_with_min_workers():
provider = MockProvider()
new_types = copy.deepcopy(TYPES_A)
new_types["p2.8xlarge"]["min_workers"] = 2
scheduler = ResourceDemandScheduler(provider, new_types, 3)
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1)
nodes = provider.non_terminated_nodes({})
ips = provider.non_terminated_node_ips({})
utilizations = {ip: {"GPU": 8} for ip in ips}
to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{
"GPU": 8
}], utilizations)
assert to_launch == [("p2.8xlarge", 1)]
def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
provider = MockProvider()
new_types = copy.deepcopy(TYPES_A)
new_types["p2.8xlarge"]["min_workers"] = 2
scheduler = ResourceDemandScheduler(provider, new_types, 10)
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1)
nodes = provider.non_terminated_nodes({})
ips = provider.non_terminated_node_ips({})
# 1 free p2.8xls
utilizations = {ip: {"GPU": 8} for ip in ips}
# 1 more on the way
pending_nodes = {"p2.8xlarge": 1}
# requires 2 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge
demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}]
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations)
assert to_launch == [("p2.xlarge", 1)]
# 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand.
# 2 p2.8xlarge are running/pending. So we need 1 more p2.8xlarge only to
# meet the min_workers constraint and the demand.
new_types["p2.8xlarge"]["min_workers"] = 3
scheduler = ResourceDemandScheduler(provider, new_types, 10)
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations)
# Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)]
assert to_launch == [("p2.8xlarge", 1)]
def test_get_nodes_to_launch_limits():
provider = MockProvider()
scheduler = ResourceDemandScheduler(provider, TYPES_A, 3)
@@ -299,6 +411,58 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2)
def testScaleUpMinWorkers(self):
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["min_workers"] = 2
config["max_workers"] = 50
config["idle_timeout_minutes"] = 1
# Since config["min_workers"] > 1, the remaining worker is started
# with the default worker node type.
config["available_node_types"]["p2.8xlarge"]["min_workers"] = 1
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(2)
assert len(self.provider.mock_nodes) == 2
assert {
self.provider.mock_nodes[0].node_type,
self.provider.mock_nodes[1].node_type
} == {"p2.8xlarge", "m4.large"}
self.provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
}, 2)
self.provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "m4.16xlarge",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
}, 2)
assert len(self.provider.non_terminated_nodes({})) == 6
# Make sure that after idle_timeout_minutes we don't kill idle
# min workers.
for node_id in self.provider.non_terminated_nodes({}):
lm.last_used_time_by_ip[self.provider.internal_ip(node_id)] = -60
autoscaler.update()
self.waitForNodes(2)
cnt = 0
for id in self.provider.mock_nodes:
if self.provider.mock_nodes[id].state == "running" or \
self.provider.mock_nodes[id].state == "pending":
assert self.provider.mock_nodes[id].node_type in {
"p2.8xlarge", "m4.large"
}
cnt += 1
assert cnt == 2
def testScaleUpIgnoreUsed(self):
config = MULTI_WORKER_CLUSTER.copy()
# Commenting out this line causes the test case to fail?!?!