[tune] experiment_analysis split to Analysis (#5115)

This commit is contained in:
Richard Liaw
2019-07-27 01:10:52 -07:00
committed by GitHub
parent 7e715520e5
commit 5e15b36d6e
14 changed files with 302 additions and 239 deletions
@@ -75,7 +75,7 @@ if __name__ == "__main__":
"on_postprocess_traj": tune.function(on_postprocess_traj),
},
},
)
return_trials=True)
# verify custom metrics for integration tests
custom_metrics = trials[0].last_result["custom_metrics"]
+3 -1
View File
@@ -5,6 +5,7 @@ from __future__ import print_function
from ray.tune.error import TuneError
from ray.tune.tune import run_experiments, run
from ray.tune.experiment import Experiment
from ray.tune.analysis import ExperimentAnalysis, Analysis
from ray.tune.registry import register_env, register_trainable
from ray.tune.trainable import Trainable
from ray.tune.suggest import grid_search
@@ -14,5 +15,6 @@ from ray.tune.sample import (function, sample_from, uniform, choice, randint,
__all__ = [
"Trainable", "TuneError", "grid_search", "register_env",
"register_trainable", "run", "run_experiments", "Experiment", "function",
"sample_from", "track", "uniform", "choice", "randint", "randn"
"sample_from", "track", "uniform", "choice", "randint", "randn",
"ExperimentAnalysis", "Analysis"
]
+2 -2
View File
@@ -2,6 +2,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.tune.analysis.experiment_analysis import ExperimentAnalysis
from ray.tune.analysis.experiment_analysis import ExperimentAnalysis, Analysis
__all__ = ["ExperimentAnalysis"]
__all__ = ["ExperimentAnalysis", "Analysis"]
+134 -93
View File
@@ -3,13 +3,13 @@ from __future__ import division
from __future__ import print_function
import copy
import glob
import json
import logging
import os
import pandas as pd
from ray.tune.error import TuneError
from ray.tune.result import EXPR_PROGRESS_FILE, EXPR_PARAM_FILE
from ray.tune.util import flatten_dict
logger = logging.getLogger(__name__)
@@ -34,20 +34,134 @@ def unnest_checkpoints(checkpoints):
return checkpoint_dicts
class ExperimentAnalysis(object):
class Analysis(object):
    """Analyze all results from a directory of experiments.

    Every directory below ``experiment_dir`` that contains a progress file
    (``EXPR_PROGRESS_FILE``) is treated as one trial. The progress files are
    loaded into pandas DataFrames when the object is constructed.
    """

    def __init__(self, experiment_dir):
        """Initializer.

        Args:
            experiment_dir (str): Directory to search for trials. A leading
                ``~`` is expanded.

        Raises:
            ValueError: If ``experiment_dir`` is not an existing directory.
            TuneError: If no trial directory is found.
        """
        experiment_dir = os.path.expanduser(experiment_dir)
        if not os.path.isdir(experiment_dir):
            raise ValueError(
                "{} is not a valid directory.".format(experiment_dir))
        self._experiment_dir = experiment_dir
        self._configs = {}
        self._trial_dataframes = {}
        self.fetch_trial_dataframes()

    def fetch_trial_dataframes(self):
        """(Re)load the progress file of every trial.

        Trials whose progress file cannot be read are skipped; only the
        number of failures is logged (at debug level).

        Returns:
            dict: Maps trial path to the DataFrame read from its progress
                file. This is the same object as ``self.trial_dataframes``.
        """
        fail_count = 0
        for path in self._get_trial_paths():
            try:
                self.trial_dataframes[path] = pd.read_csv(
                    os.path.join(path, EXPR_PROGRESS_FILE))
            except Exception:
                # Best effort: an unreadable progress file must not prevent
                # the remaining trials from being analyzed.
                fail_count += 1

        if fail_count:
            logger.debug(
                "Couldn't read results from {} paths".format(fail_count))
        return self.trial_dataframes

    def get_all_configs(self, prefix=False):
        """Load the parameter file (``EXPR_PARAM_FILE``) of every trial.

        Args:
            prefix (bool): If True, every config key is renamed to
                ``"config:" + key``.

        Returns:
            dict: Maps trial path to its config dict. Trials whose parameter
                file cannot be read are left out. A new dict is built on
                every call, so a result obtained earlier is not re-keyed by
                a later call made with a different ``prefix``.
        """
        fail_count = 0
        configs = {}
        for path in self._get_trial_paths():
            try:
                with open(os.path.join(path, EXPR_PARAM_FILE)) as f:
                    config = json.load(f)
                    if prefix:
                        # Iterate over a snapshot of the keys because the
                        # dict is modified inside the loop.
                        for k in list(config):
                            config["config:" + k] = config.pop(k)
                    configs[path] = config
            except Exception:
                fail_count += 1

        if fail_count:
            logger.warning(
                "Couldn't read config from {} paths".format(fail_count))
        self._configs = configs
        return configs

    def dataframe(self, metric=None, mode=None):
        """Returns a pandas.DataFrame object constructed from the trials.

        Each trial contributes one row, extended with its config (keys
        prefixed with ``config:``) when the config is readable, and with a
        ``logdir`` column holding the trial path.

        Args:
            metric (str): Key for trial info to order on.
                If None, uses last result.
            mode (str): One of [min, max].
        """
        rows = self._retrieve_rows(metric=metric, mode=mode)
        all_configs = self.get_all_configs(prefix=True)
        for path, row in rows.items():
            # logdir is recorded for every trial, including those whose
            # parameter file could not be read.
            row.update(all_configs.get(path, {}))
            row.update(logdir=path)
        return pd.DataFrame(list(rows.values()))

    def get_best_config(self, metric, mode="max"):
        """Retrieve the best config corresponding to the trial.

        Each trial is represented by its best row for ``metric``; the config
        of the trial whose row is best overall is returned.

        Args:
            metric (str): Key for trial info to order on.
            mode (str): One of [min, max].
        """
        rows = self._retrieve_rows(metric=metric, mode=mode)
        all_configs = self.get_all_configs()
        compare_op = max if mode == "max" else min
        best_path = compare_op(rows, key=lambda k: rows[k][metric])
        return all_configs[best_path]

    def _retrieve_rows(self, metric=None, mode=None):
        """Pick one representative row per trial.

        Args:
            metric (str): Column to optimize; used only when ``mode`` is set.
            mode (str): "max" or "min" selects the row where ``metric`` is
                largest/smallest; None selects the last row.

        Returns:
            dict: Maps trial path to the selected row as a dict.
        """
        assert mode is None or mode in ["max", "min"]
        rows = {}
        for path, df in self.trial_dataframes.items():
            # idxmax()/idxmin() return an index *label*, so the row has to be
            # fetched with label-based .loc; positional .iloc is only
            # equivalent for a default RangeIndex.
            if mode == "max":
                row = df.loc[df[metric].idxmax()]
            elif mode == "min":
                row = df.loc[df[metric].idxmin()]
            else:
                row = df.iloc[-1]
            rows[path] = row.to_dict()

        return rows

    def _get_trial_paths(self):
        """Return every directory under the experiment dir holding a
        progress file.

        Raises:
            TuneError: If no such directory exists.
        """
        _trial_paths = []
        for trial_path, _, files in os.walk(self._experiment_dir):
            if EXPR_PROGRESS_FILE in files:
                _trial_paths += [trial_path]

        if not _trial_paths:
            raise TuneError("No trials found in {}.".format(
                self._experiment_dir))
        return _trial_paths

    def get_best_logdir(self, metric, mode="max"):
        """Retrieve the logdir corresponding to the best trial.

        Trials are ranked the same way as in ``get_best_config`` (each trial
        by its best row for ``metric``), so both methods refer to the same
        trial for the same arguments.

        Args:
            metric (str): Key for trial info to order on.
            mode (str): One of [min, max].
        """
        df = self.dataframe(metric=metric, mode=mode)
        if mode == "max":
            return df.loc[df[metric].idxmax()].logdir
        elif mode == "min":
            return df.loc[df[metric].idxmin()].logdir

    @property
    def trial_dataframes(self):
        """dict: Maps trial path to the DataFrame of its progress file."""
        return self._trial_dataframes
class ExperimentAnalysis(Analysis):
"""Analyze results from a Tune experiment.
Parameters:
experiment_path (str): Path to where experiment is located.
Corresponds to Experiment.local_dir/Experiment.name
experiment_checkpoint_path (str): Path to a json file
representing an experiment state. Corresponds to
Experiment.local_dir/Experiment.name/experiment_state.json
Example:
>>> tune.run(my_trainable, name="my_exp", local_dir="~/tune_results")
>>> analysis = ExperimentAnalysis(
>>> experiment_path="~/tune_results/my_exp")
>>> experiment_checkpoint_path="~/tune_results/my_exp/state.json")
"""
def __init__(self, experiment_path, trials=None):
def __init__(self, experiment_checkpoint_path, trials=None):
"""Initializer.
Args:
@@ -55,45 +169,15 @@ class ExperimentAnalysis(object):
trials (list|None): List of trials that can be accessed via
`analysis.trials`.
"""
experiment_path = os.path.expanduser(experiment_path)
if not os.path.isdir(experiment_path):
raise TuneError(
"{} is not a valid directory.".format(experiment_path))
experiment_state_paths = glob.glob(
os.path.join(experiment_path, "experiment_state*.json"))
if not experiment_state_paths:
raise TuneError(
"No experiment state found in {}!".format(experiment_path))
experiment_filename = max(
list(experiment_state_paths)) # if more than one, pick latest
with open(experiment_filename) as f:
self._experiment_state = json.load(f)
with open(experiment_checkpoint_path) as f:
_experiment_state = json.load(f)
if "checkpoints" not in self._experiment_state:
if "checkpoints" not in _experiment_state:
raise TuneError("Experiment state invalid; no checkpoints found.")
self._checkpoints = self._experiment_state["checkpoints"]
self._scrubbed_checkpoints = unnest_checkpoints(self._checkpoints)
self._checkpoints = _experiment_state["checkpoints"]
self.trials = trials
self._dataframe = None
def get_all_trial_dataframes(self):
trial_dfs = {}
for checkpoint in self._checkpoints:
logdir = checkpoint["logdir"]
progress = max(glob.glob(os.path.join(logdir, "progress.csv")))
trial_dfs[checkpoint["trial_id"]] = pd.read_csv(progress)
return trial_dfs
def dataframe(self, refresh=False):
"""Returns a pandas.DataFrame object constructed from the trials.
Args:
refresh (bool): Clears the cache which may have an existing copy.
"""
if self._dataframe is None or refresh:
self._dataframe = pd.DataFrame(self._scrubbed_checkpoints)
return self._dataframe
super(ExperimentAnalysis, self).__init__(
os.path.dirname(experiment_checkpoint_path))
def stats(self):
"""Returns a dictionary of the statistics of the experiment."""
@@ -103,54 +187,11 @@ class ExperimentAnalysis(object):
"""Returns a dictionary of the TrialRunner data."""
return self._experiment_state.get("runner_data")
def trial_dataframe(self, trial_id):
"""Returns a pandas.DataFrame constructed from one trial."""
for checkpoint in self._checkpoints:
if checkpoint["trial_id"] == trial_id:
logdir = checkpoint["logdir"]
progress = max(glob.glob(os.path.join(logdir, "progress.csv")))
return pd.read_csv(progress)
raise ValueError("Trial id {} not found".format(trial_id))
def get_best_trainable(self, metric, trainable_cls, mode="max"):
"""Returns the best Trainable based on the experiment metric.
Args:
metric (str): Key for trial info to order on.
mode (str): One of [min, max].
"""
return trainable_cls(config=self.get_best_config(metric, mode=mode))
def get_best_config(self, metric, mode="max"):
"""Retrieve the best config from the best trial.
Args:
metric (str): Key for trial info to order on.
mode (str): One of [min, max].
"""
return self.get_best_info(metric, flatten=False, mode=mode)["config"]
def get_best_logdir(self, metric, mode="max"):
df = self.dataframe()
if mode == "max":
return df.iloc[df[metric].idxmax()].logdir
elif mode == "min":
return df.iloc[df[metric].idxmin()].logdir
def get_best_info(self, metric, mode="max", flatten=True):
"""Retrieve the best trial based on the experiment metric.
Args:
metric (str): Key for trial info to order on.
mode (str): One of [min, max].
flatten (bool): Assumes trial info is flattened, where
nested entries are concatenated like `info:metric`.
"""
optimize_op = max if mode == "max" else min
if flatten:
return optimize_op(
self._scrubbed_checkpoints, key=lambda d: d.get(metric, 0))
return optimize_op(
self._checkpoints, key=lambda d: d["last_result"].get(metric, 0))
def _get_trial_paths(self):
"""Overwrites Analysis to only have trials of one experiment."""
_trial_paths = [
checkpoint["logdir"] for checkpoint in self._checkpoints
]
if not _trial_paths:
raise TuneError("No trials found.")
return _trial_paths
+33 -42
View File
@@ -3,6 +3,7 @@ from __future__ import division
from __future__ import print_function
import logging
import glob
import os
import sys
import subprocess
@@ -11,9 +12,9 @@ from datetime import datetime
import pandas as pd
from pandas.api.types import is_string_dtype, is_numeric_dtype
from ray.tune.result import TRAINING_ITERATION, MEAN_ACCURACY, MEAN_LOSS
from ray.tune.trial import Trial
from ray.tune.analysis import ExperimentAnalysis
from ray.tune.result import (TRAINING_ITERATION, MEAN_ACCURACY, MEAN_LOSS,
TIME_TOTAL_S, TRIAL_ID)
from ray.tune.analysis import Analysis
from ray.tune import TuneError
try:
from tabulate import tabulate
@@ -26,9 +27,9 @@ EDITOR = os.getenv("EDITOR", "vim")
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S (%A)"
DEFAULT_EXPERIMENT_INFO_KEYS = ("trainable_name", "experiment_tag", "trial_id",
"status", "last_update_time",
TRAINING_ITERATION, MEAN_ACCURACY, MEAN_LOSS)
DEFAULT_EXPERIMENT_INFO_KEYS = ("trainable_name", "experiment_tag",
TRAINING_ITERATION, TIME_TOTAL_S,
MEAN_ACCURACY, MEAN_LOSS, TRIAL_ID)
DEFAULT_PROJECT_INFO_KEYS = (
"name",
@@ -62,6 +63,20 @@ def _check_tabulate():
"Tabulate not installed. Please run `pip install tabulate`.")
def get_most_recent_state(experiment_path):
    """Return the path of the most recent experiment state file.

    Args:
        experiment_path (str): Directory to search for files matching
            ``experiment_state*.json``. A leading ``~`` is expanded.

    Returns:
        str: The matching path that sorts last.

    Raises:
        TuneError: If ``experiment_path`` is not a directory or contains no
            experiment state file.
    """
    experiment_path = os.path.expanduser(experiment_path)
    if not os.path.isdir(experiment_path):
        raise TuneError("{} is not a valid directory.".format(experiment_path))
    experiment_state_paths = glob.glob(
        os.path.join(experiment_path, "experiment_state*.json"))
    if not experiment_state_paths:
        raise TuneError(
            "No experiment state found in {}!".format(experiment_path))
    # If more than one, pick latest: max() returns the path that sorts last,
    # which is the newest one provided the file names embed a sortable
    # timestamp.
    return max(experiment_state_paths)
def print_format_output(dataframe):
"""Prints output of given dataframe to fit into terminal.
@@ -112,7 +127,7 @@ def list_trials(experiment_path,
Args:
experiment_path (str): Directory where trials are located.
Corresponds to Experiment.local_dir/Experiment.name.
Like Experiment.local_dir/Experiment.name/experiment*.json.
sort (list): Keys to sort by.
output (str): Name of file where output is saved.
filter_op (str): Filter operation in the format
@@ -124,14 +139,17 @@ def list_trials(experiment_path,
_check_tabulate()
try:
checkpoints_df = ExperimentAnalysis(experiment_path).dataframe()
checkpoints_df = Analysis(experiment_path).dataframe()
except TuneError:
print("No experiment state found!")
sys.exit(0)
sys.exit(1)
if not info_keys:
info_keys = DEFAULT_EXPERIMENT_INFO_KEYS
col_keys = [k for k in list(info_keys) if k in checkpoints_df]
col_keys = [
k for k in checkpoints_df.columns
if k in info_keys or k.startswith("config:")
]
checkpoints_df = checkpoints_df[col_keys]
if "last_update_time" in checkpoints_df:
@@ -143,7 +161,7 @@ def list_trials(experiment_path,
checkpoints_df["last_update_time"] = datetime_series
if "logdir" in checkpoints_df:
# logdir often too verbose to view in table, so drop experiment_path
# logdir often too long to view in table, so drop experiment_path
checkpoints_df["logdir"] = checkpoints_df["logdir"].str.replace(
experiment_path, "")
@@ -213,38 +231,11 @@ def list_experiments(project_path,
experiment_data_collection = []
for experiment_dir in experiment_folders:
analysis_obj, checkpoints_df = None, None
try:
analysis_obj = ExperimentAnalysis(
os.path.join(project_path, experiment_dir))
checkpoints_df = analysis_obj.dataframe()
except TuneError:
logger.debug("No experiment state found in %s", experiment_dir)
continue
num_trials = sum(
"result.json" in files
for _, _, files in os.walk(os.path.join(base, experiment_dir)))
# Format time-based values.
stats = analysis_obj.stats()
time_values = {
"start_time": stats.get("_start_time"),
"last_updated": stats.get("timestamp"),
}
formatted_time_values = {
key: datetime.fromtimestamp(val).strftime(TIMESTAMP_FORMAT)
if val else None
for key, val in time_values.items()
}
experiment_data = {
"name": experiment_dir,
"total_trials": checkpoints_df.shape[0],
"running_trials": (
checkpoints_df["status"] == Trial.RUNNING).sum(),
"terminated_trials": (
checkpoints_df["status"] == Trial.TERMINATED).sum(),
"error_trials": (checkpoints_df["status"] == Trial.ERROR).sum(),
}
experiment_data.update(formatted_time_values)
experiment_data = {"name": experiment_dir, "total_trials": num_trials}
experiment_data_collection.append(experiment_data)
if not experiment_data_collection:
+9 -7
View File
@@ -14,8 +14,10 @@ import numpy as np
import ray.cloudpickle as cloudpickle
from ray.tune.syncer import get_log_syncer
from ray.tune.result import NODE_IP, TRAINING_ITERATION, TIME_TOTAL_S, \
TIMESTEPS_TOTAL
from ray.tune.result import (NODE_IP, TRAINING_ITERATION, TIME_TOTAL_S,
TIMESTEPS_TOTAL, EXPR_PARAM_FILE,
EXPR_PARAM_PICKLE_FILE, EXPR_PROGRESS_FILE,
EXPR_RESULT_FILE)
logger = logging.getLogger(__name__)
@@ -72,7 +74,7 @@ class NoopLogger(Logger):
class JsonLogger(Logger):
def _init(self):
self.update_config(self.config)
local_file = os.path.join(self.logdir, "result.json")
local_file = os.path.join(self.logdir, EXPR_RESULT_FILE)
self.local_out = open(local_file, "a")
def on_result(self, result):
@@ -91,7 +93,7 @@ class JsonLogger(Logger):
def update_config(self, config):
self.config = config
config_out = os.path.join(self.logdir, "params.json")
config_out = os.path.join(self.logdir, EXPR_PARAM_FILE)
with open(config_out, "w") as f:
json.dump(
self.config,
@@ -99,7 +101,7 @@ class JsonLogger(Logger):
indent=2,
sort_keys=True,
cls=_SafeFallbackEncoder)
config_pkl = os.path.join(self.logdir, "params.pkl")
config_pkl = os.path.join(self.logdir, EXPR_PARAM_PICKLE_FILE)
with open(config_pkl, "wb") as f:
cloudpickle.dump(self.config, f)
@@ -150,7 +152,7 @@ class TFLogger(Logger):
t = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION]
self._file_writer.add_summary(train_stats, t)
iteration_value = to_tf_values({
"training_iteration": result[TRAINING_ITERATION]
TRAINING_ITERATION: result[TRAINING_ITERATION]
}, ["ray", "tune"])
iteration_stats = tf.Summary(value=iteration_value)
self._file_writer.add_summary(iteration_stats, t)
@@ -167,7 +169,7 @@ class CSVLogger(Logger):
def _init(self):
"""CSV outputted with Headers as first set of results."""
# Note that we assume params.json was already created by JsonLogger
progress_file = os.path.join(self.logdir, "progress.csv")
progress_file = os.path.join(self.logdir, EXPR_PROGRESS_FILE)
self._continuing = os.path.exists(progress_file)
self._file = open(progress_file, "a")
self._csv_out = None
+6
View File
@@ -15,6 +15,9 @@ SHOULD_CHECKPOINT = "should_checkpoint"
# (Auto-filled) The hostname of the machine hosting the training process.
HOSTNAME = "hostname"
# (Auto-filled) The auto-assigned id of the trial.
TRIAL_ID = "trial_id"
# (Auto-filled) The node ip of the machine hosting the training process.
NODE_IP = "node_ip"
@@ -73,6 +76,9 @@ EXPR_META_FILE = "trial_status.json"
# File that stores parameters of the trial.
EXPR_PARAM_FILE = "params.json"
# Pickle File that stores parameters of the trial.
EXPR_PARAM_PICKLE_FILE = "params.pkl"
# File that stores the progress of the trial.
EXPR_PROGRESS_FILE = "progress.csv"
+4 -5
View File
@@ -75,7 +75,7 @@ def test_ls(start_ray, tmpdir):
local_dir=str(tmpdir),
global_checkpoint_period=0)
columns = ["status", "episode_reward_mean", "training_iteration"]
columns = ["episode_reward_mean", "training_iteration", "trial_id"]
limit = 2
with Capturing() as output:
commands.list_trials(experiment_path, info_keys=columns, limit=limit)
@@ -87,11 +87,10 @@ def test_ls(start_ray, tmpdir):
with Capturing() as output:
commands.list_trials(
experiment_path,
sort=["status"],
info_keys=("status", ),
filter_op="status == TERMINATED")
sort=["trial_id"],
info_keys=("trial_id", "training_iteration"),
filter_op="training_iteration == 1")
lines = output.captured
assert sum("TERMINATED" in line for line in lines) == num_samples
assert len(lines) == 3 + num_samples + 1
@@ -10,14 +10,13 @@ import os
import pandas as pd
import ray
from ray.tune import run, sample_from
from ray.tune import run, sample_from, Analysis
from ray.tune.examples.async_hyperband_example import MyTrainableClass
class ExperimentAnalysisSuite(unittest.TestCase):
def setUp(self):
ray.init(local_mode=True)
self.test_dir = tempfile.mkdtemp()
self.test_name = "analysis_exp"
self.num_samples = 10
@@ -51,20 +50,13 @@ class ExperimentAnalysisSuite(unittest.TestCase):
self.assertEquals(df.shape[0], self.num_samples)
def testTrialDataframe(self):
cs = self.ea._checkpoints
idx = random.randint(0, len(cs) - 1)
trial_df = self.ea.trial_dataframe(
cs[idx]["trial_id"]) # random trial df
checkpoints = self.ea._checkpoints
idx = random.randint(0, len(checkpoints) - 1)
trial_df = self.ea.trial_dataframes[checkpoints[idx]["logdir"]]
self.assertTrue(isinstance(trial_df, pd.DataFrame))
self.assertEqual(trial_df.shape[0], 1)
def testBestTrainable(self):
best_trainable = self.ea.get_best_trainable(self.metric,
MyTrainableClass)
self.assertTrue(isinstance(best_trainable, MyTrainableClass))
def testBestConfig(self):
best_config = self.ea.get_best_config(self.metric)
@@ -72,53 +64,6 @@ class ExperimentAnalysisSuite(unittest.TestCase):
self.assertTrue("width" in best_config)
self.assertTrue("height" in best_config)
def testBestTrial(self):
best_trial = self.ea.get_best_info(self.metric, flatten=False)
self.assertTrue(isinstance(best_trial, dict))
self.assertTrue("local_dir" in best_trial)
self.assertEqual(best_trial["local_dir"],
os.path.expanduser(self.test_path))
self.assertTrue("config" in best_trial)
self.assertTrue("width" in best_trial["config"])
self.assertTrue("height" in best_trial["config"])
self.assertTrue("last_result" in best_trial)
self.assertTrue(self.metric in best_trial["last_result"])
min_trial = self.ea.get_best_info(
self.metric, mode="min", flatten=False)
self.assertTrue(isinstance(min_trial, dict))
self.assertLess(min_trial["last_result"][self.metric],
best_trial["last_result"][self.metric])
flat_trial = self.ea.get_best_info(self.metric, flatten=True)
self.assertTrue(isinstance(min_trial, dict))
self.assertTrue(self.metric in flat_trial)
def testCheckpoints(self):
checkpoints = self.ea._checkpoints
self.assertTrue(isinstance(checkpoints, list))
self.assertTrue(isinstance(checkpoints[0], dict))
self.assertEqual(len(checkpoints), self.num_samples)
def testStats(self):
stats = self.ea.stats()
self.assertTrue(isinstance(stats, dict))
self.assertTrue("start_time" in stats)
self.assertTrue("timestamp" in stats)
def testRunnerData(self):
runner_data = self.ea.runner_data()
self.assertTrue(isinstance(runner_data, dict))
self.assertTrue("_local_checkpoint_dir" in runner_data)
self.assertEqual(runner_data["_local_checkpoint_dir"],
os.path.expanduser(self.test_path))
def testBestLogdir(self):
logdir = self.ea.get_best_logdir(self.metric)
self.assertTrue(logdir.startswith(self.test_path))
@@ -127,13 +72,72 @@ class ExperimentAnalysisSuite(unittest.TestCase):
self.assertNotEquals(logdir, logdir2)
def testAllDataframes(self):
dataframes = self.ea.get_all_trial_dataframes()
dataframes = self.ea.trial_dataframes
self.assertTrue(len(dataframes) == self.num_samples)
self.assertTrue(isinstance(dataframes, dict))
for df in dataframes.values():
self.assertEqual(df.training_iteration.max(), 1)
def testIgnoreOtherExperiment(self):
    """The analysis returned by run() only covers its own experiment."""
    # Runs a second, single-sample experiment in the same local_dir.
    analysis = run(
        MyTrainableClass,
        global_checkpoint_period=0,
        name="test_example",
        local_dir=self.test_dir,
        return_trials=False,
        stop={"training_iteration": 1},
        num_samples=1,
        config={
            "width": sample_from(
                lambda spec: 10 + int(90 * random.random())),
            "height": sample_from(lambda spec: int(100 * random.random())),
        })
    df = analysis.dataframe()
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(df.shape[0], 1)
class AnalysisSuite(unittest.TestCase):
    """Tests Analysis on a directory that holds two experiments."""

    def setUp(self):
        ray.init(local_mode=True)
        self.test_dir = tempfile.mkdtemp()
        self.num_samples = 10
        self.metric = "episode_reward_mean"
        # Two experiments share one local_dir, so Analysis(self.test_dir)
        # is expected to see the trials of both.
        self.run_test_exp(test_name="analysis_exp1")
        self.run_test_exp(test_name="analysis_exp2")

    def run_test_exp(self, test_name=None):
        """Run one experiment of ``self.num_samples`` single-iteration
        trials named ``test_name`` inside ``self.test_dir``."""
        run(MyTrainableClass,
            global_checkpoint_period=0,
            name=test_name,
            local_dir=self.test_dir,
            return_trials=False,
            stop={"training_iteration": 1},
            num_samples=self.num_samples,
            config={
                "width": sample_from(
                    lambda spec: 10 + int(90 * random.random())),
                "height": sample_from(lambda spec: int(100 * random.random())),
            })

    def tearDown(self):
        shutil.rmtree(self.test_dir, ignore_errors=True)
        ray.shutdown()

    def testDataframe(self):
        analysis = Analysis(self.test_dir)
        df = analysis.dataframe()
        self.assertIsInstance(df, pd.DataFrame)
        # One row per trial, across both experiments.
        self.assertEqual(df.shape[0], self.num_samples * 2)

    def testBestLogdir(self):
        analysis = Analysis(self.test_dir)
        logdir = analysis.get_best_logdir(self.metric)
        self.assertTrue(logdir.startswith(self.test_dir))
        logdir2 = analysis.get_best_logdir(self.metric, mode="min")
        self.assertTrue(logdir2.startswith(self.test_dir))
        self.assertNotEqual(logdir, logdir2)
if __name__ == "__main__":
unittest.main(verbosity=2)
+1 -1
View File
@@ -66,7 +66,7 @@ class TrackApiTest(unittest.TestCase):
for i in range(config["iters"]):
track.log(iteration=i, hi="test")
trials = tune.run(testme, config={"iters": 5})
trials = tune.run(testme, config={"iters": 5}).trials
trial_res = trials[0].last_result
self.assertTrue(trial_res["hi"], "test")
self.assertTrue(trial_res["training_iteration"], 5)
+30 -16
View File
@@ -23,7 +23,7 @@ from ray.tune.registry import _global_registry, TRAINABLE_CLASS
from ray.tune.result import (DEFAULT_RESULTS_DIR, TIMESTEPS_TOTAL, DONE,
HOSTNAME, NODE_IP, PID, EPISODES_TOTAL,
TRAINING_ITERATION, TIMESTEPS_THIS_ITER,
TIME_THIS_ITER_S, TIME_TOTAL_S)
TIME_THIS_ITER_S, TIME_TOTAL_S, TRIAL_ID)
from ray.tune.logger import Logger
from ray.tune.util import pin_in_object_store, get_pinned_object
from ray.tune.experiment import Experiment
@@ -115,6 +115,7 @@ class TrainableFunctionApiTest(unittest.TestCase):
NO_COMPARE_FIELDS = {
HOSTNAME,
NODE_IP,
TRIAL_ID,
PID,
TIME_THIS_ITER_S,
TIME_TOTAL_S,
@@ -432,7 +433,14 @@ class TrainableFunctionApiTest(unittest.TestCase):
for i in range(10):
reporter(test={"test1": {"test2": i}})
[trial] = tune.run(train, stop={"test": {"test1": {"test2": 6}}})
[trial] = tune.run(
train, stop={
"test": {
"test1": {
"test2": 6
}
}
}).trials
self.assertEqual(trial.last_result["training_iteration"], 7)
def testEarlyReturn(self):
@@ -938,7 +946,7 @@ class TestSyncFunctionality(unittest.TestCase):
"training_iteration": 1
},
"sync_to_cloud": "echo {source} {target}"
})
}).trials
@patch("ray.tune.syncer.S3_PREFIX", "test")
def testCloudProperString(self):
@@ -953,7 +961,7 @@ class TestSyncFunctionality(unittest.TestCase):
},
"upload_dir": "test",
"sync_to_cloud": "ls {target}"
})
}).trials
with self.assertRaises(ValueError):
[trial] = tune.run(
@@ -966,7 +974,7 @@ class TestSyncFunctionality(unittest.TestCase):
},
"upload_dir": "test",
"sync_to_cloud": "ls {source}"
})
}).trials
tmpdir = tempfile.mkdtemp()
logfile = os.path.join(tmpdir, "test.log")
@@ -981,7 +989,7 @@ class TestSyncFunctionality(unittest.TestCase):
},
"upload_dir": "test",
"sync_to_cloud": "echo {source} {target} > " + logfile
})
}).trials
with open(logfile) as f:
lines = f.read()
self.assertTrue("test" in lines)
@@ -1000,7 +1008,7 @@ class TestSyncFunctionality(unittest.TestCase):
"training_iteration": 1
},
"sync_to_driver": "ls {target}"
})
}).trials
with self.assertRaises(TuneError):
# This raises TuneError because logger is init in safe zone.
@@ -1013,7 +1021,7 @@ class TestSyncFunctionality(unittest.TestCase):
"training_iteration": 1
},
"sync_to_driver": "ls {source}"
})
}).trials
with patch("ray.tune.syncer.CommandSyncer.sync_function"
) as mock_fn, patch(
@@ -1028,7 +1036,7 @@ class TestSyncFunctionality(unittest.TestCase):
"training_iteration": 1
},
"sync_to_driver": "echo {source} {target}"
})
}).trials
self.assertGreater(mock_fn.call_count, 0)
def testCloudFunctions(self):
@@ -1045,9 +1053,11 @@ class TestSyncFunctionality(unittest.TestCase):
name="foo",
max_failures=0,
local_dir=tmpdir,
stop={"training_iteration": 1},
stop={
"training_iteration": 1
},
upload_dir=tmpdir2,
sync_to_cloud=tune.function(sync_func))
sync_to_cloud=tune.function(sync_func)).trials
test_file_path = glob.glob(os.path.join(tmpdir2, "foo", "*.json"))
self.assertTrue(test_file_path)
shutil.rmtree(tmpdir)
@@ -1065,8 +1075,10 @@ class TestSyncFunctionality(unittest.TestCase):
"__fake",
name="foo",
max_failures=0,
stop={"training_iteration": 1},
sync_to_driver=tune.function(sync_func_driver))
stop={
"training_iteration": 1
},
sync_to_driver=tune.function(sync_func_driver)).trials
test_file_path = os.path.join(trial.logdir, "test.log2")
self.assertFalse(os.path.exists(test_file_path))
@@ -1076,8 +1088,10 @@ class TestSyncFunctionality(unittest.TestCase):
"__fake",
name="foo",
max_failures=0,
stop={"training_iteration": 1},
sync_to_driver=tune.function(sync_func_driver))
stop={
"training_iteration": 1
},
sync_to_driver=tune.function(sync_func_driver)).trials
test_file_path = os.path.join(trial.logdir, "test.log2")
self.assertTrue(os.path.exists(test_file_path))
os.remove(test_file_path)
@@ -1098,7 +1112,7 @@ class TestSyncFunctionality(unittest.TestCase):
"upload_dir": "test",
"sync_to_driver": tune.function(sync_func),
"sync_to_cloud": tune.function(sync_func)
})
}).trials
self.assertEqual(mock_sync.call_count, 0)
+2 -3
View File
@@ -288,6 +288,7 @@ class Trial(object):
Trial._registration_check(trainable_name)
# Trial config
self.trainable_name = trainable_name
self.trial_id = Trial.generate_id() if trial_id is None else trial_id
self.config = config or {}
self.local_dir = local_dir # This remains unexpanded for syncing.
self.experiment_tag = experiment_tag
@@ -334,7 +335,6 @@ class Trial(object):
self.runner = None
self.result_logger = None
self.last_debug = 0
self.trial_id = Trial.generate_id() if trial_id is None else trial_id
self.error_file = None
self.num_failures = 0
self.custom_trial_name = None
@@ -516,8 +516,7 @@ class Trial(object):
or self.max_failures < 0))
def update_last_result(self, result, terminate=False):
if terminate:
result.update(done=True)
result.update(trial_id=self.trial_id, done=terminate)
if self.verbose and (terminate or time.time() - self.last_debug >
DEBUG_PRINT_INTERVAL):
print("Result for {}:".format(self))
+7 -4
View File
@@ -180,6 +180,11 @@ class TrialRunner(object):
self._checkpoint_period = checkpoint_period
self._session_str = datetime.fromtimestamp(
self._start_time).strftime("%Y-%m-%d_%H-%M-%S")
self.checkpoint_file = None
if self._local_checkpoint_dir:
self.checkpoint_file = os.path.join(
self._local_checkpoint_dir,
TrialRunner.CKPT_FILE_TMPL.format(self._session_str))
def _validate_resume(self, resume_type):
"""Checks whether to resume experiment.
@@ -260,10 +265,7 @@ class TrialRunner(object):
with open(tmp_file_name, "w") as f:
json.dump(runner_state, f, indent=2, cls=_TuneFunctionEncoder)
os.rename(
tmp_file_name,
os.path.join(self._local_checkpoint_dir,
TrialRunner.CKPT_FILE_TMPL.format(self._session_str)))
os.rename(tmp_file_name, self.checkpoint_file)
self._syncer.sync_up_if_needed()
return self._local_checkpoint_dir
@@ -277,6 +279,7 @@ class TrialRunner(object):
newest_ckpt_path = _find_newest_ckpt(self._local_checkpoint_dir)
with open(newest_ckpt_path, "r") as f:
runner_state = json.load(f, cls=_TuneFunctionDecoder)
self.checkpoint_file = newest_ckpt_path
logger.warning("".join([
"Attempting to resume experiment from {}. ".format(
+6 -4
View File
@@ -65,7 +65,7 @@ def run(run_or_experiment,
reuse_actors=False,
trial_executor=None,
raise_on_failed_trial=True,
return_trials=True,
return_trials=False,
ray_auto_init=True,
sync_function=None):
"""Executes training.
@@ -263,9 +263,10 @@ def run(run_or_experiment,
else:
logger.error("Trials did not complete: %s", errored_trials)
trials = runner.get_trials()
if return_trials:
return runner.get_trials()
return ExperimentAnalysis(experiment.checkpoint_dir)
return trials
return ExperimentAnalysis(runner.checkpoint_file, trials=trials)
def run_experiments(experiments,
@@ -319,5 +320,6 @@ def run_experiments(experiments,
queue_trials=queue_trials,
reuse_actors=reuse_actors,
trial_executor=trial_executor,
raise_on_failed_trial=raise_on_failed_trial)
raise_on_failed_trial=raise_on_failed_trial,
return_trials=True)
return trials