diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 2e55ed151..64167b4cb 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -149,7 +149,6 @@ class StandardAutoscaler: def _update(self): now = time.time() - # Throttle autoscaling updates to this interval to avoid exceeding # rate limits on API calls. if now - self.last_update_time < self.update_interval_s: @@ -333,7 +332,7 @@ class StandardAutoscaler: NodeIP, ResourceDict] = \ self.load_metrics.get_static_node_resources_by_ip() - head_node_resources = static_nodes[head_ip] + head_node_resources = static_nodes.get(head_ip, {}) else: head_node_resources = {} @@ -482,11 +481,13 @@ class StandardAutoscaler: # for legacy yamls. self.resource_demand_scheduler.reset_config( self.provider, self.available_node_types, - self.config["max_workers"], upscaling_speed) + self.config["max_workers"], self.config["head_node_type"], + upscaling_speed) else: self.resource_demand_scheduler = ResourceDemandScheduler( self.provider, self.available_node_types, - self.config["max_workers"], upscaling_speed) + self.config["max_workers"], self.config["head_node_type"], + upscaling_speed) except Exception as e: if errors_fatal: diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 6bbae1762..f3ec607df 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -47,16 +47,19 @@ class ResourceDemandScheduler: provider: NodeProvider, node_types: Dict[NodeType, NodeTypeConfigDict], max_workers: int, + head_node_type: NodeType, upscaling_speed: float = 1) -> None: self.provider = provider self.node_types = copy.deepcopy(node_types) self.max_workers = max_workers + self.head_node_type = head_node_type self.upscaling_speed = upscaling_speed def reset_config(self, 
provider: NodeProvider, node_types: Dict[NodeType, NodeTypeConfigDict], max_workers: int, + head_node_type: NodeType, upscaling_speed: float = 1) -> None: """Updates the class state variables. @@ -89,6 +92,7 @@ class ResourceDemandScheduler: self.provider = provider self.node_types = copy.deepcopy(final_node_types) self.max_workers = max_workers + self.head_node_type = head_node_type self.upscaling_speed = upscaling_speed def is_legacy_yaml(self, @@ -153,7 +157,7 @@ class ResourceDemandScheduler: adjusted_min_workers) = \ _add_min_workers_nodes( node_resources, node_type_counts, self.node_types, - self.max_workers, ensure_min_cluster_size) + self.max_workers, self.head_node_type, ensure_min_cluster_size) # Step 3: add nodes for strict spread groups logger.info(f"Placement group demands: {pending_placement_groups}") @@ -490,7 +494,7 @@ def _add_min_workers_nodes( node_resources: List[ResourceDict], node_type_counts: Dict[NodeType, int], node_types: Dict[NodeType, NodeTypeConfigDict], max_workers: int, - ensure_min_cluster_size: List[ResourceDict] + head_node_type: NodeType, ensure_min_cluster_size: List[ResourceDict] ) -> (List[ResourceDict], Dict[NodeType, int], Dict[NodeType, int]): """Updates resource demands to respect the min_workers and request_resources() constraints. @@ -515,6 +519,9 @@ def _add_min_workers_nodes( existing = node_type_counts.get(node_type, 0) target = min( config.get("min_workers", 0), config.get("max_workers", 0)) + if node_type == head_node_type: + # Add 1 to account for head node. 
+ target = target + 1 if existing < target: total_nodes_to_add_dict[node_type] = target - existing node_type_counts[node_type] = target diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 7ef1e9c5b..72f361fe2 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -12,7 +12,6 @@ import sys from jsonschema.exceptions import ValidationError import ray -import ray._private.services as services from ray.autoscaler._private.util import prepare_config, validate_config from ray.autoscaler._private import commands from ray.autoscaler.sdk import get_docker_host_mount_location @@ -559,8 +558,13 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() - runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)]) + runner.respond_to_call("json .Config.Env", ["[]" for i in range(12)]) lm = LoadMetrics() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + }, 1) + lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {}) autoscaler = StandardAutoscaler( config_path, lm, @@ -569,16 +573,16 @@ class AutoscalingTest(unittest.TestCase): max_failures=0, process_runner=runner, update_interval_s=0) - self.waitForNodes(0) + self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Update the config to reduce the cluster size new_config = SMALL_CLUSTER.copy() new_config["max_workers"] = 1 self.write_config(new_config) autoscaler.update() - self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Update the config to reduce the cluster size new_config["min_workers"] = 10 @@ -587,12 +591,13 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() # Because one worker 
already started, the scheduler waits for its # resources to be updated before it launches the remaining min_workers. - self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) worker_ip = self.provider.non_terminated_node_ips( tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}, )[0] lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {}) autoscaler.update() - self.waitForNodes(10) + self.waitForNodes( + 10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) def testInitialWorkers(self): """initial_workers is deprecated, this tests that it is ignored.""" @@ -760,7 +765,10 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() - self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: "head", + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + }, 1) head_ip = self.provider.non_terminated_node_ips( tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0] @@ -964,8 +972,13 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() - runner.respond_to_call("json .Config.Env", ["[]" for i in range(10)]) + runner.respond_to_call("json .Config.Env", ["[]" for i in range(11)]) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + }, 1) lm = LoadMetrics() + lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {}) autoscaler = StandardAutoscaler( config_path, lm, @@ -975,7 +988,7 @@ class AutoscalingTest(unittest.TestCase): max_failures=0, update_interval_s=0) autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Write a corrupted config self.write_config("asdf", call_prepare_config=False) @@ -983,7 +996,10 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() time.sleep(0.1) assert 
autoscaler.pending_launches.value == 0 - assert len(self.provider.non_terminated_nodes({})) == 2 + assert len( + self.provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + })) == 2 # New a good config again new_config = SMALL_CLUSTER.copy() @@ -996,7 +1012,8 @@ class AutoscalingTest(unittest.TestCase): # resources to be updated before it launches the remaining min_workers. lm.update(worker_ip, {"CPU": 1}, {"CPU": 1}, {}) autoscaler.update() - self.waitForNodes(10) + self.waitForNodes( + 10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) def testMaxFailures(self): config_path = self.write_config(SMALL_CLUSTER) @@ -1113,53 +1130,60 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() lm = LoadMetrics() runner = MockProcessRunner() - runner.respond_to_call("json .Config.Env", ["[]" for i in range(5)]) + runner.respond_to_call("json .Config.Env", ["[]" for i in range(6)]) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD + }, 1) + lm.update("172.0.0.0", {"CPU": 1}, {"CPU": 0}, {}) autoscaler = StandardAutoscaler( config_path, lm, max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 + assert len( + self.provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + })) == 0 autoscaler.update() - self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.update() assert autoscaler.pending_launches.value == 0 - assert len(self.provider.non_terminated_nodes({})) == 1 + assert len( + self.provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + })) == 1 - # Scales up as nodes are reported as used - local_ip = services.get_node_ip_address() - lm.update( - local_ip, {"CPU": 2}, {"CPU": 0}, {}, - waiting_bundles=2 * [{ - "CPU": 2 - }]) # head autoscaler.update() lm.update( - "172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}, + "172.0.0.1", 
{"CPU": 2}, {"CPU": 0}, {}, waiting_bundles=2 * [{ "CPU": 2 }]) autoscaler.update() - self.waitForNodes(3) + self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) lm.update( - "172.0.0.1", {"CPU": 2}, {"CPU": 0}, {}, + "172.0.0.2", {"CPU": 2}, {"CPU": 0}, {}, waiting_bundles=3 * [{ "CPU": 2 }]) autoscaler.update() - self.waitForNodes(5) + self.waitForNodes(5, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # Holds steady when load is removed - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {}) lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {}) + lm.update("172.0.0.2", {"CPU": 2}, {"CPU": 2}, {}) autoscaler.update() assert autoscaler.pending_launches.value == 0 - assert len(self.provider.non_terminated_nodes({})) == 5 + assert len( + self.provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + })) == 5 # Scales down as nodes become unused - lm.last_used_time_by_ip["172.0.0.0"] = 0 lm.last_used_time_by_ip["172.0.0.1"] = 0 + lm.last_used_time_by_ip["172.0.0.2"] = 0 autoscaler.update() assert autoscaler.pending_launches.value == 0 @@ -1167,18 +1191,21 @@ class AutoscalingTest(unittest.TestCase): # are not connected and hence we rely more on connected nodes for # min_workers. When the "pending" nodes show up as connected, # then we can terminate the ones connected before. - assert len(self.provider.non_terminated_nodes({})) == 4 - lm.last_used_time_by_ip["172.0.0.2"] = 0 + assert len( + self.provider.non_terminated_nodes({ + TAG_RAY_NODE_KIND: NODE_KIND_WORKER + })) == 4 lm.last_used_time_by_ip["172.0.0.3"] = 0 + lm.last_used_time_by_ip["172.0.0.4"] = 0 autoscaler.update() assert autoscaler.pending_launches.value == 0 # 2 nodes and not 1 because 1 is needed for min_worker and the other 1 # is still not connected. - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # when we connect it, we will see 1 node. 
- lm.last_used_time_by_ip["172.0.0.4"] = 0 + lm.last_used_time_by_ip["172.0.0.5"] = 0 autoscaler.update() - self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) def testTargetUtilizationFraction(self): config = SMALL_CLUSTER.copy() diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 50d899af0..067b5f53d 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -256,19 +256,19 @@ def test_add_min_workers_nodes(): } assert _add_min_workers_nodes([], {}, - types, None, None) == \ + types, None, None, None) == \ ([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999}, {"m2.large": 50, "gpu": 99999}) assert _add_min_workers_nodes([{"CPU": 2}]*5, {"m2.large": 5}, - types, None, None) == \ + types, None, None, None) == \ ([{"CPU": 2}]*50+[{"GPU": 1}]*99999, {"m2.large": 50, "gpu": 99999}, {"m2.large": 45, "gpu": 99999}) assert _add_min_workers_nodes([{"CPU": 2}]*60, {"m2.large": 60}, - types, None, None) == \ + types, None, None, None) == \ ([{"CPU": 2}]*60+[{"GPU": 1}]*99999, {"m2.large": 60, "gpu": 99999}, {"gpu": 99999}) @@ -279,7 +279,7 @@ def test_add_min_workers_nodes(): }] * 99999, { "m2.large": 50, "gpu": 99999 - }, types, None, None) == ([{ + }, types, None, None, None) == ([{ "CPU": 2 }] * 50 + [{ "GPU": 1 @@ -289,11 +289,11 @@ def test_add_min_workers_nodes(): }, {}) assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}, None, - None) == ([], {}, {}) + None, None) == ([], {}, {}) types["gpubla"]["max_workers"] = 10 assert _add_min_workers_nodes([], {}, {"gpubla": types["gpubla"]}, None, - None) == ([{ + None, None) == ([{ "GPU": 1 }] * 10, { "gpubla": 10 @@ -306,9 +306,13 @@ def test_get_nodes_to_launch_with_min_workers(): provider = MockProvider() new_types = copy.deepcopy(TYPES_A) new_types["p2.8xlarge"]["min_workers"] = 2 - scheduler = 
ResourceDemandScheduler(provider, new_types, 3) + scheduler = ResourceDemandScheduler( + provider, new_types, 3, head_node_type="p2.8xlarge") - provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1) + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", + TAG_RAY_NODE_KIND: NODE_KIND_HEAD + }, 1) nodes = provider.non_terminated_nodes({}) @@ -318,15 +322,19 @@ def test_get_nodes_to_launch_with_min_workers(): to_launch = scheduler.get_nodes_to_launch(nodes, {}, [{ "GPU": 8 }], utilizations, [], {}) - assert to_launch == {"p2.8xlarge": 1} + assert to_launch == {"p2.8xlarge": 2} def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): provider = MockProvider() new_types = copy.deepcopy(TYPES_A) new_types["p2.8xlarge"]["min_workers"] = 2 - scheduler = ResourceDemandScheduler(provider, new_types, 10) - + scheduler = ResourceDemandScheduler( + provider, new_types, 10, head_node_type="p2.8xlarge") + provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge" + }, 1) provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1) nodes = provider.non_terminated_nodes({}) @@ -336,17 +344,18 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): utilizations = {ip: {"GPU": 8} for ip in ips} # 1 more on the way pending_nodes = {"p2.8xlarge": 1} - # requires 2 p2.8xls (only 2 are in cluster/pending) and 1 p2.xlarge + # requires 3 p2.8xls (3 are in cluster/pending) and 1 p2.xlarge demands = [{"GPU": 8}] * (len(utilizations) + 1) + [{"GPU": 1}] to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, utilizations, [], {}) assert to_launch == {"p2.xlarge": 1} - # 3 min_workers of p2.8xlarge covers the 2 p2.8xlarge + 1 p2.xlarge demand. - # 2 p2.8xlarge are running/pending. So we need 1 more p2.8xlarge only to - # meet the min_workers constraint and the demand. + # 3 min_workers + 1 head of p2.8xlarge covers the 3 p2.8xlarge + 1 + # p2.xlarge demand. 
3 p2.8xlarge are running/pending. So we need 1 more + # p2.8xlarge only to meet the min_workers constraint and the demand. new_types["p2.8xlarge"]["min_workers"] = 3 - scheduler = ResourceDemandScheduler(provider, new_types, 10) + scheduler = ResourceDemandScheduler( + provider, new_types, 10, head_node_type="p2.8xlarge") to_launch = scheduler.get_nodes_to_launch(nodes, pending_nodes, demands, utilizations, [], {}) # Make sure it does not return [("p2.8xlarge", 1), ("p2.xlarge", 1)] @@ -355,7 +364,8 @@ def test_get_nodes_to_launch_with_min_workers_and_bin_packing(): def test_get_nodes_to_launch_limits(): provider = MockProvider() - scheduler = ResourceDemandScheduler(provider, TYPES_A, 3) + scheduler = ResourceDemandScheduler( + provider, TYPES_A, 3, head_node_type="p2.8xlarge") provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) @@ -372,7 +382,8 @@ def test_get_nodes_to_launch_limits(): def test_calculate_node_resources(): provider = MockProvider() - scheduler = ResourceDemandScheduler(provider, TYPES_A, 10) + scheduler = ResourceDemandScheduler( + provider, TYPES_A, 10, head_node_type="p2.8xlarge") provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) @@ -403,7 +414,8 @@ def test_request_resources_existing_usage(): "max_workers": 40, }, } - scheduler = ResourceDemandScheduler(provider, TYPES, max_workers=100) + scheduler = ResourceDemandScheduler( + provider, TYPES, max_workers=100, head_node_type="empty_node") # 5 nodes with 32 CPU and 8 GPU each provider.create_node({}, { @@ -475,7 +487,10 @@ def test_backlog_queue_impact_on_binpacking_time(): num_available_nodes, time_to_assert, demand_request_shape): provider = MockProvider() scheduler = ResourceDemandScheduler( - provider, new_types, max_workers=10000) + provider, + new_types, + max_workers=10000, + head_node_type="m4.16xlarge") provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "m4.16xlarge", @@ -574,7 +589,8 @@ def test_backlog_queue_impact_on_binpacking_time(): class 
TestPlacementGroupScaling: def test_strategies(self): provider = MockProvider() - scheduler = ResourceDemandScheduler(provider, TYPES_A, 10) + scheduler = ResourceDemandScheduler( + provider, TYPES_A, 10, head_node_type="p2.8xlarge") provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) # At this point our cluster has 2 p2.8xlarge instances (16 GPUs) and is @@ -616,7 +632,8 @@ class TestPlacementGroupScaling: def test_many_strict_spreads(self): provider = MockProvider() - scheduler = ResourceDemandScheduler(provider, TYPES_A, 10) + scheduler = ResourceDemandScheduler( + provider, TYPES_A, 10, head_node_type="p2.8xlarge") provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 2) # At this point our cluster has 2 p2.8xlarge instances (16 GPUs) and is @@ -640,7 +657,8 @@ class TestPlacementGroupScaling: def test_packing(self): provider = MockProvider() - scheduler = ResourceDemandScheduler(provider, TYPES_A, 10) + scheduler = ResourceDemandScheduler( + provider, TYPES_A, 10, head_node_type="p2.8xlarge") provider.create_node({}, {TAG_RAY_USER_NODE_TYPE: "p2.8xlarge"}, 1) # At this point our cluster has 1 p2.8xlarge instances (8 GPUs) and is @@ -668,7 +686,8 @@ def test_get_concurrent_resource_demand_to_launch(): node_types["m4.large"]["min_workers"] = 2 node_types["m4.large"]["max_workers"] = 100 provider = MockProvider() - scheduler = ResourceDemandScheduler(provider, node_types, 200) + scheduler = ResourceDemandScheduler( + provider, node_types, 200, head_node_type="empty_node") # Sanity check. 
assert len(provider.non_terminated_nodes({})) == 0 @@ -776,7 +795,8 @@ def test_get_nodes_to_launch_max_launch_concurrency(): new_types["p2.8xlarge"]["min_workers"] = 4 new_types["p2.8xlarge"]["max_workers"] = 40 - scheduler = ResourceDemandScheduler(provider, new_types, 30) + scheduler = ResourceDemandScheduler( + provider, new_types, 30, head_node_type=None) to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [], {}) # Respects min_workers despite concurrency limitation. @@ -847,7 +867,10 @@ def test_handle_legacy_cluster_config_yaml(): cluster_config = rewrite_legacy_yaml_to_available_node_types( cluster_config) scheduler = ResourceDemandScheduler( - provider, cluster_config["available_node_types"], 0) + provider, + cluster_config["available_node_types"], + 0, + head_node_type=NODE_TYPE_LEGACY_HEAD) provider.create_node({}, { TAG_RAY_NODE_KIND: NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE: NODE_TYPE_LEGACY_HEAD @@ -1084,17 +1107,46 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + LoadMetrics("172.0.0.0"), max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(3) autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(3) + + def testScaleUpMinSanityWithHeadNode(self): + """Make sure when min_workers is used with head node it does not count + head_node in min_workers.""" + config = copy.deepcopy(MULTI_WORKER_CLUSTER) + config["available_node_types"]["empty_node"]["min_workers"] = 2 + config["available_node_types"]["empty_node"]["max_workers"] = 2 + config_path = self.write_config(config) + self.provider 
= MockProvider() + runner = MockProcessRunner() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) + autoscaler = StandardAutoscaler( + config_path, + LoadMetrics("172.0.0.0"), + max_failures=0, + process_runner=runner, + update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 1 + autoscaler.update() + self.waitForNodes(3) + autoscaler.update() + self.waitForNodes(3) def testPlacementGroup(self): # Note this is mostly an integration test. See @@ -1102,21 +1154,23 @@ class AutoscalingTest(unittest.TestCase): config = copy.deepcopy(MULTI_WORKER_CLUSTER) config["min_workers"] = 0 config["max_workers"] = 999 + config["head_node_type"] = "m4.4xlarge" config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() - lm = LoadMetrics() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: "head", + TAG_RAY_USER_NODE_TYPE: "m4.4xlarge" + }, 1) + head_ip = self.provider.non_terminated_node_ips({})[0] + lm = LoadMetrics(head_ip) autoscaler = StandardAutoscaler( config_path, lm, max_failures=0, process_runner=runner, update_interval_s=0) - self.provider.create_node({}, { - TAG_RAY_NODE_KIND: "head", - TAG_RAY_USER_NODE_TYPE: "m4.4xlarge" - }, 1) - head_ip = self.provider.non_terminated_node_ips({})[0] + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) @@ -1172,20 +1226,24 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() - lm = LoadMetrics() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) + lm = LoadMetrics("172.0.0.0") autoscaler = StandardAutoscaler( config_path, lm, max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 1 
autoscaler.update() - self.waitForNodes(2) - assert len(self.provider.mock_nodes) == 2 + self.waitForNodes(3) + assert len(self.provider.mock_nodes) == 3 assert { - self.provider.mock_nodes[0].node_type, - self.provider.mock_nodes[1].node_type + self.provider.mock_nodes[1].node_type, + self.provider.mock_nodes[2].node_type } == {"p2.8xlarge", "m4.large"} self.provider.create_node({}, { TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", @@ -1195,16 +1253,17 @@ class AutoscalingTest(unittest.TestCase): TAG_RAY_USER_NODE_TYPE: "m4.16xlarge", TAG_RAY_NODE_KIND: NODE_KIND_WORKER }, 2) - assert len(self.provider.non_terminated_nodes({})) == 6 + assert len(self.provider.non_terminated_nodes({})) == 7 # Make sure that after idle_timeout_minutes we don't kill idle # min workers. for node_id in self.provider.non_terminated_nodes({}): lm.last_used_time_by_ip[self.provider.internal_ip(node_id)] = -60 autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(3) cnt = 0 - for id in self.provider.mock_nodes: + # [1:] skips the head node. + for id in list(self.provider.mock_nodes.keys())[1:]: if self.provider.mock_nodes[id].state == "running" or \ self.provider.mock_nodes[id].state == "pending": assert self.provider.mock_nodes[id].node_type in { @@ -1218,6 +1277,7 @@ class AutoscalingTest(unittest.TestCase): # Commenting out this line causes the test case to fail?!?! 
config["min_workers"] = 0 config["target_utilization_fraction"] = 1.0 + config["head_node_type"] = "p2.xlarge" config_path = self.write_config(config) self.provider = MockProvider() self.provider.create_node({}, { @@ -1295,28 +1355,32 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + LoadMetrics("172.0.0.0"), max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 - autoscaler.update() - self.waitForNodes(0) - autoscaler.request_resources([{"CPU": 1}]) + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) - assert self.provider.mock_nodes[0].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.request_resources([{"CPU": 1}]) autoscaler.update() self.waitForNodes(2) - assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" + assert self.provider.mock_nodes[1].node_type == "m4.large" + autoscaler.request_resources([{"GPU": 8}]) + autoscaler.update() + self.waitForNodes(3) + assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" autoscaler.request_resources([{"CPU": 32}] * 4) autoscaler.update() - self.waitForNodes(4) - assert self.provider.mock_nodes[2].node_type == "m4.16xlarge" + self.waitForNodes(5) assert self.provider.mock_nodes[3].node_type == "m4.16xlarge" + assert self.provider.mock_nodes[4].node_type == "m4.16xlarge" def testResourcePassing(self): config = MULTI_WORKER_CLUSTER.copy() @@ -1326,23 +1390,27 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() runner = MockProcessRunner() runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: 
NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + LoadMetrics("172.0.0.0"), max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() - self.waitForNodes(0) + self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.request_resources([{"CPU": 1}]) autoscaler.update() - self.waitForNodes(1) - assert self.provider.mock_nodes[0].node_type == "m4.large" + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + assert self.provider.mock_nodes[1].node_type == "m4.large" autoscaler.request_resources([{"GPU": 8}]) autoscaler.update() - self.waitForNodes(2) - assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" # TODO (Alex): Autoscaler creates the node during one update then # starts the updater in the enxt update. The sleep is largely @@ -1353,11 +1421,11 @@ class AutoscalingTest(unittest.TestCase): # These checks are done separately because we have no guarantees on the # order the dict is serialized in. 
- runner.assert_has_call("172.0.0.0", "RAY_OVERRIDE_RESOURCES=") - runner.assert_has_call("172.0.0.0", "\"CPU\":2") runner.assert_has_call("172.0.0.1", "RAY_OVERRIDE_RESOURCES=") - runner.assert_has_call("172.0.0.1", "\"CPU\":32") - runner.assert_has_call("172.0.0.1", "\"GPU\":8") + runner.assert_has_call("172.0.0.1", "\"CPU\":2") + runner.assert_has_call("172.0.0.2", "RAY_OVERRIDE_RESOURCES=") + runner.assert_has_call("172.0.0.2", "\"CPU\":32") + runner.assert_has_call("172.0.0.2", "\"GPU\":8") def testScaleUpLoadMetrics(self): config = MULTI_WORKER_CLUSTER.copy() @@ -1366,16 +1434,20 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() - lm = LoadMetrics() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) + lm = LoadMetrics("172.0.0.0") autoscaler = StandardAutoscaler( config_path, lm, max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() - self.waitForNodes(0) + self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.update() lm.update( "1.2.3.4", {}, {}, {}, @@ -1386,10 +1458,10 @@ class AutoscalingTest(unittest.TestCase): "CPU": 16 }]) autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) nodes = { - self.provider.mock_nodes[0].node_type, - self.provider.mock_nodes[1].node_type + self.provider.mock_nodes[1].node_type, + self.provider.mock_nodes[2].node_type } assert nodes == {"p2.xlarge", "m4.4xlarge"} @@ -1407,40 +1479,46 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() - runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)]) + runner.respond_to_call("json 
.Config.Env", ["[]" for i in range(4)]) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) + lm = LoadMetrics("172.0.0.0") + lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {}) autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + lm, max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 - autoscaler.update() - self.waitForNodes(0) - autoscaler.request_resources([{"CPU": 1}]) + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) - assert self.provider.mock_nodes[0].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.request_resources([{"CPU": 1}]) autoscaler.update() self.waitForNodes(2) - assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" - autoscaler.request_resources([{"GPU": 1}] * 9) + assert self.provider.mock_nodes[1].node_type == "m4.large" + autoscaler.request_resources([{"GPU": 8}]) autoscaler.update() self.waitForNodes(3) - assert self.provider.mock_nodes[2].node_type == "p2.xlarge" + assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" + autoscaler.request_resources([{"GPU": 1}] * 9) + autoscaler.update() + self.waitForNodes(4) + assert self.provider.mock_nodes[3].node_type == "p2.xlarge" autoscaler.update() sleep(0.1) - runner.assert_has_call(self.provider.mock_nodes[1].internal_ip, + runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, "new_worker_setup_command") - runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, - "setup_cmd") - runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, - "worker_setup_cmd") - runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, - "new_worker_initialization_cmd") runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip, + "setup_cmd") + runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip, + "worker_setup_cmd") + 
runner.assert_has_call(self.provider.mock_nodes[3].internal_ip, + "new_worker_initialization_cmd") + runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip, "init_cmd") def testDockerWorkers(self): @@ -1461,28 +1539,32 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() - runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)]) + runner.respond_to_call("json .Config.Env", ["[]" for i in range(5)]) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + LoadMetrics("172.0.0.0"), max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 - autoscaler.update() - self.waitForNodes(0) - autoscaler.request_resources([{"CPU": 1}]) + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) - assert self.provider.mock_nodes[0].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.request_resources([{"CPU": 1}]) autoscaler.update() self.waitForNodes(2) - assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" - autoscaler.request_resources([{"GPU": 1}] * 9) + assert self.provider.mock_nodes[1].node_type == "m4.large" + autoscaler.request_resources([{"GPU": 8}]) autoscaler.update() self.waitForNodes(3) - assert self.provider.mock_nodes[2].node_type == "p2.xlarge" + assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" + autoscaler.request_resources([{"GPU": 1}] * 9) + autoscaler.update() + self.waitForNodes(4) + assert self.provider.mock_nodes[3].node_type == "p2.xlarge" autoscaler.update() # Fill up m4, p2.8, p2 and request 2 more CPUs autoscaler.request_resources([{ @@ -1495,33 +1577,33 @@ class AutoscalingTest(unittest.TestCase): "CPU": 2 }]) autoscaler.update() - self.waitForNodes(4) - assert 
self.provider.mock_nodes[3].node_type == "m4.16xlarge" + self.waitForNodes(5) + assert self.provider.mock_nodes[4].node_type == "m4.16xlarge" autoscaler.update() sleep(0.1) - runner.assert_has_call(self.provider.mock_nodes[1].internal_ip, + runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, "p2.8x-run-options") - runner.assert_has_call(self.provider.mock_nodes[1].internal_ip, + runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, "p2.8x_image:latest") - runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, + runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip, "default-image:nightly") - runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, + runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip, "standard-run-options") - runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, + runner.assert_has_call(self.provider.mock_nodes[3].internal_ip, "p2x_image:nightly") - runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, + runner.assert_has_call(self.provider.mock_nodes[3].internal_ip, "standard-run-options") - runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip, + runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip, "p2.8x-run-options") - runner.assert_has_call(self.provider.mock_nodes[3].internal_ip, + runner.assert_has_call(self.provider.mock_nodes[4].internal_ip, "default-image:nightly") - runner.assert_has_call(self.provider.mock_nodes[3].internal_ip, + runner.assert_has_call(self.provider.mock_nodes[4].internal_ip, "standard-run-options") - runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip, + runner.assert_not_has_call(self.provider.mock_nodes[4].internal_ip, "p2.8x-run-options") - runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip, + runner.assert_not_has_call(self.provider.mock_nodes[4].internal_ip, "p2x_image:nightly") def testUpdateConfig(self): @@ -1531,21 +1613,25 @@ class 
AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + LoadMetrics("172.0.0.0"), max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) config["available_node_types"]["m4.large"]["min_workers"] = 0 config["available_node_types"]["m4.large"]["node_config"][ "field_changed"] = 1 config_path = self.write_config(config) autoscaler.update() - self.waitForNodes(0) + self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) def testEmptyDocker(self): config = MULTI_WORKER_CLUSTER.copy() @@ -1555,23 +1641,27 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) autoscaler = StandardAutoscaler( config_path, - LoadMetrics(), + LoadMetrics("172.0.0.0"), max_failures=0, process_runner=runner, update_interval_s=0) - assert len(self.provider.non_terminated_nodes({})) == 0 - autoscaler.update() - self.waitForNodes(0) - autoscaler.request_resources([{"CPU": 1}]) + assert len(self.provider.non_terminated_nodes({})) == 1 autoscaler.update() self.waitForNodes(1) - assert self.provider.mock_nodes[0].node_type == "m4.large" - autoscaler.request_resources([{"GPU": 8}]) + autoscaler.request_resources([{"CPU": 1}]) autoscaler.update() self.waitForNodes(2) - assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" + assert self.provider.mock_nodes[1].node_type == "m4.large" 
+ autoscaler.request_resources([{"GPU": 8}]) + autoscaler.update() + self.waitForNodes(3) + assert self.provider.mock_nodes[2].node_type == "p2.8xlarge" def testRequestResourcesIdleTimeout(self): """Test request_resources() with and without idle timeout.""" @@ -1599,8 +1689,12 @@ class AutoscalingTest(unittest.TestCase): config_path = self.write_config(config) self.provider = MockProvider() runner = MockProcessRunner() - lm = LoadMetrics() - runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) + lm = LoadMetrics("172.0.0.0") + runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)]) autoscaler = StandardAutoscaler( config_path, lm, @@ -1608,14 +1702,14 @@ class AutoscalingTest(unittest.TestCase): process_runner=runner, update_interval_s=0) autoscaler.update() - self.waitForNodes(0) + self.waitForNodes(0, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) autoscaler.update() - self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) non_terminated_nodes = autoscaler.provider.non_terminated_nodes({}) - assert len(non_terminated_nodes) == 1 - node_id = non_terminated_nodes[0] - node_ip = autoscaler.provider.non_terminated_node_ips({})[0] + assert len(non_terminated_nodes) == 2 + node_id = non_terminated_nodes[1] + node_ip = autoscaler.provider.non_terminated_node_ips({})[1] # A hack to check if the node was terminated when it shouldn't. autoscaler.provider.mock_nodes[node_id].state = "unterminatable" @@ -1629,10 +1723,10 @@ class AutoscalingTest(unittest.TestCase): }]) autoscaler.update() # this fits on request_resources()! 
- self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2) autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) lm.update( node_ip, @@ -1642,7 +1736,7 @@ class AutoscalingTest(unittest.TestCase): "WORKER": 1.0 }]) autoscaler.update() - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) lm.update( node_ip, config["available_node_types"]["def_worker"]["resources"], @@ -1653,13 +1747,13 @@ class AutoscalingTest(unittest.TestCase): }]) autoscaler.update() # Still 2 as the second node did not show up a heart beat. - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # If node {node_id} was terminated any time then it's state will be set # to terminated. assert autoscaler.provider.mock_nodes[ node_id].state == "unterminatable" lm.update( - "172.0.0.1", + "172.0.0.2", config["available_node_types"]["def_worker"]["resources"], config["available_node_types"]["def_worker"]["resources"], {}, waiting_bundles=[{ @@ -1669,7 +1763,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() # Now it is 1 because it showed up in last used (heart beat). # The remaining one is 127.0.0.1. - self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) def testRequestResourcesRaceConditionsLong(self): """Test request_resources(), race conditions & demands/min_workers. 
@@ -1704,7 +1798,11 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() runner = MockProcessRunner() runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)]) - lm = LoadMetrics() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) + lm = LoadMetrics("172.0.0.0") autoscaler = StandardAutoscaler( config_path, lm, @@ -1714,11 +1812,11 @@ class AutoscalingTest(unittest.TestCase): autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) autoscaler.update() # 1 min worker for both min_worker and request_resources() - self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) non_terminated_nodes = autoscaler.provider.non_terminated_nodes({}) - assert len(non_terminated_nodes) == 1 - node_id = non_terminated_nodes[0] - node_ip = autoscaler.provider.non_terminated_node_ips({})[0] + assert len(non_terminated_nodes) == 2 + node_id = non_terminated_nodes[1] + node_ip = autoscaler.provider.non_terminated_node_ips({})[1] # A hack to check if the node was terminated when it shouldn't. autoscaler.provider.mock_nodes[node_id].state = "unterminatable" @@ -1733,12 +1831,12 @@ class AutoscalingTest(unittest.TestCase): autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 2) autoscaler.update() # 2 requested_resource, 1 min worker, 1 free node -> 2 nodes total - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}]) autoscaler.update() # Still 2 because the second one is not connected and hence # request_resources occupies the connected node. 
- self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.request_resources([{"CPU": 0.2, "WORKER": 1.0}] * 3) lm.update( node_ip, @@ -1748,14 +1846,14 @@ class AutoscalingTest(unittest.TestCase): "WORKER": 1.0 }] * 3) autoscaler.update() - self.waitForNodes(3) + self.waitForNodes(3, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) autoscaler.request_resources([]) - lm.update("172.0.0.1", + lm.update("172.0.0.2", config["available_node_types"]["def_worker"]["resources"], config["available_node_types"]["def_worker"]["resources"], {}) - lm.update("172.0.0.2", + lm.update("172.0.0.3", config["available_node_types"]["def_worker"]["resources"], config["available_node_types"]["def_worker"]["resources"], {}) @@ -1763,7 +1861,7 @@ class AutoscalingTest(unittest.TestCase): config["available_node_types"]["def_worker"]["resources"], {}, {}) autoscaler.update() - self.waitForNodes(1) + self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) # If node {node_id} was terminated any time then it's state will be set # to terminated. assert autoscaler.provider.mock_nodes[ @@ -1799,7 +1897,11 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() runner = MockProcessRunner() runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)]) - lm = LoadMetrics() + self.provider.create_node({}, { + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, + TAG_RAY_USER_NODE_TYPE: "empty_node" + }, 1) + lm = LoadMetrics("172.0.0.0") autoscaler = StandardAutoscaler( config_path, lm, @@ -1809,7 +1911,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.request_resources([{"CPU": 2, "WORKER": 1.0}] * 2) autoscaler.update() # 2 min worker for both min_worker and request_resources(), not 3. - self.waitForNodes(2) + self.waitForNodes(2, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) def testRequestResourcesRaceConditionWithResourceDemands(self): """Test request_resources() with resource_demands.