diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index c8b852c4e..e56dc36fc 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -359,6 +359,7 @@ class StandardAutoscaler(object): print(self.debug_string(nodes)) self.load_metrics.prune_active_ips( [self.provider.internal_ip(node_id) for node_id in nodes]) + target_workers = self.target_num_workers() # Terminate any idle or out of date nodes last_used = self.load_metrics.last_used_time_by_ip @@ -367,7 +368,7 @@ class StandardAutoscaler(object): for node_id in nodes: node_ip = self.provider.internal_ip(node_id) if node_ip in last_used and last_used[node_ip] < horizon and \ - len(nodes) - num_terminated > self.config["min_workers"]: + len(nodes) - num_terminated > target_workers: num_terminated += 1 print("StandardAutoscaler: Terminating idle node: " "{}".format(node_id)) @@ -394,12 +395,12 @@ class StandardAutoscaler(object): print(self.debug_string(nodes)) # Launch new nodes if needed - target_num = self.target_num_workers() - num_nodes = len(nodes) + num_pending - if num_nodes < target_num: + num_workers = len(nodes) + num_pending + if num_workers < target_workers: max_allowed = min(self.max_launch_batch, self.max_concurrent_launches - num_pending) - self.launch_new_node(min(max_allowed, target_num - num_nodes)) + num_launches = min(max_allowed, target_workers - num_workers) + self.launch_new_node(num_launches) print(self.debug_string()) # Process any completed updates @@ -453,7 +454,8 @@ class StandardAutoscaler(object): def target_num_workers(self): target_frac = self.config["target_utilization_fraction"] cur_used = self.load_metrics.approx_workers_used() - ideal_num_workers = int(np.ceil(cur_used / float(target_frac))) + ideal_num_nodes = int(np.ceil(cur_used / float(target_frac))) + ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node return min(self.config["max_workers"], max(self.config["min_workers"], ideal_num_workers)) diff --git a/test/autoscaler_test.py b/test/autoscaler_test.py index 9120b332e..8e0c5a3f0 100644 --- a/test/autoscaler_test.py +++ b/test/autoscaler_test.py @@ -11,6 +11,7 @@ import yaml import copy import ray +import ray.services as services from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics, \ fillout_defaults, validate_config from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS @@ -572,7 +573,7 @@ class AutoscalingTest(unittest.TestCase): def testScaleUpBasedOnLoad(self): config = SMALL_CLUSTER.copy() - config["min_workers"] = 2 + config["min_workers"] = 1 config["max_workers"] = 10 config["target_utilization_fraction"] = 0.5 config_path = self.write_config(config) @@ -582,38 +583,73 @@ class AutoscalingTest(unittest.TestCase): config_path, lm, max_failures=0, update_interval_s=0) self.assertEqual(len(self.provider.nodes({})), 0) autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(1) autoscaler.update() self.assertEqual(autoscaler.num_launches_pending.value, 0) - self.assertEqual(len(self.provider.nodes({})), 2) + self.assertEqual(len(self.provider.nodes({})), 1) # Scales up as nodes are reported as used - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) + local_ip = services.get_node_ip_address() + lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head + lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) # worker 1 + autoscaler.update() + self.waitForNodes(3) lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}) autoscaler.update() - self.waitForNodes(4) - lm.update("172.0.0.2", {"CPU": 2}, {"CPU": 0}) - autoscaler.update() - self.waitForNodes(6) + self.waitForNodes(5) # Holds steady when load is removed lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}) lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}) autoscaler.update() self.assertEqual(autoscaler.num_launches_pending.value, 0) - self.assertEqual(len(self.provider.nodes({})), 6) + self.assertEqual(len(self.provider.nodes({})), 5) # Scales down as nodes become unused lm.last_used_time_by_ip["172.0.0.0"] = 0 lm.last_used_time_by_ip["172.0.0.1"] = 0 autoscaler.update() self.assertEqual(autoscaler.num_launches_pending.value, 0) - self.assertEqual(len(self.provider.nodes({})), 4) + self.assertEqual(len(self.provider.nodes({})), 3) lm.last_used_time_by_ip["172.0.0.2"] = 0 lm.last_used_time_by_ip["172.0.0.3"] = 0 autoscaler.update() self.assertEqual(autoscaler.num_launches_pending.value, 0) - self.assertEqual(len(self.provider.nodes({})), 2) + self.assertEqual(len(self.provider.nodes({})), 1) + + def testDontScaleBelowTarget(self): + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 2 + config["target_utilization_fraction"] = 0.5 + config_path = self.write_config(config) + self.provider = MockProvider() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, lm, max_failures=0, update_interval_s=0) + self.assertEqual(len(self.provider.nodes({})), 0) + autoscaler.update() + self.assertEqual(autoscaler.num_launches_pending.value, 0) + self.assertEqual(len(self.provider.nodes({})), 0) + + # Scales up as nodes are reported as used + local_ip = services.get_node_ip_address() + lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head + # 1.0 nodes used => target nodes = 2 => target workers = 1 + autoscaler.update() + self.waitForNodes(1) + + # Make new node idle, and never used. + # Should hold steady as target is still 2. + lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}) + lm.last_used_time_by_ip["172.0.0.0"] = 0 + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 1) + + # Reduce load on head => target nodes = 1 => target workers = 0 + lm.update(local_ip, {"CPU": 2}, {"CPU": 1}) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 0) def testRecoverUnhealthyWorkers(self): config_path = self.write_config(SMALL_CLUSTER)