[autoscaler] Adding Azure Support (#7080)

* adding directory and node_provider entry for azure autoscaler

* adding initial cut at azure autoscaler functionality, needs testing and node_provider methods need updating

* adding todos and switching to auth file for service principal authentication

* adding role / scope to service principal

* resolving issues with app credentials

* adding retry for setting service principal role

* typo and adding retry to nic creation

* adding nsg to config, moving nic/public ip to node provider, cleanup node_provider, leaving in NodeProvider stub for testing

* linting

* updating cleanup and fixing bugs

* adding directory and node_provider entry for azure autoscaler

* adding initial cut at azure autoscaler functionality, needs testing and node_provider methods need updating

* adding todos and switching to auth file for service principal authentication

* adding role / scope to service principal

* resolving issues with app credentials

* adding retry for setting service principal role

* typo and adding retry to nic creation

* adding nsg to config, moving nic/public ip to node provider, cleanup node_provider, leaving in NodeProvider stub for testing

* linting

* updating cleanup and fixing bugs

* minor fixes

* first working version :)

* added tag support

* added msi identity intermediate

* enable MSI through user managed identity

* updated schema

* extend yaml schema
remove service principal code
add re-use of managed user identity

* fix rg_id

* fix logging

* replace manual cluster yaml validation with json schema
- improved error message
- support for intellisense in VSCode (or other IDEs)

* run linting

* updating yaml configs and formatting

* updating yaml configs and formatting

* typo in example config

* pulling default config from example-full

* resetting min, init worker prop

* adding docs for azure autoscaler and fixing status

* add azure to docs, fix config for spot instances, update azure provider to avoid caching issues during deployment

* fix for default subscription in azure node provider

* vm dev image build

* minor change

* keeping example-full.yaml in autoscaler/azure, updating azure example config

* linting azure config

* extending retries on azure config

* lint

* support for internal ips, fix to azure docs, and new azure gpu example config

* linting

* Update python/ray/autoscaler/azure/node_provider.py

Co-Authored-By: Richard Liaw <rliaw@berkeley.edu>

* revert_this

* remove_schema

* updating configs and removing ssh keygen, tweak azure node provider terminate

* minor tweaks

Co-authored-by: Markus Cozowicz <marcozo@microsoft.com>
Co-authored-by: Ubuntu <marcozo@mc-ray-jumpbox.chcbtljllnieveqhw3e4c1ducc.xx.internal.cloudapp.net>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Scott Graham
2020-03-15 17:48:27 -04:00
committed by GitHub
parent 3f1fcaa024
commit 37e4d29f87
13 changed files with 1012 additions and 4 deletions
+227
View File
@@ -0,0 +1,227 @@
import logging
import os
import time
import uuid
from azure.common.exceptions import CloudError, AuthenticationError
from azure.common.client_factory import get_client_from_cli_profile
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.msi import ManagedServiceIdentityClient
# Maximum number of attempts for the retried Azure calls in this module
# (Contributor role assignment and virtual network listing).
RETRIES = 30
# Fixed names of the Azure resources this module looks up or creates.
MSI_NAME = "ray-msi-user-identity"
NSG_NAME = "ray-nsg"
SUBNET_NAME = "ray-subnet"
VNET_NAME = "ray-vnet"
# Module-level logger used by all bootstrap steps below.
logger = logging.getLogger(__name__)
def bootstrap_azure(config):
    """Run every Azure bootstrap step on ``config`` and return the result.

    The steps run in a fixed order -- resource group, MSI user identity,
    SSH key pair, network -- and each step receives the config returned by
    the previous one.
    """
    pipeline = (
        _configure_resource_group,
        _configure_msi_user,
        _configure_key_pair,
        _configure_network,
    )
    for stage in pipeline:
        config = stage(config)
    return config
def _get_client(client_class, config):
    """Build a ``client_class`` instance from the Azure CLI profile.

    When ``config["provider"]`` contains a ``subscription_id`` it is passed
    through to the client factory; otherwise no subscription is passed.
    """
    provider_cfg = config["provider"]
    if "subscription_id" in provider_cfg:
        return get_client_from_cli_profile(
            client_class=client_class,
            subscription_id=provider_cfg["subscription_id"])
    return get_client_from_cli_profile(client_class=client_class)
