diff --git a/zipline/gens/composites.py b/zipline/gens/composites.py index 234db714..af2fdc8c 100644 --- a/zipline/gens/composites.py +++ b/zipline/gens/composites.py @@ -11,26 +11,22 @@ from zipline.gens.transform import stateful_transform SourceBundle = namedtuple("SourceBundle", ['source', 'args', 'kwargs']) TransformBundle = namedtuple("TransformBundle", ['tnfm', 'args', 'kwargs']) -def date_sorted_sources(bundles): +def date_sorted_sources(*sources): """ Takes an iterable of SortBundles, generating namestrings and initialized datasources for each before piping them into a date_sort. """ - assert isinstance(bundles, (list, tuple)) - for bundle in bundles: - assert isinstance(bundle, SourceBundle) - # Calculate namestring hashes to pass to date_sort. - names = [bundle.source.__name__ + hash_args(*bundle.args, **bundle.kwargs) - for bundle in bundles] + for source in sources: + assert iter(source), "Source %s not iterable" % source + assert source.__class__.__dict__.has_key('get_hash'), "No get_hash" + + # Get name hashes to pass to date_sort. + names = [source.get_hash() for source in sources] - # Pass each source its arguments. - source_gens = [bundle.source(*bundle.args, **bundle.kwargs) - for bundle in bundles] - # Convert the list of generators into a flat stream by pulling # one element at a time from each. - stream_in = roundrobin(source_gens, names) + stream_in = roundrobin(sources, names) # Guarantee the flat stream will be sorted by date, using source_id as # tie-breaker, which is fully deterministic (given deterministic string diff --git a/zipline/gens/examples.py b/zipline/gens/examples.py index 55704f27..5a24493f 100644 --- a/zipline/gens/examples.py +++ b/zipline/gens/examples.py @@ -13,54 +13,47 @@ from zipline.gens.tradesimulation import trade_simulation_client as tsc import zipline.protocol as zp if __name__ == "__main__": - - filter = [1,2,3,4] + + filter = [2,3] #Set up source a. One minute between events. args_a = tuple() kwargs_a = { - 'sids' : [1], + 'sids' : [1,2,3], 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), 'delta' : timedelta(minutes = 1), 'filter' : filter } - bundle_a = SourceBundle(SpecificEquityTrades, args_a, kwargs_a) + source_a = SpecificEquityTrades(*args_a, **kwargs_a) #Set up source b. Two minutes between events. args_b = tuple() kwargs_b = { - 'sids' : [2], - 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), + 'sids' : [2,3,4], + 'start' : datetime(2012,1,3,14, tzinfo = pytz.utc), 'delta' : timedelta(minutes = 1), 'filter' : filter } - bundle_b = SourceBundle(SpecificEquityTrades, args_b, kwargs_b) + source_b = SpecificEquityTrades(*args_b, **kwargs_b) #Set up source c. Three minutes between events. - args_c = tuple() - kwargs_c = { - 'sids' : [3], - 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), - 'delta' : timedelta(minutes = 1), - 'filter' : filter - } - bundle_c = SourceBundle(SpecificEquityTrades, args_c, kwargs_c) - source_bundles = (bundle_a, bundle_b, bundle_c) - # Pipe our sources into sort. - sort_out = date_sorted_sources(source_bundles) + sort_out = date_sorted_sources(source_a, source_b) + +# passthrough = TransformBundle(Passthrough, (), {}) +# mavg_price = TransformBundle(MovingAverage, (timedelta(minutes = 20), ['price']), {}) +# tnfm_bundles = (passthrough, mavg_price) - passthrough = TransformBundle(Passthrough, (), {}) - mavg_price = TransformBundle(MovingAverage, (timedelta(minutes = 20), ['price']), {}) - tnfm_bundles = (passthrough, mavg_price) +# merge_out = merged_transforms(sort_out, tnfm_bundles) - merge_out = merged_transforms(sort_out, tnfm_bundles) - - # for message in merge_out: -# print message - - algo = TestAlgorithm(2, 100, 100) - environment = create_trading_environment(year = 2012) - style = zp.SIMULATION_STYLE.PARTIAL_VOLUME - - client_out = tsc(merge_out, algo, environment, style) - client_out.next() +# # for message in merge_out: +# # print message + +# algo = TestAlgorithm(2, 100, 100) +# environment = create_trading_environment(year = 2012) +# style = zp.SIMULATION_STYLE.FIXED_SLIPPAGE + +# client_out = tsc(merge_out, algo, environment, style) +# for message in client_out: + # pp(message) + +