From 89460b8d11bdd6194e7752cff55229714a6ccd60 Mon Sep 17 00:00:00 2001 From: Adam Gleave Date: Thu, 28 Jun 2018 15:33:51 -0700 Subject: [PATCH] autoscaler: count head node, don't kill below target (fixes #2317) (#2320) Specifically, subtracts 1 from the target number of workers, taking into account that the head node has some computational resources. Do not kill an idle node if it would drop us below the target number of nodes (in which case we just immediately relaunch). --- python/ray/autoscaler/autoscaler.py | 14 ++++--- test/autoscaler_test.py | 58 +++++++++++++++++++++++------ 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index c8b852c4e..e56dc36fc 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -359,6 +359,7 @@ class StandardAutoscaler(object): print(self.debug_string(nodes)) self.load_metrics.prune_active_ips( [self.provider.internal_ip(node_id) for node_id in nodes]) + target_workers = self.target_num_workers() # Terminate any idle or out of date nodes last_used = self.load_metrics.last_used_time_by_ip @@ -367,7 +368,7 @@ class StandardAutoscaler(object): for node_id in nodes: node_ip = self.provider.internal_ip(node_id) if node_ip in last_used and last_used[node_ip] < horizon and \ - len(nodes) - num_terminated > self.config["min_workers"]: + len(nodes) - num_terminated > target_workers: num_terminated += 1 print("StandardAutoscaler: Terminating idle node: " "{}".format(node_id)) @@ -394,12 +395,12 @@ class StandardAutoscaler(object): print(self.debug_string(nodes)) # Launch new nodes if needed - target_num = self.target_num_workers() - num_nodes = len(nodes) + num_pending - if num_nodes < target_num: + num_workers = len(nodes) + num_pending + if num_workers < target_workers: max_allowed = min(self.max_launch_batch, self.max_concurrent_launches - num_pending) - self.launch_new_node(min(max_allowed, target_num - num_nodes)) + num_launches = min(max_allowed, target_workers - num_workers) + self.launch_new_node(num_launches) print(self.debug_string()) # Process any completed updates @@ -453,7 +454,8 @@ class StandardAutoscaler(object): def target_num_workers(self): target_frac = self.config["target_utilization_fraction"] cur_used = self.load_metrics.approx_workers_used() - ideal_num_workers = int(np.ceil(cur_used / float(target_frac))) + ideal_num_nodes = int(np.ceil(cur_used / float(target_frac))) + ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node return min(self.config["max_workers"], max(self.config["min_workers"], ideal_num_workers)) diff --git a/test/autoscaler_test.py b/test/autoscaler_test.py index 9120b332e..8e0c5a3f0 100644 --- a/test/autoscaler_test.py +++ b/test/autoscaler_test.py @@ -11,6 +11,7 @@ import yaml import copy import ray +import ray.services as services from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics, \ fillout_defaults, validate_config from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS @@ -572,7 +573,7 @@ class AutoscalingTest(unittest.TestCase): def testScaleUpBasedOnLoad(self): config = SMALL_CLUSTER.copy() - config["min_workers"] = 2 + config["min_workers"] = 1 config["max_workers"] = 10 config["target_utilization_fraction"] = 0.5 config_path = self.write_config(config) @@ -582,38 +583,73 @@ class AutoscalingTest(unittest.TestCase): config_path, lm, max_failures=0, update_interval_s=0) self.assertEqual(len(self.provider.nodes({})), 0) autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(1) autoscaler.update() self.assertEqual(autoscaler.num_launches_pending.value, 0) - self.assertEqual(len(self.provider.nodes({})), 2) + self.assertEqual(len(self.provider.nodes({})), 1) # Scales up as nodes are reported as used - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) + local_ip = services.get_node_ip_address() + lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head + lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) # worker 1 + autoscaler.update() + self.waitForNodes(3) lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}) autoscaler.update() - self.waitForNodes(4) - lm.update("172.0.0.2", {"CPU": 2}, {"CPU": 0}) - autoscaler.update() - self.waitForNodes(6) + self.waitForNodes(5) # Holds steady when load is removed lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}) lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}) autoscaler.update() self.assertEqual(autoscaler.num_launches_pending.value, 0) - self.assertEqual(len(self.provider.nodes({})), 6) + self.assertEqual(len(self.provider.nodes({})), 5) # Scales down as nodes become unused lm.last_used_time_by_ip["172.0.0.0"] = 0 lm.last_used_time_by_ip["172.0.0.1"] = 0 autoscaler.update() self.assertEqual(autoscaler.num_launches_pending.value, 0) - self.assertEqual(len(self.provider.nodes({})), 4) + self.assertEqual(len(self.provider.nodes({})), 3) lm.last_used_time_by_ip["172.0.0.2"] = 0 lm.last_used_time_by_ip["172.0.0.3"] = 0 autoscaler.update() self.assertEqual(autoscaler.num_launches_pending.value, 0) - self.assertEqual(len(self.provider.nodes({})), 2) + self.assertEqual(len(self.provider.nodes({})), 1) + + def testDontScaleBelowTarget(self): + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 2 + config["target_utilization_fraction"] = 0.5 + config_path = self.write_config(config) + self.provider = MockProvider() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, lm, max_failures=0, update_interval_s=0) + self.assertEqual(len(self.provider.nodes({})), 0) + autoscaler.update() + self.assertEqual(autoscaler.num_launches_pending.value, 0) + self.assertEqual(len(self.provider.nodes({})), 0) + + # Scales up as nodes are reported as used + local_ip = services.get_node_ip_address() + lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head + # 1.0 nodes used => target nodes = 2 => target workers = 1 + autoscaler.update() + self.waitForNodes(1) + + # Make new node idle, and never used. + # Should hold steady as target is still 2. + lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}) + lm.last_used_time_by_ip["172.0.0.0"] = 0 + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 1) + + # Reduce load on head => target nodes = 1 => target workers = 0 + lm.update(local_ip, {"CPU": 2}, {"CPU": 1}) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 0) def testRecoverUnhealthyWorkers(self): config_path = self.write_config(SMALL_CLUSTER)