def _configure_resource_group(config):
    """Create or update the resource group named in the provider config.

    Writes the subscription id reported by the resource client back into
    ``config["provider"]["subscription_id"]``, checks that ``resource_group``
    and ``location`` are present, and then issues a create-or-update for the
    resource group (forwarding ``tags`` when the provider config has them).

    Returns the same ``config`` object.
    """
    # TODO: look at availability sets
    # https://docs.microsoft.com/en-us/azure/virtual-machines/windows/tutorial-availability-sets
    client = _get_client(ResourceManagementClient, config)
    provider = config["provider"]

    active_subscription = client.config.subscription_id
    logger.info("Using subscription id: %s", active_subscription)
    provider["subscription_id"] = active_subscription

    assert "resource_group" in provider, (
        "Provider config must include resource_group field")
    group_name = provider["resource_group"]

    assert "location" in provider, (
        "Provider config must include location field")
    group_params = {"location": provider["location"]}
    if "tags" in provider:
        group_params["tags"] = provider["tags"]

    logger.info("Creating/Updating Resource Group: %s", group_name)
    client.resource_groups.create_or_update(
        resource_group_name=group_name, parameters=group_params)
    return config
def _configure_msi_user(config):
    """Ensure the MSI user identity exists and is a resource group Contributor.

    Looks up the user assigned identity named ``MSI_NAME`` in the configured
    resource group and creates it when absent. The identity's id and
    principal id are stored in ``config["provider"]`` under
    ``msi_identity_id`` and ``msi_identity_principal_id``. The identity is
    then assigned the Contributor role on the resource group, retrying up to
    ``RETRIES`` times.

    Args:
        config: cluster config dict; ``config["provider"]`` must contain
            ``resource_group`` and ``location``.

    Returns:
        The same ``config`` object, updated in place.

    Raises:
        Exception: if no Contributor role definition is found for the
            resource group, or if the role assignment could not be confirmed
            within ``RETRIES`` attempts.
    """
    msi_client = _get_client(ManagedServiceIdentityClient, config)
    resource_client = _get_client(ResourceManagementClient, config)
    auth_client = _get_client(AuthorizationManagementClient, config)

    resource_group = config["provider"]["resource_group"]
    location = config["provider"]["location"]

    resource_group_id = resource_client.resource_groups.get(resource_group).id

    identities = msi_client.user_assigned_identities.list_by_resource_group(
        resource_group_name=resource_group,
        filter="name eq '{}'".format(MSI_NAME))
    # NOTE(review): the name is checked client-side as well, in case the SDK
    # does not apply the ``filter`` keyword for this operation -- confirm.
    identity = next(
        (candidate for candidate in identities if candidate.name == MSI_NAME),
        None)
    if identity is None:
        logger.info("Creating MSI User Assigned Identity: %s", MSI_NAME)
        identity = msi_client.user_assigned_identities.create_or_update(
            resource_group_name=resource_group,
            resource_name=MSI_NAME,
            location=location)
    else:
        logger.info("Found MSI User Assigned Identity: %s", MSI_NAME)

    identity_id = identity.id
    principal_id = identity.principal_id
    config["provider"]["msi_identity_id"] = identity_id
    config["provider"]["msi_identity_principal_id"] = principal_id

    # assign Contributor role for MSI User Identity to resource group
    role_definitions = auth_client.role_definitions.list(
        scope=resource_group_id, filter="roleName eq 'Contributor'")
    role = next(iter(role_definitions), None)
    if role is None:
        # Fail with a readable message instead of leaking StopIteration.
        raise Exception(
            "Could not find the Contributor role definition for scope "
            "{}".format(resource_group_id))
    role_id = role.id
    role_params = {"role_definition_id": role_id, "principal_id": principal_id}

    for _ in range(RETRIES):
        try:
            filter_expr = "principalId eq '{}'".format(principal_id)
            assignments = auth_client.role_assignments.list_for_scope(
                scope=resource_group_id, filter=filter_expr)

            # Only leave the loop once the assignment is actually visible.
            if any(a.role_definition_id == role_id for a in assignments):
                break

            auth_client.role_assignments.create(
                scope=resource_group_id,
                role_assignment_name=str(uuid.uuid4()),
                parameters=role_params)
            logger.info("Assigning Contributor Role to MSI User")
        except CloudError as ce:
            # inner_exception may be missing; do not crash on attribute access
            error_code = getattr(ce.inner_exception, "error", None)
            if error_code == "PrincipalNotFound":
                # presumably the new identity has not propagated yet
                time.sleep(5)
            else:
                # Still retried as before, but no longer silently.
                logger.warning(
                    "Unexpected error while assigning Contributor role, "
                    "retrying: %s", ce)
    else:
        raise Exception(
            "Failed to create contributor role assignment (timeout)")

    return config
