mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 04:20:14 +08:00
autoscaler/monitor: Kill workers on exception (#4997)
This commit is contained in:
committed by
Philipp Moritz
parent
1a8d0af814
commit
49c6e81de2
@@ -10,6 +10,7 @@ import math
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import traceback
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
@@ -661,6 +662,20 @@ class StandardAutoscaler(object):
|
||||
return "{}/{} target nodes{}".format(
|
||||
len(nodes), self.target_num_workers(), suffix)
|
||||
|
||||
def kill_workers(self):
    """Terminate all worker nodes, retrying forever.

    Every 10 seconds this asks ``self.workers()`` for the current worker
    nodes and, if there are any, passes them to
    ``self.provider.terminate_nodes``. Errors raised while listing or
    terminating nodes are logged and the loop carries on, so this method
    never returns normally; it only exits if something that is not an
    ``Exception`` subclass (e.g. ``KeyboardInterrupt``) propagates.
    """
    logger.error("StandardAutoscaler: kill_workers triggered")

    while True:
        try:
            nodes = self.workers()
            if nodes:
                self.provider.terminate_nodes(nodes)
            # Logged on every pass, including when there was nothing
            # to terminate (count is then 0).
            logger.error(
                "StandardAutoscaler: terminated %d node(s)", len(nodes))
        except Exception:
            # Best effort: report the failure through the same logger as
            # the rest of this method (traceback included) and retry on
            # the next pass instead of giving up.
            logger.exception(
                "StandardAutoscaler: error while terminating workers")

        # Pause between attempts so the provider is not hammered.
        time.sleep(10)
|
||||
|
||||
|
||||
def typename(v):
|
||||
if isinstance(v, type):
|
||||
|
||||
@@ -290,7 +290,7 @@ class Monitor(object):
|
||||
|
||||
self.gcs_flush_policy.record_flush()
|
||||
|
||||
def run(self):
|
||||
def _run(self):
|
||||
"""Run the monitor.
|
||||
|
||||
This function loops forever, checking for messages about dead database
|
||||
@@ -322,9 +322,13 @@ class Monitor(object):
|
||||
# messages.
|
||||
time.sleep(ray._config.heartbeat_timeout_milliseconds() * 1e-3)
|
||||
|
||||
# TODO(rkn): This infinite loop should be inside of a try/except block,
|
||||
# and if an exception is thrown we should push an error message to all
|
||||
# drivers.
|
||||
def run(self):
    """Run the monitor loop and tear down workers if it fails.

    Delegates to ``self._run()``. If that raises an ``Exception``, the
    traceback is printed to stderr first, then -- when ``self.autoscaler``
    is set -- ``self.autoscaler.kill_workers()`` is invoked, and finally
    the original exception is re-raised.

    Raises:
        Exception: whatever ``self._run()`` raised, once
            ``kill_workers()`` returns (or immediately when no
            autoscaler is configured).
    """
    try:
        self._run()
    except Exception:
        # Report the failure *before* handing off to the autoscaler:
        # kill_workers() may never return (StandardAutoscaler's version
        # loops indefinitely), in which case the ``raise`` below is never
        # reached and the root cause would otherwise go unreported.
        import traceback
        traceback.print_exc()

        if self.autoscaler:
            self.autoscaler.kill_workers()
        raise
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user