From 4fc0452e7be5b319f47bedb3962d87ec2586912e Mon Sep 17 00:00:00 2001 From: PidgeyBE Date: Mon, 10 Aug 2020 21:13:52 +0200 Subject: [PATCH] [autoscaler] Service and Ingress per worker pod (#9359) --- doc/source/cluster/cloud.rst | 1 + python/ray/autoscaler/kubernetes/__init__.py | 10 + .../kubernetes/example-ingress.yaml | 327 ++++++++++++++++++ .../autoscaler/kubernetes/node_provider.py | 82 ++++- 4 files changed, 417 insertions(+), 3 deletions(-) create mode 100644 python/ray/autoscaler/kubernetes/example-ingress.yaml diff --git a/doc/source/cluster/cloud.rst b/doc/source/cluster/cloud.rst index c89469221..642614f02 100644 --- a/doc/source/cluster/cloud.rst +++ b/doc/source/cluster/cloud.rst @@ -153,6 +153,7 @@ Kubernetes The cluster launcher can also be used to start Ray clusters on an existing Kubernetes cluster. First, install the Kubernetes API client (``pip install kubernetes``), then make sure your Kubernetes credentials are set up properly to access the cluster (if a command like ``kubectl get pods`` succeeds, you should be good to go). Once you have ``kubectl`` configured locally to access the remote cluster, you should be ready to launch your cluster. The provided `ray/python/ray/autoscaler/kubernetes/example-full.yaml `__ cluster config file will create a small cluster of one pod for the head node configured to autoscale up to two worker node pods, with all pods requiring 1 CPU and 0.5GiB of memory. +It's also possible to deploy service and ingress resources for each scaled worker pod. An example is provided in `ray/python/ray/autoscaler/kubernetes/example-ingress.yaml `__. Test that it works by running the following commands from your local machine: diff --git a/python/ray/autoscaler/kubernetes/__init__.py b/python/ray/autoscaler/kubernetes/__init__.py index 4dd701fa1..300addc2b 100644 --- a/python/ray/autoscaler/kubernetes/__init__.py +++ b/python/ray/autoscaler/kubernetes/__init__.py @@ -4,6 +4,7 @@ from kubernetes.config.config_exception import ConfigException _configured = False _core_api = None _auth_api = None +_extensions_beta_api = None def _load_config(): @@ -35,4 +36,13 @@ def auth_api(): return _auth_api +def extensions_beta_api(): + global _extensions_beta_api + if _extensions_beta_api is None: + _load_config() + _extensions_beta_api = kubernetes.client.ExtensionsV1beta1Api() + + return _extensions_beta_api + + log_prefix = "KubernetesNodeProvider: " diff --git a/python/ray/autoscaler/kubernetes/example-ingress.yaml b/python/ray/autoscaler/kubernetes/example-ingress.yaml new file mode 100644 index 000000000..d0dcb2f3e --- /dev/null +++ b/python/ray/autoscaler/kubernetes/example-ingress.yaml @@ -0,0 +1,327 @@ +# An unique identifier for the head node and workers of this cluster. +cluster_name: default + +# The minimum number of workers nodes to launch in addition to the head +# node. This number should be >= 0. +min_workers: 0 + +# The maximum number of workers nodes to launch in addition to the head +# node. This takes precedence over min_workers. +max_workers: 2 + +# The initial number of worker nodes to launch in addition to the head +# node. When the cluster is first brought up (or when it is refreshed with a +# subsequent `ray up`) this number of nodes will be started. +initial_workers: 1 + +# Whether or not to autoscale aggressively. If this is enabled, if at any point +# we would start more workers, we start at least enough to bring us to +# initial_workers. +autoscaling_mode: default + +# The autoscaler will scale up the cluster to this target fraction of resource +# usage. For example, if a cluster of 10 nodes is 100% busy and +# target_utilization is 0.8, it would resize the cluster to 13. This fraction +# can be decreased to increase the aggressiveness of upscaling. +# This value must be less than 1.0 for scaling to happen. +target_utilization_fraction: 0.8 + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 1 + +# Kubernetes resources that need to be configured for the autoscaler to be +# able to manage the Ray cluster. If any of the provided resources don't +# exist, the autoscaler will attempt to create them. If this fails, you may +# not have the required permissions and will have to request them to be +# created by your cluster administrator. +provider: + type: kubernetes + + # Exposing external IP addresses for ray pods isn't currently supported. + use_internal_ips: true + + # Namespace to use for all resources created. + namespace: ray + + # ServiceAccount created by the autoscaler for the head node pod that it + # runs in. If this field isn't provided, the head pod config below must + # contain a user-created service account with the proper permissions. + autoscaler_service_account: + apiVersion: v1 + kind: ServiceAccount + metadata: + name: autoscaler + + # Role created by the autoscaler for the head node pod that it runs in. + # If this field isn't provided, the role referenced in + # autoscaler_role_binding must exist and have at least these permissions. + autoscaler_role: + kind: Role + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + name: autoscaler + rules: + - apiGroups: [""] + resources: ["pods", "pods/status", "pods/exec", "services"] + verbs: ["get", "watch", "list", "create", "delete", "patch"] + - apiGroups: ["extensions"] + resources: ["ingresses"] + verbs: ["get", "watch", "list", "create", "delete", "patch"] + + # RoleBinding created by the autoscaler for the head node pod that it runs + # in. If this field isn't provided, the head pod config below must contain + # a user-created service account with the proper permissions. + autoscaler_role_binding: + apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + name: autoscaler + subjects: + - kind: ServiceAccount + name: autoscaler + roleRef: + kind: Role + name: autoscaler + apiGroup: rbac.authorization.k8s.io + + services: + # Service that maps to the head node of the Ray cluster. + - apiVersion: v1 + kind: Service + metadata: + # NOTE: If you're running multiple Ray clusters with services + # on one Kubernetes cluster, they must have unique service + # names. + name: ray-head + spec: + # This selector must match the head node pod's selector below. + selector: + component: ray-head + ports: + - protocol: TCP + port: 8000 + targetPort: 8000 + + # Service that maps to the worker nodes of the Ray cluster. + - apiVersion: v1 + kind: Service + metadata: + # NOTE: If you're running multiple Ray clusters with services + # on one Kubernetes cluster, they must have unique service + # names. + name: ray-workers + spec: + # This selector must match the worker node pods' selector below. + selector: + component: ray-worker + ports: + - protocol: TCP + port: 8000 + targetPort: 8000 + +# Kubernetes pod config for the head node pod. +head_node: + pod: + apiVersion: v1 + kind: Pod + metadata: + # Automatically generates a name for the pod with this prefix. + generateName: ray-head- + + # Must match the head node service selector above if a head node + # service is required. + labels: + component: ray-head + spec: + # Change this if you altered the autoscaler_service_account above + # or want to provide your own. + serviceAccountName: autoscaler + + # Restarting the head node automatically is not currently supported. + # If the head node goes down, `ray up` must be run again. + restartPolicy: Never + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumes: + - name: dshm + emptyDir: + medium: Memory + + containers: + - name: ray-node + imagePullPolicy: IfNotPresent + # You are free (and encouraged) to use your own container image, + # but it should have the following installed: + # - rsync (used for `ray rsync` commands and file mounts) + # - screen (used for `ray attach`) + # - kubectl (used by the autoscaler to manage worker pods) + image: rayproject/autoscaler + # Do not change this command - it keeps the pod alive until it is + # explicitly killed. + command: ["/bin/bash", "-c", "--"] + args: ["trap : TERM INT; sleep infinity & wait;"] + ports: + - containerPort: 6379 # Redis port. + - containerPort: 6380 # Redis port. + - containerPort: 6381 # Redis port. + - containerPort: 12345 # Ray internal communication. + - containerPort: 12346 # Ray internal communication. + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumeMounts: + - mountPath: /dev/shm + name: dshm + resources: + requests: + cpu: 1000m + memory: 512Mi + limits: + # The maximum memory that this pod is allowed to use. The + # limit will be detected by ray and split to use 10% for + # redis, 30% for the shared memory object store, and the + # rest for application memory. If this limit is not set and + # the object store size is not set manually, ray will + # allocate a very large object store in each pod that may + # cause problems for other pods. + memory: 2Gi + env: + # This is used in the head_start_ray_commands below so that + # Ray can spawn the correct number of processes. Omitting this + # may lead to degraded performance. + - name: MY_CPU_REQUEST + valueFrom: + resourceFieldRef: + resource: requests.cpu + + +# Kubernetes pod config for worker node pods. +worker_nodes: + pod: + apiVersion: v1 + kind: Pod + metadata: + # Automatically generates a name for the pod with this prefix. + generateName: ray-worker- + + # Must match the worker node service selector above if a worker node + # service is required. + labels: + component: ray-worker + spec: + serviceAccountName: default + + # Worker nodes will be managed automatically by the head node, so + # do not change the restart policy. + restartPolicy: Never + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumes: + - name: dshm + emptyDir: + medium: Memory + + containers: + - name: ray-node + imagePullPolicy: IfNotPresent + # You are free (and encouraged) to use your own container image, + # but it should have the following installed: + # - rsync (used for `ray rsync` commands and file mounts) + image: rayproject/autoscaler + # Do not change this command - it keeps the pod alive until it is + # explicitly killed. + command: ["/bin/bash", "-c", "--"] + args: ["trap : TERM INT; sleep infinity & wait;"] + ports: + - containerPort: 12345 # Ray internal communication. + - containerPort: 12346 # Ray internal communication. + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumeMounts: + - mountPath: /dev/shm + name: dshm + resources: + requests: + cpu: 1000m + memory: 512Mi + limits: + # This memory limit will be detected by ray and split into + # 30% for plasma, and 70% for workers. + memory: 2Gi + env: + # This is used in the head_start_ray_commands below so that + # Ray can spawn the correct number of processes. Omitting this + # may lead to degraded performance. + - name: MY_CPU_REQUEST + valueFrom: + resourceFieldRef: + resource: requests.cpu + + service: + apiVersion: v1 + kind: Service + # The service name gets automatically set by the + # autoscaler and gets the same name as the pod. + spec: + # The right selector is automatically applied by autoscaler + ports: + - protocol: TCP + port: 8000 + targetPort: 8000 + + ingress: + apiVersion: extensions/v1beta1 + kind: Ingress + metadata: + annotations: + kubernetes.io/ingress.class: nginx + spec: + rules: + - host: localhost + http: + paths: + - path: / + backend: + # The value of the serviceName must be set to `${RAY_POD_NAME} and will be + # automatically replaced by the name of the pod. + serviceName: ${RAY_POD_NAME} + servicePort: 8000 + +# Files or directories to copy to the head and worker nodes. The format is a +# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. +file_mounts: { +# "/path1/on/remote/machine": "/path1/on/local/machine", +# "/path2/on/remote/machine": "/path2/on/local/machine", +} + +# List of commands that will be run before `setup_commands`. If docker is +# enabled, these commands will run outside the container and before docker +# is setup. +initialization_commands: [] + +# List of shell commands to run to set up nodes. +setup_commands: [] + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: [] + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +# Note webui-host is set to 0.0.0.0 so that kubernetes can port forward. +head_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --webui-host 0.0.0.0 + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --num-cpus=$MY_CPU_REQUEST --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 diff --git a/python/ray/autoscaler/kubernetes/node_provider.py b/python/ray/autoscaler/kubernetes/node_provider.py index 18fe56082..facc44d82 100644 --- a/python/ray/autoscaler/kubernetes/node_provider.py +++ b/python/ray/autoscaler/kubernetes/node_provider.py @@ -1,7 +1,9 @@ import logging +from uuid import uuid4 +from kubernetes.client.rest import ApiException from ray.autoscaler.command_runner import KubernetesCommandRunner -from ray.autoscaler.kubernetes import core_api, log_prefix +from ray.autoscaler.kubernetes import core_api, log_prefix, extensions_beta_api from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.kubernetes.config import bootstrap_kubernetes from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME @@ -69,8 +71,13 @@ class KubernetesNodeProvider(NodeProvider): core_api().patch_namespaced_pod(node_id, self.namespace, pod) def create_node(self, node_config, tags, count): - pod_spec = node_config.copy() + conf = node_config.copy() + pod_spec = conf.get("pod", conf) + service_spec = conf.get("service") + ingress_spec = conf.get("ingress") + node_uuid = str(uuid4()) tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name + tags["ray-node-uuid"] = node_uuid pod_spec["metadata"]["namespace"] = self.namespace if "labels" in pod_spec["metadata"]: pod_spec["metadata"]["labels"].update(tags) @@ -78,11 +85,52 @@ class KubernetesNodeProvider(NodeProvider): pod_spec["metadata"]["labels"] = tags logger.info(log_prefix + "calling create_namespaced_pod " "(count={}).".format(count)) + new_nodes = [] for _ in range(count): - core_api().create_namespaced_pod(self.namespace, pod_spec) + pod = core_api().create_namespaced_pod(self.namespace, pod_spec) + new_nodes.append(pod) + + new_svcs = [] + if service_spec is not None: + logger.info(log_prefix + "calling create_namespaced_service " + "(count={}).".format(count)) + + for new_node in new_nodes: + + metadata = service_spec.get("metadata", {}) + metadata["name"] = new_node.metadata.name + service_spec["metadata"] = metadata + service_spec["spec"]["selector"] = {"ray-node-uuid": node_uuid} + svc = core_api().create_namespaced_service( + self.namespace, service_spec) + new_svcs.append(svc) + + if ingress_spec is not None: + logger.info(log_prefix + "calling create_namespaced_ingress " + "(count={}).".format(count)) + for new_svc in new_svcs: + metadata = ingress_spec.get("metadata", {}) + metadata["name"] = new_svc.metadata.name + ingress_spec["metadata"] = metadata + ingress_spec = _add_service_name_to_service_port( + ingress_spec, new_svc.metadata.name) + extensions_beta_api().create_namespaced_ingress( + self.namespace, ingress_spec) def terminate_node(self, node_id): + logger.info(log_prefix + "calling delete_namespaced_pod") core_api().delete_namespaced_pod(node_id, self.namespace) + try: + core_api().delete_namespaced_service(node_id, self.namespace) + except ApiException: + pass + try: + extensions_beta_api().delete_namespaced_ingress( + node_id, + self.namespace, + ) + except ApiException: + pass def terminate_nodes(self, node_ids): for node_id in node_ids: @@ -102,3 +150,31 @@ class KubernetesNodeProvider(NodeProvider): @staticmethod def bootstrap_config(cluster_config): return bootstrap_kubernetes(cluster_config) + + +def _add_service_name_to_service_port(spec, svc_name): + """Goes recursively through the ingress manifest and adds the + right serviceName next to every servicePort definition. + """ + if isinstance(spec, dict): + dict_keys = list(spec.keys()) + for k in dict_keys: + spec[k] = _add_service_name_to_service_port(spec[k], svc_name) + + # The magic string ${RAY_POD_NAME} is replaced with + # the true service name, which is equal to the worker pod name. + if k == "serviceName": + if spec[k] != "${RAY_POD_NAME}": + raise ValueError( + "The value of serviceName must be set to " + "${RAY_POD_NAME}. It is automatically replaced " + "when using the autoscaler.") + else: + spec["serviceName"] = svc_name + + elif isinstance(spec, list): + spec = [ + _add_service_name_to_service_port(item, svc_name) for item in spec + ] + + return spec