class StandardFdRedirectionRotatingFileHandler(RotatingFileHandler):
    """RotatingFileHandler that redirects stdout or stderr to the log file.

    It is specifically used by default_worker.py.

    The only difference between this handler and the original
    RotatingFileHandler is that it duplicates the OS level fd of the
    currently open log file onto the fd of stdout (or stderr) using
    os.dup2. This happens once on construction and again after every
    rollover.

    Args:
        filename: Path of the log file.
        mode: File open mode, forwarded to RotatingFileHandler.
        maxBytes: Forwarded to RotatingFileHandler.
        backupCount: Forwarded to RotatingFileHandler.
        encoding: Forwarded to RotatingFileHandler.
        delay: Forwarded to RotatingFileHandler. The file is still opened
            during construction because the fd redirection needs an open
            file.
        is_for_stdout(bool): True to redirect stdout, False to redirect
            stderr.
    """

    def __init__(self,
                 filename,
                 mode="a",
                 maxBytes=0,
                 backupCount=0,
                 encoding=None,
                 delay=False,
                 is_for_stdout=True):
        super().__init__(
            filename,
            mode=mode,
            maxBytes=maxBytes,
            backupCount=backupCount,
            encoding=encoding,
            delay=delay)
        self.is_for_stdout = is_for_stdout
        # Resolve the fd to redirect exactly once. sys.stdout / sys.stderr
        # may later be replaced by a wrapper whose fileno() is this
        # handler's own stream (default_worker.py does this). Looking the
        # fd up again at rollover time would then turn os.dup2 into a
        # no-op and leave the OS level fd pointing at the rotated file.
        self._redirected_fd = self.get_original_stream().fileno()
        self.switch_os_fd()

    def doRollover(self):
        """Rotate the log file, then point the redirected fd at the new one."""
        super().doRollover()
        self.switch_os_fd()

    def get_original_stream(self):
        """Return sys.stdout if is_for_stdout is set, sys.stderr otherwise."""
        if self.is_for_stdout:
            return sys.stdout
        else:
            return sys.stderr

    def switch_os_fd(self):
        """Duplicate the log file fd onto the redirected stdout/stderr fd."""
        # With delay=True the base class leaves the stream unopened, both
        # after construction and after a rollover. Open it here.
        if self.stream is None:
            self.stream = self._open()
        # Old fd will automatically be closed by dup2 when necessary.
        os.dup2(self.stream.fileno(), self._redirected_fd)
def setup_and_get_worker_interceptor_logger(args,
                                            max_bytes=0,
                                            backup_count=0,
                                            is_for_stdout: bool = True):
    """Build (or fetch) the logger that captures a worker's stdout/stderr.

    NOTE: This method is only meant to be used within default_worker.py.

    The logger is named ``ray_default_worker_out`` or
    ``ray_default_worker_err``. If it already has exactly one handler it
    is returned as is. Otherwise a
    StandardFdRedirectionRotatingFileHandler writing to a per-worker file
    under the session's ``logs`` directory is attached to it.

    If max_bytes and backup_count are not set, files will grow
    indefinitely.

    Args:
        args: args received from default_worker.py. Only
            ``args.worker_type`` is read here.
        max_bytes(int): maxBytes argument of RotatingFileHandler.
        backup_count(int): backupCount argument of RotatingFileHandler.
        is_for_stdout(bool): True if logger will be used to intercept
            stdout. False otherwise.

    Returns:
        The configured logger.
    """
    suffix = "out" if is_for_stdout else "err"
    interceptor_logger = logging.getLogger(f"ray_default_worker_{suffix}")
    # A single attached handler means the setup below already ran.
    if len(interceptor_logger.handlers) == 1:
        return interceptor_logger
    interceptor_logger.setLevel(logging.INFO)

    # TODO(sang): This is how the job id is propagated to workers now.
    # But eventually, it will be clearer to just pass the job id.
    job_id = os.environ.get("RAY_JOB_ID")
    if args.worker_type != "WORKER":
        job_id = ray.JobID.nil()
        worker_name = "io_worker"
    else:
        assert job_id is not None, (
            "RAY_JOB_ID should be set as an env "
            "variable within default_worker.py. If you see this error, "
            "please report it to Ray's Github issue.")
        worker_name = "worker"

    # Make sure these values are set already.
    assert ray.worker._global_node is not None
    assert ray.worker.global_worker is not None
    session_dir = ray.worker._global_node.get_session_dir_path()
    worker_id_hex = binary_to_hex(ray.worker.global_worker.worker_id)
    log_path = (f"{session_dir}/logs/"
                f"{worker_name}-"
                f"{worker_id_hex}-"
                f"{job_id}-{os.getpid()}.{suffix}")

    rotating_handler = StandardFdRedirectionRotatingFileHandler(
        log_path,
        maxBytes=max_bytes,
        backupCount=backup_count,
        is_for_stdout=is_for_stdout)
    # TODO(sang): Add 0 or 1 to decide whether
    # or not logs are streamed to drivers.
    rotating_handler.setFormatter(logging.Formatter("%(message)s"))
    # Remove the terminator. It is important because we don't want this
    # logger to add a newline at the end of string.
    rotating_handler.terminator = ""
    # Keep messages from propagating to parent loggers.
    interceptor_logger.propagate = False
    interceptor_logger.addHandler(rotating_handler)
    return interceptor_logger
