[Autoscaler] Unmanaged nodes (#10513)

This commit is contained in:
Alex Wu
2020-09-13 11:58:47 -07:00
committed by GitHub
parent 8166d71bde
commit d0b73647b4
5 changed files with 94 additions and 17 deletions
+25 -13
View File
@@ -13,10 +13,11 @@ import yaml
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
from ray.autoscaler.node_provider import get_node_provider
from ray.autoscaler.tags import (
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER)
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS,
TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE,
NODE_KIND_WORKER, NODE_KIND_UNMANAGED)
from ray.autoscaler.updater import NodeUpdaterThread
from ray.autoscaler.node_launcher import NodeLauncher
from ray.autoscaler.resource_demand_scheduler import ResourceDemandScheduler
@@ -146,10 +147,12 @@ class StandardAutoscaler:
nodes = self.workers()
# Check pending nodes immediately after fetching the number of running
# nodes to minimize chance number of pending nodes changing after
# additional nodes are launched.
# additional nodes (managed and unmanaged) are launched.
num_pending = self.pending_launches.value
self.load_metrics.prune_active_ips(
[self.provider.internal_ip(node_id) for node_id in nodes])
self.load_metrics.prune_active_ips([
self.provider.internal_ip(node_id)
for node_id in self.all_workers()
])
target_workers = self.target_num_workers()
if len(nodes) >= target_workers:
@@ -165,8 +168,9 @@ class StandardAutoscaler:
nodes_to_terminate = []
for node_id in nodes:
node_ip = self.provider.internal_ip(node_id)
if node_ip in last_used and last_used[node_ip] < horizon and \
len(nodes) - len(nodes_to_terminate) > target_workers:
if (node_ip in last_used and last_used[node_ip] < horizon) and \
(len(nodes) - len(nodes_to_terminate)
> target_workers):
logger.info("StandardAutoscaler: "
"{}: Terminating idle node".format(node_id))
nodes_to_terminate.append(node_id)
@@ -182,11 +186,12 @@ class StandardAutoscaler:
# Terminate nodes if there are too many
nodes_to_terminate = []
while len(nodes) > self.config["max_workers"]:
while (len(nodes) -
len(nodes_to_terminate)) > self.config["max_workers"] and nodes:
to_terminate = nodes.pop()
logger.info("StandardAutoscaler: "
"{}: Terminating unneeded node".format(nodes[-1]))
nodes_to_terminate.append(nodes[-1])
nodes = nodes[:-1]
"{}: Terminating unneeded node".format(to_terminate))
nodes_to_terminate.append(to_terminate)
if nodes_to_terminate:
self.provider.terminate_nodes(nodes_to_terminate)
@@ -511,10 +516,17 @@ class StandardAutoscaler:
config = copy.deepcopy(self.config)
self.launch_queue.put((config, count, node_type))
def all_workers(self):
return self.workers() + self.unmanaged_workers()
def workers(self):
return self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def unmanaged_workers(self):
return self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED})
def log_info_string(self, nodes, target):
tmp = "Cluster status: "
tmp += self.info_string(nodes, target)
@@ -14,7 +14,7 @@ import collections
from typing import List, Dict, Tuple
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED
logger = logging.getLogger(__name__)
@@ -90,9 +90,14 @@ class ResourceDemandScheduler:
def add_node(node_type, available_resources=None):
if node_type not in self.node_types:
raise RuntimeError("Missing entry for node_type {} in "
"available_node_types config: {}".format(
node_type, self.node_types))
logger.warn(
f"Missing entry for node_type {node_type} in "
f"cluster config: {self.node_types} under entry "
f"available_node_types. This node's resources will be "
f"ignored. If you are using an unmanaged node, manually "
f"set the user_node_type tag to \"{NODE_KIND_UNMANAGED}\""
f"in your cloud provider's management console.")
return None
# Careful not to include the same dict object multiple times.
available = copy.deepcopy(self.node_types[node_type]["resources"])
# If available_resources is None this might be because the node is
+1
View File
@@ -8,6 +8,7 @@ TAG_RAY_NODE_NAME = "ray-node-name"
TAG_RAY_NODE_KIND = "ray-node-type"
NODE_KIND_HEAD = "head"
NODE_KIND_WORKER = "worker"
NODE_KIND_UNMANAGED = "unmanaged"
# Tag for user defined node types (e.g., m4xl_spot). This is used for multi
# node type clusters.
+44
View File
@@ -612,6 +612,50 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(11)
def testUnmanagedNodes(self):
config = SMALL_CLUSTER.copy()
config["min_workers"] = 0
config["max_workers"] = 20
config["initial_workers"] = 0
config["idle_timeout_minutes"] = 0
config["autoscaling_mode"] = "aggressive"
config["target_utilization_fraction"] = 0.8
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1)
head_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0]
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "unmanaged"}, 1)
unmanaged_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "unmanaged"}, )[0]
runner = MockProcessRunner()
lm = LoadMetrics()
lm.local_ip = head_ip
autoscaler = StandardAutoscaler(
config_path,
lm,
max_launch_batch=5,
max_concurrent_launches=5,
max_failures=0,
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(2)
# This node has num_cpus=0
lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {})
autoscaler.update()
self.waitForNodes(2)
# 1 CPU task cannot be scheduled.
lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {"CPU": 1})
autoscaler.update()
self.waitForNodes(3)
def testDelayedLaunch(self):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()