diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index c62498021..0953b02a2 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -82,7 +82,8 @@ from ray.includes.ray_config cimport RayConfig
 from ray.includes.global_state_accessor cimport CGlobalStateAccessor
 
 import ray
-from ray.async_compat import (sync_to_async, AsyncGetResponse)
+from ray.async_compat import (
+    sync_to_async, AsyncGetResponse, get_new_event_loop)
 import ray.memory_monitor as memory_monitor
 import ray.ray_constants as ray_constants
 from ray import profiling
@@ -1187,7 +1188,7 @@ cdef class CoreWorker:
 
     def create_or_get_event_loop(self):
         if self.async_event_loop is None:
-            self.async_event_loop = asyncio.new_event_loop()
+            self.async_event_loop = get_new_event_loop()
             asyncio.set_event_loop(self.async_event_loop)
             # Initialize the async plasma connection.
             # Delayed import due to async_api depends on _raylet.
diff --git a/python/ray/async_compat.py b/python/ray/async_compat.py
index 1428e765c..ff96c20ae 100644
--- a/python/ray/async_compat.py
+++ b/python/ray/async_compat.py
@@ -7,9 +7,22 @@ from collections import namedtuple
 import time
 import inspect
 
+try:
+    import uvloop
+except ImportError:
+    uvloop = None
+
 import ray
 
 
+def get_new_event_loop():
+    """Construct a new event loop. Ray will use uvloop if it exists"""
+    if uvloop:
+        return uvloop.new_event_loop()
+    else:
+        return asyncio.new_event_loop()
+
+
 def sync_to_async(func):
     """Convert a blocking function to async function"""
 
diff --git a/python/ray/serve/benchmarks/baselines/Makefile b/python/ray/serve/benchmarks/baselines/Makefile
new file mode 100644
index 000000000..ada0a0f0c
--- /dev/null
+++ b/python/ray/serve/benchmarks/baselines/Makefile
@@ -0,0 +1,12 @@
+noop:
+	@echo "please specify which baseline to run"
+
+uvicorn:
+	uvicorn uvicorn_app:app --no-access-log --workers 1
+
+fastapi:
+	uvicorn fastapi_app:app --no-access-log --workers 1
+
+bench:
+	wrk -c 100 -t 10 -d 10s http://127.0.0.1:8000
+
diff --git a/python/ray/serve/benchmarks/baselines/fastapi_app.py b/python/ray/serve/benchmarks/baselines/fastapi_app.py
new file mode 100644
index 000000000..ae0a2a2a5
--- /dev/null
+++ b/python/ray/serve/benchmarks/baselines/fastapi_app.py
@@ -0,0 +1,8 @@
+from fastapi import FastAPI
+
+app = FastAPI()
+
+
+@app.get("/")
+async def read_root():
+    return "Hello world"
diff --git a/python/ray/serve/benchmarks/baselines/uvicorn_app.py b/python/ray/serve/benchmarks/baselines/uvicorn_app.py
new file mode 100644
index 000000000..674fac79d
--- /dev/null
+++ b/python/ray/serve/benchmarks/baselines/uvicorn_app.py
@@ -0,0 +1,13 @@
+async def app(scope, receive, send):
+    assert scope["type"] == "http"
+    await send({
+        "type": "http.response.start",
+        "status": 200,
+        "headers": [
+            [b"content-type", b"text/plain"],
+        ]
+    })
+    await send({
+        "type": "http.response.body",
+        "body": b"Hello, world!",
+    })
diff --git a/python/ray/serve/benchmarks/noop_latency.py b/python/ray/serve/benchmarks/noop_latency.py
index 3c70cef10..4d9146287 100644
--- a/python/ray/serve/benchmarks/noop_latency.py
+++ b/python/ray/serve/benchmarks/noop_latency.py
@@ -1,35 +1,70 @@
-from ray import serve
-from ray.serve.constants import DEFAULT_HTTP_ADDRESS
-import requests
 import time
+from typing import Optional
+
+import requests
 import pandas as pd
 from tqdm import tqdm
+import click
 
-serve.init()
+from ray import serve
+from ray.serve.constants import DEFAULT_HTTP_ADDRESS
+from ray.serve import master
+
+master._TRACING_ENABLED = True
 
 
-def noop(_):
-    return ""
+def block_until_ready(url):
+    while requests.get(url).status_code == 404:
+        time.sleep(1)
+        print("Waiting for noop route to showup.")
 
 
-serve.create_backend("noop", noop)
-serve.create_endpoint("noop", backend="noop", route="/noop")
+def run_http_benchmark(url, num_queries):
+    latency = []
+    for _ in tqdm(range(num_queries + 200)):
+        start = time.perf_counter()
+        requests.get(url)
+        end = time.perf_counter()
+        latency.append(end - start)
 