def _configure_key_pair(config):
ssh_user = config["auth"]["ssh_user"]
for key_type in ["ssh_private_key", "ssh_public_key"]:
try:
key_path = os.path.expanduser(config["auth"][key_type])
except KeyError:
raise Exception("Config must define {}".format(key_type))
except TypeError:
raise Exception("Invalid config value for {}".format(key_type))
assert os.path.exists(key_path), (
"Could not find ssh key: {}".format(key_path))
if key_type == "ssh_public_key":
with open(key_path, "r") as f:
public_key = f.read()
os_profile = {
"admin_username": ssh_user,
"computer_name": None,
"linux_configuration": {
"disable_password_authentication": True,
"ssh": {
"public_keys": [{
"key_data": public_key,
"path": "/home/{}/.ssh/authorized_keys".format(ssh_user)
}]
}
}
}
for node_type in ["head_node", "worker_nodes"]:
config[node_type]["os_profile"] = os_profile
return config
def _configure_network(config):
# skip this if subnet is manually set in configuration yaml
if "subnet_id" in config["provider"]:
return config
location = config["provider"]["location"]
resource_group = config["provider"]["resource_group"]
network_client = _get_client(NetworkManagementClient, config)
vnets = []
for _ in range(RETRIES):
try:
vnets = list(
network_client.virtual_networks.list(
resource_group_name=resource_group,
filter="name eq '{}'".format(VNET_NAME)))
break
except CloudError:
time.sleep(1)
except AuthenticationError:
# wait for service principal authorization to populate
time.sleep(1)
# can't update vnet if subnet already exists
if not vnets:
# create vnet
logger.info("Creating/Updating VNet: %s", VNET_NAME)
vnet_params = {
"location": location,
"address_space": {
"address_prefixes": ["10.0.0.0/16"]
}
}
network_client.virtual_networks.create_or_update(
resource_group_name=resource_group,
virtual_network_name=VNET_NAME,
parameters=vnet_params).wait()
# create subnet
logger.info("Creating/Updating Subnet: %s", SUBNET_NAME)
subnet_params = {"address_prefix": "10.0.0.0/24"}
subnet = network_client.subnets.create_or_update(
resource_group_name=resource_group,
virtual_network_name=VNET_NAME,
subnet_name=SUBNET_NAME,
subnet_parameters=subnet_params).result()
config["provider"]["subnet_id"] = subnet.id
# create network security group
logger.info("Creating/Updating Network Security Group: %s", NSG_NAME)
nsg_params = {
"location": location,
"security_rules": [{
"protocol": "Tcp",
"source_port_range": "*",
"source_address_prefix": "*",
"destination_port_range": "22",
"destination_address_prefix": "*",
"access": "Allow",
"priority": 300,
"direction": "Inbound",
"name": "ssh_rule"
}]
}
network_client.network_security_groups.create_or_update(
resource_group_name=resource_group,
network_security_group_name=NSG_NAME,
parameters=nsg_params).wait()
return config
@@ -0,0 +1,160 @@
# A unique identifier for the head node and workers of this cluster.
cluster_name: default
# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 2
# The initial number of worker nodes to launch in addition to the head
# node. When the cluster is first brought up (or when it is refreshed with a
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# Whether or not to autoscale aggressively. If this is enabled, if at any point
# we would start more workers, we start at least enough to bring us to
# initial_workers.
autoscaling_mode: default
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
image: "" # e.g., tensorflow/tensorflow:1.5.0-py3
container_name: "" # e.g. ray_docker
# If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
# if no cached version is present.
pull_before_run: True
run_options: [] # Extra options to pass into "docker run"
# Example of running a GPU head with CPU workers
# head_image: "tensorflow/tensorflow:1.13.1-py3"
# head_run_options:
# - --runtime=nvidia
# worker_image: "ubuntu:18.04"
# worker_run_options: []
# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
target_utilization_fraction: 0.8
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
provider:
type: azure
# https://azure.microsoft.com/en-us/global-infrastructure/locations
location: westus2
resource_group: ray-cluster
# set subscription id otherwise the default from az cli will be used
# subscription_id: 00000000-0000-0000-0000-000000000000
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
# you must specify paths to matching private and public key pair files
# use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair
ssh_private_key: ~/.ssh/id_rsa
ssh_public_key: ~/.ssh/id_rsa.pub
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields
# The Azure Python SDK client expects slug_style property names
# For more documentation on available fields, see:
# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python
# Note: the Azure Python SDK expects all parameter keys to be in slug_style
# the styles of parameter values are not changed
head_node:
hardware_profile:
vm_size: Standard_D2s_v3
storage_profile:
os_disk:
create_option: FromImage
caching: ReadWrite
image_reference:
# List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
publisher: microsoft-dsvm
offer: ubuntu-1804
sku: 1804-gen2
version: 20.02.01
# Provider-specific config for worker nodes, e.g. instance type. By default
# Ray will auto-configure unspecified fields
# Documentation on fields used can be found here:
# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python
# Note: the Azure Python SDK expects all parameter keys to be in slug_style
# the styles of parameter values are not changed
worker_nodes:
hardware_profile:
vm_size: Standard_F2s_v2
storage_profile:
os_disk:
create_option: FromImage
caching: ReadWrite
image_reference:
# List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
publisher: microsoft-dsvm
offer: ubuntu-1804
sku: 1804-gen2
version: 20.02.01
# You can provision additional disk space as follows
# data_disks:
# - disk_size_gb: 1024
# run workers on spot instances by default
priority: Spot
eviction_policy: Deallocate
billing_profile:
max_price: -1
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
# "/path1/on/remote/machine": "/path1/on/local/machine",
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands:
# get rid of annoying Ubuntu message
- touch ~/.sudo_as_admin_successful
# List of shell commands to run to set up nodes.
setup_commands:
# Note: if you're developing Ray, you probably want to create a VM image that
# has your Ray repo pre-cloned. Then, you can replace the pip installs
# below with a git checkout <your_sha> (and possibly a recompile).
# change to use environment desired
#- echo "conda activate py37_pytorch" >> ~/.bashrc
#- echo "conda activate py37_tensorflow" >> ~/.bashrc
- pip install -U https://sdgraystorage.blob.core.windows.net/ray-wheels/ray-0.9.0.dev0-cp37-cp37m-manylinux1_x86_64.whl
# Consider uncommenting these if you also want to run apt-get commands during setup
# - sudo pkill -9 apt-get || true
# - sudo pkill -9 dpkg || true
# - sudo dpkg --configure -a
# Custom commands that will be run on the head node after common setup.
head_setup_commands:
- pip install azure-cli-core azure-core azure-mgmt-authorization azure-mgmt-compute azure-mgmt-msi azure-mgmt-network
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
@@ -0,0 +1,110 @@
# A unique identifier for the head node and workers of this cluster.
cluster_name: gpu-docker
# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 2
# The initial number of worker nodes to launch in addition to the head
# node. When the cluster is first brought up (or when it is refreshed with a
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# Whether or not to autoscale aggressively. If this is enabled, if at any point
# we would start more workers, we start at least enough to bring us to
# initial_workers.
autoscaling_mode: default
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
image: "tensorflow/tensorflow:1.13.1-gpu-py3"
container_name: "ray-nvidia-docker-test" # e.g. ray_docker
run_options:
- --runtime=nvidia
# # Example of running a GPU head with CPU workers
# head_image: "tensorflow/tensorflow:1.13.1-gpu-py3"
# head_run_options:
# - --runtime=nvidia
# worker_image: "ubuntu:18.04"
# worker_run_options: []
# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
target_utilization_fraction: 0.8
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
provider:
type: azure
location: westus2
resource_group: ray-cluster
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
# you must specify paths to matching private and public key pair files
# use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair
ssh_private_key: ~/.ssh/id_rsa
ssh_public_key: ~/.ssh/id_rsa.pub
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields
# The Azure Python SDK client expects slug_style property names
# For more documentation on available fields, see:
# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python
# Note: the Azure Python SDK expects all parameter keys to be in slug_style
# the styles of parameter values are not changed
head_node:
hardware_profile:
vm_size: Standard_NC6s_v3
# Provider-specific config for worker nodes, e.g. instance type. By default
# Ray will auto-configure unspecified fields
# The Azure Python SDK client expects slug_style property names
# For more documentation on available fields, see:
# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python
# Note: the Azure Python SDK expects all parameter keys to be in slug_style
# the styles of parameter values are not changed
worker_nodes:
hardware_profile:
vm_size: Standard_NC6s_v3
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
# "/path1/on/remote/machine": "/path1/on/local/machine",
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# List of shell commands to run to set up nodes.
setup_commands:
- pip install -U https://sdgraystorage.blob.core.windows.net/ray-wheels/ray-0.9.0.dev0-cp37-cp37m-manylinux1_x86_64.whl
# Custom commands that will be run on the head node after common setup.
head_setup_commands:
- pip install azure-cli-core azure-core azure-mgmt-authorization azure-mgmt-compute azure-mgmt-msi azure-mgmt-network
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
@@ -0,0 +1,159 @@
# A unique identifier for the head node and workers of this cluster.
cluster_name: default
# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 2
# The initial number of worker nodes to launch in addition to the head
# node. When the cluster is first brought up (or when it is refreshed with a
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 0
# Whether or not to autoscale aggressively. If this is enabled, if at any point
# we would start more workers, we start at least enough to bring us to
# initial_workers.
autoscaling_mode: default
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
image: "" # e.g., tensorflow/tensorflow:1.5.0-py3
container_name: "" # e.g. ray_docker
# If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
# if no cached version is present.
pull_before_run: True
run_options: [] # Extra options to pass into "docker run"
# Example of running a GPU head with CPU workers
# head_image: "tensorflow/tensorflow:1.13.1-py3"
# head_run_options:
# - --runtime=nvidia
# worker_image: "ubuntu:18.04"
# worker_run_options: []
# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
target_utilization_fraction: 0.8
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
provider:
type: azure
# https://azure.microsoft.com/en-us/global-infrastructure/locations
location: westus2
resource_group: ray-cluster
# set subscription id otherwise the default from az cli will be used
# subscription_id: 00000000-0000-0000-0000-000000000000
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
# you must specify paths to matching private and public key pair files
# use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair
ssh_private_key: ~/.ssh/id_rsa
ssh_public_key: ~/.ssh/id_rsa.pub
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields
# The Azure Python SDK client expects slug_style property names
# For more documentation on available fields, see:
# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python
# Note: the Azure Python SDK expects all parameter keys to be in slug_style
# the styles of parameter values are not changed
head_node:
hardware_profile:
vm_size: Standard_NC6
storage_profile:
os_disk:
create_option: FromImage
caching: ReadWrite
image_reference:
# List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
publisher: microsoft-dsvm
offer: ubuntu-1804
sku: 1804
version: 20.02.01
# Provider-specific config for worker nodes, e.g. instance type. By default
# Ray will auto-configure unspecified fields
# Documentation on fields used can be found here:
# https://docs.microsoft.com/en-us/python/api/overview/azure/virtualmachines?view=azure-python
# Note: the Azure Python SDK expects all parameter keys to be in slug_style
# the styles of parameter values are not changed
worker_nodes:
hardware_profile:
vm_size: Standard_NC6
storage_profile:
os_disk:
create_option: FromImage
caching: ReadWrite
image_reference:
# List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
publisher: microsoft-dsvm
offer: ubuntu-1804
sku: 1804
version: 20.02.01
# You can provision additional disk space as follows
# data_disks:
# - disk_size_gb: 1024
# run workers on spot instances by default
priority: Spot
eviction_policy: Deallocate
billing_profile:
max_price: -1
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
# "/path1/on/remote/machine": "/path1/on/local/machine",
# "/path2/on/remote/machine": "/path2/on/local/machine",
}
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands:
# get rid of annoying Ubuntu message
- touch ~/.sudo_as_admin_successful
# List of shell commands to run to set up nodes.
setup_commands:
# Note: if you're developing Ray, you probably want to create a VM image that
# has your Ray repo pre-cloned. Then, you can replace the pip installs
# below with a git checkout <your_sha> (and possibly a recompile).
# - echo 'conda activate py37_pytorch' >> ~/.bashrc
- echo 'conda activate py37_tensorflow' >> ~/.bashrc
- pip install -U https://sdgraystorage.blob.core.windows.net/ray-wheels/ray-0.9.0.dev0-cp37-cp37m-manylinux1_x86_64.whl
# Consider uncommenting these if you also want to run apt-get commands during setup
# - sudo pkill -9 apt-get || true
# - sudo pkill -9 dpkg || true
# - sudo dpkg --configure -a
# Custom commands that will be run on the head node after common setup.
head_setup_commands:
- pip install azure-cli-core azure-core azure-mgmt-authorization azure-mgmt-network azure-mgmt-compute azure-mgmt-msi
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
@@ -0,0 +1,20 @@
# A unique identifier for the head node and workers of this cluster.
cluster_name: minimal
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers. min_workers defaults to 0.
max_workers: 1
# Cloud-provider specific configuration.
provider:
type: azure
location: westus2
resource_group: ray-cluster
# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu
# you must specify paths to matching private and public key pair files
# use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair
ssh_private_key: ~/.ssh/id_rsa
ssh_public_key: ~/.ssh/id_rsa.pub
@@ -0,0 +1,291 @@
import logging
from threading import RLock
from uuid import uuid4
from azure.common.client_factory import get_client_from_cli_profile
from msrestazure.azure_active_directory import MSIAuthentication
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.compute.models import ResourceIdentityType
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME
# Upper bound enforced on the length of generated VM names in create_node.
VM_NAME_MAX_LEN = 64
# Number of hex characters of a random UUID appended to each VM name.
VM_NAME_UUID_LEN = 8
# Module-level logger.
logger = logging.getLogger(__name__)
def synchronized(f):
    """Decorate a method so it runs while holding ``self.lock``.

    The lock is acquired before the wrapped method is called and released
    afterwards, including when the method raises. The wrapper keeps the
    wrapped method's name and docstring.

    Args:
        f: method whose first argument is an object with a ``lock``
            attribute usable as a context manager.

    Returns:
        The wrapping function.
    """
    from functools import wraps

    @wraps(f)
    def wrapper(self, *args, **kwargs):
        with self.lock:
            return f(self, *args, **kwargs)

    return wrapper
