diff --git a/tests/test_components.py b/tests/test_components.py index 54049723..33255883 100644 --- a/tests/test_components.py +++ b/tests/test_components.py @@ -7,7 +7,6 @@ from unittest2 import TestCase from collections import defaultdict from zipline.gens.composites import date_sorted_sources, merged_transforms -from zipline.finance.trading import SIMULATION_STYLE from zipline.core.devsimulator import AddressAllocator from zipline.gens.transform import MovingAverage, Passthrough, StatefulTransform from zipline.gens.tradesimulation import TradeSimulationClient as tsc @@ -31,7 +30,9 @@ from zipline.protocol import ( FEED_UNFRAME, MERGE_FRAME, MERGE_UNFRAME, - SIMULATION_STYLE + SIMULATION_STYLE, + PERF_FRAME, + BT_UPDATE_UNFRAME ) from zipline.gens.tradegens import SpecificEquityTrades @@ -57,6 +58,34 @@ class ComponentTestCase(TestCase): self.ctx = zmq.Context() setup_logger(self) + count = 250 + filter = [2,3] + #Set up source a. One minute between events. + args_a = tuple() + kwargs_a = { + 'count' : 2*count, + 'sids' : [1,2,3], + 'start' : datetime(2002,1,3,15, tzinfo = pytz.utc), + 'delta' : timedelta(hours = 6), + 'filter' : filter + } + self.source_a = SpecificEquityTrades(*args_a, **kwargs_a) + + #Set up source b. Two minutes between events. + args_b = tuple() + kwargs_b = { + 'count' : count, + 'sids' : [2,3,4], + 'start' : datetime(2002,1,3,14, tzinfo = pytz.utc), + 'delta' : timedelta(minutes = 5), + 'filter' : filter + } + self.source_b = SpecificEquityTrades(*args_b, **kwargs_b) + + self.environment = create_trading_environment(year = 2002) + + + def tearDown(self): teardown_logger(self) @@ -187,47 +216,24 @@ class ComponentTestCase(TestCase): def test_full(self): monitor = create_monitor(allocator) - filter = [2,3] - #Set up source a. One minute between events. - args_a = tuple() - kwargs_a = { - 'count' : 325, - 'sids' : [1,2,3], - 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), - 'delta' : timedelta(hours = 6), - 'filter' : filter - } - source_a = SpecificEquityTrades(*args_a, **kwargs_a) - - #Set up source b. Two minutes between events. - args_b = tuple() - kwargs_b = { - 'count' : 7500, - 'sids' : [2,3,4], - 'start' : datetime(2012,1,3,14, tzinfo = pytz.utc), - 'delta' : timedelta(minutes = 5), - 'filter' : filter - } - source_b = SpecificEquityTrades(*args_b, **kwargs_b) - # ------------------------ # Run sources in dedicated processes comp_a = Component( - source_a, + self.source_a, monitor, allocator.lease(1)[0], DATASOURCE_FRAME, DATASOURCE_UNFRAME, - source_a.get_hash() + self.source_a.get_hash() ) comp_b = Component( - source_b, + self.source_b, monitor, allocator.lease(1)[0], DATASOURCE_FRAME, DATASOURCE_UNFRAME, - source_b.get_hash() + self.source_b.get_hash() ) # Date sort the sources, and run the sort in a dedicated @@ -272,13 +278,23 @@ class ComponentTestCase(TestCase): ) algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3]) - environment = create_trading_environment(year = 2012) + style = SIMULATION_STYLE.FIXED_SLIPPAGE - trading_client = tsc(algo, environment, style) + trading_client = tsc(algo, self.environment, style) + tsc_gen = trading_client.simulate(merged) + + tsc_comp = Component( + tsc_gen, + monitor, + allocator.lease(1)[0], + PERF_FRAME, + BT_UPDATE_UNFRAME, + "tsc" + ) launch_monitor(monitor) - for message in trading_client.simulate(merged): + for message in tsc_comp: log.info(pf(message)) @@ -289,48 +305,69 @@ class ComponentTestCase(TestCase): merged.proc.join() return + def test_single_thread(self): + #Set up source c. Three minutes between events. + + sorted = date_sorted_sources(self.source_a, self.source_b) + + passthrough = StatefulTransform(Passthrough) + mavg_price = StatefulTransform(MovingAverage, timedelta(minutes = 20), ['price']) + + merged = merged_transforms(sorted, passthrough, mavg_price) + + algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3]) + style = SIMULATION_STYLE.FIXED_SLIPPAGE + + trading_client = tsc(algo, self.environment, style) + for message in trading_client.simulate(merged): + log.info(pf(message)) def test_compound(self): monitor = create_monitor(allocator) - filter = [2,3] - #Set up source a. One minute between events. - args_a = tuple() - kwargs_a = { - 'count' : 325, - 'sids' : [1,2,3], - 'start' : datetime(2012,1,3,15, tzinfo = pytz.utc), - 'delta' : timedelta(hours = 6), - 'filter' : filter - } - source_a = SpecificEquityTrades(*args_a, **kwargs_a) - - #Set up source b. Two minutes between events. - args_b = tuple() - kwargs_b = { - 'count' : 7500, - 'sids' : [2,3,4], - 'start' : datetime(2012,1,3,14, tzinfo = pytz.utc), - 'delta' : timedelta(minutes = 5), - 'filter' : filter - } - source_b = SpecificEquityTrades(*args_b, **kwargs_b) - - sorted_out = date_sorted_sources(source_a, source_b) + sorted_out = date_sorted_sources(self.source_a, self.source_b) sorted = Component( sorted_out, monitor, allocator.lease(1)[0], FEED_FRAME, - FEED_UNFRAME + FEED_UNFRAME, + "feed" ) + passthrough = StatefulTransform(Passthrough) + mavg_price = StatefulTransform( + MovingAverage, + timedelta(minutes = 20), + ['price'] + ) + + merged_gen = merged_transforms(sorted, passthrough, mavg_price) + + merged = Component( + merged_gen, + monitor, + allocator.lease(1)[0], + MERGE_FRAME, + MERGE_UNFRAME, + "merge" + ) + + algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3]) + style = SIMULATION_STYLE.FIXED_SLIPPAGE + + trading_client = tsc(algo, self.environment, style) + tsc_gen = trading_client.simulate(merged) + + launch_monitor(monitor) - - for event in sorted: - log.info(event) + for message in tsc_gen: + log.info(pf(message)) + # wait for processes to finish sorted.proc.join() + merged.proc.join() + return