[autoscaler] Refactor multi node type autoscaler config (#10190)

This commit is contained in:
Eric Liang
2020-08-19 20:46:00 -07:00
committed by GitHub
parent 2fd59de05d
commit 538cb802d5
11 changed files with 219 additions and 120 deletions
+7 -8
View File
@@ -1,4 +1,5 @@
from collections import defaultdict
from typing import Optional
import copy
import logging
import math
@@ -63,8 +64,8 @@ class StandardAutoscaler:
self.config["cluster_name"])
# Check whether we can enable the resource demand scheduler.
if "available_instance_types" in self.config:
self.instance_types = self.config["available_instance_types"]
if "available_node_types" in self.config:
self.instance_types = self.config["available_node_types"]
self.resource_demand_scheduler = ResourceDemandScheduler(
self.provider, self.instance_types, self.config["max_workers"])
else:
@@ -212,7 +213,8 @@ class StandardAutoscaler:
self.max_concurrent_launches - num_pending)
num_launches = min(max_allowed, target_workers - num_workers)
self.launch_new_node(num_launches)
self.launch_new_node(num_launches,
self.config.get("worker_default_node_type"))
nodes = self.workers()
self.log_info_string(nodes, target_workers)
elif self.load_metrics.num_workers_connected() >= target_workers:
@@ -439,13 +441,10 @@ class StandardAutoscaler:
return False
return True
def launch_new_node(self, count, instance_type=None):
def launch_new_node(self, count: int,
instance_type: Optional[str]) -> None:
logger.info(
"StandardAutoscaler: Queue {} new nodes for launch".format(count))
# Try to fill in the default instance type so we can tag it properly.
if not instance_type:
instance_type = self.provider.get_instance_type(
self.config["worker_nodes"])
self.pending_launches.inc(instance_type, count)
config = copy.deepcopy(self.config)
self.launch_queue.put((config, count, instance_type))
@@ -1,47 +0,0 @@
# EXPERIMENTAL: an example of configuring a mixed-worker cluster. Currently
# multi-worker autoscaling only works if you use the request_resources() call.
cluster_name: auto_instance_type
min_workers: 1
max_workers: 40
# Cloud-provider specific configuration.
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
# Tell the autoscaler the allowed node types and the resources they provide.
# This only has an effect if you use the experimental request_resources() call.
available_instance_types:
m4.xlarge:
resources: {"CPU": 4}
max_workers: 10
m4.4xlarge:
resources: {"CPU": 16, "Custom1": 1}
max_workers: 10
p2.xlarge:
resources: {"CPU": 4, "GPU": 1, "Custom2": 2}
max_workers: 4
p2.8xlarge:
resources: {"CPU": 32, "GPU": 8}
max_workers: 2
# Configure the cluster for very conservative auto-scaling otherwise.
target_utilization_fraction: 1.0
idle_timeout_minutes: 2
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
# Provider-specific config for the head node, e.g. instance type.
head_node:
InstanceType: m4.xlarge
ImageId: latest_dlami
# Provider-specific config for the worker nodes, e.g. instance type.
# NOTE: the instance type can be overriden by the resource demand scheduler.
# The instance type set here is only used as the default fallback.
worker_nodes:
InstanceType: m4.xlarge
ImageId: latest_dlami
@@ -0,0 +1,66 @@
# Experimental: an example of configuring a mixed-node-type cluster.
cluster_name: multi_node_type
min_workers: 1
max_workers: 40
# Cloud-provider specific configuration.
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a
# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
cpu_4_ondemand:
node_config:
InstanceType: m4.xlarge
resources: {"CPU": 4}
max_workers: 5
cpu_4_spot:
node_config:
InstanceType: m4.xlarge
InstanceMarketOptions:
MarketType: spot
resources: {"CPU": 4}
max_workers: 20
cpu_16_ondemand:
node_config:
InstanceType: m4.4xlarge
resources: {"CPU": 16, "Custom1": 1}
max_workers: 10
gpu_1_ondemand:
node_config:
InstanceType: p2.xlarge
resources: {"CPU": 4, "GPU": 1, "Custom2": 2}
max_workers: 4
gpu_8_ondemand:
node_config:
InstanceType: p2.8xlarge
resources: {"CPU": 32, "GPU": 8}
max_workers: 2
# Specify the node type of the head node (as configured above).
head_node_type: cpu_4_ondemand
# Specify the default type of the worker node (as configured above).
worker_default_node_type: cpu_4_spot
# The default settings for the head node. This will be merged with the per-node
# type configs given above.
head_node:
ImageId: latest_dlami
# The default settings for worker nodes. This will be merged with the per-node
# type configs given above.
worker_nodes:
ImageId: latest_dlami
# Configure the cluster for very conservative auto-scaling otherwise.
target_utilization_fraction: 1.0
idle_timeout_minutes: 2
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
+8 -15
View File
@@ -193,21 +193,12 @@ class AWSNodeProvider(NodeProvider):
self.tag_cache_update_event.set()
def create_node_of_type(self, node_config, tags, instance_type, count):
assert instance_type is not None
node_config["InstanceType"] = instance_type
return self.create_node(node_config, tags, count)
def get_instance_type(self, node_config):
return node_config["InstanceType"]
def create_node(self, node_config, tags, count):
# Always add the instance type tag, since node reuse is unsafe
# otherwise.
tags = copy.deepcopy(tags)
tags[TAG_RAY_INSTANCE_TYPE] = node_config["InstanceType"]
# Try to reuse previously stopped nodes with compatible configs
if self.cache_stopped_nodes:
# TODO(ekl) this is breaking the abstraction boundary a little by
# peeking into the tag set.
filters = [
{
"Name": "instance-state-name",
@@ -221,15 +212,17 @@ class AWSNodeProvider(NodeProvider):
"Name": "tag:{}".format(TAG_RAY_NODE_TYPE),
"Values": [tags[TAG_RAY_NODE_TYPE]],
},
{
"Name": "tag:{}".format(TAG_RAY_INSTANCE_TYPE),
"Values": [tags[TAG_RAY_INSTANCE_TYPE]],
},
{
"Name": "tag:{}".format(TAG_RAY_LAUNCH_CONFIG),
"Values": [tags[TAG_RAY_LAUNCH_CONFIG]],
},
]
# This tag may not always be present.
if TAG_RAY_INSTANCE_TYPE in tags:
filters.append({
"Name": "tag:{}".format(TAG_RAY_INSTANCE_TYPE),
"Values": [tags[TAG_RAY_INSTANCE_TYPE]],
})
reuse_nodes = list(
self.ec2.instances.filter(Filters=filters))[:count]
+23 -6
View File
@@ -5,6 +5,7 @@ import logging
import os
import random
import sys
import subprocess
import tempfile
import time
from typing import Any, Dict, Optional
@@ -25,7 +26,7 @@ from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS, \
PROVIDER_PRETTY_NAMES, try_get_log_state, try_logging_config, \
try_reload_log_state
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD
TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD, TAG_RAY_INSTANCE_TYPE
from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL
from ray.autoscaler.updater import NodeUpdaterThread
@@ -454,11 +455,19 @@ def warn_about_bad_start_command(start_commands):
"to ray start in the head_start_ray_commands section.")
def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
override_cluster_name):
def get_or_create_head_node(config,
config_file,
no_restart,
restart_only,
yes,
override_cluster_name,
_provider=None,
_runner=subprocess):
"""Create the cluster head node, which in turn creates the workers."""
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = (_provider or get_node_provider(config["provider"],
config["cluster_name"]))
config = copy.deepcopy(config)
raw_config_file = config_file # used for printing to the user
config_file = os.path.abspath(config_file)
try:
@@ -508,7 +517,14 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
_abort=True)
cli_logger.newline()
launch_hash = hash_launch_conf(config["head_node"], config["auth"])
# TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync)
head_node_config = copy.deepcopy(config["head_node"])
if "head_node_type" in config:
head_node_tags[TAG_RAY_INSTANCE_TYPE] = config["head_node_type"]
head_node_config.update(config["available_node_types"][config[
"head_node_type"]]["node_config"])
launch_hash = hash_launch_conf(head_node_config, config["auth"])
if head_node is None or provider.node_tags(head_node).get(
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
with cli_logger.group("Acquiring an up-to-date head node"):
@@ -540,7 +556,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
config["cluster_name"])
provider.create_node(config["head_node"], head_node_tags, 1)
provider.create_node(head_node_config, head_node_tags, 1)
cli_logger.print("Launched a new head node")
start = time.time()
@@ -633,6 +649,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
initialization_commands=config["initialization_commands"],
setup_commands=init_commands,
ray_start_commands=ray_start_commands,
process_runner=_runner,
runtime_hash=runtime_hash,
file_mounts_contents_hash=file_mounts_contents_hash,
docker_config=config.get("docker"))
+14 -6
View File
@@ -1,3 +1,5 @@
from typing import Any, Optional, Dict
import copy
import logging
import threading
@@ -28,24 +30,30 @@ class NodeLauncher(threading.Thread):
self.index = str(index) if index is not None else ""
super(NodeLauncher, self).__init__(*args, **kwargs)
def _launch_node(self, config, count, instance_type):
def _launch_node(self, config: Dict[str, Any], count: int,
instance_type: Optional[str]):
if self.instance_types:
assert instance_type, instance_type
worker_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER}
before = self.provider.non_terminated_nodes(tag_filters=worker_filter)
launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"])
self.log("Launching {} nodes, type {}.".format(count, instance_type))
node_config = config["worker_nodes"]
node_config = copy.deepcopy(config["worker_nodes"])
node_tags = {
TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]),
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED,
TAG_RAY_LAUNCH_CONFIG: launch_hash,
}
# A custom node type is specified; set the tag in this case, and also
# merge the configs. We merge the configs instead of overriding, so
# that the bootstrapped per-cloud properties are preserved.
# TODO(ekl) this logic is duplicated in commands.py (keep in sync)
if instance_type:
node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type
self.provider.create_node_of_type(node_config, node_tags,
instance_type, count)
else:
self.provider.create_node(node_config, node_tags, count)
node_config.update(
config["available_node_types"][instance_type]["node_config"])
self.provider.create_node(node_config, node_tags, count)
after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
if set(after).issubset(before):
self.log("No new nodes reported after node creation.")
-16
View File
@@ -234,22 +234,6 @@ class NodeProvider:
"""Clean-up when a Provider is no longer required."""
pass
def create_node_of_type(self, node_config, tags, instance_type, count):
"""Creates a number of nodes with a given instance type.
This is an optional method only required if using the resource
demand scheduler.
"""
assert instance_type is not None
raise NotImplementedError
def get_instance_type(self, node_config):
"""Returns the instance type of this node config.
This is an optional method only required if using the resource
demand scheduler."""
return None
@staticmethod
def bootstrap_config(cluster_config):
"""Bootstraps the cluster config by adding env defaults if needed."""
+15 -2
View File
@@ -220,6 +220,14 @@
}
}
},
"head_node_type": {
"type": "string",
"description": "If using multiple node types, specifies the head node type."
},
"worker_default_node_type": {
"type": "string",
"description": "If using multiple node types, specifies the default worker node type."
},
"head_node": {
"type": "object",
"description": "Provider-specific config for the head node, e.g. instance type."
@@ -267,13 +275,18 @@
"no_restart": {
"description": "Whether to avoid restarting the cluster during updates. This field is controlled by the ray --no-restart flag and cannot be set by the user."
},
"available_instance_types": {
"available_node_types": {
"type": "object",
"description": "A list of instance types available for launching with 'auto' worker type.",
"description": "A list of node types for multi-node-type autoscaling.",
"patternProperties": {
".*": {
"type": "object",
"required": [ "resources", "node_config" ],
"properties": {
"node_config": {
"type": "object",
"description": "Provider-specific config for the node, e.g. instance type."
},
"max_workers": {"type": "integer"},
"resources": {
"type": "object",
+16
View File
@@ -70,6 +70,22 @@ def validate_config(config: Dict[str, Any]) -> None:
"machine and make sure the versions match.".format(
ray_version=ray.__version__))
if "available_node_types" in config:
if "head_node_type" not in config:
raise ValueError(
"You must specify `head_node_type` if `available_node_types "
"is set.")
if config["head_node_type"] not in config["available_node_types"]:
raise ValueError(
"`head_node_type` must be one of `available_node_types`.")
if "worker_default_node_type" not in config:
raise ValueError("You must specify `worker_default_node_type` if "
"`available_node_types is set.")
if (config["worker_default_node_type"] not in config[
"available_node_types"]):
raise ValueError("`worker_default_node_type` must be one of "
"`available_node_types`.")
def prepare_config(config):
with_defaults = fillout_defaults(config)
+28 -14
View File
@@ -11,22 +11,24 @@ from jsonschema.exceptions import ValidationError
import ray
import ray.services as services
from ray.autoscaler.util import prepare_config, validate_config
from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.load_metrics import LoadMetrics
from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS, \
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_INSTANCE_TYPE
from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider
from ray.test_utils import RayTestTimeoutException
import pytest
class MockNode:
def __init__(self, node_id, tags, instance_type=None):
def __init__(self, node_id, tags, node_config, instance_type):
self.node_id = node_id
self.state = "pending"
self.tags = tags
self.external_ip = "1.2.3.4"
self.internal_ip = "172.0.0.{}".format(self.node_id)
self.node_config = node_config
self.instance_type = instance_type
def matches(self, tags):
@@ -95,7 +97,7 @@ class MockProcessRunner:
class MockProvider(NodeProvider):
def __init__(self, cache_stopped=False, default_instance_type=None):
def __init__(self, cache_stopped=False):
self.mock_nodes = {}
self.next_id = 0
self.throw = False
@@ -103,7 +105,6 @@ class MockProvider(NodeProvider):
self.ready_to_create = threading.Event()
self.ready_to_create.set()
self.cache_stopped = cache_stopped
self.default_instance_type = default_instance_type
def non_terminated_nodes(self, tag_filters):
if self.throw:
@@ -138,7 +139,7 @@ class MockProvider(NodeProvider):
def external_ip(self, node_id):
return self.mock_nodes[node_id].external_ip
def create_node(self, node_config, tags, count, instance_type=None):
def create_node(self, node_config, tags, count):
self.ready_to_create.wait()
if self.fail_creates:
return
@@ -149,17 +150,11 @@ class MockProvider(NodeProvider):
node.state = "pending"
node.tags.update(tags)
for _ in range(count):
self.mock_nodes[self.next_id] = MockNode(self.next_id, tags.copy(),
instance_type)
self.mock_nodes[self.next_id] = MockNode(
self.next_id, tags.copy(), node_config,
tags.get(TAG_RAY_INSTANCE_TYPE))
self.next_id += 1
def create_node_of_type(self, node_config, tags, instance_type, count):
return self.create_node(
node_config, tags, count, instance_type=instance_type)
def get_instance_type(self, node_config):
return self.default_instance_type
def set_node_tags(self, node_id, tags):
self.mock_nodes[node_id].tags.update(tags)
@@ -376,6 +371,25 @@ class AutoscalingTest(unittest.TestCase):
except ValidationError:
self.fail("Default config did not pass validation test!")
def testGetOrCreateHeadNode(self):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
get_or_create_head_node(
SMALL_CLUSTER,
config_path,
no_restart=False,
restart_only=False,
yes=True,
override_cluster_name=None,
_provider=self.provider,
_runner=runner)
self.waitForNodes(1)
runner.assert_has_call("1.2.3.4", "init_cmd")
runner.assert_has_call("1.2.3.4", "head_setup_cmd")
runner.assert_has_call("1.2.3.4", "start_ray_head")
self.assertEqual(self.provider.mock_nodes[0].instance_type, None)
def testScaleUp(self):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
@@ -11,6 +11,8 @@ from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \
from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler.load_metrics import LoadMetrics
from ray.autoscaler.node_provider import NODE_PROVIDERS
from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.tags import TAG_RAY_INSTANCE_TYPE
from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
get_bin_pack_residual, get_instances_for
@@ -18,24 +20,30 @@ from time import sleep
TYPES_A = {
"m4.large": {
"node_config": {
"FooProperty": 42,
},
"resources": {
"CPU": 2
},
"max_workers": 10,
},
"m4.4xlarge": {
"node_config": {},
"resources": {
"CPU": 16
},
"max_workers": 8,
},
"m4.16xlarge": {
"node_config": {},
"resources": {
"CPU": 64
},
"max_workers": 4,
},
"p2.xlarge": {
"node_config": {},
"resources": {
"CPU": 16,
"GPU": 1
@@ -43,6 +51,7 @@ TYPES_A = {
"max_workers": 10,
},
"p2.8xlarge": {
"node_config": {},
"resources": {
"CPU": 32,
"GPU": 8
@@ -51,9 +60,12 @@ TYPES_A = {
},
}
MULTI_WORKER_CLUSTER = dict(SMALL_CLUSTER, **{
"available_instance_types": TYPES_A,
})
MULTI_WORKER_CLUSTER = dict(
SMALL_CLUSTER, **{
"available_node_types": TYPES_A,
"head_node_type": "m4.large",
"worker_default_node_type": "m4.large",
})
def test_util_score():
@@ -182,9 +194,33 @@ class AutoscalingTest(unittest.TestCase):
f.write(yaml.dump(config))
return path
def testGetOrCreateMultiNodeType(self):
config_path = self.write_config(MULTI_WORKER_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
get_or_create_head_node(
MULTI_WORKER_CLUSTER,
config_path,
no_restart=False,
restart_only=False,
yes=True,
override_cluster_name=None,
_provider=self.provider,
_runner=runner)
self.waitForNodes(1)
runner.assert_has_call("1.2.3.4", "init_cmd")
runner.assert_has_call("1.2.3.4", "head_setup_cmd")
runner.assert_has_call("1.2.3.4", "start_ray_head")
self.assertEqual(self.provider.mock_nodes[0].instance_type, "m4.large")
self.assertEqual(
self.provider.mock_nodes[0].node_config.get("FooProperty"), 42)
self.assertEqual(
self.provider.mock_nodes[0].tags.get(TAG_RAY_INSTANCE_TYPE),
"m4.large")
def testScaleUpMinSanity(self):
config_path = self.write_config(MULTI_WORKER_CLUSTER)
self.provider = MockProvider(default_instance_type="m4.large")
self.provider = MockProvider()
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
config_path,
@@ -203,7 +239,7 @@ class AutoscalingTest(unittest.TestCase):
config["min_workers"] = 0
config["max_workers"] = 50
config_path = self.write_config(config)
self.provider = MockProvider(default_instance_type="m4.large")
self.provider = MockProvider()
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
config_path,
@@ -233,7 +269,7 @@ class AutoscalingTest(unittest.TestCase):
config["min_workers"] = 0
config["max_workers"] = 50
config_path = self.write_config(config)
self.provider = MockProvider(default_instance_type="m4.large")
self.provider = MockProvider()
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
config_path,