[autoscaler] Refactor code in preparation for multi instance type support (#8632)

* wip refactor

* add util

* wip

* fix

* fix

* remove

* remove extraneous string type for sg
This commit is contained in:
Eric Liang
2020-06-03 12:53:55 -07:00
committed by GitHub
parent 1e4a1360fd
commit a24d117c68
9 changed files with 415 additions and 365 deletions
+18 -336
View File
@@ -1,8 +1,6 @@
from collections import defaultdict
import copy
import hashlib
import json
import jsonschema
import logging
import math
import numpy as np
@@ -12,252 +10,24 @@ import threading
import time
import yaml
import ray
from ray.autoscaler.docker import dockerize_if_needed
from ray.autoscaler.node_provider import get_node_provider, \
get_default_config
from ray.autoscaler.node_provider import get_node_provider
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE,
TAG_RAY_NODE_NAME, STATUS_UP_TO_DATE,
STATUS_UNINITIALIZED, NODE_TYPE_WORKER)
STATUS_UP_TO_DATE, NODE_TYPE_WORKER)
from ray.autoscaler.updater import NodeUpdaterThread
from ray.autoscaler.node_launcher import NodeLauncher
from ray.autoscaler.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf
from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \
AUTOSCALER_RESOURCE_REQUEST_CHANNEL, MEMORY_RESOURCE_UNIT_BYTES
AUTOSCALER_RESOURCE_REQUEST_CHANNEL
import ray.services as services
from ray.worker import global_worker
from six.moves import queue
logger = logging.getLogger(__name__)
REQUIRED, OPTIONAL = True, False
RAY_SCHEMA_PATH = os.path.join(
os.path.dirname(ray.autoscaler.__file__), "ray-schema.json")
NODE_TYPE_CONFIG_KEYS = {"workers": "worker_nodes", "head": "head_node"}
class LoadMetrics:
    """Per-node load bookkeeping for the autoscaler.

    The state kept here is refreshed through ``update`` (fed from raylet
    heartbeats). The autoscaler reads it to decide when to scale up and
    which nodes can be removed.
    """

    def __init__(self):
        self.last_used_time_by_ip = {}
        self.last_heartbeat_time_by_ip = {}
        self.static_resources_by_ip = {}
        self.dynamic_resources_by_ip = {}
        self.resource_load_by_ip = {}
        self.local_ip = services.get_node_ip_address()

    def update(self, ip, static_resources, dynamic_resources, resource_load):
        """Store the latest heartbeat data reported for the node at ``ip``.

        Args:
            ip: Address used as the key for all per-node mappings.
            static_resources: Mapping of resource name to total capacity.
            dynamic_resources: Mapping of resource name to available amount.
            resource_load: Mapping of resource name to queued demand.
        """
        self.resource_load_by_ip[ip] = resource_load
        self.static_resources_by_ip[ip] = static_resources

        # A heartbeat omits resources whose availability is zero, so any
        # static resource missing from the report is filled in as 0.0.
        available = dynamic_resources.copy()
        for resource_name in static_resources:
            available.setdefault(resource_name, 0.0)
        self.dynamic_resources_by_ip[ip] = available

        timestamp = time.time()
        first_report = ip not in self.last_used_time_by_ip
        # A node counts as "used" whenever its reported availability differs
        # from its capacity (or when it is seen for the first time).
        if first_report or static_resources != dynamic_resources:
            self.last_used_time_by_ip[ip] = timestamp
        self.last_heartbeat_time_by_ip[ip] = timestamp

    def mark_active(self, ip):
        """Refresh the heartbeat timestamp of ``ip`` to the current time."""
        assert ip is not None, "IP should be known at this time"
        logger.info("Node {} is newly setup, treating as active".format(ip))
        self.last_heartbeat_time_by_ip[ip] = time.time()

    def prune_active_ips(self, active_ips):
        """Forget every tracked ip that is not in ``active_ips``.

        The local ip is always kept.
        """
        active_ips = set(active_ips)
        active_ips.add(self.local_ip)

        tracked_mappings = (
            self.last_used_time_by_ip,
            self.static_resources_by_ip,
            self.dynamic_resources_by_ip,
            self.resource_load_by_ip,
            self.last_heartbeat_time_by_ip,
        )
        for mapping in tracked_mappings:
            stale_ips = set(mapping) - active_ips
            for stale_ip in stale_ips:
                logger.info("LoadMetrics: "
                            "Removed mapping: {} - {}".format(
                                stale_ip, mapping[stale_ip]))
                del mapping[stale_ip]
            if stale_ips:
                logger.info(
                    "LoadMetrics: "
                    "Removed {} stale ip mappings: {} not in {}".format(
                        len(stale_ips), stale_ips, active_ips))
            assert not (stale_ips & set(mapping))

    def approx_workers_used(self):
        """Return the rounded "NumNodesUsed" figure from ``_info``."""
        return self._info()["NumNodesUsed"]

    def num_workers_connected(self):
        """Return the "NumNodesConnected" figure from ``_info``."""
        return self._info()["NumNodesConnected"]

    def get_resource_usage(self):
        """Aggregate usage over all tracked nodes.

        Returns:
            A tuple ``(nodes_used, resources_used, resources_total)`` where
            the two dicts map resource name to the summed used / total
            amount across nodes.
        """
        node_count = len(self.static_resources_by_ip)
        nodes_used = 0.0
        nonidle_count = 0
        saw_saturated_node = False
        resources_used = {}
        resources_total = {}

        for ip, capacities in self.static_resources_by_ip.items():
            available = self.dynamic_resources_by_ip[ip]
            queued = self.resource_load_by_ip[ip]

            node_frac = 0.0
            if any(load > 0 for load in queued.values()):
                saw_saturated_node = True
                node_frac = 1.0  # the resource is saturated

            for resource_id, capacity in capacities.items():
                used = capacity - available[resource_id]
                resources_used[resource_id] = (
                    resources_used.get(resource_id, 0.0) + used)
                resources_total[resource_id] = (
                    resources_total.get(resource_id, 0.0) + capacity)
                # Only the per-node fraction is clamped; the totals above
                # keep the raw difference.
                used = max(0, used)
                if capacity > 0:
                    node_frac = max(node_frac, used / float(capacity))

            nodes_used += node_frac
            if node_frac > 0:
                nonidle_count += 1

        # If any nodes have a queue buildup, assume all non-idle nodes are 100%
        # busy, plus the head node. This guards against the case of not scaling
        # up due to poor task packing.
        if saw_saturated_node:
            nodes_used = min(nonidle_count + 1.0, node_count)

        return nodes_used, resources_used, resources_total

    def info_string(self):
        """Render ``_info`` as a comma-separated ``key=value`` string."""
        pairs = sorted(self._info().items())
        return ", ".join(["{}={}".format(k, v) for k, v in pairs])

    def _info(self):
        """Build the dict of summary statistics used for reporting."""
        nodes_used, resources_used, resources_total = self.get_resource_usage()

        now = time.time()
        idle_times = [now - t for t in self.last_used_time_by_ip.values()]
        heartbeat_times = [
            now - t for t in self.last_heartbeat_time_by_ip.values()
        ]
        # The five oldest heartbeats, reported as seconds elapsed.
        oldest_heartbeats = sorted(
            self.last_heartbeat_time_by_ip.items(),
            key=lambda pair: pair[1])[:5]
        most_delayed_heartbeats = {
            ip: (now - t)
            for ip, t in oldest_heartbeats
        }

        def format_resource(key, value):
            if key not in ["object_store_memory", "memory"]:
                return round(value, 2)
            return "{} GiB".format(
                round(value * MEMORY_RESOURCE_UNIT_BYTES / 1e9, 2))

        def min_mean_max(samples):
            # -1 is the placeholder for "no samples".
            if samples:
                stats = (int(np.min(samples)), int(np.mean(samples)),
                         int(np.max(samples)))
            else:
                stats = (-1, -1, -1)
            return "Min={} Mean={} Max={}".format(*stats)

        usage_parts = [
            "{}/{} {}".format(
                format_resource(rid, resources_used[rid]),
                format_resource(rid, resources_total[rid]), rid)
            for rid in sorted(resources_used)
        ]

        return {
            "ResourceUsage": ", ".join(usage_parts),
            "NumNodesConnected": len(self.static_resources_by_ip),
            "NumNodesUsed": round(nodes_used, 2),
            "NodeIdleSeconds": min_mean_max(idle_times),
            "TimeSinceLastHeartbeat": min_mean_max(heartbeat_times),
            "MostDelayedHeartbeats": most_delayed_heartbeats,
        }
