[Autoscaler] Multi node commands (#10236)

This commit is contained in:
Alex Wu
2020-08-25 23:35:38 -07:00
committed by GitHub
parent 0dae50b5eb
commit 9ca159aa0b
4 changed files with 93 additions and 5 deletions
+19 -3
View File
@@ -389,6 +389,19 @@ class StandardAutoscaler:
updater.start()
self.updaters[node_id] = updater
def _get_node_type_specific_commands(self, node_id: str,
commands_key: str):
commands = self.config[commands_key]
node_tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in node_tags:
node_type = node_tags[TAG_RAY_USER_NODE_TYPE]
if node_type not in self.available_node_types:
raise ValueError(f"Unknown node type tag: {node_type}.")
node_specific_config = self.available_node_types[node_type]
if commands_key in node_specific_config:
commands = node_specific_config[commands_key]
return commands
def should_update(self, node_id):
if not self.can_update(node_id):
return None, None, None # no update
@@ -402,10 +415,12 @@ class StandardAutoscaler:
init_commands = []
ray_commands = self.config["worker_start_ray_commands"]
elif successful_updated and self.config.get("no_restart", False):
init_commands = self.config["worker_setup_commands"]
init_commands = self._get_node_type_specific_commands(
node_id, "worker_setup_commands")
ray_commands = []
else:
init_commands = self.config["worker_setup_commands"]
init_commands = self._get_node_type_specific_commands(
node_id, "worker_setup_commands")
ray_commands = self.config["worker_start_ray_commands"]
return (node_id, init_commands, ray_commands)
@@ -420,7 +435,8 @@ class StandardAutoscaler:
cluster_name=self.config["cluster_name"],
file_mounts=self.config["file_mounts"],
initialization_commands=with_head_node_ip(
self.config["initialization_commands"]),
self._get_node_type_specific_commands(
node_id, "initialization_commands")),
setup_commands=with_head_node_ip(init_commands),
ray_start_commands=with_head_node_ip(ray_start_commands),
runtime_hash=self.runtime_hash,
+10
View File
@@ -37,6 +37,10 @@ def dockerize_if_needed(config):
if docker_pull:
docker_pull_cmd = "docker pull {}".format(docker_image)
config["initialization_commands"].append(docker_pull_cmd)
for node_type_config in config.get("available_node_types",
{}).values():
node_type_config["initialization_commands"].append(docker_pull_cmd)
pass
head_docker_start = docker_start_cmds(ssh_user, head_docker_image,
docker_mounts, cname,
@@ -59,6 +63,12 @@ def dockerize_if_needed(config):
container_name=cname,
env_vars=["RAY_HEAD_IP"])
for node_type_config in config.get("available_node_types", {}).values():
if "worker_setup_commands" in node_type_config:
node_type_config["worker_setup_commands"] = worker_docker_start + (
with_docker_exec(
node_type_config["worker_setup_commands"],
container_name=cname))
return config
+10 -1
View File
@@ -291,7 +291,16 @@
"resources": {
"type": "object",
".*": {"type": "number"}
}
},
"initialization_commands": {
"$ref": "#/definitions/commands",
"description": "List of commands that will be run before `setup_commands`. If docker is enabled, these commands will run outside the container and before docker is setup."
},
"worker_setup_commands": {
"$ref": "#/definitions/commands",
"description": "List of common shell commands to run to setup nodes."
},
"additionalProperties": false
},
"additionalProperties": false
}
@@ -200,6 +200,10 @@ class AutoscalingTest(unittest.TestCase):
return path
def testGetOrCreateMultiNodeType(self):
config = MULTI_WORKER_CLUSTER.copy()
# Commenting out this line causes the test case to fail?!?!
config["min_workers"] = 0
config_path = self.write_config(config)
config_path = self.write_config(MULTI_WORKER_CLUSTER)
self.provider = MockProvider()
runner = MockProcessRunner()
@@ -214,7 +218,7 @@ class AutoscalingTest(unittest.TestCase):
_runner=runner)
self.waitForNodes(1)
runner.assert_has_call("1.2.3.4", "init_cmd")
runner.assert_has_call("1.2.3.4", "head_setup_cmd")
runner.assert_has_call("1.2.3.4", "setup_cmd")
runner.assert_has_call("1.2.3.4", "start_ray_head")
self.assertEqual(self.provider.mock_nodes[0].node_type, "empty_node")
self.assertEqual(
@@ -348,6 +352,55 @@ class AutoscalingTest(unittest.TestCase):
runner.assert_has_call("172.0.0.1", "CPU: 32")
runner.assert_has_call("172.0.0.1", "GPU: 8")
def testCommandPassing(self):
t = "custom"
config = MULTI_WORKER_CLUSTER.copy()
config["available_node_types"]["p2.8xlarge"][
"worker_setup_commands"] = ["new_worker_setup_command"]
config["available_node_types"]["p2.xlarge"][
"initialization_commands"] = ["new_worker_initialization_cmd"]
config["available_node_types"]["p2.xlarge"]["resources"][t] = 1
# Commenting out this line causes the test case to fail?!?!
config["min_workers"] = 0
config["max_workers"] = 10
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(0)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(1)
assert self.provider.mock_nodes[0].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
# autoscaler.request_resources([{t: 1}])
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.xlarge"
autoscaler.update()
sleep(0.1)
runner.assert_has_call(self.provider.mock_nodes[1].internal_ip,
"new_worker_setup_command")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
"setup_cmd")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
"worker_setup_cmd")
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
"new_worker_initialization_cmd")
runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip,
"init_cmd")
if __name__ == "__main__":
import sys