mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 14:48:54 +08:00
[autoscaler] Autoscaler hangs forever on non-zero exit code command (#3969)
This commit is contained in:
committed by
Robert Nishihara
parent
49e9bec988
commit
04fc145a44
+202
-189
@@ -82,35 +82,37 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
|
||||
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
|
||||
def remaining_nodes():
|
||||
if workers_only:
|
||||
A = []
|
||||
else:
|
||||
A = [
|
||||
try:
|
||||
|
||||
def remaining_nodes():
|
||||
if workers_only:
|
||||
A = []
|
||||
else:
|
||||
A = [
|
||||
node_id for node_id in provider.nodes({
|
||||
TAG_RAY_NODE_TYPE: "head"
|
||||
})
|
||||
]
|
||||
|
||||
A += [
|
||||
node_id for node_id in provider.nodes({
|
||||
TAG_RAY_NODE_TYPE: "head"
|
||||
TAG_RAY_NODE_TYPE: "worker"
|
||||
})
|
||||
]
|
||||
return A
|
||||
|
||||
A += [
|
||||
node_id for node_id in provider.nodes({
|
||||
TAG_RAY_NODE_TYPE: "worker"
|
||||
})
|
||||
]
|
||||
return A
|
||||
|
||||
# Loop here to check that both the head and worker nodes are actually
|
||||
# really gone
|
||||
A = remaining_nodes()
|
||||
with LogTimer("teardown_cluster: Termination done."):
|
||||
while A:
|
||||
logger.info("teardown_cluster: "
|
||||
"Terminating {} nodes...".format(len(A)))
|
||||
provider.terminate_nodes(A)
|
||||
time.sleep(1)
|
||||
A = remaining_nodes()
|
||||
|
||||
provider.cleanup()
|
||||
# Loop here to check that both the head and worker nodes are actually
|
||||
# really gone
|
||||
A = remaining_nodes()
|
||||
with LogTimer("teardown_cluster: Termination done."):
|
||||
while A:
|
||||
logger.info("teardown_cluster: "
|
||||
"Terminating {} nodes...".format(len(A)))
|
||||
provider.terminate_nodes(A)
|
||||
time.sleep(1)
|
||||
A = remaining_nodes()
|
||||
finally:
|
||||
provider.cleanup()
|
||||
|
||||
|
||||
def kill_node(config_file, yes, override_cluster_name):
|
||||
@@ -147,121 +149,125 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
|
||||
override_cluster_name):
|
||||
"""Create the cluster head node, which in turn creates the workers."""
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
head_node_tags = {
|
||||
TAG_RAY_NODE_TYPE: "head",
|
||||
}
|
||||
nodes = provider.nodes(head_node_tags)
|
||||
if len(nodes) > 0:
|
||||
try:
|
||||
head_node_tags = {
|
||||
TAG_RAY_NODE_TYPE: "head",
|
||||
}
|
||||
nodes = provider.nodes(head_node_tags)
|
||||
if len(nodes) > 0:
|
||||
head_node = nodes[0]
|
||||
else:
|
||||
head_node = None
|
||||
|
||||
if not head_node:
|
||||
confirm("This will create a new cluster", yes)
|
||||
elif not no_restart:
|
||||
confirm("This will restart cluster services", yes)
|
||||
|
||||
launch_hash = hash_launch_conf(config["head_node"], config["auth"])
|
||||
if head_node is None or provider.node_tags(head_node).get(
|
||||
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
|
||||
if head_node is not None:
|
||||
confirm("Head node config out-of-date. It will be terminated",
|
||||
yes)
|
||||
logger.info(
|
||||
"get_or_create_head_node: "
|
||||
"Terminating outdated head node {}".format(head_node))
|
||||
provider.terminate_node(head_node)
|
||||
logger.info("get_or_create_head_node: Launching new head node...")
|
||||
head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
|
||||
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
|
||||
config["cluster_name"])
|
||||
provider.create_node(config["head_node"], head_node_tags, 1)
|
||||
|
||||
nodes = provider.nodes(head_node_tags)
|
||||
assert len(nodes) == 1, "Failed to create head node."
|
||||
head_node = nodes[0]
|
||||
else:
|
||||
head_node = None
|
||||
|
||||
if not head_node:
|
||||
confirm("This will create a new cluster", yes)
|
||||
elif not no_restart:
|
||||
confirm("This will restart cluster services", yes)
|
||||
# TODO(ekl) right now we always update the head node even if the hash
|
||||
# matches. We could prompt the user for what they want to do here.
|
||||
runtime_hash = hash_runtime_conf(config["file_mounts"], config)
|
||||
logger.info("get_or_create_head_node: Updating files on head node...")
|
||||
|
||||
launch_hash = hash_launch_conf(config["head_node"], config["auth"])
|
||||
if head_node is None or provider.node_tags(head_node).get(
|
||||
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
|
||||
if head_node is not None:
|
||||
confirm("Head node config out-of-date. It will be terminated", yes)
|
||||
logger.info("get_or_create_head_node: "
|
||||
"Terminating outdated head node {}".format(head_node))
|
||||
provider.terminate_node(head_node)
|
||||
logger.info("get_or_create_head_node: Launching new head node...")
|
||||
head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
|
||||
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
|
||||
config["cluster_name"])
|
||||
provider.create_node(config["head_node"], head_node_tags, 1)
|
||||
# Rewrite the auth config so that the head node can update the workers
|
||||
remote_key_path = "~/ray_bootstrap_key.pem"
|
||||
remote_config = copy.deepcopy(config)
|
||||
remote_config["auth"]["ssh_private_key"] = remote_key_path
|
||||
|
||||
nodes = provider.nodes(head_node_tags)
|
||||
assert len(nodes) == 1, "Failed to create head node."
|
||||
head_node = nodes[0]
|
||||
# Adjust for new file locations
|
||||
new_mounts = {}
|
||||
for remote_path in config["file_mounts"]:
|
||||
new_mounts[remote_path] = remote_path
|
||||
remote_config["file_mounts"] = new_mounts
|
||||
remote_config["no_restart"] = no_restart
|
||||
|
||||
# TODO(ekl) right now we always update the head node even if the hash
|
||||
# matches. We could prompt the user for what they want to do in this case.
|
||||
runtime_hash = hash_runtime_conf(config["file_mounts"], config)
|
||||
logger.info("get_or_create_head_node: Updating files on head node...")
|
||||
# Now inject the rewritten config and SSH key into the head node
|
||||
remote_config_file = tempfile.NamedTemporaryFile(
|
||||
"w", prefix="ray-bootstrap-")
|
||||
remote_config_file.write(json.dumps(remote_config))
|
||||
remote_config_file.flush()
|
||||
config["file_mounts"].update({
|
||||
remote_key_path: config["auth"]["ssh_private_key"],
|
||||
"~/ray_bootstrap_config.yaml": remote_config_file.name
|
||||
})
|
||||
|
||||
# Rewrite the auth config so that the head node can update the workers
|
||||
remote_key_path = "~/ray_bootstrap_key.pem"
|
||||
remote_config = copy.deepcopy(config)
|
||||
remote_config["auth"]["ssh_private_key"] = remote_key_path
|
||||
if restart_only:
|
||||
init_commands = config["head_start_ray_commands"]
|
||||
elif no_restart:
|
||||
init_commands = (
|
||||
config["setup_commands"] + config["head_setup_commands"])
|
||||
else:
|
||||
init_commands = (
|
||||
config["setup_commands"] + config["head_setup_commands"] +
|
||||
config["head_start_ray_commands"])
|
||||
|
||||
# Adjust for new file locations
|
||||
new_mounts = {}
|
||||
for remote_path in config["file_mounts"]:
|
||||
new_mounts[remote_path] = remote_path
|
||||
remote_config["file_mounts"] = new_mounts
|
||||
remote_config["no_restart"] = no_restart
|
||||
updater = NodeUpdaterThread(
|
||||
head_node,
|
||||
config["provider"],
|
||||
provider,
|
||||
config["auth"],
|
||||
config["cluster_name"],
|
||||
config["file_mounts"],
|
||||
init_commands,
|
||||
runtime_hash,
|
||||
)
|
||||
updater.start()
|
||||
updater.join()
|
||||
|
||||
# Now inject the rewritten config and SSH key into the head node
|
||||
remote_config_file = tempfile.NamedTemporaryFile(
|
||||
"w", prefix="ray-bootstrap-")
|
||||
remote_config_file.write(json.dumps(remote_config))
|
||||
remote_config_file.flush()
|
||||
config["file_mounts"].update({
|
||||
remote_key_path: config["auth"]["ssh_private_key"],
|
||||
"~/ray_bootstrap_config.yaml": remote_config_file.name
|
||||
})
|
||||
# Refresh the node cache so we see the external ip if available
|
||||
provider.nodes(head_node_tags)
|
||||
|
||||
if restart_only:
|
||||
init_commands = config["head_start_ray_commands"]
|
||||
elif no_restart:
|
||||
init_commands = (
|
||||
config["setup_commands"] + config["head_setup_commands"])
|
||||
else:
|
||||
init_commands = (
|
||||
config["setup_commands"] + config["head_setup_commands"] +
|
||||
config["head_start_ray_commands"])
|
||||
if updater.exitcode != 0:
|
||||
logger.error("get_or_create_head_node: "
|
||||
"Updating {} failed".format(
|
||||
provider.external_ip(head_node)))
|
||||
sys.exit(1)
|
||||
logger.info("get_or_create_head_node: "
|
||||
"Head node up-to-date, IP address is: {}".format(
|
||||
provider.external_ip(head_node)))
|
||||
|
||||
updater = NodeUpdaterThread(
|
||||
head_node,
|
||||
config["provider"],
|
||||
provider,
|
||||
config["auth"],
|
||||
config["cluster_name"],
|
||||
config["file_mounts"],
|
||||
init_commands,
|
||||
runtime_hash,
|
||||
)
|
||||
updater.start()
|
||||
updater.join()
|
||||
|
||||
# Refresh the node cache so we see the external ip if available
|
||||
provider.nodes(head_node_tags)
|
||||
|
||||
if updater.exitcode != 0:
|
||||
logger.error("get_or_create_head_node: "
|
||||
"Updating {} failed".format(
|
||||
provider.external_ip(head_node)))
|
||||
sys.exit(1)
|
||||
logger.info("get_or_create_head_node: "
|
||||
"Head node up-to-date, IP address is: {}".format(
|
||||
provider.external_ip(head_node)))
|
||||
|
||||
monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
|
||||
for s in init_commands:
|
||||
if ("ray start" in s and "docker exec" in s
|
||||
and "--autoscaling-config" in s):
|
||||
monitor_str = "docker exec {} /bin/sh -c {}".format(
|
||||
config["docker"]["container_name"], quote(monitor_str))
|
||||
if override_cluster_name:
|
||||
modifiers = " --cluster-name={}".format(quote(override_cluster_name))
|
||||
else:
|
||||
modifiers = ""
|
||||
print("To monitor auto-scaling activity, you can run:\n\n"
|
||||
" ray exec {} {}{}\n".format(config_file, quote(monitor_str),
|
||||
modifiers))
|
||||
print("To open a console on the cluster:\n\n"
|
||||
" ray attach {}{}\n".format(config_file, modifiers))
|
||||
print("To ssh manually to the cluster, run:\n\n"
|
||||
" ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
|
||||
config["auth"]["ssh_user"],
|
||||
provider.external_ip(head_node)))
|
||||
|
||||
provider.cleanup()
|
||||
monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
|
||||
for s in init_commands:
|
||||
if ("ray start" in s and "docker exec" in s
|
||||
and "--autoscaling-config" in s):
|
||||
monitor_str = "docker exec {} /bin/sh -c {}".format(
|
||||
config["docker"]["container_name"], quote(monitor_str))
|
||||
if override_cluster_name:
|
||||
modifiers = " --cluster-name={}".format(
|
||||
quote(override_cluster_name))
|
||||
else:
|
||||
modifiers = ""
|
||||
print("To monitor auto-scaling activity, you can run:\n\n"
|
||||
" ray exec {} {}{}\n".format(config_file, quote(monitor_str),
|
||||
modifiers))
|
||||
print("To open a console on the cluster:\n\n"
|
||||
" ray attach {}{}\n".format(config_file, modifiers))
|
||||
print("To ssh manually to the cluster, run:\n\n"
|
||||
" ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
|
||||
config["auth"]["ssh_user"],
|
||||
provider.external_ip(head_node)))
|
||||
finally:
|
||||
provider.cleanup()
|
||||
|
||||
|
||||
def attach_cluster(config_file, start, use_tmux, override_cluster_name, new):
|
||||
@@ -314,43 +320,45 @@ def exec_cluster(config_file, cmd, screen, tmux, stop, start,
|
||||
config, config_file, override_cluster_name, create_if_needed=start)
|
||||
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
updater = NodeUpdaterThread(
|
||||
head_node,
|
||||
config["provider"],
|
||||
provider,
|
||||
config["auth"],
|
||||
config["cluster_name"],
|
||||
config["file_mounts"],
|
||||
[],
|
||||
"",
|
||||
)
|
||||
if stop:
|
||||
cmd += ("; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes "
|
||||
try:
|
||||
updater = NodeUpdaterThread(
|
||||
head_node,
|
||||
config["provider"],
|
||||
provider,
|
||||
config["auth"],
|
||||
config["cluster_name"],
|
||||
config["file_mounts"],
|
||||
[],
|
||||
"",
|
||||
)
|
||||
if stop:
|
||||
cmd += (
|
||||
"; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes "
|
||||
"--workers-only; sudo shutdown -h now")
|
||||
_exec(
|
||||
updater,
|
||||
cmd,
|
||||
screen,
|
||||
tmux,
|
||||
expect_error=stop,
|
||||
port_forward=port_forward)
|
||||
_exec(
|
||||
updater,
|
||||
cmd,
|
||||
screen,
|
||||
tmux,
|
||||
expect_error=stop,
|
||||
port_forward=port_forward)
|
||||
|
||||
if tmux or screen:
|
||||
attach_command_parts = ["ray attach", config_file]
|
||||
if override_cluster_name is not None:
|
||||
attach_command_parts.append(
|
||||
"--cluster-name={}".format(override_cluster_name))
|
||||
if tmux:
|
||||
attach_command_parts.append("--tmux")
|
||||
elif screen:
|
||||
attach_command_parts.append("--screen")
|
||||
if tmux or screen:
|
||||
attach_command_parts = ["ray attach", config_file]
|
||||
if override_cluster_name is not None:
|
||||
attach_command_parts.append(
|
||||
"--cluster-name={}".format(override_cluster_name))
|
||||
if tmux:
|
||||
attach_command_parts.append("--tmux")
|
||||
elif screen:
|
||||
attach_command_parts.append("--screen")
|
||||
|
||||
attach_command = " ".join(attach_command_parts)
|
||||
attach_info = "Use `{}` to check on command status.".format(
|
||||
attach_command)
|
||||
logger.info(attach_info)
|
||||
|
||||
provider.cleanup()
|
||||
attach_command = " ".join(attach_command_parts)
|
||||
attach_info = "Use `{}` to check on command status.".format(
|
||||
attach_command)
|
||||
logger.info(attach_info)
|
||||
finally:
|
||||
provider.cleanup()
|
||||
|
||||
|
||||
def _exec(updater, cmd, screen, tmux, expect_error=False, port_forward=None):
|
||||
@@ -395,23 +403,24 @@ def rsync(config_file, source, target, override_cluster_name, down):
|
||||
config, config_file, override_cluster_name, create_if_needed=False)
|
||||
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
updater = NodeUpdaterThread(
|
||||
head_node,
|
||||
config["provider"],
|
||||
provider,
|
||||
config["auth"],
|
||||
config["cluster_name"],
|
||||
config["file_mounts"],
|
||||
[],
|
||||
"",
|
||||
)
|
||||
if down:
|
||||
rsync = updater.rsync_down
|
||||
else:
|
||||
rsync = updater.rsync_up
|
||||
rsync(source, target, check_error=False)
|
||||
|
||||
provider.cleanup()
|
||||
try:
|
||||
updater = NodeUpdaterThread(
|
||||
head_node,
|
||||
config["provider"],
|
||||
provider,
|
||||
config["auth"],
|
||||
config["cluster_name"],
|
||||
config["file_mounts"],
|
||||
[],
|
||||
"",
|
||||
)
|
||||
if down:
|
||||
rsync = updater.rsync_down
|
||||
else:
|
||||
rsync = updater.rsync_up
|
||||
rsync(source, target, check_error=False)
|
||||
finally:
|
||||
provider.cleanup()
|
||||
|
||||
|
||||
def get_head_node_ip(config_file, override_cluster_name):
|
||||
@@ -422,9 +431,11 @@ def get_head_node_ip(config_file, override_cluster_name):
|
||||
config["cluster_name"] = override_cluster_name
|
||||
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
head_node = _get_head_node(config, config_file, override_cluster_name)
|
||||
ip = provider.external_ip(head_node)
|
||||
provider.cleanup()
|
||||
try:
|
||||
head_node = _get_head_node(config, config_file, override_cluster_name)
|
||||
ip = provider.external_ip(head_node)
|
||||
finally:
|
||||
provider.cleanup()
|
||||
|
||||
return ip
|
||||
|
||||
@@ -445,11 +456,13 @@ def _get_head_node(config,
|
||||
override_cluster_name,
|
||||
create_if_needed=False):
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
head_node_tags = {
|
||||
TAG_RAY_NODE_TYPE: "head",
|
||||
}
|
||||
nodes = provider.nodes(head_node_tags)
|
||||
provider.cleanup()
|
||||
try:
|
||||
head_node_tags = {
|
||||
TAG_RAY_NODE_TYPE: "head",
|
||||
}
|
||||
nodes = provider.nodes(head_node_tags)
|
||||
finally:
|
||||
provider.cleanup()
|
||||
|
||||
if len(nodes) > 0:
|
||||
head_node = nodes[0]
|
||||
|
||||
@@ -11,28 +11,38 @@ import subprocess
|
||||
|
||||
import ray
|
||||
|
||||
if __name__ == "__main__":
|
||||
rllib_home = os.path.abspath(os.path.join(ray.__file__, "../rllib"))
|
||||
local_home = os.path.abspath(os.path.dirname(__file__))
|
||||
assert os.path.isdir(rllib_home), rllib_home
|
||||
|
||||
def do_link(package):
|
||||
package_home = os.path.abspath(
|
||||
os.path.join(ray.__file__, "../{}".format(package)))
|
||||
local_home = os.path.abspath(
|
||||
os.path.join(__file__, "../../{}".format(package)))
|
||||
assert os.path.isdir(package_home), package_home
|
||||
assert os.path.isdir(local_home), local_home
|
||||
click.confirm(
|
||||
"This will replace:\n {}\nwith a symlink to:\n {}".format(
|
||||
rllib_home, local_home),
|
||||
abort=True)
|
||||
if os.access(os.path.dirname(rllib_home), os.W_OK):
|
||||
subprocess.check_call(["rm", "-rf", rllib_home])
|
||||
subprocess.check_call(["ln", "-s", local_home, rllib_home])
|
||||
if not click.confirm(
|
||||
"This will replace:\n {}\nwith a symlink to:\n {}".format(
|
||||
package_home, local_home),
|
||||
default=True):
|
||||
return
|
||||
if os.access(os.path.dirname(package_home), os.W_OK):
|
||||
subprocess.check_call(["rm", "-rf", package_home])
|
||||
subprocess.check_call(["ln", "-s", local_home, package_home])
|
||||
else:
|
||||
print("You don't have write permission to {}, using sudo:".format(
|
||||
rllib_home))
|
||||
subprocess.check_call(["sudo", "rm", "-rf", rllib_home])
|
||||
subprocess.check_call(["sudo", "ln", "-s", local_home, rllib_home])
|
||||
package_home))
|
||||
subprocess.check_call(["sudo", "rm", "-rf", package_home])
|
||||
subprocess.check_call(["sudo", "ln", "-s", local_home, package_home])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
do_link("rllib")
|
||||
do_link("tune")
|
||||
do_link("autoscaler")
|
||||
print("Created links.\n\nIf you run into issues initializing Ray, please "
|
||||
"ensure that your local repo and the installed Ray is in sync "
|
||||
"ensure that your local repo and the installed Ray are in sync "
|
||||
"(pip install -U the latest wheels at "
|
||||
"https://ray.readthedocs.io/en/latest/installation.html, "
|
||||
"and ensure you are up-to-date on the master branch on git).\n\n"
|
||||
"Note that you may need to delete the rllib symlink when pip "
|
||||
"Note that you may need to delete the package symlinks when pip "
|
||||
"installing new Ray versions to prevent pip from overwriting files "
|
||||
"in your git repo.")
|
||||
|
||||
Reference in New Issue
Block a user