[autoscaler] Add an initial_workers option (#3530)

## What do these changes do?

    This option goes along with `min_workers` and `max_workers`. When the
    cluster is first brought up (or when it is refreshed with a subsequent
    `ray up`), this number of worker nodes will be started.
    
    It's a workaround for scaling issues (see the related issues below), where
    it can take a long time (or forever, in the case where the head node has
    `--num-cpus 0`) to scale up a cluster in response to increasing demand.


## Related issue number

Workaround for https://github.com/ray-project/ray/issues/3339 and https://github.com/ray-project/ray/issues/2106
This commit is contained in:
mattearllongshot
2019-01-06 01:58:42 +00:00
committed by Richard Liaw
parent 067976ad3d
commit 681e8cd3fd
5 changed files with 50 additions and 4 deletions
+17 -4
View File
@@ -50,6 +50,9 @@ CLUSTER_CONFIG_SCHEMA = {
# node. This takes precedence over min_workers.
"max_workers": (int, REQUIRED),
# The number of workers to launch initially, in addition to the head node.
"initial_workers": (int, OPTIONAL),
# The autoscaler will scale up the cluster to this target fraction of
# resources usage. For example, if a cluster of 8 nodes is 100% busy
# and target_utilization was 0.8, it would resize the cluster to 10.
@@ -324,6 +327,7 @@ class StandardAutoscaler(object):
self.num_failures = 0
self.last_update_time = 0.0
self.update_interval_s = update_interval_s
self.bringup = True
# Node launchers
self.launch_queue = queue.Queue()
@@ -418,6 +422,8 @@ class StandardAutoscaler(object):
num_launches = min(max_allowed, target_workers - num_workers)
self.launch_new_node(num_launches)
logger.info(self.info_string())
else:
self.bringup = False
# Process any completed updates
completed = []
@@ -467,10 +473,15 @@ class StandardAutoscaler(object):
logger.exception("StandardAutoscaler: Error parsing config.")
def target_num_workers(self):
target_frac = self.config["target_utilization_fraction"]
cur_used = self.load_metrics.approx_workers_used()
ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node
initial_workers = self.config["initial_workers"]
if self.bringup:
ideal_num_workers = initial_workers
else:
target_frac = self.config["target_utilization_fraction"]
cur_used = self.load_metrics.approx_workers_used()
ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node
return min(self.config["max_workers"],
max(self.config["min_workers"], ideal_num_workers))
@@ -577,6 +588,8 @@ class StandardAutoscaler(object):
if self.num_failed_updates:
suffix += " ({} failed to update)".format(
len(self.num_failed_updates))
if self.bringup:
suffix += " (bringup=True)"
return "StandardAutoscaler [{}]: {}/{} target nodes{}\n{}".format(
datetime.now(), len(nodes), self.target_num_workers(), suffix,
self.load_metrics.info_string())
@@ -9,6 +9,11 @@ min_workers: 0
# node. This takes precedence over min_workers.
max_workers: 2
# The initial number of worker nodes to launch in addition to the head
# node. When the cluster is first brought up (or when it is refreshed with a
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
@@ -9,6 +9,11 @@ min_workers: 0
# node. This takes precedence over min_workers.
max_workers: 2
# The initial number of worker nodes to launch in addition to the head
# node. When the cluster is first brought up (or when it is refreshed with a
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
@@ -1,6 +1,7 @@
cluster_name: default
min_workers: 0
max_workers: 0
initial_workers: 0
docker:
image: ""
container_name: ""