diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py
index 8a25379db..d25dc0c8e 100644
--- a/python/ray/tune/ray_trial_executor.py
+++ b/python/ray/tune/ray_trial_executor.py
@@ -26,7 +26,8 @@ logger = logging.getLogger(__name__)
 RESOURCE_REFRESH_PERIOD = 0.5  # Refresh resources every 500 ms
 BOTTLENECK_WARN_PERIOD_S = 60
 NONTRIVIAL_WAIT_TIME_THRESHOLD_S = 1e-3
-DEFAULT_GET_TIMEOUT = 30.0  # seconds
+DEFAULT_GET_TIMEOUT = 60.0  # seconds
+TRIAL_CLEANUP_THRESHOLD = 100
 
 
 class _LocalWrapper:
@@ -38,6 +39,63 @@ class _LocalWrapper:
         return self._result
 
 
+class _TrialCleanup:
+    """Mechanism for ensuring trial stop futures are cleaned up.
+
+    Args:
+        threshold (int): Number of futures to hold at once. If the threshold
+            is passed, cleanup will kick in and remove futures. Defaults to
+            TRIAL_CLEANUP_THRESHOLD, looked up at construction time.
+    """
+
+    def __init__(self, threshold=None):
+        # Resolve the default here rather than in the signature so that
+        # overriding the module-level constant affects new instances.
+        if threshold is None:
+            threshold = TRIAL_CLEANUP_THRESHOLD
+        self.threshold = threshold
+        self._cleanup_map = {}
+
+    def add(self, trial, actor):
+        """Adds a trial actor to be stopped.
+
+        If the number of futures exceeds the threshold, the cleanup mechanism
+        will kick in.
+
+        Args:
+            trial (Trial): The trial corresponding to the future.
+            actor (ActorHandle): Handle to the trainable to be stopped.
+        """
+        future = actor.stop.remote()
+        actor.__ray_terminate__.remote()
+
+        self._cleanup_map[future] = trial
+        if len(self._cleanup_map) > self.threshold:
+            self.cleanup(partial=True)
+
+    def cleanup(self, partial=True):
+        """Waits for cleanup to finish.
+
+        If partial=True, waits only until at most half the threshold of
+        futures remain. If partial=False, all futures are expected to
+        return. If no future returns within the timeout period, the cleanup
+        terminates and the remaining futures are left pending.
+        """
+        logger.debug("Cleaning up futures")
+        num_to_keep = int(self.threshold) // 2 if partial else 0
+        while len(self._cleanup_map) > num_to_keep:
+            dones, _ = ray.wait(
+                list(self._cleanup_map), timeout=DEFAULT_GET_TIMEOUT)
+            if not dones:
+                logger.warning(
+                    "Skipping cleanup - trainable.stop did not return in "
+                    "time. Consider making `stop` a faster operation.")
+                # Give up instead of re-waiting on the same futures forever.
+                break
+            for done in dones:
+                del self._cleanup_map[done]
+
+
 class RayTrialExecutor(TrialExecutor):
     """An implementation of TrialExecutor based on Ray."""
 
@@ -55,6 +113,8 @@ class RayTrialExecutor(TrialExecutor):
         # trial.train.remote(), thus no more new remote object id generated.
         # We use self._paused to store paused trials here.
         self._paused = {}
+
+        self._trial_cleanup = _TrialCleanup()
         self._reuse_actors = reuse_actors
         self._cached_actor = None
 
@@ -96,8 +156,7 @@ class RayTrialExecutor(TrialExecutor):
             logger.debug("Cannot reuse cached runner {} for new trial".format(
                 self._cached_actor))
             with self._change_working_directory(trial):
-                self._cached_actor.stop.remote()
-                self._cached_actor.__ray_terminate__.remote()
+                self._trial_cleanup.add(trial, actor=self._cached_actor)
             self._cached_actor = None
 
         cls = ray.remote(
@@ -220,8 +279,7 @@ class RayTrialExecutor(TrialExecutor):
                 else:
                     logger.debug("Trial %s: Destroying actor.", trial)
                     with self._change_working_directory(trial):
-                        trial.runner.stop.remote()
-                        trial.runner.__ray_terminate__.remote()
+                        self._trial_cleanup.add(trial, actor=trial.runner)
         except Exception:
             logger.exception("Trial %s: Error stopping runner.", trial)
             self.set_status(trial, Trial.ERROR)
@@ -651,6 +709,10 @@ class RayTrialExecutor(TrialExecutor):
             self._update_avail_resources()
         return self._avail_resources.gpu > 0
 
+    def cleanup(self):
+        """Waits for pending trial stop futures, giving up on a timeout."""
+        self._trial_cleanup.cleanup(partial=False)
+
     @contextmanager
     def _change_working_directory(self, trial):
         """Context manager changing working directory to trial logdir.
diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py
index 0866da638..3361fbfe5 100644
--- a/python/ray/tune/tests/test_api.py
+++ b/python/ray/tune/tests/test_api.py
@@ -614,6 +614,25 @@ class TrainableFunctionApiTest(unittest.TestCase):
             self.assertEqual(trial.last_result.get("name"), str(trial))
             self.assertEqual(trial.last_result.get("trial_id"), trial.trial_id)
 
+    @patch("ray.tune.ray_trial_executor.TRIAL_CLEANUP_THRESHOLD", 3)
+    def testLotsOfStops(self):
+        class TestTrainable(Trainable):
+            def _train(self):
+                result = {"name": self.trial_name, "trial_id": self.trial_id}
+                return result
+
+            def _stop(self):
+                time.sleep(2)
+                open(os.path.join(self.logdir, "marker"), "a").close()
+                return 1
+
+        analysis = tune.run(
+            TestTrainable, num_samples=10, stop={TRAINING_ITERATION: 1})
+        ray.shutdown()
+        for trial in analysis.trials:
+            path = os.path.join(trial.logdir, "marker")
+            self.assertTrue(os.path.exists(path))
+
     def testNestedResults(self):
         def create_result(i):
             return {"test": {"1": {"2": {"3": i, "4": False}}}}
diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py
index 37e656567..f6ebacacc 100644
--- a/python/ray/tune/trial_executor.py
+++ b/python/ray/tune/trial_executor.py
@@ -259,3 +259,7 @@ class TrialExecutor:
     def has_gpus(self):
         """Returns True if GPUs are detected on the cluster."""
         return None
+
+    def cleanup(self):
+        """Ensures that trials are cleaned up after stopping."""
+        pass
diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py
index 7a1c950a5..77fae623a 100644
--- a/python/ray/tune/trial_runner.py
+++ b/python/ray/tune/trial_runner.py
@@ -740,6 +740,10 @@ class TrialRunner:
 
         self.trial_executor.stop_trial(trial, error=error, error_msg=error_msg)
 
+    def cleanup_trials(self):
+        """Asks the trial executor to finish cleaning up stopped trials."""
+        self.trial_executor.cleanup()
+
     def __getstate__(self):
         """Gets state for trial.
 
diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py
index 4cf1f9d03..3ed216dbe 100644
--- a/python/ray/tune/tune.py
+++ b/python/ray/tune/tune.py
@@ -331,6 +331,7 @@ def run(run_or_experiment,
         _report_progress(runner, progress_reporter, done=True)
 
     wait_for_sync()
+    runner.cleanup_trials()
 
     incomplete_trials = []
     for trial in runner.get_trials():