mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 02:47:10 +08:00
[Serve] Add Latency and Queue Size Metrics (#10535)
This commit is contained in:
@@ -19,6 +19,7 @@ from ray.serve.exceptions import RayServeException
|
||||
from ray.experimental import metrics
|
||||
from ray.serve.config import BackendConfig
|
||||
from ray.serve.router import Query
|
||||
from ray.serve.constants import DEFAULT_LATENCY_BUCKET_MS
|
||||
from ray.exceptions import RayTaskError
|
||||
|
||||
logger = _get_logger()
|
||||
@@ -162,6 +163,8 @@ class RayServeWorker:
|
||||
self.batch_queue = BatchQueue(self.config.max_batch_size or 1,
|
||||
self.config.batch_wait_timeout)
|
||||
|
||||
self.num_ongoing_requests = 0
|
||||
|
||||
self.request_counter = metrics.Count(
|
||||
"backend_request_counter", ("Number of queries that have been "
|
||||
"processed in this replica"),
|
||||
@@ -176,6 +179,25 @@ class RayServeWorker:
|
||||
"has been restarted due to failure."), "restarts",
|
||||
["backend", "replica_tag"])
|
||||
|
||||
self.queuing_latency_tracker = metrics.Histogram(
|
||||
"backend_queuing_latency_ms",
|
||||
("The latency for queries waiting in the replica's queue "
|
||||
"waiting to be processed or batched."), "ms",
|
||||
DEFAULT_LATENCY_BUCKET_MS, ["backend", "replica_tag"])
|
||||
self.processing_latency_tracker = metrics.Histogram(
|
||||
"backend_processing_latency_ms",
|
||||
"The latency for queries to be processed", "ms",
|
||||
DEFAULT_LATENCY_BUCKET_MS,
|
||||
["backend", "replica_tag", "batch_size"])
|
||||
self.num_queued_items = metrics.Gauge(
|
||||
"replica_queued_queries",
|
||||
"Current number of queries queued in the the backend replicas",
|
||||
"requests", ["backend", "replica_tag"])
|
||||
self.num_processing_items = metrics.Gauge(
|
||||
"replica_processing_queries",
|
||||
"Current number of queries being processed", "requests",
|
||||
["backend", "replica_tag"])
|
||||
|
||||
self.restart_counter.record(1, {
|
||||
"backend": self.backend_tag,
|
||||
"replica_tag": self.replica_tag
|
||||
@@ -221,6 +243,8 @@ class RayServeWorker:
|
||||
method_to_call = self.get_runner_method(request_item)
|
||||
args = args if self.has_positional_args(method_to_call) else []
|
||||
method_to_call = ensure_async(method_to_call)
|
||||
|
||||
start = time.time()
|
||||
try:
|
||||
result = await method_to_call(*args, **kwargs)
|
||||
self.request_counter.record(1, {"backend": self.backend_tag})
|
||||
@@ -229,6 +253,12 @@ class RayServeWorker:
|
||||
self.error_counter.record(1, {"backend": self.backend_tag})
|
||||
finally:
|
||||
self._reset_context()
|
||||
self.processing_latency_tracker.record(
|
||||
(time.time() - start) * 1000, {
|
||||
"backend": self.backend_tag,
|
||||
"replica": self.replica_tag,
|
||||
"batch_size": "1"
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
@@ -261,6 +291,7 @@ class RayServeWorker:
|
||||
if self.has_positional_args(call_method):
|
||||
arg_list.append(FakeFlaskRequest())
|
||||
|
||||
timing_start = time.time()
|
||||
try:
|
||||
# Check mixing of query context (unified context needed).
|
||||
if len(context_flags) != 1:
|
||||
@@ -302,12 +333,20 @@ class RayServeWorker:
|
||||
".".format(batch_size, len(result_list)))
|
||||
raise RayServeException(error_message)
|
||||
self._reset_context()
|
||||
return result_list
|
||||
except Exception as e:
|
||||
wrapped_exception = wrap_to_ray_error(e)
|
||||
self.error_counter.record(1, {"backend": self.backend_tag})
|
||||
self._reset_context()
|
||||
return [wrapped_exception for _ in range(batch_size)]
|
||||
result_list = [wrapped_exception for _ in range(batch_size)]
|
||||
|
||||
self.processing_latency_tracker.record(
|
||||
(time.time() - timing_start) * 1000, {
|
||||
"backend": self.backend_tag,
|
||||
"replica_tag": self.replica_tag,
|
||||
"batch_size": str(batch_size)
|
||||
})
|
||||
|
||||
return result_list
|
||||
|
||||
async def main_loop(self) -> None:
|
||||
while True:
|
||||
@@ -316,6 +355,23 @@ class RayServeWorker:
|
||||
# updated until after the current iteration.
|
||||
batch = await self.batch_queue.wait_for_batch()
|
||||
|
||||
# Record metrics
|
||||
self.num_queued_items.record(self.batch_queue.qsize(), {
|
||||
"backend": self.backend_tag,
|
||||
"replica_tag": self.replica_tag
|
||||
})
|
||||
self.num_processing_items.record(
|
||||
self.num_ongoing_requests - self.batch_queue.qsize(), {
|
||||
"backend": self.backend_tag,
|
||||
"replica_tag": self.replica_tag
|
||||
})
|
||||
for query in batch:
|
||||
queuing_time = (time.time() - query.tick_enter_replica) * 1000
|
||||
self.queuing_latency_tracker.record(queuing_time, {
|
||||
"backend": self.backend_tag,
|
||||
"replica_tag": self.replica_tag
|
||||
})
|
||||
|
||||
all_evaluated_futures = []
|
||||
|
||||
if not self.config.internal_metadata.accepts_batches:
|
||||
@@ -350,8 +406,15 @@ class RayServeWorker:
|
||||
request: Union[Query, bytes]) -> asyncio.Future:
|
||||
if isinstance(request, bytes):
|
||||
request = Query.ray_deserialize(request)
|
||||
|
||||
request.tick_enter_replica = time.time()
|
||||
logger.debug("Worker {} got request {}".format(self.replica_tag,
|
||||
request))
|
||||
request.async_future = asyncio.get_event_loop().create_future()
|
||||
self.num_ongoing_requests += 1
|
||||
|
||||
self.batch_queue.put(request)
|
||||
return await request.async_future
|
||||
result = await request.async_future
|
||||
|
||||
self.num_ongoing_requests -= 1
|
||||
return result
|
||||
|
||||
@@ -18,3 +18,19 @@ ASYNC_CONCURRENCY = int(1e6)
|
||||
|
||||
#: Time to wait for HTTP proxy in `serve.init()`
|
||||
HTTP_PROXY_TIMEOUT = 60
|
||||
|
||||
#: Default histogram buckets for latency tracker
|
||||
DEFAULT_LATENCY_BUCKET_MS = [
|
||||
1,
|
||||
2,
|
||||
5,
|
||||
10,
|
||||
20,
|
||||
50,
|
||||
100,
|
||||
200,
|
||||
500,
|
||||
1000,
|
||||
2000,
|
||||
5000,
|
||||
]
|
||||
|
||||
@@ -28,6 +28,9 @@ class Query:
|
||||
metadata: RequestMetadata
|
||||
async_future: Optional[asyncio.Future] = None
|
||||
|
||||
tick_enter_router: Optional[float] = None
|
||||
tick_enter_replica: Optional[float] = None
|
||||
|
||||
def __reduce__(self):
|
||||
return type(self).ray_deserialize, (self.ray_serialize(), )
|
||||
|
||||
@@ -132,6 +135,11 @@ class Router:
|
||||
("Number of requests that errored when getting result "
|
||||
"from the backend."), "requests", ["backend"])
|
||||
|
||||
self.backend_queue_size = metrics.Gauge(
|
||||
"backend_queued_queries",
|
||||
"Current number of queries queued in the router for a backend",
|
||||
"requests", ["backend"])
|
||||
|
||||
asyncio.get_event_loop().create_task(self.report_queue_lengths())
|
||||
|
||||
async def enqueue_request(self, request_meta, *request_args,
|
||||
@@ -327,9 +335,14 @@ class Router:
|
||||
|
||||
async def report_queue_lengths(self):
|
||||
while True:
|
||||
queue_lengths = {
|
||||
backend: len(q)
|
||||
for backend, q in self.backend_queues.items()
|
||||
}
|
||||
self.controller.report_queue_lengths.remote(
|
||||
self.name, {
|
||||
backend: len(q)
|
||||
for backend, q in self.backend_queues.items()
|
||||
})
|
||||
self.name, queue_lengths)
|
||||
|
||||
for backend, length in queue_lengths.items():
|
||||
self.backend_queue_size.record(length, {"backend": backend})
|
||||
|
||||
await asyncio.sleep(REPORT_QUEUE_LENGTH_PERIOD_S)
|
||||
|
||||
@@ -11,7 +11,10 @@ if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False):
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def _shared_serve_instance():
|
||||
ray.init(num_cpus=36)
|
||||
ray.init(
|
||||
num_cpus=36,
|
||||
_metrics_export_port=9999,
|
||||
_system_config={"metrics_report_interval_ms": 1000})
|
||||
serve.init()
|
||||
yield
|
||||
|
||||
|
||||
@@ -10,8 +10,9 @@ from ray import serve
|
||||
from ray.test_utils import wait_for_condition
|
||||
from ray.serve import constants
|
||||
from ray.serve.exceptions import RayServeException
|
||||
from ray.serve.utils import format_actor_name, get_random_letters
|
||||
from ray.serve.config import BackendConfig
|
||||
from ray.serve.utils import (block_until_http_ready, format_actor_name,
|
||||
get_random_letters)
|
||||
|
||||
|
||||
def test_e2e(serve_instance):
|
||||
@@ -896,6 +897,54 @@ def test_shadow_traffic(serve_instance):
|
||||
wait_for_condition(check_requests)
|
||||
|
||||
|
||||
def test_serve_metrics(serve_instance):
|
||||
@serve.accept_batch
|
||||
def batcher(flask_requests):
|
||||
return ["hello"] * len(flask_requests)
|
||||
|
||||
serve.create_backend("metrics", batcher)
|
||||
serve.create_endpoint("metrics", backend="metrics", route="/metrics")
|
||||
# send 10 concurrent requests
|
||||
url = "http://127.0.0.1:8000/metrics"
|
||||
ray.get([block_until_http_ready.remote(url) for _ in range(10)])
|
||||
|
||||
def verify_metrics(do_assert=False):
|
||||
resp = requests.get("http://127.0.0.1:9999").text
|
||||
|
||||
expected_metrics = [
|
||||
# counter
|
||||
"num_router_requests_total",
|
||||
"num_http_requests_total",
|
||||
"backend_queued_queries_total",
|
||||
"backend_request_counter_requests_total",
|
||||
"backend_worker_starts_restarts_total",
|
||||
# histogram
|
||||
"backend_processing_latency_ms_bucket",
|
||||
"backend_processing_latency_ms_count",
|
||||
"backend_processing_latency_ms_sum",
|
||||
"backend_queuing_latency_ms_bucket",
|
||||
"backend_queuing_latency_ms_count",
|
||||
"backend_queuing_latency_ms_sum",
|
||||
# gauge
|
||||
"replica_processing_queries",
|
||||
"replica_queued_queries",
|
||||
]
|
||||
for metric in expected_metrics:
|
||||
# For the final error round
|
||||
if do_assert:
|
||||
assert metric in resp
|
||||
# For the wait_for_condition
|
||||
else:
|
||||
if metric not in resp:
|
||||
return False
|
||||
return True
|
||||
|
||||
try:
|
||||
wait_for_condition(verify_metrics, retry_interval_ms=500)
|
||||
except RuntimeError:
|
||||
verify_metrics()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.exit(pytest.main(["-v", "-s", __file__]))
|
||||
|
||||
Reference in New Issue
Block a user