mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 01:23:10 +08:00
[Serve] Async Router (#6873)
This commit is contained in:
@@ -87,10 +87,12 @@ def get_async(object_id):
|
||||
|
||||
if object_id.is_direct_call_type():
|
||||
inner_future = loop.create_future()
|
||||
# We must add the done_callback before sending to in_memory_store_get
|
||||
inner_future.add_done_callback(done_callback)
|
||||
core_worker.in_memory_store_get_async(object_id, inner_future)
|
||||
else:
|
||||
inner_future = as_future(object_id)
|
||||
inner_future.add_done_callback(done_callback)
|
||||
inner_future.add_done_callback(done_callback)
|
||||
# A hack to keep reference to inner_future so it doesn't get GC.
|
||||
user_future.inner_future = inner_future
|
||||
# A hack to keep a reference to the object ID for ref counting.
|
||||
|
||||
@@ -2,10 +2,10 @@ from ray.experimental.serve.backend_config import BackendConfig
|
||||
from ray.experimental.serve.policy import RoutePolicy
|
||||
from ray.experimental.serve.api import (
|
||||
init, create_backend, create_endpoint, link, split, get_handle, stat,
|
||||
set_backend_config, get_backend_config, accept_batch) # noqa: E402
|
||||
set_backend_config, get_backend_config, accept_batch, route) # noqa: E402
|
||||
|
||||
__all__ = [
|
||||
"init", "create_backend", "create_endpoint", "link", "split", "get_handle",
|
||||
"stat", "set_backend_config", "get_backend_config", "BackendConfig",
|
||||
"RoutePolicy", "accept_batch"
|
||||
"RoutePolicy", "accept_batch", "route"
|
||||
]
|
||||
|
||||
@@ -16,6 +16,7 @@ from ray.experimental.serve.utils import (block_until_http_ready,
|
||||
from ray.experimental.serve.exceptions import RayServeException
|
||||
from ray.experimental.serve.backend_config import BackendConfig
|
||||
from ray.experimental.serve.policy import RoutePolicy
|
||||
from ray.experimental.serve.queues import Query
|
||||
global_state = None
|
||||
|
||||
|
||||
@@ -111,6 +112,10 @@ def init(kv_store_connector=None,
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Register serialization context once
|
||||
ray.register_custom_serializer(Query, Query.ray_serialize,
|
||||
Query.ray_deserialize)
|
||||
|
||||
if kv_store_path is None:
|
||||
_, kv_store_path = mkstemp()
|
||||
|
||||
@@ -439,3 +444,16 @@ def stat(percentiles=[50, 90, 95],
|
||||
"""
|
||||
return ray.get(global_state.init_or_get_metric_monitor().collect.remote(
|
||||
percentiles, agg_windows_seconds))
|
||||
|
||||
|
||||
class route:
    """Decorator that deploys a function or class behind a URL route.

    Applying ``@route(url_route)`` to an object named ``name`` creates a
    backend tagged ``"<name>:v0"`` from it, creates an endpoint called
    ``name`` at ``url_route``, and links the endpoint to that backend.

    Args:
        url_route: The route handed to ``create_endpoint``.
    """

    def __init__(self, url_route):
        self.route = url_route

    def __call__(self, func_or_class):
        """Register ``func_or_class`` and hand it back unchanged.

        Args:
            func_or_class: The object being decorated; its ``__name__`` is
                used as the endpoint name and as the backend tag prefix.

        Returns:
            The same ``func_or_class`` object that was passed in.
        """
        name = func_or_class.__name__
        backend_tag = "{}:v0".format(name)

        create_backend(func_or_class, backend_tag)
        create_endpoint(name, self.route)
        link(name, backend_tag)

        # A decorator's return value replaces the decorated name. Without
        # this return the name would be rebound to None after decoration.
        return func_or_class
|
||||
|
||||
@@ -8,10 +8,16 @@ SERVE_NURSERY_NAME = "SERVE_ACTOR_NURSERY"
|
||||
BOOTSTRAP_KV_STORE_CONN_KEY = "kv_store_connector"
|
||||
|
||||
#: HTTP Address
|
||||
DEFAULT_HTTP_ADDRESS = "http://0.0.0.0:8000"
|
||||
DEFAULT_HTTP_ADDRESS = "http://127.0.0.1:8000"
|
||||
|
||||
#: HTTP Host
|
||||
DEFAULT_HTTP_HOST = "0.0.0.0"
|
||||
DEFAULT_HTTP_HOST = "127.0.0.1"
|
||||
|
||||
#: HTTP Port
|
||||
DEFAULT_HTTP_PORT = 8000
|
||||
|
||||
#: Max concurrency
|
||||
ASYNC_CONCURRENCY = int(1e6)
|
||||
|
||||
#: Default latency SLO
|
||||
DEFAULT_LATENCY_SLO_MS = 1e9
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
from ray.experimental import serve
|
||||
from ray.experimental.serve.constants import DEFAULT_HTTP_ADDRESS
|
||||
import requests
|
||||
import time
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
|
||||
serve.init(blocking=True)


@serve.route("/noop")
def noop(_):
    """Backend that ignores its argument and returns an empty string."""
    return ""


url = "{}/noop".format(DEFAULT_HTTP_ADDRESS)

# Poll once per second until the route no longer answers with 404.
while True:
    if requests.get(url).status_code != 404:
        break
    time.sleep(1)
    print("Waiting for noop route to showup.")

# Time 5200 sequential GET requests; each sample is in seconds.
latency = []
for _ in tqdm(range(5200)):
    start = time.perf_counter()
    resp = requests.get(url)
    elapsed = time.perf_counter() - start
    latency.append(elapsed)

# Remove initial samples
del latency[:200]

# Convert seconds to milliseconds before summarising.
series = pd.Series(latency) * 1000
print("Latency for single noop backend (ms)")
summary = series.describe(percentiles=[0.5, 0.9, 0.95, 0.99])
print(summary)
|
||||
@@ -1,7 +1,7 @@
|
||||
import ray
|
||||
from ray.experimental.serve.constants import (
|
||||
BOOTSTRAP_KV_STORE_CONN_KEY, DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
|
||||
SERVE_NURSERY_NAME)
|
||||
SERVE_NURSERY_NAME, ASYNC_CONCURRENCY)
|
||||
from ray.experimental.serve.kv_store_service import (
|
||||
BackendTable, RoutingTable, TrafficPolicyTable)
|
||||
from ray.experimental.serve.metric import (MetricMonitor,
|
||||
@@ -37,9 +37,16 @@ class ActorNursery:
|
||||
|
||||
self.bootstrap_state = dict()
|
||||
|
||||
def start_actor(self, actor_cls, tag, init_args=(), init_kwargs={}):
|
||||
def start_actor(self,
|
||||
actor_cls,
|
||||
tag,
|
||||
init_args=(),
|
||||
init_kwargs={},
|
||||
is_asyncio=False):
|
||||
"""Start an actor and add it to the nursery"""
|
||||
handle = actor_cls.remote(*init_args, **init_kwargs)
|
||||
max_concurrency = ASYNC_CONCURRENCY if is_asyncio else None
|
||||
handle = (actor_cls.options(max_concurrency=max_concurrency).remote(
|
||||
*init_args, **init_kwargs))
|
||||
self.actor_handles[handle] = tag
|
||||
return [handle]
|
||||
|
||||
@@ -137,8 +144,9 @@ class GlobalState:
|
||||
self.actor_nursery_handle.start_actor.remote(
|
||||
self.queueing_policy.value,
|
||||
init_kwargs=policy_kwargs,
|
||||
tag=queue_actor_tag))
|
||||
handle.register_self_handle.remote(handle)
|
||||
tag=queue_actor_tag,
|
||||
is_asyncio=True))
|
||||
# handle.register_self_handle.remote(handle)
|
||||
self.refresh_actor_handle_cache()
|
||||
|
||||
return self.actor_handle_cache[queue_actor_tag]
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import ray
|
||||
from ray.experimental import serve
|
||||
from ray.experimental.serve.context import TaskContext
|
||||
from ray.experimental.serve.exceptions import RayServeException
|
||||
@@ -48,14 +47,12 @@ class RayServeHandle:
|
||||
except ValueError as e:
|
||||
raise RayServeException(str(e))
|
||||
|
||||
result_object_id_bytes = ray.get(
|
||||
self.router_handle.enqueue_request.remote(
|
||||
service=self.endpoint_name,
|
||||
request_args=(),
|
||||
request_kwargs=kwargs,
|
||||
request_context=TaskContext.Python,
|
||||
request_slo_ms=request_slo_ms))
|
||||
return ray.ObjectID(result_object_id_bytes)
|
||||
return self.router_handle.enqueue_request.remote(
|
||||
service=self.endpoint_name,
|
||||
request_args=(),
|
||||
request_kwargs=kwargs,
|
||||
request_context=TaskContext.Python,
|
||||
request_slo_ms=request_slo_ms)
|
||||
|
||||
def get_traffic_policy(self):
|
||||
# TODO(simon): This method is implemented via checking global state
|
||||
|
||||
@@ -1,7 +1,178 @@
|
||||
from enum import Enum
|
||||
from ray.experimental.serve.queues import (
|
||||
RoundRobinPolicyQueueActor, RandomPolicyQueueActor,
|
||||
PowerOfTwoPolicyQueueActor, FixedPackingPolicyQueueActor)
|
||||
import itertools
|
||||
|
||||
import numpy as np
|
||||
|
||||
import ray
|
||||
from ray.experimental.serve.queues import (CentralizedQueues)
|
||||
from ray.experimental.serve.utils import logger
|
||||
|
||||
|
||||
class RandomPolicyQueue(CentralizedQueues):
    """Router policy that samples one backend at random for every query.

    The traffic weights configured for a service are used as the sampling
    probabilities. The policy keeps no memory of earlier choices, so each
    query is routed independently of the previous ones.
    """

    async def _flush_service_queues(self):
        """Move every queued query into the buffer of a sampled backend."""
        for service_name, pending in self.service_queues.items():
            # Drain the service queue while it has queries and the service
            # has at least one backend in its traffic policy.
            while pending.qsize():
                policy = self.traffic[service_name]
                if not len(policy):
                    break

                names = list(policy.keys())
                weights = list(policy.values())
                # Draw a single backend name using the weights as
                # probabilities.
                sampled = np.random.choice(names, replace=False, p=weights)
                chosen_backend = sampled.squeeze()
                logger.debug("Matching service {} to backend {}".format(
                    service_name, chosen_backend))

                query = await pending.get()
                self.buffer_queues[chosen_backend].add(query)
|
||||
|
||||
|
||||
@ray.remote
class RandomPolicyQueueActor(RandomPolicyQueue):
    """RandomPolicyQueue declared as a Ray remote (actor) class."""
|
||||
|
||||
|
||||
class RoundRobinPolicyQueue(CentralizedQueues):
    """Router policy that assigns queries to backends in rotation.

    This policy is stateful: for every service it remembers where it is in
    the rotation. With backends A and B linked to a service, queries are
    assigned in the order [A, B, A, B, ...]. The traffic weights are not
    used; only the set of backend names matters.
    """

    def __init__(self):
        # service_name -> endless iterator over that service's backends.
        # Stored per instance: a class-level dict would be shared by every
        # queue object, so one router's rotation state would leak into
        # another's.
        self.round_robin_iterator_map = {}
        super().__init__()

    async def set_traffic(self, service, traffic_dict):
        """Set the traffic policy for a service and restart its rotation.

        Args:
            service: Name of the service whose policy is being set.
            traffic_dict: Mapping of backend name to weight. Only the keys
                are used by this policy.
        """
        logger.debug("Setting traffic for service %s to %s", service,
                     traffic_dict)
        self.traffic[service] = traffic_dict
        backend_names = list(self.traffic[service].keys())
        self.round_robin_iterator_map[service] = itertools.cycle(backend_names)
        await self.flush()

    async def _flush_service_queues(self):
        """Move queued queries into backend buffers in round-robin order."""
        for service, queue in self.service_queues.items():
            # Only drain when there are queries and at least one backend.
            if queue.qsize() and len(self.traffic[service]):
                while queue.qsize():
                    # The per-service iterator carries the rotation
                    # position across flushes.
                    chosen_backend = next(
                        self.round_robin_iterator_map[service])
                    request = await queue.get()
                    self.buffer_queues[chosen_backend].add(request)
|
||||
|
||||
|
||||
@ray.remote
class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue):
    """RoundRobinPolicyQueue declared as a Ray remote (actor) class."""
|
||||
|
||||
|
||||
class PowerOfTwoPolicyQueue(CentralizedQueues):
    """Router policy implementing "power of two choices".

    For every query two distinct backends are sampled using the traffic
    weights as probabilities (say A and B out of A, B, C), and the query is
    sent to the one whose buffer queue is currently shorter. The policy
    keeps no rotation state of its own; each decision looks only at the
    weights and the current buffer lengths.
    """

    async def _flush_service_queues(self):
        """Move every queued query into the less-loaded of two candidates."""
        for service, queue in self.service_queues.items():
            # While there are incoming requests and there are backends.
            while queue.qsize() and len(self.traffic[service]):
                backend_names = list(self.traffic[service].keys())
                backend_weights = list(self.traffic[service].values())

                # Drawing two distinct backends needs at least two entries
                # with non-zero probability; otherwise np.random.choice
                # raises ValueError (for example with weights [1.0, 0.0]).
                num_candidates = sum(
                    1 for weight in backend_weights if weight > 0)

                if num_candidates >= 2:
                    # Randomly pick 2 distinct backends.
                    backend1, backend2 = np.random.choice(
                        backend_names, 2, replace=False, p=backend_weights)

                    # Prefer the backend with fewer buffered queries; ties
                    # go to the first one sampled.
                    if (len(self.buffer_queues[backend1]) <= len(
                            self.buffer_queues[backend2])):
                        chosen_backend = backend1
                    else:
                        chosen_backend = backend2
                    logger.debug("[Power of two chocies] found two backends "
                                 "{} and {}: choosing {}.".format(
                                     backend1, backend2, chosen_backend))
                else:
                    # Fewer than two usable backends: plain weighted draw.
                    chosen_backend = np.random.choice(
                        backend_names, replace=False,
                        p=backend_weights).squeeze()
                request = await queue.get()
                self.buffer_queues[chosen_backend].add(request)
|
||||
|
||||
|
||||
@ray.remote
class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue):
    """PowerOfTwoPolicyQueue declared as a Ray remote (actor) class."""
|
||||
|
||||
|
||||
class FixedPackingPolicyQueue(CentralizedQueues):
    """Router policy that hands out queries in fixed-size runs per backend.

    This is round robin with a run length of ``packing_num``: the first
    ``packing_num`` queries go to the first backend of the service, the next
    ``packing_num`` to the second, and so on, cycling forever. The policy is
    stateful (it remembers its position per service) and ignores the traffic
    weights.
    """

    def __init__(self, packing_num=3):
        # service_name -> endless iterator yielding each backend name
        # packing_num times in a row.
        self.fixed_packing_iterator_map = {}
        self.packing_num = packing_num
        super().__init__()

    async def set_traffic(self, service, traffic_dict):
        """Set the traffic policy for a service and rebuild its iterator."""
        logger.debug("Setting traffic for service %s to %s", service,
                     traffic_dict)
        self.traffic[service] = traffic_dict
        names = list(self.traffic[service].keys())
        # Each backend name repeated packing_num times, then cycled.
        packed_order = itertools.chain.from_iterable(
            itertools.repeat(name, self.packing_num) for name in names)
        self.fixed_packing_iterator_map[service] = itertools.cycle(
            packed_order)
        await self.flush()

    async def _flush_service_queues(self):
        """Move queued queries into backend buffers following the packing."""
        for service, pending in self.service_queues.items():
            # Skip services with nothing queued or with no backend.
            if not pending.qsize() or not len(self.traffic[service]):
                continue
            while pending.qsize():
                # The stored iterator keeps the position between flushes.
                target = next(self.fixed_packing_iterator_map[service])
                query = await pending.get()
                self.buffer_queues[target].add(query)
|
||||
|
||||
|
||||
@ray.remote
class FixedPackingPolicyQueueActor(FixedPackingPolicyQueue):
    """FixedPackingPolicyQueue declared as a Ray remote (actor) class."""
|
||||
|
||||
|
||||
class RoutePolicy(Enum):
|
||||
|
||||
@@ -1,43 +1,94 @@
|
||||
from collections import defaultdict, deque
|
||||
import asyncio
|
||||
import copy
|
||||
from collections import defaultdict
|
||||
import time
|
||||
from typing import DefaultDict, Union, List
|
||||
import pickle
|
||||
|
||||
import numpy as np
|
||||
# Note on choosing blist instead of stdlib heapq
|
||||
# 1. pop operation should be O(1) (amortized)
|
||||
# (helpful even for batched pop)
|
||||
# 2. There should not be significant overhead in
|
||||
# maintaining the sorted list.
|
||||
# 3. The blist implementation is fast and uses C extensions.
|
||||
import blist
|
||||
|
||||
import ray
|
||||
from ray.experimental.serve.utils import logger
|
||||
import itertools
|
||||
from blist import sortedlist
|
||||
import time
|
||||
from ray.experimental.serve.constants import DEFAULT_LATENCY_SLO_MS
|
||||
|
||||
|
||||
class Query:
|
||||
def __init__(self,
|
||||
request_args,
|
||||
request_kwargs,
|
||||
request_context,
|
||||
request_slo_ms,
|
||||
result_object_id=None):
|
||||
def __init__(self, request_args, request_kwargs, request_context,
|
||||
request_slo_ms):
|
||||
self.request_args = request_args
|
||||
self.request_kwargs = request_kwargs
|
||||
self.request_context = request_context
|
||||
|
||||
if result_object_id is None:
|
||||
self.result_object_id = ray.ObjectID.from_random()
|
||||
else:
|
||||
self.result_object_id = result_object_id
|
||||
self.async_future = asyncio.get_event_loop().create_future()
|
||||
|
||||
# Service level objective in milliseconds. This is expected to be the
|
||||
# absolute time since unix epoch.
|
||||
self.request_slo_ms = request_slo_ms
|
||||
|
||||
def ray_serialize(self):
|
||||
# NOTE: this method is needed because Query needs to be serialized and
|
||||
# sent to the replica worker. However, after we send the query to
|
||||
# replica worker the async_future is still needed to retrieve the final
|
||||
# result. Therefore we need a way to pass the information to replica
|
||||
# worker without removing async_future.
|
||||
clone = copy.copy(self)
|
||||
clone.async_future = None
|
||||
# We can't use cloudpickle due to a recursion issue
|
||||
return pickle.dumps(clone)
|
||||
|
||||
@staticmethod
|
||||
def ray_deserialize(value):
|
||||
return pickle.loads(value)
|
||||
|
||||
# adding comparator fn for maintaining an
|
||||
# ascending order sorted list w.r.t request_slo_ms
|
||||
def __lt__(self, other):
|
||||
return self.request_slo_ms < other.request_slo_ms
|
||||
|
||||
def __repr__(self):
|
||||
return "<Query args={} kwargs={}>".format(self.request_args,
|
||||
self.request_kwargs)
|
||||
|
||||
class WorkIntent:
|
||||
def __init__(self, replica_handle):
|
||||
self.replica_handle = replica_handle
|
||||
|
||||
def _adjust_latency_slo(slo_ms: Union[float, int, None]) -> float:
|
||||
"""Normalize the input latency objective to absoluate timestamp.
|
||||
|
||||
Input:
|
||||
slo_ms(float, int, None): If value is None, then we use a high default
|
||||
value so other queries can be prioritize and put in front of these
|
||||
queries.
|
||||
"""
|
||||
if slo_ms is None:
|
||||
slo_ms = DEFAULT_LATENCY_SLO_MS
|
||||
current_time_ms = time.time() * 1000
|
||||
return current_time_ms + slo_ms
|
||||
|
||||
|
||||
def _make_future_unwrapper(client_futures: List[asyncio.Future],
|
||||
host_future: asyncio.Future):
|
||||
"""Distribute the result of host_future to each of client_future"""
|
||||
for client_future in client_futures:
|
||||
# Keep a reference to host future so the host future won't get
|
||||
# garbage collected.
|
||||
client_future.host_ref = host_future
|
||||
|
||||
def unwrap_future(_):
|
||||
result = host_future.result()
|
||||
|
||||
if isinstance(result, list):
|
||||
for client_future, result_item in zip(client_futures, result):
|
||||
client_future.set_result(result_item)
|
||||
else: # Result is an exception.
|
||||
for client_future in client_futures:
|
||||
client_future.set_result(result)
|
||||
|
||||
return unwrap_future
|
||||
|
||||
|
||||
class CentralizedQueues:
|
||||
@@ -77,27 +128,44 @@ class CentralizedQueues:
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Note: Several queues are used in the router
|
||||
# - When a request come in, it's placed inside its corresponding
|
||||
# service_queue.
|
||||
# - The service_queue is dequeued during flush operation, which moves
|
||||
# the queries to backend buffer_queue. Here we match a request
|
||||
# for a service to a backend given some policy.
|
||||
# - The worker_queue is used to collect idle actor handle. These
|
||||
# handles are dequeued during the second stage of flush operation,
|
||||
# which assign queries in buffer_queue to actor handle.
|
||||
|
||||
# -- Queues -- #
|
||||
|
||||
# service_name -> request queue
|
||||
self.queues = defaultdict(deque)
|
||||
self.service_queues: DefaultDict[asyncio.Queue[Query]] = defaultdict(
|
||||
asyncio.Queue)
|
||||
# backend_name -> worker request queue
|
||||
self.worker_queues: DefaultDict[asyncio.Queue[
|
||||
ray.actor.ActorHandle]] = defaultdict(asyncio.Queue)
|
||||
# backend_name -> worker payload queue
|
||||
self.buffer_queues = defaultdict(blist.sortedlist)
|
||||
|
||||
# -- Metadata -- #
|
||||
|
||||
# service_name -> traffic_policy
|
||||
self.traffic = defaultdict(dict)
|
||||
|
||||
# backend_name -> backend_config
|
||||
self.backend_info = dict()
|
||||
|
||||
# backend_name -> worker request queue
|
||||
self.workers = defaultdict(deque)
|
||||
# -- Synchronization -- #
|
||||
|
||||
# backend_name -> worker payload queue
|
||||
# using blist sortedlist for deadline awareness
|
||||
# blist is chosen because:
|
||||
# 1. pop operation should be O(1) (amortized)
|
||||
# (helpful even for batched pop)
|
||||
# 2. There should not be significant overhead in
|
||||
# maintaining the sorted list.
|
||||
# 3. The blist implementation is fast and uses C extensions.
|
||||
self.buffer_queues = defaultdict(sortedlist)
|
||||
# This lock guarantees that only one flush operation can happen at a
|
||||
# time. Without the lock, multiple flush operations can pop from the
|
||||
# same buffer_queue and worker_queue and create deadlock. For example,
|
||||
# an operation holding the only query and the other flush operation
|
||||
# holding the only idle replica. Additionally, allowing only one flush
|
||||
# operation at a time simplifies design overhead for custom queuing and
|
||||
# batching policies.
|
||||
self.flush_lock = asyncio.Lock()
|
||||
|
||||
def is_ready(self):
|
||||
return True
|
||||
@@ -111,79 +179,99 @@ class CentralizedQueues:
|
||||
for backend_name, queue in self.buffer_queues.items()
|
||||
}
|
||||
|
||||
# request_slo_ms is time specified in milliseconds till which the
|
||||
# answer of the query should be calculated
|
||||
def enqueue_request(self,
|
||||
service,
|
||||
request_args,
|
||||
request_kwargs,
|
||||
request_context,
|
||||
request_slo_ms=None):
|
||||
if request_slo_ms is None:
|
||||
# if request_slo_ms is not specified then set it to a high level
|
||||
request_slo_ms = 1e9
|
||||
async def enqueue_request(self,
|
||||
service,
|
||||
request_args,
|
||||
request_kwargs,
|
||||
request_context,
|
||||
request_slo_ms=None):
|
||||
logger.debug("Received a request for service {}".format(service))
|
||||
|
||||
# add wall clock time to specify the deadline for completion of query
|
||||
# this also assures FIFO behaviour if request_slo_ms is not specified
|
||||
request_slo_ms += (time.time() * 1000)
|
||||
request_slo_ms = _adjust_latency_slo(request_slo_ms)
|
||||
query = Query(request_args, request_kwargs, request_context,
|
||||
request_slo_ms)
|
||||
self.queues[service].append(query)
|
||||
self.flush()
|
||||
return query.result_object_id.binary()
|
||||
await self.service_queues[service].put(query)
|
||||
await self.flush()
|
||||
|
||||
def dequeue_request(self, backend, replica_handle):
|
||||
intention = WorkIntent(replica_handle)
|
||||
self.workers[backend].append(intention)
|
||||
self.flush()
|
||||
# Note: a future change can be to directly return the ObjectID from
|
||||
# replica task submission
|
||||
result = await query.async_future
|
||||
return result
|
||||
|
||||
def remove_and_destory_replica(self, backend, replica_handle):
|
||||
# NOTE: this function scale by O(#replicas for the backend)
|
||||
new_queue = deque()
|
||||
target_id = replica_handle._actor_id
|
||||
async def dequeue_request(self, backend, replica_handle):
|
||||
logger.debug(
|
||||
"Received a dequeue request for backend {}".format(backend))
|
||||
await self.worker_queues[backend].put(replica_handle)
|
||||
await self.flush()
|
||||
|
||||
for work_intent in self.workers[backend]:
|
||||
if work_intent.replica_handle._actor_id != target_id:
|
||||
new_queue.append(work_intent)
|
||||
async def remove_and_destory_replica(self, backend, replica_handle):
|
||||
# We need this lock because we modify worker_queue here.
|
||||
async with self.flush_lock:
|
||||
old_queue = self.worker_queues[backend]
|
||||
new_queue = asyncio.Queue()
|
||||
target_id = replica_handle._actor_id
|
||||
|
||||
self.workers[backend] = new_queue
|
||||
while not old_queue.empty():
|
||||
replica_handle = await old_queue.get()
|
||||
if replica_handle._actor_id != target_id:
|
||||
await new_queue.put(replica_handle)
|
||||
|
||||
replica_handle.__ray_terminate__.remote()
|
||||
self.worker_queues[backend] = new_queue
|
||||
# TODO: consider await this with timeout, or use ray_kill
|
||||
replica_handle.__ray_terminate__.remote()
|
||||
|
||||
def link(self, service, backend):
|
||||
async def link(self, service, backend):
|
||||
logger.debug("Link %s with %s", service, backend)
|
||||
self.set_traffic(service, {backend: 1.0})
|
||||
await self.set_traffic(service, {backend: 1.0})
|
||||
|
||||
def set_traffic(self, service, traffic_dict):
|
||||
async def set_traffic(self, service, traffic_dict):
|
||||
logger.debug("Setting traffic for service %s to %s", service,
|
||||
traffic_dict)
|
||||
self.traffic[service] = traffic_dict
|
||||
self.flush()
|
||||
await self.flush()
|
||||
|
||||
def set_backend_config(self, backend, config_dict):
|
||||
async def set_backend_config(self, backend, config_dict):
|
||||
logger.debug("Setting backend config for "
|
||||
"backend {} to {}".format(backend, config_dict))
|
||||
self.backend_info[backend] = config_dict
|
||||
|
||||
def flush(self):
|
||||
async def flush(self):
|
||||
"""In the default case, flush calls ._flush.
|
||||
|
||||
When this class is a Ray actor, .flush can be scheduled as a remote
|
||||
method invocation.
|
||||
"""
|
||||
self._flush()
|
||||
async with self.flush_lock:
|
||||
await self._flush_service_queues()
|
||||
await self._flush_buffer_queues()
|
||||
|
||||
def _get_available_backends(self, service):
|
||||
backends_in_policy = set(self.traffic[service].keys())
|
||||
available_workers = {
|
||||
backend
|
||||
for backend, queues in self.workers.items() if len(queues) > 0
|
||||
for backend, queues in self.worker_queues.items()
|
||||
if queues.qsize() > 0
|
||||
}
|
||||
return list(backends_in_policy.intersection(available_workers))
|
||||
|
||||
async def _flush_service_queues(self):
|
||||
"""Selects the backend and puts the service queue query to the buffer
|
||||
Expected Implementation:
|
||||
The implementer is expected to access and manipulate
|
||||
self.service_queues : dict[str,asyncio.Queue]
|
||||
self.buffer_queues : dict[str,sortedlist]
|
||||
For registering the implemented policies register at policy.py
|
||||
Expected Behavior:
|
||||
the Deque of all services in self.service_queues linked with
|
||||
at least one backend must be empty irrespective of whatever
|
||||
backend policy is implemented.
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"This method should be implemented by child class.")
|
||||
|
||||
# flushes the buffer queue and assigns work to workers
|
||||
def _flush_buffer(self):
|
||||
for service in self.queues.keys():
|
||||
async def _flush_buffer_queues(self):
|
||||
for service in self.traffic.keys():
|
||||
ready_backends = self._get_available_backends(service)
|
||||
for backend in ready_backends:
|
||||
# no work available
|
||||
@@ -191,224 +279,40 @@ class CentralizedQueues:
|
||||
continue
|
||||
|
||||
buffer_queue = self.buffer_queues[backend]
|
||||
work_queue = self.workers[backend]
|
||||
worker_queue = self.worker_queues[backend]
|
||||
|
||||
logger.debug("Assigning queries for backend {} with buffer "
|
||||
"queue size {} and worker queue size {}".format(
|
||||
backend, len(buffer_queue),
|
||||
worker_queue.qsize()))
|
||||
|
||||
max_batch_size = None
|
||||
if backend in self.backend_info:
|
||||
max_batch_size = self.backend_info[backend][
|
||||
"max_batch_size"]
|
||||
|
||||
while len(buffer_queue) and len(work_queue):
|
||||
# get the work from work intent queue
|
||||
work = work_queue.popleft()
|
||||
# see if backend accepts batched queries
|
||||
if max_batch_size is not None:
|
||||
pop_size = min(len(buffer_queue), max_batch_size)
|
||||
request = [
|
||||
buffer_queue.pop(0) for _ in range(pop_size)
|
||||
]
|
||||
else:
|
||||
request = buffer_queue.pop(0)
|
||||
await self._assign_query_to_worker(buffer_queue, worker_queue,
|
||||
max_batch_size)
|
||||
|
||||
work.replica_handle._ray_serve_call.remote(request)
|
||||
async def _assign_query_to_worker(self,
|
||||
buffer_queue,
|
||||
worker_queue,
|
||||
max_batch_size=None):
|
||||
|
||||
# selects the backend and puts the service queue query to the buffer
|
||||
# different policies will implement different backend selection policies
|
||||
def _flush_service_queue(self):
|
||||
"""
|
||||
Expected Implementation:
|
||||
The implementer is expected to access and manipulate
|
||||
self.queues : dict[str,Deque]
|
||||
self.buffer_queues : dict[str,sortedlist]
|
||||
For registering the implemented policies register at policy.py!
|
||||
Expected Behavior:
|
||||
the Deque of all services in self.queues linked with
|
||||
atleast one backend must be empty irrespective of whatever
|
||||
backend policy is implemented.
|
||||
"""
|
||||
pass
|
||||
|
||||
# _flush function has to flush the service and buffer queues.
|
||||
def _flush(self):
|
||||
self._flush_service_queue()
|
||||
self._flush_buffer()
|
||||
|
||||
|
||||
class CentralizedQueuesActor(CentralizedQueues):
|
||||
"""
|
||||
A wrapper class for converting wrapper policy classes to ray
|
||||
actors. This is needed to make `flush` call asynchronous.
|
||||
"""
|
||||
self_handle = None
|
||||
|
||||
def register_self_handle(self, handle_to_this_actor):
|
||||
self.self_handle = handle_to_this_actor
|
||||
|
||||
def flush(self):
|
||||
if self.self_handle:
|
||||
self.self_handle._flush.remote()
|
||||
else:
|
||||
self._flush()
|
||||
|
||||
|
||||
class RandomPolicyQueue(CentralizedQueues):
|
||||
"""
|
||||
A wrapper class for Random policy.This backend selection policy is
|
||||
`Stateless` meaning the current decisions of selecting backend are
|
||||
not dependent on previous decisions. Random policy (randomly) samples
|
||||
backends based on backend weights for every query. This policy uses the
|
||||
weights assigned to backends.
|
||||
"""
|
||||
|
||||
def _flush_service_queue(self):
|
||||
# perform traffic splitting for requests
|
||||
for service, queue in self.queues.items():
|
||||
# while there are incoming requests and there are backends
|
||||
while len(queue) and len(self.traffic[service]):
|
||||
backend_names = list(self.traffic[service].keys())
|
||||
backend_weights = list(self.traffic[service].values())
|
||||
# randomly choose a backend for every query
|
||||
chosen_backend = np.random.choice(
|
||||
backend_names, p=backend_weights).squeeze()
|
||||
|
||||
request = queue.popleft()
|
||||
self.buffer_queues[chosen_backend].add(request)
|
||||
|
||||
|
||||
@ray.remote
|
||||
class RandomPolicyQueueActor(RandomPolicyQueue, CentralizedQueuesActor):
|
||||
pass
|
||||
|
||||
|
||||
class RoundRobinPolicyQueue(CentralizedQueues):
|
||||
"""
|
||||
A wrapper class for RoundRobin policy. This backend selection policy
|
||||
is `Stateful` meaning the current decisions of selecting backend are
|
||||
dependent on previous decisions. RoundRobinPolicy assigns queries in
|
||||
an interleaved manner to every backend serving for a service. Consider
|
||||
backend A,B linked to a service. Now queries will be assigned to backends
|
||||
in the following order - [ A, B, A, B ... ] . This policy doesn't use the
|
||||
weights assigned to backends.
|
||||
"""
|
||||
|
||||
# Saves the information about last assigned
|
||||
# backend for every service
|
||||
round_robin_iterator_map = {}
|
||||
|
||||
def set_traffic(self, service, traffic_dict):
|
||||
logger.debug("Setting traffic for service %s to %s", service,
|
||||
traffic_dict)
|
||||
self.traffic[service] = traffic_dict
|
||||
backend_names = list(self.traffic[service].keys())
|
||||
self.round_robin_iterator_map[service] = itertools.cycle(backend_names)
|
||||
self.flush()
|
||||
|
||||
def _flush_service_queue(self):
|
||||
# perform traffic splitting for requests
|
||||
for service, queue in self.queues.items():
|
||||
# if there are incoming requests and there are backends
|
||||
if len(queue) and len(self.traffic[service]):
|
||||
while len(queue):
|
||||
# choose the next backend available from persistent
|
||||
# information
|
||||
chosen_backend = next(
|
||||
self.round_robin_iterator_map[service])
|
||||
request = queue.popleft()
|
||||
self.buffer_queues[chosen_backend].add(request)
|
||||
|
||||
|
||||
@ray.remote
class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue, CentralizedQueuesActor):
    """Ray actor flavour of RoundRobinPolicyQueue.

    Defines nothing of its own: all behaviour is inherited from
    RoundRobinPolicyQueue and CentralizedQueuesActor.
    """
|
||||
|
||||
|
||||
class PowerOfTwoPolicyQueue(CentralizedQueues):
    """
    A wrapper class for powerOfTwo policy. This backend selection policy is
    `Stateless` meaning the current decisions of selecting backend are not
    dependent on previous decisions (no per-service iterator is kept; only
    the current buffer queue lengths are consulted). PowerOfTwo policy
    (randomly) samples two backends (say Backend A,B among A,B,C) based on
    the backend weights specified and chooses the backend which is less
    loaded. This policy uses the weights assigned to backends.
    """

    def _flush_service_queue(self):
        """Move every pending request of each service to a backend buffer.

        With two or more backends, two candidates are sampled according to
        the traffic weights and the one with the shorter buffer queue wins
        (ties go to the first candidate). With a single backend, a plain
        weighted choice is made.
        """
        # perform traffic splitting for requests
        for service, queue in self.queues.items():
            # while there are incoming requests and there are backends
            while len(queue) and len(self.traffic[service]):
                backend_names = list(self.traffic[service].keys())
                backend_weights = list(self.traffic[service].values())

                if len(self.traffic[service]) >= 2:
                    # randomly pick 2 backends; np.random.choice samples
                    # with replacement by default, so both picks may name
                    # the same backend
                    backend1, backend2 = np.random.choice(
                        backend_names, 2, p=backend_weights)

                    # see the length of buffer queues of the two backends
                    # and pick the one which has less no. of queries
                    # in the buffer
                    if (len(self.buffer_queues[backend1]) <= len(
                            self.buffer_queues[backend2])):
                        chosen_backend = backend1
                    else:
                        chosen_backend = backend2
                else:
                    chosen_backend = np.random.choice(
                        backend_names, p=backend_weights).squeeze()

                request = queue.popleft()
                self.buffer_queues[chosen_backend].add(request)
|
||||
|
||||
|
||||
@ray.remote
class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue, CentralizedQueuesActor):
    """Ray actor flavour of PowerOfTwoPolicyQueue.

    Defines nothing of its own: all behaviour is inherited from
    PowerOfTwoPolicyQueue and CentralizedQueuesActor.
    """
