# Review notes for this patch. These lines precede the first "diff --git"
# header and belong to no hunk.
#
# python/ray/ray_logging.py
#   * Adds get_worker_log_file_name(worker_type): returns the string
#     "<worker_name>-<hex worker id>-<job_id>-<pid>". For worker_type
#     "WORKER" the job id is read from the RAY_JOB_ID environment variable
#     (asserted to be set) and worker_name is "worker"; for any other type
#     job_id is ray.JobID.nil() and worker_name is "io_worker". It also
#     asserts that ray.worker._global_node and ray.worker.global_worker are
#     already set.
#   * Adds configure_log_file(out_file, err_file): dup2()s the two given
#     files onto the file descriptors of sys.stdout / sys.stderr, then
#     rebinds sys.stdout / sys.stderr to ray.utils.open_log(fd,
#     unbuffered=True, closefd=False).
#
# python/ray/workers/default_worker.py
#   * Drops the StandardStreamInterceptor-based redirection and instead calls
#     node.get_log_file_handles(get_worker_log_file_name(args.worker_type))
#     followed by configure_log_file(out_file, err_file), after
#     ray.worker.connect(...).
#
# NOTE(review): the new code relies on `ray`, `sys`, `os` and `binary_to_hex`
#   being in scope in ray_logging.py; no import change appears in these
#   hunks -- presumably they are already imported; verify.
# NOTE(review): for non-"WORKER" types the f-string interpolates the object
#   returned by ray.JobID.nil() directly -- confirm its string form is the
#   one wanted in the log file name.
# NOTE(review): the preconditions are plain `assert` statements, which
#   Python skips when run with -O.
# NOTE(review): line breaks and indentation below were reconstructed from a
#   whitespace-collapsed copy of this patch; confirm context-line whitespace
#   against the target tree before applying.
diff --git a/python/ray/ray_logging.py b/python/ray/ray_logging.py
index e5f84eb2a..0668f397f 100644
--- a/python/ray/ray_logging.py
+++ b/python/ray/ray_logging.py
@@ -153,6 +153,46 @@ class StandardFdRedirectionRotatingFileHandler(RotatingFileHandler):
         os.dup2(self.stream.fileno(), self.get_original_stream().fileno())
 
 
+def get_worker_log_file_name(worker_type):
+    job_id = os.environ.get("RAY_JOB_ID")
+    if worker_type == "WORKER":
+        assert job_id is not None, (
+            "RAY_JOB_ID should be set as an env "
+            "variable within default_worker.py. If you see this error, "
+            "please report it to Ray's Github issue.")
+        worker_name = "worker"
+    else:
+        job_id = ray.JobID.nil()
+        worker_name = "io_worker"
+
+    # Make sure these values are set already.
+    assert ray.worker._global_node is not None
+    assert ray.worker.global_worker is not None
+    filename = (f"{worker_name}-"
+                f"{binary_to_hex(ray.worker.global_worker.worker_id)}-"
+                f"{job_id}-{os.getpid()}")
+    return filename
+
+
+def configure_log_file(out_file, err_file):
+    stdout_fileno = sys.stdout.fileno()
+    stderr_fileno = sys.stderr.fileno()
+    # C++ logging requires redirecting the stdout file descriptor. Note that
+    # dup2 will automatically close the old file descriptor before overriding
+    # it.
+    os.dup2(out_file.fileno(), stdout_fileno)
+    os.dup2(err_file.fileno(), stderr_fileno)
+    # We also manually set sys.stdout and sys.stderr because that seems to
+    # have an effect on the output buffering. Without doing this, stdout
+    # and stderr are heavily buffered resulting in seemingly lost logging
+    # statements. We never want to close the stdout file descriptor, dup2 will
+    # close it when necessary and we don't want python's GC to close it.
+    sys.stdout = ray.utils.open_log(
+        stdout_fileno, unbuffered=True, closefd=False)
+    sys.stderr = ray.utils.open_log(
+        stderr_fileno, unbuffered=True, closefd=False)
+
+
 def setup_and_get_worker_interceptor_logger(args,
                                             max_bytes=0,
                                             backup_count=0,
diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py
index 8b63b2c81..d9f7837ff 100644
--- a/python/ray/workers/default_worker.py
+++ b/python/ray/workers/default_worker.py
@@ -11,8 +11,7 @@ import ray.node
 import ray.ray_constants as ray_constants
 import ray.utils
 from ray.parameter import RayParams
-from ray.ray_logging import (StandardStreamInterceptor,
-                             setup_and_get_worker_interceptor_logger)
+from ray.ray_logging import get_worker_log_file_name, configure_log_file
 
 parser = argparse.ArgumentParser(
     description=("Parse addresses for the worker "
@@ -175,20 +174,10 @@ if __name__ == "__main__":
     ray.worker._global_node = node
     ray.worker.connect(node, mode=mode)
 
-    # Redirect stdout and stderr to the default worker interceptor logger.
-    # NOTE: We deprecated redirect_worker_output arg,
-    # so we don't need to handle here.
-    stdout_interceptor = StandardStreamInterceptor(
-        setup_and_get_worker_interceptor_logger(args, is_for_stdout=True),
-        intercept_stdout=True)
-    stderr_interceptor = StandardStreamInterceptor(
-        setup_and_get_worker_interceptor_logger(args, is_for_stdout=False),
-        intercept_stdout=False)
-    # Although the os level fd is duplicated already, we should overwrite
-    # the python level stdout/stderr object.
-    # Otherwise, buffers won't be flushed.
-    sys.stdout = stdout_interceptor
-    sys.stderr = stderr_interceptor
+    # Setup log file.
+    out_file, err_file = node.get_log_file_handles(
+        get_worker_log_file_name(args.worker_type))
+    configure_log_file(out_file, err_file)
 
     if mode == ray.WORKER_MODE:
         ray.worker.global_worker.main_loop()