diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index bcf2bfe36..344bc2a42 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -19,6 +19,7 @@ from ray.serve.exceptions import RayServeException from ray.experimental import metrics from ray.serve.config import BackendConfig from ray.serve.router import Query +from ray.serve.constants import DEFAULT_LATENCY_BUCKET_MS from ray.exceptions import RayTaskError logger = _get_logger() @@ -162,6 +163,8 @@ class RayServeWorker: self.batch_queue = BatchQueue(self.config.max_batch_size or 1, self.config.batch_wait_timeout) + self.num_ongoing_requests = 0 + self.request_counter = metrics.Count( "backend_request_counter", ("Number of queries that have been " "processed in this replica"), @@ -176,6 +179,25 @@ class RayServeWorker: "has been restarted due to failure."), "restarts", ["backend", "replica_tag"]) + self.queuing_latency_tracker = metrics.Histogram( + "backend_queuing_latency_ms", + ("The latency for queries waiting in the replica's queue " + "waiting to be processed or batched."), "ms", + DEFAULT_LATENCY_BUCKET_MS, ["backend", "replica_tag"]) + self.processing_latency_tracker = metrics.Histogram( + "backend_processing_latency_ms", + "The latency for queries to be processed", "ms", + DEFAULT_LATENCY_BUCKET_MS, + ["backend", "replica_tag", "batch_size"]) + self.num_queued_items = metrics.Gauge( + "replica_queued_queries", + "Current number of queries queued in the the backend replicas", + "requests", ["backend", "replica_tag"]) + self.num_processing_items = metrics.Gauge( + "replica_processing_queries", + "Current number of queries being processed", "requests", + ["backend", "replica_tag"]) + self.restart_counter.record(1, { "backend": self.backend_tag, "replica_tag": self.replica_tag @@ -221,6 +243,8 @@ class RayServeWorker: method_to_call = self.get_runner_method(request_item) args = args if self.has_positional_args(method_to_call) else [] method_to_call = ensure_async(method_to_call) + + start = time.time() try: result = await method_to_call(*args, **kwargs) self.request_counter.record(1, {"backend": self.backend_tag}) @@ -229,6 +253,12 @@ class RayServeWorker: self.error_counter.record(1, {"backend": self.backend_tag}) finally: self._reset_context() + self.processing_latency_tracker.record( + (time.time() - start) * 1000, { + "backend": self.backend_tag, + "replica": self.replica_tag, + "batch_size": "1" + }) return result @@ -261,6 +291,7 @@ class RayServeWorker: if self.has_positional_args(call_method): arg_list.append(FakeFlaskRequest()) + timing_start = time.time() try: # Check mixing of query context (unified context needed). if len(context_flags) != 1: @@ -302,12 +333,20 @@ class RayServeWorker: ".".format(batch_size, len(result_list))) raise RayServeException(error_message) self._reset_context() - return result_list except Exception as e: wrapped_exception = wrap_to_ray_error(e) self.error_counter.record(1, {"backend": self.backend_tag}) self._reset_context() - return [wrapped_exception for _ in range(batch_size)] + result_list = [wrapped_exception for _ in range(batch_size)] + + self.processing_latency_tracker.record( + (time.time() - timing_start) * 1000, { + "backend": self.backend_tag, + "replica_tag": self.replica_tag, + "batch_size": str(batch_size) + }) + + return result_list async def main_loop(self) -> None: while True: @@ -316,6 +355,23 @@ class RayServeWorker: # updated until after the current iteration. batch = await self.batch_queue.wait_for_batch() + # Record metrics + self.num_queued_items.record(self.batch_queue.qsize(), { + "backend": self.backend_tag, + "replica_tag": self.replica_tag + }) + self.num_processing_items.record( + self.num_ongoing_requests - self.batch_queue.qsize(), { + "backend": self.backend_tag, + "replica_tag": self.replica_tag + }) + for query in batch: + queuing_time = (time.time() - query.tick_enter_replica) * 1000 + self.queuing_latency_tracker.record(queuing_time, { + "backend": self.backend_tag, + "replica_tag": self.replica_tag + }) + all_evaluated_futures = [] if not self.config.internal_metadata.accepts_batches: @@ -350,8 +406,15 @@ class RayServeWorker: request: Union[Query, bytes]) -> asyncio.Future: if isinstance(request, bytes): request = Query.ray_deserialize(request) + + request.tick_enter_replica = time.time() logger.debug("Worker {} got request {}".format(self.replica_tag, request)) request.async_future = asyncio.get_event_loop().create_future() + self.num_ongoing_requests += 1 + self.batch_queue.put(request) - return await request.async_future + result = await request.async_future + + self.num_ongoing_requests -= 1 + return result diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index ff950503c..1d42044ff 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -18,3 +18,19 @@ ASYNC_CONCURRENCY = int(1e6) #: Time to wait for HTTP proxy in `serve.init()` HTTP_PROXY_TIMEOUT = 60 + +#: Default histogram buckets for latency tracker +DEFAULT_LATENCY_BUCKET_MS = [ + 1, + 2, + 5, + 10, + 20, + 50, + 100, + 200, + 500, + 1000, + 2000, + 5000, +] diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index b058044c8..b9f957d16 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -28,6 +28,9 @@ class Query: metadata: RequestMetadata async_future: Optional[asyncio.Future] = None + tick_enter_router: Optional[float] = None + tick_enter_replica: Optional[float] = None + def __reduce__(self): return type(self).ray_deserialize, (self.ray_serialize(), ) @@ -132,6 +135,11 @@ class Router: ("Number of requests that errored when getting result " "from the backend."), "requests", ["backend"]) + self.backend_queue_size = metrics.Gauge( + "backend_queued_queries", + "Current number of queries queued in the router for a backend", + "requests", ["backend"]) + asyncio.get_event_loop().create_task(self.report_queue_lengths()) async def enqueue_request(self, request_meta, *request_args, @@ -327,9 +335,14 @@ class Router: async def report_queue_lengths(self): while True: + queue_lengths = { + backend: len(q) + for backend, q in self.backend_queues.items() + } self.controller.report_queue_lengths.remote( - self.name, { - backend: len(q) - for backend, q in self.backend_queues.items() - }) + self.name, queue_lengths) + + for backend, length in queue_lengths.items(): + self.backend_queue_size.record(length, {"backend": backend}) + await asyncio.sleep(REPORT_QUEUE_LENGTH_PERIOD_S) diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index 1a40f0652..7bf721bdf 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -11,7 +11,10 @@ if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False): @pytest.fixture(scope="session") def _shared_serve_instance(): - ray.init(num_cpus=36) + ray.init( + num_cpus=36, + _metrics_export_port=9999, + _system_config={"metrics_report_interval_ms": 1000}) serve.init() yield diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 803171c9e..06a39da1e 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -10,8 +10,9 @@ from ray import serve from ray.test_utils import wait_for_condition from ray.serve import constants from ray.serve.exceptions import RayServeException -from ray.serve.utils import format_actor_name, get_random_letters from ray.serve.config import BackendConfig +from ray.serve.utils import (block_until_http_ready, format_actor_name, + get_random_letters) def test_e2e(serve_instance): @@ -896,6 +897,54 @@ def test_shadow_traffic(serve_instance): wait_for_condition(check_requests) +def test_serve_metrics(serve_instance): + @serve.accept_batch + def batcher(flask_requests): + return ["hello"] * len(flask_requests) + + serve.create_backend("metrics", batcher) + serve.create_endpoint("metrics", backend="metrics", route="/metrics") + # send 10 concurrent requests + url = "http://127.0.0.1:8000/metrics" + ray.get([block_until_http_ready.remote(url) for _ in range(10)]) + + def verify_metrics(do_assert=False): + resp = requests.get("http://127.0.0.1:9999").text + + expected_metrics = [ + # counter + "num_router_requests_total", + "num_http_requests_total", + "backend_queued_queries_total", + "backend_request_counter_requests_total", + "backend_worker_starts_restarts_total", + # histogram + "backend_processing_latency_ms_bucket", + "backend_processing_latency_ms_count", + "backend_processing_latency_ms_sum", + "backend_queuing_latency_ms_bucket", + "backend_queuing_latency_ms_count", + "backend_queuing_latency_ms_sum", + # gauge + "replica_processing_queries", + "replica_queued_queries", + ] + for metric in expected_metrics: + # For the final error round + if do_assert: + assert metric in resp + # For the wait_for_condition + else: + if metric not in resp: + return False + return True + + try: + wait_for_condition(verify_metrics, retry_interval_ms=500) + except RuntimeError: + verify_metrics() + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", "-s", __file__]))