mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 01:46:10 +08:00
[serve] Add delete_backend call (#8252)
This commit is contained in:
@@ -1,11 +1,12 @@
|
||||
from ray.serve.backend_config import BackendConfig
|
||||
from ray.serve.policy import RoutePolicy
|
||||
from ray.serve.api import (init, create_backend, create_endpoint, set_traffic,
|
||||
get_handle, stat, set_backend_config,
|
||||
get_backend_config, accept_batch) # noqa: E402
|
||||
from ray.serve.api import (init, create_backend, delete_backend,
|
||||
create_endpoint, set_traffic, get_handle, stat,
|
||||
set_backend_config, get_backend_config,
|
||||
accept_batch) # noqa: E402
|
||||
|
||||
__all__ = [
|
||||
"init", "create_backend", "create_endpoint", "set_traffic", "get_handle",
|
||||
"stat", "set_backend_config", "get_backend_config", "BackendConfig",
|
||||
"RoutePolicy", "accept_batch"
|
||||
"init", "create_backend", "delete_backend", "create_endpoint",
|
||||
"set_traffic", "get_handle", "stat", "set_backend_config",
|
||||
"get_backend_config", "BackendConfig", "RoutePolicy", "accept_batch"
|
||||
]
|
||||
|
||||
@@ -240,6 +240,15 @@ def create_backend(func_or_class,
|
||||
backend_config, func_or_class, actor_init_args)
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
def delete_backend(backend_tag):
|
||||
"""Delete the given backend.
|
||||
|
||||
The backend must not currently be used by any endpoints.
|
||||
"""
|
||||
retry_actor_failures(master_actor.delete_backend, backend_tag)
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
def set_traffic(endpoint_name, traffic_policy_dictionary):
|
||||
"""Associate a service endpoint with traffic policy.
|
||||
|
||||
+64
-15
@@ -64,6 +64,9 @@ class ServeMaster:
|
||||
self.replicas_to_start = defaultdict(list)
|
||||
# replicas that should be stopped if recovering from a checkpoint.
|
||||
self.replicas_to_stop = defaultdict(list)
|
||||
# backends that should be removed from the router if recovering from a
|
||||
# checkpoint.
|
||||
self.backends_to_remove = list()
|
||||
# endpoint -> traffic_dict
|
||||
self.traffic_policies = dict()
|
||||
# Dictionary of backend tag to dictionaries of replica tag to worker.
|
||||
@@ -179,7 +182,8 @@ class ServeMaster:
|
||||
start = time.time()
|
||||
checkpoint = pickle.dumps(
|
||||
(self.routes, self.backends, self.traffic_policies, self.replicas,
|
||||
self.replicas_to_start, self.replicas_to_stop))
|
||||
self.replicas_to_start, self.replicas_to_stop,
|
||||
self.backends_to_remove))
|
||||
|
||||
self.kv_store_client.put("checkpoint", checkpoint)
|
||||
logger.debug("Wrote checkpoint in {:.2f}".format(time.time() - start))
|
||||
@@ -208,8 +212,8 @@ class ServeMaster:
|
||||
|
||||
# Load internal state from the checkpoint data.
|
||||
(self.routes, self.backends, self.traffic_policies, self.replicas,
|
||||
self.replicas_to_start,
|
||||
self.replicas_to_stop) = pickle.loads(checkpoint_bytes)
|
||||
self.replicas_to_start, self.replicas_to_stop,
|
||||
self.backends_to_remove) = pickle.loads(checkpoint_bytes)
|
||||
|
||||
# Fetch actor handles for all of the backend replicas in the system.
|
||||
# All of these workers are guaranteed to already exist because they
|
||||
@@ -241,6 +245,9 @@ class ServeMaster:
|
||||
await self._start_pending_replicas()
|
||||
await self._stop_pending_replicas()
|
||||
|
||||
# Remove any pending backends.
|
||||
await self._remove_pending_backends()
|
||||
|
||||
logger.info(
|
||||
"Recovered from checkpoint in {:.3f}s".format(time.time() - start))
|
||||
|
||||
@@ -342,6 +349,15 @@ class ServeMaster:
|
||||
|
||||
self.replicas_to_stop.clear()
|
||||
|
||||
async def _remove_pending_backends(self):
|
||||
"""Removes the pending backends in self.backends_to_remove.
|
||||
|
||||
Clears self.backends_to_remove.
|
||||
"""
|
||||
for backend_tag in self.backends_to_remove:
|
||||
await self.router.remove_backend.remote(backend_tag)
|
||||
self.backends_to_remove.clear()
|
||||
|
||||
def _scale_replicas(self, backend_tag, num_replicas):
|
||||
"""Scale the given backend to the number of replicas.
|
||||
|
||||
@@ -394,18 +410,20 @@ class ServeMaster:
|
||||
async def set_traffic(self, endpoint_name, traffic_policy_dictionary):
|
||||
"""Sets the traffic policy for the specified endpoint."""
|
||||
async with self.write_lock:
|
||||
assert endpoint_name in self.get_all_endpoints(), \
|
||||
"Attempted to assign traffic for an endpoint '{}'" \
|
||||
" that is not registered.".format(endpoint_name)
|
||||
if endpoint_name not in self.get_all_endpoints():
|
||||
raise ValueError(
|
||||
"Attempted to assign traffic for an endpoint '{}'"
|
||||
" that is not registered.".format(endpoint_name))
|
||||
|
||||
assert isinstance(traffic_policy_dictionary,
|
||||
dict), "Traffic policy must be dictionary"
|
||||
prob = 0
|
||||
for backend, weight in traffic_policy_dictionary.items():
|
||||
prob += weight
|
||||
assert backend in self.backends, \
|
||||
"Attempted to assign traffic to a backend '{}' that " \
|
||||
"is not registered.".format(backend)
|
||||
if backend not in self.backends:
|
||||
raise ValueError(
|
||||
"Attempted to assign traffic to a backend '{}' that "
|
||||
"is not registered.".format(backend))
|
||||
|
||||
assert np.isclose(
|
||||
prob, 1, atol=0.02
|
||||
@@ -481,21 +499,52 @@ class ServeMaster:
|
||||
# crash while making the change.
|
||||
self._checkpoint()
|
||||
await self._start_pending_replicas()
|
||||
await self._stop_pending_replicas()
|
||||
|
||||
# Set the backend config inside the router
|
||||
# (particularly for max-batch-size).
|
||||
await self.router.set_backend_config.remote(
|
||||
backend_tag, backend_config_dict)
|
||||
|
||||
async def delete_backend(self, backend_tag):
|
||||
async with self.write_lock:
|
||||
# This method must be idempotent. We should validate that the
|
||||
# specified backend exists on the client.
|
||||
if backend_tag not in self.backends:
|
||||
return
|
||||
|
||||
# Check that the specified backend isn't used by any endpoints.
|
||||
for endpoint, traffic_dict in self.traffic_policies.items():
|
||||
if backend_tag in traffic_dict:
|
||||
raise ValueError("Backend '{}' is used by endpoint '{}' "
|
||||
"and cannot be deleted. Please remove "
|
||||
"the backend from all endpoints and try "
|
||||
"again.".format(backend_tag, endpoint))
|
||||
|
||||
# Scale its replicas down to 0. This will also remove the backend
|
||||
# from self.backends and self.replicas.
|
||||
self._scale_replicas(backend_tag, 0)
|
||||
|
||||
# Remove the backend's metadata.
|
||||
del self.backends[backend_tag]
|
||||
|
||||
# Add the intention to remove the backend from the router.
|
||||
self.backends_to_remove.append(backend_tag)
|
||||
|
||||
# NOTE(edoakes): we must write a checkpoint before removing the
|
||||
# backend from the router to avoid inconsistent state if we crash
|
||||
# after pushing the update.
|
||||
self._checkpoint()
|
||||
await self._stop_pending_replicas()
|
||||
await self._remove_pending_backends()
|
||||
|
||||
async def set_backend_config(self, backend_tag, backend_config):
|
||||
"""Set the config for the specified backend."""
|
||||
async with self.write_lock:
|
||||
assert (backend_tag in self.backends
|
||||
), "Backend {} is not registered.".format(backend_tag)
|
||||
assert isinstance(backend_config,
|
||||
BackendConfig), ("backend_config must be"
|
||||
" of instance BackendConfig")
|
||||
if backend_tag not in self.backends:
|
||||
raise ValueError(
|
||||
"Backend '{}' is not registered.".format(backend_tag))
|
||||
if not isinstance(backend_config, BackendConfig):
|
||||
raise ValueError("backend_config must be a BackendConfig.")
|
||||
backend_config_dict = dict(backend_config)
|
||||
backend_worker, init_args, old_backend_config_dict = self.backends[
|
||||
backend_tag]
|
||||
|
||||
@@ -269,6 +269,18 @@ class Router:
|
||||
"backend {} to {}".format(backend, config_dict))
|
||||
self.backend_info[backend] = config_dict
|
||||
|
||||
async def remove_backend(self, backend):
|
||||
logger.debug("Removing backend {}".format(backend))
|
||||
async with self.flush_lock:
|
||||
await self._flush_service_queues()
|
||||
await self._flush_buffer_queues()
|
||||
if backend in self.backend_info:
|
||||
del self.backend_info[backend]
|
||||
if backend in self.worker_queues:
|
||||
del self.worker_queues[backend]
|
||||
if backend in self.buffer_queues:
|
||||
del self.buffer_queues[backend]
|
||||
|
||||
async def flush(self):
|
||||
"""In the default case, flush calls ._flush.
|
||||
|
||||
|
||||
@@ -94,9 +94,9 @@ def test_set_traffic_missing_data(serve_instance):
|
||||
backend_name = "foo_backend"
|
||||
serve.create_endpoint(endpoint_name)
|
||||
serve.create_backend(lambda: 5, backend_name)
|
||||
with pytest.raises(AssertionError):
|
||||
with pytest.raises(ValueError):
|
||||
serve.set_traffic(endpoint_name, {"nonexistent_backend": 1.0})
|
||||
with pytest.raises(AssertionError):
|
||||
with pytest.raises(ValueError):
|
||||
serve.set_traffic("nonexistent_endpoint_name", {backend_name: 1.0})
|
||||
|
||||
|
||||
@@ -267,3 +267,42 @@ def test_not_killing_replicas(serve_instance):
|
||||
# and should be subset of all_tag_list
|
||||
assert set(old_replica_tag_list) <= set(new_all_tag_list)
|
||||
assert set(old_replica_tag_list) == set(new_replica_tag_list)
|
||||
|
||||
|
||||
def test_delete_backend(serve_instance):
|
||||
serve.create_endpoint("delete_backend", "/delete-backend")
|
||||
|
||||
def function():
|
||||
return "hello"
|
||||
|
||||
serve.create_backend(function, "delete:v1")
|
||||
serve.set_traffic("delete_backend", {"delete:v1": 1.0})
|
||||
|
||||
assert requests.get("http://127.0.0.1:8000/delete-backend").text == "hello"
|
||||
|
||||
# Check that we can't delete the backend while it's in use.
|
||||
with pytest.raises(ValueError):
|
||||
serve.delete_backend("delete:v1")
|
||||
|
||||
serve.create_backend(function, "delete:v2")
|
||||
serve.set_traffic("delete_backend", {"delete:v1": 0.5, "delete:v2": 0.5})
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
serve.delete_backend("delete:v1")
|
||||
|
||||
# Check that the backend can be deleted once it's no longer in use.
|
||||
serve.set_traffic("delete_backend", {"delete:v2": 1.0})
|
||||
serve.delete_backend("delete:v1")
|
||||
|
||||
# Check that we can no longer use the previously deleted backend.
|
||||
with pytest.raises(ValueError):
|
||||
serve.set_traffic("delete_backend", {"delete:v1": 1.0})
|
||||
|
||||
def function2():
|
||||
return "olleh"
|
||||
|
||||
# Check that we can now reuse the previously delete backend's tag.
|
||||
serve.create_backend(function2, "delete:v1")
|
||||
serve.set_traffic("delete_backend", {"delete:v1": 1.0})
|
||||
|
||||
assert requests.get("http://127.0.0.1:8000/delete-backend").text == "olleh"
|
||||
|
||||
Reference in New Issue
Block a user