diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 4d199b3fe..9a2114756 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -366,7 +366,7 @@ class ServeMaster: Clears self.endpoints_to_remove. """ for endpoint_tag in self.endpoints_to_remove: - await self.router.remove_service.remote(endpoint_tag) + await self.router.remove_endpoint.remote(endpoint_tag) self.endpoints_to_remove.clear() def _scale_replicas(self, backend_tag, num_replicas): @@ -456,7 +456,7 @@ class ServeMaster: be added to the HTTP proxy (can only be accessed via a handle). """ async with self.write_lock: - # If this is a headless service with no route, key the endpoint + # If this is a headless endpoint with no route, key the endpoint # based on its name. # TODO(edoakes): we should probably just store routes and endpoints # separately. diff --git a/python/ray/serve/policy.py b/python/ray/serve/policy.py index eb6443c6d..5d957677a 100644 --- a/python/ray/serve/policy.py +++ b/python/ray/serve/policy.py @@ -17,18 +17,18 @@ class RandomPolicyQueue(Router): weights assigned to backends. 
""" - async def _flush_service_queues(self): + async def _flush_endpoint_queues(self): # perform traffic splitting for requests - for service, queue in self.service_queues.items(): + for endpoint, queue in self.endpoint_queues.items(): # while there are incoming requests and there are backends - while queue.qsize() and len(self.traffic[service]): - backend_names = list(self.traffic[service].keys()) - backend_weights = list(self.traffic[service].values()) + while queue.qsize() and len(self.traffic[endpoint]): + backend_names = list(self.traffic[endpoint].keys()) + backend_weights = list(self.traffic[endpoint].values()) # randomly choose a backend for every query chosen_backend = np.random.choice( backend_names, replace=False, p=backend_weights).squeeze() - logger.debug("Matching service {} to backend {}".format( - service, chosen_backend)) + logger.debug("Matching endpoint {} to backend {}".format( + endpoint, chosen_backend)) request = await queue.get() self.buffer_queues[chosen_backend].add(request) @@ -44,34 +44,34 @@ class RoundRobinPolicyQueue(Router): A wrapper class for RoundRobin policy. This backend selection policy is `Stateful` meaning the current decisions of selecting backend are dependent on previous decisions. RoundRobinPolicy assigns queries in - an interleaved manner to every backend serving for a service. Consider - backend A,B linked to a service. Now queries will be assigned to backends + an interleaved manner to every backend serving for an endpoint. Consider + backend A,B linked to an endpoint. Now queries will be assigned to backends in the following order - [ A, B, A, B ... ] . This policy doesn't use the weights assigned to backends. """ - # Saves the information about last assigned - # backend for every service + # Saves the information about last assigned backend for every endpoint. 
round_robin_iterator_map = {} - async def set_traffic(self, service, traffic_dict): - logger.debug("Setting traffic for service %s to %s", service, + async def set_traffic(self, endpoint, traffic_dict): + logger.debug("Setting traffic for endpoint %s to %s", endpoint, traffic_dict) - self.traffic[service] = traffic_dict - backend_names = list(self.traffic[service].keys()) - self.round_robin_iterator_map[service] = itertools.cycle(backend_names) + self.traffic[endpoint] = traffic_dict + backend_names = list(self.traffic[endpoint].keys()) + self.round_robin_iterator_map[endpoint] = itertools.cycle( + backend_names) await self.flush() - async def _flush_service_queues(self): + async def _flush_endpoint_queues(self): # perform traffic splitting for requests - for service, queue in self.service_queues.items(): + for endpoint, queue in self.endpoint_queues.items(): # if there are incoming requests and there are backends - if queue.qsize() and len(self.traffic[service]): + if queue.qsize() and len(self.traffic[endpoint]): while queue.qsize(): # choose the next backend available from persistent # information chosen_backend = next( - self.round_robin_iterator_map[service]) + self.round_robin_iterator_map[endpoint]) request = await queue.get() self.buffer_queues[chosen_backend].add(request) @@ -91,14 +91,14 @@ class PowerOfTwoPolicyQueue(Router): the weights assigned to backends. 
""" - async def _flush_service_queues(self): + async def _flush_endpoint_queues(self): # perform traffic splitting for requests - for service, queue in self.service_queues.items(): + for endpoint, queue in self.endpoint_queues.items(): # while there are incoming requests and there are backends - while queue.qsize() and len(self.traffic[service]): - backend_names = list(self.traffic[service].keys()) - backend_weights = list(self.traffic[service].values()) - if len(self.traffic[service]) >= 2: + while queue.qsize() and len(self.traffic[endpoint]): + backend_names = list(self.traffic[endpoint].keys()) + backend_weights = list(self.traffic[endpoint].values()) + if len(self.traffic[endpoint]) >= 2: # randomly pick 2 backends backend1, backend2 = np.random.choice( backend_names, 2, replace=False, p=backend_weights) @@ -134,38 +134,38 @@ class FixedPackingPolicyQueue(Router): on previous decisions. FixedPackingPolicy is k RoundRobin policy where first packing_num queries are handled by 'backend-1' and next k queries are handled by 'backend-2' and so on ... where 'backend-1' and 'backend-2' are - served by the same service. This policy doesn't use the weights assigned to - backends. + served by the same endpoint. This policy doesn't use the weights assigned + to backends. 
""" async def __init__(self, packing_num=3): # Saves the information about last assigned - # backend for every service + # backend for every endpoint self.fixed_packing_iterator_map = {} self.packing_num = packing_num await super().__init__() - async def set_traffic(self, service, traffic_dict): - logger.debug("Setting traffic for service %s to %s", service, + async def set_traffic(self, endpoint, traffic_dict): + logger.debug("Setting traffic for endpoint %s to %s", endpoint, traffic_dict) - self.traffic[service] = traffic_dict - backend_names = list(self.traffic[service].keys()) - self.fixed_packing_iterator_map[service] = itertools.cycle( + self.traffic[endpoint] = traffic_dict + backend_names = list(self.traffic[endpoint].keys()) + self.fixed_packing_iterator_map[endpoint] = itertools.cycle( itertools.chain.from_iterable( itertools.repeat(x, self.packing_num) for x in backend_names)) await self.flush() - async def _flush_service_queues(self): + async def _flush_endpoint_queues(self): # perform traffic splitting for requests - for service, queue in self.service_queues.items(): + for endpoint, queue in self.endpoint_queues.items(): # if there are incoming requests and there are backends - if queue.qsize() and len(self.traffic[service]): + if queue.qsize() and len(self.traffic[endpoint]): while queue.qsize(): # choose the next backend available from persistent # information chosen_backend = next( - self.fixed_packing_iterator_map[service]) + self.fixed_packing_iterator_map[endpoint]) request = await queue.get() self.buffer_queues[chosen_backend].add(request) diff --git a/python/ray/serve/request_params.py b/python/ray/serve/request_params.py index 0d5015f25..156c93903 100644 --- a/python/ray/serve/request_params.py +++ b/python/ray/serve/request_params.py @@ -5,10 +5,10 @@ import ray.cloudpickle as pickle class RequestMetadata: """ - Request Arguments required for enqueuing a request to the service - queue. 
+ Request arguments required for enqueuing a request to the endpoint queue. + Args: - service(str): A registered service endpoint. + endpoint(str): A registered endpoint. request_context(TaskContext): Context of a request. request_slo_ms(float): Expected time for the query to get completed. @@ -17,20 +17,20 @@ class RequestMetadata: """ def __init__(self, - service, + endpoint, request_context, relative_slo_ms=None, absolute_slo_ms=None, call_method="__call__"): - self.service = service + self.endpoint = endpoint self.request_context = request_context self.relative_slo_ms = relative_slo_ms self.absolute_slo_ms = absolute_slo_ms self.call_method = call_method def adjust_relative_slo_ms(self) -> float: - """Normalize the input latency objective to absoluate timestamp. + """Normalize the input latency objective to absolute timestamp. """ slo_ms = self.relative_slo_ms diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index c26e3c17f..b048182ad 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -104,18 +104,18 @@ class Router: async def __init__(self): # Note: Several queues are used in the router # - When a request come in, it's placed inside its corresponding - # service_queue. - # - The service_queue is dequed during flush operation, which moves + # endpoint_queue. + # - The endpoint_queue is dequeued during flush operation, which moves # the queries to backend buffer_queue. Here we match a request - # for a service to a backend given some policy. + # for an endpoint to a backend given some policy. # - The worker_queue is used to collect idle actor handle. These # handles are dequed during the second stage of flush operation, # which assign queries in buffer_queue to actor handle. 
# -- Queues -- # - # service_name -> request queue - self.service_queues: DefaultDict[asyncio.Queue[Query]] = defaultdict( + # endpoint_name -> request queue + self.endpoint_queues: DefaultDict[asyncio.Queue[Query]] = defaultdict( asyncio.Queue) # backend_name -> worker request queue self.worker_queues: DefaultDict[asyncio.Queue[ @@ -125,7 +125,7 @@ class Router: # -- Metadata -- # - # service_name -> traffic_policy + # endpoint_name -> traffic_policy self.traffic = defaultdict(dict) # backend_name -> backend_config self.backend_info = dict() @@ -180,8 +180,8 @@ class Router: async def enqueue_request(self, request_meta, *request_args, **request_kwargs): - service = request_meta.service - logger.debug("Received a request for service {}".format(service)) + endpoint = request_meta.endpoint + logger.debug("Received a request for endpoint {}".format(endpoint)) # check if the slo specified is directly the # wall clock time @@ -197,7 +197,7 @@ class Router: request_slo_ms, call_method=request_meta.call_method, async_future=asyncio.get_event_loop().create_future()) - await self.service_queues[service].put(query) + await self.endpoint_queues[endpoint].put(query) await self.flush() # Note: a future change can be to directly return the ObjectID from @@ -244,21 +244,21 @@ class Router: # on it. 
worker_handle.__ray_terminate__.remote() - async def set_traffic(self, service, traffic_dict): - logger.debug("Setting traffic for service %s to %s", service, + async def set_traffic(self, endpoint, traffic_dict): + logger.debug("Setting traffic for endpoint %s to %s", endpoint, traffic_dict) - self.traffic[service] = traffic_dict + self.traffic[endpoint] = traffic_dict await self.flush() - async def remove_service(self, service): - logger.debug("Removing service {}".format(service)) + async def remove_endpoint(self, endpoint): + logger.debug("Removing endpoint {}".format(endpoint)) async with self.flush_lock: - await self._flush_service_queues() + await self._flush_endpoint_queues() await self._flush_buffer_queues() - if service in self.service_queues: - del self.service_queues[service] - if service in self.traffic: - del self.traffic[service] + if endpoint in self.endpoint_queues: + del self.endpoint_queues[endpoint] + if endpoint in self.traffic: + del self.traffic[endpoint] async def set_backend_config(self, backend, config): logger.debug("Setting backend config for " @@ -268,7 +268,7 @@ class Router: async def remove_backend(self, backend): logger.debug("Removing backend {}".format(backend)) async with self.flush_lock: - await self._flush_service_queues() + await self._flush_endpoint_queues() await self._flush_buffer_queues() if backend in self.backend_info: del self.backend_info[backend] @@ -284,11 +284,11 @@ class Router: method invocation. 
""" async with self.flush_lock: - await self._flush_service_queues() + await self._flush_endpoint_queues() await self._flush_buffer_queues() - def _get_available_backends(self, service): - backends_in_policy = set(self.traffic[service].keys()) + def _get_available_backends(self, endpoint): + backends_in_policy = set(self.traffic[endpoint].keys()) available_workers = { backend for backend, queues in self.worker_queues.items() @@ -296,25 +296,25 @@ class Router: } return list(backends_in_policy.intersection(available_workers)) - async def _flush_service_queues(self): - """Selects the backend and puts the service queue query to the buffer + async def _flush_endpoint_queues(self): + """Selects the backend and puts the endpoint queue query to the buffer Expected Implementation: The implementer is expected to access and manipulate - self.service_queues : dict[str,Deque] + self.endpoint_queues : dict[str,Deque] self.buffer_queues : dict[str,sortedlist] For registering the implemented policies register at policy.py Expected Behavior: - the Deque of all services in self.service_queues linked with + the Deque of all endpoints in self.endpoint_queues linked with atleast one backend must be empty irrespective of whatever backend policy is implemented. """ raise NotImplementedError( "This method should be implemented by child class.") - # flushes the buffer queue and assigns work to workers + # Flushes the buffer queue and assigns work to workers. async def _flush_buffer_queues(self): - for service in self.traffic.keys(): - ready_backends = self._get_available_backends(service) + for endpoint in self.traffic: + ready_backends = self._get_available_backends(endpoint) for backend in ready_backends: # no work available if len(self.buffer_queues[backend]) == 0: