diff --git a/ci/travis/format.sh b/ci/travis/format.sh index c86060eaa..2e6171662 100755 --- a/ci/travis/format.sh +++ b/ci/travis/format.sh @@ -97,6 +97,7 @@ MYPY_FILES=( 'autoscaler/node_provider.py' 'autoscaler/sdk.py' 'autoscaler/_private/commands.py' + 'operator.py' ) YAPF_EXCLUDES=( diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 5f12bb293..43ea7b2b6 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -72,6 +72,7 @@ class StandardAutoscaler: self.provider = None self.resource_demand_scheduler = None self.reset(errors_fatal=True) + self.head_node_ip = load_metrics.local_ip self.load_metrics = load_metrics self.max_failures = max_failures @@ -443,7 +444,7 @@ class StandardAutoscaler: initialization_commands=[], setup_commands=[], ray_start_commands=with_head_node_ip( - self.config["worker_start_ray_commands"]), + self.config["worker_start_ray_commands"], self.head_node_ip), runtime_hash=self.runtime_hash, file_mounts_contents_hash=self.file_mounts_contents_hash, process_runner=self.process_runner, @@ -516,9 +517,10 @@ class StandardAutoscaler: file_mounts=self.config["file_mounts"], initialization_commands=with_head_node_ip( self._get_node_type_specific_fields( - node_id, "initialization_commands")), - setup_commands=with_head_node_ip(init_commands), - ray_start_commands=with_head_node_ip(ray_start_commands), + node_id, "initialization_commands"), self.head_node_ip), + setup_commands=with_head_node_ip(init_commands, self.head_node_ip), + ray_start_commands=with_head_node_ip(ray_start_commands, + self.head_node_ip), runtime_hash=self.runtime_hash, file_mounts_contents_hash=self.file_mounts_contents_hash, is_head_node=False, diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 63cd5f9aa..c3a7dffe1 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -135,7 +135,7 @@ def create_or_update_cluster(config_file: str, override_cluster_name: Optional[str] = None, no_config_cache: bool = False, redirect_command_output: Optional[bool] = False, - use_login_shells: bool = True) -> None: + use_login_shells: bool = True) -> Dict[str, Any]: """Create or updates an autoscaling Ray cluster from a config json.""" set_using_login_shells(use_login_shells) if not use_login_shells: @@ -215,6 +215,7 @@ def create_or_update_cluster(config_file: str, try_logging_config(config) get_or_create_head_node(config, config_file, no_restart, restart_only, yes, override_cluster_name) + return config CONFIG_CACHE_VERSION = 1 diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index 9c7c4e5c5..d5fade4e5 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -162,8 +162,9 @@ def merge_setup_commands(config): return config -def with_head_node_ip(cmds): - head_ip = services.get_node_ip_address() +def with_head_node_ip(cmds, head_ip=None): + if head_ip is None: + head_ip = services.get_node_ip_address() out = [] for cmd in cmds: out.append("export RAY_HEAD_IP={}; {}".format(head_ip, cmd)) diff --git a/python/ray/autoscaler/kubernetes/operator_configs/operator_config.yaml b/python/ray/autoscaler/kubernetes/operator_configs/operator_config.yaml new file mode 100644 index 000000000..5cd239fac --- /dev/null +++ b/python/ray/autoscaler/kubernetes/operator_configs/operator_config.yaml @@ -0,0 +1,50 @@ +operator_role: +apiVersion: v1 +kind: ServiceAccount +metadata: + name: ray-operator-serviceaccount +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: ray-operator-role +rules: +- apiGroups: ["", "rbac.authorization.k8s.io"] + resources: ["configmaps", "pods", "pods/exec", "services", "serviceaccounts", "roles", "rolebindings"] + verbs: ["get", "watch", "list", "create", "delete", "patch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: ray-operator-rolebinding +subjects: +- kind: ServiceAccount + name: ray-operator-serviceaccount +roleRef: + kind: Role + name: ray-operator-role + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: v1 +kind: Pod +metadata: + name: ray-operator-pod +spec: + serviceAccountName: ray-operator-serviceaccount + containers: + - name: ray + imagePullPolicy: Always + image: rayproject/ray:nightly + command: ["/bin/bash", "-c", "--"] + args: ["ray-operator; trap : TERM INT; sleep infinity & wait;"] + env: + - name: RAY_OPERATOR_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + resources: + requests: + cpu: 1 + memory: 1Gi + limits: + memory: 2Gi diff --git a/python/ray/autoscaler/kubernetes/operator_configs/test_cluster_config.yaml b/python/ray/autoscaler/kubernetes/operator_configs/test_cluster_config.yaml new file mode 100644 index 000000000..d42e5749d --- /dev/null +++ b/python/ray/autoscaler/kubernetes/operator_configs/test_cluster_config.yaml @@ -0,0 +1,260 @@ +# An unique identifier for the head node and workers of this cluster. +cluster_name: default + +# The autoscaler will scale up the cluster to this target fraction of resource +# usage. For example, if a cluster of 10 nodes is 100% busy and +# target_utilization is 0.8, it would resize the cluster to 13. This fraction +# can be decreased to increase the aggressiveness of upscaling. +# This value must be less than 1.0 for scaling to happen. +target_utilization_fraction: 0.8 + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 + +# Kubernetes resources that need to be configured for the autoscaler to be +# able to manage the Ray cluster. If any of the provided resources don't +# exist, the autoscaler will attempt to create them. If this fails, you may +# not have the required permissions and will have to request them to be +# created by your cluster administrator. +provider: + type: kubernetes + + # Exposing external IP addresses for ray pods isn't currently supported. + use_internal_ips: true + + # Namespace to use for all resources created. + namespace: ray + + services: + # Service that maps to the head node of the Ray cluster. + - apiVersion: v1 + kind: Service + metadata: + # NOTE: If you're running multiple Ray clusters with services + # on one Kubernetes cluster, they must have unique service + # names. + name: ray-head + spec: + # This selector must match the head node pod's selector below. + selector: + component: ray-head + ports: + - protocol: TCP + port: 8000 + targetPort: 8000 + + # Service that maps to the worker nodes of the Ray cluster. + - apiVersion: v1 + kind: Service + metadata: + # NOTE: If you're running multiple Ray clusters with services + # on one Kubernetes cluster, they must have unique service + # names. + name: ray-workers + spec: + # This selector must match the worker node pods' selector below. + selector: + component: ray-worker + ports: + - protocol: TCP + port: 8000 + targetPort: 8000 + +# Kubernetes pod config for the head node pod. +available_node_types: + head_node: + resources: {} + node_config: + apiVersion: v1 + kind: Pod + metadata: + # Automatically generates a name for the pod with this prefix. + generateName: ray-head- + + # Must match the head node service selector above if a head node + # service is required. + labels: + component: ray-head + spec: + # Restarting the head node automatically is not currently supported. + # If the head node goes down, `ray up` must be run again. + restartPolicy: Never + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumes: + - name: dshm + emptyDir: + medium: Memory + + containers: + - name: ray-node + imagePullPolicy: Always + # You are free (and encouraged) to use your own container image, + # but it should have the following installed: + # - rsync (used for `ray rsync` commands and file mounts) + # - screen (used for `ray attach`) + # - kubectl (used by the autoscaler to manage worker pods) + image: rayproject/ray:nightly + # Do not change this command - it keeps the pod alive until it is + # explicitly killed. + command: ["/bin/bash", "-c", "--"] + args: ["trap : TERM INT; sleep infinity & wait;"] + ports: + - containerPort: 6379 # Redis port. + - containerPort: 6380 # Redis port. + - containerPort: 6381 # Redis port. + - containerPort: 12345 # Ray internal communication. + - containerPort: 12346 # Ray internal communication. + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumeMounts: + - mountPath: /dev/shm + name: dshm + resources: + requests: + cpu: 1000m + memory: 512Mi + limits: + # The maximum memory that this pod is allowed to use. The + # limit will be detected by ray and split to use 10% for + # redis, 30% for the shared memory object store, and the + # rest for application memory. If this limit is not set and + # the object store size is not set manually, ray will + # allocate a very large object store in each pod that may + # cause problems for other pods. + memory: 2Gi + env: + # This is used in the head_start_ray_commands below so that + # Ray can spawn the correct number of processes. Omitting this + # may lead to degraded performance. + - name: MY_CPU_REQUEST + valueFrom: + resourceFieldRef: + resource: requests.cpu + + worker_nodes: + resources: {} + min_workers: 1 + max_workers: 2 + node_config: + apiVersion: v1 + kind: Pod + metadata: + # Automatically generates a name for the pod with this prefix. + generateName: ray-worker- + + # Must match the worker node service selector above if a worker node + # service is required. + labels: + component: ray-worker + spec: + serviceAccountName: default + + # Worker nodes will be managed automatically by the head node, so + # do not change the restart policy. + restartPolicy: Never + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumes: + - name: dshm + emptyDir: + medium: Memory + + containers: + - name: ray-node + imagePullPolicy: Always + # You are free (and encouraged) to use your own container image, + # but it should have the following installed: + # - rsync (used for `ray rsync` commands and file mounts) + image: rayproject/ray:nightly + # Do not change this command - it keeps the pod alive until it is + # explicitly killed. + command: ["/bin/bash", "-c", "--"] + args: ["trap : TERM INT; sleep infinity & wait;"] + ports: + - containerPort: 12345 # Ray internal communication. + - containerPort: 12346 # Ray internal communication. + + # This volume allocates shared memory for Ray to use for its plasma + # object store. If you do not provide this, Ray will fall back to + # /tmp which cause slowdowns if is not a shared memory volume. + volumeMounts: + - mountPath: /dev/shm + name: dshm + resources: + requests: + cpu: 100m + memory: 512Mi + limits: + # This memory limit will be detected by ray and split into + # 30% for plasma, and 70% for workers. + memory: 2Gi + env: + # This is used in the head_start_ray_commands below so that + # Ray can spawn the correct number of processes. Omitting this + # may lead to degraded performance. + - name: MY_CPU_REQUEST + valueFrom: + resourceFieldRef: + resource: requests.cpu + +head_node_type: + head_node + +worker_default_node_type: + worker_nodes +# Files or directories to copy to the head and worker nodes. The format is a +# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. +file_mounts: { +} + +# Files or directories to copy from the head node to the worker nodes. The format is a +# list of paths. The same path on the head node will be copied to the worker node. +# This behavior is a subset of the file_mounts behavior. In the vast majority of cases +# you should just use file_mounts. Only use this if you know what you're doing! +cluster_synced_files: [] + +# Whether changes to directories in file_mounts or cluster_synced_files in the head node +# should sync to the worker node continuously +file_mounts_sync_continuously: False + +# Patterns for files to exclude when running rsync up or rsync down. +# This is not supported on kubernetes. +rsync_exclude: [] + +# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for +# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided +# as a value, the behavior will match git's behavior for finding and using .gitignore files. +# This is not supported on kubernetes. +rsync_filter: [] + +# List of commands that will be run before `setup_commands`. If docker is +# enabled, these commands will run outside the container and before docker +# is setup. +initialization_commands: [] + +# List of shell commands to run to set up nodes. +setup_commands: [] + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: [] + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +# Note webui-host is set to 0.0.0.0 so that kubernetes can port forward. +head_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --object-manager-port=8076 --dashboard-host 0.0.0.0 + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --num-cpus=$MY_CPU_REQUEST --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 diff --git a/python/ray/autoscaler/sdk.py b/python/ray/autoscaler/sdk.py index c6f165ded..87559c099 100644 --- a/python/ray/autoscaler/sdk.py +++ b/python/ray/autoscaler/sdk.py @@ -16,7 +16,7 @@ def create_or_update_cluster(cluster_config: Union[dict, str], *, no_restart: bool = False, restart_only: bool = False, - no_config_cache: bool = False) -> None: + no_config_cache: bool = False) -> Dict[str, Any]: """Create or updates an autoscaling Ray cluster from a config json. Args: diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 984caadfe..c3d47aa15 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -103,7 +103,8 @@ class Monitor: # Keep a mapping from raylet client ID to IP address to use # for updating the load metrics. self.raylet_id_to_ip_map = {} - self.load_metrics = LoadMetrics() + head_node_ip = redis_address.split(":")[0] + self.load_metrics = LoadMetrics(local_ip=head_node_ip) if autoscaling_config: self.autoscaler = StandardAutoscaler(autoscaling_config, self.load_metrics) diff --git a/python/ray/operator.py b/python/ray/operator.py new file mode 100644 index 000000000..a277b74bb --- /dev/null +++ b/python/ray/operator.py @@ -0,0 +1,108 @@ +""" +Ray operator for Kubernetes. + +Reads ray cluster config from a k8s ConfigMap, starts a ray head node pod using +create_or_update_cluster(), then runs an autoscaling loop in the operator pod +executing this script. Writes autoscaling logs to the directory +/root/ray-operator-logs. + +In this setup, the ray head node does not run an autoscaler. It is important +NOT to supply an --autoscaling-config argument to head node's ray start command +in the cluster config when using this operator. + +To run, first create a ConfigMap named ray-operator-configmap from a ray +cluster config. Then apply the manifest at python/ray/autoscaler/kubernetes/operator_configs/operator_config.yaml + +For example: +kubectl create namespace raytest +kubectl -n raytest create configmap ray-operator-configmap --from-file=python/ray/autoscaler/kubernetes/operator_configs/test_cluster_config.yaml +kubectl -n raytest apply -f python/ray/autoscaler/kubernetes/operator_configs/operator_config.yaml +""" # noqa +import os +from typing import Any, Dict, IO, Tuple + +import kubernetes +import yaml + +from ray._private import services +from ray.autoscaler._private.commands import create_or_update_cluster +from ray.autoscaler._private.kubernetes import core_api +from ray.utils import open_log +from ray import ray_constants + +RAY_CLUSTER_NAMESPACE = os.environ.get("RAY_OPERATOR_POD_NAMESPACE") +RAY_CONFIG_MAP = "ray-operator-configmap" +RAY_CONFIG_DIR = "/root" + +LOG_DIR = "/root/ray-operator-logs" +ERR_NAME, OUT_NAME = "ray-operator.err", "ray-operator.out" + + +def prepare_ray_cluster_config() -> str: + config_map = core_api().read_namespaced_config_map( + name=RAY_CONFIG_MAP, namespace=RAY_CLUSTER_NAMESPACE) + + # config_map.data consists of a single key:value pair + for config_file_name, config_string in config_map.data.items(): + config = yaml.safe_load(config_string) + config["provider"]["namespace"] = RAY_CLUSTER_NAMESPACE + cluster_config_path = os.path.join(RAY_CONFIG_DIR, config_file_name) + with open(cluster_config_path, "w") as file: + yaml.dump(config, file) + + return cluster_config_path + + +def get_ray_head_pod_ip(config: Dict[str, Any]) -> str: + cluster_name = config["cluster_name"] + label_selector = f"component=ray-head,ray-cluster-name={cluster_name}" + pods = core_api().list_namespaced_pod( + namespace=RAY_CLUSTER_NAMESPACE, label_selector=label_selector).items + assert (len(pods)) == 1 + head_pod = pods.pop() + return head_pod.status.pod_ip + + +def get_logs() -> Tuple[IO, IO]: + try: + os.makedirs(LOG_DIR) + except OSError: + pass + + err_path = os.path.join(LOG_DIR, ERR_NAME) + out_path = os.path.join(LOG_DIR, OUT_NAME) + + return open_log(err_path), open_log(out_path) + + +def main(): + kubernetes.config.load_incluster_config() + cluster_config_path = prepare_ray_cluster_config() + + config = create_or_update_cluster( + cluster_config_path, + override_min_workers=None, + override_max_workers=None, + no_restart=False, + restart_only=False, + yes=True, + no_config_cache=True) + with open(cluster_config_path, "w") as file: + yaml.dump(config, file) + + ray_head_pod_ip = get_ray_head_pod_ip(config) + # TODO: Add support for user-specified redis port and password + redis_address = services.address(ray_head_pod_ip, + ray_constants.DEFAULT_PORT) + stderr_file, stdout_file = get_logs() + + services.start_monitor( + redis_address, + stdout_file=stdout_file, + stderr_file=stderr_file, + autoscaling_config=cluster_config_path, + redis_password=ray_constants.REDIS_DEFAULT_PASSWORD) + + +if __name__ == "__main__": + main() diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index 27e2a7bb5..235240b12 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -20,6 +20,15 @@ CONFIG_PATHS += recursive_fnmatch( os.path.join(RAY_PATH, "tune", "examples"), "*.yaml") +def ignore_k8s_operator_configs(paths): + return [ + path for path in paths if "kubernetes/operator_configs" not in path + ] + + +CONFIG_PATHS = ignore_k8s_operator_configs(CONFIG_PATHS) + + class AutoscalingConfigTest(unittest.TestCase): def testValidateDefaultConfig(self): for config_path in CONFIG_PATHS: diff --git a/python/setup.py b/python/setup.py index f0e2907b7..26a1b0b32 100644 --- a/python/setup.py +++ b/python/setup.py @@ -109,11 +109,10 @@ extras = { "dataclasses; python_version < '3.7'" ], "tune": [ - "dataclasses; python_version < '3.7'", - "pandas", - "tabulate", - "tensorboardX", - ] + "dataclasses; python_version < '3.7'", "pandas", "tabulate", + "tensorboardX" + ], + "k8s": ["kubernetes"] } extras["rllib"] = extras["tune"] + [ @@ -468,7 +467,8 @@ setuptools.setup( entry_points={ "console_scripts": [ "ray=ray.scripts.scripts:main", - "rllib=ray.rllib.scripts:cli [rllib]", "tune=ray.tune.scripts:cli" + "rllib=ray.rllib.scripts:cli [rllib]", "tune=ray.tune.scripts:cli", + "ray-operator=ray.operator:main" ] }, include_package_data=True,