From 1187d1dd3eed65566b22876e6ad1091367d08679 Mon Sep 17 00:00:00 2001 From: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com> Date: Wed, 3 Feb 2021 18:07:11 -0800 Subject: [PATCH] [autoscaler][kubernetes][operator] Rudimentary error handling, make "MODIFIED" -> update event work. (#13756) --- doc/source/cluster/k8s-operator.rst | 3 ++ .../operator_configs/cluster_crd.yaml | 16 ++++++++ .../kubernetes/operator_configs/operator.yaml | 2 +- python/ray/ray_operator/operator.py | 39 ++++++++++++++++--- python/ray/ray_operator/operator_utils.py | 13 +++++++ .../ray/tests/test_k8s_operator_examples.py | 14 ++++++- 6 files changed, 80 insertions(+), 7 deletions(-) diff --git a/doc/source/cluster/k8s-operator.rst b/doc/source/cluster/k8s-operator.rst index 2fb8efef8..d846fe029 100644 --- a/doc/source/cluster/k8s-operator.rst +++ b/doc/source/cluster/k8s-operator.rst @@ -19,6 +19,9 @@ The rest of this document explains step-by-step how to use the Ray Kubernetes Op .. role:: bash(code) :language: bash +.. note:: + The Ray Kubernetes Operator is still experimental. For the yaml files in the examples below, we recomend using the latest master version of Ray. + .. warning:: The Ray Kubernetes Operator requires Kubernetes version at least ``v1.17.0``. Check Kubernetes version info with the command :bash:`kubectl version`. diff --git a/python/ray/autoscaler/kubernetes/operator_configs/cluster_crd.yaml b/python/ray/autoscaler/kubernetes/operator_configs/cluster_crd.yaml index 75a802b58..5387803c1 100644 --- a/python/ray/autoscaler/kubernetes/operator_configs/cluster_crd.yaml +++ b/python/ray/autoscaler/kubernetes/operator_configs/cluster_crd.yaml @@ -13,6 +13,16 @@ spec: - name: v1 served: true storage: true + subresources: + status: {} + additionalPrinterColumns: + - name: status + type: string + description: Running or Error + jsonPath: .status.phase + - name: age + type: date + jsonPath: .metadata.creationTimestamp schema: openAPIV3Schema: description: Ray cluster configuration @@ -20,6 +30,12 @@ spec: required: - spec properties: + status: + type: object + properties: + phase: + description: Running or Error + type: string spec: type: object required: diff --git a/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml b/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml index 2c170f072..6f259a9a7 100644 --- a/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml +++ b/python/ray/autoscaler/kubernetes/operator_configs/operator.yaml @@ -10,7 +10,7 @@ metadata: name: ray-operator-role rules: - apiGroups: ["", "cluster.ray.io"] - resources: ["rayclusters", "rayclusters/finalizers", "pods", "pods/exec"] + resources: ["rayclusters", "rayclusters/finalizers", "rayclusters/status", "pods", "pods/exec"] verbs: ["get", "watch", "list", "create", "delete", "patch", "update"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/python/ray/ray_operator/operator.py b/python/ray/ray_operator/operator.py index cc03c2fef..e39f4cfef 100644 --- a/python/ray/ray_operator/operator.py +++ b/python/ray/ray_operator/operator.py @@ -12,10 +12,12 @@ from ray import monitor from ray.ray_operator import operator_utils from ray import ray_constants +logger = logging.getLogger(__name__) + class RayCluster(): def __init__(self, config: Dict[str, Any]): - self.config = config + self.set_config(config) self.name = self.config["cluster_name"] self.config_path = operator_utils.config_path(self.name) @@ -23,6 +25,9 @@ class RayCluster(): self.subprocess = None # type: Optional[mp.Process] + def set_config(self, config: Dict[str, Any]) -> None: + self.config = config + def do_in_subprocess(self, f: Callable[[], None], wait_to_finish: bool = False) -> None: @@ -96,18 +101,42 @@ class RayCluster(): ray_clusters = {} +last_generation = {} -def cluster_action(cluster_config: Dict[str, Any], event_type: str) -> None: +def handle_event(event_type, cluster_cr, cluster_name): + # TODO: This only detects errors in the parent process and thus doesn't + # catch cluster-specific autoscaling failures. Fix that (perhaps at + # the same time that we eliminate subprocesses). + try: + cluster_action(event_type, cluster_cr, cluster_name) + except Exception: + logger.exception(f"Error while updating RayCluster {cluster_name}.") + operator_utils.set_status(cluster_cr, cluster_name, "Error") + + +def cluster_action(event_type, cluster_cr, cluster_name) -> None: + + cluster_config = operator_utils.cr_to_config(cluster_cr) cluster_name = cluster_config["cluster_name"] + if event_type == "ADDED": + operator_utils.set_status(cluster_cr, cluster_name, "Running") ray_clusters[cluster_name] = RayCluster(cluster_config) ray_clusters[cluster_name].create_or_update() + last_generation[cluster_name] = cluster_cr["metadata"]["generation"] elif event_type == "MODIFIED": - ray_clusters[cluster_name].create_or_update() + # Check metadata.generation to determine if there's a spec change. + current_generation = cluster_cr["metadata"]["generation"] + if current_generation > last_generation[cluster_name]: + ray_clusters[cluster_name].set_config(cluster_config) + ray_clusters[cluster_name].create_or_update() + last_generation[cluster_name] = current_generation + elif event_type == "DELETED": ray_clusters[cluster_name].clean_up() del ray_clusters[cluster_name] + del last_generation[cluster_name] def main() -> None: @@ -119,9 +148,9 @@ def main() -> None: try: for event in cluster_cr_stream: cluster_cr = event["object"] + cluster_name = cluster_cr["metadata"]["name"] event_type = event["type"] - cluster_config = operator_utils.cr_to_config(cluster_cr) - cluster_action(cluster_config, event_type) + handle_event(event_type, cluster_cr, cluster_name) except ApiException as e: if e.status == 404: raise Exception( diff --git a/python/ray/ray_operator/operator_utils.py b/python/ray/ray_operator/operator_utils.py index 5d51baebb..e20cd6719 100644 --- a/python/ray/ray_operator/operator_utils.py +++ b/python/ray/ray_operator/operator_utils.py @@ -99,3 +99,16 @@ def translate(configuration: Dict[str, Any], dictionary[field]: configuration[field] for field in dictionary if field in configuration } + + +def set_status(cluster_cr: Dict[str, Any], cluster_name: str, + status: str) -> None: + # TODO: Add retry logic in case of 409 due to old resource version. + cluster_cr["status"] = {"phase": status} + custom_objects_api()\ + .patch_namespaced_custom_object_status(namespace=RAY_NAMESPACE, + group="cluster.ray.io", + version="v1", + plural="rayclusters", + name=cluster_name, + body=cluster_cr) diff --git a/python/ray/tests/test_k8s_operator_examples.py b/python/ray/tests/test_k8s_operator_examples.py index 6ca2aca37..1636b347b 100644 --- a/python/ray/tests/test_k8s_operator_examples.py +++ b/python/ray/tests/test_k8s_operator_examples.py @@ -1,5 +1,6 @@ -"""Tests launch and teardown of multiple Ray clusters using Kubernetes +"""Tests launch, teardown, and update of multiple Ray clusters using Kubernetes operator.""" +import copy import sys import os import subprocess @@ -130,6 +131,17 @@ class KubernetesOperatorTest(unittest.TestCase): # Four pods remain wait_for_pods(4) + # Check that cluster updates work: increase minWorkers to 3 + # and check that one worker is created. + example_cluster_edit = copy.deepcopy(example_cluster_config) + example_cluster_edit["spec"]["podTypes"][1]["minWorkers"] = 3 + yaml.dump(example_cluster_edit, example_cluster_file) + example_cluster_file.flush() + cm = f"kubectl -n {NAMESPACE} apply -f {example_cluster_file.name}" + subprocess.check_call(cm, shell=True) + + wait_for_pods(5) + # Delete the first cluster cmd = f"kubectl -n {NAMESPACE} delete -f"\ f"{example_cluster_file.name}"