From ddfababb8286430b75a5c0edaeaece9db8319f88 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 27 Aug 2019 12:47:00 -0700 Subject: [PATCH] Fix log files being opened as unicode files (#5545) --- python/ray/log_monitor.py | 6 +++++- python/ray/tests/test_basic.py | 27 +++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index 8b6b3547e..6d534ff50 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -144,7 +144,7 @@ class LogMonitor(object): # file. if file_size > file_info.size_when_last_opened: try: - f = open(file_info.filename, "r") + f = open(file_info.filename, "rb") except (IOError, OSError) as e: if e.errno == errno.ENOENT: logger.warning("Warning: The file {} was not " @@ -179,6 +179,10 @@ class LogMonitor(object): for _ in range(max_num_lines_to_read): try: next_line = file_info.file_handle.readline() + # Replace any characters not in UTF-8 with + # a replacement character, see + # https://stackoverflow.com/a/38565489/10891801 + next_line = next_line.decode("utf-8", "replace") if next_line == "": break if next_line[-1] == "\n": diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 60a7864c2..3b575c1e0 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -5,6 +5,7 @@ from __future__ import print_function import collections from concurrent.futures import ThreadPoolExecutor +import glob import json import logging from multiprocessing import Process @@ -3113,3 +3114,29 @@ def test_export_after_shutdown(ray_start_regular): ray.get(actor_handle.method.remote()) ray.get(export_definitions_from_worker.remote(f, Actor)) + + +def test_invalid_unicode_in_worker_log(shutdown_only): + info = ray.init(num_cpus=1) + + logs_dir = os.path.join(info["session_dir"], "logs") + + # Wait till first worker log file is created. + while True: + log_file_paths = glob.glob("{}/worker*.out".format(logs_dir)) + if len(log_file_paths) == 0: + time.sleep(0.2) + else: + break + + with open(log_file_paths[0], "wb") as f: + f.write(b"\xe5abc\nline2\nline3\n") + f.write(b"\xe5abc\nline2\nline3\n") + f.write(b"\xe5abc\nline2\nline3\n") + f.flush() + + # Wait till the log monitor reads the file. + time.sleep(1.0) + + # Make sure that nothing has died. + assert ray.services.remaining_processes_alive()