[serve] Refer to serve "instances," not "clusters" (#8746)

This commit is contained in:
Edward Oakes
2020-06-02 17:16:29 -05:00
committed by GitHub
parent 2e82e05e4b
commit 0306e4d589
8 changed files with 51 additions and 52 deletions
+7 -7
View File
@@ -58,7 +58,7 @@ def accept_batch(f):
return f
def init(cluster_name=None,
def init(name=None,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
metric_exporter=InMemoryExporter):
@@ -71,8 +71,8 @@ def init(cluster_name=None,
separately before calling `serve.init`.
Args:
cluster_name (str): A unique name for this serve cluster. This allows
multiple serve clusters to run on the same ray cluster. Must be
name (str): A unique name for this serve instance. This allows
multiple serve instances to run on the same ray cluster. Must be
specified in all subsequent serve.init() calls.
http_host (str): Host for HTTP server. Default to "0.0.0.0".
http_port (int): Port for HTTP server. Default to 8000.
@@ -81,8 +81,8 @@ def init(cluster_name=None,
services. RayServe has two options built in: InMemoryExporter and
PrometheusExporter
"""
if cluster_name is not None and not isinstance(cluster_name, str):
raise TypeError("cluster_name must be a string.")
if name is not None and not isinstance(name, str):
raise TypeError("name must be a string.")
# Initialize ray if needed.
if not ray.is_initialized():
@@ -90,7 +90,7 @@ def init(cluster_name=None,
# Try to get serve master actor if it exists
global master_actor
master_actor_name = format_actor_name(SERVE_MASTER_NAME, cluster_name)
master_actor_name = format_actor_name(SERVE_MASTER_NAME, name)
try:
master_actor = ray.get_actor(master_actor_name)
return
@@ -111,7 +111,7 @@ def init(cluster_name=None,
master_actor = ServeMaster.options(
name=master_actor_name,
max_restarts=-1,
).remote(cluster_name, http_node_id, http_host, http_port, metric_exporter)
).remote(name, http_node_id, http_host, http_port, metric_exporter)
block_until_http_ready(
"http://{}:{}/-/routes".format(http_host, http_port),
+2 -2
View File
@@ -31,8 +31,8 @@ def create_backend_worker(func_or_class):
backend_tag,
replica_tag,
init_args,
cluster_name=None):
serve.init(cluster_name=cluster_name)
instance_name=None):
serve.init(name=instance_name)
if is_function:
_callable = func_or_class
else:
+2 -2
View File
@@ -194,8 +194,8 @@ class HTTPProxy:
@ray.remote
class HTTPProxyActor:
async def __init__(self, host, port, cluster_name=None):
serve.init(cluster_name=cluster_name)
async def __init__(self, host, port, instance_name=None):
serve.init(name=instance_name)
self.app = HTTPProxy()
await self.app.fetch_config_from_master()
self.host = host
+17 -17
View File
@@ -50,11 +50,11 @@ class ServeMaster:
requires all implementations here to be idempotent.
"""
async def __init__(self, cluster_name, http_node_id, http_proxy_host,
async def __init__(self, instance_name, http_node_id, http_proxy_host,
http_proxy_port, metric_exporter_class):
# Unique name of the serve cluster managed by this actor. Used to
# Unique name of the serve instance managed by this actor. Used to
# namespace child actors and checkpoints.
self.cluster_name = cluster_name
self.instance_name = instance_name
# Used to read/write checkpoints.
self.kv_store = RayInternalKVStore()
# path -> (endpoint, methods).
@@ -108,8 +108,8 @@ class ServeMaster:
# this lock and will be blocked until recovering from the checkpoint
# finishes.
checkpoint_key = CHECKPOINT_KEY
if self.cluster_name is not None:
checkpoint_key = "{}:{}".format(self.cluster_name, checkpoint_key)
if self.instance_name is not None:
checkpoint_key = "{}:{}".format(self.instance_name, checkpoint_key)
checkpoint = self.kv_store.get(checkpoint_key)
if checkpoint is None:
logger.debug("No checkpoint found")
@@ -119,11 +119,11 @@ class ServeMaster:
self._recover_from_checkpoint(checkpoint))
def _get_or_start_router(self):
"""Get the router belonging to this serve cluster.
"""Get the router belonging to this serve instance.
If the router does not already exist, it will be started.
"""
router_name = format_actor_name(SERVE_ROUTER_NAME, self.cluster_name)
router_name = format_actor_name(SERVE_ROUTER_NAME, self.instance_name)
try:
self.router = ray.get_actor(router_name)
except ValueError:
@@ -132,18 +132,18 @@ class ServeMaster:
name=router_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
).remote(cluster_name=self.cluster_name)
).remote(instance_name=self.instance_name)
def get_router(self):
"""Returns a handle to the router managed by this actor."""
return [self.router]
def _get_or_start_http_proxy(self, node_id, host, port):
"""Get the HTTP proxy belonging to this serve cluster.
"""Get the HTTP proxy belonging to this serve instance.
If the HTTP proxy does not already exist, it will be started.
"""
proxy_name = format_actor_name(SERVE_PROXY_NAME, self.cluster_name)
proxy_name = format_actor_name(SERVE_PROXY_NAME, self.instance_name)
try:
self.http_proxy = ray.get_actor(proxy_name)
except ValueError:
@@ -158,7 +158,7 @@ class ServeMaster:
node_id: 0.01
},
).remote(
host, port, cluster_name=self.cluster_name)
host, port, instance_name=self.instance_name)
def get_http_proxy(self):
"""Returns a handle to the HTTP proxy managed by this actor."""
@@ -169,12 +169,12 @@ class ServeMaster:
return self.routes, self.get_router()
def _get_or_start_metric_exporter(self, metric_exporter_class):
"""Get the metric exporter belonging to this serve cluster.
"""Get the metric exporter belonging to this serve instance.
If the metric exporter does not already exist, it will be started.
"""
metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME,
self.cluster_name)
self.instance_name)
try:
self.metric_exporter = ray.get_actor(metric_sink_name)
except ValueError:
@@ -204,7 +204,7 @@ class ServeMaster:
os._exit(0)
async def _recover_from_checkpoint(self, checkpoint_bytes):
"""Recover the cluster state from the provided checkpoint.
"""Recover the instance state from the provided checkpoint.
Performs the following operations:
1) Deserializes the internal state from the checkpoint.
@@ -240,7 +240,7 @@ class ServeMaster:
for backend_tag, replica_tags in self.replicas.items():
for replica_tag in replica_tags:
replica_name = format_actor_name(replica_tag,
self.cluster_name)
self.instance_name)
self.workers[backend_tag][replica_tag] = ray.get_actor(
replica_name)
@@ -304,7 +304,7 @@ class ServeMaster:
(backend_worker, backend_config,
replica_config) = self.backends[backend_tag]
replica_name = format_actor_name(replica_tag, self.cluster_name)
replica_name = format_actor_name(replica_tag, self.instance_name)
worker_handle = async_retryable(ray.remote(backend_worker)).options(
name=replica_name,
max_restarts=-1,
@@ -312,7 +312,7 @@ class ServeMaster:
backend_tag,
replica_tag,
replica_config.actor_init_args,
cluster_name=self.cluster_name)
instance_name=self.instance_name)
# TODO(edoakes): we should probably have a timeout here.
await worker_handle.ready.remote()
return worker_handle
+2 -2
View File
@@ -86,7 +86,7 @@ def _make_future_unwrapper(client_futures: List[asyncio.Future],
class Router:
"""A router that routes request to available workers."""
async def __init__(self, cluster_name=None):
async def __init__(self, instance_name=None):
# Note: Several queues are used in the router
# - When a request come in, it's placed inside its corresponding
# endpoint_queue.
@@ -133,7 +133,7 @@ class Router:
# the master actor. We use a "pull-based" approach instead of pushing
# them from the master so that the router can transparently recover
# from failure.
serve.init(cluster_name=cluster_name)
serve.init(name=instance_name)
master_actor = serve.api._get_master_actor()
traffic_policies = retry_actor_failures(
+5 -5
View File
@@ -346,15 +346,15 @@ def test_shard_key(serve_instance, route):
assert do_request(shard_key) == results[shard_key]
def test_cluster_name():
def test_name():
with pytest.raises(TypeError):
serve.init(cluster_name=1)
serve.init(name=1)
route = "/api"
backend = "backend"
endpoint = "endpoint"
serve.init(cluster_name="cluster1", http_port=8001)
serve.init(name="cluster1", http_port=8001)
serve.create_endpoint(endpoint, route=route)
def function():
@@ -367,7 +367,7 @@ def test_cluster_name():
# Create a second cluster on port 8002. Create an endpoint and backend with
# the same names and check that they don't collide.
serve.init(cluster_name="cluster2", http_port=8002)
serve.init(name="cluster2", http_port=8002)
serve.create_endpoint(endpoint, route=route)
def function():
@@ -385,7 +385,7 @@ def test_cluster_name():
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
# Check that we can re-connect to the first cluster.
serve.init(cluster_name="cluster1")
serve.init(name="cluster1")
serve.delete_endpoint(endpoint)
serve.delete_backend(backend)
+3 -3
View File
@@ -179,8 +179,8 @@ async def retry_actor_failures_async(f, *args, **kwargs):
ACTOR_FAILURE_RETRY_TIMEOUT_S, f._method_name))
def format_actor_name(actor_name, cluster_name=None):
if cluster_name is None:
def format_actor_name(actor_name, instance_name=None):
if instance_name is None:
return actor_name
else:
return "{}:{}".format(cluster_name, actor_name)
return "{}:{}".format(instance_name, actor_name)