From 85a68761196b6e9b0db0fdf87bc7fb750f4163fe Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 20 Aug 2020 12:27:11 -0700 Subject: [PATCH] [autoscaler] Rename instance_type => node_type, TAG_RAY_INSTANCE_TYPE => TAG_RAY_USER_NODE_TYPE (#10207) --- python/ray/autoscaler/autoscaler.py | 35 +++---- python/ray/autoscaler/aws/config.py | 14 +-- python/ray/autoscaler/aws/node_provider.py | 6 +- python/ray/autoscaler/azure/node_provider.py | 2 +- python/ray/autoscaler/commands.py | 22 ++--- python/ray/autoscaler/local/node_provider.py | 24 ++--- python/ray/autoscaler/node_launcher.py | 34 +++---- python/ray/autoscaler/node_provider.py | 2 +- .../autoscaler/resource_demand_scheduler.py | 91 +++++++++---------- python/ray/autoscaler/tags.py | 15 +-- python/ray/tests/test_autoscaler.py | 20 ++-- python/ray/tests/test_coordinator_server.py | 20 ++-- .../tests/test_resource_demand_scheduler.py | 20 ++-- 13 files changed, 153 insertions(+), 152 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index ef2b0701d..6bfd1b6eb 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -15,8 +15,8 @@ from ray.experimental.internal_kv import _internal_kv_put, \ from ray.autoscaler.node_provider import get_node_provider from ray.autoscaler.tags import ( TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, - TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE, - TAG_RAY_INSTANCE_TYPE, STATUS_UP_TO_DATE, NODE_TYPE_WORKER) + TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND, + TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER) from ray.autoscaler.updater import NodeUpdaterThread from ray.autoscaler.node_launcher import NodeLauncher from ray.autoscaler.resource_demand_scheduler import ResourceDemandScheduler @@ -65,11 +65,12 @@ class StandardAutoscaler: # Check whether we can enable the resource demand scheduler. if "available_node_types" in self.config: - self.instance_types = self.config["available_node_types"] + self.available_node_types = self.config["available_node_types"] self.resource_demand_scheduler = ResourceDemandScheduler( - self.provider, self.instance_types, self.config["max_workers"]) + self.provider, self.available_node_types, + self.config["max_workers"]) else: - self.instance_types = None + self.available_node_types = None self.resource_demand_scheduler = None self.max_failures = max_failures @@ -97,7 +98,7 @@ class StandardAutoscaler: queue=self.launch_queue, index=i, pending=self.pending_launches, - instance_types=self.instance_types, + node_types=self.available_node_types, ) node_launcher.daemon = True node_launcher.start() @@ -203,8 +204,8 @@ class StandardAutoscaler: nodes, self.pending_launches.breakdown(), self.resource_demand_vector)) # TODO(ekl) also enforce max launch concurrency here? - for instance_type, count in instances: - self.launch_new_node(count, instance_type=instance_type) + for node_type, count in instances: + self.launch_new_node(count, node_type=node_type) # Launch additional nodes of the default type, if still needed. num_workers = len(nodes) + num_pending @@ -262,10 +263,11 @@ class StandardAutoscaler: self.recover_if_needed(node_id, now) def _node_resources(self, node_id): - instance_type = self.provider.node_tags(node_id).get( - TAG_RAY_INSTANCE_TYPE) - if self.instance_types and instance_type in self.instance_types: - return self.instance_types[instance_type].get("resources", {}) + node_type = self.provider.node_tags(node_id).get( + TAG_RAY_USER_NODE_TYPE) + if self.available_node_types: + return self.available_node_types.get(node_type, {}).get( + "resources", {}) else: return {} @@ -441,17 +443,16 @@ class StandardAutoscaler: return False return True - def launch_new_node(self, count: int, - instance_type: Optional[str]) -> None: + def launch_new_node(self, count: int, node_type: Optional[str]) -> None: logger.info( "StandardAutoscaler: Queue {} new nodes for launch".format(count)) - self.pending_launches.inc(instance_type, count) + self.pending_launches.inc(node_type, count) config = copy.deepcopy(self.config) - self.launch_queue.put((config, count, instance_type)) + self.launch_queue.put((config, count, node_type)) def workers(self): return self.provider.non_terminated_nodes( - tag_filters={TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER}) + tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) def log_info_string(self, nodes, target): tmp = "Cluster status: " diff --git a/python/ray/autoscaler/aws/config.py b/python/ray/autoscaler/aws/config.py index 24d00cf65..3001c4fa9 100644 --- a/python/ray/autoscaler/aws/config.py +++ b/python/ray/autoscaler/aws/config.py @@ -12,7 +12,7 @@ from botocore.config import Config import botocore from ray.ray_constants import BOTO_MAX_RETRIES -from ray.autoscaler.tags import NODE_TYPE_WORKER, NODE_TYPE_HEAD +from ray.autoscaler.tags import NODE_KIND_WORKER, NODE_KIND_HEAD from ray.autoscaler.aws.utils import LazyDefaultDict, handle_boto_error from ray.autoscaler.node_provider import PROVIDER_PRETTY_NAMES @@ -29,8 +29,8 @@ SECURITY_GROUP_TEMPLATE = RAY + "-{}" # Mapping from the node type tag to the section of the autoscaler yaml that # contains the config for the node type. NODE_TYPE_CONFIG_KEYS = { - NODE_TYPE_WORKER: "worker_nodes", - NODE_TYPE_HEAD: "head_node", + NODE_KIND_WORKER: "worker_nodes", + NODE_KIND_HEAD: "head_node", } DEFAULT_AMI_NAME = "AWS Deep Learning AMI (Ubuntu 18.04) V30.0" @@ -434,8 +434,8 @@ def _configure_security_group(config): security_groups = _upsert_security_groups(config, node_types_to_configure) - if NODE_TYPE_HEAD in node_types_to_configure: - head_sg = security_groups[NODE_TYPE_HEAD] + if NODE_KIND_HEAD in node_types_to_configure: + head_sg = security_groups[NODE_KIND_HEAD] _set_config_info(head_security_group_src="default") cli_logger.old_info( @@ -444,8 +444,8 @@ def _configure_security_group(config): head_sg.group_name, head_sg.id) config["head_node"]["SecurityGroupIds"] = [head_sg.id] - if NODE_TYPE_WORKER in node_types_to_configure: - workers_sg = security_groups[NODE_TYPE_WORKER] + if NODE_KIND_WORKER in node_types_to_configure: + workers_sg = security_groups[NODE_KIND_WORKER] _set_config_info(workers_security_group_src="default") cli_logger.old_info( diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index d77d06ef9..6ea3a301b 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -11,7 +11,7 @@ from botocore.config import Config from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.aws.config import bootstrap_aws from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \ - TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_TYPE, TAG_RAY_INSTANCE_TYPE + TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_INSTANCE_TYPE from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES from ray.autoscaler.log_timer import LogTimer @@ -209,8 +209,8 @@ class AWSNodeProvider(NodeProvider): "Values": [self.cluster_name], }, { - "Name": "tag:{}".format(TAG_RAY_NODE_TYPE), - "Values": [tags[TAG_RAY_NODE_TYPE]], + "Name": "tag:{}".format(TAG_RAY_NODE_KIND), + "Values": [tags[TAG_RAY_NODE_KIND]], }, { "Name": "tag:{}".format(TAG_RAY_LAUNCH_CONFIG), diff --git a/python/ray/autoscaler/azure/node_provider.py b/python/ray/autoscaler/azure/node_provider.py index 69c197e61..8833a68bf 100644 --- a/python/ray/autoscaler/azure/node_provider.py +++ b/python/ray/autoscaler/azure/node_provider.py @@ -135,7 +135,7 @@ class AzureNodeProvider(NodeProvider): nodes() must be called again to refresh results. Examples: - >>> provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"}) + >>> provider.non_terminated_nodes({TAG_RAY_NODE_KIND: "worker"}) ["node-1", "node-2"] """ nodes = self._get_filtered_nodes(tag_filters=tag_filters) diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index b16753d3a..73228381d 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -25,8 +25,8 @@ from ray.autoscaler.util import validate_config, hash_runtime_conf, \ from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS, \ PROVIDER_PRETTY_NAMES, try_get_log_state, try_logging_config, \ try_reload_log_state -from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ - TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD, TAG_RAY_INSTANCE_TYPE +from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \ + TAG_RAY_NODE_NAME, NODE_KIND_WORKER, NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler.updater import NodeUpdaterThread @@ -82,7 +82,7 @@ def request_resources(num_cpus=None, bundles=None): Args: num_cpus: int -- the number of CPU cores to request bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This - only has an effect if you've configured `available_instance_types` + only has an effect if you've configured `available_node_types` if your cluster config. """ r = _redis() @@ -311,7 +311,7 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, def remaining_nodes(): workers = provider.non_terminated_nodes({ - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER + TAG_RAY_NODE_KIND: NODE_KIND_WORKER }) if keep_min_workers: @@ -336,7 +336,7 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, return workers head = provider.non_terminated_nodes({ - TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD + TAG_RAY_NODE_KIND: NODE_KIND_HEAD }) return head + workers @@ -379,7 +379,7 @@ def kill_node(config_file, yes, hard, override_cluster_name): provider = get_node_provider(config["provider"], config["cluster_name"]) try: nodes = provider.non_terminated_nodes({ - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER + TAG_RAY_NODE_KIND: NODE_KIND_WORKER }) node = random.choice(nodes) cli_logger.print("Shutdown " + cf.bold("{}"), node) @@ -472,7 +472,7 @@ def get_or_create_head_node(config, config_file = os.path.abspath(config_file) try: head_node_tags = { - TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD, + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, } nodes = provider.non_terminated_nodes(head_node_tags) if len(nodes) > 0: @@ -520,7 +520,7 @@ def get_or_create_head_node(config, # TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync) head_node_config = copy.deepcopy(config["head_node"]) if "head_node_type" in config: - head_node_tags[TAG_RAY_INSTANCE_TYPE] = config["head_node_type"] + head_node_tags[TAG_RAY_USER_NODE_TYPE] = config["head_node_type"] head_node_config.update(config["available_node_types"][config[ "head_node_type"]]["node_config"]) @@ -985,7 +985,7 @@ def get_worker_node_ips(config_file: str, provider = get_node_provider(config["provider"], config["cluster_name"]) try: nodes = provider.non_terminated_nodes({ - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER + TAG_RAY_NODE_KIND: NODE_KIND_WORKER }) if config.get("provider", {}).get("use_internal_ips", False) is True: @@ -1005,7 +1005,7 @@ def _get_worker_nodes(config, override_cluster_name): provider = get_node_provider(config["provider"], config["cluster_name"]) try: return provider.non_terminated_nodes({ - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER + TAG_RAY_NODE_KIND: NODE_KIND_WORKER }) finally: provider.cleanup() @@ -1018,7 +1018,7 @@ def _get_head_node(config: Dict[str, Any], provider = get_node_provider(config["provider"], config["cluster_name"]) try: head_node_tags = { - TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD, + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, } nodes = provider.non_terminated_nodes(head_node_tags) finally: diff --git a/python/ray/autoscaler/local/node_provider.py b/python/ray/autoscaler/local/node_provider.py index 46d47d663..82b1a8654 100644 --- a/python/ray/autoscaler/local/node_provider.py +++ b/python/ray/autoscaler/local/node_provider.py @@ -8,9 +8,9 @@ import logging from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.local.config import bootstrap_local from ray.autoscaler.tags import ( - TAG_RAY_NODE_TYPE, - NODE_TYPE_WORKER, - NODE_TYPE_HEAD, + TAG_RAY_NODE_KIND, + NODE_KIND_WORKER, + NODE_KIND_HEAD, ) logger = logging.getLogger(__name__) @@ -31,8 +31,8 @@ class ClusterState: workers = json.loads(open(self.save_path).read()) head_config = workers.get(provider_config["head_ip"]) if (not head_config or - head_config.get("tags", {}).get(TAG_RAY_NODE_TYPE) - != NODE_TYPE_HEAD): + head_config.get("tags", {}).get(TAG_RAY_NODE_KIND) + != NODE_KIND_HEAD): workers = {} logger.info("Head IP changed - recreating cluster.") else: @@ -43,23 +43,23 @@ class ClusterState: if worker_ip not in workers: workers[worker_ip] = { "tags": { - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER + TAG_RAY_NODE_KIND: NODE_KIND_WORKER }, "state": "terminated", } else: - assert (workers[worker_ip]["tags"][TAG_RAY_NODE_TYPE] - == NODE_TYPE_WORKER) + assert (workers[worker_ip]["tags"][TAG_RAY_NODE_KIND] + == NODE_KIND_WORKER) if provider_config["head_ip"] not in workers: workers[provider_config["head_ip"]] = { "tags": { - TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD + TAG_RAY_NODE_KIND: NODE_KIND_HEAD }, "state": "terminated", } else: assert (workers[provider_config["head_ip"]]["tags"][ - TAG_RAY_NODE_TYPE] == NODE_TYPE_HEAD) + TAG_RAY_NODE_KIND] == NODE_KIND_HEAD) # Relevant when a user reduces the number of workers # without changing the headnode. list_of_node_ips = list(provider_config["worker_ips"]) @@ -209,13 +209,13 @@ class LocalNodeProvider(NodeProvider): def create_node(self, node_config, tags, count): """Creates min(count, currently available) nodes.""" - node_type = tags[TAG_RAY_NODE_TYPE] + node_type = tags[TAG_RAY_NODE_KIND] with self.state.file_lock: workers = self.state.get() for node_id, info in workers.items(): if (info["state"] == "terminated" and (self.use_coordinator - or info["tags"][TAG_RAY_NODE_TYPE] == node_type)): + or info["tags"][TAG_RAY_NODE_KIND] == node_type)): info["tags"] = tags info["state"] = "running" self.state.put(node_id, info) diff --git a/python/ray/autoscaler/node_launcher.py b/python/ray/autoscaler/node_launcher.py index fdf65ed65..8f89af3c2 100644 --- a/python/ray/autoscaler/node_launcher.py +++ b/python/ray/autoscaler/node_launcher.py @@ -4,9 +4,9 @@ import logging import threading from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS, - TAG_RAY_NODE_TYPE, TAG_RAY_NODE_NAME, - TAG_RAY_INSTANCE_TYPE, STATUS_UNINITIALIZED, - NODE_TYPE_WORKER) + TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME, + TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, + NODE_KIND_WORKER) from ray.autoscaler.util import hash_launch_conf logger = logging.getLogger(__name__) @@ -19,29 +19,29 @@ class NodeLauncher(threading.Thread): provider, queue, pending, - instance_types=None, + node_types=None, index=None, *args, **kwargs): self.queue = queue self.pending = pending self.provider = provider - self.instance_types = instance_types + self.node_types = node_types self.index = str(index) if index is not None else "" super(NodeLauncher, self).__init__(*args, **kwargs) def _launch_node(self, config: Dict[str, Any], count: int, - instance_type: Optional[str]): - if self.instance_types: - assert instance_type, instance_type - worker_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER} + node_type: Optional[str]): + if self.node_types: + assert node_type, node_type + worker_filter = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER} before = self.provider.non_terminated_nodes(tag_filters=worker_filter) launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"]) - self.log("Launching {} nodes, type {}.".format(count, instance_type)) + self.log("Launching {} nodes, type {}.".format(count, node_type)) node_config = copy.deepcopy(config["worker_nodes"]) node_tags = { TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]), - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER, + TAG_RAY_NODE_KIND: NODE_KIND_WORKER, TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED, TAG_RAY_LAUNCH_CONFIG: launch_hash, } @@ -49,10 +49,10 @@ class NodeLauncher(threading.Thread): # merge the configs. We merge the configs instead of overriding, so # that the bootstrapped per-cloud properties are preserved. # TODO(ekl) this logic is duplicated in commands.py (keep in sync) - if instance_type: - node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type + if node_type: + node_tags[TAG_RAY_USER_NODE_TYPE] = node_type node_config.update( - config["available_node_types"][instance_type]["node_config"]) + config["available_node_types"][node_type]["node_config"]) self.provider.create_node(node_config, node_tags, count) after = self.provider.non_terminated_nodes(tag_filters=worker_filter) if set(after).issubset(before): @@ -60,14 +60,14 @@ class NodeLauncher(threading.Thread): def run(self): while True: - config, count, instance_type = self.queue.get() + config, count, node_type = self.queue.get() self.log("Got {} nodes to launch.".format(count)) try: - self._launch_node(config, count, instance_type) + self._launch_node(config, count, node_type) except Exception: logger.exception("Launch failed") finally: - self.pending.dec(instance_type, count) + self.pending.dec(node_type, count) def log(self, statement): prefix = "NodeLauncher{}:".format(self.index) diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 6e34077ec..224420772 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -186,7 +186,7 @@ class NodeProvider: nodes() must be called again to refresh results. Examples: - >>> provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"}) + >>> provider.non_terminated_nodes({TAG_RAY_NODE_KIND: "worker"}) ["node-1", "node-2"] """ raise NotImplementedError diff --git a/python/ray/autoscaler/resource_demand_scheduler.py b/python/ray/autoscaler/resource_demand_scheduler.py index 354650c7e..de41551cf 100644 --- a/python/ray/autoscaler/resource_demand_scheduler.py +++ b/python/ray/autoscaler/resource_demand_scheduler.py @@ -5,15 +5,15 @@ import collections from typing import List, Dict, Tuple from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.tags import TAG_RAY_INSTANCE_TYPE +from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE logger = logging.getLogger(__name__) # e.g., m4.16xlarge. -InstanceType = str +NodeType = str # e.g., {"resources": ..., "max_workers": ...}. -InstanceTypeConfigDict = str +NodeTypeConfigDict = str # e.g., {"GPU": 1}. ResourceDict = str @@ -24,60 +24,59 @@ NodeID = str class ResourceDemandScheduler: def __init__(self, provider: NodeProvider, - instance_types: Dict[InstanceType, InstanceTypeConfigDict], + node_types: Dict[NodeType, NodeTypeConfigDict], max_workers: int): self.provider = provider - self.instance_types = instance_types + self.node_types = node_types self.max_workers = max_workers def debug_string(self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int]) -> str: - node_resources, instance_type_counts = self.calculate_node_resources( + node_resources, node_type_counts = self.calculate_node_resources( nodes, pending_nodes) out = "Worker instance types:" - for instance_type, count in instance_type_counts.items(): - out += "\n - {}: {}".format(instance_type, count) - if pending_nodes.get(instance_type): - out += " ({} pending)".format(pending_nodes[instance_type]) + for node_type, count in node_type_counts.items(): + out += "\n - {}: {}".format(node_type, count) + if pending_nodes.get(node_type): + out += " ({} pending)".format(pending_nodes[node_type]) return out def calculate_node_resources( self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int] - ) -> (List[ResourceDict], Dict[InstanceType, int]): + ) -> (List[ResourceDict], Dict[NodeType, int]): """Returns node resource list and instance type counts.""" node_resources = [] - instance_type_counts = collections.defaultdict(int) + node_type_counts = collections.defaultdict(int) - def add_instance(instance_type): - if instance_type not in self.instance_types: - raise RuntimeError( - "Missing entry for instance_type {} in " - "available_instance_types config: {}".format( - instance_type, self.instance_types)) + def add_instance(node_type): + if node_type not in self.node_types: + raise RuntimeError("Missing entry for node_type {} in " + "available_node_types config: {}".format( + node_type, self.node_types)) # Careful not to include the same dict object multiple times. node_resources.append( - copy.deepcopy(self.instance_types[instance_type]["resources"])) - instance_type_counts[instance_type] += 1 + copy.deepcopy(self.node_types[node_type]["resources"])) + node_type_counts[node_type] += 1 for node_id in nodes: tags = self.provider.node_tags(node_id) - if TAG_RAY_INSTANCE_TYPE in tags: - instance_type = tags[TAG_RAY_INSTANCE_TYPE] - add_instance(instance_type) + if TAG_RAY_USER_NODE_TYPE in tags: + node_type = tags[TAG_RAY_USER_NODE_TYPE] + add_instance(node_type) - for instance_type, count in pending_nodes.items(): + for node_type, count in pending_nodes.items(): for _ in range(count): - add_instance(instance_type) + add_instance(node_type) - return node_resources, instance_type_counts + return node_resources, node_type_counts def get_instances_to_launch(self, nodes: List[NodeID], - pending_nodes: Dict[InstanceType, int], + pending_nodes: Dict[NodeType, int], resource_demands: List[ResourceDict] - ) -> List[Tuple[InstanceType, int]]: + ) -> List[Tuple[NodeType, int]]: """Get a list of instance types that should be added to the cluster. This method: @@ -91,30 +90,30 @@ class ResourceDemandScheduler: logger.info("No resource demands") return [] - node_resources, instance_type_counts = self.calculate_node_resources( + node_resources, node_type_counts = self.calculate_node_resources( nodes, pending_nodes) logger.info("Cluster resources: {}".format(node_resources)) - logger.info("Instance counts: {}".format(instance_type_counts)) + logger.info("Instance counts: {}".format(node_type_counts)) unfulfilled = get_bin_pack_residual(node_resources, resource_demands) logger.info("Resource demands: {}".format(resource_demands)) logger.info("Unfulfilled demands: {}".format(unfulfilled)) - instances = get_instances_for( - self.instance_types, instance_type_counts, - self.max_workers - len(nodes), unfulfilled) + instances = get_instances_for(self.node_types, node_type_counts, + self.max_workers - len(nodes), + unfulfilled) logger.info("Instance requests: {}".format(instances)) return instances def get_instances_for( - instance_types: Dict[InstanceType, InstanceTypeConfigDict], - existing_instances: Dict[InstanceType, int], max_to_add: int, - resources: List[ResourceDict]) -> List[Tuple[InstanceType, int]]: + node_types: Dict[NodeType, NodeTypeConfigDict], + existing_instances: Dict[NodeType, int], max_to_add: int, + resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]: """Determine instances to add given resource demands and constraints. Args: - instance_types: instance types config. + node_types: instance types config. existing_instances: counts of existing instances already launched. This sets constraints on the number of new instances to add. max_to_add: global constraint on instances to add. @@ -128,15 +127,14 @@ def get_instances_for( while resources and sum(instances_to_add.values()) < max_to_add: utilization_scores = [] - for instance_type in instance_types: - if (existing_instances.get( - instance_type, 0) + instances_to_add.get(instance_type, 0) - >= instance_types[instance_type]["max_workers"]): + for node_type in node_types: + if (existing_instances.get(node_type, 0) + instances_to_add.get( + node_type, 0) >= node_types[node_type]["max_workers"]): continue - node_resources = instance_types[instance_type]["resources"] + node_resources = node_types[node_type]["resources"] score = _utilization_score(node_resources, resources) if score is not None: - utilization_scores.append((score, instance_type)) + utilization_scores.append((score, node_type)) # Give up, no feasible node. if not utilization_scores: @@ -144,10 +142,9 @@ def get_instances_for( break utilization_scores = sorted(utilization_scores, reverse=True) - best_instance_type = utilization_scores[0][1] - instances_to_add[best_instance_type] += 1 - allocated_resources.append( - instance_types[best_instance_type]["resources"]) + best_node_type = utilization_scores[0][1] + instances_to_add[best_node_type] += 1 + allocated_resources.append(node_types[best_node_type]["resources"]) residual = get_bin_pack_residual(allocated_resources[-1:], resources) assert len(residual) < len(resources), (resources, residual) resources = residual diff --git a/python/ray/autoscaler/tags.py b/python/ray/autoscaler/tags.py index d24ac3a73..468dadd1c 100644 --- a/python/ray/autoscaler/tags.py +++ b/python/ray/autoscaler/tags.py @@ -3,14 +3,15 @@ # Tag for the name of the node TAG_RAY_NODE_NAME = "ray-node-name" -# Tag for the type of node (e.g. Head, Worker) -TAG_RAY_NODE_TYPE = "ray-node-type" -NODE_TYPE_HEAD = "head" -NODE_TYPE_WORKER = "worker" +# Tag for the kind of node (e.g. Head, Worker). For legacy reasons, the tag +# value says 'type' instead of 'kind'. +TAG_RAY_NODE_KIND = "ray-node-type" +NODE_KIND_HEAD = "head" +NODE_KIND_WORKER = "worker" -# Tag for the provider-specific instance type (e.g., m4.4xlarge). This is used -# for automatic worker instance type selection. -TAG_RAY_INSTANCE_TYPE = "ray-instance-type" +# Tag for user defined node types (e.g., m4xl_spot). This is used for multi +# node type clusters. +TAG_RAY_USER_NODE_TYPE = "ray-user-node-type" # Tag that reports the current state of the node (e.g. Updating, Up-to-date) TAG_RAY_NODE_STATUS = "ray-node-status" diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index c39a2e1fc..1ad1ed69a 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -14,22 +14,22 @@ from ray.autoscaler.util import prepare_config, validate_config from ray.autoscaler.commands import get_or_create_head_node from ray.autoscaler.load_metrics import LoadMetrics from ray.autoscaler.autoscaler import StandardAutoscaler -from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS, \ - STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_INSTANCE_TYPE +from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \ + STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider from ray.test_utils import RayTestTimeoutException import pytest class MockNode: - def __init__(self, node_id, tags, node_config, instance_type): + def __init__(self, node_id, tags, node_config, node_type): self.node_id = node_id self.state = "pending" self.tags = tags self.external_ip = "1.2.3.4" self.internal_ip = "172.0.0.{}".format(self.node_id) self.node_config = node_config - self.instance_type = instance_type + self.node_type = node_type def matches(self, tags): for k, v in tags.items(): @@ -152,7 +152,7 @@ class MockProvider(NodeProvider): for _ in range(count): self.mock_nodes[self.next_id] = MockNode( self.next_id, tags.copy(), node_config, - tags.get(TAG_RAY_INSTANCE_TYPE)) + tags.get(TAG_RAY_USER_NODE_TYPE)) self.next_id += 1 def set_node_tags(self, node_id, tags): @@ -388,7 +388,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("1.2.3.4", "init_cmd") runner.assert_has_call("1.2.3.4", "head_setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") - self.assertEqual(self.provider.mock_nodes[0].instance_type, None) + self.assertEqual(self.provider.mock_nodes[0].node_type, None) def testScaleUp(self): config_path = self.write_config(SMALL_CLUSTER) @@ -443,7 +443,7 @@ class AutoscalingTest(unittest.TestCase): config["max_workers"] = 5 config_path = self.write_config(config) self.provider = MockProvider() - self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "worker"}, 10) + self.provider.create_node({}, {TAG_RAY_NODE_KIND: "worker"}, 10) runner = MockProcessRunner() autoscaler = StandardAutoscaler( config_path, @@ -527,9 +527,9 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() - self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "head"}, 1) + self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1) head_ip = self.provider.non_terminated_node_ips( - tag_filters={TAG_RAY_NODE_TYPE: "head"}, )[0] + tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0] runner = MockProcessRunner() lm = LoadMetrics() @@ -552,7 +552,7 @@ class AutoscalingTest(unittest.TestCase): # Connect the head and workers to end the bringup phase addrs = self.provider.non_terminated_node_ips( - tag_filters={TAG_RAY_NODE_TYPE: "worker"}, ) + tag_filters={TAG_RAY_NODE_KIND: "worker"}, ) addrs += head_ip for addr in addrs: lm.update(addr, {"CPU": 2}, {"CPU": 0}, {}) diff --git a/python/ray/tests/test_coordinator_server.py b/python/ray/tests/test_coordinator_server.py index 4b627bef8..ef1bb74cc 100644 --- a/python/ray/tests/test_coordinator_server.py +++ b/python/ray/tests/test_coordinator_server.py @@ -8,9 +8,9 @@ from ray.autoscaler.node_provider import NODE_PROVIDERS, get_node_provider from ray.autoscaler.local.node_provider import LocalNodeProvider from ray.autoscaler.local.coordinator_node_provider import ( CoordinatorSenderNodeProvider) -from ray.autoscaler.tags import (TAG_RAY_NODE_TYPE, TAG_RAY_CLUSTER_NAME, - TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, - NODE_TYPE_HEAD) +from ray.autoscaler.tags import (TAG_RAY_NODE_KIND, TAG_RAY_CLUSTER_NAME, + TAG_RAY_NODE_NAME, NODE_KIND_WORKER, + NODE_KIND_HEAD) import pytest @@ -65,13 +65,13 @@ class OnPremCoordinatorServerTest(unittest.TestCase): expected_workers = {} expected_workers[provider_config["head_ip"]] = { "tags": { - TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD + TAG_RAY_NODE_KIND: NODE_KIND_HEAD }, "state": "terminated", } expected_workers[provider_config["worker_ips"][0]] = { "tags": { - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER + TAG_RAY_NODE_KIND: NODE_KIND_WORKER }, "state": "terminated", } @@ -93,7 +93,7 @@ class OnPremCoordinatorServerTest(unittest.TestCase): # Test adding back workers updates the cluster state. expected_workers[removed_ip] = { "tags": { - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER + TAG_RAY_NODE_KIND: NODE_KIND_WORKER }, "state": "terminated", } @@ -171,7 +171,7 @@ class OnPremCoordinatorServerTest(unittest.TestCase): assert node_provider_1.is_terminated(self.list_of_node_ips[0]) assert not node_provider_1.node_tags(self.list_of_node_ips[0]) head_node_tags = { - TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD, + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, } assert not node_provider_1.non_terminated_nodes(head_node_tags) head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format( @@ -232,17 +232,17 @@ class OnPremCoordinatorServerTest(unittest.TestCase): worker_node_tags = { TAG_RAY_NODE_NAME: "ray-{}-worker".format( cluster_config["cluster_name"]), - TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER + TAG_RAY_NODE_KIND: NODE_KIND_WORKER } node_provider_3.create_node(cluster_config["worker_nodes"], worker_node_tags, 1) assert node_provider_3.non_terminated_nodes( {}) == self.list_of_node_ips - worker_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER} + worker_filter = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER} assert node_provider_3.non_terminated_nodes(worker_filter) == [ self.list_of_node_ips[1] ] - head_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD} + head_filter = {TAG_RAY_NODE_KIND: NODE_KIND_HEAD} assert node_provider_3.non_terminated_nodes(head_filter) == [ self.list_of_node_ips[0] ] diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 53f55d710..765e3e568 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -12,7 +12,7 @@ from ray.autoscaler.autoscaler import StandardAutoscaler from ray.autoscaler.load_metrics import LoadMetrics from ray.autoscaler.node_provider import NODE_PROVIDERS from ray.autoscaler.commands import get_or_create_head_node -from ray.autoscaler.tags import TAG_RAY_INSTANCE_TYPE +from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE from ray.autoscaler.resource_demand_scheduler import _utilization_score, \ get_bin_pack_residual, get_instances_for @@ -211,11 +211,13 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("1.2.3.4", "init_cmd") runner.assert_has_call("1.2.3.4", "head_setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") - self.assertEqual(self.provider.mock_nodes[0].instance_type, "m4.large") + self.assertEqual(self.provider.mock_nodes[0].node_type, "m4.large") self.assertEqual( self.provider.mock_nodes[0].node_config.get("FooProperty"), 42) self.assertEqual( - self.provider.mock_nodes[0].tags.get(TAG_RAY_INSTANCE_TYPE), + self.provider.mock_nodes[0].node_config.get("TestProp"), 1) + self.assertEqual( + self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE), "m4.large") def testScaleUpMinSanity(self): @@ -253,16 +255,16 @@ class AutoscalingTest(unittest.TestCase): autoscaler.request_resources([{"CPU": 1}]) autoscaler.update() self.waitForNodes(1) - assert self.provider.mock_nodes[0].instance_type == "m4.large" + assert self.provider.mock_nodes[0].node_type == "m4.large" autoscaler.request_resources([{"GPU": 8}]) autoscaler.update() self.waitForNodes(2) - assert self.provider.mock_nodes[1].instance_type == "p2.8xlarge" + assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" autoscaler.request_resources([{"CPU": 32}] * 4) autoscaler.update() self.waitForNodes(4) - assert self.provider.mock_nodes[2].instance_type == "m4.16xlarge" - assert self.provider.mock_nodes[3].instance_type == "m4.16xlarge" + assert self.provider.mock_nodes[2].node_type == "m4.16xlarge" + assert self.provider.mock_nodes[3].node_type == "m4.16xlarge" def testResourcePassing(self): config = MULTI_WORKER_CLUSTER.copy() @@ -283,11 +285,11 @@ class AutoscalingTest(unittest.TestCase): autoscaler.request_resources([{"CPU": 1}]) autoscaler.update() self.waitForNodes(1) - assert self.provider.mock_nodes[0].instance_type == "m4.large" + assert self.provider.mock_nodes[0].node_type == "m4.large" autoscaler.request_resources([{"GPU": 8}]) autoscaler.update() self.waitForNodes(2) - assert self.provider.mock_nodes[1].instance_type == "p2.8xlarge" + assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" # TODO (Alex): Autoscaler creates the node during one update then # starts the updater in the enxt update. The sleep is largely