diff --git a/python/ray/serve/benchmarks/cluster.yaml b/python/ray/serve/benchmarks/cluster.yaml index 831f0f1ff..0a4388533 100644 --- a/python/ray/serve/benchmarks/cluster.yaml +++ b/python/ray/serve/benchmarks/cluster.yaml @@ -1,7 +1,7 @@ cluster_name: default -min_workers: 22 -max_workers: 22 -initial_workers: 22 +min_workers: 5 +max_workers: 5 +initial_workers: 5 autoscaling_mode: default docker: image: 'anyscale/ray-ml:latest' @@ -28,6 +28,7 @@ initialization_commands: [] setup_commands: - apt-get install build-essential libssl-dev git -y - 'rm -r wrk || true && git clone https://github.com/wg/wrk.git wrk && cd wrk && make -j && cp wrk /usr/local/bin' + - ray install-nightly head_setup_commands: [] worker_setup_commands: [] head_start_ray_commands: diff --git a/python/ray/serve/benchmarks/microbenchmark.py b/python/ray/serve/benchmarks/microbenchmark.py index e4f058e0f..4a34e3418 100644 --- a/python/ray/serve/benchmarks/microbenchmark.py +++ b/python/ray/serve/benchmarks/microbenchmark.py @@ -86,13 +86,14 @@ async def main(): client.create_backend("backend", backend) client.create_endpoint("endpoint", backend="backend", route="/api") for intermediate_handles in [False, True]: - if (intermediate_handles): + if intermediate_handles: client.create_endpoint( "backend", backend="backend", route="/backend") class forwardActor: def __init__(self): + client = serve.connect() self.handle = client.get_handle("backend") def __call__(self, _): diff --git a/python/ray/serve/benchmarks/scalability.py b/python/ray/serve/benchmarks/scalability.py index d11564567..c424ae321 100644 --- a/python/ray/serve/benchmarks/scalability.py +++ b/python/ray/serve/benchmarks/scalability.py @@ -36,73 +36,76 @@ from ray import serve from ray.serve import BackendConfig from ray.serve.utils import logger -from ray.util.placement_group import (placement_group, remove_placement_group) +from ray.util.placement_group import placement_group, remove_placement_group ray.shutdown() ray.init(address="auto") -client = serve.start() -# These numbers need to correspond with the autoscaler config file. -# The number of remote nodes in the autoscaler should upper bound -# these because sometimes nodes fail to update. -num_workers = 20 -expected_num_nodes = num_workers + 1 -cpus_per_node = 4 -num_remote_cpus = expected_num_nodes * cpus_per_node +# We ask for more worker but only need to run on smaller subset. +# This should account for worker nodes failed to launch. +expected_num_nodes = 6 +num_replicas = 11 +# wrk HTTP load testing config +num_connections = 20 +num_threads = 2 +time_to_run = "20s" # Wait until the expected number of nodes have joined the cluster. while True: - num_nodes = len(ray.nodes()) + num_nodes = len(list(filter(lambda node: node["Alive"], ray.nodes()))) logger.info("Waiting for nodes {}/{}".format(num_nodes, expected_num_nodes)) if num_nodes >= expected_num_nodes: break time.sleep(5) + logger.info("Nodes have all joined. There are %s resources.", ray.cluster_resources()) +client = serve.start() + def hey(_): time.sleep(0.01) # Sleep for 10ms return b"hey" -num_connections = int(num_remote_cpus * 0.75) -num_threads = 2 -time_to_run = "10s" - pg = placement_group( [{ "CPU": 1 } for _ in range(expected_num_nodes)], strategy="STRICT_SPREAD") ray.get(pg.ready()) -# The number of replicas is the number of cores remaining after accounting -# for the one HTTP proxy actor on each node, the "hey" requester task on each -# node, and the serve controller. -# num_replicas = expected_num_nodes * (cpus_per_node - 2) - 1 -num_replicas = ray.available_resources()["CPU"] logger.info("Starting %i replicas", num_replicas) client.create_backend( "hey", hey, config=BackendConfig(num_replicas=num_replicas)) client.create_endpoint("hey", backend="hey", route="/hey") -@ray.remote +@ray.remote(num_cpus=0) def run_wrk(): - logger.info("Warming up for ~3 seconds") - for _ in range(5): - resp = requests.get("http://127.0.0.1:8000/hey").text - logger.info("Received response \'" + resp + "\'") - time.sleep(0.5) + logger.info("Warming up") + for _ in range(10): + try: + resp = requests.get("http://127.0.0.1:8000/hey").text + logger.info("Received response '" + resp + "'") + time.sleep(0.5) + except Exception as e: + logger.info(f"Got exception {e}") result = subprocess.run( [ - "wrk", "-c", - str(num_connections), "-t", - str(num_threads), "-d", time_to_run, "http://127.0.0.1:8000/hey" + "wrk", + "-c", + str(num_connections), + "-t", + str(num_threads), + "-d", + time_to_run, + "http://127.0.0.1:8000/hey", ], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + ) return result.stdout.decode() diff --git a/python/ray/serve/benchmarks/single.yaml b/python/ray/serve/benchmarks/single.yaml index b93577fca..4500d65ab 100644 --- a/python/ray/serve/benchmarks/single.yaml +++ b/python/ray/serve/benchmarks/single.yaml @@ -23,6 +23,7 @@ initialization_commands: [] setup_commands: - apt-get install build-essential libssl-dev git -y - 'rm -r wrk || true && git clone https://github.com/wg/wrk.git wrk && cd wrk && make -j && cp wrk /usr/local/bin' + - ray install-nightly head_setup_commands: [] worker_setup_commands: [] head_start_ray_commands: diff --git a/release/long_running_tests/cluster.yaml b/release/long_running_tests/cluster.yaml index 074d445ba..e51cd010b 100644 --- a/release/long_running_tests/cluster.yaml +++ b/release/long_running_tests/cluster.yaml @@ -1,48 +1,17 @@ -cluster_name: default -min_workers: 0 -max_workers: 0 -target_utilization_fraction: 0.8 -idle_timeout_minutes: 5 +cluster_name: ray-release-long-running-tests + +docker: + image: anyscale/ray:latest + container_name: ray_container + pull_before_run: False -# Cloud-provider specific configuration. provider: type: aws region: us-west-2 - availability_zone: us-west-2a + availability_zone: us-west-2a, us-west-2b, us-west-2c + auth: ssh_user: ubuntu head_node: - InstanceType: m5.2xlarge - ImageId: ami-0888a3b5189309429 # DLAMI 7/1/19 - BlockDeviceMappings: - - DeviceName: /dev/sda1 - Ebs: - VolumeSize: 150 - -worker_nodes: - InstanceType: m5.large - ImageId: ami-0888a3b5189309429 # DLAMI 7/1/19 - BlockDeviceMappings: - - DeviceName: /dev/sda1 - Ebs: - VolumeSize: 150 - - # Run workers on spot by default. Comment this out to use on-demand. - InstanceMarketOptions: - MarketType: spot - -# List of shell commands to run to set up nodes. -setup_commands: [] - -# Custom commands that will be run on the head node after common setup. -head_setup_commands: [] - -# Custom commands that will be run on worker nodes after common setup. -worker_setup_commands: [] - -# Command to start ray on the head node. You don't need to change this. -head_start_ray_commands: [] - -# Command to start ray on worker nodes. You don't need to change this. -worker_start_ray_commands: [] + InstanceType: m5.xlarge diff --git a/release/long_running_tests/run.sh b/release/long_running_tests/run.sh index eba55e842..e48d7c899 100644 --- a/release/long_running_tests/run.sh +++ b/release/long_running_tests/run.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -ray_version="" +ray_version="" commit="" ray_branch="" workload="" @@ -48,20 +48,20 @@ echo "commit: $commit" echo "branch: $ray_branch" echo "workload: $workload" -wheel="https://s3-us-west-2.amazonaws.com/ray-wheels/$ray_branch/$commit/ray-$ray_version-cp36-cp36m-manylinux2014_x86_64.whl" +wheel="https://s3-us-west-2.amazonaws.com/ray-wheels/$ray_branch/$commit/ray-$ray_version-cp37-cp37m-manylinux2014_x86_64.whl" -echo set-window-option -g mouse on > ~/.tmux.conf -echo 'termcapinfo xterm* ti@:te@' > ~/.screenrc # Serve load testing tool -rm -r wrk || true && git clone https://github.com/wg/wrk.git wrk && cd wrk && make -j && sudo cp wrk /usr/local/bin -pip install -U pip -unset RAY_ADDRESS -source activate tensorflow_p36 -conda remove -y --force wrapt || true +cur_dir=$(pwd) +cd /tmp && rm -rf wrk && git clone https://github.com/wg/wrk.git wrk && cd wrk && make -j && cp wrk /usr/local/bin +cd "$cur_dir" || exit + pip install --upgrade pip pip install -U tensorflow==1.14 -pip install -q -U "$wheel" Click +pip install -q -U "$wheel" pip install -q "ray[all]" "gym[atari]" -cd .. + +ray stop && sleep 2 + +unset RAY_ADDRESS python "./workloads/$workload.py" diff --git a/release/long_running_tests/workloads/serve.py b/release/long_running_tests/workloads/serve.py index 6d404ac51..59b230776 100644 --- a/release/long_running_tests/workloads/serve.py +++ b/release/long_running_tests/workloads/serve.py @@ -11,7 +11,7 @@ from ray.cluster_utils import Cluster num_redis_shards = 1 redis_max_memory = 10**8 object_store_memory = 10**8 -num_nodes = 5 +num_nodes = 1 cluster = Cluster() for i in range(num_nodes): cluster.add_node( @@ -22,21 +22,20 @@ for i in range(num_nodes): resources={str(i): 2}, object_store_memory=object_store_memory, redis_max_memory=redis_max_memory, - dashboard_host="0.0.0.0") + dashboard_host="0.0.0.0", + ) ray.init(address=cluster.address, dashboard_host="0.0.0.0") client = serve.start() @serve.accept_batch -def echo(_): +def echo(requests_batch): time.sleep(0.01) # Sleep for 10ms - ray.show_in_dashboard( - str(serve.context.batch_size), key="Current batch size") - return ["hi {}".format(i) for i in range(serve.context.batch_size)] + return ["hi" for _ in range(len(requests_batch))] -config = {"num_replicas": 30, "max_batch_size": 16} +config = {"num_replicas": 7, "max_batch_size": 16} client.create_backend("echo:v1", echo, config=config) client.create_endpoint("echo", backend="echo:v1", route="/echo") @@ -53,12 +52,18 @@ time_to_run = "60m" while True: proc = subprocess.Popen( [ - "wrk", "-c", - str(connections), "-t", - str(num_threads), "-s", time_to_run, "http://127.0.0.1:8000/echo" + "wrk", + "-c", + str(connections), + "-t", + str(num_threads), + "-d", + time_to_run, + "http://127.0.0.1:8000/echo", ], stdout=PIPE, - stderr=PIPE) + stderr=PIPE, + ) print("started load testing") proc.wait() out, err = proc.communicate() diff --git a/release/long_running_tests/workloads/serve_failure.py b/release/long_running_tests/workloads/serve_failure.py index 129853289..534dcbda7 100644 --- a/release/long_running_tests/workloads/serve_failure.py +++ b/release/long_running_tests/workloads/serve_failure.py @@ -11,19 +11,20 @@ from ray.cluster_utils import Cluster num_redis_shards = 1 redis_max_memory = 10**8 object_store_memory = 10**8 -num_nodes = 5 -cpus_per_node = 2 +num_nodes = 1 +cpus_per_node = 10 cluster = Cluster() for i in range(num_nodes): cluster.add_node( redis_port=6379 if i == 0 else None, num_redis_shards=num_redis_shards if i == 0 else None, - num_cpus=2, + num_cpus=16, num_gpus=0, resources={str(i): 2}, object_store_memory=object_store_memory, redis_max_memory=redis_max_memory, - dashboard_host="0.0.0.0") + dashboard_host="0.0.0.0", + ) ray.init( address=cluster.address, dashboard_host="0.0.0.0", log_to_driver=False)