From 035a4ad0a2dbe1ee4855e12e99cd0c1887048f23 Mon Sep 17 00:00:00 2001 From: Ian Rodney Date: Sat, 3 Oct 2020 13:18:01 -0700 Subject: [PATCH] [autoscaler] Update Autoscaler SDK (#11185) * small sdk addition * create RAY_AUTOSCALER_CONSTANTS * better name * move to private * better naming --- python/ray/autoscaler/_private/autoscaler.py | 7 ++-- python/ray/autoscaler/_private/aws/config.py | 2 +- .../autoscaler/_private/aws/node_provider.py | 3 +- python/ray/autoscaler/_private/cli_logger.py | 4 +- python/ray/autoscaler/_private/commands.py | 5 ++- python/ray/autoscaler/_private/constants.py | 41 +++++++++++++++++++ python/ray/autoscaler/_private/docker.py | 4 +- .../ray/autoscaler/_private/load_metrics.py | 2 +- python/ray/autoscaler/_private/updater.py | 7 ++-- python/ray/autoscaler/sdk.py | 15 ++++++- python/ray/ray_constants.py | 26 ------------ 11 files changed, 73 insertions(+), 43 deletions(-) create mode 100644 python/ray/autoscaler/_private/constants.py diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index bc549a963..a4d8b3165 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -26,9 +26,10 @@ from ray.autoscaler._private.resource_demand_scheduler import \ from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \ with_head_node_ip, hash_launch_conf, hash_runtime_conf, \ DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR -from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ - AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ - AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S +from ray.autoscaler._private.constants import \ + AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \ + AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \ + AUTOSCALER_HEARTBEAT_TIMEOUT_S from six.moves import queue logger = logging.getLogger(__name__) diff --git 
a/python/ray/autoscaler/_private/aws/config.py b/python/ray/autoscaler/_private/aws/config.py index 8b865a3cb..efde00fa6 100644 --- a/python/ray/autoscaler/_private/aws/config.py +++ b/python/ray/autoscaler/_private/aws/config.py @@ -11,7 +11,7 @@ import boto3 from botocore.config import Config import botocore -from ray.ray_constants import BOTO_MAX_RETRIES +from ray.autoscaler._private.constants import BOTO_MAX_RETRIES from ray.autoscaler.tags import NODE_KIND_WORKER, NODE_KIND_HEAD from ray.autoscaler._private.providers import _PROVIDER_PRETTY_NAMES from ray.autoscaler._private.aws.utils import LazyDefaultDict, \ diff --git a/python/ray/autoscaler/_private/aws/node_provider.py b/python/ray/autoscaler/_private/aws/node_provider.py index da487966d..19b55d625 100644 --- a/python/ray/autoscaler/_private/aws/node_provider.py +++ b/python/ray/autoscaler/_private/aws/node_provider.py @@ -12,7 +12,8 @@ from botocore.config import Config from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \ TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE -from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES +from ray.autoscaler._private.constants import BOTO_MAX_RETRIES, \ + BOTO_CREATE_MAX_RETRIES from ray.autoscaler._private.aws.config import bootstrap_aws from ray.autoscaler._private.log_timer import LogTimer diff --git a/python/ray/autoscaler/_private/cli_logger.py b/python/ray/autoscaler/_private/cli_logger.py index 57aea7fec..3bdf73d28 100644 --- a/python/ray/autoscaler/_private/cli_logger.py +++ b/python/ray/autoscaler/_private/cli_logger.py @@ -303,8 +303,8 @@ class _CliLogger(): def set_format(self, format_tmpl=None): if not format_tmpl: - import ray.ray_constants as ray_constants - format_tmpl = ray_constants.LOGGER_FORMAT + from ray.autoscaler._private.constants import LOGGER_FORMAT + format_tmpl = LOGGER_FORMAT self._formatter = logging.Formatter(format_tmpl) def 
configure(self, log_style=None, color_mode=None, verbosity=None): diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index d01b48c3d..3c5e71ae4 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -19,7 +19,8 @@ except ImportError: # py2 from ray.experimental.internal_kv import _internal_kv_get import ray._private.services as services -from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL +from ray.autoscaler._private.constants import \ + AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \ hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \ DEBUG_AUTOSCALING_STATUS @@ -1046,7 +1047,7 @@ def get_head_node_ip(config_file: str, provider = _get_node_provider(config["provider"], config["cluster_name"]) try: head_node = _get_head_node(config, config_file, override_cluster_name) - if config.get("provider", {}).get("use_internal_ips", False) is True: + if config.get("provider", {}).get("use_internal_ips", False): head_node_ip = provider.internal_ip(head_node) else: head_node_ip = provider.external_ip(head_node) diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py new file mode 100644 index 000000000..0991c86d6 --- /dev/null +++ b/python/ray/autoscaler/_private/constants.py @@ -0,0 +1,41 @@ +import os + +from ray.ray_constants import ( # noqa F401 + AUTOSCALER_RESOURCE_REQUEST_CHANNEL, LOGGER_FORMAT, + MEMORY_RESOURCE_UNIT_BYTES, RESOURCES_ENVIRONMENT_VARIABLE) + + +def env_integer(key, default): + if key in os.environ: + return int(os.environ[key]) + return default + + +# Abort autoscaling if more than this number of errors are encountered. This +# is a safety feature to prevent e.g. runaway node launches. 
+AUTOSCALER_MAX_NUM_FAILURES = env_integer("AUTOSCALER_MAX_NUM_FAILURES", 5) + +# The maximum number of nodes to launch in a single request. +# Multiple requests may be made for this batch size, up to +# the limit of AUTOSCALER_MAX_CONCURRENT_LAUNCHES. +AUTOSCALER_MAX_LAUNCH_BATCH = env_integer("AUTOSCALER_MAX_LAUNCH_BATCH", 5) + +# Max number of nodes to launch at a time. +AUTOSCALER_MAX_CONCURRENT_LAUNCHES = env_integer( + "AUTOSCALER_MAX_CONCURRENT_LAUNCHES", 10) + +# Interval at which to perform autoscaling updates. +AUTOSCALER_UPDATE_INTERVAL_S = env_integer("AUTOSCALER_UPDATE_INTERVAL_S", 5) + +# The autoscaler will attempt to restart Ray on nodes it hasn't heard from +# in more than this interval. +AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S", + 30) + +# Max number of retries to AWS (default is 5, time increases exponentially) +BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12) +# Max number of retries to create an EC2 node (retry different subnet) +BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5) + +# Host path that Docker mounts attach to +DOCKER_MOUNT_PREFIX = "/tmp/ray_tmp_mount" diff --git a/python/ray/autoscaler/_private/docker.py b/python/ray/autoscaler/_private/docker.py index 80b99b7d7..4d7f20733 100644 --- a/python/ray/autoscaler/_private/docker.py +++ b/python/ray/autoscaler/_private/docker.py @@ -4,9 +4,9 @@ try: # py3 except ImportError: # py2 from pipes import quote -logger = logging.getLogger(__name__) +from ray.autoscaler._private.constants import DOCKER_MOUNT_PREFIX -DOCKER_MOUNT_PREFIX = "/tmp/ray_tmp_mount" +logger = logging.getLogger(__name__) def validate_docker_config(config): diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index b43c602fc..4823ddf1b 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -3,7 +3,7 @@ import time import numpy as np import 
ray._private.services as services -from ray.ray_constants import MEMORY_RESOURCE_UNIT_BYTES +from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/_private/updater.py b/python/ray/autoscaler/_private/updater.py index f305f3b51..3b6b20431 100644 --- a/python/ray/autoscaler/_private/updater.py +++ b/python/ray/autoscaler/_private/updater.py @@ -15,8 +15,8 @@ from ray.autoscaler._private.command_runner import NODE_START_WAIT_S, \ from ray.autoscaler._private.log_timer import LogTimer from ray.autoscaler._private.cli_logger import cli_logger, cf import ray.autoscaler._private.subprocess_output_util as cmd_output_util - -from ray import ray_constants +from ray.autoscaler._private.constants import \ + RESOURCES_ENVIRONMENT_VARIABLE logger = logging.getLogger(__name__) @@ -410,8 +410,7 @@ class NodeUpdater: for cmd in self.ray_start_commands: if self.node_resources: env_vars = { - ray_constants.RESOURCES_ENVIRONMENT_VARIABLE: self. - node_resources + RESOURCES_ENVIRONMENT_VARIABLE: self.node_resources } else: env_vars = {} diff --git a/python/ray/autoscaler/sdk.py b/python/ray/autoscaler/sdk.py index 801bbb847..123058097 100644 --- a/python/ray/autoscaler/sdk.py +++ b/python/ray/autoscaler/sdk.py @@ -1,6 +1,6 @@ """IMPORTANT: this is an experimental interface and not currently stable.""" -from typing import Optional, List, Union +from typing import Any, Dict, Optional, List, Union import json import os import tempfile @@ -186,3 +186,16 @@ def _as_config_file(cluster_config: Union[dict, str]): if not os.path.exists(cluster_config): raise ValueError("Cluster config not found {}".format(cluster_config)) return cluster_config + + +def bootstrap_config(cluster_config: Dict[str, Any], + no_config_cache: bool = False) -> bool: + """Validate and add provider-specific fields to the config. 
For example, + IAM/authentication may be added here.""" + return commands._bootstrap_config(cluster_config, no_config_cache) + + +def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]: + """Fillout default values for a cluster_config based on the provider.""" + from ray.autoscaler._private.util import fillout_defaults + return fillout_defaults(config) diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 80def4d53..29f5387b5 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -134,35 +134,9 @@ RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:" RESOURCES_ENVIRONMENT_VARIABLE = "RAY_OVERRIDE_RESOURCES" -# Abort autoscaling if more than this number of errors are encountered. This -# is a safety feature to prevent e.g. runaway node launches. -AUTOSCALER_MAX_NUM_FAILURES = env_integer("AUTOSCALER_MAX_NUM_FAILURES", 5) - -# The maximum number of nodes to launch in a single request. -# Multiple requests may be made for this batch size, up to -# the limit of AUTOSCALER_MAX_CONCURRENT_LAUNCHES. -AUTOSCALER_MAX_LAUNCH_BATCH = env_integer("AUTOSCALER_MAX_LAUNCH_BATCH", 5) - -# Max number of nodes to launch at a time. -AUTOSCALER_MAX_CONCURRENT_LAUNCHES = env_integer( - "AUTOSCALER_MAX_CONCURRENT_LAUNCHES", 10) - -# Interval at which to perform autoscaling updates. -AUTOSCALER_UPDATE_INTERVAL_S = env_integer("AUTOSCALER_UPDATE_INTERVAL_S", 5) - -# The autoscaler will attempt to restart Ray on nodes it hasn't heard from -# in more than this interval. -AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S", - 30) - # The reporter will report its statistics this often (milliseconds). 
REPORTER_UPDATE_INTERVAL_MS = env_integer("REPORTER_UPDATE_INTERVAL_MS", 2500) -# Max number of retries to AWS (default is 5, time increases exponentially) -BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12) -# Max number of retries to create an EC2 node (retry different subnet) -BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5) - LOGGER_FORMAT = ( "%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s") LOGGER_FORMAT_HELP = f"The logging format. default='{LOGGER_FORMAT}'"