diff --git a/doc/source/cluster/autoscaling.rst b/doc/source/cluster/autoscaling.rst index 9a8774da8..64117132c 100644 --- a/doc/source/cluster/autoscaling.rst +++ b/doc/source/cluster/autoscaling.rst @@ -114,3 +114,15 @@ The ``worker_setup_commands`` field (and also the ``initialization_commands`` fi worker_setup_commands: - pip install tensorflow-gpu # Example command. + +Docker Support +~~~~~~~~~~~~~~ +The ``worker_image`` and ``pull_before_run`` fields override the corresponding fields in the top level ``docker`` section for the node type. The ``worker_run_options`` field is combined with the top level ``docker: run_options`` field to produce the docker run command for the given node_type. The following configuration is for a GPU enabled node type. + +.. code:: + + pull_before_run: True + worker_image: + - rayproject/ray-ml:latest-gpu + worker_run_options: + - --runtime=nvidia diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 5ee728b94..5b57010f6 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -1,5 +1,5 @@ -from collections import defaultdict -from typing import Optional +from collections import defaultdict, namedtuple +from typing import Any, Optional import copy import logging import math @@ -30,6 +30,12 @@ from six.moves import queue logger = logging.getLogger(__name__) +# Tuple of modified fields for the given node_id returned by should_update +# that will be passed into a NodeUpdaterThread. +UpdateInstructions = namedtuple( + "UpdateInstructions", + ["node_id", "init_commands", "start_ray_commands", "docker_config"]) + class StandardAutoscaler: """The autoscaling control loop for a Ray cluster. @@ -236,14 +242,15 @@ class StandardAutoscaler: # problems. They should at a minimum be spawned as daemon threads. # See https://github.com/ray-project/ray/pull/5903 for more info. 
T = [] - for node_id, commands, ray_start in (self.should_update(node_id) - for node_id in nodes): + for node_id, commands, ray_start, docker_config in ( + self.should_update(node_id) for node_id in nodes): if node_id is not None: resources = self._node_resources(node_id) T.append( threading.Thread( target=self.spawn_updater, - args=(node_id, commands, ray_start, resources))) + args=(node_id, commands, ray_start, resources, + docker_config))) for t in T: t.start() for t in T: @@ -401,44 +408,56 @@ class StandardAutoscaler: updater.start() self.updaters[node_id] = updater - def _get_node_type_specific_commands(self, node_id: str, - commands_key: str): - commands = self.config[commands_key] + def _get_node_type_specific_fields(self, node_id: str, + fields_key: str) -> Any: + fields = self.config[fields_key] node_tags = self.provider.node_tags(node_id) if TAG_RAY_USER_NODE_TYPE in node_tags: node_type = node_tags[TAG_RAY_USER_NODE_TYPE] if node_type not in self.available_node_types: raise ValueError(f"Unknown node type tag: {node_type}.") node_specific_config = self.available_node_types[node_type] - if commands_key in node_specific_config: - commands = node_specific_config[commands_key] - return commands + if fields_key in node_specific_config: + fields = node_specific_config[fields_key] + return fields + + def _get_node_specific_docker_config(self, node_id): + docker_config = copy.deepcopy(self.config.get("docker", {})) + node_specific_docker = self._get_node_type_specific_fields( + node_id, "docker") + docker_config.update(node_specific_docker) + return docker_config def should_update(self, node_id): if not self.can_update(node_id): - return None, None, None # no update + return UpdateInstructions(None, None, None, None) # no update status = self.provider.node_tags(node_id).get(TAG_RAY_NODE_STATUS) if status == STATUS_UP_TO_DATE and self.files_up_to_date(node_id): - return None, None, None # no update + return UpdateInstructions(None, None, None, None) # no update 
successful_updated = self.num_successful_updates.get(node_id, 0) > 0 if successful_updated and self.config.get("restart_only", False): init_commands = [] ray_commands = self.config["worker_start_ray_commands"] elif successful_updated and self.config.get("no_restart", False): - init_commands = self._get_node_type_specific_commands( + init_commands = self._get_node_type_specific_fields( node_id, "worker_setup_commands") ray_commands = [] else: - init_commands = self._get_node_type_specific_commands( + init_commands = self._get_node_type_specific_fields( node_id, "worker_setup_commands") ray_commands = self.config["worker_start_ray_commands"] - return (node_id, init_commands, ray_commands) + docker_config = self._get_node_specific_docker_config(node_id) + return UpdateInstructions( + node_id=node_id, + init_commands=init_commands, + start_ray_commands=ray_commands, + docker_config=docker_config) def spawn_updater(self, node_id, init_commands, ray_start_commands, - node_resources): + node_resources, docker_config): updater = NodeUpdaterThread( node_id=node_id, provider_config=self.config["provider"], @@ -447,7 +466,7 @@ class StandardAutoscaler: cluster_name=self.config["cluster_name"], file_mounts=self.config["file_mounts"], initialization_commands=with_head_node_ip( - self._get_node_type_specific_commands( + self._get_node_type_specific_fields( node_id, "initialization_commands")), setup_commands=with_head_node_ip(init_commands), ray_start_commands=with_head_node_ip(ray_start_commands), @@ -457,7 +476,7 @@ class StandardAutoscaler: cluster_synced_files=self.config["cluster_synced_files"], process_runner=self.process_runner, use_internal_ip=True, - docker_config=self.config.get("docker"), + docker_config=docker_config, node_resources=node_resources) updater.start() self.updaters[node_id] = updater diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index 5b6f2df71..cd38f09e7 100644 --- 
a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -705,9 +705,8 @@ class DockerCommandRunner(CommandRunnerInterface): def run_init(self, *, as_head, file_mounts): image = self.docker_config.get("image") - if image is None: - image = self.docker_config.get( - f"{'head' if as_head else 'worker'}_image") + image = self.docker_config.get( + f"{'head' if as_head else 'worker'}_image", image) self._check_docker_installed() if self.docker_config.get("pull_before_run", True): diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json index e5bf50911..0546ba20d 100644 --- a/python/ray/autoscaler/ray-schema.json +++ b/python/ray/autoscaler/ray-schema.json @@ -302,8 +302,26 @@ }, "worker_setup_commands": { "$ref": "#/definitions/commands", - "description": "List of common shell commands to run to setup nodes." - } + "description": "List of common shell commands to run to setup nodes. This node specific list will override the global setup_commands and worker_setup_commands." + }, + "docker": { + "description": "Configuration of Worker nodes.", + "type": "object", + "properties": { + "pull_before_run": { + "type": "boolean", + "description": "run `docker pull` first" + }, + "worker_image": { + "type": "string", + "description": "analogous to head_image" + }, + "worker_run_options": { + "type": "array", + "description": "analogous to head_run_options, merged with the global docker run_options." 
+ } + } + } }, "additionalProperties": false } diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index d7c52ce42..985c45551 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -449,6 +449,86 @@ class AutoscalingTest(unittest.TestCase): runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip, "init_cmd") + def testDockerWorkers(self): + config = MULTI_WORKER_CLUSTER.copy() + config["available_node_types"]["p2.8xlarge"]["docker"] = { + "worker_image": "p2.8x_image:latest", + "worker_run_options": ["p2.8x-run-options"] + } + config["available_node_types"]["p2.xlarge"]["docker"] = { + "worker_image": "p2x_image:nightly" + } + config["docker"]["worker_run_options"] = ["standard-run-options"] + config["docker"]["image"] = "default-image:nightly" + config["docker"]["worker_image"] = "default-image:nightly" + # Commenting out this line causes the test case to fail?!?! 
+ config["min_workers"] = 0 + config["max_workers"] = 10 + config_path = self.write_config(config) + self.provider = MockProvider() + runner = MockProcessRunner() + autoscaler = StandardAutoscaler( + config_path, + LoadMetrics(), + max_failures=0, + process_runner=runner, + update_interval_s=0) + assert len(self.provider.non_terminated_nodes({})) == 0 + autoscaler.update() + self.waitForNodes(0) + autoscaler.request_resources([{"CPU": 1}]) + autoscaler.update() + self.waitForNodes(1) + assert self.provider.mock_nodes[0].node_type == "m4.large" + autoscaler.request_resources([{"GPU": 8}]) + autoscaler.update() + self.waitForNodes(2) + assert self.provider.mock_nodes[1].node_type == "p2.8xlarge" + autoscaler.request_resources([{"GPU": 1}] * 9) + autoscaler.update() + self.waitForNodes(3) + assert self.provider.mock_nodes[2].node_type == "p2.xlarge" + autoscaler.update() + # Fill up m4, p2.8, p2 and request 2 more CPUs + autoscaler.request_resources([{ + "CPU": 2 + }, { + "CPU": 16 + }, { + "CPU": 32 + }, { + "CPU": 2 + }]) + autoscaler.update() + self.waitForNodes(4) + assert self.provider.mock_nodes[3].node_type == "m4.16xlarge" + autoscaler.update() + sleep(0.1) + runner.assert_has_call(self.provider.mock_nodes[1].internal_ip, + "p2.8x-run-options") + runner.assert_has_call(self.provider.mock_nodes[1].internal_ip, + "p2.8x_image:latest") + runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, + "default-image:nightly") + runner.assert_not_has_call(self.provider.mock_nodes[1].internal_ip, + "standard-run-options") + + runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, + "p2x_image:nightly") + runner.assert_has_call(self.provider.mock_nodes[2].internal_ip, + "standard-run-options") + runner.assert_not_has_call(self.provider.mock_nodes[2].internal_ip, + "p2.8x-run-options") + + runner.assert_has_call(self.provider.mock_nodes[3].internal_ip, + "default-image:nightly") + runner.assert_has_call(self.provider.mock_nodes[3].internal_ip, + 
"standard-run-options") + runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip, + "p2.8x-run-options") + runner.assert_not_has_call(self.provider.mock_nodes[3].internal_ip, + "p2x_image:nightly") + def testUpdateConfig(self): config = MULTI_WORKER_CLUSTER.copy() config_path = self.write_config(config)