diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index e4a8fa7ce..b42cd7846 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -169,11 +169,11 @@ class Client: self._shutdown = True @_ensure_connected - def _get_result(self, result_object_id: ray.ObjectRef) -> bool: - result_id: UUID = ray.get(result_object_id) - result = ray.get(self._controller.wait_for_event.remote(result_id)) - logger.debug(f"Getting result_id ({result_id}) with result: {result}") - return result + def _wait_for_goal(self, result_object_id: ray.ObjectRef) -> bool: + goal_id: Optional[UUID] = ray.get(result_object_id) + if goal_id is not None: + ray.get(self._controller.wait_for_goal.remote(goal_id)) + logger.debug(f"Goal {goal_id} completed.") @_ensure_connected def create_endpoint(self, @@ -229,7 +229,7 @@ class Client: "an element of type {}".format(type(method))) upper_methods.append(method.upper()) - self._get_result( + self._wait_for_goal( self._controller.create_endpoint.remote( endpoint_name, {backend: 1.0}, route, upper_methods)) @@ -306,7 +306,7 @@ class Client: "config_options must be a BackendConfig or dictionary.") if isinstance(config_options, dict): config_options = BackendConfig.parse_obj(config_options) - self._get_result( + self._wait_for_goal( self._controller.update_backend_config.remote( backend_tag, config_options)) @@ -404,7 +404,7 @@ class Client: raise TypeError("config must be a BackendConfig or a dictionary.") backend_config._validate_complete() - self._get_result( + self._wait_for_goal( self._controller.create_backend.remote(backend_tag, backend_config, replica_config)) @@ -427,7 +427,7 @@ class Client: force (bool): Whether or not to force the deletion, without waiting for graceful shutdown. Default to false. """ - self._get_result( + self._wait_for_goal( self._controller.delete_backend.remote(backend_tag, force)) @_ensure_connected diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py index 20b70a9e0..9df9f8d03 100644 --- a/python/ray/serve/backend_state.py +++ b/python/ray/serve/backend_state.py @@ -1,17 +1,30 @@ import asyncio from asyncio.futures import Future from collections import defaultdict +import time from typing import Dict, Any, List, Optional, Set, Tuple +from uuid import uuid4 import ray import ray.cloudpickle as pickle from ray.actor import ActorHandle -from ray.serve.config import BackendConfig +from ray.serve.backend_worker import create_backend_replica +from ray.serve.common import ( + BackendInfo, + BackendTag, + Duration, + GoalId, + ReplicaTag, +) +from ray.serve.config import BackendConfig, ReplicaConfig +from ray.serve.constants import LongPollKey from ray.serve.exceptions import RayServeException +from ray.serve.kv_store import RayInternalKVStore +from ray.serve.long_poll import LongPollHost from ray.serve.utils import (format_actor_name, get_random_letters, logger, try_schedule_resources_on_nodes) -from ray.serve.common import (BackendInfo, BackendTag, Duration, GoalId, - ReplicaTag) + +CHECKPOINT_KEY = "serve-backend-state-checkpoint" # Feature flag for controller resource checking. If true, controller will # error if the desired replicas exceed current resource availability. @@ -25,12 +38,12 @@ class BackendState: called with a lock held. """ - def __init__(self, - controller_name: str, - detached: bool, - checkpoint: bytes = None): - self.controller_name = controller_name - self.detached = detached + def __init__(self, controller_name: str, detached: bool, + kv_store: RayInternalKVStore, long_poll_host: LongPollHost): + self._controller_name = controller_name + self._detached = detached + self._kv_store = kv_store + self._long_poll_host = long_poll_host # Non-checkpointed state. self.currently_starting_replicas: Dict[asyncio.Future, Tuple[ @@ -48,28 +61,50 @@ class BackendState: self.backend_replicas_to_stop: Dict[BackendTag, List[Tuple[ ReplicaTag, Duration]]] = defaultdict(list) self.backends_to_remove: List[BackendTag] = list() + self.pending_goals: Dict[GoalId, asyncio.Event] = dict() + checkpoint = self._kv_store.get(CHECKPOINT_KEY) if checkpoint is not None: (self.backends, self.backend_replicas, self.goals, self.backend_replicas_to_start, self.backend_replicas_to_stop, - self.backend_to_remove) = pickle.loads(checkpoint) + self.backend_to_remove, + pending_goal_ids) = pickle.loads(checkpoint) - # Fetch actor handles for all of the backend replicas in the system. - # All of these backend_replicas are guaranteed to already exist because - # they would not be written to a checkpoint in self.backend_replicas - # until they were created. - for backend_tag, replica_dict in self.backend_replicas.items(): - for replica_tag in replica_dict.keys(): - replica_name = format_actor_name(replica_tag, - self.controller_name) - self.backend_replicas[backend_tag][ - replica_tag] = ray.get_actor(replica_name) + for goal_id in pending_goal_ids: + self._create_goal(goal_id) - def checkpoint(self): - return pickle.dumps( - (self.backends, self.backend_replicas, self.goals, - self.backend_replicas_to_start, self.backend_replicas_to_stop, - self.backends_to_remove)) + # Fetch actor handles for all backend replicas in the system. + # All of these backend_replicas are guaranteed to already exist + # because they would not be written to a checkpoint in + # self.backend_replicas until they were created. + for backend_tag, replica_dict in self.backend_replicas.items(): + for replica_tag in replica_dict.keys(): + replica_name = format_actor_name(replica_tag, + self._controller_name) + self.backend_replicas[backend_tag][ + replica_tag] = ray.get_actor(replica_name) + + self._notify_backend_configs_changed() + self._notify_replica_handles_changed() + + def _checkpoint(self) -> None: + self._kv_store.put( + CHECKPOINT_KEY, + pickle.dumps( + (self.backends, self.backend_replicas, self.goals, + self.backend_replicas_to_start, self.backend_replicas_to_stop, + self.backends_to_remove, list(self.pending_goals.keys())))) + + def _notify_backend_configs_changed(self) -> None: + self._long_poll_host.notify_changed(LongPollKey.BACKEND_CONFIGS, + self.get_backend_configs()) + + def _notify_replica_handles_changed(self) -> None: + self._long_poll_host.notify_changed( + LongPollKey.REPLICA_HANDLES, { + backend_tag: list(replica_dict.values()) + for backend_tag, replica_dict in self.backend_replicas.items() + }) def get_backend_configs(self) -> Dict[BackendTag, BackendConfig]: return { @@ -84,35 +119,136 @@ class BackendState: def get_backend(self, backend_tag: BackendTag) -> Optional[BackendInfo]: return self.backends.get(backend_tag) + def num_pending_goals(self) -> int: + return len(self.pending_goals) + + async def wait_for_goal(self, goal_id: GoalId) -> None: + start = time.time() + if goal_id not in self.pending_goals: + logger.debug(f"Goal {goal_id} not found") + return True + event = self.pending_goals[goal_id] + await event.wait() + logger.debug( + f"Waiting for goal {goal_id} took {time.time() - start} seconds") + + def _complete_goal(self, goal_id: GoalId) -> None: + logger.debug(f"Completing goal {goal_id}") + event = self.pending_goals.pop(goal_id, None) + if event: + event.set() + + def _create_goal(self, goal_id: Optional[GoalId] = None) -> GoalId: + if goal_id is None: + goal_id = uuid4() + event = asyncio.Event() + self.pending_goals[goal_id] = event + return goal_id + def _set_backend_goal(self, backend_tag: BackendTag, - backend_info: Optional[BackendInfo], - goal_id: GoalId) -> Optional[GoalId]: + backend_info: BackendInfo) -> None: existing_goal = self.goals.get(backend_tag) - self.backends[backend_tag] = backend_info - if not backend_info: + new_goal = self._create_goal() + + if backend_info is not None: + self.backends[backend_tag] = backend_info + + self.goals[backend_tag] = new_goal + + return new_goal, existing_goal + + def create_backend(self, backend_tag: BackendTag, + backend_config: BackendConfig, + replica_config: ReplicaConfig) -> Optional[GoalId]: + # Ensures this method is idempotent. + backend_info = self.backends.get(backend_tag) + if backend_info is not None: + if (backend_info.backend_config == backend_config + and backend_info.replica_config == replica_config): + return None + + backend_replica = create_backend_replica(replica_config.func_or_class) + + # Save creator that starts replicas, the arguments to be passed in, + # and the configuration for the backends. + backend_info = BackendInfo( + worker_class=backend_replica, + backend_config=backend_config, + replica_config=replica_config) + + new_goal, existing_goal = self._set_backend_goal( + backend_tag, backend_info) + + try: + self.scale_backend_replicas(backend_tag, + backend_config.num_replicas) + except RayServeException as e: del self.backends[backend_tag] - self.goals[backend_tag] = goal_id - return existing_goal + raise e - def completed_goals(self) -> List[GoalId]: - completed_goals = [] - all_tags = set(self.backend_replicas.keys()).union( - set(self.backends.keys())) + # NOTE(edoakes): we must write a checkpoint before starting new + # or pushing the updated config to avoid inconsistent state if we + # crash while making the change. + self._checkpoint() + self._notify_backend_configs_changed() - for backend_tag in all_tags: - desired_info = self.backends.get(backend_tag) - existing_info = self.backend_replicas.get(backend_tag) - # Check for deleting - if (not desired_info or - desired_info.backend_config.num_replicas == 0) and \ - (not existing_info or len(existing_info) == 0): - completed_goals.append(self.goals[backend_tag]) + if existing_goal is not None: + self._complete_goal(existing_goal) + return new_goal - # Check for a non-zero number of backends - if desired_info and existing_info and desired_info.backend_config.\ - num_replicas == len(existing_info): - completed_goals.append(self.goals[backend_tag]) - return completed_goals + def delete_backend(self, backend_tag: BackendTag, + force_kill: bool = False) -> Optional[GoalId]: + # This method must be idempotent. We should validate that the + # specified backend exists on the client. + if backend_tag not in self.backends: + return None + + # Scale its replicas down to 0. + self.scale_backend_replicas(backend_tag, 0, force_kill) + + # Remove the backend's metadata. + del self.backends[backend_tag] + + # Add the intention to remove the backend from the routers. + self.backends_to_remove.append(backend_tag) + + new_goal, existing_goal = self._set_backend_goal(backend_tag, None) + + self._checkpoint() + if existing_goal is not None: + self._complete_goal(existing_goal) + return new_goal + + def update_backend_config(self, backend_tag: BackendTag, + config_options: BackendConfig): + if backend_tag not in self.backends: + raise ValueError(f"Backend {backend_tag} is not registered") + + stored_backend_config = self.backends[backend_tag].backend_config + updated_config = stored_backend_config.copy( + update=config_options.dict(exclude_unset=True)) + updated_config._validate_complete() + self.backends[backend_tag].backend_config = updated_config + + new_goal, existing_goal = self._set_backend_goal( + backend_tag, self.backends[backend_tag]) + + # Scale the replicas with the new configuration. + self.scale_backend_replicas(backend_tag, updated_config.num_replicas) + + # NOTE(edoakes): we must write a checkpoint before pushing the + # update to avoid inconsistent state if we crash after pushing the + # update. + self._checkpoint() + if existing_goal is not None: + self._complete_goal(existing_goal) + + # Inform the routers and backend replicas about config changes. + # TODO(edoakes): this should only happen if we change something other + # than num_replicas. + self._notify_backend_configs_changed() + + return new_goal def _start_backend_replica(self, backend_tag: BackendTag, replica_tag: ReplicaTag) -> ActorHandle: @@ -125,7 +261,7 @@ class BackendState: # NOTE(edoakes): the replicas may already be created if we # failed after creating them but before writing a # checkpoint. - replica_name = format_actor_name(replica_tag, self.controller_name) + replica_name = format_actor_name(replica_tag, self._controller_name) try: replica_handle = ray.get_actor(replica_name) except ValueError: @@ -135,13 +271,13 @@ class BackendState: replica_handle = ray.remote(backend_info.worker_class).options( name=replica_name, - lifetime="detached" if self.detached else None, + lifetime="detached" if self._detached else None, max_restarts=-1, max_task_retries=-1, **backend_info.replica_config.ray_actor_options).remote( backend_tag, replica_tag, backend_info.replica_config.actor_init_args, - backend_info.backend_config, self.controller_name) + backend_info.backend_config, self._controller_name) return replica_handle @@ -228,7 +364,7 @@ class BackendState: self.backend_replicas_to_stop.items()): for replica_tag, shutdown_timeout in replicas_to_stop: replica_name = format_actor_name(replica_tag, - self.controller_name) + self._controller_name) async def kill_actor(replica_name_to_use): # NOTE: the replicas may already be stopped if we failed @@ -280,9 +416,9 @@ class BackendState: in_flight: Set[Future[Any]] = set() if self.currently_stopping_replicas: - done_stoppping, in_flight = await asyncio.wait( + done_stopping, in_flight = await asyncio.wait( list(self.currently_stopping_replicas.keys()), timeout=0) - for fut in done_stoppping: + for fut in done_stopping: (backend_tag, replica_tag) = self.currently_stopping_replicas.pop(fut) @@ -307,8 +443,30 @@ class BackendState: if len(self.backend_replicas[backend_tag]) == 0: del self.backend_replicas[backend_tag] + def _completed_goals(self) -> List[GoalId]: + completed_goals = [] + all_tags = set(self.backend_replicas.keys()).union( + set(self.backends.keys())) + + for backend_tag in all_tags: + desired_info = self.backends.get(backend_tag) + existing_info = self.backend_replicas.get(backend_tag) + # Check for deleting + if (not desired_info or + desired_info.backend_config.num_replicas == 0) and \ + (not existing_info or len(existing_info) == 0): + completed_goals.append(self.goals[backend_tag]) + + # Check for a non-zero number of backends + if desired_info and existing_info and desired_info.backend_config.\ + num_replicas == len(existing_info): + completed_goals.append(self.goals[backend_tag]) + return completed_goals + async def update(self) -> bool: - """Returns whether the number of backends has changed.""" + for goal_id in self._completed_goals(): + self._complete_goal(goal_id) + self._start_pending_replicas() self._stop_pending_replicas() @@ -318,5 +476,7 @@ class BackendState: await self._check_currently_starting_replicas() await self._check_currently_stopping_replicas() - return (len(self.currently_starting_replicas) != num_starting) or \ - (len(self.currently_stopping_replicas) != num_stopping) + if (len(self.currently_starting_replicas) != num_starting) or \ + (len(self.currently_stopping_replicas) != num_stopping): + self._checkpoint() + self._notify_replica_handles_changed() diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 93c88681c..e6d0aa82a 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -1,20 +1,11 @@ import asyncio -import os -import random -import time from collections import defaultdict -from dataclasses import dataclass from typing import Dict, Any, List, Optional -from uuid import uuid4, UUID import ray -import ray.cloudpickle as pickle from ray.actor import ActorHandle from ray.serve.backend_state import BackendState -from ray.serve.backend_worker import create_backend_replica -from ray.serve.constants import LongPollKey from ray.serve.common import ( - BackendInfo, BackendTag, EndpointTag, GoalId, @@ -24,7 +15,6 @@ from ray.serve.common import ( ) from ray.serve.config import BackendConfig, HTTPOptions, ReplicaConfig from ray.serve.endpoint_state import EndpointState -from ray.serve.exceptions import RayServeException from ray.serve.http_state import HTTPState from ray.serve.kv_store import RayInternalKVStore from ray.serve.long_poll import LongPollHost @@ -39,19 +29,6 @@ CHECKPOINT_KEY = "serve-controller-checkpoint" CONTROL_LOOP_PERIOD_S = 1.0 -@dataclass -class FutureResult: - # Goal requested when this future was created - requested_goal: Dict[str, Any] - - -@dataclass -class Checkpoint: - backend_state_checkpoint: bytes - # TODO(ilr) Rename reconciler to PendingState - inflight_reqs: Dict[uuid4, FutureResult] - - @ray.remote class ServeController: """Responsible for managing the state of the serving system. @@ -92,11 +69,6 @@ class ServeController: # at any given time. self.write_lock = asyncio.Lock() - # Map of awaiting results - # TODO(ilr): Checkpoint this once this becomes asynchronous - self.inflight_results: Dict[UUID, asyncio.Event] = dict() - self._serializable_inflight_results: Dict[UUID, FutureResult] = dict() - # NOTE(simon): Currently we do all-to-all broadcast. This means # any listeners will receive notification for all changes. This # can be problem at scale, e.g. updating a single backend config @@ -106,70 +78,16 @@ class ServeController: self.http_state = HTTPState(controller_name, detached, http_config) self.endpoint_state = EndpointState(self.kv_store, self.long_poll_host) - - checkpoint_bytes = self.kv_store.get(CHECKPOINT_KEY) - if checkpoint_bytes is None: - logger.debug("No checkpoint found") - self.backend_state = BackendState(controller_name, detached) - else: - checkpoint: Checkpoint = pickle.loads(checkpoint_bytes) - self.backend_state = BackendState( - controller_name, - detached, - checkpoint=checkpoint.backend_state_checkpoint) - - self._serializable_inflight_results = checkpoint.inflight_reqs - for uuid, fut_result in self._serializable_inflight_results.items( - ): - self._create_event_with_result(fut_result.requested_goal, uuid) - - self.notify_backend_configs_changed() - self.notify_replica_handles_changed() + self.backend_state = BackendState(controller_name, detached, + self.kv_store, self.long_poll_host) asyncio.get_event_loop().create_task(self.run_control_loop()) - async def wait_for_event(self, uuid: UUID) -> bool: - start = time.time() - if uuid not in self.inflight_results: - logger.debug(f"UUID ({uuid}) not found!!!") - return True - event = self.inflight_results[uuid] - await event.wait() - self.inflight_results.pop(uuid) - self._serializable_inflight_results.pop(uuid) - async with self.write_lock: - self._checkpoint() - logger.debug(f"Waiting for {uuid} took {time.time() - start} seconds") + async def wait_for_goal(self, goal_id: GoalId) -> None: + await self.backend_state.wait_for_goal(goal_id) - return True - - def _create_event_with_result( - self, - goal_state: Dict[str, any], - recreation_uuid: Optional[UUID] = None) -> UUID: - # NOTE(ilr) Must be called before checkpointing! - event = asyncio.Event() - event.result = FutureResult(goal_state) - uuid_val = recreation_uuid or uuid4() - self.inflight_results[uuid_val] = event - self._serializable_inflight_results[uuid_val] = event.result - return uuid_val - - async def _num_inflight_results(self) -> int: - return len(self.inflight_results) - - def notify_replica_handles_changed(self): - self.long_poll_host.notify_changed( - LongPollKey.REPLICA_HANDLES, { - backend_tag: list(replica_dict.values()) - for backend_tag, replica_dict in - self.backend_state.backend_replicas.items() - }) - - def notify_backend_configs_changed(self): - self.long_poll_host.notify_changed( - LongPollKey.BACKEND_CONFIGS, - self.backend_state.get_backend_configs()) + async def _num_pending_goals(self) -> int: + return self.backend_state.num_pending_goals() async def listen_for_change(self, keys_to_snapshot_ids: Dict[str, int]): """Proxy long pull client's listen request. @@ -186,45 +104,11 @@ class ServeController: """Returns a dictionary of node ID to http_proxy actor handles.""" return self.http_state.get_http_proxy_handles() - def _checkpoint(self) -> None: - """Checkpoint internal state and write it to the KV store.""" - assert self.write_lock.locked() - logger.debug("Writing checkpoint") - start = time.time() - - checkpoint = pickle.dumps( - Checkpoint(self.backend_state.checkpoint(), - self._serializable_inflight_results)) - - self.kv_store.put(CHECKPOINT_KEY, checkpoint) - logger.debug("Wrote checkpoint in {:.3f}s".format(time.time() - start)) - - if random.random( - ) < _CRASH_AFTER_CHECKPOINT_PROBABILITY and self.detached: - logger.warning("Intentionally crashing after checkpoint") - os._exit(0) - - async def reconcile_current_and_goal_backends(self): - pass - - def set_goal_id(self, goal_id: UUID) -> None: - event = self.inflight_results.get(goal_id) - logger.debug(f"Setting goal id {goal_id}") - if event: - event.set() - async def run_control_loop(self) -> None: while True: async with self.write_lock: self.http_state.update() - - completed_ids = self.backend_state.completed_goals() - for done_id in completed_ids: - self.set_goal_id(done_id) - delta_workers = await self.backend_state.update() - if delta_workers: - self.notify_replica_handles_changed() - self._checkpoint() + await self.backend_state.update() await asyncio.sleep(CONTROL_LOOP_PERIOD_S) @@ -241,17 +125,6 @@ class ServeController: """Returns a dictionary of backend tag to backend config.""" return self.endpoint_state.get_endpoints() - def _set_traffic(self, endpoint_name: str, - traffic_dict: Dict[str, float]) -> UUID: - for backend in traffic_dict: - if self.backend_state.get_backend(backend) is None: - raise ValueError( - "Attempted to assign traffic to a backend '{}' that " - "is not registered.".format(backend)) - - self.endpoint_state.set_traffic_policy(endpoint_name, - TrafficPolicy(traffic_dict)) - def _validate_traffic_dict(self, traffic_dict: Dict[str, float]): for backend in traffic_dict: if self.backend_state.get_backend(backend) is None: @@ -259,15 +132,20 @@ class ServeController: "Attempted to assign traffic to a backend '{}' that " "is not registered.".format(backend)) - async def set_traffic(self, endpoint_name: str, + async def set_traffic(self, endpoint: str, traffic_dict: Dict[str, float]) -> None: """Sets the traffic policy for the specified endpoint.""" async with self.write_lock: self._validate_traffic_dict(traffic_dict) - self._set_traffic(endpoint_name, traffic_dict) + + logger.info("Setting traffic for endpoint " + f"'{endpoint}' to '{traffic_dict}'.") + + self.endpoint_state.set_traffic_policy(endpoint, + TrafficPolicy(traffic_dict)) async def shadow_traffic(self, endpoint_name: str, backend_tag: BackendTag, - proportion: float) -> UUID: + proportion: float) -> None: """Shadow traffic from the endpoint to the backend.""" async with self.write_lock: if self.backend_state.get_backend(backend_tag) is None: @@ -285,7 +163,7 @@ class ServeController: # TODO(architkulkarni): add Optional for route after cloudpickle upgrade async def create_endpoint(self, endpoint: str, traffic_dict: Dict[str, float], route, - methods: List[str]) -> UUID: + methods: List[str]) -> None: """Create a new endpoint with the specified route and methods. If the route is None, this is a "headless" endpoint that will not @@ -310,67 +188,18 @@ class ServeController: async with self.write_lock: self.endpoint_state.delete_endpoint(endpoint) - async def set_backend_goal(self, backend_tag: BackendTag, - backend_info: BackendInfo, - new_id: GoalId) -> None: - # NOTE(ilr) Must checkpoint after doing this! - existing_id_to_set = self.backend_state._set_backend_goal( - backend_tag, backend_info, new_id) - if existing_id_to_set: - self.set_goal_id(existing_id_to_set) - - async def create_backend(self, backend_tag: BackendTag, - backend_config: BackendConfig, - replica_config: ReplicaConfig) -> UUID: + async def create_backend( + self, backend_tag: BackendTag, backend_config: BackendConfig, + replica_config: ReplicaConfig) -> Optional[GoalId]: """Register a new backend under the specified tag.""" async with self.write_lock: - # Ensures this method is idempotent. - backend_info = self.backend_state.get_backend(backend_tag) - if backend_info is not None: - if (backend_info.backend_config == backend_config - and backend_info.replica_config == replica_config): - return - - backend_replica = create_backend_replica( - replica_config.func_or_class) - - # Save creator that starts replicas, the arguments to be passed in, - # and the configuration for the backends. - backend_info = BackendInfo( - worker_class=backend_replica, - backend_config=backend_config, - replica_config=replica_config) - - return_uuid = self._create_event_with_result({ - backend_tag: backend_info - }) - - await self.set_backend_goal(backend_tag, backend_info, return_uuid) - - try: - # This call should be to run control loop - self.backend_state.scale_backend_replicas( - backend_tag, backend_config.num_replicas) - except RayServeException as e: - del self.backend_state.backends[backend_tag] - raise e - - # NOTE(edoakes): we must write a checkpoint before starting new - # or pushing the updated config to avoid inconsistent state if we - # crash while making the change. - self._checkpoint() - self.notify_backend_configs_changed() - return return_uuid + return self.backend_state.create_backend( + backend_tag, backend_config, replica_config) async def delete_backend(self, backend_tag: BackendTag, - force_kill: bool = False) -> UUID: + force_kill: bool = False) -> Optional[GoalId]: async with self.write_lock: - # This method must be idempotent. We should validate that the - # specified backend exists on the client. - if self.backend_state.get_backend(backend_tag) is None: - return - # Check that the specified backend isn't used by any endpoints. for endpoint, info in self.endpoint_state.get_endpoints().items(): if (backend_tag in info["traffic"] @@ -379,68 +208,19 @@ class ServeController: "and cannot be deleted. Please remove " "the backend from all endpoints and try " "again.".format(backend_tag, endpoint)) - - # Scale its replicas down to 0. - self.backend_state.scale_backend_replicas(backend_tag, 0, - force_kill) - - # Remove the backend's metadata. - del self.backend_state.backends[backend_tag] - - # Add the intention to remove the backend from the routers. - self.backend_state.backends_to_remove.append(backend_tag) - - return_uuid = self._create_event_with_result({backend_tag: None}) - # Remove the backend's metadata. - await self.set_backend_goal(backend_tag, None, return_uuid) - # NOTE(edoakes): we must write a checkpoint before removing the - # backend from the routers to avoid inconsistent state if we crash - # after pushing the update. - self._checkpoint() - return return_uuid + return self.backend_state.delete_backend(backend_tag, force_kill) async def update_backend_config(self, backend_tag: BackendTag, - config_options: BackendConfig) -> UUID: + config_options: BackendConfig) -> GoalId: """Set the config for the specified backend.""" async with self.write_lock: - assert (self.backend_state.get_backend(backend_tag) - ), "Backend {} is not registered.".format(backend_tag) - assert isinstance(config_options, BackendConfig) - - stored_backend_config = self.backend_state.get_backend( - backend_tag).backend_config - backend_config = stored_backend_config.copy( - update=config_options.dict(exclude_unset=True)) - backend_config._validate_complete() - self.backend_state.get_backend( - backend_tag).backend_config = backend_config - backend_info = self.backend_state.get_backend(backend_tag) - - return_uuid = self._create_event_with_result({ - backend_tag: backend_info - }) - await self.set_backend_goal(backend_tag, backend_info, return_uuid) - - # Scale the replicas with the new configuration. - - # This should be to run the control loop - self.backend_state.scale_backend_replicas( - backend_tag, backend_config.num_replicas) - - # NOTE(edoakes): we must write a checkpoint before pushing the - # update to avoid inconsistent state if we crash after pushing the - # update. - self._checkpoint() - - # Inform the routers and backend replicas about config changes. - self.notify_backend_configs_changed() - - return return_uuid + return self.backend_state.update_backend_config( + backend_tag, config_options) def get_backend_config(self, backend_tag: BackendTag) -> BackendConfig: """Get the current config for the specified backend.""" - assert (self.backend_state.get_backend(backend_tag) - ), "Backend {} is not registered.".format(backend_tag) + if self.backend_state.get_backend(backend_tag) is None: + raise ValueError(f"Backend {backend_tag} is not registered.") return self.backend_state.get_backend(backend_tag).backend_config def get_http_config(self): diff --git a/python/ray/serve/tests/test_controller.py b/python/ray/serve/tests/test_controller.py index 1900d39dc..37019f26d 100644 --- a/python/ray/serve/tests/test_controller.py +++ b/python/ray/serve/tests/test_controller.py @@ -6,7 +6,7 @@ import ray def test_controller_inflight_requests_clear(serve_instance): client = serve_instance initial_number_reqs = ray.get( - client._controller._num_inflight_results.remote()) + client._controller._num_pending_goals.remote()) def function(_): return "hello" @@ -14,7 +14,7 @@ def test_controller_inflight_requests_clear(serve_instance): client.create_backend("tst", function) client.create_endpoint("end_pt", backend="tst") - assert ray.get(client._controller._num_inflight_results.remote() + assert ray.get(client._controller._num_pending_goals.remote() ) - initial_number_reqs == 0 diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index 2388cf4a5..7ecba4d51 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -226,7 +226,7 @@ def test_create_backend_idempotent(serve_instance): for i in range(10): ray.get( - controller.wait_for_event.remote( + controller.wait_for_goal.remote( controller.create_backend.remote("my_backend", backend_config, replica_config))) @@ -249,7 +249,7 @@ def test_create_endpoint_idempotent(serve_instance): for i in range(10): ray.get( - controller.wait_for_event.remote( + controller.wait_for_goal.remote( controller.create_endpoint.remote( "my_endpoint", {"my_backend": 1.0}, "/my_route", ["GET"])))