mirror of
https://github.com/wassname/ray.git
synced 2026-07-05 09:24:28 +08:00
[Serve] Parametrize tests for pydantic backend (#10559)
This commit is contained in:
@@ -157,7 +157,8 @@ def test_set_traffic_missing_data(serve_instance):
|
||||
client.set_traffic("nonexistent_endpoint_name", {backend_name: 1.0})
|
||||
|
||||
|
||||
def test_scaling_replicas(serve_instance):
|
||||
@pytest.mark.parametrize("use_legacy_config", [False, True])
|
||||
def test_scaling_replicas(serve_instance, use_legacy_config):
|
||||
client = serve_instance
|
||||
|
||||
class Counter:
|
||||
@@ -168,8 +169,11 @@ def test_scaling_replicas(serve_instance):
|
||||
self.count += 1
|
||||
return self.count
|
||||
|
||||
client.create_backend(
|
||||
"counter:v1", Counter, config=BackendConfig(num_replicas=2))
|
||||
config = {
|
||||
"num_replicas": 2
|
||||
} if use_legacy_config else BackendConfig(num_replicas=2)
|
||||
client.create_backend("counter:v1", Counter, config=config)
|
||||
|
||||
client.create_endpoint("counter", backend="counter:v1", route="/increment")
|
||||
|
||||
# Keep checking the routing table until /increment is populated
|
||||
@@ -185,7 +189,10 @@ def test_scaling_replicas(serve_instance):
|
||||
# If the load is shared among two replicas. The max result cannot be 10.
|
||||
assert max(counter_result) < 10
|
||||
|
||||
client.update_backend_config("counter:v1", {"num_replicas": 1})
|
||||
update_config = {
|
||||
"num_replicas": 1
|
||||
} if use_legacy_config else BackendConfig(num_replicas=1)
|
||||
client.update_backend_config("counter:v1", update_config)
|
||||
|
||||
counter_result = []
|
||||
for _ in range(10):
|
||||
@@ -196,45 +203,8 @@ def test_scaling_replicas(serve_instance):
|
||||
assert max(counter_result) - min(counter_result) > 6
|
||||
|
||||
|
||||
def test_scaling_replicas_legacy(serve_instance):
|
||||
client = serve_instance
|
||||
|
||||
class Counter:
|
||||
def __init__(self):
|
||||
self.count = 0
|
||||
|
||||
def __call__(self, _):
|
||||
self.count += 1
|
||||
return self.count
|
||||
|
||||
client.create_backend("counter:v1", Counter, config={"num_replicas": 2})
|
||||
client.create_endpoint("counter", backend="counter:v1", route="/increment")
|
||||
|
||||
# Keep checking the routing table until /increment is populated
|
||||
while "/increment" not in requests.get(
|
||||
"http://127.0.0.1:8000/-/routes").json():
|
||||
time.sleep(0.2)
|
||||
|
||||
counter_result = []
|
||||
for _ in range(10):
|
||||
resp = requests.get("http://127.0.0.1:8000/increment").json()
|
||||
counter_result.append(resp)
|
||||
|
||||
# If the load is shared among two replicas. The max result cannot be 10.
|
||||
assert max(counter_result) < 10
|
||||
|
||||
client.update_backend_config("counter:v1", {"num_replicas": 1})
|
||||
|
||||
counter_result = []
|
||||
for _ in range(10):
|
||||
resp = requests.get("http://127.0.0.1:8000/increment").json()
|
||||
counter_result.append(resp)
|
||||
# Give some time for a replica to spin down. But majority of the request
|
||||
# should be served by the only remaining replica.
|
||||
assert max(counter_result) - min(counter_result) > 6
|
||||
|
||||
|
||||
def test_batching(serve_instance):
|
||||
@pytest.mark.parametrize("use_legacy_config", [False, True])
|
||||
def test_batching(serve_instance, use_legacy_config):
|
||||
client = serve_instance
|
||||
|
||||
class BatchingExample:
|
||||
@@ -248,10 +218,12 @@ def test_batching(serve_instance):
|
||||
return [self.count] * batch_size
|
||||
|
||||
# set the max batch size
|
||||
client.create_backend(
|
||||
"counter:v11",
|
||||
BatchingExample,
|
||||
config=BackendConfig(max_batch_size=5, batch_wait_timeout=1))
|
||||
config = {
|
||||
"max_batch_size": 5,
|
||||
"batch_wait_timeout": 1
|
||||
} if use_legacy_config else BackendConfig(
|
||||
max_batch_size=5, batch_wait_timeout=1)
|
||||
client.create_backend("counter:v11", BatchingExample, config=config)
|
||||
client.create_endpoint(
|
||||
"counter1", backend="counter:v11", route="/increment2")
|
||||
|
||||
@@ -273,48 +245,8 @@ def test_batching(serve_instance):
|
||||
assert max(counter_result) < 20
|
||||
|
||||
|
||||
def test_batching_legacy(serve_instance):
|
||||
client = serve_instance
|
||||
|
||||
class BatchingExample:
|
||||
def __init__(self):
|
||||
self.count = 0
|
||||
|
||||
@serve.accept_batch
|
||||
def __call__(self, request):
|
||||
self.count += 1
|
||||
return [self.count] * len(request)
|
||||
|
||||
# set the max batch size
|
||||
client.create_backend(
|
||||
"counter:v11",
|
||||
BatchingExample,
|
||||
config={
|
||||
"max_batch_size": 5,
|
||||
"batch_wait_timeout": 1
|
||||
})
|
||||
client.create_endpoint(
|
||||
"counter1", backend="counter:v11", route="/increment2")
|
||||
|
||||
# Keep checking the routing table until /increment is populated
|
||||
while "/increment2" not in requests.get(
|
||||
"http://127.0.0.1:8000/-/routes").json():
|
||||
time.sleep(0.2)
|
||||
|
||||
future_list = []
|
||||
handle = client.get_handle("counter1")
|
||||
for _ in range(20):
|
||||
f = handle.remote()
|
||||
future_list.append(f)
|
||||
|
||||
counter_result = ray.get(future_list)
|
||||
# since count is only updated per batch of queries
|
||||
# If there atleast one __call__ fn call with batch size greater than 1
|
||||
# counter result will always be less than 20
|
||||
assert max(counter_result) < 20
|
||||
|
||||
|
||||
def test_batching_exception(serve_instance):
|
||||
@pytest.mark.parametrize("use_legacy_config", [False, True])
|
||||
def test_batching_exception(serve_instance, use_legacy_config):
|
||||
client = serve_instance
|
||||
|
||||
class NoListReturned:
|
||||
@@ -326,8 +258,10 @@ def test_batching_exception(serve_instance):
|
||||
return len(requests)
|
||||
|
||||
# set the max batch size
|
||||
client.create_backend(
|
||||
"exception:v1", NoListReturned, config=BackendConfig(max_batch_size=5))
|
||||
config = {
|
||||
"max_batch_size": 5
|
||||
} if use_legacy_config else BackendConfig(max_batch_size=5)
|
||||
client.create_backend("exception:v1", NoListReturned, config=config)
|
||||
client.create_endpoint(
|
||||
"exception-test", backend="exception:v1", route="/noListReturned")
|
||||
|
||||
@@ -336,30 +270,8 @@ def test_batching_exception(serve_instance):
|
||||
assert ray.get(handle.remote(temp=1))
|
||||
|
||||
|
||||
def test_batching_exception_legacy(serve_instance):
|
||||
client = serve_instance
|
||||
|
||||
class NoListReturned:
|
||||
def __init__(self):
|
||||
self.count = 0
|
||||
|
||||
@serve.accept_batch
|
||||
def __call__(self, flask_request, temp=None):
|
||||
batch_size = serve.context.batch_size
|
||||
return batch_size
|
||||
|
||||
# set the max batch size
|
||||
client.create_backend(
|
||||
"exception:v1", NoListReturned, config={"max_batch_size": 5})
|
||||
client.create_endpoint(
|
||||
"exception-test", backend="exception:v1", route="/noListReturned")
|
||||
|
||||
handle = client.get_handle("exception-test")
|
||||
with pytest.raises(ray.exceptions.RayTaskError):
|
||||
assert ray.get(handle.remote(temp=1))
|
||||
|
||||
|
||||
def test_updating_config(serve_instance):
|
||||
@pytest.mark.parametrize("use_legacy_config", [False, True])
|
||||
def test_updating_config(serve_instance, use_legacy_config):
|
||||
client = serve_instance
|
||||
|
||||
class BatchSimple:
|
||||
@@ -370,55 +282,22 @@ def test_updating_config(serve_instance):
|
||||
def __call__(self, request):
|
||||
return [1] * len(request)
|
||||
|
||||
client.create_backend(
|
||||
"bsimple:v1",
|
||||
BatchSimple,
|
||||
config=BackendConfig(max_batch_size=2, num_replicas=3))
|
||||
config = {
|
||||
"max_batch_size": 2,
|
||||
"num_replicas": 3
|
||||
} if use_legacy_config else BackendConfig(
|
||||
max_batch_size=2, num_replicas=3)
|
||||
client.create_backend("bsimple:v1", BatchSimple, config=config)
|
||||
client.create_endpoint("bsimple", backend="bsimple:v1", route="/bsimple")
|
||||
|
||||
controller = client._controller
|
||||
old_replica_tag_list = ray.get(
|
||||
controller._list_replicas.remote("bsimple:v1"))
|
||||
|
||||
client.update_backend_config("bsimple:v1", BackendConfig(max_batch_size=5))
|
||||
new_replica_tag_list = ray.get(
|
||||
controller._list_replicas.remote("bsimple:v1"))
|
||||
new_all_tag_list = []
|
||||
for worker_dict in ray.get(
|
||||
controller.get_all_worker_handles.remote()).values():
|
||||
new_all_tag_list.extend(list(worker_dict.keys()))
|
||||
|
||||
# the old and new replica tag list should be identical
|
||||
# and should be subset of all_tag_list
|
||||
assert set(old_replica_tag_list) <= set(new_all_tag_list)
|
||||
assert set(old_replica_tag_list) == set(new_replica_tag_list)
|
||||
|
||||
|
||||
def test_updating_config_legacy(serve_instance):
|
||||
client = serve_instance
|
||||
|
||||
class BatchSimple:
|
||||
def __init__(self):
|
||||
self.count = 0
|
||||
|
||||
@serve.accept_batch
|
||||
def __call__(self, request):
|
||||
return [1] * len(request)
|
||||
|
||||
client.create_backend(
|
||||
"bsimple:v1",
|
||||
BatchSimple,
|
||||
config={
|
||||
"max_batch_size": 2,
|
||||
"num_replicas": 3
|
||||
})
|
||||
client.create_endpoint("bsimple", backend="bsimple:v1", route="/bsimple")
|
||||
|
||||
controller = client._controller
|
||||
old_replica_tag_list = ray.get(
|
||||
controller._list_replicas.remote("bsimple:v1"))
|
||||
|
||||
client.update_backend_config("bsimple:v1", {"max_batch_size": 5})
|
||||
update_config = {
|
||||
"max_batch_size": 5
|
||||
} if use_legacy_config else BackendConfig(max_batch_size=5)
|
||||
client.update_backend_config("bsimple:v1", update_config)
|
||||
new_replica_tag_list = ray.get(
|
||||
controller._list_replicas.remote("bsimple:v1"))
|
||||
new_all_tag_list = []
|
||||
@@ -587,7 +466,8 @@ def test_multiple_instances():
|
||||
client1.delete_backend(backend)
|
||||
|
||||
|
||||
def test_parallel_start(serve_instance):
|
||||
@pytest.mark.parametrize("use_legacy_config", [False, True])
|
||||
def test_parallel_start(serve_instance, use_legacy_config):
|
||||
client = serve_instance
|
||||
|
||||
# Test the ability to start multiple replicas in parallel.
|
||||
@@ -618,47 +498,10 @@ def test_parallel_start(serve_instance):
|
||||
def __call__(self, _):
|
||||
return "Ready"
|
||||
|
||||
client.create_backend(
|
||||
"p:v0", LongStartingServable, config=BackendConfig(num_replicas=2))
|
||||
client.create_endpoint("test-parallel", backend="p:v0")
|
||||
handle = client.get_handle("test-parallel")
|
||||
|
||||
ray.get(handle.remote(), timeout=10)
|
||||
|
||||
|
||||
def test_parallel_start_legacy(serve_instance):
|
||||
client = serve_instance
|
||||
|
||||
# Test the ability to start multiple replicas in parallel.
|
||||
# In the past, when Serve scale up a backend, it does so one by one and
|
||||
# wait for each replica to initialize. This test avoid this by preventing
|
||||
# the first replica to finish initialization unless the second replica is
|
||||
# also started.
|
||||
@ray.remote
|
||||
class Barrier:
|
||||
def __init__(self, release_on):
|
||||
self.release_on = release_on
|
||||
self.current_waiters = 0
|
||||
self.event = asyncio.Event()
|
||||
|
||||
async def wait(self):
|
||||
self.current_waiters += 1
|
||||
if self.current_waiters == self.release_on:
|
||||
self.event.set()
|
||||
else:
|
||||
await self.event.wait()
|
||||
|
||||
barrier = Barrier.remote(release_on=2)
|
||||
|
||||
class LongStartingServable:
|
||||
def __init__(self):
|
||||
ray.get(barrier.wait.remote(), timeout=10)
|
||||
|
||||
def __call__(self, _):
|
||||
return "Ready"
|
||||
|
||||
client.create_backend(
|
||||
"p:v0", LongStartingServable, config={"num_replicas": 2})
|
||||
config = {
|
||||
"num_replicas": 2
|
||||
} if use_legacy_config else BackendConfig(num_replicas=2)
|
||||
client.create_backend("p:v0", LongStartingServable, config=config)
|
||||
client.create_endpoint("test-parallel", backend="p:v0")
|
||||
handle = client.get_handle("test-parallel")
|
||||
|
||||
@@ -709,48 +552,27 @@ def test_list_endpoints(serve_instance):
|
||||
assert len(client.list_endpoints()) == 0
|
||||
|
||||
|
||||
def test_list_backends(serve_instance):
|
||||
@pytest.mark.parametrize("use_legacy_config", [False, True])
|
||||
def test_list_backends(serve_instance, use_legacy_config):
|
||||
client = serve_instance
|
||||
|
||||
@serve.accept_batch
|
||||
def f():
|
||||
pass
|
||||
|
||||
client.create_backend(
|
||||
"backend", f, config=BackendConfig(max_batch_size=10))
|
||||
config1 = {
|
||||
"max_batch_size": 10
|
||||
} if use_legacy_config else BackendConfig(max_batch_size=10)
|
||||
client.create_backend("backend", f, config=config1)
|
||||
backends = client.list_backends()
|
||||
assert len(backends) == 1
|
||||
assert "backend" in backends
|
||||
assert backends["backend"]["max_batch_size"] == 10
|
||||
|
||||
client.create_backend("backend2", f, config=BackendConfig(num_replicas=10))
|
||||
backends = client.list_backends()
|
||||
assert len(backends) == 2
|
||||
assert backends["backend2"]["num_replicas"] == 10
|
||||
|
||||
client.delete_backend("backend")
|
||||
backends = client.list_backends()
|
||||
assert len(backends) == 1
|
||||
assert "backend2" in backends
|
||||
|
||||
client.delete_backend("backend2")
|
||||
assert len(client.list_backends()) == 0
|
||||
|
||||
|
||||
def test_list_backends_legacy(serve_instance):
|
||||
client = serve_instance
|
||||
|
||||
@serve.accept_batch
|
||||
def f():
|
||||
pass
|
||||
|
||||
client.create_backend("backend", f, config={"max_batch_size": 10})
|
||||
backends = client.list_backends()
|
||||
assert len(backends) == 1
|
||||
assert "backend" in backends
|
||||
assert backends["backend"]["max_batch_size"] == 10
|
||||
|
||||
client.create_backend("backend2", f, config={"num_replicas": 10})
|
||||
config2 = {
|
||||
"num_replicas": 10
|
||||
} if use_legacy_config else BackendConfig(num_replicas=10)
|
||||
client.create_backend("backend2", f, config=config2)
|
||||
backends = client.list_backends()
|
||||
assert len(backends) == 2
|
||||
assert backends["backend2"]["num_replicas"] == 10
|
||||
@@ -780,7 +602,8 @@ def test_endpoint_input_validation(serve_instance):
|
||||
client.create_endpoint("endpoint", backend="backend")
|
||||
|
||||
|
||||
def test_create_infeasible_error(serve_instance):
|
||||
@pytest.mark.parametrize("use_legacy_config", [False, True])
|
||||
def test_create_infeasible_error(serve_instance, use_legacy_config):
|
||||
client = serve_instance
|
||||
|
||||
def f():
|
||||
@@ -797,6 +620,10 @@ def test_create_infeasible_error(serve_instance):
|
||||
|
||||
# Even each replica might be feasible, the total might not be.
|
||||
current_cpus = int(ray.nodes()[0]["Resources"]["CPU"])
|
||||
num_replicas = current_cpus + 20
|
||||
config = {
|
||||
"num_replicas": num_replicas
|
||||
} if use_legacy_config else BackendConfig(num_replicas=num_replicas)
|
||||
with pytest.raises(RayServeException, match="Cannot scale backend"):
|
||||
client.create_backend(
|
||||
"f:1",
|
||||
@@ -804,38 +631,7 @@ def test_create_infeasible_error(serve_instance):
|
||||
ray_actor_options={"resources": {
|
||||
"CPU": 1,
|
||||
}},
|
||||
config=BackendConfig(num_replicas=(current_cpus + 20)))
|
||||
|
||||
# No replica should be created!
|
||||
replicas = ray.get(client._controller._list_replicas.remote("f1"))
|
||||
assert len(replicas) == 0
|
||||
|
||||
|
||||
def test_create_infeasible_error_legacy(serve_instance):
|
||||
client = serve_instance
|
||||
|
||||
def f():
|
||||
pass
|
||||
|
||||
# Non existent resource should be infeasible.
|
||||
with pytest.raises(RayServeException, match="Cannot scale backend"):
|
||||
client.create_backend(
|
||||
"f:1",
|
||||
f,
|
||||
ray_actor_options={"resources": {
|
||||
"MagicMLResource": 100
|
||||
}})
|
||||
|
||||
# Even each replica might be feasible, the total might not be.
|
||||
current_cpus = int(ray.nodes()[0]["Resources"]["CPU"])
|
||||
with pytest.raises(RayServeException, match="Cannot scale backend"):
|
||||
client.create_backend(
|
||||
"f:1",
|
||||
f,
|
||||
ray_actor_options={"resources": {
|
||||
"CPU": 1,
|
||||
}},
|
||||
config={"num_replicas": current_cpus + 20})
|
||||
config=config)
|
||||
|
||||
# No replica should be created!
|
||||
replicas = ray.get(client._controller._list_replicas.remote("f1"))
|
||||
|
||||
Reference in New Issue
Block a user