[hotfix] [autoscaler] Address remaining comments on renaming instance => node (#10229)

* more renaming

* fix import
This commit is contained in:
Eric Liang
2020-08-20 14:37:41 -07:00
committed by GitHub
parent 85a6876119
commit 0baf992a4f
5 changed files with 64 additions and 66 deletions
+3 -4
View File
@@ -199,10 +199,9 @@ class StandardAutoscaler:
# First let the resource demand scheduler launch nodes, if enabled.
if self.resource_demand_scheduler and self.resource_demand_vector:
# TODO(ekl) include head node in the node list
instances = (
self.resource_demand_scheduler.get_instances_to_launch(
nodes, self.pending_launches.breakdown(),
self.resource_demand_vector))
instances = (self.resource_demand_scheduler.get_nodes_to_launch(
nodes, self.pending_launches.breakdown(),
self.resource_demand_vector))
# TODO(ekl) also enforce max launch concurrency here?
for node_type, count in instances:
self.launch_new_node(count, node_type=node_type)
+4 -4
View File
@@ -28,7 +28,7 @@ SECURITY_GROUP_TEMPLATE = RAY + "-{}"
# Mapping from the node type tag to the section of the autoscaler yaml that
# contains the config for the node type.
NODE_TYPE_CONFIG_KEYS = {
NODE_KIND_CONFIG_KEYS = {
NODE_KIND_WORKER: "worker_nodes",
NODE_KIND_HEAD: "head_node",
}
@@ -426,8 +426,8 @@ def _configure_security_group(config):
head_security_group_src="config", workers_security_group_src="config")
node_types_to_configure = [
node_type for node_type, config_key in NODE_TYPE_CONFIG_KEYS.items()
if "SecurityGroupIds" not in config[NODE_TYPE_CONFIG_KEYS[node_type]]
node_type for node_type, config_key in NODE_KIND_CONFIG_KEYS.items()
if "SecurityGroupIds" not in config[NODE_KIND_CONFIG_KEYS[node_type]]
]
if not node_types_to_configure:
return config # have user-defined groups
@@ -506,7 +506,7 @@ def _get_or_create_vpc_security_groups(conf, node_types):
node_type_to_vpc = {
node_type: _get_vpc_id_or_die(
ec2,
conf[NODE_TYPE_CONFIG_KEYS[node_type]]["SubnetIds"][0],
conf[NODE_KIND_CONFIG_KEYS[node_type]]["SubnetIds"][0],
)
for node_type in node_types
}
+4 -4
View File
@@ -11,7 +11,7 @@ from botocore.config import Config
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.aws.config import bootstrap_aws
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_INSTANCE_TYPE
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_USER_NODE_TYPE
from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES
from ray.autoscaler.log_timer import LogTimer
@@ -218,10 +218,10 @@ class AWSNodeProvider(NodeProvider):
},
]
# This tag may not always be present.
if TAG_RAY_INSTANCE_TYPE in tags:
if TAG_RAY_USER_NODE_TYPE in tags:
filters.append({
"Name": "tag:{}".format(TAG_RAY_INSTANCE_TYPE),
"Values": [tags[TAG_RAY_INSTANCE_TYPE]],
"Name": "tag:{}".format(TAG_RAY_USER_NODE_TYPE),
"Values": [tags[TAG_RAY_USER_NODE_TYPE]],
})
reuse_nodes = list(
@@ -35,7 +35,7 @@ class ResourceDemandScheduler:
node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes)
out = "Worker instance types:"
out = "Worker node types:"
for node_type, count in node_type_counts.items():
out += "\n - {}: {}".format(node_type, count)
if pending_nodes.get(node_type):
@@ -46,12 +46,12 @@ class ResourceDemandScheduler:
def calculate_node_resources(
self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int]
) -> (List[ResourceDict], Dict[NodeType, int]):
"""Returns node resource list and instance type counts."""
"""Returns node resource list and node type counts."""
node_resources = []
node_type_counts = collections.defaultdict(int)
def add_instance(node_type):
def add_node(node_type):
if node_type not in self.node_types:
raise RuntimeError("Missing entry for node_type {} in "
"available_node_types config: {}".format(
@@ -65,24 +65,24 @@ class ResourceDemandScheduler:
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
add_instance(node_type)
add_node(node_type)
for node_type, count in pending_nodes.items():
for _ in range(count):
add_instance(node_type)
add_node(node_type)
return node_resources, node_type_counts
def get_instances_to_launch(self, nodes: List[NodeID],
pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict]
) -> List[Tuple[NodeType, int]]:
"""Get a list of instance types that should be added to the cluster.
def get_nodes_to_launch(self, nodes: List[NodeID],
pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict]
) -> List[Tuple[NodeType, int]]:
"""Get a list of node types that should be added to the cluster.
This method:
(1) calculates the resources present in the cluster.
(2) calculates the unfulfilled resource bundles.
(3) calculates which instances need to be launched to fulfill all
(3) calculates which nodes need to be launched to fulfill all
the bundle requests, subject to max_worker constraints.
"""
@@ -93,42 +93,40 @@ class ResourceDemandScheduler:
node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes)
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Instance counts: {}".format(node_type_counts))
logger.info("Node counts: {}".format(node_type_counts))
unfulfilled = get_bin_pack_residual(node_resources, resource_demands)
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
instances = get_instances_for(self.node_types, node_type_counts,
self.max_workers - len(nodes),
unfulfilled)
logger.info("Instance requests: {}".format(instances))
return instances
nodes = get_nodes_for(self.node_types, node_type_counts,
self.max_workers - len(nodes), unfulfilled)
logger.info("Node requests: {}".format(nodes))
return nodes
def get_instances_for(
node_types: Dict[NodeType, NodeTypeConfigDict],
existing_instances: Dict[NodeType, int], max_to_add: int,
resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]:
"""Determine instances to add given resource demands and constraints.
def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
existing_nodes: Dict[NodeType, int], max_to_add: int,
resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]:
"""Determine nodes to add given resource demands and constraints.
Args:
node_types: instance types config.
existing_instances: counts of existing instances already launched.
This sets constraints on the number of new instances to add.
max_to_add: global constraint on instances to add.
node_types: node types config.
existing_nodes: counts of existing nodes already launched.
This sets constraints on the number of new nodes to add.
max_to_add: global constraint on nodes to add.
resources: resource demands to fulfill.
Returns:
List of instances types and count to add.
List of nodes types and count to add.
"""
instances_to_add = collections.defaultdict(int)
nodes_to_add = collections.defaultdict(int)
allocated_resources = []
while resources and sum(instances_to_add.values()) < max_to_add:
while resources and sum(nodes_to_add.values()) < max_to_add:
utilization_scores = []
for node_type in node_types:
if (existing_instances.get(node_type, 0) + instances_to_add.get(
if (existing_nodes.get(node_type, 0) + nodes_to_add.get(
node_type, 0) >= node_types[node_type]["max_workers"]):
continue
node_resources = node_types[node_type]["resources"]
@@ -138,18 +136,19 @@ def get_instances_for(
# Give up, no feasible node.
if not utilization_scores:
logger.info("No feasible instance to add for {}".format(resources))
logger.info(
"No feasible node type to add for {}".format(resources))
break
utilization_scores = sorted(utilization_scores, reverse=True)
best_node_type = utilization_scores[0][1]
instances_to_add[best_node_type] += 1
nodes_to_add[best_node_type] += 1
allocated_resources.append(node_types[best_node_type]["resources"])
residual = get_bin_pack_residual(allocated_resources[-1:], resources)
assert len(residual) < len(resources), (resources, residual)
resources = residual
return list(instances_to_add.items())
return list(nodes_to_add.items())
def _utilization_score(node_resources: ResourceDict,
@@ -14,7 +14,7 @@ from ray.autoscaler.node_provider import NODE_PROVIDERS
from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE
from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
get_bin_pack_residual, get_instances_for
get_bin_pack_residual, get_nodes_for
from time import sleep
@@ -94,39 +94,39 @@ def test_bin_pack():
assert get_bin_pack_residual(arg, [{"GPU": 2}, {"GPU": 2}]) == [{"GPU": 2}]
def test_get_instances_packing_heuristic():
assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \
def test_get_nodes_packing_heuristic():
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 8}]) == \
[("p2.8xlarge", 1)]
assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 6) == \
[("p2.8xlarge", 1)]
assert get_instances_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \
assert get_nodes_for(TYPES_A, {}, 9999, [{"GPU": 1}] * 4) == \
[("p2.xlarge", 4)]
assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 32, "GPU": 1}] * 3) \
== [("p2.8xlarge", 3)]
assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64, "GPU": 1}] * 3) \
== []
assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}] * 3) == \
[("m4.16xlarge", 3)]
assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 1}]) \
== [("m4.16xlarge", 1), ("m4.large", 1)]
assert get_instances_for(
assert get_nodes_for(
TYPES_A, {}, 9999, [{"CPU": 64}, {"CPU": 9}, {"CPU": 9}]) == \
[("m4.16xlarge", 1), ("m4.4xlarge", 2)]
assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 16}] * 5) == \
[("m4.16xlarge", 1), ("m4.4xlarge", 1)]
assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 8}] * 10) == \
[("m4.16xlarge", 1), ("m4.4xlarge", 1)]
assert get_instances_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \
assert get_nodes_for(TYPES_A, {}, 9999, [{"CPU": 1}] * 100) == \
[("m4.16xlarge", 1), ("m4.4xlarge", 2), ("m4.large", 2)]
assert get_instances_for(
assert get_nodes_for(
TYPES_A, {}, 9999, [{"GPU": 1}] + ([{"CPU": 1}] * 64)) == \
[("m4.16xlarge", 1), ("p2.xlarge", 1)]
assert get_instances_for(
assert get_nodes_for(
TYPES_A, {}, 9999, ([{"GPU": 1}] * 8) + ([{"CPU": 1}] * 64)) == \
[("m4.16xlarge", 1), ("p2.8xlarge", 1)]
def test_get_instances_respects_max_limit():
def test_get_nodes_respects_max_limit():
types = {
"m4.large": {
"resources": {
@@ -141,18 +141,18 @@ def test_get_instances_respects_max_limit():
"max_workers": 99999,
},
}
assert get_instances_for(types, {}, 2, [{"CPU": 1}] * 10) == \
assert get_nodes_for(types, {}, 2, [{"CPU": 1}] * 10) == \
[("m4.large", 2)]
assert get_instances_for(types, {"m4.large": 9999}, 9999, [{
assert get_nodes_for(types, {"m4.large": 9999}, 9999, [{
"CPU": 1
}] * 10) == []
assert get_instances_for(types, {"m4.large": 0}, 9999, [{
assert get_nodes_for(types, {"m4.large": 0}, 9999, [{
"CPU": 1
}] * 10) == [("m4.large", 5)]
assert get_instances_for(types, {"m4.large": 7}, 4, [{
assert get_nodes_for(types, {"m4.large": 7}, 4, [{
"CPU": 1
}] * 10) == [("m4.large", 3)]
assert get_instances_for(types, {"m4.large": 7}, 2, [{
assert get_nodes_for(types, {"m4.large": 7}, 2, [{
"CPU": 1
}] * 10) == [("m4.large", 2)]