From e9ee38ace23d2ffc0b642feee8c77ef347fef186 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 19 Feb 2019 19:53:43 -0800 Subject: [PATCH] More compact format for worker logs (#4092) --- python/ray/log_monitor.py | 21 ++++++++++----------- python/ray/worker.py | 16 +++++++++++++++- test/runtest.py | 6 +++--- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index ae569c5e7..9a058b3a9 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -4,14 +4,14 @@ from __future__ import print_function import argparse import errno +import json import logging import os import time import traceback -import colorama - import ray.ray_constants as ray_constants +import ray.services as services import ray.utils # Logger for this module. It should be configured at the entry point @@ -69,7 +69,7 @@ class LogMonitor(object): def __init__(self, logs_dir, redis_address, redis_password=None): """Initialize the log monitor object.""" - self.host = os.uname()[1] + self.ip = services.get_node_ip_address() self.logs_dir = logs_dir self.redis_client = ray.services.create_redis_client( redis_address, password=redis_password) @@ -184,7 +184,6 @@ class LogMonitor(object): is_worker = (filename.startswith("worker") and (filename.endswith("out") or filename.endswith("err"))) - output_type = "stdout" if filename.endswith("out") else "stderr" if is_worker and file_info.file_position == 0: if (len(lines_to_publish) > 0 and @@ -197,13 +196,13 @@ class LogMonitor(object): file_info.file_position = file_info.file_handle.tell() if len(lines_to_publish) > 0 and is_worker: - lines_to_publish.insert( - 0, "{}{}{} (pid={}, host={})".format( - colorama.Fore.CYAN, "worker ({})".format(output_type), - colorama.Fore.RESET, file_info.worker_pid, self.host)) - - self.redis_client.publish(ray.gcs_utils.LOG_FILE_CHANNEL, - "\n".join(lines_to_publish)) + self.redis_client.publish( + ray.gcs_utils.LOG_FILE_CHANNEL, + json.dumps({ + "ip": self.ip, + "pid": file_info.worker_pid, + "lines": lines_to_publish + })) anything_published = True return anything_published diff --git a/python/ray/worker.py b/python/ray/worker.py index 3af7c7011..a726b9a76 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -3,10 +3,12 @@ from __future__ import division from __future__ import print_function from contextlib import contextmanager +import colorama import atexit import faulthandler import hashlib import inspect +import json import logging import numpy as np import os @@ -1587,6 +1589,7 @@ def print_logs(redis_client, threads_stopped): """ pubsub_client = redis_client.pubsub(ignore_subscribe_messages=True) pubsub_client.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL) + localhost = services.get_node_ip_address() try: # Keep track of the number of consecutive log messages that have been # received with no break in between. If this number grows continually, @@ -1604,7 +1607,18 @@ def print_logs(redis_client, threads_stopped): threads_stopped.wait(timeout=0.01) continue num_consecutive_messages_received += 1 - print(ray.utils.decode(msg["data"]), file=sys.stderr) + + data = json.loads(ray.utils.decode(msg["data"])) + if data["ip"] == localhost: + for line in data["lines"]: + print("{}{}(pid={}){} {}".format( + colorama.Style.DIM, colorama.Fore.CYAN, data["pid"], + colorama.Style.RESET_ALL, line)) + else: + for line in data["lines"]: + print("{}{}(pid={}, ip={}){} {}".format( + colorama.Style.DIM, colorama.Fore.CYAN, data["pid"], + data["ip"], colorama.Style.RESET_ALL, line)) if (num_consecutive_messages_received % 100 == 0 and num_consecutive_messages_received > 0): diff --git a/test/runtest.py b/test/runtest.py index 0f8f0f10e..c13a03c5f 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2535,10 +2535,10 @@ def test_logging_to_driver(shutdown_only): time.sleep(1) output_lines = captured["out"] - assert len(output_lines) == 0 - error_lines = captured["err"] for i in range(200): - assert str(i) in error_lines + assert str(i) in output_lines + error_lines = captured["err"] + assert len(error_lines) == 0 def test_not_logging_to_driver(shutdown_only):