From 43be73e4cfb16fee4d8029d4cd0dc0750500c9ed Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Thu, 30 Apr 2020 13:10:39 -0500 Subject: [PATCH] [serve] Add delete_backend call (#8252) --- doc/source/rayserve/overview.rst | 8 +++ python/ray/serve/__init__.py | 13 ++--- python/ray/serve/api.py | 9 ++++ python/ray/serve/master.py | 79 ++++++++++++++++++++++++------ python/ray/serve/router.py | 12 +++++ python/ray/serve/tests/test_api.py | 43 +++++++++++++++- 6 files changed, 141 insertions(+), 23 deletions(-) diff --git a/doc/source/rayserve/overview.rst b/doc/source/rayserve/overview.rst index 70a74b975..e713091c8 100644 --- a/doc/source/rayserve/overview.rst +++ b/doc/source/rayserve/overview.rst @@ -157,6 +157,14 @@ Once we've done that, we can now query our endpoint via HTTP (we use `requests` import requests print(requests.get("http://127.0.0.1:8000/-/routes", timeout=0.5).text) +To delete a backend, we can use `serve.delete_backend`. +Note that the backend must not be used by any endpoints in order to be deleted. +Once a backend is deleted, its tag can be reused. + +.. 
code-block:: python + + serve.delete_backend("simple_backend") + Configuring Backends ~~~~~~~~~~~~~~~~~~~~ diff --git a/python/ray/serve/__init__.py b/python/ray/serve/__init__.py index 48fdb9bb3..fcf827595 100644 --- a/python/ray/serve/__init__.py +++ b/python/ray/serve/__init__.py @@ -1,11 +1,12 @@ from ray.serve.backend_config import BackendConfig from ray.serve.policy import RoutePolicy -from ray.serve.api import (init, create_backend, create_endpoint, set_traffic, - get_handle, stat, set_backend_config, - get_backend_config, accept_batch) # noqa: E402 +from ray.serve.api import (init, create_backend, delete_backend, + create_endpoint, set_traffic, get_handle, stat, + set_backend_config, get_backend_config, + accept_batch) # noqa: E402 __all__ = [ - "init", "create_backend", "create_endpoint", "set_traffic", "get_handle", - "stat", "set_backend_config", "get_backend_config", "BackendConfig", - "RoutePolicy", "accept_batch" + "init", "create_backend", "delete_backend", "create_endpoint", + "set_traffic", "get_handle", "stat", "set_backend_config", + "get_backend_config", "BackendConfig", "RoutePolicy", "accept_batch" ] diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 76fa12c31..a3152a0f5 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -240,6 +240,15 @@ def create_backend(func_or_class, backend_config, func_or_class, actor_init_args) +@_ensure_connected +def delete_backend(backend_tag): + """Delete the given backend. + + The backend must not currently be used by any endpoints. + """ + retry_actor_failures(master_actor.delete_backend, backend_tag) + + @_ensure_connected def set_traffic(endpoint_name, traffic_policy_dictionary): """Associate a service endpoint with traffic policy. 
diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 9c1fc9dd5..c2d194db9 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -64,6 +64,9 @@ class ServeMaster: self.replicas_to_start = defaultdict(list) # replicas that should be stopped if recovering from a checkpoint. self.replicas_to_stop = defaultdict(list) + # backends that should be removed from the router if recovering from a + # checkpoint. + self.backends_to_remove = list() # endpoint -> traffic_dict self.traffic_policies = dict() # Dictionary of backend tag to dictionaries of replica tag to worker. @@ -179,7 +182,8 @@ class ServeMaster: start = time.time() checkpoint = pickle.dumps( (self.routes, self.backends, self.traffic_policies, self.replicas, - self.replicas_to_start, self.replicas_to_stop)) + self.replicas_to_start, self.replicas_to_stop, + self.backends_to_remove)) self.kv_store_client.put("checkpoint", checkpoint) logger.debug("Wrote checkpoint in {:.2f}".format(time.time() - start)) @@ -208,8 +212,8 @@ class ServeMaster: # Load internal state from the checkpoint data. (self.routes, self.backends, self.traffic_policies, self.replicas, - self.replicas_to_start, - self.replicas_to_stop) = pickle.loads(checkpoint_bytes) + self.replicas_to_start, self.replicas_to_stop, + self.backends_to_remove) = pickle.loads(checkpoint_bytes) # Fetch actor handles for all of the backend replicas in the system. # All of these workers are guaranteed to already exist because they @@ -241,6 +245,9 @@ class ServeMaster: await self._start_pending_replicas() await self._stop_pending_replicas() + # Remove any pending backends. + await self._remove_pending_backends() + logger.info( "Recovered from checkpoint in {:.3f}s".format(time.time() - start)) @@ -342,6 +349,15 @@ class ServeMaster: self.replicas_to_stop.clear() + async def _remove_pending_backends(self): + """Removes the pending backends in self.backends_to_remove. + + Clears self.backends_to_remove. 
+ """ + for backend_tag in self.backends_to_remove: + await self.router.remove_backend.remote(backend_tag) + self.backends_to_remove.clear() + def _scale_replicas(self, backend_tag, num_replicas): """Scale the given backend to the number of replicas. @@ -394,18 +410,20 @@ class ServeMaster: async def set_traffic(self, endpoint_name, traffic_policy_dictionary): """Sets the traffic policy for the specified endpoint.""" async with self.write_lock: - assert endpoint_name in self.get_all_endpoints(), \ - "Attempted to assign traffic for an endpoint '{}'" \ - " that is not registered.".format(endpoint_name) + if endpoint_name not in self.get_all_endpoints(): + raise ValueError( + "Attempted to assign traffic for an endpoint '{}'" + " that is not registered.".format(endpoint_name)) assert isinstance(traffic_policy_dictionary, dict), "Traffic policy must be dictionary" prob = 0 for backend, weight in traffic_policy_dictionary.items(): prob += weight - assert backend in self.backends, \ - "Attempted to assign traffic to a backend '{}' that " \ - "is not registered.".format(backend) + if backend not in self.backends: + raise ValueError( + "Attempted to assign traffic to a backend '{}' that " + "is not registered.".format(backend)) assert np.isclose( prob, 1, atol=0.02 @@ -481,21 +499,52 @@ class ServeMaster: # crash while making the change. self._checkpoint() await self._start_pending_replicas() - await self._stop_pending_replicas() # Set the backend config inside the router # (particularly for max-batch-size). await self.router.set_backend_config.remote( backend_tag, backend_config_dict) + async def delete_backend(self, backend_tag): + async with self.write_lock: + # This method must be idempotent. We should validate that the + # specified backend exists on the client. + if backend_tag not in self.backends: + return + + # Check that the specified backend isn't used by any endpoints. 
+ for endpoint, traffic_dict in self.traffic_policies.items(): + if backend_tag in traffic_dict: + raise ValueError("Backend '{}' is used by endpoint '{}' " + "and cannot be deleted. Please remove " + "the backend from all endpoints and try " + "again.".format(backend_tag, endpoint)) + + # Scale its replicas down to 0. This will also remove the backend + # from self.backends and self.replicas. + self._scale_replicas(backend_tag, 0) + + # Remove the backend's metadata. + del self.backends[backend_tag] + + # Add the intention to remove the backend from the router. + self.backends_to_remove.append(backend_tag) + + # NOTE(edoakes): we must write a checkpoint before removing the + # backend from the router to avoid inconsistent state if we crash + # after pushing the update. + self._checkpoint() + await self._stop_pending_replicas() + await self._remove_pending_backends() + async def set_backend_config(self, backend_tag, backend_config): """Set the config for the specified backend.""" async with self.write_lock: - assert (backend_tag in self.backends - ), "Backend {} is not registered.".format(backend_tag) - assert isinstance(backend_config, - BackendConfig), ("backend_config must be" - " of instance BackendConfig") + if backend_tag not in self.backends: + raise ValueError( + "Backend '{}' is not registered.".format(backend_tag)) + if not isinstance(backend_config, BackendConfig): + raise ValueError("backend_config must be a BackendConfig.") backend_config_dict = dict(backend_config) backend_worker, init_args, old_backend_config_dict = self.backends[ backend_tag] diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index f4333b203..19e962db7 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -269,6 +269,18 @@ class Router: "backend {} to {}".format(backend, config_dict)) self.backend_info[backend] = config_dict + async def remove_backend(self, backend): + logger.debug("Removing backend {}".format(backend)) + async with 
self.flush_lock: + await self._flush_service_queues() + await self._flush_buffer_queues() + if backend in self.backend_info: + del self.backend_info[backend] + if backend in self.worker_queues: + del self.worker_queues[backend] + if backend in self.buffer_queues: + del self.buffer_queues[backend] + async def flush(self): """In the default case, flush calls ._flush. diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 011202630..614668ddc 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -94,9 +94,9 @@ def test_set_traffic_missing_data(serve_instance): backend_name = "foo_backend" serve.create_endpoint(endpoint_name) serve.create_backend(lambda: 5, backend_name) - with pytest.raises(AssertionError): + with pytest.raises(ValueError): serve.set_traffic(endpoint_name, {"nonexistent_backend": 1.0}) - with pytest.raises(AssertionError): + with pytest.raises(ValueError): serve.set_traffic("nonexistent_endpoint_name", {backend_name: 1.0}) @@ -267,3 +267,42 @@ def test_not_killing_replicas(serve_instance): # and should be subset of all_tag_list assert set(old_replica_tag_list) <= set(new_all_tag_list) assert set(old_replica_tag_list) == set(new_replica_tag_list) + + +def test_delete_backend(serve_instance): + serve.create_endpoint("delete_backend", "/delete-backend") + + def function(): + return "hello" + + serve.create_backend(function, "delete:v1") + serve.set_traffic("delete_backend", {"delete:v1": 1.0}) + + assert requests.get("http://127.0.0.1:8000/delete-backend").text == "hello" + + # Check that we can't delete the backend while it's in use. + with pytest.raises(ValueError): + serve.delete_backend("delete:v1") + + serve.create_backend(function, "delete:v2") + serve.set_traffic("delete_backend", {"delete:v1": 0.5, "delete:v2": 0.5}) + + with pytest.raises(ValueError): + serve.delete_backend("delete:v1") + + # Check that the backend can be deleted once it's no longer in use. 
+ serve.set_traffic("delete_backend", {"delete:v2": 1.0}) + serve.delete_backend("delete:v1") + + # Check that we can no longer use the previously deleted backend. + with pytest.raises(ValueError): + serve.set_traffic("delete_backend", {"delete:v1": 1.0}) + + def function2(): + return "olleh" + + # Check that we can now reuse the previously deleted backend's tag. + serve.create_backend(function2, "delete:v1") + serve.set_traffic("delete_backend", {"delete:v1": 1.0}) + + assert requests.get("http://127.0.0.1:8000/delete-backend").text == "olleh"