[serve] Push requests to workers instead of polling via dequeue_request (#7965)

This commit is contained in:
Edward Oakes
2020-04-10 14:47:03 -05:00
committed by GitHub
parent d8f5b52265
commit 7be7af11ab
7 changed files with 71 additions and 102 deletions
+1 -1
View File
@@ -14,7 +14,7 @@ from ray.serve.utils import block_until_http_ready
from ray.serve.exceptions import RayServeException, batch_annotation_not_found
from ray.serve.backend_config import BackendConfig
from ray.serve.policy import RoutePolicy
from ray.serve.queues import Query
from ray.serve.router import Query
from ray.serve.request_params import RequestMetadata
master_actor = None
+11 -39
View File
@@ -13,6 +13,8 @@ from ray.async_compat import sync_to_async
def create_backend_worker(func_or_class):
"""Creates a worker class wrapping the provided function or class."""
if inspect.isfunction(func_or_class):
is_function = True
elif inspect.isclass(func_or_class):
@@ -25,45 +27,21 @@ def create_backend_worker(func_or_class):
backend_tag,
replica_tag,
init_args,
self_handle=None,
router_handle=None,
start_running=True):
router_handle=None):
serve.init()
if is_function:
_callable = func_or_class
else:
_callable = func_or_class(*init_args)
if self_handle is None:
assert router_handle is None
if router_handle is None:
master_actor = serve.api._get_master_actor()
# TODO(edoakes): this is a hacky workaround because there is
# a race condition when the master starts up a worker: the
# master starts the worker then adds its handle to a local map
# and the worker queries the master for its own handle from
# that map. If there's a large enough delay in the master, the
# handle may not have been added to the map yet (seen in CI).
# This will be fixed soon when the router just pushes tasks to
# the workers instead of the workers indicating that they're
# available.
start = time.time()
while time.time() - start < 5:
try:
print("Calling get_backend_replica_config")
[self_handle], [router_handle] = ray.get(
master_actor.get_backend_replica_config.remote(
replica_tag))
print("Got get_backend_replica_config")
break
except ray.exceptions.RayTaskError:
pass
[router_handle] = ray.get(
master_actor.get_backend_worker_config.remote())
self.backend = RayServeWorker(backend_tag, _callable, self_handle,
self.backend = RayServeWorker(backend_tag, _callable,
router_handle, is_function)
if start_running:
self.backend.mark_idle_in_router()
def get_metrics(self):
return self.backend.get_metrics()
@@ -78,7 +56,8 @@ def create_backend_worker(func_or_class):
def wrap_to_ray_error(exception):
"""Utility method that catch and seal exceptions in execution"""
"""Utility method to wrap exceptions in user code."""
try:
# Raise and catch so we can access traceback.format_exc()
raise exception
@@ -95,13 +74,11 @@ def ensure_async(func):
class RayServeWorker:
"""Fetches requests and handles them with the provided callable."""
"""Handles requests with the provided callable."""
def __init__(self, name, _callable, self_handle, router_handle,
is_function):
def __init__(self, name, _callable, router_handle, is_function):
self.name = name
self.callable = _callable
self.self_handle = self_handle
self.router_handle = router_handle
self.is_function = is_function
@@ -124,10 +101,6 @@ class RayServeWorker:
},
}
def mark_idle_in_router(self):
# Tell the router that this worker can accept tasks.
self.router_handle.dequeue_request.remote(self.name, self.self_handle)
def get_runner_method(self, request_item):
method_name = request_item.call_method
if not hasattr(self.callable, method_name):
@@ -257,6 +230,5 @@ class RayServeWorker:
# re-assign to default values
serve_context.web = False
serve_context.batch_size = None
self.mark_idle_in_router()
return result
+4 -2
View File
@@ -105,8 +105,8 @@ class ServeMaster:
for _ in range(-delta_num_replicas):
self._remove_backend_replica(backend_tag)
async def get_backend_replica_config(self, replica_tag):
return [self.tag_to_actor_handles[replica_tag]], self.get_router()
async def get_backend_worker_config(self):
return self.get_router()
async def _start_backend_replica(self, backend_tag):
assert (backend_tag in self.backend_table.list_backends()
@@ -138,6 +138,8 @@ class ServeMaster:
# Wait for the worker to start up.
await worker_handle.ready.remote()
await self.get_router()[0].add_new_worker.remote(
backend_tag, worker_handle)
# Register the worker with the metric monitor.
self.get_metric_monitor()[0].add_target.remote(worker_handle)
+5 -5
View File
@@ -4,11 +4,11 @@ import itertools
import numpy as np
import ray
from ray.serve.queues import (CentralizedQueues)
from ray.serve.router import Router
from ray.serve.utils import logger
class RandomPolicyQueue(CentralizedQueues):
class RandomPolicyQueue(Router):
"""
A wrapper class for Random policy.This backend selection policy is
`Stateless` meaning the current decisions of selecting backend are
@@ -39,7 +39,7 @@ class RandomPolicyQueueActor(RandomPolicyQueue):
pass
class RoundRobinPolicyQueue(CentralizedQueues):
class RoundRobinPolicyQueue(Router):
"""
A wrapper class for RoundRobin policy. This backend selection policy
is `Stateful` meaning the current decisions of selecting backend are
@@ -81,7 +81,7 @@ class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue):
pass
class PowerOfTwoPolicyQueue(CentralizedQueues):
class PowerOfTwoPolicyQueue(Router):
"""
A wrapper class for powerOfTwo policy. This backend selection policy is
`Stateless` meaning the current decisions of selecting backend are
@@ -127,7 +127,7 @@ class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue):
pass
class FixedPackingPolicyQueue(CentralizedQueues):
class FixedPackingPolicyQueue(Router):
"""
A wrapper class for FixedPacking policy. This backend selection policy is
`Stateful` meaning the current decisions of selecting backend are dependent
@@ -82,28 +82,20 @@ def _make_future_unwrapper(client_futures: List[asyncio.Future],
return unwrap_future
class CentralizedQueues:
class Router:
"""A router that routes request to available workers.
Router accepts each request from the `enqueue_request` method and enqueues
it. It also accepts worker request to work (called work_intention in code)
from workers via the `dequeue_request` method. The traffic policy is used
to match requests with their corresponding workers.
The traffic policy is used to assign requests to workers.
Behavior:
>>> # psuedo-code
>>> queue = CentralizedQueues()
>>> queue.enqueue_request(
>>> router = Router()
>>> router.enqueue_request(
"service-name", request_args, request_kwargs, request_context)
# nothing happens, request is queued.
# returns result ObjectID, which will contains the final result
>>> queue.dequeue_request('backend-1', replica_handle)
# nothing happens, work intention is queued.
# return work ObjectID, which will contains the future request payload
>>> queue.link('service-name', 'backend-1')
# here the enqueue_requester is matched with replica, request
# data is put into work ObjectID, and the replica processes the request
# and store the result into result ObjectID
>>> router.add_new_worker("backend-1", worker_handle)
>>> router.link("service-name", "backend-1")
# the request is assigned to the worker
Traffic policy splits the traffic among different replicas
probabilistically:
@@ -197,10 +189,12 @@ class CentralizedQueues:
result = await query.async_future
return result
async def dequeue_request(self, backend, replica_handle):
logger.debug(
"Received a dequeue request for backend {}".format(backend))
await self.worker_queues[backend].put(replica_handle)
async def add_new_worker(self, backend, worker_handle):
logger.debug("New worker added for backend '{}'".format(backend))
await self.mark_worker_idle(backend, worker_handle)
async def mark_worker_idle(self, backend, worker_handle):
await self.worker_queues[backend].put(worker_handle)
await self.flush()
async def remove_and_destroy_replica(self, backend, replica_handle):
@@ -290,10 +284,16 @@ class CentralizedQueues:
max_batch_size = self.backend_info[backend][
"max_batch_size"]
await self._assign_query_to_worker(buffer_queue, worker_queue,
max_batch_size)
await self._assign_query_to_worker(
backend, buffer_queue, worker_queue, max_batch_size)
async def _do_query(self, backend, worker, req):
result = await worker.handle_request.remote(req)
await self.mark_worker_idle(backend, worker)
return result
async def _assign_query_to_worker(self,
backend,
buffer_queue,
worker_queue,
max_batch_size=None):
@@ -302,7 +302,8 @@ class CentralizedQueues:
worker = await worker_queue.get()
if max_batch_size is None: # No batching
request = buffer_queue.pop(0)
future = worker.handle_request.remote(request).as_future()
future = asyncio.get_event_loop().create_task(
self._do_query(backend, worker, request))
# chaining satisfies request.async_future with future result.
asyncio.futures._chain_future(future, request.async_future)
else:
@@ -317,7 +318,8 @@ class CentralizedQueues:
requests_group[request.call_method].append(request)
for group in requests_group.values():
future = worker.handle_request.remote(group).as_future()
future = asyncio.get_event_loop().create_task(
self._do_query(backend, worker, group))
future.add_done_callback(
_make_future_unwrapper(
client_futures=[req.async_future for req in group],
+12 -14
View File
@@ -19,14 +19,12 @@ def setup_worker(name, func_or_class, router_handle, init_args=None):
@ray.remote
class WorkerActor:
def setup(self, self_handle, router_handle):
def __init__(self, router_handle):
self.worker = create_backend_worker(func_or_class)(
name,
name + ":tag",
init_args,
self_handle=self_handle[0],
router_handle=router_handle[0],
start_running=False)
name, name + ":tag", init_args, router_handle=router_handle[0])
def ready(self):
pass
def get_metrics(self):
return self.worker.get_metrics()
@@ -37,8 +35,8 @@ def setup_worker(name, func_or_class, router_handle, init_args=None):
async def handle_request(self, *args, **kwargs):
return await self.worker.handle_request(*args, **kwargs)
worker = WorkerActor.remote()
ray.get(worker.setup.remote([worker], [router_handle]))
worker = WorkerActor.remote([router_handle])
ray.get(worker.ready.remote())
return worker
@@ -57,7 +55,7 @@ async def test_runner_actor(serve_instance):
PRODUCER_NAME = "prod"
worker = setup_worker(CONSUMER_NAME, echo, q)
await worker.run.remote()
await q.add_new_worker.remote(CONSUMER_NAME, worker)
q.link.remote(PRODUCER_NAME, CONSUMER_NAME)
@@ -82,7 +80,7 @@ async def test_ray_serve_mixin(serve_instance):
return i + self.increment
worker = setup_worker(CONSUMER_NAME, MyAdder, q, init_args=(3, ))
await worker.run.remote()
await q.add_new_worker.remote(CONSUMER_NAME, worker)
q.link.remote(PRODUCER_NAME, CONSUMER_NAME)
@@ -104,7 +102,7 @@ async def test_task_runner_check_context(serve_instance):
PRODUCER_NAME = "producer"
worker = setup_worker(CONSUMER_NAME, echo, q)
await worker.run.remote()
await q.add_new_worker.remote(CONSUMER_NAME, worker)
q.link.remote(PRODUCER_NAME, CONSUMER_NAME)
query_param = RequestMetadata(PRODUCER_NAME, context.TaskContext.Python)
@@ -128,7 +126,7 @@ async def test_task_runner_custom_method_single(serve_instance):
PRODUCER_NAME = "producer"
worker = setup_worker(CONSUMER_NAME, NonBatcher, q)
await worker.run.remote()
await q.add_new_worker.remote(CONSUMER_NAME, worker)
q.link.remote(PRODUCER_NAME, CONSUMER_NAME)
@@ -176,7 +174,7 @@ async def test_task_runner_custom_method_batch(serve_instance):
futures = [q.enqueue_request.remote(a_query_param) for _ in range(2)]
futures += [q.enqueue_request.remote(b_query_param) for _ in range(2)]
await worker.run.remote()
await q.add_new_worker.remote(CONSUMER_NAME, worker)
gathered = await asyncio.gather(*futures)
assert set(gathered) == {"a-0", "a-1", "b-0", "b-1"}
@@ -40,7 +40,7 @@ def task_runner_mock_actor():
async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
q = RandomPolicyQueueActor.remote()
q.link.remote("svc", "backend")
q.dequeue_request.remote("backend", task_runner_mock_actor)
q.add_new_worker.remote("backend", task_runner_mock_actor)
# Make sure we get the request result back
result = await q.enqueue_request.remote(RequestMetadata("svc", None), 1)
@@ -63,8 +63,7 @@ async def test_slo(serve_instance, task_runner_mock_actor):
q.enqueue_request.remote(
RequestMetadata("svc", None, relative_slo_ms=slo_ms), i))
for i in range(10):
await q.dequeue_request.remote("backend", task_runner_mock_actor)
await q.add_new_worker.remote("backend", task_runner_mock_actor)
await asyncio.gather(*all_request_sent)
@@ -80,13 +79,13 @@ async def test_alter_backend(serve_instance, task_runner_mock_actor):
q = RandomPolicyQueueActor.remote()
await q.set_traffic.remote("svc", {"backend-1": 1})
await q.dequeue_request.remote("backend-1", task_runner_mock_actor)
await q.add_new_worker.remote("backend-1", task_runner_mock_actor)
await q.enqueue_request.remote(RequestMetadata("svc", None), 1)
got_work = await task_runner_mock_actor.get_recent_call.remote()
assert got_work.request_args[0] == 1
await q.set_traffic.remote("svc", {"backend-2": 1})
await q.dequeue_request.remote("backend-2", task_runner_mock_actor)
await q.add_new_worker.remote("backend-2", task_runner_mock_actor)
await q.enqueue_request.remote(RequestMetadata("svc", None), 2)
got_work = await task_runner_mock_actor.get_recent_call.remote()
assert got_work.request_args[0] == 2
@@ -97,9 +96,8 @@ async def test_split_traffic_random(serve_instance, task_runner_mock_actor):
await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5})
runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)]
for _ in range(20):
await q.dequeue_request.remote("backend-1", runner_1)
await q.dequeue_request.remote("backend-2", runner_2)
await q.add_new_worker.remote("backend-1", runner_1)
await q.add_new_worker.remote("backend-2", runner_2)
# assume 50% split, the probability of all 20 requests goes to a
# single queue is 0.5^20 ~ 1-6
@@ -121,9 +119,8 @@ async def test_round_robin(serve_instance, task_runner_mock_actor):
# NOTE: this is the only difference between the
# test_split_traffic_random and test_round_robin
for _ in range(10):
await q.dequeue_request.remote("backend-1", runner_1)
await q.dequeue_request.remote("backend-2", runner_2)
await q.add_new_worker.remote("backend-1", runner_1)
await q.add_new_worker.remote("backend-2", runner_2)
for _ in range(20):
await q.enqueue_request.remote(RequestMetadata("svc", None), 1)
@@ -143,9 +140,8 @@ async def test_fixed_packing(serve_instance):
runner_1, runner_2 = (make_task_runner_mock() for _ in range(2))
# both the backends will get equal number of queries
# as it is packed round robin
for _ in range(packing_num):
await q.dequeue_request.remote("backend-1", runner_1)
await q.dequeue_request.remote("backend-2", runner_2)
await q.add_new_worker.remote("backend-1", runner_1)
await q.add_new_worker.remote("backend-2", runner_2)
for backend, runner in zip(["1", "2"], [runner_1, runner_2]):
for _ in range(packing_num):
@@ -174,9 +170,8 @@ async def test_power_of_two_choices(serve_instance):
enqueue_futures.append(future)
runner_1, runner_2 = (make_task_runner_mock() for _ in range(2))
for _ in range(3):
await q.dequeue_request.remote("backend-1", runner_1)
await q.dequeue_request.remote("backend-2", runner_2)
await q.add_new_worker.remote("backend-1", runner_1)
await q.add_new_worker.remote("backend-2", runner_2)
await asyncio.gather(*enqueue_futures)
@@ -187,6 +182,6 @@ async def test_power_of_two_choices(serve_instance):
async def test_queue_remove_replicas(serve_instance):
temp_actor = make_task_runner_mock()
q = RandomPolicyQueue()
await q.dequeue_request("backend", temp_actor)
await q.add_new_worker("backend", temp_actor)
await q.remove_and_destroy_replica("backend", temp_actor)
assert q.worker_queues["backend"].qsize() == 0