diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index f76dd59c4..cc3472949 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -354,24 +354,30 @@ class ServeMaster: async def _stop_pending_replicas(self): """Stops the pending backend replicas in self.replicas_to_stop. - Stops workers by telling the router to remove them. - - Clears self.replicas_to_stop. + Removes workers from the router, kills them, and clears + self.replicas_to_stop. """ for backend_tag, replicas_to_stop in self.replicas_to_stop.items(): for replica_tag in replicas_to_stop: # NOTE(edoakes): the replicas may already be stopped if we # failed after stopping them but before writing a checkpoint. try: - # Remove the replica from router. - # This will also submit __ray_terminate__ on the worker. - # NOTE(edoakes): we currently need to kill the worker from - # the router to guarantee that the router won't submit any - # more requests to it. - await self.router.remove_worker.remote( - backend_tag, replica_tag) + replica = ray.util.get_actor(replica_tag) except ValueError: - pass + continue + + # Remove the replica from router. This call is idempotent. + await self.router.remove_worker.remote(backend_tag, + replica_tag) + + # TODO(edoakes): this logic isn't ideal because there may be + # pending tasks still executing on the replica. However, if we + # use replica.__ray_terminate__, we may send it while the + # replica is being restarted and there's no way to tell if it + # successfully killed the worker or not. + worker = ray.worker.global_worker + # Kill the actor with no_restart=True. + worker.core_worker.kill_actor(replica._ray_actor_id, True) self.replicas_to_stop.clear() diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index ac4568e07..ca6987545 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -249,7 +249,7 @@ class Router: backend_replica_tag = backend_tag + ":" + replica_tag if backend_replica_tag not in self.replicas: return - worker_handle = self.replicas.pop(backend_replica_tag) + del self.replicas[backend_replica_tag] # We need this lock because we modify worker_queue here. async with self.flush_lock: @@ -262,10 +262,6 @@ class Router: await new_queue.put(curr_tag) self.worker_queues[backend_tag] = new_queue - # We need to terminate the worker here instead of from the master - # so we can guarantee that the router won't submit any more tasks - # on it. - worker_handle.__ray_terminate__.remote() async def set_traffic(self, endpoint, traffic_dict): logger.debug("Setting traffic for endpoint %s to %s", endpoint,