[autoscaler] Split autoscaler interface public private (#10898)

This commit is contained in:
Eric Liang
2020-09-18 18:16:23 -07:00
committed by GitHub
parent 9a07c7b963
commit 6a227ae501
56 changed files with 436 additions and 251 deletions
+5
View File
@@ -0,0 +1,5 @@
## Note on interface stability.
All the public Python methods and attributes declared in this package can be considered stable public interfaces (except for those in the _private package).
We also guarantee backwards compatibility for the cluster YAMLs.
@@ -12,16 +12,17 @@ import yaml
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
from ray.autoscaler.node_provider import get_node_provider
from ray.autoscaler.node_provider import _get_node_provider
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS,
TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE,
NODE_KIND_WORKER, NODE_KIND_UNMANAGED)
from ray.autoscaler.updater import NodeUpdaterThread
from ray.autoscaler.node_launcher import NodeLauncher
from ray.autoscaler.resource_demand_scheduler import ResourceDemandScheduler
from ray.autoscaler.util import ConcurrentCounter, validate_config, \
from ray.autoscaler._private.updater import NodeUpdaterThread
from ray.autoscaler._private.node_launcher import NodeLauncher
from ray.autoscaler._private.resource_demand_scheduler import \
ResourceDemandScheduler
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \
@@ -305,8 +306,8 @@ class StandardAutoscaler:
self.runtime_hash = new_runtime_hash
self.file_mounts_contents_hash = new_file_mounts_contents_hash
if not self.provider:
self.provider = get_node_provider(self.config["provider"],
self.config["cluster_name"])
self.provider = _get_node_provider(self.config["provider"],
self.config["cluster_name"])
# Check whether we can enable the resource demand scheduler.
if "available_node_types" in self.config:
self.available_node_types = self.config["available_node_types"]
@@ -579,8 +580,3 @@ class StandardAutoscaler:
self.provider.terminate_nodes(nodes)
logger.error("StandardAutoscaler: terminated {} node(s)".format(
len(nodes)))
def request_resources(num_cpus=None, num_gpus=None):
raise DeprecationWarning(
"Please use ray.autoscaler.commands.request_resources instead.")
@@ -13,10 +13,11 @@ import botocore
from ray.ray_constants import BOTO_MAX_RETRIES
from ray.autoscaler.tags import NODE_KIND_WORKER, NODE_KIND_HEAD
from ray.autoscaler.aws.utils import LazyDefaultDict, handle_boto_error
from ray.autoscaler.node_provider import PROVIDER_PRETTY_NAMES
from ray.autoscaler.node_provider import _PROVIDER_PRETTY_NAMES
from ray.autoscaler._private.aws.utils import LazyDefaultDict, \
handle_boto_error
from ray.autoscaler._private.cli_logger import cli_logger
from ray.autoscaler.cli_logger import cli_logger
import colorful as cf
logger = logging.getLogger(__name__)
@@ -100,7 +101,7 @@ def _arn_to_name(arn):
def log_to_cli(config):
provider_name = PROVIDER_PRETTY_NAMES.get("aws", None)
provider_name = _PROVIDER_PRETTY_NAMES.get("aws", None)
cli_logger.doassert(provider_name is not None,
"Could not find a pretty name for the AWS provider.")
@@ -9,14 +9,14 @@ import botocore
from botocore.config import Config
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.aws.config import bootstrap_aws
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE
from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES
from ray.autoscaler.log_timer import LogTimer
from ray.autoscaler._private.aws.config import bootstrap_aws
from ray.autoscaler._private.log_timer import LogTimer
from ray.autoscaler.aws.utils import boto_exception_handler
from ray.autoscaler.cli_logger import cli_logger
from ray.autoscaler._private.aws.utils import boto_exception_handler
from ray.autoscaler._private.cli_logger import cli_logger
import colorful as cf
logger = logging.getLogger(__name__)
@@ -1,6 +1,6 @@
from collections import defaultdict
from ray.autoscaler.cli_logger import cli_logger
from ray.autoscaler._private.cli_logger import cli_logger
import colorful as cf
@@ -13,8 +13,8 @@ from azure.mgmt.resource.resources.models import DeploymentMode
from knack.util import CLIError
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.azure.config import bootstrap_azure
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME
from ray.autoscaler._private.azure.config import bootstrap_azure
VM_NAME_MAX_LEN = 64
VM_NAME_UUID_LEN = 8
@@ -4,7 +4,7 @@
# function for demonstration purposes. Primarily useful for tuning color and
# other formatting.
from ray.autoscaler.cli_logger import cli_logger
from ray.autoscaler._private.cli_logger import cli_logger
import colorful as cf
cli_logger.old_style = False
@@ -11,18 +11,18 @@ import sys
import time
import warnings
from ray.autoscaler.docker import check_bind_mounts_cmd, \
from ray.autoscaler._private.docker import check_bind_mounts_cmd, \
check_docker_running_cmd, \
check_docker_image, \
docker_start_cmds, \
DOCKER_MOUNT_PREFIX, \
with_docker_exec
from ray.autoscaler.log_timer import LogTimer
from ray.autoscaler._private.log_timer import LogTimer
from ray.autoscaler.subprocess_output_util import (
from ray.autoscaler._private.subprocess_output_util import (
run_cmd_redirected, ProcessRunnerError, is_output_redirected)
from ray.autoscaler.cli_logger import cli_logger
from ray.autoscaler._private.cli_logger import cli_logger
import colorful as cf
logger = logging.getLogger(__name__)
@@ -9,7 +9,7 @@ import sys
import subprocess
import tempfile
import time
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List
import click
import yaml
@@ -20,26 +20,23 @@ except ImportError: # py2
from ray.experimental.internal_kv import _internal_kv_get
import ray.services as services
from ray.autoscaler.util import validate_config, hash_runtime_conf, \
from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL
from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \
hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \
DEBUG_AUTOSCALING_STATUS
from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS, \
PROVIDER_PRETTY_NAMES, try_get_log_state, try_logging_config, \
try_reload_log_state
from ray.autoscaler.node_provider import _get_node_provider, \
_NODE_PROVIDERS, _PROVIDER_PRETTY_NAMES
from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \
TAG_RAY_NODE_NAME, NODE_KIND_WORKER, NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE
from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL
from ray.autoscaler.updater import NodeUpdaterThread
from ray.autoscaler.command_runner import set_using_login_shells, \
from ray.autoscaler._private.cli_logger import cli_logger
from ray.autoscaler._private.updater import NodeUpdaterThread
from ray.autoscaler._private.command_runner import set_using_login_shells, \
set_rsync_silent
from ray.autoscaler.log_timer import LogTimer
from ray.autoscaler._private.log_timer import LogTimer
from ray.worker import global_worker
from ray.util.debug import log_once
import ray.autoscaler.subprocess_output_util as cmd_output_util
from ray.autoscaler.cli_logger import cli_logger
import ray.autoscaler._private.subprocess_output_util as cmd_output_util
logger = logging.getLogger(__name__)
@@ -59,6 +56,26 @@ def _redis():
return redis_client
def try_logging_config(config):
if config["provider"]["type"] == "aws":
from ray.autoscaler._private.aws.config import log_to_cli
log_to_cli(config)
def try_get_log_state(provider_config):
if provider_config["type"] == "aws":
from ray.autoscaler._private.aws.config import get_log_state
return get_log_state()
def try_reload_log_state(provider_config, log_state):
if not log_state:
return
if provider_config["type"] == "aws":
from ray.autoscaler._private.aws.config import reload_log_state
return reload_log_state(log_state)
def debug_status():
"""Return a debug string for the autoscaler."""
status = _internal_kv_get(DEBUG_AUTOSCALING_STATUS)
@@ -143,14 +160,14 @@ def create_or_update_cluster(config_file: str,
# todo: validate file_mounts, ssh keys, etc.
importer = NODE_PROVIDERS.get(config["provider"]["type"])
importer = _NODE_PROVIDERS.get(config["provider"]["type"])
if not importer:
cli_logger.abort(
"Unknown provider type " + cf.bold("{}") + "\n"
"Available providers are: {}", config["provider"]["type"],
cli_logger.render_list([
k for k in NODE_PROVIDERS.keys()
if NODE_PROVIDERS[k] is not None
k for k in _NODE_PROVIDERS.keys()
if _NODE_PROVIDERS[k] is not None
]))
raise NotImplementedError("Unsupported provider {}".format(
config["provider"]))
@@ -236,7 +253,7 @@ def _bootstrap_config(config: Dict[str, Any],
config_cache.get("_version", "none"), CONFIG_CACHE_VERSION)
validate_config(config)
importer = NODE_PROVIDERS.get(config["provider"]["type"])
importer = _NODE_PROVIDERS.get(config["provider"]["type"])
if not importer:
raise NotImplementedError("Unsupported provider {}".format(
config["provider"]))
@@ -245,7 +262,7 @@ def _bootstrap_config(config: Dict[str, Any],
with cli_logger.timed(
"Checking {} environment settings",
PROVIDER_PRETTY_NAMES.get(config["provider"]["type"])):
_PROVIDER_PRETTY_NAMES.get(config["provider"]["type"])):
resolved_config = provider_cls.bootstrap_config(config)
if not no_config_cache:
@@ -298,7 +315,7 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
cli_logger.old_exception(
logger, "Ignoring error attempting a clean shutdown.")
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
def remaining_nodes():
@@ -402,7 +419,7 @@ def kill_node(config_file, yes, hard, override_cluster_name):
cli_logger.confirm(yes, "A random node will be killed.")
cli_logger.old_confirm("This will kill a node in your cluster", yes)
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
nodes = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
@@ -491,8 +508,8 @@ def get_or_create_head_node(config,
_provider=None,
_runner=subprocess):
"""Create the cluster head node, which in turn creates the workers."""
provider = (_provider or get_node_provider(config["provider"],
config["cluster_name"]))
provider = (_provider or _get_node_provider(config["provider"],
config["cluster_name"]))
config = copy.deepcopy(config)
config_file = os.path.abspath(config_file)
@@ -793,7 +810,7 @@ def attach_cluster(config_file: str,
def exec_cluster(config_file: str,
*,
cmd: Any = None,
cmd: str = None,
run_env: str = "auto",
screen: bool = False,
tmux: bool = False,
@@ -833,7 +850,7 @@ def exec_cluster(config_file: str,
head_node = _get_head_node(
config, config_file, override_cluster_name, create_if_needed=start)
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
updater = NodeUpdaterThread(
node_id=head_node,
@@ -955,7 +972,7 @@ def rsync(config_file: str,
is_file_mount = True
break
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
nodes = []
if all_nodes:
@@ -1010,7 +1027,7 @@ def get_head_node_ip(config_file: str,
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
head_node = _get_head_node(config, config_file, override_cluster_name)
if config.get("provider", {}).get("use_internal_ips", False) is True:
@@ -1024,14 +1041,14 @@ def get_head_node_ip(config_file: str,
def get_worker_node_ips(config_file: str,
override_cluster_name: Optional[str]) -> str:
override_cluster_name: Optional[str]) -> List[str]:
"""Returns worker node IPs for given configuration file."""
config = yaml.safe_load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
nodes = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
@@ -1051,7 +1068,7 @@ def _get_worker_nodes(config, override_cluster_name):
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
return provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
@@ -1064,7 +1081,7 @@ def _get_head_node(config: Dict[str, Any],
config_file: str,
override_cluster_name: Optional[str],
create_if_needed: bool = False) -> str:
provider = get_node_provider(config["provider"], config["cluster_name"])
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
head_node_tags = {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
@@ -4,9 +4,9 @@ import time
import logging
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.gcp.config import bootstrap_gcp
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME
from ray.autoscaler.gcp.config import MAX_POLLS, POLL_INTERVAL, \
from ray.autoscaler._private.gcp.config import bootstrap_gcp
from ray.autoscaler._private.gcp.config import MAX_POLLS, POLL_INTERVAL, \
construct_clients_from_provider_config
logger = logging.getLogger(__name__)
@@ -0,0 +1,48 @@
import kubernetes
from kubernetes.config.config_exception import ConfigException
_configured = False
_core_api = None
_auth_api = None
_extensions_beta_api = None
def _load_config():
global _configured
if _configured:
return
try:
kubernetes.config.load_incluster_config()
except ConfigException:
kubernetes.config.load_kube_config()
_configured = True
def core_api():
global _core_api
if _core_api is None:
_load_config()
_core_api = kubernetes.client.CoreV1Api()
return _core_api
def auth_api():
global _auth_api
if _auth_api is None:
_load_config()
_auth_api = kubernetes.client.RbacAuthorizationV1Api()
return _auth_api
def extensions_beta_api():
global _extensions_beta_api
if _extensions_beta_api is None:
_load_config()
_extensions_beta_api = kubernetes.client.ExtensionsV1beta1Api()
return _extensions_beta_api
log_prefix = "KubernetesNodeProvider: "
@@ -2,7 +2,7 @@ import logging
from kubernetes import client
from ray.autoscaler.kubernetes import auth_api, core_api, log_prefix
from ray.autoscaler._private.kubernetes import auth_api, core_api, log_prefix
logger = logging.getLogger(__name__)
@@ -2,10 +2,11 @@ import logging
from uuid import uuid4
from kubernetes.client.rest import ApiException
from ray.autoscaler.command_runner import KubernetesCommandRunner
from ray.autoscaler.kubernetes import core_api, log_prefix, extensions_beta_api
from ray.autoscaler._private.command_runner import KubernetesCommandRunner
from ray.autoscaler._private.kubernetes import core_api, log_prefix, \
extensions_beta_api
from ray.autoscaler._private.kubernetes.config import bootstrap_kubernetes
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.kubernetes.config import bootstrap_kubernetes
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME
logger = logging.getLogger(__name__)
@@ -6,12 +6,12 @@ import socket
import logging
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.local.config import bootstrap_local
from ray.autoscaler.tags import (
TAG_RAY_NODE_KIND,
NODE_KIND_WORKER,
NODE_KIND_HEAD,
)
from ray.autoscaler._private.local.config import bootstrap_local
logger = logging.getLogger(__name__)
@@ -1,7 +1,7 @@
import datetime
import logging
from ray.autoscaler.cli_logger import cli_logger
from ray.autoscaler._private.cli_logger import cli_logger
logger = logging.getLogger(__name__)
@@ -7,7 +7,7 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS,
TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME,
TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED,
NODE_KIND_WORKER)
from ray.autoscaler.util import hash_launch_conf
from ray.autoscaler._private.util import hash_launch_conf
logger = logging.getLogger(__name__)
@@ -5,7 +5,7 @@ import tempfile
import time
import sys
from ray.autoscaler.cli_logger import cli_logger
from ray.autoscaler._private.cli_logger import cli_logger
import colorful as cf
CONN_REFUSED_PATIENCE = 30 # how long to wait for sshd to run
@@ -10,13 +10,12 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \
TAG_RAY_FILE_MOUNTS_CONTENTS, \
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \
STATUS_SETTING_UP, STATUS_SYNCING_FILES
from ray.autoscaler.command_runner import NODE_START_WAIT_S, \
from ray.autoscaler._private.command_runner import NODE_START_WAIT_S, \
ProcessRunnerError
from ray.autoscaler.log_timer import LogTimer
from ray.autoscaler._private.log_timer import LogTimer
from ray.autoscaler._private.cli_logger import cli_logger
import ray.autoscaler._private.subprocess_output_util as cmd_output_util
import ray.autoscaler.subprocess_output_util as cmd_output_util
from ray.autoscaler.cli_logger import cli_logger
from ray import ray_constants
import colorful as cf
@@ -8,8 +8,8 @@ from typing import Any, Dict
import ray
import ray.services as services
from ray.autoscaler.node_provider import get_default_config
from ray.autoscaler.docker import validate_docker_config
from ray.autoscaler.node_provider import _get_default_config
from ray.autoscaler._private.docker import validate_docker_config
REQUIRED, OPTIONAL = True, False
RAY_SCHEMA_PATH = os.path.join(
@@ -95,7 +95,7 @@ def prepare_config(config):
def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
defaults = get_default_config(config["provider"])
defaults = _get_default_config(config["provider"])
defaults.update(config)
defaults["auth"] = defaults.get("auth", {})
return defaults
@@ -1,48 +0,0 @@
import kubernetes
from kubernetes.config.config_exception import ConfigException
_configured = False
_core_api = None
_auth_api = None
_extensions_beta_api = None
def _load_config():
global _configured
if _configured:
return
try:
kubernetes.config.load_incluster_config()
except ConfigException:
kubernetes.config.load_kube_config()
_configured = True
def core_api():
global _core_api
if _core_api is None:
_load_config()
_core_api = kubernetes.client.CoreV1Api()
return _core_api
def auth_api():
global _auth_api
if _auth_api is None:
_load_config()
_auth_api = kubernetes.client.RbacAuthorizationV1Api()
return _auth_api
def extensions_beta_api():
global _extensions_beta_api
if _extensions_beta_api is None:
_load_config()
_extensions_beta_api = kubernetes.client.ExtensionsV1beta1Api()
return _extensions_beta_api
log_prefix = "KubernetesNodeProvider: "
@@ -10,7 +10,7 @@ from http.server import SimpleHTTPRequestHandler, HTTPServer
import json
import socket
from ray.autoscaler.local.node_provider import LocalNodeProvider
from ray.autoscaler._private.local.node_provider import LocalNodeProvider
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
+44 -64
View File
@@ -5,126 +5,106 @@ from typing import Any, Dict
import yaml
from ray.autoscaler.command_runner import SSHCommandRunner, DockerCommandRunner
from ray.autoscaler._private.command_runner import \
SSHCommandRunner, DockerCommandRunner
logger = logging.getLogger(__name__)
def import_aws(provider_config):
from ray.autoscaler.aws.node_provider import AWSNodeProvider
def _import_aws(provider_config):
from ray.autoscaler._private.aws.node_provider import AWSNodeProvider
return AWSNodeProvider
def import_gcp(provider_config):
from ray.autoscaler.gcp.node_provider import GCPNodeProvider
def _import_gcp(provider_config):
from ray.autoscaler._private.gcp.node_provider import GCPNodeProvider
return GCPNodeProvider
def import_azure(provider_config):
from ray.autoscaler.azure.node_provider import AzureNodeProvider
def _import_azure(provider_config):
from ray.autoscaler._private.azure.node_provider import AzureNodeProvider
return AzureNodeProvider
def import_local(provider_config):
def _import_local(provider_config):
if "coordinator_address" in provider_config:
from ray.autoscaler.local.coordinator_node_provider import (
from ray.autoscaler._private.local.coordinator_node_provider import (
CoordinatorSenderNodeProvider)
return CoordinatorSenderNodeProvider
else:
from ray.autoscaler.local.node_provider import LocalNodeProvider
from ray.autoscaler._private.local.node_provider import \
LocalNodeProvider
return LocalNodeProvider
def import_kubernetes(provider_config):
from ray.autoscaler.kubernetes.node_provider import KubernetesNodeProvider
def _import_kubernetes(provider_config):
from ray.autoscaler._private.kubernetes.node_provider import \
KubernetesNodeProvider
return KubernetesNodeProvider
def load_local_example_config():
def _load_local_example_config():
import ray.autoscaler.local as ray_local
return os.path.join(
os.path.dirname(ray_local.__file__), "example-full.yaml")
def load_kubernetes_example_config():
def _load_kubernetes_example_config():
import ray.autoscaler.kubernetes as ray_kubernetes
return os.path.join(
os.path.dirname(ray_kubernetes.__file__), "example-full.yaml")
def load_aws_example_config():
def _load_aws_example_config():
import ray.autoscaler.aws as ray_aws
return os.path.join(os.path.dirname(ray_aws.__file__), "example-full.yaml")
def load_gcp_example_config():
def _load_gcp_example_config():
import ray.autoscaler.gcp as ray_gcp
return os.path.join(os.path.dirname(ray_gcp.__file__), "example-full.yaml")
def load_azure_example_config():
def _load_azure_example_config():
import ray.autoscaler.azure as ray_azure
return os.path.join(
os.path.dirname(ray_azure.__file__), "example-full.yaml")
def import_external(provider_config):
provider_cls = load_class(path=provider_config["module"])
def _import_external(provider_config):
provider_cls = _load_class(path=provider_config["module"])
return provider_cls
NODE_PROVIDERS = {
"local": import_local,
"aws": import_aws,
"gcp": import_gcp,
"azure": import_azure,
"kubernetes": import_kubernetes,
"docker": None,
"external": import_external # Import an external module
_NODE_PROVIDERS = {
"local": _import_local,
"aws": _import_aws,
"gcp": _import_gcp,
"azure": _import_azure,
"kubernetes": _import_kubernetes,
"external": _import_external # Import an external module
}
PROVIDER_PRETTY_NAMES = {
_PROVIDER_PRETTY_NAMES = {
"local": "Local",
"aws": "AWS",
"gcp": "GCP",
"azure": "Azure",
"kubernetes": "Kubernetes",
# "docker": "Docker", # not supported
"external": "External"
}
DEFAULT_CONFIGS = {
"local": load_local_example_config,
"aws": load_aws_example_config,
"gcp": load_gcp_example_config,
"azure": load_azure_example_config,
"kubernetes": load_kubernetes_example_config,
"docker": None,
_DEFAULT_CONFIGS = {
"local": _load_local_example_config,
"aws": _load_aws_example_config,
"gcp": _load_gcp_example_config,
"azure": _load_azure_example_config,
"kubernetes": _load_kubernetes_example_config,
}
def try_logging_config(config):
if config["provider"]["type"] == "aws":
from ray.autoscaler.aws.config import log_to_cli
log_to_cli(config)
def try_get_log_state(provider_config):
if provider_config["type"] == "aws":
from ray.autoscaler.aws.config import get_log_state
return get_log_state()
def try_reload_log_state(provider_config, log_state):
if not log_state:
return
if provider_config["type"] == "aws":
from ray.autoscaler.aws.config import reload_log_state
return reload_log_state(log_state)
def load_class(path):
"""
Load a class at runtime given a full path.
def _load_class(path):
"""Load a class at runtime given a full path.
Example of the path: mypkg.mysubpkg.myclass
"""
@@ -138,9 +118,9 @@ def load_class(path):
return getattr(module, class_str)
def get_node_provider(provider_config: Dict[str, Any],
cluster_name: str) -> Any:
importer = NODE_PROVIDERS.get(provider_config["type"])
def _get_node_provider(provider_config: Dict[str, Any],
cluster_name: str) -> Any:
importer = _NODE_PROVIDERS.get(provider_config["type"])
if importer is None:
raise NotImplementedError("Unsupported node provider: {}".format(
provider_config["type"]))
@@ -148,10 +128,10 @@ def get_node_provider(provider_config: Dict[str, Any],
return provider_cls(provider_config, cluster_name)
def get_default_config(provider_config):
def _get_default_config(provider_config):
if provider_config["type"] == "external":
return {}
load_config = DEFAULT_CONFIGS.get(provider_config["type"])
load_config = _DEFAULT_CONFIGS.get(provider_config["type"])
if load_config is None:
raise NotImplementedError("Unsupported node provider: {}".format(
provider_config["type"]))
+181
View File
@@ -0,0 +1,181 @@
"""IMPORTANT: this is an experimental interface and not currently stable."""
from typing import Optional, List, Union
import json
import os
import tempfile
from ray.autoscaler._private import commands
def create_or_update_cluster(cluster_config: Union[dict, str],
*,
no_restart: bool = False,
restart_only: bool = False,
no_config_cache: bool = False) -> None:
"""Create or updates an autoscaling Ray cluster from a config json.
Args:
cluster_config (Union[str, dict]): Either the config dict of the
cluster, or a path pointing to a file containing the config.
no_restart (bool): Whether to skip restarting Ray services during the
update. This avoids interrupting running jobs and can be used to
dynamically adjust autoscaler configuration.
restart_only (bool): Whether to skip running setup commands and only
restart Ray. This cannot be used with 'no-restart'.
no_config_cache (bool): Whether to disable the config cache and fully
resolve all environment settings from the Cloud provider again.
"""
return commands.create_or_update_cluster(
config_file=_as_config_file(cluster_config),
override_min_workers=None,
override_max_workers=None,
no_restart=no_restart,
restart_only=restart_only,
yes=True,
override_cluster_name=None,
no_config_cache=no_config_cache,
redirect_command_output=None,
use_login_shells=True)
def teardown_cluster(cluster_config: Union[dict, str]) -> None:
"""Destroys all nodes of a Ray cluster described by a config json.
Args:
cluster_config (Union[str, dict]): Either the config dict of the
cluster, or a path pointing to a file containing the config.
"""
return commands.teardown_cluster(
config_file=_as_config_file(cluster_config),
yes=True,
workers_only=False,
override_cluster_name=None,
keep_min_workers=False)
def run_on_cluster(cluster_config: Union[dict, str],
*,
cmd: Optional[str] = None,
run_env: str = "auto",
no_config_cache: bool = False,
port_forward: Union[int, List[int]] = None,
with_output: bool = False) -> str:
"""Runs a command on the specified cluster.
Args:
cluster_config (Union[str, dict]): Either the config dict of the
cluster, or a path pointing to a file containing the config.
cmd (str): the command to run, or None for a no-op command.
run_env (str): whether to run the command on the host or in a
container. Select between "auto", "host" and "docker".
no_config_cache (bool): Whether to disable the config cache and fully
resolve all environment settings from the Cloud provider again.
port_forward (int or list[int]): port(s) to forward.
with_output (bool): Whether to capture command output.
Returns:
The output of the command as a string.
"""
return commands.exec_cluster(
_as_config_file(cluster_config),
cmd=cmd,
run_env=run_env,
screen=False,
tmux=False,
stop=False,
start=False,
override_cluster_name=None,
no_config_cache=no_config_cache,
port_forward=port_forward,
with_output=with_output)
def rsync(cluster_config: Union[dict, str],
*,
source: str,
target: str,
down: bool,
no_config_cache: bool = False):
"""Rsyncs files to or from the cluster.
Args:
cluster_config (Union[str, dict]): Either the config dict of the
cluster, or a path pointing to a file containing the config.
source (str): rsync source argument.
target (str): rsync target argument.
down (bool): whether we're syncing remote -> local.
no_config_cache (bool): Whether to disable the config cache and fully
resolve all environment settings from the Cloud provider again.
Raises:
RuntimeError if the cluster head node is not found.
"""
return commands.rsync(
config_file=_as_config_file(cluster_config),
source=source,
target=target,
override_cluster_name=None,
down=down,
no_config_cache=no_config_cache,
all_nodes=False)
def get_head_node_ip(cluster_config: Union[dict, str]) -> str:
"""Returns head node IP for given configuration file if exists.
Args:
cluster_config (Union[str, dict]): Either the config dict of the
cluster, or a path pointing to a file containing the config.
Returns:
The ip address of the cluster head node.
Raises:
RuntimeError if the cluster is not found.
"""
return commands.get_head_node_ip(_as_config_file(cluster_config))
def get_worker_node_ips(cluster_config: Union[dict, str]) -> List[str]:
"""Returns worker node IPs for given configuration file.
Args:
cluster_config (Union[str, dict]): Either the config dict of the
cluster, or a path pointing to a file containing the config.
Returns:
List of worker node ip addresses.
Raises:
RuntimeError if the cluster is not found.
"""
return commands.get_worker_node_ips(_as_config_file(cluster_config))
def request_resources(num_cpus=None, bundles=None):
"""Remotely request some CPU or GPU resources from the autoscaler.
This function is to be called e.g. on a node before submitting a bunch of
ray.remote calls to ensure that resources rapidly become available.
This function is EXPERIMENTAL.
Args:
num_cpus: int -- the number of CPU cores to request
bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This
only has an effect if you've configured `available_node_types`
if your cluster config.
"""
return commands.request_resources(num_cpus, bundles)
def _as_config_file(cluster_config: Union[dict, str]):
if isinstance(cluster_config, dict):
tmp = tempfile.NamedTemporaryFile("w", prefix="autoscaler-sdk-tmp-")
tmp.write(json.dumps(cluster_config))
tmp.flush()
cluster_config = tmp.name
if not os.path.exists(cluster_config):
raise ValueError("Cluster config not found {}".format(cluster_config))
return cluster_config
+7 -6
View File
@@ -6,13 +6,13 @@ import traceback
import json
import ray
from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler.load_metrics import LoadMetrics
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.commands import teardown_cluster
from ray.autoscaler._private.load_metrics import LoadMetrics
import ray.gcs_utils
import ray.utils
import ray.ray_constants as ray_constants
from ray.utils import binary_to_hex, setup_logger
from ray.autoscaler.commands import teardown_cluster
import redis
logger = logging.getLogger(__name__)
@@ -160,9 +160,10 @@ class Monitor:
binary_to_hex(job_id)))
def autoscaler_resource_request_handler(self, _, data):
"""Handle a notification of a resource request for the autoscaler. This channel
and method are only used by the manual
`ray.autoscaler.commands.request_resources` api.
"""Handle a notification of a resource request for the autoscaler.
This channel and method are only used by the manual
`ray.autoscaler.sdk.request_resources` api.
Args:
channel: unused
+2 -2
View File
@@ -14,14 +14,14 @@ import yaml
import ray
import psutil
import ray.services as services
from ray.autoscaler.commands import (
from ray.autoscaler._private.commands import (
attach_cluster, exec_cluster, create_or_update_cluster, monitor_cluster,
rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips,
debug_status, RUN_ENV_TYPES)
import ray.ray_constants as ray_constants
import ray.utils
from ray.autoscaler.cli_logger import cli_logger
from ray.autoscaler._private.cli_logger import cli_logger
import colorful as cf
logger = logging.getLogger(__name__)
+1 -1
View File
@@ -1,6 +1,6 @@
import pytest
from ray.autoscaler.aws.config import _resource_cache
from ray.autoscaler._private.aws.config import _resource_cache
from botocore.stub import Stubber
+10 -9
View File
@@ -4,14 +4,14 @@ from datetime import datetime
# Override global constants used in AWS autoscaler config artifact names.
# This helps ensure that any unmocked test doesn't alter non-test artifacts.
ray.autoscaler.aws.config.RAY = \
ray.autoscaler._private.aws.config.RAY = \
"ray-autoscaler-aws-test"
ray.autoscaler.aws.config.DEFAULT_RAY_INSTANCE_PROFILE = \
ray.autoscaler.aws.config.RAY + "-v1"
ray.autoscaler.aws.config.DEFAULT_RAY_IAM_ROLE = \
ray.autoscaler.aws.config.RAY + "-v1"
ray.autoscaler.aws.config.SECURITY_GROUP_TEMPLATE = \
ray.autoscaler.aws.config.RAY + "-{}"
ray.autoscaler._private.aws.config.DEFAULT_RAY_INSTANCE_PROFILE = \
ray.autoscaler._private.aws.config.RAY + "-v1"
ray.autoscaler._private.aws.config.DEFAULT_RAY_IAM_ROLE = \
ray.autoscaler._private.aws.config.RAY + "-v1"
ray.autoscaler._private.aws.config.SECURITY_GROUP_TEMPLATE = \
ray.autoscaler._private.aws.config.RAY + "-{}"
# Default IAM instance profile to expose to tests.
DEFAULT_INSTANCE_PROFILE = {
@@ -35,7 +35,7 @@ DEFAULT_INSTANCE_PROFILE = {
# Default EC2 key pair to expose to tests.
DEFAULT_KEY_PAIR = {
"KeyFingerprint": "00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00",
"KeyName": ray.autoscaler.aws.config.RAY + "_us-west-2",
"KeyName": ray.autoscaler._private.aws.config.RAY + "_us-west-2",
}
# Primary EC2 subnet to expose to tests.
@@ -69,7 +69,8 @@ DEFAULT_CLUSTER_NAME = "test-cluster-name"
# (prior to inbound rule configuration).
DEFAULT_SG = {
"Description": "Auto-created security group for Ray workers",
"GroupName": ray.autoscaler.aws.config.RAY + "-" + DEFAULT_CLUSTER_NAME,
"GroupName": ray.autoscaler._private.aws.config.RAY + "-" +
DEFAULT_CLUSTER_NAME,
"OwnerId": "test-owner",
"GroupId": "sg-1234abcd",
"VpcId": DEFAULT_SUBNET["VpcId"],
+3 -2
View File
@@ -2,11 +2,12 @@ import os
import yaml
import ray
from ray.autoscaler.commands import prepare_config, validate_config
from ray.autoscaler._private.commands import prepare_config, validate_config
from ray.tests.aws.utils.constants import DEFAULT_CLUSTER_NAME
def get_aws_example_config_file_path(file_name):
import ray.autoscaler.aws
return os.path.join(
os.path.dirname(ray.autoscaler.aws.__file__), file_name)
@@ -20,7 +21,7 @@ def bootstrap_aws_config(config):
config = prepare_config(config)
validate_config(config)
config["cluster_name"] = DEFAULT_CLUSTER_NAME
return ray.autoscaler.aws.config.bootstrap_aws(config)
return ray.autoscaler._private.aws.config.bootstrap_aws(config)
def bootstrap_aws_example_config_file(file_name):
+1 -1
View File
@@ -1,4 +1,4 @@
from ray.autoscaler.aws.config import key_pair
from ray.autoscaler._private.aws.config import key_pair
from ray.tests.aws.utils.constants import DEFAULT_KEY_PAIR
+1 -1
View File
@@ -12,7 +12,7 @@ def configure_iam_role_default(iam_client_stub):
iam_client_stub.add_response(
"get_instance_profile",
expected_params={
"InstanceProfileName": ray.autoscaler.aws.config.
"InstanceProfileName": ray.autoscaler._private.aws.config.
DEFAULT_RAY_INSTANCE_PROFILE
},
service_response={"InstanceProfile": DEFAULT_INSTANCE_PROFILE})
+9 -9
View File
@@ -10,14 +10,14 @@ from jsonschema.exceptions import ValidationError
import ray
import ray.services as services
from ray.autoscaler.util import prepare_config, validate_config
from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.docker import DOCKER_MOUNT_PREFIX
from ray.autoscaler.load_metrics import LoadMetrics
from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler._private.util import prepare_config, validate_config
from ray.autoscaler._private.commands import get_or_create_head_node
from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE
from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider
from ray.autoscaler.node_provider import _NODE_PROVIDERS, NodeProvider
from ray.test_utils import RayTestTimeoutException
import pytest
@@ -319,14 +319,14 @@ class LoadMetricsTest(unittest.TestCase):
class AutoscalingTest(unittest.TestCase):
def setUp(self):
NODE_PROVIDERS["mock"] = \
_NODE_PROVIDERS["mock"] = \
lambda config: self.create_provider
self.provider = None
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
self.provider = None
del NODE_PROVIDERS["mock"]
del _NODE_PROVIDERS["mock"]
shutil.rmtree(self.tmpdir)
ray.shutdown()
@@ -1327,7 +1327,7 @@ class AutoscalingTest(unittest.TestCase):
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
# Simulate a second `ray up` call
from ray.autoscaler import util
from ray.autoscaler._private import util
util._hash_cache = {}
runner = MockProcessRunner()
lm = LoadMetrics()
+1 -1
View File
@@ -6,7 +6,7 @@ import unittest
import urllib
import yaml
from ray.autoscaler.util import prepare_config, validate_config
from ray.autoscaler._private.util import prepare_config, validate_config
from ray.test_utils import recursive_fnmatch
RAY_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
+1 -1
View File
@@ -34,7 +34,7 @@ from click.testing import CliRunner
from testfixtures import Replacer
from testfixtures.popen import MockPopen, PopenBehaviour
import ray.autoscaler.aws.config as aws_config
import ray.autoscaler._private.aws.config as aws_config
import ray.scripts.scripts as scripts
+2 -2
View File
@@ -1,9 +1,9 @@
import pytest
from ray.tests.test_autoscaler import MockProvider, MockProcessRunner
from ray.autoscaler.command_runner import CommandRunnerInterface, \
from ray.autoscaler._private.command_runner import CommandRunnerInterface, \
SSHCommandRunner, _with_environment_variables, DockerCommandRunner, \
KubernetesCommandRunner
from ray.autoscaler.docker import DOCKER_MOUNT_PREFIX
from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX
from getpass import getuser
import hashlib
+17 -17
View File
@@ -3,10 +3,10 @@ import unittest
import socket
import json
from ray.autoscaler.node_provider import _NODE_PROVIDERS, _get_node_provider
from ray.autoscaler.local.coordinator_server import OnPremCoordinatorServer
from ray.autoscaler.node_provider import NODE_PROVIDERS, get_node_provider
from ray.autoscaler.local.node_provider import LocalNodeProvider
from ray.autoscaler.local.coordinator_node_provider import (
from ray.autoscaler._private.local.node_provider import LocalNodeProvider
from ray.autoscaler._private.local.coordinator_node_provider import (
CoordinatorSenderNodeProvider)
from ray.autoscaler.tags import (TAG_RAY_NODE_KIND, TAG_RAY_CLUSTER_NAME,
TAG_RAY_NODE_NAME, NODE_KIND_WORKER,
@@ -35,10 +35,10 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
"""Check correct import when coordinator_address is in config yaml."""
provider_config = {"coordinator_address": "fake_address:1234"}
coordinator_node_provider = NODE_PROVIDERS.get("local")(
coordinator_node_provider = _NODE_PROVIDERS.get("local")(
provider_config)
assert coordinator_node_provider is CoordinatorSenderNodeProvider
local_node_provider = NODE_PROVIDERS.get("local")({})
local_node_provider = _NODE_PROVIDERS.get("local")({})
assert local_node_provider is LocalNodeProvider
def testClusterStateInit(self):
@@ -59,8 +59,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
},
}
provider_config = cluster_config["provider"]
node_provider = get_node_provider(provider_config,
cluster_config["cluster_name"])
node_provider = _get_node_provider(provider_config,
cluster_config["cluster_name"])
assert isinstance(node_provider, LocalNodeProvider)
expected_workers = {}
expected_workers[provider_config["head_ip"]] = {
@@ -85,8 +85,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
# Test removing workers updates the cluster state.
del expected_workers[provider_config["worker_ips"][0]]
removed_ip = provider_config["worker_ips"].pop()
node_provider = get_node_provider(provider_config,
cluster_config["cluster_name"])
node_provider = _get_node_provider(provider_config,
cluster_config["cluster_name"])
workers = json.loads(open(state_save_path).read())
assert workers == expected_workers
@@ -98,8 +98,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
"state": "terminated",
}
provider_config["worker_ips"].append(removed_ip)
node_provider = get_node_provider(provider_config,
cluster_config["cluster_name"])
node_provider = _get_node_provider(provider_config,
cluster_config["cluster_name"])
workers = json.loads(open(state_save_path).read())
assert workers == expected_workers
@@ -162,8 +162,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
"worker_nodes": {},
}
provider_config = cluster_config["provider"]
node_provider_1 = get_node_provider(provider_config,
cluster_config["cluster_name"])
node_provider_1 = _get_node_provider(provider_config,
cluster_config["cluster_name"])
assert isinstance(node_provider_1, CoordinatorSenderNodeProvider)
assert not node_provider_1.non_terminated_nodes({})
@@ -189,8 +189,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
# Add another cluster.
cluster_config["cluster_name"] = "random_name_2"
provider_config = cluster_config["provider"]
node_provider_2 = get_node_provider(provider_config,
cluster_config["cluster_name"])
node_provider_2 = _get_node_provider(provider_config,
cluster_config["cluster_name"])
assert not node_provider_2.non_terminated_nodes({})
assert not node_provider_2.is_running(self.list_of_node_ips[1])
assert node_provider_2.is_terminated(self.list_of_node_ips[1])
@@ -211,8 +211,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
# Add another cluster (should fail because we only have two nodes).
cluster_config["cluster_name"] = "random_name_3"
provider_config = cluster_config["provider"]
node_provider_3 = get_node_provider(provider_config,
cluster_config["cluster_name"])
node_provider_3 = _get_node_provider(provider_config,
cluster_config["cluster_name"])
assert not node_provider_3.non_terminated_nodes(head_node_tags)
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
cluster_config["cluster_name"])
@@ -8,13 +8,14 @@ import unittest
import ray
from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \
MockProcessRunner
from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler.load_metrics import LoadMetrics
from ray.autoscaler.node_provider import NODE_PROVIDERS
from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND
from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
from ray.autoscaler.node_provider import _NODE_PROVIDERS
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.commands import get_or_create_head_node
from ray.autoscaler._private.resource_demand_scheduler import \
_utilization_score, \
get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND
from ray.test_utils import same_elements
from time import sleep
@@ -221,14 +222,14 @@ class LoadMetricsTest(unittest.TestCase):
class AutoscalingTest(unittest.TestCase):
def setUp(self):
NODE_PROVIDERS["mock"] = \
_NODE_PROVIDERS["mock"] = \
lambda config: self.create_provider
self.provider = None
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
self.provider = None
del NODE_PROVIDERS["mock"]
del _NODE_PROVIDERS["mock"]
shutil.rmtree(self.tmpdir)
ray.shutdown()
+1 -1
View File
@@ -4,7 +4,7 @@ import kubernetes
import subprocess
from ray import services, logger
from ray.autoscaler.command_runner import KubernetesCommandRunner
from ray.autoscaler._private.command_runner import KubernetesCommandRunner
from ray.tune.syncer import NodeSyncer
from ray.tune.sync_client import SyncClient
@@ -1,6 +1,6 @@
import unittest
from ray.autoscaler.command_runner import KUBECTL_RSYNC
from ray.autoscaler._private.command_runner import KUBECTL_RSYNC
from ray.tune.integration.kubernetes import KubernetesSyncer, \
KubernetesSyncClient
from ray.tune.syncer import NodeSyncer