From 124baa747268769083d5fc2661d36df89031b655 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 18 Feb 2017 00:55:57 -0800 Subject: [PATCH] Fix bug in redis module tests. (#292) * Fix bug in redis module tests. * Sleep while waiting for next message. --- python/common/redis_module/runtest.py | 29 ++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/python/common/redis_module/runtest.py b/python/common/redis_module/runtest.py index 0cfc02169..cf1a227a9 100644 --- a/python/common/redis_module/runtest.py +++ b/python/common/redis_module/runtest.py @@ -40,6 +40,17 @@ def integerToAsciiHex(num, numbytes): return retstr +def get_next_message(pubsub_client, timeout_seconds=10): + """Block until the next message is available on the pubsub channel.""" + start_time = time.time() + while True: + message = pubsub_client.get_message() + if message is not None: + return message + time.sleep(0.1) + if time.time() - start_time > timeout_seconds: + raise Exception("Timed out while waiting for next message.") + class TestGlobalStateStore(unittest.TestCase): def setUp(self): @@ -138,10 +149,10 @@ class TestGlobalStateStore(unittest.TestCase): p.psubscribe("{}manager_id1".format(OBJECT_CHANNEL_PREFIX)) self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", data_size, "hash1", "manager_id2") # Receive the acknowledgement message. - self.assertEqual(p.get_message()["data"], 1) + self.assertEqual(get_next_message(p)["data"], 1) # Request a notification and receive the data. self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id1") - self.assertEqual(p.get_message()["data"], b"object_id1 %s MANAGERS manager_id2"\ + self.assertEqual(get_next_message(p)["data"], b"object_id1 %s MANAGERS manager_id2"\ %integerToAsciiHex(data_size, 8)) # Request a notification for an object that isn't there. Then add the object # and receive the data. Only the first call to RAY.OBJECT_TABLE_ADD should @@ -150,14 +161,14 @@ class TestGlobalStateStore(unittest.TestCase): self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id1") self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id2") self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id3") - self.assertEqual(p.get_message()["data"], b"object_id3 %s MANAGERS manager_id1"\ + self.assertEqual(get_next_message(p)["data"], b"object_id3 %s MANAGERS manager_id1"\ %integerToAsciiHex(data_size, 8)) self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id2", data_size, "hash1", "manager_id3") - self.assertEqual(p.get_message()["data"], b"object_id2 %s MANAGERS manager_id3"\ + self.assertEqual(get_next_message(p)["data"], b"object_id2 %s MANAGERS manager_id3"\ %integerToAsciiHex(data_size, 8)) # Request notifications for object_id3 again. self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id3") - self.assertEqual(p.get_message()["data"], b"object_id3 %s MANAGERS manager_id1 manager_id2 manager_id3"\ + self.assertEqual(get_next_message(p)["data"], b"object_id3 %s MANAGERS manager_id1 manager_id2 manager_id3"\ %integerToAsciiHex(data_size, 8)) def testResultTableAddAndLookup(self): @@ -262,12 +273,12 @@ class TestGlobalStateStore(unittest.TestCase): task_args = [b"task_id", scheduling_state, node_id.encode("ascii"), b"task_spec"] self.redis.execute_command("RAY.TASK_TABLE_ADD", *task_args) # Receive the acknowledgement message. - self.assertEqual(p.get_message()["data"], 1) - self.assertEqual(p.get_message()["data"], 2) - self.assertEqual(p.get_message()["data"], 3) + self.assertEqual(get_next_message(p)["data"], 1) + self.assertEqual(get_next_message(p)["data"], 2) + self.assertEqual(get_next_message(p)["data"], 3) # Receive the actual data. for i in range(3): - message = p.get_message()["data"] + message = get_next_message(p)["data"] message = message.split() message[1] = int(message[1]) self.assertEqual(message, task_args)