[docker] Support multiple node types (#10504)

This commit is contained in:
Ian Rodney
2020-09-02 18:27:59 -07:00
committed by GitHub
parent dc7fe1a4c5
commit b9633a2b67
5 changed files with 152 additions and 24 deletions
+38 -19
View File
@@ -1,5 +1,5 @@
from collections import defaultdict
from typing import Optional
from collections import defaultdict, namedtuple
from typing import Any, Optional
import copy
import logging
import math
@@ -30,6 +30,12 @@ from six.moves import queue
logger = logging.getLogger(__name__)
# Tuple of modified fields for the given node_id returned by should_update
# that will be passed into a NodeUpdaterThread.
UpdateInstructions = namedtuple(
"UpdateInstructions",
["node_id", "init_commands", "start_ray_commands", "docker_config"])
class StandardAutoscaler:
"""The autoscaling control loop for a Ray cluster.
@@ -236,14 +242,15 @@ class StandardAutoscaler:
# problems. They should at a minimum be spawned as daemon threads.
# See https://github.com/ray-project/ray/pull/5903 for more info.
T = []
for node_id, commands, ray_start in (self.should_update(node_id)
for node_id in nodes):
for node_id, commands, ray_start, docker_config in (
self.should_update(node_id) for node_id in nodes):
if node_id is not None:
resources = self._node_resources(node_id)
T.append(
threading.Thread(
target=self.spawn_updater,
args=(node_id, commands, ray_start, resources)))
args=(node_id, commands, ray_start, resources,
docker_config)))
for t in T:
t.start()
for t in T:
@@ -401,44 +408,56 @@ class StandardAutoscaler:
updater.start()
self.updaters[node_id] = updater
def _get_node_type_specific_commands(self, node_id: str,
commands_key: str):
commands = self.config[commands_key]
def _get_node_type_specific_fields(self, node_id: str,
fields_key: str) -> Any:
fields = self.config[fields_key]
node_tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in node_tags:
node_type = node_tags[TAG_RAY_USER_NODE_TYPE]
if node_type not in self.available_node_types:
raise ValueError(f"Unknown node type tag: {node_type}.")
node_specific_config = self.available_node_types[node_type]
if commands_key in node_specific_config:
commands = node_specific_config[commands_key]
return commands
if fields_key in node_specific_config:
fields = node_specific_config[fields_key]
return fields
def _get_node_specific_docker_config(self, node_id):
docker_config = copy.deepcopy(self.config.get("docker", {}))
node_specific_docker = self._get_node_type_specific_fields(
node_id, "docker")
docker_config.update(node_specific_docker)
return docker_config
def should_update(self, node_id):
if not self.can_update(node_id):
return None, None, None # no update
return UpdateInstructions(None, None, None, None) # no update
status = self.provider.node_tags(node_id).get(TAG_RAY_NODE_STATUS)
if status == STATUS_UP_TO_DATE and self.files_up_to_date(node_id):
return None, None, None # no update
return UpdateInstructions(None, None, None, None) # no update
successful_updated = self.num_successful_updates.get(node_id, 0) > 0
if successful_updated and self.config.get("restart_only", False):
init_commands = []
ray_commands = self.config["worker_start_ray_commands"]
elif successful_updated and self.config.get("no_restart", False):
init_commands = self._get_node_type_specific_commands(
init_commands = self._get_node_type_specific_fields(
node_id, "worker_setup_commands")
ray_commands = []
else:
init_commands = self._get_node_type_specific_commands(
init_commands = self._get_node_type_specific_fields(
node_id, "worker_setup_commands")
ray_commands = self.config["worker_start_ray_commands"]
return (node_id, init_commands, ray_commands)
docker_config = self._get_node_specific_docker_config(node_id)
return UpdateInstructions(
node_id=node_id,
init_commands=init_commands,
start_ray_commands=ray_commands,
docker_config=docker_config)
def spawn_updater(self, node_id, init_commands, ray_start_commands,
node_resources):
node_resources, docker_config):
updater = NodeUpdaterThread(
node_id=node_id,
provider_config=self.config["provider"],
@@ -447,7 +466,7 @@ class StandardAutoscaler:
cluster_name=self.config["cluster_name"],
file_mounts=self.config["file_mounts"],
initialization_commands=with_head_node_ip(
self._get_node_type_specific_commands(
self._get_node_type_specific_fields(
node_id, "initialization_commands")),
setup_commands=with_head_node_ip(init_commands),
ray_start_commands=with_head_node_ip(ray_start_commands),
@@ -457,7 +476,7 @@ class StandardAutoscaler:
cluster_synced_files=self.config["cluster_synced_files"],
process_runner=self.process_runner,
use_internal_ip=True,
docker_config=self.config.get("docker"),
docker_config=docker_config,
node_resources=node_resources)
updater.start()
self.updaters[node_id] = updater
+2 -3
View File
@@ -705,9 +705,8 @@ class DockerCommandRunner(CommandRunnerInterface):
def run_init(self, *, as_head, file_mounts):
image = self.docker_config.get("image")
if image is None:
image = self.docker_config.get(
f"{'head' if as_head else 'worker'}_image")
image = self.docker_config.get(
f"{'head' if as_head else 'worker'}_image", image)
self._check_docker_installed()
if self.docker_config.get("pull_before_run", True):
+20 -2
View File
@@ -302,8 +302,26 @@
},
"worker_setup_commands": {
"$ref": "#/definitions/commands",
"description": "List of common shell commands to run to setup nodes."
}
"description": "List of common shell commands to run to setup nodes. This node specfic list will override the global setup_commands and worker_setup_commands."
},
"docker": {
"description": "Configuration of Worker nodes.",
"type": "object",
"properties": {
"pull_before_run": {
"type": "boolean",
"description": "run `docker pull` first"
},
"worker_image": {
"type": "string",
"description": "analogous to head_image"
},
"worker_run_options": {
"type": "array",
"description": "analogous to head_run_options, merged with the global docker run_options."
}
}
}
},
"additionalProperties": false
}
@@ -449,6 +449,86 @@ class AutoscalingTest(unittest.TestCase):
runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip,
"init_cmd")
def testDockerWorkers(self):
config = MULTI_WORKER_CLUSTER.copy()
config["available_node_types"]["p2.8xlarge"]["docker"] = {
"worker_image": "p2.8x_image:latest",
"worker_run_options": ["p2.8x-run-options"]
}
config["available_node_types"]["p2.xlarge"]["docker"] = {
"worker_image": "p2x_image:nightly"
}
config["docker"]["worker_run_options"] = ["standard-run-options"]
config["docker"]["image"] = "default-image:nightly"
config["docker"]["worker_image"] = "default-image:nightly"
# Commenting out this line causes the test case to fail?!?!
config["min_workers"] = 0
config["max_workers"] = 10
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
autoscaler = StandardAutoscaler(
config_path,
LoadMetrics(),
max_failures=0,
process_runner=runner,
update_interval_s=0)
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(0)
autoscaler.request_resources([{"CPU": 1}])
autoscaler.update()
self.waitForNodes(1)
assert self.provider.mock_nodes[0].node_type == "m4.large"
autoscaler.request_resources([{"GPU": 8}])
autoscaler.update()
self.waitForNodes(2)
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
autoscaler.request_resources([{"GPU": 1}] * 9)
autoscaler.update()
self.waitForNodes(3)
assert self.provider.mock_nodes[2].node_type == "p2.xlarge"
autoscaler.update()
# Fill up m4, p2.8, p2 and request 2 more CPUs
autoscaler.request_resources([{
"CPU": 2
}, {
"CPU": 16
}, {
"CPU": 32
}, {
"CPU": 2
}])
autoscaler.update()
self.waitForNodes(4)
assert self.provider.mock_nodes[3].node_type == "m4.16xlarge"
autoscaler.update()
sleep(0.1)
runner.assert_has_call(self.provider.mock_nodes[1].internal_ip,
"p2.8x-run-options")
runner.assert_has_call(self.provider.mock_nodes[1].internal_ip,
"p2.8x_image:latest")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
"default-image:nightly")
runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip,
"standard-run-options")
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
"p2x_image:nightly")
runner.assert_has_call(self.provider.mock_nodes[2].internal_ip,
"standard-run-options")
runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip,
"p2.8x-run-options")
runner.assert_has_call(self.provider.mock_nodes[3].internal_ip,
"default-image:nightly")
runner.assert_has_call(self.provider.mock_nodes[3].internal_ip,
"standard-run-options")
runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip,
"p2.8x-run-options")
runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip,
"p2x_image:nightly")
def testUpdateConfig(self):
config = MULTI_WORKER_CLUSTER.copy()
config_path = self.write_config(config)