[Logging] Fix tensorflow logging issue. (#12225)

* in progress.

* ip

* In Progress

* done.

* fix lint.

* Addressed code review

* Addressed code review.
This commit is contained in:
SangBin Cho
2020-11-29 22:16:52 -08:00
committed by GitHub
parent 91d54ef621
commit 3964defbe1
2 changed files with 217 additions and 140 deletions
+130
View File
@@ -1,5 +1,10 @@
import logging
import os
import sys
from logging.handlers import RotatingFileHandler
import ray
from ray.utils import binary_to_hex
_default_handler = None
@@ -20,6 +25,11 @@ def setup_logger(logging_level, logging_format):
logger.propagate = False
"""
All components underneath here is used specifically for the default_worker.py.
"""
class StandardStreamInterceptor:
"""Used to intercept stdout and stderr.
@@ -44,11 +54,14 @@ class StandardStreamInterceptor:
def __init__(self, logger, intercept_stdout=True):
self.logger = logger
assert len(self.logger.handlers) == 1, (
"Only one handler is allowed for the interceptor logger.")
self.intercept_stdout = intercept_stdout
def write(self, message):
"""Redirect the original message to the logger."""
self.logger.info(message)
return len(message)
def flush(self):
for handler in self.logger.handlers:
@@ -58,3 +71,120 @@ class StandardStreamInterceptor:
# Return the standard out isatty. This is used by colorful.
fd = 1 if self.intercept_stdout else 2
return os.isatty(fd)
def close(self):
handler = self.logger.handlers[0]
handler.close()
def fileno(self):
handler = self.logger.handlers[0]
return handler.stream.fileno()
class StandardFdRedirectionRotatingFileHandler(RotatingFileHandler):
"""RotatingFileHandler that redirects stdout and stderr to the log file.
It is specifically used to default_worker.py.
The only difference from this handler vs original RotatingFileHandler is
that it actually duplicates the OS level fd using os.dup2.
"""
def __init__(self,
filename,
mode="a",
maxBytes=0,
backupCount=0,
encoding=None,
delay=False,
is_for_stdout=True):
super().__init__(
filename,
mode=mode,
maxBytes=maxBytes,
backupCount=backupCount,
encoding=encoding,
delay=delay)
self.is_for_stdout = is_for_stdout
self.switch_os_fd()
def doRollover(self):
super().doRollover()
self.switch_os_fd()
def get_original_stream(self):
if self.is_for_stdout:
return sys.stdout
else:
return sys.stderr
def switch_os_fd(self):
# Old fd will automatically closed by dup2 when necessary.
os.dup2(self.stream.fileno(), self.get_original_stream().fileno())
def setup_and_get_worker_interceptor_logger(args,
max_bytes=0,
backup_count=0,
is_for_stdout: bool = True):
"""Setup a logger to be used to intercept worker log messages.
NOTE: This method is only meant to be used within default_worker.py.
Ray worker logs should be treated in a special way because
there's a need to intercept stdout and stderr to support various
ray features. For example, ray will prepend 0 or 1 in the beggining
of each log message to decide if logs should be streamed to driveres.
This logger will also setup the RotatingFileHandler for
ray workers processes.
If max_bytes and backup_count is not set, files will grow indefinitely.
Args:
args: args received from default_worker.py.
max_bytes(int): maxBytes argument of RotatingFileHandler.
backup_count(int): backupCount argument of RotatingFileHandler.
is_for_stdout(bool): True if logger will be used to intercept stdout.
False otherwise.
"""
file_extension = "out" if is_for_stdout else "err"
logger = logging.getLogger(f"ray_default_worker_{file_extension}")
if len(logger.handlers) == 1:
return logger
logger.setLevel(logging.INFO)
# TODO(sang): This is how the job id is propagated to workers now.
# But eventually, it will be clearer to just pass the job id.
job_id = os.environ.get("RAY_JOB_ID")
if args.worker_type == "WORKER":
assert job_id is not None, (
"RAY_JOB_ID should be set as an env "
"variable within default_worker.py. If you see this error, "
"please report it to Ray's Github issue.")
worker_name = "worker"
else:
job_id = ray.JobID.nil()
worker_name = "io_worker"
# Make sure these values are set already.
assert ray.worker._global_node is not None
assert ray.worker.global_worker is not None
filename = (f"{ray.worker._global_node.get_session_dir_path()}/logs/"
f"{worker_name}-"
f"{binary_to_hex(ray.worker.global_worker.worker_id)}-"
f"{job_id}-{os.getpid()}.{file_extension}")
handler = StandardFdRedirectionRotatingFileHandler(
filename,
maxBytes=max_bytes,
backupCount=backup_count,
is_for_stdout=is_for_stdout)
logger.addHandler(handler)
# TODO(sang): Add 0 or 1 to decide whether
# or not logs are streamed to drivers.
handler.setFormatter(logging.Formatter("%(message)s"))
# Avoid messages are propagated to parent loggers.
logger.propagate = False
# Remove the terminator. It is important because we don't want this
# logger to add a newline at the end of string.
handler.terminator = ""
return logger
+87 -140
View File
@@ -1,12 +1,9 @@
import argparse
import base64
import json
import logging
import time
import sys
import os
from contextlib import redirect_stdout, redirect_stderr
from logging.handlers import RotatingFileHandler
import ray
import ray.actor
@@ -14,142 +11,8 @@ import ray.node
import ray.ray_constants as ray_constants
import ray.utils
from ray.parameter import RayParams
from ray.ray_logging import StandardStreamInterceptor
def setup_and_get_worker_interceptor_logger(is_for_stdout: bool = True):
"""Setup a logger to be used to intercept worker log messages.
NOTE: The method is not idempotent.
Ray worker logs should be treated in a special way because
there's a need to intercept stdout and stderr to support various
ray features. For example, ray will prepend 0 or 1 in the beggining
of each log message to decide if logs should be streamed to driveres.
This logger will also setup the RotatingFileHandler for
ray workers processes.
Args:
is_for_stdout(bool): True if logger will be used to intercept stdout.
False otherwise.
"""
file_extension = "out" if is_for_stdout else "err"
logger = logging.getLogger(f"ray_default_worker_{file_extension}")
logger.setLevel(logging.INFO)
# TODO(sang): This is how the job id is propagated to workers now.
# But eventually, it will be clearer to just pass the job id.
job_id = os.environ.get("RAY_JOB_ID")
if args.worker_type == "WORKER":
assert job_id is not None, (
"RAY_JOB_ID should be set as an env "
"variable within default_worker.py. If you see this error, "
"please report it to Ray's Github issue.")
worker_name = "worker"
else:
job_id = ray.JobID.nil()
worker_name = "io_worker"
# Make sure these values are set already.
assert ray.worker._global_node is not None
assert ray.worker.global_worker is not None
handler = RotatingFileHandler(
f"{ray.worker._global_node.get_session_dir_path()}/logs/"
f"{worker_name}-"
f"{ray.utils.binary_to_hex(ray.worker.global_worker.worker_id)}-"
f"{job_id}-{os.getpid()}.{file_extension}")
logger.addHandler(handler)
# TODO(sang): Add 0 or 1 to decide whether
# or not logs are streamed to drivers.
handler.setFormatter(logging.Formatter("%(message)s"))
# Avoid messages are propagated to parent loggers.
logger.propagate = False
# Remove the terminator. It is important because we don't want this
# logger to add a newline at the end of string.
handler.terminator = ""
return logger
def main(args):
ray.ray_logging.setup_logger(args.logging_level, args.logging_format)
if args.worker_type == "WORKER":
mode = ray.WORKER_MODE
elif args.worker_type == "SPILL_WORKER":
mode = ray.SPILL_WORKER_MODE
elif args.worker_type == "RESTORE_WORKER":
mode = ray.RESTORE_WORKER_MODE
else:
raise ValueError("Unknown worker type: " + args.worker_type)
# NOTE(suquark): We must initialize the external storage before we
# connect to raylet. Otherwise we may receive requests before the
# external storage is intialized.
if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE:
from ray import external_storage
if args.object_spilling_config:
object_spilling_config = base64.b64decode(
args.object_spilling_config)
object_spilling_config = json.loads(object_spilling_config)
else:
object_spilling_config = {}
external_storage.setup_external_storage(object_spilling_config)
raylet_ip_address = args.raylet_ip_address
if raylet_ip_address is None:
raylet_ip_address = args.node_ip_address
code_search_path = args.code_search_path
if code_search_path is not None:
for p in code_search_path.split(":"):
if os.path.isfile(p):
p = os.path.dirname(p)
sys.path.append(p)
ray_params = RayParams(
node_ip_address=args.node_ip_address,
raylet_ip_address=raylet_ip_address,
node_manager_port=args.node_manager_port,
redis_address=args.redis_address,
redis_password=args.redis_password,
plasma_store_socket_name=args.object_store_name,
raylet_socket_name=args.raylet_name,
temp_dir=args.temp_dir,
load_code_from_local=args.load_code_from_local,
metrics_agent_port=args.metrics_agent_port,
)
node = ray.node.Node(
ray_params,
head=False,
shutdown_at_exit=False,
spawn_reaper=False,
connect_only=True)
ray.worker._global_node = node
ray.worker.connect(node, mode=mode)
# Redirect stdout and stderr to the default worker interceptor logger.
# NOTE: We deprecated redirect_worker_output arg,
# so we don't need to handle here.
stdout_interceptor = StandardStreamInterceptor(
setup_and_get_worker_interceptor_logger(is_for_stdout=True),
intercept_stdout=True)
stderr_interceptor = StandardStreamInterceptor(
setup_and_get_worker_interceptor_logger(is_for_stdout=False),
intercept_stdout=False)
with redirect_stdout(stdout_interceptor):
with redirect_stderr(stderr_interceptor):
if mode == ray.WORKER_MODE:
ray.worker.global_worker.main_loop()
elif (mode == ray.RESTORE_WORKER_MODE
or mode == ray.SPILL_WORKER_MODE):
# It is handled by another thread in the C++ core worker.
# We just need to keep the worker alive.
while True:
time.sleep(100000)
else:
raise ValueError(f"Unexcepted worker mode: {mode}")
from ray.ray_logging import (StandardStreamInterceptor,
setup_and_get_worker_interceptor_logger)
parser = argparse.ArgumentParser(
description=("Parse addresses for the worker "
@@ -248,5 +111,89 @@ parser.add_argument(
"the search path for user code. This will be used as `CLASSPATH` in "
"Java and `PYTHONPATH` in Python.")
if __name__ == "__main__":
# NOTE(sang): For some reason, if we move the code below
# to a separate function, tensorflow will capture that method
# as a step function. For more details, check out
# https://github.com/ray-project/ray/pull/12225#issue-525059663.
args = parser.parse_args()
main(args)
ray.ray_logging.setup_logger(args.logging_level, args.logging_format)
if args.worker_type == "WORKER":
mode = ray.WORKER_MODE
elif args.worker_type == "SPILL_WORKER":
mode = ray.SPILL_WORKER_MODE
elif args.worker_type == "RESTORE_WORKER":
mode = ray.RESTORE_WORKER_MODE
else:
raise ValueError("Unknown worker type: " + args.worker_type)
# NOTE(suquark): We must initialize the external storage before we
# connect to raylet. Otherwise we may receive requests before the
# external storage is intialized.
if mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE:
from ray import external_storage
if args.object_spilling_config:
object_spilling_config = base64.b64decode(
args.object_spilling_config)
object_spilling_config = json.loads(object_spilling_config)
else:
object_spilling_config = {}
external_storage.setup_external_storage(object_spilling_config)
raylet_ip_address = args.raylet_ip_address
if raylet_ip_address is None:
raylet_ip_address = args.node_ip_address
code_search_path = args.code_search_path
if code_search_path is not None:
for p in code_search_path.split(":"):
if os.path.isfile(p):
p = os.path.dirname(p)
sys.path.append(p)
ray_params = RayParams(
node_ip_address=args.node_ip_address,
raylet_ip_address=raylet_ip_address,
node_manager_port=args.node_manager_port,
redis_address=args.redis_address,
redis_password=args.redis_password,
plasma_store_socket_name=args.object_store_name,
raylet_socket_name=args.raylet_name,
temp_dir=args.temp_dir,
load_code_from_local=args.load_code_from_local,
metrics_agent_port=args.metrics_agent_port,
)
node = ray.node.Node(
ray_params,
head=False,
shutdown_at_exit=False,
spawn_reaper=False,
connect_only=True)
ray.worker._global_node = node
ray.worker.connect(node, mode=mode)
# Redirect stdout and stderr to the default worker interceptor logger.
# NOTE: We deprecated redirect_worker_output arg,
# so we don't need to handle here.
stdout_interceptor = StandardStreamInterceptor(
setup_and_get_worker_interceptor_logger(args, is_for_stdout=True),
intercept_stdout=True)
stderr_interceptor = StandardStreamInterceptor(
setup_and_get_worker_interceptor_logger(args, is_for_stdout=False),
intercept_stdout=False)
# Although the os level fd is duplicated already, we should overwrite
# the python level stdout/stderr object.
# Otherwise, buffers won't be flushed.
sys.stdout = stdout_interceptor
sys.stderr = stderr_interceptor
if mode == ray.WORKER_MODE:
ray.worker.global_worker.main_loop()
elif (mode == ray.RESTORE_WORKER_MODE or mode == ray.SPILL_WORKER_MODE):
# It is handled by another thread in the C++ core worker.
# We just need to keep the worker alive.
while True:
time.sleep(100000)
else:
raise ValueError(f"Unexcepted worker mode: {mode}")