[serve] Refactor to full control loop design (#12537)

This commit is contained in:
Ian Rodney
2020-12-20 11:03:57 -08:00
committed by GitHub
parent 407a3523f3
commit d6e243ad46
+132 -63
View File
@@ -155,6 +155,24 @@ class ActorStateReconciler:
default_factory=lambda: defaultdict(list))
backends_to_remove: List[BackendTag] = field(default_factory=list)
# NOTE(ilr): These are not checkpointed, but will be recreated by
# `_enqueue_pending_scale_changes_loop`.
currently_starting_replicas: Dict[asyncio.Future, Tuple[
BackendTag, ReplicaTag, ActorHandle]] = field(default_factory=dict)
currently_stopping_replicas: Dict[asyncio.Future, Tuple[
BackendTag, ReplicaTag]] = field(default_factory=dict)
def __getstate__(self):
state = self.__dict__.copy()
del state["currently_stopping_replicas"]
del state["currently_starting_replicas"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.currently_stopping_replicas = {}
self.currently_starting_replicas = {}
# TODO(edoakes): consider removing this and just using the names.
def http_proxy_handles(self) -> List[ActorHandle]:
@@ -174,42 +192,6 @@ class ActorStateReconciler:
for replica_dict in self.backend_replicas.values()
]))
async def _start_pending_backend_replicas(
self, current_state: SystemState) -> None:
"""Starts the pending backend replicas in self.backend_replicas_to_start.
Waits for replicas to start up, then removes them from
self.backend_replicas_to_start.
"""
fut_to_replica_info = {}
for backend_tag, replicas_to_create in self.backend_replicas_to_start.\
items():
for replica_tag in replicas_to_create:
replica_handle = await self._start_backend_replica(
current_state, backend_tag, replica_tag)
ready_future = replica_handle.ready.remote().as_future()
fut_to_replica_info[ready_future] = (backend_tag, replica_tag,
replica_handle)
start = time.time()
prev_warning = start
while fut_to_replica_info:
if time.time() - prev_warning > REPLICA_STARTUP_TIME_WARNING_S:
prev_warning = time.time()
logger.warning("Waited {:.2f}s for replicas to start up. Make "
"sure there are enough resources to create the "
"replicas.".format(time.time() - start))
done, pending = await asyncio.wait(
list(fut_to_replica_info.keys()), timeout=1)
for fut in done:
(backend_tag, replica_tag,
replica_handle) = fut_to_replica_info.pop(fut)
self.backend_replicas[backend_tag][
replica_tag] = replica_handle
self.backend_replicas_to_start.clear()
async def _start_backend_replica(self, current_state: SystemState,
backend_tag: BackendTag,
replica_tag: ReplicaTag) -> ActorHandle:
@@ -254,6 +236,7 @@ class ActorStateReconciler:
intended replicas. This avoids inconsistencies with starting/stopping a
replica and then crashing before writing a checkpoint.
"""
logger.debug("Scaling backend '{}' to {} replicas".format(
backend_tag, num_replicas))
assert (backend_tag in backends
@@ -300,32 +283,102 @@ class ActorStateReconciler:
self.backend_replicas_to_stop[backend_tag].append(replica_tag)
async def _stop_pending_backend_replicas(self) -> None:
"""Stops the pending backend replicas in self.backend_replicas_to_stop.
async def _enqueue_pending_scale_changes_loop(self,
current_state: SystemState):
for backend_tag, replicas_to_create in self.backend_replicas_to_start.\
items():
for replica_tag in replicas_to_create:
replica_handle = await self._start_backend_replica(
current_state, backend_tag, replica_tag)
ready_future = replica_handle.ready.remote().as_future()
self.currently_starting_replicas[ready_future] = (
backend_tag, replica_tag, replica_handle)
Removes backend_replicas from the http_proxy, kills them, and clears
self.backend_replicas_to_stop.
"""
for backend_tag, replicas_list in self.backend_replicas_to_stop.items(
):
for replica_tag in replicas_list:
# NOTE(edoakes): the replicas may already be stopped if we
# failed after stopping them but before writing a checkpoint.
for backend_tag, replicas_to_stop in self.backend_replicas_to_stop.\
items():
for replica_tag in replicas_to_stop:
replica_name = format_actor_name(replica_tag,
self.controller_name)
try:
replica = ray.get_actor(replica_name)
except ValueError:
continue
# TODO(edoakes): this logic isn't ideal because there may be
# pending tasks still executing on the replica. However, if we
# use replica.__ray_terminate__, we may send it while the
# replica is being restarted and there's no way to tell if it
# successfully killed the worker or not.
ray.kill(replica, no_restart=True)
async def kill_actor(replica_name_to_use):
# NOTE: the replicas may already be stopped if we failed
# after stopping them but before writing a checkpoint.
try:
replica = ray.get_actor(replica_name_to_use)
except ValueError:
return
self.backend_replicas_to_stop.clear()
# TODO(edoakes): this logic isn't ideal because there may
# be pending tasks still executing on the replica. However,
# if we use replica.__ray_terminate__, we may send it while
# the replica is being restarted and there's no way to tell
# if it successfully killed the worker or not.
ray.kill(replica, no_restart=True)
self.currently_stopping_replicas[asyncio.ensure_future(
kill_actor(replica_name))] = (backend_tag, replica_tag)
async def _check_currently_starting_replicas(self) -> bool:
"""Returns a boolean specifying if there are more replicas to start"""
in_flight = list()
if self.currently_starting_replicas:
done, in_flight = await asyncio.wait(
list(self.currently_starting_replicas.keys()), timeout=0)
for fut in done:
(backend_tag, replica_tag,
replica_handle) = self.currently_starting_replicas.pop(fut)
self.backend_replicas[backend_tag][
replica_tag] = replica_handle
backend = self.backend_replicas_to_start.get(backend_tag)
if backend:
try:
backend.remove(replica_tag)
except ValueError:
pass
if len(backend) == 0:
del self.backend_replicas_to_start[backend_tag]
return len(in_flight) > 0
async def _check_currently_stopping_replicas(self) -> bool:
"""Returns a boolean specifying if there are more replicas to stop"""
in_flight = list()
if self.currently_stopping_replicas:
done_stoppping, in_flight = await asyncio.wait(
list(self.currently_stopping_replicas.keys()), timeout=0)
for fut in done_stoppping:
(backend_tag,
replica_tag) = self.currently_stopping_replicas.pop(fut)
backend = self.backend_replicas_to_stop.get(backend_tag)
if backend:
try:
backend.remove(replica_tag)
except ValueError:
pass
if len(backend) == 0:
del self.backend_replicas_to_stop[backend_tag]
return len(in_flight) > 0
async def backend_control_loop(self):
start = time.time()
prev_warning = start
need_to_continue = True
while need_to_continue:
if time.time() - prev_warning > REPLICA_STARTUP_TIME_WARNING_S:
prev_warning = time.time()
logger.warning("Waited {:.2f}s for replicas to start up. Make "
"sure there are enough resources to create the "
"replicas.".format(time.time() - start))
need_to_continue = (
await self._check_currently_starting_replicas()
or await self._check_currently_stopping_replicas())
asyncio.sleep(1)
def _start_http_proxies_if_needed(self, http_host: str, http_port: str,
http_middlewares: List[Any]) -> None:
@@ -415,8 +468,8 @@ class ActorStateReconciler:
backend, metadata.autoscaling_config)
# Start/stop any pending backend replicas.
await self._start_pending_backend_replicas(current_state)
await self._stop_pending_backend_replicas()
await self._enqueue_pending_scale_changes_loop(current_state)
await self.backend_control_loop()
return autoscaling_policies
@@ -671,6 +724,12 @@ class ServeController:
await self.update_backend_config(
backend, BackendConfig(num_replicas=new_num_replicas))
async def reconcile_current_and_goal_backends(self):
pass
# backends_to_delete = set(
# self.current_state.backends.keys()).difference(
# self.goal_state.backends.keys())
async def run_control_loop(self) -> None:
while True:
await self.do_autoscale()
@@ -872,6 +931,7 @@ class ServeController:
backend_tag, metadata.autoscaling_config)
try:
# This call should be to run control loop
self.actor_reconciler._scale_backend_replicas(
self.current_state.backends, backend_tag,
backend_config.num_replicas)
@@ -886,8 +946,9 @@ class ServeController:
# or pushing the updated config to avoid inconsistent state if we
# crash while making the change.
self._checkpoint()
await self.actor_reconciler._start_pending_backend_replicas(
await self.actor_reconciler._enqueue_pending_scale_changes_loop(
self.current_state)
await self.actor_reconciler.backend_control_loop()
self.notify_replica_handles_changed()
@@ -916,6 +977,10 @@ class ServeController:
# Scale its replicas down to 0. This will also remove the backend
# from self.current_state.backends and
# self.actor_reconciler.backend_replicas.
self.goal_state.backends[backend_tag] = None
# This should be a call to the control loop
self.actor_reconciler._scale_backend_replicas(
self.current_state.backends, backend_tag, 0)
@@ -932,7 +997,9 @@ class ServeController:
# backend from the routers to avoid inconsistent state if we crash
# after pushing the update.
self._checkpoint()
await self.actor_reconciler._stop_pending_backend_replicas()
await self.actor_reconciler._enqueue_pending_scale_changes_loop(
self.current_state)
await self.actor_reconciler.backend_control_loop()
self.notify_replica_handles_changed()
return return_uuid
@@ -955,6 +1022,8 @@ class ServeController:
backend_info = self.current_state.get_backend(backend_tag)
# Scale the replicas with the new configuration.
# This should be to run the control loop
self.actor_reconciler._scale_backend_replicas(
self.current_state.backends, backend_tag,
backend_config.num_replicas)
@@ -970,9 +1039,9 @@ class ServeController:
# Inform the routers about change in configuration
# (particularly for setting max_batch_size).
await self.actor_reconciler._start_pending_backend_replicas(
await self.actor_reconciler._enqueue_pending_scale_changes_loop(
self.current_state)
await self.actor_reconciler._stop_pending_backend_replicas()
await self.actor_reconciler.backend_control_loop()
self.notify_replica_handles_changed()
self.notify_backend_configs_changed()