[Autoscaler] Proper resource demand plumbing (#10329)

This commit is contained in:
Alex Wu
2020-08-26 23:36:01 -07:00
committed by GitHub
parent 9056854c06
commit 6d2af33a01
5 changed files with 115 additions and 18 deletions
+13 -9
View File
@@ -117,7 +117,7 @@ class StandardAutoscaler:
# Aggregate resources the user is requesting of the cluster.
self.resource_requests = defaultdict(int)
# List of resource bundles the user is requesting of the cluster.
self.resource_demand_vector = None
self.resource_demand_vector = []
logger.info("StandardAutoscaler: {}".format(self.config))
@@ -197,14 +197,18 @@ class StandardAutoscaler:
self.log_info_string(nodes, target_workers)
# First let the resource demand scheduler launch nodes, if enabled.
if self.resource_demand_scheduler and self.resource_demand_vector:
to_launch = (self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
self.resource_demand_vector))
# TODO(ekl) also enforce max launch concurrency here?
for node_type, count in to_launch:
self.launch_new_node(count, node_type=node_type)
if self.resource_demand_scheduler:
resource_demand_vector = self.resource_demand_vector + \
self.load_metrics.get_resource_demand_vector()
if resource_demand_vector:
to_launch = (
self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
resource_demand_vector))
# TODO(ekl) also enforce max launch concurrency here?
for node_type, count in to_launch:
self.launch_new_node(count, node_type=node_type)
# Launch additional nodes of the default type, if still needed.
num_workers = len(nodes) + num_pending
+14 -1
View File
@@ -23,8 +23,16 @@ class LoadMetrics:
self.dynamic_resources_by_ip = {}
self.resource_load_by_ip = {}
self.local_ip = services.get_node_ip_address()
self.waiting_bundles = []
self.infeasible_bundles = []
def update(self, ip, static_resources, dynamic_resources, resource_load):
def update(self,
ip,
static_resources,
dynamic_resources,
resource_load,
waiting_bundles=[],
infeasible_bundles=[]):
self.resource_load_by_ip[ip] = resource_load
self.static_resources_by_ip[ip] = static_resources
@@ -43,6 +51,8 @@ class LoadMetrics:
static_resources != dynamic_resources:
self.last_used_time_by_ip[ip] = now
self.last_heartbeat_time_by_ip[ip] = now
self.waiting_bundles = waiting_bundles
self.infeasible_bundles = infeasible_bundles
def mark_active(self, ip):
assert ip is not None, "IP should be known at this time"
@@ -127,6 +137,9 @@ class LoadMetrics:
return nodes_used, resources_used, resources_total
def get_resource_demand_vector(self):
    """Return the waiting bundles followed by the infeasible bundles.

    The two stored collections are concatenated with ``+``, so the
    result is a new object and neither attribute is modified.
    """
    ready = self.waiting_bundles
    unschedulable = self.infeasible_bundles
    return ready + unschedulable
def info_string(self):
return " - " + "\n - ".join(
["{}: {}".format(k, v) for k, v in sorted(self._info().items())])
+16 -7
View File
@@ -88,7 +88,7 @@ class Monitor:
"""
self.primary_subscribe_client.psubscribe(pattern)
def handle_resource_demands(self, resource_load_by_shape):
def parse_resource_demands(self, resource_load_by_shape):
"""Handle the message.resource_load_by_shape protobuf for the demand
based autoscaling. Catch and log all exceptions so this doesn't
interfere with the utilization based autoscaler until we're confident
@@ -98,17 +98,21 @@ class Monitor:
resource_load_by_shape (pb2.gcs.ResourceLoad): The resource demands
in protobuf form or None.
"""
waiting_bundles, infeasible_bundles = [], []
try:
if not self.autoscaler:
return
bundles = []
for resource_demand_pb in list(
resource_load_by_shape.resource_demands):
request_shape = dict(resource_demand_pb.shape)
bundles.append(request_shape)
self.autoscaler.request_resources(bundles)
for _ in range(resource_demand_pb.num_ready_requests_queued):
waiting_bundles.append(request_shape)
for _ in range(
resource_demand_pb.num_infeasible_requests_queued):
infeasible_bundles.append(request_shape)
except Exception as e:
logger.exception(e)
return waiting_bundles, infeasible_bundles
def xray_heartbeat_batch_handler(self, unused_channel, data):
"""Handle an xray heartbeat batch message from Redis."""
@@ -125,16 +129,19 @@ class Monitor:
for resource in total_resources:
available_resources.setdefault(resource, 0.0)
waiting_bundles, infeasible_bundles = \
self.parse_resource_demands(message.resource_load_by_shape)
# Update the load metrics for this raylet.
client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
ip = self.raylet_id_to_ip_map.get(client_id)
if ip:
self.load_metrics.update(ip, total_resources,
available_resources, resource_load)
available_resources, resource_load,
waiting_bundles, infeasible_bundles)
else:
logger.warning(
f"Monitor: could not find ip for client {client_id}")
self.handle_resource_demands(message.resource_load_by_shape)
def xray_job_notification_handler(self, unused_channel, data):
"""Handle a notification that a job has been added or removed.
@@ -153,7 +160,9 @@ class Monitor:
binary_to_hex(job_id)))
def autoscaler_resource_request_handler(self, _, data):
"""Handle a notification of a resource request for the autoscaler.
"""Handle a notification of a resource request for the autoscaler. This channel
and method are only used by the manual
`ray.autoscaler.commands.request_resources` api.
Args:
channel: unused
+19
View File
@@ -336,6 +336,25 @@ def dicts_equal(dict1, dict2, abs_tol=1e-4):
return True
def same_elements(elems_a, elems_b):
    """Report whether two iterables hold the same elements.

    Both inputs are materialized into lists and membership is checked with
    ``in`` (equality comparison), so the elements do not have to be
    hashable -- collections of dicts can be compared, for example. Order
    and repetition counts are ignored. Each element is searched for in the
    other list, so this comparison is not necessarily efficient.
    """
    left, right = list(elems_a), list(elems_b)
    # Every element of the first collection must appear in the second ...
    if any(item not in right for item in left):
        return False
    # ... and every element of the second must appear in the first.
    return all(item in left for item in right)
