diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py
index 6d534ff50..a8fce2fb1 100644
--- a/python/ray/log_monitor.py
+++ b/python/ray/log_monitor.py
@@ -8,6 +8,7 @@ import glob
 import json
 import logging
 import os
+import shutil
 import time
 import traceback
 
@@ -85,7 +86,26 @@ class LogMonitor(object):
             file_info = self.open_file_infos.pop(0)
             file_info.file_handle.close()
             file_info.file_handle = None
-            self.closed_file_infos.append(file_info)
+            try:
+                # Test if the worker process that generated the log file
+                # is still alive.
+                os.kill(file_info.worker_pid, 0)
+            except OSError:
+                # The process is not alive any more, so move the log file
+                # out of the log directory so glob.glob will not be slowed
+                # by it.
+                target = os.path.join(self.logs_dir, "old",
+                                      os.path.basename(file_info.filename))
+                try:
+                    shutil.move(file_info.filename, target)
+                except (IOError, OSError) as e:
+                    if e.errno == errno.ENOENT:
+                        logger.warning("Warning: The file {} was not "
+                                       "found.".format(file_info.filename))
+                    else:
+                        raise e
+            else:
+                self.closed_file_infos.append(file_info)
         self.can_open_more_files = True
 
     def update_log_filenames(self):
diff --git a/python/ray/node.py b/python/ray/node.py
index c80a4fd6b..5f09046e9 100644
--- a/python/ray/node.py
+++ b/python/ray/node.py
@@ -180,6 +180,8 @@ class Node(object):
         # Create a directory to be used for process log files.
         self._logs_dir = os.path.join(self._session_dir, "logs")
         try_to_create_directory(self._logs_dir, warn_if_exist=False)
+        old_logs_dir = os.path.join(self._logs_dir, "old")
+        try_to_create_directory(old_logs_dir, warn_if_exist=False)
 
     def get_resource_spec(self):
         """Resolve and return the current resource spec for the node."""
diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py
index 3b575c1e0..28f3cea75 100644
--- a/python/ray/tests/test_basic.py
+++ b/python/ray/tests/test_basic.py
@@ -28,6 +28,7 @@ import pickle
 import pytest
 
 import ray
+import ray.ray_constants as ray_constants
 import ray.tests.cluster_utils
 import ray.tests.utils
 
@@ -3140,3 +3141,47 @@ def test_invalid_unicode_in_worker_log(shutdown_only):
 
     # Make sure that nothing has died.
     assert ray.services.remaining_processes_alive()
+
+
+@pytest.mark.skip(reason="This test is too expensive to run.")
+def test_move_log_files_to_old(shutdown_only):
+    """Check that logs of terminated actors are moved to logs/old/."""
+    # Local import: keeps the test self-contained.
+    import time
+
+    info = ray.init(num_cpus=1)
+
+    logs_dir = os.path.join(info["session_dir"], "logs")
+    old_log_pattern = "{}/old/worker*.out".format(logs_dir)
+
+    @ray.remote
+    class Actor(object):
+        def f(self):
+            print("function f finished")
+
+    # First create LOG_MONITOR_MAX_OPEN_FILES temporary actors.
+    actors = [
+        Actor.remote() for _ in range(ray_constants.LOG_MONITOR_MAX_OPEN_FILES)
+    ]
+    ray.get([a.f.remote() for a in actors])
+
+    # Make sure no log files are in the "old" directory before the actors
+    # are killed.
+    assert not glob.glob(old_log_pattern)
+
+    # Now kill the actors so the files get moved to logs/old/.
+    for a in actors:
+        a.__ray_terminate__.remote()
+
+    # Poll until a log file shows up in logs/old/. Sleep between polls so
+    # the test does not busy-wait while the files are being moved.
+    while True:
+        log_file_paths = glob.glob(old_log_pattern)
+        if log_file_paths:
+            with open(log_file_paths[0], "r") as f:
+                assert "function f finished\n" in f.readlines()
+            break
+        time.sleep(0.1)
+
+    # Make sure that nothing has died.
+    assert ray.services.remaining_processes_alive()