mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 00:52:10 +08:00
[autoscaler] Initial support for multiple worker types (#9096)
* wip * fix * update * debug state * fix * update * update * fix test * fix * fix * update * fix * types and docs * update
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
from collections import defaultdict
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import numpy as np
|
||||
@@ -18,15 +17,13 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
|
||||
STATUS_UP_TO_DATE, NODE_TYPE_WORKER)
|
||||
from ray.autoscaler.updater import NodeUpdaterThread
|
||||
from ray.autoscaler.node_launcher import NodeLauncher
|
||||
from ray.autoscaler.resource_demand_scheduler import ResourceDemandScheduler
|
||||
from ray.autoscaler.util import ConcurrentCounter, validate_config, \
|
||||
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
|
||||
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
|
||||
from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \
|
||||
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
|
||||
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \
|
||||
AUTOSCALER_RESOURCE_REQUEST_CHANNEL
|
||||
import ray.services as services
|
||||
from ray.worker import global_worker
|
||||
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S
|
||||
from six.moves import queue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -64,6 +61,15 @@ class StandardAutoscaler:
|
||||
self.provider = get_node_provider(self.config["provider"],
|
||||
self.config["cluster_name"])
|
||||
|
||||
# Check whether we can enable the resource demand scheduler.
|
||||
if "available_instance_types" in self.config:
|
||||
self.instance_types = self.config["available_instance_types"]
|
||||
self.resource_demand_scheduler = ResourceDemandScheduler(
|
||||
self.provider, self.instance_types, self.config["max_workers"])
|
||||
else:
|
||||
self.instance_types = None
|
||||
self.resource_demand_scheduler = None
|
||||
|
||||
self.max_failures = max_failures
|
||||
self.max_launch_batch = max_launch_batch
|
||||
self.max_concurrent_launches = max_concurrent_launches
|
||||
@@ -103,7 +109,10 @@ class StandardAutoscaler:
|
||||
for local_path in self.config["file_mounts"].values():
|
||||
assert os.path.exists(local_path)
|
||||
|
||||
# Aggregate resources the user is requesting of the cluster.
|
||||
self.resource_requests = defaultdict(int)
|
||||
# List of resource bundles the user is requesting of the cluster.
|
||||
self.resource_demand_vector = None
|
||||
|
||||
logger.info("StandardAutoscaler: {}".format(self.config))
|
||||
|
||||
@@ -178,6 +187,17 @@ class StandardAutoscaler:
|
||||
nodes = self.workers()
|
||||
self.log_info_string(nodes, target_workers)
|
||||
|
||||
# First let the resource demand scheduler launch nodes, if enabled.
|
||||
if self.resource_demand_scheduler and self.resource_demand_vector:
|
||||
# TODO(ekl) include head node in the node list
|
||||
instances = (
|
||||
self.resource_demand_scheduler.get_instances_to_launch(
|
||||
nodes, self.pending_launches.breakdown(),
|
||||
self.resource_demand_vector))
|
||||
# TODO(ekl) also enforce max launch concurrency here?
|
||||
for instance_type, count in instances:
|
||||
self.launch_new_node(count, instance_type=instance_type)
|
||||
|
||||
# Launch additional nodes of the default type, if still needed.
|
||||
num_pending = self.pending_launches.value
|
||||
num_workers = len(nodes) + num_pending
|
||||
@@ -383,6 +403,11 @@ class StandardAutoscaler:
|
||||
def launch_new_node(self, count, instance_type):
|
||||
logger.info(
|
||||
"StandardAutoscaler: Queue {} new nodes for launch".format(count))
|
||||
# Try to fill in the default instance type so we can tag it properly.
|
||||
if not instance_type:
|
||||
instance_type = self.provider.get_instance_type(
|
||||
self.config["worker_nodes"])
|
||||
assert instance_type is not None
|
||||
self.pending_launches.inc(instance_type, count)
|
||||
config = copy.deepcopy(self.config)
|
||||
self.launch_queue.put((config, count, instance_type))
|
||||
@@ -397,6 +422,9 @@ class StandardAutoscaler:
|
||||
tmp += "\n"
|
||||
tmp += self.load_metrics.info_string()
|
||||
tmp += "\n"
|
||||
if self.resource_demand_scheduler:
|
||||
tmp += self.resource_demand_scheduler.debug_string(
|
||||
nodes, self.pending_launches.breakdown())
|
||||
if _internal_kv_initialized():
|
||||
_internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True)
|
||||
logger.info(tmp)
|
||||
@@ -416,12 +444,20 @@ class StandardAutoscaler:
|
||||
return "{}/{} target nodes{}".format(len(nodes), target, suffix)
|
||||
|
||||
def request_resources(self, resources):
    """Called by monitor to request resources (EXPERIMENTAL).

    Args:
        resources: Either a list of resource bundles or a single resource
            demand dictionary.
    """
    logger.info(
        "StandardAutoscaler: resource_requests={}".format(resources))
    if isinstance(resources, list):
        # A list is a vector of resource bundles; it replaces the previously
        # stored vector. It must not reach the dict branch below, because a
        # list has no .items().
        self.resource_demand_vector = resources
    else:
        # A dict is an aggregate demand; keep the largest count seen so far
        # for every resource name.
        for resource, count in resources.items():
            self.resource_requests[resource] = max(
                self.resource_requests[resource], count)
