From 219c445648570932bcfecd7cae32fa6411ef4b63 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 4 Dec 2020 22:56:26 +0100 Subject: [PATCH] [tune] verbosity refactor second attempt (#12571) Co-authored-by: Richard Liaw --- python/ray/tune/progress_reporter.py | 378 ++++++++++++++---- .../ray/tune/tests/test_progress_reporter.py | 113 +++++- python/ray/tune/trial.py | 11 +- python/ray/tune/trial_runner.py | 11 +- python/ray/tune/tune.py | 29 +- python/ray/tune/utils/callback.py | 33 +- python/ray/tune/utils/log.py | 34 ++ rllib/train.py | 18 +- 8 files changed, 503 insertions(+), 124 deletions(-) create mode 100644 python/ray/tune/utils/log.py diff --git a/python/ray/tune/progress_reporter.py b/python/ray/tune/progress_reporter.py index 59c540dd8..a45ac238d 100644 --- a/python/ray/tune/progress_reporter.py +++ b/python/ray/tune/progress_reporter.py @@ -1,21 +1,26 @@ from __future__ import print_function -import collections -import sys +from typing import Dict, List, Optional, Union +import collections +import os +import sys import numpy as np import time +from ray.tune.callback import Callback +from ray.tune.logger import pretty_print from ray.tune.result import (DEFAULT_METRIC, EPISODE_REWARD_MEAN, MEAN_ACCURACY, MEAN_LOSS, TRAINING_ITERATION, TIME_TOTAL_S, TIMESTEPS_TOTAL, AUTO_RESULT_KEYS) -from ray.tune.trial import Trial +from ray.tune.trial import DEBUG_PRINT_INTERVAL, Trial from ray.tune.utils import unflattened_lookup +from ray.tune.utils.log import Verbosity, has_verbosity try: - from collections.abc import Mapping + from collections.abc import Mapping, MutableMapping except ImportError: - from collections import Mapping + from collections import Mapping, MutableMapping try: from tabulate import tabulate @@ -33,7 +38,7 @@ class ProgressReporter: receiving training results, and so on. """ - def should_report(self, trials, done=False): + def should_report(self, trials: List[Trial], done: bool = False): """Returns whether or not progress should be reported. Args: @@ -42,7 +47,7 @@ class ProgressReporter: """ raise NotImplementedError - def report(self, trials, done, *sys_info): + def report(self, trials: List[Trial], done: bool, *sys_info: Dict): """Reports progress across trials. Args: @@ -80,6 +85,12 @@ class TuneReporterBase(ProgressReporter): Defaults to 5s. infer_limit (int): Maximum number of metrics to automatically infer from tune results. + print_intermediate_tables (bool|None): Print intermediate result + tables. If None (default), will be set to True for verbosity + levels above 3, otherwise False. If True, intermediate tables + will be printed with experiment progress. If False, tables + will only be printed at then end of the tuning run for verbosity + levels greater than 2. metric (str): Metric used to determine best current trial. mode (str): One of [min, max]. Determines whether objective is minimizing or maximizing the metric attribute. @@ -99,16 +110,18 @@ class TuneReporterBase(ProgressReporter): type(None) } - def __init__(self, - metric_columns=None, - parameter_columns=None, - total_samples=None, - max_progress_rows=20, - max_error_rows=20, - max_report_frequency=5, - infer_limit=3, - metric=None, - mode=None): + def __init__( + self, + metric_columns: Union[None, List[str], Dict[str, str]] = None, + parameter_columns: Union[None, List[str], Dict[str, str]] = None, + total_samples: Optional[int] = None, + max_progress_rows: int = 20, + max_error_rows: int = 20, + max_report_frequency: int = 5, + infer_limit: int = 3, + print_intermediate_tables: Optional[bool] = None, + metric: Optional[str] = None, + mode: Optional[str] = None): self._total_samples = total_samples self._metrics_override = metric_columns is not None self._inferred_metrics = {} @@ -118,13 +131,20 @@ class TuneReporterBase(ProgressReporter): self._max_error_rows = max_error_rows self._infer_limit = infer_limit + if print_intermediate_tables is None: + self._print_intermediate_tables = has_verbosity( + Verbosity.V3_TRIAL_DETAILS) + else: + self._print_intermediate_tables = print_intermediate_tables + self._max_report_freqency = max_report_frequency self._last_report_time = 0 self._metric = metric self._mode = mode - def set_search_properties(self, metric, mode): + def set_search_properties(self, metric: Optional[str], + mode: Optional[str]): if self._metric and metric: return False if self._mode and mode: @@ -141,16 +161,18 @@ class TuneReporterBase(ProgressReporter): return True - def set_total_samples(self, total_samples): + def set_total_samples(self, total_samples: int): self._total_samples = total_samples - def should_report(self, trials, done=False): + def should_report(self, trials: List[Trial], done: bool = False): if time.time() - self._last_report_time > self._max_report_freqency: self._last_report_time = time.time() return True return done - def add_metric_column(self, metric, representation=None): + def add_metric_column(self, + metric: str, + representation: Optional[str] = None): """Adds a metric to the existing columns. Args: @@ -163,7 +185,7 @@ class TuneReporterBase(ProgressReporter): if metric in self._metric_columns: raise ValueError("Column {} already exists.".format(metric)) - if isinstance(self._metric_columns, Mapping): + if isinstance(self._metric_columns, MutableMapping): representation = representation or metric self._metric_columns[metric] = representation else: @@ -174,7 +196,9 @@ class TuneReporterBase(ProgressReporter): "of metric columns.") self._metric_columns.append(metric) - def add_parameter_column(self, parameter, representation=None): + def add_parameter_column(self, + parameter: str, + representation: Optional[str] = None): """Adds a parameter to the existing columns. Args: @@ -186,7 +210,7 @@ class TuneReporterBase(ProgressReporter): if parameter in self._parameter_columns: raise ValueError("Column {} already exists.".format(parameter)) - if isinstance(self._parameter_columns, Mapping): + if isinstance(self._parameter_columns, MutableMapping): representation = representation or parameter self._parameter_columns[parameter] = representation else: @@ -197,7 +221,12 @@ class TuneReporterBase(ProgressReporter): "of metric columns.") self._parameter_columns.append(parameter) - def _progress_str(self, trials, done, *sys_info, fmt="psql", delim="\n"): + def _progress_str(self, + trials: List[Trial], + done: bool, + *sys_info: Dict, + fmt: str = "psql", + delim: str = "\n"): """Returns full progress string. This string contains a progress table and error table. The progress @@ -228,19 +257,24 @@ class TuneReporterBase(ProgressReporter): best_trial_str(current_best_trial, metric, self._parameter_columns)) - messages.append( - trial_progress_str( - trials, - metric_columns=self._metric_columns, - parameter_columns=self._parameter_columns, - total_samples=self._total_samples, - fmt=fmt, - max_rows=max_progress)) - messages.append(trial_errors_str(trials, fmt=fmt, max_rows=max_error)) + if has_verbosity(Verbosity.V1_EXPERIMENT): + # Will filter the table in `trial_progress_str` + messages.append( + trial_progress_str( + trials, + metric_columns=self._metric_columns, + parameter_columns=self._parameter_columns, + total_samples=self._total_samples, + force_table=self._print_intermediate_tables, + fmt=fmt, + max_rows=max_progress, + done=done)) + messages.append( + trial_errors_str(trials, fmt=fmt, max_rows=max_error)) return delim.join(messages) + delim - def _infer_user_metrics(self, trials, limit=4): + def _infer_user_metrics(self, trials: List[Trial], limit: int = 4): """Try to infer the metrics to print out.""" if len(self._inferred_metrics) >= limit: return self._inferred_metrics @@ -258,7 +292,7 @@ class TuneReporterBase(ProgressReporter): return self._inferred_metrics return self._inferred_metrics - def _current_best_trial(self, trials): + def _current_best_trial(self, trials: List[Trial]): if not trials: return None, None @@ -309,26 +343,39 @@ class JupyterNotebookReporter(TuneReporterBase): corresponding to each trial. Defaults to 20. max_report_frequency (int): Maximum report frequency in seconds. Defaults to 5s. + infer_limit (int): Maximum number of metrics to automatically infer + from tune results. + print_intermediate_tables (bool|None): Print intermediate result + tables. If None (default), will be set to True for verbosity + levels above 3, otherwise False. If True, intermediate tables + will be printed with experiment progress. If False, tables + will only be printed at then end of the tuning run for verbosity + levels greater than 2. + metric (str): Metric used to determine best current trial. + mode (str): One of [min, max]. Determines whether objective is + minimizing or maximizing the metric attribute. """ - def __init__(self, - overwrite, - metric_columns=None, - parameter_columns=None, - total_samples=None, - max_progress_rows=20, - max_error_rows=20, - max_report_frequency=5, - infer_limit=3, - metric=None, - mode=None): - super(JupyterNotebookReporter, - self).__init__(metric_columns, parameter_columns, total_samples, - max_progress_rows, max_error_rows, - max_report_frequency, infer_limit, metric, mode) + def __init__( + self, + overwrite: bool, + metric_columns: Union[None, List[str], Dict[str, str]] = None, + parameter_columns: Union[None, List[str], Dict[str, str]] = None, + total_samples: Optional[int] = None, + max_progress_rows: int = 20, + max_error_rows: int = 20, + max_report_frequency: int = 5, + infer_limit: int = 3, + print_intermediate_tables: Optional[bool] = None, + metric: Optional[str] = None, + mode: Optional[str] = None): + super(JupyterNotebookReporter, self).__init__( + metric_columns, parameter_columns, total_samples, + max_progress_rows, max_error_rows, max_report_frequency, + infer_limit, print_intermediate_tables, metric, mode) self._overwrite = overwrite - def report(self, trials, done, *sys_info): + def report(self, trials: List[Trial], done: bool, *sys_info: Dict): from IPython.display import clear_output from IPython.core.display import display, HTML if self._overwrite: @@ -359,25 +406,38 @@ class CLIReporter(TuneReporterBase): corresponding to each trial. Defaults to 20. max_report_frequency (int): Maximum report frequency in seconds. Defaults to 5s. + infer_limit (int): Maximum number of metrics to automatically infer + from tune results. + print_intermediate_tables (bool|None): Print intermediate result + tables. If None (default), will be set to True for verbosity + levels above 3, otherwise False. If True, intermediate tables + will be printed with experiment progress. If False, tables + will only be printed at then end of the tuning run for verbosity + levels greater than 2. + metric (str): Metric used to determine best current trial. + mode (str): One of [min, max]. Determines whether objective is + minimizing or maximizing the metric attribute. """ - def __init__(self, - metric_columns=None, - parameter_columns=None, - total_samples=None, - max_progress_rows=20, - max_error_rows=20, - max_report_frequency=5, - infer_limit=3, - metric=None, - mode=None): + def __init__( + self, + metric_columns: Union[None, List[str], Dict[str, str]] = None, + parameter_columns: Union[None, List[str], Dict[str, str]] = None, + total_samples: Optional[int] = None, + max_progress_rows: int = 20, + max_error_rows: int = 20, + max_report_frequency: int = 5, + infer_limit: int = 3, + print_intermediate_tables: Optional[bool] = None, + metric: Optional[str] = None, + mode: Optional[str] = None): - super(CLIReporter, - self).__init__(metric_columns, parameter_columns, total_samples, - max_progress_rows, max_error_rows, - max_report_frequency, infer_limit, metric, mode) + super(CLIReporter, self).__init__( + metric_columns, parameter_columns, total_samples, + max_progress_rows, max_error_rows, max_report_frequency, + infer_limit, print_intermediate_tables, metric, mode) - def report(self, trials, done, *sys_info): + def report(self, trials: List[Trial], done: bool, *sys_info: Dict): print(self._progress_str(trials, done, *sys_info)) @@ -403,12 +463,22 @@ def memory_debug_str(): "(or ray[debug]) to resolve)") -def trial_progress_str(trials, - metric_columns, - parameter_columns=None, - total_samples=0, - fmt="psql", - max_rows=None): +def _get_trials_by_state(trials: List[Trial]): + trials_by_state = collections.defaultdict(list) + for t in trials: + trials_by_state[t.status].append(t) + return trials_by_state + + +def trial_progress_str( + trials: List[Trial], + metric_columns: Union[List[str], Dict[str, str]], + parameter_columns: Union[None, List[str], Dict[str, str]] = None, + total_samples: int = 0, + force_table: bool = False, + fmt: str = "psql", + max_rows: Optional[int] = None, + done: bool = False): """Returns a human readable message for printing to the console. This contains a table where each row represents a trial, its parameters @@ -426,9 +496,13 @@ def trial_progress_str(trials, the parameter name is used in the message directly. If this is empty, all parameters are used in the message. total_samples (int): Total number of trials that will be generated. + force_table (bool): Force printing a table. If False, a table will + be printed only at the end of the training for verbosity levels + above `Verbosity.V2_TRIAL_NORM`. fmt (str): Output format (see tablefmt in tabulate API). max_rows (int): Maximum number of rows in the trial table. Defaults to unlimited. + done (bool): True indicates that the tuning run finished. """ messages = [] delim = "
" if fmt == "html" else "\n" @@ -436,9 +510,7 @@ def trial_progress_str(trials, return delim.join(messages) num_trials = len(trials) - trials_by_state = collections.defaultdict(list) - for t in trials: - trials_by_state[t.status].append(t) + trials_by_state = _get_trials_by_state(trials) for local_dir in sorted({t.local_dir for t in trials}): messages.append("Result logdir: {}".format(local_dir)) @@ -448,6 +520,30 @@ def trial_progress_str(trials, for state in sorted(trials_by_state) ] + if total_samples and total_samples >= sys.maxsize: + total_samples = "infinite" + + messages.append("Number of trials: {}{} ({})".format( + num_trials, f"/{total_samples}" + if total_samples else "", ", ".join(num_trials_strs))) + + if force_table or (has_verbosity(Verbosity.V2_TRIAL_NORM) and done): + messages += trial_progress_table(trials, metric_columns, + parameter_columns, fmt, max_rows) + + return delim.join(messages) + + +def trial_progress_table( + trials: List[Trial], + metric_columns: Union[List[str], Dict[str, str]], + parameter_columns: Union[None, List[str], Dict[str, str]] = None, + fmt: str = "psql", + max_rows: Optional[int] = None): + messages = [] + num_trials = len(trials) + trials_by_state = _get_trials_by_state(trials) + state_tbl_order = [ Trial.RUNNING, Trial.PAUSED, Trial.PENDING, Trial.TERMINATED, Trial.ERROR @@ -472,19 +568,13 @@ def trial_progress_str(trials, overflow_str = ", ".join(overflow_strs) else: overflow = False + overflow_str = "" trials = [] for state in state_tbl_order: if state not in trials_by_state: continue trials += trials_by_state[state] - if total_samples and total_samples >= sys.maxsize: - total_samples = "infinite" - - messages.append("Number of trials: {}{} ({})".format( - num_trials, f"/{total_samples}" - if total_samples else "", ", ".join(num_trials_strs))) - # Pre-process trials to figure out what columns to show. if isinstance(metric_columns, Mapping): metric_keys = list(metric_columns.keys()) @@ -526,10 +616,12 @@ def trial_progress_str(trials, if overflow: messages.append("... {} more trials not shown ({})".format( overflow, overflow_str)) - return delim.join(messages) + return messages -def trial_errors_str(trials, fmt="psql", max_rows=None): +def trial_errors_str(trials: List[Trial], + fmt: str = "psql", + max_rows: Optional[int] = None): """Returns a readable message regarding trial errors. Args: @@ -558,7 +650,10 @@ def trial_errors_str(trials, fmt="psql", max_rows=None): return delim.join(messages) -def best_trial_str(trial, metric, parameter_columns=None): +def best_trial_str( + trial: Trial, + metric: str, + parameter_columns: Union[None, List[str], Dict[str, str]] = None): """Returns a readable message stating the current best trial.""" val = trial.last_result[metric] config = trial.last_result.get("config", {}) @@ -570,7 +665,8 @@ def best_trial_str(trial, metric, parameter_columns=None): f"parameters={params}" -def _fair_filter_trials(trials_by_state, max_trials): +def _fair_filter_trials(trials_by_state: Dict[str, List[Trial]], + max_trials: int): """Filters trials such that each state is represented fairly. The oldest trials are truncated if necessary. @@ -605,7 +701,7 @@ def _fair_filter_trials(trials_by_state, max_trials): return filtered_trials -def _get_trial_info(trial, parameters, metrics): +def _get_trial_info(trial: Trial, parameters: List[str], metrics: List[str]): """Returns the following information about a trial: name | status | loc | params... | metrics... @@ -625,3 +721,109 @@ def _get_trial_info(trial, parameters, metrics): unflattened_lookup(metric, result, default=None) for metric in metrics ] return trial_info + + +class TrialProgressCallback(Callback): + """Reports (prints) intermediate trial progress. + + This callback is automatically added to the callback stack. When a + result is obtained, this callback will print the results according to + the specified verbosity level. + + For ``Verbosity.V3_TRIAL_DETAILS``, a full result list is printed. + + For ``Verbosity.V2_TRIAL_NORM``, only one line is printed per received + result. + + All other verbosity levels do not print intermediate trial progress. + + Result printing is throttled on a per-trial basis. Per default, results are + printed only once every 30 seconds. Results are always printed when a trial + finished or errored. + + """ + + def __init__(self, metric: Optional[str] = None): + self._last_print = collections.defaultdict(float) + self._completed_trials = set() + self._last_result_str = {} + self._metric = metric + + def on_trial_result(self, iteration: int, trials: List["Trial"], + trial: "Trial", result: Dict, **info): + self.log_result(trial, result, error=False) + + def on_trial_error(self, iteration: int, trials: List["Trial"], + trial: "Trial", **info): + self.log_result(trial, trial.last_result, error=True) + + def on_trial_complete(self, iteration: int, trials: List["Trial"], + trial: "Trial", **info): + # Only log when we never logged that a trial was completed + if trial not in self._completed_trials: + self._completed_trials.add(trial) + + print_result_str = self._print_result(trial.last_result) + last_result_str = self._last_result_str.get(trial, "") + # If this is a new result, print full result string + if print_result_str != last_result_str: + self.log_result(trial, trial.last_result, error=False) + else: + print(f"Trial {trial} completed. " + f"Last result: {print_result_str}") + + def log_result(self, trial: "Trial", result: Dict, error: bool = False): + done = result.get("done", False) is True + last_print = self._last_print[trial] + if done and trial not in self._completed_trials: + self._completed_trials.add(trial) + if has_verbosity(Verbosity.V3_TRIAL_DETAILS) and \ + (done or error or time.time() - last_print > DEBUG_PRINT_INTERVAL): + print("Result for {}:".format(trial)) + print(" {}".format(pretty_print(result).replace("\n", "\n "))) + self._last_print[trial] = time.time() + elif has_verbosity(Verbosity.V2_TRIAL_NORM) and ( + done or error + or time.time() - last_print > DEBUG_PRINT_INTERVAL): + info = "" + if done: + info = " This trial completed." + + metric_name = self._metric or "_metric" + metric_value = result.get(metric_name, -99.) + + print_result_str = self._print_result(result) + + self._last_result_str[trial] = print_result_str + + error_file = os.path.join(trial.logdir, "error.txt") + + if error: + message = f"The trial {trial} errored with " \ + f"parameters={trial.config}. " \ + f"Error file: {error_file}" + elif self._metric: + message = f"Trial {trial} reported " \ + f"{metric_name}={metric_value:.2f} " \ + f"with parameters={trial.config}.{info}" + else: + message = f"Trial {trial} reported " \ + f"{print_result_str} " \ + f"with parameters={trial.config}.{info}" + + print(message) + self._last_print[trial] = time.time() + + def _print_result(self, result: Dict): + print_result = result.copy() + print_result.pop("config", None) + print_result.pop("hist_stats", None) + print_result.pop("trial_id", None) + print_result.pop("experiment_tag", None) + print_result.pop("done", None) + for auto_result in AUTO_RESULT_KEYS: + print_result.pop(auto_result, None) + + print_result_str = ",".join( + [f"{k}={v}" for k, v in print_result.items()]) + return print_result_str diff --git a/python/ray/tune/tests/test_progress_reporter.py b/python/ray/tune/tests/test_progress_reporter.py index c297b8cdd..0df1d89dd 100644 --- a/python/ray/tune/tests/test_progress_reporter.py +++ b/python/ray/tune/tests/test_progress_reporter.py @@ -73,7 +73,7 @@ tune.run_experiments({ "c": tune.grid_search(list(range(10))), }, }, -}, verbose=1, progress_reporter=reporter)""" +}, verbose=3, progress_reporter=reporter)""" EXPECTED_END_TO_END_START = """Number of trials: 1/30 (1 RUNNING) +---------------+----------+-------+-----+ @@ -160,6 +160,48 @@ EXPECTED_BEST_1 = "Current best trial: 00001 with metric_1=0.5 and " \ EXPECTED_BEST_2 = "Current best trial: 00004 with metric_1=2.0 and " \ "parameters={'a': 4}" +VERBOSE_EXP_OUT_1 = "Number of trials: 1/3 (1 RUNNING)" +VERBOSE_EXP_OUT_2 = "Number of trials: 3/3 (3 TERMINATED)" + +VERBOSE_TRIAL_NORM = "Trial train_xxxxx_00000 reported acc=5 with " + \ + """parameters={'do': 'complete'}. This trial completed. +Trial train_xxxxx_00001 reported _metric=6 with parameters={'do': 'once'}. +Trial train_xxxxx_00001 completed. Last result: _metric=6 +Trial train_xxxxx_00002 reported acc=7 with parameters={'do': 'twice'}. +Trial train_xxxxx_00002 reported acc=8 with parameters={'do': 'twice'}. """ + \ + "This trial completed." + +VERBOSE_TRIAL_DETAIL = """+-------------------+----------+-------+----------+ +| Trial name | status | loc | do | +|-------------------+----------+-------+----------| +| train_xxxxx_00000 | RUNNING | | complete | ++-------------------+----------+-------+----------+""" + +VERBOSE_CMD = """from ray import tune +import random +import numpy as np + + +def train(config): + if config["do"] == "complete": + tune.report(acc=5, done=True) + elif config["do"] == "once": + tune.report(6) + else: + tune.report(acc=7) + tune.report(acc=8) + +random.seed(1234) +np.random.seed(1234) + +tune.run( + train, + config={ + "do": tune.grid_search(["complete", "once", "twice"]) + },""" + +# Add "verbose=3)" etc + class ProgressReporterTest(unittest.TestCase): def mock_trial(self, status, i): @@ -294,12 +336,16 @@ class ProgressReporterTest(unittest.TestCase): trials.append(t) # One metric, two parameters prog1 = trial_progress_str( - trials, ["metric_1"], ["a", "b"], fmt="psql", max_rows=3) + trials, ["metric_1"], ["a", "b"], + fmt="psql", + max_rows=3, + force_table=True) print(prog1) assert prog1 == EXPECTED_RESULT_1 # No metric, all parameters - prog2 = trial_progress_str(trials, [], None, fmt="psql", max_rows=None) + prog2 = trial_progress_str( + trials, [], None, fmt="psql", max_rows=None, force_table=True) print(prog2) assert prog2 == EXPECTED_RESULT_2 @@ -310,7 +356,8 @@ class ProgressReporterTest(unittest.TestCase): "metric_2": "Metric 2" }, {"a": "A"}, fmt="psql", - max_rows=3) + max_rows=3, + force_table=True) print(prog3) assert prog3 == EXPECTED_RESULT_3 @@ -363,6 +410,64 @@ class ProgressReporterTest(unittest.TestCase): finally: del os.environ["_TEST_TUNE_TRIAL_UUID"] + def testVerboseReporting(self): + try: + os.environ["_TEST_TUNE_TRIAL_UUID"] = "xxxxx" + + verbose_0_cmd = VERBOSE_CMD + "verbose=0)" + output = run_string_as_driver(verbose_0_cmd) + try: + self.assertNotIn(VERBOSE_EXP_OUT_1, output) + self.assertNotIn(VERBOSE_EXP_OUT_2, output) + self.assertNotIn(VERBOSE_TRIAL_NORM, output) + self.assertNotIn(VERBOSE_TRIAL_DETAIL, output) + except Exception: + print("*** BEGIN OUTPUT ***") + print(output) + print("*** END OUTPUT ***") + raise + + verbose_1_cmd = VERBOSE_CMD + "verbose=1)" + output = run_string_as_driver(verbose_1_cmd) + try: + self.assertIn(VERBOSE_EXP_OUT_1, output) + self.assertIn(VERBOSE_EXP_OUT_2, output) + self.assertNotIn(VERBOSE_TRIAL_NORM, output) + self.assertNotIn(VERBOSE_TRIAL_DETAIL, output) + except Exception: + print("*** BEGIN OUTPUT ***") + print(output) + print("*** END OUTPUT ***") + raise + + verbose_2_cmd = VERBOSE_CMD + "verbose=2)" + output = run_string_as_driver(verbose_2_cmd) + try: + self.assertIn(VERBOSE_EXP_OUT_1, output) + self.assertIn(VERBOSE_EXP_OUT_2, output) + self.assertIn(VERBOSE_TRIAL_NORM, output) + self.assertNotIn(VERBOSE_TRIAL_DETAIL, output) + except Exception: + print("*** BEGIN OUTPUT ***") + print(output) + print("*** END OUTPUT ***") + raise + + verbose_3_cmd = VERBOSE_CMD + "verbose=3)" + output = run_string_as_driver(verbose_3_cmd) + try: + self.assertIn(VERBOSE_EXP_OUT_1, output) + self.assertIn(VERBOSE_EXP_OUT_2, output) + self.assertNotIn(VERBOSE_TRIAL_NORM, output) + self.assertIn(VERBOSE_TRIAL_DETAIL, output) + except Exception: + print("*** BEGIN OUTPUT ***") + print(output) + print("*** END OUTPUT ***") + raise + finally: + del os.environ["_TEST_TUNE_TRIAL_UUID"] + if __name__ == "__main__": import sys diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index b775639d3..2e9465c83 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -16,7 +16,6 @@ from ray.tune.checkpoint_manager import Checkpoint, CheckpointManager # NOTE(rkn): We import ray.tune.registry here instead of importing the names we # need because there are cyclic imports that may cause specific names to not # have been defined yet. See https://github.com/ray-project/ray/issues/1716. -from ray.tune.logger import pretty_print from ray.tune.registry import get_trainable_cls, validate_trainable from ray.tune.result import DEFAULT_RESULTS_DIR, DONE, TRAINING_ITERATION from ray.tune.resources import Resources, json_to_resources, resources_to_json @@ -230,7 +229,6 @@ class Trial: or not len(self.log_to_file) == 2: self.log_to_file = (None, None) - self.verbose = True self.max_failures = max_failures # Local trial state that is updated during the run @@ -480,11 +478,7 @@ class Trial: def update_last_result(self, result, terminate=False): if self.experiment_tag: result.update(experiment_tag=self.experiment_tag) - if self.verbose and (terminate or time.time() - self.last_debug > - DEBUG_PRINT_INTERVAL): - print("Result for {}:".format(self)) - print(" {}".format(pretty_print(result).replace("\n", "\n "))) - self.last_debug = time.time() + self.set_location(Location(result.get("node_ip"), result.get("pid"))) self.last_result = result self.last_update_time = time.time() @@ -527,9 +521,6 @@ class Trial: def get_trainable_cls(self): return get_trainable_cls(self.trainable_name) - def set_verbose(self, verbose): - self.verbose = verbose - def is_finished(self): return self.status in [Trial.ERROR, Trial.TERMINATED] diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 7b28fc144..25bc090f5 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -19,6 +19,7 @@ from ray.tune.trial import Checkpoint, Trial from ray.tune.schedulers import FIFOScheduler, TrialScheduler from ray.tune.suggest import BasicVariantGenerator from ray.tune.utils import warn_if_slow, flatten_dict, env_integer +from ray.tune.utils.log import Verbosity, has_verbosity from ray.tune.utils.serialization import TuneFunctionDecoder, \ TuneFunctionEncoder from ray.tune.web_server import TuneServer @@ -78,8 +79,6 @@ class TrialRunner: If fail_fast='raise' provided, Tune will automatically raise the exception received by the Trainable. fail_fast='raise' can easily leak resources and should be used with caution. - verbose (bool): Flag for verbosity. If False, trial results - will not be output. checkpoint_period (int): Trial runner checkpoint periodicity in seconds. Defaults to 10. trial_executor (TrialExecutor): Defaults to RayTrialExecutor. @@ -102,7 +101,6 @@ class TrialRunner: resume=False, server_port=None, fail_fast=False, - verbose=True, checkpoint_period=None, trial_executor=None, callbacks=None, @@ -135,7 +133,6 @@ class TrialRunner: else: raise ValueError("fail_fast must be one of {bool, RAISE}. " f"Got {self._fail_fast}.") - self._verbose = verbose self._server = None self._server_port = server_port @@ -165,7 +162,7 @@ class TrialRunner: self.resume(run_errored_only=errored_only) self._resumed = True except Exception as e: - if self._verbose: + if has_verbosity(Verbosity.V3_TRIAL_DETAILS): logger.error(str(e)) logger.exception("Runner restore failed.") if self._fail_fast: @@ -405,7 +402,6 @@ class TrialRunner: Args: trial (Trial): Trial to queue. """ - trial.set_verbose(self._verbose) self._trials.append(trial) with warn_if_slow("scheduler.on_trial_add"): self._scheduler_alg.on_trial_add(self, trial) @@ -565,6 +561,8 @@ class TrialRunner: with warn_if_slow("scheduler.on_trial_result"): decision = self._scheduler_alg.on_trial_result( self, trial, flat_result) + if decision == TrialScheduler.STOP: + result.update(done=True) with warn_if_slow("search_alg.on_trial_result"): self._search_alg.on_trial_result(trial.trial_id, flat_result) @@ -583,7 +581,6 @@ class TrialRunner: iteration=self._iteration, trials=self._trials, trial=trial) - result.update(done=True) if not is_duplicate: trial.update_last_result( diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 3566b95ff..fe26e12e5 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -18,6 +18,7 @@ from ray.tune.syncer import wait_for_sync, set_sync_periods, \ from ray.tune.trial_runner import TrialRunner from ray.tune.progress_reporter import CLIReporter, JupyterNotebookReporter from ray.tune.schedulers import FIFOScheduler +from ray.tune.utils.log import Verbosity, has_verbosity, set_verbosity logger = logging.getLogger(__name__) @@ -70,7 +71,7 @@ def run( checkpoint_score_attr=None, checkpoint_freq=0, checkpoint_at_end=False, - verbose=2, + verbose=Verbosity.V3_TRIAL_DETAILS, progress_reporter=None, log_to_file=False, trial_name_creator=None, @@ -188,8 +189,9 @@ def run( checkpoint_at_end (bool): Whether to checkpoint at the end of the experiment regardless of the checkpoint_freq. Default is False. This has no effect when using the Functional Training API. - verbose (int): 0, 1, or 2. Verbosity mode. 0 = silent, - 1 = only status updates, 2 = status and trial results. + verbose (Union[int, Verbosity]): 0, 1, 2, or 3. Verbosity mode. + 0 = silent, 1 = only status updates, 2 = status and brief trial + results, 3 = status and detailed trial results. Defaults to 3. progress_reporter (ProgressReporter): Progress reporter for reporting intermediate experiment progress. Defaults to CLIReporter if running in command-line, or JupyterNotebookReporter if running in @@ -281,6 +283,8 @@ def run( "The `mode` parameter passed to `tune.run()` has to be one of " "['min', 'max']") + set_verbosity(verbose) + config = config or {} sync_config = sync_config or SyncConfig() set_sync_periods(sync_config) @@ -353,9 +357,9 @@ def run( "own `metric` and `mode` parameters. Either remove the arguments " "from your scheduler or from your call to `tune.run()`") - # Create logger and syncer callbacks + # Create syncer callbacks callbacks = create_default_callbacks( - callbacks, sync_config, loggers=loggers) + callbacks, sync_config, metric=metric, loggers=loggers) runner = TrialRunner( search_alg=search_alg, @@ -366,7 +370,6 @@ def run( stopper=experiments[0].stopper, resume=resume, server_port=server_port, - verbose=bool(verbose > 1), fail_fast=fail_fast, trial_executor=trial_executor, callbacks=callbacks, @@ -380,7 +383,8 @@ def run( if progress_reporter is None: if IS_NOTEBOOK: - progress_reporter = JupyterNotebookReporter(overwrite=verbose < 2) + progress_reporter = JupyterNotebookReporter( + overwrite=not has_verbosity(Verbosity.V2_TRIAL_NORM)) else: progress_reporter = CLIReporter() @@ -413,7 +417,7 @@ def run( tune_start = time.time() while not runner.is_finished(): runner.step() - if verbose: + if has_verbosity(Verbosity.V1_EXPERIMENT): _report_progress(runner, progress_reporter) tune_taken = time.time() - tune_start @@ -422,7 +426,7 @@ def run( except Exception as e: logger.warning(f"Trial Runner checkpointing failed: {str(e)}") - if verbose: + if has_verbosity(Verbosity.V1_EXPERIMENT): _report_progress(runner, progress_reporter, done=True) wait_for_sync() @@ -440,8 +444,9 @@ def run( logger.error("Trials did not complete: %s", incomplete_trials) all_taken = time.time() - all_start - logger.info(f"Total run time: {all_taken:.2f} seconds " - f"({tune_taken:.2f} seconds for the tuning loop).") + if has_verbosity(Verbosity.V1_EXPERIMENT): + logger.info(f"Total run time: {all_taken:.2f} seconds " + f"({tune_taken:.2f} seconds for the tuning loop).") trials = runner.get_trials() return ExperimentAnalysis( @@ -454,7 +459,7 @@ def run( def run_experiments(experiments, scheduler=None, server_port=None, - verbose=2, + verbose=Verbosity.V3_TRIAL_DETAILS, progress_reporter=None, resume=False, queue_trials=False, diff --git a/python/ray/tune/utils/callback.py b/python/ray/tune/utils/callback.py index b2f5fcf71..d54bd83ea 100644 --- a/python/ray/tune/utils/callback.py +++ b/python/ray/tune/utils/callback.py @@ -4,6 +4,7 @@ import logging import os from ray.tune.callback import Callback +from ray.tune.progress_reporter import TrialProgressCallback from ray.tune.syncer import SyncConfig, detect_sync_to_driver from ray.tune.logger import CSVLoggerCallback, CSVLogger, LoggerCallback, \ JsonLoggerCallback, JsonLogger, LegacyLoggerCallback, Logger, \ @@ -15,14 +16,44 @@ logger = logging.getLogger(__name__) def create_default_callbacks(callbacks: Optional[List[Callback]], sync_config: SyncConfig, - loggers: Optional[List[Logger]]): + loggers: Optional[List[Logger]], + metric: Optional[str] = None): + """Create default callbacks for `tune.run()`. + This function takes a list of existing callbacks and adds default + callbacks to it. + + Specifically, three kinds of callbacks will be added: + + 1. Loggers. Ray Tune's experiment analysis relies on CSV and JSON logging. + 2. Syncer. Ray Tune synchronizes logs and checkpoint between workers and + the head node. + 2. Trial progress reporter. For reporting intermediate progress, like trial + results, Ray Tune uses a callback. + + These callbacks will only be added if they don't already exist, i.e. if + they haven't been passed (and configured) by the user. A notable case + is when a Logger is passed, which is not a CSV or JSON logger - then + a CSV and JSON logger will still be created. + + Lastly, this function will ensure that the Syncer callback comes after all + Logger callbacks, to ensure that the most up-to-date logs and checkpoints + are synced across nodes. + + """ callbacks = callbacks or [] has_syncer_callback = False has_csv_logger = False has_json_logger = False has_tbx_logger = False + has_trial_progress_callback = any( + isinstance(c, TrialProgressCallback) for c in callbacks) + + if not has_trial_progress_callback: + trial_progress_callback = TrialProgressCallback(metric=metric) + callbacks.append(trial_progress_callback) + # Track syncer obj/index to move callback after loggers last_logger_index = None syncer_index = None diff --git a/python/ray/tune/utils/log.py b/python/ray/tune/utils/log.py new file mode 100644 index 000000000..a62eb2b66 --- /dev/null +++ b/python/ray/tune/utils/log.py @@ -0,0 +1,34 @@ +from enum import Enum +from typing import Union + + +class Verbosity(Enum): + V0_MINIMAL = 0 + V1_EXPERIMENT = 1 + V2_TRIAL_NORM = 2 + V3_TRIAL_DETAILS = 3 + + def __int__(self): + return self.value + + +verbosity: Union[int, Verbosity] = Verbosity.V3_TRIAL_DETAILS + + +def set_verbosity(level: Union[int, Verbosity]): + global verbosity + + if isinstance(level, int): + verbosity = Verbosity(level) + else: + verbosity = verbosity + + +def has_verbosity(level: Union[int, Verbosity]) -> bool: + """Return True if passed level exceeds global verbosity level.""" + global verbosity + + log_level = int(level) + verbosity_level = int(verbosity) + + return verbosity_level >= log_level diff --git a/rllib/train.py b/rllib/train.py index a89a23f18..228dcbfbc 100755 --- a/rllib/train.py +++ b/rllib/train.py @@ -8,12 +8,19 @@ import yaml import ray from ray.cluster_utils import Cluster from ray.tune.config_parser import make_parser +from ray.tune.progress_reporter import CLIReporter, JupyterNotebookReporter from ray.tune.result import DEFAULT_RESULTS_DIR from ray.tune.resources import resources_to_json from ray.tune.tune import run_experiments from ray.tune.schedulers import create_scheduler from ray.rllib.utils.framework import try_import_tf, try_import_torch +try: + class_name = get_ipython().__class__.__name__ + IS_NOTEBOOK = True if "Terminal" not in class_name else False +except NameError: + IS_NOTEBOOK = False + # Try to import both backends for flag checking/warnings. tf1, tf, tfv = try_import_tf() torch, _ = try_import_torch() @@ -184,10 +191,10 @@ def run(args, parser): if args.v: exp["config"]["log_level"] = "INFO" - verbose = 2 + verbose = 3 # Print details on trial result if args.vv: exp["config"]["log_level"] = "DEBUG" - verbose = 3 + verbose = 3 # Print details on trial result if args.ray_num_nodes: cluster = Cluster() @@ -206,12 +213,19 @@ def run(args, parser): num_gpus=args.ray_num_gpus, local_mode=args.local_mode) + if IS_NOTEBOOK: + progress_reporter = JupyterNotebookReporter( + overwrite=verbose >= 3, print_intermediate_tables=verbose >= 1) + else: + progress_reporter = CLIReporter(print_intermediate_tables=verbose >= 1) + run_experiments( experiments, scheduler=create_scheduler(args.scheduler, **args.scheduler_config), resume=args.resume, queue_trials=args.queue_trials, verbose=verbose, + progress_reporter=progress_reporter, concurrent=True) ray.shutdown()