[autoscaler][kubernetes] Add ability to not copy cluster config to head node when calling create_or_update_head_node. (#13720)

* Add option to skip bootstrapping head node autoscaling config

* don't close remote config before copying

* Type

* Type hints etc.

* test

* Test CR to config conversion

* comment
This commit is contained in:
Dmitri Gekhtman
2021-02-04 10:30:03 -08:00
committed by GitHub
parent 1e113d2e6e
commit db59736b1a
6 changed files with 246 additions and 53 deletions
+79 -50
View File
@@ -34,7 +34,7 @@ from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \
from ray.autoscaler._private.cli_logger import cli_logger, cf
from ray.autoscaler._private.updater import NodeUpdaterThread
from ray.autoscaler._private.command_runner import set_using_login_shells, \
set_rsync_silent
set_rsync_silent
from ray.autoscaler._private.event_system import (CreateClusterEvent,
global_event_system)
from ray.autoscaler._private.log_timer import LogTimer
@@ -137,17 +137,22 @@ def request_resources(num_cpus: Optional[int] = None,
overwrite=True)
def create_or_update_cluster(config_file: str,
override_min_workers: Optional[int],
override_max_workers: Optional[int],
no_restart: bool,
restart_only: bool,
yes: bool,
override_cluster_name: Optional[str] = None,
no_config_cache: bool = False,
redirect_command_output: Optional[bool] = False,
use_login_shells: bool = True) -> Dict[str, Any]:
def create_or_update_cluster(
config_file: str,
override_min_workers: Optional[int],
override_max_workers: Optional[int],
no_restart: bool,
restart_only: bool,
yes: bool,
override_cluster_name: Optional[str] = None,
no_config_cache: bool = False,
redirect_command_output: Optional[bool] = False,
use_login_shells: bool = True,
no_monitor_on_head: bool = False) -> Dict[str, Any]:
"""Create or updates an autoscaling Ray cluster from a config json."""
# no_monitor_on_head is an internal flag used by the Ray K8s operator.
# If True, prevents autoscaling config sync to the Ray head during cluster
# creation. See https://github.com/ray-project/ray/pull/13720.
set_using_login_shells(use_login_shells)
if not use_login_shells:
cmd_output_util.set_allow_interactive(False)
@@ -225,7 +230,7 @@ def create_or_update_cluster(config_file: str,
try_logging_config(config)
get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
override_cluster_name)
override_cluster_name, no_monitor_on_head)
return config
@@ -485,13 +490,17 @@ def monitor_cluster(cluster_config_file: str, num_lines: int,
port_forward=None)
def warn_about_bad_start_command(start_commands: List[str]) -> None:
def warn_about_bad_start_command(start_commands: List[str],
no_monitor_on_head: bool = False) -> None:
ray_start_cmd = list(filter(lambda x: "ray start" in x, start_commands))
if len(ray_start_cmd) == 0:
cli_logger.warning(
"Ray runtime will not be started because `{}` is not in `{}`.",
cf.bold("ray start"), cf.bold("head_start_ray_commands"))
if not any("autoscaling-config" in x for x in ray_start_cmd):
autoscaling_config_in_ray_start_cmd = any(
"autoscaling-config" in x for x in ray_start_cmd)
if not (autoscaling_config_in_ray_start_cmd or no_monitor_on_head):
cli_logger.warning(
"The head node will not launch any workers because "
"`{}` does not have `{}` set.\n"
@@ -507,6 +516,7 @@ def get_or_create_head_node(config: Dict[str, Any],
restart_only: bool,
yes: bool,
override_cluster_name: Optional[str],
no_monitor_on_head: bool = False,
_provider: Optional[NodeProvider] = None,
_runner: ModuleType = subprocess) -> None:
"""Create the cluster head node, which in turn creates the workers."""
@@ -629,41 +639,11 @@ def get_or_create_head_node(config: Dict[str, Any],
(runtime_hash, file_mounts_contents_hash) = hash_runtime_conf(
config["file_mounts"], None, config)
# Rewrite the auth config so that the head
# node can update the workers
remote_config = copy.deepcopy(config)
# drop proxy options if they exist, otherwise
# head node won't be able to connect to workers
remote_config["auth"].pop("ssh_proxy_command", None)
if "ssh_private_key" in config["auth"]:
remote_key_path = "~/ray_bootstrap_key.pem"
remote_config["auth"]["ssh_private_key"] = remote_key_path
# Adjust for new file locations
new_mounts = {}
for remote_path in config["file_mounts"]:
new_mounts[remote_path] = remote_path
remote_config["file_mounts"] = new_mounts
remote_config["no_restart"] = no_restart
remote_config = provider.prepare_for_head_node(remote_config)
# Now inject the rewritten config and SSH key into the head node
remote_config_file = tempfile.NamedTemporaryFile(
"w", prefix="ray-bootstrap-")
remote_config_file.write(json.dumps(remote_config))
remote_config_file.flush()
config["file_mounts"].update({
"~/ray_bootstrap_config.yaml": remote_config_file.name
})
if "ssh_private_key" in config["auth"]:
config["file_mounts"].update({
remote_key_path: config["auth"]["ssh_private_key"],
})
cli_logger.print("Prepared bootstrap config")
if not no_monitor_on_head:
# Return remote_config_file to avoid prematurely closing it.
config, remote_config_file = _set_up_config_for_head_node(
config, provider, no_restart)
cli_logger.print("Prepared bootstrap config")
if restart_only:
setup_commands = []
@@ -676,7 +656,8 @@ def get_or_create_head_node(config: Dict[str, Any],
ray_start_commands = config["head_start_ray_commands"]
if not no_restart:
warn_about_bad_start_command(ray_start_commands)
warn_about_bad_start_command(ray_start_commands,
no_monitor_on_head)
updater = NodeUpdaterThread(
node_id=head_node,
@@ -737,6 +718,54 @@ def get_or_create_head_node(config: Dict[str, Any],
cli_logger.print(" {}", remote_shell_str.strip())
def _set_up_config_for_head_node(config: Dict[str, Any],
provider: NodeProvider,
no_restart: bool) ->\
Tuple[Dict[str, Any], Any]:
"""Prepares autoscaling config and, if needed, ssh key, to be mounted onto
the Ray head node for use by the autoscaler.
Returns the modified config and the temporary config file that will be
mounted onto the head node.
"""
# Rewrite the auth config so that the head
# node can update the workers
remote_config = copy.deepcopy(config)
# drop proxy options if they exist, otherwise
# head node won't be able to connect to workers
remote_config["auth"].pop("ssh_proxy_command", None)
if "ssh_private_key" in config["auth"]:
remote_key_path = "~/ray_bootstrap_key.pem"
remote_config["auth"]["ssh_private_key"] = remote_key_path
# Adjust for new file locations
new_mounts = {}
for remote_path in config["file_mounts"]:
new_mounts[remote_path] = remote_path
remote_config["file_mounts"] = new_mounts
remote_config["no_restart"] = no_restart
remote_config = provider.prepare_for_head_node(remote_config)
# Now inject the rewritten config and SSH key into the head node
remote_config_file = tempfile.NamedTemporaryFile(
"w", prefix="ray-bootstrap-")
remote_config_file.write(json.dumps(remote_config))
remote_config_file.flush()
config["file_mounts"].update({
"~/ray_bootstrap_config.yaml": remote_config_file.name
})
if "ssh_private_key" in config["auth"]:
config["file_mounts"].update({
remote_key_path: config["auth"]["ssh_private_key"],
})
return config, remote_config_file
def attach_cluster(config_file: str,
start: bool,
use_screen: bool,
@@ -119,7 +119,7 @@ spec:
# Note dashboard-host is set to 0.0.0.0 so that Kubernetes can port forward.
headStartRayCommands:
- ray stop
- ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --dashboard-host 0.0.0.0
- ulimit -n 65536; ray start --head --no-monitor --port=6379 --object-manager-port=8076 --dashboard-host 0.0.0.0
# Commands to start Ray on worker nodes. You don't need to change this.
workerStartRayCommands:
- ray stop
@@ -119,7 +119,7 @@ spec:
# Note dashboard-host is set to 0.0.0.0 so that Kubernetes can port forward.
headStartRayCommands:
- ray stop
- ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --dashboard-host 0.0.0.0
- ulimit -n 65536; ray start --head --no-monitor --port=6379 --object-manager-port=8076 --dashboard-host 0.0.0.0
# Commands to start Ray on worker nodes. You don't need to change this.
workerStartRayCommands:
- ray stop
+2 -1
View File
@@ -62,7 +62,8 @@ class RayCluster():
no_restart=False,
restart_only=False,
yes=True,
no_config_cache=True)
no_config_cache=True,
no_monitor_on_head=True)
self.write_config()
def start_monitor(self) -> None:
+1
View File
@@ -92,6 +92,7 @@ py_test_module_list(
"test_dask_scheduler.py",
"test_debug_tools.py",
"test_job.py",
"test_k8s_operator_mock.py",
"test_memstat.py",
"test_metrics_agent.py",
"test_microbenchmarks.py",
+162
View File
@@ -0,0 +1,162 @@
import os
import unittest
from unittest.mock import patch
import pytest
import tempfile
import yaml
from ray.autoscaler.tags import TAG_RAY_NODE_KIND, NODE_KIND_HEAD
from ray.autoscaler.node_provider import NodeProvider
from ray.ray_operator.operator import RayCluster
from ray.ray_operator.operator_utils import cr_to_config
from ray.autoscaler._private.kubernetes.node_provider import\
KubernetesNodeProvider
from ray.autoscaler._private.updater import NodeUpdaterThread
"""
Tests that, when the K8s operator launches a cluster, no files are mounted onto
the head node.
The main idea is to mock the NodeUpdaterThread to report if it received any
file mounts.
"""
# NodeUpdaterThread mock methods
START = "start"
JOIN = "join"
def mock_start(self):
# Detects any file mounts passed in NodeUpdaterThread.__init__()
if self.file_mounts:
raise ValueError("File mounts in operator's code path.")
def mock_join(self):
# Fake success
self.exitcode = 0
return
# RayCluster mock methods
SETUP_LOGGING = "setup_logging"
WRITE_CONFIG = "write_config"
def mock_setup_logging(self):
return
def mock_write_config(self):
# Use a named temporary file instead of a real one.
self.config_file = tempfile.NamedTemporaryFile("w")
self.config_path = self.config_file.name
yaml.dump(self.config, self.config_file)
self.config_file.flush()
# KubernetesNodeProvider mock methods
INIT = "__init__"
NON_TERMINATED_NODES = "non_terminated_nodes"
CREATE_NODE = "create_node"
BOOTSTRAP_CONFIG = "bootstrap_config"
HEAD_NODE_TAGS = {TAG_RAY_NODE_KIND: NODE_KIND_HEAD}
def mock_init(self, provider_config, cluster_name):
# Adds an attribute to detect if the provider has created the head.
NodeProvider.__init__(self, provider_config, cluster_name)
self.cluster_name = cluster_name
self.namespace = provider_config["namespace"]
self._head_created = False
def mock_non_terminated_nodes(self, node_tags):
# First time this is called, it returns an empty list.
# Second time, returns a mock head node id.
if HEAD_NODE_TAGS.items() <= node_tags.items() and self._head_created:
# Second call.
return ["HEAD"]
elif node_tags == HEAD_NODE_TAGS:
# First call.
return []
else:
# Should not go here.
raise ValueError("Test passed invalid parameters.")
def mock_create_node(self, node_config, tags, count):
# Called during head node creation. Marks that a head node has been
# created.
if HEAD_NODE_TAGS.items() <= tags.items() and count == 1:
self._head_created = True
else:
raise ValueError(f"Test passed invalid parameter {tags} {count}.")
def mock_bootstrap_config(cluster_config):
# KubernetesNodeProvider.bootstrap_config has no side effects
# on cluster_config -- the method just creates K8s API objects.
# Thus it makes sense to dummy out the K8s API calls and return
# the config.
return cluster_config
def custom_resources():
# K8s custom resources used in test.
here = os.path.realpath(__file__)
ray_python_root = os.path.dirname(os.path.dirname(here))
relative_path = "autoscaler/kubernetes/operator_configs"
abs_path = os.path.join(ray_python_root, relative_path)
cluster1, cluster2 = "example_cluster.yaml", "example_cluster2.yaml"
path1, path2 = os.path.join(abs_path, cluster1), os.path.join(
abs_path, cluster2)
cr1, cr2 = (yaml.safe_load(open(path1).read()),
yaml.safe_load(open(path2).read()))
# Metadata and field is filled by K8s in real life.
cr1["metadata"]["uid"] = "abc"
cr2["metadata"]["uid"] = "xyz"
return cr1, cr2
class OperatorTest(unittest.TestCase):
def test_no_file_mounts_k8s_operator_cluster_launch(self):
with patch.object(NodeUpdaterThread, START, mock_start),\
patch.object(NodeUpdaterThread, JOIN, mock_join),\
patch.object(RayCluster, SETUP_LOGGING, mock_setup_logging),\
patch.object(RayCluster, WRITE_CONFIG, mock_write_config),\
patch.object(KubernetesNodeProvider, INIT, mock_init),\
patch.object(KubernetesNodeProvider, NON_TERMINATED_NODES,
mock_non_terminated_nodes),\
patch.object(KubernetesNodeProvider, CREATE_NODE,
mock_create_node),\
patch.object(KubernetesNodeProvider, BOOTSTRAP_CONFIG,
mock_bootstrap_config):
cluster_cr1, cluster_cr2 = custom_resources()
# Ensure that operator does not mount any files during cluster
# launch.
config1 = cr_to_config(cluster_cr1)
config1["provider"]["namespace"] = "test"
cluster1 = RayCluster(config1)
cluster1.start_head()
# Check that this test is working correctly by inserting extraneous
# file mounts and confirming a ValueError from the mocked
# NodeUpdater.
config2 = cr_to_config(cluster_cr2)
config2["provider"]["namespace"] = "test"
# Note: There is no user interface for adding file mounts
# to the config of a Ray cluster run via the operator.
# This purely for purposes of testing this test.
config2["file_mounts"] = {"remote_foo": os.path.abspath(__file__)}
cluster2 = RayCluster(config2)
with pytest.raises(ValueError):
cluster2.start_head()
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))