[Logging] Remove per worker job log file / support worker log rotation (#11927)

* In progress.

* MVP done.

* In Progress.

* Remove unnecessary code.

* Fix some issues.

* Fix test failures.

* Addressed code review + fix object spilling test failure.
This commit is contained in:
SangBin Cho
2020-11-16 11:29:43 -08:00
committed by GitHub
parent b6b54f1c81
commit f56d7c1a76
11 changed files with 215 additions and 254 deletions
-14
View File
@@ -326,19 +326,6 @@ cdef prepare_args(
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress())))
def switch_worker_log_if_needed(worker, next_job_id):
    """Redirect a worker's output to the log files of ``next_job_id``.

    Does nothing unless ``worker.mode`` is ``ray.WORKER_MODE``. The switch
    only happens when the worker has no current logging job id or when it
    differs from ``next_job_id``.

    Args:
        worker: The worker whose log destination may be switched.
        next_job_id: Id of the job about to run; its ``binary()`` form is
            used to look up the per-job log file names.
    """
    if worker.mode != ray.WORKER_MODE:
        return

    needs_switch = (worker.current_logging_job_id is None
                    or worker.current_logging_job_id != next_job_id)
    if not needs_switch:
        return

    # Ask the node for the per-job stdout/stderr file names, then redirect.
    log_paths = worker.node.get_job_redirected_log_file(
        worker.worker_id, next_job_id.binary())
    job_stdout_path, job_stderr_path = log_paths
    ray.worker.set_log_file(job_stdout_path, job_stderr_path)
    worker.current_logging_job_id = next_job_id
cdef execute_task(
CTaskType task_type,
const c_string name,
@@ -472,7 +459,6 @@ cdef execute_task(
with core_worker.profile_event(b"task:execute"):
task_exception = True
try:
switch_worker_log_if_needed(worker, job_id)
with ray.worker._changeproctitle(title, next_title):
outputs = function_executor(*args, **kwargs)
task_exception = False
+1 -1
View File
@@ -946,7 +946,7 @@ if __name__ == "__main__":
default=None,
help="Specify the path of the temporary directory use by Ray process.")
args = parser.parse_args()
ray.utils.setup_logger(args.logging_level, args.logging_format)
ray.ray_logging.setup_logger(args.logging_level, args.logging_format)
# TODO(sang): Add a URL validation.
metrics_export_address = os.environ.get("METRICS_EXPORT_ADDRESS")
+9 -10
View File
@@ -30,7 +30,8 @@ class LogFileInfo:
file_position=None,
file_handle=None,
is_err_file=False,
job_id=None):
job_id=None,
worker_pid=None):
assert (filename is not None and size_when_last_opened is not None
and file_position is not None)
self.filename = filename
@@ -39,7 +40,7 @@ class LogFileInfo:
self.file_handle = file_handle
self.is_err_file = is_err_file
self.job_id = job_id
self.worker_pid = None
self.worker_pid = worker_pid
class LogMonitor:
@@ -130,8 +131,10 @@ class LogMonitor:
job_match = JOB_LOG_PATTERN.match(file_path)
if job_match:
job_id = job_match.group(2)
worker_pid = job_match.group(3)
else:
job_id = None
worker_pid = None
is_err_file = file_path.endswith("err")
@@ -143,7 +146,8 @@ class LogMonitor:
file_position=0,
file_handle=None,
is_err_file=is_err_file,
job_id=job_id))
job_id=job_id,
worker_pid=worker_pid))
log_filename = os.path.basename(file_path)
logger.info(f"Beginning to track file {log_filename}")
@@ -236,12 +240,7 @@ class LogMonitor:
raise
if file_info.file_position == 0:
if (len(lines_to_publish) > 0 and
lines_to_publish[0].startswith("Ray worker pid: ")):
file_info.worker_pid = int(
lines_to_publish[0].split(" ")[-1])
lines_to_publish = lines_to_publish[1:]
elif "/raylet" in file_info.filename:
if "/raylet" in file_info.filename:
file_info.worker_pid = "raylet"
elif "/gcs_server" in file_info.filename:
file_info.worker_pid = "gcs_server"
@@ -315,7 +314,7 @@ if __name__ == "__main__":
help="Specify the path of the temporary directory used by Ray "
"processes.")
args = parser.parse_args()
ray.utils.setup_logger(args.logging_level, args.logging_format)
ray.ray_logging.setup_logger(args.logging_level, args.logging_format)
log_monitor = LogMonitor(
args.logs_dir, args.redis_address, redis_password=args.redis_password)
+1 -1
View File
@@ -17,7 +17,7 @@ from ray.autoscaler._private.constants import \
import ray.gcs_utils
import ray.utils
import ray.ray_constants as ray_constants
from ray.utils import setup_logger
from ray.ray_logging import setup_logger
from ray._raylet import GlobalStateAccessor
import redis
-35
View File
@@ -755,41 +755,6 @@ class Node:
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
def get_job_redirected_log_file(self,
                                worker_id: bytes,
                                job_id: bytes = None):
    """Compute (without creating) the log files a worker should write to.

    Args:
        worker_id (bytes): A byte representation of the worker id.
        job_id (bytes): A byte representation of the job id. If None,
            a generic (job-independent) log file name is produced.

    Returns:
        (tuple) The stdout and stderr file names to redirect to, or
            ``(None, None)`` when output redirection is disabled.
    """
    should_redirect = self._ray_params.redirect_output
    if should_redirect is None:
        # No explicit setting: follow glog, which logs to stderr only when
        # GLOG_logtostderr is "1".
        should_redirect = os.getenv("GLOG_logtostderr") != "1"
    if not should_redirect:
        return None, None

    if job_id is None:
        name = f"worker-{ray.utils.binary_to_hex(worker_id)}-{os.getpid()}"
    else:
        name = "worker-{}-{}-{}".format(
            ray.utils.binary_to_hex(worker_id),
            ray.utils.binary_to_hex(job_id), os.getpid())

    stdout_file, stderr_file = self._get_log_file_names(name, unique=False)
    return stdout_file, stderr_file