class AzureNodeProvider(NodeProvider):
"""Node Provider for Azure
This provider assumes Azure credentials are set by running ``az login``
and the default subscription is configured through ``az account``
or set in the ``provider`` field of the autoscaler configuration.
Nodes may be in one of three states: {pending, running, terminated}. Nodes
appear immediately once started by ``create_node``, and transition
immediately to terminated when ``terminate_node`` is called.
"""
def __init__(self, provider_config, cluster_name):
    """Set up the compute and network clients, the lock and the node cache.

    Clients are first built from the Azure CLI profile. If that raises, the
    failure is logged and both clients are built with MSI credentials
    instead. ``subscription_id`` from ``provider_config`` is forwarded to
    the clients when present.
    """
    NodeProvider.__init__(self, provider_config, cluster_name)

    if "subscription_id" in provider_config:
        client_kwargs = {
            "subscription_id": provider_config["subscription_id"]
        }
    else:
        client_kwargs = {}

    try:
        compute = get_client_from_cli_profile(
            client_class=ComputeManagementClient, **client_kwargs)
        network = get_client_from_cli_profile(
            client_class=NetworkManagementClient, **client_kwargs)
    except Exception:
        logger.info(
            "CLI profile authentication failed. Trying MSI", exc_info=True)

        msi_credentials = MSIAuthentication()
        compute = ComputeManagementClient(
            credentials=msi_credentials, **client_kwargs)
        network = NetworkManagementClient(
            credentials=msi_credentials, **client_kwargs)

    self.compute_client = compute
    self.network_client = network

    self.lock = RLock()

    # cache node objects
    self.cached_nodes = {}
