From 0baf992a4f4073bdc5edf8ccc2993617dc30f655 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 20 Aug 2020 14:37:41 -0700 Subject: [PATCH] [hotfix] [autoscaler] Address remaining comments on renaming instance => node (#10229) * more renaming * fix import --- python/ray/autoscaler/autoscaler.py | 7 +- python/ray/autoscaler/aws/config.py | 8 +-- python/ray/autoscaler/aws/node_provider.py | 8 +-- .../autoscaler/resource_demand_scheduler.py | 65 +++++++++---------- .../tests/test_resource_demand_scheduler.py | 42 ++++++------ 5 files changed, 64 insertions(+), 66 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 6bfd1b6eb..4959280b5 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -199,10 +199,9 @@ class StandardAutoscaler: # First let the resource demand scheduler launch nodes, if enabled. if self.resource_demand_scheduler and self.resource_demand_vector: # TODO(ekl) include head node in the node list - instances = ( - self.resource_demand_scheduler.get_instances_to_launch( - nodes, self.pending_launches.breakdown(), - self.resource_demand_vector)) + instances = (self.resource_demand_scheduler.get_nodes_to_launch( + nodes, self.pending_launches.breakdown(), + self.resource_demand_vector)) # TODO(ekl) also enforce max launch concurrency here? for node_type, count in instances: self.launch_new_node(count, node_type=node_type) diff --git a/python/ray/autoscaler/aws/config.py b/python/ray/autoscaler/aws/config.py index 3001c4fa9..16c9544b0 100644 --- a/python/ray/autoscaler/aws/config.py +++ b/python/ray/autoscaler/aws/config.py @@ -28,7 +28,7 @@ SECURITY_GROUP_TEMPLATE = RAY + "-{}" # Mapping from the node type tag to the section of the autoscaler yaml that # contains the config for the node type. -NODE_TYPE_CONFIG_KEYS = { +NODE_KIND_CONFIG_KEYS = { NODE_KIND_WORKER: "worker_nodes", NODE_KIND_HEAD: "head_node", } @@ -426,8 +426,8 @@ def _configure_security_group(config): head_security_group_src="config", workers_security_group_src="config") node_types_to_configure = [ - node_type for node_type, config_key in NODE_TYPE_CONFIG_KEYS.items() - if "SecurityGroupIds" not in config[NODE_TYPE_CONFIG_KEYS[node_type]] + node_type for node_type, config_key in NODE_KIND_CONFIG_KEYS.items() + if "SecurityGroupIds" not in config[NODE_KIND_CONFIG_KEYS[node_type]] ] if not node_types_to_configure: return config # have user-defined groups @@ -506,7 +506,7 @@ def _get_or_create_vpc_security_groups(conf, node_types): node_type_to_vpc = { node_type: _get_vpc_id_or_die( ec2, - conf[NODE_TYPE_CONFIG_KEYS[node_type]]["SubnetIds"][0], + conf[NODE_KIND_CONFIG_KEYS[node_type]]["SubnetIds"][0], ) for node_type in node_types } diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index 6ea3a301b..f301fa6ff 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -11,7 +11,7 @@ from botocore.config import Config from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.aws.config import bootstrap_aws from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \ - TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_INSTANCE_TYPE + TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES from ray.autoscaler.log_timer import LogTimer @@ -218,10 +218,10 @@ class AWSNodeProvider(NodeProvider): }, ] # This tag may not always be present. - if TAG_RAY_INSTANCE_TYPE in tags: + if TAG_RAY_USER_NODE_TYPE in tags: filters.append({ - "Name": "tag:{}".format(TAG_RAY_INSTANCE_TYPE), - "Values": [tags[TAG_RAY_INSTANCE_TYPE]], + "Name": "tag:{}".format(TAG_RAY_USER_NODE_TYPE), + "Values": [tags[TAG_RAY_USER_NODE_TYPE]], }) reuse_nodes = list( diff --git a/python/ray/autoscaler/resource_demand_scheduler.py b/python/ray/autoscaler/resource_demand_scheduler.py index de41551cf..908bb14ea 100644 --- a/python/ray/autoscaler/resource_demand_scheduler.py +++ b/python/ray/autoscaler/resource_demand_scheduler.py @@ -35,7 +35,7 @@ class ResourceDemandScheduler: node_resources, node_type_counts = self.calculate_node_resources( nodes, pending_nodes) - out = "Worker instance types:" + out = "Worker node types:" for node_type, count in node_type_counts.items(): out += "\n - {}: {}".format(node_type, count) if pending_nodes.get(node_type): @@ -46,12 +46,12 @@ class ResourceDemandScheduler: def calculate_node_resources( self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int] ) -> (List[ResourceDict], Dict[NodeType, int]): - """Returns node resource list and instance type counts.""" + """Returns node resource list and node type counts.""" node_resources = [] node_type_counts = collections.defaultdict(int) - def add_instance(node_type): + def add_node(node_type): if node_type not in self.node_types: raise RuntimeError("Missing entry for node_type {} in " "available_node_types config: {}".format( @@ -65,24 +65,24 @@ class ResourceDemandScheduler: tags = self.provider.node_tags(node_id) if TAG_RAY_USER_NODE_TYPE in tags: node_type = tags[TAG_RAY_USER_NODE_TYPE] - add_instance(node_type) + add_node(node_type) for node_type, count in pending_nodes.items(): for _ in range(count): - add_instance(node_type) + add_node(node_type) return node_resources, node_type_counts - def get_instances_to_launch(self, nodes: List[NodeID], - pending_nodes: Dict[NodeType, int], - resource_demands: List[ResourceDict] - ) -> List[Tuple[NodeType, int]]: - """Get a list of instance types that should be added to the cluster. + def get_nodes_to_launch(self, nodes: List[NodeID], + pending_nodes: Dict[NodeType, int], + resource_demands: List[ResourceDict] + ) -> List[Tuple[NodeType, int]]: + """Get a list of node types that should be added to the cluster. This method: (1) calculates the resources present in the cluster. (2) calculates the unfulfilled resource bundles. - (3) calculates which instances need to be launched to fulfill all + (3) calculates which nodes need to be launched to fulfill all the bundle requests, subject to max_worker constraints. """ @@ -93,42 +93,40 @@ class ResourceDemandScheduler: node_resources, node_type_counts = self.calculate_node_resources( nodes, pending_nodes) logger.info("Cluster resources: {}".format(node_resources)) - logger.info("Instance counts: {}".format(node_type_counts)) + logger.info("Node counts: {}".format(node_type_counts)) unfulfilled = get_bin_pack_residual(node_resources, resource_demands) logger.info("Resource demands: {}".format(resource_demands)) logger.info("Unfulfilled demands: {}".format(unfulfilled)) - instances = get_instances_for(self.node_types, node_type_counts, - self.max_workers - len(nodes), - unfulfilled) - logger.info("Instance requests: {}".format(instances)) - return instances + nodes = get_nodes_for(self.node_types, node_type_counts, + self.max_workers - len(nodes), unfulfilled) + logger.info("Node requests: {}".format(nodes)) + return nodes -def get_instances_for( - node_types: Dict[NodeType, NodeTypeConfigDict], - existing_instances: Dict[NodeType, int], max_to_add: int, - resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]: - """Determine instances to add given resource demands and constraints. +def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict], + existing_nodes: Dict[NodeType, int], max_to_add: int, + resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]: + """Determine nodes to add given resource demands and constraints. Args: - node_types: instance types config. - existing_instances: counts of existing instances already launched. - This sets constraints on the number of new instances to add. - max_to_add: global constraint on instances to add. + node_types: node types config. + existing_nodes: counts of existing nodes already launched. + This sets constraints on the number of new nodes to add. + max_to_add: global constraint on nodes to add. resources: resource demands to fulfill. Returns: - List of instances types and count to add. + List of nodes types and count to add. """ - instances_to_add = collections.defaultdict(int) + nodes_to_add = collections.defaultdict(int) allocated_resources = [] - while resources and sum(instances_to_add.values()) < max_to_add: + while resources and sum(nodes_to_add.values()) < max_to_add: utilization_scores = [] for node_type in node_types: - if (existing_instances.get(node_type, 0) + instances_to_add.get( + if (existing_nodes.get(node_type, 0) + nodes_to_add.get( node_type, 0) >= node_types[node_type]["max_workers"]): continue node_resources = node_types[node_type]["resources"] @@ -138,18 +136,19 @@ def get_instances_for( # Give up, no feasible node. if not utilization_scores: - logger.info("No feasible instance to add for {}".format(resources)) + logger.info( + "No feasible node type to add for {}".format(resources)) break utilization_scores = sorted(utilization_scores, reverse=True) best_node_type = utilization_scores[0][1] - instances_to_add[best_node_type] += 1 + nodes_to_add[best_node_type] += 1 allocated_resources.append(node_types[best_node_type]["resources"]) residual = get_bin_pack_residual(allocated_resources[-1:], resources) assert len(residual) < len(resources), (resources, residual) resources = residual - return list(instances_to_add.items()) + return list(nodes_to_add.items()) def _utilization_score(node_resources: ResourceDict, diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 765e3e568..570ad5740 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -14,7 +14,7 @@ from ray.autoscaler.node_provider import NODE_PROVIDERS from ray.autoscaler.commands import get_or_create_head_node from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE from ray.autoscaler.resource_demand_scheduler import _utilization_score, \ - get_bin_pack_residual, get_instances_for + get_bin_pack_residual, get_nodes_for from time import sleep @@ -94,39 +94,39 @@ def test_bin_pack(): assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}]) == [{"GPU": 2}] -def test_get_instances_packing_heuristic(): - assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \ +def test_get_nodes_packing_heuristic(): + assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \ [("p2.8xlarge", 1)] - assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \ [("p2.8xlarge", 1)] - assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \ [("p2.xlarge", 4)] - assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \ == [("p2.8xlarge", 3)] - assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \ == [] - assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \ [("m4.16xlarge", 3)] - assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \ == [("m4.16xlarge", 1), ("m4.large", 1)] - assert get_instances_for( + assert get_nodes_for( TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 9}, {"CPU": 9}]) == \ [("m4.16xlarge", 1), ("m4.4xlarge", 2)] - assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \ [("m4.16xlarge", 1), ("m4.4xlarge", 1)] - assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \ [("m4.16xlarge", 1), ("m4.4xlarge", 1)] - assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \ + assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \ [("m4.16xlarge", 1), ("m4.4xlarge", 2), ("m4.large", 2)] - assert get_instances_for( + assert get_nodes_for( TYPES_A, {}, 9999, [{"GPU": 1}] + ([{"CPU": 1}] * 64)) == \ [("m4.16xlarge", 1), ("p2.xlarge", 1)] - assert get_instances_for( + assert get_nodes_for( TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \ [("m4.16xlarge", 1), ("p2.8xlarge", 1)] -def test_get_instances_respects_max_limit(): +def test_get_nodes_respects_max_limit(): types = { "m4.large": { "resources": { @@ -141,18 +141,18 @@ def test_get_instances_respects_max_limit(): "max_workers": 99999, }, } - assert get_instances_for(types, {}, 2, [{"CPU": 1}] * 10) == \ + assert get_nodes_for(types, {}, 2, [{"CPU": 1}] * 10) == \ [("m4.large", 2)] - assert get_instances_for(types, {"m4.large": 9999}, 9999, [{ + assert get_nodes_for(types, {"m4.large": 9999}, 9999, [{ "CPU": 1 }] * 10) == [] - assert get_instances_for(types, {"m4.large": 0}, 9999, [{ + assert get_nodes_for(types, {"m4.large": 0}, 9999, [{ "CPU": 1 }] * 10) == [("m4.large", 5)] - assert get_instances_for(types, {"m4.large": 7}, 4, [{ + assert get_nodes_for(types, {"m4.large": 7}, 4, [{ "CPU": 1 }] * 10) == [("m4.large", 3)] - assert get_instances_for(types, {"m4.large": 7}, 2, [{ + assert get_nodes_for(types, {"m4.large": 7}, 2, [{ "CPU": 1 }] * 10) == [("m4.large", 2)]