From d6e243ad4651967ef7650f3a7a05f5bec9d9737b Mon Sep 17 00:00:00 2001 From: Ian Rodney Date: Sun, 20 Dec 2020 11:03:57 -0800 Subject: [PATCH] [serve] Refactor to full control loop design (#12537) --- python/ray/serve/controller.py | 195 ++++++++++++++++++++++----------- 1 file changed, 132 insertions(+), 63 deletions(-) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index cba1ec64b..17a543048 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -155,6 +155,24 @@ class ActorStateReconciler: default_factory=lambda: defaultdict(list)) backends_to_remove: List[BackendTag] = field(default_factory=list) + # NOTE(ilr): These are not checkpointed, but will be recreated by + # `_enqueue_pending_scale_changes_loop`. + currently_starting_replicas: Dict[asyncio.Future, Tuple[ + BackendTag, ReplicaTag, ActorHandle]] = field(default_factory=dict) + currently_stopping_replicas: Dict[asyncio.Future, Tuple[ + BackendTag, ReplicaTag]] = field(default_factory=dict) + + def __getstate__(self): + state = self.__dict__.copy() + del state["currently_stopping_replicas"] + del state["currently_starting_replicas"] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + self.currently_stopping_replicas = {} + self.currently_starting_replicas = {} + # TODO(edoakes): consider removing this and just using the names. def http_proxy_handles(self) -> List[ActorHandle]: @@ -174,42 +192,6 @@ class ActorStateReconciler: for replica_dict in self.backend_replicas.values() ])) - async def _start_pending_backend_replicas( - self, current_state: SystemState) -> None: - """Starts the pending backend replicas in self.backend_replicas_to_start. - - Waits for replicas to start up, then removes them from - self.backend_replicas_to_start. 
- """ - fut_to_replica_info = {} - for backend_tag, replicas_to_create in self.backend_replicas_to_start.\ - items(): - for replica_tag in replicas_to_create: - replica_handle = await self._start_backend_replica( - current_state, backend_tag, replica_tag) - ready_future = replica_handle.ready.remote().as_future() - fut_to_replica_info[ready_future] = (backend_tag, replica_tag, - replica_handle) - - start = time.time() - prev_warning = start - while fut_to_replica_info: - if time.time() - prev_warning > REPLICA_STARTUP_TIME_WARNING_S: - prev_warning = time.time() - logger.warning("Waited {:.2f}s for replicas to start up. Make " - "sure there are enough resources to create the " - "replicas.".format(time.time() - start)) - - done, pending = await asyncio.wait( - list(fut_to_replica_info.keys()), timeout=1) - for fut in done: - (backend_tag, replica_tag, - replica_handle) = fut_to_replica_info.pop(fut) - self.backend_replicas[backend_tag][ - replica_tag] = replica_handle - - self.backend_replicas_to_start.clear() - async def _start_backend_replica(self, current_state: SystemState, backend_tag: BackendTag, replica_tag: ReplicaTag) -> ActorHandle: @@ -254,6 +236,7 @@ class ActorStateReconciler: intended replicas. This avoids inconsistencies with starting/stopping a replica and then crashing before writing a checkpoint. """ + logger.debug("Scaling backend '{}' to {} replicas".format( backend_tag, num_replicas)) assert (backend_tag in backends @@ -300,32 +283,102 @@ class ActorStateReconciler: self.backend_replicas_to_stop[backend_tag].append(replica_tag) - async def _stop_pending_backend_replicas(self) -> None: - """Stops the pending backend replicas in self.backend_replicas_to_stop. 
+ async def _enqueue_pending_scale_changes_loop(self, + current_state: SystemState): + for backend_tag, replicas_to_create in self.backend_replicas_to_start.\ + items(): + for replica_tag in replicas_to_create: + replica_handle = await self._start_backend_replica( + current_state, backend_tag, replica_tag) + ready_future = replica_handle.ready.remote().as_future() + self.currently_starting_replicas[ready_future] = ( + backend_tag, replica_tag, replica_handle) - Removes backend_replicas from the http_proxy, kills them, and clears - self.backend_replicas_to_stop. - """ - for backend_tag, replicas_list in self.backend_replicas_to_stop.items( - ): - for replica_tag in replicas_list: - # NOTE(edoakes): the replicas may already be stopped if we - # failed after stopping them but before writing a checkpoint. + for backend_tag, replicas_to_stop in self.backend_replicas_to_stop.\ + items(): + for replica_tag in replicas_to_stop: replica_name = format_actor_name(replica_tag, self.controller_name) - try: - replica = ray.get_actor(replica_name) - except ValueError: - continue - # TODO(edoakes): this logic isn't ideal because there may be - # pending tasks still executing on the replica. However, if we - # use replica.__ray_terminate__, we may send it while the - # replica is being restarted and there's no way to tell if it - # successfully killed the worker or not. - ray.kill(replica, no_restart=True) + async def kill_actor(replica_name_to_use): + # NOTE: the replicas may already be stopped if we failed + # after stopping them but before writing a checkpoint. + try: + replica = ray.get_actor(replica_name_to_use) + except ValueError: + return - self.backend_replicas_to_stop.clear() + # TODO(edoakes): this logic isn't ideal because there may + # be pending tasks still executing on the replica. However, + # if we use replica.__ray_terminate__, we may send it while + # the replica is being restarted and there's no way to tell + # if it successfully killed the worker or not. 
+ ray.kill(replica, no_restart=True) + + self.currently_stopping_replicas[asyncio.ensure_future( + kill_actor(replica_name))] = (backend_tag, replica_tag) + + async def _check_currently_starting_replicas(self) -> bool: + """Returns a boolean specifying if there are more replicas to start""" + in_flight = list() + + if self.currently_starting_replicas: + done, in_flight = await asyncio.wait( + list(self.currently_starting_replicas.keys()), timeout=0) + for fut in done: + (backend_tag, replica_tag, + replica_handle) = self.currently_starting_replicas.pop(fut) + self.backend_replicas[backend_tag][ + replica_tag] = replica_handle + + backend = self.backend_replicas_to_start.get(backend_tag) + if backend: + try: + backend.remove(replica_tag) + except ValueError: + pass + if len(backend) == 0: + del self.backend_replicas_to_start[backend_tag] + return len(in_flight) > 0 + + async def _check_currently_stopping_replicas(self) -> bool: + """Returns a boolean specifying if there are more replicas to stop""" + in_flight = list() + if self.currently_stopping_replicas: + done_stoppping, in_flight = await asyncio.wait( + list(self.currently_stopping_replicas.keys()), timeout=0) + for fut in done_stoppping: + (backend_tag, + replica_tag) = self.currently_stopping_replicas.pop(fut) + + backend = self.backend_replicas_to_stop.get(backend_tag) + + if backend: + try: + backend.remove(replica_tag) + except ValueError: + pass + if len(backend) == 0: + del self.backend_replicas_to_stop[backend_tag] + + return len(in_flight) > 0 + + async def backend_control_loop(self): + start = time.time() + prev_warning = start + need_to_continue = True + while need_to_continue: + if time.time() - prev_warning > REPLICA_STARTUP_TIME_WARNING_S: + prev_warning = time.time() + logger.warning("Waited {:.2f}s for replicas to start up. 
Make " + "sure there are enough resources to create the " + "replicas.".format(time.time() - start)) + + need_to_continue = ( + await self._check_currently_starting_replicas() + or await self._check_currently_stopping_replicas()) + + await asyncio.sleep(1) def _start_http_proxies_if_needed(self, http_host: str, http_port: str, http_middlewares: List[Any]) -> None: @@ -415,8 +468,8 @@ class ActorStateReconciler: backend, metadata.autoscaling_config) # Start/stop any pending backend replicas. - await self._start_pending_backend_replicas(current_state) - await self._stop_pending_backend_replicas() + await self._enqueue_pending_scale_changes_loop(current_state) + await self.backend_control_loop() return autoscaling_policies @@ -671,6 +724,12 @@ class ServeController: await self.update_backend_config( backend, BackendConfig(num_replicas=new_num_replicas)) + async def reconcile_current_and_goal_backends(self): + pass + # backends_to_delete = set( + # self.current_state.backends.keys()).difference( + # self.goal_state.backends.keys()) + async def run_control_loop(self) -> None: while True: await self.do_autoscale() @@ -872,6 +931,7 @@ class ServeController: backend_tag, metadata.autoscaling_config) try: + # This call should be to run control loop self.actor_reconciler._scale_backend_replicas( self.current_state.backends, backend_tag, backend_config.num_replicas) @@ -886,8 +946,9 @@ class ServeController: # or pushing the updated config to avoid inconsistent state if we # crash while making the change. self._checkpoint() - await self.actor_reconciler._start_pending_backend_replicas( + await self.actor_reconciler._enqueue_pending_scale_changes_loop( self.current_state) + await self.actor_reconciler.backend_control_loop() self.notify_replica_handles_changed() @@ -916,6 +977,10 @@ class ServeController: # Scale its replicas down to 0. This will also remove the backend # from self.current_state.backends and # self.actor_reconciler.backend_replicas. 
+ + self.goal_state.backends[backend_tag] = None + + # This should be a call to the control loop self.actor_reconciler._scale_backend_replicas( self.current_state.backends, backend_tag, 0) @@ -932,7 +997,9 @@ class ServeController: # backend from the routers to avoid inconsistent state if we crash # after pushing the update. self._checkpoint() - await self.actor_reconciler._stop_pending_backend_replicas() + await self.actor_reconciler._enqueue_pending_scale_changes_loop( + self.current_state) + await self.actor_reconciler.backend_control_loop() self.notify_replica_handles_changed() return return_uuid @@ -955,6 +1022,8 @@ class ServeController: backend_info = self.current_state.get_backend(backend_tag) # Scale the replicas with the new configuration. + + # This should be to run the control loop self.actor_reconciler._scale_backend_replicas( self.current_state.backends, backend_tag, backend_config.num_replicas) @@ -970,9 +1039,9 @@ class ServeController: # Inform the routers about change in configuration # (particularly for setting max_batch_size). - await self.actor_reconciler._start_pending_backend_replicas( + await self.actor_reconciler._enqueue_pending_scale_changes_loop( self.current_state) - await self.actor_reconciler._stop_pending_backend_replicas() + await self.actor_reconciler.backend_control_loop() self.notify_replica_handles_changed() self.notify_backend_configs_changed()