mirror of
https://github.com/wassname/ray.git
synced 2026-07-04 10:46:52 +08:00
[serve] Make HTTP proxy fault tolerant (#7936)
This commit is contained in:
+2
-1
@@ -407,6 +407,7 @@ class ActorClass:
|
||||
resources=None,
|
||||
is_direct_call=None,
|
||||
max_concurrency=None,
|
||||
max_reconstructions=None,
|
||||
name=None,
|
||||
detached=False):
|
||||
"""Create an actor.
|
||||
@@ -545,7 +546,7 @@ class ActorClass:
|
||||
meta.language,
|
||||
meta.actor_creation_function_descriptor,
|
||||
creation_args,
|
||||
meta.max_reconstructions,
|
||||
max_reconstructions or meta.max_reconstructions,
|
||||
resources,
|
||||
actor_placement_resources,
|
||||
max_concurrency,
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import asyncio
|
||||
import socket
|
||||
|
||||
import uvicorn
|
||||
|
||||
import ray
|
||||
from ray.serve.constants import SERVE_MASTER_NAME
|
||||
from ray.serve.context import TaskContext
|
||||
from ray.serve.request_params import RequestMetadata
|
||||
from ray.serve.http_util import Response
|
||||
@@ -21,17 +23,13 @@ class HTTPProxy:
|
||||
|
||||
def __init__(self):
|
||||
assert ray.is_initialized()
|
||||
# Must be set via set_route_table.
|
||||
self.route_table = dict()
|
||||
# Must be set via set_router_handle.
|
||||
self.router_handle = None
|
||||
master = ray.util.get_actor(SERVE_MASTER_NAME)
|
||||
self.route_table, [self.router_handle] = ray.get(
|
||||
master.get_http_proxy_config.remote())
|
||||
|
||||
def set_route_table(self, route_table):
|
||||
self.route_table = route_table
|
||||
|
||||
def set_router_handle(self, router_handle):
|
||||
self.router_handle = router_handle
|
||||
|
||||
async def handle_lifespan_message(self, scope, receive, send):
|
||||
assert scope["type"] == "lifespan"
|
||||
|
||||
@@ -139,8 +137,6 @@ class HTTPProxy:
|
||||
absolute_slo_ms=absolute_slo_ms,
|
||||
call_method=headers.get("X-SERVE-CALL-METHOD".lower(), "__call__"))
|
||||
|
||||
assert self.route_table is not None, (
|
||||
"Router handle must be set via set_router_handle.")
|
||||
try:
|
||||
result = await self.router_handle.enqueue_request.remote(
|
||||
request_metadata, scope, http_body_bytes)
|
||||
@@ -152,13 +148,18 @@ class HTTPProxy:
|
||||
|
||||
@ray.remote
|
||||
class HTTPProxyActor:
|
||||
def __init__(self):
|
||||
def __init__(self, host, port):
|
||||
self.app = HTTPProxy()
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
async def run(self, host, port):
|
||||
# Start running the HTTP server on the event loop.
|
||||
asyncio.get_event_loop().create_task(self.run())
|
||||
|
||||
async def run(self):
|
||||
sock = socket.socket()
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind((host, port))
|
||||
sock.bind((self.host, self.port))
|
||||
sock.set_inheritable(True)
|
||||
|
||||
config = uvicorn.Config(self.app, lifespan="on", access_log=False)
|
||||
@@ -171,6 +172,3 @@ class HTTPProxyActor:
|
||||
|
||||
async def set_route_table(self, route_table):
|
||||
self.app.set_route_table(route_table)
|
||||
|
||||
async def set_router_handle(self, router_handle):
|
||||
self.app.set_router_handle(router_handle)
|
||||
|
||||
@@ -45,13 +45,23 @@ class ServeMaster:
|
||||
return [self.router]
|
||||
|
||||
def start_http_proxy(self, host, port):
|
||||
"""Start the HTTP proxy on the given host:port.
|
||||
|
||||
On startup (or restart), the HTTP proxy will fetch its config via
|
||||
get_http_proxy_config.
|
||||
"""
|
||||
assert self.http_proxy is None, "HTTP proxy already started."
|
||||
assert self.router is not None, (
|
||||
"Router must be started before HTTP proxy.")
|
||||
self.http_proxy = HTTPProxyActor.options(
|
||||
max_concurrency=ASYNC_CONCURRENCY).remote()
|
||||
self.http_proxy.run.remote(host, port)
|
||||
ray.get(self.http_proxy.set_router_handle.remote(self.router))
|
||||
max_concurrency=ASYNC_CONCURRENCY,
|
||||
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
|
||||
).remote(host, port)
|
||||
|
||||
async def get_http_proxy_config(self):
|
||||
route_table = self.route_table.list_service(
|
||||
include_methods=True, include_headless=False)
|
||||
return route_table, self.get_router()
|
||||
|
||||
def get_http_proxy(self):
|
||||
assert self.http_proxy is not None, "HTTP proxy not started yet."
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
import time
|
||||
import requests
|
||||
|
||||
from ray import serve
|
||||
import ray
|
||||
|
||||
|
||||
def _kill_http_proxy():
    """Kill the HTTP proxy actor currently registered with the serve master.

    Asks the serve master actor for its HTTP proxy (the reply is expected to
    be a single-element sequence) and passes that handle to ``ray.kill``.
    """
    master_actor = serve.api._get_master_actor()
    # One-element unpacking: fails loudly if the master ever returns a
    # different number of handles.
    (proxy_handle,) = ray.get(master_actor.get_http_proxy.remote())
    ray.kill(proxy_handle)
|
||||
|
||||
|
||||
def request_with_retries(endpoint, verify_response, timeout=30):
    """GET ``endpoint`` from the local server, retrying on request failures.

    Args:
        endpoint: URL path appended to ``http://127.0.0.1:8000``.
        verify_response: Callable invoked with the ``requests`` response.
            Exceptions it raises that are not ``requests.RequestException``
            (e.g. ``AssertionError``) propagate immediately, without retry.
        timeout: Seconds, measured from the first attempt, during which
            failed requests are retried.

    Raises:
        TimeoutError: If a request fails with ``requests.RequestException``
            once more than ``timeout`` seconds have elapsed. The failing
            request's exception is chained as ``__cause__``.
    """
    url = "http://127.0.0.1:8000" + endpoint
    # Use a monotonic clock so wall-clock adjustments cannot stretch or
    # shrink the retry budget.
    start = time.monotonic()
    while True:
        try:
            verify_response(requests.get(url))
            return
        except requests.RequestException as err:
            elapsed = time.monotonic() - start
            if elapsed > timeout:
                # Keep the underlying failure attached so a timed-out run
                # shows why the requests were failing.
                raise TimeoutError(
                    "GET {} still failing after {:.1f}s".format(
                        url, elapsed)) from err
            time.sleep(0.1)
|
||||
|
||||
|
||||
def test_http_proxy_failure(serve_instance):
    """Check that an endpoint keeps answering after the HTTP proxy is killed.

    Steps, in order:
      1. Create an endpoint backed by a function returning "hello1" and
         require the very first request to succeed (``timeout=0``).
      2. Kill the HTTP proxy and require "hello1" again within 30 seconds.
      3. Kill the HTTP proxy again, link the endpoint to a new backend
         returning "hello2", and require "hello2" within 30 seconds.

    ``serve_instance`` is a fixture; it is not referenced in the body.
    """
    endpoint_name = "failure_endpoint"
    route = "/failure_endpoint"

    def expect_text(expected):
        # Build a response checker asserting the exact response body.
        def check(response):
            assert response.text == expected

        return check

    serve.init()
    serve.create_endpoint(endpoint_name, route, methods=["GET"])

    def function(flask_request):
        return "hello1"

    serve.create_backend(function, "failure:v1")
    serve.link(endpoint_name, "failure:v1")

    # Sanity check before any failure is injected: no retry budget.
    request_with_retries(route, expect_text("hello1"), timeout=0)

    _kill_http_proxy()
    request_with_retries(route, expect_text("hello1"), timeout=30)

    # Second kill happens before the routing change below.
    _kill_http_proxy()

    def function(flask_request):
        return "hello2"

    serve.create_backend(function, "failure:v2")
    serve.link(endpoint_name, "failure:v2")

    request_with_retries(route, expect_text("hello2"), timeout=30)
|
||||
Reference in New Issue
Block a user