diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py
index 18cd760f7..d5e30f698 100644
--- a/python/ray/gcs_utils.py
+++ b/python/ray/gcs_utils.py
@@ -15,6 +15,8 @@ from ray.core.generated.gcs_pb2 import (
     TablePrefix,
     TablePubsub,
     TaskTableData,
+    ResourceDemand,
+    ResourceLoad,
     ResourceMap,
     ResourceTableData,
     ObjectLocationInfo,
@@ -40,6 +42,8 @@ __all__ = [
     "TablePrefix",
     "TablePubsub",
     "TaskTableData",
+    "ResourceDemand",
+    "ResourceLoad",
     "ResourceMap",
     "ResourceTableData",
     "construct_error_message",
diff --git a/python/ray/monitor.py b/python/ray/monitor.py
index ed05ac913..0464f221d 100644
--- a/python/ray/monitor.py
+++ b/python/ray/monitor.py
@@ -18,6 +18,51 @@ import redis
 logger = logging.getLogger(__name__)
 
 
+def parse_resource_demands(resource_load_by_shape):
+    """Handle the message.resource_load_by_shape protobuf for the demand
+    based autoscaling. Catch and log all exceptions so this doesn't
+    interfere with the utilization based autoscaler until we're confident
+    this is stable. Worker queue backlogs are added to the appropriate
+    resource demand vector.
+
+    Args:
+        resource_load_by_shape (pb2.gcs.ResourceLoad): The resource demands
+            in protobuf form or None. None yields two empty lists.
+
+    Returns:
+        List[ResourceDict]: Waiting bundles (ready and feasible).
+        List[ResourceDict]: Infeasible bundles.
+    """
+    waiting_bundles, infeasible_bundles = [], []
+    # None is a documented input; treat it as "no demand" rather than
+    # letting it raise and be logged as a parsing failure below.
+    if resource_load_by_shape is None:
+        return waiting_bundles, infeasible_bundles
+    try:
+        for resource_demand_pb in resource_load_by_shape.resource_demands:
+            request_shape = dict(resource_demand_pb.shape)
+            num_ready = resource_demand_pb.num_ready_requests_queued
+            num_infeasible = resource_demand_pb.num_infeasible_requests_queued
+            backlog_size = resource_demand_pb.backlog_size
+            # Every queued request contributes one bundle of this shape.
+            waiting_bundles.extend([request_shape] * num_ready)
+            infeasible_bundles.extend([request_shape] * num_infeasible)
+
+            # Infeasible and ready states for tasks are (logically)
+            # mutually exclusive, so the backlog joins the infeasible list
+            # whenever any infeasible request is queued.
+            if num_infeasible > 0:
+                backlog_queue = infeasible_bundles
+            else:
+                backlog_queue = waiting_bundles
+            backlog_queue.extend([request_shape] * backlog_size)
+    except Exception:
+        # Deliberately broad: whatever was parsed before the failure is
+        # still returned instead of propagating the error to the caller.
+        logger.exception("Failed to parse resource demands.")
+    return waiting_bundles, infeasible_bundles
+
+
 class Monitor:
     """A monitor for Ray processes.
 
@@ -89,32 +134,6 @@ class Monitor:
         """
         self.primary_subscribe_client.psubscribe(pattern)
 
-    def parse_resource_demands(self, resource_load_by_shape):
-        """Handle the message.resource_load_by_shape protobuf for the demand
-        based autoscaling. Catch and log all exceptions so this doesn't
-        interfere with the utilization based autoscaler until we're confident
-        this is stable.
-
-        Args:
-            resource_load_by_shape (pb2.gcs.ResourceLoad): The resource demands
-                in protobuf form or None.
-        """
-        waiting_bundles, infeasible_bundles = [], []
-        try:
-            if self.autoscaler:
-                for resource_demand_pb in list(
-                        resource_load_by_shape.resource_demands):
-                    request_shape = dict(resource_demand_pb.shape)
-                    for _ in range(
-                            resource_demand_pb.num_ready_requests_queued):
-                        waiting_bundles.append(request_shape)
-                    for _ in range(
-                            resource_demand_pb.num_infeasible_requests_queued):
-                        infeasible_bundles.append(request_shape)
-        except Exception as e:
-            logger.exception(e)
-        return waiting_bundles, infeasible_bundles
-
     def xray_heartbeat_batch_handler(self, unused_channel, data):
         """Handle an xray heartbeat batch message from Redis."""
 
@@ -129,7 +148,7 @@ class Monitor:
             available_resources = dict(heartbeat_message.resources_available)
 
             waiting_bundles, infeasible_bundles = \
-                self.parse_resource_demands(message.resource_load_by_shape)
+                parse_resource_demands(message.resource_load_by_shape)
 
             # Update the load metrics for this raylet.
             client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD
index 306408c47..9fc69cfd2 100644
--- a/python/ray/tests/BUILD
+++ b/python/ray/tests/BUILD
@@ -87,6 +87,7 @@ py_test_module_list(
     "test_metrics_agent.py",
     "test_microbenchmarks.py",
     "test_mini.py",
+    "test_monitor.py",
     "test_node_manager.py",
     "test_numba.py",
     "test_queue.py",
diff --git a/python/ray/tests/test_monitor.py b/python/ray/tests/test_monitor.py
new file mode 100644
index 000000000..ac67ddcf2
--- /dev/null
+++ b/python/ray/tests/test_monitor.py
@@ -0,0 +1,45 @@
+import ray
+from ray.monitor import parse_resource_demands
+
+
+def test_parse_resource_demands():
+    resource_load_by_shape = ray.gcs_utils.ResourceLoad(resource_demands=[
+        ray.gcs_utils.ResourceDemand(
+            shape={"CPU": 1},
+            num_ready_requests_queued=1,
+            num_infeasible_requests_queued=0,
+            backlog_size=0),
+        ray.gcs_utils.ResourceDemand(
+            shape={"CPU": 2},
+            num_ready_requests_queued=1,
+            num_infeasible_requests_queued=0,
+            backlog_size=1),
+        ray.gcs_utils.ResourceDemand(
+            shape={"CPU": 3},
+            num_ready_requests_queued=0,
+            num_infeasible_requests_queued=1,
+            backlog_size=2),
+        ray.gcs_utils.ResourceDemand(
+            shape={"CPU": 4},
+            num_ready_requests_queued=1,
+            num_infeasible_requests_queued=1,
+            backlog_size=2),
+    ])
+
+    waiting, infeasible = \
+        parse_resource_demands(resource_load_by_shape)
+
+    assert waiting.count({"CPU": 1}) == 1
+    assert waiting.count({"CPU": 2}) == 2
+    assert infeasible.count({"CPU": 3}) == 3
+    # The {"CPU": 4} case here is inconsistent, but could happen. Since the
+    # heartbeats are eventually consistent, we won't worry about whether it's
+    # counted as infeasible or waiting, as long as it's accounted for and
+    # doesn't cause an error.
+    assert waiting.count({"CPU": 4}) + infeasible.count({"CPU": 4}) == 4
+    assert len(waiting + infeasible) == 10
+
+
+def test_parse_resource_demands_none():
+    # None is a documented input and must produce no bundles.
+    assert parse_resource_demands(None) == ([], [])
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 0a43c32d1..35219d944 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -289,7 +289,7 @@ RAY_CONFIG(int64_t, max_resource_shapes_per_load_report, 100)
 
 /// If true, the worker's queue backlog size will be propagated to the heartbeat batch
 /// data.
-RAY_CONFIG(bool, report_worker_backlog, true)
+RAY_CONFIG(bool, report_worker_backlog, false)
 
 /// The timeout for synchronous GCS requests in seconds.
 RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5)