diff --git a/python/MANIFEST.in b/python/MANIFEST.in new file mode 100644 index 000000000..9ee720972 --- /dev/null +++ b/python/MANIFEST.in @@ -0,0 +1 @@ +include ray/python/ray/autoscaler/ray-schema.json \ No newline at end of file diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index f28b1df9b..efdd86663 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -1,18 +1,18 @@ +from collections import defaultdict import copy import hashlib import json +import jsonschema import logging import math +import numpy as np import os import subprocess import threading import time -from collections import defaultdict - -import numpy as np -import ray.services as services import yaml -from ray.worker import global_worker + +import ray from ray.autoscaler.docker import dockerize_if_needed from ray.autoscaler.node_provider import get_node_provider, \ get_default_config @@ -25,131 +25,15 @@ from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \ AUTOSCALER_RESOURCE_REQUEST_CHANNEL, MEMORY_RESOURCE_UNIT_BYTES -from six import string_types +import ray.services as services +from ray.worker import global_worker from six.moves import queue logger = logging.getLogger(__name__) REQUIRED, OPTIONAL = True, False - -# For (a, b), if a is a dictionary object, then -# no extra fields can be introduced. - -CLUSTER_CONFIG_SCHEMA = { - # An unique identifier for the head node and workers of this cluster. - "cluster_name": (str, REQUIRED), - - # The minimum number of workers nodes to launch in addition to the head - # node. This number should be >= 0. - "min_workers": (int, OPTIONAL), - - # The maximum number of workers nodes to launch in addition to the head - # node. This takes precedence over min_workers. 
- "max_workers": (int, REQUIRED), - - # The number of workers to launch initially, in addition to the head node. - "initial_workers": (int, OPTIONAL), - - # The mode of the autoscaler e.g. default, aggressive - "autoscaling_mode": (str, OPTIONAL), - - # The autoscaler will scale up the cluster to this target fraction of - # resources usage. For example, if a cluster of 8 nodes is 100% busy - # and target_utilization was 0.8, it would resize the cluster to 10. - "target_utilization_fraction": (float, OPTIONAL), - - # If a node is idle for this many minutes, it will be removed. - "idle_timeout_minutes": (int, OPTIONAL), - - # Cloud-provider specific configuration. - "provider": ( - { - "type": (str, REQUIRED), # e.g. aws - "region": (str, OPTIONAL), # e.g. us-east-1 - "availability_zone": (str, OPTIONAL), # e.g. us-east-1a - "module": (str, - OPTIONAL), # module, if using external node provider - "project_id": (None, OPTIONAL), # gcp project id, if using gcp - "head_ip": (str, OPTIONAL), # local cluster head node - "worker_ips": (list, OPTIONAL), # local cluster worker nodes - "use_internal_ips": (bool, OPTIONAL), # don't require public ips - "namespace": (str, OPTIONAL), # k8s namespace, if using k8s - - # k8s autoscaler permissions, if using k8s - "autoscaler_service_account": (dict, OPTIONAL), - "autoscaler_role": (dict, OPTIONAL), - "autoscaler_role_binding": (dict, OPTIONAL), - "extra_config": (dict, OPTIONAL), # provider-specific config - - # Whether to try to reuse previously stopped nodes instead of - # launching nodes. This will also cause the autoscaler to stop - # nodes instead of terminating them. Only implemented for AWS. - "cache_stopped_nodes": (bool, OPTIONAL), - }, - REQUIRED), - - # How Ray will authenticate with newly launched nodes. - "auth": ( - { - "ssh_user": (str, OPTIONAL), # e.g. ubuntu - "ssh_private_key": (str, OPTIONAL), - }, - OPTIONAL), - - # Docker configuration. 
If this is specified, all setup and start commands - # will be executed in the container. - "docker": ( - { - "image": (str, OPTIONAL), # e.g. tensorflow/tensorflow:1.5.0-py3 - "container_name": (str, OPTIONAL), # e.g., ray_docker - "pull_before_run": (bool, OPTIONAL), # run `docker pull` first - # shared options for starting head/worker docker - "run_options": (list, OPTIONAL), - - # image for head node, takes precedence over "image" if specified - "head_image": (str, OPTIONAL), - # head specific run options, appended to run_options - "head_run_options": (list, OPTIONAL), - # analogous to head_image - "worker_image": (str, OPTIONAL), - # analogous to head_run_options - "worker_run_options": (list, OPTIONAL), - }, - OPTIONAL), - - # Provider-specific config for the head node, e.g. instance type. - "head_node": (dict, OPTIONAL), - - # Provider-specific config for worker nodes. e.g. instance type. - "worker_nodes": (dict, OPTIONAL), - - # Map of remote paths to local paths, e.g. {"/tmp/data": "/my/local/data"} - "file_mounts": (dict, OPTIONAL), - - # List of commands that will be run before `setup_commands`. If docker is - # enabled, these commands will run outside the container and before docker - # is setup. - "initialization_commands": (list, OPTIONAL), - - # List of common shell commands to run to setup nodes. - "setup_commands": (list, OPTIONAL), - - # Commands that will be run on the head node after common setup. - "head_setup_commands": (list, OPTIONAL), - - # Commands that will be run on worker nodes after common setup. - "worker_setup_commands": (list, OPTIONAL), - - # Command to start ray on the head node. You shouldn't need to modify this. - "head_start_ray_commands": (list, OPTIONAL), - - # Command to start ray on worker nodes. You shouldn't need to modify this. - "worker_start_ray_commands": (list, OPTIONAL), - - # Whether to avoid restarting the cluster during updates. 
# The JSON schema file is shipped in the same directory as this module, so
# resolve it relative to this file instead of going through the
# `ray.autoscaler` package attribute (which is only bound once that package
# has finished importing).
RAY_SCHEMA_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "ray-schema.json")


