mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 13:19:38 +08:00
[serve] Refactor router policies to remove inheritance (#8372)
This commit is contained in:
@@ -6,13 +6,14 @@ import time
|
||||
|
||||
import ray
|
||||
import ray.cloudpickle as pickle
|
||||
from ray.serve.backend_worker import create_backend_worker
|
||||
from ray.serve.constants import (ASYNC_CONCURRENCY, SERVE_ROUTER_NAME,
|
||||
SERVE_PROXY_NAME, SERVE_METRIC_SINK_NAME)
|
||||
from ray.serve.http_proxy import HTTPProxyActor
|
||||
from ray.serve.kv_store import RayInternalKVStore
|
||||
from ray.serve.backend_worker import create_backend_worker
|
||||
from ray.serve.utils import async_retryable, get_random_letters, logger
|
||||
from ray.serve.metric.exporter import MetricExporterActor
|
||||
from ray.serve.router import Router
|
||||
from ray.serve.utils import async_retryable, get_random_letters, logger
|
||||
|
||||
import numpy as np
|
||||
|
||||
@@ -48,8 +49,8 @@ class ServeMaster:
|
||||
requires all implementations here to be idempotent.
|
||||
"""
|
||||
|
||||
async def __init__(self, router_class, router_kwargs, start_http_proxy,
|
||||
http_proxy_host, http_proxy_port,
|
||||
async def __init__(self, router_policy, router_policy_kwargs,
|
||||
start_http_proxy, http_proxy_host, http_proxy_port,
|
||||
metric_exporter_class):
|
||||
# Used to read/write checkpoints.
|
||||
# TODO(edoakes): namespace the master actor and its checkpoints.
|
||||
@@ -88,7 +89,7 @@ class ServeMaster:
|
||||
# If starting the actor for the first time, starts up the other system
|
||||
# components. If recovering, fetches their actor handles.
|
||||
self._get_or_start_metric_exporter(metric_exporter_class)
|
||||
self._get_or_start_router(router_class, router_kwargs)
|
||||
self._get_or_start_router(router_policy, router_policy_kwargs)
|
||||
if start_http_proxy:
|
||||
self._get_or_start_http_proxy(http_proxy_host, http_proxy_port)
|
||||
|
||||
@@ -112,7 +113,7 @@ class ServeMaster:
|
||||
asyncio.get_event_loop().create_task(
|
||||
self._recover_from_checkpoint(checkpoint))
|
||||
|
||||
def _get_or_start_router(self, router_class, router_kwargs):
|
||||
def _get_or_start_router(self, policy, policy_kwargs):
|
||||
"""Get the router belonging to this serve cluster.
|
||||
|
||||
If the router does not already exist, it will be started.
|
||||
@@ -122,12 +123,12 @@ class ServeMaster:
|
||||
except ValueError:
|
||||
logger.info(
|
||||
"Starting router with name '{}'".format(SERVE_ROUTER_NAME))
|
||||
self.router = async_retryable(router_class).options(
|
||||
self.router = async_retryable(ray.remote(Router)).options(
|
||||
detached=True,
|
||||
name=SERVE_ROUTER_NAME,
|
||||
max_concurrency=ASYNC_CONCURRENCY,
|
||||
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
|
||||
).remote(**router_kwargs)
|
||||
).remote(policy, policy_kwargs)
|
||||
|
||||
def get_router(self):
|
||||
"""Returns a handle to the router managed by this actor."""
|
||||
|
||||
+134
-154
@@ -1,188 +1,168 @@
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from enum import Enum
|
||||
import itertools
|
||||
|
||||
import numpy as np
|
||||
|
||||
import ray
|
||||
from ray.serve.router import Router
|
||||
from ray.serve.utils import logger
|
||||
|
||||
|
||||
class RandomPolicyQueue(Router):
|
||||
class RoutingPolicy:
|
||||
"""Defines the interface for a routing policy for a single endpoint.
|
||||
|
||||
To add a new routing policy, a class should be defined that provides this
|
||||
interface. The class may be stateful, in which case it may also want to
|
||||
provide a non-default constructor. However, this state will be lost when
|
||||
the policy is updated (e.g., a new backend is added).
|
||||
"""
|
||||
A wrapper class for Random policy.This backend selection policy is
|
||||
`Stateless` meaning the current decisions of selecting backend are
|
||||
not dependent on previous decisions. Random policy (randomly) samples
|
||||
backends based on backend weights for every query. This policy uses the
|
||||
weights assigned to backends.
|
||||
__metaclass__ = ABCMeta
|
||||
|
||||
@abstractmethod
|
||||
async def flush(self, endpoint_queue, backend_queues):
|
||||
"""Flush the endpoint queue into the given backend queues.
|
||||
|
||||
This method should assign each query in the endpoint_queue to a
|
||||
backend in the backend_queues. Queries are assigned by popping them
|
||||
from the endpoint queue and pushing them onto a backend queue. The
|
||||
method must also return a set of all backend tags so that the caller
|
||||
knows which backend_queues to flush.
|
||||
|
||||
Arguments:
|
||||
endpoint_queue: asyncio.Queue containing queries to assign.
|
||||
backend_queues: Dict(str, asyncio.Queue) mapping backend tags to
|
||||
their corresponding query queues.
|
||||
|
||||
Returns:
|
||||
Set of backend tags that had queries added to their queues.
|
||||
"""
|
||||
assigned_backends = set()
|
||||
return assigned_backends
|
||||
|
||||
|
||||
class RandomPolicy(RoutingPolicy):
|
||||
"""
|
||||
A stateless policy that makes a weighted random decision to map each query
|
||||
to a backend using the specified weights.
|
||||
"""
|
||||
|
||||
async def _flush_endpoint_queues(self):
|
||||
# perform traffic splitting for requests
|
||||
for endpoint, queue in self.endpoint_queues.items():
|
||||
# while there are incoming requests and there are backends
|
||||
while queue.qsize() and len(self.traffic[endpoint]):
|
||||
backend_names = list(self.traffic[endpoint].keys())
|
||||
backend_weights = list(self.traffic[endpoint].values())
|
||||
# randomly choose a backend for every query
|
||||
chosen_backend = np.random.choice(
|
||||
backend_names, replace=False, p=backend_weights).squeeze()
|
||||
logger.debug("Matching endpoint {} to backend {}".format(
|
||||
endpoint, chosen_backend))
|
||||
def __init__(self, traffic_dict):
|
||||
self.backend_names = list(traffic_dict.keys())
|
||||
self.backend_weights = list(traffic_dict.values())
|
||||
|
||||
request = await queue.get()
|
||||
self.buffer_queues[chosen_backend].add(request)
|
||||
async def flush(self, endpoint_queue, backend_queues):
|
||||
if len(self.backend_names) == 0:
|
||||
logger.info("No backends to assign traffic to.")
|
||||
return set()
|
||||
|
||||
assigned_backends = set()
|
||||
while endpoint_queue.qsize():
|
||||
chosen_backend = np.random.choice(
|
||||
self.backend_names, replace=False,
|
||||
p=self.backend_weights).squeeze()
|
||||
assigned_backends.add(chosen_backend)
|
||||
backend_queues[chosen_backend].add(await endpoint_queue.get())
|
||||
|
||||
return assigned_backends
|
||||
|
||||
|
||||
@ray.remote
|
||||
class RandomPolicyQueueActor(RandomPolicyQueue):
|
||||
pass
|
||||
class RoundRobinPolicy(RoutingPolicy):
|
||||
"""A stateful policy that assigns queries in round-robin order."""
|
||||
|
||||
def __init__(self, traffic_dict):
|
||||
# NOTE(edoakes): the backend weights are not used.
|
||||
self.backend_names = list(traffic_dict.keys())
|
||||
# Saves the information about last assigned backend for every endpoint.
|
||||
self.round_robin_iterator = itertools.cycle(self.backend_names)
|
||||
|
||||
async def flush(self, endpoint_queue, backend_queues):
|
||||
if len(self.backend_names) == 0:
|
||||
logger.info("No backends to assign traffic to.")
|
||||
return set()
|
||||
|
||||
assigned_backends = set()
|
||||
while endpoint_queue.qsize():
|
||||
chosen_backend = next(self.round_robin_iterator)
|
||||
assigned_backends.add(chosen_backend)
|
||||
backend_queues[chosen_backend].add(await endpoint_queue.get())
|
||||
|
||||
return assigned_backends
|
||||
|
||||
|
||||
class RoundRobinPolicyQueue(Router):
|
||||
"""
|
||||
A wrapper class for RoundRobin policy. This backend selection policy
|
||||
is `Stateful` meaning the current decisions of selecting backend are
|
||||
dependent on previous decisions. RoundRobinPolicy assigns queries in
|
||||
an interleaved manner to every backend serving for an endpoint. Consider
|
||||
backend A,B linked to a endpoint. Now queries will be assigned to backends
|
||||
in the following order - [ A, B, A, B ... ] . This policy doesn't use the
|
||||
weights assigned to backends.
|
||||
class PowerOfTwoPolicy(RoutingPolicy):
|
||||
"""A stateless policy that uses the "power of two" policy.
|
||||
|
||||
For each query, two random backends are chosen. Of those two, the query is
|
||||
assigned to the backend whose queue length is shorter.
|
||||
"""
|
||||
|
||||
# Saves the information about last assigned backend for every endpoint.
|
||||
round_robin_iterator_map = {}
|
||||
def __init__(self, traffic_dict):
|
||||
self.backend_names = list(traffic_dict.keys())
|
||||
self.backend_weights = list(traffic_dict.values())
|
||||
|
||||
async def set_traffic(self, endpoint, traffic_dict):
|
||||
logger.debug("Setting traffic for endpoint %s to %s", endpoint,
|
||||
traffic_dict)
|
||||
self.traffic[endpoint] = traffic_dict
|
||||
backend_names = list(self.traffic[endpoint].keys())
|
||||
self.round_robin_iterator_map[endpoint] = itertools.cycle(
|
||||
backend_names)
|
||||
await self.flush()
|
||||
async def flush(self, endpoint_queue, backend_queues):
|
||||
if len(self.backend_names) == 0:
|
||||
logger.info("No backends to assign traffic to.")
|
||||
return set()
|
||||
|
||||
async def _flush_endpoint_queues(self):
|
||||
# perform traffic splitting for requests
|
||||
for endpoint, queue in self.endpoint_queues.items():
|
||||
# if there are incoming requests and there are backends
|
||||
if queue.qsize() and len(self.traffic[endpoint]):
|
||||
while queue.qsize():
|
||||
# choose the next backend available from persistent
|
||||
# information
|
||||
chosen_backend = next(
|
||||
self.round_robin_iterator_map[endpoint])
|
||||
request = await queue.get()
|
||||
self.buffer_queues[chosen_backend].add(request)
|
||||
assigned_backends = set()
|
||||
while endpoint_queue.qsize():
|
||||
if len(self.backend_names) >= 2:
|
||||
backend1, backend2 = np.random.choice(
|
||||
self.backend_names,
|
||||
2,
|
||||
replace=False,
|
||||
p=self.backend_weights)
|
||||
|
||||
|
||||
@ray.remote
|
||||
class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue):
|
||||
pass
|
||||
|
||||
|
||||
class PowerOfTwoPolicyQueue(Router):
|
||||
"""
|
||||
A wrapper class for powerOfTwo policy. This backend selection policy is
|
||||
`Stateless` meaning the current decisions of selecting backend are
|
||||
dependent on previous decisions. PowerOfTwo policy (randomly) samples two
|
||||
backends (say Backend A,B among A,B,C) based on the backend weights
|
||||
specified and chooses the backend which is less loaded. This policy uses
|
||||
the weights assigned to backends.
|
||||
"""
|
||||
|
||||
async def _flush_endpoint_queues(self):
|
||||
# perform traffic splitting for requests
|
||||
for endpoint, queue in self.endpoint_queues.items():
|
||||
# while there are incoming requests and there are backends
|
||||
while queue.qsize() and len(self.traffic[endpoint]):
|
||||
backend_names = list(self.traffic[endpoint].keys())
|
||||
backend_weights = list(self.traffic[endpoint].values())
|
||||
if len(self.traffic[endpoint]) >= 2:
|
||||
# randomly pick 2 backends
|
||||
backend1, backend2 = np.random.choice(
|
||||
backend_names, 2, replace=False, p=backend_weights)
|
||||
|
||||
# see the length of buffer queues of the two backends
|
||||
# and pick the one which has less no. of queries
|
||||
# in the buffer
|
||||
if (len(self.buffer_queues[backend1]) <= len(
|
||||
self.buffer_queues[backend2])):
|
||||
chosen_backend = backend1
|
||||
else:
|
||||
chosen_backend = backend2
|
||||
logger.debug("[Power of two chocies] found two backends "
|
||||
"{} and {}: choosing {}.".format(
|
||||
backend1, backend2, chosen_backend))
|
||||
# Choose the backend that has a shorter queue.
|
||||
if (len(backend_queues[backend1]) <= len(
|
||||
backend_queues[backend2])):
|
||||
chosen_backend = backend1
|
||||
else:
|
||||
chosen_backend = np.random.choice(
|
||||
backend_names, replace=False,
|
||||
p=backend_weights).squeeze()
|
||||
request = await queue.get()
|
||||
self.buffer_queues[chosen_backend].add(request)
|
||||
chosen_backend = backend2
|
||||
else:
|
||||
chosen_backend = np.random.choice(
|
||||
self.backend_names, replace=False,
|
||||
p=self.backend_weights).squeeze()
|
||||
backend_queues[chosen_backend].add(await endpoint_queue.get())
|
||||
assigned_backends.add(chosen_backend)
|
||||
|
||||
return assigned_backends
|
||||
|
||||
|
||||
@ray.remote
|
||||
class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue):
|
||||
pass
|
||||
|
||||
|
||||
class FixedPackingPolicyQueue(Router):
|
||||
"""
|
||||
A wrapper class for FixedPacking policy. This backend selection policy is
|
||||
`Stateful` meaning the current decisions of selecting backend are dependent
|
||||
on previous decisions. FixedPackingPolicy is k RoundRobin policy where
|
||||
first packing_num queries are handled by 'backend-1' and next k queries are
|
||||
handled by 'backend-2' and so on ... where 'backend-1' and 'backend-2' are
|
||||
served by the same endpoint. This policy doesn't use the weights assigned
|
||||
to backends.
|
||||
class FixedPackingPolicy(RoutingPolicy):
|
||||
"""A stateful policy that uses a "fixed packing" policy.
|
||||
|
||||
The policy round-robins groups of packing_num queries across backends. For
|
||||
example, the first packing_num queries are handled by backend-1, then the
|
||||
next packing_num queries are handled by backend-2, etc.
|
||||
"""
|
||||
|
||||
async def __init__(self, packing_num=3):
|
||||
# Saves the information about last assigned
|
||||
# backend for every endpoint
|
||||
self.fixed_packing_iterator_map = {}
|
||||
self.packing_num = packing_num
|
||||
await super().__init__()
|
||||
|
||||
async def set_traffic(self, endpoint, traffic_dict):
|
||||
logger.debug("Setting traffic for endpoint %s to %s", endpoint,
|
||||
traffic_dict)
|
||||
self.traffic[endpoint] = traffic_dict
|
||||
backend_names = list(self.traffic[endpoint].keys())
|
||||
self.fixed_packing_iterator_map[endpoint] = itertools.cycle(
|
||||
def __init__(self, traffic_dict, packing_num=3):
|
||||
# NOTE(edoakes): the backend weights are not used.
|
||||
self.backend_names = list(traffic_dict.keys())
|
||||
self.fixed_packing_iterator = itertools.cycle(
|
||||
itertools.chain.from_iterable(
|
||||
itertools.repeat(x, self.packing_num) for x in backend_names))
|
||||
await self.flush()
|
||||
itertools.repeat(x, self.packing_num)
|
||||
for x in self.backend_names))
|
||||
self.packing_num = packing_num
|
||||
|
||||
async def _flush_endpoint_queues(self):
|
||||
# perform traffic splitting for requests
|
||||
for endpoint, queue in self.endpoint_queues.items():
|
||||
# if there are incoming requests and there are backends
|
||||
if queue.qsize() and len(self.traffic[endpoint]):
|
||||
while queue.qsize():
|
||||
# choose the next backend available from persistent
|
||||
# information
|
||||
chosen_backend = next(
|
||||
self.fixed_packing_iterator_map[endpoint])
|
||||
request = await queue.get()
|
||||
self.buffer_queues[chosen_backend].add(request)
|
||||
async def flush(self, endpoint_queue, backend_queues):
|
||||
if len(self.backend_names) == 0:
|
||||
logger.info("No backends to assign traffic to.")
|
||||
return set()
|
||||
|
||||
assigned_backends = set()
|
||||
while endpoint_queue.qsize():
|
||||
chosen_backend = next(self.fixed_packing_iterator)
|
||||
backend_queues[chosen_backend].add(await endpoint_queue.get())
|
||||
assigned_backends.add(chosen_backend)
|
||||
|
||||
@ray.remote
|
||||
class FixedPackingPolicyQueueActor(FixedPackingPolicyQueue):
|
||||
pass
|
||||
return assigned_backends
|
||||
|
||||
|
||||
class RoutePolicy(Enum):
|
||||
"""
|
||||
A class for registering the backend selection policy.
|
||||
Add a name and the corresponding class.
|
||||
Serve will support the added policy and policy can be accessed
|
||||
in `serve.init` method through name provided here.
|
||||
"""
|
||||
Random = RandomPolicyQueueActor
|
||||
RoundRobin = RoundRobinPolicyQueueActor
|
||||
PowerOfTwo = PowerOfTwoPolicyQueueActor
|
||||
FixedPacking = FixedPackingPolicyQueueActor
|
||||
"""All builtin routing policies."""
|
||||
Random = RandomPolicy
|
||||
RoundRobin = RoundRobinPolicy
|
||||
PowerOfTwo = PowerOfTwoPolicy
|
||||
FixedPacking = FixedPackingPolicy
|
||||
|
||||
+48
-57
@@ -103,7 +103,7 @@ class Router:
|
||||
3. When there is only 1 backend ready, we will only use that backend.
|
||||
"""
|
||||
|
||||
async def __init__(self):
|
||||
async def __init__(self, policy, policy_kwargs):
|
||||
# Note: Several queues are used in the router
|
||||
# - When a request come in, it's placed inside its corresponding
|
||||
# endpoint_queue.
|
||||
@@ -114,6 +114,11 @@ class Router:
|
||||
# handles are dequed during the second stage of flush operation,
|
||||
# which assign queries in buffer_queue to actor handle.
|
||||
|
||||
# policy.RoutePolicy.
|
||||
self.policy = policy
|
||||
# kwargs to pass into the policy when it's constructed.
|
||||
self.policy_kwargs = policy_kwargs
|
||||
|
||||
# -- Queues -- #
|
||||
|
||||
# endpoint_name -> request queue
|
||||
@@ -123,12 +128,12 @@ class Router:
|
||||
self.worker_queues: DefaultDict[asyncio.Queue[
|
||||
ray.actor.ActorHandle]] = defaultdict(asyncio.Queue)
|
||||
# backend_name -> worker payload queue
|
||||
self.buffer_queues = defaultdict(blist.sortedlist)
|
||||
self.backend_queues = defaultdict(blist.sortedlist)
|
||||
|
||||
# -- Metadata -- #
|
||||
|
||||
# endpoint_name -> traffic_policy
|
||||
self.traffic = defaultdict(dict)
|
||||
self.traffic = dict()
|
||||
# backend_name -> backend_config
|
||||
self.backend_info = dict()
|
||||
# replica tag -> worker_handle
|
||||
@@ -208,7 +213,8 @@ class Router:
|
||||
call_method=request_meta.call_method,
|
||||
async_future=asyncio.get_event_loop().create_future())
|
||||
await self.endpoint_queues[endpoint].put(query)
|
||||
await self.flush()
|
||||
async with self.flush_lock:
|
||||
await self.flush_endpoint_queue(endpoint)
|
||||
|
||||
# Note: a future change can be to directly return the ObjectID from
|
||||
# replica task submission
|
||||
@@ -226,7 +232,6 @@ class Router:
|
||||
self.replicas[backend_replica_tag] = worker_handle
|
||||
|
||||
logger.debug("New worker added for backend '{}'".format(backend_tag))
|
||||
# await worker_handle.ready.remote()
|
||||
await self.mark_worker_idle(backend_tag, backend_replica_tag)
|
||||
|
||||
async def mark_worker_idle(self, backend_tag, backend_replica_tag):
|
||||
@@ -234,7 +239,8 @@ class Router:
|
||||
return
|
||||
|
||||
await self.worker_queues[backend_tag].put(backend_replica_tag)
|
||||
await self.flush()
|
||||
async with self.flush_lock:
|
||||
await self.flush_backend_queues([backend_tag])
|
||||
|
||||
async def remove_worker(self, backend_tag, replica_tag):
|
||||
backend_replica_tag = backend_tag + ":" + replica_tag
|
||||
@@ -261,14 +267,15 @@ class Router:
|
||||
async def set_traffic(self, endpoint, traffic_dict):
|
||||
logger.debug("Setting traffic for endpoint %s to %s", endpoint,
|
||||
traffic_dict)
|
||||
self.traffic[endpoint] = traffic_dict
|
||||
await self.flush()
|
||||
async with self.flush_lock:
|
||||
self.traffic[endpoint] = self.policy(traffic_dict,
|
||||
**self.policy_kwargs)
|
||||
await self.flush_endpoint_queue(endpoint)
|
||||
|
||||
async def remove_endpoint(self, endpoint):
|
||||
logger.debug("Removing endpoint {}".format(endpoint))
|
||||
async with self.flush_lock:
|
||||
await self._flush_endpoint_queues()
|
||||
await self._flush_buffer_queues()
|
||||
await self.flush_endpoint_queue(endpoint)
|
||||
if endpoint in self.endpoint_queues:
|
||||
del self.endpoint_queues[endpoint]
|
||||
if endpoint in self.traffic:
|
||||
@@ -282,24 +289,22 @@ class Router:
|
||||
async def remove_backend(self, backend):
|
||||
logger.debug("Removing backend {}".format(backend))
|
||||
async with self.flush_lock:
|
||||
await self._flush_endpoint_queues()
|
||||
await self._flush_buffer_queues()
|
||||
await self.flush_backend_queues([backend])
|
||||
if backend in self.backend_info:
|
||||
del self.backend_info[backend]
|
||||
if backend in self.worker_queues:
|
||||
del self.worker_queues[backend]
|
||||
if backend in self.buffer_queues:
|
||||
del self.buffer_queues[backend]
|
||||
if backend in self.backend_queues:
|
||||
del self.backend_queues[backend]
|
||||
|
||||
async def flush(self):
|
||||
"""In the default case, flush calls ._flush.
|
||||
|
||||
When this class is a Ray actor, .flush can be scheduled as a remote
|
||||
method invocation.
|
||||
"""
|
||||
async with self.flush_lock:
|
||||
await self._flush_endpoint_queues()
|
||||
await self._flush_buffer_queues()
|
||||
async def flush_endpoint_queue(self, endpoint):
|
||||
"""Attempt to schedule any pending requests to available backends."""
|
||||
assert self.flush_lock.locked()
|
||||
if endpoint not in self.traffic:
|
||||
return
|
||||
backends_to_flush = await self.traffic[endpoint].flush(
|
||||
self.endpoint_queues[endpoint], self.backend_queues)
|
||||
await self.flush_backend_queues(backends_to_flush)
|
||||
|
||||
def _get_available_backends(self, endpoint):
|
||||
backends_in_policy = set(self.traffic[endpoint].keys())
|
||||
@@ -310,44 +315,30 @@ class Router:
|
||||
}
|
||||
return list(backends_in_policy.intersection(available_workers))
|
||||
|
||||
async def _flush_endpoint_queues(self):
|
||||
"""Selects the backend and puts the endpoint queue query to the buffer
|
||||
Expected Implementation:
|
||||
The implementer is expected to access and manipulate
|
||||
self.endpoint_queues : dict[str,Deque]
|
||||
self.buffer_queues : dict[str,sortedlist]
|
||||
For registering the implemented policies register at policy.py
|
||||
Expected Behavior:
|
||||
the Deque of all endpoints in self.endpoint_queues linked with
|
||||
atleast one backend must be empty irrespective of whatever
|
||||
backend policy is implemented.
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
"This method should be implemented by child class.")
|
||||
# Flushes the specified backend queues and assigns work to workers.
|
||||
async def flush_backend_queues(self, backends_to_flush):
|
||||
assert self.flush_lock.locked()
|
||||
for backend in backends_to_flush:
|
||||
# No workers available.
|
||||
if self.worker_queues[backend].qsize() == 0:
|
||||
continue
|
||||
# No work to do.
|
||||
if len(self.backend_queues[backend]) == 0:
|
||||
continue
|
||||
|
||||
# Flushes the buffer queue and assigns work to workers.
|
||||
async def _flush_buffer_queues(self):
|
||||
for endpoint in self.traffic:
|
||||
ready_backends = self._get_available_backends(endpoint)
|
||||
for backend in ready_backends:
|
||||
# no work available
|
||||
if len(self.buffer_queues[backend]) == 0:
|
||||
continue
|
||||
buffer_queue = self.backend_queues[backend]
|
||||
worker_queue = self.worker_queues[backend]
|
||||
|
||||
buffer_queue = self.buffer_queues[backend]
|
||||
worker_queue = self.worker_queues[backend]
|
||||
logger.debug("Assigning queries for backend {} with buffer "
|
||||
"queue size {} and worker queue size {}".format(
|
||||
backend, len(buffer_queue), worker_queue.qsize()))
|
||||
|
||||
logger.debug("Assigning queries for backend {} with buffer "
|
||||
"queue size {} and worker queue size {}".format(
|
||||
backend, len(buffer_queue),
|
||||
worker_queue.qsize()))
|
||||
max_batch_size = None
|
||||
if backend in self.backend_info:
|
||||
max_batch_size = self.backend_info[backend].max_batch_size
|
||||
|
||||
max_batch_size = None
|
||||
if backend in self.backend_info:
|
||||
max_batch_size = self.backend_info[backend].max_batch_size
|
||||
|
||||
await self._assign_query_to_worker(
|
||||
backend, buffer_queue, worker_queue, max_batch_size)
|
||||
await self._assign_query_to_worker(backend, buffer_queue,
|
||||
worker_queue, max_batch_size)
|
||||
|
||||
async def _do_query(self, backend, backend_replica_tag, req):
|
||||
# If the worker died, this will be a RayActorError. Just return it and
|
||||
|
||||
@@ -6,9 +6,10 @@ import numpy as np
|
||||
import ray
|
||||
from ray import serve
|
||||
import ray.serve.context as context
|
||||
from ray.serve.policy import RoundRobinPolicyQueueActor
|
||||
from ray.serve.policy import RoundRobinPolicy
|
||||
from ray.serve.backend_worker import create_backend_worker, wrap_to_ray_error
|
||||
from ray.serve.request_params import RequestMetadata
|
||||
from ray.serve.router import Router
|
||||
from ray.serve.config import BackendConfig
|
||||
from ray.serve.exceptions import RayServeException
|
||||
|
||||
@@ -42,7 +43,7 @@ async def test_runner_wraps_error():
|
||||
|
||||
|
||||
async def test_runner_actor(serve_instance):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RoundRobinPolicy, {})
|
||||
|
||||
def echo(flask_request, i=None):
|
||||
return i
|
||||
@@ -63,7 +64,7 @@ async def test_runner_actor(serve_instance):
|
||||
|
||||
|
||||
async def test_ray_serve_mixin(serve_instance):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RoundRobinPolicy, {})
|
||||
|
||||
CONSUMER_NAME = "runner-cls"
|
||||
PRODUCER_NAME = "prod-cls"
|
||||
@@ -88,7 +89,7 @@ async def test_ray_serve_mixin(serve_instance):
|
||||
|
||||
|
||||
async def test_task_runner_check_context(serve_instance):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RoundRobinPolicy, {})
|
||||
|
||||
def echo(flask_request, i=None):
|
||||
# Accessing the flask_request without web context should throw.
|
||||
@@ -109,7 +110,7 @@ async def test_task_runner_check_context(serve_instance):
|
||||
|
||||
|
||||
async def test_task_runner_custom_method_single(serve_instance):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RoundRobinPolicy, {})
|
||||
|
||||
class NonBatcher:
|
||||
def a(self, _):
|
||||
@@ -143,7 +144,7 @@ async def test_task_runner_custom_method_single(serve_instance):
|
||||
|
||||
|
||||
async def test_task_runner_custom_method_batch(serve_instance):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RoundRobinPolicy, {})
|
||||
|
||||
@serve.accept_batch
|
||||
class Batcher:
|
||||
|
||||
@@ -3,15 +3,15 @@ import asyncio
|
||||
import pytest
|
||||
import ray
|
||||
|
||||
from ray.serve.policy import (
|
||||
RandomPolicyQueue, RandomPolicyQueueActor, RoundRobinPolicyQueueActor,
|
||||
PowerOfTwoPolicyQueueActor, FixedPackingPolicyQueueActor)
|
||||
from ray.serve.policy import (RandomPolicy, RoundRobinPolicy, PowerOfTwoPolicy,
|
||||
FixedPackingPolicy)
|
||||
from ray.serve.router import Router
|
||||
from ray.serve.request_params import RequestMetadata
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
def make_task_runner_mock():
|
||||
def mock_task_runner():
|
||||
@ray.remote(num_cpus=0)
|
||||
class TaskRunnerMock:
|
||||
def __init__(self):
|
||||
@@ -35,13 +35,13 @@ def make_task_runner_mock():
|
||||
return TaskRunnerMock.remote()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@pytest.fixture
|
||||
def task_runner_mock_actor():
|
||||
yield make_task_runner_mock()
|
||||
yield mock_task_runner()
|
||||
|
||||
|
||||
async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RandomPolicy, {})
|
||||
q.set_traffic.remote("svc", {"backend-single-prod": 1.0})
|
||||
q.add_new_worker.remote("backend-single-prod", "replica-1",
|
||||
task_runner_mock_actor)
|
||||
@@ -57,7 +57,7 @@ async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
|
||||
|
||||
|
||||
async def test_slo(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RandomPolicy, {})
|
||||
await q.set_traffic.remote("svc", {"backend-slo": 1.0})
|
||||
|
||||
all_request_sent = []
|
||||
@@ -81,7 +81,7 @@ async def test_slo(serve_instance, task_runner_mock_actor):
|
||||
|
||||
|
||||
async def test_alter_backend(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RandomPolicy, {})
|
||||
|
||||
await q.set_traffic.remote("svc", {"backend-alter": 1})
|
||||
await q.add_new_worker.remote("backend-alter", "replica-1",
|
||||
@@ -99,13 +99,13 @@ async def test_alter_backend(serve_instance, task_runner_mock_actor):
|
||||
|
||||
|
||||
async def test_split_traffic_random(serve_instance, task_runner_mock_actor):
|
||||
q = RandomPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RandomPolicy, {})
|
||||
|
||||
await q.set_traffic.remote("svc", {
|
||||
"backend-split": 0.5,
|
||||
"backend-split-2": 0.5
|
||||
})
|
||||
runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)]
|
||||
runner_1, runner_2 = [mock_task_runner() for _ in range(2)]
|
||||
await q.add_new_worker.remote("backend-split", "replica-1", runner_1)
|
||||
await q.add_new_worker.remote("backend-split-2", "replica-1", runner_2)
|
||||
|
||||
@@ -122,10 +122,10 @@ async def test_split_traffic_random(serve_instance, task_runner_mock_actor):
|
||||
|
||||
|
||||
async def test_round_robin(serve_instance, task_runner_mock_actor):
|
||||
q = RoundRobinPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(RoundRobinPolicy, {})
|
||||
|
||||
await q.set_traffic.remote("svc", {"backend-rr": 0.5, "backend-rr-2": 0.5})
|
||||
runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)]
|
||||
runner_1, runner_2 = [mock_task_runner() for _ in range(2)]
|
||||
|
||||
# NOTE: this is the only difference between the
|
||||
# test_split_traffic_random and test_round_robin
|
||||
@@ -144,13 +144,14 @@ async def test_round_robin(serve_instance, task_runner_mock_actor):
|
||||
|
||||
async def test_fixed_packing(serve_instance):
|
||||
packing_num = 4
|
||||
q = FixedPackingPolicyQueueActor.remote(packing_num=packing_num)
|
||||
q = ray.remote(Router).remote(FixedPackingPolicy,
|
||||
{"packing_num": packing_num})
|
||||
await q.set_traffic.remote("svc", {
|
||||
"backend-fixed": 0.5,
|
||||
"backend-fixed-2": 0.5
|
||||
})
|
||||
|
||||
runner_1, runner_2 = (make_task_runner_mock() for _ in range(2))
|
||||
runner_1, runner_2 = (mock_task_runner() for _ in range(2))
|
||||
# both the backends will get equal number of queries
|
||||
# as it is packed round robin
|
||||
await q.add_new_worker.remote("backend-fixed", "replica-1", runner_1)
|
||||
@@ -167,7 +168,7 @@ async def test_fixed_packing(serve_instance):
|
||||
|
||||
|
||||
async def test_power_of_two_choices(serve_instance):
|
||||
q = PowerOfTwoPolicyQueueActor.remote()
|
||||
q = ray.remote(Router).remote(PowerOfTwoPolicy, {})
|
||||
enqueue_futures = []
|
||||
|
||||
# First, fill the queue for backend-1 with 3 requests
|
||||
@@ -185,7 +186,7 @@ async def test_power_of_two_choices(serve_instance):
|
||||
future = q.enqueue_request.remote(RequestMetadata("svc", None), "2")
|
||||
enqueue_futures.append(future)
|
||||
|
||||
runner_1, runner_2 = (make_task_runner_mock() for _ in range(2))
|
||||
runner_1, runner_2 = (mock_task_runner() for _ in range(2))
|
||||
await q.add_new_worker.remote("backend-pow2", "replica-1", runner_1)
|
||||
await q.add_new_worker.remote("backend-pow2-2", "replica-1", runner_2)
|
||||
|
||||
@@ -196,13 +197,12 @@ async def test_power_of_two_choices(serve_instance):
|
||||
|
||||
|
||||
async def test_queue_remove_replicas(serve_instance):
|
||||
@ray.remote
|
||||
class TestRandomPolicyQueueActor(RandomPolicyQueue):
|
||||
class TestRouter(Router):
|
||||
def worker_queue_size(self, backend):
|
||||
return self.worker_queues["backend-remove"].qsize()
|
||||
|
||||
temp_actor = make_task_runner_mock()
|
||||
q = TestRandomPolicyQueueActor.remote()
|
||||
temp_actor = mock_task_runner()
|
||||
q = ray.remote(TestRouter).remote(RandomPolicy, {})
|
||||
await q.add_new_worker.remote("backend-remove", "replica-1", temp_actor)
|
||||
await q.remove_worker.remote("backend-remove", "replica-1")
|
||||
assert ray.get(q.worker_queue_size.remote("backend")) == 0
|
||||
|
||||
Reference in New Issue
Block a user