[autoscaler] Do not count head node with min_workers constraint. (#12980)

This commit is contained in:
Ameer Haj Ali
2020-12-21 00:54:46 +02:00
committed by GitHub
parent 7ab9164f1b
commit 11f34f72d8
4 changed files with 330 additions and 193 deletions
+5 -4
View File
@@ -149,7 +149,6 @@ class StandardAutoscaler:
def _update(self):
now = time.time()
# Throttle autoscaling updates to this interval to avoid exceeding
# rate limits on API calls.
if now - self.last_update_time < self.update_interval_s:
@@ -333,7 +332,7 @@ class StandardAutoscaler:
NodeIP,
ResourceDict] = \
self.load_metrics.get_static_node_resources_by_ip()
head_node_resources = static_nodes[head_ip]
head_node_resources = static_nodes.get(head_ip, {})
else:
head_node_resources = {}
@@ -482,11 +481,13 @@ class StandardAutoscaler:
# for legacy yamls.
self.resource_demand_scheduler.reset_config(
self.provider, self.available_node_types,
self.config["max_workers"], upscaling_speed)
self.config["max_workers"], self.config["head_node_type"],
upscaling_speed)
else:
self.resource_demand_scheduler = ResourceDemandScheduler(
self.provider, self.available_node_types,
self.config["max_workers"], upscaling_speed)
self.config["max_workers"], self.config["head_node_type"],
upscaling_speed)
except Exception as e:
if errors_fatal:
@@ -47,16 +47,19 @@ class ResourceDemandScheduler:
provider: NodeProvider,
node_types: Dict[NodeType, NodeTypeConfigDict],
max_workers: int,
head_node_type: NodeType,
upscaling_speed: float = 1) -> None:
self.provider = provider
self.node_types = copy.deepcopy(node_types)
self.max_workers = max_workers
self.head_node_type = head_node_type
self.upscaling_speed = upscaling_speed
def reset_config(self,
provider: NodeProvider,
node_types: Dict[NodeType, NodeTypeConfigDict],
max_workers: int,
head_node_type: NodeType,
upscaling_speed: float = 1) -> None:
"""Updates the class state variables.
@@ -89,6 +92,7 @@ class ResourceDemandScheduler:
self.provider = provider
self.node_types = copy.deepcopy(final_node_types)
self.max_workers = max_workers
self.head_node_type = head_node_type
self.upscaling_speed = upscaling_speed
def is_legacy_yaml(self,
@@ -153,7 +157,7 @@ class ResourceDemandScheduler:
adjusted_min_workers) = \
_add_min_workers_nodes(
node_resources, node_type_counts, self.node_types,
self.max_workers, ensure_min_cluster_size)
self.max_workers, self.head_node_type, ensure_min_cluster_size)
# Step 3: add nodes for strict spread groups
logger.info(f"Placement group demands: {pending_placement_groups}")
@@ -490,7 +494,7 @@ def _add_min_workers_nodes(
node_resources: List[ResourceDict],
node_type_counts: Dict[NodeType, int],
node_types: Dict[NodeType, NodeTypeConfigDict], max_workers: int,
ensure_min_cluster_size: List[ResourceDict]
head_node_type: NodeType, ensure_min_cluster_size: List[ResourceDict]
) -> (List[ResourceDict], Dict[NodeType, int], Dict[NodeType, int]):
"""Updates resource demands to respect the min_workers and
request_resources() constraints.
@@ -515,6 +519,9 @@ def _add_min_workers_nodes(
existing = node_type_counts.get(node_type, 0)
target = min(
config.get("min_workers", 0), config.get("max_workers", 0))
if node_type == head_node_type:
# Add 1 to account for head node.
target = target + 1
if existing < target:
total_nodes_to_add_dict[node_type] = target - existing
node_type_counts[node_type] = target
+62 -35
View File
@@ -12,7 +12,6 @@ import sys
from jsonschema.exceptions import ValidationError
import ray
import ray._private.services as services
from ray.autoscaler._private.util import prepare_config, validate_config
from ray.autoscaler._private import commands
from ray.autoscaler.sdk import get_docker_host_mount_location
@@ -559,8 +558,13 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
runner.respond_to_call("json .Config.Env", ["[]" for i in range(12)])
lm = LoadMetrics()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
config_path,
lm,
@@ -569,16 +573,16 @@ class AutoscalingTest(unittest.TestCase):
max_failures=0,
process_runner=runner,
update_interval_s=0)
self.waitForNodes(0)
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Update the config to reduce the cluster size
new_config = SMALL_CLUSTER.copy()
new_config["max_workers"] = 1
self.write_config(new_config)
autoscaler.update()
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Update the config to reduce the cluster size
new_config["min_workers"] = 10
@@ -587,12 +591,13 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
# Because one worker already started, the scheduler waits for its
# resources to be updated before it launches the remaining min_workers.
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
worker_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0]
lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {})
autoscaler.update()
self.waitForNodes(10)
self.waitForNodes(
10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def testInitialWorkers(self):
"""initial_workers is deprecated, this tests that it is ignored."""
@@ -760,7 +765,10 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1)
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
head_ip = self.provider.non_terminated_node_ips(
tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0]
@@ -964,8 +972,13 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)])
runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm = LoadMetrics()
lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
config_path,
lm,
@@ -975,7 +988,7 @@ class AutoscalingTest(unittest.TestCase):
max_failures=0,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Write a corrupted config
self.write_config("asdf", call_prepare_config=False)
@@ -983,7 +996,10 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
time.sleep(0.1)
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 2
assert len(
self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})) == 2
# New a good config again
new_config = SMALL_CLUSTER.copy()
@@ -996,7 +1012,8 @@ class AutoscalingTest(unittest.TestCase):
# resources to be updated before it launches the remaining min_workers.
lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {})
autoscaler.update()
self.waitForNodes(10)
self.waitForNodes(
10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def testMaxFailures(self):
config_path = self.write_config(SMALL_CLUSTER)
@@ -1113,53 +1130,60 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
lm = LoadMetrics()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(5)])
runner.respond_to_call("json .Config.Env", ["[]" for i in range(6)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
}, 1)
lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
assert len(
self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})) == 0
autoscaler.update()
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.update()
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 1
assert len(
self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})) == 1
# Scales up as nodes are reported as used
local_ip = services.get_node_ip_address()
lm.update(
local_ip, {"CPU": 2}, {"CPU": 0}, {},
waiting_bundles=2 * [{
"CPU": 2
}]) # head
autoscaler.update()
lm.update(
"172.0.0.0", {"CPU": 2}, {"CPU": 0}, {},
"172.0.0.1", {"CPU": 2}, {"CPU": 0}, {},
waiting_bundles=2 * [{
"CPU": 2
}])
autoscaler.update()
self.waitForNodes(3)
self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
lm.update(
"172.0.0.1", {"CPU": 2}, {"CPU": 0}, {},
"172.0.0.2", {"CPU": 2}, {"CPU": 0}, {},
waiting_bundles=3 * [{
"CPU": 2
}])
autoscaler.update()
self.waitForNodes(5)
self.waitForNodes(5, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Holds steady when load is removed
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.2", {"CPU": 2}, {"CPU": 2}, {})
autoscaler.update()
assert autoscaler.pending_launches.value == 0
assert len(self.provider.non_terminated_nodes({})) == 5
assert len(
self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})) == 5
# Scales down as nodes become unused
lm.last_used_time_by_ip["172.0.0.0"] = 0
lm.last_used_time_by_ip["172.0.0.1"] = 0
lm.last_used_time_by_ip["172.0.0.2"] = 0
autoscaler.update()
assert autoscaler.pending_launches.value == 0
@@ -1167,18 +1191,21 @@ class AutoscalingTest(unittest.TestCase):
# are not connected and hence we rely more on connected nodes for
# min_workers. When the "pending" nodes show up as connected,
# then we can terminate the ones connected before.
assert len(self.provider.non_terminated_nodes({})) == 4
lm.last_used_time_by_ip["172.0.0.2"] = 0
assert len(
self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})) == 4
lm.last_used_time_by_ip["172.0.0.3"] = 0
lm.last_used_time_by_ip["172.0.0.4"] = 0
autoscaler.update()
assert autoscaler.pending_launches.value == 0
# 2 nodes and not 1 because 1 is needed for min_worker and the other 1
# is still not connected.
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# when we connect it, we will see 1 node.
lm.last_used_time_by_ip["172.0.0.4"] = 0
lm.last_used_time_by_ip["172.0.0.5"] = 0
autoscaler.update()
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def testTargetUtilizationFraction(self):
config = SMALL_CLUSTER.copy()
+254 -152
View File
@@ -256,19 +256,19 @@ def test_add_min_workers_nodes():
}
assert _add_min_workers_nodes([],
{},
types, None, None) == \
types, None, None, None) == \
([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999},
{"m2.large": 50, "gpu": 99999})
assert _add_min_workers_nodes([{"CPU": 2}]*5,
{"m2.large": 5},
types, None, None) == \
types, None, None, None) == \
([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999},
{"m2.large": 45, "gpu": 99999})
assert _add_min_workers_nodes([{"CPU": 2}]*60,
{"m2.large": 60},
types, None, None) == \
types, None, None, None) == \
([{"CPU": 2}]*60+[{"GPU": 1}]*99999, {"m2.large": 60, "gpu": 99999},
{"gpu": 99999})
@@ -279,7 +279,7 @@ def test_add_min_workers_nodes():
}] * 99999, {
"m2.large": 50,
"gpu": 99999
}, types, None, None) == ([{
}, types, None, None, None) == ([{
"CPU": 2
}] * 50 + [{
"GPU": 1
@@ -289,11 +289,11 @@ def test_add_min_workers_nodes():
}, {})
assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}, None,
None) == ([], {}, {})
None, None) == ([], {}, {})
types["gpubla"]["max_workers"] = 10
assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}, None,
None) == ([{
None, None) == ([{
"GPU": 1
}] * 10, {
"gpubla": 10
@@ -306,9 +306,13 @@ def test_get_nodes_to_launch_with_min_workers():
provider = MockProvider()
new_types = copy.deepcopy(TYPES_A)
new_types["p2.8xlarge"]["min_workers"] = 2
scheduler = ResourceDemandScheduler(provider, new_types, 3)
scheduler = ResourceDemandScheduler(
provider, new_types, 3, head_node_type="p2.8xlarge")
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1)
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
}, 1)
nodes = provider.non_terminated_nodes({})
@@ -318,15 +322,19 @@ def test_get_nodes_to_launch_with_min_workers():
to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{
"GPU": 8
}], utilizations, [], {})
assert to_launch == {"p2.8xlarge": 1}
assert to_launch == {"p2.8xlarge": 2}
def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
provider = MockProvider()
new_types = copy.deepcopy(TYPES_A)
new_types["p2.8xlarge"]["min_workers"] = 2
scheduler = ResourceDemandScheduler(provider, new_types, 10)
scheduler = ResourceDemandScheduler(
provider, new_types, 10, head_node_type="p2.8xlarge")
provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"
}, 1)
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1)
nodes = provider.non_terminated_nodes({})
@@ -336,17 +344,18 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
utilizations = {ip: {"GPU": 8} for ip in ips}
# 1 more on the way
pending_nodes = {"p2.8xlarge": 1}
# requires 2 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge
# requires 3 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge
demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}]
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations, [], {})
assert to_launch == {"p2.xlarge": 1}
# 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand.
# 2 p2.8xlarge are running/pending. So we need 1 more p2.8xlarge only to
# meet the min_workers constraint and the demand.
# 3 min_workers + 1 head of p2.8xlarge covers the 3 p2.8xlarge + 1
# p2.xlarge demand. 3 p2.8xlarge are running/pending. So we need 1 more
# p2.8xlarge only tomeet the min_workers constraint and the demand.
new_types["p2.8xlarge"]["min_workers"] = 3
scheduler = ResourceDemandScheduler(provider, new_types, 10)
scheduler = ResourceDemandScheduler(
provider, new_types, 10, head_node_type="p2.8xlarge")
to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands,
utilizations, [], {})
# Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)]
@@ -355,7 +364,8 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing():
def test_get_nodes_to_launch_limits():
provider = MockProvider()
scheduler = ResourceDemandScheduler(provider, TYPES_A, 3)
scheduler = ResourceDemandScheduler(
provider, TYPES_A, 3, head_node_type="p2.8xlarge")
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2)
@@ -372,7 +382,8 @@ def test_get_nodes_to_launch_limits():
def test_calculate_node_resources():
provider = MockProvider()
scheduler = ResourceDemandScheduler(provider, TYPES_A, 10)
scheduler = ResourceDemandScheduler(
provider, TYPES_A, 10, head_node_type="p2.8xlarge")
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2)
@@ -403,7 +414,8 @@ def test_request_resources_existing_usage():
"max_workers": 40,
},
}
scheduler = ResourceDemandScheduler(provider, TYPES, max_workers=100)
scheduler = ResourceDemandScheduler(
provider, TYPES, max_workers=100, head_node_type="empty_node")
# 5 nodes with 32 CPU and 8 GPU each
provider.create_node({}, {
@@ -475,7 +487,10 @@ def test_backlog_queue_impact_on_binpacking_time():
num_available_nodes, time_to_assert, demand_request_shape):
provider = MockProvider()
scheduler = ResourceDemandScheduler(
provider, new_types, max_workers=10000)
provider,
new_types,
max_workers=10000,
head_node_type="m4.16xlarge")
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "m4.16xlarge",
@@ -574,7 +589,8 @@ def test_backlog_queue_impact_on_binpacking_time():
class TestPlacementGroupScaling:
def test_strategies(self):
provider = MockProvider()
scheduler = ResourceDemandScheduler(provider, TYPES_A, 10)
scheduler = ResourceDemandScheduler(
provider, TYPES_A, 10, head_node_type="p2.8xlarge")
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2)
# At this point our cluster has 2 p2.8xlarge instances (16 GPUs) and is
@@ -616,7 +632,8 @@ class TestPlacementGroupScaling:
def test_many_strict_spreads(self):
provider = MockProvider()
scheduler = ResourceDemandScheduler(provider, TYPES_A, 10)
scheduler = ResourceDemandScheduler(
provider, TYPES_A, 10, head_node_type="p2.8xlarge")
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2)
# At this point our cluster has 2 p2.8xlarge instances (16 GPUs) and is
@@ -640,7 +657,8 @@ class TestPlacementGroupScaling:
def test_packing(self):
provider = MockProvider()
scheduler = ResourceDemandScheduler(provider, TYPES_A, 10)
scheduler = ResourceDemandScheduler(
provider, TYPES_A, 10, head_node_type="p2.8xlarge")
provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1)
# At this point our cluster has 1 p2.8xlarge instances (8 GPUs) and is
@@ -668,7 +686,8 @@ def test_get_concurrent_resource_demand_to_launch():
node_types["m4.large"]["min_workers"] = 2
node_types["m4.large"]["max_workers"] = 100
provider = MockProvider()
scheduler = ResourceDemandScheduler(provider, node_types, 200)
scheduler = ResourceDemandScheduler(
provider, node_types, 200, head_node_type="empty_node")
# Sanity check.
assert len(provider.non_terminated_nodes({})) == 0
@@ -776,7 +795,8 @@ def test_get_nodes_to_launch_max_launch_concurrency():
new_types["p2.8xlarge"]["min_workers"] = 4
new_types["p2.8xlarge"]["max_workers"] = 40
scheduler = ResourceDemandScheduler(provider, new_types, 30)
scheduler = ResourceDemandScheduler(
provider, new_types, 30, head_node_type=None)
to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [], {})
# Respects min_workers despite concurrency limitation.
@@ -847,7 +867,10 @@ def test_handle_legacy_cluster_config_yaml():
cluster_config = rewrite_legacy_yaml_to_available_node_types(
cluster_config)
scheduler = ResourceDemandScheduler(
provider, cluster_config["available_node_types"], 0)
provider,
cluster_config["available_node_types"],
0,
head_node_type=NODE_TYPE_LEGACY_HEAD)
provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD
@@ -1084,17 +1107,46 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
LoadMetrics("172.0.0.0"),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(3)
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(3)
def testScaleUpMinSanityWithHeadNode(self):
"""Make sure when min_workers is used with head node it does not count
head_node in min_workers."""
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"]["empty_node"]["min_workers"] = 2
config["available_node_types"]["empty_node"]["max_workers"] = 2
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics("172.0.0.0"),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(3)
autoscaler.update()
self.waitForNodes(3)
def testPlacementGroup(self):
# Note this is mostly an integration test. See
@@ -1102,21 +1154,23 @@ class AutoscalingTest(unittest.TestCase):
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["min_workers"] = 0
config["max_workers"] = 999
config["head_node_type"] = "m4.4xlarge"
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
lm = LoadMetrics()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: "m4.4xlarge"
}, 1)
head_ip = self.provider.non_terminated_node_ips({})[0]
lm = LoadMetrics(head_ip)
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: "head",
TAG_RAY_USER_NODE_TYPE: "m4.4xlarge"
}, 1)
head_ip = self.provider.non_terminated_node_ips({})[0]
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
@@ -1172,20 +1226,24 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
lm = LoadMetrics()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(2)
assert len(self.provider.mock_nodes) == 2
self.waitForNodes(3)
assert len(self.provider.mock_nodes) == 3
assert {
self.provider.mock_nodes[0].node_type,
self.provider.mock_nodes[1].node_type
self.provider.mock_nodes[1].node_type,
self.provider.mock_nodes[2].node_type
} == {"p2.8xlarge", "m4.large"}
self.provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
@@ -1195,16 +1253,17 @@ class AutoscalingTest(unittest.TestCase):
TAG_RAY_USER_NODE_TYPE: "m4.16xlarge",
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
}, 2)
assert len(self.provider.non_terminated_nodes({})) == 6
assert len(self.provider.non_terminated_nodes({})) == 7
# Make sure that after idle_timeout_minutes we don't kill idle
# min workers.
for node_id in self.provider.non_terminated_nodes({}):
lm.last_used_time_by_ip[self.provider.internal_ip(node_id)] = -60
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(3)
cnt = 0
for id in self.provider.mock_nodes:
# [1:] skips the head node.
for id in list(self.provider.mock_nodes.keys())[1:]:
if self.provider.mock_nodes[id].state == "running" or \
self.provider.mock_nodes[id].state == "pending":
assert self.provider.mock_nodes[id].node_type in {
@@ -1218,6 +1277,7 @@ class AutoscalingTest(unittest.TestCase):
# Commenting out this line causes the test case to fail?!?!
config["min_workers"] = 0
config["target_utilization_fraction"] = 1.0
config["head_node_type"] = "p2.xlarge"
config_path = self.write_config(config)
self.provider = MockProvider()
self.provider.create_node({}, {
@@ -1295,28 +1355,32 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
LoadMetrics("172.0.0.0"),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(0)
autoscaler.request_resources([{"CPU": 1}])
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
assert self.provider.mock_nodes[0].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
autoscaler.request_resources([{"CPU": 32}] * 4)
autoscaler.update()
self.waitForNodes(4)
assert self.provider.mock_nodes[2].node_type == "m4.16xlarge"
self.waitForNodes(5)
assert self.provider.mock_nodes[3].node_type == "m4.16xlarge"
assert self.provider.mock_nodes[4].node_type == "m4.16xlarge"
def testResourcePassing(self):
config = MULTI_WORKER_CLUSTER.copy()
@@ -1326,23 +1390,27 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
LoadMetrics("172.0.0.0"),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(0)
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(1)
assert self.provider.mock_nodes[0].node_type == "m4.large"
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
# TODO (Alex): Autoscaler creates the node during one update then
# starts the updater in the enxt update. The sleep is largely
@@ -1353,11 +1421,11 @@ class AutoscalingTest(unittest.TestCase):
# These checks are done separately because we have no guarantees on the
# order the dict is serialized in.
runner.assert_has_call("172.0.0.0", "RAY_OVERRIDE_RESOURCES=")
runner.assert_has_call("172.0.0.0", "\"CPU\":2")
runner.assert_has_call("172.0.0.1", "RAY_OVERRIDE_RESOURCES=")
runner.assert_has_call("172.0.0.1", "\"CPU\":32")
runner.assert_has_call("172.0.0.1", "\"GPU\":8")
runner.assert_has_call("172.0.0.1", "\"CPU\":2")
runner.assert_has_call("172.0.0.2", "RAY_OVERRIDE_RESOURCES=")
runner.assert_has_call("172.0.0.2", "\"CPU\":32")
runner.assert_has_call("172.0.0.2", "\"GPU\":8")
def testScaleUpLoadMetrics(self):
config = MULTI_WORKER_CLUSTER.copy()
@@ -1366,16 +1434,20 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
lm = LoadMetrics()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
config_path,
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(0)
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.update()
lm.update(
"1.2.3.4", {}, {}, {},
@@ -1386,10 +1458,10 @@ class AutoscalingTest(unittest.TestCase):
"CPU": 16
}])
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
nodes = {
self.provider.mock_nodes[0].node_type,
self.provider.mock_nodes[1].node_type
self.provider.mock_nodes[1].node_type,
self.provider.mock_nodes[2].node_type
}
assert nodes == {"p2.xlarge", "m4.4xlarge"}
@@ -1407,40 +1479,46 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {})
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
lm,
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(0)
autoscaler.request_resources([{"CPU": 1}])
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
assert self.provider.mock_nodes[0].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.xlarge"
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
autoscaler.update()
self.waitForNodes(4)
assert self.provider.mock_nodes[3].node_type == "p2.xlarge"
autoscaler.update()
sleep(0.1)
runner.assert_has_call(self.provider.mock_nodes[1].internal_ip,
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
"new_worker_setup_command")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
"setup_cmd")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
"worker_setup_cmd")
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
"new_worker_initialization_cmd")
runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip,
"setup_cmd")
runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip,
"worker_setup_cmd")
runner.assert_has_call(self.provider.mock_nodes[3].internal_ip,
"new_worker_initialization_cmd")
runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip,
"init_cmd")
def testDockerWorkers(self):
@@ -1461,28 +1539,32 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)])
runner.respond_to_call("json .Config.Env", ["[]" for i in range(5)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
LoadMetrics("172.0.0.0"),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(0)
autoscaler.request_resources([{"CPU": 1}])
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
assert self.provider.mock_nodes[0].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.xlarge"
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
autoscaler.update()
self.waitForNodes(4)
assert self.provider.mock_nodes[3].node_type == "p2.xlarge"
autoscaler.update()
# Fill up m4, p2.8, p2 and request 2 more CPUs
autoscaler.request_resources([{
@@ -1495,33 +1577,33 @@ class AutoscalingTest(unittest.TestCase):
"CPU": 2
}])
autoscaler.update()
self.waitForNodes(4)
assert self.provider.mock_nodes[3].node_type == "m4.16xlarge"
self.waitForNodes(5)
assert self.provider.mock_nodes[4].node_type == "m4.16xlarge"
autoscaler.update()
sleep(0.1)
runner.assert_has_call(self.provider.mock_nodes[1].internal_ip,
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
"p2.8x-run-options")
runner.assert_has_call(self.provider.mock_nodes[1].internal_ip,
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
"p2.8x_image:latest")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip,
"default-image:nightly")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip,
"standard-run-options")
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
runner.assert_has_call(self.provider.mock_nodes[3].internal_ip,
"p2x_image:nightly")
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
runner.assert_has_call(self.provider.mock_nodes[3].internal_ip,
"standard-run-options")
runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip,
runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip,
"p2.8x-run-options")
runner.assert_has_call(self.provider.mock_nodes[3].internal_ip,
runner.assert_has_call(self.provider.mock_nodes[4].internal_ip,
"default-image:nightly")
runner.assert_has_call(self.provider.mock_nodes[3].internal_ip,
runner.assert_has_call(self.provider.mock_nodes[4].internal_ip,
"standard-run-options")
runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip,
runner.assert_not_has_call(self.provider.mock_nodes[4].internal_ip,
"p2.8x-run-options")
runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip,
runner.assert_not_has_call(self.provider.mock_nodes[4].internal_ip,
"p2x_image:nightly")
def testUpdateConfig(self):
@@ -1531,21 +1613,25 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
LoadMetrics("172.0.0.0"),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
config["available_node_types"]["m4.large"]["min_workers"] = 0
config["available_node_types"]["m4.large"]["node_config"][
"field_changed"] = 1
config_path = self.write_config(config)
autoscaler.update()
self.waitForNodes(0)
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def testEmptyDocker(self):
config = MULTI_WORKER_CLUSTER.copy()
@@ -1555,23 +1641,27 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
LoadMetrics("172.0.0.0"),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(0)
autoscaler.request_resources([{"CPU": 1}])
assert len(self.provider.non_terminated_nodes({})) == 1
autoscaler.update()
self.waitForNodes(1)
assert self.provider.mock_nodes[0].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
assert self.provider.mock_nodes[1].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.8xlarge"
def testRequestResourcesIdleTimeout(self):
"""Test request_resources() with and without idle timeout."""
@@ -1599,8 +1689,12 @@ class AutoscalingTest(unittest.TestCase):
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
lm = LoadMetrics()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
autoscaler = StandardAutoscaler(
config_path,
lm,
@@ -1608,14 +1702,14 @@ class AutoscalingTest(unittest.TestCase):
process_runner=runner,
update_interval_s=0)
autoscaler.update()
self.waitForNodes(0)
self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.update()
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
non_terminated_nodes = autoscaler.provider.non_terminated_nodes({})
assert len(non_terminated_nodes) == 1
node_id = non_terminated_nodes[0]
node_ip = autoscaler.provider.non_terminated_node_ips({})[0]
assert len(non_terminated_nodes) == 2
node_id = non_terminated_nodes[1]
node_ip = autoscaler.provider.non_terminated_node_ips({})[1]
# A hack to check if the node was terminated when it shouldn't.
autoscaler.provider.mock_nodes[node_id].state = "unterminatable"
@@ -1629,10 +1723,10 @@ class AutoscalingTest(unittest.TestCase):
}])
autoscaler.update()
# this fits on request_resources()!
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2)
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
lm.update(
node_ip,
@@ -1642,7 +1736,7 @@ class AutoscalingTest(unittest.TestCase):
"WORKER": 1.0
}])
autoscaler.update()
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
lm.update(
node_ip,
config["available_node_types"]["def_worker"]["resources"],
@@ -1653,13 +1747,13 @@ class AutoscalingTest(unittest.TestCase):
}])
autoscaler.update()
# Still 2 as the second node did not show up a heart beat.
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# If node {node_id} was terminated any time then it's state will be set
# to terminated.
assert autoscaler.provider.mock_nodes[
node_id].state == "unterminatable"
lm.update(
"172.0.0.1",
"172.0.0.2",
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"], {},
waiting_bundles=[{
@@ -1669,7 +1763,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
# Now it is 1 because it showed up in last used (heart beat).
# The remaining one is 127.0.0.1.
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def testRequestResourcesRaceConditionsLong(self):
"""Test request_resources(), race conditions & demands/min_workers.
@@ -1704,7 +1798,11 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
lm = LoadMetrics()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
config_path,
lm,
@@ -1714,11 +1812,11 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.update()
# 1 min worker for both min_worker and request_resources()
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
non_terminated_nodes = autoscaler.provider.non_terminated_nodes({})
assert len(non_terminated_nodes) == 1
node_id = non_terminated_nodes[0]
node_ip = autoscaler.provider.non_terminated_node_ips({})[0]
assert len(non_terminated_nodes) == 2
node_id = non_terminated_nodes[1]
node_ip = autoscaler.provider.non_terminated_node_ips({})[1]
# A hack to check if the node was terminated when it shouldn't.
autoscaler.provider.mock_nodes[node_id].state = "unterminatable"
@@ -1733,12 +1831,12 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2)
autoscaler.update()
# 2 requested_resource, 1 min worker, 1 free node -> 2 nodes total
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}])
autoscaler.update()
# Still 2 because the second one is not connected and hence
# request_resources occupies the connected node.
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 3)
lm.update(
node_ip,
@@ -1748,14 +1846,14 @@ class AutoscalingTest(unittest.TestCase):
"WORKER": 1.0
}] * 3)
autoscaler.update()
self.waitForNodes(3)
self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
autoscaler.request_resources([])
lm.update("172.0.0.1",
lm.update("172.0.0.2",
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"],
{})
lm.update("172.0.0.2",
lm.update("172.0.0.3",
config["available_node_types"]["def_worker"]["resources"],
config["available_node_types"]["def_worker"]["resources"],
{})
@@ -1763,7 +1861,7 @@ class AutoscalingTest(unittest.TestCase):
config["available_node_types"]["def_worker"]["resources"],
{}, {})
autoscaler.update()
self.waitForNodes(1)
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# If node {node_id} was terminated any time then it's state will be set
# to terminated.
assert autoscaler.provider.mock_nodes[
@@ -1799,7 +1897,11 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
runner = MockProcessRunner()
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
lm = LoadMetrics()
self.provider.create_node({}, {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: "empty_node"
}, 1)
lm = LoadMetrics("172.0.0.0")
autoscaler = StandardAutoscaler(
config_path,
lm,
@@ -1809,7 +1911,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.request_resources([{"CPU": 2, "WORKER": 1.0}] * 2)
autoscaler.update()
# 2 min worker for both min_worker and request_resources(), not 3.
self.waitForNodes(2)
self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def testRequestResourcesRaceConditionWithResourceDemands(self):
"""Test request_resources() with resource_demands.