diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh
index 8fa312812..9cee9ff58 100755
--- a/ci/travis/install-dependencies.sh
+++ b/ci/travis/install-dependencies.sh
@@ -36,7 +36,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
   export PATH="$HOME/miniconda/bin:$PATH"
   pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \
     feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout networkx tabulate psutil aiohttp \
-    uvicorn dataclasses pygments werkzeug kubernetes flask grpcio pytest-sugar pytest-rerunfailures
+    uvicorn dataclasses pygments werkzeug kubernetes flask grpcio pytest-sugar pytest-rerunfailures blist
 elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
   # Install miniconda.
   wget -q https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv
@@ -52,7 +52,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
   export PATH="$HOME/miniconda/bin:$PATH"
   pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \
     feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout networkx tabulate psutil aiohttp \
-    uvicorn dataclasses pygments werkzeug kubernetes flask grpcio pytest-sugar pytest-rerunfailures
+    uvicorn dataclasses pygments werkzeug kubernetes flask grpcio pytest-sugar pytest-rerunfailures blist
 elif [[ "$LINT" == "1" ]]; then
   sudo apt-get update
   sudo apt-get install -y build-essential curl unzip
diff --git a/doc/requirements-doc.txt b/doc/requirements-doc.txt
index b8ee6b358..b6af5a5d2 100644
--- a/doc/requirements-doc.txt
+++ b/doc/requirements-doc.txt
@@ -23,3 +23,4 @@ flask
 uvicorn
 pygments
 werkzeug
+blist
diff --git a/python/ray/experimental/serve/examples/echo_slo_reverse.py b/python/ray/experimental/serve/examples/echo_slo_reverse.py
new file mode 100644
index 000000000..e5791aab9
--- /dev/null
+++ b/python/ray/experimental/serve/examples/echo_slo_reverse.py
@@ -0,0 +1,64 @@
+"""
+SLO [reverse] example of ray.serve module
+"""
+
+import time
+
+import requests
+
+import ray
+import ray.experimental.serve as serve
+
+# initialize ray serve system.
+# blocking=True will wait for HTTP server to be ready to serve request.
+serve.init(blocking=True)
+
+# an endpoint is associated with an http URL.
+serve.create_endpoint("my_endpoint", "/echo")
+
+
+# a backend can be a function or class.
+# it can be made to be invoked from web as well as python.
+def echo_v1(flask_request, response="hello from python!"):
+    if serve.context.web:
+        response = flask_request.url
+    return response
+
+
+serve.create_backend(echo_v1, "echo:v1")
+serve.link("my_endpoint", "echo:v1")
+
+# wait for routing table to get populated
+time.sleep(2)
+
+# slo (10 milliseconds deadline) can be specified via http
+slo_ms = 10.0
+print("> [HTTP] Pinging http://127.0.0.1:8000/echo?slo_ms={}".format(slo_ms))
+print(
+    requests.get("http://127.0.0.1:8000/echo?slo_ms={}".format(slo_ms)).json())
+
+# get the handle of the endpoint
+handle = serve.get_handle("my_endpoint")
+
+future_list = []
+
+# fire 10 requests with slo's in the (almost) reverse order of the order in
+# which remote procedure call is done
+for r in range(10):
+    slo_ms = 1000 - 100 * r
+    response = "hello from request: {} slo: {}".format(r, slo_ms)
+    print("> [REMOTE] Pinging handle.remote(response='{}',slo_ms={})".format(
+        response, slo_ms))
+    # slo can be specified via remote function
+    f = handle.remote(response=response, slo_ms=slo_ms)
+    future_list.append(f)
+
+# get results of queries as they complete
+# should be completed (almost) according to the order of their slo time
+left_futures = future_list
+while left_futures:
+    completed_futures, remaining_futures = ray.wait(left_futures, timeout=0.05)
+    if completed_futures:
+        result = ray.get(completed_futures[0])
+        print(result)
+    left_futures = remaining_futures
diff --git a/python/ray/experimental/serve/handle.py b/python/ray/experimental/serve/handle.py
index 6102f8b0e..b4cd2eedf 100644
--- a/python/ray/experimental/serve/handle.py
+++ b/python/ray/experimental/serve/handle.py
@@ -36,12 +36,27 @@ class RayServeHandle:
             raise RayServeException(
                 "handle.remote must be invoked with keyword arguments.")
 
