From 0306e4d5896db71d12ba852bd06f63c4e75793af Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 2 Jun 2020 17:16:29 -0500 Subject: [PATCH] [serve] Refer to serve "instances," not "clusters" (#8746) --- doc/source/serve/deployment.rst | 27 ++++++++++++------------ python/ray/serve/api.py | 14 ++++++------ python/ray/serve/backend_worker.py | 4 ++-- python/ray/serve/http_proxy.py | 4 ++-- python/ray/serve/master.py | 34 +++++++++++++++--------------- python/ray/serve/router.py | 4 ++-- python/ray/serve/tests/test_api.py | 10 ++++----- python/ray/serve/utils.py | 6 +++--- 8 files changed, 51 insertions(+), 52 deletions(-) diff --git a/doc/source/serve/deployment.rst b/doc/source/serve/deployment.rst index 92c2b25ce..9a1e9ec55 100644 --- a/doc/source/serve/deployment.rst +++ b/doc/source/serve/deployment.rst @@ -6,10 +6,10 @@ In the :doc:`key-concepts`, you saw some of the basics of how to write serve app This section will dive a bit deeper into how Ray Serve runs on a Ray cluster and how you're able to deploy and update your serve application over time. -To deploy a Ray Serve application (and cluster) you're going to need several things. +To deploy a Ray Serve instance you're going to need several things. -1. A running Ray cluster (you can deploy one on your local machine for testing). -2. A Ray Serve cluster To learn more about Ray clusters see :doc:`../cluster-index`. +1. A running Ray cluster (you can deploy one on your local machine for testing). To learn more about Ray clusters see :doc:`../cluster-index`. +2. A Ray Serve instance. 3. Your Ray Serve endpoint(s) and backend(s). .. contents:: Deploying Ray Serve @@ -57,8 +57,7 @@ Creating a Model and Serving it In the following snippet we will complete two things: 1. Define a servable model by instantiating a class and defining the ``__call__`` method. -2. Connect to our running Ray cluster(``ray.init(...)``) and then start or connect to the Ray Serve service -on that cluster(``serve.init(...)``). +2. Connect to our running Ray cluster(``ray.init(...)``) and then start or connect to the Ray Serve instance on that cluster(``serve.init(...)``). You can see that defining the model is straightforward and simple, we're simply instantiating @@ -277,26 +276,26 @@ opt for launching a Ray Cluster locally. Specify a Ray cluster like we did in :r To learn more, in general, about Ray Clusters see :doc:`../cluster-index`. -Deploying Multiple Serve Clusters on a Single Ray Cluster +Deploying Multiple Serve Instaces on a Single Ray Cluster --------------------------------------------------------- -You can run multiple serve clusters on the same Ray cluster by providing a ``cluster_name`` to ``serve.init()``. +You can run multiple serve instances on the same Ray cluster by providing a ``name`` in ``serve.init()``. .. code-block:: python - # Create a first cluster whose HTTP server listens on 8000. - serve.init(cluster_name="cluster1", http_port=8000) + # Create a first instance whose HTTP server listens on 8000. + serve.init(name="instance1", http_port=8000) serve.create_endpoint("counter1", "/increment") - # Create a second cluster whose HTTP server listens on 8001. - serve.init(cluster_name="cluster2", http_port=8001) + # Create a second instance whose HTTP server listens on 8001. + serve.init(name="instance2", http_port=8001) serve.create_endpoint("counter1", "/increment") - # Create a backend that will be served on the second cluster. + # Create a backend that will be served on the second instance. serve.create_backend("counter1", function) serve.set_traffic("counter1", {"counter1": 1.0}) - # Switch back the the first cluster and create the same backend on it. - serve.init(cluster_name="cluster1") + # Switch back the the first instance and create the same backend on it. + serve.init(name="instance1") serve.create_backend("counter1", function) serve.set_traffic("counter1", {"counter1": 1.0}) diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 026b99f48..30a464498 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -58,7 +58,7 @@ def accept_batch(f): return f -def init(cluster_name=None, +def init(name=None, http_host=DEFAULT_HTTP_HOST, http_port=DEFAULT_HTTP_PORT, metric_exporter=InMemoryExporter): @@ -71,8 +71,8 @@ def init(cluster_name=None, separately before calling `serve.init`. Args: - cluster_name (str): A unique name for this serve cluster. This allows - multiple serve clusters to run on the same ray cluster. Must be + name (str): A unique name for this serve instance. This allows + multiple serve instances to run on the same ray cluster. Must be specified in all subsequent serve.init() calls. http_host (str): Host for HTTP server. Default to "0.0.0.0". http_port (int): Port for HTTP server. Default to 8000. @@ -81,8 +81,8 @@ def init(cluster_name=None, services. RayServe has two options built in: InMemoryExporter and PrometheusExporter """ - if cluster_name is not None and not isinstance(cluster_name, str): - raise TypeError("cluster_name must be a string.") + if name is not None and not isinstance(name, str): + raise TypeError("name must be a string.") # Initialize ray if needed. if not ray.is_initialized(): @@ -90,7 +90,7 @@ def init(cluster_name=None, # Try to get serve master actor if it exists global master_actor - master_actor_name = format_actor_name(SERVE_MASTER_NAME, cluster_name) + master_actor_name = format_actor_name(SERVE_MASTER_NAME, name) try: master_actor = ray.get_actor(master_actor_name) return @@ -111,7 +111,7 @@ def init(cluster_name=None, master_actor = ServeMaster.options( name=master_actor_name, max_restarts=-1, - ).remote(cluster_name, http_node_id, http_host, http_port, metric_exporter) + ).remote(name, http_node_id, http_host, http_port, metric_exporter) block_until_http_ready( "http://{}:{}/-/routes".format(http_host, http_port), diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index e05267dc8..faac14192 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -31,8 +31,8 @@ def create_backend_worker(func_or_class): backend_tag, replica_tag, init_args, - cluster_name=None): - serve.init(cluster_name=cluster_name) + instance_name=None): + serve.init(name=instance_name) if is_function: _callable = func_or_class else: diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 968b5d443..24797c276 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -194,8 +194,8 @@ class HTTPProxy: @ray.remote class HTTPProxyActor: - async def __init__(self, host, port, cluster_name=None): - serve.init(cluster_name=cluster_name) + async def __init__(self, host, port, instance_name=None): + serve.init(name=instance_name) self.app = HTTPProxy() await self.app.fetch_config_from_master() self.host = host diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 798e40b94..7b49bed66 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -50,11 +50,11 @@ class ServeMaster: requires all implementations here to be idempotent. """ - async def __init__(self, cluster_name, http_node_id, http_proxy_host, + async def __init__(self, instance_name, http_node_id, http_proxy_host, http_proxy_port, metric_exporter_class): - # Unique name of the serve cluster managed by this actor. Used to + # Unique name of the serve instance managed by this actor. Used to # namespace child actors and checkpoints. - self.cluster_name = cluster_name + self.instance_name = instance_name # Used to read/write checkpoints. self.kv_store = RayInternalKVStore() # path -> (endpoint, methods). @@ -108,8 +108,8 @@ class ServeMaster: # this lock and will be blocked until recovering from the checkpoint # finishes. checkpoint_key = CHECKPOINT_KEY - if self.cluster_name is not None: - checkpoint_key = "{}:{}".format(self.cluster_name, checkpoint_key) + if self.instance_name is not None: + checkpoint_key = "{}:{}".format(self.instance_name, checkpoint_key) checkpoint = self.kv_store.get(checkpoint_key) if checkpoint is None: logger.debug("No checkpoint found") @@ -119,11 +119,11 @@ class ServeMaster: self._recover_from_checkpoint(checkpoint)) def _get_or_start_router(self): - """Get the router belonging to this serve cluster. + """Get the router belonging to this serve instance. If the router does not already exist, it will be started. """ - router_name = format_actor_name(SERVE_ROUTER_NAME, self.cluster_name) + router_name = format_actor_name(SERVE_ROUTER_NAME, self.instance_name) try: self.router = ray.get_actor(router_name) except ValueError: @@ -132,18 +132,18 @@ class ServeMaster: name=router_name, max_concurrency=ASYNC_CONCURRENCY, max_restarts=-1, - ).remote(cluster_name=self.cluster_name) + ).remote(instance_name=self.instance_name) def get_router(self): """Returns a handle to the router managed by this actor.""" return [self.router] def _get_or_start_http_proxy(self, node_id, host, port): - """Get the HTTP proxy belonging to this serve cluster. + """Get the HTTP proxy belonging to this serve instance. If the HTTP proxy does not already exist, it will be started. """ - proxy_name = format_actor_name(SERVE_PROXY_NAME, self.cluster_name) + proxy_name = format_actor_name(SERVE_PROXY_NAME, self.instance_name) try: self.http_proxy = ray.get_actor(proxy_name) except ValueError: @@ -158,7 +158,7 @@ class ServeMaster: node_id: 0.01 }, ).remote( - host, port, cluster_name=self.cluster_name) + host, port, instance_name=self.instance_name) def get_http_proxy(self): """Returns a handle to the HTTP proxy managed by this actor.""" @@ -169,12 +169,12 @@ class ServeMaster: return self.routes, self.get_router() def _get_or_start_metric_exporter(self, metric_exporter_class): - """Get the metric exporter belonging to this serve cluster. + """Get the metric exporter belonging to this serve instance. If the metric exporter does not already exist, it will be started. """ metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME, - self.cluster_name) + self.instance_name) try: self.metric_exporter = ray.get_actor(metric_sink_name) except ValueError: @@ -204,7 +204,7 @@ class ServeMaster: os._exit(0) async def _recover_from_checkpoint(self, checkpoint_bytes): - """Recover the cluster state from the provided checkpoint. + """Recover the instance state from the provided checkpoint. Performs the following operations: 1) Deserializes the internal state from the checkpoint. @@ -240,7 +240,7 @@ class ServeMaster: for backend_tag, replica_tags in self.replicas.items(): for replica_tag in replica_tags: replica_name = format_actor_name(replica_tag, - self.cluster_name) + self.instance_name) self.workers[backend_tag][replica_tag] = ray.get_actor( replica_name) @@ -304,7 +304,7 @@ class ServeMaster: (backend_worker, backend_config, replica_config) = self.backends[backend_tag] - replica_name = format_actor_name(replica_tag, self.cluster_name) + replica_name = format_actor_name(replica_tag, self.instance_name) worker_handle = async_retryable(ray.remote(backend_worker)).options( name=replica_name, max_restarts=-1, @@ -312,7 +312,7 @@ class ServeMaster: backend_tag, replica_tag, replica_config.actor_init_args, - cluster_name=self.cluster_name) + instance_name=self.instance_name) # TODO(edoakes): we should probably have a timeout here. await worker_handle.ready.remote() return worker_handle diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index 1cbb4d2b9..22f07976b 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -86,7 +86,7 @@ def _make_future_unwrapper(client_futures: List[asyncio.Future], class Router: """A router that routes request to available workers.""" - async def __init__(self, cluster_name=None): + async def __init__(self, instance_name=None): # Note: Several queues are used in the router # - When a request come in, it's placed inside its corresponding # endpoint_queue. @@ -133,7 +133,7 @@ class Router: # the master actor. We use a "pull-based" approach instead of pushing # them from the master so that the router can transparently recover # from failure. - serve.init(cluster_name=cluster_name) + serve.init(name=instance_name) master_actor = serve.api._get_master_actor() traffic_policies = retry_actor_failures( diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 2c197b9f3..7ab86a2b3 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -346,15 +346,15 @@ def test_shard_key(serve_instance, route): assert do_request(shard_key) == results[shard_key] -def test_cluster_name(): +def test_name(): with pytest.raises(TypeError): - serve.init(cluster_name=1) + serve.init(name=1) route = "/api" backend = "backend" endpoint = "endpoint" - serve.init(cluster_name="cluster1", http_port=8001) + serve.init(name="cluster1", http_port=8001) serve.create_endpoint(endpoint, route=route) def function(): @@ -367,7 +367,7 @@ def test_cluster_name(): # Create a second cluster on port 8002. Create an endpoint and backend with # the same names and check that they don't collide. - serve.init(cluster_name="cluster2", http_port=8002) + serve.init(name="cluster2", http_port=8002) serve.create_endpoint(endpoint, route=route) def function(): @@ -385,7 +385,7 @@ def test_cluster_name(): assert requests.get("http://127.0.0.1:8001" + route).text == "hello1" # Check that we can re-connect to the first cluster. - serve.init(cluster_name="cluster1") + serve.init(name="cluster1") serve.delete_endpoint(endpoint) serve.delete_backend(backend) diff --git a/python/ray/serve/utils.py b/python/ray/serve/utils.py index df04c9cfb..4e0407d98 100644 --- a/python/ray/serve/utils.py +++ b/python/ray/serve/utils.py @@ -179,8 +179,8 @@ async def retry_actor_failures_async(f, *args, **kwargs): ACTOR_FAILURE_RETRY_TIMEOUT_S, f._method_name)) -def format_actor_name(actor_name, cluster_name=None): - if cluster_name is None: +def format_actor_name(actor_name, instance_name=None): + if instance_name is None: return actor_name else: - return "{}:{}".format(cluster_name, actor_name) + return "{}:{}".format(instance_name, actor_name)