diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index b99832319..d8a54724c 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -149,7 +149,7 @@ class HTTPProxy: if not isinstance(result, ray.exceptions.RayActorError): await Response(result).send(scope, receive, send) break - logger.warning("Got RayActorError:", str(result)) + logger.warning("Got RayActorError: {}".format(str(result))) await asyncio.sleep(0.1) except Exception as e: error_message = "Internal Error. Traceback: {}.".format(e) diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index a5ace3a50..89e0582b9 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -1,3 +1,8 @@ +import asyncio +from collections import defaultdict +from functools import wraps +import inspect + import ray from ray.serve.backend_config import BackendConfig from ray.serve.constants import ASYNC_CONCURRENCY @@ -7,11 +12,44 @@ from ray.serve.kv_store_service import (BackendTable, RoutingTable, TrafficPolicyTable) from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop) from ray.serve.backend_worker import create_backend_worker -from ray.serve.utils import expand, get_random_letters +from ray.serve.utils import expand, get_random_letters, logger import numpy as np +def async_retryable(cls): + """Make all actor method invocations on the class retryable. + + Note: This will retry actor_handle.method_name.remote(), but it must + be invoked in an async context. + + Usage: + @ray.remote(max_reconstructions=10000) + @async_retryable + class A: + pass + """ + for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): + + def decorate_with_retry(f): + @wraps(f) + async def retry_method(*args, **kwargs): + while True: + result = await f(*args, **kwargs) + if isinstance(result, ray.exceptions.RayActorError): + logger.warning( + "Actor method '{}' failed, retrying after 100ms.". + format(name)) + await asyncio.sleep(0.1) + else: + return result + + return retry_method + + method.__ray_invocation_decorator__ = decorate_with_retry + return cls + + @ray.remote class ServeMaster: """Initialize and store all actor handles. @@ -27,7 +65,8 @@ class ServeMaster: self.route_table = RoutingTable(kv_store_connector) self.backend_table = BackendTable(kv_store_connector) self.policy_table = TrafficPolicyTable(kv_store_connector) - self.replica_tag_to_workers = dict() + # Dictionary of backend tag to dictionaries of replica tag to worker. + self.workers = defaultdict(dict) self.router = None self.http_proxy = None @@ -38,8 +77,10 @@ class ServeMaster: def start_router(self, router_class, init_kwargs): assert self.router is None, "Router already started." - self.router = router_class.options( - max_concurrency=ASYNC_CONCURRENCY).remote(**init_kwargs) + self.router = async_retryable(router_class).options( + max_concurrency=ASYNC_CONCURRENCY, + max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION, + ).remote(**init_kwargs) def get_router(self): assert self.router is not None, "Router not started yet." @@ -54,7 +95,7 @@ class ServeMaster: assert self.http_proxy is None, "HTTP proxy already started." assert self.router is not None, ( "Router must be started before HTTP proxy.") - self.http_proxy = HTTPProxyActor.options( + self.http_proxy = async_retryable(HTTPProxyActor).options( max_concurrency=ASYNC_CONCURRENCY, max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION, ).remote(host, port) @@ -103,7 +144,7 @@ class ServeMaster: await self._start_backend_replica(backend_tag) elif delta_num_replicas < 0: for _ in range(-delta_num_replicas): - self._remove_backend_replica(backend_tag) + await self._remove_backend_replica(backend_tag) async def get_backend_worker_config(self): return self.get_router() @@ -118,6 +159,7 @@ class ServeMaster: # TODO(edoakes): we should guarantee that if calls to the master # succeed, the cluster state has changed and if they fail, it hasn't. # Once we have master actor fault tolerance, this breaks that guarantee + # because this method could fail after writing the replica to the DB. self.backend_table.add_replica(backend_tag, replica_tag) @@ -136,17 +178,18 @@ class ServeMaster: # Start the worker. worker_handle = backend_actor._remote(**kwargs) - self.replica_tag_to_workers[replica_tag] = worker_handle + self.workers[backend_tag][replica_tag] = worker_handle # Wait for the worker to start up. await worker_handle.ready.remote() - await self.get_router()[0].add_new_worker.remote( - backend_tag, worker_handle) + + [router] = self.get_router() + await router.add_new_worker.remote(backend_tag, worker_handle) # Register the worker with the metric monitor. self.get_metric_monitor()[0].add_target.remote(worker_handle) - def _remove_backend_replica(self, backend_tag): + async def _remove_backend_replica(self, backend_tag): assert (backend_tag in self.backend_table.list_backends() ), "Backend {} is not registered.".format(backend_tag) assert (len(self._list_replicas(backend_tag)) > @@ -154,28 +197,29 @@ class ServeMaster: backend_tag) replica_tag = self.backend_table.remove_replica(backend_tag) - assert replica_tag in self.replica_tag_to_workers - replica_handle = self.replica_tag_to_workers.pop(replica_tag) + assert backend_tag in self.workers + assert replica_tag in self.workers[backend_tag] + replica_handle = self.workers[backend_tag].pop(replica_tag) + if len(self.workers[backend_tag]) == 0: + del self.workers[backend_tag] # Remove the replica from metric monitor. [monitor] = self.get_metric_monitor() - ray.get(monitor.remove_target.remote(replica_handle)) + await monitor.remove_target.remote(replica_handle) # Remove the replica from router. # This will also destroy the actor handle. [router] = self.get_router() - ray.get( - router.remove_and_destroy_replica.remote(backend_tag, - replica_handle)) + await router.remove_worker.remote(backend_tag, replica_handle) def get_all_worker_handles(self): - return self.replica_tag_to_workers + return self.workers def get_all_endpoints(self): return expand( self.route_table.list_service(include_headless=True).values()) - def split_traffic(self, endpoint_name, traffic_policy_dictionary): + async def split_traffic(self, endpoint_name, traffic_policy_dictionary): assert endpoint_name in expand( self.route_table.list_service(include_headless=True).values()) @@ -193,18 +237,18 @@ class ServeMaster: self.policy_table.register_traffic_policy(endpoint_name, traffic_policy_dictionary) [router] = self.get_router() - ray.get( - router.set_traffic.remote(endpoint_name, - traffic_policy_dictionary)) - def create_endpoint(self, route, endpoint_name, methods): + await router.set_traffic.remote(endpoint_name, + traffic_policy_dictionary) + + async def create_endpoint(self, route, endpoint_name, methods): self.route_table.register_service( route, endpoint_name, methods=methods) [http_proxy] = self.get_http_proxy() - ray.get( - http_proxy.set_route_table.remote( - self.route_table.list_service( - include_methods=True, include_headless=False))) + + await http_proxy.set_route_table.remote( + self.route_table.list_service( + include_methods=True, include_headless=False)) async def create_backend(self, backend_tag, backend_config, func_or_class, actor_init_args): @@ -223,8 +267,9 @@ class ServeMaster: # Set the backend config inside the router # (particularly for max-batch-size). [router] = self.get_router() - ray.get( - router.set_backend_config.remote(backend_tag, backend_config_dict)) + await router.set_backend_config.remote(backend_tag, + backend_config_dict) + await self.scale_replicas(backend_tag, backend_config_dict["num_replicas"]) @@ -246,8 +291,8 @@ class ServeMaster: # Inform the router about change in configuration # (particularly for setting max_batch_size). [router] = self.get_router() - ray.get( - router.set_backend_config.remote(backend_tag, backend_config_dict)) + await router.set_backend_config.remote(backend_tag, + backend_config_dict) # Restart replicas if there is a change in the backend config related # to restart_configs. diff --git a/python/ray/serve/policy.py b/python/ray/serve/policy.py index 20e53533f..eb6443c6d 100644 --- a/python/ray/serve/policy.py +++ b/python/ray/serve/policy.py @@ -139,12 +139,12 @@ class FixedPackingPolicyQueue(Router): """ - def __init__(self, packing_num=3): + async def __init__(self, packing_num=3): # Saves the information about last assigned # backend for every service self.fixed_packing_iterator_map = {} self.packing_num = packing_num - super().__init__() + await super().__init__() async def set_traffic(self, service, traffic_dict): logger.debug("Setting traffic for service %s to %s", service, diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index 2f1519f8e..9031343ac 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -2,7 +2,6 @@ import asyncio import copy from collections import defaultdict from typing import DefaultDict, List -import ray.cloudpickle as pickle # Note on choosing blist instead of stdlib heapq # 1. pop operation should be O(1) (amortized) @@ -13,6 +12,7 @@ import ray.cloudpickle as pickle import blist import ray +import ray.cloudpickle as pickle from ray.serve.utils import logger @@ -110,7 +110,7 @@ class Router: 3. When there is only 1 backend ready, we will only use that backend. """ - def __init__(self): + async def __init__(self): # Note: Several queues are used in the router # - When a request come in, it's placed inside its corresponding # service_queue. @@ -150,6 +150,16 @@ class Router: # batching polcies. self.flush_lock = asyncio.Lock() + # Fetch the worker handles from the master actor. We use a "pull-based" + # approach instead of pushing them from the master so that the router + # can transparently recover from failure. + ray.serve.init() + master_actor = ray.serve.api._get_master_actor() + backend_dict = ray.get(master_actor.get_all_worker_handles.remote()) + for backend, replica_dict in backend_dict.items(): + for worker in replica_dict.values(): + await self.add_new_worker(backend, worker) + def is_ready(self): return True @@ -197,21 +207,21 @@ class Router: await self.worker_queues[backend].put(worker_handle) await self.flush() - async def remove_and_destroy_replica(self, backend, replica_handle): + async def remove_worker(self, backend, worker_handle): # We need this lock because we modify worker_queue here. async with self.flush_lock: old_queue = self.worker_queues[backend] new_queue = asyncio.Queue() - target_id = replica_handle._actor_id + target_id = worker_handle._actor_id while not old_queue.empty(): - replica_handle = await old_queue.get() - if replica_handle._actor_id != target_id: - await new_queue.put(replica_handle) + worker_handle = await old_queue.get() + if worker_handle._actor_id != target_id: + await new_queue.put(worker_handle) self.worker_queues[backend] = new_queue - # TODO: consider await this with timeout, or use ray_kill - replica_handle.__ray_terminate__.remote() + # TODO: consider awaiting this on a timeout or using ray.kill(). + worker_handle.__ray_terminate__.remote() async def link(self, service, backend): logger.debug("Link %s with %s", service, backend) diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index ec6f75062..b85bc887e 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -192,8 +192,10 @@ def test_killing_replicas(serve_instance): serve.set_backend_config("simple:v1", bnew_config) new_replica_tag_list = ray.get( master_actor._list_replicas.remote("simple:v1")) - new_all_tag_list = list( - ray.get(master_actor.get_all_worker_handles.remote()).keys()) + new_all_tag_list = [] + for worker_dict in ray.get( + master_actor.get_all_worker_handles.remote()).values(): + new_all_tag_list.extend(list(worker_dict.keys())) # the new_replica_tag_list must be subset of all_tag_list assert set(new_replica_tag_list) <= set(new_all_tag_list) @@ -226,8 +228,10 @@ def test_not_killing_replicas(serve_instance): serve.set_backend_config("bsimple:v1", bnew_config) new_replica_tag_list = ray.get( master_actor._list_replicas.remote("bsimple:v1")) - new_all_tag_list = list( - ray.get(master_actor.get_all_worker_handles.remote()).keys()) + new_all_tag_list = [] + for worker_dict in ray.get( + master_actor.get_all_worker_handles.remote()).values(): + new_all_tag_list.extend(list(worker_dict.keys())) # the old and new replica tag list should be identical # and should be subset of all_tag_list diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index f9b40f98f..8d09a1f8b 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -54,14 +54,45 @@ def test_http_proxy_failure(serve_instance): assert response.text == "hello2" -def _get_worker_handles(backend): - handles = {} - for tag, handle in ray.get(serve.api._get_master_actor() - .get_all_worker_handles.remote()).items(): - if tag.startswith(backend): - handles[tag] = handle +def _kill_router(): + [router] = ray.get(serve.api._get_master_actor().get_router.remote()) + ray.kill(router) - return handles + +def test_router_failure(serve_instance): + serve.init() + serve.create_endpoint("router_failure", "/router_failure", methods=["GET"]) + + def function(): + return "hello1" + + serve.create_backend(function, "router_failure:v1") + serve.link("router_failure", "router_failure:v1") + + assert request_with_retries("/router_failure", timeout=5).text == "hello1" + + for _ in range(10): + response = request_with_retries("/router_failure", timeout=30) + assert response.text == "hello1" + + _kill_router() + + def function(): + return "hello2" + + serve.create_backend(function, "router_failure:v2") + serve.link("router_failure", "router_failure:v2") + + for _ in range(10): + response = request_with_retries("/router_failure", timeout=30) + assert response.text == "hello2" + + +def _get_worker_handles(backend): + master_actor = serve.api._get_master_actor() + backend_dict = ray.get(master_actor.get_all_worker_handles.remote()) + + return list(backend_dict[backend].values()) # Test that a worker dying unexpectedly causes it to restart and continue @@ -83,7 +114,7 @@ def test_worker_restart(serve_instance): # Kill the worker. handles = _get_worker_handles("worker_failure:v1") assert len(handles) == 1 - ray.kill(list(handles.values())[0]) + ray.kill(handles[0]) # Wait until the worker is killed and a one is started. start = time.time() @@ -145,7 +176,7 @@ def test_worker_replica_failure(serve_instance): # Kill one of the replicas. handles = _get_worker_handles("replica_failure") assert len(handles) == 2 - ray.kill(list(handles.values())[0]) + ray.kill(handles[0]) # Check that the other replica still serves requests. for _ in range(10): diff --git a/python/ray/serve/tests/test_router.py b/python/ray/serve/tests/test_router.py index 0fe403b25..b737adf51 100644 --- a/python/ray/serve/tests/test_router.py +++ b/python/ray/serve/tests/test_router.py @@ -180,8 +180,13 @@ async def test_power_of_two_choices(serve_instance): async def test_queue_remove_replicas(serve_instance): + @ray.remote + class TestRandomPolicyQueueActor(RandomPolicyQueue): + def worker_queue_size(self, backend): + return self.worker_queues["backend"].qsize() + temp_actor = make_task_runner_mock() - q = RandomPolicyQueue() - await q.add_new_worker("backend", temp_actor) - await q.remove_and_destroy_replica("backend", temp_actor) - assert q.worker_queues["backend"].qsize() == 0 + q = TestRandomPolicyQueueActor.remote() + await q.add_new_worker.remote("backend", temp_actor) + await q.remove_worker.remote("backend", temp_actor) + assert ray.get(q.worker_queue_size.remote("backend")) == 0