Fix log monitor read error (#5221)

This commit is contained in:
Xianyang Liu
2019-08-02 06:47:10 +08:00
committed by Robert Nishihara
parent 20450a4e82
commit 3ae54a2b20
+25 -21
View File
@@ -4,6 +4,7 @@ from __future__ import print_function
import argparse
import errno
import glob
import json
import logging
import os
@@ -89,18 +90,20 @@ class LogMonitor(object):
def update_log_filenames(self):
"""Update the list of log files to monitor."""
log_filenames = os.listdir(self.logs_dir)
for log_filename in log_filenames:
full_path = os.path.join(self.logs_dir, log_filename)
if full_path not in self.log_filenames:
self.log_filenames.add(full_path)
# we only monior worker log files
log_file_paths = glob.glob("{}/worker*[.out|.err]".format(
self.logs_dir))
for file_path in log_file_paths:
if os.path.isfile(
file_path) and file_path not in self.log_filenames:
self.log_filenames.add(file_path)
self.closed_file_infos.append(
LogFileInfo(
filename=full_path,
filename=file_path,
size_when_last_opened=0,
file_position=0,
file_handle=None))
log_filename = os.path.basename(file_path)
logger.info("Beginning to track file {}".format(log_filename))
def open_closed_files(self):
@@ -172,20 +175,21 @@ class LogMonitor(object):
lines_to_publish = []
max_num_lines_to_read = 100
for _ in range(max_num_lines_to_read):
next_line = file_info.file_handle.readline()
if next_line == "":
break
if next_line[-1] == "\n":
next_line = next_line[:-1]
lines_to_publish.append(next_line)
try:
next_line = file_info.file_handle.readline()
if next_line == "":
break
if next_line[-1] == "\n":
next_line = next_line[:-1]
lines_to_publish.append(next_line)
except Exception:
logger.error("Error: Reading file: {}, position: {} "
"failed.".format(
file_info.full_path,
file_info.file_info.file_handle.tell()))
raise
# Publish the lines if this is a worker process.
filename = file_info.filename.split("/")[-1]
is_worker = (filename.startswith("worker")
and (filename.endswith("out")
or filename.endswith("err")))
if is_worker and file_info.file_position == 0:
if file_info.file_position == 0:
if (len(lines_to_publish) > 0 and
lines_to_publish[0].startswith("Ray worker pid: ")):
file_info.worker_pid = int(
@@ -195,7 +199,7 @@ class LogMonitor(object):
# Record the current position in the file.
file_info.file_position = file_info.file_handle.tell()
if len(lines_to_publish) > 0 and is_worker:
if len(lines_to_publish) > 0:
self.redis_client.publish(
ray.gcs_utils.LOG_FILE_CHANNEL,
json.dumps({