diff --git a/doc/source/cluster/examples/simple-trainer.py b/doc/source/cluster/examples/simple-trainer.py new file mode 100644 index 000000000..691d13443 --- /dev/null +++ b/doc/source/cluster/examples/simple-trainer.py @@ -0,0 +1,29 @@ +# trainer.py +from collections import Counter +import os +import sys +import time +import ray + +num_cpus = int(sys.argv[1]) + +ray.init(address=os.environ["ip_head"]) + +print("Nodes in the Ray cluster:") +print(ray.nodes()) + + +@ray.remote +def f(): + time.sleep(1) + return ray.services.get_node_ip_address() + + +# The following takes one second (assuming that +# ray was able to access all of the allocated nodes). +for i in range(60): + start = time.time() + ip_addresses = ray.get([f.remote() for _ in range(num_cpus)]) + print(Counter(ip_addresses)) + end = time.time() + print(end - start) diff --git a/doc/source/cluster/examples/slurm-basic.rst b/doc/source/cluster/examples/slurm-basic.rst new file mode 100644 index 000000000..6b11a8bfa --- /dev/null +++ b/doc/source/cluster/examples/slurm-basic.rst @@ -0,0 +1,8 @@ +:orphan: + +.. _slurm-basic: + +slurm-basic.sh +~~~~~~~~~~~~~~ + +.. literalinclude:: /cluster/examples/slurm-basic.sh diff --git a/doc/source/cluster/examples/slurm-basic.sh b/doc/source/cluster/examples/slurm-basic.sh new file mode 100644 index 000000000..9471aac9b --- /dev/null +++ b/doc/source/cluster/examples/slurm-basic.sh @@ -0,0 +1,65 @@ +#!/bin/bash +# shellcheck disable=SC2206 +#SBATCH --job-name=test +#SBATCH --cpus-per-task=5 +#SBATCH --mem-per-cpu=1GB +#SBATCH --nodes=4 +#SBATCH --tasks-per-node=1 +#SBATCH --time=00:30:00 + +set -x + +# __doc_head_address_start__ + +# Getting the node names +nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") +nodes_array=($nodes) + +head_node=${nodes_array[0]} +head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address) + +# if we detect a space character in the head node IP, we'll +# convert it to an ipv4 address. This step is optional. +if [[ "$head_node_ip" == *" "* ]]; then +IFS=' ' read -ra ADDR <<<"$head_node_ip" +if [[ ${#ADDR[0]} -gt 16 ]]; then + head_node_ip=${ADDR[1]} +else + head_node_ip=${ADDR[0]} +fi +echo "IPV6 address detected. We split the IPV4 address as $head_node_ip" +fi +# __doc_head_address_end__ + +# __doc_head_ray_start__ +port=6379 +ip_head=$head_node_ip:$port +export ip_head +echo "IP Head: $ip_head" + +echo "Starting HEAD at $head_node" +srun --nodes=1 --ntasks=1 -w "$head_node" \ + ray start --head --node-ip-address="$head_node_ip" --port=$port \ + --num-cpus "${SLURM_CPUS_PER_TASK}" --num-gpus "${SLURM_GPUS_PER_TASK}" --block & +# __doc_head_ray_end__ + +# __doc_worker_ray_start__ +# optional, though may be useful in certain versions of Ray < 1.0. +sleep 10 + +# number of nodes other than the head node +worker_num=$((SLURM_JOB_NUM_NODES - 1)) + +for ((i = 1; i <= worker_num; i++)); do + node_i=${nodes_array[$i]} + echo "Starting WORKER $i at $node_i" + srun --nodes=1 --ntasks=1 -w "$node_i" \ + ray start --address "$ip_head" \ + --num-cpus "${SLURM_CPUS_PER_TASK}" --num-gpus "${SLURM_GPUS_PER_TASK}" --block & + sleep 5 +done +# __doc_worker_ray_end__ + +# __doc_script_start__ +# ray/doc/source/cluster/examples/simple-trainer.py +python -u simple-trainer.py \ No newline at end of file diff --git a/doc/source/cluster/examples/slurm-launch.py b/doc/source/cluster/examples/slurm-launch.py new file mode 100644 index 000000000..26dd98a87 --- /dev/null +++ b/doc/source/cluster/examples/slurm-launch.py @@ -0,0 +1,103 @@ +# slurm-launch.py +# Usage: +# python slurm-launch.py --exp-name test \ +# --command "rllib train --run PPO --env CartPole-v0" + +import argparse +import subprocess +import sys +import time + +from pathlib import Path + +template_file = Path(__file__) / "slurm-template.sh" +JOB_NAME = "${JOB_NAME}" +NUM_NODES = "${NUM_NODES}" +NUM_GPUS_PER_NODE = "${NUM_GPUS_PER_NODE}" +PARTITION_OPTION = "${PARTITION_OPTION}" +COMMAND_PLACEHOLDER = "${COMMAND_PLACEHOLDER}" +GIVEN_NODE = "${GIVEN_NODE}" +LOAD_ENV = "${LOAD_ENV}" + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--exp-name", + type=str, + required=True, + help="The job name and path to logging file (exp_name.log).") + parser.add_argument( + "--num-nodes", + "-n", + type=int, + default=1, + help="Number of nodes to use.") + parser.add_argument( + "--node", + "-w", + type=str, + help="The specified nodes to use. Same format as the " + "return of 'sinfo'. Default: ''.") + parser.add_argument( + "--num-gpus", + type=int, + default=0, + help="Number of GPUs to use in each node. (Default: 0)") + parser.add_argument( + "--partition", + "-p", + type=str, + ) + parser.add_argument( + "--load-env", + type=str, + help="The script to load your environment ('module load cuda/10.1')") + parser.add_argument( + "--command", + type=str, + required=True, + help="The command you wish to execute. For example: " + " --command 'python test.py'. " + "Note that the command must be a string.") + args = parser.parse_args() + + if args.node: + # assert args.num_nodes == 1 + node_info = "#SBATCH -w {}".format(args.node) + else: + node_info = "" + + job_name = "{}_{}".format(args.exp_name, + time.strftime("%m%d-%H%M", time.localtime())) + + partition_option = "#SBATCH --partition={}".format( + args.partition) if args.partition else "" + + # ===== Modified the template script ===== + with open(template_file, "r") as f: + text = f.read() + text = text.replace(JOB_NAME, job_name) + text = text.replace(NUM_NODES, str(args.num_nodes)) + text = text.replace(NUM_GPUS_PER_NODE, str(args.num_gpus)) + text = text.replace(PARTITION_OPTION, partition_option) + text = text.replace(COMMAND_PLACEHOLDER, str(args.command)) + text = text.replace(LOAD_ENV, str(args.load_env)) + text = text.replace(GIVEN_NODE, node_info) + text = text.replace( + "# THIS FILE IS A TEMPLATE AND IT SHOULD NOT BE DEPLOYED TO " + "PRODUCTION!", + "# THIS FILE IS MODIFIED AUTOMATICALLY FROM TEMPLATE AND SHOULD BE " + "RUNNABLE!") + + # ===== Save the script ===== + script_file = "{}.sh".format(job_name) + with open(script_file, "w") as f: + f.write(text) + + # ===== Submit the job ===== + print("Starting to submit job!") + subprocess.Popen(["sbatch", script_file]) + print( + "Job submitted! Script file is at: <{}>. Log file is at: <{}>".format( + script_file, "{}.log".format(job_name))) + sys.exit(0) diff --git a/doc/source/cluster/examples/slurm-launch.rst b/doc/source/cluster/examples/slurm-launch.rst new file mode 100644 index 000000000..f727b4da3 --- /dev/null +++ b/doc/source/cluster/examples/slurm-launch.rst @@ -0,0 +1,8 @@ +:orphan: + +.. _slurm-launch: + +slurm-launch.py +~~~~~~~~~~~~~~~ + +.. literalinclude:: /cluster/examples/slurm-launch.py diff --git a/doc/source/cluster/examples/slurm-template.rst b/doc/source/cluster/examples/slurm-template.rst new file mode 100644 index 000000000..e037bab17 --- /dev/null +++ b/doc/source/cluster/examples/slurm-template.rst @@ -0,0 +1,9 @@ +:orphan: + +.. _slurm-template: + +slurm-template.sh +~~~~~~~~~~~~~~~~~ + +.. literalinclude:: /cluster/examples/slurm-template.sh + :language: bash diff --git a/doc/source/cluster/examples/slurm-template.sh b/doc/source/cluster/examples/slurm-template.sh new file mode 100644 index 000000000..91f9d88b7 --- /dev/null +++ b/doc/source/cluster/examples/slurm-template.sh @@ -0,0 +1,64 @@ +#!/bin/bash +# shellcheck disable=SC2206 +# THIS FILE IS GENERATED BY AUTOMATION SCRIPT! PLEASE REFER TO ORIGINAL SCRIPT! +# THIS FILE IS A TEMPLATE AND IT SHOULD NOT BE DEPLOYED TO PRODUCTION! +${PARTITION_OPTION} +#SBATCH --job-name=${JOB_NAME} +#SBATCH --output=${JOB_NAME}.log +${GIVEN_NODE} +### This script works for any number of nodes, Ray will find and manage all resources +#SBATCH --nodes=${NUM_NODES} +#SBATCH --exclusive +### Give all resources to a single Ray task, ray can manage the resources internally +#SBATCH --ntasks-per-node=1 +#SBATCH --gpus-per-task=${NUM_GPUS_PER_NODE} + +# Load modules or your own conda environment here +# module load pytorch/v1.4.0-gpu +# conda activate ${CONDA_ENV} +${LOAD_ENV} + +# ===== DO NOT CHANGE THINGS HERE UNLESS YOU KNOW WHAT YOU ARE DOING ===== +# This script is a modification to the implementation suggest by gregSchwartz18 here: +# https://github.com/ray-project/ray/issues/826#issuecomment-522116599 +redis_password=$(uuidgen) +export redis_password + +nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") # Getting the node names +nodes_array=($nodes) + +node_1=${nodes_array[0]} +ip=$(srun --nodes=1 --ntasks=1 -w "$node_1" hostname --ip-address) # making redis-address + +# if we detect a space character in the head node IP, we'll +# convert it to an ipv4 address. This step is optional. +if [[ "$ip" == *" "* ]]; then + IFS=' ' read -ra ADDR <<< "$ip" + if [[ ${#ADDR[0]} -gt 16 ]]; then + ip=${ADDR[1]} + else + ip=${ADDR[0]} + fi + echo "IPV6 address detected. We split the IPV4 address as $ip" +fi + +port=6379 +ip_head=$ip:$port +export ip_head +echo "IP Head: $ip_head" + +echo "STARTING HEAD at $node_1" +srun --nodes=1 --ntasks=1 -w "$node_1" \ + ray start --head --node-ip-address="$ip" --port=$port --redis-password="$redis_password" --block & +sleep 30 + +worker_num=$((SLURM_JOB_NUM_NODES - 1)) #number of nodes other than the head node +for ((i = 1; i <= worker_num; i++)); do + node_i=${nodes_array[$i]} + echo "STARTING WORKER $i at $node_i" + srun --nodes=1 --ntasks=1 -w "$node_i" ray start --address "$ip_head" --redis-password="$redis_password" --block & + sleep 5 +done + +# ===== Call your code below ===== +${COMMAND_PLACEHOLDER} diff --git a/doc/source/cluster/slurm.rst b/doc/source/cluster/slurm.rst index 08b299d5e..a9b6f6d99 100644 --- a/doc/source/cluster/slurm.rst +++ b/doc/source/cluster/slurm.rst @@ -3,7 +3,188 @@ Deploying on Slurm ================== -Clusters managed by Slurm may require that Ray is initialized as a part of the submitted job. This can be done by using ``srun`` within the submitted script. +Slurm usage with Ray can be a little bit unintuitive. + +* SLURM requires multiple copies of the same program are submitted multiple times to the same cluster to do cluster programming. This is particularly well-suited for MPI-based workloads. +* Ray, on the other hand, expects a head-worker architecture with a single point of entry. That is, you'll need to start a Ray head node, multiple Ray worker nodes, and run your Ray script on the head node. + +This document aims to clarify how to run Ray on SLURM. + +.. contents:: + :local: + + +Walkthrough using Ray with SLURM +-------------------------------- + +Many SLURM deployments require you to interact with slurm via ``sbatch``, which executes a batch script on SLURM. + +To run a Ray job with ``sbatch``, you will want to start a Ray cluster in the sbatch job with multiple ``srun`` commands (tasks), and then execute your python script that uses Ray. Each task will run on a separate node and start/connect to a Ray runtime. + +The below walkthrough will do the following: + +1. Set the proper headers for the ``sbatch`` script. +2. Load the proper environment/modules. +3. Fetch a list of available computing nodes and their IP addresses. +4. Launch a head ray process in one of the node (called the head node). +5. Launch Ray processes in (n-1) worker nodes and connects them to the head node by providing the head node address. +6. After the underlying ray cluster is ready, submit the user specified task. + +See :ref:`slurm-basic.sh ` for an end-to-end example. + +.. _ray-slurm-headers: + +sbatch directives +~~~~~~~~~~~~~~~~~ + +In your sbatch script, you'll want to add `directives to provide context `__ for your job to SLURM. + +.. code-block:: bash + + #!/bin/bash + #SBATCH --job-name=my-workload + +You'll need to tell SLURM to allocate nodes specifically for Ray. Ray will then find and manage all resources on each node. + +.. code-block:: bash + + ### Modify this according to your Ray workload. + #SBATCH --nodes=4 + #SBATCH --exclusive + +Important: To ensure that each Ray worker runtime will run on a separate node, set ``tasks-per-node``. + +.. code-block:: bash + + #SBATCH --tasks-per-node=1 + +Since we've set `tasks-per-node = 1`, this will be used to guarantee that each Ray worker runtime will obtain the +proper resources. In this example, we ask for at least 5 CPUs and 5 GB of memory per node. + +.. code-block:: bash + + ### Modify this according to your Ray workload. + #SBATCH --cpus-per-task=5 + #SBATCH --mem-per-cpu=1GB + ### Similarly, you can also specify the number of GPUs per node. + ### Modify this according to your Ray workload. Sometimes this + ### should be 'gres' instead. + #SBATCH --gpus-per-task=1 + + +You can also add other optional flags to your sbatch directives. + + +Loading your environment +~~~~~~~~~~~~~~~~~~~~~~~~ + +First, you'll often want to Load modules or your own conda environment at the beginning of the script. + +Note that this is an optional step, but it is often required for enabling the right set of dependencies. + +.. code-block:: bash + + # Example: module load pytorch/v1.4.0-gpu + # Example: conda activate my-env + + conda activate my-env + +Obtain the head IP address +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Next, we'll want to obtain a hostname and a node IP address for the head node. This way, when we start worker nodes, we'll be able to properly connect to the right head node. + +.. literalinclude:: /cluster/examples/slurm-basic.sh + :language: bash + :start-after: __doc_head_address_start__ + :end-before: __doc_head_address_end__ + + + +Starting the Ray head node +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +After detecting the head node hostname and head node IP, we'll want to create +a Ray head node runtime. We'll do this by using ``srun`` as a background task +as a single task/node (recall that ``tasks-per-node=1``). + +Below, you'll see that we explicitly specify the number of CPUs (``num-cpus``) +and number of GPUs (``num-gpus``) to Ray, as this will prevent Ray from using +more resources than allocated. We also need to explictly +indicate the ``node-ip-address`` for the Ray head runtime: + +.. literalinclude:: /cluster/examples/slurm-basic.sh + :language: bash + :start-after: __doc_head_ray_start__ + :end-before: __doc_head_ray_end__ + +By backgrounding the above srun task, we can proceed to start the Ray worker runtimes. + +Starting the Ray worker nodes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Below, we do the same thing, but for each worker. Make sure the Ray head and Ray worker processes are not started on the same node. + +.. literalinclude:: /cluster/examples/slurm-basic.sh + :language: bash + :start-after: __doc_worker_ray_start__ + :end-before: __doc_worker_ray_end__ + +Submitting your script +~~~~~~~~~~~~~~~~~~~~~~ + +Finally, you can invoke your Python script: + +.. literalinclude:: /cluster/examples/slurm-basic.sh + :language: bash + :start-after: __doc_script_start__ + + +Python-interface SLURM scripts +------------------------------ + +[Contributed by @pengzhenghao] Below, we provide a helper utility (:ref:`slurm-launch.py `) to auto-generate SLURM scripts and launch. +``slurm-launch.py`` uses an underlying template (:ref:`slurm-template.sh `) and fills out placeholders given user input. + +You can feel free to copy both files into your cluster for use. Feel free to also open any PRs for contributions to improve this script! + +Usage example +~~~~~~~~~~~~~ + +If you want to utilize a multi-node cluster in slurm: + +.. code-block:: bash + + python slurm-launch.py --exp-name test --command "python your_file.py" --num-nodes 3 + +If you want to specify the computing node(s), just use the same node name(s) in the same format of the output of ``sinfo`` command: + +.. code-block:: bash + + python slurm-launch.py --exp-name test --command "python your_file.py" --num-nodes 3 --node NODE_NAMES + + +There are other options you can use when calling ``python slurm-launch.py``: + +* ``--exp-name``: The experiment name. Will generate ``{exp-name}_{date}-{time}.sh`` and ``{exp-name}_{date}-{time}.log``. +* ``--command``: The command you wish to run. For example: ``rllib train XXX`` or ``python XXX.py``. +* ``--num-gpus``: The number of GPUs you wish to use in each computing node. Default: 0. +* ``--node`` (``-w``): The specific nodes you wish to use, in the same form as the output of ``sinfo``. Nodes are automatically assigned if not specified. +* ``--num-nodes`` (``-n``): The number of nodes you wish to use. Default: 1. +* ``--partition`` (``-p``): The partition you wish to use. Default: "", will use user's default partition. +* ``--load-env``: The command to setup your environment. For example: ``module load cuda/10.1``. Default: "". + +Note that the :ref:`slurm-template.sh ` is compatible with both IPV4 and IPV6 ip address of the computing nodes. + +Implementation +~~~~~~~~~~~~~~ + +Concretely, the (:ref:`slurm-launch.py `) does the following things: + +1. It automatically writes your requirements, e.g. number of CPUs, GPUs per node, the number of nodes and so on, to a sbatch script name ``{exp-name}_{date}-{time}.sh``. Your command (``--command``) to launch your own job is also written into the sbatch script. +2. Then it will submit the sbatch script to slurm manager via a new process. +3. Finally, the python process will terminate itself and leaves a log file named ``{exp-name}_{date}-{time}.log`` to record the progress of your submitted command. At the mean time, the ray cluster and your job is running in the slurm cluster. + Examples and templates ---------------------- @@ -13,82 +194,11 @@ Here are some community-contributed templates for using SLURM with Ray: - `Ray sbatch submission scripts`_ used at `NERSC `_, a US national lab. - `YASPI`_ (yet another slurm python interface) by @albanie. The goal of yaspi is to provide an interface to submitting slurm jobs, thereby obviating the joys of sbatch files. It does so through recipes - these are collections of templates and rules for generating sbatch scripts. Supports job submissions for Ray. -- `Template script`_ by @pengzhenghao +- `Convenient python interface`_ to launch ray cluster and submit task by @pengzhenghao .. _`Ray sbatch submission scripts`: https://github.com/NERSC/slurm-ray-cluster .. _`YASPI`: https://github.com/albanie/yaspi -.. _`Template script`: https://gist.github.com/pengzhenghao/b348db1075101a9b986c4cdfea13dcd6 +.. _`Convenient python interface`: https://github.com/pengzhenghao/use-ray-with-slurm - -Starter SLURM script --------------------- - -.. code-block:: bash - - #!/bin/bash - #SBATCH --job-name=test - #SBATCH --cpus-per-task=5 - #SBATCH --mem-per-cpu=1GB - #SBATCH --nodes=4 - #SBATCH --tasks-per-node=1 - #SBATCH --time=00:30:00 - #SBATCH --reservation=test - - let "worker_num=(${SLURM_NTASKS} - 1)" - - # Define the total number of CPU cores available to ray - let "total_cores=${worker_num} * ${SLURM_CPUS_PER_TASK}" - - suffix='6379' - ip_head=`hostname`:$suffix - export ip_head # Exporting for latter access by trainer.py - - # Start the ray head node on the node that executes this script by specifying --nodes=1 and --nodelist=`hostname` - # We are using 1 task on this node and 5 CPUs (Threads). Have the dashboard listen to 0.0.0.0 to bind it to all - # network interfaces. This allows to access the dashboard through port-forwarding: - # Let's say the hostname=cluster-node-500 To view the dashboard on localhost:8265, set up an ssh-tunnel like this: (assuming the firewall allows it) - # $ ssh -N -f -L 8265:cluster-node-500:8265 user@big-cluster - srun --nodes=1 --ntasks=1 --cpus-per-task=${SLURM_CPUS_PER_TASK} --nodelist=`hostname` ray start --head --block --dashboard-host 0.0.0.0 --port=6379 --num-cpus ${SLURM_CPUS_PER_TASK} & - sleep 5 - # Make sure the head successfully starts before any worker does, otherwise - # the worker will not be able to connect to redis. In case of longer delay, - # adjust the sleeptime above to ensure proper order. - - # Now we execute worker_num worker nodes on all nodes in the allocation except hostname by - # specifying --nodes=${worker_num} and --exclude=`hostname`. Use 1 task per node, so worker_num tasks in total - # (--ntasks=${worker_num}) and 5 CPUs per task (--cps-per-task=${SLURM_CPUS_PER_TASK}). - srun --nodes=${worker_num} --ntasks=${worker_num} --cpus-per-task=${SLURM_CPUS_PER_TASK} --exclude=`hostname` ray start --address $ip_head --block --num-cpus ${SLURM_CPUS_PER_TASK} & - sleep 5 - - python -u trainer.py ${total_cores} # Pass the total number of allocated CPUs - -.. code-block:: python - - # trainer.py - from collections import Counter - import os - import sys - import time - import ray - - num_cpus = int(sys.argv[1]) - - ray.init(address=os.environ["ip_head"]) - - print("Nodes in the Ray cluster:") - print(ray.nodes()) - - @ray.remote - def f(): - time.sleep(1) - return ray.services.get_node_ip_address() - - # The following takes one second (assuming that ray was able to access all of the allocated nodes). - for i in range(60): - start = time.time() - ip_addresses = ray.get([f.remote() for _ in range(num_cpus)]) - print(Counter(ip_addresses)) - end = time.time() - print(end - start)