[serve] Optionally namespace serve clusters (#8447)

This commit is contained in:
Edward Oakes
2020-05-17 00:14:42 -05:00
committed by GitHub
parent 67c01455fe
commit fb23bd6fc0
10 changed files with 163 additions and 65 deletions
+18 -16
View File
@@ -7,7 +7,8 @@ from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
SERVE_MASTER_NAME)
from ray.serve.master import ServeMaster
from ray.serve.handle import RayServeHandle
from ray.serve.utils import block_until_http_ready, retry_actor_failures
from ray.serve.utils import (block_until_http_ready, format_actor_name,
retry_actor_failures)
from ray.serve.exceptions import RayServeException
from ray.serve.config import BackendConfig, ReplicaConfig
from ray.serve.router import Query
@@ -23,16 +24,15 @@ def _get_master_actor():
"""
global master_actor
if master_actor is None:
master_actor = ray.util.get_actor(SERVE_MASTER_NAME)
raise RayServeException("Please run serve.init to initialize or "
"connect to existing ray serve cluster.")
return master_actor
def _ensure_connected(f):
@wraps(f)
def check(*args, **kwargs):
if _get_master_actor() is None:
raise RayServeException("Please run serve.init to initialize or "
"connect to existing ray serve cluster.")
_get_master_actor()
return f(*args, **kwargs)
return check
@@ -60,7 +60,8 @@ def accept_batch(f):
return f
def init(blocking=False,
def init(cluster_name=None,
blocking=False,
start_server=True,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
@@ -78,6 +79,9 @@ def init(blocking=False,
requirement.
Args:
cluster_name (str): A unique name for this serve cluster. This allows
multiple serve clusters to run on the same ray cluster. Must be
specified in all subsequent serve.init() calls.
blocking (bool): If true, the function will wait for the HTTP server to
be healthy, and other components to be ready before returns.
start_server (bool): If true, `serve.init` starts http server.
@@ -92,21 +96,18 @@ def init(blocking=False,
services. RayServe has two options built in: InMemoryExporter and
PrometheusExporter
"""
global master_actor
if master_actor is not None:
return
if cluster_name is not None and not isinstance(cluster_name, str):
raise TypeError("cluster_name must be a string.")
# Initialize ray if needed.
if not ray.is_initialized():
ray.init(**ray_init_kwargs)
# Register serialization context once
ray.register_custom_serializer(Query, Query.ray_serialize,
Query.ray_deserialize)
# Try to get serve master actor if it exists
global master_actor
master_actor_name = format_actor_name(SERVE_MASTER_NAME, cluster_name)
try:
master_actor = ray.util.get_actor(SERVE_MASTER_NAME)
master_actor = ray.util.get_actor(master_actor_name)
return
except ValueError:
pass
@@ -124,9 +125,10 @@ def init(blocking=False,
http_node_id = ray.state.current_node_id()
master_actor = ServeMaster.options(
detached=True,
name=SERVE_MASTER_NAME,
name=master_actor_name,
max_restarts=-1,
).remote(start_server, http_node_id, http_host, http_port, metric_exporter)
).remote(cluster_name, start_server, http_node_id, http_host, http_port,
metric_exporter)
if start_server and blocking:
block_until_http_ready("http://{}:{}/-/routes".format(
+17 -7
View File
@@ -7,7 +7,8 @@ from ray import serve
from ray.serve import context as serve_context
from ray.serve.context import FakeFlaskRequest
from collections import defaultdict
from ray.serve.utils import parse_request_item, _get_logger
from ray.serve.utils import (parse_request_item, _get_logger,
retry_actor_failures)
from ray.serve.exceptions import RayServeException
from ray.serve.metric import MetricClient
from ray.async_compat import sync_to_async
@@ -26,15 +27,24 @@ def create_backend_worker(func_or_class):
assert False, "func_or_class must be function or class."
class RayServeWrappedWorker(object):
def __init__(self, backend_tag, replica_tag, init_args):
serve.init()
def __init__(self,
backend_tag,
replica_tag,
init_args,
cluster_name=None):
serve.init(cluster_name=cluster_name)
if is_function:
_callable = func_or_class
else:
_callable = func_or_class(*init_args)
master = serve.api._get_master_actor()
[metric_exporter] = retry_actor_failures(
master.get_metric_exporter)
metric_client = MetricClient(
metric_exporter, default_labels={"backend": backend_tag})
self.backend = RayServeWorker(backend_tag, replica_tag, _callable,
is_function)
is_function, metric_client)
async def handle_request(self, request):
return await self.backend.handle_request(request)
@@ -67,14 +77,14 @@ def ensure_async(func):
class RayServeWorker:
"""Handles requests with the provided callable."""
def __init__(self, name, replica_tag, _callable, is_function):
def __init__(self, name, replica_tag, _callable, is_function,
metric_client):
self.name = name
self.replica_tag = replica_tag
self.callable = _callable
self.is_function = is_function
self.metric_client = MetricClient.connect_from_serve(
default_labels={"backend": self.name})
self.metric_client = metric_client
self.request_counter = self.metric_client.new_counter(
"backend_request_counter",
description=("Number of queries that have been "
+5 -4
View File
@@ -4,12 +4,12 @@ import socket
import uvicorn
import ray
from ray import serve
from ray.serve.context import TaskContext
from ray.serve.metric import MetricClient
from ray.serve.request_params import RequestMetadata
from ray.serve.http_util import Response
from ray.serve.utils import logger, retry_actor_failures_async
from ray.serve.constants import SERVE_MASTER_NAME
from urllib.parse import parse_qs
@@ -29,7 +29,7 @@ class HTTPProxy:
async def fetch_config_from_master(self):
assert ray.is_initialized()
master = ray.util.get_actor(SERVE_MASTER_NAME)
master = serve.api._get_master_actor()
self.route_table, [
self.router_handle
@@ -39,7 +39,7 @@ class HTTPProxy:
[self.metric_exporter] = await retry_actor_failures_async(
master.get_metric_exporter)
self.metric_client = MetricClient.connect_from_serve()
self.metric_client = MetricClient(self.metric_exporter)
self.request_counter = self.metric_client.new_counter(
"num_http_requests",
description="The number of requests processed",
@@ -194,7 +194,8 @@ class HTTPProxy:
@ray.remote
class HTTPProxyActor:
async def __init__(self, host, port):
async def __init__(self, host, port, cluster_name=None):
serve.init(cluster_name=cluster_name)
self.app = HTTPProxy()
await self.app.fetch_config_from_master()
self.host = host
+38 -20
View File
@@ -13,7 +13,8 @@ from ray.serve.http_proxy import HTTPProxyActor
from ray.serve.kv_store import RayInternalKVStore
from ray.serve.metric.exporter import MetricExporterActor
from ray.serve.router import Router
from ray.serve.utils import async_retryable, get_random_letters, logger
from ray.serve.utils import (async_retryable, format_actor_name,
get_random_letters, logger)
import numpy as np
@@ -49,10 +50,13 @@ class ServeMaster:
requires all implementations here to be idempotent.
"""
async def __init__(self, start_http_proxy, http_node_id, http_proxy_host,
http_proxy_port, metric_exporter_class):
async def __init__(self, cluster_name, start_http_proxy, http_node_id,
http_proxy_host, http_proxy_port,
metric_exporter_class):
# Unique name of the serve cluster managed by this actor. Used to
# namespace child actors and checkpoints.
self.cluster_name = cluster_name
# Used to read/write checkpoints.
# TODO(edoakes): namespace the master actor and its checkpoints.
self.kv_store = RayInternalKVStore()
# path -> (endpoint, methods).
self.routes = {}
@@ -105,7 +109,10 @@ class ServeMaster:
# a checkpoint to the event loop. Other state-changing calls acquire
# this lock and will be blocked until recovering from the checkpoint
# finishes.
checkpoint = self.kv_store.get(CHECKPOINT_KEY)
checkpoint_key = CHECKPOINT_KEY
if self.cluster_name is not None:
checkpoint_key = "{}:{}".format(self.cluster_name, checkpoint_key)
checkpoint = self.kv_store.get(checkpoint_key)
if checkpoint is None:
logger.debug("No checkpoint found")
else:
@@ -118,16 +125,17 @@ class ServeMaster:
If the router does not already exist, it will be started.
"""
router_name = format_actor_name(SERVE_ROUTER_NAME, self.cluster_name)
try:
self.router = ray.util.get_actor(SERVE_ROUTER_NAME)
self.router = ray.util.get_actor(router_name)
except ValueError:
logger.info(
"Starting router with name '{}'".format(SERVE_ROUTER_NAME))
logger.info("Starting router with name '{}'".format(router_name))
self.router = async_retryable(ray.remote(Router)).options(
detached=True,
name=SERVE_ROUTER_NAME,
name=router_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1).remote()
max_restarts=-1,
).remote(cluster_name=self.cluster_name)
def get_router(self):
"""Returns a handle to the router managed by this actor."""
@@ -138,21 +146,23 @@ class ServeMaster:
If the HTTP proxy does not already exist, it will be started.
"""
proxy_name = format_actor_name(SERVE_PROXY_NAME, self.cluster_name)
try:
self.http_proxy = ray.util.get_actor(SERVE_PROXY_NAME)
self.http_proxy = ray.util.get_actor(proxy_name)
except ValueError:
logger.info(
"Starting HTTP proxy with name '{}' on node '{}'".format(
SERVE_PROXY_NAME, node_id))
proxy_name, node_id))
self.http_proxy = async_retryable(HTTPProxyActor).options(
detached=True,
name=SERVE_PROXY_NAME,
name=proxy_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
resources={
node_id: 0.01
},
).remote(host, port)
).remote(
host, port, cluster_name=self.cluster_name)
def get_http_proxy(self):
"""Returns a handle to the HTTP proxy managed by this actor."""
@@ -167,14 +177,16 @@ class ServeMaster:
If the metric exporter does not already exist, it will be started.
"""
metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME,
self.cluster_name)
try:
self.metric_exporter = ray.util.get_actor(SERVE_METRIC_SINK_NAME)
self.metric_exporter = ray.util.get_actor(metric_sink_name)
except ValueError:
logger.info("Starting metric exporter with name '{}'".format(
SERVE_METRIC_SINK_NAME))
metric_sink_name))
self.metric_exporter = MetricExporterActor.options(
detached=True,
name=SERVE_METRIC_SINK_NAME).remote(metric_exporter_class)
name=metric_sink_name).remote(metric_exporter_class)
def get_metric_exporter(self):
"""Returns a handle to the metric exporter managed by this actor."""
@@ -232,8 +244,10 @@ class ServeMaster:
# were created.
for backend_tag, replica_tags in self.replicas.items():
for replica_tag in replica_tags:
replica_name = format_actor_name(replica_tag,
self.cluster_name)
self.workers[backend_tag][replica_tag] = ray.util.get_actor(
replica_tag)
replica_name)
# Push configuration state to the router.
# TODO(edoakes): should we make this a pull-only model for simplicity?
@@ -295,12 +309,16 @@ class ServeMaster:
(backend_worker, backend_config,
replica_config) = self.backends[backend_tag]
replica_name = format_actor_name(replica_tag, self.cluster_name)
worker_handle = async_retryable(ray.remote(backend_worker)).options(
detached=True,
name=replica_tag,
name=replica_name,
max_restarts=-1,
**replica_config.ray_actor_options).remote(
backend_tag, replica_tag, replica_config.actor_init_args)
backend_tag,
replica_tag,
replica_config.actor_init_args,
cluster_name=self.cluster_name)
# TODO(edoakes): we should probably have a timeout here.
await worker_handle.ready.remote()
return worker_handle
+1 -14
View File
@@ -6,8 +6,7 @@ from ray.serve.metric.types import (
convert_event_type_to_class,
MetricMetadata,
)
from ray.serve.utils import (retry_actor_failures, retry_actor_failures_async,
_get_logger)
from ray.serve.utils import retry_actor_failures_async, _get_logger
from ray.serve.constants import METRIC_PUSH_INTERVAL_S
logger = _get_logger()
@@ -38,18 +37,6 @@ class MetricClient:
self.push_to_exporter_forever(push_interval))
logger.debug("Initialized client")
@staticmethod
def connect_from_serve(default_labels: Optional[Dict[str, str]] = None):
"""Create the metric client automatically when running inside serve."""
from ray.serve.api import _get_master_actor
master_actor = _get_master_actor()
[metric_exporter] = retry_actor_failures(
master_actor.get_metric_exporter)
return MetricClient(
metric_exporter_actor=metric_exporter,
default_labels=default_labels)
def new_counter(self,
name: str,
*,
+8 -4
View File
@@ -15,6 +15,8 @@ import blist
import ray
import ray.cloudpickle as pickle
from ray.exceptions import RayTaskError
from ray import serve
from ray.serve.metric import MetricClient
from ray.serve.policy import RandomEndpointPolicy
from ray.serve.utils import logger, retry_actor_failures
@@ -106,7 +108,7 @@ class Router:
3. When there is only 1 backend ready, we will only use that backend.
"""
async def __init__(self):
async def __init__(self, cluster_name=None):
# Note: Several queues are used in the router
# - When a request come in, it's placed inside its corresponding
# endpoint_queue.
@@ -152,8 +154,8 @@ class Router:
# the master actor. We use a "pull-based" approach instead of pushing
# them from the master so that the router can transparently recover
# from failure.
ray.serve.init()
master_actor = ray.serve.api._get_master_actor()
serve.init(cluster_name=cluster_name)
master_actor = serve.api._get_master_actor()
traffic_policies = retry_actor_failures(
master_actor.get_traffic_policies)
@@ -171,7 +173,9 @@ class Router:
for backend, backend_config in backend_configs.items():
await self.set_backend_config(backend, backend_config)
self.metric_client = MetricClient.connect_from_serve()
[metric_exporter] = retry_actor_failures(
master_actor.get_metric_exporter)
self.metric_client = MetricClient(metric_exporter)
self.num_router_requests = self.metric_client.new_counter(
"num_router_requests",
description="Number of requests processed by the router.",
+1
View File
@@ -17,6 +17,7 @@ def _shared_serve_instance():
@pytest.fixture
def serve_instance(_shared_serve_instance):
serve.init()
yield
master = serve.api._get_master_actor()
# Clear all state between tests to avoid naming collisions.
+44
View File
@@ -342,3 +342,47 @@ def test_shard_key(serve_instance, route):
# Check that the shard keys are mapped to the same backends.
for shard_key in shard_keys:
assert do_request(shard_key) == results[shard_key]
def test_cluster_name():
with pytest.raises(TypeError):
serve.init(cluster_name=1)
route = "/api"
backend = "backend"
endpoint = "endpoint"
serve.init(cluster_name="cluster1", blocking=True, http_port=8001)
serve.create_endpoint(endpoint, route=route)
def function():
return "hello1"
serve.create_backend(backend, function)
serve.set_traffic(endpoint, {backend: 1.0})
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
# Create a second cluster on port 8002. Create an endpoint and backend with
# the same names and check that they don't collide.
serve.init(cluster_name="cluster2", blocking=True, http_port=8002)
serve.create_endpoint(endpoint, route=route)
def function():
return "hello2"
serve.create_backend(backend, function)
serve.set_traffic(endpoint, {backend: 1.0})
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
assert requests.get("http://127.0.0.1:8002" + route).text == "hello2"
# Check that deleting the backend in the current cluster doesn't.
serve.delete_endpoint(endpoint)
serve.delete_backend(backend)
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
# Check that we can re-connect to the first cluster.
serve.init(cluster_name="cluster1")
serve.delete_endpoint(endpoint)
serve.delete_backend(backend)
+7
View File
@@ -177,3 +177,10 @@ async def retry_actor_failures_async(f, *args, **kwargs):
raise RuntimeError("Timed out after {}s waiting for actor "
"method '{}' to succeed.".format(
ACTOR_FAILURE_RETRY_TIMEOUT_S, f._method_name))
def format_actor_name(actor_name, cluster_name=None):
if cluster_name is None:
return actor_name
else:
return "{}:{}".format(cluster_name, actor_name)