diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 56996a2f4..bc549a963 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -13,12 +13,12 @@ import collections from ray.experimental.internal_kv import _internal_kv_put, \ _internal_kv_initialized -from ray.autoscaler.node_provider import _get_node_provider from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER, NODE_KIND_UNMANAGED) +from ray.autoscaler._private.providers import _get_node_provider from ray.autoscaler._private.updater import NodeUpdaterThread from ray.autoscaler._private.node_launcher import NodeLauncher from ray.autoscaler._private.resource_demand_scheduler import \ diff --git a/python/ray/autoscaler/_private/aws/config.py b/python/ray/autoscaler/_private/aws/config.py index afa63b949..042504d24 100644 --- a/python/ray/autoscaler/_private/aws/config.py +++ b/python/ray/autoscaler/_private/aws/config.py @@ -13,7 +13,7 @@ import botocore from ray.ray_constants import BOTO_MAX_RETRIES from ray.autoscaler.tags import NODE_KIND_WORKER, NODE_KIND_HEAD -from ray.autoscaler.node_provider import _PROVIDER_PRETTY_NAMES +from ray.autoscaler._private.providers import _PROVIDER_PRETTY_NAMES from ray.autoscaler._private.aws.utils import LazyDefaultDict, \ handle_boto_error from ray.autoscaler._private.cli_logger import cli_logger, cf diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index dfc240315..948021db2 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -23,7 +23,7 @@ from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \ hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \ DEBUG_AUTOSCALING_STATUS -from ray.autoscaler.node_provider import _get_node_provider, \ +from ray.autoscaler._private.providers import _get_node_provider, \ _NODE_PROVIDERS, _PROVIDER_PRETTY_NAMES from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \ TAG_RAY_NODE_NAME, NODE_KIND_WORKER, NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE diff --git a/python/ray/autoscaler/_private/providers.py b/python/ray/autoscaler/_private/providers.py new file mode 100644 index 000000000..34258ba51 --- /dev/null +++ b/python/ray/autoscaler/_private/providers.py @@ -0,0 +1,164 @@ +import importlib +import logging +import os +from typing import Any, Dict + +import yaml + +logger = logging.getLogger(__name__) + + +def _import_aws(provider_config): + from ray.autoscaler._private.aws.node_provider import AWSNodeProvider + return AWSNodeProvider + + +def _import_gcp(provider_config): + from ray.autoscaler._private.gcp.node_provider import GCPNodeProvider + return GCPNodeProvider + + +def _import_azure(provider_config): + from ray.autoscaler._private.azure.node_provider import AzureNodeProvider + return AzureNodeProvider + + +def _import_local(provider_config): + if "coordinator_address" in provider_config: + from ray.autoscaler._private.local.coordinator_node_provider import ( + CoordinatorSenderNodeProvider) + return CoordinatorSenderNodeProvider + else: + from ray.autoscaler._private.local.node_provider import \ + LocalNodeProvider + return LocalNodeProvider + + +def _import_kubernetes(provider_config): + from ray.autoscaler._private.kubernetes.node_provider import \ + KubernetesNodeProvider + return KubernetesNodeProvider + + +def _import_staroid(provider_config): + from ray.autoscaler._private.staroid.node_provider import \ + StaroidNodeProvider + return StaroidNodeProvider + + +def _load_local_example_config(): + import ray.autoscaler.local as ray_local + return os.path.join( + os.path.dirname(ray_local.__file__), "example-full.yaml") + + +def _load_kubernetes_example_config(): + import ray.autoscaler.kubernetes as ray_kubernetes + return os.path.join( + os.path.dirname(ray_kubernetes.__file__), "example-full.yaml") + + +def _load_aws_example_config(): + import ray.autoscaler.aws as ray_aws + return os.path.join(os.path.dirname(ray_aws.__file__), "example-full.yaml") + + +def _load_gcp_example_config(): + import ray.autoscaler.gcp as ray_gcp + return os.path.join(os.path.dirname(ray_gcp.__file__), "example-full.yaml") + + +def _load_azure_example_config(): + import ray.autoscaler.azure as ray_azure + return os.path.join( + os.path.dirname(ray_azure.__file__), "example-full.yaml") + + +def _load_staroid_example_config(): + import ray.autoscaler.staroid as ray_staroid + return os.path.join( + os.path.dirname(ray_staroid.__file__), "example-full.yaml") + + +def _import_external(provider_config): + provider_cls = _load_class(path=provider_config["module"]) + return provider_cls + + +_NODE_PROVIDERS = { + "local": _import_local, + "aws": _import_aws, + "gcp": _import_gcp, + "azure": _import_azure, + "staroid": _import_staroid, + "kubernetes": _import_kubernetes, + "external": _import_external # Import an external module +} + +_PROVIDER_PRETTY_NAMES = { + "local": "Local", + "aws": "AWS", + "gcp": "GCP", + "azure": "Azure", + "staroid": "Staroid", + "kubernetes": "Kubernetes", + "external": "External" +} + +_DEFAULT_CONFIGS = { + "local": _load_local_example_config, + "aws": _load_aws_example_config, + "gcp": _load_gcp_example_config, + "azure": _load_azure_example_config, + "staroid": _load_staroid_example_config, + "kubernetes": _load_kubernetes_example_config, +} + + +def _load_class(path): + """Load a class at runtime given a full path. + + Example of the path: mypkg.mysubpkg.myclass + """ + class_data = path.split(".") + if len(class_data) < 2: + raise ValueError( + "You need to pass a valid path like mymodule.provider_class") + module_path = ".".join(class_data[:-1]) + class_str = class_data[-1] + module = importlib.import_module(module_path) + return getattr(module, class_str) + + +def _get_node_provider(provider_config: Dict[str, Any], + cluster_name: str) -> Any: + """Retrieve a node provider. + + This returns access to an INTERNAL API. It is not allowed to call this + from any Ray package outside the autoscaler. + """ + importer = _NODE_PROVIDERS.get(provider_config["type"]) + if importer is None: + raise NotImplementedError("Unsupported node provider: {}".format( + provider_config["type"])) + provider_cls = importer(provider_config) + return provider_cls(provider_config, cluster_name) + + +def _get_default_config(provider_config): + """Retrieve a node provider. + + This is an INTERNAL API. It is not allowed to call this from any Ray + package outside the autoscaler. + """ + if provider_config["type"] == "external": + return {} + load_config = _DEFAULT_CONFIGS.get(provider_config["type"]) + if load_config is None: + raise NotImplementedError("Unsupported node provider: {}".format( + provider_config["type"])) + path_to_default = load_config() + with open(path_to_default) as f: + defaults = yaml.safe_load(f) + + return defaults diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index 78cc9e636..890f3d19a 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -8,7 +8,7 @@ from typing import Any, Dict import ray import ray._private.services as services -from ray.autoscaler.node_provider import _get_default_config +from ray.autoscaler._private.providers import _get_default_config from ray.autoscaler._private.docker import validate_docker_config REQUIRED, OPTIONAL = True, False diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index 56fc0a1be..151aafbed 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -4,6 +4,12 @@ from typing import Any, List, Tuple, Dict, Optional class CommandRunnerInterface: """Interface to run commands on a remote cluster node. + **Important**: This is an INTERNAL API that is only exposed for the purpose + of implementing custom node providers. It is not allowed to call into + CommandRunner methods from any Ray package outside the autoscaler, only to + define new implementations for use with the "external" node provider + option. + Command runner instances are returned by provider.get_command_runner().""" def run( diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 831687946..b2e935915 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -1,9 +1,5 @@ -import importlib import logging -import os -from typing import Any, Dict - -import yaml +from typing import Any from ray.autoscaler._private.command_runner import \ SSHCommandRunner, DockerCommandRunner @@ -11,155 +7,15 @@ from ray.autoscaler._private.command_runner import \ logger = logging.getLogger(__name__) -def _import_aws(provider_config): - from ray.autoscaler._private.aws.node_provider import AWSNodeProvider - return AWSNodeProvider - - -def _import_gcp(provider_config): - from ray.autoscaler._private.gcp.node_provider import GCPNodeProvider - return GCPNodeProvider - - -def _import_azure(provider_config): - from ray.autoscaler._private.azure.node_provider import AzureNodeProvider - return AzureNodeProvider - - -def _import_local(provider_config): - if "coordinator_address" in provider_config: - from ray.autoscaler._private.local.coordinator_node_provider import ( - CoordinatorSenderNodeProvider) - return CoordinatorSenderNodeProvider - else: - from ray.autoscaler._private.local.node_provider import \ - LocalNodeProvider - return LocalNodeProvider - - -def _import_kubernetes(provider_config): - from ray.autoscaler._private.kubernetes.node_provider import \ - KubernetesNodeProvider - return KubernetesNodeProvider - - -def _import_staroid(provider_config): - from ray.autoscaler._private.staroid.node_provider import \ - StaroidNodeProvider - return StaroidNodeProvider - - -def _load_local_example_config(): - import ray.autoscaler.local as ray_local - return os.path.join( - os.path.dirname(ray_local.__file__), "example-full.yaml") - - -def _load_kubernetes_example_config(): - import ray.autoscaler.kubernetes as ray_kubernetes - return os.path.join( - os.path.dirname(ray_kubernetes.__file__), "example-full.yaml") - - -def _load_aws_example_config(): - import ray.autoscaler.aws as ray_aws - return os.path.join(os.path.dirname(ray_aws.__file__), "example-full.yaml") - - -def _load_gcp_example_config(): - import ray.autoscaler.gcp as ray_gcp - return os.path.join(os.path.dirname(ray_gcp.__file__), "example-full.yaml") - - -def _load_azure_example_config(): - import ray.autoscaler.azure as ray_azure - return os.path.join( - os.path.dirname(ray_azure.__file__), "example-full.yaml") - - -def _load_staroid_example_config(): - import ray.autoscaler.staroid as ray_staroid - return os.path.join( - os.path.dirname(ray_staroid.__file__), "example-full.yaml") - - -def _import_external(provider_config): - provider_cls = _load_class(path=provider_config["module"]) - return provider_cls - - -_NODE_PROVIDERS = { - "local": _import_local, - "aws": _import_aws, - "gcp": _import_gcp, - "azure": _import_azure, - "staroid": _import_staroid, - "kubernetes": _import_kubernetes, - "external": _import_external # Import an external module -} - -_PROVIDER_PRETTY_NAMES = { - "local": "Local", - "aws": "AWS", - "gcp": "GCP", - "azure": "Azure", - "staroid": "Staroid", - "kubernetes": "Kubernetes", - "external": "External" -} - -_DEFAULT_CONFIGS = { - "local": _load_local_example_config, - "aws": _load_aws_example_config, - "gcp": _load_gcp_example_config, - "azure": _load_azure_example_config, - "staroid": _load_staroid_example_config, - "kubernetes": _load_kubernetes_example_config, -} - - -def _load_class(path): - """Load a class at runtime given a full path. - - Example of the path: mypkg.mysubpkg.myclass - """ - class_data = path.split(".") - if len(class_data) < 2: - raise ValueError( - "You need to pass a valid path like mymodule.provider_class") - module_path = ".".join(class_data[:-1]) - class_str = class_data[-1] - module = importlib.import_module(module_path) - return getattr(module, class_str) - - -def _get_node_provider(provider_config: Dict[str, Any], - cluster_name: str) -> Any: - importer = _NODE_PROVIDERS.get(provider_config["type"]) - if importer is None: - raise NotImplementedError("Unsupported node provider: {}".format( - provider_config["type"])) - provider_cls = importer(provider_config) - return provider_cls(provider_config, cluster_name) - - -def _get_default_config(provider_config): - if provider_config["type"] == "external": - return {} - load_config = _DEFAULT_CONFIGS.get(provider_config["type"]) - if load_config is None: - raise NotImplementedError("Unsupported node provider: {}".format( - provider_config["type"])) - path_to_default = load_config() - with open(path_to_default) as f: - defaults = yaml.safe_load(f) - - return defaults - - class NodeProvider: """Interface for getting and returning nodes from a Cloud. + **Important**: This is an INTERNAL API that is only exposed for the purpose + of implementing custom node providers. It is not allowed to call into + NodeProvider methods from any Ray package outside the autoscaler, only to + define new implementations of NodeProvider for use with the "external" node + provider option. + NodeProviders are namespaced by the `cluster_name` parameter; they only operate on nodes within that namespace. diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index e5d4a0fd4..b79ebd372 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -17,9 +17,10 @@ from ray.autoscaler._private.commands import get_or_create_head_node from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.autoscaler import StandardAutoscaler +from ray.autoscaler._private.providers import _NODE_PROVIDERS from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE -from ray.autoscaler.node_provider import _NODE_PROVIDERS, NodeProvider +from ray.autoscaler.node_provider import NodeProvider from ray.test_utils import RayTestTimeoutException import pytest diff --git a/python/ray/tests/test_coordinator_server.py b/python/ray/tests/test_coordinator_server.py index 713bdbf39..726ee9a80 100644 --- a/python/ray/tests/test_coordinator_server.py +++ b/python/ray/tests/test_coordinator_server.py @@ -3,8 +3,9 @@ import unittest import socket import json -from ray.autoscaler.node_provider import _NODE_PROVIDERS, _get_node_provider from ray.autoscaler.local.coordinator_server import OnPremCoordinatorServer +from ray.autoscaler._private.providers import _NODE_PROVIDERS, \ + _get_node_provider from ray.autoscaler._private.local.node_provider import LocalNodeProvider from ray.autoscaler._private.local.coordinator_node_provider import ( CoordinatorSenderNodeProvider) diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 1f310822e..3e76c023e 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -9,7 +9,7 @@ import copy import ray from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \ MockProcessRunner -from ray.autoscaler.node_provider import _NODE_PROVIDERS +from ray.autoscaler._private.providers import _NODE_PROVIDERS from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.commands import get_or_create_head_node