service -> endpoint in router (#8269)

This commit is contained in:
Edward Oakes
2020-05-01 11:55:34 -05:00
committed by GitHub
parent 421b3c9d8b
commit 63bc7dc522
4 changed files with 76 additions and 76 deletions
+2 -2
View File
@@ -366,7 +366,7 @@ class ServeMaster:
Clears self.endpoints_to_remove.
"""
for endpoint_tag in self.endpoints_to_remove:
await self.router.remove_service.remote(endpoint_tag)
await self.router.remove_endpoint.remote(endpoint_tag)
self.endpoints_to_remove.clear()
def _scale_replicas(self, backend_tag, num_replicas):
@@ -456,7 +456,7 @@ class ServeMaster:
be added to the HTTP proxy (can only be accessed via a handle).
"""
async with self.write_lock:
# If this is a headless service with no route, key the endpoint
# If this is a headless endpoint with no route, key the endpoint
# based on its name.
# TODO(edoakes): we should probably just store routes and endpoints
# separately.
+38 -38
View File
@@ -17,18 +17,18 @@ class RandomPolicyQueue(Router):
weights assigned to backends.
"""
async def _flush_service_queues(self):
async def _flush_endpoint_queues(self):
# perform traffic splitting for requests
for service, queue in self.service_queues.items():
for endpoint, queue in self.endpoint_queues.items():
# while there are incoming requests and there are backends
while queue.qsize() and len(self.traffic[service]):
backend_names = list(self.traffic[service].keys())
backend_weights = list(self.traffic[service].values())
while queue.qsize() and len(self.traffic[endpoint]):
backend_names = list(self.traffic[endpoint].keys())
backend_weights = list(self.traffic[endpoint].values())
# randomly choose a backend for every query
chosen_backend = np.random.choice(
backend_names, replace=False, p=backend_weights).squeeze()
logger.debug("Matching service {} to backend {}".format(
service, chosen_backend))
logger.debug("Matching endpoint {} to backend {}".format(
endpoint, chosen_backend))
request = await queue.get()
self.buffer_queues[chosen_backend].add(request)
@@ -44,34 +44,34 @@ class RoundRobinPolicyQueue(Router):
A wrapper class for RoundRobin policy. This backend selection policy
is `Stateful` meaning the current decisions of selecting backend are
dependent on previous decisions. RoundRobinPolicy assigns queries in
an interleaved manner to every backend serving for a service. Consider
backend A,B linked to a service. Now queries will be assigned to backends
an interleaved manner to every backend serving for an endpoint. Consider
backend A,B linked to a endpoint. Now queries will be assigned to backends
in the following order - [ A, B, A, B ... ] . This policy doesn't use the
weights assigned to backends.
"""
# Saves the information about last assigned
# backend for every service
# Saves the information about last assigned backend for every endpoint.
round_robin_iterator_map = {}
async def set_traffic(self, service, traffic_dict):
logger.debug("Setting traffic for service %s to %s", service,
async def set_traffic(self, endpoint, traffic_dict):
logger.debug("Setting traffic for endpoint %s to %s", endpoint,
traffic_dict)
self.traffic[service] = traffic_dict
backend_names = list(self.traffic[service].keys())
self.round_robin_iterator_map[service] = itertools.cycle(backend_names)
self.traffic[endpoint] = traffic_dict
backend_names = list(self.traffic[endpoint].keys())
self.round_robin_iterator_map[endpoint] = itertools.cycle(
backend_names)
await self.flush()
async def _flush_service_queues(self):
async def _flush_endpoint_queues(self):
# perform traffic splitting for requests
for service, queue in self.service_queues.items():
for endpoint, queue in self.endpoint_queues.items():
# if there are incoming requests and there are backends
if queue.qsize() and len(self.traffic[service]):
if queue.qsize() and len(self.traffic[endpoint]):
while queue.qsize():
# choose the next backend available from persistent
# information
chosen_backend = next(
self.round_robin_iterator_map[service])
self.round_robin_iterator_map[endpoint])
request = await queue.get()
self.buffer_queues[chosen_backend].add(request)
@@ -91,14 +91,14 @@ class PowerOfTwoPolicyQueue(Router):
the weights assigned to backends.
"""
async def _flush_service_queues(self):
async def _flush_endpoint_queues(self):
# perform traffic splitting for requests
for service, queue in self.service_queues.items():
for endpoint, queue in self.endpoint_queues.items():
# while there are incoming requests and there are backends
while queue.qsize() and len(self.traffic[service]):
backend_names = list(self.traffic[service].keys())
backend_weights = list(self.traffic[service].values())
if len(self.traffic[service]) >= 2:
while queue.qsize() and len(self.traffic[endpoint]):
backend_names = list(self.traffic[endpoint].keys())
backend_weights = list(self.traffic[endpoint].values())
if len(self.traffic[endpoint]) >= 2:
# randomly pick 2 backends
backend1, backend2 = np.random.choice(
backend_names, 2, replace=False, p=backend_weights)
@@ -134,38 +134,38 @@ class FixedPackingPolicyQueue(Router):
on previous decisions. FixedPackingPolicy is k RoundRobin policy where
first packing_num queries are handled by 'backend-1' and next k queries are
handled by 'backend-2' and so on ... where 'backend-1' and 'backend-2' are
served by the same service. This policy doesn't use the weights assigned to
backends.
served by the same endpoint. This policy doesn't use the weights assigned
to backends.
"""
async def __init__(self, packing_num=3):
# Saves the information about last assigned
# backend for every service
# backend for every endpoint
self.fixed_packing_iterator_map = {}
self.packing_num = packing_num
await super().__init__()
async def set_traffic(self, service, traffic_dict):
logger.debug("Setting traffic for service %s to %s", service,
async def set_traffic(self, endpoint, traffic_dict):
logger.debug("Setting traffic for endpoint %s to %s", endpoint,
traffic_dict)
self.traffic[service] = traffic_dict
backend_names = list(self.traffic[service].keys())
self.fixed_packing_iterator_map[service] = itertools.cycle(
self.traffic[endpoint] = traffic_dict
backend_names = list(self.traffic[endpoint].keys())
self.fixed_packing_iterator_map[endpoint] = itertools.cycle(
itertools.chain.from_iterable(
itertools.repeat(x, self.packing_num) for x in backend_names))
await self.flush()
async def _flush_service_queues(self):
async def _flush_endpoint_queues(self):
# perform traffic splitting for requests
for service, queue in self.service_queues.items():
for endpoint, queue in self.endpoint_queues.items():
# if there are incoming requests and there are backends
if queue.qsize() and len(self.traffic[service]):
if queue.qsize() and len(self.traffic[endpoint]):
while queue.qsize():
# choose the next backend available from persistent
# information
chosen_backend = next(
self.fixed_packing_iterator_map[service])
self.fixed_packing_iterator_map[endpoint])
request = await queue.get()
self.buffer_queues[chosen_backend].add(request)
+6 -6
View File
@@ -5,10 +5,10 @@ import ray.cloudpickle as pickle
class RequestMetadata:
"""
Request Arguments required for enqueuing a request to the service
queue.
Request arguments required for enqueuing a request to the endpoint queue.
Args:
service(str): A registered service endpoint.
endpoint(str): A registered endpoint.
request_context(TaskContext): Context of a request.
request_slo_ms(float): Expected time for the query to get
completed.
@@ -17,20 +17,20 @@ class RequestMetadata:
"""
def __init__(self,
service,
endpoint,
request_context,
relative_slo_ms=None,
absolute_slo_ms=None,
call_method="__call__"):
self.service = service
self.endpoint = endpoint
self.request_context = request_context
self.relative_slo_ms = relative_slo_ms
self.absolute_slo_ms = absolute_slo_ms
self.call_method = call_method
def adjust_relative_slo_ms(self) -> float:
"""Normalize the input latency objective to absoluate timestamp.
"""Normalize the input latency objective to absolute timestamp.
"""
slo_ms = self.relative_slo_ms
+30 -30
View File
@@ -104,18 +104,18 @@ class Router:
async def __init__(self):
# Note: Several queues are used in the router
# - When a request come in, it's placed inside its corresponding
# service_queue.
# - The service_queue is dequed during flush operation, which moves
# endpoint_queue.
# - The endpoint_queue is dequeued during flush operation, which moves
# the queries to backend buffer_queue. Here we match a request
# for a service to a backend given some policy.
# for an endpoint to a backend given some policy.
# - The worker_queue is used to collect idle actor handle. These
# handles are dequed during the second stage of flush operation,
# which assign queries in buffer_queue to actor handle.
# -- Queues -- #
# service_name -> request queue
self.service_queues: DefaultDict[asyncio.Queue[Query]] = defaultdict(
# endpoint_name -> request queue
self.endpoint_queues: DefaultDict[asyncio.Queue[Query]] = defaultdict(
asyncio.Queue)
# backend_name -> worker request queue
self.worker_queues: DefaultDict[asyncio.Queue[
@@ -125,7 +125,7 @@ class Router:
# -- Metadata -- #
# service_name -> traffic_policy
# endpoint_name -> traffic_policy
self.traffic = defaultdict(dict)
# backend_name -> backend_config
self.backend_info = dict()
@@ -180,8 +180,8 @@ class Router:
async def enqueue_request(self, request_meta, *request_args,
**request_kwargs):
service = request_meta.service
logger.debug("Received a request for service {}".format(service))
endpoint = request_meta.endpoint
logger.debug("Received a request for endpoint {}".format(endpoint))
# check if the slo specified is directly the
# wall clock time
@@ -197,7 +197,7 @@ class Router:
request_slo_ms,
call_method=request_meta.call_method,
async_future=asyncio.get_event_loop().create_future())
await self.service_queues[service].put(query)
await self.endpoint_queues[endpoint].put(query)
await self.flush()
# Note: a future change can be to directly return the ObjectID from
@@ -244,21 +244,21 @@ class Router:
# on it.
worker_handle.__ray_terminate__.remote()
async def set_traffic(self, service, traffic_dict):
logger.debug("Setting traffic for service %s to %s", service,
async def set_traffic(self, endpoint, traffic_dict):
logger.debug("Setting traffic for endpoint %s to %s", endpoint,
traffic_dict)
self.traffic[service] = traffic_dict
self.traffic[endpoint] = traffic_dict
await self.flush()
async def remove_service(self, service):
logger.debug("Removing service {}".format(service))
async def remove_endpoint(self, endpoint):
logger.debug("Removing endpoint {}".format(endpoint))
async with self.flush_lock:
await self._flush_service_queues()
await self._flush_endpoint_queues()
await self._flush_buffer_queues()
if service in self.service_queues:
del self.service_queues[service]
if service in self.traffic:
del self.traffic[service]
if endpoint in self.endpoint_queues:
del self.endpoint_queues[endpoint]
if endpoint in self.traffic:
del self.traffic[endpoint]
async def set_backend_config(self, backend, config):
logger.debug("Setting backend config for "
@@ -268,7 +268,7 @@ class Router:
async def remove_backend(self, backend):
logger.debug("Removing backend {}".format(backend))
async with self.flush_lock:
await self._flush_service_queues()
await self._flush_endpoint_queues()
await self._flush_buffer_queues()
if backend in self.backend_info:
del self.backend_info[backend]
@@ -284,11 +284,11 @@ class Router:
method invocation.
"""
async with self.flush_lock:
await self._flush_service_queues()
await self._flush_endpoint_queues()
await self._flush_buffer_queues()
def _get_available_backends(self, service):
backends_in_policy = set(self.traffic[service].keys())
def _get_available_backends(self, endpoint):
backends_in_policy = set(self.traffic[endpoint].keys())
available_workers = {
backend
for backend, queues in self.worker_queues.items()
@@ -296,25 +296,25 @@ class Router:
}
return list(backends_in_policy.intersection(available_workers))
async def _flush_service_queues(self):
"""Selects the backend and puts the service queue query to the buffer
async def _flush_endpoint_queues(self):
"""Selects the backend and puts the endpoint queue query to the buffer
Expected Implementation:
The implementer is expected to access and manipulate
self.service_queues : dict[str,Deque]
self.endpoint_queues : dict[str,Deque]
self.buffer_queues : dict[str,sortedlist]
For registering the implemented policies register at policy.py
Expected Behavior:
the Deque of all services in self.service_queues linked with
the Deque of all endpoints in self.endpoint_queues linked with
atleast one backend must be empty irrespective of whatever
backend policy is implemented.
"""
raise NotImplementedError(
"This method should be implemented by child class.")
# flushes the buffer queue and assigns work to workers
# Flushes the buffer queue and assigns work to workers.
async def _flush_buffer_queues(self):
for service in self.traffic.keys():
ready_backends = self._get_available_backends(service)
for endpoint in self.traffic:
ready_backends = self._get_available_backends(endpoint)
for backend in ready_backends:
# no work available
if len(self.buffer_queues[backend]) == 0: