From 9ca159aa0bf2ee1b0a0e7ed224a605af7aac0438 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Tue, 25 Aug 2020 23:35:38 -0700 Subject: [PATCH] [Autoscaler] Multi node commands (#10236) --- python/ray/autoscaler/autoscaler.py | 22 +++++++- python/ray/autoscaler/docker.py | 10 ++++ python/ray/autoscaler/ray-schema.json | 11 +++- .../tests/test_resource_demand_scheduler.py | 55 ++++++++++++++++++- 4 files changed, 93 insertions(+), 5 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 8adc40573..46c0cf4ea 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -389,6 +389,19 @@ class StandardAutoscaler: updater.start() self.updaters[node_id] = updater + def _get_node_type_specific_commands(self, node_id: str, + commands_key: str): + commands = self.config[commands_key] + node_tags = self.provider.node_tags(node_id) + if TAG_RAY_USER_NODE_TYPE in node_tags: + node_type = node_tags[TAG_RAY_USER_NODE_TYPE] + if node_type not in self.available_node_types: + raise ValueError(f"Unknown node type tag: {node_type}.") + node_specific_config = self.available_node_types[node_type] + if commands_key in node_specific_config: + commands = node_specific_config[commands_key] + return commands + def should_update(self, node_id): if not self.can_update(node_id): return None, None, None # no update @@ -402,10 +415,12 @@ class StandardAutoscaler: init_commands = [] ray_commands = self.config["worker_start_ray_commands"] elif successful_updated and self.config.get("no_restart", False): - init_commands = self.config["worker_setup_commands"] + init_commands = self._get_node_type_specific_commands( + node_id, "worker_setup_commands") ray_commands = [] else: - init_commands = self.config["worker_setup_commands"] + init_commands = self._get_node_type_specific_commands( + node_id, "worker_setup_commands") ray_commands = self.config["worker_start_ray_commands"] return (node_id, init_commands, ray_commands) @@ -420,7 +435,8 @@ class StandardAutoscaler: cluster_name=self.config["cluster_name"], file_mounts=self.config["file_mounts"], initialization_commands=with_head_node_ip( - self.config["initialization_commands"]), + self._get_node_type_specific_commands( + node_id, "initialization_commands")), setup_commands=with_head_node_ip(init_commands), ray_start_commands=with_head_node_ip(ray_start_commands), runtime_hash=self.runtime_hash, diff --git a/python/ray/autoscaler/docker.py b/python/ray/autoscaler/docker.py index c9b5774d4..98b334a57 100644 --- a/python/ray/autoscaler/docker.py +++ b/python/ray/autoscaler/docker.py @@ -37,6 +37,10 @@ def dockerize_if_needed(config): if docker_pull: docker_pull_cmd = "docker pull {}".format(docker_image) config["initialization_commands"].append(docker_pull_cmd) + for node_type_config in config.get("available_node_types", + {}).values(): + node_type_config["initialization_commands"].append(docker_pull_cmd) + pass head_docker_start = docker_start_cmds(ssh_user, head_docker_image, docker_mounts, cname, @@ -59,6 +63,12 @@ def dockerize_if_needed(config): container_name=cname, env_vars=["RAY_HEAD_IP"]) + for node_type_config in config.get("available_node_types", {}).values(): + if "worker_setup_commands" in node_type_config: + node_type_config["worker_setup_commands"] = worker_docker_start + ( + with_docker_exec( + node_type_config["worker_setup_commands"], + container_name=cname)) return config diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index 7cec8f094..5c1f81573 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -291,7 +291,16 @@ "resources": { "type": "object", ".*": {"type": "number"} - } + }, + "initialization_commands": { + "$ref": "#/definitions/commands", + "description": "List of commands that will be run before `setup_commands`. If docker is enabled, these commands will run outside the container and before docker is setup." + }, + "worker_setup_commands": { + "$ref": "#/definitions/commands", + "description": "List of common shell commands to run to setup nodes." + }, + "additionalProperties": false }, "additionalProperties": false } diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 2338ed60d..8bc8f6153 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -200,6 +200,10 @@ class AutoscalingTest(unittest.TestCase): return path def testGetOrCreateMultiNodeType(self): + config = MULTI_WORKER_CLUSTER.copy() + # Commenting out this line causes the test case to fail?!?! + config["min_workers"] = 0 + config_path = self.write_config(config) config_path = self.write_config(MULTI_WORKER_CLUSTER) self.provider = MockProvider() runner = MockProcessRunner() @@ -214,7 +218,7 @@ class AutoscalingTest(unittest.TestCase): _runner=runner) self.waitForNodes(1) runner.assert_has_call("1.2.3.4", "init_cmd") - runner.assert_has_call("1.2.3.4", "head_setup_cmd") + runner.assert_has_call("1.2.3.4", "setup_cmd") runner.assert_has_call("1.2.3.4", "start_ray_head") self.assertEqual(self.provider.mock_nodes[0].node_type, "empty_node") self.assertEqual( @@ -348,6 +352,55 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("172.0.0.1", "CPU: 32") runner.assert_has_call("172.0.0.1", "GPU: 8") + def testCommandPassing(self): + t = "custom" + config = MULTI_WORKER_CLUSTER.copy() + config["available_node_types"]["p2.8xlarge"][ + "worker_setup_commands"] = ["new_worker_setup_command"] + config["available_node_types"]["p2.xlarge"][ + "initialization_commands"] = ["new_worker_initialization_cmd"] + config["available_node_types"]["p2.xlarge"]["resources"][t] = 1 + # Commenting out this line causes the test case to fail?!?! + config["min_workers"] = 0 + config["max_workers"] = 10 + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + autoscaler = StandardAutoscaler( + config_path, + LoadMetrics(), + max_failures=0, + process_runner=runner, + update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 0 + autoscaler.update() + self.waitForNodes(0) + autoscaler.request_resources([{"CPU": 1}]) + autoscaler.update() + self.waitForNodes(1) + assert self.provider.mock_nodes[0].node_type == "m4.large" + autoscaler.request_resources([{"GPU": 8}]) + autoscaler.update() + self.waitForNodes(2) + assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" + autoscaler.request_resources([{"GPU": 1}] * 9) + # autoscaler.request_resources([{t: 1}]) + autoscaler.update() + self.waitForNodes(3) + assert self.provider.mock_nodes[2].node_type == "p2.xlarge" + autoscaler.update() + sleep(0.1) + runner.assert_has_call(self.provider.mock_nodes[1].internal_ip, + "new_worker_setup_command") + runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, + "setup_cmd") + runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, + "worker_setup_cmd") + runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, + "new_worker_initialization_cmd") + runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip, + "init_cmd") + if __name__ == "__main__": import sys