[autoscaler] Kubernetes autoscaler backend (#5492)

* Add Kubernetes NodeProvider to autoscaler

* Split off SSHCommandRunner

* Add KubernetesCommandRunner

* Cleanup

* More config options

* Check if auth present

* More auth checks

* Better output

* Always bootstrap config

* All working

* Add k8s-rsync comment

* Clean up manual k8s examples

* Fix up submit.yaml

* Automatically configure permissisons

* Fix get_node_provider arg

* Fix permissions

* Fill in empty auth

* Remove ray-cluster from this PR

* No hard dep on kubernetes library

* Move permissions into autoscaler config

* lint

* Fix indentation

* namespace validation

* Use cluster name tag

* Remove kubernetes from setup.py

* Comment in example configs

* Same default autoscaling config as aws

* Add Kubernetes quickstart

* lint

* Revert changes to submit.yaml (other PR)

* Install kubernetes in travis

* address comments

* Improve autoscaling doc

* kubectl command in setup

* Force use_internal_ips

* comments

* backend env in docs

* Change namespace config

* comments

* comments

* Fix yaml test
This commit is contained in:
Edward Oakes
2019-10-03 10:17:00 -07:00
committed by GitHub
parent 9df6eda84f
commit 972dddd776
13 changed files with 1074 additions and 271 deletions
+9 -2
View File
@@ -77,6 +77,12 @@ CLUSTER_CONFIG_SCHEMA = {
"head_ip": (str, OPTIONAL), # local cluster head node
"worker_ips": (list, OPTIONAL), # local cluster worker nodes
"use_internal_ips": (bool, OPTIONAL), # don't require public ips
"namespace": (str, OPTIONAL), # k8s namespace, if using k8s
# k8s autoscaler permissions, if using k8s
"autoscaler_service_account": (dict, OPTIONAL),
"autoscaler_role": (dict, OPTIONAL),
"autoscaler_role_binding": (dict, OPTIONAL),
"extra_config": (dict, OPTIONAL), # provider-specific config
# Whether to try to reuse previously stopped nodes instead of
@@ -89,10 +95,10 @@ CLUSTER_CONFIG_SCHEMA = {
# How Ray will authenticate with newly launched nodes.
"auth": (
{
"ssh_user": (str, REQUIRED), # e.g. ubuntu
"ssh_user": (str, OPTIONAL), # e.g. ubuntu
"ssh_private_key": (str, OPTIONAL),
},
REQUIRED),
OPTIONAL),
# Docker configuration. If this is specified, all setup and start commands
# will be executed in the container.
@@ -812,6 +818,7 @@ def fillout_defaults(config):
defaults.update(config)
merge_setup_commands(defaults)
dockerize_if_needed(defaults)
defaults["auth"] = defaults.get("auth", {})
return defaults
+13 -11
View File
@@ -76,13 +76,12 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
config = yaml.safe_load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
validate_config(config)
config = fillout_defaults(config)
validate_config(config)
confirm("This will destroy your cluster", yes)
provider = get_node_provider(config["provider"], config["cluster_name"])
try:
def remaining_nodes():
@@ -215,9 +214,10 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
logger.info("get_or_create_head_node: Updating files on head node...")
# Rewrite the auth config so that the head node can update the workers
remote_key_path = "~/ray_bootstrap_key.pem"
remote_config = copy.deepcopy(config)
remote_config["auth"]["ssh_private_key"] = remote_key_path
if config["provider"]["type"] != "kubernetes":
remote_key_path = "~/ray_bootstrap_key.pem"
remote_config["auth"]["ssh_private_key"] = remote_key_path
# Adjust for new file locations
new_mounts = {}
@@ -232,9 +232,12 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
remote_config_file.write(json.dumps(remote_config))
remote_config_file.flush()
config["file_mounts"].update({
remote_key_path: config["auth"]["ssh_private_key"],
"~/ray_bootstrap_config.yaml": remote_config_file.name
})
if config["provider"]["type"] != "kubernetes":
config["file_mounts"].update({
remote_key_path: config["auth"]["ssh_private_key"],
})
if restart_only:
init_commands = []
@@ -278,7 +281,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
"Head node up-to-date, IP address is: {}".format(head_node_ip))
monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
use_docker = bool(config["docker"]["container_name"])
use_docker = "docker" in config and bool(
config["docker"]["container_name"])
if override_cluster_name:
modifiers = " --cluster-name={}".format(
quote(override_cluster_name))
@@ -291,10 +295,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
print("To open a console on the cluster:\n\n"
" ray attach {}{}\n".format(config_file, modifiers))
print("To ssh manually to the cluster, run:\n\n"
" ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
config["auth"]["ssh_user"],
head_node_ip))
print("To get a remote shell to the cluster manually, run:\n\n"
" {}\n".format(updater.cmd_runner.remote_shell_command_str()))
finally:
provider.cleanup()
@@ -424,7 +426,7 @@ def _exec(updater, cmd, screen, tmux, port_forward=None):
quote(cmd + "; exec bash")
]
cmd = " ".join(cmd)
updater.ssh_cmd(
updater.cmd_runner.run(
cmd,
allocate_tty=True,
exit_on_fail=True,
@@ -0,0 +1,42 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import kubernetes
from kubernetes.config.config_exception import ConfigException
_configured = False
_core_api = None
_auth_api = None
def _load_config():
global _configured
if _configured:
return
try:
kubernetes.config.load_incluster_config()
except ConfigException:
kubernetes.config.load_kube_config()
_configured = True
def core_api():
global _core_api
if _core_api is None:
_load_config()
_core_api = kubernetes.client.CoreV1Api()
return _core_api
def auth_api():
global _auth_api
if _auth_api is None:
_load_config()
_auth_api = kubernetes.client.RbacAuthorizationV1Api()
return _auth_api
log_prefix = "KubernetesNodeProvider: "
+157
View File
@@ -0,0 +1,157 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
from kubernetes import client
from ray.autoscaler.kubernetes import auth_api, core_api, log_prefix
logger = logging.getLogger(__name__)
class InvalidNamespaceError(ValueError):
def __init__(self, field_name, namespace):
self.message = ("Namespace of {} config doesn't match provided "
"namespace '{}'. Either set it to {} or remove the "
"field".format(field_name, namespace, namespace))
def __str__(self):
return self.message
def using_existing_msg(resource_type, name):
return "using existing {} '{}'".format(resource_type, name)
def not_found_msg(resource_type, name):
return "{} '{}' not found, attempting to create it".format(
resource_type, name)
def created_msg(resource_type, name):
return "successfully created {} '{}'".format(resource_type, name)
def not_provided_msg(resource_type):
return "no {} config provided, must already exist".format(resource_type)
def bootstrap_kubernetes(config):
if not config["provider"]["use_internal_ips"]:
return ValueError("Exposing external IP addresses for ray pods isn't "
"currently supported. Please set "
"'use_internal_ips' to false.")
namespace = _configure_namespace(config["provider"])
_configure_autoscaler_service_account(namespace, config["provider"])
_configure_autoscaler_role(namespace, config["provider"])
_configure_autoscaler_role_binding(namespace, config["provider"])
return config
def _configure_namespace(provider_config):
namespace_field = "namespace"
if namespace_field not in provider_config:
raise ValueError("Must specify namespace in Kubernetes config.")
namespace = provider_config[namespace_field]
field_selector = "metadata.name={}".format(namespace)
namespaces = core_api().list_namespace(field_selector=field_selector).items
if len(namespaces) > 0:
assert len(namespaces) == 1
logger.info(log_prefix +
using_existing_msg(namespace_field, namespace))
return namespace
logger.info(log_prefix + not_found_msg(namespace_field, namespace))
namespace_config = client.V1Namespace(
metadata=client.V1ObjectMeta(name=namespace))
core_api().create_namespace(namespace_config)
logger.info(log_prefix + created_msg(namespace_field, namespace))
return namespace
def _configure_autoscaler_service_account(namespace, provider_config):
account_field = "autoscaler_service_account"
if account_field not in provider_config:
logger.info(log_prefix + not_provided_msg(account_field))
return
account = provider_config[account_field]
if "namespace" not in account["metadata"]:
account["metadata"]["namespace"] = namespace
elif account["metadata"]["namespace"] != namespace:
raise InvalidNamespaceError(account_field, namespace)
name = account["metadata"]["name"]
field_selector = "metadata.name={}".format(name)
accounts = core_api().list_namespaced_service_account(
namespace, field_selector=field_selector).items
if len(accounts) > 0:
assert len(accounts) == 1
logger.info(log_prefix + using_existing_msg(account_field, name))
return
logger.info(log_prefix + not_found_msg(account_field, name))
core_api().create_namespaced_service_account(namespace, account)
logger.info(log_prefix + created_msg(account_field, name))
def _configure_autoscaler_role(namespace, provider_config):
role_field = "autoscaler_role"
if role_field not in provider_config:
logger.info(log_prefix + not_provided_msg(role_field))
return
role = provider_config[role_field]
if "namespace" not in role["metadata"]:
role["metadata"]["namespace"] = namespace
elif role["metadata"]["namespace"] != namespace:
raise InvalidNamespaceError(role_field, namespace)
name = role["metadata"]["name"]
field_selector = "metadata.name={}".format(name)
accounts = auth_api().list_namespaced_role(
namespace, field_selector=field_selector).items
if len(accounts) > 0:
assert len(accounts) == 1
logger.info(log_prefix + using_existing_msg(role_field, name))
return
logger.info(log_prefix + not_found_msg(role_field, name))
auth_api().create_namespaced_role(namespace, role)
logger.info(log_prefix + created_msg(role_field, name))
def _configure_autoscaler_role_binding(namespace, provider_config):
binding_field = "autoscaler_role_binding"
if binding_field not in provider_config:
logger.info(log_prefix + not_provided_msg(binding_field))
return
binding = provider_config[binding_field]
if "namespace" not in binding["metadata"]:
binding["metadata"]["namespace"] = namespace
elif binding["metadata"]["namespace"] != namespace:
raise InvalidNamespaceError(binding_field, namespace)
for subject in binding["subjects"]:
if "namespace" not in subject:
subject["namespace"] = namespace
elif subject["namespace"] != namespace:
raise InvalidNamespaceError(
binding_field + " subject '{}'".format(subject["name"]),
namespace)
name = binding["metadata"]["name"]
field_selector = "metadata.name={}".format(name)
accounts = auth_api().list_namespaced_role_binding(
namespace, field_selector=field_selector).items
if len(accounts) > 0:
assert len(accounts) == 1
logger.info(log_prefix + using_existing_msg(binding_field, name))
return
logger.info(log_prefix + not_found_msg(binding_field, name))
auth_api().create_namespaced_role_binding(namespace, binding)
logger.info(log_prefix + created_msg(binding_field, name))
@@ -0,0 +1,232 @@
# An unique identifier for the head node and workers of this cluster.
cluster_name: default
# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0
# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 2
# The initial number of worker nodes to launch in addition to the head
# node. When the cluster is first brought up (or when it is refreshed with a
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# Whether or not to autoscale aggressively. If this is enabled, if at any point
# we would start more workers, we start at least enough to bring us to
# initial_workers.
autoscaling_mode: default
# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
target_utilization_fraction: 0.8
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Kubernetes resources that need to be configured for the autoscaler to be
# able to manage the Ray cluster. If any of the provided resources don't
# exist, the autoscaler will attempt to create them. If this fails, you may
# not have the required permissions and will have to request them to be
# created by your cluster administrator.
provider:
type: kubernetes
# Exposing external IP addresses for ray pods isn't currently supported.
use_internal_ips: true
# Namespace to use for all resources created.
namespace: ray
# ServiceAccount created by the autoscaler for the head node pod that it
# runs in. If this field isn't provided, the head pod config below must
# contain a user-created service account with the proper permissions.
autoscaler_service_account:
apiVersion: v1
kind: ServiceAccount
metadata:
name: autoscaler
# Role created by the autoscaler for the head node pod that it runs in.
# If this field isn't provided, the role referenced in
# autoscaler_role_binding must exist and have at least these permissions.
autoscaler_role:
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: autoscaler
rules:
- apiGroups: [""]
resources: ["pods", "pods/status", "pods/exec"]
verbs: ["get", "watch", "list", "create", "delete", "patch"]
# RoleBinding created by the autoscaler for the head node pod that it runs
# in. If this field isn't provided, the head pod config below must contain
# a user-created service account with the proper permissions.
autoscaler_role_binding:
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: autoscaler
subjects:
- kind: ServiceAccount
name: autoscaler
roleRef:
kind: Role
name: autoscaler
apiGroup: rbac.authorization.k8s.io
# Kubernetes pod config for the head node pod.
head_node:
apiVersion: v1
kind: Pod
metadata:
# Automatically generates a name for the pod with this prefix.
generateName: ray-head-
spec:
# Change this if you altered the autoscaler_service_account above
# or want to provide your own.
serviceAccountName: autoscaler
# Restarting the head node automatically is not currently supported.
# If the head node goes down, `ray up` must be run again.
restartPolicy: Never
# This volume allocates shared memory for Ray to use for its plasma
# object store. If you do not provide this, Ray will fall back to
# /tmp which cause slowdowns if is not a shared memory volume.
volumes:
- name: dshm
emptyDir:
medium: Memory
containers:
- name: ray-node
imagePullPolicy: Always
# You are free (and encouraged) to use your own container image,
# but it should have the following installed:
# - rsync (used for `ray rsync` commands and file mounts)
# - screen (used for `ray attach`)
# - kubectl (used by the autoscaler to manage worker pods)
image: rayproject/autoscaler
# Do not change this command - it keeps the pod alive until it is
# explicitly killed.
command: ["/bin/bash", "-c", "--"]
args: ["trap : TERM INT; sleep infinity & wait;"]
ports:
- containerPort: 6379 # Redis port.
- containerPort: 6380 # Redis port.
- containerPort: 6381 # Redis port.
- containerPort: 12345 # Ray internal communication.
- containerPort: 12346 # Ray internal communication.
# This volume allocates shared memory for Ray to use for its plasma
# object store. If you do not provide this, Ray will fall back to
# /tmp which cause slowdowns if is not a shared memory volume.
volumeMounts:
- mountPath: /dev/shm
name: dshm
resources:
requests:
cpu: 1000m
memory: 512Mi
env:
# This is used in the head_start_ray_commands below so that
# Ray can spawn the correct number of processes. Omitting this
# may lead to degraded performance.
- name: MY_CPU_REQUEST
valueFrom:
resourceFieldRef:
resource: requests.cpu
# Kubernetes pod config for worker node pods.
worker_nodes:
apiVersion: v1
kind: Pod
metadata:
# Automatically generates a name for the pod with this prefix.
generateName: ray-worker-
spec:
serviceAccountName: default
# Worker nodes will be managed automatically by the head node, so
# do not change the restart policy.
restartPolicy: Never
# This volume allocates shared memory for Ray to use for its plasma
# object store. If you do not provide this, Ray will fall back to
# /tmp which cause slowdowns if is not a shared memory volume.
volumes:
- name: dshm
emptyDir:
medium: Memory
containers:
- name: ray-node
imagePullPolicy: Always
# You are free (and encouraged) to use your own container image,
# but it should have the following installed:
# - rsync (used for `ray rsync` commands and file mounts)
image: rayproject/autoscaler
# Do not change this command - it keeps the pod alive until it is
# explicitly killed.
command: ["/bin/bash", "-c", "--"]
args: ["trap : TERM INT; sleep infinity & wait;"]
ports:
- containerPort: 12345 # Ray internal communication.
- containerPort: 12346 # Ray internal communication.
# This volume allocates shared memory for Ray to use for its plasma
# object store. If you do not provide this, Ray will fall back to
# /tmp which cause slowdowns if is not a shared memory volume.
volumeMounts:
- mountPath: /dev/shm
name: dshm
resources:
requests:
cpu: 1000m
memory: 512Mi
env:
# This is used in the head_start_ray_commands below so that
# Ray can spawn the correct number of processes. Omitting this
# may lead to degraded performance.
- name: MY_CPU_REQUEST
valueFrom:
resourceFieldRef:
resource: requests.cpu
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
# "/path1/on/remote/machine": "/path1/on/local/machine",
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []
# List of shell commands to run to set up nodes.
setup_commands: []
# Custom commands that will be run on the head node after common setup.
head_setup_commands: []
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --num-cpus=$MY_CPU_REQUEST --redis-address=$RAY_HEAD_IP:6379 --object-manager-port=8076
@@ -0,0 +1,58 @@
# An unique identifier for the head node and workers of this cluster.
cluster_name: minimal
# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers. min_workers default to 0.
max_workers: 1
# Kubernetes resources that need to be configured for the autoscaler to be
# able to manage the Ray cluster. If any of the provided resources don't
# exist, the autoscaler will attempt to create them. If this fails, you may
# not have the required permissions and will have to request them to be
# created by your cluster administrator.
provider:
type: kubernetes
# Exposing external IP addresses for ray pods isn't currently supported.
use_internal_ips: true
# Namespace to use for all resources created.
namespace: ray
# ServiceAccount created by the autoscaler for the head node pod that it
# runs in. If this field isn't provided, the head pod config below must
# contain a user-created service account with the proper permissions.
autoscaler_service_account:
apiVersion: v1
kind: ServiceAccount
metadata:
name: autoscaler
# Role created by the autoscaler for the head node pod that it runs in.
# If this field isn't provided, the role referenced in
# autoscaler_role_binding must exist and have at least these permissions.
autoscaler_role:
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: autoscaler
rules:
- apiGroups: [""]
resources: ["pods", "pods/status", "pods/exec"]
verbs: ["get", "watch", "list", "create", "delete", "patch"]
# RoleBinding created by the autoscaler for the head node pod that it runs
# in. If this field isn't provided, the head pod config below must contain
# a user-created service account with the proper permissions.
autoscaler_role_binding:
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: autoscaler
subjects:
- kind: ServiceAccount
name: autoscaler
roleRef:
kind: Role
name: autoscaler
apiGroup: rbac.authorization.k8s.io
+25
View File
@@ -0,0 +1,25 @@
#!/bin/bash
# Helper script to use kubectl as a remote shell for rsync to sync files
# to/from pods that have rsync installed. Taken from:
# https://serverfault.com/questions/741670/rsync-files-to-a-kubernetes-pod/746352
if [ -z "$KRSYNC_STARTED" ]; then
export KRSYNC_STARTED=true
exec rsync --blocking-io --rsh "$0" $@
fi
# Running as --rsh
namespace=''
pod=$1
shift
# If use uses pod@namespace rsync passes as: {us} -l pod namespace ...
if [ "X$pod" = "X-l" ]; then
pod=$1
shift
namespace="-n $1"
shift
fi
exec kubectl $namespace exec -i $pod -- "$@"
@@ -0,0 +1,87 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
from ray.autoscaler.kubernetes import core_api, log_prefix
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME
logger = logging.getLogger(__name__)
def to_label_selector(tags):
label_selector = ""
for k, v in tags.items():
if label_selector != "":
label_selector += ","
label_selector += "{}={}".format(k, v)
return label_selector
class KubernetesNodeProvider(NodeProvider):
def __init__(self, provider_config, cluster_name):
NodeProvider.__init__(self, provider_config, cluster_name)
self.cluster_name = cluster_name
self.namespace = provider_config["namespace"]
def non_terminated_nodes(self, tag_filters):
# Match pods that are in the 'Pending' or 'Running' phase.
# Unfortunately there is no OR operator in field selectors, so we
# have to match on NOT any of the other phases.
field_selector = ",".join([
"status.phase!=Failed",
"status.phase!=Unknown",
"status.phase!=Succeeded",
"status.phase!=Terminating",
])
tag_filters[TAG_RAY_CLUSTER_NAME] = self.cluster_name
label_selector = to_label_selector(tag_filters)
pod_list = core_api().list_namespaced_pod(
self.namespace,
field_selector=field_selector,
label_selector=label_selector)
return [pod.metadata.name for pod in pod_list.items]
def is_running(self, node_id):
pod = core_api().read_namespaced_pod_status(node_id, self.namespace)
return pod.status.phase == "Running"
def is_terminated(self, node_id):
pod = core_api().read_namespaced_pod_status(node_id, self.namespace)
return pod.status.phase not in ["Running", "Pending"]
def node_tags(self, node_id):
pod = core_api().read_namespaced_pod_status(node_id, self.namespace)
return pod.metadata.labels
def external_ip(self, node_id):
raise NotImplementedError("Must use internal IPs with kubernetes.")
def internal_ip(self, node_id):
pod = core_api().read_namespaced_pod_status(node_id, self.namespace)
return pod.status.pod_ip
def set_node_tags(self, node_id, tags):
body = {"metadata": {"labels": tags}}
core_api().patch_namespaced_pod(node_id, self.namespace, body)
def create_node(self, node_config, tags, count):
pod_spec = node_config.copy()
tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name
pod_spec["metadata"]["namespace"] = self.namespace
pod_spec["metadata"]["labels"] = tags
logger.info(log_prefix + "calling create_namespaced_pod "
"(count={}).".format(count))
for _ in range(count):
core_api().create_namespaced_pod(self.namespace, pod_spec)
def terminate_node(self, node_id):
core_api().delete_namespaced_pod(node_id, self.namespace)
def terminate_nodes(self, node_ids):
for node_id in node_ids:
self.terminate_node(node_id)
+14 -2
View File
@@ -28,12 +28,24 @@ def import_local():
return bootstrap_local, LocalNodeProvider
def import_kubernetes():
from ray.autoscaler.kubernetes.config import bootstrap_kubernetes
from ray.autoscaler.kubernetes.node_provider import KubernetesNodeProvider
return bootstrap_kubernetes, KubernetesNodeProvider
def load_local_example_config():
import ray.autoscaler.local as ray_local
return os.path.join(
os.path.dirname(ray_local.__file__), "example-full.yaml")
def load_kubernetes_example_config():
import ray.autoscaler.kubernetes as ray_kubernetes
return os.path.join(
os.path.dirname(ray_kubernetes.__file__), "example-full.yaml")
def load_aws_example_config():
import ray.autoscaler.aws as ray_aws
return os.path.join(os.path.dirname(ray_aws.__file__), "example-full.yaml")
@@ -58,7 +70,7 @@ NODE_PROVIDERS = {
"aws": import_aws,
"gcp": import_gcp,
"azure": None, # TODO: support more node providers
"kubernetes": None,
"kubernetes": import_kubernetes,
"docker": None,
"external": import_external # Import an external module
}
@@ -68,7 +80,7 @@ DEFAULT_CONFIGS = {
"aws": load_aws_example_config,
"gcp": load_gcp_example_config,
"azure": None, # TODO: support more node providers
"kubernetes": None,
"kubernetes": load_kubernetes_example_config,
"docker": None,
}
+304 -175
View File
@@ -25,40 +25,142 @@ logger = logging.getLogger(__name__)
# How long to wait for a node to start, in seconds
NODE_START_WAIT_S = 300
SSH_CHECK_INTERVAL = 5
READY_CHECK_INTERVAL = 5
HASH_MAX_LENGTH = 10
KUBECTL_RSYNC = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "kubernetes/kubectl-rsync.sh")
def get_default_ssh_options(private_key, connect_timeout, ssh_control_path):
OPTS = [
("ConnectTimeout", "{}s".format(connect_timeout)),
("StrictHostKeyChecking", "no"),
("ControlMaster", "auto"),
("ControlPath", "{}/%C".format(ssh_control_path)),
("ControlPersist", "10s"),
]
return ["-i", private_key] + [
x for y in (["-o", "{}={}".format(k, v)] for k, v in OPTS) for x in y
]
def with_interactive(cmd):
force_interactive = ("true && source ~/.bashrc && "
"export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ")
return ["bash", "--login", "-c", "-i", force_interactive + cmd]
class NodeUpdater(object):
"""A process for syncing files and running init commands on a node."""
class KubernetesCommandRunner(object):
def __init__(self, log_prefix, namespace, node_id, auth_config,
process_runner):
def __init__(self,
node_id,
provider_config,
provider,
auth_config,
cluster_name,
file_mounts,
initialization_commands,
setup_commands,
ray_start_commands,
runtime_hash,
process_runner=subprocess,
use_internal_ip=False):
self.log_prefix = log_prefix
self.process_runner = process_runner
self.node_id = node_id
self.namespace = namespace
self.kubectl = ["kubectl", "-n", self.namespace]
def run(self,
cmd,
timeout=120,
redirect=None,
allocate_tty=False,
exit_on_fail=False,
port_forward=None):
logger.info(self.log_prefix + "Running {}...".format(cmd))
if port_forward:
port_forward_cmd = self.kubectl + [
"port-forward", self.node_id,
str(port_forward)
]
port_forward_process = subprocess.Popen(port_forward_cmd)
# Give port-forward a grace period to run and print output before
# running the actual command. This is a little ugly, but it should
# work in most scenarios and nothing should go very wrong if the
# command starts running before the port forward starts.
time.sleep(1)
final_cmd = self.kubectl + [
"exec", "-it" if allocate_tty else "-i", self.node_id, "--"
] + with_interactive(cmd)
try:
self.process_runner.check_call(
final_cmd, stdout=redirect, stderr=redirect)
except subprocess.CalledProcessError:
if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])
logger.error(self.log_prefix +
"Command failed: \n\n {}\n".format(quoted_cmd))
sys.exit(1)
else:
raise
finally:
# Clean up the port forward process. First, try to let it exit
# gracefull with SIGTERM. If that doesn't work after 1s, send
# SIGKILL.
if port_forward:
port_forward_process.terminate()
for _ in range(10):
time.sleep(0.1)
port_forward_process.poll()
if port_forward_process.returncode:
break
logger.info(self.log_prefix +
"Waiting for port forward to die...")
else:
logger.warning(self.log_prefix +
"Killing port forward with SIGKILL.")
port_forward_process.kill()
def run_rsync_up(self, source, target, redirect=None):
if target.startswith("~"):
target = "/root" + target[1:]
try:
self.process_runner.check_call(
[
KUBECTL_RSYNC,
"-avz",
source,
"{}@{}:{}".format(self.node_id, self.namespace, target),
],
stdout=redirect,
stderr=redirect)
except Exception as e:
logger.warning(self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'"
.format(e))
self.process_runner.check_call(
self.kubectl + [
"cp", source, "{}/{}:{}".format(self.namespace,
self.node_id, target)
],
stdout=redirect,
stderr=redirect)
def run_rsync_down(self, source, target, redirect=None):
if target.startswith("~"):
target = "/root" + target[1:]
try:
self.process_runner.check_call(
[
KUBECTL_RSYNC,
"-avz",
"{}@{}:{}".format(self.node_id, self.namespace, source),
target,
],
stdout=redirect,
stderr=redirect)
except Exception as e:
logger.warning(self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'"
.format(e))
self.process_runner.check_call(
self.kubectl + [
"cp", "{}/{}:{}".format(self.namespace, self.node_id,
source), target
],
stdout=redirect,
stderr=redirect)
def remote_shell_command_str(self):
return "{} exec -it {} bash".format(" ".join(self.kubectl),
self.node_id)
class SSHCommandRunner(object):
def __init__(self, log_prefix, node_id, provider, auth_config,
cluster_name, process_runner, use_internal_ip):
ssh_control_hash = hashlib.md5(cluster_name.encode()).hexdigest()
ssh_user_hash = hashlib.md5(getuser().encode()).hexdigest()
@@ -66,24 +168,29 @@ class NodeUpdater(object):
ssh_user_hash[:HASH_MAX_LENGTH],
ssh_control_hash[:HASH_MAX_LENGTH])
self.daemon = True
self.log_prefix = log_prefix
self.process_runner = process_runner
self.node_id = node_id
self.use_internal_ip = (use_internal_ip or provider_config.get(
"use_internal_ips", False))
self.use_internal_ip = use_internal_ip
self.provider = provider
self.ssh_private_key = auth_config["ssh_private_key"]
self.ssh_user = auth_config["ssh_user"]
self.ssh_control_path = ssh_control_path
self.ssh_ip = None
self.file_mounts = {
remote: os.path.expanduser(local)
for remote, local in file_mounts.items()
}
self.initialization_commands = initialization_commands
self.setup_commands = setup_commands
self.ray_start_commands = ray_start_commands
self.runtime_hash = runtime_hash
def get_default_ssh_options(self, connect_timeout):
OPTS = [
("ConnectTimeout", "{}s".format(connect_timeout)),
("StrictHostKeyChecking", "no"),
("ControlMaster", "auto"),
("ControlPath", "{}/%C".format(self.ssh_control_path)),
("ControlPersist", "10s"),
]
return ["-i", self.ssh_private_key] + [
x for y in (["-o", "{}={}".format(k, v)] for k, v in OPTS)
for x in y
]
def get_node_ip(self):
if self.use_internal_ip:
@@ -94,8 +201,7 @@ class NodeUpdater(object):
def wait_for_ip(self, deadline):
while time.time() < deadline and \
not self.provider.is_terminated(self.node_id):
logger.info("NodeUpdater: "
"Waiting for IP of {}...".format(self.node_id))
logger.info(self.log_prefix + "Waiting for IP...")
ip = self.get_node_ip()
if ip is not None:
return ip
@@ -110,7 +216,7 @@ class NodeUpdater(object):
# We assume that this never changes.
# I think that's reasonable.
deadline = time.time() + NODE_START_WAIT_S
with LogTimer("NodeUpdater: {}: Got IP".format(self.node_id)):
with LogTimer(self.log_prefix + "Got IP"):
ip = self.wait_for_ip(deadline)
assert ip is not None, "Unable to find IP of node"
@@ -134,24 +240,125 @@ class NodeUpdater(object):
stdout=redirect,
stderr=redirect)
except subprocess.CalledProcessError as e:
logger.warning(e)
logger.warning(self.log_prefix + str(e))
def run(self,
cmd,
timeout=120,
redirect=None,
allocate_tty=False,
exit_on_fail=False,
port_forward=None):
self.set_ssh_ip_if_required()
logger.info(self.log_prefix +
"Running {} on {}...".format(cmd, self.ssh_ip))
ssh = ["ssh"]
if allocate_tty:
ssh.append("-tt")
if port_forward:
ssh += ["-L", "{}:localhost:{}".format(port_forward, port_forward)]
final_cmd = ssh + self.get_default_ssh_options(timeout) + [
"{}@{}".format(self.ssh_user, self.ssh_ip)
] + with_interactive(cmd)
try:
self.process_runner.check_call(
final_cmd, stdout=redirect, stderr=redirect)
except subprocess.CalledProcessError:
if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])
logger.error(self.log_prefix +
"Command failed: \n\n {}\n".format(quoted_cmd))
sys.exit(1)
else:
raise
def run_rsync_up(self, source, target, redirect=None):
self.set_ssh_ip_if_required()
self.process_runner.check_call(
[
"rsync", "--rsh",
" ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz",
source, "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target)
],
stdout=redirect,
stderr=redirect)
def rsync_down(self, source, target, redirect=None):
self.set_ssh_ip_if_required()
self.process_runner.check_call(
[
"rsync", "--rsh",
" ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz",
"{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target
],
stdout=redirect,
stderr=redirect)
def remote_shell_command_str(self):
return "ssh -i {} {}@{}\n".format(self.ssh_private_key, self.ssh_user,
self.ssh_ip)
class NodeUpdater(object):
"""A process for syncing files and running init commands on a node."""
def __init__(self,
node_id,
provider_config,
provider,
auth_config,
cluster_name,
file_mounts,
initialization_commands,
setup_commands,
ray_start_commands,
runtime_hash,
process_runner=subprocess,
use_internal_ip=False):
self.log_prefix = "NodeUpdater: {}: ".format(node_id)
if provider_config["type"] == "kubernetes":
self.cmd_runner = KubernetesCommandRunner(
self.log_prefix, provider.namespace, node_id, auth_config,
process_runner)
else:
use_internal_ip = (use_internal_ip or provider_config.get(
"use_internal_ips", False))
self.cmd_runner = SSHCommandRunner(
self.log_prefix, node_id, provider, auth_config, cluster_name,
process_runner, use_internal_ip)
self.daemon = True
self.process_runner = process_runner
self.node_id = node_id
self.provider = provider
self.file_mounts = {
remote: os.path.expanduser(local)
for remote, local in file_mounts.items()
}
self.initialization_commands = initialization_commands
self.setup_commands = setup_commands
self.ray_start_commands = ray_start_commands
self.runtime_hash = runtime_hash
def run(self):
logger.info("NodeUpdater: "
"{}: Updating to {}".format(self.node_id,
self.runtime_hash))
logger.info(self.log_prefix +
"Updating to {}".format(self.runtime_hash))
try:
m = "{}: Applied config {}".format(self.node_id, self.runtime_hash)
with LogTimer("NodeUpdater: {}".format(m)):
with LogTimer(self.log_prefix +
"Applied config {}".format(self.runtime_hash)):
self.do_update()
except Exception as e:
error_str = str(e)
if hasattr(e, "cmd"):
error_str = "(Exit Status {}) {}".format(
e.returncode, " ".join(e.cmd))
logger.error("NodeUpdater: "
"{}: Error updating {}".format(
self.node_id, error_str))
logger.error(self.log_prefix +
"Error updating {}".format(error_str))
self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_UPDATE_FAILED})
raise e
@@ -164,35 +371,6 @@ class NodeUpdater(object):
self.exitcode = 0
def wait_for_ssh(self, deadline):
logger.info("NodeUpdater: "
"{}: Waiting for SSH...".format(self.node_id))
while time.time() < deadline and \
not self.provider.is_terminated(self.node_id):
try:
logger.debug("NodeUpdater: "
"{}: Waiting for SSH...".format(self.node_id))
# Setting redirect=False allows the user to see errors like
# unix_listener: path "/tmp/rkn_ray_ssh_sockets/..." too long
# for Unix domain socket.
self.ssh_cmd("uptime", connect_timeout=5, redirect=False)
return True
except Exception as e:
retry_str = str(e)
if hasattr(e, "cmd"):
retry_str = "(Exit Status {}): {}".format(
e.returncode, " ".join(e.cmd))
logger.debug("NodeUpdater: "
"{}: SSH not up, retrying: {}".format(
self.node_id, retry_str))
time.sleep(SSH_CHECK_INTERVAL)
return False
def sync_file_mounts(self, sync_cmd):
# Rsync file mounts
for remote_path, local_path in self.file_mounts.items():
@@ -203,26 +381,46 @@ class NodeUpdater(object):
if not remote_path.endswith("/"):
remote_path += "/"
m = "{}: Synced {} to {}".format(self.node_id, local_path,
remote_path)
with LogTimer("NodeUpdater {}".format(m)):
self.ssh_cmd(
"mkdir -p {}".format(os.path.dirname(remote_path)),
redirect=None,
)
with LogTimer(self.log_prefix +
"Synced {} to {}".format(local_path, remote_path)):
self.cmd_runner.run("mkdir -p {}".format(
os.path.dirname(remote_path)))
sync_cmd(local_path, remote_path, redirect=None)
def wait_ready(self, deadline):
with LogTimer(self.log_prefix + "Got remote shell"):
logger.info(self.log_prefix + "Waiting for remote shell...")
while time.time() < deadline and \
not self.provider.is_terminated(self.node_id):
try:
logger.debug(self.log_prefix +
"Waiting for remote shell...")
# Setting redirect=False allows the user to see errors like
# unix_listener: path "/tmp/rkn_ray_ssh_sockets/..." too
# long for Unix domain socket.
self.cmd_runner.run("uptime", timeout=5, redirect=False)
return True
except Exception as e:
retry_str = str(e)
if hasattr(e, "cmd"):
retry_str = "(Exit Status {}): {}".format(
e.returncode, " ".join(e.cmd))
logger.debug(self.log_prefix +
"Node not up, retrying: {}".format(retry_str))
time.sleep(READY_CHECK_INTERVAL)
assert False, "Unable to connect to node"
def do_update(self):
self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_WAITING_FOR_SSH})
deadline = time.time() + NODE_START_WAIT_S
self.set_ssh_ip_if_required()
# Wait for SSH access
with LogTimer("NodeUpdater: " "{}: Got SSH".format(self.node_id)):
ssh_ok = self.wait_for_ssh(deadline)
assert ssh_ok, "Unable to SSH to node"
self.wait_ready(deadline)
node_tags = self.provider.node_tags(self.node_id)
if node_tags.get(TAG_RAY_RUNTIME_CONFIG) == self.runtime_hash:
@@ -237,97 +435,28 @@ class NodeUpdater(object):
# Run init commands
self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SETTING_UP})
m = "{}: Initialization commands completed".format(self.node_id)
with LogTimer("NodeUpdater: {}".format(m)):
with LogTimer(self.log_prefix +
"Initialization commands completed"):
for cmd in self.initialization_commands:
self.ssh_cmd(cmd)
self.cmd_runner.run(cmd)
m = "{}: Setup commands completed".format(self.node_id)
with LogTimer("NodeUpdater: {}".format(m)):
with LogTimer(self.log_prefix + "Setup commands completed"):
for cmd in self.setup_commands:
self.ssh_cmd(cmd)
self.cmd_runner.run(cmd)
m = "{}: Ray start commands completed".format(self.node_id)
with LogTimer("NodeUpdater: {}".format(m)):
with LogTimer(self.log_prefix + "Ray start commands completed"):
for cmd in self.ray_start_commands:
self.ssh_cmd(cmd)
self.cmd_runner.run(cmd)
def rsync_up(self, source, target, redirect=None):
logger.info("NodeUpdater: "
"{}: Syncing {} to {}...".format(self.node_id, source,
target))
self.set_ssh_ip_if_required()
self.process_runner.check_call(
[
"rsync", "-e", " ".join(["ssh"] + get_default_ssh_options(
self.ssh_private_key, 120, self.ssh_control_path)), "-avz",
source, "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target)
],
stdout=redirect or sys.stdout,
stderr=redirect or sys.stderr)
logger.info(self.log_prefix +
"Syncing {} to {}...".format(source, target))
self.cmd_runner.run_rsync_up(source, target, redirect=None)
def rsync_down(self, source, target, redirect=None):
logger.info("NodeUpdater: "
"{}: Syncing {} from {}...".format(self.node_id, source,
target))
self.set_ssh_ip_if_required()
self.process_runner.check_call(
[
"rsync", "-e", " ".join(["ssh"] + get_default_ssh_options(
self.ssh_private_key, 120, self.ssh_control_path)), "-avz",
"{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target
],
stdout=redirect or sys.stdout,
stderr=redirect or sys.stderr)
def ssh_cmd(self,
cmd,
connect_timeout=120,
redirect=None,
allocate_tty=False,
emulate_interactive=True,
exit_on_fail=False,
port_forward=None):
self.set_ssh_ip_if_required()
logger.info("NodeUpdater: Running {} on {}...".format(
cmd, self.ssh_ip))
ssh = ["ssh"]
if allocate_tty:
ssh.append("-tt")
if emulate_interactive:
force_interactive = (
"true && source ~/.bashrc && "
"export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ")
cmd = "bash --login -c -i {}".format(
quote(force_interactive + cmd))
if port_forward is None:
ssh_opt = []
else:
ssh_opt = [
"-L", "{}:localhost:{}".format(port_forward, port_forward)
]
final_cmd = ssh + ssh_opt + get_default_ssh_options(
self.ssh_private_key, connect_timeout, self.ssh_control_path) + [
"{}@{}".format(self.ssh_user, self.ssh_ip), cmd
]
try:
self.process_runner.check_call(
final_cmd,
stdout=redirect or sys.stdout,
stderr=redirect or sys.stderr)
except subprocess.CalledProcessError:
if exit_on_fail:
# Only reason we need this exit flag here is because here we
# know the final command and can print it nicely before exit()
logger.error("Command failed: \n\n {}\n".format(
" ".join(final_cmd)))
sys.exit(1)
else:
raise
logger.info(self.log_prefix +
"Syncing {} from {}...".format(source, target))
self.cmd_runner.run_rsync_down(source, target, redirect=None)
class NodeUpdaterThread(NodeUpdater, Thread):
+2
View File
@@ -42,6 +42,8 @@ ray_autoscaler_files = [
"ray/autoscaler/aws/example-full.yaml",
"ray/autoscaler/gcp/example-full.yaml",
"ray/autoscaler/local/example-full.yaml",
"ray/autoscaler/kubernetes/example-full.yaml",
"ray/autoscaler/kubernetes/kubectl-rsync.sh",
]
ray_project_files = [