@synchronized
def _get_filtered_nodes(self, tag_filters):
    """List the resource group's VMs matching ``tag_filters`` and cache them.

    Args:
        tag_filters: dict of tag name to required tag value; a VM matches
            when every entry equals the VM's tag of the same name.

    Returns:
        Dict mapping VM name to the metadata dict built by
        ``_extract_metadata``. The same dict replaces ``self.cached_nodes``.
    """

    def match_tags(vm):
        # NOTE(review): ``vm.tags`` is presumably None for VMs that carry no
        # tags at all; treat that as "no tags" instead of failing on ``.get``.
        vm_tags = vm.tags or {}
        return all(vm_tags.get(k) == v for k, v in tag_filters.items())

    vms = self.compute_client.virtual_machines.list(
        resource_group_name=self.provider_config["resource_group"])

    nodes = [self._extract_metadata(vm) for vm in filter(match_tags, vms)]
    self.cached_nodes = {node["name"]: node for node in nodes}
    return self.cached_nodes
def _extract_metadata(self, vm):
# get tags
metadata = {"name": vm.name, "tags": vm.tags, "status": ""}
# get status
resource_group = self.provider_config["resource_group"]
instance = self.compute_client.virtual_machines.instance_view(
resource_group_name=resource_group, vm_name=vm.name).as_dict()
for status in instance["statuses"]:
code, state = status["code"].split("/")
# skip provisioning status
if code == "PowerState":
metadata["status"] = state
break
# get ip data
nic_id = vm.network_profile.network_interfaces[0].id
metadata["nic_name"] = nic_id.split("/")[-1]
nic = self.network_client.network_interfaces.get(
resource_group_name=resource_group,
network_interface_name=metadata["nic_name"])
ip_config = nic.ip_configurations[0]
if not self.provider_config.get("use_internal_ips", False):
public_ip_id = ip_config.public_ip_address.id
metadata["public_ip_name"] = public_ip_id.split("/")[-1]
public_ip = self.network_client.public_ip_addresses.get(
resource_group_name=resource_group,
public_ip_address_name=metadata["public_ip_name"])
metadata["external_ip"] = public_ip.ip_address
metadata["internal_ip"] = ip_config.private_ip_address
return metadata
def non_terminated_nodes(self, tag_filters):
    """Return the ids of nodes matching ``tag_filters`` that are not terminated.

    The node list is refreshed through ``_get_filtered_nodes`` on every
    call. A node counts as terminated when its status starts with
    ``"deallocat"``; such nodes are left out of the result.

    Examples:
        >>> provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"})
        ["node-1", "node-2"]
    """
    matching = self._get_filtered_nodes(tag_filters=tag_filters)
    alive = []
    for node_id, meta in matching.items():
        if meta["status"].startswith("deallocat"):
            continue
        alive.append(node_id)
    return alive