+        # get slo_ms before enqueuing the query
+        request_slo_ms = kwargs.pop("slo_ms", None)
+        if request_slo_ms is not None:
+            try:
+                request_slo_ms = float(request_slo_ms)
+                # `not (x >= 0)` rejects NaN as well as negative values; NaN
+                # would break the ordering of the deadline-sorted queue
+                if not request_slo_ms >= 0:
+                    raise ValueError(
+                        "Request SLO must be non-negative, it is {}".format(
+                            request_slo_ms))
+            except (TypeError, ValueError) as e:
+                raise RayServeException(str(e))
+
         result_object_id_bytes = ray.get(
             self.router_handle.enqueue_request.remote(
                 service=self.endpoint_name,
                 request_args=(),
                 request_kwargs=kwargs,
-                request_context=TaskContext.Python))
+                request_context=TaskContext.Python,
+                request_slo_ms=request_slo_ms))
         return ray.ObjectID(result_object_id_bytes)
 
     def get_traffic_policy(self):
diff --git a/python/ray/experimental/serve/queues.py b/python/ray/experimental/serve/queues.py
index 413c2123a..023c554e5 100644
--- a/python/ray/experimental/serve/queues.py
+++ b/python/ray/experimental/serve/queues.py
@@ -4,6 +4,8 @@ import numpy as np
 
 import ray
 from ray.experimental.serve.utils import logger
+from blist import sortedlist
+import time
 
 
 class Query:
@@ -11,6 +13,7 @@ class Query:
                  request_args,
                  request_kwargs,
                  request_context,
+                 request_slo_ms,
                  result_object_id=None):
         self.request_args = request_args
         self.request_kwargs = request_kwargs
@@ -21,6 +24,15 @@ class Query:
         else:
             self.result_object_id = result_object_id
 
+        # Absolute deadline of the query in milliseconds since the unix epoch
+        # (the relative SLO plus the wall clock time at enqueue).
+        self.request_slo_ms = request_slo_ms
+
+    # comparator used to keep queries in a sorted list in ascending order
+    # of request_slo_ms, i.e. the earliest deadline comes first
+    def __lt__(self, other):
+        return self.request_slo_ms < other.request_slo_ms
+
 
 class WorkIntent:
     def __init__(self, replica_handle):
@@ -74,7 +86,14 @@ class CentralizedQueues:
         self.workers = defaultdict(deque)
 
         # backend_name -> worker payload queue
-        self.buffer_queues = defaultdict(deque)
+        # using blist sortedlist for deadline awareness
+        # blist is chosen because:
+        # 1. pop operation should be O(1) (amortized)
+        #    (helpful even for batched pop)
+        # 2. There should not be significant overhead in
+        #    maintaining the sorted list.
+        # 3. The blist implementation is fast and uses C extensions.
+        self.buffer_queues = defaultdict(sortedlist)
 
     def is_ready(self):
         return True
@@ -88,9 +107,23 @@ class CentralizedQueues:
             for backend_name, queue in self.buffer_queues.items()
         }
 
-    def enqueue_request(self, service, request_args, request_kwargs,
-                        request_context):
-        query = Query(request_args, request_kwargs, request_context)
+    # request_slo_ms is the relative deadline in milliseconds, counted from
+    # the time of enqueue, by which the query should be answered
+    def enqueue_request(self,
+                        service,
+                        request_args,
+                        request_kwargs,
+                        request_context,
+                        request_slo_ms=None):
+        if request_slo_ms is None:
+            # no SLO given: use a very large one (1e9 ms is ~11.5 days)
+            request_slo_ms = 1e9
+
+        # add wall clock time to turn the relative SLO into a deadline
+        # this also assures FIFO behaviour if request_slo_ms is not specified
+        request_slo_ms += (time.time() * 1000)
+        query = Query(request_args, request_kwargs, request_context,
+                      request_slo_ms)
         self.queues[service].append(query)
         self.flush()
         return query.result_object_id.binary()
@@ -147,11 +180,15 @@ class CentralizedQueues:
             while len(queue) and len(self.traffic[service]):
                 backend_names = list(self.traffic[service].keys())
                 backend_weights = list(self.traffic[service].values())
+                # TODO(alind): is random choice good for deadline awareness?
+                # putting a query in the buffer of an unavailable backend
+                # may not be good
                 chosen_backend = np.random.choice(
                     backend_names, p=backend_weights).squeeze()
 
                 request = queue.popleft()
