mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 11:10:25 +08:00
[serve] Cleanup backend state, move checkpointing and async goal logic inside (#13298)
This commit is contained in:
@@ -169,11 +169,11 @@ class Client:
|
||||
self._shutdown = True
|
||||
|
||||
@_ensure_connected
|
||||
def _get_result(self, result_object_id: ray.ObjectRef) -> bool:
|
||||
result_id: UUID = ray.get(result_object_id)
|
||||
result = ray.get(self._controller.wait_for_event.remote(result_id))
|
||||
logger.debug(f"Getting result_id ({result_id}) with result: {result}")
|
||||
return result
|
||||
def _wait_for_goal(self, result_object_id: ray.ObjectRef) -> bool:
|
||||
goal_id: Optional[UUID] = ray.get(result_object_id)
|
||||
if goal_id is not None:
|
||||
ray.get(self._controller.wait_for_goal.remote(goal_id))
|
||||
logger.debug(f"Goal {goal_id} completed.")
|
||||
|
||||
@_ensure_connected
|
||||
def create_endpoint(self,
|
||||
@@ -229,7 +229,7 @@ class Client:
|
||||
"an element of type {}".format(type(method)))
|
||||
upper_methods.append(method.upper())
|
||||
|
||||
self._get_result(
|
||||
self._wait_for_goal(
|
||||
self._controller.create_endpoint.remote(
|
||||
endpoint_name, {backend: 1.0}, route, upper_methods))
|
||||
|
||||
@@ -306,7 +306,7 @@ class Client:
|
||||
"config_options must be a BackendConfig or dictionary.")
|
||||
if isinstance(config_options, dict):
|
||||
config_options = BackendConfig.parse_obj(config_options)
|
||||
self._get_result(
|
||||
self._wait_for_goal(
|
||||
self._controller.update_backend_config.remote(
|
||||
backend_tag, config_options))
|
||||
|
||||
@@ -404,7 +404,7 @@ class Client:
|
||||
raise TypeError("config must be a BackendConfig or a dictionary.")
|
||||
|
||||
backend_config._validate_complete()
|
||||
self._get_result(
|
||||
self._wait_for_goal(
|
||||
self._controller.create_backend.remote(backend_tag, backend_config,
|
||||
replica_config))
|
||||
|
||||
@@ -427,7 +427,7 @@ class Client:
|
||||
force (bool): Whether or not to force the deletion, without waiting
|
||||
for graceful shutdown. Default to false.
|
||||
"""
|
||||
self._get_result(
|
||||
self._wait_for_goal(
|
||||
self._controller.delete_backend.remote(backend_tag, force))
|
||||
|
||||
@_ensure_connected
|
||||
|
||||
@@ -1,17 +1,30 @@
|
||||
import asyncio
|
||||
from asyncio.futures import Future
|
||||
from collections import defaultdict
|
||||
import time
|
||||
from typing import Dict, Any, List, Optional, Set, Tuple
|
||||
from uuid import uuid4
|
||||
|
||||
import ray
|
||||
import ray.cloudpickle as pickle
|
||||
from ray.actor import ActorHandle
|
||||
from ray.serve.config import BackendConfig
|
||||
from ray.serve.backend_worker import create_backend_replica
|
||||
from ray.serve.common import (
|
||||
BackendInfo,
|
||||
BackendTag,
|
||||
Duration,
|
||||
GoalId,
|
||||
ReplicaTag,
|
||||
)
|
||||
from ray.serve.config import BackendConfig, ReplicaConfig
|
||||
from ray.serve.constants import LongPollKey
|
||||
from ray.serve.exceptions import RayServeException
|
||||
from ray.serve.kv_store import RayInternalKVStore
|
||||
from ray.serve.long_poll import LongPollHost
|
||||
from ray.serve.utils import (format_actor_name, get_random_letters, logger,
|
||||
try_schedule_resources_on_nodes)
|
||||
from ray.serve.common import (BackendInfo, BackendTag, Duration, GoalId,
|
||||
ReplicaTag)
|
||||
|
||||
CHECKPOINT_KEY = "serve-backend-state-checkpoint"
|
||||
|
||||
# Feature flag for controller resource checking. If true, controller will
|
||||
# error if the desired replicas exceed current resource availability.
|
||||
@@ -25,12 +38,12 @@ class BackendState:
|
||||
called with a lock held.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
controller_name: str,
|
||||
detached: bool,
|
||||
checkpoint: bytes = None):
|
||||
self.controller_name = controller_name
|
||||
self.detached = detached
|
||||
def __init__(self, controller_name: str, detached: bool,
|
||||
kv_store: RayInternalKVStore, long_poll_host: LongPollHost):
|
||||
self._controller_name = controller_name
|
||||
self._detached = detached
|
||||
self._kv_store = kv_store
|
||||
self._long_poll_host = long_poll_host
|
||||
|
||||
# Non-checkpointed state.
|
||||
self.currently_starting_replicas: Dict[asyncio.Future, Tuple[
|
||||
@@ -48,28 +61,50 @@ class BackendState:
|
||||
self.backend_replicas_to_stop: Dict[BackendTag, List[Tuple[
|
||||
ReplicaTag, Duration]]] = defaultdict(list)
|
||||
self.backends_to_remove: List[BackendTag] = list()
|
||||
self.pending_goals: Dict[GoalId, asyncio.Event] = dict()
|
||||
|
||||
checkpoint = self._kv_store.get(CHECKPOINT_KEY)
|
||||
if checkpoint is not None:
|
||||
(self.backends, self.backend_replicas, self.goals,
|
||||
self.backend_replicas_to_start, self.backend_replicas_to_stop,
|
||||
self.backend_to_remove) = pickle.loads(checkpoint)
|
||||
self.backend_to_remove,
|
||||
pending_goal_ids) = pickle.loads(checkpoint)
|
||||
|
||||
# Fetch actor handles for all of the backend replicas in the system.
|
||||
# All of these backend_replicas are guaranteed to already exist because
|
||||
# they would not be written to a checkpoint in self.backend_replicas
|
||||
# until they were created.
|
||||
for backend_tag, replica_dict in self.backend_replicas.items():
|
||||
for replica_tag in replica_dict.keys():
|
||||
replica_name = format_actor_name(replica_tag,
|
||||
self.controller_name)
|
||||
self.backend_replicas[backend_tag][
|
||||
replica_tag] = ray.get_actor(replica_name)
|
||||
for goal_id in pending_goal_ids:
|
||||
self._create_goal(goal_id)
|
||||
|
||||
def checkpoint(self):
|
||||
return pickle.dumps(
|
||||
(self.backends, self.backend_replicas, self.goals,
|
||||
self.backend_replicas_to_start, self.backend_replicas_to_stop,
|
||||
self.backends_to_remove))
|
||||
# Fetch actor handles for all backend replicas in the system.
|
||||
# All of these backend_replicas are guaranteed to already exist
|
||||
# because they would not be written to a checkpoint in
|
||||
# self.backend_replicas until they were created.
|
||||
for backend_tag, replica_dict in self.backend_replicas.items():
|
||||
for replica_tag in replica_dict.keys():
|
||||
replica_name = format_actor_name(replica_tag,
|
||||
self._controller_name)
|
||||
self.backend_replicas[backend_tag][
|
||||
replica_tag] = ray.get_actor(replica_name)
|
||||
|
||||
self._notify_backend_configs_changed()
|
||||
self._notify_replica_handles_changed()
|
||||
|
||||
def _checkpoint(self) -> None:
|
||||
self._kv_store.put(
|
||||
CHECKPOINT_KEY,
|
||||
pickle.dumps(
|
||||
(self.backends, self.backend_replicas, self.goals,
|
||||
self.backend_replicas_to_start, self.backend_replicas_to_stop,
|
||||
self.backends_to_remove, list(self.pending_goals.keys()))))
|
||||
|
||||
def _notify_backend_configs_changed(self) -> None:
|
||||
self._long_poll_host.notify_changed(LongPollKey.BACKEND_CONFIGS,
|
||||
self.get_backend_configs())
|
||||
|
||||
def _notify_replica_handles_changed(self) -> None:
|
||||
self._long_poll_host.notify_changed(
|
||||
LongPollKey.REPLICA_HANDLES, {
|
||||
backend_tag: list(replica_dict.values())
|
||||
for backend_tag, replica_dict in self.backend_replicas.items()
|
||||
})
|
||||
|
||||
def get_backend_configs(self) -> Dict[BackendTag, BackendConfig]:
|
||||
return {
|
||||
@@ -84,35 +119,136 @@ class BackendState:
|
||||
def get_backend(self, backend_tag: BackendTag) -> Optional[BackendInfo]:
|
||||
return self.backends.get(backend_tag)
|
||||
|
||||
def num_pending_goals(self) -> int:
|
||||
return len(self.pending_goals)
|
||||
|
||||
async def wait_for_goal(self, goal_id: GoalId) -> None:
|
||||
start = time.time()
|
||||
if goal_id not in self.pending_goals:
|
||||
logger.debug(f"Goal {goal_id} not found")
|
||||
return True
|
||||
event = self.pending_goals[goal_id]
|
||||
await event.wait()
|
||||
logger.debug(
|
||||
f"Waiting for goal {goal_id} took {time.time() - start} seconds")
|
||||
|
||||
def _complete_goal(self, goal_id: GoalId) -> None:
|
||||
logger.debug(f"Completing goal {goal_id}")
|
||||
event = self.pending_goals.pop(goal_id, None)
|
||||
if event:
|
||||
event.set()
|
||||
|
||||
def _create_goal(self, goal_id: Optional[GoalId] = None) -> GoalId:
|
||||
if goal_id is None:
|
||||
goal_id = uuid4()
|
||||
event = asyncio.Event()
|
||||
self.pending_goals[goal_id] = event
|
||||
return goal_id
|
||||
|
||||
def _set_backend_goal(self, backend_tag: BackendTag,
|
||||
backend_info: Optional[BackendInfo],
|
||||
goal_id: GoalId) -> Optional[GoalId]:
|
||||
backend_info: BackendInfo) -> None:
|
||||
existing_goal = self.goals.get(backend_tag)
|
||||
self.backends[backend_tag] = backend_info
|
||||
if not backend_info:
|
||||
new_goal = self._create_goal()
|
||||
|
||||
if backend_info is not None:
|
||||
self.backends[backend_tag] = backend_info
|
||||
|
||||
self.goals[backend_tag] = new_goal
|
||||
|
||||
return new_goal, existing_goal
|
||||
|
||||
def create_backend(self, backend_tag: BackendTag,
|
||||
backend_config: BackendConfig,
|
||||
replica_config: ReplicaConfig) -> Optional[GoalId]:
|
||||
# Ensures this method is idempotent.
|
||||
backend_info = self.backends.get(backend_tag)
|
||||
if backend_info is not None:
|
||||
if (backend_info.backend_config == backend_config
|
||||
and backend_info.replica_config == replica_config):
|
||||
return None
|
||||
|
||||
backend_replica = create_backend_replica(replica_config.func_or_class)
|
||||
|
||||
# Save creator that starts replicas, the arguments to be passed in,
|
||||
# and the configuration for the backends.
|
||||
backend_info = BackendInfo(
|
||||
worker_class=backend_replica,
|
||||
backend_config=backend_config,
|
||||
replica_config=replica_config)
|
||||
|
||||
new_goal, existing_goal = self._set_backend_goal(
|
||||
backend_tag, backend_info)
|
||||
|
||||
try:
|
||||
self.scale_backend_replicas(backend_tag,
|
||||
backend_config.num_replicas)
|
||||
except RayServeException as e:
|
||||
del self.backends[backend_tag]
|
||||
self.goals[backend_tag] = goal_id
|
||||
return existing_goal
|
||||
raise e
|
||||
|
||||
def completed_goals(self) -> List[GoalId]:
|
||||
completed_goals = []
|
||||
all_tags = set(self.backend_replicas.keys()).union(
|
||||
set(self.backends.keys()))
|
||||
# NOTE(edoakes): we must write a checkpoint before starting new
|
||||
# or pushing the updated config to avoid inconsistent state if we
|
||||
# crash while making the change.
|
||||
self._checkpoint()
|
||||
self._notify_backend_configs_changed()
|
||||
|
||||
for backend_tag in all_tags:
|
||||
desired_info = self.backends.get(backend_tag)
|
||||
existing_info = self.backend_replicas.get(backend_tag)
|
||||
# Check for deleting
|
||||
if (not desired_info or
|
||||
desired_info.backend_config.num_replicas == 0) and \
|
||||
(not existing_info or len(existing_info) == 0):
|
||||
completed_goals.append(self.goals[backend_tag])
|
||||
if existing_goal is not None:
|
||||
self._complete_goal(existing_goal)
|
||||
return new_goal
|
||||
|
||||
# Check for a non-zero number of backends
|
||||
if desired_info and existing_info and desired_info.backend_config.\
|
||||
num_replicas == len(existing_info):
|
||||
completed_goals.append(self.goals[backend_tag])
|
||||
return completed_goals
|
||||
def delete_backend(self, backend_tag: BackendTag,
|
||||
force_kill: bool = False) -> Optional[GoalId]:
|
||||
# This method must be idempotent. We should validate that the
|
||||
# specified backend exists on the client.
|
||||
if backend_tag not in self.backends:
|
||||
return None
|
||||
|
||||
# Scale its replicas down to 0.
|
||||
self.scale_backend_replicas(backend_tag, 0, force_kill)
|
||||
|
||||
# Remove the backend's metadata.
|
||||
del self.backends[backend_tag]
|
||||
|
||||
# Add the intention to remove the backend from the routers.
|
||||
self.backends_to_remove.append(backend_tag)
|
||||
|
||||
new_goal, existing_goal = self._set_backend_goal(backend_tag, None)
|
||||
|
||||
self._checkpoint()
|
||||
if existing_goal is not None:
|
||||
self._complete_goal(existing_goal)
|
||||
return new_goal
|
||||
|
||||
def update_backend_config(self, backend_tag: BackendTag,
|
||||
config_options: BackendConfig):
|
||||
if backend_tag not in self.backends:
|
||||
raise ValueError(f"Backend {backend_tag} is not registered")
|
||||
|
||||
stored_backend_config = self.backends[backend_tag].backend_config
|
||||
updated_config = stored_backend_config.copy(
|
||||
update=config_options.dict(exclude_unset=True))
|
||||
updated_config._validate_complete()
|
||||
self.backends[backend_tag].backend_config = updated_config
|
||||
|
||||
new_goal, existing_goal = self._set_backend_goal(
|
||||
backend_tag, self.backends[backend_tag])
|
||||
|
||||
# Scale the replicas with the new configuration.
|
||||
self.scale_backend_replicas(backend_tag, updated_config.num_replicas)
|
||||
|
||||
# NOTE(edoakes): we must write a checkpoint before pushing the
|
||||
# update to avoid inconsistent state if we crash after pushing the
|
||||
# update.
|
||||
self._checkpoint()
|
||||
if existing_goal is not None:
|
||||
self._complete_goal(existing_goal)
|
||||
|
||||
# Inform the routers and backend replicas about config changes.
|
||||
# TODO(edoakes): this should only happen if we change something other
|
||||
# than num_replicas.
|
||||
self._notify_backend_configs_changed()
|
||||
|
||||
return new_goal
|
||||
|
||||
def _start_backend_replica(self, backend_tag: BackendTag,
|
||||
replica_tag: ReplicaTag) -> ActorHandle:
|
||||
@@ -125,7 +261,7 @@ class BackendState:
|
||||
# NOTE(edoakes): the replicas may already be created if we
|
||||
# failed after creating them but before writing a
|
||||
# checkpoint.
|
||||
replica_name = format_actor_name(replica_tag, self.controller_name)
|
||||
replica_name = format_actor_name(replica_tag, self._controller_name)
|
||||
try:
|
||||
replica_handle = ray.get_actor(replica_name)
|
||||
except ValueError:
|
||||
@@ -135,13 +271,13 @@ class BackendState:
|
||||
|
||||
replica_handle = ray.remote(backend_info.worker_class).options(
|
||||
name=replica_name,
|
||||
lifetime="detached" if self.detached else None,
|
||||
lifetime="detached" if self._detached else None,
|
||||
max_restarts=-1,
|
||||
max_task_retries=-1,
|
||||
**backend_info.replica_config.ray_actor_options).remote(
|
||||
backend_tag, replica_tag,
|
||||
backend_info.replica_config.actor_init_args,
|
||||
backend_info.backend_config, self.controller_name)
|
||||
backend_info.backend_config, self._controller_name)
|
||||
|
||||
return replica_handle
|
||||
|
||||
@@ -228,7 +364,7 @@ class BackendState:
|
||||
self.backend_replicas_to_stop.items()):
|
||||
for replica_tag, shutdown_timeout in replicas_to_stop:
|
||||
replica_name = format_actor_name(replica_tag,
|
||||
self.controller_name)
|
||||
self._controller_name)
|
||||
|
||||
async def kill_actor(replica_name_to_use):
|
||||
# NOTE: the replicas may already be stopped if we failed
|
||||
@@ -280,9 +416,9 @@ class BackendState:
|
||||
in_flight: Set[Future[Any]] = set()
|
||||
|
||||
if self.currently_stopping_replicas:
|
||||
done_stoppping, in_flight = await asyncio.wait(
|
||||
done_stopping, in_flight = await asyncio.wait(
|
||||
list(self.currently_stopping_replicas.keys()), timeout=0)
|
||||
for fut in done_stoppping:
|
||||
for fut in done_stopping:
|
||||
(backend_tag,
|
||||
replica_tag) = self.currently_stopping_replicas.pop(fut)
|
||||
|
||||
@@ -307,8 +443,30 @@ class BackendState:
|
||||
if len(self.backend_replicas[backend_tag]) == 0:
|
||||
del self.backend_replicas[backend_tag]
|
||||
|
||||
def _completed_goals(self) -> List[GoalId]:
|
||||
completed_goals = []
|
||||
all_tags = set(self.backend_replicas.keys()).union(
|
||||
set(self.backends.keys()))
|
||||
|
||||
for backend_tag in all_tags:
|
||||
desired_info = self.backends.get(backend_tag)
|
||||
existing_info = self.backend_replicas.get(backend_tag)
|
||||
# Check for deleting
|
||||
if (not desired_info or
|
||||
desired_info.backend_config.num_replicas == 0) and \
|
||||
(not existing_info or len(existing_info) == 0):
|
||||
completed_goals.append(self.goals[backend_tag])
|
||||
|
||||
# Check for a non-zero number of backends
|
||||
if desired_info and existing_info and desired_info.backend_config.\
|
||||
num_replicas == len(existing_info):
|
||||
completed_goals.append(self.goals[backend_tag])
|
||||
return completed_goals
|
||||
|
||||
async def update(self) -> bool:
|
||||
"""Returns whether the number of backends has changed."""
|
||||
for goal_id in self._completed_goals():
|
||||
self._complete_goal(goal_id)
|
||||
|
||||
self._start_pending_replicas()
|
||||
self._stop_pending_replicas()
|
||||
|
||||
@@ -318,5 +476,7 @@ class BackendState:
|
||||
await self._check_currently_starting_replicas()
|
||||
await self._check_currently_stopping_replicas()
|
||||
|
||||
return (len(self.currently_starting_replicas) != num_starting) or \
|
||||
(len(self.currently_stopping_replicas) != num_stopping)
|
||||
if (len(self.currently_starting_replicas) != num_starting) or \
|
||||
(len(self.currently_stopping_replicas) != num_stopping):
|
||||
self._checkpoint()
|
||||
self._notify_replica_handles_changed()
|
||||
|
||||
+28
-248
@@ -1,20 +1,11 @@
|
||||
import asyncio
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Any, List, Optional
|
||||
from uuid import uuid4, UUID
|
||||
|
||||
import ray
|
||||
import ray.cloudpickle as pickle
|
||||
from ray.actor import ActorHandle
|
||||
from ray.serve.backend_state import BackendState
|
||||
from ray.serve.backend_worker import create_backend_replica
|
||||
from ray.serve.constants import LongPollKey
|
||||
from ray.serve.common import (
|
||||
BackendInfo,
|
||||
BackendTag,
|
||||
EndpointTag,
|
||||
GoalId,
|
||||
@@ -24,7 +15,6 @@ from ray.serve.common import (
|
||||
)
|
||||
from ray.serve.config import BackendConfig, HTTPOptions, ReplicaConfig
|
||||
from ray.serve.endpoint_state import EndpointState
|
||||
from ray.serve.exceptions import RayServeException
|
||||
from ray.serve.http_state import HTTPState
|
||||
from ray.serve.kv_store import RayInternalKVStore
|
||||
from ray.serve.long_poll import LongPollHost
|
||||
@@ -39,19 +29,6 @@ CHECKPOINT_KEY = "serve-controller-checkpoint"
|
||||
CONTROL_LOOP_PERIOD_S = 1.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class FutureResult:
|
||||
# Goal requested when this future was created
|
||||
requested_goal: Dict[str, Any]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Checkpoint:
|
||||
backend_state_checkpoint: bytes
|
||||
# TODO(ilr) Rename reconciler to PendingState
|
||||
inflight_reqs: Dict[uuid4, FutureResult]
|
||||
|
||||
|
||||
@ray.remote
|
||||
class ServeController:
|
||||
"""Responsible for managing the state of the serving system.
|
||||
@@ -92,11 +69,6 @@ class ServeController:
|
||||
# at any given time.
|
||||
self.write_lock = asyncio.Lock()
|
||||
|
||||
# Map of awaiting results
|
||||
# TODO(ilr): Checkpoint this once this becomes asynchronous
|
||||
self.inflight_results: Dict[UUID, asyncio.Event] = dict()
|
||||
self._serializable_inflight_results: Dict[UUID, FutureResult] = dict()
|
||||
|
||||
# NOTE(simon): Currently we do all-to-all broadcast. This means
|
||||
# any listeners will receive notification for all changes. This
|
||||
# can be problem at scale, e.g. updating a single backend config
|
||||
@@ -106,70 +78,16 @@ class ServeController:
|
||||
|
||||
self.http_state = HTTPState(controller_name, detached, http_config)
|
||||
self.endpoint_state = EndpointState(self.kv_store, self.long_poll_host)
|
||||
|
||||
checkpoint_bytes = self.kv_store.get(CHECKPOINT_KEY)
|
||||
if checkpoint_bytes is None:
|
||||
logger.debug("No checkpoint found")
|
||||
self.backend_state = BackendState(controller_name, detached)
|
||||
else:
|
||||
checkpoint: Checkpoint = pickle.loads(checkpoint_bytes)
|
||||
self.backend_state = BackendState(
|
||||
controller_name,
|
||||
detached,
|
||||
checkpoint=checkpoint.backend_state_checkpoint)
|
||||
|
||||
self._serializable_inflight_results = checkpoint.inflight_reqs
|
||||
for uuid, fut_result in self._serializable_inflight_results.items(
|
||||
):
|
||||
self._create_event_with_result(fut_result.requested_goal, uuid)
|
||||
|
||||
self.notify_backend_configs_changed()
|
||||
self.notify_replica_handles_changed()
|
||||
self.backend_state = BackendState(controller_name, detached,
|
||||
self.kv_store, self.long_poll_host)
|
||||
|
||||
asyncio.get_event_loop().create_task(self.run_control_loop())
|
||||
|
||||
async def wait_for_event(self, uuid: UUID) -> bool:
|
||||
start = time.time()
|
||||
if uuid not in self.inflight_results:
|
||||
logger.debug(f"UUID ({uuid}) not found!!!")
|
||||
return True
|
||||
event = self.inflight_results[uuid]
|
||||
await event.wait()
|
||||
self.inflight_results.pop(uuid)
|
||||
self._serializable_inflight_results.pop(uuid)
|
||||
async with self.write_lock:
|
||||
self._checkpoint()
|
||||
logger.debug(f"Waiting for {uuid} took {time.time() - start} seconds")
|
||||
async def wait_for_goal(self, goal_id: GoalId) -> None:
|
||||
await self.backend_state.wait_for_goal(goal_id)
|
||||
|
||||
return True
|
||||
|
||||
def _create_event_with_result(
|
||||
self,
|
||||
goal_state: Dict[str, any],
|
||||
recreation_uuid: Optional[UUID] = None) -> UUID:
|
||||
# NOTE(ilr) Must be called before checkpointing!
|
||||
event = asyncio.Event()
|
||||
event.result = FutureResult(goal_state)
|
||||
uuid_val = recreation_uuid or uuid4()
|
||||
self.inflight_results[uuid_val] = event
|
||||
self._serializable_inflight_results[uuid_val] = event.result
|
||||
return uuid_val
|
||||
|
||||
async def _num_inflight_results(self) -> int:
|
||||
return len(self.inflight_results)
|
||||
|
||||
def notify_replica_handles_changed(self):
|
||||
self.long_poll_host.notify_changed(
|
||||
LongPollKey.REPLICA_HANDLES, {
|
||||
backend_tag: list(replica_dict.values())
|
||||
for backend_tag, replica_dict in
|
||||
self.backend_state.backend_replicas.items()
|
||||
})
|
||||
|
||||
def notify_backend_configs_changed(self):
|
||||
self.long_poll_host.notify_changed(
|
||||
LongPollKey.BACKEND_CONFIGS,
|
||||
self.backend_state.get_backend_configs())
|
||||
async def _num_pending_goals(self) -> int:
|
||||
return self.backend_state.num_pending_goals()
|
||||
|
||||
async def listen_for_change(self, keys_to_snapshot_ids: Dict[str, int]):
|
||||
"""Proxy long pull client's listen request.
|
||||
@@ -186,45 +104,11 @@ class ServeController:
|
||||
"""Returns a dictionary of node ID to http_proxy actor handles."""
|
||||
return self.http_state.get_http_proxy_handles()
|
||||
|
||||
def _checkpoint(self) -> None:
|
||||
"""Checkpoint internal state and write it to the KV store."""
|
||||
assert self.write_lock.locked()
|
||||
logger.debug("Writing checkpoint")
|
||||
start = time.time()
|
||||
|
||||
checkpoint = pickle.dumps(
|
||||
Checkpoint(self.backend_state.checkpoint(),
|
||||
self._serializable_inflight_results))
|
||||
|
||||
self.kv_store.put(CHECKPOINT_KEY, checkpoint)
|
||||
logger.debug("Wrote checkpoint in {:.3f}s".format(time.time() - start))
|
||||
|
||||
if random.random(
|
||||
) < _CRASH_AFTER_CHECKPOINT_PROBABILITY and self.detached:
|
||||
logger.warning("Intentionally crashing after checkpoint")
|
||||
os._exit(0)
|
||||
|
||||
async def reconcile_current_and_goal_backends(self):
|
||||
pass
|
||||
|
||||
def set_goal_id(self, goal_id: UUID) -> None:
|
||||
event = self.inflight_results.get(goal_id)
|
||||
logger.debug(f"Setting goal id {goal_id}")
|
||||
if event:
|
||||
event.set()
|
||||
|
||||
async def run_control_loop(self) -> None:
|
||||
while True:
|
||||
async with self.write_lock:
|
||||
self.http_state.update()
|
||||
|
||||
completed_ids = self.backend_state.completed_goals()
|
||||
for done_id in completed_ids:
|
||||
self.set_goal_id(done_id)
|
||||
delta_workers = await self.backend_state.update()
|
||||
if delta_workers:
|
||||
self.notify_replica_handles_changed()
|
||||
self._checkpoint()
|
||||
await self.backend_state.update()
|
||||
|
||||
await asyncio.sleep(CONTROL_LOOP_PERIOD_S)
|
||||
|
||||
@@ -241,17 +125,6 @@ class ServeController:
|
||||
"""Returns a dictionary of backend tag to backend config."""
|
||||
return self.endpoint_state.get_endpoints()
|
||||
|
||||
def _set_traffic(self, endpoint_name: str,
|
||||
traffic_dict: Dict[str, float]) -> UUID:
|
||||
for backend in traffic_dict:
|
||||
if self.backend_state.get_backend(backend) is None:
|
||||
raise ValueError(
|
||||
"Attempted to assign traffic to a backend '{}' that "
|
||||
"is not registered.".format(backend))
|
||||
|
||||
self.endpoint_state.set_traffic_policy(endpoint_name,
|
||||
TrafficPolicy(traffic_dict))
|
||||
|
||||
def _validate_traffic_dict(self, traffic_dict: Dict[str, float]):
|
||||
for backend in traffic_dict:
|
||||
if self.backend_state.get_backend(backend) is None:
|
||||
@@ -259,15 +132,20 @@ class ServeController:
|
||||
"Attempted to assign traffic to a backend '{}' that "
|
||||
"is not registered.".format(backend))
|
||||
|
||||
async def set_traffic(self, endpoint_name: str,
|
||||
async def set_traffic(self, endpoint: str,
|
||||
traffic_dict: Dict[str, float]) -> None:
|
||||
"""Sets the traffic policy for the specified endpoint."""
|
||||
async with self.write_lock:
|
||||
self._validate_traffic_dict(traffic_dict)
|
||||
self._set_traffic(endpoint_name, traffic_dict)
|
||||
|
||||
logger.info("Setting traffic for endpoint "
|
||||
f"'{endpoint}' to '{traffic_dict}'.")
|
||||
|
||||
self.endpoint_state.set_traffic_policy(endpoint,
|
||||
TrafficPolicy(traffic_dict))
|
||||
|
||||
async def shadow_traffic(self, endpoint_name: str, backend_tag: BackendTag,
|
||||
proportion: float) -> UUID:
|
||||
proportion: float) -> None:
|
||||
"""Shadow traffic from the endpoint to the backend."""
|
||||
async with self.write_lock:
|
||||
if self.backend_state.get_backend(backend_tag) is None:
|
||||
@@ -285,7 +163,7 @@ class ServeController:
|
||||
# TODO(architkulkarni): add Optional for route after cloudpickle upgrade
|
||||
async def create_endpoint(self, endpoint: str,
|
||||
traffic_dict: Dict[str, float], route,
|
||||
methods: List[str]) -> UUID:
|
||||
methods: List[str]) -> None:
|
||||
"""Create a new endpoint with the specified route and methods.
|
||||
|
||||
If the route is None, this is a "headless" endpoint that will not
|
||||
@@ -310,67 +188,18 @@ class ServeController:
|
||||
async with self.write_lock:
|
||||
self.endpoint_state.delete_endpoint(endpoint)
|
||||
|
||||
async def set_backend_goal(self, backend_tag: BackendTag,
|
||||
backend_info: BackendInfo,
|
||||
new_id: GoalId) -> None:
|
||||
# NOTE(ilr) Must checkpoint after doing this!
|
||||
existing_id_to_set = self.backend_state._set_backend_goal(
|
||||
backend_tag, backend_info, new_id)
|
||||
if existing_id_to_set:
|
||||
self.set_goal_id(existing_id_to_set)
|
||||
|
||||
async def create_backend(self, backend_tag: BackendTag,
|
||||
backend_config: BackendConfig,
|
||||
replica_config: ReplicaConfig) -> UUID:
|
||||
async def create_backend(
|
||||
self, backend_tag: BackendTag, backend_config: BackendConfig,
|
||||
replica_config: ReplicaConfig) -> Optional[GoalId]:
|
||||
"""Register a new backend under the specified tag."""
|
||||
async with self.write_lock:
|
||||
# Ensures this method is idempotent.
|
||||
backend_info = self.backend_state.get_backend(backend_tag)
|
||||
if backend_info is not None:
|
||||
if (backend_info.backend_config == backend_config
|
||||
and backend_info.replica_config == replica_config):
|
||||
return
|
||||
|
||||
backend_replica = create_backend_replica(
|
||||
replica_config.func_or_class)
|
||||
|
||||
# Save creator that starts replicas, the arguments to be passed in,
|
||||
# and the configuration for the backends.
|
||||
backend_info = BackendInfo(
|
||||
worker_class=backend_replica,
|
||||
backend_config=backend_config,
|
||||
replica_config=replica_config)
|
||||
|
||||
return_uuid = self._create_event_with_result({
|
||||
backend_tag: backend_info
|
||||
})
|
||||
|
||||
await self.set_backend_goal(backend_tag, backend_info, return_uuid)
|
||||
|
||||
try:
|
||||
# This call should be to run control loop
|
||||
self.backend_state.scale_backend_replicas(
|
||||
backend_tag, backend_config.num_replicas)
|
||||
except RayServeException as e:
|
||||
del self.backend_state.backends[backend_tag]
|
||||
raise e
|
||||
|
||||
# NOTE(edoakes): we must write a checkpoint before starting new
|
||||
# or pushing the updated config to avoid inconsistent state if we
|
||||
# crash while making the change.
|
||||
self._checkpoint()
|
||||
self.notify_backend_configs_changed()
|
||||
return return_uuid
|
||||
return self.backend_state.create_backend(
|
||||
backend_tag, backend_config, replica_config)
|
||||
|
||||
async def delete_backend(self,
|
||||
backend_tag: BackendTag,
|
||||
force_kill: bool = False) -> UUID:
|
||||
force_kill: bool = False) -> Optional[GoalId]:
|
||||
async with self.write_lock:
|
||||
# This method must be idempotent. We should validate that the
|
||||
# specified backend exists on the client.
|
||||
if self.backend_state.get_backend(backend_tag) is None:
|
||||
return
|
||||
|
||||
# Check that the specified backend isn't used by any endpoints.
|
||||
for endpoint, info in self.endpoint_state.get_endpoints().items():
|
||||
if (backend_tag in info["traffic"]
|
||||
@@ -379,68 +208,19 @@ class ServeController:
|
||||
"and cannot be deleted. Please remove "
|
||||
"the backend from all endpoints and try "
|
||||
"again.".format(backend_tag, endpoint))
|
||||
|
||||
# Scale its replicas down to 0.
|
||||
self.backend_state.scale_backend_replicas(backend_tag, 0,
|
||||
force_kill)
|
||||
|
||||
# Remove the backend's metadata.
|
||||
del self.backend_state.backends[backend_tag]
|
||||
|
||||
# Add the intention to remove the backend from the routers.
|
||||
self.backend_state.backends_to_remove.append(backend_tag)
|
||||
|
||||
return_uuid = self._create_event_with_result({backend_tag: None})
|
||||
# Remove the backend's metadata.
|
||||
await self.set_backend_goal(backend_tag, None, return_uuid)
|
||||
# NOTE(edoakes): we must write a checkpoint before removing the
|
||||
# backend from the routers to avoid inconsistent state if we crash
|
||||
# after pushing the update.
|
||||
self._checkpoint()
|
||||
return return_uuid
|
||||
return self.backend_state.delete_backend(backend_tag, force_kill)
|
||||
|
||||
async def update_backend_config(self, backend_tag: BackendTag,
|
||||
config_options: BackendConfig) -> UUID:
|
||||
config_options: BackendConfig) -> GoalId:
|
||||
"""Set the config for the specified backend."""
|
||||
async with self.write_lock:
|
||||
assert (self.backend_state.get_backend(backend_tag)
|
||||
), "Backend {} is not registered.".format(backend_tag)
|
||||
assert isinstance(config_options, BackendConfig)
|
||||
|
||||
stored_backend_config = self.backend_state.get_backend(
|
||||
backend_tag).backend_config
|
||||
backend_config = stored_backend_config.copy(
|
||||
update=config_options.dict(exclude_unset=True))
|
||||
backend_config._validate_complete()
|
||||
self.backend_state.get_backend(
|
||||
backend_tag).backend_config = backend_config
|
||||
backend_info = self.backend_state.get_backend(backend_tag)
|
||||
|
||||
return_uuid = self._create_event_with_result({
|
||||
backend_tag: backend_info
|
||||
})
|
||||
await self.set_backend_goal(backend_tag, backend_info, return_uuid)
|
||||
|
||||
# Scale the replicas with the new configuration.
|
||||
|
||||
# This should be to run the control loop
|
||||
self.backend_state.scale_backend_replicas(
|
||||
backend_tag, backend_config.num_replicas)
|
||||
|
||||
# NOTE(edoakes): we must write a checkpoint before pushing the
|
||||
# update to avoid inconsistent state if we crash after pushing the
|
||||
# update.
|
||||
self._checkpoint()
|
||||
|
||||
# Inform the routers and backend replicas about config changes.
|
||||
self.notify_backend_configs_changed()
|
||||
|
||||
return return_uuid
|
||||
return self.backend_state.update_backend_config(
|
||||
backend_tag, config_options)
|
||||
|
||||
def get_backend_config(self, backend_tag: BackendTag) -> BackendConfig:
|
||||
"""Get the current config for the specified backend."""
|
||||
assert (self.backend_state.get_backend(backend_tag)
|
||||
), "Backend {} is not registered.".format(backend_tag)
|
||||
if self.backend_state.get_backend(backend_tag) is None:
|
||||
raise ValueError(f"Backend {backend_tag} is not registered.")
|
||||
return self.backend_state.get_backend(backend_tag).backend_config
|
||||
|
||||
def get_http_config(self):
|
||||
|
||||
@@ -6,7 +6,7 @@ import ray
|
||||
def test_controller_inflight_requests_clear(serve_instance):
|
||||
client = serve_instance
|
||||
initial_number_reqs = ray.get(
|
||||
client._controller._num_inflight_results.remote())
|
||||
client._controller._num_pending_goals.remote())
|
||||
|
||||
def function(_):
|
||||
return "hello"
|
||||
@@ -14,7 +14,7 @@ def test_controller_inflight_requests_clear(serve_instance):
|
||||
client.create_backend("tst", function)
|
||||
client.create_endpoint("end_pt", backend="tst")
|
||||
|
||||
assert ray.get(client._controller._num_inflight_results.remote()
|
||||
assert ray.get(client._controller._num_pending_goals.remote()
|
||||
) - initial_number_reqs == 0
|
||||
|
||||
|
||||
|
||||
@@ -226,7 +226,7 @@ def test_create_backend_idempotent(serve_instance):
|
||||
|
||||
for i in range(10):
|
||||
ray.get(
|
||||
controller.wait_for_event.remote(
|
||||
controller.wait_for_goal.remote(
|
||||
controller.create_backend.remote("my_backend", backend_config,
|
||||
replica_config)))
|
||||
|
||||
@@ -249,7 +249,7 @@ def test_create_endpoint_idempotent(serve_instance):
|
||||
|
||||
for i in range(10):
|
||||
ray.get(
|
||||
controller.wait_for_event.remote(
|
||||
controller.wait_for_goal.remote(
|
||||
controller.create_endpoint.remote(
|
||||
"my_endpoint", {"my_backend": 1.0}, "/my_route", ["GET"])))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user