[Autoscaler] New output log format (#12772)

This commit is contained in:
Alex Wu
2020-12-23 12:02:55 -08:00
committed by GitHub
parent d95c8b8a41
commit 8df94e33e0
11 changed files with 907 additions and 154 deletions
+10 -2
View File
@@ -13,6 +13,7 @@ import ray.new_dashboard.utils as dashboard_utils
import ray._private.services
import ray.utils
from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS,
DEBUG_AUTOSCALING_STATUS_LEGACY,
DEBUG_AUTOSCALING_ERROR)
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
@@ -113,13 +114,20 @@ class ReportHead(dashboard_utils.DashboardHeadModule):
"""
aioredis_client = self._dashboard_head.aioredis_client
status = await aioredis_client.hget(DEBUG_AUTOSCALING_STATUS, "value")
legacy_status = await aioredis_client.hget(
DEBUG_AUTOSCALING_STATUS_LEGACY, "value")
formatted_status_string = await aioredis_client.hget(
DEBUG_AUTOSCALING_STATUS, "value")
formatted_status = json.loads(formatted_status_string.decode()
) if formatted_status_string else {}
error = await aioredis_client.hget(DEBUG_AUTOSCALING_ERROR, "value")
return dashboard_utils.rest_response(
success=True,
message="Got cluster status.",
autoscaling_status=status.decode() if status else None,
autoscaling_status=legacy_status.decode()
if legacy_status else None,
autoscaling_error=error.decode() if error else None,
cluster_status=formatted_status if formatted_status else None,
)
async def run(self, server):
+7 -2
View File
@@ -19,7 +19,7 @@ from ray import ray_constants
from ray.test_utils import (format_web_url, wait_for_condition,
wait_until_server_available, run_string_as_driver,
wait_until_succeeded_without_exception)
from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS,
from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS_LEGACY,
DEBUG_AUTOSCALING_ERROR)
import ray.new_dashboard.consts as dashboard_consts
import ray.new_dashboard.utils as dashboard_utils
@@ -458,11 +458,14 @@ def test_get_cluster_status(ray_start_with_dashboard):
def get_cluster_status():
    """Fetch `/api/cluster_status` and check the pre-autoscaler payload.

    Raises on a non-2xx response; otherwise asserts that the call succeeded,
    that the legacy status and error fields are present but still None, and
    that the structured cluster status carries a load metrics report.
    """
    response = requests.get(f"{webui_url}/api/cluster_status")
    response.raise_for_status()
    # Decode the body once and reuse it for every check below.
    payload = response.json()
    print(payload)
    assert payload["result"]
    data = payload["data"]
    for unset_field in ("autoscalingStatus", "autoscalingError"):
        assert unset_field in data
        assert data[unset_field] is None
    assert "clusterStatus" in data
    assert "loadMetricsReport" in data["clusterStatus"]
wait_until_succeeded_without_exception(get_cluster_status,
(requests.RequestException, ))
@@ -478,7 +481,7 @@ def test_get_cluster_status(ray_start_with_dashboard):
port=int(address[1]),
password=ray_constants.REDIS_DEFAULT_PASSWORD)
client.hset(DEBUG_AUTOSCALING_STATUS, "value", "hello")
client.hset(DEBUG_AUTOSCALING_STATUS_LEGACY, "value", "hello")
client.hset(DEBUG_AUTOSCALING_ERROR, "value", "world")
response = requests.get(f"{webui_url}/api/cluster_status")
@@ -488,6 +491,8 @@ def test_get_cluster_status(ray_start_with_dashboard):
assert response.json()["data"]["autoscalingStatus"] == "hello"
assert "autoscalingError" in response.json()["data"]
assert response.json()["data"]["autoscalingError"] == "world"
assert "clusterStatus" in response.json()["data"]
assert "loadMetricsReport" in response.json()["data"]["clusterStatus"]
def test_immutable_types():
+98 -70
View File
@@ -1,4 +1,4 @@
from collections import defaultdict, namedtuple
from collections import defaultdict, namedtuple, Counter
from typing import Any, Optional, Dict, List
from urllib3.exceptions import MaxRetryError
import copy
@@ -16,8 +16,10 @@ from ray.experimental.internal_kv import _internal_kv_put, \
from ray.autoscaler.tags import (
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER,
NODE_KIND_UNMANAGED, NODE_KIND_HEAD)
TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, STATUS_WAITING_FOR_SSH,
STATUS_SYNCING_FILES, STATUS_SETTING_UP, STATUS_UP_TO_DATE,
NODE_KIND_WORKER, NODE_KIND_UNMANAGED, NODE_KIND_HEAD)
from ray.autoscaler._private.legacy_info_string import legacy_log_info_string
from ray.autoscaler._private.providers import _get_node_provider
from ray.autoscaler._private.updater import NodeUpdaterThread
from ray.autoscaler._private.node_launcher import NodeLauncher
@@ -25,8 +27,8 @@ from ray.autoscaler._private.resource_demand_scheduler import \
get_bin_pack_residual, ResourceDemandScheduler, NodeType, NodeID, NodeIP, \
ResourceDict
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, add_prefix, \
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
DEBUG_AUTOSCALING_ERROR, format_info_string
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \
AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \
@@ -41,20 +43,23 @@ UpdateInstructions = namedtuple(
"UpdateInstructions",
["node_id", "init_commands", "start_ray_commands", "docker_config"])
# Result type of StandardAutoscaler.summary():
#   active_nodes: per-node-type count of nodes that load metrics report as
#       active.
#   pending_nodes: (ip, node_type) pairs for non-active nodes whose status tag
#       is one of the setup states.
#   pending_launches: node type -> number of launches still in flight (zero
#       counts are filtered out).
#   failed_nodes: (ip, node_type) pairs for nodes that are neither active nor
#       pending.
AutoscalerSummary = namedtuple(
"AutoscalerSummary",
["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"])
class StandardAutoscaler:
"""The autoscaling control loop for a Ray cluster.
There are two ways to start an autoscaling cluster: manually by running
`ray start --head --autoscaling-config=/path/to/config.yaml` on a
instance that has permission to launch other instances, or you can also use
`ray up /path/to/config.yaml` from your laptop, which will
configure the right AWS/Cloud roles automatically.
StandardAutoscaler's `update` method is periodically called by `monitor.py`
to add and remove nodes as necessary. Currently, load-based autoscaling is
not implemented, so all this class does is try to maintain a constant
cluster size.
`ray start --head --autoscaling-config=/path/to/config.yaml` on an instance
that has permission to launch other instances, or you can also use `ray up
/path/to/config.yaml` from your laptop, which will configure the right
AWS/Cloud roles automatically. See the documentation for a full definition
of autoscaling behavior:
https://docs.ray.io/en/master/cluster/autoscaling.html
StandardAutoscaler's `update` method is periodically called in
`monitor.py`'s monitoring loop.
StandardAutoscaler is also used to bootstrap clusters (by adding workers
until the cluster size that can handle the resource demand is met).
@@ -120,9 +125,6 @@ class StandardAutoscaler:
for local_path in self.config["file_mounts"].values():
assert os.path.exists(local_path)
# List of resource bundles the user is requesting of the cluster.
self.resource_demand_vector = []
logger.info("StandardAutoscaler: {}".format(self.config))
def update(self):
@@ -161,7 +163,6 @@ class StandardAutoscaler:
self.provider.internal_ip(node_id)
for node_id in self.all_workers()
])
self.log_info_string(nodes)
# Terminate any idle or out of date nodes
last_used = self.load_metrics.last_used_time_by_ip
@@ -175,7 +176,7 @@ class StandardAutoscaler:
sorted_node_ids = self._sort_based_on_last_used(nodes, last_used)
# Don't terminate nodes needed by request_resources()
nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
if self.resource_demand_vector:
if self.load_metrics.get_resource_requests():
nodes_allowed_to_terminate = self._get_nodes_allowed_to_terminate(
sorted_node_ids)
@@ -201,7 +202,6 @@ class StandardAutoscaler:
if nodes_to_terminate:
self.provider.terminate_nodes(nodes_to_terminate)
nodes = self.workers()
self.log_info_string(nodes)
# Terminate nodes if there are too many
nodes_to_terminate = []
@@ -216,8 +216,6 @@ class StandardAutoscaler:
self.provider.terminate_nodes(nodes_to_terminate)
nodes = self.workers()
self.log_info_string(nodes)
to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
@@ -225,7 +223,7 @@ class StandardAutoscaler:
self.load_metrics.get_resource_utilization(),
self.load_metrics.get_pending_placement_groups(),
self.load_metrics.get_static_node_resources_by_ip(),
ensure_min_cluster_size=self.resource_demand_vector)
ensure_min_cluster_size=self.load_metrics.get_resource_requests())
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
@@ -255,7 +253,6 @@ class StandardAutoscaler:
self.provider.terminate_nodes(nodes_to_terminate)
nodes = self.workers()
self.log_info_string(nodes)
# Update nodes with out-of-date files.
# TODO(edoakes): Spawning these threads directly seems to cause
@@ -281,6 +278,9 @@ class StandardAutoscaler:
for node_id in nodes:
self.recover_if_needed(node_id, now)
logger.info(self.info_string())
legacy_log_info_string(self, nodes)
def _sort_based_on_last_used(self, nodes: List[NodeID],
last_used: Dict[str, float]) -> List[NodeID]:
"""Sort the nodes based on the last time they were used.
@@ -361,7 +361,7 @@ class StandardAutoscaler:
used_resource_requests: List[ResourceDict]
_, used_resource_requests = \
get_bin_pack_residual(max_node_resources,
self.resource_demand_vector)
self.load_metrics.get_resource_requests())
# Remove the first entry (the head node).
max_node_resources.pop(0)
# Remove the first entry (the head node).
@@ -533,15 +533,17 @@ class StandardAutoscaler:
if not self.can_update(node_id):
return
key = self.provider.internal_ip(node_id)
if key not in self.load_metrics.last_heartbeat_time_by_ip:
self.load_metrics.last_heartbeat_time_by_ip[key] = now
last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip[key]
delta = now - last_heartbeat_time
if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S:
return
if key in self.load_metrics.last_heartbeat_time_by_ip:
last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip[
key]
delta = now - last_heartbeat_time
if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S:
return
logger.warning("StandardAutoscaler: "
"{}: No heartbeat in {}s, "
"restarting Ray to recover...".format(node_id, delta))
"{}: No recent heartbeat, "
"restarting Ray to recover...".format(node_id))
updater = NodeUpdaterThread(
node_id=node_id,
provider_config=self.config["provider"],
@@ -678,43 +680,6 @@ class StandardAutoscaler:
return self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED})
def log_info_string(self, nodes):
tmp = "Cluster status: "
tmp += self.info_string(nodes)
tmp += "\n"
tmp += self.load_metrics.info_string()
tmp += "\n"
tmp += self.resource_demand_scheduler.debug_string(
nodes, self.pending_launches.breakdown(),
self.load_metrics.get_resource_utilization())
if _internal_kv_initialized():
_internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True)
if self.prefix_cluster_info:
tmp = add_prefix(tmp, self.config["cluster_name"])
logger.debug(tmp)
def info_string(self, nodes):
suffix = ""
if self.updaters:
suffix += " ({} updating)".format(len(self.updaters))
if self.num_failed_updates:
suffix += " ({} failed to update)".format(
len(self.num_failed_updates))
return "{} nodes{}".format(len(nodes), suffix)
def request_resources(self, resources: List[dict]):
"""Called by monitor to request resources.
Args:
resources: A list of resource bundles.
"""
if resources:
logger.info(
"StandardAutoscaler: resource_requests={}".format(resources))
assert isinstance(resources, list), resources
self.resource_demand_vector = resources
def kill_workers(self):
logger.error("StandardAutoscaler: kill_workers triggered")
nodes = self.workers()
@@ -722,3 +687,66 @@ class StandardAutoscaler:
self.provider.terminate_nodes(nodes)
logger.error("StandardAutoscaler: terminated {} node(s)".format(
len(nodes)))
def summary(self):
    """Classify every managed node as active, pending, or failed.

    A node is active when the load metrics report its IP as active. A
    non-active node is pending when its status tag is uninitialized,
    waiting for ssh, syncing files, or setting up. Any other non-active
    node is counted as failed. Unmanaged nodes are ignored.

    Returns:
        AutoscalerSummary: Active node counts per node type, pending and
            failed nodes as (ip, node_type) pairs, and the non-zero
            pending launch counts per node type.
    """
    # Status tags that mean the node is still being brought up.
    pending_states = [
        STATUS_UNINITIALIZED, STATUS_WAITING_FOR_SSH,
        STATUS_SYNCING_FILES, STATUS_SETTING_UP
    ]

    active_nodes = Counter()
    pending_nodes = []
    failed_nodes = []
    for node_id in self.provider.non_terminated_nodes(tag_filters={}):
        ip = self.provider.internal_ip(node_id)
        node_tags = self.provider.node_tags(node_id)
        if node_tags[TAG_RAY_NODE_KIND] == NODE_KIND_UNMANAGED:
            continue
        node_type = node_tags[TAG_RAY_USER_NODE_TYPE]

        # TODO (Alex): If a node's raylet has died, it shouldn't be marked
        # as active.
        if self.load_metrics.is_active(ip):
            active_nodes[node_type] += 1
            continue

        if node_tags[TAG_RAY_NODE_STATUS] in pending_states:
            pending_nodes.append((ip, node_type))
        else:
            # TODO (Alex): Failed nodes are now immediately killed, so
            # this list will almost always be empty. We should ideally
            # keep a cache of recently failed nodes and their startup
            # logs.
            failed_nodes.append((ip, node_type))

    # The concurrent counter leaves some 0 counts in, so we need to
    # manually filter those out.
    pending_launches = {
        node_type: count
        for node_type, count in self.pending_launches.breakdown().items()
        if count
    }

    return AutoscalerSummary(
        active_nodes=active_nodes,
        pending_nodes=pending_nodes,
        pending_launches=pending_launches,
        failed_nodes=failed_nodes)
def info_string(self):
    """Return the formatted status report, prefixed with a newline.

    The report is built by `format_info_string` from the load metrics
    summary and this autoscaler's own summary.
    """
    report = format_info_string(self.load_metrics.summary(), self.summary())
    return "\n" + report
@@ -43,6 +43,10 @@ from ray.worker import global_worker # type: ignore
from ray.util.debug import log_once
import ray.autoscaler._private.subprocess_output_util as cmd_output_util
from ray.autoscaler._private.load_metrics import LoadMetricsSummary
from ray.autoscaler._private.autoscaler import AutoscalerSummary
from ray.autoscaler._private.util import format_info_string, \
format_info_string_no_node_types
logger = logging.getLogger(__name__)
@@ -94,6 +98,14 @@ def debug_status() -> str:
status = "No cluster status."
else:
status = status.decode("utf-8")
as_dict = json.loads(status)
lm_summary = LoadMetricsSummary(**as_dict["load_metrics_report"])
if "autoscaler_report" in as_dict:
autoscaler_summary = AutoscalerSummary(
**as_dict["autoscaler_report"])
status = format_info_string(lm_summary, autoscaler_summary)
else:
status = format_info_string_no_node_types(lm_summary)
if error:
status += "\n"
status += error.decode("utf-8")
@@ -0,0 +1,35 @@
import logging
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS_LEGACY
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
"""This file provides legacy support for the old info string in order to
ensure the dashboard's `api/cluster_status` does not break backwards
compatibility.
"""
logger = logging.getLogger(__name__)
def legacy_log_info_string(autoscaler, nodes):
    """Build the old-style status string, store it, and log it at debug level.

    The string has three newline-separated sections: the node count line,
    the load metrics info string, and the resource demand scheduler's debug
    string. When the internal KV store is initialized, the string is written
    under DEBUG_AUTOSCALING_STATUS_LEGACY.

    Args:
        autoscaler: The autoscaler whose state is reported.
        nodes: The node ids to report on.
    """
    sections = [
        "Cluster status: " + info_string(autoscaler, nodes),
        autoscaler.load_metrics.info_string(),
        autoscaler.resource_demand_scheduler.debug_string(
            nodes, autoscaler.pending_launches.breakdown(),
            autoscaler.load_metrics.get_resource_utilization()),
    ]
    legacy_status = "\n".join(sections)
    if _internal_kv_initialized():
        _internal_kv_put(
            DEBUG_AUTOSCALING_STATUS_LEGACY, legacy_status, overwrite=True)
    logger.debug(legacy_status)
def info_string(autoscaler, nodes):
    """Return the one-line node count used by the legacy status string.

    Args:
        autoscaler: Object exposing `updaters` and `num_failed_updates`.
        nodes: The nodes being counted.

    Returns:
        str: "<n> nodes", followed by the number of running updaters and the
            number of failed updates when either is non-empty.
    """
    pieces = ["{} nodes".format(len(nodes))]
    if autoscaler.updaters:
        pieces.append(" ({} updating)".format(len(autoscaler.updaters)))
    if autoscaler.num_failed_updates:
        pieces.append(" ({} failed to update)".format(
            len(autoscaler.num_failed_updates)))
    return "".join(pieces)
+88 -4
View File
@@ -1,16 +1,26 @@
from collections import namedtuple
from functools import reduce
import logging
import time
from typing import Dict, List
import numpy as np
import ray._private.services as services
from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES
from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES,\
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
from ray.autoscaler._private.util import add_resources, freq_of_dicts
from ray.gcs_utils import PlacementGroupTableData
from ray.autoscaler._private.resource_demand_scheduler import \
NodeIP, ResourceDict
from ray.core.generated.common_pb2 import PlacementStrategy
logger = logging.getLogger(__name__)
# Result type of LoadMetrics.summary():
#   head_ip: the local IP recorded by the LoadMetrics instance.
#   usage: resource name -> (used, total), aggregated over all nodes.
#   resource_demand: (bundle, count) pairs for the unclipped demand vector.
#   pg_demand: (placement group dict, count) pairs for pending placement
#       groups.
#   request_demand: (bundle, count) pairs for the stored resource requests.
#   node_types: (static resources dict, count) pairs, one per distinct node
#       shape.
LoadMetricsSummary = namedtuple("LoadMetricsSummary", [
"head_ip", "usage", "resource_demand", "pg_demand", "request_demand",
"node_types"
])
class LoadMetrics:
"""Container for cluster load metrics.
@@ -31,6 +41,7 @@ class LoadMetrics:
self.waiting_bundles = []
self.infeasible_bundles = []
self.pending_placement_groups = []
self.resource_requests = []
def update(self,
ip: str,
@@ -72,9 +83,12 @@ class LoadMetrics:
def mark_active(self, ip):
assert ip is not None, "IP should be known at this time"
logger.info("Node {} is newly setup, treating as active".format(ip))
logger.debug("Node {} is newly setup, treating as active".format(ip))
self.last_heartbeat_time_by_ip[ip] = time.time()
def is_active(self, ip):
    """Return True when a heartbeat time is recorded for `ip`."""
    heartbeat_times = self.last_heartbeat_time_by_ip
    return ip in heartbeat_times
def prune_active_ips(self, active_ips):
active_ips = set(active_ips)
active_ips.add(self.local_ip)
@@ -155,12 +169,82 @@ class LoadMetrics:
return resources_used, resources_total
def get_resource_demand_vector(self):
return self.waiting_bundles + self.infeasible_bundles
def get_resource_demand_vector(self, clip=True):
    """Return the waiting bundles followed by the infeasible bundles.

    Args:
        clip: When true, keep at most
            AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE entries from each of
            the two lists.

    Returns:
        A new list; the stored bundle lists are not modified.
    """
    waiting = self.waiting_bundles
    infeasible = self.infeasible_bundles
    if not clip:
        return waiting + infeasible
    # Bound the total number of bundles to
    # 2xMAX_RESOURCE_DEMAND_VECTOR_SIZE. This guarantees the resource
    # demand scheduler bin packing algorithm takes a reasonable amount
    # of time to run.
    limit = AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
    return waiting[:limit] + infeasible[:limit]
def get_resource_requests(self):
    """Return the currently stored list of resource requests."""
    stored_requests = self.resource_requests
    return stored_requests
def get_pending_placement_groups(self):
    """Return the currently stored list of pending placement groups."""
    pending_groups = self.pending_placement_groups
    return pending_groups
def summary(self):
    """Aggregate the current load metrics into a LoadMetricsSummary.

    Returns:
        LoadMetricsSummary: The local IP, per-resource (used, total) usage
            summed over all nodes, and frequency counts of the unclipped
            resource demand vector, the pending placement groups, the
            stored resource requests, and the per-node static resources.
    """
    if self.dynamic_resources_by_ip:
        available_resources = reduce(add_resources,
                                     self.dynamic_resources_by_ip.values())
    else:
        available_resources = {}

    if self.static_resources_by_ip:
        total_resources = reduce(add_resources,
                                 self.static_resources_by_ip.values())
    else:
        total_resources = {}

    # Used amount is the static total minus what is still available.
    usage_dict = {
        resource: (total - available_resources[resource], total)
        for resource, total in total_resources.items()
    }

    def placement_group_serializer(pg):
        # Hashable stand-in for a placement group so it can be counted.
        hashable_bundles = tuple(
            frozenset(bundle.unit_resources.items())
            for bundle in pg.bundles)
        return (hashable_bundles, pg.strategy)

    def placement_group_deserializer(pg_tuple):
        # We marshal this as a dictionary so that we can easily json.dumps
        # it later.
        # TODO (Alex): Would there be a benefit to properly
        # marshalling this (into a protobuf)?
        hashable_bundles, strategy = pg_tuple[0], pg_tuple[1]
        bundle_dicts = [dict(bundle) for bundle in hashable_bundles]
        return {
            "bundles": freq_of_dicts(bundle_dicts),
            "strategy": PlacementStrategy.Name(strategy)
        }

    return LoadMetricsSummary(
        head_ip=self.local_ip,
        usage=usage_dict,
        resource_demand=freq_of_dicts(
            self.get_resource_demand_vector(clip=False)),
        pg_demand=freq_of_dicts(
            self.get_pending_placement_groups(),
            serializer=placement_group_serializer,
            deserializer=placement_group_deserializer),
        request_demand=freq_of_dicts(self.get_resource_requests()),
        node_types=freq_of_dicts(self.static_resources_by_ip.values()))
def set_resource_requests(self, requested_resources):
    """Replace the stored resource requests.

    Empty bundles are dropped. `None` is treated as "no requests" and
    clears the stored list instead of failing when iterated.

    Args:
        requested_resources: A list of resource bundles, or None.

    Raises:
        AssertionError: If `requested_resources` is neither None nor a list.
    """
    if requested_resources is None:
        # None passes the type check below but cannot be iterated.
        self.resource_requests = []
        return
    assert isinstance(requested_resources, list), requested_resources
    self.resource_requests = [
        request for request in requested_resources if len(request) > 0
    ]
def info_string(self):
    """Render the `_info()` mapping as a bulleted list sorted by key."""
    entries = sorted(self._info().items())
    bullet_lines = ["{}: {}".format(key, value) for key, value in entries]
    return " - " + "\n - ".join(bullet_lines)
@@ -149,8 +149,8 @@ class ResourceDemandScheduler:
node_resources, node_type_counts = self.calculate_node_resources(
nodes, launching_nodes, unused_resources_by_ip)
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
logger.debug("Cluster resources: {}".format(node_resources))
logger.debug("Node counts: {}".format(node_type_counts))
# Step 2: add nodes to add to satisfy min_workers for each type
(node_resources,
node_type_counts,
@@ -160,7 +160,7 @@ class ResourceDemandScheduler:
self.max_workers, self.head_node_type, ensure_min_cluster_size)
# Step 3: add nodes for strict spread groups
logger.info(f"Placement group demands: {pending_placement_groups}")
logger.debug(f"Placement group demands: {pending_placement_groups}")
placement_group_demand_vector, strict_spreads = \
placement_groups_to_resource_demands(pending_placement_groups)
resource_demands.extend(placement_group_demand_vector)
@@ -187,8 +187,8 @@ class ResourceDemandScheduler:
# groups
unfulfilled, _ = get_bin_pack_residual(node_resources,
resource_demands)
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
logger.debug("Resource demands: {}".format(resource_demands))
logger.debug("Unfulfilled demands: {}".format(unfulfilled))
# Add 1 to account for the head node.
max_to_add = self.max_workers + 1 - sum(node_type_counts.values())
nodes_to_add_based_on_demand = get_nodes_for(
@@ -211,7 +211,7 @@ class ResourceDemandScheduler:
total_nodes_to_add, unused_resources_by_ip.keys(), nodes,
launching_nodes, adjusted_min_workers)
logger.info("Node requests: {}".format(total_nodes_to_add))
logger.debug("Node requests: {}".format(total_nodes_to_add))
return total_nodes_to_add
def _legacy_worker_node_to_launch(
@@ -615,8 +615,14 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
# starts up because placement groups are scheduled via custom
# resources. This will behave properly with the current utilization
# score heuristic, but it's a little dangerous and misleading.
logger.info(
"No feasible node type to add for {}".format(resources))
# Adjacent string literals are concatenated as-is, so each fragment must
# carry its own trailing space.
logger.warning(
    f"The autoscaler could not find a node type to satisfy the "
    f"request: {resources}. If this request is related to "
    f"placement groups the resource request will resolve itself, "
    f"otherwise please specify a node type with the necessary "
    f"resource "
    f"https://docs.ray.io/en/master/cluster/autoscaling.html#multiple-node-type-autoscaling."  # noqa: E501
)
break
utilization_scores = sorted(utilization_scores, reverse=True)
+174 -1
View File
@@ -1,13 +1,15 @@
import collections
from datetime import datetime
import logging
import hashlib
import json
import jsonschema
import os
import threading
from typing import Any, Dict
from typing import Any, Dict, List
import ray
import ray.ray_constants
import ray._private.services as services
from ray.autoscaler._private.providers import _get_default_config
from ray.autoscaler._private.docker import validate_docker_config
@@ -20,6 +22,7 @@ RAY_SCHEMA_PATH = os.path.join(
# Internal kv keys for storing debug status.
DEBUG_AUTOSCALING_ERROR = "__autoscaling_error"
DEBUG_AUTOSCALING_STATUS = "__autoscaling_status"
DEBUG_AUTOSCALING_STATUS_LEGACY = "__autoscaling_status_legacy"
logger = logging.getLogger(__name__)
@@ -246,6 +249,47 @@ def hash_runtime_conf(file_mounts,
return (_hash_cache[conf_str], file_mounts_contents_hash)
def add_resources(dict1: Dict[str, float],
                  dict2: Dict[str, float]) -> Dict[str, float]:
    """Sum two resource dictionaries key by key.

    Keys present in only one input keep their value.

    Returns:
        dict: A new dictionary (inputs remain unmodified).
    """
    summed = dict1.copy()
    for resource in dict2:
        summed[resource] = dict2[resource] + summed.get(resource, 0)
    return summed
def freq_of_dicts(dicts: List[Dict],
                  serializer=lambda d: frozenset(d.items()),
                  deserializer=dict):
    """Count how often each entry occurs in a list of unhashable items.

    Dictionaries cannot be used as counter keys directly, so every entry is
    first converted to a hashable form, counted, and converted back.

    Args:
        dicts (List[D]): The entries to be counted.
        serializer (D -> S): A custom serialization function. The output
            type S must be hashable. The default serializer converts a
            dictionary into a frozenset of KV pairs.
        deserializer (S -> U): A custom deserialization function applied to
            each distinct serialized entry. For dictionaries U := D.

    Returns:
        List[Tuple[U, int]]: One tuple per distinct entry, holding the
            deserialized entry and its frequency count, in order of first
            occurrence.
    """
    counts = collections.Counter(serializer(entry) for entry in dicts)
    return [(deserializer(key), count) for key, count in counts.items()]
def add_prefix(info_string, prefix):
"""Prefixes each line of info_string, except the first, by prefix."""
lines = info_string.split("\n")
@@ -255,3 +299,132 @@ def add_prefix(info_string, prefix):
prefixed_lines.append(prefixed_line)
prefixed_info_string = "\n".join(prefixed_lines)
return prefixed_info_string
def format_pg(pg):
    """Format a summarized placement group as one line.

    Args:
        pg: Mapping with a "strategy" entry and a "bundles" entry holding
            (bundle, count) pairs.

    Returns:
        str: The bundles as "<bundle> * <count>" joined by ", ", followed by
            the strategy in parentheses.
    """
    strategy = pg["strategy"]
    bundles = pg["bundles"]
    bundles_str = ", ".join(
        f"{bundle} * {count}" for bundle, count in bundles)
    return f"{bundles_str} ({strategy})"
def get_usage_report(lm_summary):
    """Render one "used/total resource" line per entry of `lm_summary.usage`.

    The "memory" and "object_store_memory" entries are scaled by
    MEMORY_RESOURCE_UNIT_BYTES and printed in GiB; every other resource is
    printed as-is.

    Returns:
        str: The lines joined by newlines (empty when there is no usage).
    """
    memory_resources = ("memory", "object_store_memory")
    usage_lines = []
    for resource, (used, total) in lm_summary.usage.items():
        if resource in memory_resources:
            to_GiB = ray.ray_constants.MEMORY_RESOURCE_UNIT_BYTES / 2**30
            used *= to_GiB
            total *= to_GiB
            usage_lines.append(f" {used:.2f}/{total:.3f} GiB {resource}")
        else:
            usage_lines.append(f" {used}/{total} {resource}")
    return "\n".join(usage_lines)
def get_demand_report(lm_summary):
    """Render the pending demand sections of a load metrics summary.

    Lines are emitted in this order: task/actor demand, placement group
    demand, then `request_resources()` demand.

    Returns:
        str: The demand lines joined by newlines, or a placeholder line when
            there is no demand at all.
    """
    demand_lines = [
        f" {bundle}: {count}+ pending tasks/actors"
        for bundle, count in lm_summary.resource_demand
    ]
    demand_lines.extend(
        f" {format_pg(pg)}: {count}+ pending placement groups"
        for pg, count in lm_summary.pg_demand)
    demand_lines.extend(
        f" {bundle}: {count}+ from request_resources()"
        for bundle, count in lm_summary.request_demand)

    if not demand_lines:
        return " (no resource demands)"
    return "\n".join(demand_lines)
def format_info_string(lm_summary, autoscaler_summary, time=None):
    """Format the autoscaler status report.

    Args:
        lm_summary: Load metrics summary; passed to the usage and demand
            report helpers.
        autoscaler_summary: Summary exposing `active_nodes` (mapping of node
            type to count), `pending_launches` (mapping of node type to
            count), `pending_nodes` and `failed_nodes` (both iterables of
            (ip, node_type) pairs).
        time: Timestamp shown in the header; defaults to `datetime.now()`.

    Returns:
        str: The multi-line report with node status and resource sections.
    """
    if time is None:
        time = datetime.now()
    header = "=" * 8 + f" Autoscaler status: {time} " + "=" * 8
    separator = "-" * len(header)

    available_node_report = "\n".join(
        f" {count} {node_type}"
        for node_type, count in autoscaler_summary.active_nodes.items())

    pending_lines = [
        f" {node_type}, {count} launching"
        for node_type, count in autoscaler_summary.pending_launches.items()
    ]
    pending_lines.extend(
        f" {ip}: {node_type}, setting up"
        for ip, node_type in autoscaler_summary.pending_nodes)
    if pending_lines:
        pending_report = "\n".join(pending_lines)
    else:
        pending_report = " (no pending nodes)"

    # Collect one line per failed node so failures actually reach the report
    # (previously each line was built and then discarded).
    failure_lines = [
        f" {ip}: {node_type}"
        for ip, node_type in autoscaler_summary.failed_nodes
    ]
    failure_report = "Recent failures:\n"
    if failure_lines:
        failure_report += "\n".join(failure_lines)
    else:
        failure_report += " (no failures)"

    usage_report = get_usage_report(lm_summary)
    demand_report = get_demand_report(lm_summary)

    formatted_output = f"""{header}
Node status
{separator}
Healthy:
{available_node_report}
Pending:
{pending_report}
{failure_report}
Resources
{separator}
Usage:
{usage_report}
Demands:
{demand_report}"""
    return formatted_output
def format_info_string_no_node_types(lm_summary, time=None):
    """Format the cluster status report from load metrics alone.

    Args:
        lm_summary: Load metrics summary; `node_types` supplies
            (resources, count) pairs for the node section, and the summary
            is passed to the usage and demand report helpers.
        time: Timestamp shown in the header; defaults to `datetime.now()`.

    Returns:
        str: The multi-line report with node status and resource sections.
    """
    if time is None:
        time = datetime.now()
    banner = "=" * 8
    header = f"{banner} Cluster status: {time} {banner}"
    separator = "-" * len(header)

    node_report = "\n".join(
        f" {count} node(s) with resources: {node_type}"
        for node_type, count in lm_summary.node_types)

    usage_report = get_usage_report(lm_summary)
    demand_report = get_demand_report(lm_summary)

    report_lines = [
        header,
        "Node status",
        separator,
        node_report,
        "Resources",
        separator,
        "Usage:",
        usage_report,
        "Demands:",
        demand_report,
    ]
    return "\n".join(report_lines)
+19 -15
View File
@@ -15,11 +15,14 @@ from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS
import ray.gcs_utils
import ray.utils
import ray.ray_constants as ray_constants
from ray.ray_logging import setup_component_logger
from ray._raylet import GlobalStateAccessor
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
import redis
@@ -65,11 +68,7 @@ def parse_resource_demands(resource_load_by_shape):
except Exception:
logger.exception("Failed to parse resource demands.")
# Bound the total number of bundles to 2xMAX_RESOURCE_DEMAND_VECTOR_SIZE.
# This guarantees the resource demand scheduler bin packing algorithm takes
# a reasonable amount of time to run.
return waiting_bundles[:AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE], \
infeasible_bundles[:AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE]
return waiting_bundles, infeasible_bundles
class Monitor:
@@ -184,14 +183,8 @@ class Monitor:
data: a resource request as JSON, e.g. {"CPU": 1}
"""
if not self.autoscaler:
return
try:
self.autoscaler.request_resources(json.loads(data))
except Exception:
# We don't want this to kill the monitor.
traceback.print_exc()
resource_request = json.loads(data)
self.load_metrics.set_resource_requests(resource_request)
def process_messages(self, max_messages=10000):
"""Process all messages ready in the subscription channels.
@@ -257,12 +250,23 @@ class Monitor:
# Handle messages from the subscription channels.
while True:
self.update_raylet_map()
self.update_load_metrics()
status = {
"load_metrics_report": self.load_metrics.summary()._asdict()
}
# Process autoscaling actions
if self.autoscaler:
# Only used to update the load metrics for the autoscaler.
self.update_raylet_map()
self.update_load_metrics()
self.autoscaler.update()
status[
"autoscaler_report"] = self.autoscaler.summary()._asdict()
as_json = json.dumps(status)
if _internal_kv_initialized():
_internal_kv_put(
DEBUG_AUTOSCALING_STATUS, as_json, overwrite=True)
# Process a round of messages.
self.process_messages()
+47 -13
View File
@@ -54,8 +54,11 @@ class MockProcessRunner:
self.calls = []
self.fail_cmds = fail_cmds or []
self.call_response = {}
self.ready_to_run = threading.Event()
self.ready_to_run.set()
def check_call(self, cmd, *args, **kwargs):
self.ready_to_run.wait()
for token in self.fail_cmds:
if token in str(cmd):
raise CalledProcessError(1, token,
@@ -165,22 +168,28 @@ class MockProvider(NodeProvider):
]
def is_running(self, node_id):
return self.mock_nodes[node_id].state == "running"
with self.lock:
return self.mock_nodes[node_id].state == "running"
def is_terminated(self, node_id):
return self.mock_nodes[node_id].state in ["stopped", "terminated"]
with self.lock:
return self.mock_nodes[node_id].state in ["stopped", "terminated"]
def node_tags(self, node_id):
return self.mock_nodes[node_id].tags
with self.lock:
return self.mock_nodes[node_id].tags
def internal_ip(self, node_id):
return self.mock_nodes[node_id].internal_ip
with self.lock:
return self.mock_nodes[node_id].internal_ip
def external_ip(self, node_id):
return self.mock_nodes[node_id].external_ip
with self.lock:
return self.mock_nodes[node_id].external_ip
def create_node(self, node_config, tags, count):
self.ready_to_create.wait()
def create_node(self, node_config, tags, count, _skip_wait=False):
if not _skip_wait:
self.ready_to_create.wait()
if self.fail_creates:
return
with self.lock:
@@ -200,7 +209,8 @@ class MockProvider(NodeProvider):
self.next_id += 1
def set_node_tags(self, node_id, tags):
self.mock_nodes[node_id].tags.update(tags)
with self.lock:
self.mock_nodes[node_id].tags.update(tags)
def terminate_node(self, node_id):
with self.lock:
@@ -534,7 +544,11 @@ class AutoscalingTest(unittest.TestCase):
config["max_workers"] = 5
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "worker"}, 10)
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "worker",
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_WORKER
}, 10)
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
autoscaler = StandardAutoscaler(
@@ -562,6 +576,7 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
@@ -658,11 +673,15 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
# 1 head node.
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.load_metrics.set_resource_requests([{"CPU": 1}])
autoscaler.update()
# still 1 head node because request_resources fits in the headnode.
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}] + [{"CPU": 2}] * 9)
autoscaler.load_metrics.set_resource_requests([{
"CPU": 1
}] + [{
"CPU": 2
}] * 9)
autoscaler.update()
self.waitForNodes(2) # Adds a single worker to get its resources.
autoscaler.update()
@@ -767,7 +786,8 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 1)
head_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0]
@@ -817,7 +837,11 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1)
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 1)
head_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0]
@@ -975,6 +999,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm = LoadMetrics()
@@ -1096,6 +1121,14 @@ class AutoscalingTest(unittest.TestCase):
assert len(self.provider.non_terminated_nodes({})) < 2
def testConfiguresOutdatedNodes(self):
from ray.autoscaler._private.cli_logger import cli_logger
def do_nothing(*args, **kwargs):
pass
cli_logger._print = type(cli_logger._print)(do_nothing,
type(cli_logger))
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
@@ -1133,6 +1166,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(6)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
@@ -1,4 +1,5 @@
import pytest
from datetime import datetime
import time
import yaml
import tempfile
@@ -8,13 +9,16 @@ import copy
import ray
from ray.autoscaler._private.util import \
rewrite_legacy_yaml_to_available_node_types
rewrite_legacy_yaml_to_available_node_types, format_info_string, \
format_info_string_no_node_types
from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \
MockProcessRunner
from ray.autoscaler._private.providers import (_NODE_PROVIDERS,
_clear_provider_cache)
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.autoscaler import StandardAutoscaler, \
AutoscalerSummary
from ray.autoscaler._private.load_metrics import LoadMetrics, \
LoadMetricsSummary
from ray.autoscaler._private.commands import get_or_create_head_node
from ray.autoscaler._private.resource_demand_scheduler import \
_utilization_score, _add_min_workers_nodes, \
@@ -24,6 +28,7 @@ from ray.core.generated.common_pb2 import Bundle, PlacementStrategy
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \
NODE_KIND_WORKER, TAG_RAY_NODE_STATUS, \
STATUS_UP_TO_DATE, STATUS_UNINITIALIZED, \
STATUS_UPDATE_FAILED, \
NODE_KIND_HEAD, NODE_TYPE_LEGACY_WORKER, \
NODE_TYPE_LEGACY_HEAD
from ray.test_utils import same_elements
@@ -368,6 +373,7 @@ def test_get_nodes_to_launch_with_min_workers():
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
}, 1)
@@ -390,9 +396,13 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
provider, new_types, 10, head_node_type="p2.8xlarge")
provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"
}, 1)
provider.create_node({}, {
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"
}, 1)
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1)
nodes = provider.non_terminated_nodes({})
@@ -424,7 +434,10 @@ def test_get_nodes_to_launch_limits():
scheduler = ResourceDemandScheduler(
provider, TYPES_A, 3, head_node_type="p2.8xlarge")
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2)
provider.create_node({}, {
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"
}, 2)
nodes = provider.non_terminated_nodes({})
@@ -442,7 +455,10 @@ def test_calculate_node_resources():
scheduler = ResourceDemandScheduler(
provider, TYPES_A, 10, head_node_type="p2.8xlarge")
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2)
provider.create_node({}, {
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"
}, 2)
nodes = provider.non_terminated_nodes({})
@@ -1059,6 +1075,86 @@ class LoadMetricsTest(unittest.TestCase):
pending_placement_groups=pending_placement_groups)
assert lm.get_pending_placement_groups() == pending_placement_groups
def testSummary(self):
    """Check that ``LoadMetrics.summary()`` aggregates usage and demand.

    Four nodes report resources; the last one also reports waiting
    bundles, infeasible bundles and pending placement groups. Explicit
    resource requests are then set. The summary must expose the head IP,
    per-resource usage tuples, and (shape, count) demand lists in which
    identical shapes are collapsed into a single entry.
    """
    lm = LoadMetrics(local_ip="1.1.1.1")
    # A summary must be available even before any node has reported.
    assert lm.summary() is not None

    # Two identical pending groups: PACK strategy, two 2-GPU bundles each.
    pending_placement_groups = [
        PlacementGroupTableData(
            state=PlacementGroupTableData.RESCHEDULING,
            strategy=PlacementStrategy.PACK,
            bundles=([Bundle(unit_resources={"GPU": 2})] * 2))
        for _ in range(2)
    ]

    # (ip, second update argument, third update argument). The usage
    # assertions below are consistent with the second argument being the
    # node's total resources and the third its available resources.
    plain_reports = [
        ("1.1.1.1", {"CPU": 64}, {"CPU": 2}),
        ("1.1.1.2", {
            "CPU": 64,
            "GPU": 8,
            "accelerator_type:V100": 1
        }, {
            "CPU": 0,
            "GPU": 1,
            "accelerator_type:V100": 1
        }),
        ("1.1.1.3", {
            "CPU": 64,
            "GPU": 8,
            "accelerator_type:V100": 1
        }, {
            "CPU": 0,
            "GPU": 0,
            "accelerator_type:V100": 0.92
        }),
    ]
    for node_ip, total, available in plain_reports:
        lm.update(node_ip, total, available, {})

    # The last node additionally carries all of the pending demand.
    waiting = [{"GPU": 2}] * 10
    infeasible = [{"CPU": 16}, {"GPU": 2}, {"CPU": 16, "GPU": 2}]
    lm.update(
        "1.1.1.4", {"CPU": 2}, {"CPU": 2}, {},
        waiting_bundles=waiting,
        infeasible_bundles=infeasible,
        pending_placement_groups=pending_placement_groups)

    lm.set_resource_requests([{"CPU": 64}, {"GPU": 8}, {"GPU": 8}])

    summary = lm.summary()

    assert summary.head_ip == "1.1.1.1"

    usage = summary.usage
    assert usage["CPU"] == (190, 194)
    assert usage["GPU"] == (15, 16)
    assert usage["accelerator_type:V100"][1] == 2, \
        "Not comparing the usage value due to floating point error."

    # Waiting and infeasible bundles are reported together, by shape.
    task_demand = summary.resource_demand
    expected_task_demand = (
        ({"GPU": 2}, 11),
        ({"CPU": 16}, 1),
        ({"CPU": 16, "GPU": 2}, 1),
    )
    for entry in expected_task_demand:
        assert entry in task_demand
    assert len(task_demand) == 3

    # The two identical placement groups collapse into one entry.
    expected_pg = {"bundles": [({"GPU": 2}, 2)], "strategy": "PACK"}
    assert (expected_pg, 2) in summary.pg_demand
    assert len(summary.pg_demand) == 1

    requested = summary.request_demand
    for entry in (({"GPU": 8}, 2), ({"CPU": 64}, 1)):
        assert entry in requested
    assert len(requested) == 2

    # TODO (Alex): This set of nodes won't be very useful in practice
    # because the node:xxx.xxx.xxx.xxx resources means that no 2 nodes
    # should ever have the same set of resources.
    assert len(summary.node_types) == 3
