diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py
index a806e3b62..1b2f45ebf 100644
--- a/python/ray/autoscaler/autoscaler.py
+++ b/python/ray/autoscaler/autoscaler.py
@@ -50,6 +50,9 @@ CLUSTER_CONFIG_SCHEMA = {
     # node. This takes precedence over min_workers.
     "max_workers": (int, REQUIRED),
 
+    # The number of workers to launch initially, in addition to the head node.
+    "initial_workers": (int, OPTIONAL),
+
     # The autoscaler will scale up the cluster to this target fraction of
     # resources usage. For example, if a cluster of 8 nodes is 100% busy
     # and target_utilization was 0.8, it would resize the cluster to 10.
@@ -324,6 +327,9 @@ class StandardAutoscaler(object):
         self.num_failures = 0
         self.last_update_time = 0.0
         self.update_interval_s = update_interval_s
+        # True while the cluster is still being brought up; cleared as soon
+        # as a pass through the launch logic has no new nodes to start.
+        self.bringup = True
 
         # Node launchers
         self.launch_queue = queue.Queue()
@@ -418,6 +424,9 @@ class StandardAutoscaler(object):
             num_launches = min(max_allowed, target_workers - num_workers)
             self.launch_new_node(num_launches)
             logger.info(self.info_string())
+        else:
+            # Nothing to launch this pass, so the bringup phase is over.
+            self.bringup = False
 
         # Process any completed updates
         completed = []
@@ -467,10 +476,21 @@ class StandardAutoscaler(object):
             logger.exception("StandardAutoscaler: Error parsing config.")
 
     def target_num_workers(self):
-        target_frac = self.config["target_utilization_fraction"]
-        cur_used = self.load_metrics.approx_workers_used()
-        ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
-        ideal_num_workers = ideal_num_nodes - 1  # subtract 1 for head node
+        """Return the number of workers the cluster should currently have.
+
+        During bringup the target is initial_workers; afterwards it is
+        derived from load and target_utilization_fraction. Either way the
+        result is raised to min_workers and then capped at max_workers.
+        """
+        if self.bringup:
+            # initial_workers is OPTIONAL in the schema; treat a missing
+            # key as 0 instead of raising KeyError.
+            ideal_num_workers = self.config.get("initial_workers", 0)
+        else:
+            target_frac = self.config["target_utilization_fraction"]
+            cur_used = self.load_metrics.approx_workers_used()
+            ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
+            ideal_num_workers = ideal_num_nodes - 1  # subtract 1 for head node
 
         return min(self.config["max_workers"],
                    max(self.config["min_workers"], ideal_num_workers))
@@ -577,6 +597,8 @@ class StandardAutoscaler(object):
         if self.num_failed_updates:
             suffix += " ({} failed to update)".format(
                 len(self.num_failed_updates))
+        if self.bringup:
+            suffix += " (bringup=True)"
         return "StandardAutoscaler [{}]: {}/{} target nodes{}\n{}".format(
             datetime.now(), len(nodes), self.target_num_workers(), suffix,
             self.load_metrics.info_string())
diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml
index afe767a93..a415135a1 100644
--- a/python/ray/autoscaler/aws/example-full.yaml
+++ b/python/ray/autoscaler/aws/example-full.yaml
@@ -9,6 +9,11 @@ min_workers: 0
 # node. This takes precedence over min_workers.
 max_workers: 2
 
+# The initial number of worker nodes to launch in addition to the head
+# node. When the cluster is first brought up (or when it is refreshed with a
+# subsequent `ray up`) this number of nodes will be started.
+initial_workers: 0
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/gcp/example-full.yaml b/python/ray/autoscaler/gcp/example-full.yaml
index b841781e9..c7f2d8f72 100644
--- a/python/ray/autoscaler/gcp/example-full.yaml
+++ b/python/ray/autoscaler/gcp/example-full.yaml
@@ -9,6 +9,11 @@ min_workers: 0
 # node. This takes precedence over min_workers.
 max_workers: 2
 
+# The initial number of worker nodes to launch in addition to the head
+# node. When the cluster is first brought up (or when it is refreshed with a
+# subsequent `ray up`) this number of nodes will be started.
+initial_workers: 0
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/local/example-full.yaml b/python/ray/autoscaler/local/example-full.yaml
index 88d20dadd..51f64cbfb 100644
--- a/python/ray/autoscaler/local/example-full.yaml
+++ b/python/ray/autoscaler/local/example-full.yaml
@@ -1,6 +1,7 @@
 cluster_name: default
 min_workers: 0
 max_workers: 0
+initial_workers: 0
 docker:
     image: ""
     container_name: ""
diff --git a/test/autoscaler_test.py b/test/autoscaler_test.py
index 4312fdcb1..10e0cda1a 100644
--- a/test/autoscaler_test.py
+++ b/test/autoscaler_test.py
@@ -98,6 +98,7 @@ SMALL_CLUSTER = {
     "cluster_name": "default",
     "min_workers": 2,
     "max_workers": 2,
+    "initial_workers": 0,
     "target_utilization_fraction": 0.8,
     "idle_timeout_minutes": 5,
     "provider": {
@@ -314,6 +315,31 @@ class AutoscalingTest(unittest.TestCase):
         autoscaler.update()
         self.waitForNodes(10)
 
+    def testInitialWorkers(self):
+        """Bringup launches initial_workers nodes and then switches off."""
+        config = SMALL_CLUSTER.copy()
+        config["min_workers"] = 0
+        config["max_workers"] = 20
+        config["initial_workers"] = 10
+        config_path = self.write_config(config)
+        self.provider = MockProvider()
+        autoscaler = StandardAutoscaler(
+            config_path,
+            LoadMetrics(),
+            max_launch_batch=5,
+            max_concurrent_launches=5,
+            max_failures=0,
+            update_interval_s=0)
+        self.waitForNodes(0)
+        autoscaler.update()
+        self.waitForNodes(5)  # expected due to batch sizes and concurrency
+        autoscaler.update()
+        self.waitForNodes(10)
+        autoscaler.update()
+        # With all 10 workers up there is nothing left to launch, so this
+        # pass must have ended the bringup phase.
+        self.assertFalse(autoscaler.bringup)
+
     def testDelayedLaunch(self):
         config_path = self.write_config(SMALL_CLUSTER)
         self.provider = MockProvider()