def validate_config(config):
    """Validate a cluster config against the JSON schema at RAY_SCHEMA_PATH.

    Args:
        config: The parsed cluster configuration; must be a dictionary.

    Raises:
        ValueError: If ``config`` is not a dictionary.
        jsonschema.ValidationError: If ``config`` does not conform to the
            schema. The message names the offending entry when the error
            is not at the top level of the config.
    """
    if not isinstance(config, dict):
        raise ValueError("Config {} is not a dictionary".format(config))

    with open(RAY_SCHEMA_PATH) as f:
        schema = json.load(f)

    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as e:
        # A fresh error carrying only a message is raised (with the original
        # context suppressed). The bare message, e.g. "1 is not of type
        # 'string'", does not say which key is wrong, so append the path to
        # the offending entry whenever there is one.
        location = ".".join(str(part) for part in e.absolute_path)
        message = e.message
        if location:
            message = "{} (at `{}`)".format(message, location)
        raise jsonschema.ValidationError(message=message) from None
For example, if a cluster of 8 nodes is 100% busy and target_utilization was 0.8, it would resize the cluster to 10.", + "type": "number", + "minimum": 0, + "maximum": 1 + }, + "idle_timeout_minutes": { + "description": "If a node is idle for this many minutes, it will be removed.", + "type": "integer", + "minimum": 0 + }, + "provider": { + "type": "object", + "description": "Cloud-provider specific configuration.", + "required": [ "type" ], + "additionalProperties": false, + "properties": { + "type": { + "type": "string", + "description": "e.g. aws, azure, gcp,..." + }, + "region": { + "type": "string", + "description": "e.g. us-east-1" + }, + "module": { + "type": "string", + "description": "module, if using external node provider" + }, + "head_ip": { + "type": "string", + "description": "local cluster head node" + }, + "worker_ips": { + "type": "array", + "description": "local cluster worker nodes" + }, + "use_internal_ips": { + "type": "boolean", + "description": "don't require public ips" + }, + "namespace": { + "type": "string", + "description": "k8s namespace, if using k8s" + }, + "location": { + "type": "string", + "description": "Azure location" + }, + "resource_group": { + "type": "string", + "description": "Azure resource group" + }, + "tags": { + "type": "object", + "description": "Azure user-defined tags" + }, + "subscription_id": { + "type": "string", + "description": "Azure subscription id" + }, + "msi_identity_id": { + "type": "string", + "description": "User-defined managed identity (generated by config)" + }, + "msi_identity_principal_id": { + "type": "string", + "description": "User-defined managed identity principal id (generated by config)" + }, + "subnet_id": { + "type": "string", + "description": "Network subnet id" + }, + "autoscaler_service_account": { + "type": "object", + "description": "k8s autoscaler permissions, if using k8s" + }, + "autoscaler_role": { + "type": "object", + "description": "k8s autoscaler permissions, if using k8s"
+ }, + "autoscaler_role_binding": { + "type": "object", + "description": "k8s autoscaler permissions, if using k8s" + }, + "extra_config": { + "type": "object", + "description": "provider-specific config" + }, + "cache_stopped_nodes": { + "type": "boolean", + "description": " Whether to try to reuse previously stopped nodes instead of launching nodes. This will also cause the autoscaler to stop nodes instead of terminating them. Only implemented for AWS." + }, + "availability_zone": { + "type": "string", + "description": "GCP availability zone" + }, + "project_id": { + "type": ["string", "null"], + "description": "GCP globally unique project id" + } + } + }, + "auth": { + "type": "object", + "description": "How Ray will authenticate with newly launched nodes.", + "additionalProperties": false, + "properties": { + "ssh_user": { + "type": "string", + "default": "ubuntu" + }, + "ssh_private_key": { + "type": "string" + } + } + }, + "docker": { + "type": "object", + "description": "Docker configuration. 
If this is specified, all setup and start commands will be executed in the container.", + "additionalProperties": false, + "properties": { + "image": { + "type": "string", + "description": "the docker image name", + "default": "tensorflow/tensorflow:1.5.0-py3" + }, + "container_name": { + "type": "string", + "default": "ray_docker" + }, + "pull_before_run": { + "type": "boolean", + "description": "run `docker pull` first" + }, + "run_options": { + "type": "array", + "description": "shared options for starting head/worker docker" + }, + "head_image": { + "type": "string", + "description": "image for head node, takes precedence over 'image' if specified" + }, + "head_run_options": { + "type": "array", + "description": "head specific run options, appended to run_options" + }, + "worker_image": { + "type": "string", + "description": "analogous to head_image" + }, + "worker_run_options": { + "type": "array", + "description": "analogous to head_run_options" + } + } + }, + "head_node": { + "type": "object", + "description": "Provider-specific config for the head node, e.g. instance type." + }, + "worker_nodes": { + "type": "object", + "description": "Provider-specific config for worker nodes. e.g. instance type." + }, + "file_mounts": { + "type": "object", + "description": "Map of remote paths to local paths, e.g. {\"/tmp/data\": \"/my/local/data\"}" + }, + "initialization_commands": { + "$ref": "#/definitions/commands", + "description": "List of commands that will be run before `setup_commands`. If docker is enabled, these commands will run outside the container and before docker is setup." + }, + "setup_commands": { + "$ref": "#/definitions/commands", + "description": "List of common shell commands to run to setup nodes." + }, + "head_setup_commands": { + "$ref": "#/definitions/commands", + "description": "Commands that will be run on the head node after common setup." 
+ }, + "worker_setup_commands": { + "$ref": "#/definitions/commands", + "description": "Commands that will be run on worker nodes after common setup." + }, + "head_start_ray_commands": { + "$ref": "#/definitions/commands", + "description": "Command to start ray on the head node. You shouldn't need to modify this." + }, + "worker_start_ray_commands": { + "$ref": "#/definitions/commands", + "description": "Command to start ray on worker nodes. You shouldn't need to modify this." + }, + "no_restart": { + "description": "Whether to avoid restarting the cluster during updates. This field is controlled by the ray --no-restart flag and cannot be set by the user." + } + } +} \ No newline at end of file diff --git a/python/ray/tests/additional_property.yaml b/python/ray/tests/additional_property.yaml new file mode 100644 index 000000000..f965831de --- /dev/null +++ b/python/ray/tests/additional_property.yaml @@ -0,0 +1,30 @@ +# An unique identifier for the head node and workers of this cluster. +cluster_name: minimal +hi: 1 +# The maximum number of workers nodes to launch in addition to the head +# node. This takes precedence over min_workers. min_workers default to 0. +min_workers: 0 +max_workers: 0 + +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2b + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu + +head_node: + InstanceType: m4.10xlarge + ImageId: ami-3b6bce43 # Amazon Deep Learning AMI (Ubuntu) + +setup_commands: + - error me + # - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc + # - source activate tensorflow_p27 && pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.4.0-cp27-cp27mu-manylinux1_x86_64.whl + +# # Command to start ray on the head node. You don't need to change this. 
+head_start_ray_commands: + - ray stop + - ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml \ No newline at end of file diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 47fe8bc2e..7ef722d88 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -5,6 +5,7 @@ import time import unittest import yaml import copy +from jsonschema.exceptions import ValidationError import ray import ray.services as services @@ -323,17 +324,17 @@ class AutoscalingTest(unittest.TestCase): self.fail("Test config did not pass validation test!") config["blah"] = "blah" - with pytest.raises(ValueError): + with pytest.raises(ValidationError): validate_config(config) del config["blah"] config["provider"]["blah"] = "blah" - with pytest.raises(ValueError): + with pytest.raises(ValidationError): validate_config(config) del config["provider"]["blah"] del config["provider"] - with pytest.raises(ValueError): + with pytest.raises(ValidationError): validate_config(config) def testValidateDefaultConfig(self): @@ -346,7 +347,7 @@ class AutoscalingTest(unittest.TestCase): config = fillout_defaults(config) try: validate_config(config) - except Exception: + except ValidationError: self.fail("Default config did not pass validation test!") def testScaleUp(self): diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index 6753e26f4..b25bb9dc9 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -1,3 +1,4 @@ +import jsonschema import os import unittest import yaml @@ -15,7 +16,6 @@ CONFIG_PATHS += recursive_fnmatch( class AutoscalingConfigTest(unittest.TestCase): def testValidateDefaultConfig(self): - for config_path in CONFIG_PATHS: with open(config_path) as f: config = yaml.safe_load(f) @@ -25,6 +25,18 @@ class AutoscalingConfigTest(unittest.TestCase): except Exception: 
def _test_invalid_config(self, config_path):
    """Assert that the config at ``config_path`` is rejected by validation.

    Helper method of AutoscalingConfigTest.

    Args:
        config_path: Path of a YAML config file, relative to RAY_PATH.
    """
    with open(os.path.join(RAY_PATH, config_path)) as f:
        config = yaml.safe_load(f)
    # Only a schema violation counts as success; any other exception still
    # propagates and is reported as a test error.
    with self.assertRaises(
            jsonschema.ValidationError,
            msg="Expected validation to fail for {}".format(config_path)):
        validate_config(config)


def testInvalidConfig(self):
    """A config with a key that is not in the schema must fail validation."""
    self._test_invalid_config("tests/additional_property.yaml")