diff --git a/ci/long_running_tests/workloads/serve_failure.py b/ci/long_running_tests/workloads/serve_failure.py index 29b7b1c3c..406d35e55 100644 --- a/ci/long_running_tests/workloads/serve_failure.py +++ b/ci/long_running_tests/workloads/serve_failure.py @@ -38,9 +38,8 @@ class RandomKiller: def _get_all_serve_actors(self): master = serve.api._get_controller() - [router] = ray.get(master.get_router.remote()) - [http_proxy] = ray.get(master.get_http_proxy.remote()) - all_handles = [master, router, http_proxy] + routers = ray.get(master.get_router.remote()) + all_handles = routers + [master] worker_handle_dict = ray.get(master.get_all_worker_handles.remote()) for _, replica_dict in worker_handle_dict.items(): all_handles.extend(list(replica_dict.values())) diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index 56e25b7b6..360feda86 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -99,6 +99,14 @@ py_test( ) +py_test( + name = "test_scaling", + size = "small", + srcs = serve_tests_srcs, + tags = ["exclusive"], + deps = [":serve_lib"], +) + py_test( name = "test_util", size = "small", diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 44d7da155..6755134b6 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -55,10 +55,12 @@ def accept_batch(f): return f -def init(name=None, - http_host=DEFAULT_HTTP_HOST, - http_port=DEFAULT_HTTP_PORT, - metric_exporter=InMemoryExporter): +def init( + name=None, + http_host=DEFAULT_HTTP_HOST, + http_port=DEFAULT_HTTP_PORT, + metric_exporter=InMemoryExporter, +): """Initialize or connect to a serve cluster. If serve cluster is already initialized, this function will just return. @@ -71,11 +73,12 @@ def init(name=None, name (str): A unique name for this serve instance. This allows multiple serve instances to run on the same ray cluster. Must be specified in all subsequent serve.init() calls. - http_host (str): Host for HTTP server. Default to "0.0.0.0". - http_port (int): Port for HTTP server. Default to 8000. + http_host (str): Host for HTTP servers. Default to "0.0.0.0". Serve + starts one HTTP server per node in the Ray cluster. + http_port (int, List[int]): Port for HTTP server. Default to 8000. metric_exporter(ExporterInterface): The class aggregates metrics from all RayServe actors and optionally export them to external - services. RayServe has two options built in: InMemoryExporter and + services. Ray Serve has two options built in: InMemoryExporter and PrometheusExporter """ if name is not None and not isinstance(name, str): @@ -94,19 +97,27 @@ def init(name=None, except ValueError: pass - # TODO(edoakes): for now, always start the HTTP proxy on the node that - # serve.init() was run on. We should consider making this configurable - # in the future. - http_node_id = ray.state.current_node_id() controller = ServeController.options( name=controller_name, max_restarts=-1, max_task_retries=-1, - ).remote(name, http_node_id, http_host, http_port, metric_exporter) + ).remote( + name, + http_host, + http_port, + metric_exporter, + ) - block_until_http_ready( - "http://{}:{}/-/routes".format(http_host, http_port), - timeout=HTTP_PROXY_TIMEOUT) + futures = [] + for node_id in ray.state.node_ids(): + future = block_until_http_ready.options( + num_cpus=0, resources={ + node_id: 0.01 + }).remote( + "http://{}:{}/-/routes".format(http_host, http_port), + timeout=HTTP_PROXY_TIMEOUT) + futures.append(future) + ray.get(futures) @_ensure_connected @@ -122,6 +133,7 @@ def shutdown(): controller = None +@_ensure_connected def create_endpoint(endpoint_name, *, backend=None, @@ -356,7 +368,7 @@ def get_handle(endpoint_name, assert endpoint_name in ray.get(controller.get_all_endpoints.remote()) return RayServeHandle( - ray.get(controller.get_http_proxy.remote())[0], + ray.get(controller.get_router.remote())[0], endpoint_name, relative_slo_ms, absolute_slo_ms, diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 771605961..d16db021d 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -1,5 +1,6 @@ import asyncio from collections import defaultdict, namedtuple +from itertools import groupby import os import random import time @@ -87,8 +88,8 @@ class ServeController: requires all implementations here to be idempotent. """ - async def __init__(self, instance_name, http_node_id, http_proxy_host, - http_proxy_port, metric_exporter_class): + async def __init__(self, instance_name, http_proxy_host, http_proxy_port, + metric_exporter_class): # Unique name of the serve instance managed by this actor. Used to # namespace child actors and checkpoints. self.instance_name = instance_name @@ -121,15 +122,13 @@ class ServeController: self.write_lock = asyncio.Lock() # Cached handles to actors in the system. - self.router = None - self.http_proxy = None + self.routers = [] self.metric_exporter = None # If starting the actor for the first time, starts up the other system # components. If recovering, fetches their actor handles. self._get_or_start_metric_exporter(metric_exporter_class) - self._get_or_start_http_proxy(http_node_id, http_proxy_host, - http_proxy_port) + self._get_or_start_routers(http_proxy_host, http_proxy_port) # NOTE(edoakes): unfortunately, we can't completely recover from a # checkpoint in the constructor because we block while waiting for @@ -151,40 +150,42 @@ class ServeController: asyncio.get_event_loop().create_task( self._recover_from_checkpoint(checkpoint)) - def _get_or_start_http_proxy(self, node_id, host, port): + def _get_or_start_routers(self, host, port): """Get the HTTP proxy belonging to this serve instance. If the HTTP proxy does not already exist, it will be started. """ - proxy_name = format_actor_name(SERVE_PROXY_NAME, self.instance_name) - try: - self.http_proxy = ray.get_actor(proxy_name) - except ValueError: - logger.info( - "Starting HTTP proxy with name '{}' on node '{}'".format( - proxy_name, node_id)) - self.http_proxy = HTTPProxyActor.options( - name=proxy_name, - max_concurrency=ASYNC_CONCURRENCY, - max_restarts=-1, - max_task_retries=-1, - resources={ - node_id: 0.01 - }, - ).remote( - host, port, instance_name=self.instance_name) + # TODO(simon): We don't handle nodes being added/removed. To do that, + # we should implement some sort of control loop in master actor. + for _, node_id_group in groupby(sorted(ray.state.node_ids())): + for index, node_id in enumerate(node_id_group): + proxy_name = format_actor_name(SERVE_PROXY_NAME, + self.instance_name) + proxy_name += "-{}-{}".format(node_id, index) + try: + router = ray.get_actor(proxy_name) + except ValueError: + logger.info( + "Starting HTTP proxy with name '{}' on node '{}' " + "listening on port {}".format(proxy_name, node_id, + port)) + router = HTTPProxyActor.options( + name=proxy_name, + max_concurrency=ASYNC_CONCURRENCY, + max_restarts=-1, + max_task_retries=-1, + resources={ + node_id: 0.01 + }, + ).remote( + host, port, instance_name=self.instance_name) + self.routers.append(router) - # Since router is a merged with HTTP proxy actor, the router will be - # proxied via the HTTP actor. Even though the two variable names are - # pointing to the same object, their semantic differences make the code - # more readable. (e.g. http_proxy.set_route_table, router.add_worker) - self.router = self.http_proxy - - def get_http_proxy(self): + def get_router(self): """Returns a handle to the HTTP proxy managed by this actor.""" - return [self.http_proxy] + return self.routers - def get_http_proxy_config(self): + def get_router_config(self): """Called by the HTTP proxy on startup to fetch required state.""" return self.routes @@ -267,20 +268,31 @@ class ServeController: # Push configuration state to the router. # TODO(edoakes): should we make this a pull-only model for simplicity? for endpoint, traffic_policy in self.traffic_policies.items(): - await self.router.set_traffic.remote(endpoint, traffic_policy) + await asyncio.gather(*[ + router.set_traffic.remote(endpoint, traffic_policy) + for router in self.routers + ]) for backend_tag, replica_dict in self.workers.items(): for replica_tag, worker in replica_dict.items(): - await self.router.add_new_worker.remote( - backend_tag, replica_tag, worker) + await asyncio.gather(*[ + router.add_new_worker.remote(backend_tag, replica_tag, + worker) + for router in self.routers + ]) for backend, info in self.backends.items(): - await self.router.set_backend_config.remote( - backend, info.backend_config) + await asyncio.gather(*[ + router.set_backend_config.remote(backend, info.backend_config) + for router in self.routers + ]) await self.broadcast_backend_config(backend) # Push configuration state to the HTTP proxy. - await self.http_proxy.set_route_table.remote(self.routes) + await asyncio.gather(*[ + router.set_route_table.remote(self.routes) + for router in self.routers + ]) # Start/stop any pending backend replicas. await self._start_pending_replicas() @@ -353,8 +365,11 @@ class ServeController: self.workers[backend_tag][replica_tag] = worker_handle # Register the worker with the router. - await self.router.add_new_worker.remote(backend_tag, replica_tag, - worker_handle) + await asyncio.gather(*[ + router.add_new_worker.remote(backend_tag, replica_tag, + worker_handle) + for router in self.routers + ]) async def _start_pending_replicas(self): """Starts the pending backend replicas in self.replicas_to_start. @@ -392,8 +407,10 @@ class ServeController: continue # Remove the replica from router. This call is idempotent. - await self.router.remove_worker.remote(backend_tag, - replica_tag) + await asyncio.gather(*[ + router.remove_worker.remote(backend_tag, replica_tag) + for router in self.routers + ]) # TODO(edoakes): this logic isn't ideal because there may be # pending tasks still executing on the replica. However, if we @@ -410,7 +427,10 @@ class ServeController: Clears self.backends_to_remove. """ for backend_tag in self.backends_to_remove: - await self.router.remove_backend.remote(backend_tag) + await asyncio.gather(*[ + router.remove_backend.remote(backend_tag) + for router in self.routers + ]) self.backends_to_remove.clear() async def _remove_pending_endpoints(self): @@ -419,7 +439,10 @@ class ServeController: Clears self.endpoints_to_remove. """ for endpoint_tag in self.endpoints_to_remove: - await self.router.remove_endpoint.remote(endpoint_tag) + await asyncio.gather(*[ + router.remove_endpoint.remote(endpoint_tag) + for router in self.routers + ]) self.endpoints_to_remove.clear() def _scale_replicas(self, backend_tag, num_replicas): @@ -533,7 +556,10 @@ class ServeController: # update to avoid inconsistent state if we crash after pushing the # update. self._checkpoint() - await self.router.set_traffic.remote(endpoint_name, traffic_policy) + await asyncio.gather(*[ + router.set_traffic.remote(endpoint_name, traffic_policy) + for router in self.routers + ]) async def set_traffic(self, endpoint_name, traffic_dict): """Sets the traffic policy for the specified endpoint.""" @@ -560,8 +586,12 @@ class ServeController: # update to avoid inconsistent state if we crash after pushing the # update. self._checkpoint() - await self.router.set_traffic.remote( - endpoint_name, self.traffic_policies[endpoint_name]) + await asyncio.gather(*[ + router.set_traffic.remote( + endpoint_name, + self.traffic_policies[endpoint_name], + ) for router in self.routers + ]) async def create_endpoint(self, endpoint, traffic_dict, route, methods): """Create a new endpoint with the specified route and methods. @@ -600,7 +630,10 @@ class ServeController: # NOTE(edoakes): checkpoint is written in self._set_traffic. await self._set_traffic(endpoint, traffic_dict) - await self.http_proxy.set_route_table.remote(self.routes) + await asyncio.gather(*[ + router.set_route_table.remote(self.routes) + for router in self.routers + ]) async def delete_endpoint(self, endpoint): """Delete the specified endpoint. @@ -635,7 +668,10 @@ class ServeController: # Update the HTTP proxy first to ensure no new requests for the # endpoint are sent to the router. - await self.http_proxy.set_route_table.remote(self.routes) + await asyncio.gather(*[ + router.set_route_table.remote(self.routes) + for router in self.routers + ]) await self._remove_pending_endpoints() async def create_backend(self, backend_tag, backend_config, @@ -660,8 +696,10 @@ class ServeController: # Set the backend config inside the router # (particularly for max-batch-size). - await self.router.set_backend_config.remote( - backend_tag, backend_config) + await asyncio.gather(*[ + router.set_backend_config.remote(backend_tag, backend_config) + for router in self.routers + ]) await self.broadcast_backend_config(backend_tag) async def delete_backend(self, backend_tag): @@ -717,8 +755,10 @@ class ServeController: # Inform the router about change in configuration # (particularly for setting max_batch_size). - await self.router.set_backend_config.remote( - backend_tag, backend_config) + await asyncio.gather(*[ + router.set_backend_config.remote(backend_tag, backend_config) + for router in self.routers + ]) await self._start_pending_replicas() await self._stop_pending_replicas() @@ -748,7 +788,8 @@ class ServeController: async def shutdown(self): """Shuts down the serve instance completely.""" async with self.write_lock: - ray.kill(self.http_proxy, no_restart=True) + for router in self.routers: + ray.kill(router, no_restart=True) ray.kill(self.metric_exporter, no_restart=True) for replica_dict in self.workers.values(): for replica in replica_dict.values(): diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 17987bb8e..180391b63 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -1,5 +1,6 @@ import asyncio from urllib.parse import parse_qs +import socket import uvicorn @@ -30,7 +31,7 @@ class HTTPProxy: assert ray.is_initialized() controller = serve.api._get_controller() - self.route_table = await controller.get_http_proxy_config.remote() + self.route_table = await controller.get_router_config.remote() # The exporter is required to return results for /-/metrics endpoint. [self.metric_exporter] = await controller.get_metric_exporter.remote() @@ -179,7 +180,19 @@ class HTTPProxyActor: # Start running the HTTP server on the event loop. asyncio.get_event_loop().create_task(self.run()) + def ready(self): + return True + async def run(self): + sock = socket.socket() + # These two socket options will allow multiple process to bind the the + # same port. Kernel will evenly load balance among the port listeners. + # Note: this will only work on Linux. + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(socket, "SO_REUSEPORT"): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + sock.bind((self.host, self.port)) + # Note(simon): we have to use lower level uvicorn Config and Server # class because we want to run the server as a coroutine. The only # alternative is to call uvicorn.run which is blocking. @@ -194,7 +207,7 @@ class HTTPProxyActor: # because the existing implementation fails if it isn't running in # the main thread and uvicorn doesn't expose a way to configure it. server.install_signal_handlers = lambda: None - await server.serve() + await server.serve(sockets=[sock]) async def set_route_table(self, route_table): self.app.set_route_table(route_table) diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index 683e9ba8e..288faa3f3 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -170,9 +170,6 @@ class Router: "from backend."), label_names=("backend", )) - def is_ready(self): - return True - async def enqueue_request(self, request_meta, *request_args, **request_kwargs): endpoint = request_meta.endpoint diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index 737704a34..16db58cc5 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -75,9 +75,10 @@ def test_controller_failure(serve_instance): assert response.text == "hello3" -def _kill_http_proxy(): - [http_proxy] = ray.get(serve.api._get_controller().get_http_proxy.remote()) - ray.kill(http_proxy, no_restart=False) +def _kill_routers(): + routers = ray.get(serve.api._get_controller().get_router.remote()) + for router in routers: + ray.kill(router, no_restart=False) def test_http_proxy_failure(serve_instance): @@ -96,7 +97,7 @@ def test_http_proxy_failure(serve_instance): response = request_with_retries("/proxy_failure", timeout=30) assert response.text == "hello1" - _kill_http_proxy() + _kill_routers() def function(): return "hello2" diff --git a/python/ray/serve/tests/test_scaling.py b/python/ray/serve/tests/test_scaling.py new file mode 100644 index 000000000..a44575943 --- /dev/null +++ b/python/ray/serve/tests/test_scaling.py @@ -0,0 +1,45 @@ +import sys +import socket + +import pytest + +import ray +from ray import serve +from ray.serve.constants import SERVE_PROXY_NAME +from ray.serve.utils import block_until_http_ready +from ray.cluster_utils import Cluster + + +@pytest.mark.skipif( + not hasattr(socket, "SO_REUSEPORT"), + reason=("Port sharing only works on newer verion of Linux. " + "This test can only be ran when port sharing is supported.")) +def test_multiple_routers(): + cluster = Cluster() + head_node = cluster.add_node() + cluster.add_node() + + ray.init(head_node.address) + node_ids = ray.state.node_ids() + assert len(node_ids) == 2 + serve.init(http_port=8005) + + # two actors should be started + head_http = ray.get_actor(SERVE_PROXY_NAME + + "-{}-{}".format(node_ids[0], 0)) + ray.get_actor(SERVE_PROXY_NAME + "-{}-{}".format(node_ids[0], 1)) + + # wait for the actors to come up + ray.get(block_until_http_ready.remote("http://127.0.0.1:8005/-/routes")) + + # kill the head_http server, the HTTP server should still functions + ray.kill(head_http, no_restart=True) + ray.get(block_until_http_ready.remote("http://127.0.0.1:8005/-/routes")) + + # cleanup the nodes (otherwise Ray will segfault) + ray.shutdown() + cluster.shutdown() + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/utils.py b/python/ray/serve/utils.py index a22430c39..d940ad562 100644 --- a/python/ray/serve/utils.py +++ b/python/ray/serve/utils.py @@ -78,6 +78,7 @@ class ServeEncoder(json.JSONEncoder): return super().default(o) +@ray.remote(num_cpus=0) def block_until_http_ready(http_endpoint, backoff_time_s=1, timeout=HTTP_PROXY_TIMEOUT):