class NodeLauncher(threading.Thread):
    """Background thread that creates worker nodes requested via a queue."""

    def __init__(self, provider, queue, pending, index=None, *args, **kwargs):
        self.queue = queue
        self.pending = pending
        self.provider = provider
        # The index is only used to label log lines.
        if index is None:
            self.index = ""
        else:
            self.index = str(index)
        super().__init__(*args, **kwargs)

    def _launch_node(self, config, count):
        """Ask the provider for ``count`` new worker nodes."""
        tag_filters = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER}
        nodes_before = self.provider.non_terminated_nodes(
            tag_filters=tag_filters)
        launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"])
        self.log("Launching {} nodes.".format(count))

        node_config = config["worker_nodes"]
        node_tags = {
            TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]),
            TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER,
            TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED,
            TAG_RAY_LAUNCH_CONFIG: launch_hash,
        }
        self.provider.create_node(node_config, node_tags, count)

        nodes_after = self.provider.non_terminated_nodes(
            tag_filters=tag_filters)
        if set(nodes_after) <= set(nodes_before):
            self.log("No new nodes reported after node creation.")

    def run(self):
        """Serve launch requests from the queue forever."""
        while True:
            config, count = self.queue.get()
            self.log("Got {} nodes to launch.".format(count))
            try:
                self._launch_node(config, count)
            except Exception:
                logger.exception("Launch failed")
            finally:
                # The request is no longer pending, whether it worked or not.
                self.pending.dec(count)

    def log(self, statement):
        """Log ``statement`` at info level, prefixed with this launcher's id."""
        logger.info("NodeLauncher{}:".format(self.index) +
                    " {}".format(statement))