|
||||
|
||||
def kill_workers(self):
|
||||
logger.error("StandardAutoscaler: kill_workers triggered")
|
||||
@@ -432,32 +468,6 @@ class StandardAutoscaler:
|
||||
len(nodes)))
|
||||
|
||||
|
||||
# Note: this is an (experimental) user-facing API, do not move.
def request_resources(num_cpus=None, num_gpus=None, bundles=None):
    """Deprecated entry point that always raises.

    All arguments are accepted only so that existing call sites (including
    ones that pass ``bundles``) fail with the deprecation error below rather
    than with a ``TypeError`` about an unexpected keyword.

    Args:
        num_cpus: ignored.
        num_gpus: ignored.
        bundles: ignored.

    Raises:
        DeprecationWarning: always.
    """
    raise DeprecationWarning(
        "Please use ray.autoscaler.commands.request_resources instead.")
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
# EXPERIMENTAL: an example of configuring a mixed-worker cluster. Currently
|
||||
# multi-worker autoscaling only works if you use the request_resources() call.
|
||||
cluster_name: auto_instance_type
|
||||
min_workers: 1
|
||||
max_workers: 40
|
||||
|
||||
# Cloud-provider specific configuration.
|
||||
provider:
|
||||
type: aws
|
||||
region: us-west-2
|
||||
availability_zone: us-west-2a
|
||||
|
||||
# Tell the autoscaler the allowed node types and the resources they provide.
|
||||
# This only has an effect if you use the experimental request_resources() call.
|
||||
available_instance_types:
|
||||
m4.xlarge:
|
||||
resources: {"CPU": 4}
|
||||
max_workers: 10
|
||||
m4.4xlarge:
|
||||
resources: {"CPU": 16}
|
||||
max_workers: 10
|
||||
p2.xlarge:
|
||||
resources: {"CPU": 4, "GPU": 1}
|
||||
max_workers: 4
|
||||
p2.8xlarge:
|
||||
resources: {"CPU": 32, "GPU": 8}
|
||||
max_workers: 2
|
||||
|
||||
# Configure the cluster for very conservative auto-scaling otherwise.
|
||||
target_utilization_fraction: 1.0
|
||||
idle_timeout_minutes: 2
|
||||
|
||||
# How Ray will authenticate with newly launched nodes.
|
||||
auth:
|
||||
ssh_user: ubuntu
|
||||
|
||||
# Provider-specific config for the head node, e.g. instance type.
|
||||
head_node:
|
||||
InstanceType: m4.xlarge
|
||||
ImageId: latest_dlami
|
||||
|
||||
# Provider-specific config for the worker nodes, e.g. instance type.
|
||||
# NOTE: the instance type can be overridden by the resource demand scheduler.
|
||||
# The instance type set here is only used as the default fallback.
|
||||
worker_nodes:
|
||||
InstanceType: m4.xlarge
|
||||
ImageId: latest_dlami
|
||||
@@ -1,4 +1,5 @@
|
||||
import random
|
||||
import copy
|
||||
import threading
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
@@ -9,7 +10,7 @@ from botocore.config import Config
|
||||
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \
|
||||
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_TYPE
|
||||
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_TYPE, TAG_RAY_INSTANCE_TYPE
|
||||
from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
|
||||
@@ -184,7 +185,19 @@ class AWSNodeProvider(NodeProvider):
|
||||
|
||||
self.tag_cache_update_event.set()
|
||||
|
||||
def create_node_of_type(self, node_config, tags, instance_type, count):
    """Create ``count`` nodes, forcing their instance type.

    Args:
        node_config: provider node config; its "InstanceType" entry is
            replaced by ``instance_type`` for this launch only.
        tags: tags passed through to ``create_node``.
        instance_type: instance type to launch; must not be None.
        count: number of nodes, passed through to ``create_node``.

    Returns:
        Whatever ``create_node`` returns.
    """
    assert instance_type is not None
    # Override on a shallow copy so the caller's config dict is not mutated.
    node_config = dict(node_config, InstanceType=instance_type)
    return self.create_node(node_config, tags, count)
