From 0ffe657e277e39a562072babc471d3bd632bc050 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 1 Jul 2016 14:10:51 -0700 Subject: [PATCH] enable restarting workers in singlenode case, plus cleanups to cluster.py (#190) --- README.md | 15 ++- lib/python/ray/__init__.py | 2 +- lib/python/ray/services.py | 23 ++++- lib/python/ray/worker.py | 28 +++++- scripts/cluster.py | 201 +++++++++++++++++++++++++++++++++---- scripts/start_workers.py | 15 --- 6 files changed, 234 insertions(+), 50 deletions(-) delete mode 100644 scripts/start_workers.py diff --git a/README.md b/README.md index e506e2cd9..a0c64081e 100644 --- a/README.md +++ b/README.md @@ -72,11 +72,16 @@ appropriate values. This assumes that you can connect to each IP address in ssh -i key.pem ubuntu@ ``` 4. The previous command should open a Python interpreter. To install Ray on the -cluster, run `install_ray(node_addresses)` in the interpreter. The interpreter -should block until the installation has completed. +cluster, run `install_ray()` in the interpreter. The interpreter should block +until the installation has completed. 5. To check that the installation succeeded, you can ssh to each node, cd into the directory `ray/test/`, and run the tests (e.g., `python runtest.py`). 6. Now that Ray has been installed, you can start the cluster (the scheduler, -object stores, and workers) with the command `start_ray(node_addresses, -"/home/ubuntu/ray/test/test_worker.py")`, where the second argument is the path -on each node in the cluster to the worker code that you would like to use. +object stores, and workers) with the command +`start_ray("/home/ubuntu/ray/scripts/default_worker.py")`, where the argument is +the path on each node in the cluster to the worker code that you would like to +use. The workers can be restarted with +`restart_workers("/home/ubuntu/ray/scripts/default_worker.py")`, for example if +you wish to update the application code running on the workers. 
The cluster +processes (the scheduler, the object stores, and the workers) can be stopped +with `stop_ray()`. diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 2c8381429..2284976fe 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -8,6 +8,6 @@ PYTHON_MODE = 3 import libraylib as lib import serialization -from worker import scheduler_info, visualize_computation_graph, task_info, register_module, connect, disconnect, get, put, remote, kill_workers +from worker import scheduler_info, visualize_computation_graph, task_info, register_module, connect, disconnect, get, put, remote, kill_workers, restart_workers_local from libraylib import ObjRef import internal diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 215ffeb52..de2254297 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -5,7 +5,7 @@ import time import datetime import ray -import ray.worker as worker +import worker from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP _services_env = os.environ.copy() @@ -94,7 +94,7 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None :param scheduler_address: ip address and port of the scheduler (which may run on a different node) :param node_ip_address: ip address (without port) of the node this function is run on :param num_workers: the number of workers to be started on this node - :worker_path: path of the source code that will be run on the worker + :param worker_path: path of the source code that will be run on the worker """ objstore_address = address(node_ip_address, new_objstore_port()) start_objstore(scheduler_address, objstore_address) @@ -102,9 +102,26 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None for _ in range(num_workers): start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) time.sleep(0.3) - ray.connect(scheduler_address, objstore_address, 
address(node_ip_address, new_worker_port())) + ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), is_driver=True) time.sleep(0.5) +def start_workers(scheduler_address, objstore_address, num_workers, worker_path): + """ + Start a new set of workers on this node. This assumes that the scheduler is + already running and that the object store on this node is already running. + The intended use case is that a developer wants to update the code running + on the worker processes so first kills all of the workers and then runs this + method. + + :param scheduler_address: ip address and port of the scheduler (which may run on a different node) + :param objstore_address: ip address and port of the object store (which runs on the same node) + :param num_workers: the number of workers to be started on this node + :param worker_path: path of the source code that will be run on the worker + """ + node_ip_address = objstore_address.split(":")[0] + for _ in range(num_workers): + start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) + # driver_mode should equal ray.SCRIPT_MODE if this is being run in a script and # ray.SHELL_MODE if it is being used interactively in a shell. It can also equal # ray.PYTHON_MODE to run things in a manner equivalent to serial Python code. 
diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index f17455694..9652d2e25 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -14,6 +14,7 @@ from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP import serialization import ray.internal.graph_pb2 import ray.graph +import services class RayFailedObject(object): """If a task throws an exception during execution, a RayFailedObject is stored in the object store for each of the tasks outputs.""" @@ -196,15 +197,18 @@ def register_module(module, recursive=False, worker=global_worker): # elif recursive and isinstance(val, ModuleType): # register_module(val, recursive, worker) -def connect(scheduler_addr, objstore_addr, worker_addr, is_driver=False, worker=global_worker, mode=ray.WORKER_MODE): +def connect(scheduler_address, objstore_address, worker_address, is_driver=False, worker=global_worker, mode=ray.WORKER_MODE): if hasattr(worker, "handle"): del worker.handle - worker.handle = ray.lib.create_worker(scheduler_addr, objstore_addr, worker_addr, is_driver) + worker.scheduler_address = scheduler_address + worker.objstore_address = objstore_address + worker.worker_address = worker_address + worker.handle = ray.lib.create_worker(worker.scheduler_address, worker.objstore_address, worker.worker_address, is_driver) + worker.set_mode(mode) FORMAT = "%(asctime)-15s %(message)s" - log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_addr)) + log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_address)) logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=log_basename + ".log") ray.lib.set_log_config(log_basename + "-c++.log") - worker.set_mode(mode) def disconnect(worker=global_worker): ray.lib.disconnect(worker.handle) @@ -230,11 +234,27 @@ def put(value, worker=global_worker): return objref def kill_workers(worker=global_worker): + """ + This method kills all of 
the workers in the cluster. It does not kill drivers. + """ success = ray.lib.kill_workers(worker.handle) if not success: print "Could not kill all workers; check that there are no tasks currently running." return success +def restart_workers_local(num_workers, worker_path, worker=global_worker): + """ + This method kills all of the workers and starts new workers locally on the + same node as the driver. This is intended for use in the case where Ray is + being used on a single node. + + :param num_workers: the number of workers to be started + :param worker_path: path of the source code that will be run on the worker + """ + if not kill_workers(worker): + return False + services.start_workers(worker.scheduler_address, worker.objstore_address, num_workers, worker_path) + def main_loop(worker=global_worker): if not ray.lib.connected(worker.handle): raise Exception("Worker is attempting to enter main_loop but has not been connected yet.") diff --git a/scripts/cluster.py b/scripts/cluster.py index 78bf015fb..f6a0bf13d 100644 --- a/scripts/cluster.py +++ b/scripts/cluster.py @@ -18,11 +18,31 @@ parser.add_argument("--username", type=str, required=True, help="User name for l parser.add_argument("--installation-directory", type=str, required=True, help="The directory in which to install Ray.") def run_command_over_ssh(node_ip_address, username, key_file, command): + """ + This method is used for connecting to a node with ssh and running a sequence + of commands. + + :param node_ip_address: the ip address of the node to ssh to + :param username: the username used to ssh to the cluster + :param key_file: the key used to ssh to the cluster + :param command: the command to run over ssh, currently this command is not allowed to have any single quotes + """ + if "'" in command: + raise Exception("Commands run over ssh must not contain the single quote character. 
This command does: {}".format(command)) full_command = "ssh -o StrictHostKeyChecking=no -i {} {}@{} '{}'".format(key_file, username, node_ip_address, command) subprocess.call([full_command], shell=True) print "Finished running command '{}' on {}@{}.".format(command, username, node_ip_address) -def install_ray_multi_node(node_ip_addresses, username, key_file, installation_directory): +def _install_ray(node_ip_addresses, username, key_file, installation_directory): + """ + This method is used to install Ray on a cluster. For each node in the cluster, + it will ssh to the node and run the build scripts. + + :param node_ip_addresses: ip addresses of the nodes on which to install Ray + :param username: the username used to ssh to the cluster + :param key_file: the key used to ssh to the cluster + :param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/" + """ def install_ray_over_ssh(node_ip_address, username, key_file, installation_directory): install_ray_command = """ sudo apt-get update && @@ -43,21 +63,37 @@ def install_ray_multi_node(node_ip_addresses, username, key_file, installation_d for t in threads: t.join() -def start_ray_multi_node(node_ip_addresses, username, key_file, worker_path, installation_directory): - build_directory = os.path.join(installation_directory, "ray/build") +def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory): + """ + This method is used to start Ray on a cluster. It will ssh to the head node, + that is, the first node in the list node_ip_addresses, and it will start + the scheduler. Then it will ssh to each node and start an object store and + some workers. 
+ + :param node_ip_addresses: ip addresses of the nodes on which to install Ray + :param username: the username used to ssh to the cluster + :param key_file: the key used to ssh to the cluster + :param worker_path: path of the source code to have the workers run + :param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/" + """ + scripts_directory = os.path.join(installation_directory, "ray/scripts") + # Start the scheduler + # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> " start_scheduler_command = """ cd "{}"; - nohup ./scheduler {}:10001 > scheduler.out 2> scheduler.err < /dev/null & - """.format(build_directory, node_ip_addresses[0]) + source ../setup-env.sh; + python -c "import ray; ray.services.start_scheduler(\\\"{}:10001\\\")" > start_scheduler.out 2> start_scheduler.err < /dev/null & + """.format(scripts_directory, node_ip_addresses[0]) run_command_over_ssh(node_ip_addresses[0], username, key_file, start_scheduler_command) + # Start the workers on each node + # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> " for i, node_ip_address in enumerate(node_ip_addresses): - scripts_directory = os.path.join(installation_directory, "ray/scripts") start_workers_command = """ cd "{}"; source ../setup-env.sh; - python start_workers.py --scheduler-address={}:10001 --node-ip={} --worker-path="{}" > start_workers.out 2> start_workers.err < /dev/null & - """.format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], worker_path) + python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null & + """.format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, worker_path) run_command_over_ssh(node_ip_address, username, key_file, start_workers_command) print "cluster started; you can start the shell on the head node 
with:" @@ -68,12 +104,69 @@ def start_ray_multi_node(node_ip_addresses, username, key_file, worker_path, ins python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach """.format(setup_env_path, shell_script_path, node_ip_addresses[0], node_ip_addresses[0], node_ip_addresses[0]) -def stop_ray_multi_node(node_ip_addresses, username, key_file): +def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory): + """ + This method is used for restarting the workers in the cluster, for example, to + use new application code. This is done without shutting down the scheduler + or the object stores so that work is not thrown away. It also does not shut + down any drivers. + + :param node_ip_addresses: ip addresses of the nodes on which to restart the workers + :param username: the username used to ssh to the cluster + :param key_file: the key used to ssh to the cluster + :param worker_path: path of the source code to have the workers run + :param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/" + """ + scripts_directory = os.path.join(installation_directory, "ray/scripts") + head_node_ip_address = node_ip_addresses[0] + scheduler_address = "{}:10001".format(head_node_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray. + objstore_address = "{}:20001".format(head_node_ip_address) # This needs to be the address of the currently running object store, which was presumably created in _start_ray. + shell_address = "{}:30000".format(head_node_ip_address) # This address must be currently unused. In particular, it cannot be the address of any currently running shell. 
+ + # Kill the current workers by attaching a driver to the scheduler and calling ray.kill_workers() + # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> " + kill_workers_command = """ + cd "{}"; + source ../setup-env.sh; + python -c "import ray; ray.connect(\\\"{}\\\", \\\"{}\\\", \\\"{}\\\", is_driver=True); ray.kill_workers()" + """.format(scripts_directory, scheduler_address, objstore_address, shell_address) + run_command_over_ssh(head_node_ip_address, username, key_file, kill_workers_command) + + # Start new workers on each node + # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> " + for i, node_ip_address in enumerate(node_ip_addresses): + start_workers_command = """ + cd "{}"; + source ../setup-env.sh; + python -c "import ray; ray.services.start_workers(\\\"{}:10001\\\", \\\"{}:20001\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null & + """.format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, worker_path) + run_command_over_ssh(node_ip_address, username, key_file, start_workers_command) + +def _stop_ray(node_ip_addresses, username, key_file): + """ + This method is used for stopping a Ray cluster. It will ssh to each node and + kill every scheduler, object store, and Python process.
+ + :param node_ip_addresses: ip addresses of the nodes on which to stop Ray + :param username: the username used to ssh to the cluster + :param key_file: the key used to ssh to the cluster + """ kill_cluster_command = "killall scheduler objstore python > /dev/null 2> /dev/null" for node_ip_address in node_ip_addresses: run_command_over_ssh(node_ip_address, username, key_file, kill_cluster_command) -def update_ray_multi_node(node_ip_addresses, username, key_file, installation_directory): +def _update_ray(node_ip_addresses, username, key_file, installation_directory): + """ + This method is used for updating the Ray source code on a Ray cluster. It + will ssh to each node, will pull the latest source code from the Ray + repository, and will rerun the build script (though currently it will not + rebuild the third party libraries). + + :param node_ip_addresses: ip addresses of the nodes on which to update Ray + :param username: the username used to ssh to the cluster + :param key_file: the key used to ssh to the cluster + :param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/" + """ ray_directory = os.path.join(installation_directory, "ray") update_cluster_command = """ cd "{}" && @@ -85,34 +178,98 @@ def update_ray_multi_node(node_ip_addresses, username, key_file, installation_di for node_ip_address in node_ip_addresses: run_command_over_ssh(node_ip_address, username, key_file, update_cluster_command) -# Returns true if address is a valid IPv4 address and false otherwise. def is_valid_ip(ip_address): + """ + This method returns true if an address is a valid IPv4 address and returns + false otherwise. + + :param ip_address: the ip address to check + """ try: socket.inet_aton(ip_address) return True except socket.error: return False +def check_ip_addresses(node_ip_addresses): + """ + This method checks if all of the addresses in a list are valid IPv4 addresses.
+ If not, it returns false and prints an error message for each invalid + address. + + :param node_ip_addresses: the list of ip addresses to check + """ + addresses_valid = True + for index, node_ip_address in enumerate(node_ip_addresses): + if not is_valid_ip(node_ip_address): + print "ERROR: node_ip_addresses[{}] is '{}', which is not a valid IP address.".format(index, node_ip_address) + addresses_valid = False + return addresses_valid + if __name__ == "__main__": args = parser.parse_args() username = args.username key_file = args.key_file installation_directory = args.installation_directory node_ip_addresses = map(lambda s: str(s.strip()), open(args.nodes).readlines()) - for index, node_ip_address in enumerate(node_ip_addresses): - if not is_valid_ip(node_ip_address): - print "\nWARNING: The string '{}' from line {} in the file {} is not a valid IP address.\n".format(node_ip_address, index + 1, args.nodes) - def install_ray(node_ip_addresses): - install_ray_multi_node(node_ip_addresses, username, key_file, installation_directory) + def install_ray(node_ip_addresses=node_ip_addresses): + """ + This method is used to install Ray on a cluster. For each node in the cluster, + it will ssh to the node and run the build scripts. - def start_ray(node_ip_addresses, worker_path): - start_ray_multi_node(node_ip_addresses, username, key_file, worker_path, installation_directory) + :param node_ip_addresses: ip addresses of the nodes on which to install Ray + """ + if check_ip_addresses(node_ip_addresses): + _install_ray(node_ip_addresses, username, key_file, installation_directory) - def stop_ray(node_ip_addresses): - stop_ray_multi_node(node_ip_addresses, username, key_file) + def start_ray(worker_path, num_workers_per_node=10, node_ip_addresses=node_ip_addresses): + """ + This method is used to start Ray on a cluster. It will ssh to the head node, + that is, the first node in the list node_ip_addresses, and it will start + the scheduler. 
Then it will ssh to each node and start an object store and + some workers. - def update_ray(node_ip_addresses): - update_ray_multi_node(node_ip_addresses, username, key_file, installation_directory) + :param worker_path: path of the source code to have the workers run + :param node_ip_addresses: ip addresses of the nodes on which to start Ray + """ + if check_ip_addresses(node_ip_addresses): + _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory) + + def restart_workers(worker_path, num_workers_per_node=10, node_ip_addresses=node_ip_addresses): + """ + This method is used for restarting the workers in the cluster, for example, to + use new application code. This is done without shutting down the scheduler + or the object stores so that work is not thrown away. It also does not + shut down any drivers. + + :param node_ip_addresses: ip addresses of the nodes on which to restart the workers + :param worker_path: path of the source code to have the workers run + :param num_workers_per_node: the number of workers to start on each node + """ + if check_ip_addresses(node_ip_addresses): + _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory) + + def stop_ray(node_ip_addresses=node_ip_addresses): + """ + This method is used for stopping a Ray cluster. It will ssh to each node and + kill every scheduler, object store, and Python process. + + :param node_ip_addresses: ip addresses of the nodes on which to stop Ray + """ + if check_ip_addresses(node_ip_addresses): + _stop_ray(node_ip_addresses, username, key_file) + + def update_ray(node_ip_addresses=node_ip_addresses): + """ + This method is used for updating the Ray source code on a Ray cluster.
It + will ssh to each node, will pull the latest source code from the Ray + repository, and will rerun the build script (though currently it will not + rebuild the third party libraries). + + :param node_ip_addresses: ip addresses of the nodes on which to update Ray + """ + if check_ip_addresses(node_ip_addresses): + _update_ray(node_ip_addresses, username, key_file, installation_directory) IPython.embed() diff --git a/scripts/start_workers.py b/scripts/start_workers.py deleted file mode 100644 index 52d4ef30d..000000000 --- a/scripts/start_workers.py +++ /dev/null @@ -1,15 +0,0 @@ -import argparse -from ray.services import start_node -import time - -parser = argparse.ArgumentParser(description="Starting workers on a node of the cluster (invoked locally on the node).") -parser.add_argument("--scheduler-address", type=str, help="Address of the scheduler running on the head node (ip + port).") -parser.add_argument("--node-ip", type=str, help="IP address of the current worker.") -parser.add_argument("--num-workers", type=int, default=20, help="Number of workers to be started on the node.") -parser.add_argument("--worker-path", type=str, help="Path to the worker file.") - -if __name__ == "__main__": - args = parser.parse_args() - start_node(args.scheduler_address, args.node_ip, args.num_workers, worker_path=args.worker_path) - - time.sleep(1000000000) # TODO(pcm): Figure out why object store file handle is closed if we don't do this