From c741d1cf9c78f8cd60b40d58b88e8d3e4d636906 Mon Sep 17 00:00:00 2001 From: krfricke Date: Mon, 3 Aug 2020 20:18:34 +0200 Subject: [PATCH] [tune] stdout/stderr logging redirection (#9817) * Add `log_to_file` parameter, pass to Trainable config, redirect stdout/stderr. * Add logging handler to root ray logger * Added test for `log_to_file` parameter * Added logs, reuse test * Revert debug change * Update logdir on reset, flush streams after each train() step * Remove magic keys from visible config Co-authored-by: Kai Fricke --- doc/source/tune/user-guide.rst | 52 +++++++++ python/ray/tune/ray_trial_executor.py | 18 ++-- python/ray/tune/result.py | 9 ++ python/ray/tune/tests/test_actor_reuse.py | 56 +++++++++- python/ray/tune/tests/test_api.py | 58 ++++++++++ python/ray/tune/trainable.py | 122 +++++++++++++++++++--- python/ray/tune/trial_executor.py | 4 +- python/ray/tune/tune.py | 42 ++++++++ python/ray/tune/utils/util.py | 14 +++ 9 files changed, 346 insertions(+), 29 deletions(-) diff --git a/doc/source/tune/user-guide.rst b/doc/source/tune/user-guide.rst index d3140642c..3a350a3ef 100644 --- a/doc/source/tune/user-guide.rst +++ b/doc/source/tune/user-guide.rst @@ -431,6 +431,58 @@ If a string is provided, then it must include replacement fields ``{source}`` an By default, syncing occurs every 300 seconds. To change the frequency of syncing, set the ``TUNE_CLOUD_SYNC_S`` environment variable in the driver to the desired syncing period. Note that uploading only happens when global experiment state is collected, and the frequency of this is determined by the ``global_checkpoint_period`` argument. So the true upload period is given by ``max(TUNE_CLOUD_SYNC_S, global_checkpoint_period)``. +.. _tune-log_to_file: + +Redirecting stdout and stderr to files +-------------------------------------- +The stdout and stderr streams are usually printed to the console. For remote actors, +Ray collects these logs and prints them to the head process, as long as it +has been initialized with ``log_to_driver=True``, which is the default. + +However, if you would like to collect the stream outputs in files for later +analysis or troubleshooting, Tune offers an utility parameter, ``log_to_file``, +for this. + +By passing ``log_to_file=True`` to ``tune.run()``, stdout and stderr will be logged +to ``trial_logdir/stdout`` and ``trial_logdir/stderr``, respectively: + +.. code-block:: python + + tune.run( + trainable, + log_to_file=True) + +If you would like to specify the output files, you can either pass one filename, +where the combined output will be stored, or two filenames, for stdout and stderr, +respectively: + +.. code-block:: python + + tune.run( + trainable, + log_to_file="std_combined.log") + + tune.run( + trainable, + log_to_file=("my_stdout.log", "my_stderr.log")) + +The file names are relative to the trial's logdir. You can pass absolute paths, +too. + +If ``log_to_file`` is set, Tune will automatically register a new logging handler +for Ray's base logger and log the output to the specified stderr output file. + +Setting ``log_to_file`` does not disable logging to the driver. If you would +like to disable the logs showing up in the driver output (i.e. they should only +show up in the logfiles), initialize Ray accordingly: + +.. code-block:: python + + ray.init(log_to_driver=False) + tune.run( + trainable, + log_to_file=True) + .. _tune-debugging: Debugging diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 165e55f1a..c21af75c2 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -14,7 +14,7 @@ from ray.resource_spec import ResourceSpec from ray.tune.durable_trainable import DurableTrainable from ray.tune.error import AbortTrialExecution, TuneError from ray.tune.logger import NoopLogger -from ray.tune.result import TRIAL_INFO +from ray.tune.result import TRIAL_INFO, LOGDIR_PATH from ray.tune.resources import Resources from ray.tune.trainable import TrainableUtil from ray.tune.trial import Trial, Checkpoint, Location, TrialInfo @@ -161,10 +161,11 @@ class RayTrialExecutor(TrialExecutor): def logger_creator(config): # Set the working dir in the remote process, for user file writes - os.makedirs(remote_logdir, exist_ok=True) + logdir = config.pop(LOGDIR_PATH, remote_logdir) + os.makedirs(logdir, exist_ok=True) if not ray.worker._mode() == ray.worker.LOCAL_MODE: - os.chdir(remote_logdir) - return NoopLogger(config, remote_logdir) + os.chdir(logdir) + return NoopLogger(config, logdir) # Clear the Trial's location (to be updated later on result) # since we don't know where the remote runner is placed. @@ -339,7 +340,7 @@ class RayTrialExecutor(TrialExecutor): super(RayTrialExecutor, self).pause_trial(trial) def reset_trial(self, trial, new_config, new_experiment_tag): - """Tries to invoke `Trainable.reset_config()` to reset trial. + """Tries to invoke `Trainable.reset()` to reset trial. Args: trial (Trial): Trial to be reset. @@ -353,14 +354,13 @@ class RayTrialExecutor(TrialExecutor): trial.config = new_config trainable = trial.runner with self._change_working_directory(trial): - with warn_if_slow("reset_config"): + with warn_if_slow("reset"): try: reset_val = ray.get( - trainable.reset_config.remote(new_config), + trainable.reset.remote(new_config, trial.logdir), DEFAULT_GET_TIMEOUT) except RayTimeoutError: - logger.exception("Trial %s: reset_config timed out.", - trial) + logger.exception("Trial %s: reset timed out.", trial) return False return reset_val diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index 88fbb3e96..e6fd5c691 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -69,6 +69,15 @@ RESULT_DUPLICATE = "__duplicate__" # to the Trainable via the constructor. TRIAL_INFO = "__trial_info__" +# __stdout_file__/__stderr_file__ are magic keywords used internally +# to pass log file locations to the Trainable via the constructor. +STDOUT_FILE = "__stdout_file__" +STDERR_FILE = "__stderr_file__" + +# __logdir_path__ is a magic keyword used internally to pass a new +# logdir to existing loggers. +LOGDIR_PATH = "__logdir_path__" + # Where Tune writes result files by default DEFAULT_RESULTS_DIR = (os.environ.get("TEST_TMPDIR") or os.environ.get("TUNE_RESULT_DIR") diff --git a/python/ray/tune/tests/test_actor_reuse.py b/python/ray/tune/tests/test_actor_reuse.py index b2038a10f..834343847 100644 --- a/python/ray/tune/tests/test_actor_reuse.py +++ b/python/ray/tune/tests/test_actor_reuse.py @@ -1,7 +1,10 @@ +import os import unittest +import sys import ray -from ray.tune import Trainable, run_experiments +from ray import tune, logger +from ray.tune import Trainable, run_experiments, register_trainable from ray.tune.error import TuneError from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler @@ -17,9 +20,15 @@ def create_resettable_class(): self.config = config self.num_resets = 0 self.iter = 0 + self.msg = config.get("message", "No message") def step(self): self.iter += 1 + + print("PRINT_STDOUT: {}".format(self.msg)) + print("PRINT_STDERR: {}".format(self.msg), file=sys.stderr) + logger.info("LOG_STDERR: {}".format(self.msg)) + return {"num_resets": self.num_resets, "done": self.iter > 1} def save_checkpoint(self, chkpt_dir): @@ -32,6 +41,7 @@ def create_resettable_class(): if "fake_reset_not_supported" in self.config: return False self.num_resets += 1 + self.msg = new_config.get("message", "No message") return True return MyResettableClass @@ -90,8 +100,50 @@ class ActorReuseTest(unittest.TestCase): self.assertRaises(TuneError, lambda: run()) + def testTrialReuseLogToFile(self): + register_trainable("foo2", create_resettable_class()) + + # Log to default files + [trial1, trial2] = tune.run( + "foo2", + config={ + "message": tune.grid_search(["First", "Second"]) + }, + log_to_file=True, + scheduler=FrequentPausesScheduler(), + reuse_actors=True).trials + + # Check trial 1 + self.assertEqual(trial1.last_result["num_resets"], 1) + self.assertTrue(os.path.exists(os.path.join(trial1.logdir, "stdout"))) + self.assertTrue(os.path.exists(os.path.join(trial1.logdir, "stderr"))) + with open(os.path.join(trial1.logdir, "stdout"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDOUT: First", content) + self.assertNotIn("PRINT_STDOUT: Second", content) + with open(os.path.join(trial1.logdir, "stderr"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDERR: First", content) + self.assertIn("LOG_STDERR: First", content) + self.assertNotIn("PRINT_STDERR: Second", content) + self.assertNotIn("LOG_STDERR: Second", content) + + # Check trial 2 + self.assertEqual(trial2.last_result["num_resets"], 2) + self.assertTrue(os.path.exists(os.path.join(trial2.logdir, "stdout"))) + self.assertTrue(os.path.exists(os.path.join(trial2.logdir, "stderr"))) + with open(os.path.join(trial2.logdir, "stdout"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDOUT: Second", content) + self.assertNotIn("PRINT_STDOUT: First", content) + with open(os.path.join(trial2.logdir, "stderr"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDERR: Second", content) + self.assertIn("LOG_STDERR: Second", content) + self.assertNotIn("PRINT_STDERR: First", content) + self.assertNotIn("LOG_STDERR: First", content) + if __name__ == "__main__": import pytest - import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py index 51b0c5351..a5af5dd2f 100644 --- a/python/ray/tune/tests/test_api.py +++ b/python/ray/tune/tests/test_api.py @@ -973,6 +973,64 @@ class TrainableFunctionApiTest(unittest.TestCase): self.assertEqual(trial.status, Trial.TERMINATED) self.assertTrue(trial.has_checkpoint()) + def testLogToFile(self): + def train(config, reporter): + import sys + from ray import logger + for i in range(10): + reporter(timesteps_total=i) + print("PRINT_STDOUT") + print("PRINT_STDERR", file=sys.stderr) + logger.info("LOG_STDERR") + + register_trainable("f1", train) + + # Do not log to file + [trial] = tune.run("f1", log_to_file=False).trials + self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout"))) + self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr"))) + + # Log to default files + [trial] = tune.run("f1", log_to_file=True).trials + self.assertTrue(os.path.exists(os.path.join(trial.logdir, "stdout"))) + self.assertTrue(os.path.exists(os.path.join(trial.logdir, "stderr"))) + with open(os.path.join(trial.logdir, "stdout"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDOUT", content) + with open(os.path.join(trial.logdir, "stderr"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDERR", content) + self.assertIn("LOG_STDERR", content) + + # Log to one file + [trial] = tune.run("f1", log_to_file="combined").trials + self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout"))) + self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr"))) + self.assertTrue(os.path.exists(os.path.join(trial.logdir, "combined"))) + with open(os.path.join(trial.logdir, "combined"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDOUT", content) + self.assertIn("PRINT_STDERR", content) + self.assertIn("LOG_STDERR", content) + + # Log to two files + [trial] = tune.run( + "f1", log_to_file=("alt.stdout", "alt.stderr")).trials + self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stdout"))) + self.assertFalse(os.path.exists(os.path.join(trial.logdir, "stderr"))) + self.assertTrue( + os.path.exists(os.path.join(trial.logdir, "alt.stdout"))) + self.assertTrue( + os.path.exists(os.path.join(trial.logdir, "alt.stderr"))) + + with open(os.path.join(trial.logdir, "alt.stdout"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDOUT", content) + with open(os.path.join(trial.logdir, "alt.stderr"), "rt") as fp: + content = fp.read() + self.assertIn("PRINT_STDERR", content) + self.assertIn("LOG_STDERR", content) + if __name__ == "__main__": import pytest diff --git a/python/ray/tune/trainable.py b/python/ray/tune/trainable.py index cb8c2a35e..290042927 100644 --- a/python/ray/tune/trainable.py +++ b/python/ray/tune/trainable.py @@ -1,3 +1,5 @@ +import sys +from contextlib import redirect_stdout, redirect_stderr from datetime import datetime import copy @@ -7,7 +9,9 @@ import glob import os import pickle import platform + import pandas as pd +from ray.tune.utils.util import Tee from six import string_types import shutil import tempfile @@ -17,10 +21,10 @@ import uuid import ray from ray.util.debug import log_once from ray.tune.logger import UnifiedLogger -from ray.tune.result import (DEFAULT_RESULTS_DIR, TIME_THIS_ITER_S, - TIMESTEPS_THIS_ITER, DONE, TIMESTEPS_TOTAL, - EPISODES_THIS_ITER, EPISODES_TOTAL, - TRAINING_ITERATION, RESULT_DUPLICATE, TRIAL_INFO) +from ray.tune.result import ( + DEFAULT_RESULTS_DIR, TIME_THIS_ITER_S, TIMESTEPS_THIS_ITER, DONE, + TIMESTEPS_TOTAL, EPISODES_THIS_ITER, EPISODES_TOTAL, TRAINING_ITERATION, + RESULT_DUPLICATE, TRIAL_INFO, STDOUT_FILE, STDERR_FILE, LOGDIR_PATH) from ray.tune.utils import UtilMonitor logger = logging.getLogger(__name__) @@ -216,16 +220,17 @@ class Trainable: self.config = config or {} trial_info = self.config.pop(TRIAL_INFO, None) - if logger_creator: - self._result_logger = logger_creator(self.config) - self._logdir = self._result_logger.logdir - else: - logdir_prefix = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") - ray.utils.try_to_create_directory(DEFAULT_RESULTS_DIR) - self._logdir = tempfile.mkdtemp( - prefix=logdir_prefix, dir=DEFAULT_RESULTS_DIR) - self._result_logger = UnifiedLogger( - self.config, self._logdir, loggers=None) + self._logger_creator = logger_creator + self._result_logger = self._logdir = None + self._create_logger(self.config) + + self._stdout_context = self._stdout_fp = self._stdout_stream = None + self._stderr_context = self._stderr_fp = self._stderr_stream = None + self._stderr_logging_handler = None + + stdout_file = self.config.pop(STDOUT_FILE, None) + stderr_file = self.config.pop(STDERR_FILE, None) + self._open_logfiles(stdout_file, stderr_file) self._iteration = 0 self._time_total = 0.0 @@ -389,6 +394,11 @@ class Trainable: self.log_result(result) + if self._stdout_stream: + self._stdout_stream.flush() + if self._stderr_stream: + self._stderr_stream.flush() + return result def get_state(self): @@ -521,13 +531,37 @@ class Trainable: export_dir = export_dir or self.logdir return self._export_model(export_formats, export_dir) + def reset(self, new_config, new_logdir): + """Resets trial for use with new config. + + Subclasses should override reset_config() to actually + reset actor behavior for the new config.""" + self.config = new_config + + logger_config = new_config.copy() + logger_config[LOGDIR_PATH] = new_logdir + + self._logdir = new_logdir + + self._result_logger.flush() + self._result_logger.close() + + self._create_logger(logger_config) + + stdout_file = new_config.pop(STDOUT_FILE, None) + stderr_file = new_config.pop(STDERR_FILE, None) + + self._close_logfiles() + self._open_logfiles(stdout_file, stderr_file) + + return self.reset_config(new_config) + def reset_config(self, new_config): """Resets configuration without restarting the trial. This method is optional, but can be implemented to speed up algorithms such as PBT, and to allow performance optimizations such as running - experiments with reuse_actors=True. Note that self.config need to - be updated to reflect the latest parameter information in Ray logs. + experiments with reuse_actors=True. Args: new_config (dict): Updated hyperparameter configuration @@ -538,6 +572,60 @@ class Trainable: """ return False + def _create_logger(self, config): + """Create logger from logger creator""" + if self._logger_creator: + self._result_logger = self._logger_creator(config) + self._logdir = self._result_logger.logdir + else: + logdir_prefix = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") + ray.utils.try_to_create_directory(DEFAULT_RESULTS_DIR) + self._logdir = tempfile.mkdtemp( + prefix=logdir_prefix, dir=DEFAULT_RESULTS_DIR) + self._result_logger = UnifiedLogger( + config, self._logdir, loggers=None) + + def _open_logfiles(self, stdout_file, stderr_file): + """Create loggers. Open stdout and stderr logfiles.""" + if stdout_file: + stdout_path = os.path.expanduser( + os.path.join(self._logdir, stdout_file)) + self._stdout_fp = open(stdout_path, "a+") + self._stdout_stream = Tee(sys.stdout, self._stdout_fp) + self._stdout_context = redirect_stdout(self._stdout_stream) + self._stdout_context.__enter__() + + if stderr_file: + stderr_path = os.path.expanduser( + os.path.join(self._logdir, stderr_file)) + self._stderr_fp = open(stderr_path, "a+") + self._stderr_stream = Tee(sys.stderr, self._stderr_fp) + self._stderr_context = redirect_stderr(self._stderr_stream) + self._stderr_context.__enter__() + + # Add logging handler to root ray logger + formatter = logging.Formatter("[%(levelname)s %(asctime)s] " + "%(filename)s: %(lineno)d " + "%(message)s") + self._stderr_logging_handler = logging.StreamHandler( + self._stderr_fp) + self._stderr_logging_handler.setFormatter(formatter) + ray.logger.addHandler(self._stderr_logging_handler) + + def _close_logfiles(self): + """Close stdout and stderr logfiles.""" + if self._stderr_logging_handler: + ray.logger.removeHandler(self._stderr_logging_handler) + + if self._stdout_context: + self._stdout_stream.flush() + self._stdout_context.__exit__(None, None, None) + self._stdout_fp.close() + if self._stderr_context: + self._stderr_stream.flush() + self._stderr_context.__exit__(None, None, None) + self._stderr_fp.close() + def stop(self): """Releases all resources used by this trainable. @@ -548,6 +636,8 @@ class Trainable: self._result_logger.close() self.cleanup() + self._close_logfiles() + @property def logdir(self): """Directory of the results and checkpoints for this Trainable. diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py index e4e581ead..d391bb56f 100644 --- a/python/ray/tune/trial_executor.py +++ b/python/ray/tune/trial_executor.py @@ -129,7 +129,7 @@ class TrialExecutor: self.start_trial(trial) def reset_trial(self, trial, new_config, new_experiment_tag): - """Tries to invoke `Trainable.reset_config()` to reset trial. + """Tries to invoke `Trainable.reset()` to reset trial. Args: trial (Trial): Trial to be reset. @@ -139,7 +139,7 @@ class TrialExecutor: for trial. Returns: - True if `reset_config` is successful else False. + True if `reset` is successful else False. """ raise NotImplementedError diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 4c2229d95..3030385b4 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -1,8 +1,10 @@ import logging +from typing import Sequence from ray.tune.error import TuneError from ray.tune.experiment import convert_to_experiment_list, Experiment from ray.tune.analysis import ExperimentAnalysis +from ray.tune.result import STDOUT_FILE, STDERR_FILE from ray.tune.suggest import BasicVariantGenerator from ray.tune.suggest.suggestion import Searcher, SearchGenerator from ray.tune.trial import Trial @@ -65,6 +67,31 @@ def _report_progress(runner, reporter, done=False): reporter.report(trials, done, sched_debug_str, executor_debug_str) +def _validate_log_to_file(log_to_file): + """Validate ``tune.run``'s ``log_to_file`` parameter. Return + validated relative stdout and stderr filenames.""" + if not log_to_file: + stdout_file = stderr_file = None + elif isinstance(log_to_file, bool) and log_to_file: + stdout_file = "stdout" + stderr_file = "stderr" + elif isinstance(log_to_file, str): + stdout_file = stderr_file = log_to_file + elif isinstance(log_to_file, Sequence): + if len(log_to_file) != 2: + raise ValueError( + "If you pass a Sequence to `log_to_file` it has to have " + "a length of 2 (for stdout and stderr, respectively). The " + "Sequence you passed has length {}.".format(len(log_to_file))) + stdout_file, stderr_file = log_to_file + else: + raise ValueError( + "You can pass a boolean, a string, or a Sequence of length 2 to " + "`log_to_file`, but you passed something else ({}).".format( + type(log_to_file))) + return stdout_file, stderr_file + + def run(run_or_experiment, name=None, stop=None, @@ -75,6 +102,7 @@ def run(run_or_experiment, upload_dir=None, trial_name_creator=None, loggers=None, + log_to_file=False, sync_to_cloud=None, sync_to_driver=None, checkpoint_freq=0, @@ -143,6 +171,14 @@ def run(run_or_experiment, loggers (list): List of logger creators to be used with each Trial. If None, defaults to ray.tune.logger.DEFAULT_LOGGERS. See `ray/tune/logger.py`. + log_to_file (bool|str|Sequence): Log stdout and stderr to files in + Tune's trial directories. If this is `False` (default), no files + are written. If `true`, outputs are written to `trialdir/stdout` + and `trialdir/stderr`, respectively. If this is a single string, + this is interpreted as a file relative to the trialdir, to which + both streams are written. If this is a Sequence (e.g. a Tuple), + it has to have length 2 and the elements indicate the files to + which stdout and stderr are written, respectively. sync_to_cloud (func|str): Function for syncing the local_dir to and from upload_dir. If string, then it must be a string template that includes `{source}` and `{target}` for the syncer to run. If not @@ -242,6 +278,8 @@ def run(run_or_experiment, space = {"lr": tune.uniform(0, 1), "momentum": tune.uniform(0, 1)} tune.run(my_trainable, config=space, stop={"training_iteration": 10}) """ + config = config or {} + trial_executor = trial_executor or RayTrialExecutor( queue_trials=queue_trials, reuse_actors=reuse_actors, @@ -251,6 +289,10 @@ def run(run_or_experiment, else: experiments = [run_or_experiment] + stdout_file, stderr_file = _validate_log_to_file(log_to_file) + config[STDOUT_FILE] = stdout_file + config[STDERR_FILE] = stderr_file + for i, exp in enumerate(experiments): if not isinstance(exp, Experiment): run_identifier = Experiment.register_if_needed(exp) diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index 94e766b75..fb217d9d9 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -137,6 +137,20 @@ class warn_if_slow: now - self.start) +class Tee(object): + def __init__(self, stream1, stream2): + self.stream1 = stream1 + self.stream2 = stream2 + + def write(self, *args, **kwargs): + self.stream1.write(*args, **kwargs) + self.stream2.write(*args, **kwargs) + + def flush(self, *args, **kwargs): + self.stream1.flush(*args, **kwargs) + self.stream2.flush(*args, **kwargs) + + def merge_dicts(d1, d2): """ Args: