From 862cfbbd19abaa2e65db2f49009c8544b71c9ba3 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Sun, 17 May 2015 23:46:02 -0400 Subject: [PATCH] MAINT: Group events by type before processing. Make the ordering in which processing of event types both explicit and independent of the sort ordering of the incoming sources. The overhead of creating the list per snapshot and the iterators appears to be marginal in the minute data case when tested locally. This patch is intended as part of the path towards making the trade simulation loop not depend on consuming and tracking every trade event. The timing of where last_sale_date was needed to be changed was proving difficult to adapt in the previous model. Should also allow the removal of sorting of the various source streams. --- zipline/gens/tradesimulation.py | 106 +++++++++++++++++++++++--------- 1 file changed, 76 insertions(+), 30 deletions(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 29e78639..c23b6ca1 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from logbook import Logger, Processor from pandas.tslib import normalize_date @@ -206,46 +207,91 @@ class AlgorithmSimulator(object): blotter_process_trade = self.algo.blotter.process_trade blotter_process_benchmark = self.algo.blotter.process_benchmark + # Containers for the snapshotted events, so that the events are + # processed in a predictable order, without relying on the sorted order + # of the individual sources. + + # There is only one benchmark per snapshot, will be set to the current + # benchmark iff it occurs. + benchmark = None + # trades and customs are initialized as a list since process_snapshot + # is most often called on market bars, which could contain trades or + # custom events. + trades = [] + customs = [] + + # splits and dividends are processed once a day. + # + # The avoidance of creating the list every time this is called is more + # to attempt to show that this is the infrequent case of the method, + # since the performance benefit from deferring the list allocation is + # marginal. splits list will be allocated when a split occurs in the + # snapshot. + splits = None + # dividends list will be allocated when a dividend occurs in the + # snapshot. + dividends = None + for event in snapshot: - if event.type == DATASOURCE_TYPE.TRADE: - self.update_universe(event) - any_trade_occurred = True - if instant_fill: - events_to_be_processed.append(event) - else: - for txn, order in blotter_process_trade(event): - if txn.type == DATASOURCE_TYPE.TRANSACTION: - perf_process_transaction(txn) - elif txn.type == DATASOURCE_TYPE.COMMISSION: - perf_process_commission(txn) - perf_process_order(order) - perf_process_trade(event) - + trades.append(event) elif event.type == DATASOURCE_TYPE.BENCHMARK: - benchmark_event_occurred = True - perf_process_benchmark(event) - for txn, order in blotter_process_benchmark(event): + benchmark = event + elif event.type == DATASOURCE_TYPE.SPLIT: + if splits is None: + splits = [] + splits.append(event) + elif event.type == DATASOURCE_TYPE.CUSTOM: + customs.append(event) + elif event.type == DATASOURCE_TYPE.DIVIDEND: + if dividends is None: + dividends = [] + dividends.append(event) + else: + raise log.warn("Unrecognized event=%s".format(event)) + + # Handle benchmark first. + # + # Internal broker implementation depends on the benchmark being + # processed first so that transactions and commissions reported from + # the broker can be injected. + if benchmark is not None: + benchmark_event_occurred = True + perf_process_benchmark(benchmark) + for txn, order in blotter_process_benchmark(benchmark): + if txn.type == DATASOURCE_TYPE.TRANSACTION: + perf_process_transaction(txn) + elif txn.type == DATASOURCE_TYPE.COMMISSION: + perf_process_commission(txn) + perf_process_order(order) + + for trade in trades: + self.update_universe(trade) + any_trade_occurred = True + if instant_fill: + events_to_be_processed.append(trade) + else: + for txn, order in blotter_process_trade(trade): if txn.type == DATASOURCE_TYPE.TRANSACTION: perf_process_transaction(txn) elif txn.type == DATASOURCE_TYPE.COMMISSION: perf_process_commission(txn) perf_process_order(order) + perf_process_trade(trade) - elif event.type == DATASOURCE_TYPE.CUSTOM: - self.update_universe(event) + for custom in customs: + self.update_universe(custom) - elif event.type == DATASOURCE_TYPE.SPLIT: + if splits is not None: + for split in splits: # process_split is not assigned to a variable since it is # called rarely compared to the other event processors. - self.algo.blotter.process_split(event) - perf_process_split(event) + self.algo.blotter.process_split(split) + perf_process_split(split) - elif event.type == DATASOURCE_TYPE.DIVIDEND: - perf_process_dividend(event) - - else: - raise log.warn("Unrecognized event=%s".format(event)) + if dividends is not None: + for dividend in dividends: + perf_process_dividend(dividend) if any_trade_occurred: new_orders = self._call_handle_data() @@ -256,13 +302,13 @@ class AlgorithmSimulator(object): # Now that handle_data has been called and orders have been placed, # process the event stream to fill user orders based on the events # from this snapshot. - for event in events_to_be_processed: - for txn, order in blotter_process_trade(event): + for trade in events_to_be_processed: + for txn, order in blotter_process_trade(trade): if txn is not None: perf_process_transaction(txn) if order is not None: perf_process_order(order) - perf_process_trade(event) + perf_process_trade(trade) if benchmark_event_occurred: return self.get_message(dt)