diff --git a/doc/source/cluster/autoscaling.rst b/doc/source/cluster/autoscaling.rst index 64117132c..0b80f0d8e 100644 --- a/doc/source/cluster/autoscaling.rst +++ b/doc/source/cluster/autoscaling.rst @@ -36,6 +36,21 @@ The basic autoscaling config settings are as follows: # considered idle if there are no tasks or actors running on it. idle_timeout_minutes: 5 + +Manually Adding Nodes without Resources (Unmanaged Nodes) +--------------------------------------------------------- + +In some cases, adding special nodes without any resources (i.e. `num_cpus=0`) may be desirable. Such nodes can be used as a driver which connects to the cluster to launch jobs. + +In order to manually add a node to an autoscaled cluster, the `ray-cluster-name` tag should be set and `ray-node-type` tag should be set to `unmanaged`. + +Unmanaged nodes **must have 0 resources**. + +If you are using the `available_node_types` field, you should create a custom node type with `resources: {}`, and `max_workers: 0` when configuring the autoscaler. + +The autoscaler will not attempt to start, stop, or update unmanaged nodes. The user is responsible for properly setting up and cleaning up unmanaged nodes. + + Multiple Node Type Autoscaling ------------------------------ diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index deec0386d..eb907176a 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -13,10 +13,11 @@ import yaml from ray.experimental.internal_kv import _internal_kv_put, \ _internal_kv_initialized from ray.autoscaler.node_provider import get_node_provider -from ray.autoscaler.tags import ( - TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, - TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, - TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER) +from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, + TAG_RAY_FILE_MOUNTS_CONTENTS, + TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, + TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, + NODE_KIND_WORKER, NODE_KIND_UNMANAGED) from ray.autoscaler.updater import NodeUpdaterThread from ray.autoscaler.node_launcher import NodeLauncher from ray.autoscaler.resource_demand_scheduler import ResourceDemandScheduler @@ -146,10 +147,12 @@ class StandardAutoscaler: nodes = self.workers() # Check pending nodes immediately after fetching the number of running # nodes to minimize chance number of pending nodes changing after - # additional nodes are launched. + # additional nodes (managed and unmanaged) are launched. num_pending = self.pending_launches.value - self.load_metrics.prune_active_ips( - [self.provider.internal_ip(node_id) for node_id in nodes]) + self.load_metrics.prune_active_ips([ + self.provider.internal_ip(node_id) + for node_id in self.all_workers() + ]) target_workers = self.target_num_workers() if len(nodes) >= target_workers: @@ -165,8 +168,9 @@ class StandardAutoscaler: nodes_to_terminate = [] for node_id in nodes: node_ip = self.provider.internal_ip(node_id) - if node_ip in last_used and last_used[node_ip] < horizon and \ - len(nodes) - len(nodes_to_terminate) > target_workers: + if (node_ip in last_used and last_used[node_ip] < horizon) and \ + (len(nodes) - len(nodes_to_terminate) + > target_workers): logger.info("StandardAutoscaler: " "{}: Terminating idle node".format(node_id)) nodes_to_terminate.append(node_id) @@ -182,11 +186,12 @@ class StandardAutoscaler: # Terminate nodes if there are too many nodes_to_terminate = [] - while len(nodes) > self.config["max_workers"]: + while (len(nodes) - + len(nodes_to_terminate)) > self.config["max_workers"] and nodes: + to_terminate = nodes.pop() logger.info("StandardAutoscaler: " - "{}: Terminating unneeded node".format(nodes[-1])) - nodes_to_terminate.append(nodes[-1]) - nodes = nodes[:-1] + "{}: Terminating unneeded node".format(to_terminate)) + nodes_to_terminate.append(to_terminate) if nodes_to_terminate: self.provider.terminate_nodes(nodes_to_terminate) @@ -511,10 +516,17 @@ class StandardAutoscaler: config = copy.deepcopy(self.config) self.launch_queue.put((config, count, node_type)) + def all_workers(self): + return self.workers() + self.unmanaged_workers() + def workers(self): return self.provider.non_terminated_nodes( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + def unmanaged_workers(self): + return self.provider.non_terminated_nodes( + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_UNMANAGED}) + def log_info_string(self, nodes, target): tmp = "Cluster status: " tmp += self.info_string(nodes, target) diff --git a/python/ray/autoscaler/resource_demand_scheduler.py b/python/ray/autoscaler/resource_demand_scheduler.py index faed8c478..00acd2f01 100644 --- a/python/ray/autoscaler/resource_demand_scheduler.py +++ b/python/ray/autoscaler/resource_demand_scheduler.py @@ -14,7 +14,7 @@ import collections from typing import List, Dict, Tuple from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE +from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED logger = logging.getLogger(__name__) @@ -90,9 +90,14 @@ class ResourceDemandScheduler: def add_node(node_type, available_resources=None): if node_type not in self.node_types: - raise RuntimeError("Missing entry for node_type {} in " - "available_node_types config: {}".format( - node_type, self.node_types)) + logger.warn( + f"Missing entry for node_type {node_type} in " + f"cluster config: {self.node_types} under entry " + f"available_node_types. This node's resources will be " + f"ignored. If you are using an unmanaged node, manually " + f"set the user_node_type tag to \"{NODE_KIND_UNMANAGED}\"" + f"in your cloud provider's management console.") + return None # Careful not to include the same dict object multiple times. available = copy.deepcopy(self.node_types[node_type]["resources"]) # If available_resources is None this might be because the node is diff --git a/python/ray/autoscaler/tags.py b/python/ray/autoscaler/tags.py index 468dadd1c..2326f54e3 100644 --- a/python/ray/autoscaler/tags.py +++ b/python/ray/autoscaler/tags.py @@ -8,6 +8,7 @@ TAG_RAY_NODE_NAME = "ray-node-name" TAG_RAY_NODE_KIND = "ray-node-type" NODE_KIND_HEAD = "head" NODE_KIND_WORKER = "worker" +NODE_KIND_UNMANAGED = "unmanaged" # Tag for user defined node types (e.g., m4xl_spot). This is used for multi # node type clusters. diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index daf21d738..ab19d070b 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -612,6 +612,50 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(11) + def testUnmanagedNodes(self): + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 20 + config["initial_workers"] = 0 + config["idle_timeout_minutes"] = 0 + config["autoscaling_mode"] = "aggressive" + config["target_utilization_fraction"] = 0.8 + config_path = self.write_config(config) + + self.provider = MockProvider() + self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1) + head_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0] + + self.provider.create_node({}, {TAG_RAY_NODE_KIND: "unmanaged"}, 1) + unmanaged_ip = self.provider.non_terminated_node_ips( + tag_filters={TAG_RAY_NODE_KIND: "unmanaged"}, )[0] + + runner = MockProcessRunner() + + lm = LoadMetrics() + lm.local_ip = head_ip + + autoscaler = StandardAutoscaler( + config_path, + lm, + max_launch_batch=5, + max_concurrent_launches=5, + max_failures=0, + process_runner=runner, + update_interval_s=0) + + autoscaler.update() + self.waitForNodes(2) + # This node has num_cpus=0 + lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {}) + autoscaler.update() + self.waitForNodes(2) + # 1 CPU task cannot be scheduled. + lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {"CPU": 1}) + autoscaler.update() + self.waitForNodes(3) + def testDelayedLaunch(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider()