From 162d063f0dd68c3c85f39bb9a8b9c275cc226f8c Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sun, 4 Mar 2018 23:35:58 -0800 Subject: [PATCH] [autoscaler/tune] Optional YAML Fields + Fix Pretty Printing for Tune (#1541) --- doc/source/autoscaling.rst | 18 +-- python/ray/autoscaler/autoscaler.py | 109 ++++++++++++------ .../aws/{example.yaml => example-full.yaml} | 7 +- .../ray/autoscaler/aws/example-minimal.yaml | 17 +++ python/ray/autoscaler/commands.py | 7 +- python/ray/autoscaler/docker.py | 2 + python/ray/autoscaler/node_provider.py | 30 +++++ python/ray/tune/logger.py | 12 ++ python/ray/tune/result.py | 18 --- python/ray/tune/trial.py | 4 +- python/setup.py | 6 + test/autoscaler_test.py | 39 ++++++- 12 files changed, 199 insertions(+), 70 deletions(-) rename python/ray/autoscaler/aws/{example.yaml => example-full.yaml} (90%) create mode 100644 python/ray/autoscaler/aws/example-minimal.yaml diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index c3f3f92c5..ec886e0f0 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -9,7 +9,7 @@ Quick start First, install boto (``pip install boto3``) and configure your AWS credentials in ``~/.aws/credentials``, as described in `the boto docs `__. -Then you're ready to go. The provided `ray/python/ray/autoscaler/aws/example.yaml `__ cluster config file will create a small cluster with a m4.large head node (on-demand), and two m4.large `spot workers `__, configured to autoscale up to four m4.large workers. +Then you're ready to go. The provided `ray/python/ray/autoscaler/aws/example-full.yaml `__ cluster config file will create a small cluster with a m5.large head node (on-demand) configured to autoscale up to two m5.large `spot workers `__. Try it out by running these commands from your personal computer. 
Once the cluster is started, you can then SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(redis_address=ray.services.get_node_ip_address() + ":6379")``. @@ -18,14 +18,14 @@ SSH into the head node, ``source activate tensorflow_p36``, and then run Ray pro # Create or update the cluster. When the command finishes, it will print # out the command that can be used to SSH into the cluster head node. - $ ray create_or_update ray/python/ray/autoscaler/aws/example.yaml + $ ray create_or_update ray/python/ray/autoscaler/aws/example-full.yaml # Reconfigure autoscaling behavior without interrupting running jobs - $ ray create_or_update ray/python/ray/autoscaler/aws/example.yaml \ + $ ray create_or_update ray/python/ray/autoscaler/aws/example-full.yaml \ --max-workers=N --no-restart # Teardown the cluster - $ ray teardown ray/python/ray/autoscaler/aws/example.yaml + $ ray teardown ray/python/ray/autoscaler/aws/example-full.yaml To run connect to applications running on the cluster (e.g. Jupyter notebook) using a web browser, you can forward the port to your local machine using SSH: @@ -66,10 +66,14 @@ The Ray autoscaler also reports per-node status in the form of instance tags. In Customizing cluster setup ------------------------- -You are encouraged to copy the example YAML file and modify it to your needs. This may include adding additional setup commands to install libraries or sync local data files. After you have customized the nodes, it is also a good idea to create a new machine image (AMI) and use that in the config file. This reduces worker setup time, improving the efficiency of auto-scaling. +You are encouraged to copy the example YAML file and modify it to your needs. This may include adding additional setup commands to install libraries or sync local data files. + +.. note:: After you have customized the nodes, it is also a good idea to create a new machine image (AMI) and use that in the config file. 
This reduces worker setup time, improving the efficiency of auto-scaling. The setup commands you use should ideally be *idempotent*, that is, can be run more than once. This allows Ray to update nodes after they have been created. You can usually make commands idempotent with small modifications, e.g. ``git clone foo`` can be rewritten as ``test -e foo || git clone foo`` which checks if the repo is already cloned first. +Most of the example YAML file is optional. Here is a `reference minimal YAML file `__, and you can find the defaults for `optional fields in this YAML file `__. + Syncing git branches -------------------- @@ -94,7 +98,7 @@ This tells ``ray create_or_update`` to sync the current git branch SHA from your Common cluster configurations ----------------------------- -The ``example.yaml`` configuration is enough to get started with Ray, but for more compute intensive workloads you will want to change the instance types to e.g. use GPU or larger compute instance by editing the yaml file. Here are a few common configurations: +The ``example-full.yaml`` configuration is enough to get started with Ray, but for more compute intensive workloads you will want to change the instance types to e.g. use GPU or larger compute instance by editing the yaml file. Here are a few common configurations: **GPU single node**: use Ray on a single large GPU instance. @@ -105,7 +109,7 @@ The ``example.yaml`` configuration is enough to get started with Ray, but for mo InstanceType: p2.8xlarge **Docker**: Specify docker image. This executes all commands on all nodes in the docker container, -and opens all the necessary ports to support the Ray cluster. +and opens all the necessary ports to support the Ray cluster. This currently does not have GPU support. .. 
code-block:: yaml diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index c6f4b0acc..e97e56477 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -18,77 +18,85 @@ import yaml from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \ AUTOSCALER_HEARTBEAT_TIMEOUT_S -from ray.autoscaler.node_provider import get_node_provider +from ray.autoscaler.node_provider import get_node_provider, \ + get_default_config from ray.autoscaler.updater import NodeUpdaterProcess +from ray.autoscaler.docker import dockerize_if_needed from ray.autoscaler.tags import TAG_RAY_LAUNCH_CONFIG, \ TAG_RAY_RUNTIME_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE, TAG_NAME import ray.services as services +REQUIRED, OPTIONAL = True, False +# For (a, b), if a is a dictionary object, then +# no extra fields can be introduced. CLUSTER_CONFIG_SCHEMA = { # An unique identifier for the head node and workers of this cluster. - "cluster_name": str, + "cluster_name": (str, REQUIRED), # The minimum number of workers nodes to launch in addition to the head # node. This number should be >= 0. - "min_workers": int, + "min_workers": (int, OPTIONAL), # The maximum number of workers nodes to launch in addition to the head # node. This takes precedence over min_workers. - "max_workers": int, + "max_workers": (int, REQUIRED), # The autoscaler will scale up the cluster to this target fraction of # resources usage. For example, if a cluster of 8 nodes is 100% busy # and target_utilization was 0.8, it would resize the cluster to 10. - "target_utilization_fraction": float, + "target_utilization_fraction": (float, OPTIONAL), # If a node is idle for this many minutes, it will be removed. - "idle_timeout_minutes": int, + "idle_timeout_minutes": (int, OPTIONAL), # Cloud-provider specific configuration. - "provider": { - "type": str, # e.g. aws - "region": str, # e.g. 
us-east-1 - "availability_zone": str, # e.g. us-east-1a - }, + "provider": ({ + "type": (str, REQUIRED), # e.g. aws + "region": (str, REQUIRED), # e.g. us-east-1 + "availability_zone": (str, REQUIRED), # e.g. us-east-1a + }, REQUIRED), # How Ray will authenticate with newly launched nodes. - "auth": dict, + "auth": ({ + "ssh_user": (str, REQUIRED), # e.g. ubuntu + "ssh_private_key": (str, OPTIONAL), + }, REQUIRED), # Docker configuration. If this is specified, all setup and start commands # will be executed in the container. - "docker": { - "image": str, # e.g. tensorflow/tensorflow:1.5.0-py3 - "container_name": str - }, + "docker": ({ + "image": (str, OPTIONAL), # e.g. tensorflow/tensorflow:1.5.0-py3 + "container_name": (str, OPTIONAL), # e.g., ray_docker + }, OPTIONAL), # Provider-specific config for the head node, e.g. instance type. - "head_node": dict, + "head_node": (dict, OPTIONAL), # Provider-specific config for worker nodes. e.g. instance type. - "worker_nodes": dict, + "worker_nodes": (dict, OPTIONAL), # Map of remote paths to local paths, e.g. {"/tmp/data": "/my/local/data"} - "file_mounts": dict, + "file_mounts": (dict, OPTIONAL), # List of common shell commands to run to initialize nodes. - "setup_commands": list, + "setup_commands": (list, OPTIONAL), # Commands that will be run on the head node after common setup. - "head_setup_commands": list, + "head_setup_commands": (list, OPTIONAL), # Commands that will be run on worker nodes after common setup. - "worker_setup_commands": list, + "worker_setup_commands": (list, OPTIONAL), # Command to start ray on the head node. You shouldn't need to modify this. - "head_start_ray_commands": list, + "head_start_ray_commands": (list, OPTIONAL), # Command to start ray on worker nodes. You shouldn't need to modify this. - "worker_start_ray_commands": list, + "worker_start_ray_commands": (list, OPTIONAL), # Whether to avoid restarting the cluster during updates. 
This field is # controlled by the ray --no-restart flag and cannot be set by the user. - "no_restart": None, + "no_restart": (None, OPTIONAL), } @@ -474,28 +482,57 @@ def typename(v): return type(v).__name__ -def validate_config(config, schema=CLUSTER_CONFIG_SCHEMA): +def check_required(config, schema): + # Check required schema entries if type(config) is not dict: raise ValueError("Config is not a dictionary") - for k, v in schema.items(): + + for k, (v, kreq) in schema.items(): if v is None: continue # None means we don't validate the field - if k not in config: + if kreq is REQUIRED: + if k not in config: + type_str = typename(v) + raise ValueError( + "Missing required config key `{}` of type {}".format( + k, type_str)) + if not isinstance(v, type): + check_required(config[k], v) + + +def check_extraneous(config, schema): + """Make sure all items of config are in schema""" + if type(config) is not dict: + raise ValueError("Config is not a dictionary") + for k in config: + if k not in schema: raise ValueError( - "Missing required config key `{}` of type {}".format( - k, typename(v))) + "Unexpected config key `{}` not in {}".format( + k, list(schema.keys()))) + v, kreq = schema[k] if isinstance(v, type): if not isinstance(config[k], v): raise ValueError( "Config key `{}` has wrong type {}, expected {}".format( k, type(config[k]).__name__, v.__name__)) else: - validate_config(config[k], schema[k]) - for k in config.keys(): - if k not in schema: - raise ValueError( - "Unexpected config key `{}` not in {}".format( - k, schema.keys())) + check_extraneous(config[k], v) + + +def validate_config(config, schema=CLUSTER_CONFIG_SCHEMA): + """Required Dicts indicate that no extra fields can be introduced.""" + if type(config) is not dict: + raise ValueError("Config is not a dictionary") + + check_required(config, schema) + check_extraneous(config, schema) + + +def fillout_defaults(config): + defaults = get_default_config(config["provider"]) + defaults.update(config) + 
dockerize_if_needed(defaults) + return defaults def with_head_node_ip(cmds): diff --git a/python/ray/autoscaler/aws/example.yaml b/python/ray/autoscaler/aws/example-full.yaml similarity index 90% rename from python/ray/autoscaler/aws/example.yaml rename to python/ray/autoscaler/aws/example-full.yaml index ae098e616..376dd3960 100644 --- a/python/ray/autoscaler/aws/example.yaml +++ b/python/ray/autoscaler/aws/example-full.yaml @@ -3,7 +3,7 @@ cluster_name: default # The minimum number of workers nodes to launch in addition to the head # node. This number should be >= 0. -min_workers: 1 +min_workers: 0 # The maximum number of workers nodes to launch in addition to the head # node. This takes precedence over min_workers. @@ -85,7 +85,10 @@ setup_commands: # Note: if you're developing Ray, you probably want to create an AMI that # has your Ray repo pre-cloned. Then, you can replace the pip installs # below with a git checkout (and possibly a recompile). - - source activate tensorflow_p36 && most_recent() { echo pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/$(aws s3 ls s3://ray-wheels --recursive | grep $1 | sort -r | head -n 1 | awk '{print $4}'); } && $( most_recent "cp36-cp36m-manylinux1" ) || $( most_recent "cp35-cp35m-manylinux1" ) + - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.3.1-cp27-cp27mu-manylinux1_x86_64.whl + # - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.3.1-cp35-cp35m-manylinux1_x86_64.whl + - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.3.1-cp36-cp36m-manylinux1_x86_64.whl # Consider uncommenting these if you also want to run apt-get commands during setup # - sudo pkill -9 apt-get || true # - sudo pkill -9 dpkg || true diff --git a/python/ray/autoscaler/aws/example-minimal.yaml b/python/ray/autoscaler/aws/example-minimal.yaml new file mode 100644 index 
000000000..b265dddd8 --- /dev/null +++ b/python/ray/autoscaler/aws/example-minimal.yaml @@ -0,0 +1,17 @@ +# A unique identifier for the head node and workers of this cluster. +cluster_name: minimal + +# The maximum number of worker nodes to launch in addition to the head +# node. This takes precedence over min_workers. min_workers defaults to 0. +max_workers: 1 + +# Cloud-provider specific configuration. +provider: + type: aws + region: us-west-2 + availability_zone: us-west-2a + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu + diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 6e76c35d3..8be0b744a 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -16,8 +16,7 @@ except ImportError: # py2 from pipes import quote from ray.autoscaler.autoscaler import validate_config, hash_runtime_conf, \ - hash_launch_conf -from ray.autoscaler.docker import dockerize_if_needed + hash_launch_conf, fillout_defaults from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ TAG_NAME @@ -31,7 +30,7 @@ def create_or_update_cluster( config = yaml.load(open(config_file).read()) validate_config(config) - dockerize_if_needed(config) + config = fillout_defaults(config) if override_min_workers is not None: config["min_workers"] = override_min_workers @@ -53,7 +52,7 @@ def teardown_cluster(config_file, yes): config = yaml.load(open(config_file).read()) validate_config(config) - dockerize_if_needed(config) + config = fillout_defaults(config) confirm("This will destroy your cluster", yes) diff --git a/python/ray/autoscaler/docker.py b/python/ray/autoscaler/docker.py index d6b71d729..f126681ba 100644 --- a/python/ray/autoscaler/docker.py +++ b/python/ray/autoscaler/docker.py @@ -13,6 +13,8 @@ def dockerize_if_needed(config): docker_image = config["docker"].get("image") cname = 
config["docker"].get("container_name") if not docker_image: + if cname: + print("Container name given but no Docker image - continuing...") return config else: assert cname, "Must provide container name!" diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index f5e4ee831..082bb5d43 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -2,6 +2,9 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import os +import yaml + def import_aws(): from ray.autoscaler.aws.config import bootstrap_aws @@ -9,6 +12,12 @@ def import_aws(): return bootstrap_aws, AWSNodeProvider +def load_aws_config(): + import ray.autoscaler.aws as ray_aws + return os.path.join(os.path.dirname( + ray_aws.__file__), "example-full.yaml") + + NODE_PROVIDERS = { "aws": import_aws, "gce": None, # TODO: support more node providers @@ -18,6 +27,15 @@ NODE_PROVIDERS = { "local_cluster": None, } +DEFAULT_CONFIGS = { + "aws": load_aws_config, + "gce": None, # TODO: support more node providers + "azure": None, + "kubernetes": None, + "docker": None, + "local_cluster": None, +} + def get_node_provider(provider_config, cluster_name): importer = NODE_PROVIDERS.get(provider_config["type"]) @@ -28,6 +46,18 @@ def get_node_provider(provider_config, cluster_name): return provider_cls(provider_config, cluster_name) +def get_default_config(provider_config): + load_config = DEFAULT_CONFIGS.get(provider_config["type"]) + if load_config is None: + raise NotImplementedError( + "Unsupported node provider: {}".format(provider_config["type"])) + path_to_default = load_config() + with open(path_to_default) as f: + defaults = yaml.load(f) + + return defaults + + class NodeProvider(object): """Interface for getting and returning nodes from a Cloud. 
diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index 952049b33..0379c6c22 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -6,6 +6,7 @@ import csv import json import numpy as np import os +import yaml from ray.tune.result import TrainingResult from ray.tune.log_sync import get_syncer @@ -176,3 +177,14 @@ class _CustomEncoder(json.JSONEncoder): return float(value) if np.issubdtype(value, int): return int(value) + + +def pretty_print(result): + result = result._replace(config=None) # drop config from pretty print + out = {} + for k, v in result._asdict().items(): + if v is not None: + out[k] = v + + cleaned = json.dumps(out, cls=_CustomEncoder) + return yaml.dump(json.loads(cleaned), default_flow_style=False) diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index 77c1aa5bf..0a1b4b192 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -3,14 +3,8 @@ from __future__ import division from __future__ import print_function from collections import namedtuple -import json import os -try: - import yaml -except ImportError: - print("Could not import YAML module, falling back to JSON pretty-printing") - yaml = None """ When using ray.tune with custom training scripts, you must periodically report @@ -93,16 +87,4 @@ TrainingResult = namedtuple("TrainingResult", [ ]) -def pretty_print(result): - result = result._replace(config=None) # drop config from pretty print - out = {} - for k, v in result._asdict().items(): - if v is not None: - out[k] = v - if yaml: - return yaml.safe_dump(out, default_flow_style=False) - else: - return json.dumps(out) + "\n" - - TrainingResult.__new__.__defaults__ = (None,) * len(TrainingResult._fields) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index f6166eaaf..a5a2e0a78 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -11,9 +11,9 @@ import ray import os from ray.tune import TuneError -from ray.tune.logger import NoopLogger, 
UnifiedLogger +from ray.tune.logger import NoopLogger, UnifiedLogger, pretty_print from ray.tune.registry import _default_registry, get_registry, TRAINABLE_CLASS -from ray.tune.result import TrainingResult, DEFAULT_RESULTS_DIR, pretty_print +from ray.tune.result import TrainingResult, DEFAULT_RESULTS_DIR from ray.utils import random_string, binary_to_hex DEBUG_PRINT_INTERVAL = 5 diff --git a/python/setup.py b/python/setup.py index 990975da0..d03675a67 100644 --- a/python/setup.py +++ b/python/setup.py @@ -33,6 +33,10 @@ ray_ui_files = [ "ray/core/src/catapult_files/trace_viewer_full.html" ] +ray_autoscaler_files = [ + "ray/autoscaler/aws/example-full.yaml" +] + # The UI files are mandatory if the INCLUDE_UI environment variable equals 1. # Otherwise, they are optional. if "INCLUDE_UI" in os.environ and os.environ["INCLUDE_UI"] == "1": @@ -40,6 +44,8 @@ if "INCLUDE_UI" in os.environ and os.environ["INCLUDE_UI"] == "1": else: optional_ray_files += ray_ui_files +optional_ray_files += ray_autoscaler_files + extras = { "rllib": [ "tensorflow", "pyyaml", "gym[atari]", "opencv-python", diff --git a/test/autoscaler_test.py b/test/autoscaler_test.py index 878a0013b..f5193ca0e 100644 --- a/test/autoscaler_test.py +++ b/test/autoscaler_test.py @@ -7,9 +7,11 @@ import tempfile import time import unittest import yaml +import copy import ray -from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics +from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics, \ + fillout_defaults, validate_config from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider from ray.autoscaler.updater import NodeUpdaterThread @@ -197,6 +199,41 @@ class AutoscalingTest(unittest.TestCase): lambda: StandardAutoscaler( invalid_config, LoadMetrics(), update_interval_s=0)) + def testValidation(self): + """Ensures that schema validation is working.""" + config = copy.deepcopy(SMALL_CLUSTER) + try: + 
validate_config(config) + except Exception: + self.fail("Test config did not pass validation test!") + + config["blah"] = "blah" + with self.assertRaises(ValueError): + validate_config(config) + del config["blah"] + + config["provider"]["blah"] = "blah" + with self.assertRaises(ValueError): + validate_config(config) + del config["provider"]["blah"] + + del config["provider"] + with self.assertRaises(ValueError): + validate_config(config) + + def testValidateDefaultConfig(self): + config = {} + config["provider"] = { + "type": "aws", + "region": "us-east-1", + "availability_zone": "us-east-1a", + } + config = fillout_defaults(config) + try: + validate_config(config) + except Exception: + self.fail("Default config did not pass validation test!") + def testScaleUp(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider()