[tune] placement group support (#13370)

This commit is contained in:
Kai Fricke
2021-01-18 20:58:57 +01:00
committed by GitHub
parent 1f00f834ac
commit dc42abb2f5
16 changed files with 756 additions and 65 deletions
+5
View File
@@ -688,6 +688,8 @@ These are the environment variables Ray Tune currently considers:
experiment state is checkpointed. If not set this will default to ``10``.
* **TUNE_MAX_LEN_IDENTIFIER**: Maximum length of trial subdirectory names (those
with the parameter values in them)
* **TUNE_MAX_PENDING_TRIALS_PG**: Maximum number of pending trials when placement groups are used. Defaults
to ``1000``.
* **TUNE_RESULT_DIR**: Directory where Ray Tune trial results are stored. If this
is not set, ``~/ray_results`` will be used.
* **TUNE_RESULT_BUFFER_LENGTH**: Ray Tune can buffer results from trainables before they are passed
@@ -697,6 +699,9 @@ These are the environment variables Ray Tune currently considers:
but never longer than this value. Defaults to 100 (seconds).
* **TUNE_RESULT_BUFFER_MIN_TIME_S**: Additionally, you can specify a minimum time to buffer results. Defaults to 0.
* **TUNE_SYNCER_VERBOSITY**: Amount of command output when using Tune with Docker Syncer. Defaults to 0.
* **TUNE_TRIAL_STARTUP_GRACE_PERIOD**: Amount of time after starting a trial that Ray Tune checks for successful
trial startups. After the grace period, Tune will block until a result from a running trial is received. Can
be disabled by setting this to lower or equal to 0.
* **TUNE_WARN_THRESHOLD_S**: Threshold for logging if an Tune event loop operation takes too long. Defaults to 0.5 (seconds).
* **TUNE_STATE_REFRESH_PERIOD**: Frequency of updating the resource tracking from Ray. Defaults to 10 (seconds).
+31 -4
View File
@@ -3,11 +3,12 @@ import json
import os
# For compatibility under py2 to consider unicode as str
from ray.tune.utils.serialization import TuneFunctionEncoder
from six import string_types
from ray.tune import TuneError
from ray.tune.trial import Trial
from ray.tune.resources import json_to_resources
from ray.tune.resources import PlacementGroupFactory, json_to_resources
from ray.tune.utils.util import SafeFallbackEncoder
@@ -142,11 +143,16 @@ def to_argv(config):
argv.append(v)
elif isinstance(v, bool):
pass
elif callable(v):
argv.append(json.dumps(v, cls=TuneFunctionEncoder))
else:
argv.append(json.dumps(v, cls=SafeFallbackEncoder))
return argv
_cached_pgf = {}
def create_trial_from_spec(spec, output_path, parser, **trial_kwargs):
"""Creates a Trial object from parsing the spec.
@@ -163,13 +169,34 @@ def create_trial_from_spec(spec, output_path, parser, **trial_kwargs):
Returns:
A trial object with corresponding parameters to the specification.
"""
global _cached_pgf
spec = spec.copy()
resources = spec.pop("resources_per_trial", None)
try:
args, _ = parser.parse_known_args(to_argv(spec))
except SystemExit:
raise TuneError("Error parsing args, see above message", spec)
if "resources_per_trial" in spec:
trial_kwargs["resources"] = json_to_resources(
spec["resources_per_trial"])
if resources:
if isinstance(resources, PlacementGroupFactory):
trial_kwargs["placement_group_factory"] = resources
elif callable(resources):
if resources in _cached_pgf:
trial_kwargs["placement_group_factory"] = _cached_pgf[
resources]
else:
pgf = PlacementGroupFactory(resources)
_cached_pgf[resources] = pgf
trial_kwargs["placement_group_factory"] = pgf
else:
try:
trial_kwargs["resources"] = json_to_resources(resources)
except (TuneError, ValueError) as exc:
raise TuneError("Error parsing resources_per_trial",
resources) from exc
return Trial(
# Submitting trial via server in py2.7 creates Unicode, which does not
# convert to string in a straightforward manner.
+2 -1
View File
@@ -25,7 +25,8 @@ def NamespacedKubernetesSyncer(namespace):
Args:
namespace (str): Kubernetes namespace.
Returns: A ``KubernetesSyncer`` class to be passed to ``tune.run()``.
Returns:
A ``KubernetesSyncer`` class to be passed to ``tune.run()``.
Example:
+123 -23
View File
@@ -7,8 +7,10 @@ import random
import time
import traceback
from contextlib import contextmanager
from typing import List, Optional
import ray
from ray.actor import ActorHandle
from ray.exceptions import GetTimeoutError
from ray import ray_constants
from ray.resource_spec import ResourceSpec
@@ -18,10 +20,12 @@ from ray.tune.function_runner import FunctionRunner
from ray.tune.logger import NoopLogger
from ray.tune.result import TRIAL_INFO, STDOUT_FILE, STDERR_FILE
from ray.tune.resources import Resources
from ray.tune.utils.placement_groups import PlacementGroupManager
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.trial import Trial, Checkpoint, Location, TrialInfo
from ray.tune.trial_executor import TrialExecutor
from ray.tune.utils import warn_if_slow
from ray.util.placement_group import PlacementGroup, remove_placement_group
logger = logging.getLogger(__name__)
@@ -88,11 +92,14 @@ class _TrialCleanup:
is passed, cleanup will kick in and remove futures.
"""
def __init__(self, threshold=TRIAL_CLEANUP_THRESHOLD):
def __init__(self, threshold: int = TRIAL_CLEANUP_THRESHOLD):
self.threshold = threshold
self._cleanup_map = {}
def add(self, trial, actor):
def add(self,
trial: Trial,
actor: ActorHandle,
placement_group: Optional[PlacementGroup] = None):
"""Adds a trial actor to be stopped.
If the number of futures exceeds the threshold, the cleanup mechanism
@@ -101,15 +108,20 @@ class _TrialCleanup:
Args:
trial (Trial): The trial corresponding to the future.
actor (ActorHandle): Handle to the trainable to be stopped.
placement_group (PlacementGroup): Placement group to stop.
"""
future = actor.stop.remote()
actor.__ray_terminate__.remote()
if placement_group:
remove_placement_group(placement_group)
else:
actor.__ray_terminate__.remote()
self._cleanup_map[future] = trial
if len(self._cleanup_map) > self.threshold:
self.cleanup(partial=True)
def cleanup(self, partial=True):
def cleanup(self, partial: bool = True):
"""Waits for cleanup to finish.
If partial=False, all futures are expected to return. If a future
@@ -141,10 +153,10 @@ class RayTrialExecutor(TrialExecutor):
"""An implementation of TrialExecutor based on Ray."""
def __init__(self,
queue_trials=False,
reuse_actors=False,
ray_auto_init=None,
refresh_period=None):
queue_trials: bool = False,
reuse_actors: bool = False,
ray_auto_init: Optional[bool] = None,
refresh_period: Optional[float] = None):
if ray_auto_init is None:
if os.environ.get("TUNE_DISABLE_AUTO_INIT") == "1":
logger.info("'TUNE_DISABLE_AUTO_INIT=1' detected.")
@@ -168,6 +180,9 @@ class RayTrialExecutor(TrialExecutor):
self._avail_resources = Resources(cpu=0, gpu=0)
self._committed_resources = Resources(cpu=0, gpu=0)
self._pg_manager = PlacementGroupManager()
self._staged_trials = set()
self._resources_initialized = False
if refresh_period is None:
@@ -188,6 +203,49 @@ class RayTrialExecutor(TrialExecutor):
if ray.is_initialized():
self._update_avail_resources()
def in_staging_grace_period(self) -> bool:
"""Returns True if trials have recently been staged."""
return self._pg_manager.in_staging_grace_period()
def stage_and_update_status(self, trials: List[Trial]):
"""Check and update statuses of scheduled placement groups.
Stages placement groups of all trials.
"""
for trial in trials:
if trial.status != Trial.PENDING:
continue
if not trial.uses_placement_groups:
continue
if trial in self._staged_trials:
continue
if self._pg_manager.trial_in_use(trial):
continue
if not self._pg_manager.stage_trial_pg(
trial.placement_group_factory):
# Break if we reached the limit of pending placement groups.
break
self._staged_trials.add(trial)
self._pg_manager.update_status()
def get_staged_trial(self):
"""Get a trial whose placement group was successfully staged.
Can also return None if no trial is available.
Returns:
Trial object or None.
"""
for trial in self._staged_trials:
if self._pg_manager.has_ready(trial.placement_group_factory):
return trial
return None
def _setup_remote_runner(self, trial, reuse_allowed):
trial.init_logdir()
# We checkpoint metadata here to try mitigating logdir duplication
@@ -212,16 +270,31 @@ class RayTrialExecutor(TrialExecutor):
logger.debug("Cannot reuse cached runner {} for new trial".format(
self._cached_actor))
with self._change_working_directory(trial):
self._trial_cleanup.add(trial, actor=self._cached_actor)
pg = self._pg_manager.clean_trial_placement_group(trial)
self._trial_cleanup.add(
trial, actor=self._cached_actor, placement_group=pg)
self._cached_actor = None
_actor_cls = _class_cache.get(trial.get_trainable_cls())
full_actor_class = _actor_cls.options(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu,
memory=trial.resources.memory or None,
object_store_memory=trial.resources.object_store_memory or None,
resources=trial.resources.custom_resources)
if trial.uses_placement_groups:
if not self._pg_manager.has_ready(trial.placement_group_factory):
if trial not in self._staged_trials:
if self._pg_manager.stage_trial_pg(
trial.placement_group_factory):
self._staged_trials.add(trial)
return None
else:
full_actor_class = self._pg_manager.get_full_actor_cls(
trial, _actor_cls)
else:
full_actor_class = _actor_cls.options(
num_cpus=trial.resources.cpu,
num_gpus=trial.resources.gpu,
memory=trial.resources.memory or None,
object_store_memory=trial.resources.object_store_memory
or None,
resources=trial.resources.custom_resources)
# Clear the Trial's location (to be updated later on result)
# since we don't know where the remote runner is placed.
trial.set_location(Location())
@@ -285,7 +358,8 @@ class RayTrialExecutor(TrialExecutor):
trial_item = self._find_item(self._running, trial)
assert len(trial_item) < 2, trial_item
def _start_trial(self, trial, checkpoint=None, runner=None, train=True):
def _start_trial(self, trial, checkpoint=None, runner=None,
train=True) -> bool:
"""Starts trial and restores last result if trial was paused.
Args:
@@ -297,6 +371,9 @@ class RayTrialExecutor(TrialExecutor):
cached actor. If None, a new runner is created.
train (bool): Whether or not to start training.
Returns:
True if trial was started successfully, False otherwise.
See `RayTrialExecutor.restore` for possible errors raised.
"""
prior_status = trial.status
@@ -309,6 +386,8 @@ class RayTrialExecutor(TrialExecutor):
or issubclass(trial.get_trainable_cls(),
FunctionRunner)
runner = self._setup_remote_runner(trial, reuse_allowed)
if not runner:
return False
trial.set_runner(runner)
self.restore(trial, checkpoint)
self.set_status(trial, Trial.RUNNING)
@@ -320,6 +399,7 @@ class RayTrialExecutor(TrialExecutor):
self._running[previous_run[0]] = trial
elif train and not trial.is_restoring:
self._train(trial)
return True
def _stop_trial(self, trial, error=False, error_msg=None):
"""Stops this trial.
@@ -344,15 +424,17 @@ class RayTrialExecutor(TrialExecutor):
self._cached_actor = trial.runner
else:
logger.debug("Trial %s: Destroying actor.", trial)
pg = self._pg_manager.clean_trial_placement_group(trial)
with self._change_working_directory(trial):
self._trial_cleanup.add(trial, actor=trial.runner)
self._trial_cleanup.add(
trial, actor=trial.runner, placement_group=pg)
except Exception:
logger.exception("Trial %s: Error stopping runner.", trial)
self.set_status(trial, Trial.ERROR)
finally:
trial.set_runner(None)
def start_trial(self, trial, checkpoint=None, train=True):
def start_trial(self, trial, checkpoint=None, train=True) -> bool:
"""Starts the trial.
Will not return resources if trial repeatedly fails on start.
@@ -362,16 +444,21 @@ class RayTrialExecutor(TrialExecutor):
checkpoint (Checkpoint): A Python object or path storing the state
of trial.
train (bool): Whether or not to start training.
Returns:
True if trial was started successfully, False otherwise.
"""
self._commit_resources(trial.resources)
if not trial.uses_placement_groups:
self._commit_resources(trial.resources)
try:
self._start_trial(trial, checkpoint, train=train)
return self._start_trial(trial, checkpoint, train=train)
except AbortTrialExecution:
logger.exception("Trial %s: Error starting runner, aborting!",
trial)
time.sleep(2)
error_msg = traceback.format_exc()
self._stop_trial(trial, error=True, error_msg=error_msg)
return False
except Exception:
logger.exception("Trial %s: Unexpected error starting runner.",
trial)
@@ -380,6 +467,7 @@ class RayTrialExecutor(TrialExecutor):
self._stop_trial(trial, error=True, error_msg=error_msg)
# Note that we don't return the resources, since they may
# have been lost. TODO(ujvl): is this the right thing to do?
return False
def _find_item(self, dictionary, item):
out = [rid for rid, t in dictionary.items() if t is item]
@@ -391,7 +479,8 @@ class RayTrialExecutor(TrialExecutor):
self._stop_trial(trial, error=error, error_msg=error_msg)
if prior_status == Trial.RUNNING:
logger.debug("Trial %s: Returning resources.", trial)
self._return_resources(trial.resources)
if not trial.uses_placement_groups:
self._return_resources(trial.resources)
out = self._find_item(self._running, trial)
for result_id in out:
self._running.pop(result_id)
@@ -478,7 +567,9 @@ class RayTrialExecutor(TrialExecutor):
return trial
return None
def get_next_available_trial(self):
def get_next_available_trial(self, timeout: Optional[float] = None):
if not self._running:
return None
shuffled_results = list(self._running.keys())
random.shuffle(shuffled_results)
# Note: We shuffle the results because `ray.wait` by default returns
@@ -486,7 +577,10 @@ class RayTrialExecutor(TrialExecutor):
# trials (i.e. trials that run remotely) also get fairly reported.
# See https://github.com/ray-project/ray/issues/4211 for details.
start = time.time()
[result_id], _ = ray.wait(shuffled_results)
ready, _ = ray.wait(shuffled_results, timeout=timeout)
if not ready:
return None
result_id = ready[0]
wait_time = time.time() - start
if wait_time > NONTRIVIAL_WAIT_TIME_THRESHOLD_S:
self._last_nontrivial_wait = time.time()
@@ -540,6 +634,9 @@ class RayTrialExecutor(TrialExecutor):
custom_resources=custom_resources)
def _return_resources(self, resources):
if resources.has_placement_group:
return
committed = self._committed_resources
all_keys = set(resources.custom_resources).union(
@@ -611,6 +708,9 @@ class RayTrialExecutor(TrialExecutor):
has exceeded self._refresh_period. This also assumes that the
cluster is not resizing very frequently.
"""
if resources.has_placement_group:
return self._pg_manager.can_stage()
self._update_avail_resources()
currently_available = Resources.subtract(self._avail_resources,
self._committed_resources)
+21 -5
View File
@@ -3,6 +3,8 @@ import logging
import json
from numbers import Number
# For compatibility under py2 to consider unicode as str
from typing import Optional
from six import string_types
import ray
@@ -15,7 +17,7 @@ class Resources(
namedtuple("Resources", [
"cpu", "gpu", "memory", "object_store_memory", "extra_cpu",
"extra_gpu", "extra_memory", "extra_object_store_memory",
"custom_resources", "extra_custom_resources"
"custom_resources", "extra_custom_resources", "has_placement_group"
])):
"""Ray resources required to schedule a trial.
@@ -38,6 +40,8 @@ class Resources(
extra_custom_resources (dict): Extra custom resources to reserve in
case the trial needs to launch additional Ray actors that use
any of these custom resources.
has_placement_group (bool): Bool indicating if the trial also
has an associated placement group.
"""
@@ -53,7 +57,8 @@ class Resources(
extra_memory=0,
extra_object_store_memory=0,
custom_resources=None,
extra_custom_resources=None):
extra_custom_resources=None,
has_placement_group=False):
custom_resources = custom_resources or {}
extra_custom_resources = extra_custom_resources or {}
leftovers = set(custom_resources) ^ set(extra_custom_resources)
@@ -92,7 +97,7 @@ class Resources(
return super(Resources, cls).__new__(
cls, cpu, gpu, memory, object_store_memory, extra_cpu, extra_gpu,
extra_memory, extra_object_store_memory, custom_resources,
extra_custom_resources)
extra_custom_resources, has_placement_group)
def summary_string(self):
summary = "{} CPUs, {} GPUs".format(self.cpu + self.extra_cpu,
@@ -171,11 +176,22 @@ class Resources(
return resources_to_json(self)
def json_to_resources(data):
class PlacementGroupFactory:
"""Wrapper class to identify placement group factory methods."""
def __init__(self, factory):
self._factory = factory
def __call__(self, *args, **kwargs):
return self._factory(*args, **kwargs)
def json_to_resources(data: Optional[str]):
if data is None or data == "null":
return None
if isinstance(data, string_types):
data = json.loads(data)
for k in data:
if k in ["driver_cpu_limit", "driver_gpu_limit"]:
raise TuneError(
@@ -193,7 +209,7 @@ def json_to_resources(data):
data.get("extra_custom_resources"))
def resources_to_json(resources):
def resources_to_json(resources: Optional[Resources]):
if resources is None:
return None
return {
@@ -3,6 +3,7 @@ import unittest
from unittest.mock import patch
import ray
from ray import tune
from ray.rllib import _register_all
from ray.tune import Trainable
from ray.tune.ray_trial_executor import RayTrialExecutor
@@ -12,6 +13,7 @@ from ray.tune.suggest import BasicVariantGenerator
from ray.tune.trial import Trial, Checkpoint
from ray.tune.resources import Resources
from ray.cluster_utils import Cluster
from ray.util import placement_group
class RayTrialExecutorTest(unittest.TestCase):
@@ -270,6 +272,87 @@ class RayExecutorQueueTest(unittest.TestCase):
self.trial_executor.has_resources(cpu_only_trial3.resources))
class RayExecutorPlacementGroupTest(unittest.TestCase):
def setUp(self):
self.head_cpus = 8
self.head_gpus = 4
self.head_custom = 16
self.cluster = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": self.head_cpus,
"num_gpus": self.head_gpus,
"resources": {
"custom": self.head_custom
},
"_system_config": {
"num_heartbeats_timeout": 10
}
})
# Pytest doesn't play nicely with imports
_register_all()
def tearDown(self):
ray.shutdown()
self.cluster.shutdown()
_register_all() # re-register the evicted objects
def testResourcesAvailableNoPlacementGroup(self):
def train(config):
tune.report(metric=0, resources=ray.available_resources())
out = tune.run(
train,
resources_per_trial={
"cpu": 1,
"gpu": 1,
"custom_resources": {
"custom": 3
},
"extra_cpu": 3,
"extra_gpu": 1,
"extra_custom_resources": {
"custom": 4
},
})
# Only `cpu`, `gpu`, and `custom_resources` will be "really" reserved,
# the extra_* will just be internally reserved by Tune.
self.assertDictEqual({
key: val
for key, val in out.trials[0].last_result["resources"].items()
if key in ["CPU", "GPU", "custom"]
}, {
"CPU": self.head_cpus - 1.0,
"GPU": self.head_gpus - 1.0,
"custom": self.head_custom - 3.0
})
def testResourcesAvailableWithPlacementGroup(self):
def train(config):
tune.report(metric=0, resources=ray.available_resources())
def placement_group_factory():
head_bundle = {"CPU": 1, "GPU": 0, "custom": 4}
child_bundle = {"CPU": 2, "GPU": 1, "custom": 3}
return placement_group([head_bundle, child_bundle, child_bundle])
out = tune.run(train, resources_per_trial=placement_group_factory)
self.assertDictEqual({
key: val
for key, val in out.trials[0].last_result["resources"].items()
if key in ["CPU", "GPU", "custom"]
}, {
"CPU": self.head_cpus - 5.0,
"GPU": self.head_gpus - 2.0,
"custom": self.head_custom - 10.0
})
class LocalModeExecutorTest(RayTrialExecutorTest):
def setUp(self):
ray.init(local_mode=True)
+3 -2
View File
@@ -264,7 +264,7 @@ class TrialRunnerTest(unittest.TestCase):
if result["training_iteration"] == 1:
executor = trial_runner.trial_executor
executor.stop_trial(trial)
trial.update_resources(2, 0)
trial.update_resources(dict(cpu=2, gpu=0))
executor.start_trial(trial)
return TrialScheduler.CONTINUE
@@ -282,7 +282,8 @@ class TrialRunnerTest(unittest.TestCase):
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(runner.trial_executor._committed_resources.cpu, 1)
self.assertRaises(ValueError, lambda: trials[0].update_resources(2, 0))
self.assertRaises(
ValueError, lambda: trials[0].update_resources(dict(cpu=2, gpu=0)))
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
+166 -1
View File
@@ -1,3 +1,4 @@
import time
from collections import Counter
import os
import pickle
@@ -6,11 +7,15 @@ import sys
import tempfile
import unittest
from unittest.mock import patch
import numpy as np
import ray
from ray.cluster_utils import Cluster
from ray.rllib import _register_all
from ray.tune import TuneError
from ray import tune
from ray.tune import Callback, TuneError
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.result import TRAINING_ITERATION
from ray.tune.schedulers import TrialScheduler, FIFOScheduler
from ray.tune.experiment import Experiment
@@ -21,6 +26,7 @@ from ray.tune.suggest.repeater import Repeater
from ray.tune.suggest._mock import _MockSuggestionAlgorithm
from ray.tune.suggest.suggestion import Searcher, ConcurrencyLimiter
from ray.tune.suggest.search_generator import SearchGenerator
from ray.util import placement_group
class TrialRunnerTest3(unittest.TestCase):
@@ -921,6 +927,165 @@ class ResourcesTest(unittest.TestCase):
self.assertEqual(original, new_resource)
class TrialRunnerPlacementGroupTest(unittest.TestCase):
def setUp(self):
os.environ["TUNE_GLOBAL_CHECKPOINT_S"] = "10000"
self.head_cpus = 8
self.head_gpus = 4
self.head_custom = 16
self.cluster = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": self.head_cpus,
"num_gpus": self.head_gpus,
"resources": {
"custom": self.head_custom
},
"_system_config": {
"num_heartbeats_timeout": 10
}
})
# Pytest doesn't play nicely with imports
_register_all()
def tearDown(self):
ray.shutdown()
self.cluster.shutdown()
_register_all() # re-register the evicted objects
def testPlacementGroupRequests(self, scheduled=10):
"""In this test we try to start 10 trials but only have resources
for 2. Placement groups should still be created and PENDING.
Eventually they should be scheduled sequentially (i.e. in pairs
of two)."""
def train(config):
time.sleep(1)
now = time.time()
tune.report(end=now - config["start_time"])
def placement_group_factory():
head_bundle = {"CPU": 4, "GPU": 0, "custom": 0}
child_bundle = {"custom": 1}
return placement_group([head_bundle, child_bundle, child_bundle])
trial_executor = RayTrialExecutor()
this = self
class _TestCallback(Callback):
def on_step_end(self, iteration, trials, **info):
if iteration == 1:
this.assertEqual(scheduled, len(trials))
this.assertEqual(
scheduled,
sum(
len(s) for s in
trial_executor._pg_manager._staging.values()) +
sum(
len(s)
for s in trial_executor._pg_manager._ready.values(
)) + len(trial_executor._pg_manager._in_use_pgs))
start = time.time()
out = tune.run(
train,
config={"start_time": start},
resources_per_trial=placement_group_factory,
num_samples=10,
trial_executor=trial_executor,
callbacks=[_TestCallback()])
trial_end_times = sorted(t.last_result["end"] for t in out.trials)
print("Trial end times:", trial_end_times)
max_diff = trial_end_times[-1] - trial_end_times[0]
# Not all trials have been run in parallel
self.assertGreater(max_diff, 5)
# Some trials should have run in parallel
self.assertLess(max_diff, 10)
@patch("ray.tune.trial_runner.TUNE_MAX_PENDING_TRIALS_PG", 6)
@patch("ray.tune.utils.placement_groups.TUNE_MAX_PENDING_TRIALS_PG", 6)
def testPlacementGroupLimitedRequests(self):
"""Assert that maximum number of placement groups is enforced."""
self.testPlacementGroupRequests(scheduled=6)
def testPlacementGroupDistributedTraining(self):
"""Run distributed training using placement groups.
Each trial requests 4 CPUs and starts 4 remote training workers.
"""
def placement_group_factory():
head_bundle = {"CPU": 1, "GPU": 0, "custom": 0}
child_bundle = {"CPU": 1}
return placement_group(
[head_bundle, child_bundle, child_bundle, child_bundle])
@ray.remote
class TrainingActor:
def train(self, val):
time.sleep(1)
return val
def train(config):
base = config["base"]
actors = [TrainingActor.remote() for _ in range(4)]
futures = [
actor.train.remote(base + 2 * i)
for i, actor in enumerate(actors)
]
results = ray.get(futures)
end = time.time() - config["start_time"]
tune.report(avg=np.mean(results), end=end)
trial_executor = RayTrialExecutor()
start = time.time()
out = tune.run(
train,
config={
"start_time": start,
"base": tune.grid_search(list(range(0, 100, 10)))
},
resources_per_trial=placement_group_factory,
num_samples=1,
trial_executor=trial_executor)
avgs = sorted(t.last_result["avg"] for t in out.trials)
self.assertSequenceEqual(avgs, list(range(3, 103, 10)))
trial_end_times = sorted(t.last_result["end"] for t in out.trials)
print("Trial end times:", trial_end_times)
max_diff = trial_end_times[-1] - trial_end_times[0]
# Not all trials have been run in parallel
self.assertGreater(max_diff, 5)
# Some trials should have run in parallel
# Todo: Re-enable when using buildkite
# self.assertLess(max_diff, 10)
# Assert proper cleanup
pg_manager = trial_executor._pg_manager
self.assertFalse(pg_manager._in_use_trials)
self.assertFalse(pg_manager._in_use_pgs)
self.assertFalse(pg_manager._staging_futures)
for pgf in pg_manager._staging:
self.assertFalse(pg_manager._staging[pgf])
for pgf in pg_manager._ready:
self.assertFalse(pg_manager._ready[pgf])
self.assertTrue(pg_manager._latest_staging_start_time)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
@@ -64,7 +64,7 @@ class _MockTrialExecutor(RayTrialExecutor):
def fetch_result(self, trial):
return [self.results.get(trial, {})]
def get_next_available_trial(self):
def get_next_available_trial(self, timeout=None):
return self.next_trial or super().get_next_available_trial()
def get_next_failed_trial(self):
+31 -4
View File
@@ -1,5 +1,5 @@
from typing import Callable, Dict, Sequence, Union
import json
from typing import Sequence
import ray.cloudpickle as cloudpickle
from collections import deque
@@ -18,7 +18,8 @@ from ray.tune.checkpoint_manager import Checkpoint, CheckpointManager
# have been defined yet. See https://github.com/ray-project/ray/issues/1716.
from ray.tune.registry import get_trainable_cls, validate_trainable
from ray.tune.result import DEFAULT_RESULTS_DIR, DONE, TRAINING_ITERATION
from ray.tune.resources import Resources, json_to_resources, resources_to_json
from ray.tune.resources import PlacementGroupFactory, Resources, \
json_to_resources, resources_to_json
from ray.tune.utils.serialization import TuneFunctionEncoder
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.utils import date_str, flatten_dict
@@ -179,6 +180,7 @@ class Trial:
evaluated_params=None,
experiment_tag="",
resources=None,
placement_group_factory=None,
stopping_criterion=None,
remote_checkpoint_dir=None,
checkpoint_freq=0,
@@ -221,6 +223,12 @@ class Trial:
resources = default_resources
self.location = Location()
self.resources = resources or Resources(cpu=1, gpu=0)
self.placement_group_factory = placement_group_factory
if self.placement_group_factory:
resource_kwargs = self.resources._asdict()
resource_kwargs["has_placement_group"] = True
self.resources = Resources(**resource_kwargs)
self.stopping_criterion = stopping_criterion or {}
self.log_to_file = log_to_file
@@ -330,6 +338,10 @@ class Trial:
logdir_name = os.path.basename(self.logdir)
return os.path.join(self.remote_checkpoint_dir_prefix, logdir_name)
@property
def uses_placement_groups(self):
return bool(self.placement_group_factory)
def reset(self):
return Trial(
self.trainable_name,
@@ -339,6 +351,7 @@ class Trial:
evaluated_params=self.evaluated_params,
experiment_tag=self.experiment_tag,
resources=self.resources,
placement_group_factory=self.placement_group_factory,
stopping_criterion=self.stopping_criterion,
remote_checkpoint_dir=self.remote_checkpoint_dir,
checkpoint_freq=self.checkpoint_freq,
@@ -362,7 +375,8 @@ class Trial:
os.makedirs(self.logdir, exist_ok=True)
self.invalidate_json_state()
def update_resources(self, cpu, gpu, **kwargs):
def update_resources(
self, resources: Union[Dict, Callable, PlacementGroupFactory]):
"""EXPERIMENTAL: Updates the resource requirements.
Should only be called when the trial is not running.
@@ -372,7 +386,20 @@ class Trial:
"""
if self.status is Trial.RUNNING:
raise ValueError("Cannot update resources while Trial is running.")
self.resources = Resources(cpu, gpu, **kwargs)
if isinstance(resources, PlacementGroupFactory):
self.placement_group_factory = resources
elif callable(resources):
self.placement_group_factory = PlacementGroupFactory(resources)
else:
self.resources = Resources(**resources)
self.placement_group_factory = None
if self.placement_group_factory and \
not self.resources.has_placement_group:
resource_kwargs = self.resources._asdict()
resource_kwargs["has_placement_group"] = True
self.resources = Resources(**resource_kwargs)
self.invalidate_json_state()
def set_runner(self, runner):
+11 -2
View File
@@ -15,7 +15,7 @@ class TrialExecutor:
and starting/stopping trials.
"""
def __init__(self, queue_trials=False):
def __init__(self, queue_trials: bool = False):
"""Initializes a new TrialExecutor.
Args:
@@ -78,7 +78,7 @@ class TrialExecutor:
raise NotImplementedError("Subclasses of TrialExecutor must provide "
"has_resources() method")
def start_trial(self, trial, checkpoint=None, train=True):
def start_trial(self, trial, checkpoint=None, train=True) -> bool:
"""Starts the trial restoring from checkpoint if checkpoint is provided.
Args:
@@ -86,6 +86,9 @@ class TrialExecutor:
checkpoint (Checkpoint): A Python object or path storing the state
of trial.
train (bool): Whether or not to start training.
Returns:
True if trial started successfully, False otherwise.
"""
raise NotImplementedError("Subclasses of TrialExecutor must provide "
"start_trial() method")
@@ -165,6 +168,8 @@ class TrialExecutor:
if self._queue_trials:
return
for trial in trial_runner.get_trials():
if trial.uses_placement_groups:
return
if trial.status == Trial.PENDING:
if not self.has_resources(trial.resources):
resource_string = trial.resources.summary_string()
@@ -275,3 +280,7 @@ class TrialExecutor:
def cleanup(self, trial):
"""Ensures that trials are cleaned up after stopping."""
pass
def in_staging_grace_period(self) -> bool:
"""Returns True if trials have recently been staged."""
return False
+69 -15
View File
@@ -1,3 +1,5 @@
from typing import Optional
import click
from datetime import datetime
import json
@@ -20,6 +22,7 @@ from ray.tune.schedulers import FIFOScheduler, TrialScheduler
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.utils import warn_if_slow, flatten_dict, env_integer
from ray.tune.utils.log import Verbosity, has_verbosity
from ray.tune.utils.placement_groups import TUNE_MAX_PENDING_TRIALS_PG
from ray.tune.utils.serialization import TuneFunctionDecoder, \
TuneFunctionEncoder
from ray.tune.web_server import TuneServer
@@ -108,6 +111,11 @@ class TrialRunner:
self._search_alg = search_alg or BasicVariantGenerator()
self._scheduler_alg = scheduler or FIFOScheduler()
self.trial_executor = trial_executor or RayTrialExecutor()
self._pending_trial_queue_times = {}
# Setting this to 0 still allows adding one new (pending) trial,
# but it will prevent us from trying to fill the trial list
self._max_pending_trials = 0 # Can be updated in `self.add_trial()`
self._metric = metric
@@ -142,6 +150,7 @@ class TrialRunner:
self._trials = []
self._cached_trial_decisions = {}
self._queued_trial_decisions = {}
self._stop_queue = []
self._should_stop_experiment = False # used by TuneServer
self._local_checkpoint_dir = local_checkpoint_dir
@@ -349,18 +358,50 @@ class TrialRunner:
with warn_if_slow("callbacks.on_step_begin"):
self._callbacks.on_step_begin(
iteration=self._iteration, trials=self._trials)
# This will contain the next trial to start
next_trial = self._get_next_trial() # blocking
if next_trial is not None:
# Create pending trials
num_pending_trials = len(
[t for t in self._trials if t.status == Trial.PENDING])
while num_pending_trials < self._max_pending_trials:
if not self._update_trial_queue(blocking=False):
break
num_pending_trials += 1
# Update status of staged placement groups
self.trial_executor.stage_and_update_status(self._trials)
def _start_trial(trial: Trial) -> bool:
"""Helper function to start trial and call callbacks"""
with warn_if_slow("start_trial"):
self.trial_executor.start_trial(next_trial)
self._callbacks.on_trial_start(
iteration=self._iteration,
trials=self._trials,
trial=next_trial)
elif self.trial_executor.get_running_trials():
self._process_events() # blocking
else:
self.trial_executor.on_no_available_trials(self)
if self.trial_executor.start_trial(trial):
self._callbacks.on_trial_start(
iteration=self._iteration,
trials=self._trials,
trial=trial)
return True
return False
may_handle_events = True
if next_trial is not None:
if _start_trial(next_trial):
may_handle_events = False
else:
next_trial = self.trial_executor.get_staged_trial()
if next_trial is not None:
if _start_trial(next_trial):
may_handle_events = False
if may_handle_events:
if self.trial_executor.get_running_trials():
timeout = None
if self.trial_executor.in_staging_grace_period():
timeout = 0.1
self._process_events(timeout=timeout) # blocking
else:
self.trial_executor.on_no_available_trials(self)
self._stop_experiment_if_needed()
@@ -410,6 +451,9 @@ class TrialRunner:
Args:
trial (Trial): Trial to queue.
"""
if trial.uses_placement_groups:
self._max_pending_trials = TUNE_MAX_PENDING_TRIALS_PG
self._trials.append(trial)
with warn_if_slow("scheduler.on_trial_add"):
self._scheduler_alg.on_trial_add(self, trial)
@@ -462,7 +506,7 @@ class TrialRunner:
logger.debug("Running trial {}".format(trial))
return trial
def _process_events(self):
def _process_events(self, timeout: Optional[float] = None):
with warn_if_slow("get_next_failed_trial"):
failed_trial = self.trial_executor.get_next_failed_trial()
if failed_trial:
@@ -475,8 +519,10 @@ class TrialRunner:
else:
# TODO(ujvl): Consider combining get_next_available_trial and
# fetch_result functionality so that we don't timeout on fetch.
trial = self.trial_executor.get_next_available_trial() # blocking
trial = self.trial_executor.get_next_available_trial(
timeout=timeout) # blocking
if not trial:
return
if trial.is_restoring:
with warn_if_slow("process_trial_restore"):
self._process_trial_restore(trial)
@@ -882,7 +928,8 @@ class TrialRunner:
with warn_if_slow("scheduler.on_trial_add"):
self._scheduler_alg.on_trial_add(self, trial)
def _update_trial_queue(self, blocking=False, timeout=600):
def _update_trial_queue(self, blocking: bool = False,
timeout: int = 600) -> bool:
"""Adds next trials to queue if possible.
Note that the timeout is currently unexposed to the user.
@@ -891,6 +938,9 @@ class TrialRunner:
blocking (bool): Blocks until either a trial is available
or is_finished (timeout or search algorithm finishes).
timeout (int): Seconds before blocking times out.
Returns:
Boolean indicating if a new trial was created or not.
"""
trial = self._search_alg.next_trial()
if blocking and not trial:
@@ -906,6 +956,9 @@ class TrialRunner:
if trial:
self.add_trial(trial)
return True
return False
def request_stop_trial(self, trial):
self._stop_queue.append(trial)
@@ -974,7 +1027,8 @@ class TrialRunner:
state = self.__dict__.copy()
for k in [
"_trials", "_stop_queue", "_server", "_search_alg",
"_scheduler_alg", "trial_executor", "_syncer", "_callbacks"
"_scheduler_alg", "_pending_trial_queue_times",
"trial_executor", "_syncer", "_callbacks"
]:
del state[k]
state["launch_web_server"] = bool(self._server)
+4 -3
View File
@@ -160,10 +160,11 @@ def run(
config (dict): Algorithm-specific configuration for Tune variant
generation (e.g. env, hyperparams). Defaults to empty dict.
Custom search algorithms may ignore this.
resources_per_trial (dict): Machine resources to allocate per trial,
e.g. ``{"cpu": 64, "gpu": 8}``. Note that GPUs will not be
resources_per_trial (dict|Callable): Machine resources to allocate per
trial, e.g. ``{"cpu": 64, "gpu": 8}``. Note that GPUs will not be
assigned unless you specify them here. Defaults to 1 CPU and 0
GPUs in ``Trainable.default_resource_request()``.
GPUs in ``Trainable.default_resource_request()``. This can also
be a function returning a placement group.
num_samples (int): Number of times to sample from the
hyperparameter space. Defaults to 1. If `grid_search` is
provided as an argument, the grid will be repeated
+201
View File
@@ -0,0 +1,201 @@
from collections import defaultdict
from typing import Dict, Optional, Set, Tuple
import os
import time
import ray
from ray import ObjectRef
from ray.actor import ActorClass
from ray.tune.resources import PlacementGroupFactory
from ray.tune.trial import Trial
from ray.util.placement_group import PlacementGroup
TUNE_MAX_PENDING_TRIALS_PG = int(os.getenv("TUNE_MAX_PENDING_TRIALS_PG", 1000))
# Seconds we wait for a trial to come up before we make blocking calls
# to process events
TUNE_TRIAL_STARTUP_GRACE_PERIOD = float(
os.getenv("TUNE_TRIAL_STARTUP_GRACE_PERIOD", 10.))
class PlacementGroupManager:
"""PlacementGroupManager to stage and manage placement groups.
This class schedules placement groups for trials, keeps track of
their state, and can return a fully configured actor class using
this placement group.
If two trials share the same placement group factory, both could use
resulting placement groups from it. Thus this manager associates
placement groups with their factory methods.
"""
def __init__(self):
# Sets of staged placement groups by factory
self._staging: Dict[PlacementGroupFactory, Set[
PlacementGroup]] = defaultdict(set)
# Sets of ready and unused placement groups by factory
self._ready: Dict[PlacementGroupFactory, Set[
PlacementGroup]] = defaultdict(set)
# Ray futures to check if a placement group is ready
self._staging_futures: Dict[ObjectRef, Tuple[PlacementGroupFactory,
PlacementGroup]] = {}
# Placement groups used by trials
self._in_use_pgs: Dict[PlacementGroup, Trial] = {}
self._in_use_trials: Dict[Trial, PlacementGroup] = {}
# Latest PG staging time to check if still in grace period.
self._latest_staging_start_time = time.time()
def stage_trial_pg(self, pgf: PlacementGroupFactory):
"""Stage a trial placement group.
Create the trial placement group if maximum number of pending
placement groups is not exhausted.
Args:
pgf (PlacementGroupFactory): Placement group factory to stage.
Returns:
False if placement group has not been staged, True otherwise.
Creates placement group and moves it to `self._staging`.
"""
if not self.can_stage():
return False
pg = pgf() # This creates the placement group
self._staging[pgf].add(pg)
self._staging_futures[pg.ready()] = (pgf, pg)
self._latest_staging_start_time = time.time()
return True
def can_stage(self):
"""Return True if we can stage another placement group."""
return len(self._staging) < TUNE_MAX_PENDING_TRIALS_PG
def update_status(self):
"""Update placement group status.
Moves ready placement groups from `self._staging` to
`self._ready`.
"""
ready = True
while ready:
# Use a loop as `ready` might return futures one by one
ready, _ = ray.wait(list(self._staging_futures.keys()), timeout=0)
for ready_fut in ready:
ready_pgf, ready_pg = self._staging_futures.pop(ready_fut)
self._staging[ready_pgf].remove(ready_pg)
self._ready[ready_pgf].add(ready_pg)
def get_full_actor_cls(self, trial: Trial,
actor_cls: ActorClass) -> Optional[ActorClass]:
"""Get a fully configured actor class.
Returns the actor handle if the placement group is ready. In this case,
the placement group is moved to `self._in_use_pgs` and removed from
`self._ready`.
Args:
trial (Trial): Trial object to start
actor_cls: Ray actor class.
Returns:
Configured ActorClass or None
"""
pgf = trial.placement_group_factory
if not self._ready[pgf]:
return None
pg = self._ready[pgf].pop()
self._in_use_pgs[pg] = trial
self._in_use_trials[trial] = pg
# We still have to pass resource specs
# Pass the full resource specs of the first bundle per default
first_bundle = pg.bundle_specs[0].copy()
num_cpus = first_bundle.pop("CPU", None)
num_gpus = first_bundle.get("GPU", None)
# Only custom resources remain in `first_bundle`
resources = first_bundle or None
return actor_cls.options(
placement_group=pg,
placement_group_bundle_index=0,
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=resources)
def has_ready(self, pgf: PlacementGroupFactory) -> bool:
"""Return True if placement group is ready.
Args:
pgf (PlacementGroupFactory): PlacementGroupFactory object.
Returns:
Boolean.
"""
return bool(self._ready[pgf])
def trial_in_use(self, trial: Trial):
return trial in self._in_use_trials
def clean_trial_placement_group(self,
trial: Trial) -> Optional[PlacementGroup]:
"""Remove reference to placement groups associated with a trial.
Returns an associated placement group. If the trial was scheduled, this
is the placement group it was scheduled on. If the trial was not
scheduled, it will first try to return a staging placement group. If
there is no staging placement group, it will return a ready placement
group that is not yet being used by another trial.
Args:
trial (Trial): Trial object.
Returns:
PlacementGroup or None.
"""
pgf = trial.placement_group_factory
trial_pg = None
if trial in self._in_use_trials:
# Trial was in use. Just return its placement group.
trial_pg = self._in_use_trials.pop(trial)
self._in_use_pgs.pop(trial_pg)
else:
# Trial was not in use. If there are pending placement groups
# in staging, pop a random one.
if self._staging[pgf]:
trial_pg = self._staging[pgf].pop()
# For staging placement groups, we will also need to
# remove the future.
trial_future = None
for future, (pgf, pg) in self._staging_futures.items():
if pg == trial_pg:
trial_future = future
break
del self._staging_futures[trial_future]
elif self._ready[pgf]:
# Otherwise, return an unused ready placement group.
trial_pg = self._ready[pgf].pop()
return trial_pg
def in_staging_grace_period(self):
return self._staging_futures and time.time(
) <= self._latest_staging_start_time + TUNE_TRIAL_STARTUP_GRACE_PERIOD
+2 -1
View File
@@ -563,7 +563,8 @@ def create_logdir(dirname: str, local_dir: str):
dirname (str): Dirname to create in `local_dir`
local_dir (str): Root directory for the log dir
Returns: full path to the newly created logdir.
Returns:
full path to the newly created logdir.
"""
local_dir = os.path.expanduser(local_dir)
logdir = os.path.join(local_dir, dirname)
+3 -3
View File
@@ -1,6 +1,6 @@
import time
from typing import (List, Dict, Optional)
from typing import (List, Dict, Optional, Union)
import ray
from ray._raylet import PlacementGroupID, ObjectRef
@@ -83,10 +83,10 @@ class PlacementGroup:
placement_group_bundle_index=bundle_index,
resources=resources).remote(self)
def wait(self, timeout_seconds: int) -> bool:
def wait(self, timeout_seconds: Union[float, int]) -> bool:
"""Wait for the placement group to be ready within the specified time.
Args:
timeout_seconds(str): Timeout in seconds.
timeout_seconds(float|int): Timeout in seconds.
Return:
True if the placement group is created. False otherwise.
"""