Reviewed copy of the log_monitor.py patch.

NOTE(review): leading whitespace and blank context lines were rebuilt from a
whitespace-collapsed copy of the patch, using the hunk line counts as a
guide. Confirm with `git apply --check` before applying.

Changes relative to the submitted patch (added lines only):
  * The glob pattern "worker*[.out|.err]" is replaced by "worker*" plus an
    explicit (".out", ".err") suffix test. In a glob, "[...]" is a character
    class, not alternation, so the old pattern also matched names such as
    "worker.txt".
  * The read-error handler now logs file_info.filename and
    file_info.file_handle.tell(). LogFileInfo is constructed with
    filename=..., and no full_path or nested file_info attribute appears
    anywhere else in this patch, so the old handler would presumably have
    raised AttributeError and masked the real error.
  * Comment typo "monior" fixed.
  * "+" side hunk offsets/counts are updated for the new line counts. The
    post-image hash on the index line is left as submitted and no longer
    corresponds to the patched file.

diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py
index 9a058b3a9..421231686 100644
--- a/python/ray/log_monitor.py
+++ b/python/ray/log_monitor.py
@@ -4,6 +4,7 @@ from __future__ import print_function
 
 import argparse
 import errno
+import glob
 import json
 import logging
 import os
@@ -89,18 +90,21 @@ class LogMonitor(object):
 
     def update_log_filenames(self):
         """Update the list of log files to monitor."""
-        log_filenames = os.listdir(self.logs_dir)
-
-        for log_filename in log_filenames:
-            full_path = os.path.join(self.logs_dir, log_filename)
-            if full_path not in self.log_filenames:
-                self.log_filenames.add(full_path)
+        # We only monitor worker log files. glob has no alternation syntax
+        # ("[.out|.err]" is a character class), so filter on the suffix.
+        log_file_paths = glob.glob(os.path.join(self.logs_dir, "worker*"))
+        for file_path in log_file_paths:
+            if (file_path.endswith((".out", ".err"))
+                    and os.path.isfile(file_path)
+                    and file_path not in self.log_filenames):
+                self.log_filenames.add(file_path)
                 self.closed_file_infos.append(
                     LogFileInfo(
-                        filename=full_path,
+                        filename=file_path,
                         size_when_last_opened=0,
                         file_position=0,
                         file_handle=None))
+                log_filename = os.path.basename(file_path)
                 logger.info("Beginning to track file {}".format(log_filename))
 
     def open_closed_files(self):
@@ -172,20 +176,21 @@ class LogMonitor(object):
             lines_to_publish = []
             max_num_lines_to_read = 100
             for _ in range(max_num_lines_to_read):
-                next_line = file_info.file_handle.readline()
-                if next_line == "":
-                    break
-                if next_line[-1] == "\n":
-                    next_line = next_line[:-1]
-                lines_to_publish.append(next_line)
+                try:
+                    next_line = file_info.file_handle.readline()
+                    if next_line == "":
+                        break
+                    if next_line[-1] == "\n":
+                        next_line = next_line[:-1]
+                    lines_to_publish.append(next_line)
+                except Exception:
+                    logger.error("Error: Reading file: {}, position: {} "
+                                 "failed.".format(
+                                     file_info.filename,
+                                     file_info.file_handle.tell()))
+                    raise
 
-            # Publish the lines if this is a worker process.
-            filename = file_info.filename.split("/")[-1]
-            is_worker = (filename.startswith("worker")
-                         and (filename.endswith("out")
-                              or filename.endswith("err")))
-
-            if is_worker and file_info.file_position == 0:
+            if file_info.file_position == 0:
                 if (len(lines_to_publish) > 0 and
                         lines_to_publish[0].startswith("Ray worker pid: ")):
                     file_info.worker_pid = int(
@@ -195,7 +200,7 @@ class LogMonitor(object):
             # Record the current position in the file.
             file_info.file_position = file_info.file_handle.tell()
 
-            if len(lines_to_publish) > 0 and is_worker:
+            if len(lines_to_publish) > 0:
                 self.redis_client.publish(
                     ray.gcs_utils.LOG_FILE_CHANNEL,
                     json.dumps({