mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 15:43:55 +08:00
[Serve] UI Improvements (#7569)
This commit is contained in:
+45
-20
@@ -14,7 +14,7 @@ from ray.serve.kv_store_service import SQLiteKVStore
|
||||
from ray.serve.task_runner import RayServeMixin, TaskRunnerActor
|
||||
from ray.serve.utils import (block_until_http_ready, get_random_letters,
|
||||
expand)
|
||||
from ray.serve.exceptions import RayServeException
|
||||
from ray.serve.exceptions import RayServeException, batch_annotation_not_found
|
||||
from ray.serve.backend_config import BackendConfig
|
||||
from ray.serve.policy import RoutePolicy
|
||||
from ray.serve.queues import Query
|
||||
@@ -175,8 +175,12 @@ def set_backend_config(backend_tag, backend_config):
|
||||
BackendConfig), ("backend_config must be"
|
||||
" of instance BackendConfig")
|
||||
backend_config_dict = dict(backend_config)
|
||||
|
||||
old_backend_config_dict = global_state.backend_table.get_info(backend_tag)
|
||||
|
||||
if (not old_backend_config_dict["has_accept_batch_annotation"]
|
||||
and backend_config.max_batch_size is not None):
|
||||
raise batch_annotation_not_found
|
||||
|
||||
global_state.backend_table.register_info(backend_tag, backend_config_dict)
|
||||
|
||||
# inform the router about change in configuration
|
||||
@@ -194,10 +198,10 @@ def set_backend_config(backend_tag, backend_config):
|
||||
for k in BackendConfig.restart_on_change_fields)
|
||||
if need_to_restart_replicas:
|
||||
# kill all the replicas for restarting with new configurations
|
||||
scale(backend_tag, 0)
|
||||
_scale(backend_tag, 0)
|
||||
|
||||
# scale the replicas with new configuration
|
||||
scale(backend_tag, backend_config_dict["num_replicas"])
|
||||
_scale(backend_tag, backend_config_dict["num_replicas"])
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
@@ -213,11 +217,18 @@ def get_backend_config(backend_tag):
|
||||
return BackendConfig(**backend_config_dict)
|
||||
|
||||
|
||||
def _backend_accept_batch(func_or_class):
|
||||
if inspect.isfunction(func_or_class):
|
||||
return hasattr(func_or_class, "serve_accept_batch")
|
||||
elif inspect.isclass(func_or_class):
|
||||
return hasattr(func_or_class.__call__, "serve_accept_batch")
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
def create_backend(func_or_class,
|
||||
backend_tag,
|
||||
*actor_init_args,
|
||||
backend_config=BackendConfig()):
|
||||
backend_config=None):
|
||||
"""Create a backend using func_or_class and assign backend_tag.
|
||||
|
||||
Args:
|
||||
@@ -230,33 +241,28 @@ def create_backend(func_or_class,
|
||||
*actor_init_args (optional): the argument to pass to the class
|
||||
initialization method.
|
||||
"""
|
||||
# Configure backend_config
|
||||
if backend_config is None:
|
||||
backend_config = BackendConfig()
|
||||
assert isinstance(backend_config,
|
||||
BackendConfig), ("backend_config must be"
|
||||
" of instance BackendConfig")
|
||||
backend_config_dict = dict(backend_config)
|
||||
|
||||
# Make sure the batch size is correct
|
||||
should_accept_batch = (True if backend_config.max_batch_size is not None
|
||||
else False)
|
||||
batch_annotation_not_found = RayServeException(
|
||||
"max_batch_size is set in config but the function or method does not "
|
||||
"accept batching. Please use @serve.accept_batch to explicitly mark "
|
||||
"the function or method as batchable and takes in list as arguments.")
|
||||
if should_accept_batch and not _backend_accept_batch(func_or_class):
|
||||
raise batch_annotation_not_found
|
||||
if _backend_accept_batch(func_or_class):
|
||||
backend_config.has_accept_batch_annotation = True
|
||||
|
||||
arg_list = []
|
||||
if inspect.isfunction(func_or_class):
|
||||
if should_accept_batch and not hasattr(func_or_class,
|
||||
"serve_accept_batch"):
|
||||
raise batch_annotation_not_found
|
||||
|
||||
# arg list for a fn is function itself
|
||||
arg_list = [func_or_class]
|
||||
# ignore lint on lambda expression
|
||||
creator = lambda kwrgs: TaskRunnerActor._remote(**kwrgs) # noqa: E731
|
||||
elif inspect.isclass(func_or_class):
|
||||
if should_accept_batch and not hasattr(func_or_class.__call__,
|
||||
"serve_accept_batch"):
|
||||
raise batch_annotation_not_found
|
||||
|
||||
# Python inheritance order is right-to-left. We put RayServeMixin
|
||||
# on the left to make sure its methods are not overriden.
|
||||
@ray.remote
|
||||
@@ -271,6 +277,8 @@ def create_backend(func_or_class,
|
||||
"Backend must be a function or class, it is {}.".format(
|
||||
type(func_or_class)))
|
||||
|
||||
backend_config_dict = dict(backend_config)
|
||||
|
||||
# save creator which starts replicas
|
||||
global_state.backend_table.register_backend(backend_tag, creator)
|
||||
|
||||
@@ -284,7 +292,7 @@ def create_backend(func_or_class,
|
||||
# particularly for max-batch-size
|
||||
ray.get(global_state.init_or_get_router().set_backend_config.remote(
|
||||
backend_tag, backend_config_dict))
|
||||
scale(backend_tag, backend_config_dict["num_replicas"])
|
||||
_scale(backend_tag, backend_config_dict["num_replicas"])
|
||||
|
||||
|
||||
def _start_replica(backend_tag):
|
||||
@@ -344,7 +352,7 @@ def _remove_replica(backend_tag):
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
def scale(backend_tag, num_replicas):
|
||||
def _scale(backend_tag, num_replicas):
|
||||
"""Set the number of replicas for backend_tag.
|
||||
|
||||
Args:
|
||||
@@ -462,6 +470,21 @@ def stat(percentiles=[50, 90, 95],
|
||||
|
||||
|
||||
class route:
|
||||
"""Convient method to create a backend and link to service.
|
||||
|
||||
When called, the following will happen:
|
||||
- An endpoint is created with the same of the function
|
||||
- A backend is created and instantiate the function
|
||||
- The endpoint and backend are linked together
|
||||
- The handle is returned
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@serve.route("/path")
|
||||
def my_handler(flask_request):
|
||||
...
|
||||
"""
|
||||
|
||||
def __init__(self, url_route):
|
||||
self.route = url_route
|
||||
|
||||
@@ -472,3 +495,5 @@ class route:
|
||||
create_backend(func_or_class, backend_tag)
|
||||
create_endpoint(name, self.route)
|
||||
link(name, backend_tag)
|
||||
|
||||
return get_handle(name)
|
||||
|
||||
@@ -4,7 +4,9 @@ from copy import deepcopy
|
||||
class BackendConfig:
|
||||
# configs not needed for actor creation when
|
||||
# instantiating a replica
|
||||
_serve_configs = ["_num_replicas", "max_batch_size"]
|
||||
_serve_configs = [
|
||||
"_num_replicas", "max_batch_size", "has_accept_batch_annotation"
|
||||
]
|
||||
|
||||
# configs which when changed leads to restarting
|
||||
# the existing replicas.
|
||||
@@ -17,10 +19,13 @@ class BackendConfig:
|
||||
num_cpus=None,
|
||||
num_gpus=None,
|
||||
memory=None,
|
||||
object_store_memory=None):
|
||||
object_store_memory=None,
|
||||
has_accept_batch_annotation=False):
|
||||
"""
|
||||
Class for defining backend configuration.
|
||||
"""
|
||||
# backend metadata
|
||||
self.has_accept_batch_annotation = has_accept_batch_annotation
|
||||
|
||||
# serve configs
|
||||
self.num_replicas = num_replicas
|
||||
|
||||
@@ -1,2 +1,8 @@
|
||||
class RayServeException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
batch_annotation_not_found = RayServeException(
|
||||
"max_batch_size is set in config but the function or method does not "
|
||||
"accept batching. Please use @serve.accept_batch to explicitly mark "
|
||||
"the function or method as batchable and takes in list as arguments.")
|
||||
|
||||
@@ -78,19 +78,39 @@ class RayServeHandle:
|
||||
relative_slo_ms, absolute_slo_ms)
|
||||
|
||||
def get_traffic_policy(self):
|
||||
# TODO(simon): This method is implemented via checking global state
|
||||
# because we are sure handle and global_state are in the same process.
|
||||
# However, once global_state is deprecated, this method need to be
|
||||
# updated accordingly.
|
||||
history = serve.global_state.policy_action_history[self.endpoint_name]
|
||||
if len(history):
|
||||
return history[-1]
|
||||
else:
|
||||
return None
|
||||
policy_table = serve.api._get_global_state().policy_table
|
||||
all_services = policy_table.list_traffic_policy()
|
||||
return all_services[self.endpoint_name]
|
||||
|
||||
def get_http_endpoint(self):
|
||||
return DEFAULT_HTTP_ADDRESS
|
||||
|
||||
def _ensure_backend_unique(self, backend_tag=None):
|
||||
traffic_policy = self.get_traffic_policy()
|
||||
if backend_tag is None:
|
||||
assert len(traffic_policy) == 1, (
|
||||
"Multiple backends detected. "
|
||||
"Please pass in backend_tag=... argument to specify backend.")
|
||||
backends = set(traffic_policy.keys())
|
||||
return backends.pop()
|
||||
else:
|
||||
assert backend_tag in traffic_policy, (
|
||||
"Backend {} not found in avaiable backends: {}.".format(
|
||||
backend_tag, list(traffic_policy.keys())))
|
||||
return backend_tag
|
||||
|
||||
def scale(self, new_num_replicas, backend_tag=None):
|
||||
backend_tag = self._ensure_backend_unique(backend_tag)
|
||||
config = serve.get_backend_config(backend_tag)
|
||||
config.num_replicas = new_num_replicas
|
||||
serve.set_backend_config(backend_tag, config)
|
||||
|
||||
def set_max_batch_size(self, new_max_batch_size, backend_tag=None):
|
||||
backend_tag = self._ensure_backend_unique(backend_tag)
|
||||
config = serve.get_backend_config(backend_tag)
|
||||
config.max_batch_size = new_max_batch_size
|
||||
serve.set_backend_config(backend_tag, config)
|
||||
|
||||
def __repr__(self):
|
||||
return """
|
||||
RayServeHandle(
|
||||
|
||||
@@ -279,5 +279,5 @@ class TrafficPolicyTable:
|
||||
def list_traffic_policy(self):
|
||||
return {
|
||||
service: json.loads(policy)
|
||||
for service, policy in self.traffic_policy_table.as_dict()
|
||||
for service, policy in self.traffic_policy_table.as_dict().items()
|
||||
}
|
||||
|
||||
@@ -1,50 +0,0 @@
|
||||
import json
|
||||
|
||||
import click
|
||||
|
||||
import ray
|
||||
import ray.serve as serve
|
||||
|
||||
|
||||
@click.group("serve", help="Commands working with ray serve")
|
||||
def serve_cli():
|
||||
pass
|
||||
|
||||
|
||||
@serve_cli.command(help="Initialize ray serve components")
|
||||
def init():
|
||||
ray.init(address="auto")
|
||||
serve.init(blocking=True)
|
||||
|
||||
|
||||
@serve_cli.command(help="Split traffic for a endpoint")
|
||||
@click.argument("endpoint", required=True, type=str)
|
||||
# TODO(simon): Make traffic dictionary more ergonomic. e.g.
|
||||
# --traffic backend1=0.5 --traffic backend2=0.5
|
||||
@click.option(
|
||||
"--traffic",
|
||||
required=True,
|
||||
type=str,
|
||||
help="Traffic dictionary in JSON format")
|
||||
def split(endpoint, traffic):
|
||||
ray.init(address="auto")
|
||||
serve.init()
|
||||
|
||||
serve.split(endpoint, json.loads(traffic))
|
||||
|
||||
|
||||
@serve_cli.command(help="Scale the number of replicas for a backend")
|
||||
@click.argument("backend", required=True, type=str)
|
||||
@click.option(
|
||||
"--num-replicas",
|
||||
required=True,
|
||||
type=int,
|
||||
help="New number of replicas to set")
|
||||
def scale(backend_tag, num_replicas):
|
||||
if num_replicas <= 0:
|
||||
click.Abort(
|
||||
"Cannot set number of replicas to be smaller or equal to 0.")
|
||||
ray.init(address="auto")
|
||||
serve.init()
|
||||
|
||||
serve.scale(backend_tag, num_replicas)
|
||||
@@ -6,6 +6,23 @@ from ray import serve
|
||||
from ray.serve import BackendConfig
|
||||
import ray
|
||||
from ray.serve.constants import NO_ROUTE_KEY
|
||||
from ray.serve.exceptions import RayServeException
|
||||
from ray.serve.handle import RayServeHandle
|
||||
|
||||
|
||||
def test_route_decorator(serve_instance):
|
||||
@serve.route("/hello_world")
|
||||
def hello_world(_):
|
||||
return ""
|
||||
|
||||
assert isinstance(hello_world, RayServeHandle)
|
||||
|
||||
hello_world.scale(2)
|
||||
assert serve.get_backend_config("hello_world:v0").num_replicas == 2
|
||||
|
||||
with pytest.raises(
|
||||
RayServeException, match="method does not accept batching"):
|
||||
hello_world.set_max_batch_size(2)
|
||||
|
||||
|
||||
def test_e2e(serve_instance):
|
||||
|
||||
Reference in New Issue
Block a user