python/test: Faster tests and better BUILD (#9791)

This commit is contained in:
Barak Michener
2020-08-06 10:58:42 -07:00
committed by GitHub
parent 6a1acce791
commit 21994c594b
7 changed files with 984 additions and 1468 deletions
+214 -242
View File
@@ -188,115 +188,6 @@ def test_redefining_remote_functions(shutdown_only):
assert ray.get(ray.get(h.remote(i))) == i
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_get_multiple(ray_start_regular):
object_refs = [ray.put(i) for i in range(10)]
assert ray.get(object_refs) == list(range(10))
# Get a random choice of object refs with duplicates.
indices = list(np.random.choice(range(10), 5))
indices += indices
results = ray.get([object_refs[i] for i in indices])
assert results == indices
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_get_multiple_experimental(ray_start_regular):
object_refs = [ray.put(i) for i in range(10)]
object_refs_tuple = tuple(object_refs)
assert ray.experimental.get(object_refs_tuple) == list(range(10))
object_refs_nparray = np.array(object_refs)
assert ray.experimental.get(object_refs_nparray) == list(range(10))
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
def test_get_dict(ray_start_regular):
d = {str(i): ray.put(i) for i in range(5)}
for i in range(5, 10):
d[str(i)] = i
result = ray.experimental.get(d)
expected = {str(i): i for i in range(10)}
assert result == expected
def test_get_with_timeout(ray_start_regular):
signal = ray.test_utils.SignalActor.remote()
# Check that get() returns early if object is ready.
start = time.time()
ray.get(signal.wait.remote(should_wait=False), timeout=30)
assert time.time() - start < 30
# Check that get() raises a TimeoutError after the timeout if the object
# is not ready yet.
result_id = signal.wait.remote()
with pytest.raises(RayTimeoutError):
ray.get(result_id, timeout=0.1)
# Check that a subsequent get() returns early.
ray.get(signal.send.remote())
start = time.time()
ray.get(result_id, timeout=30)
assert time.time() - start < 30
@pytest.mark.parametrize(
"ray_start_regular", [{
"local_mode": True
}, {
"local_mode": False
}],
indirect=True)
# https://github.com/ray-project/ray/issues/6329
def test_call_actors_indirect_through_tasks(ray_start_regular):
@ray.remote
class Counter:
def __init__(self, value):
self.value = int(value)
def increase(self, delta):
self.value += int(delta)
return self.value
@ray.remote
def foo(object):
return ray.get(object.increase.remote(1))
@ray.remote
def bar(object):
return ray.get(object.increase.remote(1))
@ray.remote
def zoo(object):
return ray.get(object[0].increase.remote(1))
c = Counter.remote(0)
for _ in range(0, 100):
ray.get(foo.remote(c))
ray.get(bar.remote(c))
ray.get(zoo.remote([c]))
def test_call_matrix(shutdown_only):
ray.init(object_store_memory=1000 * 1024 * 1024)
@@ -362,62 +253,6 @@ def test_call_matrix(shutdown_only):
check(source_actor, dest_actor, is_large, out_of_band)
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 1,
"num_nodes": 1,
}, {
"num_cpus": 1,
"num_nodes": 2,
}],
indirect=True)
def test_call_chain(ray_start_cluster):
@ray.remote
def g(x):
return x + 1
x = 0
for _ in range(100):
x = g.remote(x)
assert ray.get(x) == 100
def test_inline_arg_memory_corruption(ray_start_regular):
@ray.remote
def f():
return np.zeros(1000, dtype=np.uint8)
@ray.remote
class Actor:
def __init__(self):
self.z = []
def add(self, x):
self.z.append(x)
for prev in self.z:
assert np.sum(prev) == 0, ("memory corruption detected", prev)
a = Actor.remote()
for i in range(100):
ray.get(a.add.remote(f.remote()))
def test_skip_plasma(ray_start_regular):
@ray.remote
class Actor:
def __init__(self):
pass
def f(self, x):
return x * 2
a = Actor.remote()
obj_ref = a.f.remote(1)
# it is not stored in plasma
assert not ray.worker.global_worker.core_worker.object_exists(obj_ref)
assert ray.get(obj_ref) == 2
def test_actor_call_order(shutdown_only):
ray.init(num_cpus=4)
@@ -441,53 +276,6 @@ def test_actor_call_order(shutdown_only):
for i in range(100)]) == list(range(100))
def test_actor_large_objects(ray_start_regular):
@ray.remote
class Actor:
def __init__(self):
pass
def f(self):
time.sleep(1)
return np.zeros(10000000)
a = Actor.remote()
obj_ref = a.f.remote()
assert not ray.worker.global_worker.core_worker.object_exists(obj_ref)
done, _ = ray.wait([obj_ref])
assert len(done) == 1
assert ray.worker.global_worker.core_worker.object_exists(obj_ref)
assert isinstance(ray.get(obj_ref), np.ndarray)
def test_actor_pass_by_ref(ray_start_regular):
@ray.remote
class Actor:
def __init__(self):
pass
def f(self, x):
return x * 2
@ray.remote
def f(x):
return x
@ray.remote
def error():
sys.exit(0)
a = Actor.remote()
assert ray.get(a.f.remote(f.remote(1))) == 2
fut = [a.f.remote(f.remote(i)) for i in range(100)]
assert ray.get(fut) == [i * 2 for i in range(100)]
# propagates errors for pass by ref
with pytest.raises(Exception):
ray.get(a.f.remote(error.remote()))
def test_actor_pass_by_ref_order_optimization(shutdown_only):
ray.init(num_cpus=4)
@@ -525,7 +313,217 @@ def test_actor_pass_by_ref_order_optimization(shutdown_only):
assert delta < 10, "did not skip slow value"
def test_actor_recursive(ray_start_regular):
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 1,
"num_nodes": 1,
}, {
"num_cpus": 1,
"num_nodes": 2,
}],
indirect=True)
def test_call_chain(ray_start_cluster):
@ray.remote
def g(x):
return x + 1
x = 0
for _ in range(100):
x = g.remote(x)
assert ray.get(x) == 100
def test_internal_config_when_connecting(ray_start_cluster):
config = json.dumps({
"object_pinning_enabled": 0,
"initial_reconstruction_timeout_milliseconds": 200
})
cluster = ray.cluster_utils.Cluster()
cluster.add_node(
_internal_config=config, object_store_memory=100 * 1024 * 1024)
cluster.wait_for_nodes()
# Specifying _internal_config when connecting to a cluster is disallowed.
with pytest.raises(ValueError):
ray.init(address=cluster.address, _internal_config=config)
# Check that the config was picked up (object pinning is disabled).
ray.init(address=cluster.address)
obj_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
for _ in range(5):
ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
# This would not raise an exception if object pinning was enabled.
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(obj_ref)
def test_get_multiple(ray_start_regular_shared):
object_refs = [ray.put(i) for i in range(10)]
assert ray.get(object_refs) == list(range(10))
# Get a random choice of object refs with duplicates.
indices = list(np.random.choice(range(10), 5))
indices += indices
results = ray.get([object_refs[i] for i in indices])
assert results == indices
def test_get_multiple_experimental(ray_start_regular_shared):
object_refs = [ray.put(i) for i in range(10)]
object_refs_tuple = tuple(object_refs)
assert ray.experimental.get(object_refs_tuple) == list(range(10))
object_refs_nparray = np.array(object_refs)
assert ray.experimental.get(object_refs_nparray) == list(range(10))
def test_get_dict(ray_start_regular_shared):
d = {str(i): ray.put(i) for i in range(5)}
for i in range(5, 10):
d[str(i)] = i
result = ray.experimental.get(d)
expected = {str(i): i for i in range(10)}
assert result == expected
def test_get_with_timeout(ray_start_regular_shared):
signal = ray.test_utils.SignalActor.remote()
# Check that get() returns early if object is ready.
start = time.time()
ray.get(signal.wait.remote(should_wait=False), timeout=30)
assert time.time() - start < 30
# Check that get() raises a TimeoutError after the timeout if the object
# is not ready yet.
result_id = signal.wait.remote()
with pytest.raises(RayTimeoutError):
ray.get(result_id, timeout=0.1)
# Check that a subsequent get() returns early.
ray.get(signal.send.remote())
start = time.time()
ray.get(result_id, timeout=30)
assert time.time() - start < 30
# https://github.com/ray-project/ray/issues/6329
def test_call_actors_indirect_through_tasks(ray_start_regular_shared):
@ray.remote
class Counter:
def __init__(self, value):
self.value = int(value)
def increase(self, delta):
self.value += int(delta)
return self.value
@ray.remote
def foo(object):
return ray.get(object.increase.remote(1))
@ray.remote
def bar(object):
return ray.get(object.increase.remote(1))
@ray.remote
def zoo(object):
return ray.get(object[0].increase.remote(1))
c = Counter.remote(0)
for _ in range(0, 100):
ray.get(foo.remote(c))
ray.get(bar.remote(c))
ray.get(zoo.remote([c]))
def test_inline_arg_memory_corruption(ray_start_regular_shared):
@ray.remote
def f():
return np.zeros(1000, dtype=np.uint8)
@ray.remote
class Actor:
def __init__(self):
self.z = []
def add(self, x):
self.z.append(x)
for prev in self.z:
assert np.sum(prev) == 0, ("memory corruption detected", prev)
a = Actor.remote()
for i in range(100):
ray.get(a.add.remote(f.remote()))
def test_skip_plasma(ray_start_regular_shared):
@ray.remote
class Actor:
def __init__(self):
pass
def f(self, x):
return x * 2
a = Actor.remote()
obj_ref = a.f.remote(1)
# it is not stored in plasma
assert not ray.worker.global_worker.core_worker.object_exists(obj_ref)
assert ray.get(obj_ref) == 2
def test_actor_large_objects(ray_start_regular_shared):
@ray.remote
class Actor:
def __init__(self):
pass
def f(self):
time.sleep(1)
return np.zeros(10000000)
a = Actor.remote()
obj_ref = a.f.remote()
assert not ray.worker.global_worker.core_worker.object_exists(obj_ref)
done, _ = ray.wait([obj_ref])
assert len(done) == 1
assert ray.worker.global_worker.core_worker.object_exists(obj_ref)
assert isinstance(ray.get(obj_ref), np.ndarray)
def test_actor_pass_by_ref(ray_start_regular_shared):
@ray.remote
class Actor:
def __init__(self):
pass
def f(self, x):
return x * 2
@ray.remote
def f(x):
return x
@ray.remote
def error():
sys.exit(0)
a = Actor.remote()
assert ray.get(a.f.remote(f.remote(1))) == 2
fut = [a.f.remote(f.remote(i)) for i in range(100)]
assert ray.get(fut) == [i * 2 for i in range(100)]
# propagates errors for pass by ref
with pytest.raises(Exception):
ray.get(a.f.remote(error.remote()))
def test_actor_recursive(ray_start_regular_shared):
@ray.remote
class Actor:
def __init__(self, delegate=None):
@@ -548,7 +546,7 @@ def test_actor_recursive(ray_start_regular):
assert result == [x * 2 for x in range(100)]
def test_actor_concurrent(ray_start_regular):
def test_actor_concurrent(ray_start_regular_shared):
@ray.remote
class Batcher:
def __init__(self):
@@ -574,7 +572,7 @@ def test_actor_concurrent(ray_start_regular):
assert r1 == r2 == r3
def test_wait(ray_start_regular):
def test_wait(ray_start_regular_shared):
@ray.remote
def f(delay):
time.sleep(delay)
@@ -621,7 +619,7 @@ def test_wait(ray_start_regular):
ray.wait([1])
def test_duplicate_args(ray_start_regular):
def test_duplicate_args(ray_start_regular_shared):
@ray.remote
def f(arg1,
arg2,
@@ -650,32 +648,6 @@ def test_duplicate_args(ray_start_regular):
arg1, arg2, arg1, kwarg1=arg1, kwarg2=arg2, kwarg1_duplicate=arg1))
def test_internal_config_when_connecting(ray_start_cluster):
config = json.dumps({
"object_pinning_enabled": 0,
"initial_reconstruction_timeout_milliseconds": 200
})
cluster = ray.cluster_utils.Cluster()
cluster.add_node(
_internal_config=config, object_store_memory=100 * 1024 * 1024)
cluster.wait_for_nodes()
# Specifying _internal_config when connecting to a cluster is disallowed.
with pytest.raises(ValueError):
ray.init(address=cluster.address, _internal_config=config)
# Check that the config was picked up (object pinning is disabled).
ray.init(address=cluster.address)
obj_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
for _ in range(5):
ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
# This would not raise an exception if object pinning was enabled.
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(obj_ref)
def test_get_correct_node_ip():
with patch("ray.worker") as worker_mock:
node_mock = MagicMock()