diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py
index 4959280b5..375f19b87 100644
--- a/python/ray/autoscaler/autoscaler.py
+++ b/python/ray/autoscaler/autoscaler.py
@@ -198,12 +198,12 @@ class StandardAutoscaler:
 
         # First let the resource demand scheduler launch nodes, if enabled.
         if self.resource_demand_scheduler and self.resource_demand_vector:
-            # TODO(ekl) include head node in the node list
-            instances = (self.resource_demand_scheduler.get_nodes_to_launch(
-                nodes, self.pending_launches.breakdown(),
+            to_launch = (self.resource_demand_scheduler.get_nodes_to_launch(
+                self.provider.non_terminated_nodes(tag_filters={}),
+                self.pending_launches.breakdown(),
                 self.resource_demand_vector))
             # TODO(ekl) also enforce max launch concurrency here?
-            for node_type, count in instances:
+            for node_type, count in to_launch:
                 self.launch_new_node(count, node_type=node_type)
 
         # Launch additional nodes of the default type, if still needed.
diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py
index 570ad5740..2338ed60d 100644
--- a/python/ray/tests/test_resource_demand_scheduler.py
+++ b/python/ray/tests/test_resource_demand_scheduler.py
@@ -12,17 +12,22 @@ from ray.autoscaler.autoscaler import StandardAutoscaler
 from ray.autoscaler.load_metrics import LoadMetrics
 from ray.autoscaler.node_provider import NODE_PROVIDERS
 from ray.autoscaler.commands import get_or_create_head_node
-from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE
+from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND
 from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
     get_bin_pack_residual, get_nodes_for
 
 from time import sleep
 
 TYPES_A = {
-    "m4.large": {
+    "empty_node": {
         "node_config": {
             "FooProperty": 42,
         },
+        "resources": {},
+        "max_workers": 0,
+    },
+    "m4.large": {
+        "node_config": {},
         "resources": {
             "CPU": 2
         },
@@ -63,7 +68,7 @@ TYPES_A = {
 MULTI_WORKER_CLUSTER = dict(
     SMALL_CLUSTER, **{
         "available_node_types": TYPES_A,
-        "head_node_type": "m4.large",
+        "head_node_type": "empty_node",
         "worker_default_node_type": "m4.large",
     })
 
@@ -211,14 +216,14 @@ class AutoscalingTest(unittest.TestCase):
         runner.assert_has_call("1.2.3.4", "init_cmd")
         runner.assert_has_call("1.2.3.4", "head_setup_cmd")
         runner.assert_has_call("1.2.3.4", "start_ray_head")
-        self.assertEqual(self.provider.mock_nodes[0].node_type, "m4.large")
+        self.assertEqual(self.provider.mock_nodes[0].node_type, "empty_node")
         self.assertEqual(
             self.provider.mock_nodes[0].node_config.get("FooProperty"), 42)
         self.assertEqual(
             self.provider.mock_nodes[0].node_config.get("TestProp"), 1)
         self.assertEqual(
             self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE),
-            "m4.large")
+            "empty_node")
 
     def testScaleUpMinSanity(self):
         config_path = self.write_config(MULTI_WORKER_CLUSTER)
@@ -236,6 +241,43 @@ class AutoscalingTest(unittest.TestCase):
         autoscaler.update()
         self.waitForNodes(2)
 
+    def testRequestBundlesAccountsForHeadNode(self):
+        config = MULTI_WORKER_CLUSTER.copy()
+        config["head_node_type"] = "p2.8xlarge"
+        config["min_workers"] = 0
+        config["max_workers"] = 50
+        config_path = self.write_config(config)
+        self.provider = MockProvider()
+        self.provider.create_node({}, {
+            TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
+            TAG_RAY_NODE_KIND: "head"
+        }, 1)
+        runner = MockProcessRunner()
+        autoscaler = StandardAutoscaler(
+            config_path,
+            LoadMetrics(),
+            max_failures=0,
+            process_runner=runner,
+            update_interval_s=0)
+        assert len(self.provider.non_terminated_nodes({})) == 1
+
+        # These requests fit on the head node.
+        autoscaler.update()
+        self.waitForNodes(1)
+        autoscaler.request_resources([{"CPU": 1}])
+        autoscaler.update()
+        self.waitForNodes(1)
+        assert len(self.provider.mock_nodes) == 1
+        autoscaler.request_resources([{"GPU": 8}])
+        autoscaler.update()
+        self.waitForNodes(1)
+
+        # This request requires an additional worker node.
+        autoscaler.request_resources([{"GPU": 8}] * 2)
+        autoscaler.update()
+        self.waitForNodes(2)
+        assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
+
     def testRequestBundles(self):
         config = MULTI_WORKER_CLUSTER.copy()
         config["min_workers"] = 0