[serve] Support batches for ImportedBackends (#13843)

This commit is contained in:
Edward Oakes
2021-02-02 09:44:01 -06:00
committed by GitHub
parent d29fcfb45c
commit a6138ca31f
3 changed files with 17 additions and 6 deletions
+8
View File
@@ -1,3 +1,4 @@
from ray import serve
from ray.serve.utils import import_class
@@ -26,6 +27,13 @@ class ImportedBackend:
# proxy it manually.
return self.wrapped.reconfigure(*args, **kwargs)
# We mark 'accept_batch' here just so this will always pass the
# check we make during create_backend(). Unfortunately this means
# that validation won't happen until the replica is created.
@serve.accept_batch
def __call__(self, *args, **kwargs):
return self.wrapped(*args, **kwargs)
def __getattr__(self, attr):
"""Proxy all other methods to the wrapper class."""
return getattr(self.wrapped, attr)
@@ -7,7 +7,7 @@ def test_imported_backend(serve_instance):
client = serve_instance
backend_class = ImportedBackend("ray.serve.utils.MockImportedBackend")
config = BackendConfig(user_config="config")
config = BackendConfig(user_config="config", max_batch_size=2)
client.create_backend(
"imported", backend_class, "input_arg", config=config)
client.create_endpoint("imported", backend="imported")
+8 -5
View File
@@ -392,11 +392,14 @@ class MockImportedBackend:
def reconfigure(self, config):
self.config = config
def __call__(self, *args):
return {"arg": self.arg, "config": self.config}
def __call__(self, batch):
return [{
"arg": self.arg,
"config": self.config
} for _ in range(len(batch))]
async def other_method(self, request):
return await request.body()
async def other_method(self, batch):
return [await request.body() for request in batch]
def compute_iterable_delta(old: Iterable,
@@ -406,7 +409,7 @@ def compute_iterable_delta(old: Iterable,
Usage:
>>> old = {"a", "b"}
>>> new = {"a", "d"}
>>> compute_dict_delta(old, new)
>>> compute_iterable_delta(old, new)
({"d"}, {"b"}, {"a"})
"""
old_keys, new_keys = set(old), set(new)