mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 01:59:23 +08:00
[serve] Remove SLO code and blist dependency (#10075)
This commit is contained in:
+1
-10
@@ -356,18 +356,11 @@ def shadow_traffic(endpoint_name, backend_tag, proportion):
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
def get_handle(endpoint_name,
|
||||
relative_slo_ms=None,
|
||||
absolute_slo_ms=None,
|
||||
missing_ok=False):
|
||||
def get_handle(endpoint_name, missing_ok=False):
|
||||
"""Retrieve RayServeHandle for service endpoint to invoke it from Python.
|
||||
|
||||
Args:
|
||||
endpoint_name (str): A registered service endpoint.
|
||||
relative_slo_ms(float): Specify relative deadline in milliseconds for
|
||||
queries fired using this handle. (Default: None)
|
||||
absolute_slo_ms(float): Specify absolute deadline in milliseconds for
|
||||
queries fired using this handle. (Default: None)
|
||||
missing_ok (bool): If true, skip the check for the endpoint existence.
|
||||
It can be useful when the endpoint has not been registered.
|
||||
|
||||
@@ -382,8 +375,6 @@ def get_handle(endpoint_name,
|
||||
return RayServeHandle(
|
||||
list(routers.values())[0],
|
||||
endpoint_name,
|
||||
relative_slo_ms,
|
||||
absolute_slo_ms,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -331,7 +331,7 @@ class RayServeWorker:
|
||||
get_call_method = attrgetter("call_method")
|
||||
sorted_batch = sorted(batch, key=get_call_method)
|
||||
for _, group in groupby(sorted_batch, key=get_call_method):
|
||||
group = sorted(group)
|
||||
group = list(group)
|
||||
evaluated = asyncio.ensure_future(self.invoke_batch(group))
|
||||
all_evaluated_futures.append(evaluated)
|
||||
result_futures = [q.async_future for q in group]
|
||||
|
||||
@@ -19,9 +19,6 @@ DEFAULT_HTTP_PORT = 8000
|
||||
#: Max concurrency
|
||||
ASYNC_CONCURRENCY = int(1e6)
|
||||
|
||||
#: Default latency SLO
|
||||
DEFAULT_LATENCY_SLO_MS = 1e9
|
||||
|
||||
#: Interval for metric client to push metrics to exporters
|
||||
METRIC_PUSH_INTERVAL_S = 2
|
||||
|
||||
|
||||
@@ -89,13 +89,13 @@ class RandomEndpointPolicy(EndpointPolicy):
|
||||
rstate.random())
|
||||
|
||||
assigned_backends.add(chosen_backend)
|
||||
backend_queues[chosen_backend].add(query)
|
||||
backend_queues[chosen_backend].appendleft(query)
|
||||
if len(shadow_backends) > 0:
|
||||
shadow_query = copy.copy(query)
|
||||
shadow_query.async_future = None
|
||||
shadow_query.is_shadow_query = True
|
||||
for shadow_backend in shadow_backends:
|
||||
assigned_backends.add(shadow_backend)
|
||||
backend_queues[shadow_backend].add(shadow_query)
|
||||
backend_queues[shadow_backend].appendleft(shadow_query)
|
||||
|
||||
return assigned_backends
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
"""
|
||||
SLO [reverse] example of ray.serve module
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
import ray
|
||||
import ray.serve as serve
|
||||
|
||||
# initialize ray serve system.
|
||||
serve.init()
|
||||
|
||||
|
||||
# a backend can be a function or class.
|
||||
# it can be made to be invoked from web as well as python.
|
||||
def echo_v1(flask_request, response="hello from python!"):
|
||||
if serve.context.web:
|
||||
response = flask_request.url
|
||||
return response
|
||||
|
||||
|
||||
serve.create_backend("echo:v1", echo_v1)
|
||||
|
||||
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
|
||||
|
||||
# wait for routing table to get populated
|
||||
time.sleep(2)
|
||||
|
||||
# relative slo (10 ms deadline) can be specified via http
|
||||
slo_ms = 10.0
|
||||
# absolute slo (10 ms deadline) can be specified via http
|
||||
abs_slo_ms = 11.9
|
||||
print("> [HTTP] Pinging http://127.0.0.1:8000/"
|
||||
"echo?relative_slo_ms={}".format(slo_ms))
|
||||
print(
|
||||
requests.get("http://127.0.0.1:8000/"
|
||||
"echo?relative_slo_ms={}".format(slo_ms)).json())
|
||||
print("> [HTTP] Pinging http://127.0.0.1:8000/"
|
||||
"echo?absolute_slo_ms={}".format(abs_slo_ms))
|
||||
print(
|
||||
requests.get("http://127.0.0.1:8000/"
|
||||
"echo?absolute_slo_ms={}".format(abs_slo_ms)).json())
|
||||
|
||||
# get the handle of the endpoint
|
||||
handle = serve.get_handle("my_endpoint")
|
||||
|
||||
future_list = []
|
||||
|
||||
# fire 10 requests with slo's in the (almost) reverse order of the order in
|
||||
# which remote procedure call is done
|
||||
for r in range(10):
|
||||
slo_ms = 1000 - 100 * r
|
||||
response = "hello from request: {} slo: {}".format(r, slo_ms)
|
||||
print("> [REMOTE] Pinging handle.remote(response='{}',slo_ms={})".format(
|
||||
response, slo_ms))
|
||||
|
||||
# overriding slo for each query.
|
||||
# Generally slo is specified for a service handle but it can
|
||||
# be overridden using options for query-specific demands
|
||||
f = handle.options(relative_slo_ms=slo_ms).remote(response=response)
|
||||
future_list.append(f)
|
||||
|
||||
# get results of queries as they complete
|
||||
# should be completed (almost) according to the order of their slo time
|
||||
left_futures = future_list
|
||||
while left_futures:
|
||||
completed_futures, remaining_futures = ray.wait(left_futures, timeout=0.05)
|
||||
if len(completed_futures) > 0:
|
||||
result = ray.get(completed_futures[0])
|
||||
print(result)
|
||||
left_futures = remaining_futures
|
||||
@@ -31,35 +31,14 @@ class RayServeHandle:
|
||||
self,
|
||||
router_handle,
|
||||
endpoint_name,
|
||||
relative_slo_ms=None,
|
||||
absolute_slo_ms=None,
|
||||
method_name=None,
|
||||
shard_key=None,
|
||||
):
|
||||
self.router_handle = router_handle
|
||||
self.endpoint_name = endpoint_name
|
||||
assert relative_slo_ms is None or absolute_slo_ms is None, (
|
||||
"Can't specify both "
|
||||
"relative and absolute "
|
||||
"slo's together!")
|
||||
self.relative_slo_ms = self._check_slo_ms(relative_slo_ms)
|
||||
self.absolute_slo_ms = self._check_slo_ms(absolute_slo_ms)
|
||||
self.method_name = method_name
|
||||
self.shard_key = shard_key
|
||||
|
||||
def _check_slo_ms(self, slo_value):
|
||||
if slo_value is not None:
|
||||
try:
|
||||
slo_value = float(slo_value)
|
||||
if slo_value < 0:
|
||||
raise ValueError(
|
||||
"Request SLO must be positive, it is {}".format(
|
||||
slo_value))
|
||||
return slo_value
|
||||
except ValueError as e:
|
||||
raise RayServeException(str(e))
|
||||
return None
|
||||
|
||||
def remote(self, *args, **kwargs):
|
||||
if len(args) != 0:
|
||||
raise RayServeException(
|
||||
@@ -73,26 +52,13 @@ class RayServeHandle:
|
||||
request_in_object = RequestMetadata(
|
||||
self.endpoint_name,
|
||||
TaskContext.Python,
|
||||
self.relative_slo_ms,
|
||||
self.absolute_slo_ms,
|
||||
call_method=method_name,
|
||||
shard_key=self.shard_key,
|
||||
)
|
||||
return self.router_handle.enqueue_request.remote(
|
||||
request_in_object, **kwargs)
|
||||
|
||||
def options(self,
|
||||
method_name=None,
|
||||
shard_key=None,
|
||||
relative_slo_ms=None,
|
||||
absolute_slo_ms=None):
|
||||
# If both the slo's are None then we use a high default
|
||||
# value so other queries can be prioritized and put in front of these
|
||||
# queries.
|
||||
assert not all([absolute_slo_ms, relative_slo_ms
|
||||
]), ("Can't specify both "
|
||||
"relative and absolute "
|
||||
"slo's together!")
|
||||
def options(self, method_name=None, shard_key=None):
|
||||
|
||||
# Don't override existing method
|
||||
if method_name is None and self.method_name is not None:
|
||||
@@ -104,8 +70,6 @@ class RayServeHandle:
|
||||
return RayServeHandle(
|
||||
self.router_handle,
|
||||
self.endpoint_name,
|
||||
relative_slo_ms,
|
||||
absolute_slo_ms,
|
||||
method_name=method_name,
|
||||
shard_key=shard_key,
|
||||
)
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
from urllib.parse import parse_qs
|
||||
import socket
|
||||
from typing import List
|
||||
|
||||
@@ -61,32 +60,6 @@ class HTTPProxy:
|
||||
|
||||
return b"".join(body_buffer)
|
||||
|
||||
def _parse_latency_slo(self, scope):
|
||||
query_string = scope["query_string"].decode("ascii")
|
||||
query_kwargs = parse_qs(query_string)
|
||||
|
||||
relative_slo_ms = query_kwargs.pop("relative_slo_ms", None)
|
||||
absolute_slo_ms = query_kwargs.pop("absolute_slo_ms", None)
|
||||
relative_slo_ms = self._validate_slo_ms(relative_slo_ms)
|
||||
absolute_slo_ms = self._validate_slo_ms(absolute_slo_ms)
|
||||
if relative_slo_ms is not None and absolute_slo_ms is not None:
|
||||
raise ValueError("Both relative and absolute slo's"
|
||||
"cannot be specified.")
|
||||
return relative_slo_ms, absolute_slo_ms
|
||||
|
||||
def _validate_slo_ms(self, request_slo_ms):
|
||||
if request_slo_ms is None:
|
||||
return None
|
||||
if len(request_slo_ms) != 1:
|
||||
raise ValueError(
|
||||
"Multiple SLO specified, please specific only one.")
|
||||
request_slo_ms = request_slo_ms[0]
|
||||
request_slo_ms = float(request_slo_ms)
|
||||
if request_slo_ms < 0:
|
||||
raise ValueError("Request SLO must be positive, it is {}".format(
|
||||
request_slo_ms))
|
||||
return request_slo_ms
|
||||
|
||||
def _make_error_sender(self, scope, receive, send):
|
||||
async def sender(error_message, status_code):
|
||||
response = Response(error_message, status_code=status_code)
|
||||
@@ -142,19 +115,10 @@ class HTTPProxy:
|
||||
|
||||
http_body_bytes = await self.receive_http_body(scope, receive, send)
|
||||
|
||||
# get slo_ms before enqueuing the query
|
||||
try:
|
||||
relative_slo_ms, absolute_slo_ms = self._parse_latency_slo(scope)
|
||||
except ValueError as e:
|
||||
await error_sender(str(e), 400)
|
||||
return
|
||||
|
||||
headers = {k.decode(): v.decode() for k, v in scope["headers"]}
|
||||
request_metadata = RequestMetadata(
|
||||
endpoint_name,
|
||||
TaskContext.Web,
|
||||
relative_slo_ms=relative_slo_ms,
|
||||
absolute_slo_ms=absolute_slo_ms,
|
||||
call_method=headers.get("X-SERVE-CALL-METHOD".lower(), "__call__"),
|
||||
shard_key=headers.get("X-SERVE-SHARD-KEY".lower(), None),
|
||||
)
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import time
|
||||
from ray.serve.constants import DEFAULT_LATENCY_SLO_MS
|
||||
import ray.cloudpickle as pickle
|
||||
|
||||
|
||||
@@ -10,37 +8,19 @@ class RequestMetadata:
|
||||
Args:
|
||||
endpoint(str): A registered endpoint.
|
||||
request_context(TaskContext): Context of a request.
|
||||
request_slo_ms(float): Expected time for the query to get
|
||||
completed.
|
||||
is_wall_clock_time(bool): if True, router won't add wall clock
|
||||
time to `request_slo_ms`.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
endpoint,
|
||||
request_context,
|
||||
relative_slo_ms=None,
|
||||
absolute_slo_ms=None,
|
||||
call_method="__call__",
|
||||
shard_key=None):
|
||||
|
||||
self.endpoint = endpoint
|
||||
self.request_context = request_context
|
||||
self.relative_slo_ms = relative_slo_ms
|
||||
self.absolute_slo_ms = absolute_slo_ms
|
||||
self.call_method = call_method
|
||||
self.shard_key = shard_key
|
||||
|
||||
def adjust_relative_slo_ms(self) -> float:
|
||||
"""Normalize the input latency objective to absolute timestamp.
|
||||
|
||||
"""
|
||||
slo_ms = self.relative_slo_ms
|
||||
if slo_ms is None:
|
||||
slo_ms = DEFAULT_LATENCY_SLO_MS
|
||||
current_time_ms = time.time() * 1000
|
||||
return current_time_ms + slo_ms
|
||||
|
||||
def ray_serialize(self):
|
||||
return pickle.dumps(self.__dict__)
|
||||
|
||||
|
||||
@@ -5,8 +5,6 @@ import time
|
||||
from typing import DefaultDict, List
|
||||
import pickle
|
||||
|
||||
import blist
|
||||
|
||||
from ray.exceptions import RayTaskError
|
||||
|
||||
import ray
|
||||
@@ -24,7 +22,6 @@ class Query:
|
||||
request_args,
|
||||
request_kwargs,
|
||||
request_context,
|
||||
request_slo_ms,
|
||||
call_method="__call__",
|
||||
shard_key=None,
|
||||
async_future=None,
|
||||
@@ -36,10 +33,6 @@ class Query:
|
||||
|
||||
self.async_future = async_future
|
||||
|
||||
# Service level objective in milliseconds. This is expected to be the
|
||||
# absolute time since unix epoch.
|
||||
self.request_slo_ms = request_slo_ms
|
||||
|
||||
self.call_method = call_method
|
||||
self.shard_key = shard_key
|
||||
self.is_shadow_query = is_shadow_query
|
||||
@@ -59,11 +52,6 @@ class Query:
|
||||
kwargs = pickle.loads(value)
|
||||
return Query(**kwargs)
|
||||
|
||||
# adding comparator fn for maintaining an
|
||||
# ascending order sorted list w.r.t request_slo_ms
|
||||
def __lt__(self, other):
|
||||
return self.request_slo_ms < other.request_slo_ms
|
||||
|
||||
|
||||
def _make_future_unwrapper(client_futures: List[asyncio.Future],
|
||||
host_future: asyncio.Future):
|
||||
@@ -111,7 +99,7 @@ class Router:
|
||||
# backend_name -> worker replica tag queue
|
||||
self.worker_queues: DefaultDict[deque[str]] = defaultdict(deque)
|
||||
# backend_name -> worker payload queue
|
||||
self.backend_queues = defaultdict(blist.sortedlist)
|
||||
self.backend_queues = defaultdict(deque)
|
||||
|
||||
# -- Metadata -- #
|
||||
|
||||
@@ -184,18 +172,11 @@ class Router:
|
||||
logger.debug("Received a request for endpoint {}".format(endpoint))
|
||||
self.num_router_requests.labels(endpoint=endpoint).add()
|
||||
|
||||
# check if the slo specified is directly the
|
||||
# wall clock time
|
||||
if request_meta.absolute_slo_ms is not None:
|
||||
request_slo_ms = request_meta.absolute_slo_ms
|
||||
else:
|
||||
request_slo_ms = request_meta.adjust_relative_slo_ms()
|
||||
request_context = request_meta.request_context
|
||||
query = Query(
|
||||
request_args,
|
||||
request_kwargs,
|
||||
request_context,
|
||||
request_slo_ms,
|
||||
call_method=request_meta.call_method,
|
||||
shard_key=request_meta.shard_key,
|
||||
async_future=asyncio.get_event_loop().create_future())
|
||||
@@ -366,7 +347,7 @@ class Router:
|
||||
backend, curr_queries))
|
||||
continue
|
||||
|
||||
request = buffer_queue.pop(0)
|
||||
request = buffer_queue.pop()
|
||||
self.queries_counter[backend][backend_replica_tag] += 1
|
||||
future = asyncio.get_event_loop().create_task(
|
||||
self._do_query(backend, backend_replica_tag, request))
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
|
||||
import pytest
|
||||
@@ -66,31 +65,6 @@ async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
|
||||
assert got_work.request_kwargs == {}
|
||||
|
||||
|
||||
async def test_slo(serve_instance, task_runner_mock_actor):
|
||||
q = ray.remote(Router).remote()
|
||||
await q.setup.remote("")
|
||||
await q.set_traffic.remote("svc", TrafficPolicy({"backend-slo": 1.0}))
|
||||
|
||||
all_request_sent = []
|
||||
for i in range(10):
|
||||
slo_ms = 1000 - 100 * i
|
||||
all_request_sent.append(
|
||||
q.enqueue_request.remote(
|
||||
RequestMetadata("svc", None, relative_slo_ms=slo_ms), i))
|
||||
|
||||
await q.add_new_worker.remote("backend-slo", "replica-1",
|
||||
task_runner_mock_actor)
|
||||
|
||||
await asyncio.gather(*all_request_sent)
|
||||
|
||||
i_should_be = 9
|
||||
all_calls = await task_runner_mock_actor.get_all_calls.remote()
|
||||
all_calls = all_calls[-10:]
|
||||
for call in all_calls:
|
||||
assert call.request_args[0] == i_should_be
|
||||
i_should_be -= 1
|
||||
|
||||
|
||||
async def test_alter_backend(serve_instance, task_runner_mock_actor):
|
||||
q = ray.remote(Router).remote()
|
||||
await q.setup.remote("")
|
||||
|
||||
+1
-1
@@ -110,7 +110,7 @@ if os.getenv("RAY_USE_NEW_GCS") == "on":
|
||||
# in this directory
|
||||
extras = {
|
||||
"debug": [],
|
||||
"serve": ["uvicorn", "flask", "blist", "requests"],
|
||||
"serve": ["uvicorn", "flask", "requests"],
|
||||
"tune": ["tabulate", "tensorboardX", "pandas"]
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user