diff --git a/python/ray/autoscaler/_private/command_runner.py b/python/ray/autoscaler/_private/command_runner.py index 6da60fbb1..f72c136f7 100644 --- a/python/ray/autoscaler/_private/command_runner.py +++ b/python/ray/autoscaler/_private/command_runner.py @@ -1,6 +1,6 @@ from getpass import getuser from shlex import quote -from typing import Any, List, Tuple, Dict, Optional +from typing import Dict import click import hashlib import json @@ -11,6 +11,7 @@ import sys import time import warnings +from ray.autoscaler.command_runner import CommandRunnerInterface from ray.autoscaler._private.docker import check_bind_mounts_cmd, \ check_docker_running_cmd, \ check_docker_image, \ @@ -100,84 +101,6 @@ def _with_interactive(cmd): return ["bash", "--login", "-c", "-i", quote(force_interactive)] -class CommandRunnerInterface: - """Interface to run commands on a remote cluster node. - - Command runner instances are returned by provider.get_command_runner().""" - - def run( - self, - cmd: str = None, - timeout: int = 120, - exit_on_fail: bool = False, - port_forward: List[Tuple[int, int]] = None, - with_output: bool = False, - environment_variables: Dict[str, object] = None, - run_env: str = "auto", - ssh_options_override_ssh_key: str = "", - shutdown_after_run: bool = False, - ) -> str: - """Run the given command on the cluster node and optionally get output. - - WARNING: the cloudgateway needs arguments of "run" function to be json - dumpable to send them over HTTP requests. - - Args: - cmd (str): The command to run. - timeout (int): The command timeout in seconds. - exit_on_fail (bool): Whether to sys exit on failure. - port_forward (list): List of (local, remote) ports to forward, or - a single tuple. - with_output (bool): Whether to return output. - environment_variables (Dict[str, str | int | Dict[str, str]): - Environment variables that `cmd` should be run with. - run_env (str): Options: docker/host/auto. Used in - DockerCommandRunner to determine the run environment. - ssh_options_override_ssh_key (str): if provided, overwrites - SSHOptions class with SSHOptions(ssh_options_override_ssh_key). - shutdown_after_run (bool): if provided, shutdowns down the machine - after executing the command with `sudo shutdown -h now`. - """ - raise NotImplementedError - - def run_rsync_up(self, - source: str, - target: str, - options: Optional[Dict[str, Any]] = None) -> None: - """Rsync files up to the cluster node. - - Args: - source (str): The (local) source directory or file. - target (str): The (remote) destination path. - """ - raise NotImplementedError - - def run_rsync_down(self, - source: str, - target: str, - options: Optional[Dict[str, Any]] = None) -> None: - """Rsync files down from the cluster node. - - Args: - source (str): The (remote) source directory or file. - target (str): The (local) destination path. - """ - raise NotImplementedError - - def remote_shell_command_str(self) -> str: - """Return the command the user can use to open a shell.""" - raise NotImplementedError - - def run_init(self, *, as_head: bool, file_mounts: Dict[str, str]) -> None: - """Used to run extra initialization commands. - - Args: - as_head (bool): Run as head image or worker. - file_mounts (dict): Files to copy to the head and worker nodes. - """ - pass - - class KubernetesCommandRunner(CommandRunnerInterface): def __init__(self, log_prefix, namespace, node_id, auth_config, process_runner): diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py new file mode 100644 index 000000000..56fc0a1be --- /dev/null +++ b/python/ray/autoscaler/command_runner.py @@ -0,0 +1,79 @@ +from typing import Any, List, Tuple, Dict, Optional + + +class CommandRunnerInterface: + """Interface to run commands on a remote cluster node. + + Command runner instances are returned by provider.get_command_runner().""" + + def run( + self, + cmd: str = None, + timeout: int = 120, + exit_on_fail: bool = False, + port_forward: List[Tuple[int, int]] = None, + with_output: bool = False, + environment_variables: Dict[str, object] = None, + run_env: str = "auto", + ssh_options_override_ssh_key: str = "", + shutdown_after_run: bool = False, + ) -> str: + """Run the given command on the cluster node and optionally get output. + + WARNING: the cloudgateway needs arguments of "run" function to be json + dumpable to send them over HTTP requests. + + Args: + cmd (str): The command to run. + timeout (int): The command timeout in seconds. + exit_on_fail (bool): Whether to sys exit on failure. + port_forward (list): List of (local, remote) ports to forward, or + a single tuple. + with_output (bool): Whether to return output. + environment_variables (Dict[str, str | int | Dict[str, str]): + Environment variables that `cmd` should be run with. + run_env (str): Options: docker/host/auto. Used in + DockerCommandRunner to determine the run environment. + ssh_options_override_ssh_key (str): if provided, overwrites + SSHOptions class with SSHOptions(ssh_options_override_ssh_key). + shutdown_after_run (bool): if provided, shutdowns down the machine + after executing the command with `sudo shutdown -h now`. + """ + raise NotImplementedError + + def run_rsync_up(self, + source: str, + target: str, + options: Optional[Dict[str, Any]] = None) -> None: + """Rsync files up to the cluster node. + + Args: + source (str): The (local) source directory or file. + target (str): The (remote) destination path. + """ + raise NotImplementedError + + def run_rsync_down(self, + source: str, + target: str, + options: Optional[Dict[str, Any]] = None) -> None: + """Rsync files down from the cluster node. + + Args: + source (str): The (remote) source directory or file. + target (str): The (local) destination path. + """ + raise NotImplementedError + + def remote_shell_command_str(self) -> str: + """Return the command the user can use to open a shell.""" + raise NotImplementedError + + def run_init(self, *, as_head: bool, file_mounts: Dict[str, str]) -> None: + """Used to run extra initialization commands. + + Args: + as_head (bool): Run as head image or worker. + file_mounts (dict): Files to copy to the head and worker nodes. + """ + pass diff --git a/python/ray/tests/test_command_runner.py b/python/ray/tests/test_command_runner.py index 4c0943475..c8da3e389 100644 --- a/python/ray/tests/test_command_runner.py +++ b/python/ray/tests/test_command_runner.py @@ -4,9 +4,9 @@ import sys from unittest.mock import patch from ray.tests.test_autoscaler import MockProvider, MockProcessRunner -from ray.autoscaler._private.command_runner import CommandRunnerInterface, \ - SSHCommandRunner, _with_environment_variables, DockerCommandRunner, \ - KubernetesCommandRunner +from ray.autoscaler.command_runner import CommandRunnerInterface +from ray.autoscaler._private.command_runner import SSHCommandRunner, \ + DockerCommandRunner, KubernetesCommandRunner, _with_environment_variables from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX from getpass import getuser import hashlib