diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 9cae6e72..3676437f 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -9,6 +9,7 @@ from zipline.protocol import SIMULATION_STYLE log = logbook.Logger('Transaction Simulator') class TransactionSimulator(object): + FORWARDER = True def __init__(self, open_orders, style=SIMULATION_STYLE.PARTIAL_VOLUME): self.open_orders = open_orders diff --git a/zipline/gens/examples.py b/zipline/gens/examples.py index d99da662..def7954f 100644 --- a/zipline/gens/examples.py +++ b/zipline/gens/examples.py @@ -54,15 +54,13 @@ if __name__ == "__main__": merge_out = merged_transforms(sort_out, tnfm_bundles) -# for message in merge_out: -# print "Event: \n", message.event -# print "Transforms: \n", message.tnfms + # for message in merge_out: +# print message algo = TestAlgorithm(2, 100, 100) environment = create_trading_environment() style = zp.SIMULATION_STYLE.PARTIAL_VOLUME - + client_out = tsc(merge_out, algo, environment, style) - for message in client_out: - print message + diff --git a/zipline/gens/merge.py b/zipline/gens/merge.py index 0e8fab93..5a4e6bde 100644 --- a/zipline/gens/merge.py +++ b/zipline/gens/merge.py @@ -54,21 +54,19 @@ def merge(stream_in, tnfm_ids): def merge_one(sources): dict_primer = zip(sources.keys(), repeat(None)) - transforms = ndict(dict_primer) event_fields = ndict() for key, queue in sources.iteritems(): # Add transform value to the transforms dict. message = queue.popleft() - transforms[message.tnfm_id] = message.tnfm_value + event_fields[message.tnfm_id] = message.tnfm_value del message['tnfm_id'] del message['tnfm_value'] # Merge any remaining fields into the event dict. event_fields.merge(message) - - return ndict({'event' : event_fields, 'tnfms' : transforms}) + return event_fields #TODO: This is replicated in sort. Probably should be one source file. diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 76d7938e..db80a8ed 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -49,7 +49,6 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): # Initialize txn_sim's dictionary of orders here so that we can # reference it from within the user's algorithm. - import nose.tools; nose.tools.set_trace() sids = algo.get_sid_filter() open_orders = {} @@ -88,7 +87,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): algo.initialize() # Pipe the in stream into the transaction simulator. - # Creates a TRANSACTION field on the event containing transaction + # Creates a txn field on the event containing transaction # information if we filled any pending orders on the event's sid. # TRANSACTION is None if we didn't fill any orders. with_txns = stateful_transform( @@ -100,14 +99,14 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): # Pipe the events with transactions to perf. This will remove the - # TRANSACTION field added by TransactionSimulator and replace it with + # txn field added by TransactionSimulator and replace it with # a portfolio object to be passed to the user's algorithm. Also adds # a PERF_MESSAGE field which is usually none, but contains an update # message once per day. with_portfolio_and_perf_msg = stateful_transform( - stream_with_txns, + with_txns, PerformanceTracker, - trading_environment, + environment, sids ) @@ -115,6 +114,10 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): # Will also set the PERF_MESSAGE field if the batch contains a perf # message. + def batcher(stream): + for msg in stream: + yield msg + batches = batcher(with_portfolio_and_perf_msg) for batch in batches: diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index ece2d383..2adc44e8 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -54,7 +54,7 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs): "Stateful transform requires a class." assert tnfm_class.__dict__.has_key('update'), \ "Stateful transform requires the class to have an update method" - + # Create an instance of our transform class. state = tnfm_class(*args, **kwargs) @@ -71,13 +71,18 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs): # Same shared pointer issue here as above. tnfm_value = state.update(deepcopy(message_copy)) - # If we want to keep all original values, just append tnfm_id + # If we want to keep all original values, plus append tnfm_id # and tnfm_value. if forward_all_fields: out_message = message_copy out_message.tnfm_id = namestring out_message.tnfm_value = tnfm_value yield out_message + + # Special logic for TransactionSimulator. This is ugly but I + # want to get to testing faster. Should be refactored later + # to something that doesn't make Scott cry. + elif tnfm_class.__name__ == 'TransactionSimulator' # Otherwise send tnfm_id, tnfm_value, and the message date. else: diff --git a/zipline/utils/factory.py b/zipline/utils/factory.py index db440891..004f542a 100644 --- a/zipline/utils/factory.py +++ b/zipline/utils/factory.py @@ -89,8 +89,6 @@ def get_next_trading_dt(current, interval, trading_calendar): return next - - def create_trade_history(sid, prices, amounts, interval, trading_calendar): trades = [] current = trading_calendar.first_open