From ce0885a8975307594ebb97c28303ba3e949e880f Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Mon, 16 Mar 2020 22:23:16 -0700 Subject: [PATCH] [Serve] UI Improvements (#7569) --- python/ray/serve/api.py | 65 +++++++++++++++++++--------- python/ray/serve/backend_config.py | 9 +++- python/ray/serve/exceptions.py | 6 +++ python/ray/serve/handle.py | 38 ++++++++++++---- python/ray/serve/kv_store_service.py | 2 +- python/ray/serve/scripts.py | 50 --------------------- python/ray/serve/tests/test_api.py | 17 ++++++++ 7 files changed, 105 insertions(+), 82 deletions(-) delete mode 100644 python/ray/serve/scripts.py diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index ccf033d62..87ebaa076 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -14,7 +14,7 @@ from ray.serve.kv_store_service import SQLiteKVStore from ray.serve.task_runner import RayServeMixin, TaskRunnerActor from ray.serve.utils import (block_until_http_ready, get_random_letters, expand) -from ray.serve.exceptions import RayServeException +from ray.serve.exceptions import RayServeException, batch_annotation_not_found from ray.serve.backend_config import BackendConfig from ray.serve.policy import RoutePolicy from ray.serve.queues import Query @@ -175,8 +175,12 @@ def set_backend_config(backend_tag, backend_config): BackendConfig), ("backend_config must be" " of instance BackendConfig") backend_config_dict = dict(backend_config) - old_backend_config_dict = global_state.backend_table.get_info(backend_tag) + + if (not old_backend_config_dict["has_accept_batch_annotation"] + and backend_config.max_batch_size is not None): + raise batch_annotation_not_found + global_state.backend_table.register_info(backend_tag, backend_config_dict) # inform the router about change in configuration @@ -194,10 +198,10 @@ def set_backend_config(backend_tag, backend_config): for k in BackendConfig.restart_on_change_fields) if need_to_restart_replicas: # kill all the replicas for restarting with new configurations - scale(backend_tag, 0) + _scale(backend_tag, 0) # scale the replicas with new configuration - scale(backend_tag, backend_config_dict["num_replicas"]) + _scale(backend_tag, backend_config_dict["num_replicas"]) @_ensure_connected @@ -213,11 +217,18 @@ def get_backend_config(backend_tag): return BackendConfig(**backend_config_dict) +def _backend_accept_batch(func_or_class): + if inspect.isfunction(func_or_class): + return hasattr(func_or_class, "serve_accept_batch") + elif inspect.isclass(func_or_class): + return hasattr(func_or_class.__call__, "serve_accept_batch") + + @_ensure_connected def create_backend(func_or_class, backend_tag, *actor_init_args, - backend_config=BackendConfig()): + backend_config=None): """Create a backend using func_or_class and assign backend_tag. Args: @@ -230,33 +241,28 @@ def create_backend(func_or_class, *actor_init_args (optional): the argument to pass to the class initialization method. """ + # Configure backend_config + if backend_config is None: + backend_config = BackendConfig() assert isinstance(backend_config, BackendConfig), ("backend_config must be" " of instance BackendConfig") - backend_config_dict = dict(backend_config) + # Make sure the batch size is correct should_accept_batch = (True if backend_config.max_batch_size is not None else False) - batch_annotation_not_found = RayServeException( - "max_batch_size is set in config but the function or method does not " - "accept batching. Please use @serve.accept_batch to explicitly mark " - "the function or method as batchable and takes in list as arguments.") + if should_accept_batch and not _backend_accept_batch(func_or_class): + raise batch_annotation_not_found + if _backend_accept_batch(func_or_class): + backend_config.has_accept_batch_annotation = True arg_list = [] if inspect.isfunction(func_or_class): - if should_accept_batch and not hasattr(func_or_class, - "serve_accept_batch"): - raise batch_annotation_not_found - # arg list for a fn is function itself arg_list = [func_or_class] # ignore lint on lambda expression creator = lambda kwrgs: TaskRunnerActor._remote(**kwrgs) # noqa: E731 elif inspect.isclass(func_or_class): - if should_accept_batch and not hasattr(func_or_class.__call__, - "serve_accept_batch"): - raise batch_annotation_not_found - # Python inheritance order is right-to-left. We put RayServeMixin # on the left to make sure its methods are not overriden. @ray.remote @@ -271,6 +277,8 @@ def create_backend(func_or_class, "Backend must be a function or class, it is {}.".format( type(func_or_class))) + backend_config_dict = dict(backend_config) + # save creator which starts replicas global_state.backend_table.register_backend(backend_tag, creator) @@ -284,7 +292,7 @@ def create_backend(func_or_class, # particularly for max-batch-size ray.get(global_state.init_or_get_router().set_backend_config.remote( backend_tag, backend_config_dict)) - scale(backend_tag, backend_config_dict["num_replicas"]) + _scale(backend_tag, backend_config_dict["num_replicas"]) def _start_replica(backend_tag): @@ -344,7 +352,7 @@ def _remove_replica(backend_tag): @_ensure_connected -def scale(backend_tag, num_replicas): +def _scale(backend_tag, num_replicas): """Set the number of replicas for backend_tag. Args: @@ -462,6 +470,21 @@ def stat(percentiles=[50, 90, 95], class route: + """Convient method to create a backend and link to service. + + When called, the following will happen: + - An endpoint is created with the same of the function + - A backend is created and instantiate the function + - The endpoint and backend are linked together + - The handle is returned + + .. code-block:: python + + @serve.route("/path") + def my_handler(flask_request): + ... + """ + def __init__(self, url_route): self.route = url_route @@ -472,3 +495,5 @@ class route: create_backend(func_or_class, backend_tag) create_endpoint(name, self.route) link(name, backend_tag) + + return get_handle(name) diff --git a/python/ray/serve/backend_config.py b/python/ray/serve/backend_config.py index d4cde75f5..7bd6f53fa 100644 --- a/python/ray/serve/backend_config.py +++ b/python/ray/serve/backend_config.py @@ -4,7 +4,9 @@ from copy import deepcopy class BackendConfig: # configs not needed for actor creation when # instantiating a replica - _serve_configs = ["_num_replicas", "max_batch_size"] + _serve_configs = [ + "_num_replicas", "max_batch_size", "has_accept_batch_annotation" + ] # configs which when changed leads to restarting # the existing replicas. @@ -17,10 +19,13 @@ class BackendConfig: num_cpus=None, num_gpus=None, memory=None, - object_store_memory=None): + object_store_memory=None, + has_accept_batch_annotation=False): """ Class for defining backend configuration. """ + # backend metadata + self.has_accept_batch_annotation = has_accept_batch_annotation # serve configs self.num_replicas = num_replicas diff --git a/python/ray/serve/exceptions.py b/python/ray/serve/exceptions.py index 7e1f957cf..82056d4bf 100644 --- a/python/ray/serve/exceptions.py +++ b/python/ray/serve/exceptions.py @@ -1,2 +1,8 @@ class RayServeException(Exception): pass + + +batch_annotation_not_found = RayServeException( + "max_batch_size is set in config but the function or method does not " + "accept batching. Please use @serve.accept_batch to explicitly mark " + "the function or method as batchable and takes in list as arguments.") diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index 3efce5308..937fbf002 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -78,19 +78,39 @@ class RayServeHandle: relative_slo_ms, absolute_slo_ms) def get_traffic_policy(self): - # TODO(simon): This method is implemented via checking global state - # because we are sure handle and global_state are in the same process. - # However, once global_state is deprecated, this method need to be - # updated accordingly. - history = serve.global_state.policy_action_history[self.endpoint_name] - if len(history): - return history[-1] - else: - return None + policy_table = serve.api._get_global_state().policy_table + all_services = policy_table.list_traffic_policy() + return all_services[self.endpoint_name] def get_http_endpoint(self): return DEFAULT_HTTP_ADDRESS + def _ensure_backend_unique(self, backend_tag=None): + traffic_policy = self.get_traffic_policy() + if backend_tag is None: + assert len(traffic_policy) == 1, ( + "Multiple backends detected. " + "Please pass in backend_tag=... argument to specify backend.") + backends = set(traffic_policy.keys()) + return backends.pop() + else: + assert backend_tag in traffic_policy, ( + "Backend {} not found in avaiable backends: {}.".format( + backend_tag, list(traffic_policy.keys()))) + return backend_tag + + def scale(self, new_num_replicas, backend_tag=None): + backend_tag = self._ensure_backend_unique(backend_tag) + config = serve.get_backend_config(backend_tag) + config.num_replicas = new_num_replicas + serve.set_backend_config(backend_tag, config) + + def set_max_batch_size(self, new_max_batch_size, backend_tag=None): + backend_tag = self._ensure_backend_unique(backend_tag) + config = serve.get_backend_config(backend_tag) + config.max_batch_size = new_max_batch_size + serve.set_backend_config(backend_tag, config) + def __repr__(self): return """ RayServeHandle( diff --git a/python/ray/serve/kv_store_service.py b/python/ray/serve/kv_store_service.py index 5960e0b97..f9287a92a 100644 --- a/python/ray/serve/kv_store_service.py +++ b/python/ray/serve/kv_store_service.py @@ -279,5 +279,5 @@ class TrafficPolicyTable: def list_traffic_policy(self): return { service: json.loads(policy) - for service, policy in self.traffic_policy_table.as_dict() + for service, policy in self.traffic_policy_table.as_dict().items() } diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py deleted file mode 100644 index 812ebf0dd..000000000 --- a/python/ray/serve/scripts.py +++ /dev/null @@ -1,50 +0,0 @@ -import json - -import click - -import ray -import ray.serve as serve - - -@click.group("serve", help="Commands working with ray serve") -def serve_cli(): - pass - - -@serve_cli.command(help="Initialize ray serve components") -def init(): - ray.init(address="auto") - serve.init(blocking=True) - - -@serve_cli.command(help="Split traffic for a endpoint") -@click.argument("endpoint", required=True, type=str) -# TODO(simon): Make traffic dictionary more ergonomic. e.g. -# --traffic backend1=0.5 --traffic backend2=0.5 -@click.option( - "--traffic", - required=True, - type=str, - help="Traffic dictionary in JSON format") -def split(endpoint, traffic): - ray.init(address="auto") - serve.init() - - serve.split(endpoint, json.loads(traffic)) - - -@serve_cli.command(help="Scale the number of replicas for a backend") -@click.argument("backend", required=True, type=str) -@click.option( - "--num-replicas", - required=True, - type=int, - help="New number of replicas to set") -def scale(backend_tag, num_replicas): - if num_replicas <= 0: - click.Abort( - "Cannot set number of replicas to be smaller or equal to 0.") - ray.init(address="auto") - serve.init() - - serve.scale(backend_tag, num_replicas) diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 93ba92f6e..574ac29c4 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -6,6 +6,23 @@ from ray import serve from ray.serve import BackendConfig import ray from ray.serve.constants import NO_ROUTE_KEY +from ray.serve.exceptions import RayServeException +from ray.serve.handle import RayServeHandle + + +def test_route_decorator(serve_instance): + @serve.route("/hello_world") + def hello_world(_): + return "" + + assert isinstance(hello_world, RayServeHandle) + + hello_world.scale(2) + assert serve.get_backend_config("hello_world:v0").num_replicas == 2 + + with pytest.raises( + RayServeException, match="method does not accept batching"): + hello_world.set_max_batch_size(2) def test_e2e(serve_instance):