Make tests more informative (#372)

* Make tests more informative

* Change grpc status checks to warning instead of fatal
This commit is contained in:
Robert Nishihara
2016-08-11 12:40:55 -07:00
committed by Philipp Moritz
parent b0ecff69ad
commit fd353250c8
4 changed files with 162 additions and 192 deletions
+5 -1
View File
@@ -49,6 +49,10 @@ extern "C" __declspec(dllimport) int __stdcall IsDebuggerPresent();
if (!(condition)) {\
RAY_LOG(RAY_FATAL, "Check failed at line " << __LINE__ << " in " << __FILE__ << ": " << #condition << " with message " << message) \
}
#define RAY_WARN(condition, message) \
if (!(condition)) {\
RAY_LOG(RAY_INFO, "Check failed at line " << __LINE__ << " in " << __FILE__ << ": " << #condition << " with message " << message) \
}
#define RAY_CHECK_EQ(var1, var2, message) RAY_CHECK((var1) == (var2), message)
#define RAY_CHECK_NEQ(var1, var2, message) RAY_CHECK((var1) != (var2), message)
@@ -60,5 +64,5 @@ extern "C" __declspec(dllimport) int __stdcall IsDebuggerPresent();
#define RAY_CHECK_GRPC(expr) \
do { \
grpc::Status _s = (expr); \
RAY_CHECK(_s.ok(), "grpc call failed with message " << _s.error_message()); \
RAY_WARN(_s.ok(), "grpc call failed with message " << _s.error_message()); \
} while (0);
+21 -25
View File
@@ -9,13 +9,19 @@ from numpy.testing import assert_equal, assert_almost_equal
import ray.array.remote as ra
import ray.array.distributed as da
class RemoteArrayTest(unittest.TestCase):
class TestRemoteArrays(unittest.TestCase):
def testMethods(self):
@classmethod
def setUpClass(cls):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.init(start_ray_local=True)
ray.init(start_ray_local=True, num_workers=1)
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def test_methods(self):
# test eye
object_id = ra.eye.remote(3)
val = ray.get(object_id)
@@ -41,40 +47,32 @@ class RemoteArrayTest(unittest.TestCase):
r_val = ray.get(r_id)
assert_almost_equal(np.dot(q_val, r_val), a_val)
ray.worker.cleanup()
class TestDistributedArrays(unittest.TestCase):
class DistributedArrayTest(unittest.TestCase):
def testSerialization(self):
@classmethod
def setUpClass(cls):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.init(start_ray_local=True, num_workers=0)
ray.init(start_ray_local=True, num_workers=10, num_objstores=2)
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def test_serialization(self):
x = da.DistArray([2, 3, 4], np.array([[[ray.put(0)]]]))
capsule, _ = ray.serialization.serialize(ray.worker.global_worker.handle, x)
y = ray.serialization.deserialize(ray.worker.global_worker.handle, capsule)
self.assertEqual(x.shape, y.shape)
self.assertEqual(x.objectids[0, 0, 0].id, y.objectids[0, 0, 0].id)
ray.worker.cleanup()
def testAssemble(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.init(start_ray_local=True, num_workers=1)
def test_assemble(self):
a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
x = da.DistArray([2 * da.BLOCK_SIZE, da.BLOCK_SIZE], np.array([[a], [b]]))
assert_equal(x.assemble(), np.vstack([np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]), np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE])]))
ray.worker.cleanup()
def testMethods(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.init(start_ray_local=True, num_objstores=2, num_workers=10)
def test_methods(self):
x = da.zeros.remote([9, 25, 51], "float")
assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51]))
@@ -207,7 +205,5 @@ class DistributedArrayTest(unittest.TestCase):
d2 = np.random.randint(1, 35)
test_dist_qr(d1, d2)
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main()
unittest.main(verbosity=2)
+9 -5
View File
@@ -6,12 +6,18 @@ import numpy as np
import test_functions
class MicroBenchmarkTest(unittest.TestCase):
class TestMicroBenchmarks(unittest.TestCase):
def testTiming(self):
@classmethod
def setUpClass(cls):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=3)
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def test_timing(self):
# measure the time required to submit a remote task to the scheduler
elapsed_times = []
for _ in range(1000):
@@ -77,7 +83,5 @@ class MicroBenchmarkTest(unittest.TestCase):
print " worst: {}".format(elapsed_times[999])
# average_elapsed_time should be about 0.00087
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main()
unittest.main(verbosity=2)
+127 -161
View File
@@ -33,32 +33,38 @@ class UserDefinedType(object):
class SerializationTest(unittest.TestCase):
def roundTripTest(self, data):
@classmethod
def setUpClass(cls):
ray.init(start_ray_local=True, num_workers=0)
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def round_trip_test(self, data):
serialized, _ = ray.serialization.serialize(ray.worker.global_worker.handle, data)
result = ray.serialization.deserialize(ray.worker.global_worker.handle, serialized)
assert_equal(data, result)
def numpyTypeTest(self, typ):
self.roundTripTest(np.random.randint(0, 10, size=(100, 100)).astype(typ))
self.roundTripTest(np.array(0).astype(typ))
self.roundTripTest(np.empty((0,)).astype(typ))
def testSerialize(self):
ray.init(start_ray_local=True, num_workers=0)
def numpy_type_test(self, typ):
self.round_trip_test(np.random.randint(0, 10, size=(100, 100)).astype(typ))
self.round_trip_test(np.array(0).astype(typ))
self.round_trip_test(np.empty((0,)).astype(typ))
def test_serialize(self):
for val in RAY_TEST_OBJECTS:
self.roundTripTest(val)
self.round_trip_test(val)
self.roundTripTest(np.zeros((100, 100)))
self.round_trip_test(np.zeros((100, 100)))
self.numpyTypeTest("int8")
self.numpyTypeTest("uint8")
self.numpyTypeTest("int16")
self.numpyTypeTest("uint16")
self.numpyTypeTest("int32")
self.numpyTypeTest("uint32")
self.numpyTypeTest("float32")
self.numpyTypeTest("float64")
self.numpy_type_test("int8")
self.numpy_type_test("uint8")
self.numpy_type_test("int16")
self.numpy_type_test("uint16")
self.numpy_type_test("int32")
self.numpy_type_test("uint32")
self.numpy_type_test("float32")
self.numpy_type_test("float64")
ref0 = ray.put(0)
ref1 = ray.put(0)
@@ -70,17 +76,15 @@ class SerializationTest(unittest.TestCase):
result = ray.serialization.deserialize(ray.worker.global_worker.handle, capsule)
self.assertTrue((a == result).all())
self.roundTripTest(ref0)
self.roundTripTest([ref0, ref1, ref2, ref3])
self.roundTripTest({"0": ref0, "1": ref1, "2": ref2, "3": ref3})
self.roundTripTest((ref0, 1))
ray.worker.cleanup()
self.round_trip_test(ref0)
self.round_trip_test([ref0, ref1, ref2, ref3])
self.round_trip_test({"0": ref0, "1": ref1, "2": ref2, "3": ref3})
self.round_trip_test((ref0, 1))
class ObjStoreTest(unittest.TestCase):
# Test setting up object stores, transfering data between them and retrieving data to a client
def testObjStore(self):
def test_object_store(self):
node_ip_address = "127.0.0.1"
scheduler_address = ray.services.start_ray_local(num_objstores=2, num_workers=0, worker_path=None)
ray.connect(node_ip_address, scheduler_address, mode=ray.SCRIPT_MODE)
@@ -126,15 +130,6 @@ class ObjStoreTest(unittest.TestCase):
result = ray.get(objectid, w1)
assert_equal(result, data)
# This test fails. See https://github.com/amplab/ray/issues/159.
# getting multiple times shouldn't matter
# for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]:
# objectid = worker.put(data, w1)
# result = worker.get(objectid, w2)
# result = worker.get(objectid, w2)
# result = worker.get(objectid, w2)
# assert_equal(result, data)
# Getting a buffer after modifying it before it finishes should return updated buffer
objectid = ray.libraylib.get_objectid(w1.handle)
buf = ray.libraylib.allocate_buffer(w1.handle, objectid, 100)
@@ -148,11 +143,18 @@ class ObjStoreTest(unittest.TestCase):
ray.disconnect(worker=w2)
ray.worker.cleanup()
class WorkerTest(unittest.TestCase):
class APITest(unittest.TestCase):
def testPutGet(self):
ray.init(start_ray_local=True, num_workers=0)
@classmethod
def setUpClass(cls):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def test_put_get(self):
for i in range(100):
value_before = i * 10 ** 6
objectid = ray.put(value_before)
@@ -177,14 +179,18 @@ class WorkerTest(unittest.TestCase):
value_after = ray.get(objectid)
self.assertEqual(value_before, value_after)
ray.worker.cleanup()
class APITest(unittest.TestCase):
def testObjectIDAliasing(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
@unittest.skip("This test is currently disabled.")
def test_multiple_get(self):
# This test fails. See https://github.com/amplab/ray/issues/159. getting
# multiple times shouldn't matter
for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]:
objectid = ray.put(data)
result = ray.get(objectid)
result = ray.get(objectid)
result = ray.get(objectid)
assert_equal(result, data)
def test_objectid_aliasing(self):
ref = test_functions.test_alias_f.remote()
assert_equal(ray.get(ref), np.ones([3, 4, 5]))
ref = test_functions.test_alias_g.remote()
@@ -192,12 +198,7 @@ class APITest(unittest.TestCase):
ref = test_functions.test_alias_h.remote()
assert_equal(ray.get(ref), np.ones([3, 4, 5]))
ray.worker.cleanup()
def testKeywordArgs(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=1)
def test_keyword_args(self):
x = test_functions.keyword_fct1.remote(1)
self.assertEqual(ray.get(x), "1 hello")
x = test_functions.keyword_fct1.remote(1, "hi")
@@ -229,12 +230,7 @@ class APITest(unittest.TestCase):
x = test_functions.keyword_fct3.remote(0, 1)
self.assertEqual(ray.get(x), "0 1 hello world")
ray.worker.cleanup()
def testVariableNumberOfArgs(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=1)
def test_variable_number_of_args(self):
x = test_functions.varargs_fct1.remote(0, 1, 2)
self.assertEqual(ray.get(x), "0 1 2")
x = test_functions.varargs_fct2.remote(0, 1, 2)
@@ -243,12 +239,7 @@ class APITest(unittest.TestCase):
self.assertTrue(test_functions.kwargs_exception_thrown)
self.assertTrue(test_functions.varargs_and_kwargs_exception_thrown)
ray.worker.cleanup()
def testNoArgs(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
def test_no_args(self):
test_functions.no_op.remote()
time.sleep(0.2)
task_info = ray.task_info()
@@ -262,26 +253,18 @@ class APITest(unittest.TestCase):
self.assertEqual(len(task_info["running_tasks"]), 0)
self.assertTrue("The @remote decorator for function test_functions.no_op_fail has 0 return values, but test_functions.no_op_fail returned more than 0 values." in task_info["failed_tasks"][0].get("error_message"))
ray.worker.cleanup()
def testTypeChecking(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
def test_type_checking(self):
# Make sure that these functions throw exceptions because there return
# values do not type check.
num_failed_tasks = len(ray.task_info()["failed_tasks"])
test_functions.test_return1.remote()
test_functions.test_return2.remote()
time.sleep(0.2)
task_info = ray.task_info()
self.assertEqual(len(task_info["failed_tasks"]), 2)
self.assertEqual(len(task_info["failed_tasks"]), num_failed_tasks + 2)
self.assertEqual(len(task_info["running_tasks"]), 0)
ray.worker.cleanup()
def testDefiningRemoteFunctions(self):
ray.init(start_ray_local=True, num_workers=2)
def test_defining_remote_functions(self):
# Test that we can define a remote function in the shell.
@ray.remote([int], [int])
def f(x):
@@ -325,9 +308,13 @@ class APITest(unittest.TestCase):
self.assertEqual(ray.get(l.remote(1)), 2)
self.assertEqual(ray.get(m.remote(1)), 2)
class TestCachingBeforeInit(unittest.TestCase):
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def testCachingReusables(self):
def test_caching_reusables(self):
# Test that we can define reusable variables before the driver is connected.
def foo_initializer():
return 1
@@ -337,7 +324,6 @@ class APITest(unittest.TestCase):
return []
ray.reusables.foo = ray.Reusable(foo_initializer)
ray.reusables.bar = ray.Reusable(bar_initializer, bar_reinitializer)
@ray.remote([], [int])
def use_foo():
return ray.reusables.foo
@@ -353,13 +339,18 @@ class APITest(unittest.TestCase):
self.assertEqual(ray.get(use_bar.remote()), [1])
self.assertEqual(ray.get(use_bar.remote()), [1])
ray.worker.cleanup()
class TestFailures(unittest.TestCase):
class TaskStatusTest(unittest.TestCase):
def testFailedTask(self):
@classmethod
def setUpClass(cls):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def test_failed_task(self):
test_functions.test_alias_f.remote()
test_functions.throw_exception_fct1.remote()
test_functions.throw_exception_fct1.remote()
@@ -391,11 +382,7 @@ class TaskStatusTest(unittest.TestCase):
else:
self.assertTrue(False) # ray.get should throw an exception
ray.worker.cleanup()
def testFailImportingRemoteFunction(self):
ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
def test_fail_importing_remote_function(self):
# This example is somewhat contrived. It should be successfully pickled, and
# then it should throw an exception when it is unpickled. This may depend a
# bit on the specifics of our pickler.
@@ -414,11 +401,7 @@ class TaskStatusTest(unittest.TestCase):
time.sleep(0.1)
self.assertTrue("There is a problem here." in ray.task_info()["failed_remote_function_imports"][0]["error_message"])
ray.worker.cleanup()
def testFailImportingReusableVariable(self):
ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
def test_fail_importing_reusable_variable(self):
# This will throw an exception when the reusable variable is imported on the
# workers.
def initializer():
@@ -430,11 +413,7 @@ class TaskStatusTest(unittest.TestCase):
# Check that the error message is in the task info.
self.assertTrue("The initializer failed." in ray.task_info()["failed_reusable_variable_imports"][0]["error_message"])
ray.worker.cleanup()
def testFailReinitializingVariable(self):
ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
def test_fail_reinitializing_variable(self):
def initializer():
return 0
def reinitializer(foo):
@@ -448,8 +427,6 @@ class TaskStatusTest(unittest.TestCase):
# Check that the error message is in the task info.
self.assertTrue("The reinitializer failed." in ray.task_info()["failed_reinitialize_reusable_variables"][0]["error_message"])
ray.worker.cleanup()
def check_get_deallocated(data):
x = ray.put(data)
ray.get(x)
@@ -460,20 +437,25 @@ def check_get_not_deallocated(data):
y = ray.get(x)
return y, x.id
class ReferenceCountingTest(unittest.TestCase):
class TestReferenceCounting(unittest.TestCase):
def testDeallocation(self):
@classmethod
def setUpClass(cls):
reload(test_functions)
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.init(start_ray_local=True, num_workers=1)
ray.init(start_ray_local=True, num_workers=3)
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def test_deallocation(self):
x = test_functions.test_alias_f.remote()
ray.get(x)
time.sleep(0.1)
objectid_val = x.id
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1)
del x
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], -1) # -1 indicates deallocated
@@ -482,7 +464,6 @@ class ReferenceCountingTest(unittest.TestCase):
time.sleep(0.1)
objectid_val = y.id
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [1, 0, 0])
del y
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, -1, -1])
@@ -490,7 +471,6 @@ class ReferenceCountingTest(unittest.TestCase):
time.sleep(0.1)
objectid_val = z.id
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [1, 1, 1])
del z
time.sleep(0.1)
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, -1, -1])
@@ -501,7 +481,6 @@ class ReferenceCountingTest(unittest.TestCase):
objectid_val = x.id
time.sleep(0.1)
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [1, 1, 1])
del x
time.sleep(0.1)
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, 1, 1])
@@ -512,11 +491,7 @@ class ReferenceCountingTest(unittest.TestCase):
time.sleep(0.1)
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val:(objectid_val + 3)], [-1, -1, -1])
ray.worker.cleanup()
def testGet(self):
ray.init(start_ray_local=True, num_workers=3)
def test_get(self):
for val in RAY_TEST_OBJECTS + [np.zeros((2, 2)), UserDefinedType()]:
objectid_val = check_get_deallocated(val)
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], -1)
@@ -525,36 +500,35 @@ class ReferenceCountingTest(unittest.TestCase):
x, objectid_val = check_get_not_deallocated(val)
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1)
@unittest.skip("This test is currently disabled.")
def test_get_twice(self):
# The following currently segfaults: The second "result = " closes the
# memory segment as soon as the assignment is done (and the first result
# goes out of scope).
# data = np.zeros([10, 20])
# objectid = ray.put(data)
# result = worker.get(objectid)
# result = worker.get(objectid)
# assert_equal(result, data)
data = np.zeros([10, 20])
objectid = ray.put(data)
result = ray.get(objectid)
result = ray.get(objectid)
assert_equal(result, data)
@unittest.skip("This test is currently disabled.")
def test_get_bool_and_none(self):
# This fails, because for bool and None, we cannot track python reference
# counts and therefore cannot keep the refcount up (see
# 5281bd414f6b404f61e1fe25ec5f6651defee206). The resulting behavior is still
# correct however because True, False and None are returned by get "by
# value" and therefore can be reclaimed from the object store safely.
for val in [True, False, None]:
x, objectid_val = check_get_not_deallocated(val)
self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1)
class TestPythonMode(unittest.TestCase):
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
# @unittest.expectedFailure
# def testGetFailing(self):
# ray.init(start_ray_local=True, num_workers=3)
# # This is failing, because for bool and None, we cannot track python
# # refcounts and therefore cannot keep the refcount up
# # (see 5281bd414f6b404f61e1fe25ec5f6651defee206).
# # The resulting behavior is still correct however because True, False and
# # None are returned by get "by value" and therefore can be reclaimed from
# # the object store safely.
# for val in [True, False, None]:
# x, objectid_val = check_get_not_deallocated(val)
# self.assertEqual(ray.scheduler_info()["reference_counts"][objectid_val], 1)
# ray.worker.cleanup()
class PythonModeTest(unittest.TestCase):
def testPythonMode(self):
def test_python_mode(self):
reload(test_functions)
ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE)
@@ -572,11 +546,13 @@ class PythonModeTest(unittest.TestCase):
assert_equal(aref, np.array([0, 0])) # python_mode_g should not mutate aref
assert_equal(bref, np.array([1, 0]))
class TestPythonCExtensions(unittest.TestCase):
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
class PythonCExtensionTest(unittest.TestCase):
def testReferenceCountNone(self):
def test_reference_counting_bools_and_none(self):
ray.init(start_ray_local=True, num_workers=1)
# Make sure that we aren't accidentally messing up Python's reference counts.
@@ -588,23 +564,22 @@ class PythonCExtensionTest(unittest.TestCase):
second_count = ray.get(f.remote())
self.assertEqual(first_count, second_count)
class TestReusableVariables(unittest.TestCase):
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
class ReusablesTest(unittest.TestCase):
def testReusables(self):
def test_reusable_variables(self):
ray.init(start_ray_local=True, num_workers=1)
# Test that we can add a variable to the key-value store.
def foo_initializer():
return 1
def foo_reinitializer(foo):
return foo
ray.reusables.foo = ray.Reusable(foo_initializer, foo_reinitializer)
self.assertEqual(ray.reusables.foo, 1)
@ray.remote([], [int])
def use_foo():
return ray.reusables.foo
@@ -613,12 +588,9 @@ class ReusablesTest(unittest.TestCase):
self.assertEqual(ray.get(use_foo.remote()), 1)
# Test that we can add a variable to the key-value store, mutate it, and reset it.
def bar_initializer():
return [1, 2, 3]
ray.reusables.bar = ray.Reusable(bar_initializer)
@ray.remote([], [list])
def use_bar():
ray.reusables.bar.append(4)
@@ -628,16 +600,13 @@ class ReusablesTest(unittest.TestCase):
self.assertEqual(ray.get(use_bar.remote()), [1, 2, 3, 4])
# Test that we can use the reinitializer.
def baz_initializer():
return np.zeros([4])
def baz_reinitializer(baz):
for i in range(len(baz)):
baz[i] = 0
return baz
ray.reusables.baz = ray.Reusable(baz_initializer, baz_reinitializer)
@ray.remote([int], [np.ndarray])
def use_baz(i):
baz = ray.reusables.baz
@@ -651,14 +620,11 @@ class ReusablesTest(unittest.TestCase):
# Make sure the reinitializer is actually getting called. Note that this is
# not the correct usage of a reinitializer because it does not reset qux to
# its original state. This is just for testing.
def qux_initializer():
return 0
def qux_reinitializer(x):
return x + 1
ray.reusables.qux = ray.Reusable(qux_initializer, qux_reinitializer)
@ray.remote([], [int])
def use_qux():
return ray.reusables.qux
@@ -666,27 +632,31 @@ class ReusablesTest(unittest.TestCase):
self.assertEqual(ray.get(use_qux.remote()), 1)
self.assertEqual(ray.get(use_qux.remote()), 2)
class TestAttachingToCluster(unittest.TestCase):
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
class ClusterAttachingTest(unittest.TestCase):
def testAttachingToCluster(self):
def test_attaching_to_cluster(self):
node_ip_address = "127.0.0.1"
scheduler_port = np.random.randint(40000, 50000)
scheduler_address = "{}:{}".format(node_ip_address, scheduler_port)
ray.services.start_scheduler(scheduler_address, cleanup=True)
ray.services.start_node(scheduler_address, node_ip_address, num_workers=1, cleanup=True)
ray.init(node_ip_address=node_ip_address, scheduler_address=scheduler_address)
@ray.remote([int], [int])
def f(x):
return x + 1
self.assertEqual(ray.get(f.remote(0)), 1)
class TestAttachingToClusterWithMultipleObjectStores(unittest.TestCase):
@classmethod
def tearDownClass(cls):
ray.worker.cleanup()
def testAttachingToClusterWithMultipleObjectStores(self):
def test_attaching_to_cluster_with_multiple_object_stores(self):
node_ip_address = "127.0.0.1"
scheduler_port = np.random.randint(40000, 50000)
scheduler_address = "{}:{}".format(node_ip_address, scheduler_port)
@@ -694,15 +664,11 @@ class ClusterAttachingTest(unittest.TestCase):
ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True)
ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True)
ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True)
ray.init(node_ip_address=node_ip_address, scheduler_address=scheduler_address)
@ray.remote([int], [int])
def f(x):
return x + 1
self.assertEqual(ray.get(f.remote(0)), 1)
ray.worker.cleanup()
if __name__ == "__main__":
unittest.main()
unittest.main(verbosity=2)