class AutoscalingTest(unittest.TestCase):
def setUp(self):
@@ -1157,6 +1253,87 @@ class AutoscalingTest(unittest.TestCase):
self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE),
"empty_node")
def testSummary(self):
    """Drive the autoscaler into a state that fills every summary bucket.

    The scenario ends with one head node and two ``m4.large`` workers
    active, one ``p2.xlarge`` stuck in setup (pending), two
    ``m4.16xlarge`` launches queued (pending launches) and one
    ``m4.4xlarge`` whose update failed. ``autoscaler.summary()`` must
    report each of those, and ``info_string()`` must render without
    raising.
    """
    config = copy.deepcopy(MULTI_WORKER_CLUSTER)
    config["available_node_types"]["m4.large"]["min_workers"] = 2
    config["max_workers"] = 10
    config["docker"] = {}
    config_path = self.write_config(config)
    self.provider = MockProvider()
    runner = MockProcessRunner()
    self.provider.create_node({}, {
        TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
        TAG_RAY_USER_NODE_TYPE: "empty_node",
        TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
    }, 1)
    head_ip = self.provider.non_terminated_node_ips({})[0]
    lm = LoadMetrics(local_ip=head_ip)
    autoscaler = StandardAutoscaler(
        config_path,
        lm,
        max_failures=0,
        max_launch_batch=1,
        max_concurrent_launches=10,
        process_runner=runner,
        update_interval_s=0)
    assert len(self.provider.non_terminated_nodes({})) == 1

    # min_workers=2 for m4.large: head + 2 workers after the first update.
    autoscaler.update()
    self.waitForNodes(3)

    for ip in self.provider.non_terminated_node_ips({}):
        lm.update(ip, {"CPU": 2}, {"CPU": 0}, {})

    lm.update(head_ip, {"CPU": 16}, {"CPU": 1}, {})
    autoscaler.update()

    # Wait until all three nodes are tagged up-to-date. This uses the
    # bounded polling helper instead of an open-ended spin loop, so a
    # regression fails the test rather than hanging it forever.
    self.waitForNodes(
        3, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})

    # After this section, the p2.xlarge is now in the setup process.
    # Clearing ready_to_run makes MockProcessRunner.check_call block, so
    # the new node cannot finish setting up.
    runner.ready_to_run.clear()

    lm.update(
        head_ip, {"CPU": 16}, {"CPU": 1}, {}, waiting_bundles=[{
            "GPU": 1
        }])

    autoscaler.update()
    self.waitForNodes(4)

    # Clearing ready_to_create makes MockProvider.create_node block, so
    # the launches triggered by the next update stay pending.
    self.provider.ready_to_create.clear()
    lm.set_resource_requests([{"CPU": 64}] * 2)
    autoscaler.update()

    # Inject a node whose update failed; _skip_wait bypasses the
    # ready_to_create gate that was just closed above.
    self.provider.create_node(
        {}, {
            TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
            TAG_RAY_USER_NODE_TYPE: "m4.4xlarge",
            TAG_RAY_NODE_STATUS: STATUS_UPDATE_FAILED
        },
        1,
        _skip_wait=True)
    self.waitForNodes(5)

    print(f"Head ip: {head_ip}")
    summary = autoscaler.summary()

    assert summary.active_nodes["m4.large"] == 2
    assert summary.active_nodes["empty_node"] == 1
    assert len(summary.active_nodes) == 2, summary.active_nodes

    assert summary.pending_nodes == [("172.0.0.3", "p2.xlarge")]
    assert summary.pending_launches == {"m4.16xlarge": 2}

    assert summary.failed_nodes == [("172.0.0.4", "m4.4xlarge")]

    # Make sure we return something (and don't throw exceptions). Let's not
    # get bogged down with a full cli test here.
    assert len(autoscaler.info_string()) > 1
