From 2d1f52c21c67b71751d5cab2c32f1cb690ab651e Mon Sep 17 00:00:00 2001 From: Gekho457 <62982571+Gekho457@users.noreply.github.com> Date: Thu, 22 Oct 2020 17:46:49 -0400 Subject: [PATCH] [autoscaler] Removed .cleanup() from NodeProvider and commands.py (#11543) --- python/ray/autoscaler/_private/commands.py | 820 ++++++++++----------- python/ray/autoscaler/node_provider.py | 4 - 2 files changed, 390 insertions(+), 434 deletions(-) diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 4a235394f..8074cd048 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -323,87 +323,82 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, logger, "Ignoring error attempting a clean shutdown.") provider = _get_node_provider(config["provider"], config["cluster_name"]) - try: - def remaining_nodes(): - workers = provider.non_terminated_nodes({ - TAG_RAY_NODE_KIND: NODE_KIND_WORKER - }) + def remaining_nodes(): + workers = provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + }) - if keep_min_workers: - min_workers = config.get("min_workers", 0) + if keep_min_workers: + min_workers = config.get("min_workers", 0) - cli_logger.print( - "{} random worker nodes will not be shut down. " + - cf.dimmed("(due to {})"), cf.bold(min_workers), - cf.bold("--keep-min-workers")) - cli_logger.old_info(logger, - "teardown_cluster: Keeping {} nodes...", - min_workers) + cli_logger.print( + "{} random worker nodes will not be shut down. " + + cf.dimmed("(due to {})"), cf.bold(min_workers), + cf.bold("--keep-min-workers")) + cli_logger.old_info( + logger, "teardown_cluster: Keeping {} nodes...", min_workers) - workers = random.sample(workers, len(workers) - min_workers) + workers = random.sample(workers, len(workers) - min_workers) - # todo: it's weird to kill the head node but not all workers - if workers_only: - cli_logger.print( - "The head node will not be shut down. " + - cf.dimmed("(due to {})"), cf.bold("--workers-only")) + # todo: it's weird to kill the head node but not all workers + if workers_only: + cli_logger.print( + "The head node will not be shut down. " + + cf.dimmed("(due to {})"), cf.bold("--workers-only")) - return workers + return workers - head = provider.non_terminated_nodes({ - TAG_RAY_NODE_KIND: NODE_KIND_HEAD - }) + head = provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_HEAD + }) - return head + workers + return head + workers - def run_docker_stop(node, container_name): - try: - exec_cluster( - config_file, - cmd=f"docker stop {container_name}", - run_env="host", - screen=False, - tmux=False, - stop=False, - start=False, - override_cluster_name=override_cluster_name, - port_forward=None, - with_output=False) - except Exception: - cli_logger.warning(f"Docker stop failed on {node}") - cli_logger.old_warning(logger, f"Docker stop failed on {node}") + def run_docker_stop(node, container_name): + try: + exec_cluster( + config_file, + cmd=f"docker stop {container_name}", + run_env="host", + screen=False, + tmux=False, + stop=False, + start=False, + override_cluster_name=override_cluster_name, + port_forward=None, + with_output=False) + except Exception: + cli_logger.warning(f"Docker stop failed on {node}") + cli_logger.old_warning(logger, f"Docker stop failed on {node}") - # Loop here to check that both the head and worker nodes are actually - # really gone - A = remaining_nodes() + # Loop here to check that both the head and worker nodes are actually + # really gone + A = remaining_nodes() - container_name = config.get("docker", {}).get("container_name") - if container_name: - for node in A: - run_docker_stop(node, container_name) + container_name = config.get("docker", {}).get("container_name") + if container_name: + for node in A: + run_docker_stop(node, container_name) - with LogTimer("teardown_cluster: done."): - while A: - cli_logger.old_info( - logger, "teardown_cluster: " - "Shutting down {} nodes...", len(A)) + with LogTimer("teardown_cluster: done."): + while A: + cli_logger.old_info( + logger, "teardown_cluster: " + "Shutting down {} nodes...", len(A)) - provider.terminate_nodes(A) + provider.terminate_nodes(A) - cli_logger.print( - "Requested {} nodes to shut down.", - cf.bold(len(A)), - _tags=dict(interval="1s")) + cli_logger.print( + "Requested {} nodes to shut down.", + cf.bold(len(A)), + _tags=dict(interval="1s")) - time.sleep( - POLL_INTERVAL) # todo: interval should be a variable - A = remaining_nodes() - cli_logger.print("{} nodes remaining after {} second(s).", - cf.bold(len(A)), POLL_INTERVAL) - cli_logger.success("No nodes remaining.") - finally: - provider.cleanup() + time.sleep(POLL_INTERVAL) # todo: interval should be a variable + A = remaining_nodes() + cli_logger.print("{} nodes remaining after {} second(s).", + cf.bold(len(A)), POLL_INTERVAL) + cli_logger.success("No nodes remaining.") def kill_node(config_file: str, yes: bool, hard: bool, @@ -419,41 +414,38 @@ def kill_node(config_file: str, yes: bool, hard: bool, cli_logger.old_confirm("This will kill a node in your cluster", yes) provider = _get_node_provider(config["provider"], config["cluster_name"]) - try: - nodes = provider.non_terminated_nodes({ - TAG_RAY_NODE_KIND: NODE_KIND_WORKER - }) - node = random.choice(nodes) - cli_logger.print("Shutdown " + cf.bold("{}"), node) - cli_logger.old_info(logger, "kill_node: Shutdown worker {}", node) - if hard: - provider.terminate_node(node) - else: - updater = NodeUpdaterThread( - node_id=node, - provider_config=config["provider"], - provider=provider, - auth_config=config["auth"], - cluster_name=config["cluster_name"], - file_mounts=config["file_mounts"], - initialization_commands=[], - setup_commands=[], - ray_start_commands=[], - runtime_hash="", - file_mounts_contents_hash="", - is_head_node=False, - docker_config=config.get("docker")) + nodes = provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + }) + node = random.choice(nodes) + cli_logger.print("Shutdown " + cf.bold("{}"), node) + cli_logger.old_info(logger, "kill_node: Shutdown worker {}", node) + if hard: + provider.terminate_node(node) + else: + updater = NodeUpdaterThread( + node_id=node, + provider_config=config["provider"], + provider=provider, + auth_config=config["auth"], + cluster_name=config["cluster_name"], + file_mounts=config["file_mounts"], + initialization_commands=[], + setup_commands=[], + ray_start_commands=[], + runtime_hash="", + file_mounts_contents_hash="", + is_head_node=False, + docker_config=config.get("docker")) - _exec(updater, "ray stop", False, False) + _exec(updater, "ray stop", False, False) - time.sleep(POLL_INTERVAL) + time.sleep(POLL_INTERVAL) - if config.get("provider", {}).get("use_internal_ips", False) is True: - node_ip = provider.internal_ip(node) - else: - node_ip = provider.external_ip(node) - finally: - provider.cleanup() + if config.get("provider", {}).get("use_internal_ips", False) is True: + node_ip = provider.internal_ip(node) + else: + node_ip = provider.external_ip(node) return node_ip @@ -513,255 +505,245 @@ def get_or_create_head_node(config: Dict[str, Any], config = copy.deepcopy(config) config_file = os.path.abspath(config_file) - try: - head_node_tags = { - TAG_RAY_NODE_KIND: NODE_KIND_HEAD, - } - nodes = provider.non_terminated_nodes(head_node_tags) - if len(nodes) > 0: - head_node = nodes[0] - else: - head_node = None + head_node_tags = { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + } + nodes = provider.non_terminated_nodes(head_node_tags) + if len(nodes) > 0: + head_node = nodes[0] + else: + head_node = None - if not head_node: + if not head_node: + cli_logger.confirm( + yes, + "No head node found. " + "Launching a new cluster.", + _abort=True) + cli_logger.old_confirm("This will create a new cluster", yes) + elif not no_restart: + cli_logger.old_confirm("This will restart cluster services", yes) + + if head_node: + if restart_only: cli_logger.confirm( yes, - "No head node found. " - "Launching a new cluster.", + "Updating cluster configuration and " + "restarting the cluster Ray runtime. " + "Setup commands will not be run due to `{}`.\n", + cf.bold("--restart-only"), _abort=True) - cli_logger.old_confirm("This will create a new cluster", yes) - elif not no_restart: - cli_logger.old_confirm("This will restart cluster services", yes) + elif no_restart: + cli_logger.print( + "Cluster Ray runtime will not be restarted due " + "to `{}`.", cf.bold("--no-restart")) + cli_logger.confirm( + yes, + "Updating cluster configuration and " + "running setup commands.", + _abort=True) + else: + cli_logger.print( + "Updating cluster configuration and running full setup.") + cli_logger.confirm( + yes, + cf.bold("Cluster Ray runtime will be restarted."), + _abort=True) + cli_logger.newline() - if head_node: - if restart_only: - cli_logger.confirm( - yes, - "Updating cluster configuration and " - "restarting the cluster Ray runtime. " - "Setup commands will not be run due to `{}`.\n", - cf.bold("--restart-only"), - _abort=True) - elif no_restart: + # TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync) + head_node_config = copy.deepcopy(config["head_node"]) + if "head_node_type" in config: + head_node_tags[TAG_RAY_USER_NODE_TYPE] = config["head_node_type"] + head_node_config.update(config["available_node_types"][config[ + "head_node_type"]]["node_config"]) + + launch_hash = hash_launch_conf(head_node_config, config["auth"]) + if head_node is None or provider.node_tags(head_node).get( + TAG_RAY_LAUNCH_CONFIG) != launch_hash: + with cli_logger.group("Acquiring an up-to-date head node"): + if head_node is not None: cli_logger.print( - "Cluster Ray runtime will not be restarted due " - "to `{}`.", cf.bold("--no-restart")) - cli_logger.confirm( - yes, - "Updating cluster configuration and " - "running setup commands.", - _abort=True) - else: + "Currently running head node is out-of-date with " + "cluster configuration") cli_logger.print( - "Updating cluster configuration and running full setup.") - cli_logger.confirm( - yes, - cf.bold("Cluster Ray runtime will be restarted."), - _abort=True) - cli_logger.newline() - - # TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync) - head_node_config = copy.deepcopy(config["head_node"]) - if "head_node_type" in config: - head_node_tags[TAG_RAY_USER_NODE_TYPE] = config["head_node_type"] - head_node_config.update(config["available_node_types"][config[ - "head_node_type"]]["node_config"]) - - launch_hash = hash_launch_conf(head_node_config, config["auth"]) - if head_node is None or provider.node_tags(head_node).get( - TAG_RAY_LAUNCH_CONFIG) != launch_hash: - with cli_logger.group("Acquiring an up-to-date head node"): - if head_node is not None: - cli_logger.print( - "Currently running head node is out-of-date with " - "cluster configuration") - cli_logger.print( - "hash is {}, expected {}", - cf.bold( - provider.node_tags(head_node) - .get(TAG_RAY_LAUNCH_CONFIG)), cf.bold(launch_hash)) - cli_logger.confirm(yes, "Relaunching it.", _abort=True) - cli_logger.old_confirm( - "Head node config out-of-date. It will be terminated", - yes) - - cli_logger.old_info( - logger, "get_or_create_head_node: " - "Shutting down outdated head node {}", head_node) - - provider.terminate_node(head_node) - cli_logger.print("Terminated head node {}", head_node) + "hash is {}, expected {}", + cf.bold( + provider.node_tags(head_node) + .get(TAG_RAY_LAUNCH_CONFIG)), cf.bold(launch_hash)) + cli_logger.confirm(yes, "Relaunching it.", _abort=True) + cli_logger.old_confirm( + "Head node config out-of-date. It will be terminated", yes) cli_logger.old_info( - logger, - "get_or_create_head_node: Launching new head node...") - - head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash - head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format( - config["cluster_name"]) - provider.create_node(head_node_config, head_node_tags, 1) - cli_logger.print("Launched a new head node") - - start = time.time() - head_node = None - with cli_logger.group("Fetching the new head node"): - while True: - if time.time() - start > 50: - cli_logger.abort( - "Head node fetch timed out.") # todo: msg - raise RuntimeError("Failed to create head node.") - nodes = provider.non_terminated_nodes(head_node_tags) - if len(nodes) == 1: - head_node = nodes[0] - break - time.sleep(POLL_INTERVAL) - cli_logger.newline() - - with cli_logger.group( - "Setting up head node", - _numbered=("<>", 1, 1), - # cf.bold(provider.node_tags(head_node)[TAG_RAY_NODE_NAME]), - _tags=dict()): # add id, ARN to tags? - - # TODO(ekl) right now we always update the head node even if the - # hash matches. - # We could prompt the user for what they want to do here. - # No need to pass in cluster_sync_files because we use this - # hash to set up the head node - (runtime_hash, file_mounts_contents_hash) = hash_runtime_conf( - config["file_mounts"], None, config) - - cli_logger.old_info( - logger, - "get_or_create_head_node: Updating files on head node...") - - # Rewrite the auth config so that the head - # node can update the workers - remote_config = copy.deepcopy(config) - - # drop proxy options if they exist, otherwise - # head node won't be able to connect to workers - remote_config["auth"].pop("ssh_proxy_command", None) - - if "ssh_private_key" in config["auth"]: - remote_key_path = "~/ray_bootstrap_key.pem" - remote_config["auth"]["ssh_private_key"] = remote_key_path - - # Adjust for new file locations - new_mounts = {} - for remote_path in config["file_mounts"]: - new_mounts[remote_path] = remote_path - remote_config["file_mounts"] = new_mounts - remote_config["no_restart"] = no_restart - - remote_config = provider.prepare_for_head_node(remote_config) - - # Now inject the rewritten config and SSH key into the head node - remote_config_file = tempfile.NamedTemporaryFile( - "w", prefix="ray-bootstrap-") - remote_config_file.write(json.dumps(remote_config)) - remote_config_file.flush() - config["file_mounts"].update({ - "~/ray_bootstrap_config.yaml": remote_config_file.name - }) - - if "ssh_private_key" in config["auth"]: - config["file_mounts"].update({ - remote_key_path: config["auth"]["ssh_private_key"], - }) - cli_logger.print("Prepared bootstrap config") - - if restart_only: - setup_commands = [] - ray_start_commands = config["head_start_ray_commands"] - elif no_restart: - setup_commands = config["head_setup_commands"] - ray_start_commands = [] - else: - setup_commands = config["head_setup_commands"] - ray_start_commands = config["head_start_ray_commands"] - - if not no_restart: - warn_about_bad_start_command(ray_start_commands) - - updater = NodeUpdaterThread( - node_id=head_node, - provider_config=config["provider"], - provider=provider, - auth_config=config["auth"], - cluster_name=config["cluster_name"], - file_mounts=config["file_mounts"], - initialization_commands=config["initialization_commands"], - setup_commands=setup_commands, - ray_start_commands=ray_start_commands, - process_runner=_runner, - runtime_hash=runtime_hash, - file_mounts_contents_hash=file_mounts_contents_hash, - is_head_node=True, - rsync_options={ - "rsync_exclude": config.get("rsync_exclude"), - "rsync_filter": config.get("rsync_filter") - }, - docker_config=config.get("docker")) - updater.start() - updater.join() - - # Refresh the node cache so we see the external ip if available - provider.non_terminated_nodes(head_node_tags) - - if config.get("provider", {}).get("use_internal_ips", - False) is True: - head_node_ip = provider.internal_ip(head_node) - else: - head_node_ip = provider.external_ip(head_node) - - if updater.exitcode != 0: - # todo: this does not follow the mockup and is not good enough - cli_logger.abort("Failed to setup head node.") - - cli_logger.old_error( logger, "get_or_create_head_node: " - "Updating {} failed", head_node_ip) - sys.exit(1) + "Shutting down outdated head node {}", head_node) + + provider.terminate_node(head_node) + cli_logger.print("Terminated head node {}", head_node) cli_logger.old_info( - logger, "get_or_create_head_node: " - "Head node up-to-date, IP address is: {}", head_node_ip) + logger, "get_or_create_head_node: Launching new head node...") - monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*" - if override_cluster_name: - modifiers = " --cluster-name={}".format( - quote(override_cluster_name)) + head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash + head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format( + config["cluster_name"]) + provider.create_node(head_node_config, head_node_tags, 1) + cli_logger.print("Launched a new head node") + + start = time.time() + head_node = None + with cli_logger.group("Fetching the new head node"): + while True: + if time.time() - start > 50: + cli_logger.abort( + "Head node fetch timed out.") # todo: msg + raise RuntimeError("Failed to create head node.") + nodes = provider.non_terminated_nodes(head_node_tags) + if len(nodes) == 1: + head_node = nodes[0] + break + time.sleep(POLL_INTERVAL) + cli_logger.newline() + + with cli_logger.group( + "Setting up head node", + _numbered=("<>", 1, 1), + # cf.bold(provider.node_tags(head_node)[TAG_RAY_NODE_NAME]), + _tags=dict()): # add id, ARN to tags? + + # TODO(ekl) right now we always update the head node even if the + # hash matches. + # We could prompt the user for what they want to do here. + # No need to pass in cluster_sync_files because we use this + # hash to set up the head node + (runtime_hash, file_mounts_contents_hash) = hash_runtime_conf( + config["file_mounts"], None, config) + + cli_logger.old_info( + logger, "get_or_create_head_node: Updating files on head node...") + + # Rewrite the auth config so that the head + # node can update the workers + remote_config = copy.deepcopy(config) + + # drop proxy options if they exist, otherwise + # head node won't be able to connect to workers + remote_config["auth"].pop("ssh_proxy_command", None) + + if "ssh_private_key" in config["auth"]: + remote_key_path = "~/ray_bootstrap_key.pem" + remote_config["auth"]["ssh_private_key"] = remote_key_path + + # Adjust for new file locations + new_mounts = {} + for remote_path in config["file_mounts"]: + new_mounts[remote_path] = remote_path + remote_config["file_mounts"] = new_mounts + remote_config["no_restart"] = no_restart + + remote_config = provider.prepare_for_head_node(remote_config) + + # Now inject the rewritten config and SSH key into the head node + remote_config_file = tempfile.NamedTemporaryFile( + "w", prefix="ray-bootstrap-") + remote_config_file.write(json.dumps(remote_config)) + remote_config_file.flush() + config["file_mounts"].update({ + "~/ray_bootstrap_config.yaml": remote_config_file.name + }) + + if "ssh_private_key" in config["auth"]: + config["file_mounts"].update({ + remote_key_path: config["auth"]["ssh_private_key"], + }) + cli_logger.print("Prepared bootstrap config") + + if restart_only: + setup_commands = [] + ray_start_commands = config["head_start_ray_commands"] + elif no_restart: + setup_commands = config["head_setup_commands"] + ray_start_commands = [] else: - modifiers = "" + setup_commands = config["head_setup_commands"] + ray_start_commands = config["head_start_ray_commands"] - if cli_logger.old_style: - print("To monitor autoscaling activity, you can run:\n\n" - " ray exec {} {}{}\n".format(config_file, - quote(monitor_str), modifiers)) - print("To open a console on the cluster:\n\n" - " ray attach {}{}\n".format(config_file, modifiers)) + if not no_restart: + warn_about_bad_start_command(ray_start_commands) - print("To get a remote shell to the cluster manually, run:\n\n" - " {}\n".format( - updater.cmd_runner.remote_shell_command_str())) + updater = NodeUpdaterThread( + node_id=head_node, + provider_config=config["provider"], + provider=provider, + auth_config=config["auth"], + cluster_name=config["cluster_name"], + file_mounts=config["file_mounts"], + initialization_commands=config["initialization_commands"], + setup_commands=setup_commands, + ray_start_commands=ray_start_commands, + process_runner=_runner, + runtime_hash=runtime_hash, + file_mounts_contents_hash=file_mounts_contents_hash, + is_head_node=True, + rsync_options={ + "rsync_exclude": config.get("rsync_exclude"), + "rsync_filter": config.get("rsync_filter") + }, + docker_config=config.get("docker")) + updater.start() + updater.join() - cli_logger.newline() - with cli_logger.group("Useful commands"): - cli_logger.print("Monitor autoscaling with") - cli_logger.print( - cf.bold(" ray exec {}{} {}"), config_file, modifiers, - quote(monitor_str)) + # Refresh the node cache so we see the external ip if available + provider.non_terminated_nodes(head_node_tags) - cli_logger.print("Connect to a terminal on the cluster head:") - cli_logger.print( - cf.bold(" ray attach {}{}"), config_file, modifiers) + if config.get("provider", {}).get("use_internal_ips", False) is True: + head_node_ip = provider.internal_ip(head_node) + else: + head_node_ip = provider.external_ip(head_node) - remote_shell_str = updater.cmd_runner.remote_shell_command_str() - cli_logger.print("Get a remote shell to the cluster manually:") - cli_logger.print(" {}", remote_shell_str.strip()) - finally: - provider.cleanup() + if updater.exitcode != 0: + # todo: this does not follow the mockup and is not good enough + cli_logger.abort("Failed to setup head node.") + + cli_logger.old_error( + logger, "get_or_create_head_node: " + "Updating {} failed", head_node_ip) + sys.exit(1) + + cli_logger.old_info( + logger, "get_or_create_head_node: " + "Head node up-to-date, IP address is: {}", head_node_ip) + + monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*" + if override_cluster_name: + modifiers = " --cluster-name={}".format(quote(override_cluster_name)) + else: + modifiers = "" + + if cli_logger.old_style: + print("To monitor autoscaling activity, you can run:\n\n" + " ray exec {} {}{}\n".format(config_file, quote(monitor_str), + modifiers)) + print("To open a console on the cluster:\n\n" + " ray attach {}{}\n".format(config_file, modifiers)) + + print("To get a remote shell to the cluster manually, run:\n\n" + " {}\n".format(updater.cmd_runner.remote_shell_command_str())) + + cli_logger.newline() + with cli_logger.group("Useful commands"): + cli_logger.print("Monitor autoscaling with") + cli_logger.print( + cf.bold(" ray exec {}{} {}"), config_file, modifiers, + quote(monitor_str)) + + cli_logger.print("Connect to a terminal on the cluster head:") + cli_logger.print(cf.bold(" ray attach {}{}"), config_file, modifiers) + + remote_shell_str = updater.cmd_runner.remote_shell_command_str() + cli_logger.print("Get a remote shell to the cluster manually:") + cli_logger.print(" {}", remote_shell_str.strip()) def attach_cluster(config_file: str, @@ -857,62 +839,59 @@ def exec_cluster(config_file: str, config, config_file, override_cluster_name, create_if_needed=start) provider = _get_node_provider(config["provider"], config["cluster_name"]) - try: - updater = NodeUpdaterThread( - node_id=head_node, - provider_config=config["provider"], - provider=provider, - auth_config=config["auth"], - cluster_name=config["cluster_name"], - file_mounts=config["file_mounts"], - initialization_commands=[], - setup_commands=[], - ray_start_commands=[], - runtime_hash="", - file_mounts_contents_hash="", - is_head_node=True, - rsync_options={ - "rsync_exclude": config.get("rsync_exclude"), - "rsync_filter": config.get("rsync_filter") - }, - docker_config=config.get("docker")) - shutdown_after_run = False - if cmd and stop: - cmd += "; ".join([ - "ray stop", - "ray teardown ~/ray_bootstrap_config.yaml --yes --workers-only" - ]) - shutdown_after_run = True + updater = NodeUpdaterThread( + node_id=head_node, + provider_config=config["provider"], + provider=provider, + auth_config=config["auth"], + cluster_name=config["cluster_name"], + file_mounts=config["file_mounts"], + initialization_commands=[], + setup_commands=[], + ray_start_commands=[], + runtime_hash="", + file_mounts_contents_hash="", + is_head_node=True, + rsync_options={ + "rsync_exclude": config.get("rsync_exclude"), + "rsync_filter": config.get("rsync_filter") + }, + docker_config=config.get("docker")) + shutdown_after_run = False + if cmd and stop: + cmd += "; ".join([ + "ray stop", + "ray teardown ~/ray_bootstrap_config.yaml --yes --workers-only" + ]) + shutdown_after_run = True - result = _exec( - updater, - cmd, - screen, - tmux, - port_forward=port_forward, - with_output=with_output, - run_env=run_env, - shutdown_after_run=shutdown_after_run) - if tmux or screen: - attach_command_parts = ["ray attach", config_file] - if override_cluster_name is not None: - attach_command_parts.append( - "--cluster-name={}".format(override_cluster_name)) - if tmux: - attach_command_parts.append("--tmux") - elif screen: - attach_command_parts.append("--screen") + result = _exec( + updater, + cmd, + screen, + tmux, + port_forward=port_forward, + with_output=with_output, + run_env=run_env, + shutdown_after_run=shutdown_after_run) + if tmux or screen: + attach_command_parts = ["ray attach", config_file] + if override_cluster_name is not None: + attach_command_parts.append( + "--cluster-name={}".format(override_cluster_name)) + if tmux: + attach_command_parts.append("--tmux") + elif screen: + attach_command_parts.append("--screen") - attach_command = " ".join(attach_command_parts) - cli_logger.print("Run `{}` to check command status.", - cf.bold(attach_command)) + attach_command = " ".join(attach_command_parts) + cli_logger.print("Run `{}` to check command status.", + cf.bold(attach_command)) - attach_info = "Use `{}` to check on command status.".format( - attach_command) - cli_logger.old_info(logger, attach_info) - return result - finally: - provider.cleanup() + attach_info = "Use `{}` to check on command status.".format( + attach_command) + cli_logger.old_info(logger, attach_info) + return result def _exec(updater: NodeUpdaterThread, @@ -1028,25 +1007,20 @@ def rsync(config_file: str, else: updater.sync_file_mounts(rsync) - try: - nodes = [] - head_node = _get_head_node( - config, config_file, override_cluster_name, create_if_needed=False) - if ip_address: - nodes = [ - provider.get_node_id( - ip_address, use_internal_ip=use_internal_ip) - ] - else: - if all_nodes: - nodes = _get_worker_nodes(config, override_cluster_name) - nodes += [head_node] + nodes = [] + head_node = _get_head_node( + config, config_file, override_cluster_name, create_if_needed=False) + if ip_address: + nodes = [ + provider.get_node_id(ip_address, use_internal_ip=use_internal_ip) + ] + else: + if all_nodes: + nodes = _get_worker_nodes(config, override_cluster_name) + nodes += [head_node] - for node_id in nodes: - rsync_to_node(node_id, is_head_node=(node_id == head_node)) - - finally: - provider.cleanup() + for node_id in nodes: + rsync_to_node(node_id, is_head_node=(node_id == head_node)) def get_head_node_ip(config_file: str, @@ -1058,14 +1032,11 @@ def get_head_node_ip(config_file: str, config["cluster_name"] = override_cluster_name provider = _get_node_provider(config["provider"], config["cluster_name"]) - try: - head_node = _get_head_node(config, config_file, override_cluster_name) - if config.get("provider", {}).get("use_internal_ips", False): - head_node_ip = provider.internal_ip(head_node) - else: - head_node_ip = provider.external_ip(head_node) - finally: - provider.cleanup() + head_node = _get_head_node(config, config_file, override_cluster_name) + if config.get("provider", {}).get("use_internal_ips", False): + head_node_ip = provider.internal_ip(head_node) + else: + head_node_ip = provider.external_ip(head_node) return head_node_ip @@ -1080,17 +1051,14 @@ def get_worker_node_ips(config_file: str, config["cluster_name"] = override_cluster_name provider = _get_node_provider(config["provider"], config["cluster_name"]) - try: - nodes = provider.non_terminated_nodes({ - TAG_RAY_NODE_KIND: NODE_KIND_WORKER - }) + nodes = provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + }) - if config.get("provider", {}).get("use_internal_ips", False) is True: - return [provider.internal_ip(node) for node in nodes] - else: - return [provider.external_ip(node) for node in nodes] - finally: - provider.cleanup() + if config.get("provider", {}).get("use_internal_ips", False) is True: + return [provider.internal_ip(node) for node in nodes] + else: + return [provider.external_ip(node) for node in nodes] def _get_worker_nodes(config: Dict[str, Any], @@ -1101,12 +1069,7 @@ def _get_worker_nodes(config: Dict[str, Any], config["cluster_name"] = override_cluster_name provider = _get_node_provider(config["provider"], config["cluster_name"]) - try: - return provider.non_terminated_nodes({ - TAG_RAY_NODE_KIND: NODE_KIND_WORKER - }) - finally: - provider.cleanup() + return provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) def _get_head_node(config: Dict[str, Any], @@ -1114,13 +1077,10 @@ def _get_head_node(config: Dict[str, Any], override_cluster_name: Optional[str], create_if_needed: bool = False) -> str: provider = _get_node_provider(config["provider"], config["cluster_name"]) - try: - head_node_tags = { - TAG_RAY_NODE_KIND: NODE_KIND_HEAD, - } - nodes = provider.non_terminated_nodes(head_node_tags) - finally: - provider.cleanup() + head_node_tags = { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + } + nodes = provider.non_terminated_nodes(head_node_tags) if len(nodes) > 0: head_node = nodes[0] diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 008602ef6..77cb34900 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -129,10 +129,6 @@ class NodeProvider: "{}: Terminating node".format(node_id)) self.terminate_node(node_id) - def cleanup(self) -> None: - """Clean-up when a Provider is no longer required.""" - pass - @staticmethod def bootstrap_config(cluster_config: Dict[str, Any]) -> Dict[str, Any]: """Bootstraps the cluster config by adding env defaults if needed."""