diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 04dc8a7a8..e368c8b1e 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -66,7 +66,8 @@ def _bootstrap_config(config): return resolved_config -def teardown_cluster(config_file, yes, workers_only, override_cluster_name): +def teardown_cluster(config_file, yes, workers_only, override_cluster_name, + keep_min_workers): """Destroys all nodes of a Ray cluster described by a config json.""" config = yaml.safe_load(open(config_file).read()) @@ -81,16 +82,25 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name): try: def remaining_nodes(): - if workers_only: - A = [] - else: - A = provider.non_terminated_nodes({ - TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD - }) - A += provider.non_terminated_nodes({ + + workers = provider.non_terminated_nodes({ TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER }) - return A + + if keep_min_workers: + min_workers = config.get("min_workers", 0) + logger.info("teardown_cluster: " + "Keeping {} nodes...".format(min_workers)) + workers = random.sample(workers, len(workers) - min_workers) + + if workers_only: + return workers + + head = provider.non_terminated_nodes({ + TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD + }) + + return head + workers # Loop here to check that both the head and worker nodes are actually # really gone diff --git a/python/ray/monitor.py b/python/ray/monitor.py index ca4ed2974..d15a984f9 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -15,6 +15,7 @@ import ray.utils import ray.ray_constants as ray_constants from ray.utils import (binary_to_hex, binary_to_object_id, binary_to_task_id, hex_to_binary, setup_logger) +from ray.autoscaler.commands import teardown_cluster logger = logging.getLogger(__name__) @@ -48,8 +49,10 @@ class Monitor: if autoscaling_config: self.autoscaler = StandardAutoscaler(autoscaling_config, self.load_metrics) + self.autoscaling_config = autoscaling_config else: self.autoscaler = None + self.autoscaling_config = None # Experimental feature: GCS flushing. self.issue_gcs_flushes = "RAY_USE_NEW_GCS" in os.environ @@ -353,6 +356,39 @@ class Monitor: time.sleep( ray._config.raylet_heartbeat_timeout_milliseconds() * 1e-3) + def destroy_autoscaler_workers(self): + """Cleanup the autoscaler, in case of an exception in the run() method. + + We kill the worker nodes, but retain the head node in order to keep + logs around, keeping costs minimal. This monitor process runs on the + head node anyway, so this is more reliable.""" + + if self.autoscaler is None: + return # Nothing to clean up. + + if self.autoscaling_config is None: + # This is a logic error in the program. Can't do anything. + logger.error( + "Monitor: Cleanup failed due to lack of autoscaler config.") + return + + logger.info("Monitor: Exception caught. Taking down workers...") + clean = False + while not clean: + try: + teardown_cluster( + config_file=self.autoscaling_config, + yes=True, # Non-interactive. + workers_only=True, # Retain head node for logs. + override_cluster_name=None, + keep_min_workers=True, # Retain minimal amount of workers. + ) + clean = True + logger.info("Monitor: Workers taken down.") + except Exception: + logger.error("Monitor: Cleanup exception. Trying again...") + time.sleep(2) + def run(self): try: self._run() @@ -412,6 +448,9 @@ if __name__ == "__main__": try: monitor.run() except Exception as e: + # Take down autoscaler workers if necessary. + monitor.destroy_autoscaler_workers() + # Something went wrong, so push an error to all drivers. redis_client = ray.services.create_redis_client( args.redis_address, password=args.redis_password) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 84bf8f211..02bdd948a 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -531,6 +531,11 @@ def create_or_update(cluster_config_file, min_workers, max_workers, no_restart, is_flag=True, default=False, help="Only destroy the workers.") +@click.option( + "--keep-min-workers", + is_flag=True, + default=False, + help="Retain the minimal amount of workers specified in the config.") @click.option( "--yes", "-y", @@ -543,9 +548,11 @@ def create_or_update(cluster_config_file, min_workers, max_workers, no_restart, required=False, type=str, help="Override the configured cluster name.") -def teardown(cluster_config_file, yes, workers_only, cluster_name): +def teardown(cluster_config_file, yes, workers_only, cluster_name, + keep_min_workers): """Tear down the Ray cluster.""" - teardown_cluster(cluster_config_file, yes, workers_only, cluster_name) + teardown_cluster(cluster_config_file, yes, workers_only, cluster_name, + keep_min_workers) @cli.command()