From 6b86d4d2802b62e6ee30613894fff9c626b2e375 Mon Sep 17 00:00:00 2001 From: Ameer Haj Ali Date: Fri, 2 Oct 2020 21:16:43 -0700 Subject: [PATCH] Automatically detect CPU, GPU, accelerator_type for AWS (#11147) --- doc/source/cluster/autoscaling.rst | 17 ++-- .../autoscaler/_private/aws/node_provider.py | 44 +++++++++++ python/ray/autoscaler/_private/util.py | 25 +++++- .../aws/example-multi-node-type.yaml | 13 +++- python/ray/autoscaler/node_provider.py | 8 +- python/ray/tests/test_autoscaler_yaml.py | 77 +++++++++++++++++++ 6 files changed, 172 insertions(+), 12 deletions(-) diff --git a/doc/source/cluster/autoscaling.rst b/doc/source/cluster/autoscaling.rst index 022cad6d1..fc91225ab 100644 --- a/doc/source/cluster/autoscaling.rst +++ b/doc/source/cluster/autoscaling.rst @@ -48,7 +48,7 @@ Unmanaged nodes **must have 0 resources**. If you are using the `available_node_types` field, you should create a custom node type with `resources: {}`, and `max_workers: 0` when configuring the autoscaler. -The autoscaler will not attempt to start, stop, or update unmanaged nodes. The user is responsible for properly setting up and cleaning up unmanaged nodes. +The autoscaler will not attempt to start, stop, or update unmanaged nodes. The user is responsible for properly setting up and cleaning up unmanaged nodes. Multiple Node Type Autoscaling @@ -71,7 +71,9 @@ An example of configuring multiple node types is as follows `(full example) Dict[str, Any]: + """Fills out missing "resources" field for available_node_types.""" + if "available_node_types" not in cluster_config: + return cluster_config + cluster_config = copy.deepcopy(cluster_config) + + instances_list = boto3.client("ec2").describe_instance_types()[ + "InstanceTypes"] + instances_dict = { + instance["InstanceType"]: instance + for instance in instances_list + } + available_node_types = cluster_config["available_node_types"] + for node_type in available_node_types: + instance_type = available_node_types[node_type]["node_config"][ + "InstanceType"] + if instance_type in instances_dict: + cpus = instances_dict[instance_type]["VCpuInfo"][ + "DefaultVCpus"] + autodetected_resources = {"CPU": cpus} + gpus = instances_dict[instance_type].get("GpuInfo", + {}).get("Gpus") + if gpus is not None: + # TODO(ameer): currently we support one gpu type per node. + assert len(gpus) == 1 + gpu_name = gpus[0]["Name"] + autodetected_resources.update({ + "GPU": gpus[0]["Count"], + f"accelerator_type:{gpu_name}": 1 + }) + autodetected_resources.update( + available_node_types[node_type].get("resources", {})) + if autodetected_resources != \ + available_node_types[node_type].get("resources", {}): + available_node_types[node_type][ + "resources"] = autodetected_resources + cli_logger.print("Updating the resources of {} to {}.", + node_type, autodetected_resources) + + return cluster_config diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index 890f3d19a..f3f28d549 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -1,4 +1,5 @@ import collections +import logging import hashlib import json import jsonschema @@ -8,7 +9,8 @@ from typing import Any, Dict import ray import ray._private.services as services -from ray.autoscaler._private.providers import _get_default_config +from ray.autoscaler._private.providers import _get_default_config, \ + _NODE_PROVIDERS from ray.autoscaler._private.docker import validate_docker_config REQUIRED, OPTIONAL = True, False @@ -19,6 +21,8 @@ RAY_SCHEMA_PATH = os.path.join( DEBUG_AUTOSCALING_ERROR = "__autoscaling_error" DEBUG_AUTOSCALING_STATUS = "__autoscaling_status" +logger = logging.getLogger(__name__) + class ConcurrentCounter: def __init__(self): @@ -98,9 +102,28 @@ def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]: defaults = _get_default_config(config["provider"]) defaults.update(config) defaults["auth"] = defaults.get("auth", {}) + try: + defaults = _fillout_available_node_types_resources(defaults) + except Exception: + # We don't want to introduce new errors with filling available node + # types resources feature. + logger.exception("Failed to autodetect node resources") + return defaults +def _fillout_available_node_types_resources( + cluster_config: Dict[str, Any]) -> Dict[str, Any]: + """Fills out missing "resources" field for available_node_types.""" + if "available_node_types" in cluster_config: + importer = _NODE_PROVIDERS.get(cluster_config["provider"]["type"]) + if importer is not None: + provider_cls = importer(cluster_config["provider"]) + return provider_cls.fillout_available_node_types_resources( + cluster_config) + return cluster_config + + def merge_setup_commands(config): config["head_setup_commands"] = ( config["setup_commands"] + config["head_setup_commands"]) diff --git a/python/ray/autoscaler/aws/example-multi-node-type.yaml b/python/ray/autoscaler/aws/example-multi-node-type.yaml index a77aeacd0..14d8c363b 100644 --- a/python/ray/autoscaler/aws/example-multi-node-type.yaml +++ b/python/ray/autoscaler/aws/example-multi-node-type.yaml @@ -16,7 +16,9 @@ available_node_types: cpu_4_ondemand: node_config: InstanceType: m4.xlarge - resources: {"CPU": 4} + # For AWS instances, autoscaler will automatically add the available + # CPUs/GPUs/accelerator_type ({"CPU": 4} for m4.xlarge) in "resources". + # resources: {"CPU": 4} min_workers: 1 max_workers: 5 cpu_16_spot: @@ -24,19 +26,22 @@ available_node_types: InstanceType: m4.4xlarge InstanceMarketOptions: MarketType: spot - resources: {"CPU": 16, "Custom1": 1} + # Autoscaler will auto fill the CPU resources below. + resources: {"Custom1": 1, "is_spot": 1} max_workers: 10 gpu_1_ondemand: node_config: InstanceType: p2.xlarge - resources: {"CPU": 4, "GPU": 1, "Custom2": 2, "accelerator_type:K80": 1} + # Autoscaler will auto fill the CPU/GPU resources below. + resources: {"Custom2": 2} max_workers: 4 worker_setup_commands: - pip install tensorflow-gpu # Example command. gpu_8_ondemand: node_config: InstanceType: p3.8xlarge - resources: {"CPU": 32, "GPU": 4, "accelerator_type:V100": 1} + # Autoscaler autofills the "resources" below. + # resources: {"CPU": 32, "GPU": 4, "accelerator_type:V100": 1} max_workers: 2 worker_setup_commands: - pip install tensorflow-gpu # Example command. diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index cc37313c7..dd287a743 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -1,5 +1,5 @@ import logging -from typing import Any +from typing import Any, Dict from ray.autoscaler._private.command_runner import \ SSHCommandRunner, DockerCommandRunner @@ -174,3 +174,9 @@ class NodeProvider: def prepare_for_head_node(self, cluster_config): """Returns a new cluster config with custom configs for head node.""" return cluster_config + + @staticmethod + def fillout_available_node_types_resources( + self, cluster_config: Dict[str, Any]) -> Dict[str, Any]: + """Fills out missing "resources" field for available_node_types.""" + return cluster_config diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index 2d57a2f9a..0f4896324 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -5,6 +5,8 @@ import tempfile import unittest import urllib import yaml +import copy +from unittest.mock import MagicMock, Mock, patch from ray.autoscaler._private.util import prepare_config, validate_config from ray.test_utils import recursive_fnmatch @@ -20,6 +22,9 @@ CONFIG_PATHS += recursive_fnmatch( class AutoscalingConfigTest(unittest.TestCase): def testValidateDefaultConfig(self): for config_path in CONFIG_PATHS: + if "aws/example-multi-node-type.yaml" in config_path: + # This is tested in testValidateDefaultConfigAWSMultiNodeTypes. + continue with open(config_path) as f: config = yaml.safe_load(f) config = prepare_config(config) @@ -28,6 +33,78 @@ class AutoscalingConfigTest(unittest.TestCase): except Exception: self.fail("Config did not pass validation test!") + def testValidateDefaultConfigAWSMultiNodeTypes(self): + aws_config_path = os.path.join( + RAY_PATH, "autoscaler/aws/example-multi-node-type.yaml") + with open(aws_config_path) as f: + config = yaml.safe_load(f) + new_config = copy.deepcopy(config) + # modify it here + new_config["available_node_types"] = { + "cpu_4_ondemand": new_config["available_node_types"][ + "cpu_4_ondemand"], + "cpu_16_spot": new_config["available_node_types"]["cpu_16_spot"], + "gpu_8_ondemand": new_config["available_node_types"][ + "gpu_8_ondemand"] + } + orig_new_config = copy.deepcopy(new_config) + expected_available_node_types = orig_new_config["available_node_types"] + expected_available_node_types["cpu_4_ondemand"]["resources"] = { + "CPU": 4 + } + expected_available_node_types["cpu_16_spot"]["resources"] = { + "CPU": 16, + "Custom1": 1, + "is_spot": 1 + } + expected_available_node_types["gpu_8_ondemand"]["resources"] = { + "CPU": 32, + "GPU": 4, + "accelerator_type:V100": 1 + } + + boto3_dict = { + "InstanceTypes": [{ + "InstanceType": "m4.xlarge", + "VCpuInfo": { + "DefaultVCpus": 4 + } + }, { + "InstanceType": "m4.4xlarge", + "VCpuInfo": { + "DefaultVCpus": 16 + } + }, { + "InstanceType": "p3.8xlarge", + "VCpuInfo": { + "DefaultVCpus": 32 + }, + "GpuInfo": { + "Gpus": [{ + "Name": "V100", + "Count": 4 + }] + } + }] + } + boto3_mock = Mock() + describe_instance_types_mock = Mock() + describe_instance_types_mock.describe_instance_types = MagicMock( + return_value=boto3_dict) + boto3_mock.client = MagicMock( + return_value=describe_instance_types_mock) + with patch.multiple( + "ray.autoscaler._private.aws.node_provider", + boto3=boto3_mock, + ): + new_config = prepare_config(new_config) + + try: + validate_config(new_config) + expected_available_node_types == new_config["available_node_types"] + except Exception: + self.fail("Config did not pass multi node types auto fill test!") + def testValidateNetworkConfig(self): web_yaml = "https://raw.githubusercontent.com/ray-project/ray/" \ "master/python/ray/autoscaler/aws/example-full.yaml"