mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 23:39:37 +08:00
[autoscaling] increase connect timeout, boto retries, and check subnet conf (#1422)
* some autoscaling config tweaks
* Sun Jan 14 13:56:55 PST 2018
* Mon Jan 15 14:21:09 PST 2018
* increase backoff
* Mon Jan 15 14:40:47 PST 2018
* check boto version
This commit is contained in:
@@ -2,26 +2,35 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
from distutils.version import StrictVersion
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
import boto3
|
||||
from botocore.config import Config
|
||||
|
||||
from ray.ray_constants import BOTO_MAX_RETRIES
|
||||
|
||||
RAY = "ray-autoscaler"
|
||||
DEFAULT_RAY_INSTANCE_PROFILE = RAY
|
||||
DEFAULT_RAY_IAM_ROLE = RAY
|
||||
SECURITY_GROUP_TEMPLATE = RAY + "-{}"
|
||||
|
||||
assert StrictVersion(boto3.__version__) >= StrictVersion("1.4.8"), \
|
||||
"Boto3 version >= 1.4.8 required, try `pip install -U boto3`"
|
||||
|
||||
def key_pair(i):
|
||||
|
||||
def key_pair(i, region):
|
||||
"""Returns the ith default (aws_key_pair_name, key_pair_path)."""
|
||||
if i == 0:
|
||||
return RAY, os.path.expanduser("~/.ssh/{}.pem".format(RAY))
|
||||
return (
|
||||
"{}_{}".format(RAY, region),
|
||||
os.path.expanduser("~/.ssh/{}_{}.pem".format(RAY, region)))
|
||||
return (
|
||||
"{}_{}".format(RAY, i),
|
||||
os.path.expanduser("~/.ssh/{}_{}.pem".format(RAY, i)))
|
||||
"{}_{}_{}".format(RAY, i, region),
|
||||
os.path.expanduser("~/.ssh/{}_{}_{}.pem".format(RAY, i, region)))
|
||||
|
||||
|
||||
# Suppress excessive connection dropped logs from boto
|
||||
@@ -103,7 +112,7 @@ def _configure_key_pair(config):
|
||||
|
||||
# Try a few times to get or create a good key pair.
|
||||
for i in range(10):
|
||||
key_name, key_path = key_pair(i)
|
||||
key_name, key_path = key_pair(i, config["provider"]["region"])
|
||||
key = _get_key(key_name, config)
|
||||
|
||||
# Found a good key.
|
||||
@@ -135,14 +144,16 @@ def _configure_key_pair(config):
|
||||
def _configure_subnet(config):
|
||||
ec2 = _resource("ec2", config)
|
||||
subnets = sorted(
|
||||
[s for s in ec2.subnets.all() if s.state == "available"],
|
||||
[s for s in ec2.subnets.all()
|
||||
if s.state == "available" and s.map_public_ip_on_launch],
|
||||
reverse=True, # sort from Z-A
|
||||
key=lambda subnet: subnet.availability_zone)
|
||||
if not subnets:
|
||||
raise Exception(
|
||||
"No subnets found, try manually creating an instance in "
|
||||
"No usable subnets found, try manually creating an instance in "
|
||||
"your specified region to populate the list of subnets "
|
||||
"and trying this again.")
|
||||
"and trying this again. Note that the subnet must map public IPs "
|
||||
"on instance launch.")
|
||||
if "availability_zone" in config["provider"]:
|
||||
default_subnet = next((s for s in subnets
|
||||
if s.availability_zone ==
|
||||
@@ -150,7 +161,7 @@ def _configure_subnet(config):
|
||||
None)
|
||||
if not default_subnet:
|
||||
raise Exception(
|
||||
"No available subnets matching availability zone {} "
|
||||
"No usable subnets matching availability zone {} "
|
||||
"found. Choose a different availability zone or try "
|
||||
"manually creating an instance in your specified region "
|
||||
"to populate the list of subnets and trying this again."
|
||||
@@ -159,11 +170,15 @@ def _configure_subnet(config):
|
||||
default_subnet = subnets[0]
|
||||
|
||||
if "SubnetId" not in config["head_node"]:
|
||||
assert default_subnet.map_public_ip_on_launch, \
|
||||
"The chosen subnet must map nodes with public IPs on launch"
|
||||
config["head_node"]["SubnetId"] = default_subnet.id
|
||||
print("SubnetId not specified for head node, using {} in {}".format(
|
||||
default_subnet.id, default_subnet.availability_zone))
|
||||
|
||||
if "SubnetId" not in config["worker_nodes"]:
|
||||
assert default_subnet.map_public_ip_on_launch, \
|
||||
"The chosen subnet must map nodes with public IPs on launch"
|
||||
config["worker_nodes"]["SubnetId"] = default_subnet.id
|
||||
print("SubnetId not specified for workers, using {} in {}".format(
|
||||
default_subnet.id, default_subnet.availability_zone))
|
||||
@@ -260,8 +275,11 @@ def _get_key(key_name, config):
|
||||
|
||||
|
||||
def _client(name, config):
|
||||
return boto3.client(name, config["provider"]["region"])
|
||||
boto_config = Config(retries=dict(max_attempts=BOTO_MAX_RETRIES))
|
||||
return boto3.client(name, config["provider"]["region"], config=boto_config)
|
||||
|
||||
|
||||
def _resource(name, config):
|
||||
return boto3.resource(name, config["provider"]["region"])
|
||||
boto_config = Config(retries=dict(max_attempts=BOTO_MAX_RETRIES))
|
||||
return boto3.resource(
|
||||
name, config["provider"]["region"], config=boto_config)
|
||||
|
||||
@@ -3,15 +3,19 @@ from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import boto3
|
||||
from botocore.config import Config
|
||||
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME
|
||||
from ray.ray_constants import BOTO_MAX_RETRIES
|
||||
|
||||
|
||||
class AWSNodeProvider(NodeProvider):
|
||||
def __init__(self, provider_config, cluster_name):
|
||||
NodeProvider.__init__(self, provider_config, cluster_name)
|
||||
self.ec2 = boto3.resource("ec2", region_name=provider_config["region"])
|
||||
config = Config(retries=dict(max_attempts=BOTO_MAX_RETRIES))
|
||||
self.ec2 = boto3.resource(
|
||||
"ec2", region_name=provider_config["region"], config=config)
|
||||
|
||||
# Cache of node objects from the last nodes() call. This avoids
|
||||
# excessive DescribeInstances requests.
|
||||
|
||||
@@ -90,7 +90,7 @@ class NodeUpdater(object):
|
||||
self.ssh_ip = self.provider.external_ip(self.node_id)
|
||||
if self.ssh_ip is not None:
|
||||
break
|
||||
time.sleep(5)
|
||||
time.sleep(10)
|
||||
assert self.ssh_ip is not None, "Unable to find IP of node"
|
||||
|
||||
# Wait for SSH access
|
||||
@@ -135,7 +135,7 @@ class NodeUpdater(object):
|
||||
"mkdir -p {}".format(os.path.dirname(remote_path)))
|
||||
self.process_runner.check_call([
|
||||
"rsync", "-e", "ssh -i {} ".format(self.ssh_private_key) +
|
||||
"-o ConnectTimeout=60s -o StrictHostKeyChecking=no",
|
||||
"-o ConnectTimeout=120s -o StrictHostKeyChecking=no",
|
||||
"--delete", "-avz", "{}".format(local_path),
|
||||
"{}@{}:{}".format(self.ssh_user, self.ssh_ip, remote_path)
|
||||
], stdout=self.stdout, stderr=self.stderr)
|
||||
@@ -146,7 +146,7 @@ class NodeUpdater(object):
|
||||
for cmd in self.setup_cmds:
|
||||
self.ssh_cmd(cmd, verbose=True)
|
||||
|
||||
def ssh_cmd(self, cmd, connect_timeout=60, redirect=None, verbose=False):
|
||||
def ssh_cmd(self, cmd, connect_timeout=120, redirect=None, verbose=False):
|
||||
if verbose:
|
||||
print(
|
||||
"NodeUpdater: running {} on {}...".format(
|
||||
|
||||
@@ -18,3 +18,6 @@ AUTOSCALER_UPDATE_INTERVAL_S = 5
|
||||
# The autoscaler will attempt to restart Ray on nodes it hasn't heard from
|
||||
# in more than this interval.
|
||||
AUTOSCALER_HEARTBEAT_TIMEOUT_S = 30
|
||||
|
||||
# Max number of retries to AWS (default is 5, time increases exponentially)
|
||||
BOTO_MAX_RETRIES = 12
|
||||
|
||||
Reference in New Issue
Block a user