[autoscaler][kubernetes][operator] Rudimentary error handling, make "MODIFIED" -> update event work. (#13756)

This commit is contained in:
Dmitri Gekhtman
2021-02-03 18:07:11 -08:00
committed by GitHub
parent e8fce9f1f3
commit 1187d1dd3e
6 changed files with 80 additions and 7 deletions
+3
View File
@@ -19,6 +19,9 @@ The rest of this document explains step-by-step how to use the Ray Kubernetes Op
.. role:: bash(code)
:language: bash
.. note::
The Ray Kubernetes Operator is still experimental. For the yaml files in the examples below, we recomend using the latest master version of Ray.
.. warning::
The Ray Kubernetes Operator requires Kubernetes version at least ``v1.17.0``. Check Kubernetes version info with the command
:bash:`kubectl version`.
@@ -13,6 +13,16 @@ spec:
- name: v1
served: true
storage: true
subresources:
status: {}
additionalPrinterColumns:
- name: status
type: string
description: Running or Error
jsonPath: .status.phase
- name: age
type: date
jsonPath: .metadata.creationTimestamp
schema:
openAPIV3Schema:
description: Ray cluster configuration
@@ -20,6 +30,12 @@ spec:
required:
- spec
properties:
status:
type: object
properties:
phase:
description: Running or Error
type: string
spec:
type: object
required:
@@ -10,7 +10,7 @@ metadata:
name: ray-operator-role
rules:
- apiGroups: ["", "cluster.ray.io"]
resources: ["rayclusters", "rayclusters/finalizers", "pods", "pods/exec"]
resources: ["rayclusters", "rayclusters/finalizers", "rayclusters/status", "pods", "pods/exec"]
verbs: ["get", "watch", "list", "create", "delete", "patch", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
+34 -5
View File
@@ -12,10 +12,12 @@ from ray import monitor
from ray.ray_operator import operator_utils
from ray import ray_constants
logger = logging.getLogger(__name__)
class RayCluster():
def __init__(self, config: Dict[str, Any]):
self.config = config
self.set_config(config)
self.name = self.config["cluster_name"]
self.config_path = operator_utils.config_path(self.name)
@@ -23,6 +25,9 @@ class RayCluster():
self.subprocess = None # type: Optional[mp.Process]
def set_config(self, config: Dict[str, Any]) -> None:
self.config = config
def do_in_subprocess(self,
f: Callable[[], None],
wait_to_finish: bool = False) -> None:
@@ -96,18 +101,42 @@ class RayCluster():
ray_clusters = {}
last_generation = {}
def cluster_action(cluster_config: Dict[str, Any], event_type: str) -> None:
def handle_event(event_type, cluster_cr, cluster_name):
# TODO: This only detects errors in the parent process and thus doesn't
# catch cluster-specific autoscaling failures. Fix that (perhaps at
# the same time that we eliminate subprocesses).
try:
cluster_action(event_type, cluster_cr, cluster_name)
except Exception:
logger.exception(f"Error while updating RayCluster {cluster_name}.")
operator_utils.set_status(cluster_cr, cluster_name, "Error")
def cluster_action(event_type, cluster_cr, cluster_name) -> None:
cluster_config = operator_utils.cr_to_config(cluster_cr)
cluster_name = cluster_config["cluster_name"]
if event_type == "ADDED":
operator_utils.set_status(cluster_cr, cluster_name, "Running")
ray_clusters[cluster_name] = RayCluster(cluster_config)
ray_clusters[cluster_name].create_or_update()
last_generation[cluster_name] = cluster_cr["metadata"]["generation"]
elif event_type == "MODIFIED":
ray_clusters[cluster_name].create_or_update()
# Check metadata.generation to determine if there's a spec change.
current_generation = cluster_cr["metadata"]["generation"]
if current_generation > last_generation[cluster_name]:
ray_clusters[cluster_name].set_config(cluster_config)
ray_clusters[cluster_name].create_or_update()
last_generation[cluster_name] = current_generation
elif event_type == "DELETED":
ray_clusters[cluster_name].clean_up()
del ray_clusters[cluster_name]
del last_generation[cluster_name]
def main() -> None:
@@ -119,9 +148,9 @@ def main() -> None:
try:
for event in cluster_cr_stream:
cluster_cr = event["object"]
cluster_name = cluster_cr["metadata"]["name"]
event_type = event["type"]
cluster_config = operator_utils.cr_to_config(cluster_cr)
cluster_action(cluster_config, event_type)
handle_event(event_type, cluster_cr, cluster_name)
except ApiException as e:
if e.status == 404:
raise Exception(
+13
View File
@@ -99,3 +99,16 @@ def translate(configuration: Dict[str, Any],
dictionary[field]: configuration[field]
for field in dictionary if field in configuration
}
def set_status(cluster_cr: Dict[str, Any], cluster_name: str,
status: str) -> None:
# TODO: Add retry logic in case of 409 due to old resource version.
cluster_cr["status"] = {"phase": status}
custom_objects_api()\
.patch_namespaced_custom_object_status(namespace=RAY_NAMESPACE,
group="cluster.ray.io",
version="v1",
plural="rayclusters",
name=cluster_name,
body=cluster_cr)
+13 -1
View File
@@ -1,5 +1,6 @@
"""Tests launch and teardown of multiple Ray clusters using Kubernetes
"""Tests launch, teardown, and update of multiple Ray clusters using Kubernetes
operator."""
import copy
import sys
import os
import subprocess
@@ -130,6 +131,17 @@ class KubernetesOperatorTest(unittest.TestCase):
# Four pods remain
wait_for_pods(4)
# Check that cluster updates work: increase minWorkers to 3
# and check that one worker is created.
example_cluster_edit = copy.deepcopy(example_cluster_config)
example_cluster_edit["spec"]["podTypes"][1]["minWorkers"] = 3
yaml.dump(example_cluster_edit, example_cluster_file)
example_cluster_file.flush()
cm = f"kubectl -n {NAMESPACE} apply -f {example_cluster_file.name}"
subprocess.check_call(cm, shell=True)
wait_for_pods(5)
# Delete the first cluster
cmd = f"kubectl -n {NAMESPACE} delete -f"\
f"{example_cluster_file.name}"