[tune] Ensure Cleanup (#7967)

This commit is contained in:
Richard Liaw
2020-04-11 16:28:03 -07:00
committed by GitHub
parent dd63178e91
commit 87e3c39b48
5 changed files with 85 additions and 5 deletions
+58 -5
View File
@@ -26,7 +26,8 @@ logger = logging.getLogger(__name__)
RESOURCE_REFRESH_PERIOD = 0.5 # Refresh resources every 500 ms
BOTTLENECK_WARN_PERIOD_S = 60
NONTRIVIAL_WAIT_TIME_THRESHOLD_S = 1e-3
DEFAULT_GET_TIMEOUT = 30.0 # seconds
DEFAULT_GET_TIMEOUT = 60.0 # seconds
TRIAL_CLEANUP_THRESHOLD = 100
class _LocalWrapper:
@@ -38,6 +39,55 @@ class _LocalWrapper:
return self._result
class _TrialCleanup:
"""Mechanism for ensuring trial stop futures are cleaned up.
Args:
threshold (int): Number of futures to hold at once. If the threshold
is passed, cleanup will kick in and remove futures.
"""
def __init__(self, threshold=TRIAL_CLEANUP_THRESHOLD):
self.threshold = threshold
self._cleanup_map = {}
def add(self, trial, actor):
"""Adds a trial actor to be stopped.
If the number of futures exceeds the threshold, the cleanup mechanism
will kick in.
Args:
trial (Trial): The trial corresponding to the future.
actor (ActorHandle): Handle to the trainable to be stopped.
"""
future = actor.stop.remote()
actor.__ray_terminate__.remote()
self._cleanup_map[future] = trial
if len(self._cleanup_map) > self.threshold:
self.cleanup(partial=True)
def cleanup(self, partial=True):
"""Waits for cleanup to finish.
If partial=False, all futures are expected to return. If a future
does not return within the timeout period, the cleanup terminates.
"""
logger.debug("Cleaning up futures")
num_to_keep = int(self.threshold) / 2 if partial else 0
while len(self._cleanup_map) > num_to_keep:
dones, _ = ray.wait(
list(self._cleanup_map), timeout=DEFAULT_GET_TIMEOUT)
if not dones:
logger.warning(
"Skipping cleanup - trainable.stop did not return in "
"time. Consider making `stop` a faster operation.")
else:
done = dones[0]
del self._cleanup_map[done]
class RayTrialExecutor(TrialExecutor):
"""An implementation of TrialExecutor based on Ray."""
@@ -55,6 +105,8 @@ class RayTrialExecutor(TrialExecutor):
# trial.train.remote(), thus no more new remote object id generated.
# We use self._paused to store paused trials here.
self._paused = {}
self._trial_cleanup = _TrialCleanup()
self._reuse_actors = reuse_actors
self._cached_actor = None
@@ -96,8 +148,7 @@ class RayTrialExecutor(TrialExecutor):
logger.debug("Cannot reuse cached runner {} for new trial".format(
self._cached_actor))
with self._change_working_directory(trial):
self._cached_actor.stop.remote()
self._cached_actor.__ray_terminate__.remote()
self._trial_cleanup.add(trial, actor=self._cached_actor)
self._cached_actor = None
cls = ray.remote(
@@ -220,8 +271,7 @@ class RayTrialExecutor(TrialExecutor):
else:
logger.debug("Trial %s: Destroying actor.", trial)
with self._change_working_directory(trial):
trial.runner.stop.remote()
trial.runner.__ray_terminate__.remote()
self._trial_cleanup.add(trial, actor=trial.runner)
except Exception:
logger.exception("Trial %s: Error stopping runner.", trial)
self.set_status(trial, Trial.ERROR)
@@ -651,6 +701,9 @@ class RayTrialExecutor(TrialExecutor):
self._update_avail_resources()
return self._avail_resources.gpu > 0
def cleanup(self):
self._trial_cleanup.cleanup(partial=False)
@contextmanager
def _change_working_directory(self, trial):
"""Context manager changing working directory to trial logdir.
+19
View File
@@ -614,6 +614,25 @@ class TrainableFunctionApiTest(unittest.TestCase):
self.assertEqual(trial.last_result.get("name"), str(trial))
self.assertEqual(trial.last_result.get("trial_id"), trial.trial_id)
@patch("ray.tune.ray_trial_executor.TRIAL_CLEANUP_THRESHOLD", 3)
def testLotsOfStops(self):
class TestTrainable(Trainable):
def _train(self):
result = {"name": self.trial_name, "trial_id": self.trial_id}
return result
def _stop(self):
time.sleep(2)
open(os.path.join(self.logdir, "marker"), "a").close()
return 1
analysis = tune.run(
TestTrainable, num_samples=10, stop={TRAINING_ITERATION: 1})
ray.shutdown()
for trial in analysis.trials:
path = os.path.join(trial.logdir, "marker")
assert os.path.exists(path)
def testNestedResults(self):
def create_result(i):
return {"test": {"1": {"2": {"3": i, "4": False}}}}
+4
View File
@@ -259,3 +259,7 @@ class TrialExecutor:
def has_gpus(self):
"""Returns True if GPUs are detected on the cluster."""
return None
def cleanup(self, trial):
"""Ensures that trials are cleaned up after stopping."""
pass
+3
View File
@@ -740,6 +740,9 @@ class TrialRunner:
self.trial_executor.stop_trial(trial, error=error, error_msg=error_msg)
def cleanup_trials(self):
self.trial_executor.cleanup()
def __getstate__(self):
"""Gets state for trial.
+1
View File
@@ -331,6 +331,7 @@ def run(run_or_experiment,
_report_progress(runner, progress_reporter, done=True)
wait_for_sync()
runner.cleanup_trials()
incomplete_trials = []
for trial in runner.get_trials():