diff --git a/include/ray/logging.h b/include/ray/logging.h index b79e4f597..f4934075e 100644 --- a/include/ray/logging.h +++ b/include/ray/logging.h @@ -49,6 +49,10 @@ extern "C" __declspec(dllimport) int __stdcall IsDebuggerPresent(); if (!(condition)) {\ RAY_LOG(RAY_FATAL, "Check failed at line " << __LINE__ << " in " << __FILE__ << ": " << #condition << " with message " << message) \ } +#define RAY_WARN(condition, message) \ + if (!(condition)) {\ + RAY_LOG(RAY_INFO, "Check failed at line " << __LINE__ << " in " << __FILE__ << ": " << #condition << " with message " << message) \ + } #define RAY_CHECK_EQ(var1, var2, message) RAY_CHECK((var1) == (var2), message) #define RAY_CHECK_NEQ(var1, var2, message) RAY_CHECK((var1) != (var2), message) @@ -60,5 +64,5 @@ extern "C" __declspec(dllimport) int __stdcall IsDebuggerPresent(); #define RAY_CHECK_GRPC(expr) \ do { \ grpc::Status _s = (expr); \ - RAY_CHECK(_s.ok(), "grpc call failed with message " << _s.error_message()); \ + RAY_WARN(_s.ok(), "grpc call failed with message " << _s.error_message()); \ } while (0); diff --git a/test/array_test.py b/test/array_test.py index edbb2ffba..f8d11e345 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -9,13 +9,19 @@ from numpy.testing import assert_equal, assert_almost_equal import ray.array.remote as ra import ray.array.distributed as da -class RemoteArrayTest(unittest.TestCase): +class TestRemoteArrays(unittest.TestCase): - def testMethods(self): + @classmethod + def setUpClass(cls): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(start_ray_local=True) + ray.init(start_ray_local=True, num_workers=1) + @classmethod + def tearDownClass(cls): + ray.worker.cleanup() + + def test_methods(self): # test eye object_id = ra.eye.remote(3) val = ray.get(object_id) @@ -41,40 +47,32 @@ class RemoteArrayTest(unittest.TestCase): r_val = ray.get(r_id) assert_almost_equal(np.dot(q_val, r_val), a_val) - ray.worker.cleanup() +class TestDistributedArrays(unittest.TestCase): -class DistributedArrayTest(unittest.TestCase): - - def testSerialization(self): + @classmethod + def setUpClass(cls): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(start_ray_local=True, num_workers=0) + ray.init(start_ray_local=True, num_workers=10, num_objstores=2) + @classmethod + def tearDownClass(cls): + ray.worker.cleanup() + + def test_serialization(self): x = da.DistArray([2, 3, 4], np.array([[[ray.put(0)]]])) capsule, _ = ray.serialization.serialize(ray.worker.global_worker.handle, x) y = ray.serialization.deserialize(ray.worker.global_worker.handle, capsule) self.assertEqual(x.shape, y.shape) self.assertEqual(x.objectids[0, 0, 0].id, y.objectids[0, 0, 0].id) - ray.worker.cleanup() - - def testAssemble(self): - for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: - reload(module) - ray.init(start_ray_local=True, num_workers=1) - + def test_assemble(self): a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) x = da.DistArray([2 * da.BLOCK_SIZE, da.BLOCK_SIZE], np.array([[a], [b]])) assert_equal(x.assemble(), np.vstack([np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]), np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE])])) - ray.worker.cleanup() - - def testMethods(self): - for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: - reload(module) - ray.init(start_ray_local=True, num_objstores=2, num_workers=10) - + def test_methods(self): x = da.zeros.remote([9, 25, 51], "float") assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51])) @@ -207,7 +205,5 @@ class DistributedArrayTest(unittest.TestCase): d2 = np.random.randint(1, 35) test_dist_qr(d1, d2) - ray.worker.cleanup() - if __name__ == "__main__": - unittest.main() + unittest.main(verbosity=2) diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index dca6d86e7..452cd2885 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -6,12 +6,18 @@ import numpy as np import test_functions -class MicroBenchmarkTest(unittest.TestCase): +class TestMicroBenchmarks(unittest.TestCase): - def testTiming(self): + @classmethod + def setUpClass(cls): reload(test_functions) ray.init(start_ray_local=True, num_workers=3) + @classmethod + def tearDownClass(cls): + ray.worker.cleanup() + + def test_timing(self): # measure the time required to submit a remote task to the scheduler elapsed_times = [] for _ in range(1000): @@ -77,7 +83,5 @@ class MicroBenchmarkTest(unittest.TestCase): print " worst: {}".format(elapsed_times[999]) # average_elapsed_time should be about 0.00087 - ray.worker.cleanup() - if __name__ == "__main__": - unittest.main() + unittest.main(verbosity=2) diff --git a/test/runtest.py b/test/runtest.py index 7b9b60d2a..7890223ab 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -33,32 +33,38 @@ class UserDefinedType(object): class SerializationTest(unittest.TestCase): - def roundTripTest(self, data): + @classmethod + def setUpClass(cls): + ray.init(start_ray_local=True, num_workers=0) + + @classmethod + def tearDownClass(cls): + ray.worker.cleanup() + + def round_trip_test(self, data): serialized, _ = ray.serialization.serialize(ray.worker.global_worker.handle, data) result = ray.serialization.deserialize(ray.worker.global_worker.handle, serialized) assert_equal(data, result) - def numpyTypeTest(self, typ): - self.roundTripTest(np.random.randint(0, 10, size=(100, 100)).astype(typ)) - self.roundTripTest(np.array(0).astype(typ)) - self.roundTripTest(np.empty((0,)).astype(typ)) - - def testSerialize(self): - ray.init(start_ray_local=True, num_workers=0) + def numpy_type_test(self, typ): + self.round_trip_test(np.random.randint(0, 10, size=(100, 100)).astype(typ)) + self.round_trip_test(np.array(0).astype(typ)) + self.round_trip_test(np.empty((0,)).astype(typ)) + def test_serialize(self): for val in RAY_TEST_OBJECTS: - self.roundTripTest(val) + self.round_trip_test(val) - self.roundTripTest(np.zeros((100, 100))) + self.round_trip_test(np.zeros((100, 100))) - self.numpyTypeTest("int8") - self.numpyTypeTest("uint8") - self.numpyTypeTest("int16") - self.numpyTypeTest("uint16") - self.numpyTypeTest("int32") - self.numpyTypeTest("uint32") - self.numpyTypeTest("float32") - self.numpyTypeTest("float64") + self.numpy_type_test("int8") + self.numpy_type_test("uint8") + self.numpy_type_test("int16") + self.numpy_type_test("uint16") + self.numpy_type_test("int32") + self.numpy_type_test("uint32") + self.numpy_type_test("float32") + self.numpy_type_test("float64") ref0 = ray.put(0) ref1 = ray.put(0) @@ -70,17 +76,15 @@ class SerializationTest(unittest.TestCase): result = ray.serialization.deserialize(ray.worker.global_worker.handle, capsule) self.assertTrue((a == result).all()) - self.roundTripTest(ref0) - self.roundTripTest([ref0, ref1, ref2, ref3]) - self.roundTripTest({"0": ref0, "1": ref1, "2": ref2, "3": ref3}) - self.roundTripTest((ref0, 1)) - - ray.worker.cleanup() + self.round_trip_test(ref0) + self.round_trip_test([ref0, ref1, ref2, ref3]) + self.round_trip_test({"0": ref0, "1": ref1, "2": ref2, "3": ref3}) + self.round_trip_test((ref0, 1)) class ObjStoreTest(unittest.TestCase): # Test setting up object stores, transfering data between them and retrieving data to a client - def testObjStore(self): + def test_object_store(self): node_ip_address = "127.0.0.1" scheduler_address = ray.services.start_ray_local(num_objstores=2, num_workers=0, worker_path=None) ray.connect(node_ip_address, scheduler_address, mode=ray.SCRIPT_MODE) @@ -126,15 +130,6 @@ class ObjStoreTest(unittest.TestCase): result = ray.get(objectid, w1) assert_equal(result, data) - # This test fails. See https://github.com/amplab/ray/issues/159. - # getting multiple times shouldn't matter - # for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]: - # objectid = worker.put(data, w1) - # result = worker.get(objectid, w2) - # result = worker.get(objectid, w2) - # result = worker.get(objectid, w2) - # assert_equal(result, data) - # Getting a buffer after modifying it before it finishes should return updated buffer objectid = ray.libraylib.get_objectid(w1.handle) buf = ray.libraylib.allocate_buffer(w1.handle, objectid, 100) @@ -148,11 +143,18 @@ class ObjStoreTest(unittest.TestCase): ray.disconnect(worker=w2) ray.worker.cleanup() -class WorkerTest(unittest.TestCase): +class APITest(unittest.TestCase): - def testPutGet(self): - ray.init(start_ray_local=True, num_workers=0) + @classmethod + def setUpClass(cls): + reload(test_functions) + ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE) + @classmethod + def tearDownClass(cls): + ray.worker.cleanup() + + def test_put_get(self): for i in range(100): value_before = i * 10 ** 6 objectid = ray.put(value_before) @@ -177,14 +179,18 @@ class WorkerTest(unittest.TestCase): value_after = ray.get(objectid) self.assertEqual(value_before, value_after) - ray.worker.cleanup() - -class APITest(unittest.TestCase): - - def testObjectIDAliasing(self): - reload(test_functions) - ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE) + @unittest.skip("This test is currently disabled.") + def test_multiple_get(self): + # This test fails. See https://github.com/amplab/ray/issues/159. getting + # multiple times shouldn't matter + for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]: + objectid = ray.put(data) + result = ray.get(objectid) + result = ray.get(objectid) + result = ray.get(objectid) + assert_equal(result, data) + def test_objectid_aliasing(self): ref = test_functions.test_alias_f.remote() assert_equal(ray.get(ref), np.ones([3, 4, 5])) ref = test_functions.test_alias_g.remote() @@ -192,12 +198,7 @@ class APITest(unittest.TestCase): ref = test_functions.test_alias_h.remote() assert_equal(ray.get(ref), np.ones([3, 4, 5])) - ray.worker.cleanup() - - def testKeywordArgs(self): - reload(test_functions) - ray.init(start_ray_local=True, num_workers=1) - + def test_keyword_args(self): x = test_functions.keyword_fct1.remote(1) self.assertEqual(ray.get(x), "1 hello") x = test_functions.keyword_fct1.remote(1, "hi") @@ -229,12 +230,7 @@ class APITest(unittest.TestCase): x = test_functions.keyword_fct3.remote(0, 1) self.assertEqual(ray.get(x), "0 1 hello world") - ray.worker.cleanup() - - def testVariableNumberOfArgs(self): - reload(test_functions) - ray.init(start_ray_local=True, num_workers=1) - + def test_variable_number_of_args(self): x = test_functions.varargs_fct1.remote(0, 1, 2) self.assertEqual(ray.get(x), "0 1 2") x = test_functions.varargs_fct2.remote(0, 1, 2) @@ -243,12 +239,7 @@ class APITest(unittest.TestCase): self.assertTrue(test_functions.kwargs_exception_thrown) self.assertTrue(test_functions.varargs_and_kwargs_exception_thrown) - ray.worker.cleanup() - - def testNoArgs(self): - reload(test_functions) - ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) - + def test_no_args(self): test_functions.no_op.remote() time.sleep(0.2) task_info = ray.task_info() @@ -262,26 +253,18 @@ class APITest(unittest.TestCase): self.assertEqual(len(task_info["running_tasks"]), 0) self.assertTrue("The @remote decorator for function test_functions.no_op_fail has 0 return values, but test_functions.no_op_fail returned more than 0 values." in task_info["failed_tasks"][0].get("error_message")) - ray.worker.cleanup() - - def testTypeChecking(self): - reload(test_functions) - ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) - + def test_type_checking(self): # Make sure that these functions throw exceptions because there return # values do not type check. + num_failed_tasks = len(ray.task_info()["failed_tasks"]) test_functions.test_return1.remote() test_functions.test_return2.remote() time.sleep(0.2) task_info = ray.task_info() - self.assertEqual(len(task_info["failed_tasks"]), 2) + self.assertEqual(len(task_info["failed_tasks"]), num_failed_tasks + 2) self.assertEqual(len(task_info["running_tasks"]), 0) - ray.worker.cleanup() - - def testDefiningRemoteFunctions(self): - ray.init(start_ray_local=True, num_workers=2) - + def test_defining_remote_functions(self): # Test that we can define a remote function in the shell. @ray.remote([int], [int]) def f(x): @@ -325,9 +308,13 @@ class APITest(unittest.TestCase): self.assertEqual(ray.get(l.remote(1)), 2) self.assertEqual(ray.get(m.remote(1)), 2) +class TestCachingBeforeInit(unittest.TestCase): + + @classmethod + def tearDownClass(cls): ray.worker.cleanup() - def testCachingReusables(self): + def test_caching_reusables(self): # Test that we can define reusable variables before the driver is connected. def foo_initializer(): return 1 @@ -337,7 +324,6 @@ class APITest(unittest.TestCase): return [] ray.reusables.foo = ray.Reusable(foo_initializer) ray.reusables.bar = ray.Reusable(bar_initializer, bar_reinitializer) - @ray.remote([], [int]) def use_foo(): return ray.reusables.foo @@ -353,13 +339,18 @@ class APITest(unittest.TestCase): self.assertEqual(ray.get(use_bar.remote()), [1]) self.assertEqual(ray.get(use_bar.remote()), [1]) - ray.worker.cleanup() +class TestFailures(unittest.TestCase): -class TaskStatusTest(unittest.TestCase): - def testFailedTask(self): + @classmethod + def setUpClass(cls): reload(test_functions) ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE) + @classmethod + def tearDownClass(cls): + ray.worker.cleanup() + + def test_failed_task(self): test_functions.test_alias_f.remote() test_functions.throw_exception_fct1.remote() test_functions.throw_exception_fct1.remote() @@ -391,11 +382,7 @@ class TaskStatusTest(unittest.TestCase): else: self.assertTrue(False) # ray.get should throw an exception - ray.worker.cleanup() - - def testFailImportingRemoteFunction(self): - ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE) - + def test_fail_importing_remote_function(self): # This example is somewhat contrived. It should be successfully pickled, and # then it should throw an exception when it is unpickled. This may depend a # bit on the specifics of our pickler. @@ -414,11 +401,7 @@ class TaskStatusTest(unittest.TestCase): time.sleep(0.1) self.assertTrue("There is a problem here." in ray.task_info()["failed_remote_function_imports"][0]["error_message"]) - ray.worker.cleanup() - - def testFailImportingReusableVariable(self): - ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE) - + def test_fail_importing_reusable_variable(self): # This will throw an exception when the reusable variable is imported on the # workers. def initializer(): @@ -430,11 +413,7 @@ class TaskStatusTest(unittest.TestCase): # Check that the error message is in the task info. self.assertTrue("The initializer failed." in ray.task_info()["failed_reusable_variable_imports"][0]["error_message"]) - ray.worker.cleanup() - - def testFailReinitializingVariable(self): - ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE) - + def test_fail_reinitializing_variable(self): def initializer(): return 0 def reinitializer(foo): @@ -448,8 +427,6 @@ class TaskStatusTest(unittest.TestCase): # Check that the error message is in the task info. self.assertTrue("The reinitializer failed." in ray.task_info()["failed_reinitialize_reusable_variables"][0]["error_message"]) - ray.worker.cleanup() - def check_get_deallocated(data): x = ray.put(data) ray.get(x) @@ -460,20 +437,25 @@ def check_get_not_deallocated(data): y = ray.get(x) return y, x.id -class ReferenceCountingTest(unittest.TestCase): +class TestReferenceCounting(unittest.TestCase): - def testDeallocation(self): + @classmethod + def setUpClass(cls): reload(test_functions) for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(start_ray_local=True, num_workers=1) + ray.init(start_ray_local=True, num_workers=3) + @classmethod + def tearDownClass(cls): + ray.worker.cleanup() + + def test_deallocation(self): x = test_functions.test_alias_f.remote() ray.get(x) time.sleep(0.1) objectid_val = x.id self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1) - del x self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], -1) # -1 indicates deallocated @@ -482,7 +464,6 @@ class ReferenceCountingTest(unittest.TestCase): time.sleep(0.1) objectid_val = y.id self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [1, 0, 0]) - del y self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, -1, -1]) @@ -490,7 +471,6 @@ class ReferenceCountingTest(unittest.TestCase): time.sleep(0.1) objectid_val = z.id self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [1, 1, 1]) - del z time.sleep(0.1) self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, -1, -1]) @@ -501,7 +481,6 @@ class ReferenceCountingTest(unittest.TestCase): objectid_val = x.id time.sleep(0.1) self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [1, 1, 1]) - del x time.sleep(0.1) self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, 1, 1]) @@ -512,11 +491,7 @@ class ReferenceCountingTest(unittest.TestCase): time.sleep(0.1) self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, -1, -1]) - ray.worker.cleanup() - - def testGet(self): - ray.init(start_ray_local=True, num_workers=3) - + def test_get(self): for val in RAY_TEST_OBJECTS + [np.zeros((2, 2)), UserDefinedType()]: objectid_val = check_get_deallocated(val) self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], -1) @@ -525,36 +500,35 @@ class ReferenceCountingTest(unittest.TestCase): x, objectid_val = check_get_not_deallocated(val) self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1) + @unittest.skip("This test is currently disabled.") + def test_get_twice(self): # The following currently segfaults: The second "result = " closes the # memory segment as soon as the assignment is done (and the first result # goes out of scope). - # data = np.zeros([10, 20]) - # objectid = ray.put(data) - # result = worker.get(objectid) - # result = worker.get(objectid) - # assert_equal(result, data) + data = np.zeros([10, 20]) + objectid = ray.put(data) + result = ray.get(objectid) + result = ray.get(objectid) + assert_equal(result, data) + @unittest.skip("This test is currently disabled.") + def test_get_bool_and_none(self): + # This fails, because for bool and None, we cannot track python reference + # counts and therefore cannot keep the refcount up (see + # 5281bd414f6b404f61e1fe25ec5f6651defee206). The resulting behavior is still + # correct however because True, False and None are returned by get "by + # value" and therefore can be reclaimed from the object store safely. + for val in [True, False, None]: + x, objectid_val = check_get_not_deallocated(val) + self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1) + +class TestPythonMode(unittest.TestCase): + + @classmethod + def tearDownClass(cls): ray.worker.cleanup() - # @unittest.expectedFailure - # def testGetFailing(self): - # ray.init(start_ray_local=True, num_workers=3) - - # # This is failing, because for bool and None, we cannot track python - # # refcounts and therefore cannot keep the refcount up - # # (see 5281bd414f6b404f61e1fe25ec5f6651defee206). - # # The resulting behavior is still correct however because True, False and - # # None are returned by get "by value" and therefore can be reclaimed from - # # the object store safely. - # for val in [True, False, None]: - # x, objectid_val = check_get_not_deallocated(val) - # self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1) - - # ray.worker.cleanup() - -class PythonModeTest(unittest.TestCase): - - def testPythonMode(self): + def test_python_mode(self): reload(test_functions) ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE) @@ -572,11 +546,13 @@ class PythonModeTest(unittest.TestCase): assert_equal(aref, np.array([0, 0])) # python_mode_g should not mutate aref assert_equal(bref, np.array([1, 0])) +class TestPythonCExtensions(unittest.TestCase): + + @classmethod + def tearDownClass(cls): ray.worker.cleanup() -class PythonCExtensionTest(unittest.TestCase): - - def testReferenceCountNone(self): + def test_reference_counting_bools_and_none(self): ray.init(start_ray_local=True, num_workers=1) # Make sure that we aren't accidentally messing up Python's reference counts. @@ -588,23 +564,22 @@ class PythonCExtensionTest(unittest.TestCase): second_count = ray.get(f.remote()) self.assertEqual(first_count, second_count) +class TestReusableVariables(unittest.TestCase): + + @classmethod + def tearDownClass(cls): ray.worker.cleanup() -class ReusablesTest(unittest.TestCase): - - def testReusables(self): + def test_reusable_variables(self): ray.init(start_ray_local=True, num_workers=1) # Test that we can add a variable to the key-value store. - def foo_initializer(): return 1 def foo_reinitializer(foo): return foo - ray.reusables.foo = ray.Reusable(foo_initializer, foo_reinitializer) self.assertEqual(ray.reusables.foo, 1) - @ray.remote([], [int]) def use_foo(): return ray.reusables.foo @@ -613,12 +588,9 @@ class ReusablesTest(unittest.TestCase): self.assertEqual(ray.get(use_foo.remote()), 1) # Test that we can add a variable to the key-value store, mutate it, and reset it. - def bar_initializer(): return [1, 2, 3] - ray.reusables.bar = ray.Reusable(bar_initializer) - @ray.remote([], [list]) def use_bar(): ray.reusables.bar.append(4) @@ -628,16 +600,13 @@ class ReusablesTest(unittest.TestCase): self.assertEqual(ray.get(use_bar.remote()), [1, 2, 3, 4]) # Test that we can use the reinitializer. - def baz_initializer(): return np.zeros([4]) def baz_reinitializer(baz): for i in range(len(baz)): baz[i] = 0 return baz - ray.reusables.baz = ray.Reusable(baz_initializer, baz_reinitializer) - @ray.remote([int], [np.ndarray]) def use_baz(i): baz = ray.reusables.baz @@ -651,14 +620,11 @@ class ReusablesTest(unittest.TestCase): # Make sure the reinitializer is actually getting called. Note that this is # not the correct usage of a reinitializer because it does not reset qux to # its original state. This is just for testing. - def qux_initializer(): return 0 def qux_reinitializer(x): return x + 1 - ray.reusables.qux = ray.Reusable(qux_initializer, qux_reinitializer) - @ray.remote([], [int]) def use_qux(): return ray.reusables.qux @@ -666,27 +632,31 @@ class ReusablesTest(unittest.TestCase): self.assertEqual(ray.get(use_qux.remote()), 1) self.assertEqual(ray.get(use_qux.remote()), 2) +class TestAttachingToCluster(unittest.TestCase): + + @classmethod + def tearDownClass(cls): ray.worker.cleanup() -class ClusterAttachingTest(unittest.TestCase): - - def testAttachingToCluster(self): + def test_attaching_to_cluster(self): node_ip_address = "127.0.0.1" scheduler_port = np.random.randint(40000, 50000) scheduler_address = "{}:{}".format(node_ip_address, scheduler_port) ray.services.start_scheduler(scheduler_address, cleanup=True) ray.services.start_node(scheduler_address, node_ip_address, num_workers=1, cleanup=True) - ray.init(node_ip_address=node_ip_address, scheduler_address=scheduler_address) - @ray.remote([int], [int]) def f(x): return x + 1 self.assertEqual(ray.get(f.remote(0)), 1) +class TestAttachingToClusterWithMultipleObjectStores(unittest.TestCase): + + @classmethod + def tearDownClass(cls): ray.worker.cleanup() - def testAttachingToClusterWithMultipleObjectStores(self): + def test_attaching_to_cluster_with_multiple_object_stores(self): node_ip_address = "127.0.0.1" scheduler_port = np.random.randint(40000, 50000) scheduler_address = "{}:{}".format(node_ip_address, scheduler_port) @@ -694,15 +664,11 @@ class ClusterAttachingTest(unittest.TestCase): ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) - ray.init(node_ip_address=node_ip_address, scheduler_address=scheduler_address) - @ray.remote([int], [int]) def f(x): return x + 1 self.assertEqual(ray.get(f.remote(0)), 1) - ray.worker.cleanup() - if __name__ == "__main__": - unittest.main() + unittest.main(verbosity=2)