mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 20:56:34 +08:00
Fix O(n^2) behavior in the log_monitor (#5569)
This commit is contained in:
@@ -8,6 +8,7 @@ import glob
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
import traceback
|
||||
|
||||
@@ -85,7 +86,26 @@ class LogMonitor(object):
|
||||
file_info = self.open_file_infos.pop(0)
|
||||
file_info.file_handle.close()
|
||||
file_info.file_handle = None
|
||||
self.closed_file_infos.append(file_info)
|
||||
try:
|
||||
# Test if the worker process that generated the log file
|
||||
# is still alive.
|
||||
os.kill(file_info.worker_pid, 0)
|
||||
except OSError:
|
||||
# The process is not alive any more, so move the log file
|
||||
# out of the log directory so glob.glob will not be slowed
|
||||
# by it.
|
||||
target = os.path.join(self.logs_dir, "old",
|
||||
os.path.basename(file_info.filename))
|
||||
try:
|
||||
shutil.move(file_info.filename, target)
|
||||
except (IOError, OSError) as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
logger.warning("Warning: The file {} was not "
|
||||
"found.".format(file_info.filename))
|
||||
else:
|
||||
raise e
|
||||
else:
|
||||
self.closed_file_infos.append(file_info)
|
||||
self.can_open_more_files = True
|
||||
|
||||
def update_log_filenames(self):
|
||||
|
||||
@@ -180,6 +180,8 @@ class Node(object):
|
||||
# Create a directory to be used for process log files.
|
||||
self._logs_dir = os.path.join(self._session_dir, "logs")
|
||||
try_to_create_directory(self._logs_dir, warn_if_exist=False)
|
||||
old_logs_dir = os.path.join(self._logs_dir, "old")
|
||||
try_to_create_directory(old_logs_dir, warn_if_exist=False)
|
||||
|
||||
def get_resource_spec(self):
|
||||
"""Resolve and return the current resource spec for the node."""
|
||||
|
||||
@@ -28,6 +28,7 @@ import pickle
|
||||
import pytest
|
||||
|
||||
import ray
|
||||
import ray.ray_constants as ray_constants
|
||||
import ray.tests.cluster_utils
|
||||
import ray.tests.utils
|
||||
|
||||
@@ -3140,3 +3141,38 @@ def test_invalid_unicode_in_worker_log(shutdown_only):
|
||||
|
||||
# Make sure that nothing has died.
|
||||
assert ray.services.remaining_processes_alive()
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="This test is too expensive to run.")
def test_move_log_files_to_old(shutdown_only):
    """Check that worker log files show up under ``logs/old/`` once actors exit.

    The test starts ``ray_constants.LOG_MONITOR_MAX_OPEN_FILES`` actors, has
    each of them print one line, verifies that ``logs/old/`` contains no
    worker ``.out`` files yet, terminates the actors, and then polls until at
    least one worker log file appears in ``logs/old/``. The first such file
    must contain the line printed by the actor.

    Args:
        shutdown_only: pytest fixture supplied by the test suite (defined
            outside this block; presumably shuts Ray down after the test).
    """
    # Imported locally so the polling loop below does not rely on the
    # module's top-level imports.
    import time

    info = ray.init(num_cpus=1)

    logs_dir = os.path.join(info["session_dir"], "logs")
    # Pattern matching worker stdout files that were moved to logs/old/.
    old_logs_pattern = "{}/old/worker*.out".format(logs_dir)

    @ray.remote
    class Actor(object):
        def f(self):
            print("function f finished")

    # Start LOG_MONITOR_MAX_OPEN_FILES actors and wait until each one has
    # printed its line.
    actors = [
        Actor.remote() for _ in range(ray_constants.LOG_MONITOR_MAX_OPEN_FILES)
    ]
    ray.get([a.f.remote() for a in actors])

    # Make sure no log files are in the "old" directory before the actors
    # are killed.
    assert not glob.glob(old_logs_pattern)

    # Now kill the actors so the files get moved to logs/old/.
    for actor in actors:
        actor.__ray_terminate__.remote()

    # Poll until the first log file has been moved. Sleep between polls so
    # the test does not spin on glob.glob while waiting.
    while True:
        log_file_paths = glob.glob(old_logs_pattern)
        if log_file_paths:
            with open(log_file_paths[0], "r") as f:
                assert "function f finished\n" in f.readlines()
            break
        time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.remaining_processes_alive()
|
||||
|
||||
Reference in New Issue
Block a user