mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 14:39:44 +08:00
[autoscaler] Update Autoscaler SDK (#11185)
* small sdk addition * create RAY_AUTOSCALER_CONSTANTS * better name * move to private * better naming
This commit is contained in:
@@ -26,9 +26,10 @@ from ray.autoscaler._private.resource_demand_scheduler import \
|
||||
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
|
||||
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
|
||||
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
|
||||
from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \
|
||||
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
|
||||
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S
|
||||
from ray.autoscaler._private.constants import \
|
||||
AUTOSCALER_MAX_NUM_FAILURES, AUTOSCALER_MAX_LAUNCH_BATCH, \
|
||||
AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \
|
||||
AUTOSCALER_HEARTBEAT_TIMEOUT_S
|
||||
from six.moves import queue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -11,7 +11,7 @@ import boto3
|
||||
from botocore.config import Config
|
||||
import botocore
|
||||
|
||||
from ray.ray_constants import BOTO_MAX_RETRIES
|
||||
from ray.autoscaler._private.constants import BOTO_MAX_RETRIES
|
||||
from ray.autoscaler.tags import NODE_KIND_WORKER, NODE_KIND_HEAD
|
||||
from ray.autoscaler._private.providers import _PROVIDER_PRETTY_NAMES
|
||||
from ray.autoscaler._private.aws.utils import LazyDefaultDict, \
|
||||
|
||||
@@ -12,7 +12,8 @@ from botocore.config import Config
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \
|
||||
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE
|
||||
from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES
|
||||
from ray.autoscaler._private.constants import BOTO_MAX_RETRIES, \
|
||||
BOTO_CREATE_MAX_RETRIES
|
||||
from ray.autoscaler._private.aws.config import bootstrap_aws
|
||||
from ray.autoscaler._private.log_timer import LogTimer
|
||||
|
||||
|
||||
@@ -303,8 +303,8 @@ class _CliLogger():
|
||||
|
||||
def set_format(self, format_tmpl=None):
|
||||
if not format_tmpl:
|
||||
import ray.ray_constants as ray_constants
|
||||
format_tmpl = ray_constants.LOGGER_FORMAT
|
||||
from ray.autoscaler._private.constants import LOGGER_FORMAT
|
||||
format_tmpl = LOGGER_FORMAT
|
||||
self._formatter = logging.Formatter(format_tmpl)
|
||||
|
||||
def configure(self, log_style=None, color_mode=None, verbosity=None):
|
||||
|
||||
@@ -19,7 +19,8 @@ except ImportError: # py2
|
||||
|
||||
from ray.experimental.internal_kv import _internal_kv_get
|
||||
import ray._private.services as services
|
||||
from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL
|
||||
from ray.autoscaler._private.constants import \
|
||||
AUTOSCALER_RESOURCE_REQUEST_CHANNEL
|
||||
from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \
|
||||
hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \
|
||||
DEBUG_AUTOSCALING_STATUS
|
||||
@@ -1046,7 +1047,7 @@ def get_head_node_ip(config_file: str,
|
||||
provider = _get_node_provider(config["provider"], config["cluster_name"])
|
||||
try:
|
||||
head_node = _get_head_node(config, config_file, override_cluster_name)
|
||||
if config.get("provider", {}).get("use_internal_ips", False) is True:
|
||||
if config.get("provider", {}).get("use_internal_ips", False):
|
||||
head_node_ip = provider.internal_ip(head_node)
|
||||
else:
|
||||
head_node_ip = provider.external_ip(head_node)
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
import os
|
||||
|
||||
from ray.ray_constants import ( # noqa F401
|
||||
AUTOSCALER_RESOURCE_REQUEST_CHANNEL, LOGGER_FORMAT,
|
||||
MEMORY_RESOURCE_UNIT_BYTES, RESOURCES_ENVIRONMENT_VARIABLE)
|
||||
|
||||
|
||||
def env_integer(key, default):
|
||||
if key in os.environ:
|
||||
return int(os.environ[key])
|
||||
return default
|
||||
|
||||
|
||||
# Abort autoscaling if more than this number of errors are encountered. This
|
||||
# is a safety feature to prevent e.g. runaway node launches.
|
||||
AUTOSCALER_MAX_NUM_FAILURES = env_integer("AUTOSCALER_MAX_NUM_FAILURES", 5)
|
||||
|
||||
# The maximum number of nodes to launch in a single request.
|
||||
# Multiple requests may be made for this batch size, up to
|
||||
# the limit of AUTOSCALER_MAX_CONCURRENT_LAUNCHES.
|
||||
AUTOSCALER_MAX_LAUNCH_BATCH = env_integer("AUTOSCALER_MAX_LAUNCH_BATCH", 5)
|
||||
|
||||
# Max number of nodes to launch at a time.
|
||||
AUTOSCALER_MAX_CONCURRENT_LAUNCHES = env_integer(
|
||||
"AUTOSCALER_MAX_CONCURRENT_LAUNCHES", 10)
|
||||
|
||||
# Interval at which to perform autoscaling updates.
|
||||
AUTOSCALER_UPDATE_INTERVAL_S = env_integer("AUTOSCALER_UPDATE_INTERVAL_S", 5)
|
||||
|
||||
# The autoscaler will attempt to restart Ray on nodes it hasn't heard from
|
||||
# in more than this interval.
|
||||
AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S",
|
||||
30)
|
||||
|
||||
# Max number of retries to AWS (default is 5, time increases exponentially)
|
||||
BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
|
||||
# Max number of retries to create an EC2 node (retry different subnet)
|
||||
BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5)
|
||||
|
||||
# Host path that Docker mounts attach to
|
||||
DOCKER_MOUNT_PREFIX = "/tmp/ray_tmp_mount"
|
||||
@@ -4,9 +4,9 @@ try: # py3
|
||||
except ImportError: # py2
|
||||
from pipes import quote
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from ray.autoscaler._private.constants import DOCKER_MOUNT_PREFIX
|
||||
|
||||
DOCKER_MOUNT_PREFIX = "/tmp/ray_tmp_mount"
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def validate_docker_config(config):
|
||||
|
||||
@@ -3,7 +3,7 @@ import time
|
||||
|
||||
import numpy as np
|
||||
import ray._private.services as services
|
||||
from ray.ray_constants import MEMORY_RESOURCE_UNIT_BYTES
|
||||
from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -15,8 +15,8 @@ from ray.autoscaler._private.command_runner import NODE_START_WAIT_S, \
|
||||
from ray.autoscaler._private.log_timer import LogTimer
|
||||
from ray.autoscaler._private.cli_logger import cli_logger, cf
|
||||
import ray.autoscaler._private.subprocess_output_util as cmd_output_util
|
||||
|
||||
from ray import ray_constants
|
||||
from ray.autoscaler._private.constants import \
|
||||
RESOURCES_ENVIRONMENT_VARIABLE
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -410,8 +410,7 @@ class NodeUpdater:
|
||||
for cmd in self.ray_start_commands:
|
||||
if self.node_resources:
|
||||
env_vars = {
|
||||
ray_constants.RESOURCES_ENVIRONMENT_VARIABLE: self.
|
||||
node_resources
|
||||
RESOURCES_ENVIRONMENT_VARIABLE: self.node_resources
|
||||
}
|
||||
else:
|
||||
env_vars = {}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""IMPORTANT: this is an experimental interface and not currently stable."""
|
||||
|
||||
from typing import Optional, List, Union
|
||||
from typing import Any, Dict, Optional, List, Union
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
@@ -186,3 +186,16 @@ def _as_config_file(cluster_config: Union[dict, str]):
|
||||
if not os.path.exists(cluster_config):
|
||||
raise ValueError("Cluster config not found {}".format(cluster_config))
|
||||
return cluster_config
|
||||
|
||||
|
||||
def bootstrap_config(cluster_config: Dict[str, any],
|
||||
no_config_cache: bool = False) -> bool:
|
||||
"""Validate and add provider-specific fields to the config. For example,
|
||||
IAM/authentication may be added here."""
|
||||
return commands._bootstrap_config(cluster_config, no_config_cache)
|
||||
|
||||
|
||||
def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Fillout default values for a cluster_config based on the provider."""
|
||||
from ray.autoscaler._private.util import fillout_defaults
|
||||
return fillout_defaults(config)
|
||||
|
||||
@@ -134,35 +134,9 @@ RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:"
|
||||
|
||||
RESOURCES_ENVIRONMENT_VARIABLE = "RAY_OVERRIDE_RESOURCES"
|
||||
|
||||
# Abort autoscaling if more than this number of errors are encountered. This
|
||||
# is a safety feature to prevent e.g. runaway node launches.
|
||||
AUTOSCALER_MAX_NUM_FAILURES = env_integer("AUTOSCALER_MAX_NUM_FAILURES", 5)
|
||||
|
||||
# The maximum number of nodes to launch in a single request.
|
||||
# Multiple requests may be made for this batch size, up to
|
||||
# the limit of AUTOSCALER_MAX_CONCURRENT_LAUNCHES.
|
||||
AUTOSCALER_MAX_LAUNCH_BATCH = env_integer("AUTOSCALER_MAX_LAUNCH_BATCH", 5)
|
||||
|
||||
# Max number of nodes to launch at a time.
|
||||
AUTOSCALER_MAX_CONCURRENT_LAUNCHES = env_integer(
|
||||
"AUTOSCALER_MAX_CONCURRENT_LAUNCHES", 10)
|
||||
|
||||
# Interval at which to perform autoscaling updates.
|
||||
AUTOSCALER_UPDATE_INTERVAL_S = env_integer("AUTOSCALER_UPDATE_INTERVAL_S", 5)
|
||||
|
||||
# The autoscaler will attempt to restart Ray on nodes it hasn't heard from
|
||||
# in more than this interval.
|
||||
AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S",
|
||||
30)
|
||||
|
||||
# The reporter will report its statistics this often (milliseconds).
|
||||
REPORTER_UPDATE_INTERVAL_MS = env_integer("REPORTER_UPDATE_INTERVAL_MS", 2500)
|
||||
|
||||
# Max number of retries to AWS (default is 5, time increases exponentially)
|
||||
BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
|
||||
# Max number of retries to create an EC2 node (retry different subnet)
|
||||
BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5)
|
||||
|
||||
LOGGER_FORMAT = (
|
||||
"%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s")
|
||||
LOGGER_FORMAT_HELP = f"The logging format. default='{LOGGER_FORMAT}'"
|
||||
|
||||
Reference in New Issue
Block a user