diff --git a/tests/test_sorting.py b/tests/test_sorting.py index fb836c4c..a1034bf1 100644 --- a/tests/test_sorting.py +++ b/tests/test_sorting.py @@ -23,9 +23,7 @@ from collections import deque from zipline import ndict from zipline.gens.sort import ( date_sort, - ready, done, - queue_is_ready, queue_is_done ) from zipline.gens.utils import alternate, done_message @@ -44,21 +42,17 @@ class HelperTestCase(TestCase): def test_individual_queue_logic(self): queue = deque() # Empty queues are neither done nor ready. - assert not queue_is_ready(queue) assert not queue_is_done(queue) queue.append(to_dt('foo')) - assert queue_is_ready(queue) assert not queue_is_done(queue) queue.appendleft(to_dt('DONE')) - assert queue_is_ready(queue) # Checking done when we have a message after done will trip an assert. self.assertRaises(AssertionError, queue_is_done, queue) queue.pop() - assert queue_is_ready(queue) assert queue_is_done(queue) def test_pop_logic(self): @@ -67,32 +61,27 @@ class HelperTestCase(TestCase): for id in ids: sources[id] = deque() - assert not ready(sources) assert not done(sources) # All sources must have a message to be ready/done sources['a'].append(to_dt("datetime")) - assert not ready(sources) assert not done(sources) sources['a'].pop() for id in ids: sources[id].append(to_dt("datetime")) - assert ready(sources) assert not done(sources) for id in ids: sources[id].appendleft(to_dt("DONE")) # ["DONE", message] will trip an assert in queue_is_done. - assert ready(sources) self.assertRaises(AssertionError, done, sources) for id in ids: sources[id].pop() - assert ready(sources) assert done(sources) diff --git a/zipline/gens/sort.py b/zipline/gens/sort.py index 53261c4d..8afd6031 100644 --- a/zipline/gens/sort.py +++ b/zipline/gens/sort.py @@ -56,7 +56,7 @@ def date_sort(stream_in, source_ids): # Only pop messages when we have a pending message from # all datasources. Stop if all sources have signalled done. - while ready(sources) and not done(sources): + while all(sources.values()) and not done(sources): message = pop_oldest(sources) assert_sort_protocol(message) yield message @@ -68,21 +68,6 @@ def date_sort(stream_in, source_ids): "Bad last message in date_sort on exit: %s" % queue -def ready(sources): - """ - Feed is ready when every internal queue has at least one - message. Note that this include DONE messages, so done(sources) is - True only if ready(sources). - """ - assert isinstance(sources, dict) - return all((queue_is_ready(source) for source in sources.itervalues())) - - -def queue_is_ready(queue): - assert isinstance(queue, deque) - return len(queue) > 0 - - def done(sources): """Feed is done when all internal queues have only a "DONE" message.""" assert isinstance(sources, dict)