def start_worker(self):
    """Start a worker process.

    Raises:
        NotImplementedError: Always; no implementation is provided here.
    """
    raise NotImplementedError()
+60
View File
@@ -0,0 +1,60 @@
import logging
import os
# Stream handler installed on the "ray" logger by the first setup_logger call.
_default_handler = None


def setup_logger(logging_level, logging_format):
    """Configure the default ``ray`` logger.

    Args:
        logging_level: A numeric logging level, or a level name given as a
            ``str`` (case-insensitive, e.g. ``"info"``).
        logging_format: Format string applied to the default handler.
    """
    global _default_handler

    ray_logger = logging.getLogger("ray")
    # Level names are translated to their numeric value first.
    level = (logging.getLevelName(logging_level.upper())
             if type(logging_level) is str else logging_level)
    ray_logger.setLevel(level)

    # The stream handler is created and attached once; later calls only
    # refresh its formatter.
    if _default_handler is None:
        _default_handler = logging.StreamHandler()
        ray_logger.addHandler(_default_handler)
    _default_handler.setFormatter(logging.Formatter(logging_format))

    # Stop records from being propagated to the parent logger.
    ray_logger.propagate = False
class StandardStreamInterceptor:
    """File-like object used to intercept stdout and stderr.

    Intercepted messages are handled by the given logger.

    NOTE: Messages are emitted with ``logger.info``, so the logger passed
    to this class should always have logging.INFO severity level.

    Example:
        >>> from contextlib import redirect_stdout
        >>> logger = logging.getLogger("ray_logger")
        >>> hook = StandardStreamInterceptor(logger)
        >>> with redirect_stdout(hook):
        ...     print("a")  # stdout will be delegated to logger.

    Args:
        logger: Python logger that will receive messages streamed to
            the standard out/err and delegate writes.
        intercept_stdout(bool): True if the class intercepts stdout. False
            if stderr is intercepted.
    """

    def __init__(self, logger, intercept_stdout=True):
        self.logger = logger
        self.intercept_stdout = intercept_stdout

    def write(self, message):
        """Redirect the original message to the logger.

        Returns:
            The number of characters in ``message``, matching the
            text-stream ``write`` contract so callers that check the
            return value keep working.
        """
        self.logger.info(message)
        return len(message)

    def flush(self):
        """Flush every handler attached to the wrapped logger."""
        for handler in self.logger.handlers:
            handler.flush()

    def isatty(self):
        """Report whether the intercepted standard stream is a TTY."""
        # Return the standard out/err isatty. This is used by colorful.
        fd = 1 if self.intercept_stdout else 2
        return os.isatty(fd)
