diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 4387cc3c0..ef2b0701d 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -1,4 +1,5 @@ from collections import defaultdict +from typing import Optional import copy import logging import math @@ -63,8 +64,8 @@ class StandardAutoscaler: self.config["cluster_name"]) # Check whether we can enable the resource demand scheduler. - if "available_instance_types" in self.config: - self.instance_types = self.config["available_instance_types"] + if "available_node_types" in self.config: + self.instance_types = self.config["available_node_types"] self.resource_demand_scheduler = ResourceDemandScheduler( self.provider, self.instance_types, self.config["max_workers"]) else: @@ -212,7 +213,8 @@ class StandardAutoscaler: self.max_concurrent_launches - num_pending) num_launches = min(max_allowed, target_workers - num_workers) - self.launch_new_node(num_launches) + self.launch_new_node(num_launches, + self.config.get("worker_default_node_type")) nodes = self.workers() self.log_info_string(nodes, target_workers) elif self.load_metrics.num_workers_connected() >= target_workers: @@ -439,13 +441,10 @@ class StandardAutoscaler: return False return True - def launch_new_node(self, count, instance_type=None): + def launch_new_node(self, count: int, + instance_type: Optional[str]) -> None: logger.info( "StandardAutoscaler: Queue {} new nodes for launch".format(count)) - # Try to fill in the default instance type so we can tag it properly. 
- if not instance_type: - instance_type = self.provider.get_instance_type( - self.config["worker_nodes"]) self.pending_launches.inc(instance_type, count) config = copy.deepcopy(self.config) self.launch_queue.put((config, count, instance_type)) diff --git a/python/ray/autoscaler/aws/example-auto-instance-type.yaml b/python/ray/autoscaler/aws/example-auto-instance-type.yaml deleted file mode 100644 index c61951af0..000000000 --- a/python/ray/autoscaler/aws/example-auto-instance-type.yaml +++ /dev/null @@ -1,47 +0,0 @@ -# EXPERIMENTAL: an example of configuring a mixed-worker cluster. Currently -# multi-worker autoscaling only works if you use the request_resources() call. -cluster_name: auto_instance_type -min_workers: 1 -max_workers: 40 - -# Cloud-provider specific configuration. -provider: - type: aws - region: us-west-2 - availability_zone: us-west-2a - -# Tell the autoscaler the allowed node types and the resources they provide. -# This only has an effect if you use the experimental request_resources() call. -available_instance_types: - m4.xlarge: - resources: {"CPU": 4} - max_workers: 10 - m4.4xlarge: - resources: {"CPU": 16, "Custom1": 1} - max_workers: 10 - p2.xlarge: - resources: {"CPU": 4, "GPU": 1, "Custom2": 2} - max_workers: 4 - p2.8xlarge: - resources: {"CPU": 32, "GPU": 8} - max_workers: 2 - -# Configure the cluster for very conservative auto-scaling otherwise. -target_utilization_fraction: 1.0 -idle_timeout_minutes: 2 - -# How Ray will authenticate with newly launched nodes. -auth: - ssh_user: ubuntu - -# Provider-specific config for the head node, e.g. instance type. -head_node: - InstanceType: m4.xlarge - ImageId: latest_dlami - -# Provider-specific config for the worker nodes, e.g. instance type. -# NOTE: the instance type can be overriden by the resource demand scheduler. -# The instance type set here is only used as the default fallback. 
-worker_nodes: - InstanceType: m4.xlarge - ImageId: latest_dlami diff --git a/python/ray/autoscaler/aws/example-multi-node-type.yaml b/python/ray/autoscaler/aws/example-multi-node-type.yaml new file mode 100644 index 000000000..a6f9a3118 --- /dev/null +++ b/python/ray/autoscaler/aws/example-multi-node-type.yaml @@ -0,0 +1,66 @@ +# Experimental: an example of configuring a mixed-node-type cluster. +cluster_name: multi_node_type +min_workers: 1 +max_workers: 40 + +# Cloud-provider specific configuration. +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + +# Tell the autoscaler the allowed node types and the resources they provide. +# The key is the node type's name, which the *_node_type settings below refer to. +# The node config specifies the launch config and physical instance type. +available_node_types: + cpu_4_ondemand: + node_config: + InstanceType: m4.xlarge + resources: {"CPU": 4} + max_workers: 5 + cpu_4_spot: + node_config: + InstanceType: m4.xlarge + InstanceMarketOptions: + MarketType: spot + resources: {"CPU": 4} + max_workers: 20 + cpu_16_ondemand: + node_config: + InstanceType: m4.4xlarge + resources: {"CPU": 16, "Custom1": 1} + max_workers: 10 + gpu_1_ondemand: + node_config: + InstanceType: p2.xlarge + resources: {"CPU": 4, "GPU": 1, "Custom2": 2} + max_workers: 4 + gpu_8_ondemand: + node_config: + InstanceType: p2.8xlarge + resources: {"CPU": 32, "GPU": 8} + max_workers: 2 + +# Specify the node type of the head node (as configured above). +head_node_type: cpu_4_ondemand + +# Specify the default type of the worker node (as configured above). +worker_default_node_type: cpu_4_spot + +# The default settings for the head node. This will be merged with the per-node +# type configs given above. +head_node: + ImageId: latest_dlami + +# The default settings for worker nodes. This will be merged with the per-node +# type configs given above. 
+worker_nodes: + ImageId: latest_dlami + +# Configure the cluster for very conservative auto-scaling otherwise. +target_utilization_fraction: 1.0 +idle_timeout_minutes: 2 + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index 7cfca9ebb..d77d06ef9 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -193,21 +193,12 @@ class AWSNodeProvider(NodeProvider): self.tag_cache_update_event.set() - def create_node_of_type(self, node_config, tags, instance_type, count): - assert instance_type is not None - node_config["InstanceType"] = instance_type - return self.create_node(node_config, tags, count) - - def get_instance_type(self, node_config): - return node_config["InstanceType"] - def create_node(self, node_config, tags, count): - # Always add the instance type tag, since node reuse is unsafe - # otherwise. tags = copy.deepcopy(tags) - tags[TAG_RAY_INSTANCE_TYPE] = node_config["InstanceType"] # Try to reuse previously stopped nodes with compatible configs if self.cache_stopped_nodes: + # TODO(ekl) this is breaking the abstraction boundary a little by + # peeking into the tag set. filters = [ { "Name": "instance-state-name", @@ -221,15 +212,17 @@ class AWSNodeProvider(NodeProvider): "Name": "tag:{}".format(TAG_RAY_NODE_TYPE), "Values": [tags[TAG_RAY_NODE_TYPE]], }, - { - "Name": "tag:{}".format(TAG_RAY_INSTANCE_TYPE), - "Values": [tags[TAG_RAY_INSTANCE_TYPE]], - }, { "Name": "tag:{}".format(TAG_RAY_LAUNCH_CONFIG), "Values": [tags[TAG_RAY_LAUNCH_CONFIG]], }, ] + # This tag may not always be present. 
+ if TAG_RAY_INSTANCE_TYPE in tags: + filters.append({ + "Name": "tag:{}".format(TAG_RAY_INSTANCE_TYPE), + "Values": [tags[TAG_RAY_INSTANCE_TYPE]], + }) reuse_nodes = list( self.ec2.instances.filter(Filters=filters))[:count] diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 3ea83ff48..b16753d3a 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -5,6 +5,7 @@ import logging import os import random import sys +import subprocess import tempfile import time from typing import Any, Dict, Optional @@ -25,7 +26,7 @@ from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS, \ PROVIDER_PRETTY_NAMES, try_get_log_state, try_logging_config, \ try_reload_log_state from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ - TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD + TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD, TAG_RAY_INSTANCE_TYPE from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler.updater import NodeUpdaterThread @@ -454,11 +455,19 @@ def warn_about_bad_start_command(start_commands): "to ray start in the head_start_ray_commands section.") -def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, - override_cluster_name): +def get_or_create_head_node(config, + config_file, + no_restart, + restart_only, + yes, + override_cluster_name, + _provider=None, + _runner=subprocess): """Create the cluster head node, which in turn creates the workers.""" - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = (_provider or get_node_provider(config["provider"], + config["cluster_name"])) + config = copy.deepcopy(config) raw_config_file = config_file # used for printing to the user config_file = os.path.abspath(config_file) try: @@ -508,7 +517,14 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, _abort=True) cli_logger.newline() - 
launch_hash = hash_launch_conf(config["head_node"], config["auth"]) + # TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync) + head_node_config = copy.deepcopy(config["head_node"]) + if "head_node_type" in config: + head_node_tags[TAG_RAY_INSTANCE_TYPE] = config["head_node_type"] + head_node_config.update(config["available_node_types"][config[ + "head_node_type"]]["node_config"]) + + launch_hash = hash_launch_conf(head_node_config, config["auth"]) if head_node is None or provider.node_tags(head_node).get( TAG_RAY_LAUNCH_CONFIG) != launch_hash: with cli_logger.group("Acquiring an up-to-date head node"): @@ -540,7 +556,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format( config["cluster_name"]) - provider.create_node(config["head_node"], head_node_tags, 1) + provider.create_node(head_node_config, head_node_tags, 1) cli_logger.print("Launched a new head node") start = time.time() @@ -633,6 +649,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, initialization_commands=config["initialization_commands"], setup_commands=init_commands, ray_start_commands=ray_start_commands, + process_runner=_runner, runtime_hash=runtime_hash, file_mounts_contents_hash=file_mounts_contents_hash, docker_config=config.get("docker")) diff --git a/python/ray/autoscaler/node_launcher.py b/python/ray/autoscaler/node_launcher.py index e90e0ba04..fdf65ed65 100644 --- a/python/ray/autoscaler/node_launcher.py +++ b/python/ray/autoscaler/node_launcher.py @@ -1,3 +1,5 @@ +from typing import Any, Optional, Dict +import copy import logging import threading @@ -28,24 +30,30 @@ class NodeLauncher(threading.Thread): self.index = str(index) if index is not None else "" super(NodeLauncher, self).__init__(*args, **kwargs) - def _launch_node(self, config, count, instance_type): + def _launch_node(self, config: Dict[str, 
Any], count: int, + instance_type: Optional[str]): + if self.instance_types: + assert instance_type, instance_type worker_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER} before = self.provider.non_terminated_nodes(tag_filters=worker_filter) launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"]) self.log("Launching {} nodes, type {}.".format(count, instance_type)) - node_config = config["worker_nodes"] + node_config = copy.deepcopy(config["worker_nodes"]) node_tags = { TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]), TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER, TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED, TAG_RAY_LAUNCH_CONFIG: launch_hash, } + # A custom node type is specified; set the tag in this case, and also + # merge the configs. We merge the configs instead of overriding, so + # that the bootstrapped per-cloud properties are preserved. + # TODO(ekl) this logic is duplicated in commands.py (keep in sync) if instance_type: node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type - self.provider.create_node_of_type(node_config, node_tags, - instance_type, count) - else: - self.provider.create_node(node_config, node_tags, count) + node_config.update( + config["available_node_types"][instance_type]["node_config"]) + self.provider.create_node(node_config, node_tags, count) after = self.provider.non_terminated_nodes(tag_filters=worker_filter) if set(after).issubset(before): self.log("No new nodes reported after node creation.") diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 7d01bfa10..6e34077ec 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -234,22 +234,6 @@ class NodeProvider: """Clean-up when a Provider is no longer required.""" pass - def create_node_of_type(self, node_config, tags, instance_type, count): - """Creates a number of nodes with a given instance type. 
- - This is an optional method only required if using the resource - demand scheduler. - """ - assert instance_type is not None - raise NotImplementedError - - def get_instance_type(self, node_config): - """Returns the instance type of this node config. - - This is an optional method only required if using the resource - demand scheduler.""" - return None - @staticmethod def bootstrap_config(cluster_config): """Bootstraps the cluster config by adding env defaults if needed.""" diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index fe7341031..7cec8f094 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -220,6 +220,14 @@ } } }, + "head_node_type": { + "type": "string", + "description": "If using multiple node types, specifies the head node type." + }, + "worker_default_node_type": { + "type": "string", + "description": "If using multiple node types, specifies the default worker node type." + }, "head_node": { "type": "object", "description": "Provider-specific config for the head node, e.g. instance type." @@ -267,13 +275,18 @@ "no_restart": { "description": "Whether to avoid restarting the cluster during updates. This field is controlled by the ray --no-restart flag and cannot be set by the user." }, - "available_instance_types": { + "available_node_types": { "type": "object", - "description": "A list of instance types available for launching with 'auto' worker type.", + "description": "A mapping from node type name to its settings, for multi-node-type autoscaling.", "patternProperties": { ".*": { "type": "object", + "required": [ "resources", "node_config" ], "properties": { + "node_config": { + "type": "object", + "description": "Provider-specific config for the node, e.g. instance type." 
+ }, "max_workers": {"type": "integer"}, "resources": { "type": "object", diff --git a/python/ray/autoscaler/util.py b/python/ray/autoscaler/util.py index f014c93b7..8525a180b 100644 --- a/python/ray/autoscaler/util.py +++ b/python/ray/autoscaler/util.py @@ -70,6 +70,22 @@ def validate_config(config: Dict[str, Any]) -> None: "machine and make sure the versions match.".format( ray_version=ray.__version__)) + if "available_node_types" in config: + if "head_node_type" not in config: + raise ValueError( + "You must specify `head_node_type` if `available_node_types " + "is set.") + if config["head_node_type"] not in config["available_node_types"]: + raise ValueError( + "`head_node_type` must be one of `available_node_types`.") + if "worker_default_node_type" not in config: + raise ValueError("You must specify `worker_default_node_type` if " + "`available_node_types is set.") + if (config["worker_default_node_type"] not in config[ + "available_node_types"]): + raise ValueError("`worker_default_node_type` must be one of " + "`available_node_types`.") + def prepare_config(config): with_defaults = fillout_defaults(config) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index b2708793f..c39a2e1fc 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -11,22 +11,24 @@ from jsonschema.exceptions import ValidationError import ray import ray.services as services from ray.autoscaler.util import prepare_config, validate_config +from ray.autoscaler.commands import get_or_create_head_node from ray.autoscaler.load_metrics import LoadMetrics from ray.autoscaler.autoscaler import StandardAutoscaler from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS, \ - STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED + STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_INSTANCE_TYPE from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider from ray.test_utils import RayTestTimeoutException import pytest 
class MockNode: - def __init__(self, node_id, tags, instance_type=None): + def __init__(self, node_id, tags, node_config, instance_type): self.node_id = node_id self.state = "pending" self.tags = tags self.external_ip = "1.2.3.4" self.internal_ip = "172.0.0.{}".format(self.node_id) + self.node_config = node_config self.instance_type = instance_type def matches(self, tags): @@ -95,7 +97,7 @@ class MockProcessRunner: class MockProvider(NodeProvider): - def __init__(self, cache_stopped=False, default_instance_type=None): + def __init__(self, cache_stopped=False): self.mock_nodes = {} self.next_id = 0 self.throw = False @@ -103,7 +105,6 @@ class MockProvider(NodeProvider): self.ready_to_create = threading.Event() self.ready_to_create.set() self.cache_stopped = cache_stopped - self.default_instance_type = default_instance_type def non_terminated_nodes(self, tag_filters): if self.throw: @@ -138,7 +139,7 @@ class MockProvider(NodeProvider): def external_ip(self, node_id): return self.mock_nodes[node_id].external_ip - def create_node(self, node_config, tags, count, instance_type=None): + def create_node(self, node_config, tags, count): self.ready_to_create.wait() if self.fail_creates: return @@ -149,17 +150,11 @@ class MockProvider(NodeProvider): node.state = "pending" node.tags.update(tags) for _ in range(count): - self.mock_nodes[self.next_id] = MockNode(self.next_id, tags.copy(), - instance_type) + self.mock_nodes[self.next_id] = MockNode( + self.next_id, tags.copy(), node_config, + tags.get(TAG_RAY_INSTANCE_TYPE)) self.next_id += 1 - def create_node_of_type(self, node_config, tags, instance_type, count): - return self.create_node( - node_config, tags, count, instance_type=instance_type) - - def get_instance_type(self, node_config): - return self.default_instance_type - def set_node_tags(self, node_id, tags): self.mock_nodes[node_id].tags.update(tags) @@ -376,6 +371,25 @@ class AutoscalingTest(unittest.TestCase): except ValidationError: self.fail("Default config did not 
pass validation test!") + def testGetOrCreateHeadNode(self): + config_path = self.write_config(SMALL_CLUSTER) + self.provider = MockProvider() + runner = MockProcessRunner() + get_or_create_head_node( + SMALL_CLUSTER, + config_path, + no_restart=False, + restart_only=False, + yes=True, + override_cluster_name=None, + _provider=self.provider, + _runner=runner) + self.waitForNodes(1) + runner.assert_has_call("1.2.3.4", "init_cmd") + runner.assert_has_call("1.2.3.4", "head_setup_cmd") + runner.assert_has_call("1.2.3.4", "start_ray_head") + self.assertEqual(self.provider.mock_nodes[0].instance_type, None) + def testScaleUp(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index c7a133e97..53f55d710 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -11,6 +11,8 @@ from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \ from ray.autoscaler.autoscaler import StandardAutoscaler from ray.autoscaler.load_metrics import LoadMetrics from ray.autoscaler.node_provider import NODE_PROVIDERS +from ray.autoscaler.commands import get_or_create_head_node +from ray.autoscaler.tags import TAG_RAY_INSTANCE_TYPE from ray.autoscaler.resource_demand_scheduler import _utilization_score, \ get_bin_pack_residual, get_instances_for @@ -18,24 +20,30 @@ from time import sleep TYPES_A = { "m4.large": { + "node_config": { + "FooProperty": 42, + }, "resources": { "CPU": 2 }, "max_workers": 10, }, "m4.4xlarge": { + "node_config": {}, "resources": { "CPU": 16 }, "max_workers": 8, }, "m4.16xlarge": { + "node_config": {}, "resources": { "CPU": 64 }, "max_workers": 4, }, "p2.xlarge": { + "node_config": {}, "resources": { "CPU": 16, "GPU": 1 @@ -43,6 +51,7 @@ TYPES_A = { "max_workers": 10, }, "p2.8xlarge": { + "node_config": {}, "resources": { "CPU": 32, "GPU": 8 @@ 
-51,9 +60,12 @@ TYPES_A = { }, } -MULTI_WORKER_CLUSTER = dict(SMALL_CLUSTER, **{ - "available_instance_types": TYPES_A, -}) +MULTI_WORKER_CLUSTER = dict( + SMALL_CLUSTER, **{ + "available_node_types": TYPES_A, + "head_node_type": "m4.large", + "worker_default_node_type": "m4.large", + }) def test_util_score(): @@ -182,9 +194,33 @@ class AutoscalingTest(unittest.TestCase): f.write(yaml.dump(config)) return path + def testGetOrCreateMultiNodeType(self): + config_path = self.write_config(MULTI_WORKER_CLUSTER) + self.provider = MockProvider() + runner = MockProcessRunner() + get_or_create_head_node( + MULTI_WORKER_CLUSTER, + config_path, + no_restart=False, + restart_only=False, + yes=True, + override_cluster_name=None, + _provider=self.provider, + _runner=runner) + self.waitForNodes(1) + runner.assert_has_call("1.2.3.4", "init_cmd") + runner.assert_has_call("1.2.3.4", "head_setup_cmd") + runner.assert_has_call("1.2.3.4", "start_ray_head") + self.assertEqual(self.provider.mock_nodes[0].instance_type, "m4.large") + self.assertEqual( + self.provider.mock_nodes[0].node_config.get("FooProperty"), 42) + self.assertEqual( + self.provider.mock_nodes[0].tags.get(TAG_RAY_INSTANCE_TYPE), + "m4.large") + def testScaleUpMinSanity(self): config_path = self.write_config(MULTI_WORKER_CLUSTER) - self.provider = MockProvider(default_instance_type="m4.large") + self.provider = MockProvider() runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, @@ -203,7 +239,7 @@ class AutoscalingTest(unittest.TestCase): config["min_workers"] = 0 config["max_workers"] = 50 config_path = self.write_config(config) - self.provider = MockProvider(default_instance_type="m4.large") + self.provider = MockProvider() runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, @@ -233,7 +269,7 @@ class AutoscalingTest(unittest.TestCase): config["min_workers"] = 0 config["max_workers"] = 50 config_path = self.write_config(config) - self.provider = 
MockProvider(default_instance_type="m4.large") + self.provider = MockProvider() runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path,