diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index a7a83cc07..0d2f39563 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -1,8 +1,6 @@ from collections import defaultdict import copy -import hashlib import json -import jsonschema import logging import math import numpy as np @@ -12,252 +10,24 @@ import threading import time import yaml -import ray -from ray.autoscaler.docker import dockerize_if_needed -from ray.autoscaler.node_provider import get_node_provider, \ - get_default_config +from ray.autoscaler.node_provider import get_node_provider from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE, - TAG_RAY_NODE_NAME, STATUS_UP_TO_DATE, - STATUS_UNINITIALIZED, NODE_TYPE_WORKER) + STATUS_UP_TO_DATE, NODE_TYPE_WORKER) from ray.autoscaler.updater import NodeUpdaterThread +from ray.autoscaler.node_launcher import NodeLauncher +from ray.autoscaler.util import ConcurrentCounter, validate_config, \ + with_head_node_ip, hash_launch_conf, hash_runtime_conf from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \ - AUTOSCALER_RESOURCE_REQUEST_CHANNEL, MEMORY_RESOURCE_UNIT_BYTES + AUTOSCALER_RESOURCE_REQUEST_CHANNEL import ray.services as services from ray.worker import global_worker from six.moves import queue logger = logging.getLogger(__name__) -REQUIRED, OPTIONAL = True, False -RAY_SCHEMA_PATH = os.path.join( - os.path.dirname(ray.autoscaler.__file__), "ray-schema.json") - -NODE_TYPE_CONFIG_KEYS = {"workers": "worker_nodes", "head": "head_node"} - - -class LoadMetrics: - """Container for cluster load metrics. - - Metrics here are updated from raylet heartbeats. The autoscaler - queries these metrics to determine when to scale up, and which nodes - can be removed. - """ - - def __init__(self): - self.last_used_time_by_ip = {} - self.last_heartbeat_time_by_ip = {} - self.static_resources_by_ip = {} - self.dynamic_resources_by_ip = {} - self.resource_load_by_ip = {} - self.local_ip = services.get_node_ip_address() - - def update(self, ip, static_resources, dynamic_resources, resource_load): - self.resource_load_by_ip[ip] = resource_load - self.static_resources_by_ip[ip] = static_resources - - # We are not guaranteed to have a corresponding dynamic resource for - # every static resource because dynamic resources are based on the - # available resources in the heartbeat, which does not exist if it is - # zero. Thus, we have to update dynamic resources here. - dynamic_resources_update = dynamic_resources.copy() - for resource_name, capacity in static_resources.items(): - if resource_name not in dynamic_resources_update: - dynamic_resources_update[resource_name] = 0.0 - self.dynamic_resources_by_ip[ip] = dynamic_resources_update - - now = time.time() - if ip not in self.last_used_time_by_ip or \ - static_resources != dynamic_resources: - self.last_used_time_by_ip[ip] = now - self.last_heartbeat_time_by_ip[ip] = now - - def mark_active(self, ip): - assert ip is not None, "IP should be known at this time" - logger.info("Node {} is newly setup, treating as active".format(ip)) - self.last_heartbeat_time_by_ip[ip] = time.time() - - def prune_active_ips(self, active_ips): - active_ips = set(active_ips) - active_ips.add(self.local_ip) - - def prune(mapping): - unwanted = set(mapping) - active_ips - for unwanted_key in unwanted: - logger.info("LoadMetrics: " - "Removed mapping: {} - {}".format( - unwanted_key, mapping[unwanted_key])) - del mapping[unwanted_key] - if unwanted: - logger.info( - "LoadMetrics: " - "Removed {} stale ip mappings: {} not in {}".format( - len(unwanted), unwanted, active_ips)) - assert not (unwanted & set(mapping)) - - prune(self.last_used_time_by_ip) - prune(self.static_resources_by_ip) - prune(self.dynamic_resources_by_ip) - prune(self.resource_load_by_ip) - prune(self.last_heartbeat_time_by_ip) - - def approx_workers_used(self): - return self._info()["NumNodesUsed"] - - def num_workers_connected(self): - return self._info()["NumNodesConnected"] - - def get_resource_usage(self): - num_nodes = len(self.static_resources_by_ip) - nodes_used = 0.0 - num_nonidle = 0 - has_saturated_node = False - resources_used = {} - resources_total = {} - for ip, max_resources in self.static_resources_by_ip.items(): - avail_resources = self.dynamic_resources_by_ip[ip] - resource_load = self.resource_load_by_ip[ip] - max_frac = 0.0 - for resource_id, amount in resource_load.items(): - if amount > 0: - has_saturated_node = True - max_frac = 1.0 # the resource is saturated - for resource_id, amount in max_resources.items(): - used = amount - avail_resources[resource_id] - if resource_id not in resources_used: - resources_used[resource_id] = 0.0 - resources_total[resource_id] = 0.0 - resources_used[resource_id] += used - resources_total[resource_id] += amount - used = max(0, used) - if amount > 0: - frac = used / float(amount) - if frac > max_frac: - max_frac = frac - nodes_used += max_frac - if max_frac > 0: - num_nonidle += 1 - - # If any nodes have a queue buildup, assume all non-idle nodes are 100% - # busy, plus the head node. This guards against the case of not scaling - # up due to poor task packing. - if has_saturated_node: - nodes_used = min(num_nonidle + 1.0, num_nodes) - - return nodes_used, resources_used, resources_total - - def info_string(self): - return ", ".join( - ["{}={}".format(k, v) for k, v in sorted(self._info().items())]) - - def _info(self): - nodes_used, resources_used, resources_total = self.get_resource_usage() - - now = time.time() - idle_times = [now - t for t in self.last_used_time_by_ip.values()] - heartbeat_times = [ - now - t for t in self.last_heartbeat_time_by_ip.values() - ] - most_delayed_heartbeats = sorted( - self.last_heartbeat_time_by_ip.items(), - key=lambda pair: pair[1])[:5] - most_delayed_heartbeats = { - ip: (now - t) - for ip, t in most_delayed_heartbeats - } - - def format_resource(key, value): - if key in ["object_store_memory", "memory"]: - return "{} GiB".format( - round(value * MEMORY_RESOURCE_UNIT_BYTES / 1e9, 2)) - else: - return round(value, 2) - - return { - "ResourceUsage": ", ".join([ - "{}/{} {}".format( - format_resource(rid, resources_used[rid]), - format_resource(rid, resources_total[rid]), rid) - for rid in sorted(resources_used) - ]), - "NumNodesConnected": len(self.static_resources_by_ip), - "NumNodesUsed": round(nodes_used, 2), - "NodeIdleSeconds": "Min={} Mean={} Max={}".format( - int(np.min(idle_times)) if idle_times else -1, - int(np.mean(idle_times)) if idle_times else -1, - int(np.max(idle_times)) if idle_times else -1), - "TimeSinceLastHeartbeat": "Min={} Mean={} Max={}".format( - int(np.min(heartbeat_times)) if heartbeat_times else -1, - int(np.mean(heartbeat_times)) if heartbeat_times else -1, - int(np.max(heartbeat_times)) if heartbeat_times else -1), - "MostDelayedHeartbeats": most_delayed_heartbeats, - } - - -class NodeLauncher(threading.Thread): - def __init__(self, provider, queue, pending, index=None, *args, **kwargs): - self.queue = queue - self.pending = pending - self.provider = provider - self.index = str(index) if index is not None else "" - super(NodeLauncher, self).__init__(*args, **kwargs) - - def _launch_node(self, config, count): - worker_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER} - before = self.provider.non_terminated_nodes(tag_filters=worker_filter) - launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"]) - self.log("Launching {} nodes.".format(count)) - self.provider.create_node( - config["worker_nodes"], { - TAG_RAY_NODE_NAME: "ray-{}-worker".format( - config["cluster_name"]), - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER, - TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED, - TAG_RAY_LAUNCH_CONFIG: launch_hash, - }, count) - after = self.provider.non_terminated_nodes(tag_filters=worker_filter) - if set(after).issubset(before): - self.log("No new nodes reported after node creation.") - - def run(self): - while True: - config, count = self.queue.get() - self.log("Got {} nodes to launch.".format(count)) - try: - self._launch_node(config, count) - except Exception: - logger.exception("Launch failed") - finally: - self.pending.dec(count) - - def log(self, statement): - prefix = "NodeLauncher{}:".format(self.index) - logger.info(prefix + " {}".format(statement)) - - -class ConcurrentCounter: - def __init__(self): - self._value = 0 - self._lock = threading.Lock() - - def inc(self, count): - with self._lock: - self._value += count - return self._value - - def dec(self, count): - with self._lock: - assert self._value >= count, "counter cannot go negative" - self._value -= count - return self._value - - @property - def value(self): - with self._lock: - return self._value - class StandardAutoscaler: """The autoscaling control loop for a Ray cluster. @@ -307,7 +77,7 @@ class StandardAutoscaler: # Node launchers self.launch_queue = queue.Queue() - self.num_launches_pending = ConcurrentCounter() + self.pending_launches = ConcurrentCounter() max_batches = math.ceil( max_concurrent_launches / float(max_launch_batch)) for i in range(int(max_batches)): @@ -315,7 +85,7 @@ class StandardAutoscaler: provider=self.provider, queue=self.launch_queue, index=i, - pending=self.num_launches_pending) + pending=self.pending_launches) node_launcher.daemon = True node_launcher.start() @@ -356,7 +126,6 @@ class StandardAutoscaler: return self.last_update_time = now - num_pending = self.num_launches_pending.value nodes = self.workers() self.load_metrics.prune_active_ips( [self.provider.internal_ip(node_id) for node_id in nodes]) @@ -403,14 +172,15 @@ class StandardAutoscaler: nodes = self.workers() self.log_info_string(nodes, target_workers) - # Launch new nodes if needed + # Launch additional nodes of the default type, if still needed. + num_pending = self.pending_launches.value num_workers = len(nodes) + num_pending if num_workers < target_workers: max_allowed = min(self.max_launch_batch, self.max_concurrent_launches - num_pending) num_launches = min(max_allowed, target_workers - num_workers) - self.launch_new_node(num_launches) + self.launch_new_node(num_launches, instance_type=None) nodes = self.workers() self.log_info_string(nodes, target_workers) elif self.load_metrics.num_workers_connected() >= target_workers: @@ -603,12 +373,12 @@ class StandardAutoscaler: return False return True - def launch_new_node(self, count): + def launch_new_node(self, count, instance_type): logger.info( "StandardAutoscaler: Queue {} new nodes for launch".format(count)) - self.num_launches_pending.inc(count) + self.pending_launches.inc(instance_type, count) config = copy.deepcopy(self.config) - self.launch_queue.put((config, count)) + self.launch_queue.put((config, count, instance_type)) def workers(self): return self.provider.non_terminated_nodes( @@ -621,8 +391,8 @@ class StandardAutoscaler: def info_string(self, nodes, target): suffix = "" - if self.num_launches_pending: - suffix += " ({} pending)".format(self.num_launches_pending.value) + if self.pending_launches: + suffix += " ({} pending)".format(self.pending_launches.value) if self.updaters: suffix += " ({} updating)".format(len(self.updaters)) if self.num_failed_updates: @@ -650,95 +420,8 @@ class StandardAutoscaler: len(nodes))) -def validate_config(config): - """Required Dicts indicate that no extra fields can be introduced.""" - if not isinstance(config, dict): - raise ValueError("Config {} is not a dictionary".format(config)) - - with open(RAY_SCHEMA_PATH) as f: - schema = json.load(f) - try: - jsonschema.validate(config, schema) - except jsonschema.ValidationError as e: - raise jsonschema.ValidationError(message=e.message) from None - - -def fillout_defaults(config): - defaults = get_default_config(config["provider"]) - defaults.update(config) - merge_setup_commands(defaults) - dockerize_if_needed(defaults) - defaults["auth"] = defaults.get("auth", {}) - return defaults - - -def merge_setup_commands(config): - config["head_setup_commands"] = ( - config["setup_commands"] + config["head_setup_commands"]) - config["worker_setup_commands"] = ( - config["setup_commands"] + config["worker_setup_commands"]) - return config - - -def with_head_node_ip(cmds): - head_ip = services.get_node_ip_address() - out = [] - for cmd in cmds: - out.append("export RAY_HEAD_IP={}; {}".format(head_ip, cmd)) - return out - - -def hash_launch_conf(node_conf, auth): - hasher = hashlib.sha1() - hasher.update( - json.dumps([node_conf, auth], sort_keys=True).encode("utf-8")) - return hasher.hexdigest() - - -# Cache the file hashes to avoid rescanning it each time. Also, this avoids -# inadvertently restarting workers if the file mount content is mutated on the -# head node. -_hash_cache = {} - - -def hash_runtime_conf(file_mounts, extra_objs): - hasher = hashlib.sha1() - - def add_content_hashes(path): - def add_hash_of_file(fpath): - with open(fpath, "rb") as f: - for chunk in iter(lambda: f.read(2**20), b""): - hasher.update(chunk) - - path = os.path.expanduser(path) - if os.path.isdir(path): - dirs = [] - for dirpath, _, filenames in os.walk(path): - dirs.append((dirpath, sorted(filenames))) - for dirpath, filenames in sorted(dirs): - hasher.update(dirpath.encode("utf-8")) - for name in filenames: - hasher.update(name.encode("utf-8")) - fpath = os.path.join(dirpath, name) - add_hash_of_file(fpath) - else: - add_hash_of_file(path) - - conf_str = (json.dumps(file_mounts, sort_keys=True).encode("utf-8") + - json.dumps(extra_objs, sort_keys=True).encode("utf-8")) - - # Important: only hash the files once. Otherwise, we can end up restarting - # workers if the files were changed and we re-hashed them. - if conf_str not in _hash_cache: - hasher.update(conf_str) - for local_path in sorted(file_mounts.values()): - add_content_hashes(local_path) - _hash_cache[conf_str] = hasher.hexdigest() - - return _hash_cache[conf_str] - - -def request_resources(num_cpus=None, num_gpus=None): +# Note: this is an (experimental) user-facing API, do not move. +def request_resources(num_cpus=None, num_gpus=None, bundles=None): """Remotely request some CPU or GPU resources from the autoscaler. This function is to be called e.g. on a node before submitting a bunch of @@ -761,7 +444,6 @@ def request_resources(num_cpus=None, num_gpus=None): r = services.create_redis_client( global_worker.node.redis_address, password=global_worker.node.redis_password) - assert isinstance(num_cpus, int) if num_cpus > 0: r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps({ diff --git a/python/ray/autoscaler/aws/config.py b/python/ray/autoscaler/aws/config.py index 96f93a667..1f2cb5891 100644 --- a/python/ray/autoscaler/aws/config.py +++ b/python/ray/autoscaler/aws/config.py @@ -12,7 +12,7 @@ from botocore.config import Config import botocore from ray.ray_constants import BOTO_MAX_RETRIES -from ray.autoscaler.autoscaler import NODE_TYPE_CONFIG_KEYS +from ray.autoscaler.tags import NODE_TYPE_WORKER, NODE_TYPE_HEAD from ray.autoscaler.aws.utils import LazyDefaultDict logger = logging.getLogger(__name__) @@ -22,6 +22,13 @@ DEFAULT_RAY_INSTANCE_PROFILE = RAY + "-v1" DEFAULT_RAY_IAM_ROLE = RAY + "-v1" SECURITY_GROUP_TEMPLATE = RAY + "-{}" +# Mapping from the node type tag to the section of the autoscaler yaml that +# contains the config for the node type. +NODE_TYPE_CONFIG_KEYS = { + NODE_TYPE_WORKER: "worker_nodes", + NODE_TYPE_HEAD: "head_node", +} + DEFAULT_AMI_NAME = "AWS Deep Learning AMI (Ubuntu 18.04) V26.0" # Obtained from https://aws.amazon.com/marketplace/pp/B07Y43P7X5 on 4/25/2020. @@ -246,16 +253,16 @@ def _configure_security_group(config): security_groups = _upsert_security_groups(config, node_types_to_configure) - if "head" in node_types_to_configure: - head_sg = security_groups["head"] + if NODE_TYPE_HEAD in node_types_to_configure: + head_sg = security_groups[NODE_TYPE_HEAD] logger.info( "_configure_security_group: " "SecurityGroupIds not specified for head node, using {} ({})" .format(head_sg.group_name, head_sg.id)) config["head_node"]["SecurityGroupIds"] = [head_sg.id] - if "workers" in node_types_to_configure: - workers_sg = security_groups["workers"] + if NODE_TYPE_WORKER in node_types_to_configure: + workers_sg = security_groups[NODE_TYPE_WORKER] logger.info("_configure_security_group: " "SecurityGroupIds not specified for workers, using {} ({})" .format(workers_sg.group_name, workers_sg.id)) diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index a1dd8e287..81c40494e 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -15,7 +15,7 @@ try: # py3 except ImportError: # py2 from pipes import quote -from ray.autoscaler.autoscaler import validate_config, hash_runtime_conf, \ +from ray.autoscaler.util import validate_config, hash_runtime_conf, \ hash_launch_conf, fillout_defaults from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ @@ -51,6 +51,7 @@ def _bootstrap_config(config): cache_key = os.path.join(tempfile.gettempdir(), "ray-config-{}".format(hasher.hexdigest())) if os.path.exists(cache_key): + logger.info("Using cached config at {}".format(cache_key)) return json.loads(open(cache_key).read()) validate_config(config) diff --git a/python/ray/autoscaler/load_metrics.py b/python/ray/autoscaler/load_metrics.py new file mode 100644 index 000000000..23c96f9fd --- /dev/null +++ b/python/ray/autoscaler/load_metrics.py @@ -0,0 +1,175 @@ +import logging +import time + +import numpy as np +import ray.services as services +from ray.ray_constants import MEMORY_RESOURCE_UNIT_BYTES + +logger = logging.getLogger(__name__) + + +class LoadMetrics: + """Container for cluster load metrics. + + Metrics here are updated from raylet heartbeats. The autoscaler + queries these metrics to determine when to scale up, and which nodes + can be removed. + """ + + def __init__(self): + self.last_used_time_by_ip = {} + self.last_heartbeat_time_by_ip = {} + self.static_resources_by_ip = {} + self.dynamic_resources_by_ip = {} + self.resource_load_by_ip = {} + self.local_ip = services.get_node_ip_address() + + def update(self, ip, static_resources, dynamic_resources, resource_load): + self.resource_load_by_ip[ip] = resource_load + self.static_resources_by_ip[ip] = static_resources + + # We are not guaranteed to have a corresponding dynamic resource for + # every static resource because dynamic resources are based on the + # available resources in the heartbeat, which does not exist if it is + # zero. Thus, we have to update dynamic resources here. + dynamic_resources_update = dynamic_resources.copy() + for resource_name, capacity in static_resources.items(): + if resource_name not in dynamic_resources_update: + dynamic_resources_update[resource_name] = 0.0 + self.dynamic_resources_by_ip[ip] = dynamic_resources_update + + now = time.time() + if ip not in self.last_used_time_by_ip or \ + static_resources != dynamic_resources: + self.last_used_time_by_ip[ip] = now + self.last_heartbeat_time_by_ip[ip] = now + + def mark_active(self, ip): + assert ip is not None, "IP should be known at this time" + logger.info("Node {} is newly setup, treating as active".format(ip)) + self.last_heartbeat_time_by_ip[ip] = time.time() + + def prune_active_ips(self, active_ips): + active_ips = set(active_ips) + active_ips.add(self.local_ip) + + def prune(mapping): + unwanted = set(mapping) - active_ips + for unwanted_key in unwanted: + logger.info("LoadMetrics: " + "Removed mapping: {} - {}".format( + unwanted_key, mapping[unwanted_key])) + del mapping[unwanted_key] + if unwanted: + logger.info( + "LoadMetrics: " + "Removed {} stale ip mappings: {} not in {}".format( + len(unwanted), unwanted, active_ips)) + assert not (unwanted & set(mapping)) + + prune(self.last_used_time_by_ip) + prune(self.static_resources_by_ip) + prune(self.dynamic_resources_by_ip) + prune(self.resource_load_by_ip) + prune(self.last_heartbeat_time_by_ip) + + def approx_workers_used(self): + return self._info()["NumNodesUsed"] + + def num_workers_connected(self): + return self._info()["NumNodesConnected"] + + def get_node_resources(self): + """Return a list of node resources (static resource sizes. + + Example: + >>> metrics.get_node_resources() + [{"CPU": 1}, {"CPU": 4, "GPU": 8}] # for two different nodes + """ + return self.static_resources_by_ip.values() + + def get_resource_usage(self): + num_nodes = len(self.static_resources_by_ip) + nodes_used = 0.0 + num_nonidle = 0 + has_saturated_node = False + resources_used = {} + resources_total = {} + for ip, max_resources in self.static_resources_by_ip.items(): + avail_resources = self.dynamic_resources_by_ip[ip] + resource_load = self.resource_load_by_ip[ip] + max_frac = 0.0 + for resource_id, amount in resource_load.items(): + if amount > 0: + has_saturated_node = True + max_frac = 1.0 # the resource is saturated + for resource_id, amount in max_resources.items(): + used = amount - avail_resources[resource_id] + if resource_id not in resources_used: + resources_used[resource_id] = 0.0 + resources_total[resource_id] = 0.0 + resources_used[resource_id] += used + resources_total[resource_id] += amount + used = max(0, used) + if amount > 0: + frac = used / float(amount) + if frac > max_frac: + max_frac = frac + nodes_used += max_frac + if max_frac > 0: + num_nonidle += 1 + + # If any nodes have a queue buildup, assume all non-idle nodes are 100% + # busy, plus the head node. This guards against the case of not scaling + # up due to poor task packing. + if has_saturated_node: + nodes_used = min(num_nonidle + 1.0, num_nodes) + + return nodes_used, resources_used, resources_total + + def info_string(self): + return ", ".join( + ["{}={}".format(k, v) for k, v in sorted(self._info().items())]) + + def _info(self): + nodes_used, resources_used, resources_total = self.get_resource_usage() + + now = time.time() + idle_times = [now - t for t in self.last_used_time_by_ip.values()] + heartbeat_times = [ + now - t for t in self.last_heartbeat_time_by_ip.values() + ] + most_delayed_heartbeats = sorted( + self.last_heartbeat_time_by_ip.items(), + key=lambda pair: pair[1])[:5] + most_delayed_heartbeats = { + ip: (now - t) + for ip, t in most_delayed_heartbeats + } + + def format_resource(key, value): + if key in ["object_store_memory", "memory"]: + return "{} GiB".format( + round(value * MEMORY_RESOURCE_UNIT_BYTES / 1e9, 2)) + else: + return round(value, 2) + + return { + "ResourceUsage": ", ".join([ + "{}/{} {}".format( + format_resource(rid, resources_used[rid]), + format_resource(rid, resources_total[rid]), rid) + for rid in sorted(resources_used) + ]), + "NumNodesConnected": len(self.static_resources_by_ip), + "NumNodesUsed": round(nodes_used, 2), + "NodeIdleSeconds": "Min={} Mean={} Max={}".format( + int(np.min(idle_times)) if idle_times else -1, + int(np.mean(idle_times)) if idle_times else -1, + int(np.max(idle_times)) if idle_times else -1), + "TimeSinceLastHeartbeat": "Min={} Mean={} Max={}".format( + int(np.min(heartbeat_times)) if heartbeat_times else -1, + int(np.mean(heartbeat_times)) if heartbeat_times else -1, + int(np.max(heartbeat_times)) if heartbeat_times else -1), + "MostDelayedHeartbeats": most_delayed_heartbeats, + } diff --git a/python/ray/autoscaler/node_launcher.py b/python/ray/autoscaler/node_launcher.py new file mode 100644 index 000000000..f23846a78 --- /dev/null +++ b/python/ray/autoscaler/node_launcher.py @@ -0,0 +1,57 @@ +import logging +import threading + +from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS, + TAG_RAY_NODE_TYPE, TAG_RAY_NODE_NAME, + STATUS_UNINITIALIZED, NODE_TYPE_WORKER) +from ray.autoscaler.util import hash_launch_conf + +logger = logging.getLogger(__name__) + + +class NodeLauncher(threading.Thread): + """Launches nodes asynchronously in the background.""" + + def __init__(self, provider, queue, pending, index=None, *args, **kwargs): + self.queue = queue + self.pending = pending + self.provider = provider + self.index = str(index) if index is not None else "" + super(NodeLauncher, self).__init__(*args, **kwargs) + + def _launch_node(self, config, count, instance_type): + worker_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER} + before = self.provider.non_terminated_nodes(tag_filters=worker_filter) + launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"]) + self.log("Launching {} nodes, type {}.".format(count, instance_type)) + node_config = config["worker_nodes"] + node_tags = { + TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]), + TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER, + TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED, + TAG_RAY_LAUNCH_CONFIG: launch_hash, + } + if instance_type: + # node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type + self.provider.create_node_of_type(node_config, node_tags, + instance_type, count) + else: + self.provider.create_node(node_config, node_tags, count) + after = self.provider.non_terminated_nodes(tag_filters=worker_filter) + if set(after).issubset(before): + self.log("No new nodes reported after node creation.") + + def run(self): + while True: + config, count, instance_type = self.queue.get() + self.log("Got {} nodes to launch.".format(count)) + try: + self._launch_node(config, count, instance_type) + except Exception: + logger.exception("Launch failed") + finally: + self.pending.dec(instance_type, count) + + def log(self, statement): + prefix = "NodeLauncher{}:".format(self.index) + logger.info(prefix + " {}".format(statement)) diff --git a/python/ray/autoscaler/util.py b/python/ray/autoscaler/util.py new file mode 100644 index 000000000..7c56cca31 --- /dev/null +++ b/python/ray/autoscaler/util.py @@ -0,0 +1,129 @@ +import collections +import os +import json +import threading +import hashlib +import jsonschema + +import ray +import ray.services as services +from ray.autoscaler.node_provider import get_default_config +from ray.autoscaler.docker import dockerize_if_needed + +REQUIRED, OPTIONAL = True, False +RAY_SCHEMA_PATH = os.path.join( + os.path.dirname(ray.autoscaler.__file__), "ray-schema.json") + + +class ConcurrentCounter: + def __init__(self): + self._lock = threading.RLock() + self._counter = collections.defaultdict(int) + + def inc(self, key, count): + with self._lock: + self._counter[key] += count + return self.value + + def dec(self, key, count): + with self._lock: + self._counter[key] -= count + assert self._counter[key] >= 0, "counter cannot go negative" + return self.value + + def breakdown(self): + with self._lock: + return dict(self._counter) + + @property + def value(self): + with self._lock: + return sum(self._counter.values()) + + +def validate_config(config): + """Required Dicts indicate that no extra fields can be introduced.""" + if not isinstance(config, dict): + raise ValueError("Config {} is not a dictionary".format(config)) + + with open(RAY_SCHEMA_PATH) as f: + schema = json.load(f) + try: + jsonschema.validate(config, schema) + except jsonschema.ValidationError as e: + raise jsonschema.ValidationError(message=e.message) from None + + +def fillout_defaults(config): + defaults = get_default_config(config["provider"]) + defaults.update(config) + merge_setup_commands(defaults) + dockerize_if_needed(defaults) + defaults["auth"] = defaults.get("auth", {}) + return defaults + + +def merge_setup_commands(config): + config["head_setup_commands"] = ( + config["setup_commands"] + config["head_setup_commands"]) + config["worker_setup_commands"] = ( + config["setup_commands"] + config["worker_setup_commands"]) + return config + + +def with_head_node_ip(cmds): + head_ip = services.get_node_ip_address() + out = [] + for cmd in cmds: + out.append("export RAY_HEAD_IP={}; {}".format(head_ip, cmd)) + return out + + +def hash_launch_conf(node_conf, auth): + hasher = hashlib.sha1() + hasher.update( + json.dumps([node_conf, auth], sort_keys=True).encode("utf-8")) + return hasher.hexdigest() + + +# Cache the file hashes to avoid rescanning it each time. Also, this avoids +# inadvertently restarting workers if the file mount content is mutated on the +# head node. +_hash_cache = {} + + +def hash_runtime_conf(file_mounts, extra_objs): + hasher = hashlib.sha1() + + def add_content_hashes(path): + def add_hash_of_file(fpath): + with open(fpath, "rb") as f: + for chunk in iter(lambda: f.read(2**20), b""): + hasher.update(chunk) + + path = os.path.expanduser(path) + if os.path.isdir(path): + dirs = [] + for dirpath, _, filenames in os.walk(path): + dirs.append((dirpath, sorted(filenames))) + for dirpath, filenames in sorted(dirs): + hasher.update(dirpath.encode("utf-8")) + for name in filenames: + hasher.update(name.encode("utf-8")) + fpath = os.path.join(dirpath, name) + add_hash_of_file(fpath) + else: + add_hash_of_file(path) + + conf_str = (json.dumps(file_mounts, sort_keys=True).encode("utf-8") + + json.dumps(extra_objs, sort_keys=True).encode("utf-8")) + + # Important: only hash the files once. Otherwise, we can end up restarting + # workers if the files were changed and we re-hashed them. + if conf_str not in _hash_cache: + hasher.update(conf_str) + for local_path in sorted(file_mounts.values()): + add_content_hashes(local_path) + _hash_cache[conf_str] = hasher.hexdigest() + + return _hash_cache[conf_str] diff --git a/python/ray/monitor.py b/python/ray/monitor.py index a4cf9b736..722ff8b84 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -6,7 +6,8 @@ import traceback import json import ray -from ray.autoscaler.autoscaler import LoadMetrics, StandardAutoscaler +from ray.autoscaler.autoscaler import StandardAutoscaler +from ray.autoscaler.load_metrics import LoadMetrics import ray.gcs_utils import ray.utils import ray.ray_constants as ray_constants diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index ad34377de..d5b332881 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -10,8 +10,9 @@ from jsonschema.exceptions import ValidationError import ray import ray.services as services -from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics, \ - fillout_defaults, validate_config +from ray.autoscaler.util import fillout_defaults, validate_config +from ray.autoscaler.load_metrics import LoadMetrics +from ray.autoscaler.autoscaler import StandardAutoscaler from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider @@ -545,13 +546,13 @@ class AutoscalingTest(unittest.TestCase): # Update will try to create, but will block until we set the flag self.provider.ready_to_create.clear() autoscaler.update() - assert autoscaler.num_launches_pending.value == 2 + assert autoscaler.pending_launches.value == 2 assert len(self.provider.non_terminated_nodes({})) == 0 # Set the flag, check it updates self.provider.ready_to_create.set() self.waitForNodes(2) - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 # Update the config to reduce the cluster size new_config = SMALL_CLUSTER.copy() @@ -583,12 +584,9 @@ class AutoscalingTest(unittest.TestCase): rtc1.clear() autoscaler.update() # Synchronization: wait for launchy thread to be blocked on rtc1 - if hasattr(rtc1, "_cond"): # Python 3.5 - waiters = rtc1._cond._waiters - else: # Python 2.7 - waiters = rtc1._Event__cond._Condition__waiters + waiters = rtc1._cond._waiters self.waitFor(lambda: len(waiters) == 1) - assert autoscaler.num_launches_pending.value == 5 + assert autoscaler.pending_launches.value == 5 assert len(self.provider.non_terminated_nodes({})) == 0 # Call update() to launch a second wave of 3 nodes, @@ -599,24 +597,24 @@ class AutoscalingTest(unittest.TestCase): rtc2.set() autoscaler.update() self.waitForNodes(3) - assert autoscaler.num_launches_pending.value == 5 + assert autoscaler.pending_launches.value == 5 # The first wave of 5 will now tragically fail self.provider.fail_creates = True rtc1.set() - self.waitFor(lambda: autoscaler.num_launches_pending.value == 0) + self.waitFor(lambda: autoscaler.pending_launches.value == 0) assert len(self.provider.non_terminated_nodes({})) == 3 # Retry the first wave, allowing it to succeed this time self.provider.fail_creates = False autoscaler.update() self.waitForNodes(8) - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 # Final wave of 2 nodes autoscaler.update() self.waitForNodes(10) - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 def testUpdateThrottling(self): config_path = self.write_config(SMALL_CLUSTER) @@ -632,7 +630,7 @@ class AutoscalingTest(unittest.TestCase): update_interval_s=10) autoscaler.update() self.waitForNodes(2) - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 new_config = SMALL_CLUSTER.copy() new_config["max_workers"] = 1 self.write_config(new_config) @@ -641,7 +639,7 @@ class AutoscalingTest(unittest.TestCase): # note that node termination happens in the main thread, so # we do not need to add any delay here before checking assert len(self.provider.non_terminated_nodes({})) == 2 - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 def testLaunchConfigChange(self): config_path = self.write_config(SMALL_CLUSTER) @@ -682,7 +680,7 @@ class AutoscalingTest(unittest.TestCase): for _ in range(10): autoscaler.update() time.sleep(0.1) - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 2 # New a good config again @@ -812,7 +810,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(1) autoscaler.update() - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 1 # Scales up as nodes are reported as used @@ -829,19 +827,19 @@ class AutoscalingTest(unittest.TestCase): lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {}) lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {}) autoscaler.update() - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 5 # Scales down as nodes become unused lm.last_used_time_by_ip["172.0.0.0"] = 0 lm.last_used_time_by_ip["172.0.0.1"] = 0 autoscaler.update() - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 3 lm.last_used_time_by_ip["172.0.0.2"] = 0 lm.last_used_time_by_ip["172.0.0.3"] = 0 autoscaler.update() - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 1 def testDontScaleBelowTarget(self): @@ -861,7 +859,7 @@ class AutoscalingTest(unittest.TestCase): update_interval_s=0) assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() - assert autoscaler.num_launches_pending.value == 0 + assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 0 # Scales up as nodes are reported as used diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index b83e76014..4e8802c3c 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -5,7 +5,7 @@ import yaml import urllib import tempfile -from ray.autoscaler.autoscaler import fillout_defaults, validate_config +from ray.autoscaler.util import fillout_defaults, validate_config from ray.test_utils import recursive_fnmatch RAY_PATH = os.path.abspath(os.path.join(__file__, "../../"))