diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index 72cce85f3..7caddb4d8 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -238,6 +238,8 @@ class RayServeWorker: return getattr(self.callable, method_name) async def invoke_single(self, request_item: Query) -> Any: + logger.debug("Replica {} started executing request {}".format( + self.replica_tag, request_item.metadata.request_id)) method_to_call = ensure_async(self.get_runner_method(request_item)) arg = parse_request_item(request_item) @@ -252,8 +254,9 @@ class RayServeWorker: result = wrap_to_ray_error(e) self.error_counter.record(1) + latency_ms = (time.time() - start) * 1000 self.processing_latency_tracker.record( - (time.time() - start) * 1000, tags={"batch_size": "1"}) + latency_ms, tags={"batch_size": "1"}) return result @@ -264,6 +267,8 @@ class RayServeWorker: # Construct the batch of requests for item in request_item_list: + logger.debug("Replica {} started executing request {}".format( + self.replica_tag, item.metadata.request_id)) args.append(parse_request_item(item)) call_methods.add(self.get_runner_method(item)) @@ -303,9 +308,9 @@ class RayServeWorker: self.error_counter.record(1) result_list = [wrapped_exception for _ in range(batch_size)] + latency_ms = (time.time() - timing_start) * 1000 self.processing_latency_tracker.record( - (time.time() - timing_start) * 1000, - tags={"batch_size": str(batch_size)}) + latency_ms, tags={"batch_size": str(batch_size)}) return result_list @@ -375,13 +380,16 @@ class RayServeWorker: request = Query.ray_deserialize(request) request.tick_enter_replica = time.time() - logger.debug("Worker {} got request {}".format(self.replica_tag, - request)) + logger.debug("Replica {} received request {}".format( + self.replica_tag, request.metadata.request_id)) request.async_future = asyncio.get_event_loop().create_future() self.num_ongoing_requests += 1 self.batch_queue.put(request) result = await request.async_future + request_time_ms = (time.time() - request.tick_enter_replica) * 1000 + logger.debug("Replica {} finished request {} in {:.2f}ms".format( + self.replica_tag, request.metadata.request_id, request_time_ms)) self.num_ongoing_requests -= 1 return result diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index cfc1fbfc3..5d6d5616d 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -2,6 +2,7 @@ from typing import Optional, Dict, Any, Union from ray.serve.context import TaskContext from ray.serve.router import RequestMetadata +from ray.serve.utils import get_random_letters class RayServeHandle: @@ -60,6 +61,7 @@ class RayServeHandle: ``request.args``. """ request_metadata = RequestMetadata( + get_random_letters(10), # Used for debugging. self.endpoint_name, TaskContext.Python, call_method=self.method_name or "__call__", diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 59fb47f38..e513cb675 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -8,6 +8,7 @@ import ray from ray.exceptions import RayTaskError from ray.serve.context import TaskContext from ray.util import metrics +from ray.serve.utils import _get_logger, get_random_letters from ray.serve.http_util import Response from ray.serve.router import Router, RequestMetadata @@ -15,6 +16,8 @@ from ray.serve.router import Router, RequestMetadata # TODO(edoakes): this should probably be configurable. MAX_ACTOR_DEAD_RETRIES = 10 +logger = _get_logger() + class HTTPProxy: """ @@ -108,6 +111,7 @@ class HTTPProxy: headers = {k.decode(): v.decode() for k, v in scope["headers"]} request_metadata = RequestMetadata( + get_random_letters(10), # Used for debugging. endpoint_name, TaskContext.Web, http_method=scope["method"].upper(), diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index 4ff393044..d7d31bf50 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -19,6 +19,7 @@ REPORT_QUEUE_LENGTH_PERIOD_S = 1.0 @dataclass class RequestMetadata: + request_id: str endpoint: str request_context: TaskContext @@ -164,7 +165,9 @@ class Router: async def enqueue_request(self, request_meta, *request_args, **request_kwargs): endpoint = request_meta.endpoint - logger.debug("Received a request for endpoint {}".format(endpoint)) + logger.debug("Received request {} for endpoint {}.".format( + request_meta.request_id, endpoint)) + request_start = time.time() self.num_router_requests.record(1, tags={"endpoint": endpoint}) request_context = request_meta.request_context @@ -184,6 +187,10 @@ class Router: self.num_error_endpoint_requests.record( 1, tags={"endpoint": endpoint}) result = e + + request_time_ms = (time.time() - request_start) * 1000 + logger.debug("Finished request {} in {:.2f}ms".format( + request_meta.request_id, request_time_ms)) return result async def add_new_worker(self, backend_tag, replica_tag, worker_handle): @@ -294,7 +301,6 @@ class Router: # If the worker died, this will be a RayActorError. Just return it and # let the HTTP proxy handle the retry logic. logger.debug("Sending query to replica:" + backend_replica_tag) - start = time.time() worker = self.replicas[backend_replica_tag] try: object_ref = worker.handle_request.remote(req.ray_serialize()) @@ -311,7 +317,6 @@ class Router: result = error self.queries_counter[backend][backend_replica_tag] -= 1 await self.mark_worker_idle(backend, backend_replica_tag) - logger.debug("Got result in {:.2f}s".format(time.time() - start)) return result def _assign_query_to_worker(self, backend, buffer_queue, worker_queue): @@ -344,6 +349,8 @@ class Router: continue request = buffer_queue.pop() + logger.debug("Assigning request {} to replica {}.".format( + request.metadata.request_id, backend_replica_tag)) self.queries_counter[backend][backend_replica_tag] += 1 future = asyncio.get_event_loop().create_task( self._do_query(backend, backend_replica_tag, request)) diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index 5d53cde33..50833ddfc 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -11,8 +11,7 @@ if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False) == 1: @pytest.fixture(scope="session") def _shared_serve_instance(): - # Uncomment the line below to turn on debug log for tests. - # os.environ["SERVE_LOG_DEBUG"] = "1" + os.environ["SERVE_LOG_DEBUG"] = "1" # Turns on debug log for tests # Overriding task_retry_delay_ms to relaunch actors more quickly ray.init( num_cpus=36, diff --git a/python/ray/serve/tests/test_backend_worker.py b/python/ray/serve/tests/test_backend_worker.py index b0bd6763a..d53b8d915 100644 --- a/python/ray/serve/tests/test_backend_worker.py +++ b/python/ray/serve/tests/test_backend_worker.py @@ -11,6 +11,7 @@ from ray.serve.controller import TrafficPolicy from ray.serve.router import Router, RequestMetadata from ray.serve.config import BackendConfig, BackendMetadata from ray.serve.exceptions import RayServeException +from ray.serve.utils import get_random_letters pytestmark = pytest.mark.asyncio @@ -59,7 +60,10 @@ async def add_servable_to_router(servable, router, **kwargs): def make_request_param(call_method="__call__"): return RequestMetadata( - "endpoint", context.TaskContext.Python, call_method=call_method) + get_random_letters(10), + "endpoint", + context.TaskContext.Python, + call_method=call_method) @pytest.fixture diff --git a/python/ray/serve/tests/test_router.py b/python/ray/serve/tests/test_router.py index e05c38f91..4ac0c7b00 100644 --- a/python/ray/serve/tests/test_router.py +++ b/python/ray/serve/tests/test_router.py @@ -55,7 +55,8 @@ async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): task_runner_mock_actor) # Make sure we get the request result back - result = await q.enqueue_request.remote(RequestMetadata("svc", None), 1) + result = await q.enqueue_request.remote( + RequestMetadata(get_random_letters(10), "svc", None), 1) assert result == "DONE" # Make sure it's the right request @@ -71,14 +72,16 @@ async def test_alter_backend(serve_instance, task_runner_mock_actor): await q.set_traffic.remote("svc", TrafficPolicy({"backend-alter": 1})) await q.add_new_worker.remote("backend-alter", "replica-1", task_runner_mock_actor) - await q.enqueue_request.remote(RequestMetadata("svc", None), 1) + await q.enqueue_request.remote( + RequestMetadata(get_random_letters(10), "svc", None), 1) got_work = await task_runner_mock_actor.get_recent_call.remote() assert got_work.args[0] == 1 await q.set_traffic.remote("svc", TrafficPolicy({"backend-alter-2": 1})) await q.add_new_worker.remote("backend-alter-2", "replica-1", task_runner_mock_actor) - await q.enqueue_request.remote(RequestMetadata("svc", None), 2) + await q.enqueue_request.remote( + RequestMetadata(get_random_letters(10), "svc", None), 2) got_work = await task_runner_mock_actor.get_recent_call.remote() assert got_work.args[0] == 2 @@ -99,7 +102,8 @@ async def test_split_traffic_random(serve_instance, task_runner_mock_actor): # assume 50% split, the probability of all 20 requests goes to a # single queue is 0.5^20 ~ 1-6 for _ in range(20): - await q.enqueue_request.remote(RequestMetadata("svc", None), 1) + await q.enqueue_request.remote( + RequestMetadata(get_random_letters(10), "svc", None), 1) got_work = [ await runner.get_recent_call.remote() @@ -138,7 +142,9 @@ async def test_shard_key(serve_instance, task_runner_mock_actor): shard_keys = [get_random_letters() for _ in range(100)] for shard_key in shard_keys: await q.enqueue_request.remote( - RequestMetadata("svc", None, shard_key=shard_key), shard_key) + RequestMetadata( + get_random_letters(10), "svc", None, shard_key=shard_key), + shard_key) # Log the shard keys that were assigned to each backend. runner_shard_keys = defaultdict(set) @@ -151,7 +157,9 @@ async def test_shard_key(serve_instance, task_runner_mock_actor): # Send queries with the same shard keys a second time. for shard_key in shard_keys: await q.enqueue_request.remote( - RequestMetadata("svc", None, shard_key=shard_key), shard_key) + RequestMetadata( + get_random_letters(10), "svc", None, shard_key=shard_key), + shard_key) # Check that the requests were all mapped to the same backends. for i, runner in enumerate(runners): @@ -186,8 +194,10 @@ async def test_router_use_max_concurrency(serve_instance): await q.set_backend_config.remote(backend_name, config) # We send over two queries - first_query = q.enqueue_request.remote(RequestMetadata("svc", None), 1) - second_query = q.enqueue_request.remote(RequestMetadata("svc", None), 1) + first_query = q.enqueue_request.remote( + RequestMetadata(get_random_letters(10), "svc", None), 1) + second_query = q.enqueue_request.remote( + RequestMetadata(get_random_letters(10), "svc", None), 1) # Neither queries should be available with pytest.raises(ray.exceptions.GetTimeoutError):