diff --git a/python/ray/tune/progress_reporter.py b/python/ray/tune/progress_reporter.py index 8ff397237..6f3261ce4 100644 --- a/python/ray/tune/progress_reporter.py +++ b/python/ray/tune/progress_reporter.py @@ -1,16 +1,21 @@ from __future__ import print_function import collections +import os import sys +from typing import Dict, List, Optional import numpy as np import time +from ray.tune.callback import Callback +from ray.tune.logger import pretty_print from ray.tune.result import (EPISODE_REWARD_MEAN, MEAN_ACCURACY, MEAN_LOSS, TRAINING_ITERATION, TIME_TOTAL_S, TIMESTEPS_TOTAL, AUTO_RESULT_KEYS) -from ray.tune.trial import Trial +from ray.tune.trial import DEBUG_PRINT_INTERVAL, Trial from ray.tune.utils import unflattened_lookup +from ray.tune.utils.log import Verbosity, has_verbosity try: from collections.abc import Mapping @@ -224,15 +229,19 @@ class TuneReporterBase(ProgressReporter): best_trial_str(current_best_trial, metric, self._parameter_columns)) - messages.append( - trial_progress_str( - trials, - metric_columns=self._metric_columns, - parameter_columns=self._parameter_columns, - total_samples=self._total_samples, - fmt=fmt, - max_rows=max_progress)) - messages.append(trial_errors_str(trials, fmt=fmt, max_rows=max_error)) + if has_verbosity(Verbosity.V1_EXPERIMENT): + # Will filter the table in `trial_progress_str` + messages.append( + trial_progress_str( + trials, + metric_columns=self._metric_columns, + parameter_columns=self._parameter_columns, + total_samples=self._total_samples, + fmt=fmt, + max_rows=max_progress, + done=done)) + messages.append( + trial_errors_str(trials, fmt=fmt, max_rows=max_error)) return delim.join(messages) + delim @@ -399,12 +408,20 @@ def memory_debug_str(): "(or ray[debug]) to resolve)") +def _get_trials_by_state(trials): + trials_by_state = collections.defaultdict(list) + for t in trials: + trials_by_state[t.status].append(t) + return trials_by_state + + def trial_progress_str(trials, metric_columns, 
parameter_columns=None, total_samples=0, fmt="psql", - max_rows=None): + max_rows=None, + done=False): """Returns a human readable message for printing to the console. This contains a table where each row represents a trial, its parameters @@ -432,9 +449,7 @@ def trial_progress_str(trials, return delim.join(messages) num_trials = len(trials) - trials_by_state = collections.defaultdict(list) - for t in trials: - trials_by_state[t.status].append(t) + trials_by_state = _get_trials_by_state(trials) for local_dir in sorted({t.local_dir for t in trials}): messages.append("Result logdir: {}".format(local_dir)) @@ -444,6 +459,30 @@ def trial_progress_str(trials, for state in sorted(trials_by_state) ] + if total_samples and total_samples >= sys.maxsize: + total_samples = "infinite" + + messages.append("Number of trials: {}{} ({})".format( + num_trials, f"/{total_samples}" + if total_samples else "", ", ".join(num_trials_strs))) + + if has_verbosity(Verbosity.V3_TRIAL_DETAILS) or (has_verbosity( + Verbosity.V2_TRIAL_NORM) and done): + messages += trial_progress_table(trials, metric_columns, + parameter_columns, fmt, max_rows) + + return delim.join(messages) + + +def trial_progress_table(trials, + metric_columns, + parameter_columns=None, + fmt="psql", + max_rows=None): + messages = [] + num_trials = len(trials) + trials_by_state = _get_trials_by_state(trials) + state_tbl_order = [ Trial.RUNNING, Trial.PAUSED, Trial.PENDING, Trial.TERMINATED, Trial.ERROR @@ -474,13 +513,6 @@ def trial_progress_str(trials, continue trials += trials_by_state[state] - if total_samples and total_samples >= sys.maxsize: - total_samples = "infinite" - - messages.append("Number of trials: {}{} ({})".format( - num_trials, f"/{total_samples}" - if total_samples else "", ", ".join(num_trials_strs))) - # Pre-process trials to figure out what columns to show. 
if isinstance(metric_columns, Mapping): metric_keys = list(metric_columns.keys()) @@ -522,7 +554,7 @@ def trial_progress_str(trials, if overflow: messages.append("... {} more trials not shown ({})".format( overflow, overflow_str)) - return delim.join(messages) + return messages def trial_errors_str(trials, fmt="psql", max_rows=None): @@ -621,3 +653,108 @@ def _get_trial_info(trial, parameters, metrics): unflattened_lookup(metric, result, default=None) for metric in metrics ] return trial_info + + +class TrialProgressCallback(Callback): + """Reports (prints) intermediate trial progress. + + This callback is automatically added to the callback stack. When a + result is obtained, this callback will print the results according to + the specified verbosity level. + + For ``Verbosity.V3_TRIAL_DETAILS``, a full result list is printed. + + For ``Verbosity.V2_TRIAL_NORM``, only one line is printed per received + result. + + All other verbosity levels do not print intermediate trial progress. + + Result printing is throttled on a per-trial basis. Per default, results are + printed only once every 30 seconds. Results are always printed when a trial + finished or errored. 
+ + """ + + def __init__(self, metric: Optional[str] = None): + self._last_print = collections.defaultdict(float) + self._completed_trials = set() + self._last_result_str = {} + self._metric = metric + + def on_trial_result(self, iteration: int, trials: List["Trial"], + trial: "Trial", result: Dict, **info): + self.log_result(trial, result, error=False) + + def on_trial_error(self, iteration: int, trials: List["Trial"], + trial: "Trial", **info): + self.log_result(trial, trial.last_result, error=True) + + def on_trial_complete(self, iteration: int, trials: List["Trial"], + trial: "Trial", **info): + # Only log when we never logged that a trial was completed + if trial not in self._completed_trials: + self._completed_trials.add(trial) + + print_result_str = self._print_result(trial.last_result) + last_result_str = self._last_result_str.get(trial, "") + # If this is a new result, print full result string + if print_result_str != last_result_str: + self.log_result(trial, trial.last_result, error=False) + else: + print(f"Trial {trial} completed. " + f"Last result: {print_result_str}") + + def log_result(self, trial: "Trial", result: Dict, error: bool = False): + done = result.get("done", False) is True + last_print = self._last_print[trial] + if done and trial not in self._completed_trials: + self._completed_trials.add(trial) + if has_verbosity(Verbosity.V3_TRIAL_DETAILS) and \ + (done or error or time.time() - last_print > DEBUG_PRINT_INTERVAL): + print("Result for {}:".format(trial)) + print(" {}".format(pretty_print(result).replace("\n", "\n "))) + self._last_print[trial] = time.time() + elif has_verbosity(Verbosity.V2_TRIAL_NORM) and ( + done or error + or time.time() - last_print > DEBUG_PRINT_INTERVAL): + info = "" + if done: + info = " This trial completed." + + metric_name = self._metric or "_metric" + metric_value = result.get(metric_name, -99.) 
+ + print_result_str = self._print_result(result) + + self._last_result_str[trial] = print_result_str + + error_file = os.path.join(trial.logdir, "error.txt") + + if error: + message = f"The trial {trial} errored with " \ + f"parameters={trial.config}. " \ + f"Error file: {error_file}" + elif self._metric: + message = f"Trial {trial} reported " \ + f"{metric_name}={metric_value:.2f} " \ + f"with parameters={trial.config}.{info}" + else: + message = f"Trial {trial} reported " \ + f"{print_result_str} " \ + f"with parameters={trial.config}.{info}" + + print(message) + self._last_print[trial] = time.time() + + def _print_result(self, result: Dict): + print_result = result.copy() + print_result.pop("config", None) + print_result.pop("trial_id", None) + print_result.pop("experiment_tag", None) + print_result.pop("done", None) + for auto_result in AUTO_RESULT_KEYS: + print_result.pop(auto_result, None) + + print_result_str = ",".join( + [f"{k}={v}" for k, v in print_result.items()]) + return print_result_str diff --git a/python/ray/tune/tests/test_progress_reporter.py b/python/ray/tune/tests/test_progress_reporter.py index c297b8cdd..664b15941 100644 --- a/python/ray/tune/tests/test_progress_reporter.py +++ b/python/ray/tune/tests/test_progress_reporter.py @@ -73,7 +73,7 @@ tune.run_experiments({ "c": tune.grid_search(list(range(10))), }, }, -}, verbose=1, progress_reporter=reporter)""" +}, verbose=3, progress_reporter=reporter)""" EXPECTED_END_TO_END_START = """Number of trials: 1/30 (1 RUNNING) +---------------+----------+-------+-----+ @@ -160,6 +160,48 @@ EXPECTED_BEST_1 = "Current best trial: 00001 with metric_1=0.5 and " \ EXPECTED_BEST_2 = "Current best trial: 00004 with metric_1=2.0 and " \ "parameters={'a': 4}" +VERBOSE_EXP_OUT_1 = "Number of trials: 1/3 (1 RUNNING)" +VERBOSE_EXP_OUT_2 = "Number of trials: 3/3 (3 TERMINATED)" + +VERBOSE_TRIAL_NORM = "Trial train_xxxxx_00000 reported acc=5 with " + \ + """parameters={'do': 'complete'}. This trial completed. 
+Trial train_xxxxx_00001 reported _metric=6 with parameters={'do': 'once'}. +Trial train_xxxxx_00001 completed. Last result: _metric=6 +Trial train_xxxxx_00002 reported acc=7 with parameters={'do': 'twice'}. +Trial train_xxxxx_00002 reported acc=8 with parameters={'do': 'twice'}. """ + \ + "This trial completed." + +VERBOSE_TRIAL_DETAIL = """+-------------------+----------+-------+----------+ +| Trial name | status | loc | do | +|-------------------+----------+-------+----------| +| train_xxxxx_00000 | RUNNING | | complete | ++-------------------+----------+-------+----------+""" + +VERBOSE_CMD = """from ray import tune +import random +import numpy as np + + +def train(config): + if config["do"] == "complete": + tune.report(acc=5, done=True) + elif config["do"] == "once": + tune.report(6) + else: + tune.report(acc=7) + tune.report(acc=8) + +random.seed(1234) +np.random.seed(1234) + +tune.run( + train, + config={ + "do": tune.grid_search(["complete", "once", "twice"]) + },""" + +# Add "verbose=3)" etc + class ProgressReporterTest(unittest.TestCase): def mock_trial(self, status, i): @@ -363,6 +405,64 @@ class ProgressReporterTest(unittest.TestCase): finally: del os.environ["_TEST_TUNE_TRIAL_UUID"] + def testVerboseReporting(self): + try: + os.environ["_TEST_TUNE_TRIAL_UUID"] = "xxxxx" + + verbose_0_cmd = VERBOSE_CMD + "verbose=0)" + output = run_string_as_driver(verbose_0_cmd) + try: + assert VERBOSE_EXP_OUT_1 not in output + assert VERBOSE_EXP_OUT_2 not in output + assert VERBOSE_TRIAL_NORM not in output + assert VERBOSE_TRIAL_DETAIL not in output + except Exception: + print("*** BEGIN OUTPUT ***") + print(output) + print("*** END OUTPUT ***") + raise + + verbose_1_cmd = VERBOSE_CMD + "verbose=1)" + output = run_string_as_driver(verbose_1_cmd) + try: + assert VERBOSE_EXP_OUT_1 in output + assert VERBOSE_EXP_OUT_2 in output + assert VERBOSE_TRIAL_NORM not in output + assert VERBOSE_TRIAL_DETAIL not in output + except Exception: + print("*** BEGIN OUTPUT ***") + 
print(output) + print("*** END OUTPUT ***") + raise + + verbose_2_cmd = VERBOSE_CMD + "verbose=2)" + output = run_string_as_driver(verbose_2_cmd) + try: + assert VERBOSE_EXP_OUT_1 in output + assert VERBOSE_EXP_OUT_2 in output + assert VERBOSE_TRIAL_NORM in output + assert VERBOSE_TRIAL_DETAIL not in output + except Exception: + print("*** BEGIN OUTPUT ***") + print(output) + print("*** END OUTPUT ***") + raise + + verbose_3_cmd = VERBOSE_CMD + "verbose=3)" + output = run_string_as_driver(verbose_3_cmd) + try: + assert VERBOSE_EXP_OUT_1 in output + assert VERBOSE_EXP_OUT_2 in output + assert VERBOSE_TRIAL_NORM not in output + assert VERBOSE_TRIAL_DETAIL in output + except Exception: + print("*** BEGIN OUTPUT ***") + print(output) + print("*** END OUTPUT ***") + raise + finally: + del os.environ["_TEST_TUNE_TRIAL_UUID"] + if __name__ == "__main__": import sys diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 938706234..af4e7bc50 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -15,7 +15,6 @@ from ray.tune.checkpoint_manager import Checkpoint, CheckpointManager # NOTE(rkn): We import ray.tune.registry here instead of importing the names we # need because there are cyclic imports that may cause specific names to not # have been defined yet. See https://github.com/ray-project/ray/issues/1716. 
-from ray.tune.logger import pretty_print from ray.tune.registry import get_trainable_cls, validate_trainable from ray.tune.result import DEFAULT_RESULTS_DIR, DONE, TRAINING_ITERATION from ray.tune.resources import Resources, json_to_resources, resources_to_json @@ -228,7 +227,6 @@ class Trial: or not len(self.log_to_file) == 2: self.log_to_file = (None, None) - self.verbose = True self.max_failures = max_failures # Local trial state that is updated during the run @@ -456,11 +454,7 @@ class Trial: def update_last_result(self, result, terminate=False): if self.experiment_tag: result.update(experiment_tag=self.experiment_tag) - if self.verbose and (terminate or time.time() - self.last_debug > - DEBUG_PRINT_INTERVAL): - print("Result for {}:".format(self)) - print(" {}".format(pretty_print(result).replace("\n", "\n "))) - self.last_debug = time.time() + self.set_location(Location(result.get("node_ip"), result.get("pid"))) self.last_result = result self.last_update_time = time.time() @@ -502,9 +496,6 @@ class Trial: def get_trainable_cls(self): return get_trainable_cls(self.trainable_name) - def set_verbose(self, verbose): - self.verbose = verbose - def is_finished(self): return self.status in [Trial.ERROR, Trial.TERMINATED] diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 09f34f82e..b5340b2ec 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -21,6 +21,7 @@ from ray.tune.trial import Checkpoint, Trial from ray.tune.schedulers import FIFOScheduler, TrialScheduler from ray.tune.suggest import BasicVariantGenerator from ray.tune.utils import warn_if_slow, flatten_dict, env_integer +from ray.tune.utils.log import Verbosity, has_verbosity from ray.tune.web_server import TuneServer from ray.utils import binary_to_hex, hex_to_binary from ray.util.debug import log_once @@ -110,8 +111,6 @@ class TrialRunner: If fail_fast='raise' provided, Tune will automatically raise the exception received by the 
Trainable. fail_fast='raise' can easily leak resources and should be used with caution. - verbose (bool): Flag for verbosity. If False, trial results - will not be output. checkpoint_period (int): Trial runner checkpoint periodicity in seconds. Defaults to 10. trial_executor (TrialExecutor): Defaults to RayTrialExecutor. @@ -134,7 +133,6 @@ class TrialRunner: resume=False, server_port=None, fail_fast=False, - verbose=True, checkpoint_period=None, trial_executor=None, callbacks=None, @@ -167,7 +165,6 @@ class TrialRunner: else: raise ValueError("fail_fast must be one of {bool, RAISE}. " f"Got {self._fail_fast}.") - self._verbose = verbose self._server = None self._server_port = server_port @@ -197,7 +194,7 @@ class TrialRunner: self.resume(run_errored_only=errored_only) self._resumed = True except Exception as e: - if self._verbose: + if has_verbosity(Verbosity.V3_TRIAL_DETAILS): logger.error(str(e)) logger.exception("Runner restore failed.") if self._fail_fast: @@ -430,7 +427,6 @@ class TrialRunner: Args: trial (Trial): Trial to queue. 
""" - trial.set_verbose(self._verbose) self._trials.append(trial) with warn_if_slow("scheduler.on_trial_add"): self._scheduler_alg.on_trial_add(self, trial) @@ -588,6 +584,8 @@ class TrialRunner: with warn_if_slow("scheduler.on_trial_result"): decision = self._scheduler_alg.on_trial_result( self, trial, flat_result) + if decision == TrialScheduler.STOP: + result.update(done=True) with warn_if_slow("search_alg.on_trial_result"): self._search_alg.on_trial_result(trial.trial_id, flat_result) @@ -606,7 +604,6 @@ class TrialRunner: iteration=self._iteration, trials=self._trials, trial=trial) - result.update(done=True) if not is_duplicate: trial.update_last_result( diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 0594d4d7e..8f35af055 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -18,6 +18,7 @@ from ray.tune.syncer import wait_for_sync, set_sync_periods, \ from ray.tune.trial_runner import TrialRunner from ray.tune.progress_reporter import CLIReporter, JupyterNotebookReporter from ray.tune.schedulers import FIFOScheduler +from ray.tune.utils.log import Verbosity, has_verbosity, set_verbosity logger = logging.getLogger(__name__) @@ -70,7 +71,7 @@ def run( checkpoint_score_attr=None, checkpoint_freq=0, checkpoint_at_end=False, - verbose=2, + verbose=Verbosity.V3_TRIAL_DETAILS, progress_reporter=None, loggers=None, log_to_file=False, @@ -188,8 +189,9 @@ def run( checkpoint_at_end (bool): Whether to checkpoint at the end of the experiment regardless of the checkpoint_freq. Default is False. This has no effect when using the Functional Training API. - verbose (int): 0, 1, or 2. Verbosity mode. 0 = silent, - 1 = only status updates, 2 = status and trial results. + verbose (int): 0, 1, 2, or 3. Verbosity mode. 0 = silent, + 1 = only status updates, 2 = status and brief trial results, + 3 = status and detailed trial results. Defaults to 3. 
progress_reporter (ProgressReporter): Progress reporter for reporting intermediate experiment progress. Defaults to CLIReporter if running in command-line, or JupyterNotebookReporter if running in @@ -282,6 +284,8 @@ def run( "The `mode` parameter passed to `tune.run()` has to be one of " "['min', 'max']") + set_verbosity(verbose) + config = config or {} sync_config = sync_config or SyncConfig() set_sync_periods(sync_config) @@ -354,9 +358,9 @@ def run( "own `metric` and `mode` parameters. Either remove the arguments " "from your scheduler or from your call to `tune.run()`") - # Create logger and syncer callbacks + # Create syncer callbacks callbacks = create_default_callbacks( - callbacks, sync_config, loggers=loggers) + callbacks, sync_config, metric=metric, loggers=loggers) runner = TrialRunner( search_alg=search_alg, @@ -367,7 +371,6 @@ def run( stopper=experiments[0].stopper, resume=resume, server_port=server_port, - verbose=bool(verbose > 1), fail_fast=fail_fast, trial_executor=trial_executor, callbacks=callbacks, @@ -381,7 +384,8 @@ def run( if progress_reporter is None: if IS_NOTEBOOK: - progress_reporter = JupyterNotebookReporter(overwrite=verbose < 2) + progress_reporter = JupyterNotebookReporter( + overwrite=not has_verbosity(Verbosity.V2_TRIAL_NORM)) else: progress_reporter = CLIReporter() @@ -414,7 +418,7 @@ def run( tune_start = time.time() while not runner.is_finished(): runner.step() - if verbose: + if has_verbosity(Verbosity.V1_EXPERIMENT): _report_progress(runner, progress_reporter) tune_taken = time.time() - tune_start @@ -423,7 +427,7 @@ def run( except Exception as e: logger.warning(f"Trial Runner checkpointing failed: {str(e)}") - if verbose: + if has_verbosity(Verbosity.V1_EXPERIMENT): _report_progress(runner, progress_reporter, done=True) wait_for_sync() @@ -441,8 +445,9 @@ def run( logger.error("Trials did not complete: %s", incomplete_trials) all_taken = time.time() - all_start - logger.info(f"Total run time: {all_taken:.2f} seconds " - 
f"({tune_taken:.2f} seconds for the tuning loop).") + if has_verbosity(Verbosity.V1_EXPERIMENT): + logger.info(f"Total run time: {all_taken:.2f} seconds " + f"({tune_taken:.2f} seconds for the tuning loop).") trials = runner.get_trials() return ExperimentAnalysis( @@ -455,7 +460,7 @@ def run( def run_experiments(experiments, scheduler=None, server_port=None, - verbose=2, + verbose=Verbosity.V3_TRIAL_DETAILS, progress_reporter=None, resume=False, queue_trials=False, diff --git a/python/ray/tune/utils/callback.py b/python/ray/tune/utils/callback.py index c4b7275ba..91ec27f91 100644 --- a/python/ray/tune/utils/callback.py +++ b/python/ray/tune/utils/callback.py @@ -2,6 +2,7 @@ import os from typing import List, Optional from ray.tune.callback import Callback +from ray.tune.progress_reporter import TrialProgressCallback from ray.tune.syncer import SyncConfig from ray.tune.logger import CSVLogger, DEFAULT_LOGGERS, ExperimentLogger, \ JsonLogger, LegacyExperimentLogger, Logger @@ -10,13 +11,43 @@ from ray.tune.syncer import SyncerCallback def create_default_callbacks(callbacks: Optional[List[Callback]], sync_config: SyncConfig, - loggers: Optional[List[Logger]]): + loggers: Optional[List[Logger]], + metric: Optional[str] = None): + """Create default callbacks for `tune.run()`. + This function takes a list of existing callbacks and adds default + callbacks to it. + + Specifically, three kinds of callbacks will be added: + + 1. Loggers. Ray Tune's experiment analysis relies on CSV and JSON logging. + 2. Syncer. Ray Tune synchronizes logs and checkpoints between workers and + the head node. + 3. Trial progress reporter. For reporting intermediate progress, like trial + results, Ray Tune uses a callback. + + These callbacks will only be added if they don't already exist, i.e. if + they haven't been passed (and configured) by the user. A notable case + is when a Logger is passed, which is not a CSV or JSON logger - then + a CSV and JSON logger will still be created.
+ + Lastly, this function will ensure that the Syncer callback comes after all + Logger callbacks, to ensure that the most up-to-date logs and checkpoints + are synced across nodes. + + """ callbacks = callbacks or [] has_syncer_callback = False has_csv_logger = False has_json_logger = False + has_trial_progress_callback = any( + isinstance(c, TrialProgressCallback) for c in callbacks) + + if not has_trial_progress_callback: + trial_progress_callback = TrialProgressCallback(metric=metric) + callbacks.append(trial_progress_callback) + # Track syncer obj/index to move callback after loggers last_logger_index = None syncer_index = None diff --git a/python/ray/tune/utils/log.py b/python/ray/tune/utils/log.py new file mode 100644 index 000000000..a62eb2b66 --- /dev/null +++ b/python/ray/tune/utils/log.py @@ -0,0 +1,34 @@ +from enum import Enum +from typing import Union + + +class Verbosity(Enum): + V0_MINIMAL = 0 + V1_EXPERIMENT = 1 + V2_TRIAL_NORM = 2 + V3_TRIAL_DETAILS = 3 + + def __int__(self): + return self.value + + +verbosity: Union[int, Verbosity] = Verbosity.V3_TRIAL_DETAILS + + +def set_verbosity(level: Union[int, Verbosity]): + global verbosity + + if isinstance(level, int): + verbosity = Verbosity(level) + else: + verbosity = verbosity + + +def has_verbosity(level: Union[int, Verbosity]) -> bool: + """Return True if the global verbosity level is at least the passed level.""" + global verbosity + + log_level = int(level) + verbosity_level = int(verbosity) + + return verbosity_level >= log_level