From f32c0c8ec118836bf4fcda957b1d16eeddd7706f Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 22 Jan 2018 22:54:56 -0800 Subject: [PATCH] Move calls to ray.worker.cleanup into tearDown part of tests for isolation. (#1433) --- test/array_test.py | 10 +++---- test/failure_test.py | 34 +++++++--------------- test/microbenchmarks.py | 6 ++-- test/runtest.py | 63 ++++++++++++----------------------------- test/tensorflow_test.py | 15 ++-------- 5 files changed, 37 insertions(+), 91 deletions(-) diff --git a/test/array_test.py b/test/array_test.py index f3418e86f..99a533c21 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -16,6 +16,8 @@ if sys.version_info >= (3, 0): class RemoteArrayTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() def testMethods(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, @@ -48,10 +50,10 @@ class RemoteArrayTest(unittest.TestCase): r_val = ray.get(r_id) assert_almost_equal(np.dot(q_val, r_val), a_val) - ray.worker.cleanup() - class DistributedArrayTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() def testAssemble(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, @@ -67,8 +69,6 @@ class DistributedArrayTest(unittest.TestCase): np.vstack([np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]), np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE])])) - ray.worker.cleanup() - def testMethods(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: @@ -230,8 +230,6 @@ class DistributedArrayTest(unittest.TestCase): d2 = np.random.randint(1, 35) test_dist_qr(d1, d2) - ray.worker.cleanup() - if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/test/failure_test.py b/test/failure_test.py index 13f696781..f0e3e8703 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -30,6 +30,9 @@ def wait_for_errors(error_type, num_errors, timeout=10): class TaskStatusTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testFailedTask(self): reload(test_functions) ray.init(num_workers=3, driver_mode=ray.SILENT_MODE) @@ -61,8 +64,6 @@ class TaskStatusTest(unittest.TestCase): # ray.get should throw an exception. self.assertTrue(False) - ray.worker.cleanup() - def testFailImportingRemoteFunction(self): ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) @@ -100,7 +101,6 @@ def temporary_helper_function(): # Clean up the junk we added to sys.path. sys.path.pop(-1) - ray.worker.cleanup() def testFailedFunctionToRun(self): ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) @@ -117,8 +117,6 @@ def temporary_helper_function(): self.assertIn(b"Function to run failed.", ray.error_info()[1][b"message"]) - ray.worker.cleanup() - def testFailImportingActor(self): ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) @@ -178,10 +176,11 @@ def temporary_helper_function(): # Clean up the junk we added to sys.path. sys.path.pop(-1) - ray.worker.cleanup() class ActorTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() def testFailedActorInit(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -215,8 +214,6 @@ class ActorTest(unittest.TestCase): self.assertIn(error_message2, ray.error_info()[1][b"message"].decode("ascii")) - ray.worker.cleanup() - def testIncorrectMethodCalls(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -254,10 +251,10 @@ class ActorTest(unittest.TestCase): with self.assertRaises(AttributeError): a.nonexistent_method.remote() - ray.worker.cleanup() - class WorkerDeath(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() def testWorkerRaisingException(self): ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) @@ -290,8 +287,6 @@ class WorkerDeath(unittest.TestCase): self.assertIn("A worker died or was killed while executing a task.", ray.error_info()[0][b"message"].decode("ascii")) - ray.worker.cleanup() - def testActorWorkerDying(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -310,8 +305,6 @@ class WorkerDeath(unittest.TestCase): self.assertRaises(Exception, lambda: ray.get(consume.remote(obj))) wait_for_errors(b"worker_died", 1) - ray.worker.cleanup() - def testActorWorkerDyingFutureTasks(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -334,8 +327,6 @@ class WorkerDeath(unittest.TestCase): wait_for_errors(b"worker_died", 1) - ray.worker.cleanup() - def testActorWorkerDyingNothingInProgress(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -351,10 +342,10 @@ class WorkerDeath(unittest.TestCase): task2 = a.getpid.remote() self.assertRaises(Exception, lambda: ray.get(task2)) - ray.worker.cleanup() - class PutErrorTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() def testPutError1(self): store_size = 10 ** 6 @@ -400,8 +391,6 @@ class PutErrorTest(unittest.TestCase): # Make sure we receive the correct error message. wait_for_errors(b"put_reconstruction", 1) - ray.worker.cleanup() - def testPutError2(self): # This is the same as the previous test, but it calls ray.put directly. store_size = 10 ** 6 @@ -446,10 +435,10 @@ class PutErrorTest(unittest.TestCase): # Make sure we receive the correct error message. wait_for_errors(b"put_reconstruction", 1) - ray.worker.cleanup() - class ConfigurationTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() def testVersionMismatch(self): import cloudpickle @@ -461,7 +450,6 @@ class ConfigurationTest(unittest.TestCase): wait_for_errors(b"version_mismatch", 1) cloudpickle.__version__ = cloudpickle_version - ray.worker.cleanup() if __name__ == "__main__": diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index f31a8fe56..bd358d3d9 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -16,6 +16,8 @@ if sys.version_info >= (3, 0): class MicroBenchmarkTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() def testTiming(self): reload(test_functions) @@ -89,8 +91,6 @@ class MicroBenchmarkTest(unittest.TestCase): print(" worst: {}".format(elapsed_times[999])) # average_elapsed_time should be about 0.00087. - ray.worker.cleanup() - def testCache(self): ray.init(num_workers=1) @@ -115,8 +115,6 @@ class MicroBenchmarkTest(unittest.TestCase): print("WARNING: The caching test was too slow. " "d = {}, b = {}".format(d, b)) - ray.worker.cleanup() - if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/test/runtest.py b/test/runtest.py index 5c40b36ef..007ba09d8 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -205,6 +205,9 @@ except AttributeError: class SerializationTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testRecursiveObjects(self): ray.init(num_workers=0) @@ -233,8 +236,6 @@ class SerializationTest(unittest.TestCase): for obj in recursive_objects: self.assertRaises(Exception, lambda: ray.put(obj)) - ray.worker.cleanup() - def testPassingArgumentsByValue(self): ray.init(num_workers=1) @@ -247,8 +248,6 @@ class SerializationTest(unittest.TestCase): for obj in RAY_TEST_OBJECTS: assert_equal(obj, ray.get(f.remote(obj))) - ray.worker.cleanup() - def testPassingArgumentsByValueOutOfTheBox(self): ray.init(num_workers=1) @@ -282,8 +281,6 @@ class SerializationTest(unittest.TestCase): # won't be "equal" to Foo. ray.get(ray.put(Foo)) - ray.worker.cleanup() - def testPuttingObjectThatClosesOverObjectID(self): # This test is here to prevent a regression of # https://github.com/ray-project/ray/issues/1317. @@ -300,10 +297,11 @@ class SerializationTest(unittest.TestCase): with self.assertRaises(ray.local_scheduler.common_error): ray.put(f) - ray.worker.cleanup() - class WorkerTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testPythonWorkers(self): # Test the codepath for starting workers from the Python script, # instead of the local scheduler. This codepath is for debugging @@ -320,7 +318,6 @@ class WorkerTest(unittest.TestCase): values = ray.get([f.remote(1) for i in range(num_workers * 2)]) self.assertEqual(values, [1] * (num_workers * 2)) - ray.worker.cleanup() def testPutGet(self): ray.init(num_workers=0) @@ -349,8 +346,6 @@ class WorkerTest(unittest.TestCase): value_after = ray.get(objectid) self.assertEqual(value_before, value_after) - ray.worker.cleanup() - class APITest(unittest.TestCase): def init_ray(self, **kwargs): @@ -1021,6 +1016,9 @@ class APITestSharded(APITest): class PythonModeTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testPythonMode(self): reload(test_functions) ray.init(driver_mode=ray.PYTHON_MODE) @@ -1085,10 +1083,11 @@ class PythonModeTest(unittest.TestCase): test_array[0] = -1 assert_equal(test_array, test_actor.get_array.remote()) - ray.worker.cleanup() - class ResourcesTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testResourceConstraints(self): num_workers = 20 ray.init(num_workers=num_workers, num_cpus=10, num_gpus=2) @@ -1164,8 +1163,6 @@ class ResourcesTest(unittest.TestCase): self.assertLess(duration, 1 + time_buffer) self.assertGreater(duration, 1) - ray.worker.cleanup() - def testMultiResourceConstraints(self): num_workers = 20 ray.init(num_workers=num_workers, num_cpus=10, num_gpus=10) @@ -1218,8 +1215,6 @@ class ResourcesTest(unittest.TestCase): self.assertLess(duration, 1 + time_buffer) self.assertGreater(duration, 1) - ray.worker.cleanup() - def testGPUIDs(self): num_gpus = 10 ray.init(num_cpus=10, num_gpus=num_gpus) @@ -1371,8 +1366,6 @@ class ResourcesTest(unittest.TestCase): a1 = Actor1.remote() ray.get(a1.test.remote()) - ray.worker.cleanup() - def testMultipleLocalSchedulers(self): # This test will define a bunch of tasks that can only be assigned to # specific local schedulers, and we will check that they are assigned @@ -1487,8 +1480,6 @@ class ResourcesTest(unittest.TestCase): names, results = ray.get(run_nested2.remote()) validate_names_and_results(names, results) - ray.worker.cleanup() - def testCustomResources(self): ray.worker._init( start_ray_local=True, @@ -1525,8 +1516,6 @@ class ResourcesTest(unittest.TestCase): # custom resources gets blocked. ray.get([h.remote() for _ in range(5)]) - ray.worker.cleanup() - def testTwoCustomResources(self): ray.worker._init( start_ray_local=True, @@ -1577,8 +1566,6 @@ class ResourcesTest(unittest.TestCase): timeout=500) self.assertEqual(ready_ids, []) - ray.worker.cleanup() - def testManyCustomResources(self): num_custom_resources = 10000 total_resources = {str(i): np.random.randint(1, 7) @@ -1609,8 +1596,6 @@ class ResourcesTest(unittest.TestCase): ray.get(results) - ray.worker.cleanup() - class WorkerPoolTests(unittest.TestCase): def tearDown(self): @@ -1658,8 +1643,6 @@ class WorkerPoolTests(unittest.TestCase): ray.get(sleep.remote()) - ray.worker.cleanup() - def testMaxCallTasks(self): ray.init(num_cpus=1) @@ -1679,10 +1662,11 @@ class WorkerPoolTests(unittest.TestCase): self.assertEqual(pid1, pid2) ray.test.test_utils.wait_for_pid_to_exit(pid1) - ray.worker.cleanup() - class SchedulingAlgorithm(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def attempt_to_load_balance(self, remote_function, args, @@ -1721,8 +1705,6 @@ class SchedulingAlgorithm(unittest.TestCase): self.attempt_to_load_balance(f, [], 100, num_local_schedulers, 25) self.attempt_to_load_balance(f, [], 1000, num_local_schedulers, 250) - ray.worker.cleanup() - def testLoadBalancingWithDependencies(self): # This test ensures that tasks are being assigned to all local # schedulers in a roughly equal manner even when the tasks have @@ -1745,8 +1727,6 @@ class SchedulingAlgorithm(unittest.TestCase): self.attempt_to_load_balance(f, [x], 100, num_local_schedulers, 25) - ray.worker.cleanup() - def wait_for_num_tasks(num_tasks, timeout=10): start_time = time.time() @@ -1767,6 +1747,9 @@ def wait_for_num_objects(num_objects, timeout=10): class GlobalStateAPI(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testGlobalStateAPI(self): with self.assertRaises(Exception): ray.global_state.object_table() @@ -1891,8 +1874,6 @@ class GlobalStateAPI(unittest.TestCase): self.assertEqual(object_table[result_id], ray.global_state.object_table(result_id)) - ray.worker.cleanup() - def testLogFileAPI(self): ray.init(redirect_output=True) @@ -1923,8 +1904,6 @@ class GlobalStateAPI(unittest.TestCase): self.assertEqual(found_message, True) - ray.worker.cleanup() - def testTaskProfileAPI(self): ray.init(redirect_output=True) @@ -1957,8 +1936,6 @@ class GlobalStateAPI(unittest.TestCase): self.assertIn("store_outputs_start", data) self.assertIn("store_outputs_end", data) - ray.worker.cleanup() - def testWorkers(self): num_workers = 3 ray.init( @@ -1985,8 +1962,6 @@ class GlobalStateAPI(unittest.TestCase): self.assertIn("stderr_file", info) self.assertIn("stdout_file", info) - ray.worker.cleanup() - def testDumpTraceFile(self): ray.init(redirect_output=True) @@ -2016,8 +1991,6 @@ class GlobalStateAPI(unittest.TestCase): # the visualization actually renders (e.g., the context of the dumped # trace could be malformed). - ray.worker.cleanup() - if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/test/tensorflow_test.py b/test/tensorflow_test.py index f83e7283d..4ffaa29ef 100644 --- a/test/tensorflow_test.py +++ b/test/tensorflow_test.py @@ -72,6 +72,8 @@ class TrainActor(object): class TensorFlowTest(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() def testTensorFlowVariables(self): ray.init(num_workers=2) @@ -111,8 +113,6 @@ class TensorFlowTest(unittest.TestCase): variables3.set_session(sess) self.assertEqual(variables3.sess, sess) - ray.worker.cleanup() - # Test that the variable names for the two different nets are not # modified by TensorFlow to be unique (i.e. they should already # be unique because of the variable prefix). @@ -126,8 +126,6 @@ class TensorFlowTest(unittest.TestCase): # same, i.e. that the names in the weight dictionaries are the same net1.values[0].set_weights(net2.values[0].get_weights()) - ray.worker.cleanup() - # Test that different networks on the same worker are independent and # we can get/set their weights without any interaction. def testNetworksIndependent(self): @@ -157,8 +155,6 @@ class TensorFlowTest(unittest.TestCase): self.assertEqual(weights1, new_weights1) self.assertEqual(weights2, new_weights2) - ray.worker.cleanup() - # This test creates an additional network on the driver so that the # tensorflow variables on the driver and the worker differ. def testNetworkDriverWorkerIndependent(self): @@ -177,8 +173,6 @@ class TensorFlowTest(unittest.TestCase): net2.get_weights.remote())) self.assertEqual(weights2, new_weights2) - ray.worker.cleanup() - def testVariablesControlDependencies(self): ray.init(num_workers=1) @@ -193,16 +187,12 @@ class TensorFlowTest(unittest.TestCase): # momentum variables. self.assertEqual(len(net_vars.variables.items()), 4) - ray.worker.cleanup() - def testRemoteTrainingStep(self): ray.init(num_workers=1) net = ray.remote(TrainActor).remote() ray.get(net.training_step.remote(net.get_weights.remote())) - ray.worker.cleanup() - def testRemoteTrainingLoss(self): ray.init(num_workers=2) @@ -227,7 +217,6 @@ class TensorFlowTest(unittest.TestCase): after_acc = sess.run(loss, feed_dict=dict(zip(placeholders, [[2] * 100, [4] * 100]))) self.assertTrue(before_acc < after_acc) - ray.worker.cleanup() if __name__ == "__main__":