[Autoscaler] Do not count unmanaged nodes in load metrics (#11458)

* fixedd

* lint

* fixed other test case

* .

Co-authored-by: Alex Wu <alex@anyscale.com>
This commit is contained in:
Alex Wu
2020-10-21 22:14:21 -07:00
committed by Alex Wu
parent a2e12ceb2a
commit c26dbc1612
2 changed files with 52 additions and 1 deletions
@@ -125,13 +125,17 @@ class LoadMetrics:
return self.dynamic_resources_by_ip
def _get_resource_usage(self):
num_nodes = len(self.static_resources_by_ip)
num_nodes = 0
nodes_used = 0.0
num_nonidle = 0
has_saturated_node = False
resources_used = {}
resources_total = {}
for ip, max_resources in self.static_resources_by_ip.items():
# Nodes without resources don't count as nodes (e.g. unmanaged
# nodes)
if any(max_resources.values()):
num_nodes += 1
avail_resources = self.dynamic_resources_by_ip[ip]
resource_load = self.resource_load_by_ip[ip]
max_frac = 0.0
+47
View File
@@ -808,6 +808,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(2)
# This node has num_cpus=0
lm.update(head_ip, {"CPU": 1}, True, {"CPU": 0}, True, {})
lm.update(unmanaged_ip, {"CPU": 0}, True, {"CPU": 0}, True, {})
autoscaler.update()
self.waitForNodes(2)
@@ -816,6 +817,52 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(3)
def testUnmanagedNodes2(self):
config = SMALL_CLUSTER.copy()
config["min_workers"] = 0
config["max_workers"] = 20
config["initial_workers"] = 0
config["idle_timeout_minutes"] = 0
config["autoscaling_mode"] = "aggressive"
config["target_utilization_fraction"] = 1.0
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1)
head_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0]
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "unmanaged"}, 1)
unmanaged_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "unmanaged"}, )[0]
unmanaged_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "unmanaged"}, )[0]
runner = MockProcessRunner()
lm = LoadMetrics()
lm.local_ip = head_ip
autoscaler = StandardAutoscaler(
config_path,
lm,
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
process_runner=runner,
update_interval_s=0)
lm.update(head_ip, {"CPU": 1}, True, {"CPU": 0}, True, {"CPU": 1})
lm.update(unmanaged_ip, {"CPU": 0}, True, {"CPU": 0}, True, {})
# Note that we shouldn't autoscale here because the resource demand
# vector is not set and target utilization fraction = 1.
autoscaler.update()
# If the autoscaler was behaving incorrectly, it needs time to start
# the new node, otherwise it could scale up after this check.
time.sleep(0.2)
self.waitForNodes(2)
def testDelayedLaunch(self):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()