diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index f401e7885..c57d19a8c 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -1,6 +1,5 @@ from collections import defaultdict import copy -import json import logging import math import numpy as np @@ -18,15 +17,13 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, STATUS_UP_TO_DATE, NODE_TYPE_WORKER) from ray.autoscaler.updater import NodeUpdaterThread from ray.autoscaler.node_launcher import NodeLauncher +from ray.autoscaler.resource_demand_scheduler import ResourceDemandScheduler from ray.autoscaler.util import ConcurrentCounter, validate_config, \ with_head_node_ip, hash_launch_conf, hash_runtime_conf, \ DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ - AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \ - AUTOSCALER_RESOURCE_REQUEST_CHANNEL -import ray.services as services -from ray.worker import global_worker + AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S from six.moves import queue logger = logging.getLogger(__name__) @@ -64,6 +61,15 @@ class StandardAutoscaler: self.provider = get_node_provider(self.config["provider"], self.config["cluster_name"]) + # Check whether we can enable the resource demand scheduler. 
+ if "available_instance_types" in self.config: + self.instance_types = self.config["available_instance_types"] + self.resource_demand_scheduler = ResourceDemandScheduler( + self.provider, self.instance_types, self.config["max_workers"]) + else: + self.instance_types = None + self.resource_demand_scheduler = None + self.max_failures = max_failures self.max_launch_batch = max_launch_batch self.max_concurrent_launches = max_concurrent_launches @@ -103,7 +109,10 @@ class StandardAutoscaler: for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) + # Aggregate resources the user is requesting of the cluster. self.resource_requests = defaultdict(int) + # List of resource bundles the user is requesting of the cluster. + self.resource_demand_vector = None logger.info("StandardAutoscaler: {}".format(self.config)) @@ -178,6 +187,17 @@ class StandardAutoscaler: nodes = self.workers() self.log_info_string(nodes, target_workers) + # First let the resource demand scheduler launch nodes, if enabled. + if self.resource_demand_scheduler and self.resource_demand_vector: + # TODO(ekl) include head node in the node list + instances = ( + self.resource_demand_scheduler.get_instances_to_launch( + nodes, self.pending_launches.breakdown(), + self.resource_demand_vector)) + # TODO(ekl) also enforce max launch concurrency here? + for instance_type, count in instances: + self.launch_new_node(count, instance_type=instance_type) + # Launch additional nodes of the default type, if still needed. num_pending = self.pending_launches.value num_workers = len(nodes) + num_pending @@ -383,6 +403,11 @@ class StandardAutoscaler: def launch_new_node(self, count, instance_type): logger.info( "StandardAutoscaler: Queue {} new nodes for launch".format(count)) + # Try to fill in the default instance type so we can tag it properly. 
+ if not instance_type: + instance_type = self.provider.get_instance_type( + self.config["worker_nodes"]) + assert instance_type is not None self.pending_launches.inc(instance_type, count) config = copy.deepcopy(self.config) self.launch_queue.put((config, count, instance_type)) @@ -397,6 +422,9 @@ class StandardAutoscaler: tmp += "\n" tmp += self.load_metrics.info_string() tmp += "\n" + if self.resource_demand_scheduler: + tmp += self.resource_demand_scheduler.debug_string( + nodes, self.pending_launches.breakdown()) if _internal_kv_initialized(): _internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True) logger.info(tmp) @@ -416,12 +444,20 @@ class StandardAutoscaler: return "{}/{} target nodes{}".format(len(nodes), target, suffix) def request_resources(self, resources): - for resource, count in resources.items(): - self.resource_requests[resource] = max( - self.resource_requests[resource], count) + """Called by monitor to request resources (EXPERIMENTAL). + Args: + resources: Either a list of resource bundles or a single resource + demand dictionary. + """ logger.info( "StandardAutoscaler: resource_requests={}".format(resources)) + if isinstance(resources, list): + self.resource_demand_vector = resources + else: + for resource, count in resources.items(): + self.resource_requests[resource] = max( + self.resource_requests[resource], count) def kill_workers(self): logger.error("StandardAutoscaler: kill_workers triggered") @@ -432,32 +468,6 @@ class StandardAutoscaler: len(nodes))) -# Note: this is an (experimental) user-facing API, do not move. -def request_resources(num_cpus=None, num_gpus=None, bundles=None): - """Remotely request some CPU or GPU resources from the autoscaler. - - This function is to be called e.g. on a node before submitting a bunch of - ray.remote calls to ensure that resources rapidly become available. - - In the future this could be extended to do GPU cores or other custom - resources. - - This function is non blocking. 
- - Args: - - num_cpus: int -- the number of CPU cores to request - num_gpus: int -- the number of GPUs to request (Not implemented) - - """ - if num_gpus is not None: - raise NotImplementedError( - "GPU resource is not yet supported through request_resources") - r = services.create_redis_client( - global_worker.node.redis_address, - password=global_worker.node.redis_password) - if num_cpus > 0: - r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, - json.dumps({ - "CPU": num_cpus - })) +def request_resources(num_cpus=None, num_gpus=None): + raise DeprecationWarning( + "Please use ray.autoscaler.commands.request_resources instead.") diff --git a/python/ray/autoscaler/aws/example-auto-instance-type.yaml b/python/ray/autoscaler/aws/example-auto-instance-type.yaml new file mode 100644 index 000000000..5efebd185 --- /dev/null +++ b/python/ray/autoscaler/aws/example-auto-instance-type.yaml @@ -0,0 +1,47 @@ +# EXPERIMENTAL: an example of configuring a mixed-worker cluster. Currently +# multi-worker autoscaling only works if you use the request_resources() call. +cluster_name: auto_instance_type +min_workers: 1 +max_workers: 40 + +# Cloud-provider specific configuration. +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + +# Tell the autoscaler the allowed node types and the resources they provide. +# This only has an effect if you use the experimental request_resources() call. +available_instance_types: + m4.xlarge: + resources: {"CPU": 4} + max_workers: 10 + m4.4xlarge: + resources: {"CPU": 16} + max_workers: 10 + p2.xlarge: + resources: {"CPU": 4, "GPU": 1} + max_workers: 4 + p2.8xlarge: + resources: {"CPU": 32, "GPU": 8} + max_workers: 2 + +# Configure the cluster for very conservative auto-scaling otherwise. +target_utilization_fraction: 1.0 +idle_timeout_minutes: 2 + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu + +# Provider-specific config for the head node, e.g. instance type. 
+head_node: + InstanceType: m4.xlarge + ImageId: latest_dlami + +# Provider-specific config for the worker nodes, e.g. instance type. +# NOTE: the instance type can be overridden by the resource demand scheduler. +# The instance type set here is only used as the default fallback. +worker_nodes: + InstanceType: m4.xlarge + ImageId: latest_dlami diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index c19ec1ac5..6b30870bf 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -1,4 +1,5 @@ import random +import copy import threading from collections import defaultdict import logging @@ -9,7 +10,7 @@ from botocore.config import Config from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \ - TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_TYPE + TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_TYPE, TAG_RAY_INSTANCE_TYPE from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES from ray.autoscaler.log_timer import LogTimer @@ -184,7 +185,19 @@ class AWSNodeProvider(NodeProvider): self.tag_cache_update_event.set() + def create_node_of_type(self, node_config, tags, instance_type, count): + assert instance_type is not None + node_config["InstanceType"] = instance_type + return self.create_node(node_config, tags, count) + + def get_instance_type(self, node_config): + return node_config["InstanceType"] + def create_node(self, node_config, tags, count): + # Always add the instance type tag, since node reuse is unsafe + # otherwise. 
+ tags = copy.deepcopy(tags) + tags[TAG_RAY_INSTANCE_TYPE] = node_config["InstanceType"] # Try to reuse previously stopped nodes with compatible configs if self.cache_stopped_nodes: filters = [ @@ -200,6 +213,10 @@ class AWSNodeProvider(NodeProvider): "Name": "tag:{}".format(TAG_RAY_NODE_TYPE), "Values": [tags[TAG_RAY_NODE_TYPE]], }, + { + "Name": "tag:{}".format(TAG_RAY_INSTANCE_TYPE), + "Values": [tags[TAG_RAY_INSTANCE_TYPE]], + }, { "Name": "tag:{}".format(TAG_RAY_LAUNCH_CONFIG), "Values": [tags[TAG_RAY_LAUNCH_CONFIG]], diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index c491392cd..a419977cc 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -23,6 +23,7 @@ from ray.autoscaler.util import validate_config, hash_runtime_conf, \ from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD +from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler.updater import NodeUpdaterThread from ray.autoscaler.log_timer import LogTimer from ray.autoscaler.docker import with_docker_exec @@ -56,6 +57,30 @@ def debug_status(): return status +def request_resources(num_cpus=None, bundles=None): + """Remotely request some CPU or GPU resources from the autoscaler. + + This function is to be called e.g. on a node before submitting a bunch of + ray.remote calls to ensure that resources rapidly become available. + + This function is EXPERIMENTAL. + + Args: + num_cpus: int -- the number of CPU cores to request + bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This + only has an effect if you've configured `available_instance_types` + in your cluster config. 
+ """ + r = _redis() + if num_cpus is not None and num_cpus > 0: + r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, + json.dumps({ + "CPU": num_cpus + })) + if bundles: + r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(bundles)) + + def create_or_update_cluster(config_file, override_min_workers, override_max_workers, no_restart, restart_only, yes, override_cluster_name): diff --git a/python/ray/autoscaler/node_launcher.py b/python/ray/autoscaler/node_launcher.py index f23846a78..a2776e88b 100644 --- a/python/ray/autoscaler/node_launcher.py +++ b/python/ray/autoscaler/node_launcher.py @@ -3,7 +3,8 @@ import threading from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE, TAG_RAY_NODE_NAME, - STATUS_UNINITIALIZED, NODE_TYPE_WORKER) + TAG_RAY_INSTANCE_TYPE, STATUS_UNINITIALIZED, + NODE_TYPE_WORKER) from ray.autoscaler.util import hash_launch_conf logger = logging.getLogger(__name__) @@ -32,7 +33,7 @@ class NodeLauncher(threading.Thread): TAG_RAY_LAUNCH_CONFIG: launch_hash, } if instance_type: - # node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type + node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type self.provider.create_node_of_type(node_config, node_tags, instance_type, count) else: diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 15b6f8573..7aa67d36b 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -211,6 +211,22 @@ class NodeProvider: """Clean-up when a Provider is no longer required.""" pass + def create_node_of_type(self, node_config, tags, instance_type, count): + """Creates a number of nodes with a given instance type. + + This is an optional method only required if using the resource + demand scheduler. + """ + assert instance_type is not None + raise NotImplementedError + + def get_instance_type(self, node_config): + """Returns the instance type of this node config. 
+ + This is an optional method only required if using the resource + demand scheduler.""" + return None + def get_command_runner(self, log_prefix, node_id, diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index ca1d2ff07..9fc256bca 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -241,6 +241,24 @@ }, "no_restart": { "description": "Whether to avoid restarting the cluster during updates. This field is controlled by the ray --no-restart flag and cannot be set by the user." + }, + "available_instance_types": { + "type": "object", + "description": "A list of instance types available for launching with 'auto' worker type.", + "patternProperties": { + ".*": { + "type": "object", + "properties": { + "max_workers": {"type": "integer"}, + "resources": { + "type": "object", + ".*": {"type": "number"} + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false } } -} \ No newline at end of file +} diff --git a/python/ray/autoscaler/resource_demand_scheduler.py b/python/ray/autoscaler/resource_demand_scheduler.py new file mode 100644 index 000000000..354650c7e --- /dev/null +++ b/python/ray/autoscaler/resource_demand_scheduler.py @@ -0,0 +1,226 @@ +import copy +import numpy as np +import logging +import collections +from typing import List, Dict, Tuple + +from ray.autoscaler.node_provider import NodeProvider +from ray.autoscaler.tags import TAG_RAY_INSTANCE_TYPE + +logger = logging.getLogger(__name__) + +# e.g., m4.16xlarge. +InstanceType = str + +# e.g., {"resources": ..., "max_workers": ...}. +InstanceTypeConfigDict = str + +# e.g., {"GPU": 1}. +ResourceDict = str + +# e.g., IP address of the node. 
+NodeID = str + + +class ResourceDemandScheduler: + def __init__(self, provider: NodeProvider, + instance_types: Dict[InstanceType, InstanceTypeConfigDict], + max_workers: int): + self.provider = provider + self.instance_types = instance_types + self.max_workers = max_workers + + def debug_string(self, nodes: List[NodeID], + pending_nodes: Dict[NodeID, int]) -> str: + node_resources, instance_type_counts = self.calculate_node_resources( + nodes, pending_nodes) + + out = "Worker instance types:" + for instance_type, count in instance_type_counts.items(): + out += "\n - {}: {}".format(instance_type, count) + if pending_nodes.get(instance_type): + out += " ({} pending)".format(pending_nodes[instance_type]) + + return out + + def calculate_node_resources( + self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int] + ) -> (List[ResourceDict], Dict[InstanceType, int]): + """Returns node resource list and instance type counts.""" + + node_resources = [] + instance_type_counts = collections.defaultdict(int) + + def add_instance(instance_type): + if instance_type not in self.instance_types: + raise RuntimeError( + "Missing entry for instance_type {} in " + "available_instance_types config: {}".format( + instance_type, self.instance_types)) + # Careful not to include the same dict object multiple times. 
+ node_resources.append( + copy.deepcopy(self.instance_types[instance_type]["resources"])) + instance_type_counts[instance_type] += 1 + + for node_id in nodes: + tags = self.provider.node_tags(node_id) + if TAG_RAY_INSTANCE_TYPE in tags: + instance_type = tags[TAG_RAY_INSTANCE_TYPE] + add_instance(instance_type) + + for instance_type, count in pending_nodes.items(): + for _ in range(count): + add_instance(instance_type) + + return node_resources, instance_type_counts + + def get_instances_to_launch(self, nodes: List[NodeID], + pending_nodes: Dict[InstanceType, int], + resource_demands: List[ResourceDict] + ) -> List[Tuple[InstanceType, int]]: + """Get a list of instance types that should be added to the cluster. + + This method: + (1) calculates the resources present in the cluster. + (2) calculates the unfulfilled resource bundles. + (3) calculates which instances need to be launched to fulfill all + the bundle requests, subject to max_worker constraints. + """ + + if resource_demands is None: + logger.info("No resource demands") + return [] + + node_resources, instance_type_counts = self.calculate_node_resources( + nodes, pending_nodes) + logger.info("Cluster resources: {}".format(node_resources)) + logger.info("Instance counts: {}".format(instance_type_counts)) + + unfulfilled = get_bin_pack_residual(node_resources, resource_demands) + logger.info("Resource demands: {}".format(resource_demands)) + logger.info("Unfulfilled demands: {}".format(unfulfilled)) + + instances = get_instances_for( + self.instance_types, instance_type_counts, + self.max_workers - len(nodes), unfulfilled) + logger.info("Instance requests: {}".format(instances)) + return instances + + +def get_instances_for( + instance_types: Dict[InstanceType, InstanceTypeConfigDict], + existing_instances: Dict[InstanceType, int], max_to_add: int, + resources: List[ResourceDict]) -> List[Tuple[InstanceType, int]]: + """Determine instances to add given resource demands and constraints. 
+ + Args: + instance_types: instance types config. + existing_instances: counts of existing instances already launched. + This sets constraints on the number of new instances to add. + max_to_add: global constraint on instances to add. + resources: resource demands to fulfill. + + Returns: + List of instances types and count to add. + """ + instances_to_add = collections.defaultdict(int) + allocated_resources = [] + + while resources and sum(instances_to_add.values()) < max_to_add: + utilization_scores = [] + for instance_type in instance_types: + if (existing_instances.get( + instance_type, 0) + instances_to_add.get(instance_type, 0) + >= instance_types[instance_type]["max_workers"]): + continue + node_resources = instance_types[instance_type]["resources"] + score = _utilization_score(node_resources, resources) + if score is not None: + utilization_scores.append((score, instance_type)) + + # Give up, no feasible node. + if not utilization_scores: + logger.info("No feasible instance to add for {}".format(resources)) + break + + utilization_scores = sorted(utilization_scores, reverse=True) + best_instance_type = utilization_scores[0][1] + instances_to_add[best_instance_type] += 1 + allocated_resources.append( + instance_types[best_instance_type]["resources"]) + residual = get_bin_pack_residual(allocated_resources[-1:], resources) + assert len(residual) < len(resources), (resources, residual) + resources = residual + + return list(instances_to_add.items()) + + +def _utilization_score(node_resources: ResourceDict, + resources: ResourceDict) -> float: + remaining = copy.deepcopy(node_resources) + + fittable = [] + for r in resources: + if _fits(remaining, r): + fittable.append(r) + _inplace_subtract(remaining, r) + if not fittable: + return None + + util_by_resources = [] + for k, v in node_resources.items(): + util = (v - remaining[k]) / v + util_by_resources.append(v * (util**3)) + + # Prioritize using all resources first, then prioritize overall balance + # of 
multiple resources. + return (min(util_by_resources), np.mean(util_by_resources)) + + +def get_bin_pack_residual( + node_resources: List[ResourceDict], + resource_demands: List[ResourceDict]) -> List[ResourceDict]: + """Return a subset of resource_demands that cannot fit in the cluster. + + TODO(ekl): this currently does not guarantee the resources will be packed + correctly by the Ray scheduler. This is only possible once the Ray backend + supports a placement groups API. + + Args: + node_resources (List[ResourceDict]): List of resources per node. + resource_demands (List[ResourceDict]): List of resource bundles that + need to be bin packed onto the nodes. + + Returns: + List[ResourceDict] the residual list resources that do not fit. + """ + + unfulfilled = [] + + # A most naive bin packing algorithm. + nodes = copy.deepcopy(node_resources) + for demand in resource_demands: + found = False + for node in nodes: + if _fits(node, demand): + _inplace_subtract(node, demand) + found = True + break + if not found: + unfulfilled.append(demand) + + return unfulfilled + + +def _fits(node: ResourceDict, resources: ResourceDict) -> bool: + for k, v in resources.items(): + if v > node.get(k, 0.0): + return False + return True + + +def _inplace_subtract(node: ResourceDict, resources: ResourceDict) -> None: + for k, v in resources.items(): + assert k in node, (k, node) + node[k] -= v + assert node[k] >= 0.0, (node, k, v) diff --git a/python/ray/autoscaler/tags.py b/python/ray/autoscaler/tags.py index fca372281..6a49e9e9b 100644 --- a/python/ray/autoscaler/tags.py +++ b/python/ray/autoscaler/tags.py @@ -8,6 +8,10 @@ TAG_RAY_NODE_TYPE = "ray-node-type" NODE_TYPE_HEAD = "head" NODE_TYPE_WORKER = "worker" +# Tag for the provider-specific instance type (e.g., m4.4xlarge). This is used +# for automatic worker instance type selection. +TAG_RAY_INSTANCE_TYPE = "ray-instance-type" + # Tag that reports the current state of the node (e.g. 
Updating, Up-to-date) TAG_RAY_NODE_STATUS = "ray-node-status" STATUS_UNINITIALIZED = "uninitialized" diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index dd6b58ea4..32c5f2f17 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -76,13 +76,13 @@ class KubernetesCommandRunner: port_forward_cmd) + " failed with error: " + perr raise Exception(exception_str) else: - logger.info(self.log_prefix + "Running {}...".format(cmd)) final_cmd = self.kubectl + ["exec", "-it"] final_cmd += [ self.node_id, "--", ] final_cmd += with_interactive(cmd) + logger.info(self.log_prefix + "Running {}".format(final_cmd)) try: if with_output: return self.process_runner.check_output( diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 5cca474af..0f03146b7 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -21,12 +21,13 @@ import pytest class MockNode: - def __init__(self, node_id, tags): + def __init__(self, node_id, tags, instance_type=None): self.node_id = node_id self.state = "pending" self.tags = tags self.external_ip = "1.2.3.4" self.internal_ip = "172.0.0.{}".format(self.node_id) + self.instance_type = instance_type def matches(self, tags): for k, v in tags.items(): @@ -119,7 +120,7 @@ class MockProvider(NodeProvider): def external_ip(self, node_id): return self.mock_nodes[node_id].external_ip - def create_node(self, node_config, tags, count): + def create_node(self, node_config, tags, count, instance_type=None): self.ready_to_create.wait() if self.fail_creates: return @@ -130,9 +131,17 @@ class MockProvider(NodeProvider): node.state = "pending" node.tags.update(tags) for _ in range(count): - self.mock_nodes[self.next_id] = MockNode(self.next_id, tags.copy()) + self.mock_nodes[self.next_id] = MockNode(self.next_id, tags.copy(), + instance_type) self.next_id += 1 + def create_node_of_type(self, node_config, tags, instance_type, count): + 
return self.create_node( + node_config, tags, count, instance_type=instance_type) + + def get_instance_type(self, node_config): + return "m4.large" + def set_node_tags(self, node_id, tags): self.mock_nodes[node_id].tags.update(tags) diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py new file mode 100644 index 000000000..6c4d84069 --- /dev/null +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -0,0 +1,231 @@ +import pytest +import time +import yaml +import tempfile +import shutil +import unittest + +import ray +from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \ + MockProcessRunner +from ray.autoscaler.autoscaler import StandardAutoscaler +from ray.autoscaler.load_metrics import LoadMetrics +from ray.autoscaler.node_provider import NODE_PROVIDERS +from ray.autoscaler.resource_demand_scheduler import _utilization_score, \ + get_bin_pack_residual, get_instances_for + +TYPES_A = { + "m4.large": { + "resources": { + "CPU": 2 + }, + "max_workers": 10, + }, + "m4.4xlarge": { + "resources": { + "CPU": 16 + }, + "max_workers": 8, + }, + "m4.16xlarge": { + "resources": { + "CPU": 64 + }, + "max_workers": 4, + }, + "p2.xlarge": { + "resources": { + "CPU": 16, + "GPU": 1 + }, + "max_workers": 10, + }, + "p2.8xlarge": { + "resources": { + "CPU": 32, + "GPU": 8 + }, + "max_workers": 4, + }, +} + +MULTI_WORKER_CLUSTER = dict(SMALL_CLUSTER, **{ + "available_instance_types": TYPES_A, +}) + + +def test_util_score(): + assert _utilization_score({"CPU": 64}, [{"TPU": 16}]) is None + assert _utilization_score({"GPU": 4}, [{"GPU": 2}]) == (0.5, 0.5) + assert _utilization_score({"GPU": 4}, [{"GPU": 1}, {"GPU": 1}]) == \ + (0.5, 0.5) + assert _utilization_score({"GPU": 2}, [{"GPU": 2}]) == (2, 2) + assert _utilization_score({"GPU": 2}, [{"GPU": 1}, {"GPU": 1}]) == (2, 2) + assert _utilization_score({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == (0, 1) + assert _utilization_score({"CPU": 64}, [{"CPU": 
64}]) == (64, 64) + assert _utilization_score({"CPU": 64}, [{"CPU": 32}]) == (8, 8) + assert _utilization_score({"CPU": 64}, [{"CPU": 16}, {"CPU": 16}]) == \ + (8, 8) + + +def test_bin_pack(): + assert get_bin_pack_residual([], [{"GPU": 2}, {"GPU": 2}]) == \ + [{"GPU": 2}, {"GPU": 2}] + assert get_bin_pack_residual([{"GPU": 2}], [{"GPU": 2}, {"GPU": 2}]) == \ + [{"GPU": 2}] + assert get_bin_pack_residual([{"GPU": 4}], [{"GPU": 2}, {"GPU": 2}]) == [] + arg = [{"GPU": 2}, {"GPU": 2, "CPU": 2}] + assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}]) == [] + arg = [{"CPU": 2}, {"GPU": 2}] + assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}]) == [{"GPU": 2}] + + +def test_get_instances_packing_heuristic(): + assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \ + [("p2.8xlarge", 1)] + assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \ + [("p2.8xlarge", 1)] + assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \ + [("p2.xlarge", 4)] + assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \ + == [("p2.8xlarge", 3)] + assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \ + == [] + assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \ + [("m4.16xlarge", 3)] + assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \ + == [("m4.16xlarge", 1), ("m4.large", 1)] + assert get_instances_for( + TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 9}, {"CPU": 9}]) == \ + [("m4.16xlarge", 1), ("m4.4xlarge", 2)] + assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \ + [("m4.16xlarge", 1), ("m4.4xlarge", 1)] + assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \ + [("m4.16xlarge", 1), ("m4.4xlarge", 1)] + assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \ + [("m4.16xlarge", 1), ("m4.4xlarge", 2), ("m4.large", 2)] + assert get_instances_for( + TYPES_A, {}, 9999, [{"GPU": 1}] + ([{"CPU": 1}] * 64)) == \ + [("m4.16xlarge", 1), 
("p2.xlarge", 1)] + assert get_instances_for( + TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \ + [("m4.16xlarge", 1), ("p2.8xlarge", 1)] + + +def test_get_instances_respects_max_limit(): + types = { + "m4.large": { + "resources": { + "CPU": 2 + }, + "max_workers": 10, + }, + "gpu": { + "resources": { + "GPU": 1 + }, + "max_workers": 99999, + }, + } + assert get_instances_for(types, {}, 2, [{"CPU": 1}] * 10) == \ + [("m4.large", 2)] + assert get_instances_for(types, {"m4.large": 9999}, 9999, [{ + "CPU": 1 + }] * 10) == [] + assert get_instances_for(types, {"m4.large": 0}, 9999, [{ + "CPU": 1 + }] * 10) == [("m4.large", 5)] + assert get_instances_for(types, {"m4.large": 7}, 4, [{ + "CPU": 1 + }] * 10) == [("m4.large", 3)] + assert get_instances_for(types, {"m4.large": 7}, 2, [{ + "CPU": 1 + }] * 10) == [("m4.large", 2)] + + +class AutoscalingTest(unittest.TestCase): + def setUp(self): + NODE_PROVIDERS["mock"] = \ + lambda: (None, self.create_provider) + self.provider = None + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + del NODE_PROVIDERS["mock"] + shutil.rmtree(self.tmpdir) + ray.shutdown() + + def waitForNodes(self, expected, comparison=None, tag_filters={}): + MAX_ITER = 50 + for i in range(MAX_ITER): + n = len(self.provider.non_terminated_nodes(tag_filters)) + if comparison is None: + comparison = self.assertEqual + try: + comparison(n, expected) + return + except Exception: + if i == MAX_ITER - 1: + raise + time.sleep(.1) + + def create_provider(self, config, cluster_name): + assert self.provider + return self.provider + + def write_config(self, config): + path = self.tmpdir + "/simple.yaml" + with open(path, "w") as f: + f.write(yaml.dump(config)) + return path + + def testScaleUpMinSanity(self): + config_path = self.write_config(MULTI_WORKER_CLUSTER) + self.provider = MockProvider() + runner = MockProcessRunner() + autoscaler = StandardAutoscaler( + config_path, + LoadMetrics(), + max_failures=0, + process_runner=runner, + 
update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 0 + autoscaler.update() + self.waitForNodes(2) + autoscaler.update() + self.waitForNodes(2) + + def testRequestBundles(self): + config = MULTI_WORKER_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 50 + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + autoscaler = StandardAutoscaler( + config_path, + LoadMetrics(), + max_failures=0, + process_runner=runner, + update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 0 + autoscaler.update() + self.waitForNodes(0) + autoscaler.request_resources([{"CPU": 1}]) + autoscaler.update() + self.waitForNodes(1) + assert self.provider.mock_nodes[0].instance_type == "m4.large" + autoscaler.request_resources([{"GPU": 8}]) + autoscaler.update() + self.waitForNodes(2) + assert self.provider.mock_nodes[1].instance_type == "p2.8xlarge" + autoscaler.request_resources([{"CPU": 32}] * 4) + autoscaler.update() + self.waitForNodes(4) + assert self.provider.mock_nodes[2].instance_type == "m4.16xlarge" + assert self.provider.mock_nodes[3].instance_type == "m4.16xlarge" + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__]))