From bed1be611e075a1e676c0bc66e26824e728108b2 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Fri, 10 Jul 2020 10:34:23 +0800 Subject: [PATCH] Fix flaky test_dynres.py (#9310) --- python/ray/tests/test_dynres.py | 101 ++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 38 deletions(-) diff --git a/python/ray/tests/test_dynres.py b/python/ray/tests/test_dynres.py index 38fcaff80..0f6e2663c 100644 --- a/python/ray/tests/test_dynres.py +++ b/python/ray/tests/test_dynres.py @@ -4,7 +4,7 @@ import time import ray import ray.cluster_utils -import ray.test_utils +from ray.test_utils import wait_for_condition logger = logging.getLogger(__name__) @@ -20,11 +20,13 @@ def test_dynamic_res_creation(ray_start_regular): ray.get(set_res.remote(res_name, res_capacity)) - available_res = ray.available_resources() - cluster_res = ray.cluster_resources() + def check_resources(): + available_res = ray.available_resources() + cluster_res = ray.cluster_resources() + return (available_res.get(res_name, None) == res_capacity + and cluster_res.get(res_name, None) == res_capacity) - assert available_res[res_name] == res_capacity - assert cluster_res[res_name] == res_capacity + wait_for_condition(check_resources) def test_dynamic_res_deletion(shutdown_only): @@ -40,13 +42,12 @@ def test_dynamic_res_deletion(shutdown_only): ray.get(delete_res.remote(res_name)) - available_res = ray.available_resources() - cluster_res = ray.cluster_resources() - def check_resources(): + available_res = ray.available_resources() + cluster_res = ray.cluster_resources() return res_name not in available_res and res_name not in cluster_res - ray.test_utils.wait_for_condition(check_resources) + wait_for_condition(check_resources) def test_dynamic_res_infeasible_rescheduling(ray_start_regular): @@ -68,8 +69,11 @@ def test_dynamic_res_infeasible_rescheduling(ray_start_regular): oid = remote_task.remote() # This is infeasible ray.get(set_res.remote(res_name, res_capacity)) # Now should be feasible - available_res = ray.available_resources() - assert available_res[res_name] == res_capacity + def check_resources(): + available_res = ray.available_resources() + return available_res.get(res_name, None) == res_capacity + + wait_for_condition(check_resources) successful, unsuccessful = ray.wait([oid], timeout=1) assert successful # The task completed @@ -101,12 +105,14 @@ def test_dynamic_res_updation_clientid(ray_start_cluster): new_capacity = res_capacity + 1 ray.get(set_res.remote(res_name, new_capacity, target_node_id)) - target_node = next( - node for node in ray.nodes() if node["NodeID"] == target_node_id) - resources = target_node["Resources"] + def check_resources(): + target_node = next( + node for node in ray.nodes() if node["NodeID"] == target_node_id) + resources = target_node["Resources"] - assert res_name in resources - assert resources[res_name] == new_capacity + return (res_name in resources and resources[res_name] == new_capacity) + + wait_for_condition(check_resources) def test_dynamic_res_creation_clientid(ray_start_cluster): @@ -129,12 +135,15 @@ def test_dynamic_res_creation_clientid(ray_start_cluster): resource_name, resource_capacity, client_id=res_client_id) ray.get(set_res.remote(res_name, res_capacity, target_node_id)) - target_node = next( - node for node in ray.nodes() if node["NodeID"] == target_node_id) - resources = target_node["Resources"] - assert res_name in resources - assert resources[res_name] == res_capacity + def check_resources(): + target_node = next( + node for node in ray.nodes() if node["NodeID"] == target_node_id) + resources = target_node["Resources"] + + return (res_name in resources and resources[res_name] == res_capacity) + + wait_for_condition(check_resources) def test_dynamic_res_creation_clientid_multiple(ray_start_cluster): @@ -142,7 +151,6 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster): # specifier cluster = ray_start_cluster - TIMEOUT = 5 res_name = "test_res" res_capacity = 1.0 num_nodes = 3 @@ -163,18 +171,17 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster): results.append(set_res.remote(res_name, res_capacity, nid)) ray.get(results) - success = False - start_time = time.time() - - while time.time() - start_time < TIMEOUT and not success: + def check_resources(): resources_created = [] for nid in target_node_ids: target_node = next( node for node in ray.nodes() if node["NodeID"] == nid) resources = target_node["Resources"] - resources_created.append(resources[res_name] == res_capacity) - success = all(resources_created) - assert success + resources_created.append( + resources.get(res_name, None) == res_capacity) + return all(resources_created) + + wait_for_condition(check_resources) def test_dynamic_res_deletion_clientid(ray_start_cluster): @@ -202,11 +209,13 @@ def test_dynamic_res_deletion_clientid(ray_start_cluster): ray.get(delete_res.remote(res_name, target_node_id)) - target_node = next( - node for node in ray.nodes() if node["NodeID"] == target_node_id) - resources = target_node["Resources"] - print(ray.cluster_resources()) - assert res_name not in resources + def check_resources(): + target_node = next( + node for node in ray.nodes() if node["NodeID"] == target_node_id) + resources = target_node["Resources"] + return res_name not in resources + + wait_for_condition(check_resources) def test_dynamic_res_creation_scheduler_consistency(ray_start_cluster): @@ -276,7 +285,11 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster): # Create the resource on node1 target_node_id = node_ids[1] ray.get(set_res.remote(res_name, res_capacity, target_node_id)) - assert ray.cluster_resources()[res_name] == res_capacity + + def check_resources(): + return ray.cluster_resources().get(res_name, None) == res_capacity + + wait_for_condition(check_resources) # Delete the resource ray.get(delete_res.remote(res_name, target_node_id)) @@ -318,7 +331,11 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster): # Create the resource on node 1 ray.get(set_res.remote(res_name, res_capacity, target_node_id)) - assert ray.cluster_resources()[res_name] == res_capacity + + def check_resources(): + return ray.cluster_resources().get(res_name, None) == res_capacity + + wait_for_condition(check_resources) # Task to hold the resource till the driver signals to finish @ray.remote @@ -404,7 +421,11 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster): # Create the resource on node 1 ray.get(set_res.remote(res_name, res_capacity, target_node_id)) - assert ray.cluster_resources()[res_name] == res_capacity + + def check_resources(): + return ray.cluster_resources().get(res_name, None) == res_capacity + + wait_for_condition(check_resources) # Task to hold the resource till the driver signals to finish @ray.remote @@ -493,7 +514,11 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster): # Create the resource on node 1 ray.get(set_res.remote(res_name, res_capacity, target_node_id)) - assert ray.cluster_resources()[res_name] == res_capacity + + def check_resources(): + return ray.cluster_resources().get(res_name, None) == res_capacity + + wait_for_condition(check_resources) # Task to hold the resource till the driver signals to finish @ray.remote