diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 0b8ae1d9d..44618f472 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -579,7 +579,7 @@ def test_shutdown(serve_instance): pass return True - assert wait_for_condition(check_dead) + wait_for_condition(check_dead) def test_shadow_traffic(serve_instance): @@ -622,7 +622,7 @@ def test_shadow_traffic(serve_instance): requests_to_backend("backend4") > 0, ]) - assert wait_for_condition(check_requests) + wait_for_condition(check_requests) if __name__ == "__main__": diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index d2b538c12..35114d8ee 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -239,22 +239,22 @@ def wait_for_errors(error_type, num_errors, timeout=20): def wait_for_condition(condition_predictor, timeout=30, retry_interval_ms=100): - """A helper function that waits until a condition is met. + """Wait until a condition is met or time out with an exception. Args: condition_predictor: A function that predicts the condition. timeout: Maximum timeout in seconds. retry_interval_ms: Retry interval in milliseconds. - Return: - Whether the condition is met within the timeout. + Raises: + RuntimeError: If the condition is not met before the timeout expires. """ start = time.time() while time.time() - start <= timeout: if condition_predictor(): - return True + return time.sleep(retry_interval_ms / 1000.0) - return False + raise RuntimeError("The condition wasn't met before the timeout expired.") def wait_until_succeeded_without_exception(func, diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 3f87730f4..6d5db8f94 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -362,14 +362,14 @@ def test_actor_restart_without_task(ray_start_regular): ray.experimental.set_resource("actor", 1) actor = RestartableActor.remote() - assert wait_for_condition(lambda: not actor_resource_available()) + wait_for_condition(lambda: not actor_resource_available()) # Kill the actor. pid = ray.get(actor.get_pid.remote()) p = probe.remote() os.kill(pid, SIGKILL) ray.get(p) - assert wait_for_condition(lambda: not actor_resource_available()) + wait_for_condition(lambda: not actor_resource_available()) def test_caller_actor_restart(ray_start_regular): @@ -869,7 +869,7 @@ def test_ray_wait_dead_actor(ray_start_cluster): cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1]) # Repeatedly submit tasks and call ray.wait until the exception for the # dead actor is received. - assert wait_for_condition(actor_dead) + wait_for_condition(actor_dead) # Create an actor on the local node that will call ray.wait in a loop. head_node_resource = "HEAD_NODE" @@ -889,7 +889,7 @@ def test_ray_wait_dead_actor(ray_start_cluster): # Repeatedly call ray.wait through the local actor until the exception for # the dead actor is received. parent_actor = ParentActor.remote() - assert wait_for_condition(lambda: ray.get(parent_actor.wait.remote())) + wait_for_condition(lambda: ray.get(parent_actor.wait.remote())) @pytest.mark.parametrize( diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 7ec85ba20..16ffbe02e 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1098,21 +1098,21 @@ def test_fate_sharing(ray_start_cluster, use_actors, node_failure): pid = ray.get(a.get_pid.remote()) a.start_child.remote(use_actors=use_actors) # Wait for the child to be scheduled. - assert wait_for_condition(lambda: not child_resource_available()) + wait_for_condition(lambda: not child_resource_available()) # Kill the parent process. os.kill(pid, 9) - assert wait_for_condition(child_resource_available) + wait_for_condition(child_resource_available) # Test fate sharing if the parent node dies. def test_node_failure(node_to_kill, use_actors): a = Actor.options(resources={"parent": 1}).remote() a.start_child.remote(use_actors=use_actors) # Wait for the child to be scheduled. - assert wait_for_condition(lambda: not child_resource_available()) + wait_for_condition(lambda: not child_resource_available()) # Kill the parent process. cluster.remove_node(node_to_kill, allow_graceful=False) node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1}) - assert wait_for_condition(child_resource_available) + wait_for_condition(child_resource_available) return node_to_kill if node_failure: diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 71ea68a93..367f668ae 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -119,7 +119,7 @@ def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head): return False # Wait for the removed node dead. - assert wait_for_condition(condition, timeout=10) + wait_for_condition(condition, timeout=10) if __name__ == "__main__": diff --git a/python/ray/tests/test_global_gc.py b/python/ray/tests/test_global_gc.py index 2c8684b81..76512f691 100644 --- a/python/ray/tests/test_global_gc.py +++ b/python/ray/tests/test_global_gc.py @@ -53,7 +53,7 @@ def test_global_gc(shutdown_only): return (local_ref() is None and not any(ray.get([a.has_garbage.remote() for a in actors]))) - assert wait_for_condition(check_refs_gced) + wait_for_condition(check_refs_gced) finally: gc.enable() @@ -105,7 +105,7 @@ def test_global_gc_when_full(shutdown_only): return (local_ref() is None and not any(ray.get([a.has_garbage.remote() for a in actors]))) - assert wait_for_condition(check_refs_gced) + wait_for_condition(check_refs_gced) # Local driver. local_ref = weakref.ref(LargeObjectWithCyclicRef()) @@ -124,7 +124,7 @@ def test_global_gc_when_full(shutdown_only): return (local_ref() is None and not any(ray.get([a.has_garbage.remote() for a in actors]))) - assert wait_for_condition(check_refs_gced) + wait_for_condition(check_refs_gced) finally: gc.enable() diff --git a/python/ray/tests/test_job.py b/python/ray/tests/test_job.py index b38e21a59..7f953d48d 100644 --- a/python/ray/tests/test_job.py +++ b/python/ray/tests/test_job.py @@ -46,7 +46,7 @@ _ = Actor.remote() else: return False - assert wait_for_condition(actor_finish) + wait_for_condition(actor_finish) def test_job_gc_with_detached_actor(call_ray_start): diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 55087cf20..81b4d5363 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -444,7 +444,7 @@ def test_memory_dashboard(shutdown_only): stop_memory_table() return True - def test_object_pineed_in_memory(): + def test_object_pinned_in_memory(): a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) b = ray.get(a) # Noqa F841 @@ -470,8 +470,8 @@ def test_memory_dashboard(shutdown_only): def f(arg): time.sleep(1) - a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) # Noqa F841 - b = f.remote(a) # Noqa F841 + a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) + b = f.remote(a) wait_for_condition(memory_table_ready) memory_table = get_memory_table() @@ -492,7 +492,7 @@ def test_memory_dashboard(shutdown_only): def f(arg): time.sleep(1) - a = ray.put(None) # Noqa F841 + a = ray.put(None) b = f.remote([a]) # Noqa F841 wait_for_condition(memory_table_ready) @@ -551,30 +551,27 @@ def test_memory_dashboard(shutdown_only): # These tests should be retried because it takes at least one second # to get the fresh new memory table. It is because memory table is updated # Whenever raylet and node info is renewed which takes 1 second. - assert (wait_for_condition( - test_local_reference, timeout=30000, retry_interval_ms=1000) is True) + wait_for_condition( + test_local_reference, timeout=30000, retry_interval_ms=1000) - assert (wait_for_condition( - test_object_pineed_in_memory, timeout=30000, retry_interval_ms=1000) is - True) + wait_for_condition( + test_object_pinned_in_memory, timeout=30000, retry_interval_ms=1000) - assert (wait_for_condition( - test_pending_task_references, timeout=30000, retry_interval_ms=1000) is - True) + wait_for_condition( + test_pending_task_references, timeout=30000, retry_interval_ms=1000) - assert (wait_for_condition( + wait_for_condition( test_serialized_object_ref_reference, timeout=30000, - retry_interval_ms=1000) is True) + retry_interval_ms=1000) - assert (wait_for_condition( + wait_for_condition( test_captured_object_ref_reference, timeout=30000, - retry_interval_ms=1000) is True) + retry_interval_ms=1000) - assert (wait_for_condition( - test_actor_handle_reference, timeout=30000, retry_interval_ms=1000) is - True) + wait_for_condition( + test_actor_handle_reference, timeout=30000, retry_interval_ms=1000) """Memory Table Unit Test""" diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index 8b65f2d21..a3af16baa 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -46,7 +46,7 @@ def test_cached_object(ray_start_cluster): cluster.remove_node(node_to_kill, allow_graceful=False) cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) - assert wait_for_condition( + wait_for_condition( lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10) for _ in range(20): @@ -101,7 +101,7 @@ def test_reconstruction_cached_dependency(ray_start_cluster, cluster.remove_node(node_to_kill, allow_graceful=False) cluster.add_node( num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) - assert wait_for_condition( + wait_for_condition( lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10) for _ in range(20): @@ -296,6 +296,7 @@ def test_basic_reconstruction_actor_task(ray_start_cluster, pid = ray.get(a.pid.remote()) +@pytest.mark.skipif(sys.platform == "win32", reason="Test failing on Windows.") @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_basic_reconstruction_actor_constructor(ray_start_cluster, reconstruction_enabled): diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py index 20be90f89..908d65b40 100644 --- a/python/ray/tests/test_reference_counting_2.py +++ b/python/ray/tests/test_reference_counting_2.py @@ -182,7 +182,7 @@ def test_pass_returned_object_ref(one_worker_100MiB, use_ray_put, failure): inner_oid = ray.ObjectRef(inner_oid_binary) return not worker.core_worker.object_exists(inner_oid) - assert wait_for_condition(ref_not_exists) + wait_for_condition(ref_not_exists) # Call a recursive chain of tasks that pass a serialized reference that was