script for launching nodes on ec2 (#270)

* original spark-ec2 script

* modifying spark-ec2 for ray
This commit is contained in:
Robert Nishihara
2016-07-16 15:14:14 -07:00
committed by Philipp Moritz
parent 97270f5aec
commit 8465df1146
3 changed files with 1149 additions and 12 deletions
+54 -2
View File
@@ -6,14 +6,66 @@ Ray can be used in several ways. In addition to running on a single machine, Ray
is designed to run on a cluster of machines. This document is about how to use
Ray on a cluster.
### Launching a cluster on EC2
This section describes how to start a cluster on EC2. These instructions are
copied and adapted from https://github.com/amplab/spark-ec2.
#### Before you start
- Create an Amazon EC2 key pair for yourself. This can be done by logging into
your Amazon Web Services account through the [AWS
console](http://aws.amazon.com/console/), clicking Key Pairs on the left
sidebar, and creating and downloading a key. Make sure that you set the
permissions for the private key file to `600` (i.e. only you can read and write
it) so that `ssh` will work.
- Whenever you want to use the `ec2.py` script, set the environment variables
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` to your Amazon EC2 access key ID
and secret access key. These can be obtained from the [AWS
homepage](http://aws.amazon.com/) by clicking Account > Security Credentials >
Access Credentials.
#### Launching a cluster
- Go into the `ray/scripts` directory.
- Run `python ec2.py -k <keypair> -i <key-file> -s <num-slaves> launch
<cluster-name>`, where `<keypair>` is the name of your EC2 key pair (the name
you chose when you created it), `<key-file>` is the path to the private key
file for that key pair, `<num-slaves>` is the number of slave nodes to launch
(try 1 at first), and `<cluster-name>` is the name to give to your cluster.
For example:
```bash
export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
python ec2.py --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 launch my-ray-cluster
```
The following options are worth pointing out:
- `--instance-type=<instance-type>` can be used to specify an EC2 instance type
to use. For now, the script only supports 64-bit instance types, and the default
type is `m3.large` (which has 2 cores and 7.5 GB RAM).
- `--region=<ec2-region>` specifies an EC2 region in which to launch instances.
The default region is `us-east-1`.
- `--zone=<ec2-zone>` can be used to specify an EC2 availability zone to launch
instances in. If the launch fails because there is not enough capacity in one
zone, try launching in another.
- `--spot-price=<price>` will launch the worker nodes as [Spot
Instances](http://aws.amazon.com/ec2/spot-instances/), bidding for the given
maximum price (in dollars).
### Getting started with Ray on a cluster
These instructions work on EC2, but they may require some modifications to run
on your own cluster. In particular, on EC2, running `sudo` does not require a
password, and we currently don't handle the case where a password is needed.
1. Create a file `nodes.txt` of the IP addresses of the nodes in the cluster.
For example
1. If you launched a cluster using the `ec2.py` script from the previous
section, then the file `ray/scripts/nodes.txt` will already have been created.
Otherwise, create a file `nodes.txt` of the IP addresses of the nodes in the
cluster. For example
12.34.56.789
12.34.567.89
+28 -10
View File
@@ -25,13 +25,17 @@ class RayCluster(object):
directory in which Ray should be installed.
"""
def __init__(self, node_ip_addresses, username, key_file, installation_directory):
def __init__(self, node_ip_addresses, node_private_ip_addresses, username, key_file, installation_directory):
"""Initialize the RayCluster object.
Args:
node_ip_addresses (List[str]): A list of the ip addresses of the nodes in
the cluster. The first element is the head node and will host the
scheduler process.
node_private_ip_addresses (List[str]): A list of the ip addresses that the
nodes use internally to connect to one another. We include this because
on EC2 communication within a security group must be done over private
ip addresses.
username (str): The username used to ssh to nodes in the cluster.
key_file (str): The path to the key used to ssh to nodes in the cluster.
installation_directory (str): The path on the nodes in the cluster to the
@@ -43,6 +47,7 @@ class RayCluster(object):
"""
_check_ip_addresses(node_ip_addresses)
self.node_ip_addresses = node_ip_addresses
self.node_private_ip_addresses = node_private_ip_addresses
self.username = username
self.key_file = key_file
self.installation_directory = installation_directory
@@ -149,7 +154,7 @@ class RayCluster(object):
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.services.start_scheduler(\\\"{}:10001\\\", local=False)" > start_scheduler.out 2> start_scheduler.err < /dev/null &
""".format(scripts_directory, self.node_ip_addresses[0])
""".format(scripts_directory, self.node_private_ip_addresses[0])
self._run_command_over_ssh(self.node_ip_addresses[0], start_scheduler_command)
# Start the workers on each node
@@ -160,7 +165,7 @@ class RayCluster(object):
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
""".format(scripts_directory, self.node_ip_addresses[0], self.node_ip_addresses[i], num_workers_per_node, remote_worker_path)
""".format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_worker_path)
start_workers_commands.append(start_workers_command)
self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands)
@@ -170,7 +175,7 @@ class RayCluster(object):
print """
source "{}";
python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach
""".format(setup_env_path, shell_script_path, self.node_ip_addresses[0], self.node_ip_addresses[0], self.node_ip_addresses[0])
""".format(setup_env_path, shell_script_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])
def restart_workers(self, worker_directory, num_workers_per_node=10):
"""Restart the workers on the cluster.
@@ -191,9 +196,10 @@ class RayCluster(object):
scripts_directory = os.path.join(self.installation_directory, "ray/scripts")
head_node_ip_address = self.node_ip_addresses[0]
scheduler_address = "{}:10001".format(head_node_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray.
objstore_address = "{}:20001".format(head_node_ip_address) # This needs to be the address of the currently running object store, which was presumably created in _start_ray.
shell_address = "{}:{}".format(head_node_ip_address, np.random.randint(30000, 40000)) # This address must be currently unused. In particular, it cannot be the address of any currently running shell.
head_node_private_ip_address = self.node_private_ip_addresses[0]
scheduler_address = "{}:10001".format(head_node_private_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray.
objstore_address = "{}:20001".format(head_node_private_ip_address) # This needs to be the address of the currently running object store, which was presumably created in _start_ray.
shell_address = "{}:{}".format(head_node_private_ip_address, np.random.randint(30000, 40000)) # This address must be currently unused. In particular, it cannot be the address of any currently running shell.
# Kill the current workers by attaching a driver to the scheduler and calling ray.kill_workers()
# The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> "
@@ -212,7 +218,7 @@ class RayCluster(object):
cd "{}";
source ../setup-env.sh;
python -c "import ray; ray.services.start_workers(\\\"{}:10001\\\", \\\"{}:20001\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null &
""".format(scripts_directory, self.node_ip_addresses[0], self.node_ip_addresses[i], num_workers_per_node, remote_worker_path)
""".format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_worker_path)
start_workers_commands.append(start_workers_command)
self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands)
@@ -325,6 +331,18 @@ if __name__ == "__main__":
username = args.username
key_file = args.key_file
installation_directory = args.installation_directory
node_ip_addresses = map(lambda s: str(s.strip()), open(args.nodes).readlines())
cluster = RayCluster(node_ip_addresses, username, key_file, installation_directory)
node_ip_addresses = []
node_private_ip_addresses = []
for line in open(args.nodes).readlines():
parts = line.split(",")
ip_address = str(parts[0].strip())
if len(parts) == 1:
private_ip_address = ip_address
elif len(parts) == 2:
private_ip_address = str(parts[1].strip())
else:
raise Exception("Each line in the nodes file must have either one or two ip addresses.")
node_ip_addresses.append(ip_address)
node_private_ip_addresses.append(private_ip_address)
cluster = RayCluster(node_ip_addresses, node_private_ip_addresses, username, key_file, installation_directory)
IPython.embed()
+1067
View File
File diff suppressed because it is too large Load Diff