diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py
index b30040e73..4b3f7992e 100644
--- a/python/ray/autoscaler/autoscaler.py
+++ b/python/ray/autoscaler/autoscaler.py
@@ -10,6 +10,7 @@ import math
 import os
 import subprocess
 import threading
 import time
+import traceback
 
 from collections import defaultdict
@@ -661,6 +662,30 @@ class StandardAutoscaler(object):
         return "{}/{} target nodes{}".format(
             len(nodes), self.target_num_workers(), suffix)
 
+    def kill_workers(self):
+        """Terminate all worker nodes, retrying in an endless loop.
+
+        Monitor.run() calls this when the monitor loop raises. Every 10
+        seconds the current workers are listed and, if there are any, the
+        provider is asked to terminate them. Errors are printed and the
+        loop carries on, so this method does not return.
+        """
+        logger.error("StandardAutoscaler: kill_workers triggered")
+
+        while True:
+            try:
+                nodes = self.workers()
+                if nodes:
+                    self.provider.terminate_nodes(nodes)
+                    logger.error(
+                        "StandardAutoscaler: terminated {} node(s)".format(
+                            len(nodes)))
+            except Exception:
+                # Best effort: report the failure and retry on the next pass.
+                traceback.print_exc()
+
+            time.sleep(10)
+
 
 def typename(v):
     if isinstance(v, type):
diff --git a/python/ray/monitor.py b/python/ray/monitor.py
index 35597ef23..b1b3df37d 100644
--- a/python/ray/monitor.py
+++ b/python/ray/monitor.py
@@ -290,7 +290,7 @@ class Monitor(object):
 
         self.gcs_flush_policy.record_flush()
 
-    def run(self):
+    def _run(self):
         """Run the monitor.
 
         This function loops forever, checking for messages about dead database
@@ -322,9 +322,23 @@ class Monitor(object):
             # messages.
             time.sleep(ray._config.heartbeat_timeout_milliseconds() * 1e-3)
 
-        # TODO(rkn): This infinite loop should be inside of a try/except block,
-        # and if an exception is thrown we should push an error message to all
-        # drivers.
+    def run(self):
+        """Run the monitor and tear down workers if the loop fails.
+
+        If _run() raises and an autoscaler is configured, the traceback is
+        printed and the autoscaler is asked to kill all workers.
+        """
+        try:
+            self._run()
+        except Exception:
+            # TODO(rkn): We should also push an error message to all drivers.
+            if self.autoscaler:
+                # kill_workers() loops forever, so the re-raise below is
+                # not reached on this path; report the failure first.
+                import traceback
+                traceback.print_exc()
+                self.autoscaler.kill_workers()
+            raise
 
 
 if __name__ == "__main__":