From ba0f531da0bc3d90882430a4374f78df86bddcba Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 18 Aug 2020 17:52:36 -0500 Subject: [PATCH] [serve] Remove SLO code and blist dependency (#10075) --- python/ray/serve/api.py | 11 +-- python/ray/serve/backend_worker.py | 2 +- python/ray/serve/constants.py | 3 - python/ray/serve/endpoint_policy.py | 4 +- python/ray/serve/examples/echo_slo_reverse.py | 73 ------------------- python/ray/serve/handle.py | 38 +--------- python/ray/serve/http_proxy.py | 36 --------- python/ray/serve/request_params.py | 20 ----- python/ray/serve/router.py | 23 +----- python/ray/serve/tests/test_router.py | 26 ------- python/setup.py | 2 +- 11 files changed, 8 insertions(+), 230 deletions(-) delete mode 100644 python/ray/serve/examples/echo_slo_reverse.py diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 2420c9abe..c23498a0f 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -356,18 +356,11 @@ def shadow_traffic(endpoint_name, backend_tag, proportion): @_ensure_connected -def get_handle(endpoint_name, - relative_slo_ms=None, - absolute_slo_ms=None, - missing_ok=False): +def get_handle(endpoint_name, missing_ok=False): """Retrieve RayServeHandle for service endpoint to invoke it from Python. Args: endpoint_name (str): A registered service endpoint. - relative_slo_ms(float): Specify relative deadline in milliseconds for - queries fired using this handle. (Default: None) - absolute_slo_ms(float): Specify absolute deadline in milliseconds for - queries fired using this handle. (Default: None) missing_ok (bool): If true, skip the check for the endpoint existence. It can be useful when the endpoint has not been registered. @@ -382,8 +375,6 @@ def get_handle(endpoint_name, return RayServeHandle( list(routers.values())[0], endpoint_name, - relative_slo_ms, - absolute_slo_ms, ) diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index 1624810ea..8a43a49ab 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -331,7 +331,7 @@ class RayServeWorker: get_call_method = attrgetter("call_method") sorted_batch = sorted(batch, key=get_call_method) for _, group in groupby(sorted_batch, key=get_call_method): - group = sorted(group) + group = list(group) evaluated = asyncio.ensure_future(self.invoke_batch(group)) all_evaluated_futures.append(evaluated) result_futures = [q.async_future for q in group] diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index d7668e251..3de75037f 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -19,9 +19,6 @@ DEFAULT_HTTP_PORT = 8000 #: Max concurrency ASYNC_CONCURRENCY = int(1e6) -#: Default latency SLO -DEFAULT_LATENCY_SLO_MS = 1e9 - #: Interval for metric client to push metrics to exporters METRIC_PUSH_INTERVAL_S = 2 diff --git a/python/ray/serve/endpoint_policy.py b/python/ray/serve/endpoint_policy.py index 9666e9fd5..c345ba30c 100644 --- a/python/ray/serve/endpoint_policy.py +++ b/python/ray/serve/endpoint_policy.py @@ -89,13 +89,13 @@ class RandomEndpointPolicy(EndpointPolicy): rstate.random()) assigned_backends.add(chosen_backend) - backend_queues[chosen_backend].add(query) + backend_queues[chosen_backend].appendleft(query) if len(shadow_backends) > 0: shadow_query = copy.copy(query) shadow_query.async_future = None shadow_query.is_shadow_query = True for shadow_backend in shadow_backends: assigned_backends.add(shadow_backend) - backend_queues[shadow_backend].add(shadow_query) + backend_queues[shadow_backend].appendleft(shadow_query) return assigned_backends diff --git a/python/ray/serve/examples/echo_slo_reverse.py b/python/ray/serve/examples/echo_slo_reverse.py deleted file mode 100644 index 098ac5ddd..000000000 --- a/python/ray/serve/examples/echo_slo_reverse.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -SLO [reverse] example of ray.serve module -""" - -import time - -import requests - -import ray -import ray.serve as serve - -# initialize ray serve system. -serve.init() - - -# a backend can be a function or class. -# it can be made to be invoked from web as well as python. -def echo_v1(flask_request, response="hello from python!"): - if serve.context.web: - response = flask_request.url - return response - - -serve.create_backend("echo:v1", echo_v1) - -serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo") - -# wait for routing table to get populated -time.sleep(2) - -# relative slo (10 ms deadline) can be specified via http -slo_ms = 10.0 -# absolute slo (10 ms deadline) can be specified via http -abs_slo_ms = 11.9 -print("> [HTTP] Pinging http://127.0.0.1:8000/" - "echo?relative_slo_ms={}".format(slo_ms)) -print( - requests.get("http://127.0.0.1:8000/" - "echo?relative_slo_ms={}".format(slo_ms)).json()) -print("> [HTTP] Pinging http://127.0.0.1:8000/" - "echo?absolute_slo_ms={}".format(abs_slo_ms)) -print( - requests.get("http://127.0.0.1:8000/" - "echo?absolute_slo_ms={}".format(abs_slo_ms)).json()) - -# get the handle of the endpoint -handle = serve.get_handle("my_endpoint") - -future_list = [] - -# fire 10 requests with slo's in the (almost) reverse order of the order in -# which remote procedure call is done -for r in range(10): - slo_ms = 1000 - 100 * r - response = "hello from request: {} slo: {}".format(r, slo_ms) - print("> [REMOTE] Pinging handle.remote(response='{}',slo_ms={})".format( - response, slo_ms)) - - # overriding slo for each query. - # Generally slo is specified for a service handle but it can - # be overrided using options for query specific demands - f = handle.options(relative_slo_ms=slo_ms).remote(response=response) - future_list.append(f) - -# get results of queries as they complete -# should be completed (almost) according to the order of their slo time -left_futures = future_list -while left_futures: - completed_futures, remaining_futures = ray.wait(left_futures, timeout=0.05) - if len(completed_futures) > 0: - result = ray.get(completed_futures[0]) - print(result) - left_futures = remaining_futures diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index db3dc2cd4..615f344dd 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -31,35 +31,14 @@ class RayServeHandle: self, router_handle, endpoint_name, - relative_slo_ms=None, - absolute_slo_ms=None, method_name=None, shard_key=None, ): self.router_handle = router_handle self.endpoint_name = endpoint_name - assert relative_slo_ms is None or absolute_slo_ms is None, ( - "Can't specify both " - "relative and absolute " - "slo's together!") - self.relative_slo_ms = self._check_slo_ms(relative_slo_ms) - self.absolute_slo_ms = self._check_slo_ms(absolute_slo_ms) self.method_name = method_name self.shard_key = shard_key - def _check_slo_ms(self, slo_value): - if slo_value is not None: - try: - slo_value = float(slo_value) - if slo_value < 0: - raise ValueError( - "Request SLO must be positive, it is {}".format( - slo_value)) - return slo_value - except ValueError as e: - raise RayServeException(str(e)) - return None - def remote(self, *args, **kwargs): if len(args) != 0: raise RayServeException( @@ -73,26 +52,13 @@ class RayServeHandle: request_in_object = RequestMetadata( self.endpoint_name, TaskContext.Python, - self.relative_slo_ms, - self.absolute_slo_ms, call_method=method_name, shard_key=self.shard_key, ) return self.router_handle.enqueue_request.remote( request_in_object, **kwargs) - def options(self, - method_name=None, - shard_key=None, - relative_slo_ms=None, - absolute_slo_ms=None): - # If both the slo's are None then then we use a high default - # value so other queries can be prioritize and put in front of these - # queries. - assert not all([absolute_slo_ms, relative_slo_ms - ]), ("Can't specify both " - "relative and absolute " - "slo's together!") + def options(self, method_name=None, shard_key=None): # Don't override existing method if method_name is None and self.method_name is not None: @@ -104,8 +70,6 @@ class RayServeHandle: return RayServeHandle( self.router_handle, self.endpoint_name, - relative_slo_ms, - absolute_slo_ms, method_name=method_name, shard_key=shard_key, ) diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index f594a3e67..775938e37 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -1,5 +1,4 @@ import asyncio -from urllib.parse import parse_qs import socket from typing import List @@ -61,32 +60,6 @@ class HTTPProxy: return b"".join(body_buffer) - def _parse_latency_slo(self, scope): - query_string = scope["query_string"].decode("ascii") - query_kwargs = parse_qs(query_string) - - relative_slo_ms = query_kwargs.pop("relative_slo_ms", None) - absolute_slo_ms = query_kwargs.pop("absolute_slo_ms", None) - relative_slo_ms = self._validate_slo_ms(relative_slo_ms) - absolute_slo_ms = self._validate_slo_ms(absolute_slo_ms) - if relative_slo_ms is not None and absolute_slo_ms is not None: - raise ValueError("Both relative and absolute slo's" - "cannot be specified.") - return relative_slo_ms, absolute_slo_ms - - def _validate_slo_ms(self, request_slo_ms): - if request_slo_ms is None: - return None - if len(request_slo_ms) != 1: - raise ValueError( - "Multiple SLO specified, please specific only one.") - request_slo_ms = request_slo_ms[0] - request_slo_ms = float(request_slo_ms) - if request_slo_ms < 0: - raise ValueError("Request SLO must be positive, it is {}".format( - request_slo_ms)) - return request_slo_ms - def _make_error_sender(self, scope, receive, send): async def sender(error_message, status_code): response = Response(error_message, status_code=status_code) @@ -142,19 +115,10 @@ class HTTPProxy: http_body_bytes = await self.receive_http_body(scope, receive, send) - # get slo_ms before enqueuing the query - try: - relative_slo_ms, absolute_slo_ms = self._parse_latency_slo(scope) - except ValueError as e: - await error_sender(str(e), 400) - return - headers = {k.decode(): v.decode() for k, v in scope["headers"]} request_metadata = RequestMetadata( endpoint_name, TaskContext.Web, - relative_slo_ms=relative_slo_ms, - absolute_slo_ms=absolute_slo_ms, call_method=headers.get("X-SERVE-CALL-METHOD".lower(), "__call__"), shard_key=headers.get("X-SERVE-SHARD-KEY".lower(), None), ) diff --git a/python/ray/serve/request_params.py b/python/ray/serve/request_params.py index 825c10521..76da18e12 100644 --- a/python/ray/serve/request_params.py +++ b/python/ray/serve/request_params.py @@ -1,5 +1,3 @@ -import time -from ray.serve.constants import DEFAULT_LATENCY_SLO_MS import ray.cloudpickle as pickle @@ -10,37 +8,19 @@ class RequestMetadata: Args: endpoint(str): A registered endpoint. request_context(TaskContext): Context of a request. - request_slo_ms(float): Expected time for the query to get - completed. - is_wall_clock_time(bool): if True, router won't add wall clock - time to `request_slo_ms`. """ def __init__(self, endpoint, request_context, - relative_slo_ms=None, - absolute_slo_ms=None, call_method="__call__", shard_key=None): self.endpoint = endpoint self.request_context = request_context - self.relative_slo_ms = relative_slo_ms - self.absolute_slo_ms = absolute_slo_ms self.call_method = call_method self.shard_key = shard_key - def adjust_relative_slo_ms(self) -> float: - """Normalize the input latency objective to absolute timestamp. - - """ - slo_ms = self.relative_slo_ms - if slo_ms is None: - slo_ms = DEFAULT_LATENCY_SLO_MS - current_time_ms = time.time() * 1000 - return current_time_ms + slo_ms - def ray_serialize(self): return pickle.dumps(self.__dict__) diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index d5337ade6..fd6fdfd02 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -5,8 +5,6 @@ import time from typing import DefaultDict, List import pickle -import blist - from ray.exceptions import RayTaskError import ray @@ -24,7 +22,6 @@ class Query: request_args, request_kwargs, request_context, - request_slo_ms, call_method="__call__", shard_key=None, async_future=None, @@ -36,10 +33,6 @@ class Query: self.async_future = async_future - # Service level objective in milliseconds. This is expected to be the - # absolute time since unix epoch. - self.request_slo_ms = request_slo_ms - self.call_method = call_method self.shard_key = shard_key self.is_shadow_query = is_shadow_query @@ -59,11 +52,6 @@ class Query: kwargs = pickle.loads(value) return Query(**kwargs) - # adding comparator fn for maintaining an - # ascending order sorted list w.r.t request_slo_ms - def __lt__(self, other): - return self.request_slo_ms < other.request_slo_ms - def _make_future_unwrapper(client_futures: List[asyncio.Future], host_future: asyncio.Future): @@ -111,7 +99,7 @@ class Router: # backend_name -> worker replica tag queue self.worker_queues: DefaultDict[deque[str]] = defaultdict(deque) # backend_name -> worker payload queue - self.backend_queues = defaultdict(blist.sortedlist) + self.backend_queues = defaultdict(deque) # -- Metadata -- # @@ -184,18 +172,11 @@ class Router: logger.debug("Received a request for endpoint {}".format(endpoint)) self.num_router_requests.labels(endpoint=endpoint).add() - # check if the slo specified is directly the - # wall clock time - if request_meta.absolute_slo_ms is not None: - request_slo_ms = request_meta.absolute_slo_ms - else: - request_slo_ms = request_meta.adjust_relative_slo_ms() request_context = request_meta.request_context query = Query( request_args, request_kwargs, request_context, - request_slo_ms, call_method=request_meta.call_method, shard_key=request_meta.shard_key, async_future=asyncio.get_event_loop().create_future()) @@ -366,7 +347,7 @@ class Router: backend, curr_queries)) continue - request = buffer_queue.pop(0) + request = buffer_queue.pop() self.queries_counter[backend][backend_replica_tag] += 1 future = asyncio.get_event_loop().create_task( self._do_query(backend, backend_replica_tag, request)) diff --git a/python/ray/serve/tests/test_router.py b/python/ray/serve/tests/test_router.py index 1d1566ed9..3e4514126 100644 --- a/python/ray/serve/tests/test_router.py +++ b/python/ray/serve/tests/test_router.py @@ -1,4 +1,3 @@ -import asyncio from collections import defaultdict import pytest @@ -66,31 +65,6 @@ async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): assert got_work.request_kwargs == {} -async def test_slo(serve_instance, task_runner_mock_actor): - q = ray.remote(Router).remote() - await q.setup.remote("") - await q.set_traffic.remote("svc", TrafficPolicy({"backend-slo": 1.0})) - - all_request_sent = [] - for i in range(10): - slo_ms = 1000 - 100 * i - all_request_sent.append( - q.enqueue_request.remote( - RequestMetadata("svc", None, relative_slo_ms=slo_ms), i)) - - await q.add_new_worker.remote("backend-slo", "replica-1", - task_runner_mock_actor) - - await asyncio.gather(*all_request_sent) - - i_should_be = 9 - all_calls = await task_runner_mock_actor.get_all_calls.remote() - all_calls = all_calls[-10:] - for call in all_calls: - assert call.request_args[0] == i_should_be - i_should_be -= 1 - - async def test_alter_backend(serve_instance, task_runner_mock_actor): q = ray.remote(Router).remote() await q.setup.remote("") diff --git a/python/setup.py b/python/setup.py index f1a8172f6..2fbded274 100644 --- a/python/setup.py +++ b/python/setup.py @@ -110,7 +110,7 @@ if os.getenv("RAY_USE_NEW_GCS") == "on": # in this directory extras = { "debug": [], - "serve": ["uvicorn", "flask", "blist", "requests"], + "serve": ["uvicorn", "flask", "requests"], "tune": ["tabulate", "tensorboardX", "pandas"] }