diff --git a/release/stress_tests/autoscaler-cluster.yaml b/release/stress_tests/autoscaler-cluster.yaml new file mode 100644 index 000000000..a75b04fae --- /dev/null +++ b/release/stress_tests/autoscaler-cluster.yaml @@ -0,0 +1,108 @@ +#################################################################### +# All nodes in this cluster will auto-terminate in 1 hour +#################################################################### + +# An unique identifier for the head node and workers of this cluster. +cluster_name: autoscaler-stress-test-1.1.0-alex + +# The minimum number of workers nodes to launch in addition to the head +# node. This number should be >= 0. +min_workers: 100 + +# The maximum number of workers nodes to launch in addition to the head +# node. This takes precedence over min_workers. +max_workers: 100 + +# The autoscaler will scale up the cluster to this target fraction of resource +# usage. For example, if a cluster of 10 nodes is 100% busy and +# target_utilization is 0.8, it would resize the cluster to 13. This fraction +# can be decreased to increase the aggressiveness of upscaling. +# This value must be less than 1.0 for scaling to happen. +target_utilization_fraction: 0.8 + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 + +# Cloud-provider specific configuration. +provider: + type: aws + region: us-west-1 + availability_zone: us-west-1a + cache_stopped_nodes: False + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu +# By default Ray creates a new private keypair, but you can also use your own. +# If you do so, make sure to also set "KeyName" in the head and worker node +# configurations below. +# ssh_private_key: /path/to/your/key.pem + +# Provider-specific config for the head node, e.g. instance type. By default +# Ray will auto-configure unspecified fields such as SubnetId and KeyName. +# For more documentation on available fields, see: +# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances +head_node: + InstanceType: m4.16xlarge + ImageId: ami-0cc472544ce594a19 # Custom ami + + # Set primary volume to 25 GiB + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 100 + + # Additional options in the boto docs. + +# Provider-specific config for worker nodes, e.g. instance type. By default +# Ray will auto-configure unspecified fields such as SubnetId and KeyName. +# For more documentation on available fields, see: +# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances +worker_nodes: + InstanceType: m4.large + ImageId: ami-0cc472544ce594a19 # Custom ami + + # Set primary volume to 25 GiB + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 100 + + # Run workers on spot by default. Comment this out to use on-demand. + InstanceMarketOptions: + MarketType: spot + # Additional options can be found in the boto docs, e.g. + # SpotOptions: + # MaxPrice: MAX_HOURLY_PRICE + + # Additional options in the boto docs. + +# List of shell commands to run to set up nodes. +setup_commands: + # Uncomment these if you want to build ray from source. + # - sudo apt-get -qq update + # - sudo apt-get install -y build-essential curl unzip + # # Build Ray. + # - git clone https://github.com/ray-project/ray || true + # - ray/ci/travis/install-bazel.sh + - pip install -U pip + - pip install terminado + - pip install boto3==1.4.8 cython==0.29.0 + # - cd ray/python; git checkout master; git pull; pip install -e . --verbose + - pip install -U pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-1.2.0.dev0-cp38-cp38-manylinux2014_x86_64.whl + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: [] + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --num-gpus=100 diff --git a/release/stress_tests/workloads/test_many_tasks.py b/release/stress_tests/workloads/test_many_tasks.py index 1abebfb51..bc95b0407 100644 --- a/release/stress_tests/workloads/test_many_tasks.py +++ b/release/stress_tests/workloads/test_many_tasks.py @@ -45,72 +45,91 @@ class Actor(object): # Stage 0: Submit a bunch of small tasks with large returns. -stage_0_iterations = [] -start_time = time.time() -logger.info("Submitting many tasks with large returns.") -for i in range(10): - iteration_start = time.time() - logger.info("Iteration %s", i) - ray.get([f.remote(1000000) for _ in range(1000)]) - stage_0_iterations.append(time.time() - iteration_start) +def stage0(): + stage_0_iterations = [] + start_time = time.time() + logger.info("Submitting many tasks with large returns.") + for i in range(10): + iteration_start = time.time() + logger.info("Iteration %s", i) + ray.get([f.remote(1000000) for _ in range(1000)]) + stage_0_iterations.append(time.time() - iteration_start) -stage_0_time = time.time() - start_time + return time.time() - start_time + + +stage_0_time = stage0() logger.info("Finished stage 0 after %s seconds.", stage_0_time) -# Stage 1: Launch a bunch of tasks. -stage_1_iterations = [] -start_time = time.time() -logger.info("Submitting many tasks.") -for i in range(10): - iteration_start = time.time() - logger.info("Iteration %s", i) - ray.get([f.remote(0) for _ in range(100000)]) - stage_1_iterations.append(time.time() - iteration_start) -stage_1_time = time.time() - start_time +# Stage 1: Launch a bunch of tasks. +def stage1(): + stage_1_iterations = [] + start_time = time.time() + logger.info("Submitting many tasks.") + for i in range(10): + iteration_start = time.time() + logger.info("Iteration %s", i) + ray.get([f.remote(0) for _ in range(100000)]) + stage_1_iterations.append(time.time() - iteration_start) + + return time.time() - start_time, stage_1_iterations + + +stage_1_time, stage_1_iterations = stage1() logger.info("Finished stage 1 after %s seconds.", stage_1_time) + # Launch a bunch of tasks, each with a bunch of dependencies. TODO(rkn): This # test starts to fail if we increase the number of tasks in the inner loop from # 500 to 1000. (approximately 615 seconds) -stage_2_iterations = [] -start_time = time.time() -logger.info("Submitting tasks with many dependencies.") -x_ids = [] -for _ in range(5): - iteration_start = time.time() - for i in range(20): - logger.info("Iteration %s. Cumulative time %s seconds", i, - time.time() - start_time) - x_ids = [f.remote(0, *x_ids) for _ in range(500)] - ray.get(x_ids) - stage_2_iterations.append(time.time() - iteration_start) - logger.info("Finished after %s seconds.", time.time() - start_time) +def stage2(): + stage_2_iterations = [] + start_time = time.time() + logger.info("Submitting tasks with many dependencies.") + x_ids = [] + for _ in range(5): + iteration_start = time.time() + for i in range(20): + logger.info("Iteration %s. Cumulative time %s seconds", i, + time.time() - start_time) + x_ids = [f.remote(0, *x_ids) for _ in range(500)] + ray.get(x_ids) + stage_2_iterations.append(time.time() - iteration_start) + logger.info("Finished after %s seconds.", time.time() - start_time) + return time.time() - start_time, stage_2_iterations -stage_2_time = time.time() - start_time + +stage_2_time, stage_2_iterations = stage2() logger.info("Finished stage 2 after %s seconds.", stage_2_time) -# Create a bunch of actors. -start_time = time.time() -logger.info("Creating %s actors.", num_remote_cpus) -actors = [Actor.remote() for _ in range(num_remote_cpus)] -stage_3_creation_time = time.time() - start_time -logger.info("Finished stage 3 actor creation in %s seconds.", - stage_3_creation_time) -# Submit a bunch of small tasks to each actor. (approximately 1070 seconds) -start_time = time.time() -logger.info("Submitting many small actor tasks.") -for N in [1000, 100000]: - x_ids = [] - for i in range(N): - x_ids = [a.method.remote(0) for a in actors] - if i % 100 == 0: - logger.info("Submitted {}".format(i * len(actors))) - ray.get(x_ids) -stage_3_time = time.time() - start_time +# Create a bunch of actors. +def stage3(): + start_time = time.time() + logger.info("Creating %s actors.", num_remote_cpus) + actors = [Actor.remote() for _ in range(num_remote_cpus)] + stage_3_creation_time = time.time() - start_time + logger.info("Finished stage 3 actor creation in %s seconds.", + stage_3_creation_time) + + # Submit a bunch of small tasks to each actor. (approximately 1070 seconds) + start_time = time.time() + logger.info("Submitting many small actor tasks.") + for N in [1000, 100000]: + x_ids = [] + for i in range(N): + x_ids = [a.method.remote(0) for a in actors] + if i % 100 == 0: + logger.info("Submitted {}".format(i * len(actors))) + ray.get(x_ids) + return time.time() - start_time, stage_3_creation_time + + +stage_3_time, stage_3_creation_time = stage3() logger.info("Finished stage 3 in %s seconds.", stage_3_time) + # This tests https://github.com/ray-project/ray/issues/10150. The only way to # integration test this is via performance. The goal is to fill up the cluster # so that all tasks can be run, but spillback is required. Since the driver @@ -119,38 +138,39 @@ logger.info("Finished stage 3 in %s seconds.", stage_3_time) # task will require O(N) queries. Since we limit the number of inflight # requests, we will run into head of line blocking and we should be able to # measure this timing. -num_tasks = int(ray.cluster_resources()["GPU"]) -logger.info(f"Scheduling many tasks for spillback.") +def stage4(): + num_tasks = int(ray.cluster_resources()["GPU"]) + logger.info(f"Scheduling many tasks for spillback.") + + @ray.remote(num_gpus=1) + def func(t): + if t % 100 == 0: + logger.info(f"[spillback test] {t}/{num_tasks}") + start = time.perf_counter() + time.sleep(1) + end = time.perf_counter() + return start, end, ray.worker.global_worker.node.unique_id + + results = ray.get([func.remote(i) for i in range(num_tasks)]) + + host_to_start_times = defaultdict(list) + for start, end, host in results: + host_to_start_times[host].append(start) + + spreads = [] + for host in host_to_start_times: + last = max(host_to_start_times[host]) + first = min(host_to_start_times[host]) + spread = last - first + spreads.append(spread) + logger.info(f"Spread: {last - first}\tLast: {last}\tFirst: {first}") + + avg_spread = sum(spreads) / len(spreads) + logger.info(f"Avg spread: {sum(spreads)/len(spreads)}") + return avg_spread -@ray.remote(num_gpus=1) -def func(t): - if t % 100 == 0: - logger.info(f"[spillback test] {t}/{num_tasks}") - start = time.perf_counter() - time.sleep(1) - end = time.perf_counter() - return start, end, ray.worker.global_worker.node.unique_id - - -results = ray.get([func.remote(i) for i in range(num_tasks)]) - -host_to_start_times = defaultdict(list) -for start, end, host in results: - host_to_start_times[host].append(start) - -spreads = [] -for host in host_to_start_times: - last = max(host_to_start_times[host]) - first = min(host_to_start_times[host]) - spread = last - first - spreads.append(spread) - logger.info(f"Spread: {last - first}\tLast: {last}\tFirst: {first}") - -# avg_spread ~ 115 with Ray 1.0 scheduler. ~695 with (buggy) 0.8.7 scheduler. -avg_spread = sum(spreads) / len(spreads) -logger.info(f"Avg spread: {sum(spreads)/len(spreads)}") - +stage_4_spread = stage4() print("Stage 0 results:") print("\tTotal time: {}".format(stage_0_time)) @@ -173,7 +193,8 @@ print("\tActor creation time: {}".format(stage_3_creation_time)) print("\tTotal time: {}".format(stage_3_time)) print("Stage 4 results:") -print(f"\tScheduling spread: {avg_spread}.") +# avg_spread ~ 115 with Ray 1.0 scheduler. ~695 with (buggy) 0.8.7 scheduler. +print(f"\tScheduling spread: {stage_4_spread}.") # TODO(rkn): The test below is commented out because it currently does not # pass.