diff --git a/ci/long_running_tests/README.rst b/ci/long_running_tests/README.rst new file mode 100644 index 000000000..27bf37987 --- /dev/null +++ b/ci/long_running_tests/README.rst @@ -0,0 +1,30 @@ +Long Running Tests +================== + +This directory contains scripts for starting long-running workloads which are +intended to run forever until they fail. + +Running the Workloads +--------------------- + +To run the workloads, run ``./start_workloads.sh``. This will start one EC2 +instance per workload and will start the workloads running (one per instance). +Running the ``./start_workloads.sh`` script again will clean up any state from +the previous runs and will start the workloads again. + +Check Workload Statuses +----------------------- + +To check up on the workloads, run ``./check_workloads.sh``. This will print the +current tmux pane of each workload, and from that output you may be able to see whether something +has failed. + +To debug workloads that have failed, you may find it useful to ssh to the +relevant machine, attach to the tmux session (usually ``tmux a -t 0``), inspect +the logs under ``/tmp/ray/session*/logs/``, and also inspect +``/tmp/ray/session*/debug_state.txt``. + +Adding a Workload +----------------- + +To create a new workload, simply add a new Python file under ``workloads/``. 
diff --git a/ci/long_running_tests/check_workloads.sh b/ci/long_running_tests/check_workloads.sh new file mode 100755 index 000000000..591b0d7f8 --- /dev/null +++ b/ci/long_running_tests/check_workloads.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +# set -x + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) + +pushd "$ROOT_DIR" + +for workload_file in "$ROOT_DIR"/workloads/*; do + file_name=$(basename -- $workload_file) + workload_name="${file_name%.*}" + echo "======================================================================" + echo "WORKLOAD: $workload_name" + echo "======================================================================" + + ray exec config.yaml --cluster-name="$workload_name" "tmux capture-pane -p" + echo "" + echo "ssh to this machine with:" + echo " ray attach $ROOT_DIR/config.yaml --cluster-name=$workload_name" + echo "" + echo "" +done + +popd diff --git a/ci/long_running_tests/config.yaml b/ci/long_running_tests/config.yaml new file mode 100644 index 000000000..80505f606 --- /dev/null +++ b/ci/long_running_tests/config.yaml @@ -0,0 +1,65 @@ +cluster_name: default +min_workers: 0 +max_workers: 0 +target_utilization_fraction: 0.8 +idle_timeout_minutes: 5 + +# Cloud-provider specific configuration. +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a +auth: + ssh_user: ubuntu + +head_node: + InstanceType: m5.2xlarge + ImageId: ami-0def3275 # Default Ubuntu 16.04 AMI. + + # Set primary volume to 50 GiB + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 50 + +worker_nodes: + InstanceType: m5.large + ImageId: ami-0def3275 # Default Ubuntu 16.04 AMI. + + # Set primary volume to 50 GiB + BlockDeviceMappings: + - DeviceName: /dev/sda1 + Ebs: + VolumeSize: 50 + + # Run workers on spot by default. Comment this out to use on-demand. + InstanceMarketOptions: + MarketType: spot + +# List of shell commands to run to set up nodes. +setup_commands: + # Install Anaconda. 
+ - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true + - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true + - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc + # Some Python dependencies. + - pip install boto3==1.4.8 cython==0.29.0 + # # Uncomment the following to build Ray from source instead of installing the nightly wheel below. + # - sudo apt-get update + # - sudo apt-get install -y cmake pkg-config build-essential autoconf curl libtool unzip flex bison python + # - git clone https://github.com/ray-project/ray || true + # - cd ray/python; git checkout master; git pull; pip install -e . --verbose + # Install nightly Ray wheels. + - pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.7.0.dev0-cp36-cp36m-manylinux1_x86_64.whl + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: [] + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: [] + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: [] diff --git a/ci/long_running_tests/start_workloads.sh b/ci/long_running_tests/start_workloads.sh new file mode 100755 index 000000000..26b2bbca4 --- /dev/null +++ b/ci/long_running_tests/start_workloads.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash + +set -e + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) + +pushd "$ROOT_DIR" + +# Start one instance per workload. +for workload_file in "$ROOT_DIR"/workloads/*; do + file_name=$(basename -- $workload_file) + workload_name="${file_name%.*}" + ray up -y config.yaml --cluster-name="$workload_name" & +done + +# Wait for all of the nodes to be up. +for pid in `jobs -p`; do + wait $pid +done + +# Start the workloads running. 
+for workload_file in "$ROOT_DIR"/workloads/*; do + file_name=$(basename -- $workload_file) + workload_name="${file_name%.*}" + # Copy the workload to the cluster. + ray rsync_up config.yaml --cluster-name="$workload_name" "$workload_file" "$file_name" + # Clean up previous runs if relevant. + ray exec config.yaml --cluster-name="$workload_name" "ray stop; rm -r /tmp/ray; tmux kill-server || true" + # Start the workload. + ray exec config.yaml --cluster-name="$workload_name" "python $file_name" --tmux +done + +popd + +# Print some helpful information. + +echo "" +echo "" + +echo "To kill the instances, use the following commands." +echo "" +for workload_file in "$ROOT_DIR"/workloads/*; do + file_name=$(basename -- $workload_file) + workload_name="${file_name%.*}" + echo " ray down -y $ROOT_DIR/config.yaml --cluster-name=$workload_name" +done + +echo "" +echo "" + +echo "Use the following commands to attach to the relevant drivers." +echo "" +for workload_file in "$ROOT_DIR"/workloads/*; do + file_name=$(basename -- $workload_file) + workload_name="${file_name%.*}" + echo " ray attach $ROOT_DIR/config.yaml --cluster-name=$workload_name --tmux" +done + +echo "" +echo "" + +echo "To check up on the scripts, run the following." +echo " $ROOT_DIR/check_workloads.sh" diff --git a/ci/long_running_tests/workloads/workload1.py b/ci/long_running_tests/workloads/workload1.py new file mode 100644 index 000000000..852e6a1bf --- /dev/null +++ b/ci/long_running_tests/workloads/workload1.py @@ -0,0 +1,67 @@ +# This workload tests submitting and getting many tasks over and over. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import ray +from ray.tests.cluster_utils import Cluster + +num_redis_shards = 5 +redis_max_memory = 10**8 +object_store_memory = 10**8 +num_nodes = 10 + +message = ("Make sure there is enough memory on this machine to run this " + "workload. 
We divide the system memory by 2 to provide a buffer.") +assert (num_nodes * object_store_memory + num_redis_shards * redis_max_memory < + ray.utils.get_system_memory() / 2), message + +# Simulate a cluster on one machine. + +cluster = Cluster() +for i in range(num_nodes): + cluster.add_node( + redis_port=6379 if i == 0 else None, + num_redis_shards=num_redis_shards if i == 0 else None, + num_cpus=2, + num_gpus=0, + resources={str(i): 2}, + object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory) +ray.init(redis_address=cluster.redis_address) + +# Run the workload. + + +@ray.remote +def f(*xs): + return 1 + + +iteration = 0 +ids = [] +start_time = time.time() +previous_time = start_time +while True: + for _ in range(50): + new_constrained_ids = [ + f._remote(args=[*ids], resources={str(i % num_nodes): 1}) + for i in range(25) + ] + new_unconstrained_ids = [f.remote(*ids) for _ in range(25)] + ids = new_constrained_ids + new_unconstrained_ids + + ray.get(ids) + + new_time = time.time() + print("Iteration {}:\n" + " - Iteration time: {}.\n" + " - Absolute time: {}.\n" + " - Total elapsed time: {}.".format( + iteration, new_time - previous_time, new_time, + new_time - start_time)) + previous_time = new_time + iteration += 1 diff --git a/ci/long_running_tests/workloads/workload2.py b/ci/long_running_tests/workloads/workload2.py new file mode 100644 index 000000000..75fefb058 --- /dev/null +++ b/ci/long_running_tests/workloads/workload2.py @@ -0,0 +1,70 @@ +# This workload tests submitting many actor methods. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import ray +from ray.tests.cluster_utils import Cluster + +num_redis_shards = 5 +redis_max_memory = 10**8 +object_store_memory = 10**8 +num_nodes = 10 + +message = ("Make sure there is enough memory on this machine to run this " + "workload. 
We divide the system memory by 2 to provide a buffer.") +assert (num_nodes * object_store_memory + num_redis_shards * redis_max_memory < + ray.utils.get_system_memory() / 2), message + +# Simulate a cluster on one machine. + +cluster = Cluster() +for i in range(num_nodes): + cluster.add_node( + redis_port=6379 if i == 0 else None, + num_redis_shards=num_redis_shards if i == 0 else None, + num_cpus=2, + num_gpus=0, + resources={str(i): 2}, + object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory) +ray.init(redis_address=cluster.redis_address) + +# Run the workload. + + +@ray.remote +class Actor(object): + def __init__(self): + self.value = 0 + + def method(self): + self.value += 1 + + +actors = [ + Actor._remote([], {}, num_cpus=0.1, resources={str(i % num_nodes): 0.1}) + for i in range(num_nodes * 5) +] + +iteration = 0 +start_time = time.time() +previous_time = start_time +while True: + for _ in range(100): + previous_ids = [a.method.remote() for a in actors] + + ray.get(previous_ids) + + new_time = time.time() + print("Iteration {}:\n" + " - Iteration time: {}.\n" + " - Absolute time: {}.\n" + " - Total elapsed time: {}.".format( + iteration, new_time - previous_time, new_time, + new_time - start_time)) + previous_time = new_time + iteration += 1 diff --git a/ci/long_running_tests/workloads/workload3.py b/ci/long_running_tests/workloads/workload3.py new file mode 100644 index 000000000..20de8358d --- /dev/null +++ b/ci/long_running_tests/workloads/workload3.py @@ -0,0 +1,70 @@ +# This workload tests repeatedly killing a node and adding a new node. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import ray +from ray.tests.cluster_utils import Cluster + +num_redis_shards = 5 +redis_max_memory = 10**8 +object_store_memory = 10**8 +num_nodes = 10 + +message = ("Make sure there is enough memory on this machine to run this " + "workload. 
We divide the system memory by 2 to provide a buffer.") +assert (num_nodes * object_store_memory + num_redis_shards * redis_max_memory < + ray.utils.get_system_memory() / 2), message + +# Simulate a cluster on one machine. + +cluster = Cluster() +for i in range(num_nodes): + cluster.add_node( + redis_port=6379 if i == 0 else None, + num_redis_shards=num_redis_shards if i == 0 else None, + num_cpus=2, + num_gpus=0, + resources={str(i): 2}, + object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory) +ray.init(redis_address=cluster.redis_address) + +# Run the workload. + + +@ray.remote +def f(*xs): + return 1 + + +iteration = 0 +previous_ids = [1 for _ in range(100)] +start_time = time.time() +previous_time = start_time +while True: + for _ in range(100): + previous_ids = [f.remote(previous_id) for previous_id in previous_ids] + + ray.wait(previous_ids, num_returns=len(previous_ids)) + + for _ in range(100): + previous_ids = [f.remote(previous_id) for previous_id in previous_ids] + + node_to_kill = cluster.list_all_nodes()[1] + # Remove the first non-head node. 
+ cluster.remove_node(node_to_kill) + cluster.add_node() + + new_time = time.time() + print("Iteration {}:\n" + " - Iteration time: {}.\n" + " - Absolute time: {}.\n" + " - Total elapsed time: {}.".format( + iteration, new_time - previous_time, new_time, + new_time - start_time)) + previous_time = new_time + iteration += 1 diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py index d08f7ef14..16e49b788 100755 --- a/python/ray/rllib/train.py +++ b/python/ray/rllib/train.py @@ -126,10 +126,8 @@ def run(args, parser): cluster = Cluster() for _ in range(args.ray_num_nodes): cluster.add_node( - resources={ - "num_cpus": args.ray_num_cpus or 1, - "num_gpus": args.ray_num_gpus or 0, - }, + num_cpus=args.ray_num_cpus or 1, + num_gpus=args.ray_num_gpus or 0, object_store_memory=args.ray_object_store_memory, redis_max_memory=args.ray_redis_max_memory) ray.init(redis_address=cluster.redis_address)