mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 18:44:07 +08:00
[tune] Fixed wait_for_gpu to handle str representations of ordinal IDs (#13936)
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
@@ -1,10 +1,14 @@
|
||||
from collections import OrderedDict
|
||||
import os
|
||||
import pickle
|
||||
import sys
|
||||
import shutil
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
import ray.utils
|
||||
|
||||
import ray.cloudpickle as cloudpickle
|
||||
from ray.tune.utils.util import wait_for_gpu
|
||||
from ray.tune.utils.util import unflatten_dict
|
||||
from ray.tune.utils.trainable import TrainableUtil
|
||||
|
||||
|
||||
@@ -12,13 +16,15 @@ class TrainableUtilTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"tune", "MyTrainable123")
|
||||
TrainableUtil.make_checkpoint_dir(self.checkpoint_dir)
|
||||
self.checkpoint_dir = TrainableUtil.make_checkpoint_dir(
|
||||
self.checkpoint_dir, "0")
|
||||
|
||||
def tearDown(self):
|
||||
self.addCleanup(shutil.rmtree, self.checkpoint_dir)
|
||||
|
||||
def testFindCheckpointDir(self):
|
||||
checkpoint_path = os.path.join(self.checkpoint_dir, "my/nested/chkpt")
|
||||
checkpoint_path = os.path.join(self.checkpoint_dir,
|
||||
"0/my/nested/chkpt")
|
||||
os.makedirs(checkpoint_path)
|
||||
found_dir = TrainableUtil.find_checkpoint_dir(checkpoint_path)
|
||||
self.assertEquals(self.checkpoint_dir, found_dir)
|
||||
@@ -36,7 +42,7 @@ class TrainableUtilTest(unittest.TestCase):
|
||||
checkpoint_path = os.path.join(self.checkpoint_dir, "0")
|
||||
|
||||
data_dict = TrainableUtil.pickle_checkpoint(checkpoint_path)
|
||||
loaded = pickle.loads(data_dict)
|
||||
loaded = cloudpickle.loads(data_dict)
|
||||
|
||||
checkpoint_name = os.path.basename(checkpoint_path)
|
||||
self.assertEqual(loaded["checkpoint_name"], checkpoint_name)
|
||||
@@ -44,3 +50,94 @@ class TrainableUtilTest(unittest.TestCase):
|
||||
for i in range(5):
|
||||
path = os.path.join(self.checkpoint_dir, str(i))
|
||||
self.assertEquals(loaded["data"][str(i)], open(path, "rb").read())
|
||||
|
||||
|
||||
class UnflattenDictTest(unittest.TestCase):
    """Checks `unflatten_dict` on dictionaries with "/"-delimited keys."""

    def test_output_type(self):
        # The returned mapping must have exactly the same type as the input.
        flat = OrderedDict({"a/b": 1, "c/d": 2, "e": 3})
        nested = unflatten_dict(flat)
        assert type(flat) is type(nested)

    def test_one_level_nested(self):
        # A single "/" produces one level of nesting; plain keys are kept.
        flat = {"a/b": 1, "c/d": 2, "e": 3}
        expected = {"a": {"b": 1}, "c": {"d": 2}, "e": 3}
        assert unflatten_dict(flat) == expected

    def test_multi_level_nested(self):
        # Keys of different depths are expanded independently of each other.
        flat = {"a/b/c/d": 1, "b/c/d": 2, "c/d": 3, "e": 4}
        expected = {
            "a": {"b": {"c": {"d": 1}}},
            "b": {"c": {"d": 2}},
            "c": {"d": 3},
            "e": 4,
        }
        assert unflatten_dict(flat) == expected
