Add timing instrumentation to the Ray stress tests.

test_dead_actors.py: block until the expected number of nodes has joined the
cluster, then time the whole run and every trial and print the total, average,
max and min iteration times.

test_many_tasks.py: record the total and per-iteration times of stages 1 and 2,
the actor-creation and total times of stage 3, and print a summary at the end.
The old generic "Finished after %s seconds." log line is replaced in all three
stages (the stage 2 copy used to be kept next to the new stage-specific line,
which logged the same total twice).

NOTE(review): leading indentation inside the hunks was reconstructed from a
whitespace-collapsed copy of this patch; confirm it against the target files
before applying.
NOTE(review): the post-image blob id on the test_many_tasks.py "index" line
predates the stage 2 log-line removal; regenerate the patch if an exact id is
required (plain "git apply" / "patch" do not check it).

diff --git a/ci/stress_tests/test_dead_actors.py b/ci/stress_tests/test_dead_actors.py
index eee7ca2e3..3a9a5a07a 100644
--- a/ci/stress_tests/test_dead_actors.py
+++ b/ci/stress_tests/test_dead_actors.py
@@ -7,6 +7,7 @@ from __future__ import print_function
 import logging
 import numpy as np
 import sys
+import time
 
 import ray
 
@@ -15,6 +16,24 @@ logger = logging.getLogger(__name__)
 
 ray.init(address="localhost:6379")
 
+# These numbers need to correspond with the autoscaler config file.
+# The number of remote nodes in the autoscaler should upper bound
+# these because sometimes nodes fail to update.
+num_remote_nodes = 100
+head_node_cpus = 2
+num_remote_cpus = num_remote_nodes * head_node_cpus
+
+# Wait until the expected number of nodes have joined the cluster.
+while True:
+    num_nodes = len(ray.nodes())
+    logger.info("Waiting for nodes {}/{}".format(num_nodes,
+                                                 num_remote_nodes + 1))
+    if num_nodes >= num_remote_nodes + 1:
+        break
+    time.sleep(5)
+logger.info("Nodes have all joined. There are %s resources.",
+            ray.cluster_resources())
+
 
 @ray.remote
 class Child(object):
@@ -61,7 +80,10 @@ parents = [
     Parent.remote(num_children, death_probability) for _ in range(num_parents)
 ]
 
+start = time.time()
+loop_times = []
 for i in range(100):
+    loop_start = time.time()
     ray.get([parent.ping.remote(10) for parent in parents])
 
     # Kill a parent actor with some probability.
@@ -72,3 +94,9 @@ for i in range(100):
         parents[parent_index] = Parent.remote(num_children, death_probability)
 
     logger.info("Finished trial %s", i)
+    loop_times.append(time.time() - loop_start)
+
+print("Finished in: {}s".format(time.time() - start))
+print("Average iteration time: {}s".format(sum(loop_times) / len(loop_times)))
+print("Max iteration time: {}s".format(max(loop_times)))
+print("Min iteration time: {}s".format(min(loop_times)))
diff --git a/ci/stress_tests/test_many_tasks.py b/ci/stress_tests/test_many_tasks.py
index 70b36bc19..8d9777a71 100644
--- a/ci/stress_tests/test_many_tasks.py
+++ b/ci/stress_tests/test_many_tasks.py
@@ -47,33 +47,45 @@ class Actor(object):
         return np.ones(size, dtype=np.uint8)
 
 
-# Launch a bunch of tasks. (approximately 200 seconds)
+# Stage 1: Launch a bunch of tasks.
+stage_1_iterations = []
 start_time = time.time()
 logger.info("Submitting many tasks.")
 for i in range(10):
+    iteration_start = time.time()
     logger.info("Iteration %s", i)
     ray.get([f.remote(0) for _ in range(100000)])
-logger.info("Finished after %s seconds.", time.time() - start_time)
+    stage_1_iterations.append(time.time() - iteration_start)
+
+stage_1_time = time.time() - start_time
+logger.info("Finished stage 1 after %s seconds.", stage_1_time)
 
 # Launch a bunch of tasks, each with a bunch of dependencies. TODO(rkn): This
 # test starts to fail if we increase the number of tasks in the inner loop from
 # 500 to 1000. (approximately 615 seconds)
+stage_2_iterations = []
 start_time = time.time()
 logger.info("Submitting tasks with many dependencies.")
 x_ids = []
 for _ in range(5):
+    iteration_start = time.time()
     for i in range(20):
         logger.info("Iteration %s. Cumulative time %s seconds", i,
                     time.time() - start_time)
         x_ids = [f.remote(0, *x_ids) for _ in range(500)]
         ray.get(x_ids)
+    stage_2_iterations.append(time.time() - iteration_start)
-logger.info("Finished after %s seconds.", time.time() - start_time)
 
+stage_2_time = time.time() - start_time
+logger.info("Finished stage 2 after %s seconds.", stage_2_time)
+
 # Create a bunch of actors.
 start_time = time.time()
 logger.info("Creating %s actors.", num_remote_cpus)
 actors = [Actor.remote() for _ in range(num_remote_cpus)]
-logger.info("Finished after %s seconds.", time.time() - start_time)
+stage_3_creation_time = time.time() - start_time
+logger.info("Finished stage 3 actor creation in %s seconds.",
+            stage_3_creation_time)
 
 # Submit a bunch of small tasks to each actor. (approximately 1070 seconds)
 start_time = time.time()
@@ -85,7 +97,26 @@ for N in [1000, 100000]:
         if i % 100 == 0:
             logger.info("Submitted {}".format(i * len(actors)))
     ray.get(x_ids)
-logger.info("Finished after %s seconds.", time.time() - start_time)
+stage_3_time = time.time() - start_time
+logger.info("Finished stage 3 in %s seconds.", stage_3_time)
+
+print("Stage 1 results:")
+print("\tTotal time: {}".format(stage_1_time))
+print("\tAverage iteration time: {}".format(
+    sum(stage_1_iterations) / len(stage_1_iterations)))
+print("\tMax iteration time: {}".format(max(stage_1_iterations)))
+print("\tMin iteration time: {}".format(min(stage_1_iterations)))
+
+print("Stage 2 results:")
+print("\tTotal time: {}".format(stage_2_time))
+print("\tAverage iteration time: {}".format(
+    sum(stage_2_iterations) / len(stage_2_iterations)))
+print("\tMax iteration time: {}".format(max(stage_2_iterations)))
+print("\tMin iteration time: {}".format(min(stage_2_iterations)))
+
+print("Stage 3 results:")
+print("\tActor creation time: {}".format(stage_3_creation_time))
+print("\tTotal time: {}".format(stage_3_time))
 
 # TODO(rkn): The test below is commented out because it currently does not
 # pass.