mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 03:13:07 +08:00
[Logging] Log rotation config (#13375)
* In Progress. * formatting. * in progress. * linting. * Done. * Fix typo. * Fixed the issue.
This commit is contained in:
@@ -1045,7 +1045,9 @@ def start_log_monitor(redis_address,
|
||||
stdout_file=None,
|
||||
stderr_file=None,
|
||||
redis_password=None,
|
||||
fate_share=None):
|
||||
fate_share=None,
|
||||
max_bytes=0,
|
||||
backup_count=0):
|
||||
"""Start a log monitor process.
|
||||
|
||||
Args:
|
||||
@@ -1056,17 +1058,20 @@ def start_log_monitor(redis_address,
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If
|
||||
no redirection should happen, then this should be None.
|
||||
redis_password (str): The password of the redis server.
|
||||
max_bytes (int): Log rotation parameter. Corresponding to
|
||||
RotatingFileHandler's maxBytes.
|
||||
backup_count (int): Log rotation parameter. Corresponding to
|
||||
RotatingFileHandler's backupCount.
|
||||
|
||||
Returns:
|
||||
ProcessInfo for the process that was started.
|
||||
"""
|
||||
log_monitor_filepath = os.path.join(RAY_PATH, "log_monitor.py")
|
||||
command = [
|
||||
sys.executable,
|
||||
"-u",
|
||||
log_monitor_filepath,
|
||||
f"--redis-address={redis_address}",
|
||||
f"--logs-dir={logs_dir}",
|
||||
sys.executable, "-u", log_monitor_filepath,
|
||||
f"--redis-address={redis_address}", f"--logs-dir={logs_dir}",
|
||||
f"--logging-rotate-bytes={max_bytes}",
|
||||
f"--logging-rotate-backup-count={backup_count}"
|
||||
]
|
||||
if redis_password:
|
||||
command += ["--redis-password", redis_password]
|
||||
@@ -1088,7 +1093,9 @@ def start_dashboard(require_dashboard,
|
||||
stdout_file=None,
|
||||
stderr_file=None,
|
||||
redis_password=None,
|
||||
fate_share=None):
|
||||
fate_share=None,
|
||||
max_bytes=0,
|
||||
backup_count=0):
|
||||
"""Start a dashboard process.
|
||||
|
||||
Args:
|
||||
@@ -1107,6 +1114,10 @@ def start_dashboard(require_dashboard,
|
||||
stderr_file: A file handle opened for writing to redirect stderr to. If
|
||||
no redirection should happen, then this should be None.
|
||||
redis_password (str): The password of the redis server.
|
||||
max_bytes (int): Log rotation parameter. Corresponding to
|
||||
RotatingFileHandler's maxBytes.
|
||||
backup_count (int): Log rotation parameter. Corresponding to
|
||||
RotatingFileHandler's backupCount.
|
||||
|
||||
Returns:
|
||||
ProcessInfo for the process that was started.
|
||||
@@ -1132,14 +1143,11 @@ def start_dashboard(require_dashboard,
|
||||
dashboard_dir = "new_dashboard"
|
||||
dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py")
|
||||
command = [
|
||||
sys.executable,
|
||||
"-u",
|
||||
dashboard_filepath,
|
||||
f"--host={host}",
|
||||
f"--port={port}",
|
||||
f"--redis-address={redis_address}",
|
||||
f"--temp-dir={temp_dir}",
|
||||
f"--log-dir={logdir}",
|
||||
sys.executable, "-u", dashboard_filepath, f"--host={host}",
|
||||
f"--port={port}", f"--redis-address={redis_address}",
|
||||
f"--temp-dir={temp_dir}", f"--log-dir={logdir}",
|
||||
f"--logging-rotate-bytes={max_bytes}",
|
||||
f"--logging-rotate-backup-count={backup_count}"
|
||||
]
|
||||
|
||||
if redis_password:
|
||||
@@ -1258,7 +1266,9 @@ def start_raylet(redis_address,
|
||||
fate_share=None,
|
||||
socket_to_use=None,
|
||||
head_node=False,
|
||||
start_initial_python_workers_for_first_job=False):
|
||||
start_initial_python_workers_for_first_job=False,
|
||||
max_bytes=0,
|
||||
backup_count=0):
|
||||
"""Start a raylet, which is a combined local scheduler and object manager.
|
||||
|
||||
Args:
|
||||
@@ -1295,6 +1305,10 @@ def start_raylet(redis_address,
|
||||
config (dict|None): Optional Raylet configuration that will
|
||||
override defaults in RayConfig.
|
||||
java_worker_options (list): The command options for Java worker.
|
||||
max_bytes (int): Log rotation parameter. Corresponding to
|
||||
RotatingFileHandler's maxBytes.
|
||||
backup_count (int): Log rotation parameter. Corresponding to
|
||||
RotatingFileHandler's backupCount.
|
||||
Returns:
|
||||
ProcessInfo for the process that was started.
|
||||
"""
|
||||
@@ -1372,6 +1386,8 @@ def start_raylet(redis_address,
|
||||
f"--config-list={config_str}",
|
||||
f"--temp-dir={temp_dir}",
|
||||
f"--metrics-agent-port={metrics_agent_port}",
|
||||
f"--logging-rotate-bytes={max_bytes}",
|
||||
f"--logging-rotate-backup-count={backup_count}",
|
||||
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
|
||||
]
|
||||
if redis_password:
|
||||
@@ -1402,6 +1418,8 @@ def start_raylet(redis_address,
|
||||
f"--raylet-name={raylet_name}",
|
||||
f"--temp-dir={temp_dir}",
|
||||
f"--log-dir={log_dir}",
|
||||
f"--logging-rotate-bytes={max_bytes}",
|
||||
f"--logging-rotate-backup-count={backup_count}",
|
||||
]
|
||||
|
||||
if redis_password is not None and len(redis_password) != 0:
|
||||
@@ -1780,7 +1798,9 @@ def start_monitor(redis_address,
|
||||
stderr_file=None,
|
||||
autoscaling_config=None,
|
||||
redis_password=None,
|
||||
fate_share=None):
|
||||
fate_share=None,
|
||||
max_bytes=0,
|
||||
backup_count=0):
|
||||
"""Run a process to monitor the other processes.
|
||||
|
||||
Args:
|
||||
@@ -1792,17 +1812,20 @@ def start_monitor(redis_address,
|
||||
no redirection should happen, then this should be None.
|
||||
autoscaling_config: path to autoscaling config file.
|
||||
redis_password (str): The password of the redis server.
|
||||
max_bytes (int): Log rotation parameter. Corresponding to
|
||||
RotatingFileHandler's maxBytes.
|
||||
backup_count (int): Log rotation parameter. Corresponding to
|
||||
RotatingFileHandler's backupCount.
|
||||
|
||||
Returns:
|
||||
ProcessInfo for the process that was started.
|
||||
"""
|
||||
monitor_path = os.path.join(RAY_PATH, "monitor.py")
|
||||
command = [
|
||||
sys.executable,
|
||||
"-u",
|
||||
monitor_path,
|
||||
f"--logs-dir={logs_dir}",
|
||||
"--redis-address=" + str(redis_address),
|
||||
sys.executable, "-u", monitor_path, f"--logs-dir={logs_dir}",
|
||||
f"--redis-address={redis_address}",
|
||||
f"--logging-rotate-bytes={max_bytes}",
|
||||
f"--logging-rotate-backup-count={backup_count}"
|
||||
]
|
||||
if autoscaling_config:
|
||||
command.append("--autoscaling-config=" + str(autoscaling_config))
|
||||
|
||||
+30
-2
@@ -142,6 +142,18 @@ class Node:
|
||||
if "plasma_store_as_thread" not in self._config:
|
||||
self._config["plasma_store_as_thread"] = True
|
||||
|
||||
# Configure log rotation parameters.
|
||||
self.max_bytes = int(
|
||||
os.getenv("RAY_ROTATION_MAX_BYTES",
|
||||
ray_constants.LOGGING_ROTATE_BYTES))
|
||||
self.backup_count = int(
|
||||
os.getenv("RAY_ROTATION_BACKUP_COUNT",
|
||||
ray_constants.LOGGING_ROTATE_BACKUP_COUNT))
|
||||
|
||||
assert self.max_bytes >= 0
|
||||
assert self.backup_count >= 0
|
||||
|
||||
# Register the temp dir.
|
||||
if head:
|
||||
redis_client = None
|
||||
# date including microsecond
|
||||
@@ -387,6 +399,14 @@ class Node:
|
||||
except AttributeError:
|
||||
return None
|
||||
|
||||
@property
|
||||
def logging_config(self):
|
||||
"""Get the logging config of the current node."""
|
||||
return {
|
||||
"log_rotation_max_bytes": self.max_bytes,
|
||||
"log_rotation_backup_count": self.backup_count
|
||||
}
|
||||
|
||||
@property
|
||||
def address_info(self):
|
||||
"""Get a dictionary of addresses."""
|
||||
@@ -653,7 +673,9 @@ class Node:
|
||||
stdout_file=subprocess.DEVNULL,
|
||||
stderr_file=subprocess.DEVNULL,
|
||||
redis_password=self._ray_params.redis_password,
|
||||
fate_share=self.kernel_fate_share)
|
||||
fate_share=self.kernel_fate_share,
|
||||
max_bytes=self.max_bytes,
|
||||
backup_count=self.backup_count)
|
||||
assert ray_constants.PROCESS_TYPE_LOG_MONITOR not in self.all_processes
|
||||
self.all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] = [
|
||||
process_info,
|
||||
@@ -677,6 +699,8 @@ class Node:
|
||||
stderr_file=subprocess.DEVNULL, # Avoid hang(fd inherit)
|
||||
redis_password=self._ray_params.redis_password,
|
||||
fate_share=self.kernel_fate_share,
|
||||
max_bytes=self.max_bytes,
|
||||
backup_count=self.backup_count,
|
||||
port=self._ray_params.dashboard_port)
|
||||
assert ray_constants.PROCESS_TYPE_DASHBOARD not in self.all_processes
|
||||
if process_info is not None:
|
||||
@@ -772,6 +796,8 @@ class Node:
|
||||
fate_share=self.kernel_fate_share,
|
||||
socket_to_use=self.socket,
|
||||
head_node=self.head,
|
||||
max_bytes=self.max_bytes,
|
||||
backup_count=self.backup_count,
|
||||
start_initial_python_workers_for_first_job=self._ray_params.
|
||||
start_initial_python_workers_for_first_job)
|
||||
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
|
||||
@@ -797,7 +823,9 @@ class Node:
|
||||
stderr_file=stderr_file,
|
||||
autoscaling_config=self._ray_params.autoscaling_config,
|
||||
redis_password=self._ray_params.redis_password,
|
||||
fate_share=self.kernel_fate_share)
|
||||
fate_share=self.kernel_fate_share,
|
||||
max_bytes=self.max_bytes,
|
||||
backup_count=self.backup_count)
|
||||
assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes
|
||||
self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info]
|
||||
|
||||
|
||||
@@ -150,12 +150,9 @@ LOGGER_LEVEL = "info"
|
||||
LOGGER_LEVEL_CHOICES = ["debug", "info", "warning", "error", "critical"]
|
||||
LOGGER_LEVEL_HELP = ("The logging level threshold, choices=['debug', 'info',"
|
||||
" 'warning', 'error', 'critical'], default='info'")
|
||||
# Default param for RotatingFileHandler
|
||||
# maxBytes. 10G by default. We intentionally set the default value high
|
||||
# so that users who won't care don't know about the existence of this.
|
||||
LOGGING_ROTATE_BYTES = 10 * 1000 * 1000 * 1000
|
||||
# The default will grow logs up until 500GB without log loss.
|
||||
LOGGING_ROTATE_BACKUP_COUNT = 50 # backupCount
|
||||
|
||||
LOGGING_ROTATE_BYTES = 512 * 1024 * 1024 # 512MB.
|
||||
LOGGING_ROTATE_BACKUP_COUNT = 5 # 5 Backup files at max.
|
||||
|
||||
# Constants used to define the different process types.
|
||||
PROCESS_TYPE_REAPER = "reaper"
|
||||
@@ -172,6 +169,8 @@ PROCESS_TYPE_PLASMA_STORE = "plasma_store"
|
||||
PROCESS_TYPE_REDIS_SERVER = "redis_server"
|
||||
PROCESS_TYPE_WEB_UI = "web_ui"
|
||||
PROCESS_TYPE_GCS_SERVER = "gcs_server"
|
||||
PROCESS_TYPE_PYTHON_CORE_WORKER_DRIVER = "python-core-driver"
|
||||
PROCESS_TYPE_PYTHON_CORE_WORKER = "python-core-worker"
|
||||
|
||||
# Log file names
|
||||
MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_MONITOR}.log"
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
import os
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
|
||||
import ray
|
||||
from ray import ray_constants
|
||||
|
||||
|
||||
def set_logging_config(max_bytes, backup_count):
|
||||
os.environ["RAY_ROTATION_MAX_BYTES"] = str(max_bytes)
|
||||
os.environ["RAY_ROTATION_BACKUP_COUNT"] = str(backup_count)
|
||||
|
||||
|
||||
def test_log_rotation_config(ray_start_cluster):
|
||||
cluster = ray_start_cluster
|
||||
max_bytes = 100
|
||||
backup_count = 3
|
||||
|
||||
# Create a cluster.
|
||||
set_logging_config(max_bytes, backup_count)
|
||||
head_node = cluster.add_node(num_cpus=0)
|
||||
# Set a different env var for a worker node.
|
||||
set_logging_config(0, 0)
|
||||
worker_node = cluster.add_node(num_cpus=0)
|
||||
cluster.wait_for_nodes()
|
||||
|
||||
config = head_node.logging_config
|
||||
assert config["log_rotation_max_bytes"] == max_bytes
|
||||
assert config["log_rotation_backup_count"] == backup_count
|
||||
config = worker_node.logging_config
|
||||
assert config["log_rotation_max_bytes"] == 0
|
||||
assert config["log_rotation_backup_count"] == 0
|
||||
|
||||
|
||||
def test_log_rotation(shutdown_only):
|
||||
max_bytes = 1
|
||||
backup_count = 3
|
||||
set_logging_config(max_bytes, backup_count)
|
||||
ray.init(num_cpus=1)
|
||||
session_dir = ray.worker.global_worker.node.address_info["session_dir"]
|
||||
session_path = Path(session_dir)
|
||||
log_dir_path = session_path / "logs"
|
||||
|
||||
log_rotating_component = [
|
||||
ray_constants.PROCESS_TYPE_DASHBOARD,
|
||||
ray_constants.PROCESS_TYPE_DASHBOARD_AGENT,
|
||||
ray_constants.PROCESS_TYPE_LOG_MONITOR,
|
||||
ray_constants.PROCESS_TYPE_MONITOR,
|
||||
ray_constants.PROCESS_TYPE_PYTHON_CORE_WORKER_DRIVER,
|
||||
ray_constants.PROCESS_TYPE_PYTHON_CORE_WORKER,
|
||||
# Below components are not log rotating now.
|
||||
# ray_constants.PROCESS_TYPE_RAYLET,
|
||||
# ray_constants.PROCESS_TYPE_GCS_SERVER,
|
||||
# ray_constants.PROCESS_TYPE_WORKER,
|
||||
]
|
||||
|
||||
# Run the basic workload.
|
||||
@ray.remote
|
||||
def f():
|
||||
for i in range(10):
|
||||
print(f"test {i}")
|
||||
|
||||
ray.get(f.remote())
|
||||
|
||||
paths = list(log_dir_path.iterdir())
|
||||
|
||||
def component_exist(component, paths):
|
||||
for path in paths:
|
||||
filename = path.stem
|
||||
if component in filename:
|
||||
return True
|
||||
return False
|
||||
|
||||
def component_file_size_small_enough(component):
|
||||
"""Although max_bytes is 1, the file can have size that is big.
|
||||
For example, if the logger prints the traceback, it can be
|
||||
much bigger. So, we shouldn't make the assertion too tight.
|
||||
"""
|
||||
small_enough_bytes = 512 # 512 bytes.
|
||||
for path in paths:
|
||||
if not component_exist(component, [path]):
|
||||
continue
|
||||
|
||||
if path.stat().st_size > small_enough_bytes:
|
||||
return False
|
||||
return True
|
||||
|
||||
for component in log_rotating_component:
|
||||
assert component_exist(component, paths)
|
||||
assert component_file_size_small_enough(component)
|
||||
|
||||
# Check if the backup count is respected.
|
||||
file_cnts = defaultdict(int)
|
||||
for path in paths:
|
||||
filename = path.stem
|
||||
filename_without_suffix = filename.split(".")[0]
|
||||
file_cnts[filename_without_suffix] += 1
|
||||
for filename, file_cnt in file_cnts.items():
|
||||
# There could be backup_count + 1 files.
|
||||
# EX) *.log, *.log.* (as many as backup count).
|
||||
assert file_cnt <= backup_count + 1, (
|
||||
f"{filename} has files that are more than "
|
||||
f"backup count {backup_count}, file count: {file_cnt}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
import sys
|
||||
# Make subprocess happy in bazel.
|
||||
os.environ["LC_ALL"] = "en_US.UTF-8"
|
||||
os.environ["LANG"] = "en_US.UTF-8"
|
||||
sys.exit(pytest.main(["-v", __file__]))
|
||||
@@ -109,6 +109,21 @@ parser.add_argument(
|
||||
help="A list of directories or jar files separated by colon that specify "
|
||||
"the search path for user code. This will be used as `CLASSPATH` in "
|
||||
"Java and `PYTHONPATH` in Python.")
|
||||
parser.add_argument(
|
||||
"--logging-rotate-bytes",
|
||||
required=False,
|
||||
type=int,
|
||||
default=ray_constants.LOGGING_ROTATE_BYTES,
|
||||
help="Specify the max bytes for rotating "
|
||||
"log file, default is "
|
||||
f"{ray_constants.LOGGING_ROTATE_BYTES} bytes.")
|
||||
parser.add_argument(
|
||||
"--logging-rotate-backup-count",
|
||||
required=False,
|
||||
type=int,
|
||||
default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
|
||||
help="Specify the backup count of rotated log file, default is "
|
||||
f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.")
|
||||
if __name__ == "__main__":
|
||||
# NOTE(sang): For some reason, if we move the code below
|
||||
# to a separate function, tensorflow will capture that method
|
||||
|
||||
Reference in New Issue
Block a user