From db59736b1a815e2b5d0a24137186544758778969 Mon Sep 17 00:00:00 2001 From: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com> Date: Thu, 4 Feb 2021 10:30:03 -0800 Subject: [PATCH] [autoscaler][kubernetes] Add ability to not copy cluster config to head node when calling `create_or_update_head_node`. (#13720) * Add option to skip bootstrapping head node autoscaling config * don't close remote config before copying * Type * Type hints etc. * test * Test CR to config conversion * comment --- python/ray/autoscaler/_private/commands.py | 129 ++++++++------ .../operator_configs/example_cluster.yaml | 2 +- .../operator_configs/example_cluster2.yaml | 2 +- python/ray/ray_operator/operator.py | 3 +- python/ray/tests/BUILD | 1 + python/ray/tests/test_k8s_operator_mock.py | 162 ++++++++++++++++++ 6 files changed, 246 insertions(+), 53 deletions(-) create mode 100644 python/ray/tests/test_k8s_operator_mock.py diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index df0a10449..84d3b1569 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -34,7 +34,7 @@ from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \ from ray.autoscaler._private.cli_logger import cli_logger, cf from ray.autoscaler._private.updater import NodeUpdaterThread from ray.autoscaler._private.command_runner import set_using_login_shells, \ - set_rsync_silent + set_rsync_silent from ray.autoscaler._private.event_system import (CreateClusterEvent, global_event_system) from ray.autoscaler._private.log_timer import LogTimer @@ -137,17 +137,22 @@ def request_resources(num_cpus: Optional[int] = None, overwrite=True) -def create_or_update_cluster(config_file: str, - override_min_workers: Optional[int], - override_max_workers: Optional[int], - no_restart: bool, - restart_only: bool, - yes: bool, - override_cluster_name: Optional[str] = None, - no_config_cache: bool = False, - 
redirect_command_output: Optional[bool] = False, - use_login_shells: bool = True) -> Dict[str, Any]: +def create_or_update_cluster( + config_file: str, + override_min_workers: Optional[int], + override_max_workers: Optional[int], + no_restart: bool, + restart_only: bool, + yes: bool, + override_cluster_name: Optional[str] = None, + no_config_cache: bool = False, + redirect_command_output: Optional[bool] = False, + use_login_shells: bool = True, + no_monitor_on_head: bool = False) -> Dict[str, Any]: """Create or updates an autoscaling Ray cluster from a config json.""" + # no_monitor_on_head is an internal flag used by the Ray K8s operator. + # If True, prevents autoscaling config sync to the Ray head during cluster + # creation. See https://github.com/ray-project/ray/pull/13720. set_using_login_shells(use_login_shells) if not use_login_shells: cmd_output_util.set_allow_interactive(False) @@ -225,7 +230,7 @@ def create_or_update_cluster(config_file: str, try_logging_config(config) get_or_create_head_node(config, config_file, no_restart, restart_only, yes, - override_cluster_name) + override_cluster_name, no_monitor_on_head) return config @@ -485,13 +490,17 @@ def monitor_cluster(cluster_config_file: str, num_lines: int, port_forward=None) -def warn_about_bad_start_command(start_commands: List[str]) -> None: +def warn_about_bad_start_command(start_commands: List[str], + no_monitor_on_head: bool = False) -> None: ray_start_cmd = list(filter(lambda x: "ray start" in x, start_commands)) if len(ray_start_cmd) == 0: cli_logger.warning( "Ray runtime will not be started because `{}` is not in `{}`.", cf.bold("ray start"), cf.bold("head_start_ray_commands")) - if not any("autoscaling-config" in x for x in ray_start_cmd): + + autoscaling_config_in_ray_start_cmd = any( + "autoscaling-config" in x for x in ray_start_cmd) + if not (autoscaling_config_in_ray_start_cmd or no_monitor_on_head): cli_logger.warning( "The head node will not launch any workers because " "`{}` does not 
have `{}` set.\n" @@ -507,6 +516,7 @@ def get_or_create_head_node(config: Dict[str, Any], restart_only: bool, yes: bool, override_cluster_name: Optional[str], + no_monitor_on_head: bool = False, _provider: Optional[NodeProvider] = None, _runner: ModuleType = subprocess) -> None: """Create the cluster head node, which in turn creates the workers.""" @@ -629,41 +639,11 @@ def get_or_create_head_node(config: Dict[str, Any], (runtime_hash, file_mounts_contents_hash) = hash_runtime_conf( config["file_mounts"], None, config) - # Rewrite the auth config so that the head - # node can update the workers - remote_config = copy.deepcopy(config) - - # drop proxy options if they exist, otherwise - # head node won't be able to connect to workers - remote_config["auth"].pop("ssh_proxy_command", None) - - if "ssh_private_key" in config["auth"]: - remote_key_path = "~/ray_bootstrap_key.pem" - remote_config["auth"]["ssh_private_key"] = remote_key_path - - # Adjust for new file locations - new_mounts = {} - for remote_path in config["file_mounts"]: - new_mounts[remote_path] = remote_path - remote_config["file_mounts"] = new_mounts - remote_config["no_restart"] = no_restart - - remote_config = provider.prepare_for_head_node(remote_config) - - # Now inject the rewritten config and SSH key into the head node - remote_config_file = tempfile.NamedTemporaryFile( - "w", prefix="ray-bootstrap-") - remote_config_file.write(json.dumps(remote_config)) - remote_config_file.flush() - config["file_mounts"].update({ - "~/ray_bootstrap_config.yaml": remote_config_file.name - }) - - if "ssh_private_key" in config["auth"]: - config["file_mounts"].update({ - remote_key_path: config["auth"]["ssh_private_key"], - }) - cli_logger.print("Prepared bootstrap config") + if not no_monitor_on_head: + # Return remote_config_file to avoid prematurely closing it. 
+ config, remote_config_file = _set_up_config_for_head_node( + config, provider, no_restart) + cli_logger.print("Prepared bootstrap config") if restart_only: setup_commands = [] @@ -676,7 +656,8 @@ def get_or_create_head_node(config: Dict[str, Any], ray_start_commands = config["head_start_ray_commands"] if not no_restart: - warn_about_bad_start_command(ray_start_commands) + warn_about_bad_start_command(ray_start_commands, + no_monitor_on_head) updater = NodeUpdaterThread( node_id=head_node, @@ -737,6 +718,54 @@ def get_or_create_head_node(config: Dict[str, Any], cli_logger.print(" {}", remote_shell_str.strip()) +def _set_up_config_for_head_node(config: Dict[str, Any], + provider: NodeProvider, + no_restart: bool) ->\ + Tuple[Dict[str, Any], Any]: + """Prepares autoscaling config and, if needed, ssh key, to be mounted onto + the Ray head node for use by the autoscaler. + + Returns the modified config and the temporary config file that will be + mounted onto the head node. + """ + # Rewrite the auth config so that the head + # node can update the workers + remote_config = copy.deepcopy(config) + + # drop proxy options if they exist, otherwise + # head node won't be able to connect to workers + remote_config["auth"].pop("ssh_proxy_command", None) + + if "ssh_private_key" in config["auth"]: + remote_key_path = "~/ray_bootstrap_key.pem" + remote_config["auth"]["ssh_private_key"] = remote_key_path + + # Adjust for new file locations + new_mounts = {} + for remote_path in config["file_mounts"]: + new_mounts[remote_path] = remote_path + remote_config["file_mounts"] = new_mounts + remote_config["no_restart"] = no_restart + + remote_config = provider.prepare_for_head_node(remote_config) + + # Now inject the rewritten config and SSH key into the head node + remote_config_file = tempfile.NamedTemporaryFile( + "w", prefix="ray-bootstrap-") + remote_config_file.write(json.dumps(remote_config)) + remote_config_file.flush() + config["file_mounts"].update({ + 
"~/ray_bootstrap_config.yaml": remote_config_file.name + }) + + if "ssh_private_key" in config["auth"]: + config["file_mounts"].update({ + remote_key_path: config["auth"]["ssh_private_key"], + }) + + return config, remote_config_file + + def attach_cluster(config_file: str, start: bool, use_screen: bool, diff --git a/python/ray/autoscaler/kubernetes/operator_configs/example_cluster.yaml b/python/ray/autoscaler/kubernetes/operator_configs/example_cluster.yaml index 8d2aa4561..2735c72eb 100644 --- a/python/ray/autoscaler/kubernetes/operator_configs/example_cluster.yaml +++ b/python/ray/autoscaler/kubernetes/operator_configs/example_cluster.yaml @@ -119,7 +119,7 @@ spec: # Note dashboard-host is set to 0.0.0.0 so that Kubernetes can port forward. headStartRayCommands: - ray stop - - ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --dashboard-host 0.0.0.0 + - ulimit -n 65536; ray start --head --no-monitor --port=6379 --object-manager-port=8076 --dashboard-host 0.0.0.0 # Commands to start Ray on worker nodes. You don't need to change this. workerStartRayCommands: - ray stop diff --git a/python/ray/autoscaler/kubernetes/operator_configs/example_cluster2.yaml b/python/ray/autoscaler/kubernetes/operator_configs/example_cluster2.yaml index 0c6eb604e..7341e16fa 100644 --- a/python/ray/autoscaler/kubernetes/operator_configs/example_cluster2.yaml +++ b/python/ray/autoscaler/kubernetes/operator_configs/example_cluster2.yaml @@ -119,7 +119,7 @@ spec: # Note dashboard-host is set to 0.0.0.0 so that Kubernetes can port forward. headStartRayCommands: - ray stop - - ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --dashboard-host 0.0.0.0 + - ulimit -n 65536; ray start --head --no-monitor --port=6379 --object-manager-port=8076 --dashboard-host 0.0.0.0 # Commands to start Ray on worker nodes. You don't need to change this. 
workerStartRayCommands: - ray stop diff --git a/python/ray/ray_operator/operator.py b/python/ray/ray_operator/operator.py index e39f4cfef..bfbde8055 100644 --- a/python/ray/ray_operator/operator.py +++ b/python/ray/ray_operator/operator.py @@ -62,7 +62,8 @@ class RayCluster(): no_restart=False, restart_only=False, yes=True, - no_config_cache=True) + no_config_cache=True, + no_monitor_on_head=True) self.write_config() def start_monitor(self) -> None: diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 806f04fe5..4ef81d504 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -92,6 +92,7 @@ py_test_module_list( "test_dask_scheduler.py", "test_debug_tools.py", "test_job.py", + "test_k8s_operator_mock.py", "test_memstat.py", "test_metrics_agent.py", "test_microbenchmarks.py", diff --git a/python/ray/tests/test_k8s_operator_mock.py b/python/ray/tests/test_k8s_operator_mock.py new file mode 100644 index 000000000..a3bbf5766 --- /dev/null +++ b/python/ray/tests/test_k8s_operator_mock.py @@ -0,0 +1,162 @@ +import os +import unittest +from unittest.mock import patch + +import pytest +import tempfile +import yaml + +from ray.autoscaler.tags import TAG_RAY_NODE_KIND, NODE_KIND_HEAD +from ray.autoscaler.node_provider import NodeProvider +from ray.ray_operator.operator import RayCluster +from ray.ray_operator.operator_utils import cr_to_config +from ray.autoscaler._private.kubernetes.node_provider import\ + KubernetesNodeProvider +from ray.autoscaler._private.updater import NodeUpdaterThread +""" +Tests that, when the K8s operator launches a cluster, no files are mounted onto +the head node. +The main idea is to mock the NodeUpdaterThread to report if it received any +file mounts. 
+""" + +# NodeUpdaterThread mock methods +START = "start" +JOIN = "join" + + +def mock_start(self): + # Detects any file mounts passed in NodeUpdaterThread.__init__() + if self.file_mounts: + raise ValueError("File mounts in operator's code path.") + + +def mock_join(self): + # Fake success + self.exitcode = 0 + return + + +# RayCluster mock methods +SETUP_LOGGING = "setup_logging" +WRITE_CONFIG = "write_config" + + +def mock_setup_logging(self): + return + + +def mock_write_config(self): + # Use a named temporary file instead of a real one. + self.config_file = tempfile.NamedTemporaryFile("w") + self.config_path = self.config_file.name + yaml.dump(self.config, self.config_file) + self.config_file.flush() + + +# KubernetesNodeProvider mock methods +INIT = "__init__" +NON_TERMINATED_NODES = "non_terminated_nodes" +CREATE_NODE = "create_node" +BOOTSTRAP_CONFIG = "bootstrap_config" + +HEAD_NODE_TAGS = {TAG_RAY_NODE_KIND: NODE_KIND_HEAD} + + +def mock_init(self, provider_config, cluster_name): + # Adds an attribute to detect if the provider has created the head. + NodeProvider.__init__(self, provider_config, cluster_name) + self.cluster_name = cluster_name + self.namespace = provider_config["namespace"] + + self._head_created = False + + +def mock_non_terminated_nodes(self, node_tags): + # First time this is called, it returns an empty list. + # Second time, returns a mock head node id. + if HEAD_NODE_TAGS.items() <= node_tags.items() and self._head_created: + # Second call. + return ["HEAD"] + elif node_tags == HEAD_NODE_TAGS: + # First call. + return [] + else: + # Should not go here. + raise ValueError("Test passed invalid parameters.") + + +def mock_create_node(self, node_config, tags, count): + # Called during head node creation. Marks that a head node has been + # created. 
+ if HEAD_NODE_TAGS.items() <= tags.items() and count == 1: + self._head_created = True + else: + raise ValueError(f"Test passed invalid parameter {tags} {count}.") + + +def mock_bootstrap_config(cluster_config): + # KubernetesNodeProvider.bootstrap_config has no side effects + # on cluster_config -- the method just creates K8s API objects. + # Thus it makes sense to dummy out the K8s API calls and return + # the config. + return cluster_config + + +def custom_resources(): + # K8s custom resources used in test. + here = os.path.realpath(__file__) + ray_python_root = os.path.dirname(os.path.dirname(here)) + relative_path = "autoscaler/kubernetes/operator_configs" + abs_path = os.path.join(ray_python_root, relative_path) + cluster1, cluster2 = "example_cluster.yaml", "example_cluster2.yaml" + path1, path2 = os.path.join(abs_path, cluster1), os.path.join( + abs_path, cluster2) + cr1, cr2 = (yaml.safe_load(open(path1).read()), + yaml.safe_load(open(path2).read())) + # The metadata uid field is filled in by K8s in real life. + cr1["metadata"]["uid"] = "abc" + cr2["metadata"]["uid"] = "xyz" + return cr1, cr2 + + +class OperatorTest(unittest.TestCase): + def test_no_file_mounts_k8s_operator_cluster_launch(self): + with patch.object(NodeUpdaterThread, START, mock_start),\ + patch.object(NodeUpdaterThread, JOIN, mock_join),\ + patch.object(RayCluster, SETUP_LOGGING, mock_setup_logging),\ + patch.object(RayCluster, WRITE_CONFIG, mock_write_config),\ + patch.object(KubernetesNodeProvider, INIT, mock_init),\ + patch.object(KubernetesNodeProvider, NON_TERMINATED_NODES, + mock_non_terminated_nodes),\ + patch.object(KubernetesNodeProvider, CREATE_NODE, + mock_create_node),\ + patch.object(KubernetesNodeProvider, BOOTSTRAP_CONFIG, + mock_bootstrap_config): + + cluster_cr1, cluster_cr2 = custom_resources() + + # Ensure that operator does not mount any files during cluster + # launch.
+ config1 = cr_to_config(cluster_cr1) + config1["provider"]["namespace"] = "test" + cluster1 = RayCluster(config1) + cluster1.start_head() + + # Check that this test is working correctly by inserting extraneous + # file mounts and confirming a ValueError from the mocked + # NodeUpdater. + config2 = cr_to_config(cluster_cr2) + config2["provider"]["namespace"] = "test" + # Note: There is no user interface for adding file mounts + # to the config of a Ray cluster run via the operator. + # This is purely for purposes of testing this test. + config2["file_mounts"] = {"remote_foo": os.path.abspath(__file__)} + cluster2 = RayCluster(config2) + with pytest.raises(ValueError): + cluster2.start_head() + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__]))