diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py
index 9c6372529..602d28f1f 100644
--- a/python/ray/autoscaler/commands.py
+++ b/python/ray/autoscaler/commands.py
@@ -11,6 +11,7 @@ import time
 import sys
 import click
 import logging
+import random
 
 import yaml
 try:  # py3
@@ -94,6 +95,55 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
         nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
 
 
+def kill_node(config_file, yes, override_cluster_name):
+    """Kills a random Raylet worker.
+
+    Runs "ray stop" on one randomly chosen worker node of the cluster.
+
+    Args:
+        config_file: Path to the cluster configuration YAML file.
+        yes: Forwarded to confirm(); set by the CLI's --yes flag.
+        override_cluster_name: If not None, replaces the "cluster_name"
+            entry of the loaded configuration.
+
+    Returns:
+        The external IP of the selected worker node.
+
+    Raises:
+        RuntimeError: If the provider reports no worker nodes.
+    """
+
+    with open(config_file) as f:
+        config = yaml.load(f.read())
+    if override_cluster_name is not None:
+        config["cluster_name"] = override_cluster_name
+    config = _bootstrap_config(config)
+
+    confirm("This will kill a node in your cluster", yes)
+
+    provider = get_node_provider(config["provider"], config["cluster_name"])
+    nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
+    if not nodes:
+        # random.choice() raises a bare IndexError on an empty sequence.
+        raise RuntimeError("No worker nodes found in cluster {}".format(
+            config["cluster_name"]))
+    node = random.choice(nodes)
+    logger.info("Terminating worker {}".format(node))
+    updater = NodeUpdaterProcess(
+        node,
+        config["provider"],
+        config["auth"],
+        config["cluster_name"],
+        config["file_mounts"], [],
+        "",
+        redirect_output=False)
+
+    _exec(updater, "ray stop", False, False)
+
+    time.sleep(5)
+    return provider.external_ip(node)
+
+
 def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
                             override_cluster_name):
     """Create the cluster head node, which in turn creates the workers."""
@@ -343,6 +393,28 @@ def get_head_node_ip(config_file, override_cluster_name):
     return provider.external_ip(head_node)
 
 
+def get_worker_node_ips(config_file, override_cluster_name):
+    """Returns worker node IPs for given configuration file.
+
+    Args:
+        config_file: Path to the cluster configuration YAML file.
+        override_cluster_name: If not None, replaces the "cluster_name"
+            entry of the loaded configuration.
+
+    Returns:
+        A list with the external IP of every worker node the provider
+        reports; empty if there are none.
+    """
+
+    with open(config_file) as f:
+        config = yaml.load(f.read())
+    if override_cluster_name is not None:
+        config["cluster_name"] = override_cluster_name
+    provider = get_node_provider(config["provider"], config["cluster_name"])
+    nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
+    return [provider.external_ip(node) for node in nodes]
+
+
 def _get_head_node(config,
                    config_file,
                    override_cluster_name,
diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py
index 50de6a467..207659fd1 100644
--- a/python/ray/scripts/scripts.py
+++ b/python/ray/scripts/scripts.py
@@ -9,9 +9,9 @@ import os
 import subprocess
 
 import ray.services as services
-from ray.autoscaler.commands import (attach_cluster, exec_cluster,
-                                     create_or_update_cluster, rsync,
-                                     teardown_cluster, get_head_node_ip)
+from ray.autoscaler.commands import (
+    attach_cluster, exec_cluster, create_or_update_cluster, rsync,
+    teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips)
 import ray.ray_constants as ray_constants
 import ray.utils
 
@@ -274,8 +274,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
         # Get the node IP address if one is not provided.
         ray_params.update_if_absent(
             node_ip_address=services.get_node_ip_address())
-        logger.info("Using IP address {} for this node."
-                    .format(ray_params.node_ip_address))
+        logger.info("Using IP address {} for this node.".format(
+            ray_params.node_ip_address))
         ray_params.update_if_absent(
             redis_port=redis_port,
             redis_shard_ports=redis_shard_ports,
@@ -342,8 +342,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
         # Get the node IP address if one is not provided.
         ray_params.update_if_absent(
             node_ip_address=services.get_node_ip_address(redis_address))
-        logger.info("Using IP address {} for this node."
-                    .format(ray_params.node_ip_address))
+        logger.info("Using IP address {} for this node.".format(
+            ray_params.node_ip_address))
         # Check that there aren't already Redis clients with the same IP
         # address connected with this Redis instance. This raises an exception
         # if the Redis server already has clients on this node.
@@ -456,6 +456,7 @@ def stop():
     help="Don't ask for confirmation.")
 def create_or_update(cluster_config_file, min_workers, max_workers, no_restart,
                      restart_only, yes, cluster_name):
+    """Create or update a Ray cluster."""
     if restart_only or no_restart:
         assert restart_only != no_restart, "Cannot set both 'restart_only' " \
             "and 'no_restart' at the same time!"
@@ -483,9 +484,30 @@ def create_or_update(cluster_config_file, min_workers, max_workers, no_restart,
     type=str,
     help="Override the configured cluster name.")
 def teardown(cluster_config_file, yes, workers_only, cluster_name):
+    """Tear down the Ray cluster."""
     teardown_cluster(cluster_config_file, yes, workers_only, cluster_name)
 
 
+@cli.command()
+@click.argument("cluster_config_file", required=True, type=str)
+@click.option(
+    "--yes",
+    "-y",
+    is_flag=True,
+    default=False,
+    help="Don't ask for confirmation.")
+@click.option(
+    "--cluster-name",
+    "-n",
+    required=False,
+    type=str,
+    help="Override the configured cluster name.")
+def kill_random_node(cluster_config_file, yes, cluster_name):
+    """Kills a random Ray node. For testing purposes only."""
+    click.echo("Killed node with IP {}".format(
+        kill_node(cluster_config_file, yes, cluster_name)))
+
+
 @cli.command()
 @click.argument("cluster_config_file", required=True, type=str)
 @click.option(
@@ -664,6 +686,20 @@ def get_head_ip(cluster_config_file, cluster_name):
     click.echo(get_head_node_ip(cluster_config_file, cluster_name))
 
 
+@cli.command()
+@click.argument("cluster_config_file", required=True, type=str)
+@click.option(
+    "--cluster-name",
+    "-n",
+    required=False,
+    type=str,
+    help="Override the configured cluster name.")
+def get_worker_ips(cluster_config_file, cluster_name):
+    """Print the external IP of every worker node, one per line."""
+    worker_ips = get_worker_node_ips(cluster_config_file, cluster_name)
+    click.echo("\n".join(worker_ips))
+
+
 @cli.command()
 def stack():
     COMMAND = """
@@ -700,7 +736,9 @@ cli.add_command(rsync_up, name="rsync_up")
 cli.add_command(submit)
 cli.add_command(teardown)
 cli.add_command(teardown, name="down")
+cli.add_command(kill_random_node)
 cli.add_command(get_head_ip, name="get_head_ip")
+cli.add_command(get_worker_ips)
 cli.add_command(stack)
 
 