mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 02:46:49 +08:00
More compact format for worker logs (#4092)
This commit is contained in:
committed by
Robert Nishihara
parent
c92a867c8b
commit
e9ee38ace2
+10
-11
@@ -4,14 +4,14 @@ from __future__ import print_function
|
||||
|
||||
import argparse
|
||||
import errno
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
|
||||
import colorama
|
||||
|
||||
import ray.ray_constants as ray_constants
|
||||
import ray.services as services
|
||||
import ray.utils
|
||||
|
||||
# Logger for this module. It should be configured at the entry point
|
||||
@@ -69,7 +69,7 @@ class LogMonitor(object):
|
||||
|
||||
def __init__(self, logs_dir, redis_address, redis_password=None):
|
||||
"""Initialize the log monitor object."""
|
||||
self.host = os.uname()[1]
|
||||
self.ip = services.get_node_ip_address()
|
||||
self.logs_dir = logs_dir
|
||||
self.redis_client = ray.services.create_redis_client(
|
||||
redis_address, password=redis_password)
|
||||
@@ -184,7 +184,6 @@ class LogMonitor(object):
|
||||
is_worker = (filename.startswith("worker")
|
||||
and (filename.endswith("out")
|
||||
or filename.endswith("err")))
|
||||
output_type = "stdout" if filename.endswith("out") else "stderr"
|
||||
|
||||
if is_worker and file_info.file_position == 0:
|
||||
if (len(lines_to_publish) > 0 and
|
||||
@@ -197,13 +196,13 @@ class LogMonitor(object):
|
||||
file_info.file_position = file_info.file_handle.tell()
|
||||
|
||||
if len(lines_to_publish) > 0 and is_worker:
|
||||
lines_to_publish.insert(
|
||||
0, "{}{}{} (pid={}, host={})".format(
|
||||
colorama.Fore.CYAN, "worker ({})".format(output_type),
|
||||
colorama.Fore.RESET, file_info.worker_pid, self.host))
|
||||
|
||||
self.redis_client.publish(ray.gcs_utils.LOG_FILE_CHANNEL,
|
||||
"\n".join(lines_to_publish))
|
||||
self.redis_client.publish(
|
||||
ray.gcs_utils.LOG_FILE_CHANNEL,
|
||||
json.dumps({
|
||||
"ip": self.ip,
|
||||
"pid": file_info.worker_pid,
|
||||
"lines": lines_to_publish
|
||||
}))
|
||||
anything_published = True
|
||||
|
||||
return anything_published
|
||||
|
||||
+15
-1
@@ -3,10 +3,12 @@ from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
from contextlib import contextmanager
|
||||
import colorama
|
||||
import atexit
|
||||
import faulthandler
|
||||
import hashlib
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
import numpy as np
|
||||
import os
|
||||
@@ -1587,6 +1589,7 @@ def print_logs(redis_client, threads_stopped):
|
||||
"""
|
||||
pubsub_client = redis_client.pubsub(ignore_subscribe_messages=True)
|
||||
pubsub_client.subscribe(ray.gcs_utils.LOG_FILE_CHANNEL)
|
||||
localhost = services.get_node_ip_address()
|
||||
try:
|
||||
# Keep track of the number of consecutive log messages that have been
|
||||
# received with no break in between. If this number grows continually,
|
||||
@@ -1604,7 +1607,18 @@ def print_logs(redis_client, threads_stopped):
|
||||
threads_stopped.wait(timeout=0.01)
|
||||
continue
|
||||
num_consecutive_messages_received += 1
|
||||
print(ray.utils.decode(msg["data"]), file=sys.stderr)
|
||||
|
||||
data = json.loads(ray.utils.decode(msg["data"]))
|
||||
if data["ip"] == localhost:
|
||||
for line in data["lines"]:
|
||||
print("{}{}(pid={}){} {}".format(
|
||||
colorama.Style.DIM, colorama.Fore.CYAN, data["pid"],
|
||||
colorama.Style.RESET_ALL, line))
|
||||
else:
|
||||
for line in data["lines"]:
|
||||
print("{}{}(pid={}, ip={}){} {}".format(
|
||||
colorama.Style.DIM, colorama.Fore.CYAN, data["pid"],
|
||||
data["ip"], colorama.Style.RESET_ALL, line))
|
||||
|
||||
if (num_consecutive_messages_received % 100 == 0
|
||||
and num_consecutive_messages_received > 0):
|
||||
|
||||
Reference in New Issue
Block a user