From a695c651ee360f5ea70596d9f3286bc3d0d052c6 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 3 Feb 2021 11:46:25 -0600 Subject: [PATCH] [serve] Small cleanups for BackendState (#13870) --- python/ray/serve/backend_state.py | 53 +++++++------------------------ python/ray/serve/controller.py | 2 +- 2 files changed, 12 insertions(+), 43 deletions(-) diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py index 4aad2671e..418ab3b2a 100644 --- a/python/ray/serve/backend_state.py +++ b/python/ray/serve/backend_state.py @@ -347,40 +347,10 @@ class BackendState: return new_goal_id - def _start_backend_replica(self, backend_tag: BackendTag, - replica_tag: ReplicaTag) -> ActorHandle: - """Start a replica and return its actor handle. - - Checks if the named actor already exists before starting a new one. - - Assumes that the backend configuration is already in the Goal State. - """ - # NOTE(edoakes): the replicas may already be created if we - # failed after creating them but before writing a - # checkpoint. - replica_name = format_actor_name(replica_tag, self._controller_name) - try: - replica_handle = ray.get_actor(replica_name) - except ValueError: - logger.debug("Starting replica '{}' for backend '{}'.".format( - replica_tag, backend_tag)) - backend_info = self.get_backend(backend_tag) - - replica_handle = ray.remote(backend_info.worker_class).options( - name=replica_name, - lifetime="detached" if self._detached else None, - max_restarts=-1, - max_task_retries=-1, - **backend_info.replica_config.ray_actor_options).remote( - backend_tag, replica_tag, - backend_info.replica_config.actor_init_args, - backend_info.backend_config, self._controller_name) - - return replica_handle - - def scale_backend_replicas( + def _scale_backend_replicas( self, backend_tag: BackendTag, + num_replicas: int, ) -> bool: """Scale the given backend to the number of replicas. @@ -391,8 +361,6 @@ class BackendState: inconsistencies with starting/stopping a replica and then crashing before writing a checkpoint. """ - num_replicas = self._target_replicas.get(backend_tag, 0) - logger.debug("Scaling backend '{}' to {} replicas".format( backend_tag, num_replicas)) assert (backend_tag in self._backend_metadata @@ -461,11 +429,11 @@ class BackendState: return True - def scale_all_backends(self): + def _scale_all_backends(self): checkpoint_needed = False for backend_tag, num_replicas in list(self._target_replicas.items()): - checkpoint_needed = (checkpoint_needed - or self.scale_backend_replicas(backend_tag)) + checkpoint_needed |= self._scale_backend_replicas( + backend_tag, num_replicas) if num_replicas == 0: del self._backend_metadata[backend_tag] del self._target_replicas[backend_tag] @@ -501,23 +469,24 @@ class BackendState: or state_dict.get(ReplicaState.STOPPING)): continue - # TODO(ilr): FIX - # Check for deleting + # Check for deleting. if (not desired_num_replicas or desired_num_replicas == 0) and \ (not existing_info or len(existing_info) == 0): completed_goals.append( self.backend_goals.pop(backend_tag, None)) - # Check for a non-zero number of backends + # Check for a non-zero number of backends. if (desired_num_replicas and existing_info) \ and desired_num_replicas == len(existing_info): completed_goals.append( self.backend_goals.pop(backend_tag, None)) return [goal for goal in completed_goals if goal] - async def update(self) -> bool: - self.scale_all_backends() + def update(self) -> bool: + """Updates the state of all running replicas to match the goal state. + """ + self._scale_all_backends() for goal_id in self._completed_goals(): self._goal_manager.complete_goal(goal_id) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index b5c65111a..0ad444a54 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -111,7 +111,7 @@ class ServeController: while True: async with self.write_lock: self.http_state.update() - await self.backend_state.update() + self.backend_state.update() await asyncio.sleep(CONTROL_LOOP_PERIOD_S)