[docs] Add more guideline on using ray in slurm cluster (#12819)

Co-authored-by: Sumanth Ratna <sumanthratna@gmail.com>
Co-authored-by: PENG Zhenghao <pengzh@ie.cuhk.edu.hk>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
PENG Zhenghao
2021-01-15 04:17:53 +08:00
committed by GitHub
parent d98235cc84
commit e63da54931
8 changed files with 470 additions and 74 deletions
@@ -0,0 +1,29 @@
# trainer.py
"""Smoke test for a Ray cluster started inside a SLURM allocation.

Connects to the cluster whose address is exported as ``ip_head`` and
repeatedly launches a batch of one-second tasks, printing how the tasks were
spread over node IP addresses and how long each batch took.

Usage: ``python trainer.py [NUM_TASKS]``. When ``NUM_TASKS`` is omitted, the
number of CPUs Ray reports for the cluster is used instead, so the script
also works when the launcher passes no argument.
"""
from collections import Counter
import os
import sys
import time

import ray

# The task count is optional on the command line; parse it before connecting
# so that a malformed value fails fast.
num_cpus = int(sys.argv[1]) if len(sys.argv) > 1 else None

ray.init(address=os.environ["ip_head"])

if num_cpus is None:
    # No count given: fall back to whatever CPU total the cluster reports.
    # NOTE(review): nodes that have not joined yet are not counted.
    num_cpus = int(ray.cluster_resources().get("CPU", 1))

print("Nodes in the Ray cluster:")
print(ray.nodes())


@ray.remote
def f():
    """Sleep for one second, then return the IP reported for this node."""
    time.sleep(1)
    # NOTE(review): `ray.services` may not be importable in newer Ray
    # releases -- confirm against the Ray version you deploy.
    return ray.services.get_node_ip_address()


# Each round takes about one second (assuming that
# ray was able to access all of the allocated nodes).
for i in range(60):
    start = time.time()
    ip_addresses = ray.get([f.remote() for _ in range(num_cpus)])
    print(Counter(ip_addresses))
    end = time.time()
    print(end - start)
@@ -0,0 +1,8 @@
:orphan:
.. _slurm-basic:
slurm-basic.sh
~~~~~~~~~~~~~~
.. literalinclude:: /cluster/examples/slurm-basic.sh
   :language: bash
@@ -0,0 +1,65 @@
#!/bin/bash
# shellcheck disable=SC2206
#
# Starts a Ray cluster inside a SLURM allocation (one Ray runtime per node)
# and then runs a Python script against it. The "__doc_*__" marker comments
# delimit the snippets that the documentation includes; keep them intact.
#SBATCH --job-name=test
#SBATCH --cpus-per-task=5
#SBATCH --mem-per-cpu=1GB
#SBATCH --nodes=4
#SBATCH --tasks-per-node=1
#SBATCH --time=00:30:00

set -x

# __doc_head_address_start__

# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)

head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

# if we detect a space character in the head node IP, we'll
# convert it to an ipv4 address. This step is optional.
if [[ "$head_node_ip" == *" "* ]]; then
    IFS=' ' read -ra ADDR <<<"$head_node_ip"
    # A first entry longer than 16 characters cannot be an IPv4 address,
    # so the second entry is used instead.
    if [[ ${#ADDR[0]} -gt 16 ]]; then
        head_node_ip=${ADDR[1]}
    else
        head_node_ip=${ADDR[0]}
    fi
    echo "IPV6 address detected. We split the IPV4 address as $head_node_ip"
fi
# __doc_head_address_end__

# __doc_head_ray_start__
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"

echo "Starting HEAD at $head_node"
# SLURM_GPUS_PER_TASK is only set when GPUs were requested for the job,
# so fall back to 0 instead of passing an empty value to --num-gpus.
srun --nodes=1 --ntasks=1 -w "$head_node" \
    ray start --head --node-ip-address="$head_node_ip" --port=$port \
    --num-cpus "${SLURM_CPUS_PER_TASK}" --num-gpus "${SLURM_GPUS_PER_TASK:-0}" --block &
# __doc_head_ray_end__

# __doc_worker_ray_start__
# optional, though may be useful in certain versions of Ray < 1.0.
sleep 10

# number of nodes other than the head node
worker_num=$((SLURM_JOB_NUM_NODES - 1))

for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    echo "Starting WORKER $i at $node_i"
    srun --nodes=1 --ntasks=1 -w "$node_i" \
        ray start --address "$ip_head" \
        --num-cpus "${SLURM_CPUS_PER_TASK}" --num-gpus "${SLURM_GPUS_PER_TASK:-0}" --block &
    sleep 5
done
# __doc_worker_ray_end__

# __doc_script_start__
# ray/doc/source/cluster/examples/simple-trainer.py
# The trainer reads the number of tasks to launch from its first argument;
# pass the total number of CPUs allocated across all nodes.
python -u simple-trainer.py "$((SLURM_JOB_NUM_NODES * SLURM_CPUS_PER_TASK))"
+103
View File
@@ -0,0 +1,103 @@
# slurm-launch.py
# Usage:
# python slurm-launch.py --exp-name test \
# --command "rllib train --run PPO --env CartPole-v0"
import argparse
import subprocess
import sys
import time
from pathlib import Path
# The sbatch template lives next to this script. `Path(__file__)` is the
# script file itself, so the template must be resolved against its parent
# directory (joining onto the file path yields a path that cannot exist).
template_file = Path(__file__).parent / "slurm-template.sh"

# Placeholder tokens that appear verbatim in the template and are substituted
# with user-provided values by plain text replacement.
JOB_NAME = "${JOB_NAME}"
NUM_NODES = "${NUM_NODES}"
NUM_GPUS_PER_NODE = "${NUM_GPUS_PER_NODE}"
PARTITION_OPTION = "${PARTITION_OPTION}"
COMMAND_PLACEHOLDER = "${COMMAND_PLACEHOLDER}"
GIVEN_NODE = "${GIVEN_NODE}"
LOAD_ENV = "${LOAD_ENV}"
if __name__ == "__main__":
    # Entry point: fill the sbatch template with the command-line options,
    # write the result to "<exp-name>_<date>-<time>.sh" in the current
    # directory and submit it with `sbatch`.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--exp-name",
        type=str,
        required=True,
        help="The job name and path to logging file (exp_name.log).")
    parser.add_argument(
        "--num-nodes",
        "-n",
        type=int,
        default=1,
        help="Number of nodes to use.")
    parser.add_argument(
        "--node",
        "-w",
        type=str,
        help="The specified nodes to use. Same format as the "
        "return of 'sinfo'. Default: ''.")
    parser.add_argument(
        "--num-gpus",
        type=int,
        default=0,
        help="Number of GPUs to use in each node. (Default: 0)")
    parser.add_argument(
        "--partition",
        "-p",
        type=str,
        help="The partition to use. Default: '', which leaves the choice "
        "to the user's default partition.")
    parser.add_argument(
        "--load-env",
        type=str,
        # Default to an empty string: a missing option must not end up as
        # the literal text "None" (i.e. a bogus command) in the script.
        default="",
        help="The script to load your environment ('module load cuda/10.1')")
    parser.add_argument(
        "--command",
        type=str,
        required=True,
        help="The command you wish to execute. For example: "
        " --command 'python test.py'. "
        "Note that the command must be a string.")
    args = parser.parse_args()

    # Optional "#SBATCH -w" directive restricting the job to given nodes.
    if args.node:
        node_info = "#SBATCH -w {}".format(args.node)
    else:
        node_info = ""

    # The timestamp suffix keeps script and log names distinct across
    # submissions of the same experiment (at minute granularity).
    job_name = "{}_{}".format(args.exp_name,
                              time.strftime("%m%d-%H%M", time.localtime()))

    partition_option = "#SBATCH --partition={}".format(
        args.partition) if args.partition else ""

    # ===== Modified the template script =====
    with open(template_file, "r") as f:
        text = f.read()
    text = text.replace(JOB_NAME, job_name)
    text = text.replace(NUM_NODES, str(args.num_nodes))
    text = text.replace(NUM_GPUS_PER_NODE, str(args.num_gpus))
    text = text.replace(PARTITION_OPTION, partition_option)
    text = text.replace(COMMAND_PLACEHOLDER, str(args.command))
    text = text.replace(LOAD_ENV, args.load_env or "")
    text = text.replace(GIVEN_NODE, node_info)
    text = text.replace(
        "# THIS FILE IS A TEMPLATE AND IT SHOULD NOT BE DEPLOYED TO "
        "PRODUCTION!",
        "# THIS FILE IS MODIFIED AUTOMATICALLY FROM TEMPLATE AND SHOULD BE "
        "RUNNABLE!")

    # ===== Save the script =====
    script_file = "{}.sh".format(job_name)
    with open(script_file, "w") as f:
        f.write(text)

    # ===== Submit the job =====
    print("Starting to submit job!")
    # Wait for sbatch to return (it only enqueues the job) so that a
    # rejected submission is reported instead of being announced as success.
    submission = subprocess.run(["sbatch", script_file])
    if submission.returncode != 0:
        print(
            "sbatch exited with code {}. Script file is at: <{}>".format(
                submission.returncode, script_file),
            file=sys.stderr)
        sys.exit(submission.returncode)
    print(
        "Job submitted! Script file is at: <{}>. Log file is at: <{}>".format(
            script_file, "{}.log".format(job_name)))
    sys.exit(0)
@@ -0,0 +1,8 @@
:orphan:
.. _slurm-launch:
slurm-launch.py
~~~~~~~~~~~~~~~
.. literalinclude:: /cluster/examples/slurm-launch.py
@@ -0,0 +1,9 @@
:orphan:
.. _slurm-template:
slurm-template.sh
~~~~~~~~~~~~~~~~~
.. literalinclude:: /cluster/examples/slurm-template.sh
:language: bash
@@ -0,0 +1,64 @@
#!/bin/bash
# shellcheck disable=SC2206
# THIS FILE IS GENERATED BY AUTOMATION SCRIPT! PLEASE REFER TO ORIGINAL SCRIPT!
# THIS FILE IS A TEMPLATE AND IT SHOULD NOT BE DEPLOYED TO PRODUCTION!
#
# Purpose: sbatch script that starts a Ray head on the first allocated node,
# a Ray worker on every other node, and then runs the user command.
# The dollar-brace placeholders (PARTITION_OPTION, JOB_NAME, GIVEN_NODE,
# NUM_NODES, NUM_GPUS_PER_NODE, LOAD_ENV, COMMAND_PLACEHOLDER) are filled in
# by slurm-launch.py through plain text replacement. The banner line above is
# matched verbatim by that script, so do not reword it.
${PARTITION_OPTION}
#SBATCH --job-name=${JOB_NAME}
#SBATCH --output=${JOB_NAME}.log
${GIVEN_NODE}
### This script works for any number of nodes, Ray will find and manage all resources
#SBATCH --nodes=${NUM_NODES}
#SBATCH --exclusive
### Give all resources to a single Ray task, ray can manage the resources internally
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-task=${NUM_GPUS_PER_NODE}
# Load modules or your own conda environment here
# module load pytorch/v1.4.0-gpu
# conda activate ${CONDA_ENV}
${LOAD_ENV}
# ===== DO NOT CHANGE THINGS HERE UNLESS YOU KNOW WHAT YOU ARE DOING =====
# This script is a modification to the implementation suggested by gregSchwartz18 here:
# https://github.com/ray-project/ray/issues/826#issuecomment-522116599
# A fresh random password per job; exported so that the processes started
# below (and the user command) can read it from the environment.
redis_password=$(uuidgen)
export redis_password
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") # Getting the node names
nodes_array=($nodes)
# The first node of the allocation hosts the Ray head.
node_1=${nodes_array[0]}
ip=$(srun --nodes=1 --ntasks=1 -w "$node_1" hostname --ip-address) # making redis-address
# if we detect a space character in the head node IP, we'll
# convert it to an ipv4 address. This step is optional.
if [[ "$ip" == *" "* ]]; then
IFS=' ' read -ra ADDR <<< "$ip"
# A first entry longer than 16 characters cannot be an IPv4 address,
# so the second entry is used instead.
if [[ ${#ADDR[0]} -gt 16 ]]; then
ip=${ADDR[1]}
else
ip=${ADDR[0]}
fi
echo "IPV6 address detected. We split the IPV4 address as $ip"
fi
port=6379
# Address of the head node in host:port form; exported for later use by
# the command run at the end of this script.
ip_head=$ip:$port
export ip_head
echo "IP Head: $ip_head"
echo "STARTING HEAD at $node_1"
# The head runs in the background (--block keeps the srun step alive).
srun --nodes=1 --ntasks=1 -w "$node_1" \
ray start --head --node-ip-address="$ip" --port=$port --redis-password="$redis_password" --block &
# Presumably gives the head time to come up before workers connect.
sleep 30
worker_num=$((SLURM_JOB_NUM_NODES - 1)) #number of nodes other than the head node
# Index 0 is the head node, so workers use entries 1..worker_num.
for ((i = 1; i <= worker_num; i++)); do
node_i=${nodes_array[$i]}
echo "STARTING WORKER $i at $node_i"
srun --nodes=1 --ntasks=1 -w "$node_i" ray start --address "$ip_head" --redis-password="$redis_password" --block &
sleep 5
done
# ===== Call your code below =====
${COMMAND_PLACEHOLDER}
+184 -74
View File
@@ -3,7 +3,188 @@
Deploying on Slurm
==================
Clusters managed by Slurm may require that Ray is initialized as a part of the submitted job. This can be done by using ``srun`` within the submitted script.
Slurm usage with Ray can be a little bit unintuitive.
* SLURM requires that multiple copies of the same program be submitted to the same cluster to do cluster programming. This is particularly well-suited for MPI-based workloads.
* Ray, on the other hand, expects a head-worker architecture with a single point of entry. That is, you'll need to start a Ray head node, multiple Ray worker nodes, and run your Ray script on the head node.
This document aims to clarify how to run Ray on SLURM.
.. contents::
:local:
Walkthrough using Ray with SLURM
--------------------------------
Many SLURM deployments require you to interact with slurm via ``sbatch``, which executes a batch script on SLURM.
To run a Ray job with ``sbatch``, you will want to start a Ray cluster in the sbatch job with multiple ``srun`` commands (tasks), and then execute your python script that uses Ray. Each task will run on a separate node and start/connect to a Ray runtime.
The below walkthrough will do the following:
1. Set the proper headers for the ``sbatch`` script.
2. Load the proper environment/modules.
3. Fetch a list of available computing nodes and their IP addresses.
4. Launch a head ray process in one of the nodes (called the head node).
5. Launch Ray processes in (n-1) worker nodes and connect them to the head node by providing the head node address.
6. After the underlying ray cluster is ready, submit the user specified task.
See :ref:`slurm-basic.sh <slurm-basic>` for an end-to-end example.
.. _ray-slurm-headers:
sbatch directives
~~~~~~~~~~~~~~~~~
In your sbatch script, you'll want to add `directives to provide context <https://slurm.schedmd.com/sbatch.html>`__ for your job to SLURM.
.. code-block:: bash
#!/bin/bash
#SBATCH --job-name=my-workload
You'll need to tell SLURM to allocate nodes specifically for Ray. Ray will then find and manage all resources on each node.
.. code-block:: bash
### Modify this according to your Ray workload.
#SBATCH --nodes=4
#SBATCH --exclusive
Important: To ensure that each Ray worker runtime will run on a separate node, set ``tasks-per-node``.
.. code-block:: bash
#SBATCH --tasks-per-node=1
Since we've set ``tasks-per-node=1``, this will be used to guarantee that each Ray worker runtime will obtain the
proper resources. In this example, we ask for at least 5 CPUs and 5 GB of memory per node.
.. code-block:: bash
### Modify this according to your Ray workload.
#SBATCH --cpus-per-task=5
#SBATCH --mem-per-cpu=1GB
### Similarly, you can also specify the number of GPUs per node.
### Modify this according to your Ray workload. Sometimes this
### should be 'gres' instead.
#SBATCH --gpus-per-task=1
You can also add other optional flags to your sbatch directives.
Loading your environment
~~~~~~~~~~~~~~~~~~~~~~~~
First, you'll often want to load modules or your own conda environment at the beginning of the script.
Note that this is an optional step, but it is often required for enabling the right set of dependencies.
.. code-block:: bash
# Example: module load pytorch/v1.4.0-gpu
# Example: conda activate my-env
conda activate my-env
Obtain the head IP address
~~~~~~~~~~~~~~~~~~~~~~~~~~
Next, we'll want to obtain a hostname and a node IP address for the head node. This way, when we start worker nodes, we'll be able to properly connect to the right head node.
.. literalinclude:: /cluster/examples/slurm-basic.sh
:language: bash
:start-after: __doc_head_address_start__
:end-before: __doc_head_address_end__
Starting the Ray head node
~~~~~~~~~~~~~~~~~~~~~~~~~~
After detecting the head node hostname and head node IP, we'll want to create
a Ray head node runtime. We'll do this by using ``srun`` as a background task
as a single task/node (recall that ``tasks-per-node=1``).
Below, you'll see that we explicitly specify the number of CPUs (``num-cpus``)
and number of GPUs (``num-gpus``) to Ray, as this will prevent Ray from using
more resources than allocated. We also need to explicitly
indicate the ``node-ip-address`` for the Ray head runtime:
.. literalinclude:: /cluster/examples/slurm-basic.sh
:language: bash
:start-after: __doc_head_ray_start__
:end-before: __doc_head_ray_end__
By backgrounding the above srun task, we can proceed to start the Ray worker runtimes.
Starting the Ray worker nodes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Below, we do the same thing, but for each worker. Make sure the Ray head and Ray worker processes are not started on the same node.
.. literalinclude:: /cluster/examples/slurm-basic.sh
:language: bash
:start-after: __doc_worker_ray_start__
:end-before: __doc_worker_ray_end__
Submitting your script
~~~~~~~~~~~~~~~~~~~~~~
Finally, you can invoke your Python script:
.. literalinclude:: /cluster/examples/slurm-basic.sh
:language: bash
:start-after: __doc_script_start__
Python-interface SLURM scripts
------------------------------
[Contributed by @pengzhenghao] Below, we provide a helper utility (:ref:`slurm-launch.py <slurm-launch>`) to auto-generate SLURM scripts and launch.
``slurm-launch.py`` uses an underlying template (:ref:`slurm-template.sh <slurm-template>`) and fills out placeholders given user input.
You can feel free to copy both files into your cluster for use. Feel free to also open any PRs for contributions to improve this script!
Usage example
~~~~~~~~~~~~~
If you want to utilize a multi-node cluster in slurm:
.. code-block:: bash
python slurm-launch.py --exp-name test --command "python your_file.py" --num-nodes 3
If you want to specify the computing node(s), just use the same node name(s) in the same format of the output of ``sinfo`` command:
.. code-block:: bash
python slurm-launch.py --exp-name test --command "python your_file.py" --num-nodes 3 --node NODE_NAMES
There are other options you can use when calling ``python slurm-launch.py``:
* ``--exp-name``: The experiment name. Will generate ``{exp-name}_{date}-{time}.sh`` and ``{exp-name}_{date}-{time}.log``.
* ``--command``: The command you wish to run. For example: ``rllib train XXX`` or ``python XXX.py``.
* ``--num-gpus``: The number of GPUs you wish to use in each computing node. Default: 0.
* ``--node`` (``-w``): The specific nodes you wish to use, in the same form as the output of ``sinfo``. Nodes are automatically assigned if not specified.
* ``--num-nodes`` (``-n``): The number of nodes you wish to use. Default: 1.
* ``--partition`` (``-p``): The partition you wish to use. Default: "", will use user's default partition.
* ``--load-env``: The command to setup your environment. For example: ``module load cuda/10.1``. Default: "".
Note that the :ref:`slurm-template.sh <slurm-template>` is compatible with both IPv4 and IPv6 addresses of the computing nodes.
Implementation
~~~~~~~~~~~~~~
Concretely, the (:ref:`slurm-launch.py <slurm-launch>`) does the following things:
1. It automatically writes your requirements, e.g. number of CPUs, GPUs per node, the number of nodes and so on, to an sbatch script named ``{exp-name}_{date}-{time}.sh``. Your command (``--command``) to launch your own job is also written into the sbatch script.
2. Then it will submit the sbatch script to slurm manager via a new process.
3. Finally, the python process will terminate itself and leave a log file named ``{exp-name}_{date}-{time}.log`` to record the progress of your submitted command. In the meantime, the ray cluster and your job are running in the slurm cluster.
Examples and templates
----------------------
@@ -13,82 +194,11 @@ Here are some community-contributed templates for using SLURM with Ray:
- `Ray sbatch submission scripts`_ used at `NERSC <https://www.nersc.gov/>`_, a US national lab.
- `YASPI`_ (yet another slurm python interface) by @albanie. The goal of yaspi is to provide an interface to submitting slurm jobs, thereby obviating the joys of sbatch files. It does so through recipes - these are collections of templates and rules for generating sbatch scripts. Supports job submissions for Ray.
- `Template script`_ by @pengzhenghao
- `Convenient python interface`_ to launch ray cluster and submit task by @pengzhenghao
.. _`Ray sbatch submission scripts`: https://github.com/NERSC/slurm-ray-cluster
.. _`YASPI`: https://github.com/albanie/yaspi
.. _`Template script`: https://gist.github.com/pengzhenghao/b348db1075101a9b986c4cdfea13dcd6
.. _`Convenient python interface`: https://github.com/pengzhenghao/use-ray-with-slurm
Starter SLURM script
--------------------
.. code-block:: bash
#!/bin/bash
#SBATCH --job-name=test
#SBATCH --cpus-per-task=5
#SBATCH --mem-per-cpu=1GB
#SBATCH --nodes=4
#SBATCH --tasks-per-node=1
#SBATCH --time=00:30:00
#SBATCH --reservation=test

# One task hosts the Ray head; every remaining task hosts a Ray worker.
worker_num=$((${SLURM_NTASKS} - 1))

# CPU count handed to trainer.py: worker tasks times CPUs per task.
total_cores=$((${worker_num} * ${SLURM_CPUS_PER_TASK}))

# The head runs on the node executing this script, on the port below.
port='6379'
this_host=$(hostname)
ip_head=${this_host}:${port}
export ip_head # Exported for later access by trainer.py

# Start the Ray head on this node only (--nodes=1, --nodelist set to this
# host), as a single task with the CPUs allotted per task. The dashboard
# listens on 0.0.0.0 so that it is bound to all network interfaces, which
# makes it reachable through port forwarding. Example: with
# hostname=cluster-node-500, view the dashboard on localhost:8265 via an
# ssh tunnel (assuming the firewall allows it):
# $ ssh -N -f -L 8265:cluster-node-500:8265 user@big-cluster
srun --nodes=1 --ntasks=1 --cpus-per-task=${SLURM_CPUS_PER_TASK} --nodelist=${this_host} ray start --head --block --dashboard-host 0.0.0.0 --port=${port} --num-cpus ${SLURM_CPUS_PER_TASK} &
sleep 5
# The head must be up before any worker starts, otherwise the worker cannot
# connect to redis. If startup takes longer, increase the sleep above.

# Start the workers on every allocated node except this one
# (--nodes=${worker_num} with this host excluded): one task per node, so
# worker_num tasks in total, each with --cpus-per-task CPUs.
srun --nodes=${worker_num} --ntasks=${worker_num} --cpus-per-task=${SLURM_CPUS_PER_TASK} --exclude=${this_host} ray start --address $ip_head --block --num-cpus ${SLURM_CPUS_PER_TASK} &
sleep 5

# Pass the computed CPU count to the trainer.
python -u trainer.py ${total_cores}
.. code-block:: python
# trainer.py
import os
import sys
import time
from collections import Counter

import ray

# Number of tasks launched per round, given as the first CLI argument.
num_cpus = int(sys.argv[1])

# Connect to the running cluster whose address is exported as ip_head.
ray.init(address=os.environ["ip_head"])

print("Nodes in the Ray cluster:")
print(ray.nodes())


@ray.remote
def f():
    """Sleep for one second, then return the IP reported for this node."""
    time.sleep(1)
    return ray.services.get_node_ip_address()


# Each round should take about one second, assuming that ray was able to
# access all of the allocated nodes.
ROUNDS = 60
for round_index in range(ROUNDS):
    started = time.time()
    pending = []
    for _ in range(num_cpus):
        pending.append(f.remote())
    ip_addresses = ray.get(pending)
    print(Counter(ip_addresses))
    finished = time.time()
    print(finished - started)