From 14067d83239f53907e889143132eea0dff922aa2 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 1 Aug 2012 21:42:55 -0400 Subject: [PATCH] commit for fawce --- zipline/finance/performance.py | 7 +++-- zipline/finance/trading.py | 6 ++-- zipline/gens/examples.py | 21 ++++++------- zipline/gens/merge.py | 2 +- zipline/gens/tradegens.py | 2 +- zipline/gens/tradesimulation.py | 52 ++++++++++++++++++++++++++------- zipline/gens/transform.py | 25 ++++++---------- 7 files changed, 71 insertions(+), 44 deletions(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 6871cd07..065d5095 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -133,6 +133,7 @@ import zipline.finance.risk as risk log = logbook.Logger('Performance') class PerformanceTracker(object): + UPDATER = True """ Tracks the performance of the zipline as it is running in the simulator, relays this out to the Deluge broker and then @@ -202,8 +203,10 @@ class PerformanceTracker(object): self.todays_performance.positions[sid] = Position(sid) def update(self, event): - event.perf_message = self.process_event() - event.portfolio = self.get_portfolio + import nose.tools; nose.tools.set_trace() + event.perf_message = self.process_event(event) + event.portfolio = self.get_portfolio() + del event['TRANSACTION'] return event def get_portfolio(self): diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 3676437f..dd6345d4 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -9,7 +9,7 @@ from zipline.protocol import SIMULATION_STYLE log = logbook.Logger('Transaction Simulator') class TransactionSimulator(object): - FORWARDER = True + UPDATER = True def __init__(self, open_orders, style=SIMULATION_STYLE.PARTIAL_VOLUME): self.open_orders = open_orders @@ -28,9 +28,9 @@ class TransactionSimulator(object): self.apply_trade_to_open_orders = self.simulate_noop def update(self, event): - event.txn = None + event.TRANSACTION = None if event.type == zp.DATASOURCE_TYPE.TRADE: - event.txn = self.apply_trade_to_open_orders(event) + event.TRANSACTION = self.apply_trade_to_open_orders(event) return event def simulate_buy_all(self, event): diff --git a/zipline/gens/examples.py b/zipline/gens/examples.py index def7954f..7e01c293 100644 --- a/zipline/gens/examples.py +++ b/zipline/gens/examples.py @@ -1,3 +1,4 @@ +import pytz from datetime import datetime, timedelta from zipline.utils.factory import create_trading_environment @@ -17,8 +18,8 @@ if __name__ == "__main__": #Set up source a. One minute between events. args_a = tuple() kwargs_a = { - 'sids' : [1,2], - 'start' : datetime(2012,6,6,0), + 'sids' : [1], + 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), 'delta' : timedelta(minutes = 1), 'filter' : filter } @@ -27,9 +28,9 @@ if __name__ == "__main__": #Set up source b. Two minutes between events. args_b = tuple() kwargs_b = { - 'sids' : [2,3], - 'start' : datetime(2012,6,6,0), - 'delta' : timedelta(minutes = 2), + 'sids' : [2], + 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), + 'delta' : timedelta(minutes = 1), 'filter' : filter } bundle_b = SourceBundle(SpecificEquityTrades, args_b, kwargs_b) @@ -37,9 +38,9 @@ if __name__ == "__main__": #Set up source c. Three minutes between events. args_c = tuple() kwargs_c = { - 'sids' : [3,4], - 'start' : datetime(2012,6,6,0), - 'delta' : timedelta(minutes = 3), + 'sids' : [3], + 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), + 'delta' : timedelta(minutes = 1), 'filter' : filter } bundle_c = SourceBundle(SpecificEquityTrades, args_c, kwargs_c) @@ -58,9 +59,9 @@ if __name__ == "__main__": # print message algo = TestAlgorithm(2, 100, 100) - environment = create_trading_environment() + environment = create_trading_environment(year = 2012) style = zp.SIMULATION_STYLE.PARTIAL_VOLUME client_out = tsc(merge_out, algo, environment, style) - + client_out.next() diff --git a/zipline/gens/merge.py b/zipline/gens/merge.py index 5a4e6bde..dfd904d2 100644 --- a/zipline/gens/merge.py +++ b/zipline/gens/merge.py @@ -19,7 +19,7 @@ def merge(stream_in, tnfm_ids): """ assert isinstance(tnfm_ids, list) - + # Set up an internal queue for each expected source. tnfms = {} for id in tnfm_ids: diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index e3b88a0e..7420e1b4 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -9,7 +9,7 @@ from datetime import datetime, timedelta from zipline.utils.factory import create_trade from zipline.gens.utils import hash_args, mock_done -def date_gen(start = datetime(2012, 6, 6, 0), +def date_gen(start = datetime(2006, 6, 6, 12), delta = timedelta(minutes = 1), count = 100): """ diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index db80a8ed..2c12dc6f 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -1,5 +1,6 @@ import logbook +from datetime import datetime, timedelta from numbers import Integral from zipline import ndict @@ -75,7 +76,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): return open_orders[sid].append(event) - + # Set the algo's order method. algo.set_order(order) @@ -85,7 +86,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): # Call user-defined initialize method before we process any # events. algo.initialize() - + # Pipe the in stream into the transaction simulator. # Creates a txn field on the event containing transaction # information if we filled any pending orders on the event's sid. @@ -111,16 +112,45 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): ) # Batch the event stream by dt to be processed by the user's algo. - # Will also set the PERF_MESSAGE field if the batch contains a perf - # message. + # Yields perf messages whenever it encounters them. + perf_messages = algo_simulator(with_portfolio_and_perf_msg, algo) - def batcher(stream): - for msg in stream: - yield msg + - batches = batcher(with_portfolio_and_perf_msg) - for batch in batches: - algo.handle_data(batch.data) - if batch.perf_message: +def algo_simulator(stream_in, sids, algo): + + current_dt = None + universe = ndict() + + for sid in sids: + universe[sid] = None + universe.portfolio = None + + for update in stream_in: + #Yield perf messages to be relayed back to the browser. + if update.perf_message: yield perf_message + + if current_dt = None: + current_dt = update.dt + + # If this message is newer than the algorithm's simulated dt, + # call handle data on a snapshot of the current algo universe, + # then + if message.dt >= current_dt + last_delta: + start_tic = datetime.now() + algo.handle_data(universe) + stop_tic = datetime.now() + last_delta = datetime + + current_dt = message.dt + last_delta + + batch.data[message.sid] = message + batch.data.portfolio = message.portfolio + + + + + + diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index 9d90aeb8..44ec4b5a 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -49,6 +49,7 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs): are forwarded. """ forward_all_fields = tnfm_class.__dict__.get('FORWARDER', False) + update_in_place = tnfm_class.__dict__.get('UPDATER', False) assert isinstance(tnfm_class, (types.ObjectType, types.ClassType)), \ "Stateful transform requires a class." @@ -72,29 +73,21 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs): tnfm_value = state.update(deepcopy(message_copy)) # If we want to keep all original values, plus append tnfm_id - # and tnfm_value. + # and tnfm_value. Used for Passthrough. if forward_all_fields: out_message = message_copy out_message.tnfm_id = namestring out_message.tnfm_value = tnfm_value yield out_message - # Special logic for TransactionSimulator and - # PerformanceTracker. This is ugly but I want to get to - # testing faster. Should be refactored later to something - # that doesn't make Scott cry. - elif tnfm_class.__name__ == 'TransactionSimulator': - out_message = message_copy - out_message.txn = tnfm_value - yield out_message + # Our expectation is that the transform simply updated the + # message it was passed. Useful for chaining together + # multiple transforms, e.g. TransactionSimulator/PerformanceTracker. + elif update_in_place: + yield tnfm_value - elif tnfm_class.__name__ == 'PerformanceTracker': - out_message = message_copy - del out_message['txn'] - out_message.portfolio = tnfm_value - yield out_message - - # Otherwise send tnfm_id, tnfm_value, and the message date. + # Otherwise send tnfm_id, tnfm_value, and the message + # date. Useful for transforms being piped to a merge. else: out_message = ndict() out_message.tnfm_id = namestring