From b6c42f96beab3ee00fe4b246e5e9d0479ad379ca Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 31 Dec 2017 14:39:57 -0800 Subject: [PATCH] Auto-scale ray clusters based on GCS load metrics (#1348) This adds (experimental) auto-scaling support for Ray clusters based on GCS load metrics. The auto-scaling algorithm is as follows: Based on current (instantaneous) load information, we compute the approximate number of "used workers". This is based on the bottleneck resource, e.g. if 8/8 GPUs are used in an 8-node cluster but all the CPUs are idle, the number of used nodes is still counted as 8. This number can also be fractional. We scale that number by 1 / target_utilization_fraction and round up to determine the target cluster size (subject to the min_workers and max_workers constraints). The autoscaler control loop takes care of launching new nodes until the target cluster size is met. When a node is idle for more than idle_timeout_minutes, we remove it from the cluster if that would not drop the cluster size below min_workers. Note that we'll need to update the wheel in the example yaml file after this PR is merged. 
--- python/ray/autoscaler/autoscaler.py | 303 ++++++++++++++---- .../autoscaler/aws/development-example.yaml | 58 ++-- python/ray/autoscaler/aws/example.yaml | 42 ++- python/ray/autoscaler/aws/node_provider.py | 27 +- python/ray/autoscaler/commands.py | 110 ++++--- python/ray/autoscaler/node_provider.py | 9 +- python/ray/autoscaler/updater.py | 16 +- python/ray/monitor.py | 47 ++- python/ray/ray_constants.py | 20 ++ python/ray/scripts/scripts.py | 11 +- python/ray/services.py | 9 +- test/autoscaler_test.py | 181 +++++++++-- 12 files changed, 657 insertions(+), 176 deletions(-) create mode 100644 python/ray/ray_constants.py diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index d57cb7189..13f97e0ea 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -6,12 +6,18 @@ import json import hashlib import os import subprocess +import time import traceback from collections import defaultdict +from datetime import datetime +import numpy as np import yaml +from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ + AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \ + AUTOSCALER_HEARTBEAT_TIMEOUT_S from ray.autoscaler.node_provider import get_node_provider from ray.autoscaler.updater import NodeUpdaterProcess from ray.autoscaler.tags import TAG_RAY_LAUNCH_CONFIG, \ @@ -31,6 +37,14 @@ CLUSTER_CONFIG_SCHEMA = { # node. This takes precedence over min_workers. "max_workers": int, + # The autoscaler will scale up the cluster to this target fraction of + # resources usage. For example, if a cluster of 8 nodes is 100% busy + # and target_utilization was 0.8, it would resize the cluster to 10. + "target_utilization_fraction": float, + + # If a node is idle for this many minutes, it will be removed. + "idle_timeout_minutes": int, + # Cloud-provider specific configuration. "provider": { "type": str, # e.g. 
aws @@ -49,20 +63,114 @@ CLUSTER_CONFIG_SCHEMA = { # Map of remote paths to local paths, e.g. {"/tmp/data": "/my/local/data"} "file_mounts": dict, - # List of shell commands to run to initialize the head node. - "head_init_commands": list, + # List of common shell commands to run to initialize nodes. + "setup_commands": list, - # List of shell commands to run to initialize workers. - "worker_init_commands": list, + # Commands that will be run on the head node after common setup. + "head_setup_commands": list, + + # Commands that will be run on worker nodes after common setup. + "worker_setup_commands": list, + + # Command to start ray on the head node. You shouldn't need to modify this. + "head_start_ray_commands": list, + + # Command to start ray on worker nodes. You shouldn't need to modify this. + "worker_start_ray_commands": list, + + # Whether to avoid restarting the cluster during updates. This field is + # controlled by the ray --no-restart flag and cannot be set by the user. + "no_restart": None, } -# Abort autoscaling if more than this number of errors are encountered. This -# is a safety feature to prevent e.g. runaway node launches. -MAX_NUM_FAILURES = 5 +class LoadMetrics(object): + """Container for cluster load metrics. -# Max number of nodes to launch at a time. -MAX_CONCURRENT_LAUNCHES = 10 + Metrics here are updated from local scheduler heartbeats. The autoscaler + queries these metrics to determine when to scale up, and which nodes + can be removed. 
+ """ + + def __init__(self): + self.last_used_time_by_ip = {} + self.last_heartbeat_time_by_ip = {} + self.static_resources_by_ip = {} + self.dynamic_resources_by_ip = {} + self.local_ip = services.get_node_ip_address() + + def update(self, ip, static_resources, dynamic_resources): + self.static_resources_by_ip[ip] = static_resources + self.dynamic_resources_by_ip[ip] = dynamic_resources + now = time.time() + if ip not in self.last_used_time_by_ip or \ + static_resources != dynamic_resources: + self.last_used_time_by_ip[ip] = now + self.last_heartbeat_time_by_ip[ip] = now + + def mark_active(self, ip): + self.last_heartbeat_time_by_ip[ip] = time.time() + + def prune_active_ips(self, active_ips): + active_ips = set(active_ips) + active_ips.add(self.local_ip) + + def prune(mapping): + unwanted = set(mapping) - active_ips + for unwanted_key in unwanted: + del mapping[unwanted_key] + if unwanted: + print( + "Removed {} stale ip mappings: {} not in {}".format( + len(unwanted), unwanted, active_ips)) + prune(self.last_used_time_by_ip) + prune(self.static_resources_by_ip) + prune(self.dynamic_resources_by_ip) + + def approx_workers_used(self): + return self._info()["NumNodesUsed"] + + def debug_string(self): + return " - {}".format( + "\n - ".join( + ["{}: {}".format(k, v) + for k, v in sorted(self._info().items())])) + + def _info(self): + nodes_used = 0.0 + resources_used = {} + resources_total = {} + now = time.time() + for ip, max_resources in self.static_resources_by_ip.items(): + avail_resources = self.dynamic_resources_by_ip[ip] + max_frac = 0.0 + for resource_id, amount in max_resources.items(): + used = amount - avail_resources[resource_id] + if resource_id not in resources_used: + resources_used[resource_id] = 0.0 + resources_total[resource_id] = 0.0 + resources_used[resource_id] += used + resources_total[resource_id] += amount + assert used >= 0 + if amount > 0: + frac = used / float(amount) + if frac > max_frac: + max_frac = frac + nodes_used += max_frac + 
idle_times = [now - t for t in self.last_used_time_by_ip.values()] + return { + "ResourceUsage": ", ".join([ + "{}/{} {}".format( + round(resources_used[rid], 2), + round(resources_total[rid], 2), rid) + for rid in sorted(resources_used)]), + "NumNodesConnected": len(self.static_resources_by_ip), + "NumNodesUsed": round(nodes_used, 2), + "NodeIdleSeconds": "Min={} Mean={} Max={}".format( + int(np.min(idle_times)) if idle_times else -1, + int(np.mean(idle_times)) if idle_times else -1, + int(np.max(idle_times)) if idle_times else -1), + } class StandardAutoscaler(object): @@ -84,12 +192,15 @@ class StandardAutoscaler(object): """ def __init__( - self, config_path, - max_concurrent_launches=MAX_CONCURRENT_LAUNCHES, - max_failures=MAX_NUM_FAILURES, process_runner=subprocess, - verbose_updates=False, node_updater_cls=NodeUpdaterProcess): + self, config_path, load_metrics, + max_concurrent_launches=AUTOSCALER_MAX_CONCURRENT_LAUNCHES, + max_failures=AUTOSCALER_MAX_NUM_FAILURES, + process_runner=subprocess, verbose_updates=False, + node_updater_cls=NodeUpdaterProcess, + update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S): self.config_path = config_path self.reload_config(errors_fatal=True) + self.load_metrics = load_metrics self.provider = get_node_provider( self.config["provider"], self.config["cluster_name"]) @@ -102,7 +213,10 @@ class StandardAutoscaler(object): # Map from node_id to NodeUpdater processes self.updaters = {} self.num_failed_updates = defaultdict(int) + self.num_successful_updates = defaultdict(int) self.num_failures = 0 + self.last_update_time = 0.0 + self.update_interval_s = update_interval_s for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) @@ -123,43 +237,59 @@ class StandardAutoscaler(object): raise e def _update(self): - nodes = self.workers() - target_num_workers = self.config["max_workers"] + # Throttle autoscaling updates to this interval to avoid exceeding + # rate limits on API calls. 
+ if time.time() - self.last_update_time < self.update_interval_s: + return - # Terminate nodes while there are too many - while len(nodes) > target_num_workers: + self.last_update_time = time.time() + nodes = self.workers() + print(self.debug_string(nodes)) + self.load_metrics.prune_active_ips( + [self.provider.internal_ip(node_id) for node_id in nodes]) + + # Terminate any idle or out of date nodes + last_used = self.load_metrics.last_used_time_by_ip + horizon = time.time() - (60 * self.config["idle_timeout_minutes"]) + num_terminated = 0 + for node_id in nodes: + node_ip = self.provider.internal_ip(node_id) + if node_ip in last_used and last_used[node_ip] < horizon and \ + len(nodes) - num_terminated > self.config["min_workers"]: + num_terminated += 1 + print( + "StandardAutoscaler: Terminating idle node: " + "{}".format(node_id)) + self.provider.terminate_node(node_id) + elif not self.launch_config_ok(node_id): + num_terminated += 1 + print( + "StandardAutoscaler: Terminating outdated node: " + "{}".format(node_id)) + self.provider.terminate_node(node_id) + if num_terminated > 0: + nodes = self.workers() + print(self.debug_string(nodes)) + + # Terminate nodes if there are too many + num_terminated = 0 + while len(nodes) > self.config["max_workers"]: + num_terminated += 1 print( "StandardAutoscaler: Terminating unneeded node: " "{}".format(nodes[-1])) self.provider.terminate_node(nodes[-1]) + nodes = nodes[:-1] + if num_terminated > 0: nodes = self.workers() - print(self.debug_string()) + print(self.debug_string(nodes)) - if target_num_workers == 0: - return - - # Update nodes with out-of-date files - for node_id in nodes: - self.update_if_needed(node_id) - - # Launch a new node if needed - if len(nodes) < target_num_workers: + # Launch new nodes if needed + target_num = self.target_num_workers() + if len(nodes) < target_num: self.launch_new_node( - min( - self.max_concurrent_launches, - target_num_workers - len(nodes))) + min(self.max_concurrent_launches, 
target_num - len(nodes))) print(self.debug_string()) - return - else: - # If enough nodes, terminate an out-of-date node. - for node_id in nodes: - if not self.launch_config_ok(node_id): - print( - "StandardAutoscaler: Terminating outdated node: " - "{}".format(node_id)) - self.provider.terminate_node(node_id) - print(self.debug_string()) - return # Process any completed updates completed = [] @@ -168,10 +298,24 @@ class StandardAutoscaler(object): completed.append(node_id) if completed: for node_id in completed: - if self.updaters[node_id].exitcode != 0: + if self.updaters[node_id].exitcode == 0: + self.num_successful_updates[node_id] += 1 + else: self.num_failed_updates[node_id] += 1 del self.updaters[node_id] - print(self.debug_string()) + # Mark the node as active to prevent the node recovery logic + # immediately trying to restart Ray on the new node. + self.load_metrics.mark_active(self.provider.internal_ip(node_id)) + nodes = self.workers() + print(self.debug_string(nodes)) + + # Update nodes with out-of-date files + for node_id in nodes: + self.update_if_needed(node_id) + + # Attempt to recover unhealthy nodes + for node_id in nodes: + self.recover_if_needed(node_id) def reload_config(self, errors_fatal=False): try: @@ -181,7 +325,10 @@ class StandardAutoscaler(object): new_launch_hash = hash_launch_conf( new_config["worker_nodes"], new_config["auth"]) new_runtime_hash = hash_runtime_conf( - new_config["file_mounts"], new_config["worker_init_commands"]) + new_config["file_mounts"], + [new_config["setup_commands"], + new_config["worker_setup_commands"], + new_config["worker_start_ray_commands"]]) self.config = new_config self.launch_hash = new_launch_hash self.runtime_hash = new_runtime_hash @@ -193,6 +340,14 @@ class StandardAutoscaler(object): "StandardAutoscaler: Error parsing config: {}", traceback.format_exc()) + def target_num_workers(self): + target_frac = self.config["target_utilization_fraction"] + cur_used = self.load_metrics.approx_workers_used() 
+ ideal_num_workers = int(np.ceil(cur_used / float(target_frac))) + return min( + self.config["max_workers"], + max(self.config["min_workers"], ideal_num_workers)) + def launch_config_ok(self, node_id): launch_conf = self.provider.node_tags(node_id).get( TAG_RAY_LAUNCH_CONFIG) @@ -209,30 +364,66 @@ class StandardAutoscaler(object): return False return True + def recover_if_needed(self, node_id): + if not self.can_update(node_id): + return + last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip.get( + self.provider.internal_ip(node_id), 0) + if time.time() - last_heartbeat_time < AUTOSCALER_HEARTBEAT_TIMEOUT_S: + return + print("StandardAutoscaler: Restarting Ray on {}".format(node_id)) + updater = self.node_updater_cls( + node_id, + self.config["provider"], + self.config["auth"], + self.config["cluster_name"], + {}, + with_head_node_ip(self.config["worker_start_ray_commands"]), + self.runtime_hash, + redirect_output=not self.verbose_updates, + process_runner=self.process_runner) + updater.start() + self.updaters[node_id] = updater + def update_if_needed(self, node_id): - if not self.provider.is_running(node_id): - return - if not self.launch_config_ok(node_id): - return - if node_id in self.updaters: - return - if self.num_failed_updates.get(node_id, 0) > 0: # TODO(ekl) retry? 
+ if not self.can_update(node_id): return if self.files_up_to_date(node_id): return + if self.config.get("no_restart", False) and \ + self.num_successful_updates.get(node_id, 0) > 0: + init_commands = ( + self.config["setup_commands"] + + self.config["worker_setup_commands"]) + else: + init_commands = ( + self.config["setup_commands"] + + self.config["worker_setup_commands"] + + self.config["worker_start_ray_commands"]) updater = self.node_updater_cls( node_id, self.config["provider"], self.config["auth"], self.config["cluster_name"], self.config["file_mounts"], - with_head_node_ip(self.config["worker_init_commands"]), + with_head_node_ip(init_commands), self.runtime_hash, redirect_output=not self.verbose_updates, process_runner=self.process_runner) updater.start() self.updaters[node_id] = updater + def can_update(self, node_id): + if not self.provider.is_running(node_id): + return False + if not self.launch_config_ok(node_id): + return False + if node_id in self.updaters: + return False + if self.num_failed_updates.get(node_id, 0) > 0: # TODO(ekl) retry? 
+ return False + return True + def launch_new_node(self, count): print("StandardAutoscaler: Launching {} new nodes".format(count)) num_before = len(self.workers()) @@ -257,21 +448,23 @@ class StandardAutoscaler(object): def debug_string(self, nodes=None): if nodes is None: nodes = self.workers() - target_num_workers = self.config["max_workers"] suffix = "" if self.updaters: suffix += " ({} updating)".format(len(self.updaters)) if self.num_failed_updates: suffix += " ({} failed to update)".format( len(self.num_failed_updates)) - return "StandardAutoscaler: Have {} / {} target nodes{}".format( - len(nodes), target_num_workers, suffix) + return "StandardAutoscaler [{}]: {}/{} target nodes{}\n{}".format( + datetime.now(), len(nodes), self.target_num_workers(), + suffix, self.load_metrics.debug_string()) def validate_config(config, schema=CLUSTER_CONFIG_SCHEMA): if type(config) is not dict: raise ValueError("Config is not a dictionary") for k, v in schema.items(): + if v is None: + continue # None means we don't validate the field if k not in config: raise ValueError( "Missing required config key `{}` of type {}".format( diff --git a/python/ray/autoscaler/aws/development-example.yaml b/python/ray/autoscaler/aws/development-example.yaml index e4f618427..8ed34c34a 100644 --- a/python/ray/autoscaler/aws/development-example.yaml +++ b/python/ray/autoscaler/aws/development-example.yaml @@ -9,6 +9,15 @@ min_workers: 2 # node. This takes precedence over min_workers. max_workers: 2 +# The autoscaler will scale up the cluster to this target fraction of resource +# usage. For example, if a cluster of 10 nodes is 100% busy and +# target_utilization is 0.8, it would resize the cluster to 13. This fraction +# can be decreased to increase the aggressiveness of upscaling. +target_utilization_fraction: 0.8 + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 + # Cloud-provider specific configuration. 
provider: type: aws @@ -56,37 +65,32 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } -# List of shell commands to run to initialize the head node. -head_init_commands: +# List of shell commands to run to set up nodes. +setup_commands: # Install basics. - sudo apt-get update - sudo apt-get install -y cmake pkg-config build-essential autoconf curl libtool unzip python # Install Anaconda. - - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh - - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 + - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true + - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc # Build Ray. - - git clone https://github.com/ray-project/ray - - PATH=/home/ubuntu/anaconda3/bin:$PATH pip install -U cloudpickle boto3==1.4.8 - - cd ray/python; PATH=/home/ubuntu/anaconda3/bin:$PATH python setup.py develop - # Start Ray. - - PATH=/home/ubuntu/anaconda3/bin:$PATH ray stop - - PATH=/home/ubuntu/anaconda3/bin:$PATH ray start --head --redis-port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml + - git clone https://github.com/ray-project/ray || true + - pip install -U cloudpickle boto3==1.4.8 + - cd ray/python; python setup.py develop -# List of shell commands to run to initialize workers. -worker_init_commands: - # Install basics. - - sudo apt-get update - - sudo apt-get install -y cmake pkg-config build-essential autoconf curl libtool unzip python - # Install Anaconda. - - sudo apt-get update - - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh - - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 - - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc - # Build Ray. 
- - git clone https://github.com/ray-project/ray - - PATH=/home/ubuntu/anaconda3/bin:$PATH pip install -U cloudpickle boto3==1.4.8 - - cd ray/python; PATH=/home/ubuntu/anaconda3/bin:$PATH python setup.py develop - # Start Ray. - - PATH=/home/ubuntu/anaconda3/bin:$PATH ray stop - - PATH=/home/ubuntu/anaconda3/bin:$PATH ray start --head --redis-address=$RAY_HEAD_IP:6379 +# Custom commands that will be run on the head node after common setup. +head_setup_commands: [] + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: + - ray stop + - ray start --head --redis-port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + - ray start --redis-address=$RAY_HEAD_IP:6379 diff --git a/python/ray/autoscaler/aws/example.yaml b/python/ray/autoscaler/aws/example.yaml index dd34dbc10..933d3db8b 100644 --- a/python/ray/autoscaler/aws/example.yaml +++ b/python/ray/autoscaler/aws/example.yaml @@ -7,7 +7,16 @@ min_workers: 0 # The maximum number of workers nodes to launch in addition to the head # node. This takes precedence over min_workers. -max_workers: 2 +max_workers: 4 + +# The autoscaler will scale up the cluster to this target fraction of resource +# usage. For example, if a cluster of 10 nodes is 100% busy and +# target_utilization is 0.8, it would resize the cluster to 13. This fraction +# can be decreased to increase the aggressiveness of upscaling. +target_utilization_fraction: 0.8 + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 # Cloud-provider specific configuration. provider: @@ -56,18 +65,27 @@ file_mounts: { # "/path2/on/remote/machine": "/path2/on/local/machine", } -# List of shell commands to run to initialize the head node. 
-head_init_commands: +# List of shell commands to run to set up nodes. +setup_commands: # Note: if you're developing Ray, you probably want to create an AMI that # has your Ray repo pre-cloned. Then, you can replace the pip installs # below with a git checkout (and possibly a recompile). - - PATH=/home/ubuntu/anaconda3/bin:$PATH pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/f5ea44338eca392df3a868035df3901829cc2ca1/ray-0.3.0-cp36-cp36m-manylinux1_x86_64.whl - - PATH=/home/ubuntu/anaconda3/bin:$PATH pip install boto3==1.4.8 # 1.4.8 adds InstanceMarketOptions - - PATH=/home/ubuntu/anaconda3/bin:$PATH ray stop - - PATH=/home/ubuntu/anaconda3/bin:$PATH ray start --head --redis-port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml + # TODO(ekl) update this to a wheel from master + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/f5ea44338eca392df3a868035df3901829cc2ca1/ray-0.3.0-cp36-cp36m-manylinux1_x86_64.whl -# List of shell commands to run to initialize workers. -worker_init_commands: - - PATH=/home/ubuntu/anaconda3/bin:$PATH pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/f5ea44338eca392df3a868035df3901829cc2ca1/ray-0.3.0-cp36-cp36m-manylinux1_x86_64.whl - - PATH=/home/ubuntu/anaconda3/bin:$PATH ray stop - - PATH=/home/ubuntu/anaconda3/bin:$PATH ray start --redis-address=$RAY_HEAD_IP:6379 +# Custom commands that will be run on the head node after common setup. +head_setup_commands: + - pip install boto3==1.4.8 # 1.4.8 adds InstanceMarketOptions + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: + - ray stop + - ray start --head --redis-port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start ray on worker nodes. You don't need to change this. 
+worker_start_ray_commands: + - ray stop + - ray start --redis-address=$RAY_HEAD_IP:6379 diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index 3b6e47614..ca6576ac7 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -13,6 +13,14 @@ class AWSNodeProvider(NodeProvider): NodeProvider.__init__(self, provider_config, cluster_name) self.ec2 = boto3.resource("ec2", region_name=provider_config["region"]) + # Cache of node objects from the last nodes() call. This avoids + # excessive DescribeInstances requests. + self.cached_nodes = {} + + # Cache of ip lookups. We assume IPs never change once assigned. + self.internal_ip_cache = {} + self.external_ip_cache = {} + def nodes(self, tag_filters): filters = [ { @@ -30,6 +38,7 @@ class AWSNodeProvider(NodeProvider): "Values": [v], }) instances = list(self.ec2.instances.filter(Filters=filters)) + self.cached_nodes = {i.id: i for i in instances} return [i.id for i in instances] def is_running(self, node_id): @@ -49,8 +58,22 @@ class AWSNodeProvider(NodeProvider): return tags def external_ip(self, node_id): + if node_id in self.external_ip_cache: + return self.external_ip_cache[node_id] node = self._node(node_id) - return node.public_ip_address + ip = node.public_ip_address + if ip: + self.external_ip_cache[node_id] = ip + return ip + + def internal_ip(self, node_id): + if node_id in self.internal_ip_cache: + return self.internal_ip_cache[node_id] + node = self._node(node_id) + ip = node.private_ip_address + if ip: + self.internal_ip_cache[node_id] = ip + return ip def set_node_tags(self, node_id, tags): node = self._node(node_id) @@ -90,6 +113,8 @@ class AWSNodeProvider(NodeProvider): node.terminate() def _node(self, node_id): + if node_id in self.cached_nodes: + return self.cached_nodes[node_id] matches = list(self.ec2.instances.filter(InstanceIds=[node_id])) assert len(matches) == 1, "Invalid instance id 
{}".format(node_id) return matches[0] diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index d0a1d5bf2..28aa59042 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -14,12 +14,12 @@ from ray.autoscaler.autoscaler import validate_config, hash_runtime_conf, \ hash_launch_conf from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ - TAG_RAY_RUNTIME_CONFIG, TAG_NAME + TAG_NAME from ray.autoscaler.updater import NodeUpdaterProcess def create_or_update_cluster( - config_file, override_min_workers, override_max_workers, sync_only): + config_file, override_min_workers, override_max_workers, no_restart): """Create or updates an autoscaling Ray cluster from a config json.""" config = yaml.load(open(config_file).read()) @@ -29,9 +29,6 @@ def create_or_update_cluster( config["min_workers"] = override_min_workers if override_max_workers is not None: config["max_workers"] = override_max_workers - if sync_only: - config["worker_init_commands"] = [] - config["head_init_commands"] = [] importer = NODE_PROVIDERS.get(config["provider"]["type"]) if not importer: @@ -40,7 +37,7 @@ def create_or_update_cluster( bootstrap_config, _ = importer() config = bootstrap_config(config) - get_or_create_head_node(config) + get_or_create_head_node(config, no_restart) def teardown_cluster(config_file): @@ -48,6 +45,8 @@ def teardown_cluster(config_file): config = yaml.load(open(config_file).read()) + confirm("This will destroy your cluster") + validate_config(config) provider = get_node_provider(config["provider"], config["cluster_name"]) head_node_tags = { @@ -65,7 +64,7 @@ def teardown_cluster(config_file): nodes = provider.nodes({}) -def get_or_create_head_node(config): +def get_or_create_head_node(config, no_restart): """Create the cluster head node, which in turn creates the workers.""" provider = 
get_node_provider(config["provider"], config["cluster_name"]) @@ -78,6 +77,11 @@ def get_or_create_head_node(config): else: head_node = None + if not head_node: + confirm("This will create a new cluster") + elif not no_restart: + confirm("This will restart your cluster") + launch_hash = hash_launch_conf(config["head_node"], config["auth"]) if head_node is None or provider.node_tags(head_node).get( TAG_RAY_LAUNCH_CONFIG) != launch_hash: @@ -93,48 +97,60 @@ def get_or_create_head_node(config): assert len(nodes) == 1, "Failed to create head node." head_node = nodes[0] + # TODO(ekl) right now we always update the head node even if the hash + # matches. We could prompt the user for what they want to do in this case. runtime_hash = hash_runtime_conf(config["file_mounts"], config) + print("Updating files on head node...") - if provider.node_tags(head_node).get( - TAG_RAY_RUNTIME_CONFIG) != runtime_hash: - print("Updating files on head node...") + # Rewrite the auth config so that the head node can update the workers + remote_key_path = "~/ray_bootstrap_key.pem" + remote_config = copy.deepcopy(config) + remote_config["auth"]["ssh_private_key"] = remote_key_path - # Rewrite the auth config so that the head node can update the workers - remote_key_path = "~/ray_bootstrap_key.pem" - remote_config = copy.deepcopy(config) - remote_config["auth"]["ssh_private_key"] = remote_key_path + # Adjust for new file locations + new_mounts = {} + for remote_path in config["file_mounts"]: + new_mounts[remote_path] = remote_path + remote_config["file_mounts"] = new_mounts + remote_config["no_restart"] = no_restart - # Adjust for new file locations - new_mounts = {} - for remote_path in config["file_mounts"]: - new_mounts[remote_path] = remote_path - remote_config["file_mounts"] = new_mounts + # Now inject the rewritten config and SSH key into the head node + remote_config_file = tempfile.NamedTemporaryFile( + "w", prefix="ray-bootstrap-") + remote_config_file.write(json.dumps(remote_config)) 
+ remote_config_file.flush() + config["file_mounts"].update({ + remote_key_path: config["auth"]["ssh_private_key"], + "~/ray_bootstrap_config.yaml": remote_config_file.name + }) - # Now inject the rewritten config and SSH key into the head node - remote_config_file = tempfile.NamedTemporaryFile( - "w", prefix="ray-bootstrap-") - remote_config_file.write(json.dumps(remote_config)) - remote_config_file.flush() - config["file_mounts"].update({ - remote_key_path: config["auth"]["ssh_private_key"], - "~/ray_bootstrap_config.yaml": remote_config_file.name - }) + if no_restart: + init_commands = ( + config["setup_commands"] + config["head_setup_commands"]) + else: + init_commands = ( + config["setup_commands"] + config["head_setup_commands"] + + config["head_start_ray_commands"]) - updater = NodeUpdaterProcess( - head_node, - config["provider"], - config["auth"], - config["cluster_name"], - config["file_mounts"], - config["head_init_commands"], - runtime_hash, - redirect_output=False) - updater.start() - updater.join() - if updater.exitcode != 0: - print("Error: updating {} failed".format( - provider.external_ip(head_node))) - sys.exit(1) + updater = NodeUpdaterProcess( + head_node, + config["provider"], + config["auth"], + config["cluster_name"], + config["file_mounts"], + init_commands, + runtime_hash, + redirect_output=False) + updater.start() + updater.join() + + # Refresh the node cache so we see the external ip if available + provider.nodes(head_node_tags) + + if updater.exitcode != 0: + print("Error: updating {} failed".format( + provider.external_ip(head_node))) + sys.exit(1) print( "Head node up-to-date, IP address is: {}".format( provider.external_ip(head_node))) @@ -150,3 +166,11 @@ def get_or_create_head_node(config): config["auth"]["ssh_private_key"], config["auth"]["ssh_user"], provider.external_ip(head_node))) + + +def confirm(msg): + print("{}. Do you want to continue [y/N]? 
".format(msg), end="") + answer = input() + if answer.strip().lower() != "y": + print("Abort.") + exit(1) diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index e936477e2..f5e4ee831 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -46,7 +46,10 @@ class NodeProvider(object): def nodes(self, tag_filters): """Return a list of node ids filtered by the specified tags dict. - This list must not include terminated nodes. + This list must not include terminated nodes. For performance reasons, + providers are allowed to cache the result of a call to nodes() to + serve single-node queries (e.g. is_running(node_id)). This means that + nodes() must be called again to refresh results. Examples: >>> provider.nodes({TAG_RAY_NODE_TYPE: "Worker"}) @@ -70,6 +73,10 @@ class NodeProvider(object): """Returns the external ip of the given node.""" raise NotImplementedError + def internal_ip(self, node_id): + """Returns the internal ip (Ray ip) of the given node.""" + raise NotImplementedError + def create_node(self, node_config, tags, count): """Creates a number of nodes within the namespace.""" raise NotImplementedError diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 3d8759cc4..d200a522b 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import pipes import os import subprocess import sys @@ -23,7 +24,7 @@ class NodeUpdater(object): def __init__( self, node_id, provider_config, auth_config, cluster_name, - file_mounts, init_cmds, runtime_hash, redirect_output=True, + file_mounts, setup_cmds, runtime_hash, redirect_output=True, process_runner=subprocess): self.daemon = True self.process_runner = process_runner @@ -33,7 +34,7 @@ class NodeUpdater(object): self.ssh_ip = 
self.provider.external_ip(node_id) self.node_id = node_id self.file_mounts = file_mounts - self.init_cmds = init_cmds + self.setup_cmds = setup_cmds self.runtime_hash = runtime_hash if redirect_output: self.logfile = tempfile.NamedTemporaryFile( @@ -93,6 +94,7 @@ class NodeUpdater(object): assert self.ssh_ip is not None, "Unable to find IP of node" # Wait for SSH access + ssh_ok = False while time.time() < deadline and \ not self.provider.is_terminated(self.node_id): try: @@ -105,6 +107,7 @@ class NodeUpdater(object): self.ssh_cmd( "uptime", connect_timeout=5, redirect=open("/dev/null", "w")) + ssh_ok = True except Exception as e: print( "NodeUpdater: SSH not up, retrying: {}".format(e), @@ -112,7 +115,7 @@ class NodeUpdater(object): time.sleep(5) else: break - assert not self.provider.is_terminated(self.node_id) + assert ssh_ok, "Unable to SSH to node" # Rsync file mounts self.provider.set_node_tags( @@ -139,8 +142,8 @@ class NodeUpdater(object): # Run init commands self.provider.set_node_tags( - self.node_id, {TAG_RAY_NODE_STATUS: "RunningInitCmds"}) - for cmd in self.init_cmds: + self.node_id, {TAG_RAY_NODE_STATUS: "SettingUp"}) + for cmd in self.setup_cmds: self.ssh_cmd(cmd, verbose=True) def ssh_cmd(self, cmd, connect_timeout=60, redirect=None, verbose=False): @@ -149,12 +152,13 @@ class NodeUpdater(object): "NodeUpdater: running {} on {}...".format( cmd, self.ssh_ip), file=self.stdout) + force_interactive = "set -i && source ~/.bashrc && " self.process_runner.check_call([ "ssh", "-o", "ConnectTimeout={}s".format(connect_timeout), "-o", "StrictHostKeyChecking=no", "-i", self.ssh_private_key, "{}@{}".format(self.ssh_user, self.ssh_ip), - cmd, + "bash --login -c {}".format(pipes.quote(force_interactive + cmd)) ], stdout=redirect or self.stdout, stderr=redirect or self.stderr) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index a9487e2d2..b58237f68 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -3,6 +3,7 @@ from __future__ import 
division from __future__ import print_function import argparse +import binascii import json import logging import os @@ -14,9 +15,11 @@ import ray.utils import redis # Import flatbuffer bindings. from ray.core.generated.DriverTableMessage import DriverTableMessage +from ray.core.generated.LocalSchedulerInfoMessage import \ + LocalSchedulerInfoMessage from ray.core.generated.SubscribeToDBClientTableReply import \ - SubscribeToDBClientTableReply -from ray.autoscaler.autoscaler import StandardAutoscaler + SubscribeToDBClientTableReply +from ray.autoscaler.autoscaler import LoadMetrics, StandardAutoscaler from ray.core.generated.TaskInfo import TaskInfo from ray.services import get_ip_address, get_port from ray.utils import binary_to_hex, binary_to_object_id, hex_to_binary @@ -31,6 +34,7 @@ NIL_ID = b"\xff" * DB_CLIENT_ID_SIZE TASK_STATUS_LOST = 32 # common/state/redis.cc +LOCAL_SCHEDULER_INFO_CHANNEL = b"local_schedulers" PLASMA_MANAGER_HEARTBEAT_CHANNEL = b"plasma_managers" DRIVER_DEATH_CHANNEL = b"driver_deaths" @@ -92,8 +96,10 @@ class Monitor(object): self.dead_local_schedulers = set() self.live_plasma_managers = Counter() self.dead_plasma_managers = set() + self.load_metrics = LoadMetrics() if autoscaling_config: - self.autoscaler = StandardAutoscaler(autoscaling_config) + self.autoscaler = StandardAutoscaler( + autoscaling_config, self.load_metrics) else: self.autoscaler = None @@ -286,6 +292,36 @@ class Monitor(object): # already dead. 
del self.live_plasma_managers[db_client_id] + def local_scheduler_info_handler(self, unused_channel, data): + """Handle a local scheduler heartbeat from Redis.""" + + message = LocalSchedulerInfoMessage.GetRootAsLocalSchedulerInfoMessage( + data, 0) + num_resources = message.DynamicResourcesLength() + static_resources = {} + dynamic_resources = {} + for i in range(num_resources): + dyn = message.DynamicResources(i) + static = message.StaticResources(i) + dynamic_resources[dyn.Key().decode("utf-8")] = dyn.Value() + static_resources[static.Key().decode("utf-8")] = static.Value() + client_id = binascii.hexlify(message.DbClientId()).decode("utf-8") + clients = ray.global_state.client_table() + local_schedulers = [ + entry for client in clients.values() for entry in client + if (entry["ClientType"] == "local_scheduler" and not + entry["Deleted"]) + ] + ip = None + for ls in local_schedulers: + if ls["DBClientID"] == client_id: + ip = ls["AuxAddress"].split(":")[0] + if ip: + self.load_metrics.update(ip, static_resources, dynamic_resources) + else: + print("Warning: could not find ip for client {} in {}".format( + client_id, local_schedulers)) + def plasma_manager_heartbeat_handler(self, unused_channel, data): """Handle a plasma manager heartbeat from Redis. @@ -513,6 +549,10 @@ class Monitor(object): assert self.subscribed[channel] # The message was a heartbeat from a plasma manager. message_handler = self.plasma_manager_heartbeat_handler + elif channel == LOCAL_SCHEDULER_INFO_CHANNEL: + assert self.subscribed[channel] + # The message was a heartbeat from a local scheduler + message_handler = self.local_scheduler_info_handler elif channel == DB_CLIENT_TABLE_NAME: assert self.subscribed[channel] # The message was a notification from the db_client table. @@ -537,6 +577,7 @@ class Monitor(object): """ # Initialize the subscription channel. 
self.subscribe(DB_CLIENT_TABLE_NAME) + self.subscribe(LOCAL_SCHEDULER_INFO_CHANNEL) self.subscribe(PLASMA_MANAGER_HEARTBEAT_CHANNEL) self.subscribe(DRIVER_DEATH_CHANNEL) diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py new file mode 100644 index 000000000..30e239cf0 --- /dev/null +++ b/python/ray/ray_constants.py @@ -0,0 +1,20 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +"""Ray constants used in the Python code.""" + + +# Abort autoscaling if more than this number of errors are encountered. This +# is a safety feature to prevent e.g. runaway node launches. +AUTOSCALER_MAX_NUM_FAILURES = 5 + +# Max number of nodes to launch at a time. +AUTOSCALER_MAX_CONCURRENT_LAUNCHES = 10 + +# Interval at which to perform autoscaling updates. +AUTOSCALER_UPDATE_INTERVAL_S = 5 + +# The autoscaler will attempt to restart Ray on nodes it hasn't heard from +# in more than this interval. +AUTOSCALER_HEARTBEAT_TIMEOUT_S = 30 diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 25aa3fb8f..bf95ebe11 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -245,10 +245,9 @@ def stop(): @click.command() @click.argument("cluster_config_file", required=True, type=str) @click.option( - "--sync-only", is_flag=True, default=False, help=( - "Whether to only perform the file sync stage when updating nodes. " - "This avoids interrupting running jobs. You can use this when " - "resizing the cluster with the min/max_workers flag.")) + "--no-restart", is_flag=True, default=False, help=( + "Whether to skip restarting Ray services during the update. 
" + "This avoids interrupting running jobs.")) @click.option( "--min-workers", required=False, type=int, help=( "Override the configured min worker node count for the cluster.")) @@ -256,9 +255,9 @@ def stop(): "--max-workers", required=False, type=int, help=( "Override the configured max worker node count for the cluster.")) def create_or_update( - cluster_config_file, min_workers, max_workers, sync_only): + cluster_config_file, min_workers, max_workers, no_restart): create_or_update_cluster( - cluster_config_file, min_workers, max_workers, sync_only) + cluster_config_file, min_workers, max_workers, no_restart) @click.command() diff --git a/python/ray/services.py b/python/ray/services.py index 23743728c..b51210efc 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -554,7 +554,7 @@ def start_log_monitor(redis_address, node_ip_address, stdout_file=None, log_monitor_filepath = os.path.join( os.path.dirname(os.path.abspath(__file__)), "log_monitor.py") - p = subprocess.Popen([sys.executable, log_monitor_filepath, + p = subprocess.Popen([sys.executable, "-u", log_monitor_filepath, "--redis-address", redis_address, "--node-ip-address", node_ip_address], stdout=stdout_file, stderr=stderr_file) @@ -850,6 +850,7 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name, default. 
""" command = [sys.executable, + "-u", worker_path, "--node-ip-address=" + node_ip_address, "--object-store-name=" + object_store_name, @@ -884,6 +885,7 @@ def start_monitor(redis_address, node_ip_address, stdout_file=None, monitor_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "monitor.py") command = [sys.executable, + "-u", monitor_path, "--redis-address=" + str(redis_address)] if autoscaling_config: @@ -1347,6 +1349,7 @@ def new_log_files(name, redirect_output): date_str = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") log_stdout = "{}/{}-{}-{:05d}.out".format(logs_dir, name, date_str, log_id) log_stderr = "{}/{}-{}-{:05d}.err".format(logs_dir, name, date_str, log_id) - log_stdout_file = open(log_stdout, "a") - log_stderr_file = open(log_stderr, "a") + # Line-buffer the output (mode 1) + log_stdout_file = open(log_stdout, "a", buffering=1) + log_stderr_file = open(log_stderr, "a", buffering=1) return log_stdout_file, log_stderr_file diff --git a/test/autoscaler_test.py b/test/autoscaler_test.py index 01df58744..919028dba 100644 --- a/test/autoscaler_test.py +++ b/test/autoscaler_test.py @@ -9,7 +9,7 @@ import unittest import yaml import ray -from ray.autoscaler.autoscaler import StandardAutoscaler +from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider from ray.autoscaler.updater import NodeUpdaterThread @@ -21,6 +21,7 @@ class MockNode(object): self.state = "pending" self.tags = tags self.external_ip = "1.2.3.4" + self.internal_ip = "172.0.0.{}".format(self.node_id) def matches(self, tags): for k, v in tags.items(): @@ -64,6 +65,9 @@ class MockProvider(NodeProvider): def node_tags(self, node_id): return self.mock_nodes[node_id].tags + def internal_ip(self, node_id): + return self.mock_nodes[node_id].internal_ip + def external_ip(self, node_id): return self.mock_nodes[node_id].external_ip @@ 
-85,6 +89,8 @@ SMALL_CLUSTER = { "cluster_name": "default", "min_workers": 2, "max_workers": 2, + "target_utilization_fraction": 0.8, + "idle_timeout_minutes": 5, "provider": { "type": "mock", "region": "us-east-1", @@ -100,11 +106,55 @@ SMALL_CLUSTER = { "TestProp": 2, }, "file_mounts": {}, - "head_init_commands": ["cmd1", "cmd2"], - "worker_init_commands": ["cmd1"], + "setup_commands": ["cmd1"], + "head_setup_commands": ["cmd2"], + "worker_setup_commands": ["cmd3"], + "head_start_ray_commands": ["start_ray_head"], + "worker_start_ray_commands": ["start_ray_worker"], } +class LoadMetricsTest(unittest.TestCase): + def testUpdate(self): + lm = LoadMetrics() + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}) + self.assertEqual(lm.approx_workers_used(), 0.5) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) + self.assertEqual(lm.approx_workers_used(), 1.0) + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}) + self.assertEqual(lm.approx_workers_used(), 2.0) + + def testPruneByNodeIp(self): + lm = LoadMetrics() + lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}) + lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}) + lm.prune_active_ips({"1.1.1.1", "4.4.4.4"}) + self.assertEqual(lm.approx_workers_used(), 1.0) + + def testBottleneckResource(self): + lm = LoadMetrics() + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) + lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}) + self.assertEqual(lm.approx_workers_used(), 1.88) + + def testHeartbeat(self): + lm = LoadMetrics() + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}) + lm.mark_active("2.2.2.2") + self.assertIn("1.1.1.1", lm.last_heartbeat_time_by_ip) + self.assertIn("2.2.2.2", lm.last_heartbeat_time_by_ip) + self.assertNotIn("3.3.3.3", lm.last_heartbeat_time_by_ip) + + def testDebugString(self): + lm = LoadMetrics() + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) + lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}) + debug = lm.debug_string() + self.assertIn("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU", debug) + 
self.assertIn("NumNodesConnected: 2", debug) + self.assertIn("NumNodesUsed: 1.88", debug) + + class AutoscalingTest(unittest.TestCase): def setUp(self): NODE_PROVIDERS["mock"] = \ @@ -137,12 +187,15 @@ class AutoscalingTest(unittest.TestCase): def testInvalidConfig(self): invalid_config = "/dev/null" self.assertRaises( - ValueError, lambda: StandardAutoscaler(invalid_config)) + ValueError, + lambda: StandardAutoscaler( + invalid_config, LoadMetrics(), update_interval_s=0)) def testScaleUp(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() - autoscaler = StandardAutoscaler(config_path, max_failures=0) + autoscaler = StandardAutoscaler( + config_path, LoadMetrics(), max_failures=0, update_interval_s=0) self.assertEqual(len(self.provider.nodes({})), 0) autoscaler.update() self.assertEqual(len(self.provider.nodes({})), 2) @@ -156,7 +209,8 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "Worker"}, 10) - autoscaler = StandardAutoscaler(config_path, max_failures=0) + autoscaler = StandardAutoscaler( + config_path, LoadMetrics(), max_failures=0, update_interval_s=0) self.assertEqual(len(self.provider.nodes({})), 10) # Gradually scales down to meet target size, never going too low @@ -172,7 +226,8 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() autoscaler = StandardAutoscaler( - config_path, max_concurrent_launches=5, max_failures=0) + config_path, LoadMetrics(), max_concurrent_launches=5, + max_failures=0, update_interval_s=0) self.assertEqual(len(self.provider.nodes({})), 0) autoscaler.update() self.assertEqual(len(self.provider.nodes({})), 2) @@ -185,6 +240,7 @@ class AutoscalingTest(unittest.TestCase): self.assertEqual(len(self.provider.nodes({})), 1) # Update the config to reduce the cluster size + new_config["min_workers"] = 10 
new_config["max_workers"] = 10 self.write_config(new_config) autoscaler.update() @@ -192,10 +248,25 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.assertEqual(len(self.provider.nodes({})), 10) + def testUpdateThrottling(self): + config_path = self.write_config(SMALL_CLUSTER) + self.provider = MockProvider() + autoscaler = StandardAutoscaler( + config_path, LoadMetrics(), max_concurrent_launches=5, + max_failures=0, update_interval_s=10) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 2) + new_config = SMALL_CLUSTER.copy() + new_config["max_workers"] = 1 + self.write_config(new_config) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 2) # not updated yet + def testLaunchConfigChange(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() - autoscaler = StandardAutoscaler(config_path, max_failures=0) + autoscaler = StandardAutoscaler( + config_path, LoadMetrics(), max_failures=0, update_interval_s=0) autoscaler.update() self.assertEqual(len(self.provider.nodes({})), 2) @@ -214,7 +285,8 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() autoscaler = StandardAutoscaler( - config_path, max_concurrent_launches=10, max_failures=0) + config_path, LoadMetrics(), max_concurrent_launches=10, + max_failures=0, update_interval_s=0) autoscaler.update() # Write a corrupted config @@ -225,6 +297,7 @@ class AutoscalingTest(unittest.TestCase): # New a good config again new_config = SMALL_CLUSTER.copy() + new_config["min_workers"] = 10 new_config["max_workers"] = 10 self.write_config(new_config) autoscaler.update() @@ -234,7 +307,8 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() self.provider.throw = True - autoscaler = StandardAutoscaler(config_path, max_failures=2) + autoscaler = StandardAutoscaler( + config_path, LoadMetrics(), 
max_failures=2, update_interval_s=0) autoscaler.update() autoscaler.update() self.assertRaises(Exception, autoscaler.update) @@ -243,13 +317,15 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() self.provider.fail_creates = True - autoscaler = StandardAutoscaler(config_path, max_failures=0) + autoscaler = StandardAutoscaler( + config_path, LoadMetrics(), max_failures=0, update_interval_s=0) self.assertRaises(AssertionError, autoscaler.update) def testLaunchNewNodeOnOutOfBandTerminate(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() - autoscaler = StandardAutoscaler(config_path, max_failures=0) + autoscaler = StandardAutoscaler( + config_path, LoadMetrics(), max_failures=0, update_interval_s=0) autoscaler.update() autoscaler.update() self.assertEqual(len(self.provider.nodes({})), 2) @@ -264,8 +340,9 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() runner = MockProcessRunner() autoscaler = StandardAutoscaler( - config_path, max_failures=0, process_runner=runner, - verbose_updates=True, node_updater_cls=NodeUpdaterThread) + config_path, LoadMetrics(), max_failures=0, process_runner=runner, + verbose_updates=True, node_updater_cls=NodeUpdaterThread, + update_interval_s=0) autoscaler.update() autoscaler.update() self.assertEqual(len(self.provider.nodes({})), 2) @@ -283,8 +360,9 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() runner = MockProcessRunner(fail_cmds=["cmd1"]) autoscaler = StandardAutoscaler( - config_path, max_failures=0, process_runner=runner, - verbose_updates=True, node_updater_cls=NodeUpdaterThread) + config_path, LoadMetrics(), max_failures=0, process_runner=runner, + verbose_updates=True, node_updater_cls=NodeUpdaterThread, + update_interval_s=0) autoscaler.update() autoscaler.update() self.assertEqual(len(self.provider.nodes({})), 2) @@ -302,8 +380,9 @@ class 
AutoscalingTest(unittest.TestCase): self.provider = MockProvider() runner = MockProcessRunner() autoscaler = StandardAutoscaler( - config_path, max_failures=0, process_runner=runner, - verbose_updates=True, node_updater_cls=NodeUpdaterThread) + config_path, LoadMetrics(), max_failures=0, process_runner=runner, + verbose_updates=True, node_updater_cls=NodeUpdaterThread, + update_interval_s=0) autoscaler.update() autoscaler.update() self.assertEqual(len(self.provider.nodes({})), 2) @@ -315,12 +394,76 @@ class AutoscalingTest(unittest.TestCase): {TAG_RAY_NODE_STATUS: "Up-to-date"})) == 2) runner.calls = [] new_config = SMALL_CLUSTER.copy() - new_config["worker_init_commands"] = ["cmdX", "cmdY"] + new_config["worker_setup_commands"] = ["cmdX", "cmdY"] self.write_config(new_config) autoscaler.update() autoscaler.update() self.waitFor(lambda: len(runner.calls) > 0) + def testScaleUpBasedOnLoad(self): + config = SMALL_CLUSTER.copy() + config["min_workers"] = 2 + config["max_workers"] = 10 + config["target_utilization_fraction"] = 0.5 + config_path = self.write_config(config) + self.provider = MockProvider() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, lm, max_failures=0, update_interval_s=0) + self.assertEqual(len(self.provider.nodes({})), 0) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 2) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 2) + + # Scales up as nodes are reported as used + lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) + lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 4) + lm.update("172.0.0.2", {"CPU": 2}, {"CPU": 0}) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 6) + + # Holds steady when load is removed + lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}) + lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}) + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 6) + + # Scales 
down as nodes become unused + lm.last_used_time_by_ip["172.0.0.0"] = 0 + lm.last_used_time_by_ip["172.0.0.1"] = 0 + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 4) + lm.last_used_time_by_ip["172.0.0.2"] = 0 + lm.last_used_time_by_ip["172.0.0.3"] = 0 + autoscaler.update() + self.assertEqual(len(self.provider.nodes({})), 2) + + def testRecoverUnhealthyWorkers(self): + config_path = self.write_config(SMALL_CLUSTER) + self.provider = MockProvider() + runner = MockProcessRunner() + lm = LoadMetrics() + autoscaler = StandardAutoscaler( + config_path, lm, max_failures=0, process_runner=runner, + verbose_updates=True, node_updater_cls=NodeUpdaterThread, + update_interval_s=0) + autoscaler.update() + for node in self.provider.mock_nodes.values(): + node.state = "running" + autoscaler.update() + self.waitFor( + lambda: len(self.provider.nodes( + {TAG_RAY_NODE_STATUS: "Up-to-date"})) == 2) + + # Mark a node as unhealthy + lm.last_heartbeat_time_by_ip["172.0.0.0"] = 0 + num_calls = len(runner.calls) + autoscaler.update() + self.waitFor(lambda: len(runner.calls) > num_calls) + if __name__ == "__main__": unittest.main(verbosity=2)