mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 08:10:28 +08:00
[serve] Remove serve.link(), rename serve.split() -> serve.set_traffic() (#8072)
This commit is contained in:
@@ -1,11 +1,11 @@
|
||||
from ray.serve.backend_config import BackendConfig
|
||||
from ray.serve.policy import RoutePolicy
|
||||
from ray.serve.api import (init, create_backend, create_endpoint, link, split,
|
||||
from ray.serve.api import (init, create_backend, create_endpoint, set_traffic,
|
||||
get_handle, stat, set_backend_config,
|
||||
get_backend_config, accept_batch) # noqa: E402
|
||||
|
||||
__all__ = [
|
||||
"init", "create_backend", "create_endpoint", "link", "split", "get_handle",
|
||||
"init", "create_backend", "create_endpoint", "set_traffic", "get_handle",
|
||||
"stat", "set_backend_config", "get_backend_config", "BackendConfig",
|
||||
"RoutePolicy", "accept_batch"
|
||||
]
|
||||
|
||||
+4
-20
@@ -246,28 +246,12 @@ def create_backend(func_or_class,
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
def link(endpoint_name, backend_tag):
|
||||
"""Associate a service endpoint with backend tag.
|
||||
|
||||
Example:
|
||||
|
||||
>>> serve.link("service-name", "backend:v1")
|
||||
|
||||
Note:
|
||||
This is equivalent to
|
||||
|
||||
>>> serve.split("service-name", {"backend:v1": 1.0})
|
||||
"""
|
||||
split(endpoint_name, {backend_tag: 1.0})
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
def split(endpoint_name, traffic_policy_dictionary):
|
||||
def set_traffic(endpoint_name, traffic_policy_dictionary):
|
||||
"""Associate a service endpoint with traffic policy.
|
||||
|
||||
Example:
|
||||
|
||||
>>> serve.split("service-name", {
|
||||
>>> serve.set_traffic("service-name", {
|
||||
"backend:v1": 0.5,
|
||||
"backend:v2": 0.5
|
||||
})
|
||||
@@ -278,8 +262,8 @@ def split(endpoint_name, traffic_policy_dictionary):
|
||||
to their traffic weights. The weights must sum to 1.
|
||||
"""
|
||||
ray.get(
|
||||
master_actor.split_traffic.remote(endpoint_name,
|
||||
traffic_policy_dictionary))
|
||||
master_actor.set_traffic.remote(endpoint_name,
|
||||
traffic_policy_dictionary))
|
||||
|
||||
|
||||
@_ensure_connected
|
||||
|
||||
@@ -14,7 +14,7 @@ def noop(_):
|
||||
|
||||
serve.create_endpoint("noop", "/noop")
|
||||
serve.create_backend(noop, "noop")
|
||||
serve.split("noop", {"noop": 1.0})
|
||||
serve.set_traffic("noop", {"noop": 1.0})
|
||||
|
||||
url = "{}/noop".format(DEFAULT_HTTP_ADDRESS)
|
||||
while requests.get(url).status_code == 404:
|
||||
|
||||
@@ -14,7 +14,7 @@ class Counter:
|
||||
|
||||
serve.create_endpoint("counter", "/counter")
|
||||
serve.create_backend(Counter, "counter")
|
||||
serve.split("counter", {"counter": 1.0})
|
||||
serve.set_traffic("counter", {"counter": 1.0})
|
||||
|
||||
requests.get("http://127.0.0.1:8000/counter").json()
|
||||
# > {"current_counter": self.count}
|
||||
|
||||
@@ -10,7 +10,7 @@ def echo(flask_request):
|
||||
|
||||
serve.create_endpoint("hello", "/hello")
|
||||
serve.create_backend(echo, "hello")
|
||||
serve.split("hello", {"hello": 1.0})
|
||||
serve.set_traffic("hello", {"hello": 1.0})
|
||||
|
||||
requests.get("http://127.0.0.1:8000/hello").text
|
||||
# > "hello serve!"
|
||||
|
||||
@@ -18,7 +18,7 @@ serve.init(blocking=True)
|
||||
|
||||
serve.create_endpoint("my_endpoint", "/echo")
|
||||
serve.create_backend(echo, "echo:v1")
|
||||
serve.link("my_endpoint", "echo:v1")
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
|
||||
|
||||
while True:
|
||||
resp = requests.get("http://127.0.0.1:8000/echo").json()
|
||||
|
||||
@@ -27,7 +27,7 @@ class MagicCounter:
|
||||
serve.init(blocking=True)
|
||||
serve.create_endpoint("magic_counter", "/counter")
|
||||
serve.create_backend(MagicCounter, "counter:v1", 42) # increment=42
|
||||
serve.link("magic_counter", "counter:v1")
|
||||
serve.set_traffic("magic_counter", {"counter:v1": 1.0})
|
||||
|
||||
print("Sending ten queries via HTTP")
|
||||
for i in range(10):
|
||||
|
||||
@@ -41,7 +41,7 @@ serve.create_endpoint("magic_counter", "/counter")
|
||||
b_config = BackendConfig(max_batch_size=5)
|
||||
serve.create_backend(
|
||||
MagicCounter, "counter:v1", 42, backend_config=b_config) # increment=42
|
||||
serve.link("magic_counter", "counter:v1")
|
||||
serve.set_traffic("magic_counter", {"counter:v1": 1.0})
|
||||
|
||||
print("Sending ten queries via HTTP")
|
||||
for i in range(10):
|
||||
|
||||
@@ -35,7 +35,7 @@ serve.create_backend(
|
||||
MagicCounter, "counter:v1", 42, backend_config=b_config) # increment=42
|
||||
print("Backend Config for backend: 'counter:v1'")
|
||||
print(b_config)
|
||||
serve.link("magic_counter", "counter:v1")
|
||||
serve.set_traffic("magic_counter", {"counter:v1": 1.0})
|
||||
|
||||
handle = serve.get_handle("magic_counter")
|
||||
future_list = []
|
||||
|
||||
@@ -30,7 +30,7 @@ serve.init(blocking=True)
|
||||
|
||||
serve.create_endpoint("my_endpoint", "/echo")
|
||||
serve.create_backend(echo, "echo:v1")
|
||||
serve.link("my_endpoint", "echo:v1")
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
|
||||
|
||||
for _ in range(2):
|
||||
resp = requests.get("http://127.0.0.1:8000/echo").json()
|
||||
|
||||
@@ -38,7 +38,7 @@ serve.create_backend(echo_v1, "echo:v1")
|
||||
serve.create_backend(echo_v2, "echo:v2")
|
||||
|
||||
# link and split the service to two backends
|
||||
serve.split("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
|
||||
|
||||
while True:
|
||||
resp = requests.get("http://127.0.0.1:8000/echo").json()
|
||||
|
||||
@@ -31,7 +31,7 @@ backend_config_v1 = serve.get_backend_config("echo:v1")
|
||||
|
||||
# We can link an endpoint to a backend, which means all the traffic
|
||||
# going to my_endpoint will now go to the echo:v1 backend.
|
||||
serve.link("my_endpoint", "echo:v1")
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
|
||||
|
||||
print(requests.get("http://127.0.0.1:8000/echo", timeout=0.5).text)
|
||||
# The service will be reachable from http
|
||||
@@ -51,7 +51,7 @@ serve.create_backend(echo_v2, "echo:v2")
|
||||
backend_config_v2 = serve.get_backend_config("echo:v2")
|
||||
|
||||
# The two backends will now split the traffic 50%-50%.
|
||||
serve.split("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
|
||||
|
||||
# Observe requests are now split between two backends.
|
||||
for _ in range(10):
|
||||
|
||||
@@ -18,7 +18,7 @@ def echo_v1(_, response="hello from python!"):
|
||||
|
||||
serve.create_endpoint("echo_v1", "/echo_v1")
|
||||
serve.create_backend(echo_v1, "echo_v1")
|
||||
serve.split("echo_v1", {"echo_v1": 1.0})
|
||||
serve.set_traffic("echo_v1", {"echo_v1": 1.0})
|
||||
|
||||
|
||||
def echo_v2(_, relay=""):
|
||||
@@ -27,7 +27,7 @@ def echo_v2(_, relay=""):
|
||||
|
||||
serve.create_endpoint("echo_v2", "/echo_v2")
|
||||
serve.create_backend(echo_v2, "echo_v2")
|
||||
serve.split("echo_v2", {"echo_v2": 1.0})
|
||||
serve.set_traffic("echo_v2", {"echo_v2": 1.0})
|
||||
|
||||
|
||||
def echo_v3(_, relay=""):
|
||||
@@ -36,7 +36,7 @@ def echo_v3(_, relay=""):
|
||||
|
||||
serve.create_endpoint("echo_v3", "/echo_v3")
|
||||
serve.create_backend(echo_v3, "echo_v3")
|
||||
serve.split("echo_v3", {"echo_v3": 1.0})
|
||||
serve.set_traffic("echo_v3", {"echo_v3": 1.0})
|
||||
|
||||
|
||||
def echo_v4(_, relay1="", relay2=""):
|
||||
@@ -45,7 +45,7 @@ def echo_v4(_, relay1="", relay2=""):
|
||||
|
||||
serve.create_endpoint("echo_v4", "/echo_v4")
|
||||
serve.create_backend(echo_v4, "echo_v4")
|
||||
serve.split("echo_v4", {"echo_v4": 1.0})
|
||||
serve.set_traffic("echo_v4", {"echo_v4": 1.0})
|
||||
"""
|
||||
The pipeline created is as follows -
|
||||
"my_endpoint1"
|
||||
|
||||
@@ -31,7 +31,7 @@ serve.create_backend(echo_v1, "echo:v1")
|
||||
serve.create_backend(echo_v2, "echo:v2")
|
||||
|
||||
# link and split the service to two backends
|
||||
serve.split("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
|
||||
|
||||
while True:
|
||||
resp = requests.get("http://127.0.0.1:8000/echo").json()
|
||||
|
||||
@@ -26,7 +26,7 @@ def echo_v1(flask_request, response="hello from python!"):
|
||||
|
||||
|
||||
serve.create_backend(echo_v1, "echo:v1")
|
||||
serve.link("my_endpoint", "echo:v1")
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
|
||||
|
||||
# wait for routing table to get populated
|
||||
time.sleep(2)
|
||||
|
||||
@@ -22,7 +22,7 @@ serve.init(blocking=True)
|
||||
|
||||
serve.create_endpoint("my_endpoint", "/echo")
|
||||
serve.create_backend(echo_v1, "echo:v1")
|
||||
serve.link("my_endpoint", "echo:v1")
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 1.0})
|
||||
|
||||
for _ in range(3):
|
||||
resp = requests.get("http://127.0.0.1:8000/echo").json()
|
||||
@@ -32,7 +32,7 @@ for _ in range(3):
|
||||
time.sleep(2)
|
||||
|
||||
serve.create_backend(echo_v2, "echo:v2")
|
||||
serve.split("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
|
||||
serve.set_traffic("my_endpoint", {"echo:v1": 0.5, "echo:v2": 0.5})
|
||||
while True:
|
||||
resp = requests.get("http://127.0.0.1:8000/echo").json()
|
||||
print(pformat_color_json(resp))
|
||||
|
||||
@@ -219,7 +219,7 @@ class ServeMaster:
|
||||
return expand(
|
||||
self.route_table.list_service(include_headless=True).values())
|
||||
|
||||
async def split_traffic(self, endpoint_name, traffic_policy_dictionary):
|
||||
async def set_traffic(self, endpoint_name, traffic_policy_dictionary):
|
||||
assert endpoint_name in expand(
|
||||
self.route_table.list_service(include_headless=True).values())
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ def test_e2e(serve_instance):
|
||||
return {"method": flask_request.method}
|
||||
|
||||
serve.create_backend(function, "echo:v1")
|
||||
serve.link("endpoint", "echo:v1")
|
||||
serve.set_traffic("endpoint", {"echo:v1": 1.0})
|
||||
|
||||
resp = requests.get("http://127.0.0.1:8000/api").json()["method"]
|
||||
assert resp == "GET"
|
||||
@@ -47,7 +47,7 @@ def test_no_route(serve_instance):
|
||||
return 1
|
||||
|
||||
serve.create_backend(func, "backend:1")
|
||||
serve.link("noroute-endpoint", "backend:1")
|
||||
serve.set_traffic("noroute-endpoint", {"backend:1": 1.0})
|
||||
service_handle = serve.get_handle("noroute-endpoint")
|
||||
result = ray.get(service_handle.remote(i=1))
|
||||
assert result == 1
|
||||
@@ -71,7 +71,7 @@ def test_scaling_replicas(serve_instance):
|
||||
|
||||
b_config = BackendConfig(num_replicas=2)
|
||||
serve.create_backend(Counter, "counter:v1", backend_config=b_config)
|
||||
serve.link("counter", "counter:v1")
|
||||
serve.set_traffic("counter", {"counter:v1": 1.0})
|
||||
|
||||
counter_result = []
|
||||
for _ in range(10):
|
||||
@@ -116,7 +116,7 @@ def test_batching(serve_instance):
|
||||
b_config = BackendConfig(max_batch_size=5)
|
||||
serve.create_backend(
|
||||
BatchingExample, "counter:v11", backend_config=b_config)
|
||||
serve.link("counter1", "counter:v11")
|
||||
serve.set_traffic("counter1", {"counter:v11": 1.0})
|
||||
|
||||
future_list = []
|
||||
handle = serve.get_handle("counter1")
|
||||
@@ -146,7 +146,7 @@ def test_batching_exception(serve_instance):
|
||||
b_config = BackendConfig(max_batch_size=5)
|
||||
serve.create_backend(
|
||||
NoListReturned, "exception:v1", backend_config=b_config)
|
||||
serve.link("exception-test", "exception:v1")
|
||||
serve.set_traffic("exception-test", {"exception:v1": 1.0})
|
||||
|
||||
handle = serve.get_handle("exception-test")
|
||||
with pytest.raises(ray.exceptions.RayTaskError):
|
||||
|
||||
@@ -33,7 +33,7 @@ def test_http_proxy_failure(serve_instance):
|
||||
return "hello1"
|
||||
|
||||
serve.create_backend(function, "proxy_failure:v1")
|
||||
serve.link("proxy_failure", "proxy_failure:v1")
|
||||
serve.set_traffic("proxy_failure", {"proxy_failure:v1": 1.0})
|
||||
|
||||
assert request_with_retries("/proxy_failure", timeout=0.1).text == "hello1"
|
||||
|
||||
@@ -47,7 +47,7 @@ def test_http_proxy_failure(serve_instance):
|
||||
return "hello2"
|
||||
|
||||
serve.create_backend(function, "proxy_failure:v2")
|
||||
serve.link("proxy_failure", "proxy_failure:v2")
|
||||
serve.set_traffic("proxy_failure", {"proxy_failure:v2": 1.0})
|
||||
|
||||
for _ in range(10):
|
||||
response = request_with_retries("/proxy_failure", timeout=30)
|
||||
@@ -67,7 +67,7 @@ def test_router_failure(serve_instance):
|
||||
return "hello1"
|
||||
|
||||
serve.create_backend(function, "router_failure:v1")
|
||||
serve.link("router_failure", "router_failure:v1")
|
||||
serve.set_traffic("router_failure", {"router_failure:v1": 1.0})
|
||||
|
||||
assert request_with_retries("/router_failure", timeout=5).text == "hello1"
|
||||
|
||||
@@ -81,7 +81,7 @@ def test_router_failure(serve_instance):
|
||||
return "hello2"
|
||||
|
||||
serve.create_backend(function, "router_failure:v2")
|
||||
serve.link("router_failure", "router_failure:v2")
|
||||
serve.set_traffic("router_failure", {"router_failure:v2": 1.0})
|
||||
|
||||
for _ in range(10):
|
||||
response = request_with_retries("/router_failure", timeout=30)
|
||||
@@ -106,7 +106,7 @@ def test_worker_restart(serve_instance):
|
||||
return os.getpid()
|
||||
|
||||
serve.create_backend(Worker1, "worker_failure:v1")
|
||||
serve.link("worker_failure", "worker_failure:v1")
|
||||
serve.set_traffic("worker_failure", {"worker_failure:v1": 1.0})
|
||||
|
||||
# Get the PID of the worker.
|
||||
old_pid = request_with_retries("/worker_failure", timeout=0.1).text
|
||||
@@ -164,7 +164,7 @@ def test_worker_replica_failure(serve_instance):
|
||||
backend_config = serve.get_backend_config("replica_failure")
|
||||
backend_config.num_replicas = 2
|
||||
serve.set_backend_config("replica_failure", backend_config)
|
||||
serve.link("replica_failure", "replica_failure")
|
||||
serve.set_traffic("replica_failure", {"replica_failure": 1.0})
|
||||
|
||||
# Wait until both replicas have been started.
|
||||
responses = set()
|
||||
|
||||
@@ -20,10 +20,10 @@ def test_handle_in_endpoint(serve_instance):
|
||||
|
||||
serve.create_endpoint("endpoint1", "/endpoint1", methods=["GET", "POST"])
|
||||
serve.create_backend(Endpoint1, "endpoint1:v0")
|
||||
serve.link("endpoint1", "endpoint1:v0")
|
||||
serve.set_traffic("endpoint1", {"endpoint1:v0": 1.0})
|
||||
|
||||
serve.create_endpoint("endpoint2", "/endpoint2", methods=["GET", "POST"])
|
||||
serve.create_backend(Endpoint2, "endpoint2:v0")
|
||||
serve.link("endpoint2", "endpoint2:v0")
|
||||
serve.set_traffic("endpoint2", {"endpoint2:v0": 1.0})
|
||||
|
||||
assert requests.get("http://127.0.0.1:8000/endpoint2").text == "hello"
|
||||
|
||||
@@ -19,7 +19,7 @@ def driver(flask_request):
|
||||
|
||||
serve.create_endpoint("driver", "/driver")
|
||||
serve.create_backend(driver, "driver")
|
||||
serve.split("driver", {{"driver": 1.0}})
|
||||
serve.set_traffic("driver", {{"driver": 1.0}})
|
||||
""".format(ray.worker._global_node._redis_address)
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
|
||||
|
||||
Reference in New Issue
Block a user