[autoscaler] Replace cluster yaml validation with json schema v… (#7261)

* replace manual cluster yaml validation with json schema
- improved error message
- support for intellisense in VSCode (or other IDEs)
- run linting
- moved schema to ray/autoscaler
- fixed typo
- remove importlib dependency

* Update python/ray/autoscaler/autoscaler.py

* read

* restrict allowed properties

* added unit test for invalid yaml
added ray[test] package (remove pytest from default dependencies)

* updated autoscaler test to use ValidationError exception

* add missing dependency

* added pytest

* replace manual cluster yaml validation with json schema
- improved error message
- support for intellisense in VSCode (or other IDEs)
- run linting
- moved schema to ray/autoscaler
- fixed typo
- remove importlib dependency

* Update python/ray/autoscaler/autoscaler.py

* read

* restrict allowed properties

* added unit test for invalid yaml
added ray[test] package (remove pytest from default dependencies)

* updated autoscaler test to use ValidationError exception

* add missing dependency

* added pytest

* removed parameterized dependency
reverted ray[test] intro

* removed parameterized

* fix_tests

* format

Co-authored-by: Ubuntu <marcozo@mc-ray-jumpbox.chcbtljllnieveqhw3e4c1ducc.xx.internal.cloudapp.net>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Markus Cozowicz
2020-03-11 02:58:55 +01:00
committed by GitHub
parent 6163b21458
commit 49439611f1
7 changed files with 310 additions and 182 deletions
+1
View File
@@ -0,0 +1 @@
include ray/python/ray/autoscaler/ray-schema.json
+16 -176
View File
@@ -1,18 +1,18 @@
from collections import defaultdict
import copy
import hashlib
import json
import jsonschema
import logging
import math
import numpy as np
import os
import subprocess
import threading
import time
from collections import defaultdict
import numpy as np
import ray.services as services
import yaml
from ray.worker import global_worker
import ray
from ray.autoscaler.docker import dockerize_if_needed
from ray.autoscaler.node_provider import get_node_provider, \
get_default_config
@@ -25,131 +25,15 @@ from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \
AUTOSCALER_RESOURCE_REQUEST_CHANNEL, MEMORY_RESOURCE_UNIT_BYTES
from six import string_types
import ray.services as services
from ray.worker import global_worker
from six.moves import queue
logger = logging.getLogger(__name__)
REQUIRED, OPTIONAL = True, False
# For (a, b), if a is a dictionary object, then
# no extra fields can be introduced.
CLUSTER_CONFIG_SCHEMA = {
# A unique identifier for the head node and workers of this cluster.
"cluster_name": (str, REQUIRED),
# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
"min_workers": (int, OPTIONAL),
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
"max_workers": (int, REQUIRED),
# The number of workers to launch initially, in addition to the head node.
"initial_workers": (int, OPTIONAL),
# The mode of the autoscaler e.g. default, aggressive
"autoscaling_mode": (str, OPTIONAL),
# The autoscaler will scale up the cluster to this target fraction of
# resources usage. For example, if a cluster of 8 nodes is 100% busy
# and target_utilization was 0.8, it would resize the cluster to 10.
"target_utilization_fraction": (float, OPTIONAL),
# If a node is idle for this many minutes, it will be removed.
"idle_timeout_minutes": (int, OPTIONAL),
# Cloud-provider specific configuration.
"provider": (
{
"type": (str, REQUIRED), # e.g. aws
"region": (str, OPTIONAL), # e.g. us-east-1
"availability_zone": (str, OPTIONAL), # e.g. us-east-1a
"module": (str,
OPTIONAL), # module, if using external node provider
"project_id": (None, OPTIONAL), # gcp project id, if using gcp
"head_ip": (str, OPTIONAL), # local cluster head node
"worker_ips": (list, OPTIONAL), # local cluster worker nodes
"use_internal_ips": (bool, OPTIONAL), # don't require public ips
"namespace": (str, OPTIONAL), # k8s namespace, if using k8s
# k8s autoscaler permissions, if using k8s
"autoscaler_service_account": (dict, OPTIONAL),
"autoscaler_role": (dict, OPTIONAL),
"autoscaler_role_binding": (dict, OPTIONAL),
"extra_config": (dict, OPTIONAL), # provider-specific config
# Whether to try to reuse previously stopped nodes instead of
# launching nodes. This will also cause the autoscaler to stop
# nodes instead of terminating them. Only implemented for AWS.
"cache_stopped_nodes": (bool, OPTIONAL),
},
REQUIRED),
# How Ray will authenticate with newly launched nodes.
"auth": (
{
"ssh_user": (str, OPTIONAL), # e.g. ubuntu
"ssh_private_key": (str, OPTIONAL),
},
OPTIONAL),
# Docker configuration. If this is specified, all setup and start commands
# will be executed in the container.
"docker": (
{
"image": (str, OPTIONAL), # e.g. tensorflow/tensorflow:1.5.0-py3
"container_name": (str, OPTIONAL), # e.g., ray_docker
"pull_before_run": (bool, OPTIONAL), # run `docker pull` first
# shared options for starting head/worker docker
"run_options": (list, OPTIONAL),
# image for head node, takes precedence over "image" if specified
"head_image": (str, OPTIONAL),
# head specific run options, appended to run_options
"head_run_options": (list, OPTIONAL),
# analogous to head_image
"worker_image": (str, OPTIONAL),
# analogous to head_run_options
"worker_run_options": (list, OPTIONAL),
},
OPTIONAL),
# Provider-specific config for the head node, e.g. instance type.
"head_node": (dict, OPTIONAL),
# Provider-specific config for worker nodes. e.g. instance type.
"worker_nodes": (dict, OPTIONAL),
# Map of remote paths to local paths, e.g. {"/tmp/data": "/my/local/data"}
"file_mounts": (dict, OPTIONAL),
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
"initialization_commands": (list, OPTIONAL),
# List of common shell commands to run to setup nodes.
"setup_commands": (list, OPTIONAL),
# Commands that will be run on the head node after common setup.
"head_setup_commands": (list, OPTIONAL),
# Commands that will be run on worker nodes after common setup.
"worker_setup_commands": (list, OPTIONAL),
# Command to start ray on the head node. You shouldn't need to modify this.
"head_start_ray_commands": (list, OPTIONAL),
# Command to start ray on worker nodes. You shouldn't need to modify this.
"worker_start_ray_commands": (list, OPTIONAL),
# Whether to avoid restarting the cluster during updates. This field is
# controlled by the ray --no-restart flag and cannot be set by the user.
"no_restart": (None, OPTIONAL),
}
# Location of the JSON schema file that validate_config() loads; it is
# resolved relative to the directory containing the ray.autoscaler package.
RAY_SCHEMA_PATH = os.path.join(
os.path.dirname(ray.autoscaler.__file__), "ray-schema.json")
class LoadMetrics:
@@ -764,61 +648,17 @@ class StandardAutoscaler:
len(nodes)))
def typename(v):
    """Return a printable type name for ``v``.

    Args:
        v: Either a type object or an arbitrary value.

    Returns:
        ``v.__name__`` when ``v`` is itself a type, otherwise the name of
        ``type(v)``.
    """
    if isinstance(v, type):
        return v.__name__
    return type(v).__name__