class ConcurrentCounter:
    """Integer counter whose reads and writes are guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0

    def _shift(self, delta):
        # Caller must already hold ``self._lock``.
        self._value = self._value + delta
        return self._value

    def inc(self, count):
        """Add ``count`` and return the new total."""
        with self._lock:
            return self._shift(count)

    def dec(self, count):
        """Subtract ``count`` and return the new total.

        Asserts (before changing anything) that the total stays >= 0.
        """
        with self._lock:
            assert self._value >= count, "counter cannot go negative"
            return self._shift(-count)

    @property
    def value(self):
        """The current total."""
        with self._lock:
            return self._value
class StandardAutoscaler:
"""The autoscaling control loop for a Ray cluster.
@@ -307,7 +77,7 @@ class StandardAutoscaler:
# Node launchers
self.launch_queue = queue.Queue()
self.num_launches_pending = ConcurrentCounter()
self.pending_launches = ConcurrentCounter()
max_batches = math.ceil(
max_concurrent_launches / float(max_launch_batch))
for i in range(int(max_batches)):
@@ -315,7 +85,7 @@ class StandardAutoscaler:
provider=self.provider,
queue=self.launch_queue,
index=i,
pending=self.num_launches_pending)
pending=self.pending_launches)
node_launcher.daemon = True
node_launcher.start()
@@ -356,7 +126,6 @@ class StandardAutoscaler:
return
self.last_update_time = now
num_pending = self.num_launches_pending.value
nodes = self.workers()
self.load_metrics.prune_active_ips(
[self.provider.internal_ip(node_id) for node_id in nodes])
@@ -403,14 +172,15 @@ class StandardAutoscaler:
nodes = self.workers()
self.log_info_string(nodes, target_workers)
# Launch new nodes if needed
# Launch additional nodes of the default type, if still needed.
num_pending = self.pending_launches.value
num_workers = len(nodes) + num_pending
if num_workers < target_workers:
max_allowed = min(self.max_launch_batch,
self.max_concurrent_launches - num_pending)
num_launches = min(max_allowed, target_workers - num_workers)
self.launch_new_node(num_launches)
self.launch_new_node(num_launches, instance_type=None)
nodes = self.workers()
self.log_info_string(nodes, target_workers)
elif self.load_metrics.num_workers_connected() >= target_workers:
@@ -603,12 +373,12 @@ class StandardAutoscaler:
return False
return True
def launch_new_node(self, count):
def launch_new_node(self, count, instance_type):
logger.info(
"StandardAutoscaler: Queue {} new nodes for launch".format(count))
self.num_launches_pending.inc(count)
self.pending_launches.inc(instance_type, count)
config = copy.deepcopy(self.config)
self.launch_queue.put((config, count))
self.launch_queue.put((config, count, instance_type))
def workers(self):
return self.provider.non_terminated_nodes(
@@ -621,8 +391,8 @@ class StandardAutoscaler:
def info_string(self, nodes, target):
suffix = ""
if self.num_launches_pending:
suffix += " ({} pending)".format(self.num_launches_pending.value)
if self.pending_launches:
suffix += " ({} pending)".format(self.pending_launches.value)
if self.updaters:
suffix += " ({} updating)".format(len(self.updaters))
if self.num_failed_updates:
@@ -650,95 +420,8 @@ class StandardAutoscaler:
len(nodes)))
def validate_config(config):
    """Check a cluster config against the JSON schema at RAY_SCHEMA_PATH.

    Args:
        config: The parsed config; must be a dict.

    Raises:
        ValueError: If ``config`` is not a dictionary.
        jsonschema.ValidationError: If ``config`` violates the schema. Only
            the message of the underlying error is carried over.
    """
    if not isinstance(config, dict):
        raise ValueError("Config {} is not a dictionary".format(config))

    with open(RAY_SCHEMA_PATH) as schema_file:
        schema = json.load(schema_file)

    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as err:
        # Re-raise a fresh error holding just the message, with the
        # original exception context suppressed.
        raise jsonschema.ValidationError(message=err.message) from None
def fillout_defaults(config):
    """Return the provider's default config overlaid with ``config``.

    The user's top-level keys replace the defaults; afterwards the setup
    commands are merged, ``dockerize_if_needed`` is applied and an "auth"
    entry is guaranteed to exist.
    """
    merged = get_default_config(config["provider"])
    merged.update(config)
    merge_setup_commands(merged)
    dockerize_if_needed(merged)
    # Make sure later lookups of "auth" never fail.
    merged.setdefault("auth", {})
    return merged
def merge_setup_commands(config):
    """Prefix the head and worker setup commands with the shared ones.

    The config is modified in place and also returned.
    """
    for role_key in ("head_setup_commands", "worker_setup_commands"):
        config[role_key] = config["setup_commands"] + config[role_key]
    return config
def with_head_node_ip(cmds):
    """Return ``cmds`` with each one prefixed by a RAY_HEAD_IP export.

    The exported address is the one reported by
    ``services.get_node_ip_address()`` at call time.
    """
    head_ip = services.get_node_ip_address()
    return [
        "export RAY_HEAD_IP={}; {}".format(head_ip, cmd) for cmd in cmds
    ]
