From 4ada3e4c9952ffd6f573f9813a0231bafc6cf17c Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 24 Nov 2020 13:06:08 -0600 Subject: [PATCH] [serve] Incremental change towards async control loop for replica startup (#12281) --- python/ray/serve/controller.py | 92 ++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index c0a7d5da5..fb3d1f840 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -36,6 +36,8 @@ _RESOURCE_CHECK_ENABLED = True # How often to call the control loop on the controller. CONTROL_LOOP_PERIOD_S = 1.0 +REPLICA_STARTUP_TIME_WARNING_S = 5 + # TypeDefs BackendTag = str EndpointTag = str @@ -152,28 +154,48 @@ class ActorStateReconciler: self, config_store: ConfigurationStore) -> None: """Starts the pending backend replicas in self.backend_replicas_to_start. - Starts the replica, then pushes an update to the router to add it to - the proper backend. If the replica has already been started, only - updates the router. - - Clears self.backend_replicas_to_start. + Waits for replicas to start up, then removes them from + self.backend_replicas_to_start. """ - replica_started_futures = [] + fut_to_replica_info = {} for backend_tag, replicas_to_create in self.backend_replicas_to_start.\ items(): for replica_tag in replicas_to_create: - replica_started_futures.append( - self._start_backend_replicas(config_store, backend_tag, - replica_tag)) + replica_handle = await self._start_backend_replica( + config_store, backend_tag, replica_tag) + ready_future = replica_handle.ready.remote().as_future() + fut_to_replica_info[ready_future] = (backend_tag, replica_tag, + replica_handle) - # Wait on all creation task futures together. - await asyncio.gather(*replica_started_futures) + start = time.time() + prev_warning = start + while fut_to_replica_info: + if time.time() - prev_warning > REPLICA_STARTUP_TIME_WARNING_S: + prev_warning = time.time() + logger.warning("Waited {:.2f}s for replicas to start up. Make " + "sure there are enough resources to create the " + "replicas.".format(time.time() - start)) + + done, pending = await asyncio.wait( + list(fut_to_replica_info.keys()), timeout=1) + for fut in done: + (backend_tag, replica_tag, + replica_handle) = fut_to_replica_info.pop(fut) + self.backend_replicas[backend_tag][ + replica_tag] = replica_handle self.backend_replicas_to_start.clear() - async def _start_backend_replicas(self, config_store: ConfigurationStore, - backend_tag: BackendTag, - replica_tag: ReplicaTag) -> None: + async def _start_backend_replica(self, config_store: ConfigurationStore, + backend_tag: BackendTag, + replica_tag: ReplicaTag) -> ActorHandle: + """Start a replica and return its actor handle. + + Checks if the named actor already exists before starting a new one. + + Assumes that the backend configuration has already been registered + in the ConfigurationStore. + """ # NOTE(edoakes): the replicas may already be created if we # failed after creating them but before writing a # checkpoint. @@ -181,10 +203,21 @@ class ActorStateReconciler: try: replica_handle = ray.get_actor(replica_name) except ValueError: - replica_handle = await self._start_single_replica( - config_store, backend_tag, replica_tag, replica_name) + logger.debug("Starting replica '{}' for backend '{}'.".format( + replica_tag, backend_tag)) + backend_info = config_store.get_backend(backend_tag) - self.backend_replicas[backend_tag][replica_tag] = replica_handle + replica_handle = ray.remote(backend_info.worker_class).options( + name=replica_name, + lifetime="detached" if self.detached else None, + max_restarts=-1, + max_task_retries=-1, + **backend_info.replica_config.ray_actor_options).remote( + backend_tag, replica_tag, + backend_info.replica_config.actor_init_args, + backend_info.backend_config, self.controller_name) + + return replica_handle def _scale_backend_replicas(self, backends: Dict[BackendTag, BackendInfo], backend_tag: BackendTag, @@ -271,31 +304,6 @@ class ActorStateReconciler: self.backend_replicas_to_stop.clear() - async def _start_single_replica( - self, config_store: ConfigurationStore, backend_tag: BackendTag, - replica_tag: ReplicaTag, replica_name: str) -> ActorHandle: - """Creates a backend replica and waits for it to start up. - - Assumes that the backend configuration has already been registered - in the ConfigurationStore. - """ - logger.debug("Starting replica '{}' for backend '{}'.".format( - replica_tag, backend_tag)) - backend_info = config_store.get_backend(backend_tag) - - replica_handle = ray.remote(backend_info.worker_class).options( - name=replica_name, - lifetime="detached" if self.detached else None, - max_restarts=-1, - max_task_retries=-1, - **backend_info.replica_config.ray_actor_options).remote( - backend_tag, replica_tag, - backend_info.replica_config.actor_init_args, - backend_info.backend_config, self.controller_name) - # TODO(edoakes): we should probably have a timeout here. - await replica_handle.ready.remote() - return replica_handle - def _start_routers_if_needed(self, http_host: str, http_port: str, http_middlewares: List[Any]) -> None: """Start a router on every node if it doesn't already exist."""