diff --git a/python/ray/rllib/examples/custom_metrics_and_callbacks.py b/python/ray/rllib/examples/custom_metrics_and_callbacks.py index ba7795bf0..b8772ad53 100644 --- a/python/ray/rllib/examples/custom_metrics_and_callbacks.py +++ b/python/ray/rllib/examples/custom_metrics_and_callbacks.py @@ -75,7 +75,7 @@ if __name__ == "__main__": "on_postprocess_traj": tune.function(on_postprocess_traj), }, }, - ) + return_trials=True) # verify custom metrics for integration tests custom_metrics = trials[0].last_result["custom_metrics"] diff --git a/python/ray/tune/__init__.py b/python/ray/tune/__init__.py index 560a67e6b..6c45df1e1 100644 --- a/python/ray/tune/__init__.py +++ b/python/ray/tune/__init__.py @@ -5,6 +5,7 @@ from __future__ import print_function from ray.tune.error import TuneError from ray.tune.tune import run_experiments, run from ray.tune.experiment import Experiment +from ray.tune.analysis import ExperimentAnalysis, Analysis from ray.tune.registry import register_env, register_trainable from ray.tune.trainable import Trainable from ray.tune.suggest import grid_search @@ -14,5 +15,6 @@ from ray.tune.sample import (function, sample_from, uniform, choice, randint, __all__ = [ "Trainable", "TuneError", "grid_search", "register_env", "register_trainable", "run", "run_experiments", "Experiment", "function", - "sample_from", "track", "uniform", "choice", "randint", "randn" + "sample_from", "track", "uniform", "choice", "randint", "randn", + "ExperimentAnalysis", "Analysis" ] diff --git a/python/ray/tune/analysis/__init__.py b/python/ray/tune/analysis/__init__.py index 51e43ed10..9d9f1725c 100644 --- a/python/ray/tune/analysis/__init__.py +++ b/python/ray/tune/analysis/__init__.py @@ -2,6 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from ray.tune.analysis.experiment_analysis import ExperimentAnalysis +from ray.tune.analysis.experiment_analysis import ExperimentAnalysis, Analysis -__all__ = ["ExperimentAnalysis"] +__all__ = ["ExperimentAnalysis", "Analysis"] diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index 2fe9a9d46..f501d57a0 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -3,13 +3,13 @@ from __future__ import division from __future__ import print_function import copy -import glob import json import logging import os import pandas as pd from ray.tune.error import TuneError +from ray.tune.result import EXPR_PROGRESS_FILE, EXPR_PARAM_FILE from ray.tune.util import flatten_dict logger = logging.getLogger(__name__) @@ -34,20 +34,134 @@ def unnest_checkpoints(checkpoints): return checkpoint_dicts -class ExperimentAnalysis(object): +class Analysis(object): + """Analyze all results from a directory of experiments.""" + + def __init__(self, experiment_dir): + experiment_dir = os.path.expanduser(experiment_dir) + if not os.path.isdir(experiment_dir): + raise ValueError( + "{} is not a valid directory.".format(experiment_dir)) + self._experiment_dir = experiment_dir + self._configs = {} + self._trial_dataframes = {} + self.fetch_trial_dataframes() + + def fetch_trial_dataframes(self): + fail_count = 0 + for path in self._get_trial_paths(): + try: + self.trial_dataframes[path] = pd.read_csv( + os.path.join(path, EXPR_PROGRESS_FILE)) + except Exception: + fail_count += 1 + + if fail_count: + logger.debug( + "Couldn't read results from {} paths".format(fail_count)) + return self.trial_dataframes + + def get_all_configs(self, prefix=False): + fail_count = 0 + for path in self._get_trial_paths(): + try: + with open(os.path.join(path, EXPR_PARAM_FILE)) as f: + config = json.load(f) + if prefix: + for k in list(config): + config["config:" + k] = config.pop(k) + self._configs[path] = config + except Exception: + fail_count += 1 + + if fail_count: + logger.warning( + "Couldn't read config from {} paths".format(fail_count)) + return self._configs + + def dataframe(self, metric=None, mode=None): + """Returns a pandas.DataFrame object constructed from the trials. + + Args: + metric (str): Key for trial info to order on. + If None, uses last result. + mode (str): One of [min, max]. + + """ + rows = self._retrieve_rows(metric=metric, mode=mode) + all_configs = self.get_all_configs(prefix=True) + for path, config in all_configs.items(): + if path in rows: + rows[path].update(config) + rows[path].update(logdir=path) + return pd.DataFrame(list(rows.values())) + + def get_best_config(self, metric, mode="max"): + """Retrieve the best config corresponding to the trial. + + Args: + metric (str): Key for trial info to order on. + mode (str): One of [min, max]. + + """ + rows = self._retrieve_rows(metric=metric, mode=mode) + all_configs = self.get_all_configs() + compare_op = max if mode == "max" else min + best_path = compare_op(rows, key=lambda k: rows[k][metric]) + return all_configs[best_path] + + def _retrieve_rows(self, metric=None, mode=None): + assert mode is None or mode in ["max", "min"] + rows = {} + for path, df in self.trial_dataframes.items(): + if mode == "max": + idx = df[metric].idxmax() + elif mode == "min": + idx = df[metric].idxmin() + else: + idx = -1 + rows[path] = df.iloc[idx].to_dict() + + return rows + + def _get_trial_paths(self): + _trial_paths = [] + for trial_path, _, files in os.walk(self._experiment_dir): + if EXPR_PROGRESS_FILE in files: + _trial_paths += [trial_path] + + if not _trial_paths: + raise TuneError("No trials found in {}.".format( + self._experiment_dir)) + return _trial_paths + + def get_best_logdir(self, metric, mode="max"): + df = self.dataframe() + if mode == "max": + return df.iloc[df[metric].idxmax()].logdir + elif mode == "min": + return df.iloc[df[metric].idxmin()].logdir + + @property + def trial_dataframes(self): + return self._trial_dataframes + + +class ExperimentAnalysis(Analysis): """Analyze results from a Tune experiment. Parameters: - experiment_path (str): Path to where experiment is located. - Corresponds to Experiment.local_dir/Experiment.name + experiment_checkpoint_path (str): Path to a json file + representing an experiment state. Corresponds to + Experiment.local_dir/Experiment.name/experiment_state.json Example: >>> tune.run(my_trainable, name="my_exp", local_dir="~/tune_results") >>> analysis = ExperimentAnalysis( - >>> experiment_path="~/tune_results/my_exp") + >>> experiment_checkpoint_path="~/tune_results/my_exp/state.json") """ - def __init__(self, experiment_path, trials=None): + def __init__(self, experiment_checkpoint_path, trials=None): """Initializer. Args: @@ -55,45 +169,15 @@ class ExperimentAnalysis(object): trials (list|None): List of trials that can be accessed via `analysis.trials`. """ - experiment_path = os.path.expanduser(experiment_path) - if not os.path.isdir(experiment_path): - raise TuneError( - "{} is not a valid directory.".format(experiment_path)) - experiment_state_paths = glob.glob( - os.path.join(experiment_path, "experiment_state*.json")) - if not experiment_state_paths: - raise TuneError( - "No experiment state found in {}!".format(experiment_path)) - experiment_filename = max( - list(experiment_state_paths)) # if more than one, pick latest - with open(experiment_filename) as f: - self._experiment_state = json.load(f) + with open(experiment_checkpoint_path) as f: + _experiment_state = json.load(f) - if "checkpoints" not in self._experiment_state: + if "checkpoints" not in _experiment_state: raise TuneError("Experiment state invalid; no checkpoints found.") - self._checkpoints = self._experiment_state["checkpoints"] - self._scrubbed_checkpoints = unnest_checkpoints(self._checkpoints) + self._checkpoints = _experiment_state["checkpoints"] self.trials = trials - self._dataframe = None - - def get_all_trial_dataframes(self): - trial_dfs = {} - for checkpoint in self._checkpoints: - logdir = checkpoint["logdir"] - progress = max(glob.glob(os.path.join(logdir, "progress.csv"))) - trial_dfs[checkpoint["trial_id"]] = pd.read_csv(progress) - return trial_dfs - - def dataframe(self, refresh=False): - """Returns a pandas.DataFrame object constructed from the trials. - - Args: - refresh (bool): Clears the cache which may have an existing copy. - - """ - if self._dataframe is None or refresh: - self._dataframe = pd.DataFrame(self._scrubbed_checkpoints) - return self._dataframe + super(ExperimentAnalysis, self).__init__( + os.path.dirname(experiment_checkpoint_path)) def stats(self): """Returns a dictionary of the statistics of the experiment.""" @@ -103,54 +187,11 @@ class ExperimentAnalysis(object): """Returns a dictionary of the TrialRunner data.""" return self._experiment_state.get("runner_data") - def trial_dataframe(self, trial_id): - """Returns a pandas.DataFrame constructed from one trial.""" - for checkpoint in self._checkpoints: - if checkpoint["trial_id"] == trial_id: - logdir = checkpoint["logdir"] - progress = max(glob.glob(os.path.join(logdir, "progress.csv"))) - return pd.read_csv(progress) - raise ValueError("Trial id {} not found".format(trial_id)) - - def get_best_trainable(self, metric, trainable_cls, mode="max"): - """Returns the best Trainable based on the experiment metric. - - Args: - metric (str): Key for trial info to order on. - mode (str): One of [min, max]. - - """ - return trainable_cls(config=self.get_best_config(metric, mode=mode)) - - def get_best_config(self, metric, mode="max"): - """Retrieve the best config from the best trial. - - Args: - metric (str): Key for trial info to order on. - mode (str): One of [min, max]. - - """ - return self.get_best_info(metric, flatten=False, mode=mode)["config"] - - def get_best_logdir(self, metric, mode="max"): - df = self.dataframe() - if mode == "max": - return df.iloc[df[metric].idxmax()].logdir - elif mode == "min": - return df.iloc[df[metric].idxmin()].logdir - - def get_best_info(self, metric, mode="max", flatten=True): - """Retrieve the best trial based on the experiment metric. - - Args: - metric (str): Key for trial info to order on. - mode (str): One of [min, max]. - flatten (bool): Assumes trial info is flattened, where - nested entries are concatenated like `info:metric`. - """ - optimize_op = max if mode == "max" else min - if flatten: - return optimize_op( - self._scrubbed_checkpoints, key=lambda d: d.get(metric, 0)) - return optimize_op( - self._checkpoints, key=lambda d: d["last_result"].get(metric, 0)) + def _get_trial_paths(self): + """Overwrites Analysis to only have trials of one experiment.""" + _trial_paths = [ + checkpoint["logdir"] for checkpoint in self._checkpoints + ] + if not _trial_paths: + raise TuneError("No trials found.") + return _trial_paths diff --git a/python/ray/tune/commands.py b/python/ray/tune/commands.py index b8901566a..631935908 100644 --- a/python/ray/tune/commands.py +++ b/python/ray/tune/commands.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import logging +import glob import os import sys import subprocess @@ -11,9 +12,9 @@ from datetime import datetime import pandas as pd from pandas.api.types import is_string_dtype, is_numeric_dtype -from ray.tune.result import TRAINING_ITERATION, MEAN_ACCURACY, MEAN_LOSS -from ray.tune.trial import Trial -from ray.tune.analysis import ExperimentAnalysis +from ray.tune.result import (TRAINING_ITERATION, MEAN_ACCURACY, MEAN_LOSS, + TIME_TOTAL_S, TRIAL_ID) +from ray.tune.analysis import Analysis from ray.tune import TuneError try: from tabulate import tabulate @@ -26,9 +27,9 @@ EDITOR = os.getenv("EDITOR", "vim") TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S (%A)" -DEFAULT_EXPERIMENT_INFO_KEYS = ("trainable_name", "experiment_tag", "trial_id", - "status", "last_update_time", - TRAINING_ITERATION, MEAN_ACCURACY, MEAN_LOSS) +DEFAULT_EXPERIMENT_INFO_KEYS = ("trainable_name", "experiment_tag", + TRAINING_ITERATION, TIME_TOTAL_S, + MEAN_ACCURACY, MEAN_LOSS, TRIAL_ID) DEFAULT_PROJECT_INFO_KEYS = ( "name", @@ -62,6 +63,20 @@ def _check_tabulate(): "Tabulate not installed. Please run `pip install tabulate`.") +def get_most_recent_state(experiment_path): + experiment_path = os.path.expanduser(experiment_path) + if not os.path.isdir(experiment_path): + raise TuneError("{} is not a valid directory.".format(experiment_path)) + experiment_state_paths = glob.glob( + os.path.join(experiment_path, "experiment_state*.json")) + if not experiment_state_paths: + raise TuneError( + "No experiment state found in {}!".format(experiment_path)) + experiment_filename = max( + list(experiment_state_paths)) # if more than one, pick latest + return experiment_filename + + def print_format_output(dataframe): """Prints output of given dataframe to fit into terminal. @@ -112,7 +127,7 @@ def list_trials(experiment_path, Args: experiment_path (str): Directory where trials are located. - Corresponds to Experiment.local_dir/Experiment.name. + Like Experiment.local_dir/Experiment.name/experiment*.json. sort (list): Keys to sort by. output (str): Name of file where output is saved. filter_op (str): Filter operation in the format @@ -124,14 +139,17 @@ def list_trials(experiment_path, _check_tabulate() try: - checkpoints_df = ExperimentAnalysis(experiment_path).dataframe() + checkpoints_df = Analysis(experiment_path).dataframe() except TuneError: print("No experiment state found!") - sys.exit(0) + sys.exit(1) if not info_keys: info_keys = DEFAULT_EXPERIMENT_INFO_KEYS - col_keys = [k for k in list(info_keys) if k in checkpoints_df] + col_keys = [ + k for k in checkpoints_df.columns + if k in info_keys or k.startswith("config:") + ] checkpoints_df = checkpoints_df[col_keys] if "last_update_time" in checkpoints_df: @@ -143,7 +161,7 @@ def list_trials(experiment_path, checkpoints_df["last_update_time"] = datetime_series if "logdir" in checkpoints_df: - # logdir often too verbose to view in table, so drop experiment_path + # logdir often too long to view in table, so drop experiment_path checkpoints_df["logdir"] = checkpoints_df["logdir"].str.replace( experiment_path, "") @@ -213,38 +231,11 @@ def list_experiments(project_path, experiment_data_collection = [] for experiment_dir in experiment_folders: - analysis_obj, checkpoints_df = None, None - try: - analysis_obj = ExperimentAnalysis( - os.path.join(project_path, experiment_dir)) - checkpoints_df = analysis_obj.dataframe() - except TuneError: - logger.debug("No experiment state found in %s", experiment_dir) - continue + num_trials = sum( + "result.json" in files + for _, _, files in os.walk(os.path.join(base, experiment_dir))) - # Format time-based values. - stats = analysis_obj.stats() - time_values = { - "start_time": stats.get("_start_time"), - "last_updated": stats.get("timestamp"), - } - - formatted_time_values = { - key: datetime.fromtimestamp(val).strftime(TIMESTAMP_FORMAT) - if val else None - for key, val in time_values.items() - } - - experiment_data = { - "name": experiment_dir, - "total_trials": checkpoints_df.shape[0], - "running_trials": ( - checkpoints_df["status"] == Trial.RUNNING).sum(), - "terminated_trials": ( - checkpoints_df["status"] == Trial.TERMINATED).sum(), - "error_trials": (checkpoints_df["status"] == Trial.ERROR).sum(), - } - experiment_data.update(formatted_time_values) + experiment_data = {"name": experiment_dir, "total_trials": num_trials} experiment_data_collection.append(experiment_data) if not experiment_data_collection: diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py index 9e2a96aee..7a3bf97a3 100644 --- a/python/ray/tune/logger.py +++ b/python/ray/tune/logger.py @@ -14,8 +14,10 @@ import numpy as np import ray.cloudpickle as cloudpickle from ray.tune.syncer import get_log_syncer -from ray.tune.result import NODE_IP, TRAINING_ITERATION, TIME_TOTAL_S, \ - TIMESTEPS_TOTAL +from ray.tune.result import (NODE_IP, TRAINING_ITERATION, TIME_TOTAL_S, + TIMESTEPS_TOTAL, EXPR_PARAM_FILE, + EXPR_PARAM_PICKLE_FILE, EXPR_PROGRESS_FILE, + EXPR_RESULT_FILE) logger = logging.getLogger(__name__) @@ -72,7 +74,7 @@ class NoopLogger(Logger): class JsonLogger(Logger): def _init(self): self.update_config(self.config) - local_file = os.path.join(self.logdir, "result.json") + local_file = os.path.join(self.logdir, EXPR_RESULT_FILE) self.local_out = open(local_file, "a") def on_result(self, result): @@ -91,7 +93,7 @@ class JsonLogger(Logger): def update_config(self, config): self.config = config - config_out = os.path.join(self.logdir, "params.json") + config_out = os.path.join(self.logdir, EXPR_PARAM_FILE) with open(config_out, "w") as f: json.dump( self.config, @@ -99,7 +101,7 @@ class JsonLogger(Logger): indent=2, sort_keys=True, cls=_SafeFallbackEncoder) - config_pkl = os.path.join(self.logdir, "params.pkl") + config_pkl = os.path.join(self.logdir, EXPR_PARAM_PICKLE_FILE) with open(config_pkl, "wb") as f: cloudpickle.dump(self.config, f) @@ -150,7 +152,7 @@ class TFLogger(Logger): t = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION] self._file_writer.add_summary(train_stats, t) iteration_value = to_tf_values({ - "training_iteration": result[TRAINING_ITERATION] + TRAINING_ITERATION: result[TRAINING_ITERATION] }, ["ray", "tune"]) iteration_stats = tf.Summary(value=iteration_value) self._file_writer.add_summary(iteration_stats, t) @@ -167,7 +169,7 @@ class CSVLogger(Logger): def _init(self): """CSV outputted with Headers as first set of results.""" # Note that we assume params.json was already created by JsonLogger - progress_file = os.path.join(self.logdir, "progress.csv") + progress_file = os.path.join(self.logdir, EXPR_PROGRESS_FILE) self._continuing = os.path.exists(progress_file) self._file = open(progress_file, "a") self._csv_out = None diff --git a/python/ray/tune/result.py b/python/ray/tune/result.py index 3ef2d2975..0714adeb8 100644 --- a/python/ray/tune/result.py +++ b/python/ray/tune/result.py @@ -15,6 +15,9 @@ SHOULD_CHECKPOINT = "should_checkpoint" # (Auto-filled) The hostname of the machine hosting the training process. HOSTNAME = "hostname" +# (Auto-filled) The auto-assigned id of the trial. +TRIAL_ID = "trial_id" + # (Auto-filled) The node ip of the machine hosting the training process. NODE_IP = "node_ip" @@ -73,6 +76,9 @@ EXPR_META_FILE = "trial_status.json" # File that stores parameters of the trial. EXPR_PARAM_FILE = "params.json" +# Pickle File that stores parameters of the trial. +EXPR_PARAM_PICKLE_FILE = "params.pkl" + # File that stores the progress of the trial. EXPR_PROGRESS_FILE = "progress.csv" diff --git a/python/ray/tune/tests/test_commands.py b/python/ray/tune/tests/test_commands.py index 277d9ed0b..6ddfdc45a 100644 --- a/python/ray/tune/tests/test_commands.py +++ b/python/ray/tune/tests/test_commands.py @@ -75,7 +75,7 @@ def test_ls(start_ray, tmpdir): local_dir=str(tmpdir), global_checkpoint_period=0) - columns = ["status", "episode_reward_mean", "training_iteration"] + columns = ["episode_reward_mean", "training_iteration", "trial_id"] limit = 2 with Capturing() as output: commands.list_trials(experiment_path, info_keys=columns, limit=limit) @@ -87,11 +87,10 @@ def test_ls(start_ray, tmpdir): with Capturing() as output: commands.list_trials( experiment_path, - sort=["status"], - info_keys=("status", ), - filter_op="status == TERMINATED") + sort=["trial_id"], + info_keys=("trial_id", "training_iteration"), + filter_op="training_iteration == 1") lines = output.captured - assert sum("TERMINATED" in line for line in lines) == num_samples assert len(lines) == 3 + num_samples + 1 diff --git a/python/ray/tune/tests/test_experiment_analysis.py b/python/ray/tune/tests/test_experiment_analysis.py index 7a2c70f84..fc026cc9a 100644 --- a/python/ray/tune/tests/test_experiment_analysis.py +++ b/python/ray/tune/tests/test_experiment_analysis.py @@ -10,14 +10,13 @@ import os import pandas as pd import ray -from ray.tune import run, sample_from +from ray.tune import run, sample_from, Analysis from ray.tune.examples.async_hyperband_example import MyTrainableClass class ExperimentAnalysisSuite(unittest.TestCase): def setUp(self): ray.init(local_mode=True) - self.test_dir = tempfile.mkdtemp() self.test_name = "analysis_exp" self.num_samples = 10 @@ -51,20 +50,13 @@ class ExperimentAnalysisSuite(unittest.TestCase): self.assertEquals(df.shape[0], self.num_samples) def testTrialDataframe(self): - cs = self.ea._checkpoints - idx = random.randint(0, len(cs) - 1) - trial_df = self.ea.trial_dataframe( - cs[idx]["trial_id"]) # random trial df + checkpoints = self.ea._checkpoints + idx = random.randint(0, len(checkpoints) - 1) + trial_df = self.ea.trial_dataframes[checkpoints[idx]["logdir"]] self.assertTrue(isinstance(trial_df, pd.DataFrame)) self.assertEqual(trial_df.shape[0], 1) - def testBestTrainable(self): - best_trainable = self.ea.get_best_trainable(self.metric, - MyTrainableClass) - - self.assertTrue(isinstance(best_trainable, MyTrainableClass)) - def testBestConfig(self): best_config = self.ea.get_best_config(self.metric) @@ -72,53 +64,6 @@ class ExperimentAnalysisSuite(unittest.TestCase): self.assertTrue("width" in best_config) self.assertTrue("height" in best_config) - def testBestTrial(self): - best_trial = self.ea.get_best_info(self.metric, flatten=False) - - self.assertTrue(isinstance(best_trial, dict)) - self.assertTrue("local_dir" in best_trial) - self.assertEqual(best_trial["local_dir"], - os.path.expanduser(self.test_path)) - self.assertTrue("config" in best_trial) - self.assertTrue("width" in best_trial["config"]) - self.assertTrue("height" in best_trial["config"]) - self.assertTrue("last_result" in best_trial) - self.assertTrue(self.metric in best_trial["last_result"]) - - min_trial = self.ea.get_best_info( - self.metric, mode="min", flatten=False) - - self.assertTrue(isinstance(min_trial, dict)) - self.assertLess(min_trial["last_result"][self.metric], - best_trial["last_result"][self.metric]) - - flat_trial = self.ea.get_best_info(self.metric, flatten=True) - - self.assertTrue(isinstance(min_trial, dict)) - self.assertTrue(self.metric in flat_trial) - - def testCheckpoints(self): - checkpoints = self.ea._checkpoints - - self.assertTrue(isinstance(checkpoints, list)) - self.assertTrue(isinstance(checkpoints[0], dict)) - self.assertEqual(len(checkpoints), self.num_samples) - - def testStats(self): - stats = self.ea.stats() - - self.assertTrue(isinstance(stats, dict)) - self.assertTrue("start_time" in stats) - self.assertTrue("timestamp" in stats) - - def testRunnerData(self): - runner_data = self.ea.runner_data() - - self.assertTrue(isinstance(runner_data, dict)) - self.assertTrue("_local_checkpoint_dir" in runner_data) - self.assertEqual(runner_data["_local_checkpoint_dir"], - os.path.expanduser(self.test_path)) - def testBestLogdir(self): logdir = self.ea.get_best_logdir(self.metric) self.assertTrue(logdir.startswith(self.test_path)) @@ -127,13 +72,72 @@ class ExperimentAnalysisSuite(unittest.TestCase): self.assertNotEquals(logdir, logdir2) def testAllDataframes(self): - dataframes = self.ea.get_all_trial_dataframes() + dataframes = self.ea.trial_dataframes self.assertTrue(len(dataframes) == self.num_samples) self.assertTrue(isinstance(dataframes, dict)) for df in dataframes.values(): self.assertEqual(df.training_iteration.max(), 1) + def testIgnoreOtherExperiment(self): + analysis = run( + MyTrainableClass, + global_checkpoint_period=0, + name="test_example", + local_dir=self.test_dir, + return_trials=False, + stop={"training_iteration": 1}, + num_samples=1, + config={ + "width": sample_from( + lambda spec: 10 + int(90 * random.random())), + "height": sample_from(lambda spec: int(100 * random.random())), + }) + df = analysis.dataframe() + self.assertEquals(df.shape[0], 1) + + +class AnalysisSuite(unittest.TestCase): + def setUp(self): + ray.init(local_mode=True) + self.test_dir = tempfile.mkdtemp() + self.num_samples = 10 + self.metric = "episode_reward_mean" + self.run_test_exp(test_name="analysis_exp1") + self.run_test_exp(test_name="analysis_exp2") + + def run_test_exp(self, test_name=None): + run(MyTrainableClass, + global_checkpoint_period=0, + name=test_name, + local_dir=self.test_dir, + return_trials=False, + stop={"training_iteration": 1}, + num_samples=self.num_samples, + config={ + "width": sample_from( + lambda spec: 10 + int(90 * random.random())), + "height": sample_from(lambda spec: int(100 * random.random())), + }) + + def tearDown(self): + shutil.rmtree(self.test_dir, ignore_errors=True) + ray.shutdown() + + def testDataframe(self): + analysis = Analysis(self.test_dir) + df = analysis.dataframe() + self.assertTrue(isinstance(df, pd.DataFrame)) + self.assertEquals(df.shape[0], self.num_samples * 2) + + def testBestLogdir(self): + analysis = Analysis(self.test_dir) + logdir = analysis.get_best_logdir(self.metric) + self.assertTrue(logdir.startswith(self.test_dir)) + logdir2 = analysis.get_best_logdir(self.metric, mode="min") + self.assertTrue(logdir2.startswith(self.test_dir)) + self.assertNotEquals(logdir, logdir2) + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/python/ray/tune/tests/test_track.py b/python/ray/tune/tests/test_track.py index d3b6c38d7..5a1687100 100644 --- a/python/ray/tune/tests/test_track.py +++ b/python/ray/tune/tests/test_track.py @@ -66,7 +66,7 @@ class TrackApiTest(unittest.TestCase): for i in range(config["iters"]): track.log(iteration=i, hi="test") - trials = tune.run(testme, config={"iters": 5}) + trials = tune.run(testme, config={"iters": 5}).trials trial_res = trials[0].last_result self.assertTrue(trial_res["hi"], "test") self.assertTrue(trial_res["training_iteration"], 5) diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index ac3c9a07b..137ad5196 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -23,7 +23,7 @@ from ray.tune.registry import _global_registry, TRAINABLE_CLASS from ray.tune.result import (DEFAULT_RESULTS_DIR, TIMESTEPS_TOTAL, DONE, HOSTNAME, NODE_IP, PID, EPISODES_TOTAL, TRAINING_ITERATION, TIMESTEPS_THIS_ITER, - TIME_THIS_ITER_S, TIME_TOTAL_S) + TIME_THIS_ITER_S, TIME_TOTAL_S, TRIAL_ID) from ray.tune.logger import Logger from ray.tune.util import pin_in_object_store, get_pinned_object from ray.tune.experiment import Experiment @@ -115,6 +115,7 @@ class TrainableFunctionApiTest(unittest.TestCase): NO_COMPARE_FIELDS = { HOSTNAME, NODE_IP, + TRIAL_ID, PID, TIME_THIS_ITER_S, TIME_TOTAL_S, @@ -432,7 +433,14 @@ class TrainableFunctionApiTest(unittest.TestCase): for i in range(10): reporter(test={"test1": {"test2": i}}) - [trial] = tune.run(train, stop={"test": {"test1": {"test2": 6}}}) + [trial] = tune.run( + train, stop={ + "test": { + "test1": { + "test2": 6 + } + } + }).trials self.assertEqual(trial.last_result["training_iteration"], 7) def testEarlyReturn(self): @@ -938,7 +946,7 @@ class TestSyncFunctionality(unittest.TestCase): "training_iteration": 1 }, "sync_to_cloud": "echo {source} {target}" - }) + }).trials @patch("ray.tune.syncer.S3_PREFIX", "test") def testCloudProperString(self): @@ -953,7 +961,7 @@ class TestSyncFunctionality(unittest.TestCase): }, "upload_dir": "test", "sync_to_cloud": "ls {target}" - }) + }).trials with self.assertRaises(ValueError): [trial] = tune.run( @@ -966,7 +974,7 @@ class TestSyncFunctionality(unittest.TestCase): }, "upload_dir": "test", "sync_to_cloud": "ls {source}" - }) + }).trials tmpdir = tempfile.mkdtemp() logfile = os.path.join(tmpdir, "test.log") @@ -981,7 +989,7 @@ class TestSyncFunctionality(unittest.TestCase): }, "upload_dir": "test", "sync_to_cloud": "echo {source} {target} > " + logfile - }) + }).trials with open(logfile) as f: lines = f.read() self.assertTrue("test" in lines) @@ -1000,7 +1008,7 @@ class TestSyncFunctionality(unittest.TestCase): "training_iteration": 1 }, "sync_to_driver": "ls {target}" - }) + }).trials with self.assertRaises(TuneError): # This raises TuneError because logger is init in safe zone. @@ -1013,7 +1021,7 @@ class TestSyncFunctionality(unittest.TestCase): "training_iteration": 1 }, "sync_to_driver": "ls {source}" - }) + }).trials with patch("ray.tune.syncer.CommandSyncer.sync_function" ) as mock_fn, patch( @@ -1028,7 +1036,7 @@ class TestSyncFunctionality(unittest.TestCase): "training_iteration": 1 }, "sync_to_driver": "echo {source} {target}" - }) + }).trials self.assertGreater(mock_fn.call_count, 0) def testCloudFunctions(self): @@ -1045,9 +1053,11 @@ class TestSyncFunctionality(unittest.TestCase): name="foo", max_failures=0, local_dir=tmpdir, - stop={"training_iteration": 1}, + stop={ + "training_iteration": 1 + }, upload_dir=tmpdir2, - sync_to_cloud=tune.function(sync_func)) + sync_to_cloud=tune.function(sync_func)).trials test_file_path = glob.glob(os.path.join(tmpdir2, "foo", "*.json")) self.assertTrue(test_file_path) shutil.rmtree(tmpdir) @@ -1065,8 +1075,10 @@ class TestSyncFunctionality(unittest.TestCase): "__fake", name="foo", max_failures=0, - stop={"training_iteration": 1}, - sync_to_driver=tune.function(sync_func_driver)) + stop={ + "training_iteration": 1 + }, + sync_to_driver=tune.function(sync_func_driver)).trials test_file_path = os.path.join(trial.logdir, "test.log2") self.assertFalse(os.path.exists(test_file_path)) @@ -1076,8 +1088,10 @@ class TestSyncFunctionality(unittest.TestCase): "__fake", name="foo", max_failures=0, - stop={"training_iteration": 1}, - sync_to_driver=tune.function(sync_func_driver)) + stop={ + "training_iteration": 1 + }, + sync_to_driver=tune.function(sync_func_driver)).trials test_file_path = os.path.join(trial.logdir, "test.log2") self.assertTrue(os.path.exists(test_file_path)) os.remove(test_file_path) @@ -1098,7 +1112,7 @@ class TestSyncFunctionality(unittest.TestCase): "upload_dir": "test", "sync_to_driver": tune.function(sync_func), "sync_to_cloud": tune.function(sync_func) - }) + }).trials self.assertEqual(mock_sync.call_count, 0) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 1221c2a53..251a75f9a 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -288,6 +288,7 @@ class Trial(object): Trial._registration_check(trainable_name) # Trial config self.trainable_name = trainable_name + self.trial_id = Trial.generate_id() if trial_id is None else trial_id self.config = config or {} self.local_dir = local_dir # This remains unexpanded for syncing. self.experiment_tag = experiment_tag @@ -334,7 +335,6 @@ class Trial(object): self.runner = None self.result_logger = None self.last_debug = 0 - self.trial_id = Trial.generate_id() if trial_id is None else trial_id self.error_file = None self.num_failures = 0 self.custom_trial_name = None @@ -516,8 +516,7 @@ class Trial(object): or self.max_failures < 0)) def update_last_result(self, result, terminate=False): - if terminate: - result.update(done=True) + result.update(trial_id=self.trial_id, done=terminate) if self.verbose and (terminate or time.time() - self.last_debug > DEBUG_PRINT_INTERVAL): print("Result for {}:".format(self)) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index d35abd6ef..f805c9558 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -180,6 +180,11 @@ class TrialRunner(object): self._checkpoint_period = checkpoint_period self._session_str = datetime.fromtimestamp( self._start_time).strftime("%Y-%m-%d_%H-%M-%S") + self.checkpoint_file = None + if self._local_checkpoint_dir: + self.checkpoint_file = os.path.join( + self._local_checkpoint_dir, + TrialRunner.CKPT_FILE_TMPL.format(self._session_str)) def _validate_resume(self, resume_type): """Checks whether to resume experiment. @@ -260,10 +265,7 @@ class TrialRunner(object): with open(tmp_file_name, "w") as f: json.dump(runner_state, f, indent=2, cls=_TuneFunctionEncoder) - os.rename( - tmp_file_name, - os.path.join(self._local_checkpoint_dir, - TrialRunner.CKPT_FILE_TMPL.format(self._session_str))) + os.rename(tmp_file_name, self.checkpoint_file) self._syncer.sync_up_if_needed() return self._local_checkpoint_dir @@ -277,6 +279,7 @@ class TrialRunner(object): newest_ckpt_path = _find_newest_ckpt(self._local_checkpoint_dir) with open(newest_ckpt_path, "r") as f: runner_state = json.load(f, cls=_TuneFunctionDecoder) + self.checkpoint_file = newest_ckpt_path logger.warning("".join([ "Attempting to resume experiment from {}. ".format( diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index a34981d91..6f1913bd1 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -65,7 +65,7 @@ def run(run_or_experiment, reuse_actors=False, trial_executor=None, raise_on_failed_trial=True, - return_trials=True, + return_trials=False, ray_auto_init=True, sync_function=None): """Executes training. @@ -263,9 +263,10 @@ def run(run_or_experiment, else: logger.error("Trials did not complete: %s", errored_trials) + trials = runner.get_trials() if return_trials: - return runner.get_trials() - return ExperimentAnalysis(experiment.checkpoint_dir) + return trials + return ExperimentAnalysis(runner.checkpoint_file, trials=trials) def run_experiments(experiments, @@ -319,5 +320,6 @@ def run_experiments(experiments, queue_trials=queue_trials, reuse_actors=reuse_actors, trial_executor=trial_executor, - raise_on_failed_trial=raise_on_failed_trial) + raise_on_failed_trial=raise_on_failed_trial, + return_trials=True) return trials