diff --git a/doc/cluster-api.rst b/doc/cluster-api.rst index 34ffb5c5a..cf11ee6b0 100644 --- a/doc/cluster-api.rst +++ b/doc/cluster-api.rst @@ -7,3 +7,4 @@ The Cluster API .. automethod:: cluster.RayCluster.stop_ray .. automethod:: cluster.RayCluster.restart_workers .. automethod:: cluster.RayCluster.update_ray +.. automethod:: cluster.RayCluster.run_command_over_ssh_on_all_nodes_in_parallel diff --git a/doc/using-ray-on-a-cluster.md b/doc/using-ray-on-a-cluster.md index cd851f6a8..d6502a106 100644 --- a/doc/using-ray-on-a-cluster.md +++ b/doc/using-ray-on-a-cluster.md @@ -155,7 +155,24 @@ to the cluster's head node (as described by the output of the ray.init(node_ip_address="98.76.54.321", scheduler_address="98.76.54.321:10001") ``` -7. Note that there are several more commands that can be run from within +8. If you would like to run the example applications on the cluster, you will +need to install a few more Python packages. This can be done, within +`cluster.py`, by running the following. + ```python + install_example_dependencies_command = """ + # Install TensorFlow + sudo pip install --upgrade https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.9.0-cp27-none-linux_x86_64.whl; + # Install SciPy + sudo apt-get -y install libatlas-base-dev gfortran; + sudo pip install scipy; + # Install Gym + sudo apt-get -y install zlib1g-dev libjpeg-dev xvfb libav-tools xorg-dev python-opengl libsdl2-dev swig wget; + sudo pip install gym[atari] + """ + cluster.run_command_over_ssh_on_all_nodes_in_parallel(install_example_dependencies_command) + ``` + +9. Note that there are several more commands that can be run from within `cluster.py`. - `cluster.install_ray()` - This pulls the Ray source code on each node, @@ -167,3 +184,5 @@ to the cluster's head node (as described by the output of the processes). - `cluster.update_ray()` - This pulls the latest Ray source code and builds it. 
+ - `cluster.run_command_over_ssh_on_all_nodes_in_parallel(command)` - This + will ssh to each node in the cluster and run a command. diff --git a/scripts/cluster.py b/scripts/cluster.py index 59a27e339..d3b44c120 100644 --- a/scripts/cluster.py +++ b/scripts/cluster.py @@ -83,25 +83,34 @@ class RayCluster(object): for t in threads: t.join() - def _run_command_over_ssh_on_all_nodes_in_parallel(self, commands): + def run_command_over_ssh_on_all_nodes_in_parallel(self, command): """Run a command over ssh on all nodes in the cluster in parallel. Args: - commands: This is either a single command to run on every node in the - cluster ove ssh, or it is a list of commands of the same length as + command: This is either a single command to run on every node in the + cluster over ssh, or it is a list of commands of the same length as node_ip_addresses, in which case the ith command will be run on the ith element of node_ip_addresses. Currently this command is not allowed to have any single quotes. Raises: - Exception: An exception is raised if commands is not a string or is not a + Exception: An exception is raised if command is not a string or is not a list with the same length as node_ip_addresses. """ - if isinstance(commands, str): - # If there is only one command, then run this command on every node in the cluster. - commands = len(self.node_ip_addresses) * [commands] + if isinstance(command, str): + # If there is only one command, then run this command on every node in the + # cluster. + commands = len(self.node_ip_addresses) * [command] + else: + # Otherwise, there is a list of one command for each node in the cluster. + commands = command + # Make sure we have one command for each node. if len(commands) != len(self.node_ip_addresses): raise Exception("The number of commands must match the number of nodes.") + # Make sure that the commands do not contain any single quotes. 
+ for command in commands: + if "'" in command: + raise Exception("Commands run over ssh must not contain the single quote character. This command does: {}".format(command)) functions = [] inputs = [] def function(node_ip_address, command): @@ -127,7 +136,7 @@ class RayCluster(object): ./setup.sh; ./build.sh """.format(self.installation_directory, self.installation_directory) - self._run_command_over_ssh_on_all_nodes_in_parallel(install_ray_command) + self.run_command_over_ssh_on_all_nodes_in_parallel(install_ray_command) def start_ray(self, user_source_directory=None, num_workers_per_node=10): """Start Ray on a cluster. @@ -168,7 +177,7 @@ class RayCluster(object): python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, user_source_directory={})" > start_workers.out 2> start_workers.err < /dev/null & """.format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_user_source_directory_str) start_workers_commands.append(start_workers_command) - self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands) + self.run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands) setup_env_path = os.path.join(self.installation_directory, "ray/setup-env.sh") cd_location = remote_user_source_directory if user_source_directory is not None else os.path.join(self.installation_directory, "ray") @@ -195,7 +204,7 @@ class RayCluster(object): kill every schedule, object store, and Python process. """ kill_cluster_command = "killall scheduler objstore python > /dev/null 2> /dev/null" - self._run_command_over_ssh_on_all_nodes_in_parallel(kill_cluster_command) + self.run_command_over_ssh_on_all_nodes_in_parallel(kill_cluster_command) def update_ray(self, branch=None): """Pull the latest Ray source code and rebuild Ray. 
@@ -219,7 +228,7 @@ class RayCluster(object): (make -C "./build" clean || rm -rf "./build") && ./build.sh """.format(ray_directory, change_branch_command) - self._run_command_over_ssh_on_all_nodes_in_parallel(update_cluster_command) + self.run_command_over_ssh_on_all_nodes_in_parallel(update_cluster_command) def _update_user_code(self, user_source_directory): """Update the user's source code on each node in the cluster. @@ -250,7 +259,7 @@ class RayCluster(object): rm -r "{}"; mkdir -p "{}" """.format(remote_directory, remote_directory) - self._run_command_over_ssh_on_all_nodes_in_parallel(recreate_directory_command) + self.run_command_over_ssh_on_all_nodes_in_parallel(recreate_directory_command) # Copy the files from the local machine to the node. def copy_function(node_ip_address): copy_command = """ @@ -316,7 +325,7 @@ if __name__ == "__main__": # used for installing Ray. Note that single quotes around 'echo $HOME' are # important. If you use double quotes, then the $HOME environment variable # will be expanded locally instead of remotely. - echo_home_command = "ssh -i {} {}@{} 'echo $HOME'".format(key_file, username, node_ip_addresses[0]) + echo_home_command = "ssh -o StrictHostKeyChecking=no -i {} {}@{} 'echo $HOME'".format(key_file, username, node_ip_addresses[0]) installation_directory = subprocess.check_output(echo_home_command, shell=True).strip() print "Using '{}' as the home directory on the cluster.".format(installation_directory) # Create the Raycluster object.