diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py new file mode 100644 index 000000000..febcd9e10 --- /dev/null +++ b/python/ray/log_monitor.py @@ -0,0 +1,85 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import os +import redis +import time + +from ray.services import get_ip_address +from ray.services import get_port + +class LogMonitor(object): + """A monitor process for monitoring Ray log files. + + Attributes: + node_ip_address: The IP address of the node that the log monitor process is + running on. This will be used to determine which log files to track. + redis_client: A client used to communicate with the Redis server. + log_filenames: A list of the names of the log files that this monitor + process is monitoring. + log_files: A dictionary mapping the name of a log file to a list of strings + representing its contents. + log_file_handles: A dictionary mapping the name of a log file to a file + handle for that file. + """ + def __init__(self, redis_ip_address, redis_port, node_ip_address): + """Initialize the log monitor object.""" + self.node_ip_address = node_ip_address + self.redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port) + self.log_files = {} + self.log_file_handles = {} + + def update_log_filenames(self): + """Get the most up-to-date list of log files to monitor from Redis.""" + num_current_log_files = len(self.log_files) + new_log_filenames = self.redis_client.lrange("LOG_FILENAMES:{}".format(self.node_ip_address), + num_current_log_files, -1) + for log_filename in new_log_filenames: + print("Beginning to track file {}".format(log_filename)) + assert log_filename not in self.log_files + self.log_files[log_filename] = [] + + def check_log_files_and_push_updates(self): + """Get any changes to the log files and push updates to Redis.""" + for log_filename in self.log_files: + if log_filename in self.log_file_handles: + # Get any updates to the file. + new_lines = self.log_file_handles[log_filename].readlines() + # If there are any new lines, cache them and also push them to Redis. + if len(new_lines) > 0: + self.log_files[log_filename] += new_lines + redis_key = "LOGFILE:{}:{}".format(self.node_ip_address, log_filename.decode("ascii")) + self.redis_client.rpush(redis_key, *new_lines) + else: + try: + self.log_file_handles[log_filename] = open(log_filename, "r") + except FileNotFoundError: + print("Warning: The file {} was not found.".format(log_filename)) + + def run(self): + """Run the log monitor. + + This will query Redis once every second to check if there are new log files + to monitor. It will also store those log files in Redis. + """ + while True: + self.update_log_filenames() + self.check_log_files_and_push_updates() + time.sleep(1) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=("Parse Redis server for the " + "log monitor to connect to.")) + parser.add_argument("--redis-address", required=True, type=str, + help="The address to use for Redis.") + parser.add_argument("--node-ip-address", required=True, type=str, + help="The IP address of the node this process is on.") + args = parser.parse_args() + + redis_ip_address = get_ip_address(args.redis_address) + redis_port = get_port(args.redis_address) + + log_monitor = LogMonitor(redis_ip_address, redis_port, args.node_ip_address) + log_monitor.run() diff --git a/python/ray/services.py b/python/ray/services.py index dbc1e8fe2..ae988586d 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -22,6 +22,7 @@ import ray.plasma import ray.global_scheduler as global_scheduler PROCESS_TYPE_MONITOR = "monitor" +PROCESS_TYPE_LOG_MONITOR = "log_monitor" PROCESS_TYPE_WORKER = "worker" PROCESS_TYPE_LOCAL_SCHEDULER = "local_scheduler" PROCESS_TYPE_PLASMA_MANAGER = "plasma_manager" @@ -36,6 +37,7 @@ PROCESS_TYPE_WEB_UI = "web_ui" # terminated when Ray exits, and certain orders will cause errors to be logged # to the screen. all_processes = OrderedDict([(PROCESS_TYPE_MONITOR, []), + (PROCESS_TYPE_LOG_MONITOR, []), (PROCESS_TYPE_WORKER, []), (PROCESS_TYPE_LOCAL_SCHEDULER, []), (PROCESS_TYPE_PLASMA_MANAGER, []), @@ -164,6 +166,27 @@ def get_node_ip_address(address="8.8.8.8:53"): s.connect((ip_address, int(port))) return s.getsockname()[0] +def record_log_files_in_redis(redis_address, node_ip_address, log_files): + """Record in Redis that a new log file has been created. + + This is used so that each log monitor can check Redis and figure out which + log files it is reponsible for monitoring. + + Args: + redis_address: The address of the redis server. + node_ip_address: The IP address of the node that the log file exists on. + log_files: A list of file handles for the log files. If one of the file + handles is None, we ignore it. + """ + for log_file in log_files: + if log_file is not None: + redis_ip_address, redis_port = redis_address.split(":") + redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port) + # The name of the key storing the list of log filenames for this IP + # address. + log_file_list_key = "LOG_FILENAMES:{}".format(node_ip_address) + redis_client.rpush(log_file_list_key, log_file.name) + def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5): """Wait for a Redis server to be available. @@ -197,11 +220,13 @@ def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5): if counter == num_retries: raise Exception("Unable to connect to Redis. If the Redis instance is on a different machine, check that your firewall is configured properly.") -def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None, - cleanup=True): +def start_redis(node_ip_address="127.0.0.1", port=None, num_retries=20, + stdout_file=None, stderr_file=None, cleanup=True): """Start a Redis server. Args: + node_ip_address: The IP address of the current node. This is only used for + recording the log filenames in Redis. port (int): If provided, start a Redis server with this port. num_retries (int): The number of times to attempt to start Redis. stdout_file: A file handle opened for writing to redirect stdout to. If no @@ -261,14 +286,44 @@ def start_redis(port=None, num_retries=20, stdout_file=None, stderr_file=None, redis_client.config_set("protected-mode", "no") # Put a time stamp in Redis to indicate when it was started. redis_client.set("redis_start_time", time.time()) + # Record the log files in Redis. + record_log_files_in_redis(address(node_ip_address, port), node_ip_address, + [stdout_file, stderr_file]) return port, p -def start_global_scheduler(redis_address, stdout_file=None, stderr_file=None, - cleanup=True): +def start_log_monitor(redis_address, node_ip_address, stdout_file=None, + stderr_file=None, cleanup=cleanup): + """Start a log monitor process. + + Args: + redis_address (str): The address of the Redis instance. + node_ip_address (str): The IP address of the node that this log monitor is + running on. + stdout_file: A file handle opened for writing to redirect stdout to. If no + redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If no + redirection should happen, then this should be None. + cleanup (bool): True if using Ray in local mode. If cleanup is true, then + this process will be killed by services.cleanup() when the Python process + that imported services exits. + """ + log_monitor_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "log_monitor.py") + p = subprocess.Popen(["python", log_monitor_filepath, + "--redis-address", redis_address, + "--node-ip-address", node_ip_address], + stdout=stdout_file, stderr=stderr_file) + if cleanup: + all_processes[PROCESS_TYPE_LOG_MONITOR].append(p) + record_log_files_in_redis(redis_address, node_ip_address, + [stdout_file, stderr_file]) + +def start_global_scheduler(redis_address, node_ip_address, stdout_file=None, + stderr_file=None, cleanup=True): """Start a global scheduler process. Args: redis_address (str): The address of the Redis instance. + node_ip_address: The IP address of the node that this scheduler will run on. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If no @@ -282,14 +337,17 @@ def start_global_scheduler(redis_address, stdout_file=None, stderr_file=None, stderr_file=stderr_file) if cleanup: all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p) + record_log_files_in_redis(redis_address, node_ip_address, + [stdout_file, stderr_file]) -def start_webui(redis_address, backend_stdout_file=None, +def start_webui(redis_address, node_ip_address, backend_stdout_file=None, backend_stderr_file=None, polymer_stdout_file=None, polymer_stderr_file=None, cleanup=True): """Attempt to start the Ray web UI. Args: redis_address (str): The address of the Redis server. + node_ip_address: The IP address of the node that this process will run on. backend_stdout_file: A file handle opened for writing to redirect the backend stdout to. If no redirection should happen, then this should be None. @@ -326,8 +384,8 @@ def start_webui(redis_address, backend_stdout_file=None, backend_process = subprocess.Popen([python_executable, webui_backend_filepath, "--redis-address", redis_address], - stdout=backend_stdout_file, - stderr=backend_stderr_file) + stdout=backend_stdout_file, + stderr=backend_stderr_file) time.sleep(0.1) if backend_process.poll() is not None: @@ -369,6 +427,9 @@ def start_webui(redis_address, backend_stdout_file=None, if cleanup: all_processes[PROCESS_TYPE_WEB_UI].append(backend_process) all_processes[PROCESS_TYPE_WEB_UI].append(polymer_process) + record_log_files_in_redis(redis_address, node_ip_address, + [backend_stdout_file, backend_stderr_file, + polymer_stdout_file, polymer_stderr_file]) return True @@ -431,6 +492,8 @@ def start_local_scheduler(redis_address, num_workers=num_workers) if cleanup: all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) + record_log_files_in_redis(redis_address, node_ip_address, + [stdout_file, stderr_file]) return local_scheduler_name def start_objstore(node_ip_address, redis_address, object_manager_port=None, @@ -517,6 +580,9 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None, if cleanup: all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1) all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2) + record_log_files_in_redis(redis_address, node_ip_address, + [store_stdout_file, store_stderr_file, + manager_stdout_file, manager_stderr_file]) return ObjectStoreAddress(plasma_store_name, plasma_manager_name, plasma_manager_port) @@ -553,13 +619,16 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name, p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) if cleanup: all_processes[PROCESS_TYPE_WORKER].append(p) + record_log_files_in_redis(redis_address, node_ip_address, + [stdout_file, stderr_file]) -def start_monitor(redis_address, stdout_file=None, stderr_file=None, - cleanup=True): +def start_monitor(redis_address, node_ip_address, stdout_file=None, + stderr_file=None, cleanup=True): """Run a process to monitor the other processes. Args: redis_address (str): The address that the Redis server is listening on. + node_ip_address: The IP address of the node that this process will run on. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If no @@ -575,6 +644,8 @@ def start_monitor(redis_address, stdout_file=None, stderr_file=None, p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) if cleanup: all_processes[PROCESS_TYPE_WORKER].append(p) + record_log_files_in_redis(redis_address, node_ip_address, + [stdout_file, stderr_file]) def start_ray_processes(address_info=None, node_ip_address="127.0.0.1", @@ -585,6 +656,7 @@ def start_ray_processes(address_info=None, redirect_output=False, include_global_scheduler=False, include_redis=False, + include_log_monitor=False, include_webui=False, start_workers_from_local_scheduler=True, num_cpus=None, @@ -613,6 +685,8 @@ def start_ray_processes(address_info=None, start a global scheduler process. include_redis (bool): If include_redis is True, then start a Redis server process. + include_log_monitor (bool): If True, then start a log monitor to monitor the + log files for all processes on this node and push their contents to Redis. include_webui (bool): If True, then attempt to start the web UI. Note that this is only possible with Python 3. start_workers_from_local_scheduler (bool): If this flag is True, then start @@ -650,7 +724,8 @@ def start_ray_processes(address_info=None, redis_stdout_file, redis_stderr_file = new_log_files("redis", redirect_output) if redis_address is None: # Start a Redis server. The start_redis method will choose a random port. - redis_port, _ = start_redis(stdout_file=redis_stdout_file, + redis_port, _ = start_redis(node_ip_address, + stdout_file=redis_stdout_file, stderr_file=redis_stderr_file, cleanup=cleanup) redis_address = address(node_ip_address, redis_port) @@ -671,16 +746,28 @@ def start_ray_processes(address_info=None, # Start monitoring the processes. monitor_stdout_file, monitor_stderr_file = new_log_files("monitor", redirect_output) start_monitor(redis_address, + node_ip_address, stdout_file=monitor_stdout_file, stderr_file=monitor_stderr_file) else: if redis_address is None: raise Exception("Redis address expected") + # Start the log monitor, if necessary. + if include_log_monitor: + log_monitor_stdout_file, log_monitor_stderr_file = new_log_files("log_monitor", + redirect_output=True) + start_log_monitor(redis_address, + node_ip_address, + stdout_file=log_monitor_stdout_file, + stderr_file=log_monitor_stderr_file, + cleanup=cleanup) + # Start the global scheduler, if necessary. if include_global_scheduler: global_scheduler_stdout_file, global_scheduler_stderr_file = new_log_files("global_scheduler", redirect_output) start_global_scheduler(redis_address, + node_ip_address, stdout_file=global_scheduler_stdout_file, stderr_file=global_scheduler_stderr_file, cleanup=cleanup) @@ -704,7 +791,8 @@ def start_ray_processes(address_info=None, # Start Plasma. plasma_store_stdout_file, plasma_store_stderr_file = new_log_files("plasma_store_{}".format(i), redirect_output) plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files("plasma_manager_{}".format(i), redirect_output) - object_store_address = start_objstore(node_ip_address, redis_address, + object_store_address = start_objstore(node_ip_address, + redis_address, object_manager_port=object_manager_ports[i], store_stdout_file=plasma_store_stdout_file, store_stderr_file=plasma_store_stderr_file, @@ -782,6 +870,7 @@ def start_ray_processes(address_info=None, polymer_stdout_file, polymer_stderr_file = new_log_files("webui_polymer", redirect_output=True) successfully_started = start_webui(redis_address, + node_ip_address, backend_stdout_file=backend_stdout_file, backend_stderr_file=backend_stderr_file, polymer_stdout_file=polymer_stdout_file, @@ -839,6 +928,7 @@ def start_ray_node(node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers, worker_path=worker_path, + include_log_monitor=True, cleanup=cleanup, redirect_output=redirect_output, num_cpus=num_cpus, @@ -892,6 +982,7 @@ def start_ray_head(address_info=None, cleanup=cleanup, redirect_output=redirect_output, include_global_scheduler=True, + include_log_monitor=True, include_redis=True, include_webui=True, start_workers_from_local_scheduler=start_workers_from_local_scheduler, diff --git a/scripts/stop_ray.sh b/scripts/stop_ray.sh index d983fe0d7..bda47f56b 100755 --- a/scripts/stop_ray.sh +++ b/scripts/stop_ray.sh @@ -16,3 +16,6 @@ killall polymer # Find the PID of the Ray UI backend process and kill it. kill $(ps aux | grep ray_ui.py | awk '{ print $2 }') 2> /dev/null + +# Find the PID of the Ray log monitor process and kill it. +kill $(ps aux | grep log_monitor.py | awk '{ print $2 }') 2> /dev/null diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index b8c107551..3b7538ab5 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -12,6 +12,9 @@ import sys import time import websockets +# Import flatbuffer bindings. +from ray.core.generated.LocalSchedulerInfoMessage import LocalSchedulerInfoMessage + parser = argparse.ArgumentParser(description="parse information for the web ui") parser.add_argument("--redis-address", required=True, type=str, help="the address to use for redis") @@ -212,17 +215,19 @@ async def handle_get_recent_tasks(websocket, redis_conn, num_tasks): task_get_arguments_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:get_arguments"] task_execute_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:execute"] task_store_outputs_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:store_outputs"] - task_data[node_index]["task_data"].append( - {"task": task_times, - "get_arguments": task_get_arguments_times, - "execute": task_execute_times, - "store_outputs": task_store_outputs_times, - "worker_index": worker_index, - "node_ip_address": node_ip_address, - "task_formatted_time": duration_to_string(task_times[1] - task_times[0]), - "get_arguments_formatted_time": duration_to_string(task_get_arguments_times[1] - task_get_arguments_times[0]), - "execute_formatted_time": duration_to_string(task_execute_times[1] - task_execute_times[0]), - "store_outputs_formatted_time": duration_to_string(task_store_outputs_times[1] - task_store_outputs_times[0])}) + task_info = {"task": task_times, + "get_arguments": task_get_arguments_times, + "execute": task_execute_times, + "store_outputs": task_store_outputs_times, + "worker_index": worker_index, + "node_ip_address": node_ip_address, + "task_formatted_time": duration_to_string(task_times[1] - task_times[0]), + "get_arguments_formatted_time": duration_to_string(task_get_arguments_times[1] - task_get_arguments_times[0])} + if len(task_execute_times) == 2: + task_info["execute_formatted_time"] = duration_to_string(task_execute_times[1] - task_execute_times[0]) + if len(task_store_outputs_times) == 2: + task_info["store_outputs_formatted_time"] = duration_to_string(task_store_outputs_times[1] - task_store_outputs_times[0]) + task_data[node_index]["task_data"].append(task_info) num_tasks += 1 reply = {"min_time": min_time, "max_time": max_time, @@ -267,7 +272,8 @@ async def send_heartbeats(websocket, redis_conn): while True: msg = await redis_conn.pubsub_channels["local_schedulers"].get() - local_scheduler_id_bytes = msg[:IDENTIFIER_LENGTH] + heartbeat = LocalSchedulerInfoMessage.GetRootAsLocalSchedulerInfoMessage(msg, 0) + local_scheduler_id_bytes = heartbeat.DbClientId() local_scheduler_id = hex_identifier(local_scheduler_id_bytes) if local_scheduler_id not in local_schedulers: # A new local scheduler has joined the cluster. Ignore it. This won't be @@ -282,10 +288,25 @@ async def cache_data_from_redis(redis_ip_address, redis_port): asyncio.ensure_future(listen_for_errors(redis_ip_address, redis_port)) +async def handle_get_log_files(websocket, redis_conn): + reply = {} + # First get all keys for the log file lists. + log_file_list_keys = await redis_conn.execute("keys", "LOG_FILENAMES:*") + for log_file_list_key in log_file_list_keys: + node_ip_address = log_file_list_key.decode("ascii").split(":")[1] + reply[node_ip_address] = {} + # Get all of the log filenames for this node IP address. + log_filenames = await redis_conn.execute("lrange", log_file_list_key, 0, -1) + for log_filename in log_filenames: + log_filename_key = "LOGFILE:{}:{}".format(node_ip_address, log_filename.decode("ascii")) + logfile = await redis_conn.execute("lrange", log_filename_key, 0, -1) + logfile = [line.decode("ascii") for line in logfile] + reply[node_ip_address][log_filename.decode("ascii")] = logfile + + # Send the reply back to the front end. + await websocket.send(json.dumps(reply)) + async def serve_requests(websocket, path): - # We loop infinitely because otherwise the websocket will be closed. - # TODO(rkn): Maybe we should open a new web sockets for every request instead - # of looping here. redis_conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop) while True: command = json.loads(await websocket.recv()) @@ -301,6 +322,8 @@ async def serve_requests(websocket, path): await handle_get_errors(websocket) elif command["command"] == "get-heartbeats": await send_heartbeats(websocket, redis_conn) + elif command["command"] == "get-log-files": + await handle_get_log_files(websocket, redis_conn) if command["command"] == "get-workers": result = [] diff --git a/webui/src/ray-app.html b/webui/src/ray-app.html index 83e94e75e..1eaa2b6fd 100644 --- a/webui/src/ray-app.html +++ b/webui/src/ray-app.html @@ -68,6 +68,7 @@ Errors Timeline Recent Tasks + Log Files @@ -94,6 +95,7 @@ + diff --git a/webui/src/ray-log-files.html b/webui/src/ray-log-files.html new file mode 100644 index 000000000..94bd3fbe0 --- /dev/null +++ b/webui/src/ray-log-files.html @@ -0,0 +1,90 @@ + + + + + + + +