diff --git a/BUILD.bazel b/BUILD.bazel index 106c706a2..d2f36e6f9 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1186,6 +1186,7 @@ filegroup( "python/ray/*.py", "python/ray/autoscaler/*.py", "python/ray/autoscaler/aws/example-full.yaml", + "python/ray/autoscaler/azure/example-full.yaml", "python/ray/autoscaler/gcp/example-full.yaml", "python/ray/autoscaler/local/example-full.yaml", "python/ray/cloudpickle/*.py", diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index d4f7b6021..5043609dc 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -38,7 +38,31 @@ Test that it works by running the following commands from your local machine: .. tip:: For the AWS node configuration, you can set ``"ImageId: latest_dlami"`` to automatically use the newest `Deep Learning AMI `_ for your region. For example, ``head_node: {InstanceType: c5.xlarge, ImageId: latest_dlami}``. -.. note:: You may see a message like: ``bash: cannot set terminal process group (-1): Inappropriate ioctl for device bash: no job control in this shell`` This is a harmless error. If the cluster launcher fails, it is most likely due to some other factor. +.. note:: You may see a message like: ``bash: cannot set terminal process group (-1):`` ``Inappropriate ioctl for device bash: no job control in this shell`` This is a harmless error. If the cluster launcher fails, it is most likely due to some other factor. + +Azure +~~~~~ + +First, install the Azure CLI (``pip install azure-cli azure-core``), then log in using ``az login``. + +Set the subscription to use from the command line (``az account set -s <subscription_id>``) or by setting ``subscription_id`` in the provider section of the config file, e.g. ``ray/python/ray/autoscaler/azure/example-full.yaml``. + +Once the Azure CLI is configured to manage resources on your Azure account, you should be ready to run the autoscaler. 
The provided `ray/python/ray/autoscaler/azure/example-full.yaml `__ cluster config file will create a small cluster with a Standard D2s v3 head node (on-demand) configured to autoscale up to two Standard F2s v2 `spot workers `__. Note that you'll need to fill in your resource group and location in that config file. + +Test that it works by running the following commands from your local machine: + +.. code-block:: bash + + # Create or update the cluster. When the command finishes, it will print + # out the command that can be used to SSH into the cluster head node. + $ ray up ray/python/ray/autoscaler/azure/example-full.yaml + + # Get a remote screen on the head node. + $ ray attach ray/python/ray/autoscaler/azure/example-full.yaml + $ conda activate py37_tensorflow + $ # Try running a Ray program with 'ray.init(address="auto")'. + # Tear down the cluster. + $ ray down ray/python/ray/autoscaler/azure/example-full.yaml Azure Portal ~~~~~~~~~~~~ diff --git a/doc/source/using-ray-on-a-cluster.rst b/doc/source/using-ray-on-a-cluster.rst index 83c32ec58..9d1031c45 100644 --- a/doc/source/using-ray-on-a-cluster.rst +++ b/doc/source/using-ray-on-a-cluster.rst @@ -5,7 +5,7 @@ Manual Cluster Setup .. note:: - If you're using AWS or GCP you should use the automated `setup commands `_. + If you're using AWS, Azure or GCP you should use the automated `setup commands `_. The instructions in this document work well for small clusters. 
def _configure_resource_group(config):
    """Create or update the Azure resource group named in the config.

    The subscription id resolved by the resource management client is
    logged and written back to ``config["provider"]["subscription_id"]``.
    The resource group is then created/updated in the configured location,
    forwarding ``provider.tags`` when that key is present.

    Args:
        config: Autoscaler config dict. Its ``provider`` section must define
            ``resource_group`` and ``location``.

    Returns:
        The same ``config`` object, updated in place.

    Raises:
        AssertionError: If ``resource_group`` or ``location`` is missing
            from the provider section.
    """
    # TODO: look at availability sets
    # https://docs.microsoft.com/en-us/azure/virtual-machines/windows/tutorial-availability-sets
    client = _get_client(ResourceManagementClient, config)
    provider = config["provider"]

    # Record which subscription the client actually resolved to.
    subscription_id = client.config.subscription_id
    logger.info("Using subscription id: %s", subscription_id)
    provider["subscription_id"] = subscription_id

    # Both fields are mandatory; check them in the original order.
    assert "resource_group" in provider, (
        "Provider config must include resource_group field")
    group_name = provider["resource_group"]

    assert "location" in provider, (
        "Provider config must include location field")
    group_params = {"location": provider["location"]}

    # Tags are optional and only forwarded when configured.
    if "tags" in provider:
        group_params["tags"] = provider["tags"]

    logger.info("Creating/Updating Resource Group: %s", group_name)
    client.resource_groups.create_or_update(
        resource_group_name=group_name, parameters=group_params)

    return config
def _configure_key_pair(config):
    """Attach an SSH-key-only ``os_profile`` to the head and worker configs.

    Validates that ``auth.ssh_private_key`` and ``auth.ssh_public_key`` are
    configured and point at existing files, reads the public key, and stores
    an ``os_profile`` (admin user = ``auth.ssh_user``, password login
    disabled, the public key installed in the user's ``authorized_keys``)
    under both ``config["head_node"]`` and ``config["worker_nodes"]``.

    Args:
        config: Autoscaler config dict with ``auth``, ``head_node`` and
            ``worker_nodes`` sections.

    Returns:
        The same ``config`` object, updated in place.

    Raises:
        Exception: If a key path is missing from ``auth`` or is not a
            path-like value.
        AssertionError: If a configured key file does not exist.
    """
    # Local import so this block does not depend on a file-level import.
    import copy

    ssh_user = config["auth"]["ssh_user"]

    for key_type in ["ssh_private_key", "ssh_public_key"]:
        try:
            key_path = os.path.expanduser(config["auth"][key_type])
        except KeyError:
            raise Exception("Config must define {}".format(key_type))
        except TypeError:
            raise Exception("Invalid config value for {}".format(key_type))

        assert os.path.exists(key_path), (
            "Could not find ssh key: {}".format(key_path))

        # Only the public key's contents are needed; the private key is
        # just checked for existence.
        if key_type == "ssh_public_key":
            with open(key_path, "r") as f:
                public_key = f.read()

    os_profile = {
        "admin_username": ssh_user,
        # Placeholder; the node provider fills this in per VM.
        "computer_name": None,
        "linux_configuration": {
            "disable_password_authentication": True,
            "ssh": {
                "public_keys": [{
                    "key_data": public_key,
                    "path": "/home/{}/.ssh/authorized_keys".format(ssh_user)
                }]
            }
        }
    }
    for node_type in ["head_node", "worker_nodes"]:
        # Give each node type its own copy: the node provider writes
        # ``computer_name`` into this dict, and a shared object would leak
        # that write from one node type to the other.
        config[node_type]["os_profile"] = copy.deepcopy(os_profile)

    return config
'{}'".format(VNET_NAME))) + break + except CloudError: + time.sleep(1) + except AuthenticationError: + # wait for service principal authorization to populate + time.sleep(1) + + # can't update vnet if subnet already exists + if not vnets: + # create vnet + logger.info("Creating/Updating VNet: %s", VNET_NAME) + vnet_params = { + "location": location, + "address_space": { + "address_prefixes": ["10.0.0.0/16"] + } + } + network_client.virtual_networks.create_or_update( + resource_group_name=resource_group, + virtual_network_name=VNET_NAME, + parameters=vnet_params).wait() + + # create subnet + logger.info("Creating/Updating Subnet: %s", SUBNET_NAME) + subnet_params = {"address_prefix": "10.0.0.0/24"} + subnet = network_client.subnets.create_or_update( + resource_group_name=resource_group, + virtual_network_name=VNET_NAME, + subnet_name=SUBNET_NAME, + subnet_parameters=subnet_params).result() + + config["provider"]["subnet_id"] = subnet.id + + # create network security group + logger.info("Creating/Updating Network Security Group: %s", NSG_NAME) + nsg_params = { + "location": location, + "security_rules": [{ + "protocol": "Tcp", + "source_port_range": "*", + "source_address_prefix": "*", + "destination_port_range": "22", + "destination_address_prefix": "*", + "access": "Allow", + "priority": 300, + "direction": "Inbound", + "name": "ssh_rule" + }] + } + network_client.network_security_groups.create_or_update( + resource_group_name=resource_group, + network_security_group_name=NSG_NAME, + parameters=nsg_params).wait() + + return config diff --git a/python/ray/autoscaler/azure/example-full.yaml b/python/ray/autoscaler/azure/example-full.yaml new file mode 100644 index 000000000..3149ee40f --- /dev/null +++ b/python/ray/autoscaler/azure/example-full.yaml @@ -0,0 +1,160 @@ +# An unique identifier for the head node and workers of this cluster. +cluster_name: default + +# The minimum number of workers nodes to launch in addition to the head +# node. 
This number should be >= 0. +min_workers: 0 + +# The maximum number of workers nodes to launch in addition to the head +# node. This takes precedence over min_workers. +max_workers: 2 + +# The initial number of worker nodes to launch in addition to the head +# node. When the cluster is first brought up (or when it is refreshed with a +# subsequent `ray up`) this number of nodes will be started. +initial_workers: 0 + +# Whether or not to autoscale aggressively. If this is enabled, if at any point +# we would start more workers, we start at least enough to bring us to +# initial_workers. +autoscaling_mode: default + +# This executes all commands on all nodes in the docker container, +# and opens all the necessary ports to support the Ray cluster. +# Empty string means disabled. +docker: + image: "" # e.g., tensorflow/tensorflow:1.5.0-py3 + container_name: "" # e.g. ray_docker + # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image + # if no cached version is present. + pull_before_run: True + run_options: [] # Extra options to pass into "docker run" + + # Example of running a GPU head with CPU workers + # head_image: "tensorflow/tensorflow:1.13.1-py3" + # head_run_options: + # - --runtime=nvidia + + # worker_image: "ubuntu:18.04" + # worker_run_options: [] + +# The autoscaler will scale up the cluster to this target fraction of resource +# usage. For example, if a cluster of 10 nodes is 100% busy and +# target_utilization is 0.8, it would resize the cluster to 13. This fraction +# can be decreased to increase the aggressiveness of upscaling. +# This value must be less than 1.0 for scaling to happen. +target_utilization_fraction: 0.8 + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 + +# Cloud-provider specific configuration. 
+provider: + type: azure + # https://azure.microsoft.com/en-us/global-infrastructure/locations + location: westus2 + resource_group: ray-cluster + # set subscription id otherwise the default from az cli will be used + # subscription_id: 00000000-0000-0000-0000-000000000000 + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu + # you must specify paths to matching private and public key pair files + # use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair + ssh_private_key: ~/.ssh/id_rsa + ssh_public_key: ~/.ssh/id_rsa.pub + +# Provider-specific config for the head node, e.g. instance type. By default +# Ray will auto-configure unspecified fields +# The Azure Python SDK client expects slug_style property names +# For more documentation on available fields, see: +# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python +# Note: the Azure Python SDK expects all parameter keys to be in slug_style +# the styles of parameter values are not changed +head_node: + hardware_profile: + vm_size: Standard_D2s_v3 + storage_profile: + os_disk: + create_option: FromImage + caching: ReadWrite + image_reference: + # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage + publisher: microsoft-dsvm + offer: ubuntu-1804 + sku: 1804-gen2 + version: 20.02.01 + +# Provider-specific config for worker nodes, e.g. instance type. 
worker_nodes:
    hardware_profile:
        vm_size: Standard_F2s_v2
    storage_profile:
        os_disk:
            create_option: FromImage
            caching: ReadWrite
        image_reference:
            # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
            publisher: microsoft-dsvm
            offer: ubuntu-1804
            # Same image SKU as the head node ("1804-gen" was a truncated value).
            sku: 1804-gen2
            version: 20.02.01
        # You can provision additional disk space as follows
        # data_disks:
        #     - disk_size_gb: 1024
    # run workers on spot instances by default
    priority: Spot
    eviction_policy: Deallocate
    billing_profile:
        max_price: -1
+ # change to use environment desired + #- echo "conda activate py37_pytorch" >> ~/.bashrc + #- echo "conda activate py37_tensorflow" >> ~/.bashrc + - pip install -U https://sdgraystorage.blob.core.windows.net/ray-wheels/ray-0.9.0.dev0-cp37-cp37m-manylinux1_x86_64.whl + # Consider uncommenting these if you also want to run apt-get commands during setup + # - sudo pkill -9 apt-get || true + # - sudo pkill -9 dpkg || true + # - sudo dpkg --configure -a + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: + - pip install azure-cli-core azure-core azure-mgmt-authorization azure-mgmt-compute azure-mgmt-msi azure-mgmt-network + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 diff --git a/python/ray/autoscaler/azure/example-gpu-docker.yaml b/python/ray/autoscaler/azure/example-gpu-docker.yaml new file mode 100644 index 000000000..6240d50ac --- /dev/null +++ b/python/ray/autoscaler/azure/example-gpu-docker.yaml @@ -0,0 +1,110 @@ +# An unique identifier for the head node and workers of this cluster. +cluster_name: gpu-docker + +# The minimum number of workers nodes to launch in addition to the head +# node. This number should be >= 0. +min_workers: 0 + +# The maximum number of workers nodes to launch in addition to the head +# node. This takes precedence over min_workers. +max_workers: 2 + +# The initial number of worker nodes to launch in addition to the head +# node. 
When the cluster is first brought up (or when it is refreshed with a +# subsequent `ray up`) this number of nodes will be started. +initial_workers: 0 + +# Whether or not to autoscale aggressively. If this is enabled, if at any point +# we would start more workers, we start at least enough to bring us to +# initial_workers. +autoscaling_mode: default + +# This executes all commands on all nodes in the docker container, +# and opens all the necessary ports to support the Ray cluster. +# Empty string means disabled. +docker: + image: "tensorflow/tensorflow:1.13.1-gpu-py3" + container_name: "ray-nvidia-docker-test" # e.g. ray_docker + run_options: + - --runtime=nvidia + + # # Example of running a GPU head with CPU workers + # head_image: "tensorflow/tensorflow:1.13.1-gpu-py3" + # head_run_options: + # - --runtime=nvidia + + # worker_image: "ubuntu:18.04" + # worker_run_options: [] + +# The autoscaler will scale up the cluster to this target fraction of resource +# usage. For example, if a cluster of 10 nodes is 100% busy and +# target_utilization is 0.8, it would resize the cluster to 13. This fraction +# can be decreased to increase the aggressiveness of upscaling. +# This value must be less than 1.0 for scaling to happen. +target_utilization_fraction: 0.8 + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 + +# Cloud-provider specific configuration. +provider: + type: azure + location: westus2 + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu + # you must specify paths to matching private and public key pair files + # use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair + ssh_private_key: ~/.ssh/id_rsa + ssh_public_key: ~/.ssh/id_rsa.pub + +# Provider-specific config for the head node, e.g. instance type. 
By default +# Ray will auto-configure unspecified fields +# The Azure Python SDK client expects slug_style property names +# For more documentation on available fields, see: +# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python +# Note: the Azure Python SDK expects all parameter keys to be in slug_style +# the styles of parameter values are not changed +head_node: + hardware_profile: + vm_size: Standard_NC6s_v3 + +# Provider-specific config for worker nodes, e.g. instance type. By default +# Ray will auto-configure unspecified fields +# The Azure Python SDK client expects slug_style property names +# For more documentation on available fields, see: +# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python +# Note: the Azure Python SDK expects all parameter keys to be in slug_style +# the styles of parameter values are not changed +worker_nodes: + hardware_profile: + vm_size: Standard_NC6s_v3 + +# Files or directories to copy to the head and worker nodes. The format is a +# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. +file_mounts: { +# "/path1/on/remote/machine": "/path1/on/local/machine", +# "/path2/on/remote/machine": "/path2/on/local/machine", +} + +# List of shell commands to run to set up nodes. +setup_commands: + - pip install -U https://sdgraystorage.blob.core.windows.net/ray-wheels/ray-0.9.0.dev0-cp37-cp37m-manylinux1_x86_64.whl + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: + - pip install azure-cli-core azure-core azure-mgmt-authorization azure-mgmt-compute azure-mgmt-msi azure-mgmt-network + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. 
+head_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start ray on worker nodes. You don't need to change this. +worker_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 diff --git a/python/ray/autoscaler/azure/example-gpu.yaml b/python/ray/autoscaler/azure/example-gpu.yaml new file mode 100644 index 000000000..ef87d1d67 --- /dev/null +++ b/python/ray/autoscaler/azure/example-gpu.yaml @@ -0,0 +1,159 @@ +# An unique identifier for the head node and workers of this cluster. +cluster_name: default + +# The minimum number of workers nodes to launch in addition to the head +# node. This number should be >= 0. +min_workers: 0 + +# The maximum number of workers nodes to launch in addition to the head +# node. This takes precedence over min_workers. +max_workers: 2 + +# The initial number of worker nodes to launch in addition to the head +# node. When the cluster is first brought up (or when it is refreshed with a +# subsequent `ray up`) this number of nodes will be started. +initial_workers: 0 + +# Whether or not to autoscale aggressively. If this is enabled, if at any point +# we would start more workers, we start at least enough to bring us to +# initial_workers. +autoscaling_mode: default + +# This executes all commands on all nodes in the docker container, +# and opens all the necessary ports to support the Ray cluster. +# Empty string means disabled. +docker: + image: "" # e.g., tensorflow/tensorflow:1.5.0-py3 + container_name: "" # e.g. ray_docker + # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image + # if no cached version is present. 
+ pull_before_run: True + run_options: [] # Extra options to pass into "docker run" + + # Example of running a GPU head with CPU workers + # head_image: "tensorflow/tensorflow:1.13.1-py3" + # head_run_options: + # - --runtime=nvidia + + # worker_image: "ubuntu:18.04" + # worker_run_options: [] + +# The autoscaler will scale up the cluster to this target fraction of resource +# usage. For example, if a cluster of 10 nodes is 100% busy and +# target_utilization is 0.8, it would resize the cluster to 13. This fraction +# can be decreased to increase the aggressiveness of upscaling. +# This value must be less than 1.0 for scaling to happen. +target_utilization_fraction: 0.8 + +# If a node is idle for this many minutes, it will be removed. +idle_timeout_minutes: 5 + +# Cloud-provider specific configuration. +provider: + type: azure + # https://azure.microsoft.com/en-us/global-infrastructure/locations + location: westus2 + resource_group: ray-cluster + # set subscription id otherwise the default from az cli will be used + # subscription_id: 00000000-0000-0000-0000-000000000000 + +# How Ray will authenticate with newly launched nodes. +auth: + ssh_user: ubuntu + # you must specify paths to matching private and public key pair files + # use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair + ssh_private_key: ~/.ssh/id_rsa + ssh_public_key: ~/.ssh/id_rsa.pub + +# Provider-specific config for the head node, e.g. instance type. 
By default +# Ray will auto-configure unspecified fields +# The Azure Python SDK client expects slug_style property names +# For more documentation on available fields, see: +# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python +# Note: the Azure Python SDK expects all parameter keys to be in slug_style +# the styles of parameter values are not changed +head_node: + hardware_profile: + vm_size: Standard_NC6 + storage_profile: + os_disk: + create_option: FromImage + caching: ReadWrite + image_reference: + # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage + publisher: microsoft-dsvm + offer: ubuntu-1804 + sku: 1804 + version: 20.02.01 + +# Provider-specific config for worker nodes, e.g. instance type. By default +# Ray will auto-configure unspecified fields +# Documentation on fields used can be found here: +# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python +# Note: the Azure Python SDK expects all parameter keys to be in slug_style +# the styles of parameter values are not changed +worker_nodes: + hardware_profile: + vm_size: Standard_NC6 + storage_profile: + os_disk: + create_option: FromImage + caching: ReadWrite + image_reference: + # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage + publisher: microsoft-dsvm + offer: ubuntu-1804 + sku: 1804 + version: 20.02.01 + # You can provision additional disk space as follows + # data_disks: + # - disk_size_gb: 1024 + # run workers on spot instances by default + priority: Spot + eviction_policy: Deallocate + billing_profile: + max_price: -1 + +# Files or directories to copy to the head and worker nodes. The format is a +# dictionary from REMOTE_PATH: LOCAL_PATH, e.g. 
+file_mounts: { +# "/path1/on/remote/machine": "/path1/on/local/machine", +# "/path2/on/remote/machine": "/path2/on/local/machine", +} + +# List of commands that will be run before `setup_commands`. If docker is +# enabled, these commands will run outside the container and before docker +# is setup. +initialization_commands: + # get rid of annoying Ubuntu message + - touch ~/.sudo_as_admin_successful + +# List of shell commands to run to set up nodes. +setup_commands: + # Note: if you're developing Ray, you probably want to create an AMI that + # has your Ray repo pre-cloned. Then, you can replace the pip installs + # below with a git checkout (and possibly a recompile). + # - echo 'conda activate py37_pytorch' >> ~/.bashrc + - echo 'conda activate py37_tensorflow' >> ~/.bashrc + - pip install -U https://sdgraystorage.blob.core.windows.net/ray-wheels/ray-0.9.0.dev0-cp37-cp37m-manylinux1_x86_64.whl + # Consider uncommenting these if you also want to run apt-get commands during setup + # - sudo pkill -9 apt-get || true + # - sudo pkill -9 dpkg || true + # - sudo dpkg --configure -a + +# Custom commands that will be run on the head node after common setup. +head_setup_commands: + - pip install azure-cli-core azure-core azure-mgmt-authorization azure-mgmt-network azure-mgmt-compute azure-mgmt-msi + +# Custom commands that will be run on worker nodes after common setup. +worker_setup_commands: [] + +# Command to start ray on the head node. You don't need to change this. +head_start_ray_commands: + - ray stop + - ulimit -n 65536; ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml + +# Command to start ray on worker nodes. You don't need to change this. 
def synchronized(f):
    """Decorate a method so it runs while holding ``self.lock``.

    The lock is acquired before the wrapped method is called and released
    when it returns or raises. The wrapper keeps the wrapped method's name
    and docstring.

    Args:
        f: Method whose first argument is an object exposing a ``lock``
            attribute usable as a context manager (e.g. ``threading.RLock``).

    Returns:
        The wrapping function.
    """
    # Local import so this block does not depend on a file-level import.
    import functools

    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        # ``with`` releases the lock on both normal return and exception.
        with self.lock:
            return f(self, *args, **kwargs)

    return wrapper
@synchronized
def _get_filtered_nodes(self, tag_filters):
    """List the resource group's VMs matching ``tag_filters`` and cache them.

    Every VM in ``provider_config["resource_group"]`` is listed; those whose
    tags equal every key/value pair in ``tag_filters`` are converted with
    ``_extract_metadata``. The result replaces ``self.cached_nodes``.

    Args:
        tag_filters: Dict of tag name -> required tag value.

    Returns:
        Dict mapping VM name to its metadata dict (also stored as
        ``self.cached_nodes``).
    """

    def match_tags(vm):
        # ``vm.tags`` may be None for VMs created without tags (e.g.
        # unrelated machines in the same resource group); treat that as
        # "no tags" instead of failing on ``None.get``.
        vm_tags = vm.tags or {}
        return all(vm_tags.get(k) == v for k, v in tag_filters.items())

    vms = self.compute_client.virtual_machines.list(
        resource_group_name=self.provider_config["resource_group"])

    nodes = [self._extract_metadata(vm) for vm in filter(match_tags, vms)]
    self.cached_nodes = {node["name"]: node for node in nodes}
    return self.cached_nodes
metadata + + def non_terminated_nodes(self, tag_filters): + """Return a list of node ids filtered by the specified tags dict. + + This list must not include terminated nodes. For performance reasons, + providers are allowed to cache the result of a call to nodes() to + serve single-node queries (e.g. is_running(node_id)). This means that + nodes() must be called again to refresh results. + + Examples: + >>> provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"}) + ["node-1", "node-2"] + """ + nodes = self._get_filtered_nodes(tag_filters=tag_filters) + return [ + k for k, v in nodes.items() + if not v["status"].startswith("deallocat") + ] + + def is_running(self, node_id): + """Return whether the specified node is running.""" + # always get current status + node = self._get_node(node_id=node_id) + return node["status"] == "running" + + def is_terminated(self, node_id): + """Return whether the specified node is terminated.""" + # always get current status + node = self._get_node(node_id=node_id) + return node["status"].startswith("deallocat") + + def node_tags(self, node_id): + """Returns the tags of the given node (string dict).""" + return self._get_cached_node(node_id=node_id)["tags"] + + def external_ip(self, node_id): + """Returns the external ip of the given node.""" + ip = (self._get_cached_node(node_id=node_id)["external_ip"] + or self._get_node(node_id=node_id)["external_ip"]) + return ip + + def internal_ip(self, node_id): + """Returns the internal ip (Ray ip) of the given node.""" + ip = (self._get_cached_node(node_id=node_id)["internal_ip"] + or self._get_node(node_id=node_id)["internal_ip"]) + return ip + + def create_node(self, node_config, tags, count): + """Creates a number of nodes within the namespace.""" + # TODO: restart deallocated nodes if possible + location = self.provider_config["location"] + resource_group = self.provider_config["resource_group"] + subnet_id = self.provider_config["subnet_id"] + + config = node_config.copy() + 
config_tags = config.get("tags", {}) + config_tags.update(tags) + config_tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name + + config["tags"] = config_tags + config["location"] = location + name_tag = config_tags.get(TAG_RAY_NODE_NAME, "node") + + for _ in range(count): + unique_id = uuid4().hex[:VM_NAME_UUID_LEN] + vm_name = "{name}-{id}".format(name=name_tag, id=unique_id) + config["os_profile"]["computer_name"] = vm_name + + try: + assert len(vm_name) <= VM_NAME_MAX_LEN + except AssertionError as e: + e.args += ("name", vm_name) + raise + + ip_configuration = {"name": uuid4(), "subnet": {"id": subnet_id}} + + if not self.provider_config.get("use_internal_ips", False): + # create public ip address + public_ip_addess_params = { + "location": location, + "public_ip_allocation_method": "Dynamic" + } + public_ip_address = ( + self.network_client.public_ip_addresses.create_or_update( + resource_group_name=resource_group, + public_ip_address_name="{}-ip".format(vm_name), + parameters=public_ip_addess_params).result()) + ip_configuration["public_ip_address"] = public_ip_address + + nic_params = { + "location": location, + "ip_configurations": [ip_configuration] + } + nic = self.network_client.network_interfaces.create_or_update( + resource_group_name=resource_group, + network_interface_name="{}-nic".format(vm_name), + parameters=nic_params).result() + + # update vm config with network parameters + config["network_profile"] = { + "network_interfaces": [{ + "id": nic.id + }] + } + + config["identity"] = { + "type": ResourceIdentityType.user_assigned, + "user_assigned_identities": [{ + # zero-documentation.. *sigh* + "key": self.provider_config["msi_identity_id"], + "value": { + "principal_id": self.provider_config[ + "msi_identity_principal_id"], + "client_id": self.provider_config["msi_identity_id"] + } + }] + } + + # TODO: do we need to wait or fire and forget is fine? 
+ self.compute_client.virtual_machines.create_or_update( + resource_group_name=self.provider_config["resource_group"], + vm_name=vm_name, + parameters=config) + + @synchronized + def set_node_tags(self, node_id, tags): + """Sets the tag values (string dict) for the specified node.""" + node_tags = self._get_cached_node(node_id)["tags"] + node_tags.update(tags) + self.compute_client.virtual_machines.update( + resource_group_name=self.provider_config["resource_group"], + vm_name=node_id, + parameters={"tags": node_tags}) + self.cached_nodes[node_id]["tags"] = node_tags + + def terminate_node(self, node_id): + """Terminates the specified node. This will delete the VM and + associated resources (NIC, IP, Storage) for the specified node.""" + # self.compute_client.virtual_machines.deallocate( + # resource_group_name=self.provider_config["resource_group"], + # vm_name=node_id) + resource_group = self.provider_config["resource_group"] + nodes = self._get_filtered_nodes( + tag_filters={TAG_RAY_CLUSTER_NAME: self.cluster_name}) + for node, metadata in nodes.items(): + # gather disks to delete later + vm = self.compute_client.virtual_machines.get( + resource_group_name=resource_group, vm_name=node) + disks = {d.name for d in vm.storage_profile.data_disks} + disks.add(vm.storage_profile.os_disk.name) + # delete machine, must wait for this to complete + self.compute_client.virtual_machines.delete( + resource_group_name=resource_group, vm_name=node).wait() + # delete nic + self.network_client.network_interfaces.delete( + resource_group_name=resource_group, + network_interface_name=metadata["nic_name"]) + # delete ip address + if "public_ip_name" in metadata: + self.network_client.public_ip_addresses.delete( + resource_group_name=resource_group, + public_ip_address_name=metadata["public_ip_name"]) + # delete disks + for disk in disks: + self.compute_client.disks.delete( + resource_group_name=resource_group, disk_name=disk) + + def _get_node(self, node_id): + 
self._get_filtered_nodes({}) # Side effect: updates cache + return self.cached_nodes[node_id] + + def _get_cached_node(self, node_id): + if node_id in self.cached_nodes: + return self.cached_nodes[node_id] + return self._get_node(node_id=node_id) diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 92dec4073..27a4a61ba 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -18,6 +18,12 @@ def import_gcp(): return bootstrap_gcp, GCPNodeProvider +def import_azure(): + from ray.autoscaler.azure.config import bootstrap_azure + from ray.autoscaler.azure.node_provider import AzureNodeProvider + return bootstrap_azure, AzureNodeProvider + + def import_local(): from ray.autoscaler.local.config import bootstrap_local from ray.autoscaler.local.node_provider import LocalNodeProvider @@ -52,6 +58,12 @@ def load_gcp_example_config(): return os.path.join(os.path.dirname(ray_gcp.__file__), "example-full.yaml") +def load_azure_example_config(): + import ray.autoscaler.azure as ray_azure + return os.path.join( + os.path.dirname(ray_azure.__file__), "example-full.yaml") + + def import_external(): """Mock a normal provider importer.""" @@ -65,7 +77,7 @@ NODE_PROVIDERS = { "local": import_local, "aws": import_aws, "gcp": import_gcp, - "azure": None, # TODO: support more node providers + "azure": import_azure, "kubernetes": import_kubernetes, "docker": None, "external": import_external # Import an external module @@ -75,7 +87,7 @@ DEFAULT_CONFIGS = { "local": load_local_example_config, "aws": load_aws_example_config, "gcp": load_gcp_example_config, - "azure": None, # TODO: support more node providers + "azure": load_azure_example_config, "kubernetes": load_kubernetes_example_config, "docker": None, } diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index 32903a3e5..cd17133e9 100644 --- a/python/ray/autoscaler/ray-schema.json +++ 
b/python/ray/autoscaler/ray-schema.json @@ -155,6 +155,9 @@ "type": "string", "default": "ubuntu" }, + "ssh_public_key": { + "type": "string" + }, "ssh_private_key": { "type": "string" } diff --git a/python/setup.py b/python/setup.py index 3d48e2c5a..36af00e76 100644 --- a/python/setup.py +++ b/python/setup.py @@ -42,6 +42,7 @@ optional_ray_files = [] ray_autoscaler_files = [ "ray/autoscaler/aws/example-full.yaml", + "ray/autoscaler/azure/example-full.yaml", "ray/autoscaler/gcp/example-full.yaml", "ray/autoscaler/local/example-full.yaml", "ray/autoscaler/kubernetes/example-full.yaml",