From fb23bd6fc02857f2703ffc09c543cde18cfd60ab Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Sun, 17 May 2020 00:14:42 -0500 Subject: [PATCH] [serve] Optionally namespace serve clusters (#8447) --- doc/source/rayserve/overview.rst | 24 +++++++++++++ python/ray/serve/api.py | 34 +++++++++--------- python/ray/serve/backend_worker.py | 24 +++++++++---- python/ray/serve/http_proxy.py | 9 ++--- python/ray/serve/master.py | 58 +++++++++++++++++++----------- python/ray/serve/metric/client.py | 15 +------- python/ray/serve/router.py | 12 ++++--- python/ray/serve/tests/conftest.py | 1 + python/ray/serve/tests/test_api.py | 44 +++++++++++++++++++++++ python/ray/serve/utils.py | 7 ++++ 10 files changed, 163 insertions(+), 65 deletions(-) diff --git a/doc/source/rayserve/overview.rst b/doc/source/rayserve/overview.rst index a101d6db4..772eba700 100644 --- a/doc/source/rayserve/overview.rst +++ b/doc/source/rayserve/overview.rst @@ -246,6 +246,30 @@ The shard key can either be specified via the X-SERVE-SHARD-KEY HTTP header or ` handle = serve.get_handle("api_endpoint") handler.options(shard_key=session_id).remote(args) +Running Multiple Serve Clusters on one Ray Cluster +++++++++++++++++++++++++++++++++++++++++++++++++++ + +You can run multiple serve clusters on the same Ray cluster by providing a ``cluster_name`` to ``serve.init()``. + +.. code-block:: python + + # Create a first cluster whose HTTP server listens on 8000. + serve.init(cluster_name="cluster1", http_port=8000) + serve.create_endpoint("counter1", "/increment") + + # Create a second cluster whose HTTP server listens on 8001. + serve.init(cluster_name="cluster2", http_port=8001) + serve.create_endpoint("counter1", "/increment") + + # Create a backend that will be served on the second cluster. + serve.create_backend("counter1", function) + serve.set_traffic("counter1", {"counter1": 1.0}) + + # Switch back the the first cluster and create the same backend on it. + serve.init(cluster_name="cluster1") + serve.create_backend("counter1", function) + serve.set_traffic("counter1", {"counter1": 1.0}) + Other Resources --------------- diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 63f7ed74a..f26f8b3e4 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -7,7 +7,8 @@ from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, SERVE_MASTER_NAME) from ray.serve.master import ServeMaster from ray.serve.handle import RayServeHandle -from ray.serve.utils import block_until_http_ready, retry_actor_failures +from ray.serve.utils import (block_until_http_ready, format_actor_name, + retry_actor_failures) from ray.serve.exceptions import RayServeException from ray.serve.config import BackendConfig, ReplicaConfig from ray.serve.router import Query @@ -23,16 +24,15 @@ def _get_master_actor(): """ global master_actor if master_actor is None: - master_actor = ray.util.get_actor(SERVE_MASTER_NAME) + raise RayServeException("Please run serve.init to initialize or " + "connect to existing ray serve cluster.") return master_actor def _ensure_connected(f): @wraps(f) def check(*args, **kwargs): - if _get_master_actor() is None: - raise RayServeException("Please run serve.init to initialize or " - "connect to existing ray serve cluster.") + _get_master_actor() return f(*args, **kwargs) return check @@ -60,7 +60,8 @@ def accept_batch(f): return f -def init(blocking=False, +def init(cluster_name=None, + blocking=False, start_server=True, http_host=DEFAULT_HTTP_HOST, http_port=DEFAULT_HTTP_PORT, @@ -78,6 +79,9 @@ def init(blocking=False, requirement. Args: + cluster_name (str): A unique name for this serve cluster. This allows + multiple serve clusters to run on the same ray cluster. Must be + specified in all subsequent serve.init() calls. blocking (bool): If true, the function will wait for the HTTP server to be healthy, and other components to be ready before returns. start_server (bool): If true, `serve.init` starts http server. @@ -92,21 +96,18 @@ def init(blocking=False, services. RayServe has two options built in: InMemoryExporter and PrometheusExporter """ - global master_actor - if master_actor is not None: - return + if cluster_name is not None and not isinstance(cluster_name, str): + raise TypeError("cluster_name must be a string.") # Initialize ray if needed. if not ray.is_initialized(): ray.init(**ray_init_kwargs) - # Register serialization context once - ray.register_custom_serializer(Query, Query.ray_serialize, - Query.ray_deserialize) - # Try to get serve master actor if it exists + global master_actor + master_actor_name = format_actor_name(SERVE_MASTER_NAME, cluster_name) try: - master_actor = ray.util.get_actor(SERVE_MASTER_NAME) + master_actor = ray.util.get_actor(master_actor_name) return except ValueError: pass @@ -124,9 +125,10 @@ def init(blocking=False, http_node_id = ray.state.current_node_id() master_actor = ServeMaster.options( detached=True, - name=SERVE_MASTER_NAME, + name=master_actor_name, max_restarts=-1, - ).remote(start_server, http_node_id, http_host, http_port, metric_exporter) + ).remote(cluster_name, start_server, http_node_id, http_host, http_port, + metric_exporter) if start_server and blocking: block_until_http_ready("http://{}:{}/-/routes".format( diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index 0d115eddb..e05267dc8 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -7,7 +7,8 @@ from ray import serve from ray.serve import context as serve_context from ray.serve.context import FakeFlaskRequest from collections import defaultdict -from ray.serve.utils import parse_request_item, _get_logger +from ray.serve.utils import (parse_request_item, _get_logger, + retry_actor_failures) from ray.serve.exceptions import RayServeException from ray.serve.metric import MetricClient from ray.async_compat import sync_to_async @@ -26,15 +27,24 @@ def create_backend_worker(func_or_class): assert False, "func_or_class must be function or class." class RayServeWrappedWorker(object): - def __init__(self, backend_tag, replica_tag, init_args): - serve.init() + def __init__(self, + backend_tag, + replica_tag, + init_args, + cluster_name=None): + serve.init(cluster_name=cluster_name) if is_function: _callable = func_or_class else: _callable = func_or_class(*init_args) + master = serve.api._get_master_actor() + [metric_exporter] = retry_actor_failures( + master.get_metric_exporter) + metric_client = MetricClient( + metric_exporter, default_labels={"backend": backend_tag}) self.backend = RayServeWorker(backend_tag, replica_tag, _callable, - is_function) + is_function, metric_client) async def handle_request(self, request): return await self.backend.handle_request(request) @@ -67,14 +77,14 @@ def ensure_async(func): class RayServeWorker: """Handles requests with the provided callable.""" - def __init__(self, name, replica_tag, _callable, is_function): + def __init__(self, name, replica_tag, _callable, is_function, + metric_client): self.name = name self.replica_tag = replica_tag self.callable = _callable self.is_function = is_function - self.metric_client = MetricClient.connect_from_serve( - default_labels={"backend": self.name}) + self.metric_client = metric_client self.request_counter = self.metric_client.new_counter( "backend_request_counter", description=("Number of queries that have been " diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 2975ebfd8..968b5d443 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -4,12 +4,12 @@ import socket import uvicorn import ray +from ray import serve from ray.serve.context import TaskContext from ray.serve.metric import MetricClient from ray.serve.request_params import RequestMetadata from ray.serve.http_util import Response from ray.serve.utils import logger, retry_actor_failures_async -from ray.serve.constants import SERVE_MASTER_NAME from urllib.parse import parse_qs @@ -29,7 +29,7 @@ class HTTPProxy: async def fetch_config_from_master(self): assert ray.is_initialized() - master = ray.util.get_actor(SERVE_MASTER_NAME) + master = serve.api._get_master_actor() self.route_table, [ self.router_handle @@ -39,7 +39,7 @@ class HTTPProxy: [self.metric_exporter] = await retry_actor_failures_async( master.get_metric_exporter) - self.metric_client = MetricClient.connect_from_serve() + self.metric_client = MetricClient(self.metric_exporter) self.request_counter = self.metric_client.new_counter( "num_http_requests", description="The number of requests processed", @@ -194,7 +194,8 @@ class HTTPProxy: @ray.remote class HTTPProxyActor: - async def __init__(self, host, port): + async def __init__(self, host, port, cluster_name=None): + serve.init(cluster_name=cluster_name) self.app = HTTPProxy() await self.app.fetch_config_from_master() self.host = host diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index f680b57a6..89f884a36 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -13,7 +13,8 @@ from ray.serve.http_proxy import HTTPProxyActor from ray.serve.kv_store import RayInternalKVStore from ray.serve.metric.exporter import MetricExporterActor from ray.serve.router import Router -from ray.serve.utils import async_retryable, get_random_letters, logger +from ray.serve.utils import (async_retryable, format_actor_name, + get_random_letters, logger) import numpy as np @@ -49,10 +50,13 @@ class ServeMaster: requires all implementations here to be idempotent. """ - async def __init__(self, start_http_proxy, http_node_id, http_proxy_host, - http_proxy_port, metric_exporter_class): + async def __init__(self, cluster_name, start_http_proxy, http_node_id, + http_proxy_host, http_proxy_port, + metric_exporter_class): + # Unique name of the serve cluster managed by this actor. Used to + # namespace child actors and checkpoints. + self.cluster_name = cluster_name # Used to read/write checkpoints. - # TODO(edoakes): namespace the master actor and its checkpoints. self.kv_store = RayInternalKVStore() # path -> (endpoint, methods). self.routes = {} @@ -105,7 +109,10 @@ class ServeMaster: # a checkpoint to the event loop. Other state-changing calls acquire # this lock and will be blocked until recovering from the checkpoint # finishes. - checkpoint = self.kv_store.get(CHECKPOINT_KEY) + checkpoint_key = CHECKPOINT_KEY + if self.cluster_name is not None: + checkpoint_key = "{}:{}".format(self.cluster_name, checkpoint_key) + checkpoint = self.kv_store.get(checkpoint_key) if checkpoint is None: logger.debug("No checkpoint found") else: @@ -118,16 +125,17 @@ class ServeMaster: If the router does not already exist, it will be started. """ + router_name = format_actor_name(SERVE_ROUTER_NAME, self.cluster_name) try: - self.router = ray.util.get_actor(SERVE_ROUTER_NAME) + self.router = ray.util.get_actor(router_name) except ValueError: - logger.info( - "Starting router with name '{}'".format(SERVE_ROUTER_NAME)) + logger.info("Starting router with name '{}'".format(router_name)) self.router = async_retryable(ray.remote(Router)).options( detached=True, - name=SERVE_ROUTER_NAME, + name=router_name, max_concurrency=ASYNC_CONCURRENCY, - max_restarts=-1).remote() + max_restarts=-1, + ).remote(cluster_name=self.cluster_name) def get_router(self): """Returns a handle to the router managed by this actor.""" @@ -138,21 +146,23 @@ class ServeMaster: If the HTTP proxy does not already exist, it will be started. """ + proxy_name = format_actor_name(SERVE_PROXY_NAME, self.cluster_name) try: - self.http_proxy = ray.util.get_actor(SERVE_PROXY_NAME) + self.http_proxy = ray.util.get_actor(proxy_name) except ValueError: logger.info( "Starting HTTP proxy with name '{}' on node '{}'".format( - SERVE_PROXY_NAME, node_id)) + proxy_name, node_id)) self.http_proxy = async_retryable(HTTPProxyActor).options( detached=True, - name=SERVE_PROXY_NAME, + name=proxy_name, max_concurrency=ASYNC_CONCURRENCY, max_restarts=-1, resources={ node_id: 0.01 }, - ).remote(host, port) + ).remote( + host, port, cluster_name=self.cluster_name) def get_http_proxy(self): """Returns a handle to the HTTP proxy managed by this actor.""" @@ -167,14 +177,16 @@ class ServeMaster: If the metric exporter does not already exist, it will be started. """ + metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME, + self.cluster_name) try: - self.metric_exporter = ray.util.get_actor(SERVE_METRIC_SINK_NAME) + self.metric_exporter = ray.util.get_actor(metric_sink_name) except ValueError: logger.info("Starting metric exporter with name '{}'".format( - SERVE_METRIC_SINK_NAME)) + metric_sink_name)) self.metric_exporter = MetricExporterActor.options( detached=True, - name=SERVE_METRIC_SINK_NAME).remote(metric_exporter_class) + name=metric_sink_name).remote(metric_exporter_class) def get_metric_exporter(self): """Returns a handle to the metric exporter managed by this actor.""" @@ -232,8 +244,10 @@ class ServeMaster: # were created. for backend_tag, replica_tags in self.replicas.items(): for replica_tag in replica_tags: + replica_name = format_actor_name(replica_tag, + self.cluster_name) self.workers[backend_tag][replica_tag] = ray.util.get_actor( - replica_tag) + replica_name) # Push configuration state to the router. # TODO(edoakes): should we make this a pull-only model for simplicity? @@ -295,12 +309,16 @@ class ServeMaster: (backend_worker, backend_config, replica_config) = self.backends[backend_tag] + replica_name = format_actor_name(replica_tag, self.cluster_name) worker_handle = async_retryable(ray.remote(backend_worker)).options( detached=True, - name=replica_tag, + name=replica_name, max_restarts=-1, **replica_config.ray_actor_options).remote( - backend_tag, replica_tag, replica_config.actor_init_args) + backend_tag, + replica_tag, + replica_config.actor_init_args, + cluster_name=self.cluster_name) # TODO(edoakes): we should probably have a timeout here. await worker_handle.ready.remote() return worker_handle diff --git a/python/ray/serve/metric/client.py b/python/ray/serve/metric/client.py index 457b92a1e..442ad4f4c 100644 --- a/python/ray/serve/metric/client.py +++ b/python/ray/serve/metric/client.py @@ -6,8 +6,7 @@ from ray.serve.metric.types import ( convert_event_type_to_class, MetricMetadata, ) -from ray.serve.utils import (retry_actor_failures, retry_actor_failures_async, - _get_logger) +from ray.serve.utils import retry_actor_failures_async, _get_logger from ray.serve.constants import METRIC_PUSH_INTERVAL_S logger = _get_logger() @@ -38,18 +37,6 @@ class MetricClient: self.push_to_exporter_forever(push_interval)) logger.debug("Initialized client") - @staticmethod - def connect_from_serve(default_labels: Optional[Dict[str, str]] = None): - """Create the metric client automatically when running inside serve.""" - from ray.serve.api import _get_master_actor - - master_actor = _get_master_actor() - [metric_exporter] = retry_actor_failures( - master_actor.get_metric_exporter) - return MetricClient( - metric_exporter_actor=metric_exporter, - default_labels=default_labels) - def new_counter(self, name: str, *, diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index b07ff3dbe..ac4568e07 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -15,6 +15,8 @@ import blist import ray import ray.cloudpickle as pickle from ray.exceptions import RayTaskError + +from ray import serve from ray.serve.metric import MetricClient from ray.serve.policy import RandomEndpointPolicy from ray.serve.utils import logger, retry_actor_failures @@ -106,7 +108,7 @@ class Router: 3. When there is only 1 backend ready, we will only use that backend. """ - async def __init__(self): + async def __init__(self, cluster_name=None): # Note: Several queues are used in the router # - When a request come in, it's placed inside its corresponding # endpoint_queue. @@ -152,8 +154,8 @@ class Router: # the master actor. We use a "pull-based" approach instead of pushing # them from the master so that the router can transparently recover # from failure. - ray.serve.init() - master_actor = ray.serve.api._get_master_actor() + serve.init(cluster_name=cluster_name) + master_actor = serve.api._get_master_actor() traffic_policies = retry_actor_failures( master_actor.get_traffic_policies) @@ -171,7 +173,9 @@ class Router: for backend, backend_config in backend_configs.items(): await self.set_backend_config(backend, backend_config) - self.metric_client = MetricClient.connect_from_serve() + [metric_exporter] = retry_actor_failures( + master_actor.get_metric_exporter) + self.metric_client = MetricClient(metric_exporter) self.num_router_requests = self.metric_client.new_counter( "num_router_requests", description="Number of requests processed by the router.", diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index 8e3048868..37c03b2b0 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -17,6 +17,7 @@ def _shared_serve_instance(): @pytest.fixture def serve_instance(_shared_serve_instance): + serve.init() yield master = serve.api._get_master_actor() # Clear all state between tests to avoid naming collisions. diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 32a9332ba..5c9d5397f 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -342,3 +342,47 @@ def test_shard_key(serve_instance, route): # Check that the shard keys are mapped to the same backends. for shard_key in shard_keys: assert do_request(shard_key) == results[shard_key] + + +def test_cluster_name(): + with pytest.raises(TypeError): + serve.init(cluster_name=1) + + route = "/api" + backend = "backend" + endpoint = "endpoint" + + serve.init(cluster_name="cluster1", blocking=True, http_port=8001) + serve.create_endpoint(endpoint, route=route) + + def function(): + return "hello1" + + serve.create_backend(backend, function) + serve.set_traffic(endpoint, {backend: 1.0}) + + assert requests.get("http://127.0.0.1:8001" + route).text == "hello1" + + # Create a second cluster on port 8002. Create an endpoint and backend with + # the same names and check that they don't collide. + serve.init(cluster_name="cluster2", blocking=True, http_port=8002) + serve.create_endpoint(endpoint, route=route) + + def function(): + return "hello2" + + serve.create_backend(backend, function) + serve.set_traffic(endpoint, {backend: 1.0}) + + assert requests.get("http://127.0.0.1:8001" + route).text == "hello1" + assert requests.get("http://127.0.0.1:8002" + route).text == "hello2" + + # Check that deleting the backend in the current cluster doesn't. + serve.delete_endpoint(endpoint) + serve.delete_backend(backend) + assert requests.get("http://127.0.0.1:8001" + route).text == "hello1" + + # Check that we can re-connect to the first cluster. + serve.init(cluster_name="cluster1") + serve.delete_endpoint(endpoint) + serve.delete_backend(backend) diff --git a/python/ray/serve/utils.py b/python/ray/serve/utils.py index 9dfbb0e04..8e5aa697d 100644 --- a/python/ray/serve/utils.py +++ b/python/ray/serve/utils.py @@ -177,3 +177,10 @@ async def retry_actor_failures_async(f, *args, **kwargs): raise RuntimeError("Timed out after {}s waiting for actor " "method '{}' to succeed.".format( ACTOR_FAILURE_RETRY_TIMEOUT_S, f._method_name)) + + +def format_actor_name(actor_name, cluster_name=None): + if cluster_name is None: + return actor_name + else: + return "{}:{}".format(cluster_name, actor_name)