diff --git a/doc/source/tune/user-guide.rst b/doc/source/tune/user-guide.rst index 5d9e6540c..a830791d0 100644 --- a/doc/source/tune/user-guide.rst +++ b/doc/source/tune/user-guide.rst @@ -688,6 +688,8 @@ These are the environment variables Ray Tune currently considers: experiment state is checkpointed. If not set this will default to ``10``. * **TUNE_MAX_LEN_IDENTIFIER**: Maximum length of trial subdirectory names (those with the parameter values in them) +* **TUNE_MAX_PENDING_TRIALS_PG**: Maximum number of pending trials when placement groups are used. Defaults + to ``1000``. * **TUNE_RESULT_DIR**: Directory where Ray Tune trial results are stored. If this is not set, ``~/ray_results`` will be used. * **TUNE_RESULT_BUFFER_LENGTH**: Ray Tune can buffer results from trainables before they are passed @@ -697,6 +699,9 @@ These are the environment variables Ray Tune currently considers: but never longer than this value. Defaults to 100 (seconds). * **TUNE_RESULT_BUFFER_MIN_TIME_S**: Additionally, you can specify a minimum time to buffer results. Defaults to 0. * **TUNE_SYNCER_VERBOSITY**: Amount of command output when using Tune with Docker Syncer. Defaults to 0. +* **TUNE_TRIAL_STARTUP_GRACE_PERIOD**: Amount of time after starting a trial that Ray Tune checks for successful + trial startups. After the grace period, Tune will block until a result from a running trial is received. Can + be disabled by setting this to lower or equal to 0. * **TUNE_WARN_THRESHOLD_S**: Threshold for logging if an Tune event loop operation takes too long. Defaults to 0.5 (seconds). * **TUNE_STATE_REFRESH_PERIOD**: Frequency of updating the resource tracking from Ray. Defaults to 10 (seconds). diff --git a/python/ray/tune/config_parser.py b/python/ray/tune/config_parser.py index 0f75bd992..5fda01ac5 100644 --- a/python/ray/tune/config_parser.py +++ b/python/ray/tune/config_parser.py @@ -3,11 +3,12 @@ import json import os # For compatibility under py2 to consider unicode as str +from ray.tune.utils.serialization import TuneFunctionEncoder from six import string_types from ray.tune import TuneError from ray.tune.trial import Trial -from ray.tune.resources import json_to_resources +from ray.tune.resources import PlacementGroupFactory, json_to_resources from ray.tune.utils.util import SafeFallbackEncoder @@ -142,11 +143,16 @@ def to_argv(config): argv.append(v) elif isinstance(v, bool): pass + elif callable(v): + argv.append(json.dumps(v, cls=TuneFunctionEncoder)) else: argv.append(json.dumps(v, cls=SafeFallbackEncoder)) return argv +_cached_pgf = {} + + def create_trial_from_spec(spec, output_path, parser, **trial_kwargs): """Creates a Trial object from parsing the spec. @@ -163,13 +169,34 @@ def create_trial_from_spec(spec, output_path, parser, **trial_kwargs): Returns: A trial object with corresponding parameters to the specification. """ + global _cached_pgf + + spec = spec.copy() + resources = spec.pop("resources_per_trial", None) + try: args, _ = parser.parse_known_args(to_argv(spec)) except SystemExit: raise TuneError("Error parsing args, see above message", spec) - if "resources_per_trial" in spec: - trial_kwargs["resources"] = json_to_resources( - spec["resources_per_trial"]) + + if resources: + if isinstance(resources, PlacementGroupFactory): + trial_kwargs["placement_group_factory"] = resources + elif callable(resources): + if resources in _cached_pgf: + trial_kwargs["placement_group_factory"] = _cached_pgf[ + resources] + else: + pgf = PlacementGroupFactory(resources) + _cached_pgf[resources] = pgf + trial_kwargs["placement_group_factory"] = pgf + else: + try: + trial_kwargs["resources"] = json_to_resources(resources) + except (TuneError, ValueError) as exc: + raise TuneError("Error parsing resources_per_trial", + resources) from exc + return Trial( # Submitting trial via server in py2.7 creates Unicode, which does not # convert to string in a straightforward manner. diff --git a/python/ray/tune/integration/kubernetes.py b/python/ray/tune/integration/kubernetes.py index 484475508..eb6bd6d38 100644 --- a/python/ray/tune/integration/kubernetes.py +++ b/python/ray/tune/integration/kubernetes.py @@ -25,7 +25,8 @@ def NamespacedKubernetesSyncer(namespace): Args: namespace (str): Kubernetes namespace. - Returns: A ``KubernetesSyncer`` class to be passed to ``tune.run()``. + Returns: + A ``KubernetesSyncer`` class to be passed to ``tune.run()``. Example: diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 22fad4885..bc81b5025 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -7,8 +7,10 @@ import random import time import traceback from contextlib import contextmanager +from typing import List, Optional import ray +from ray.actor import ActorHandle from ray.exceptions import GetTimeoutError from ray import ray_constants from ray.resource_spec import ResourceSpec @@ -18,10 +20,12 @@ from ray.tune.function_runner import FunctionRunner from ray.tune.logger import NoopLogger from ray.tune.result import TRIAL_INFO, STDOUT_FILE, STDERR_FILE from ray.tune.resources import Resources +from ray.tune.utils.placement_groups import PlacementGroupManager from ray.tune.utils.trainable import TrainableUtil from ray.tune.trial import Trial, Checkpoint, Location, TrialInfo from ray.tune.trial_executor import TrialExecutor from ray.tune.utils import warn_if_slow +from ray.util.placement_group import PlacementGroup, remove_placement_group logger = logging.getLogger(__name__) @@ -88,11 +92,14 @@ class _TrialCleanup: is passed, cleanup will kick in and remove futures. """ - def __init__(self, threshold=TRIAL_CLEANUP_THRESHOLD): + def __init__(self, threshold: int = TRIAL_CLEANUP_THRESHOLD): self.threshold = threshold self._cleanup_map = {} - def add(self, trial, actor): + def add(self, + trial: Trial, + actor: ActorHandle, + placement_group: Optional[PlacementGroup] = None): """Adds a trial actor to be stopped. If the number of futures exceeds the threshold, the cleanup mechanism @@ -101,15 +108,20 @@ class _TrialCleanup: Args: trial (Trial): The trial corresponding to the future. actor (ActorHandle): Handle to the trainable to be stopped. + placement_group (PlacementGroup): Placement group to stop. """ future = actor.stop.remote() - actor.__ray_terminate__.remote() + + if placement_group: + remove_placement_group(placement_group) + else: + actor.__ray_terminate__.remote() self._cleanup_map[future] = trial if len(self._cleanup_map) > self.threshold: self.cleanup(partial=True) - def cleanup(self, partial=True): + def cleanup(self, partial: bool = True): """Waits for cleanup to finish. If partial=False, all futures are expected to return. If a future @@ -141,10 +153,10 @@ class RayTrialExecutor(TrialExecutor): """An implementation of TrialExecutor based on Ray.""" def __init__(self, - queue_trials=False, - reuse_actors=False, - ray_auto_init=None, - refresh_period=None): + queue_trials: bool = False, + reuse_actors: bool = False, + ray_auto_init: Optional[bool] = None, + refresh_period: Optional[float] = None): if ray_auto_init is None: if os.environ.get("TUNE_DISABLE_AUTO_INIT") == "1": logger.info("'TUNE_DISABLE_AUTO_INIT=1' detected.") @@ -168,6 +180,9 @@ class RayTrialExecutor(TrialExecutor): self._avail_resources = Resources(cpu=0, gpu=0) self._committed_resources = Resources(cpu=0, gpu=0) + self._pg_manager = PlacementGroupManager() + self._staged_trials = set() + self._resources_initialized = False if refresh_period is None: @@ -188,6 +203,49 @@ class RayTrialExecutor(TrialExecutor): if ray.is_initialized(): self._update_avail_resources() + def in_staging_grace_period(self) -> bool: + """Returns True if trials have recently been staged.""" + return self._pg_manager.in_staging_grace_period() + + def stage_and_update_status(self, trials: List[Trial]): + """Check and update statuses of scheduled placement groups. + + Stages placement groups of all trials. + """ + for trial in trials: + if trial.status != Trial.PENDING: + continue + if not trial.uses_placement_groups: + continue + if trial in self._staged_trials: + continue + if self._pg_manager.trial_in_use(trial): + continue + + if not self._pg_manager.stage_trial_pg( + trial.placement_group_factory): + # Break if we reached the limit of pending placement groups. + break + + self._staged_trials.add(trial) + + self._pg_manager.update_status() + + def get_staged_trial(self): + """Get a trial whose placement group was successfully staged. + + Can also return None if no trial is available. + + Returns: + Trial object or None. + + """ + for trial in self._staged_trials: + if self._pg_manager.has_ready(trial.placement_group_factory): + return trial + + return None + def _setup_remote_runner(self, trial, reuse_allowed): trial.init_logdir() # We checkpoint metadata here to try mitigating logdir duplication @@ -212,16 +270,31 @@ class RayTrialExecutor(TrialExecutor): logger.debug("Cannot reuse cached runner {} for new trial".format( self._cached_actor)) with self._change_working_directory(trial): - self._trial_cleanup.add(trial, actor=self._cached_actor) + pg = self._pg_manager.clean_trial_placement_group(trial) + + self._trial_cleanup.add( + trial, actor=self._cached_actor, placement_group=pg) self._cached_actor = None _actor_cls = _class_cache.get(trial.get_trainable_cls()) - full_actor_class = _actor_cls.options( - num_cpus=trial.resources.cpu, - num_gpus=trial.resources.gpu, - memory=trial.resources.memory or None, - object_store_memory=trial.resources.object_store_memory or None, - resources=trial.resources.custom_resources) + if trial.uses_placement_groups: + if not self._pg_manager.has_ready(trial.placement_group_factory): + if trial not in self._staged_trials: + if self._pg_manager.stage_trial_pg( + trial.placement_group_factory): + self._staged_trials.add(trial) + return None + else: + full_actor_class = self._pg_manager.get_full_actor_cls( + trial, _actor_cls) + else: + full_actor_class = _actor_cls.options( + num_cpus=trial.resources.cpu, + num_gpus=trial.resources.gpu, + memory=trial.resources.memory or None, + object_store_memory=trial.resources.object_store_memory + or None, + resources=trial.resources.custom_resources) # Clear the Trial's location (to be updated later on result) # since we don't know where the remote runner is placed. trial.set_location(Location()) @@ -285,7 +358,8 @@ class RayTrialExecutor(TrialExecutor): trial_item = self._find_item(self._running, trial) assert len(trial_item) < 2, trial_item - def _start_trial(self, trial, checkpoint=None, runner=None, train=True): + def _start_trial(self, trial, checkpoint=None, runner=None, + train=True) -> bool: """Starts trial and restores last result if trial was paused. Args: @@ -297,6 +371,9 @@ class RayTrialExecutor(TrialExecutor): cached actor. If None, a new runner is created. train (bool): Whether or not to start training. + Returns: + True if trial was started successfully, False otherwise. + See `RayTrialExecutor.restore` for possible errors raised. """ prior_status = trial.status @@ -309,6 +386,8 @@ class RayTrialExecutor(TrialExecutor): or issubclass(trial.get_trainable_cls(), FunctionRunner) runner = self._setup_remote_runner(trial, reuse_allowed) + if not runner: + return False trial.set_runner(runner) self.restore(trial, checkpoint) self.set_status(trial, Trial.RUNNING) @@ -320,6 +399,7 @@ class RayTrialExecutor(TrialExecutor): self._running[previous_run[0]] = trial elif train and not trial.is_restoring: self._train(trial) + return True def _stop_trial(self, trial, error=False, error_msg=None): """Stops this trial. @@ -344,15 +424,17 @@ class RayTrialExecutor(TrialExecutor): self._cached_actor = trial.runner else: logger.debug("Trial %s: Destroying actor.", trial) + pg = self._pg_manager.clean_trial_placement_group(trial) with self._change_working_directory(trial): - self._trial_cleanup.add(trial, actor=trial.runner) + self._trial_cleanup.add( + trial, actor=trial.runner, placement_group=pg) except Exception: logger.exception("Trial %s: Error stopping runner.", trial) self.set_status(trial, Trial.ERROR) finally: trial.set_runner(None) - def start_trial(self, trial, checkpoint=None, train=True): + def start_trial(self, trial, checkpoint=None, train=True) -> bool: """Starts the trial. Will not return resources if trial repeatedly fails on start. @@ -362,16 +444,21 @@ class RayTrialExecutor(TrialExecutor): checkpoint (Checkpoint): A Python object or path storing the state of trial. train (bool): Whether or not to start training. + + Returns: + True if trial was started successfully, False otherwise. """ - self._commit_resources(trial.resources) + if not trial.uses_placement_groups: + self._commit_resources(trial.resources) try: - self._start_trial(trial, checkpoint, train=train) + return self._start_trial(trial, checkpoint, train=train) except AbortTrialExecution: logger.exception("Trial %s: Error starting runner, aborting!", trial) time.sleep(2) error_msg = traceback.format_exc() self._stop_trial(trial, error=True, error_msg=error_msg) + return False except Exception: logger.exception("Trial %s: Unexpected error starting runner.", trial) @@ -380,6 +467,7 @@ class RayTrialExecutor(TrialExecutor): self._stop_trial(trial, error=True, error_msg=error_msg) # Note that we don't return the resources, since they may # have been lost. TODO(ujvl): is this the right thing to do? + return False def _find_item(self, dictionary, item): out = [rid for rid, t in dictionary.items() if t is item] @@ -391,7 +479,8 @@ class RayTrialExecutor(TrialExecutor): self._stop_trial(trial, error=error, error_msg=error_msg) if prior_status == Trial.RUNNING: logger.debug("Trial %s: Returning resources.", trial) - self._return_resources(trial.resources) + if not trial.uses_placement_groups: + self._return_resources(trial.resources) out = self._find_item(self._running, trial) for result_id in out: self._running.pop(result_id) @@ -478,7 +567,9 @@ class RayTrialExecutor(TrialExecutor): return trial return None - def get_next_available_trial(self): + def get_next_available_trial(self, timeout: Optional[float] = None): + if not self._running: + return None shuffled_results = list(self._running.keys()) random.shuffle(shuffled_results) # Note: We shuffle the results because `ray.wait` by default returns @@ -486,7 +577,10 @@ class RayTrialExecutor(TrialExecutor): # trials (i.e. trials that run remotely) also get fairly reported. # See https://github.com/ray-project/ray/issues/4211 for details. start = time.time() - [result_id], _ = ray.wait(shuffled_results) + ready, _ = ray.wait(shuffled_results, timeout=timeout) + if not ready: + return None + result_id = ready[0] wait_time = time.time() - start if wait_time > NONTRIVIAL_WAIT_TIME_THRESHOLD_S: self._last_nontrivial_wait = time.time() @@ -540,6 +634,9 @@ class RayTrialExecutor(TrialExecutor): custom_resources=custom_resources) def _return_resources(self, resources): + if resources.has_placement_group: + return + committed = self._committed_resources all_keys = set(resources.custom_resources).union( @@ -611,6 +708,9 @@ class RayTrialExecutor(TrialExecutor): has exceeded self._refresh_period. This also assumes that the cluster is not resizing very frequently. """ + if resources.has_placement_group: + return self._pg_manager.can_stage() + self._update_avail_resources() currently_available = Resources.subtract(self._avail_resources, self._committed_resources) diff --git a/python/ray/tune/resources.py b/python/ray/tune/resources.py index 991832840..044d0d688 100644 --- a/python/ray/tune/resources.py +++ b/python/ray/tune/resources.py @@ -3,6 +3,8 @@ import logging import json from numbers import Number # For compatibility under py2 to consider unicode as str +from typing import Optional + from six import string_types import ray @@ -15,7 +17,7 @@ class Resources( namedtuple("Resources", [ "cpu", "gpu", "memory", "object_store_memory", "extra_cpu", "extra_gpu", "extra_memory", "extra_object_store_memory", - "custom_resources", "extra_custom_resources" + "custom_resources", "extra_custom_resources", "has_placement_group" ])): """Ray resources required to schedule a trial. @@ -38,6 +40,8 @@ class Resources( extra_custom_resources (dict): Extra custom resources to reserve in case the trial needs to launch additional Ray actors that use any of these custom resources. + has_placement_group (bool): Bool indicating if the trial also + has an associated placement group. """ @@ -53,7 +57,8 @@ class Resources( extra_memory=0, extra_object_store_memory=0, custom_resources=None, - extra_custom_resources=None): + extra_custom_resources=None, + has_placement_group=False): custom_resources = custom_resources or {} extra_custom_resources = extra_custom_resources or {} leftovers = set(custom_resources) ^ set(extra_custom_resources) @@ -92,7 +97,7 @@ class Resources( return super(Resources, cls).__new__( cls, cpu, gpu, memory, object_store_memory, extra_cpu, extra_gpu, extra_memory, extra_object_store_memory, custom_resources, - extra_custom_resources) + extra_custom_resources, has_placement_group) def summary_string(self): summary = "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu, @@ -171,11 +176,22 @@ class Resources( return resources_to_json(self) -def json_to_resources(data): +class PlacementGroupFactory: + """Wrapper class to identify placement group factory methods.""" + + def __init__(self, factory): + self._factory = factory + + def __call__(self, *args, **kwargs): + return self._factory(*args, **kwargs) + + +def json_to_resources(data: Optional[str]): if data is None or data == "null": return None if isinstance(data, string_types): data = json.loads(data) + for k in data: if k in ["driver_cpu_limit", "driver_gpu_limit"]: raise TuneError( @@ -193,7 +209,7 @@ def json_to_resources(data): data.get("extra_custom_resources")) -def resources_to_json(resources): +def resources_to_json(resources: Optional[Resources]): if resources is None: return None return { diff --git a/python/ray/tune/tests/test_ray_trial_executor.py b/python/ray/tune/tests/test_ray_trial_executor.py index 6267dee21..b560a45aa 100644 --- a/python/ray/tune/tests/test_ray_trial_executor.py +++ b/python/ray/tune/tests/test_ray_trial_executor.py @@ -3,6 +3,7 @@ import unittest from unittest.mock import patch import ray +from ray import tune from ray.rllib import _register_all from ray.tune import Trainable from ray.tune.ray_trial_executor import RayTrialExecutor @@ -12,6 +13,7 @@ from ray.tune.suggest import BasicVariantGenerator from ray.tune.trial import Trial, Checkpoint from ray.tune.resources import Resources from ray.cluster_utils import Cluster +from ray.util import placement_group class RayTrialExecutorTest(unittest.TestCase): @@ -270,6 +272,87 @@ class RayExecutorQueueTest(unittest.TestCase): self.trial_executor.has_resources(cpu_only_trial3.resources)) +class RayExecutorPlacementGroupTest(unittest.TestCase): + def setUp(self): + self.head_cpus = 8 + self.head_gpus = 4 + self.head_custom = 16 + + self.cluster = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "num_cpus": self.head_cpus, + "num_gpus": self.head_gpus, + "resources": { + "custom": self.head_custom + }, + "_system_config": { + "num_heartbeats_timeout": 10 + } + }) + # Pytest doesn't play nicely with imports + _register_all() + + def tearDown(self): + ray.shutdown() + self.cluster.shutdown() + _register_all() # re-register the evicted objects + + def testResourcesAvailableNoPlacementGroup(self): + def train(config): + tune.report(metric=0, resources=ray.available_resources()) + + out = tune.run( + train, + resources_per_trial={ + "cpu": 1, + "gpu": 1, + "custom_resources": { + "custom": 3 + }, + "extra_cpu": 3, + "extra_gpu": 1, + "extra_custom_resources": { + "custom": 4 + }, + }) + + # Only `cpu`, `gpu`, and `custom_resources` will be "really" reserved, + # the extra_* will just be internally reserved by Tune. + self.assertDictEqual({ + key: val + for key, val in out.trials[0].last_result["resources"].items() + if key in ["CPU", "GPU", "custom"] + }, { + "CPU": self.head_cpus - 1.0, + "GPU": self.head_gpus - 1.0, + "custom": self.head_custom - 3.0 + }) + + def testResourcesAvailableWithPlacementGroup(self): + def train(config): + tune.report(metric=0, resources=ray.available_resources()) + + def placement_group_factory(): + head_bundle = {"CPU": 1, "GPU": 0, "custom": 4} + child_bundle = {"CPU": 2, "GPU": 1, "custom": 3} + + return placement_group([head_bundle, child_bundle, child_bundle]) + + out = tune.run(train, resources_per_trial=placement_group_factory) + + self.assertDictEqual({ + key: val + for key, val in out.trials[0].last_result["resources"].items() + if key in ["CPU", "GPU", "custom"] + }, { + "CPU": self.head_cpus - 5.0, + "GPU": self.head_gpus - 2.0, + "custom": self.head_custom - 10.0 + }) + + class LocalModeExecutorTest(RayTrialExecutorTest): def setUp(self): ray.init(local_mode=True) diff --git a/python/ray/tune/tests/test_trial_runner.py b/python/ray/tune/tests/test_trial_runner.py index 399802564..64c5f861b 100644 --- a/python/ray/tune/tests/test_trial_runner.py +++ b/python/ray/tune/tests/test_trial_runner.py @@ -264,7 +264,7 @@ class TrialRunnerTest(unittest.TestCase): if result["training_iteration"] == 1: executor = trial_runner.trial_executor executor.stop_trial(trial) - trial.update_resources(2, 0) + trial.update_resources(dict(cpu=2, gpu=0)) executor.start_trial(trial) return TrialScheduler.CONTINUE @@ -282,7 +282,8 @@ class TrialRunnerTest(unittest.TestCase): runner.step() self.assertEqual(trials[0].status, Trial.RUNNING) self.assertEqual(runner.trial_executor._committed_resources.cpu, 1) - self.assertRaises(ValueError, lambda: trials[0].update_resources(2, 0)) + self.assertRaises( + ValueError, lambda: trials[0].update_resources(dict(cpu=2, gpu=0))) runner.step() self.assertEqual(trials[0].status, Trial.RUNNING) diff --git a/python/ray/tune/tests/test_trial_runner_3.py b/python/ray/tune/tests/test_trial_runner_3.py index f835aa53b..ab10112d4 100644 --- a/python/ray/tune/tests/test_trial_runner_3.py +++ b/python/ray/tune/tests/test_trial_runner_3.py @@ -1,3 +1,4 @@ +import time from collections import Counter import os import pickle @@ -6,11 +7,15 @@ import sys import tempfile import unittest from unittest.mock import patch +import numpy as np import ray +from ray.cluster_utils import Cluster from ray.rllib import _register_all -from ray.tune import TuneError +from ray import tune +from ray.tune import Callback, TuneError +from ray.tune.ray_trial_executor import RayTrialExecutor from ray.tune.result import TRAINING_ITERATION from ray.tune.schedulers import TrialScheduler, FIFOScheduler from ray.tune.experiment import Experiment @@ -21,6 +26,7 @@ from ray.tune.suggest.repeater import Repeater from ray.tune.suggest._mock import _MockSuggestionAlgorithm from ray.tune.suggest.suggestion import Searcher, ConcurrencyLimiter from ray.tune.suggest.search_generator import SearchGenerator +from ray.util import placement_group class TrialRunnerTest3(unittest.TestCase): @@ -921,6 +927,165 @@ class ResourcesTest(unittest.TestCase): self.assertEqual(original, new_resource) +class TrialRunnerPlacementGroupTest(unittest.TestCase): + def setUp(self): + os.environ["TUNE_GLOBAL_CHECKPOINT_S"] = "10000" + self.head_cpus = 8 + self.head_gpus = 4 + self.head_custom = 16 + + self.cluster = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "num_cpus": self.head_cpus, + "num_gpus": self.head_gpus, + "resources": { + "custom": self.head_custom + }, + "_system_config": { + "num_heartbeats_timeout": 10 + } + }) + # Pytest doesn't play nicely with imports + _register_all() + + def tearDown(self): + ray.shutdown() + self.cluster.shutdown() + _register_all() # re-register the evicted objects + + def testPlacementGroupRequests(self, scheduled=10): + """In this test we try to start 10 trials but only have resources + for 2. Placement groups should still be created and PENDING. + + Eventually they should be scheduled sequentially (i.e. in pairs + of two).""" + + def train(config): + time.sleep(1) + now = time.time() + tune.report(end=now - config["start_time"]) + + def placement_group_factory(): + head_bundle = {"CPU": 4, "GPU": 0, "custom": 0} + child_bundle = {"custom": 1} + + return placement_group([head_bundle, child_bundle, child_bundle]) + + trial_executor = RayTrialExecutor() + + this = self + + class _TestCallback(Callback): + def on_step_end(self, iteration, trials, **info): + if iteration == 1: + this.assertEqual(scheduled, len(trials)) + this.assertEqual( + scheduled, + sum( + len(s) for s in + trial_executor._pg_manager._staging.values()) + + sum( + len(s) + for s in trial_executor._pg_manager._ready.values( + )) + len(trial_executor._pg_manager._in_use_pgs)) + + start = time.time() + out = tune.run( + train, + config={"start_time": start}, + resources_per_trial=placement_group_factory, + num_samples=10, + trial_executor=trial_executor, + callbacks=[_TestCallback()]) + + trial_end_times = sorted(t.last_result["end"] for t in out.trials) + print("Trial end times:", trial_end_times) + max_diff = trial_end_times[-1] - trial_end_times[0] + + # Not all trials have been run in parallel + self.assertGreater(max_diff, 5) + + # Some trials should have run in parallel + self.assertLess(max_diff, 10) + + @patch("ray.tune.trial_runner.TUNE_MAX_PENDING_TRIALS_PG", 6) + @patch("ray.tune.utils.placement_groups.TUNE_MAX_PENDING_TRIALS_PG", 6) + def testPlacementGroupLimitedRequests(self): + """Assert that maximum number of placement groups is enforced.""" + self.testPlacementGroupRequests(scheduled=6) + + def testPlacementGroupDistributedTraining(self): + """Run distributed training using placement groups. + + Each trial requests 4 CPUs and starts 4 remote training workers. + """ + + def placement_group_factory(): + head_bundle = {"CPU": 1, "GPU": 0, "custom": 0} + child_bundle = {"CPU": 1} + + return placement_group( + [head_bundle, child_bundle, child_bundle, child_bundle]) + + @ray.remote + class TrainingActor: + def train(self, val): + time.sleep(1) + return val + + def train(config): + base = config["base"] + actors = [TrainingActor.remote() for _ in range(4)] + futures = [ + actor.train.remote(base + 2 * i) + for i, actor in enumerate(actors) + ] + results = ray.get(futures) + + end = time.time() - config["start_time"] + tune.report(avg=np.mean(results), end=end) + + trial_executor = RayTrialExecutor() + + start = time.time() + out = tune.run( + train, + config={ + "start_time": start, + "base": tune.grid_search(list(range(0, 100, 10))) + }, + resources_per_trial=placement_group_factory, + num_samples=1, + trial_executor=trial_executor) + + avgs = sorted(t.last_result["avg"] for t in out.trials) + self.assertSequenceEqual(avgs, list(range(3, 103, 10))) + + trial_end_times = sorted(t.last_result["end"] for t in out.trials) + print("Trial end times:", trial_end_times) + max_diff = trial_end_times[-1] - trial_end_times[0] + + # Not all trials have been run in parallel + self.assertGreater(max_diff, 5) + + # Some trials should have run in parallel + # Todo: Re-enable when using buildkite + # self.assertLess(max_diff, 10) + + # Assert proper cleanup + pg_manager = trial_executor._pg_manager + self.assertFalse(pg_manager._in_use_trials) + self.assertFalse(pg_manager._in_use_pgs) + self.assertFalse(pg_manager._staging_futures) + for pgf in pg_manager._staging: + self.assertFalse(pg_manager._staging[pgf]) + for pgf in pg_manager._ready: + self.assertFalse(pg_manager._ready[pgf]) + self.assertTrue(pg_manager._latest_staging_start_time) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tune/tests/test_trial_runner_callbacks.py b/python/ray/tune/tests/test_trial_runner_callbacks.py index 4c56cb6d2..75b06d0e3 100644 --- a/python/ray/tune/tests/test_trial_runner_callbacks.py +++ b/python/ray/tune/tests/test_trial_runner_callbacks.py @@ -64,7 +64,7 @@ class _MockTrialExecutor(RayTrialExecutor): def fetch_result(self, trial): return [self.results.get(trial, {})] - def get_next_available_trial(self): + def get_next_available_trial(self, timeout=None): return self.next_trial or super().get_next_available_trial() def get_next_failed_trial(self): diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 2e9465c83..fc6152f97 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -1,5 +1,5 @@ +from typing import Callable, Dict, Sequence, Union import json -from typing import Sequence import ray.cloudpickle as cloudpickle from collections import deque @@ -18,7 +18,8 @@ from ray.tune.checkpoint_manager import Checkpoint, CheckpointManager # have been defined yet. See https://github.com/ray-project/ray/issues/1716. from ray.tune.registry import get_trainable_cls, validate_trainable from ray.tune.result import DEFAULT_RESULTS_DIR, DONE, TRAINING_ITERATION -from ray.tune.resources import Resources, json_to_resources, resources_to_json +from ray.tune.resources import PlacementGroupFactory, Resources, \ + json_to_resources, resources_to_json from ray.tune.utils.serialization import TuneFunctionEncoder from ray.tune.utils.trainable import TrainableUtil from ray.tune.utils import date_str, flatten_dict @@ -179,6 +180,7 @@ class Trial: evaluated_params=None, experiment_tag="", resources=None, + placement_group_factory=None, stopping_criterion=None, remote_checkpoint_dir=None, checkpoint_freq=0, @@ -221,6 +223,12 @@ class Trial: resources = default_resources self.location = Location() self.resources = resources or Resources(cpu=1, gpu=0) + self.placement_group_factory = placement_group_factory + if self.placement_group_factory: + resource_kwargs = self.resources._asdict() + resource_kwargs["has_placement_group"] = True + self.resources = Resources(**resource_kwargs) + self.stopping_criterion = stopping_criterion or {} self.log_to_file = log_to_file @@ -330,6 +338,10 @@ class Trial: logdir_name = os.path.basename(self.logdir) return os.path.join(self.remote_checkpoint_dir_prefix, logdir_name) + @property + def uses_placement_groups(self): + return bool(self.placement_group_factory) + def reset(self): return Trial( self.trainable_name, @@ -339,6 +351,7 @@ class Trial: evaluated_params=self.evaluated_params, experiment_tag=self.experiment_tag, resources=self.resources, + placement_group_factory=self.placement_group_factory, stopping_criterion=self.stopping_criterion, remote_checkpoint_dir=self.remote_checkpoint_dir, checkpoint_freq=self.checkpoint_freq, @@ -362,7 +375,8 @@ class Trial: os.makedirs(self.logdir, exist_ok=True) self.invalidate_json_state() - def update_resources(self, cpu, gpu, **kwargs): + def update_resources( + self, resources: Union[Dict, Callable, PlacementGroupFactory]): """EXPERIMENTAL: Updates the resource requirements. Should only be called when the trial is not running. @@ -372,7 +386,20 @@ class Trial: """ if self.status is Trial.RUNNING: raise ValueError("Cannot update resources while Trial is running.") - self.resources = Resources(cpu, gpu, **kwargs) + if isinstance(resources, PlacementGroupFactory): + self.placement_group_factory = resources + elif callable(resources): + self.placement_group_factory = PlacementGroupFactory(resources) + else: + self.resources = Resources(**resources) + self.placement_group_factory = None + + if self.placement_group_factory and \ + not self.resources.has_placement_group: + resource_kwargs = self.resources._asdict() + resource_kwargs["has_placement_group"] = True + self.resources = Resources(**resource_kwargs) + self.invalidate_json_state() def set_runner(self, runner): diff --git a/python/ray/tune/trial_executor.py b/python/ray/tune/trial_executor.py index 02258e17f..6e0568fd1 100644 --- a/python/ray/tune/trial_executor.py +++ b/python/ray/tune/trial_executor.py @@ -15,7 +15,7 @@ class TrialExecutor: and starting/stopping trials. """ - def __init__(self, queue_trials=False): + def __init__(self, queue_trials: bool = False): """Initializes a new TrialExecutor. Args: @@ -78,7 +78,7 @@ class TrialExecutor: raise NotImplementedError("Subclasses of TrialExecutor must provide " "has_resources() method") - def start_trial(self, trial, checkpoint=None, train=True): + def start_trial(self, trial, checkpoint=None, train=True) -> bool: """Starts the trial restoring from checkpoint if checkpoint is provided. Args: @@ -86,6 +86,9 @@ class TrialExecutor: checkpoint (Checkpoint): A Python object or path storing the state of trial. train (bool): Whether or not to start training. + + Returns: + True if trial started successfully, False otherwise. """ raise NotImplementedError("Subclasses of TrialExecutor must provide " "start_trial() method") @@ -165,6 +168,8 @@ class TrialExecutor: if self._queue_trials: return for trial in trial_runner.get_trials(): + if trial.uses_placement_groups: + return if trial.status == Trial.PENDING: if not self.has_resources(trial.resources): resource_string = trial.resources.summary_string() @@ -275,3 +280,7 @@ class TrialExecutor: def cleanup(self, trial): """Ensures that trials are cleaned up after stopping.""" pass + + def in_staging_grace_period(self) -> bool: + """Returns True if trials have recently been staged.""" + return False diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 1f5d4aea0..c487190f7 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -1,3 +1,5 @@ +from typing import Optional + import click from datetime import datetime import json @@ -20,6 +22,7 @@ from ray.tune.schedulers import FIFOScheduler, TrialScheduler from ray.tune.suggest import BasicVariantGenerator from ray.tune.utils import warn_if_slow, flatten_dict, env_integer from ray.tune.utils.log import Verbosity, has_verbosity +from ray.tune.utils.placement_groups import TUNE_MAX_PENDING_TRIALS_PG from ray.tune.utils.serialization import TuneFunctionDecoder, \ TuneFunctionEncoder from ray.tune.web_server import TuneServer @@ -108,6 +111,11 @@ class TrialRunner: self._search_alg = search_alg or BasicVariantGenerator() self._scheduler_alg = scheduler or FIFOScheduler() self.trial_executor = trial_executor or RayTrialExecutor() + self._pending_trial_queue_times = {} + + # Setting this to 0 still allows adding one new (pending) trial, + # but it will prevent us from trying to fill the trial list + self._max_pending_trials = 0 # Can be updated in `self.add_trial()` self._metric = metric @@ -142,6 +150,7 @@ class TrialRunner: self._trials = [] self._cached_trial_decisions = {} self._queued_trial_decisions = {} + self._stop_queue = [] self._should_stop_experiment = False # used by TuneServer self._local_checkpoint_dir = local_checkpoint_dir @@ -349,18 +358,50 @@ class TrialRunner: with warn_if_slow("callbacks.on_step_begin"): self._callbacks.on_step_begin( iteration=self._iteration, trials=self._trials) + + # This will contain the next trial to start next_trial = self._get_next_trial() # blocking - if next_trial is not None: + + # Create pending trials + num_pending_trials = len( + [t for t in self._trials if t.status == Trial.PENDING]) + while num_pending_trials < self._max_pending_trials: + if not self._update_trial_queue(blocking=False): + break + num_pending_trials += 1 + + # Update status of staged placement groups + self.trial_executor.stage_and_update_status(self._trials) + + def _start_trial(trial: Trial) -> bool: + """Helper function to start trial and call callbacks""" with warn_if_slow("start_trial"): - self.trial_executor.start_trial(next_trial) - self._callbacks.on_trial_start( - iteration=self._iteration, - trials=self._trials, - trial=next_trial) - elif self.trial_executor.get_running_trials(): - self._process_events() # blocking - else: - self.trial_executor.on_no_available_trials(self) + if self.trial_executor.start_trial(trial): + self._callbacks.on_trial_start( + iteration=self._iteration, + trials=self._trials, + trial=trial) + return True + return False + + may_handle_events = True + if next_trial is not None: + if _start_trial(next_trial): + may_handle_events = False + else: + next_trial = self.trial_executor.get_staged_trial() + if next_trial is not None: + if _start_trial(next_trial): + may_handle_events = False + + if may_handle_events: + if self.trial_executor.get_running_trials(): + timeout = None + if self.trial_executor.in_staging_grace_period(): + timeout = 0.1 + self._process_events(timeout=timeout) # blocking + else: + self.trial_executor.on_no_available_trials(self) self._stop_experiment_if_needed() @@ -410,6 +451,9 @@ class TrialRunner: Args: trial (Trial): Trial to queue. """ + if trial.uses_placement_groups: + self._max_pending_trials = TUNE_MAX_PENDING_TRIALS_PG + self._trials.append(trial) with warn_if_slow("scheduler.on_trial_add"): self._scheduler_alg.on_trial_add(self, trial) @@ -462,7 +506,7 @@ class TrialRunner: logger.debug("Running trial {}".format(trial)) return trial - def _process_events(self): + def _process_events(self, timeout: Optional[float] = None): with warn_if_slow("get_next_failed_trial"): failed_trial = self.trial_executor.get_next_failed_trial() if failed_trial: @@ -475,8 +519,10 @@ class TrialRunner: else: # TODO(ujvl): Consider combining get_next_available_trial and # fetch_result functionality so that we don't timeout on fetch. - trial = self.trial_executor.get_next_available_trial() # blocking - + trial = self.trial_executor.get_next_available_trial( + timeout=timeout) # blocking + if not trial: + return if trial.is_restoring: with warn_if_slow("process_trial_restore"): self._process_trial_restore(trial) @@ -882,7 +928,8 @@ class TrialRunner: with warn_if_slow("scheduler.on_trial_add"): self._scheduler_alg.on_trial_add(self, trial) - def _update_trial_queue(self, blocking=False, timeout=600): + def _update_trial_queue(self, blocking: bool = False, + timeout: int = 600) -> bool: """Adds next trials to queue if possible. Note that the timeout is currently unexposed to the user. @@ -891,6 +938,9 @@ class TrialRunner: blocking (bool): Blocks until either a trial is available or is_finished (timeout or search algorithm finishes). timeout (int): Seconds before blocking times out. + + Returns: + Boolean indicating if a new trial was created or not. """ trial = self._search_alg.next_trial() if blocking and not trial: @@ -906,6 +956,9 @@ class TrialRunner: if trial: self.add_trial(trial) + return True + + return False def request_stop_trial(self, trial): self._stop_queue.append(trial) @@ -974,7 +1027,8 @@ class TrialRunner: state = self.__dict__.copy() for k in [ "_trials", "_stop_queue", "_server", "_search_alg", - "_scheduler_alg", "trial_executor", "_syncer", "_callbacks" + "_scheduler_alg", "_pending_trial_queue_times", + "trial_executor", "_syncer", "_callbacks" ]: del state[k] state["launch_web_server"] = bool(self._server) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index ab3df8ba8..fab7b79bf 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -160,10 +160,11 @@ def run( config (dict): Algorithm-specific configuration for Tune variant generation (e.g. env, hyperparams). Defaults to empty dict. Custom search algorithms may ignore this. - resources_per_trial (dict): Machine resources to allocate per trial, - e.g. ``{"cpu": 64, "gpu": 8}``. Note that GPUs will not be + resources_per_trial (dict|Callable): Machine resources to allocate per + trial, e.g. ``{"cpu": 64, "gpu": 8}``. Note that GPUs will not be assigned unless you specify them here. Defaults to 1 CPU and 0 - GPUs in ``Trainable.default_resource_request()``. + GPUs in ``Trainable.default_resource_request()``. This can also + be a function returning a placement group. num_samples (int): Number of times to sample from the hyperparameter space. Defaults to 1. If `grid_search` is provided as an argument, the grid will be repeated diff --git a/python/ray/tune/utils/placement_groups.py b/python/ray/tune/utils/placement_groups.py new file mode 100644 index 000000000..acab2cac6 --- /dev/null +++ b/python/ray/tune/utils/placement_groups.py @@ -0,0 +1,201 @@ +from collections import defaultdict +from typing import Dict, Optional, Set, Tuple +import os +import time + +import ray +from ray import ObjectRef +from ray.actor import ActorClass +from ray.tune.resources import PlacementGroupFactory +from ray.tune.trial import Trial +from ray.util.placement_group import PlacementGroup + +TUNE_MAX_PENDING_TRIALS_PG = int(os.getenv("TUNE_MAX_PENDING_TRIALS_PG", 1000)) +# Seconds we wait for a trial to come up before we make blocking calls +# to process events +TUNE_TRIAL_STARTUP_GRACE_PERIOD = float( + os.getenv("TUNE_TRIAL_STARTUP_GRACE_PERIOD", 10.)) + + +class PlacementGroupManager: + """PlacementGroupManager to stage and manage placement groups. + + This class schedules placement groups for trials, keeps track of + their state, and can return a fully configured actor class using + this placement group. + + If two trials share the same placement group factory, both could use + resulting placement groups from it. Thus this manager associates + placement groups with their factory methods. + """ + + def __init__(self): + # Sets of staged placement groups by factory + self._staging: Dict[PlacementGroupFactory, Set[ + PlacementGroup]] = defaultdict(set) + # Sets of ready and unused placement groups by factory + self._ready: Dict[PlacementGroupFactory, Set[ + PlacementGroup]] = defaultdict(set) + # Ray futures to check if a placement group is ready + self._staging_futures: Dict[ObjectRef, Tuple[PlacementGroupFactory, + PlacementGroup]] = {} + + # Placement groups used by trials + self._in_use_pgs: Dict[PlacementGroup, Trial] = {} + self._in_use_trials: Dict[Trial, PlacementGroup] = {} + + # Latest PG staging time to check if still in grace period. + self._latest_staging_start_time = time.time() + + def stage_trial_pg(self, pgf: PlacementGroupFactory): + """Stage a trial placement group. + + Create the trial placement group if maximum number of pending + placement groups is not exhausted. + + Args: + pgf (PlacementGroupFactory): Placement group factory to stage. + + Returns: + False if placement group has not been staged, True otherwise. + + Creates placement group and moves it to `self._staging`. + """ + if not self.can_stage(): + return False + + pg = pgf() # This creates the placement group + + self._staging[pgf].add(pg) + self._staging_futures[pg.ready()] = (pgf, pg) + + self._latest_staging_start_time = time.time() + + return True + + def can_stage(self): + """Return True if we can stage another placement group.""" + return len(self._staging) < TUNE_MAX_PENDING_TRIALS_PG + + def update_status(self): + """Update placement group status. + + Moves ready placement groups from `self._staging` to + `self._ready`. + """ + ready = True + while ready: + # Use a loop as `ready` might return futures one by one + ready, _ = ray.wait(list(self._staging_futures.keys()), timeout=0) + + for ready_fut in ready: + ready_pgf, ready_pg = self._staging_futures.pop(ready_fut) + + self._staging[ready_pgf].remove(ready_pg) + self._ready[ready_pgf].add(ready_pg) + + def get_full_actor_cls(self, trial: Trial, + actor_cls: ActorClass) -> Optional[ActorClass]: + """Get a fully configured actor class. + + Returns the actor handle if the placement group is ready. In this case, + the placement group is moved to `self._in_use_pgs` and removed from + `self._ready`. + + Args: + trial (Trial): Trial object to start + actor_cls: Ray actor class. + + Returns: + Configured ActorClass or None + + """ + pgf = trial.placement_group_factory + + if not self._ready[pgf]: + return None + + pg = self._ready[pgf].pop() + self._in_use_pgs[pg] = trial + self._in_use_trials[trial] = pg + + # We still have to pass resource specs + # Pass the full resource specs of the first bundle per default + first_bundle = pg.bundle_specs[0].copy() + num_cpus = first_bundle.pop("CPU", None) + num_gpus = first_bundle.get("GPU", None) + + # Only custom resources remain in `first_bundle` + resources = first_bundle or None + + return actor_cls.options( + placement_group=pg, + placement_group_bundle_index=0, + num_cpus=num_cpus, + num_gpus=num_gpus, + resources=resources) + + def has_ready(self, pgf: PlacementGroupFactory) -> bool: + """Return True if placement group is ready. + + Args: + pgf (PlacementGroupFactory): PlacementGroupFactory object. + + Returns: + Boolean. + + """ + return bool(self._ready[pgf]) + + def trial_in_use(self, trial: Trial): + return trial in self._in_use_trials + + def clean_trial_placement_group(self, + trial: Trial) -> Optional[PlacementGroup]: + """Remove reference to placement groups associated with a trial. + + Returns an associated placement group. If the trial was scheduled, this + is the placement group it was scheduled on. If the trial was not + scheduled, it will first try to return a staging placement group. If + there is no staging placement group, it will return a ready placement + group that is not yet being used by another trial. + + Args: + trial (Trial): Trial object. + + Returns: + PlacementGroup or None. + + """ + pgf = trial.placement_group_factory + + trial_pg = None + + if trial in self._in_use_trials: + # Trial was in use. Just return its placement group. + trial_pg = self._in_use_trials.pop(trial) + self._in_use_pgs.pop(trial_pg) + else: + # Trial was not in use. If there are pending placement groups + # in staging, pop a random one. + if self._staging[pgf]: + trial_pg = self._staging[pgf].pop() + + # For staging placement groups, we will also need to + # remove the future. + trial_future = None + for future, (pgf, pg) in self._staging_futures.items(): + if pg == trial_pg: + trial_future = future + break + del self._staging_futures[trial_future] + + elif self._ready[pgf]: + # Otherwise, return an unused ready placement group. + trial_pg = self._ready[pgf].pop() + + return trial_pg + + def in_staging_grace_period(self): + return self._staging_futures and time.time( + ) <= self._latest_staging_start_time + TUNE_TRIAL_STARTUP_GRACE_PERIOD diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index e533b2b7b..af0d1f0e0 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -563,7 +563,8 @@ def create_logdir(dirname: str, local_dir: str): dirname (str): Dirname to create in `local_dir` local_dir (str): Root directory for the log dir - Returns: full path to the newly created logdir. + Returns: + full path to the newly created logdir. """ local_dir = os.path.expanduser(local_dir) logdir = os.path.join(local_dir, dirname) diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 182dbb38c..be24772ab 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -1,6 +1,6 @@ import time -from typing import (List, Dict, Optional) +from typing import (List, Dict, Optional, Union) import ray from ray._raylet import PlacementGroupID, ObjectRef @@ -83,10 +83,10 @@ class PlacementGroup: placement_group_bundle_index=bundle_index, resources=resources).remote(self) - def wait(self, timeout_seconds: int) -> bool: + def wait(self, timeout_seconds: Union[float, int]) -> bool: """Wait for the placement group to be ready within the specified time. Args: - timeout_seconds(str): Timeout in seconds. + timeout_seconds(float|int): Timeout in seconds. Return: True if the placement group is created. False otherwise. """