If you see this error, " - "please report it to Ray's Github issue.") - worker_name = "worker" - else: - job_id = ray.JobID.nil() - worker_name = "io_worker" - - # Make sure these values are set already. - assert ray.worker._global_node is not None - assert ray.worker.global_worker is not None - handler = RotatingFileHandler( - f"{ray.worker._global_node.get_session_dir_path()}/logs/" - f"{worker_name}-" - f"{ray.utils.binary_to_hex(ray.worker.global_worker.worker_id)}-" - f"{job_id}-{os.getpid()}.{file_extension}") - logger.addHandler(handler) - # TODO(sang): Add 0 or 1 to decide whether - # or not logs are streamed to drivers. - handler.setFormatter(logging.Formatter("%(message)s")) - # Avoid messages are propagated to parent loggers. - logger.propagate = False - # Remove the terminator. It is important because we don't want this - # logger to add a newline at the end of string. - handler.terminator = "" - return logger - - -def main(args): - ray.ray_logging.setup_logger(args.logging_level, args.logging_format) - - if args.worker_type == "WORKER": - mode = ray.WORKER_MODE - elif args.worker_type == "SPILL_WORKER": - mode = ray.SPILL_WORKER_MODE - elif args.worker_type == "RESTORE_WORKER": - mode = ray.RESTORE_WORKER_MODE - else: - raise ValueError("Unknown worker type: " + args.worker_type) - - # NOTE(suquark): We must initialize the external storage before we - # connect to raylet. Otherwise we may receive requests before the - # external storage is intialized. 
- if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE: - from ray import external_storage - if args.object_spilling_config: - object_spilling_config = base64.b64decode( - args.object_spilling_config) - object_spilling_config = json.loads(object_spilling_config) - else: - object_spilling_config = {} - external_storage.setup_external_storage(object_spilling_config) - - raylet_ip_address = args.raylet_ip_address - if raylet_ip_address is None: - raylet_ip_address = args.node_ip_address - - code_search_path = args.code_search_path - if code_search_path is not None: - for p in code_search_path.split(":"): - if os.path.isfile(p): - p = os.path.dirname(p) - sys.path.append(p) - - ray_params = RayParams( - node_ip_address=args.node_ip_address, - raylet_ip_address=raylet_ip_address, - node_manager_port=args.node_manager_port, - redis_address=args.redis_address, - redis_password=args.redis_password, - plasma_store_socket_name=args.object_store_name, - raylet_socket_name=args.raylet_name, - temp_dir=args.temp_dir, - load_code_from_local=args.load_code_from_local, - metrics_agent_port=args.metrics_agent_port, - ) - - node = ray.node.Node( - ray_params, - head=False, - shutdown_at_exit=False, - spawn_reaper=False, - connect_only=True) - ray.worker._global_node = node - ray.worker.connect(node, mode=mode) - - # Redirect stdout and stderr to the default worker interceptor logger. - # NOTE: We deprecated redirect_worker_output arg, - # so we don't need to handle here. 
if __name__ == "__main__":
    # Entry point of a Ray worker process: parse the command line, connect
    # to the node, redirect stdout/stderr to the worker log files and then
    # run (or idle in) the worker loop.
    #
    # NOTE(sang): For some reason, if we move the code below
    # to a separate function, tensorflow will capture that method
    # as a step function. For more details, check out
    # https://github.com/ray-project/ray/pull/12225#issue-525059663.
    args = parser.parse_args()
    ray.ray_logging.setup_logger(args.logging_level, args.logging_format)

    # Map the worker type given on the command line to a worker mode.
    if args.worker_type == "WORKER":
        mode = ray.WORKER_MODE
    elif args.worker_type == "SPILL_WORKER":
        mode = ray.SPILL_WORKER_MODE
    elif args.worker_type == "RESTORE_WORKER":
        mode = ray.RESTORE_WORKER_MODE
    else:
        raise ValueError("Unknown worker type: " + args.worker_type)

    # NOTE(suquark): We must initialize the external storage before we
    # connect to raylet. Otherwise we may receive requests before the
    # external storage is initialized.
    if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE:
        from ray import external_storage
        if args.object_spilling_config:
            # The config arrives as base64-encoded JSON.
            object_spilling_config = base64.b64decode(
                args.object_spilling_config)
            object_spilling_config = json.loads(object_spilling_config)
        else:
            object_spilling_config = {}
        external_storage.setup_external_storage(object_spilling_config)

    # Fall back to the node ip address when no raylet address is given.
    raylet_ip_address = args.raylet_ip_address
    if raylet_ip_address is None:
        raylet_ip_address = args.node_ip_address

    # Add every entry of the ":"-separated code search path to sys.path.
    # For an entry that is a file, its directory is added instead.
    code_search_path = args.code_search_path
    if code_search_path is not None:
        for p in code_search_path.split(":"):
            if os.path.isfile(p):
                p = os.path.dirname(p)
            sys.path.append(p)

    ray_params = RayParams(
        node_ip_address=args.node_ip_address,
        raylet_ip_address=raylet_ip_address,
        node_manager_port=args.node_manager_port,
        redis_address=args.redis_address,
        redis_password=args.redis_password,
        plasma_store_socket_name=args.object_store_name,
        raylet_socket_name=args.raylet_name,
        temp_dir=args.temp_dir,
        load_code_from_local=args.load_code_from_local,
        metrics_agent_port=args.metrics_agent_port,
    )

    node = ray.node.Node(
        ray_params,
        head=False,
        shutdown_at_exit=False,
        spawn_reaper=False,
        connect_only=True)
    ray.worker._global_node = node
    ray.worker.connect(node, mode=mode)

    # Redirect stdout and stderr to the default worker interceptor logger.
    # NOTE: We deprecated redirect_worker_output arg,
    # so we don't need to handle here.
    stdout_interceptor = StandardStreamInterceptor(
        setup_and_get_worker_interceptor_logger(args, is_for_stdout=True),
        intercept_stdout=True)
    stderr_interceptor = StandardStreamInterceptor(
        setup_and_get_worker_interceptor_logger(args, is_for_stdout=False),
        intercept_stdout=False)
    # Although the os level fd is duplicated already, we should overwrite
    # the python level stdout/stderr object.
    # Otherwise, buffers won't be flushed.
    sys.stdout = stdout_interceptor
    sys.stderr = stderr_interceptor

    if mode == ray.WORKER_MODE:
        ray.worker.global_worker.main_loop()
    elif (mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE):
        # It is handled by another thread in the C++ core worker.
        # We just need to keep the worker alive.
        while True:
            time.sleep(100000)
    else:
        raise ValueError(f"Unexpected worker mode: {mode}")