From 4b14bf85e4eb90e3a94d0d2038663e64bd4d8bc7 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Mon, 17 Aug 2020 13:57:15 -0700 Subject: [PATCH] [Autoscaler] Resource demand vector (hearbeat -> autoscaler plumbing) (#10127) --- python/ray/monitor.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index ab830bcb6..c26435d6f 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -88,6 +88,28 @@ class Monitor: """ self.primary_subscribe_client.psubscribe(pattern) + def handle_resource_demands(self, resource_load_by_shape): + """Handle the message.resource_load_by_shape protobuf for the demand + based autoscaling. Catch and log all exceptions so this doesn't + interfere with the utilization based autoscaler until we're confident + this is stable. + + Args: + resource_load_by_shape (pb2.gcs.ResourceLoad): The resource demands + in protobuf form or None. + """ + try: + if not self.autoscaler: + return + bundles = [] + for resource_demand_pb in list( + resource_load_by_shape.resource_demands): + request_shape = dict(resource_demand_pb.shape) + bundles.append(request_shape) + self.autoscaler.request_resources(bundles) + except Exception as e: + logger.exception(e) + def xray_heartbeat_batch_handler(self, unused_channel, data): """Handle an xray heartbeat batch message from Redis.""" @@ -113,6 +135,7 @@ class Monitor: logger.warning( "Monitor: " "could not find ip for client {}".format(client_id)) + self.handle_resource_demands(message.resource_load_by_shape) def xray_job_notification_handler(self, unused_channel, data): """Handle a notification that a job has been added or removed.