[release tests] test_many_tasks fix (#12984)

This commit is contained in:
Alex Wu
2020-12-22 11:05:33 -08:00
committed by GitHub
parent 01faeabc17
commit a79c9fcac3
2 changed files with 211 additions and 82 deletions
@@ -0,0 +1,108 @@
####################################################################
# All nodes in this cluster will auto-terminate in 1 hour
####################################################################
# An unique identifier for the head node and workers of this cluster.
cluster_name: autoscaler-stress-test-1.1.0-alex
# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 100
# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 100
# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
target_utilization_fraction: 0.8
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
provider:
type: aws
region: us-west-1
availability_zone: us-west-1a
cache_stopped_nodes: False
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below.
# ssh_private_key: /path/to/your/key.pem
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
head_node:
InstanceType: m4.16xlarge
ImageId: ami-0cc472544ce594a19 # Custom ami
# Set primary volume to 25 GiB
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 100
# Additional options in the boto docs.
# Provider-specific config for worker nodes, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
worker_nodes:
InstanceType: m4.large
ImageId: ami-0cc472544ce594a19 # Custom ami
# Set primary volume to 25 GiB
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 100
# Run workers on spot by default. Comment this out to use on-demand.
InstanceMarketOptions:
MarketType: spot
# Additional options can be found in the boto docs, e.g.
# SpotOptions:
# MaxPrice: MAX_HOURLY_PRICE
# Additional options in the boto docs.
# List of shell commands to run to set up nodes.
setup_commands:
# Uncomment these if you want to build ray from source.
# - sudo apt-get -qq update
# - sudo apt-get install -y build-essential curl unzip
# # Build Ray.
# - git clone https://github.com/ray-project/ray || true
# - ray/ci/travis/install-bazel.sh
- pip install -U pip
- pip install terminado
- pip install boto3==1.4.8 cython==0.29.0
# - cd ray/python; git checkout master; git pull; pip install -e . --verbose
- pip install -U pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-1.2.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
# Custom commands that will be run on the head node after common setup.
head_setup_commands: []
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --num-gpus=100
+103 -82
View File
@@ -45,72 +45,91 @@ class Actor(object):
# Stage 0: Submit a bunch of small tasks with large returns.
stage_0_iterations = []
start_time = time.time()
logger.info("Submitting many tasks with large returns.")
for i in range(10):
iteration_start = time.time()
logger.info("Iteration %s", i)
ray.get([f.remote(1000000) for _ in range(1000)])
stage_0_iterations.append(time.time() - iteration_start)
def stage0():
stage_0_iterations = []
start_time = time.time()
logger.info("Submitting many tasks with large returns.")
for i in range(10):
iteration_start = time.time()
logger.info("Iteration %s", i)
ray.get([f.remote(1000000) for _ in range(1000)])
stage_0_iterations.append(time.time() - iteration_start)
stage_0_time = time.time() - start_time
return time.time() - start_time
stage_0_time = stage0()
logger.info("Finished stage 0 after %s seconds.", stage_0_time)
# Stage 1: Launch a bunch of tasks.
stage_1_iterations = []
start_time = time.time()
logger.info("Submitting many tasks.")
for i in range(10):
iteration_start = time.time()
logger.info("Iteration %s", i)
ray.get([f.remote(0) for _ in range(100000)])
stage_1_iterations.append(time.time() - iteration_start)
stage_1_time = time.time() - start_time
# Stage 1: Launch a bunch of tasks.
def stage1():
stage_1_iterations = []
start_time = time.time()
logger.info("Submitting many tasks.")
for i in range(10):
iteration_start = time.time()
logger.info("Iteration %s", i)
ray.get([f.remote(0) for _ in range(100000)])
stage_1_iterations.append(time.time() - iteration_start)
return time.time() - start_time, stage_1_iterations
stage_1_time, stage_1_iterations = stage1()
logger.info("Finished stage 1 after %s seconds.", stage_1_time)
# Launch a bunch of tasks, each with a bunch of dependencies. TODO(rkn): This
# test starts to fail if we increase the number of tasks in the inner loop from
# 500 to 1000. (approximately 615 seconds)
stage_2_iterations = []
start_time = time.time()
logger.info("Submitting tasks with many dependencies.")
x_ids = []
for _ in range(5):
iteration_start = time.time()
for i in range(20):
logger.info("Iteration %s. Cumulative time %s seconds", i,
time.time() - start_time)
x_ids = [f.remote(0, *x_ids) for _ in range(500)]
ray.get(x_ids)
stage_2_iterations.append(time.time() - iteration_start)
logger.info("Finished after %s seconds.", time.time() - start_time)
def stage2():
stage_2_iterations = []
start_time = time.time()
logger.info("Submitting tasks with many dependencies.")
x_ids = []
for _ in range(5):
iteration_start = time.time()
for i in range(20):
logger.info("Iteration %s. Cumulative time %s seconds", i,
time.time() - start_time)
x_ids = [f.remote(0, *x_ids) for _ in range(500)]
ray.get(x_ids)
stage_2_iterations.append(time.time() - iteration_start)
logger.info("Finished after %s seconds.", time.time() - start_time)
return time.time() - start_time, stage_2_iterations
stage_2_time = time.time() - start_time
stage_2_time, stage_2_iterations = stage2()
logger.info("Finished stage 2 after %s seconds.", stage_2_time)
# Create a bunch of actors.
start_time = time.time()
logger.info("Creating %s actors.", num_remote_cpus)
actors = [Actor.remote() for _ in range(num_remote_cpus)]
stage_3_creation_time = time.time() - start_time
logger.info("Finished stage 3 actor creation in %s seconds.",
stage_3_creation_time)
# Submit a bunch of small tasks to each actor. (approximately 1070 seconds)
start_time = time.time()
logger.info("Submitting many small actor tasks.")
for N in [1000, 100000]:
x_ids = []
for i in range(N):
x_ids = [a.method.remote(0) for a in actors]
if i % 100 == 0:
logger.info("Submitted {}".format(i * len(actors)))
ray.get(x_ids)
stage_3_time = time.time() - start_time
# Create a bunch of actors.
def stage3():
start_time = time.time()
logger.info("Creating %s actors.", num_remote_cpus)
actors = [Actor.remote() for _ in range(num_remote_cpus)]
stage_3_creation_time = time.time() - start_time
logger.info("Finished stage 3 actor creation in %s seconds.",
stage_3_creation_time)
# Submit a bunch of small tasks to each actor. (approximately 1070 seconds)
start_time = time.time()
logger.info("Submitting many small actor tasks.")
for N in [1000, 100000]:
x_ids = []
for i in range(N):
x_ids = [a.method.remote(0) for a in actors]
if i % 100 == 0:
logger.info("Submitted {}".format(i * len(actors)))
ray.get(x_ids)
return time.time() - start_time, stage_3_creation_time
stage_3_time, stage_3_creation_time = stage3()
logger.info("Finished stage 3 in %s seconds.", stage_3_time)
# This tests https://github.com/ray-project/ray/issues/10150. The only way to
# integration test this is via performance. The goal is to fill up the cluster
# so that all tasks can be run, but spillback is required. Since the driver
@@ -119,38 +138,39 @@ logger.info("Finished stage 3 in %s seconds.", stage_3_time)
# task will require O(N) queries. Since we limit the number of inflight
# requests, we will run into head of line blocking and we should be able to
# measure this timing.
num_tasks = int(ray.cluster_resources()["GPU"])
logger.info(f"Scheduling many tasks for spillback.")
def stage4():
num_tasks = int(ray.cluster_resources()["GPU"])
logger.info(f"Scheduling many tasks for spillback.")
@ray.remote(num_gpus=1)
def func(t):
if t % 100 == 0:
logger.info(f"[spillback test] {t}/{num_tasks}")
start = time.perf_counter()
time.sleep(1)
end = time.perf_counter()
return start, end, ray.worker.global_worker.node.unique_id
results = ray.get([func.remote(i) for i in range(num_tasks)])
host_to_start_times = defaultdict(list)
for start, end, host in results:
host_to_start_times[host].append(start)
spreads = []
for host in host_to_start_times:
last = max(host_to_start_times[host])
first = min(host_to_start_times[host])
spread = last - first
spreads.append(spread)
logger.info(f"Spread: {last - first}\tLast: {last}\tFirst: {first}")
avg_spread = sum(spreads) / len(spreads)
logger.info(f"Avg spread: {sum(spreads)/len(spreads)}")
return avg_spread
@ray.remote(num_gpus=1)
def func(t):
if t % 100 == 0:
logger.info(f"[spillback test] {t}/{num_tasks}")
start = time.perf_counter()
time.sleep(1)
end = time.perf_counter()
return start, end, ray.worker.global_worker.node.unique_id
results = ray.get([func.remote(i) for i in range(num_tasks)])
host_to_start_times = defaultdict(list)
for start, end, host in results:
host_to_start_times[host].append(start)
spreads = []
for host in host_to_start_times:
last = max(host_to_start_times[host])
first = min(host_to_start_times[host])
spread = last - first
spreads.append(spread)
logger.info(f"Spread: {last - first}\tLast: {last}\tFirst: {first}")
# avg_spread ~ 115 with Ray 1.0 scheduler. ~695 with (buggy) 0.8.7 scheduler.
avg_spread = sum(spreads) / len(spreads)
logger.info(f"Avg spread: {sum(spreads)/len(spreads)}")
stage_4_spread = stage4()
print("Stage 0 results:")
print("\tTotal time: {}".format(stage_0_time))
@@ -173,7 +193,8 @@ print("\tActor creation time: {}".format(stage_3_creation_time))
print("\tTotal time: {}".format(stage_3_time))
print("Stage 4 results:")
print(f"\tScheduling spread: {avg_spread}.")
# avg_spread ~ 115 with Ray 1.0 scheduler. ~695 with (buggy) 0.8.7 scheduler.
print(f"\tScheduling spread: {stage_4_spread}.")
# TODO(rkn): The test below is commented out because it currently does not
# pass.