mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 01:00:10 +08:00
[autoscaler] Add documentation for multi node type autoscaling (#10405)
This commit is contained in:
@@ -42,7 +42,7 @@ docker:
|
||||
# usage. For example, if a cluster of 10 nodes is 100% busy and
|
||||
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
|
||||
# can be decreased to increase the aggressiveness of upscaling.
|
||||
# This value must be less than 1.0 for scaling to happen.
|
||||
# This max value allowed is 1.0, which is the most conservative setting.
|
||||
target_utilization_fraction: 0.8
|
||||
|
||||
# If a node is idle for this many minutes, it will be removed.
|
||||
|
||||
@@ -18,16 +18,11 @@ available_node_types:
|
||||
InstanceType: m4.xlarge
|
||||
resources: {"CPU": 4}
|
||||
max_workers: 5
|
||||
cpu_4_spot:
|
||||
node_config:
|
||||
InstanceType: m4.xlarge
|
||||
InstanceMarketOptions:
|
||||
MarketType: spot
|
||||
resources: {"CPU": 4}
|
||||
max_workers: 20
|
||||
cpu_16_ondemand:
|
||||
cpu_16_spot:
|
||||
node_config:
|
||||
InstanceType: m4.4xlarge
|
||||
InstanceMarketOptions:
|
||||
MarketType: spot
|
||||
resources: {"CPU": 16, "Custom1": 1}
|
||||
max_workers: 10
|
||||
gpu_1_ondemand:
|
||||
@@ -35,17 +30,21 @@ available_node_types:
|
||||
InstanceType: p2.xlarge
|
||||
resources: {"CPU": 4, "GPU": 1, "Custom2": 2}
|
||||
max_workers: 4
|
||||
worker_setup_commands:
|
||||
- pip install tensorflow-gpu # Example command.
|
||||
gpu_8_ondemand:
|
||||
node_config:
|
||||
InstanceType: p2.8xlarge
|
||||
resources: {"CPU": 32, "GPU": 8}
|
||||
max_workers: 2
|
||||
worker_setup_commands:
|
||||
- pip install tensorflow-gpu # Example command.
|
||||
|
||||
# Specify the node type of the head node (as configured above).
|
||||
head_node_type: cpu_4_ondemand
|
||||
|
||||
# Specify the default type of the worker node (as configured above).
|
||||
worker_default_node_type: cpu_4_spot
|
||||
worker_default_node_type: cpu_16_spot
|
||||
|
||||
# The default settings for the head node. This will be merged with the per-node
|
||||
# type configs given above.
|
||||
|
||||
@@ -1,3 +1,12 @@
|
||||
"""Implements multi-node-type autoscaling.
|
||||
|
||||
This file implements an autoscaling algorithm that is aware of multiple node
|
||||
types (e.g., example-multi-node-type.yaml). The Ray autoscaler will pass in
|
||||
a vector of resource shape demands, and the resource demand scheduler will
|
||||
return a list of node types that can satisfy the demands given constraints
|
||||
(i.e., reverse bin packing).
|
||||
"""
|
||||
|
||||
import copy
|
||||
import numpy as np
|
||||
import logging
|
||||
@@ -30,18 +39,43 @@ class ResourceDemandScheduler:
|
||||
self.node_types = node_types
|
||||
self.max_workers = max_workers
|
||||
|
||||
def debug_string(self, nodes: List[NodeID],
|
||||
pending_nodes: Dict[NodeID, int]) -> str:
|
||||
# TODO(ekl) take into account existing utilization of node resources. We
|
||||
# should subtract these from node resources prior to running bin packing.
|
||||
def get_nodes_to_launch(self, nodes: List[NodeID],
|
||||
pending_nodes: Dict[NodeType, int],
|
||||
resource_demands: List[ResourceDict]
|
||||
) -> List[Tuple[NodeType, int]]:
|
||||
"""Given resource demands, return node types to add to the cluster.
|
||||
|
||||
This method:
|
||||
(1) calculates the resources present in the cluster.
|
||||
(2) calculates the unfulfilled resource bundles.
|
||||
(3) calculates which nodes need to be launched to fulfill all
|
||||
the bundle requests, subject to max_worker constraints.
|
||||
|
||||
Args:
|
||||
nodes: List of existing nodes in the cluster.
|
||||
pending_nodes: Summary of node types currently being launched.
|
||||
resource_demands: Vector of resource demands from the scheduler.
|
||||
"""
|
||||
|
||||
if resource_demands is None:
|
||||
logger.info("No resource demands")
|
||||
return []
|
||||
|
||||
node_resources, node_type_counts = self.calculate_node_resources(
|
||||
nodes, pending_nodes)
|
||||
logger.info("Cluster resources: {}".format(node_resources))
|
||||
logger.info("Node counts: {}".format(node_type_counts))
|
||||
|
||||
out = "Worker node types:"
|
||||
for node_type, count in node_type_counts.items():
|
||||
out += "\n - {}: {}".format(node_type, count)
|
||||
if pending_nodes.get(node_type):
|
||||
out += " ({} pending)".format(pending_nodes[node_type])
|
||||
unfulfilled = get_bin_pack_residual(node_resources, resource_demands)
|
||||
logger.info("Resource demands: {}".format(resource_demands))
|
||||
logger.info("Unfulfilled demands: {}".format(unfulfilled))
|
||||
|
||||
return out
|
||||
nodes = get_nodes_for(self.node_types, node_type_counts,
|
||||
self.max_workers - len(nodes), unfulfilled)
|
||||
logger.info("Node requests: {}".format(nodes))
|
||||
return nodes
|
||||
|
||||
def calculate_node_resources(
|
||||
self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int]
|
||||
@@ -73,36 +107,18 @@ class ResourceDemandScheduler:
|
||||
|
||||
return node_resources, node_type_counts
|
||||
|
||||
def get_nodes_to_launch(self, nodes: List[NodeID],
|
||||
pending_nodes: Dict[NodeType, int],
|
||||
resource_demands: List[ResourceDict]
|
||||
) -> List[Tuple[NodeType, int]]:
|
||||
"""Get a list of node types that should be added to the cluster.
|
||||
|
||||
This method:
|
||||
(1) calculates the resources present in the cluster.
|
||||
(2) calculates the unfulfilled resource bundles.
|
||||
(3) calculates which nodes need to be launched to fulfill all
|
||||
the bundle requests, subject to max_worker constraints.
|
||||
"""
|
||||
|
||||
if resource_demands is None:
|
||||
logger.info("No resource demands")
|
||||
return []
|
||||
|
||||
def debug_string(self, nodes: List[NodeID],
|
||||
pending_nodes: Dict[NodeID, int]) -> str:
|
||||
node_resources, node_type_counts = self.calculate_node_resources(
|
||||
nodes, pending_nodes)
|
||||
logger.info("Cluster resources: {}".format(node_resources))
|
||||
logger.info("Node counts: {}".format(node_type_counts))
|
||||
|
||||
unfulfilled = get_bin_pack_residual(node_resources, resource_demands)
|
||||
logger.info("Resource demands: {}".format(resource_demands))
|
||||
logger.info("Unfulfilled demands: {}".format(unfulfilled))
|
||||
out = "Worker node types:"
|
||||
for node_type, count in node_type_counts.items():
|
||||
out += "\n - {}: {}".format(node_type, count)
|
||||
if pending_nodes.get(node_type):
|
||||
out += " ({} pending)".format(pending_nodes[node_type])
|
||||
|
||||
nodes = get_nodes_for(self.node_types, node_type_counts,
|
||||
self.max_workers - len(nodes), unfulfilled)
|
||||
logger.info("Node requests: {}".format(nodes))
|
||||
return nodes
|
||||
return out
|
||||
|
||||
|
||||
def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
|
||||
|
||||
Reference in New Issue
Block a user