Remove unused ci/performance_tests (#12767)

This commit is contained in:
Edward Oakes
2020-12-10 18:23:16 -06:00
committed by GitHub
parent c7b6ec88ef
commit 0e90cbcd19
2 changed files with 0 additions and 279 deletions
-31
View File
@@ -1,31 +0,0 @@
Performance Tests
=================
This directory contains scripts for running performance benchmarks. These
benchmarks are intended to be used by Ray developers to check if a given pull
request introduces a performance regression.
To check if a pull request introduces a performance regression, it is necessary
to run these benchmarks on the codebase before and after the change.
Running the Workloads
---------------------
To run the workload on a single machine, do the following.
.. code-block:: bash
python test_performance.py --num-nodes=3
This will start simulate a 3 node cluster on your local machine, attach to it,
and run the benchmarks. To run the benchmarks on an existing cluster, do the
following.
.. code-block:: bash
python test_performance.py --num-nodes=3 --address=<redis-address>
The ``--num-nodes`` flag must match the number of nodes in the cluster. The
nodes in the cluster must be configured with the appropriate resource labels. In
particular, the ith node in the cluster must have a resource named ``"i"``
with quantity ``500``.
-248
View File
@@ -1,248 +0,0 @@
import argparse
import logging
import numpy as np
import time
import ray
from ray.cluster_utils import Cluster
logger = logging.getLogger(__name__)
parser = argparse.ArgumentParser(
description="Parse arguments for running the performance tests.")
parser.add_argument(
"--num-nodes",
required=True,
type=int,
help="The number of nodes to simulate in the cluster.")
parser.add_argument(
"--skip-object-store-warmup",
default=False,
action="store_true",
help="True if the object store should not be warmed up. This could cause "
"the benchmarks to appear slower than usual.")
parser.add_argument(
"--address",
required=False,
type=str,
help="The address of the cluster to connect to. If this is ommitted, then "
"a cluster will be started locally (on a single machine).")
def start_local_cluster(num_nodes, object_store_memory):
"""Start a local Ray cluster.
The ith node in the cluster will have a resource named "i".
Args:
num_nodes: The number of nodes to start in the cluster.
Returns:
The cluster object.
"""
num_redis_shards = 2
redis_max_memory = 10**8
cluster = Cluster()
for i in range(num_nodes):
cluster.add_node(
redis_port=6379 if i == 0 else None,
num_redis_shards=num_redis_shards if i == 0 else None,
num_cpus=8 if i == 0 else 2,
num_gpus=0,
resources={str(i): 500},
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory)
ray.init(address=cluster.address)
return cluster
def wait_for_and_check_cluster_configuration(num_nodes):
"""Check that the cluster's custom resources are properly configured.
The ith node should have a resource labeled 'i' with quantity 500.
Args:
num_nodes: The number of nodes that we expect to be in the cluster.
Raises:
RuntimeError: This exception is raised if the cluster is not configured
properly for this test.
"""
logger.warning("Waiting for cluster to have %s nodes.", num_nodes)
while True:
nodes = ray.nodes()
if len(nodes) == num_nodes:
break
if len(nodes) > num_nodes:
raise RuntimeError(
"The cluster has %s nodes, but it should "
"only have %s.", len(nodes), num_nodes)
if not ([set(node["Resources"].keys())
for node in ray.nodes()] == [{str(i), "CPU"}
for i in range(num_nodes)]):
raise RuntimeError(
"The ith node in the cluster should have a "
"custom resource called 'i' with quantity "
"500. The nodes are\n%s", ray.nodes())
if not ([[
resource_quantity
for resource_name, resource_quantity in node["Resources"].items()
if resource_name != "CPU"
] for node in ray.nodes()] == num_nodes * [[500.0]]):
raise RuntimeError(
"The ith node in the cluster should have a "
"custom resource called 'i' with quantity "
"500. The nodes are\n%s", ray.nodes())
for node in ray.nodes():
if ("0" in node["Resources"] and node["ObjectStoreSocketName"] !=
ray.worker.global_worker.plasma_client.store_socket_name):
raise RuntimeError("The node that this driver is connected to "
"must have a custom resource labeled '0'.")
@ray.remote
def create_array(size):
return np.zeros(shape=size, dtype=np.uint8)
@ray.remote
def no_op(*values):
# The reason that this function takes *values is so that we can pass in
# an arbitrary number of object refs to create task dependencies.
return 1
@ray.remote
class Actor(object):
def ping(self, *values):
pass
def warm_up_cluster(num_nodes, object_store_memory):
"""Warm up the cluster.
This will allocate enough objects in each object store to cause eviction
because the first time a driver or worker touches a region of memory in the
object store, it may be slower.
Note that remote functions are exported lazily, so the first invocation of
a given remote function will be slower.
"""
logger.warning("Warming up the object store.")
size = object_store_memory * 2 // 5
num_objects = 2
while size > 0:
object_refs = []
for i in range(num_nodes):
for _ in range(num_objects):
object_refs += [
create_array._remote(args=[size], resources={str(i): 1})
]
size = size // 2
num_objects = min(num_objects * 2, 1000)
for object_ref in object_refs:
ray.get(object_ref)
logger.warning("Finished warming up the object store.")
# Invoke all of the remote functions once so that the definitions are
# broadcast to the workers.
ray.get(no_op.remote())
ray.get(Actor.remote().ping.remote())
def run_multiple_trials(f, num_trials):
durations = []
for _ in range(num_trials):
start = time.time()
f()
durations.append(time.time() - start)
return durations
def test_tasks(num_nodes):
def one_thousand_serial_tasks_local_node():
for _ in range(1000):
ray.get(no_op._remote(resources={"0": 1}))
durations = run_multiple_trials(one_thousand_serial_tasks_local_node, 10)
logger.warning(
"one_thousand_serial_tasks_local_node \n"
" min: %.2gs\n"
" mean: %.2gs\n"
" std: %.2gs", np.min(durations), np.mean(durations),
np.std(durations))
def one_thousand_serial_tasks_remote_node():
for _ in range(1000):
ray.get(no_op._remote(resources={"1": 1}))
durations = run_multiple_trials(one_thousand_serial_tasks_remote_node, 10)
logger.warning(
"one_thousand_serial_tasks_remote_node \n"
" min: %.2gs\n"
" mean: %.2gs\n"
" std: %.2gs", np.min(durations), np.mean(durations),
np.std(durations))
def ten_thousand_parallel_tasks_local():
ray.get([no_op._remote(resources={"0": 1}) for _ in range(10000)])
durations = run_multiple_trials(ten_thousand_parallel_tasks_local, 5)
logger.warning(
"ten_thousand_parallel_tasks_local \n"
" min: %.2gs\n"
" mean: %.2gs\n"
" std: %.2gs", np.min(durations), np.mean(durations),
np.std(durations))
def ten_thousand_parallel_tasks_load_balanced():
ray.get([
no_op._remote(resources={str(i % num_nodes): 1})
for i in range(10000)
])
durations = run_multiple_trials(ten_thousand_parallel_tasks_load_balanced,
5)
logger.warning(
"ten_thousand_parallel_tasks_load_balanced \n"
" min: %.2gs\n"
" mean: %.2gs\n"
" std: %.2gs", np.min(durations), np.mean(durations),
np.std(durations))
if __name__ == "__main__":
args = parser.parse_args()
num_nodes = args.num_nodes
object_store_memory = 10**8
# Configure the cluster or check that it is properly configured.
if num_nodes < 2:
raise ValueError("The --num-nodes argument must be at least 2.")
if args.address:
ray.init(address=args.address)
wait_for_and_check_cluster_configuration(num_nodes)
logger.warning(
"Running performance benchmarks on the cluster with "
"address %s.", args.address)
else:
logger.warning(
"Running performance benchmarks on a simulated cluster "
"of %s nodes.", num_nodes)
cluster = start_local_cluster(num_nodes, object_store_memory)
if not args.skip_object_store_warmup:
warm_up_cluster(num_nodes, object_store_memory)
# Run the benchmarks.
test_tasks(num_nodes)
# TODO(rkn): Test actors, test object transfers, test tasks with many
# dependencies.