Move calls to ray.worker.cleanup into tearDown part of tests for isolation. (#1433)

This commit is contained in:
Robert Nishihara
2018-01-22 22:54:56 -08:00
committed by Philipp Moritz
parent 4b1c8be4fe
commit f32c0c8ec1
5 changed files with 37 additions and 91 deletions
+4 -6
View File
@@ -16,6 +16,8 @@ if sys.version_info >= (3, 0):
class RemoteArrayTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testMethods(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random,
@@ -48,10 +50,10 @@ class RemoteArrayTest(unittest.TestCase):
r_val = ray.get(r_id)
assert_almost_equal(np.dot(q_val, r_val), a_val)
ray.worker.cleanup()
class DistributedArrayTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testAssemble(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random,
@@ -67,8 +69,6 @@ class DistributedArrayTest(unittest.TestCase):
np.vstack([np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]),
np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE])]))
ray.worker.cleanup()
def testMethods(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random,
da.linalg]:
@@ -230,8 +230,6 @@ class DistributedArrayTest(unittest.TestCase):
d2 = np.random.randint(1, 35)
test_dist_qr(d1, d2)
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main(verbosity=2)
+11 -23
View File
@@ -30,6 +30,9 @@ def wait_for_errors(error_type, num_errors, timeout=10):
class TaskStatusTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testFailedTask(self):
reload(test_functions)
ray.init(num_workers=3, driver_mode=ray.SILENT_MODE)
@@ -61,8 +64,6 @@ class TaskStatusTest(unittest.TestCase):
# ray.get should throw an exception.
self.assertTrue(False)
ray.worker.cleanup()
def testFailImportingRemoteFunction(self):
ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)
@@ -100,7 +101,6 @@ def temporary_helper_function():
# Clean up the junk we added to sys.path.
sys.path.pop(-1)
ray.worker.cleanup()
def testFailedFunctionToRun(self):
ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)
@@ -117,8 +117,6 @@ def temporary_helper_function():
self.assertIn(b"Function to run failed.",
ray.error_info()[1][b"message"])
ray.worker.cleanup()
def testFailImportingActor(self):
ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)
@@ -178,10 +176,11 @@ def temporary_helper_function():
# Clean up the junk we added to sys.path.
sys.path.pop(-1)
ray.worker.cleanup()
class ActorTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testFailedActorInit(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@@ -215,8 +214,6 @@ class ActorTest(unittest.TestCase):
self.assertIn(error_message2,
ray.error_info()[1][b"message"].decode("ascii"))
ray.worker.cleanup()
def testIncorrectMethodCalls(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@@ -254,10 +251,10 @@ class ActorTest(unittest.TestCase):
with self.assertRaises(AttributeError):
a.nonexistent_method.remote()
ray.worker.cleanup()
class WorkerDeath(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testWorkerRaisingException(self):
ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)
@@ -290,8 +287,6 @@ class WorkerDeath(unittest.TestCase):
self.assertIn("A worker died or was killed while executing a task.",
ray.error_info()[0][b"message"].decode("ascii"))
ray.worker.cleanup()
def testActorWorkerDying(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@@ -310,8 +305,6 @@ class WorkerDeath(unittest.TestCase):
self.assertRaises(Exception, lambda: ray.get(consume.remote(obj)))
wait_for_errors(b"worker_died", 1)
ray.worker.cleanup()
def testActorWorkerDyingFutureTasks(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@@ -334,8 +327,6 @@ class WorkerDeath(unittest.TestCase):
wait_for_errors(b"worker_died", 1)
ray.worker.cleanup()
def testActorWorkerDyingNothingInProgress(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@@ -351,10 +342,10 @@ class WorkerDeath(unittest.TestCase):
task2 = a.getpid.remote()
self.assertRaises(Exception, lambda: ray.get(task2))
ray.worker.cleanup()
class PutErrorTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testPutError1(self):
store_size = 10 ** 6
@@ -400,8 +391,6 @@ class PutErrorTest(unittest.TestCase):
# Make sure we receive the correct error message.
wait_for_errors(b"put_reconstruction", 1)
ray.worker.cleanup()
def testPutError2(self):
# This is the same as the previous test, but it calls ray.put directly.
store_size = 10 ** 6
@@ -446,10 +435,10 @@ class PutErrorTest(unittest.TestCase):
# Make sure we receive the correct error message.
wait_for_errors(b"put_reconstruction", 1)
ray.worker.cleanup()
class ConfigurationTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testVersionMismatch(self):
import cloudpickle
@@ -461,7 +450,6 @@ class ConfigurationTest(unittest.TestCase):
wait_for_errors(b"version_mismatch", 1)
cloudpickle.__version__ = cloudpickle_version
ray.worker.cleanup()
if __name__ == "__main__":
+2 -4
View File
@@ -16,6 +16,8 @@ if sys.version_info >= (3, 0):
class MicroBenchmarkTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testTiming(self):
reload(test_functions)
@@ -89,8 +91,6 @@ class MicroBenchmarkTest(unittest.TestCase):
print(" worst: {}".format(elapsed_times[999]))
# average_elapsed_time should be about 0.00087.
ray.worker.cleanup()
def testCache(self):
ray.init(num_workers=1)
@@ -115,8 +115,6 @@ class MicroBenchmarkTest(unittest.TestCase):
print("WARNING: The caching test was too slow. "
"d = {}, b = {}".format(d, b))
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main(verbosity=2)
+18 -45
View File
@@ -205,6 +205,9 @@ except AttributeError:
class SerializationTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testRecursiveObjects(self):
ray.init(num_workers=0)
@@ -233,8 +236,6 @@ class SerializationTest(unittest.TestCase):
for obj in recursive_objects:
self.assertRaises(Exception, lambda: ray.put(obj))
ray.worker.cleanup()
def testPassingArgumentsByValue(self):
ray.init(num_workers=1)
@@ -247,8 +248,6 @@ class SerializationTest(unittest.TestCase):
for obj in RAY_TEST_OBJECTS:
assert_equal(obj, ray.get(f.remote(obj)))
ray.worker.cleanup()
def testPassingArgumentsByValueOutOfTheBox(self):
ray.init(num_workers=1)
@@ -282,8 +281,6 @@ class SerializationTest(unittest.TestCase):
# won't be "equal" to Foo.
ray.get(ray.put(Foo))
ray.worker.cleanup()
def testPuttingObjectThatClosesOverObjectID(self):
# This test is here to prevent a regression of
# https://github.com/ray-project/ray/issues/1317.
@@ -300,10 +297,11 @@ class SerializationTest(unittest.TestCase):
with self.assertRaises(ray.local_scheduler.common_error):
ray.put(f)
ray.worker.cleanup()
class WorkerTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testPythonWorkers(self):
# Test the codepath for starting workers from the Python script,
# instead of the local scheduler. This codepath is for debugging
@@ -320,7 +318,6 @@ class WorkerTest(unittest.TestCase):
values = ray.get([f.remote(1) for i in range(num_workers * 2)])
self.assertEqual(values, [1] * (num_workers * 2))
ray.worker.cleanup()
def testPutGet(self):
ray.init(num_workers=0)
@@ -349,8 +346,6 @@ class WorkerTest(unittest.TestCase):
value_after = ray.get(objectid)
self.assertEqual(value_before, value_after)
ray.worker.cleanup()
class APITest(unittest.TestCase):
def init_ray(self, **kwargs):
@@ -1021,6 +1016,9 @@ class APITestSharded(APITest):
class PythonModeTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testPythonMode(self):
reload(test_functions)
ray.init(driver_mode=ray.PYTHON_MODE)
@@ -1085,10 +1083,11 @@ class PythonModeTest(unittest.TestCase):
test_array[0] = -1
assert_equal(test_array, test_actor.get_array.remote())
ray.worker.cleanup()
class ResourcesTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testResourceConstraints(self):
num_workers = 20
ray.init(num_workers=num_workers, num_cpus=10, num_gpus=2)
@@ -1164,8 +1163,6 @@ class ResourcesTest(unittest.TestCase):
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
ray.worker.cleanup()
def testMultiResourceConstraints(self):
num_workers = 20
ray.init(num_workers=num_workers, num_cpus=10, num_gpus=10)
@@ -1218,8 +1215,6 @@ class ResourcesTest(unittest.TestCase):
self.assertLess(duration, 1 + time_buffer)
self.assertGreater(duration, 1)
ray.worker.cleanup()
def testGPUIDs(self):
num_gpus = 10
ray.init(num_cpus=10, num_gpus=num_gpus)
@@ -1371,8 +1366,6 @@ class ResourcesTest(unittest.TestCase):
a1 = Actor1.remote()
ray.get(a1.test.remote())
ray.worker.cleanup()
def testMultipleLocalSchedulers(self):
# This test will define a bunch of tasks that can only be assigned to
# specific local schedulers, and we will check that they are assigned
@@ -1487,8 +1480,6 @@ class ResourcesTest(unittest.TestCase):
names, results = ray.get(run_nested2.remote())
validate_names_and_results(names, results)
ray.worker.cleanup()
def testCustomResources(self):
ray.worker._init(
start_ray_local=True,
@@ -1525,8 +1516,6 @@ class ResourcesTest(unittest.TestCase):
# custom resources gets blocked.
ray.get([h.remote() for _ in range(5)])
ray.worker.cleanup()
def testTwoCustomResources(self):
ray.worker._init(
start_ray_local=True,
@@ -1577,8 +1566,6 @@ class ResourcesTest(unittest.TestCase):
timeout=500)
self.assertEqual(ready_ids, [])
ray.worker.cleanup()
def testManyCustomResources(self):
num_custom_resources = 10000
total_resources = {str(i): np.random.randint(1, 7)
@@ -1609,8 +1596,6 @@ class ResourcesTest(unittest.TestCase):
ray.get(results)
ray.worker.cleanup()
class WorkerPoolTests(unittest.TestCase):
def tearDown(self):
@@ -1658,8 +1643,6 @@ class WorkerPoolTests(unittest.TestCase):
ray.get(sleep.remote())
ray.worker.cleanup()
def testMaxCallTasks(self):
ray.init(num_cpus=1)
@@ -1679,10 +1662,11 @@ class WorkerPoolTests(unittest.TestCase):
self.assertEqual(pid1, pid2)
ray.test.test_utils.wait_for_pid_to_exit(pid1)
ray.worker.cleanup()
class SchedulingAlgorithm(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def attempt_to_load_balance(self,
remote_function,
args,
@@ -1721,8 +1705,6 @@ class SchedulingAlgorithm(unittest.TestCase):
self.attempt_to_load_balance(f, [], 100, num_local_schedulers, 25)
self.attempt_to_load_balance(f, [], 1000, num_local_schedulers, 250)
ray.worker.cleanup()
def testLoadBalancingWithDependencies(self):
# This test ensures that tasks are being assigned to all local
# schedulers in a roughly equal manner even when the tasks have
@@ -1745,8 +1727,6 @@ class SchedulingAlgorithm(unittest.TestCase):
self.attempt_to_load_balance(f, [x], 100, num_local_schedulers, 25)
ray.worker.cleanup()
def wait_for_num_tasks(num_tasks, timeout=10):
start_time = time.time()
@@ -1767,6 +1747,9 @@ def wait_for_num_objects(num_objects, timeout=10):
class GlobalStateAPI(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testGlobalStateAPI(self):
with self.assertRaises(Exception):
ray.global_state.object_table()
@@ -1891,8 +1874,6 @@ class GlobalStateAPI(unittest.TestCase):
self.assertEqual(object_table[result_id],
ray.global_state.object_table(result_id))
ray.worker.cleanup()
def testLogFileAPI(self):
ray.init(redirect_output=True)
@@ -1923,8 +1904,6 @@ class GlobalStateAPI(unittest.TestCase):
self.assertEqual(found_message, True)
ray.worker.cleanup()
def testTaskProfileAPI(self):
ray.init(redirect_output=True)
@@ -1957,8 +1936,6 @@ class GlobalStateAPI(unittest.TestCase):
self.assertIn("store_outputs_start", data)
self.assertIn("store_outputs_end", data)
ray.worker.cleanup()
def testWorkers(self):
num_workers = 3
ray.init(
@@ -1985,8 +1962,6 @@ class GlobalStateAPI(unittest.TestCase):
self.assertIn("stderr_file", info)
self.assertIn("stdout_file", info)
ray.worker.cleanup()
def testDumpTraceFile(self):
ray.init(redirect_output=True)
@@ -2016,8 +1991,6 @@ class GlobalStateAPI(unittest.TestCase):
# the visualization actually renders (e.g., the context of the dumped
# trace could be malformed).
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main(verbosity=2)
+2 -13
View File
@@ -72,6 +72,8 @@ class TrainActor(object):
class TensorFlowTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
def testTensorFlowVariables(self):
ray.init(num_workers=2)
@@ -111,8 +113,6 @@ class TensorFlowTest(unittest.TestCase):
variables3.set_session(sess)
self.assertEqual(variables3.sess, sess)
ray.worker.cleanup()
# Test that the variable names for the two different nets are not
# modified by TensorFlow to be unique (i.e. they should already
# be unique because of the variable prefix).
@@ -126,8 +126,6 @@ class TensorFlowTest(unittest.TestCase):
# same, i.e. that the names in the weight dictionaries are the same
net1.values[0].set_weights(net2.values[0].get_weights())
ray.worker.cleanup()
# Test that different networks on the same worker are independent and
# we can get/set their weights without any interaction.
def testNetworksIndependent(self):
@@ -157,8 +155,6 @@ class TensorFlowTest(unittest.TestCase):
self.assertEqual(weights1, new_weights1)
self.assertEqual(weights2, new_weights2)
ray.worker.cleanup()
# This test creates an additional network on the driver so that the
# tensorflow variables on the driver and the worker differ.
def testNetworkDriverWorkerIndependent(self):
@@ -177,8 +173,6 @@ class TensorFlowTest(unittest.TestCase):
net2.get_weights.remote()))
self.assertEqual(weights2, new_weights2)
ray.worker.cleanup()
def testVariablesControlDependencies(self):
ray.init(num_workers=1)
@@ -193,16 +187,12 @@ class TensorFlowTest(unittest.TestCase):
# momentum variables.
self.assertEqual(len(net_vars.variables.items()), 4)
ray.worker.cleanup()
def testRemoteTrainingStep(self):
ray.init(num_workers=1)
net = ray.remote(TrainActor).remote()
ray.get(net.training_step.remote(net.get_weights.remote()))
ray.worker.cleanup()
def testRemoteTrainingLoss(self):
ray.init(num_workers=2)
@@ -227,7 +217,6 @@ class TensorFlowTest(unittest.TestCase):
after_acc = sess.run(loss, feed_dict=dict(zip(placeholders,
[[2] * 100, [4] * 100])))
self.assertTrue(before_acc < after_acc)
ray.worker.cleanup()
if __name__ == "__main__":