[docker] Detect CPUs in container correctly (#10507)

Co-authored-by: simon-mo <simon.mo@hey.com>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Alex Wu <itswu.alex@gmail.com>
This commit is contained in:
Ian Rodney
2020-09-13 23:40:48 -07:00
committed by GitHub
parent 660aee6311
commit 5bc2ba38fd
35 changed files with 173 additions and 45 deletions
+4 -3
View File
@@ -1,7 +1,6 @@
import math
from collections import namedtuple
import logging
import multiprocessing
import os
import re
import subprocess
@@ -105,7 +104,9 @@ class ResourceSpec(
# Check types.
for resource_label, resource_quantity in resources.items():
assert (isinstance(resource_quantity, int)
or isinstance(resource_quantity, float))
or isinstance(resource_quantity, float)), (
f"{resource_label} ({type(resource_quantity)}): "
f"{resource_quantity}")
if (isinstance(resource_quantity, float)
and not resource_quantity.is_integer()):
raise ValueError(
@@ -148,7 +149,7 @@ class ResourceSpec(
num_cpus = self.num_cpus
if num_cpus is None:
num_cpus = multiprocessing.cpu_count()
num_cpus = ray.utils.get_num_cpus()
num_gpus = self.num_gpus
gpu_ids = ray.utils.get_cuda_visible_devices()
@@ -1,6 +1,8 @@
import ray
from ray import serve
import requests
ray.init(num_cpus=8)
client = serve.start()
@@ -1,6 +1,8 @@
import ray
from ray import serve
import requests
ray.init(num_cpus=8)
client = serve.start()
@@ -3,7 +3,7 @@ import requests
import ray
from ray import serve
ray.init(num_cpus=10)
ray.init(num_cpus=8)
client = serve.start()
# Our pipeline will be structured as follows:
@@ -28,8 +28,8 @@ def batch_adder_v0(flask_requests: List):
# __doc_define_servable_v0_end__
ray.init(num_cpus=8)
# __doc_deploy_begin__
ray.init(num_cpus=10)
client = serve.start()
client.create_backend("adder:v0", batch_adder_v0, config={"max_batch_size": 4})
client.create_endpoint(
@@ -1,4 +1,5 @@
# yapf: disable
import ray
# __doc_import_begin__
from ray import serve
@@ -45,6 +46,7 @@ class ImageModel:
# __doc_define_servable_end__
ray.init(num_cpus=8)
# __doc_deploy_begin__
client = serve.start()
client.create_backend("resnet18:v0", ImageModel)
@@ -1,4 +1,5 @@
# yapf: disable
import ray
# __doc_import_begin__
from ray import serve
@@ -70,6 +71,7 @@ class BoostingModel:
# __doc_define_servable_end__
ray.init(num_cpus=8)
# __doc_deploy_begin__
client = serve.start()
client.create_backend("lr:v1", BoostingModel)
@@ -1,4 +1,5 @@
# yapf: disable
import ray
# __doc_import_begin__
from ray import serve
@@ -68,6 +69,7 @@ class TFMnistModel:
# __doc_define_servable_end__
ray.init(num_cpus=8)
# __doc_deploy_begin__
client = serve.start()
client.create_backend("tf:v1", TFMnistModel, TRAINED_MODEL_PATH)
@@ -70,6 +70,7 @@ class TestMemoryScheduling(unittest.TestCase):
def testTuneDriverHeapLimit(self):
try:
ray.init(num_cpus=4, _memory=100 * MB)
_register_all()
result = tune.run(
"PG",
@@ -89,6 +90,11 @@ class TestMemoryScheduling(unittest.TestCase):
def testTuneDriverStoreLimit(self):
try:
ray.init(
num_cpus=4,
_memory=100 * MB,
object_store_memory=100 * MB,
)
_register_all()
self.assertRaisesRegexp(
ray.tune.error.TuneError,
@@ -107,6 +113,7 @@ class TestMemoryScheduling(unittest.TestCase):
def testTuneWorkerHeapLimit(self):
try:
ray.init(num_cpus=4, _memory=100 * MB)
_register_all()
result = tune.run(
"PG",
@@ -127,6 +134,11 @@ class TestMemoryScheduling(unittest.TestCase):
def testTuneWorkerStoreLimit(self):
try:
ray.init(
num_cpus=4,
_memory=100 * MB,
object_store_memory=100 * MB,
)
_register_all()
self.assertRaisesRegexp(
ray.tune.error.TuneError,
@@ -144,6 +156,7 @@ class TestMemoryScheduling(unittest.TestCase):
def testTuneObjectLimitApplied(self):
try:
ray.init(num_cpus=2, object_store_memory=500 * MB)
result = tune.run(
train_oom,
resources_per_trial={"object_store_memory": 150 * 1024 * 1024},
@@ -12,6 +12,7 @@ import torch.optim as optim
from torch.utils.data import random_split
import torchvision
import torchvision.transforms as transforms
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
@@ -231,6 +232,7 @@ if __name__ == "__main__":
args, _ = parser.parse_known_args()
if args.smoke_test:
ray.init(num_cpus=2)
main(num_samples=1, max_num_epochs=1, gpus_per_trial=0)
else:
# Change this to activate training on GPUs
@@ -97,6 +97,9 @@ if __name__ == "__main__":
parser.add_argument("--hosts-per-trial", type=int, default=1)
parser.add_argument("--slots-per-host", type=int, default=2)
args = parser.parse_args()
if args.smoke_test:
import ray
ray.init(num_cpus=2)
# import ray
# ray.init(address="auto") # assumes ray is started with ray up
@@ -76,7 +76,7 @@ if __name__ == "__main__":
"--smoke-test", action="store_true", help="Finish quickly for testing")
args, _ = parser.parse_known_args()
ray.init()
ray.init(num_cpus=2)
datasets.MNIST("~/data", train=True, download=True)
# check if PytorchTrainble will save/restore correctly before execution
+2
View File
@@ -58,6 +58,7 @@ import torch.optim as optim
from torch.utils.data import random_split
import torchvision
import torchvision.transforms as transforms
import ray
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
@@ -434,6 +435,7 @@ def main(num_samples=10, max_num_epochs=10, gpus_per_trial=2):
if __name__ == "__main__":
# You can change the number of GPUs per trial here:
ray.init(num_cpus=2) # for testing purposes only
main(num_samples=2, max_num_epochs=2, gpus_per_trial=0)
+2 -1
View File
@@ -11,7 +11,8 @@ def f(config, reporter):
if __name__ == "__main__":
ray.init()
ray.init(num_cpus=2)
register_trainable("my_class", f)
run_experiments({
"test": {
@@ -16,7 +16,7 @@ from ray.cluster_utils import Cluster
class RayTrialExecutorTest(unittest.TestCase):
def setUp(self):
self.trial_executor = RayTrialExecutor(queue_trials=False)
ray.init(ignore_reinit_error=True)
ray.init(num_cpus=2, ignore_reinit_error=True)
_register_all() # Needed for flaky tests
def tearDown(self):
+1 -1
View File
@@ -17,7 +17,7 @@ from ray.tune.syncer import CommandBasedClient
class TestSyncFunctionality(unittest.TestCase):
def setUp(self):
ray.init()
ray.init(num_cpus=2)
def tearDown(self):
ray.shutdown()
+1 -1
View File
@@ -20,7 +20,7 @@ class TrackApiTest(unittest.TestCase):
def testSoftDeprecation(self):
"""Checks that tune.track.log code does not break."""
from ray.tune import track
ray.init()
ray.init(num_cpus=2)
def testme(config):
for i in range(config["iters"]):
+3 -2
View File
@@ -23,7 +23,8 @@ class TrialRunnerTest(unittest.TestCase):
ray.shutdown()
def testTrialStatus(self):
ray.init()
ray.init(num_cpus=2)
trial = Trial("__fake")
trial_executor = RayTrialExecutor()
self.assertEqual(trial.status, Trial.PENDING)
@@ -35,7 +36,7 @@ class TrialRunnerTest(unittest.TestCase):
self.assertEqual(trial.status, Trial.ERROR)
def testExperimentTagTruncation(self):
ray.init()
ray.init(num_cpus=2)
def train(config, reporter):
reporter(timesteps_total=1)
+4 -2
View File
@@ -540,7 +540,8 @@ class TrialRunnerTest3(unittest.TestCase):
runner2.step()
def testCheckpointWithFunction(self):
ray.init()
ray.init(num_cpus=2)
trial = Trial(
"__fake",
config={"callbacks": {
@@ -565,7 +566,8 @@ class TrialRunnerTest3(unittest.TestCase):
and fname.endswith(".json"))
for fname in os.listdir(cdir))
ray.init()
ray.init(num_cpus=2)
trial = Trial("__fake", checkpoint_freq=1)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
@@ -38,7 +38,7 @@ def mock_trial_runner(trials=None):
class EarlyStoppingSuite(unittest.TestCase):
def setUp(self):
ray.init()
ray.init(num_cpus=2)
def tearDown(self):
ray.shutdown()
@@ -782,7 +782,7 @@ class _MockTrial(Trial):
class PopulationBasedTestingSuite(unittest.TestCase):
def setUp(self):
ray.init()
ray.init(num_cpus=2)
def tearDown(self):
ray.shutdown()
@@ -1802,7 +1802,7 @@ class E2EPopulationBasedTestingSuite(unittest.TestCase):
class AsyncHyperBandSuite(unittest.TestCase):
def setUp(self):
ray.init()
ray.init(num_cpus=2)
def tearDown(self):
ray.shutdown()
@@ -104,7 +104,7 @@ class PopulationBasedTrainingSynchTest(unittest.TestCase):
class PopulationBasedTrainingConfigTest(unittest.TestCase):
def setUp(self):
ray.init()
ray.init(num_cpus=2)
def tearDown(self):
ray.shutdown()
@@ -145,7 +145,7 @@ class PopulationBasedTrainingConfigTest(unittest.TestCase):
class PopulationBasedTrainingResumeTest(unittest.TestCase):
def setUp(self):
ray.init()
ray.init(num_cpus=2)
def tearDown(self):
ray.shutdown()
+1 -1
View File
@@ -85,7 +85,7 @@ class TuneRestoreTest(unittest.TestCase):
class TuneExampleTest(unittest.TestCase):
def setUp(self):
ray.init()
ray.init(num_cpus=2)
def tearDown(self):
ray.shutdown()
+1 -1
View File
@@ -15,7 +15,7 @@ from ray.tune.suggest.variant_generator import (RecursiveDependencyError,
class VariantGeneratorTest(unittest.TestCase):
def setUp(self):
ray.init()
ray.init(num_cpus=2)
def tearDown(self):
ray.shutdown()
+7 -7
View File
@@ -67,7 +67,7 @@ py_test(
srcs = ["tf/examples/tensorflow_train_example.py"],
tags = ["exclusive", "tf"],
deps = [":sgd_lib"],
args = ["--num-replicas=1"]
args = ["--num-replicas=1", "--smoke-test"]
)
py_test(
@@ -77,7 +77,7 @@ py_test(
srcs = ["tf/examples/tensorflow_train_example.py"],
tags = ["exclusive", "tf"],
deps = [":sgd_lib"],
args = ["--num-replicas=2"]
args = ["--num-replicas=2", "--smoke-test"]
)
py_test(
@@ -87,7 +87,7 @@ py_test(
srcs = ["tf/examples/tensorflow_train_example.py"],
tags = ["exclusive", "tf"],
deps = [":sgd_lib"],
args = ["--tune"]
args = ["--tune", "--smoke-test"]
)
# --------------------------------------------------------------------
@@ -148,7 +148,7 @@ py_test(
srcs = ["torch/examples/train_example.py"],
tags = ["exclusive", "pytorch"],
deps = [":sgd_lib"],
args = ["--num-workers=1"]
args = ["--num-workers=1", "--smoke-test"]
)
py_test(
@@ -158,7 +158,7 @@ py_test(
srcs = ["torch/examples/train_example.py"],
tags = ["exclusive", "pytorch"],
deps = [":sgd_lib"],
args = ["--num-workers=2"]
args = ["--num-workers=2", "--smoke-test"]
)
py_test(
@@ -168,7 +168,7 @@ py_test(
srcs = ["torch/examples/tune_example.py"],
tags = ["exclusive", "pytorch"],
deps = [":sgd_lib"],
args = ["--num-workers=1"]
args = ["--num-workers=1", "--smoke-test"]
)
py_test(
@@ -178,7 +178,7 @@ py_test(
srcs = ["torch/examples/tune_example.py"],
tags = ["exclusive", "pytorch"],
deps = [":sgd_lib"],
args = ["--num-workers=2"]
args = ["--num-workers=2", "--smoke-test"]
)
@@ -177,7 +177,10 @@ if __name__ == "__main__":
help="Finish quickly for testing. Assume False for users.")
args, _ = parser.parse_known_args()
ray.init(address=args.address)
if args.smoke_test:
ray.init(num_cpus=2)
else:
ray.init(address=args.address)
data_size = 60000
test_size = 10000
batch_size = args.batch_size
@@ -114,6 +114,8 @@ def tune_example(num_replicas=1, use_gpu=False):
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
parser.add_argument(
"--address",
required=False,
@@ -135,7 +137,10 @@ if __name__ == "__main__":
args, _ = parser.parse_known_args()
ray.init(address=args.address)
if args.smoke_test:
ray.init(num_cpus=2)
else:
ray.init(address=args.address)
if args.tune:
tune_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)
@@ -109,7 +109,10 @@ class Training(TrainingOperator):
if __name__ == "__main__":
ray.init(address=None if args.local else "auto")
if args.local:
ray.init(num_cpus=2)
else:
ray.init(address="auto")
num_workers = 2 if args.local else int(ray.cluster_resources().get(device))
from ray.util.sgd.torch.examples.train_example import LinearDataset
+4 -1
View File
@@ -289,7 +289,10 @@ if __name__ == "__main__":
default=False,
help="Enables GPU training")
args = parser.parse_args()
ray.init(address=args.address)
if args.smoke_test:
ray.init(num_cpus=2)
else:
ray.init(address=args.address)
trainer = train_example(
num_workers=args.num_workers,
@@ -128,8 +128,10 @@ def main():
setup_default_logging()
args, args_text = parse_args()
ray.init(address=args.ray_address)
if args.smoke_test:
ray.init(num_cpus=int(args.ray_num_workers))
else:
ray.init(address=args.ray_address)
CustomTrainingOperator = TrainingOperator.from_creators(
model_creator=model_creator,
@@ -115,10 +115,17 @@ if __name__ == "__main__":
help="Enables GPU training")
parser.add_argument(
"--tune", action="store_true", default=False, help="Tune training")
parser.add_argument(
"--smoke-test",
action="store_true",
default=False,
help="Finish quickly for testing.")
args, _ = parser.parse_known_args()
import ray
ray.init(address=args.address)
if args.smoke_test:
ray.init(num_cpus=2)
else:
ray.init(address=args.address)
train_example(num_workers=args.num_workers, use_gpu=args.use_gpu)
@@ -59,6 +59,8 @@ def tune_example(operator_cls, num_workers=1, use_gpu=False):
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
parser.add_argument(
"--address",
type=str,
@@ -77,7 +79,10 @@ if __name__ == "__main__":
args, _ = parser.parse_known_args()
ray.init(address=args.address)
if args.smoke_test:
ray.init(num_cpus=2)
else:
ray.init(address=args.address)
CustomTrainingOperator = TrainingOperator.from_creators(
model_creator=model_creator, optimizer_creator=optimizer_creator,
data_creator=data_creator, loss_creator=nn.MSELoss)
+51
View File
@@ -3,6 +3,7 @@ import errno
import hashlib
import inspect
import logging
import multiprocessing
import numpy as np
import os
import signal
@@ -498,6 +499,56 @@ def get_system_memory():
return psutil_memory_in_bytes
def _get_docker_cpus(
        cpu_quota_file_name="/sys/fs/cgroup/cpu/cpu.cfs_quota_us",
        cpu_period_file_name="/sys/fs/cgroup/cpu/cpu.cfs_period_us"):
    """Return the number of CPUs usable by this (possibly containerized) process.

    Detection order:
      1. CFS quota (https://bugs.openjdk.java.net/browse/JDK-8146115): when
         both cgroup files exist and the quota is not -1, the result is
         ``quota / period``.
      2. Otherwise, the output of ``nproc`` (which honors CPU sets).

    Args:
        cpu_quota_file_name (str): Path of the cgroup CFS quota file.
        cpu_period_file_name (str): Path of the cgroup CFS period file.

    Returns:
        A float (possibly fractional, e.g. 0.5) when derived from the CFS
        quota, or an int when derived from ``nproc``.

    Raises:
        ValueError: If a cgroup file does not contain an integer.
        OSError, subprocess.CalledProcessError: If ``nproc`` is unavailable
            or fails.
    """
    # Both files are needed to compute quota / period. The original code
    # tested the quota file twice and never checked the period file.
    if os.path.exists(cpu_quota_file_name) and os.path.exists(
            cpu_period_file_name):
        with open(cpu_quota_file_name, "r") as f:
            cpu_quota = int(f.read())
        # A quota of -1 means no CFS bandwidth limit is configured; in that
        # case fall through to `nproc`.
        if cpu_quota != -1:
            with open(cpu_period_file_name, "r") as f:
                cpu_period = int(f.read())
            return cpu_quota / cpu_period
    return int(subprocess.check_output("nproc"))
def get_num_cpus():
    """Return the number of CPUs Ray should assume are available.

    By default the count comes from `_get_docker_cpus()` (cgroup CFS quota,
    then `nproc`) so that container CPU limits are respected. If that
    detection raises, or if the environment variable
    `RAY_USE_MULTIPROCESSING_CPU_COUNT` is set, the value of
    `multiprocessing.cpu_count()` is returned instead.

    Returns:
        int: The detected CPU count. A fractional container quota is
        truncated toward zero, because resource quantities are validated as
        whole numbers elsewhere in Ray.
    """
    cpu_count = multiprocessing.cpu_count()
    if "RAY_USE_MULTIPROCESSING_CPU_COUNT" in os.environ:
        logger.info(
            "Using multiprocessing.cpu_count() to detect the number of CPUs. "
            "This method of CPU detection is buggy when used inside docker. "
            "To correctly detect CPUs remove the enivronment variable: "
            "`RAY_USE_MULTIPROCESSING_CPU_COUNT`.")
        return cpu_count

    try:
        # Not easy to get cpu count in docker, see:
        # https://bugs.python.org/issue36054
        docker_count = _get_docker_cpus()
    except Exception:
        # `nproc` and the cgroup files are Linux-only. Docker only runs on
        # Linux (inside a Linux VM on other platforms), so falling back to
        # multiprocessing's count is fine. Log the reason rather than
        # discarding it silently.
        logger.debug(
            "Container-aware CPU detection failed; falling back to "
            "multiprocessing.cpu_count().",
            exc_info=True)
        return cpu_count

    if docker_count != cpu_count:
        if "RAY_DISABLE_DOCKER_CPU_WARNING" not in os.environ:
            logger.warning(
                "Detecting limited number of CPUs due to docker. In "
                "previous versions of Ray, CPU detection in containers "
                "was buggy. Please ensure that Ray has enough CPUs "
                "allocated. This message will be removed in future "
                "version of Ray. You can set the environment variable: "
                "`RAY_DISABLE_DOCKER_CPU_WARNING` to remove it now.")
        # A CFS quota can be fractional (e.g. 1.5 CPUs). A non-integer float
        # would be rejected by resource validation, so truncate it.
        whole_count = int(docker_count)
        if whole_count != docker_count:
            logger.warning(
                "Ray does not support a fractional number of CPUs. "
                f"num_cpus will be truncated from {docker_count} to "
                f"{whole_count}.")
        cpu_count = whole_count
    return cpu_count
def get_used_memory():
"""Return the currently used system memory in bytes