diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index afb6355c4..7f02dec9f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -326,19 +326,6 @@ cdef prepare_args( CCoreWorkerProcess.GetCoreWorker().GetRpcAddress()))) -def switch_worker_log_if_needed(worker, next_job_id): - if worker.mode != ray.WORKER_MODE: - return - if (worker.current_logging_job_id is None) or \ - (worker.current_logging_job_id != next_job_id): - job_stdout_path, job_stderr_path = ( - worker.node.get_job_redirected_log_file( - worker.worker_id, next_job_id.binary()) - ) - ray.worker.set_log_file(job_stdout_path, job_stderr_path) - worker.current_logging_job_id = next_job_id - - cdef execute_task( CTaskType task_type, const c_string name, @@ -472,7 +459,6 @@ cdef execute_task( with core_worker.profile_event(b"task:execute"): task_exception = True try: - switch_worker_log_if_needed(worker, job_id) with ray.worker._changeproctitle(title, next_title): outputs = function_executor(*args, **kwargs) task_exception = False diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index 5e8fac428..066e494aa 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -946,7 +946,7 @@ if __name__ == "__main__": default=None, help="Specify the path of the temporary directory use by Ray process.") args = parser.parse_args() - ray.utils.setup_logger(args.logging_level, args.logging_format) + ray.ray_logging.setup_logger(args.logging_level, args.logging_format) # TODO(sang): Add a URL validation. 
metrics_export_address = os.environ.get("METRICS_EXPORT_ADDRESS") diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index 6d82e1282..0f5ecfeac 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -30,7 +30,8 @@ class LogFileInfo: file_position=None, file_handle=None, is_err_file=False, - job_id=None): + job_id=None, + worker_pid=None): assert (filename is not None and size_when_last_opened is not None and file_position is not None) self.filename = filename @@ -39,7 +40,7 @@ class LogFileInfo: self.file_handle = file_handle self.is_err_file = is_err_file self.job_id = job_id - self.worker_pid = None + self.worker_pid = worker_pid class LogMonitor: @@ -130,8 +131,10 @@ class LogMonitor: job_match = JOB_LOG_PATTERN.match(file_path) if job_match: job_id = job_match.group(2) + worker_pid = job_match.group(3) else: job_id = None + worker_pid = None is_err_file = file_path.endswith("err") @@ -143,7 +146,8 @@ class LogMonitor: file_position=0, file_handle=None, is_err_file=is_err_file, - job_id=job_id)) + job_id=job_id, + worker_pid=worker_pid)) log_filename = os.path.basename(file_path) logger.info(f"Beginning to track file {log_filename}") @@ -236,12 +240,7 @@ class LogMonitor: raise if file_info.file_position == 0: - if (len(lines_to_publish) > 0 and - lines_to_publish[0].startswith("Ray worker pid: ")): - file_info.worker_pid = int( - lines_to_publish[0].split(" ")[-1]) - lines_to_publish = lines_to_publish[1:] - elif "/raylet" in file_info.filename: + if "/raylet" in file_info.filename: file_info.worker_pid = "raylet" elif "/gcs_server" in file_info.filename: file_info.worker_pid = "gcs_server" @@ -315,7 +314,7 @@ if __name__ == "__main__": help="Specify the path of the temporary directory used by Ray " "processes.") args = parser.parse_args() - ray.utils.setup_logger(args.logging_level, args.logging_format) + ray.ray_logging.setup_logger(args.logging_level, args.logging_format) log_monitor = LogMonitor( args.logs_dir, 
args.redis_address, redis_password=args.redis_password) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index c3d47aa15..e23b97a7b 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -17,7 +17,7 @@ from ray.autoscaler._private.constants import \ import ray.gcs_utils import ray.utils import ray.ray_constants as ray_constants -from ray.utils import setup_logger +from ray.ray_logging import setup_logger from ray._raylet import GlobalStateAccessor import redis diff --git a/python/ray/node.py b/python/ray/node.py index 882dcb42a..15b5a9a3f 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -755,41 +755,6 @@ class Node: assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] - def get_job_redirected_log_file(self, - worker_id: bytes, - job_id: bytes = None): - """Determines (but does not create) logging files for workers to - redirect its output. - - Args: - worker_id (bytes): A byte representation of the worker id. - job_id (bytes): A byte representation of the job id. If None, - provides a generic log file for the worker. - - Returns: - (tuple) The stdout and stderr file names that the job should be - redirected to. - """ - redirect_output = self._ray_params.redirect_output - - if redirect_output is None: - # Make the default behavior match that of glog. 
- redirect_output = os.getenv("GLOG_logtostderr") != "1" - - if not redirect_output: - return None, None - - if job_id is not None: - name = "worker-{}-{}-{}".format( - ray.utils.binary_to_hex(worker_id), - ray.utils.binary_to_hex(job_id), os.getpid()) - else: - name = f"worker-{ray.utils.binary_to_hex(worker_id)}-{os.getpid()}" - - worker_stdout_file, worker_stderr_file = self._get_log_file_names( - name, unique=False) - return worker_stdout_file, worker_stderr_file - def start_worker(self): """Start a worker process.""" raise NotImplementedError diff --git a/python/ray/ray_logging.py b/python/ray/ray_logging.py new file mode 100644 index 000000000..4e96270c4 --- /dev/null +++ b/python/ray/ray_logging.py @@ -0,0 +1,60 @@ +import logging +import os + +_default_handler = None + + +def setup_logger(logging_level, logging_format): + """Setup default logging for ray.""" + logger = logging.getLogger("ray") + if type(logging_level) is str: + logging_level = logging.getLevelName(logging_level.upper()) + logger.setLevel(logging_level) + global _default_handler + if _default_handler is None: + _default_handler = logging.StreamHandler() + logger.addHandler(_default_handler) + _default_handler.setFormatter(logging.Formatter(logging_format)) + # Setting this prevents messages from + # being propagated to the parent logger. + logger.propagate = False + + +class StandardStreamInterceptor: + """Used to intercept stdout and stderr. + + Intercepted messages are handled by the given logger. + + NOTE: The logger passed to this class should always have + logging.INFO severity level. + + Example: + >>> from contextlib import redirect_stdout + >>> logger = logging.getLogger("ray_logger") + >>> hook = StandardStreamInterceptor(logger) + >>> with redirect_stdout(hook): + >>> print("a") # stdout will be delegated to logger. + + Args: + logger: Python logger that will receive messages streamed to + the standard out/err and delegate writes. 
+ intercept_stdout(bool): True if the class intercepts stdout. False + if stderr is intercepted. + """ + + def __init__(self, logger, intercept_stdout=True): + self.logger = logger + self.intercept_stdout = intercept_stdout + + def write(self, message): + """Redirect the original message to the logger.""" + self.logger.info(message) + + def flush(self): + for handler in self.logger.handlers: + handler.flush() + + def isatty(self): + # Return the standard out isatty. This is used by colorful. + fd = 1 if self.intercept_stdout else 2 + return os.isatty(fd) diff --git a/python/ray/reporter.py b/python/ray/reporter.py index a53bdda07..b8bbae75d 100644 --- a/python/ray/reporter.py +++ b/python/ray/reporter.py @@ -297,7 +297,7 @@ if __name__ == "__main__": default=ray_constants.LOGGER_FORMAT, help=ray_constants.LOGGER_FORMAT_HELP) args = parser.parse_args() - ray.utils.setup_logger(args.logging_level, args.logging_format) + ray.ray_logging.setup_logger(args.logging_level, args.logging_format) reporter = Reporter( args.redis_address, diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index b91431c07..cd3ba418f 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -97,7 +97,7 @@ def add_click_options(options): @click.version_option() def cli(logging_level, logging_format): level = logging.getLevelName(logging_level.upper()) - ray.utils.setup_logger(level, logging_format) + ray.ray_logging.setup_logger(level, logging_format) cli_logger.set_format(format_tmpl=logging_format) diff --git a/python/ray/utils.py b/python/ray/utils.py index e199db116..fb8a1964c 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -388,23 +388,6 @@ def resources_from_resource_arguments( return resources -_default_handler = None - - -def setup_logger(logging_level, logging_format): - """Setup default logging for ray.""" - logger = logging.getLogger("ray") - if type(logging_level) is str: - logging_level = 
logging.getLevelName(logging_level.upper()) - logger.setLevel(logging_level) - global _default_handler - if _default_handler is None: - _default_handler = logging.StreamHandler() - logger.addHandler(_default_handler) - _default_handler.setFormatter(logging.Formatter(logging_format)) - logger.propagate = False - - class Unbuffered(object): """There's no "built-in" solution to programatically disabling buffering of text files. Ray expects stdout/err to be text files, so creating an @@ -447,32 +430,6 @@ def open_log(path, unbuffered=False, **kwargs): return stream -def create_and_init_new_worker_log(path, worker_pid): - """Opens or creates and sets up a new worker log file. Note that because we - expect to dup the underlying file descriptor, then fdopen it, the python - level metadata is not important. - - Args: - path (str): The name/path of the file to be opened. - worker_pid (int): The pid of the worker process. - - Returns: - A file-like object which can be written to. - - """ - # TODO (Alex): We should eventually be able to replace this with - # named-pipes. - f = open_log(path) - # Check to see if we're creating this file. No one else should ever write - # to this file, so we don't have to worry about TOCTOU. - if f.tell() == 0: - # This should always be the first message to appear in the worker's - # stdout and stderr log files. The string "Ray worker pid:" is - # parsed in the log monitor process. - print(f"Ray worker pid: {worker_pid}", file=f) - return f - - def get_system_memory(): """Return the total amount of system memory in bytes. 
diff --git a/python/ray/worker.py b/python/ray/worker.py index d22d4ccd6..dc3a74815 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -47,8 +47,8 @@ from ray.exceptions import ( ObjectStoreFullError, ) from ray.function_manager import FunctionActorManager -from ray.utils import (_random_string, check_oversized_pickle, is_cython, - setup_logger, create_and_init_new_worker_log, open_log) +from ray.ray_logging import setup_logger +from ray.utils import _random_string, check_oversized_pickle, is_cython SCRIPT_MODE = 0 WORKER_MODE = 1 @@ -887,68 +887,6 @@ last_task_error_raise_time = 0 UNCAUGHT_ERROR_GRACE_PERIOD = 5 -def _set_log_file(file_name, worker_pid, old_obj, setter_func): - # Line-buffer the output (mode 1). - f = create_and_init_new_worker_log(file_name, worker_pid) - - # TODO (Alex): Python seems to always flush when writing. If that is no - # longer true, then we need to manually flush the old buffer. - # old_obj.flush() - - # TODO (Alex): Flush the c/c++ userspace buffers if necessary. - # `fflush(stdout); cout.flush();` - - fileno = old_obj.fileno() - - # C++ logging requires redirecting the stdout file descriptor. Note that - # dup2 will automatically close the old file descriptor before overriding - # it. - os.dup2(f.fileno(), fileno) - - # We also manually set sys.stdout and sys.stderr because that seems to - # have an effect on the output buffering. Without doing this, stdout - # and stderr are heavily buffered resulting in seemingly lost logging - # statements. We never want to close the stdout file descriptor, dup2 will - # close it when necessary and we don't want python's GC to close it. - setter_func(open_log(fileno, unbuffered=True, closefd=False)) - - return os.path.abspath(f.name) - - -def set_log_file(stdout_name, stderr_name): - """Sets up logging for the current worker, creating the (fd backed) file and - flushing buffers as is necessary. - - Args: - stdout_name (str): The file name that stdout should be written to. 
- stderr_name(str): The file name that stderr should be written to. - - Returns: - (tuple) The absolute paths of the files that stdout and stderr will be - written to. - - """ - stdout_path = "" - stderr_path = "" - worker_pid = os.getpid() - - # lambda cannot contain assignment - def stdout_setter(x): - sys.stdout = x - - def stderr_setter(x): - sys.stderr = x - - if stdout_name: - _set_log_file(stdout_name, worker_pid, sys.stdout, stdout_setter) - - # The stderr case should be analogous to the stdout case - if stderr_name: - _set_log_file(stderr_name, worker_pid, sys.stderr, stderr_setter) - - return stdout_path, stderr_path - - def print_logs(redis_client, threads_stopped, job_id): """Prints log messages from workers on all of the nodes. @@ -1227,23 +1165,6 @@ def connect(node, import __main__ as main driver_name = (main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE") - elif (mode == WORKER_MODE or mode == SPILL_WORKER_MODE - or mode == RESTORE_WORKER_MODE): - # Check the RedirectOutput key in Redis and based on its value redirect - # worker output and error to their own files. - # This key is set in services.py when Redis is started. - redirect_worker_output_val = worker.redis_client.get("RedirectOutput") - if (redirect_worker_output_val is not None - and int(redirect_worker_output_val) == 1): - log_stdout_file_name, log_stderr_file_name = ( - node.get_job_redirected_log_file(worker.worker_id)) - try: - log_stdout_file_path, log_stderr_file_path = \ - set_log_file(log_stdout_file_name, log_stderr_file_name) - except IOError: - raise IOError( - "Workers must be able to redirect their output at" - "the file descriptor level.") elif not LOCAL_MODE: raise ValueError( "Invalid worker mode. 
Expected DRIVER, WORKER or LOCAL.") diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 4080b6963..587a35386 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -1,9 +1,12 @@ import argparse import base64 import json +import logging import time import sys import os +from contextlib import redirect_stdout, redirect_stderr +from logging.handlers import RotatingFileHandler import ray import ray.actor @@ -11,6 +14,142 @@ import ray.node import ray.ray_constants as ray_constants import ray.utils from ray.parameter import RayParams +from ray.ray_logging import StandardStreamInterceptor + + +def setup_and_get_worker_interceptor_logger(is_for_stdout: bool = True): + """Setup a logger to be used to intercept worker log messages. + + NOTE: The method is not idempotent. + + Ray worker logs should be treated in a special way because + there's a need to intercept stdout and stderr to support various + ray features. For example, ray will prepend 0 or 1 at the beginning + of each log message to decide if logs should be streamed to drivers. + + This logger will also setup the RotatingFileHandler for + ray worker processes. + + Args: + is_for_stdout(bool): True if logger will be used to intercept stdout. + False otherwise. + """ + file_extension = "out" if is_for_stdout else "err" + logger = logging.getLogger(f"ray_default_worker_{file_extension}") + logger.setLevel(logging.INFO) + # TODO(sang): This is how the job id is propagated to workers now. + # But eventually, it will be clearer to just pass the job id. + job_id = os.environ.get("RAY_JOB_ID") + if args.worker_type == "WORKER": + assert job_id is not None, ( + "RAY_JOB_ID should be set as an env " + "variable within default_worker.py. If you see this error, " + "please report it to Ray's Github issue.") + worker_name = "worker" + else: + job_id = ray.JobID.nil() + worker_name = "io_worker" + + # Make sure these values are set already. 
+ assert ray.worker._global_node is not None + assert ray.worker.global_worker is not None + handler = RotatingFileHandler( + f"{ray.worker._global_node.get_session_dir_path()}/logs/" + f"{worker_name}-" + f"{ray.utils.binary_to_hex(ray.worker.global_worker.worker_id)}-" + f"{job_id}-{os.getpid()}.{file_extension}") + logger.addHandler(handler) + # TODO(sang): Add 0 or 1 to decide whether + # or not logs are streamed to drivers. + handler.setFormatter(logging.Formatter("%(message)s")) + # Prevent messages from being propagated to parent loggers. + logger.propagate = False + # Remove the terminator. It is important because we don't want this + # logger to add a newline at the end of the string. + handler.terminator = "" + return logger + + +def main(args): + ray.ray_logging.setup_logger(args.logging_level, args.logging_format) + + if args.worker_type == "WORKER": + mode = ray.WORKER_MODE + elif args.worker_type == "SPILL_WORKER": + mode = ray.SPILL_WORKER_MODE + elif args.worker_type == "RESTORE_WORKER": + mode = ray.RESTORE_WORKER_MODE + else: + raise ValueError("Unknown worker type: " + args.worker_type) + + # NOTE(suquark): We must initialize the external storage before we + # connect to raylet. Otherwise we may receive requests before the + # external storage is initialized. 
+ if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE: + from ray import external_storage + if args.object_spilling_config: + object_spilling_config = base64.b64decode( + args.object_spilling_config) + object_spilling_config = json.loads(object_spilling_config) + else: + object_spilling_config = {} + external_storage.setup_external_storage(object_spilling_config) + + raylet_ip_address = args.raylet_ip_address + if raylet_ip_address is None: + raylet_ip_address = args.node_ip_address + + code_search_path = args.code_search_path + if code_search_path is not None: + for p in code_search_path.split(":"): + if os.path.isfile(p): + p = os.path.dirname(p) + sys.path.append(p) + + ray_params = RayParams( + node_ip_address=args.node_ip_address, + raylet_ip_address=raylet_ip_address, + node_manager_port=args.node_manager_port, + redis_address=args.redis_address, + redis_password=args.redis_password, + plasma_store_socket_name=args.object_store_name, + raylet_socket_name=args.raylet_name, + temp_dir=args.temp_dir, + load_code_from_local=args.load_code_from_local, + metrics_agent_port=args.metrics_agent_port, + ) + + node = ray.node.Node( + ray_params, + head=False, + shutdown_at_exit=False, + spawn_reaper=False, + connect_only=True) + ray.worker._global_node = node + ray.worker.connect(node, mode=mode) + + # Redirect stdout and stderr to the default worker interceptor logger. + # NOTE: We deprecated redirect_worker_output arg, + # so we don't need to handle here. 
+ stdout_interceptor = StandardStreamInterceptor( + setup_and_get_worker_interceptor_logger(is_for_stdout=True), + intercept_stdout=True) + stderr_interceptor = StandardStreamInterceptor( + setup_and_get_worker_interceptor_logger(is_for_stdout=False), + intercept_stdout=False) + with redirect_stdout(stdout_interceptor): + with redirect_stderr(stderr_interceptor): + if mode == ray.WORKER_MODE: + ray.worker.global_worker.main_loop() + elif (mode == ray.RESTORE_WORKER_MODE + or mode == ray.SPILL_WORKER_MODE): + # It is handled by another thread in the C++ core worker. + # We just need to keep the worker alive. + while True: + time.sleep(100000) + else: + raise ValueError(f"Unexcepted worker mode: {mode}") + parser = argparse.ArgumentParser( description=("Parse addresses for the worker " @@ -110,70 +249,4 @@ parser.add_argument( "Java and `PYTHONPATH` in Python.") if __name__ == "__main__": args = parser.parse_args() - - ray.utils.setup_logger(args.logging_level, args.logging_format) - - if args.worker_type == "WORKER": - mode = ray.WORKER_MODE - elif args.worker_type == "SPILL_WORKER": - mode = ray.SPILL_WORKER_MODE - elif args.worker_type == "RESTORE_WORKER": - mode = ray.RESTORE_WORKER_MODE - else: - raise ValueError("Unknown worker type: " + args.worker_type) - - # NOTE(suquark): We must initialize the external storage before we - # connect to raylet. Otherwise we may receive requests before the - # external storage is intialized. 
- if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE: - from ray import external_storage - if args.object_spilling_config: - object_spilling_config = base64.b64decode( - args.object_spilling_config) - object_spilling_config = json.loads(object_spilling_config) - else: - object_spilling_config = {} - external_storage.setup_external_storage(object_spilling_config) - - raylet_ip_address = args.raylet_ip_address - if raylet_ip_address is None: - raylet_ip_address = args.node_ip_address - - code_search_path = args.code_search_path - if code_search_path is not None: - for p in code_search_path.split(":"): - if os.path.isfile(p): - p = os.path.dirname(p) - sys.path.append(p) - - ray_params = RayParams( - node_ip_address=args.node_ip_address, - raylet_ip_address=raylet_ip_address, - node_manager_port=args.node_manager_port, - redis_address=args.redis_address, - redis_password=args.redis_password, - plasma_store_socket_name=args.object_store_name, - raylet_socket_name=args.raylet_name, - temp_dir=args.temp_dir, - load_code_from_local=args.load_code_from_local, - metrics_agent_port=args.metrics_agent_port, - ) - - node = ray.node.Node( - ray_params, - head=False, - shutdown_at_exit=False, - spawn_reaper=False, - connect_only=True) - ray.worker._global_node = node - - ray.worker.connect(node, mode=mode) - if mode == ray.WORKER_MODE: - ray.worker.global_worker.main_loop() - elif mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE: - # It is handled by another thread in the C++ core worker. - # We just need to keep the worker alive. - while True: - time.sleep(100000) - else: - raise ValueError(f"Unexcepted worker mode: {mode}") + main(args)