Put all log files in redis and visualize them in UI. (#350)

* Start process for monitoring log files and push changes to redis.

* Display log files in UI.

* Bug fix for recent tasks.

* Use flatbuffers to parse local scheduler heartbeats.
This commit is contained in:
Robert Nishihara
2017-03-16 15:27:00 -07:00
committed by Philipp Moritz
parent 3333e1d6b9
commit f1d4dda8cb
6 changed files with 320 additions and 26 deletions
+85
View File
@@ -0,0 +1,85 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os
import redis
import time
from ray.services import get_ip_address
from ray.services import get_port
class LogMonitor(object):
"""A monitor process for monitoring Ray log files.
Attributes:
node_ip_address: The IP address of the node that the log monitor process is
running on. This will be used to determine which log files to track.
redis_client: A client used to communicate with the Redis server.
log_filenames: A list of the names of the log files that this monitor
process is monitoring.
log_files: A dictionary mapping the name of a log file to a list of strings
representing its contents.
log_file_handles: A dictionary mapping the name of a log file to a file
handle for that file.
"""
def __init__(self, redis_ip_address, redis_port, node_ip_address):
"""Initialize the log monitor object."""
self.node_ip_address = node_ip_address
self.redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port)
self.log_files = {}
self.log_file_handles = {}
def update_log_filenames(self):
"""Get the most up-to-date list of log files to monitor from Redis."""
num_current_log_files = len(self.log_files)
new_log_filenames = self.redis_client.lrange("LOG_FILENAMES:{}".format(self.node_ip_address),
num_current_log_files, -1)
for log_filename in new_log_filenames:
print("Beginning to track file {}".format(log_filename))
assert log_filename not in self.log_files
self.log_files[log_filename] = []
def check_log_files_and_push_updates(self):
"""Get any changes to the log files and push updates to Redis."""
for log_filename in self.log_files:
if log_filename in self.log_file_handles:
# Get any updates to the file.
new_lines = self.log_file_handles[log_filename].readlines()
# If there are any new lines, cache them and also push them to Redis.
if len(new_lines) > 0:
self.log_files[log_filename] += new_lines
redis_key = "LOGFILE:{}:{}".format(self.node_ip_address, log_filename.decode("ascii"))
self.redis_client.rpush(redis_key, *new_lines)
else:
try:
self.log_file_handles[log_filename] = open(log_filename, "r")
except FileNotFoundError:
print("Warning: The file {} was not found.".format(log_filename))
def run(self):
"""Run the log monitor.
This will query Redis once every second to check if there are new log files
to monitor. It will also store those log files in Redis.
"""
while True:
self.update_log_filenames()
self.check_log_files_and_push_updates()
time.sleep(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description=("Parse Redis server for the "
"log monitor to connect to."))
parser.add_argument("--redis-address", required=True, type=str,
help="The address to use for Redis.")
parser.add_argument("--node-ip-address", required=True, type=str,
help="The IP address of the node this process is on.")
args = parser.parse_args()
redis_ip_address = get_ip_address(args.redis_address)
redis_port = get_port(args.redis_address)
log_monitor = LogMonitor(redis_ip_address, redis_port, args.node_ip_address)
log_monitor.run()
+102 -11
View File
@@ -22,6 +22,7 @@ import ray.plasma
import ray.global_scheduler as global_scheduler
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
PROCESS_TYPE_WORKER = "worker"
PROCESS_TYPE_LOCAL_SCHEDULER = "local_scheduler"
PROCESS_TYPE_PLASMA_MANAGER = "plasma_manager"
@@ -36,6 +37,7 @@ PROCESS_TYPE_WEB_UI = "web_ui"
# terminated when Ray exits, and certain orders will cause errors to be logged
# to the screen.
all_processes = OrderedDict([(PROCESS_TYPE_MONITOR, []),
(PROCESS_TYPE_LOG_MONITOR, []),
(PROCESS_TYPE_WORKER, []),
(PROCESS_TYPE_LOCAL_SCHEDULER, []),
(PROCESS_TYPE_PLASMA_MANAGER, []),
@@ -164,6 +166,27 @@ def get_node_ip_address(address="8.8.8.8:53"):
s.connect((ip_address, int(port)))
return s.getsockname()[0]
def record_log_files_in_redis(redis_address, node_ip_address, log_files):
"""Record in Redis that a new log file has been created.
This is used so that each log monitor can check Redis and figure out which
log files it is reponsible for monitoring.
Args:
redis_address: The address of the redis server.
node_ip_address: The IP address of the node that the log file exists on.
log_files: A list of file handles for the log files. If one of the file
handles is None, we ignore it.
"""
for log_file in log_files:
if log_file is not None:
redis_ip_address, redis_port = redis_address.split(":")
redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port)
# The name of the key storing the list of log filenames for this IP
# address.
log_file_list_key = "LOG_FILENAMES:{}".format(node_ip_address)
redis_client.rpush(log_file_list_key, log_file.name)
def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5):
"""Wait for a Redis server to be available.
@@ -197,11 +220,13 @@ def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5):
if counter == num_retries:
raise Exception("Unable to connect to Redis. If the Redis instance is on a different machine, check that your firewall is configured properly.")
def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None,
cleanup=True):
def start_redis(node_ip_address="127.0.0.1", port=None, num_retries=20,
stdout_file=None, stderr_file=None, cleanup=True):
"""Start a Redis server.
Args:
node_ip_address: The IP address of the current node. This is only used for
recording the log filenames in Redis.
port (int): If provided, start a Redis server with this port.
num_retries (int): The number of times to attempt to start Redis.
stdout_file: A file handle opened for writing to redirect stdout to. If no
@@ -261,14 +286,44 @@ def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None,
redis_client.config_set("protected-mode", "no")
# Put a time stamp in Redis to indicate when it was started.
redis_client.set("redis_start_time", time.time())
# Record the log files in Redis.
record_log_files_in_redis(address(node_ip_address, port), node_ip_address,
[stdout_file, stderr_file])
return port, p
def start_global_scheduler(redis_address, stdout_file=None, stderr_file=None,
cleanup=True):
def start_log_monitor(redis_address, node_ip_address, stdout_file=None,
stderr_file=None, cleanup=cleanup):
"""Start a log monitor process.
Args:
redis_address (str): The address of the Redis instance.
node_ip_address (str): The IP address of the node that this log monitor is
running on.
stdout_file: A file handle opened for writing to redirect stdout to. If no
redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If no
redirection should happen, then this should be None.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by services.cleanup() when the Python process
that imported services exits.
"""
log_monitor_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "log_monitor.py")
p = subprocess.Popen(["python", log_monitor_filepath,
"--redis-address", redis_address,
"--node-ip-address", node_ip_address],
stdout=stdout_file, stderr=stderr_file)
if cleanup:
all_processes[PROCESS_TYPE_LOG_MONITOR].append(p)
record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])
def start_global_scheduler(redis_address, node_ip_address, stdout_file=None,
stderr_file=None, cleanup=True):
"""Start a global scheduler process.
Args:
redis_address (str): The address of the Redis instance.
node_ip_address: The IP address of the node that this scheduler will run on.
stdout_file: A file handle opened for writing to redirect stdout to. If no
redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If no
@@ -282,14 +337,17 @@ def start_global_scheduler(redis_address, stdout_file=None, stderr_file=None,
stderr_file=stderr_file)
if cleanup:
all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p)
record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])
def start_webui(redis_address, backend_stdout_file=None,
def start_webui(redis_address, node_ip_address, backend_stdout_file=None,
backend_stderr_file=None, polymer_stdout_file=None,
polymer_stderr_file=None, cleanup=True):
"""Attempt to start the Ray web UI.
Args:
redis_address (str): The address of the Redis server.
node_ip_address: The IP address of the node that this process will run on.
backend_stdout_file: A file handle opened for writing to redirect the
backend stdout to. If no redirection should happen, then this should be
None.
@@ -326,8 +384,8 @@ def start_webui(redis_address, backend_stdout_file=None,
backend_process = subprocess.Popen([python_executable,
webui_backend_filepath,
"--redis-address", redis_address],
stdout=backend_stdout_file,
stderr=backend_stderr_file)
stdout=backend_stdout_file,
stderr=backend_stderr_file)
time.sleep(0.1)
if backend_process.poll() is not None:
@@ -369,6 +427,9 @@ def start_webui(redis_address, backend_stdout_file=None,
if cleanup:
all_processes[PROCESS_TYPE_WEB_UI].append(backend_process)
all_processes[PROCESS_TYPE_WEB_UI].append(polymer_process)
record_log_files_in_redis(redis_address, node_ip_address,
[backend_stdout_file, backend_stderr_file,
polymer_stdout_file, polymer_stderr_file])
return True
@@ -431,6 +492,8 @@ def start_local_scheduler(redis_address,
num_workers=num_workers)
if cleanup:
all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p)
record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])
return local_scheduler_name
def start_objstore(node_ip_address, redis_address, object_manager_port=None,
@@ -517,6 +580,9 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None,
if cleanup:
all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1)
all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2)
record_log_files_in_redis(redis_address, node_ip_address,
[store_stdout_file, store_stderr_file,
manager_stdout_file, manager_stderr_file])
return ObjectStoreAddress(plasma_store_name, plasma_manager_name,
plasma_manager_port)
@@ -553,13 +619,16 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name,
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
if cleanup:
all_processes[PROCESS_TYPE_WORKER].append(p)
record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])
def start_monitor(redis_address, stdout_file=None, stderr_file=None,
cleanup=True):
def start_monitor(redis_address, node_ip_address, stdout_file=None,
stderr_file=None, cleanup=True):
"""Run a process to monitor the other processes.
Args:
redis_address (str): The address that the Redis server is listening on.
node_ip_address: The IP address of the node that this process will run on.
stdout_file: A file handle opened for writing to redirect stdout to. If no
redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If no
@@ -575,6 +644,8 @@ def start_monitor(redis_address, stdout_file=None, stderr_file=None,
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
if cleanup:
all_processes[PROCESS_TYPE_WORKER].append(p)
record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])
def start_ray_processes(address_info=None,
node_ip_address="127.0.0.1",
@@ -585,6 +656,7 @@ def start_ray_processes(address_info=None,
redirect_output=False,
include_global_scheduler=False,
include_redis=False,
include_log_monitor=False,
include_webui=False,
start_workers_from_local_scheduler=True,
num_cpus=None,
@@ -613,6 +685,8 @@ def start_ray_processes(address_info=None,
start a global scheduler process.
include_redis (bool): If include_redis is True, then start a Redis server
process.
include_log_monitor (bool): If True, then start a log monitor to monitor the
log files for all processes on this node and push their contents to Redis.
include_webui (bool): If True, then attempt to start the web UI. Note that
this is only possible with Python 3.
start_workers_from_local_scheduler (bool): If this flag is True, then start
@@ -650,7 +724,8 @@ def start_ray_processes(address_info=None,
redis_stdout_file, redis_stderr_file = new_log_files("redis", redirect_output)
if redis_address is None:
# Start a Redis server. The start_redis method will choose a random port.
redis_port, _ = start_redis(stdout_file=redis_stdout_file,
redis_port, _ = start_redis(node_ip_address,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup)
redis_address = address(node_ip_address, redis_port)
@@ -671,16 +746,28 @@ def start_ray_processes(address_info=None,
# Start monitoring the processes.
monitor_stdout_file, monitor_stderr_file = new_log_files("monitor", redirect_output)
start_monitor(redis_address,
node_ip_address,
stdout_file=monitor_stdout_file,
stderr_file=monitor_stderr_file)
else:
if redis_address is None:
raise Exception("Redis address expected")
# Start the log monitor, if necessary.
if include_log_monitor:
log_monitor_stdout_file, log_monitor_stderr_file = new_log_files("log_monitor",
redirect_output=True)
start_log_monitor(redis_address,
node_ip_address,
stdout_file=log_monitor_stdout_file,
stderr_file=log_monitor_stderr_file,
cleanup=cleanup)
# Start the global scheduler, if necessary.
if include_global_scheduler:
global_scheduler_stdout_file, global_scheduler_stderr_file = new_log_files("global_scheduler", redirect_output)
start_global_scheduler(redis_address,
node_ip_address,
stdout_file=global_scheduler_stdout_file,
stderr_file=global_scheduler_stderr_file,
cleanup=cleanup)
@@ -704,7 +791,8 @@ def start_ray_processes(address_info=None,
# Start Plasma.
plasma_store_stdout_file, plasma_store_stderr_file = new_log_files("plasma_store_{}".format(i), redirect_output)
plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files("plasma_manager_{}".format(i), redirect_output)
object_store_address = start_objstore(node_ip_address, redis_address,
object_store_address = start_objstore(node_ip_address,
redis_address,
object_manager_port=object_manager_ports[i],
store_stdout_file=plasma_store_stdout_file,
store_stderr_file=plasma_store_stderr_file,
@@ -782,6 +870,7 @@ def start_ray_processes(address_info=None,
polymer_stdout_file, polymer_stderr_file = new_log_files("webui_polymer",
redirect_output=True)
successfully_started = start_webui(redis_address,
node_ip_address,
backend_stdout_file=backend_stdout_file,
backend_stderr_file=backend_stderr_file,
polymer_stdout_file=polymer_stdout_file,
@@ -839,6 +928,7 @@ def start_ray_node(node_ip_address,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
worker_path=worker_path,
include_log_monitor=True,
cleanup=cleanup,
redirect_output=redirect_output,
num_cpus=num_cpus,
@@ -892,6 +982,7 @@ def start_ray_head(address_info=None,
cleanup=cleanup,
redirect_output=redirect_output,
include_global_scheduler=True,
include_log_monitor=True,
include_redis=True,
include_webui=True,
start_workers_from_local_scheduler=start_workers_from_local_scheduler,