+1 -1
View File
@@ -297,7 +297,7 @@ if __name__ == "__main__":
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP)
args = parser.parse_args()
ray.utils.setup_logger(args.logging_level, args.logging_format)
ray.ray_logging.setup_logger(args.logging_level, args.logging_format)
reporter = Reporter(
args.redis_address,
+1 -1
View File
@@ -97,7 +97,7 @@ def add_click_options(options):
@click.version_option()
def cli(logging_level, logging_format):
level = logging.getLevelName(logging_level.upper())
ray.utils.setup_logger(level, logging_format)
ray.ray_logging.setup_logger(level, logging_format)
cli_logger.set_format(format_tmpl=logging_format)
-43
View File
@@ -388,23 +388,6 @@ def resources_from_resource_arguments(
return resources
# Handler shared by every setup_logger call; created on first use.
_default_handler = None


def setup_logger(logging_level, logging_format):
    """Setup default logging for ray.

    Args:
        logging_level: Numeric level, or a level name passed as ``str``.
        logging_format: Format string for the default stream handler.
    """
    global _default_handler

    target = logging.getLogger("ray")
    if type(logging_level) is str:
        # Accept level names in any case, e.g. "info" or "INFO".
        logging_level = logging.getLevelName(logging_level.upper())
    target.setLevel(logging_level)

    handler_missing = _default_handler is None
    if handler_missing:
        _default_handler = logging.StreamHandler()
        target.addHandler(_default_handler)

    formatter = logging.Formatter(logging_format)
    _default_handler.setFormatter(formatter)
    target.propagate = False
class Unbuffered(object):
"""There's no "built-in" solution to programatically disabling buffering of
text files. Ray expects stdout/err to be text files, so creating an
@@ -447,32 +430,6 @@ def open_log(path, unbuffered=False, **kwargs):
return stream
def create_and_init_new_worker_log(path, worker_pid):
    """Open (creating if needed) a worker log file and write its header.

    The underlying file descriptor is expected to be dup'ed and then
    fdopen'ed by the caller, so the Python-level metadata of the returned
    object is not important.

    Args:
        path (str): The name/path of the file to be opened.
        worker_pid (int): The pid of the worker process.

    Returns:
        A file-like object which can be written to.
    """
    # TODO (Alex): We should eventually be able to replace this with
    # named-pipes.
    log_file = open_log(path)

    # A non-zero position means the file already has content, so the header
    # was written earlier. No one else should ever write to this file, so
    # TOCTOU is not a concern.
    if log_file.tell() != 0:
        return log_file

    # This should always be the first message to appear in the worker's
    # stdout and stderr log files. The string "Ray worker pid:" is parsed
    # in the log monitor process.
    print(f"Ray worker pid: {worker_pid}", file=log_file)
    return log_file
def get_system_memory():
"""Return the total amount of system memory in bytes.
+2 -81
View File
@@ -47,8 +47,8 @@ from ray.exceptions import (
ObjectStoreFullError,
)
from ray.function_manager import FunctionActorManager
from ray.utils import (_random_string, check_oversized_pickle, is_cython,
setup_logger, create_and_init_new_worker_log, open_log)
from ray.ray_logging import setup_logger
from ray.utils import _random_string, check_oversized_pickle, is_cython
SCRIPT_MODE = 0
WORKER_MODE = 1
@@ -887,68 +887,6 @@ last_task_error_raise_time = 0
UNCAUGHT_ERROR_GRACE_PERIOD = 5
def _set_log_file(file_name, worker_pid, old_obj, setter_func):
    """Redirect one standard stream to ``file_name`` at the fd level.

    Args:
        file_name: Path of the log file to redirect to.
        worker_pid: Pid passed on to the log file initialisation.
        old_obj: The stream object being replaced (its ``fileno()`` is the
            descriptor that gets overwritten).
        setter_func: Callable that installs the new Python-level stream.

    Returns:
        The absolute path of the log file.
    """
    # Line-buffer the output (mode 1).
    new_log = create_and_init_new_worker_log(file_name, worker_pid)
    # TODO (Alex): Python seems to always flush when writing. If that is no
    # longer true, then we need to manually flush the old buffer.
    # old_obj.flush()

    # TODO (Alex): Flush the c/c++ userspace buffers if necessary.
    # `fflush(stdout); cout.flush();`

    target_fd = old_obj.fileno()

    # C++ logging requires redirecting the stdout file descriptor. dup2
    # closes the old descriptor itself before overriding it.
    os.dup2(new_log.fileno(), target_fd)

    # sys.stdout / sys.stderr are also replaced manually because that seems
    # to affect output buffering; otherwise output is heavily buffered and
    # log statements appear lost. closefd=False keeps Python's GC from
    # closing the descriptor; dup2 closes it when necessary.
    replacement_stream = open_log(target_fd, unbuffered=True, closefd=False)
    setter_func(replacement_stream)

    return os.path.abspath(new_log.name)
def set_log_file(stdout_name, stderr_name):
    """Sets up logging for the current worker, creating the (fd backed) file and
    flushing buffers as is necessary.

    Args:
        stdout_name (str): The file name that stdout should be written to.
            A falsy value leaves stdout untouched.
        stderr_name(str): The file name that stderr should be written to.
            A falsy value leaves stderr untouched.

    Returns:
        (tuple) The absolute paths of the files that stdout and stderr will be
            written to. An entry is "" when the corresponding stream was
            not redirected.
    """
    stdout_path = ""
    stderr_path = ""
    worker_pid = os.getpid()

    # lambda cannot contain assignment
    def stdout_setter(x):
        sys.stdout = x

    def stderr_setter(x):
        sys.stderr = x

    if stdout_name:
        # Keep the path returned by the helper; previously it was dropped
        # and "" was always returned.
        stdout_path = _set_log_file(stdout_name, worker_pid, sys.stdout,
                                    stdout_setter)

    # The stderr case should be analogous to the stdout case
    if stderr_name:
        stderr_path = _set_log_file(stderr_name, worker_pid, sys.stderr,
                                    stderr_setter)

    return stdout_path, stderr_path
def print_logs(redis_client, threads_stopped, job_id):
"""Prints log messages from workers on all of the nodes.
@@ -1227,23 +1165,6 @@ def connect(node,
import __main__ as main
driver_name = (main.__file__
if hasattr(main, "__file__") else "INTERACTIVE MODE")
elif (mode == WORKER_MODE or mode == SPILL_WORKER_MODE
or mode == RESTORE_WORKER_MODE):
# Check the RedirectOutput key in Redis and based on its value redirect
# worker output and error to their own files.
# This key is set in services.py when Redis is started.
redirect_worker_output_val = worker.redis_client.get("RedirectOutput")
if (redirect_worker_output_val is not None
and int(redirect_worker_output_val) == 1):
log_stdout_file_name, log_stderr_file_name = (
node.get_job_redirected_log_file(worker.worker_id))
try:
log_stdout_file_path, log_stderr_file_path = \
set_log_file(log_stdout_file_name, log_stderr_file_name)
except IOError:
raise IOError(
"Workers must be able to redirect their output at"
"the file descriptor level.")
elif not LOCAL_MODE:
raise ValueError(
"Invalid worker mode. Expected DRIVER, WORKER or LOCAL.")
+140 -67
View File
@@ -1,9 +1,12 @@
import argparse
import base64
import json
import logging
import time
import sys
import os
from contextlib import redirect_stdout, redirect_stderr
from logging.handlers import RotatingFileHandler
import ray
import ray.actor
@@ -11,6 +14,142 @@ import ray.node
import ray.ray_constants as ray_constants
import ray.utils
from ray.parameter import RayParams
from ray.ray_logging import StandardStreamInterceptor
def setup_and_get_worker_interceptor_logger(is_for_stdout: bool = True):
    """Build the logger that receives a worker's intercepted stdout/stderr.

    NOTE: The method is not idempotent; every call attaches another
    handler to the logger.

    Ray worker logs are treated specially because stdout and stderr must
    be intercepted to support various ray features. For example, ray will
    prepend 0 or 1 at the beginning of each log message to decide whether
    logs should be streamed to drivers.

    The logger writes through a RotatingFileHandler to a per-worker file
    under the session's ``logs`` directory.

    NOTE: The worker type is read from the module-level ``args``, and
    ``ray.worker._global_node`` / ``ray.worker.global_worker`` must already
    be set when this is called.

    Args:
        is_for_stdout(bool): True if logger will be used to intercept stdout.
            False otherwise.

    Returns:
        The configured logger.
    """
    file_extension = "out" if is_for_stdout else "err"
    interceptor_logger = logging.getLogger(
        f"ray_default_worker_{file_extension}")
    interceptor_logger.setLevel(logging.INFO)

    # TODO(sang): This is how the job id is propagated to workers now.
    # But eventually, it will be clearer to just pass the job id.
    job_id = os.environ.get("RAY_JOB_ID")
    if args.worker_type == "WORKER":
        assert job_id is not None, (
            "RAY_JOB_ID should be set as an env "
            "variable within default_worker.py. If you see this error, "
            "please report it to Ray's Github issue.")
        worker_name = "worker"
    else:
        # Non-"WORKER" types use the nil job id in their file name.
        job_id = ray.JobID.nil()
        worker_name = "io_worker"

    # Make sure these values are set already.
    assert ray.worker._global_node is not None
    assert ray.worker.global_worker is not None

    log_path = (
        f"{ray.worker._global_node.get_session_dir_path()}/logs/"
        f"{worker_name}-"
        f"{ray.utils.binary_to_hex(ray.worker.global_worker.worker_id)}-"
        f"{job_id}-{os.getpid()}.{file_extension}")
    # NOTE(review): no maxBytes/backupCount is passed here - confirm the
    # intended rotation settings.
    handler = RotatingFileHandler(log_path)
    # TODO(sang): Add 0 or 1 to decide whether
    # or not logs are streamed to drivers.
    handler.setFormatter(logging.Formatter("%(message)s"))
    # Drop the terminator so the handler does not append a newline to
    # every intercepted write.
    handler.terminator = ""

    interceptor_logger.addHandler(handler)
    # Keep messages from being propagated to parent loggers.
    interceptor_logger.propagate = False
    return interceptor_logger
def main(args):
    """Run a Ray Python worker process.

    Configures logging, maps ``args.worker_type`` to a worker mode, sets up
    external storage for spill/restore workers, connects to the node, and
    then runs with stdout/stderr redirected to the interceptor loggers.

    Args:
        args: Parsed command-line arguments of the worker script.

    Raises:
        ValueError: If ``args.worker_type`` is not one of "WORKER",
            "SPILL_WORKER" or "RESTORE_WORKER".
    """
    ray.ray_logging.setup_logger(args.logging_level, args.logging_format)

    worker_type = args.worker_type
    if worker_type == "WORKER":
        mode = ray.WORKER_MODE
    elif worker_type == "SPILL_WORKER":
        mode = ray.SPILL_WORKER_MODE
    elif worker_type == "RESTORE_WORKER":
        mode = ray.RESTORE_WORKER_MODE
    else:
        raise ValueError("Unknown worker type: " + args.worker_type)

    is_io_worker = mode in (ray.RESTORE_WORKER_MODE, ray.SPILL_WORKER_MODE)

    # NOTE(suquark): We must initialize the external storage before we
    # connect to raylet. Otherwise we may receive requests before the
    # external storage is initialized.
    if is_io_worker:
        from ray import external_storage
        if args.object_spilling_config:
            # The config arrives base64-encoded JSON.
            object_spilling_config = json.loads(
                base64.b64decode(args.object_spilling_config))
        else:
            object_spilling_config = {}
        external_storage.setup_external_storage(object_spilling_config)

    # Fall back to the node address when no raylet address was given.
    raylet_ip_address = (args.raylet_ip_address
                         if args.raylet_ip_address is not None else
                         args.node_ip_address)

    if args.code_search_path is not None:
        for entry in args.code_search_path.split(":"):
            # A file entry contributes its containing directory.
            if os.path.isfile(entry):
                entry = os.path.dirname(entry)
            sys.path.append(entry)

    ray_params = RayParams(
        node_ip_address=args.node_ip_address,
        raylet_ip_address=raylet_ip_address,
        node_manager_port=args.node_manager_port,
        redis_address=args.redis_address,
        redis_password=args.redis_password,
        plasma_store_socket_name=args.object_store_name,
        raylet_socket_name=args.raylet_name,
        temp_dir=args.temp_dir,
        load_code_from_local=args.load_code_from_local,
        metrics_agent_port=args.metrics_agent_port,
    )

    node = ray.node.Node(
        ray_params,
        head=False,
        shutdown_at_exit=False,
        spawn_reaper=False,
        connect_only=True)
    ray.worker._global_node = node
    ray.worker.connect(node, mode=mode)

    # Redirect stdout and stderr to the default worker interceptor logger.
    # NOTE: We deprecated redirect_worker_output arg,
    # so we don't need to handle here.
    stdout_logger = setup_and_get_worker_interceptor_logger(
        is_for_stdout=True)
    stdout_interceptor = StandardStreamInterceptor(
        stdout_logger, intercept_stdout=True)
    stderr_logger = setup_and_get_worker_interceptor_logger(
        is_for_stdout=False)
    stderr_interceptor = StandardStreamInterceptor(
        stderr_logger, intercept_stdout=False)

    with redirect_stdout(stdout_interceptor), \
            redirect_stderr(stderr_interceptor):
        if mode == ray.WORKER_MODE:
            ray.worker.global_worker.main_loop()
        elif is_io_worker:
            # It is handled by another thread in the C++ core worker.
            # We just need to keep the worker alive.
            while True:
                time.sleep(100000)
        else:
            raise ValueError(f"Unexcepted worker mode: {mode}")
parser = argparse.ArgumentParser(
description=("Parse addresses for the worker "
@@ -110,70 +249,4 @@ parser.add_argument(
"Java and `PYTHONPATH` in Python.")
if __name__ == "__main__":
args = parser.parse_args()
ray.utils.setup_logger(args.logging_level, args.logging_format)
if args.worker_type == "WORKER":
mode = ray.WORKER_MODE
elif args.worker_type == "SPILL_WORKER":
mode = ray.SPILL_WORKER_MODE
elif args.worker_type == "RESTORE_WORKER":
mode = ray.RESTORE_WORKER_MODE
else:
raise ValueError("Unknown worker type: " + args.worker_type)
# NOTE(suquark): We must initialize the external storage before we
# connect to raylet. Otherwise we may receive requests before the
# external storage is intialized.
if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE:
from ray import external_storage
if args.object_spilling_config:
object_spilling_config = base64.b64decode(
args.object_spilling_config)
object_spilling_config = json.loads(object_spilling_config)
else:
object_spilling_config = {}
external_storage.setup_external_storage(object_spilling_config)
raylet_ip_address = args.raylet_ip_address
if raylet_ip_address is None:
raylet_ip_address = args.node_ip_address
code_search_path = args.code_search_path
if code_search_path is not None:
for p in code_search_path.split(":"):
if os.path.isfile(p):
p = os.path.dirname(p)
sys.path.append(p)
ray_params = RayParams(
node_ip_address=args.node_ip_address,
raylet_ip_address=raylet_ip_address,
node_manager_port=args.node_manager_port,
redis_address=args.redis_address,
redis_password=args.redis_password,
plasma_store_socket_name=args.object_store_name,
raylet_socket_name=args.raylet_name,
temp_dir=args.temp_dir,
load_code_from_local=args.load_code_from_local,
metrics_agent_port=args.metrics_agent_port,
)
node = ray.node.Node(
ray_params,
head=False,
shutdown_at_exit=False,
spawn_reaper=False,
connect_only=True)
ray.worker._global_node = node
ray.worker.connect(node, mode=mode)
if mode == ray.WORKER_MODE:
ray.worker.global_worker.main_loop()
elif mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE:
# It is handled by another thread in the C++ core worker.
# We just need to keep the worker alive.
while True:
time.sleep(100000)
else:
raise ValueError(f"Unexcepted worker mode: {mode}")
main(args)