|
||||
|
||||
|
||||
class GPUUtilMock:
    """Stand-in for the ``GPUtil`` module exposing only ``getGPUs()``.

    Each fake GPU carries an ``id``, a ``uuid`` and a ``memoryUtil``
    property whose value changes on every read.
    """

    class GPU:
        """Fake GPU with a scripted sequence of ``memoryUtil`` readings."""

        def __init__(self, id, uuid, util=None):
            """Create a fake GPU.

            Args:
                id: Ordinal ID reported for this GPU.
                uuid: UUID reported for this GPU.
                util: Optional sequence of readings returned, in order, by
                    successive ``memoryUtil`` accesses. Defaults to
                    ``[0.5, 0.0]`` when omitted.
            """
            self.id = id
            self.uuid = uuid
            # `util` used to be accepted but silently ignored. Honour it and
            # copy it, because `memoryUtil` consumes the list with `pop`.
            self.util = [0.5, 0.0] if util is None else list(util)

        @property
        def memoryUtil(self):
            """Return the next scripted reading, or 0 once they run out.

            Every access consumes one reading, so reading the property twice
            yields two different values while readings remain.
            """
            if self.util:
                return self.util.pop(0)
            return 0

    def __init__(self, gpus, gpu_uuids):
        """Build one fake GPU per ``(id, uuid)`` pair.

        Args:
            gpus: Iterable of GPU IDs.
            gpu_uuids: Iterable of GPU UUIDs, paired with ``gpus`` by
                position (pairing stops at the shorter iterable).
        """
        self.gpus = gpus
        self.uuids = gpu_uuids
        self.gpu_list = [
            self.GPU(gpu, uuid) for gpu, uuid in zip(self.gpus, self.uuids)
        ]

    def getGPUs(self):
        """Return the list of fake GPU objects (same list on every call)."""
        return self.gpu_list
