diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index a4d8b3165..45d573a5f 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -216,7 +216,6 @@ class StandardAutoscaler: self.pending_launches.breakdown(), resource_demand_vector, self.load_metrics.get_resource_utilization())) - # TODO(ekl) also enforce max launch concurrency here? for node_type, count in to_launch.items(): self.launch_new_node(count, node_type=node_type) diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 751b18443..dafe2afe5 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -14,7 +14,8 @@ import collections from typing import List, Dict from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED +from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED, \ + STATUS_UPDATE_FAILED, STATUS_UP_TO_DATE, TAG_RAY_NODE_STATUS logger = logging.getLogger(__name__) @@ -88,9 +89,83 @@ class ResourceDemandScheduler: if nodes_to_add > 0: total_nodes_to_add[node_type] = nodes_to_add + # Limit the number of concurrent launches + total_nodes_to_add = self._get_concurrent_resource_demand_to_launch( + total_nodes_to_add, nodes, pending_nodes) + logger.info("Node requests: {}".format(total_nodes_to_add)) return total_nodes_to_add + def _get_concurrent_resource_demand_to_launch( + self, to_launch: Dict[NodeType, int], + non_terminated_nodes: List[NodeID], + pending_launches_nodes: Dict[NodeType, int] + ) -> Dict[NodeType, int]: + """Updates the max concurrent resources to launch for each node type. + + Given the current nodes that should be launched, the non terminated + nodes (running and pending) and the pending to be launched nodes. 
This + method calculates the maximum number of nodes to launch concurrently + for each node type as follows: + 1) Calculates the running nodes. + 2) Calculates the pending nodes and gets the launching nodes. + 3) Limits the total number of pending + currently-launching + + to-be-launched nodes to max(5, frac * running_nodes[node_type]). + + Args: + to_launch: Number of nodes to launch based on resource demand. + non_terminated_nodes: Non terminated nodes (pending/running). + pending_launches_nodes: Nodes that are in the launch queue. + Returns: + Dict[NodeType, int]: Maximum number of nodes to launch for each + node type. + """ + # TODO(ameer): Consider making frac configurable. + frac = 1 + updated_nodes_to_launch = {} + running_nodes, pending_nodes = \ + self._separate_running_and_pending_nodes( + non_terminated_nodes + ) + for node_type in to_launch: + # Enforce here max allowed pending nodes to be frac of total + # running nodes. + max_allowed_pending_nodes = max( + 5, int(frac * running_nodes[node_type])) + total_pending_nodes = pending_launches_nodes.get( + node_type, 0) + pending_nodes[node_type] + + # Allow more nodes if this is to respect min_workers constraint. 
+ nodes_to_add = max( + max_allowed_pending_nodes - total_pending_nodes, + self.node_types[node_type].get("min_workers", 0) - + total_pending_nodes - running_nodes[node_type]) + + if nodes_to_add > 0: + updated_nodes_to_launch[node_type] = min( + nodes_to_add, to_launch[node_type]) + + return updated_nodes_to_launch + + def _separate_running_and_pending_nodes( + self, + non_terminated_nodes: List[NodeID], + ) -> (Dict[NodeType, int], Dict[NodeType, int]): + """Receives non terminated nodes & splits them to pending & running.""" + + running_nodes = collections.defaultdict(int) + pending_nodes = collections.defaultdict(int) + for node_id in non_terminated_nodes: + tags = self.provider.node_tags(node_id) + if TAG_RAY_USER_NODE_TYPE in tags: + node_type = tags[TAG_RAY_USER_NODE_TYPE] + status = tags.get(TAG_RAY_NODE_STATUS) + if status == STATUS_UP_TO_DATE: + running_nodes[node_type] += 1 + elif status != STATUS_UPDATE_FAILED: + pending_nodes[node_type] += 1 + return running_nodes, pending_nodes + def calculate_node_resources( self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int], usage_by_ip: Dict[str, ResourceDict] diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 680421e87..33c822b84 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -18,7 +18,8 @@ from ray.autoscaler._private.resource_demand_scheduler import \ _utilization_score, _add_min_workers_nodes, \ get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \ - NODE_KIND_WORKER + NODE_KIND_WORKER, TAG_RAY_NODE_STATUS, \ + STATUS_UP_TO_DATE, STATUS_UNINITIALIZED from ray.test_utils import same_elements from time import sleep @@ -321,6 +322,161 @@ def test_calculate_node_resources(): assert to_launch == {"p2.8xlarge": 1} +def test_get_concurrent_resource_demand_to_launch(): + 
node_types = copy.deepcopy(TYPES_A) + node_types["p2.8xlarge"]["min_workers"] = 1 + node_types["p2.8xlarge"]["max_workers"] = 10 + node_types["m4.large"]["min_workers"] = 2 + node_types["m4.large"]["max_workers"] = 100 + provider = MockProvider() + scheduler = ResourceDemandScheduler(provider, node_types, 200) + # Sanity check. + assert len(provider.non_terminated_nodes({})) == 0 + + # Sanity check. + updated_to_launch = \ + scheduler._get_concurrent_resource_demand_to_launch({}, [], {}) + assert updated_to_launch == {} + + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED + }, 1) + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "m4.large", + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED + }, 2) + + # All nodes so far are pending/launching here. + to_launch = {"p2.8xlarge": 4, "m4.large": 40} + non_terminated_nodes = provider.non_terminated_nodes({}) + pending_launches_nodes = {"p2.8xlarge": 1, "m4.large": 1} + updated_to_launch = \ + scheduler._get_concurrent_resource_demand_to_launch( + to_launch, non_terminated_nodes, pending_launches_nodes) + # Note: we have 2 pending/launching gpus, 3 pending/launching cpus, + # 0 running gpu, and 0 running cpus. + assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2} + + # This starts the min workers only, so we have no more pending workers. + # The workers here are either running or in pending_launches_nodes, + # which is "launching". + for node_id in non_terminated_nodes: + provider.set_node_tags(node_id, + {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + updated_to_launch = \ + scheduler._get_concurrent_resource_demand_to_launch( + to_launch, non_terminated_nodes, pending_launches_nodes) + # Note that here we have 1 launching gpu, 1 launching cpu, + # 1 running gpu, and 2 running cpus. + assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4} + + # Launch the nodes. 
Note, after create_node the node is pending. + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED + }, 5) + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "m4.large", + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED + }, 5) + + # Continue scaling. + non_terminated_nodes = provider.non_terminated_nodes({}) + to_launch = {"m4.large": 36} # No more gpus are necessary + pending_launches_nodes = {} # No pending launches + updated_to_launch = \ + scheduler._get_concurrent_resource_demand_to_launch( + to_launch, non_terminated_nodes, pending_launches_nodes) + # Note: we have 5 pending cpus. So we are not allowed to start any. + # Still only 2 running cpus. + assert updated_to_launch == {} + + for node_id in non_terminated_nodes: + provider.set_node_tags(node_id, + {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + updated_to_launch = \ + scheduler._get_concurrent_resource_demand_to_launch( + to_launch, non_terminated_nodes, pending_launches_nodes) + # Note that here we have 7 running cpus and nothing pending/launching. + assert updated_to_launch == {"m4.large": 7} + + # Launch the nodes. Note, after create_node the node is pending. + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "m4.large", + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED + }, 7) + + # Continue scaling. + non_terminated_nodes = provider.non_terminated_nodes({}) + to_launch = {"m4.large": 29} + pending_launches_nodes = {"m4.large": 1} + updated_to_launch = \ + scheduler._get_concurrent_resource_demand_to_launch( + to_launch, non_terminated_nodes, pending_launches_nodes) + # Note: we have 8 pending/launching cpus and only 7 running. + # So we should not launch anything (8 > 7). 
+ assert updated_to_launch == {} + + for node_id in non_terminated_nodes: + provider.set_node_tags(node_id, + {TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) + updated_to_launch = \ + scheduler._get_concurrent_resource_demand_to_launch( + to_launch, non_terminated_nodes, pending_launches_nodes) + # Note that here we have 14 running cpus and 1 launching. + assert updated_to_launch == {"m4.large": 13} + + +def test_get_nodes_to_launch_max_launch_concurrency(): + provider = MockProvider() + new_types = copy.deepcopy(TYPES_A) + new_types["p2.8xlarge"]["min_workers"] = 4 + new_types["p2.8xlarge"]["max_workers"] = 40 + + scheduler = ResourceDemandScheduler(provider, new_types, 30) + + to_launch = scheduler.get_nodes_to_launch([], {}, [], []) + # Respects min_workers despite concurrency limitation. + assert to_launch == {"p2.8xlarge": 4} + + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED + }, 1) + nodes = provider.non_terminated_nodes({}) + ips = provider.non_terminated_node_ips({}) + utilizations = {ip: {"GPU": 8} for ip in ips} + launching_nodes = {"p2.8xlarge": 1} + # requires 41 p2.8xls (currently 1 pending, 1 launching, 0 running) + demands = [{"GPU": 8}] * (len(utilizations) + 40) + to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands, + utilizations) + # Enforces max launch to 5 when < 5 running. 2 are pending/launching. 
+ assert to_launch == {"p2.8xlarge": 3} + + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE + }, 8) + nodes = provider.non_terminated_nodes({}) + ips = provider.non_terminated_node_ips({}) + utilizations = {ip: {"GPU": 8} for ip in ips} + launching_nodes = {"p2.8xlarge": 1} + # requires 17 p2.8xls (currently 1 pending, 1 launching, 8 running) + demands = [{"GPU": 8}] * (len(utilizations) + 15) + to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands, + utilizations) + # We are allowed to launch up to 8 more since 8 are running. + # We already have 2 pending/launching, so only 6 remain. + assert to_launch == {"p2.8xlarge": 6} + + class LoadMetricsTest(unittest.TestCase): def testResourceDemandVector(self): lm = LoadMetrics()