From 948b1b09e82a8964acb7c5b69a7782ebdffc8a1b Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Mon, 26 Aug 2019 19:28:19 -0700 Subject: [PATCH] Remove previous version of ray.serve (#5541) --- python/ray/experimental/serve/README.rst | 76 ------- python/ray/experimental/serve/__init__.py | 28 --- .../ray/experimental/serve/examples/adder.py | 47 ---- .../experimental/serve/examples/counter.py | 29 --- .../ray/experimental/serve/examples/halt.py | 41 ---- .../experimental/serve/frontend/__init__.py | 7 - .../serve/frontend/http_frontend.py | 72 ------ python/ray/experimental/serve/mixin.py | 63 ------ python/ray/experimental/serve/object_id.py | 21 -- .../ray/experimental/serve/router/__init__.py | 26 --- .../ray/experimental/serve/router/routers.py | 209 ------------------ .../experimental/serve/tests/test_actors.py | 68 ------ .../serve/tests/test_deadline_router.py | 91 -------- .../serve/tests/test_default_app.py | 46 ---- .../ray/experimental/serve/utils/__init__.py | 0 .../serve/utils/priority_queue.py | 27 --- 16 files changed, 851 deletions(-) delete mode 100644 python/ray/experimental/serve/README.rst delete mode 100644 python/ray/experimental/serve/__init__.py delete mode 100644 python/ray/experimental/serve/examples/adder.py delete mode 100644 python/ray/experimental/serve/examples/counter.py delete mode 100644 python/ray/experimental/serve/examples/halt.py delete mode 100644 python/ray/experimental/serve/frontend/__init__.py delete mode 100644 python/ray/experimental/serve/frontend/http_frontend.py delete mode 100644 python/ray/experimental/serve/mixin.py delete mode 100644 python/ray/experimental/serve/object_id.py delete mode 100644 python/ray/experimental/serve/router/__init__.py delete mode 100644 python/ray/experimental/serve/router/routers.py delete mode 100644 python/ray/experimental/serve/tests/test_actors.py delete mode 100644 python/ray/experimental/serve/tests/test_deadline_router.py delete mode 100644 python/ray/experimental/serve/tests/test_default_app.py delete mode 100644 python/ray/experimental/serve/utils/__init__.py delete mode 100644 python/ray/experimental/serve/utils/priority_queue.py diff --git a/python/ray/experimental/serve/README.rst b/python/ray/experimental/serve/README.rst deleted file mode 100644 index b59672c46..000000000 --- a/python/ray/experimental/serve/README.rst +++ /dev/null @@ -1,76 +0,0 @@ -Ray Serve Module -================ - -``ray.experimental.serve`` is a module for publishing your actors to -interact with outside world. - -Use Case --------- - -Serve machine learning model -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Scalable anayltics query -~~~~~~~~~~~~~~~~~~~~~~~~ - -Composible pipelines -~~~~~~~~~~~~~~~~~~~~ - -Architecture ------------- - -``ray.experimental.serve`` is implemented in a three-tiered system. Each -tier can scale horizontally. - -In the following illustration, call chain goes from top to bottom. Each -box is one or more replicated ray actors. - -:: - - +-------------------+ +-----------------+ +------------+ - Frontend | HTTP Frontend | | Arrow RPC | | ... | - Tier | | | | | | - +-------------------+ +-----------------+ +------------+ - - +------------------------------------------------------------+ - - +--------------------+ +-------------------+ - Router | Default Router | | Deadline Aware | - Tier | | | Router | - +--------------------+ +-------------------+ - - +------------------------------------------------------------+ - - +----------------+ +--------------+ +-------------+ - Managed | Managed Actor | | ... | | ... | - Actor | Replica | | | | | - Tier +----------------+ +--------------+ +-------------+ - -Frontend Tier -~~~~~~~~~~~~~ - -The frontend tier is repsonsible for interface with the world. Currently -``ray.experimental.serve`` provides implementation for - HTTP Frontend - -And we are planning to add support for - Arrow RPC - zeromq - -Router Tier -~~~~~~~~~~~ - -The router tier receives calls from frontend and route them to the -managed actors. Routers both *route* and *queue* incoming queries. -``ray.experimental.serve`` has native support for (micro-)batching -queries. - -In addition, we implemented a deadline aware routers that will put high -priority queries in the front of the queue so they will be delivered -first. - -Managed Actor Tier -~~~~~~~~~~~~~~~~~~ - -Managed actors will be managed by routers. These actors can contains -arbitrary methods. Methods in the actors class are assumed to be able to -take into a single input. To fully utilize the vectorized instructions, like -``np.sum``, you can use the ``@batched_input`` decorator, it will run your method -in on a micro-batch. diff --git a/python/ray/experimental/serve/__init__.py b/python/ray/experimental/serve/__init__.py deleted file mode 100644 index 15757a9d5..000000000 --- a/python/ray/experimental/serve/__init__.py +++ /dev/null @@ -1,28 +0,0 @@ -"""A module for serving from actors. - -The ray.experimental.serve module is a module for publishing your actors to -interact with the outside world. -""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import sys - -assert sys.version_info >= (3, ), ( - "ray.experimental.serve is a python3 only library") - -from ray.experimental.serve.router import (DeadlineAwareRouter, - SingleQuery) # noqa: E402 -from ray.experimental.serve.frontend import HTTPFrontendActor # noqa: E402 -from ray.experimental.serve.mixin import (RayServeMixin, - batched_input) # noqa: E402 - -__all__ = [ - "DeadlineAwareRouter", - "SingleQuery", - "HTTPFrontendActor", - "RayServeMixin", - "batched_input", -] diff --git a/python/ray/experimental/serve/examples/adder.py b/python/ray/experimental/serve/examples/adder.py deleted file mode 100644 index 862e61c71..000000000 --- a/python/ray/experimental/serve/examples/adder.py +++ /dev/null @@ -1,47 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class VectorizedAdder(RayServeMixin): - """Actor that adds scaler_increment to input batch. - - result = np.array(input_batch) + scaler_increment - """ - - def __init__(self, scaler_increment): - self.inc = scaler_increment - - @batched_input - def __call__(self, input_batch): - arr = np.array(input_batch) - arr += self.inc - return arr.tolist() - - -@ray.remote -class ScalerAdder(RayServeMixin): - """Actor that adds a scaler_increment to a single input.""" - - def __init__(self, scaler_increment): - self.inc = scaler_increment - - def __call__(self, input_scaler): - return input_scaler + self.inc - - -@ray.remote -class VectorDouble(RayServeMixin): - """Actor that doubles the batched input.""" - - @batched_input - def __call__(self, batched_vectors): - matrix = np.array(batched_vectors) - matrix *= 2 - return [v.tolist() for v in matrix] diff --git a/python/ray/experimental/serve/examples/counter.py b/python/ray/experimental/serve/examples/counter.py deleted file mode 100644 index 369d53fb5..000000000 --- a/python/ray/experimental/serve/examples/counter.py +++ /dev/null @@ -1,29 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class Counter(RayServeMixin): - """Return the query id. Used for testing router.""" - - def __init__(self): - self.counter = 0 - - def __call__(self, batched_input): - self.counter += 1 - return self.counter - - -@ray.remote -class CustomCounter(RayServeMixin): - """Return the query id. Used for testing `serve_method` signature.""" - - serve_method = "count" - - @batched_input - def count(self, input_batch): - return [1 for _ in range(len(input_batch))] diff --git a/python/ray/experimental/serve/examples/halt.py b/python/ray/experimental/serve/examples/halt.py deleted file mode 100644 index eceb94d86..000000000 --- a/python/ray/experimental/serve/examples/halt.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import time - -import ray -from ray.experimental.serve import RayServeMixin, batched_input - - -@ray.remote -class SleepOnFirst(RayServeMixin): - """Sleep on the first request, return batch size. - - Used for testing the DeadlineAwareRouter. - """ - - def __init__(self, sleep_time): - self.nap_time = sleep_time - - @batched_input - def __call__(self, input_batch): - time.sleep(self.nap_time) - return [len(input_batch) for _ in range(len(input_batch))] - - -@ray.remote -class SleepCounter(RayServeMixin): - """Sleep on input argument seconds, return the query id. - - Used to test the DeadlineAwareRouter. - """ - - def __init__(self): - self.counter = 0 - - def __call__(self, inp): - time.sleep(inp) - - self.counter += 1 - return self.counter diff --git a/python/ray/experimental/serve/frontend/__init__.py b/python/ray/experimental/serve/frontend/__init__.py deleted file mode 100644 index b1cb44636..000000000 --- a/python/ray/experimental/serve/frontend/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from ray.experimental.serve.frontend.http_frontend import HTTPFrontendActor - -__all__ = ["HTTPFrontendActor"] diff --git a/python/ray/experimental/serve/frontend/http_frontend.py b/python/ray/experimental/serve/frontend/http_frontend.py deleted file mode 100644 index 66973caca..000000000 --- a/python/ray/experimental/serve/frontend/http_frontend.py +++ /dev/null @@ -1,72 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import time - -import uvicorn -from starlette.applications import Starlette -from starlette.responses import JSONResponse - -import ray - - -def unwrap(future): - """Unwrap the result from ray.experimental.server router. - Router returns a list of object ids when you call them. - """ - - return ray.get(future)[0] - - -@ray.remote -class HTTPFrontendActor: - """HTTP API for an Actor. This exposes /{actor_name} endpoint for query. - - Request: - GET /{actor_name} or POST /{actor_name} - Content-type: application/json - { - "slo_ms": float, - "input": any - } - Response: - Content-type: application/json - { - "success": bool, - "actor": str, - "result": any - } - """ - - def __init__(self, ip="0.0.0.0", port=8080, router="DefaultRouter"): - self.ip = ip - self.port = port - self.router = ray.experimental.named_actors.get_actor(router) - - def start(self): - default_app = Starlette() - - @default_app.route("/{actor}", methods=["GET", "POST"]) - async def dispatch_remote_function(request): - data = await request.json() - actor_name = request.path_params["actor"] - - slo_seconds = data.pop("slo_ms") / 1000 - deadline = time.perf_counter() + slo_seconds - - inp = data.pop("input") - - result_future = unwrap( - self.router.call.remote(actor_name, inp, deadline)) - - # TODO(simon): change to asyncio ray.get - result = ray.get(result_future) - - return JSONResponse({ - "success": True, - "actor": actor_name, - "result": result - }) - - uvicorn.run(default_app, host=self.ip, port=self.port) diff --git a/python/ray/experimental/serve/mixin.py b/python/ray/experimental/serve/mixin.py deleted file mode 100644 index 858572634..000000000 --- a/python/ray/experimental/serve/mixin.py +++ /dev/null @@ -1,63 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import traceback -from typing import List - -import ray -from ray.experimental.serve import SingleQuery - - -def batched_input(func): - """Decorator to mark an actor method as accepting only a single input. - - By default methods accept a batch. - """ - func.ray_serve_batched_input = True - return func - - -def _execute_and_seal_error(method, arg, method_name): - """Execute method with arg and return the result. - - If the method fails, return a RayTaskError so it can be sealed in the - resultOID and retried by user. - """ - try: - return method(arg) - except Exception: - return ray.worker.RayTaskError(method_name, traceback.format_exc()) - - -class RayServeMixin: - """Enable a ray actor to interact with ray.serve - - Usage: - ``` - @ray.remote - class MyActor(RayServeMixin): - # This is optional, by default it is "__call__" - serve_method = 'my_method' - - def my_method(self, arg): - ... - ``` - """ - - serve_method = "__call__" - - def _dispatch(self, input_batch: List[SingleQuery]): - """Helper method to dispatch a batch of input to self.serve_method.""" - method = getattr(self, self.serve_method) - if hasattr(method, "ray_serve_batched_input"): - batch = [inp.data for inp in input_batch] - result = _execute_and_seal_error(method, batch, self.serve_method) - for res, inp in zip(result, input_batch): - ray.worker.global_worker.put_object(inp.result_object_id, res) - else: - for inp in input_batch: - result = _execute_and_seal_error(method, inp.data, - self.serve_method) - ray.worker.global_worker.put_object(inp.result_object_id, - result) diff --git a/python/ray/experimental/serve/object_id.py b/python/ray/experimental/serve/object_id.py deleted file mode 100644 index cdde52532..000000000 --- a/python/ray/experimental/serve/object_id.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Helper methods for dealing with ray.ObjectID -""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import ray - - -def unwrap(future): - return ray.get(future)[0] - - -def get_new_oid(): - worker = ray.worker.global_worker - oid = ray._raylet.compute_put_id(worker.current_task_id, - worker.task_context.put_index) - worker.task_context.put_index += 1 - return oid diff --git a/python/ray/experimental/serve/router/__init__.py b/python/ray/experimental/serve/router/__init__.py deleted file mode 100644 index dae5fcb7c..000000000 --- a/python/ray/experimental/serve/router/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from ray.experimental.serve.router.routers import (DeadlineAwareRouter, - SingleQuery) -import ray - - -def start_router(router_class, router_name): - """Wrapper for starting a router and register it. - - Args: - router_class: The router class to instantiate. - router_name: The name to give to the router. - - Returns: - A handle to newly started router actor. - """ - handle = router_class.remote(router_name) - ray.experimental.register_actor(router_name, handle) - handle.start.remote() - return handle - - -__all__ = ["DeadlineAwareRouter", "SingleQuery"] diff --git a/python/ray/experimental/serve/router/routers.py b/python/ray/experimental/serve/router/routers.py deleted file mode 100644 index b11849039..000000000 --- a/python/ray/experimental/serve/router/routers.py +++ /dev/null @@ -1,209 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from collections import defaultdict -from functools import total_ordering -from typing import Callable, Dict, List, Set, Tuple - -import ray -from ray.experimental.serve.object_id import get_new_oid -from ray.experimental.serve.utils.priority_queue import PriorityQueue - -ACTOR_NOT_REGISTERED_MSG: Callable = ( - lambda name: ("Actor {} is not registered with this router. Please use " - "'router.register_actor.remote(...)' " - "to register it.").format(name)) - - -# Use @total_ordering so we can sort SingleQuery -@total_ordering -class SingleQuery: - """A data container for a query. - - Attributes: - data: The request data. - result_object_id: The result object ID. - deadline: The deadline in seconds. - """ - - def __init__(self, data, result_object_id: ray.ObjectID, - deadline_s: float): - self.data = data - self.result_object_id = result_object_id - self.deadline = deadline_s - - def __lt__(self, other): - return self.deadline < other.deadline - - def __eq__(self, other): - return self.deadline == other.deadline - - -@ray.remote -class DeadlineAwareRouter: - """DeadlineAwareRouter is a router that is aware of deadlines. - - It takes into consideration the deadline attached to each query. It will - reorder incoming query based on their deadlines. - """ - - def __init__(self, router_name): - # Runtime Data - self.query_queues: Dict[str, PriorityQueue] = defaultdict( - PriorityQueue) - self.running_queries: Dict[ray.ObjectID, ray.actor.ActorHandle] = {} - self.actor_handles: Dict[str, List[ray.actor.ActorHandle]] = ( - defaultdict(list)) - - # Actor Metadata - self.managed_actors: Dict[str, ray.actor.ActorClass] = {} - self.actor_init_arguments: Dict[str, Tuple[List, Dict]] = {} - self.max_batch_size: Dict[str, int] = {} - - # Router Metadata - self.name = router_name - - def start(self): - """Kick off the router loop""" - - # Note: This is meant for hiding the complexity for a user - # facing method. - # Because the `loop` api can be hard to understand. - ray.experimental.get_actor(self.name).loop.remote() - - def register_actor( - self, - actor_name: str, - actor_class: ray.actor.ActorClass, - init_args: List = [], - init_kwargs: dict = {}, - num_replicas: int = 1, - max_batch_size: int = -1, # Unbounded batch size - ): - """Register a new managed actor. - """ - self.managed_actors[actor_name] = actor_class - self.actor_init_arguments[actor_name] = (init_args, init_kwargs) - self.max_batch_size[actor_name] = max_batch_size - - ray.experimental.get_actor(self.name).set_replica.remote( - actor_name, num_replicas) - - def set_replica(self, actor_name, new_replica_count): - """Scale a managed actor according to new_replica_count.""" - assert actor_name in self.managed_actors, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) - - current_replicas = len(self.actor_handles[actor_name]) - - # Increase the number of replicas - if new_replica_count > current_replicas: - for _ in range(new_replica_count - current_replicas): - args = self.actor_init_arguments[actor_name][0] - kwargs = self.actor_init_arguments[actor_name][1] - new_actor_handle = self.managed_actors[actor_name].remote( - *args, **kwargs) - self.actor_handles[actor_name].append(new_actor_handle) - - # Decrease the number of replicas - if new_replica_count < current_replicas: - for _ in range(current_replicas - new_replica_count): - # Note actor destructor will be called after all remaining - # calls finish. Therefore it's safe to call del here. - del self.actor_handles[actor_name][-1] - - def call(self, actor_name, data, deadline_s): - """Enqueue a request to one of the actor managed by this router. - - Returns: - List[ray.ObjectID] with length 1, the object ID wrapped inside is - the result object ID when the query is executed. - """ - assert actor_name in self.managed_actors, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) - - result_object_id = get_new_oid() - - # Here, 'data_object_id' is either an ObjectID or an actual object. - # When it is an ObjectID, this is an optimization to avoid creating - # an extra copy of 'data' in the object store. - data_object_id = ray.worker.global_worker._current_task.arguments()[1] - - self.query_queues[actor_name].push( - SingleQuery(data_object_id, result_object_id, deadline_s)) - - return [result_object_id] - - def loop(self): - """Main loop for router. It will does the following things: - - 1. Check which running actors finished. - 2. Iterate over free actors and request queues, dispatch requests batch - to free actors. - 3. Tail recursively schedule itself. - """ - - # 1. Check which running actors finished. - ready_oids, _ = ray.wait( - object_ids=list(self.running_queries.keys()), - num_returns=len(self.running_queries), - timeout=0, - ) - - for ready_oid in ready_oids: - self.running_queries.pop(ready_oid) - busy_actors: Set[ray.actor.ActorHandle] = set( - self.running_queries.values()) - - # 2. Iterate over free actors and request queues, dispatch requests - # batch to free actors. - for actor_name, queue in self.query_queues.items(): - # try to drain the queue - for actor_handle in self.actor_handles[actor_name]: - if len(queue) == 0: - break - - if actor_handle in busy_actors: - continue - - # A free actor found. Dispatch queries. - batch = self._get_next_batch(actor_name) - assert len(batch) - - batch_result_object_id = actor_handle._dispatch.remote(batch) - self._mark_running(batch_result_object_id, actor_handle) - - # 3. Tail recursively schedule itself. - ray.experimental.get_actor(self.name).loop.remote() - - def _get_next_batch(self, actor_name: str) -> List[SingleQuery]: - """Get next batch of request for the actor whose name is provided.""" - assert actor_name in self.query_queues, ( - ACTOR_NOT_REGISTERED_MSG(actor_name)) - - inputs = [] - batch_size = self.max_batch_size[actor_name] - if batch_size == -1: - inp = self.query_queues[actor_name].try_pop() - while inp: - inputs.append(inp) - inp = self.query_queues[actor_name].try_pop() - else: - for _ in range(batch_size): - inp = self.query_queues[actor_name].try_pop() - if inp: - inputs.append(inp) - else: - break - - return inputs - - def _mark_running(self, batch_oid: ray.ObjectID, - actor_handle: ray.actor.ActorHandle): - """Mark actor_handle as running identified by batch_oid. - - This means that if batch_oid is fullfilled, then actor_handle must be - free. - """ - self.running_queries[batch_oid] = actor_handle diff --git a/python/ray/experimental/serve/tests/test_actors.py b/python/ray/experimental/serve/tests/test_actors.py deleted file mode 100644 index 3b2748b73..000000000 --- a/python/ray/experimental/serve/tests/test_actors.py +++ /dev/null @@ -1,68 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np -import pytest - -import ray -from ray.experimental.serve import SingleQuery -from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder -from ray.experimental.serve.examples.counter import Counter, CustomCounter -from ray.experimental.serve.object_id import get_new_oid - -INCREMENT = 3 - - -@pytest.fixture(scope="module") -def ray_start(): - ray.init(num_cpus=4) - yield - ray.shutdown() - - -@pytest.fixture -def generated_inputs(): - deadline = 11111.11 - inputs = [] - input_arr = np.arange(10) - for i in input_arr: - oid = get_new_oid() - inputs.append( - SingleQuery(data=i, result_object_id=oid, deadline_s=deadline)) - return inputs - - -def test_vadd(ray_start, generated_inputs): - adder = VectorizedAdder.remote(INCREMENT) - inputs = generated_inputs - oids = [inp.result_object_id for inp in inputs] - input_data = [inp.data for inp in inputs] - - adder._dispatch.remote(inputs) - result_arr = np.array(ray.get(oids)) - assert np.array_equal(result_arr, np.array(input_data) + INCREMENT) - - -def test_batched_input(ray_start, generated_inputs): - counter = Counter.remote() - counter._dispatch.remote(generated_inputs) - oids = [inp.result_object_id for inp in generated_inputs] - returned_query_ids = np.array(ray.get(oids)) - assert np.array_equal(returned_query_ids, np.arange(1, 11)) - - -def test_custom_method(ray_start, generated_inputs): - dummy = CustomCounter.remote() - dummy._dispatch.remote(generated_inputs) - oids = [inp.result_object_id for inp in generated_inputs] - returned_query_ids = np.array(ray.get(oids)) - assert np.array_equal(returned_query_ids, np.ones(10)) - - -def test_exception(ray_start): - adder = ScalerAdder.remote(INCREMENT) - query = SingleQuery("this can't be added with int", get_new_oid(), 10) - adder._dispatch.remote([query]) - with pytest.raises(ray.worker.RayTaskError): - ray.get(query.result_object_id) diff --git a/python/ray/experimental/serve/tests/test_deadline_router.py b/python/ray/experimental/serve/tests/test_deadline_router.py deleted file mode 100644 index d1d4d6769..000000000 --- a/python/ray/experimental/serve/tests/test_deadline_router.py +++ /dev/null @@ -1,91 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import time - -import numpy as np -import pytest - -import ray -from ray.experimental.serve.examples.adder import ScalerAdder, VectorizedAdder -from ray.experimental.serve.examples.halt import SleepCounter, SleepOnFirst -from ray.experimental.serve.object_id import unwrap -from ray.experimental.serve.router import DeadlineAwareRouter, start_router - - -@pytest.fixture(scope="module") -def router(): - # We need at least 5 workers so resource won't be oversubscribed - ray.init(num_cpus=5) - - # The following two blobs are equivalent - # - # handle = DeadlineAwareRouter.remote("DefaultTestRouter") - # ray.experimental.register_actor("DefaultTestRouter", handle) - # handle.start.remote() - # - # handle = start_router(DeadlineAwareRouter, "DefaultRouter") - handle = start_router(DeadlineAwareRouter, "DefaultRouter") - - handle.register_actor.remote( - "VAdder", VectorizedAdder, - init_kwargs={"scaler_increment": 1}) # init args - handle.register_actor.remote( - "SAdder", ScalerAdder, init_kwargs={"scaler_increment": 2}) - handle.register_actor.remote( - "SleepFirst", SleepOnFirst, init_kwargs={"sleep_time": 1}) - handle.register_actor.remote( - "SleepCounter", SleepCounter, max_batch_size=1) - - yield handle - - ray.shutdown() - - -@pytest.fixture -def now(): - return time.perf_counter() - - -def test_throw_assert(router: DeadlineAwareRouter, now: float): - try: - ray.get(router.call.remote("Action-Not-Exist", "input", now + 1)) - except ray.worker.RayTaskError as e: - assert "AssertionError" in e.traceback_str - - -def test_vector_adder(router: DeadlineAwareRouter, now: float): - result = unwrap(router.call.remote("VAdder", 42, now + 1)) - assert isinstance(result, ray.ObjectID) - assert ray.get(result) == 43 - - -def test_scaler_adder(router: DeadlineAwareRouter, now: float): - result = unwrap(router.call.remote("SAdder", 42, now + 1)) - assert isinstance(result, ray.ObjectID) - assert ray.get(result) == 44 - - -def test_batching_ability(router: DeadlineAwareRouter, now: float): - first = unwrap(router.call.remote("SleepFirst", 1, now + 1)) - rest = [ - unwrap(router.call.remote("SleepFirst", 1, now + 1)) for _ in range(10) - ] - assert ray.get(first) == 1 - assert np.alltrue(np.array(ray.get(rest)) == 10) - - -def test_deadline_priority(router: DeadlineAwareRouter, now: float): - # first sleep 2 seconds - first = unwrap(router.call.remote("SleepCounter", 2, now + 1)) - - # then send a request to with deadline farther away - second = unwrap(router.call.remote("SleepCounter", 0, now + 10)) - - # and a request with sooner deadline - third = unwrap(router.call.remote("SleepCounter", 0, now + 1)) - - id_1, id_2, id_3 = ray.get([first, second, third]) - - assert id_1 < id_3 < id_2 diff --git a/python/ray/experimental/serve/tests/test_default_app.py b/python/ray/experimental/serve/tests/test_default_app.py deleted file mode 100644 index 5eb758c5f..000000000 --- a/python/ray/experimental/serve/tests/test_default_app.py +++ /dev/null @@ -1,46 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import time - -import pytest -import requests - -import ray -from ray.experimental.serve import DeadlineAwareRouter -from ray.experimental.serve.examples.adder import VectorizedAdder -from ray.experimental.serve.frontend import HTTPFrontendActor -from ray.experimental.serve.router import start_router - -ROUTER_NAME = "DefaultRouter" -NUMBER_OF_TRIES = 5 - - -@pytest.fixture -def get_router(): - # We need this many workers so resource are not oversubscribed - ray.init(num_cpus=4) - router = start_router(DeadlineAwareRouter, ROUTER_NAME) - yield router - ray.shutdown() - - -def test_http_basic(get_router): - router = get_router - a = HTTPFrontendActor.remote(router=ROUTER_NAME) - a.start.remote() - - router.register_actor.remote( - "VAdder", VectorizedAdder, init_kwargs={"scaler_increment": 1}) - - for _ in range(NUMBER_OF_TRIES): - try: - url = "http://0.0.0.0:8080/VAdder" - payload = {"input": 10, "slo_ms": 1000} - resp = requests.request("POST", url, json=payload) - except Exception: - # it is possible that the actor is not yet instantiated - time.sleep(1) - - assert resp.json() == {"success": True, "actor": "VAdder", "result": 11} diff --git a/python/ray/experimental/serve/utils/__init__.py b/python/ray/experimental/serve/utils/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/python/ray/experimental/serve/utils/priority_queue.py b/python/ray/experimental/serve/utils/priority_queue.py deleted file mode 100644 index 05b7045b4..000000000 --- a/python/ray/experimental/serve/utils/priority_queue.py +++ /dev/null @@ -1,27 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import heapq - - -class PriorityQueue: - """A min-heap class wrapping heapq module.""" - - def __init__(self): - self.q = [] - - def push(self, item): - heapq.heappush(self.q, item) - - def pop(self): - return heapq.heappop(self.q) - - def try_pop(self): - if len(self.q) == 0: - return None - else: - return self.pop() - - def __len__(self): - return len(self.q)