diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 5b57010f6..2767c82b7 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -199,11 +199,15 @@ class StandardAutoscaler: self.resource_demand_scheduler.get_nodes_to_launch( self.provider.non_terminated_nodes(tag_filters={}), self.pending_launches.breakdown(), - resource_demand_vector)) + resource_demand_vector, + self.load_metrics.get_resource_utilization())) # TODO(ekl) also enforce max launch concurrency here? for node_type, count in to_launch: self.launch_new_node(count, node_type=node_type) + num_pending = self.pending_launches.value + nodes = self.workers() + # Launch additional nodes of the default type, if still needed. num_workers = len(nodes) + num_pending if num_workers < target_workers: @@ -509,7 +513,8 @@ class StandardAutoscaler: tmp += "\n" if self.resource_demand_scheduler: tmp += self.resource_demand_scheduler.debug_string( - nodes, self.pending_launches.breakdown()) + nodes, self.pending_launches.breakdown(), + self.load_metrics.get_resource_utilization()) if _internal_kv_initialized(): _internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True) logger.info(tmp) diff --git a/python/ray/autoscaler/load_metrics.py b/python/ray/autoscaler/load_metrics.py index 3d7253c9d..57399f303 100644 --- a/python/ray/autoscaler/load_metrics.py +++ b/python/ray/autoscaler/load_metrics.py @@ -16,13 +16,14 @@ class LoadMetrics: can be removed. """ - def __init__(self): + def __init__(self, local_ip=None): self.last_used_time_by_ip = {} self.last_heartbeat_time_by_ip = {} self.static_resources_by_ip = {} self.dynamic_resources_by_ip = {} self.resource_load_by_ip = {} - self.local_ip = services.get_node_ip_address() + self.local_ip = services.get_node_ip_address( + ) if local_ip is None else local_ip self.waiting_bundles = [] self.infeasible_bundles = [] @@ -98,7 +99,10 @@ class LoadMetrics: """ return self.static_resources_by_ip.values() - def get_resource_usage(self): + def get_resource_utilization(self): + return self.dynamic_resources_by_ip + + def _get_resource_usage(self): num_nodes = len(self.static_resources_by_ip) nodes_used = 0.0 num_nonidle = 0 @@ -145,7 +149,8 @@ class LoadMetrics: ["{}: {}".format(k, v) for k, v in sorted(self._info().items())]) def _info(self): - nodes_used, resources_used, resources_total = self.get_resource_usage() + nodes_used, resources_used, resources_total = self._get_resource_usage( + ) now = time.time() idle_times = [now - t for t in self.last_used_time_by_ip.values()] diff --git a/python/ray/autoscaler/resource_demand_scheduler.py b/python/ray/autoscaler/resource_demand_scheduler.py index f1b6baeba..1e39dca67 100644 --- a/python/ray/autoscaler/resource_demand_scheduler.py +++ b/python/ray/autoscaler/resource_demand_scheduler.py @@ -39,11 +39,10 @@ class ResourceDemandScheduler: self.node_types = node_types self.max_workers = max_workers - # TODO(ekl) take into account existing utilization of node resources. We - # should subtract these from node resources prior to running bin packing. def get_nodes_to_launch(self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int], - resource_demands: List[ResourceDict] + resource_demands: List[ResourceDict], + usage_by_ip: Dict[str, ResourceDict] ) -> List[Tuple[NodeType, int]]: """Given resource demands, return node types to add to the cluster. @@ -64,7 +63,8 @@ class ResourceDemandScheduler: return [] node_resources, node_type_counts = self.calculate_node_resources( - nodes, pending_nodes) + nodes, pending_nodes, usage_by_ip) + logger.info("Cluster resources: {}".format(node_resources)) logger.info("Node counts: {}".format(node_type_counts)) @@ -72,34 +72,43 @@ class ResourceDemandScheduler: logger.info("Resource demands: {}".format(resource_demands)) logger.info("Unfulfilled demands: {}".format(unfulfilled)) - nodes = get_nodes_for(self.node_types, node_type_counts, - self.max_workers - len(nodes), unfulfilled) + nodes = get_nodes_for( + self.node_types, node_type_counts, + self.max_workers - len(nodes) - sum(pending_nodes.values()), + unfulfilled) logger.info("Node requests: {}".format(nodes)) return nodes def calculate_node_resources( - self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int] + self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int], + usage_by_ip: Dict[str, ResourceDict] ) -> (List[ResourceDict], Dict[NodeType, int]): """Returns node resource list and node type counts.""" node_resources = [] node_type_counts = collections.defaultdict(int) - def add_node(node_type): + def add_node(node_type, existing_resource_usages=None): if node_type not in self.node_types: raise RuntimeError("Missing entry for node_type {} in " "available_node_types config: {}".format( node_type, self.node_types)) # Careful not to include the same dict object multiple times. - node_resources.append( - copy.deepcopy(self.node_types[node_type]["resources"])) + available = copy.deepcopy(self.node_types[node_type]["resources"]) + if existing_resource_usages: + for resource, used in existing_resource_usages.items(): + available[resource] -= used + + node_resources.append(available) node_type_counts[node_type] += 1 for node_id in nodes: tags = self.provider.node_tags(node_id) if TAG_RAY_USER_NODE_TYPE in tags: node_type = tags[TAG_RAY_USER_NODE_TYPE] - add_node(node_type) + ip = self.provider.internal_ip(node_id) + resources = usage_by_ip.get(ip, {}) + add_node(node_type, resources) for node_type, count in pending_nodes.items(): for _ in range(count): @@ -108,9 +117,11 @@ class ResourceDemandScheduler: return node_resources, node_type_counts def debug_string(self, nodes: List[NodeID], - pending_nodes: Dict[NodeID, int]) -> str: + pending_nodes: Dict[NodeID, int], + usage_by_ip: Dict[str, ResourceDict]) -> str: + print(f"{usage_by_ip}") node_resources, node_type_counts = self.calculate_node_resources( - nodes, pending_nodes) + nodes, pending_nodes, usage_by_ip) out = "Worker node types:" for node_type, count in node_type_counts.items(): diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 8f910e56c..af5310ea4 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -76,7 +76,7 @@ def setup_monitor(address): def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): while True: monitor.process_messages() - resource_usage = monitor.load_metrics.get_resource_usage() + resource_usage = monitor.load_metrics._get_resource_usage() if "memory" in resource_usage[1]: del resource_usage[1]["memory"] diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 985c45551..23c2aab21 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -14,7 +14,7 @@ from ray.autoscaler.node_provider import NODE_PROVIDERS from ray.autoscaler.commands import get_or_create_head_node from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND from ray.autoscaler.resource_demand_scheduler import _utilization_score, \ - get_bin_pack_residual, get_nodes_for + get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler from ray.test_utils import same_elements from time import sleep @@ -163,6 +163,23 @@ def test_get_nodes_respects_max_limit(): }] * 10) == [("m4.large", 2)] +def test_get_nodes_to_launch_limits(): + provider = MockProvider() + scheduler = ResourceDemandScheduler(provider, TYPES_A, 3) + + provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) + + nodes = provider.non_terminated_nodes({}) + + ips = provider.non_terminated_node_ips({}) + utilizations = {ip: {"GPU": 8} for ip in ips} + + to_launch = scheduler.get_nodes_to_launch(nodes, {"p2.8xlarge": 1}, [{ + "GPU": 8 + }] * 2, utilizations) + assert to_launch == [] + + class LoadMetricsTest(unittest.TestCase): def testResourceDemandVector(self): lm = LoadMetrics() @@ -260,6 +277,44 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) + def testScaleUpIgnoreUsed(self): + config = MULTI_WORKER_CLUSTER.copy() + # Commenting out this line causes the test case to fail?!?! + config["min_workers"] = 0 + config["target_utilization_fraction"] = 1.0 + config_path = self.write_config(config) + self.provider = MockProvider() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: "head", + TAG_RAY_USER_NODE_TYPE: "p2.xlarge" + }, 1) + head_ip = self.provider.non_terminated_node_ips({})[0] + self.provider.finish_starting_nodes() + runner = MockProcessRunner() + lm = LoadMetrics(local_ip=head_ip) + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + autoscaler.update() + self.waitForNodes(1) + lm.update(head_ip, {"CPU": 4, "GPU": 1}, {}, {}) + self.waitForNodes(1) + + lm.update( + head_ip, { + "CPU": 4, + "GPU": 1 + }, {"GPU": 1}, {}, + waiting_bundles=[{ + "GPU": 1 + }]) + autoscaler.update() + self.waitForNodes(2) + assert self.provider.mock_nodes[1].node_type == "p2.xlarge" + def testRequestBundlesAccountsForHeadNode(self): config = MULTI_WORKER_CLUSTER.copy() config["head_node_type"] = "p2.8xlarge"