|
||||
|
||||
def get_instance_type(self, node_config):
    """Return the "InstanceType" entry of ``node_config``.

    Raises:
        KeyError: if ``node_config`` has no "InstanceType" key.
    """
    instance_type = node_config["InstanceType"]
    return instance_type
|
||||
|
||||
def create_node(self, node_config, tags, count):
|
||||
# Always add the instance type tag, since node reuse is unsafe
|
||||
# otherwise.
|
||||
tags = copy.deepcopy(tags)
|
||||
tags[TAG_RAY_INSTANCE_TYPE] = node_config["InstanceType"]
|
||||
# Try to reuse previously stopped nodes with compatible configs
|
||||
if self.cache_stopped_nodes:
|
||||
filters = [
|
||||
@@ -200,6 +213,10 @@ class AWSNodeProvider(NodeProvider):
|
||||
"Name": "tag:{}".format(TAG_RAY_NODE_TYPE),
|
||||
"Values": [tags[TAG_RAY_NODE_TYPE]],
|
||||
},
|
||||
{
|
||||
"Name": "tag:{}".format(TAG_RAY_INSTANCE_TYPE),
|
||||
"Values": [tags[TAG_RAY_INSTANCE_TYPE]],
|
||||
},
|
||||
{
|
||||
"Name": "tag:{}".format(TAG_RAY_LAUNCH_CONFIG),
|
||||
"Values": [tags[TAG_RAY_LAUNCH_CONFIG]],
|
||||
|
||||
@@ -23,6 +23,7 @@ from ray.autoscaler.util import validate_config, hash_runtime_conf, \
|
||||
from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS
|
||||
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
|
||||
TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD
|
||||
from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL
|
||||
from ray.autoscaler.updater import NodeUpdaterThread
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
from ray.autoscaler.docker import with_docker_exec
|
||||
@@ -56,6 +57,30 @@ def debug_status():
|
||||
return status
|
||||
|
||||
|
||||
def request_resources(num_cpus=None, bundles=None):
    """Remotely request some CPU or GPU resources from the autoscaler.

    Call this e.g. on a node before submitting a batch of ray.remote calls
    so that resources rapidly become available.

    This function is EXPERIMENTAL.

    Args:
        num_cpus: int -- the number of CPU cores to request. Nothing is
            published for None or a non-positive value.
        bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}).
            This only has an effect if you've configured
            `available_instance_types` in your cluster config.
    """
    client = _redis()
    wants_cpus = num_cpus is not None and num_cpus > 0
    if wants_cpus:
        # Aggregate CPU demand is sent as a single-key JSON object.
        cpu_message = json.dumps({"CPU": num_cpus})
        client.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, cpu_message)
    if bundles:
        # Bundles are sent as a JSON list on the same channel.
        bundle_message = json.dumps(bundles)
        client.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, bundle_message)
|
||||
|
||||
|
||||
def create_or_update_cluster(config_file, override_min_workers,
|
||||
override_max_workers, no_restart, restart_only,
|
||||
yes, override_cluster_name):
|
||||
|
||||
@@ -3,7 +3,8 @@ import threading
|
||||
|
||||
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS,
|
||||
TAG_RAY_NODE_TYPE, TAG_RAY_NODE_NAME,
|
||||
STATUS_UNINITIALIZED, NODE_TYPE_WORKER)
|
||||
TAG_RAY_INSTANCE_TYPE, STATUS_UNINITIALIZED,
|
||||
NODE_TYPE_WORKER)
|
||||
from ray.autoscaler.util import hash_launch_conf
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -32,7 +33,7 @@ class NodeLauncher(threading.Thread):
|
||||
TAG_RAY_LAUNCH_CONFIG: launch_hash,
|
||||
}
|
||||
if instance_type:
|
||||
# node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type
|
||||
node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type
|
||||
self.provider.create_node_of_type(node_config, node_tags,
|
||||
instance_type, count)
|
||||
else:
|
||||
|
||||
@@ -211,6 +211,22 @@ class NodeProvider:
|
||||
"""Clean-up when a Provider is no longer required."""
|
||||
pass
|
||||
|
||||
def create_node_of_type(self, node_config, tags, instance_type, count):
    """Create ``count`` nodes of the given instance type.

    Optional hook: providers only need to override it when the resource
    demand scheduler is in use. This base version never creates anything.

    Raises:
        NotImplementedError: always, once ``instance_type`` is not None.
    """
    assert instance_type is not None
    raise NotImplementedError
|
||||
|
||||
def get_instance_type(self, node_config):
    """Return the instance type of this node config.

    Optional hook: providers only need to override it when the resource
    demand scheduler is in use. This base version ignores ``node_config``
    and returns None.
    """
    return None
