diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 29e78639..c23b6ca1 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from logbook import Logger, Processor from pandas.tslib import normalize_date @@ -206,46 +207,91 @@ class AlgorithmSimulator(object): blotter_process_trade = self.algo.blotter.process_trade blotter_process_benchmark = self.algo.blotter.process_benchmark + # Containers for the snapshotted events, so that the events are + # processed in a predictable order, without relying on the sorted order + # of the individual sources. + + # There is only one benchmark per snapshot, will be set to the current + # benchmark iff it occurs. + benchmark = None + # trades and customs are initialized as a list since process_snapshot + # is most often called on market bars, which could contain trades or + # custom events. + trades = [] + customs = [] + + # splits and dividends are processed once a day. + # + # The avoidance of creating the list every time this is called is more + # to attempt to show that this is the infrequent case of the method, + # since the performance benefit from deferring the list allocation is + # marginal. splits list will be allocated when a split occurs in the + # snapshot. + splits = None + # dividends list will be allocated when a dividend occurs in the + # snapshot. + dividends = None + for event in snapshot: - if event.type == DATASOURCE_TYPE.TRADE: - self.update_universe(event) - any_trade_occurred = True - if instant_fill: - events_to_be_processed.append(event) - else: - for txn, order in blotter_process_trade(event): - if txn.type == DATASOURCE_TYPE.TRANSACTION: - perf_process_transaction(txn) - elif txn.type == DATASOURCE_TYPE.COMMISSION: - perf_process_commission(txn) - perf_process_order(order) - perf_process_trade(event) - + trades.append(event) elif event.type == DATASOURCE_TYPE.BENCHMARK: - benchmark_event_occurred = True - perf_process_benchmark(event) - for txn, order in blotter_process_benchmark(event): + benchmark = event + elif event.type == DATASOURCE_TYPE.SPLIT: + if splits is None: + splits = [] + splits.append(event) + elif event.type == DATASOURCE_TYPE.CUSTOM: + customs.append(event) + elif event.type == DATASOURCE_TYPE.DIVIDEND: + if dividends is None: + dividends = [] + dividends.append(event) + else: + raise log.warn("Unrecognized event=%s".format(event)) + + # Handle benchmark first. + # + # Internal broker implementation depends on the benchmark being + # processed first so that transactions and commissions reported from + # the broker can be injected. + if benchmark is not None: + benchmark_event_occurred = True + perf_process_benchmark(benchmark) + for txn, order in blotter_process_benchmark(benchmark): + if txn.type == DATASOURCE_TYPE.TRANSACTION: + perf_process_transaction(txn) + elif txn.type == DATASOURCE_TYPE.COMMISSION: + perf_process_commission(txn) + perf_process_order(order) + + for trade in trades: + self.update_universe(trade) + any_trade_occurred = True + if instant_fill: + events_to_be_processed.append(trade) + else: + for txn, order in blotter_process_trade(trade): if txn.type == DATASOURCE_TYPE.TRANSACTION: perf_process_transaction(txn) elif txn.type == DATASOURCE_TYPE.COMMISSION: perf_process_commission(txn) perf_process_order(order) + perf_process_trade(trade) - elif event.type == DATASOURCE_TYPE.CUSTOM: - self.update_universe(event) + for custom in customs: + self.update_universe(custom) - elif event.type == DATASOURCE_TYPE.SPLIT: + if splits is not None: + for split in splits: # process_split is not assigned to a variable since it is # called rarely compared to the other event processors. - self.algo.blotter.process_split(event) - perf_process_split(event) + self.algo.blotter.process_split(split) + perf_process_split(split) - elif event.type == DATASOURCE_TYPE.DIVIDEND: - perf_process_dividend(event) - - else: - raise log.warn("Unrecognized event=%s".format(event)) + if dividends is not None: + for dividend in dividends: + perf_process_dividend(dividend) if any_trade_occurred: new_orders = self._call_handle_data() @@ -256,13 +302,13 @@ class AlgorithmSimulator(object): # Now that handle_data has been called and orders have been placed, # process the event stream to fill user orders based on the events # from this snapshot. - for event in events_to_be_processed: - for txn, order in blotter_process_trade(event): + for trade in events_to_be_processed: + for txn, order in blotter_process_trade(trade): if txn is not None: perf_process_transaction(txn) if order is not None: perf_process_order(order) - perf_process_trade(event) + perf_process_trade(trade) if benchmark_event_occurred: return self.get_message(dt)