diff --git a/BUILD.bazel b/BUILD.bazel index e4a59518c..84db79b10 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1797,6 +1797,7 @@ filegroup( srcs = glob([ "python/ray/*.py", "python/ray/autoscaler/*.py", + "python/ray/autoscaler/_private/*.py", "python/ray/autoscaler/aws/example-full.yaml", "python/ray/autoscaler/azure/example-full.yaml", "python/ray/autoscaler/gcp/example-full.yaml", diff --git a/ci/long_running_distributed_tests/workloads/pytorch_pbt_failure.py b/ci/long_running_distributed_tests/workloads/pytorch_pbt_failure.py index 6ab4ea61c..696855e28 100644 --- a/ci/long_running_distributed_tests/workloads/pytorch_pbt_failure.py +++ b/ci/long_running_distributed_tests/workloads/pytorch_pbt_failure.py @@ -10,7 +10,7 @@ import torchvision.transforms as transforms import ray from ray import tune -from ray.autoscaler.commands import kill_node +from ray.autoscaler._private.commands import kill_node from ray.tune import CLIReporter from ray.tune.ray_trial_executor import RayTrialExecutor from ray.tune.schedulers import PopulationBasedTraining diff --git a/python/ray/autoscaler/README.rst b/python/ray/autoscaler/README.rst new file mode 100644 index 000000000..a47b82d79 --- /dev/null +++ b/python/ray/autoscaler/README.rst @@ -0,0 +1,5 @@ +## Note on interface stability. + +All the public Python methods and attributes declared in this package can be considered stable public interfaces (except for those in the _private package). + +We also guarantee backwards compatibility for the cluster YAMLs. diff --git a/python/ray/autoscaler/_private/__init__.py b/python/ray/autoscaler/_private/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py similarity index 97% rename from python/ray/autoscaler/autoscaler.py rename to python/ray/autoscaler/_private/autoscaler.py index eb907176a..f83cabbb6 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -12,16 +12,17 @@ import yaml from ray.experimental.internal_kv import _internal_kv_put, \ _internal_kv_initialized -from ray.autoscaler.node_provider import get_node_provider +from ray.autoscaler.node_provider import _get_node_provider from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER, NODE_KIND_UNMANAGED) -from ray.autoscaler.updater import NodeUpdaterThread -from ray.autoscaler.node_launcher import NodeLauncher -from ray.autoscaler.resource_demand_scheduler import ResourceDemandScheduler -from ray.autoscaler.util import ConcurrentCounter, validate_config, \ +from ray.autoscaler._private.updater import NodeUpdaterThread +from ray.autoscaler._private.node_launcher import NodeLauncher +from ray.autoscaler._private.resource_demand_scheduler import \ + ResourceDemandScheduler +from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \ with_head_node_ip, hash_launch_conf, hash_runtime_conf, \ DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ @@ -305,8 +306,8 @@ class StandardAutoscaler: self.runtime_hash = new_runtime_hash self.file_mounts_contents_hash = new_file_mounts_contents_hash if not self.provider: - self.provider = get_node_provider(self.config["provider"], - self.config["cluster_name"]) + self.provider = _get_node_provider(self.config["provider"], + self.config["cluster_name"]) # Check whether we can enable the resource demand scheduler. if "available_node_types" in self.config: self.available_node_types = self.config["available_node_types"] @@ -579,8 +580,3 @@ class StandardAutoscaler: self.provider.terminate_nodes(nodes) logger.error("StandardAutoscaler: terminated {} node(s)".format( len(nodes))) - - -def request_resources(num_cpus=None, num_gpus=None): - raise DeprecationWarning( - "Please use ray.autoscaler.commands.request_resources instead.") diff --git a/python/ray/autoscaler/_private/aws/__init__.py b/python/ray/autoscaler/_private/aws/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/autoscaler/aws/config.py b/python/ray/autoscaler/_private/aws/config.py similarity index 98% rename from python/ray/autoscaler/aws/config.py rename to python/ray/autoscaler/_private/aws/config.py index 14a0cac52..7831b9634 100644 --- a/python/ray/autoscaler/aws/config.py +++ b/python/ray/autoscaler/_private/aws/config.py @@ -13,10 +13,11 @@ import botocore from ray.ray_constants import BOTO_MAX_RETRIES from ray.autoscaler.tags import NODE_KIND_WORKER, NODE_KIND_HEAD -from ray.autoscaler.aws.utils import LazyDefaultDict, handle_boto_error -from ray.autoscaler.node_provider import PROVIDER_PRETTY_NAMES +from ray.autoscaler.node_provider import _PROVIDER_PRETTY_NAMES +from ray.autoscaler._private.aws.utils import LazyDefaultDict, \ + handle_boto_error +from ray.autoscaler._private.cli_logger import cli_logger -from ray.autoscaler.cli_logger import cli_logger import colorful as cf logger = logging.getLogger(__name__) @@ -100,7 +101,7 @@ def _arn_to_name(arn): def log_to_cli(config): - provider_name = PROVIDER_PRETTY_NAMES.get("aws", None) + provider_name = _PROVIDER_PRETTY_NAMES.get("aws", None) cli_logger.doassert(provider_name is not None, "Could not find a pretty name for the AWS provider.") diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/_private/aws/node_provider.py similarity index 98% rename from python/ray/autoscaler/aws/node_provider.py rename to python/ray/autoscaler/_private/aws/node_provider.py index 7b43ca63c..4a7301100 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/_private/aws/node_provider.py @@ -9,14 +9,14 @@ import botocore from botocore.config import Config from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.aws.config import bootstrap_aws from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \ TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES -from ray.autoscaler.log_timer import LogTimer +from ray.autoscaler._private.aws.config import bootstrap_aws +from ray.autoscaler._private.log_timer import LogTimer -from ray.autoscaler.aws.utils import boto_exception_handler -from ray.autoscaler.cli_logger import cli_logger +from ray.autoscaler._private.aws.utils import boto_exception_handler +from ray.autoscaler._private.cli_logger import cli_logger import colorful as cf logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/aws/utils.py b/python/ray/autoscaler/_private/aws/utils.py similarity index 98% rename from python/ray/autoscaler/aws/utils.py rename to python/ray/autoscaler/_private/aws/utils.py index 98cab90c5..b75075a4e 100644 --- a/python/ray/autoscaler/aws/utils.py +++ b/python/ray/autoscaler/_private/aws/utils.py @@ -1,6 +1,6 @@ from collections import defaultdict -from ray.autoscaler.cli_logger import cli_logger +from ray.autoscaler._private.cli_logger import cli_logger import colorful as cf diff --git a/python/ray/autoscaler/_private/azure/__init__.py b/python/ray/autoscaler/_private/azure/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/autoscaler/azure/azure-config-template.json b/python/ray/autoscaler/_private/azure/azure-config-template.json similarity index 100% rename from python/ray/autoscaler/azure/azure-config-template.json rename to python/ray/autoscaler/_private/azure/azure-config-template.json diff --git a/python/ray/autoscaler/azure/azure-vm-template.json b/python/ray/autoscaler/_private/azure/azure-vm-template.json similarity index 100% rename from python/ray/autoscaler/azure/azure-vm-template.json rename to python/ray/autoscaler/_private/azure/azure-vm-template.json diff --git a/python/ray/autoscaler/azure/config.py b/python/ray/autoscaler/_private/azure/config.py similarity index 100% rename from python/ray/autoscaler/azure/config.py rename to python/ray/autoscaler/_private/azure/config.py diff --git a/python/ray/autoscaler/azure/node_provider.py b/python/ray/autoscaler/_private/azure/node_provider.py similarity index 99% rename from python/ray/autoscaler/azure/node_provider.py rename to python/ray/autoscaler/_private/azure/node_provider.py index 8833a68bf..b4a160268 100644 --- a/python/ray/autoscaler/azure/node_provider.py +++ b/python/ray/autoscaler/_private/azure/node_provider.py @@ -13,8 +13,8 @@ from azure.mgmt.resource.resources.models import DeploymentMode from knack.util import CLIError from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.azure.config import bootstrap_azure from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME +from ray.autoscaler._private.azure.config import bootstrap_azure VM_NAME_MAX_LEN = 64 VM_NAME_UUID_LEN = 8 diff --git a/python/ray/autoscaler/cli_logger.py b/python/ray/autoscaler/_private/cli_logger.py similarity index 100% rename from python/ray/autoscaler/cli_logger.py rename to python/ray/autoscaler/_private/cli_logger.py diff --git a/python/ray/autoscaler/cli_logger_demoall.py b/python/ray/autoscaler/_private/cli_logger_demoall.py similarity index 95% rename from python/ray/autoscaler/cli_logger_demoall.py rename to python/ray/autoscaler/_private/cli_logger_demoall.py index 88193a675..1f7c1e68b 100755 --- a/python/ray/autoscaler/cli_logger_demoall.py +++ b/python/ray/autoscaler/_private/cli_logger_demoall.py @@ -4,7 +4,7 @@ # function for demonstration purposes. Primarily useful for tuning color and # other formatting. -from ray.autoscaler.cli_logger import cli_logger +from ray.autoscaler._private.cli_logger import cli_logger import colorful as cf cli_logger.old_style = False diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/_private/command_runner.py similarity index 99% rename from python/ray/autoscaler/command_runner.py rename to python/ray/autoscaler/_private/command_runner.py index 0249c856b..7f11ae176 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/_private/command_runner.py @@ -11,18 +11,18 @@ import sys import time import warnings -from ray.autoscaler.docker import check_bind_mounts_cmd, \ +from ray.autoscaler._private.docker import check_bind_mounts_cmd, \ check_docker_running_cmd, \ check_docker_image, \ docker_start_cmds, \ DOCKER_MOUNT_PREFIX, \ with_docker_exec -from ray.autoscaler.log_timer import LogTimer +from ray.autoscaler._private.log_timer import LogTimer -from ray.autoscaler.subprocess_output_util import ( +from ray.autoscaler._private.subprocess_output_util import ( run_cmd_redirected, ProcessRunnerError, is_output_redirected) -from ray.autoscaler.cli_logger import cli_logger +from ray.autoscaler._private.cli_logger import cli_logger import colorful as cf logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/_private/commands.py similarity index 94% rename from python/ray/autoscaler/commands.py rename to python/ray/autoscaler/_private/commands.py index df95c8145..9a6a189cf 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -9,7 +9,7 @@ import sys import subprocess import tempfile import time -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List import click import yaml @@ -20,26 +20,23 @@ except ImportError: # py2 from ray.experimental.internal_kv import _internal_kv_get import ray.services as services -from ray.autoscaler.util import validate_config, hash_runtime_conf, \ +from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL +from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \ hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \ DEBUG_AUTOSCALING_STATUS -from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS, \ - PROVIDER_PRETTY_NAMES, try_get_log_state, try_logging_config, \ - try_reload_log_state +from ray.autoscaler.node_provider import _get_node_provider, \ + _NODE_PROVIDERS, _PROVIDER_PRETTY_NAMES from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \ TAG_RAY_NODE_NAME, NODE_KIND_WORKER, NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE - -from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL -from ray.autoscaler.updater import NodeUpdaterThread -from ray.autoscaler.command_runner import set_using_login_shells, \ +from ray.autoscaler._private.cli_logger import cli_logger +from ray.autoscaler._private.updater import NodeUpdaterThread +from ray.autoscaler._private.command_runner import set_using_login_shells, \ set_rsync_silent -from ray.autoscaler.log_timer import LogTimer +from ray.autoscaler._private.log_timer import LogTimer from ray.worker import global_worker from ray.util.debug import log_once -import ray.autoscaler.subprocess_output_util as cmd_output_util - -from ray.autoscaler.cli_logger import cli_logger +import ray.autoscaler._private.subprocess_output_util as cmd_output_util logger = logging.getLogger(__name__) @@ -59,6 +56,26 @@ def _redis(): return redis_client +def try_logging_config(config): + if config["provider"]["type"] == "aws": + from ray.autoscaler._private.aws.config import log_to_cli + log_to_cli(config) + + +def try_get_log_state(provider_config): + if provider_config["type"] == "aws": + from ray.autoscaler._private.aws.config import get_log_state + return get_log_state() + + +def try_reload_log_state(provider_config, log_state): + if not log_state: + return + if provider_config["type"] == "aws": + from ray.autoscaler._private.aws.config import reload_log_state + return reload_log_state(log_state) + + def debug_status(): """Return a debug string for the autoscaler.""" status = _internal_kv_get(DEBUG_AUTOSCALING_STATUS) @@ -143,14 +160,14 @@ def create_or_update_cluster(config_file: str, # todo: validate file_mounts, ssh keys, etc. - importer = NODE_PROVIDERS.get(config["provider"]["type"]) + importer = _NODE_PROVIDERS.get(config["provider"]["type"]) if not importer: cli_logger.abort( "Unknown provider type " + cf.bold("{}") + "\n" "Available providers are: {}", config["provider"]["type"], cli_logger.render_list([ - k for k in NODE_PROVIDERS.keys() - if NODE_PROVIDERS[k] is not None + k for k in _NODE_PROVIDERS.keys() + if _NODE_PROVIDERS[k] is not None ])) raise NotImplementedError("Unsupported provider {}".format( config["provider"])) @@ -236,7 +253,7 @@ def _bootstrap_config(config: Dict[str, Any], config_cache.get("_version", "none"), CONFIG_CACHE_VERSION) validate_config(config) - importer = NODE_PROVIDERS.get(config["provider"]["type"]) + importer = _NODE_PROVIDERS.get(config["provider"]["type"]) if not importer: raise NotImplementedError("Unsupported provider {}".format( config["provider"])) @@ -245,7 +262,7 @@ def _bootstrap_config(config: Dict[str, Any], with cli_logger.timed( "Checking {} environment settings", - PROVIDER_PRETTY_NAMES.get(config["provider"]["type"])): + _PROVIDER_PRETTY_NAMES.get(config["provider"]["type"])): resolved_config = provider_cls.bootstrap_config(config) if not no_config_cache: @@ -298,7 +315,7 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, cli_logger.old_exception( logger, "Ignoring error attempting a clean shutdown.") - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = _get_node_provider(config["provider"], config["cluster_name"]) try: def remaining_nodes(): @@ -402,7 +419,7 @@ def kill_node(config_file, yes, hard, override_cluster_name): cli_logger.confirm(yes, "A random node will be killed.") cli_logger.old_confirm("This will kill a node in your cluster", yes) - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = _get_node_provider(config["provider"], config["cluster_name"]) try: nodes = provider.non_terminated_nodes({ TAG_RAY_NODE_KIND: NODE_KIND_WORKER @@ -491,8 +508,8 @@ def get_or_create_head_node(config, _provider=None, _runner=subprocess): """Create the cluster head node, which in turn creates the workers.""" - provider = (_provider or get_node_provider(config["provider"], - config["cluster_name"])) + provider = (_provider or _get_node_provider(config["provider"], + config["cluster_name"])) config = copy.deepcopy(config) config_file = os.path.abspath(config_file) @@ -793,7 +810,7 @@ def attach_cluster(config_file: str, def exec_cluster(config_file: str, *, - cmd: Any = None, + cmd: str = None, run_env: str = "auto", screen: bool = False, tmux: bool = False, @@ -833,7 +850,7 @@ def exec_cluster(config_file: str, head_node = _get_head_node( config, config_file, override_cluster_name, create_if_needed=start) - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = _get_node_provider(config["provider"], config["cluster_name"]) try: updater = NodeUpdaterThread( node_id=head_node, @@ -955,7 +972,7 @@ def rsync(config_file: str, is_file_mount = True break - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = _get_node_provider(config["provider"], config["cluster_name"]) try: nodes = [] if all_nodes: @@ -1010,7 +1027,7 @@ def get_head_node_ip(config_file: str, if override_cluster_name is not None: config["cluster_name"] = override_cluster_name - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = _get_node_provider(config["provider"], config["cluster_name"]) try: head_node = _get_head_node(config, config_file, override_cluster_name) if config.get("provider", {}).get("use_internal_ips", False) is True: @@ -1024,14 +1041,14 @@ def get_head_node_ip(config_file: str, def get_worker_node_ips(config_file: str, - override_cluster_name: Optional[str]) -> str: + override_cluster_name: Optional[str]) -> List[str]: """Returns worker node IPs for given configuration file.""" config = yaml.safe_load(open(config_file).read()) if override_cluster_name is not None: config["cluster_name"] = override_cluster_name - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = _get_node_provider(config["provider"], config["cluster_name"]) try: nodes = provider.non_terminated_nodes({ TAG_RAY_NODE_KIND: NODE_KIND_WORKER @@ -1051,7 +1068,7 @@ def _get_worker_nodes(config, override_cluster_name): if override_cluster_name is not None: config["cluster_name"] = override_cluster_name - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = _get_node_provider(config["provider"], config["cluster_name"]) try: return provider.non_terminated_nodes({ TAG_RAY_NODE_KIND: NODE_KIND_WORKER @@ -1064,7 +1081,7 @@ def _get_head_node(config: Dict[str, Any], config_file: str, override_cluster_name: Optional[str], create_if_needed: bool = False) -> str: - provider = get_node_provider(config["provider"], config["cluster_name"]) + provider = _get_node_provider(config["provider"], config["cluster_name"]) try: head_node_tags = { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, diff --git a/python/ray/autoscaler/docker.py b/python/ray/autoscaler/_private/docker.py similarity index 100% rename from python/ray/autoscaler/docker.py rename to python/ray/autoscaler/_private/docker.py diff --git a/python/ray/autoscaler/_private/gcp/__init__.py b/python/ray/autoscaler/_private/gcp/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/autoscaler/gcp/config.py b/python/ray/autoscaler/_private/gcp/config.py similarity index 100% rename from python/ray/autoscaler/gcp/config.py rename to python/ray/autoscaler/_private/gcp/config.py diff --git a/python/ray/autoscaler/gcp/node_provider.py b/python/ray/autoscaler/_private/gcp/node_provider.py similarity index 98% rename from python/ray/autoscaler/gcp/node_provider.py rename to python/ray/autoscaler/_private/gcp/node_provider.py index cbc4aecca..79da7ab3e 100644 --- a/python/ray/autoscaler/gcp/node_provider.py +++ b/python/ray/autoscaler/_private/gcp/node_provider.py @@ -4,9 +4,9 @@ import time import logging from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.gcp.config import bootstrap_gcp from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME -from ray.autoscaler.gcp.config import MAX_POLLS, POLL_INTERVAL, \ +from ray.autoscaler._private.gcp.config import bootstrap_gcp +from ray.autoscaler._private.gcp.config import MAX_POLLS, POLL_INTERVAL, \ construct_clients_from_provider_config logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/_private/kubernetes/__init__.py b/python/ray/autoscaler/_private/kubernetes/__init__.py new file mode 100644 index 000000000..300addc2b --- /dev/null +++ b/python/ray/autoscaler/_private/kubernetes/__init__.py @@ -0,0 +1,48 @@ +import kubernetes +from kubernetes.config.config_exception import ConfigException + +_configured = False +_core_api = None +_auth_api = None +_extensions_beta_api = None + + +def _load_config(): + global _configured + if _configured: + return + try: + kubernetes.config.load_incluster_config() + except ConfigException: + kubernetes.config.load_kube_config() + _configured = True + + +def core_api(): + global _core_api + if _core_api is None: + _load_config() + _core_api = kubernetes.client.CoreV1Api() + + return _core_api + + +def auth_api(): + global _auth_api + if _auth_api is None: + _load_config() + _auth_api = kubernetes.client.RbacAuthorizationV1Api() + + return _auth_api + + +def extensions_beta_api(): + global _extensions_beta_api + if _extensions_beta_api is None: + _load_config() + _extensions_beta_api = kubernetes.client.ExtensionsV1beta1Api() + + return _extensions_beta_api + + +log_prefix = "KubernetesNodeProvider: " diff --git a/python/ray/autoscaler/kubernetes/config.py b/python/ray/autoscaler/_private/kubernetes/config.py similarity index 98% rename from python/ray/autoscaler/kubernetes/config.py rename to python/ray/autoscaler/_private/kubernetes/config.py index f0e853ec1..6588d5626 100644 --- a/python/ray/autoscaler/kubernetes/config.py +++ b/python/ray/autoscaler/_private/kubernetes/config.py @@ -2,7 +2,7 @@ import logging from kubernetes import client -from ray.autoscaler.kubernetes import auth_api, core_api, log_prefix +from ray.autoscaler._private.kubernetes import auth_api, core_api, log_prefix logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/kubernetes/kubectl-rsync.sh b/python/ray/autoscaler/_private/kubernetes/kubectl-rsync.sh similarity index 100% rename from python/ray/autoscaler/kubernetes/kubectl-rsync.sh rename to python/ray/autoscaler/_private/kubernetes/kubectl-rsync.sh diff --git a/python/ray/autoscaler/kubernetes/node_provider.py b/python/ray/autoscaler/_private/kubernetes/node_provider.py similarity index 96% rename from python/ray/autoscaler/kubernetes/node_provider.py rename to python/ray/autoscaler/_private/kubernetes/node_provider.py index a374397a7..22ff84c50 100644 --- a/python/ray/autoscaler/kubernetes/node_provider.py +++ b/python/ray/autoscaler/_private/kubernetes/node_provider.py @@ -2,10 +2,11 @@ import logging from uuid import uuid4 from kubernetes.client.rest import ApiException -from ray.autoscaler.command_runner import KubernetesCommandRunner -from ray.autoscaler.kubernetes import core_api, log_prefix, extensions_beta_api +from ray.autoscaler._private.command_runner import KubernetesCommandRunner +from ray.autoscaler._private.kubernetes import core_api, log_prefix, \ + extensions_beta_api +from ray.autoscaler._private.kubernetes.config import bootstrap_kubernetes from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.kubernetes.config import bootstrap_kubernetes from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py similarity index 100% rename from python/ray/autoscaler/load_metrics.py rename to python/ray/autoscaler/_private/load_metrics.py diff --git a/python/ray/autoscaler/_private/local/__init__.py b/python/ray/autoscaler/_private/local/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/autoscaler/local/config.py b/python/ray/autoscaler/_private/local/config.py similarity index 100% rename from python/ray/autoscaler/local/config.py rename to python/ray/autoscaler/_private/local/config.py diff --git a/python/ray/autoscaler/local/coordinator_node_provider.py b/python/ray/autoscaler/_private/local/coordinator_node_provider.py similarity index 100% rename from python/ray/autoscaler/local/coordinator_node_provider.py rename to python/ray/autoscaler/_private/local/coordinator_node_provider.py diff --git a/python/ray/autoscaler/local/node_provider.py b/python/ray/autoscaler/_private/local/node_provider.py similarity index 99% rename from python/ray/autoscaler/local/node_provider.py rename to python/ray/autoscaler/_private/local/node_provider.py index 82b1a8654..576ab4667 100644 --- a/python/ray/autoscaler/local/node_provider.py +++ b/python/ray/autoscaler/_private/local/node_provider.py @@ -6,12 +6,12 @@ import socket import logging from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.local.config import bootstrap_local from ray.autoscaler.tags import ( TAG_RAY_NODE_KIND, NODE_KIND_WORKER, NODE_KIND_HEAD, ) +from ray.autoscaler._private.local.config import bootstrap_local logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/log_timer.py b/python/ray/autoscaler/_private/log_timer.py similarity index 92% rename from python/ray/autoscaler/log_timer.py rename to python/ray/autoscaler/_private/log_timer.py index ea60796a3..e8fa6edac 100644 --- a/python/ray/autoscaler/log_timer.py +++ b/python/ray/autoscaler/_private/log_timer.py @@ -1,7 +1,7 @@ import datetime import logging -from ray.autoscaler.cli_logger import cli_logger +from ray.autoscaler._private.cli_logger import cli_logger logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/node_launcher.py b/python/ray/autoscaler/_private/node_launcher.py similarity index 98% rename from python/ray/autoscaler/node_launcher.py rename to python/ray/autoscaler/_private/node_launcher.py index 4f0572ea9..dc3a8b329 100644 --- a/python/ray/autoscaler/node_launcher.py +++ b/python/ray/autoscaler/_private/node_launcher.py @@ -7,7 +7,7 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME, TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, NODE_KIND_WORKER) -from ray.autoscaler.util import hash_launch_conf +from ray.autoscaler._private.util import hash_launch_conf logger = logging.getLogger(__name__) diff --git a/python/ray/autoscaler/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py similarity index 100% rename from python/ray/autoscaler/resource_demand_scheduler.py rename to python/ray/autoscaler/_private/resource_demand_scheduler.py diff --git a/python/ray/autoscaler/subprocess_output_util.py b/python/ray/autoscaler/_private/subprocess_output_util.py similarity index 99% rename from python/ray/autoscaler/subprocess_output_util.py rename to python/ray/autoscaler/_private/subprocess_output_util.py index 067707a1c..b22fbdc4b 100644 --- a/python/ray/autoscaler/subprocess_output_util.py +++ b/python/ray/autoscaler/_private/subprocess_output_util.py @@ -5,7 +5,7 @@ import tempfile import time import sys -from ray.autoscaler.cli_logger import cli_logger +from ray.autoscaler._private.cli_logger import cli_logger import colorful as cf CONN_REFUSED_PATIENCE = 30 # how long to wait for sshd to run diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/_private/updater.py similarity index 98% rename from python/ray/autoscaler/updater.py rename to python/ray/autoscaler/_private/updater.py index 3ae962a83..337d373e3 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/_private/updater.py @@ -10,13 +10,12 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \ TAG_RAY_FILE_MOUNTS_CONTENTS, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \ STATUS_SETTING_UP, STATUS_SYNCING_FILES -from ray.autoscaler.command_runner import NODE_START_WAIT_S, \ +from ray.autoscaler._private.command_runner import NODE_START_WAIT_S, \ ProcessRunnerError -from ray.autoscaler.log_timer import LogTimer +from ray.autoscaler._private.log_timer import LogTimer +from ray.autoscaler._private.cli_logger import cli_logger +import ray.autoscaler._private.subprocess_output_util as cmd_output_util -import ray.autoscaler.subprocess_output_util as cmd_output_util - -from ray.autoscaler.cli_logger import cli_logger from ray import ray_constants import colorful as cf diff --git a/python/ray/autoscaler/util.py b/python/ray/autoscaler/_private/util.py similarity index 97% rename from python/ray/autoscaler/util.py rename to python/ray/autoscaler/_private/util.py index 1dbbbfd88..b2f9bce02 100644 --- a/python/ray/autoscaler/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -8,8 +8,8 @@ from typing import Any, Dict import ray import ray.services as services -from ray.autoscaler.node_provider import get_default_config -from ray.autoscaler.docker import validate_docker_config +from ray.autoscaler.node_provider import _get_default_config +from ray.autoscaler._private.docker import validate_docker_config REQUIRED, OPTIONAL = True, False RAY_SCHEMA_PATH = os.path.join( @@ -95,7 +95,7 @@ def prepare_config(config): def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]: - defaults = get_default_config(config["provider"]) + defaults = _get_default_config(config["provider"]) defaults.update(config) defaults["auth"] = defaults.get("auth", {}) return defaults diff --git a/python/ray/autoscaler/kubernetes/__init__.py b/python/ray/autoscaler/kubernetes/__init__.py index 300addc2b..e69de29bb 100644 --- a/python/ray/autoscaler/kubernetes/__init__.py +++ b/python/ray/autoscaler/kubernetes/__init__.py @@ -1,48 +0,0 @@ -import kubernetes -from kubernetes.config.config_exception import ConfigException - -_configured = False -_core_api = None -_auth_api = None -_extensions_beta_api = None - - -def _load_config(): - global _configured - if _configured: - return - try: - kubernetes.config.load_incluster_config() - except ConfigException: - kubernetes.config.load_kube_config() - _configured = True - - -def core_api(): - global _core_api - if _core_api is None: - _load_config() - _core_api = kubernetes.client.CoreV1Api() - - return _core_api - - -def auth_api(): - global _auth_api - if _auth_api is None: - _load_config() - _auth_api = kubernetes.client.RbacAuthorizationV1Api() - - return _auth_api - - -def extensions_beta_api(): - global _extensions_beta_api - if _extensions_beta_api is None: - _load_config() - _extensions_beta_api = kubernetes.client.ExtensionsV1beta1Api() - - return _extensions_beta_api - - -log_prefix = "KubernetesNodeProvider: " diff --git a/python/ray/autoscaler/local/coordinator_server.py b/python/ray/autoscaler/local/coordinator_server.py index 428fa0bd1..13dc851d0 100644 --- a/python/ray/autoscaler/local/coordinator_server.py +++ b/python/ray/autoscaler/local/coordinator_server.py @@ -10,7 +10,7 @@ from http.server import SimpleHTTPRequestHandler, HTTPServer import json import socket -from ray.autoscaler.local.node_provider import LocalNodeProvider +from ray.autoscaler._private.local.node_provider import LocalNodeProvider logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 224420772..5bbbf0fdb 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -5,126 +5,106 @@ from typing import Any, Dict import yaml -from ray.autoscaler.command_runner import SSHCommandRunner, DockerCommandRunner +from ray.autoscaler._private.command_runner import \ + SSHCommandRunner, DockerCommandRunner logger = logging.getLogger(__name__) -def import_aws(provider_config): - from ray.autoscaler.aws.node_provider import AWSNodeProvider +def _import_aws(provider_config): + from ray.autoscaler._private.aws.node_provider import AWSNodeProvider return AWSNodeProvider -def import_gcp(provider_config): - from ray.autoscaler.gcp.node_provider import GCPNodeProvider +def _import_gcp(provider_config): + from ray.autoscaler._private.gcp.node_provider import GCPNodeProvider return GCPNodeProvider -def import_azure(provider_config): - from ray.autoscaler.azure.node_provider import AzureNodeProvider +def _import_azure(provider_config): + from ray.autoscaler._private.azure.node_provider import AzureNodeProvider return AzureNodeProvider -def import_local(provider_config): +def _import_local(provider_config): if "coordinator_address" in provider_config: - from ray.autoscaler.local.coordinator_node_provider import ( + from ray.autoscaler._private.local.coordinator_node_provider import ( CoordinatorSenderNodeProvider) return CoordinatorSenderNodeProvider else: - from ray.autoscaler.local.node_provider import LocalNodeProvider + from ray.autoscaler._private.local.node_provider import \ + LocalNodeProvider return LocalNodeProvider -def import_kubernetes(provider_config): - from ray.autoscaler.kubernetes.node_provider import KubernetesNodeProvider +def _import_kubernetes(provider_config): + from ray.autoscaler._private.kubernetes.node_provider import \ + KubernetesNodeProvider return KubernetesNodeProvider -def load_local_example_config(): +def _load_local_example_config(): import ray.autoscaler.local as ray_local return os.path.join( os.path.dirname(ray_local.__file__), "example-full.yaml") -def load_kubernetes_example_config(): +def _load_kubernetes_example_config(): import ray.autoscaler.kubernetes as ray_kubernetes return os.path.join( os.path.dirname(ray_kubernetes.__file__), "example-full.yaml") -def load_aws_example_config(): +def _load_aws_example_config(): import ray.autoscaler.aws as ray_aws return os.path.join(os.path.dirname(ray_aws.__file__), "example-full.yaml") -def load_gcp_example_config(): +def _load_gcp_example_config(): import ray.autoscaler.gcp as ray_gcp return os.path.join(os.path.dirname(ray_gcp.__file__), "example-full.yaml") -def load_azure_example_config(): +def _load_azure_example_config(): import ray.autoscaler.azure as ray_azure return os.path.join( os.path.dirname(ray_azure.__file__), "example-full.yaml") -def import_external(provider_config): - provider_cls = load_class(path=provider_config["module"]) +def _import_external(provider_config): + provider_cls = _load_class(path=provider_config["module"]) return provider_cls -NODE_PROVIDERS = { - "local": import_local, - "aws": import_aws, - "gcp": import_gcp, - "azure": import_azure, - "kubernetes": import_kubernetes, - "docker": None, - "external": import_external # Import an external module +_NODE_PROVIDERS = { + "local": _import_local, + "aws": _import_aws, + "gcp": _import_gcp, + "azure": _import_azure, + "kubernetes": _import_kubernetes, + "external": _import_external # Import an external module } -PROVIDER_PRETTY_NAMES = { + +_PROVIDER_PRETTY_NAMES = { "local": "Local", "aws": "AWS", "gcp": "GCP", "azure": "Azure", "kubernetes": "Kubernetes", - # "docker": "Docker", # not supported "external": "External" } -DEFAULT_CONFIGS = { - "local": load_local_example_config, - "aws": load_aws_example_config, - "gcp": load_gcp_example_config, - "azure": load_azure_example_config, - "kubernetes": load_kubernetes_example_config, - "docker": None, +_DEFAULT_CONFIGS = { + "local": _load_local_example_config, + "aws": _load_aws_example_config, + "gcp": _load_gcp_example_config, + "azure": _load_azure_example_config, + "kubernetes": _load_kubernetes_example_config, } -def try_logging_config(config): - if config["provider"]["type"] == "aws": - from ray.autoscaler.aws.config import log_to_cli - log_to_cli(config) - - -def try_get_log_state(provider_config): - if provider_config["type"] == "aws": - from ray.autoscaler.aws.config import get_log_state - return get_log_state() - - -def try_reload_log_state(provider_config, log_state): - if not log_state: - return - if provider_config["type"] == "aws": - from ray.autoscaler.aws.config import reload_log_state - return reload_log_state(log_state) - - -def load_class(path): - """ - Load a class at runtime given a full path. +def _load_class(path): + """Load a class at runtime given a full path. Example of the path: mypkg.mysubpkg.myclass """ @@ -138,9 +118,9 @@ def load_class(path): return getattr(module, class_str) -def get_node_provider(provider_config: Dict[str, Any], - cluster_name: str) -> Any: - importer = NODE_PROVIDERS.get(provider_config["type"]) +def _get_node_provider(provider_config: Dict[str, Any], + cluster_name: str) -> Any: + importer = _NODE_PROVIDERS.get(provider_config["type"]) if importer is None: raise NotImplementedError("Unsupported node provider: {}".format( provider_config["type"])) @@ -148,10 +128,10 @@ def get_node_provider(provider_config: Dict[str, Any], return provider_cls(provider_config, cluster_name) -def get_default_config(provider_config): +def _get_default_config(provider_config): if provider_config["type"] == "external": return {} - load_config = DEFAULT_CONFIGS.get(provider_config["type"]) + load_config = _DEFAULT_CONFIGS.get(provider_config["type"]) if load_config is None: raise NotImplementedError("Unsupported node provider: {}".format( provider_config["type"])) diff --git a/python/ray/autoscaler/sdk.py b/python/ray/autoscaler/sdk.py new file mode 100644 index 000000000..1083af76e --- /dev/null +++ b/python/ray/autoscaler/sdk.py @@ -0,0 +1,181 @@ +"""IMPORTANT: this is an experimental interface and not currently stable.""" + +from typing import Optional, List, Union +import json +import os +import tempfile + +from ray.autoscaler._private import commands + + +def create_or_update_cluster(cluster_config: Union[dict, str], + *, + no_restart: bool = False, + restart_only: bool = False, + no_config_cache: bool = False) -> None: + """Create or updates an autoscaling Ray cluster from a config json. + + Args: + cluster_config (Union[str, dict]): Either the config dict of the + cluster, or a path pointing to a file containing the config. + no_restart (bool): Whether to skip restarting Ray services during the + update. This avoids interrupting running jobs and can be used to + dynamically adjust autoscaler configuration. + restart_only (bool): Whether to skip running setup commands and only + restart Ray. This cannot be used with 'no-restart'. + no_config_cache (bool): Whether to disable the config cache and fully + resolve all environment settings from the Cloud provider again. + """ + return commands.create_or_update_cluster( + config_file=_as_config_file(cluster_config), + override_min_workers=None, + override_max_workers=None, + no_restart=no_restart, + restart_only=restart_only, + yes=True, + override_cluster_name=None, + no_config_cache=no_config_cache, + redirect_command_output=None, + use_login_shells=True) + + +def teardown_cluster(cluster_config: Union[dict, str]) -> None: + """Destroys all nodes of a Ray cluster described by a config json. + + Args: + cluster_config (Union[str, dict]): Either the config dict of the + cluster, or a path pointing to a file containing the config. + """ + return commands.teardown_cluster( + config_file=_as_config_file(cluster_config), + yes=True, + workers_only=False, + override_cluster_name=None, + keep_min_workers=False) + + +def run_on_cluster(cluster_config: Union[dict, str], + *, + cmd: Optional[str] = None, + run_env: str = "auto", + no_config_cache: bool = False, + port_forward: Union[int, List[int]] = None, + with_output: bool = False) -> str: + """Runs a command on the specified cluster. + + Args: + cluster_config (Union[str, dict]): Either the config dict of the + cluster, or a path pointing to a file containing the config. + cmd (str): the command to run, or None for a no-op command. + run_env (str): whether to run the command on the host or in a + container. Select between "auto", "host" and "docker". + no_config_cache (bool): Whether to disable the config cache and fully + resolve all environment settings from the Cloud provider again. + port_forward (int or list[int]): port(s) to forward. + with_output (bool): Whether to capture command output. + + Returns: + The output of the command as a string. + """ + return commands.exec_cluster( + _as_config_file(cluster_config), + cmd=cmd, + run_env=run_env, + screen=False, + tmux=False, + stop=False, + start=False, + override_cluster_name=None, + no_config_cache=no_config_cache, + port_forward=port_forward, + with_output=with_output) + + +def rsync(cluster_config: Union[dict, str], + *, + source: str, + target: str, + down: bool, + no_config_cache: bool = False): + """Rsyncs files to or from the cluster. + + Args: + cluster_config (Union[str, dict]): Either the config dict of the + cluster, or a path pointing to a file containing the config. + source (str): rsync source argument. + target (str): rsync target argument. + down (bool): whether we're syncing remote -> local. + no_config_cache (bool): Whether to disable the config cache and fully + resolve all environment settings from the Cloud provider again. + + Raises: + RuntimeError if the cluster head node is not found. + """ + return commands.rsync( + config_file=_as_config_file(cluster_config), + source=source, + target=target, + override_cluster_name=None, + down=down, + no_config_cache=no_config_cache, + all_nodes=False) + + +def get_head_node_ip(cluster_config: Union[dict, str]) -> str: + """Returns head node IP for given configuration file if exists. + + Args: + cluster_config (Union[str, dict]): Either the config dict of the + cluster, or a path pointing to a file containing the config. + + Returns: + The ip address of the cluster head node. + + Raises: + RuntimeError if the cluster is not found. + """ + return commands.get_head_node_ip(_as_config_file(cluster_config)) + + +def get_worker_node_ips(cluster_config: Union[dict, str]) -> List[str]: + """Returns worker node IPs for given configuration file. + + Args: + cluster_config (Union[str, dict]): Either the config dict of the + cluster, or a path pointing to a file containing the config. + + Returns: + List of worker node ip addresses. + + Raises: + RuntimeError if the cluster is not found. + """ + return commands.get_worker_node_ips(_as_config_file(cluster_config)) + + +def request_resources(num_cpus=None, bundles=None): + """Remotely request some CPU or GPU resources from the autoscaler. + + This function is to be called e.g. on a node before submitting a bunch of + ray.remote calls to ensure that resources rapidly become available. + + This function is EXPERIMENTAL. + + Args: + num_cpus: int -- the number of CPU cores to request + bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This + only has an effect if you've configured `available_node_types` + if your cluster config. + """ + return commands.request_resources(num_cpus, bundles) + + +def _as_config_file(cluster_config: Union[dict, str]): + if isinstance(cluster_config, dict): + tmp = tempfile.NamedTemporaryFile("w", prefix="autoscaler-sdk-tmp-") + tmp.write(json.dumps(cluster_config)) + tmp.flush() + cluster_config = tmp.name + if not os.path.exists(cluster_config): + raise ValueError("Cluster config not found {}".format(cluster_config)) + return cluster_config diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 95fb73b8e..971b17f5e 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -6,13 +6,13 @@ import traceback import json import ray -from ray.autoscaler.autoscaler import StandardAutoscaler -from ray.autoscaler.load_metrics import LoadMetrics +from ray.autoscaler._private.autoscaler import StandardAutoscaler +from ray.autoscaler._private.commands import teardown_cluster +from ray.autoscaler._private.load_metrics import LoadMetrics import ray.gcs_utils import ray.utils import ray.ray_constants as ray_constants from ray.utils import binary_to_hex, setup_logger -from ray.autoscaler.commands import teardown_cluster import redis logger = logging.getLogger(__name__) @@ -160,9 +160,10 @@ class Monitor: binary_to_hex(job_id))) def autoscaler_resource_request_handler(self, _, data): - """Handle a notification of a resource request for the autoscaler. This channel - and method are only used by the manual - `ray.autoscaler.commands.request_resources` api. + """Handle a notification of a resource request for the autoscaler. + + This channel and method are only used by the manual + `ray.autoscaler.sdk.request_resources` api. Args: channel: unused diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index fee4897c8..256c672b7 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -14,14 +14,14 @@ import yaml import ray import psutil import ray.services as services -from ray.autoscaler.commands import ( +from ray.autoscaler._private.commands import ( attach_cluster, exec_cluster, create_or_update_cluster, monitor_cluster, rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips, debug_status, RUN_ENV_TYPES) import ray.ray_constants as ray_constants import ray.utils -from ray.autoscaler.cli_logger import cli_logger +from ray.autoscaler._private.cli_logger import cli_logger import colorful as cf logger = logging.getLogger(__name__) diff --git a/python/ray/tests/aws/conftest.py b/python/ray/tests/aws/conftest.py index 91762a3d4..ea490e818 100644 --- a/python/ray/tests/aws/conftest.py +++ b/python/ray/tests/aws/conftest.py @@ -1,6 +1,6 @@ import pytest -from ray.autoscaler.aws.config import _resource_cache +from ray.autoscaler._private.aws.config import _resource_cache from botocore.stub import Stubber diff --git a/python/ray/tests/aws/utils/constants.py b/python/ray/tests/aws/utils/constants.py index 4827288a0..0267b8518 100644 --- a/python/ray/tests/aws/utils/constants.py +++ b/python/ray/tests/aws/utils/constants.py @@ -4,14 +4,14 @@ from datetime import datetime # Override global constants used in AWS autoscaler config artifact names. # This helps ensure that any unmocked test doesn't alter non-test artifacts. -ray.autoscaler.aws.config.RAY = \ +ray.autoscaler._private.aws.config.RAY = \ "ray-autoscaler-aws-test" -ray.autoscaler.aws.config.DEFAULT_RAY_INSTANCE_PROFILE = \ - ray.autoscaler.aws.config.RAY + "-v1" -ray.autoscaler.aws.config.DEFAULT_RAY_IAM_ROLE = \ - ray.autoscaler.aws.config.RAY + "-v1" -ray.autoscaler.aws.config.SECURITY_GROUP_TEMPLATE = \ - ray.autoscaler.aws.config.RAY + "-{}" +ray.autoscaler._private.aws.config.DEFAULT_RAY_INSTANCE_PROFILE = \ + ray.autoscaler._private.aws.config.RAY + "-v1" +ray.autoscaler._private.aws.config.DEFAULT_RAY_IAM_ROLE = \ + ray.autoscaler._private.aws.config.RAY + "-v1" +ray.autoscaler._private.aws.config.SECURITY_GROUP_TEMPLATE = \ + ray.autoscaler._private.aws.config.RAY + "-{}" # Default IAM instance profile to expose to tests. DEFAULT_INSTANCE_PROFILE = { @@ -35,7 +35,7 @@ DEFAULT_INSTANCE_PROFILE = { # Default EC2 key pair to expose to tests. DEFAULT_KEY_PAIR = { "KeyFingerprint": "00:11:22:33:44:55:66:77:88:99:AA:BB:CC:DD:EE:FF:00", - "KeyName": ray.autoscaler.aws.config.RAY + "_us-west-2", + "KeyName": ray.autoscaler._private.aws.config.RAY + "_us-west-2", } # Primary EC2 subnet to expose to tests. @@ -69,7 +69,8 @@ DEFAULT_CLUSTER_NAME = "test-cluster-name" # (prior to inbound rule configuration). DEFAULT_SG = { "Description": "Auto-created security group for Ray workers", - "GroupName": ray.autoscaler.aws.config.RAY + "-" + DEFAULT_CLUSTER_NAME, + "GroupName": ray.autoscaler._private.aws.config.RAY + "-" + + DEFAULT_CLUSTER_NAME, "OwnerId": "test-owner", "GroupId": "sg-1234abcd", "VpcId": DEFAULT_SUBNET["VpcId"], diff --git a/python/ray/tests/aws/utils/helpers.py b/python/ray/tests/aws/utils/helpers.py index f1f5f382f..bdab5a796 100644 --- a/python/ray/tests/aws/utils/helpers.py +++ b/python/ray/tests/aws/utils/helpers.py @@ -2,11 +2,12 @@ import os import yaml import ray -from ray.autoscaler.commands import prepare_config, validate_config +from ray.autoscaler._private.commands import prepare_config, validate_config from ray.tests.aws.utils.constants import DEFAULT_CLUSTER_NAME def get_aws_example_config_file_path(file_name): + import ray.autoscaler.aws return os.path.join( os.path.dirname(ray.autoscaler.aws.__file__), file_name) @@ -20,7 +21,7 @@ def bootstrap_aws_config(config): config = prepare_config(config) validate_config(config) config["cluster_name"] = DEFAULT_CLUSTER_NAME - return ray.autoscaler.aws.config.bootstrap_aws(config) + return ray.autoscaler._private.aws.config.bootstrap_aws(config) def bootstrap_aws_example_config_file(file_name): diff --git a/python/ray/tests/aws/utils/mocks.py b/python/ray/tests/aws/utils/mocks.py index 2114cf5d0..4c60abf43 100644 --- a/python/ray/tests/aws/utils/mocks.py +++ b/python/ray/tests/aws/utils/mocks.py @@ -1,4 +1,4 @@ -from ray.autoscaler.aws.config import key_pair +from ray.autoscaler._private.aws.config import key_pair from ray.tests.aws.utils.constants import DEFAULT_KEY_PAIR diff --git a/python/ray/tests/aws/utils/stubs.py b/python/ray/tests/aws/utils/stubs.py index e11b8a351..7840447d8 100644 --- a/python/ray/tests/aws/utils/stubs.py +++ b/python/ray/tests/aws/utils/stubs.py @@ -12,7 +12,7 @@ def configure_iam_role_default(iam_client_stub): iam_client_stub.add_response( "get_instance_profile", expected_params={ - "InstanceProfileName": ray.autoscaler.aws.config. + "InstanceProfileName": ray.autoscaler._private.aws.config. DEFAULT_RAY_INSTANCE_PROFILE }, service_response={"InstanceProfile": DEFAULT_INSTANCE_PROFILE}) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 5862d6171..752d3c354 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -10,14 +10,14 @@ from jsonschema.exceptions import ValidationError import ray import ray.services as services -from ray.autoscaler.util import prepare_config, validate_config -from ray.autoscaler.commands import get_or_create_head_node -from ray.autoscaler.docker import DOCKER_MOUNT_PREFIX -from ray.autoscaler.load_metrics import LoadMetrics -from ray.autoscaler.autoscaler import StandardAutoscaler +from ray.autoscaler._private.util import prepare_config, validate_config +from ray.autoscaler._private.commands import get_or_create_head_node +from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX +from ray.autoscaler._private.load_metrics import LoadMetrics +from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE -from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider +from ray.autoscaler.node_provider import _NODE_PROVIDERS, NodeProvider from ray.test_utils import RayTestTimeoutException import pytest @@ -319,14 +319,14 @@ class LoadMetricsTest(unittest.TestCase): class AutoscalingTest(unittest.TestCase): def setUp(self): - NODE_PROVIDERS["mock"] = \ + _NODE_PROVIDERS["mock"] = \ lambda config: self.create_provider self.provider = None self.tmpdir = tempfile.mkdtemp() def tearDown(self): self.provider = None - del NODE_PROVIDERS["mock"] + del _NODE_PROVIDERS["mock"] shutil.rmtree(self.tmpdir) ray.shutdown() @@ -1327,7 +1327,7 @@ class AutoscalingTest(unittest.TestCase): f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") # Simulate a second `ray up` call - from ray.autoscaler import util + from ray.autoscaler._private import util util._hash_cache = {} runner = MockProcessRunner() lm = LoadMetrics() diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index b768ea531..2d57a2f9a 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -6,7 +6,7 @@ import unittest import urllib import yaml -from ray.autoscaler.util import prepare_config, validate_config +from ray.autoscaler._private.util import prepare_config, validate_config from ray.test_utils import recursive_fnmatch RAY_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index ed922a0f4..21ee42662 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -34,7 +34,7 @@ from click.testing import CliRunner from testfixtures import Replacer from testfixtures.popen import MockPopen, PopenBehaviour -import ray.autoscaler.aws.config as aws_config +import ray.autoscaler._private.aws.config as aws_config import ray.scripts.scripts as scripts diff --git a/python/ray/tests/test_command_runner.py b/python/ray/tests/test_command_runner.py index 977b2e299..bb87a44d2 100644 --- a/python/ray/tests/test_command_runner.py +++ b/python/ray/tests/test_command_runner.py @@ -1,9 +1,9 @@ import pytest from ray.tests.test_autoscaler import MockProvider, MockProcessRunner -from ray.autoscaler.command_runner import CommandRunnerInterface, \ +from ray.autoscaler._private.command_runner import CommandRunnerInterface, \ SSHCommandRunner, _with_environment_variables, DockerCommandRunner, \ KubernetesCommandRunner -from ray.autoscaler.docker import DOCKER_MOUNT_PREFIX +from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX from getpass import getuser import hashlib diff --git a/python/ray/tests/test_coordinator_server.py b/python/ray/tests/test_coordinator_server.py index ef1bb74cc..713bdbf39 100644 --- a/python/ray/tests/test_coordinator_server.py +++ b/python/ray/tests/test_coordinator_server.py @@ -3,10 +3,10 @@ import unittest import socket import json +from ray.autoscaler.node_provider import _NODE_PROVIDERS, _get_node_provider from ray.autoscaler.local.coordinator_server import OnPremCoordinatorServer -from ray.autoscaler.node_provider import NODE_PROVIDERS, get_node_provider -from ray.autoscaler.local.node_provider import LocalNodeProvider -from ray.autoscaler.local.coordinator_node_provider import ( +from ray.autoscaler._private.local.node_provider import LocalNodeProvider +from ray.autoscaler._private.local.coordinator_node_provider import ( CoordinatorSenderNodeProvider) from ray.autoscaler.tags import (TAG_RAY_NODE_KIND, TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, NODE_KIND_WORKER, @@ -35,10 +35,10 @@ class OnPremCoordinatorServerTest(unittest.TestCase): """Check correct import when coordinator_address is in config yaml.""" provider_config = {"coordinator_address": "fake_address:1234"} - coordinator_node_provider = NODE_PROVIDERS.get("local")( + coordinator_node_provider = _NODE_PROVIDERS.get("local")( provider_config) assert coordinator_node_provider is CoordinatorSenderNodeProvider - local_node_provider = NODE_PROVIDERS.get("local")({}) + local_node_provider = _NODE_PROVIDERS.get("local")({}) assert local_node_provider is LocalNodeProvider def testClusterStateInit(self): @@ -59,8 +59,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase): }, } provider_config = cluster_config["provider"] - node_provider = get_node_provider(provider_config, - cluster_config["cluster_name"]) + node_provider = _get_node_provider(provider_config, + cluster_config["cluster_name"]) assert isinstance(node_provider, LocalNodeProvider) expected_workers = {} expected_workers[provider_config["head_ip"]] = { @@ -85,8 +85,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase): # Test removing workers updates the cluster state. del expected_workers[provider_config["worker_ips"][0]] removed_ip = provider_config["worker_ips"].pop() - node_provider = get_node_provider(provider_config, - cluster_config["cluster_name"]) + node_provider = _get_node_provider(provider_config, + cluster_config["cluster_name"]) workers = json.loads(open(state_save_path).read()) assert workers == expected_workers @@ -98,8 +98,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase): "state": "terminated", } provider_config["worker_ips"].append(removed_ip) - node_provider = get_node_provider(provider_config, - cluster_config["cluster_name"]) + node_provider = _get_node_provider(provider_config, + cluster_config["cluster_name"]) workers = json.loads(open(state_save_path).read()) assert workers == expected_workers @@ -162,8 +162,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase): "worker_nodes": {}, } provider_config = cluster_config["provider"] - node_provider_1 = get_node_provider(provider_config, - cluster_config["cluster_name"]) + node_provider_1 = _get_node_provider(provider_config, + cluster_config["cluster_name"]) assert isinstance(node_provider_1, CoordinatorSenderNodeProvider) assert not node_provider_1.non_terminated_nodes({}) @@ -189,8 +189,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase): # Add another cluster. cluster_config["cluster_name"] = "random_name_2" provider_config = cluster_config["provider"] - node_provider_2 = get_node_provider(provider_config, - cluster_config["cluster_name"]) + node_provider_2 = _get_node_provider(provider_config, + cluster_config["cluster_name"]) assert not node_provider_2.non_terminated_nodes({}) assert not node_provider_2.is_running(self.list_of_node_ips[1]) assert node_provider_2.is_terminated(self.list_of_node_ips[1]) @@ -211,8 +211,8 @@ class OnPremCoordinatorServerTest(unittest.TestCase): # Add another cluster (should fail because we only have two nodes). cluster_config["cluster_name"] = "random_name_3" provider_config = cluster_config["provider"] - node_provider_3 = get_node_provider(provider_config, - cluster_config["cluster_name"]) + node_provider_3 = _get_node_provider(provider_config, + cluster_config["cluster_name"]) assert not node_provider_3.non_terminated_nodes(head_node_tags) head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format( cluster_config["cluster_name"]) diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index f6200b0e5..c3b25fd51 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -8,13 +8,14 @@ import unittest import ray from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \ MockProcessRunner -from ray.autoscaler.autoscaler import StandardAutoscaler -from ray.autoscaler.load_metrics import LoadMetrics -from ray.autoscaler.node_provider import NODE_PROVIDERS -from ray.autoscaler.commands import get_or_create_head_node -from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND -from ray.autoscaler.resource_demand_scheduler import _utilization_score, \ +from ray.autoscaler.node_provider import _NODE_PROVIDERS +from ray.autoscaler._private.autoscaler import StandardAutoscaler +from ray.autoscaler._private.load_metrics import LoadMetrics +from ray.autoscaler._private.commands import get_or_create_head_node +from ray.autoscaler._private.resource_demand_scheduler import \ + _utilization_score, \ get_bin_pack_residual, get_nodes_for, ResourceDemandScheduler +from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND from ray.test_utils import same_elements from time import sleep @@ -221,14 +222,14 @@ class LoadMetricsTest(unittest.TestCase): class AutoscalingTest(unittest.TestCase): def setUp(self): - NODE_PROVIDERS["mock"] = \ + _NODE_PROVIDERS["mock"] = \ lambda config: self.create_provider self.provider = None self.tmpdir = tempfile.mkdtemp() def tearDown(self): self.provider = None - del NODE_PROVIDERS["mock"] + del _NODE_PROVIDERS["mock"] shutil.rmtree(self.tmpdir) ray.shutdown() diff --git a/python/ray/tune/integration/kubernetes.py b/python/ray/tune/integration/kubernetes.py index 1926e9861..adb67a57f 100644 --- a/python/ray/tune/integration/kubernetes.py +++ b/python/ray/tune/integration/kubernetes.py @@ -4,7 +4,7 @@ import kubernetes import subprocess from ray import services, logger -from ray.autoscaler.command_runner import KubernetesCommandRunner +from ray.autoscaler._private.command_runner import KubernetesCommandRunner from ray.tune.syncer import NodeSyncer from ray.tune.sync_client import SyncClient diff --git a/python/ray/tune/tests/test_integration_kubernetes.py b/python/ray/tune/tests/test_integration_kubernetes.py index 0699cdc27..82b880790 100644 --- a/python/ray/tune/tests/test_integration_kubernetes.py +++ b/python/ray/tune/tests/test_integration_kubernetes.py @@ -1,6 +1,6 @@ import unittest -from ray.autoscaler.command_runner import KUBECTL_RSYNC +from ray.autoscaler._private.command_runner import KUBECTL_RSYNC from ray.tune.integration.kubernetes import KubernetesSyncer, \ KubernetesSyncClient from ray.tune.syncer import NodeSyncer