Fix a few flaky tests (#9709)

Fix test_custom_resources, Remove test_pandas_parquet_serialization, Better error message for test_output.py, Potentially fix test_dynres::test_dynamic_res_creation_scheduler_consistency
This commit is contained in:
Robert Nishihara
2020-07-25 17:11:38 -07:00
committed by GitHub
parent 54a0d8b69e
commit a8efb214de
4 changed files with 10 additions and 33 deletions
+2 -7
View File
@@ -443,19 +443,17 @@ def test_multiple_raylets(ray_start_cluster):
def test_custom_resources(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=3, resources={"CustomResource": 0})
cluster.add_node(num_cpus=1, resources={"CustomResource": 0})
custom_resource_node = cluster.add_node(
num_cpus=3, resources={"CustomResource": 1})
num_cpus=1, resources={"CustomResource": 1})
ray.init(address=cluster.address)
@ray.remote
def f():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource": 1})
def g():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource": 1})
@@ -463,9 +461,6 @@ def test_custom_resources(ray_start_cluster):
ray.get([f.remote() for _ in range(5)])
return ray.worker.global_worker.node.unique_id
# The f tasks should be scheduled on both raylets.
assert len(set(ray.get([f.remote() for _ in range(500)]))) == 2
# The g tasks should be scheduled only on the second raylet.
raylet_ids = set(ray.get([g.remote() for _ in range(50)]))
assert len(raylet_ids) == 1
-20
View File
@@ -2,11 +2,9 @@
import glob
import logging
import os
import shutil
import json
import sys
import socket
import tempfile
import time
import numpy as np
@@ -416,24 +414,6 @@ def test_ray_stack(ray_start_2_cpus):
"'ray stack'")
def test_pandas_parquet_serialization():
# Only test this if pandas is installed
pytest.importorskip("pandas")
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
tempdir = tempfile.mkdtemp()
filename = os.path.join(tempdir, "parquet-test")
pd.DataFrame({"col1": [0, 1], "col2": [0, 1]}).to_parquet(filename)
with open(os.path.join(tempdir, "parquet-compression"), "wb") as f:
table = pa.Table.from_arrays([pa.array([1, 2, 3])], ["hello"])
pq.write_table(table, f, compression="lz4")
# Clean up
shutil.rmtree(tempdir)
def test_socket_dir_not_existing(shutdown_only):
if sys.platform != "win32":
random_name = ray.ObjectRef.from_random().hex()
+5 -4
View File
@@ -286,14 +286,15 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster):
target_node_id = node_ids[1]
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
def check_resources():
return ray.cluster_resources().get(res_name, None) == res_capacity
wait_for_condition(check_resources)
wait_for_condition(
lambda: ray.cluster_resources().get(res_name, None) == res_capacity)
# Delete the resource
ray.get(delete_res.remote(res_name, target_node_id))
wait_for_condition(
lambda: ray.cluster_resources().get(res_name, None) is None)
# Define a task which requires this resource. This should not run
@ray.remote(resources={res_name: res_capacity})
def test_func():
+3 -2
View File
@@ -6,11 +6,12 @@ import ray
def test_output():
# Use subprocess to execute the __main__ below.
outputs = subprocess.check_output(
[sys.executable, __file__, "_ray_instance"],
stderr=subprocess.STDOUT).decode()
lines = outputs.split("\n")
assert len(lines) == 3
assert len(lines) == 3, lines
logging_header = r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\sINFO\s"
assert re.match(
logging_header + r"resource_spec.py:\d+ -- Starting Ray with [0-9\.]+ "
@@ -24,7 +25,7 @@ def test_output():
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "_ray_instance":
ray.init()
ray.init(num_cpus=1)
ray.shutdown()
else:
sys.exit(pytest.main(["-v", __file__]))