mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 22:20:52 +08:00
Automatically detect CPU, GPU, accelerator_type for AWS (#11147)
This commit is contained in:
@@ -3,6 +3,7 @@ import copy
|
||||
import threading
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
from typing import Any, Dict
|
||||
|
||||
import boto3
|
||||
import botocore
|
||||
@@ -478,3 +479,46 @@ class AWSNodeProvider(NodeProvider):
|
||||
@staticmethod
|
||||
def bootstrap_config(cluster_config):
|
||||
return bootstrap_aws(cluster_config)
|
||||
|
||||
@staticmethod
|
||||
def fillout_available_node_types_resources(
|
||||
cluster_config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Fills out missing "resources" field for available_node_types."""
|
||||
if "available_node_types" not in cluster_config:
|
||||
return cluster_config
|
||||
cluster_config = copy.deepcopy(cluster_config)
|
||||
|
||||
instances_list = boto3.client("ec2").describe_instance_types()[
|
||||
"InstanceTypes"]
|
||||
instances_dict = {
|
||||
instance["InstanceType"]: instance
|
||||
for instance in instances_list
|
||||
}
|
||||
available_node_types = cluster_config["available_node_types"]
|
||||
for node_type in available_node_types:
|
||||
instance_type = available_node_types[node_type]["node_config"][
|
||||
"InstanceType"]
|
||||
if instance_type in instances_dict:
|
||||
cpus = instances_dict[instance_type]["VCpuInfo"][
|
||||
"DefaultVCpus"]
|
||||
autodetected_resources = {"CPU": cpus}
|
||||
gpus = instances_dict[instance_type].get("GpuInfo",
|
||||
{}).get("Gpus")
|
||||
if gpus is not None:
|
||||
# TODO(ameer): currently we support one gpu type per node.
|
||||
assert len(gpus) == 1
|
||||
gpu_name = gpus[0]["Name"]
|
||||
autodetected_resources.update({
|
||||
"GPU": gpus[0]["Count"],
|
||||
f"accelerator_type:{gpu_name}": 1
|
||||
})
|
||||
autodetected_resources.update(
|
||||
available_node_types[node_type].get("resources", {}))
|
||||
if autodetected_resources != \
|
||||
available_node_types[node_type].get("resources", {}):
|
||||
available_node_types[node_type][
|
||||
"resources"] = autodetected_resources
|
||||
cli_logger.print("Updating the resources of {} to {}.",
|
||||
node_type, autodetected_resources)
|
||||
|
||||
return cluster_config
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import collections
|
||||
import logging
|
||||
import hashlib
|
||||
import json
|
||||
import jsonschema
|
||||
@@ -8,7 +9,8 @@ from typing import Any, Dict
|
||||
|
||||
import ray
|
||||
import ray._private.services as services
|
||||
from ray.autoscaler._private.providers import _get_default_config
|
||||
from ray.autoscaler._private.providers import _get_default_config, \
|
||||
_NODE_PROVIDERS
|
||||
from ray.autoscaler._private.docker import validate_docker_config
|
||||
|
||||
REQUIRED, OPTIONAL = True, False
|
||||
@@ -19,6 +21,8 @@ RAY_SCHEMA_PATH = os.path.join(
|
||||
DEBUG_AUTOSCALING_ERROR = "__autoscaling_error"
|
||||
DEBUG_AUTOSCALING_STATUS = "__autoscaling_status"
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConcurrentCounter:
|
||||
def __init__(self):
|
||||
@@ -98,9 +102,28 @@ def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
defaults = _get_default_config(config["provider"])
|
||||
defaults.update(config)
|
||||
defaults["auth"] = defaults.get("auth", {})
|
||||
try:
|
||||
defaults = _fillout_available_node_types_resources(defaults)
|
||||
except Exception:
|
||||
# We don't want to introduce new errors with filling available node
|
||||
# types resources feature.
|
||||
logger.exception("Failed to autodetect node resources")
|
||||
|
||||
return defaults
|
||||
|
||||
|
||||
def _fillout_available_node_types_resources(
|
||||
cluster_config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Fills out missing "resources" field for available_node_types."""
|
||||
if "available_node_types" in cluster_config:
|
||||
importer = _NODE_PROVIDERS.get(cluster_config["provider"]["type"])
|
||||
if importer is not None:
|
||||
provider_cls = importer(cluster_config["provider"])
|
||||
return provider_cls.fillout_available_node_types_resources(
|
||||
cluster_config)
|
||||
return cluster_config
|
||||
|
||||
|
||||
def merge_setup_commands(config):
|
||||
config["head_setup_commands"] = (
|
||||
config["setup_commands"] + config["head_setup_commands"])
|
||||
|
||||
@@ -16,7 +16,9 @@ available_node_types:
|
||||
cpu_4_ondemand:
|
||||
node_config:
|
||||
InstanceType: m4.xlarge
|
||||
resources: {"CPU": 4}
|
||||
# For AWS instances, autoscaler will automatically add the available
|
||||
# CPUs/GPUs/accelerator_type ({"CPU": 4} for m4.xlarge) in "resources".
|
||||
# resources: {"CPU": 4}
|
||||
min_workers: 1
|
||||
max_workers: 5
|
||||
cpu_16_spot:
|
||||
@@ -24,19 +26,22 @@ available_node_types:
|
||||
InstanceType: m4.4xlarge
|
||||
InstanceMarketOptions:
|
||||
MarketType: spot
|
||||
resources: {"CPU": 16, "Custom1": 1}
|
||||
# Autoscaler will auto fill the CPU resources below.
|
||||
resources: {"Custom1": 1, "is_spot": 1}
|
||||
max_workers: 10
|
||||
gpu_1_ondemand:
|
||||
node_config:
|
||||
InstanceType: p2.xlarge
|
||||
resources: {"CPU": 4, "GPU": 1, "Custom2": 2, "accelerator_type:K80": 1}
|
||||
# Autoscaler will auto fill the CPU/GPU resources below.
|
||||
resources: {"Custom2": 2}
|
||||
max_workers: 4
|
||||
worker_setup_commands:
|
||||
- pip install tensorflow-gpu # Example command.
|
||||
gpu_8_ondemand:
|
||||
node_config:
|
||||
InstanceType: p3.8xlarge
|
||||
resources: {"CPU": 32, "GPU": 4, "accelerator_type:V100": 1}
|
||||
# Autoscaler autofills the "resources" below.
|
||||
# resources: {"CPU": 32, "GPU": 4, "accelerator_type:V100": 1}
|
||||
max_workers: 2
|
||||
worker_setup_commands:
|
||||
- pip install tensorflow-gpu # Example command.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
from typing import Any, Dict
|
||||
|
||||
from ray.autoscaler._private.command_runner import \
|
||||
SSHCommandRunner, DockerCommandRunner
|
||||
@@ -174,3 +174,9 @@ class NodeProvider:
|
||||
def prepare_for_head_node(self, cluster_config):
|
||||
"""Returns a new cluster config with custom configs for head node."""
|
||||
return cluster_config
|
||||
|
||||
@staticmethod
|
||||
def fillout_available_node_types_resources(
|
||||
self, cluster_config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Fills out missing "resources" field for available_node_types."""
|
||||
return cluster_config
|
||||
|
||||
@@ -5,6 +5,8 @@ import tempfile
|
||||
import unittest
|
||||
import urllib
|
||||
import yaml
|
||||
import copy
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
from ray.autoscaler._private.util import prepare_config, validate_config
|
||||
from ray.test_utils import recursive_fnmatch
|
||||
@@ -20,6 +22,9 @@ CONFIG_PATHS += recursive_fnmatch(
|
||||
class AutoscalingConfigTest(unittest.TestCase):
|
||||
def testValidateDefaultConfig(self):
|
||||
for config_path in CONFIG_PATHS:
|
||||
if "aws/example-multi-node-type.yaml" in config_path:
|
||||
# This is tested in testValidateDefaultConfigAWSMultiNodeTypes.
|
||||
continue
|
||||
with open(config_path) as f:
|
||||
config = yaml.safe_load(f)
|
||||
config = prepare_config(config)
|
||||
@@ -28,6 +33,78 @@ class AutoscalingConfigTest(unittest.TestCase):
|
||||
except Exception:
|
||||
self.fail("Config did not pass validation test!")
|
||||
|
||||
def testValidateDefaultConfigAWSMultiNodeTypes(self):
|
||||
aws_config_path = os.path.join(
|
||||
RAY_PATH, "autoscaler/aws/example-multi-node-type.yaml")
|
||||
with open(aws_config_path) as f:
|
||||
config = yaml.safe_load(f)
|
||||
new_config = copy.deepcopy(config)
|
||||
# modify it here
|
||||
new_config["available_node_types"] = {
|
||||
"cpu_4_ondemand": new_config["available_node_types"][
|
||||
"cpu_4_ondemand"],
|
||||
"cpu_16_spot": new_config["available_node_types"]["cpu_16_spot"],
|
||||
"gpu_8_ondemand": new_config["available_node_types"][
|
||||
"gpu_8_ondemand"]
|
||||
}
|
||||
orig_new_config = copy.deepcopy(new_config)
|
||||
expected_available_node_types = orig_new_config["available_node_types"]
|
||||
expected_available_node_types["cpu_4_ondemand"]["resources"] = {
|
||||
"CPU": 4
|
||||
}
|
||||
expected_available_node_types["cpu_16_spot"]["resources"] = {
|
||||
"CPU": 16,
|
||||
"Custom1": 1,
|
||||
"is_spot": 1
|
||||
}
|
||||
expected_available_node_types["gpu_8_ondemand"]["resources"] = {
|
||||
"CPU": 32,
|
||||
"GPU": 4,
|
||||
"accelerator_type:V100": 1
|
||||
}
|
||||
|
||||
boto3_dict = {
|
||||
"InstanceTypes": [{
|
||||
"InstanceType": "m4.xlarge",
|
||||
"VCpuInfo": {
|
||||
"DefaultVCpus": 4
|
||||
}
|
||||
}, {
|
||||
"InstanceType": "m4.4xlarge",
|
||||
"VCpuInfo": {
|
||||
"DefaultVCpus": 16
|
||||
}
|
||||
}, {
|
||||
"InstanceType": "p3.8xlarge",
|
||||
"VCpuInfo": {
|
||||
"DefaultVCpus": 32
|
||||
},
|
||||
"GpuInfo": {
|
||||
"Gpus": [{
|
||||
"Name": "V100",
|
||||
"Count": 4
|
||||
}]
|
||||
}
|
||||
}]
|
||||
}
|
||||
boto3_mock = Mock()
|
||||
describe_instance_types_mock = Mock()
|
||||
describe_instance_types_mock.describe_instance_types = MagicMock(
|
||||
return_value=boto3_dict)
|
||||
boto3_mock.client = MagicMock(
|
||||
return_value=describe_instance_types_mock)
|
||||
with patch.multiple(
|
||||
"ray.autoscaler._private.aws.node_provider",
|
||||
boto3=boto3_mock,
|
||||
):
|
||||
new_config = prepare_config(new_config)
|
||||
|
||||
try:
|
||||
validate_config(new_config)
|
||||
expected_available_node_types == new_config["available_node_types"]
|
||||
except Exception:
|
||||
self.fail("Config did not pass multi node types auto fill test!")
|
||||
|
||||
def testValidateNetworkConfig(self):
|
||||
web_yaml = "https://raw.githubusercontent.com/ray-project/ray/" \
|
||||
"master/python/ray/autoscaler/aws/example-full.yaml"
|
||||
|
||||
Reference in New Issue
Block a user