[Streaming] refine streaming tests sleep condition (#10991)

This commit is contained in:
chaokunyang
2020-09-24 17:06:55 +08:00
committed by GitHub
parent f42ab54112
commit 842861b4fc
+11 -7
View File
@@ -1,6 +1,7 @@
import os
import ray
from ray.streaming import StreamingContext
from ray.test_utils import wait_for_condition
def test_word_count():
@@ -45,14 +46,17 @@ def test_simple_word_count():
(old_value[0], old_value[1] + new_value[1])) \
.sink(sink_func)
ctx.submit("word_count")
import time
time.sleep(3)
def check_succeed():
if os.path.exists(sink_file):
with open(sink_file, "r") as f:
result = f.read()
return "a:2" in result and "b:2" in result and "c:2" in result
return False
wait_for_condition(check_succeed, timeout=60, retry_interval_ms=1000)
print("Execution succeed")
ray.shutdown()
with open(sink_file, "r") as f:
result = f.read()
assert "a:2" in result
assert "b:2" in result
assert "c:2" in result
if __name__ == "__main__":