From f6a1698baba4ed6f9bea9c1c4e8a987511468f2f Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 28 Aug 2020 19:57:21 -0700 Subject: [PATCH] [autoscaler] Add documentation for multi node type autoscaling (#10405) --- doc/source/cluster/autoscaling.rst | 116 ++++++++++++++++++ doc/source/cluster/launcher.rst | 9 -- doc/source/index.rst | 1 + python/ray/autoscaler/aws/example-full.yaml | 2 +- .../aws/example-multi-node-type.yaml | 17 ++- .../autoscaler/resource_demand_scheduler.py | 84 ++++++++----- 6 files changed, 176 insertions(+), 53 deletions(-) create mode 100644 doc/source/cluster/autoscaling.rst diff --git a/doc/source/cluster/autoscaling.rst b/doc/source/cluster/autoscaling.rst new file mode 100644 index 000000000..4113aa795 --- /dev/null +++ b/doc/source/cluster/autoscaling.rst @@ -0,0 +1,116 @@ +.. _ref-autoscaling: + +Cluster Autoscaling +=================== + +Basics +------ + +The Ray Cluster Launcher will automatically enable a load-based autoscaler. When cluster resource usage exceeds a configurable threshold (80% by default), new nodes will be launched up to the specified ``max_workers`` limit (specified in the cluster config). When nodes are idle for more than a timeout, they will be removed, down to the ``min_workers`` limit. The head node is never removed. + +In more detail, the autoscaler implements the following control loop: + + 1. It calculates the estimated utilization of the cluster based on the most-currently-assigned resource. For example, suppose a cluster has 100/200 CPUs assigned, but 20/25 GPUs assigned, then the utilization will be considered to be max(100/200, 15/25) = 60%. + 2. If the estimated utilization is greater than the target (80% by default), then the autoscaler will attempt to add nodes to the cluster. + 3. If a node is idle for a timeout (5 minutes by default), it is removed from the cluster. + +The basic autoscaling config settings are as follows: + +.. code:: + + # An unique identifier for the head node and workers of this cluster. + cluster_name: default + + # The minimum number of workers nodes to launch in addition to the head + # node. This number should be >= 0. + min_workers: 0 + + # The autoscaler will scale up the cluster to this target fraction of resource + # usage. For example, if a cluster of 10 nodes is 100% busy and + # target_utilization is 0.8, it would resize the cluster to 13. This fraction + # can be decreased to increase the aggressiveness of upscaling. + # The max value allowed is 1.0, which is the most conservative setting. + target_utilization_fraction: 0.8 + + # If a node is idle for this many minutes, it will be removed. A node is + # considered idle if there are no tasks or actors running on it. + idle_timeout_minutes: 5 + +Multiple Node Type Autoscaling +------------------------------ + +Ray supports multiple node types in a single cluster. In this mode of operation, the scheduler will look at the queue of resource shape demands from the cluster (e.g., there might be 10 tasks queued each requesting ``{"GPU": 4, "CPU": 16}``), and tries to add the minimum set of nodes that can fulfill these resource demands. This enables precise, rapid scale up compared to looking only at resource utilization, as the autoscaler also has visiblity into the aggregate resource load. + +The concept of a cluster node type encompasses both the physical instance type (e.g., AWS p3.8xl GPU nodes vs m4.16xl CPU nodes), as well as other attributes (e.g., IAM role, the machine image, etc). `Custom resources `__ can be specified for each node type so that Ray is aware of the demand for specific node types at the application level (e.g., a task may request to be placed on a machine with a specific role or machine image via custom resource). + +Multi-node type autoscaling operates in conjunction with the basic autoscaler. You may want to configure the basic autoscaler accordingly to act conservatively (i.e., set ``target_utilization_fraction: 1.0``). + +An example of configuring multiple node types is as follows `(full example) `__: + +.. code:: + + # Specify the allowed node types and the resources they provide. + # The key is the name of the node type, which is just for debugging purposes. + # The node config specifies the launch config and physical instance type. + available_node_types: + cpu_4_ondemand: + node_config: + InstanceType: m4.xlarge + resources: {"CPU": 4} + max_workers: 5 + cpu_16_spot: + node_config: + InstanceType: m4.4xlarge + InstanceMarketOptions: + MarketType: spot + resources: {"CPU": 16, "Custom1": 1, "is_spot": 1} + max_workers: 10 + gpu_1_ondemand: + node_config: + InstanceType: p2.xlarge + resources: {"CPU": 4, "GPU": 1, "Custom2": 2} + max_workers: 4 + worker_setup_commands: + - pip install tensorflow-gpu # Example command. + gpu_8_ondemand: + node_config: + InstanceType: p2.8xlarge + resources: {"CPU": 32, "GPU": 8} + max_workers: 2 + worker_setup_commands: + - pip install tensorflow-gpu # Example command. + + # Specify the node type of the head node (as configured above). + head_node_type: cpu_4_ondemand + + # Specify the default type of the worker node (as configured above). + worker_default_node_type: cpu_16_spot + + +The above config defines two CPU node types (``cpu_4_ondemand`` and ``cpu_16_spot``), and two GPU types (``gpu_1_ondemand`` and ``gpu_8_ondemand``). Each node type has a name (e.g., ``cpu_4_ondemand``), which has no semantic meaning and is only for debugging. Let's look at the inner fields of the ``gpu_1_ondemand`` node type: + +The node config tells the underlying Cloud provider how to launch a node of this type. This node config is merged with the top level node config of the YAML and can override fields (i.e., to specify the p2.xlarge instance type here): + +.. code:: + + node_config: + InstanceType: p2.xlarge + +The resources field tells the autoscaler what kinds of resources this node provides. This can include custom resources as well (e.g., "Custom2"). This field enables the autoscaler to automatically select the right kind of nodes to launch given the resource demands of the application. The resources specified here will be automatically passed to the ``ray start`` command for the node via an environment variable. For more information, see also the `resource demand scheduler `__: + +.. code:: + + resources: {"CPU": 4, "GPU": 1, "Custom2": 2} + +The ``max_workers`` field constrains the number of nodes of this type that can be launched: + +.. code:: + + max_workers: 4 + +The ``worker_setup_commands`` field can be used to override the setup and initialization commands for a node type. Note that you can only override the setup for worker nodes. The head node's setup commands are always configured via the top level field in the cluster YAML: + +.. code:: + + worker_setup_commands: + - pip install tensorflow-gpu # Example command. diff --git a/doc/source/cluster/launcher.rst b/doc/source/cluster/launcher.rst index a1f58102a..cde1eb305 100644 --- a/doc/source/cluster/launcher.rst +++ b/doc/source/cluster/launcher.rst @@ -213,15 +213,6 @@ This tells ``ray up`` to sync the current git branch SHA from your personal comp 2. Commit the changes with ``git commit`` and ``git push`` 3. Update files on your Ray cluster with ``ray up`` - -Autoscaling ------------ - -The Ray Cluster Launcher will automatically enable a load-based autoscaler. When cluster resource usage exceeds a configurable threshold (80% by default), new nodes will be launched up the specified ``max_workers`` limit (in the cluster config). When nodes are idle for more than a timeout, they will be removed, down to the ``min_workers`` limit. The head node is never removed. - -The default idle timeout is 5 minutes, which can be set in the cluster config. This is to prevent excessive node churn which could impact performance and increase costs (in AWS / GCP there is a minimum billing charge of 1 minute per instance, after which usage is billed by the second). - - Questions or Issues? -------------------- diff --git a/doc/source/index.rst b/doc/source/index.rst index fb47bce94..e25bf380d 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -141,6 +141,7 @@ Academic Papers cluster/index.rst cluster/launcher.rst + cluster/autoscaling.rst cluster/cloud.rst cluster/deploy.rst diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml index 345a9ccb6..89f815acb 100644 --- a/python/ray/autoscaler/aws/example-full.yaml +++ b/python/ray/autoscaler/aws/example-full.yaml @@ -42,7 +42,7 @@ docker: # usage. For example, if a cluster of 10 nodes is 100% busy and # target_utilization is 0.8, it would resize the cluster to 13. This fraction # can be decreased to increase the aggressiveness of upscaling. -# This value must be less than 1.0 for scaling to happen. +# This max value allowed is 1.0, which is the most conservative setting. target_utilization_fraction: 0.8 # If a node is idle for this many minutes, it will be removed. diff --git a/python/ray/autoscaler/aws/example-multi-node-type.yaml b/python/ray/autoscaler/aws/example-multi-node-type.yaml index a6f9a3118..ec3d5c37b 100644 --- a/python/ray/autoscaler/aws/example-multi-node-type.yaml +++ b/python/ray/autoscaler/aws/example-multi-node-type.yaml @@ -18,16 +18,11 @@ available_node_types: InstanceType: m4.xlarge resources: {"CPU": 4} max_workers: 5 - cpu_4_spot: - node_config: - InstanceType: m4.xlarge - InstanceMarketOptions: - MarketType: spot - resources: {"CPU": 4} - max_workers: 20 - cpu_16_ondemand: + cpu_16_spot: node_config: InstanceType: m4.4xlarge + InstanceMarketOptions: + MarketType: spot resources: {"CPU": 16, "Custom1": 1} max_workers: 10 gpu_1_ondemand: @@ -35,17 +30,21 @@ available_node_types: InstanceType: p2.xlarge resources: {"CPU": 4, "GPU": 1, "Custom2": 2} max_workers: 4 + worker_setup_commands: + - pip install tensorflow-gpu # Example command. gpu_8_ondemand: node_config: InstanceType: p2.8xlarge resources: {"CPU": 32, "GPU": 8} max_workers: 2 + worker_setup_commands: + - pip install tensorflow-gpu # Example command. # Specify the node type of the head node (as configured above). head_node_type: cpu_4_ondemand # Specify the default type of the worker node (as configured above). -worker_default_node_type: cpu_4_spot +worker_default_node_type: cpu_16_spot # The default settings for the head node. This will be merged with the per-node # type configs given above. diff --git a/python/ray/autoscaler/resource_demand_scheduler.py b/python/ray/autoscaler/resource_demand_scheduler.py index 908bb14ea..f1b6baeba 100644 --- a/python/ray/autoscaler/resource_demand_scheduler.py +++ b/python/ray/autoscaler/resource_demand_scheduler.py @@ -1,3 +1,12 @@ +"""Implements multi-node-type autoscaling. + +This file implements an autoscaling algorithm that is aware of multiple node +types (e.g., example-multi-node-type.yaml). The Ray autoscaler will pass in +a vector of resource shape demands, and the resource demand scheduler will +return a list of node types that can satisfy the demands given constraints +(i.e., reverse bin packing). +""" + import copy import numpy as np import logging @@ -30,18 +39,43 @@ class ResourceDemandScheduler: self.node_types = node_types self.max_workers = max_workers - def debug_string(self, nodes: List[NodeID], - pending_nodes: Dict[NodeID, int]) -> str: + # TODO(ekl) take into account existing utilization of node resources. We + # should subtract these from node resources prior to running bin packing. + def get_nodes_to_launch(self, nodes: List[NodeID], + pending_nodes: Dict[NodeType, int], + resource_demands: List[ResourceDict] + ) -> List[Tuple[NodeType, int]]: + """Given resource demands, return node types to add to the cluster. + + This method: + (1) calculates the resources present in the cluster. + (2) calculates the unfulfilled resource bundles. + (3) calculates which nodes need to be launched to fulfill all + the bundle requests, subject to max_worker constraints. + + Args: + nodes: List of existing nodes in the cluster. + pending_nodes: Summary of node types currently being launched. + resource_demands: Vector of resource demands from the scheduler. + """ + + if resource_demands is None: + logger.info("No resource demands") + return [] + node_resources, node_type_counts = self.calculate_node_resources( nodes, pending_nodes) + logger.info("Cluster resources: {}".format(node_resources)) + logger.info("Node counts: {}".format(node_type_counts)) - out = "Worker node types:" - for node_type, count in node_type_counts.items(): - out += "\n - {}: {}".format(node_type, count) - if pending_nodes.get(node_type): - out += " ({} pending)".format(pending_nodes[node_type]) + unfulfilled = get_bin_pack_residual(node_resources, resource_demands) + logger.info("Resource demands: {}".format(resource_demands)) + logger.info("Unfulfilled demands: {}".format(unfulfilled)) - return out + nodes = get_nodes_for(self.node_types, node_type_counts, + self.max_workers - len(nodes), unfulfilled) + logger.info("Node requests: {}".format(nodes)) + return nodes def calculate_node_resources( self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int] @@ -73,36 +107,18 @@ class ResourceDemandScheduler: return node_resources, node_type_counts - def get_nodes_to_launch(self, nodes: List[NodeID], - pending_nodes: Dict[NodeType, int], - resource_demands: List[ResourceDict] - ) -> List[Tuple[NodeType, int]]: - """Get a list of node types that should be added to the cluster. - - This method: - (1) calculates the resources present in the cluster. - (2) calculates the unfulfilled resource bundles. - (3) calculates which nodes need to be launched to fulfill all - the bundle requests, subject to max_worker constraints. - """ - - if resource_demands is None: - logger.info("No resource demands") - return [] - + def debug_string(self, nodes: List[NodeID], + pending_nodes: Dict[NodeID, int]) -> str: node_resources, node_type_counts = self.calculate_node_resources( nodes, pending_nodes) - logger.info("Cluster resources: {}".format(node_resources)) - logger.info("Node counts: {}".format(node_type_counts)) - unfulfilled = get_bin_pack_residual(node_resources, resource_demands) - logger.info("Resource demands: {}".format(resource_demands)) - logger.info("Unfulfilled demands: {}".format(unfulfilled)) + out = "Worker node types:" + for node_type, count in node_type_counts.items(): + out += "\n - {}: {}".format(node_type, count) + if pending_nodes.get(node_type): + out += " ({} pending)".format(pending_nodes[node_type]) - nodes = get_nodes_for(self.node_types, node_type_counts, - self.max_workers - len(nodes), unfulfilled) - logger.info("Node requests: {}".format(nodes)) - return nodes + return out def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],