diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py new file mode 100644 index 000000000..079c65fec --- /dev/null +++ b/python/ray/serve/backend_state.py @@ -0,0 +1,316 @@ +import asyncio +from asyncio.futures import Future +from collections import defaultdict +from typing import Dict, Any, List, Optional, Set, Tuple + +import ray +import ray.cloudpickle as pickle +from ray.actor import ActorHandle +from ray.serve.config import BackendConfig +from ray.serve.exceptions import RayServeException +from ray.serve.utils import (format_actor_name, get_random_letters, logger, + try_schedule_resources_on_nodes) +from ray.serve.common import (BackendInfo, BackendTag, Duration, GoalId, + ReplicaTag) + +# Feature flag for controller resource checking. If true, controller will +# error if the desired replicas exceed current resource availability. +_RESOURCE_CHECK_ENABLED = True + + +class BackendState: + def __init__(self, + controller_name: str, + detached: bool, + checkpoint: bytes = None): + self.controller_name = controller_name + self.detached = detached + + # Non-checkpointed state. + self.currently_starting_replicas: Dict[asyncio.Future, Tuple[ + BackendTag, ReplicaTag, ActorHandle]] = dict() + self.currently_stopping_replicas: Dict[asyncio.Future, Tuple[ + BackendTag, ReplicaTag]] = dict() + + # Checkpointed state. + self.backends: Dict[BackendTag, BackendInfo] = dict() + self.backend_replicas: Dict[BackendTag, Dict[ + ReplicaTag, ActorHandle]] = defaultdict(dict) + self.goals: Dict[BackendTag, GoalId] = dict() + self.backend_replicas_to_start: Dict[BackendTag, List[ + ReplicaTag]] = defaultdict(list) + self.backend_replicas_to_stop: Dict[BackendTag, List[Tuple[ + ReplicaTag, Duration]]] = defaultdict(list) + self.backends_to_remove: List[BackendTag] = list() + + if checkpoint is not None: + (self.backends, self.backend_replicas, self.goals, + self.backend_replicas_to_start, self.backend_replicas_to_stop, + self.backend_to_remove) = pickle.loads(checkpoint) + + # Fetch actor handles for all of the backend replicas in the system. + # All of these backend_replicas are guaranteed to already exist because + # they would not be written to a checkpoint in self.backend_replicas + # until they were created. + for backend_tag, replica_dict in self.backend_replicas.items(): + for replica_tag in replica_dict.keys(): + replica_name = format_actor_name(replica_tag, + self.controller_name) + self.backend_replicas[backend_tag][ + replica_tag] = ray.get_actor(replica_name) + + def checkpoint(self): + return pickle.dumps( + (self.backends, self.backend_replicas, self.goals, + self.backend_replicas_to_start, self.backend_replicas_to_stop, + self.backends_to_remove)) + + def get_backend_configs(self) -> Dict[BackendTag, BackendConfig]: + return { + tag: info.backend_config + for tag, info in self.backends.items() + } + + def get_replica_handles( + self) -> Dict[BackendTag, Dict[ReplicaTag, ActorHandle]]: + return self.backend_replicas + + def get_backend(self, backend_tag: BackendTag) -> Optional[BackendInfo]: + return self.backends.get(backend_tag) + + def _set_backend_goal(self, backend_tag: BackendTag, + backend_info: Optional[BackendInfo], + goal_id: GoalId) -> Optional[GoalId]: + existing_goal = self.goals.get(backend_tag) + self.backends[backend_tag] = backend_info + if not backend_info: + del self.backends[backend_tag] + self.goals[backend_tag] = goal_id + return existing_goal + + def completed_goals(self) -> List[GoalId]: + completed_goals = [] + all_tags = set(self.backend_replicas.keys()).union( + set(self.backends.keys())) + + for backend_tag in all_tags: + desired_info = self.backends.get(backend_tag) + existing_info = self.backend_replicas.get(backend_tag) + # Check for deleting + if (not desired_info or + desired_info.backend_config.num_replicas == 0) and \ + (not existing_info or len(existing_info) == 0): + completed_goals.append(self.goals[backend_tag]) + + # Check for a non-zero number of backends + if desired_info and existing_info and desired_info.backend_config.\ + num_replicas == len(existing_info): + completed_goals.append(self.goals[backend_tag]) + return completed_goals + + def _start_backend_replica(self, backend_tag: BackendTag, + replica_tag: ReplicaTag) -> ActorHandle: + """Start a replica and return its actor handle. + + Checks if the named actor already exists before starting a new one. + + Assumes that the backend configuration is already in the Goal State. + """ + # NOTE(edoakes): the replicas may already be created if we + # failed after creating them but before writing a + # checkpoint. + replica_name = format_actor_name(replica_tag, self.controller_name) + try: + replica_handle = ray.get_actor(replica_name) + except ValueError: + logger.debug("Starting replica '{}' for backend '{}'.".format( + replica_tag, backend_tag)) + backend_info = self.get_backend(backend_tag) + + replica_handle = ray.remote(backend_info.worker_class).options( + name=replica_name, + lifetime="detached" if self.detached else None, + max_restarts=-1, + max_task_retries=-1, + **backend_info.replica_config.ray_actor_options).remote( + backend_tag, replica_tag, + backend_info.replica_config.actor_init_args, + backend_info.backend_config, self.controller_name) + + return replica_handle + + def scale_backend_replicas( + self, + backend_tag: BackendTag, + num_replicas: int, + force_kill: bool = False, + ) -> None: + """Scale the given backend to the number of replicas. + + NOTE: this does not actually start or stop the replicas, but instead + adds the intention to start/stop them to self.backend_replicas_to_start + and self.backend_replicas_to_stop. The caller is responsible for then + first writing a checkpoint and then actually starting/stopping the + intended replicas. This avoids inconsistencies with starting/stopping a + replica and then crashing before writing a checkpoint. + """ + + logger.debug("Scaling backend '{}' to {} replicas".format( + backend_tag, num_replicas)) + assert (backend_tag in self.backends + ), "Backend {} is not registered.".format(backend_tag) + assert num_replicas >= 0, ("Number of replicas must be" + " greater than or equal to 0.") + + current_num_replicas = len(self.backend_replicas[backend_tag]) + delta_num_replicas = num_replicas - current_num_replicas + + backend_info: BackendInfo = self.backends[backend_tag] + if delta_num_replicas > 0: + can_schedule = try_schedule_resources_on_nodes(requirements=[ + backend_info.replica_config.resource_dict + for _ in range(delta_num_replicas) + ]) + + if _RESOURCE_CHECK_ENABLED and not all(can_schedule): + num_possible = sum(can_schedule) + raise RayServeException( + "Cannot scale backend {} to {} replicas. Ray Serve tried " + "to add {} replicas but the resources only allows {} " + "to be added. To fix this, consider scaling to replica to " + "{} or add more resources to the cluster. You can check " + "avaiable resources with ray.nodes().".format( + backend_tag, num_replicas, delta_num_replicas, + num_possible, current_num_replicas + num_possible)) + + logger.debug("Adding {} replicas to backend {}".format( + delta_num_replicas, backend_tag)) + for _ in range(delta_num_replicas): + replica_tag = "{}#{}".format(backend_tag, get_random_letters()) + self.backend_replicas_to_start[backend_tag].append(replica_tag) + + elif delta_num_replicas < 0: + logger.debug("Removing {} replicas from backend '{}'".format( + -delta_num_replicas, backend_tag)) + assert len( + self.backend_replicas[backend_tag]) >= delta_num_replicas + replicas_copy = self.backend_replicas.copy() + for _ in range(-delta_num_replicas): + replica_tag, _ = replicas_copy[backend_tag].popitem() + + graceful_timeout_s = (backend_info.backend_config. + experimental_graceful_shutdown_timeout_s) + if force_kill: + graceful_timeout_s = 0 + self.backend_replicas_to_stop[backend_tag].append(( + replica_tag, + graceful_timeout_s, + )) + + def _start_pending_replicas(self): + for backend_tag, replicas_to_create in self.backend_replicas_to_start.\ + items(): + for replica_tag in replicas_to_create: + replica_handle = self._start_backend_replica( + backend_tag, replica_tag) + ready_future = replica_handle.ready.remote().as_future() + self.currently_starting_replicas[ready_future] = ( + backend_tag, replica_tag, replica_handle) + + def _stop_pending_replicas(self): + for backend_tag, replicas_to_stop in ( + self.backend_replicas_to_stop.items()): + for replica_tag, shutdown_timeout in replicas_to_stop: + replica_name = format_actor_name(replica_tag, + self.controller_name) + + async def kill_actor(replica_name_to_use): + # NOTE: the replicas may already be stopped if we failed + # after stopping them but before writing a checkpoint. + try: + replica = ray.get_actor(replica_name_to_use) + except ValueError: + return + + try: + await asyncio.wait_for( + replica.drain_pending_queries.remote(), + timeout=shutdown_timeout) + except asyncio.TimeoutError: + # Graceful period passed, kill it forcefully. + logger.debug( + f"{replica_name_to_use} did not shutdown after " + f"{shutdown_timeout}s, killing.") + finally: + ray.kill(replica, no_restart=True) + + self.currently_stopping_replicas[asyncio.ensure_future( + kill_actor(replica_name))] = (backend_tag, replica_tag) + + async def _check_currently_starting_replicas(self) -> int: + """Returns the number of pending replicas waiting to start""" + in_flight: Set[Future[Any]] = set() + + if self.currently_starting_replicas: + done, in_flight = await asyncio.wait( + list(self.currently_starting_replicas.keys()), timeout=0) + for fut in done: + (backend_tag, replica_tag, + replica_handle) = self.currently_starting_replicas.pop(fut) + self.backend_replicas[backend_tag][ + replica_tag] = replica_handle + + backend = self.backend_replicas_to_start.get(backend_tag) + if backend: + try: + backend.remove(replica_tag) + except ValueError: + pass + if len(backend) == 0: + del self.backend_replicas_to_start[backend_tag] + + async def _check_currently_stopping_replicas(self) -> int: + """Returns the number of replicas waiting to stop""" + in_flight: Set[Future[Any]] = set() + + if self.currently_stopping_replicas: + done_stoppping, in_flight = await asyncio.wait( + list(self.currently_stopping_replicas.keys()), timeout=0) + for fut in done_stoppping: + (backend_tag, + replica_tag) = self.currently_stopping_replicas.pop(fut) + + backend_to_stop = self.backend_replicas_to_stop.get( + backend_tag) + + if backend_to_stop: + try: + backend_to_stop.remove(replica_tag) + except ValueError: + pass + if len(backend_to_stop) == 0: + del self.backend_replicas_to_stop[backend_tag] + + backend = self.backend_replicas.get(backend_tag) + if backend: + try: + del backend[replica_tag] + except KeyError: + pass + + if len(self.backend_replicas[backend_tag]) == 0: + del self.backend_replicas[backend_tag] + + async def update(self) -> bool: + """Returns whether the number of backends has changed.""" + self._start_pending_replicas() + self._stop_pending_replicas() + + num_starting = len(self.currently_starting_replicas) + num_stopping = len(self.currently_stopping_replicas) + + await self._check_currently_starting_replicas() + await self._check_currently_stopping_replicas() + + return (len(self.currently_starting_replicas) != num_starting) or \ + (len(self.currently_stopping_replicas) != num_stopping) diff --git a/python/ray/serve/common.py b/python/ray/serve/common.py new file mode 100644 index 000000000..b951d45be --- /dev/null +++ b/python/ray/serve/common.py @@ -0,0 +1,59 @@ +from pydantic import BaseModel +from typing import Dict, Any +from uuid import UUID + +import numpy as np + +from ray.serve.config import BackendConfig, ReplicaConfig + +BackendTag = str +EndpointTag = str +ReplicaTag = str +NodeId = str +GoalId = UUID +Duration = float + + +class BackendInfo(BaseModel): + # TODO(architkulkarni): Add type hint for worker_class after upgrading + # cloudpickle and adding types to RayServeWrappedReplica + worker_class: Any + backend_config: BackendConfig + replica_config: ReplicaConfig + + class Config: + # TODO(architkulkarni): Remove once ReplicaConfig is a pydantic + # model + arbitrary_types_allowed = True + + +class TrafficPolicy: + def __init__(self, traffic_dict: Dict[str, float]) -> None: + self.traffic_dict: Dict[str, float] = dict() + self.shadow_dict: Dict[str, float] = dict() + self.set_traffic_dict(traffic_dict) + + def set_traffic_dict(self, traffic_dict: Dict[str, float]) -> None: + prob = 0 + for backend, weight in traffic_dict.items(): + if weight < 0: + raise ValueError( + "Attempted to assign a weight of {} to backend '{}'. " + "Weights cannot be negative.".format(weight, backend)) + prob += weight + + # These weights will later be plugged into np.random.choice, which + # uses a tolerance of 1e-8. + if not np.isclose(prob, 1, atol=1e-8): + raise ValueError("Traffic dictionary weights must sum to 1, " + "currently they sum to {}".format(prob)) + self.traffic_dict = traffic_dict + + def set_shadow(self, backend: str, proportion: float): + if proportion == 0 and backend in self.shadow_dict: + del self.shadow_dict[backend] + else: + self.shadow_dict[backend] = proportion + + def __repr__(self) -> str: + return f"" diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index b73cb5667..e523221a2 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -1,500 +1,46 @@ import asyncio -from asyncio.futures import Future from collections import defaultdict import os import random import time from dataclasses import dataclass -from typing import Dict, Any, List, Optional, Set, Tuple +from typing import Dict, Any, Optional from uuid import uuid4, UUID -from pydantic import BaseModel import ray import ray.cloudpickle as pickle from ray.serve.backend_worker import create_backend_replica from ray.serve.constants import ( - ASYNC_CONCURRENCY, - SERVE_PROXY_NAME, - LongPollKey, -) -from ray.serve.http_proxy import HTTPProxyActor + LongPollKey, ) from ray.serve.kv_store import RayInternalKVStore from ray.serve.exceptions import RayServeException -from ray.serve.utils import (format_actor_name, get_random_letters, logger, - try_schedule_resources_on_nodes, get_all_node_ids) +from ray.serve.utils import logger from ray.serve.config import BackendConfig, ReplicaConfig, HTTPConfig from ray.serve.long_poll import LongPollHost +from ray.serve.backend_state import BackendState +from ray.serve.endpoint_state import EndpointState +from ray.serve.http_state import HTTPState +from ray.serve.common import ( + BackendInfo, + BackendTag, + EndpointTag, + GoalId, + ReplicaTag, + NodeId, + TrafficPolicy, +) from ray.actor import ActorHandle -import numpy as np - # Used for testing purposes only. If this is set, the controller will crash # after writing each checkpoint with the specified probability. _CRASH_AFTER_CHECKPOINT_PROBABILITY = 0 CHECKPOINT_KEY = "serve-controller-checkpoint" -# Feature flag for controller resource checking. If true, controller will -# error if the desired replicas exceed current resource availability. -_RESOURCE_CHECK_ENABLED = True - # How often to call the control loop on the controller. CONTROL_LOOP_PERIOD_S = 1.0 REPLICA_STARTUP_TIME_WARNING_S = 5 -# TypeDefs -BackendTag = str -EndpointTag = str -ReplicaTag = str -NodeId = str -GoalId = UUID -Duration = float - - -class TrafficPolicy: - def __init__(self, traffic_dict: Dict[str, float]) -> None: - self.traffic_dict: Dict[str, float] = dict() - self.shadow_dict: Dict[str, float] = dict() - self.set_traffic_dict(traffic_dict) - - def set_traffic_dict(self, traffic_dict: Dict[str, float]) -> None: - prob = 0 - for backend, weight in traffic_dict.items(): - if weight < 0: - raise ValueError( - "Attempted to assign a weight of {} to backend '{}'. " - "Weights cannot be negative.".format(weight, backend)) - prob += weight - - # These weights will later be plugged into np.random.choice, which - # uses a tolerance of 1e-8. - if not np.isclose(prob, 1, atol=1e-8): - raise ValueError("Traffic dictionary weights must sum to 1, " - "currently they sum to {}".format(prob)) - self.traffic_dict = traffic_dict - - def set_shadow(self, backend: str, proportion: float): - if proportion == 0 and backend in self.shadow_dict: - del self.shadow_dict[backend] - else: - self.shadow_dict[backend] = proportion - - def __repr__(self) -> str: - return f"" - - -class HTTPState: - def __init__(self, controller_name: str, detached: bool, - config: HTTPConfig): - self._controller_name = controller_name - self._detached = detached - self._config = config - self._proxy_actors: Dict[NodeId, ActorHandle] = dict() - - # Will populate self.proxy_actors with existing actors. - self._start_proxies_if_needed() - - def get_config(self): - return self._config - - def get_http_proxy_handles(self) -> Dict[NodeId, ActorHandle]: - return self._proxy_actors - - def update(self): - self._start_proxies_if_needed() - self._stop_proxies_if_needed() - - def _start_proxies_if_needed(self) -> None: - """Start a proxy on every node if it doesn't already exist.""" - if self._config.host is None: - return - - for node_id, node_resource in get_all_node_ids(): - if node_id in self._proxy_actors: - continue - - name = format_actor_name(SERVE_PROXY_NAME, self._controller_name, - node_id) - try: - proxy = ray.get_actor(name) - except ValueError: - logger.info("Starting HTTP proxy with name '{}' on node '{}' " - "listening on '{}:{}'".format( - name, node_id, self._config.host, - self._config.port)) - proxy = HTTPProxyActor.options( - name=name, - lifetime="detached" if self._detached else None, - max_concurrency=ASYNC_CONCURRENCY, - max_restarts=-1, - max_task_retries=-1, - resources={ - node_resource: 0.01 - }, - ).remote( - self._config.host, - self._config.port, - controller_name=self._controller_name, - http_middlewares=self._config.middlewares) - - self._proxy_actors[node_id] = proxy - - def _stop_proxies_if_needed(self) -> bool: - """Removes proxy actors from any nodes that no longer exist.""" - all_node_ids = {node_id for node_id, _ in get_all_node_ids()} - to_stop = [] - for node_id in self._proxy_actors: - if node_id not in all_node_ids: - logger.info("Removing HTTP proxy on removed node '{}'.".format( - node_id)) - to_stop.append(node_id) - - for node_id in to_stop: - proxy = self._proxy_actors.pop(node_id) - ray.kill(proxy, no_restart=True) - - -class BackendInfo(BaseModel): - # TODO(architkulkarni): Add type hint for worker_class after upgrading - # cloudpickle and adding types to RayServeWrappedReplica - worker_class: Any - backend_config: BackendConfig - replica_config: ReplicaConfig - - class Config: - # TODO(architkulkarni): Remove once ReplicaConfig is a pydantic - # model - arbitrary_types_allowed = True - - -class BackendState: - def __init__(self, - controller_name: str, - detached: bool, - checkpoint: bytes = None): - self.controller_name = controller_name - self.detached = detached - - # Non-checkpointed state. - self.currently_starting_replicas: Dict[asyncio.Future, Tuple[ - BackendTag, ReplicaTag, ActorHandle]] = dict() - self.currently_stopping_replicas: Dict[asyncio.Future, Tuple[ - BackendTag, ReplicaTag]] = dict() - - # Checkpointed state. - self.backends: Dict[BackendTag, BackendInfo] = dict() - self.backend_replicas: Dict[BackendTag, Dict[ - ReplicaTag, ActorHandle]] = defaultdict(dict) - self.goals: Dict[BackendTag, GoalId] = dict() - self.backend_replicas_to_start: Dict[BackendTag, List[ - ReplicaTag]] = defaultdict(list) - self.backend_replicas_to_stop: Dict[BackendTag, List[Tuple[ - ReplicaTag, Duration]]] = defaultdict(list) - self.backends_to_remove: List[BackendTag] = list() - - if checkpoint is not None: - (self.backends, self.backend_replicas, self.goals, - self.backend_replicas_to_start, self.backend_replicas_to_stop, - self.backend_to_remove) = pickle.loads(checkpoint) - - # Fetch actor handles for all of the backend replicas in the system. - # All of these backend_replicas are guaranteed to already exist because - # they would not be written to a checkpoint in self.backend_replicas - # until they were created. - for backend_tag, replica_dict in self.backend_replicas.items(): - for replica_tag in replica_dict.keys(): - replica_name = format_actor_name(replica_tag, - self.controller_name) - self.backend_replicas[backend_tag][ - replica_tag] = ray.get_actor(replica_name) - - def checkpoint(self): - return pickle.dumps( - (self.backends, self.backend_replicas, self.goals, - self.backend_replicas_to_start, self.backend_replicas_to_stop, - self.backends_to_remove)) - - def get_backend_configs(self) -> Dict[BackendTag, BackendConfig]: - return { - tag: info.backend_config - for tag, info in self.backends.items() - } - - def get_replica_handles( - self) -> Dict[BackendTag, Dict[ReplicaTag, ActorHandle]]: - return self.backend_replicas - - def get_backend(self, backend_tag: BackendTag) -> Optional[BackendInfo]: - return self.backends.get(backend_tag) - - def _set_backend_goal(self, backend_tag: BackendTag, - backend_info: Optional[BackendInfo], - goal_id: GoalId) -> Optional[GoalId]: - existing_goal = self.goals.get(backend_tag) - self.backends[backend_tag] = backend_info - if not backend_info: - del self.backends[backend_tag] - self.goals[backend_tag] = goal_id - return existing_goal - - def completed_goals(self) -> List[GoalId]: - completed_goals = [] - all_tags = set(self.backend_replicas.keys()).union( - set(self.backends.keys())) - - for backend_tag in all_tags: - desired_info = self.backends.get(backend_tag) - existing_info = self.backend_replicas.get(backend_tag) - # Check for deleting - if (not desired_info or - desired_info.backend_config.num_replicas == 0) and \ - (not existing_info or len(existing_info) == 0): - completed_goals.append(self.goals[backend_tag]) - - # Check for a non-zero number of backends - if desired_info and existing_info and desired_info.backend_config.\ - num_replicas == len(existing_info): - completed_goals.append(self.goals[backend_tag]) - return completed_goals - - def _start_backend_replica(self, backend_tag: BackendTag, - replica_tag: ReplicaTag) -> ActorHandle: - """Start a replica and return its actor handle. - - Checks if the named actor already exists before starting a new one. - - Assumes that the backend configuration is already in the Goal State. - """ - # NOTE(edoakes): the replicas may already be created if we - # failed after creating them but before writing a - # checkpoint. - replica_name = format_actor_name(replica_tag, self.controller_name) - try: - replica_handle = ray.get_actor(replica_name) - except ValueError: - logger.debug("Starting replica '{}' for backend '{}'.".format( - replica_tag, backend_tag)) - backend_info = self.get_backend(backend_tag) - - replica_handle = ray.remote(backend_info.worker_class).options( - name=replica_name, - lifetime="detached" if self.detached else None, - max_restarts=-1, - max_task_retries=-1, - **backend_info.replica_config.ray_actor_options).remote( - backend_tag, replica_tag, - backend_info.replica_config.actor_init_args, - backend_info.backend_config, self.controller_name) - - return replica_handle - - def scale_backend_replicas( - self, - backend_tag: BackendTag, - num_replicas: int, - force_kill: bool = False, - ) -> None: - """Scale the given backend to the number of replicas. - - NOTE: this does not actually start or stop the replicas, but instead - adds the intention to start/stop them to self.backend_replicas_to_start - and self.backend_replicas_to_stop. The caller is responsible for then - first writing a checkpoint and then actually starting/stopping the - intended replicas. This avoids inconsistencies with starting/stopping a - replica and then crashing before writing a checkpoint. - """ - - logger.debug("Scaling backend '{}' to {} replicas".format( - backend_tag, num_replicas)) - assert (backend_tag in self.backends - ), "Backend {} is not registered.".format(backend_tag) - assert num_replicas >= 0, ("Number of replicas must be" - " greater than or equal to 0.") - - current_num_replicas = len(self.backend_replicas[backend_tag]) - delta_num_replicas = num_replicas - current_num_replicas - - backend_info: BackendInfo = self.backends[backend_tag] - if delta_num_replicas > 0: - can_schedule = try_schedule_resources_on_nodes(requirements=[ - backend_info.replica_config.resource_dict - for _ in range(delta_num_replicas) - ]) - - if _RESOURCE_CHECK_ENABLED and not all(can_schedule): - num_possible = sum(can_schedule) - raise RayServeException( - "Cannot scale backend {} to {} replicas. Ray Serve tried " - "to add {} replicas but the resources only allows {} " - "to be added. To fix this, consider scaling to replica to " - "{} or add more resources to the cluster. You can check " - "avaiable resources with ray.nodes().".format( - backend_tag, num_replicas, delta_num_replicas, - num_possible, current_num_replicas + num_possible)) - - logger.debug("Adding {} replicas to backend {}".format( - delta_num_replicas, backend_tag)) - for _ in range(delta_num_replicas): - replica_tag = "{}#{}".format(backend_tag, get_random_letters()) - self.backend_replicas_to_start[backend_tag].append(replica_tag) - - elif delta_num_replicas < 0: - logger.debug("Removing {} replicas from backend '{}'".format( - -delta_num_replicas, backend_tag)) - assert len( - self.backend_replicas[backend_tag]) >= delta_num_replicas - replicas_copy = self.backend_replicas.copy() - for _ in range(-delta_num_replicas): - replica_tag, _ = replicas_copy[backend_tag].popitem() - - graceful_timeout_s = (backend_info.backend_config. - experimental_graceful_shutdown_timeout_s) - if force_kill: - graceful_timeout_s = 0 - self.backend_replicas_to_stop[backend_tag].append(( - replica_tag, - graceful_timeout_s, - )) - - def _start_pending_replicas(self): - for backend_tag, replicas_to_create in self.backend_replicas_to_start.\ - items(): - for replica_tag in replicas_to_create: - replica_handle = self._start_backend_replica( - backend_tag, replica_tag) - ready_future = replica_handle.ready.remote().as_future() - self.currently_starting_replicas[ready_future] = ( - backend_tag, replica_tag, replica_handle) - - def _stop_pending_replicas(self): - for backend_tag, replicas_to_stop in ( - self.backend_replicas_to_stop.items()): - for replica_tag, shutdown_timeout in replicas_to_stop: - replica_name = format_actor_name(replica_tag, - self.controller_name) - - async def kill_actor(replica_name_to_use): - # NOTE: the replicas may already be stopped if we failed - # after stopping them but before writing a checkpoint. - try: - replica = ray.get_actor(replica_name_to_use) - except ValueError: - return - - try: - await asyncio.wait_for( - replica.drain_pending_queries.remote(), - timeout=shutdown_timeout) - except asyncio.TimeoutError: - # Graceful period passed, kill it forcefully. - logger.debug( - f"{replica_name_to_use} did not shutdown after " - f"{shutdown_timeout}s, killing.") - finally: - ray.kill(replica, no_restart=True) - - self.currently_stopping_replicas[asyncio.ensure_future( - kill_actor(replica_name))] = (backend_tag, replica_tag) - - async def _check_currently_starting_replicas(self) -> int: - """Returns the number of pending replicas waiting to start""" - in_flight: Set[Future[Any]] = set() - - if self.currently_starting_replicas: - done, in_flight = await asyncio.wait( - list(self.currently_starting_replicas.keys()), timeout=0) - for fut in done: - (backend_tag, replica_tag, - replica_handle) = self.currently_starting_replicas.pop(fut) - self.backend_replicas[backend_tag][ - replica_tag] = replica_handle - - backend = self.backend_replicas_to_start.get(backend_tag) - if backend: - try: - backend.remove(replica_tag) - except ValueError: - pass - if len(backend) == 0: - del self.backend_replicas_to_start[backend_tag] - - async def _check_currently_stopping_replicas(self) -> int: - """Returns the number of replicas waiting to stop""" - in_flight: Set[Future[Any]] = set() - - if self.currently_stopping_replicas: - done_stoppping, in_flight = await asyncio.wait( - list(self.currently_stopping_replicas.keys()), timeout=0) - for fut in done_stoppping: - (backend_tag, - replica_tag) = self.currently_stopping_replicas.pop(fut) - - backend_to_stop = self.backend_replicas_to_stop.get( - backend_tag) - - if backend_to_stop: - try: - backend_to_stop.remove(replica_tag) - except ValueError: - pass - if len(backend_to_stop) == 0: - del self.backend_replicas_to_stop[backend_tag] - - backend = self.backend_replicas.get(backend_tag) - if backend: - try: - del backend[replica_tag] - except KeyError: - pass - - if len(self.backend_replicas[backend_tag]) == 0: - del self.backend_replicas[backend_tag] - - async def update(self) -> bool: - """Returns whether the number of backends has changed.""" - self._start_pending_replicas() - self._stop_pending_replicas() - - num_starting = len(self.currently_starting_replicas) - num_stopping = len(self.currently_stopping_replicas) - - await self._check_currently_starting_replicas() - await self._check_currently_stopping_replicas() - - return (len(self.currently_starting_replicas) != num_starting) or \ - (len(self.currently_stopping_replicas) != num_stopping) - - -class EndpointState: - def __init__(self, checkpoint: bytes = None): - self.routes: Dict[BackendTag, Tuple[EndpointTag, Any]] = dict() - self.traffic_policies: Dict[EndpointTag, TrafficPolicy] = dict() - - if checkpoint is not None: - self.routes, self.traffic_policies = pickle.loads(checkpoint) - - def checkpoint(self): - return pickle.dumps((self.routes, self.traffic_policies)) - - def get_endpoints(self) -> Dict[EndpointTag, Dict[str, Any]]: - endpoints = {} - for route, (endpoint, methods) in self.routes.items(): - if endpoint in self.traffic_policies: - traffic_policy = self.traffic_policies[endpoint] - traffic_dict = traffic_policy.traffic_dict - shadow_dict = traffic_policy.shadow_dict - else: - traffic_dict = {} - shadow_dict = {} - - endpoints[endpoint] = { - "route": route if route.startswith("/") else None, - "methods": methods, - "traffic": traffic_dict, - "shadows": shadow_dict, - } - return endpoints - @dataclass class FutureResult: diff --git a/python/ray/serve/endpoint_state.py b/python/ray/serve/endpoint_state.py new file mode 100644 index 000000000..4fc880a95 --- /dev/null +++ b/python/ray/serve/endpoint_state.py @@ -0,0 +1,35 @@ +from typing import Dict, Any, Tuple + +import ray.cloudpickle as pickle +from ray.serve.common import BackendTag, EndpointTag, TrafficPolicy + + +class EndpointState: + def __init__(self, checkpoint: bytes = None): + self.routes: Dict[BackendTag, Tuple[EndpointTag, Any]] = dict() + self.traffic_policies: Dict[EndpointTag, TrafficPolicy] = dict() + + if checkpoint is not None: + self.routes, self.traffic_policies = pickle.loads(checkpoint) + + def checkpoint(self): + return pickle.dumps((self.routes, self.traffic_policies)) + + def get_endpoints(self) -> Dict[EndpointTag, Dict[str, Any]]: + endpoints = {} + for route, (endpoint, methods) in self.routes.items(): + if endpoint in self.traffic_policies: + traffic_policy = self.traffic_policies[endpoint] + traffic_dict = traffic_policy.traffic_dict + shadow_dict = traffic_policy.shadow_dict + else: + traffic_dict = {} + shadow_dict = {} + + endpoints[endpoint] = { + "route": route if route.startswith("/") else None, + "methods": methods, + "traffic": traffic_dict, + "shadows": shadow_dict, + } + return endpoints diff --git a/python/ray/serve/http_state.py b/python/ray/serve/http_state.py new file mode 100644 index 000000000..76027aea6 --- /dev/null +++ b/python/ray/serve/http_state.py @@ -0,0 +1,80 @@ +from typing import Dict + +import ray +from ray.actor import ActorHandle +from ray.serve.config import HTTPConfig +from ray.serve.constants import ASYNC_CONCURRENCY, SERVE_PROXY_NAME +from ray.serve.http_proxy import HTTPProxyActor +from ray.serve.utils import format_actor_name, logger, get_all_node_ids +from ray.serve.common import NodeId + + +class HTTPState: + def __init__(self, controller_name: str, detached: bool, + config: HTTPConfig): + self._controller_name = controller_name + self._detached = detached + self._config = config + self._proxy_actors: Dict[NodeId, ActorHandle] = dict() + + # Will populate self.proxy_actors with existing actors. + self._start_proxies_if_needed() + + def get_config(self): + return self._config + + def get_http_proxy_handles(self) -> Dict[NodeId, ActorHandle]: + return self._proxy_actors + + def update(self): + self._start_proxies_if_needed() + self._stop_proxies_if_needed() + + def _start_proxies_if_needed(self) -> None: + """Start a proxy on every node if it doesn't already exist.""" + if self._config.host is None: + return + + for node_id, node_resource in get_all_node_ids(): + if node_id in self._proxy_actors: + continue + + name = format_actor_name(SERVE_PROXY_NAME, self._controller_name, + node_id) + try: + proxy = ray.get_actor(name) + except ValueError: + logger.info("Starting HTTP proxy with name '{}' on node '{}' " + "listening on '{}:{}'".format( + name, node_id, self._config.host, + self._config.port)) + proxy = HTTPProxyActor.options( + name=name, + lifetime="detached" if self._detached else None, + max_concurrency=ASYNC_CONCURRENCY, + max_restarts=-1, + max_task_retries=-1, + resources={ + node_resource: 0.01 + }, + ).remote( + self._config.host, + self._config.port, + controller_name=self._controller_name, + http_middlewares=self._config.middlewares) + + self._proxy_actors[node_id] = proxy + + def _stop_proxies_if_needed(self) -> bool: + """Removes proxy actors from any nodes that no longer exist.""" + all_node_ids = {node_id for node_id, _ in get_all_node_ids()} + to_stop = [] + for node_id in self._proxy_actors: + if node_id not in all_node_ids: + logger.info("Removing HTTP proxy on removed node '{}'.".format( + node_id)) + to_stop.append(node_id) + + for node_id in to_stop: + proxy = self._proxy_actors.pop(node_id) + ray.kill(proxy, no_restart=True)