diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 66e66d489..b65753952 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -6,13 +6,14 @@ import time import ray import ray.cloudpickle as pickle +from ray.serve.backend_worker import create_backend_worker from ray.serve.constants import (ASYNC_CONCURRENCY, SERVE_ROUTER_NAME, SERVE_PROXY_NAME, SERVE_METRIC_SINK_NAME) from ray.serve.http_proxy import HTTPProxyActor from ray.serve.kv_store import RayInternalKVStore -from ray.serve.backend_worker import create_backend_worker -from ray.serve.utils import async_retryable, get_random_letters, logger from ray.serve.metric.exporter import MetricExporterActor +from ray.serve.router import Router +from ray.serve.utils import async_retryable, get_random_letters, logger import numpy as np @@ -48,8 +49,8 @@ class ServeMaster: requires all implementations here to be idempotent. """ - async def __init__(self, router_class, router_kwargs, start_http_proxy, - http_proxy_host, http_proxy_port, + async def __init__(self, router_policy, router_policy_kwargs, + start_http_proxy, http_proxy_host, http_proxy_port, metric_exporter_class): # Used to read/write checkpoints. # TODO(edoakes): namespace the master actor and its checkpoints. @@ -88,7 +89,7 @@ class ServeMaster: # If starting the actor for the first time, starts up the other system # components. If recovering, fetches their actor handles. self._get_or_start_metric_exporter(metric_exporter_class) - self._get_or_start_router(router_class, router_kwargs) + self._get_or_start_router(router_policy, router_policy_kwargs) if start_http_proxy: self._get_or_start_http_proxy(http_proxy_host, http_proxy_port) @@ -112,7 +113,7 @@ class ServeMaster: asyncio.get_event_loop().create_task( self._recover_from_checkpoint(checkpoint)) - def _get_or_start_router(self, router_class, router_kwargs): + def _get_or_start_router(self, policy, policy_kwargs): """Get the router belonging to this serve cluster. If the router does not already exist, it will be started. @@ -122,12 +123,12 @@ class ServeMaster: except ValueError: logger.info( "Starting router with name '{}'".format(SERVE_ROUTER_NAME)) - self.router = async_retryable(router_class).options( + self.router = async_retryable(ray.remote(Router)).options( detached=True, name=SERVE_ROUTER_NAME, max_concurrency=ASYNC_CONCURRENCY, max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION, - ).remote(**router_kwargs) + ).remote(policy, policy_kwargs) def get_router(self): """Returns a handle to the router managed by this actor.""" diff --git a/python/ray/serve/policy.py b/python/ray/serve/policy.py index 5d957677a..2ddebf03e 100644 --- a/python/ray/serve/policy.py +++ b/python/ray/serve/policy.py @@ -1,188 +1,168 @@ +from abc import ABCMeta, abstractmethod from enum import Enum import itertools import numpy as np -import ray -from ray.serve.router import Router from ray.serve.utils import logger -class RandomPolicyQueue(Router): +class RoutingPolicy: + """Defines the interface for a routing policy for a single endpoint. + + To add a new routing policy, a class should be defined that provides this + interface. The class may be stateful, in which case it may also want to + provide a non-default constructor. However, this state will be lost when + the policy is updated (e.g., a new backend is added). """ - A wrapper class for Random policy.This backend selection policy is - `Stateless` meaning the current decisions of selecting backend are - not dependent on previous decisions. Random policy (randomly) samples - backends based on backend weights for every query. This policy uses the - weights assigned to backends. + __metaclass__ = ABCMeta + + @abstractmethod + async def flush(self, endpoint_queue, backend_queues): + """Flush the endpoint queue into the given backend queues. + + This method should assign each query in the endpoint_queue to a + backend in the backend_queues. Queries are assigned by popping them + from the endpoint queue and pushing them onto a backend queue. The + method must also return a set of all backend tags so that the caller + knows which backend_queues to flush. + + Arguments: + endpoint_queue: asyncio.Queue containing queries to assign. + backend_queues: Dict(str, asyncio.Queue) mapping backend tags to + their corresponding query queues. + + Returns: + Set of backend tags that had queries added to their queues. + """ + assigned_backends = set() + return assigned_backends + + +class RandomPolicy(RoutingPolicy): + """ + A stateless policy that makes a weighted random decision to map each query + to a backend using the specified weights. """ - async def _flush_endpoint_queues(self): - # perform traffic splitting for requests - for endpoint, queue in self.endpoint_queues.items(): - # while there are incoming requests and there are backends - while queue.qsize() and len(self.traffic[endpoint]): - backend_names = list(self.traffic[endpoint].keys()) - backend_weights = list(self.traffic[endpoint].values()) - # randomly choose a backend for every query - chosen_backend = np.random.choice( - backend_names, replace=False, p=backend_weights).squeeze() - logger.debug("Matching endpoint {} to backend {}".format( - endpoint, chosen_backend)) + def __init__(self, traffic_dict): + self.backend_names = list(traffic_dict.keys()) + self.backend_weights = list(traffic_dict.values()) - request = await queue.get() - self.buffer_queues[chosen_backend].add(request) + async def flush(self, endpoint_queue, backend_queues): + if len(self.backend_names) == 0: + logger.info("No backends to assign traffic to.") + return set() + + assigned_backends = set() + while endpoint_queue.qsize(): + chosen_backend = np.random.choice( + self.backend_names, replace=False, + p=self.backend_weights).squeeze() + assigned_backends.add(chosen_backend) + backend_queues[chosen_backend].add(await endpoint_queue.get()) + + return assigned_backends -@ray.remote -class RandomPolicyQueueActor(RandomPolicyQueue): - pass +class RoundRobinPolicy(RoutingPolicy): + """A stateful policy that assigns queries in round-robin order.""" + + def __init__(self, traffic_dict): + # NOTE(edoakes): the backend weights are not used. + self.backend_names = list(traffic_dict.keys()) + # Saves the information about last assigned backend for every endpoint. + self.round_robin_iterator = itertools.cycle(self.backend_names) + + async def flush(self, endpoint_queue, backend_queues): + if len(self.backend_names) == 0: + logger.info("No backends to assign traffic to.") + return set() + + assigned_backends = set() + while endpoint_queue.qsize(): + chosen_backend = next(self.round_robin_iterator) + assigned_backends.add(chosen_backend) + backend_queues[chosen_backend].add(await endpoint_queue.get()) + + return assigned_backends -class RoundRobinPolicyQueue(Router): - """ - A wrapper class for RoundRobin policy. This backend selection policy - is `Stateful` meaning the current decisions of selecting backend are - dependent on previous decisions. RoundRobinPolicy assigns queries in - an interleaved manner to every backend serving for an endpoint. Consider - backend A,B linked to a endpoint. Now queries will be assigned to backends - in the following order - [ A, B, A, B ... ] . This policy doesn't use the - weights assigned to backends. +class PowerOfTwoPolicy(RoutingPolicy): + """A stateless policy that uses the "power of two" policy. + + For each query, two random backends are chosen. Of those two, the query is + assigned to the backend whose queue length is shorter. """ - # Saves the information about last assigned backend for every endpoint. - round_robin_iterator_map = {} + def __init__(self, traffic_dict): + self.backend_names = list(traffic_dict.keys()) + self.backend_weights = list(traffic_dict.values()) - async def set_traffic(self, endpoint, traffic_dict): - logger.debug("Setting traffic for endpoint %s to %s", endpoint, - traffic_dict) - self.traffic[endpoint] = traffic_dict - backend_names = list(self.traffic[endpoint].keys()) - self.round_robin_iterator_map[endpoint] = itertools.cycle( - backend_names) - await self.flush() + async def flush(self, endpoint_queue, backend_queues): + if len(self.backend_names) == 0: + logger.info("No backends to assign traffic to.") + return set() - async def _flush_endpoint_queues(self): - # perform traffic splitting for requests - for endpoint, queue in self.endpoint_queues.items(): - # if there are incoming requests and there are backends - if queue.qsize() and len(self.traffic[endpoint]): - while queue.qsize(): - # choose the next backend available from persistent - # information - chosen_backend = next( - self.round_robin_iterator_map[endpoint]) - request = await queue.get() - self.buffer_queues[chosen_backend].add(request) + assigned_backends = set() + while endpoint_queue.qsize(): + if len(self.backend_names) >= 2: + backend1, backend2 = np.random.choice( + self.backend_names, + 2, + replace=False, + p=self.backend_weights) - -@ray.remote -class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue): - pass - - -class PowerOfTwoPolicyQueue(Router): - """ - A wrapper class for powerOfTwo policy. This backend selection policy is - `Stateless` meaning the current decisions of selecting backend are - dependent on previous decisions. PowerOfTwo policy (randomly) samples two - backends (say Backend A,B among A,B,C) based on the backend weights - specified and chooses the backend which is less loaded. This policy uses - the weights assigned to backends. - """ - - async def _flush_endpoint_queues(self): - # perform traffic splitting for requests - for endpoint, queue in self.endpoint_queues.items(): - # while there are incoming requests and there are backends - while queue.qsize() and len(self.traffic[endpoint]): - backend_names = list(self.traffic[endpoint].keys()) - backend_weights = list(self.traffic[endpoint].values()) - if len(self.traffic[endpoint]) >= 2: - # randomly pick 2 backends - backend1, backend2 = np.random.choice( - backend_names, 2, replace=False, p=backend_weights) - - # see the length of buffer queues of the two backends - # and pick the one which has less no. of queries - # in the buffer - if (len(self.buffer_queues[backend1]) <= len( - self.buffer_queues[backend2])): - chosen_backend = backend1 - else: - chosen_backend = backend2 - logger.debug("[Power of two chocies] found two backends " - "{} and {}: choosing {}.".format( - backend1, backend2, chosen_backend)) + # Choose the backend that has a shorter queue. + if (len(backend_queues[backend1]) <= len( + backend_queues[backend2])): + chosen_backend = backend1 else: - chosen_backend = np.random.choice( - backend_names, replace=False, - p=backend_weights).squeeze() - request = await queue.get() - self.buffer_queues[chosen_backend].add(request) + chosen_backend = backend2 + else: + chosen_backend = np.random.choice( + self.backend_names, replace=False, + p=self.backend_weights).squeeze() + backend_queues[chosen_backend].add(await endpoint_queue.get()) + assigned_backends.add(chosen_backend) + + return assigned_backends -@ray.remote -class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue): - pass - - -class FixedPackingPolicyQueue(Router): - """ - A wrapper class for FixedPacking policy. This backend selection policy is - `Stateful` meaning the current decisions of selecting backend are dependent - on previous decisions. FixedPackingPolicy is k RoundRobin policy where - first packing_num queries are handled by 'backend-1' and next k queries are - handled by 'backend-2' and so on ... where 'backend-1' and 'backend-2' are - served by the same endpoint. This policy doesn't use the weights assigned - to backends. +class FixedPackingPolicy(RoutingPolicy): + """A stateful policy that uses a "fixed packing" policy. + The policy round-robins groups of packing_num queries across backends. For + example, the first packing_num queries are handled by backend-1, then the + next packing_num queries are handled by backend-2, etc. """ - async def __init__(self, packing_num=3): - # Saves the information about last assigned - # backend for every endpoint - self.fixed_packing_iterator_map = {} - self.packing_num = packing_num - await super().__init__() - - async def set_traffic(self, endpoint, traffic_dict): - logger.debug("Setting traffic for endpoint %s to %s", endpoint, - traffic_dict) - self.traffic[endpoint] = traffic_dict - backend_names = list(self.traffic[endpoint].keys()) - self.fixed_packing_iterator_map[endpoint] = itertools.cycle( + def __init__(self, traffic_dict, packing_num=3): + # NOTE(edoakes): the backend weights are not used. + self.backend_names = list(traffic_dict.keys()) + self.fixed_packing_iterator = itertools.cycle( itertools.chain.from_iterable( - itertools.repeat(x, self.packing_num) for x in backend_names)) - await self.flush() + itertools.repeat(x, self.packing_num) + for x in self.backend_names)) + self.packing_num = packing_num - async def _flush_endpoint_queues(self): - # perform traffic splitting for requests - for endpoint, queue in self.endpoint_queues.items(): - # if there are incoming requests and there are backends - if queue.qsize() and len(self.traffic[endpoint]): - while queue.qsize(): - # choose the next backend available from persistent - # information - chosen_backend = next( - self.fixed_packing_iterator_map[endpoint]) - request = await queue.get() - self.buffer_queues[chosen_backend].add(request) + async def flush(self, endpoint_queue, backend_queues): + if len(self.backend_names) == 0: + logger.info("No backends to assign traffic to.") + return set() + assigned_backends = set() + while endpoint_queue.qsize(): + chosen_backend = next(self.fixed_packing_iterator) + backend_queues[chosen_backend].add(await endpoint_queue.get()) + assigned_backends.add(chosen_backend) -@ray.remote -class FixedPackingPolicyQueueActor(FixedPackingPolicyQueue): - pass + return assigned_backends class RoutePolicy(Enum): - """ - A class for registering the backend selection policy. - Add a name and the corresponding class. - Serve will support the added policy and policy can be accessed - in `serve.init` method through name provided here. - """ - Random = RandomPolicyQueueActor - RoundRobin = RoundRobinPolicyQueueActor - PowerOfTwo = PowerOfTwoPolicyQueueActor - FixedPacking = FixedPackingPolicyQueueActor + """All builtin routing policies.""" + Random = RandomPolicy + RoundRobin = RoundRobinPolicy + PowerOfTwo = PowerOfTwoPolicy + FixedPacking = FixedPackingPolicy diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index b3551b495..ae94250ec 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -103,7 +103,7 @@ class Router: 3. When there is only 1 backend ready, we will only use that backend. """ - async def __init__(self): + async def __init__(self, policy, policy_kwargs): # Note: Several queues are used in the router # - When a request come in, it's placed inside its corresponding # endpoint_queue. @@ -114,6 +114,11 @@ class Router: # handles are dequed during the second stage of flush operation, # which assign queries in buffer_queue to actor handle. + # policy.RoutePolicy. + self.policy = policy + # kwargs to pass into the policy when it's constructed. + self.policy_kwargs = policy_kwargs + # -- Queues -- # # endpoint_name -> request queue @@ -123,12 +128,12 @@ class Router: self.worker_queues: DefaultDict[asyncio.Queue[ ray.actor.ActorHandle]] = defaultdict(asyncio.Queue) # backend_name -> worker payload queue - self.buffer_queues = defaultdict(blist.sortedlist) + self.backend_queues = defaultdict(blist.sortedlist) # -- Metadata -- # # endpoint_name -> traffic_policy - self.traffic = defaultdict(dict) + self.traffic = dict() # backend_name -> backend_config self.backend_info = dict() # replica tag -> worker_handle @@ -208,7 +213,8 @@ class Router: call_method=request_meta.call_method, async_future=asyncio.get_event_loop().create_future()) await self.endpoint_queues[endpoint].put(query) - await self.flush() + async with self.flush_lock: + await self.flush_endpoint_queue(endpoint) # Note: a future change can be to directly return the ObjectID from # replica task submission @@ -226,7 +232,6 @@ class Router: self.replicas[backend_replica_tag] = worker_handle logger.debug("New worker added for backend '{}'".format(backend_tag)) - # await worker_handle.ready.remote() await self.mark_worker_idle(backend_tag, backend_replica_tag) async def mark_worker_idle(self, backend_tag, backend_replica_tag): @@ -234,7 +239,8 @@ class Router: return await self.worker_queues[backend_tag].put(backend_replica_tag) - await self.flush() + async with self.flush_lock: + await self.flush_backend_queues([backend_tag]) async def remove_worker(self, backend_tag, replica_tag): backend_replica_tag = backend_tag + ":" + replica_tag @@ -261,14 +267,15 @@ class Router: async def set_traffic(self, endpoint, traffic_dict): logger.debug("Setting traffic for endpoint %s to %s", endpoint, traffic_dict) - self.traffic[endpoint] = traffic_dict - await self.flush() + async with self.flush_lock: + self.traffic[endpoint] = self.policy(traffic_dict, + **self.policy_kwargs) + await self.flush_endpoint_queue(endpoint) async def remove_endpoint(self, endpoint): logger.debug("Removing endpoint {}".format(endpoint)) async with self.flush_lock: - await self._flush_endpoint_queues() - await self._flush_buffer_queues() + await self.flush_endpoint_queue(endpoint) if endpoint in self.endpoint_queues: del self.endpoint_queues[endpoint] if endpoint in self.traffic: @@ -282,24 +289,22 @@ class Router: async def remove_backend(self, backend): logger.debug("Removing backend {}".format(backend)) async with self.flush_lock: - await self._flush_endpoint_queues() - await self._flush_buffer_queues() + await self.flush_backend_queues([backend]) if backend in self.backend_info: del self.backend_info[backend] if backend in self.worker_queues: del self.worker_queues[backend] - if backend in self.buffer_queues: - del self.buffer_queues[backend] + if backend in self.backend_queues: + del self.backend_queues[backend] - async def flush(self): - """In the default case, flush calls ._flush. - - When this class is a Ray actor, .flush can be scheduled as a remote - method invocation. - """ - async with self.flush_lock: - await self._flush_endpoint_queues() - await self._flush_buffer_queues() + async def flush_endpoint_queue(self, endpoint): + """Attempt to schedule any pending requests to available backends.""" + assert self.flush_lock.locked() + if endpoint not in self.traffic: + return + backends_to_flush = await self.traffic[endpoint].flush( + self.endpoint_queues[endpoint], self.backend_queues) + await self.flush_backend_queues(backends_to_flush) def _get_available_backends(self, endpoint): backends_in_policy = set(self.traffic[endpoint].keys()) @@ -310,44 +315,30 @@ class Router: } return list(backends_in_policy.intersection(available_workers)) - async def _flush_endpoint_queues(self): - """Selects the backend and puts the endpoint queue query to the buffer - Expected Implementation: - The implementer is expected to access and manipulate - self.endpoint_queues : dict[str,Deque] - self.buffer_queues : dict[str,sortedlist] - For registering the implemented policies register at policy.py - Expected Behavior: - the Deque of all endpoints in self.endpoint_queues linked with - atleast one backend must be empty irrespective of whatever - backend policy is implemented. - """ - raise NotImplementedError( - "This method should be implemented by child class.") + # Flushes the specified backend queues and assigns work to workers. + async def flush_backend_queues(self, backends_to_flush): + assert self.flush_lock.locked() + for backend in backends_to_flush: + # No workers available. + if self.worker_queues[backend].qsize() == 0: + continue + # No work to do. + if len(self.backend_queues[backend]) == 0: + continue - # Flushes the buffer queue and assigns work to workers. - async def _flush_buffer_queues(self): - for endpoint in self.traffic: - ready_backends = self._get_available_backends(endpoint) - for backend in ready_backends: - # no work available - if len(self.buffer_queues[backend]) == 0: - continue + buffer_queue = self.backend_queues[backend] + worker_queue = self.worker_queues[backend] - buffer_queue = self.buffer_queues[backend] - worker_queue = self.worker_queues[backend] + logger.debug("Assigning queries for backend {} with buffer " + "queue size {} and worker queue size {}".format( + backend, len(buffer_queue), worker_queue.qsize())) - logger.debug("Assigning queries for backend {} with buffer " - "queue size {} and worker queue size {}".format( - backend, len(buffer_queue), - worker_queue.qsize())) + max_batch_size = None + if backend in self.backend_info: + max_batch_size = self.backend_info[backend].max_batch_size - max_batch_size = None - if backend in self.backend_info: - max_batch_size = self.backend_info[backend].max_batch_size - - await self._assign_query_to_worker( - backend, buffer_queue, worker_queue, max_batch_size) + await self._assign_query_to_worker(backend, buffer_queue, + worker_queue, max_batch_size) async def _do_query(self, backend, backend_replica_tag, req): # If the worker died, this will be a RayActorError. Just return it and diff --git a/python/ray/serve/tests/test_backend_worker.py b/python/ray/serve/tests/test_backend_worker.py index 48a9128e5..97b701938 100644 --- a/python/ray/serve/tests/test_backend_worker.py +++ b/python/ray/serve/tests/test_backend_worker.py @@ -6,9 +6,10 @@ import numpy as np import ray from ray import serve import ray.serve.context as context -from ray.serve.policy import RoundRobinPolicyQueueActor +from ray.serve.policy import RoundRobinPolicy from ray.serve.backend_worker import create_backend_worker, wrap_to_ray_error from ray.serve.request_params import RequestMetadata +from ray.serve.router import Router from ray.serve.config import BackendConfig from ray.serve.exceptions import RayServeException @@ -42,7 +43,7 @@ async def test_runner_wraps_error(): async def test_runner_actor(serve_instance): - q = RoundRobinPolicyQueueActor.remote() + q = ray.remote(Router).remote(RoundRobinPolicy, {}) def echo(flask_request, i=None): return i @@ -63,7 +64,7 @@ async def test_runner_actor(serve_instance): async def test_ray_serve_mixin(serve_instance): - q = RoundRobinPolicyQueueActor.remote() + q = ray.remote(Router).remote(RoundRobinPolicy, {}) CONSUMER_NAME = "runner-cls" PRODUCER_NAME = "prod-cls" @@ -88,7 +89,7 @@ async def test_ray_serve_mixin(serve_instance): async def test_task_runner_check_context(serve_instance): - q = RoundRobinPolicyQueueActor.remote() + q = ray.remote(Router).remote(RoundRobinPolicy, {}) def echo(flask_request, i=None): # Accessing the flask_request without web context should throw. @@ -109,7 +110,7 @@ async def test_task_runner_check_context(serve_instance): async def test_task_runner_custom_method_single(serve_instance): - q = RoundRobinPolicyQueueActor.remote() + q = ray.remote(Router).remote(RoundRobinPolicy, {}) class NonBatcher: def a(self, _): @@ -143,7 +144,7 @@ async def test_task_runner_custom_method_single(serve_instance): async def test_task_runner_custom_method_batch(serve_instance): - q = RoundRobinPolicyQueueActor.remote() + q = ray.remote(Router).remote(RoundRobinPolicy, {}) @serve.accept_batch class Batcher: diff --git a/python/ray/serve/tests/test_router.py b/python/ray/serve/tests/test_router.py index 4bbf2b50f..444a7fa90 100644 --- a/python/ray/serve/tests/test_router.py +++ b/python/ray/serve/tests/test_router.py @@ -3,15 +3,15 @@ import asyncio import pytest import ray -from ray.serve.policy import ( - RandomPolicyQueue, RandomPolicyQueueActor, RoundRobinPolicyQueueActor, - PowerOfTwoPolicyQueueActor, FixedPackingPolicyQueueActor) +from ray.serve.policy import (RandomPolicy, RoundRobinPolicy, PowerOfTwoPolicy, + FixedPackingPolicy) +from ray.serve.router import Router from ray.serve.request_params import RequestMetadata pytestmark = pytest.mark.asyncio -def make_task_runner_mock(): +def mock_task_runner(): @ray.remote(num_cpus=0) class TaskRunnerMock: def __init__(self): @@ -35,13 +35,13 @@ def make_task_runner_mock(): return TaskRunnerMock.remote() -@pytest.fixture(scope="session") +@pytest.fixture def task_runner_mock_actor(): - yield make_task_runner_mock() + yield mock_task_runner() async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): - q = RandomPolicyQueueActor.remote() + q = ray.remote(Router).remote(RandomPolicy, {}) q.set_traffic.remote("svc", {"backend-single-prod": 1.0}) q.add_new_worker.remote("backend-single-prod", "replica-1", task_runner_mock_actor) @@ -57,7 +57,7 @@ async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): async def test_slo(serve_instance, task_runner_mock_actor): - q = RandomPolicyQueueActor.remote() + q = ray.remote(Router).remote(RandomPolicy, {}) await q.set_traffic.remote("svc", {"backend-slo": 1.0}) all_request_sent = [] @@ -81,7 +81,7 @@ async def test_slo(serve_instance, task_runner_mock_actor): async def test_alter_backend(serve_instance, task_runner_mock_actor): - q = RandomPolicyQueueActor.remote() + q = ray.remote(Router).remote(RandomPolicy, {}) await q.set_traffic.remote("svc", {"backend-alter": 1}) await q.add_new_worker.remote("backend-alter", "replica-1", @@ -99,13 +99,13 @@ async def test_alter_backend(serve_instance, task_runner_mock_actor): async def test_split_traffic_random(serve_instance, task_runner_mock_actor): - q = RandomPolicyQueueActor.remote() + q = ray.remote(Router).remote(RandomPolicy, {}) await q.set_traffic.remote("svc", { "backend-split": 0.5, "backend-split-2": 0.5 }) - runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)] + runner_1, runner_2 = [mock_task_runner() for _ in range(2)] await q.add_new_worker.remote("backend-split", "replica-1", runner_1) await q.add_new_worker.remote("backend-split-2", "replica-1", runner_2) @@ -122,10 +122,10 @@ async def test_split_traffic_random(serve_instance, task_runner_mock_actor): async def test_round_robin(serve_instance, task_runner_mock_actor): - q = RoundRobinPolicyQueueActor.remote() + q = ray.remote(Router).remote(RoundRobinPolicy, {}) await q.set_traffic.remote("svc", {"backend-rr": 0.5, "backend-rr-2": 0.5}) - runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)] + runner_1, runner_2 = [mock_task_runner() for _ in range(2)] # NOTE: this is the only difference between the # test_split_traffic_random and test_round_robin @@ -144,13 +144,14 @@ async def test_round_robin(serve_instance, task_runner_mock_actor): async def test_fixed_packing(serve_instance): packing_num = 4 - q = FixedPackingPolicyQueueActor.remote(packing_num=packing_num) + q = ray.remote(Router).remote(FixedPackingPolicy, + {"packing_num": packing_num}) await q.set_traffic.remote("svc", { "backend-fixed": 0.5, "backend-fixed-2": 0.5 }) - runner_1, runner_2 = (make_task_runner_mock() for _ in range(2)) + runner_1, runner_2 = (mock_task_runner() for _ in range(2)) # both the backends will get equal number of queries # as it is packed round robin await q.add_new_worker.remote("backend-fixed", "replica-1", runner_1) @@ -167,7 +168,7 @@ async def test_fixed_packing(serve_instance): async def test_power_of_two_choices(serve_instance): - q = PowerOfTwoPolicyQueueActor.remote() + q = ray.remote(Router).remote(PowerOfTwoPolicy, {}) enqueue_futures = [] # First, fill the queue for backend-1 with 3 requests @@ -185,7 +186,7 @@ async def test_power_of_two_choices(serve_instance): future = q.enqueue_request.remote(RequestMetadata("svc", None), "2") enqueue_futures.append(future) - runner_1, runner_2 = (make_task_runner_mock() for _ in range(2)) + runner_1, runner_2 = (mock_task_runner() for _ in range(2)) await q.add_new_worker.remote("backend-pow2", "replica-1", runner_1) await q.add_new_worker.remote("backend-pow2-2", "replica-1", runner_2) @@ -196,13 +197,12 @@ async def test_power_of_two_choices(serve_instance): async def test_queue_remove_replicas(serve_instance): - @ray.remote - class TestRandomPolicyQueueActor(RandomPolicyQueue): + class TestRouter(Router): def worker_queue_size(self, backend): return self.worker_queues["backend-remove"].qsize() - temp_actor = make_task_runner_mock() - q = TestRandomPolicyQueueActor.remote() + temp_actor = mock_task_runner() + q = ray.remote(TestRouter).remote(RandomPolicy, {}) await q.add_new_worker.remote("backend-remove", "replica-1", temp_actor) await q.remove_worker.remote("backend-remove", "replica-1") assert ray.get(q.worker_queue_size.remote("backend")) == 0