enable restarting workers in singlenode case, plus cleanups to cluster.py (#190)

This commit is contained in:
Robert Nishihara
2016-07-01 14:10:51 -07:00
committed by Philipp Moritz
parent f5316d50fc
commit 0ffe657e27
6 changed files with 234 additions and 50 deletions
+10 -5
View File
@@ -72,11 +72,16 @@ appropriate values. This assumes that you can connect to each IP address in
ssh -i key.pem ubuntu@<ip-address>
```
4. The previous command should open a Python interpreter. To install Ray on the
cluster, run `install_ray(node_addresses)` in the interpreter. The interpreter
should block until the installation has completed.
cluster, run `install_ray()` in the interpreter. The interpreter should block
until the installation has completed.
5. To check that the installation succeeded, you can ssh to each node, cd into
the directory `ray/test/`, and run the tests (e.g., `python runtest.py`).
6. Now that Ray has been installed, you can start the cluster (the scheduler,
object stores, and workers) with the command `start_ray(node_addresses,
"/home/ubuntu/ray/test/test_worker.py")`, where the second argument is the path
on each node in the cluster to the worker code that you would like to use.
object stores, and workers) with the command
`start_ray("/home/ubuntu/ray/scripts/default_worker.py")`, where the argument is
the path on each node in the cluster to the worker code that you would like to
use. The workers can be restarted with
`restart_workers("/home/ubuntu/ray/scripts/default_worker.py")`, for example if
you wish to update the application code running on the workers. The cluster
processes (the scheduler, the object stores, and the workers) can be stopped
with `stop_ray()`.
+1 -1
View File
@@ -8,6 +8,6 @@ PYTHON_MODE = 3
import libraylib as lib
import serialization
from worker import scheduler_info, visualize_computation_graph, task_info, register_module, connect, disconnect, get, put, remote, kill_workers
from worker import scheduler_info, visualize_computation_graph, task_info, register_module, connect, disconnect, get, put, remote, kill_workers, restart_workers_local
from libraylib import ObjRef
import internal
+20 -3
View File
@@ -5,7 +5,7 @@ import time
import datetime
import ray
import ray.worker as worker
import worker
from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP
_services_env = os.environ.copy()
@@ -94,7 +94,7 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
:param scheduler_address: ip address and port of the scheduler (which may run on a different node)
:param node_ip_address: ip address (without port) of the node this function is run on
:param num_workers: the number of workers to be started on this node
:worker_path: path of the source code that will be run on the worker
:param worker_path: path of the source code that will be run on the worker
"""
objstore_address = address(node_ip_address, new_objstore_port())
start_objstore(scheduler_address, objstore_address)
@@ -102,9 +102,26 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
for _ in range(num_workers):
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
time.sleep(0.3)
ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), is_driver=True)
time.sleep(0.5)
def start_workers(scheduler_address, objstore_address, num_workers, worker_path):
    """
    Launch a new batch of workers on this node.

    Neither the scheduler nor the object store is started here; both are
    expected to be running already. The typical flow is that a developer has
    changed the worker code, has killed the existing workers, and now calls this
    function to bring up replacements.

    :param scheduler_address: ip address and port of the scheduler (which may run on a different node)
    :param objstore_address: ip address and port of the object store (which runs on the same node)
    :param num_workers: the number of workers to be started on this node
    :param worker_path: path of the source code that will be run on the worker
    """
    # The workers are given addresses on the same ip as the object store, so the
    # node's ip is taken from the part of objstore_address before the port.
    host_ip = objstore_address.split(":")[0]
    for _ in range(num_workers):
        worker_address = address(host_ip, new_worker_port())
        start_worker(worker_path, scheduler_address, objstore_address, worker_address)
# driver_mode should equal ray.SCRIPT_MODE if this is being run in a script and
# ray.SHELL_MODE if it is being used interactively in a shell. It can also equal
# ray.PYTHON_MODE to run things in a manner equivalent to serial Python code.
+24 -4
View File
@@ -14,6 +14,7 @@ from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP
import serialization
import ray.internal.graph_pb2
import ray.graph
import services
class RayFailedObject(object):
"""If a task throws an exception during execution, a RayFailedObject is stored in the object store for each of the tasks outputs."""
@@ -196,15 +197,18 @@ def register_module(module, recursive=False, worker=global_worker):
# elif recursive and isinstance(val, ModuleType):
# register_module(val, recursive, worker)
def connect(scheduler_addr, objstore_addr, worker_addr, is_driver=False, worker=global_worker, mode=ray.WORKER_MODE):
def connect(scheduler_address, objstore_address, worker_address, is_driver=False, worker=global_worker, mode=ray.WORKER_MODE):
if hasattr(worker, "handle"):
del worker.handle
worker.handle = ray.lib.create_worker(scheduler_addr, objstore_addr, worker_addr, is_driver)
worker.scheduler_address = scheduler_address
worker.objstore_address = objstore_address
worker.worker_address = worker_address
worker.handle = ray.lib.create_worker(worker.scheduler_address, worker.objstore_address, worker.worker_address, is_driver)
worker.set_mode(mode)
FORMAT = "%(asctime)-15s %(message)s"
log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_addr))
log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_address))
logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=log_basename + ".log")
ray.lib.set_log_config(log_basename + "-c++.log")
worker.set_mode(mode)
def disconnect(worker=global_worker):
ray.lib.disconnect(worker.handle)
@@ -230,11 +234,27 @@ def put(value, worker=global_worker):
return objref
def kill_workers(worker=global_worker):
    """
    Kill every worker in the cluster while leaving drivers alone.

    :param worker: the worker object whose handle is passed to ray.lib.kill_workers
    :return: the success value returned by ray.lib.kill_workers
    """
    all_killed = ray.lib.kill_workers(worker.handle)
    if all_killed:
        return all_killed
    # Reached only when the kill request was reported as unsuccessful.
    print("Could not kill all workers; check that there are no tasks currently running.")
    return all_killed
def restart_workers_local(num_workers, worker_path, worker=global_worker):
    """
    This method kills all of the workers and starts new workers locally on the
    same node as the driver. This is intended for use in the case where Ray is
    being used on a single node.

    :param num_workers: the number of workers to be started
    :param worker_path: path of the source code that will be run on the worker
    :param worker: the connected worker object; its scheduler_address and
      objstore_address attributes are used to start the new workers
    :return: False if the existing workers could not be killed (in which case no
      new workers are started), True once the new workers have been requested
    """
    if not kill_workers(worker):
        return False
    services.start_workers(worker.scheduler_address, worker.objstore_address, num_workers, worker_path)
    # Return an explicit True so that the success case is distinguishable from the
    # failure case above; falling off the end would return None, which is falsy.
    return True
def main_loop(worker=global_worker):
if not ray.lib.connected(worker.handle):
raise Exception("Worker is attempting to enter main_loop but has not been connected yet.")
+179 -22
View File
@@ -18,11 +18,31 @@ parser.add_argument("--username", type=str, required=True, help="User name for l
parser.add_argument("--installation-directory", type=str, required=True, help="The directory in which to install Ray.")
def run_command_over_ssh(node_ip_address, username, key_file, command):
    """
    This method is used for connecting to a node with ssh and running a sequence
    of commands.

    :param node_ip_address: the ip address of the node to ssh to
    :param username: the username used to ssh to the cluster
    :param key_file: the key used to ssh to the cluster
    :param command: the command to run over ssh, currently this command is not allowed to have any single quotes
    :return: the exit status of the ssh process (0 means success)
    :raises Exception: if the command contains a single quote character
    """
    if "'" in command:
        raise Exception("Commands run over ssh must not contain the single quote character. This command does: {}".format(command))
    # Pass ssh an argument list instead of a locally interpreted shell string, so
    # that a key path or username containing spaces or shell metacharacters cannot
    # alter the command line. The command is still handed to ssh as one argument,
    # so the remote shell sees exactly the same text as before.
    # The local shell used to expand "~" and environment variables in the key
    # path; do that explicitly now that no local shell is involved.
    identity_file = os.path.expanduser(os.path.expandvars(key_file))
    destination = "{}@{}".format(username, node_ip_address)
    ssh_argv = ["ssh", "-o", "StrictHostKeyChecking=no", "-i", identity_file, destination, command]
    exit_status = subprocess.call(ssh_argv)
    if exit_status == 0:
        print("Finished running command '{}' on {}@{}.".format(command, username, node_ip_address))
    else:
        # Report failures instead of claiming the command finished.
        print("Command '{}' on {}@{} failed with exit status {}.".format(command, username, node_ip_address, exit_status))
    return exit_status
def install_ray_multi_node(node_ip_addresses, username, key_file, installation_directory):
def _install_ray(node_ip_addresses, username, key_file, installation_directory):
"""
This method is used to install Ray on a cluster. For each node in the cluster,
it will ssh to the node and run the build scripts.
:param node_ip_addresses: ip addresses of the nodes on which to install Ray
:param username: the username used to ssh to the cluster
:param key_file: the key used to ssh to the cluster
:param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/"
"""
def install_ray_over_ssh(node_ip_address, username, key_file, installation_directory):
install_ray_command = """
sudo apt-get update &&
@@ -43,21 +63,37 @@ def install_ray_multi_node(node_ip_addresses, username, key_file, installation_d
for t in threads:
t.join()
def start_ray_multi_node(node_ip_addresses, username, key_file, worker_path, installation_directory):
build_directory = os.path.join(installation_directory, "ray/build")
def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory):
"""
This method is used to start Ray on a cluster. It will ssh to the head node,
that is, the first node in the list node_ip_addresses, and it will start
the scheduler. Then it will ssh to each node and start an object store and
some workers.
:param node_ip_addresses: ip addresses of the nodes on which to start Ray; the first entry is used as the head node
:param username: the username used to ssh to the cluster
:param key_file: the key used to ssh to the cluster
:param num_workers_per_node: the number of workers to start on each node
:param worker_path: path of the source code to have the workers run
:param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/"
"""
scripts_directory = os.path.join(installation_directory, "ray/scripts")
# Start the scheduler
# The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
start_scheduler_command = """
cd "{}";
nohup ./scheduler {}:10001 > scheduler.out 2> scheduler.err < /dev/null &
""".format(build_directory, node_ip_addresses[0])
source ../setup-env.sh;
python -c "import ray; ray.services.start_scheduler(\\\"{}:10001\\\")" > start_scheduler.out 2> start_scheduler.err < /dev/null &
""".format(scripts_directory, node_ip_addresses[0])
run_command_over_ssh(node_ip_addresses[0], username, key_file, start_scheduler_command)
# Start the workers on each node
# The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
for i, node_ip_address in enumerate(node_ip_addresses):
scripts_directory = os.path.join(installation_directory, "ray/scripts")
start_workers_command = """
cd "{}";
source ../setup-env.sh;
python start_workers.py --scheduler-address={}:10001 --node-ip={} --worker-path="{}" > start_workers.out 2> start_workers.err < /dev/null &
""".format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], worker_path)
python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
""".format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, worker_path)
run_command_over_ssh(node_ip_address, username, key_file, start_workers_command)
print "cluster started; you can start the shell on the head node with:"
@@ -68,12 +104,69 @@ def start_ray_multi_node(node_ip_addresses, username, key_file, worker_path, ins
python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach
""".format(setup_env_path, shell_script_path, node_ip_addresses[0], node_ip_addresses[0], node_ip_addresses[0])
def stop_ray_multi_node(node_ip_addresses, username, key_file):
def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory):
    """
    Replace the workers running in the cluster, for example so that they pick up
    new application code. The scheduler, the object stores, and any drivers are
    not shut down by this method, so existing work is not thrown away.

    :param node_ip_addresses: ip addresses of the nodes on which to restart the workers; the first entry is treated as the head node
    :param username: the username used to ssh to the cluster
    :param key_file: the key used to ssh to the cluster
    :param num_workers_per_node: the number of workers to start on each node
    :param worker_path: path of the source code to have the workers run
    :param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/"
    """
    scripts_directory = os.path.join(installation_directory, "ray/scripts")
    head_ip = node_ip_addresses[0]
    # Address of the currently running scheduler, which was presumably created in _start_ray.
    scheduler_address = "{}:10001".format(head_ip)
    # Address of the currently running object store on the head node, which was presumably created in _start_ray.
    objstore_address = "{}:20001".format(head_ip)
    # This address must be currently unused. In particular, it cannot be the address of any currently running shell.
    shell_address = "{}:30000".format(head_ip)

    # Step 1: attach a temporary driver to the scheduler on the head node and have
    # it call ray.kill_workers().
    # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
    kill_workers_command = """
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.connect(\\\"{}\\\", \\\"{}\\\", \\\"{}\\\", is_driver=True); ray.kill_workers()"
""".format(scripts_directory, scheduler_address, objstore_address, shell_address)
    run_command_over_ssh(head_ip, username, key_file, kill_workers_command)

    # Step 2: on every node, start new workers against the head node's scheduler
    # and that node's own object store.
    # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
    start_workers_template = """
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.services.start_workers(\\\"{}:10001\\\", \\\"{}:20001\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
"""
    for node_ip_address in node_ip_addresses:
        start_workers_command = start_workers_template.format(scripts_directory, head_ip, node_ip_address, num_workers_per_node, worker_path)
        run_command_over_ssh(node_ip_address, username, key_file, start_workers_command)
def _stop_ray(node_ip_addresses, username, key_file):
    """
    Shut down a Ray cluster by sshing to each node in turn and running killall
    on the scheduler, object store, and Python processes.

    :param node_ip_addresses: ip addresses of the nodes on which to stop Ray
    :param username: the username used to ssh to the cluster
    :param key_file: the key used to ssh to the cluster
    """
    # Both stdout and stderr of killall are discarded on the remote side.
    killall_command = "killall scheduler objstore python > /dev/null 2> /dev/null"
    for target_ip in node_ip_addresses:
        run_command_over_ssh(target_ip, username, key_file, killall_command)
def update_ray_multi_node(node_ip_addresses, username, key_file, installation_directory):
def _update_ray(node_ip_addresses, username, key_file, installation_directory):
"""
This method is used for updating the Ray source code on a Ray cluster. It
will ssh to each node, will pull the latest source code from the Ray
repository, and will rerun the build script (though currently it will not
rebuild the third party libraries).
:param node_ip_addresses: ip addresses of the nodes on which to update Ray
:param username: the username used to ssh to the cluster
:param key_file: the key used to ssh to the cluster
:param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/"
"""
ray_directory = os.path.join(installation_directory, "ray")
update_cluster_command = """
cd "{}" &&
@@ -85,34 +178,98 @@ def update_ray_multi_node(node_ip_addresses, username, key_file, installation_di
for node_ip_address in node_ip_addresses:
run_command_over_ssh(node_ip_address, username, key_file, update_cluster_command)
# Returns true if address is a valid IPv4 address and false otherwise.
def is_valid_ip(ip_address):
    """
    This method returns true if an address is a valid IPv4 address in
    dotted-quad form ("a.b.c.d") and returns false otherwise.

    :param ip_address: the ip address to check
    :return: True if ip_address is a dotted-quad IPv4 address, False otherwise
    """
    try:
        if hasattr(socket, "inet_pton"):
            # inet_pton is strict: unlike inet_aton it rejects shorthand forms such
            # as "1" or "127.1", which would otherwise let a malformed line in the
            # nodes file pass validation.
            socket.inet_pton(socket.AF_INET, ip_address)
        else:
            # inet_pton is not available on every platform. Fall back to inet_aton,
            # but require three dots so that shorthand forms are still rejected.
            if ip_address.count(".") != 3:
                return False
            socket.inet_aton(ip_address)
    except socket.error:
        return False
    return True
def check_ip_addresses(node_ip_addresses):
    """
    Validate every entry of a list of addresses with is_valid_ip. An error
    message is printed for each entry that fails, and the whole list is always
    examined, so all bad entries are reported in one pass.

    :param node_ip_addresses: the list of ip addresses to check
    :return: True if every address is valid, False if at least one is not
    """
    num_invalid = 0
    for position, candidate in enumerate(node_ip_addresses):
        if is_valid_ip(candidate):
            continue
        print("ERROR: node_ip_addresses[{}] is '{}', which is not a valid IP address.".format(position, candidate))
        num_invalid += 1
    return num_invalid == 0
if __name__ == "__main__":
args = parser.parse_args()
username = args.username
key_file = args.key_file
installation_directory = args.installation_directory
node_ip_addresses = map(lambda s: str(s.strip()), open(args.nodes).readlines())
for index, node_ip_address in enumerate(node_ip_addresses):
if not is_valid_ip(node_ip_address):
print "\nWARNING: The string '{}' from line {} in the file {} is not a valid IP address.\n".format(node_ip_address, index + 1, args.nodes)
def install_ray(node_ip_addresses):
install_ray_multi_node(node_ip_addresses, username, key_file, installation_directory)
def install_ray(node_ip_addresses=node_ip_addresses):
"""
This method is used to install Ray on a cluster. For each node in the cluster,
it will ssh to the node and run the build scripts.
def start_ray(node_ip_addresses, worker_path):
start_ray_multi_node(node_ip_addresses, username, key_file, worker_path, installation_directory)
:param node_ip_addresses: ip addresses of the nodes on which to install Ray
"""
if check_ip_addresses(node_ip_addresses):
_install_ray(node_ip_addresses, username, key_file, installation_directory)
def stop_ray(node_ip_addresses):
stop_ray_multi_node(node_ip_addresses, username, key_file)
def start_ray(worker_path, num_workers_per_node=10, node_ip_addresses=node_ip_addresses):
"""
This method is used to start Ray on a cluster. It will ssh to the head node,
that is, the first node in the list node_ip_addresses, and it will start
the scheduler. Then it will ssh to each node and start an object store and
some workers.
def update_ray(node_ip_addresses):
update_ray_multi_node(node_ip_addresses, username, key_file, installation_directory)
:param worker_path: path of the source code to have the workers run
:param num_workers_per_node: the number of workers to start on each node
:param node_ip_addresses: ip addresses of the nodes on which to start Ray
"""
if check_ip_addresses(node_ip_addresses):
_start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory)
def restart_workers(worker_path, num_workers_per_node=10, node_ip_addresses=node_ip_addresses):
"""
This method is used for restarting the workers in the cluster, for example, to
use new application code. This is done without shutting down the scheduler
or the object stores so that work is not thrown away. It also does not
shut down any drivers.
:param worker_path: path of the source code to have the workers run
:param num_workers_per_node: the number of workers to start on each node
:param node_ip_addresses: ip addresses of the nodes on which to restart the workers
"""
if check_ip_addresses(node_ip_addresses):
_restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory)
def stop_ray(node_ip_addresses=node_ip_addresses):
"""
This method is used for stopping a Ray cluster. It will ssh to each node and
kill every scheduler, object store, and Python process.
:param node_ip_addresses: ip addresses of the nodes on which to stop Ray
"""
if check_ip_addresses(node_ip_addresses):
_stop_ray(node_ip_addresses, username, key_file)
def update_ray(node_ip_addresses=node_ip_addresses):
"""
This method is used for updating the Ray source code on a Ray cluster. It
will ssh to each node, will pull the latest source code from the Ray
repository, and will rerun the build script (though currently it will not
rebuild the third party libraries).
:param node_ip_addresses: ip addresses of the nodes on which to update Ray
"""
if check_ip_addresses(node_ip_addresses):
_update_ray(node_ip_addresses, username, key_file, installation_directory)
IPython.embed()
-15
View File
@@ -1,15 +0,0 @@
import argparse
from ray.services import start_node
import time
parser = argparse.ArgumentParser(description="Starting workers on a node of the cluster (invoked locally on the node).")
parser.add_argument("--scheduler-address", type=str, help="Address of the scheduler running on the head node (ip + port).")
parser.add_argument("--node-ip", type=str, help="IP address of the current worker.")
parser.add_argument("--num-workers", type=int, default=20, help="Number of workers to be started on the node.")
parser.add_argument("--worker-path", type=str, help="Path to the worker file.")
# Script entry point: parse the command line flags defined above and start the
# node-local processes through ray.services.start_node.
if __name__ == "__main__":
    args = parser.parse_args()
    start_node(args.scheduler_address, args.node_ip, args.num_workers, worker_path=args.worker_path)
    # Keep this process alive for a very long time after start_node returns.
    time.sleep(1000000000) # TODO(pcm): Figure out why object store file handle is closed if we don't do this