|
||||
|
||||
|
||||
class FixedPackingPolicyQueue(CentralizedQueues):
    """
    Queue implementing the FixedPacking backend selection policy.

    The policy is `Stateful`: which backend is picked next depends on the
    picks made before. It behaves like round robin applied in runs of
    `packing_num` queries: the first `packing_num` queries of a service go
    to its first backend, the next `packing_num` to its second backend, and
    so on, wrapping around after the last backend. The weights assigned to
    the backends are not used.
    """

    def __init__(self, packing_num=3):
        self.packing_num = packing_num
        # service name -> cycling iterator remembering the last assigned
        # backend of that service
        self.fixed_packing_iterator_map = {}
        super().__init__()

    def set_traffic(self, service, traffic_dict):
        """Record the traffic policy of `service` and restart its rotation.

        Only the keys of `traffic_dict` (backend names) influence routing;
        the weights are stored but ignored by this policy.
        """
        logger.debug("Setting traffic for service %s to %s", service,
                     traffic_dict)
        self.traffic[service] = traffic_dict
        backend_names = list(self.traffic[service].keys())
        # Each backend name repeated packing_num times, back to back ...
        packed_backends = itertools.chain.from_iterable(
            itertools.repeat(name, self.packing_num)
            for name in backend_names)
        # ... and then replayed forever.
        self.fixed_packing_iterator_map[service] = itertools.cycle(
            packed_backends)
        self.flush()

    def _flush_service_queue(self):
        """Move every pending request of each service to a backend buffer.

        Backends are drawn from the per-service packed cycling iterator
        created in `set_traffic`.
        """
        # perform traffic splitting for requests
        for service, queue in self.queues.items():
            # skip unless there are incoming requests and there are backends
            if not (len(queue) and len(self.traffic[service])):
                continue

            packed_cycle = self.fixed_packing_iterator_map[service]
            while len(queue):
                # choose the next backend available from persistent
                # information
                chosen_backend = next(packed_cycle)
                request = queue.popleft()
                self.buffer_queues[chosen_backend].add(request)
