diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 6e1e8c00c..af6719019 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -215,7 +215,10 @@ class LoadMetrics(object): return self._info()["NumNodesConnected"] def get_resource_usage(self): + num_nodes = len(self.static_resources_by_ip) nodes_used = 0.0 + num_nonidle = 0 + has_saturated_node = False resources_used = {} resources_total = {} for ip, max_resources in self.static_resources_by_ip.items(): @@ -224,6 +227,7 @@ class LoadMetrics(object): max_frac = 0.0 for resource_id, amount in resource_load.items(): if amount > 0: + has_saturated_node = True max_frac = 1.0 # the resource is saturated for resource_id, amount in max_resources.items(): used = amount - avail_resources[resource_id] @@ -238,6 +242,14 @@ class LoadMetrics(object): if frac > max_frac: max_frac = frac nodes_used += max_frac + if max_frac > 0: + num_nonidle += 1 + + # If any nodes have a queue buildup, assume all non-idle nodes are 100% + # busy, plus the head node. This guards against the case of not scaling + # up due to poor task packing. + if has_saturated_node: + nodes_used = min(num_nonidle + 1.0, num_nodes) return nodes_used, resources_used, resources_total diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 52dc87f18..03fbffbcb 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -152,13 +152,34 @@ class LoadMetricsTest(unittest.TestCase): def testLoadMessages(self): lm = LoadMetrics() lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) - assert lm.approx_workers_used() == 0.5 + self.assertEqual(lm.approx_workers_used(), 0.5) lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1}) - assert lm.approx_workers_used() == 1.0 + self.assertEqual(lm.approx_workers_used(), 1.0) + + # Both nodes count as busy since there is a queue on one. + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 2}, {}) + self.assertEqual(lm.approx_workers_used(), 2.0) + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {}) + self.assertEqual(lm.approx_workers_used(), 2.0) lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) - assert lm.approx_workers_used() == 1.5 + self.assertEqual(lm.approx_workers_used(), 2.0) + + # No queue anymore, so we're back to exact accounting. + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) + self.assertEqual(lm.approx_workers_used(), 1.5) lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1}) - assert lm.approx_workers_used() == 2.0 + self.assertEqual(lm.approx_workers_used(), 2.0) + + lm.update("3.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("4.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("5.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("6.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("7.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("8.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) + self.assertEqual(lm.approx_workers_used(), 8.0) + + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) # no queue anymore + self.assertEqual(lm.approx_workers_used(), 4.5) def testPruneByNodeIp(self): lm = LoadMetrics() @@ -287,8 +308,13 @@ class AutoscalingTest(unittest.TestCase): def testScaleUp(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( - config_path, LoadMetrics(), max_failures=0, update_interval_s=0) + config_path, + LoadMetrics(), + max_failures=0, + process_runner=runner, + update_interval_s=0) assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() self.waitForNodes(2) @@ -303,12 +329,14 @@ class AutoscalingTest(unittest.TestCase): config["worker_nodes"] = {"Resources": {"CPU": cores_per_node}} config_path = self.write_config(config) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_launch_batch=5, max_concurrent_launches=5, max_failures=0, + process_runner=runner, update_interval_s=0) assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() @@ -329,8 +357,13 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "worker"}, 10) + runner = MockProcessRunner() autoscaler = StandardAutoscaler( - config_path, LoadMetrics(), max_failures=0, update_interval_s=0) + config_path, + LoadMetrics(), + max_failures=0, + process_runner=runner, + update_interval_s=0) self.waitForNodes(10) # Gradually scales down to meet target size, never going too low @@ -345,12 +378,14 @@ class AutoscalingTest(unittest.TestCase): def testDynamicScaling(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_launch_batch=5, max_concurrent_launches=5, max_failures=0, + process_runner=runner, update_interval_s=0) self.waitForNodes(0) autoscaler.update() @@ -379,12 +414,14 @@ class AutoscalingTest(unittest.TestCase): config["initial_workers"] = 10 config_path = self.write_config(config) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_launch_batch=5, max_concurrent_launches=5, max_failures=0, + process_runner=runner, update_interval_s=0) self.waitForNodes(0) autoscaler.update() @@ -406,6 +443,7 @@ class AutoscalingTest(unittest.TestCase): self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "head"}, 1) head_ip = self.provider.non_terminated_node_ips( tag_filters={TAG_RAY_NODE_TYPE: "head"}, )[0] + runner = MockProcessRunner() lm = LoadMetrics() lm.local_ip = head_ip @@ -416,6 +454,7 @@ class AutoscalingTest(unittest.TestCase): max_launch_batch=5, max_concurrent_launches=5, max_failures=0, + process_runner=runner, update_interval_s=0) self.waitForNodes(1) @@ -449,12 +488,14 @@ class AutoscalingTest(unittest.TestCase): def testDelayedLaunch(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_launch_batch=5, max_concurrent_launches=5, max_failures=0, + process_runner=runner, update_interval_s=0) assert len(self.provider.non_terminated_nodes({})) == 0 @@ -482,12 +523,14 @@ class AutoscalingTest(unittest.TestCase): config["max_workers"] = 10 config_path = self.write_config(config) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_launch_batch=5, max_concurrent_launches=8, max_failures=0, + process_runner=runner, update_interval_s=0) assert len(self.provider.non_terminated_nodes({})) == 0 @@ -535,12 +578,14 @@ class AutoscalingTest(unittest.TestCase): def testUpdateThrottling(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_launch_batch=5, max_concurrent_launches=5, max_failures=0, + process_runner=runner, update_interval_s=10) autoscaler.update() self.waitForNodes(2) @@ -577,11 +622,13 @@ class AutoscalingTest(unittest.TestCase): def testIgnoresCorruptedConfig(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_launch_batch=10, max_concurrent_launches=10, + process_runner=runner, max_failures=0, update_interval_s=0) autoscaler.update() @@ -607,8 +654,13 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() self.provider.throw = True + runner = MockProcessRunner() autoscaler = StandardAutoscaler( - config_path, LoadMetrics(), max_failures=2, update_interval_s=0) + config_path, + LoadMetrics(), + max_failures=2, + process_runner=runner, + update_interval_s=0) autoscaler.update() autoscaler.update() with pytest.raises(Exception): @@ -617,8 +669,13 @@ class AutoscalingTest(unittest.TestCase): def testLaunchNewNodeOnOutOfBandTerminate(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( - config_path, LoadMetrics(), max_failures=0, update_interval_s=0) + config_path, + LoadMetrics(), + max_failures=0, + process_runner=runner, + update_interval_s=0) autoscaler.update() autoscaler.update() self.waitForNodes(2) @@ -702,8 +759,13 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() lm = LoadMetrics() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( - config_path, lm, max_failures=0, update_interval_s=0) + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() self.waitForNodes(1) @@ -748,8 +810,13 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() lm = LoadMetrics() + runner = MockProcessRunner() autoscaler = StandardAutoscaler( - config_path, lm, max_failures=0, update_interval_s=0) + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() assert autoscaler.num_launches_pending.value == 0