diff --git a/python/ray/tune/tests/test_trainable_util.py b/python/ray/tune/tests/test_trainable_util.py index 25860eb1c..23dfb3573 100644 --- a/python/ray/tune/tests/test_trainable_util.py +++ b/python/ray/tune/tests/test_trainable_util.py @@ -1,10 +1,14 @@ +from collections import OrderedDict import os -import pickle +import sys import shutil import unittest +from unittest.mock import patch import ray.utils - +import ray.cloudpickle as cloudpickle +from ray.tune.utils.util import wait_for_gpu +from ray.tune.utils.util import unflatten_dict from ray.tune.utils.trainable import TrainableUtil @@ -12,13 +16,15 @@ class TrainableUtilTest(unittest.TestCase): def setUp(self): self.checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(), "tune", "MyTrainable123") - TrainableUtil.make_checkpoint_dir(self.checkpoint_dir) + self.checkpoint_dir = TrainableUtil.make_checkpoint_dir( + self.checkpoint_dir, "0") def tearDown(self): self.addCleanup(shutil.rmtree, self.checkpoint_dir) def testFindCheckpointDir(self): - checkpoint_path = os.path.join(self.checkpoint_dir, "my/nested/chkpt") + checkpoint_path = os.path.join(self.checkpoint_dir, + "0/my/nested/chkpt") os.makedirs(checkpoint_path) found_dir = TrainableUtil.find_checkpoint_dir(checkpoint_path) self.assertEquals(self.checkpoint_dir, found_dir) @@ -36,7 +42,7 @@ class TrainableUtilTest(unittest.TestCase): checkpoint_path = os.path.join(self.checkpoint_dir, "0") data_dict = TrainableUtil.pickle_checkpoint(checkpoint_path) - loaded = pickle.loads(data_dict) + loaded = cloudpickle.loads(data_dict) checkpoint_name = os.path.basename(checkpoint_path) self.assertEqual(loaded["checkpoint_name"], checkpoint_name) @@ -44,3 +50,94 @@ class TrainableUtilTest(unittest.TestCase): for i in range(5): path = os.path.join(self.checkpoint_dir, str(i)) self.assertEquals(loaded["data"][str(i)], open(path, "rb").read()) + + +class UnflattenDictTest(unittest.TestCase): + def test_output_type(self): + in_ = OrderedDict({"a/b": 1, "c/d": 2, "e": 3}) + out = unflatten_dict(in_) + assert type(in_) is type(out) + + def test_one_level_nested(self): + result = unflatten_dict({"a/b": 1, "c/d": 2, "e": 3}) + assert result == {"a": {"b": 1}, "c": {"d": 2}, "e": 3} + + def test_multi_level_nested(self): + result = unflatten_dict({"a/b/c/d": 1, "b/c/d": 2, "c/d": 3, "e": 4}) + assert result == { + "a": { + "b": { + "c": { + "d": 1, + }, + }, + }, + "b": { + "c": { + "d": 2, + }, + }, + "c": { + "d": 3, + }, + "e": 4, + } + + +class GPUUtilMock: + class GPU: + def __init__(self, id, uuid, util=None): + self.id = id + self.uuid = uuid + self.util = [0.5, 0.0] + + @property + def memoryUtil(self): + if self.util: + return self.util.pop(0) + return 0 + + def __init__(self, gpus, gpu_uuids): + self.gpus = gpus + self.uuids = gpu_uuids + self.gpu_list = [ + self.GPU(gpu, uuid) for gpu, uuid in zip(self.gpus, self.uuids) + ] + + def getGPUs(self): + return self.gpu_list + + +class GPUTest(unittest.TestCase): + def setUp(self): + sys.modules["GPUtil"] = GPUUtilMock([0, 1], ["GPU-aaa", "GPU-bbb"]) + + def testGPUWait1(self): + wait_for_gpu(0, delay_s=0) + + def testGPUWait2(self): + wait_for_gpu("1", delay_s=0) + + def testGPUWait3(self): + wait_for_gpu("GPU-aaa", delay_s=0) + + def testGPUWaitFail(self): + with self.assertRaises(ValueError): + wait_for_gpu(2, delay_s=0) + + with self.assertRaises(ValueError): + wait_for_gpu("4", delay_s=0) + + with self.assertRaises(ValueError): + wait_for_gpu(1.23, delay_s=0) + + @patch("ray.get_gpu_ids", lambda: ["0"]) + def testDefaultGPU(self): + import sys + sys.modules["GPUtil"] = GPUUtilMock([0], ["GPU-aaa"]) + wait_for_gpu(delay_s=0) + + +if __name__ == "__main__": + import pytest + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index 02daa858f..73c56a013 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -21,10 +21,14 @@ import psutil logger = logging.getLogger(__name__) -try: - import GPUtil -except ImportError: - GPUtil = None + +def _import_gputil(): + try: + import GPUtil + except ImportError: + GPUtil = None + return GPUtil + _pinned_objects = [] PINNED_OBJECT_PREFIX = "ray.tune.PinnedObject:" @@ -43,6 +47,8 @@ class UtilMonitor(Thread): def __init__(self, start=True, delay=0.7): self.stopped = True + GPUtil = _import_gputil() + self.GPUtil = GPUtil if GPUtil is None and start: logger.warning("Install gputil for GPU system monitoring.") @@ -67,10 +73,10 @@ class UtilMonitor(Thread): float(psutil.cpu_percent(interval=None))) self.values["ram_util_percent"].append( float(getattr(psutil.virtual_memory(), "percent"))) - if GPUtil is not None: + if self.GPUtil is not None: gpu_list = [] try: - gpu_list = GPUtil.getGPUs() + gpu_list = self.GPUtil.getGPUs() except Exception: logger.debug("GPUtil failed to retrieve GPUs.") for gpu in gpu_list: @@ -465,6 +471,7 @@ def load_newest_checkpoint(dirpath: str, ckpt_pattern: str) -> dict: def wait_for_gpu(gpu_id=None, target_util=0.01, retry=20, + delay_s=5, gpu_memory_limit=None): """Checks if a given GPU has freed memory. @@ -476,8 +483,9 @@ def wait_for_gpu(gpu_id=None, the first item returned from `ray.get_gpu_ids()`. target_util (float): The utilization threshold to reach to unblock. Set this to 0 to block until the GPU is completely free. - retry (int): Number of times to check GPU limit. Sleeps 5 + retry (int): Number of times to check GPU limit. Sleeps `delay_s` seconds between checks. + delay_s (int): Seconds to wait before check. gpu_memory_limit (float): Deprecated. Returns: @@ -497,44 +505,54 @@ def wait_for_gpu(gpu_id=None, tune.run(tune_func, resources_per_trial={"GPU": 1}, num_samples=10) """ + GPUtil = _import_gputil() if gpu_memory_limit: raise ValueError("'gpu_memory_limit' is deprecated. " "Use 'target_util' instead.") if GPUtil is None: raise RuntimeError( "GPUtil must be installed if calling `wait_for_gpu`.") + if gpu_id is None: gpu_id_list = ray.get_gpu_ids() if not gpu_id_list: - raise RuntimeError(f"No GPU ids found from {ray.get_gpu_ids()}. " + raise RuntimeError("No GPU ids found from `ray.get_gpu_ids()`. " "Did you set Tune resources correctly?") gpu_id = gpu_id_list[0] - if isinstance(gpu_id, int): - list_gpu_ids = [g.id for g in GPUtil.getGPUs()] - if gpu_id not in list_gpu_ids: - raise ValueError( - f"{gpu_id} (int) not found in GPU ids: {list_gpu_ids}. " - "wait_for_gpu takes either int (gpu id) or str (gpu uuid).") - elif isinstance(gpu_id, str): - list_uuids = [g.uuid for g in GPUtil.getGPUs()] - if gpu_id not in list_uuids: - raise ValueError( - f"{gpu_id} (str) not found in GPU uuids: {list_uuids}. " - "wait_for_gpu takes either int (gpu id) or str (gpu uuid).") - else: - raise ValueError(f"gpu_id must be int or str -- got ({type(gpu_id)})") + gpu_attr = "id" + if isinstance(gpu_id, str): + if gpu_id.isdigit(): + # GPU ID returned from `ray.get_gpu_ids()` is a str representation + # of the int GPU ID + gpu_id = int(gpu_id) + else: + # Could not coerce gpu_id to int, so assume UUID + # and compare against `uuid` attribute e.g., + # 'GPU-04546190-b68d-65ac-101b-035f8faed77d' + gpu_attr = "uuid" + elif not isinstance(gpu_id, int): + raise ValueError(f"gpu_id ({type(gpu_id)}) must be type str/int.") + + def gpu_id_fn(g): + # Returns either `g.id` or `g.uuid` depending on + # the format of the input `gpu_id` + return getattr(g, gpu_attr) + + gpu_ids = {gpu_id_fn(g) for g in GPUtil.getGPUs()} + if gpu_id not in gpu_ids: + raise ValueError( + f"{gpu_id} not found in set of available GPUs: {gpu_ids}. " + "`wait_for_gpu` takes either GPU ordinal ID (e.g., '0') or " + "UUID (e.g., 'GPU-04546190-b68d-65ac-101b-035f8faed77d').") for i in range(int(retry)): - if isinstance(gpu_id, int): - gpu_object = [g for g in GPUtil.getGPUs() if g.id == gpu_id][0] - else: - gpu_object = [g for g in GPUtil.getGPUs() if g.uuid == gpu_id][0] - + gpu_object = next( + g for g in GPUtil.getGPUs() if gpu_id_fn(g) == gpu_id) if gpu_object.memoryUtil > target_util: logger.info(f"Waiting for GPU util to reach {target_util}. " f"Util: {gpu_object.memoryUtil:0.3f}") - time.sleep(5) + time.sleep(delay_s) else: return True raise RuntimeError("GPU memory was not freed.") diff --git a/python/ray/tune/utils/util_test.py b/python/ray/tune/utils/util_test.py deleted file mode 100644 index 534061f68..000000000 --- a/python/ray/tune/utils/util_test.py +++ /dev/null @@ -1,43 +0,0 @@ -from collections import OrderedDict - -import unittest - -from .util import unflatten_dict - - -class UnflattenDictTest(unittest.TestCase): - def test_output_type(self): - in_ = OrderedDict({"a/b": 1, "c/d": 2, "e": 3}) - out = unflatten_dict(in_) - assert type(in_) is type(out) - - def test_one_level_nested(self): - result = unflatten_dict({"a/b": 1, "c/d": 2, "e": 3}) - assert result == {"a": {"b": 1}, "c": {"d": 2}, "e": 3} - - def test_multi_level_nested(self): - result = unflatten_dict({"a/b/c/d": 1, "b/c/d": 2, "c/d": 3, "e": 4}) - assert result == { - "a": { - "b": { - "c": { - "d": 1, - }, - }, - }, - "b": { - "c": { - "d": 2, - }, - }, - "c": { - "d": 3, - }, - "e": 4, - } - - -if __name__ == "__main__": - import pytest - import sys - sys.exit(pytest.main(["-v", __file__]))