diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py index 2fdd001d4..8faef274d 100644 --- a/dashboard/modules/reporter/reporter_head.py +++ b/dashboard/modules/reporter/reporter_head.py @@ -13,6 +13,7 @@ import ray.new_dashboard.utils as dashboard_utils import ray._private.services import ray.utils from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS, + DEBUG_AUTOSCALING_STATUS_LEGACY, DEBUG_AUTOSCALING_ERROR) from ray.core.generated import reporter_pb2 from ray.core.generated import reporter_pb2_grpc @@ -113,13 +114,20 @@ class ReportHead(dashboard_utils.DashboardHeadModule): """ aioredis_client = self._dashboard_head.aioredis_client - status = await aioredis_client.hget(DEBUG_AUTOSCALING_STATUS, "value") + legacy_status = await aioredis_client.hget( + DEBUG_AUTOSCALING_STATUS_LEGACY, "value") + formatted_status_string = await aioredis_client.hget( + DEBUG_AUTOSCALING_STATUS, "value") + formatted_status = json.loads(formatted_status_string.decode() + ) if formatted_status_string else {} error = await aioredis_client.hget(DEBUG_AUTOSCALING_ERROR, "value") return dashboard_utils.rest_response( success=True, message="Got cluster status.", - autoscaling_status=status.decode() if status else None, + autoscaling_status=legacy_status.decode() + if legacy_status else None, autoscaling_error=error.decode() if error else None, + cluster_status=formatted_status if formatted_status else None, ) async def run(self, server): diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 32836c0ae..4bd3e0300 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -19,7 +19,7 @@ from ray import ray_constants from ray.test_utils import (format_web_url, wait_for_condition, wait_until_server_available, run_string_as_driver, wait_until_succeeded_without_exception) -from ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS, +from 
ray.autoscaler._private.util import (DEBUG_AUTOSCALING_STATUS_LEGACY, DEBUG_AUTOSCALING_ERROR) import ray.new_dashboard.consts as dashboard_consts import ray.new_dashboard.utils as dashboard_utils @@ -458,11 +458,14 @@ def test_get_cluster_status(ray_start_with_dashboard): def get_cluster_status(): response = requests.get(f"{webui_url}/api/cluster_status") response.raise_for_status() + print(response.json()) assert response.json()["result"] assert "autoscalingStatus" in response.json()["data"] assert response.json()["data"]["autoscalingStatus"] is None assert "autoscalingError" in response.json()["data"] assert response.json()["data"]["autoscalingError"] is None + assert "clusterStatus" in response.json()["data"] + assert "loadMetricsReport" in response.json()["data"]["clusterStatus"] wait_until_succeeded_without_exception(get_cluster_status, (requests.RequestException, )) @@ -478,7 +481,7 @@ def test_get_cluster_status(ray_start_with_dashboard): port=int(address[1]), password=ray_constants.REDIS_DEFAULT_PASSWORD) - client.hset(DEBUG_AUTOSCALING_STATUS, "value", "hello") + client.hset(DEBUG_AUTOSCALING_STATUS_LEGACY, "value", "hello") client.hset(DEBUG_AUTOSCALING_ERROR, "value", "world") response = requests.get(f"{webui_url}/api/cluster_status") @@ -488,6 +491,8 @@ def test_get_cluster_status(ray_start_with_dashboard): assert response.json()["data"]["autoscalingStatus"] == "hello" assert "autoscalingError" in response.json()["data"] assert response.json()["data"]["autoscalingError"] == "world" + assert "clusterStatus" in response.json()["data"] + assert "loadMetricsReport" in response.json()["data"]["clusterStatus"] def test_immutable_types(): diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 64167b4cb..56c8fa634 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -1,4 +1,4 @@ -from collections import defaultdict, namedtuple +from collections 
import defaultdict, namedtuple, Counter from typing import Any, Optional, Dict, List from urllib3.exceptions import MaxRetryError import copy @@ -16,8 +16,10 @@ from ray.experimental.internal_kv import _internal_kv_put, \ from ray.autoscaler.tags import ( TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, - TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER, - NODE_KIND_UNMANAGED, NODE_KIND_HEAD) + TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, STATUS_WAITING_FOR_SSH, + STATUS_SYNCING_FILES, STATUS_SETTING_UP, STATUS_UP_TO_DATE, + NODE_KIND_WORKER, NODE_KIND_UNMANAGED, NODE_KIND_HEAD) +from ray.autoscaler._private.legacy_info_string import legacy_log_info_string from ray.autoscaler._private.providers import _get_node_provider from ray.autoscaler._private.updater import NodeUpdaterThread from ray.autoscaler._private.node_launcher import NodeLauncher @@ -25,8 +27,8 @@ from ray.autoscaler._private.resource_demand_scheduler import \ get_bin_pack_residual, ResourceDemandScheduler, NodeType, NodeID, NodeIP, \ ResourceDict from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \ - with_head_node_ip, hash_launch_conf, hash_runtime_conf, add_prefix, \ - DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR + with_head_node_ip, hash_launch_conf, hash_runtime_conf, \ + DEBUG_AUTOSCALING_ERROR, format_info_string from ray.autoscaler._private.constants import \ AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \ AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \ @@ -41,20 +43,23 @@ UpdateInstructions = namedtuple( "UpdateInstructions", ["node_id", "init_commands", "start_ray_commands", "docker_config"]) +AutoscalerSummary = namedtuple( + "AutoscalerSummary", + ["active_nodes", "pending_nodes", "pending_launches", "failed_nodes"]) + class StandardAutoscaler: """The autoscaling control loop for a Ray cluster. 
There are two ways to start an autoscaling cluster: manually by running - `ray start --head --autoscaling-config=/path/to/config.yaml` on a - instance that has permission to launch other instances, or you can also use - `ray up /path/to/config.yaml` from your laptop, which will - configure the right AWS/Cloud roles automatically. - - StandardAutoscaler's `update` method is periodically called by `monitor.py` - to add and remove nodes as necessary. Currently, load-based autoscaling is - not implemented, so all this class does is try to maintain a constant - cluster size. + `ray start --head --autoscaling-config=/path/to/config.yaml` on an instance + that has permission to launch other instances, or you can also use `ray up + /path/to/config.yaml` from your laptop, which will configure the right + AWS/Cloud roles automatically. See the documentation for a full definition + of autoscaling behavior: + https://docs.ray.io/en/master/cluster/autoscaling.html + StandardAutoscaler's `update` method is periodically called in + `monitor.py`'s monitoring loop. StandardAutoscaler is also used to bootstrap clusters (by adding workers until the cluster size that can handle the resource demand is met). @@ -120,9 +125,6 @@ class StandardAutoscaler: for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) - # List of resource bundles the user is requesting of the cluster. 
- self.resource_demand_vector = [] - logger.info("StandardAutoscaler: {}".format(self.config)) def update(self): @@ -161,7 +163,6 @@ class StandardAutoscaler: self.provider.internal_ip(node_id) for node_id in self.all_workers() ]) - self.log_info_string(nodes) # Terminate any idle or out of date nodes last_used = self.load_metrics.last_used_time_by_ip @@ -175,7 +176,7 @@ class StandardAutoscaler: sorted_node_ids = self._sort_based_on_last_used(nodes, last_used) # Don't terminate nodes needed by request_resources() nodes_allowed_to_terminate: Dict[NodeID, bool] = {} - if self.resource_demand_vector: + if self.load_metrics.get_resource_requests(): nodes_allowed_to_terminate = self._get_nodes_allowed_to_terminate( sorted_node_ids) @@ -201,7 +202,6 @@ class StandardAutoscaler: if nodes_to_terminate: self.provider.terminate_nodes(nodes_to_terminate) nodes = self.workers() - self.log_info_string(nodes) # Terminate nodes if there are too many nodes_to_terminate = [] @@ -216,8 +216,6 @@ class StandardAutoscaler: self.provider.terminate_nodes(nodes_to_terminate) nodes = self.workers() - self.log_info_string(nodes) - to_launch = self.resource_demand_scheduler.get_nodes_to_launch( self.provider.non_terminated_nodes(tag_filters={}), self.pending_launches.breakdown(), @@ -225,7 +223,7 @@ class StandardAutoscaler: self.load_metrics.get_resource_utilization(), self.load_metrics.get_pending_placement_groups(), self.load_metrics.get_static_node_resources_by_ip(), - ensure_min_cluster_size=self.resource_demand_vector) + ensure_min_cluster_size=self.load_metrics.get_resource_requests()) for node_type, count in to_launch.items(): self.launch_new_node(count, node_type=node_type) @@ -255,7 +253,6 @@ class StandardAutoscaler: self.provider.terminate_nodes(nodes_to_terminate) nodes = self.workers() - self.log_info_string(nodes) # Update nodes with out-of-date files. 
# TODO(edoakes): Spawning these threads directly seems to cause @@ -281,6 +278,9 @@ class StandardAutoscaler: for node_id in nodes: self.recover_if_needed(node_id, now) + logger.info(self.info_string()) + legacy_log_info_string(self, nodes) + def _sort_based_on_last_used(self, nodes: List[NodeID], last_used: Dict[str, float]) -> List[NodeID]: """Sort the nodes based on the last time they were used. @@ -361,7 +361,7 @@ class StandardAutoscaler: used_resource_requests: List[ResourceDict] _, used_resource_requests = \ get_bin_pack_residual(max_node_resources, - self.resource_demand_vector) + self.load_metrics.get_resource_requests()) # Remove the first entry (the head node). max_node_resources.pop(0) # Remove the first entry (the head node). @@ -533,15 +533,17 @@ class StandardAutoscaler: if not self.can_update(node_id): return key = self.provider.internal_ip(node_id) - if key not in self.load_metrics.last_heartbeat_time_by_ip: - self.load_metrics.last_heartbeat_time_by_ip[key] = now - last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip[key] - delta = now - last_heartbeat_time - if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S: - return + + if key in self.load_metrics.last_heartbeat_time_by_ip: + last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip[ + key] + delta = now - last_heartbeat_time + if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S: + return + logger.warning("StandardAutoscaler: " - "{}: No heartbeat in {}s, " - "restarting Ray to recover...".format(node_id, delta)) + "{}: No recent heartbeat, " + "restarting Ray to recover...".format(node_id)) updater = NodeUpdaterThread( node_id=node_id, provider_config=self.config["provider"], @@ -678,43 +680,6 @@ class StandardAutoscaler: return self.provider.non_terminated_nodes( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED}) - def log_info_string(self, nodes): - tmp = "Cluster status: " - tmp += self.info_string(nodes) - tmp += "\n" - tmp += self.load_metrics.info_string() - tmp += "\n" - tmp += 
self.resource_demand_scheduler.debug_string( - nodes, self.pending_launches.breakdown(), - self.load_metrics.get_resource_utilization()) - if _internal_kv_initialized(): - _internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True) - if self.prefix_cluster_info: - tmp = add_prefix(tmp, self.config["cluster_name"]) - logger.debug(tmp) - - def info_string(self, nodes): - suffix = "" - if self.updaters: - suffix += " ({} updating)".format(len(self.updaters)) - if self.num_failed_updates: - suffix += " ({} failed to update)".format( - len(self.num_failed_updates)) - - return "{} nodes{}".format(len(nodes), suffix) - - def request_resources(self, resources: List[dict]): - """Called by monitor to request resources. - - Args: - resources: A list of resource bundles. - """ - if resources: - logger.info( - "StandardAutoscaler: resource_requests={}".format(resources)) - assert isinstance(resources, list), resources - self.resource_demand_vector = resources - def kill_workers(self): logger.error("StandardAutoscaler: kill_workers triggered") nodes = self.workers() @@ -722,3 +687,66 @@ class StandardAutoscaler: self.provider.terminate_nodes(nodes) logger.error("StandardAutoscaler: terminated {} node(s)".format( len(nodes))) + + def summary(self): + """Summarizes the active, pending, and failed node launches. + + An active node is a node whose raylet is actively reporting heartbeats. + A pending node is a non-active node whose node tag is uninitialized, + waiting for ssh, syncing files, or setting up. + If a node is not pending or active, it is failed. + + Returns: + AutoscalerSummary: The summary. 
+ """ + all_node_ids = self.provider.non_terminated_nodes(tag_filters={}) + + active_nodes = Counter() + pending_nodes = [] + failed_nodes = [] + + for node_id in all_node_ids: + ip = self.provider.internal_ip(node_id) + node_tags = self.provider.node_tags(node_id) + if node_tags[TAG_RAY_NODE_KIND] == NODE_KIND_UNMANAGED: + continue + node_type = node_tags[TAG_RAY_USER_NODE_TYPE] + + # TODO (Alex): If a node's raylet has died, it shouldn't be marked + # as active. + is_active = self.load_metrics.is_active(ip) + if is_active: + active_nodes[node_type] += 1 + else: + status = node_tags[TAG_RAY_NODE_STATUS] + pending_states = [ + STATUS_UNINITIALIZED, STATUS_WAITING_FOR_SSH, + STATUS_SYNCING_FILES, STATUS_SETTING_UP + ] + is_pending = status in pending_states + if is_pending: + pending_nodes.append((ip, node_type)) + else: + # TODO (Alex): Failed nodes are now immediately killed, so + # this list will almost always be empty. We should ideally + # keep a cache of recently failed nodes and their startup + # logs. + failed_nodes.append((ip, node_type)) + + # The concurrent counter leaves some 0 counts in, so we need to + # manually filter those out. 
+ pending_launches = {} + for node_type, count in self.pending_launches.breakdown().items(): + if count: + pending_launches[node_type] = count + + return AutoscalerSummary( + active_nodes=active_nodes, + pending_nodes=pending_nodes, + pending_launches=pending_launches, + failed_nodes=failed_nodes) + + def info_string(self): + lm_summary = self.load_metrics.summary() + autoscaler_summary = self.summary() + return "\n" + format_info_string(lm_summary, autoscaler_summary) diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 0c7e3abbd..247ba0d69 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -43,6 +43,10 @@ from ray.worker import global_worker # type: ignore from ray.util.debug import log_once import ray.autoscaler._private.subprocess_output_util as cmd_output_util +from ray.autoscaler._private.load_metrics import LoadMetricsSummary +from ray.autoscaler._private.autoscaler import AutoscalerSummary +from ray.autoscaler._private.util import format_info_string, \ + format_info_string_no_node_types logger = logging.getLogger(__name__) @@ -94,6 +98,14 @@ def debug_status() -> str: status = "No cluster status." 
else: status = status.decode("utf-8") + as_dict = json.loads(status) + lm_summary = LoadMetricsSummary(**as_dict["load_metrics_report"]) + if "autoscaler_report" in as_dict: + autoscaler_summary = AutoscalerSummary( + **as_dict["autoscaler_report"]) + status = format_info_string(lm_summary, autoscaler_summary) + else: + status = format_info_string_no_node_types(lm_summary) if error: status += "\n" status += error.decode("utf-8") diff --git a/python/ray/autoscaler/_private/legacy_info_string.py b/python/ray/autoscaler/_private/legacy_info_string.py new file mode 100644 index 000000000..99791efd7 --- /dev/null +++ b/python/ray/autoscaler/_private/legacy_info_string.py @@ -0,0 +1,35 @@ +import logging +from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS_LEGACY +from ray.experimental.internal_kv import _internal_kv_put, \ + _internal_kv_initialized +"""This file provides legacy support for the old info string in order to +ensure the dashboard's `api/cluster_status` does not break backwards +compatibility. 
+""" + +logger = logging.getLogger(__name__) + + +def legacy_log_info_string(autoscaler, nodes): + tmp = "Cluster status: " + tmp += info_string(autoscaler, nodes) + tmp += "\n" + tmp += autoscaler.load_metrics.info_string() + tmp += "\n" + tmp += autoscaler.resource_demand_scheduler.debug_string( + nodes, autoscaler.pending_launches.breakdown(), + autoscaler.load_metrics.get_resource_utilization()) + if _internal_kv_initialized(): + _internal_kv_put(DEBUG_AUTOSCALING_STATUS_LEGACY, tmp, overwrite=True) + logger.debug(tmp) + + +def info_string(autoscaler, nodes): + suffix = "" + if autoscaler.updaters: + suffix += " ({} updating)".format(len(autoscaler.updaters)) + if autoscaler.num_failed_updates: + suffix += " ({} failed to update)".format( + len(autoscaler.num_failed_updates)) + + return "{} nodes{}".format(len(nodes), suffix) diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index b688fe617..dc3178015 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -1,16 +1,26 @@ +from collections import namedtuple +from functools import reduce import logging import time from typing import Dict, List import numpy as np import ray._private.services as services -from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES +from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES,\ + AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE +from ray.autoscaler._private.util import add_resources, freq_of_dicts from ray.gcs_utils import PlacementGroupTableData from ray.autoscaler._private.resource_demand_scheduler import \ NodeIP, ResourceDict +from ray.core.generated.common_pb2 import PlacementStrategy logger = logging.getLogger(__name__) +LoadMetricsSummary = namedtuple("LoadMetricsSummary", [ + "head_ip", "usage", "resource_demand", "pg_demand", "request_demand", + "node_types" +]) + class LoadMetrics: """Container for cluster load metrics. 
@@ -31,6 +41,7 @@ class LoadMetrics: self.waiting_bundles = [] self.infeasible_bundles = [] self.pending_placement_groups = [] + self.resource_requests = [] def update(self, ip: str, @@ -72,9 +83,12 @@ class LoadMetrics: def mark_active(self, ip): assert ip is not None, "IP should be known at this time" - logger.info("Node {} is newly setup, treating as active".format(ip)) + logger.debug("Node {} is newly setup, treating as active".format(ip)) self.last_heartbeat_time_by_ip[ip] = time.time() + def is_active(self, ip): + return ip in self.last_heartbeat_time_by_ip + def prune_active_ips(self, active_ips): active_ips = set(active_ips) active_ips.add(self.local_ip) @@ -155,12 +169,82 @@ class LoadMetrics: return resources_used, resources_total - def get_resource_demand_vector(self): - return self.waiting_bundles + self.infeasible_bundles + def get_resource_demand_vector(self, clip=True): + if clip: + # Bound the total number of bundles to + # 2xMAX_RESOURCE_DEMAND_VECTOR_SIZE. This guarantees the resource + # demand scheduler bin packing algorithm takes a reasonable amount + # of time to run. + return ( + self. + waiting_bundles[:AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE] + + self. 
+ infeasible_bundles[:AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE] + ) + else: + return self.waiting_bundles + self.infeasible_bundles + + def get_resource_requests(self): + return self.resource_requests def get_pending_placement_groups(self): return self.pending_placement_groups + def summary(self): + available_resources = reduce(add_resources, + self.dynamic_resources_by_ip.values() + ) if self.dynamic_resources_by_ip else {} + total_resources = reduce(add_resources, + self.static_resources_by_ip.values() + ) if self.static_resources_by_ip else {} + usage_dict = {} + for key in total_resources: + total = total_resources[key] + usage_dict[key] = (total - available_resources[key], total) + + summarized_demand_vector = freq_of_dicts( + self.get_resource_demand_vector(clip=False)) + summarized_resource_requests = freq_of_dicts( + self.get_resource_requests()) + + def placement_group_serializer(pg): + bundles = tuple( + frozenset(bundle.unit_resources.items()) + for bundle in pg.bundles) + return (bundles, pg.strategy) + + def placement_group_deserializer(pg_tuple): + # We marshal this as a dictionary so that we can easily json.dumps + # it later. + # TODO (Alex): Would there be a benefit to properly + # marshalling this (into a protobuf)? 
+ bundles = list(map(dict, pg_tuple[0])) + return { + "bundles": freq_of_dicts(bundles), + "strategy": PlacementStrategy.Name(pg_tuple[1]) + } + + summarized_placement_groups = freq_of_dicts( + self.get_pending_placement_groups(), + serializer=placement_group_serializer, + deserializer=placement_group_deserializer) + nodes_summary = freq_of_dicts(self.static_resources_by_ip.values()) + + return LoadMetricsSummary( + head_ip=self.local_ip, + usage=usage_dict, + resource_demand=summarized_demand_vector, + pg_demand=summarized_placement_groups, + request_demand=summarized_resource_requests, + node_types=nodes_summary) + + def set_resource_requests(self, requested_resources): + if requested_resources is not None: + assert isinstance(requested_resources, list), requested_resources + self.resource_requests = [ + request for request in requested_resources if len(request) > 0 + ] + def info_string(self): return " - " + "\n - ".join( ["{}: {}".format(k, v) for k, v in sorted(self._info().items())]) diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index aba8cff2d..d838c6be1 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -149,8 +149,8 @@ class ResourceDemandScheduler: node_resources, node_type_counts = self.calculate_node_resources( nodes, launching_nodes, unused_resources_by_ip) - logger.info("Cluster resources: {}".format(node_resources)) - logger.info("Node counts: {}".format(node_type_counts)) + logger.debug("Cluster resources: {}".format(node_resources)) + logger.debug("Node counts: {}".format(node_type_counts)) # Step 2: add nodes to add to satisfy min_workers for each type (node_resources, node_type_counts, @@ -160,7 +160,7 @@ class ResourceDemandScheduler: self.max_workers, self.head_node_type, ensure_min_cluster_size) # Step 3: add nodes for strict spread groups - logger.info(f"Placement 
group demands: {pending_placement_groups}") + logger.debug(f"Placement group demands: {pending_placement_groups}") placement_group_demand_vector, strict_spreads = \ placement_groups_to_resource_demands(pending_placement_groups) resource_demands.extend(placement_group_demand_vector) @@ -187,8 +187,8 @@ class ResourceDemandScheduler: # groups unfulfilled, _ = get_bin_pack_residual(node_resources, resource_demands) - logger.info("Resource demands: {}".format(resource_demands)) - logger.info("Unfulfilled demands: {}".format(unfulfilled)) + logger.debug("Resource demands: {}".format(resource_demands)) + logger.debug("Unfulfilled demands: {}".format(unfulfilled)) # Add 1 to account for the head node. max_to_add = self.max_workers + 1 - sum(node_type_counts.values()) nodes_to_add_based_on_demand = get_nodes_for( @@ -211,7 +211,7 @@ class ResourceDemandScheduler: total_nodes_to_add, unused_resources_by_ip.keys(), nodes, launching_nodes, adjusted_min_workers) - logger.info("Node requests: {}".format(total_nodes_to_add)) + logger.debug("Node requests: {}".format(total_nodes_to_add)) return total_nodes_to_add def _legacy_worker_node_to_launch( @@ -615,8 +615,14 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], # starts up because placement groups are scheduled via custom # resources. This will behave properly with the current utilization # score heuristic, but it's a little dangerous and misleading. - logger.info( - "No feasible node type to add for {}".format(resources)) + logger.warning( + f"The autoscaler could not find a node type to satisfy the" + f"request: {resources}. If this request is related to " + f"placement groups the resource request will resolve itself, " + f"otherwise please specify a node type with the necessary " + f"resource " + f"https://docs.ray.io/en/master/cluster/autoscaling.html#multiple-node-type-autoscaling." 
# noqa: E501 + ) break utilization_scores = sorted(utilization_scores, reverse=True) diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index b4066df95..1ab7c2e68 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -1,13 +1,15 @@ import collections +from datetime import datetime import logging import hashlib import json import jsonschema import os import threading -from typing import Any, Dict +from typing import Any, Dict, List import ray +import ray.ray_constants import ray._private.services as services from ray.autoscaler._private.providers import _get_default_config from ray.autoscaler._private.docker import validate_docker_config @@ -20,6 +22,7 @@ RAY_SCHEMA_PATH = os.path.join( # Internal kv keys for storing debug status. DEBUG_AUTOSCALING_ERROR = "__autoscaling_error" DEBUG_AUTOSCALING_STATUS = "__autoscaling_status" +DEBUG_AUTOSCALING_STATUS_LEGACY = "__autoscaling_status_legacy" logger = logging.getLogger(__name__) @@ -246,6 +249,47 @@ def hash_runtime_conf(file_mounts, return (_hash_cache[conf_str], file_mounts_contents_hash) +def add_resources(dict1: Dict[str, float], + dict2: Dict[str, float]) -> Dict[str, float]: + """Add the values in two dictionaries. + + Returns: + dict: A new dictionary (inputs remain unmodified). + """ + new_dict = dict1.copy() + for k, v in dict2.items(): + new_dict[k] = v + new_dict.get(k, 0) + return new_dict + + +def freq_of_dicts(dicts: List[Dict], + serializer=lambda d: frozenset(d.items()), + deserializer=dict): + """Count a list of dictionaries (or unhashable types). + + This is somewhat annoying because mutable data structures aren't hashable, + and set/dict keys must be hashable. + + Args: + dicts (List[D]): A list of dictionaries to be counted. + serializer (D -> S): A custom serialization function. The output type S + must be hashable. The default serializer converts a dictionary into + a frozenset of KV pairs. 
+ deserializer (S -> U): A custom deserialization function. See the + serializer for information about type S. For dictionaries U := D. + + Returns: + List[Tuple[U, int]]: Returns a list of tuples. Each entry in the list + is a tuple containing a unique entry from `dicts` and its + corresponding frequency count. + """ + freqs = collections.Counter(map(lambda d: serializer(d), dicts)) + as_list = [] + for as_set, count in freqs.items(): + as_list.append((deserializer(as_set), count)) + return as_list + + def add_prefix(info_string, prefix): """Prefixes each line of info_string, except the first, by prefix.""" lines = info_string.split("\n") @@ -255,3 +299,132 @@ def add_prefix(info_string, prefix): prefixed_lines.append(prefixed_line) prefixed_info_string = "\n".join(prefixed_lines) return prefixed_info_string + + +def format_pg(pg): + strategy = pg["strategy"] + bundles = pg["bundles"] + shape_strs = [] + for bundle, count in bundles: + shape_strs.append(f"{bundle} * {count}") + bundles_str = ", ".join(shape_strs) + return f"{bundles_str} ({strategy})" + + +def get_usage_report(lm_summary): + usage_lines = [] + for resource, (used, total) in lm_summary.usage.items(): + line = f" {used}/{total} {resource}" + if resource in ["memory", "object_store_memory"]: + to_GiB = ray.ray_constants.MEMORY_RESOURCE_UNIT_BYTES / 2**30 + used *= to_GiB + total *= to_GiB + line = f" {used:.2f}/{total:.3f} GiB {resource}" + usage_lines.append(line) + usage_report = "\n".join(usage_lines) + return usage_report + + +def get_demand_report(lm_summary): + demand_lines = [] + for bundle, count in lm_summary.resource_demand: + line = f" {bundle}: {count}+ pending tasks/actors" + demand_lines.append(line) + for entry in lm_summary.pg_demand: + pg, count = entry + pg_str = format_pg(pg) + line = f" {pg_str}: {count}+ pending placement groups" + demand_lines.append(line) + for bundle, count in lm_summary.request_demand: + line = f" {bundle}: {count}+ from request_resources()" + 
demand_lines.append(line) + if len(demand_lines) > 0: + demand_report = "\n".join(demand_lines) + else: + demand_report = " (no resource demands)" + return demand_report + + +def format_info_string(lm_summary, autoscaler_summary, time=None): + if time is None: + time = datetime.now() + header = "=" * 8 + f" Autoscaler status: {time} " + "=" * 8 + separator = "-" * len(header) + available_node_report_lines = [] + for node_type, count in autoscaler_summary.active_nodes.items(): + line = f" {count} {node_type}" + available_node_report_lines.append(line) + available_node_report = "\n".join(available_node_report_lines) + + pending_lines = [] + for node_type, count in autoscaler_summary.pending_launches.items(): + line = f" {node_type}, {count} launching" + pending_lines.append(line) + for ip, node_type in autoscaler_summary.pending_nodes: + line = f" {ip}: {node_type}, setting up" + pending_lines.append(line) + if pending_lines: + pending_report = "\n".join(pending_lines) + else: + pending_report = " (no pending nodes)" + + failure_lines = [] + for ip, node_type in autoscaler_summary.failed_nodes: + line = f" {ip}: {node_type}" + failure_report = "Recent failures:\n" + if failure_lines: + failure_report += "\n".join(failure_lines) + else: + failure_report += " (no failures)" + + usage_report = get_usage_report(lm_summary) + demand_report = get_demand_report(lm_summary) + + formatted_output = f"""{header} +Node status +{separator} +Healthy: +{available_node_report} +Pending: +{pending_report} +{failure_report} + +Resources +{separator} + +Usage: +{usage_report} + +Demands: +{demand_report}""" + return formatted_output + + +def format_info_string_no_node_types(lm_summary, time=None): + if time is None: + time = datetime.now() + header = "=" * 8 + f" Cluster status: {time} " + "=" * 8 + separator = "-" * len(header) + + node_lines = [] + for node_type, count in lm_summary.node_types: + line = f" {count} node(s) with resources: {node_type}" + node_lines.append(line) + 
node_report = "\n".join(node_lines) + + usage_report = get_usage_report(lm_summary) + demand_report = get_demand_report(lm_summary) + + formatted_output = f"""{header} +Node status +{separator} +{node_report} + +Resources +{separator} +Usage: +{usage_report} + +Demands: +{demand_report}""" + return formatted_output diff --git a/python/ray/monitor.py b/python/ray/monitor.py index f650e151a..aa819c7d3 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -15,11 +15,14 @@ from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.constants import \ AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE +from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS import ray.gcs_utils import ray.utils import ray.ray_constants as ray_constants from ray.ray_logging import setup_component_logger from ray._raylet import GlobalStateAccessor +from ray.experimental.internal_kv import _internal_kv_put, \ + _internal_kv_initialized import redis @@ -65,11 +68,7 @@ def parse_resource_demands(resource_load_by_shape): except Exception: logger.exception("Failed to parse resource demands.") - # Bound the total number of bundles to 2xMAX_RESOURCE_DEMAND_VECTOR_SIZE. - # This guarantees the resource demand scheduler bin packing algorithm takes - # a reasonable amount of time to run. - return waiting_bundles[:AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE], \ - infeasible_bundles[:AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE] + return waiting_bundles, infeasible_bundles class Monitor: @@ -184,14 +183,8 @@ class Monitor: data: a resource request as JSON, e.g. {"CPU": 1} """ - if not self.autoscaler: - return - - try: - self.autoscaler.request_resources(json.loads(data)) - except Exception: - # We don't want this to kill the monitor. 
- traceback.print_exc() + resource_request = json.loads(data) + self.load_metrics.set_resource_requests(resource_request) def process_messages(self, max_messages=10000): """Process all messages ready in the subscription channels. @@ -257,12 +250,23 @@ class Monitor: # Handle messages from the subscription channels. while True: + self.update_raylet_map() + self.update_load_metrics() + status = { + "load_metrics_report": self.load_metrics.summary()._asdict() + } + # Process autoscaling actions if self.autoscaler: # Only used to update the load metrics for the autoscaler. - self.update_raylet_map() - self.update_load_metrics() self.autoscaler.update() + status[ + "autoscaler_report"] = self.autoscaler.summary()._asdict() + + as_json = json.dumps(status) + if _internal_kv_initialized(): + _internal_kv_put( + DEBUG_AUTOSCALING_STATUS, as_json, overwrite=True) # Process a round of messages. self.process_messages() diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 72f361fe2..628c1b191 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -54,8 +54,11 @@ class MockProcessRunner: self.calls = [] self.fail_cmds = fail_cmds or [] self.call_response = {} + self.ready_to_run = threading.Event() + self.ready_to_run.set() def check_call(self, cmd, *args, **kwargs): + self.ready_to_run.wait() for token in self.fail_cmds: if token in str(cmd): raise CalledProcessError(1, token, @@ -165,22 +168,28 @@ class MockProvider(NodeProvider): ] def is_running(self, node_id): - return self.mock_nodes[node_id].state == "running" + with self.lock: + return self.mock_nodes[node_id].state == "running" def is_terminated(self, node_id): - return self.mock_nodes[node_id].state in ["stopped", "terminated"] + with self.lock: + return self.mock_nodes[node_id].state in ["stopped", "terminated"] def node_tags(self, node_id): - return self.mock_nodes[node_id].tags + with self.lock: + return self.mock_nodes[node_id].tags def 
internal_ip(self, node_id): - return self.mock_nodes[node_id].internal_ip + with self.lock: + return self.mock_nodes[node_id].internal_ip def external_ip(self, node_id): - return self.mock_nodes[node_id].external_ip + with self.lock: + return self.mock_nodes[node_id].external_ip - def create_node(self, node_config, tags, count): - self.ready_to_create.wait() + def create_node(self, node_config, tags, count, _skip_wait=False): + if not _skip_wait: + self.ready_to_create.wait() if self.fail_creates: return with self.lock: @@ -200,7 +209,8 @@ class MockProvider(NodeProvider): self.next_id += 1 def set_node_tags(self, node_id, tags): - self.mock_nodes[node_id].tags.update(tags) + with self.lock: + self.mock_nodes[node_id].tags.update(tags) def terminate_node(self, node_id): with self.lock: @@ -534,7 +544,11 @@ class AutoscalingTest(unittest.TestCase): config["max_workers"] = 5 config_path = self.write_config(config) self.provider = MockProvider() - self.provider.create_node({}, {TAG_RAY_NODE_KIND: "worker"}, 10) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: "worker", + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_WORKER + }, 10) runner = MockProcessRunner() runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)]) autoscaler = StandardAutoscaler( @@ -562,6 +576,7 @@ class AutoscalingTest(unittest.TestCase): lm = LoadMetrics() self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD }, 1) lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {}) @@ -658,11 +673,15 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() # 1 head node. self.waitForNodes(1) - autoscaler.request_resources([{"CPU": 1}]) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}]) autoscaler.update() # still 1 head node because request_resources fits in the headnode. 
self.waitForNodes(1) - autoscaler.request_resources([{"CPU": 1}] + [{"CPU": 2}] * 9) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 1 + }] + [{ + "CPU": 2 + }] * 9) autoscaler.update() self.waitForNodes(2) # Adds a single worker to get its resources. autoscaler.update() @@ -767,7 +786,8 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() self.provider.create_node({}, { TAG_RAY_NODE_KIND: "head", - TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE }, 1) head_ip = self.provider.non_terminated_node_ips( tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0] @@ -817,7 +837,11 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() - self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: "head", + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE + }, 1) head_ip = self.provider.non_terminated_node_ips( tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0] @@ -975,6 +999,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)]) self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD }, 1) lm = LoadMetrics() @@ -1096,6 +1121,14 @@ class AutoscalingTest(unittest.TestCase): assert len(self.provider.non_terminated_nodes({})) < 2 def testConfiguresOutdatedNodes(self): + from ray.autoscaler._private.cli_logger import cli_logger + + def do_nothing(*args, **kwargs): + pass + + cli_logger._print = type(cli_logger._print)(do_nothing, + type(cli_logger)) + config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() @@ -1133,6 +1166,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", 
["[]" for i in range(6)]) self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD }, 1) lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {}) diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 2093f1e14..a4bfe7393 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -1,4 +1,5 @@ import pytest +from datetime import datetime import time import yaml import tempfile @@ -8,13 +9,16 @@ import copy import ray from ray.autoscaler._private.util import \ - rewrite_legacy_yaml_to_available_node_types + rewrite_legacy_yaml_to_available_node_types, format_info_string, \ + format_info_string_no_node_types from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \ MockProcessRunner from ray.autoscaler._private.providers import (_NODE_PROVIDERS, _clear_provider_cache) -from ray.autoscaler._private.autoscaler import StandardAutoscaler -from ray.autoscaler._private.load_metrics import LoadMetrics +from ray.autoscaler._private.autoscaler import StandardAutoscaler, \ + AutoscalerSummary +from ray.autoscaler._private.load_metrics import LoadMetrics, \ + LoadMetricsSummary from ray.autoscaler._private.commands import get_or_create_head_node from ray.autoscaler._private.resource_demand_scheduler import \ _utilization_score, _add_min_workers_nodes, \ @@ -24,6 +28,7 @@ from ray.core.generated.common_pb2 import Bundle, PlacementStrategy from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \ NODE_KIND_WORKER, TAG_RAY_NODE_STATUS, \ STATUS_UP_TO_DATE, STATUS_UNINITIALIZED, \ + STATUS_UPDATE_FAILED, \ NODE_KIND_HEAD, NODE_TYPE_LEGACY_WORKER, \ NODE_TYPE_LEGACY_HEAD from ray.test_utils import same_elements @@ -368,6 +373,7 @@ def test_get_nodes_to_launch_with_min_workers(): provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: 
"p2.8xlarge", + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_NODE_KIND: NODE_KIND_HEAD }, 1) @@ -390,9 +396,13 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): provider, new_types, 10, head_node_type="p2.8xlarge") provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge" + }, 1) + provider.create_node({}, { + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "p2.8xlarge" }, 1) - provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1) nodes = provider.non_terminated_nodes({}) @@ -424,7 +434,10 @@ def test_get_nodes_to_launch_limits(): scheduler = ResourceDemandScheduler( provider, TYPES_A, 3, head_node_type="p2.8xlarge") - provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) + provider.create_node({}, { + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge" + }, 2) nodes = provider.non_terminated_nodes({}) @@ -442,7 +455,10 @@ def test_calculate_node_resources(): scheduler = ResourceDemandScheduler( provider, TYPES_A, 10, head_node_type="p2.8xlarge") - provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) + provider.create_node({}, { + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge" + }, 2) nodes = provider.non_terminated_nodes({}) @@ -1059,6 +1075,86 @@ class LoadMetricsTest(unittest.TestCase): pending_placement_groups=pending_placement_groups) assert lm.get_pending_placement_groups() == pending_placement_groups + def testSummary(self): + lm = LoadMetrics(local_ip="1.1.1.1") + assert lm.summary() is not None + pending_placement_groups = [ + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.PACK, + bundles=([Bundle(unit_resources={"GPU": 2})] * 2)), + PlacementGroupTableData( + state=PlacementGroupTableData.RESCHEDULING, + strategy=PlacementStrategy.PACK, + bundles=([Bundle(unit_resources={"GPU": 2})] 
* 2)), + ] + lm.update("1.1.1.1", {"CPU": 64}, {"CPU": 2}, {}) + lm.update("1.1.1.2", { + "CPU": 64, + "GPU": 8, + "accelerator_type:V100": 1 + }, { + "CPU": 0, + "GPU": 1, + "accelerator_type:V100": 1 + }, {}) + lm.update("1.1.1.3", { + "CPU": 64, + "GPU": 8, + "accelerator_type:V100": 1 + }, { + "CPU": 0, + "GPU": 0, + "accelerator_type:V100": 0.92 + }, {}) + lm.update( + "1.1.1.4", {"CPU": 2}, {"CPU": 2}, {}, + waiting_bundles=[{ + "GPU": 2 + }] * 10, + infeasible_bundles=[{ + "CPU": 16 + }, { + "GPU": 2 + }, { + "CPU": 16, + "GPU": 2 + }], + pending_placement_groups=pending_placement_groups) + + lm.set_resource_requests([{"CPU": 64}, {"GPU": 8}, {"GPU": 8}]) + + summary = lm.summary() + + assert summary.head_ip == "1.1.1.1" + + assert summary.usage["CPU"] == (190, 194) + assert summary.usage["GPU"] == (15, 16) + assert summary.usage["accelerator_type:V100"][1] == 2, \ + "Not comparing the usage value due to floating point error." + + assert ({"GPU": 2}, 11) in summary.resource_demand + assert ({"CPU": 16}, 1) in summary.resource_demand + assert ({"CPU": 16, "GPU": 2}, 1) in summary.resource_demand + assert len(summary.resource_demand) == 3 + + assert ({ + "bundles": [({ + "GPU": 2 + }, 2)], + "strategy": "PACK" + }, 2) in summary.pg_demand + assert len(summary.pg_demand) == 1 + + assert ({"GPU": 8}, 2) in summary.request_demand + assert ({"CPU": 64}, 1) in summary.request_demand + assert len(summary.request_demand) == 2 + + # TODO (Alex): This set of nodes won't be very useful in practice + # because the node:xxx.xxx.xxx.xxx resources means that no 2 nodes + # should ever have the same set of resources. 
+ assert len(summary.node_types) == 3 + class AutoscalingTest(unittest.TestCase): def setUp(self): @@ -1157,6 +1253,87 @@ class AutoscalingTest(unittest.TestCase): self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE), "empty_node") + def testSummary(self): + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["available_node_types"]["m4.large"]["min_workers"] = 2 + config["max_workers"] = 10 + config["docker"] = {} + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node", + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE + }, 1) + head_ip = self.provider.non_terminated_node_ips({})[0] + lm = LoadMetrics(local_ip=head_ip) + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + max_launch_batch=1, + max_concurrent_launches=10, + process_runner=runner, + update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 1 + autoscaler.update() + self.waitForNodes(3) + + for ip in self.provider.non_terminated_node_ips({}): + lm.update(ip, {"CPU": 2}, {"CPU": 0}, {}) + + lm.update(head_ip, {"CPU": 16}, {"CPU": 1}, {}) + autoscaler.update() + + while True: + if len( + self.provider.non_terminated_nodes({ + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE + })) == 3: + break + + # After this section, the p2.xlarge is now in the setup process. 
+ runner.ready_to_run.clear() + + lm.update( + head_ip, {"CPU": 16}, {"CPU": 1}, {}, waiting_bundles=[{ + "GPU": 1 + }]) + + autoscaler.update() + self.waitForNodes(4) + + self.provider.ready_to_create.clear() + lm.set_resource_requests([{"CPU": 64}] * 2) + autoscaler.update() + + self.provider.create_node( + {}, { + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_USER_NODE_TYPE: "m4.4xlarge", + TAG_RAY_NODE_STATUS: STATUS_UPDATE_FAILED + }, + 1, + _skip_wait=True) + self.waitForNodes(5) + + print(f"Head ip: {head_ip}") + summary = autoscaler.summary() + + assert summary.active_nodes["m4.large"] == 2 + assert summary.active_nodes["empty_node"] == 1 + assert len(summary.active_nodes) == 2, summary.active_nodes + + assert summary.pending_nodes == [("172.0.0.3", "p2.xlarge")] + assert summary.pending_launches == {"m4.16xlarge": 2} + + assert summary.failed_nodes == [("172.0.0.4", "m4.4xlarge")] + + # Make sure we return something (and don't throw exceptions). Let's not + # get bogged down with a full cli test here. 
+ assert len(autoscaler.info_string()) > 1 + def testScaleUpMinSanity(self): config = copy.deepcopy(MULTI_WORKER_CLUSTER) config["available_node_types"]["m4.large"]["min_workers"] = \ @@ -1166,6 +1343,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) autoscaler = StandardAutoscaler( @@ -1191,6 +1369,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) autoscaler = StandardAutoscaler( @@ -1217,6 +1396,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() self.provider.create_node({}, { TAG_RAY_NODE_KIND: "head", + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "m4.4xlarge" }, 1) head_ip = self.provider.non_terminated_node_ips({})[0] @@ -1227,7 +1407,7 @@ class AutoscalingTest(unittest.TestCase): max_failures=0, process_runner=runner, update_interval_s=0) - + head_ip = self.provider.non_terminated_node_ips({})[0] assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) @@ -1285,6 +1465,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) lm = LoadMetrics("172.0.0.0") @@ -1304,11 +1485,15 @@ class AutoscalingTest(unittest.TestCase): } == {"p2.8xlarge", "m4.large"} self.provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", - TAG_RAY_NODE_KIND: NODE_KIND_WORKER + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE }, 2) self.provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "m4.16xlarge", - 
TAG_RAY_NODE_KIND: NODE_KIND_WORKER + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE }, 2) assert len(self.provider.non_terminated_nodes({})) == 7 # Make sure that after idle_timeout_minutes we don't kill idle @@ -1339,7 +1524,9 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() self.provider.create_node({}, { TAG_RAY_NODE_KIND: "head", - TAG_RAY_USER_NODE_TYPE: "p2.xlarge" + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_USER_NODE_TYPE: "p2.xlarge", + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE }, 1) head_ip = self.provider.non_terminated_node_ips({})[0] self.provider.finish_starting_nodes() @@ -1377,7 +1564,9 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() self.provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", - TAG_RAY_NODE_KIND: "head" + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_NODE_KIND: "head", + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE }, 1) runner = MockProcessRunner() autoscaler = StandardAutoscaler( @@ -1391,16 +1580,16 @@ class AutoscalingTest(unittest.TestCase): # These requests fit on the head node. autoscaler.update() self.waitForNodes(1) - autoscaler.request_resources([{"CPU": 1}]) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}]) autoscaler.update() self.waitForNodes(1) assert len(self.provider.mock_nodes) == 1 - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.load_metrics.set_resource_requests([{"GPU": 8}]) autoscaler.update() self.waitForNodes(1) # This request requires an additional worker node. 
- autoscaler.request_resources([{"GPU": 8}] * 2) + autoscaler.load_metrics.set_resource_requests([{"GPU": 8}] * 2) autoscaler.update() self.waitForNodes(2) assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" @@ -1415,6 +1604,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", ["[]" for i in range(6)]) self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) autoscaler = StandardAutoscaler( @@ -1426,15 +1616,15 @@ class AutoscalingTest(unittest.TestCase): assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) - autoscaler.request_resources([{"CPU": 1}]) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}]) autoscaler.update() self.waitForNodes(2) assert self.provider.mock_nodes[1].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.load_metrics.set_resource_requests([{"GPU": 8}]) autoscaler.update() self.waitForNodes(3) assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" - autoscaler.request_resources([{"CPU": 32}] * 4) + autoscaler.load_metrics.set_resource_requests([{"CPU": 32}] * 4) autoscaler.update() self.waitForNodes(5) @@ -1450,6 +1640,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) self.provider.create_node({}, { + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_NODE_KIND: NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) @@ -1462,11 +1653,11 @@ class AutoscalingTest(unittest.TestCase): assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) - autoscaler.request_resources([{"CPU": 1}]) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}]) autoscaler.update() self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) 
assert self.provider.mock_nodes[1].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.load_metrics.set_resource_requests([{"GPU": 8}]) autoscaler.update() self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" @@ -1495,6 +1686,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) lm = LoadMetrics("172.0.0.0") @@ -1541,6 +1733,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)]) self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) lm = LoadMetrics("172.0.0.0") @@ -1554,15 +1747,15 @@ class AutoscalingTest(unittest.TestCase): assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) - autoscaler.request_resources([{"CPU": 1}]) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}]) autoscaler.update() self.waitForNodes(2) assert self.provider.mock_nodes[1].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.load_metrics.set_resource_requests([{"GPU": 8}]) autoscaler.update() self.waitForNodes(3) assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" - autoscaler.request_resources([{"GPU": 1}] * 9) + autoscaler.load_metrics.set_resource_requests([{"GPU": 1}] * 9) autoscaler.update() self.waitForNodes(4) assert self.provider.mock_nodes[3].node_type == "p2.xlarge" @@ -1601,6 +1794,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", ["[]" for i in range(5)]) self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) autoscaler 
= StandardAutoscaler( @@ -1612,21 +1806,21 @@ class AutoscalingTest(unittest.TestCase): assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) - autoscaler.request_resources([{"CPU": 1}]) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}]) autoscaler.update() self.waitForNodes(2) assert self.provider.mock_nodes[1].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.load_metrics.set_resource_requests([{"GPU": 8}]) autoscaler.update() self.waitForNodes(3) assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" - autoscaler.request_resources([{"GPU": 1}] * 9) + autoscaler.load_metrics.set_resource_requests([{"GPU": 1}] * 9) autoscaler.update() self.waitForNodes(4) assert self.provider.mock_nodes[3].node_type == "p2.xlarge" autoscaler.update() # Fill up m4, p2.8, p2 and request 2 more CPUs - autoscaler.request_resources([{ + autoscaler.load_metrics.set_resource_requests([{ "CPU": 2 }, { "CPU": 16 @@ -1674,6 +1868,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) autoscaler = StandardAutoscaler( @@ -1702,6 +1897,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) autoscaler = StandardAutoscaler( @@ -1713,11 +1909,11 @@ class AutoscalingTest(unittest.TestCase): assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) - autoscaler.request_resources([{"CPU": 1}]) + autoscaler.load_metrics.set_resource_requests([{"CPU": 1}]) autoscaler.update() self.waitForNodes(2) assert self.provider.mock_nodes[1].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + 
autoscaler.load_metrics.set_resource_requests([{"GPU": 8}]) autoscaler.update() self.waitForNodes(3) assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" @@ -1750,6 +1946,7 @@ class AutoscalingTest(unittest.TestCase): runner = MockProcessRunner() self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) lm = LoadMetrics("172.0.0.0") @@ -1762,7 +1959,10 @@ class AutoscalingTest(unittest.TestCase): update_interval_s=0) autoscaler.update() self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) - autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 0.2, + "WORKER": 1.0 + }]) autoscaler.update() self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) non_terminated_nodes = autoscaler.provider.non_terminated_nodes({}) @@ -1783,10 +1983,16 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() # this fits on request_resources()! 
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) - autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 0.2, + "WORKER": 1.0 + }] * 2) autoscaler.update() self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) - autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 0.2, + "WORKER": 1.0 + }]) lm.update( node_ip, config["available_node_types"]["def_worker"]["resources"], {}, {}, @@ -1859,6 +2065,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)]) self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) lm = LoadMetrics("172.0.0.0") @@ -1868,7 +2075,10 @@ class AutoscalingTest(unittest.TestCase): max_failures=0, process_runner=runner, update_interval_s=0) - autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 0.2, + "WORKER": 1.0 + }]) autoscaler.update() # 1 min worker for both min_worker and request_resources() self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) @@ -1887,16 +2097,25 @@ class AutoscalingTest(unittest.TestCase): "CPU": 0.2, "WORKER": 1.0 }]) - autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 0.2, + "WORKER": 1.0 + }] * 2) autoscaler.update() # 2 requested_resource, 1 min worker, 1 free node -> 2 nodes total self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) - autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 0.2, + "WORKER": 1.0 + }]) autoscaler.update() # Still 2 because the second one is not connected and hence # request_resources occupies the connected node. 
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) - autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 3) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 0.2, + "WORKER": 1.0 + }] * 3) lm.update( node_ip, config["available_node_types"]["def_worker"]["resources"], {}, {}, @@ -1906,7 +2125,7 @@ class AutoscalingTest(unittest.TestCase): }] * 3) autoscaler.update() self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) - autoscaler.request_resources([]) + autoscaler.load_metrics.set_resource_requests([]) lm.update("172.0.0.2", config["available_node_types"]["def_worker"]["resources"], @@ -1919,6 +2138,8 @@ class AutoscalingTest(unittest.TestCase): lm.update(node_ip, config["available_node_types"]["def_worker"]["resources"], {}, {}) + print("============ Should scale down from here =============", + node_id) autoscaler.update() self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # If node {node_id} was terminated any time then it's state will be set @@ -1958,6 +2179,7 @@ class AutoscalingTest(unittest.TestCase): runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) self.provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, TAG_RAY_USER_NODE_TYPE: "empty_node" }, 1) lm = LoadMetrics("172.0.0.0") @@ -1967,7 +2189,10 @@ class AutoscalingTest(unittest.TestCase): max_failures=0, process_runner=runner, update_interval_s=0) - autoscaler.request_resources([{"CPU": 2, "WORKER": 1.0}] * 2) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 2, + "WORKER": 1.0 + }] * 2) autoscaler.update() # 2 min worker for both min_worker and request_resources(), not 3. 
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) @@ -1998,12 +2223,14 @@ class AutoscalingTest(unittest.TestCase): "max_workers": 3, } }) + config["idle_timeout_minutes"] = 0 config_path = self.write_config(config) self.provider = MockProvider() self.provider.create_node({}, { TAG_RAY_NODE_KIND: "head", - TAG_RAY_USER_NODE_TYPE: "empty_node" + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, + TAG_RAY_USER_NODE_TYPE: "empty_node", }, 1) runner = MockProcessRunner() @@ -2023,7 +2250,10 @@ class AutoscalingTest(unittest.TestCase): waiting_bundles=[{ "CPU": 2 }]) - autoscaler.request_resources([{"CPU": 2, "GPU": 1}] * 2) + autoscaler.load_metrics.set_resource_requests([{ + "CPU": 2, + "GPU": 1 + }] * 2) autoscaler.update() # 1 head, 1 worker. self.waitForNodes(2) @@ -2041,6 +2271,140 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(2) +def format_pg(pg): + strategy = pg["strategy"] + bundles = pg["bundles"] + shape_strs = [] + for bundle, count in bundles: + shape_strs.append(f"{bundle} * {count}") + bundles_str = ", ".join(shape_strs) + return f"{bundles_str} ({strategy})" + + +def test_info_string(): + lm_summary = LoadMetricsSummary( + head_ip="0.0.0.0", + usage={ + "CPU": (530, 544), + "GPU": (2, 2), + "AcceleratorType:V100": (0, 2), + "memory": (0, 1583.19), + "object_store_memory": (0, 471.02) + }, + resource_demand=[({ + "CPU": 1 + }, 150)], + pg_demand=[({ + "bundles": [({ + "CPU": 4 + }, 5)], + "strategy": "PACK" + }, 420)], + request_demand=[({ + "CPU": 16 + }, 100)], + node_types=[]) + autoscaler_summary = AutoscalerSummary( + active_nodes={ + "p3.2xlarge": 2, + "m4.4xlarge": 20 + }, + pending_nodes=[("1.2.3.4", "m4.4xlarge"), ("1.2.3.5", "m4.4xlarge")], + pending_launches={"m4.4xlarge": 2}, + failed_nodes=[("1.2.3.6", "p3.2xlarge")]) + + expected = """ +======== Autoscaler status: 2020-12-28 01:02:03 ======== +Node status +-------------------------------------------------------- +Healthy: + 2 p3.2xlarge + 20 m4.4xlarge +Pending: 
+ m4.4xlarge, 2 launching + 1.2.3.4: m4.4xlarge, setting up + 1.2.3.5: m4.4xlarge, setting up +Recent failures: + (no failures) + +Resources +-------------------------------------------------------- + +Usage: + 530/544 CPU + 2/2 GPU + 0/2 AcceleratorType:V100 + 0.00/77.304 GiB memory + 0.00/22.999 GiB object_store_memory + +Demands: + {'CPU': 1}: 150+ pending tasks/actors + {'CPU': 4} * 5 (PACK): 420+ pending placement groups + {'CPU': 16}: 100+ from request_resources() +""".strip() + + actual = format_info_string( + lm_summary, + autoscaler_summary, + time=datetime(year=2020, month=12, day=28, hour=1, minute=2, second=3)) + print(actual) + assert expected == actual + + +def test_info_string_no_node_type(): + lm_summary = LoadMetricsSummary( + head_ip="0.0.0.0", + usage={ + "CPU": (530, 544), + "GPU": (2, 2), + "AcceleratorType:V100": (0, 2), + "memory": (0, 1583.19), + "object_store_memory": (0, 471.02) + }, + resource_demand=[({ + "CPU": 1 + }, 150)], + pg_demand=[({ + "bundles": [({ + "CPU": 4 + }, 5)], + "strategy": "PACK" + }, 420)], + request_demand=[({ + "CPU": 16 + }, 100)], + node_types=[({ + "CPU": 16 + }, 1)]) + + expected = """ +======== Cluster status: 2020-12-28 01:02:03 ======== +Node status +----------------------------------------------------- + 1 node(s) with resources: {'CPU': 16} + +Resources +----------------------------------------------------- +Usage: + 530/544 CPU + 2/2 GPU + 0/2 AcceleratorType:V100 + 0.00/77.304 GiB memory + 0.00/22.999 GiB object_store_memory + +Demands: + {'CPU': 1}: 150+ pending tasks/actors + {'CPU': 4} * 5 (PACK): 420+ pending placement groups + {'CPU': 16}: 100+ from request_resources() +""".strip() + + actual = format_info_string_no_node_types( + lm_summary, + time=datetime(year=2020, month=12, day=28, hour=1, minute=2, second=3)) + print(actual) + assert expected == actual + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__]))