mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 04:27:27 +08:00
[autoscaler] Remove legacy autoscaler (#11802)
This commit is contained in:
@@ -1,9 +1,8 @@
|
||||
from collections import defaultdict, namedtuple
|
||||
from typing import Any, Optional, Dict
|
||||
from typing import Any, Optional, Dict, List
|
||||
import copy
|
||||
import logging
|
||||
import math
|
||||
import numpy as np
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
@@ -56,7 +55,7 @@ class StandardAutoscaler:
|
||||
cluster size.
|
||||
|
||||
StandardAutoscaler is also used to bootstrap clusters (by adding workers
|
||||
until the target cluster size is met).
|
||||
until the cluster size that can handle the resource demand is met).
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
@@ -71,6 +70,7 @@ class StandardAutoscaler:
|
||||
# Keep this before self.reset (self.provider needs to be created
|
||||
# exactly once).
|
||||
self.provider = None
|
||||
self.resource_demand_scheduler = None
|
||||
self.reset(errors_fatal=True)
|
||||
self.load_metrics = load_metrics
|
||||
|
||||
@@ -86,7 +86,6 @@ class StandardAutoscaler:
|
||||
self.num_failures = 0
|
||||
self.last_update_time = 0.0
|
||||
self.update_interval_s = update_interval_s
|
||||
self.bringup = True
|
||||
|
||||
# Node launchers
|
||||
self.launch_queue = queue.Queue()
|
||||
@@ -115,8 +114,6 @@ class StandardAutoscaler:
|
||||
for local_path in self.config["file_mounts"].values():
|
||||
assert os.path.exists(local_path)
|
||||
|
||||
# Aggregate resources the user is requesting of the cluster.
|
||||
self.resource_requests = defaultdict(int)
|
||||
# List of resource bundles the user is requesting of the cluster.
|
||||
self.resource_demand_vector = []
|
||||
|
||||
@@ -148,21 +145,12 @@ class StandardAutoscaler:
|
||||
|
||||
self.last_update_time = now
|
||||
nodes = self.workers()
|
||||
# Check pending nodes immediately after fetching the number of running
|
||||
# nodes to minimize chance number of pending nodes changing after
|
||||
# additional nodes (managed and unmanaged) are launched.
|
||||
num_pending = self.pending_launches.value
|
||||
|
||||
self.load_metrics.prune_active_ips([
|
||||
self.provider.internal_ip(node_id)
|
||||
for node_id in self.all_workers()
|
||||
])
|
||||
target_workers = self.target_num_workers()
|
||||
|
||||
if len(nodes) >= target_workers:
|
||||
if "CPU" in self.resource_requests:
|
||||
del self.resource_requests["CPU"]
|
||||
|
||||
self.log_info_string(nodes, target_workers)
|
||||
self.log_info_string(nodes)
|
||||
|
||||
# Terminate any idle or out of date nodes
|
||||
last_used = self.load_metrics.last_used_time_by_ip
|
||||
@@ -170,16 +158,19 @@ class StandardAutoscaler:
|
||||
|
||||
nodes_to_terminate = []
|
||||
node_type_counts = collections.defaultdict(int)
|
||||
for node_id in nodes:
|
||||
# Sort based on last used to make sure to keep min_workers that
|
||||
# were most recently used. Otherwise, _keep_min_workers_of_node_type
|
||||
# might keep a node that should be terminated.
|
||||
for node_id in self._sort_based_on_last_used(nodes, last_used):
|
||||
# Make sure to not kill idle node types if the number of workers
|
||||
# of that type is lower/equal to the min_workers of that type.
|
||||
if self._keep_min_worker_of_node_type(node_id, node_type_counts):
|
||||
if self._keep_min_worker_of_node_type(
|
||||
node_id,
|
||||
node_type_counts) and self.launch_config_ok(node_id):
|
||||
continue
|
||||
|
||||
node_ip = self.provider.internal_ip(node_id)
|
||||
if (node_ip in last_used and last_used[node_ip] < horizon) and \
|
||||
(len(nodes) - len(nodes_to_terminate)
|
||||
> target_workers):
|
||||
if node_ip in last_used and last_used[node_ip] < horizon:
|
||||
logger.info("StandardAutoscaler: "
|
||||
"{}: Terminating idle node".format(node_id))
|
||||
nodes_to_terminate.append(node_id)
|
||||
@@ -191,7 +182,7 @@ class StandardAutoscaler:
|
||||
if nodes_to_terminate:
|
||||
self.provider.terminate_nodes(nodes_to_terminate)
|
||||
nodes = self.workers()
|
||||
self.log_info_string(nodes, target_workers)
|
||||
self.log_info_string(nodes)
|
||||
|
||||
# Terminate nodes if there are too many
|
||||
nodes_to_terminate = []
|
||||
@@ -205,37 +196,21 @@ class StandardAutoscaler:
|
||||
if nodes_to_terminate:
|
||||
self.provider.terminate_nodes(nodes_to_terminate)
|
||||
nodes = self.workers()
|
||||
self.log_info_string(nodes, target_workers)
|
||||
|
||||
# First let the resource demand scheduler launch nodes, if enabled.
|
||||
if self.resource_demand_scheduler:
|
||||
to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
|
||||
self.provider.non_terminated_nodes(tag_filters={}),
|
||||
self.pending_launches.breakdown(),
|
||||
self.load_metrics.get_resource_demand_vector(),
|
||||
self.load_metrics.get_resource_utilization(),
|
||||
self.load_metrics.get_pending_placement_groups(),
|
||||
self.load_metrics.get_static_node_resources_by_ip(),
|
||||
ensure_min_cluster_size=self.resource_demand_vector)
|
||||
for node_type, count in to_launch.items():
|
||||
self.launch_new_node(count, node_type=node_type)
|
||||
self.log_info_string(nodes)
|
||||
|
||||
num_pending = self.pending_launches.value
|
||||
nodes = self.workers()
|
||||
to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
|
||||
self.provider.non_terminated_nodes(tag_filters={}),
|
||||
self.pending_launches.breakdown(),
|
||||
self.load_metrics.get_resource_demand_vector(),
|
||||
self.load_metrics.get_resource_utilization(),
|
||||
self.load_metrics.get_pending_placement_groups(),
|
||||
self.load_metrics.get_static_node_resources_by_ip(),
|
||||
ensure_min_cluster_size=self.resource_demand_vector)
|
||||
for node_type, count in to_launch.items():
|
||||
self.launch_new_node(count, node_type=node_type)
|
||||
|
||||
# Launch additional nodes of the default type, if still needed.
|
||||
num_workers = len(nodes) + num_pending
|
||||
max_allowed = min(self.max_launch_batch,
|
||||
self.max_concurrent_launches - num_pending)
|
||||
if num_workers < target_workers and max_allowed > 0:
|
||||
num_launches = min(max_allowed, target_workers - num_workers)
|
||||
self.launch_new_node(num_launches,
|
||||
self.config.get("worker_default_node_type"))
|
||||
nodes = self.workers()
|
||||
self.log_info_string(nodes, target_workers)
|
||||
elif self.load_metrics.num_workers_connected() >= target_workers:
|
||||
self.bringup = False
|
||||
self.log_info_string(nodes, target_workers)
|
||||
nodes = self.workers()
|
||||
|
||||
# Process any completed updates
|
||||
completed = []
|
||||
@@ -253,7 +228,7 @@ class StandardAutoscaler:
|
||||
# immediately trying to restart Ray on the new node.
|
||||
self.load_metrics.mark_active(self.provider.internal_ip(node_id))
|
||||
nodes = self.workers()
|
||||
self.log_info_string(nodes, target_workers)
|
||||
self.log_info_string(nodes)
|
||||
|
||||
# Update nodes with out-of-date files.
|
||||
# TODO(edoakes): Spawning these threads directly seems to cause
|
||||
@@ -279,6 +254,25 @@ class StandardAutoscaler:
|
||||
for node_id in nodes:
|
||||
self.recover_if_needed(node_id, now)
|
||||
|
||||
def _sort_based_on_last_used(self, nodes: List[NodeID],
|
||||
last_used: Dict[str, float]) -> List[NodeID]:
|
||||
"""Sort the nodes based on the last time they were used.
|
||||
|
||||
The first item in the return list is the least recently used.
|
||||
"""
|
||||
updated_last_used = copy.deepcopy(last_used)
|
||||
now = time.time()
|
||||
for node_id in nodes:
|
||||
node_ip = self.provider.internal_ip(node_id)
|
||||
if node_ip not in updated_last_used:
|
||||
updated_last_used[node_ip] = now
|
||||
|
||||
def last_time_used(node_id: NodeID):
|
||||
node_ip = self.provider.internal_ip(node_id)
|
||||
return updated_last_used[node_ip]
|
||||
|
||||
return sorted(nodes, key=last_time_used, reverse=True)
|
||||
|
||||
def _keep_min_worker_of_node_type(self, node_id: NodeID,
|
||||
node_type_counts: Dict[NodeType, int]):
|
||||
"""Returns if workers of node_type should be terminated.
|
||||
@@ -293,15 +287,16 @@ class StandardAutoscaler:
|
||||
Returns:
|
||||
bool: if workers of node_types should be terminated or not.
|
||||
"""
|
||||
if self.resource_demand_scheduler:
|
||||
tags = self.provider.node_tags(node_id)
|
||||
if TAG_RAY_USER_NODE_TYPE in tags:
|
||||
node_type = tags[TAG_RAY_USER_NODE_TYPE]
|
||||
node_type_counts[node_type] += 1
|
||||
min_workers = self.available_node_types[node_type].get(
|
||||
"min_workers", 0)
|
||||
if node_type_counts[node_type] <= min_workers:
|
||||
return True
|
||||
tags = self.provider.node_tags(node_id)
|
||||
if TAG_RAY_USER_NODE_TYPE in tags:
|
||||
node_type = tags[TAG_RAY_USER_NODE_TYPE]
|
||||
node_type_counts[node_type] += 1
|
||||
min_workers = self.available_node_types[node_type].get(
|
||||
"min_workers", 0)
|
||||
max_workers = self.available_node_types[node_type].get(
|
||||
"max_workers", 0)
|
||||
if node_type_counts[node_type] <= min(min_workers, max_workers):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@@ -349,15 +344,41 @@ class StandardAutoscaler:
|
||||
if not self.provider:
|
||||
self.provider = _get_node_provider(self.config["provider"],
|
||||
self.config["cluster_name"])
|
||||
# Check whether we can enable the resource demand scheduler.
|
||||
if "available_node_types" in self.config:
|
||||
self.available_node_types = self.config["available_node_types"]
|
||||
|
||||
self.available_node_types = self.config["available_node_types"]
|
||||
upscaling_speed = self.config.get("upscaling_speed")
|
||||
aggressive = self.config.get("autoscaling_mode") == "aggressive"
|
||||
target_utilization_fraction = self.config.get(
|
||||
"target_utilization_fraction")
|
||||
if upscaling_speed:
|
||||
upscaling_speed = float(upscaling_speed)
|
||||
# TODO(ameer): consider adding (if users ask) an option of
|
||||
# initial_upscaling_num_workers.
|
||||
elif aggressive:
|
||||
upscaling_speed = 99999
|
||||
logger.warning(
|
||||
"Legacy aggressive autoscaling mode "
|
||||
"detected. Replacing it by setting upscaling_speed to "
|
||||
"99999.")
|
||||
elif target_utilization_fraction:
|
||||
upscaling_speed = 1 / max(target_utilization_fraction, 0.001)
|
||||
logger.warning(
|
||||
"Legacy target_utilization_fraction config "
|
||||
"detected. Replacing it by setting upscaling_speed to " +
|
||||
"1 / target_utilization_fraction.")
|
||||
else:
|
||||
upscaling_speed = 1.0
|
||||
if self.resource_demand_scheduler:
|
||||
# The node types are autofilled internally for legacy yamls,
|
||||
# overwriting the class will remove the inferred node resources
|
||||
# for legacy yamls.
|
||||
self.resource_demand_scheduler.reset_config(
|
||||
self.provider, self.available_node_types,
|
||||
self.config["max_workers"], upscaling_speed)
|
||||
else:
|
||||
self.resource_demand_scheduler = ResourceDemandScheduler(
|
||||
self.provider, self.available_node_types,
|
||||
self.config["max_workers"])
|
||||
else:
|
||||
self.available_node_types = None
|
||||
self.resource_demand_scheduler = None
|
||||
self.config["max_workers"], upscaling_speed)
|
||||
|
||||
except Exception as e:
|
||||
if errors_fatal:
|
||||
@@ -366,37 +387,6 @@ class StandardAutoscaler:
|
||||
logger.exception("StandardAutoscaler: "
|
||||
"Error parsing config.")
|
||||
|
||||
def target_num_workers(self):
|
||||
target_frac = self.config["target_utilization_fraction"]
|
||||
cur_used = self.load_metrics.approx_workers_used()
|
||||
ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
|
||||
ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node
|
||||
|
||||
initial_workers = self.config["initial_workers"]
|
||||
aggressive = self.config["autoscaling_mode"] == "aggressive"
|
||||
if self.bringup:
|
||||
ideal_num_workers = max(ideal_num_workers, initial_workers)
|
||||
elif aggressive and cur_used > 0:
|
||||
# If we want any workers, we want at least initial_workers
|
||||
ideal_num_workers = max(ideal_num_workers, initial_workers)
|
||||
|
||||
# Other resources are not supported at present.
|
||||
if "CPU" in self.resource_requests:
|
||||
try:
|
||||
cores_per_worker = self.config["worker_nodes"]["Resources"][
|
||||
"CPU"]
|
||||
except KeyError:
|
||||
cores_per_worker = 1 # Assume the worst
|
||||
|
||||
cores_desired = self.resource_requests["CPU"]
|
||||
|
||||
ideal_num_workers = max(
|
||||
ideal_num_workers,
|
||||
int(np.ceil(cores_desired / cores_per_worker)))
|
||||
|
||||
return min(self.config["max_workers"],
|
||||
max(self.config["min_workers"], ideal_num_workers))
|
||||
|
||||
def launch_config_ok(self, node_id):
|
||||
node_tags = self.provider.node_tags(node_id)
|
||||
tag_launch_conf = node_tags.get(TAG_RAY_LAUNCH_CONFIG)
|
||||
@@ -577,50 +567,40 @@ class StandardAutoscaler:
|
||||
return self.provider.non_terminated_nodes(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED})
|
||||
|
||||
def log_info_string(self, nodes, target):
|
||||
def log_info_string(self, nodes):
|
||||
tmp = "Cluster status: "
|
||||
tmp += self.info_string(nodes, target)
|
||||
tmp += self.info_string(nodes)
|
||||
tmp += "\n"
|
||||
tmp += self.load_metrics.info_string()
|
||||
tmp += "\n"
|
||||
if self.resource_demand_scheduler:
|
||||
tmp += self.resource_demand_scheduler.debug_string(
|
||||
nodes, self.pending_launches.breakdown(),
|
||||
self.load_metrics.get_resource_utilization())
|
||||
tmp += self.resource_demand_scheduler.debug_string(
|
||||
nodes, self.pending_launches.breakdown(),
|
||||
self.load_metrics.get_resource_utilization())
|
||||
if _internal_kv_initialized():
|
||||
_internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True)
|
||||
logger.info(tmp)
|
||||
logger.debug(tmp)
|
||||
|
||||
def info_string(self, nodes, target):
|
||||
def info_string(self, nodes):
|
||||
suffix = ""
|
||||
if self.pending_launches:
|
||||
suffix += " ({} pending)".format(self.pending_launches.value)
|
||||
if self.updaters:
|
||||
suffix += " ({} updating)".format(len(self.updaters))
|
||||
if self.num_failed_updates:
|
||||
suffix += " ({} failed to update)".format(
|
||||
len(self.num_failed_updates))
|
||||
if self.bringup:
|
||||
suffix += " (bringup=True)"
|
||||
|
||||
return "{}/{} target nodes{}".format(len(nodes), target, suffix)
|
||||
return "{} nodes{}".format(len(nodes), suffix)
|
||||
|
||||
def request_resources(self, resources):
|
||||
"""Called by monitor to request resources (EXPERIMENTAL).
|
||||
def request_resources(self, resources: List[dict]):
|
||||
"""Called by monitor to request resources.
|
||||
|
||||
Args:
|
||||
resources: Either a list of resource bundles or a single resource
|
||||
demand dictionary.
|
||||
resources: A list of resource bundles.
|
||||
"""
|
||||
if resources:
|
||||
logger.info(
|
||||
"StandardAutoscaler: resource_requests={}".format(resources))
|
||||
if isinstance(resources, list):
|
||||
self.resource_demand_vector = resources
|
||||
else:
|
||||
for resource, count in resources.items():
|
||||
self.resource_requests[resource] = max(
|
||||
self.resource_requests[resource], count)
|
||||
assert isinstance(resources, list), resources
|
||||
self.resource_demand_vector = resources
|
||||
|
||||
def kill_workers(self):
|
||||
logger.error("StandardAutoscaler: kill_workers triggered")
|
||||
|
||||
@@ -302,12 +302,6 @@ class AWSNodeProvider(NodeProvider):
|
||||
tags = to_aws_format(tags)
|
||||
conf = node_config.copy()
|
||||
|
||||
# Delete unsupported keys from the node config
|
||||
try:
|
||||
del conf["Resources"]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
tag_pairs = [{
|
||||
"Key": TAG_RAY_CLUSTER_NAME,
|
||||
"Value": self.cluster_name,
|
||||
@@ -515,8 +509,8 @@ class AWSNodeProvider(NodeProvider):
|
||||
available_node_types[node_type].get("resources", {}):
|
||||
available_node_types[node_type][
|
||||
"resources"] = autodetected_resources
|
||||
cli_logger.print("Updating the resources of {} to {}.",
|
||||
node_type, autodetected_resources)
|
||||
logger.debug("Updating the resources of {} to {}.".format(
|
||||
node_type, autodetected_resources))
|
||||
else:
|
||||
raise ValueError("Instance type " + instance_type +
|
||||
" is not available in AWS region: " +
|
||||
|
||||
@@ -99,12 +99,6 @@ class LoadMetrics:
|
||||
prune(self.resource_load_by_ip)
|
||||
prune(self.last_heartbeat_time_by_ip)
|
||||
|
||||
def approx_workers_used(self):
|
||||
return self._info()["NumNodesUsed"]
|
||||
|
||||
def num_workers_connected(self):
|
||||
return self._info()["NumNodesConnected"]
|
||||
|
||||
def get_node_resources(self):
|
||||
"""Return a list of node resources (static resource sizes).
|
||||
|
||||
@@ -128,9 +122,7 @@ class LoadMetrics:
|
||||
|
||||
def _get_resource_usage(self):
|
||||
num_nodes = 0
|
||||
nodes_used = 0.0
|
||||
num_nonidle = 0
|
||||
has_saturated_node = False
|
||||
resources_used = {}
|
||||
resources_total = {}
|
||||
for ip, max_resources in self.static_resources_by_ip.items():
|
||||
@@ -143,7 +135,6 @@ class LoadMetrics:
|
||||
max_frac = 0.0
|
||||
for resource_id, amount in resource_load.items():
|
||||
if amount > 0:
|
||||
has_saturated_node = True
|
||||
max_frac = 1.0 # the resource is saturated
|
||||
for resource_id, amount in max_resources.items():
|
||||
used = amount - avail_resources[resource_id]
|
||||
@@ -157,17 +148,10 @@ class LoadMetrics:
|
||||
frac = used / float(amount)
|
||||
if frac > max_frac:
|
||||
max_frac = frac
|
||||
nodes_used += max_frac
|
||||
if max_frac > 0:
|
||||
num_nonidle += 1
|
||||
|
||||
# If any nodes have a queue buildup, assume all non-idle nodes are 100%
|
||||
# busy, plus the head node. This guards against the case of not scaling
|
||||
# up due to poor task packing.
|
||||
if has_saturated_node:
|
||||
nodes_used = min(num_nonidle + 1.0, num_nodes)
|
||||
|
||||
return nodes_used, resources_used, resources_total
|
||||
return resources_used, resources_total
|
||||
|
||||
def get_resource_demand_vector(self):
|
||||
return self.waiting_bundles + self.infeasible_bundles
|
||||
@@ -180,8 +164,7 @@ class LoadMetrics:
|
||||
["{}: {}".format(k, v) for k, v in sorted(self._info().items())])
|
||||
|
||||
def _info(self):
|
||||
nodes_used, resources_used, resources_total = self._get_resource_usage(
|
||||
)
|
||||
resources_used, resources_total = self._get_resource_usage()
|
||||
|
||||
now = time.time()
|
||||
idle_times = [now - t for t in self.last_used_time_by_ip.values()]
|
||||
@@ -211,8 +194,6 @@ class LoadMetrics:
|
||||
for rid in sorted(resources_used)
|
||||
if not rid.startswith("node:")
|
||||
]),
|
||||
"NumNodesConnected": len(self.static_resources_by_ip),
|
||||
"NumNodesUsed": round(nodes_used, 2),
|
||||
"NodeIdleSeconds": "Min={} Mean={} Max={}".format(
|
||||
int(np.min(idle_times)) if idle_times else -1,
|
||||
int(np.mean(idle_times)) if idle_times else -1,
|
||||
|
||||
@@ -17,12 +17,15 @@ from typing import List, Dict
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.gcs_utils import PlacementGroupTableData
|
||||
from ray.core.generated.common_pb2 import PlacementStrategy
|
||||
from ray.autoscaler.tags import (TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED,
|
||||
NODE_TYPE_LEGACY_WORKER, NODE_KIND_WORKER,
|
||||
NODE_TYPE_LEGACY_HEAD, TAG_RAY_NODE_KIND)
|
||||
from ray.autoscaler.tags import (
|
||||
TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, NODE_TYPE_LEGACY_WORKER,
|
||||
NODE_KIND_WORKER, NODE_TYPE_LEGACY_HEAD, TAG_RAY_NODE_KIND, NODE_KIND_HEAD)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# The minimum number of nodes to launch concurrently.
|
||||
UPSCALING_INITIAL_NUM_NODES = 5
|
||||
|
||||
# e.g., cpu_4_ondemand.
|
||||
NodeType = str
|
||||
|
||||
@@ -40,21 +43,69 @@ NodeIP = str
|
||||
|
||||
|
||||
class ResourceDemandScheduler:
|
||||
def __init__(self, provider: NodeProvider,
|
||||
def __init__(self,
|
||||
provider: NodeProvider,
|
||||
node_types: Dict[NodeType, NodeTypeConfigDict],
|
||||
max_workers: int):
|
||||
max_workers: int,
|
||||
upscaling_speed: float = 1) -> None:
|
||||
self.provider = provider
|
||||
self.node_types = copy.deepcopy(node_types)
|
||||
self.max_workers = max_workers
|
||||
# is_legacy_yaml tracks if the cluster configs was originally without
|
||||
# available_node_types and was autofilled with available_node_types.
|
||||
self.is_legacy_yaml = (NODE_TYPE_LEGACY_HEAD in node_types
|
||||
and NODE_TYPE_LEGACY_WORKER in node_types)
|
||||
self.upscaling_speed = upscaling_speed
|
||||
|
||||
def reset_config(self,
|
||||
provider: NodeProvider,
|
||||
node_types: Dict[NodeType, NodeTypeConfigDict],
|
||||
max_workers: int,
|
||||
upscaling_speed: float = 1) -> None:
|
||||
"""Updates the class state variables.
|
||||
|
||||
For legacy yamls, it merges previous state and new state to make sure
|
||||
inferered resources are not lost.
|
||||
"""
|
||||
new_node_types = copy.deepcopy(node_types)
|
||||
final_node_types = new_node_types
|
||||
if self.is_legacy_yaml(new_node_types): # If new configs are legacy.
|
||||
if self.is_legacy_yaml(): # If old configs were legacy.
|
||||
|
||||
def _update_based_on_node_config(node_type: NodeType) -> None:
|
||||
if self.node_types[node_type][
|
||||
"node_config"] == new_node_types[node_type][
|
||||
"node_config"]: # If node config didnt change.
|
||||
if self.node_types[node_type]["resources"]:
|
||||
# If we already know the resources, do not
|
||||
# overwrite them. This helps also if in legacy
|
||||
# yamls the user provides "resources" field.
|
||||
del new_node_types[node_type]["resources"]
|
||||
self.node_types[node_type].update(
|
||||
new_node_types[node_type])
|
||||
else:
|
||||
self.node_types[node_type] = new_node_types[node_type]
|
||||
|
||||
_update_based_on_node_config(NODE_TYPE_LEGACY_HEAD)
|
||||
_update_based_on_node_config(NODE_TYPE_LEGACY_WORKER)
|
||||
final_node_types = self.node_types
|
||||
|
||||
self.provider = provider
|
||||
self.node_types = copy.deepcopy(final_node_types)
|
||||
self.max_workers = max_workers
|
||||
self.upscaling_speed = upscaling_speed
|
||||
|
||||
def is_legacy_yaml(self,
|
||||
node_types: Dict[NodeType, NodeTypeConfigDict] = None
|
||||
) -> bool:
|
||||
"""Returns if the node types came from a legacy yaml.
|
||||
|
||||
A legacy yaml is one that was originally without available_node_types
|
||||
and was autofilled with available_node_types."""
|
||||
node_types = node_types or self.node_types
|
||||
return (NODE_TYPE_LEGACY_HEAD in node_types
|
||||
and NODE_TYPE_LEGACY_WORKER in node_types)
|
||||
|
||||
def get_nodes_to_launch(
|
||||
self,
|
||||
nodes: List[NodeID],
|
||||
pending_nodes: Dict[NodeType, int],
|
||||
launching_nodes: Dict[NodeType, int],
|
||||
resource_demands: List[ResourceDict],
|
||||
unused_resources_by_ip: Dict[NodeIP, ResourceDict],
|
||||
pending_placement_groups: List[PlacementGroupTableData],
|
||||
@@ -75,7 +126,7 @@ class ResourceDemandScheduler:
|
||||
|
||||
Args:
|
||||
nodes: List of existing nodes in the cluster.
|
||||
pending_nodes: Summary of node types currently being launched.
|
||||
launching_nodes: Summary of node types currently being launched.
|
||||
resource_demands: Vector of resource demands from the scheduler.
|
||||
unused_resources_by_ip: Mapping from ip to available resources.
|
||||
pending_placement_groups: Placement group demands.
|
||||
@@ -102,7 +153,7 @@ class ResourceDemandScheduler:
|
||||
else:
|
||||
resource_requests = []
|
||||
|
||||
if self.is_legacy_yaml:
|
||||
if self.is_legacy_yaml():
|
||||
# When using legacy yaml files we need to infer the head & worker
|
||||
# node resources from the static node resources from LoadMetrics.
|
||||
self._infer_legacy_node_resources_if_needed(max_resources_by_ip)
|
||||
@@ -110,7 +161,7 @@ class ResourceDemandScheduler:
|
||||
node_resources: List[ResourceDict]
|
||||
node_type_counts: Dict[NodeType, int]
|
||||
node_resources, node_type_counts = self.calculate_node_resources(
|
||||
nodes, pending_nodes, unused_resources_by_ip)
|
||||
nodes, launching_nodes, unused_resources_by_ip)
|
||||
|
||||
logger.info("Cluster resources: {}".format(node_resources))
|
||||
logger.info("Node counts: {}".format(node_type_counts))
|
||||
@@ -125,12 +176,12 @@ class ResourceDemandScheduler:
|
||||
placement_groups_to_resource_demands(pending_placement_groups)
|
||||
resource_demands.extend(placement_group_demand_vector)
|
||||
|
||||
if self.is_legacy_yaml and \
|
||||
if self.is_legacy_yaml() and \
|
||||
not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
|
||||
# Need to launch worker nodes to later infer their
|
||||
# resources.
|
||||
return self._legacy_worker_node_to_launch(
|
||||
nodes, pending_nodes, node_resources, resource_demands)
|
||||
nodes, launching_nodes, node_resources, resource_demands)
|
||||
placement_group_nodes_to_add, node_resources, node_type_counts = \
|
||||
self.reserve_and_allocate_spread(
|
||||
strict_spreads, node_resources, node_type_counts)
|
||||
@@ -164,13 +215,13 @@ class ResourceDemandScheduler:
|
||||
# Limit the number of concurrent launches
|
||||
total_nodes_to_add = self._get_concurrent_resource_demand_to_launch(
|
||||
total_nodes_to_add, unused_resources_by_ip.keys(), nodes,
|
||||
pending_nodes, nodes_to_add_based_on_requests)
|
||||
launching_nodes, nodes_to_add_based_on_requests)
|
||||
|
||||
logger.info("Node requests: {}".format(total_nodes_to_add))
|
||||
return total_nodes_to_add
|
||||
|
||||
def _legacy_worker_node_to_launch(
|
||||
self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int],
|
||||
self, nodes: List[NodeID], launching_nodes: Dict[NodeType, int],
|
||||
node_resources: List[ResourceDict],
|
||||
resource_demands: List[ResourceDict]) -> Dict[NodeType, int]:
|
||||
"""Get worker nodes to launch when resources missing in legacy yamls.
|
||||
@@ -179,22 +230,24 @@ class ResourceDemandScheduler:
|
||||
workers, it returns max(1, min_workers) worker nodes from which we
|
||||
later calculate the node resources.
|
||||
"""
|
||||
worker_nodes = self.provider.non_terminated_nodes(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
|
||||
if self.max_workers == 0:
|
||||
return {}
|
||||
elif pending_nodes or len(nodes) > 1:
|
||||
# If we are already launching a worker node.
|
||||
# If first worker node fails this will never launch more nodes.
|
||||
elif sum(launching_nodes.values()) + len(worker_nodes) > 0:
|
||||
# If we are already launching a worker node, wait for its resources
|
||||
# to be known.
|
||||
# TODO(ameer): Note that if first worker node fails this will never
|
||||
# launch any more nodes.
|
||||
return {}
|
||||
else:
|
||||
unfulfilled, _ = get_bin_pack_residual(node_resources,
|
||||
resource_demands)
|
||||
if self.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] > 0 or \
|
||||
unfulfilled:
|
||||
return {
|
||||
NODE_TYPE_LEGACY_WORKER: max(
|
||||
1, self.node_types[NODE_TYPE_LEGACY_WORKER][
|
||||
"min_workers"])
|
||||
}
|
||||
workers_to_add = min(
|
||||
self.node_types[NODE_TYPE_LEGACY_WORKER].get("min_workers", 0),
|
||||
self.node_types[NODE_TYPE_LEGACY_WORKER].get("max_workers", 0))
|
||||
if workers_to_add > 0 or unfulfilled:
|
||||
return {NODE_TYPE_LEGACY_WORKER: max(1, workers_to_add)}
|
||||
else:
|
||||
return {}
|
||||
|
||||
@@ -210,25 +263,29 @@ class ResourceDemandScheduler:
|
||||
"""
|
||||
# We fill the head node resources only once.
|
||||
if not self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"]:
|
||||
assert len(max_resources_by_ip) == 1 # Only the head node.
|
||||
self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = next(
|
||||
iter(max_resources_by_ip.values()))
|
||||
try:
|
||||
head_ip = self.provider.internal_ip(
|
||||
self.provider.non_terminated_nodes({
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
|
||||
})[0])
|
||||
self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = \
|
||||
copy.deepcopy(max_resources_by_ip[head_ip])
|
||||
except (IndexError, KeyError):
|
||||
logger.exception("Could not reach the head node.")
|
||||
# We fill the worker node resources only once.
|
||||
if not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
|
||||
if len(max_resources_by_ip) > 1:
|
||||
# Set the node_types here as we already launched a worker node
|
||||
# from which we directly get the node_resources.
|
||||
worker_nodes = self.provider.non_terminated_nodes(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
|
||||
worker_node_ips = [
|
||||
self.provider.internal_ip(node_id)
|
||||
for node_id in worker_nodes
|
||||
]
|
||||
for ip in worker_node_ips:
|
||||
if ip in max_resources_by_ip:
|
||||
self.node_types[NODE_TYPE_LEGACY_WORKER][
|
||||
"resources"] = max_resources_by_ip[ip]
|
||||
assert self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]
|
||||
# Set the node_types here in case we already launched a worker node
|
||||
# from which we can directly get the node_resources.
|
||||
worker_nodes = self.provider.non_terminated_nodes(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
|
||||
worker_node_ips = [
|
||||
self.provider.internal_ip(node_id) for node_id in worker_nodes
|
||||
]
|
||||
for ip in worker_node_ips:
|
||||
if ip in max_resources_by_ip:
|
||||
self.node_types[NODE_TYPE_LEGACY_WORKER][
|
||||
"resources"] = copy.deepcopy(max_resources_by_ip[ip])
|
||||
break
|
||||
|
||||
def _get_concurrent_resource_demand_to_launch(
|
||||
self,
|
||||
@@ -247,7 +304,8 @@ class ResourceDemandScheduler:
|
||||
1) Calculates the running nodes.
|
||||
2) Calculates the pending nodes and gets the launching nodes.
|
||||
3) Limits the total number of pending + currently-launching +
|
||||
to-be-launched nodes to max(5, frac * running_nodes[node_type]).
|
||||
to-be-launched nodes to:
|
||||
max(5, self.upscaling_speed * running_nodes[node_type]).
|
||||
|
||||
Args:
|
||||
to_launch: List of number of nodes to launch based on resource
|
||||
@@ -262,8 +320,6 @@ class ResourceDemandScheduler:
|
||||
Dict[NodeType, int]: Maximum number of nodes to launch for each
|
||||
node type.
|
||||
"""
|
||||
# TODO(ameer): Consider making frac configurable.
|
||||
frac = 1
|
||||
updated_nodes_to_launch = {}
|
||||
running_nodes, pending_nodes = \
|
||||
self._separate_running_and_pending_nodes(
|
||||
@@ -273,7 +329,8 @@ class ResourceDemandScheduler:
|
||||
# Enforce here max allowed pending nodes to be frac of total
|
||||
# running nodes.
|
||||
max_allowed_pending_nodes = max(
|
||||
5, int(frac * running_nodes[node_type]))
|
||||
UPSCALING_INITIAL_NUM_NODES,
|
||||
int(self.upscaling_speed * running_nodes[node_type]))
|
||||
total_pending_nodes = pending_launches_nodes.get(
|
||||
node_type, 0) + pending_nodes[node_type]
|
||||
|
||||
@@ -502,7 +559,8 @@ def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
|
||||
utilization_scores = []
|
||||
for node_type in node_types:
|
||||
if (existing_nodes.get(node_type, 0) + nodes_to_add.get(
|
||||
node_type, 0) >= node_types[node_type]["max_workers"]):
|
||||
node_type, 0) >= node_types[node_type].get(
|
||||
"max_workers", 0)):
|
||||
continue
|
||||
node_resources = node_types[node_type]["resources"]
|
||||
if strict_spread:
|
||||
|
||||
@@ -101,28 +101,29 @@ def prepare_config(config):
|
||||
|
||||
def rewrite_legacy_yaml_to_available_node_types(
|
||||
config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if "available_node_types" in config:
|
||||
return config
|
||||
else:
|
||||
|
||||
if "available_node_types" not in config:
|
||||
# TODO(ameer/ekl/alex): we can also rewrite here many other fields
|
||||
# that include initialization/setup/start commands and ImageId.
|
||||
logger.debug("Converting legacy cluster config to multi node types.")
|
||||
config["available_node_types"] = {
|
||||
NODE_TYPE_LEGACY_HEAD: {
|
||||
"node_config": config["head_node"],
|
||||
"resources": {},
|
||||
"resources": config["head_node"].get("resources") or {},
|
||||
"min_workers": 0,
|
||||
"max_workers": 0,
|
||||
},
|
||||
NODE_TYPE_LEGACY_WORKER: {
|
||||
"node_config": config["worker_nodes"],
|
||||
"resources": {},
|
||||
"min_workers": config["min_workers"],
|
||||
"max_workers": config["max_workers"],
|
||||
"resources": config["worker_nodes"].get("resources") or {},
|
||||
"min_workers": config.get("min_workers", 0),
|
||||
"max_workers": config.get("max_workers", 0),
|
||||
},
|
||||
}
|
||||
config["head_node_type"] = NODE_TYPE_LEGACY_HEAD
|
||||
config["worker_default_node_type"] = NODE_TYPE_LEGACY_WORKER
|
||||
return config
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
|
||||
@@ -9,28 +9,17 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
# Empty string means disabled.
|
||||
docker: {}
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This max value allowed is 1.0, which is the most conservative setting.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,12 +9,11 @@ min_workers: 2
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
@@ -9,15 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
@@ -37,13 +33,6 @@ docker:
|
||||
# worker_image: "rayproject/ray:latest-cpu"
|
||||
# worker_run_options: []
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This max value allowed is 1.0, which is the most conservative setting.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,15 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
@@ -31,13 +27,6 @@ docker:
|
||||
|
||||
# worker_image: "rayproject/ray:latest"
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -14,15 +14,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 0
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
@@ -40,13 +36,6 @@ docker:
|
||||
|
||||
# worker_image: "rayproject/ray:latest"
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This max value allowed is 1.0, which is the most conservative setting.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -3,6 +3,12 @@ cluster_name: multi_node_type
|
||||
min_workers: 1
|
||||
max_workers: 40
|
||||
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# Cloud-provider specific configuration.
|
||||
provider:
|
||||
type: aws
|
||||
@@ -62,8 +68,6 @@ head_node:
|
||||
worker_nodes:
|
||||
ImageId: latest_dlami
|
||||
|
||||
# Configure the cluster for very conservative auto-scaling otherwise.
|
||||
target_utilization_fraction: 1.0
|
||||
idle_timeout_minutes: 2
|
||||
|
||||
# How Ray will authenticate with newly launched nodes.
|
||||
|
||||
@@ -9,28 +9,17 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
# Empty string means disabled.
|
||||
docker: {}
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,15 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
@@ -37,13 +33,6 @@ docker:
|
||||
# worker_image: "rayproject/ray:latest-cpu"
|
||||
# worker_run_options: []
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,15 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
@@ -31,13 +27,6 @@ docker:
|
||||
|
||||
# worker_image: "rayproject/ray:latest"
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,15 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
@@ -35,13 +31,6 @@ docker:
|
||||
|
||||
# worker_image: "rayproject/ray:latest"
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,29 +9,17 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
# Empty string means disabled.
|
||||
docker: {}
|
||||
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,15 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
@@ -37,14 +33,6 @@ docker:
|
||||
# worker_image: "rayproject/ray:latest-cpu"
|
||||
# worker_run_options: []
|
||||
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,15 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
# and opens all the necessary ports to support the Ray cluster.
|
||||
@@ -32,14 +28,6 @@ docker:
|
||||
|
||||
# worker_image: "rayproject/ray:latest"
|
||||
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
|
||||
@@ -9,22 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
@@ -9,22 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
@@ -9,22 +9,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 1
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 1
|
||||
|
||||
@@ -1,25 +1,24 @@
|
||||
# An unique identifier for the head node and workers of this cluster.
|
||||
cluster_name: default
|
||||
|
||||
## NOTE: Typically for local clusters, min_workers == initial_workers == max_workers == len(worker_ips).
|
||||
## NOTE: Typically for local clusters, min_workers == max_workers == len(worker_ips).
|
||||
|
||||
# The minimum number of workers nodes to launch in addition to the head
|
||||
# node. This number should be >= 0.
|
||||
# Typically, min_workers == initial_workers == max_workers == len(worker_ips).
|
||||
# Typically, min_workers == max_workers == len(worker_ips).
|
||||
min_workers: 0
|
||||
# The initial number of worker nodes to launch in addition to the head node.
|
||||
# Typically, min_workers == initial_workers == max_workers == len(worker_ips).
|
||||
initial_workers: 0
|
||||
|
||||
# The maximum number of workers nodes to launch in addition to the head node.
|
||||
# This takes precedence over min_workers.
|
||||
# Typically, min_workers == initial_workers == max_workers == len(worker_ips).
|
||||
# Typically, min_workers == max_workers == len(worker_ips).
|
||||
max_workers: 0
|
||||
|
||||
# Autoscaling parameters.
|
||||
# Ignore this if min_workers == initial_workers == max_workers.
|
||||
autoscaling_mode: default
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
|
||||
@@ -4,7 +4,7 @@ max_workers: 0
|
||||
docker:
|
||||
image: ""
|
||||
container_name: ""
|
||||
target_utilization_fraction: 0.8
|
||||
upscaling_speed: 1.0
|
||||
idle_timeout_minutes: 5
|
||||
provider:
|
||||
type: local
|
||||
|
||||
@@ -1,25 +1,24 @@
|
||||
# An unique identifier for the head node and workers of this cluster.
|
||||
cluster_name: default
|
||||
|
||||
## NOTE: Typically for local clusters, min_workers == initial_workers == max_workers == len(worker_ips).
|
||||
## NOTE: Typically for local clusters, min_workers == max_workers == len(worker_ips).
|
||||
|
||||
# The minimum number of workers nodes to launch in addition to the head
|
||||
# node. This number should be >= 0.
|
||||
# Typically, min_workers == initial_workers == max_workers == len(worker_ips).
|
||||
# Typically, min_workers == max_workers == len(worker_ips).
|
||||
min_workers: 0
|
||||
# The initial number of worker nodes to launch in addition to the head node.
|
||||
# Typically, min_workers == initial_workers == max_workers == len(worker_ips).
|
||||
initial_workers: 0
|
||||
|
||||
# The maximum number of workers nodes to launch in addition to the head node.
|
||||
# This takes precedence over min_workers.
|
||||
# Typically, min_workers == initial_workers == max_workers == len(worker_ips).
|
||||
# Typically, min_workers == max_workers == len(worker_ips).
|
||||
max_workers: 0
|
||||
|
||||
# Autoscaling parameters.
|
||||
# Ignore this if min_workers == initial_workers == max_workers.
|
||||
autoscaling_mode: default
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
# This executes all commands on all nodes in the docker container,
|
||||
|
||||
@@ -49,6 +49,11 @@
|
||||
"minimum": 0,
|
||||
"maximum": 1
|
||||
},
|
||||
"upscaling_speed": {
|
||||
"description": "The autoscaler will scale up the cluster faster with higher upscaling speed. E.g., if the task requires adding more nodes then autoscaler will gradually scale up the cluster in chunks of upscaling_speed*currently_running_nodes. This number should be > 0.",
|
||||
"type": "number",
|
||||
"minimum": 0
|
||||
},
|
||||
"idle_timeout_minutes": {
|
||||
"description": "If a node is idle for this many minutes, it will be removed.",
|
||||
"type": "integer",
|
||||
|
||||
@@ -10,22 +10,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 2
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
@@ -10,22 +10,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 5
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
@@ -10,22 +10,11 @@ min_workers: 0
|
||||
# node. This takes precedence over min_workers.
|
||||
max_workers: 5
|
||||
|
||||
# The initial number of worker nodes to launch in addition to the head
|
||||
# node. When the cluster is first brought up (or when it is refreshed with a
|
||||
# subsequent `ray up`) this number of nodes will be started.
|
||||
initial_workers: 0
|
||||
|
||||
# Whether or not to autoscale aggressively. If this is enabled, if at any point
|
||||
# we would start more workers, we start at least enough to bring us to
|
||||
# initial_workers.
|
||||
autoscaling_mode: default
|
||||
|
||||
# The autoscaler will scale up the cluster to this target fraction of resource
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
target_utilization_fraction: 0.8
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
@@ -3,6 +3,12 @@ cluster_name: multi-node-type # name with 'a-z' and '-'
|
||||
min_workers: 1
|
||||
max_workers: 40
|
||||
|
||||
# The autoscaler will scale up the cluster faster with higher upscaling speed.
|
||||
# E.g., if the task requires adding more nodes then autoscaler will gradually
|
||||
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
|
||||
# This number should be > 0.
|
||||
upscaling_speed: 1.0
|
||||
|
||||
# Cloud-provider specific configuration.
|
||||
provider:
|
||||
type: staroid
|
||||
@@ -108,6 +114,4 @@ worker_default_node_type: cpu_4_spot
|
||||
# type configs given above.
|
||||
#worker_nodes:
|
||||
|
||||
# Configure the cluster for very conservative auto-scaling otherwise.
|
||||
target_utilization_fraction: 0.9
|
||||
idle_timeout_minutes: 5
|
||||
|
||||
+142
-170
@@ -18,10 +18,12 @@ from ray.autoscaler._private import commands
|
||||
from ray.autoscaler.sdk import get_docker_host_mount_location
|
||||
from ray.autoscaler._private.load_metrics import LoadMetrics
|
||||
from ray.autoscaler._private.autoscaler import StandardAutoscaler
|
||||
from ray.autoscaler._private.providers import (_NODE_PROVIDERS,
|
||||
_clear_provider_cache)
|
||||
from ray.autoscaler._private.providers import (
|
||||
_NODE_PROVIDERS, _clear_provider_cache, _DEFAULT_CONFIGS)
|
||||
from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \
|
||||
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE
|
||||
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE, \
|
||||
NODE_TYPE_LEGACY_HEAD, NODE_TYPE_LEGACY_WORKER, NODE_KIND_HEAD, \
|
||||
NODE_KIND_WORKER
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.test_utils import RayTestTimeoutException
|
||||
import pytest
|
||||
@@ -254,60 +256,6 @@ SMALL_CLUSTER = {
|
||||
|
||||
|
||||
class LoadMetricsTest(unittest.TestCase):
|
||||
def testUpdate(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
|
||||
assert lm.approx_workers_used() == 0.5
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
assert lm.approx_workers_used() == 1.0
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {})
|
||||
assert lm.approx_workers_used() == 2.0
|
||||
|
||||
def testLoadMessages(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 0.5)
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1})
|
||||
self.assertEqual(lm.approx_workers_used(), 1.0)
|
||||
|
||||
# Both nodes count as busy since there is a queue on one.
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 2}, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
|
||||
# No queue anymore, so we're back to exact accounting.
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 1.5)
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
|
||||
lm.update("3.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("4.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("5.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("6.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("7.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("8.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 8.0)
|
||||
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) # no queue anymore
|
||||
self.assertEqual(lm.approx_workers_used(), 4.5)
|
||||
|
||||
def testPruneByNodeIp(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {})
|
||||
lm.prune_active_ips({"1.1.1.1", "4.4.4.4"})
|
||||
assert lm.approx_workers_used() == 1.0
|
||||
|
||||
def testBottleneckResource(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
|
||||
assert lm.approx_workers_used() == 1.88
|
||||
|
||||
def testHeartbeat(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
|
||||
@@ -331,14 +279,13 @@ class LoadMetricsTest(unittest.TestCase):
|
||||
assert ("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU, "
|
||||
"1.05 GiB/1.05 GiB memory, "
|
||||
"1.05 GiB/2.1 GiB object_store_memory") in debug
|
||||
assert "NumNodesConnected: 3" in debug
|
||||
assert "NumNodesUsed: 2.88" in debug
|
||||
|
||||
|
||||
class AutoscalingTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
_NODE_PROVIDERS["mock"] = \
|
||||
lambda config: self.create_provider
|
||||
_DEFAULT_CONFIGS["mock"] = _DEFAULT_CONFIGS["local"]
|
||||
self.provider = None
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
|
||||
@@ -375,10 +322,13 @@ class AutoscalingTest(unittest.TestCase):
|
||||
assert self.provider
|
||||
return self.provider
|
||||
|
||||
def write_config(self, config):
|
||||
def write_config(self, config, call_prepare_config=True):
|
||||
new_config = copy.deepcopy(config)
|
||||
if call_prepare_config:
|
||||
new_config = prepare_config(new_config)
|
||||
path = os.path.join(self.tmpdir, "simple.yaml")
|
||||
with open(path, "w") as f:
|
||||
f.write(yaml.dump(config))
|
||||
f.write(yaml.dump(new_config))
|
||||
return path
|
||||
|
||||
def testAutoscalerConfigValidationFailNotFatal(self):
|
||||
@@ -386,7 +336,6 @@ class AutoscalingTest(unittest.TestCase):
|
||||
# First check that this config is actually invalid
|
||||
with pytest.raises(ValidationError):
|
||||
validate_config(invalid_config)
|
||||
|
||||
config_path = self.write_config(invalid_config)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
@@ -579,37 +528,6 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.update()
|
||||
self.waitForNodes(2)
|
||||
|
||||
def testManualAutoscaling(self):
|
||||
config = SMALL_CLUSTER.copy()
|
||||
config["min_workers"] = 0
|
||||
config["max_workers"] = 50
|
||||
cores_per_node = 2
|
||||
config["worker_nodes"] = {"Resources": {"CPU": cores_per_node}}
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
LoadMetrics(),
|
||||
max_launch_batch=5,
|
||||
max_concurrent_launches=5,
|
||||
max_failures=0,
|
||||
process_runner=runner,
|
||||
update_interval_s=0)
|
||||
assert len(self.provider.non_terminated_nodes({})) == 0
|
||||
autoscaler.update()
|
||||
self.waitForNodes(0)
|
||||
autoscaler.request_resources({"CPU": cores_per_node * 10})
|
||||
for _ in range(5): # Maximum launch batch is 5
|
||||
time.sleep(0.01)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(10)
|
||||
autoscaler.request_resources({"CPU": cores_per_node * 30})
|
||||
for _ in range(4): # Maximum launch batch is 5
|
||||
time.sleep(0.01)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(30)
|
||||
|
||||
def testTerminateOutdatedNodesGracefully(self):
|
||||
config = SMALL_CLUSTER.copy()
|
||||
config["min_workers"] = 5
|
||||
@@ -639,9 +557,10 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(SMALL_CLUSTER)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
LoadMetrics(),
|
||||
lm,
|
||||
max_launch_batch=5,
|
||||
max_concurrent_launches=5,
|
||||
max_failures=0,
|
||||
@@ -663,11 +582,17 @@ class AutoscalingTest(unittest.TestCase):
|
||||
new_config["max_workers"] = 10
|
||||
self.write_config(new_config)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(6)
|
||||
# Because one worker already started, the scheduler waits for its
|
||||
# resources to be updated before it launches the remaining min_workers.
|
||||
self.waitForNodes(1)
|
||||
worker_ip = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
|
||||
lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {})
|
||||
autoscaler.update()
|
||||
self.waitForNodes(10)
|
||||
|
||||
def testInitialWorkers(self):
|
||||
"""initial_workers is deprecated, this tests that it is ignored."""
|
||||
config = SMALL_CLUSTER.copy()
|
||||
config["min_workers"] = 0
|
||||
config["max_workers"] = 20
|
||||
@@ -685,10 +610,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
update_interval_s=0)
|
||||
self.waitForNodes(0)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(5) # expected due to batch sizes and concurrency
|
||||
autoscaler.update()
|
||||
self.waitForNodes(10)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(0)
|
||||
|
||||
def testAggressiveAutoscaling(self):
|
||||
config = SMALL_CLUSTER.copy()
|
||||
@@ -696,13 +618,16 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config["max_workers"] = 20
|
||||
config["initial_workers"] = 10
|
||||
config["idle_timeout_minutes"] = 0
|
||||
config["autoscaling_mode"] = "aggressive"
|
||||
config["upscaling_speed"] = config["max_workers"]
|
||||
config_path = self.write_config(config)
|
||||
|
||||
self.provider = MockProvider()
|
||||
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1)
|
||||
self.provider.create_node({}, {
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
|
||||
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
|
||||
}, 1)
|
||||
head_ip = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0]
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_HEAD}, )[0]
|
||||
runner = MockProcessRunner()
|
||||
|
||||
lm = LoadMetrics()
|
||||
@@ -718,32 +643,45 @@ class AutoscalingTest(unittest.TestCase):
|
||||
update_interval_s=0)
|
||||
|
||||
self.waitForNodes(1)
|
||||
lm.update(
|
||||
head_ip, {"CPU": 1}, {"CPU": 0}, {},
|
||||
waiting_bundles=[{
|
||||
"CPU": 1
|
||||
}] * 7,
|
||||
infeasible_bundles=[{
|
||||
"CPU": 1
|
||||
}] * 3)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(6) # expected due to batch sizes and concurrency
|
||||
self.waitForNodes(2) # launches a single node to get its resources
|
||||
worker_ip = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
|
||||
lm.update(
|
||||
worker_ip, {"CPU": 1}, {"CPU": 1}, {},
|
||||
waiting_bundles=[{
|
||||
"CPU": 1
|
||||
}] * 7,
|
||||
infeasible_bundles=[{
|
||||
"CPU": 1
|
||||
}] * 3)
|
||||
# Otherwise the worker is immediately terminated due to being idle.
|
||||
lm.last_used_time_by_ip[worker_ip] = time.time() + 5
|
||||
autoscaler.update()
|
||||
self.waitForNodes(11)
|
||||
|
||||
# Connect the head and workers to end the bringup phase
|
||||
addrs = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_KIND: "worker"}, )
|
||||
addrs += head_ip
|
||||
for addr in addrs:
|
||||
lm.update(addr, {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update(addr, {"CPU": 2}, {"CPU": 2}, {})
|
||||
assert autoscaler.bringup
|
||||
worker_ips = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )
|
||||
for ip in worker_ips:
|
||||
lm.last_used_time_by_ip[ip] = 0
|
||||
autoscaler.update()
|
||||
|
||||
assert not autoscaler.bringup
|
||||
autoscaler.update()
|
||||
self.waitForNodes(1)
|
||||
|
||||
# All of the nodes are down. Simulate some load on the head node
|
||||
lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {})
|
||||
|
||||
autoscaler.update()
|
||||
self.waitForNodes(6) # expected due to batch sizes and concurrency
|
||||
autoscaler.update()
|
||||
self.waitForNodes(11)
|
||||
self.waitForNodes(1) # only the head node
|
||||
# Make sure they don't get overwritten.
|
||||
assert autoscaler.resource_demand_scheduler.node_types[
|
||||
NODE_TYPE_LEGACY_HEAD]["resources"] == {
|
||||
"CPU": 1
|
||||
}
|
||||
assert autoscaler.resource_demand_scheduler.node_types[
|
||||
NODE_TYPE_LEGACY_WORKER]["resources"] == {
|
||||
"CPU": 1
|
||||
}
|
||||
|
||||
def testUnmanagedNodes(self):
|
||||
config = SMALL_CLUSTER.copy()
|
||||
@@ -786,7 +724,11 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.update()
|
||||
self.waitForNodes(2)
|
||||
# 1 CPU task cannot be scheduled.
|
||||
lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {"CPU": 1})
|
||||
lm.update(
|
||||
unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {},
|
||||
waiting_bundles=[{
|
||||
"CPU": 1
|
||||
}])
|
||||
autoscaler.update()
|
||||
self.waitForNodes(3)
|
||||
|
||||
@@ -868,7 +810,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.update()
|
||||
assert len(self.provider.non_terminated_nodes({})) == 1
|
||||
|
||||
def testDelayedLaunchWithFailure(self):
|
||||
def testDelayedLaunchWithMinWorkers(self):
|
||||
config = SMALL_CLUSTER.copy()
|
||||
config["min_workers"] = 10
|
||||
config["max_workers"] = 10
|
||||
@@ -892,33 +834,16 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.update()
|
||||
# Synchronization: wait for launchy thread to be blocked on rtc1
|
||||
waiters = rtc1._cond._waiters
|
||||
self.waitFor(lambda: len(waiters) == 1)
|
||||
assert autoscaler.pending_launches.value == 5
|
||||
self.waitFor(lambda: len(waiters) == 2)
|
||||
assert autoscaler.pending_launches.value == 10
|
||||
assert len(self.provider.non_terminated_nodes({})) == 0
|
||||
|
||||
# Call update() to launch a second wave of 3 nodes,
|
||||
# as 5 + 3 = 8 = max_concurrent_launches.
|
||||
# Make this wave complete immediately.
|
||||
rtc2 = threading.Event()
|
||||
self.provider.ready_to_create = rtc2
|
||||
rtc2.set()
|
||||
autoscaler.update()
|
||||
self.waitForNodes(3)
|
||||
assert autoscaler.pending_launches.value == 5
|
||||
|
||||
# The first wave of 5 will now tragically fail
|
||||
self.provider.fail_creates = True
|
||||
self.waitForNodes(0) # Nodes are not added on top of pending.
|
||||
rtc1.set()
|
||||
self.waitFor(lambda: autoscaler.pending_launches.value == 0)
|
||||
assert len(self.provider.non_terminated_nodes({})) == 3
|
||||
|
||||
# Retry the first wave, allowing it to succeed this time
|
||||
self.provider.fail_creates = False
|
||||
autoscaler.update()
|
||||
self.waitForNodes(8)
|
||||
assert len(self.provider.non_terminated_nodes({})) == 10
|
||||
self.waitForNodes(10)
|
||||
assert autoscaler.pending_launches.value == 0
|
||||
|
||||
# Final wave of 2 nodes
|
||||
autoscaler.update()
|
||||
self.waitForNodes(10)
|
||||
assert autoscaler.pending_launches.value == 0
|
||||
@@ -971,9 +896,10 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(SMALL_CLUSTER)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
LoadMetrics(),
|
||||
lm,
|
||||
max_launch_batch=10,
|
||||
max_concurrent_launches=10,
|
||||
process_runner=runner,
|
||||
@@ -983,7 +909,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
self.waitForNodes(2)
|
||||
|
||||
# Write a corrupted config
|
||||
self.write_config("asdf")
|
||||
self.write_config("asdf", call_prepare_config=False)
|
||||
for _ in range(10):
|
||||
autoscaler.update()
|
||||
time.sleep(0.1)
|
||||
@@ -995,6 +921,11 @@ class AutoscalingTest(unittest.TestCase):
|
||||
new_config["min_workers"] = 10
|
||||
new_config["max_workers"] = 10
|
||||
self.write_config(new_config)
|
||||
worker_ip = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
|
||||
# Because one worker already started, the scheduler waits for its
|
||||
# resources to be updated before it launches the remaining min_workers.
|
||||
lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {})
|
||||
autoscaler.update()
|
||||
self.waitForNodes(10)
|
||||
|
||||
@@ -1120,11 +1051,24 @@ class AutoscalingTest(unittest.TestCase):
|
||||
|
||||
# Scales up as nodes are reported as used
|
||||
local_ip = services.get_node_ip_address()
|
||||
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head
|
||||
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}) # worker 1
|
||||
lm.update(
|
||||
local_ip, {"CPU": 2}, {"CPU": 0}, {},
|
||||
waiting_bundles=2 * [{
|
||||
"CPU": 2
|
||||
}]) # head
|
||||
autoscaler.update()
|
||||
lm.update(
|
||||
"172.0.0.0", {"CPU": 2}, {"CPU": 0}, {},
|
||||
waiting_bundles=2 * [{
|
||||
"CPU": 2
|
||||
}])
|
||||
autoscaler.update()
|
||||
self.waitForNodes(3)
|
||||
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update(
|
||||
"172.0.0.1", {"CPU": 2}, {"CPU": 0}, {},
|
||||
waiting_bundles=3 * [{
|
||||
"CPU": 2
|
||||
}])
|
||||
autoscaler.update()
|
||||
self.waitForNodes(5)
|
||||
|
||||
@@ -1139,6 +1083,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
lm.last_used_time_by_ip["172.0.0.0"] = 0
|
||||
lm.last_used_time_by_ip["172.0.0.1"] = 0
|
||||
autoscaler.update()
|
||||
|
||||
assert autoscaler.pending_launches.value == 0
|
||||
assert len(self.provider.non_terminated_nodes({})) == 3
|
||||
lm.last_used_time_by_ip["172.0.0.2"] = 0
|
||||
@@ -1147,11 +1092,11 @@ class AutoscalingTest(unittest.TestCase):
|
||||
assert autoscaler.pending_launches.value == 0
|
||||
assert len(self.provider.non_terminated_nodes({})) == 1
|
||||
|
||||
def testDontScaleBelowTarget(self):
|
||||
def testTargetUtilizationFraction(self):
|
||||
config = SMALL_CLUSTER.copy()
|
||||
config["min_workers"] = 0
|
||||
config["max_workers"] = 2
|
||||
config["target_utilization_fraction"] = 0.5
|
||||
config["max_workers"] = 20
|
||||
config["upscaling_speed"] = 10
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
lm = LoadMetrics()
|
||||
@@ -1166,25 +1111,49 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.update()
|
||||
assert autoscaler.pending_launches.value == 0
|
||||
assert len(self.provider.non_terminated_nodes({})) == 0
|
||||
|
||||
# Scales up as nodes are reported as used
|
||||
local_ip = services.get_node_ip_address()
|
||||
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head
|
||||
# 1.0 nodes used => target nodes = 2 => target workers = 1
|
||||
self.provider.create_node({}, {
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
|
||||
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
|
||||
}, 1)
|
||||
head_ip = self.provider.non_terminated_node_ips({})[0]
|
||||
lm.local_ip = head_ip
|
||||
lm.update(
|
||||
head_ip, {"CPU": 2}, {"CPU": 1}, {}, waiting_bundles=[{
|
||||
"CPU": 1
|
||||
}]) # head
|
||||
# The headnode should be sufficient for now
|
||||
autoscaler.update()
|
||||
self.waitForNodes(1)
|
||||
|
||||
# Make new node idle, and never used.
|
||||
# Should hold steady as target is still 2.
|
||||
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {})
|
||||
lm.last_used_time_by_ip["172.0.0.0"] = 0
|
||||
# Requires 1 more worker as the head node is fully used.
|
||||
lm.update(
|
||||
head_ip, {"CPU": 2}, {"CPU": 0}, {}, waiting_bundles=[{
|
||||
"CPU": 1
|
||||
}])
|
||||
autoscaler.update()
|
||||
assert len(self.provider.non_terminated_nodes({})) == 1
|
||||
self.waitForNodes(2) # 1 worker is added to get its resources.
|
||||
worker_ip = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
|
||||
lm.update(
|
||||
worker_ip, {"CPU": 1}, {"CPU": 1}, {},
|
||||
waiting_bundles=[{
|
||||
"CPU": 1
|
||||
}] * 7,
|
||||
infeasible_bundles=[{
|
||||
"CPU": 1
|
||||
}] * 4)
|
||||
# Add another 10 workers (frac=1/0.1=10, 1 worker running, 10*1=10)
|
||||
# and bypass constraint of 5 due to target utiization fraction.
|
||||
autoscaler.update()
|
||||
self.waitForNodes(12)
|
||||
|
||||
# Reduce load on head => target nodes = 1 => target workers = 0
|
||||
lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {})
|
||||
worker_ips = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )
|
||||
for ip in worker_ips:
|
||||
lm.last_used_time_by_ip[ip] = 0
|
||||
autoscaler.update()
|
||||
assert len(self.provider.non_terminated_nodes({})) == 0
|
||||
self.waitForNodes(1) # only the head node
|
||||
assert len(self.provider.non_terminated_nodes({})) == 1
|
||||
|
||||
def testRecoverUnhealthyWorkers(self):
|
||||
config_path = self.write_config(SMALL_CLUSTER)
|
||||
@@ -1243,7 +1212,10 @@ class AutoscalingTest(unittest.TestCase):
|
||||
"type": "external",
|
||||
"module": "does-not-exist",
|
||||
}
|
||||
invalid_provider = self.write_config(config)
|
||||
with pytest.raises(ValueError):
|
||||
invalid_provider = self.write_config(
|
||||
config, call_prepare_config=True)
|
||||
invalid_provider = self.write_config(config, call_prepare_config=False)
|
||||
with pytest.raises(ValueError):
|
||||
StandardAutoscaler(
|
||||
invalid_provider, LoadMetrics(), update_interval_s=0)
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
|
||||
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
|
||||
Checking AWS environment settings
|
||||
Fetched IP: .+
|
||||
ubuntu@ip-.+:~\$ exit
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
|
||||
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
|
||||
Checking AWS environment settings
|
||||
Fetched IP: .+
|
||||
This is a test!
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
|
||||
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
|
||||
Checking AWS environment settings
|
||||
Fetched IP: .+
|
||||
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
|
||||
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
|
||||
Checking AWS environment settings
|
||||
Fetched IP: .+
|
||||
This is a test!
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
Cluster: test-cli
|
||||
|
||||
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
|
||||
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
|
||||
Checking AWS environment settings
|
||||
AWS config
|
||||
IAM Profile: .+ \[default\]
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
.+\.py.*Cluster: test-cli
|
||||
.+\.py.*Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
|
||||
.+\.py.*Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
|
||||
.+\.py.*Checking AWS environment settings
|
||||
.+\.py.*Creating new IAM instance profile ray-autoscaler-v1 for use as the default\.
|
||||
.+\.py.*Creating new IAM role ray-autoscaler-v1 for use as the default instance role\.
|
||||
|
||||
@@ -78,23 +78,23 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
|
||||
monitor.process_messages()
|
||||
resource_usage = monitor.load_metrics._get_resource_usage()
|
||||
|
||||
if "memory" in resource_usage[0]:
|
||||
del resource_usage[0]["memory"]
|
||||
if "object_store_memory" in resource_usage[1]:
|
||||
del resource_usage[0]["object_store_memory"]
|
||||
if "memory" in resource_usage[1]:
|
||||
del resource_usage[1]["memory"]
|
||||
if "object_store_memory" in resource_usage[2]:
|
||||
if "object_store_memory" in resource_usage[1]:
|
||||
del resource_usage[1]["object_store_memory"]
|
||||
if "memory" in resource_usage[2]:
|
||||
del resource_usage[2]["memory"]
|
||||
if "object_store_memory" in resource_usage[2]:
|
||||
del resource_usage[2]["object_store_memory"]
|
||||
for key in list(resource_usage[0].keys()):
|
||||
if key.startswith("node:"):
|
||||
del resource_usage[0][key]
|
||||
for key in list(resource_usage[1].keys()):
|
||||
if key.startswith("node:"):
|
||||
del resource_usage[1][key]
|
||||
for key in list(resource_usage[2].keys()):
|
||||
if key.startswith("node:"):
|
||||
del resource_usage[2][key]
|
||||
|
||||
if expected_resource_usage is None:
|
||||
if all(x for x in resource_usage[1:]):
|
||||
if all(x for x in resource_usage[0:]):
|
||||
break
|
||||
elif all(x == y
|
||||
for x, y in zip(resource_usage, expected_resource_usage)):
|
||||
@@ -125,7 +125,7 @@ def test_heartbeats_single(ray_start_cluster_head):
|
||||
cluster = ray_start_cluster_head
|
||||
monitor = setup_monitor(cluster.address)
|
||||
total_cpus = ray.state.cluster_resources()["CPU"]
|
||||
verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": total_cpus}))
|
||||
verify_load_metrics(monitor, ({"CPU": 0.0}, {"CPU": total_cpus}))
|
||||
|
||||
@ray.remote
|
||||
def work(signal):
|
||||
@@ -139,11 +139,7 @@ def test_heartbeats_single(ray_start_cluster_head):
|
||||
signal = SignalActor.remote()
|
||||
|
||||
work_handle = work.remote(signal)
|
||||
verify_load_metrics(monitor, (1.0 / total_cpus, {
|
||||
"CPU": 1.0
|
||||
}, {
|
||||
"CPU": total_cpus
|
||||
}))
|
||||
verify_load_metrics(monitor, ({"CPU": 1.0}, {"CPU": total_cpus}))
|
||||
|
||||
ray.get(signal.send.remote())
|
||||
ray.get(work_handle)
|
||||
@@ -163,11 +159,7 @@ def test_heartbeats_single(ray_start_cluster_head):
|
||||
test_actor = Actor.remote()
|
||||
work_handle = test_actor.work.remote(signal)
|
||||
|
||||
verify_load_metrics(monitor, (1.0 / total_cpus, {
|
||||
"CPU": 1.0
|
||||
}, {
|
||||
"CPU": total_cpus
|
||||
}))
|
||||
verify_load_metrics(monitor, ({"CPU": 1.0}, {"CPU": total_cpus}))
|
||||
|
||||
ray.get(signal.send.remote())
|
||||
ray.get(work_handle)
|
||||
|
||||
@@ -1025,7 +1025,10 @@ class AutoscalingTest(unittest.TestCase):
|
||||
"empty_node")
|
||||
|
||||
def testScaleUpMinSanity(self):
|
||||
config_path = self.write_config(MULTI_WORKER_CLUSTER)
|
||||
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
|
||||
config["available_node_types"]["m4.large"]["min_workers"] = \
|
||||
config["min_workers"]
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
autoscaler = StandardAutoscaler(
|
||||
@@ -1109,11 +1112,9 @@ class AutoscalingTest(unittest.TestCase):
|
||||
|
||||
def testScaleUpMinWorkers(self):
|
||||
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
|
||||
config["min_workers"] = 2
|
||||
config["max_workers"] = 50
|
||||
config["idle_timeout_minutes"] = 1
|
||||
# Since config["min_workers"] > 1, the remaining worker is started
|
||||
# with the default worker node type.
|
||||
config["available_node_types"]["m4.large"]["min_workers"] = 1
|
||||
config["available_node_types"]["p2.8xlarge"]["min_workers"] = 1
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
@@ -1467,7 +1468,9 @@ class AutoscalingTest(unittest.TestCase):
|
||||
"p2x_image:nightly")
|
||||
|
||||
def testUpdateConfig(self):
|
||||
config = MULTI_WORKER_CLUSTER.copy()
|
||||
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
|
||||
config["available_node_types"]["m4.large"]["min_workers"] = \
|
||||
config["min_workers"]
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
@@ -1480,7 +1483,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
assert len(self.provider.non_terminated_nodes({})) == 0
|
||||
autoscaler.update()
|
||||
self.waitForNodes(2)
|
||||
config["min_workers"] = 0
|
||||
config["available_node_types"]["m4.large"]["min_workers"] = 0
|
||||
config["available_node_types"]["m4.large"]["node_config"][
|
||||
"field_changed"] = 1
|
||||
config_path = self.write_config(config)
|
||||
|
||||
Reference in New Issue
Block a user