mirror of
https://github.com/wassname/ray.git
synced 2026-07-06 05:16:30 +08:00
[Serve] Fix Serve Release Tests (#12777)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
cluster_name: default
|
||||
min_workers: 22
|
||||
max_workers: 22
|
||||
initial_workers: 22
|
||||
min_workers: 5
|
||||
max_workers: 5
|
||||
initial_workers: 5
|
||||
autoscaling_mode: default
|
||||
docker:
|
||||
image: 'anyscale/ray-ml:latest'
|
||||
@@ -28,6 +28,7 @@ initialization_commands: []
|
||||
setup_commands:
|
||||
- apt-get install build-essential libssl-dev git -y
|
||||
- 'rm -r wrk || true && git clone https://github.com/wg/wrk.git wrk && cd wrk && make -j && cp wrk /usr/local/bin'
|
||||
- ray install-nightly
|
||||
head_setup_commands: []
|
||||
worker_setup_commands: []
|
||||
head_start_ray_commands:
|
||||
|
||||
@@ -86,13 +86,14 @@ async def main():
|
||||
client.create_backend("backend", backend)
|
||||
client.create_endpoint("endpoint", backend="backend", route="/api")
|
||||
for intermediate_handles in [False, True]:
|
||||
if (intermediate_handles):
|
||||
if intermediate_handles:
|
||||
|
||||
client.create_endpoint(
|
||||
"backend", backend="backend", route="/backend")
|
||||
|
||||
class forwardActor:
|
||||
def __init__(self):
|
||||
client = serve.connect()
|
||||
self.handle = client.get_handle("backend")
|
||||
|
||||
def __call__(self, _):
|
||||
|
||||
@@ -36,73 +36,76 @@ from ray import serve
|
||||
from ray.serve import BackendConfig
|
||||
from ray.serve.utils import logger
|
||||
|
||||
from ray.util.placement_group import (placement_group, remove_placement_group)
|
||||
from ray.util.placement_group import placement_group, remove_placement_group
|
||||
|
||||
ray.shutdown()
|
||||
ray.init(address="auto")
|
||||
client = serve.start()
|
||||
|
||||
# These numbers need to correspond with the autoscaler config file.
|
||||
# The number of remote nodes in the autoscaler should upper bound
|
||||
# these because sometimes nodes fail to update.
|
||||
num_workers = 20
|
||||
expected_num_nodes = num_workers + 1
|
||||
cpus_per_node = 4
|
||||
num_remote_cpus = expected_num_nodes * cpus_per_node
|
||||
# We ask for more worker but only need to run on smaller subset.
|
||||
# This should account for worker nodes failed to launch.
|
||||
expected_num_nodes = 6
|
||||
num_replicas = 11
|
||||
# wrk HTTP load testing config
|
||||
num_connections = 20
|
||||
num_threads = 2
|
||||
time_to_run = "20s"
|
||||
|
||||
# Wait until the expected number of nodes have joined the cluster.
|
||||
while True:
|
||||
num_nodes = len(ray.nodes())
|
||||
num_nodes = len(list(filter(lambda node: node["Alive"], ray.nodes())))
|
||||
logger.info("Waiting for nodes {}/{}".format(num_nodes,
|
||||
expected_num_nodes))
|
||||
if num_nodes >= expected_num_nodes:
|
||||
break
|
||||
time.sleep(5)
|
||||
|
||||
logger.info("Nodes have all joined. There are %s resources.",
|
||||
ray.cluster_resources())
|
||||
|
||||
client = serve.start()
|
||||
|
||||
|
||||
def hey(_):
|
||||
time.sleep(0.01) # Sleep for 10ms
|
||||
return b"hey"
|
||||
|
||||
|
||||
num_connections = int(num_remote_cpus * 0.75)
|
||||
num_threads = 2
|
||||
time_to_run = "10s"
|
||||
|
||||
pg = placement_group(
|
||||
[{
|
||||
"CPU": 1
|
||||
} for _ in range(expected_num_nodes)], strategy="STRICT_SPREAD")
|
||||
ray.get(pg.ready())
|
||||
|
||||
# The number of replicas is the number of cores remaining after accounting
|
||||
# for the one HTTP proxy actor on each node, the "hey" requester task on each
|
||||
# node, and the serve controller.
|
||||
# num_replicas = expected_num_nodes * (cpus_per_node - 2) - 1
|
||||
num_replicas = ray.available_resources()["CPU"]
|
||||
logger.info("Starting %i replicas", num_replicas)
|
||||
client.create_backend(
|
||||
"hey", hey, config=BackendConfig(num_replicas=num_replicas))
|
||||
client.create_endpoint("hey", backend="hey", route="/hey")
|
||||
|
||||
|
||||
@ray.remote
|
||||
@ray.remote(num_cpus=0)
|
||||
def run_wrk():
|
||||
logger.info("Warming up for ~3 seconds")
|
||||
for _ in range(5):
|
||||
resp = requests.get("http://127.0.0.1:8000/hey").text
|
||||
logger.info("Received response \'" + resp + "\'")
|
||||
time.sleep(0.5)
|
||||
logger.info("Warming up")
|
||||
for _ in range(10):
|
||||
try:
|
||||
resp = requests.get("http://127.0.0.1:8000/hey").text
|
||||
logger.info("Received response '" + resp + "'")
|
||||
time.sleep(0.5)
|
||||
except Exception as e:
|
||||
logger.info(f"Got exception {e}")
|
||||
|
||||
result = subprocess.run(
|
||||
[
|
||||
"wrk", "-c",
|
||||
str(num_connections), "-t",
|
||||
str(num_threads), "-d", time_to_run, "http://127.0.0.1:8000/hey"
|
||||
"wrk",
|
||||
"-c",
|
||||
str(num_connections),
|
||||
"-t",
|
||||
str(num_threads),
|
||||
"-d",
|
||||
time_to_run,
|
||||
"http://127.0.0.1:8000/hey",
|
||||
],
|
||||
stdout=subprocess.PIPE)
|
||||
stdout=subprocess.PIPE,
|
||||
)
|
||||
return result.stdout.decode()
|
||||
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ initialization_commands: []
|
||||
setup_commands:
|
||||
- apt-get install build-essential libssl-dev git -y
|
||||
- 'rm -r wrk || true && git clone https://github.com/wg/wrk.git wrk && cd wrk && make -j && cp wrk /usr/local/bin'
|
||||
- ray install-nightly
|
||||
head_setup_commands: []
|
||||
worker_setup_commands: []
|
||||
head_start_ray_commands:
|
||||
|
||||
Reference in New Issue
Block a user