mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 01:04:13 +08:00
Merge pull request #97 from quantopian/fawce_alpha750_bugs
Fawce alpha750 bugs
This commit is contained in:
@@ -27,12 +27,11 @@ def date_sort(stream_in, source_ids):
|
||||
sources[id] = deque()
|
||||
|
||||
# Process incoming streams.
|
||||
log.info('Sorting first message')
|
||||
for message in stream_in:
|
||||
# Incoming messages should be the output of DATASOURCE_UNFRAME.
|
||||
assert_datasource_unframe_protocol(message), \
|
||||
"Bad message in date_sort: %s" % message
|
||||
|
||||
|
||||
# Only allow messages from sources we expect.
|
||||
assert message.source_id in sources, "Unexpected source: %s" % message
|
||||
|
||||
@@ -45,13 +44,12 @@ def date_sort(stream_in, source_ids):
|
||||
message = pop_oldest(sources)
|
||||
assert_sort_protocol(message)
|
||||
yield message
|
||||
|
||||
|
||||
# We should have only a done message left in each queue.
|
||||
for queue in sources.itervalues():
|
||||
assert len(queue) == 1, "Bad queue in date_sort on exit: %s" % queue
|
||||
assert queue[0].dt == "DONE", \
|
||||
"Bad last message in date_sort on exit: %s" % queue
|
||||
log.info('Successfully finished Sorting')
|
||||
|
||||
def ready(sources):
|
||||
"""
|
||||
|
||||
@@ -117,8 +117,8 @@ def drain_receiver(receiver, count=None):
|
||||
return output, transaction_count
|
||||
|
||||
|
||||
def assert_single_position(test, zipline):
|
||||
output, transaction_count = drain_zipline(test, zipline)
|
||||
def assert_single_position(test, zipline, blocking=False):
|
||||
output, transaction_count = drain_zipline(test, zipline, p_blocking=blocking)
|
||||
|
||||
test.assertEqual(
|
||||
test.zipline_test_config['order_count'],
|
||||
|
||||
Reference in New Issue
Block a user