mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 21:23:10 +08:00
[Logging] Use file handle temporalily (#12839)
This commit is contained in:
@@ -153,6 +153,46 @@ class StandardFdRedirectionRotatingFileHandler(RotatingFileHandler):
|
||||
os.dup2(self.stream.fileno(), self.get_original_stream().fileno())
|
||||
|
||||
|
||||
def get_worker_log_file_name(worker_type):
|
||||
job_id = os.environ.get("RAY_JOB_ID")
|
||||
if worker_type == "WORKER":
|
||||
assert job_id is not None, (
|
||||
"RAY_JOB_ID should be set as an env "
|
||||
"variable within default_worker.py. If you see this error, "
|
||||
"please report it to Ray's Github issue.")
|
||||
worker_name = "worker"
|
||||
else:
|
||||
job_id = ray.JobID.nil()
|
||||
worker_name = "io_worker"
|
||||
|
||||
# Make sure these values are set already.
|
||||
assert ray.worker._global_node is not None
|
||||
assert ray.worker.global_worker is not None
|
||||
filename = (f"{worker_name}-"
|
||||
f"{binary_to_hex(ray.worker.global_worker.worker_id)}-"
|
||||
f"{job_id}-{os.getpid()}")
|
||||
return filename
|
||||
|
||||
|
||||
def configure_log_file(out_file, err_file):
|
||||
stdout_fileno = sys.stdout.fileno()
|
||||
stderr_fileno = sys.stderr.fileno()
|
||||
# C++ logging requires redirecting the stdout file descriptor. Note that
|
||||
# dup2 will automatically close the old file descriptor before overriding
|
||||
# it.
|
||||
os.dup2(out_file.fileno(), stdout_fileno)
|
||||
os.dup2(err_file.fileno(), stderr_fileno)
|
||||
# We also manually set sys.stdout and sys.stderr because that seems to
|
||||
# have an effect on the output buffering. Without doing this, stdout
|
||||
# and stderr are heavily buffered resulting in seemingly lost logging
|
||||
# statements. We never want to close the stdout file descriptor, dup2 will
|
||||
# close it when necessary and we don't want python's GC to close it.
|
||||
sys.stdout = ray.utils.open_log(
|
||||
stdout_fileno, unbuffered=True, closefd=False)
|
||||
sys.stderr = ray.utils.open_log(
|
||||
stderr_fileno, unbuffered=True, closefd=False)
|
||||
|
||||
|
||||
def setup_and_get_worker_interceptor_logger(args,
|
||||
max_bytes=0,
|
||||
backup_count=0,
|
||||
|
||||
@@ -11,8 +11,7 @@ import ray.node
|
||||
import ray.ray_constants as ray_constants
|
||||
import ray.utils
|
||||
from ray.parameter import RayParams
|
||||
from ray.ray_logging import (StandardStreamInterceptor,
|
||||
setup_and_get_worker_interceptor_logger)
|
||||
from ray.ray_logging import get_worker_log_file_name, configure_log_file
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description=("Parse addresses for the worker "
|
||||
@@ -173,20 +172,10 @@ if __name__ == "__main__":
|
||||
ray.worker._global_node = node
|
||||
ray.worker.connect(node, mode=mode)
|
||||
|
||||
# Redirect stdout and stderr to the default worker interceptor logger.
|
||||
# NOTE: We deprecated redirect_worker_output arg,
|
||||
# so we don't need to handle here.
|
||||
stdout_interceptor = StandardStreamInterceptor(
|
||||
setup_and_get_worker_interceptor_logger(args, is_for_stdout=True),
|
||||
intercept_stdout=True)
|
||||
stderr_interceptor = StandardStreamInterceptor(
|
||||
setup_and_get_worker_interceptor_logger(args, is_for_stdout=False),
|
||||
intercept_stdout=False)
|
||||
# Although the os level fd is duplicated already, we should overwrite
|
||||
# the python level stdout/stderr object.
|
||||
# Otherwise, buffers won't be flushed.
|
||||
sys.stdout = stdout_interceptor
|
||||
sys.stderr = stderr_interceptor
|
||||
# Setup log file.
|
||||
out_file, err_file = node.get_log_file_handles(
|
||||
get_worker_log_file_name(args.worker_type))
|
||||
configure_log_file(out_file, err_file)
|
||||
|
||||
if mode == ray.WORKER_MODE:
|
||||
ray.worker.global_worker.main_loop()
|
||||
|
||||
Reference in New Issue
Block a user