def hash_launch_conf(node_conf, auth):
    """Return the SHA-1 hex digest of the node config and auth settings.

    Both values are serialized together as JSON with sorted keys, so the
    digest does not depend on dict ordering.
    """
    payload = json.dumps([node_conf, auth], sort_keys=True)
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()
# Digests are memoized per config so files are not rescanned on each call.
# This also means that mutating file mount content on the head node does not
# change the hash, which avoids inadvertently restarting workers.
_hash_cache = {}


def hash_runtime_conf(file_mounts, extra_objs):
    """Return a SHA-1 hex digest of the runtime config and mounted files.

    Args:
        file_mounts: Dict whose values are local paths (files or
            directories) whose contents are folded into the digest.
        extra_objs: Additional JSON-serializable data to include.

    Returns:
        The hex digest. It is computed once per distinct
        ``(file_mounts, extra_objs)`` serialization and cached afterwards.
    """
    hasher = hashlib.sha1()

    def digest_file(fpath):
        # Feed the file to the hasher in 1 MiB chunks.
        with open(fpath, "rb") as f:
            while True:
                chunk = f.read(2**20)
                if chunk == b"":
                    break
                hasher.update(chunk)

    def digest_path(path):
        path = os.path.expanduser(path)
        if not os.path.isdir(path):
            digest_file(path)
            return
        # Sort directories and file names so the digest is independent of
        # the order in which the filesystem lists them.
        listing = sorted(
            (dirpath, sorted(filenames))
            for dirpath, _, filenames in os.walk(path))
        for dirpath, filenames in listing:
            hasher.update(dirpath.encode("utf-8"))
            for name in filenames:
                hasher.update(name.encode("utf-8"))
                digest_file(os.path.join(dirpath, name))

    conf_str = b"".join(
        json.dumps(obj, sort_keys=True).encode("utf-8")
        for obj in (file_mounts, extra_objs))

    # Important: only hash the files once. Otherwise, we can end up restarting
    # workers if the files were changed and we re-hashed them.
    if conf_str not in _hash_cache:
        hasher.update(conf_str)
        for local_path in sorted(file_mounts.values()):
            digest_path(local_path)
        _hash_cache[conf_str] = hasher.hexdigest()

    return _hash_cache[conf_str]