# Identity task: decorated with @ray.remote and returns its argument unchanged.
@ray.remote
def _put(obj):
return obj
@@ -15,6 +15,7 @@ from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND
from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
get_bin_pack_residual, get_nodes_for
from ray.test_utils import same_elements
from time import sleep
@@ -162,6 +163,24 @@ def test_get_nodes_respects_max_limit():
}] * 10) == [("m4.large", 2)]
class LoadMetricsTest(unittest.TestCase):
    """Tests for the resource demand data kept by LoadMetrics."""

    def testResourceDemandVector(self):
        """Bundles passed to update() show up in the demand vector."""
        waiting = [{"GPU": 1}]
        infeasible = [{"CPU": 16}]
        metrics = LoadMetrics()
        metrics.update(
            "1.1.1.1", {"CPU": 2}, {"CPU": 1}, {},
            waiting_bundles=waiting,
            infeasible_bundles=infeasible)
        # Order is irrelevant here, hence the element-wise comparison.
        demand = metrics.get_resource_demand_vector()
        expected = [{"CPU": 16}, {"GPU": 1}]
        assert same_elements(demand, expected)
class AutoscalingTest(unittest.TestCase):
def setUp(self):
NODE_PROVIDERS["mock"] = \
@@ -352,6 +371,40 @@ class AutoscalingTest(unittest.TestCase):
runner.assert_has_call("172.0.0.1", "CPU: 32")
runner.assert_has_call("172.0.0.1", "GPU: 8")
def testScaleUpLoadMetrics(self):
    """Demand reported only through LoadMetrics makes the autoscaler launch.

    No nodes exist before the load metrics are updated; after one waiting
    GPU bundle and one infeasible 16-CPU bundle are reported, the test
    expects two nodes, one of each matching node type.
    """
    config = MULTI_WORKER_CLUSTER.copy()
    config.update(min_workers=0, max_workers=50)
    config_path = self.write_config(config)
    self.provider = MockProvider()
    runner = MockProcessRunner()
    lm = LoadMetrics()
    autoscaler_kwargs = dict(
        max_failures=0, process_runner=runner, update_interval_s=0)
    autoscaler = StandardAutoscaler(config_path, lm, **autoscaler_kwargs)

    # With no reported demand, updating must not launch anything.
    assert len(self.provider.non_terminated_nodes({})) == 0
    autoscaler.update()
    self.waitForNodes(0)
    autoscaler.update()

    # Report demand through the load metrics and let the autoscaler react.
    waiting = [{"GPU": 1}]
    infeasible = [{"CPU": 16}]
    lm.update(
        "1.2.3.4", {}, {}, {},
        waiting_bundles=waiting,
        infeasible_bundles=infeasible)
    autoscaler.update()
    self.waitForNodes(2)

    nodes = {self.provider.mock_nodes[i].node_type for i in (0, 1)}
    assert nodes == {"p2.xlarge", "m4.4xlarge"}
def testCommandPassing(self):
t = "custom"
config = MULTI_WORKER_CLUSTER.copy()
@@ -384,7 +437,6 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
# autoscaler.request_resources([{t: 1}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.xlarge"