[tune] Avoid scheduler blocking, add reuse_actors optimization (#4218)

This commit is contained in:
Eric Liang
2019-03-12 23:49:31 -07:00
committed by GitHub
parent 2202a81773
commit d5f4698305
11 changed files with 244 additions and 42 deletions
+5
View File
@@ -6,3 +6,8 @@ from __future__ import print_function
class TuneError(Exception):
"""General error class raised by ray.tune."""
pass
class AbortTrialExecution(TuneError):
"""Error that indicates a trial should not be retried."""
pass
+1
View File
@@ -96,4 +96,5 @@ if __name__ == "__main__":
}
},
scheduler=pbt,
reuse_actors=True,
verbose=False)
+62 -21
View File
@@ -10,10 +10,11 @@ import time
import traceback
import ray
from ray.tune.error import TuneError
from ray.tune.error import TuneError, AbortTrialExecution
from ray.tune.logger import NoopLogger
from ray.tune.trial import Trial, Resources, Checkpoint
from ray.tune.trial_executor import TrialExecutor
from ray.tune.util import warn_if_slow
logger = logging.getLogger(__name__)
@@ -30,7 +31,7 @@ class _LocalWrapper(object):
class RayTrialExecutor(TrialExecutor):
"""An implemention of TrialExecutor based on Ray."""
def __init__(self, queue_trials=False):
def __init__(self, queue_trials=False, reuse_actors=False):
super(RayTrialExecutor, self).__init__(queue_trials)
self._running = {}
# Since trial resume after paused should not run
@@ -40,21 +41,46 @@ class RayTrialExecutor(TrialExecutor):
self._avail_resources = Resources(cpu=0, gpu=0)
self._committed_resources = Resources(cpu=0, gpu=0)
self._resources_initialized = False
self._reuse_actors = reuse_actors
self._cached_actor = None
if ray.is_initialized():
self._update_avail_resources()
def _setup_runner(self, trial):
cls = ray.remote(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu,
resources=trial.resources.custom_resources)(
trial._get_trainable_cls())
def _setup_runner(self, trial, reuse_allowed):
if (self._reuse_actors and reuse_allowed
and self._cached_actor is not None):
logger.debug("Reusing cached runner {} for {}".format(
self._cached_actor, trial.trial_id))
existing_runner = self._cached_actor
self._cached_actor = None
else:
if self._cached_actor:
logger.debug(
"Cannot reuse cached runner {} for new trial".format(
self._cached_actor))
self._cached_actor.stop.remote()
self._cached_actor.__ray_terminate__.remote()
self._cached_actor = None
existing_runner = None
cls = ray.remote(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu,
resources=trial.resources.custom_resources)(
trial._get_trainable_cls())
trial.init_logger()
# We checkpoint metadata here to try mitigating logdir duplication
self.try_checkpoint_metadata(trial)
remote_logdir = trial.logdir
if existing_runner:
trial.runner = existing_runner
if not self.reset_trial(trial, trial.config, trial.experiment_tag):
raise AbortTrialExecution(
"Trial runner reuse requires reset_trial() to be "
"implemented and return True.")
return existing_runner
def logger_creator(config):
# Set the working dir in the remote process, for user file writes
if not os.path.exists(remote_logdir):
@@ -86,7 +112,10 @@ class RayTrialExecutor(TrialExecutor):
"""
prior_status = trial.status
self.set_status(trial, Trial.RUNNING)
trial.runner = self._setup_runner(trial)
trial.runner = self._setup_runner(
trial,
reuse_allowed=checkpoint is not None
or trial._checkpoint.value is not None)
if not self.restore(trial, checkpoint):
if trial.status == Trial.ERROR:
raise RuntimeError(
@@ -126,12 +155,18 @@ class RayTrialExecutor(TrialExecutor):
try:
trial.write_error_log(error_msg)
if hasattr(trial, 'runner') and trial.runner:
stop_tasks = []
stop_tasks.append(trial.runner.stop.remote())
stop_tasks.append(trial.runner.__ray_terminate__.remote())
# TODO(ekl) seems like wait hangs when killing actors
_, unfinished = ray.wait(
stop_tasks, num_returns=2, timeout=0.25)
if (not error and self._reuse_actors
and self._cached_actor is None):
logger.debug("Reusing actor for {}".format(trial.runner))
self._cached_actor = trial.runner
else:
logger.info(
"Destroying actor for trial {}. If your trainable is "
"slow to initialize, consider setting "
"reuse_actors=True to reduce actor creation "
"overheads.".format(trial))
trial.runner.stop.remote()
trial.runner.__ray_terminate__.remote()
except Exception:
logger.exception("Error stopping runner for Trial %s", str(trial))
self.set_status(trial, Trial.ERROR)
@@ -152,11 +187,13 @@ class RayTrialExecutor(TrialExecutor):
self._commit_resources(trial.resources)
try:
self._start_trial(trial, checkpoint)
except Exception:
except Exception as e:
logger.exception("Error starting runner for Trial %s", str(trial))
error_msg = traceback.format_exc()
time.sleep(2)
self._stop_trial(trial, error=True, error_msg=error_msg)
if isinstance(e, AbortTrialExecution):
return # don't retry fatal Tune errors
try:
# This forces the trial to not start from checkpoint.
trial.clear_checkpoint()
@@ -222,7 +259,8 @@ class RayTrialExecutor(TrialExecutor):
trial.experiment_tag = new_experiment_tag
trial.config = new_config
trainable = trial.runner
reset_val = ray.get(trainable.reset_config.remote(new_config))
with warn_if_slow("reset_config"):
reset_val = ray.get(trainable.reset_config.remote(new_config))
return reset_val
def get_running_trials(self):
@@ -249,7 +287,8 @@ class RayTrialExecutor(TrialExecutor):
if not trial_future:
raise ValueError("Trial was not running.")
self._running.pop(trial_future[0])
result = ray.get(trial_future[0])
with warn_if_slow("fetch_result"):
result = ray.get(trial_future[0])
# For local mode
if isinstance(result, _LocalWrapper):
@@ -400,7 +439,8 @@ class RayTrialExecutor(TrialExecutor):
if storage == Checkpoint.MEMORY:
trial._checkpoint.value = trial.runner.save_to_object.remote()
else:
trial._checkpoint.value = ray.get(trial.runner.save.remote())
with warn_if_slow("save_to_disk"):
trial._checkpoint.value = ray.get(trial.runner.save.remote())
return trial._checkpoint.value
def restore(self, trial, checkpoint=None):
@@ -421,11 +461,12 @@ class RayTrialExecutor(TrialExecutor):
value = checkpoint.value
if checkpoint.storage == Checkpoint.MEMORY:
assert type(value) != Checkpoint, type(value)
ray.get(trial.runner.restore_from_object.remote(value))
trial.runner.restore_from_object.remote(value)
else:
worker_ip = ray.get(trial.runner.current_ip.remote())
trial.sync_logger_to_new_location(worker_ip)
ray.get(trial.runner.restore.remote(value))
with warn_if_slow("restore_from_disk"):
ray.get(trial.runner.restore.remote(value))
trial.last_result = checkpoint.last_result
return True
except Exception:
+10 -9
View File
@@ -218,23 +218,24 @@ class PopulationBasedTraining(FIFOScheduler):
trial_state = self._trial_state[trial]
new_state = self._trial_state[trial_to_clone]
if not new_state.last_checkpoint:
logger.warning("[pbt]: no checkpoint for trial."
" Skip exploit for Trial {}".format(trial))
logger.info("[pbt]: no checkpoint for trial."
" Skip exploit for Trial {}".format(trial))
return
new_config = explore(trial_to_clone.config, self._hyperparam_mutations,
self._resample_probability,
self._custom_explore_fn)
logger.warning("[exploit] transferring weights from trial "
"{} (score {}) -> {} (score {})".format(
trial_to_clone, new_state.last_score, trial,
trial_state.last_score))
# TODO(ekl) restarting the trial is expensive. We should implement a
# lighter way reset() method that can alter the trial config.
logger.info("[exploit] transferring weights from trial "
"{} (score {}) -> {} (score {})".format(
trial_to_clone, new_state.last_score, trial,
trial_state.last_score))
new_tag = make_experiment_tag(trial_state.orig_tag, new_config,
self._hyperparam_mutations)
reset_successful = trial_executor.reset_trial(trial, new_config,
new_tag)
if not reset_successful:
if reset_successful:
trial_executor.restore(
trial, Checkpoint.from_object(new_state.last_checkpoint))
else:
trial_executor.stop_trial(trial, stop_logger=False)
trial.config = new_config
trial.experiment_tag = new_tag
+96
View File
@@ -0,0 +1,96 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
import ray
from ray.tune import Trainable, run_experiments
from ray.tune.error import TuneError
from ray.tune.schedulers.trial_scheduler import FIFOScheduler, TrialScheduler
class FrequentPausesScheduler(FIFOScheduler):
def on_trial_result(self, trial_runner, trial, result):
return TrialScheduler.PAUSE
class MyResettableClass(Trainable):
def _setup(self, config):
self.config = config
self.num_resets = 0
self.iter = 0
def _train(self):
self.iter += 1
return {"num_resets": self.num_resets, "done": self.iter > 1}
def _save(self, chkpt_dir):
return {"iter": self.iter}
def _restore(self, item):
self.iter = item["iter"]
def reset_config(self, new_config):
if "fake_reset_not_supported" in self.config:
return False
self.num_resets += 1
return True
class ActorReuseTest(unittest.TestCase):
def setUp(self):
ray.init(num_cpus=1, num_gpus=0)
def tearDown(self):
ray.shutdown()
def testTrialReuseDisabled(self):
trials = run_experiments(
{
"foo": {
"run": MyResettableClass,
"num_samples": 4,
"config": {},
}
},
reuse_actors=False,
scheduler=FrequentPausesScheduler())
self.assertEqual([t.last_result["num_resets"] for t in trials],
[0, 0, 0, 0])
def testTrialReuseEnabled(self):
trials = run_experiments(
{
"foo": {
"run": MyResettableClass,
"num_samples": 4,
"config": {},
}
},
reuse_actors=True,
scheduler=FrequentPausesScheduler())
self.assertEqual([t.last_result["num_resets"] for t in trials],
[1, 2, 3, 4])
def testTrialReuseEnabledError(self):
def run():
run_experiments(
{
"foo": {
"run": MyResettableClass,
"max_failures": 1,
"num_samples": 4,
"config": {
"fake_reset_not_supported": True
},
}
},
reuse_actors=True,
scheduler=FrequentPausesScheduler())
self.assertRaises(TuneError, lambda: run())
if __name__ == "__main__":
unittest.main(verbosity=2)
+8 -1
View File
@@ -310,6 +310,9 @@ class Trainable(object):
self._restore(checkpoint_dict)
else:
self._restore(checkpoint_path)
self._time_since_restore = 0.0
self._timesteps_since_restore = 0
self._iterations_since_restore = 0
self._restored = True
def restore_from_object(self, obj):
@@ -350,12 +353,16 @@ class Trainable(object):
def reset_config(self, new_config):
"""Resets configuration without restarting the trial.
This method is optional, but can be implemented to speed up algorithms
such as PBT, and to allow performance optimizations such as running
experiments with reuse_actors=True.
Args:
new_config (dir): Updated hyperparameter configuration
for the trainable.
Returns:
True if configuration reset successfully else False.
True if reset was successful else False.
"""
return False
+26 -10
View File
@@ -16,6 +16,7 @@ from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.result import TIME_THIS_ITER_S
from ray.tune.trial import Trial, Checkpoint
from ray.tune.schedulers import FIFOScheduler, TrialScheduler
from ray.tune.util import warn_if_slow
from ray.tune.web_server import TuneServer
MAX_DEBUG_TRIALS = 20
@@ -70,6 +71,7 @@ class TrialRunner(object):
server_port=TuneServer.DEFAULT_PORT,
verbose=True,
queue_trials=False,
reuse_actors=False,
trial_executor=None):
"""Initializes a new TrialRunner.
@@ -87,12 +89,16 @@ class TrialRunner(object):
not currently have enough resources to launch one. This should
be set to True when running on an autoscaling cluster to enable
automatic scale-up.
reuse_actors (bool): Whether to reuse actors between different
trials when possible. This can drastically speed up experiments
that start and stop actors often (e.g., PBT in
time-multiplexing mode).
trial_executor (TrialExecutor): Defaults to RayTrialExecutor.
"""
self._search_alg = search_alg
self._scheduler_alg = scheduler or FIFOScheduler()
self.trial_executor = trial_executor or \
RayTrialExecutor(queue_trials=queue_trials)
self.trial_executor = (trial_executor or RayTrialExecutor(
queue_trials=queue_trials, reuse_actors=reuse_actors))
# For debugging, it may be useful to halt trials after some time has
# elapsed. TODO(ekl) consider exposing this in the API.
@@ -226,7 +232,8 @@ class TrialRunner(object):
self.trial_executor.on_step_begin()
next_trial = self._get_next_trial()
if next_trial is not None:
self.trial_executor.start_trial(next_trial)
with warn_if_slow("start_trial"):
self.trial_executor.start_trial(next_trial)
elif self.trial_executor.get_running_trials():
self._process_events()
else:
@@ -284,7 +291,8 @@ class TrialRunner(object):
"""
trial.set_verbose(self._verbose)
self._trials.append(trial)
self._scheduler_alg.on_trial_add(self, trial)
with warn_if_slow("scheduler.on_trial_add"):
self._scheduler_alg.on_trial_add(self, trial)
self.trial_executor.try_checkpoint_metadata(trial)
def debug_string(self, max_debug=MAX_DEBUG_TRIALS):
@@ -388,6 +396,10 @@ class TrialRunner(object):
def _process_events(self):
trial = self.trial_executor.get_next_available_trial()
with warn_if_slow("process_trial"):
self._process_trial(trial)
def _process_trial(self, trial):
try:
result = self.trial_executor.fetch_result(trial)
self._total_time += result[TIME_THIS_ITER_S]
@@ -400,12 +412,15 @@ class TrialRunner(object):
decision = TrialScheduler.STOP
else:
decision = self._scheduler_alg.on_trial_result(
self, trial, result)
self._search_alg.on_trial_result(trial.trial_id, result)
with warn_if_slow("scheduler.on_trial_result"):
decision = self._scheduler_alg.on_trial_result(
self, trial, result)
with warn_if_slow("search_alg.on_trial_result"):
self._search_alg.on_trial_result(trial.trial_id, result)
if decision == TrialScheduler.STOP:
self._search_alg.on_trial_complete(
trial.trial_id, early_terminated=True)
with warn_if_slow("search_alg.on_trial_complete"):
self._search_alg.on_trial_complete(
trial.trial_id, early_terminated=True)
trial.update_last_result(
result, terminate=(decision == TrialScheduler.STOP))
@@ -484,7 +499,8 @@ class TrialRunner(object):
"""
self._scheduler_alg.on_trial_error(self, trial)
self.trial_executor.set_status(trial, Trial.PENDING)
self._scheduler_alg.on_trial_add(self, trial)
with warn_if_slow("scheduler.on_trial_add"):
self._scheduler_alg.on_trial_add(self, trial)
def _update_trial_queue(self, blocking=False, timeout=600):
"""Adds next trials to queue if possible.
+6
View File
@@ -60,6 +60,7 @@ def run_experiments(experiments,
verbose=2,
resume=False,
queue_trials=False,
reuse_actors=False,
trial_executor=None,
raise_on_failed_trial=True):
"""Runs and blocks until all trials finish.
@@ -84,6 +85,10 @@ def run_experiments(experiments,
not currently have enough resources to launch one. This should
be set to True when running on an autoscaling cluster to enable
automatic scale-up.
reuse_actors (bool): Whether to reuse actors between different trials
when possible. This can drastically speed up experiments that start
and stop actors often (e.g., PBT in time-multiplexing mode). This
requires trials to have the same resource requirements.
trial_executor (TrialExecutor): Manage the execution of trials.
raise_on_failed_trial (bool): Raise TuneError if there exists failed
trial (of ERROR state) when the experiments complete.
@@ -161,6 +166,7 @@ def run_experiments(experiments,
server_port=server_port,
verbose=bool(verbose > 1),
queue_trials=queue_trials,
reuse_actors=reuse_actors,
trial_executor=trial_executor)
if verbose:
+26
View File
@@ -2,12 +2,16 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import base64
import copy
import numpy as np
import time
import ray
logger = logging.getLogger(__name__)
_pinned_objects = []
PINNED_OBJECT_PREFIX = "ray.tune.PinnedObject:"
@@ -36,6 +40,28 @@ def get_pinned_object(pinned_id):
ObjectID(base64.b64decode(pinned_id[len(PINNED_OBJECT_PREFIX):]))))
class warn_if_slow(object):
"""Prints a warning if a given operation is slower than 100ms.
Example:
>>> with warn_if_slow("some_operation"):
... ray.get(something)
"""
def __init__(self, name):
self.name = name
def __enter__(self):
self.start = time.time()
def __exit__(self, type, value, traceback):
now = time.time()
if now - self.start > 0.1:
logger.warning("The `{}` operation took {} seconds to complete, ".
format(self.name, now - self.start) +
"which may be a performance bottleneck.")
def merge_dicts(d1, d2):
"""Returns a new dict that is d1 and d2 deep merged."""
merged = copy.deepcopy(d1)