def check_required(config, schema):
    """Verify that every REQUIRED schema entry is present in ``config``.

    Args:
        config: The (sub-)config to check; must be a dict.
        schema: Dict mapping each key to a ``(spec, required)`` pair, where
            ``spec`` is a type, a nested schema dict, or None.

    Raises:
        ValueError: If ``config`` is not a dict, or if a required key is
            missing from it.
    """
    if not isinstance(config, dict):
        raise ValueError("Config is not a dictionary")

    for k, (v, kreq) in schema.items():
        if v is None:
            continue  # None means we don't validate the field
        if kreq is not REQUIRED:
            continue
        if k not in config:
            raise ValueError(
                "Missing required config key `{}` of type {}".format(
                    k, typename(v)))
        if not isinstance(v, type):
            # A non-type spec is a nested schema dict. Only required
            # sections are descended into, so config[k] is known to exist.
            check_required(config[k], v)
def check_extraneous(config, schema):
    """Make sure all items of config are in schema and have the right type.

    Args:
        config: The (sub-)config to check; must be a dict.
        schema: Dict mapping each allowed key to a ``(spec, required)`` pair,
            where ``spec`` is a type, a nested schema dict, or None (the
            value is then accepted without any check).

    Raises:
        ValueError: If ``config`` is not a dict, contains a key that is not
            in ``schema``, or holds a value of the wrong type.
    """
    if not isinstance(config, dict):
        raise ValueError("Config {} is not a dictionary".format(config))

    for k in config:
        if k not in schema:
            raise ValueError("Unexpected config key `{}` not in {}".format(
                k, list(schema.keys())))
        v, _ = schema[k]  # the required flag is irrelevant here
        if v is None:
            continue
        if not isinstance(v, type):
            # Nested schema dict: validate the sub-config recursively.
            check_extraneous(config[k], v)
            continue
        if isinstance(config[k], v):
            continue
        # Accept any six string type where the schema asks for ``str``.
        if v is str and isinstance(config[k], string_types):
            continue
        raise ValueError(
            "Config key `{}` has wrong type {}, expected {}".format(
                k,
                type(config[k]).__name__, v.__name__))
