diff --git a/python/ray/experimental/serve/api.py b/python/ray/experimental/serve/api.py index 8476ad79e..d04cb77be 100644 --- a/python/ray/experimental/serve/api.py +++ b/python/ray/experimental/serve/api.py @@ -1,6 +1,7 @@ import inspect from functools import wraps from tempfile import mkstemp + from multiprocessing import cpu_count import numpy as np @@ -13,7 +14,7 @@ from ray.experimental.serve.global_state import (GlobalState, from ray.experimental.serve.kv_store_service import SQLiteKVStore from ray.experimental.serve.task_runner import RayServeMixin, TaskRunnerActor from ray.experimental.serve.utils import (block_until_http_ready, - get_random_letters) + get_random_letters, expand) from ray.experimental.serve.exceptions import RayServeException from ray.experimental.serve.backend_config import BackendConfig from ray.experimental.serve.policy import RoutePolicy @@ -64,6 +65,7 @@ def accept_batch(f): def init(kv_store_connector=None, kv_store_path=None, blocking=False, + start_server=True, http_host=DEFAULT_HTTP_HOST, http_port=DEFAULT_HTTP_PORT, ray_init_kwargs={ @@ -87,6 +89,8 @@ def init(kv_store_connector=None, kv_store_path (str, path): Path to the SQLite table. blocking (bool): If true, the function will wait for the HTTP server to be healthy, and other components to be ready before returns. + start_server (bool): If true, `serve.init` starts http server. + (Default: True) http_host (str): Host for HTTP server. Default to "0.0.0.0". http_port (int): Port for HTTP server. Default to 8000. ray_init_kwargs (dict): Argument passed to ray.init, if there is no ray @@ -132,18 +136,19 @@ def init(kv_store_connector=None, nursery = start_initial_state(kv_store_connector) global_state = GlobalState(nursery) - global_state.init_or_get_http_server(host=http_host, port=http_port) + if start_server: + global_state.init_or_get_http_server(host=http_host, port=http_port) global_state.init_or_get_router( queueing_policy=queueing_policy, policy_kwargs=policy_kwargs) global_state.init_or_get_metric_monitor( gc_window_seconds=gc_window_seconds) - if blocking: + if start_server and blocking: block_until_http_ready("http://{}:{}".format(http_host, http_port)) @_ensure_connected -def create_endpoint(endpoint_name, route, blocking=True): +def create_endpoint(endpoint_name, route=None, blocking=True): """Create a service endpoint given route_expression. Args: @@ -396,7 +401,8 @@ def split(endpoint_name, traffic_policy_dictionary): traffic_policy_dictionary (dict): a dictionary maps backend names to their traffic weights. The weights must sum to 1. """ - assert endpoint_name in global_state.route_table.list_service().values() + assert endpoint_name in expand( + global_state.route_table.list_service(include_headless=True).values()) assert isinstance(traffic_policy_dictionary, dict), "Traffic policy must be dictionary" @@ -430,7 +436,8 @@ def get_handle(endpoint_name, relative_slo_ms=None, absolute_slo_ms=None): Returns: RayServeHandle """ - assert endpoint_name in global_state.route_table.list_service().values() + assert endpoint_name in expand( + global_state.route_table.list_service(include_headless=True).values()) # Delay import due to it's dependency on global_state from ray.experimental.serve.handle import RayServeHandle diff --git a/python/ray/experimental/serve/constants.py b/python/ray/experimental/serve/constants.py index 1819e1825..fb0d94793 100644 --- a/python/ray/experimental/serve/constants.py +++ b/python/ray/experimental/serve/constants.py @@ -21,3 +21,6 @@ ASYNC_CONCURRENCY = int(1e6) #: Default latency SLO DEFAULT_LATENCY_SLO_MS = 1e9 + +#: Key for storing no http route services +NO_ROUTE_KEY = "NO_ROUTE" diff --git a/python/ray/experimental/serve/kv_store_service.py b/python/ray/experimental/serve/kv_store_service.py index 83aacc5f9..7d24b84b9 100644 --- a/python/ray/experimental/serve/kv_store_service.py +++ b/python/ray/experimental/serve/kv_store_service.py @@ -6,6 +6,8 @@ from ray import cloudpickle as pickle import ray.experimental.internal_kv as ray_kv from ray.experimental.serve.utils import logger +from typing import Union +from ray.experimental.serve.constants import NO_ROUTE_KEY class NamespacedKVStore(ABC): @@ -174,7 +176,7 @@ class RoutingTable: self.routing_table = kv_connector("routing_table") self.request_count = 0 - def register_service(self, route: str, service: str): + def register_service(self, route: Union[str, None], service: str): """Create an entry in the routing table Args: @@ -184,12 +186,27 @@ class RoutingTable: """ logger.debug("[KV] Registering route {} to service {}.".format( route, service)) - self.routing_table.put(route, service) - def list_service(self): - """Returns the routing table.""" - self.request_count += 1 + # put no route services in default key + if route is None: + no_http_services = json.loads( + self.routing_table.get(NO_ROUTE_KEY, "[]")) + no_http_services.append(service) + self.routing_table.put(NO_ROUTE_KEY, json.dumps(no_http_services)) + else: + self.routing_table.put(route, service) + + def list_service(self, include_headless=False): + """Returns the routing table. + Args: + include_headless: If True, returns a no route services (headless) + services with normal services. (Default: False) + """ table = self.routing_table.as_dict() + if include_headless: + table[NO_ROUTE_KEY] = json.loads(table.get(NO_ROUTE_KEY, "[]")) + else: + table.pop(NO_ROUTE_KEY, None) return table def get_request_count(self): diff --git a/python/ray/experimental/serve/tests/test_api.py b/python/ray/experimental/serve/tests/test_api.py index f756c9423..c432dd35f 100644 --- a/python/ray/experimental/serve/tests/test_api.py +++ b/python/ray/experimental/serve/tests/test_api.py @@ -5,6 +5,7 @@ import requests from ray.experimental import serve from ray.experimental.serve import BackendConfig import ray +from ray.experimental.serve.constants import NO_ROUTE_KEY def test_e2e(serve_instance): @@ -37,6 +38,26 @@ def test_e2e(serve_instance): assert resp == "OK" +def test_no_route(serve_instance): + serve.create_endpoint("noroute-endpoint", blocking=True) + global_state = serve.api._get_global_state() + + result = global_state.route_table.list_service(include_headless=True) + assert result[NO_ROUTE_KEY] == ["noroute-endpoint"] + + without_headless_result = global_state.route_table.list_service() + assert NO_ROUTE_KEY not in without_headless_result + + def func(_, i=1): + return 1 + + serve.create_backend(func, "backend:1") + serve.link("noroute-endpoint", "backend:1") + service_handle = serve.get_handle("noroute-endpoint") + result = ray.get(service_handle.remote(i=1)) + assert result == 1 + + def test_scaling_replicas(serve_instance): class Counter: def __init__(self): diff --git a/python/ray/experimental/serve/utils.py b/python/ray/experimental/serve/utils.py index 0cbb377b1..42283d06c 100644 --- a/python/ray/experimental/serve/utils.py +++ b/python/ray/experimental/serve/utils.py @@ -10,6 +10,21 @@ import requests from pygments import formatters, highlight, lexers from ray.experimental.serve.context import FakeFlaskRequest, TaskContext from ray.experimental.serve.http_util import build_flask_request +import itertools + + +def expand(l): + """ + Implements a nested flattening of a list. + Example: + >>> serve.utils.expand([1,2,[3,4,5],6]) + [1,2,3,4,5,6] + >>> serve.utils.expand(["a", ["b", "c"], "d", ["e", "f"]]) + ["a", "b", "c", "d", "e", "f"] + """ + return list( + itertools.chain.from_iterable( + [x if isinstance(x, list) else [x] for x in l])) def parse_request_item(request_item):