Files
ray/python/ray/log_monitor.py
T
Yuhong Guo 0b6e08ebee Separate python logger module-wise (#2703)
## What do these changes do?
1. Separate the log related code to logger.py from services.py.
2. Allow users to modify logging formatter in `ray start`.

## Related issue number
https://github.com/ray-project/ray/pull/2664
2018-08-26 13:46:14 -07:00

157 lines
5.9 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import errno
import logging
import os
import time

import redis

import ray.ray_constants as ray_constants
from ray.services import get_ip_address
from ray.services import get_port
import ray.utils
# Module-level logger, named after this module. It should be configured at
# the entry point into the program using Ray. Ray configures it by default
# automatically using logging.basicConfig in its entry/init points.
logger = logging.getLogger(__name__)
class LogMonitor(object):
    """A monitor process for monitoring Ray log files.

    Attributes:
        node_ip_address: The IP address of the node that the log monitor
            process is running on. This will be used to determine which log
            files to track.
        redis_client: A client used to communicate with the Redis server.
        log_files: A dictionary mapping the name of a log file to a list of
            strings representing the lines read from it so far.
        log_file_handles: A dictionary mapping the name of a log file to a
            file handle for that file.
        files_to_ignore: A set of log file names that could not be opened
            and that will not be retried.
    """

    def __init__(self, redis_ip_address, redis_port, node_ip_address):
        """Initialize the log monitor object.

        Args:
            redis_ip_address: Host of the Redis server to connect to.
            redis_port: Port of the Redis server to connect to.
            node_ip_address: The IP address of the node this monitor runs on.
        """
        self.node_ip_address = node_ip_address
        self.redis_client = redis.StrictRedis(
            host=redis_ip_address, port=redis_port)
        self.log_files = {}
        self.log_file_handles = {}
        self.files_to_ignore = set()

    def update_log_filenames(self):
        """Get the most up-to-date list of log files to monitor from Redis."""
        # Only fetch the entries past the ones we are already tracking; the
        # Redis list is read from index len(self.log_files) to the end.
        num_current_log_files = len(self.log_files)
        new_log_filenames = self.redis_client.lrange(
            "LOG_FILENAMES:{}".format(self.node_ip_address),
            num_current_log_files, -1)
        for log_filename in new_log_filenames:
            logger.info("Beginning to track file {}".format(log_filename))
            assert log_filename not in self.log_files
            self.log_files[log_filename] = []

    @staticmethod
    def _read_new_lines(file_handle):
        """Return every line appended to file_handle since the last read."""
        new_lines = []
        while True:
            current_position = file_handle.tell()
            next_line = file_handle.readline()
            if next_line == "":
                # Nothing more to read for now. Seek back to where this
                # read started so the next poll resumes from the same spot.
                file_handle.seek(current_position)
                return new_lines
            new_lines.append(next_line)

    def _try_open_log_file(self, log_filename):
        """Open log_filename for reading, or mark it as ignored on failure.

        Raises:
            IOError: If opening fails for a reason other than "too many open
                files" (EMFILE) or "no such file" (ENOENT).
        """
        try:
            self.log_file_handles[log_filename] = open(log_filename, "r")
        except IOError as e:
            # Use the errno module directly: the undocumented os.errno
            # alias was removed in Python 3.7.
            if e.errno == errno.EMFILE:
                logger.warning(
                    "Warning: Ignoring {} because there are too "
                    "many open files.".format(log_filename))
            elif e.errno == errno.ENOENT:
                logger.warning("Warning: The file {} was not "
                               "found.".format(log_filename))
            else:
                # Bare raise keeps the original traceback intact.
                raise

            # Don't try to open this file any more.
            self.files_to_ignore.add(log_filename)

    def check_log_files_and_push_updates(self):
        """Get any changes to the log files and push updates to Redis.

        Files that are already open are read for new lines, which are cached
        in self.log_files and appended to the Redis list
        "LOGFILE:<node_ip_address>:<filename>". Files that previously failed
        to open are skipped. All other files are opened for the first time;
        their contents are picked up on the next call.
        """
        for log_filename in self.log_files:
            if log_filename in self.log_file_handles:
                new_lines = self._read_new_lines(
                    self.log_file_handles[log_filename])
                # If there are any new lines, cache them and also push them
                # to Redis.
                if new_lines:
                    self.log_files[log_filename] += new_lines
                    redis_key = "LOGFILE:{}:{}".format(
                        self.node_ip_address, ray.utils.decode(log_filename))
                    self.redis_client.rpush(redis_key, *new_lines)
            elif log_filename not in self.files_to_ignore:
                # Try to open this file for the first time.
                self._try_open_log_file(log_filename)

    def run(self):
        """Run the log monitor.

        This will query Redis once every second to check if there are new log
        files to monitor. It will also store those log files in Redis.
        This method loops forever and never returns.
        """
        while True:
            self.update_log_filenames()
            self.check_log_files_and_push_updates()
            time.sleep(1)
if __name__ == "__main__":
    # Command-line interface: where Redis lives, which node this is, and
    # how logging should be configured.
    arg_parser = argparse.ArgumentParser(
        description="Parse Redis server for the log monitor to connect to.")
    arg_parser.add_argument(
        "--redis-address",
        type=str,
        required=True,
        help="The address to use for Redis.")
    arg_parser.add_argument(
        "--node-ip-address",
        type=str,
        required=True,
        help="The IP address of the node this process is on.")
    arg_parser.add_argument(
        "--logging-level",
        type=str,
        required=False,
        choices=ray_constants.LOGGER_LEVEL_CHOICES,
        default=ray_constants.LOGGER_LEVEL,
        help=ray_constants.LOGGER_LEVEL_HELP)
    arg_parser.add_argument(
        "--logging-format",
        type=str,
        required=False,
        default=ray_constants.LOGGER_FORMAT,
        help=ray_constants.LOGGER_FORMAT_HELP)
    cli_args = arg_parser.parse_args()

    # Configure the root logger before any monitoring starts.
    level_name = cli_args.logging_level.upper()
    logging.basicConfig(
        level=logging.getLevelName(level_name),
        format=cli_args.logging_format)

    # Split the Redis address into host and port, then monitor forever.
    monitor = LogMonitor(
        get_ip_address(cli_args.redis_address),
        get_port(cli_args.redis_address),
        cli_args.node_ip_address)
    monitor.run()