|
||||
|
||||
|
||||
@ray.remote
class FixedPackingPolicyQueueActor(FixedPackingPolicyQueue,
                                   CentralizedQueuesActor):
    """Ray actor flavour of FixedPackingPolicyQueue.

    Defines nothing of its own: all behaviour is inherited from
    FixedPackingPolicyQueue and CentralizedQueuesActor.
    """
|
||||
while len(buffer_queue) and worker_queue.qsize():
|
||||
worker = await worker_queue.get()
|
||||
if max_batch_size is None: # No batching
|
||||
request = buffer_queue.pop(0)
|
||||
future = worker._ray_serve_call.remote(request).as_future()
|
||||
# chaining satisfies request.async_future with future result.
|
||||
asyncio.futures._chain_future(future, request.async_future)
|
||||
else:
|
||||
real_batch_size = min(len(buffer_queue), max_batch_size)
|
||||
requests = [
|
||||
buffer_queue.pop(0) for _ in range(real_batch_size)
|
||||
]
|
||||
future = worker._ray_serve_call.remote(requests).as_future()
|
||||
future.add_done_callback(
|
||||
_make_future_unwrapper(
|
||||
client_futures=[req.async_future for req in requests],
|
||||
host_future=future))
|
||||
|
||||
@@ -64,6 +64,7 @@ class HTTPProxy:
|
||||
self.serve_global_state = GlobalState()
|
||||
self.route_table_cache = dict()
|
||||
|
||||
self.route_checker_task = None
|
||||
self.route_checker_should_shutdown = False
|
||||
|
||||
async def route_checker(self, interval):
|
||||
@@ -82,10 +83,11 @@ class HTTPProxy:
|
||||
message = await receive()
|
||||
if message["type"] == "lifespan.startup":
|
||||
await _async_init()
|
||||
asyncio.ensure_future(
|
||||
self.route_checker_task = asyncio.get_event_loop().create_task(
|
||||
self.route_checker(interval=HTTP_ROUTER_CHECKER_INTERVAL_S))
|
||||
await send({"type": "lifespan.startup.complete"})
|
||||
elif message["type"] == "lifespan.shutdown":
|
||||
self.route_checker_task.cancel()
|
||||
self.route_checker_should_shutdown = True
|
||||
await send({"type": "lifespan.shutdown.complete"})
|
||||
|
||||
@@ -148,16 +150,14 @@ class HTTPProxy:
|
||||
await JSONResponse({"error": str(e)})(scope, receive, send)
|
||||
return
|
||||
|
||||
result_object_id_bytes = await (
|
||||
self.serve_global_state.init_or_get_router()
|
||||
.enqueue_request.remote(
|
||||
service=endpoint_name,
|
||||
request_args=(scope, http_body_bytes),
|
||||
request_kwargs=dict(),
|
||||
request_context=TaskContext.Web,
|
||||
request_slo_ms=request_slo_ms))
|
||||
|
||||
result = await ray.ObjectID(result_object_id_bytes)
|
||||
actual_result = await (self.serve_global_state.init_or_get_router()
|
||||
.enqueue_request.remote(
|
||||
service=endpoint_name,
|
||||
request_args=(scope, http_body_bytes),
|
||||
request_kwargs=dict(),
|
||||
request_context=TaskContext.Web,
|
||||
request_slo_ms=request_slo_ms))
|
||||
result = actual_result
|
||||
|
||||
if isinstance(result, ray.exceptions.RayTaskError):
|
||||
await JSONResponse({
|
||||
|
||||
@@ -95,19 +95,18 @@ class RayServeMixin:
|
||||
self._ray_serve_self_handle)
|
||||
|
||||
def invoke_single(self, request_item):
|
||||
args, kwargs, is_web_context, result_object_id = parse_request_item(
|
||||
request_item)
|
||||
args, kwargs, is_web_context = parse_request_item(request_item)
|
||||
serve_context.web = is_web_context
|
||||
start_timestamp = time.time()
|
||||
|
||||
try:
|
||||
result = self.__call__(*args, **kwargs)
|
||||
ray.worker.global_worker.put_object(result, result_object_id)
|
||||
except Exception as e:
|
||||
wrapped_exception = wrap_to_ray_error(e)
|
||||
result = wrap_to_ray_error(e)
|
||||
self._serve_metric_error_counter += 1
|
||||
ray.worker.global_worker.put_object(wrapped_exception,
|
||||
result_object_id)
|
||||
|
||||
self._serve_metric_latency_list.append(time.time() - start_timestamp)
|
||||
return result
|
||||
|
||||
def invoke_batch(self, request_item_list):
|
||||
# TODO(alind) : create no-http services. The enqueues
|
||||
@@ -124,78 +123,75 @@ class RayServeMixin:
|
||||
# args_list : [val1,val2, ...... , valn]
|
||||
# where n (current batch size) <= max_batch_size of a backend
|
||||
|
||||
arg_list = []
|
||||
kwargs_list = defaultdict(list)
|
||||
result_object_ids, context_flag_list, arg_list = [], [], []
|
||||
curr_batch_size = len(request_item_list)
|
||||
context_flags = set()
|
||||
batch_size = len(request_item_list)
|
||||
|
||||
for item in request_item_list:
|
||||
args, kwargs, is_web_context, result_object_id = (
|
||||
parse_request_item(item))
|
||||
context_flag_list.append(is_web_context)
|
||||
args, kwargs, is_web_context = parse_request_item(item)
|
||||
context_flags.add(is_web_context)
|
||||
|
||||
# Python context only have kwargs
|
||||
# Web context only have one positional argument
|
||||
if is_web_context:
|
||||
arg_list.append(args[0])
|
||||
# Python context only have kwargs
|
||||
flask_request = args[0]
|
||||
arg_list.append(flask_request)
|
||||
else:
|
||||
# Web context only have one positional argument
|
||||
for k, v in kwargs.items():
|
||||
kwargs_list[k].append(v)
|
||||
result_object_ids.append(result_object_id)
|
||||
|
||||
# Set the flask request as a list to conform
|
||||
# with batching semantics: when in batching
|
||||
# mode, each argument is turned into a list.
|
||||
arg_list.append(FakeFlaskRequest())
|
||||
|
||||
try:
|
||||
# check mixing of query context
|
||||
# unified context needed
|
||||
if len(set(context_flag_list)) != 1:
|
||||
if len(context_flags) != 1:
|
||||
raise RayServeException(
|
||||
"Batched queries contain mixed context.")
|
||||
serve_context.web = all(context_flag_list)
|
||||
if serve_context.web:
|
||||
args = (arg_list, )
|
||||
else:
|
||||
# Set the flask request as a list to conform
|
||||
# with batching semantics: when in batching
|
||||
# mode, each argument is turned into a list.
|
||||
fake_flask_request_lst = [
|
||||
FakeFlaskRequest() for _ in range(curr_batch_size)
|
||||
]
|
||||
args = (fake_flask_request_lst, )
|
||||
# set the current batch size (n) for serve_context
|
||||
serve_context.batch_size = len(result_object_ids)
|
||||
"Batched queries contain mixed context. Please only send "
|
||||
"the same type of requests in batching mode.")
|
||||
|
||||
serve_context.web = context_flags.pop()
|
||||
serve_context.batch_size = batch_size
|
||||
start_timestamp = time.time()
|
||||
|
||||
result_list = self.__call__(*args, **kwargs_list)
|
||||
if (not isinstance(result_list, list)) or (len(result_list) !=
|
||||
len(result_object_ids)):
|
||||
|
||||
self._serve_metric_latency_list.append(time.time() -
|
||||
start_timestamp)
|
||||
if (not isinstance(result_list,
|
||||
list)) or (len(result_list) != batch_size):
|
||||
raise RayServeException("__call__ function "
|
||||
"doesn't preserve batch-size. "
|
||||
"Please return a list of result "
|
||||
"with length equals to the batch "
|
||||
"size.")
|
||||
for result, result_object_id in zip(result_list,
|
||||
result_object_ids):
|
||||
ray.worker.global_worker.put_object(result, result_object_id)
|
||||
self._serve_metric_latency_list.append(time.time() -
|
||||
start_timestamp)
|
||||
return result_list
|
||||
except Exception as e:
|
||||
wrapped_exception = wrap_to_ray_error(e)
|
||||
self._serve_metric_error_counter += len(result_object_ids)
|
||||
for result_object_id in result_object_ids:
|
||||
ray.worker.global_worker.put_object(wrapped_exception,
|
||||
result_object_id)
|
||||
self._serve_metric_error_counter += batch_size
|
||||
return [wrapped_exception for _ in range(batch_size)]
|
||||
|
||||
def _ray_serve_call(self, request):
|
||||
work_item = request
|
||||
# check if work_item is a list or not
|
||||
# if it is list: then batching supported
|
||||
if not isinstance(work_item, list):
|
||||
self.invoke_single(work_item)
|
||||
if not isinstance(request, list):
|
||||
result = self.invoke_single(request)
|
||||
else:
|
||||
self.invoke_batch(work_item)
|
||||
result = self.invoke_batch(request)
|
||||
|
||||
# re-assign to default values
|
||||
serve_context.web = False
|
||||
serve_context.batch_size = None
|
||||
|
||||
# Tell router that current actor is idle
|
||||
self._ray_serve_fetch()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class TaskRunnerBackend(TaskRunner, RayServeMixin):
|
||||
"""A simple function serving backend
|
||||
|
||||
@@ -14,13 +14,9 @@ ray.init(address="auto")
|
||||
from ray.experimental import serve
|
||||
serve.init()
|
||||
|
||||
|
||||
def function(flask_request):
|
||||
@serve.route("/driver")
|
||||
def driver(flask_request):
|
||||
return "OK!"
|
||||
|
||||
serve.create_endpoint("driver", "/driver")
|
||||
serve.create_backend(function, "driver:v1")
|
||||
serve.link("driver", "driver:v1")
|
||||
"""
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
|
||||
|
||||
@@ -1,135 +1,190 @@
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
import ray
|
||||
from ray.experimental.serve.queues import RandomPolicyQueue
|
||||
from ray.experimental.serve.queues import (RoundRobinPolicyQueue,
|
||||
FixedPackingPolicyQueue)
|
||||
|
||||
from ray.experimental.serve.policy import (
|
||||
RandomPolicyQueue, RandomPolicyQueueActor, RoundRobinPolicyQueueActor,
|
||||
PowerOfTwoPolicyQueueActor, FixedPackingPolicyQueueActor)
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
def make_task_runner_mock():
|
||||
@ray.remote(num_cpus=0)
|
||||
class TaskRunnerMock:
|
||||
def __init__(self):
|
||||
self.query = None
|
||||
self.queries = []
|
||||
|
||||
async def _ray_serve_call(self, request_item):
|
||||
self.query = request_item
|
||||
self.queries.append(request_item)
|
||||
return "DONE"
|
||||
|
||||
def get_recent_call(self):
|
||||
return self.query
|
||||
|
||||
def get_all_calls(self):
|
||||
return self.queries
|
||||
|
||||
return TaskRunnerMock.remote()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def task_runner_mock_actor():
|
||||
@ray.remote
|
||||
class TaskRunnerMock:
|
||||
def __init__(self):
|
||||
self.result = None
|
||||
|
||||
def _ray_serve_call(self, request_item):
|
||||
self.result = request_item
|
||||
|
||||
def get_recent_call(self):
|
||||
return self.result
|
||||
|
||||
actor = TaskRunnerMock.remote()
|
||||
yield actor
|
||||
yield make_task_runner_mock()
|
||||
|
||||
|
||||
def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueue()
|
||||
q.link("svc", "backend")
|
||||
async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueueActor.remote()
|
||||
q.link.remote("svc", "backend")
|
||||
q.dequeue_request.remote("backend", task_runner_mock_actor)
|
||||
|
||||
result_object_id = q.enqueue_request("svc", 1, "kwargs", None)
|
||||
q.dequeue_request("backend", task_runner_mock_actor)
|
||||
got_work = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
# Make sure we get the request result back
|
||||
result = await q.enqueue_request.remote("svc", 1, "kwargs", None)
|
||||
assert result == "DONE"
|
||||
|
||||
# Make sure it's the right request
|
||||
got_work = await task_runner_mock_actor.get_recent_call.remote()
|
||||
assert got_work.request_args == 1
|
||||
assert got_work.request_kwargs == "kwargs"
|
||||
|
||||
ray.worker.global_worker.put_object(2, got_work.result_object_id)
|
||||
assert ray.get(ray.ObjectID(result_object_id)) == 2
|
||||
|
||||
async def test_slo(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueueActor.remote()
|
||||
await q.link.remote("svc", "backend")
|
||||
|
||||
def test_slo(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueue()
|
||||
q.link("svc", "backend")
|
||||
|
||||
all_request_sent = []
|
||||
for i in range(10):
|
||||
slo_ms = 1000 - 100 * i
|
||||
q.enqueue_request("svc", i, "kwargs", None, request_slo_ms=slo_ms)
|
||||
all_request_sent.append(
|
||||
q.enqueue_request.remote(
|
||||
"svc", i, "kwargs", None, request_slo_ms=slo_ms))
|
||||
|
||||
for i in range(10):
|
||||
q.dequeue_request("backend", task_runner_mock_actor)
|
||||
got_work = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
assert got_work.request_args == (9 - i)
|
||||
await q.dequeue_request.remote("backend", task_runner_mock_actor)
|
||||
|
||||
await asyncio.gather(*all_request_sent)
|
||||
|
||||
i_should_be = 9
|
||||
all_calls = await task_runner_mock_actor.get_all_calls.remote()
|
||||
all_calls = all_calls[-10:]
|
||||
for call in all_calls:
|
||||
assert call.request_args == i_should_be
|
||||
i_should_be -= 1
|
||||
|
||||
|
||||
def test_alter_backend(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueue()
|
||||
async def test_alter_backend(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueueActor.remote()
|
||||
|
||||
q.set_traffic("svc", {"backend-1": 1})
|
||||
result_object_id = q.enqueue_request("svc", 1, "kwargs", None)
|
||||
q.dequeue_request("backend-1", task_runner_mock_actor)
|
||||
got_work = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
await q.set_traffic.remote("svc", {"backend-1": 1})
|
||||
await q.dequeue_request.remote("backend-1", task_runner_mock_actor)
|
||||
await q.enqueue_request.remote("svc", 1, "kwargs", None)
|
||||
got_work = await task_runner_mock_actor.get_recent_call.remote()
|
||||
assert got_work.request_args == 1
|
||||
ray.worker.global_worker.put_object(2, got_work.result_object_id)
|
||||
assert ray.get(ray.ObjectID(result_object_id)) == 2
|
||||
|
||||
q.set_traffic("svc", {"backend-2": 1})
|
||||
result_object_id = q.enqueue_request("svc", 1, "kwargs", None)
|
||||
q.dequeue_request("backend-2", task_runner_mock_actor)
|
||||
got_work = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
assert got_work.request_args == 1
|
||||
ray.worker.global_worker.put_object(2, got_work.result_object_id)
|
||||
assert ray.get(ray.ObjectID(result_object_id)) == 2
|
||||
await q.set_traffic.remote("svc", {"backend-2": 1})
|
||||
await q.dequeue_request.remote("backend-2", task_runner_mock_actor)
|
||||
await q.enqueue_request.remote("svc", 2, "kwargs", None)
|
||||
got_work = await task_runner_mock_actor.get_recent_call.remote()
|
||||
assert got_work.request_args == 2
|
||||
|
||||
|
||||
def test_split_traffic(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueue()
|
||||
async def test_split_traffic_random(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueueActor.remote()
|
||||
|
||||
await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5})
|
||||
runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)]
|
||||
for _ in range(20):
|
||||
await q.dequeue_request.remote("backend-1", runner_1)
|
||||
await q.dequeue_request.remote("backend-2", runner_2)
|
||||
|
||||
q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5})
|
||||
# assume 50% split, the probability of all 20 requests goes to a
|
||||
# single queue is 0.5^20 ~ 1e-6
|
||||
for _ in range(20):
|
||||
q.enqueue_request("svc", 1, "kwargs", None)
|
||||
q.dequeue_request("backend-1", task_runner_mock_actor)
|
||||
result_one = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
q.dequeue_request("backend-2", task_runner_mock_actor)
|
||||
result_two = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
await q.enqueue_request.remote("svc", 1, "kwargs", None)
|
||||
|
||||
got_work = [result_one, result_two]
|
||||
got_work = [
|
||||
await runner.get_recent_call.remote()
|
||||
for runner in (runner_1, runner_2)
|
||||
]
|
||||
assert [g.request_args for g in got_work] == [1, 1]
|
||||
|
||||
|
||||
def test_split_traffic_round_robin(serve_instance, task_runner_mock_actor):
|
||||
q = RoundRobinPolicyQueue()
|
||||
q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5})
|
||||
# since round robin policy is stateful firing two queries consecutively
|
||||
# would transfer the queries to two different backends
|
||||
for _ in range(2):
|
||||
q.enqueue_request("svc", 1, "kwargs", None)
|
||||
q.dequeue_request("backend-1", task_runner_mock_actor)
|
||||
result_one = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
q.dequeue_request("backend-2", task_runner_mock_actor)
|
||||
result_two = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
async def test_round_robin(serve_instance, task_runner_mock_actor):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
|
||||
got_work = [result_one, result_two]
|
||||
await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5})
|
||||
runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)]
|
||||
|
||||
# NOTE: this is the only difference between the
|
||||
# test_split_traffic_random and test_round_robin
|
||||
for _ in range(10):
|
||||
await q.dequeue_request.remote("backend-1", runner_1)
|
||||
await q.dequeue_request.remote("backend-2", runner_2)
|
||||
|
||||
for _ in range(20):
|
||||
await q.enqueue_request.remote("svc", 1, "kwargs", None)
|
||||
|
||||
got_work = [
|
||||
await runner.get_recent_call.remote()
|
||||
for runner in (runner_1, runner_2)
|
||||
]
|
||||
assert [g.request_args for g in got_work] == [1, 1]
|
||||
|
||||
|
||||
def test_split_traffic_fixed_packing(serve_instance, task_runner_mock_actor):
|
||||
async def test_fixed_packing(serve_instance):
|
||||
packing_num = 4
|
||||
q = FixedPackingPolicyQueue(packing_num=packing_num)
|
||||
q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5})
|
||||
|
||||
# fire twice the number of queries as the packing number
|
||||
for i in range(2 * packing_num):
|
||||
q.enqueue_request("svc", i, "kwargs", None)
|
||||
q = FixedPackingPolicyQueueActor.remote(packing_num=packing_num)
|
||||
await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5})
|
||||
|
||||
runner_1, runner_2 = (make_task_runner_mock() for _ in range(2))
|
||||
# both the backends will get equal number of queries
|
||||
# as it is packed round robin
|
||||
for _ in range(packing_num):
|
||||
q.dequeue_request("backend-1", task_runner_mock_actor)
|
||||
await q.dequeue_request.remote("backend-1", runner_1)
|
||||
await q.dequeue_request.remote("backend-2", runner_2)
|
||||
|
||||
result_one = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
|
||||
for _ in range(packing_num):
|
||||
q.dequeue_request("backend-2", task_runner_mock_actor)
|
||||
|
||||
result_two = ray.get(task_runner_mock_actor.get_recent_call.remote())
|
||||
|
||||
got_work = [result_one, result_two]
|
||||
assert [g.request_args
|
||||
for g in got_work] == [packing_num - 1, 2 * packing_num - 1]
|
||||
for backend, runner in zip(["1", "2"], [runner_1, runner_2]):
|
||||
for _ in range(packing_num):
|
||||
input_value = "should-go-to-backend-{}".format(backend)
|
||||
await q.enqueue_request.remote("svc", input_value, "kwargs", None)
|
||||
all_calls = await runner.get_all_calls.remote()
|
||||
for call in all_calls:
|
||||
assert call.request_args == input_value
|
||||
|
||||
|
||||
def test_queue_remove_replicas(serve_instance, task_runner_mock_actor):
|
||||
async def test_power_of_two_choices(serve_instance):
|
||||
q = PowerOfTwoPolicyQueueActor.remote()
|
||||
enqueue_futures = []
|
||||
|
||||
# First, fill the queue for backend-1 with 3 requests
|
||||
await q.set_traffic.remote("svc", {"backend-1": 1.0})
|
||||
for _ in range(3):
|
||||
future = q.enqueue_request.remote("svc", "1", "", None)
|
||||
enqueue_futures.append(future)
|
||||
|
||||
# Then, add a new backend, this backend should be filled next
|
||||
await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5})
|
||||
for _ in range(2):
|
||||
future = q.enqueue_request.remote("svc", "2", "", None)
|
||||
enqueue_futures.append(future)
|
||||
|
||||
runner_1, runner_2 = (make_task_runner_mock() for _ in range(2))
|
||||
for _ in range(3):
|
||||
await q.dequeue_request.remote("backend-1", runner_1)
|
||||
await q.dequeue_request.remote("backend-2", runner_2)
|
||||
|
||||
await asyncio.gather(*enqueue_futures)
|
||||
|
||||
assert len(await runner_1.get_all_calls.remote()) == 3
|
||||
assert len(await runner_2.get_all_calls.remote()) == 2
|
||||
|
||||
|
||||
async def test_queue_remove_replicas(serve_instance):
|
||||
temp_actor = make_task_runner_mock()
|
||||
q = RandomPolicyQueue()
|
||||
q.dequeue_request("backend", task_runner_mock_actor)
|
||||
q.remove_and_destory_replica("backend", task_runner_mock_actor)
|
||||
assert len(q.workers["backend"]) == 0
|
||||
await q.dequeue_request("backend", temp_actor)
|
||||
await q.remove_and_destory_replica("backend", temp_actor)
|
||||
assert q.worker_queues["backend"].qsize() == 0
|
||||
|
||||
@@ -2,12 +2,14 @@ import pytest
|
||||
|
||||
import ray
|
||||
import ray.experimental.serve.context as context
|
||||
from ray.experimental.serve.queues import RoundRobinPolicyQueueActor
|
||||
from ray.experimental.serve.policy import RoundRobinPolicyQueueActor
|
||||
from ray.experimental.serve.task_runner import (
|
||||
RayServeMixin, TaskRunner, TaskRunnerActor, wrap_to_ray_error)
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
def test_runner_basic():
|
||||
|
||||
async def test_runner_basic():
|
||||
def echo(i):
|
||||
return i
|
||||
|
||||
@@ -15,12 +17,12 @@ def test_runner_basic():
|
||||
assert r(1) == 1
|
||||
|
||||
|
||||
def test_runner_wraps_error():
|
||||
async def test_runner_wraps_error():
|
||||
wrapped = wrap_to_ray_error(Exception())
|
||||
assert isinstance(wrapped, ray.exceptions.RayTaskError)
|
||||
|
||||
|
||||
def test_runner_actor(serve_instance):
|
||||
async def test_runner_actor(serve_instance):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
|
||||
def echo(flask_request, i=None):
|
||||
@@ -30,24 +32,21 @@ def test_runner_actor(serve_instance):
|
||||
PRODUCER_NAME = "prod"
|
||||
|
||||
runner = TaskRunnerActor.remote(echo)
|
||||
|
||||
runner._ray_serve_setup.remote(CONSUMER_NAME, q, runner)
|
||||
runner._ray_serve_fetch.remote()
|
||||
|
||||
q.link.remote(PRODUCER_NAME, CONSUMER_NAME)
|
||||
|
||||
for query in [333, 444, 555]:
|
||||
result_token = ray.ObjectID(
|
||||
ray.get(
|
||||
q.enqueue_request.remote(
|
||||
PRODUCER_NAME,
|
||||
request_args=None,
|
||||
request_kwargs={"i": query},
|
||||
request_context=context.TaskContext.Python)))
|
||||
assert ray.get(result_token) == query
|
||||
result = await q.enqueue_request.remote(
|
||||
PRODUCER_NAME,
|
||||
request_args=None,
|
||||
request_kwargs={"i": query},
|
||||
request_context=context.TaskContext.Python)
|
||||
assert result == query
|
||||
|
||||
|
||||
def test_ray_serve_mixin(serve_instance):
|
||||
async def test_ray_serve_mixin(serve_instance):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
|
||||
CONSUMER_NAME = "runner-cls"
|
||||
@@ -72,17 +71,15 @@ def test_ray_serve_mixin(serve_instance):
|
||||
q.link.remote(PRODUCER_NAME, CONSUMER_NAME)
|
||||
|
||||
for query in [333, 444, 555]:
|
||||
result_token = ray.ObjectID(
|
||||
ray.get(
|
||||
q.enqueue_request.remote(
|
||||
PRODUCER_NAME,
|
||||
request_args=None,
|
||||
request_kwargs={"i": query},
|
||||
request_context=context.TaskContext.Python)))
|
||||
assert ray.get(result_token) == query + 3
|
||||
result = await q.enqueue_request.remote(
|
||||
PRODUCER_NAME,
|
||||
request_args=None,
|
||||
request_kwargs={"i": query},
|
||||
request_context=context.TaskContext.Python)
|
||||
assert result == query + 3
|
||||
|
||||
|
||||
def test_task_runner_check_context(serve_instance):
|
||||
async def test_task_runner_check_context(serve_instance):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
|
||||
def echo(flask_request, i=None):
|
||||
@@ -98,13 +95,11 @@ def test_task_runner_check_context(serve_instance):
|
||||
runner._ray_serve_fetch.remote()
|
||||
|
||||
q.link.remote(PRODUCER_NAME, CONSUMER_NAME)
|
||||
result_token = ray.ObjectID(
|
||||
ray.get(
|
||||
q.enqueue_request.remote(
|
||||
PRODUCER_NAME,
|
||||
request_args=None,
|
||||
request_kwargs={"i": 42},
|
||||
request_context=context.TaskContext.Python)))
|
||||
result_oid = q.enqueue_request.remote(
|
||||
PRODUCER_NAME,
|
||||
request_args=None,
|
||||
request_kwargs={"i": 42},
|
||||
request_context=context.TaskContext.Python)
|
||||
|
||||
with pytest.raises(ray.exceptions.RayTaskError):
|
||||
ray.get(result_token)
|
||||
await result_oid
|
||||
|
||||
@@ -4,6 +4,7 @@ import random
|
||||
import string
|
||||
import time
|
||||
import io
|
||||
import os
|
||||
|
||||
import requests
|
||||
from pygments import formatters, highlight, lexers
|
||||
@@ -23,14 +24,16 @@ def parse_request_item(request_item):
|
||||
args = (FakeFlaskRequest(), )
|
||||
kwargs = request_item.request_kwargs
|
||||
|
||||
result_object_id = request_item.result_object_id
|
||||
return args, kwargs, is_web_context, result_object_id
|
||||
return args, kwargs, is_web_context
|
||||
|
||||
|
||||
def _get_logger():
|
||||
logger = logging.getLogger("ray.serve")
|
||||
# TODO(simon): Make logging level configurable.
|
||||
logger.setLevel(logging.INFO)
|
||||
if os.environ.get("SERVE_LOG_DEBUG"):
|
||||
logger.setLevel(logging.DEBUG)
|
||||
else:
|
||||
logger.setLevel(logging.INFO)
|
||||
return logger
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user