From 30684446a676894bc82f28bb0658c62ba4082e21 Mon Sep 17 00:00:00 2001 From: Adam Gleave Date: Tue, 19 Jun 2018 20:22:07 -0700 Subject: [PATCH] Support multiple availability zones in AWS (fix #2177) (#2254) * AWS: support multiple availability zones (fix #2177) * Bugfix: [] rather than () * Test config * Test config tweaks * Remove test config * Formatting fixes * Update YAML config --- python/ray/autoscaler/aws/config.py | 42 +++++++++------------ python/ray/autoscaler/aws/example-full.yaml | 5 ++- python/ray/autoscaler/aws/node_provider.py | 11 ++++++ python/ray/ray_constants.py | 2 +- 4 files changed, 33 insertions(+), 27 deletions(-) diff --git a/python/ray/autoscaler/aws/config.py b/python/ray/autoscaler/aws/config.py index 97d333783..beaa38ee2 100644 --- a/python/ray/autoscaler/aws/config.py +++ b/python/ray/autoscaler/aws/config.py @@ -156,33 +156,25 @@ def _configure_subnet(config): "and trying this again. Note that the subnet must map public IPs " "on instance launch.") if "availability_zone" in config["provider"]: - default_subnet = next(( - s for s in subnets - if s.availability_zone == config["provider"]["availability_zone"]), - None) - if not default_subnet: + azs = config["provider"]["availability_zone"].split(',') + subnets = [s for s in subnets if s.availability_zone in azs] + if not subnets: raise Exception( "No usable subnets matching availability zone {} " "found. Choose a different availability zone or try " "manually creating an instance in your specified region " "to populate the list of subnets and trying this again." .format(config["provider"]["availability_zone"])) - else: - default_subnet = subnets[0] - if "SubnetId" not in config["head_node"]: - assert default_subnet.map_public_ip_on_launch, \ - "The chosen subnet must map nodes with public IPs on launch" - config["head_node"]["SubnetId"] = default_subnet.id - print("SubnetId not specified for head node, using {} in {}".format( - default_subnet.id, default_subnet.availability_zone)) + subnet_ids = [s.subnet_id for s in subnets] + subnet_descr = [(s.subnet_id, s.availability_zone) for s in subnets] + if "SubnetIds" not in config["head_node"]: + config["head_node"]["SubnetIds"] = subnet_ids + print("SubnetIds not specified for head node, using ", subnet_descr) - if "SubnetId" not in config["worker_nodes"]: - assert default_subnet.map_public_ip_on_launch, \ - "The chosen subnet must map nodes with public IPs on launch" - config["worker_nodes"]["SubnetId"] = default_subnet.id - print("SubnetId not specified for workers, using {} in {}".format( - default_subnet.id, default_subnet.availability_zone)) + if "SubnetIds" not in config["worker_nodes"]: + config["worker_nodes"]["SubnetIds"] = subnet_ids + print("SubnetId not specified for workers, using ", subnet_descr) return config @@ -193,8 +185,8 @@ def _configure_security_group(config): return config # have user-defined groups group_name = SECURITY_GROUP_TEMPLATE.format(config["cluster_name"]) - subnet = _get_subnet_or_die(config, config["worker_nodes"]["SubnetId"]) - security_group = _get_security_group(config, subnet.vpc_id, group_name) + vpc_id = _get_vpc_id_or_die(config, config["worker_nodes"]["SubnetIds"][0]) + security_group = _get_security_group(config, vpc_id, group_name) if security_group is None: print("Creating new security group {}".format(group_name)) @@ -202,8 +194,8 @@ def _configure_security_group(config): client.create_security_group( Description="Auto-created security group for Ray workers", GroupName=group_name, - VpcId=subnet.vpc_id) - security_group = _get_security_group(config, subnet.vpc_id, group_name) + VpcId=vpc_id) + security_group = _get_security_group(config, vpc_id, group_name) assert security_group, "Failed to create security group" if not security_group.ip_permissions: @@ -236,7 +228,7 @@ def _configure_security_group(config): return config -def _get_subnet_or_die(config, subnet_id): +def _get_vpc_id_or_die(config, subnet_id): ec2 = _resource("ec2", config) subnet = list( ec2.subnets.filter(Filters=[{ @@ -245,7 +237,7 @@ def _get_subnet_or_die(config, subnet_id): }])) assert len(subnet) == 1, "Subnet not found" subnet = subnet[0] - return subnet + return subnet.vpc_id def _get_security_group(config, vpc_id, group_name): diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml index e6fba5c38..9d1f581ef 100644 --- a/python/ray/autoscaler/aws/example-full.yaml +++ b/python/ray/autoscaler/aws/example-full.yaml @@ -30,7 +30,10 @@ idle_timeout_minutes: 5 provider: type: aws region: us-west-2 - availability_zone: us-west-2a + # Availability zone(s), comma-separated, that nodes may be launched in. + # Nodes are currently spread between zones by a round-robin approach, + # however this implementation detail should not be relied upon. + availability_zone: us-west-2a,us-west-2b # How Ray will authenticate with newly launched nodes. auth: diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index ced3601cd..3fb19632a 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -2,6 +2,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import random + import boto3 from botocore.config import Config @@ -35,6 +37,9 @@ class AWSNodeProvider(NodeProvider): self.ec2 = boto3.resource( "ec2", region_name=provider_config["region"], config=config) + # Try availability zones round-robin, starting from random offset + self.subnet_idx = random.randint(0, 100) + # Cache of node objects from the last nodes() call. This avoids # excessive DescribeInstances requests. self.cached_nodes = {} @@ -121,9 +126,15 @@ class AWSNodeProvider(NodeProvider): "Key": k, "Value": v, }) + # SubnetIds is not a real config key: we must resolve to a + # single SubnetId before invoking the AWS API. + subnet_ids = conf.pop("SubnetIds") + subnet_id = subnet_ids[self.subnet_idx % len(subnet_ids)] + self.subnet_idx += 1 conf.update({ "MinCount": 1, "MaxCount": count, + "SubnetId": subnet_id, "TagSpecifications": conf.get("TagSpecifications", []) + [{ "ResourceType": "instance", "Tags": tag_pairs, diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 9925f08a7..ada9a76aa 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -8,7 +8,7 @@ import os def env_integer(key, default): if key in os.environ: - return int(os.environ(key)) + return int(os.environ[key]) return default