[autoscaler] fix the autoscaling bug for continuously launching failed nodes (#11714)

This commit is contained in:
Ameer Haj Ali
2020-10-30 23:12:06 +02:00
committed by GitHub
parent 8816d34541
commit 7aade469d0
2 changed files with 60 additions and 51 deletions
@@ -17,8 +17,7 @@ from typing import List, Dict
from ray.autoscaler.node_provider import NodeProvider
from ray.gcs_utils import PlacementGroupTableData
from ray.core.generated.common_pb2 import PlacementStrategy
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, \
STATUS_UPDATE_FAILED, STATUS_UP_TO_DATE, TAG_RAY_NODE_STATUS
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED
logger = logging.getLogger(__name__)
@@ -31,9 +30,12 @@ NodeTypeConfigDict = str
# e.g., {"GPU": 1}.
ResourceDict = Dict[str, Number]
# e.g., IP address of the node.
# e.g., "node-1".
NodeID = str
# e.g., "127.0.0.1".
NodeIP = str
class ResourceDemandScheduler:
def __init__(self, provider: NodeProvider,
@@ -46,7 +48,7 @@ class ResourceDemandScheduler:
def get_nodes_to_launch(
self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict],
usage_by_ip: Dict[str, ResourceDict],
usage_by_ip: Dict[NodeIP, ResourceDict],
pending_placement_groups: List[PlacementGroupTableData]
) -> Dict[NodeType, int]:
"""Given resource demands, return node types to add to the cluster.
@@ -111,14 +113,14 @@ class ResourceDemandScheduler:
# Limit the number of concurrent launches
total_nodes_to_add = self._get_concurrent_resource_demand_to_launch(
total_nodes_to_add, nodes, pending_nodes)
total_nodes_to_add, usage_by_ip.keys(), nodes, pending_nodes)
logger.info("Node requests: {}".format(total_nodes_to_add))
return total_nodes_to_add
def _get_concurrent_resource_demand_to_launch(
self, to_launch: Dict[NodeType, int],
non_terminated_nodes: List[NodeID],
connected_nodes: List[NodeIP], non_terminated_nodes: List[NodeID],
pending_launches_nodes: Dict[NodeType, int]
) -> Dict[NodeType, int]:
"""Updates the max concurrent resources to launch for each node type.
@@ -133,7 +135,9 @@ class ResourceDemandScheduler:
to-be-launched nodes to max(5, frac * running_nodes[node_type]).
Args:
to_launch: Number of nodes to launch based on resource demand.
to_launch: List of number of nodes to launch based on resource
demand for every node type.
connected_nodes: Running nodes (from LoadMetrics).
non_terminated_nodes: Non terminated nodes (pending/running).
pending_launches_nodes: Nodes that are in the launch queue.
Returns:
@@ -145,7 +149,7 @@ class ResourceDemandScheduler:
updated_nodes_to_launch = {}
running_nodes, pending_nodes = \
self._separate_running_and_pending_nodes(
non_terminated_nodes
non_terminated_nodes, connected_nodes,
)
for node_type in to_launch:
# Enforce here max allowed pending nodes to be frac of total
@@ -170,8 +174,9 @@ class ResourceDemandScheduler:
def _separate_running_and_pending_nodes(
self,
non_terminated_nodes: List[NodeID],
connected_nodes: List[NodeIP],
) -> (Dict[NodeType, int], Dict[NodeType, int]):
"""Receives non terminated nodes & splits them to pending & running."""
"""Splits connected and non terminated nodes to pending & running."""
running_nodes = collections.defaultdict(int)
pending_nodes = collections.defaultdict(int)
@@ -179,10 +184,10 @@ class ResourceDemandScheduler:
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
status = tags.get(TAG_RAY_NODE_STATUS)
if status == STATUS_UP_TO_DATE:
node_ip = self.provider.internal_ip(node_id)
if node_ip in connected_nodes:
running_nodes[node_type] += 1
elif status != STATUS_UPDATE_FAILED:
else:
pending_nodes[node_type] += 1
return running_nodes, pending_nodes
@@ -468,40 +468,39 @@ def test_get_concurrent_resource_demand_to_launch():
# Sanity check.
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch({}, [], {})
scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {})
assert updated_to_launch == {}
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
}, 1)
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "m4.large",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
}, 2)
# All nodes so far are pending/launching here.
to_launch = {"p2.8xlarge": 4, "m4.large": 40}
non_terminated_nodes = provider.non_terminated_nodes({})
pending_launches_nodes = {"p2.8xlarge": 1, "m4.large": 1}
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch(
to_launch, non_terminated_nodes, pending_launches_nodes)
connected_nodes = [] # All the non_terminated_nodes are not connected yet.
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
# Note: we have 2 pending/launching gpus, 3 pending/launching cpus,
# 0 running gpu, and 0 running cpus.
assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2}
# This starts the min workers only, so we have no more pending workers.
# The workers here are either running or in pending_launches_nodes,
# which is "launching".
for node_id in non_terminated_nodes:
provider.set_node_tags(node_id,
{TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch(
to_launch, non_terminated_nodes, pending_launches_nodes)
# The workers here are either running (connected) or in
# pending_launches_nodes (i.e., launching).
connected_nodes = [
provider.internal_ip(node_id) for node_id in non_terminated_nodes
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
# Note that here we have 1 launching gpu, 1 launching cpu,
# 1 running gpu, and 2 running cpus.
assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4}
@@ -510,31 +509,30 @@ def test_get_concurrent_resource_demand_to_launch():
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
}, 5)
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "m4.large",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
}, 5)
# Continue scaling.
non_terminated_nodes = provider.non_terminated_nodes({})
to_launch = {"m4.large": 36} # No more gpus are necessary
pending_launches_nodes = {} # No pending launches
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch(
to_launch, non_terminated_nodes, pending_launches_nodes)
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
# Note: we have 5 pending cpus. So we are not allowed to start any.
# Still only 2 running cpus.
assert updated_to_launch == {}
for node_id in non_terminated_nodes:
provider.set_node_tags(node_id,
{TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch(
to_launch, non_terminated_nodes, pending_launches_nodes)
# All the non_terminated_nodes are connected here.
connected_nodes = [
provider.internal_ip(node_id) for node_id in non_terminated_nodes
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
# Note: that here we have 7 running cpus and nothing pending/launching.
assert updated_to_launch == {"m4.large": 7}
@@ -542,26 +540,26 @@ def test_get_concurrent_resource_demand_to_launch():
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "m4.large",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
}, 7)
# Continue scaling.
non_terminated_nodes = provider.non_terminated_nodes({})
to_launch = {"m4.large": 29}
pending_launches_nodes = {"m4.large": 1}
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch(
to_launch, non_terminated_nodes, pending_launches_nodes)
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
# Note: we have 8 pending/launching cpus and only 7 running.
# So we should not launch anything (8 < 7).
assert updated_to_launch == {}
for node_id in non_terminated_nodes:
provider.set_node_tags(node_id,
{TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch(
to_launch, non_terminated_nodes, pending_launches_nodes)
# All the non_terminated_nodes are connected here.
connected_nodes = [
provider.internal_ip(node_id) for node_id in non_terminated_nodes
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
# Note: that here we have 14 running cpus and 1 launching.
assert updated_to_launch == {"m4.large": 13}
@@ -574,7 +572,7 @@ def test_get_nodes_to_launch_max_launch_concurrency():
scheduler = ResourceDemandScheduler(provider, new_types, 30)
to_launch = scheduler.get_nodes_to_launch([], {}, [], [], [])
to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [])
# Respects min_workers despite concurrency limitation.
assert to_launch == {"p2.8xlarge": 4}
@@ -583,7 +581,11 @@ def test_get_nodes_to_launch_max_launch_concurrency():
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
}, 1)
nodes = provider.non_terminated_nodes({})
ips = provider.non_terminated_node_ips({})
# Trying to force here that the node shows in nodes but not connected yet
# and hence does not show up in LoadMetrics (or utilizations).
ips = provider.non_terminated_node_ips({
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
})
utilizations = {ip: {"GPU": 8} for ip in ips}
launching_nodes = {"p2.8xlarge": 1}
# requires 41 p2.8xls (currently 1 pending, 1 launching, 0 running}
@@ -598,10 +600,12 @@ def test_get_nodes_to_launch_max_launch_concurrency():
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 8)
nodes = provider.non_terminated_nodes({})
ips = provider.non_terminated_node_ips({})
ips = provider.non_terminated_node_ips({
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
})
utilizations = {ip: {"GPU": 8} for ip in ips}
launching_nodes = {"p2.8xlarge": 1}
# requires 17 p2.8xls (currently 1 pending, 1 launching, 8 running}
# Requires additional 17 p2.8xls (now 1 pending, 1 launching, 8 running}
demands = [{"GPU": 8}] * (len(utilizations) + 15)
to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands,
utilizations, [])