def is_running(self, node_id):
    """Tell whether the node's freshly fetched status is ``"running"``."""
    # bypass the cache: the status is looked up on every call
    current_status = self._get_node(node_id=node_id)["status"]
    return current_status == "running"
def is_terminated(self, node_id):
    """Tell whether the node's freshly fetched status starts with "deallocat"."""
    # bypass the cache: the status is looked up on every call
    current_status = self._get_node(node_id=node_id)["status"]
    return current_status.startswith("deallocat")
def node_tags(self, node_id):
    """Return the ``tags`` entry of the cached record for ``node_id``."""
    cached_record = self._get_cached_node(node_id=node_id)
    return cached_record["tags"]
def external_ip(self, node_id):
    """Return the node's external ip, preferring the cached record.

    The node is fetched again only when the cached value is falsy.
    """
    cached_ip = self._get_cached_node(node_id=node_id)["external_ip"]
    if cached_ip:
        return cached_ip
    return self._get_node(node_id=node_id)["external_ip"]
def internal_ip(self, node_id):
    """Return the node's internal ip (Ray ip), preferring the cached record.

    The node is fetched again only when the cached value is falsy.
    """
    cached_ip = self._get_cached_node(node_id=node_id)["internal_ip"]
    if cached_ip:
        return cached_ip
    return self._get_node(node_id=node_id)["internal_ip"]
