From 6d2af33a0110df64c4a6de61d97076529444ce83 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Wed, 26 Aug 2020 23:36:01 -0700 Subject: [PATCH] [Autoscaler] Proper resource demand plumbing (#10329) --- python/ray/autoscaler/autoscaler.py | 22 ++++---- python/ray/autoscaler/load_metrics.py | 15 +++++- python/ray/monitor.py | 23 +++++--- python/ray/test_utils.py | 19 +++++++ .../tests/test_resource_demand_scheduler.py | 54 ++++++++++++++++++- 5 files changed, 115 insertions(+), 18 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index d37d18408..9259274ec 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -117,7 +117,7 @@ class StandardAutoscaler: # Aggregate resources the user is requesting of the cluster. self.resource_requests = defaultdict(int) # List of resource bundles the user is requesting of the cluster. - self.resource_demand_vector = None + self.resource_demand_vector = [] logger.info("StandardAutoscaler: {}".format(self.config)) @@ -197,14 +197,18 @@ class StandardAutoscaler: self.log_info_string(nodes, target_workers) # First let the resource demand scheduler launch nodes, if enabled. - if self.resource_demand_scheduler and self.resource_demand_vector: - to_launch = (self.resource_demand_scheduler.get_nodes_to_launch( - self.provider.non_terminated_nodes(tag_filters={}), - self.pending_launches.breakdown(), - self.resource_demand_vector)) - # TODO(ekl) also enforce max launch concurrency here? - for node_type, count in to_launch: - self.launch_new_node(count, node_type=node_type) + if self.resource_demand_scheduler: + resource_demand_vector = self.resource_demand_vector + \ + self.load_metrics.get_resource_demand_vector() + if resource_demand_vector: + to_launch = ( + self.resource_demand_scheduler.get_nodes_to_launch( + self.provider.non_terminated_nodes(tag_filters={}), + self.pending_launches.breakdown(), + resource_demand_vector)) + # TODO(ekl) also enforce max launch concurrency here? + for node_type, count in to_launch: + self.launch_new_node(count, node_type=node_type) # Launch additional nodes of the default type, if still needed. num_workers = len(nodes) + num_pending diff --git a/python/ray/autoscaler/load_metrics.py b/python/ray/autoscaler/load_metrics.py index cabbc88fe..3d7253c9d 100644 --- a/python/ray/autoscaler/load_metrics.py +++ b/python/ray/autoscaler/load_metrics.py @@ -23,8 +23,16 @@ class LoadMetrics: self.dynamic_resources_by_ip = {} self.resource_load_by_ip = {} self.local_ip = services.get_node_ip_address() + self.waiting_bundles = [] + self.infeasible_bundles = [] - def update(self, ip, static_resources, dynamic_resources, resource_load): + def update(self, + ip, + static_resources, + dynamic_resources, + resource_load, + waiting_bundles=[], + infeasible_bundles=[]): self.resource_load_by_ip[ip] = resource_load self.static_resources_by_ip[ip] = static_resources @@ -43,6 +51,8 @@ class LoadMetrics: static_resources != dynamic_resources: self.last_used_time_by_ip[ip] = now self.last_heartbeat_time_by_ip[ip] = now + self.waiting_bundles = waiting_bundles + self.infeasible_bundles = infeasible_bundles def mark_active(self, ip): assert ip is not None, "IP should be known at this time" @@ -127,6 +137,9 @@ class LoadMetrics: return nodes_used, resources_used, resources_total + def get_resource_demand_vector(self): + return self.waiting_bundles + self.infeasible_bundles + def info_string(self): return " - " + "\n - ".join( ["{}: {}".format(k, v) for k, v in sorted(self._info().items())]) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index b7d3a244c..7849cc1f7 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -88,7 +88,7 @@ class Monitor: """ self.primary_subscribe_client.psubscribe(pattern) - def handle_resource_demands(self, resource_load_by_shape): + def parse_resource_demands(self, resource_load_by_shape): """Handle the message.resource_load_by_shape protobuf for the demand based autoscaling. Catch and log all exceptions so this doesn't interfere with the utilization based autoscaler until we're confident @@ -98,17 +98,21 @@ class Monitor: resource_load_by_shape (pb2.gcs.ResourceLoad): The resource demands in protobuf form or None. """ + waiting_bundles, infeasible_bundles = [], [] try: if not self.autoscaler: return - bundles = [] for resource_demand_pb in list( resource_load_by_shape.resource_demands): request_shape = dict(resource_demand_pb.shape) - bundles.append(request_shape) - self.autoscaler.request_resources(bundles) + for _ in range(resource_demand_pb.num_ready_requests_queued): + waiting_bundles.append(request_shape) + for _ in range( + resource_demand_pb.num_infeasible_requests_queued): + infeasible_bundles.append(request_shape) except Exception as e: logger.exception(e) + return waiting_bundles, infeasible_bundles def xray_heartbeat_batch_handler(self, unused_channel, data): """Handle an xray heartbeat batch message from Redis.""" @@ -125,16 +129,19 @@ class Monitor: for resource in total_resources: available_resources.setdefault(resource, 0.0) + waiting_bundles, infeasible_bundles = \ + self.parse_resource_demands(message.resource_load_by_shape) + # Update the load metrics for this raylet. client_id = ray.utils.binary_to_hex(heartbeat_message.client_id) ip = self.raylet_id_to_ip_map.get(client_id) if ip: self.load_metrics.update(ip, total_resources, - available_resources, resource_load) + available_resources, resource_load, + waiting_bundles, infeasible_bundles) else: logger.warning( f"Monitor: could not find ip for client {client_id}") - self.handle_resource_demands(message.resource_load_by_shape) def xray_job_notification_handler(self, unused_channel, data): """Handle a notification that a job has been added or removed. @@ -153,7 +160,9 @@ class Monitor: binary_to_hex(job_id))) def autoscaler_resource_request_handler(self, _, data): - """Handle a notification of a resource request for the autoscaler. + """Handle a notification of a resource request for the autoscaler. This channel + and method are only used by the manual + `ray.autoscaler.commands.request_resources` api. Args: channel: unused diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index ebc9e36d0..c7f5bef2b 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -336,6 +336,25 @@ def dicts_equal(dict1, dict2, abs_tol=1e-4): return True +def same_elements(elems_a, elems_b): + """Checks if two iterables (such as lists) contain the same elements. Elements + do not have to be hashable (this allows us to compare sets of dicts for + example). This comparison is not necessarily efficient. + """ + a = list(elems_a) + b = list(elems_b) + + for x in a: + if x not in b: + return False + + for x in b: + if x not in a: + return False + + return True + + @ray.remote def _put(obj): return obj diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 8bc8f6153..fbc731654 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -15,6 +15,7 @@ from ray.autoscaler.commands import get_or_create_head_node from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND from ray.autoscaler.resource_demand_scheduler import _utilization_score, \ get_bin_pack_residual, get_nodes_for +from ray.test_utils import same_elements from time import sleep @@ -162,6 +163,24 @@ def test_get_nodes_respects_max_limit(): }] * 10) == [("m4.large", 2)] +class LoadMetricsTest(unittest.TestCase): + def testResourceDemandVector(self): + lm = LoadMetrics() + lm.update( + "1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}, + waiting_bundles=[{ + "GPU": 1 + }], + infeasible_bundles=[{ + "CPU": 16 + }]) + assert same_elements(lm.get_resource_demand_vector(), [{ + "CPU": 16 + }, { + "GPU": 1 + }]) + + class AutoscalingTest(unittest.TestCase): def setUp(self): NODE_PROVIDERS["mock"] = \ @@ -352,6 +371,40 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("172.0.0.1", "CPU: 32") runner.assert_has_call("172.0.0.1", "GPU: 8") + def testScaleUpLoadMetrics(self): + config = MULTI_WORKER_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 50 + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, + lm, + max_failures=0, + process_runner=runner, + update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 0 + autoscaler.update() + self.waitForNodes(0) + autoscaler.update() + lm.update( + "1.2.3.4", {}, {}, {}, + waiting_bundles=[{ + "GPU": 1 + }], + infeasible_bundles=[{ + "CPU": 16 + }]) + autoscaler.update() + self.waitForNodes(2) + nodes = { + self.provider.mock_nodes[0].node_type, + self.provider.mock_nodes[1].node_type + } + assert nodes == {"p2.xlarge", "m4.4xlarge"} + def testCommandPassing(self): t = "custom" config = MULTI_WORKER_CLUSTER.copy() @@ -384,7 +437,6 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(2) assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" autoscaler.request_resources([{"GPU": 1}] * 9) - # autoscaler.request_resources([{t: 1}]) autoscaler.update() self.waitForNodes(3) assert self.provider.mock_nodes[2].node_type == "p2.xlarge"