mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 18:44:07 +08:00
allow cluster script to update worker code on nodes (#243)
This commit is contained in:
committed by
Philipp Moritz
parent
86aef1bc56
commit
8952ff8cf9
@@ -45,30 +45,38 @@ until the installation has completed. The standard output from the nodes will
|
||||
be redirected to your terminal.
|
||||
5. To check that the installation succeeded, you can ssh to each node, cd into
|
||||
the directory `ray/test/`, and run the tests (e.g., `python runtest.py`).
|
||||
6. Now that Ray has been installed, you can start the cluster (the scheduler,
|
||||
object stores, and workers) with the command
|
||||
`start_ray("/home/ubuntu/ray/scripts/default_worker.py")`, where the second
|
||||
argument is the path on each node in the cluster to the worker code that you
|
||||
would like to use. After completing successfully, this command will print out a
|
||||
command that can be run on the head node to attach a shell (the driver) to the
|
||||
cluster. For example,
|
||||
6. Create a directory (for example, `mkdir ~/example_ray_code`) containing the
|
||||
worker `worker.py` code along with the code for any modules imported by
|
||||
`worker.py`. For example,
|
||||
|
||||
```
|
||||
cp ray/scripts/default_worker.py ~/example_ray_code/worker.py
|
||||
cp ray/scripts/example_functions.py ~/example_ray_code/
|
||||
```
|
||||
|
||||
7. Start the cluster (the scheduler, object stores, and workers) with the
|
||||
command `start_ray("~/example_ray_code")`, where the second argument is the
|
||||
local path to the worker code that you would like to use. This command will copy
|
||||
the worker code to each node and will start the cluster. After completing
|
||||
successfully, this command will print out a command that can be run on the head
|
||||
node to attach a shell (the driver) to the cluster. For example,
|
||||
|
||||
```
|
||||
source "$RAY_HOME/setup-env.sh";
|
||||
python "$RAY_HOME/scripts/shell.py" --scheduler-address=52.50.28.103:10001 --objstore-address=52.50.28.103:20001 --worker-address=52.50.28.103:30001 --attach
|
||||
python "$RAY_HOME/scripts/shell.py" --scheduler-address=12.34.56.789:10001 --objstore-address=12.34.56.789:20001 --worker-address=12.34.56.789:30001 --attach
|
||||
```
|
||||
|
||||
7. Note that there are several more commands that can be run from within
|
||||
8. Note that there are several more commands that can be run from within
|
||||
`cluster.py`.
|
||||
|
||||
- `install_ray()` - This pulls the Ray source code on each node, builds all
|
||||
of the third party libraries, and builds the project itself.
|
||||
- `start_ray(worker_path, num_workers_per_node=10)` - This starts a
|
||||
- `start_ray(worker_directory, num_workers_per_node=10)` - This starts a
|
||||
scheduler process on the head node, and it starts an object store and some
|
||||
workers on each node.
|
||||
- `stop_ray()` - This shuts down the cluster (killing all of the processes).
|
||||
- `restart_workers(worker_path, num_workers_per_node=10)` - This kills the
|
||||
current workers and starts new workers using the worker code from the
|
||||
- `restart_workers(worker_directory, num_workers_per_node=10)` - This kills
|
||||
the current workers and starts new workers using the worker code from the
|
||||
given file. Currently, this can only run when there are no tasks currently
|
||||
executing on any of the workers.
|
||||
- `update_ray()` - This pulls the latest Ray source code and builds it.
|
||||
|
||||
@@ -145,8 +145,6 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
|
||||
time.sleep(0.2)
|
||||
for _ in range(num_workers):
|
||||
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False)
|
||||
time.sleep(0.3)
|
||||
ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), is_driver=True)
|
||||
time.sleep(0.5)
|
||||
|
||||
def start_workers(scheduler_address, objstore_address, num_workers, worker_path):
|
||||
|
||||
+59
-12
@@ -64,7 +64,7 @@ def _install_ray(node_ip_addresses, username, key_file, installation_directory):
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory):
|
||||
def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_directory, installation_directory):
|
||||
"""
|
||||
This method is used to start Ray on a cluster. It will ssh to the head node,
|
||||
that is, the first node in the list node_ip_addresses, and it will start
|
||||
@@ -74,9 +74,12 @@ def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, work
|
||||
:param node_ip_addresses: ip addresses of the nodes on which to install Ray
|
||||
:param username: the username used to ssh to the cluster
|
||||
:param key_file: the key used to ssh to the cluster
|
||||
:param worker_path: path of the source code to have the workers run
|
||||
:param worker_directory: local directory containing the worker source code
|
||||
:param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/"
|
||||
"""
|
||||
# First update the worker code on the nodes.
|
||||
remote_worker_path = _update_worker_code(node_ip_addresses, worker_directory, installation_directory)
|
||||
|
||||
scripts_directory = os.path.join(installation_directory, "ray/scripts")
|
||||
# Start the scheduler
|
||||
# The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
|
||||
@@ -94,7 +97,7 @@ def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, work
|
||||
cd "{}";
|
||||
source ../setup-env.sh;
|
||||
python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
|
||||
""".format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, worker_path)
|
||||
""".format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, remote_worker_path)
|
||||
run_command_over_ssh(node_ip_address, username, key_file, start_workers_command)
|
||||
|
||||
print "cluster started; you can start the shell on the head node with:"
|
||||
@@ -105,7 +108,7 @@ def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, work
|
||||
python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach
|
||||
""".format(setup_env_path, shell_script_path, node_ip_addresses[0], node_ip_addresses[0], node_ip_addresses[0])
|
||||
|
||||
def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory):
|
||||
def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_directory, installation_directory):
|
||||
"""
|
||||
This method is used for restarting the workers in the cluster, for example, to
|
||||
use new application code. This is done without shutting down the scheduler
|
||||
@@ -115,9 +118,12 @@ def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node
|
||||
:param node_ip_addresses: ip addresses of the nodes on which to restart the workers
|
||||
:param username: the username used to ssh to the cluster
|
||||
:param key_file: the key used to ssh to the cluster
|
||||
:param worker_path: path of the source code to have the workers run
|
||||
:param worker_directory: local directory containing the worker source code
|
||||
:param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/"
|
||||
"""
|
||||
# First update the worker code on the nodes.
|
||||
remote_worker_path = _update_worker_code(node_ip_addresses, worker_directory, installation_directory)
|
||||
|
||||
scripts_directory = os.path.join(installation_directory, "ray/scripts")
|
||||
head_node_ip_address = node_ip_addresses[0]
|
||||
scheduler_address = "{}:10001".format(head_node_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray.
|
||||
@@ -140,7 +146,7 @@ def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node
|
||||
cd "{}";
|
||||
source ../setup-env.sh;
|
||||
python -c "import ray; ray.services.start_workers(\\\"{}:10001\\\", \\\"{}:20001\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
|
||||
""".format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, worker_path)
|
||||
""".format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, remote_worker_path)
|
||||
run_command_over_ssh(node_ip_address, username, key_file, start_workers_command)
|
||||
|
||||
def _stop_ray(node_ip_addresses, username, key_file):
|
||||
@@ -179,6 +185,47 @@ def _update_ray(node_ip_addresses, username, key_file, installation_directory):
|
||||
for node_ip_address in node_ip_addresses:
|
||||
run_command_over_ssh(node_ip_address, username, key_file, update_cluster_command)
|
||||
|
||||
def _update_worker_code(node_ip_addresses, worker_directory, installation_directory):
|
||||
"""
|
||||
This method is used to sync update the worker source code on each node in the
|
||||
cluster. The worker_directory will be copied under installation_directory.
|
||||
For example, we call _update_worker_code(node_ip_addresses, "~/a/b/c",
|
||||
"/d/e/f"), then the contents of ~/a/b/c on the local machine will be copied
|
||||
to /d/e/f/ray_worker_files/c on each node in the cluster.
|
||||
|
||||
:param node_ip_addresses: ip addresses of the nodes on which to restart the
|
||||
workers
|
||||
:param worker_directory: The path on the local machine to the directory that
|
||||
contains the worker code. This directory must contain a file worker.py.
|
||||
:param installation_directory: Directory in which ray is installed, for
|
||||
example "/home/ubuntu/".
|
||||
|
||||
:rtype: A string with the path to the source code of the worker on the remote
|
||||
nodes.
|
||||
"""
|
||||
worker_directory = os.path.expanduser(worker_directory)
|
||||
if not os.path.isdir(worker_directory):
|
||||
raise Exception("Directory {} does not exist.".format(worker_directory))
|
||||
if not os.path.exists(os.path.join(worker_directory, "worker.py")):
|
||||
raise Exception("Directory {} does not contain a file named worker.py.".format(worker_directory))
|
||||
# If worker_directory is "/a/b/c", then local_directory_name is "c".
|
||||
local_directory_name = os.path.split(os.path.realpath(worker_directory))[1]
|
||||
remote_directory = os.path.join(installation_directory, "ray_worker_files", local_directory_name)
|
||||
for node_ip_address in node_ip_addresses:
|
||||
# Remove and recreate the directory on the node.
|
||||
recreate_directory_command = """
|
||||
rm -r "{}";
|
||||
mkdir -p "{}"
|
||||
""".format(remote_directory, remote_directory)
|
||||
run_command_over_ssh(node_ip_address, username, key_file, recreate_directory_command)
|
||||
# Copy the files from the local machine to the node.
|
||||
copy_command = """
|
||||
scp -r -i {} {}/* {}@{}:{}/
|
||||
""".format(key_file, worker_directory, username, node_ip_address, remote_directory)
|
||||
subprocess.call([copy_command], shell=True)
|
||||
remote_worker_path = os.path.join(remote_directory, "worker.py")
|
||||
return remote_worker_path
|
||||
|
||||
def is_valid_ip(ip_address):
|
||||
"""
|
||||
This method returns true if an address is a valid IPv4 address and returns
|
||||
@@ -224,20 +271,20 @@ if __name__ == "__main__":
|
||||
if check_ip_addresses(node_ip_addresses):
|
||||
_install_ray(node_ip_addresses, username, key_file, installation_directory)
|
||||
|
||||
def start_ray(worker_path, num_workers_per_node=10, node_ip_addresses=node_ip_addresses):
|
||||
def start_ray(worker_directory, num_workers_per_node=10, node_ip_addresses=node_ip_addresses):
|
||||
"""
|
||||
This method is used to start Ray on a cluster. It will ssh to the head node,
|
||||
that is, the first node in the list node_ip_addresses, and it will start
|
||||
the scheduler. Then it will ssh to each node and start an object store and
|
||||
some workers.
|
||||
|
||||
:param worker_path: path of the source code to have the workers run
|
||||
:param worker_directory: path of the source code to have the workers run
|
||||
:param node_ip_addresses: ip addresses of the nodes on which to install Ray
|
||||
"""
|
||||
if check_ip_addresses(node_ip_addresses):
|
||||
_start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory)
|
||||
_start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_directory, installation_directory)
|
||||
|
||||
def restart_workers(worker_path, num_workers_per_node=10, node_ip_addresses=node_ip_addresses):
|
||||
def restart_workers(worker_directory, num_workers_per_node=10, node_ip_addresses=node_ip_addresses):
|
||||
"""
|
||||
This method is used for restarting the workers in the cluster, for example, to
|
||||
use new application code. This is done without shutting down the scheduler
|
||||
@@ -245,11 +292,11 @@ if __name__ == "__main__":
|
||||
shut down any drivers.
|
||||
|
||||
:param node_ip_addresses: ip addresses of the nodes on which to restart the workers
|
||||
:param worker_path: path of the source code to have the workers run
|
||||
:param worker_directory: path of the source code to have the workers run
|
||||
:param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/"
|
||||
"""
|
||||
if check_ip_addresses(node_ip_addresses):
|
||||
_restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory)
|
||||
_restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_directory, installation_directory)
|
||||
|
||||
def stop_ray(node_ip_addresses=node_ip_addresses):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user