def request_resources(num_cpus=None, num_gpus=None):
# Note: this is an (experimental) user-facing API, do not move.
def request_resources(num_cpus=None, num_gpus=None, bundles=None):
"""Remotely request some CPU or GPU resources from the autoscaler.
This function is to be called e.g. on a node before submitting a bunch of
@@ -761,7 +444,6 @@ def request_resources(num_cpus=None, num_gpus=None):
r = services.create_redis_client(
global_worker.node.redis_address,
password=global_worker.node.redis_password)
assert isinstance(num_cpus, int)
if num_cpus > 0:
r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
json.dumps({
+12 -5
View File
@@ -12,7 +12,7 @@ from botocore.config import Config
import botocore
from ray.ray_constants import BOTO_MAX_RETRIES
from ray.autoscaler.autoscaler import NODE_TYPE_CONFIG_KEYS
from ray.autoscaler.tags import NODE_TYPE_WORKER, NODE_TYPE_HEAD
from ray.autoscaler.aws.utils import LazyDefaultDict
logger = logging.getLogger(__name__)
@@ -22,6 +22,13 @@ DEFAULT_RAY_INSTANCE_PROFILE = RAY + "-v1"
DEFAULT_RAY_IAM_ROLE = RAY + "-v1"
SECURITY_GROUP_TEMPLATE = RAY + "-{}"
# Mapping from the node type tag to the section of the autoscaler yaml that
# contains the config for the node type.
NODE_TYPE_CONFIG_KEYS = {
NODE_TYPE_WORKER: "worker_nodes",
NODE_TYPE_HEAD: "head_node",
}
DEFAULT_AMI_NAME = "AWS Deep Learning AMI (Ubuntu 18.04) V26.0"
# Obtained from https://aws.amazon.com/marketplace/pp/B07Y43P7X5 on 4/25/2020.
@@ -246,16 +253,16 @@ def _configure_security_group(config):
security_groups = _upsert_security_groups(config, node_types_to_configure)
if "head" in node_types_to_configure:
head_sg = security_groups["head"]
if NODE_TYPE_HEAD in node_types_to_configure:
head_sg = security_groups[NODE_TYPE_HEAD]
logger.info(
"_configure_security_group: "
"SecurityGroupIds not specified for head node, using {} ({})"
.format(head_sg.group_name, head_sg.id))
config["head_node"]["SecurityGroupIds"] = [head_sg.id]
if "workers" in node_types_to_configure:
workers_sg = security_groups["workers"]
if NODE_TYPE_WORKER in node_types_to_configure:
workers_sg = security_groups[NODE_TYPE_WORKER]
logger.info("_configure_security_group: "
"SecurityGroupIds not specified for workers, using {} ({})"
.format(workers_sg.group_name, workers_sg.id))
+2 -1
View File
@@ -15,7 +15,7 @@ try: # py3
except ImportError: # py2
from pipes import quote
from ray.autoscaler.autoscaler import validate_config, hash_runtime_conf, \
from ray.autoscaler.util import validate_config, hash_runtime_conf, \
hash_launch_conf, fillout_defaults
from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
@@ -51,6 +51,7 @@ def _bootstrap_config(config):
cache_key = os.path.join(tempfile.gettempdir(),
"ray-config-{}".format(hasher.hexdigest()))
if os.path.exists(cache_key):
logger.info("Using cached config at {}".format(cache_key))
return json.loads(open(cache_key).read())
validate_config(config)
+175
View File
@@ -0,0 +1,175 @@
import logging
import time
import numpy as np
import ray.services as services
from ray.ray_constants import MEMORY_RESOURCE_UNIT_BYTES
logger = logging.getLogger(__name__)
class LoadMetrics:
    """Container for cluster load metrics.

    Metrics here are updated from raylet heartbeats. The autoscaler
    queries these metrics to determine when to scale up, and which nodes
    can be removed.
    """

    def __init__(self):
        self.last_used_time_by_ip = {}
        self.last_heartbeat_time_by_ip = {}
        self.static_resources_by_ip = {}
        self.dynamic_resources_by_ip = {}
        self.resource_load_by_ip = {}
        self.local_ip = services.get_node_ip_address()

    def update(self, ip, static_resources, dynamic_resources, resource_load):
        """Store the latest heartbeat data reported for the node at ``ip``.

        Args:
            ip: Address used as the key for all per-node mappings.
            static_resources: Mapping of resource name to total capacity.
            dynamic_resources: Mapping of resource name to available amount.
            resource_load: Mapping of resource name to queued demand.
        """
        self.resource_load_by_ip[ip] = resource_load
        self.static_resources_by_ip[ip] = static_resources

        # We are not guaranteed to have a corresponding dynamic resource for
        # every static resource because dynamic resources are based on the
        # available resources in the heartbeat, which does not exist if it is
        # zero. Thus, we have to fill in the missing entries here.
        available = dynamic_resources.copy()
        for resource_name in static_resources:
            available.setdefault(resource_name, 0.0)
        self.dynamic_resources_by_ip[ip] = available

        now = time.time()
        first_report = ip not in self.last_used_time_by_ip
        # A node counts as "used" whenever its reported availability differs
        # from its capacity (or when it is seen for the first time).
        if first_report or static_resources != dynamic_resources:
            self.last_used_time_by_ip[ip] = now
        self.last_heartbeat_time_by_ip[ip] = now

    def mark_active(self, ip):
        """Refresh the heartbeat timestamp of ``ip`` to the current time."""
        assert ip is not None, "IP should be known at this time"
        logger.info("Node {} is newly setup, treating as active".format(ip))
        self.last_heartbeat_time_by_ip[ip] = time.time()

    def prune_active_ips(self, active_ips):
        """Forget every tracked ip that is not in ``active_ips``.

        The local ip is always kept.
        """
        active_ips = set(active_ips)
        active_ips.add(self.local_ip)

        def prune(mapping):
            unwanted = set(mapping) - active_ips
            for unwanted_key in unwanted:
                logger.info("LoadMetrics: "
                            "Removed mapping: {} - {}".format(
                                unwanted_key, mapping[unwanted_key]))
                del mapping[unwanted_key]
            if unwanted:
                logger.info(
                    "LoadMetrics: "
                    "Removed {} stale ip mappings: {} not in {}".format(
                        len(unwanted), unwanted, active_ips))
            assert not (unwanted & set(mapping))

        prune(self.last_used_time_by_ip)
        prune(self.static_resources_by_ip)
        prune(self.dynamic_resources_by_ip)
        prune(self.resource_load_by_ip)
        prune(self.last_heartbeat_time_by_ip)

    def approx_workers_used(self):
        """Return the rounded "NumNodesUsed" figure from ``_info``."""
        return self._info()["NumNodesUsed"]

    def num_workers_connected(self):
        """Return the "NumNodesConnected" figure from ``_info``."""
        return self._info()["NumNodesConnected"]

    def get_node_resources(self):
        """Return a list of node resources (static resource sizes).

        Example:
            >>> metrics.get_node_resources()
            [{"CPU": 1}, {"CPU": 4, "GPU": 8}]  # for two different nodes
        """
        # Snapshot into a real list so the result matches the documented
        # type and does not change size when later heartbeats add or prune
        # nodes.
        return list(self.static_resources_by_ip.values())

    def get_resource_usage(self):
        """Aggregate usage over all tracked nodes.

        Returns:
            A tuple ``(nodes_used, resources_used, resources_total)`` where
            the two dicts map resource name to the summed used / total
            amount across nodes.
        """
        num_nodes = len(self.static_resources_by_ip)
        nodes_used = 0.0
        num_nonidle = 0
        has_saturated_node = False
        resources_used = {}
        resources_total = {}

        for ip, max_resources in self.static_resources_by_ip.items():
            avail_resources = self.dynamic_resources_by_ip[ip]
            resource_load = self.resource_load_by_ip[ip]

            max_frac = 0.0
            if any(amount > 0 for amount in resource_load.values()):
                has_saturated_node = True
                max_frac = 1.0  # the resource is saturated

            for resource_id, amount in max_resources.items():
                used = amount - avail_resources[resource_id]
                resources_used[resource_id] = (
                    resources_used.get(resource_id, 0.0) + used)
                resources_total[resource_id] = (
                    resources_total.get(resource_id, 0.0) + amount)
                # Only the per-node fraction is clamped; the totals above
                # keep the raw difference.
                used = max(0, used)
                if amount > 0:
                    max_frac = max(max_frac, used / float(amount))

            nodes_used += max_frac
            if max_frac > 0:
                num_nonidle += 1

        # If any nodes have a queue buildup, assume all non-idle nodes are 100%
        # busy, plus the head node. This guards against the case of not scaling
        # up due to poor task packing.
        if has_saturated_node:
            nodes_used = min(num_nonidle + 1.0, num_nodes)

        return nodes_used, resources_used, resources_total

    def info_string(self):
        """Render ``_info`` as a comma-separated ``key=value`` string."""
        return ", ".join(
            ["{}={}".format(k, v) for k, v in sorted(self._info().items())])

    def _info(self):
        """Build the dict of summary statistics used for reporting."""
        nodes_used, resources_used, resources_total = self.get_resource_usage()

        now = time.time()
        idle_times = [now - t for t in self.last_used_time_by_ip.values()]
        heartbeat_times = [
            now - t for t in self.last_heartbeat_time_by_ip.values()
        ]
        # The five oldest heartbeats, reported as seconds elapsed.
        most_delayed_heartbeats = sorted(
            self.last_heartbeat_time_by_ip.items(),
            key=lambda pair: pair[1])[:5]
        most_delayed_heartbeats = {
            ip: (now - t)
            for ip, t in most_delayed_heartbeats
        }

        def format_resource(key, value):
            if key in ["object_store_memory", "memory"]:
                return "{} GiB".format(
                    round(value * MEMORY_RESOURCE_UNIT_BYTES / 1e9, 2))
            else:
                return round(value, 2)

        def min_mean_max(samples):
            # -1 is the placeholder for "no samples".
            if samples:
                stats = (int(np.min(samples)), int(np.mean(samples)),
                         int(np.max(samples)))
            else:
                stats = (-1, -1, -1)
            return "Min={} Mean={} Max={}".format(*stats)

        return {
            "ResourceUsage": ", ".join([
                "{}/{} {}".format(
                    format_resource(rid, resources_used[rid]),
                    format_resource(rid, resources_total[rid]), rid)
                for rid in sorted(resources_used)
            ]),
            "NumNodesConnected": len(self.static_resources_by_ip),
            "NumNodesUsed": round(nodes_used, 2),
            "NodeIdleSeconds": min_mean_max(idle_times),
            "TimeSinceLastHeartbeat": min_mean_max(heartbeat_times),
            "MostDelayedHeartbeats": most_delayed_heartbeats,
        }
+57
View File
@@ -0,0 +1,57 @@
import logging
import threading
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS,
TAG_RAY_NODE_TYPE, TAG_RAY_NODE_NAME,
STATUS_UNINITIALIZED, NODE_TYPE_WORKER)
from ray.autoscaler.util import hash_launch_conf
logger = logging.getLogger(__name__)
class NodeLauncher(threading.Thread):
    """Background thread that creates worker nodes requested via a queue."""

    def __init__(self, provider, queue, pending, index=None, *args, **kwargs):
        self.queue = queue
        self.pending = pending
        self.provider = provider
        # The index is only used to label log lines.
        if index is None:
            self.index = ""
        else:
            self.index = str(index)
        super().__init__(*args, **kwargs)

    def _launch_node(self, config, count, instance_type):
        """Ask the provider for ``count`` new worker nodes.

        A falsy ``instance_type`` uses the provider's plain ``create_node``;
        otherwise ``create_node_of_type`` is called with that type.
        """
        tag_filters = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER}
        nodes_before = self.provider.non_terminated_nodes(
            tag_filters=tag_filters)
        launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"])
        self.log("Launching {} nodes, type {}.".format(count, instance_type))

        node_config = config["worker_nodes"]
        node_tags = {
            TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]),
            TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER,
            TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED,
            TAG_RAY_LAUNCH_CONFIG: launch_hash,
        }
        # NOTE: the instance type is not recorded in the node tags.
        if not instance_type:
            self.provider.create_node(node_config, node_tags, count)
        else:
            self.provider.create_node_of_type(node_config, node_tags,
                                              instance_type, count)

        nodes_after = self.provider.non_terminated_nodes(
            tag_filters=tag_filters)
        if set(nodes_after) <= set(nodes_before):
            self.log("No new nodes reported after node creation.")

    def run(self):
        """Serve launch requests from the queue forever."""
        while True:
            config, count, instance_type = self.queue.get()
            self.log("Got {} nodes to launch.".format(count))
            try:
                self._launch_node(config, count, instance_type)
            except Exception:
                logger.exception("Launch failed")
            finally:
                # The request is no longer pending, whether it worked or not.
                self.pending.dec(instance_type, count)

    def log(self, statement):
        """Log ``statement`` at info level, prefixed with this launcher's id."""
        logger.info("NodeLauncher{}:".format(self.index) +
                    " {}".format(statement))
