diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 53e01457e..4fe73ce66 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -14,7 +14,7 @@ from ray.serve.utils import block_until_http_ready from ray.serve.exceptions import RayServeException, batch_annotation_not_found from ray.serve.backend_config import BackendConfig from ray.serve.policy import RoutePolicy -from ray.serve.queues import Query +from ray.serve.router import Query from ray.serve.request_params import RequestMetadata master_actor = None diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index 6d500e446..d14263dbe 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -13,6 +13,8 @@ from ray.async_compat import sync_to_async def create_backend_worker(func_or_class): + """Creates a worker class wrapping the provided function or class.""" + if inspect.isfunction(func_or_class): is_function = True elif inspect.isclass(func_or_class): @@ -25,45 +27,21 @@ def create_backend_worker(func_or_class): backend_tag, replica_tag, init_args, - self_handle=None, - router_handle=None, - start_running=True): + router_handle=None): serve.init() if is_function: _callable = func_or_class else: _callable = func_or_class(*init_args) - if self_handle is None: - assert router_handle is None + if router_handle is None: master_actor = serve.api._get_master_actor() - # TODO(edoakes): this is a hacky workaround because there is - # a race condition when the master starts up a worker: the - # master starts the worker then adds its handle to a local map - # and the worker queries the master for its own handle from - # that map. If there's a large enough delay in the master, the - # handle may not have been added to the map yet (seen in CI). - # This will be fixed soon when the router just pushes tasks to - # the workers instead of the workers indicating that they're - # available. 
- start = time.time() - while time.time() - start < 5: - try: - print("Calling get_backend_replica_config") - [self_handle], [router_handle] = ray.get( - master_actor.get_backend_replica_config.remote( - replica_tag)) - print("Got get_backend_replica_config") - break - except ray.exceptions.RayTaskError: - pass + [router_handle] = ray.get( + master_actor.get_backend_worker_config.remote()) - self.backend = RayServeWorker(backend_tag, _callable, self_handle, + self.backend = RayServeWorker(backend_tag, _callable, router_handle, is_function) - if start_running: - self.backend.mark_idle_in_router() - def get_metrics(self): return self.backend.get_metrics() @@ -78,7 +56,8 @@ def create_backend_worker(func_or_class): def wrap_to_ray_error(exception): - """Utility method that catch and seal exceptions in execution""" + """Utility method to wrap exceptions in user code.""" + try: # Raise and catch so we can access traceback.format_exc() raise exception @@ -95,13 +74,11 @@ def ensure_async(func): class RayServeWorker: - """Fetches requests and handles them with the provided callable.""" + """Handles requests with the provided callable.""" - def __init__(self, name, _callable, self_handle, router_handle, - is_function): + def __init__(self, name, _callable, router_handle, is_function): self.name = name self.callable = _callable - self.self_handle = self_handle self.router_handle = router_handle self.is_function = is_function @@ -124,10 +101,6 @@ class RayServeWorker: }, } - def mark_idle_in_router(self): - # Tell the router that this worker can accept tasks. 
- self.router_handle.dequeue_request.remote(self.name, self.self_handle) - def get_runner_method(self, request_item): method_name = request_item.call_method if not hasattr(self.callable, method_name): @@ -257,6 +230,5 @@ class RayServeWorker: # re-assign to default values serve_context.web = False serve_context.batch_size = None - self.mark_idle_in_router() return result diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 8794161fd..1004fb2fb 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -105,8 +105,8 @@ class ServeMaster: for _ in range(-delta_num_replicas): self._remove_backend_replica(backend_tag) - async def get_backend_replica_config(self, replica_tag): - return [self.tag_to_actor_handles[replica_tag]], self.get_router() + async def get_backend_worker_config(self): + return self.get_router() async def _start_backend_replica(self, backend_tag): assert (backend_tag in self.backend_table.list_backends() @@ -138,6 +138,8 @@ class ServeMaster: # Wait for the worker to start up. await worker_handle.ready.remote() + await self.get_router()[0].add_new_worker.remote( + backend_tag, worker_handle) # Register the worker with the metric monitor. 
self.get_metric_monitor()[0].add_target.remote(worker_handle) diff --git a/python/ray/serve/policy.py b/python/ray/serve/policy.py index 66fd73e64..20e53533f 100644 --- a/python/ray/serve/policy.py +++ b/python/ray/serve/policy.py @@ -4,11 +4,11 @@ import itertools import numpy as np import ray -from ray.serve.queues import (CentralizedQueues) +from ray.serve.router import Router from ray.serve.utils import logger -class RandomPolicyQueue(CentralizedQueues): +class RandomPolicyQueue(Router): """ A wrapper class for Random policy.This backend selection policy is `Stateless` meaning the current decisions of selecting backend are @@ -39,7 +39,7 @@ class RandomPolicyQueueActor(RandomPolicyQueue): pass -class RoundRobinPolicyQueue(CentralizedQueues): +class RoundRobinPolicyQueue(Router): """ A wrapper class for RoundRobin policy. This backend selection policy is `Stateful` meaning the current decisions of selecting backend are @@ -81,7 +81,7 @@ class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue): pass -class PowerOfTwoPolicyQueue(CentralizedQueues): +class PowerOfTwoPolicyQueue(Router): """ A wrapper class for powerOfTwo policy. This backend selection policy is `Stateless` meaning the current decisions of selecting backend are @@ -127,7 +127,7 @@ class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue): pass -class FixedPackingPolicyQueue(CentralizedQueues): +class FixedPackingPolicyQueue(Router): """ A wrapper class for FixedPacking policy. 
This backend selection policy is `Stateful` meaning the current decisions of selecting backend are dependent diff --git a/python/ray/serve/queues.py b/python/ray/serve/router.py similarity index 89% rename from python/ray/serve/queues.py rename to python/ray/serve/router.py index e872b03a0..0136c5838 100644 --- a/python/ray/serve/queues.py +++ b/python/ray/serve/router.py @@ -82,28 +82,20 @@ def _make_future_unwrapper(client_futures: List[asyncio.Future], return unwrap_future -class CentralizedQueues: +class Router: """A router that routes request to available workers. - Router accepts each request from the `enqueue_request` method and enqueues - it. It also accepts worker request to work (called work_intention in code) - from workers via the `dequeue_request` method. The traffic policy is used - to match requests with their corresponding workers. + The traffic policy is used to assign requests to workers. Behavior: >>> # psuedo-code - >>> queue = CentralizedQueues() - >>> queue.enqueue_request( + >>> router = Router() + >>> router.enqueue_request( "service-name", request_args, request_kwargs, request_context) # nothing happens, request is queued. - # returns result ObjectID, which will contains the final result - >>> queue.dequeue_request('backend-1', replica_handle) - # nothing happens, work intention is queued. 
- # return work ObjectID, which will contains the future request payload - >>> queue.link('service-name', 'backend-1') - # here the enqueue_requester is matched with replica, request - # data is put into work ObjectID, and the replica processes the request - # and store the result into result ObjectID + >>> router.add_new_worker("backend-1", worker_handle) + >>> router.link("service-name", "backend-1") + # the request is assigned to the worker Traffic policy splits the traffic among different replicas probabilistically: @@ -197,10 +189,12 @@ class CentralizedQueues: result = await query.async_future return result - async def dequeue_request(self, backend, replica_handle): - logger.debug( - "Received a dequeue request for backend {}".format(backend)) - await self.worker_queues[backend].put(replica_handle) + async def add_new_worker(self, backend, worker_handle): + logger.debug("New worker added for backend '{}'".format(backend)) + await self.mark_worker_idle(backend, worker_handle) + + async def mark_worker_idle(self, backend, worker_handle): + await self.worker_queues[backend].put(worker_handle) await self.flush() async def remove_and_destroy_replica(self, backend, replica_handle): @@ -290,10 +284,20 @@ class CentralizedQueues: max_batch_size = self.backend_info[backend][ "max_batch_size"] - await self._assign_query_to_worker(buffer_queue, worker_queue, - max_batch_size) + await self._assign_query_to_worker( + backend, buffer_queue, worker_queue, max_batch_size) + + async def _do_query(self, backend, worker, req): + """Run `req` on `worker`, then mark the worker idle again.""" + try: + return await worker.handle_request.remote(req) + finally: + # Re-queue the worker even if the request raised; otherwise + # one failing request removes it from rotation for good. + await self.mark_worker_idle(backend, worker) async def _assign_query_to_worker(self, + backend, buffer_queue, worker_queue, max_batch_size=None): @@ -302,7 +306,8 @@ class CentralizedQueues: worker = await worker_queue.get() if max_batch_size is None: # No batching request = buffer_queue.pop(0) - future = worker.handle_request.remote(request).as_future() + future = 
asyncio.get_event_loop().create_task( + self._do_query(backend, worker, request)) # chaining satisfies request.async_future with future result. asyncio.futures._chain_future(future, request.async_future) else: @@ -317,7 +322,8 @@ class CentralizedQueues: requests_group[request.call_method].append(request) for group in requests_group.values(): - future = worker.handle_request.remote(group).as_future() + future = asyncio.get_event_loop().create_task( + self._do_query(backend, worker, group)) future.add_done_callback( _make_future_unwrapper( client_futures=[req.async_future for req in group], diff --git a/python/ray/serve/tests/test_backend_worker.py b/python/ray/serve/tests/test_backend_worker.py index a126084fa..de0ea4fb9 100644 --- a/python/ray/serve/tests/test_backend_worker.py +++ b/python/ray/serve/tests/test_backend_worker.py @@ -19,14 +19,12 @@ def setup_worker(name, func_or_class, router_handle, init_args=None): @ray.remote class WorkerActor: - def setup(self, self_handle, router_handle): + def __init__(self, router_handle): self.worker = create_backend_worker(func_or_class)( - name, - name + ":tag", - init_args, - self_handle=self_handle[0], - router_handle=router_handle[0], - start_running=False) + name, name + ":tag", init_args, router_handle=router_handle[0]) + + def ready(self): + pass def get_metrics(self): return self.worker.get_metrics() @@ -37,8 +35,8 @@ def setup_worker(name, func_or_class, router_handle, init_args=None): async def handle_request(self, *args, **kwargs): return await self.worker.handle_request(*args, **kwargs) - worker = WorkerActor.remote() - ray.get(worker.setup.remote([worker], [router_handle])) + worker = WorkerActor.remote([router_handle]) + ray.get(worker.ready.remote()) return worker @@ -57,7 +55,7 @@ async def test_runner_actor(serve_instance): PRODUCER_NAME = "prod" worker = setup_worker(CONSUMER_NAME, echo, q) - await worker.run.remote() + await q.add_new_worker.remote(CONSUMER_NAME, worker) q.link.remote(PRODUCER_NAME, 
CONSUMER_NAME) @@ -82,7 +80,7 @@ async def test_ray_serve_mixin(serve_instance): return i + self.increment worker = setup_worker(CONSUMER_NAME, MyAdder, q, init_args=(3, )) - await worker.run.remote() + await q.add_new_worker.remote(CONSUMER_NAME, worker) q.link.remote(PRODUCER_NAME, CONSUMER_NAME) @@ -104,7 +102,7 @@ async def test_task_runner_check_context(serve_instance): PRODUCER_NAME = "producer" worker = setup_worker(CONSUMER_NAME, echo, q) - await worker.run.remote() + await q.add_new_worker.remote(CONSUMER_NAME, worker) q.link.remote(PRODUCER_NAME, CONSUMER_NAME) query_param = RequestMetadata(PRODUCER_NAME, context.TaskContext.Python) @@ -128,7 +126,7 @@ async def test_task_runner_custom_method_single(serve_instance): PRODUCER_NAME = "producer" worker = setup_worker(CONSUMER_NAME, NonBatcher, q) - await worker.run.remote() + await q.add_new_worker.remote(CONSUMER_NAME, worker) q.link.remote(PRODUCER_NAME, CONSUMER_NAME) @@ -176,7 +174,7 @@ async def test_task_runner_custom_method_batch(serve_instance): futures = [q.enqueue_request.remote(a_query_param) for _ in range(2)] futures += [q.enqueue_request.remote(b_query_param) for _ in range(2)] - await worker.run.remote() + await q.add_new_worker.remote(CONSUMER_NAME, worker) gathered = await asyncio.gather(*futures) assert set(gathered) == {"a-0", "a-1", "b-0", "b-1"} diff --git a/python/ray/serve/tests/test_queue.py b/python/ray/serve/tests/test_router.py similarity index 85% rename from python/ray/serve/tests/test_queue.py rename to python/ray/serve/tests/test_router.py index c368dc718..0fe403b25 100644 --- a/python/ray/serve/tests/test_queue.py +++ b/python/ray/serve/tests/test_router.py @@ -40,7 +40,7 @@ def task_runner_mock_actor(): async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): q = RandomPolicyQueueActor.remote() q.link.remote("svc", "backend") - q.dequeue_request.remote("backend", task_runner_mock_actor) + q.add_new_worker.remote("backend", task_runner_mock_actor) # Make 
sure we get the request result back result = await q.enqueue_request.remote(RequestMetadata("svc", None), 1) @@ -63,8 +63,7 @@ async def test_slo(serve_instance, task_runner_mock_actor): q.enqueue_request.remote( RequestMetadata("svc", None, relative_slo_ms=slo_ms), i)) - for i in range(10): - await q.dequeue_request.remote("backend", task_runner_mock_actor) + await q.add_new_worker.remote("backend", task_runner_mock_actor) await asyncio.gather(*all_request_sent) @@ -80,13 +79,13 @@ async def test_alter_backend(serve_instance, task_runner_mock_actor): q = RandomPolicyQueueActor.remote() await q.set_traffic.remote("svc", {"backend-1": 1}) - await q.dequeue_request.remote("backend-1", task_runner_mock_actor) + await q.add_new_worker.remote("backend-1", task_runner_mock_actor) await q.enqueue_request.remote(RequestMetadata("svc", None), 1) got_work = await task_runner_mock_actor.get_recent_call.remote() assert got_work.request_args[0] == 1 await q.set_traffic.remote("svc", {"backend-2": 1}) - await q.dequeue_request.remote("backend-2", task_runner_mock_actor) + await q.add_new_worker.remote("backend-2", task_runner_mock_actor) await q.enqueue_request.remote(RequestMetadata("svc", None), 2) got_work = await task_runner_mock_actor.get_recent_call.remote() assert got_work.request_args[0] == 2 @@ -97,9 +96,8 @@ async def test_split_traffic_random(serve_instance, task_runner_mock_actor): await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5}) runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)] - for _ in range(20): - await q.dequeue_request.remote("backend-1", runner_1) - await q.dequeue_request.remote("backend-2", runner_2) + await q.add_new_worker.remote("backend-1", runner_1) + await q.add_new_worker.remote("backend-2", runner_2) # assume 50% split, the probability of all 20 requests goes to a # single queue is 0.5^20 ~ 1-6 @@ -121,9 +119,8 @@ async def test_round_robin(serve_instance, task_runner_mock_actor): # NOTE: this is the only 
difference between the # test_split_traffic_random and test_round_robin - for _ in range(10): - await q.dequeue_request.remote("backend-1", runner_1) - await q.dequeue_request.remote("backend-2", runner_2) + await q.add_new_worker.remote("backend-1", runner_1) + await q.add_new_worker.remote("backend-2", runner_2) for _ in range(20): await q.enqueue_request.remote(RequestMetadata("svc", None), 1) @@ -143,9 +140,8 @@ async def test_fixed_packing(serve_instance): runner_1, runner_2 = (make_task_runner_mock() for _ in range(2)) # both the backends will get equal number of queries # as it is packed round robin - for _ in range(packing_num): - await q.dequeue_request.remote("backend-1", runner_1) - await q.dequeue_request.remote("backend-2", runner_2) + await q.add_new_worker.remote("backend-1", runner_1) + await q.add_new_worker.remote("backend-2", runner_2) for backend, runner in zip(["1", "2"], [runner_1, runner_2]): for _ in range(packing_num): @@ -174,9 +170,8 @@ async def test_power_of_two_choices(serve_instance): enqueue_futures.append(future) runner_1, runner_2 = (make_task_runner_mock() for _ in range(2)) - for _ in range(3): - await q.dequeue_request.remote("backend-1", runner_1) - await q.dequeue_request.remote("backend-2", runner_2) + await q.add_new_worker.remote("backend-1", runner_1) + await q.add_new_worker.remote("backend-2", runner_2) await asyncio.gather(*enqueue_futures) @@ -187,6 +182,6 @@ async def test_power_of_two_choices(serve_instance): async def test_queue_remove_replicas(serve_instance): temp_actor = make_task_runner_mock() q = RandomPolicyQueue() - await q.dequeue_request("backend", temp_actor) + await q.add_new_worker("backend", temp_actor) await q.remove_and_destroy_replica("backend", temp_actor) assert q.worker_queues["backend"].qsize() == 0