From 12d75784a459202fefe3f9640736f2232d9f2bab Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Wed, 5 Aug 2020 13:04:36 -0700 Subject: [PATCH] [Core] test_advanced_3.py::test_logging_to_driver (round 2) (#9916) --- python/ray/tests/test_advanced_3.py | 12 ++++++--- python/ray/utils.py | 41 ++++++++++++++++++++++++++--- python/ray/worker.py | 2 +- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index f769f993c..94fde6b11 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -202,17 +202,21 @@ def test_logging_to_driver(shutdown_only): def f(): # It's important to make sure that these print statements occur even # without calling sys.stdout.flush() and sys.stderr.flush(). - for i in range(100): - print(i) - print(100 + i, file=sys.stderr) + for i in range(10): + print(i, end=" ") + print(100 + i, end=" ", file=sys.stderr) captured = {} with CaptureOutputAndError(captured): ray.get(f.remote()) time.sleep(1) + out_lines = captured["out"] err_lines = captured["err"] - for i in range(200): + for i in range(10): + assert str(i) in out_lines + + for i in range(100, 110): assert str(i) in err_lines diff --git a/python/ray/utils.py b/python/ray/utils.py index 1ce21ba5c..38965db9c 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -391,20 +391,52 @@ def setup_logger(logging_level, logging_format): logger.propagate = False -def open_log(path, **kwargs): +class Unbuffered(object): + """There's no "built-in" solution to programmatically disabling buffering of + text files. Ray expects stdout/err to be text files, so creating an + unbuffered binary file is unacceptable. + + See + https://mail.python.org/pipermail/tutor/2003-November/026645.html. 
+ https://docs.python.org/3/library/functions.html#open + + """ + + def __init__(self, stream): + self.stream = stream + + def write(self, data): + self.stream.write(data) + self.stream.flush() + + def writelines(self, datas): + self.stream.writelines(datas) + self.stream.flush() + + def __getattr__(self, attr): + return getattr(self.stream, attr) + + +def open_log(path, unbuffered=False, **kwargs): """ Opens the log file at `path`, with the provided kwargs being given to `open`. """ + # Default to line buffering, see test_advanced_3.py::test_logging_to_driver kwargs.setdefault("buffering", 1) kwargs.setdefault("mode", "a") kwargs.setdefault("encoding", "utf-8") - return open(path, **kwargs) + stream = open(path, **kwargs) + if unbuffered: + return Unbuffered(stream) + else: + return stream def create_and_init_new_worker_log(path, worker_pid): - """ - Opens a path (or creates if necessary) for a log. + """Opens or creates and sets up a new worker log file. Note that because we + expect to dup the underlying file descriptor, then fdopen it, the python + level metadata is not important. Args: path (str): The name/path of the file to be opened. @@ -412,6 +444,7 @@ def create_and_init_new_worker_log(path, worker_pid): Returns: A file-like object which can be written to. + """ # TODO (Alex): We should eventually be able to replace this with # named-pipes. diff --git a/python/ray/worker.py b/python/ray/worker.py index 05bc4e22d..6a6d7f2e1 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -931,7 +931,7 @@ def _set_log_file(file_name, worker_pid, old_obj, setter_func): # and stderr are heavily buffered resulting in seemingly lost logging # statements. We never want to close the stdout file descriptor, dup2 will # close it when necessary and we don't want python's GC to close it. - setter_func(open_log(fileno, closefd=False)) + setter_func(open_log(fileno, unbuffered=True, closefd=False)) return os.path.abspath(f.name)