diff --git a/python/ray/autoscaler/kubernetes/config.py b/python/ray/autoscaler/kubernetes/config.py
index b0a9452ce..f0e853ec1 100644
--- a/python/ray/autoscaler/kubernetes/config.py
+++ b/python/ray/autoscaler/kubernetes/config.py
@@ -21,6 +21,10 @@ def using_existing_msg(resource_type, name):
     return "using existing {} '{}'".format(resource_type, name)
 
 
+def updating_existing_msg(resource_type, name):
+    return "updating existing {} '{}'".format(resource_type, name)
+
+
 def not_found_msg(resource_type, name):
     return "{} '{}' not found, attempting to create it".format(
         resource_type, name)
@@ -43,6 +47,7 @@ def bootstrap_kubernetes(config):
     _configure_autoscaler_service_account(namespace, config["provider"])
     _configure_autoscaler_role(namespace, config["provider"])
     _configure_autoscaler_role_binding(namespace, config["provider"])
+    _configure_services(namespace, config["provider"])
     return config
 
 
@@ -151,3 +156,46 @@ def _configure_autoscaler_role_binding(namespace, provider_config):
         logger.info(log_prefix + not_found_msg(binding_field, name))
         auth_api().create_namespaced_role_binding(namespace, binding)
         logger.info(log_prefix + created_msg(binding_field, name))
+
+
+def _configure_services(namespace, provider_config):
+    """Create or update each Service listed in the provider config.
+
+    A service without a namespace is assigned `namespace`; one that names
+    a different namespace raises InvalidNamespaceError.
+    """
+    service_field = "services"
+    if service_field not in provider_config:
+        logger.info(log_prefix + not_provided_msg(service_field))
+        return
+
+    services = provider_config[service_field]
+    for service in services:
+        if "namespace" not in service["metadata"]:
+            service["metadata"]["namespace"] = namespace
+        elif service["metadata"]["namespace"] != namespace:
+            raise InvalidNamespaceError(service_field, namespace)
+
+        name = service["metadata"]["name"]
+        field_selector = "metadata.name={}".format(name)
+        existing_services = core_api().list_namespaced_service(
+            namespace, field_selector=field_selector).items
+        if len(existing_services) > 0:
+            assert len(existing_services) == 1
+            existing_service = existing_services[0]
+            # NOTE(review): `service` is a config dict while
+            # `existing_service` comes from the API client, so this
+            # equality may never hold -- confirm.
+            if service == existing_service:
+                logger.info(log_prefix + using_existing_msg("service", name))
+                # Move on to the next service instead of returning, so the
+                # remaining services are still configured.
+                continue
+            else:
+                logger.info(log_prefix +
+                            updating_existing_msg("service", name))
+                core_api().patch_namespaced_service(name, namespace, service)
+        else:
+            logger.info(log_prefix + not_found_msg("service", name))
+            core_api().create_namespaced_service(namespace, service)
+            logger.info(log_prefix + created_msg("service", name))
diff --git a/python/ray/autoscaler/kubernetes/example-full.yaml b/python/ray/autoscaler/kubernetes/example-full.yaml
index f84c1cd96..d7188b689 100644
--- a/python/ray/autoscaler/kubernetes/example-full.yaml
+++ b/python/ray/autoscaler/kubernetes/example-full.yaml
@@ -81,6 +81,41 @@ provider:
             name: autoscaler
             apiGroup: rbac.authorization.k8s.io
 
+    services:
+      # Service that maps to the head node of the Ray cluster.
+      - apiVersion: v1
+        kind: Service
+        metadata:
+            # NOTE: If you're running multiple Ray clusters with services
+            # on one Kubernetes cluster, they must have unique service
+            # names.
+            name: ray-head
+        spec:
+            # This selector must match the head node pod's selector below.
+            selector:
+                component: ray-head
+            ports:
+                - protocol: TCP
+                  port: 8000
+                  targetPort: 8000
+
+      # Service that maps to the worker nodes of the Ray cluster.
+      - apiVersion: v1
+        kind: Service
+        metadata:
+            # NOTE: If you're running multiple Ray clusters with services
+            # on one Kubernetes cluster, they must have unique service
+            # names.
+            name: ray-workers
+        spec:
+            # This selector must match the worker node pods' selector below.
+            selector:
+                component: ray-worker
+            ports:
+                - protocol: TCP
+                  port: 8000
+                  targetPort: 8000
+
 # Kubernetes pod config for the head node pod.
 head_node:
     apiVersion: v1
@@ -88,6 +123,11 @@ head_node:
     metadata:
         # Automatically generates a name for the pod with this prefix.
         generateName: ray-head-
+
+        # Must match the head node service selector above if a head node
+        # service is required.
+        labels:
+            component: ray-head
     spec:
         # Change this if you altered the autoscaler_service_account above
         # or want to provide your own.
@@ -160,6 +200,11 @@ worker_nodes:
     metadata:
         # Automatically generates a name for the pod with this prefix.
         generateName: ray-worker-
+
+        # Must match the worker node service selector above if a worker node
+        # service is required.
+        labels:
+            component: ray-worker
     spec:
         serviceAccountName: default
 
diff --git a/python/ray/autoscaler/kubernetes/node_provider.py b/python/ray/autoscaler/kubernetes/node_provider.py
index a65f7e823..1775cece1 100644
--- a/python/ray/autoscaler/kubernetes/node_provider.py
+++ b/python/ray/autoscaler/kubernetes/node_provider.py
@@ -62,14 +62,24 @@ class KubernetesNodeProvider(NodeProvider):
         return pod.status.pod_ip
 
     def set_node_tags(self, node_id, tags):
-        body = {"metadata": {"labels": tags}}
-        core_api().patch_namespaced_pod(node_id, self.namespace, body)
+        pod = core_api().read_namespaced_pod_status(node_id, self.namespace)
+        # `labels` may be None when the pod carries no labels at all.
+        if pod.metadata.labels is None:
+            pod.metadata.labels = {}
+        pod.metadata.labels.update(tags)
+        core_api().patch_namespaced_pod(node_id, self.namespace, pod)
 
     def create_node(self, node_config, tags, count):
         pod_spec = node_config.copy()
         tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name
         pod_spec["metadata"]["namespace"] = self.namespace
-        pod_spec["metadata"]["labels"] = tags
+        # Merge the tags over any labels given in the pod config (e.g. the
+        # ones a service selector matches). A fresh dict is used so the
+        # configured labels dict is not updated in place and an empty
+        # `labels:` entry (None) is tolerated.
+        labels = dict(pod_spec["metadata"].get("labels") or {})
+        labels.update(tags)
+        pod_spec["metadata"]["labels"] = labels
         logger.info(log_prefix + "calling create_namespaced_pod "
                     "(count={}).".format(count))
         for _ in range(count):