diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 3cdd76807..be1b5a144 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -116,7 +116,7 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name): provider.cleanup() -def kill_node(config_file, yes, override_cluster_name): +def kill_node(config_file, yes, hard, override_cluster_name): """Kills a random Raylet worker.""" config = yaml.safe_load(open(config_file).read()) @@ -131,19 +131,21 @@ def kill_node(config_file, yes, override_cluster_name): nodes = provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"}) node = random.choice(nodes) logger.info("kill_node: Terminating worker {}".format(node)) + if hard: + provider.terminate_node(node) + else: + updater = NodeUpdaterThread( + node_id=node, + provider_config=config["provider"], + provider=provider, + auth_config=config["auth"], + cluster_name=config["cluster_name"], + file_mounts=config["file_mounts"], + initialization_commands=[], + setup_commands=[], + runtime_hash="") - updater = NodeUpdaterThread( - node_id=node, - provider_config=config["provider"], - provider=provider, - auth_config=config["auth"], - cluster_name=config["cluster_name"], - file_mounts=config["file_mounts"], - initialization_commands=[], - setup_commands=[], - runtime_hash="") - - _exec(updater, "ray stop", False, False) + _exec(updater, "ray stop", False, False) time.sleep(5) @@ -157,6 +159,13 @@ def kill_node(config_file, yes, override_cluster_name): return node_ip +def monitor_cluster(cluster_config_file, num_lines, override_cluster_name): + """Tails the last num_lines lines of the monitor logs on head.""" + cmd = "tail -n {} -f /tmp/ray/session_*/logs/monitor*".format(num_lines) + exec_cluster(cluster_config_file, cmd, False, False, False, False, False, + override_cluster_name, None) + + def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, override_cluster_name): """Create the cluster head 
node, which in turn creates the workers.""" diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 5a0529a51..4c8b0abba 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -12,8 +12,8 @@ import sys import ray.services as services from ray.autoscaler.commands import ( - attach_cluster, exec_cluster, create_or_update_cluster, rsync, - teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips) + attach_cluster, exec_cluster, create_or_update_cluster, monitor_cluster, + rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips) import ray.ray_constants as ray_constants import ray.utils @@ -494,16 +494,41 @@ def teardown(cluster_config_file, yes, workers_only, cluster_name): is_flag=True, default=False, help="Don't ask for confirmation.") +@click.option( + "--hard", + is_flag=True, + default=False, + help="Terminates the node via node provider (defaults to a 'soft kill'" + " which terminates Ray but does not actually delete the instances).") @click.option( "--cluster-name", "-n", required=False, type=str, help="Override the configured cluster name.") -def kill_random_node(cluster_config_file, yes, cluster_name): +def kill_random_node(cluster_config_file, yes, hard, cluster_name): """Kills a random Ray node. 
For testing purposes only.""" click.echo("Killed node with IP " + - kill_node(cluster_config_file, yes, cluster_name)) + kill_node(cluster_config_file, yes, hard, cluster_name)) + + +@cli.command() +@click.argument("cluster_config_file", required=True, type=str) +@click.option( + "--lines", + required=False, + default=100, + type=int, + help="Number of lines to tail.") +@click.option( + "--cluster-name", + "-n", + required=False, + type=str, + help="Override the configured cluster name.") +def monitor(cluster_config_file, lines, cluster_name): + """Runs `tail -n [lines] -f /tmp/ray/session_*/logs/monitor*` on head.""" + monitor_cluster(cluster_config_file, lines, cluster_name) @cli.command()