mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 14:48:54 +08:00
* AWS: support multiple availability zones (fix #2177) * Bugfix: [] rather than () * Test config * Test config tweaks * Remove test config * Formatting fixes * Update YAML config
This commit is contained in:
@@ -156,33 +156,25 @@ def _configure_subnet(config):
|
||||
"and trying this again. Note that the subnet must map public IPs "
|
||||
"on instance launch.")
|
||||
if "availability_zone" in config["provider"]:
|
||||
default_subnet = next((
|
||||
s for s in subnets
|
||||
if s.availability_zone == config["provider"]["availability_zone"]),
|
||||
None)
|
||||
if not default_subnet:
|
||||
azs = config["provider"]["availability_zone"].split(',')
|
||||
subnets = [s for s in subnets if s.availability_zone in azs]
|
||||
if not subnets:
|
||||
raise Exception(
|
||||
"No usable subnets matching availability zone {} "
|
||||
"found. Choose a different availability zone or try "
|
||||
"manually creating an instance in your specified region "
|
||||
"to populate the list of subnets and trying this again."
|
||||
.format(config["provider"]["availability_zone"]))
|
||||
else:
|
||||
default_subnet = subnets[0]
|
||||
|
||||
if "SubnetId" not in config["head_node"]:
|
||||
assert default_subnet.map_public_ip_on_launch, \
|
||||
"The chosen subnet must map nodes with public IPs on launch"
|
||||
config["head_node"]["SubnetId"] = default_subnet.id
|
||||
print("SubnetId not specified for head node, using {} in {}".format(
|
||||
default_subnet.id, default_subnet.availability_zone))
|
||||
subnet_ids = [s.subnet_id for s in subnets]
|
||||
subnet_descr = [(s.subnet_id, s.availability_zone) for s in subnets]
|
||||
if "SubnetIds" not in config["head_node"]:
|
||||
config["head_node"]["SubnetIds"] = subnet_ids
|
||||
print("SubnetIds not specified for head node, using ", subnet_descr)
|
||||
|
||||
if "SubnetId" not in config["worker_nodes"]:
|
||||
assert default_subnet.map_public_ip_on_launch, \
|
||||
"The chosen subnet must map nodes with public IPs on launch"
|
||||
config["worker_nodes"]["SubnetId"] = default_subnet.id
|
||||
print("SubnetId not specified for workers, using {} in {}".format(
|
||||
default_subnet.id, default_subnet.availability_zone))
|
||||
if "SubnetIds" not in config["worker_nodes"]:
|
||||
config["worker_nodes"]["SubnetIds"] = subnet_ids
|
||||
print("SubnetId not specified for workers, using ", subnet_descr)
|
||||
|
||||
return config
|
||||
|
||||
@@ -193,8 +185,8 @@ def _configure_security_group(config):
|
||||
return config # have user-defined groups
|
||||
|
||||
group_name = SECURITY_GROUP_TEMPLATE.format(config["cluster_name"])
|
||||
subnet = _get_subnet_or_die(config, config["worker_nodes"]["SubnetId"])
|
||||
security_group = _get_security_group(config, subnet.vpc_id, group_name)
|
||||
vpc_id = _get_vpc_id_or_die(config, config["worker_nodes"]["SubnetIds"][0])
|
||||
security_group = _get_security_group(config, vpc_id, group_name)
|
||||
|
||||
if security_group is None:
|
||||
print("Creating new security group {}".format(group_name))
|
||||
@@ -202,8 +194,8 @@ def _configure_security_group(config):
|
||||
client.create_security_group(
|
||||
Description="Auto-created security group for Ray workers",
|
||||
GroupName=group_name,
|
||||
VpcId=subnet.vpc_id)
|
||||
security_group = _get_security_group(config, subnet.vpc_id, group_name)
|
||||
VpcId=vpc_id)
|
||||
security_group = _get_security_group(config, vpc_id, group_name)
|
||||
assert security_group, "Failed to create security group"
|
||||
|
||||
if not security_group.ip_permissions:
|
||||
@@ -236,7 +228,7 @@ def _configure_security_group(config):
|
||||
return config
|
||||
|
||||
|
||||
def _get_subnet_or_die(config, subnet_id):
|
||||
def _get_vpc_id_or_die(config, subnet_id):
|
||||
ec2 = _resource("ec2", config)
|
||||
subnet = list(
|
||||
ec2.subnets.filter(Filters=[{
|
||||
@@ -245,7 +237,7 @@ def _get_subnet_or_die(config, subnet_id):
|
||||
}]))
|
||||
assert len(subnet) == 1, "Subnet not found"
|
||||
subnet = subnet[0]
|
||||
return subnet
|
||||
return subnet.vpc_id
|
||||
|
||||
|
||||
def _get_security_group(config, vpc_id, group_name):
|
||||
|
||||
@@ -30,7 +30,10 @@ idle_timeout_minutes: 5
|
||||
provider:
|
||||
type: aws
|
||||
region: us-west-2
|
||||
availability_zone: us-west-2a
|
||||
# Availability zone(s), comma-separated, that nodes may be launched in.
|
||||
# Nodes are currently spread between zones by a round-robin approach,
|
||||
# however this implementation detail should not be relied upon.
|
||||
availability_zone: us-west-2a,us-west-2b
|
||||
|
||||
# How Ray will authenticate with newly launched nodes.
|
||||
auth:
|
||||
|
||||
@@ -2,6 +2,8 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import random
|
||||
|
||||
import boto3
|
||||
from botocore.config import Config
|
||||
|
||||
@@ -35,6 +37,9 @@ class AWSNodeProvider(NodeProvider):
|
||||
self.ec2 = boto3.resource(
|
||||
"ec2", region_name=provider_config["region"], config=config)
|
||||
|
||||
# Try availability zones round-robin, starting from random offset
|
||||
self.subnet_idx = random.randint(0, 100)
|
||||
|
||||
# Cache of node objects from the last nodes() call. This avoids
|
||||
# excessive DescribeInstances requests.
|
||||
self.cached_nodes = {}
|
||||
@@ -121,9 +126,15 @@ class AWSNodeProvider(NodeProvider):
|
||||
"Key": k,
|
||||
"Value": v,
|
||||
})
|
||||
# SubnetIds is not a real config key: we must resolve to a
|
||||
# single SubnetId before invoking the AWS API.
|
||||
subnet_ids = conf.pop("SubnetIds")
|
||||
subnet_id = subnet_ids[self.subnet_idx % len(subnet_ids)]
|
||||
self.subnet_idx += 1
|
||||
conf.update({
|
||||
"MinCount": 1,
|
||||
"MaxCount": count,
|
||||
"SubnetId": subnet_id,
|
||||
"TagSpecifications": conf.get("TagSpecifications", []) + [{
|
||||
"ResourceType": "instance",
|
||||
"Tags": tag_pairs,
|
||||
|
||||
@@ -8,7 +8,7 @@ import os
|
||||
|
||||
def env_integer(key, default):
|
||||
if key in os.environ:
|
||||
return int(os.environ(key))
|
||||
return int(os.environ[key])
|
||||
return default
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user