[tune] Tutorial UX Changes (#4990)

* add integration, iris, ASHA, recursive changes, set reuse_actors=True, and enable Analysis as a return object

* docstring

* fix up example

* fix

* cleanup tests

* experiment analysis
This commit is contained in:
Richard Liaw
2019-06-21 12:59:49 +08:00
committed by GitHub
parent 1d17125333
commit 31b6da12f9
13 changed files with 209 additions and 85 deletions
+69 -21
View File
@@ -47,7 +47,14 @@ class ExperimentAnalysis(object):
>>> experiment_path="~/tune_results/my_exp")
"""
def __init__(self, experiment_path):
def __init__(self, experiment_path, trials=None):
"""Initializer.
Args:
experiment_path (str): Path to where experiment is located.
trials (list|None): List of trials that can be accessed via
`analysis.trials`.
"""
experiment_path = os.path.expanduser(experiment_path)
if not os.path.isdir(experiment_path):
raise TuneError(
@@ -55,7 +62,8 @@ class ExperimentAnalysis(object):
experiment_state_paths = glob.glob(
os.path.join(experiment_path, "experiment_state*.json"))
if not experiment_state_paths:
raise TuneError("No experiment state found!")
raise TuneError(
"No experiment state found in {}!".format(experiment_path))
experiment_filename = max(
list(experiment_state_paths)) # if more than one, pick latest
with open(os.path.join(experiment_path, experiment_filename)) as f:
@@ -65,10 +73,27 @@ class ExperimentAnalysis(object):
raise TuneError("Experiment state invalid; no checkpoints found.")
self._checkpoints = self._experiment_state["checkpoints"]
self._scrubbed_checkpoints = unnest_checkpoints(self._checkpoints)
self.trials = trials
self._dataframe = None
def dataframe(self):
"""Returns a pandas.DataFrame object constructed from the trials."""
return pd.DataFrame(self._scrubbed_checkpoints)
def get_all_trial_dataframes(self):
trial_dfs = {}
for checkpoint in self._checkpoints:
logdir = checkpoint["logdir"]
progress = max(glob.glob(os.path.join(logdir, "progress.csv")))
trial_dfs[checkpoint["trial_id"]] = pd.read_csv(progress)
return trial_dfs
def dataframe(self, refresh=False):
"""Returns a pandas.DataFrame object constructed from the trials.
Args:
refresh (bool): Clears the cache which may have an existing copy.
"""
if self._dataframe is None or refresh:
self._dataframe = pd.DataFrame(self._scrubbed_checkpoints)
return self._dataframe
def stats(self):
"""Returns a dictionary of the statistics of the experiment."""
@@ -87,22 +112,45 @@ class ExperimentAnalysis(object):
return pd.read_csv(progress)
raise ValueError("Trial id {} not found".format(trial_id))
def get_best_trainable(self, metric, trainable_cls):
"""Returns the best Trainable based on the experiment metric."""
return trainable_cls(config=self.get_best_config(metric))
def get_best_trainable(self, metric, trainable_cls, mode="max"):
"""Returns the best Trainable based on the experiment metric.
def get_best_config(self, metric):
"""Retrieve the best config from the best trial."""
return self._get_best_trial(metric)["config"]
Args:
metric (str): Key for trial info to order on.
mode (str): One of [min, max].
def _get_best_trial(self, metric):
"""Retrieve the best trial based on the experiment metric."""
return max(
"""
return trainable_cls(config=self.get_best_config(metric, mode=mode))
def get_best_config(self, metric, mode="max"):
"""Retrieve the best config from the best trial.
Args:
metric (str): Key for trial info to order on.
mode (str): One of [min, max].
"""
return self.get_best_info(metric, flatten=False, mode=mode)["config"]
def get_best_logdir(self, metric, mode="max"):
df = self.dataframe()
if mode == "max":
return df.iloc[df[metric].idxmax()].logdir
elif mode == "min":
return df.iloc[df[metric].idxmin()].logdir
def get_best_info(self, metric, mode="max", flatten=True):
"""Retrieve the best trial based on the experiment metric.
Args:
metric (str): Key for trial info to order on.
mode (str): One of [min, max].
flatten (bool): Assumes trial info is flattened, where
nested entries are concatenated like `info:metric`.
"""
optimize_op = max if mode == "max" else min
if flatten:
return optimize_op(
self._scrubbed_checkpoints, key=lambda d: d.get(metric, 0))
return optimize_op(
self._checkpoints, key=lambda d: d["last_result"].get(metric, 0))
def _get_sorted_trials(self, metric):
"""Retrive trials in sorted order based on the experiment metric."""
return sorted(
self._checkpoints,
key=lambda d: d["last_result"].get(metric, 0),
reverse=True)
+2 -2
View File
@@ -9,7 +9,7 @@ from keras.models import Sequential
from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D)
from ray.tune import track
from ray.tune.examples.utils import TuneKerasCallback, get_mnist_data
from ray.tune.examples.utils import TuneReporterCallback, get_mnist_data
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -63,7 +63,7 @@ def train_mnist(args):
batch_size=batch_size,
epochs=epochs,
validation_data=(x_test, y_test),
callbacks=[TuneKerasCallback(track.metric)])
callbacks=[TuneReporterCallback(track.metric)])
track.shutdown()
+4 -4
View File
@@ -9,8 +9,8 @@ from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import (Dense, Dropout, Flatten, Conv2D, MaxPooling2D)
from ray.tune.examples.utils import (TuneKerasCallback, get_mnist_data,
set_keras_threads)
from ray.tune.integration.keras import TuneReporterCallback
from ray.tune.examples.utils import get_mnist_data, set_keras_threads
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -52,7 +52,7 @@ def train_mnist(config, reporter):
epochs=epochs,
verbose=0,
validation_data=(x_test, y_test),
callbacks=[TuneKerasCallback(reporter)])
callbacks=[TuneReporterCallback(reporter)])
if __name__ == "__main__":
@@ -63,7 +63,7 @@ if __name__ == "__main__":
ray.init()
sched = AsyncHyperBandScheduler(
time_attr="timesteps_total",
time_attr="training_iteration",
metric="mean_accuracy",
mode="max",
max_t=400,
+18 -18
View File
@@ -5,24 +5,9 @@ from __future__ import print_function
import keras
from keras.datasets import mnist
from keras import backend as K
class TuneKerasCallback(keras.callbacks.Callback):
def __init__(self, reporter, logs={}):
self.reporter = reporter
self.iteration = 0
super(TuneKerasCallback, self).__init__()
def on_train_end(self, epoch, logs={}):
self.reporter(
timesteps_total=self.iteration,
done=1,
mean_accuracy=logs.get("acc"))
def on_batch_end(self, batch, logs={}):
self.iteration += 1
self.reporter(
timesteps_total=self.iteration, mean_accuracy=logs["acc"])
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
def get_mnist_data():
@@ -53,6 +38,16 @@ def get_mnist_data():
return x_train, y_train, x_test, y_test, input_shape
def get_iris_data(test_size=0.2):
iris_data = load_iris()
x = iris_data.data
y = iris_data.target.reshape(-1, 1)
encoder = OneHotEncoder(sparse=False)
y = encoder.fit_transform(y)
train_x, test_x, train_y, test_y = train_test_split(x, y)
return train_x, train_y, test_x, test_y
def set_keras_threads(threads):
# We set threads here to avoid contention, as Keras
# is heavily parallelized across multiple cores.
@@ -61,3 +56,8 @@ def set_keras_threads(threads):
config=K.tf.ConfigProto(
intra_op_parallelism_threads=threads,
inter_op_parallelism_threads=threads)))
def TuneKerasCallback(*args, **kwargs):
raise DeprecationWarning("TuneKerasCallback is now "
"tune.integration.keras.TuneReporterCallback.")
+8
View File
@@ -176,6 +176,14 @@ class Experiment(object):
else:
raise TuneError("Improper 'run' - not string nor trainable.")
@property
def local_dir(self):
return self.spec.get("local_dir")
@property
def checkpoint_dir(self):
return os.path.join(self.spec["local_dir"], self.name)
def convert_to_experiment_list(experiments):
"""Produces a list of Experiment objects.
+34
View File
@@ -0,0 +1,34 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import keras
from ray.tune import track
class TuneReporterCallback(keras.callbacks.Callback):
def __init__(self, reporter=None, freq="batch", logs={}):
self.reporter = reporter or track.log
self.iteration = 0
if freq not in ["batch", "epoch"]:
raise ValueError("{} not supported as a frequency.".format(freq))
self.freq = freq
super(TuneReporterCallback, self).__init__()
def on_batch_end(self, batch, logs={}):
if not self.freq == "batch":
return
self.iteration += 1
for metric in list(logs):
if "loss" in metric and "neg_" not in metric:
logs["neg_" + metric] = -logs[metric]
self.reporter(keras_info=logs, mean_accuracy=logs["acc"])
def on_epoch_end(self, batch, logs={}):
if not self.freq == "epoch":
return
self.iteration += 1
for metric in list(logs):
if "loss" in metric and "neg_" not in metric:
logs["neg_" + metric] = -logs[metric]
self.reporter(keras_info=logs, mean_accuracy=logs["acc"])
+4 -2
View File
@@ -4,11 +4,13 @@ from __future__ import print_function
from ray.tune.schedulers.trial_scheduler import TrialScheduler, FIFOScheduler
from ray.tune.schedulers.hyperband import HyperBandScheduler
from ray.tune.schedulers.async_hyperband import AsyncHyperBandScheduler
from ray.tune.schedulers.async_hyperband import (AsyncHyperBandScheduler,
ASHAScheduler)
from ray.tune.schedulers.median_stopping_rule import MedianStoppingRule
from ray.tune.schedulers.pbt import PopulationBasedTraining
__all__ = [
"TrialScheduler", "HyperBandScheduler", "AsyncHyperBandScheduler",
"MedianStoppingRule", "FIFOScheduler", "PopulationBasedTraining"
"ASHAScheduler", "MedianStoppingRule", "FIFOScheduler",
"PopulationBasedTraining"
]
@@ -168,6 +168,8 @@ class _Bracket():
return "Bracket: " + iters
ASHAScheduler = AsyncHyperBandScheduler
if __name__ == "__main__":
sched = AsyncHyperBandScheduler(
grace_period=1, max_t=10, reduction_factor=2)
@@ -11,9 +11,7 @@ import pandas as pd
import ray
from ray.tune import run, sample_from
from ray.tune.analysis import ExperimentAnalysis
from ray.tune.examples.async_hyperband_example import MyTrainableClass
from ray.tune.schedulers import AsyncHyperBandScheduler
class ExperimentAnalysisSuite(unittest.TestCase):
@@ -27,35 +25,22 @@ class ExperimentAnalysisSuite(unittest.TestCase):
self.test_path = os.path.join(self.test_dir, self.test_name)
self.run_test_exp()
self.ea = ExperimentAnalysis(self.test_path)
def tearDown(self):
shutil.rmtree(self.test_dir, ignore_errors=True)
ray.shutdown()
def run_test_exp(self):
ahb = AsyncHyperBandScheduler(
time_attr="training_iteration",
metric=self.metric,
mode="max",
grace_period=5,
max_t=100)
run(MyTrainableClass,
self.ea = run(
MyTrainableClass,
name=self.test_name,
scheduler=ahb,
local_dir=self.test_dir,
**{
"stop": {
"training_iteration": 1
},
"num_samples": 10,
"config": {
"width": sample_from(
lambda spec: 10 + int(90 * random.random())),
"height": sample_from(
lambda spec: int(100 * random.random())),
},
return_trials=False,
stop={"training_iteration": 1},
num_samples=self.num_samples,
config={
"width": sample_from(
lambda spec: 10 + int(90 * random.random())),
"height": sample_from(lambda spec: int(100 * random.random())),
})
def testDataframe(self):
@@ -87,7 +72,7 @@ class ExperimentAnalysisSuite(unittest.TestCase):
self.assertTrue("height" in best_config)
def testBestTrial(self):
best_trial = self.ea._get_best_trial(self.metric)
best_trial = self.ea.get_best_info(self.metric, flatten=False)
self.assertTrue(isinstance(best_trial, dict))
self.assertTrue("local_dir" in best_trial)
@@ -99,6 +84,18 @@ class ExperimentAnalysisSuite(unittest.TestCase):
self.assertTrue("last_result" in best_trial)
self.assertTrue(self.metric in best_trial["last_result"])
min_trial = self.ea.get_best_info(
self.metric, mode="min", flatten=False)
self.assertTrue(isinstance(min_trial, dict))
self.assertLess(min_trial["last_result"][self.metric],
best_trial["last_result"][self.metric])
flat_trial = self.ea.get_best_info(self.metric, flatten=True)
self.assertTrue(isinstance(min_trial, dict))
self.assertTrue(self.metric in flat_trial)
def testCheckpoints(self):
checkpoints = self.ea._checkpoints
@@ -121,6 +118,21 @@ class ExperimentAnalysisSuite(unittest.TestCase):
self.assertEqual(runner_data["_metadata_checkpoint_dir"],
os.path.expanduser(self.test_path))
def testBestLogdir(self):
logdir = self.ea.get_best_logdir(self.metric)
self.assertTrue(logdir.startswith(self.test_path))
logdir2 = self.ea.get_best_logdir(self.metric, mode="min")
self.assertTrue(logdir2.startswith(self.test_path))
self.assertNotEquals(logdir, logdir2)
def testAllDataframes(self):
dataframes = self.ea.get_all_trial_dataframes()
self.assertTrue(len(dataframes) == self.num_samples)
self.assertTrue(isinstance(dataframes, dict))
for df in dataframes.values():
self.assertEqual(df.training_iteration.max(), 1)
if __name__ == "__main__":
unittest.main(verbosity=2)
@@ -441,6 +441,14 @@ class TrainableFunctionApiTest(unittest.TestCase):
self.assertRaises(TuneError, f)
def testNestedStoppingReturn(self):
def train(config, reporter):
for i in range(10):
reporter(test={"test1": {"test2": i}})
[trial] = tune.run(train, stop={"test": {"test1": {"test2": 6}}})
self.assertEqual(trial.last_result["training_iteration"], 7)
def testEarlyReturn(self):
def train(config, reporter):
reporter(timesteps_total=100, done=True)
+16 -9
View File
@@ -181,6 +181,21 @@ def has_trainable(trainable_name):
ray.tune.registry.TRAINABLE_CLASS, trainable_name)
def recursive_criteria_check(result, criteria):
for criteria, stop_value in criteria.items():
if criteria not in result:
raise TuneError(
"Stopping criteria {} not provided in result {}.".format(
criteria, result))
elif isinstance(result[criteria], dict) and isinstance(
stop_value, dict):
if recursive_criteria_check(result[criteria], stop_value):
return True
elif result[criteria] >= stop_value:
return True
return False
class Checkpoint(object):
"""Describes a checkpoint of trial state.
@@ -425,15 +440,7 @@ class Trial(object):
if result.get(DONE):
return True
for criteria, stop_value in self.stopping_criterion.items():
if criteria not in result:
raise TuneError(
"Stopping criteria {} not provided in result {}.".format(
criteria, result))
if result[criteria] >= stop_value:
return True
return False
return recursive_criteria_check(result, self.stopping_criterion)
def should_checkpoint(self):
"""Whether this trial is due for checkpointing."""
+7 -4
View File
@@ -4,11 +4,11 @@ from __future__ import print_function
import click
import logging
import os
import time
from ray.tune.error import TuneError
from ray.tune.experiment import convert_to_experiment_list, Experiment
from ray.tune.analysis import ExperimentAnalysis
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.trial import Trial, DEBUG_PRINT_INTERVAL
from ray.tune.ray_trial_executor import RayTrialExecutor
@@ -39,7 +39,7 @@ def _make_scheduler(args):
def _find_checkpoint_dir(exp):
# TODO(rliaw): Make sure the checkpoint_dir is resolved earlier.
# Right now it is resolved somewhere far down the trial generation process
return os.path.join(exp.spec["local_dir"], exp.name)
return exp.checkpoint_dir
def _prompt_restore(checkpoint_dir, resume):
@@ -89,9 +89,10 @@ def run(run_or_experiment,
verbose=2,
resume=False,
queue_trials=False,
reuse_actors=False,
reuse_actors=True,
trial_executor=None,
raise_on_failed_trial=True,
return_trials=True,
ray_auto_init=True):
"""Executes training.
@@ -273,7 +274,9 @@ def run(run_or_experiment,
else:
logger.error("Trials did not complete: %s", errored_trials)
return runner.get_trials()
if return_trials:
return runner.get_trials()
return ExperimentAnalysis(experiment.checkpoint_dir)
def run_experiments(experiments,