From 3a53ea60d9a124d0d434bd73d8a7578c8e5808a2 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Mon, 30 Mar 2020 11:53:05 -0500 Subject: [PATCH] [Serve] Push route table updates to HTTP proxy (#7774) --- python/ray/serve/api.py | 31 ++++---- python/ray/serve/constants.py | 7 +- python/ray/serve/global_state.py | 48 ++++++------ python/ray/serve/{server.py => http_proxy.py} | 74 ++++++++++--------- python/ray/serve/kv_store_service.py | 8 +- python/ray/serve/utils.py | 2 +- 6 files changed, 86 insertions(+), 84 deletions(-) rename python/ray/serve/{server.py => http_proxy.py} (72%) diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 353f04546..671ab6f89 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -8,7 +8,7 @@ import numpy as np import ray from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, - SERVE_NURSERY_NAME) + SERVE_MASTER_NAME) from ray.serve.global_state import GlobalState, start_initial_state from ray.serve.kv_store_service import SQLiteKVStore from ray.serve.task_runner import RayServeMixin, TaskRunnerActor @@ -114,9 +114,9 @@ def init( if not ray.is_initialized(): ray.init(**ray_init_kwargs) - # Try to get serve nursery if there exists + # Try to get serve master actor if it exists try: - ray.util.get_actor(SERVE_NURSERY_NAME) + ray.util.get_actor(SERVE_MASTER_NAME) global_state = GlobalState() return except ValueError: @@ -138,15 +138,16 @@ def init( def kv_store_connector(namespace): return SQLiteKVStore(namespace, db_path=kv_store_path) - nursery = start_initial_state(kv_store_connector) + master = start_initial_state(kv_store_connector) - global_state = GlobalState(nursery) - if start_server: - global_state.init_or_get_http_server(host=http_host, port=http_port) - global_state.init_or_get_router( + global_state = GlobalState(master) + router = global_state.init_or_get_router( queueing_policy=queueing_policy, policy_kwargs=policy_kwargs) global_state.init_or_get_metric_monitor( 
gc_window_seconds=gc_window_seconds) + if start_server: + global_state.init_or_get_http_proxy( + host=http_host, port=http_port).set_router_handle.remote(router) if start_server and blocking: block_until_http_ready("http://{}:{}/-/routes".format( @@ -168,6 +169,9 @@ def create_endpoint(endpoint_name, route=None, methods=["GET"]): methods = [m.upper() for m in methods] global_state.route_table.register_service( route, endpoint_name, methods=methods) + ray.get(global_state.init_or_get_http_proxy().set_route_table.remote( + global_state.route_table.list_service( + include_methods=True, include_headless=False))) @_ensure_connected @@ -321,9 +325,9 @@ def _start_replica(backend_tag): # get actor creation kwargs actor_kwargs = backend_config.get_actor_creation_args(init_args) - # Create the runner in the nursery + # Create the runner in the master actor [runner_handle] = ray.get( - global_state.actor_nursery_handle.start_actor_with_creator.remote( + global_state.master_actor_handle.start_actor_with_creator.remote( creator, actor_kwargs, replica_tag)) # Setup the worker @@ -347,15 +351,14 @@ def _remove_replica(backend_tag): replica_tag = global_state.backend_table.remove_replica(backend_tag) [replica_handle] = ray.get( - global_state.actor_nursery_handle.get_handle.remote(replica_tag)) + global_state.master_actor_handle.get_handle.remote(replica_tag)) # Remove the replica from metric monitor. ray.get(global_state.init_or_get_metric_monitor().remove_target.remote( replica_handle)) - # Remove the replica from actor nursery. - ray.get( - global_state.actor_nursery_handle.remove_handle.remote(replica_tag)) + # Remove the replica from master actor. + ray.get(global_state.master_actor_handle.remove_handle.remote(replica_tag)) # Remove the replica from router. # This will also destory the actor handle. 
diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index fb0d94793..73cbd77b9 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -1,8 +1,5 @@ -#: The interval which http server refreshes its routing table -HTTP_ROUTER_CHECKER_INTERVAL_S = 2 - -#: Actor name used to register actor nursery -SERVE_NURSERY_NAME = "SERVE_ACTOR_NURSERY" +#: Actor name used to register master actor +SERVE_MASTER_NAME = "SERVE_MASTER_ACTOR" #: KVStore connector key in bootstrap config BOOTSTRAP_KV_STORE_CONN_KEY = "kv_store_connector" diff --git a/python/ray/serve/global_state.py b/python/ray/serve/global_state.py index 363098cd7..ca4da11d2 100644 --- a/python/ray/serve/global_state.py +++ b/python/ray/serve/global_state.py @@ -1,27 +1,27 @@ import ray from ray.serve.constants import (BOOTSTRAP_KV_STORE_CONN_KEY, DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, - SERVE_NURSERY_NAME, ASYNC_CONCURRENCY) + SERVE_MASTER_NAME, ASYNC_CONCURRENCY) from ray.serve.kv_store_service import (BackendTable, RoutingTable, TrafficPolicyTable) from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop) from ray.serve.policy import RoutePolicy -from ray.serve.server import HTTPActor +from ray.serve.http_proxy import HTTPProxyActor def start_initial_state(kv_store_connector): - nursery_handle = ActorNursery.remote() - ray.util.register_actor(SERVE_NURSERY_NAME, nursery_handle) + master_handle = ServeMaster.remote() + ray.util.register_actor(SERVE_MASTER_NAME, master_handle) ray.get( - nursery_handle.store_bootstrap_state.remote( - BOOTSTRAP_KV_STORE_CONN_KEY, kv_store_connector)) - return nursery_handle + master_handle.store_bootstrap_state.remote(BOOTSTRAP_KV_STORE_CONN_KEY, + kv_store_connector)) + return master_handle @ray.remote -class ActorNursery: +class ServeMaster: """Initialize and store all actor handles. Note: @@ -88,15 +88,15 @@ class GlobalState: 2. 
A actor supervisor service """ - def __init__(self, actor_nursery_handle=None): + def __init__(self, master_actor_handle=None): # Get actor nursery handle - if actor_nursery_handle is None: - actor_nursery_handle = ray.util.get_actor(SERVE_NURSERY_NAME) - self.actor_nursery_handle = actor_nursery_handle + if master_actor_handle is None: + master_actor_handle = ray.util.get_actor(SERVE_MASTER_NAME) + self.master_actor_handle = master_actor_handle # Connect to all the table bootstrap_config = ray.get( - self.actor_nursery_handle.get_bootstrap_state_dict.remote()) + self.master_actor_handle.get_bootstrap_state_dict.remote()) kv_store_connector = bootstrap_config[BOOTSTRAP_KV_STORE_CONN_KEY] self.route_table = RoutingTable(kv_store_connector) self.backend_table = BackendTable(kv_store_connector) @@ -106,25 +106,25 @@ class GlobalState: def refresh_actor_handle_cache(self): self.actor_handle_cache = ray.get( - self.actor_nursery_handle.get_all_handles.remote()) + self.master_actor_handle.get_all_handles.remote()) - def init_or_get_http_server(self, - host=DEFAULT_HTTP_HOST, - port=DEFAULT_HTTP_PORT): - if "http_server" not in self.actor_handle_cache: + def init_or_get_http_proxy(self, + host=DEFAULT_HTTP_HOST, + port=DEFAULT_HTTP_PORT): + if "http_proxy" not in self.actor_handle_cache: [handle] = ray.get( - self.actor_nursery_handle.start_actor.remote( - HTTPActor, tag="http_server")) + self.master_actor_handle.start_actor.remote( + HTTPProxyActor, tag="http_proxy")) handle.run.remote(host=host, port=port) self.refresh_actor_handle_cache() - return self.actor_handle_cache["http_server"] + return self.actor_handle_cache["http_proxy"] def _get_queueing_policy(self, default_policy): return_policy = default_policy # check if there is already a queue_actor running # with policy as p.name for the case where - # serve nursery exists: ray.util.get_actor(SERVE_NURSERY_NAME) + # serve master exists: ray.util.get_actor(SERVE_MASTER_NAME) for p in RoutePolicy: queue_actor_tag = 
"queue_actor::" + p.name if queue_actor_tag in self.actor_handle_cache: @@ -141,7 +141,7 @@ class GlobalState: queue_actor_tag = "queue_actor::" + self.queueing_policy.name if queue_actor_tag not in self.actor_handle_cache: [handle] = ray.get( - self.actor_nursery_handle.start_actor.remote( + self.master_actor_handle.start_actor.remote( self.queueing_policy.value, init_kwargs=policy_kwargs, tag=queue_actor_tag, @@ -154,7 +154,7 @@ class GlobalState: def init_or_get_metric_monitor(self, gc_window_seconds=3600): if "metric_monitor" not in self.actor_handle_cache: [handle] = ray.get( - self.actor_nursery_handle.start_actor.remote( + self.master_actor_handle.start_actor.remote( MetricMonitor, init_args=(gc_window_seconds, ), tag="metric_monitor")) diff --git a/python/ray/serve/server.py b/python/ray/serve/http_proxy.py similarity index 72% rename from python/ray/serve/server.py rename to python/ray/serve/http_proxy.py index 8e86871cc..4e3ce6d85 100644 --- a/python/ray/serve/server.py +++ b/python/ray/serve/http_proxy.py @@ -1,10 +1,8 @@ -import asyncio +import socket import uvicorn import ray -from ray.experimental.async_api import _async_init -from ray.serve.constants import HTTP_ROUTER_CHECKER_INTERVAL_S from ray.serve.context import TaskContext from ray.serve.request_params import RequestMetadata from ray.serve.http_util import Response @@ -23,39 +21,24 @@ class HTTPProxy: def __init__(self): assert ray.is_initialized() + # Must be set via set_route_table. + self.route_table = dict() + # Must be set via set_router_handle. 
+ self.router_handle = None - # Delay import due to GlobalState depends on HTTP actor - from ray.serve.global_state import GlobalState + def set_route_table(self, route_table): + self.route_table = route_table - self.serve_global_state = GlobalState() - self.route_table_cache = dict() - - self.route_checker_task = None - self.route_checker_should_shutdown = False - - async def route_checker(self, interval): - while True: - if self.route_checker_should_shutdown: - return - - self.route_table_cache = ( - self.serve_global_state.route_table.list_service( - include_methods=True, include_headless=False)) - - await asyncio.sleep(interval) + def set_router_handle(self, router_handle): + self.router_handle = router_handle async def handle_lifespan_message(self, scope, receive, send): assert scope["type"] == "lifespan" message = await receive() if message["type"] == "lifespan.startup": - await _async_init() - self.route_checker_task = asyncio.get_event_loop().create_task( - self.route_checker(interval=HTTP_ROUTER_CHECKER_INTERVAL_S)) await send({"type": "lifespan.startup.complete"}) elif message["type"] == "lifespan.shutdown": - self.route_checker_task.cancel() - self.route_checker_should_shutdown = True await send({"type": "lifespan.shutdown.complete"}) async def receive_http_body(self, scope, receive, send): @@ -113,14 +96,16 @@ class HTTPProxy: error_sender = self._make_error_sender(scope, receive, send) + assert self.route_table is not None, ( + "Route table must be set via set_route_table.") assert scope["type"] == "http" current_path = scope["path"] if current_path == "/-/routes": - await Response(self.route_table_cache).send(scope, receive, send) + await Response(self.route_table).send(scope, receive, send) return # TODO(simon): Use werkzeug route mapper to support variable path - if current_path not in self.route_table_cache: + if current_path not in self.route_table: error_message = ( "Path {} not found. 
" "Please ping http://.../-/routes for routing table" @@ -128,7 +113,7 @@ class HTTPProxy: await error_sender(error_message, 404) return - endpoint_name, methods_allowed = self.route_table_cache[current_path] + endpoint_name, methods_allowed = self.route_table[current_path] if scope["method"] not in methods_allowed: error_message = ("Methods {} not allowed. " @@ -154,10 +139,11 @@ class HTTPProxy: absolute_slo_ms=absolute_slo_ms, call_method=headers.get("X-SERVE-CALL-METHOD".lower(), "__call__")) + assert self.router_handle is not None, ( + "Router handle must be set via set_router_handle.") try: - result = await (self.serve_global_state.init_or_get_router() - .enqueue_request.remote(request_metadata, scope, - http_body_bytes)) + result = await self.router_handle.enqueue_request.remote( + request_metadata, scope, http_body_bytes) await Response(result).send(scope, receive, send) except Exception as e: error_message = "Internal Error. Traceback: {}.".format(e) @@ -165,10 +151,26 @@ class HTTPProxy: @ray.remote -class HTTPActor: +class HTTPProxyActor: def __init__(self): self.app = HTTPProxy() - def run(self, host="0.0.0.0", port=8000): - uvicorn.run( - self.app, host=host, port=port, lifespan="on", access_log=False) + async def run(self, host="0.0.0.0", port=8000): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.set_inheritable(True) + + config = uvicorn.Config(self.app, lifespan="on", access_log=False) + server = uvicorn.Server(config=config) + # TODO(edoakes): we need to override install_signal_handlers here + # because the existing implementation fails if it isn't running in + # the main thread and uvicorn doesn't expose a way to configure it. 
+ server.install_signal_handlers = lambda: None + await server.serve(sockets=[sock]) + + async def set_route_table(self, route_table): + self.app.set_route_table(route_table) + + async def set_router_handle(self, router_handle): + self.app.set_router_handle(router_handle) diff --git a/python/ray/serve/kv_store_service.py b/python/ray/serve/kv_store_service.py index b90a8ab09..49cd69ec1 100644 --- a/python/ray/serve/kv_store_service.py +++ b/python/ray/serve/kv_store_service.py @@ -225,13 +225,13 @@ class RoutingTable: This method is used for two purpose: - 1. Make sure HTTP server has started and healthy. Incremented request - count means HTTP server is actively fetching routing table. + 1. Make sure HTTP proxy has started and healthy. Incremented request + count means HTTP proxy is actively fetching routing table. - 2. Make sure HTTP server does not have stale routing table. This number + 2. Make sure HTTP proxy does not have stale routing table. This number should be incremented every HTTP_ROUTER_CHECKER_INTERVAL_S seconds. Supervisor should check this number as indirect indicator of http - server's health. + proxy's health. """ return self.request_count diff --git a/python/ray/serve/utils.py b/python/ray/serve/utils.py index 567adfcd4..8d0aa771d 100644 --- a/python/ray/serve/utils.py +++ b/python/ray/serve/utils.py @@ -111,7 +111,7 @@ def block_until_http_ready(http_endpoint, num_retries=5, backoff_time_s=1): retries -= 1 if retries == 0: raise Exception( - "HTTP server not ready after {} retries.".format(num_retries)) + "HTTP proxy not ready after {} retries.".format(num_retries)) def get_random_letters(length=6):