[SGD] Variable worker CPU requirements (#8963)

This commit is contained in:
Xianyang Liu
2020-06-23 15:43:27 +08:00
committed by GitHub
parent acd765cb22
commit b449ece2ea
5 changed files with 155 additions and 69 deletions
+2 -1
View File
@@ -47,7 +47,8 @@ def test_tune_train(ray_start_2_cpus, num_replicas): # noqa: F811
"data_creator": tune.function(simple_dataset),
"num_replicas": num_replicas,
"use_gpu": False,
"trainer_config": SIMPLE_CONFIG
"trainer_config": SIMPLE_CONFIG,
"num_cpus_per_worker": 1
}
tune.run(
+49 -21
View File
@@ -185,12 +185,13 @@ class TestLocalDistributedRunner(unittest.TestCase):
clear_dummy_actor()
ray.shutdown()
def _testWithInitialized(self, init_mock):
def _testReserveCUDAResource(self, init_mock, num_cpus):
mock_runner = MagicMock()
mock_runner._set_cuda_device = MagicMock()
preset_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
LocalDistributedRunner._try_reserve_and_set_cuda(mock_runner)
LocalDistributedRunner._try_reserve_and_set_resources(
mock_runner, num_cpus, 1)
self.assertTrue(mock_runner._set_cuda_device.called)
local_device = mock_runner._set_cuda_device.call_args[0][0]
@@ -202,52 +203,79 @@ class TestLocalDistributedRunner(unittest.TestCase):
self.assertIn(env_set_device, visible_devices)
device_int = int(local_device)
self.assertLess(device_int, len(visible_devices))
self.assertEquals(len(os.environ["CUDA_VISIBLE_DEVICES"]), 1)
else:
self.assertEquals(local_device, env_set_device)
def testNoVisibleWithInitialized(self):
with patch("torch.cuda.is_initialized") as init_mock:
init_mock.return_value = True
self._testWithInitialized(init_mock)
self._testReserveCUDAResource(init_mock, 0)
def test2VisibleWithInitialized(self):
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
def testNoVisibleWithInitializedAndReserveCPUResource(self):
with patch("torch.cuda.is_initialized") as init_mock:
init_mock.return_value = True
self._testWithInitialized(init_mock)
self._testReserveCUDAResource(init_mock, 2)
def test1VisibleWithInitialized(self):
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
with patch("torch.cuda.is_initialized") as init_mock:
init_mock.return_value = True
self._testWithInitialized(init_mock)
self._testReserveCUDAResource(init_mock, 0)
def test2VisibleNotInitialized(self):
def test2VisibleWithInitialized(self):
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
with patch("torch.cuda.is_initialized") as init_mock:
init_mock.return_value = False
mock_runner = MagicMock()
mock_runner._set_cuda_device = MagicMock()
LocalDistributedRunner._try_reserve_and_set_cuda(mock_runner)
args, _ = mock_runner._set_cuda_device.call_args
self.assertTrue(("1" in args) or "0" in args)
self.assertEquals(len(os.environ["CUDA_VISIBLE_DEVICES"]), 1)
init_mock.return_value = True
self._testReserveCUDAResource(init_mock, 0)
def test2VisibleWithInitializedAndReserveCPUResource(self):
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
with patch("torch.cuda.is_initialized") as init_mock:
init_mock.return_value = True
self._testReserveCUDAResource(init_mock, 2)
def test1VisibleNotInitialized(self):
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
with patch("torch.cuda.is_initialized") as init_mock:
init_mock.return_value = False
mock_runner = MagicMock()
mock_runner._set_cuda_device = MagicMock()
LocalDistributedRunner._try_reserve_and_set_cuda(mock_runner)
mock_runner._set_cuda_device.assert_called_with("0")
self.assertEquals(len(os.environ["CUDA_VISIBLE_DEVICES"]), 1)
self._testReserveCUDAResource(init_mock, 0)
def test1VisibleNotInitializedAndReserveCPUResource(self):
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
with patch("torch.cuda.is_initialized") as init_mock:
init_mock.return_value = False
self._testReserveCUDAResource(init_mock, 2)
def test2VisibleNotInitialized(self):
os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
with patch("torch.cuda.is_initialized") as init_mock:
init_mock.return_value = False
self._testReserveCUDAResource(init_mock, 0)
@patch("torch.cuda.set_device")
def testSetDevice(self, set_mock):
mock_runner = MagicMock()
mock_runner._is_set = False
LocalDistributedRunner._set_cuda_device(mock_runner, "123")
self.assertEquals(mock_runner.local_device, "123")
self.assertEquals(mock_runner.local_cuda_device, "123")
self.assertTrue(set_mock.called)
set_mock.assert_called_with(123)
def testV1ReserveCPUResources(self):
mock_runner = MagicMock()
mock_runner._set_cpu_devices = MagicMock()
# reserve CPU only
LocalDistributedRunner._try_reserve_and_set_resources(
mock_runner, 4, 0)
remaining = ray.available_resources()["CPU"]
self.assertEquals(int(remaining), 6)
def testV2ReserveCPUResources(self):
mock_runner = MagicMock()
mock_runner._set_cpu_devices = MagicMock()
# reserve CPU and GPU
LocalDistributedRunner._try_reserve_and_set_resources(
mock_runner, 4, 1)
remaining = ray.available_resources()["CPU"]
self.assertEquals(int(remaining), 6)
+8 -2
View File
@@ -17,6 +17,7 @@ class TFTrainer:
data_creator,
config=None,
num_replicas=1,
num_cpus_per_worker=1,
use_gpu=False,
verbose=False):
"""Sets up the TensorFlow trainer.
@@ -31,6 +32,8 @@ class TFTrainer:
'data_creator'. Also contains `fit_config`, which is passed
into `model.fit(data, **fit_config)` and
`evaluate_config` which is passed into `model.evaluate`.
num_cpus_per_worker (int): Sets the cpu requirement for each
worker.
num_replicas (int): Sets number of workers used in distributed
training. Workers will be placed arbitrarily across the
cluster.
@@ -40,6 +43,7 @@ class TFTrainer:
self.model_creator = model_creator
self.data_creator = data_creator
self.config = {} if config is None else config
self.num_cpus_per_worker = num_cpus_per_worker
self.use_gpu = use_gpu
self.num_replicas = num_replicas
self.verbose = verbose
@@ -47,7 +51,8 @@ class TFTrainer:
# Generate actor class
# todo: are these resource quotas right?
# should they be exposed to the client codee?
Runner = ray.remote(num_cpus=1, num_gpus=int(use_gpu))(TFRunner)
Runner = ray.remote(
num_cpus=self.num_cpus_per_worker, num_gpus=int(use_gpu))(TFRunner)
# todo: should we warn about using
# distributed training on one device only?
@@ -172,7 +177,8 @@ class TFTrainable(Trainable):
data_creator=config["data_creator"],
config=config.get("trainer_config", {}),
num_replicas=config["num_replicas"],
use_gpu=config["use_gpu"])
use_gpu=config["use_gpu"],
num_cpus_per_worker=config.get("num_cpus_per_worker", 1))
def _train(self):
@@ -190,27 +190,54 @@ class _DummyActor:
def cuda_devices(self):
return os.environ["CUDA_VISIBLE_DEVICES"]
def get(self):
# in order to verify the actor has created
return 1
# This is a bit of a hack. It prevents the reassignment of CUDA_VISIBLE_DEVICES
# during a trainer resize. We won't need this if we don't shutdown
# all the actors.
_dummy_actor = None
_dummy_cuda_actor = None
# used to reserve CPU resources
_dummy_cpu_actor = None
def clear_dummy_actor():
global _dummy_actor
if _dummy_actor:
global _dummy_cuda_actor
if _dummy_cuda_actor:
try:
_dummy_actor.__ray_terminate__.remote()
_dummy_cuda_actor.__ray_terminate__.remote()
except Exception as exc:
logger.info("Tried to clear dummy actor: %s", str(exc))
_dummy_actor = None
_dummy_cuda_actor = None
global _dummy_cpu_actor
if _dummy_cpu_actor:
try:
_dummy_cpu_actor.__ray_terminate__.remote()
except Exception as exc:
logger.info("Tried to clear dummy actor: %s", str(exc))
_dummy_cpu_actor = None
def reserve_cuda_device(retries=20):
def reserve_resources(num_cpus, num_gpus, retries=20):
ip = ray.services.get_node_ip_address()
reserved_device = None
reserved_cuda_device = None
if num_cpus > 0:
global _dummy_cpu_actor
if _dummy_cpu_actor is None:
_dummy_cpu_actor = ray.remote(
num_cpus=num_cpus,
resources={"node:" + ip: 0.1})(_DummyActor).remote()
assert ray.get(_dummy_cpu_actor.get.remote()) == 1
if num_gpus == 0:
return reserved_cuda_device
cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
cuda_device_set = {}
@@ -220,25 +247,25 @@ def reserve_cuda_device(retries=20):
assert isinstance(cuda_devices, str)
cuda_device_set = set(cuda_devices.split(","))
global _dummy_actor
global _dummy_cuda_actor
unused_actors = []
success = False
for _ in range(retries):
if _dummy_actor is None:
_dummy_actor = ray.remote(
num_gpus=1,
if _dummy_cuda_actor is None:
_dummy_cuda_actor = ray.remote(
num_cpus=0, num_gpus=num_gpus,
resources={"node:" + ip: 0.1})(_DummyActor).remote()
reserved_device = ray.get(_dummy_actor.cuda_devices.remote())
reserved_cuda_device = ray.get(_dummy_cuda_actor.cuda_devices.remote())
if match_devices and reserved_device not in cuda_device_set:
if match_devices and reserved_cuda_device not in cuda_device_set:
logger.debug("Device %s not in list of visible devices %s",
reserved_device, cuda_device_set)
unused_actors.append(_dummy_actor)
_dummy_actor = None
reserved_cuda_device, cuda_device_set)
unused_actors.append(_dummy_cuda_actor)
_dummy_cuda_actor = None
else:
logger.debug("Found matching device %s", reserved_device)
logger.debug("Found matching device %s", reserved_cuda_device)
success = True
for actor in unused_actors:
actor.__ray_terminate__.remote()
@@ -250,7 +277,7 @@ def reserve_cuda_device(retries=20):
"make sure that Ray has access to all the visible devices: "
"{}".format(os.environ.get("CUDA_VISIBLE_DEVICES")))
return reserved_device
return reserved_cuda_device
class LocalDistributedRunner(DistributedTorchRunner):
@@ -265,39 +292,48 @@ class LocalDistributedRunner(DistributedTorchRunner):
# Reserve a local GPU or CPU for the local worker
# TODO: we should make sure this NEVER dies.
self.local_device = "0"
self.local_cuda_device = "0"
self._is_set = False
if num_gpus:
assert num_gpus == 1, "Does not support multi-gpu workers"
if not self.is_actor() and num_gpus > 0:
self._try_reserve_and_set_cuda()
if num_cpus is None:
num_cpus = 0
if num_gpus is None:
num_gpus = 0
if not self.is_actor() and (num_cpus > 0 or num_gpus > 0):
self._try_reserve_and_set_resources(num_cpus, num_gpus)
super(LocalDistributedRunner, self).__init__(*args, **kwargs)
def _try_reserve_and_set_cuda(self):
visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
reserved_device = reserve_cuda_device()
def _try_reserve_and_set_resources(self, num_cpus, num_gpus):
visible_cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
reserved_cuda_device = reserve_resources(num_cpus, num_gpus)
if num_gpus == 0:
return
# This needs to be set even if torch.cuda is already
# initialized because the env var is used later when
# starting the DDP setup.
os.environ["CUDA_VISIBLE_DEVICES"] = reserved_device
if visible_devices:
os.environ["CUDA_VISIBLE_DEVICES"] = reserved_cuda_device
if visible_cuda_devices:
# We want to set the index on the visible devices list.
if reserved_device not in visible_devices:
if reserved_cuda_device not in visible_cuda_devices:
raise RuntimeError(
"TorchTrainer reserved a device {} that was not in the "
"CUDA_VISIBLE_DEVICES {}. This may be because the "
"Ray cluster is not set with the right env vars. "
"If that is not the issue, please raise a "
"Github issue.".format(reserved_device, visible_devices))
devices = visible_devices.split(",")
scoped_index = devices.index(reserved_device)
"If that is not the issue, please raise a Github "
"issue.".format(reserved_cuda_device,
visible_cuda_devices))
devices = visible_cuda_devices.split(",")
scoped_index = devices.index(reserved_cuda_device)
self._set_cuda_device(str(scoped_index))
else:
# Once cuda is initialized, torch.device ignores the os.env
# so we have to set the right actual device.
self._set_cuda_device(reserved_device)
self._set_cuda_device(reserved_cuda_device)
def _set_cuda_device(self, device_str):
"""Sets the CUDA device for this current local worker."""
@@ -309,24 +345,31 @@ class LocalDistributedRunner(DistributedTorchRunner):
# before we call 'set_device'.
_init_cuda_context()
assert isinstance(device_str, str)
self.local_device = device_str
logger.debug("Setting local device: %s", self.local_device)
self.local_cuda_device = device_str
logger.debug("Setting local CUDA device: %s", self.local_cuda_device)
try:
torch.cuda.set_device(int(self.local_device))
torch.cuda.set_device(int(self.local_cuda_device))
except RuntimeError:
logger.error("Failed to set local device.")
logger.error("Failed to set local CUDA device.")
raise
def get_device_ids(self):
return [int(self.local_device)]
return [int(self.local_cuda_device)]
def shutdown(self, cleanup=True):
super(LocalDistributedRunner, self).shutdown()
global _dummy_actor
if cleanup and _dummy_actor:
assert not self.is_actor(), "Actor shouldn't have a dummy actor."
ray.kill(_dummy_actor)
_dummy_actor = None
global _dummy_cpu_actor
global _dummy_cuda_actor
if cleanup:
if _dummy_cpu_actor or _dummy_cuda_actor:
assert not self.is_actor(), ("Actor shouldn't have a "
"dummy actor.")
if _dummy_cpu_actor:
ray.kill(_dummy_cpu_actor)
if _dummy_cuda_actor:
ray.kill(_dummy_cuda_actor)
_dummy_cpu_actor = None
_dummy_cuda_actor = None
def is_actor(self):
actor_id = ray.worker.global_worker.actor_id
+11 -3
View File
@@ -124,6 +124,7 @@ class TorchTrainer:
num_workers (int): the number of workers used in distributed
training. If 1, the worker will not be wrapped with
DistributedDataParallel.
num_cpus_per_worker (int): Sets the cpu requirement for each worker.
use_gpu (bool): Sets resource allocation for workers to 1 GPU
if true, and automatically moves both the model and optimizer
to the available CUDA device.
@@ -175,6 +176,7 @@ class TorchTrainer:
initialization_hook=None,
config=None,
num_workers=1,
num_cpus_per_worker=1,
use_gpu="auto",
backend="auto",
wrap_ddp=True,
@@ -240,6 +242,7 @@ class TorchTrainer:
logger.debug("Using {} as backend.".format(backend))
self.backend = backend
self.num_cpus_per_worker = num_cpus_per_worker
self.use_gpu = use_gpu
self.max_replicas = num_workers
@@ -328,11 +331,14 @@ class TorchTrainer:
# Start local worker
self.local_worker = LocalDistributedRunner(
num_cpus=1, num_gpus=int(self.use_gpu), **params)
num_cpus=self.num_cpus_per_worker,
num_gpus=int(self.use_gpu),
**params)
# Generate actor class
RemoteRunner = ray.remote(
num_cpus=1, num_gpus=int(self.use_gpu))(DistributedTorchRunner)
num_cpus=self.num_cpus_per_worker,
num_gpus=int(self.use_gpu))(DistributedTorchRunner)
# Start workers
self.remote_workers = [
RemoteRunner.remote(**params) for i in range(num_workers - 1)
@@ -736,12 +742,14 @@ class TorchTrainer:
def default_resource_request(cls, config):
num_workers = config.get("num_workers",
kwargs.get("num_workers", 1))
num_cpus = config.get("num_cpus_per_worker",
kwargs.get("num_cpus_per_worker", 1))
use_gpu = config.get("use_gpu", kwargs.get("use_gpu"))
remote_worker_count = num_workers - 1
return Resources(
cpu=1,
cpu=num_cpus,
gpu=int(use_gpu),
extra_cpu=int(remote_worker_count),
extra_gpu=int(int(use_gpu) * remote_worker_count))