+129
View File
@@ -0,0 +1,129 @@
import collections
import os
import json
import threading
import hashlib
import jsonschema
import ray
import ray.services as services
from ray.autoscaler.node_provider import get_default_config
from ray.autoscaler.docker import dockerize_if_needed
REQUIRED, OPTIONAL = True, False
RAY_SCHEMA_PATH = os.path.join(
os.path.dirname(ray.autoscaler.__file__), "ray-schema.json")
class ConcurrentCounter:
    """Thread-safe set of integer counters addressed by key.

    ``value`` is the sum over all keys; ``breakdown()`` exposes the
    individual per-key counts.
    """

    def __init__(self):
        # Re-entrant lock: inc()/dec() read the ``value`` property, which
        # acquires the lock again, while still holding it.
        self._lock = threading.RLock()
        self._counter = collections.defaultdict(int)

    def inc(self, key, count):
        """Add ``count`` to the counter for ``key``; return the new total."""
        with self._lock:
            self._counter[key] += count
            return self.value

    def dec(self, key, count):
        """Subtract ``count`` from the counter for ``key``.

        Returns:
            The new total across all keys.

        Raises:
            AssertionError: If the counter for ``key`` would become
                negative. The counter is left unchanged in that case.
        """
        with self._lock:
            # Validate before mutating, so a failed check neither stores a
            # negative count nor creates an entry for an unknown key.
            assert self._counter.get(key, 0) >= count, \
                "counter cannot go negative"
            self._counter[key] -= count
            return self.value

    def breakdown(self):
        """Return a plain-dict copy of the per-key counts."""
        with self._lock:
            return dict(self._counter)

    @property
    def value(self):
        """The sum of the counts of all keys."""
        with self._lock:
            return sum(self._counter.values())
