[Serve] Allow multiple HTTP servers. (#9523)

This commit is contained in:
Simon Mo
2020-07-24 12:41:20 -07:00
committed by GitHub
parent 590943a499
commit a078a21437
9 changed files with 200 additions and 83 deletions
+8
View File
@@ -99,6 +99,14 @@ py_test(
)
py_test(
name = "test_scaling",
size = "small",
srcs = serve_tests_srcs,
tags = ["exclusive"],
deps = [":serve_lib"],
)
py_test(
name = "test_util",
size = "small",
+28 -16
View File
@@ -55,10 +55,12 @@ def accept_batch(f):
return f
def init(name=None,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
metric_exporter=InMemoryExporter):
def init(
name=None,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
metric_exporter=InMemoryExporter,
):
"""Initialize or connect to a serve cluster.
If serve cluster is already initialized, this function will just return.
@@ -71,11 +73,12 @@ def init(name=None,
name (str): A unique name for this serve instance. This allows
multiple serve instances to run on the same ray cluster. Must be
specified in all subsequent serve.init() calls.
http_host (str): Host for HTTP server. Default to "0.0.0.0".
http_port (int): Port for HTTP server. Default to 8000.
http_host (str): Host for HTTP servers. Default to "0.0.0.0". Serve
starts one HTTP server per node in the Ray cluster.
http_port (int, List[int]): Port for HTTP server. Default to 8000.
metric_exporter(ExporterInterface): The class aggregates metrics from
all RayServe actors and optionally export them to external
services. RayServe has two options built in: InMemoryExporter and
services. Ray Serve has two options built in: InMemoryExporter and
PrometheusExporter
"""
if name is not None and not isinstance(name, str):
@@ -94,19 +97,27 @@ def init(name=None,
except ValueError:
pass
# TODO(edoakes): for now, always start the HTTP proxy on the node that
# serve.init() was run on. We should consider making this configurable
# in the future.
http_node_id = ray.state.current_node_id()
controller = ServeController.options(
name=controller_name,
max_restarts=-1,
max_task_retries=-1,
).remote(name, http_node_id, http_host, http_port, metric_exporter)
).remote(
name,
http_host,
http_port,
metric_exporter,
)
block_until_http_ready(
"http://{}:{}/-/routes".format(http_host, http_port),
timeout=HTTP_PROXY_TIMEOUT)
futures = []
for node_id in ray.state.node_ids():
future = block_until_http_ready.options(
num_cpus=0, resources={
node_id: 0.01
}).remote(
"http://{}:{}/-/routes".format(http_host, http_port),
timeout=HTTP_PROXY_TIMEOUT)
futures.append(future)
ray.get(futures)
@_ensure_connected
@@ -122,6 +133,7 @@ def shutdown():
controller = None
@_ensure_connected
def create_endpoint(endpoint_name,
*,
backend=None,
@@ -356,7 +368,7 @@ def get_handle(endpoint_name,
assert endpoint_name in ray.get(controller.get_all_endpoints.remote())
return RayServeHandle(
ray.get(controller.get_http_proxy.remote())[0],
ray.get(controller.get_router.remote())[0],
endpoint_name,
relative_slo_ms,
absolute_slo_ms,
+96 -55
View File
@@ -1,5 +1,6 @@
import asyncio
from collections import defaultdict, namedtuple
from itertools import groupby
import os
import random
import time
@@ -87,8 +88,8 @@ class ServeController:
requires all implementations here to be idempotent.
"""
async def __init__(self, instance_name, http_node_id, http_proxy_host,
http_proxy_port, metric_exporter_class):
async def __init__(self, instance_name, http_proxy_host, http_proxy_port,
metric_exporter_class):
# Unique name of the serve instance managed by this actor. Used to
# namespace child actors and checkpoints.
self.instance_name = instance_name
@@ -121,15 +122,13 @@ class ServeController:
self.write_lock = asyncio.Lock()
# Cached handles to actors in the system.
self.router = None
self.http_proxy = None
self.routers = []
self.metric_exporter = None
# If starting the actor for the first time, starts up the other system
# components. If recovering, fetches their actor handles.
self._get_or_start_metric_exporter(metric_exporter_class)
self._get_or_start_http_proxy(http_node_id, http_proxy_host,
http_proxy_port)
self._get_or_start_routers(http_proxy_host, http_proxy_port)
# NOTE(edoakes): unfortunately, we can't completely recover from a
# checkpoint in the constructor because we block while waiting for
@@ -151,40 +150,42 @@ class ServeController:
asyncio.get_event_loop().create_task(
self._recover_from_checkpoint(checkpoint))
def _get_or_start_http_proxy(self, node_id, host, port):
def _get_or_start_routers(self, host, port):
"""Get the HTTP proxy belonging to this serve instance.
If the HTTP proxy does not already exist, it will be started.
"""
proxy_name = format_actor_name(SERVE_PROXY_NAME, self.instance_name)
try:
self.http_proxy = ray.get_actor(proxy_name)
except ValueError:
logger.info(
"Starting HTTP proxy with name '{}' on node '{}'".format(
proxy_name, node_id))
self.http_proxy = HTTPProxyActor.options(
name=proxy_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
max_task_retries=-1,
resources={
node_id: 0.01
},
).remote(
host, port, instance_name=self.instance_name)
# TODO(simon): We don't handle nodes being added/removed. To do that,
# we should implement some sort of control loop in master actor.
for _, node_id_group in groupby(sorted(ray.state.node_ids())):
for index, node_id in enumerate(node_id_group):
proxy_name = format_actor_name(SERVE_PROXY_NAME,
self.instance_name)
proxy_name += "-{}-{}".format(node_id, index)
try:
router = ray.get_actor(proxy_name)
except ValueError:
logger.info(
"Starting HTTP proxy with name '{}' on node '{}' "
"listening on port {}".format(proxy_name, node_id,
port))
router = HTTPProxyActor.options(
name=proxy_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
max_task_retries=-1,
resources={
node_id: 0.01
},
).remote(
host, port, instance_name=self.instance_name)
self.routers.append(router)
# Since router is a merged with HTTP proxy actor, the router will be
# proxied via the HTTP actor. Even though the two variable names are
# pointing to the same object, their semantic differences make the code
# more readable. (e.g. http_proxy.set_route_table, router.add_worker)
self.router = self.http_proxy
def get_http_proxy(self):
def get_router(self):
"""Returns a handle to the HTTP proxy managed by this actor."""
return [self.http_proxy]
return self.routers
def get_http_proxy_config(self):
def get_router_config(self):
"""Called by the HTTP proxy on startup to fetch required state."""
return self.routes
@@ -267,20 +268,31 @@ class ServeController:
# Push configuration state to the router.
# TODO(edoakes): should we make this a pull-only model for simplicity?
for endpoint, traffic_policy in self.traffic_policies.items():
await self.router.set_traffic.remote(endpoint, traffic_policy)
await asyncio.gather(*[
router.set_traffic.remote(endpoint, traffic_policy)
for router in self.routers
])
for backend_tag, replica_dict in self.workers.items():
for replica_tag, worker in replica_dict.items():
await self.router.add_new_worker.remote(
backend_tag, replica_tag, worker)
await asyncio.gather(*[
router.add_new_worker.remote(backend_tag, replica_tag,
worker)
for router in self.routers
])
for backend, info in self.backends.items():
await self.router.set_backend_config.remote(
backend, info.backend_config)
await asyncio.gather(*[
router.set_backend_config.remote(backend, info.backend_config)
for router in self.routers
])
await self.broadcast_backend_config(backend)
# Push configuration state to the HTTP proxy.
await self.http_proxy.set_route_table.remote(self.routes)
await asyncio.gather(*[
router.set_route_table.remote(self.routes)
for router in self.routers
])
# Start/stop any pending backend replicas.
await self._start_pending_replicas()
@@ -353,8 +365,11 @@ class ServeController:
self.workers[backend_tag][replica_tag] = worker_handle
# Register the worker with the router.
await self.router.add_new_worker.remote(backend_tag, replica_tag,
worker_handle)
await asyncio.gather(*[
router.add_new_worker.remote(backend_tag, replica_tag,
worker_handle)
for router in self.routers
])
async def _start_pending_replicas(self):
"""Starts the pending backend replicas in self.replicas_to_start.
@@ -392,8 +407,10 @@ class ServeController:
continue
# Remove the replica from router. This call is idempotent.
await self.router.remove_worker.remote(backend_tag,
replica_tag)
await asyncio.gather(*[
router.remove_worker.remote(backend_tag, replica_tag)
for router in self.routers
])
# TODO(edoakes): this logic isn't ideal because there may be
# pending tasks still executing on the replica. However, if we
@@ -410,7 +427,10 @@ class ServeController:
Clears self.backends_to_remove.
"""
for backend_tag in self.backends_to_remove:
await self.router.remove_backend.remote(backend_tag)
await asyncio.gather(*[
router.remove_backend.remote(backend_tag)
for router in self.routers
])
self.backends_to_remove.clear()
async def _remove_pending_endpoints(self):
@@ -419,7 +439,10 @@ class ServeController:
Clears self.endpoints_to_remove.
"""
for endpoint_tag in self.endpoints_to_remove:
await self.router.remove_endpoint.remote(endpoint_tag)
await asyncio.gather(*[
router.remove_endpoint.remote(endpoint_tag)
for router in self.routers
])
self.endpoints_to_remove.clear()
def _scale_replicas(self, backend_tag, num_replicas):
@@ -533,7 +556,10 @@ class ServeController:
# update to avoid inconsistent state if we crash after pushing the
# update.
self._checkpoint()
await self.router.set_traffic.remote(endpoint_name, traffic_policy)
await asyncio.gather(*[
router.set_traffic.remote(endpoint_name, traffic_policy)
for router in self.routers
])
async def set_traffic(self, endpoint_name, traffic_dict):
"""Sets the traffic policy for the specified endpoint."""
@@ -560,8 +586,12 @@ class ServeController:
# update to avoid inconsistent state if we crash after pushing the
# update.
self._checkpoint()
await self.router.set_traffic.remote(
endpoint_name, self.traffic_policies[endpoint_name])
await asyncio.gather(*[
router.set_traffic.remote(
endpoint_name,
self.traffic_policies[endpoint_name],
) for router in self.routers
])
async def create_endpoint(self, endpoint, traffic_dict, route, methods):
"""Create a new endpoint with the specified route and methods.
@@ -600,7 +630,10 @@ class ServeController:
# NOTE(edoakes): checkpoint is written in self._set_traffic.
await self._set_traffic(endpoint, traffic_dict)
await self.http_proxy.set_route_table.remote(self.routes)
await asyncio.gather(*[
router.set_route_table.remote(self.routes)
for router in self.routers
])
async def delete_endpoint(self, endpoint):
"""Delete the specified endpoint.
@@ -635,7 +668,10 @@ class ServeController:
# Update the HTTP proxy first to ensure no new requests for the
# endpoint are sent to the router.
await self.http_proxy.set_route_table.remote(self.routes)
await asyncio.gather(*[
router.set_route_table.remote(self.routes)
for router in self.routers
])
await self._remove_pending_endpoints()
async def create_backend(self, backend_tag, backend_config,
@@ -660,8 +696,10 @@ class ServeController:
# Set the backend config inside the router
# (particularly for max-batch-size).
await self.router.set_backend_config.remote(
backend_tag, backend_config)
await asyncio.gather(*[
router.set_backend_config.remote(backend_tag, backend_config)
for router in self.routers
])
await self.broadcast_backend_config(backend_tag)
async def delete_backend(self, backend_tag):
@@ -717,8 +755,10 @@ class ServeController:
# Inform the router about change in configuration
# (particularly for setting max_batch_size).
await self.router.set_backend_config.remote(
backend_tag, backend_config)
await asyncio.gather(*[
router.set_backend_config.remote(backend_tag, backend_config)
for router in self.routers
])
await self._start_pending_replicas()
await self._stop_pending_replicas()
@@ -748,7 +788,8 @@ class ServeController:
async def shutdown(self):
"""Shuts down the serve instance completely."""
async with self.write_lock:
ray.kill(self.http_proxy, no_restart=True)
for router in self.routers:
ray.kill(router, no_restart=True)
ray.kill(self.metric_exporter, no_restart=True)
for replica_dict in self.workers.values():
for replica in replica_dict.values():
+15 -2
View File
@@ -1,5 +1,6 @@
import asyncio
from urllib.parse import parse_qs
import socket
import uvicorn
@@ -30,7 +31,7 @@ class HTTPProxy:
assert ray.is_initialized()
controller = serve.api._get_controller()
self.route_table = await controller.get_http_proxy_config.remote()
self.route_table = await controller.get_router_config.remote()
# The exporter is required to return results for /-/metrics endpoint.
[self.metric_exporter] = await controller.get_metric_exporter.remote()
@@ -179,7 +180,19 @@ class HTTPProxyActor:
# Start running the HTTP server on the event loop.
asyncio.get_event_loop().create_task(self.run())
def ready(self):
return True
async def run(self):
sock = socket.socket()
# These two socket options will allow multiple process to bind the the
# same port. Kernel will evenly load balance among the port listeners.
# Note: this will only work on Linux.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind((self.host, self.port))
# Note(simon): we have to use lower level uvicorn Config and Server
# class because we want to run the server as a coroutine. The only
# alternative is to call uvicorn.run which is blocking.
@@ -194,7 +207,7 @@ class HTTPProxyActor:
# because the existing implementation fails if it isn't running in
# the main thread and uvicorn doesn't expose a way to configure it.
server.install_signal_handlers = lambda: None
await server.serve()
await server.serve(sockets=[sock])
async def set_route_table(self, route_table):
self.app.set_route_table(route_table)
-3
View File
@@ -170,9 +170,6 @@ class Router:
"from backend."),
label_names=("backend", ))
def is_ready(self):
return True
async def enqueue_request(self, request_meta, *request_args,
**request_kwargs):
endpoint = request_meta.endpoint
+5 -4
View File
@@ -75,9 +75,10 @@ def test_controller_failure(serve_instance):
assert response.text == "hello3"
def _kill_http_proxy():
[http_proxy] = ray.get(serve.api._get_controller().get_http_proxy.remote())
ray.kill(http_proxy, no_restart=False)
def _kill_routers():
routers = ray.get(serve.api._get_controller().get_router.remote())
for router in routers:
ray.kill(router, no_restart=False)
def test_http_proxy_failure(serve_instance):
@@ -96,7 +97,7 @@ def test_http_proxy_failure(serve_instance):
response = request_with_retries("/proxy_failure", timeout=30)
assert response.text == "hello1"
_kill_http_proxy()
_kill_routers()
def function():
return "hello2"
+45
View File
@@ -0,0 +1,45 @@
import sys
import socket
import pytest
import ray
from ray import serve
from ray.serve.constants import SERVE_PROXY_NAME
from ray.serve.utils import block_until_http_ready
from ray.cluster_utils import Cluster
@pytest.mark.skipif(
not hasattr(socket, "SO_REUSEPORT"),
reason=("Port sharing only works on newer verion of Linux. "
"This test can only be ran when port sharing is supported."))
def test_multiple_routers():
cluster = Cluster()
head_node = cluster.add_node()
cluster.add_node()
ray.init(head_node.address)
node_ids = ray.state.node_ids()
assert len(node_ids) == 2
serve.init(http_port=8005)
# two actors should be started
head_http = ray.get_actor(SERVE_PROXY_NAME +
"-{}-{}".format(node_ids[0], 0))
ray.get_actor(SERVE_PROXY_NAME + "-{}-{}".format(node_ids[0], 1))
# wait for the actors to come up
ray.get(block_until_http_ready.remote("http://127.0.0.1:8005/-/routes"))
# kill the head_http server, the HTTP server should still functions
ray.kill(head_http, no_restart=True)
ray.get(block_until_http_ready.remote("http://127.0.0.1:8005/-/routes"))
# cleanup the nodes (otherwise Ray will segfault)
ray.shutdown()
cluster.shutdown()
if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))
+1
View File
@@ -78,6 +78,7 @@ class ServeEncoder(json.JSONEncoder):
return super().default(o)
@ray.remote(num_cpus=0)
def block_until_http_ready(http_endpoint,
backoff_time_s=1,
timeout=HTTP_PROXY_TIMEOUT):