-url = "{}/noop".format(DEFAULT_HTTP_ADDRESS)
-while requests.get(url).status_code == 404:
-    time.sleep(1)
-    print("Waiting for noop route to showup.")
+    # Remove initial samples
+    latency = latency[200:]
 
-latency = []
-for _ in tqdm(range(5200)):
-    start = time.perf_counter()
-    resp = requests.get(url)
-    end = time.perf_counter()
-    latency.append(end - start)
+    series = pd.Series(latency) * 1000
+    print("Latency for single noop backend (ms)")
+    print(series.describe(percentiles=[0.5, 0.9, 0.95, 0.99]))
 
-# Remove initial samples
-latency = latency[200:]
 
-series = pd.Series(latency) * 1000
-print("Latency for single noop backend (ms)")
-print(series.describe(percentiles=[0.5, 0.9, 0.95, 0.99]))
+@click.command()
+@click.option("--blocking", is_flag=True, required=False, help="Block forever")
+@click.option("--num-queries", type=int, required=False)
+@click.option("--num-replicas", type=int, default=1)
+@click.option("--max-concurrent-queries", type=int, required=False)
+def main(num_replicas: int, num_queries: Optional[int],
+         max_concurrent_queries: Optional[int], blocking: bool):
+    serve.init()
+
+    def noop(_):
+        return "hello world"
+
+    config = {
+        "num_replicas": num_replicas,
+        "max_concurrent_queries": max_concurrent_queries
+    }
+    print("Using config", config)
+    serve.create_backend("noop", noop, config=config)
+    serve.create_endpoint("noop", backend="noop", route="/noop")
+
+    url = "{}/noop".format(DEFAULT_HTTP_ADDRESS)
+    block_until_ready(url)
+
+    if num_queries:
+        run_http_benchmark(url, num_queries)
+    if blocking:
+        print("Endpoint {} is ready.".format(url))
+        while True:
+            time.sleep(5)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py
index 55821fb60..d236d1d66 100644
--- a/python/ray/serve/http_proxy.py
+++ b/python/ray/serve/http_proxy.py
@@ -1,5 +1,4 @@
 import asyncio
-import socket
 
 import uvicorn
 
@@ -46,15 +45,6 @@ class HTTPProxy:
     def set_route_table(self, route_table):
         self.route_table = route_table
 
-    async def handle_lifespan_message(self, scope, receive, send):
-        assert scope["type"] == "lifespan"
-
-        message = await receive()
-        if message["type"] == "lifespan.startup":
-            await send({"type": "lifespan.startup.complete"})
-        elif message["type"] == "lifespan.shutdown":
-            await send({"type": "lifespan.shutdown.complete"})
-
     async def receive_http_body(self, scope, receive, send):
         body_buffer = []
         more_body = True
@@ -116,10 +106,6 @@ class HTTPProxy:
         # NOTE: This implements ASGI protocol specified in
         # https://asgi.readthedocs.io/en/latest/specs/index.html
 
-        if scope["type"] == "lifespan":
-            await self.handle_lifespan_message(scope, receive, send)
-            return
-
         error_sender = self._make_error_sender(scope, receive, send)
 
         assert self.route_table is not None, (
@@ -202,18 +188,21 @@ class HTTPProxyActor:
         asyncio.get_event_loop().create_task(self.run())
 
     async def run(self):
-        sock = socket.socket()
-        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        sock.bind((self.host, self.port))
-        sock.set_inheritable(True)
-
-        config = uvicorn.Config(self.app, lifespan="on", access_log=False)
+        # Note(simon): we have to use lower level uvicorn Config and Server
+        # class because we want to run the server as a coroutine. The only
+        # alternative is to call uvicorn.run which is blocking.
+        config = uvicorn.Config(
+            self.app,
+            host=self.host,
+            port=self.port,
+            lifespan="off",
+            access_log=False)
         server = uvicorn.Server(config=config)
         # TODO(edoakes): we need to override install_signal_handlers here
         # because the existing implementation fails if it isn't running in
         # the main thread and uvicorn doesn't expose a way to configure it.
         server.install_signal_handlers = lambda: None
-        await server.serve(sockets=[sock])
+        await server.serve()
 
     async def set_route_table(self, route_table):
         self.app.set_route_table(route_table)
diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py
index dad1fd7da..81b8db028 100644
--- a/python/ray/serve/tests/test_api.py
+++ b/python/ray/serve/tests/test_api.py
@@ -559,7 +559,7 @@ def test_shutdown(serve_instance):
         pass
 
     instance_name = "shutdown"
-    serve.init(name=instance_name, http_port=8002)
+    serve.init(name=instance_name, http_port=8003)
     serve.create_backend("backend", f)
     serve.create_endpoint("endpoint", backend="backend")
 