-                self.buffer_queues[chosen_backend].append(request)
+                # maintain a sorted list in the buffer queue of the backend
+                self.buffer_queues[chosen_backend].add(request)
 
         # distach buffer queues to work queues
         for service in self.queues.keys():
@@ -165,7 +202,7 @@ class CentralizedQueues:
                 work_queue = self.workers[backend]
                 while len(buffer_queue) and len(work_queue):
                     request, work = (
-                        buffer_queue.popleft(),
+                        buffer_queue.pop(0),
                         work_queue.popleft(),
                     )
                     work.replica_handle._ray_serve_call.remote(request)
diff --git a/python/ray/experimental/serve/server.py b/python/ray/experimental/serve/server.py
index 30f25b303..5a4e97476 100644
--- a/python/ray/experimental/serve/server.py
+++ b/python/ray/experimental/serve/server.py
@@ -8,6 +8,7 @@ from ray.experimental.async_api import _async_init, as_future
 from ray.experimental.serve.constants import HTTP_ROUTER_CHECKER_INTERVAL_S
 from ray.experimental.serve.context import TaskContext
 from ray.experimental.serve.utils import BytesEncoder
+from urllib.parse import parse_qs
 
 
 class JSONResponse:
@@ -128,13 +129,36 @@ class HTTPProxy:
         endpoint_name = self.route_table_cache[current_path]
         http_body_bytes = await self.receive_http_body(scope, receive, send)
 
+        # get slo_ms before enqueuing the query
+        # latin-1 maps every byte to a code point, so a query string with
+        # stray non-ASCII bytes cannot crash requests with UnicodeDecodeError
+        query_string = scope["query_string"].decode("latin-1")
+        query_kwargs = parse_qs(query_string)
+        request_slo_ms = query_kwargs.pop("slo_ms", None)
+        if request_slo_ms is not None:
+            try:
+                if len(request_slo_ms) != 1:
+                    raise ValueError(
+                        "Multiple SLO specified, please specify only one.")
+                request_slo_ms = float(request_slo_ms[0])
+                # `not (x >= 0)` rejects NaN as well as negative values; NaN
+                # would break the ordering of the deadline-sorted queue
+                if not request_slo_ms >= 0:
+                    raise ValueError(
+                        "Request SLO must be non-negative, it is {}".format(
+                            request_slo_ms))
+            except ValueError as e:
+                await JSONResponse({"error": str(e)})(scope, receive, send)
+                return
+
         result_object_id_bytes = await as_future(
             self.serve_global_state.init_or_get_router()
             .enqueue_request.remote(
                 service=endpoint_name,
                 request_args=(scope, http_body_bytes),
                 request_kwargs=dict(),
-                request_context=TaskContext.Web))
+                request_context=TaskContext.Web,
+                request_slo_ms=request_slo_ms))
 
         result = await as_future(ray.ObjectID(result_object_id_bytes))
 
diff --git a/python/ray/experimental/serve/tests/test_queue.py b/python/ray/experimental/serve/tests/test_queue.py
index c16558463..9d74da089 100644
--- a/python/ray/experimental/serve/tests/test_queue.py
+++ b/python/ray/experimental/serve/tests/test_queue.py
@@ -34,6 +34,19 @@ def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
     assert ray.get(ray.ObjectID(result_object_id)) == 2
 
 
+def test_slo(serve_instance, task_runner_mock_actor):
+    q = CentralizedQueues()
+    q.link("svc", "backend")
+
+    for i in range(10):
+        slo_ms = 1000 - 100 * i
+        q.enqueue_request("svc", i, "kwargs", None, request_slo_ms=slo_ms)
+    for i in range(10):
+        q.dequeue_request("backend", task_runner_mock_actor)
+        got_work = ray.get(task_runner_mock_actor.get_recent_call.remote())
+        assert got_work.request_args == (9 - i)
+
+
 def test_alter_backend(serve_instance, task_runner_mock_actor):
     q = CentralizedQueues()
 
diff --git a/python/setup.py b/python/setup.py
index 2f172b766..153c9784c 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -79,7 +79,7 @@ extras = {
     ],
     "debug": ["psutil", "setproctitle", "py-spy >= 0.2.0"],
     "dashboard": ["aiohttp", "google", "grpcio", "psutil", "setproctitle"],
-    "serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas"],
+    "serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas", "blist"],
     "tune": ["tabulate"],
 }
 