mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 11:21:15 +08:00
[Autoscaler] Resource demand vector (hearbeat -> autoscaler plumbing) (#10127)
This commit is contained in:
@@ -88,6 +88,28 @@ class Monitor:
|
||||
"""
|
||||
self.primary_subscribe_client.psubscribe(pattern)
|
||||
|
||||
def handle_resource_demands(self, resource_load_by_shape):
|
||||
"""Handle the message.resource_load_by_shape protobuf for the demand
|
||||
based autoscaling. Catch and log all exceptions so this doesn't
|
||||
interfere with the utilization based autoscaler until we're confident
|
||||
this is stable.
|
||||
|
||||
Args:
|
||||
resource_load_by_shape (pb2.gcs.ResourceLoad): The resource demands
|
||||
in protobuf form or None.
|
||||
"""
|
||||
try:
|
||||
if not self.autoscaler:
|
||||
return
|
||||
bundles = []
|
||||
for resource_demand_pb in list(
|
||||
resource_load_by_shape.resource_demands):
|
||||
request_shape = dict(resource_demand_pb.shape)
|
||||
bundles.append(request_shape)
|
||||
self.autoscaler.request_resources(bundles)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
|
||||
def xray_heartbeat_batch_handler(self, unused_channel, data):
|
||||
"""Handle an xray heartbeat batch message from Redis."""
|
||||
|
||||
@@ -113,6 +135,7 @@ class Monitor:
|
||||
logger.warning(
|
||||
"Monitor: "
|
||||
"could not find ip for client {}".format(client_id))
|
||||
self.handle_resource_demands(message.resource_load_by_shape)
|
||||
|
||||
def xray_job_notification_handler(self, unused_channel, data):
|
||||
"""Handle a notification that a job has been added or removed.
|
||||
|
||||
Reference in New Issue
Block a user