[serve] Incremental change towards async control loop for replica startup (#12281)

This commit is contained in:
Edward Oakes
2020-11-24 13:06:08 -06:00
committed by GitHub
parent 888357d251
commit 4ada3e4c99
+50 -42
View File
@@ -36,6 +36,8 @@ _RESOURCE_CHECK_ENABLED = True
# How often to call the control loop on the controller.
CONTROL_LOOP_PERIOD_S = 1.0
REPLICA_STARTUP_TIME_WARNING_S = 5
# TypeDefs
BackendTag = str
EndpointTag = str
@@ -152,28 +154,48 @@ class ActorStateReconciler:
self, config_store: ConfigurationStore) -> None:
"""Starts the pending backend replicas in self.backend_replicas_to_start.
Starts the replica, then pushes an update to the router to add it to
the proper backend. If the replica has already been started, only
updates the router.
Clears self.backend_replicas_to_start.
Waits for replicas to start up, then removes them from
self.backend_replicas_to_start.
"""
replica_started_futures = []
fut_to_replica_info = {}
for backend_tag, replicas_to_create in self.backend_replicas_to_start.\
items():
for replica_tag in replicas_to_create:
replica_started_futures.append(
self._start_backend_replicas(config_store, backend_tag,
replica_tag))
replica_handle = await self._start_backend_replica(
config_store, backend_tag, replica_tag)
ready_future = replica_handle.ready.remote().as_future()
fut_to_replica_info[ready_future] = (backend_tag, replica_tag,
replica_handle)
# Wait on all creation task futures together.
await asyncio.gather(*replica_started_futures)
start = time.time()
prev_warning = start
while fut_to_replica_info:
if time.time() - prev_warning > REPLICA_STARTUP_TIME_WARNING_S:
prev_warning = time.time()
logger.warning("Waited {:.2f}s for replicas to start up. Make "
"sure there are enough resources to create the "
"replicas.".format(time.time() - start))
done, pending = await asyncio.wait(
list(fut_to_replica_info.keys()), timeout=1)
for fut in done:
(backend_tag, replica_tag,
replica_handle) = fut_to_replica_info.pop(fut)
self.backend_replicas[backend_tag][
replica_tag] = replica_handle
self.backend_replicas_to_start.clear()
async def _start_backend_replicas(self, config_store: ConfigurationStore,
backend_tag: BackendTag,
replica_tag: ReplicaTag) -> None:
async def _start_backend_replica(self, config_store: ConfigurationStore,
backend_tag: BackendTag,
replica_tag: ReplicaTag) -> ActorHandle:
"""Start a replica and return its actor handle.
Checks if the named actor already exists before starting a new one.
Assumes that the backend configuration has already been registered
in the ConfigurationStore.
"""
# NOTE(edoakes): the replicas may already be created if we
# failed after creating them but before writing a
# checkpoint.
@@ -181,10 +203,21 @@ class ActorStateReconciler:
try:
replica_handle = ray.get_actor(replica_name)
except ValueError:
replica_handle = await self._start_single_replica(
config_store, backend_tag, replica_tag, replica_name)
logger.debug("Starting replica '{}' for backend '{}'.".format(
replica_tag, backend_tag))
backend_info = config_store.get_backend(backend_tag)
self.backend_replicas[backend_tag][replica_tag] = replica_handle
replica_handle = ray.remote(backend_info.worker_class).options(
name=replica_name,
lifetime="detached" if self.detached else None,
max_restarts=-1,
max_task_retries=-1,
**backend_info.replica_config.ray_actor_options).remote(
backend_tag, replica_tag,
backend_info.replica_config.actor_init_args,
backend_info.backend_config, self.controller_name)
return replica_handle
def _scale_backend_replicas(self, backends: Dict[BackendTag, BackendInfo],
backend_tag: BackendTag,
@@ -271,31 +304,6 @@ class ActorStateReconciler:
self.backend_replicas_to_stop.clear()
async def _start_single_replica(
self, config_store: ConfigurationStore, backend_tag: BackendTag,
replica_tag: ReplicaTag, replica_name: str) -> ActorHandle:
"""Creates a backend replica and waits for it to start up.
Assumes that the backend configuration has already been registered
in the ConfigurationStore.
"""
logger.debug("Starting replica '{}' for backend '{}'.".format(
replica_tag, backend_tag))
backend_info = config_store.get_backend(backend_tag)
replica_handle = ray.remote(backend_info.worker_class).options(
name=replica_name,
lifetime="detached" if self.detached else None,
max_restarts=-1,
max_task_retries=-1,
**backend_info.replica_config.ray_actor_options).remote(
backend_tag, replica_tag,
backend_info.replica_config.actor_init_args,
backend_info.backend_config, self.controller_name)
# TODO(edoakes): we should probably have a timeout here.
await replica_handle.ready.remote()
return replica_handle
def _start_routers_if_needed(self, http_host: str, http_port: str,
http_middlewares: List[Any]) -> None:
"""Start a router on every node if it doesn't already exist."""