From 2fdefe19b787b373f35b814208e67ee2eff5f0a1 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 11 Sep 2019 11:31:35 -0700 Subject: [PATCH] Take into account queue length in autoscaling (#5684) --- doc/source/autoscaling.rst | 10 +++--- python/ray/autoscaler/autoscaler.py | 28 +++++++-------- python/ray/log_monitor.py | 3 +- python/ray/monitor.py | 6 +++- python/ray/tests/test_autoscaler.py | 55 +++++++++++++++++------------ src/ray/raylet/scheduling_queue.cc | 7 ++-- 6 files changed, 63 insertions(+), 46 deletions(-) diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index f1b414407..d68fa1659 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -14,7 +14,7 @@ as described in `the boto docs `__ cluster config file will create a small cluster with a m5.large head node (on-demand) configured to autoscale up to two m5.large `spot workers `__. Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="localhost:6379")``. +SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="auto")``. .. code-block:: bash @@ -37,7 +37,7 @@ First, install the Google API client (``pip install google-api-python-client``), Then you're ready to go. The provided `ray/python/ray/autoscaler/gcp/example-full.yaml `__ cluster config file will create a small cluster with a n1-standard-2 head node (on-demand) configured to autoscale up to two n1-standard-2 `preemptible workers `__. Note that you'll need to fill in your project id in those templates. Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node and then run Ray programs with ``ray.init(address="localhost:6379")``. +SSH into the head node and then run Ray programs with ``ray.init(address="auto")``. .. code-block:: bash @@ -59,7 +59,7 @@ This is used when you have a list of machine IP addresses to connect in a Ray cl Be sure to specify the proper ``head_ip``, list of ``worker_ips``, and the ``ssh_user`` field. Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node and then run Ray programs with ``ray.init(address="localhost:6379")``. +SSH into the head node and then run Ray programs with ``ray.init(address="auto")``. .. code-block:: bash @@ -77,7 +77,7 @@ SSH into the head node and then run Ray programs with ``ray.init(address="localh Running commands on new and existing clusters --------------------------------------------- -You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="localhost:6379")``. +You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="auto")``. .. code-block:: bash @@ -261,7 +261,7 @@ with GPU worker nodes instead. .. code-block:: yaml - min_workers: 1 # must have at least 1 GPU worker (issue #2106) + min_workers: 0 # NOTE: older Ray versions may need 1+ GPU workers (#2106) max_workers: 10 head_node: InstanceType: m4.large diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 8873fec71..6e1e8c00c 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -10,7 +10,6 @@ import math import os import subprocess import threading -import traceback import time from collections import defaultdict @@ -157,9 +156,11 @@ class LoadMetrics(object): self.last_heartbeat_time_by_ip = {} self.static_resources_by_ip = {} self.dynamic_resources_by_ip = {} + self.resource_load_by_ip = {} self.local_ip = services.get_node_ip_address() - def update(self, ip, static_resources, dynamic_resources): + def update(self, ip, static_resources, dynamic_resources, resource_load): + self.resource_load_by_ip[ip] = resource_load self.static_resources_by_ip[ip] = static_resources # We are not guaranteed to have a corresponding dynamic resource for @@ -204,6 +205,7 @@ class LoadMetrics(object): prune(self.last_used_time_by_ip) prune(self.static_resources_by_ip) prune(self.dynamic_resources_by_ip) + prune(self.resource_load_by_ip) prune(self.last_heartbeat_time_by_ip) def approx_workers_used(self): @@ -218,7 +220,11 @@ class LoadMetrics(object): resources_total = {} for ip, max_resources in self.static_resources_by_ip.items(): avail_resources = self.dynamic_resources_by_ip[ip] + resource_load = self.resource_load_by_ip[ip] max_frac = 0.0 + for resource_id, amount in resource_load.items(): + if amount > 0: + max_frac = 1.0 # the resource is saturated for resource_id, amount in max_resources.items(): used = amount - avail_resources[resource_id] if resource_id not in resources_used: @@ -722,19 +728,11 @@ class StandardAutoscaler(object): def kill_workers(self): logger.error("StandardAutoscaler: kill_workers triggered") - - while True: - try: - nodes = self.workers() - if nodes: - self.provider.terminate_nodes(nodes) - logger.error( - "StandardAutoscaler: terminated {} node(s)".format( - len(nodes))) - except Exception: - traceback.print_exc() - - time.sleep(10) + nodes = self.workers() + if nodes: + self.provider.terminate_nodes(nodes) + logger.error("StandardAutoscaler: terminated {} node(s)".format( + len(nodes))) def typename(v): diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index 37239950e..b0427b1d4 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -115,7 +115,8 @@ class LogMonitor(object): log_file_paths = glob.glob("{}/worker*[.out|.err]".format( self.logs_dir)) # segfaults and other serious errors are logged here - raylet_err_paths = glob.glob("{}/raylet*.err".format(self.logs_dir)) + raylet_err_paths = (glob.glob("{}/raylet*.err".format(self.logs_dir)) + + glob.glob("{}/monitor*.err".format(self.logs_dir))) for file_path in log_file_paths + raylet_err_paths: if os.path.isfile( file_path) and file_path not in self.log_filenames: diff --git a/python/ray/monitor.py b/python/ray/monitor.py index c4531d8b3..12c30614f 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -108,6 +108,9 @@ class Monitor(object): message = ray.gcs_utils.HeartbeatBatchTableData.FromString( heartbeat_data) for heartbeat_message in message.batch: + resource_load = dict( + zip(heartbeat_message.resource_load_label, + heartbeat_message.resource_load_capacity)) total_resources = dict( zip(heartbeat_message.resources_total_label, heartbeat_message.resources_total_capacity)) @@ -122,7 +125,7 @@ class Monitor(object): ip = self.raylet_id_to_ip_map.get(client_id) if ip: self.load_metrics.update(ip, total_resources, - available_resources) + available_resources, resource_load) else: logger.warning( "Monitor: " @@ -357,6 +360,7 @@ class Monitor(object): try: self._run() except Exception: + logger.exception("Error in monitor loop") if self.autoscaler: self.autoscaler.kill_workers() raise diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 36d55e940..52dc87f18 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -142,29 +142,40 @@ SMALL_CLUSTER = { class LoadMetricsTest(unittest.TestCase): def testUpdate(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) assert lm.approx_workers_used() == 0.5 - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) assert lm.approx_workers_used() == 1.0 - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}) + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {}) + assert lm.approx_workers_used() == 2.0 + + def testLoadMessages(self): + lm = LoadMetrics() + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) + assert lm.approx_workers_used() == 0.5 + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1}) + assert lm.approx_workers_used() == 1.0 + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) + assert lm.approx_workers_used() == 1.5 + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1}) assert lm.approx_workers_used() == 2.0 def testPruneByNodeIp(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}) - lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}) + lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {}) + lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {}) lm.prune_active_ips({"1.1.1.1", "4.4.4.4"}) assert lm.approx_workers_used() == 1.0 def testBottleneckResource(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) - lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {}) assert lm.approx_workers_used() == 1.88 def testHeartbeat(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) lm.mark_active("2.2.2.2") assert "1.1.1.1" in lm.last_heartbeat_time_by_ip assert "2.2.2.2" in lm.last_heartbeat_time_by_ip @@ -172,15 +183,15 @@ class LoadMetricsTest(unittest.TestCase): def testDebugString(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) - lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {}) lm.update("3.3.3.3", { "memory": 20, "object_store_memory": 40 }, { "memory": 0, "object_store_memory": 20 - }) + }, {}) debug = lm.info_string() assert ("ResourceUsage=2.0/4.0 CPU, 14.0/16.0 GPU, " "1.05 GiB/1.05 GiB memory, " @@ -418,8 +429,8 @@ class AutoscalingTest(unittest.TestCase): tag_filters={TAG_RAY_NODE_TYPE: "worker"}, ) addrs += head_ip for addr in addrs: - lm.update(addr, {"CPU": 2}, {"CPU": 0}) - lm.update(addr, {"CPU": 2}, {"CPU": 2}) + lm.update(addr, {"CPU": 2}, {"CPU": 0}, {}) + lm.update(addr, {"CPU": 2}, {"CPU": 2}, {}) assert autoscaler.bringup autoscaler.update() @@ -428,7 +439,7 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(1) # All of the nodes are down. Simulate some load on the head node - lm.update(head_ip, {"CPU": 2}, {"CPU": 0}) + lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {}) autoscaler.update() self.waitForNodes(6) # expected due to batch sizes and concurrency @@ -702,17 +713,17 @@ class AutoscalingTest(unittest.TestCase): # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() - lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) # worker 1 + lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head + lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}) # worker 1 autoscaler.update() self.waitForNodes(3) - lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}) + lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {}) autoscaler.update() self.waitForNodes(5) # Holds steady when load is removed - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}) - lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}) + lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {}) + lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {}) autoscaler.update() assert autoscaler.num_launches_pending.value == 0 assert len(self.provider.non_terminated_nodes({})) == 5 @@ -746,20 +757,20 @@ class AutoscalingTest(unittest.TestCase): # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() - lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head + lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head # 1.0 nodes used => target nodes = 2 => target workers = 1 autoscaler.update() self.waitForNodes(1) # Make new node idle, and never used. # Should hold steady as target is still 2. - lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}) + lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {}) lm.last_used_time_by_ip["172.0.0.0"] = 0 autoscaler.update() assert len(self.provider.non_terminated_nodes({})) == 1 # Reduce load on head => target nodes = 1 => target workers = 0 - lm.update(local_ip, {"CPU": 2}, {"CPU": 1}) + lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {}) autoscaler.update() assert len(self.provider.non_terminated_nodes({})) == 0 diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 5a0f91dd7..8e20aacfa 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -131,8 +131,11 @@ const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id, } ResourceSet SchedulingQueue::GetResourceLoad() const { - // TODO(atumanov): consider other types of tasks as part of load. - return ready_queue_->GetCurrentResourceLoad(); + auto load = ready_queue_->GetCurrentResourceLoad(); + // Also take into account infeasible tasks so they show up for autoscaling. + load.AddResources( + task_queues_[static_cast(TaskState::INFEASIBLE)]->GetCurrentResourceLoad()); + return load; } const std::unordered_set &SchedulingQueue::GetBlockedTaskIds() const {