diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 688babad6..d0eafc969 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1045,7 +1045,9 @@ def start_log_monitor(redis_address, stdout_file=None, stderr_file=None, redis_password=None, - fate_share=None): + fate_share=None, + max_bytes=0, + backup_count=0): """Start a log monitor process. Args: @@ -1056,17 +1058,20 @@ def start_log_monitor(redis_address, stderr_file: A file handle opened for writing to redirect stderr to. If no redirection should happen, then this should be None. redis_password (str): The password of the redis server. + max_bytes (int): Log rotation parameter. Corresponding to + RotatingFileHandler's maxBytes. + backup_count (int): Log rotation parameter. Corresponding to + RotatingFileHandler's backupCount. Returns: ProcessInfo for the process that was started. """ log_monitor_filepath = os.path.join(RAY_PATH, "log_monitor.py") command = [ - sys.executable, - "-u", - log_monitor_filepath, - f"--redis-address={redis_address}", - f"--logs-dir={logs_dir}", + sys.executable, "-u", log_monitor_filepath, + f"--redis-address={redis_address}", f"--logs-dir={logs_dir}", + f"--logging-rotate-bytes={max_bytes}", + f"--logging-rotate-backup-count={backup_count}" ] if redis_password: command += ["--redis-password", redis_password] @@ -1088,7 +1093,9 @@ def start_dashboard(require_dashboard, stdout_file=None, stderr_file=None, redis_password=None, - fate_share=None): + fate_share=None, + max_bytes=0, + backup_count=0): """Start a dashboard process. Args: @@ -1107,6 +1114,10 @@ def start_dashboard(require_dashboard, stderr_file: A file handle opened for writing to redirect stderr to. If no redirection should happen, then this should be None. redis_password (str): The password of the redis server. + max_bytes (int): Log rotation parameter. Corresponding to + RotatingFileHandler's maxBytes. + backup_count (int): Log rotation parameter. Corresponding to + RotatingFileHandler's backupCount. Returns: ProcessInfo for the process that was started. @@ -1132,14 +1143,11 @@ def start_dashboard(require_dashboard, dashboard_dir = "new_dashboard" dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py") command = [ - sys.executable, - "-u", - dashboard_filepath, - f"--host={host}", - f"--port={port}", - f"--redis-address={redis_address}", - f"--temp-dir={temp_dir}", - f"--log-dir={logdir}", + sys.executable, "-u", dashboard_filepath, f"--host={host}", + f"--port={port}", f"--redis-address={redis_address}", + f"--temp-dir={temp_dir}", f"--log-dir={logdir}", + f"--logging-rotate-bytes={max_bytes}", + f"--logging-rotate-backup-count={backup_count}" ] if redis_password: @@ -1258,7 +1266,9 @@ def start_raylet(redis_address, fate_share=None, socket_to_use=None, head_node=False, - start_initial_python_workers_for_first_job=False): + start_initial_python_workers_for_first_job=False, + max_bytes=0, + backup_count=0): """Start a raylet, which is a combined local scheduler and object manager. Args: @@ -1295,6 +1305,10 @@ def start_raylet(redis_address, config (dict|None): Optional Raylet configuration that will override defaults in RayConfig. java_worker_options (list): The command options for Java worker. + max_bytes (int): Log rotation parameter. Corresponding to + RotatingFileHandler's maxBytes. + backup_count (int): Log rotation parameter. Corresponding to + RotatingFileHandler's backupCount. Returns: ProcessInfo for the process that was started. """ @@ -1372,6 +1386,8 @@ def start_raylet(redis_address, f"--config-list={config_str}", f"--temp-dir={temp_dir}", f"--metrics-agent-port={metrics_agent_port}", + f"--logging-rotate-bytes={max_bytes}", + f"--logging-rotate-backup-count={backup_count}", "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER", ] if redis_password: @@ -1402,6 +1418,8 @@ def start_raylet(redis_address, f"--raylet-name={raylet_name}", f"--temp-dir={temp_dir}", f"--log-dir={log_dir}", + f"--logging-rotate-bytes={max_bytes}", + f"--logging-rotate-backup-count={backup_count}", ] if redis_password is not None and len(redis_password) != 0: @@ -1780,7 +1798,9 @@ def start_monitor(redis_address, stderr_file=None, autoscaling_config=None, redis_password=None, - fate_share=None): + fate_share=None, + max_bytes=0, + backup_count=0): """Run a process to monitor the other processes. Args: @@ -1792,17 +1812,20 @@ def start_monitor(redis_address, no redirection should happen, then this should be None. autoscaling_config: path to autoscaling config file. redis_password (str): The password of the redis server. + max_bytes (int): Log rotation parameter. Corresponding to + RotatingFileHandler's maxBytes. + backup_count (int): Log rotation parameter. Corresponding to + RotatingFileHandler's backupCount. Returns: ProcessInfo for the process that was started. """ monitor_path = os.path.join(RAY_PATH, "monitor.py") command = [ - sys.executable, - "-u", - monitor_path, - f"--logs-dir={logs_dir}", - "--redis-address=" + str(redis_address), + sys.executable, "-u", monitor_path, f"--logs-dir={logs_dir}", + f"--redis-address={redis_address}", + f"--logging-rotate-bytes={max_bytes}", + f"--logging-rotate-backup-count={backup_count}" ] if autoscaling_config: command.append("--autoscaling-config=" + str(autoscaling_config)) diff --git a/python/ray/node.py b/python/ray/node.py index 086865023..9130b39fb 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -142,6 +142,18 @@ class Node: if "plasma_store_as_thread" not in self._config: self._config["plasma_store_as_thread"] = True + # Configure log rotation parameters. + self.max_bytes = int( + os.getenv("RAY_ROTATION_MAX_BYTES", + ray_constants.LOGGING_ROTATE_BYTES)) + self.backup_count = int( + os.getenv("RAY_ROTATION_BACKUP_COUNT", + ray_constants.LOGGING_ROTATE_BACKUP_COUNT)) + + assert self.max_bytes >= 0 + assert self.backup_count >= 0 + + # Register the temp dir. if head: redis_client = None # date including microsecond @@ -387,6 +399,14 @@ class Node: except AttributeError: return None + @property + def logging_config(self): + """Get the logging config of the current node.""" + return { + "log_rotation_max_bytes": self.max_bytes, + "log_rotation_backup_count": self.backup_count + } + @property def address_info(self): """Get a dictionary of addresses.""" @@ -653,7 +673,9 @@ class Node: stdout_file=subprocess.DEVNULL, stderr_file=subprocess.DEVNULL, redis_password=self._ray_params.redis_password, - fate_share=self.kernel_fate_share) + fate_share=self.kernel_fate_share, + max_bytes=self.max_bytes, + backup_count=self.backup_count) assert ray_constants.PROCESS_TYPE_LOG_MONITOR not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] = [ process_info, @@ -677,6 +699,8 @@ class Node: stderr_file=subprocess.DEVNULL, # Avoid hang(fd inherit) redis_password=self._ray_params.redis_password, fate_share=self.kernel_fate_share, + max_bytes=self.max_bytes, + backup_count=self.backup_count, port=self._ray_params.dashboard_port) assert ray_constants.PROCESS_TYPE_DASHBOARD not in self.all_processes if process_info is not None: @@ -772,6 +796,8 @@ class Node: fate_share=self.kernel_fate_share, socket_to_use=self.socket, head_node=self.head, + max_bytes=self.max_bytes, + backup_count=self.backup_count, start_initial_python_workers_for_first_job=self._ray_params. start_initial_python_workers_for_first_job) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes @@ -797,7 +823,9 @@ class Node: stderr_file=stderr_file, autoscaling_config=self._ray_params.autoscaling_config, redis_password=self._ray_params.redis_password, - fate_share=self.kernel_fate_share) + fate_share=self.kernel_fate_share, + max_bytes=self.max_bytes, + backup_count=self.backup_count) assert ray_constants.PROCESS_TYPE_MONITOR not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_MONITOR] = [process_info] diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index a5459b863..04dfd8f17 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -150,12 +150,9 @@ LOGGER_LEVEL = "info" LOGGER_LEVEL_CHOICES = ["debug", "info", "warning", "error", "critical"] LOGGER_LEVEL_HELP = ("The logging level threshold, choices=['debug', 'info'," " 'warning', 'error', 'critical'], default='info'") -# Default param for RotatingFileHandler -# maxBytes. 10G by default. We intentionally set the default value high -# so that users who won't care don't know about the existence of this. -LOGGING_ROTATE_BYTES = 10 * 1000 * 1000 * 1000 -# The default will grow logs up until 500GB without log loss. -LOGGING_ROTATE_BACKUP_COUNT = 50 # backupCount + +LOGGING_ROTATE_BYTES = 512 * 1024 * 1024 # 512MB. +LOGGING_ROTATE_BACKUP_COUNT = 5 # 5 Backup files at max. # Constants used to define the different process types. PROCESS_TYPE_REAPER = "reaper" @@ -172,6 +169,8 @@ PROCESS_TYPE_PLASMA_STORE = "plasma_store" PROCESS_TYPE_REDIS_SERVER = "redis_server" PROCESS_TYPE_WEB_UI = "web_ui" PROCESS_TYPE_GCS_SERVER = "gcs_server" +PROCESS_TYPE_PYTHON_CORE_WORKER_DRIVER = "python-core-driver" +PROCESS_TYPE_PYTHON_CORE_WORKER = "python-core-worker" # Log file names MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_MONITOR}.log" diff --git a/python/ray/tests/test_logging.py b/python/ray/tests/test_logging.py new file mode 100644 index 000000000..6796ac4f7 --- /dev/null +++ b/python/ray/tests/test_logging.py @@ -0,0 +1,112 @@ +import os +from collections import defaultdict +from pathlib import Path + +import ray +from ray import ray_constants + + +def set_logging_config(max_bytes, backup_count): + os.environ["RAY_ROTATION_MAX_BYTES"] = str(max_bytes) + os.environ["RAY_ROTATION_BACKUP_COUNT"] = str(backup_count) + + +def test_log_rotation_config(ray_start_cluster): + cluster = ray_start_cluster + max_bytes = 100 + backup_count = 3 + + # Create a cluster. + set_logging_config(max_bytes, backup_count) + head_node = cluster.add_node(num_cpus=0) + # Set a different env var for a worker node. + set_logging_config(0, 0) + worker_node = cluster.add_node(num_cpus=0) + cluster.wait_for_nodes() + + config = head_node.logging_config + assert config["log_rotation_max_bytes"] == max_bytes + assert config["log_rotation_backup_count"] == backup_count + config = worker_node.logging_config + assert config["log_rotation_max_bytes"] == 0 + assert config["log_rotation_backup_count"] == 0 + + +def test_log_rotation(shutdown_only): + max_bytes = 1 + backup_count = 3 + set_logging_config(max_bytes, backup_count) + ray.init(num_cpus=1) + session_dir = ray.worker.global_worker.node.address_info["session_dir"] + session_path = Path(session_dir) + log_dir_path = session_path / "logs" + + log_rotating_component = [ + ray_constants.PROCESS_TYPE_DASHBOARD, + ray_constants.PROCESS_TYPE_DASHBOARD_AGENT, + ray_constants.PROCESS_TYPE_LOG_MONITOR, + ray_constants.PROCESS_TYPE_MONITOR, + ray_constants.PROCESS_TYPE_PYTHON_CORE_WORKER_DRIVER, + ray_constants.PROCESS_TYPE_PYTHON_CORE_WORKER, + # Below components are not log rotating now. + # ray_constants.PROCESS_TYPE_RAYLET, + # ray_constants.PROCESS_TYPE_GCS_SERVER, + # ray_constants.PROCESS_TYPE_WORKER, + ] + + # Run the basic workload. + @ray.remote + def f(): + for i in range(10): + print(f"test {i}") + + ray.get(f.remote()) + + paths = list(log_dir_path.iterdir()) + + def component_exist(component, paths): + for path in paths: + filename = path.stem + if component in filename: + return True + return False + + def component_file_size_small_enough(component): + """Although max_bytes is 1, the file can have size that is big. + For example, if the logger prints the traceback, it can be + much bigger. So, we shouldn't make the assertion too tight. + """ + small_enough_bytes = 512 # 512 bytes. + for path in paths: + if not component_exist(component, [path]): + continue + + if path.stat().st_size > small_enough_bytes: + return False + return True + + for component in log_rotating_component: + assert component_exist(component, paths) + assert component_file_size_small_enough(component) + + # Check if the backup count is respected. + file_cnts = defaultdict(int) + for path in paths: + filename = path.stem + filename_without_suffix = filename.split(".")[0] + file_cnts[filename_without_suffix] += 1 + for filename, file_cnt in file_cnts.items(): + # There could be backup_count + 1 files. + # EX) *.log, *.log.* (as many as backup count). + assert file_cnt <= backup_count + 1, ( + f"{filename} has files that are more than " + f"backup count {backup_count}, file count: {file_cnt}") + + +if __name__ == "__main__": + import pytest + import sys + # Make subprocess happy in bazel. + os.environ["LC_ALL"] = "en_US.UTF-8" + os.environ["LANG"] = "en_US.UTF-8" + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index d9f7837ff..7b9c2677b 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -109,6 +109,21 @@ parser.add_argument( help="A list of directories or jar files separated by colon that specify " "the search path for user code. This will be used as `CLASSPATH` in " "Java and `PYTHONPATH` in Python.") +parser.add_argument( + "--logging-rotate-bytes", + required=False, + type=int, + default=ray_constants.LOGGING_ROTATE_BYTES, + help="Specify the max bytes for rotating " + "log file, default is " + f"{ray_constants.LOGGING_ROTATE_BYTES} bytes.") +parser.add_argument( + "--logging-rotate-backup-count", + required=False, + type=int, + default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT, + help="Specify the backup count of rotated log file, default is " + f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.") if __name__ == "__main__": # NOTE(sang): For some reason, if we move the code below # to a separate function, tensorflow will capture that method diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index d06a1c358..cd6bd84ce 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -369,3 +369,12 @@ RAY_CONFIG(bool, is_external_storage_type_fs, true) /// Whether to enable locality-aware leasing. If enabled, then Ray will consider task /// dependency locality when choosing a worker for leasing. RAY_CONFIG(bool, locality_aware_leasing_enabled, true) + +/* Configuration parameters for logging */ +/// Parameters for log rotation. This value is equivalent to RotatingFileHandler's +/// maxBytes argument. +RAY_CONFIG(int64_t, log_rotation_max_bytes, 100 * 1024 * 1024) + +/// Parameters for log rotation. This value is equivalent to RotatingFileHandler's +/// backupCount argument. +RAY_CONFIG(int64_t, log_rotation_backup_count, 5) diff --git a/src/ray/util/logging.cc b/src/ray/util/logging.cc index 1640c5cfc..b06d64441 100644 --- a/src/ray/util/logging.cc +++ b/src/ray/util/logging.cc @@ -307,11 +307,19 @@ void RayLog::StartRayLog(const std::string &app_name, RayLogLevel severity_thres #endif // Reset log pattern and level and we assume a log file can be rotated with // 10 files in max size 512M by default. - if (getenv("RAY_ROTATION_MAX_SIZE")) { - log_rotation_max_size_ = std::atol(getenv("RAY_RAOTATION_MAX_SIZE")); + if (getenv("RAY_ROTATION_MAX_BYTES")) { + long max_size = std::atol(getenv("RAY_ROTATION_MAX_BYTES")); + // 0 means no log rotation in python, but not in spdlog. We just use the default + // value here. + if (max_size != 0) { + log_rotation_max_size_ = max_size; + } } - if (getenv("RAY_ROTATION_FILE_NUM")) { - log_rotation_file_num_ = std::atol(getenv("RAY_ROTATION_FILE_NUM")); + if (getenv("RAY_ROTATION_BACKUP_COUNT")) { + long file_num = std::atol(getenv("RAY_ROTATION_BACKUP_COUNT")); + if (file_num != 0) { + log_rotation_file_num_ = file_num; + } } spdlog::set_pattern(log_format_pattern_); spdlog::set_level(static_cast(severity_threshold_));