diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 548e092cf..81f02661f 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -39,6 +39,7 @@ class RayTrialExecutor(TrialExecutor): def __init__(self, queue_trials=False, reuse_actors=False, + ray_auto_init=False, refresh_period=RESOURCE_REFRESH_PERIOD): super(RayTrialExecutor, self).__init__(queue_trials) self._running = {} @@ -55,6 +56,12 @@ class RayTrialExecutor(TrialExecutor): self._refresh_period = refresh_period self._last_resource_refresh = float("-inf") self._last_nontrivial_wait = time.time() + if not ray.is_initialized() and ray_auto_init: + logger.info("Initializing Ray automatically. " + "For cluster usage or custom Ray initialization, " + "call `ray.init(...)` before `tune.run`.") + ray.init() + if ray.is_initialized(): self._update_avail_resources() diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index a9bf8e323..37022ceab 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -1218,7 +1218,7 @@ class TrialRunnerTest(unittest.TestCase): def testExtraResources(self): ray.init(num_cpus=4, num_gpus=2) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 1 @@ -1239,7 +1239,7 @@ class TrialRunnerTest(unittest.TestCase): def testCustomResources(self): ray.init(num_cpus=4, num_gpus=2, resources={"a": 2}) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 1 @@ -1260,7 +1260,7 @@ class TrialRunnerTest(unittest.TestCase): def testExtraCustomResources(self): ray.init(num_cpus=4, num_gpus=2, resources={"a": 2}) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 1 @@ -1283,7 +1283,7 @@ class
TrialRunnerTest(unittest.TestCase): def testCustomResources2(self): ray.init(num_cpus=4, num_gpus=2, resources={"a": 2}) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() resource1 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 2}) self.assertTrue(runner.has_resources(resource1)) resource2 = Resources(cpu=1, gpu=0, custom_resources={"a": 2}) @@ -1295,7 +1295,7 @@ class TrialRunnerTest(unittest.TestCase): def testFractionalGpus(self): ray.init(num_cpus=4, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "resources": Resources(cpu=1, gpu=0.5), } @@ -1318,7 +1318,7 @@ class TrialRunnerTest(unittest.TestCase): def testResourceScheduler(self): ray.init(num_cpus=4, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 1 @@ -1347,7 +1347,7 @@ class TrialRunnerTest(unittest.TestCase): def testMultiStepRun(self): ray.init(num_cpus=4, num_gpus=2) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 5 @@ -1377,7 +1377,7 @@ class TrialRunnerTest(unittest.TestCase): def testMultiStepRun2(self): """Checks that runner.step throws when overstepping.""" ray.init(num_cpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 2 @@ -1411,8 +1411,7 @@ class TrialRunnerTest(unittest.TestCase): executor.start_trial(trial) return TrialScheduler.CONTINUE - runner = TrialRunner( - BasicVariantGenerator(), scheduler=ChangingScheduler()) + runner = TrialRunner(scheduler=ChangingScheduler()) kwargs = { "stopping_criterion": { "training_iteration": 2 @@ -1434,7 +1433,7 @@ class TrialRunnerTest(unittest.TestCase): def testErrorHandling(self): ray.init(num_cpus=4, num_gpus=2) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": 
{ "training_iteration": 1 @@ -1456,7 +1455,7 @@ class TrialRunnerTest(unittest.TestCase): def testThrowOnOverstep(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() runner.step() self.assertRaises(TuneError, runner.step) @@ -1550,7 +1549,7 @@ class TrialRunnerTest(unittest.TestCase): def testFailureRecoveryMaxFailures(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "resources": Resources(cpu=1, gpu=1), "checkpoint_freq": 1, @@ -1579,7 +1578,7 @@ class TrialRunnerTest(unittest.TestCase): def testCheckpointing(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 1 @@ -1610,7 +1609,7 @@ class TrialRunnerTest(unittest.TestCase): def testRestoreMetricsAfterCheckpointing(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "resources": Resources(cpu=1, gpu=1), } @@ -1642,7 +1641,7 @@ class TrialRunnerTest(unittest.TestCase): def testCheckpointingAtEnd(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 2 @@ -1663,7 +1662,7 @@ class TrialRunnerTest(unittest.TestCase): def testResultDone(self): """Tests that last_result is marked `done` after trial is complete.""" ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 2 @@ -1682,7 +1681,7 @@ class TrialRunnerTest(unittest.TestCase): def testPauseThenResume(self): ray.init(num_cpus=1, num_gpus=1) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 2 @@ -1713,7 +1712,7 @@ class 
TrialRunnerTest(unittest.TestCase): def testStepHook(self): ray.init(num_cpus=4, num_gpus=2) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() def on_step_begin(self): self._update_avail_resources() @@ -1743,7 +1742,7 @@ class TrialRunnerTest(unittest.TestCase): def testStopTrial(self): ray.init(num_cpus=4, num_gpus=2) - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() kwargs = { "stopping_criterion": { "training_iteration": 5 @@ -1953,8 +1952,7 @@ class TrialRunnerTest(unittest.TestCase): ray.init(num_cpus=3) tmpdir = tempfile.mkdtemp() - runner = TrialRunner( - BasicVariantGenerator(), metadata_checkpoint_dir=tmpdir) + runner = TrialRunner(metadata_checkpoint_dir=tmpdir) trials = [ Trial( "__fake", @@ -2013,8 +2011,7 @@ class TrialRunnerTest(unittest.TestCase): ray.init(num_cpus=3) tmpdir = tempfile.mkdtemp() - runner = TrialRunner( - BasicVariantGenerator(), metadata_checkpoint_dir=tmpdir) + runner = TrialRunner(metadata_checkpoint_dir=tmpdir) runner.add_trial( Trial( @@ -2069,8 +2066,7 @@ class TrialRunnerTest(unittest.TestCase): }, checkpoint_freq=1) tmpdir = tempfile.mkdtemp() - runner = TrialRunner( - BasicVariantGenerator(), metadata_checkpoint_dir=tmpdir) + runner = TrialRunner(metadata_checkpoint_dir=tmpdir) runner.add_trial(trial) for i in range(5): runner.step() @@ -2091,8 +2087,7 @@ class TrialRunnerTest(unittest.TestCase): ray.init() trial = Trial("__fake", checkpoint_freq=1) tmpdir = tempfile.mkdtemp() - runner = TrialRunner( - BasicVariantGenerator(), metadata_checkpoint_dir=tmpdir) + runner = TrialRunner(metadata_checkpoint_dir=tmpdir) runner.add_trial(trial) for i in range(5): runner.step() diff --git a/python/ray/tune/tests/test_tune_restore.py b/python/ray/tune/tests/test_tune_restore.py index 3742cf598..768e9d72b 100644 --- a/python/ray/tune/tests/test_tune_restore.py +++ b/python/ray/tune/tests/test_tune_restore.py @@ -53,5 +53,20 @@ class TuneRestoreTest(unittest.TestCase): ) +class 
AutoInitTest(unittest.TestCase): + def testTuneRestore(self): + self.assertFalse(ray.is_initialized()) + tune.run( + "__fake", + name="TestAutoInit", + stop={"training_iteration": 1}, + ray_auto_init=True) + self.assertTrue(ray.is_initialized()) + + def tearDown(self): + ray.shutdown() + _register_all() + + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/python/ray/tune/tests/test_tune_server.py b/python/ray/tune/tests/test_tune_server.py index 7df7a698b..dd9a9134c 100644 --- a/python/ray/tune/tests/test_tune_server.py +++ b/python/ray/tune/tests/test_tune_server.py @@ -12,7 +12,6 @@ from ray import tune from ray.rllib import _register_all from ray.tune.trial import Trial, Resources from ray.tune.web_server import TuneClient -from ray.tune.suggest import BasicVariantGenerator from ray.tune.trial_runner import TrialRunner @@ -34,8 +33,7 @@ class TuneServerSuite(unittest.TestCase): def basicSetup(self): ray.init(num_cpus=4, num_gpus=1) port = get_valid_port() - self.runner = TrialRunner( - BasicVariantGenerator(), launch_web_server=True, server_port=port) + self.runner = TrialRunner(launch_web_server=True, server_port=port) runner = self.runner kwargs = { "stopping_criterion": { diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 8ffcb6e31..dfd809732 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -18,6 +18,7 @@ from ray.tune.result import TIME_THIS_ITER_S, RESULT_DUPLICATE from ray.tune.trial import Trial, Checkpoint from ray.tune.sample import function from ray.tune.schedulers import FIFOScheduler, TrialScheduler +from ray.tune.suggest import BasicVariantGenerator from ray.tune.util import warn_if_slow from ray.utils import binary_to_hex, hex_to_binary from ray.tune.web_server import TuneServer @@ -77,7 +78,7 @@ class TrialRunner(object): """A TrialRunner implements the event loop for scheduling trials on Ray. 
Example: - runner = TrialRunner(BasicVariantGenerator()) + runner = TrialRunner() runner.add_trial(Trial(...)) runner.add_trial(Trial(...)) while not runner.is_finished(): @@ -98,14 +99,12 @@ class TrialRunner(object): CKPT_FILE_TMPL = "experiment_state-{}.json" def __init__(self, - search_alg, + search_alg=None, scheduler=None, launch_web_server=False, metadata_checkpoint_dir=None, server_port=TuneServer.DEFAULT_PORT, verbose=True, - queue_trials=False, - reuse_actors=False, trial_executor=None): """Initializes a new TrialRunner. @@ -119,20 +118,15 @@ class TrialRunner(object): server_port (int): Port number for launching TuneServer verbose (bool): Flag for verbosity. If False, trial results will not be output. - queue_trials (bool): Whether to queue trials when the cluster does - not currently have enough resources to launch one. This should - be set to True when running on an autoscaling cluster to enable - automatic scale-up. reuse_actors (bool): Whether to reuse actors between different trials when possible. This can drastically speed up experiments that start and stop actors often (e.g., PBT in time-multiplexing mode). trial_executor (TrialExecutor): Defaults to RayTrialExecutor. """ - self._search_alg = search_alg + self._search_alg = search_alg or BasicVariantGenerator() self._scheduler_alg = scheduler or FIFOScheduler() - self.trial_executor = (trial_executor or RayTrialExecutor( - queue_trials=queue_trials, reuse_actors=reuse_actors)) + self.trial_executor = trial_executor or RayTrialExecutor() # For debugging, it may be useful to halt trials after some time has # elapsed. TODO(ekl) consider exposing this in the API. @@ -141,7 +135,6 @@ class TrialRunner(object): self._total_time = 0 self._iteration = 0 self._verbose = verbose - self._queue_trials = queue_trials self._server = None self._server_port = server_port @@ -229,11 +222,8 @@ class TrialRunner(object): "This will ignore any new changes to the specification." 
])) - from ray.tune.suggest import BasicVariantGenerator runner = TrialRunner( - search_alg or BasicVariantGenerator(), - scheduler=scheduler, - trial_executor=trial_executor) + search_alg, scheduler=scheduler, trial_executor=trial_executor) runner.__setstate__(runner_state["runner_data"]) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 51f5dcdf2..0d84b6651 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -11,6 +11,7 @@ from ray.tune.error import TuneError from ray.tune.experiment import convert_to_experiment_list, Experiment from ray.tune.suggest import BasicVariantGenerator from ray.tune.trial import Trial, DEBUG_PRINT_INTERVAL +from ray.tune.ray_trial_executor import RayTrialExecutor from ray.tune.log_sync import wait_for_log_sync from ray.tune.trial_runner import TrialRunner from ray.tune.schedulers import (HyperBandScheduler, AsyncHyperBandScheduler, @@ -90,7 +91,8 @@ def run(run_or_experiment, queue_trials=False, reuse_actors=False, trial_executor=None, - raise_on_failed_trial=True): + raise_on_failed_trial=True, + ray_auto_init=True): """Executes training. Args: @@ -166,6 +168,9 @@ def run(run_or_experiment, trial_executor (TrialExecutor): Manage the execution of trials. raise_on_failed_trial (bool): Raise TuneError if there exists failed trial (of ERROR state) when the experiments complete. + ray_auto_init (bool): Automatically starts a local Ray cluster + if using a RayTrialExecutor (which is the default) and + if Ray is not initialized. Defaults to True. Returns: List of Trial objects. 
@@ -187,6 +192,10 @@ def run(run_or_experiment, } ) """ + trial_executor = trial_executor or RayTrialExecutor( + queue_trials=queue_trials, + reuse_actors=reuse_actors, + ray_auto_init=ray_auto_init) experiment = run_or_experiment if not isinstance(run_or_experiment, Experiment): experiment = Experiment( @@ -229,14 +238,12 @@ def run(run_or_experiment, search_alg.add_configurations([experiment]) runner = TrialRunner( - search_alg, + search_alg=search_alg, scheduler=scheduler, metadata_checkpoint_dir=checkpoint_dir, launch_web_server=with_server, server_port=server_port, verbose=bool(verbose > 1), - queue_trials=queue_trials, - reuse_actors=reuse_actors, trial_executor=trial_executor) if verbose: