From c4c0857107ee4c4b982667339e0e4c5d33d97e96 Mon Sep 17 00:00:00 2001 From: architkulkarni Date: Fri, 4 Sep 2020 17:24:24 -0700 Subject: [PATCH] [Serve] Parametrize tests for pydantic backend (#10559) --- python/ray/serve/tests/test_api.py | 324 ++++++----------------------- 1 file changed, 60 insertions(+), 264 deletions(-) diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 26a88102b..da29f1bc3 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -157,7 +157,8 @@ def test_set_traffic_missing_data(serve_instance): client.set_traffic("nonexistent_endpoint_name", {backend_name: 1.0}) -def test_scaling_replicas(serve_instance): +@pytest.mark.parametrize("use_legacy_config", [False, True]) +def test_scaling_replicas(serve_instance, use_legacy_config): client = serve_instance class Counter: @@ -168,8 +169,11 @@ def test_scaling_replicas(serve_instance): self.count += 1 return self.count - client.create_backend( - "counter:v1", Counter, config=BackendConfig(num_replicas=2)) + config = { + "num_replicas": 2 + } if use_legacy_config else BackendConfig(num_replicas=2) + client.create_backend("counter:v1", Counter, config=config) + client.create_endpoint("counter", backend="counter:v1", route="/increment") # Keep checking the routing table until /increment is populated @@ -185,7 +189,10 @@ def test_scaling_replicas(serve_instance): # If the load is shared among two replicas. The max result cannot be 10. assert max(counter_result) < 10 - client.update_backend_config("counter:v1", {"num_replicas": 1}) + update_config = { + "num_replicas": 1 + } if use_legacy_config else BackendConfig(num_replicas=1) + client.update_backend_config("counter:v1", update_config) counter_result = [] for _ in range(10): @@ -196,45 +203,8 @@ def test_scaling_replicas(serve_instance): assert max(counter_result) - min(counter_result) > 6 -def test_scaling_replicas_legacy(serve_instance): - client = serve_instance - - class Counter: - def __init__(self): - self.count = 0 - - def __call__(self, _): - self.count += 1 - return self.count - - client.create_backend("counter:v1", Counter, config={"num_replicas": 2}) - client.create_endpoint("counter", backend="counter:v1", route="/increment") - - # Keep checking the routing table until /increment is populated - while "/increment" not in requests.get( - "http://127.0.0.1:8000/-/routes").json(): - time.sleep(0.2) - - counter_result = [] - for _ in range(10): - resp = requests.get("http://127.0.0.1:8000/increment").json() - counter_result.append(resp) - - # If the load is shared among two replicas. The max result cannot be 10. - assert max(counter_result) < 10 - - client.update_backend_config("counter:v1", {"num_replicas": 1}) - - counter_result = [] - for _ in range(10): - resp = requests.get("http://127.0.0.1:8000/increment").json() - counter_result.append(resp) - # Give some time for a replica to spin down. But majority of the request - # should be served by the only remaining replica. - assert max(counter_result) - min(counter_result) > 6 - - -def test_batching(serve_instance): +@pytest.mark.parametrize("use_legacy_config", [False, True]) +def test_batching(serve_instance, use_legacy_config): client = serve_instance class BatchingExample: @@ -248,10 +218,12 @@ def test_batching(serve_instance): return [self.count] * batch_size # set the max batch size - client.create_backend( - "counter:v11", - BatchingExample, - config=BackendConfig(max_batch_size=5, batch_wait_timeout=1)) + config = { + "max_batch_size": 5, + "batch_wait_timeout": 1 + } if use_legacy_config else BackendConfig( + max_batch_size=5, batch_wait_timeout=1) + client.create_backend("counter:v11", BatchingExample, config=config) client.create_endpoint( "counter1", backend="counter:v11", route="/increment2") @@ -273,48 +245,8 @@ def test_batching(serve_instance): assert max(counter_result) < 20 -def test_batching_legacy(serve_instance): - client = serve_instance - - class BatchingExample: - def __init__(self): - self.count = 0 - - @serve.accept_batch - def __call__(self, request): - self.count += 1 - return [self.count] * len(request) - - # set the max batch size - client.create_backend( - "counter:v11", - BatchingExample, - config={ - "max_batch_size": 5, - "batch_wait_timeout": 1 - }) - client.create_endpoint( - "counter1", backend="counter:v11", route="/increment2") - - # Keep checking the routing table until /increment is populated - while "/increment2" not in requests.get( - "http://127.0.0.1:8000/-/routes").json(): - time.sleep(0.2) - - future_list = [] - handle = client.get_handle("counter1") - for _ in range(20): - f = handle.remote() - future_list.append(f) - - counter_result = ray.get(future_list) - # since count is only updated per batch of queries - # If there atleast one __call__ fn call with batch size greater than 1 - # counter result will always be less than 20 - assert max(counter_result) < 20 - - -def test_batching_exception(serve_instance): +@pytest.mark.parametrize("use_legacy_config", [False, True]) +def test_batching_exception(serve_instance, use_legacy_config): client = serve_instance class NoListReturned: @@ -326,8 +258,10 @@ def test_batching_exception(serve_instance): return len(requests) # set the max batch size - client.create_backend( - "exception:v1", NoListReturned, config=BackendConfig(max_batch_size=5)) + config = { + "max_batch_size": 5 + } if use_legacy_config else BackendConfig(max_batch_size=5) + client.create_backend("exception:v1", NoListReturned, config=config) client.create_endpoint( "exception-test", backend="exception:v1", route="/noListReturned") @@ -336,30 +270,8 @@ def test_batching_exception(serve_instance): assert ray.get(handle.remote(temp=1)) -def test_batching_exception_legacy(serve_instance): - client = serve_instance - - class NoListReturned: - def __init__(self): - self.count = 0 - - @serve.accept_batch - def __call__(self, flask_request, temp=None): - batch_size = serve.context.batch_size - return batch_size - - # set the max batch size - client.create_backend( - "exception:v1", NoListReturned, config={"max_batch_size": 5}) - client.create_endpoint( - "exception-test", backend="exception:v1", route="/noListReturned") - - handle = client.get_handle("exception-test") - with pytest.raises(ray.exceptions.RayTaskError): - assert ray.get(handle.remote(temp=1)) - - -def test_updating_config(serve_instance): +@pytest.mark.parametrize("use_legacy_config", [False, True]) +def test_updating_config(serve_instance, use_legacy_config): client = serve_instance class BatchSimple: @@ -370,55 +282,22 @@ def test_updating_config(serve_instance): def __call__(self, request): return [1] * len(request) - client.create_backend( - "bsimple:v1", - BatchSimple, - config=BackendConfig(max_batch_size=2, num_replicas=3)) + config = { + "max_batch_size": 2, + "num_replicas": 3 + } if use_legacy_config else BackendConfig( + max_batch_size=2, num_replicas=3) + client.create_backend("bsimple:v1", BatchSimple, config=config) client.create_endpoint("bsimple", backend="bsimple:v1", route="/bsimple") controller = client._controller old_replica_tag_list = ray.get( controller._list_replicas.remote("bsimple:v1")) - client.update_backend_config("bsimple:v1", BackendConfig(max_batch_size=5)) - new_replica_tag_list = ray.get( - controller._list_replicas.remote("bsimple:v1")) - new_all_tag_list = [] - for worker_dict in ray.get( - controller.get_all_worker_handles.remote()).values(): - new_all_tag_list.extend(list(worker_dict.keys())) - - # the old and new replica tag list should be identical - # and should be subset of all_tag_list - assert set(old_replica_tag_list) <= set(new_all_tag_list) - assert set(old_replica_tag_list) == set(new_replica_tag_list) - - -def test_updating_config_legacy(serve_instance): - client = serve_instance - - class BatchSimple: - def __init__(self): - self.count = 0 - - @serve.accept_batch - def __call__(self, request): - return [1] * len(request) - - client.create_backend( - "bsimple:v1", - BatchSimple, - config={ - "max_batch_size": 2, - "num_replicas": 3 - }) - client.create_endpoint("bsimple", backend="bsimple:v1", route="/bsimple") - - controller = client._controller - old_replica_tag_list = ray.get( - controller._list_replicas.remote("bsimple:v1")) - - client.update_backend_config("bsimple:v1", {"max_batch_size": 5}) + update_config = { + "max_batch_size": 5 + } if use_legacy_config else BackendConfig(max_batch_size=5) + client.update_backend_config("bsimple:v1", update_config) new_replica_tag_list = ray.get( controller._list_replicas.remote("bsimple:v1")) new_all_tag_list = [] @@ -587,7 +466,8 @@ def test_multiple_instances(): client1.delete_backend(backend) -def test_parallel_start(serve_instance): +@pytest.mark.parametrize("use_legacy_config", [False, True]) +def test_parallel_start(serve_instance, use_legacy_config): client = serve_instance # Test the ability to start multiple replicas in parallel. @@ -618,47 +498,10 @@ def test_parallel_start(serve_instance): def __call__(self, _): return "Ready" - client.create_backend( - "p:v0", LongStartingServable, config=BackendConfig(num_replicas=2)) - client.create_endpoint("test-parallel", backend="p:v0") - handle = client.get_handle("test-parallel") - - ray.get(handle.remote(), timeout=10) - - -def test_parallel_start_legacy(serve_instance): - client = serve_instance - - # Test the ability to start multiple replicas in parallel. - # In the past, when Serve scale up a backend, it does so one by one and - # wait for each replica to initialize. This test avoid this by preventing - # the first replica to finish initialization unless the second replica is - # also started. - @ray.remote - class Barrier: - def __init__(self, release_on): - self.release_on = release_on - self.current_waiters = 0 - self.event = asyncio.Event() - - async def wait(self): - self.current_waiters += 1 - if self.current_waiters == self.release_on: - self.event.set() - else: - await self.event.wait() - - barrier = Barrier.remote(release_on=2) - - class LongStartingServable: - def __init__(self): - ray.get(barrier.wait.remote(), timeout=10) - - def __call__(self, _): - return "Ready" - - client.create_backend( - "p:v0", LongStartingServable, config={"num_replicas": 2}) + config = { + "num_replicas": 2 + } if use_legacy_config else BackendConfig(num_replicas=2) + client.create_backend("p:v0", LongStartingServable, config=config) client.create_endpoint("test-parallel", backend="p:v0") handle = client.get_handle("test-parallel") @@ -709,48 +552,27 @@ def test_list_endpoints(serve_instance): assert len(client.list_endpoints()) == 0 -def test_list_backends(serve_instance): +@pytest.mark.parametrize("use_legacy_config", [False, True]) +def test_list_backends(serve_instance, use_legacy_config): client = serve_instance @serve.accept_batch def f(): pass - client.create_backend( - "backend", f, config=BackendConfig(max_batch_size=10)) + config1 = { + "max_batch_size": 10 + } if use_legacy_config else BackendConfig(max_batch_size=10) + client.create_backend("backend", f, config=config1) backends = client.list_backends() assert len(backends) == 1 assert "backend" in backends assert backends["backend"]["max_batch_size"] == 10 - client.create_backend("backend2", f, config=BackendConfig(num_replicas=10)) - backends = client.list_backends() - assert len(backends) == 2 - assert backends["backend2"]["num_replicas"] == 10 - - client.delete_backend("backend") - backends = client.list_backends() - assert len(backends) == 1 - assert "backend2" in backends - - client.delete_backend("backend2") - assert len(client.list_backends()) == 0 - - -def test_list_backends_legacy(serve_instance): - client = serve_instance - - @serve.accept_batch - def f(): - pass - - client.create_backend("backend", f, config={"max_batch_size": 10}) - backends = client.list_backends() - assert len(backends) == 1 - assert "backend" in backends - assert backends["backend"]["max_batch_size"] == 10 - - client.create_backend("backend2", f, config={"num_replicas": 10}) + config2 = { + "num_replicas": 10 + } if use_legacy_config else BackendConfig(num_replicas=10) + client.create_backend("backend2", f, config=config2) backends = client.list_backends() assert len(backends) == 2 assert backends["backend2"]["num_replicas"] == 10 @@ -780,7 +602,8 @@ def test_endpoint_input_validation(serve_instance): client.create_endpoint("endpoint", backend="backend") -def test_create_infeasible_error(serve_instance): +@pytest.mark.parametrize("use_legacy_config", [False, True]) +def test_create_infeasible_error(serve_instance, use_legacy_config): client = serve_instance def f(): @@ -797,6 +620,10 @@ def test_create_infeasible_error(serve_instance): # Even each replica might be feasible, the total might not be. current_cpus = int(ray.nodes()[0]["Resources"]["CPU"]) + num_replicas = current_cpus + 20 + config = { + "num_replicas": num_replicas + } if use_legacy_config else BackendConfig(num_replicas=num_replicas) with pytest.raises(RayServeException, match="Cannot scale backend"): client.create_backend( "f:1", @@ -804,38 +631,7 @@ def test_create_infeasible_error(serve_instance): ray_actor_options={"resources": { "CPU": 1, }}, - config=BackendConfig(num_replicas=(current_cpus + 20))) - - # No replica should be created! - replicas = ray.get(client._controller._list_replicas.remote("f1")) - assert len(replicas) == 0 - - -def test_create_infeasible_error_legacy(serve_instance): - client = serve_instance - - def f(): - pass - - # Non existent resource should be infeasible. - with pytest.raises(RayServeException, match="Cannot scale backend"): - client.create_backend( - "f:1", - f, - ray_actor_options={"resources": { - "MagicMLResource": 100 - }}) - - # Even each replica might be feasible, the total might not be. - current_cpus = int(ray.nodes()[0]["Resources"]["CPU"]) - with pytest.raises(RayServeException, match="Cannot scale backend"): - client.create_backend( - "f:1", - f, - ray_actor_options={"resources": { - "CPU": 1, - }}, - config={"num_replicas": current_cpus + 20}) + config=config) # No replica should be created! replicas = ray.get(client._controller._list_replicas.remote("f1"))