mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 17:49:47 +08:00
[autoscaler] run setup commands with restart_only=True (#13836)
This commit is contained in:
@@ -646,7 +646,12 @@ def get_or_create_head_node(config: Dict[str, Any],
|
||||
cli_logger.print("Prepared bootstrap config")
|
||||
|
||||
if restart_only:
|
||||
setup_commands = []
|
||||
# Docker may re-launch nodes, requiring setup
|
||||
# commands to be rerun.
|
||||
if config.get("docker", {}).get("container_name"):
|
||||
setup_commands = config["head_setup_commands"]
|
||||
else:
|
||||
setup_commands = []
|
||||
ray_start_commands = config["head_start_ray_commands"]
|
||||
elif no_restart:
|
||||
setup_commands = config["head_setup_commands"]
|
||||
@@ -678,7 +683,8 @@ def get_or_create_head_node(config: Dict[str, Any],
|
||||
"rsync_exclude": config.get("rsync_exclude"),
|
||||
"rsync_filter": config.get("rsync_filter")
|
||||
},
|
||||
docker_config=config.get("docker"))
|
||||
docker_config=config.get("docker"),
|
||||
restart_only=restart_only)
|
||||
updater.start()
|
||||
updater.join()
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ class NodeUpdater:
|
||||
use_internal_ip: Wwhether the node_id belongs to an internal ip
|
||||
or external ip.
|
||||
docker_config: Docker section of autoscaler yaml
|
||||
restart_only: Whether to skip setup commands & just restart ray
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
@@ -68,7 +69,8 @@ class NodeUpdater:
|
||||
rsync_options=None,
|
||||
process_runner=subprocess,
|
||||
use_internal_ip=False,
|
||||
docker_config=None):
|
||||
docker_config=None,
|
||||
restart_only=False):
|
||||
|
||||
self.log_prefix = "NodeUpdater: {}: ".format(node_id)
|
||||
use_internal_ip = (use_internal_ip
|
||||
@@ -106,6 +108,7 @@ class NodeUpdater:
|
||||
self.auth_config = auth_config
|
||||
self.is_head_node = is_head_node
|
||||
self.docker_config = docker_config
|
||||
self.restart_only = restart_only
|
||||
|
||||
def run(self):
|
||||
if cmd_output_util.does_allow_interactive(
|
||||
@@ -298,6 +301,11 @@ class NodeUpdater:
|
||||
sync_run_yet=False)
|
||||
if init_required:
|
||||
node_tags[TAG_RAY_RUNTIME_CONFIG] += "-invalidate"
|
||||
# This ensures that `setup_commands` are not removed
|
||||
self.restart_only = False
|
||||
|
||||
if self.restart_only:
|
||||
self.setup_commands = []
|
||||
|
||||
# runtime_hash will only change whenever the user restarts
|
||||
# or updates their cluster with `get_or_create_head_node`
|
||||
|
||||
@@ -500,7 +500,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
_provider=self.provider,
|
||||
_runner=runner)
|
||||
self.waitForNodes(1)
|
||||
# Init & Setup commands msut be run for Docker!
|
||||
# Init & Setup commands must be run for Docker!
|
||||
runner.assert_has_call("1.2.3.4", "init_cmd")
|
||||
runner.assert_has_call("1.2.3.4", "head_setup_cmd")
|
||||
runner.assert_has_call("1.2.3.4", "start_ray_head")
|
||||
@@ -543,6 +543,34 @@ class AutoscalingTest(unittest.TestCase):
|
||||
assert first_mkdir < first_rsync
|
||||
assert first_rsync < first_cp
|
||||
|
||||
def testGetOrCreateHeadNodeFromStoppedRestartOnly(self):
|
||||
self.testGetOrCreateHeadNode()
|
||||
self.provider.cache_stopped = True
|
||||
existing_nodes = self.provider.non_terminated_nodes({})
|
||||
assert len(existing_nodes) == 1
|
||||
self.provider.terminate_node(existing_nodes[0])
|
||||
config_path = self.write_config(SMALL_CLUSTER)
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Mounts", ["[]"])
|
||||
# Two initial calls to docker cp, + 2 more calls during run_init
|
||||
runner.respond_to_call(".State.Running",
|
||||
["false", "false", "false", "false"])
|
||||
runner.respond_to_call("json .Config.Env", ["[]"])
|
||||
commands.get_or_create_head_node(
|
||||
SMALL_CLUSTER,
|
||||
printable_config_file=config_path,
|
||||
no_restart=False,
|
||||
restart_only=True,
|
||||
yes=True,
|
||||
override_cluster_name=None,
|
||||
_provider=self.provider,
|
||||
_runner=runner)
|
||||
self.waitForNodes(1)
|
||||
# Init & Setup commands must be run for Docker!
|
||||
runner.assert_has_call("1.2.3.4", "init_cmd")
|
||||
runner.assert_has_call("1.2.3.4", "head_setup_cmd")
|
||||
runner.assert_has_call("1.2.3.4", "start_ray_head")
|
||||
|
||||
@unittest.skipIf(sys.platform == "win32", "Failing on Windows.")
|
||||
def testDockerFileMountsAdded(self):
|
||||
config = copy.deepcopy(SMALL_CLUSTER)
|
||||
|
||||
Reference in New Issue
Block a user