diff --git a/doc/using-ray-on-a-cluster.md b/doc/using-ray-on-a-cluster.md index 4d2285db7..a5c208bed 100644 --- a/doc/using-ray-on-a-cluster.md +++ b/doc/using-ray-on-a-cluster.md @@ -45,30 +45,38 @@ until the installation has completed. The standard output from the nodes will be redirected to your terminal. 5. To check that the installation succeeded, you can ssh to each node, cd into the directory `ray/test/`, and run the tests (e.g., `python runtest.py`). -6. Now that Ray has been installed, you can start the cluster (the scheduler, -object stores, and workers) with the command -`start_ray("/home/ubuntu/ray/scripts/default_worker.py")`, where the second -argument is the path on each node in the cluster to the worker code that you -would like to use. After completing successfully, this command will print out a -command that can be run on the head node to attach a shell (the driver) to the -cluster. For example, +6. Create a directory (for example, `mkdir ~/example_ray_code`) containing the +worker code (`worker.py`) along with the code for any modules imported by +`worker.py`. For example, + + ``` + cp ray/scripts/default_worker.py ~/example_ray_code/worker.py + cp ray/scripts/example_functions.py ~/example_ray_code/ + ``` + +7. Start the cluster (the scheduler, object stores, and workers) with the +command `start_ray("~/example_ray_code")`, where the argument is the +local path to the directory containing the worker code that you would like to use. This command will copy +the worker code to each node and will start the cluster. After completing +successfully, this command will print out a command that can be run on the head +node to attach a shell (the driver) to the cluster. 
For example, ``` source "$RAY_HOME/setup-env.sh"; - python "$RAY_HOME/scripts/shell.py" --scheduler-address=52.50.28.103:10001 --objstore-address=52.50.28.103:20001 --worker-address=52.50.28.103:30001 --attach + python "$RAY_HOME/scripts/shell.py" --scheduler-address=12.34.56.789:10001 --objstore-address=12.34.56.789:20001 --worker-address=12.34.56.789:30001 --attach ``` -7. Note that there are several more commands that can be run from within +8. Note that there are several more commands that can be run from within `cluster.py`. - `install_ray()` - This pulls the Ray source code on each node, builds all of the third party libraries, and builds the project itself. - - `start_ray(worker_path, num_workers_per_node=10)` - This starts a + - `start_ray(worker_directory, num_workers_per_node=10)` - This starts a scheduler process on the head node, and it starts an object store and some workers on each node. - `stop_ray()` - This shuts down the cluster (killing all of the processes). - - `restart_workers(worker_path, num_workers_per_node=10)` - This kills the - current workers and starts new workers using the worker code from the - given file. Currently, this can only run when there are no tasks currently + - `restart_workers(worker_directory, num_workers_per_node=10)` - This kills + the current workers and starts new workers using the worker code from the + given directory. Currently, this can only run when there are no tasks currently executing on any of the workers. - `update_ray()` - This pulls the latest Ray source code and builds it. 
diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 6a47f5fdc..a0e48beb4 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -145,8 +145,6 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None time.sleep(0.2) for _ in range(num_workers): start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False) - time.sleep(0.3) - ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), is_driver=True) time.sleep(0.5) def start_workers(scheduler_address, objstore_address, num_workers, worker_path): diff --git a/scripts/cluster.py b/scripts/cluster.py index f4cdae33d..0ff93ece4 100644 --- a/scripts/cluster.py +++ b/scripts/cluster.py @@ -64,7 +64,7 @@ def _install_ray(node_ip_addresses, username, key_file, installation_directory): for t in threads: t.join() -def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory): +def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_directory, installation_directory): """ This method is used to start Ray on a cluster. It will ssh to the head node, that is, the first node in the list node_ip_addresses, and it will start @@ -74,9 +74,12 @@ def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, work :param node_ip_addresses: ip addresses of the nodes on which to install Ray :param username: the username used to ssh to the cluster :param key_file: the key used to ssh to the cluster - :param worker_path: path of the source code to have the workers run + :param worker_directory: local directory containing the worker source code :param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/" """ + # First update the worker code on the nodes. 
+ remote_worker_path = _update_worker_code(node_ip_addresses, worker_directory, installation_directory) + scripts_directory = os.path.join(installation_directory, "ray/scripts") # Start the scheduler # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> " @@ -94,7 +97,7 @@ def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, work cd "{}"; source ../setup-env.sh; python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null & - """.format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, worker_path) + """.format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, remote_worker_path) run_command_over_ssh(node_ip_address, username, key_file, start_workers_command) print "cluster started; you can start the shell on the head node with:" @@ -105,7 +108,7 @@ def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, work python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach """.format(setup_env_path, shell_script_path, node_ip_addresses[0], node_ip_addresses[0], node_ip_addresses[0]) -def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory): +def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_directory, installation_directory): """ This method is used for restarting the workers in the cluster, for example, to use new application code. 
This is done without shutting down the scheduler @@ -115,9 +118,12 @@ def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node :param node_ip_addresses: ip addresses of the nodes on which to restart the workers :param username: the username used to ssh to the cluster :param key_file: the key used to ssh to the cluster - :param worker_path: path of the source code to have the workers run + :param worker_directory: local directory containing the worker source code :param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/" """ + # First update the worker code on the nodes. + remote_worker_path = _update_worker_code(node_ip_addresses, worker_directory, installation_directory) + scripts_directory = os.path.join(installation_directory, "ray/scripts") head_node_ip_address = node_ip_addresses[0] scheduler_address = "{}:10001".format(head_node_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray. 
@@ -140,7 +146,7 @@ def _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node cd "{}"; source ../setup-env.sh; python -c "import ray; ray.services.start_workers(\\\"{}:10001\\\", \\\"{}:20001\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null & - """.format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, worker_path) + """.format(scripts_directory, node_ip_addresses[0], node_ip_addresses[i], num_workers_per_node, remote_worker_path) run_command_over_ssh(node_ip_address, username, key_file, start_workers_command) def _stop_ray(node_ip_addresses, username, key_file): @@ -179,6 +185,47 @@ def _update_ray(node_ip_addresses, username, key_file, installation_directory): for node_ip_address in node_ip_addresses: run_command_over_ssh(node_ip_address, username, key_file, update_cluster_command) +def _update_worker_code(node_ip_addresses, worker_directory, installation_directory): + """ + This method is used to sync update the worker source code on each node in the + cluster. The worker_directory will be copied under installation_directory. + For example, we call _update_worker_code(node_ip_addresses, "~/a/b/c", + "/d/e/f"), then the contents of ~/a/b/c on the local machine will be copied + to /d/e/f/ray_worker_files/c on each node in the cluster. + + :param node_ip_addresses: ip addresses of the nodes on which to restart the + workers + :param worker_directory: The path on the local machine to the directory that + contains the worker code. This directory must contain a file worker.py. + :param installation_directory: Directory in which ray is installed, for + example "/home/ubuntu/". + + :rtype: A string with the path to the source code of the worker on the remote + nodes. 
+ """ + worker_directory = os.path.expanduser(worker_directory) + if not os.path.isdir(worker_directory): + raise Exception("Directory {} does not exist.".format(worker_directory)) + if not os.path.exists(os.path.join(worker_directory, "worker.py")): + raise Exception("Directory {} does not contain a file named worker.py.".format(worker_directory)) + # If worker_directory is "/a/b/c", then local_directory_name is "c". + local_directory_name = os.path.split(os.path.realpath(worker_directory))[1] + remote_directory = os.path.join(installation_directory, "ray_worker_files", local_directory_name) + for node_ip_address in node_ip_addresses: + # Remove and recreate the directory on the node. + recreate_directory_command = """ + rm -r "{}"; + mkdir -p "{}" + """.format(remote_directory, remote_directory) + run_command_over_ssh(node_ip_address, username, key_file, recreate_directory_command) + # Copy the files from the local machine to the node. + copy_command = """ + scp -r -i {} {}/* {}@{}:{}/ + """.format(key_file, worker_directory, username, node_ip_address, remote_directory) + subprocess.call([copy_command], shell=True) + remote_worker_path = os.path.join(remote_directory, "worker.py") + return remote_worker_path + def is_valid_ip(ip_address): """ This method returns true if an address is a valid IPv4 address and returns @@ -224,20 +271,20 @@ if __name__ == "__main__": if check_ip_addresses(node_ip_addresses): _install_ray(node_ip_addresses, username, key_file, installation_directory) - def start_ray(worker_path, num_workers_per_node=10, node_ip_addresses=node_ip_addresses): + def start_ray(worker_directory, num_workers_per_node=10, node_ip_addresses=node_ip_addresses): """ This method is used to start Ray on a cluster. It will ssh to the head node, that is, the first node in the list node_ip_addresses, and it will start the scheduler. Then it will ssh to each node and start an object store and some workers. 
- :param worker_path: path of the source code to have the workers run + :param worker_directory: path of the source code to have the workers run :param node_ip_addresses: ip addresses of the nodes on which to install Ray """ if check_ip_addresses(node_ip_addresses): - _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory) + _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, worker_directory, installation_directory) - def restart_workers(worker_path, num_workers_per_node=10, node_ip_addresses=node_ip_addresses): + def restart_workers(worker_directory, num_workers_per_node=10, node_ip_addresses=node_ip_addresses): """ This method is used for restarting the workers in the cluster, for example, to use new application code. This is done without shutting down the scheduler @@ -245,11 +292,11 @@ if __name__ == "__main__": shut down any drivers. :param node_ip_addresses: ip addresses of the nodes on which to restart the workers - :param worker_path: path of the source code to have the workers run + :param worker_directory: path of the source code to have the workers run :param installation_directory: directory in which Ray is installed, for example "/home/ubuntu/" """ if check_ip_addresses(node_ip_addresses): - _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_path, installation_directory) + _restart_workers(node_ip_addresses, username, key_file, num_workers_per_node, worker_directory, installation_directory) def stop_ray(node_ip_addresses=node_ip_addresses): """