def create_node(self, node_config, tags, count):
    """Creates a number of nodes within the namespace.

    For each of the ``count`` nodes a NIC is created in the configured
    subnet (with a dynamically allocated public IP unless the provider
    config sets ``use_internal_ips``), and a VM creation request is then
    issued using that NIC and the configured user-assigned identity.

    Args:
        node_config: VM parameters handed to the compute client. Must
            contain an ``os_profile`` dict. The passed dict and its
            nested dicts are left unmodified.
        tags: tags to set on the VMs, merged over ``node_config["tags"]``.
        count: number of VMs to create.

    Raises:
        AssertionError: if a generated VM name is longer than
            ``VM_NAME_MAX_LEN``; raised before any resource for that VM
            is created.
    """
    # TODO: restart deallocated nodes if possible
    location = self.provider_config["location"]
    resource_group = self.provider_config["resource_group"]
    subnet_id = self.provider_config["subnet_id"]
    use_internal_ips = self.provider_config.get("use_internal_ips", False)

    # node_config.copy() is shallow, so nested dicts are rebuilt rather
    # than updated in place; otherwise the caller's config would pick up
    # the runtime tags and the generated computer name.
    config = node_config.copy()
    config_tags = dict(config.get("tags") or {})
    config_tags.update(tags)
    config_tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name
    config["tags"] = config_tags
    config["location"] = location
    # The identity is the same for every VM, so build it once.
    config["identity"] = {
        "type": ResourceIdentityType.user_assigned,
        "user_assigned_identities": [{
            # NOTE(review): key/value layout kept exactly as found; the
            # original author noted this shape is undocumented.
            "key": self.provider_config["msi_identity_id"],
            "value": {
                "principal_id": self.provider_config[
                    "msi_identity_principal_id"],
                "client_id": self.provider_config["msi_identity_id"]
            }
        }]
    }

    name_tag = config_tags.get(TAG_RAY_NODE_NAME, "node")
    for _ in range(count):
        unique_id = uuid4().hex[:VM_NAME_UUID_LEN]
        vm_name = "{name}-{id}".format(name=name_tag, id=unique_id)
        # Raised explicitly (instead of via ``assert``) so the check is
        # still enforced when Python runs with -O.
        if len(vm_name) > VM_NAME_MAX_LEN:
            raise AssertionError("name", vm_name)

        ip_configuration = {
            "name": str(uuid4()),
            "subnet": {
                "id": subnet_id
            }
        }
        if not use_internal_ips:
            # create public ip address
            public_ip_address_params = {
                "location": location,
                "public_ip_allocation_method": "Dynamic"
            }
            public_ip_address = (
                self.network_client.public_ip_addresses.create_or_update(
                    resource_group_name=resource_group,
                    public_ip_address_name="{}-ip".format(vm_name),
                    parameters=public_ip_address_params).result())
            ip_configuration["public_ip_address"] = public_ip_address

        nic_params = {
            "location": location,
            "ip_configurations": [ip_configuration]
        }
        nic = self.network_client.network_interfaces.create_or_update(
            resource_group_name=resource_group,
            network_interface_name="{}-nic".format(vm_name),
            parameters=nic_params).result()

        # Per-VM parameters: a fresh dict each round so that no state is
        # shared between the VMs or with the caller's node_config.
        vm_config = dict(config)
        vm_config["os_profile"] = dict(
            node_config["os_profile"], computer_name=vm_name)
        vm_config["network_profile"] = {
            "network_interfaces": [{
                "id": nic.id
            }]
        }

        # TODO: do we need to wait or fire and forget is fine?
        self.compute_client.virtual_machines.create_or_update(
            resource_group_name=resource_group,
            vm_name=vm_name,
            parameters=vm_config)
@synchronized
def set_node_tags(self, node_id, tags):
    """Sets the tag values (string dict) for the specified node.

    The given tags are merged over the node's cached tags, the merged set
    is sent to Azure, and the cache entry is replaced afterwards.

    Args:
        node_id: name of the VM to update.
        tags: tag values to add or overwrite.
    """
    # Merge into a copy: if the update call below raises, the cache must
    # not already claim tags that were never applied to the VM.
    node_tags = dict(self._get_cached_node(node_id)["tags"])
    node_tags.update(tags)
    self.compute_client.virtual_machines.update(
        resource_group_name=self.provider_config["resource_group"],
        vm_name=node_id,
        parameters={"tags": node_tags})
    self.cached_nodes[node_id]["tags"] = node_tags
