[autoscaler] Removed .cleanup() from NodeProvider and commands.py (#11543)

This commit is contained in:
Gekho457
2020-10-22 17:46:49 -04:00
committed by GitHub
parent 47531ac7e6
commit 2d1f52c21c
2 changed files with 390 additions and 434 deletions
+390 -430
View File
@@ -323,87 +323,82 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
logger, "Ignoring error attempting a clean shutdown.")
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
def remaining_nodes():
workers = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})
def remaining_nodes():
workers = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})
if keep_min_workers:
min_workers = config.get("min_workers", 0)
if keep_min_workers:
min_workers = config.get("min_workers", 0)
cli_logger.print(
"{} random worker nodes will not be shut down. " +
cf.dimmed("(due to {})"), cf.bold(min_workers),
cf.bold("--keep-min-workers"))
cli_logger.old_info(logger,
"teardown_cluster: Keeping {} nodes...",
min_workers)
cli_logger.print(
"{} random worker nodes will not be shut down. " +
cf.dimmed("(due to {})"), cf.bold(min_workers),
cf.bold("--keep-min-workers"))
cli_logger.old_info(
logger, "teardown_cluster: Keeping {} nodes...", min_workers)
workers = random.sample(workers, len(workers) - min_workers)
workers = random.sample(workers, len(workers) - min_workers)
# todo: it's weird to kill the head node but not all workers
if workers_only:
cli_logger.print(
"The head node will not be shut down. " +
cf.dimmed("(due to {})"), cf.bold("--workers-only"))
# todo: it's weird to kill the head node but not all workers
if workers_only:
cli_logger.print(
"The head node will not be shut down. " +
cf.dimmed("(due to {})"), cf.bold("--workers-only"))
return workers
return workers
head = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
})
head = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
})
return head + workers
return head + workers
def run_docker_stop(node, container_name):
try:
exec_cluster(
config_file,
cmd=f"docker stop {container_name}",
run_env="host",
screen=False,
tmux=False,
stop=False,
start=False,
override_cluster_name=override_cluster_name,
port_forward=None,
with_output=False)
except Exception:
cli_logger.warning(f"Docker stop failed on {node}")
cli_logger.old_warning(logger, f"Docker stop failed on {node}")
def run_docker_stop(node, container_name):
try:
exec_cluster(
config_file,
cmd=f"docker stop {container_name}",
run_env="host",
screen=False,
tmux=False,
stop=False,
start=False,
override_cluster_name=override_cluster_name,
port_forward=None,
with_output=False)
except Exception:
cli_logger.warning(f"Docker stop failed on {node}")
cli_logger.old_warning(logger, f"Docker stop failed on {node}")
# Loop here to check that both the head and worker nodes are actually
# really gone
A = remaining_nodes()
# Loop here to check that both the head and worker nodes are actually
# really gone
A = remaining_nodes()
container_name = config.get("docker", {}).get("container_name")
if container_name:
for node in A:
run_docker_stop(node, container_name)
container_name = config.get("docker", {}).get("container_name")
if container_name:
for node in A:
run_docker_stop(node, container_name)
with LogTimer("teardown_cluster: done."):
while A:
cli_logger.old_info(
logger, "teardown_cluster: "
"Shutting down {} nodes...", len(A))
with LogTimer("teardown_cluster: done."):
while A:
cli_logger.old_info(
logger, "teardown_cluster: "
"Shutting down {} nodes...", len(A))
provider.terminate_nodes(A)
provider.terminate_nodes(A)
cli_logger.print(
"Requested {} nodes to shut down.",
cf.bold(len(A)),
_tags=dict(interval="1s"))
cli_logger.print(
"Requested {} nodes to shut down.",
cf.bold(len(A)),
_tags=dict(interval="1s"))
time.sleep(
POLL_INTERVAL) # todo: interval should be a variable
A = remaining_nodes()
cli_logger.print("{} nodes remaining after {} second(s).",
cf.bold(len(A)), POLL_INTERVAL)
cli_logger.success("No nodes remaining.")
finally:
provider.cleanup()
time.sleep(POLL_INTERVAL) # todo: interval should be a variable
A = remaining_nodes()
cli_logger.print("{} nodes remaining after {} second(s).",
cf.bold(len(A)), POLL_INTERVAL)
cli_logger.success("No nodes remaining.")
def kill_node(config_file: str, yes: bool, hard: bool,
@@ -419,41 +414,38 @@ def kill_node(config_file: str, yes: bool, hard: bool,
cli_logger.old_confirm("This will kill a node in your cluster", yes)
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
nodes = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})
node = random.choice(nodes)
cli_logger.print("Shutdown " + cf.bold("{}"), node)
cli_logger.old_info(logger, "kill_node: Shutdown worker {}", node)
if hard:
provider.terminate_node(node)
else:
updater = NodeUpdaterThread(
node_id=node,
provider_config=config["provider"],
provider=provider,
auth_config=config["auth"],
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=[],
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
file_mounts_contents_hash="",
is_head_node=False,
docker_config=config.get("docker"))
nodes = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})
node = random.choice(nodes)
cli_logger.print("Shutdown " + cf.bold("{}"), node)
cli_logger.old_info(logger, "kill_node: Shutdown worker {}", node)
if hard:
provider.terminate_node(node)
else:
updater = NodeUpdaterThread(
node_id=node,
provider_config=config["provider"],
provider=provider,
auth_config=config["auth"],
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=[],
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
file_mounts_contents_hash="",
is_head_node=False,
docker_config=config.get("docker"))
_exec(updater, "ray stop", False, False)
_exec(updater, "ray stop", False, False)
time.sleep(POLL_INTERVAL)
time.sleep(POLL_INTERVAL)
if config.get("provider", {}).get("use_internal_ips", False) is True:
node_ip = provider.internal_ip(node)
else:
node_ip = provider.external_ip(node)
finally:
provider.cleanup()
if config.get("provider", {}).get("use_internal_ips", False) is True:
node_ip = provider.internal_ip(node)
else:
node_ip = provider.external_ip(node)
return node_ip
@@ -513,255 +505,245 @@ def get_or_create_head_node(config: Dict[str, Any],
config = copy.deepcopy(config)
config_file = os.path.abspath(config_file)
try:
head_node_tags = {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
}
nodes = provider.non_terminated_nodes(head_node_tags)
if len(nodes) > 0:
head_node = nodes[0]
else:
head_node = None
head_node_tags = {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
}
nodes = provider.non_terminated_nodes(head_node_tags)
if len(nodes) > 0:
head_node = nodes[0]
else:
head_node = None
if not head_node:
if not head_node:
cli_logger.confirm(
yes,
"No head node found. "
"Launching a new cluster.",
_abort=True)
cli_logger.old_confirm("This will create a new cluster", yes)
elif not no_restart:
cli_logger.old_confirm("This will restart cluster services", yes)
if head_node:
if restart_only:
cli_logger.confirm(
yes,
"No head node found. "
"Launching a new cluster.",
"Updating cluster configuration and "
"restarting the cluster Ray runtime. "
"Setup commands will not be run due to `{}`.\n",
cf.bold("--restart-only"),
_abort=True)
cli_logger.old_confirm("This will create a new cluster", yes)
elif not no_restart:
cli_logger.old_confirm("This will restart cluster services", yes)
elif no_restart:
cli_logger.print(
"Cluster Ray runtime will not be restarted due "
"to `{}`.", cf.bold("--no-restart"))
cli_logger.confirm(
yes,
"Updating cluster configuration and "
"running setup commands.",
_abort=True)
else:
cli_logger.print(
"Updating cluster configuration and running full setup.")
cli_logger.confirm(
yes,
cf.bold("Cluster Ray runtime will be restarted."),
_abort=True)
cli_logger.newline()
if head_node:
if restart_only:
cli_logger.confirm(
yes,
"Updating cluster configuration and "
"restarting the cluster Ray runtime. "
"Setup commands will not be run due to `{}`.\n",
cf.bold("--restart-only"),
_abort=True)
elif no_restart:
# TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync)
head_node_config = copy.deepcopy(config["head_node"])
if "head_node_type" in config:
head_node_tags[TAG_RAY_USER_NODE_TYPE] = config["head_node_type"]
head_node_config.update(config["available_node_types"][config[
"head_node_type"]]["node_config"])
launch_hash = hash_launch_conf(head_node_config, config["auth"])
if head_node is None or provider.node_tags(head_node).get(
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
with cli_logger.group("Acquiring an up-to-date head node"):
if head_node is not None:
cli_logger.print(
"Cluster Ray runtime will not be restarted due "
"to `{}`.", cf.bold("--no-restart"))
cli_logger.confirm(
yes,
"Updating cluster configuration and "
"running setup commands.",
_abort=True)
else:
"Currently running head node is out-of-date with "
"cluster configuration")
cli_logger.print(
"Updating cluster configuration and running full setup.")
cli_logger.confirm(
yes,
cf.bold("Cluster Ray runtime will be restarted."),
_abort=True)
cli_logger.newline()
# TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync)
head_node_config = copy.deepcopy(config["head_node"])
if "head_node_type" in config:
head_node_tags[TAG_RAY_USER_NODE_TYPE] = config["head_node_type"]
head_node_config.update(config["available_node_types"][config[
"head_node_type"]]["node_config"])
launch_hash = hash_launch_conf(head_node_config, config["auth"])
if head_node is None or provider.node_tags(head_node).get(
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
with cli_logger.group("Acquiring an up-to-date head node"):
if head_node is not None:
cli_logger.print(
"Currently running head node is out-of-date with "
"cluster configuration")
cli_logger.print(
"hash is {}, expected {}",
cf.bold(
provider.node_tags(head_node)
.get(TAG_RAY_LAUNCH_CONFIG)), cf.bold(launch_hash))
cli_logger.confirm(yes, "Relaunching it.", _abort=True)
cli_logger.old_confirm(
"Head node config out-of-date. It will be terminated",
yes)
cli_logger.old_info(
logger, "get_or_create_head_node: "
"Shutting down outdated head node {}", head_node)
provider.terminate_node(head_node)
cli_logger.print("Terminated head node {}", head_node)
"hash is {}, expected {}",
cf.bold(
provider.node_tags(head_node)
.get(TAG_RAY_LAUNCH_CONFIG)), cf.bold(launch_hash))
cli_logger.confirm(yes, "Relaunching it.", _abort=True)
cli_logger.old_confirm(
"Head node config out-of-date. It will be terminated", yes)
cli_logger.old_info(
logger,
"get_or_create_head_node: Launching new head node...")
head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
config["cluster_name"])
provider.create_node(head_node_config, head_node_tags, 1)
cli_logger.print("Launched a new head node")
start = time.time()
head_node = None
with cli_logger.group("Fetching the new head node"):
while True:
if time.time() - start > 50:
cli_logger.abort(
"Head node fetch timed out.") # todo: msg
raise RuntimeError("Failed to create head node.")
nodes = provider.non_terminated_nodes(head_node_tags)
if len(nodes) == 1:
head_node = nodes[0]
break
time.sleep(POLL_INTERVAL)
cli_logger.newline()
with cli_logger.group(
"Setting up head node",
_numbered=("<>", 1, 1),
# cf.bold(provider.node_tags(head_node)[TAG_RAY_NODE_NAME]),
_tags=dict()): # add id, ARN to tags?
# TODO(ekl) right now we always update the head node even if the
# hash matches.
# We could prompt the user for what they want to do here.
# No need to pass in cluster_sync_files because we use this
# hash to set up the head node
(runtime_hash, file_mounts_contents_hash) = hash_runtime_conf(
config["file_mounts"], None, config)
cli_logger.old_info(
logger,
"get_or_create_head_node: Updating files on head node...")
# Rewrite the auth config so that the head
# node can update the workers
remote_config = copy.deepcopy(config)
# drop proxy options if they exist, otherwise
# head node won't be able to connect to workers
remote_config["auth"].pop("ssh_proxy_command", None)
if "ssh_private_key" in config["auth"]:
remote_key_path = "~/ray_bootstrap_key.pem"
remote_config["auth"]["ssh_private_key"] = remote_key_path
# Adjust for new file locations
new_mounts = {}
for remote_path in config["file_mounts"]:
new_mounts[remote_path] = remote_path
remote_config["file_mounts"] = new_mounts
remote_config["no_restart"] = no_restart
remote_config = provider.prepare_for_head_node(remote_config)
# Now inject the rewritten config and SSH key into the head node
remote_config_file = tempfile.NamedTemporaryFile(
"w", prefix="ray-bootstrap-")
remote_config_file.write(json.dumps(remote_config))
remote_config_file.flush()
config["file_mounts"].update({
"~/ray_bootstrap_config.yaml": remote_config_file.name
})
if "ssh_private_key" in config["auth"]:
config["file_mounts"].update({
remote_key_path: config["auth"]["ssh_private_key"],
})
cli_logger.print("Prepared bootstrap config")
if restart_only:
setup_commands = []
ray_start_commands = config["head_start_ray_commands"]
elif no_restart:
setup_commands = config["head_setup_commands"]
ray_start_commands = []
else:
setup_commands = config["head_setup_commands"]
ray_start_commands = config["head_start_ray_commands"]
if not no_restart:
warn_about_bad_start_command(ray_start_commands)
updater = NodeUpdaterThread(
node_id=head_node,
provider_config=config["provider"],
provider=provider,
auth_config=config["auth"],
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=config["initialization_commands"],
setup_commands=setup_commands,
ray_start_commands=ray_start_commands,
process_runner=_runner,
runtime_hash=runtime_hash,
file_mounts_contents_hash=file_mounts_contents_hash,
is_head_node=True,
rsync_options={
"rsync_exclude": config.get("rsync_exclude"),
"rsync_filter": config.get("rsync_filter")
},
docker_config=config.get("docker"))
updater.start()
updater.join()
# Refresh the node cache so we see the external ip if available
provider.non_terminated_nodes(head_node_tags)
if config.get("provider", {}).get("use_internal_ips",
False) is True:
head_node_ip = provider.internal_ip(head_node)
else:
head_node_ip = provider.external_ip(head_node)
if updater.exitcode != 0:
# todo: this does not follow the mockup and is not good enough
cli_logger.abort("Failed to setup head node.")
cli_logger.old_error(
logger, "get_or_create_head_node: "
"Updating {} failed", head_node_ip)
sys.exit(1)
"Shutting down outdated head node {}", head_node)
provider.terminate_node(head_node)
cli_logger.print("Terminated head node {}", head_node)
cli_logger.old_info(
logger, "get_or_create_head_node: "
"Head node up-to-date, IP address is: {}", head_node_ip)
logger, "get_or_create_head_node: Launching new head node...")
monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*"
if override_cluster_name:
modifiers = " --cluster-name={}".format(
quote(override_cluster_name))
head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
config["cluster_name"])
provider.create_node(head_node_config, head_node_tags, 1)
cli_logger.print("Launched a new head node")
start = time.time()
head_node = None
with cli_logger.group("Fetching the new head node"):
while True:
if time.time() - start > 50:
cli_logger.abort(
"Head node fetch timed out.") # todo: msg
raise RuntimeError("Failed to create head node.")
nodes = provider.non_terminated_nodes(head_node_tags)
if len(nodes) == 1:
head_node = nodes[0]
break
time.sleep(POLL_INTERVAL)
cli_logger.newline()
with cli_logger.group(
"Setting up head node",
_numbered=("<>", 1, 1),
# cf.bold(provider.node_tags(head_node)[TAG_RAY_NODE_NAME]),
_tags=dict()): # add id, ARN to tags?
# TODO(ekl) right now we always update the head node even if the
# hash matches.
# We could prompt the user for what they want to do here.
# No need to pass in cluster_sync_files because we use this
# hash to set up the head node
(runtime_hash, file_mounts_contents_hash) = hash_runtime_conf(
config["file_mounts"], None, config)
cli_logger.old_info(
logger, "get_or_create_head_node: Updating files on head node...")
# Rewrite the auth config so that the head
# node can update the workers
remote_config = copy.deepcopy(config)
# drop proxy options if they exist, otherwise
# head node won't be able to connect to workers
remote_config["auth"].pop("ssh_proxy_command", None)
if "ssh_private_key" in config["auth"]:
remote_key_path = "~/ray_bootstrap_key.pem"
remote_config["auth"]["ssh_private_key"] = remote_key_path
# Adjust for new file locations
new_mounts = {}
for remote_path in config["file_mounts"]:
new_mounts[remote_path] = remote_path
remote_config["file_mounts"] = new_mounts
remote_config["no_restart"] = no_restart
remote_config = provider.prepare_for_head_node(remote_config)
# Now inject the rewritten config and SSH key into the head node
remote_config_file = tempfile.NamedTemporaryFile(
"w", prefix="ray-bootstrap-")
remote_config_file.write(json.dumps(remote_config))
remote_config_file.flush()
config["file_mounts"].update({
"~/ray_bootstrap_config.yaml": remote_config_file.name
})
if "ssh_private_key" in config["auth"]:
config["file_mounts"].update({
remote_key_path: config["auth"]["ssh_private_key"],
})
cli_logger.print("Prepared bootstrap config")
if restart_only:
setup_commands = []
ray_start_commands = config["head_start_ray_commands"]
elif no_restart:
setup_commands = config["head_setup_commands"]
ray_start_commands = []
else:
modifiers = ""
setup_commands = config["head_setup_commands"]
ray_start_commands = config["head_start_ray_commands"]
if cli_logger.old_style:
print("To monitor autoscaling activity, you can run:\n\n"
" ray exec {} {}{}\n".format(config_file,
quote(monitor_str), modifiers))
print("To open a console on the cluster:\n\n"
" ray attach {}{}\n".format(config_file, modifiers))
if not no_restart:
warn_about_bad_start_command(ray_start_commands)
print("To get a remote shell to the cluster manually, run:\n\n"
" {}\n".format(
updater.cmd_runner.remote_shell_command_str()))
updater = NodeUpdaterThread(
node_id=head_node,
provider_config=config["provider"],
provider=provider,
auth_config=config["auth"],
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=config["initialization_commands"],
setup_commands=setup_commands,
ray_start_commands=ray_start_commands,
process_runner=_runner,
runtime_hash=runtime_hash,
file_mounts_contents_hash=file_mounts_contents_hash,
is_head_node=True,
rsync_options={
"rsync_exclude": config.get("rsync_exclude"),
"rsync_filter": config.get("rsync_filter")
},
docker_config=config.get("docker"))
updater.start()
updater.join()
cli_logger.newline()
with cli_logger.group("Useful commands"):
cli_logger.print("Monitor autoscaling with")
cli_logger.print(
cf.bold(" ray exec {}{} {}"), config_file, modifiers,
quote(monitor_str))
# Refresh the node cache so we see the external ip if available
provider.non_terminated_nodes(head_node_tags)
cli_logger.print("Connect to a terminal on the cluster head:")
cli_logger.print(
cf.bold(" ray attach {}{}"), config_file, modifiers)
if config.get("provider", {}).get("use_internal_ips", False) is True:
head_node_ip = provider.internal_ip(head_node)
else:
head_node_ip = provider.external_ip(head_node)
remote_shell_str = updater.cmd_runner.remote_shell_command_str()
cli_logger.print("Get a remote shell to the cluster manually:")
cli_logger.print(" {}", remote_shell_str.strip())
finally:
provider.cleanup()
if updater.exitcode != 0:
# todo: this does not follow the mockup and is not good enough
cli_logger.abort("Failed to setup head node.")
cli_logger.old_error(
logger, "get_or_create_head_node: "
"Updating {} failed", head_node_ip)
sys.exit(1)
cli_logger.old_info(
logger, "get_or_create_head_node: "
"Head node up-to-date, IP address is: {}", head_node_ip)
monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*"
if override_cluster_name:
modifiers = " --cluster-name={}".format(quote(override_cluster_name))
else:
modifiers = ""
if cli_logger.old_style:
print("To monitor autoscaling activity, you can run:\n\n"
" ray exec {} {}{}\n".format(config_file, quote(monitor_str),
modifiers))
print("To open a console on the cluster:\n\n"
" ray attach {}{}\n".format(config_file, modifiers))
print("To get a remote shell to the cluster manually, run:\n\n"
" {}\n".format(updater.cmd_runner.remote_shell_command_str()))
cli_logger.newline()
with cli_logger.group("Useful commands"):
cli_logger.print("Monitor autoscaling with")
cli_logger.print(
cf.bold(" ray exec {}{} {}"), config_file, modifiers,
quote(monitor_str))
cli_logger.print("Connect to a terminal on the cluster head:")
cli_logger.print(cf.bold(" ray attach {}{}"), config_file, modifiers)
remote_shell_str = updater.cmd_runner.remote_shell_command_str()
cli_logger.print("Get a remote shell to the cluster manually:")
cli_logger.print(" {}", remote_shell_str.strip())
def attach_cluster(config_file: str,
@@ -857,62 +839,59 @@ def exec_cluster(config_file: str,
config, config_file, override_cluster_name, create_if_needed=start)
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
updater = NodeUpdaterThread(
node_id=head_node,
provider_config=config["provider"],
provider=provider,
auth_config=config["auth"],
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=[],
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
file_mounts_contents_hash="",
is_head_node=True,
rsync_options={
"rsync_exclude": config.get("rsync_exclude"),
"rsync_filter": config.get("rsync_filter")
},
docker_config=config.get("docker"))
shutdown_after_run = False
if cmd and stop:
cmd += "; ".join([
"ray stop",
"ray teardown ~/ray_bootstrap_config.yaml --yes --workers-only"
])
shutdown_after_run = True
updater = NodeUpdaterThread(
node_id=head_node,
provider_config=config["provider"],
provider=provider,
auth_config=config["auth"],
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=[],
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
file_mounts_contents_hash="",
is_head_node=True,
rsync_options={
"rsync_exclude": config.get("rsync_exclude"),
"rsync_filter": config.get("rsync_filter")
},
docker_config=config.get("docker"))
shutdown_after_run = False
if cmd and stop:
cmd += "; ".join([
"ray stop",
"ray teardown ~/ray_bootstrap_config.yaml --yes --workers-only"
])
shutdown_after_run = True
result = _exec(
updater,
cmd,
screen,
tmux,
port_forward=port_forward,
with_output=with_output,
run_env=run_env,
shutdown_after_run=shutdown_after_run)
if tmux or screen:
attach_command_parts = ["ray attach", config_file]
if override_cluster_name is not None:
attach_command_parts.append(
"--cluster-name={}".format(override_cluster_name))
if tmux:
attach_command_parts.append("--tmux")
elif screen:
attach_command_parts.append("--screen")
result = _exec(
updater,
cmd,
screen,
tmux,
port_forward=port_forward,
with_output=with_output,
run_env=run_env,
shutdown_after_run=shutdown_after_run)
if tmux or screen:
attach_command_parts = ["ray attach", config_file]
if override_cluster_name is not None:
attach_command_parts.append(
"--cluster-name={}".format(override_cluster_name))
if tmux:
attach_command_parts.append("--tmux")
elif screen:
attach_command_parts.append("--screen")
attach_command = " ".join(attach_command_parts)
cli_logger.print("Run `{}` to check command status.",
cf.bold(attach_command))
attach_command = " ".join(attach_command_parts)
cli_logger.print("Run `{}` to check command status.",
cf.bold(attach_command))
attach_info = "Use `{}` to check on command status.".format(
attach_command)
cli_logger.old_info(logger, attach_info)
return result
finally:
provider.cleanup()
attach_info = "Use `{}` to check on command status.".format(
attach_command)
cli_logger.old_info(logger, attach_info)
return result
def _exec(updater: NodeUpdaterThread,
@@ -1028,25 +1007,20 @@ def rsync(config_file: str,
else:
updater.sync_file_mounts(rsync)
try:
nodes = []
head_node = _get_head_node(
config, config_file, override_cluster_name, create_if_needed=False)
if ip_address:
nodes = [
provider.get_node_id(
ip_address, use_internal_ip=use_internal_ip)
]
else:
if all_nodes:
nodes = _get_worker_nodes(config, override_cluster_name)
nodes += [head_node]
nodes = []
head_node = _get_head_node(
config, config_file, override_cluster_name, create_if_needed=False)
if ip_address:
nodes = [
provider.get_node_id(ip_address, use_internal_ip=use_internal_ip)
]
else:
if all_nodes:
nodes = _get_worker_nodes(config, override_cluster_name)
nodes += [head_node]
for node_id in nodes:
rsync_to_node(node_id, is_head_node=(node_id == head_node))
finally:
provider.cleanup()
for node_id in nodes:
rsync_to_node(node_id, is_head_node=(node_id == head_node))
def get_head_node_ip(config_file: str,
@@ -1058,14 +1032,11 @@ def get_head_node_ip(config_file: str,
config["cluster_name"] = override_cluster_name
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
head_node = _get_head_node(config, config_file, override_cluster_name)
if config.get("provider", {}).get("use_internal_ips", False):
head_node_ip = provider.internal_ip(head_node)
else:
head_node_ip = provider.external_ip(head_node)
finally:
provider.cleanup()
head_node = _get_head_node(config, config_file, override_cluster_name)
if config.get("provider", {}).get("use_internal_ips", False):
head_node_ip = provider.internal_ip(head_node)
else:
head_node_ip = provider.external_ip(head_node)
return head_node_ip
@@ -1080,17 +1051,14 @@ def get_worker_node_ips(config_file: str,
config["cluster_name"] = override_cluster_name
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
nodes = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})
nodes = provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})
if config.get("provider", {}).get("use_internal_ips", False) is True:
return [provider.internal_ip(node) for node in nodes]
else:
return [provider.external_ip(node) for node in nodes]
finally:
provider.cleanup()
if config.get("provider", {}).get("use_internal_ips", False) is True:
return [provider.internal_ip(node) for node in nodes]
else:
return [provider.external_ip(node) for node in nodes]
def _get_worker_nodes(config: Dict[str, Any],
@@ -1101,12 +1069,7 @@ def _get_worker_nodes(config: Dict[str, Any],
config["cluster_name"] = override_cluster_name
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
return provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
})
finally:
provider.cleanup()
return provider.non_terminated_nodes({TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
def _get_head_node(config: Dict[str, Any],
@@ -1114,13 +1077,10 @@ def _get_head_node(config: Dict[str, Any],
override_cluster_name: Optional[str],
create_if_needed: bool = False) -> str:
provider = _get_node_provider(config["provider"], config["cluster_name"])
try:
head_node_tags = {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
}
nodes = provider.non_terminated_nodes(head_node_tags)
finally:
provider.cleanup()
head_node_tags = {
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
}
nodes = provider.non_terminated_nodes(head_node_tags)
if len(nodes) > 0:
head_node = nodes[0]
-4
View File
@@ -129,10 +129,6 @@ class NodeProvider:
"{}: Terminating node".format(node_id))
self.terminate_node(node_id)
def cleanup(self) -> None:
"""Clean-up when a Provider is no longer required."""
pass
@staticmethod
def bootstrap_config(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
"""Bootstraps the cluster config by adding env defaults if needed."""