From b449ece2ea876a23da4b3e46d096d9f2a3ddabbb Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Tue, 23 Jun 2020 15:43:27 +0800 Subject: [PATCH] [SGD] Variable worker CPU requirements (#8963) --- python/ray/util/sgd/tests/test_tensorflow.py | 3 +- .../ray/util/sgd/tests/test_torch_runner.py | 70 +++++++--- python/ray/util/sgd/tf/tf_trainer.py | 10 +- .../sgd/torch/distributed_torch_runner.py | 127 ++++++++++++------ python/ray/util/sgd/torch/torch_trainer.py | 14 +- 5 files changed, 155 insertions(+), 69 deletions(-) diff --git a/python/ray/util/sgd/tests/test_tensorflow.py b/python/ray/util/sgd/tests/test_tensorflow.py index b6b33690b..1072ce129 100644 --- a/python/ray/util/sgd/tests/test_tensorflow.py +++ b/python/ray/util/sgd/tests/test_tensorflow.py @@ -47,7 +47,8 @@ def test_tune_train(ray_start_2_cpus, num_replicas): # noqa: F811 "data_creator": tune.function(simple_dataset), "num_replicas": num_replicas, "use_gpu": False, - "trainer_config": SIMPLE_CONFIG + "trainer_config": SIMPLE_CONFIG, + "num_cpus_per_worker": 1 } tune.run( diff --git a/python/ray/util/sgd/tests/test_torch_runner.py b/python/ray/util/sgd/tests/test_torch_runner.py index cabea8c93..ca8292373 100644 --- a/python/ray/util/sgd/tests/test_torch_runner.py +++ b/python/ray/util/sgd/tests/test_torch_runner.py @@ -185,12 +185,13 @@ class TestLocalDistributedRunner(unittest.TestCase): clear_dummy_actor() ray.shutdown() - def _testWithInitialized(self, init_mock): + def _testReserveCUDAResource(self, init_mock, num_cpus): mock_runner = MagicMock() mock_runner._set_cuda_device = MagicMock() preset_devices = os.environ.get("CUDA_VISIBLE_DEVICES") - LocalDistributedRunner._try_reserve_and_set_cuda(mock_runner) + LocalDistributedRunner._try_reserve_and_set_resources( + mock_runner, num_cpus, 1) self.assertTrue(mock_runner._set_cuda_device.called) local_device = mock_runner._set_cuda_device.call_args[0][0] @@ -202,52 +203,79 @@ class TestLocalDistributedRunner(unittest.TestCase): self.assertIn(env_set_device, visible_devices) device_int = int(local_device) self.assertLess(device_int, len(visible_devices)) + self.assertEquals(len(os.environ["CUDA_VISIBLE_DEVICES"]), 1) else: self.assertEquals(local_device, env_set_device) def testNoVisibleWithInitialized(self): with patch("torch.cuda.is_initialized") as init_mock: init_mock.return_value = True - self._testWithInitialized(init_mock) + self._testReserveCUDAResource(init_mock, 0) - def test2VisibleWithInitialized(self): - os.environ["CUDA_VISIBLE_DEVICES"] = "2,3" + def testNoVisibleWithInitializedAndReserveCPUResource(self): with patch("torch.cuda.is_initialized") as init_mock: init_mock.return_value = True - self._testWithInitialized(init_mock) + self._testReserveCUDAResource(init_mock, 2) def test1VisibleWithInitialized(self): os.environ["CUDA_VISIBLE_DEVICES"] = "0" with patch("torch.cuda.is_initialized") as init_mock: init_mock.return_value = True - self._testWithInitialized(init_mock) + self._testReserveCUDAResource(init_mock, 0) - def test2VisibleNotInitialized(self): + def test2VisibleWithInitialized(self): os.environ["CUDA_VISIBLE_DEVICES"] = "2,3" with patch("torch.cuda.is_initialized") as init_mock: - init_mock.return_value = False - mock_runner = MagicMock() - mock_runner._set_cuda_device = MagicMock() - LocalDistributedRunner._try_reserve_and_set_cuda(mock_runner) - args, _ = mock_runner._set_cuda_device.call_args - self.assertTrue(("1" in args) or "0" in args) - self.assertEquals(len(os.environ["CUDA_VISIBLE_DEVICES"]), 1) + init_mock.return_value = True + self._testReserveCUDAResource(init_mock, 0) + + def test2VisibleWithInitializedAndReserveCPUResource(self): + os.environ["CUDA_VISIBLE_DEVICES"] = "2,3" + with patch("torch.cuda.is_initialized") as init_mock: + init_mock.return_value = True + self._testReserveCUDAResource(init_mock, 2) def test1VisibleNotInitialized(self): os.environ["CUDA_VISIBLE_DEVICES"] = "0" with patch("torch.cuda.is_initialized") as init_mock: init_mock.return_value = False - mock_runner = MagicMock() - mock_runner._set_cuda_device = MagicMock() - LocalDistributedRunner._try_reserve_and_set_cuda(mock_runner) - mock_runner._set_cuda_device.assert_called_with("0") - self.assertEquals(len(os.environ["CUDA_VISIBLE_DEVICES"]), 1) + self._testReserveCUDAResource(init_mock, 0) + + def test1VisibleNotInitializedAndReserveCPUResource(self): + os.environ["CUDA_VISIBLE_DEVICES"] = "0" + with patch("torch.cuda.is_initialized") as init_mock: + init_mock.return_value = False + self._testReserveCUDAResource(init_mock, 2) + + def test2VisibleNotInitialized(self): + os.environ["CUDA_VISIBLE_DEVICES"] = "2,3" + with patch("torch.cuda.is_initialized") as init_mock: + init_mock.return_value = False + self._testReserveCUDAResource(init_mock, 0) @patch("torch.cuda.set_device") def testSetDevice(self, set_mock): mock_runner = MagicMock() mock_runner._is_set = False LocalDistributedRunner._set_cuda_device(mock_runner, "123") - self.assertEquals(mock_runner.local_device, "123") + self.assertEquals(mock_runner.local_cuda_device, "123") self.assertTrue(set_mock.called) set_mock.assert_called_with(123) + + def testV1ReserveCPUResources(self): + mock_runner = MagicMock() + mock_runner._set_cpu_devices = MagicMock() + # reserve CPU only + LocalDistributedRunner._try_reserve_and_set_resources( + mock_runner, 4, 0) + remaining = ray.available_resources()["CPU"] + self.assertEquals(int(remaining), 6) + + def testV2ReserveCPUResources(self): + mock_runner = MagicMock() + mock_runner._set_cpu_devices = MagicMock() + # reserve CPU and GPU + LocalDistributedRunner._try_reserve_and_set_resources( + mock_runner, 4, 1) + remaining = ray.available_resources()["CPU"] + self.assertEquals(int(remaining), 6) diff --git a/python/ray/util/sgd/tf/tf_trainer.py b/python/ray/util/sgd/tf/tf_trainer.py index 27857d20d..b5ab0ddee 100644 --- a/python/ray/util/sgd/tf/tf_trainer.py +++ b/python/ray/util/sgd/tf/tf_trainer.py @@ -17,6 +17,7 @@ class TFTrainer: data_creator, config=None, num_replicas=1, + num_cpus_per_worker=1, use_gpu=False, verbose=False): """Sets up the TensorFlow trainer. @@ -31,6 +32,8 @@ class TFTrainer: 'data_creator'. Also contains `fit_config`, which is passed into `model.fit(data, **fit_config)` and `evaluate_config` which is passed into `model.evaluate`. + num_cpus_per_worker (int): Sets the cpu requirement for each + worker. num_replicas (int): Sets number of workers used in distributed training. Workers will be placed arbitrarily across the cluster. @@ -40,6 +43,7 @@ class TFTrainer: self.model_creator = model_creator self.data_creator = data_creator self.config = {} if config is None else config + self.num_cpus_per_worker = num_cpus_per_worker self.use_gpu = use_gpu self.num_replicas = num_replicas self.verbose = verbose @@ -47,7 +51,8 @@ class TFTrainer: # Generate actor class # todo: are these resource quotas right? # should they be exposed to the client codee? - Runner = ray.remote(num_cpus=1, num_gpus=int(use_gpu))(TFRunner) + Runner = ray.remote( + num_cpus=self.num_cpus_per_worker, num_gpus=int(use_gpu))(TFRunner) # todo: should we warn about using # distributed training on one device only? @@ -172,7 +177,8 @@ class TFTrainable(Trainable): data_creator=config["data_creator"], config=config.get("trainer_config", {}), num_replicas=config["num_replicas"], - use_gpu=config["use_gpu"]) + use_gpu=config["use_gpu"], + num_cpus_per_worker=config.get("num_cpus_per_worker", 1)) def _train(self): diff --git a/python/ray/util/sgd/torch/distributed_torch_runner.py b/python/ray/util/sgd/torch/distributed_torch_runner.py index c4181c5ca..d97b04e5f 100644 --- a/python/ray/util/sgd/torch/distributed_torch_runner.py +++ b/python/ray/util/sgd/torch/distributed_torch_runner.py @@ -190,27 +190,54 @@ class _DummyActor: def cuda_devices(self): return os.environ["CUDA_VISIBLE_DEVICES"] + def get(self): + # in order to verify the actor has created + return 1 + # This is a bit of a hack. It prevents the reassignment of CUDA_VISIBLE_DEVICES # during a trainer resize. We won't need this if we don't shutdown # all the actors. -_dummy_actor = None +_dummy_cuda_actor = None +# used to reserve CPU resources +_dummy_cpu_actor = None def clear_dummy_actor(): - global _dummy_actor - if _dummy_actor: + global _dummy_cuda_actor + if _dummy_cuda_actor: try: - _dummy_actor.__ray_terminate__.remote() + _dummy_cuda_actor.__ray_terminate__.remote() except Exception as exc: logger.info("Tried to clear dummy actor: %s", str(exc)) - _dummy_actor = None + _dummy_cuda_actor = None + + global _dummy_cpu_actor + if _dummy_cpu_actor: + try: + _dummy_cpu_actor.__ray_terminate__.remote() + except Exception as exc: + logger.info("Tried to clear dummy actor: %s", str(exc)) + + _dummy_cpu_actor = None -def reserve_cuda_device(retries=20): +def reserve_resources(num_cpus, num_gpus, retries=20): ip = ray.services.get_node_ip_address() - reserved_device = None + + reserved_cuda_device = None + + if num_cpus > 0: + global _dummy_cpu_actor + if _dummy_cpu_actor is None: + _dummy_cpu_actor = ray.remote( + num_cpus=num_cpus, + resources={"node:" + ip: 0.1})(_DummyActor).remote() + assert ray.get(_dummy_cpu_actor.get.remote()) == 1 + + if num_gpus == 0: + return reserved_cuda_device cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES") cuda_device_set = {} @@ -220,25 +247,25 @@ def reserve_cuda_device(retries=20): assert isinstance(cuda_devices, str) cuda_device_set = set(cuda_devices.split(",")) - global _dummy_actor + global _dummy_cuda_actor unused_actors = [] success = False for _ in range(retries): - if _dummy_actor is None: - _dummy_actor = ray.remote( - num_gpus=1, + if _dummy_cuda_actor is None: + _dummy_cuda_actor = ray.remote( + num_cpus=0, num_gpus=num_gpus, resources={"node:" + ip: 0.1})(_DummyActor).remote() - reserved_device = ray.get(_dummy_actor.cuda_devices.remote()) + reserved_cuda_device = ray.get(_dummy_cuda_actor.cuda_devices.remote()) - if match_devices and reserved_device not in cuda_device_set: + if match_devices and reserved_cuda_device not in cuda_device_set: logger.debug("Device %s not in list of visible devices %s", - reserved_device, cuda_device_set) - unused_actors.append(_dummy_actor) - _dummy_actor = None + reserved_cuda_device, cuda_device_set) + unused_actors.append(_dummy_cuda_actor) + _dummy_cuda_actor = None else: - logger.debug("Found matching device %s", reserved_device) + logger.debug("Found matching device %s", reserved_cuda_device) success = True for actor in unused_actors: actor.__ray_terminate__.remote() @@ -250,7 +277,7 @@ def reserve_cuda_device(retries=20): "make sure that Ray has access to all the visible devices: " "{}".format(os.environ.get("CUDA_VISIBLE_DEVICES"))) - return reserved_device + return reserved_cuda_device class LocalDistributedRunner(DistributedTorchRunner): @@ -265,39 +292,48 @@ class LocalDistributedRunner(DistributedTorchRunner): # Reserve a local GPU or CPU for the local worker # TODO: we should make sure this NEVER dies. - self.local_device = "0" + self.local_cuda_device = "0" self._is_set = False if num_gpus: assert num_gpus == 1, "Does not support multi-gpu workers" - if not self.is_actor() and num_gpus > 0: - self._try_reserve_and_set_cuda() + if num_cpus is None: + num_cpus = 0 + + if num_gpus is None: + num_gpus = 0 + + if not self.is_actor() and (num_cpus > 0 or num_gpus > 0): + self._try_reserve_and_set_resources(num_cpus, num_gpus) super(LocalDistributedRunner, self).__init__(*args, **kwargs) - def _try_reserve_and_set_cuda(self): - visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES") - reserved_device = reserve_cuda_device() + def _try_reserve_and_set_resources(self, num_cpus, num_gpus): + visible_cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES") + reserved_cuda_device = reserve_resources(num_cpus, num_gpus) + if num_gpus == 0: + return # This needs to be set even if torch.cuda is already # initialized because the env var is used later when # starting the DDP setup. - os.environ["CUDA_VISIBLE_DEVICES"] = reserved_device - if visible_devices: + os.environ["CUDA_VISIBLE_DEVICES"] = reserved_cuda_device + if visible_cuda_devices: # We want to set the index on the visible devices list. - if reserved_device not in visible_devices: + if reserved_cuda_device not in visible_cuda_devices: raise RuntimeError( "TorchTrainer reserved a device {} that was not in the " "CUDA_VISIBLE_DEVICES {}. This may be because the " "Ray cluster is not set with the right env vars. " - "If that is not the issue, please raise a " - "Github issue.".format(reserved_device, visible_devices)) - devices = visible_devices.split(",") - scoped_index = devices.index(reserved_device) + "If that is not the issue, please raise a Github " + "issue.".format(reserved_cuda_device, + visible_cuda_devices)) + devices = visible_cuda_devices.split(",") + scoped_index = devices.index(reserved_cuda_device) self._set_cuda_device(str(scoped_index)) else: # Once cuda is initialized, torch.device ignores the os.env # so we have to set the right actual device. - self._set_cuda_device(reserved_device) + self._set_cuda_device(reserved_cuda_device) def _set_cuda_device(self, device_str): """Sets the CUDA device for this current local worker.""" @@ -309,24 +345,31 @@ class LocalDistributedRunner(DistributedTorchRunner): # before we call 'set_device'. _init_cuda_context() assert isinstance(device_str, str) - self.local_device = device_str - logger.debug("Setting local device: %s", self.local_device) + self.local_cuda_device = device_str + logger.debug("Setting local CUDA device: %s", self.local_cuda_device) try: - torch.cuda.set_device(int(self.local_device)) + torch.cuda.set_device(int(self.local_cuda_device)) except RuntimeError: - logger.error("Failed to set local device.") + logger.error("Failed to set local CUDA device.") raise def get_device_ids(self): - return [int(self.local_device)] + return [int(self.local_cuda_device)] def shutdown(self, cleanup=True): super(LocalDistributedRunner, self).shutdown() - global _dummy_actor - if cleanup and _dummy_actor: - assert not self.is_actor(), "Actor shouldn't have a dummy actor." - ray.kill(_dummy_actor) - _dummy_actor = None + global _dummy_cpu_actor + global _dummy_cuda_actor + if cleanup: + if _dummy_cpu_actor or _dummy_cuda_actor: + assert not self.is_actor(), ("Actor shouldn't have a " + "dummy actor.") + if _dummy_cpu_actor: + ray.kill(_dummy_cpu_actor) + if _dummy_cuda_actor: + ray.kill(_dummy_cuda_actor) + _dummy_cpu_actor = None + _dummy_cuda_actor = None def is_actor(self): actor_id = ray.worker.global_worker.actor_id diff --git a/python/ray/util/sgd/torch/torch_trainer.py b/python/ray/util/sgd/torch/torch_trainer.py index 660d2adf1..232289501 100644 --- a/python/ray/util/sgd/torch/torch_trainer.py +++ b/python/ray/util/sgd/torch/torch_trainer.py @@ -124,6 +124,7 @@ class TorchTrainer: num_workers (int): the number of workers used in distributed training. If 1, the worker will not be wrapped with DistributedDataParallel. + num_cpus_per_worker (int): Sets the cpu requirement for each worker. use_gpu (bool): Sets resource allocation for workers to 1 GPU if true, and automatically moves both the model and optimizer to the available CUDA device. @@ -175,6 +176,7 @@ class TorchTrainer: initialization_hook=None, config=None, num_workers=1, + num_cpus_per_worker=1, use_gpu="auto", backend="auto", wrap_ddp=True, @@ -240,6 +242,7 @@ class TorchTrainer: logger.debug("Using {} as backend.".format(backend)) self.backend = backend + self.num_cpus_per_worker = num_cpus_per_worker self.use_gpu = use_gpu self.max_replicas = num_workers @@ -328,11 +331,14 @@ class TorchTrainer: # Start local worker self.local_worker = LocalDistributedRunner( - num_cpus=1, num_gpus=int(self.use_gpu), **params) + num_cpus=self.num_cpus_per_worker, + num_gpus=int(self.use_gpu), + **params) # Generate actor class RemoteRunner = ray.remote( - num_cpus=1, num_gpus=int(self.use_gpu))(DistributedTorchRunner) + num_cpus=self.num_cpus_per_worker, + num_gpus=int(self.use_gpu))(DistributedTorchRunner) # Start workers self.remote_workers = [ RemoteRunner.remote(**params) for i in range(num_workers - 1) @@ -736,12 +742,14 @@ class TorchTrainer: def default_resource_request(cls, config): num_workers = config.get("num_workers", kwargs.get("num_workers", 1)) + num_cpus = config.get("num_cpus_per_worker", + kwargs.get("num_cpus_per_worker", 1)) use_gpu = config.get("use_gpu", kwargs.get("use_gpu")) remote_worker_count = num_workers - 1 return Resources( - cpu=1, + cpu=num_cpus, gpu=int(use_gpu), extra_cpu=int(remote_worker_count), extra_gpu=int(int(use_gpu) * remote_worker_count))