Fix bug in redis module tests. (#292)

* Fix bug in redis module tests.

* Sleep while waiting for next message.
This commit is contained in:
Robert Nishihara
2017-02-18 00:55:57 -08:00
committed by Philipp Moritz
parent abd9987e3b
commit 124baa7472
+20 -9
View File
@@ -40,6 +40,17 @@ def integerToAsciiHex(num, numbytes):
return retstr
def get_next_message(pubsub_client, timeout_seconds=10):
"""Block until the next message is available on the pubsub channel."""
start_time = time.time()
while True:
message = pubsub_client.get_message()
if message is not None:
return message
time.sleep(0.1)
if time.time() - start_time > timeout_seconds:
raise Exception("Timed out while waiting for next message.")
class TestGlobalStateStore(unittest.TestCase):
def setUp(self):
@@ -138,10 +149,10 @@ class TestGlobalStateStore(unittest.TestCase):
p.psubscribe("{}manager_id1".format(OBJECT_CHANNEL_PREFIX))
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", data_size, "hash1", "manager_id2")
# Receive the acknowledgement message.
self.assertEqual(p.get_message()["data"], 1)
self.assertEqual(get_next_message(p)["data"], 1)
# Request a notification and receive the data.
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id1")
self.assertEqual(p.get_message()["data"], b"object_id1 %s MANAGERS manager_id2"\
self.assertEqual(get_next_message(p)["data"], b"object_id1 %s MANAGERS manager_id2"\
%integerToAsciiHex(data_size, 8))
# Request a notification for an object that isn't there. Then add the object
# and receive the data. Only the first call to RAY.OBJECT_TABLE_ADD should
@@ -150,14 +161,14 @@ class TestGlobalStateStore(unittest.TestCase):
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id1")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id2")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id3")
self.assertEqual(p.get_message()["data"], b"object_id3 %s MANAGERS manager_id1"\
self.assertEqual(get_next_message(p)["data"], b"object_id3 %s MANAGERS manager_id1"\
%integerToAsciiHex(data_size, 8))
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id2", data_size, "hash1", "manager_id3")
self.assertEqual(p.get_message()["data"], b"object_id2 %s MANAGERS manager_id3"\
self.assertEqual(get_next_message(p)["data"], b"object_id2 %s MANAGERS manager_id3"\
%integerToAsciiHex(data_size, 8))
# Request notifications for object_id3 again.
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id3")
self.assertEqual(p.get_message()["data"], b"object_id3 %s MANAGERS manager_id1 manager_id2 manager_id3"\
self.assertEqual(get_next_message(p)["data"], b"object_id3 %s MANAGERS manager_id1 manager_id2 manager_id3"\
%integerToAsciiHex(data_size, 8))
def testResultTableAddAndLookup(self):
@@ -262,12 +273,12 @@ class TestGlobalStateStore(unittest.TestCase):
task_args = [b"task_id", scheduling_state, node_id.encode("ascii"), b"task_spec"]
self.redis.execute_command("RAY.TASK_TABLE_ADD", *task_args)
# Receive the acknowledgement message.
self.assertEqual(p.get_message()["data"], 1)
self.assertEqual(p.get_message()["data"], 2)
self.assertEqual(p.get_message()["data"], 3)
self.assertEqual(get_next_message(p)["data"], 1)
self.assertEqual(get_next_message(p)["data"], 2)
self.assertEqual(get_next_message(p)["data"], 3)
# Receive the actual data.
for i in range(3):
message = p.get_message()["data"]
message = get_next_message(p)["data"]
message = message.split()
message[1] = int(message[1])
self.assertEqual(message, task_args)