|
||||
|
||||
def get_command_runner(self,
|
||||
log_prefix,
|
||||
node_id,
|
||||
|
||||
@@ -241,6 +241,24 @@
|
||||
},
|
||||
"no_restart": {
|
||||
"description": "Whether to avoid restarting the cluster during updates. This field is controlled by the ray --no-restart flag and cannot be set by the user."
|
||||
},
|
||||
"available_instance_types": {
    "type": "object",
    "description": "A list of instance types available for launching with 'auto' worker type.",
    "patternProperties": {
        ".*": {
            "type": "object",
            "properties": {
                "max_workers": {"type": "integer"},
                "resources": {
                    "type": "object",
                    "patternProperties": {
                        ".*": {"type": "number"}
                    }
                }
            },
            "additionalProperties": false
        }
    },
    "additionalProperties": false
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,226 @@
|
||||
import collections
import copy
import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_INSTANCE_TYPE
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Name of a provider instance type, e.g., m4.16xlarge.
InstanceType = str

# Per-instance-type config, e.g., {"resources": ..., "max_workers": ...}.
InstanceTypeConfigDict = Dict[str, Any]

# Resource name -> quantity, e.g., {"GPU": 1}.
ResourceDict = Dict[str, float]

# e.g., IP address of the node.
NodeID = str
|
||||
|
||||
|
||||
class ResourceDemandScheduler:
    """Picks worker instance types to launch for outstanding resource demands.

    Attributes (all set from the constructor arguments):
        provider: node provider, queried for the tags of running nodes.
        instance_types: mapping of instance type name to its config; the
            "resources" entry of each config is read here.
        max_workers: global cap on the number of workers.
    """

    def __init__(self, provider: NodeProvider,
                 instance_types: Dict[InstanceType, InstanceTypeConfigDict],
                 max_workers: int):
        self.provider = provider
        self.instance_types = instance_types
        self.max_workers = max_workers

    def debug_string(self, nodes: List[NodeID],
                     pending_nodes: Dict[InstanceType, int]) -> str:
        """Return a multi-line summary of worker counts per instance type.

        Counts cover running and pending nodes; types with pending launches
        additionally get a "(N pending)" suffix.
        """
        _, instance_type_counts = self.calculate_node_resources(
            nodes, pending_nodes)

        out = "Worker instance types:"
        for instance_type, count in instance_type_counts.items():
            out += "\n - {}: {}".format(instance_type, count)
            if pending_nodes.get(instance_type):
                out += " ({} pending)".format(pending_nodes[instance_type])

        return out

    def calculate_node_resources(
            self, nodes: List[NodeID], pending_nodes: Dict[InstanceType, int]
    ) -> Tuple[List[ResourceDict], Dict[InstanceType, int]]:
        """Returns node resource list and instance type counts.

        Running nodes without an instance type tag are ignored.

        Raises:
            RuntimeError: if a running or pending node has an instance type
                that is missing from the configured instance types.
        """

        node_resources = []
        instance_type_counts = collections.defaultdict(int)

        def add_instance(instance_type):
            if instance_type not in self.instance_types:
                raise RuntimeError(
                    "Missing entry for instance_type {} in "
                    "available_instance_types config: {}".format(
                        instance_type, self.instance_types))
            # Careful not to include the same dict object multiple times.
            node_resources.append(
                copy.deepcopy(self.instance_types[instance_type]["resources"]))
            instance_type_counts[instance_type] += 1

        for node_id in nodes:
            tags = self.provider.node_tags(node_id)
            if TAG_RAY_INSTANCE_TYPE in tags:
                instance_type = tags[TAG_RAY_INSTANCE_TYPE]
                add_instance(instance_type)

        for instance_type, count in pending_nodes.items():
            for _ in range(count):
                add_instance(instance_type)

        return node_resources, instance_type_counts

    def get_instances_to_launch(self, nodes: List[NodeID],
                                pending_nodes: Dict[InstanceType, int],
                                resource_demands: List[ResourceDict]
                                ) -> List[Tuple[InstanceType, int]]:
        """Get a list of instance types that should be added to the cluster.

        This method:
            (1) calculates the resources present in the cluster.
            (2) calculates the unfulfilled resource bundles.
            (3) calculates which instances need to be launched to fulfill all
                the bundle requests, subject to max_worker constraints.

        Returns:
            List of (instance type, count) pairs; empty when
            ``resource_demands`` is None.
        """

        if resource_demands is None:
            logger.info("No resource demands")
            return []

        node_resources, instance_type_counts = self.calculate_node_resources(
            nodes, pending_nodes)
        logger.info("Cluster resources: {}".format(node_resources))
        logger.info("Instance counts: {}".format(instance_type_counts))

        unfulfilled = get_bin_pack_residual(node_resources, resource_demands)
        logger.info("Resource demands: {}".format(resource_demands))
        logger.info("Unfulfilled demands: {}".format(unfulfilled))

        # Pending launches count against the global limit just like running
        # nodes; ignoring them would allow overshooting max_workers.
        max_to_add = (
            self.max_workers - len(nodes) - sum(pending_nodes.values()))
        instances = get_instances_for(self.instance_types,
                                      instance_type_counts, max_to_add,
                                      unfulfilled)
        logger.info("Instance requests: {}".format(instances))
        return instances
|
||||
|
||||
|
||||
def get_instances_for(
        instance_types: Dict[InstanceType, InstanceTypeConfigDict],
        existing_instances: Dict[InstanceType, int], max_to_add: int,
        resources: List[ResourceDict]) -> List[Tuple[InstanceType, int]]:
    """Greedily choose instances to launch for the given resource demands.

    Each round scores every instance type that is still below its own
    "max_workers" limit, adds one instance of the best-scoring type, and
    drops the demands that fit onto that single new instance. The loop ends
    when no demands remain, ``max_to_add`` instances were chosen, or no
    instance type can host any remaining demand.

    Args:
        instance_types: instance types config.
        existing_instances: counts of instances already launched, used for
            the per-type limit.
        max_to_add: global constraint on instances to add.
        resources: resource demands to fulfill.

    Returns:
        List of (instance type, count) pairs to add.
    """
    to_launch = collections.defaultdict(int)
    outstanding = resources

    while outstanding and sum(to_launch.values()) < max_to_add:
        candidates = []
        for type_name, type_config in instance_types.items():
            already_planned = (existing_instances.get(type_name, 0) +
                               to_launch.get(type_name, 0))
            if already_planned >= type_config["max_workers"]:
                # This type is at its per-type cap.
                continue
            score = _utilization_score(type_config["resources"], outstanding)
            if score is not None:
                candidates.append((score, type_name))

        if not candidates:
            # Give up, no feasible node.
            logger.info(
                "No feasible instance to add for {}".format(outstanding))
            break

        # Highest (score, name) tuple wins, as with a descending sort.
        _, chosen_type = max(candidates)
        to_launch[chosen_type] += 1
        chosen_resources = instance_types[chosen_type]["resources"]
        residual = get_bin_pack_residual([chosen_resources], outstanding)
        # The chosen type scored, so at least one demand must have fit.
        assert len(residual) < len(outstanding), (outstanding, residual)
        outstanding = residual

    return list(to_launch.items())
|
||||
|
||||
|
||||
def _utilization_score(node_resources: ResourceDict,
                       resources: List[ResourceDict]
                       ) -> Optional[Tuple[float, float]]:
    """Score how well a node of the given shape would be used by demands.

    Demands are packed first-fit, in order, onto a copy of
    ``node_resources``. Each resource with positive capacity ``v`` and
    utilization ``u`` contributes ``v * u**3``.

    Args:
        node_resources: capacity of one node.
        resources: list of resource demand bundles.

    Returns:
        ``(min, mean)`` of the per-resource contributions, or None when no
        demand fits or the node has no resource with positive capacity.
    """
    remaining = copy.deepcopy(node_resources)

    any_fits = False
    for demand in resources:
        if _fits(remaining, demand):
            any_fits = True
            _inplace_subtract(remaining, demand)
    if not any_fits:
        return None

    util_by_resources = []
    for k, v in node_resources.items():
        if v <= 0:
            # A zero-capacity resource cannot be utilized; skipping it also
            # avoids dividing by zero.
            continue
        util = (v - remaining[k]) / v
        util_by_resources.append(v * (util**3))

    # Happens when node_resources holds only zero-capacity entries.
    if not util_by_resources:
        return None

    # Prioritize using all resources first, then prioritize overall balance
    # of multiple resources.
    return (min(util_by_resources), np.mean(util_by_resources))
|
||||
|
||||
|
||||
def get_bin_pack_residual(
        node_resources: List[ResourceDict],
        resource_demands: List[ResourceDict]) -> List[ResourceDict]:
    """Return a subset of resource_demands that cannot fit in the cluster.

    Demands are placed first-fit, in the order given, onto a private copy of
    the node capacities; the inputs are left untouched.

    TODO(ekl): this currently does not guarantee the resources will be packed
    correctly by the Ray scheduler. This is only possible once the Ray backend
    supports a placement groups API.

    Args:
        node_resources (List[ResourceDict]): List of resources per node.
        resource_demands (List[ResourceDict]): List of resource bundles that
            need to be bin packed onto the nodes.

    Returns:
        List[ResourceDict] the residual list resources that do not fit.
    """
    leftover = []
    free_capacity = copy.deepcopy(node_resources)

    # A most naive bin packing algorithm.
    for demand in resource_demands:
        for capacity in free_capacity:
            if _fits(capacity, demand):
                _inplace_subtract(capacity, demand)
                break
        else:
            # No node had room for this demand.
            leftover.append(demand)

    return leftover
|
||||
|
||||
|
||||
def _fits(node: ResourceDict, resources: ResourceDict) -> bool:
    """Return True if no requested amount exceeds what ``node`` offers.

    A resource missing from ``node`` counts as 0.0 available.
    """
    return not any(
        amount > node.get(name, 0.0) for name, amount in resources.items())
|
||||
|
||||
|
||||
def _inplace_subtract(node: ResourceDict, resources: ResourceDict) -> None:
    """Subtract ``resources`` from ``node``, mutating ``node`` in place.

    Callers are expected to have checked ``_fits(node, resources)`` first;
    the assertions guard that invariant.
    """
    for k, v in resources.items():
        if v == 0 and k not in node:
            # _fits() accepts a zero request for a resource the node lacks;
            # there is nothing to subtract, so don't trip the assertion.
            continue
        assert k in node, (k, node)
        node[k] -= v
        assert node[k] >= 0.0, (node, k, v)
|
||||
@@ -8,6 +8,10 @@ TAG_RAY_NODE_TYPE = "ray-node-type"
|
||||
NODE_TYPE_HEAD = "head"
|
||||
NODE_TYPE_WORKER = "worker"
|
||||
|
||||
# Tag for the provider-specific instance type (e.g., m4.4xlarge). This is used
|
||||
# for automatic worker instance type selection.
|
||||
TAG_RAY_INSTANCE_TYPE = "ray-instance-type"
|
||||
|
||||
# Tag that reports the current state of the node (e.g. Updating, Up-to-date)
|
||||
TAG_RAY_NODE_STATUS = "ray-node-status"
|
||||
STATUS_UNINITIALIZED = "uninitialized"
|
||||
|
||||
@@ -76,13 +76,13 @@ class KubernetesCommandRunner:
|
||||
port_forward_cmd) + " failed with error: " + perr
|
||||
raise Exception(exception_str)
|
||||
else:
|
||||
logger.info(self.log_prefix + "Running {}...".format(cmd))
|
||||
final_cmd = self.kubectl + ["exec", "-it"]
|
||||
final_cmd += [
|
||||
self.node_id,
|
||||
"--",
|
||||
]
|
||||
final_cmd += with_interactive(cmd)
|
||||
logger.info(self.log_prefix + "Running {}".format(final_cmd))
|
||||
try:
|
||||
if with_output:
|
||||
return self.process_runner.check_output(
|
||||
|
||||
@@ -21,12 +21,13 @@ import pytest
|
||||
|
||||
|
||||
class MockNode:
|
||||
def __init__(self, node_id, tags):
|
||||
def __init__(self, node_id, tags, instance_type=None):
|
||||
self.node_id = node_id
|
||||
self.state = "pending"
|
||||
self.tags = tags
|
||||
self.external_ip = "1.2.3.4"
|
||||
self.internal_ip = "172.0.0.{}".format(self.node_id)
|
||||
self.instance_type = instance_type
|
||||
|
||||
def matches(self, tags):
|
||||
for k, v in tags.items():
|
||||
@@ -119,7 +120,7 @@ class MockProvider(NodeProvider):
|
||||
def external_ip(self, node_id):
|
||||
return self.mock_nodes[node_id].external_ip
|
||||
|
||||
def create_node(self, node_config, tags, count):
|
||||
def create_node(self, node_config, tags, count, instance_type=None):
|
||||
self.ready_to_create.wait()
|
||||
if self.fail_creates:
|
||||
return
|
||||
@@ -130,9 +131,17 @@ class MockProvider(NodeProvider):
|
||||
node.state = "pending"
|
||||
node.tags.update(tags)
|
||||
for _ in range(count):
|
||||
self.mock_nodes[self.next_id] = MockNode(self.next_id, tags.copy())
|
||||
self.mock_nodes[self.next_id] = MockNode(self.next_id, tags.copy(),
|
||||
instance_type)
|
||||
self.next_id += 1
|
||||
|
||||
def create_node_of_type(self, node_config, tags, instance_type, count):
    """Delegate to ``create_node``, forwarding ``instance_type`` by keyword."""
    result = self.create_node(
        node_config, tags, count, instance_type=instance_type)
    return result
|
||||
|
||||
def get_instance_type(self, node_config):
    """Return the fixed mock instance type, regardless of ``node_config``."""
    return "m4.large"
|
||||
|
||||
def set_node_tags(self, node_id, tags):
|
||||
self.mock_nodes[node_id].tags.update(tags)
|
||||
|
||||
|
||||
@@ -0,0 +1,231 @@
|
||||
import pytest
|
||||
import time
|
||||
import yaml
|
||||
import tempfile
|
||||
import shutil
|
||||
import unittest
|
||||
|
||||
import ray
|
||||
from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \
|
||||
MockProcessRunner
|
||||
from ray.autoscaler.autoscaler import StandardAutoscaler
|
||||
from ray.autoscaler.load_metrics import LoadMetrics
|
||||
from ray.autoscaler.node_provider import NODE_PROVIDERS
|
||||
from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
|
||||
get_bin_pack_residual, get_instances_for
|
||||
|
||||
# Instance type fixtures: name -> {"resources": capacity, "max_workers": cap}.
TYPES_A = {
    type_name: {
        "resources": capacity,
        "max_workers": worker_cap,
    }
    for type_name, capacity, worker_cap in [
        ("m4.large", {"CPU": 2}, 10),
        ("m4.4xlarge", {"CPU": 16}, 8),
        ("m4.16xlarge", {"CPU": 64}, 4),
        ("p2.xlarge", {"CPU": 16, "GPU": 1}, 10),
        ("p2.8xlarge", {"CPU": 32, "GPU": 8}, 4),
    ]
}
|
||||
|
||||
# SMALL_CLUSTER config plus the instance type table that enables the
# resource demand scheduler.
MULTI_WORKER_CLUSTER = {
    **SMALL_CLUSTER,
    "available_instance_types": TYPES_A,
}
|
||||
|
||||
|
||||
def test_util_score():
    """Check _utilization_score on infeasible, partial and full packings."""
    # A demand for a resource the node lacks cannot be scored at all.
    assert _utilization_score({"CPU": 64}, [{"TPU": 16}]) is None

    scored_cases = [
        ({"GPU": 4}, [{"GPU": 2}], (0.5, 0.5)),
        ({"GPU": 4}, [{"GPU": 1}, {"GPU": 1}], (0.5, 0.5)),
        ({"GPU": 2}, [{"GPU": 2}], (2, 2)),
        ({"GPU": 2}, [{"GPU": 1}, {"GPU": 1}], (2, 2)),
        ({"GPU": 2, "TPU": 1}, [{"GPU": 2}], (0, 1)),
        ({"CPU": 64}, [{"CPU": 64}], (64, 64)),
        ({"CPU": 64}, [{"CPU": 32}], (8, 8)),
        ({"CPU": 64}, [{"CPU": 16}, {"CPU": 16}], (8, 8)),
    ]
    for node, demands, expected in scored_cases:
        assert _utilization_score(node, demands) == expected
|
||||
|
||||
|
||||
def test_bin_pack():
    """Check which of two {"GPU": 2} demands are left over per cluster."""
    cases = [
        ([], [{"GPU": 2}, {"GPU": 2}]),
        ([{"GPU": 2}], [{"GPU": 2}]),
        ([{"GPU": 4}], []),
        ([{"GPU": 2}, {"GPU": 2, "CPU": 2}], []),
        ([{"CPU": 2}, {"GPU": 2}], [{"GPU": 2}]),
    ]
    for cluster, expected_residual in cases:
        demands = [{"GPU": 2}, {"GPU": 2}]
        assert get_bin_pack_residual(cluster, demands) == expected_residual
|
||||
|
||||
|
||||
def test_get_instances_packing_heuristic():
    """Check the instance types chosen from TYPES_A for various demands.

    No instances pre-exist and the global cap (9999) is effectively
    unlimited, so only the packing heuristic decides the outcome.
    """
    cases = [
        ([{"GPU": 8}], [("p2.8xlarge", 1)]),
        ([{"GPU": 1}] * 6, [("p2.8xlarge", 1)]),
        ([{"GPU": 1}] * 4, [("p2.xlarge", 4)]),
        ([{"CPU": 32, "GPU": 1}] * 3, [("p2.8xlarge", 3)]),
        ([{"CPU": 64, "GPU": 1}] * 3, []),
        ([{"CPU": 64}] * 3, [("m4.16xlarge", 3)]),
        ([{"CPU": 64}, {"CPU": 1}], [("m4.16xlarge", 1), ("m4.large", 1)]),
        ([{"CPU": 64}, {"CPU": 9}, {"CPU": 9}],
         [("m4.16xlarge", 1), ("m4.4xlarge", 2)]),
        ([{"CPU": 16}] * 5, [("m4.16xlarge", 1), ("m4.4xlarge", 1)]),
        ([{"CPU": 8}] * 10, [("m4.16xlarge", 1), ("m4.4xlarge", 1)]),
        ([{"CPU": 1}] * 100,
         [("m4.16xlarge", 1), ("m4.4xlarge", 2), ("m4.large", 2)]),
        ([{"GPU": 1}] + ([{"CPU": 1}] * 64),
         [("m4.16xlarge", 1), ("p2.xlarge", 1)]),
        (([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64),
         [("m4.16xlarge", 1), ("p2.8xlarge", 1)]),
    ]
    for demands, expected in cases:
        assert get_instances_for(TYPES_A, {}, 9999, demands) == expected
|
||||
|
||||
|
||||
def test_get_instances_respects_max_limit():
    """Check that per-type and global limits bound the instances added."""
    types = {
        "m4.large": {
            "resources": {"CPU": 2},
            "max_workers": 10,
        },
        "gpu": {
            "resources": {"GPU": 1},
            "max_workers": 99999,
        },
    }
    # (existing instance counts, global max_to_add, expected result)
    cases = [
        ({}, 2, [("m4.large", 2)]),
        ({"m4.large": 9999}, 9999, []),
        ({"m4.large": 0}, 9999, [("m4.large", 5)]),
        ({"m4.large": 7}, 4, [("m4.large", 3)]),
        ({"m4.large": 7}, 2, [("m4.large", 2)]),
    ]
    for existing, max_to_add, expected in cases:
        demands = [{"CPU": 1}] * 10
        assert get_instances_for(types, existing, max_to_add,
                                 demands) == expected
|
||||
|
||||
|
||||
class AutoscalingTest(unittest.TestCase):
    """Autoscaler tests that run against a mock provider with TYPES_A."""

    def setUp(self):
        # Register a "mock" provider whose factory hands back self.provider.
        NODE_PROVIDERS["mock"] = \
            lambda: (None, self.create_provider)
        self.provider = None
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        del NODE_PROVIDERS["mock"]
        shutil.rmtree(self.tmpdir)
        ray.shutdown()

    def waitForNodes(self, expected, comparison=None, tag_filters=None):
        """Poll the provider until the node count satisfies ``comparison``.

        Args:
            expected: value the node count is compared against.
            comparison: callable(n, expected) that raises on mismatch;
                defaults to ``self.assertEqual``.
            tag_filters: tag filter dict passed to the provider; defaults
                to an empty dict (no filtering).

        Raises:
            Whatever ``comparison`` raises on the final attempt.
        """
        # None instead of a shared mutable default dict.
        if tag_filters is None:
            tag_filters = {}
        if comparison is None:
            comparison = self.assertEqual
        MAX_ITER = 50
        for i in range(MAX_ITER):
            n = len(self.provider.non_terminated_nodes(tag_filters))
            try:
                comparison(n, expected)
                return
            except Exception:
                if i == MAX_ITER - 1:
                    raise
                time.sleep(.1)

    def create_provider(self, config, cluster_name):
        """Provider factory used by the "mock" registration in setUp."""
        assert self.provider
        return self.provider

    def write_config(self, config):
        """Dump ``config`` as YAML into the temp dir and return its path."""
        path = self.tmpdir + "/simple.yaml"
        with open(path, "w") as f:
            f.write(yaml.dump(config))
        return path

    def testScaleUpMinSanity(self):
        config_path = self.write_config(MULTI_WORKER_CLUSTER)
        self.provider = MockProvider()
        runner = MockProcessRunner()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_failures=0,
            process_runner=runner,
            update_interval_s=0)
        assert len(self.provider.non_terminated_nodes({})) == 0
        autoscaler.update()
        self.waitForNodes(2)
        # A second update must not launch anything further.
        autoscaler.update()
        self.waitForNodes(2)

    def testRequestBundles(self):
        config = MULTI_WORKER_CLUSTER.copy()
        config["min_workers"] = 0
        config["max_workers"] = 50
        config_path = self.write_config(config)
        self.provider = MockProvider()
        runner = MockProcessRunner()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_failures=0,
            process_runner=runner,
            update_interval_s=0)
        assert len(self.provider.non_terminated_nodes({})) == 0
        autoscaler.update()
        self.waitForNodes(0)
        autoscaler.request_resources([{"CPU": 1}])
        autoscaler.update()
        self.waitForNodes(1)
        assert self.provider.mock_nodes[0].instance_type == "m4.large"
        autoscaler.request_resources([{"GPU": 8}])
        autoscaler.update()
        self.waitForNodes(2)
        assert self.provider.mock_nodes[1].instance_type == "p2.8xlarge"
        autoscaler.request_resources([{"CPU": 32}] * 4)
        autoscaler.update()
        self.waitForNodes(4)
        assert self.provider.mock_nodes[2].instance_type == "m4.16xlarge"
        assert self.provider.mock_nodes[3].instance_type == "m4.16xlarge"
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.exit(pytest.main(["-v", __file__]))
|
||||
Reference in New Issue
Block a user