def validate_config(config):
    """Check a cluster config against the JSON schema at RAY_SCHEMA_PATH.

    Args:
        config: The parsed config; must be a dict.

    Raises:
        ValueError: If ``config`` is not a dictionary.
        jsonschema.ValidationError: If ``config`` violates the schema. Only
            the message of the underlying error is carried over.
    """
    if not isinstance(config, dict):
        raise ValueError("Config {} is not a dictionary".format(config))

    with open(RAY_SCHEMA_PATH) as schema_file:
        schema = json.load(schema_file)

    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as err:
        # Re-raise a fresh error holding just the message, with the
        # original exception context suppressed.
        raise jsonschema.ValidationError(message=err.message) from None
def fillout_defaults(config):
    """Return the provider's default config overlaid with ``config``.

    The user's top-level keys replace the defaults; afterwards the setup
    commands are merged, ``dockerize_if_needed`` is applied and an "auth"
    entry is guaranteed to exist.
    """
    merged = get_default_config(config["provider"])
    merged.update(config)
    merge_setup_commands(merged)
    dockerize_if_needed(merged)
    # Make sure later lookups of "auth" never fail.
    merged.setdefault("auth", {})
    return merged
def merge_setup_commands(config):
    """Prefix the head and worker setup commands with the shared ones.

    The config is modified in place and also returned.
    """
    for role_key in ("head_setup_commands", "worker_setup_commands"):
        config[role_key] = config["setup_commands"] + config[role_key]
    return config
def with_head_node_ip(cmds):
    """Return ``cmds`` with each one prefixed by a RAY_HEAD_IP export.

    The exported address is the one reported by
    ``services.get_node_ip_address()`` at call time.
    """
    head_ip = services.get_node_ip_address()
    return [
        "export RAY_HEAD_IP={}; {}".format(head_ip, cmd) for cmd in cmds
    ]
def hash_launch_conf(node_conf, auth):
    """Return the SHA-1 hex digest of the node config and auth settings.

    Both values are serialized together as JSON with sorted keys, so the
    digest does not depend on dict ordering.
    """
    payload = json.dumps([node_conf, auth], sort_keys=True)
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()
# Digests are memoized per config so files are not rescanned on each call.
# This also means that mutating file mount content on the head node does not
# change the hash, which avoids inadvertently restarting workers.
_hash_cache = {}


def hash_runtime_conf(file_mounts, extra_objs):
    """Return a SHA-1 hex digest of the runtime config and mounted files.

    Args:
        file_mounts: Dict whose values are local paths (files or
            directories) whose contents are folded into the digest.
        extra_objs: Additional JSON-serializable data to include.

    Returns:
        The hex digest. It is computed once per distinct
        ``(file_mounts, extra_objs)`` serialization and cached afterwards.
    """
    hasher = hashlib.sha1()

    def digest_file(fpath):
        # Feed the file to the hasher in 1 MiB chunks.
        with open(fpath, "rb") as f:
            while True:
                chunk = f.read(2**20)
                if chunk == b"":
                    break
                hasher.update(chunk)

    def digest_path(path):
        path = os.path.expanduser(path)
        if not os.path.isdir(path):
            digest_file(path)
            return
        # Sort directories and file names so the digest is independent of
        # the order in which the filesystem lists them.
        listing = sorted(
            (dirpath, sorted(filenames))
            for dirpath, _, filenames in os.walk(path))
        for dirpath, filenames in listing:
            hasher.update(dirpath.encode("utf-8"))
            for name in filenames:
                hasher.update(name.encode("utf-8"))
                digest_file(os.path.join(dirpath, name))

    conf_str = b"".join(
        json.dumps(obj, sort_keys=True).encode("utf-8")
        for obj in (file_mounts, extra_objs))

    # Important: only hash the files once. Otherwise, we can end up restarting
    # workers if the files were changed and we re-hashed them.
    if conf_str not in _hash_cache:
        hasher.update(conf_str)
        for local_path in sorted(file_mounts.values()):
            digest_path(local_path)
        _hash_cache[conf_str] = hasher.hexdigest()

    return _hash_cache[conf_str]
