From 95d187e5564953857fd85f6a04f8be7515a4e914 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Thu, 30 Apr 2020 20:59:07 -0500 Subject: [PATCH] [serve] Add delete_endpoint call (#8256) --- doc/source/rayserve/overview.rst | 7 +++ python/ray/serve/__init__.py | 11 ++-- python/ray/serve/api.py | 9 +++ python/ray/serve/master.py | 61 ++++++++++++++++++- python/ray/serve/router.py | 24 +++----- python/ray/serve/tests/test_api.py | 35 +++++++++++ python/ray/serve/tests/test_backend_worker.py | 10 +-- python/ray/serve/tests/test_router.py | 4 +- 8 files changed, 132 insertions(+), 29 deletions(-) diff --git a/doc/source/rayserve/overview.rst b/doc/source/rayserve/overview.rst index e713091c8..f5dc7590d 100644 --- a/doc/source/rayserve/overview.rst +++ b/doc/source/rayserve/overview.rst @@ -108,6 +108,13 @@ model that you'll be serving. To create one, we'll simply specify the name, rout serve.create_endpoint("simple_endpoint", "/simple") +You can also delete an endpoint using ``serve.delete_endpoint``. +Note that this will not delete any associated backends, which can be reused for other endpoints. + +.. code-block:: python + + serve.delete_endpoint("simple_endpoint") + .. 
_serve-backend: Backends diff --git a/python/ray/serve/__init__.py b/python/ray/serve/__init__.py index fcf827595..e2b4518e9 100644 --- a/python/ray/serve/__init__.py +++ b/python/ray/serve/__init__.py @@ -1,12 +1,13 @@ from ray.serve.backend_config import BackendConfig from ray.serve.policy import RoutePolicy from ray.serve.api import (init, create_backend, delete_backend, - create_endpoint, set_traffic, get_handle, stat, - set_backend_config, get_backend_config, - accept_batch) # noqa: E402 + create_endpoint, delete_endpoint, set_traffic, + get_handle, stat, set_backend_config, + get_backend_config, accept_batch) # noqa: E402 __all__ = [ "init", "create_backend", "delete_backend", "create_endpoint", - "set_traffic", "get_handle", "stat", "set_backend_config", - "get_backend_config", "BackendConfig", "RoutePolicy", "accept_batch" + "delete_endpoint", "set_traffic", "get_handle", "stat", + "set_backend_config", "get_backend_config", "BackendConfig", "RoutePolicy", + "accept_batch" ] diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index a3152a0f5..04563a99a 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -166,6 +166,15 @@ def create_endpoint(endpoint_name, route=None, methods=["GET"]): [m.upper() for m in methods]) +@_ensure_connected +def delete_endpoint(endpoint): + """Delete the given endpoint. + + Does not delete any associated backends. + """ + retry_actor_failures(master_actor.delete_endpoint, endpoint) + + @_ensure_connected def set_backend_config(backend_tag, backend_config): """Set a backend configuration for a backend tag diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index c2d194db9..36a1ba5cf 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -67,6 +67,9 @@ class ServeMaster: # backends that should be removed from the router if recovering from a # checkpoint. 
self.backends_to_remove = list() + # endpoints that should be removed from the router if recovering from a + # checkpoint. + self.endpoints_to_remove = list() # endpoint -> traffic_dict self.traffic_policies = dict() # Dictionary of backend tag to dictionaries of replica tag to worker. @@ -183,7 +186,7 @@ class ServeMaster: checkpoint = pickle.dumps( (self.routes, self.backends, self.traffic_policies, self.replicas, self.replicas_to_start, self.replicas_to_stop, - self.backends_to_remove)) + self.backends_to_remove, self.endpoints_to_remove)) self.kv_store_client.put("checkpoint", checkpoint) logger.debug("Wrote checkpoint in {:.2f}".format(time.time() - start)) @@ -213,7 +216,8 @@ class ServeMaster: # Load internal state from the checkpoint data. (self.routes, self.backends, self.traffic_policies, self.replicas, self.replicas_to_start, self.replicas_to_stop, - self.backends_to_remove) = pickle.loads(checkpoint_bytes) + self.backends_to_remove, + self.endpoints_to_remove) = pickle.loads(checkpoint_bytes) # Fetch actor handles for all of the backend replicas in the system. # All of these workers are guaranteed to already exist because they @@ -245,8 +249,9 @@ class ServeMaster: await self._start_pending_replicas() await self._stop_pending_replicas() - # Remove any pending backends. + # Remove any pending backends and endpoints. await self._remove_pending_backends() + await self._remove_pending_endpoints() logger.info( "Recovered from checkpoint in {:.3f}s".format(time.time() - start)) @@ -358,6 +363,15 @@ class ServeMaster: await self.router.remove_backend.remote(backend_tag) self.backends_to_remove.clear() + async def _remove_pending_endpoints(self): + """Removes the pending endpoints in self.endpoints_to_remove. + + Clears self.endpoints_to_remove. 
+ """ + for endpoint_tag in self.endpoints_to_remove: + await self.router.remove_service.remote(endpoint_tag) + self.endpoints_to_remove.clear() + def _scale_replicas(self, backend_tag, num_replicas): """Scale the given backend to the number of replicas. @@ -479,6 +493,47 @@ class ServeMaster: self._checkpoint() await self.http_proxy.set_route_table.remote(self.routes) + async def delete_endpoint(self, endpoint): + """Delete the specified endpoint. + + Does not modify any corresponding backends. + """ + logger.info("Deleting endpoint '{}'".format(endpoint)) + async with self.write_lock: + # This method must be idempotent. We should validate that the + # specified endpoint exists on the client. + if endpoint not in self.traffic_policies: + logger.info("Endpoint '{}' doesn't exist".format(endpoint)) + return + + # Remove the traffic policy entry. + del self.traffic_policies[endpoint] + + for route, (route_endpoint, _) in self.routes.items(): + if route_endpoint == endpoint: + route_to_delete = route + break + else: + # This should never happen, we either add to or delete from + # both self.traffic_policies and self.routes. + assert False, "No route found for endpoint '{}'".format( + endpoint) + + # Remove the routing entry. + del self.routes[route_to_delete] + + self.endpoints_to_remove.append(endpoint) + + # NOTE(edoakes): we must write a checkpoint before pushing the + # updates to the HTTP proxy and router to avoid inconsistent state + # if we crash after pushing the update. + self._checkpoint() + + # Update the HTTP proxy first to ensure no new requests for the + # endpoint are sent to the router. 
+ await self.http_proxy.set_route_table.remote(self.routes) + await self._remove_pending_endpoints() + async def create_backend(self, backend_tag, backend_config, func_or_class, actor_init_args): """Register a new backend under the specified tag.""" diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index 19e962db7..ebf4534fa 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -88,16 +88,6 @@ class Router: The traffic policy is used to assign requests to workers. - Behavior: - >>> # psuedo-code - >>> router = Router() - >>> router.enqueue_request( - "service-name", request_args, request_kwargs, request_context) - # nothing happens, request is queued. - >>> router.add_new_worker("backend-1", worker_handle) - >>> router.link("service-name", "backend-1") - # the request is assigned to the worker - Traffic policy splits the traffic among different replicas probabilistically: @@ -254,16 +244,22 @@ class Router: # on it. worker_handle.__ray_terminate__.remote() - async def link(self, service, backend): - logger.debug("Link %s with %s", service, backend) - await self.set_traffic(service, {backend: 1.0}) - async def set_traffic(self, service, traffic_dict): logger.debug("Setting traffic for service %s to %s", service, traffic_dict) self.traffic[service] = traffic_dict await self.flush() + async def remove_service(self, service): + logger.debug("Removing service {}".format(service)) + async with self.flush_lock: + await self._flush_service_queues() + await self._flush_buffer_queues() + if service in self.service_queues: + del self.service_queues[service] + if service in self.traffic: + del self.traffic[service] + async def set_backend_config(self, backend, config_dict): logger.debug("Setting backend config for " "backend {} to {}".format(backend, config_dict)) diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 614668ddc..085bce581 100644 --- a/python/ray/serve/tests/test_api.py +++ 
b/python/ray/serve/tests/test_api.py @@ -306,3 +306,38 @@ def test_delete_backend(serve_instance): serve.set_traffic("delete_backend", {"delete:v1": 1.0}) assert requests.get("http://127.0.0.1:8000/delete-backend").text == "olleh" + + +@pytest.mark.parametrize("route", [None, "/delete-endpoint"]) +def test_delete_endpoint(serve_instance, route): + endpoint_name = "delete_endpoint" + str(route) + serve.create_endpoint(endpoint_name, route=route) + serve.delete_endpoint(endpoint_name) + + # Check that we can reuse a deleted endpoint name and route. + serve.create_endpoint(endpoint_name, route=route) + + def function(): + return "hello" + + serve.create_backend(function, "delete-endpoint:v1") + serve.set_traffic(endpoint_name, {"delete-endpoint:v1": 1.0}) + + if route is not None: + assert requests.get( + "http://127.0.0.1:8000/delete-endpoint").text == "hello" + else: + handle = serve.get_handle(endpoint_name) + assert ray.get(handle.remote()) == "hello" + + # Check that deleting the endpoint doesn't delete the backend. 
+ serve.delete_endpoint(endpoint_name) + serve.create_endpoint(endpoint_name, route=route) + serve.set_traffic(endpoint_name, {"delete-endpoint:v1": 1.0}) + + if route is not None: + assert requests.get( + "http://127.0.0.1:8000/delete-endpoint").text == "hello" + else: + handle = serve.get_handle(endpoint_name) + assert ray.get(handle.remote()) == "hello" diff --git a/python/ray/serve/tests/test_backend_worker.py b/python/ray/serve/tests/test_backend_worker.py index a5f9b7d59..41ac03c26 100644 --- a/python/ray/serve/tests/test_backend_worker.py +++ b/python/ray/serve/tests/test_backend_worker.py @@ -54,7 +54,7 @@ async def test_runner_actor(serve_instance): worker = setup_worker(CONSUMER_NAME, echo) await q.add_new_worker.remote(CONSUMER_NAME, "replica1", worker) - q.link.remote(PRODUCER_NAME, CONSUMER_NAME) + q.set_traffic.remote(PRODUCER_NAME, {CONSUMER_NAME: 1.0}) for query in [333, 444, 555]: query_param = RequestMetadata(PRODUCER_NAME, @@ -79,7 +79,7 @@ async def test_ray_serve_mixin(serve_instance): worker = setup_worker(CONSUMER_NAME, MyAdder, init_args=(3, )) await q.add_new_worker.remote(CONSUMER_NAME, "replica1", worker) - q.link.remote(PRODUCER_NAME, CONSUMER_NAME) + q.set_traffic.remote(PRODUCER_NAME, {CONSUMER_NAME: 1.0}) for query in [333, 444, 555]: query_param = RequestMetadata(PRODUCER_NAME, @@ -101,7 +101,7 @@ async def test_task_runner_check_context(serve_instance): worker = setup_worker(CONSUMER_NAME, echo) await q.add_new_worker.remote(CONSUMER_NAME, "replica1", worker) - q.link.remote(PRODUCER_NAME, CONSUMER_NAME) + q.set_traffic.remote(PRODUCER_NAME, {CONSUMER_NAME: 1.0}) query_param = RequestMetadata(PRODUCER_NAME, context.TaskContext.Python) result_oid = q.enqueue_request.remote(query_param, i=42) @@ -125,7 +125,7 @@ async def test_task_runner_custom_method_single(serve_instance): worker = setup_worker(CONSUMER_NAME, NonBatcher) await q.add_new_worker.remote(CONSUMER_NAME, "replica1", worker) - q.link.remote(PRODUCER_NAME, CONSUMER_NAME) + 
q.set_traffic.remote(PRODUCER_NAME, {CONSUMER_NAME: 1.0}) query_param = RequestMetadata( PRODUCER_NAME, context.TaskContext.Python, call_method="a") @@ -159,7 +159,7 @@ async def test_task_runner_custom_method_batch(serve_instance): worker = setup_worker(CONSUMER_NAME, Batcher) - await q.link.remote(PRODUCER_NAME, CONSUMER_NAME) + await q.set_traffic.remote(PRODUCER_NAME, {CONSUMER_NAME: 1.0}) await q.set_backend_config.remote( CONSUMER_NAME, BackendConfig(max_batch_size=10).__dict__) diff --git a/python/ray/serve/tests/test_router.py b/python/ray/serve/tests/test_router.py index c6b752c3e..4bbf2b50f 100644 --- a/python/ray/serve/tests/test_router.py +++ b/python/ray/serve/tests/test_router.py @@ -42,7 +42,7 @@ def task_runner_mock_actor(): async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): q = RandomPolicyQueueActor.remote() - q.link.remote("svc", "backend-single-prod") + q.set_traffic.remote("svc", {"backend-single-prod": 1.0}) q.add_new_worker.remote("backend-single-prod", "replica-1", task_runner_mock_actor) @@ -58,7 +58,7 @@ async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): async def test_slo(serve_instance, task_runner_mock_actor): q = RandomPolicyQueueActor.remote() - await q.link.remote("svc", "backend-slo") + await q.set_traffic.remote("svc", {"backend-slo": 1.0}) all_request_sent = [] for i in range(10):