def terminate_node(self, node_id):
    """Terminates the specified node. This will delete the VM and
    associated resources (NIC, IP, Storage) for the specified node.

    Only ``node_id`` is affected; other nodes of the cluster are left
    alone. If the node is unknown after refreshing the node list, there
    is nothing to delete and the call returns without doing anything.

    Args:
        node_id: name of the VM to delete.
    """
    # TODO: deallocate instead of delete so nodes could be restarted
    resource_group = self.provider_config["resource_group"]
    try:
        metadata = self._get_node(node_id=node_id)
    except KeyError:
        # Node is not (or no longer) known to the provider.
        return

    # gather disks to delete later
    vm = self.compute_client.virtual_machines.get(
        resource_group_name=resource_group, vm_name=node_id)
    disks = {d.name for d in vm.storage_profile.data_disks}
    disks.add(vm.storage_profile.os_disk.name)

    # delete machine, must wait for this to complete
    self.compute_client.virtual_machines.delete(
        resource_group_name=resource_group, vm_name=node_id).wait()

    # delete nic
    nic_deletion = self.network_client.network_interfaces.delete(
        resource_group_name=resource_group,
        network_interface_name=metadata["nic_name"])

    # delete ip address
    if "public_ip_name" in metadata:
        # A public IP cannot be removed while it is still associated
        # with a NIC, so let the NIC deletion finish first.
        nic_deletion.wait()
        self.network_client.public_ip_addresses.delete(
            resource_group_name=resource_group,
            public_ip_address_name=metadata["public_ip_name"])

    # delete disks
    for disk in disks:
        self.compute_client.disks.delete(
            resource_group_name=resource_group, disk_name=disk)
def _get_node(self, node_id):
    """Refresh the node cache and return the entry for ``node_id``.

    Raises ``KeyError`` when the refreshed cache has no such node.
    """
    # Called only for its side effect of repopulating self.cached_nodes.
    self._get_filtered_nodes({})
    refreshed = self.cached_nodes
    return refreshed[node_id]
def _get_cached_node(self, node_id):
    """Return the cached entry for ``node_id``, falling back to a
    refreshing lookup via ``_get_node`` when the node is not cached."""
    known = self.cached_nodes
    if node_id not in known:
        return self._get_node(node_id=node_id)
    return known[node_id]
+14 -2
View File
@@ -18,6 +18,12 @@ def import_gcp():
return bootstrap_gcp, GCPNodeProvider
def import_azure():
    """Import the Azure autoscaler pieces on demand.

    Returns the ``(bootstrap_azure, AzureNodeProvider)`` pair. The imports
    live inside the function so the Azure modules are only loaded when
    this provider is actually requested.
    """
    from ray.autoscaler.azure.config import bootstrap_azure
    from ray.autoscaler.azure.node_provider import AzureNodeProvider

    provider_entry = (bootstrap_azure, AzureNodeProvider)
    return provider_entry
def import_local():
from ray.autoscaler.local.config import bootstrap_local
from ray.autoscaler.local.node_provider import LocalNodeProvider
@@ -52,6 +58,12 @@ def load_gcp_example_config():
return os.path.join(os.path.dirname(ray_gcp.__file__), "example-full.yaml")
def load_azure_example_config():
    """Return the path of the ``example-full.yaml`` file located in the
    directory of the ``ray.autoscaler.azure`` package."""
    import ray.autoscaler.azure as ray_azure

    package_dir = os.path.dirname(ray_azure.__file__)
    return os.path.join(package_dir, "example-full.yaml")
def import_external():
"""Mock a normal provider importer."""
@@ -65,7 +77,7 @@ NODE_PROVIDERS = {
"local": import_local,
"aws": import_aws,
"gcp": import_gcp,
"azure": None, # TODO: support more node providers
"azure": import_azure,
"kubernetes": import_kubernetes,
"docker": None,
"external": import_external # Import an external module
@@ -75,7 +87,7 @@ DEFAULT_CONFIGS = {
"local": load_local_example_config,
"aws": load_aws_example_config,
"gcp": load_gcp_example_config,
"azure": None, # TODO: support more node providers
"azure": load_azure_example_config,
"kubernetes": load_kubernetes_example_config,
"docker": None,
}
+3
View File
@@ -155,6 +155,9 @@
"type": "string",
"default": "ubuntu"
},
"ssh_public_key": {
"type": "string"
},
"ssh_private_key": {
"type": "string"
}
+1
View File
@@ -42,6 +42,7 @@ optional_ray_files = []
ray_autoscaler_files = [
"ray/autoscaler/aws/example-full.yaml",
"ray/autoscaler/azure/example-full.yaml",
"ray/autoscaler/gcp/example-full.yaml",
"ray/autoscaler/local/example-full.yaml",
"ray/autoscaler/kubernetes/example-full.yaml",