[autoscaler] Fix semantics of request_resources (#11820)

This commit is contained in:
Eric Liang
2020-11-09 14:57:40 -08:00
committed by GitHub
parent 1c132f2ff8
commit a9cf0141a0
6 changed files with 199 additions and 64 deletions
+9 -8
View File
@@ -209,17 +209,14 @@ class StandardAutoscaler:
# First let the resource demand scheduler launch nodes, if enabled.
if self.resource_demand_scheduler:
resource_demand_vector = self.resource_demand_vector + \
self.load_metrics.get_resource_demand_vector()
pending_placement_groups = \
self.load_metrics.get_pending_placement_groups()
to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
resource_demand_vector,
self.load_metrics.get_resource_demand_vector(),
self.load_metrics.get_resource_utilization(),
pending_placement_groups,
self.load_metrics.get_static_node_resources_by_ip())
self.load_metrics.get_pending_placement_groups(),
self.load_metrics.get_static_node_resources_by_ip(),
ensure_min_cluster_size=self.resource_demand_vector)
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
@@ -563,7 +560,11 @@ class StandardAutoscaler:
"StandardAutoscaler: Queue {} new nodes for launch".format(count))
self.pending_launches.inc(node_type, count)
config = copy.deepcopy(self.config)
self.launch_queue.put((config, count, node_type))
# Split into individual launch requests of the max batch size.
while count > 0:
self.launch_queue.put((config, min(count, self.max_launch_batch),
node_type))
count -= self.max_launch_batch
def all_workers(self):
return self.workers() + self.unmanaged_workers()
+14 -12
View File
@@ -19,6 +19,7 @@ try: # py3
except ImportError: # py2
from pipes import quote
import ray
from ray.experimental.internal_kv import _internal_kv_get
import ray._private.services as services
from ray.autoscaler.node_provider import NodeProvider
@@ -104,22 +105,23 @@ def request_resources(num_cpus: Optional[int] = None,
This function is to be called e.g. on a node before submitting a bunch of
ray.remote calls to ensure that resources rapidly become available.
This function is EXPERIMENTAL.
Args:
num_cpus: int -- the number of CPU cores to request
bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This
only has an effect if you've configured `available_node_types`
if your cluster config.
num_cpus (int): Scale the cluster to ensure this number of CPUs are
available. This request is persistent until another call to
request_resources() is made.
bundles (List[ResourceDict]): Scale the cluster to ensure this set of
resource shapes can fit. This request is persistent until another
call to request_resources() is made.
"""
if not ray.is_initialized():
raise RuntimeError("Ray is not initialized yet")
r = _redis()
if num_cpus is not None and num_cpus > 0:
r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
json.dumps({
"CPU": num_cpus
}))
to_request = []
if num_cpus:
to_request += [{"CPU": 1}] * num_cpus
if bundles:
r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(bundles))
to_request += bundles
r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(to_request))
def create_or_update_cluster(config_file: str,
@@ -52,11 +52,14 @@ class ResourceDemandScheduler:
and NODE_TYPE_LEGACY_WORKER in node_types)
def get_nodes_to_launch(
self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int],
self,
nodes: List[NodeID],
pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict],
usage_by_ip: Dict[NodeIP, ResourceDict],
unused_resources_by_ip: Dict[NodeIP, ResourceDict],
pending_placement_groups: List[PlacementGroupTableData],
static_node_resources: Dict[NodeIP, ResourceDict]
max_resources_by_ip: Dict[NodeIP, ResourceDict],
ensure_min_cluster_size: List[ResourceDict] = None,
) -> Dict[NodeType, int]:
"""Given resource demands, return node types to add to the cluster.
@@ -74,19 +77,40 @@ class ResourceDemandScheduler:
nodes: List of existing nodes in the cluster.
pending_nodes: Summary of node types currently being launched.
resource_demands: Vector of resource demands from the scheduler.
usage_by_ip: Mapping from ip to available resources.
unused_resources_by_ip: Mapping from ip to available resources.
pending_placement_groups: Placement group demands.
static_node_resources: Mapping from ip to static node resources.
max_resources_by_ip: Mapping from ip to static node resources.
ensure_min_cluster_size: Try to ensure the cluster can fit at least
this set of resources. This differs from resources_demands in
that we don't take into account existing usage.
"""
# If the user is using request_resources() API, calculate the remaining
# delta resources required to meet their requested cluster size.
if ensure_min_cluster_size is not None:
used_resources = []
for ip, max_res in max_resources_by_ip.items():
res = copy.deepcopy(max_res)
_inplace_subtract(res, unused_resources_by_ip.get(ip, {}))
used_resources.append(res)
# Example: user requests 1000 CPUs, but the cluster is currently
# 500 CPUs in size with 250 used. Then, the delta is 750 CPUs that
# we need to fit to get the cluster to scale to 1000.
resource_requests, _ = get_bin_pack_residual(
used_resources, ensure_min_cluster_size)
resource_demands += resource_requests
else:
resource_requests = []
if self.is_legacy_yaml:
# When using legacy yaml files we need to infer the head & worker
# node resources from the static node resources from LoadMetrics.
self._infer_legacy_node_resources_if_needed(static_node_resources)
self._infer_legacy_node_resources_if_needed(max_resources_by_ip)
node_resources: List[ResourceDict]
node_type_counts: Dict[NodeType, int]
node_resources, node_type_counts = \
self.calculate_node_resources(nodes, pending_nodes, usage_by_ip)
node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes, unused_resources_by_ip)
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))
@@ -118,6 +142,12 @@ class ResourceDemandScheduler:
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
max_to_add = self.max_workers - sum(node_type_counts.values())
if resource_requests:
nodes_to_add_based_on_requests = get_nodes_for(
self.node_types, node_type_counts, max_to_add,
resource_requests)
else:
nodes_to_add_based_on_requests = {}
nodes_to_add_based_on_demand = get_nodes_for(
self.node_types, node_type_counts, max_to_add, unfulfilled)
# Merge nodes to add based on demand and nodes to add based on
@@ -133,7 +163,8 @@ class ResourceDemandScheduler:
# Limit the number of concurrent launches
total_nodes_to_add = self._get_concurrent_resource_demand_to_launch(
total_nodes_to_add, usage_by_ip.keys(), nodes, pending_nodes)
total_nodes_to_add, unused_resources_by_ip.keys(), nodes,
pending_nodes, nodes_to_add_based_on_requests)
logger.info("Node requests: {}".format(total_nodes_to_add))
return total_nodes_to_add
@@ -168,23 +199,23 @@ class ResourceDemandScheduler:
return {}
def _infer_legacy_node_resources_if_needed(
self, static_node_resources: Dict[NodeIP, ResourceDict]
self, max_resources_by_ip: Dict[NodeIP, ResourceDict]
) -> (bool, Dict[NodeType, int]):
"""Infers node resources for legacy config files.
Updates the resources of the head and worker node types in
self.node_types.
Args:
static_node_resources: Mapping from ip to static node resources.
max_resources_by_ip: Mapping from ip to static node resources.
"""
# We fill the head node resources only once.
if not self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"]:
assert len(static_node_resources) == 1 # Only the head node.
assert len(max_resources_by_ip) == 1 # Only the head node.
self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = next(
iter(static_node_resources.values()))
iter(max_resources_by_ip.values()))
# We fill the worker node resources only once.
if not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
if len(static_node_resources) > 1:
if len(max_resources_by_ip) > 1:
# Set the node_types here as we already launched a worker node
# from which we directly get the node_resources.
worker_nodes = self.provider.non_terminated_nodes(
@@ -194,15 +225,18 @@ class ResourceDemandScheduler:
for node_id in worker_nodes
]
for ip in worker_node_ips:
if ip in static_node_resources:
if ip in max_resources_by_ip:
self.node_types[NODE_TYPE_LEGACY_WORKER][
"resources"] = static_node_resources[ip]
"resources"] = max_resources_by_ip[ip]
assert self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]
def _get_concurrent_resource_demand_to_launch(
self, to_launch: Dict[NodeType, int],
connected_nodes: List[NodeIP], non_terminated_nodes: List[NodeID],
pending_launches_nodes: Dict[NodeType, int]
self,
to_launch: Dict[NodeType, int],
connected_nodes: List[NodeIP],
non_terminated_nodes: List[NodeID],
pending_launches_nodes: Dict[NodeType, int],
nodes_to_add_based_on_requests: Dict[NodeType, int],
) -> Dict[NodeType, int]:
"""Updates the max concurrent resources to launch for each node type.
@@ -221,6 +255,9 @@ class ResourceDemandScheduler:
connected_nodes: Running nodes (from LoadMetrics).
non_terminated_nodes: Non terminated nodes (pending/running).
pending_launches_nodes: Nodes that are in the launch queue.
nodes_to_add_based_on_requests: Nodes to launch to satisfy
request_resources(). This overrides the launch limits since the
user is hinting to immediately scale up to this size.
Returns:
Dict[NodeType, int]: Maximum number of nodes to launch for each
node type.
@@ -240,15 +277,20 @@ class ResourceDemandScheduler:
total_pending_nodes = pending_launches_nodes.get(
node_type, 0) + pending_nodes[node_type]
# Allow more nodes if this is to respect min_workers constraint.
nodes_to_add = max(
upper_bound = max(
max_allowed_pending_nodes - total_pending_nodes,
self.node_types[node_type].get("min_workers", 0) -
total_pending_nodes - running_nodes[node_type])
if nodes_to_add > 0:
# Allow more nodes if this is to respect min_workers.
self.node_types[node_type].get("min_workers", 0) -
total_pending_nodes - running_nodes[node_type],
# Allow more nodes from request_resources API.
nodes_to_add_based_on_requests.get(node_type,
0) - total_pending_nodes)
if upper_bound > 0:
updated_nodes_to_launch[node_type] = min(
nodes_to_add, to_launch[node_type])
upper_bound, to_launch[node_type])
return updated_nodes_to_launch
@@ -274,7 +316,7 @@ class ResourceDemandScheduler:
def calculate_node_resources(
self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int],
usage_by_ip: Dict[str, ResourceDict]
unused_resources_by_ip: Dict[str, ResourceDict]
) -> (List[ResourceDict], Dict[NodeType, int]):
"""Returns node resource list and node type counts.
@@ -317,7 +359,7 @@ class ResourceDemandScheduler:
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
ip = self.provider.internal_ip(node_id)
available_resources = usage_by_ip.get(ip)
available_resources = unused_resources_by_ip.get(ip)
add_node(node_type, available_resources)
for node_type, count in pending_nodes.items():
@@ -375,9 +417,9 @@ class ResourceDemandScheduler:
def debug_string(self, nodes: List[NodeID],
pending_nodes: Dict[NodeID, int],
usage_by_ip: Dict[str, ResourceDict]) -> str:
unused_resources_by_ip: Dict[str, ResourceDict]) -> str:
node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes, usage_by_ip)
nodes, pending_nodes, unused_resources_by_ip)
out = "Worker node types:"
for node_type, count in node_type_counts.items():
@@ -537,13 +579,14 @@ def get_bin_pack_residual(node_resources: List[ResourceDict],
Returns:
List[ResourceDict] the residual list resources that do not fit.
List[ResourceDict]: The updated node_resources after the method.
"""
unfulfilled = []
# A most naive bin packing algorithm.
nodes = copy.deepcopy(node_resources)
# List of nodes that cannot be used again due to strict spread.
used = []
for demand in resource_demands:
found = False
+14 -6
View File
@@ -180,13 +180,21 @@ def request_resources(num_cpus: Optional[int] = None,
This function is to be called e.g. on a node before submitting a bunch of
ray.remote calls to ensure that resources rapidly become available.
This function is EXPERIMENTAL.
Args:
num_cpus: int -- the number of CPU cores to request
bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This
only has an effect if you've configured `available_node_types`
if your cluster config.
num_cpus (int): Scale the cluster to ensure this number of CPUs are
available. This request is persistent until another call to
request_resources() is made.
bundles (List[ResourceDict]): Scale the cluster to ensure this set of
resource shapes can fit. This request is persistent until another
call to request_resources() is made.
Examples:
>>> # Request 1000 CPUs.
>>> request_resources(num_cpus=1000)
>>> # Request 64 CPUs and also fit a 1-GPU/4-CPU task.
>>> request_resources(num_cpus=64, bundles=[{"GPU": 1, "CPU": 4}])
>>> # Same as requesting num_cpus=3.
>>> request_resources(bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}])
"""
return commands.request_resources(num_cpus, bundles)
@@ -371,6 +371,81 @@ def test_calculate_node_resources():
assert to_launch == {"p2.8xlarge": 1}
def test_request_resources_existing_usage():
provider = MockProvider()
TYPES = {
"p2.8xlarge": {
"node_config": {},
"resources": {
"CPU": 32,
"GPU": 8
},
"max_workers": 40,
},
}
scheduler = ResourceDemandScheduler(provider, TYPES, max_workers=100)
# 5 nodes with 32 CPU and 8 GPU each
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE
}, 2)
all_nodes = provider.non_terminated_nodes({})
node_ips = provider.non_terminated_node_ips({})
assert len(node_ips) == 2, node_ips
# Fully utilized, no requests.
avail_by_ip = {ip: {} for ip in node_ips}
max_by_ip = {ip: {"GPU": 8, "CPU": 32} for ip in node_ips}
demands = []
to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip,
[], max_by_ip, demands)
assert len(to_launch) == 0, to_launch
# Fully utilized, resource requests exactly equal.
avail_by_ip = {ip: {} for ip in node_ips}
demands = [{"GPU": 4}] * 4
to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip,
[], max_by_ip, demands)
assert len(to_launch) == 0, to_launch
# Fully utilized, resource requests in excess.
avail_by_ip = {ip: {} for ip in node_ips}
demands = [{"GPU": 4}] * 7
to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip,
[], max_by_ip, demands)
assert to_launch.get("p2.8xlarge") == 2, to_launch
# Not utilized, no requests.
avail_by_ip = {ip: {"GPU": 4, "CPU": 32} for ip in node_ips}
demands = []
to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip,
[], max_by_ip, demands)
assert len(to_launch) == 0, to_launch
# Not utilized, resource requests exactly equal.
avail_by_ip = {ip: {"GPU": 4, "CPU": 32} for ip in node_ips}
demands = [{"GPU": 4}] * 4
to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip,
[], max_by_ip, demands)
assert len(to_launch) == 0, to_launch
# Not utilized, resource requests in excess.
avail_by_ip = {ip: {"GPU": 4, "CPU": 32} for ip in node_ips}
demands = [{"GPU": 4}] * 7
to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip,
[], max_by_ip, demands)
assert to_launch.get("p2.8xlarge") == 2, to_launch
# Not utilized, resource requests hugely in excess.
avail_by_ip = {ip: {"GPU": 4, "CPU": 32} for ip in node_ips}
demands = [{"GPU": 4}] * 70
to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip,
[], max_by_ip, demands)
# This bypasses the launch rate limit.
assert to_launch.get("p2.8xlarge") == 33, to_launch
def test_backlog_queue_impact_on_binpacking_time():
new_types = copy.deepcopy(TYPES_A)
new_types["p2.8xlarge"]["max_workers"] = 1000
@@ -579,7 +654,7 @@ def test_get_concurrent_resource_demand_to_launch():
# Sanity check.
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {})
scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {}, {})
assert updated_to_launch == {}
provider.create_node({}, {
@@ -598,7 +673,7 @@ def test_get_concurrent_resource_demand_to_launch():
connected_nodes = [] # All the non_terminated_nodes are not connected yet.
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
pending_launches_nodes, {})
# Note: we have 2 pending/launching gpus, 3 pending/launching cpus,
# 0 running gpu, and 0 running cpus.
assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2}
@@ -611,7 +686,7 @@ def test_get_concurrent_resource_demand_to_launch():
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
pending_launches_nodes, {})
# Note that here we have 1 launching gpu, 1 launching cpu,
# 1 running gpu, and 2 running cpus.
assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4}
@@ -632,7 +707,7 @@ def test_get_concurrent_resource_demand_to_launch():
pending_launches_nodes = {} # No pending launches
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
pending_launches_nodes, {})
# Note: we have 5 pending cpus. So we are not allowed to start any.
# Still only 2 running cpus.
assert updated_to_launch == {}
@@ -643,7 +718,7 @@ def test_get_concurrent_resource_demand_to_launch():
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
pending_launches_nodes, {})
# Note: that here we have 7 running cpus and nothing pending/launching.
assert updated_to_launch == {"m4.large": 7}
@@ -659,7 +734,7 @@ def test_get_concurrent_resource_demand_to_launch():
pending_launches_nodes = {"m4.large": 1}
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
pending_launches_nodes, {})
# Note: we have 8 pending/launching cpus and only 7 running.
# So we should not launch anything (8 < 7).
assert updated_to_launch == {}
@@ -670,7 +745,7 @@ def test_get_concurrent_resource_demand_to_launch():
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes)
pending_launches_nodes, {})
# Note: that here we have 14 running cpus and 1 launching.
assert updated_to_launch == {"m4.large": 13}