diff --git a/doc/source/cluster/autoscaling.rst b/doc/source/cluster/autoscaling.rst index af091c815..996413e8f 100644 --- a/doc/source/cluster/autoscaling.rst +++ b/doc/source/cluster/autoscaling.rst @@ -8,12 +8,14 @@ Cluster Autoscaling Basics ------ -The Ray Cluster Launcher will automatically enable a load-based autoscaler. When cluster resource usage exceeds a configurable threshold (80% by default), new nodes will be launched up to the specified ``max_workers`` limit (specified in the cluster config). When nodes are idle for more than a timeout, they will be removed, down to the ``min_workers`` limit. The head node is never removed. +The Ray Cluster Launcher will automatically enable a load-based autoscaler. The scheduler will look at the task, actor, and placement group resource demands from the cluster, and tries to add the minimum set of nodes that can fulfill these demands. When nodes are idle for more than a timeout, they will be removed, down to the ``min_workers`` limit. The head node is never removed. + +To avoid launching too many nodes at once, the number of nodes allowed to be pending is limited by the ``upscaling_speed`` setting. By default it is set to ``1.0``, which means the cluster can grow in size by at most ``100%`` at a time (doubling in size each time). This fraction can be set to as high as needed, e.g., ``99999`` to allow the cluster to quickly grow to its max size. In more detail, the autoscaler implements the following control loop: - 1. It calculates the estimated utilization of the cluster based on the most-currently-assigned resource. For example, suppose a cluster has 100/200 CPUs assigned, but 20/25 GPUs assigned, then the utilization will be considered to be max(100/200, 15/25) = 60%. - 2. If the estimated utilization is greater than the target (80% by default), then the autoscaler will attempt to add nodes to the cluster. + 1. It calculates the number of nodes required to satisfy all currently pending tasks, actor, and placement group requests. + 2. If the number of nodes required total divided by the number of current nodes exceeds ``1 + upscaling_speed``, then the number of nodes launched will be limited by that threshold. 3. If a node is idle for a timeout (5 minutes by default), it is removed from the cluster. The basic autoscaling config settings are as follows: @@ -27,12 +29,11 @@ The basic autoscaling config settings are as follows: # node. This number should be >= 0. min_workers: 0 - # The autoscaler will scale up the cluster to this target fraction of resource - # usage. For example, if a cluster of 10 nodes is 100% busy and - # target_utilization is 0.8, it would resize the cluster to 13. This fraction - # can be decreased to increase the aggressiveness of upscaling. - # The max value allowed is 1.0, which is the most conservative setting. - target_utilization_fraction: 0.8 + # The autoscaler will scale up the cluster faster with higher upscaling speed. + # E.g., if the task requires adding more nodes then autoscaler will gradually + # scale up the cluster in chunks of upscaling_speed*currently_running_nodes. + # This number should be > 0. + upscaling_speed: 1.0 # If a node is idle for this many minutes, it will be removed. A node is # considered idle if there are no tasks or actors running on it. @@ -41,7 +42,7 @@ The basic autoscaling config settings are as follows: Programmatically Scaling a Cluster ---------------------------------- -You can from within a Ray program command the autoscaler to scale the cluster up to a desired size with ``request_resources()`` call. The cluster will immediately attempt to scale to accomodate the requested resources, bypassing normal upscaling delay. +You can from within a Ray program command the autoscaler to scale the cluster up to a desired size with ``request_resources()`` call. The cluster will immediately attempt to scale to accomodate the requested resources, bypassing normal upscaling speed constraints. .. autofunction:: ray.autoscaler.sdk.request_resources @@ -62,12 +63,10 @@ The autoscaler will not attempt to start, stop, or update unmanaged nodes. The u Multiple Node Type Autoscaling ------------------------------ -Ray supports multiple node types in a single cluster. In this mode of operation, the scheduler will look at the queue of resource shape demands from the cluster (e.g., there might be 10 tasks queued each requesting ``{"GPU": 4, "CPU": 16}``), and tries to add the minimum set of nodes that can fulfill these resource demands. This enables precise, rapid scale up compared to looking only at resource utilization, as the autoscaler also has visibility into the queue of resource demands. +Ray supports multiple node types in a single cluster. In this mode of operation, the scheduler will choose the types of nodes to add based on the resource demands, instead of always adding the same kind of node type. The concept of a cluster node type encompasses both the physical instance type (e.g., AWS p3.8xl GPU nodes vs m4.16xl CPU nodes), as well as other attributes (e.g., IAM role, the machine image, etc). `Custom resources `__ can be specified for each node type so that Ray is aware of the demand for specific node types at the application level (e.g., a task may request to be placed on a machine with a specific role or machine image via custom resource). -Multi-node type autoscaling operates in conjunction with the basic autoscaler. You may want to configure the basic autoscaler accordingly to act conservatively (i.e., set ``target_utilization_fraction: 1.0``). - An example of configuring multiple node types is as follows `(full example) `__: .. code-block:: yaml diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 3b9373383..5f12bb293 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -1,9 +1,8 @@ from collections import defaultdict, namedtuple -from typing import Any, Optional, Dict +from typing import Any, Optional, Dict, List import copy import logging import math -import numpy as np import os import subprocess import threading @@ -56,7 +55,7 @@ class StandardAutoscaler: cluster size. StandardAutoscaler is also used to bootstrap clusters (by adding workers - until the target cluster size is met). + until the cluster size that can handle the resource demand is met). """ def __init__(self, @@ -71,6 +70,7 @@ class StandardAutoscaler: # Keep this before self.reset (self.provider needs to be created # exactly once). self.provider = None + self.resource_demand_scheduler = None self.reset(errors_fatal=True) self.load_metrics = load_metrics @@ -86,7 +86,6 @@ class StandardAutoscaler: self.num_failures = 0 self.last_update_time = 0.0 self.update_interval_s = update_interval_s - self.bringup = True # Node launchers self.launch_queue = queue.Queue() @@ -115,8 +114,6 @@ class StandardAutoscaler: for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) - # Aggregate resources the user is requesting of the cluster. - self.resource_requests = defaultdict(int) # List of resource bundles the user is requesting of the cluster. self.resource_demand_vector = [] @@ -148,21 +145,12 @@ class StandardAutoscaler: self.last_update_time = now nodes = self.workers() - # Check pending nodes immediately after fetching the number of running - # nodes to minimize chance number of pending nodes changing after - # additional nodes (managed and unmanaged) are launched. - num_pending = self.pending_launches.value + self.load_metrics.prune_active_ips([ self.provider.internal_ip(node_id) for node_id in self.all_workers() ]) - target_workers = self.target_num_workers() - - if len(nodes) >= target_workers: - if "CPU" in self.resource_requests: - del self.resource_requests["CPU"] - - self.log_info_string(nodes, target_workers) + self.log_info_string(nodes) # Terminate any idle or out of date nodes last_used = self.load_metrics.last_used_time_by_ip @@ -170,16 +158,19 @@ class StandardAutoscaler: nodes_to_terminate = [] node_type_counts = collections.defaultdict(int) - for node_id in nodes: + # Sort based on last used to make sure to keep min_workers that + # were most recently used. Otherwise, _keep_min_workers_of_node_type + # might keep a node that should be terminated. + for node_id in self._sort_based_on_last_used(nodes, last_used): # Make sure to not kill idle node types if the number of workers # of that type is lower/equal to the min_workers of that type. - if self._keep_min_worker_of_node_type(node_id, node_type_counts): + if self._keep_min_worker_of_node_type( + node_id, + node_type_counts) and self.launch_config_ok(node_id): continue node_ip = self.provider.internal_ip(node_id) - if (node_ip in last_used and last_used[node_ip] < horizon) and \ - (len(nodes) - len(nodes_to_terminate) - > target_workers): + if node_ip in last_used and last_used[node_ip] < horizon: logger.info("StandardAutoscaler: " "{}: Terminating idle node".format(node_id)) nodes_to_terminate.append(node_id) @@ -191,7 +182,7 @@ class StandardAutoscaler: if nodes_to_terminate: self.provider.terminate_nodes(nodes_to_terminate) nodes = self.workers() - self.log_info_string(nodes, target_workers) + self.log_info_string(nodes) # Terminate nodes if there are too many nodes_to_terminate = [] @@ -205,37 +196,21 @@ class StandardAutoscaler: if nodes_to_terminate: self.provider.terminate_nodes(nodes_to_terminate) nodes = self.workers() - self.log_info_string(nodes, target_workers) - # First let the resource demand scheduler launch nodes, if enabled. - if self.resource_demand_scheduler: - to_launch = self.resource_demand_scheduler.get_nodes_to_launch( - self.provider.non_terminated_nodes(tag_filters={}), - self.pending_launches.breakdown(), - self.load_metrics.get_resource_demand_vector(), - self.load_metrics.get_resource_utilization(), - self.load_metrics.get_pending_placement_groups(), - self.load_metrics.get_static_node_resources_by_ip(), - ensure_min_cluster_size=self.resource_demand_vector) - for node_type, count in to_launch.items(): - self.launch_new_node(count, node_type=node_type) + self.log_info_string(nodes) - num_pending = self.pending_launches.value - nodes = self.workers() + to_launch = self.resource_demand_scheduler.get_nodes_to_launch( + self.provider.non_terminated_nodes(tag_filters={}), + self.pending_launches.breakdown(), + self.load_metrics.get_resource_demand_vector(), + self.load_metrics.get_resource_utilization(), + self.load_metrics.get_pending_placement_groups(), + self.load_metrics.get_static_node_resources_by_ip(), + ensure_min_cluster_size=self.resource_demand_vector) + for node_type, count in to_launch.items(): + self.launch_new_node(count, node_type=node_type) - # Launch additional nodes of the default type, if still needed. - num_workers = len(nodes) + num_pending - max_allowed = min(self.max_launch_batch, - self.max_concurrent_launches - num_pending) - if num_workers < target_workers and max_allowed > 0: - num_launches = min(max_allowed, target_workers - num_workers) - self.launch_new_node(num_launches, - self.config.get("worker_default_node_type")) - nodes = self.workers() - self.log_info_string(nodes, target_workers) - elif self.load_metrics.num_workers_connected() >= target_workers: - self.bringup = False - self.log_info_string(nodes, target_workers) + nodes = self.workers() # Process any completed updates completed = [] @@ -253,7 +228,7 @@ class StandardAutoscaler: # immediately trying to restart Ray on the new node. self.load_metrics.mark_active(self.provider.internal_ip(node_id)) nodes = self.workers() - self.log_info_string(nodes, target_workers) + self.log_info_string(nodes) # Update nodes with out-of-date files. # TODO(edoakes): Spawning these threads directly seems to cause @@ -279,6 +254,25 @@ class StandardAutoscaler: for node_id in nodes: self.recover_if_needed(node_id, now) + def _sort_based_on_last_used(self, nodes: List[NodeID], + last_used: Dict[str, float]) -> List[NodeID]: + """Sort the nodes based on the last time they were used. + + The first item in the return list is the least recently used. + """ + updated_last_used = copy.deepcopy(last_used) + now = time.time() + for node_id in nodes: + node_ip = self.provider.internal_ip(node_id) + if node_ip not in updated_last_used: + updated_last_used[node_ip] = now + + def last_time_used(node_id: NodeID): + node_ip = self.provider.internal_ip(node_id) + return updated_last_used[node_ip] + + return sorted(nodes, key=last_time_used, reverse=True) + def _keep_min_worker_of_node_type(self, node_id: NodeID, node_type_counts: Dict[NodeType, int]): """Returns if workers of node_type should be terminated. @@ -293,15 +287,16 @@ class StandardAutoscaler: Returns: bool: if workers of node_types should be terminated or not. """ - if self.resource_demand_scheduler: - tags = self.provider.node_tags(node_id) - if TAG_RAY_USER_NODE_TYPE in tags: - node_type = tags[TAG_RAY_USER_NODE_TYPE] - node_type_counts[node_type] += 1 - min_workers = self.available_node_types[node_type].get( - "min_workers", 0) - if node_type_counts[node_type] <= min_workers: - return True + tags = self.provider.node_tags(node_id) + if TAG_RAY_USER_NODE_TYPE in tags: + node_type = tags[TAG_RAY_USER_NODE_TYPE] + node_type_counts[node_type] += 1 + min_workers = self.available_node_types[node_type].get( + "min_workers", 0) + max_workers = self.available_node_types[node_type].get( + "max_workers", 0) + if node_type_counts[node_type] <= min(min_workers, max_workers): + return True return False @@ -349,15 +344,41 @@ class StandardAutoscaler: if not self.provider: self.provider = _get_node_provider(self.config["provider"], self.config["cluster_name"]) - # Check whether we can enable the resource demand scheduler. - if "available_node_types" in self.config: - self.available_node_types = self.config["available_node_types"] + + self.available_node_types = self.config["available_node_types"] + upscaling_speed = self.config.get("upscaling_speed") + aggressive = self.config.get("autoscaling_mode") == "aggressive" + target_utilization_fraction = self.config.get( + "target_utilization_fraction") + if upscaling_speed: + upscaling_speed = float(upscaling_speed) + # TODO(ameer): consider adding (if users ask) an option of + # initial_upscaling_num_workers. + elif aggressive: + upscaling_speed = 99999 + logger.warning( + "Legacy aggressive autoscaling mode " + "detected. Replacing it by setting upscaling_speed to " + "99999.") + elif target_utilization_fraction: + upscaling_speed = 1 / max(target_utilization_fraction, 0.001) + logger.warning( + "Legacy target_utilization_fraction config " + "detected. Replacing it by setting upscaling_speed to " + + "1 / target_utilization_fraction.") + else: + upscaling_speed = 1.0 + if self.resource_demand_scheduler: + # The node types are autofilled internally for legacy yamls, + # overwriting the class will remove the inferred node resources + # for legacy yamls. + self.resource_demand_scheduler.reset_config( + self.provider, self.available_node_types, + self.config["max_workers"], upscaling_speed) + else: self.resource_demand_scheduler = ResourceDemandScheduler( self.provider, self.available_node_types, - self.config["max_workers"]) - else: - self.available_node_types = None - self.resource_demand_scheduler = None + self.config["max_workers"], upscaling_speed) except Exception as e: if errors_fatal: @@ -366,37 +387,6 @@ class StandardAutoscaler: logger.exception("StandardAutoscaler: " "Error parsing config.") - def target_num_workers(self): - target_frac = self.config["target_utilization_fraction"] - cur_used = self.load_metrics.approx_workers_used() - ideal_num_nodes = int(np.ceil(cur_used / float(target_frac))) - ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node - - initial_workers = self.config["initial_workers"] - aggressive = self.config["autoscaling_mode"] == "aggressive" - if self.bringup: - ideal_num_workers = max(ideal_num_workers, initial_workers) - elif aggressive and cur_used > 0: - # If we want any workers, we want at least initial_workers - ideal_num_workers = max(ideal_num_workers, initial_workers) - - # Other resources are not supported at present. - if "CPU" in self.resource_requests: - try: - cores_per_worker = self.config["worker_nodes"]["Resources"][ - "CPU"] - except KeyError: - cores_per_worker = 1 # Assume the worst - - cores_desired = self.resource_requests["CPU"] - - ideal_num_workers = max( - ideal_num_workers, - int(np.ceil(cores_desired / cores_per_worker))) - - return min(self.config["max_workers"], - max(self.config["min_workers"], ideal_num_workers)) - def launch_config_ok(self, node_id): node_tags = self.provider.node_tags(node_id) tag_launch_conf = node_tags.get(TAG_RAY_LAUNCH_CONFIG) @@ -577,50 +567,40 @@ class StandardAutoscaler: return self.provider.non_terminated_nodes( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED}) - def log_info_string(self, nodes, target): + def log_info_string(self, nodes): tmp = "Cluster status: " - tmp += self.info_string(nodes, target) + tmp += self.info_string(nodes) tmp += "\n" tmp += self.load_metrics.info_string() tmp += "\n" - if self.resource_demand_scheduler: - tmp += self.resource_demand_scheduler.debug_string( - nodes, self.pending_launches.breakdown(), - self.load_metrics.get_resource_utilization()) + tmp += self.resource_demand_scheduler.debug_string( + nodes, self.pending_launches.breakdown(), + self.load_metrics.get_resource_utilization()) if _internal_kv_initialized(): _internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True) - logger.info(tmp) + logger.debug(tmp) - def info_string(self, nodes, target): + def info_string(self, nodes): suffix = "" - if self.pending_launches: - suffix += " ({} pending)".format(self.pending_launches.value) if self.updaters: suffix += " ({} updating)".format(len(self.updaters)) if self.num_failed_updates: suffix += " ({} failed to update)".format( len(self.num_failed_updates)) - if self.bringup: - suffix += " (bringup=True)" - return "{}/{} target nodes{}".format(len(nodes), target, suffix) + return "{} nodes{}".format(len(nodes), suffix) - def request_resources(self, resources): - """Called by monitor to request resources (EXPERIMENTAL). + def request_resources(self, resources: List[dict]): + """Called by monitor to request resources. Args: - resources: Either a list of resource bundles or a single resource - demand dictionary. + resources: A list of resource bundles. """ if resources: logger.info( "StandardAutoscaler: resource_requests={}".format(resources)) - if isinstance(resources, list): - self.resource_demand_vector = resources - else: - for resource, count in resources.items(): - self.resource_requests[resource] = max( - self.resource_requests[resource], count) + assert isinstance(resources, list), resources + self.resource_demand_vector = resources def kill_workers(self): logger.error("StandardAutoscaler: kill_workers triggered") diff --git a/python/ray/autoscaler/_private/aws/node_provider.py b/python/ray/autoscaler/_private/aws/node_provider.py index c49c9046e..25f971a2b 100644 --- a/python/ray/autoscaler/_private/aws/node_provider.py +++ b/python/ray/autoscaler/_private/aws/node_provider.py @@ -302,12 +302,6 @@ class AWSNodeProvider(NodeProvider): tags = to_aws_format(tags) conf = node_config.copy() - # Delete unsupported keys from the node config - try: - del conf["Resources"] - except KeyError: - pass - tag_pairs = [{ "Key": TAG_RAY_CLUSTER_NAME, "Value": self.cluster_name, @@ -515,8 +509,8 @@ class AWSNodeProvider(NodeProvider): available_node_types[node_type].get("resources", {}): available_node_types[node_type][ "resources"] = autodetected_resources - cli_logger.print("Updating the resources of {} to {}.", - node_type, autodetected_resources) + logger.debug("Updating the resources of {} to {}.".format( + node_type, autodetected_resources)) else: raise ValueError("Instance type " + instance_type + " is not available in AWS region: " + diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index 777ee1073..d5f3b73e9 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -99,12 +99,6 @@ class LoadMetrics: prune(self.resource_load_by_ip) prune(self.last_heartbeat_time_by_ip) - def approx_workers_used(self): - return self._info()["NumNodesUsed"] - - def num_workers_connected(self): - return self._info()["NumNodesConnected"] - def get_node_resources(self): """Return a list of node resources (static resource sizes). @@ -128,9 +122,7 @@ class LoadMetrics: def _get_resource_usage(self): num_nodes = 0 - nodes_used = 0.0 num_nonidle = 0 - has_saturated_node = False resources_used = {} resources_total = {} for ip, max_resources in self.static_resources_by_ip.items(): @@ -143,7 +135,6 @@ class LoadMetrics: max_frac = 0.0 for resource_id, amount in resource_load.items(): if amount > 0: - has_saturated_node = True max_frac = 1.0 # the resource is saturated for resource_id, amount in max_resources.items(): used = amount - avail_resources[resource_id] @@ -157,17 +148,10 @@ class LoadMetrics: frac = used / float(amount) if frac > max_frac: max_frac = frac - nodes_used += max_frac if max_frac > 0: num_nonidle += 1 - # If any nodes have a queue buildup, assume all non-idle nodes are 100% - # busy, plus the head node. This guards against the case of not scaling - # up due to poor task packing. - if has_saturated_node: - nodes_used = min(num_nonidle + 1.0, num_nodes) - - return nodes_used, resources_used, resources_total + return resources_used, resources_total def get_resource_demand_vector(self): return self.waiting_bundles + self.infeasible_bundles @@ -180,8 +164,7 @@ class LoadMetrics: ["{}: {}".format(k, v) for k, v in sorted(self._info().items())]) def _info(self): - nodes_used, resources_used, resources_total = self._get_resource_usage( - ) + resources_used, resources_total = self._get_resource_usage() now = time.time() idle_times = [now - t for t in self.last_used_time_by_ip.values()] @@ -211,8 +194,6 @@ class LoadMetrics: for rid in sorted(resources_used) if not rid.startswith("node:") ]), - "NumNodesConnected": len(self.static_resources_by_ip), - "NumNodesUsed": round(nodes_used, 2), "NodeIdleSeconds": "Min={} Mean={} Max={}".format( int(np.min(idle_times)) if idle_times else -1, int(np.mean(idle_times)) if idle_times else -1, diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 4a0bcdee5..62a71ef3a 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -17,12 +17,15 @@ from typing import List, Dict from ray.autoscaler.node_provider import NodeProvider from ray.gcs_utils import PlacementGroupTableData from ray.core.generated.common_pb2 import PlacementStrategy -from ray.autoscaler.tags import (TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, - NODE_TYPE_LEGACY_WORKER, NODE_KIND_WORKER, - NODE_TYPE_LEGACY_HEAD, TAG_RAY_NODE_KIND) +from ray.autoscaler.tags import ( + TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, NODE_TYPE_LEGACY_WORKER, + NODE_KIND_WORKER, NODE_TYPE_LEGACY_HEAD, TAG_RAY_NODE_KIND, NODE_KIND_HEAD) logger = logging.getLogger(__name__) +# The minimum number of nodes to launch concurrently. +UPSCALING_INITIAL_NUM_NODES = 5 + # e.g., cpu_4_ondemand. NodeType = str @@ -40,21 +43,69 @@ NodeIP = str class ResourceDemandScheduler: - def __init__(self, provider: NodeProvider, + def __init__(self, + provider: NodeProvider, node_types: Dict[NodeType, NodeTypeConfigDict], - max_workers: int): + max_workers: int, + upscaling_speed: float = 1) -> None: self.provider = provider self.node_types = copy.deepcopy(node_types) self.max_workers = max_workers - # is_legacy_yaml tracks if the cluster configs was originally without - # available_node_types and was autofilled with available_node_types. - self.is_legacy_yaml = (NODE_TYPE_LEGACY_HEAD in node_types - and NODE_TYPE_LEGACY_WORKER in node_types) + self.upscaling_speed = upscaling_speed + + def reset_config(self, + provider: NodeProvider, + node_types: Dict[NodeType, NodeTypeConfigDict], + max_workers: int, + upscaling_speed: float = 1) -> None: + """Updates the class state variables. + + For legacy yamls, it merges previous state and new state to make sure + inferered resources are not lost. + """ + new_node_types = copy.deepcopy(node_types) + final_node_types = new_node_types + if self.is_legacy_yaml(new_node_types): # If new configs are legacy. + if self.is_legacy_yaml(): # If old configs were legacy. + + def _update_based_on_node_config(node_type: NodeType) -> None: + if self.node_types[node_type][ + "node_config"] == new_node_types[node_type][ + "node_config"]: # If node config didnt change. + if self.node_types[node_type]["resources"]: + # If we already know the resources, do not + # overwrite them. This helps also if in legacy + # yamls the user provides "resources" field. + del new_node_types[node_type]["resources"] + self.node_types[node_type].update( + new_node_types[node_type]) + else: + self.node_types[node_type] = new_node_types[node_type] + + _update_based_on_node_config(NODE_TYPE_LEGACY_HEAD) + _update_based_on_node_config(NODE_TYPE_LEGACY_WORKER) + final_node_types = self.node_types + + self.provider = provider + self.node_types = copy.deepcopy(final_node_types) + self.max_workers = max_workers + self.upscaling_speed = upscaling_speed + + def is_legacy_yaml(self, + node_types: Dict[NodeType, NodeTypeConfigDict] = None + ) -> bool: + """Returns if the node types came from a legacy yaml. + + A legacy yaml is one that was originally without available_node_types + and was autofilled with available_node_types.""" + node_types = node_types or self.node_types + return (NODE_TYPE_LEGACY_HEAD in node_types + and NODE_TYPE_LEGACY_WORKER in node_types) def get_nodes_to_launch( self, nodes: List[NodeID], - pending_nodes: Dict[NodeType, int], + launching_nodes: Dict[NodeType, int], resource_demands: List[ResourceDict], unused_resources_by_ip: Dict[NodeIP, ResourceDict], pending_placement_groups: List[PlacementGroupTableData], @@ -75,7 +126,7 @@ class ResourceDemandScheduler: Args: nodes: List of existing nodes in the cluster. - pending_nodes: Summary of node types currently being launched. + launching_nodes: Summary of node types currently being launched. resource_demands: Vector of resource demands from the scheduler. unused_resources_by_ip: Mapping from ip to available resources. pending_placement_groups: Placement group demands. @@ -102,7 +153,7 @@ class ResourceDemandScheduler: else: resource_requests = [] - if self.is_legacy_yaml: + if self.is_legacy_yaml(): # When using legacy yaml files we need to infer the head & worker # node resources from the static node resources from LoadMetrics. self._infer_legacy_node_resources_if_needed(max_resources_by_ip) @@ -110,7 +161,7 @@ class ResourceDemandScheduler: node_resources: List[ResourceDict] node_type_counts: Dict[NodeType, int] node_resources, node_type_counts = self.calculate_node_resources( - nodes, pending_nodes, unused_resources_by_ip) + nodes, launching_nodes, unused_resources_by_ip) logger.info("Cluster resources: {}".format(node_resources)) logger.info("Node counts: {}".format(node_type_counts)) @@ -125,12 +176,12 @@ class ResourceDemandScheduler: placement_groups_to_resource_demands(pending_placement_groups) resource_demands.extend(placement_group_demand_vector) - if self.is_legacy_yaml and \ + if self.is_legacy_yaml() and \ not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]: # Need to launch worker nodes to later infer their # resources. return self._legacy_worker_node_to_launch( - nodes, pending_nodes, node_resources, resource_demands) + nodes, launching_nodes, node_resources, resource_demands) placement_group_nodes_to_add, node_resources, node_type_counts = \ self.reserve_and_allocate_spread( strict_spreads, node_resources, node_type_counts) @@ -164,13 +215,13 @@ class ResourceDemandScheduler: # Limit the number of concurrent launches total_nodes_to_add = self._get_concurrent_resource_demand_to_launch( total_nodes_to_add, unused_resources_by_ip.keys(), nodes, - pending_nodes, nodes_to_add_based_on_requests) + launching_nodes, nodes_to_add_based_on_requests) logger.info("Node requests: {}".format(total_nodes_to_add)) return total_nodes_to_add def _legacy_worker_node_to_launch( - self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int], + self, nodes: List[NodeID], launching_nodes: Dict[NodeType, int], node_resources: List[ResourceDict], resource_demands: List[ResourceDict]) -> Dict[NodeType, int]: """Get worker nodes to launch when resources missing in legacy yamls. @@ -179,22 +230,24 @@ class ResourceDemandScheduler: workers, it returns max(1, min_workers) worker nodes from which we later calculate the node resources. """ + worker_nodes = self.provider.non_terminated_nodes( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) if self.max_workers == 0: return {} - elif pending_nodes or len(nodes) > 1: - # If we are already launching a worker node. - # If first worker node fails this will never launch more nodes. + elif sum(launching_nodes.values()) + len(worker_nodes) > 0: + # If we are already launching a worker node, wait for its resources + # to be known. + # TODO(ameer): Note that if first worker node fails this will never + # launch any more nodes. return {} else: unfulfilled, _ = get_bin_pack_residual(node_resources, resource_demands) - if self.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] > 0 or \ - unfulfilled: - return { - NODE_TYPE_LEGACY_WORKER: max( - 1, self.node_types[NODE_TYPE_LEGACY_WORKER][ - "min_workers"]) - } + workers_to_add = min( + self.node_types[NODE_TYPE_LEGACY_WORKER].get("min_workers", 0), + self.node_types[NODE_TYPE_LEGACY_WORKER].get("max_workers", 0)) + if workers_to_add > 0 or unfulfilled: + return {NODE_TYPE_LEGACY_WORKER: max(1, workers_to_add)} else: return {} @@ -210,25 +263,29 @@ class ResourceDemandScheduler: """ # We fill the head node resources only once. if not self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"]: - assert len(max_resources_by_ip) == 1 # Only the head node. - self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = next( - iter(max_resources_by_ip.values())) + try: + head_ip = self.provider.internal_ip( + self.provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_HEAD + })[0]) + self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = \ + copy.deepcopy(max_resources_by_ip[head_ip]) + except (IndexError, KeyError): + logger.exception("Could not reach the head node.") # We fill the worker node resources only once. if not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]: - if len(max_resources_by_ip) > 1: - # Set the node_types here as we already launched a worker node - # from which we directly get the node_resources. - worker_nodes = self.provider.non_terminated_nodes( - tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) - worker_node_ips = [ - self.provider.internal_ip(node_id) - for node_id in worker_nodes - ] - for ip in worker_node_ips: - if ip in max_resources_by_ip: - self.node_types[NODE_TYPE_LEGACY_WORKER][ - "resources"] = max_resources_by_ip[ip] - assert self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"] + # Set the node_types here in case we already launched a worker node + # from which we can directly get the node_resources. + worker_nodes = self.provider.non_terminated_nodes( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + worker_node_ips = [ + self.provider.internal_ip(node_id) for node_id in worker_nodes + ] + for ip in worker_node_ips: + if ip in max_resources_by_ip: + self.node_types[NODE_TYPE_LEGACY_WORKER][ + "resources"] = copy.deepcopy(max_resources_by_ip[ip]) + break def _get_concurrent_resource_demand_to_launch( self, @@ -247,7 +304,8 @@ class ResourceDemandScheduler: 1) Calculates the running nodes. 2) Calculates the pending nodes and gets the launching nodes. 3) Limits the total number of pending + currently-launching + - to-be-launched nodes to max(5, frac * running_nodes[node_type]). + to-be-launched nodes to: + max(5, self.upscaling_speed * running_nodes[node_type]). Args: to_launch: List of number of nodes to launch based on resource @@ -262,8 +320,6 @@ class ResourceDemandScheduler: Dict[NodeType, int]: Maximum number of nodes to launch for each node type. """ - # TODO(ameer): Consider making frac configurable. - frac = 1 updated_nodes_to_launch = {} running_nodes, pending_nodes = \ self._separate_running_and_pending_nodes( @@ -273,7 +329,8 @@ class ResourceDemandScheduler: # Enforce here max allowed pending nodes to be frac of total # running nodes. max_allowed_pending_nodes = max( - 5, int(frac * running_nodes[node_type])) + UPSCALING_INITIAL_NUM_NODES, + int(self.upscaling_speed * running_nodes[node_type])) total_pending_nodes = pending_launches_nodes.get( node_type, 0) + pending_nodes[node_type] @@ -502,7 +559,8 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], utilization_scores = [] for node_type in node_types: if (existing_nodes.get(node_type, 0) + nodes_to_add.get( - node_type, 0) >= node_types[node_type]["max_workers"]): + node_type, 0) >= node_types[node_type].get( + "max_workers", 0)): continue node_resources = node_types[node_type]["resources"] if strict_spread: diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index 7b5314d88..9c7c4e5c5 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -101,28 +101,29 @@ def prepare_config(config): def rewrite_legacy_yaml_to_available_node_types( config: Dict[str, Any]) -> Dict[str, Any]: - if "available_node_types" in config: - return config - else: + + if "available_node_types" not in config: # TODO(ameer/ekl/alex): we can also rewrite here many other fields # that include initialization/setup/start commands and ImageId. + logger.debug("Converting legacy cluster config to multi node types.") config["available_node_types"] = { NODE_TYPE_LEGACY_HEAD: { "node_config": config["head_node"], - "resources": {}, + "resources": config["head_node"].get("resources") or {}, "min_workers": 0, "max_workers": 0, }, NODE_TYPE_LEGACY_WORKER: { "node_config": config["worker_nodes"], - "resources": {}, - "min_workers": config["min_workers"], - "max_workers": config["max_workers"], + "resources": config["worker_nodes"].get("resources") or {}, + "min_workers": config.get("min_workers", 0), + "max_workers": config.get("max_workers", 0), }, } config["head_node_type"] = NODE_TYPE_LEGACY_HEAD config["worker_default_node_type"] = NODE_TYPE_LEGACY_WORKER - return config + + return config def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]: diff --git a/python/ray/autoscaler/aws/defaults.yaml b/python/ray/autoscaler/aws/defaults.yaml index a274a9fc8..b85df640c 100644 --- a/python/ray/autoscaler/aws/defaults.yaml +++ b/python/ray/autoscaler/aws/defaults.yaml @@ -9,28 +9,17 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. # Empty string means disabled. docker: {} -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This max value allowed is 1.0, which is the most conservative setting. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/aws/development-example.yaml b/python/ray/autoscaler/aws/development-example.yaml index 1adf7d7c5..7abb83bdd 100644 --- a/python/ray/autoscaler/aws/development-example.yaml +++ b/python/ray/autoscaler/aws/development-example.yaml @@ -9,12 +9,11 @@ min_workers: 2 # node. This takes precedence over min_workers. max_workers: 2 -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml index 7343cee0b..f47a7523f 100644 --- a/python/ray/autoscaler/aws/example-full.yaml +++ b/python/ray/autoscaler/aws/example-full.yaml @@ -9,15 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. @@ -37,13 +33,6 @@ docker: # worker_image: "rayproject/ray:latest-cpu" # worker_run_options: [] -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This max value allowed is 1.0, which is the most conservative setting. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/aws/example-gpu-docker.yaml b/python/ray/autoscaler/aws/example-gpu-docker.yaml index b65e56464..6916feb9c 100644 --- a/python/ray/autoscaler/aws/example-gpu-docker.yaml +++ b/python/ray/autoscaler/aws/example-gpu-docker.yaml @@ -9,15 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. @@ -31,13 +27,6 @@ docker: # worker_image: "rayproject/ray:latest" -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/aws/example-ml.yaml b/python/ray/autoscaler/aws/example-ml.yaml index 7802b9808..8da0baff6 100644 --- a/python/ray/autoscaler/aws/example-ml.yaml +++ b/python/ray/autoscaler/aws/example-ml.yaml @@ -14,15 +14,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 0 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. @@ -40,13 +36,6 @@ docker: # worker_image: "rayproject/ray:latest" -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This max value allowed is 1.0, which is the most conservative setting. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/aws/example-multi-node-type.yaml b/python/ray/autoscaler/aws/example-multi-node-type.yaml index 14d8c363b..56b5c1b78 100644 --- a/python/ray/autoscaler/aws/example-multi-node-type.yaml +++ b/python/ray/autoscaler/aws/example-multi-node-type.yaml @@ -3,6 +3,12 @@ cluster_name: multi_node_type min_workers: 1 max_workers: 40 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 + # Cloud-provider specific configuration. provider: type: aws @@ -62,8 +68,6 @@ head_node: worker_nodes: ImageId: latest_dlami -# Configure the cluster for very conservative auto-scaling otherwise. -target_utilization_fraction: 1.0 idle_timeout_minutes: 2 # How Ray will authenticate with newly launched nodes. diff --git a/python/ray/autoscaler/azure/defaults.yaml b/python/ray/autoscaler/azure/defaults.yaml index 75cb0e182..4f2acdb3d 100644 --- a/python/ray/autoscaler/azure/defaults.yaml +++ b/python/ray/autoscaler/azure/defaults.yaml @@ -9,28 +9,17 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. # Empty string means disabled. docker: {} -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/azure/example-full.yaml b/python/ray/autoscaler/azure/example-full.yaml index c5466ccda..07fa6e495 100644 --- a/python/ray/autoscaler/azure/example-full.yaml +++ b/python/ray/autoscaler/azure/example-full.yaml @@ -9,15 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. @@ -37,13 +33,6 @@ docker: # worker_image: "rayproject/ray:latest-cpu" # worker_run_options: [] -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/azure/example-gpu-docker.yaml b/python/ray/autoscaler/azure/example-gpu-docker.yaml index 262eff1a8..11dcece40 100644 --- a/python/ray/autoscaler/azure/example-gpu-docker.yaml +++ b/python/ray/autoscaler/azure/example-gpu-docker.yaml @@ -9,15 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. @@ -31,13 +27,6 @@ docker: # worker_image: "rayproject/ray:latest" -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/azure/example-gpu.yaml b/python/ray/autoscaler/azure/example-gpu.yaml index 06cf42e6e..52d533b59 100644 --- a/python/ray/autoscaler/azure/example-gpu.yaml +++ b/python/ray/autoscaler/azure/example-gpu.yaml @@ -9,15 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. @@ -35,13 +31,6 @@ docker: # worker_image: "rayproject/ray:latest" -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/gcp/defaults.yaml b/python/ray/autoscaler/gcp/defaults.yaml index 09794f0b1..667fa8f93 100644 --- a/python/ray/autoscaler/gcp/defaults.yaml +++ b/python/ray/autoscaler/gcp/defaults.yaml @@ -9,29 +9,17 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. # Empty string means disabled. docker: {} - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/gcp/example-full.yaml b/python/ray/autoscaler/gcp/example-full.yaml index ba0b4fc87..2f66d1dd3 100644 --- a/python/ray/autoscaler/gcp/example-full.yaml +++ b/python/ray/autoscaler/gcp/example-full.yaml @@ -9,15 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. @@ -37,14 +33,6 @@ docker: # worker_image: "rayproject/ray:latest-cpu" # worker_run_options: [] - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/gcp/example-gpu-docker.yaml b/python/ray/autoscaler/gcp/example-gpu-docker.yaml index ad6cd71de..6552a8c89 100644 --- a/python/ray/autoscaler/gcp/example-gpu-docker.yaml +++ b/python/ray/autoscaler/gcp/example-gpu-docker.yaml @@ -9,15 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # This executes all commands on all nodes in the docker container, # and opens all the necessary ports to support the Ray cluster. @@ -32,14 +28,6 @@ docker: # worker_image: "rayproject/ray:latest" - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 - # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/kubernetes/defaults.yaml b/python/ray/autoscaler/kubernetes/defaults.yaml index 4a5ff7e9b..8ba0b24fd 100644 --- a/python/ray/autoscaler/kubernetes/defaults.yaml +++ b/python/ray/autoscaler/kubernetes/defaults.yaml @@ -9,22 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/kubernetes/example-full.yaml b/python/ray/autoscaler/kubernetes/example-full.yaml index 2098c8a2c..17f63f606 100644 --- a/python/ray/autoscaler/kubernetes/example-full.yaml +++ b/python/ray/autoscaler/kubernetes/example-full.yaml @@ -9,22 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/kubernetes/example-ingress.yaml b/python/ray/autoscaler/kubernetes/example-ingress.yaml index 2c56a9b30..358dd09a3 100644 --- a/python/ray/autoscaler/kubernetes/example-ingress.yaml +++ b/python/ray/autoscaler/kubernetes/example-ingress.yaml @@ -9,22 +9,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 1 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 1 diff --git a/python/ray/autoscaler/local/defaults.yaml b/python/ray/autoscaler/local/defaults.yaml index c5d4911cd..b300a69a3 100644 --- a/python/ray/autoscaler/local/defaults.yaml +++ b/python/ray/autoscaler/local/defaults.yaml @@ -1,25 +1,24 @@ # An unique identifier for the head node and workers of this cluster. cluster_name: default -## NOTE: Typically for local clusters, min_workers == initial_workers == max_workers == len(worker_ips). +## NOTE: Typically for local clusters, min_workers == max_workers == len(worker_ips). # The minimum number of workers nodes to launch in addition to the head # node. This number should be >= 0. -# Typically, min_workers == initial_workers == max_workers == len(worker_ips). +# Typically, min_workers == max_workers == len(worker_ips). min_workers: 0 -# The initial number of worker nodes to launch in addition to the head node. -# Typically, min_workers == initial_workers == max_workers == len(worker_ips). -initial_workers: 0 # The maximum number of workers nodes to launch in addition to the head node. # This takes precedence over min_workers. -# Typically, min_workers == initial_workers == max_workers == len(worker_ips). +# Typically, min_workers == max_workers == len(worker_ips). max_workers: 0 -# Autoscaling parameters. -# Ignore this if min_workers == initial_workers == max_workers. -autoscaling_mode: default -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 + idle_timeout_minutes: 5 # This executes all commands on all nodes in the docker container, diff --git a/python/ray/autoscaler/local/development-example.yaml b/python/ray/autoscaler/local/development-example.yaml index b8578a2ab..66744c274 100644 --- a/python/ray/autoscaler/local/development-example.yaml +++ b/python/ray/autoscaler/local/development-example.yaml @@ -4,7 +4,7 @@ max_workers: 0 docker: image: "" container_name: "" -target_utilization_fraction: 0.8 +upscaling_speed: 1.0 idle_timeout_minutes: 5 provider: type: local diff --git a/python/ray/autoscaler/local/example-full.yaml b/python/ray/autoscaler/local/example-full.yaml index 55794ce36..ff0d651d0 100644 --- a/python/ray/autoscaler/local/example-full.yaml +++ b/python/ray/autoscaler/local/example-full.yaml @@ -1,25 +1,24 @@ # An unique identifier for the head node and workers of this cluster. cluster_name: default -## NOTE: Typically for local clusters, min_workers == initial_workers == max_workers == len(worker_ips). +## NOTE: Typically for local clusters, min_workers == max_workers == len(worker_ips). # The minimum number of workers nodes to launch in addition to the head # node. This number should be >= 0. -# Typically, min_workers == initial_workers == max_workers == len(worker_ips). +# Typically, min_workers == max_workers == len(worker_ips). min_workers: 0 -# The initial number of worker nodes to launch in addition to the head node. -# Typically, min_workers == initial_workers == max_workers == len(worker_ips). -initial_workers: 0 # The maximum number of workers nodes to launch in addition to the head node. # This takes precedence over min_workers. -# Typically, min_workers == initial_workers == max_workers == len(worker_ips). +# Typically, min_workers == max_workers == len(worker_ips). max_workers: 0 -# Autoscaling parameters. -# Ignore this if min_workers == initial_workers == max_workers. -autoscaling_mode: default -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 + idle_timeout_minutes: 5 # This executes all commands on all nodes in the docker container, diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index 276e4c654..74e873736 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -49,6 +49,11 @@ "minimum": 0, "maximum": 1 }, + "upscaling_speed": { + "description": "The autoscaler will scale up the cluster faster with higher upscaling speed. E.g., if the task requires adding more nodes then autoscaler will gradually scale up the cluster in chunks of upscaling_speed*currently_running_nodes. This number should be > 0.", + "type": "number", + "minimum": 0 + }, "idle_timeout_minutes": { "description": "If a node is idle for this many minutes, it will be removed.", "type": "integer", diff --git a/python/ray/autoscaler/staroid/defaults.yaml b/python/ray/autoscaler/staroid/defaults.yaml index 8e9c81a46..490005399 100644 --- a/python/ray/autoscaler/staroid/defaults.yaml +++ b/python/ray/autoscaler/staroid/defaults.yaml @@ -10,22 +10,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 2 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/staroid/example-full.yaml b/python/ray/autoscaler/staroid/example-full.yaml index 29af14b90..55b743ec4 100644 --- a/python/ray/autoscaler/staroid/example-full.yaml +++ b/python/ray/autoscaler/staroid/example-full.yaml @@ -10,22 +10,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 5 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/staroid/example-gpu.yaml b/python/ray/autoscaler/staroid/example-gpu.yaml index b61aaa664..3afda0636 100644 --- a/python/ray/autoscaler/staroid/example-gpu.yaml +++ b/python/ray/autoscaler/staroid/example-gpu.yaml @@ -10,22 +10,11 @@ min_workers: 0 # node. This takes precedence over min_workers. max_workers: 5 -# The initial number of worker nodes to launch in addition to the head -# node. When the cluster is first brought up (or when it is refreshed with a -# subsequent `ray up`) this number of nodes will be started. -initial_workers: 0 - -# Whether or not to autoscale aggressively. If this is enabled, if at any point -# we would start more workers, we start at least enough to bring us to -# initial_workers. -autoscaling_mode: default - -# The autoscaler will scale up the cluster to this target fraction of resource -# usage. For example, if a cluster of 10 nodes is 100% busy and -# target_utilization is 0.8, it would resize the cluster to 13. This fraction -# can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. -target_utilization_fraction: 0.8 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 # If a node is idle for this many minutes, it will be removed. idle_timeout_minutes: 5 diff --git a/python/ray/autoscaler/staroid/example-multi-node-type.yaml b/python/ray/autoscaler/staroid/example-multi-node-type.yaml index 958f32ded..860bb6a87 100644 --- a/python/ray/autoscaler/staroid/example-multi-node-type.yaml +++ b/python/ray/autoscaler/staroid/example-multi-node-type.yaml @@ -3,6 +3,12 @@ cluster_name: multi-node-type # name with 'a-z' and '-' min_workers: 1 max_workers: 40 +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 + # Cloud-provider specific configuration. provider: type: staroid @@ -108,6 +114,4 @@ worker_default_node_type: cpu_4_spot # type configs given above. #worker_nodes: -# Configure the cluster for very conservative auto-scaling otherwise. -target_utilization_fraction: 0.9 idle_timeout_minutes: 5 diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 6029eee16..3cbd4b202 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -18,10 +18,12 @@ from ray.autoscaler._private import commands from ray.autoscaler.sdk import get_docker_host_mount_location from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.autoscaler import StandardAutoscaler -from ray.autoscaler._private.providers import (_NODE_PROVIDERS, - _clear_provider_cache) +from ray.autoscaler._private.providers import ( + _NODE_PROVIDERS, _clear_provider_cache, _DEFAULT_CONFIGS) from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \ - STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE + STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE, \ + NODE_TYPE_LEGACY_HEAD, NODE_TYPE_LEGACY_WORKER, NODE_KIND_HEAD, \ + NODE_KIND_WORKER from ray.autoscaler.node_provider import NodeProvider from ray.test_utils import RayTestTimeoutException import pytest @@ -254,60 +256,6 @@ SMALL_CLUSTER = { class LoadMetricsTest(unittest.TestCase): - def testUpdate(self): - lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) - assert lm.approx_workers_used() == 0.5 - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) - assert lm.approx_workers_used() == 1.0 - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {}) - assert lm.approx_workers_used() == 2.0 - - def testLoadMessages(self): - lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) - self.assertEqual(lm.approx_workers_used(), 0.5) - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1}) - self.assertEqual(lm.approx_workers_used(), 1.0) - - # Both nodes count as busy since there is a queue on one. - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 2}, {}) - self.assertEqual(lm.approx_workers_used(), 2.0) - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {}) - self.assertEqual(lm.approx_workers_used(), 2.0) - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) - self.assertEqual(lm.approx_workers_used(), 2.0) - - # No queue anymore, so we're back to exact accounting. - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) - self.assertEqual(lm.approx_workers_used(), 1.5) - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1}) - self.assertEqual(lm.approx_workers_used(), 2.0) - - lm.update("3.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("4.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("5.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("6.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("7.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("8.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - self.assertEqual(lm.approx_workers_used(), 8.0) - - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) # no queue anymore - self.assertEqual(lm.approx_workers_used(), 4.5) - - def testPruneByNodeIp(self): - lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {}) - lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {}) - lm.prune_active_ips({"1.1.1.1", "4.4.4.4"}) - assert lm.approx_workers_used() == 1.0 - - def testBottleneckResource(self): - lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) - lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {}) - assert lm.approx_workers_used() == 1.88 - def testHeartbeat(self): lm = LoadMetrics() lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) @@ -331,14 +279,13 @@ class LoadMetricsTest(unittest.TestCase): assert ("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU, " "1.05 GiB/1.05 GiB memory, " "1.05 GiB/2.1 GiB object_store_memory") in debug - assert "NumNodesConnected: 3" in debug - assert "NumNodesUsed: 2.88" in debug class AutoscalingTest(unittest.TestCase): def setUp(self): _NODE_PROVIDERS["mock"] = \ lambda config: self.create_provider + _DEFAULT_CONFIGS["mock"] = _DEFAULT_CONFIGS["local"] self.provider = None self.tmpdir = tempfile.mkdtemp() @@ -375,10 +322,13 @@ class AutoscalingTest(unittest.TestCase): assert self.provider return self.provider - def write_config(self, config): + def write_config(self, config, call_prepare_config=True): + new_config = copy.deepcopy(config) + if call_prepare_config: + new_config = prepare_config(new_config) path = os.path.join(self.tmpdir, "simple.yaml") with open(path, "w") as f: - f.write(yaml.dump(config)) + f.write(yaml.dump(new_config)) return path def testAutoscalerConfigValidationFailNotFatal(self): @@ -386,7 +336,6 @@ class AutoscalingTest(unittest.TestCase): # First check that this config is actually invalid with pytest.raises(ValidationError): validate_config(invalid_config) - config_path = self.write_config(invalid_config) self.provider = MockProvider() runner = MockProcessRunner() @@ -579,37 +528,6 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) - def testManualAutoscaling(self): - config = SMALL_CLUSTER.copy() - config["min_workers"] = 0 - config["max_workers"] = 50 - cores_per_node = 2 - config["worker_nodes"] = {"Resources": {"CPU": cores_per_node}} - config_path = self.write_config(config) - self.provider = MockProvider() - runner = MockProcessRunner() - autoscaler = StandardAutoscaler( - config_path, - LoadMetrics(), - max_launch_batch=5, - max_concurrent_launches=5, - max_failures=0, - process_runner=runner, - update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 - autoscaler.update() - self.waitForNodes(0) - autoscaler.request_resources({"CPU": cores_per_node * 10}) - for _ in range(5): # Maximum launch batch is 5 - time.sleep(0.01) - autoscaler.update() - self.waitForNodes(10) - autoscaler.request_resources({"CPU": cores_per_node * 30}) - for _ in range(4): # Maximum launch batch is 5 - time.sleep(0.01) - autoscaler.update() - self.waitForNodes(30) - def testTerminateOutdatedNodesGracefully(self): config = SMALL_CLUSTER.copy() config["min_workers"] = 5 @@ -639,9 +557,10 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() + lm = LoadMetrics() autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + lm, max_launch_batch=5, max_concurrent_launches=5, max_failures=0, @@ -663,11 +582,17 @@ class AutoscalingTest(unittest.TestCase): new_config["max_workers"] = 10 self.write_config(new_config) autoscaler.update() - self.waitForNodes(6) + # Because one worker already started, the scheduler waits for its + # resources to be updated before it launches the remaining min_workers. + self.waitForNodes(1) + worker_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0] + lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {}) autoscaler.update() self.waitForNodes(10) def testInitialWorkers(self): + """initial_workers is deprecated, this tests that it is ignored.""" config = SMALL_CLUSTER.copy() config["min_workers"] = 0 config["max_workers"] = 20 @@ -685,10 +610,7 @@ class AutoscalingTest(unittest.TestCase): update_interval_s=0) self.waitForNodes(0) autoscaler.update() - self.waitForNodes(5) # expected due to batch sizes and concurrency - autoscaler.update() - self.waitForNodes(10) - autoscaler.update() + self.waitForNodes(0) def testAggressiveAutoscaling(self): config = SMALL_CLUSTER.copy() @@ -696,13 +618,16 @@ class AutoscalingTest(unittest.TestCase): config["max_workers"] = 20 config["initial_workers"] = 10 config["idle_timeout_minutes"] = 0 - config["autoscaling_mode"] = "aggressive" + config["upscaling_speed"] = config["max_workers"] config_path = self.write_config(config) self.provider = MockProvider() - self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + }, 1) head_ip = self.provider.non_terminated_node_ips( - tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0] + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD}, )[0] runner = MockProcessRunner() lm = LoadMetrics() @@ -718,32 +643,45 @@ class AutoscalingTest(unittest.TestCase): update_interval_s=0) self.waitForNodes(1) + lm.update( + head_ip, {"CPU": 1}, {"CPU": 0}, {}, + waiting_bundles=[{ + "CPU": 1 + }] * 7, + infeasible_bundles=[{ + "CPU": 1 + }] * 3) autoscaler.update() - self.waitForNodes(6) # expected due to batch sizes and concurrency + self.waitForNodes(2) # launches a single node to get its resources + worker_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0] + lm.update( + worker_ip, {"CPU": 1}, {"CPU": 1}, {}, + waiting_bundles=[{ + "CPU": 1 + }] * 7, + infeasible_bundles=[{ + "CPU": 1 + }] * 3) + # Otherwise the worker is immediately terminated due to being idle. + lm.last_used_time_by_ip[worker_ip] = time.time() + 5 autoscaler.update() self.waitForNodes(11) - - # Connect the head and workers to end the bringup phase - addrs = self.provider.non_terminated_node_ips( - tag_filters={TAG_RAY_NODE_KIND: "worker"}, ) - addrs += head_ip - for addr in addrs: - lm.update(addr, {"CPU": 2}, {"CPU": 0}, {}) - lm.update(addr, {"CPU": 2}, {"CPU": 2}, {}) - assert autoscaler.bringup + worker_ips = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, ) + for ip in worker_ips: + lm.last_used_time_by_ip[ip] = 0 autoscaler.update() - - assert not autoscaler.bringup - autoscaler.update() - self.waitForNodes(1) - - # All of the nodes are down. Simulate some load on the head node - lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {}) - - autoscaler.update() - self.waitForNodes(6) # expected due to batch sizes and concurrency - autoscaler.update() - self.waitForNodes(11) + self.waitForNodes(1) # only the head node + # Make sure they don't get overwritten. + assert autoscaler.resource_demand_scheduler.node_types[ + NODE_TYPE_LEGACY_HEAD]["resources"] == { + "CPU": 1 + } + assert autoscaler.resource_demand_scheduler.node_types[ + NODE_TYPE_LEGACY_WORKER]["resources"] == { + "CPU": 1 + } def testUnmanagedNodes(self): config = SMALL_CLUSTER.copy() @@ -786,7 +724,11 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) # 1 CPU task cannot be scheduled. - lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {"CPU": 1}) + lm.update( + unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {}, + waiting_bundles=[{ + "CPU": 1 + }]) autoscaler.update() self.waitForNodes(3) @@ -868,7 +810,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() assert len(self.provider.non_terminated_nodes({})) == 1 - def testDelayedLaunchWithFailure(self): + def testDelayedLaunchWithMinWorkers(self): config = SMALL_CLUSTER.copy() config["min_workers"] = 10 config["max_workers"] = 10 @@ -892,33 +834,16 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() # Synchronization: wait for launchy thread to be blocked on rtc1 waiters = rtc1._cond._waiters - self.waitFor(lambda: len(waiters) == 1) - assert autoscaler.pending_launches.value == 5 + self.waitFor(lambda: len(waiters) == 2) + assert autoscaler.pending_launches.value == 10 assert len(self.provider.non_terminated_nodes({})) == 0 - - # Call update() to launch a second wave of 3 nodes, - # as 5 + 3 = 8 = max_concurrent_launches. - # Make this wave complete immediately. - rtc2 = threading.Event() - self.provider.ready_to_create = rtc2 - rtc2.set() autoscaler.update() - self.waitForNodes(3) - assert autoscaler.pending_launches.value == 5 - - # The first wave of 5 will now tragically fail - self.provider.fail_creates = True + self.waitForNodes(0) # Nodes are not added on top of pending. rtc1.set() self.waitFor(lambda: autoscaler.pending_launches.value == 0) - assert len(self.provider.non_terminated_nodes({})) == 3 - - # Retry the first wave, allowing it to succeed this time - self.provider.fail_creates = False - autoscaler.update() - self.waitForNodes(8) + assert len(self.provider.non_terminated_nodes({})) == 10 + self.waitForNodes(10) assert autoscaler.pending_launches.value == 0 - - # Final wave of 2 nodes autoscaler.update() self.waitForNodes(10) assert autoscaler.pending_launches.value == 0 @@ -971,9 +896,10 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() + lm = LoadMetrics() autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + lm, max_launch_batch=10, max_concurrent_launches=10, process_runner=runner, @@ -983,7 +909,7 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(2) # Write a corrupted config - self.write_config("asdf") + self.write_config("asdf", call_prepare_config=False) for _ in range(10): autoscaler.update() time.sleep(0.1) @@ -995,6 +921,11 @@ class AutoscalingTest(unittest.TestCase): new_config["min_workers"] = 10 new_config["max_workers"] = 10 self.write_config(new_config) + worker_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0] + # Because one worker already started, the scheduler waits for its + # resources to be updated before it launches the remaining min_workers. + lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {}) autoscaler.update() self.waitForNodes(10) @@ -1120,11 +1051,24 @@ class AutoscalingTest(unittest.TestCase): # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() - lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}) # worker 1 + lm.update( + local_ip, {"CPU": 2}, {"CPU": 0}, {}, + waiting_bundles=2 * [{ + "CPU": 2 + }]) # head + autoscaler.update() + lm.update( + "172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}, + waiting_bundles=2 * [{ + "CPU": 2 + }]) autoscaler.update() self.waitForNodes(3) - lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {}) + lm.update( + "172.0.0.1", {"CPU": 2}, {"CPU": 0}, {}, + waiting_bundles=3 * [{ + "CPU": 2 + }]) autoscaler.update() self.waitForNodes(5) @@ -1139,6 +1083,7 @@ class AutoscalingTest(unittest.TestCase): lm.last_used_time_by_ip["172.0.0.0"] = 0 lm.last_used_time_by_ip["172.0.0.1"] = 0 autoscaler.update() + assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 3 lm.last_used_time_by_ip["172.0.0.2"] = 0 @@ -1147,11 +1092,11 @@ class AutoscalingTest(unittest.TestCase): assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 1 - def testDontScaleBelowTarget(self): + def testTargetUtilizationFraction(self): config = SMALL_CLUSTER.copy() config["min_workers"] = 0 - config["max_workers"] = 2 - config["target_utilization_fraction"] = 0.5 + config["max_workers"] = 20 + config["upscaling_speed"] = 10 config_path = self.write_config(config) self.provider = MockProvider() lm = LoadMetrics() @@ -1166,25 +1111,49 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 0 - - # Scales up as nodes are reported as used - local_ip = services.get_node_ip_address() - lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head - # 1.0 nodes used => target nodes = 2 => target workers = 1 + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + }, 1) + head_ip = self.provider.non_terminated_node_ips({})[0] + lm.local_ip = head_ip + lm.update( + head_ip, {"CPU": 2}, {"CPU": 1}, {}, waiting_bundles=[{ + "CPU": 1 + }]) # head + # The headnode should be sufficient for now autoscaler.update() self.waitForNodes(1) - # Make new node idle, and never used. - # Should hold steady as target is still 2. - lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {}) - lm.last_used_time_by_ip["172.0.0.0"] = 0 + # Requires 1 more worker as the head node is fully used. + lm.update( + head_ip, {"CPU": 2}, {"CPU": 0}, {}, waiting_bundles=[{ + "CPU": 1 + }]) autoscaler.update() - assert len(self.provider.non_terminated_nodes({})) == 1 + self.waitForNodes(2) # 1 worker is added to get its resources. + worker_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0] + lm.update( + worker_ip, {"CPU": 1}, {"CPU": 1}, {}, + waiting_bundles=[{ + "CPU": 1 + }] * 7, + infeasible_bundles=[{ + "CPU": 1 + }] * 4) + # Add another 10 workers (frac=1/0.1=10, 1 worker running, 10*1=10) + # and bypass constraint of 5 due to target utiization fraction. + autoscaler.update() + self.waitForNodes(12) - # Reduce load on head => target nodes = 1 => target workers = 0 - lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {}) + worker_ips = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, ) + for ip in worker_ips: + lm.last_used_time_by_ip[ip] = 0 autoscaler.update() - assert len(self.provider.non_terminated_nodes({})) == 0 + self.waitForNodes(1) # only the head node + assert len(self.provider.non_terminated_nodes({})) == 1 def testRecoverUnhealthyWorkers(self): config_path = self.write_config(SMALL_CLUSTER) @@ -1243,7 +1212,10 @@ class AutoscalingTest(unittest.TestCase): "type": "external", "module": "does-not-exist", } - invalid_provider = self.write_config(config) + with pytest.raises(ValueError): + invalid_provider = self.write_config( + config, call_prepare_config=True) + invalid_provider = self.write_config(config, call_prepare_config=False) with pytest.raises(ValueError): StandardAutoscaler( invalid_provider, LoadMetrics(), update_interval_s=0) diff --git a/python/ray/tests/test_cli_patterns/test_ray_attach.txt b/python/ray/tests/test_cli_patterns/test_ray_attach.txt index f895234a1..e743b90cd 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_attach.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_attach.txt @@ -1,5 +1,3 @@ -Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. -Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. Checking AWS environment settings Fetched IP: .+ ubuntu@ip-.+:~\$ exit diff --git a/python/ray/tests/test_cli_patterns/test_ray_exec.txt b/python/ray/tests/test_cli_patterns/test_ray_exec.txt index 1720747dd..975ba3b52 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_exec.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_exec.txt @@ -1,5 +1,3 @@ -Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. -Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. Checking AWS environment settings Fetched IP: .+ This is a test! diff --git a/python/ray/tests/test_cli_patterns/test_ray_submit.txt b/python/ray/tests/test_cli_patterns/test_ray_submit.txt index 61d3e722e..efc900092 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_submit.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_submit.txt @@ -1,9 +1,5 @@ -Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. -Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. Checking AWS environment settings Fetched IP: .+ -Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. -Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. Checking AWS environment settings Fetched IP: .+ This is a test! diff --git a/python/ray/tests/test_cli_patterns/test_ray_up.txt b/python/ray/tests/test_cli_patterns/test_ray_up.txt index 7e52609b3..48bc59c71 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up.txt @@ -1,7 +1,5 @@ Cluster: test-cli -Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. -Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. Checking AWS environment settings AWS config IAM Profile: .+ \[default\] diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt index 5370014ad..2e70f7aa6 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt @@ -1,6 +1,4 @@ .+\.py.*Cluster: test-cli -.+\.py.*Updating the resources of ray-legacy-head-node-type to {'CPU': 1}. -.+\.py.*Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}. .+\.py.*Checking AWS environment settings .+\.py.*Creating new IAM instance profile ray-autoscaler-v1 for use as the default\. .+\.py.*Creating new IAM role ray-autoscaler-v1 for use as the default instance role\. diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 57c4f4a4c..7a4eeb0e6 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -78,23 +78,23 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): monitor.process_messages() resource_usage = monitor.load_metrics._get_resource_usage() + if "memory" in resource_usage[0]: + del resource_usage[0]["memory"] + if "object_store_memory" in resource_usage[1]: + del resource_usage[0]["object_store_memory"] if "memory" in resource_usage[1]: del resource_usage[1]["memory"] - if "object_store_memory" in resource_usage[2]: + if "object_store_memory" in resource_usage[1]: del resource_usage[1]["object_store_memory"] - if "memory" in resource_usage[2]: - del resource_usage[2]["memory"] - if "object_store_memory" in resource_usage[2]: - del resource_usage[2]["object_store_memory"] + for key in list(resource_usage[0].keys()): + if key.startswith("node:"): + del resource_usage[0][key] for key in list(resource_usage[1].keys()): if key.startswith("node:"): del resource_usage[1][key] - for key in list(resource_usage[2].keys()): - if key.startswith("node:"): - del resource_usage[2][key] if expected_resource_usage is None: - if all(x for x in resource_usage[1:]): + if all(x for x in resource_usage[0:]): break elif all(x == y for x, y in zip(resource_usage, expected_resource_usage)): @@ -125,7 +125,7 @@ def test_heartbeats_single(ray_start_cluster_head): cluster = ray_start_cluster_head monitor = setup_monitor(cluster.address) total_cpus = ray.state.cluster_resources()["CPU"] - verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": total_cpus})) + verify_load_metrics(monitor, ({"CPU": 0.0}, {"CPU": total_cpus})) @ray.remote def work(signal): @@ -139,11 +139,7 @@ def test_heartbeats_single(ray_start_cluster_head): signal = SignalActor.remote() work_handle = work.remote(signal) - verify_load_metrics(monitor, (1.0 / total_cpus, { - "CPU": 1.0 - }, { - "CPU": total_cpus - })) + verify_load_metrics(monitor, ({"CPU": 1.0}, {"CPU": total_cpus})) ray.get(signal.send.remote()) ray.get(work_handle) @@ -163,11 +159,7 @@ def test_heartbeats_single(ray_start_cluster_head): test_actor = Actor.remote() work_handle = test_actor.work.remote(signal) - verify_load_metrics(monitor, (1.0 / total_cpus, { - "CPU": 1.0 - }, { - "CPU": total_cpus - })) + verify_load_metrics(monitor, ({"CPU": 1.0}, {"CPU": total_cpus})) ray.get(signal.send.remote()) ray.get(work_handle) diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index b305edca9..ccaa57022 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -1025,7 +1025,10 @@ class AutoscalingTest(unittest.TestCase): "empty_node") def testScaleUpMinSanity(self): - config_path = self.write_config(MULTI_WORKER_CLUSTER) + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["available_node_types"]["m4.large"]["min_workers"] = \ + config["min_workers"] + config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() autoscaler = StandardAutoscaler( @@ -1109,11 +1112,9 @@ class AutoscalingTest(unittest.TestCase): def testScaleUpMinWorkers(self): config = copy.deepcopy(MULTI_WORKER_CLUSTER) - config["min_workers"] = 2 config["max_workers"] = 50 config["idle_timeout_minutes"] = 1 - # Since config["min_workers"] > 1, the remaining worker is started - # with the default worker node type. + config["available_node_types"]["m4.large"]["min_workers"] = 1 config["available_node_types"]["p2.8xlarge"]["min_workers"] = 1 config_path = self.write_config(config) self.provider = MockProvider() @@ -1467,7 +1468,9 @@ class AutoscalingTest(unittest.TestCase): "p2x_image:nightly") def testUpdateConfig(self): - config = MULTI_WORKER_CLUSTER.copy() + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["available_node_types"]["m4.large"]["min_workers"] = \ + config["min_workers"] config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() @@ -1480,7 +1483,7 @@ class AutoscalingTest(unittest.TestCase): assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() self.waitForNodes(2) - config["min_workers"] = 0 + config["available_node_types"]["m4.large"]["min_workers"] = 0 config["available_node_types"]["m4.large"]["node_config"][ "field_changed"] = 1 config_path = self.write_config(config)