diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index 0164ec2b1..a3c246aba 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -47,7 +47,14 @@ class ExperimentAnalysis(object): >>> experiment_path="~/tune_results/my_exp") """ - def __init__(self, experiment_path): + def __init__(self, experiment_path, trials=None): + """Initializer. + + Args: + experiment_path (str): Path to where experiment is located. + trials (list|None): List of trials that can be accessed via + `analysis.trials`. + """ experiment_path = os.path.expanduser(experiment_path) if not os.path.isdir(experiment_path): raise TuneError( @@ -55,7 +62,8 @@ class ExperimentAnalysis(object): experiment_state_paths = glob.glob( os.path.join(experiment_path, "experiment_state*.json")) if not experiment_state_paths: - raise TuneError("No experiment state found!") + raise TuneError( + "No experiment state found in {}!".format(experiment_path)) experiment_filename = max( list(experiment_state_paths)) # if more than one, pick latest with open(os.path.join(experiment_path, experiment_filename)) as f: @@ -65,10 +73,27 @@ class ExperimentAnalysis(object): raise TuneError("Experiment state invalid; no checkpoints found.") self._checkpoints = self._experiment_state["checkpoints"] self._scrubbed_checkpoints = unnest_checkpoints(self._checkpoints) + self.trials = trials + self._dataframe = None - def dataframe(self): - """Returns a pandas.DataFrame object constructed from the trials.""" - return pd.DataFrame(self._scrubbed_checkpoints) + def get_all_trial_dataframes(self): + trial_dfs = {} + for checkpoint in self._checkpoints: + logdir = checkpoint["logdir"] + progress = max(glob.glob(os.path.join(logdir, "progress.csv"))) + trial_dfs[checkpoint["trial_id"]] = pd.read_csv(progress) + return trial_dfs + + def dataframe(self, refresh=False): + """Returns a pandas.DataFrame object constructed from the trials. + + Args: + refresh (bool): Clears the cache which may have an existing copy. + + """ + if self._dataframe is None or refresh: + self._dataframe = pd.DataFrame(self._scrubbed_checkpoints) + return self._dataframe def stats(self): """Returns a dictionary of the statistics of the experiment.""" @@ -87,22 +112,45 @@ class ExperimentAnalysis(object): return pd.read_csv(progress) raise ValueError("Trial id {} not found".format(trial_id)) - def get_best_trainable(self, metric, trainable_cls): - """Returns the best Trainable based on the experiment metric.""" - return trainable_cls(config=self.get_best_config(metric)) + def get_best_trainable(self, metric, trainable_cls, mode="max"): + """Returns the best Trainable based on the experiment metric. - def get_best_config(self, metric): - """Retrieve the best config from the best trial.""" - return self._get_best_trial(metric)["config"] + Args: + metric (str): Key for trial info to order on. + mode (str): One of [min, max]. - def _get_best_trial(self, metric): - """Retrieve the best trial based on the experiment metric.""" - return max( + """ + return trainable_cls(config=self.get_best_config(metric, mode=mode)) + + def get_best_config(self, metric, mode="max"): + """Retrieve the best config from the best trial. + + Args: + metric (str): Key for trial info to order on. + mode (str): One of [min, max]. + + """ + return self.get_best_info(metric, flatten=False, mode=mode)["config"] + + def get_best_logdir(self, metric, mode="max"): + df = self.dataframe() + if mode == "max": + return df.iloc[df[metric].idxmax()].logdir + elif mode == "min": + return df.iloc[df[metric].idxmin()].logdir + + def get_best_info(self, metric, mode="max", flatten=True): + """Retrieve the best trial based on the experiment metric. + + Args: + metric (str): Key for trial info to order on. + mode (str): One of [min, max]. + flatten (bool): Assumes trial info is flattened, where + nested entries are concatenated like `info:metric`. + """ + optimize_op = max if mode == "max" else min + if flatten: + return optimize_op( + self._scrubbed_checkpoints, key=lambda d: d.get(metric, 0)) + return optimize_op( self._checkpoints, key=lambda d: d["last_result"].get(metric, 0)) - - def _get_sorted_trials(self, metric): - """Retrive trials in sorted order based on the experiment metric.""" - return sorted( - self._checkpoints, - key=lambda d: d["last_result"].get(metric, 0), - reverse=True) diff --git a/python/ray/tune/examples/track_example.py b/python/ray/tune/examples/track_example.py index 1ccec3946..751f0ed44 100644 --- a/python/ray/tune/examples/track_example.py +++ b/python/ray/tune/examples/track_example.py @@ -9,7 +9,7 @@ from keras.models import Sequential from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D) from ray.tune import track -from ray.tune.examples.utils import TuneKerasCallback, get_mnist_data +from ray.tune.examples.utils import TuneReporterCallback, get_mnist_data parser = argparse.ArgumentParser() parser.add_argument( @@ -63,7 +63,7 @@ def train_mnist(args): batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test), - callbacks=[TuneKerasCallback(track.metric)]) + callbacks=[TuneReporterCallback(track.metric)]) track.shutdown() diff --git a/python/ray/tune/examples/tune_mnist_keras.py b/python/ray/tune/examples/tune_mnist_keras.py index 5357d86af..ecd3c34bc 100644 --- a/python/ray/tune/examples/tune_mnist_keras.py +++ b/python/ray/tune/examples/tune_mnist_keras.py @@ -9,8 +9,8 @@ from keras.datasets import mnist from keras.models import Sequential from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D) -from ray.tune.examples.utils import (TuneKerasCallback, get_mnist_data, - set_keras_threads) +from ray.tune.integration.keras import TuneReporterCallback +from ray.tune.examples.utils import get_mnist_data, set_keras_threads parser = argparse.ArgumentParser() parser.add_argument( @@ -52,7 +52,7 @@ def train_mnist(config, reporter): epochs=epochs, verbose=0, validation_data=(x_test, y_test), - callbacks=[TuneKerasCallback(reporter)]) + callbacks=[TuneReporterCallback(reporter)]) if __name__ == "__main__": @@ -63,7 +63,7 @@ if __name__ == "__main__": ray.init() sched = AsyncHyperBandScheduler( - time_attr="timesteps_total", + time_attr="training_iteration", metric="mean_accuracy", mode="max", max_t=400, diff --git a/python/ray/tune/examples/utils.py b/python/ray/tune/examples/utils.py index a5ab1dbdb..f40707a01 100644 --- a/python/ray/tune/examples/utils.py +++ b/python/ray/tune/examples/utils.py @@ -5,24 +5,9 @@ from __future__ import print_function import keras from keras.datasets import mnist from keras import backend as K - - -class TuneKerasCallback(keras.callbacks.Callback): - def __init__(self, reporter, logs={}): - self.reporter = reporter - self.iteration = 0 - super(TuneKerasCallback, self).__init__() - - def on_train_end(self, epoch, logs={}): - self.reporter( - timesteps_total=self.iteration, - done=1, - mean_accuracy=logs.get("acc")) - - def on_batch_end(self, batch, logs={}): - self.iteration += 1 - self.reporter( - timesteps_total=self.iteration, mean_accuracy=logs["acc"]) +from sklearn.datasets import load_iris +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import OneHotEncoder def get_mnist_data(): @@ -53,6 +38,16 @@ def get_mnist_data(): return x_train, y_train, x_test, y_test, input_shape +def get_iris_data(test_size=0.2): + iris_data = load_iris() + x = iris_data.data + y = iris_data.target.reshape(-1, 1) + encoder = OneHotEncoder(sparse=False) + y = encoder.fit_transform(y) + train_x, test_x, train_y, test_y = train_test_split(x, y) + return train_x, train_y, test_x, test_y + + def set_keras_threads(threads): # We set threads here to avoid contention, as Keras # is heavily parallelized across multiple cores. @@ -61,3 +56,8 @@ def set_keras_threads(threads): config=K.tf.ConfigProto( intra_op_parallelism_threads=threads, inter_op_parallelism_threads=threads))) + + +def TuneKerasCallback(*args, **kwargs): + raise DeprecationWarning("TuneKerasCallback is now " + "tune.integration.keras.TuneReporterCallback.") diff --git a/python/ray/tune/experiment.py b/python/ray/tune/experiment.py index 5f3e46aab..95cb12043 100644 --- a/python/ray/tune/experiment.py +++ b/python/ray/tune/experiment.py @@ -176,6 +176,14 @@ class Experiment(object): else: raise TuneError("Improper 'run' - not string nor trainable.") + @property + def local_dir(self): + return self.spec.get("local_dir") + + @property + def checkpoint_dir(self): + return os.path.join(self.spec["local_dir"], self.name) + def convert_to_experiment_list(experiments): """Produces a list of Experiment objects. diff --git a/python/ray/tune/integration/__init__.py b/python/ray/tune/integration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/tune/integration/keras.py b/python/ray/tune/integration/keras.py new file mode 100644 index 000000000..197a7eef9 --- /dev/null +++ b/python/ray/tune/integration/keras.py @@ -0,0 +1,34 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import keras +from ray.tune import track + + +class TuneReporterCallback(keras.callbacks.Callback): + def __init__(self, reporter=None, freq="batch", logs={}): + self.reporter = reporter or track.log + self.iteration = 0 + if freq not in ["batch", "epoch"]: + raise ValueError("{} not supported as a frequency.".format(freq)) + self.freq = freq + super(TuneReporterCallback, self).__init__() + + def on_batch_end(self, batch, logs={}): + if not self.freq == "batch": + return + self.iteration += 1 + for metric in list(logs): + if "loss" in metric and "neg_" not in metric: + logs["neg_" + metric] = -logs[metric] + self.reporter(keras_info=logs, mean_accuracy=logs["acc"]) + + def on_epoch_end(self, batch, logs={}): + if not self.freq == "epoch": + return + self.iteration += 1 + for metric in list(logs): + if "loss" in metric and "neg_" not in metric: + logs["neg_" + metric] = -logs[metric] + self.reporter(keras_info=logs, mean_accuracy=logs["acc"]) diff --git a/python/ray/tune/schedulers/__init__.py b/python/ray/tune/schedulers/__init__.py index 50bb44743..34655372f 100644 --- a/python/ray/tune/schedulers/__init__.py +++ b/python/ray/tune/schedulers/__init__.py @@ -4,11 +4,13 @@ from __future__ import print_function from ray.tune.schedulers.trial_scheduler import TrialScheduler, FIFOScheduler from ray.tune.schedulers.hyperband import HyperBandScheduler -from ray.tune.schedulers.async_hyperband import AsyncHyperBandScheduler +from ray.tune.schedulers.async_hyperband import (AsyncHyperBandScheduler, + ASHAScheduler) from ray.tune.schedulers.median_stopping_rule import MedianStoppingRule from ray.tune.schedulers.pbt import PopulationBasedTraining __all__ = [ "TrialScheduler", "HyperBandScheduler", "AsyncHyperBandScheduler", - "MedianStoppingRule", "FIFOScheduler", "PopulationBasedTraining" + "ASHAScheduler", "MedianStoppingRule", "FIFOScheduler", + "PopulationBasedTraining" ] diff --git a/python/ray/tune/schedulers/async_hyperband.py b/python/ray/tune/schedulers/async_hyperband.py index 487eb350e..0370d03d3 100644 --- a/python/ray/tune/schedulers/async_hyperband.py +++ b/python/ray/tune/schedulers/async_hyperband.py @@ -168,6 +168,8 @@ class _Bracket(): return "Bracket: " + iters +ASHAScheduler = AsyncHyperBandScheduler + if __name__ == "__main__": sched = AsyncHyperBandScheduler( grace_period=1, max_t=10, reduction_factor=2) diff --git a/python/ray/tune/tests/test_experiment_analysis.py b/python/ray/tune/tests/test_experiment_analysis.py index a0721abc5..7b613a6fd 100644 --- a/python/ray/tune/tests/test_experiment_analysis.py +++ b/python/ray/tune/tests/test_experiment_analysis.py @@ -11,9 +11,7 @@ import pandas as pd import ray from ray.tune import run, sample_from -from ray.tune.analysis import ExperimentAnalysis from ray.tune.examples.async_hyperband_example import MyTrainableClass -from ray.tune.schedulers import AsyncHyperBandScheduler class ExperimentAnalysisSuite(unittest.TestCase): @@ -27,35 +25,22 @@ class ExperimentAnalysisSuite(unittest.TestCase): self.test_path = os.path.join(self.test_dir, self.test_name) self.run_test_exp() - self.ea = ExperimentAnalysis(self.test_path) - def tearDown(self): shutil.rmtree(self.test_dir, ignore_errors=True) ray.shutdown() def run_test_exp(self): - ahb = AsyncHyperBandScheduler( - time_attr="training_iteration", - metric=self.metric, - mode="max", - grace_period=5, - max_t=100) - - run(MyTrainableClass, + self.ea = run( + MyTrainableClass, name=self.test_name, - scheduler=ahb, local_dir=self.test_dir, - **{ - "stop": { - "training_iteration": 1 - }, - "num_samples": 10, - "config": { - "width": sample_from( - lambda spec: 10 + int(90 * random.random())), - "height": sample_from( - lambda spec: int(100 * random.random())), - }, + return_trials=False, + stop={"training_iteration": 1}, + num_samples=self.num_samples, + config={ + "width": sample_from( + lambda spec: 10 + int(90 * random.random())), + "height": sample_from(lambda spec: int(100 * random.random())), }) def testDataframe(self): @@ -87,7 +72,7 @@ class ExperimentAnalysisSuite(unittest.TestCase): self.assertTrue("height" in best_config) def testBestTrial(self): - best_trial = self.ea._get_best_trial(self.metric) + best_trial = self.ea.get_best_info(self.metric, flatten=False) self.assertTrue(isinstance(best_trial, dict)) self.assertTrue("local_dir" in best_trial) @@ -99,6 +84,18 @@ class ExperimentAnalysisSuite(unittest.TestCase): self.assertTrue("last_result" in best_trial) self.assertTrue(self.metric in best_trial["last_result"]) + min_trial = self.ea.get_best_info( + self.metric, mode="min", flatten=False) + + self.assertTrue(isinstance(min_trial, dict)) + self.assertLess(min_trial["last_result"][self.metric], + best_trial["last_result"][self.metric]) + + flat_trial = self.ea.get_best_info(self.metric, flatten=True) + + self.assertTrue(isinstance(min_trial, dict)) + self.assertTrue(self.metric in flat_trial) + def testCheckpoints(self): checkpoints = self.ea._checkpoints @@ -121,6 +118,21 @@ class ExperimentAnalysisSuite(unittest.TestCase): self.assertEqual(runner_data["_metadata_checkpoint_dir"], os.path.expanduser(self.test_path)) + def testBestLogdir(self): + logdir = self.ea.get_best_logdir(self.metric) + self.assertTrue(logdir.startswith(self.test_path)) + logdir2 = self.ea.get_best_logdir(self.metric, mode="min") + self.assertTrue(logdir2.startswith(self.test_path)) + self.assertNotEquals(logdir, logdir2) + + def testAllDataframes(self): + dataframes = self.ea.get_all_trial_dataframes() + self.assertTrue(len(dataframes) == self.num_samples) + + self.assertTrue(isinstance(dataframes, dict)) + for df in dataframes.values(): + self.assertEqual(df.training_iteration.max(), 1) + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index 37022ceab..64b8e9761 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -441,6 +441,14 @@ class TrainableFunctionApiTest(unittest.TestCase): self.assertRaises(TuneError, f) + def testNestedStoppingReturn(self): + def train(config, reporter): + for i in range(10): + reporter(test={"test1": {"test2": i}}) + + [trial] = tune.run(train, stop={"test": {"test1": {"test2": 6}}}) + self.assertEqual(trial.last_result["training_iteration"], 7) + def testEarlyReturn(self): def train(config, reporter): reporter(timesteps_total=100, done=True) diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index cb9351f9a..1a44575c7 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -181,6 +181,21 @@ def has_trainable(trainable_name): ray.tune.registry.TRAINABLE_CLASS, trainable_name) +def recursive_criteria_check(result, criteria): + for criteria, stop_value in criteria.items(): + if criteria not in result: + raise TuneError( + "Stopping criteria {} not provided in result {}.".format( + criteria, result)) + elif isinstance(result[criteria], dict) and isinstance( + stop_value, dict): + if recursive_criteria_check(result[criteria], stop_value): + return True + elif result[criteria] >= stop_value: + return True + return False + + class Checkpoint(object): """Describes a checkpoint of trial state. @@ -425,15 +440,7 @@ class Trial(object): if result.get(DONE): return True - for criteria, stop_value in self.stopping_criterion.items(): - if criteria not in result: - raise TuneError( - "Stopping criteria {} not provided in result {}.".format( - criteria, result)) - if result[criteria] >= stop_value: - return True - - return False + return recursive_criteria_check(result, self.stopping_criterion) def should_checkpoint(self): """Whether this trial is due for checkpointing.""" diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 0d84b6651..db302f6bd 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -4,11 +4,11 @@ from __future__ import print_function import click import logging -import os import time from ray.tune.error import TuneError from ray.tune.experiment import convert_to_experiment_list, Experiment +from ray.tune.analysis import ExperimentAnalysis from ray.tune.suggest import BasicVariantGenerator from ray.tune.trial import Trial, DEBUG_PRINT_INTERVAL from ray.tune.ray_trial_executor import RayTrialExecutor @@ -39,7 +39,7 @@ def _make_scheduler(args): def _find_checkpoint_dir(exp): # TODO(rliaw): Make sure the checkpoint_dir is resolved earlier. # Right now it is resolved somewhere far down the trial generation process - return os.path.join(exp.spec["local_dir"], exp.name) + return exp.checkpoint_dir def _prompt_restore(checkpoint_dir, resume): @@ -89,9 +89,10 @@ def run(run_or_experiment, verbose=2, resume=False, queue_trials=False, - reuse_actors=False, + reuse_actors=True, trial_executor=None, raise_on_failed_trial=True, + return_trials=True, ray_auto_init=True): """Executes training. @@ -273,7 +274,9 @@ def run(run_or_experiment, else: logger.error("Trials did not complete: %s", errored_trials) - return runner.get_trials() + if return_trials: + return runner.get_trials() + return ExperimentAnalysis(experiment.checkpoint_dir) def run_experiments(experiments,