[autoscaler] Support legacy cluster configs with the new resource demand scheduler (#11751)

This commit is contained in:
Ameer Haj Ali
2020-11-04 22:05:48 +02:00
committed by GitHub
parent 31598338b3
commit ebdf8ba3fa
14 changed files with 300 additions and 35 deletions
+5 -4
View File
@@ -218,7 +218,8 @@ class StandardAutoscaler:
self.pending_launches.breakdown(),
resource_demand_vector,
self.load_metrics.get_resource_utilization(),
pending_placement_groups)
pending_placement_groups,
self.load_metrics.get_static_node_resources_by_ip())
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
@@ -300,9 +301,9 @@ class StandardAutoscaler:
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
node_type_counts[node_type] += 1
if node_type_counts[node_type] <= \
self.available_node_types[node_type].get(
"min_workers", 0):
min_workers = self.available_node_types[node_type].get(
"min_workers", 0)
if node_type_counts[node_type] <= min_workers:
return True
return False
+12 -1
View File
@@ -6,6 +6,8 @@ import numpy as np
import ray._private.services as services
from ray.autoscaler._private.constants import MEMORY_RESOURCE_UNIT_BYTES
from ray.gcs_utils import PlacementGroupTableData
from ray.autoscaler._private.resource_demand_scheduler import \
NodeIP, ResourceDict
logger = logging.getLogger(__name__)
@@ -104,7 +106,7 @@ class LoadMetrics:
return self._info()["NumNodesConnected"]
def get_node_resources(self):
"""Return a list of node resources (static resource sizes.
"""Return a list of node resources (static resource sizes).
Example:
>>> metrics.get_node_resources()
@@ -112,6 +114,15 @@ class LoadMetrics:
"""
return self.static_resources_by_ip.values()
def get_static_node_resources_by_ip(self) -> Dict[NodeIP, ResourceDict]:
"""Return a dict of node resources for every node ip.
Example:
>>> lm.get_static_node_resources_by_ip()
{127.0.0.1: {"CPU": 1}, 127.0.0.2: {"CPU": 4, "GPU": 8}}
"""
return self.static_resources_by_ip
def get_resource_utilization(self):
return self.dynamic_resources_by_ip
@@ -17,11 +17,13 @@ from typing import List, Dict
from ray.autoscaler.node_provider import NodeProvider
from ray.gcs_utils import PlacementGroupTableData
from ray.core.generated.common_pb2 import PlacementStrategy
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED
from ray.autoscaler.tags import (TAG_RAY_USER_NODE_TYPE, NODE_KIND_UNMANAGED,
NODE_TYPE_LEGACY_WORKER, NODE_KIND_WORKER,
NODE_TYPE_LEGACY_HEAD, TAG_RAY_NODE_KIND)
logger = logging.getLogger(__name__)
# e.g., m4.16xlarge.
# e.g., cpu_4_ondemand.
NodeType = str
# e.g., {"resources": ..., "max_workers": ...}.
@@ -42,14 +44,19 @@ class ResourceDemandScheduler:
node_types: Dict[NodeType, NodeTypeConfigDict],
max_workers: int):
self.provider = provider
self.node_types = node_types
self.node_types = copy.deepcopy(node_types)
self.max_workers = max_workers
# is_legacy_yaml tracks if the cluster configs was originally without
# available_node_types and was autofilled with available_node_types.
self.is_legacy_yaml = (NODE_TYPE_LEGACY_HEAD in node_types
and NODE_TYPE_LEGACY_WORKER in node_types)
def get_nodes_to_launch(
self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict],
usage_by_ip: Dict[NodeIP, ResourceDict],
pending_placement_groups: List[PlacementGroupTableData]
pending_placement_groups: List[PlacementGroupTableData],
static_node_resources: Dict[NodeIP, ResourceDict]
) -> Dict[NodeType, int]:
"""Given resource demands, return node types to add to the cluster.
@@ -68,7 +75,13 @@ class ResourceDemandScheduler:
pending_nodes: Summary of node types currently being launched.
resource_demands: Vector of resource demands from the scheduler.
usage_by_ip: Mapping from ip to available resources.
pending_placement_groups: Placement group demands.
static_node_resources: Mapping from ip to static node resources.
"""
if self.is_legacy_yaml:
# When using legacy yaml files we need to infer the head & worker
# node resources from the static node resources from LoadMetrics.
self._infer_legacy_node_resources_if_needed(static_node_resources)
node_resources: List[ResourceDict]
node_type_counts: Dict[NodeType, int]
@@ -87,6 +100,13 @@ class ResourceDemandScheduler:
placement_group_demand_vector, strict_spreads = \
placement_groups_to_resource_demands(pending_placement_groups)
resource_demands.extend(placement_group_demand_vector)
if self.is_legacy_yaml and \
not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
# Need to launch worker nodes to later infer their
# resources.
return self._legacy_worker_node_to_launch(
nodes, pending_nodes, node_resources, resource_demands)
placement_group_nodes_to_add, node_resources, node_type_counts = \
self.reserve_and_allocate_spread(
strict_spreads, node_resources, node_type_counts)
@@ -118,6 +138,67 @@ class ResourceDemandScheduler:
logger.info("Node requests: {}".format(total_nodes_to_add))
return total_nodes_to_add
def _legacy_worker_node_to_launch(
self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int],
node_resources: List[ResourceDict],
resource_demands: List[ResourceDict]) -> Dict[NodeType, int]:
"""Get worker nodes to launch when resources missing in legacy yamls.
If there is unfulfilled demand and we don't know the resources of the
workers, it returns max(1, min_workers) worker nodes from which we
later calculate the node resources.
"""
if self.max_workers == 0:
return {}
elif pending_nodes or len(nodes) > 1:
# If we are already launching a worker node.
# If first worker node fails this will never launch more nodes.
return {}
else:
unfulfilled, _ = get_bin_pack_residual(node_resources,
resource_demands)
if self.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] > 0 or \
unfulfilled:
return {
NODE_TYPE_LEGACY_WORKER: max(
1, self.node_types[NODE_TYPE_LEGACY_WORKER][
"min_workers"])
}
else:
return {}
def _infer_legacy_node_resources_if_needed(
self, static_node_resources: Dict[NodeIP, ResourceDict]
) -> (bool, Dict[NodeType, int]):
"""Infers node resources for legacy config files.
Updates the resources of the head and worker node types in
self.node_types.
Args:
static_node_resources: Mapping from ip to static node resources.
"""
# We fill the head node resources only once.
if not self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"]:
assert len(static_node_resources) == 1 # Only the head node.
self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = next(
iter(static_node_resources.values()))
# We fill the worker node resources only once.
if not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
if len(static_node_resources) > 1:
# Set the node_types here as we already launched a worker node
# from which we directly get the node_resources.
worker_nodes = self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
worker_node_ips = [
self.provider.internal_ip(node_id)
for node_id in worker_nodes
]
for ip in worker_node_ips:
if ip in static_node_resources:
self.node_types[NODE_TYPE_LEGACY_WORKER][
"resources"] = static_node_resources[ip]
assert self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]
def _get_concurrent_resource_demand_to_launch(
self, to_launch: Dict[NodeType, int],
connected_nodes: List[NodeIP], non_terminated_nodes: List[NodeID],
+33 -2
View File
@@ -12,6 +12,7 @@ import ray._private.services as services
from ray.autoscaler._private.providers import _get_default_config, \
_NODE_PROVIDERS
from ray.autoscaler._private.docker import validate_docker_config
from ray.autoscaler.tags import NODE_TYPE_LEGACY_WORKER, NODE_TYPE_LEGACY_HEAD
REQUIRED, OPTIONAL = True, False
RAY_SCHEMA_PATH = os.path.join(
@@ -98,15 +99,45 @@ def prepare_config(config):
return with_defaults
def rewrite_legacy_yaml_to_available_node_types(
config: Dict[str, Any]) -> Dict[str, Any]:
if "available_node_types" in config:
return config
else:
# TODO(ameer/ekl/alex): we can also rewrite here many other fields
# that include initialization/setup/start commands and ImageId.
config["available_node_types"] = {
NODE_TYPE_LEGACY_HEAD: {
"node_config": config["head_node"],
"resources": {},
"min_workers": 0,
"max_workers": 0,
},
NODE_TYPE_LEGACY_WORKER: {
"node_config": config["worker_nodes"],
"resources": {},
"min_workers": config["min_workers"],
"max_workers": config["max_workers"],
},
}
config["head_node_type"] = NODE_TYPE_LEGACY_HEAD
config["worker_default_node_type"] = NODE_TYPE_LEGACY_WORKER
return config
def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
defaults = _get_default_config(config["provider"])
defaults.update(config)
defaults["auth"] = defaults.get("auth", {})
defaults = rewrite_legacy_yaml_to_available_node_types(defaults)
try:
defaults = _fillout_available_node_types_resources(defaults)
except Exception:
logger.exception("Failed to autodetect node resources.")
except ValueError:
# When the user uses a wrong instance type.
raise
except Exception:
# When the user is using e.g., staroid, but it is not installed.
logger.exception("Failed to autodetect node resources.")
return defaults
+4
View File
@@ -13,6 +13,10 @@ NODE_KIND_UNMANAGED = "unmanaged"
# Tag for user defined node types (e.g., m4xl_spot). This is used for multi
# node type clusters.
TAG_RAY_USER_NODE_TYPE = "ray-user-node-type"
# Tag for autofilled node types for legacy cluster yamls without multi
# node type defined in the cluster configs.
NODE_TYPE_LEGACY_HEAD = "ray-legacy-head-node-type"
NODE_TYPE_LEGACY_WORKER = "ray-legacy-worker-node-type"
# Tag that reports the current state of the node (e.g. Updating, Up-to-date)
TAG_RAY_NODE_STATUS = "ray-node-status"
-2
View File
@@ -1053,8 +1053,6 @@ class AutoscalingTest(unittest.TestCase):
def testReportsConfigFailures(self):
config = copy.deepcopy(SMALL_CLUSTER)
config["provider"]["type"] = "external"
config = prepare_config(config)
config["provider"]["type"] = "mock"
config_path = self.write_config(config)
self.provider = MockProvider()
+1 -3
View File
@@ -23,10 +23,8 @@ CONFIG_PATHS += recursive_fnmatch(
class AutoscalingConfigTest(unittest.TestCase):
def testValidateDefaultConfig(self):
for config_path in CONFIG_PATHS:
if ("aws/example-multi-node-type.yaml" in config_path
or "staroid/example-multi-node-type.yaml" in config_path):
if "aws/example-multi-node-type.yaml" in config_path:
# aws is tested in testValidateDefaultConfigAWSMultiNodeTypes.
# staroid fails as it requires an installation of staroid.
continue
with open(config_path) as f:
config = yaml.safe_load(f)
@@ -1,3 +1,5 @@
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
Checking AWS environment settings
Fetched IP: .+
ubuntu@ip-.+:~\$ exit
@@ -1,3 +1,5 @@
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
Checking AWS environment settings
Fetched IP: .+
This is a test!
@@ -1,5 +1,9 @@
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
Checking AWS environment settings
Fetched IP: .+
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
Checking AWS environment settings
Fetched IP: .+
This is a test!
@@ -1,5 +1,7 @@
Cluster: test-cli
Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
Checking AWS environment settings
AWS config
IAM Profile: .+ \[default\]
@@ -5,7 +5,7 @@ file_mounts:
~/tests: .
head_node:
ImageId: latest_dlami
InstanceType: t3a.small
InstanceType: t1.micro
head_setup_commands:
- echo head
head_start_ray_commands:
@@ -30,9 +30,9 @@ setup_commands:
target_utilization_fraction: 0.9
worker_nodes:
ImageId: latest_dlami
InstanceType: t3a.small
InstanceType: t1.micro
worker_setup_commands:
- echo worker
worker_start_ray_commands:
- ray stop
- ray start --address=$RAY_HEAD_IP
- ray start --address=$RAY_HEAD_IP
@@ -1,4 +1,6 @@
.+\.py.*Cluster: test-cli
.+\.py.*Updating the resources of ray-legacy-head-node-type to {'CPU': 1}.
.+\.py.*Updating the resources of ray-legacy-worker-node-type to {'CPU': 1}.
.+\.py.*Checking AWS environment settings
.+\.py.*Creating new IAM instance profile ray-autoscaler-v1 for use as the default\.
.+\.py.*Creating new IAM role ray-autoscaler-v1 for use as the default instance role\.
@@ -7,6 +7,8 @@ import unittest
import copy
import ray
from ray.autoscaler._private.util import \
rewrite_legacy_yaml_to_available_node_types
from ray.tests.test_autoscaler import SMALL_CLUSTER, MockProvider, \
MockProcessRunner
from ray.autoscaler._private.providers import (_NODE_PROVIDERS,
@@ -21,7 +23,9 @@ from ray.gcs_utils import PlacementGroupTableData
from ray.core.generated.common_pb2 import Bundle, PlacementStrategy
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, \
NODE_KIND_WORKER, TAG_RAY_NODE_STATUS, \
STATUS_UP_TO_DATE, STATUS_UNINITIALIZED
STATUS_UP_TO_DATE, STATUS_UNINITIALIZED, \
NODE_KIND_HEAD, NODE_TYPE_LEGACY_WORKER, \
NODE_TYPE_LEGACY_HEAD
from ray.test_utils import same_elements
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
@@ -293,7 +297,7 @@ def test_get_nodes_to_launch_with_min_workers():
to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{
"GPU": 8
}], utilizations, [])
}], utilizations, [], {})
assert to_launch == {"p2.8xlarge": 1}
@@ -315,7 +319,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
# requires 2 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge
demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}]
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations, [])
utilizations, [], {})
assert to_launch == {"p2.xlarge": 1}
# 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand.
@@ -324,7 +328,7 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
new_types["p2.8xlarge"]["min_workers"] = 3
scheduler = ResourceDemandScheduler(provider, new_types, 10)
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations, [])
utilizations, [], {})
# Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)]
assert to_launch == {"p2.8xlarge": 1}
@@ -342,7 +346,7 @@ def test_get_nodes_to_launch_limits():
to_launch = scheduler.get_nodes_to_launch(nodes, {"p2.8xlarge": 1}, [{
"GPU": 8
}] * 2, utilizations, [])
}] * 2, utilizations, [], {})
assert to_launch == {}
@@ -362,7 +366,7 @@ def test_calculate_node_resources():
# requires 4 p2.8xls (only 3 are in cluster/pending)
demands = [{"GPU": 8}] * (len(utilizations) + 2)
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations, [])
utilizations, [], {})
assert to_launch == {"p2.8xlarge": 1}
@@ -403,7 +407,7 @@ def test_backlog_queue_impact_on_binpacking_time():
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
t1 = time.time()
to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, demands,
usage_by_ip, [])
usage_by_ip, [], {})
t2 = time.time()
assert t2 - t1 < time_to_assert
print("The time took to launch", to_launch,
@@ -511,8 +515,8 @@ class TestPlacementGroupScaling:
strategy=PlacementStrategy.SPREAD,
bundles=([Bundle(unit_resources={"GPU": 2})] * 2)),
]
to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands,
{}, pending_placement_groups)
to_launch = scheduler.get_nodes_to_launch(
nodes, {}, resource_demands, {}, pending_placement_groups, {})
assert to_launch == {"p2.8xlarge": 2}
def test_many_strict_spreads(self):
@@ -535,8 +539,8 @@ class TestPlacementGroupScaling:
# Each placement group will take up 2 GPUs per node, but the distinct
# placement groups should still reuse the same nodes.
pending_placement_groups = pending_placement_groups * 3
to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands,
{}, pending_placement_groups)
to_launch = scheduler.get_nodes_to_launch(
nodes, {}, resource_demands, {}, pending_placement_groups, {})
assert to_launch == {"p2.8xlarge": 1}
def test_packing(self):
@@ -557,8 +561,8 @@ class TestPlacementGroupScaling:
]
# The 2 resource demand gpus should still be packed onto the same node
# as the 6 GPU placement group.
to_launch = scheduler.get_nodes_to_launch(nodes, {}, resource_demands,
{}, pending_placement_groups)
to_launch = scheduler.get_nodes_to_launch(
nodes, {}, resource_demands, {}, pending_placement_groups, {})
assert to_launch == {}
@@ -679,7 +683,7 @@ def test_get_nodes_to_launch_max_launch_concurrency():
scheduler = ResourceDemandScheduler(provider, new_types, 30)
to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [])
to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [], {})
# Respects min_workers despite concurrency limitation.
assert to_launch == {"p2.8xlarge": 4}
@@ -698,7 +702,7 @@ def test_get_nodes_to_launch_max_launch_concurrency():
# requires 41 p2.8xls (currently 1 pending, 1 launching, 0 running}
demands = [{"GPU": 8}] * (len(utilizations) + 40)
to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands,
utilizations, [])
utilizations, [], {})
# Enforces max launch to 5 when < 5 running. 2 are pending/launching.
assert to_launch == {"p2.8xlarge": 3}
@@ -715,12 +719,137 @@ def test_get_nodes_to_launch_max_launch_concurrency():
# Requires additional 17 p2.8xls (now 1 pending, 1 launching, 8 running}
demands = [{"GPU": 8}] * (len(utilizations) + 15)
to_launch = scheduler.get_nodes_to_launch(nodes, launching_nodes, demands,
utilizations, [])
utilizations, [], {})
# We are allowed to launch up to 8 more since 8 are running.
# We already have 2 pending/launching, so only 6 remain.
assert to_launch == {"p2.8xlarge": 6}
def test_rewrite_legacy_yaml_to_available_node_types():
cluster_config = copy.deepcopy(SMALL_CLUSTER) # Legacy cluster_config.
cluster_config = rewrite_legacy_yaml_to_available_node_types(
cluster_config)
assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD][
"max_workers"] == 0
assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD][
"min_workers"] == 0
assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_HEAD][
"node_config"] == SMALL_CLUSTER["head_node"]
assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][
"node_config"] == SMALL_CLUSTER["worker_nodes"]
assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][
"max_workers"] == SMALL_CLUSTER["max_workers"]
assert cluster_config["available_node_types"][NODE_TYPE_LEGACY_WORKER][
"min_workers"] == SMALL_CLUSTER["min_workers"]
def test_handle_legacy_cluster_config_yaml():
provider = MockProvider()
head_resources = {"CPU": 8, "GPU": 1}
worker_resources = {"CPU": 32, "GPU": 8}
cluster_config = copy.deepcopy(SMALL_CLUSTER) # Legacy cluster_config.
cluster_config = rewrite_legacy_yaml_to_available_node_types(
cluster_config)
scheduler = ResourceDemandScheduler(
provider, cluster_config["available_node_types"], 0)
provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
head_ip = provider.non_terminated_node_ips({})[0]
head_node_id = provider.non_terminated_nodes({})[0]
to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [],
{head_ip: head_resources})
assert to_launch == {} # Should always be empty with max_workers = 0.
scheduler.max_workers = 30
min_workers = scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"]
scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] = 0
to_launch = scheduler.get_nodes_to_launch([head_node_id], {}, [], {}, [],
{head_ip: head_resources})
assert to_launch == {
} # Since the resource demand does not require adding nodes.
to_launch = scheduler.get_nodes_to_launch([head_node_id], {},
[head_resources], {}, [],
{head_ip: head_resources})
assert to_launch == {
} # Since the resource demand does not require adding nodes.
scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["min_workers"] = min_workers
# Returns min_workers when min_workers>0.
to_launch = scheduler.get_nodes_to_launch([head_node_id], {},
[head_resources], {}, [],
{head_ip: head_resources})
assert to_launch == {NODE_TYPE_LEGACY_WORKER: min_workers}
provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_WORKER
}, min_workers)
nodes = provider.non_terminated_nodes({})
to_launch = scheduler.get_nodes_to_launch(nodes, {}, [head_resources], {},
[], {head_ip: head_resources})
assert to_launch == {} # A node is running, at some point it'll connect.
pending_launches = {NODE_TYPE_LEGACY_WORKER: 4}
to_launch = scheduler.get_nodes_to_launch([], pending_launches,
[head_resources], {}, [],
{head_ip: head_resources})
assert to_launch == {} # A node is launching, at some point it'll connect.
# Now assume that we already launched/connected the nodes.
ips = provider.non_terminated_node_ips({})
lm = LoadMetrics()
worker_ips = []
for ip in ips:
if ip == head_ip:
lm.update(ip, head_resources, head_resources, {})
else:
lm.update(ip, worker_resources, worker_resources, {})
worker_ips.append(ip)
assert not scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]
to_launch = scheduler.get_nodes_to_launch(
nodes, {}, [], {}, [], lm.get_static_node_resources_by_ip())
assert scheduler.node_types[NODE_TYPE_LEGACY_WORKER][
"resources"] == worker_resources
assert to_launch == {}
utilizations = {ip: worker_resources for ip in worker_ips}
utilizations[head_ip] = head_resources
# Requires 4 nodes since worker resources is bigger than head reasources.
demands = [worker_resources] * (len(utilizations) + 3)
to_launch = scheduler.get_nodes_to_launch(
nodes, {}, demands, utilizations, [],
lm.get_static_node_resources_by_ip())
# 4 nodes are necessary to meet resource demand, but we never exceed
# max_workers.
assert to_launch == {}
scheduler.max_workers = 10
to_launch = scheduler.get_nodes_to_launch(
nodes, {}, demands, utilizations, [],
lm.get_static_node_resources_by_ip())
# 4 nodes are necessary to meet resource demand, but we never exceed
# max_workers.
assert to_launch == {}
scheduler.node_types[NODE_TYPE_LEGACY_WORKER]["max_workers"] = 10
to_launch = scheduler.get_nodes_to_launch(
nodes, {}, demands, utilizations, [],
lm.get_static_node_resources_by_ip())
# 4 nodes are necessary to meet resource demand.
assert to_launch == {NODE_TYPE_LEGACY_WORKER: 4}
to_launch = scheduler.get_nodes_to_launch(nodes, pending_launches, demands,
utilizations, [],
lm.get_node_resources())
# 0 because there are 4 pending launches and we only need 4.
assert to_launch == {}
to_launch = scheduler.get_nodes_to_launch(nodes, pending_launches,
demands * 2, utilizations, [],
lm.get_node_resources())
# 1 because there are 4 pending launches and we only allow a max of 5.
assert to_launch == {NODE_TYPE_LEGACY_WORKER: 1}
class LoadMetricsTest(unittest.TestCase):
def testResourceDemandVector(self):
lm = LoadMetrics()