|
||||
|
||||
|
||||
class GPUTest(unittest.TestCase):
    """Tests for `wait_for_gpu` run against a fake ``GPUtil`` module."""

    def setUp(self):
        # Remember what was registered under "GPUtil" (if anything) so the
        # fake module does not leak into tests that run after this class.
        had_gputil = "GPUtil" in sys.modules
        previous_gputil = sys.modules.get("GPUtil")

        def restore_gputil():
            if had_gputil:
                sys.modules["GPUtil"] = previous_gputil
            else:
                sys.modules.pop("GPUtil", None)

        self.addCleanup(restore_gputil)
        sys.modules["GPUtil"] = GPUUtilMock([0, 1], ["GPU-aaa", "GPU-bbb"])

    def testGPUWait1(self):
        # GPU given as an int ordinal ID.
        wait_for_gpu(0, delay_s=0)

    def testGPUWait2(self):
        # GPU given as the str representation of an ordinal ID.
        wait_for_gpu("1", delay_s=0)

    def testGPUWait3(self):
        # GPU given by UUID.
        wait_for_gpu("GPU-aaa", delay_s=0)

    def testGPUWaitFail(self):
        # Int ID that is not among the fake GPUs.
        with self.assertRaises(ValueError):
            wait_for_gpu(2, delay_s=0)

        # Str ordinal ID that is not among the fake GPUs.
        with self.assertRaises(ValueError):
            wait_for_gpu("4", delay_s=0)

        # Neither int nor str.
        with self.assertRaises(ValueError):
            wait_for_gpu(1.23, delay_s=0)

    @patch("ray.get_gpu_ids", lambda: ["0"])
    def testDefaultGPU(self):
        # No gpu_id is passed, so the ID comes from the patched
        # `ray.get_gpu_ids`; the cleanup registered in setUp still restores
        # the original "GPUtil" entry after this replacement.
        sys.modules["GPUtil"] = GPUUtilMock([0], ["GPU-aaa"])
        wait_for_gpu(delay_s=0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Run this test module through pytest when it is executed directly and
    # use pytest's return value as the process exit status.
    import pytest

    exit_status = pytest.main(["-v", __file__])
    sys.exit(exit_status)
|
||||
|
||||
@@ -21,10 +21,14 @@ import psutil
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import GPUtil
|
||||
except ImportError:
|
||||
GPUtil = None
|
||||
|
||||
def _import_gputil():
    """Return the ``GPUtil`` module, or ``None`` if it cannot be imported.

    The import is attempted on every call instead of once at module load.
    """
    try:
        import GPUtil as gputil_module
    except ImportError:
        return None
    return gputil_module
|
||||
|
||||
|
||||
_pinned_objects = []
|
||||
PINNED_OBJECT_PREFIX = "ray.tune.PinnedObject:"
|
||||
@@ -43,6 +47,8 @@ class UtilMonitor(Thread):
|
||||
|
||||
def __init__(self, start=True, delay=0.7):
|
||||
self.stopped = True
|
||||
GPUtil = _import_gputil()
|
||||
self.GPUtil = GPUtil
|
||||
if GPUtil is None and start:
|
||||
logger.warning("Install gputil for GPU system monitoring.")
|
||||
|
||||
@@ -67,10 +73,10 @@ class UtilMonitor(Thread):
|
||||
float(psutil.cpu_percent(interval=None)))
|
||||
self.values["ram_util_percent"].append(
|
||||
float(getattr(psutil.virtual_memory(), "percent")))
|
||||
if GPUtil is not None:
|
||||
if self.GPUtil is not None:
|
||||
gpu_list = []
|
||||
try:
|
||||
gpu_list = GPUtil.getGPUs()
|
||||
gpu_list = self.GPUtil.getGPUs()
|
||||
except Exception:
|
||||
logger.debug("GPUtil failed to retrieve GPUs.")
|
||||
for gpu in gpu_list:
|
||||
@@ -465,6 +471,7 @@ def load_newest_checkpoint(dirpath: str, ckpt_pattern: str) -> dict:
|
||||
def wait_for_gpu(gpu_id=None,
|
||||
target_util=0.01,
|
||||
retry=20,
|
||||
delay_s=5,
|
||||
gpu_memory_limit=None):
|
||||
"""Checks if a given GPU has freed memory.
|
||||
|
||||
@@ -476,8 +483,9 @@ def wait_for_gpu(gpu_id=None,
|
||||
the first item returned from `ray.get_gpu_ids()`.
|
||||
target_util (float): The utilization threshold to reach to unblock.
|
||||
Set this to 0 to block until the GPU is completely free.
|
||||
retry (int): Number of times to check GPU limit. Sleeps 5
|
||||
retry (int): Number of times to check GPU limit. Sleeps `delay_s`
|
||||
seconds between checks.
|
||||
delay_s (int): Seconds to wait before check.
|
||||
gpu_memory_limit (float): Deprecated.
|
||||
|
||||
Returns:
|
||||
@@ -497,44 +505,54 @@ def wait_for_gpu(gpu_id=None,
|
||||
|
||||
tune.run(tune_func, resources_per_trial={"GPU": 1}, num_samples=10)
|
||||
"""
|
||||
GPUtil = _import_gputil()
|
||||
if gpu_memory_limit:
|
||||
raise ValueError("'gpu_memory_limit' is deprecated. "
|
||||
"Use 'target_util' instead.")
|
||||
if GPUtil is None:
|
||||
raise RuntimeError(
|
||||
"GPUtil must be installed if calling `wait_for_gpu`.")
|
||||
|
||||
if gpu_id is None:
|
||||
gpu_id_list = ray.get_gpu_ids()
|
||||
if not gpu_id_list:
|
||||
raise RuntimeError(f"No GPU ids found from {ray.get_gpu_ids()}. "
|
||||
raise RuntimeError("No GPU ids found from `ray.get_gpu_ids()`. "
|
||||
"Did you set Tune resources correctly?")
|
||||
gpu_id = gpu_id_list[0]
|
||||
|
||||
if isinstance(gpu_id, int):
|
||||
list_gpu_ids = [g.id for g in GPUtil.getGPUs()]
|
||||
if gpu_id not in list_gpu_ids:
|
||||
raise ValueError(
|
||||
f"{gpu_id} (int) not found in GPU ids: {list_gpu_ids}. "
|
||||
"wait_for_gpu takes either int (gpu id) or str (gpu uuid).")
|
||||
elif isinstance(gpu_id, str):
|
||||
list_uuids = [g.uuid for g in GPUtil.getGPUs()]
|
||||
if gpu_id not in list_uuids:
|
||||
raise ValueError(
|
||||
f"{gpu_id} (str) not found in GPU uuids: {list_uuids}. "
|
||||
"wait_for_gpu takes either int (gpu id) or str (gpu uuid).")
|
||||
else:
|
||||
raise ValueError(f"gpu_id must be int or str -- got ({type(gpu_id)})")
|
||||
gpu_attr = "id"
|
||||
if isinstance(gpu_id, str):
|
||||
if gpu_id.isdigit():
|
||||
# GPU ID returned from `ray.get_gpu_ids()` is a str representation
|
||||
# of the int GPU ID
|
||||
gpu_id = int(gpu_id)
|
||||
else:
|
||||
# Could not coerce gpu_id to int, so assume UUID
|
||||
# and compare against `uuid` attribute e.g.,
|
||||
# 'GPU-04546190-b68d-65ac-101b-035f8faed77d'
|
||||
gpu_attr = "uuid"
|
||||
elif not isinstance(gpu_id, int):
|
||||
raise ValueError(f"gpu_id ({type(gpu_id)}) must be type str/int.")
|
||||
|
||||
def gpu_id_fn(g):
|
||||
# Returns either `g.id` or `g.uuid` depending on
|
||||
# the format of the input `gpu_id`
|
||||
return getattr(g, gpu_attr)
|
||||
|
||||
gpu_ids = {gpu_id_fn(g) for g in GPUtil.getGPUs()}
|
||||
if gpu_id not in gpu_ids:
|
||||
raise ValueError(
|
||||
f"{gpu_id} not found in set of available GPUs: {gpu_ids}. "
|
||||
"`wait_for_gpu` takes either GPU ordinal ID (e.g., '0') or "
|
||||
"UUID (e.g., 'GPU-04546190-b68d-65ac-101b-035f8faed77d').")
|
||||
|
||||
for i in range(int(retry)):
|
||||
if isinstance(gpu_id, int):
|
||||
gpu_object = [g for g in GPUtil.getGPUs() if g.id == gpu_id][0]
|
||||
else:
|
||||
gpu_object = [g for g in GPUtil.getGPUs() if g.uuid == gpu_id][0]
|
||||
|
||||
gpu_object = next(
|
||||
g for g in GPUtil.getGPUs() if gpu_id_fn(g) == gpu_id)
|
||||
if gpu_object.memoryUtil > target_util:
|
||||
logger.info(f"Waiting for GPU util to reach {target_util}. "
|
||||
f"Util: {gpu_object.memoryUtil:0.3f}")
|
||||
time.sleep(5)
|
||||
time.sleep(delay_s)
|
||||
else:
|
||||
return True
|
||||
raise RuntimeError("GPU memory was not freed.")
|
||||
|
||||
@@ -1,43 +0,0 @@
|
||||
from collections import OrderedDict
|
||||
|
||||
import unittest
|
||||
|
||||
from .util import unflatten_dict
|
||||
|
||||
|
||||
class UnflattenDictTest(unittest.TestCase):
    """Exercises `unflatten_dict` with keys that use "/" as a separator."""

    def test_output_type(self):
        # Input and output must be of the identical mapping type.
        source = OrderedDict({"a/b": 1, "c/d": 2, "e": 3})
        unflattened = unflatten_dict(source)
        assert type(source) is type(unflattened)

    def test_one_level_nested(self):
        # One separator per key gives one nested level.
        want = {"a": {"b": 1}, "c": {"d": 2}, "e": 3}
        got = unflatten_dict({"a/b": 1, "c/d": 2, "e": 3})
        assert got == want

    def test_multi_level_nested(self):
        # Keys with differing numbers of separators nest to differing depths.
        want = {
            "a": {"b": {"c": {"d": 1}}},
            "b": {"c": {"d": 2}},
            "c": {"d": 3},
            "e": 4,
        }
        got = unflatten_dict({"a/b/c/d": 1, "b/c/d": 2, "c/d": 3, "e": 4})
        assert got == want
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Direct execution: hand this file to pytest and exit with its result.
    import sys

    import pytest

    sys.exit(pytest.main(["-v", __file__]))
|
||||
Reference in New Issue
Block a user