diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 601d6ce3b..ba0ef75db 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -8,6 +8,7 @@ import warnings import ray from ray.cluster_utils import Cluster +from ray.test_utils import wait_for_condition if (multiprocessing.cpu_count() < 40 or ray.utils.get_system_memory() < 50 * 10**9): @@ -200,10 +201,10 @@ def test_actor_broadcast(ray_start_cluster_with_resource): def test_object_transfer_retry(ray_start_cluster): cluster = ray_start_cluster - repeated_push_delay = 4 + repeated_push_delay = 1 # Force the sending object manager to allow duplicate pushes again sooner. - # Also, force the receiving object manager to retry the Pull sooner. We + # Also, force the receiving object manager to retry the pull sooner. We # make the chunk size smaller in order to make it easier to test objects # with multiple chunks. config = json.dumps({ @@ -224,57 +225,45 @@ def test_object_transfer_retry(ray_start_cluster): # Transfer an object to warm up the object manager. ray.get(f.remote(10**6)) - x_ids = [f.remote(10**i) for i in [6]] - assert not any( - ray.worker.global_worker.core_worker.object_exists(x_id) - for x_id in x_ids) + x_id = f.remote(10**6) + assert not ray.worker.global_worker.core_worker.object_exists(x_id) # Get the objects locally to cause them to be transferred. This is the # first time the objects are getting transferred, so it should happen # quickly. start_time = time.time() - xs = ray.get(x_ids) + ray.get(x_id) end_time = time.time() if end_time - start_time > repeated_push_delay: warnings.warn("The initial transfer took longer than the repeated " "push delay, so this test may not be testing the thing " "it's supposed to test.") - # Cause all objects to be flushed. - del xs - x = np.zeros(object_store_memory // 10, dtype=np.uint8) - for _ in range(15): - ray.put(x) - assert not any( - ray.worker.global_worker.core_worker.object_exists(x_id) - for x_id in x_ids) + def not_exists(): + return not ray.worker.global_worker.core_worker.object_exists(x_id) - end_time = time.time() - # Make sure that the first time the objects get transferred, it happens - # quickly. - assert end_time - start_time < repeated_push_delay + def force_eviction(): + for _ in range(20): + ray.put(np.zeros(object_store_memory // 10, dtype=np.uint8)) + wait_for_condition(not_exists) - # Get the objects again and make sure they get transferred. - xs = ray.get(x_ids) + # Force the object to be evicted from the local node. + force_eviction() + + # Get the object again and make sure it gets transferred. + ray.get(x_id) end_transfer_time = time.time() # We should have had to wait for the repeated push delay. assert end_transfer_time - start_time >= repeated_push_delay - # Flush the objects again and wait longer than the repeated push delay and - # make sure that the objects are transferred again. - del xs - for _ in range(15): - ray.put(x) - assert not any( - ray.worker.global_worker.core_worker.object_exists(x_id) - for x_id in x_ids) - + # Force the object to be evicted again and wait longer than the repeated + # push delay and make sure that the object is transferred again. + force_eviction() time.sleep(repeated_push_delay) - # Get the objects locally to cause them to be transferred. This should - # happen quickly. + # Fetch the object again. This should not wait for the delay. start_time = time.time() - ray.get(x_ids) + ray.get(x_id) end_time = time.time() assert end_time - start_time < repeated_push_delay