diff --git a/python/ray/projects/scripts.py b/python/ray/projects/scripts.py index 07763c1fd..5d2c10bf6 100644 --- a/python/ray/projects/scripts.py +++ b/python/ray/projects/scripts.py @@ -145,6 +145,120 @@ def load_project_or_throw(): "`ray project validate` to inspect the error.") +class SessionRunner(object): + """Class for setting up a session and executing commands in it.""" + + def __init__(self): + """Initialize session runner and try to parse the command arguments. + + Raises: + click.ClickException: This exception is raised if any error occurs. + """ + self.project_definition = load_project_or_throw() + + # Check for features we don't support right now + project_environment = self.project_definition.config["environment"] + need_docker = ("dockerfile" in project_environment + or "dockerimage" in project_environment) + if need_docker: + raise click.ClickException( + "Docker support in session is currently not implemented.") + + def create_cluster(self): + """Create a cluster that will run the session.""" + create_or_update_cluster( + config_file=self.project_definition.cluster_yaml(), + override_min_workers=None, + override_max_workers=None, + no_restart=False, + restart_only=False, + yes=True, + override_cluster_name=None, + ) + + def sync_files(self): + """Synchronize files with the session.""" + rsync( + self.project_definition.cluster_yaml(), + source=self.project_definition.root, + target=self.project_definition.working_directory(), + override_cluster_name=None, + down=False, + ) + + def setup_environment(self): + """Set up the environment of the session.""" + project_environment = self.project_definition.config["environment"] + + if "requirements" in project_environment: + requirements_txt = project_environment["requirements"] + + # Create a temporary requirements_txt in the head node. + remote_requirements_txt = ( + "/tmp/" + "ray_project_requirements_txt_{}".format( + time.time())) + + rsync( + self.project_definition.cluster_yaml(), + source=requirements_txt, + target=remote_requirements_txt, + override_cluster_name=None, + down=False, + ) + self.execute_command( + "pip install -r {}".format(remote_requirements_txt)) + + if "shell" in project_environment: + for cmd in project_environment["shell"]: + self.execute_command(cmd) + + def format_command(self, command, args, shell): + """Validate and format a session command. + + Args: + command (str, optional): Command from the project definition's + commands section to run, if any. + args (list): Arguments for the command to run. + shell (bool): If true, command is a shell command that should be + run directly. + + Returns: + The formatted shell command to run. + + Raises: + click.ClickException: This exception is raised if any error occurs. + """ + if shell: + return command + else: + try: + return self.project_definition.get_command_to_run( + command=command, args=args) + except ValueError as e: + raise click.ClickException(e) + + def execute_command(self, cmd): + """Execute a shell command in the session. + + Args: + cmd (str): Shell command to run in the session. It will be + run in the working directory of the project. + """ + cwd = self.project_definition.working_directory() + cmd = "cd {cwd}; {cmd}".format(cwd=cwd, cmd=cmd) + exec_cluster( + config_file=self.project_definition.cluster_yaml(), + cmd=cmd, + docker=False, + screen=False, + tmux=False, + stop=False, + start=False, + override_cluster_name=None, + port_forward=None, + ) + + @session_cli.command(help="Attach to an existing cluster") def attach(): project_definition = load_project_or_throw() @@ -168,6 +282,7 @@ def stop(): @session_cli.command( + name="start", context_settings=dict(ignore_unknown_options=True, ), help="Start a session based on current project config") @click.argument("command", required=False) @@ -178,99 +293,41 @@ def stop(): "If set, run the command as a raw shell command instead of looking up " "the command in the project config"), is_flag=True) -def start(command, args, shell): - project_definition = load_project_or_throw() - - if shell: - command_to_run = command +def session_start(command, args, shell): + runner = SessionRunner() + if shell or command: + # Get the actual command to run. + cmd = runner.format_command(command, args, shell) + num_steps = 4 else: - try: - command_to_run = project_definition.get_command_to_run( - command=command, args=args) - except ValueError as e: - raise click.ClickException(e) + num_steps = 3 - # Check for features we don't support right now - project_environment = project_definition.config["environment"] - need_docker = ("dockerfile" in project_environment - or "dockerimage" in project_environment) - if need_docker: - raise click.ClickException( - "Docker support in session is currently not implemented. " - "Please file an feature request at" - "https://github.com/ray-project/ray/issues") + logger.info("[1/{}] Creating cluster".format(num_steps)) + runner.create_cluster() + logger.info("[2/{}] Syncing the project".format(num_steps)) + runner.sync_files() + logger.info("[3/{}] Setting up environment".format(num_steps)) + runner.setup_environment() - logger.info("[1/4] Creating cluster") - create_or_update_cluster( - config_file=project_definition.cluster_yaml(), - override_min_workers=None, - override_max_workers=None, - no_restart=False, - restart_only=False, - yes=True, - override_cluster_name=None, - ) - - logger.info("[2/4] Syncing the project") - rsync( - project_definition.cluster_yaml(), - source=project_definition.root, - target=project_definition.working_directory(), - override_cluster_name=None, - down=False, - ) - - logger.info("[3/4] Setting up environment") - _setup_environment( - project_definition.cluster_yaml(), - project_environment, - cwd=project_definition.working_directory()) - - logger.info("[4/4] Running command") - logger.debug("Running {}".format(command)) - session_exec_cluster( - project_definition.cluster_yaml(), - command_to_run, - cwd=project_definition.working_directory()) + if shell or command: + # Run the actual command. + logger.info("[4/4] Running command") + runner.execute_command(cmd) -def session_exec_cluster(cluster_yaml, cmd, cwd=None): - if cwd is not None: - cmd = "cd {cwd}; {cmd}".format(cwd=cwd, cmd=cmd) - exec_cluster( - config_file=cluster_yaml, - cmd=cmd, - docker=False, - screen=False, - tmux=False, - stop=False, - start=False, - override_cluster_name=None, - port_forward=None, - ) - - -def _setup_environment(cluster_yaml, project_environment, cwd): - - if "requirements" in project_environment: - requirements_txt = project_environment["requirements"] - - # Create a temporary requirements_txt in the head node. - remote_requirements_txt = ( - "/tmp/" + "ray_project_requirements_txt_{}".format(time.time())) - - rsync( - cluster_yaml, - source=requirements_txt, - target=remote_requirements_txt, - override_cluster_name=None, - down=False, - ) - session_exec_cluster( - cluster_yaml, - "pip install -r {}".format(remote_requirements_txt), - cwd=cwd) - - if "shell" in project_environment: - for cmd in project_environment["shell"]: - session_exec_cluster(cluster_yaml, cmd, cwd=cwd) +@session_cli.command( + name="execute", + context_settings=dict(ignore_unknown_options=True, ), + help="Execute a command in a session") +@click.argument("command", required=False) +@click.argument("args", nargs=-1, type=click.UNPROCESSED) +@click.option( + "--shell", + help=( + "If set, run the command as a raw shell command instead of looking up " + "the command in the project config"), + is_flag=True) +def session_execute(command, args, shell): + runner = SessionRunner() + cmd = runner.format_command(command, args, shell) + runner.execute_command(cmd) diff --git a/python/ray/tests/test_projects.py b/python/ray/tests/test_projects.py index f24f6627a..c103a124b 100644 --- a/python/ray/tests/test_projects.py +++ b/python/ray/tests/test_projects.py @@ -12,7 +12,7 @@ import sys from contextlib import contextmanager -from ray.projects.scripts import start +from ray.projects.scripts import session_start, session_execute import ray if sys.version_info >= (3, 3): @@ -101,7 +101,7 @@ def run_test_project(project_dir, command, args): def test_session_start_default_project(): result, mock_calls, test_dir = run_test_project( - "session-tests/project-pass", start, []) + "session-tests/project-pass", session_start, ["default"]) loaded_project = ray.projects.ProjectDefinition(test_dir) assert result.exit_code == 0 @@ -143,9 +143,32 @@ def test_session_start_default_project(): assert expected_commands == commands_executed +def test_session_execute_default_project(): + result, mock_calls, test_dir = run_test_project( + "session-tests/project-pass", session_execute, ["default"]) + + loaded_project = ray.projects.ProjectDefinition(test_dir) + assert result.exit_code == 0 + + assert mock_calls["rsync"].call_count == 0 + assert mock_calls["create_or_update_cluster"].call_count == 0 + + exec_cluster_call = mock_calls["exec_cluster"] + commands_executed = [] + for _, kwargs in exec_cluster_call.call_args_list: + commands_executed.append(kwargs["cmd"].replace( + "cd {}; ".format(loaded_project.working_directory()), "")) + + expected_commands = [ + command["command"] for command in loaded_project.config["commands"] + ] + + assert expected_commands == commands_executed + + def test_session_start_docker_fail(): - result, _, _ = run_test_project("session-tests/with-docker-fail", start, - []) + result, _, _ = run_test_project("session-tests/with-docker-fail", + session_start, []) assert result.exit_code == 1 assert ("Docker support in session is currently " @@ -153,8 +176,8 @@ def test_session_start_docker_fail(): def test_session_invalid_config_errored(): - result, _, _ = run_test_project("session-tests/invalid-config-fail", start, - []) + result, _, _ = run_test_project("session-tests/invalid-config-fail", + session_start, []) assert result.exit_code == 1 assert "validation failed" in result.output @@ -164,7 +187,7 @@ def test_session_invalid_config_errored(): def test_session_create_command(): result, mock_calls, test_dir = run_test_project( - "session-tests/commands-test", start, + "session-tests/commands-test", session_start, ["first", "--a", "1", "--b", "2"]) # Verify the project can be loaded.