diff --git a/python/ray/async_compat.py b/python/ray/async_compat.py index 23540e23c..df103f5df 100644 --- a/python/ray/async_compat.py +++ b/python/ray/async_compat.py @@ -87,10 +87,12 @@ def get_async(object_id): if object_id.is_direct_call_type(): inner_future = loop.create_future() + # We must add the done_callback before sending to in_memory_store_get + inner_future.add_done_callback(done_callback) core_worker.in_memory_store_get_async(object_id, inner_future) else: inner_future = as_future(object_id) - inner_future.add_done_callback(done_callback) + inner_future.add_done_callback(done_callback) # A hack to keep reference to inner_future so it doesn't get GC. user_future.inner_future = inner_future # A hack to keep a reference to the object ID for ref counting. diff --git a/python/ray/experimental/serve/__init__.py b/python/ray/experimental/serve/__init__.py index e4af4c373..6ebfec9cd 100644 --- a/python/ray/experimental/serve/__init__.py +++ b/python/ray/experimental/serve/__init__.py @@ -2,10 +2,10 @@ from ray.experimental.serve.backend_config import BackendConfig from ray.experimental.serve.policy import RoutePolicy from ray.experimental.serve.api import ( init, create_backend, create_endpoint, link, split, get_handle, stat, - set_backend_config, get_backend_config, accept_batch) # noqa: E402 + set_backend_config, get_backend_config, accept_batch, route) # noqa: E402 __all__ = [ "init", "create_backend", "create_endpoint", "link", "split", "get_handle", "stat", "set_backend_config", "get_backend_config", "BackendConfig", - "RoutePolicy", "accept_batch" + "RoutePolicy", "accept_batch", "route" ] diff --git a/python/ray/experimental/serve/api.py b/python/ray/experimental/serve/api.py index e1ed0e2bd..c00f732d8 100644 --- a/python/ray/experimental/serve/api.py +++ b/python/ray/experimental/serve/api.py @@ -16,6 +16,7 @@ from ray.experimental.serve.utils import (block_until_http_ready, from ray.experimental.serve.exceptions import RayServeException from ray.experimental.serve.backend_config import BackendConfig from ray.experimental.serve.policy import RoutePolicy +from ray.experimental.serve.queues import Query global_state = None @@ -111,6 +112,10 @@ def init(kv_store_connector=None, except ValueError: pass + # Register serialization context once + ray.register_custom_serializer(Query, Query.ray_serialize, + Query.ray_deserialize) + if kv_store_path is None: _, kv_store_path = mkstemp() @@ -439,3 +444,16 @@ def stat(percentiles=[50, 90, 95], """ return ray.get(global_state.init_or_get_metric_monitor().collect.remote( percentiles, agg_windows_seconds)) + + +class route: + def __init__(self, url_route): + self.route = url_route + + def __call__(self, func_or_class): + name = func_or_class.__name__ + backend_tag = "{}:v0".format(name) + + create_backend(func_or_class, backend_tag) + create_endpoint(name, self.route) + link(name, backend_tag) diff --git a/python/ray/experimental/serve/constants.py b/python/ray/experimental/serve/constants.py index 1b15c1e96..1819e1825 100644 --- a/python/ray/experimental/serve/constants.py +++ b/python/ray/experimental/serve/constants.py @@ -8,10 +8,16 @@ SERVE_NURSERY_NAME = "SERVE_ACTOR_NURSERY" BOOTSTRAP_KV_STORE_CONN_KEY = "kv_store_connector" #: HTTP Address -DEFAULT_HTTP_ADDRESS = "http://0.0.0.0:8000" +DEFAULT_HTTP_ADDRESS = "http://127.0.0.1:8000" #: HTTP Host -DEFAULT_HTTP_HOST = "0.0.0.0" +DEFAULT_HTTP_HOST = "127.0.0.1" #: HTTP Port DEFAULT_HTTP_PORT = 8000 + +#: Max concurrency +ASYNC_CONCURRENCY = int(1e6) + +#: Default latency SLO +DEFAULT_LATENCY_SLO_MS = 1e9 diff --git a/python/ray/experimental/serve/examples/benchmark.py b/python/ray/experimental/serve/examples/benchmark.py new file mode 100644 index 000000000..6ed4c54cc --- /dev/null +++ b/python/ray/experimental/serve/examples/benchmark.py @@ -0,0 +1,33 @@ +from ray.experimental import serve +from ray.experimental.serve.constants import DEFAULT_HTTP_ADDRESS +import requests +import time +import pandas as pd +from tqdm import tqdm + +serve.init(blocking=True) + + +@serve.route("/noop") +def noop(_): + return "" + + +url = "{}/noop".format(DEFAULT_HTTP_ADDRESS) +while requests.get(url).status_code == 404: + time.sleep(1) + print("Waiting for noop route to showup.") + +latency = [] +for _ in tqdm(range(5200)): + start = time.perf_counter() + resp = requests.get(url) + end = time.perf_counter() + latency.append(end - start) + +# Remove initial samples +latency = latency[200:] + +series = pd.Series(latency) * 1000 +print("Latency for single noop backend (ms)") +print(series.describe(percentiles=[0.5, 0.9, 0.95, 0.99])) diff --git a/python/ray/experimental/serve/global_state.py b/python/ray/experimental/serve/global_state.py index a4c78e9e2..db04172ea 100644 --- a/python/ray/experimental/serve/global_state.py +++ b/python/ray/experimental/serve/global_state.py @@ -1,7 +1,7 @@ import ray from ray.experimental.serve.constants import ( BOOTSTRAP_KV_STORE_CONN_KEY, DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, - SERVE_NURSERY_NAME) + SERVE_NURSERY_NAME, ASYNC_CONCURRENCY) from ray.experimental.serve.kv_store_service import ( BackendTable, RoutingTable, TrafficPolicyTable) from ray.experimental.serve.metric import (MetricMonitor, @@ -37,9 +37,16 @@ class ActorNursery: self.bootstrap_state = dict() - def start_actor(self, actor_cls, tag, init_args=(), init_kwargs={}): + def start_actor(self, + actor_cls, + tag, + init_args=(), + init_kwargs={}, + is_asyncio=False): """Start an actor and add it to the nursery""" - handle = actor_cls.remote(*init_args, **init_kwargs) + max_concurrency = ASYNC_CONCURRENCY if is_asyncio else None + handle = (actor_cls.options(max_concurrency=max_concurrency).remote( + *init_args, **init_kwargs)) self.actor_handles[handle] = tag return [handle] @@ -137,8 +144,9 @@ class GlobalState: self.actor_nursery_handle.start_actor.remote( self.queueing_policy.value, init_kwargs=policy_kwargs, - tag=queue_actor_tag)) - handle.register_self_handle.remote(handle) + tag=queue_actor_tag, + is_asyncio=True)) + # handle.register_self_handle.remote(handle) self.refresh_actor_handle_cache() return self.actor_handle_cache[queue_actor_tag] diff --git a/python/ray/experimental/serve/handle.py b/python/ray/experimental/serve/handle.py index b4cd2eedf..bb93995e3 100644 --- a/python/ray/experimental/serve/handle.py +++ b/python/ray/experimental/serve/handle.py @@ -1,4 +1,3 @@ -import ray from ray.experimental import serve from ray.experimental.serve.context import TaskContext from ray.experimental.serve.exceptions import RayServeException @@ -48,14 +47,12 @@ class RayServeHandle: except ValueError as e: raise RayServeException(str(e)) - result_object_id_bytes = ray.get( - self.router_handle.enqueue_request.remote( - service=self.endpoint_name, - request_args=(), - request_kwargs=kwargs, - request_context=TaskContext.Python, - request_slo_ms=request_slo_ms)) - return ray.ObjectID(result_object_id_bytes) + return self.router_handle.enqueue_request.remote( + service=self.endpoint_name, + request_args=(), + request_kwargs=kwargs, + request_context=TaskContext.Python, + request_slo_ms=request_slo_ms) def get_traffic_policy(self): # TODO(simon): This method is implemented via checking global state diff --git a/python/ray/experimental/serve/policy.py b/python/ray/experimental/serve/policy.py index 91998f3ec..1b32283a2 100644 --- a/python/ray/experimental/serve/policy.py +++ b/python/ray/experimental/serve/policy.py @@ -1,7 +1,178 @@ from enum import Enum -from ray.experimental.serve.queues import ( - RoundRobinPolicyQueueActor, RandomPolicyQueueActor, - PowerOfTwoPolicyQueueActor, FixedPackingPolicyQueueActor) +import itertools + +import numpy as np + +import ray +from ray.experimental.serve.queues import (CentralizedQueues) +from ray.experimental.serve.utils import logger + + +class RandomPolicyQueue(CentralizedQueues): + """ + A wrapper class for Random policy.This backend selection policy is + `Stateless` meaning the current decisions of selecting backend are + not dependent on previous decisions. Random policy (randomly) samples + backends based on backend weights for every query. This policy uses the + weights assigned to backends. + """ + + async def _flush_service_queues(self): + # perform traffic splitting for requests + for service, queue in self.service_queues.items(): + # while there are incoming requests and there are backends + while queue.qsize() and len(self.traffic[service]): + backend_names = list(self.traffic[service].keys()) + backend_weights = list(self.traffic[service].values()) + # randomly choose a backend for every query + chosen_backend = np.random.choice( + backend_names, replace=False, p=backend_weights).squeeze() + logger.debug("Matching service {} to backend {}".format( + service, chosen_backend)) + + request = await queue.get() + self.buffer_queues[chosen_backend].add(request) + + +@ray.remote +class RandomPolicyQueueActor(RandomPolicyQueue): + pass + + +class RoundRobinPolicyQueue(CentralizedQueues): + """ + A wrapper class for RoundRobin policy. This backend selection policy + is `Stateful` meaning the current decisions of selecting backend are + dependent on previous decisions. RoundRobinPolicy assigns queries in + an interleaved manner to every backend serving for a service. Consider + backend A,B linked to a service. Now queries will be assigned to backends + in the following order - [ A, B, A, B ... ] . This policy doesn't use the + weights assigned to backends. + """ + + # Saves the information about last assigned + # backend for every service + round_robin_iterator_map = {} + + async def set_traffic(self, service, traffic_dict): + logger.debug("Setting traffic for service %s to %s", service, + traffic_dict) + self.traffic[service] = traffic_dict + backend_names = list(self.traffic[service].keys()) + self.round_robin_iterator_map[service] = itertools.cycle(backend_names) + await self.flush() + + async def _flush_service_queues(self): + # perform traffic splitting for requests + for service, queue in self.service_queues.items(): + # if there are incoming requests and there are backends + if queue.qsize() and len(self.traffic[service]): + while queue.qsize(): + # choose the next backend available from persistent + # information + chosen_backend = next( + self.round_robin_iterator_map[service]) + request = await queue.get() + self.buffer_queues[chosen_backend].add(request) + + +@ray.remote +class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue): + pass + + +class PowerOfTwoPolicyQueue(CentralizedQueues): + """ + A wrapper class for powerOfTwo policy. This backend selection policy is + `Stateless` meaning the current decisions of selecting backend are + dependent on previous decisions. PowerOfTwo policy (randomly) samples two + backends (say Backend A,B among A,B,C) based on the backend weights + specified and chooses the backend which is less loaded. This policy uses + the weights assigned to backends. + """ + + async def _flush_service_queues(self): + # perform traffic splitting for requests + for service, queue in self.service_queues.items(): + # while there are incoming requests and there are backends + while queue.qsize() and len(self.traffic[service]): + backend_names = list(self.traffic[service].keys()) + backend_weights = list(self.traffic[service].values()) + if len(self.traffic[service]) >= 2: + # randomly pick 2 backends + backend1, backend2 = np.random.choice( + backend_names, 2, replace=False, p=backend_weights) + + # see the length of buffer queues of the two backends + # and pick the one which has less no. of queries + # in the buffer + if (len(self.buffer_queues[backend1]) <= len( + self.buffer_queues[backend2])): + chosen_backend = backend1 + else: + chosen_backend = backend2 + logger.debug("[Power of two chocies] found two backends " + "{} and {}: choosing {}.".format( + backend1, backend2, chosen_backend)) + else: + chosen_backend = np.random.choice( + backend_names, replace=False, + p=backend_weights).squeeze() + request = await queue.get() + self.buffer_queues[chosen_backend].add(request) + + +@ray.remote +class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue): + pass + + +class FixedPackingPolicyQueue(CentralizedQueues): + """ + A wrapper class for FixedPacking policy. This backend selection policy is + `Stateful` meaning the current decisions of selecting backend are dependent + on previous decisions. FixedPackingPolicy is k RoundRobin policy where + first packing_num queries are handled by 'backend-1' and next k queries are + handled by 'backend-2' and so on ... where 'backend-1' and 'backend-2' are + served by the same service. This policy doesn't use the weights assigned to + backends. + + """ + + def __init__(self, packing_num=3): + # Saves the information about last assigned + # backend for every service + self.fixed_packing_iterator_map = {} + self.packing_num = packing_num + super().__init__() + + async def set_traffic(self, service, traffic_dict): + logger.debug("Setting traffic for service %s to %s", service, + traffic_dict) + self.traffic[service] = traffic_dict + backend_names = list(self.traffic[service].keys()) + self.fixed_packing_iterator_map[service] = itertools.cycle( + itertools.chain.from_iterable( + itertools.repeat(x, self.packing_num) for x in backend_names)) + await self.flush() + + async def _flush_service_queues(self): + # perform traffic splitting for requests + for service, queue in self.service_queues.items(): + # if there are incoming requests and there are backends + if queue.qsize() and len(self.traffic[service]): + while queue.qsize(): + # choose the next backend available from persistent + # information + chosen_backend = next( + self.fixed_packing_iterator_map[service]) + request = await queue.get() + self.buffer_queues[chosen_backend].add(request) + + +@ray.remote +class FixedPackingPolicyQueueActor(FixedPackingPolicyQueue): + pass class RoutePolicy(Enum): diff --git a/python/ray/experimental/serve/queues.py b/python/ray/experimental/serve/queues.py index b952ecad2..fed77faef 100644 --- a/python/ray/experimental/serve/queues.py +++ b/python/ray/experimental/serve/queues.py @@ -1,43 +1,94 @@ -from collections import defaultdict, deque +import asyncio +import copy +from collections import defaultdict +import time +from typing import DefaultDict, Union, List +import pickle -import numpy as np +# Note on choosing blist instead of stdlib heapq +# 1. pop operation should be O(1) (amortized) +# (helpful even for batched pop) +# 2. There should not be significant overhead in +# maintaining the sorted list. +# 3. The blist implementation is fast and uses C extensions. +import blist import ray from ray.experimental.serve.utils import logger -import itertools -from blist import sortedlist -import time +from ray.experimental.serve.constants import DEFAULT_LATENCY_SLO_MS class Query: - def __init__(self, - request_args, - request_kwargs, - request_context, - request_slo_ms, - result_object_id=None): + def __init__(self, request_args, request_kwargs, request_context, + request_slo_ms): self.request_args = request_args self.request_kwargs = request_kwargs self.request_context = request_context - if result_object_id is None: - self.result_object_id = ray.ObjectID.from_random() - else: - self.result_object_id = result_object_id + self.async_future = asyncio.get_event_loop().create_future() # Service level objective in milliseconds. This is expected to be the # absolute time since unix epoch. self.request_slo_ms = request_slo_ms + def ray_serialize(self): + # NOTE: this method is needed because Query need to be serialized and + # sent to the replica worker. However, after we send the query to + # replica worker the async_future is still needed to retrieve the final + # result. Therefore we need a way to pass the information to replica + # worker without removing async_future. + clone = copy.copy(self) + clone.async_future = None + # We can't use cloudpickle due to a recursion issue + return pickle.dumps(clone) + + @staticmethod + def ray_deserialize(value): + return pickle.loads(value) + # adding comparator fn for maintaining an # ascending order sorted list w.r.t request_slo_ms def __lt__(self, other): return self.request_slo_ms < other.request_slo_ms + def __repr__(self): + return "".format(self.request_args, + self.request_kwargs) -class WorkIntent: - def __init__(self, replica_handle): - self.replica_handle = replica_handle + +def _adjust_latency_slo(slo_ms: Union[float, int, None]) -> float: + """Normalize the input latency objective to absoluate timestamp. + + Input: + slo_ms(float, int, None): If value is None, then we use a high default + value so other queries can be prioritize and put in front of these + queries. + """ + if slo_ms is None: + slo_ms = DEFAULT_LATENCY_SLO_MS + current_time_ms = time.time() * 1000 + return current_time_ms + slo_ms + + +def _make_future_unwrapper(client_futures: List[asyncio.Future], + host_future: asyncio.Future): + """Distribute the result of host_future to each of client_future""" + for client_future in client_futures: + # Keep a reference to host future so the host future won't get + # garbage collected. + client_future.host_ref = host_future + + def unwrap_future(_): + result = host_future.result() + + if isinstance(result, list): + for client_future, result_item in zip(client_futures, result): + client_future.set_result(result_item) + else: # Result is an exception. + for client_future in client_futures: + client_future.set_result(result) + + return unwrap_future class CentralizedQueues: @@ -77,27 +128,44 @@ class CentralizedQueues: """ def __init__(self): + # Note: Several queues are used in the router + # - When a request come in, it's placed inside its corresponding + # service_queue. + # - The service_queue is dequed during flush operation, which moves + # the queries to backend buffer_queue. Here we match a request + # for a service to a backend given some policy. + # - The worker_queue is used to collect idle actor handle. These + # handles are dequed during the second stage of flush operation, + # which assign queries in buffer_queue to actor handle. + + # -- Queues -- # + # service_name -> request queue - self.queues = defaultdict(deque) + self.service_queues: DefaultDict[asyncio.Queue[Query]] = defaultdict( + asyncio.Queue) + # backend_name -> worker request queue + self.worker_queues: DefaultDict[asyncio.Queue[ + ray.actor.ActorHandle]] = defaultdict(asyncio.Queue) + # backend_name -> worker payload queue + self.buffer_queues = defaultdict(blist.sortedlist) + + # -- Metadata -- # # service_name -> traffic_policy self.traffic = defaultdict(dict) - # backend_name -> backend_config self.backend_info = dict() - # backend_name -> worker request queue - self.workers = defaultdict(deque) + # -- Synchronization -- # - # backend_name -> worker payload queue - # using blist sortedlist for deadline awareness - # blist is chosen because: - # 1. pop operation should be O(1) (amortized) - # (helpful even for batched pop) - # 2. There should not be significant overhead in - # maintaining the sorted list. - # 3. The blist implementation is fast and uses C extensions. - self.buffer_queues = defaultdict(sortedlist) + # This lock guarantee that only one flush operation can happen at a + # time. Without the lock, multiple flush operation can pop from the + # same buffer_queue and worker_queue and create deadlock. For example, + # an operation holding the only query and the other flush operation + # holding the only idle replica. Additionally, allowing only one flush + # operation at a time simplifies design overhead for custom queuing and + # batching polcies. + self.flush_lock = asyncio.Lock() def is_ready(self): return True @@ -111,79 +179,99 @@ class CentralizedQueues: for backend_name, queue in self.buffer_queues.items() } - # request_slo_ms is time specified in milliseconds till which the - # answer of the query should be calculated - def enqueue_request(self, - service, - request_args, - request_kwargs, - request_context, - request_slo_ms=None): - if request_slo_ms is None: - # if request_slo_ms is not specified then set it to a high level - request_slo_ms = 1e9 + async def enqueue_request(self, + service, + request_args, + request_kwargs, + request_context, + request_slo_ms=None): + logger.debug("Received a request for service {}".format(service)) - # add wall clock time to specify the deadline for completion of query - # this also assures FIFO behaviour if request_slo_ms is not specified - request_slo_ms += (time.time() * 1000) + request_slo_ms = _adjust_latency_slo(request_slo_ms) query = Query(request_args, request_kwargs, request_context, request_slo_ms) - self.queues[service].append(query) - self.flush() - return query.result_object_id.binary() + await self.service_queues[service].put(query) + await self.flush() - def dequeue_request(self, backend, replica_handle): - intention = WorkIntent(replica_handle) - self.workers[backend].append(intention) - self.flush() + # Note: a future change can be to directly return the ObjectID from + # replica task submission + result = await query.async_future + return result - def remove_and_destory_replica(self, backend, replica_handle): - # NOTE: this function scale by O(#replicas for the backend) - new_queue = deque() - target_id = replica_handle._actor_id + async def dequeue_request(self, backend, replica_handle): + logger.debug( + "Received a dequeue request for backend {}".format(backend)) + await self.worker_queues[backend].put(replica_handle) + await self.flush() - for work_intent in self.workers[backend]: - if work_intent.replica_handle._actor_id != target_id: - new_queue.append(work_intent) + async def remove_and_destory_replica(self, backend, replica_handle): + # We need this lock because we modify worker_queue here. + async with self.flush_lock: + old_queue = self.worker_queues[backend] + new_queue = asyncio.Queue() + target_id = replica_handle._actor_id - self.workers[backend] = new_queue + while not old_queue.empty(): + replica_handle = await old_queue.get() + if replica_handle._actor_id != target_id: + await new_queue.put(replica_handle) - replica_handle.__ray_terminate__.remote() + self.worker_queues[backend] = new_queue + # TODO: consider await this with timeout, or use ray_kill + replica_handle.__ray_terminate__.remote() - def link(self, service, backend): + async def link(self, service, backend): logger.debug("Link %s with %s", service, backend) - self.set_traffic(service, {backend: 1.0}) + await self.set_traffic(service, {backend: 1.0}) - def set_traffic(self, service, traffic_dict): + async def set_traffic(self, service, traffic_dict): logger.debug("Setting traffic for service %s to %s", service, traffic_dict) self.traffic[service] = traffic_dict - self.flush() + await self.flush() - def set_backend_config(self, backend, config_dict): + async def set_backend_config(self, backend, config_dict): logger.debug("Setting backend config for " "backend {} to {}".format(backend, config_dict)) self.backend_info[backend] = config_dict - def flush(self): + async def flush(self): """In the default case, flush calls ._flush. When this class is a Ray actor, .flush can be scheduled as a remote method invocation. """ - self._flush() + async with self.flush_lock: + await self._flush_service_queues() + await self._flush_buffer_queues() def _get_available_backends(self, service): backends_in_policy = set(self.traffic[service].keys()) available_workers = { backend - for backend, queues in self.workers.items() if len(queues) > 0 + for backend, queues in self.worker_queues.items() + if queues.qsize() > 0 } return list(backends_in_policy.intersection(available_workers)) + async def _flush_service_queues(self): + """Selects the backend and puts the service queue query to the buffer + Expected Implementation: + The implementer is expected to access and manipulate + self.service_queues : dict[str,Deque] + self.buffer_queues : dict[str,sortedlist] + For registering the implemented policies register at policy.py + Expected Behavior: + the Deque of all services in self.service_queues linked with + atleast one backend must be empty irrespective of whatever + backend policy is implemented. + """ + raise NotImplementedError( + "This method should be implemented by child class.") + # flushes the buffer queue and assigns work to workers - def _flush_buffer(self): - for service in self.queues.keys(): + async def _flush_buffer_queues(self): + for service in self.traffic.keys(): ready_backends = self._get_available_backends(service) for backend in ready_backends: # no work available @@ -191,224 +279,40 @@ class CentralizedQueues: continue buffer_queue = self.buffer_queues[backend] - work_queue = self.workers[backend] + worker_queue = self.worker_queues[backend] + + logger.debug("Assigning queries for backend {} with buffer " + "queue size {} and worker queue size {}".format( + backend, len(buffer_queue), + worker_queue.qsize())) + max_batch_size = None if backend in self.backend_info: max_batch_size = self.backend_info[backend][ "max_batch_size"] - while len(buffer_queue) and len(work_queue): - # get the work from work intent queue - work = work_queue.popleft() - # see if backend accepts batched queries - if max_batch_size is not None: - pop_size = min(len(buffer_queue), max_batch_size) - request = [ - buffer_queue.pop(0) for _ in range(pop_size) - ] - else: - request = buffer_queue.pop(0) + await self._assign_query_to_worker(buffer_queue, worker_queue, + max_batch_size) - work.replica_handle._ray_serve_call.remote(request) + async def _assign_query_to_worker(self, + buffer_queue, + worker_queue, + max_batch_size=None): - # selects the backend and puts the service queue query to the buffer - # different policies will implement different backend selection policies - def _flush_service_queue(self): - """ - Expected Implementation: - The implementer is expected to access and manipulate - self.queues : dict[str,Deque] - self.buffer_queues : dict[str,sortedlist] - For registering the implemented policies register at policy.py! - Expected Behavior: - the Deque of all services in self.queues linked with - atleast one backend must be empty irrespective of whatever - backend policy is implemented. - """ - pass - - # _flush function has to flush the service and buffer queues. - def _flush(self): - self._flush_service_queue() - self._flush_buffer() - - -class CentralizedQueuesActor(CentralizedQueues): - """ - A wrapper class for converting wrapper policy classes to ray - actors. This is needed to make `flush` call asynchronous. - """ - self_handle = None - - def register_self_handle(self, handle_to_this_actor): - self.self_handle = handle_to_this_actor - - def flush(self): - if self.self_handle: - self.self_handle._flush.remote() - else: - self._flush() - - -class RandomPolicyQueue(CentralizedQueues): - """ - A wrapper class for Random policy.This backend selection policy is - `Stateless` meaning the current decisions of selecting backend are - not dependent on previous decisions. Random policy (randomly) samples - backends based on backend weights for every query. This policy uses the - weights assigned to backends. - """ - - def _flush_service_queue(self): - # perform traffic splitting for requests - for service, queue in self.queues.items(): - # while there are incoming requests and there are backends - while len(queue) and len(self.traffic[service]): - backend_names = list(self.traffic[service].keys()) - backend_weights = list(self.traffic[service].values()) - # randomly choose a backend for every query - chosen_backend = np.random.choice( - backend_names, p=backend_weights).squeeze() - - request = queue.popleft() - self.buffer_queues[chosen_backend].add(request) - - -@ray.remote -class RandomPolicyQueueActor(RandomPolicyQueue, CentralizedQueuesActor): - pass - - -class RoundRobinPolicyQueue(CentralizedQueues): - """ - A wrapper class for RoundRobin policy. This backend selection policy - is `Stateful` meaning the current decisions of selecting backend are - dependent on previous decisions. RoundRobinPolicy assigns queries in - an interleaved manner to every backend serving for a service. Consider - backend A,B linked to a service. Now queries will be assigned to backends - in the following order - [ A, B, A, B ... ] . This policy doesn't use the - weights assigned to backends. - """ - - # Saves the information about last assigned - # backend for every service - round_robin_iterator_map = {} - - def set_traffic(self, service, traffic_dict): - logger.debug("Setting traffic for service %s to %s", service, - traffic_dict) - self.traffic[service] = traffic_dict - backend_names = list(self.traffic[service].keys()) - self.round_robin_iterator_map[service] = itertools.cycle(backend_names) - self.flush() - - def _flush_service_queue(self): - # perform traffic splitting for requests - for service, queue in self.queues.items(): - # if there are incoming requests and there are backends - if len(queue) and len(self.traffic[service]): - while len(queue): - # choose the next backend available from persistent - # information - chosen_backend = next( - self.round_robin_iterator_map[service]) - request = queue.popleft() - self.buffer_queues[chosen_backend].add(request) - - -@ray.remote -class RoundRobinPolicyQueueActor(RoundRobinPolicyQueue, - CentralizedQueuesActor): - pass - - -class PowerOfTwoPolicyQueue(CentralizedQueues): - """ - A wrapper class for powerOfTwo policy. This backend selection policy is - `Stateless` meaning the current decisions of selecting backend are - dependent on previous decisions. PowerOfTwo policy (randomly) samples two - backends (say Backend A,B among A,B,C) based on the backend weights - specified and chooses the backend which is less loaded. This policy uses - the weights assigned to backends. - """ - - def _flush_service_queue(self): - # perform traffic splitting for requests - for service, queue in self.queues.items(): - # while there are incoming requests and there are backends - while len(queue) and len(self.traffic[service]): - backend_names = list(self.traffic[service].keys()) - backend_weights = list(self.traffic[service].values()) - if len(self.traffic[service]) >= 2: - # randomly pick 2 backends - backend1, backend2 = np.random.choice( - backend_names, 2, p=backend_weights) - - # see the length of buffer queues of the two backends - # and pick the one which has less no. of queries - # in the buffer - if (len(self.buffer_queues[backend1]) <= len( - self.buffer_queues[backend2])): - chosen_backend = backend1 - else: - chosen_backend = backend2 - else: - chosen_backend = np.random.choice( - backend_names, p=backend_weights).squeeze() - request = queue.popleft() - self.buffer_queues[chosen_backend].add(request) - - -@ray.remote -class PowerOfTwoPolicyQueueActor(PowerOfTwoPolicyQueue, - CentralizedQueuesActor): - pass - - -class FixedPackingPolicyQueue(CentralizedQueues): - """ - A wrapper class for FixedPacking policy. This backend selection policy is - `Stateful` meaning the current decisions of selecting backend are dependent - on previous decisions. FixedPackingPolicy is k RoundRobin policy where - first packing_num queries are handled by 'backend-1' and next k queries are - handled by 'backend-2' and so on ... where 'backend-1' and 'backend-2' are - served by the same service. This policy doesn't use the weights assigned to - backends. - - """ - - def __init__(self, packing_num=3): - # Saves the information about last assigned - # backend for every service - self.fixed_packing_iterator_map = {} - self.packing_num = packing_num - super().__init__() - - def set_traffic(self, service, traffic_dict): - logger.debug("Setting traffic for service %s to %s", service, - traffic_dict) - self.traffic[service] = traffic_dict - backend_names = list(self.traffic[service].keys()) - self.fixed_packing_iterator_map[service] = itertools.cycle( - itertools.chain.from_iterable( - itertools.repeat(x, self.packing_num) for x in backend_names)) - self.flush() - - def _flush_service_queue(self): - # perform traffic splitting for requests - for service, queue in self.queues.items(): - # if there are incoming requests and there are backends - if len(queue) and len(self.traffic[service]): - while len(queue): - # choose the next backend available from persistent - # information - chosen_backend = next( - self.fixed_packing_iterator_map[service]) - request = queue.popleft() - self.buffer_queues[chosen_backend].add(request) - - -@ray.remote -class FixedPackingPolicyQueueActor(FixedPackingPolicyQueue, - CentralizedQueuesActor): - pass + while len(buffer_queue) and worker_queue.qsize(): + worker = await worker_queue.get() + if max_batch_size is None: # No batching + request = buffer_queue.pop(0) + future = worker._ray_serve_call.remote(request).as_future() + # chaining satisfies request.async_future with future result. + asyncio.futures._chain_future(future, request.async_future) + else: + real_batch_size = min(len(buffer_queue), max_batch_size) + requests = [ + buffer_queue.pop(0) for _ in range(real_batch_size) + ] + future = worker._ray_serve_call.remote(requests).as_future() + future.add_done_callback( + _make_future_unwrapper( + client_futures=[req.async_future for req in requests], + host_future=future)) diff --git a/python/ray/experimental/serve/server.py b/python/ray/experimental/serve/server.py index 1f33e9f44..d628d8f93 100644 --- a/python/ray/experimental/serve/server.py +++ b/python/ray/experimental/serve/server.py @@ -64,6 +64,7 @@ class HTTPProxy: self.serve_global_state = GlobalState() self.route_table_cache = dict() + self.route_checker_task = None self.route_checker_should_shutdown = False async def route_checker(self, interval): @@ -82,10 +83,11 @@ class HTTPProxy: message = await receive() if message["type"] == "lifespan.startup": await _async_init() - asyncio.ensure_future( + self.route_checker_task = asyncio.get_event_loop().create_task( self.route_checker(interval=HTTP_ROUTER_CHECKER_INTERVAL_S)) await send({"type": "lifespan.startup.complete"}) elif message["type"] == "lifespan.shutdown": + self.route_checker_task.cancel() self.route_checker_should_shutdown = True await send({"type": "lifespan.shutdown.complete"}) @@ -148,16 +150,14 @@ class HTTPProxy: await JSONResponse({"error": str(e)})(scope, receive, send) return - result_object_id_bytes = await ( - self.serve_global_state.init_or_get_router() - .enqueue_request.remote( - service=endpoint_name, - request_args=(scope, http_body_bytes), - request_kwargs=dict(), - request_context=TaskContext.Web, - request_slo_ms=request_slo_ms)) - - result = await ray.ObjectID(result_object_id_bytes) + actual_result = await (self.serve_global_state.init_or_get_router() + .enqueue_request.remote( + service=endpoint_name, + request_args=(scope, http_body_bytes), + request_kwargs=dict(), + request_context=TaskContext.Web, + request_slo_ms=request_slo_ms)) + result = actual_result if isinstance(result, ray.exceptions.RayTaskError): await JSONResponse({ diff --git a/python/ray/experimental/serve/task_runner.py b/python/ray/experimental/serve/task_runner.py index 40d8ef20c..5bbcfc946 100644 --- a/python/ray/experimental/serve/task_runner.py +++ b/python/ray/experimental/serve/task_runner.py @@ -95,19 +95,18 @@ class RayServeMixin: self._ray_serve_self_handle) def invoke_single(self, request_item): - args, kwargs, is_web_context, result_object_id = parse_request_item( - request_item) + args, kwargs, is_web_context = parse_request_item(request_item) serve_context.web = is_web_context start_timestamp = time.time() + try: result = self.__call__(*args, **kwargs) - ray.worker.global_worker.put_object(result, result_object_id) except Exception as e: - wrapped_exception = wrap_to_ray_error(e) + result = wrap_to_ray_error(e) self._serve_metric_error_counter += 1 - ray.worker.global_worker.put_object(wrapped_exception, - result_object_id) + self._serve_metric_latency_list.append(time.time() - start_timestamp) + return result def invoke_batch(self, request_item_list): # TODO(alind) : create no-http services. The enqueues @@ -124,78 +123,75 @@ class RayServeMixin: # args_list : [val1,val2, ...... , valn] # where n (current batch size) <= max_batch_size of a backend + arg_list = [] kwargs_list = defaultdict(list) - result_object_ids, context_flag_list, arg_list = [], [], [] - curr_batch_size = len(request_item_list) + context_flags = set() + batch_size = len(request_item_list) for item in request_item_list: - args, kwargs, is_web_context, result_object_id = ( - parse_request_item(item)) - context_flag_list.append(is_web_context) + args, kwargs, is_web_context = parse_request_item(item) + context_flags.add(is_web_context) - # Python context only have kwargs - # Web context only have one positional argument if is_web_context: - arg_list.append(args[0]) + # Python context only have kwargs + flask_request = args[0] + arg_list.append(flask_request) else: + # Web context only have one positional argument for k, v in kwargs.items(): kwargs_list[k].append(v) - result_object_ids.append(result_object_id) + + # Set the flask request as a list to conform + # with batching semantics: when in batching + # mode, each argument it turned into list. + arg_list.append(FakeFlaskRequest()) try: # check mixing of query context # unified context needed - if len(set(context_flag_list)) != 1: + if len(context_flags) != 1: raise RayServeException( - "Batched queries contain mixed context.") - serve_context.web = all(context_flag_list) - if serve_context.web: - args = (arg_list, ) - else: - # Set the flask request as a list to conform - # with batching semantics: when in batching - # mode, each argument it turned into list. - fake_flask_request_lst = [ - FakeFlaskRequest() for _ in range(curr_batch_size) - ] - args = (fake_flask_request_lst, ) - # set the current batch size (n) for serve_context - serve_context.batch_size = len(result_object_ids) + "Batched queries contain mixed context. Please only send " + "the same type of requests in batching mode.") + + serve_context.web = context_flags.pop() + serve_context.batch_size = batch_size start_timestamp = time.time() + result_list = self.__call__(*args, **kwargs_list) - if (not isinstance(result_list, list)) or (len(result_list) != - len(result_object_ids)): + + self._serve_metric_latency_list.append(time.time() - + start_timestamp) + if (not isinstance(result_list, + list)) or (len(result_list) != batch_size): raise RayServeException("__call__ function " "doesn't preserve batch-size. " "Please return a list of result " "with length equals to the batch " "size.") - for result, result_object_id in zip(result_list, - result_object_ids): - ray.worker.global_worker.put_object(result, result_object_id) - self._serve_metric_latency_list.append(time.time() - - start_timestamp) + return result_list except Exception as e: wrapped_exception = wrap_to_ray_error(e) - self._serve_metric_error_counter += len(result_object_ids) - for result_object_id in result_object_ids: - ray.worker.global_worker.put_object(wrapped_exception, - result_object_id) + self._serve_metric_error_counter += batch_size + return [wrapped_exception for _ in range(batch_size)] def _ray_serve_call(self, request): - work_item = request # check if work_item is a list or not # if it is list: then batching supported - if not isinstance(work_item, list): - self.invoke_single(work_item) + if not isinstance(request, list): + result = self.invoke_single(request) else: - self.invoke_batch(work_item) + result = self.invoke_batch(request) # re-assign to default values serve_context.web = False serve_context.batch_size = None + + # Tell router that current actor is idle self._ray_serve_fetch() + return result + class TaskRunnerBackend(TaskRunner, RayServeMixin): """A simple function serving backend diff --git a/python/ray/experimental/serve/tests/test_persistence.py b/python/ray/experimental/serve/tests/test_persistence.py index 037a79447..52c4cdc92 100644 --- a/python/ray/experimental/serve/tests/test_persistence.py +++ b/python/ray/experimental/serve/tests/test_persistence.py @@ -14,13 +14,9 @@ ray.init(address="auto") from ray.experimental import serve serve.init() - -def function(flask_request): +@serve.route("/driver") +def driver(flask_request): return "OK!" - -serve.create_endpoint("driver", "/driver") -serve.create_backend(function, "driver:v1") -serve.link("driver", "driver:v1") """ with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: diff --git a/python/ray/experimental/serve/tests/test_queue.py b/python/ray/experimental/serve/tests/test_queue.py index 181be51cd..ce9ab2531 100644 --- a/python/ray/experimental/serve/tests/test_queue.py +++ b/python/ray/experimental/serve/tests/test_queue.py @@ -1,135 +1,190 @@ +import asyncio + import pytest import ray -from ray.experimental.serve.queues import RandomPolicyQueue -from ray.experimental.serve.queues import (RoundRobinPolicyQueue, - FixedPackingPolicyQueue) + +from ray.experimental.serve.policy import ( + RandomPolicyQueue, RandomPolicyQueueActor, RoundRobinPolicyQueueActor, + PowerOfTwoPolicyQueueActor, FixedPackingPolicyQueueActor) + +pytestmark = pytest.mark.asyncio + + +def make_task_runner_mock(): + @ray.remote(num_cpus=0) + class TaskRunnerMock: + def __init__(self): + self.query = None + self.queries = [] + + async def _ray_serve_call(self, request_item): + self.query = request_item + self.queries.append(request_item) + return "DONE" + + def get_recent_call(self): + return self.query + + def get_all_calls(self): + return self.queries + + return TaskRunnerMock.remote() @pytest.fixture(scope="session") def task_runner_mock_actor(): - @ray.remote - class TaskRunnerMock: - def __init__(self): - self.result = None - - def _ray_serve_call(self, request_item): - self.result = request_item - - def get_recent_call(self): - return self.result - - actor = TaskRunnerMock.remote() - yield actor + yield make_task_runner_mock() -def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): - q = RandomPolicyQueue() - q.link("svc", "backend") +async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): + q = RandomPolicyQueueActor.remote() + q.link.remote("svc", "backend") + q.dequeue_request.remote("backend", task_runner_mock_actor) - result_object_id = q.enqueue_request("svc", 1, "kwargs", None) - q.dequeue_request("backend", task_runner_mock_actor) - got_work = ray.get(task_runner_mock_actor.get_recent_call.remote()) + # Make sure we get the request result back + result = await q.enqueue_request.remote("svc", 1, "kwargs", None) + assert result == "DONE" + + # Make sure it's the right request + got_work = await task_runner_mock_actor.get_recent_call.remote() assert got_work.request_args == 1 assert got_work.request_kwargs == "kwargs" - ray.worker.global_worker.put_object(2, got_work.result_object_id) - assert ray.get(ray.ObjectID(result_object_id)) == 2 +async def test_slo(serve_instance, task_runner_mock_actor): + q = RandomPolicyQueueActor.remote() + await q.link.remote("svc", "backend") -def test_slo(serve_instance, task_runner_mock_actor): - q = RandomPolicyQueue() - q.link("svc", "backend") - + all_request_sent = [] for i in range(10): slo_ms = 1000 - 100 * i - q.enqueue_request("svc", i, "kwargs", None, request_slo_ms=slo_ms) + all_request_sent.append( + q.enqueue_request.remote( + "svc", i, "kwargs", None, request_slo_ms=slo_ms)) + for i in range(10): - q.dequeue_request("backend", task_runner_mock_actor) - got_work = ray.get(task_runner_mock_actor.get_recent_call.remote()) - assert got_work.request_args == (9 - i) + await q.dequeue_request.remote("backend", task_runner_mock_actor) + + await asyncio.gather(*all_request_sent) + + i_should_be = 9 + all_calls = await task_runner_mock_actor.get_all_calls.remote() + all_calls = all_calls[-10:] + for call in all_calls: + assert call.request_args == i_should_be + i_should_be -= 1 -def test_alter_backend(serve_instance, task_runner_mock_actor): - q = RandomPolicyQueue() +async def test_alter_backend(serve_instance, task_runner_mock_actor): + q = RandomPolicyQueueActor.remote() - q.set_traffic("svc", {"backend-1": 1}) - result_object_id = q.enqueue_request("svc", 1, "kwargs", None) - q.dequeue_request("backend-1", task_runner_mock_actor) - got_work = ray.get(task_runner_mock_actor.get_recent_call.remote()) + await q.set_traffic.remote("svc", {"backend-1": 1}) + await q.dequeue_request.remote("backend-1", task_runner_mock_actor) + await q.enqueue_request.remote("svc", 1, "kwargs", None) + got_work = await task_runner_mock_actor.get_recent_call.remote() assert got_work.request_args == 1 - ray.worker.global_worker.put_object(2, got_work.result_object_id) - assert ray.get(ray.ObjectID(result_object_id)) == 2 - q.set_traffic("svc", {"backend-2": 1}) - result_object_id = q.enqueue_request("svc", 1, "kwargs", None) - q.dequeue_request("backend-2", task_runner_mock_actor) - got_work = ray.get(task_runner_mock_actor.get_recent_call.remote()) - assert got_work.request_args == 1 - ray.worker.global_worker.put_object(2, got_work.result_object_id) - assert ray.get(ray.ObjectID(result_object_id)) == 2 + await q.set_traffic.remote("svc", {"backend-2": 1}) + await q.dequeue_request.remote("backend-2", task_runner_mock_actor) + await q.enqueue_request.remote("svc", 2, "kwargs", None) + got_work = await task_runner_mock_actor.get_recent_call.remote() + assert got_work.request_args == 2 -def test_split_traffic(serve_instance, task_runner_mock_actor): - q = RandomPolicyQueue() +async def test_split_traffic_random(serve_instance, task_runner_mock_actor): + q = RandomPolicyQueueActor.remote() + + await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5}) + runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)] + for _ in range(20): + await q.dequeue_request.remote("backend-1", runner_1) + await q.dequeue_request.remote("backend-2", runner_2) - q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5}) # assume 50% split, the probability of all 20 requests goes to a # single queue is 0.5^20 ~ 1-6 for _ in range(20): - q.enqueue_request("svc", 1, "kwargs", None) - q.dequeue_request("backend-1", task_runner_mock_actor) - result_one = ray.get(task_runner_mock_actor.get_recent_call.remote()) - q.dequeue_request("backend-2", task_runner_mock_actor) - result_two = ray.get(task_runner_mock_actor.get_recent_call.remote()) + await q.enqueue_request.remote("svc", 1, "kwargs", None) - got_work = [result_one, result_two] + got_work = [ + await runner.get_recent_call.remote() + for runner in (runner_1, runner_2) + ] assert [g.request_args for g in got_work] == [1, 1] -def test_split_traffic_round_robin(serve_instance, task_runner_mock_actor): - q = RoundRobinPolicyQueue() - q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5}) - # since round robin policy is stateful firing two queries consecutively - # would transfer the queries to two different backends - for _ in range(2): - q.enqueue_request("svc", 1, "kwargs", None) - q.dequeue_request("backend-1", task_runner_mock_actor) - result_one = ray.get(task_runner_mock_actor.get_recent_call.remote()) - q.dequeue_request("backend-2", task_runner_mock_actor) - result_two = ray.get(task_runner_mock_actor.get_recent_call.remote()) +async def test_round_robin(serve_instance, task_runner_mock_actor): + q = RoundRobinPolicyQueueActor.remote() - got_work = [result_one, result_two] + await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5}) + runner_1, runner_2 = [make_task_runner_mock() for _ in range(2)] + + # NOTE: this is the only difference between the + # test_split_traffic_random and test_round_robin + for _ in range(10): + await q.dequeue_request.remote("backend-1", runner_1) + await q.dequeue_request.remote("backend-2", runner_2) + + for _ in range(20): + await q.enqueue_request.remote("svc", 1, "kwargs", None) + + got_work = [ + await runner.get_recent_call.remote() + for runner in (runner_1, runner_2) + ] assert [g.request_args for g in got_work] == [1, 1] -def test_split_traffic_fixed_packing(serve_instance, task_runner_mock_actor): +async def test_fixed_packing(serve_instance): packing_num = 4 - q = FixedPackingPolicyQueue(packing_num=packing_num) - q.set_traffic("svc", {"backend-1": 0.5, "backend-2": 0.5}) - - # fire twice the number of queries as the packing number - for i in range(2 * packing_num): - q.enqueue_request("svc", i, "kwargs", None) + q = FixedPackingPolicyQueueActor.remote(packing_num=packing_num) + await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5}) + runner_1, runner_2 = (make_task_runner_mock() for _ in range(2)) # both the backends will get equal number of queries # as it is packed round robin for _ in range(packing_num): - q.dequeue_request("backend-1", task_runner_mock_actor) + await q.dequeue_request.remote("backend-1", runner_1) + await q.dequeue_request.remote("backend-2", runner_2) - result_one = ray.get(task_runner_mock_actor.get_recent_call.remote()) - - for _ in range(packing_num): - q.dequeue_request("backend-2", task_runner_mock_actor) - - result_two = ray.get(task_runner_mock_actor.get_recent_call.remote()) - - got_work = [result_one, result_two] - assert [g.request_args - for g in got_work] == [packing_num - 1, 2 * packing_num - 1] + for backend, runner in zip(["1", "2"], [runner_1, runner_2]): + for _ in range(packing_num): + input_value = "should-go-to-backend-{}".format(backend) + await q.enqueue_request.remote("svc", input_value, "kwargs", None) + all_calls = await runner.get_all_calls.remote() + for call in all_calls: + assert call.request_args == input_value -def test_queue_remove_replicas(serve_instance, task_runner_mock_actor): +async def test_power_of_two_choices(serve_instance): + q = PowerOfTwoPolicyQueueActor.remote() + enqueue_futures = [] + + # First, fill the queue for backend-1 with 3 requests + await q.set_traffic.remote("svc", {"backend-1": 1.0}) + for _ in range(3): + future = q.enqueue_request.remote("svc", "1", "", None) + enqueue_futures.append(future) + + # Then, add a new backend, this backend should be filled next + await q.set_traffic.remote("svc", {"backend-1": 0.5, "backend-2": 0.5}) + for _ in range(2): + future = q.enqueue_request.remote("svc", "2", "", None) + enqueue_futures.append(future) + + runner_1, runner_2 = (make_task_runner_mock() for _ in range(2)) + for _ in range(3): + await q.dequeue_request.remote("backend-1", runner_1) + await q.dequeue_request.remote("backend-2", runner_2) + + await asyncio.gather(*enqueue_futures) + + assert len(await runner_1.get_all_calls.remote()) == 3 + assert len(await runner_2.get_all_calls.remote()) == 2 + + +async def test_queue_remove_replicas(serve_instance): + temp_actor = make_task_runner_mock() q = RandomPolicyQueue() - q.dequeue_request("backend", task_runner_mock_actor) - q.remove_and_destory_replica("backend", task_runner_mock_actor) - assert len(q.workers["backend"]) == 0 + await q.dequeue_request("backend", temp_actor) + await q.remove_and_destory_replica("backend", temp_actor) + assert q.worker_queues["backend"].qsize() == 0 diff --git a/python/ray/experimental/serve/tests/test_task_runner.py b/python/ray/experimental/serve/tests/test_task_runner.py index 158d88fc4..d303d0d13 100644 --- a/python/ray/experimental/serve/tests/test_task_runner.py +++ b/python/ray/experimental/serve/tests/test_task_runner.py @@ -2,12 +2,14 @@ import pytest import ray import ray.experimental.serve.context as context -from ray.experimental.serve.queues import RoundRobinPolicyQueueActor +from ray.experimental.serve.policy import RoundRobinPolicyQueueActor from ray.experimental.serve.task_runner import ( RayServeMixin, TaskRunner, TaskRunnerActor, wrap_to_ray_error) +pytestmark = pytest.mark.asyncio -def test_runner_basic(): + +async def test_runner_basic(): def echo(i): return i @@ -15,12 +17,12 @@ def test_runner_basic(): assert r(1) == 1 -def test_runner_wraps_error(): +async def test_runner_wraps_error(): wrapped = wrap_to_ray_error(Exception()) assert isinstance(wrapped, ray.exceptions.RayTaskError) -def test_runner_actor(serve_instance): +async def test_runner_actor(serve_instance): q = RoundRobinPolicyQueueActor.remote() def echo(flask_request, i=None): @@ -30,24 +32,21 @@ def test_runner_actor(serve_instance): PRODUCER_NAME = "prod" runner = TaskRunnerActor.remote(echo) - runner._ray_serve_setup.remote(CONSUMER_NAME, q, runner) runner._ray_serve_fetch.remote() q.link.remote(PRODUCER_NAME, CONSUMER_NAME) for query in [333, 444, 555]: - result_token = ray.ObjectID( - ray.get( - q.enqueue_request.remote( - PRODUCER_NAME, - request_args=None, - request_kwargs={"i": query}, - request_context=context.TaskContext.Python))) - assert ray.get(result_token) == query + result = await q.enqueue_request.remote( + PRODUCER_NAME, + request_args=None, + request_kwargs={"i": query}, + request_context=context.TaskContext.Python) + assert result == query -def test_ray_serve_mixin(serve_instance): +async def test_ray_serve_mixin(serve_instance): q = RoundRobinPolicyQueueActor.remote() CONSUMER_NAME = "runner-cls" @@ -72,17 +71,15 @@ def test_ray_serve_mixin(serve_instance): q.link.remote(PRODUCER_NAME, CONSUMER_NAME) for query in [333, 444, 555]: - result_token = ray.ObjectID( - ray.get( - q.enqueue_request.remote( - PRODUCER_NAME, - request_args=None, - request_kwargs={"i": query}, - request_context=context.TaskContext.Python))) - assert ray.get(result_token) == query + 3 + result = await q.enqueue_request.remote( + PRODUCER_NAME, + request_args=None, + request_kwargs={"i": query}, + request_context=context.TaskContext.Python) + assert result == query + 3 -def test_task_runner_check_context(serve_instance): +async def test_task_runner_check_context(serve_instance): q = RoundRobinPolicyQueueActor.remote() def echo(flask_request, i=None): @@ -98,13 +95,11 @@ def test_task_runner_check_context(serve_instance): runner._ray_serve_fetch.remote() q.link.remote(PRODUCER_NAME, CONSUMER_NAME) - result_token = ray.ObjectID( - ray.get( - q.enqueue_request.remote( - PRODUCER_NAME, - request_args=None, - request_kwargs={"i": 42}, - request_context=context.TaskContext.Python))) + result_oid = q.enqueue_request.remote( + PRODUCER_NAME, + request_args=None, + request_kwargs={"i": 42}, + request_context=context.TaskContext.Python) with pytest.raises(ray.exceptions.RayTaskError): - ray.get(result_token) + await result_oid diff --git a/python/ray/experimental/serve/utils.py b/python/ray/experimental/serve/utils.py index bf41513dc..88274cf27 100644 --- a/python/ray/experimental/serve/utils.py +++ b/python/ray/experimental/serve/utils.py @@ -4,6 +4,7 @@ import random import string import time import io +import os import requests from pygments import formatters, highlight, lexers @@ -23,14 +24,16 @@ def parse_request_item(request_item): args = (FakeFlaskRequest(), ) kwargs = request_item.request_kwargs - result_object_id = request_item.result_object_id - return args, kwargs, is_web_context, result_object_id + return args, kwargs, is_web_context def _get_logger(): logger = logging.getLogger("ray.serve") # TODO(simon): Make logging level configurable. - logger.setLevel(logging.INFO) + if os.environ.get("SERVE_LOG_DEBUG"): + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) return logger