[serve] Router fault tolerance (#8008)

This commit is contained in:
Edward Oakes
2020-04-19 11:04:06 -05:00
committed by GitHub
parent 165a86f1ab
commit da296bf8c5
7 changed files with 154 additions and 59 deletions
+1 -1
View File
@@ -149,7 +149,7 @@ class HTTPProxy:
if not isinstance(result, ray.exceptions.RayActorError):
await Response(result).send(scope, receive, send)
break
logger.warning("Got RayActorError:", str(result))
logger.warning("Got RayActorError: {}".format(str(result)))
await asyncio.sleep(0.1)
except Exception as e:
error_message = "Internal Error. Traceback: {}.".format(e)
+75 -30
View File
@@ -1,3 +1,8 @@
import asyncio
from collections import defaultdict
from functools import wraps
import inspect
import ray
from ray.serve.backend_config import BackendConfig
from ray.serve.constants import ASYNC_CONCURRENCY
@@ -7,11 +12,44 @@ from ray.serve.kv_store_service import (BackendTable, RoutingTable,
TrafficPolicyTable)
from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop)
from ray.serve.backend_worker import create_backend_worker
from ray.serve.utils import expand, get_random_letters
from ray.serve.utils import expand, get_random_letters, logger
import numpy as np
def async_retryable(cls):
"""Make all actor method invocations on the class retryable.
Note: This will retry actor_handle.method_name.remote(), but it must
be invoked in an async context.
Usage:
@ray.remote(max_reconstructions=10000)
@async_retryable
class A:
pass
"""
for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
def decorate_with_retry(f):
@wraps(f)
async def retry_method(*args, **kwargs):
while True:
result = await f(*args, **kwargs)
if isinstance(result, ray.exceptions.RayActorError):
logger.warning(
"Actor method '{}' failed, retrying after 100ms.".
format(name))
await asyncio.sleep(0.1)
else:
return result
return retry_method
method.__ray_invocation_decorator__ = decorate_with_retry
return cls
@ray.remote
class ServeMaster:
"""Initialize and store all actor handles.
@@ -27,7 +65,8 @@ class ServeMaster:
self.route_table = RoutingTable(kv_store_connector)
self.backend_table = BackendTable(kv_store_connector)
self.policy_table = TrafficPolicyTable(kv_store_connector)
self.replica_tag_to_workers = dict()
# Dictionary of backend tag to dictionaries of replica tag to worker.
self.workers = defaultdict(dict)
self.router = None
self.http_proxy = None
@@ -38,8 +77,10 @@ class ServeMaster:
def start_router(self, router_class, init_kwargs):
assert self.router is None, "Router already started."
self.router = router_class.options(
max_concurrency=ASYNC_CONCURRENCY).remote(**init_kwargs)
self.router = async_retryable(router_class).options(
max_concurrency=ASYNC_CONCURRENCY,
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
).remote(**init_kwargs)
def get_router(self):
assert self.router is not None, "Router not started yet."
@@ -54,7 +95,7 @@ class ServeMaster:
assert self.http_proxy is None, "HTTP proxy already started."
assert self.router is not None, (
"Router must be started before HTTP proxy.")
self.http_proxy = HTTPProxyActor.options(
self.http_proxy = async_retryable(HTTPProxyActor).options(
max_concurrency=ASYNC_CONCURRENCY,
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
).remote(host, port)
@@ -103,7 +144,7 @@ class ServeMaster:
await self._start_backend_replica(backend_tag)
elif delta_num_replicas < 0:
for _ in range(-delta_num_replicas):
self._remove_backend_replica(backend_tag)
await self._remove_backend_replica(backend_tag)
async def get_backend_worker_config(self):
return self.get_router()
@@ -118,6 +159,7 @@ class ServeMaster:
# TODO(edoakes): we should guarantee that if calls to the master
# succeed, the cluster state has changed and if they fail, it hasn't.
# Once we have master actor fault tolerance, this breaks that guarantee
# because this method could fail after writing the replica to the DB.
self.backend_table.add_replica(backend_tag, replica_tag)
@@ -136,17 +178,18 @@ class ServeMaster:
# Start the worker.
worker_handle = backend_actor._remote(**kwargs)
self.replica_tag_to_workers[replica_tag] = worker_handle
self.workers[backend_tag][replica_tag] = worker_handle
# Wait for the worker to start up.
await worker_handle.ready.remote()
await self.get_router()[0].add_new_worker.remote(
backend_tag, worker_handle)
[router] = self.get_router()
await router.add_new_worker.remote(backend_tag, worker_handle)
# Register the worker with the metric monitor.
self.get_metric_monitor()[0].add_target.remote(worker_handle)
def _remove_backend_replica(self, backend_tag):
async def _remove_backend_replica(self, backend_tag):
assert (backend_tag in self.backend_table.list_backends()
), "Backend {} is not registered.".format(backend_tag)
assert (len(self._list_replicas(backend_tag)) >
@@ -154,28 +197,29 @@ class ServeMaster:
backend_tag)
replica_tag = self.backend_table.remove_replica(backend_tag)
assert replica_tag in self.replica_tag_to_workers
replica_handle = self.replica_tag_to_workers.pop(replica_tag)
assert backend_tag in self.workers
assert replica_tag in self.workers[backend_tag]
replica_handle = self.workers[backend_tag].pop(replica_tag)
if len(self.workers[backend_tag]) == 0:
del self.workers[backend_tag]
# Remove the replica from metric monitor.
[monitor] = self.get_metric_monitor()
ray.get(monitor.remove_target.remote(replica_handle))
await monitor.remove_target.remote(replica_handle)
# Remove the replica from router.
# This will also destroy the actor handle.
[router] = self.get_router()
ray.get(
router.remove_and_destroy_replica.remote(backend_tag,
replica_handle))
await router.remove_worker.remote(backend_tag, replica_handle)
def get_all_worker_handles(self):
return self.replica_tag_to_workers
return self.workers
def get_all_endpoints(self):
return expand(
self.route_table.list_service(include_headless=True).values())
def split_traffic(self, endpoint_name, traffic_policy_dictionary):
async def split_traffic(self, endpoint_name, traffic_policy_dictionary):
assert endpoint_name in expand(
self.route_table.list_service(include_headless=True).values())
@@ -193,18 +237,18 @@ class ServeMaster:
self.policy_table.register_traffic_policy(endpoint_name,
traffic_policy_dictionary)
[router] = self.get_router()
ray.get(
router.set_traffic.remote(endpoint_name,
traffic_policy_dictionary))
def create_endpoint(self, route, endpoint_name, methods):
await router.set_traffic.remote(endpoint_name,
traffic_policy_dictionary)
async def create_endpoint(self, route, endpoint_name, methods):
self.route_table.register_service(
route, endpoint_name, methods=methods)
[http_proxy] = self.get_http_proxy()
ray.get(
http_proxy.set_route_table.remote(
self.route_table.list_service(
include_methods=True, include_headless=False)))
await http_proxy.set_route_table.remote(
self.route_table.list_service(
include_methods=True, include_headless=False))
async def create_backend(self, backend_tag, backend_config, func_or_class,
actor_init_args):
@@ -223,8 +267,9 @@ class ServeMaster:
# Set the backend config inside the router
# (particularly for max-batch-size).
[router] = self.get_router()
ray.get(
router.set_backend_config.remote(backend_tag, backend_config_dict))
await router.set_backend_config.remote(backend_tag,
backend_config_dict)
await self.scale_replicas(backend_tag,
backend_config_dict["num_replicas"])
@@ -246,8 +291,8 @@ class ServeMaster:
# Inform the router about change in configuration
# (particularly for setting max_batch_size).
[router] = self.get_router()
ray.get(
router.set_backend_config.remote(backend_tag, backend_config_dict))
await router.set_backend_config.remote(backend_tag,
backend_config_dict)
# Restart replicas if there is a change in the backend config related
# to restart_configs.
+2 -2
View File
@@ -139,12 +139,12 @@ class FixedPackingPolicyQueue(Router):
"""
def __init__(self, packing_num=3):
async def __init__(self, packing_num=3):
# Saves the information about last assigned
# backend for every service
self.fixed_packing_iterator_map = {}
self.packing_num = packing_num
super().__init__()
await super().__init__()
async def set_traffic(self, service, traffic_dict):
logger.debug("Setting traffic for service %s to %s", service,
+19 -9
View File
@@ -2,7 +2,6 @@ import asyncio
import copy
from collections import defaultdict
from typing import DefaultDict, List
import ray.cloudpickle as pickle
# Note on choosing blist instead of stdlib heapq
# 1. pop operation should be O(1) (amortized)
@@ -13,6 +12,7 @@ import ray.cloudpickle as pickle
import blist
import ray
import ray.cloudpickle as pickle
from ray.serve.utils import logger
@@ -110,7 +110,7 @@ class Router:
3. When there is only 1 backend ready, we will only use that backend.
"""
def __init__(self):
async def __init__(self):
# Note: Several queues are used in the router
# - When a request come in, it's placed inside its corresponding
# service_queue.
@@ -150,6 +150,16 @@ class Router:
# batching polcies.
self.flush_lock = asyncio.Lock()
# Fetch the worker handles from the master actor. We use a "pull-based"
# approach instead of pushing them from the master so that the router
# can transparently recover from failure.
ray.serve.init()
master_actor = ray.serve.api._get_master_actor()
backend_dict = ray.get(master_actor.get_all_worker_handles.remote())
for backend, replica_dict in backend_dict.items():
for worker in replica_dict.values():
await self.add_new_worker(backend, worker)
def is_ready(self):
return True
@@ -197,21 +207,21 @@ class Router:
await self.worker_queues[backend].put(worker_handle)
await self.flush()
async def remove_and_destroy_replica(self, backend, replica_handle):
async def remove_worker(self, backend, worker_handle):
# We need this lock because we modify worker_queue here.
async with self.flush_lock:
old_queue = self.worker_queues[backend]
new_queue = asyncio.Queue()
target_id = replica_handle._actor_id
target_id = worker_handle._actor_id
while not old_queue.empty():
replica_handle = await old_queue.get()
if replica_handle._actor_id != target_id:
await new_queue.put(replica_handle)
worker_handle = await old_queue.get()
if worker_handle._actor_id != target_id:
await new_queue.put(worker_handle)
self.worker_queues[backend] = new_queue
# TODO: consider await this with timeout, or use ray_kill
replica_handle.__ray_terminate__.remote()
# TODO: consider awaiting this on a timeout or using ray.kill().
worker_handle.__ray_terminate__.remote()
async def link(self, service, backend):
logger.debug("Link %s with %s", service, backend)
+8 -4
View File
@@ -192,8 +192,10 @@ def test_killing_replicas(serve_instance):
serve.set_backend_config("simple:v1", bnew_config)
new_replica_tag_list = ray.get(
master_actor._list_replicas.remote("simple:v1"))
new_all_tag_list = list(
ray.get(master_actor.get_all_worker_handles.remote()).keys())
new_all_tag_list = []
for worker_dict in ray.get(
master_actor.get_all_worker_handles.remote()).values():
new_all_tag_list.extend(list(worker_dict.keys()))
# the new_replica_tag_list must be subset of all_tag_list
assert set(new_replica_tag_list) <= set(new_all_tag_list)
@@ -226,8 +228,10 @@ def test_not_killing_replicas(serve_instance):
serve.set_backend_config("bsimple:v1", bnew_config)
new_replica_tag_list = ray.get(
master_actor._list_replicas.remote("bsimple:v1"))
new_all_tag_list = list(
ray.get(master_actor.get_all_worker_handles.remote()).keys())
new_all_tag_list = []
for worker_dict in ray.get(
master_actor.get_all_worker_handles.remote()).values():
new_all_tag_list.extend(list(worker_dict.keys()))
# the old and new replica tag list should be identical
# and should be subset of all_tag_list
+40 -9
View File
@@ -54,14 +54,45 @@ def test_http_proxy_failure(serve_instance):
assert response.text == "hello2"
def _get_worker_handles(backend):
handles = {}
for tag, handle in ray.get(serve.api._get_master_actor()
.get_all_worker_handles.remote()).items():
if tag.startswith(backend):
handles[tag] = handle
def _kill_router():
[router] = ray.get(serve.api._get_master_actor().get_router.remote())
ray.kill(router)
return handles
def test_router_failure(serve_instance):
serve.init()
serve.create_endpoint("router_failure", "/router_failure", methods=["GET"])
def function():
return "hello1"
serve.create_backend(function, "router_failure:v1")
serve.link("router_failure", "router_failure:v1")
assert request_with_retries("/router_failure", timeout=5).text == "hello1"
for _ in range(10):
response = request_with_retries("/router_failure", timeout=30)
assert response.text == "hello1"
_kill_router()
def function():
return "hello2"
serve.create_backend(function, "router_failure:v2")
serve.link("router_failure", "router_failure:v2")
for _ in range(10):
response = request_with_retries("/router_failure", timeout=30)
assert response.text == "hello2"
def _get_worker_handles(backend):
master_actor = serve.api._get_master_actor()
backend_dict = ray.get(master_actor.get_all_worker_handles.remote())
return list(backend_dict[backend].values())
# Test that a worker dying unexpectedly causes it to restart and continue
@@ -83,7 +114,7 @@ def test_worker_restart(serve_instance):
# Kill the worker.
handles = _get_worker_handles("worker_failure:v1")
assert len(handles) == 1
ray.kill(list(handles.values())[0])
ray.kill(handles[0])
# Wait until the worker is killed and a one is started.
start = time.time()
@@ -145,7 +176,7 @@ def test_worker_replica_failure(serve_instance):
# Kill one of the replicas.
handles = _get_worker_handles("replica_failure")
assert len(handles) == 2
ray.kill(list(handles.values())[0])
ray.kill(handles[0])
# Check that the other replica still serves requests.
for _ in range(10):
+9 -4
View File
@@ -180,8 +180,13 @@ async def test_power_of_two_choices(serve_instance):
async def test_queue_remove_replicas(serve_instance):
@ray.remote
class TestRandomPolicyQueueActor(RandomPolicyQueue):
def worker_queue_size(self, backend):
return self.worker_queues["backend"].qsize()
temp_actor = make_task_runner_mock()
q = RandomPolicyQueue()
await q.add_new_worker("backend", temp_actor)
await q.remove_and_destroy_replica("backend", temp_actor)
assert q.worker_queues["backend"].qsize() == 0
q = TestRandomPolicyQueueActor.remote()
await q.add_new_worker.remote("backend", temp_actor)
await q.remove_worker.remote("backend", temp_actor)
assert ray.get(q.worker_queue_size.remote("backend")) == 0