From c9010eb8ad856e4fb284a5d9f71a61cb283eb7cf Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 23 Jun 2020 13:42:03 -0500 Subject: [PATCH] [serve] Add serve.shutdown() (#8766) --- doc/source/serve/package-ref.rst | 2 +- python/ray/experimental/internal_kv.py | 10 ++++---- python/ray/serve/__init__.py | 3 ++- python/ray/serve/api.py | 13 +++++++++++ python/ray/serve/kv_store.py | 11 +++++++++ python/ray/serve/master.py | 18 +++++++++++---- python/ray/serve/tests/conftest.py | 2 ++ python/ray/serve/tests/test_api.py | 32 +++++++++++++++++++++++++- 8 files changed, 78 insertions(+), 13 deletions(-) diff --git a/doc/source/serve/package-ref.rst b/doc/source/serve/package-ref.rst index 5fb94aeeb..1236a1081 100644 --- a/doc/source/serve/package-ref.rst +++ b/doc/source/serve/package-ref.rst @@ -4,6 +4,7 @@ Package Reference Basic APIs ---------- .. autofunction:: ray.serve.init +.. autofunction:: ray.serve.shutdown .. autofunction:: ray.serve.create_backend .. autofunction:: ray.serve.create_endpoint @@ -37,4 +38,3 @@ Advanced APIs ``serve.accept_batch`` marks your backend API does accept list of input instead of just single input. ..
autofunction:: ray.serve.accept_batch - diff --git a/python/ray/experimental/internal_kv.py b/python/ray/experimental/internal_kv.py index 1bdd910a6..14434b558 100644 --- a/python/ray/experimental/internal_kv.py +++ b/python/ray/experimental/internal_kv.py @@ -1,7 +1,5 @@ import ray -_local = {} # dict for local mode - def _internal_kv_initialized(): worker = ray.worker.global_worker @@ -11,9 +9,7 @@ def _internal_kv_initialized(): def _internal_kv_get(key): """Fetch the value of a binary key.""" - worker = ray.worker.global_worker - - return worker.redis_client.hget(key, "value") + return ray.worker.global_worker.redis_client.hget(key, "value") def _internal_kv_put(key, value, overwrite=False): @@ -32,3 +28,7 @@ def _internal_kv_put(key, value, overwrite=False): else: updated = worker.redis_client.hsetnx(key, "value", value) return updated == 0 # already exists + + +def _internal_kv_del(key): + return ray.worker.global_worker.redis_client.delete(key) diff --git a/python/ray/serve/__init__.py b/python/ray/serve/__init__.py index 7bc2ff580..ed6d9f697 100644 --- a/python/ray/serve/__init__.py +++ b/python/ray/serve/__init__.py @@ -1,7 +1,7 @@ from ray.serve.api import ( init, create_backend, delete_backend, create_endpoint, delete_endpoint, set_traffic, get_handle, stat, update_backend_config, get_backend_config, - accept_batch, list_backends, list_endpoints) # noqa: E402 + accept_batch, list_backends, list_endpoints, shutdown) # noqa: E402 __all__ = [ "init", @@ -17,4 +17,5 @@ __all__ = [ "accept_batch", "list_backends", "list_endpoints", + "shutdown", ] diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 7387bcfdd..736bfc895 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -119,6 +119,19 @@ def init(name=None, @_ensure_connected +def shutdown(): + """Completely shut down the connected Serve instance.
+ + Shuts down all processes and deletes all state associated with the Serve + instance the caller is currently connected to (via serve.init). + """ + global master_actor + ray.get(master_actor.shutdown.remote()) + ray.kill(master_actor, no_restart=True) + master_actor = None + + +@_ensure_connected def create_endpoint(endpoint_name, *, backend=None, diff --git a/python/ray/serve/kv_store.py b/python/ray/serve/kv_store.py index e89377650..485eed996 100644 --- a/python/ray/serve/kv_store.py +++ b/python/ray/serve/kv_store.py @@ -45,3 +45,14 @@ class RayInternalKVStore: raise TypeError("key must be a string, got: {}.".format(type(key))) return ray_kv._internal_kv_get(self._format_key(key)) + + def delete(self, key): + """Delete the value associated with the given key from the store. + + Args: + key (str) + """ + + if not isinstance(key, str): + raise TypeError("key must be a string, got: {}.".format(type(key))) + return ray_kv._internal_kv_del(self._format_key(key)) diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index d7834b9c9..ee39f71d6 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -61,7 +61,7 @@ class ServeMaster: # namespace child actors and checkpoints. self.instance_name = instance_name # Used to read/write checkpoints. - self.kv_store = RayInternalKVStore() + self.kv_store = RayInternalKVStore(namespace=instance_name) # path -> (endpoint, methods). self.routes = {} # backend -> (backend_worker, backend_config, replica_config). @@ -112,10 +112,7 @@ class ServeMaster: # a checkpoint to the event loop. Other state-changing calls acquire # this lock and will be blocked until recovering from the checkpoint # finishes.
- checkpoint_key = CHECKPOINT_KEY - if self.instance_name is not None: - checkpoint_key = "{}:{}".format(self.instance_name, checkpoint_key) - checkpoint = self.kv_store.get(checkpoint_key) + checkpoint = self.kv_store.get(CHECKPOINT_KEY) if checkpoint is None: logger.debug("No checkpoint found") else: @@ -712,3 +709,14 @@ class ServeMaster: assert (backend_tag in self.backends ), "Backend {} is not registered.".format(backend_tag) return self.backends[backend_tag][2] + + async def shutdown(self): + """Shuts down the serve instance completely.""" + async with self.write_lock: + ray.kill(self.http_proxy, no_restart=True) + ray.kill(self.router, no_restart=True) + ray.kill(self.metric_exporter, no_restart=True) + for replica_dict in self.workers.values(): + for replica in replica_dict.values(): + ray.kill(replica, no_restart=True) + self.kv_store.delete(CHECKPOINT_KEY) diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index dcd2d0c66..9fe0ede12 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -20,6 +20,8 @@ def _shared_serve_instance(): def serve_instance(_shared_serve_instance): serve.init() yield + # Re-init if necessary. + serve.init() master = serve.api._get_master_actor() # Clear all state between tests to avoid naming collisions. 
for endpoint in ray.get(master.get_all_endpoints.remote()): diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index ef96dccd5..527165cfc 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -6,8 +6,10 @@ import requests import ray from ray import serve -from ray.serve.utils import get_random_letters +from ray.test_utils import wait_for_condition +from ray.serve import constants from ray.serve.exceptions import RayServeException +from ray.serve.utils import format_actor_name, get_random_letters def test_e2e(serve_instance): @@ -546,6 +548,34 @@ def test_create_infeasible_error(serve_instance): assert len(replicas) == 0 +def test_shutdown(serve_instance): + def f(): + pass + + instance_name = "shutdown" + serve.init(name=instance_name, http_port=8002) + serve.create_backend("backend", f) + serve.create_endpoint("endpoint", backend="backend") + + serve.shutdown() + with pytest.raises(RayServeException, match="Please run serve.init"): + serve.list_backends() + + def check_dead(): + for actor_name in [ + constants.SERVE_MASTER_NAME, constants.SERVE_PROXY_NAME, + constants.SERVE_ROUTER_NAME, constants.SERVE_METRIC_SINK_NAME + ]: + try: + ray.get_actor(format_actor_name(actor_name, instance_name)) + return False + except ValueError: + pass + return True + + assert wait_for_condition(check_dead) + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", "-s", __file__]))