[Object Spilling] 100GB shuffle release test (#13729)

This commit is contained in:
SangBin Cho
2021-01-29 12:38:06 -08:00
committed by GitHub
parent 1a9a0024d5
commit c21a79ae6e
4 changed files with 330 additions and 0 deletions
+16
View File
@@ -148,6 +148,22 @@ is generally the easiest way to run release tests.
Run the ``python/ray/tests/test_k8s_*`` to make sure K8s cluster launcher and operator works. Make sure the docker image is the released version.
6. **Data processing tests**
.. code-block:: bash
data_processing_tests/README.rst
Follow the instructions to kick off the tests and check the status of the workloads.
Data processing tests make sure all the data processing features are reliable and performant.
The following tests should be run.
- ``data_processing_tests/workloads/streaming_shuffle.py`` run the 100GB streaming shuffle in a single node & fake 4 nodes cluster.
**IMPORTANT** Check if the workload scripts has terminated. If so, please record the result (both read/write bandwidth and the shuffle result) to the ``release_logs/data_processing_tests/[test_name]``.
Both shuffling runtime and read/write bandwidth shouldn't be decreasing more than 15% compared to the previous release.
Identify and Resolve Release Blockers
-------------------------------------
If a release blocking issue arises in the course of testing, you should
+9
View File
@@ -0,0 +1,9 @@
Running script
--------------
Run `unset RAY_ADDRESS; python workloads/streaming_shuffle.py`
Cluster configurations
----------------------
Make sure the test runs in i3.8xl (IO optimized instance).
+128
View File
@@ -0,0 +1,128 @@
# An unique identifier for the head node and workers of this cluster.
cluster_name: native-shuffle-tests
# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0
# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 0
# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
image: "" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
# image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull
container_name: ""
# If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
# if no cached version is present.
pull_before_run: True
run_options: [] # Extra options to pass into "docker run"
# Example of running a GPU head with CPU workers
# head_image: "rayproject/ray-ml:latest-gpu"
# Allow Ray to automatically detect GPUs
# worker_image: "rayproject/ray-ml:latest-cpu"
# worker_run_options: []
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
provider:
type: aws
region: us-west-2
# Availability zone(s), comma-separated, that nodes may be launched in.
# Nodes are currently spread between zones by a round-robin approach,
# however this implementation detail should not be relied upon.
availability_zone: us-west-2a,us-west-2b
# Whether to allow node reuse. If set to False, nodes will be terminated
# instead of stopped.
cache_stopped_nodes: True # If not present, the default is True.
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below.
# ssh_private_key: /path/to/your/key.pem
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
head_node:
InstanceType: i3.8xlarge
ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
# You can provision additional disk space with a conf as follows
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 1000
# Additional options in the boto docs.
# Provider-specific config for worker nodes, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
worker_nodes:
InstanceType: i3.8xlarge
ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
# You can provision additional disk space with a conf as follows
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs:
VolumeSize: 1000
# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
- "**/.git"
- "**/.git/**"
# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
- ".gitignore"
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []
# List of shell commands to run to set up nodes.
setup_commands:
- echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc
- pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp36-cp36m-manylinux2014_x86_64.whl
# Not necessary.
- sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 65535" >> /etc/security/limits.conf; echo "* hard nofile 65535" >> /etc/security/limits.conf;'
- pip install tqdm
# Custom commands that will be run on the head node after common setup.
head_setup_commands: []
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
# - ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --system-config='{"automatic_object_spilling_enabled":true,"max_io_workers":1,"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}'
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
# - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
@@ -0,0 +1,177 @@
import time
import json
import ray
import numpy as np
from typing import List
from tqdm import tqdm
from ray.cluster_utils import Cluster
num_nodes = 4
num_cpus = 4
partition_size = int(500e6) # 500MB
# Number of map & reduce tasks == num_partitions.
# Number of objects == num_partitions ^ 2.
num_partitions = 200
# There are two int64 per row, so we divide by 8 * 2 bytes.
rows_per_partition = partition_size // (8 * 2)
object_store_size = 20 * 1024 * 1024 * 1024 # 20G
system_config = {
"automatic_object_spilling_enabled": True,
"max_io_workers": 1,
"object_spilling_config": json.dumps(
{
"type": "filesystem",
"params": {
"directory_path": "/tmp/spill"
}
},
separators=(",", ":"))
}
def display_spilling_info(address):
state = ray.state.GlobalState()
state._initialize_global_state(address,
ray.ray_constants.REDIS_DEFAULT_PASSWORD)
raylet = state.node_table()[0]
memory_summary = ray.internal.internal_api.memory_summary(
raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
for line in memory_summary.split("\n"):
if "Spilled" in line:
print(line)
if "Restored" in line:
print(line)
print("\n\n")
@ray.remote
class Counter:
def __init__(self):
self.num_map = 0
self.num_reduce = 0
def inc(self):
self.num_map += 1
# print("Num map tasks finished", self.num_map)
def inc2(self):
self.num_reduce += 1
# print("Num reduce tasks finished", self.num_reduce)
def finish(self):
pass
# object store peak memory: O(partition size / num partitions)
# heap memory: O(partition size / num partitions)
@ray.remote(num_returns=num_partitions)
def shuffle_map_streaming(
i, counter_handle=None) -> List["ObjectRef[np.ndarray]"]:
outputs = [
ray.put(
np.ones((rows_per_partition // num_partitions, 2), dtype=np.int64))
for _ in range(num_partitions)
]
counter_handle.inc.remote()
return outputs
# object store peak memory: O(partition size / num partitions)
# heap memory: O(partition size) -- TODO can be reduced too
@ray.remote
def shuffle_reduce_streaming(*inputs, counter_handle=None) -> np.ndarray:
out = None
for chunk in inputs:
if out is None:
out = ray.get(chunk)
else:
out = np.concatenate([out, ray.get(chunk)])
counter_handle.inc2.remote()
return out
shuffle_map = shuffle_map_streaming
shuffle_reduce = shuffle_reduce_streaming
def run_shuffle():
counter = Counter.remote()
start = time.time()
print("start map")
shuffle_map_out = [
shuffle_map.remote(i, counter_handle=counter)
for i in range(num_partitions)
]
# wait until all map is done before reduce phase.
for out in tqdm(shuffle_map_out):
ray.get(out)
# Start reducing
shuffle_reduce_out = [
shuffle_reduce.remote(
*[shuffle_map_out[i][j] for i in range(num_partitions)],
counter_handle=counter) for j in range(num_partitions)
]
print("start shuffle.")
pbar = tqdm(total=num_partitions)
total_rows = 0
ready, unready = ray.wait(shuffle_reduce_out)
while unready:
ready, unready = ray.wait(unready)
for output in ready:
pbar.update(1)
total_rows += ray.get(output).shape[0]
delta = time.time() - start
ray.get(counter.finish.remote())
print("Shuffled", total_rows * 8 * 2, "bytes in", delta,
"seconds in a single node.\n")
def run_single_node():
address = ray.init(
num_cpus=num_cpus * num_nodes,
object_store_memory=object_store_size,
_system_config=system_config)
# Run shuffle.
print(
"\n\nTest streaming shuffle with a single node.\n"
f"Shuffle size: {partition_size * num_partitions / 1024 / 1024 / 1024}"
"GB")
run_shuffle()
time.sleep(5)
display_spilling_info(address["redis_address"])
ray.shutdown()
time.sleep(5)
def run_multi_nodes():
c = Cluster()
c.add_node(
num_cpus=4,
object_store_memory=object_store_size,
_system_config=system_config)
ray.init(address=c.address)
for _ in range(num_nodes - 1): # subtract a head node.
c.add_node(num_cpus=4, object_store_memory=object_store_size)
c.wait_for_nodes()
# Run shuffle.
print(
f"\n\nTest streaming shuffle with {num_nodes} nodes.\n"
f"Shuffle size: {partition_size * num_partitions / 1024 / 1024 / 1024}"
"GB")
run_shuffle()
time.sleep(5)
display_spilling_info(c.address)
ray.shutdown()
c.shutdown()
time.sleep(5)
run_single_node()
run_multi_nodes()