def validate_config(config):
    """Validate a cluster config against the JSON schema file.

    Args:
        config: The parsed cluster config; must be a dict.

    Raises:
        ValueError: If ``config`` is not a dict.
        jsonschema.ValidationError: If ``config`` does not conform to the
            schema stored at ``RAY_SCHEMA_PATH``.
    """
    if not isinstance(config, dict):
        raise ValueError("Config {} is not a dictionary".format(config))

    with open(RAY_SCHEMA_PATH) as f:
        schema = json.load(f)

    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as e:
        # Re-raise carrying only the short message; ``from None`` suppresses
        # the chained original exception in the traceback.
        raise jsonschema.ValidationError(message=e.message) from None
def fillout_defaults(config):
+243
View File
@@ -0,0 +1,243 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://github.com/ray-project/ray/python/ray/autoscaler/ray-schema.json",
"title": "Ray AutoScaler",
"description": "Ray autoscaler schema",
"type": "object",
"definitions": {
"commands": {
"type": "array",
"items": {
"type": "string",
"description": "shell command"
}
}
},
"required": [
"cluster_name",
"provider"
],
"additionalProperties": false,
"properties": {
"cluster_name": {
"description": "An unique identifier for the head node and workers of this cluster.",
"type": "string"
},
"min_workers": {
"description": "The minimum number of worker nodes to launch in addition to the head node. This number should be >= 0.",
"type": "integer",
"minimum": 0
},
"max_workers": {
"description": "The maximum number of worker nodes to launch in addition to the head node. This takes precedence over min_workers.",
"type": "integer",
"minimum": 0
},
"initial_workers": {
"description": "The number of workers to launch initially, in addition to the head node.",
"type": "integer",
"minimum": 0
},
"autoscaling_mode": {
"description": "The mode of the autoscaler e.g. default, aggressive",
"type": "string",
"enum": [ "default", "aggressive" ]
},
"target_utilization_fraction": {
"description": "The autoscaler will scale up the cluster to this target fraction of resource usage. For example, if a cluster of 8 nodes is 100% busy and target_utilization was 0.8, it would resize the cluster to 10.",
"type": "number",
"minimum": 0,
"maximum": 1
},
"idle_timeout_minutes": {
"description": "If a node is idle for this many minutes, it will be removed.",
"type": "integer",
"minimum": 0
},
"provider": {
"type": "object",
"description": "Cloud-provider specific configuration.",
"required": [ "type" ],
"additionalProperties": false,
"properties": {
"type": {
"type": "string",
"description": "e.g. aws, azure, gcp,..."
},
"region": {
"type": "string",
"description": "e.g. us-east-1"
},
"module": {
"type": "string",
"description": "module, if using external node provider"
},
"head_ip": {
"type": "string",
"description": "local cluster head node"
},
"worker_ips": {
"type": "array",
"description": "local cluster worker nodes"
},
"use_internal_ips": {
"type": "boolean",
"description": "don't require public ips"
},
"namespace": {
"type": "string",
"description": "k8s namespace, if using k8s"
},
"location": {
"type": "string",
"description": "Azure location"
},
"resource_group": {
"type": "string",
"description": "Azure resource group"
},
"tags": {
"type": "object",
"description": "Azure user-defined tags"
},
"subscription_id": {
"type": "string",
"description": "Azure subscription id"
},
"msi_identity_id": {
"type": "string",
"description": "User-defined managed identity (generated by config)"
},
"msi_identity_principal_id": {
"type": "string",
"description": "User-defined managed identity principal id (generated by config)"
},
"subnet_id": {
"type": "string",
"description": "Network subnet id"
},
"autoscaler_service_account": {
"type": "object",
"description": "k8s autoscaler permissions, if using k8s"
},
"autoscaler_role": {
"type": "object",
"description": "k8s autoscaler permissions, if using k8s"
},
"autoscaler_role_binding": {
"type": "object",
"description": "k8s autoscaler permissions, if using k8s"
},
"extra_config": {
"type": "object",
"description": "provider-specific config"
},
"cache_stopped_nodes": {
"type": "boolean",
"description": "Whether to try to reuse previously stopped nodes instead of launching nodes. This will also cause the autoscaler to stop nodes instead of terminating them. Only implemented for AWS."
},
"availability_zone": {
"type": "string",
"description": "GCP availability zone"
},
"project_id": {
"type": ["string", "null"],
"description": "GCP globally unique project id"
}
}
},
"auth": {
"type": "object",
"description": "How Ray will authenticate with newly launched nodes.",
"additionalProperties": false,
"properties": {
"ssh_user": {
"type": "string",
"default": "ubuntu"
},
"ssh_private_key": {
"type": "string"
}
}
},
"docker": {
"type": "object",
"description": "Docker configuration. If this is specified, all setup and start commands will be executed in the container.",
"additionalProperties": false,
"properties": {
"image": {
"type": "string",
"description": "the docker image name",
"default": "tensorflow/tensorflow:1.5.0-py3"
},
"container_name": {
"type": "string",
"default": "ray_docker"
},
"pull_before_run": {
"type": "boolean",
"description": "run `docker pull` first"
},
"run_options": {
"type": "array",
"description": "shared options for starting head/worker docker"
},
"head_image": {
"type": "string",
"description": "image for head node, takes precedence over 'image' if specified"
},
"head_run_options": {
"type": "array",
"description": "head specific run options, appended to run_options"
},
"worker_image": {
"type": "string",
"description": "analogous to head_image"
},
"worker_run_options": {
"type": "array",
"description": "analogous to head_run_options"
}
}
},
"head_node": {
"type": "object",
"description": "Provider-specific config for the head node, e.g. instance type."
},
"worker_nodes": {
"type": "object",
"description": "Provider-specific config for worker nodes. e.g. instance type."
},
"file_mounts": {
"type": "object",
"description": "Map of remote paths to local paths, e.g. {\"/tmp/data\": \"/my/local/data\"}"
},
"initialization_commands": {
"$ref": "#/definitions/commands",
"description": "List of commands that will be run before `setup_commands`. If docker is enabled, these commands will run outside the container and before docker is setup."
},
"setup_commands": {
"$ref": "#/definitions/commands",
"description": "List of common shell commands to run to setup nodes."
},
"head_setup_commands": {
"$ref": "#/definitions/commands",
"description": "Commands that will be run on the head node after common setup."
},
"worker_setup_commands": {
"$ref": "#/definitions/commands",
"description": "Commands that will be run on worker nodes after common setup."
},
"head_start_ray_commands": {
"$ref": "#/definitions/commands",
"description": "Command to start ray on the head node. You shouldn't need to modify this."
},
"worker_start_ray_commands": {
"$ref": "#/definitions/commands",
"description": "Command to start ray on worker nodes. You shouldn't need to modify this."
},
"no_restart": {
"description": "Whether to avoid restarting the cluster during updates. This field is controlled by the ray --no-restart flag and cannot be set by the user."
}
}
}
+30
View File
@@ -0,0 +1,30 @@
# A unique identifier for the head node and workers of this cluster.
cluster_name: minimal
hi: 1
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers. min_workers defaults to 0.
min_workers: 0
max_workers: 0
provider:
type: aws
region: us-west-2
availability_zone: us-west-2b
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
head_node:
InstanceType: m4.10xlarge
ImageId: ami-3b6bce43 # Amazon Deep Learning AMI (Ubuntu)
setup_commands:
- error me
# - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc
# - source activate tensorflow_p27 && pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.4.0-cp27-cp27mu-manylinux1_x86_64.whl
# # Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
+5 -4
View File
@@ -5,6 +5,7 @@ import time
import unittest
import yaml
import copy
from jsonschema.exceptions import ValidationError
import ray
import ray.services as services
@@ -323,17 +324,17 @@ class AutoscalingTest(unittest.TestCase):
self.fail("Test config did not pass validation test!")
config["blah"] = "blah"
with pytest.raises(ValueError):
with pytest.raises(ValidationError):
validate_config(config)
del config["blah"]
config["provider"]["blah"] = "blah"
with pytest.raises(ValueError):
with pytest.raises(ValidationError):
validate_config(config)
del config["provider"]["blah"]
del config["provider"]
with pytest.raises(ValueError):
with pytest.raises(ValidationError):
validate_config(config)
def testValidateDefaultConfig(self):
@@ -346,7 +347,7 @@ class AutoscalingTest(unittest.TestCase):
config = fillout_defaults(config)
try:
validate_config(config)
except Exception:
except ValidationError:
self.fail("Default config did not pass validation test!")
def testScaleUp(self):
+13 -1
View File
@@ -1,3 +1,4 @@
import jsonschema
import os
import unittest
import yaml
@@ -15,7 +16,6 @@ CONFIG_PATHS += recursive_fnmatch(
class AutoscalingConfigTest(unittest.TestCase):
def testValidateDefaultConfig(self):
for config_path in CONFIG_PATHS:
with open(config_path) as f:
config = yaml.safe_load(f)
@@ -25,6 +25,18 @@ class AutoscalingConfigTest(unittest.TestCase):
except Exception:
self.fail("Config did not pass validation test!")
def _test_invalid_config(self, config_path):
    """Assert that the config at ``config_path`` fails schema validation.

    Args:
        config_path: Path of the yaml file, relative to ``RAY_PATH``.
    """
    with open(os.path.join(RAY_PATH, config_path)) as f:
        config = yaml.safe_load(f)
    # assertRaises fails the test with ``msg`` if no ValidationError is
    # raised, replacing the manual try / self.fail / except sequence.
    with self.assertRaises(
            jsonschema.ValidationError,
            msg="Expected validation to fail for {}".format(config_path)):
        validate_config(config)
def testInvalidConfig(self):
self._test_invalid_config("tests/additional_property.yaml")
if __name__ == "__main__":
import pytest
+2 -1
View File
@@ -75,7 +75,7 @@ extras = {
"debug": [],
"dashboard": [],
"serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas", "blist"],
"tune": ["tabulate", "tensorboardX"],
"tune": ["tabulate", "tensorboardX"]
}
extras["rllib"] = extras["tune"] + [
@@ -178,6 +178,7 @@ requires = [
"packaging",
"pytest",
"pyyaml",
"jsonschema",
"redis>=3.3.2",
# NOTE: Don't upgrade the version of six! Doing so causes installation
# problems. See https://github.com/ray-project/ray/issues/4169.