[autoscaler]: Kill workers if the monitor raises an exception (#3977)

Co-authored-by: CJosephides <cjosephides@gmail.com>
This commit is contained in:
Daniel Edgecumbe
2020-01-23 20:12:52 +00:00
committed by Edward Oakes
parent cfbde39ba8
commit e516c50745
3 changed files with 67 additions and 11 deletions
+19 -9
View File
@@ -66,7 +66,8 @@ def _bootstrap_config(config):
return resolved_config
def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
def teardown_cluster(config_file, yes, workers_only, override_cluster_name,
keep_min_workers):
"""Destroys all nodes of a Ray cluster described by a config json."""
config = yaml.safe_load(open(config_file).read())
@@ -81,16 +82,25 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
try:
def remaining_nodes():
if workers_only:
A = []
else:
A = provider.non_terminated_nodes({
TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD
})
A += provider.non_terminated_nodes({
workers = provider.non_terminated_nodes({
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
})
return A
if keep_min_workers:
min_workers = config.get("min_workers", 0)
logger.info("teardown_cluster: "
"Keeping {} nodes...".format(min_workers))
workers = random.sample(workers, len(workers) - min_workers)
if workers_only:
return workers
head = provider.non_terminated_nodes({
TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD
})
return head + workers
# Loop here to check that both the head and worker nodes are actually
# really gone
+39
View File
@@ -15,6 +15,7 @@ import ray.utils
import ray.ray_constants as ray_constants
from ray.utils import (binary_to_hex, binary_to_object_id, binary_to_task_id,
hex_to_binary, setup_logger)
from ray.autoscaler.commands import teardown_cluster
logger = logging.getLogger(__name__)
@@ -48,8 +49,10 @@ class Monitor:
if autoscaling_config:
self.autoscaler = StandardAutoscaler(autoscaling_config,
self.load_metrics)
self.autoscaling_config = autoscaling_config
else:
self.autoscaler = None
self.autoscaling_config = None
# Experimental feature: GCS flushing.
self.issue_gcs_flushes = "RAY_USE_NEW_GCS" in os.environ
@@ -353,6 +356,39 @@ class Monitor:
time.sleep(
ray._config.raylet_heartbeat_timeout_milliseconds() * 1e-3)
def destroy_autoscaler_workers(self):
"""Cleanup the autoscaler, in case of an exception in the run() method.
We kill the worker nodes, but retain the head node in order to keep
logs around, keeping costs minimal. This monitor process runs on the
head node anyway, so this is more reliable."""
if self.autoscaler is None:
return # Nothing to clean up.
if self.autoscaling_config is None:
# This is a logic error in the program. Can't do anything.
logger.error(
"Monitor: Cleanup failed due to lack of autoscaler config.")
return
logger.info("Monitor: Exception caught. Taking down workers...")
clean = False
while not clean:
try:
teardown_cluster(
config_file=self.autoscaling_config,
yes=True, # Non-interactive.
workers_only=True, # Retain head node for logs.
override_cluster_name=None,
keep_min_workers=True, # Retain minimal amount of workers.
)
clean = True
logger.info("Monitor: Workers taken down.")
except Exception:
logger.error("Monitor: Cleanup exception. Trying again...")
time.sleep(2)
def run(self):
try:
self._run()
@@ -412,6 +448,9 @@ if __name__ == "__main__":
try:
monitor.run()
except Exception as e:
# Take down autoscaler workers if necessary.
monitor.destroy_autoscaler_workers()
# Something went wrong, so push an error to all drivers.
redis_client = ray.services.create_redis_client(
args.redis_address, password=args.redis_password)
+9 -2
View File
@@ -531,6 +531,11 @@ def create_or_update(cluster_config_file, min_workers, max_workers, no_restart,
is_flag=True,
default=False,
help="Only destroy the workers.")
@click.option(
"--keep-min-workers",
is_flag=True,
default=False,
help="Retain the minimal amount of workers specified in the config.")
@click.option(
"--yes",
"-y",
@@ -543,9 +548,11 @@ def create_or_update(cluster_config_file, min_workers, max_workers, no_restart,
required=False,
type=str,
help="Override the configured cluster name.")
def teardown(cluster_config_file, yes, workers_only, cluster_name):
def teardown(cluster_config_file, yes, workers_only, cluster_name,
keep_min_workers):
"""Tear down the Ray cluster."""
teardown_cluster(cluster_config_file, yes, workers_only, cluster_name)
teardown_cluster(cluster_config_file, yes, workers_only, cluster_name,
keep_min_workers)
@cli.command()