diff --git a/zipline/gens/sort.py b/zipline/gens/sort.py index c76db032..d8ebd173 100644 --- a/zipline/gens/sort.py +++ b/zipline/gens/sort.py @@ -27,12 +27,11 @@ def date_sort(stream_in, source_ids): sources[id] = deque() # Process incoming streams. - log.info('Sorting first message') for message in stream_in: # Incoming messages should be the output of DATASOURCE_UNFRAME. assert_datasource_unframe_protocol(message), \ "Bad message in date_sort: %s" % message - + # Only allow messages from sources we expect. assert message.source_id in sources, "Unexpected source: %s" % message @@ -45,13 +44,12 @@ def date_sort(stream_in, source_ids): message = pop_oldest(sources) assert_sort_protocol(message) yield message - + # We should have only a done message left in each queue. for queue in sources.itervalues(): assert len(queue) == 1, "Bad queue in date_sort on exit: %s" % queue assert queue[0].dt == "DONE", \ "Bad last message in date_sort on exit: %s" % queue - log.info('Successfully finished Sorting') def ready(sources): """ diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index 2ebbae73..99175c63 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -117,8 +117,8 @@ def drain_receiver(receiver, count=None): return output, transaction_count -def assert_single_position(test, zipline): - output, transaction_count = drain_zipline(test, zipline) +def assert_single_position(test, zipline, blocking=False): + output, transaction_count = drain_zipline(test, zipline, p_blocking=blocking) test.assertEqual( test.zipline_test_config['order_count'],