mirror of
https://github.com/wassname/ray.git
synced 2026-07-04 08:11:44 +08:00
Print summaries for stress tests (#6498)
This commit is contained in:
@@ -7,6 +7,7 @@ from __future__ import print_function
|
||||
import logging
|
||||
import numpy as np
|
||||
import sys
|
||||
import time
|
||||
|
||||
import ray
|
||||
|
||||
@@ -15,6 +16,24 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
ray.init(address="localhost:6379")
|
||||
|
||||
# These numbers need to correspond with the autoscaler config file.
|
||||
# The number of remote nodes in the autoscaler should upper bound
|
||||
# these because sometimes nodes fail to update.
|
||||
num_remote_nodes = 100
|
||||
head_node_cpus = 2
|
||||
num_remote_cpus = num_remote_nodes * head_node_cpus
|
||||
|
||||
# Wait until the expected number of nodes have joined the cluster.
|
||||
while True:
|
||||
num_nodes = len(ray.nodes())
|
||||
logger.info("Waiting for nodes {}/{}".format(num_nodes,
|
||||
num_remote_nodes + 1))
|
||||
if num_nodes >= num_remote_nodes + 1:
|
||||
break
|
||||
time.sleep(5)
|
||||
logger.info("Nodes have all joined. There are %s resources.",
|
||||
ray.cluster_resources())
|
||||
|
||||
|
||||
@ray.remote
|
||||
class Child(object):
|
||||
@@ -61,7 +80,10 @@ parents = [
|
||||
Parent.remote(num_children, death_probability) for _ in range(num_parents)
|
||||
]
|
||||
|
||||
start = time.time()
|
||||
loop_times = []
|
||||
for i in range(100):
|
||||
loop_start = time.time()
|
||||
ray.get([parent.ping.remote(10) for parent in parents])
|
||||
|
||||
# Kill a parent actor with some probability.
|
||||
@@ -72,3 +94,9 @@ for i in range(100):
|
||||
parents[parent_index] = Parent.remote(num_children, death_probability)
|
||||
|
||||
logger.info("Finished trial %s", i)
|
||||
loop_times.append(time.time() - loop_start)
|
||||
|
||||
print("Finished in: {}s".format(time.time() - start))
|
||||
print("Average iteration time: {}s".format(sum(loop_times) / len(loop_times)))
|
||||
print("Max iteration time: {}s".format(max(loop_times)))
|
||||
print("Min iteration time: {}s".format(min(loop_times)))
|
||||
|
||||
@@ -47,33 +47,46 @@ class Actor(object):
|
||||
return np.ones(size, dtype=np.uint8)
|
||||
|
||||
|
||||
# Launch a bunch of tasks. (approximately 200 seconds)
|
||||
# Stage 1: Launch a bunch of tasks.
|
||||
stage_1_iterations = []
|
||||
start_time = time.time()
|
||||
logger.info("Submitting many tasks.")
|
||||
for i in range(10):
|
||||
iteration_start = time.time()
|
||||
logger.info("Iteration %s", i)
|
||||
ray.get([f.remote(0) for _ in range(100000)])
|
||||
logger.info("Finished after %s seconds.", time.time() - start_time)
|
||||
stage_1_iterations.append(time.time() - iteration_start)
|
||||
|
||||
stage_1_time = time.time() - start_time
|
||||
logger.info("Finished stage 1 after %s seconds.", stage_1_time)
|
||||
|
||||
# Launch a bunch of tasks, each with a bunch of dependencies. TODO(rkn): This
|
||||
# test starts to fail if we increase the number of tasks in the inner loop from
|
||||
# 500 to 1000. (approximately 615 seconds)
|
||||
stage_2_iterations = []
|
||||
start_time = time.time()
|
||||
logger.info("Submitting tasks with many dependencies.")
|
||||
x_ids = []
|
||||
for _ in range(5):
|
||||
iteration_start = time.time()
|
||||
for i in range(20):
|
||||
logger.info("Iteration %s. Cumulative time %s seconds", i,
|
||||
time.time() - start_time)
|
||||
x_ids = [f.remote(0, *x_ids) for _ in range(500)]
|
||||
ray.get(x_ids)
|
||||
stage_2_iterations.append(time.time() - iteration_start)
|
||||
logger.info("Finished after %s seconds.", time.time() - start_time)
|
||||
|
||||
stage_2_time = time.time() - start_time
|
||||
logger.info("Finished stage 2 after %s seconds.", stage_2_time)
|
||||
|
||||
# Create a bunch of actors.
|
||||
start_time = time.time()
|
||||
logger.info("Creating %s actors.", num_remote_cpus)
|
||||
actors = [Actor.remote() for _ in range(num_remote_cpus)]
|
||||
logger.info("Finished after %s seconds.", time.time() - start_time)
|
||||
stage_3_creation_time = time.time() - start_time
|
||||
logger.info("Finished stage 3 actor creation in %s seconds.",
|
||||
stage_3_creation_time)
|
||||
|
||||
# Submit a bunch of small tasks to each actor. (approximately 1070 seconds)
|
||||
start_time = time.time()
|
||||
@@ -85,7 +98,26 @@ for N in [1000, 100000]:
|
||||
if i % 100 == 0:
|
||||
logger.info("Submitted {}".format(i * len(actors)))
|
||||
ray.get(x_ids)
|
||||
logger.info("Finished after %s seconds.", time.time() - start_time)
|
||||
stage_3_time = time.time() - start_time
|
||||
logger.info("Finished stage 3 in %s seconds.", stage_3_time)
|
||||
|
||||
print("Stage 1 results:")
|
||||
print("\tTotal time: {}".format(stage_1_time))
|
||||
print("\tAverage iteration time: {}".format(
|
||||
sum(stage_1_iterations) / len(stage_1_iterations)))
|
||||
print("\tMax iteration time: {}".format(max(stage_1_iterations)))
|
||||
print("\tMin iteration time: {}".format(min(stage_1_iterations)))
|
||||
|
||||
print("Stage 2 results:")
|
||||
print("\tTotal time: {}".format(stage_2_time))
|
||||
print("\tAverage iteration time: {}".format(
|
||||
sum(stage_2_iterations) / len(stage_2_iterations)))
|
||||
print("\tMax iteration time: {}".format(max(stage_2_iterations)))
|
||||
print("\tMin iteration time: {}".format(min(stage_2_iterations)))
|
||||
|
||||
print("Stage 3 results:")
|
||||
print("\tActor creation time: {}".format(stage_3_creation_time))
|
||||
print("\tTotal time: {}".format(stage_3_time))
|
||||
|
||||
# TODO(rkn): The test below is commented out because it currently does not
|
||||
# pass.
|
||||
|
||||
Reference in New Issue
Block a user