mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 21:23:10 +08:00
Fix flaky test_object_manager.py (#9472)
This commit is contained in:
@@ -8,6 +8,7 @@ import warnings
|
||||
|
||||
import ray
|
||||
from ray.cluster_utils import Cluster
|
||||
from ray.test_utils import wait_for_condition
|
||||
|
||||
if (multiprocessing.cpu_count() < 40
|
||||
or ray.utils.get_system_memory() < 50 * 10**9):
|
||||
@@ -200,10 +201,10 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
|
||||
def test_object_transfer_retry(ray_start_cluster):
|
||||
cluster = ray_start_cluster
|
||||
|
||||
repeated_push_delay = 4
|
||||
repeated_push_delay = 1
|
||||
|
||||
# Force the sending object manager to allow duplicate pushes again sooner.
|
||||
# Also, force the receiving object manager to retry the Pull sooner. We
|
||||
# Also, force the receiving object manager to retry the pull sooner. We
|
||||
# make the chunk size smaller in order to make it easier to test objects
|
||||
# with multiple chunks.
|
||||
config = json.dumps({
|
||||
@@ -224,57 +225,45 @@ def test_object_transfer_retry(ray_start_cluster):
|
||||
# Transfer an object to warm up the object manager.
|
||||
ray.get(f.remote(10**6))
|
||||
|
||||
x_ids = [f.remote(10**i) for i in [6]]
|
||||
assert not any(
|
||||
ray.worker.global_worker.core_worker.object_exists(x_id)
|
||||
for x_id in x_ids)
|
||||
x_id = f.remote(10**6)
|
||||
assert not ray.worker.global_worker.core_worker.object_exists(x_id)
|
||||
|
||||
# Get the objects locally to cause them to be transferred. This is the
|
||||
# first time the objects are getting transferred, so it should happen
|
||||
# quickly.
|
||||
start_time = time.time()
|
||||
xs = ray.get(x_ids)
|
||||
ray.get(x_id)
|
||||
end_time = time.time()
|
||||
if end_time - start_time > repeated_push_delay:
|
||||
warnings.warn("The initial transfer took longer than the repeated "
|
||||
"push delay, so this test may not be testing the thing "
|
||||
"it's supposed to test.")
|
||||
|
||||
# Cause all objects to be flushed.
|
||||
del xs
|
||||
x = np.zeros(object_store_memory // 10, dtype=np.uint8)
|
||||
for _ in range(15):
|
||||
ray.put(x)
|
||||
assert not any(
|
||||
ray.worker.global_worker.core_worker.object_exists(x_id)
|
||||
for x_id in x_ids)
|
||||
def not_exists():
|
||||
return not ray.worker.global_worker.core_worker.object_exists(x_id)
|
||||
|
||||
end_time = time.time()
|
||||
# Make sure that the first time the objects get transferred, it happens
|
||||
# quickly.
|
||||
assert end_time - start_time < repeated_push_delay
|
||||
def force_eviction():
|
||||
for _ in range(20):
|
||||
ray.put(np.zeros(object_store_memory // 10, dtype=np.uint8))
|
||||
wait_for_condition(not_exists)
|
||||
|
||||
# Get the objects again and make sure they get transferred.
|
||||
xs = ray.get(x_ids)
|
||||
# Force the object to be evicted from the local node.
|
||||
force_eviction()
|
||||
|
||||
# Get the object again and make sure it gets transferred.
|
||||
ray.get(x_id)
|
||||
end_transfer_time = time.time()
|
||||
# We should have had to wait for the repeated push delay.
|
||||
assert end_transfer_time - start_time >= repeated_push_delay
|
||||
|
||||
# Flush the objects again and wait longer than the repeated push delay and
|
||||
# make sure that the objects are transferred again.
|
||||
del xs
|
||||
for _ in range(15):
|
||||
ray.put(x)
|
||||
assert not any(
|
||||
ray.worker.global_worker.core_worker.object_exists(x_id)
|
||||
for x_id in x_ids)
|
||||
|
||||
# Force the object to be evicted again and wait longer than the repeated
|
||||
# push delay and make sure that the object is transferred again.
|
||||
force_eviction()
|
||||
time.sleep(repeated_push_delay)
|
||||
|
||||
# Get the objects locally to cause them to be transferred. This should
|
||||
# happen quickly.
|
||||
# Fetch the object again. This should not wait for the delay.
|
||||
start_time = time.time()
|
||||
ray.get(x_ids)
|
||||
ray.get(x_id)
|
||||
end_time = time.time()
|
||||
assert end_time - start_time < repeated_push_delay
|
||||
|
||||
|
||||
Reference in New Issue
Block a user