diff --git a/python/ray/experimental/serve/queues.py b/python/ray/experimental/serve/queues.py index 175533d64..f72d87f32 100644 --- a/python/ray/experimental/serve/queues.py +++ b/python/ray/experimental/serve/queues.py @@ -73,19 +73,22 @@ class CentralizedQueues: # service_name -> traffic_policy self.traffic = defaultdict(dict) - # backend_name -> worker queue + # backend_name -> worker request queue self.workers = defaultdict(deque) + # backend_name -> worker payload queue + self.buffer_queues = defaultdict(deque) + def is_ready(self): return True def _serve_metric(self): return { - "service_{}_queue_size".format(service_name): { + "backend_{}_queue_size".format(backend_name): { "value": len(queue), "type": "counter", } - for service_name, queue in self.queues.items() + for backend_name, queue in self.buffer_queues.items() } def enqueue_request(self, service, request_args, request_kwargs, @@ -129,39 +132,36 @@ class CentralizedQueues: return list(backends_in_policy.intersection(available_workers)) def _flush(self): + # perform traffic splitting for requests for service, queue in self.queues.items(): + # while there are incoming requests and there are backends + while len(queue) and len(self.traffic[service]): + backend_names = list(self.traffic[service].keys()) + backend_weights = list(self.traffic[service].values()) + chosen_backend = np.random.choice( + backend_names, p=backend_weights).squeeze() + + request = queue.popleft() + self.buffer_queues[chosen_backend].append(request) + + # distach buffer queues to work queues + for service in self.queues.keys(): ready_backends = self._get_available_backends(service) + for backend in ready_backends: + # no work available + if len(self.buffer_queues[backend]) == 0: + continue - while len(queue) and len(ready_backends): - # Fast path, only one backend available. - if len(ready_backends) == 1: - backend = ready_backends[0] - request, work = (queue.popleft(), - self.workers[backend].popleft()) - ray.worker.global_worker.put_object( - work.work_object_id, request) - - # We have more than one backend available. - # We will roll a dice among the multiple backends. - else: - backend_weights = np.array([ - self.traffic[service][backend_name] - for backend_name in ready_backends - ]) - # Normalize the weights to 1. - backend_weights /= backend_weights.sum() - chosen_backend = np.random.choice( - ready_backends, p=backend_weights).squeeze() - + buffer_queue = self.buffer_queues[backend] + work_queue = self.workers[backend] + while len(buffer_queue) and len(work_queue): request, work = ( - queue.popleft(), - self.workers[chosen_backend].popleft(), + buffer_queue.popleft(), + work_queue.popleft(), ) ray.worker.global_worker.put_object( work.work_object_id, request) - ready_backends = self._get_available_backends(service) - @ray.remote class CentralizedQueuesActor(CentralizedQueues): diff --git a/python/ray/experimental/serve/tests/test_queue.py b/python/ray/experimental/serve/tests/test_queue.py index 7f89cfbce..49bd05103 100644 --- a/python/ray/experimental/serve/tests/test_queue.py +++ b/python/ray/experimental/serve/tests/test_queue.py @@ -19,17 +19,17 @@ def test_single_prod_cons_queue(serve_instance): def test_alter_backend(serve_instance): q = CentralizedQueues() + q.set_traffic("svc", {"backend-1": 1}) result_object_id = q.enqueue_request("svc", 1, "kwargs", None) work_object_id = q.dequeue_request("backend-1") - q.set_traffic("svc", {"backend-1": 1}) got_work = ray.get(ray.ObjectID(work_object_id)) assert got_work.request_args == 1 ray.worker.global_worker.put_object(got_work.result_object_id, 2) assert ray.get(ray.ObjectID(result_object_id)) == 2 + q.set_traffic("svc", {"backend-2": 1}) result_object_id = q.enqueue_request("svc", 1, "kwargs", None) work_object_id = q.dequeue_request("backend-2") - q.set_traffic("svc", {"backend-2": 1}) got_work = ray.get(ray.ObjectID(work_object_id)) assert got_work.request_args == 1 ray.worker.global_worker.put_object(got_work.result_object_id, 2) @@ -39,12 +39,13 @@ def test_alter_backend(serve_instance): def test_split_traffic(serve_instance): q = CentralizedQueues() - q.enqueue_request("svc", 1, "kwargs", None) - q.enqueue_request("svc", 1, "kwargs", None) - q.set_traffic("svc", {}) + q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5}) + # assume 50% split, the probability of all 20 requests goes to a + # single queue is 0.5^20 ~ 1-6 + for _ in range(20): + q.enqueue_request("svc", 1, "kwargs", None) work_object_id_1 = q.dequeue_request("backend-1") work_object_id_2 = q.dequeue_request("backend-2") - q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5}) got_work = ray.get( [ray.ObjectID(work_object_id_1),