def testScaleUpMinSanity(self):
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"]["m4.large"]["min_workers"] = \
@@ -1166,6 +1343,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
@@ -1191,6 +1369,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
@@ -1217,6 +1396,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "m4.4xlarge"
}, 1)
head_ip = self.provider.non_terminated_node_ips({})[0]
@@ -1227,7 +1407,7 @@ class AutoscalingTest(unittest.TestCase):
max_failures=0,
process_runner=runner,
update_interval_s=0)
head_ip = self.provider.non_terminated_node_ips({})[0]
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
@@ -1285,6 +1465,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
@@ -1304,11 +1485,15 @@ class AutoscalingTest(unittest.TestCase):
} == {"p2.8xlarge", "m4.large"}
self.provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 2)
self.provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "m4.16xlarge",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 2)
assert len(self.provider.non_terminated_nodes({})) == 7
# Make sure that after idle_timeout_minutes we don't kill idle
@@ -1339,7 +1524,9 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: "p2.xlarge"
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "p2.xlarge",
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 1)
head_ip = self.provider.non_terminated_node_ips({})[0]
self.provider.finish_starting_nodes()
@@ -1377,7 +1564,9 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
self.provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_KIND: "head"
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_NODE_KIND: "head",
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 1)
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
@@ -1391,16 +1580,16 @@ class AutoscalingTest(unittest.TestCase):
# These requests fit on the head node.
autoscaler.update()
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.load_metrics.set_resource_requests([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(1)
assert len(self.provider.mock_nodes) == 1
autoscaler.request_resources([{"GPU": 8}])
autoscaler.load_metrics.set_resource_requests([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(1)
# This request requires an additional worker node.
autoscaler.request_resources([{"GPU": 8}] * 2)
autoscaler.load_metrics.set_resource_requests([{"GPU": 8}] * 2)
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
@@ -1415,6 +1604,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(6)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
@@ -1426,15 +1616,15 @@ class AutoscalingTest(unittest.TestCase):
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.load_metrics.set_resource_requests([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.load_metrics.set_resource_requests([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
autoscaler.request_resources([{"CPU": 32}] * 4)
autoscaler.load_metrics.set_resource_requests([{"CPU": 32}] * 4)
autoscaler.update()
self.waitForNodes(5)
@@ -1450,6 +1640,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
self.provider.create_node({}, {
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
@@ -1462,11 +1653,11 @@ class AutoscalingTest(unittest.TestCase):
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 1}])
autoscaler.load_metrics.set_resource_requests([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.load_metrics.set_resource_requests([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
@@ -1495,6 +1686,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
@@ -1541,6 +1733,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
@@ -1554,15 +1747,15 @@ class AutoscalingTest(unittest.TestCase):
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.load_metrics.set_resource_requests([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.load_metrics.set_resource_requests([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
autoscaler.load_metrics.set_resource_requests([{"GPU": 1}] * 9)
autoscaler.update()
self.waitForNodes(4)
assert self.provider.mock_nodes[3].node_type == "p2.xlarge"
@@ -1601,6 +1794,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(5)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
@@ -1612,21 +1806,21 @@ class AutoscalingTest(unittest.TestCase):
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.load_metrics.set_resource_requests([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.load_metrics.set_resource_requests([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
autoscaler.load_metrics.set_resource_requests([{"GPU": 1}] * 9)
autoscaler.update()
self.waitForNodes(4)
assert self.provider.mock_nodes[3].node_type == "p2.xlarge"
autoscaler.update()
# Fill up m4, p2.8, p2 and request 2 more CPUs
autoscaler.request_resources([{
autoscaler.load_metrics.set_resource_requests([{
"CPU": 2
}, {
"CPU": 16
@@ -1674,6 +1868,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
@@ -1702,6 +1897,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
@@ -1713,11 +1909,11 @@ class AutoscalingTest(unittest.TestCase):
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.load_metrics.set_resource_requests([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.load_metrics.set_resource_requests([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
@@ -1750,6 +1946,7 @@ class AutoscalingTest(unittest.TestCase):
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
@@ -1762,7 +1959,10 @@ class AutoscalingTest(unittest.TestCase):
update_interval_s=0)
autoscaler.update()
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.load_metrics.set_resource_requests([{
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.update()
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
non_terminated_nodes = autoscaler.provider.non_terminated_nodes({})
@@ -1783,10 +1983,16 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
# this fits on request_resources()!
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2)
autoscaler.load_metrics.set_resource_requests([{
"CPU": 0.2,
"WORKER": 1.0
}] * 2)
autoscaler.update()
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.load_metrics.set_resource_requests([{
"CPU": 0.2,
"WORKER": 1.0
}])
lm.update(
node_ip,
config["available_node_types"]["def_worker"]["resources"], {}, {},
@@ -1859,6 +2065,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
@@ -1868,7 +2075,10 @@ class AutoscalingTest(unittest.TestCase):
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.load_metrics.set_resource_requests([{
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.update()
# 1 min worker for both min_worker and request_resources()
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
@@ -1887,16 +2097,25 @@ class AutoscalingTest(unittest.TestCase):
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2)
autoscaler.load_metrics.set_resource_requests([{
"CPU": 0.2,
"WORKER": 1.0
}] * 2)
autoscaler.update()
# 2 requested_resource, 1 min worker, 1 free node -> 2 nodes total
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.load_metrics.set_resource_requests([{
"CPU": 0.2,
"WORKER": 1.0
}])
autoscaler.update()
# Still 2 because the second one is not connected and hence
# request_resources occupies the connected node.
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 3)
autoscaler.load_metrics.set_resource_requests([{
"CPU": 0.2,
"WORKER": 1.0
}] * 3)
lm.update(
node_ip,
config["available_node_types"]["def_worker"]["resources"], {}, {},
@@ -1906,7 +2125,7 @@ class AutoscalingTest(unittest.TestCase):
}] * 3)
autoscaler.update()
self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([])
autoscaler.load_metrics.set_resource_requests([])
lm.update("172.0.0.2",
config["available_node_types"]["def_worker"]["resources"],
@@ -1919,6 +2138,8 @@ class AutoscalingTest(unittest.TestCase):
lm.update(node_ip,
config["available_node_types"]["def_worker"]["resources"],
{}, {})
print("============ Should scale down from here =============",
node_id)
autoscaler.update()
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# If node {node_id} was terminated any time then its state will be set
@@ -1958,6 +2179,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
@@ -1967,7 +2189,10 @@ class AutoscalingTest(unittest.TestCase):
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.request_resources([{"CPU": 2, "WORKER": 1.0}] * 2)
autoscaler.load_metrics.set_resource_requests([{
"CPU": 2,
"WORKER": 1.0
}] * 2)
autoscaler.update()
# 2 min worker for both min_worker and request_resources(), not 3.
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
@@ -1998,12 +2223,14 @@ class AutoscalingTest(unittest.TestCase):
"max_workers": 3,
}
})
config["idle_timeout_minutes"] = 0
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: "empty_node"
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE,
TAG_RAY_USER_NODE_TYPE: "empty_node",
}, 1)
runner = MockProcessRunner()
@@ -2023,7 +2250,10 @@ class AutoscalingTest(unittest.TestCase):
waiting_bundles=[{
"CPU": 2
}])
autoscaler.request_resources([{"CPU": 2, "GPU": 1}] * 2)
autoscaler.load_metrics.set_resource_requests([{
"CPU": 2,
"GPU": 1
}] * 2)
autoscaler.update()
# 1 head, 1 worker.
self.waitForNodes(2)
@@ -2041,6 +2271,140 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(2)
def format_pg(pg):
    """Render a placement-group demand entry as a single line.

    ``pg`` is a mapping with a ``"strategy"`` value and a ``"bundles"``
    iterable of ``(bundle, count)`` pairs. Each pair becomes
    ``"<bundle> * <count>"``; the pairs are comma-separated and followed
    by the strategy in parentheses.
    """
    strategy = pg["strategy"]
    rendered_shapes = [
        f"{shape} * {multiplicity}" for shape, multiplicity in pg["bundles"]
    ]
    joined = ", ".join(rendered_shapes)
    return f"{joined} ({strategy})"
def test_info_string():
# Checks the exact text produced by format_info_string when both a
# LoadMetricsSummary and an AutoscalerSummary are supplied. A fixed
# timestamp is passed so the header line is deterministic.
#
# Load-metrics half of the report. Each usage value is a pair that the
# expected text below renders as "<first>/<second> <resource>"; each demand
# entry is a (shape, count) pair rendered as "<shape>: <count>+ ...".
lm_summary = LoadMetricsSummary(
head_ip="0.0.0.0",
usage={
"CPU": (530, 544),
"GPU": (2, 2),
"AcceleratorType:V100": (0, 2),
# NOTE(review): the two memory entries are shown in GiB in the expected
# text (1583.19 -> 77.304, 471.02 -> 22.999); the conversion lives in
# format_info_string and the input unit is not visible from this test.
"memory": (0, 1583.19),
"object_store_memory": (0, 471.02)
},
resource_demand=[({
"CPU": 1
}, 150)],
pg_demand=[({
"bundles": [({
"CPU": 4
}, 5)],
"strategy": "PACK"
}, 420)],
request_demand=[({
"CPU": 16
}, 100)],
node_types=[])
# Autoscaler half of the report: node counts per type, nodes being set up
# as (ip, node type) pairs, queued launches per type, and failed nodes.
autoscaler_summary = AutoscalerSummary(
active_nodes={
"p3.2xlarge": 2,
"m4.4xlarge": 20
},
pending_nodes=[("1.2.3.4", "m4.4xlarge"), ("1.2.3.5", "m4.4xlarge")],
pending_launches={"m4.4xlarge": 2},
# NOTE(review): a failed node is supplied here, yet the expected text
# below says "(no failures)" -- confirm this is the intended rendering.
failed_nodes=[("1.2.3.6", "p3.2xlarge")])
# The literal is compared for exact equality after .strip(), so only its
# leading/trailing whitespace is ignored; do not reformat its contents.
expected = """
======== Autoscaler status: 2020-12-28 01:02:03 ========
Node status
--------------------------------------------------------
Healthy:
2 p3.2xlarge
20 m4.4xlarge
Pending:
m4.4xlarge, 2 launching
1.2.3.4: m4.4xlarge, setting up
1.2.3.5: m4.4xlarge, setting up
Recent failures:
(no failures)
Resources
--------------------------------------------------------
Usage:
530/544 CPU
2/2 GPU
0/2 AcceleratorType:V100
0.00/77.304 GiB memory
0.00/22.999 GiB object_store_memory
Demands:
{'CPU': 1}: 150+ pending tasks/actors
{'CPU': 4} * 5 (PACK): 420+ pending placement groups
{'CPU': 16}: 100+ from request_resources()
""".strip()
# The timestamp passed here is the one that appears in the expected header.
actual = format_info_string(
lm_summary,
autoscaler_summary,
time=datetime(year=2020, month=12, day=28, hour=1, minute=2, second=3))
# Printed so the rendered report is visible in the test output.
print(actual)
assert expected == actual
def test_info_string_no_node_type():
# Checks the exact text produced by format_info_string_no_node_types, which
# is called with only a LoadMetricsSummary (no autoscaler summary). A fixed
# timestamp is passed so the header line is deterministic.
#
# Each usage value is a pair that the expected text below renders as
# "<first>/<second> <resource>"; each demand entry is a (shape, count) pair.
lm_summary = LoadMetricsSummary(
head_ip="0.0.0.0",
usage={
"CPU": (530, 544),
"GPU": (2, 2),
"AcceleratorType:V100": (0, 2),
# NOTE(review): the two memory entries are shown in GiB in the expected
# text (1583.19 -> 77.304, 471.02 -> 22.999); the conversion lives in the
# formatter and the input unit is not visible from this test.
"memory": (0, 1583.19),
"object_store_memory": (0, 471.02)
},
resource_demand=[({
"CPU": 1
}, 150)],
pg_demand=[({
"bundles": [({
"CPU": 4
}, 5)],
"strategy": "PACK"
}, 420)],
request_demand=[({
"CPU": 16
}, 100)],
# One (resources, count) entry; the expected text renders it as
# "1 node(s) with resources: {'CPU': 16}".
node_types=[({
"CPU": 16
}, 1)])
# The literal is compared for exact equality after .strip(), so only its
# leading/trailing whitespace is ignored; do not reformat its contents.
expected = """
======== Cluster status: 2020-12-28 01:02:03 ========
Node status
-----------------------------------------------------
1 node(s) with resources: {'CPU': 16}
Resources
-----------------------------------------------------
Usage:
530/544 CPU
2/2 GPU
0/2 AcceleratorType:V100
0.00/77.304 GiB memory
0.00/22.999 GiB object_store_memory
Demands:
{'CPU': 1}: 150+ pending tasks/actors
{'CPU': 4} * 5 (PACK): 420+ pending placement groups
{'CPU': 16}: 100+ from request_resources()
""".strip()
# The timestamp passed here is the one that appears in the expected header.
actual = format_info_string_no_node_types(
lm_summary,
time=datetime(year=2020, month=12, day=28, hour=1, minute=2, second=3))
# Printed so the rendered report is visible in the test output.
print(actual)
assert expected == actual
if __name__ == "__main__":
    # Allow running this test module directly: hand the file to pytest in
    # verbose mode and propagate pytest's result as the process exit status.
    import sys

    pytest_args = ["-v", __file__]
    sys.exit(pytest.main(pytest_args))