diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index 060a56690..4f45230a2 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -125,13 +125,17 @@ class LoadMetrics: return self.dynamic_resources_by_ip def _get_resource_usage(self): - num_nodes = len(self.static_resources_by_ip) + num_nodes = 0 nodes_used = 0.0 num_nonidle = 0 has_saturated_node = False resources_used = {} resources_total = {} for ip, max_resources in self.static_resources_by_ip.items(): + # Nodes without resources don't count as nodes (e.g. unmanaged + # nodes) + if any(max_resources.values()): + num_nodes += 1 avail_resources = self.dynamic_resources_by_ip[ip] resource_load = self.resource_load_by_ip[ip] max_frac = 0.0 diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index e6be2854f..eb657cc4c 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -808,6 +808,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) # This node has num_cpus=0 + lm.update(head_ip, {"CPU": 1}, True, {"CPU": 0}, True, {}) lm.update(unmanaged_ip, {"CPU": 0}, True, {"CPU": 0}, True, {}) autoscaler.update() self.waitForNodes(2) @@ -816,6 +817,52 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(3) + def testUnmanagedNodes2(self): + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 20 + config["initial_workers"] = 0 + config["idle_timeout_minutes"] = 0 + config["autoscaling_mode"] = "aggressive" + config["target_utilization_fraction"] = 1.0 + config_path = self.write_config(config) + + self.provider = MockProvider() + self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1) + head_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0] + + self.provider.create_node({}, {TAG_RAY_NODE_KIND: "unmanaged"}, 1) + unmanaged_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: "unmanaged"}, )[0] + unmanaged_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: "unmanaged"}, )[0] + + runner = MockProcessRunner() + + lm = LoadMetrics() + lm.local_ip = head_ip + + autoscaler = StandardAutoscaler( + config_path, + lm, + max_launch_batch=5, + max_concurrent_launches=5, + max_failures=0, + process_runner=runner, + update_interval_s=0) + + lm.update(head_ip, {"CPU": 1}, True, {"CPU": 0}, True, {"CPU": 1}) + lm.update(unmanaged_ip, {"CPU": 0}, True, {"CPU": 0}, True, {}) + + # Note that we shouldn't autoscale here because the resource demand + # vector is not set and target utilization fraction = 1. + autoscaler.update() + # If the autoscaler was behaving incorrectly, it needs time to start + # the new node, otherwise it could scale up after this check. + time.sleep(0.2) + self.waitForNodes(2) + def testDelayedLaunch(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider()