mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 22:18:31 +08:00
Optimizes checking ready state of sources during sorting.
The main bottle neck here was using `len`. A boolean check is a sufficient test for more items in the queue. Also, uses all instead of several functions.
This commit is contained in:
@@ -23,9 +23,7 @@ from collections import deque
|
||||
from zipline import ndict
|
||||
from zipline.gens.sort import (
|
||||
date_sort,
|
||||
ready,
|
||||
done,
|
||||
queue_is_ready,
|
||||
queue_is_done
|
||||
)
|
||||
from zipline.gens.utils import alternate, done_message
|
||||
@@ -44,21 +42,17 @@ class HelperTestCase(TestCase):
|
||||
def test_individual_queue_logic(self):
|
||||
queue = deque()
|
||||
# Empty queues are neither done nor ready.
|
||||
assert not queue_is_ready(queue)
|
||||
assert not queue_is_done(queue)
|
||||
|
||||
queue.append(to_dt('foo'))
|
||||
assert queue_is_ready(queue)
|
||||
assert not queue_is_done(queue)
|
||||
|
||||
queue.appendleft(to_dt('DONE'))
|
||||
assert queue_is_ready(queue)
|
||||
|
||||
# Checking done when we have a message after done will trip an assert.
|
||||
self.assertRaises(AssertionError, queue_is_done, queue)
|
||||
|
||||
queue.pop()
|
||||
assert queue_is_ready(queue)
|
||||
assert queue_is_done(queue)
|
||||
|
||||
def test_pop_logic(self):
|
||||
@@ -67,32 +61,27 @@ class HelperTestCase(TestCase):
|
||||
for id in ids:
|
||||
sources[id] = deque()
|
||||
|
||||
assert not ready(sources)
|
||||
assert not done(sources)
|
||||
|
||||
# All sources must have a message to be ready/done
|
||||
sources['a'].append(to_dt("datetime"))
|
||||
assert not ready(sources)
|
||||
assert not done(sources)
|
||||
sources['a'].pop()
|
||||
|
||||
for id in ids:
|
||||
sources[id].append(to_dt("datetime"))
|
||||
|
||||
assert ready(sources)
|
||||
assert not done(sources)
|
||||
|
||||
for id in ids:
|
||||
sources[id].appendleft(to_dt("DONE"))
|
||||
|
||||
# ["DONE", message] will trip an assert in queue_is_done.
|
||||
assert ready(sources)
|
||||
self.assertRaises(AssertionError, done, sources)
|
||||
|
||||
for id in ids:
|
||||
sources[id].pop()
|
||||
|
||||
assert ready(sources)
|
||||
assert done(sources)
|
||||
|
||||
|
||||
|
||||
+1
-16
@@ -56,7 +56,7 @@ def date_sort(stream_in, source_ids):
|
||||
# Only pop messages when we have a pending message from
|
||||
# all datasources. Stop if all sources have signalled done.
|
||||
|
||||
while ready(sources) and not done(sources):
|
||||
while all(sources.values()) and not done(sources):
|
||||
message = pop_oldest(sources)
|
||||
assert_sort_protocol(message)
|
||||
yield message
|
||||
@@ -68,21 +68,6 @@ def date_sort(stream_in, source_ids):
|
||||
"Bad last message in date_sort on exit: %s" % queue
|
||||
|
||||
|
||||
def ready(sources):
|
||||
"""
|
||||
Feed is ready when every internal queue has at least one
|
||||
message. Note that this include DONE messages, so done(sources) is
|
||||
True only if ready(sources).
|
||||
"""
|
||||
assert isinstance(sources, dict)
|
||||
return all((queue_is_ready(source) for source in sources.itervalues()))
|
||||
|
||||
|
||||
def queue_is_ready(queue):
|
||||
assert isinstance(queue, deque)
|
||||
return len(queue) > 0
|
||||
|
||||
|
||||
def done(sources):
|
||||
"""Feed is done when all internal queues have only a "DONE" message."""
|
||||
assert isinstance(sources, dict)
|
||||
|
||||
Reference in New Issue
Block a user