[serve] Require backend when creating endpoint (#8764)

This commit is contained in:
Edward Oakes
2020-06-06 21:10:42 -05:00
committed by GitHub
parent b07b4f2e55
commit 5d124489a9
30 changed files with 272 additions and 256 deletions
+37 -6
View File
@@ -119,18 +119,49 @@ def init(name=None,
@_ensure_connected
def create_endpoint(endpoint_name, route=None, methods=["GET"]):
def create_endpoint(endpoint_name,
*,
backend=None,
route=None,
methods=["GET"]):
"""Create a service endpoint given route_expression.
Args:
endpoint_name (str): A name to associate to the endpoint. It will be
used as key to set traffic policy.
route (str): A string begin with "/". HTTP server will use
endpoint_name (str): A name to associate to with the endpoint.
backend (str, required): The backend that will serve requests to
this endpoint. To change this or split traffic among backends, use
`serve.set_traffic`.
route (str, optional): A string begin with "/". HTTP server will use
the string to match the path.
methods(List[str], optional): The HTTP methods that are valid for this
endpoint.
"""
if backend is None:
raise TypeError("backend must be specified when creating "
"an endpoint.")
elif not isinstance(backend, str):
raise TypeError("backend must be a string, got {}.".format(
type(backend)))
if route is not None:
if not isinstance(route, str) or not route.startswith("/"):
raise TypeError("route must be a string starting with '/'.")
if not isinstance(methods, list):
raise TypeError(
"methods must be a list of strings, but got type {}".format(
type(methods)))
upper_methods = []
for method in methods:
if not isinstance(method, str):
raise TypeError("methods must be a list of strings, but contained "
"an element of type {}".format(type(method)))
upper_methods.append(method.upper())
ray.get(
master_actor.create_endpoint.remote(route, endpoint_name,
[m.upper() for m in methods]))
master_actor.create_endpoint.remote(endpoint_name, {backend: 1.0},
route, upper_methods))
@_ensure_connected
+1 -2
View File
@@ -12,9 +12,8 @@ def noop(_):
return ""
serve.create_endpoint("noop", "/noop")
serve.create_backend("noop", noop)
serve.set_traffic("noop", {"noop": 1.0})
serve.create_endpoint("noop", backend="noop", route="/noop")
url = "{}/noop".format(DEFAULT_HTTP_ADDRESS)
while requests.get(url).status_code == 404:
@@ -12,9 +12,8 @@ class Counter:
return {"current_counter": self.count}
serve.create_endpoint("counter", "/counter")
serve.create_backend("counter", Counter)
serve.set_traffic("counter", {"counter": 1.0})
serve.create_endpoint("counter", backend="counter", route="/counter")
requests.get("http://127.0.0.1:8000/counter").json()
# > {"current_counter": self.count}
@@ -8,9 +8,8 @@ def echo(flask_request):
return "hello " + flask_request.args.get("name", "serve!")
serve.create_endpoint("hello", "/hello")
serve.create_backend("hello", echo)
serve.set_traffic("hello", {"hello": 1.0})
serve.create_endpoint("hello", backend="hello", route="/hello")
requests.get("http://127.0.0.1:8000/hello").text
# > "hello serve!"
@@ -30,9 +30,9 @@ def batch_adder_v0(flask_requests: List):
# __doc_deploy_begin__
serve.init()
serve.create_endpoint("adder", "/adder", methods=["GET"])
serve.create_backend("adder:v0", batch_adder_v0, config={"max_batch_size": 4})
serve.set_traffic("adder", {"adder:v0": 1})
serve.create_endpoint(
"adder", backend="adder:v0", route="/adder", methods=["GET"])
# __doc_deploy_end__
@@ -70,9 +70,8 @@ ray.init(address="auto")
# listen on 0.0.0.0 to make the HTTP server accessible from other machines.
serve.init(http_host="0.0.0.0")
serve.create_endpoint("iris_classifier", "/regressor")
serve.create_backend("lr:v1", BoostingModel)
serve.set_traffic("iris_classifier", {"lr:v1": 1, "version": "v1"})
serve.create_endpoint("iris_classifier", backend="lr:v1", route="/regressor")
# __doc_create_deploy_end__
# __doc_query_begin__
@@ -45,9 +45,12 @@ class ImageModel:
# __doc_deploy_begin__
serve.init()
serve.create_endpoint("predictor", "/image_predict", methods=["POST"])
serve.create_backend("resnet18:v0", ImageModel)
serve.set_traffic("predictor", {"resnet18:v0": 1})
serve.create_endpoint(
"predictor",
backend="resnet18:v0",
route="/image_predict",
methods=["POST"])
# __doc_deploy_end__
# __doc_query_begin__
@@ -66,9 +66,8 @@ class BoostingModel:
# __doc_deploy_begin__
serve.init()
serve.create_endpoint("iris_classifier", "/regressor")
serve.create_backend("lr:v1", BoostingModel)
serve.set_traffic("iris_classifier", {"lr:v1": 1})
serve.create_endpoint("iris_classifier", backend="lr:v1", route="/regressor")
# __doc_deploy_end__
# __doc_query_begin__
@@ -69,9 +69,8 @@ class TFMnistModel:
# __doc_deploy_begin__
serve.init()
serve.create_endpoint(endpoint_name="tf_classifier", route="/mnist")
serve.create_backend("tf:v1", TFMnistModel, "/tmp/mnist_model.h5")
serve.set_traffic("tf_classifier", {"tf:v1": 1})
serve.create_endpoint("tf_classifier", backend="tf:v1", route="/mnist")
# __doc_deploy_end__
# __doc_query_begin__
+1 -2
View File
@@ -16,9 +16,8 @@ def echo(flask_request):
serve.init()
serve.create_endpoint("my_endpoint", "/echo")
serve.create_backend("echo:v1", echo)
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
while True:
resp = requests.get("http://127.0.0.1:8000/echo").json()
+1 -2
View File
@@ -25,9 +25,8 @@ class MagicCounter:
serve.init()
serve.create_endpoint("magic_counter", "/counter")
serve.create_backend("counter:v1", MagicCounter, 42) # increment=42
serve.set_traffic("magic_counter", {"counter:v1": 1.0})
serve.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
print("Sending ten queries via HTTP")
for i in range(10):
@@ -36,11 +36,10 @@ class MagicCounter:
serve.init()
serve.create_endpoint("magic_counter", "/counter")
serve.create_backend(
"counter:v1", MagicCounter, 42,
config={"max_batch_size": 5}) # increment=42
serve.set_traffic("magic_counter", {"counter:v1": 1.0})
serve.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
print("Sending ten queries via HTTP")
for i in range(10):
+1 -2
View File
@@ -27,14 +27,13 @@ class MagicCounter:
serve.init()
serve.create_endpoint("magic_counter", "/counter")
# specify max_batch_size in BackendConfig
backend_config = {"max_batch_size": 5}
serve.create_backend(
"counter:v1", MagicCounter, 42, config=backend_config) # increment=42
print("Backend Config for backend: 'counter:v1'")
print(backend_config)
serve.set_traffic("magic_counter", {"counter:v1": 1.0})
serve.create_endpoint("magic_counter", backend="counter:v1", route="/counter")
handle = serve.get_handle("magic_counter")
future_list = []
+1 -3
View File
@@ -28,9 +28,7 @@ def echo(_):
serve.init()
serve.create_endpoint("my_endpoint", "/echo")
serve.create_backend("echo:v1", echo)
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
for _ in range(2):
resp = requests.get("http://127.0.0.1:8000/echo").json()
@@ -27,16 +27,16 @@ serve.init(
queueing_policy=serve.RoutePolicy.FixedPacking,
policy_kwargs={"packing_num": 5})
# create a service
serve.create_endpoint("my_endpoint", "/echo")
# create first backend
serve.create_backend("echo:v1", echo_v1)
# create service backed by the first backend
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
# create second backend
serve.create_backend("echo:v2", echo_v2)
# link and split the service to two backends
# split the service between the two backends
serve.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
while True:
+3 -6
View File
@@ -13,9 +13,6 @@ from ray.serve.utils import pformat_color_json
# initialize ray serve system.
serve.init()
# an endpoint is associated with an http URL.
serve.create_endpoint("my_endpoint", "/echo")
# a backend can be a function or class.
# it can be made to be invoked from web as well as python.
@@ -27,9 +24,9 @@ def echo_v1(flask_request, response="hello from python!"):
serve.create_backend("echo:v1", echo_v1)
# We can link an endpoint to a backend, the means all the traffic
# goes to my_endpoint will now goes to echo:v1 backend.
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
# An endpoint is associated with an HTTP path and traffic to the endpoint
# will be serviced by the echo:v1 backend.
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
print(requests.get("http://127.0.0.1:8000/echo", timeout=0.5).text)
# The service will be reachable from http
+4 -8
View File
@@ -15,36 +15,32 @@ def echo_v1(_, response="hello from python!"):
return f"echo_v1({response})"
serve.create_endpoint("echo_v1", "/echo_v1")
serve.create_backend("echo_v1", echo_v1)
serve.set_traffic("echo_v1", {"echo_v1": 1.0})
serve.create_endpoint("echo_v1", backend="echo_v1", route="/echo_v1")
def echo_v2(_, relay=""):
return f"echo_v2({relay})"
serve.create_endpoint("echo_v2", "/echo_v2")
serve.create_backend("echo_v2", echo_v2)
serve.set_traffic("echo_v2", {"echo_v2": 1.0})
serve.create_endpoint("echo_v2", backend="echo_v2", route="/echo_v2")
def echo_v3(_, relay=""):
return f"echo_v3({relay})"
serve.create_endpoint("echo_v3", "/echo_v3")
serve.create_backend("echo_v3", echo_v3)
serve.set_traffic("echo_v3", {"echo_v3": 1.0})
serve.create_endpoint("echo_v3", backend="echo_v3", route="/echo_v3")
def echo_v4(_, relay1="", relay2=""):
return f"echo_v4({relay1} , {relay2})"
serve.create_endpoint("echo_v4", "/echo_v4")
serve.create_backend("echo_v4", echo_v4)
serve.set_traffic("echo_v4", {"echo_v4": 1.0})
serve.create_endpoint("echo_v4", backend="echo_v4", route="/echo_v4")
"""
The pipeline created is as follows -
"my_endpoint1"
@@ -21,16 +21,16 @@ def echo_v2(_):
# specify the router policy as RoundRobin
serve.init(queueing_policy=serve.RoutePolicy.RoundRobin)
# create a service
serve.create_endpoint("my_endpoint", "/echo")
# create first backend
serve.create_backend("echo:v1", echo_v1)
# create a service backend by the first backend
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
# create second backend
serve.create_backend("echo:v2", echo_v2)
# link and split the service to two backends
# split the service between the two backends
serve.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
while True:
@@ -12,9 +12,6 @@ import ray.serve as serve
# initialize ray serve system.
serve.init()
# an endpoint is associated with an http URL.
serve.create_endpoint("my_endpoint", "/echo")
# a backend can be a function or class.
# it can be made to be invoked from web as well as python.
@@ -25,7 +22,8 @@ def echo_v1(flask_request, response="hello from python!"):
serve.create_backend("echo:v1", echo_v1)
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
# wait for routing table to get populated
time.sleep(2)
+1 -2
View File
@@ -20,9 +20,8 @@ def echo_v2(_):
serve.init()
serve.create_endpoint("my_endpoint", "/echo")
serve.create_backend("echo:v1", echo_v1)
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
serve.create_endpoint("my_endpoint", backend="echo:v1", route="/echo")
for _ in range(3):
resp = requests.get("http://127.0.0.1:8000/echo").json()
+38 -39
View File
@@ -465,44 +465,45 @@ class ServeMaster:
}
return endpoints
async def set_traffic(self, endpoint_name, traffic_policy_dictionary):
async def _set_traffic(self, endpoint_name, traffic_dict):
if endpoint_name not in self.get_all_endpoints():
raise ValueError("Attempted to assign traffic for an endpoint '{}'"
" that is not registered.".format(endpoint_name))
assert isinstance(traffic_dict,
dict), "Traffic policy must be dictionary"
prob = 0
for backend, weight in traffic_dict.items():
if weight < 0:
raise ValueError(
"Attempted to assign a weight of {} to backend '{}'. "
"Weights cannot be negative.".format(weight, backend))
prob += weight
if backend not in self.backends:
raise ValueError(
"Attempted to assign traffic to a backend '{}' that "
"is not registered.".format(backend))
# These weights will later be plugged into np.random.choice, which
# uses a tolerance of 1e-8.
assert np.isclose(
prob, 1, atol=1e-8
), "weights must sum to 1, currently they sum to {}".format(prob)
self.traffic_policies[endpoint_name] = traffic_dict
# NOTE(edoakes): we must write a checkpoint before pushing the
# update to avoid inconsistent state if we crash after pushing the
# update.
self._checkpoint()
await self.router.set_traffic.remote(endpoint_name, traffic_dict)
async def set_traffic(self, endpoint_name, traffic_dict):
"""Sets the traffic policy for the specified endpoint."""
async with self.write_lock:
if endpoint_name not in self.get_all_endpoints():
raise ValueError(
"Attempted to assign traffic for an endpoint '{}'"
" that is not registered.".format(endpoint_name))
await self._set_traffic(endpoint_name, traffic_dict)
assert isinstance(traffic_policy_dictionary,
dict), "Traffic policy must be dictionary"
prob = 0
for backend, weight in traffic_policy_dictionary.items():
if weight < 0:
raise ValueError(
"Attempted to assign a weight of {} to backend '{}'. "
"Weights cannot be negative.".format(weight, backend))
prob += weight
if backend not in self.backends:
raise ValueError(
"Attempted to assign traffic to a backend '{}' that "
"is not registered.".format(backend))
# These weights will later be plugged into np.random.choice, which
# uses a tolerance of 1e-8.
assert np.isclose(
prob, 1, atol=1e-8
), "weights must sum to 1, currently they sum to {}".format(prob)
self.traffic_policies[endpoint_name] = traffic_policy_dictionary
# NOTE(edoakes): we must write a checkpoint before pushing the
# update to avoid inconsistent state if we crash after pushing the
# update.
self._checkpoint()
await self.router.set_traffic.remote(endpoint_name,
traffic_policy_dictionary)
async def create_endpoint(self, route, endpoint, methods):
async def create_endpoint(self, endpoint, traffic_dict, route, methods):
"""Create a new endpoint with the specified route and methods.
If the route is None, this is a "headless" endpoint that will not
@@ -537,10 +538,8 @@ class ServeMaster:
self.routes[route] = (endpoint, methods)
# NOTE(edoakes): we must write a checkpoint before pushing the
# update to avoid inconsistent state if we crash after pushing the
# update.
self._checkpoint()
# NOTE(edoakes): checkpoint is written in self._set_traffic.
await self._set_traffic(endpoint, traffic_dict)
await self.http_proxy.set_route_table.remote(self.routes)
async def delete_endpoint(self, endpoint):
+77 -58
View File
@@ -11,7 +11,13 @@ from ray.serve.utils import get_random_letters
def test_e2e(serve_instance):
serve.init()
serve.create_endpoint("endpoint", "/api", methods=["GET", "POST"])
def function(flask_request):
return {"method": flask_request.method}
serve.create_backend("echo:v1", function)
serve.create_endpoint(
"endpoint", backend="echo:v1", route="/api", methods=["GET", "POST"])
retry_count = 5
timeout_sleep = 0.5
@@ -29,12 +35,6 @@ def test_e2e(serve_instance):
assert False, ("Route table hasn't been updated after 3 tries."
"The latest error was {}").format(e)
def function(flask_request):
return {"method": flask_request.method}
serve.create_backend("echo:v1", function)
serve.set_traffic("endpoint", {"echo:v1": 1.0})
resp = requests.get("http://127.0.0.1:8000/api").json()["method"]
assert resp == "GET"
@@ -43,14 +43,12 @@ def test_e2e(serve_instance):
def test_call_method(serve_instance):
serve.create_endpoint("endpoint", "/api")
class CallMethod:
def method(self, request):
return "hello"
serve.create_backend("backend", CallMethod)
serve.set_traffic("endpoint", {"backend": 1.0})
serve.create_endpoint("endpoint", backend="backend", route="/api")
# Test HTTP path.
resp = requests.get(
@@ -65,37 +63,46 @@ def test_call_method(serve_instance):
def test_no_route(serve_instance):
serve.create_endpoint("noroute-endpoint")
def func(_, i=1):
return 1
serve.create_backend("backend:1", func)
serve.set_traffic("noroute-endpoint", {"backend:1": 1.0})
serve.create_endpoint("noroute-endpoint", backend="backend:1")
service_handle = serve.get_handle("noroute-endpoint")
result = ray.get(service_handle.remote(i=1))
assert result == 1
def test_reject_duplicate_route(serve_instance):
def f():
pass
serve.create_backend("backend", f)
route = "/foo"
serve.create_endpoint("bar", route=route)
serve.create_endpoint("bar", backend="backend", route=route)
with pytest.raises(ValueError):
serve.create_endpoint("foo", route=route)
serve.create_endpoint("foo", backend="backend", route=route)
def test_reject_duplicate_endpoint(serve_instance):
def f():
pass
serve.create_backend("backend", f)
endpoint_name = "foo"
serve.create_endpoint(endpoint_name, route="/ok")
serve.create_endpoint(endpoint_name, backend="backend", route="/ok")
with pytest.raises(ValueError):
serve.create_endpoint(endpoint_name, route="/different")
serve.create_endpoint(
endpoint_name, backend="backend", route="/different")
def test_set_traffic_missing_data(serve_instance):
endpoint_name = "foobar"
backend_name = "foo_backend"
serve.create_endpoint(endpoint_name)
serve.create_backend(backend_name, lambda: 5)
serve.create_endpoint(endpoint_name, backend=backend_name)
with pytest.raises(ValueError):
serve.set_traffic(endpoint_name, {"nonexistent_backend": 1.0})
with pytest.raises(ValueError):
@@ -111,16 +118,14 @@ def test_scaling_replicas(serve_instance):
self.count += 1
return self.count
serve.create_endpoint("counter", "/increment")
serve.create_backend("counter:v1", Counter, config={"num_replicas": 2})
serve.create_endpoint("counter", backend="counter:v1", route="/increment")
# Keep checking the routing table until /increment is populated
while "/increment" not in requests.get(
"http://127.0.0.1:8000/-/routes").json():
time.sleep(0.2)
serve.create_backend("counter:v1", Counter, config={"num_replicas": 2})
serve.set_traffic("counter", {"counter:v1": 1.0})
counter_result = []
for _ in range(10):
resp = requests.get("http://127.0.0.1:8000/increment").json()
@@ -151,18 +156,17 @@ def test_batching(serve_instance):
batch_size = serve.context.batch_size
return [self.count] * batch_size
serve.create_endpoint("counter1", "/increment2")
# set the max batch size
serve.create_backend(
"counter:v11", BatchingExample, config={"max_batch_size": 5})
serve.create_endpoint(
"counter1", backend="counter:v11", route="/increment2")
# Keep checking the routing table until /increment is populated
while "/increment2" not in requests.get(
"http://127.0.0.1:8000/-/routes").json():
time.sleep(0.2)
# set the max batch size
serve.create_backend(
"counter:v11", BatchingExample, config={"max_batch_size": 5})
serve.set_traffic("counter1", {"counter:v11": 1.0})
future_list = []
handle = serve.get_handle("counter1")
for _ in range(20):
@@ -186,11 +190,11 @@ def test_batching_exception(serve_instance):
batch_size = serve.context.batch_size
return batch_size
serve.create_endpoint("exception-test", "/noListReturned")
# set the max batch size
serve.create_backend(
"exception:v1", NoListReturned, config={"max_batch_size": 5})
serve.set_traffic("exception-test", {"exception:v1": 1.0})
serve.create_endpoint(
"exception-test", backend="exception:v1", route="/noListReturned")
handle = serve.get_handle("exception-test")
with pytest.raises(ray.exceptions.RayTaskError):
@@ -207,7 +211,6 @@ def test_updating_config(serve_instance):
batch_size = serve.context.batch_size
return [1] * batch_size
serve.create_endpoint("bsimple", "/bsimple")
serve.create_backend(
"bsimple:v1",
BatchSimple,
@@ -215,6 +218,8 @@ def test_updating_config(serve_instance):
"max_batch_size": 2,
"num_replicas": 3
})
serve.create_endpoint("bsimple", backend="bsimple:v1", route="/bsimple")
master_actor = serve.api._get_master_actor()
old_replica_tag_list = ray.get(
master_actor._list_replicas.remote("bsimple:v1"))
@@ -234,13 +239,12 @@ def test_updating_config(serve_instance):
def test_delete_backend(serve_instance):
serve.create_endpoint("delete_backend", "/delete-backend")
def function():
return "hello"
serve.create_backend("delete:v1", function)
serve.set_traffic("delete_backend", {"delete:v1": 1.0})
serve.create_endpoint(
"delete_backend", backend="delete:v1", route="/delete-backend")
assert requests.get("http://127.0.0.1:8000/delete-backend").text == "hello"
@@ -274,18 +278,18 @@ def test_delete_backend(serve_instance):
@pytest.mark.parametrize("route", [None, "/delete-endpoint"])
def test_delete_endpoint(serve_instance, route):
endpoint_name = "delete_endpoint" + str(route)
serve.create_endpoint(endpoint_name, route=route)
serve.delete_endpoint(endpoint_name)
# Check that we can reuse a deleted endpoint name and route.
serve.create_endpoint(endpoint_name, route=route)
def function():
return "hello"
serve.create_backend("delete-endpoint:v1", function)
serve.set_traffic(endpoint_name, {"delete-endpoint:v1": 1.0})
backend_name = "delete-endpoint:v1"
serve.create_backend(backend_name, function)
endpoint_name = "delete_endpoint" + str(route)
serve.create_endpoint(endpoint_name, backend=backend_name, route=route)
serve.delete_endpoint(endpoint_name)
# Check that we can reuse a deleted endpoint name and route.
serve.create_endpoint(endpoint_name, backend=backend_name, route=route)
if route is not None:
assert requests.get(
@@ -296,8 +300,7 @@ def test_delete_endpoint(serve_instance, route):
# Check that deleting the endpoint doesn't delete the backend.
serve.delete_endpoint(endpoint_name)
serve.create_endpoint(endpoint_name, route=route)
serve.set_traffic(endpoint_name, {"delete-endpoint:v1": 1.0})
serve.create_endpoint(endpoint_name, backend=backend_name, route=route)
if route is not None:
assert requests.get(
@@ -309,8 +312,6 @@ def test_delete_endpoint(serve_instance, route):
@pytest.mark.parametrize("route", [None, "/shard"])
def test_shard_key(serve_instance, route):
serve.create_endpoint("endpoint", route=route)
# Create five backends that return different integers.
num_backends = 5
traffic_dict = {}
@@ -323,6 +324,8 @@ def test_shard_key(serve_instance, route):
traffic_dict[backend_name] = 1.0 / num_backends
serve.create_backend(backend_name, function)
serve.create_endpoint(
"endpoint", backend=list(traffic_dict.keys())[0], route=route)
serve.set_traffic("endpoint", traffic_dict)
def do_request(shard_key):
@@ -355,26 +358,24 @@ def test_name():
endpoint = "endpoint"
serve.init(name="cluster1", http_port=8001)
serve.create_endpoint(endpoint, route=route)
def function():
return "hello1"
serve.create_backend(backend, function)
serve.set_traffic(endpoint, {backend: 1.0})
serve.create_endpoint(endpoint, backend=backend, route=route)
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
# Create a second cluster on port 8002. Create an endpoint and backend with
# the same names and check that they don't collide.
serve.init(name="cluster2", http_port=8002)
serve.create_endpoint(endpoint, route=route)
def function():
return "hello2"
serve.create_backend(backend, function)
serve.set_traffic(endpoint, {backend: 1.0})
serve.create_endpoint(endpoint, backend=backend, route=route)
assert requests.get("http://127.0.0.1:8001" + route).text == "hello1"
assert requests.get("http://127.0.0.1:8002" + route).text == "hello2"
@@ -419,10 +420,9 @@ def test_parallel_start(serve_instance):
def __call__(self, _):
return "Ready"
serve.create_endpoint("test-parallel")
serve.create_backend(
"p:v0", LongStartingServable, config={"num_replicas": 2})
serve.set_traffic("test-parallel", {"p:v0": 1})
serve.create_endpoint("test-parallel", backend="p:v0")
handle = serve.get_handle("test-parallel")
ray.get(handle.remote(), timeout=10)
@@ -434,17 +434,20 @@ def test_list_endpoints(serve_instance):
def f():
pass
serve.create_endpoint("endpoint", "/api", methods=["GET", "POST"])
serve.create_endpoint("endpoint2", methods=["POST"])
serve.create_backend("backend", f)
serve.set_traffic("endpoint2", {"backend": 1.0})
serve.create_backend("backend2", f)
serve.create_endpoint(
"endpoint", backend="backend", route="/api", methods=["GET", "POST"])
serve.create_endpoint("endpoint2", backend="backend2", methods=["POST"])
endpoints = serve.list_endpoints()
assert "endpoint" in endpoints
assert endpoints["endpoint"] == {
"route": "/api",
"methods": ["GET", "POST"],
"traffic": {}
"traffic": {
"backend": 1.0
}
}
assert "endpoint2" in endpoints
@@ -452,7 +455,7 @@ def test_list_endpoints(serve_instance):
"route": None,
"methods": ["POST"],
"traffic": {
"backend": 1.0
"backend2": 1.0
}
}
@@ -488,3 +491,19 @@ def test_list_backends(serve_instance):
serve.delete_backend("backend2")
assert len(serve.list_backends()) == 0
def test_endpoint_input_validation(serve_instance):
serve.init()
def f():
pass
serve.create_backend("backend", f)
with pytest.raises(TypeError):
serve.create_endpoint("endpoint")
with pytest.raises(TypeError):
serve.create_endpoint("endpoint", route="/hello")
with pytest.raises(TypeError):
serve.create_endpoint("endpoint", backend=2)
serve.create_endpoint("endpoint", backend="backend")
+15 -13
View File
@@ -21,13 +21,13 @@ def request_with_retries(endpoint, timeout=30):
def test_master_failure(serve_instance):
serve.init()
serve.create_endpoint("master_failure", "/master_failure")
def function():
return "hello1"
serve.create_backend("master_failure:v1", function)
serve.set_traffic("master_failure", {"master_failure:v1": 1.0})
serve.create_endpoint(
"master_failure", backend="master_failure:v1", route="/master_failure")
assert request_with_retries("/master_failure", timeout=1).text == "hello1"
@@ -56,12 +56,14 @@ def test_master_failure(serve_instance):
def function():
return "hello3"
ray.kill(serve.api._get_master_actor(), no_restart=False)
serve.create_endpoint("master_failure_2", "/master_failure_2")
ray.kill(serve.api._get_master_actor(), no_restart=False)
serve.create_backend("master_failure_2", function)
ray.kill(serve.api._get_master_actor(), no_restart=False)
serve.set_traffic("master_failure_2", {"master_failure_2": 1.0})
serve.create_endpoint(
"master_failure_2",
backend="master_failure_2",
route="/master_failure_2")
ray.kill(serve.api._get_master_actor(), no_restart=False)
for _ in range(10):
response = request_with_retries("/master_failure", timeout=30)
@@ -78,13 +80,13 @@ def _kill_http_proxy():
def test_http_proxy_failure(serve_instance):
serve.init()
serve.create_endpoint("proxy_failure", "/proxy_failure")
def function():
return "hello1"
serve.create_backend("proxy_failure:v1", function)
serve.set_traffic("proxy_failure", {"proxy_failure:v1": 1.0})
serve.create_endpoint(
"proxy_failure", backend="proxy_failure:v1", route="/proxy_failure")
assert request_with_retries("/proxy_failure", timeout=1.0).text == "hello1"
@@ -112,13 +114,13 @@ def _kill_router():
def test_router_failure(serve_instance):
serve.init()
serve.create_endpoint("router_failure", "/router_failure")
def function():
return "hello1"
serve.create_backend("router_failure:v1", function)
serve.set_traffic("router_failure", {"router_failure:v1": 1.0})
serve.create_endpoint(
"router_failure", backend="router_failure:v1", route="/router_failure")
assert request_with_retries("/router_failure", timeout=5).text == "hello1"
@@ -154,14 +156,14 @@ def _get_worker_handles(backend):
# serving requests.
def test_worker_restart(serve_instance):
serve.init()
serve.create_endpoint("worker_failure", "/worker_failure")
class Worker1:
def __call__(self):
return os.getpid()
serve.create_backend("worker_failure:v1", Worker1)
serve.set_traffic("worker_failure", {"worker_failure:v1": 1.0})
serve.create_endpoint(
"worker_failure", backend="worker_failure:v1", route="/worker_failure")
# Get the PID of the worker.
old_pid = request_with_retries("/worker_failure", timeout=1).text
@@ -186,7 +188,6 @@ def test_worker_restart(serve_instance):
def test_worker_replica_failure(serve_instance):
serve.http_proxy.MAX_ACTOR_DEAD_RETRIES = 0
serve.init()
serve.create_endpoint("replica_failure", "/replica_failure")
class Worker:
# Assumes that two replicas are started. Will hang forever in the
@@ -216,7 +217,8 @@ def test_worker_replica_failure(serve_instance):
temp_path = tempfile.gettempdir() + "/" + serve.utils.get_random_letters()
serve.create_backend("replica_failure", Worker, temp_path)
serve.update_backend_config("replica_failure", {"num_replicas": 2})
serve.set_traffic("replica_failure", {"replica_failure": 1.0})
serve.create_endpoint(
"replica_failure", backend="replica_failure", route="/replica_failure")
# Wait until both replicas have been started.
responses = set()
+10 -4
View File
@@ -18,12 +18,18 @@ def test_handle_in_endpoint(serve_instance):
def __call__(self):
return ray.get(self.handle.remote())
serve.create_endpoint("endpoint1", "/endpoint1", methods=["GET", "POST"])
serve.create_backend("endpoint1:v0", Endpoint1)
serve.set_traffic("endpoint1", {"endpoint1:v0": 1.0})
serve.create_endpoint(
"endpoint1",
backend="endpoint1:v0",
route="/endpoint1",
methods=["GET", "POST"])
serve.create_endpoint("endpoint2", "/endpoint2", methods=["GET", "POST"])
serve.create_backend("endpoint2:v0", Endpoint2)
serve.set_traffic("endpoint2", {"endpoint2:v0": 1.0})
serve.create_endpoint(
"endpoint2",
backend="endpoint2:v0",
route="/endpoint2",
methods=["GET", "POST"])
assert requests.get("http://127.0.0.1:8000/endpoint2").text == "hello"
+1 -1
View File
@@ -144,8 +144,8 @@ async def test_system_metric_endpoints(serve_instance):
def test_error_counter(flask_request):
1 / 0
serve.create_endpoint("test_metrics", "/measure")
serve.create_backend("m:v1", test_error_counter)
serve.create_endpoint("test_metrics", backend="m:v1", route="/measure")
serve.set_traffic("test_metrics", {"m:v1": 1})
# Check metrics are exposed under http endpoint
+2 -2
View File
@@ -6,13 +6,13 @@ from ray import serve
def test_nonblocking():
serve.init()
serve.create_endpoint("nonblocking", "/nonblocking")
def function(flask_request):
return {"method": flask_request.method}
serve.create_backend("nonblocking:v1", function)
serve.set_traffic("nonblocking", {"nonblocking:v1": 1.0})
serve.create_endpoint(
"nonblocking", backend="nonblocking:v1", route="/nonblocking")
resp = requests.get("http://127.0.0.1:8000/nonblocking").json()["method"]
assert resp == "GET"
+1 -2
View File
@@ -17,9 +17,8 @@ serve.init()
def driver(flask_request):
return "OK!"
serve.create_endpoint("driver", "/driver")
serve.create_backend("driver", driver)
serve.set_traffic("driver", {{"driver": 1.0}})
serve.create_endpoint("driver", backend="driver", route="/driver")
""".format(ray.worker._global_node._redis_address)
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: