diff --git a/doc/using-ray-on-a-cluster.md b/doc/using-ray-on-a-cluster.md index 14646cb93..77b764743 100644 --- a/doc/using-ray-on-a-cluster.md +++ b/doc/using-ray-on-a-cluster.md @@ -6,14 +6,66 @@ Ray can be used in several ways. In addition to running on a single machine, Ray is designed to run on a cluster of machines. This document is about how to use Ray on a cluster. +### Launching a cluster on EC2 + +This section describes how to start a cluster on EC2. These instructions are +copied and adapted from https://github.com/amplab/spark-ec2. + +#### Before you start + +- Create an Amazon EC2 key pair for yourself. This can be done by logging into +your Amazon Web Services account through the [AWS +console](http://aws.amazon.com/console/), clicking Key Pairs on the left +sidebar, and creating and downloading a key. Make sure that you set the +permissions for the private key file to `600` (i.e. only you can read and write +it) so that `ssh` will work. +- Whenever you want to use the `ec2.py` script, set the environment variables +`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` to your Amazon EC2 access key ID +and secret access key. These can be obtained from the [AWS +homepage](http://aws.amazon.com/) by clicking Account > Security Credentials > +Access Credentials. + +#### Launching a Cluster + +- Go into the `ray/scripts` directory. +- Run `python ec2.py -k -i -s launch +`, where `` is the name of your EC2 key pair (that you +gave it when you created it), `` is the private key file for your key +pair, `` is the number of slave nodes to launch (try 1 at first), +and `` is the name to give to your cluster. + + For example: + + ```bash + export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU + export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123 + python ec2.py --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 launch my-ray-cluster + ``` + +The following options are worth pointing out: + +- `--instance-type=` can be used to specify an EC2 instance type +to use. For now, the script only supports 64-bit instance types, and the default +type is `m3.large` (which has 2 cores and 7.5 GB RAM). +- `--region=` specifies an EC2 region in which to launch instances. +The default region is `us-east-1`. +- `--zone=` can be used to specify an EC2 availability zone to launch +instances in. Sometimes, you will get an error because there is not enough +capacity in one zone, and you should try to launch in another. +- `--spot-price=` will launch the worker nodes as [Spot +Instances](http://aws.amazon.com/ec2/spot-instances/), bidding for the given +maximum price (in dollars). + ### Getting started with Ray on a cluster These instructions work on EC2, but they may require some modifications to run on your own cluster. In particular, on EC2, running `sudo` does not require a password, and we currently don't handle the case where a password is needed. -1. Create a file `nodes.txt` of the IP addresses of the nodes in the cluster. -For example +1. If you launched a cluster using the `ec2.py` script from the previous +section, then the file `ray/scripts/nodes.txt` will already have been created. +Otherwise, create a file `nodes.txt` of the IP addresses of the nodes in the +cluster. For example 12.34.56.789 12.34.567.89 diff --git a/scripts/cluster.py b/scripts/cluster.py index 51b80c7ec..02d24310a 100644 --- a/scripts/cluster.py +++ b/scripts/cluster.py @@ -25,13 +25,17 @@ class RayCluster(object): directory in which Ray should be installed. """ - def __init__(self, node_ip_addresses, username, key_file, installation_directory): + def __init__(self, node_ip_addresses, node_private_ip_addresses, username, key_file, installation_directory): """Initialize the RayCluster object. Args: node_ip_addresses (List[str]): A list of the ip addresses of the nodes in the cluster. The first element is the head node and will host the scheduler process. + node_private_ip_addresses (List[str]): A list of the ip addresses that the + nodes use internally to connect to one another. We include this because + on EC2 communication within a security group must be done over private + ip addresses. username (str): The username used to ssh to nodes in the cluster. key_file (str): The path to the key used to ssh to nodes in the cluster. installation_directory (str): The path on the nodes in the cluster to the @@ -43,6 +47,7 @@ class RayCluster(object): """ _check_ip_addresses(node_ip_addresses) self.node_ip_addresses = node_ip_addresses + self.node_private_ip_addresses = node_private_ip_addresses self.username = username self.key_file = key_file self.installation_directory = installation_directory @@ -149,7 +154,7 @@ class RayCluster(object): cd "{}"; source ../setup-env.sh; python -c "import ray; ray.services.start_scheduler(\\\"{}:10001\\\", local=False)" > start_scheduler.out 2> start_scheduler.err < /dev/null & - """.format(scripts_directory, self.node_ip_addresses[0]) + """.format(scripts_directory, self.node_private_ip_addresses[0]) self._run_command_over_ssh(self.node_ip_addresses[0], start_scheduler_command) # Start the workers on each node @@ -160,7 +165,7 @@ class RayCluster(object): cd "{}"; source ../setup-env.sh; python -c "import ray; ray.services.start_node(\\\"{}:10001\\\", \\\"{}\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null & - """.format(scripts_directory, self.node_ip_addresses[0], self.node_ip_addresses[i], num_workers_per_node, remote_worker_path) + """.format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_worker_path) start_workers_commands.append(start_workers_command) self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands) @@ -170,7 +175,7 @@ class RayCluster(object): print """ source "{}"; python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach - """.format(setup_env_path, shell_script_path, self.node_ip_addresses[0], self.node_ip_addresses[0], self.node_ip_addresses[0]) + """.format(setup_env_path, shell_script_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0]) def restart_workers(self, worker_directory, num_workers_per_node=10): """Restart the workers on the cluster. @@ -191,9 +196,10 @@ class RayCluster(object): scripts_directory = os.path.join(self.installation_directory, "ray/scripts") head_node_ip_address = self.node_ip_addresses[0] - scheduler_address = "{}:10001".format(head_node_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray. - objstore_address = "{}:20001".format(head_node_ip_address) # This needs to be the address of the currently running object store, which was presumably created in _start_ray. - shell_address = "{}:{}".format(head_node_ip_address, np.random.randint(30000, 40000)) # This address must be currently unused. In particular, it cannot be the address of any currently running shell. + head_node_private_ip_address = self.node_private_ip_addresses[0] + scheduler_address = "{}:10001".format(head_node_private_ip_address) # This needs to be the address of the currently running scheduler, which was presumably created in _start_ray. + objstore_address = "{}:20001".format(head_node_private_ip_address) # This needs to be the address of the currently running object store, which was presumably created in _start_ray. + shell_address = "{}:{}".format(head_node_private_ip_address, np.random.randint(30000, 40000)) # This address must be currently unused. In particular, it cannot be the address of any currently running shell. # Kill the current workers by attaching a driver to the scheduler and calling ray.kill_workers() # The triple backslashes are used for two rounds of escaping, something like \\\" -> \" -> " @@ -212,7 +218,7 @@ class RayCluster(object): cd "{}"; source ../setup-env.sh; python -c "import ray; ray.services.start_workers(\\\"{}:10001\\\", \\\"{}:20001\\\", {}, worker_path=\\\"{}\\\")" > start_workers.out 2> start_workers.err < /dev/null & - """.format(scripts_directory, self.node_ip_addresses[0], self.node_ip_addresses[i], num_workers_per_node, remote_worker_path) + """.format(scripts_directory, self.node_private_ip_addresses[0], self.node_private_ip_addresses[i], num_workers_per_node, remote_worker_path) start_workers_commands.append(start_workers_command) self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands) @@ -325,6 +331,18 @@ if __name__ == "__main__": username = args.username key_file = args.key_file installation_directory = args.installation_directory - node_ip_addresses = map(lambda s: str(s.strip()), open(args.nodes).readlines()) - cluster = RayCluster(node_ip_addresses, username, key_file, installation_directory) + node_ip_addresses = [] + node_private_ip_addresses = [] + for line in open(args.nodes).readlines(): + parts = line.split(",") + ip_address = str(parts[0].strip()) + if len(parts) == 1: + private_ip_address = ip_address + elif len(parts) == 2: + private_ip_address = str(parts[1].strip()) + else: + raise Exception("Each line in the nodes file must have either one or two ip addresses.") + node_ip_addresses.append(ip_address) + node_private_ip_addresses.append(private_ip_address) + cluster = RayCluster(node_ip_addresses, node_private_ip_addresses, username, key_file, installation_directory) IPython.embed() diff --git a/scripts/ec2.py b/scripts/ec2.py new file mode 100644 index 000000000..56c0e8e94 --- /dev/null +++ b/scripts/ec2.py @@ -0,0 +1,1067 @@ +# This script is copied and adapted from https://github.com/amplab/spark-ec2. + +from __future__ import division, print_function, with_statement + +import codecs +import hashlib +import itertools +import logging +import os +import os.path +import pipes +import random +import shutil +import string +from stat import S_IRUSR +import subprocess +import sys +import tarfile +import tempfile +import textwrap +import time +import warnings +from datetime import datetime +from optparse import OptionParser +from sys import stderr + +import boto +from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType +from boto import ec2 + + +class UsageError(Exception): + pass + + +# Configure and parse our command-line arguments +def parse_args(): + parser = OptionParser( + prog="ec2", + usage="%prog [options] \n\n" + + " can be: launch, destroy, stop, start, get-master, reboot-slaves") + + parser.add_option( + "-s", "--slaves", type="int", default=1, + help="Number of slaves to launch (default: %default)") + parser.add_option( + "-k", "--key-pair", + help="Key pair to use on instances") + parser.add_option( + "-i", "--identity-file", + help="SSH private key file to use for logging into instances") + parser.add_option( + "-p", "--profile", default=None, + help="If you have multiple profiles (AWS or boto config), you can configure " + + "additional, named profiles by using this option (default: %default)") + parser.add_option( + "-t", "--instance-type", default="m3.large", + help="Type of instance to launch (default: %default). " + + "WARNING: must be 64-bit; small instances won't work") + parser.add_option( + "-m", "--master-instance-type", default="", + help="Master instance type (leave empty for same as instance-type)") + parser.add_option( + "-r", "--region", default="us-east-1", + help="EC2 region used to launch instances in, or to find them in (default: %default)") + parser.add_option( + "-z", "--zone", default="", + help="Availability zone to launch instances in, or 'all' to spread " + + "slaves across multiple (an additional $0.01/Gb for bandwidth" + + "between zones applies) (default: a single zone chosen at random)") + parser.add_option( + "-a", "--ami", + help="Amazon Machine Image ID to use") + parser.add_option( + "-D", metavar="[ADDRESS:]PORT", dest="proxy_port", + help="Use SSH dynamic port forwarding to create a SOCKS proxy at " + + "the given local address (for use with login)") + parser.add_option( + "--resume", action="store_true", default=False, + help="Resume installation on a previously launched cluster " + + "(for debugging)") + parser.add_option( + "--ebs-vol-size", metavar="SIZE", type="int", default=0, + help="Size (in GB) of each EBS volume.") + parser.add_option( + "--ebs-vol-type", default="standard", + help="EBS volume type (e.g. 'gp2', 'standard').") + parser.add_option( + "--ebs-vol-num", type="int", default=1, + help="Number of EBS volumes to attach to each node as /vol[x]. " + + "The volumes will be deleted when the instances terminate. " + + "Only possible on EBS-backed AMIs. " + + "EBS volumes are only attached if --ebs-vol-size > 0. " + + "Only support up to 8 EBS volumes.") + parser.add_option( + "--placement-group", type="string", default=None, + help="Which placement group to try and launch " + + "instances into. Assumes placement group is already " + + "created.") + parser.add_option( + "--swap", metavar="SWAP", type="int", default=1024, + help="Swap space to set up per node, in MB (default: %default)") + parser.add_option( + "--spot-price", metavar="PRICE", type="float", + help="If specified, launch slaves as spot instances with the given " + + "maximum price (in dollars)") + parser.add_option( + "-u", "--user", default="ubuntu", + help="The SSH user you want to connect as (default: %default)") + parser.add_option( + "--delete-groups", action="store_true", default=False, + help="When destroying a cluster, delete the security groups that were created") + parser.add_option( + "--use-existing-master", action="store_true", default=False, + help="Launch fresh slaves, but use an existing stopped master if possible") + parser.add_option( + "--authorized-address", type="string", default="0.0.0.0/0", + help="Address to authorize on created security groups (default: %default)") + parser.add_option( + "--additional-security-group", type="string", default="", + help="Additional security group to place the machines in") + parser.add_option( + "--additional-tags", type="string", default="", + help="Additional tags to set on the machines; tags are comma-separated, while name and " + + "value are colon separated; ex: \"Task:MyProject,Env:production\"") + parser.add_option( + "--subnet-id", default=None, + help="VPC subnet to launch instances in") + parser.add_option( + "--vpc-id", default=None, + help="VPC to launch instances in") + parser.add_option( + "--private-ips", action="store_true", default=False, + help="Use private IPs for instances rather than public if VPC/subnet " + + "requires that.") + parser.add_option( + "--instance-initiated-shutdown-behavior", default="stop", + choices=["stop", "terminate"], + help="Whether instances should terminate when shut down or just stop") + parser.add_option( + "--instance-profile-name", default=None, + help="IAM profile name to launch instances under") + + (opts, args) = parser.parse_args() + if len(args) != 2: + parser.print_help() + sys.exit(1) + (action, cluster_name) = args + opts.identity_file = os.path.expanduser(opts.identity_file) + + # Boto config check + # http://boto.cloudhackers.com/en/latest/boto_config_tut.html + home_dir = os.getenv('HOME') + if home_dir is None or not os.path.isfile(home_dir + '/.boto'): + if not os.path.isfile('/etc/boto.cfg'): + # If there is no boto config, check aws credentials + if not os.path.isfile(home_dir + '/.aws/credentials'): + if os.getenv('AWS_ACCESS_KEY_ID') is None: + print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set", + file=stderr) + sys.exit(1) + if os.getenv('AWS_SECRET_ACCESS_KEY') is None: + print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set", + file=stderr) + sys.exit(1) + return (opts, action, cluster_name) + + +# Get the EC2 security group of the given name, creating it if it doesn't exist +def get_or_make_group(conn, name, vpc_id): + groups = conn.get_all_security_groups() + group = [g for g in groups if g.name == name] + if len(group) > 0: + return group[0] + else: + print("Creating security group " + name) + return conn.create_security_group(name, "Ray EC2 group", vpc_id) + +# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/ +# Last Updated: 2015-06-19 +# For easy maintainability, please keep this manually-inputted dictionary sorted by key. +EC2_INSTANCE_TYPES = { + "c1.medium": "pvm", + "c1.xlarge": "pvm", + "c3.large": "hvm", + "c3.xlarge": "hvm", + "c3.2xlarge": "hvm", + "c3.4xlarge": "hvm", + "c3.8xlarge": "hvm", + "c4.large": "hvm", + "c4.xlarge": "hvm", + "c4.2xlarge": "hvm", + "c4.4xlarge": "hvm", + "c4.8xlarge": "hvm", + "cc1.4xlarge": "hvm", + "cc2.8xlarge": "hvm", + "cg1.4xlarge": "hvm", + "cr1.8xlarge": "hvm", + "d2.xlarge": "hvm", + "d2.2xlarge": "hvm", + "d2.4xlarge": "hvm", + "d2.8xlarge": "hvm", + "g2.2xlarge": "hvm", + "g2.8xlarge": "hvm", + "hi1.4xlarge": "pvm", + "hs1.8xlarge": "pvm", + "i2.xlarge": "hvm", + "i2.2xlarge": "hvm", + "i2.4xlarge": "hvm", + "i2.8xlarge": "hvm", + "m1.small": "pvm", + "m1.medium": "pvm", + "m1.large": "pvm", + "m1.xlarge": "pvm", + "m2.xlarge": "pvm", + "m2.2xlarge": "pvm", + "m2.4xlarge": "pvm", + "m3.medium": "hvm", + "m3.large": "hvm", + "m3.xlarge": "hvm", + "m3.2xlarge": "hvm", + "m4.large": "hvm", + "m4.xlarge": "hvm", + "m4.2xlarge": "hvm", + "m4.4xlarge": "hvm", + "m4.10xlarge": "hvm", + "r3.large": "hvm", + "r3.xlarge": "hvm", + "r3.2xlarge": "hvm", + "r3.4xlarge": "hvm", + "r3.8xlarge": "hvm", + "t1.micro": "pvm", + "t2.micro": "hvm", + "t2.small": "hvm", + "t2.medium": "hvm", + "t2.large": "hvm", +} + +# Launch a cluster of the given name, by setting up its security groups, +# and then starting new instances in them. +# Returns a tuple of EC2 reservation objects for the master and slaves +# Fails if there already instances running in the cluster's groups. +def launch_cluster(conn, opts, cluster_name): + if opts.identity_file is None: + print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr) + sys.exit(1) + + if opts.key_pair is None: + print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr) + sys.exit(1) + + user_data_content = None + + print("Setting up security groups...") + master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id) + slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id) + authorized_address = opts.authorized_address + if master_group.rules == []: # Group was just now created + master_group.authorize(src_group=master_group) + master_group.authorize(src_group=slave_group) + master_group.authorize('tcp', 22, 22, authorized_address) + if slave_group.rules == []: # Group was just now created + slave_group.authorize(src_group=master_group) + slave_group.authorize(src_group=slave_group) + slave_group.authorize('tcp', 22, 22, authorized_address) + + # Check if instances are already running in our groups + existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, + die_on_error=False) + if existing_slaves or (existing_masters and not opts.use_existing_master): + print("ERROR: There are already instances running in group %s or %s" % + (master_group.name, slave_group.name), file=stderr) + sys.exit(1) + + # Use the default Ubuntu AMI. + if opts.ami is None: + if opts.region == "us-east-1": + opts.ami = "ami-2d39803a" + elif opts.region == "us-west-1": + opts.ami = "ami-06116566" + elif opts.region == "us-west-2": + opts.ami = "ami-9abea4fb" + elif opts.region == "eu-west-1": + opts.ami = "ami-f95ef58a" + elif opts.region == "eu-central-1": + opts.ami = "ami-87564feb" + elif opts.region == "ap-northeast-1": + opts.ami = "ami-a21529cc" + elif opts.region == "ap-northeast-2": + opts.ami = "ami-09dc1267" + elif opts.region == "ap-southeast-1": + opts.ami = "ami-25c00c46" + elif opts.region == "ap-southeast-2": + opts.ami = "ami-6c14310f" + elif opts.region == "ap-south-1": + opts.ami = "ami-4a90fa25" + elif opts.region == "sa-east-1": + opts.ami = "ami-0fb83963" + else: + raise Exception("The specified region is unknown.") + + # we use group ids to work around https://github.com/boto/boto/issues/350 + additional_group_ids = [] + if opts.additional_security_group: + additional_group_ids = [sg.id + for sg in conn.get_all_security_groups() + if opts.additional_security_group in (sg.name, sg.id)] + print("Launching instances...") + + try: + image = conn.get_all_images(image_ids=[opts.ami])[0] + except: + print("Could not find AMI " + opts.ami, file=stderr) + sys.exit(1) + + # Create block device mapping so that we can add EBS volumes if asked to. + # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz + block_map = BlockDeviceMapping() + if opts.ebs_vol_size > 0: + for i in range(opts.ebs_vol_num): + device = EBSBlockDeviceType() + device.size = opts.ebs_vol_size + device.volume_type = opts.ebs_vol_type + device.delete_on_termination = True + block_map["/dev/sd" + chr(ord('s') + i)] = device + + # AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342). + if opts.instance_type.startswith('m3.'): + for i in range(get_num_disks(opts.instance_type)): + dev = BlockDeviceType() + dev.ephemeral_name = 'ephemeral%d' % i + # The first ephemeral drive is /dev/sdb. + name = '/dev/sd' + string.ascii_letters[i + 1] + block_map[name] = dev + + # Launch slaves + if opts.spot_price is not None: + # Launch spot instances with the requested price + print("Requesting %d slaves as spot instances with price $%.3f" % + (opts.slaves, opts.spot_price)) + zones = get_zones(conn, opts) + num_zones = len(zones) + i = 0 + my_req_ids = [] + for zone in zones: + num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) + slave_reqs = conn.request_spot_instances( + price=opts.spot_price, + image_id=opts.ami, + launch_group="launch-group-%s" % cluster_name, + placement=zone, + count=num_slaves_this_zone, + key_name=opts.key_pair, + security_group_ids=[slave_group.id] + additional_group_ids, + instance_type=opts.instance_type, + block_device_map=block_map, + subnet_id=opts.subnet_id, + placement_group=opts.placement_group, + user_data=user_data_content, + instance_profile_name=opts.instance_profile_name) + my_req_ids += [req.id for req in slave_reqs] + i += 1 + + print("Waiting for spot instances to be granted...") + try: + while True: + time.sleep(10) + reqs = conn.get_all_spot_instance_requests() + id_to_req = {} + for r in reqs: + id_to_req[r.id] = r + active_instance_ids = [] + for i in my_req_ids: + if i in id_to_req and id_to_req[i].state == "active": + active_instance_ids.append(id_to_req[i].instance_id) + if len(active_instance_ids) == opts.slaves: + print("All %d slaves granted" % opts.slaves) + reservations = conn.get_all_reservations(active_instance_ids) + slave_nodes = [] + for r in reservations: + slave_nodes += r.instances + break + else: + print("%d of %d slaves granted, waiting longer" % ( + len(active_instance_ids), opts.slaves)) + except: + print("Canceling spot instance requests") + conn.cancel_spot_instance_requests(my_req_ids) + # Log a warning if any of these requests actually launched instances: + (master_nodes, slave_nodes) = get_existing_cluster( + conn, opts, cluster_name, die_on_error=False) + running = len(master_nodes) + len(slave_nodes) + if running: + print(("WARNING: %d instances are still running" % running), file=stderr) + sys.exit(0) + else: + # Launch non-spot instances + zones = get_zones(conn, opts) + num_zones = len(zones) + i = 0 + slave_nodes = [] + for zone in zones: + num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) + if num_slaves_this_zone > 0: + slave_res = image.run( + key_name=opts.key_pair, + security_group_ids=[slave_group.id] + additional_group_ids, + instance_type=opts.instance_type, + placement=zone, + min_count=num_slaves_this_zone, + max_count=num_slaves_this_zone, + block_device_map=block_map, + subnet_id=opts.subnet_id, + placement_group=opts.placement_group, + user_data=user_data_content, + instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, + instance_profile_name=opts.instance_profile_name) + slave_nodes += slave_res.instances + print("Launched {s} slave{plural_s} in {z}, regid = {r}".format( + s=num_slaves_this_zone, + plural_s=('' if num_slaves_this_zone == 1 else 's'), + z=zone, + r=slave_res.id)) + i += 1 + + # Launch or resume masters + if existing_masters: + print("Starting master...") + for inst in existing_masters: + if inst.state not in ["shutting-down", "terminated"]: + inst.start() + master_nodes = existing_masters + else: + master_type = opts.master_instance_type + if master_type == "": + master_type = opts.instance_type + if opts.zone == 'all': + opts.zone = random.choice(conn.get_all_zones()).name + master_res = image.run( + key_name=opts.key_pair, + security_group_ids=[master_group.id] + additional_group_ids, + instance_type=master_type, + placement=opts.zone, + min_count=1, + max_count=1, + block_device_map=block_map, + subnet_id=opts.subnet_id, + placement_group=opts.placement_group, + user_data=user_data_content, + instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior, + instance_profile_name=opts.instance_profile_name) + + master_nodes = master_res.instances + print("Launched master in %s, regid = %s" % (zone, master_res.id)) + + # This wait time corresponds to SPARK-4983 + print("Waiting for AWS to propagate instance metadata...") + time.sleep(15) + + # Give the instances descriptive names and set additional tags + additional_tags = {} + if opts.additional_tags.strip(): + additional_tags = dict( + map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',') + ) + + for master in master_nodes: + master.add_tags( + dict(additional_tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) + ) + + for slave in slave_nodes: + slave.add_tags( + dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) + ) + + # Return all the instances + return (master_nodes, slave_nodes) + + +def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): + """ + Get the EC2 instances in an existing cluster if available. + Returns a tuple of lists of EC2 instance objects for the masters and slaves. + """ + print("Searching for existing cluster {c} in region {r}...".format( + c=cluster_name, r=opts.region)) + + def get_instances(group_names): + """ + Get all non-terminated instances that belong to any of the provided security groups. + + EC2 reservation filters and instance states are documented here: + http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options + """ + reservations = conn.get_all_reservations( + filters={"instance.group-name": group_names}) + instances = itertools.chain.from_iterable(r.instances for r in reservations) + return [i for i in instances if i.state not in ["shutting-down", "terminated"]] + + master_instances = get_instances([cluster_name + "-master"]) + slave_instances = get_instances([cluster_name + "-slaves"]) + + if any((master_instances, slave_instances)): + print("Found {m} master{plural_m}, {s} slave{plural_s}.".format( + m=len(master_instances), + plural_m=('' if len(master_instances) == 1 else 's'), + s=len(slave_instances), + plural_s=('' if len(slave_instances) == 1 else 's'))) + + if not master_instances and die_on_error: + print("ERROR: Could not find a master for cluster {c} in region {r}.".format( + c=cluster_name, r=opts.region), file=sys.stderr) + sys.exit(1) + + return (master_instances, slave_instances) + + +# Deploy configuration files and run setup scripts on a newly launched +# or started EC2 cluster. +def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): + master = get_dns_name(master_nodes[0], opts.private_ips) + if deploy_ssh_key: + print("Generating cluster's SSH key on master...") + key_setup = """ + [ -f ~/.ssh/id_rsa ] || + (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa && + cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys) + """ + ssh(master, opts, key_setup) + dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) + print("Transferring cluster's SSH key to slaves...") + for slave in slave_nodes: + slave_address = get_dns_name(slave, opts.private_ips) + print(slave_address) + ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) + + +def is_ssh_available(host, opts, print_ssh_output=True): + """ + Check if SSH is available on a host. + """ + s = subprocess.Popen( + ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', + '%s@%s' % (opts.user, host), stringify_command('true')], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order + ) + cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout + + if s.returncode != 0 and print_ssh_output: + # extra leading newline is for spacing in wait_for_cluster_state() + print(textwrap.dedent("""\n + Warning: SSH connection error. (This could be temporary.) + Host: {h} + SSH return code: {r} + SSH output: {o} + """).format( + h=host, + r=s.returncode, + o=cmd_output.strip() + )) + + return s.returncode == 0 + + +def is_cluster_ssh_available(cluster_instances, opts): + """ + Check if SSH is available on all the instances in a cluster. + """ + for i in cluster_instances: + dns_name = get_dns_name(i, opts.private_ips) + if not is_ssh_available(host=dns_name, opts=opts): + return False + else: + return True + + +def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state): + """ + Wait for all the instances in the cluster to reach a designated state. + + cluster_instances: a list of boto.ec2.instance.Instance + cluster_state: a string representing the desired state of all the instances in the cluster + value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as + 'running', 'terminated', etc. + (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250) + """ + sys.stdout.write( + "Waiting for cluster to enter '{s}' state.".format(s=cluster_state) + ) + sys.stdout.flush() + + start_time = datetime.now() + num_attempts = 0 + + while True: + time.sleep(5 * num_attempts) # seconds + + for i in cluster_instances: + i.update() + + max_batch = 100 + statuses = [] + for j in xrange(0, len(cluster_instances), max_batch): + batch = [i.id for i in cluster_instances[j:j + max_batch]] + statuses.extend(conn.get_all_instance_status(instance_ids=batch)) + + if cluster_state == 'ssh-ready': + if all(i.state == 'running' for i in cluster_instances) and \ + all(s.system_status.status == 'ok' for s in statuses) and \ + all(s.instance_status.status == 'ok' for s in statuses) and \ + is_cluster_ssh_available(cluster_instances, opts): + break + else: + if all(i.state == cluster_state for i in cluster_instances): + break + + num_attempts += 1 + + sys.stdout.write(".") + sys.stdout.flush() + + sys.stdout.write("\n") + + end_time = datetime.now() + print("Cluster is now in '{s}' state. Waited {t} seconds.".format( + s=cluster_state, + t=(end_time - start_time).seconds + )) + + +# Get number of local disks available for a given EC2 instance type. +def get_num_disks(instance_type): + # Source: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html + # Last Updated: 2015-06-19 + # For easy maintainability, please keep this manually-inputted dictionary sorted by key. + disks_by_instance = { + "c1.medium": 1, + "c1.xlarge": 4, + "c3.large": 2, + "c3.xlarge": 2, + "c3.2xlarge": 2, + "c3.4xlarge": 2, + "c3.8xlarge": 2, + "c4.large": 0, + "c4.xlarge": 0, + "c4.2xlarge": 0, + "c4.4xlarge": 0, + "c4.8xlarge": 0, + "cc1.4xlarge": 2, + "cc2.8xlarge": 4, + "cg1.4xlarge": 2, + "cr1.8xlarge": 2, + "d2.xlarge": 3, + "d2.2xlarge": 6, + "d2.4xlarge": 12, + "d2.8xlarge": 24, + "g2.2xlarge": 1, + "g2.8xlarge": 2, + "hi1.4xlarge": 2, + "hs1.8xlarge": 24, + "i2.xlarge": 1, + "i2.2xlarge": 2, + "i2.4xlarge": 4, + "i2.8xlarge": 8, + "m1.small": 1, + "m1.medium": 1, + "m1.large": 2, + "m1.xlarge": 4, + "m2.xlarge": 1, + "m2.2xlarge": 1, + "m2.4xlarge": 2, + "m3.medium": 1, + "m3.large": 1, + "m3.xlarge": 2, + "m3.2xlarge": 2, + "m4.large": 0, + "m4.xlarge": 0, + "m4.2xlarge": 0, + "m4.4xlarge": 0, + "m4.10xlarge": 0, + "r3.large": 1, + "r3.xlarge": 1, + "r3.2xlarge": 1, + "r3.4xlarge": 1, + "r3.8xlarge": 2, + "t1.micro": 0, + "t2.micro": 0, + "t2.small": 0, + "t2.medium": 0, + "t2.large": 0, + } + if instance_type in disks_by_instance: + return disks_by_instance[instance_type] + else: + print("WARNING: Don't know number of disks on instance type %s; assuming 1" + % instance_type, file=stderr) + return 1 + + +def stringify_command(parts): + if isinstance(parts, str): + return parts + else: + return ' '.join(map(pipes.quote, parts)) + + +def ssh_args(opts): + parts = ['-o', 'StrictHostKeyChecking=no'] + parts += ['-o', 'UserKnownHostsFile=/dev/null'] + if opts.identity_file is not None: + parts += ['-i', opts.identity_file] + return parts + + +def ssh_command(opts): + return ['ssh'] + ssh_args(opts) + + +# Run a command on a host through ssh, retrying up to five times +# and then throwing an exception if ssh continues to fail. +def ssh(host, opts, command): + tries = 0 + while True: + try: + return subprocess.check_call( + ssh_command(opts) + ['-t', '-t', '%s@%s' % (opts.user, host), + stringify_command(command)]) + except subprocess.CalledProcessError as e: + if tries > 5: + # If this was an ssh failure, provide the user with hints. + if e.returncode == 255: + raise UsageError( + "Failed to SSH to remote host {0}.\n" + "Please check that you have provided the correct --identity-file and " + "--key-pair parameters and try again.".format(host)) + else: + raise e + print("Error executing remote command, retrying after 30 seconds: {0}".format(e), + file=stderr) + time.sleep(30) + tries = tries + 1 + + +# Backported from Python 2.7 for compatiblity with 2.6 (See SPARK-1990) +def _check_output(*popenargs, **kwargs): + if 'stdout' in kwargs: + raise ValueError('stdout argument not allowed, it will be overridden.') + process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs) + output, unused_err = process.communicate() + retcode = process.poll() + if retcode: + cmd = kwargs.get("args") + if cmd is None: + cmd = popenargs[0] + raise subprocess.CalledProcessError(retcode, cmd, output=output) + return output + + +def ssh_read(host, opts, command): + return _check_output( + ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)]) + + +def ssh_write(host, opts, command, arguments): + tries = 0 + while True: + proc = subprocess.Popen( + ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)], + stdin=subprocess.PIPE) + proc.stdin.write(arguments) + proc.stdin.close() + status = proc.wait() + if status == 0: + break + elif tries > 5: + raise RuntimeError("ssh_write failed with error %s" % proc.returncode) + else: + print("Error {0} while executing remote command, retrying after 30 seconds". + format(status), file=stderr) + time.sleep(30) + tries = tries + 1 + + +# Gets a list of zones to launch instances in +def get_zones(conn, opts): + if opts.zone == 'all': + zones = [z.name for z in conn.get_all_zones()] + else: + zones = [opts.zone] + return zones + + +# Gets the number of items in a partition +def get_partition(total, num_partitions, current_partitions): + num_slaves_this_zone = total // num_partitions + if (total % num_partitions) - current_partitions > 0: + num_slaves_this_zone += 1 + return num_slaves_this_zone + + +# Gets the IP address, taking into account the --private-ips flag +def get_ip_address(instance, private_ips=False): + ip = instance.ip_address if not private_ips else \ + instance.private_ip_address + return ip + + +# Gets the DNS name, taking into account the --private-ips flag +def get_dns_name(instance, private_ips=False): + dns = instance.public_dns_name if not private_ips else \ + instance.private_ip_address + if not dns: + raise UsageError("Failed to determine hostname of {0}.\n" + "Please check that you provided --private-ips if " + "necessary".format(instance)) + return dns + + +# Write the public and private ip addresses of the master and slave nodes to a file. This will be used by cluster.py. +def write_public_and_private_ip_addresses_to_file(master_nodes, slave_nodes): + master_public_ip_address = master_nodes[0].ip_address + master_private_ip_address = master_nodes[0].private_ip_address + slave_public_ip_addresses = [node.ip_address for node in slave_nodes] + slave_private_ip_addresses = [node.private_ip_address for node in slave_nodes] + all_public_ip_addresses = [master_public_ip_address] + all_public_ip_addresses.extend(slave_public_ip_addresses) + all_private_ip_addresses = [master_private_ip_address] + all_private_ip_addresses.extend(slave_private_ip_addresses) + + with open("nodes.txt", "w") as f: + for i in range(len(all_public_ip_addresses)): + f.write("{}, {}\n".format(all_public_ip_addresses[i], all_private_ip_addresses[i])) + + +def real_main(): + (opts, action, cluster_name) = parse_args() + + if opts.identity_file is not None: + if not os.path.exists(opts.identity_file): + print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file), + file=stderr) + sys.exit(1) + + file_mode = os.stat(opts.identity_file).st_mode + if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00': + print("ERROR: The identity file must be accessible only by you.", file=stderr) + print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file), + file=stderr) + sys.exit(1) + + if opts.instance_type not in EC2_INSTANCE_TYPES: + print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format( + t=opts.instance_type), file=stderr) + + if opts.master_instance_type != "": + if opts.master_instance_type not in EC2_INSTANCE_TYPES: + print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format( + t=opts.master_instance_type), file=stderr) + # Since we try instance types even if we can't resolve them, we check if they resolve first + # and, if they do, see if they resolve to the same virtualization type. + if opts.instance_type in EC2_INSTANCE_TYPES and \ + opts.master_instance_type in EC2_INSTANCE_TYPES: + if EC2_INSTANCE_TYPES[opts.instance_type] != \ + EC2_INSTANCE_TYPES[opts.master_instance_type]: + print("Error: this script currently does not support having a master and slaves " + "with different AMI virtualization types.", file=stderr) + print("master instance virtualization type: {t}".format( + t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr) + print("slave instance virtualization type: {t}".format( + t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr) + sys.exit(1) + + try: + if opts.profile is None: + conn = ec2.connect_to_region(opts.region) + else: + conn = ec2.connect_to_region(opts.region, profile_name=opts.profile) + except Exception as e: + print((e), file=stderr) + sys.exit(1) + + # Select an AZ at random if it was not specified. + if opts.zone == "": + opts.zone = random.choice(conn.get_all_zones()).name + + if action == "launch": + if opts.slaves <= 0: + print("ERROR: You have to start at least 1 slave", file=sys.stderr) + sys.exit(1) + if opts.resume: + (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + else: + (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) + wait_for_cluster_state( + conn=conn, + opts=opts, + cluster_instances=(master_nodes + slave_nodes), + cluster_state='ssh-ready' + ) + setup_cluster(conn, master_nodes, slave_nodes, opts, True) + + # Write the public and private ip addresses to a file. + write_public_and_private_ip_addresses_to_file(master_nodes, slave_nodes) + + elif action == "destroy": + (master_nodes, slave_nodes) = get_existing_cluster( + conn, opts, cluster_name, die_on_error=False) + + if any(master_nodes + slave_nodes): + print("The following instances will be terminated:") + for inst in master_nodes + slave_nodes: + print("> %s" % get_dns_name(inst, opts.private_ips)) + print("ALL DATA ON ALL NODES WILL BE LOST!!") + + msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) + response = raw_input(msg) + if response == "y": + print("Terminating master...") + for inst in master_nodes: + inst.terminate() + print("Terminating slaves...") + for inst in slave_nodes: + inst.terminate() + + # Delete security groups as well + if opts.delete_groups: + group_names = [cluster_name + "-master", cluster_name + "-slaves"] + wait_for_cluster_state( + conn=conn, + opts=opts, + cluster_instances=(master_nodes + slave_nodes), + cluster_state='terminated' + ) + print("Deleting security groups (this will take some time)...") + attempt = 1 + while attempt <= 3: + print("Attempt %d" % attempt) + groups = [g for g in conn.get_all_security_groups() if g.name in group_names] + success = True + # Delete individual rules in all groups before deleting groups to + # remove dependencies between them + for group in groups: + print("Deleting rules in security group " + group.name) + for rule in group.rules: + for grant in rule.grants: + success &= group.revoke(ip_protocol=rule.ip_protocol, + from_port=rule.from_port, + to_port=rule.to_port, + src_group=grant) + + # Sleep for AWS eventual-consistency to catch up, and for instances + # to terminate + time.sleep(30) # Yes, it does have to be this long :-( + for group in groups: + try: + # It is needed to use group_id to make it work with VPC + conn.delete_security_group(group_id=group.id) + print("Deleted security group %s" % group.name) + except boto.exception.EC2ResponseError: + success = False + print("Failed to delete security group %s" % group.name) + + # Unfortunately, group.revoke() returns True even if a rule was not + # deleted, so this needs to be rerun if something fails + if success: + break + + attempt += 1 + + if not success: + print("Failed to delete all security groups after 3 tries.") + print("Try re-running in a few minutes.") + + elif action == "reboot-slaves": + response = raw_input( + "Are you sure you want to reboot the cluster " + + cluster_name + " slaves?\n" + + "Reboot cluster slaves " + cluster_name + " (y/N): ") + if response == "y": + (master_nodes, slave_nodes) = get_existing_cluster( + conn, opts, cluster_name, die_on_error=False) + print("Rebooting slaves...") + for inst in slave_nodes: + if inst.state not in ["shutting-down", "terminated"]: + print("Rebooting " + inst.id) + inst.reboot() + + elif action == "get-master": + (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + if not master_nodes[0].public_dns_name and not opts.private_ips: + print("Master has no public DNS name. Maybe you meant to specify --private-ips?") + else: + print(get_dns_name(master_nodes[0], opts.private_ips)) + + elif action == "stop": + response = raw_input( + "Are you sure you want to stop the cluster " + + cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " + + "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" + + "AMAZON EBS IF IT IS EBS-BACKED!!\n" + + "All data on spot-instance slaves will be lost.\n" + + "Stop cluster " + cluster_name + " (y/N): ") + if response == "y": + (master_nodes, slave_nodes) = get_existing_cluster( + conn, opts, cluster_name, die_on_error=False) + print("Stopping master...") + for inst in master_nodes: + if inst.state not in ["shutting-down", "terminated"]: + inst.stop() + print("Stopping slaves...") + for inst in slave_nodes: + if inst.state not in ["shutting-down", "terminated"]: + if inst.spot_instance_request_id: + inst.terminate() + else: + inst.stop() + + elif action == "start": + (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) + print("Starting slaves...") + for inst in slave_nodes: + if inst.state not in ["shutting-down", "terminated"]: + inst.start() + print("Starting master...") + for inst in master_nodes: + if inst.state not in ["shutting-down", "terminated"]: + inst.start() + wait_for_cluster_state( + conn=conn, + opts=opts, + cluster_instances=(master_nodes + slave_nodes), + cluster_state='ssh-ready' + ) + + # Determine types of running instances + existing_master_type = master_nodes[0].instance_type + existing_slave_type = slave_nodes[0].instance_type + # Setting opts.master_instance_type to the empty string indicates we + # have the same instance type for the master and the slaves + if existing_master_type == existing_slave_type: + existing_master_type = "" + opts.master_instance_type = existing_master_type + opts.instance_type = existing_slave_type + + setup_cluster(conn, master_nodes, slave_nodes, opts, False) + + # Write the public and private ip addresses to a file. + write_public_and_private_ip_addresses_to_file(master_nodes, slave_nodes) + + else: + print("Invalid action: %s" % action, file=stderr) + sys.exit(1) + + +def main(): + try: + real_main() + except UsageError as e: + print("\nError:\n", e, file=stderr) + sys.exit(1) + + +if __name__ == "__main__": + logging.basicConfig() + main()