From c21a79ae6e35bdc01b4b40d27f67502489d73390 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 29 Jan 2021 12:38:06 -0800 Subject: [PATCH] [Object Spilling] 100GB shuffle release test (#13729) --- release/RELEASE_PROCESS.rst | 16 ++ release/data_processing_tests/README.rst | 9 + release/data_processing_tests/cluster.yaml | 128 +++++++++++++ .../workloads/streaming_shuffle.py | 177 ++++++++++++++++++ 4 files changed, 330 insertions(+) create mode 100644 release/data_processing_tests/README.rst create mode 100644 release/data_processing_tests/cluster.yaml create mode 100644 release/data_processing_tests/workloads/streaming_shuffle.py diff --git a/release/RELEASE_PROCESS.rst b/release/RELEASE_PROCESS.rst index 018f56bdf..80afb3589 100644 --- a/release/RELEASE_PROCESS.rst +++ b/release/RELEASE_PROCESS.rst @@ -148,6 +148,22 @@ is generally the easiest way to run release tests. Run the ``python/ray/tests/test_k8s_*`` to make sure K8s cluster launcher and operator works. Make sure the docker image is the released version. +6. **Data processing tests** + + .. code-block:: bash + + data_processing_tests/README.rst + + Follow the instructions to kick off the tests and check the status of the workloads. + Data processing tests make sure all the data processing features are reliable and performant. + The following tests should be run. + + - ``data_processing_tests/workloads/streaming_shuffle.py`` runs the 100GB streaming shuffle on a single node and on a fake 4-node cluster. + + **IMPORTANT** Check whether the workload scripts have terminated. If so, please record the results (both read/write bandwidth and the shuffle result) in ``release_logs/data_processing_tests/[test_name]``. + Neither the shuffle runtime nor the read/write bandwidth should regress by more than 15% compared to the previous release.
+ + Identify and Resolve Release Blockers ------------------------------------- If a release blocking issue arises in the course of testing, you should diff --git a/release/data_processing_tests/README.rst b/release/data_processing_tests/README.rst new file mode 100644 index 000000000..3db8eeb9c --- /dev/null +++ b/release/data_processing_tests/README.rst @@ -0,0 +1,9 @@ +Running script +-------------- + +Run ``unset RAY_ADDRESS; python workloads/streaming_shuffle.py`` + +Cluster configurations +---------------------- + +Make sure the test runs on an i3.8xlarge (I/O-optimized) instance. \ No newline at end of file diff --git a/release/data_processing_tests/cluster.yaml b/release/data_processing_tests/cluster.yaml new file mode 100644 index 000000000..903dd2564 --- /dev/null +++ b/release/data_processing_tests/cluster.yaml @@ -0,0 +1,128 @@ +# A unique identifier for the head node and workers of this cluster. +cluster_name: native-shuffle-tests + +# The minimum number of worker nodes to launch in addition to the head +# node. This number should be >= 0. +min_workers: 0 + +# The maximum number of worker nodes to launch in addition to the head +# node. This takes precedence over min_workers. +max_workers: 0 + +# The autoscaler will scale up the cluster faster with higher upscaling speed. +# E.g., if the task requires adding more nodes then the autoscaler will gradually +# scale up the cluster in chunks of upscaling_speed*currently_running_nodes. +# This number should be > 0. +upscaling_speed: 1.0 + +# This executes all commands on all nodes in the docker container, +# and opens all the necessary ports to support the Ray cluster. +# Empty string means disabled. +docker: + image: "" # You can change this to latest-cpu if you don't need GPU support and want a faster startup + # image: rayproject/ray:latest-gpu # use this one if you don't need ML dependencies, it's faster to pull + container_name: "" + # If true, pulls latest version of image.
Otherwise, `docker run` will only pull the image + # if no cached version is present. + pull_before_run: True + run_options: [] # Extra options to pass into "docker run" + + # Example of running a GPU head with CPU workers + # head_image: "rayproject/ray-ml:latest-gpu" + # Allow Ray to automatically detect GPUs + + # worker_image: "rayproject/ray-ml:latest-cpu" + # worker_run_options: [] + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 + +# Cloud-provider specific configuration. +provider: + type: aws + region: us-west-2 + # Availability zone(s), comma-separated, that nodes may be launched in. + # Nodes are currently spread between zones by a round-robin approach, + # however this implementation detail should not be relied upon. + availability_zone: us-west-2a,us-west-2b + # Whether to allow node reuse. If set to False, nodes will be terminated + # instead of stopped. + cache_stopped_nodes: True # If not present, the default is True. + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu +# By default Ray creates a new private keypair, but you can also use your own. +# If you do so, make sure to also set "KeyName" in the head and worker node +# configurations below. +# ssh_private_key: /path/to/your/key.pem + +# Provider-specific config for the head node, e.g. instance type. By default +# Ray will auto-configure unspecified fields such as SubnetId and KeyName. +# For more documentation on available fields, see: +# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances +head_node: + InstanceType: i3.8xlarge + ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30 + + # You can provision additional disk space with a conf as follows + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 1000 + + # Additional options in the boto docs. + +# Provider-specific config for worker nodes, e.g. instance type. 
By default +# Ray will auto-configure unspecified fields such as SubnetId and KeyName. +# For more documentation on available fields, see: +# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances +worker_nodes: + InstanceType: i3.8xlarge + ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30 + + # You can provision additional disk space with a conf as follows + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 1000 + +# Patterns for files to exclude when running rsync up or rsync down +rsync_exclude: + - "**/.git" + - "**/.git/**" + +# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for +# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided +# as a value, the behavior will match git's behavior for finding and using .gitignore files. +rsync_filter: + - ".gitignore" + +# List of commands that will be run before `setup_commands`. If docker is +# enabled, these commands will run outside the container and before docker +# is setup. +initialization_commands: [] + +# List of shell commands to run to set up nodes. +setup_commands: + - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp36-cp36m-manylinux2014_x86_64.whl + # Not necessary. + - sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 65535" >> /etc/security/limits.conf; echo "* hard nofile 65535" >> /etc/security/limits.conf;' + - pip install tqdm + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: [] + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. 
+head_start_ray_commands: + - ray stop + # - ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --system-config='{"automatic_object_spilling_enabled":true,"max_io_workers":1,"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}' + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + # - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 diff --git a/release/data_processing_tests/workloads/streaming_shuffle.py b/release/data_processing_tests/workloads/streaming_shuffle.py new file mode 100644 index 000000000..903042bb9 --- /dev/null +++ b/release/data_processing_tests/workloads/streaming_shuffle.py @@ -0,0 +1,177 @@ +import time +import json +import ray +import numpy as np +from typing import List +from tqdm import tqdm + +from ray.cluster_utils import Cluster + +num_nodes = 4 +num_cpus = 4 +partition_size = int(500e6) # 500MB +# Number of map & reduce tasks == num_partitions. +# Number of objects == num_partitions ^ 2. +num_partitions = 200 +# There are two int64 per row, so we divide by 8 * 2 bytes. 
rows_per_partition = partition_size // (8 * 2)
object_store_size = 20 * 1024 * 1024 * 1024  # 20G

# Ray system config used by both scenarios: automatic object spilling to the
# local filesystem (/tmp/spill) with a single IO worker.
system_config = {
    "automatic_object_spilling_enabled": True,
    "max_io_workers": 1,
    "object_spilling_config": json.dumps(
        {
            "type": "filesystem",
            "params": {
                "directory_path": "/tmp/spill"
            }
        },
        separators=(",", ":"))
}


def display_spilling_info(address):
    """Print the spill/restore lines of a node's memory summary.

    Args:
        address: Redis address of the running Ray cluster.
    """
    state = ray.state.GlobalState()
    state._initialize_global_state(address,
                                   ray.ray_constants.REDIS_DEFAULT_PASSWORD)
    # Only the first entry of the node table is queried, so in the
    # multi-node scenario the numbers describe that one node only.
    raylet = state.node_table()[0]
    memory_summary = ray.internal.internal_api.memory_summary(
        raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
    for line in memory_summary.split("\n"):
        if "Spilled" in line:
            print(line)
        if "Restored" in line:
            print(line)
    print("\n\n")


@ray.remote
class Counter:
    """Actor that tallies how many map and reduce tasks have finished."""

    def __init__(self):
        self.num_map = 0
        self.num_reduce = 0

    def inc(self):
        """Record one finished map task."""
        self.num_map += 1
        # print("Num map tasks finished", self.num_map)

    def inc2(self):
        """Record one finished reduce task."""
        self.num_reduce += 1
        # print("Num reduce tasks finished", self.num_reduce)

    def finish(self):
        """No-op; the driver blocks on it before reporting the result."""
        pass


# object store peak memory: O(partition size / num partitions)
# heap memory: O(partition size / num partitions)
@ray.remote(num_returns=num_partitions)
def shuffle_map_streaming(
        i, counter_handle=None) -> List["ObjectRef[np.ndarray]"]:
    """Produce one map partition as `num_partitions` separately stored chunks.

    Args:
        i: Index of the map task (not used to generate the data).
        counter_handle: Optional `Counter` actor notified on completion.

    Returns:
        A list of `num_partitions` object refs, one per reducer, each
        pointing at an int64 array of ones with two columns.
    """
    outputs = [
        ray.put(
            np.ones((rows_per_partition // num_partitions, 2), dtype=np.int64))
        for _ in range(num_partitions)
    ]
    # The handle defaults to None, so only notify when one was passed.
    if counter_handle is not None:
        counter_handle.inc.remote()
    return outputs


# object store peak memory: O(partition size / num partitions)
# heap memory: O(partition size) -- TODO can be reduced too
@ray.remote
def shuffle_reduce_streaming(*inputs, counter_handle=None) -> np.ndarray:
    """Fetch the chunks addressed to one reducer and concatenate them.

    Args:
        *inputs: Object refs of the chunks, one per map task.
        counter_handle: Optional `Counter` actor notified on completion.

    Returns:
        The concatenation of all chunks, or None when no input is given.
    """
    out = None
    # Chunks are fetched one at a time and appended immediately; the
    # access pattern is part of the benchmark and is left unchanged.
    for chunk in inputs:
        if out is None:
            out = ray.get(chunk)
        else:
            out = np.concatenate([out, ray.get(chunk)])
    if counter_handle is not None:
        counter_handle.inc2.remote()
    return out


shuffle_map = shuffle_map_streaming
shuffle_reduce = shuffle_reduce_streaming


def run_shuffle(cluster_desc="a single node"):
    """Run the map and reduce phases and print the shuffled size and runtime.

    Args:
        cluster_desc: Text describing the cluster in the final report
            line. The default keeps the single-node message unchanged.
    """
    counter = Counter.remote()
    start = time.time()
    print("start map")
    shuffle_map_out = [
        shuffle_map.remote(i, counter_handle=counter)
        for i in range(num_partitions)
    ]
    # wait until all map is done before reduce phase.
    for out in tqdm(shuffle_map_out):
        ray.get(out)

    # Start reducing: reducer j consumes chunk j of every map output.
    shuffle_reduce_out = [
        shuffle_reduce.remote(
            *[shuffle_map_out[i][j] for i in range(num_partitions)],
            counter_handle=counter) for j in range(num_partitions)
    ]

    print("start shuffle.")
    pbar = tqdm(total=num_partitions)
    total_rows = 0
    # Every batch returned by ray.wait is consumed. The previous version
    # issued one ray.wait before the loop and discarded its result, so one
    # reduce output was never counted.
    unready = shuffle_reduce_out
    while unready:
        ready, unready = ray.wait(unready)
        for output in ready:
            pbar.update(1)
            total_rows += ray.get(output).shape[0]
    delta = time.time() - start

    ray.get(counter.finish.remote())
    print("Shuffled", total_rows * 8 * 2, "bytes in", delta,
          f"seconds in {cluster_desc}.\n")


def run_single_node():
    """Run the shuffle on one local Ray node, then print spilling stats."""
    address = ray.init(
        num_cpus=num_cpus * num_nodes,
        object_store_memory=object_store_size,
        _system_config=system_config)

    # Run shuffle.
    print(
        "\n\nTest streaming shuffle with a single node.\n"
        f"Shuffle size: {partition_size * num_partitions / 1024 / 1024 / 1024}"
        "GB")
    run_shuffle()
    time.sleep(5)
    display_spilling_info(address["redis_address"])
    ray.shutdown()
    time.sleep(5)


def run_multi_nodes():
    """Run the shuffle on a local `num_nodes`-node cluster, then print stats."""
    c = Cluster()
    # The system config is passed only when adding the head node.
    c.add_node(
        num_cpus=num_cpus,
        object_store_memory=object_store_size,
        _system_config=system_config)
    ray.init(address=c.address)
    for _ in range(num_nodes - 1):  # subtract a head node.
        c.add_node(num_cpus=num_cpus, object_store_memory=object_store_size)
    c.wait_for_nodes()

    # Run shuffle.
    print(
        f"\n\nTest streaming shuffle with {num_nodes} nodes.\n"
        f"Shuffle size: {partition_size * num_partitions / 1024 / 1024 / 1024}"
        "GB")
    run_shuffle(cluster_desc=f"{num_nodes} nodes")
    time.sleep(5)
    display_spilling_info(c.address)
    ray.shutdown()
    c.shutdown()
    time.sleep(5)


run_single_node()
run_multi_nodes()