+2 -1
View File
@@ -6,7 +6,8 @@ import traceback
import json
import ray
from ray.autoscaler.autoscaler import LoadMetrics, StandardAutoscaler
from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler.load_metrics import LoadMetrics
import ray.gcs_utils
import ray.utils
import ray.ray_constants as ray_constants
+19 -21
View File
@@ -10,8 +10,9 @@ from jsonschema.exceptions import ValidationError
import ray
import ray.services as services
from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics, \
fillout_defaults, validate_config
from ray.autoscaler.util import fillout_defaults, validate_config
from ray.autoscaler.load_metrics import LoadMetrics
from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS, \
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED
from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider
@@ -545,13 +546,13 @@ class AutoscalingTest(unittest.TestCase):
# Update will try to create, but will block until we set the flag
self.provider.ready_to_create.clear()
autoscaler.update()
assert autoscaler.num_launches_pending.value == 2
assert autoscaler.pending_launches.value == 2
assert len(self.provider.non_terminated_nodes({})) == 0
# Set the flag, check it updates
self.provider.ready_to_create.set()
self.waitForNodes(2)
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
# Update the config to reduce the cluster size
new_config = SMALL_CLUSTER.copy()
@@ -583,12 +584,9 @@ class AutoscalingTest(unittest.TestCase):
rtc1.clear()
autoscaler.update()
# Synchronization: wait for launchy thread to be blocked on rtc1
if hasattr(rtc1, "_cond"): # Python 3.5
waiters = rtc1._cond._waiters
else: # Python 2.7
waiters = rtc1._Event__cond._Condition__waiters
waiters = rtc1._cond._waiters
self.waitFor(lambda: len(waiters) == 1)
assert autoscaler.num_launches_pending.value == 5
assert autoscaler.pending_launches.value == 5
assert len(self.provider.non_terminated_nodes({})) == 0
# Call update() to launch a second wave of 3 nodes,
@@ -599,24 +597,24 @@ class AutoscalingTest(unittest.TestCase):
rtc2.set()
autoscaler.update()
self.waitForNodes(3)
assert autoscaler.num_launches_pending.value == 5
assert autoscaler.pending_launches.value == 5
# The first wave of 5 will now tragically fail
self.provider.fail_creates = True
rtc1.set()
self.waitFor(lambda: autoscaler.num_launches_pending.value == 0)
self.waitFor(lambda: autoscaler.pending_launches.value == 0)
assert len(self.provider.non_terminated_nodes({})) == 3
# Retry the first wave, allowing it to succeed this time
self.provider.fail_creates = False
autoscaler.update()
self.waitForNodes(8)
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
# Final wave of 2 nodes
autoscaler.update()
self.waitForNodes(10)
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
def testUpdateThrottling(self):
config_path = self.write_config(SMALL_CLUSTER)
@@ -632,7 +630,7 @@ class AutoscalingTest(unittest.TestCase):
update_interval_s=10)
autoscaler.update()
self.waitForNodes(2)
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
new_config = SMALL_CLUSTER.copy()
new_config["max_workers"] = 1
self.write_config(new_config)
@@ -641,7 +639,7 @@ class AutoscalingTest(unittest.TestCase):
# note that node termination happens in the main thread, so
# we do not need to add any delay here before checking
assert len(self.provider.non_terminated_nodes({})) == 2
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
def testLaunchConfigChange(self):
config_path = self.write_config(SMALL_CLUSTER)
@@ -682,7 +680,7 @@ class AutoscalingTest(unittest.TestCase):
for _ in range(10):
autoscaler.update()
time.sleep(0.1)
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 2
# New a good config again
@@ -812,7 +810,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(1)
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 1
# Scales up as nodes are reported as used
@@ -829,19 +827,19 @@ class AutoscalingTest(unittest.TestCase):
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 5
# Scales down as nodes become unused
lm.last_used_time_by_ip["172.0.0.0"] = 0
lm.last_used_time_by_ip["172.0.0.1"] = 0
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 3
lm.last_used_time_by_ip["172.0.0.2"] = 0
lm.last_used_time_by_ip["172.0.0.3"] = 0
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 1
def testDontScaleBelowTarget(self):
@@ -861,7 +859,7 @@ class AutoscalingTest(unittest.TestCase):
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 0
# Scales up as nodes are reported as used
+1 -1
View File
@@ -5,7 +5,7 @@ import yaml
import urllib
import tempfile
from ray.autoscaler.autoscaler import fillout_defaults, validate_config
from ray.autoscaler.util import fillout_defaults, validate_config
from ray.test_utils import recursive_fnmatch
RAY_PATH = os.path.abspath(os.path.join(__file__, "../../"))