[autoscaler] Fix custom node resources on head (#11896)

This commit is contained in:
Ian Rodney
2020-11-12 10:30:04 -08:00
committed by GitHub
parent ad639f12d8
commit 9254de0b02
5 changed files with 58 additions and 16 deletions
+2 -1
View File
@@ -450,7 +450,8 @@ class StandardAutoscaler:
process_runner=self.process_runner,
use_internal_ip=True,
is_head_node=False,
docker_config=self.config.get("docker"))
docker_config=self.config.get("docker"),
node_resources=self._node_resources(node_id))
updater.start()
self.updaters[node_id] = updater
+21 -11
View File
@@ -473,7 +473,7 @@ def warn_about_bad_start_command(start_commands: List[str]) -> None:
def get_or_create_head_node(config: Dict[str, Any],
config_file: str,
printable_config_file: str,
no_restart: bool,
restart_only: bool,
yes: bool,
@@ -487,7 +487,6 @@ def get_or_create_head_node(config: Dict[str, Any],
config["cluster_name"]))
config = copy.deepcopy(config)
config_file = os.path.abspath(config_file)
head_node_tags = {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
}
@@ -531,13 +530,18 @@ def get_or_create_head_node(config: Dict[str, Any],
_abort=True)
cli_logger.newline()
# TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync)
head_node_config = copy.deepcopy(config["head_node"])
head_node_resources = None
if "head_node_type" in config:
head_node_tags[TAG_RAY_USER_NODE_TYPE] = config["head_node_type"]
head_node_config.update(config["available_node_types"][config[
"head_node_type"]]["node_config"])
head_node_type = config["head_node_type"]
head_node_tags[TAG_RAY_USER_NODE_TYPE] = head_node_type
head_config = config["available_node_types"][head_node_type]
head_node_config.update(head_config["node_config"])
# Not necessary to keep in sync with node_launcher.py
# Keep in sync with autoscaler.py _node_resources
head_node_resources = head_config.get("resources")
launch_hash = hash_launch_conf(head_node_config, config["auth"])
if head_node is None or provider.node_tags(head_node).get(
@@ -659,6 +663,7 @@ def get_or_create_head_node(config: Dict[str, Any],
runtime_hash=runtime_hash,
file_mounts_contents_hash=file_mounts_contents_hash,
is_head_node=True,
node_resources=head_node_resources,
rsync_options={
"rsync_exclude": config.get("rsync_exclude"),
"rsync_filter": config.get("rsync_filter")
@@ -688,13 +693,15 @@ def get_or_create_head_node(config: Dict[str, Any],
cli_logger.newline()
with cli_logger.group("Useful commands"):
printable_config_file = os.path.abspath(printable_config_file)
cli_logger.print("Monitor autoscaling with")
cli_logger.print(
cf.bold(" ray exec {}{} {}"), config_file, modifiers,
cf.bold(" ray exec {}{} {}"), printable_config_file, modifiers,
quote(monitor_str))
cli_logger.print("Connect to a terminal on the cluster head:")
cli_logger.print(cf.bold(" ray attach {}{}"), config_file, modifiers)
cli_logger.print(
cf.bold(" ray attach {}{}"), printable_config_file, modifiers)
remote_shell_str = updater.cmd_runner.remote_shell_command_str()
cli_logger.print("Get a remote shell to the cluster manually:")
@@ -1024,7 +1031,7 @@ def _get_worker_nodes(config: Dict[str, Any],
def _get_head_node(config: Dict[str, Any],
config_file: str,
printable_config_file: str,
override_cluster_name: Optional[str],
create_if_needed: bool = False) -> str:
provider = _get_node_provider(config["provider"], config["cluster_name"])
@@ -1039,13 +1046,16 @@ def _get_head_node(config: Dict[str, Any],
elif create_if_needed:
get_or_create_head_node(
config,
config_file,
printable_config_file=printable_config_file,
restart_only=False,
no_restart=False,
yes=True,
override_cluster_name=override_cluster_name)
return _get_head_node(
config, config_file, override_cluster_name, create_if_needed=False)
config,
printable_config_file,
override_cluster_name,
create_if_needed=False)
else:
raise RuntimeError("Head node of cluster ({}) not found!".format(
config["cluster_name"]))
+1 -1
View File
@@ -393,7 +393,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]"])
commands.get_or_create_head_node(
SMALL_CLUSTER,
config_path,
printable_config_file=config_path,
no_restart=False,
restart_only=False,
yes=True,
@@ -72,9 +72,9 @@
.+\.py.*Full command is `ssh.+`
.+\.py.*NodeUpdater: i-.+: Setup commands succeeded \[LogTimer=.+\]
.+\.py.*\[6/6\] Starting the Ray runtime
.+\.py.*Running `ray stop`
.+\.py.*Running `export RAY_OVERRIDE_RESOURCES='{"CPU":1}';ray stop`
.+\.py.*Full command is `ssh.+`
.+\.py.*Running `ray start --head --autoscaling-config=~/ray_bootstrap_config\.yaml`
.+\.py.*Running `export RAY_OVERRIDE_RESOURCES='{"CPU":1}';ray start --head --autoscaling-config=~/ray_bootstrap_config\.yaml`
.+\.py.*Full command is `ssh.+`
.+\.py.*NodeUpdater: i-.+: Ray start commands succeeded \[LogTimer=.+\]
.+\.py.*NodeUpdater: i-.+: Applied config .+ \[LogTimer=.+\]
@@ -1005,7 +1005,7 @@ class AutoscalingTest(unittest.TestCase):
runner.respond_to_call("json .Config.Env", ["[]"])
get_or_create_head_node(
MULTI_WORKER_CLUSTER,
config_path,
printable_config_file=config_path,
no_restart=False,
restart_only=False,
yes=True,
@@ -1025,6 +1025,37 @@ class AutoscalingTest(unittest.TestCase):
self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE),
"empty_node")
def testGetOrCreateMultiNodeTypeCustomHeadResources(self):
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"]["empty_node"]["resources"] = {
"empty_resource_name": 1000
}
config_path = self.write_config(config)
self.provider = MockProvider()
runner = MockProcessRunner()
get_or_create_head_node(
config,
printable_config_file=config_path,
no_restart=False,
restart_only=False,
yes=True,
override_cluster_name=None,
_provider=self.provider,
_runner=runner)
self.waitForNodes(1)
runner.assert_has_call("1.2.3.4", "init_cmd")
runner.assert_has_call("1.2.3.4", "setup_cmd")
runner.assert_has_call("1.2.3.4", "start_ray_head")
runner.assert_has_call("1.2.3.4", "empty_resource_name")
self.assertEqual(self.provider.mock_nodes[0].node_type, "empty_node")
self.assertEqual(
self.provider.mock_nodes[0].node_config.get("FooProperty"), 42)
self.assertEqual(
self.provider.mock_nodes[0].node_config.get("TestProp"), 1)
self.assertEqual(
self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE),
"empty_node")
def testScaleUpMinSanity(self):
config = copy.deepcopy(MULTI_WORKER_CLUSTER)
config["available_node_types"]["m4.large"]["min_workers"] = \