From 7aade469d0d203bb913f77e4f2f20d8f956f582b Mon Sep 17 00:00:00 2001 From: Ameer Haj Ali Date: Fri, 30 Oct 2020 23:12:06 +0200 Subject: [PATCH] [autoscaler] fix the autoscaling bug for continuously launching failed nodes (#11714) --- .../_private/resource_demand_scheduler.py | 29 ++++--- .../tests/test_resource_demand_scheduler.py | 82 ++++++++++--------- 2 files changed, 60 insertions(+), 51 deletions(-) diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index a5edcd001..865b2e215 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -17,8 +17,7 @@ from typing import List, Dict from ray.autoscaler.node_provider import NodeProvider from ray.gcs_utils import PlacementGroupTableData from ray.core.generated.common_pb2 import PlacementStrategy -from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, \ - STATUS_UPDATE_FAILED, STATUS_UP_TO_DATE, TAG_RAY_NODE_STATUS +from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED logger = logging.getLogger(__name__) @@ -31,9 +30,12 @@ NodeTypeConfigDict = str # e.g., {"GPU": 1}. ResourceDict = Dict[str, Number] -# e.g., IP address of the node. +# e.g., "node-1". NodeID = str +# e.g., "127.0.0.1". +NodeIP = str + class ResourceDemandScheduler: def __init__(self, provider: NodeProvider, @@ -46,7 +48,7 @@ class ResourceDemandScheduler: def get_nodes_to_launch( self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int], resource_demands: List[ResourceDict], - usage_by_ip: Dict[str, ResourceDict], + usage_by_ip: Dict[NodeIP, ResourceDict], pending_placement_groups: List[PlacementGroupTableData] ) -> Dict[NodeType, int]: """Given resource demands, return node types to add to the cluster. 
@@ -111,14 +113,14 @@ class ResourceDemandScheduler: # Limit the number of concurrent launches total_nodes_to_add = self._get_concurrent_resource_demand_to_launch( - total_nodes_to_add, nodes, pending_nodes) + total_nodes_to_add, usage_by_ip.keys(), nodes, pending_nodes) logger.info("Node requests: {}".format(total_nodes_to_add)) return total_nodes_to_add def _get_concurrent_resource_demand_to_launch( self, to_launch: Dict[NodeType, int], - non_terminated_nodes: List[NodeID], + connected_nodes: List[NodeIP], non_terminated_nodes: List[NodeID], pending_launches_nodes: Dict[NodeType, int] ) -> Dict[NodeType, int]: """Updates the max concurrent resources to launch for each node type. @@ -133,7 +135,9 @@ class ResourceDemandScheduler: to-be-launched nodes to max(5, frac * running_nodes[node_type]). Args: - to_launch: Number of nodes to launch based on resource demand. + to_launch: Number of nodes to launch for each node type, based + on resource demand. + connected_nodes: Running nodes (from LoadMetrics). non_terminated_nodes: Non terminated nodes (pending/running). pending_launches_nodes: Nodes that are in the launch queue. 
Returns: @@ -145,7 +149,7 @@ class ResourceDemandScheduler: updated_nodes_to_launch = {} running_nodes, pending_nodes = \ self._separate_running_and_pending_nodes( - non_terminated_nodes + non_terminated_nodes, connected_nodes, ) for node_type in to_launch: # Enforce here max allowed pending nodes to be frac of total @@ -170,8 +174,9 @@ class ResourceDemandScheduler: def _separate_running_and_pending_nodes( self, non_terminated_nodes: List[NodeID], + connected_nodes: List[NodeIP], ) -> (Dict[NodeType, int], Dict[NodeType, int]): - """Receives non terminated nodes & splits them to pending & running.""" + """Splits connected and non terminated nodes to pending & running.""" running_nodes = collections.defaultdict(int) pending_nodes = collections.defaultdict(int) @@ -179,10 +184,10 @@ class ResourceDemandScheduler: tags = self.provider.node_tags(node_id) if TAG_RAY_USER_NODE_TYPE in tags: node_type = tags[TAG_RAY_USER_NODE_TYPE] - status = tags.get(TAG_RAY_NODE_STATUS) - if status == STATUS_UP_TO_DATE: + node_ip = self.provider.internal_ip(node_id) + if node_ip in connected_nodes: running_nodes[node_type] += 1 - elif status != STATUS_UPDATE_FAILED: + else: pending_nodes[node_type] += 1 return running_nodes, pending_nodes diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 9673fc817..8239e1280 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -468,40 +468,39 @@ def test_get_concurrent_resource_demand_to_launch(): # Sanity check. 
updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch({}, [], {}) + scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {}) assert updated_to_launch == {} provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", TAG_RAY_NODE_KIND: NODE_KIND_WORKER, - TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED }, 1) provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "m4.large", TAG_RAY_NODE_KIND: NODE_KIND_WORKER, - TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED }, 2) # All nodes so far are pending/launching here. to_launch = {"p2.8xlarge": 4, "m4.large": 40} non_terminated_nodes = provider.non_terminated_nodes({}) pending_launches_nodes = {"p2.8xlarge": 1, "m4.large": 1} - updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch( - to_launch, non_terminated_nodes, pending_launches_nodes) + connected_nodes = [] # All the non_terminated_nodes are not connected yet. + updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, connected_nodes, non_terminated_nodes, + pending_launches_nodes) # Note: we have 2 pending/launching gpus, 3 pending/launching cpus, # 0 running gpu, and 0 running cpus. assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2} # This starts the min workers only, so we have no more pending workers. - # The workers here are either running or in pending_launches_nodes, - # which is "launching". - for node_id in non_terminated_nodes: - provider.set_node_tags(node_id, - {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) - updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch( - to_launch, non_terminated_nodes, pending_launches_nodes) + # The workers here are either running (connected) or in + # pending_launches_nodes (i.e., launching). 
+ connected_nodes = [ + provider.internal_ip(node_id) for node_id in non_terminated_nodes + ] + updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, connected_nodes, non_terminated_nodes, + pending_launches_nodes) # Note that here we have 1 launching gpu, 1 launching cpu, # 1 running gpu, and 2 running cpus. assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4} @@ -510,31 +509,30 @@ def test_get_concurrent_resource_demand_to_launch(): provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", TAG_RAY_NODE_KIND: NODE_KIND_WORKER, - TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED }, 5) provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "m4.large", TAG_RAY_NODE_KIND: NODE_KIND_WORKER, - TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED }, 5) # Continue scaling. non_terminated_nodes = provider.non_terminated_nodes({}) to_launch = {"m4.large": 36} # No more gpus are necessary pending_launches_nodes = {} # No pending launches - updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch( - to_launch, non_terminated_nodes, pending_launches_nodes) + updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, connected_nodes, non_terminated_nodes, + pending_launches_nodes) # Note: we have 5 pending cpus. So we are not allowed to start any. # Still only 2 running cpus. assert updated_to_launch == {} - for node_id in non_terminated_nodes: - provider.set_node_tags(node_id, - {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) - updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch( - to_launch, non_terminated_nodes, pending_launches_nodes) + # All the non_terminated_nodes are connected here. 
+ connected_nodes = [ + provider.internal_ip(node_id) for node_id in non_terminated_nodes + ] + updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, connected_nodes, non_terminated_nodes, + pending_launches_nodes) # Note: that here we have 7 running cpus and nothing pending/launching. assert updated_to_launch == {"m4.large": 7} @@ -542,26 +540,26 @@ def test_get_concurrent_resource_demand_to_launch(): provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "m4.large", TAG_RAY_NODE_KIND: NODE_KIND_WORKER, - TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED }, 7) # Continue scaling. non_terminated_nodes = provider.non_terminated_nodes({}) to_launch = {"m4.large": 29} pending_launches_nodes = {"m4.large": 1} - updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch( - to_launch, non_terminated_nodes, pending_launches_nodes) + updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, connected_nodes, non_terminated_nodes, + pending_launches_nodes) # Note: we have 8 pending/launching cpus and only 7 running. # So we should not launch anything (8 < 7). assert updated_to_launch == {} - for node_id in non_terminated_nodes: - provider.set_node_tags(node_id, - {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) - updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch( - to_launch, non_terminated_nodes, pending_launches_nodes) + # All the non_terminated_nodes are connected here. + connected_nodes = [ + provider.internal_ip(node_id) for node_id in non_terminated_nodes + ] + updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( + to_launch, connected_nodes, non_terminated_nodes, + pending_launches_nodes) # Note: that here we have 14 running cpus and 1 launching. 
 assert updated_to_launch == {"m4.large": 13} @@ -574,7 +572,7 @@ def test_get_nodes_to_launch_max_launch_concurrency(): scheduler = ResourceDemandScheduler(provider, new_types, 30) - to_launch = scheduler.get_nodes_to_launch([], {}, [], [], []) + to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, []) # Respects min_workers despite concurrency limitation. assert to_launch == {"p2.8xlarge": 4} @@ -583,7 +581,11 @@ def test_get_nodes_to_launch_max_launch_concurrency(): TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED }, 1) nodes = provider.non_terminated_nodes({}) - ips = provider.non_terminated_node_ips({}) + # Trying to force here that the node shows in nodes but not connected yet + # and hence does not show up in LoadMetrics (or utilizations). + ips = provider.non_terminated_node_ips({ + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE + }) utilizations = {ip: {"GPU": 8} for ip in ips} launching_nodes = {"p2.8xlarge": 1} # requires 41 p2.8xls (currently 1 pending, 1 launching, 0 running} @@ -598,10 +600,12 @@ def test_get_nodes_to_launch_max_launch_concurrency(): TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE }, 8) nodes = provider.non_terminated_nodes({}) - ips = provider.non_terminated_node_ips({}) + ips = provider.non_terminated_node_ips({ + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE + }) utilizations = {ip: {"GPU": 8} for ip in ips} launching_nodes = {"p2.8xlarge": 1} - # requires 17 p2.8xls (currently 1 pending, 1 launching, 8 running} + # Requires additional 17 p2.8xls (now 1 pending, 1 launching, 8 running) demands = [{"GPU": 8}] * (len(utilizations) + 15) to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands, utilizations, [])