Fix flaky test_dynres.py (#9310)

This commit is contained in:
Hao Chen
2020-07-10 10:34:23 +08:00
committed by GitHub
parent dd2cc6eb48
commit bed1be611e
+63 -38
View File
@@ -4,7 +4,7 @@ import time
import ray
import ray.cluster_utils
import ray.test_utils
from ray.test_utils import wait_for_condition
logger = logging.getLogger(__name__)
@@ -20,11 +20,13 @@ def test_dynamic_res_creation(ray_start_regular):
ray.get(set_res.remote(res_name, res_capacity))
available_res = ray.available_resources()
cluster_res = ray.cluster_resources()
def check_resources():
    """Return whether both resource views report the resource at capacity.

    Both ``ray.available_resources()`` and ``ray.cluster_resources()`` are
    queried; a view that does not list ``res_name`` counts as not ready.
    """
    views = (ray.available_resources(), ray.cluster_resources())
    return all(view.get(res_name) == res_capacity for view in views)
assert available_res[res_name] == res_capacity
assert cluster_res[res_name] == res_capacity
wait_for_condition(check_resources)
def test_dynamic_res_deletion(shutdown_only):
@@ -40,13 +42,12 @@ def test_dynamic_res_deletion(shutdown_only):
ray.get(delete_res.remote(res_name))
available_res = ray.available_resources()
cluster_res = ray.cluster_resources()
def check_resources():
    """Return True when neither resource view still lists ``res_name``."""
    views = (ray.available_resources(), ray.cluster_resources())
    return not any(res_name in view for view in views)
ray.test_utils.wait_for_condition(check_resources)
wait_for_condition(check_resources)
def test_dynamic_res_infeasible_rescheduling(ray_start_regular):
@@ -68,8 +69,11 @@ def test_dynamic_res_infeasible_rescheduling(ray_start_regular):
oid = remote_task.remote() # This is infeasible
ray.get(set_res.remote(res_name, res_capacity)) # Now should be feasible
available_res = ray.available_resources()
assert available_res[res_name] == res_capacity
def check_resources():
    """Return whether the available resources list ``res_name`` at capacity.

    A missing entry compares unequal to ``res_capacity`` and yields False.
    """
    return ray.available_resources().get(res_name) == res_capacity
wait_for_condition(check_resources)
successful, unsuccessful = ray.wait([oid], timeout=1)
assert successful # The task completed
@@ -101,12 +105,14 @@ def test_dynamic_res_updation_clientid(ray_start_cluster):
new_capacity = res_capacity + 1
ray.get(set_res.remote(res_name, new_capacity, target_node_id))
target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]
def check_resources():
    """Return whether the target node lists the resource at the new capacity.

    This predicate is passed to ``wait_for_condition``, so a resource that
    has not been updated yet is reported as False rather than raised as an
    ``AssertionError`` or ``KeyError``.

    Raises:
        StopIteration: if no node in ``ray.nodes()`` has a ``NodeID`` equal
            to ``target_node_id``.
    """
    target_node = next(
        node for node in ray.nodes() if node["NodeID"] == target_node_id)
    # ``new_capacity`` is numeric, so a missing key (None) never matches.
    return target_node["Resources"].get(res_name) == new_capacity
wait_for_condition(check_resources)
def test_dynamic_res_creation_clientid(ray_start_cluster):
@@ -129,12 +135,15 @@ def test_dynamic_res_creation_clientid(ray_start_cluster):
resource_name, resource_capacity, client_id=res_client_id)
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]
assert res_name in resources
assert resources[res_name] == res_capacity
def check_resources():
    """Check the target node's resource table for the created resource.

    Returns True only when the node whose ``NodeID`` equals
    ``target_node_id`` lists ``res_name`` with a value equal to
    ``res_capacity``.
    """
    node_tables = (
        entry["Resources"] for entry in ray.nodes()
        if entry["NodeID"] == target_node_id)
    table = next(node_tables)
    if res_name not in table:
        return False
    return table[res_name] == res_capacity
wait_for_condition(check_resources)
def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
@@ -142,7 +151,6 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
# specifier
cluster = ray_start_cluster
TIMEOUT = 5
res_name = "test_res"
res_capacity = 1.0
num_nodes = 3
@@ -163,18 +171,17 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
results.append(set_res.remote(res_name, res_capacity, nid))
ray.get(results)
success = False
start_time = time.time()
while time.time() - start_time < TIMEOUT and not success:
def check_resources():
    """Return whether every target node lists the resource at capacity.

    This predicate is passed to ``wait_for_condition``, so a node that does
    not list ``res_name`` yet yields False rather than a ``KeyError`` or a
    failed assert.

    Raises:
        StopIteration: if an id in ``target_node_ids`` matches no node in
            ``ray.nodes()``.
    """
    # Take one snapshot of the node table instead of one query per target.
    nodes = ray.nodes()
    for nid in target_node_ids:
        target_node = next(node for node in nodes if node["NodeID"] == nid)
        if target_node["Resources"].get(res_name) != res_capacity:
            return False
    return True
wait_for_condition(check_resources)
def test_dynamic_res_deletion_clientid(ray_start_cluster):
@@ -202,11 +209,13 @@ def test_dynamic_res_deletion_clientid(ray_start_cluster):
ray.get(delete_res.remote(res_name, target_node_id))
target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]
print(ray.cluster_resources())
assert res_name not in resources
def check_resources():
    """Return True when the target node no longer lists ``res_name``.

    The node is the first entry of ``ray.nodes()`` whose ``NodeID`` equals
    ``target_node_id``.
    """
    matches = (
        entry for entry in ray.nodes() if entry["NodeID"] == target_node_id)
    return res_name not in next(matches)["Resources"]
wait_for_condition(check_resources)
def test_dynamic_res_creation_scheduler_consistency(ray_start_cluster):
@@ -276,7 +285,11 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster):
# Create the resource on node1
target_node_id = node_ids[1]
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity
def check_resources():
    """Return whether the cluster lists ``res_name`` at ``res_capacity``."""
    cluster_res = ray.cluster_resources()
    return cluster_res.get(res_name) == res_capacity
wait_for_condition(check_resources)
# Delete the resource
ray.get(delete_res.remote(res_name, target_node_id))
@@ -318,7 +331,11 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster):
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity
def check_resources():
    """Return whether the cluster-wide total for ``res_name`` is at capacity.

    A resource absent from ``ray.cluster_resources()`` yields False.
    """
    total = ray.cluster_resources().get(res_name)
    return total == res_capacity
wait_for_condition(check_resources)
# Task to hold the resource till the driver signals to finish
@ray.remote
@@ -404,7 +421,11 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster):
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity
def check_resources():
    """Return True when cluster resources show ``res_name`` at capacity."""
    cluster_view = ray.cluster_resources()
    reported = cluster_view.get(res_name)
    return reported == res_capacity
wait_for_condition(check_resources)
# Task to hold the resource till the driver signals to finish
@ray.remote
@@ -493,7 +514,11 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster):
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity
def check_resources():
    """Return whether ``res_name`` is in the cluster at ``res_capacity``.

    The lookup uses ``dict.get``, so a missing resource compares as None
    and the predicate returns False instead of raising.
    """
    current = ray.cluster_resources().get(res_name)
    return current == res_capacity
wait_for_condition(check_resources)
# Task to hold the resource till the driver signals to finish
@ray.remote