From 06fec63c8734a414e6103697525ce7666e57bb7d Mon Sep 17 00:00:00 2001 From: Daniel Edgecumbe <45787862+ls-daniel@users.noreply.github.com> Date: Sat, 27 Jul 2019 01:14:45 +0100 Subject: [PATCH] [autoscaler] Add a 'request_cores' function for manual autoscaling (#4754) --- python/ray/autoscaler/autoscaler.py | 87 +++++++++++++++++++--- python/ray/autoscaler/aws/node_provider.py | 7 ++ python/ray/monitor.py | 25 +++++++ python/ray/ray_constants.py | 2 + python/ray/tests/test_autoscaler.py | 27 +++++++ 5 files changed, 137 insertions(+), 11 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index c9cf46ba9..bef222450 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -17,6 +17,7 @@ from collections import defaultdict import numpy as np import ray.services as services import yaml +from ray.worker import global_worker from ray.autoscaler.docker import dockerize_if_needed from ray.autoscaler.node_provider import get_node_provider, \ get_default_config @@ -26,7 +27,8 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, from ray.autoscaler.updater import NodeUpdaterThread from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ - AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S + AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \ + AUTOSCALER_RESOURCE_REQUEST_CHANNEL from six import string_types from six.moves import queue @@ -159,6 +161,7 @@ class LoadMetrics(object): def update(self, ip, static_resources, dynamic_resources): self.static_resources_by_ip[ip] = static_resources + # We are not guaranteed to have a corresponding dynamic resource for # every static resource because dynamic resources are based on the # available resources in the heartbeat, which does not exist if it is @@ -406,6 +409,8 @@ class StandardAutoscaler(object): for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) + self.resource_requests = defaultdict(int) + logger.info("StandardAutoscaler: {}".format(self.config)) def update(self): @@ -432,11 +437,16 @@ class StandardAutoscaler(object): self.last_update_time = now num_pending = self.num_launches_pending.value nodes = self.workers() - self.log_info_string(nodes) self.load_metrics.prune_active_ips( [self.provider.internal_ip(node_id) for node_id in nodes]) target_workers = self.target_num_workers() + if len(nodes) >= target_workers: + if "CPU" in self.resource_requests: + del self.resource_requests["CPU"] + + self.log_info_string(nodes, target_workers) + # Terminate any idle or out of date nodes last_used = self.load_metrics.last_used_time_by_ip horizon = now - (60 * self.config["idle_timeout_minutes"]) @@ -457,7 +467,7 @@ class StandardAutoscaler(object): if nodes_to_terminate: self.provider.terminate_nodes(nodes_to_terminate) nodes = self.workers() - self.log_info_string(nodes) + self.log_info_string(nodes, target_workers) # Terminate nodes if there are too many nodes_to_terminate = [] @@ -470,20 +480,22 @@ class StandardAutoscaler(object): if nodes_to_terminate: self.provider.terminate_nodes(nodes_to_terminate) nodes = self.workers() - self.log_info_string(nodes) + self.log_info_string(nodes, target_workers) # Launch new nodes if needed num_workers = len(nodes) + num_pending if num_workers < target_workers: max_allowed = min(self.max_launch_batch, self.max_concurrent_launches - num_pending) + num_launches = min(max_allowed, target_workers - num_workers) self.launch_new_node(num_launches) nodes = self.workers() - self.log_info_string(nodes) + self.log_info_string(nodes, target_workers) elif self.load_metrics.num_workers_connected() >= target_workers: logger.info("Ending bringup phase") self.bringup = False + self.log_info_string(nodes, target_workers) # Process any completed updates completed = [] @@ -501,7 +513,7 @@ class StandardAutoscaler(object): # immediately trying to restart Ray on the new node. self.load_metrics.mark_active(self.provider.internal_ip(node_id)) nodes = self.workers() - self.log_info_string(nodes) + self.log_info_string(nodes, target_workers) # Update nodes with out-of-date files T = [ @@ -556,6 +568,20 @@ class StandardAutoscaler(object): # If we want any workers, we want at least initial_workers ideal_num_workers = max(ideal_num_workers, initial_workers) + # Other resources are not supported at present. + if "CPU" in self.resource_requests: + try: + cores_per_worker = self.config["worker_nodes"]["Resources"][ + "CPU"] + except KeyError: + cores_per_worker = 1 # Assume the worst + + cores_desired = self.resource_requests["CPU"] + + ideal_num_workers = max( + ideal_num_workers, + int(np.ceil(cores_desired / cores_per_worker))) + return min(self.config["max_workers"], max(self.config["min_workers"], ideal_num_workers)) @@ -659,11 +685,12 @@ class StandardAutoscaler(object): return self.provider.non_terminated_nodes( tag_filters={TAG_RAY_NODE_TYPE: "worker"}) - def log_info_string(self, nodes): - logger.info("StandardAutoscaler: {}".format(self.info_string(nodes))) + def log_info_string(self, nodes, target): + logger.info("StandardAutoscaler: {}".format( + self.info_string(nodes, target))) logger.info("LoadMetrics: {}".format(self.load_metrics.info_string())) - def info_string(self, nodes): + def info_string(self, nodes, target): suffix = "" if self.num_launches_pending: suffix += " ({} pending)".format(self.num_launches_pending.value) @@ -675,8 +702,15 @@ class StandardAutoscaler(object): if self.bringup: suffix += " (bringup=True)" - return "{}/{} target nodes{}".format( - len(nodes), self.target_num_workers(), suffix) + return "{}/{} target nodes{}".format(len(nodes), target, suffix) + + def request_resources(self, resources): + for resource, count in resources.items(): + self.resource_requests[resource] = max( + self.resource_requests[resource], count) + + logger.info("StandardAutoscaler: resource_requests={}".format( + self.resource_requests)) def kill_workers(self): logger.error("StandardAutoscaler: kill_workers triggered") @@ -824,3 +858,34 @@ def hash_runtime_conf(file_mounts, extra_objs): _hash_cache[conf_str] = hasher.hexdigest() return _hash_cache[conf_str] + + +def request_resources(num_cpus=None, num_gpus=None): + """Remotely request some CPU or GPU resources from the autoscaler. + + This function is to be called e.g. on a node before submitting a bunch of + ray.remote calls to ensure that resources rapidly become available. + + In the future this could be extended to do GPU cores or other custom + resources. + + This function is non blocking. + + Args: + + num_cpus: int -- the number of CPU cores to request + num_gpus: int -- the number of GPUs to request (Not implemented) + + """ + if num_gpus is not None: + raise NotImplementedError( + "GPU resource is not yet supported through request_resources") + r = services.create_redis_client( + global_worker.node.redis_address, + password=global_worker.node.redis_password) + assert isinstance(num_cpus, int) + if num_cpus > 0: + r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, + json.dumps({ + "CPU": num_cpus + })) diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index adfeca5a9..387c250c9 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -173,6 +173,13 @@ class AWSNodeProvider(NodeProvider): def create_node(self, node_config, tags, count): tags = to_aws_format(tags) conf = node_config.copy() + + # Delete unsupported keys from the node config + try: + del conf["Resources"] + except KeyError: + pass + tag_pairs = [{ "Key": TAG_RAY_CLUSTER_NAME, "Value": self.cluster_name, diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 3c9810089..2abd512c3 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -7,6 +7,7 @@ import logging import os import time import traceback +import json import redis @@ -212,6 +213,23 @@ class Monitor(object): binary_to_hex(job_id))) self._xray_clean_up_entries_for_job(job_id) + def autoscaler_resource_request_handler(self, _, data): + """Handle a notification of a resource request for the autoscaler. + + Args: + channel: unused + data: a resource request as JSON, e.g. {"CPU": 1} + """ + + if not self.autoscaler: + return + + try: + self.autoscaler.request_resources(json.loads(data)) + except Exception: + # We don't want this to kill the monitor. + traceback.print_exc() + def process_messages(self, max_messages=10000): """Process all messages ready in the subscription channels. @@ -241,6 +259,9 @@ class Monitor(object): elif channel == ray.gcs_utils.XRAY_JOB_CHANNEL: # Handles driver death. message_handler = self.xray_job_notification_handler + elif (channel == + ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL): + message_handler = self.autoscaler_resource_request_handler else: raise Exception("This code should be unreachable.") @@ -307,6 +328,10 @@ class Monitor(object): self.subscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_CHANNEL) self.subscribe(ray.gcs_utils.XRAY_JOB_CHANNEL) + if self.autoscaler: + self.subscribe( + ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL) + # TODO(rkn): If there were any dead clients at startup, we should clean # up the associated state in the state tables. diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 6fd7d5b64..6bc744009 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -125,3 +125,5 @@ LOG_MONITOR_MAX_OPEN_FILES = 200 # A constant used as object metadata to indicate the object is raw binary. RAW_BUFFER_METADATA = b"RAW" + +AUTOSCALER_RESOURCE_REQUEST_CHANNEL = b"autoscaler_resource_request" diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index c0ecafc5c..dfb88a0bd 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -275,6 +275,33 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) + def testManualAutoscaling(self): + config = SMALL_CLUSTER.copy() + config["min_workers"] = 0 + config["max_workers"] = 50 + cores_per_node = 2 + config["worker_nodes"] = {"Resources": {"CPU": cores_per_node}} + config_path = self.write_config(config) + self.provider = MockProvider() + autoscaler = StandardAutoscaler( + config_path, + LoadMetrics(), + max_launch_batch=5, + max_concurrent_launches=5, + max_failures=0, + update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 0 + autoscaler.update() + self.waitForNodes(0) + autoscaler.request_resources({"CPU": cores_per_node * 10}) + for _ in range(3): # Maximum launch batch is 5 + autoscaler.update() + self.waitForNodes(10) + autoscaler.request_resources({"CPU": cores_per_node * 30}) + for _ in range(4): # Maximum launch batch is 5 + autoscaler.update() + self.waitForNodes(30) + def testTerminateOutdatedNodesGracefully(self): config = SMALL_CLUSTER.copy() config["min_workers"] = 5