mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-05 17:53:17 +08:00
updated tests to compare with/without processes
This commit is contained in:
+97
-60
@@ -7,7 +7,6 @@ from unittest2 import TestCase
|
||||
from collections import defaultdict
|
||||
from zipline.gens.composites import date_sorted_sources, merged_transforms
|
||||
|
||||
from zipline.finance.trading import SIMULATION_STYLE
|
||||
from zipline.core.devsimulator import AddressAllocator
|
||||
from zipline.gens.transform import MovingAverage, Passthrough, StatefulTransform
|
||||
from zipline.gens.tradesimulation import TradeSimulationClient as tsc
|
||||
@@ -31,7 +30,9 @@ from zipline.protocol import (
|
||||
FEED_UNFRAME,
|
||||
MERGE_FRAME,
|
||||
MERGE_UNFRAME,
|
||||
SIMULATION_STYLE
|
||||
SIMULATION_STYLE,
|
||||
PERF_FRAME,
|
||||
BT_UPDATE_UNFRAME
|
||||
)
|
||||
|
||||
from zipline.gens.tradegens import SpecificEquityTrades
|
||||
@@ -57,6 +58,34 @@ class ComponentTestCase(TestCase):
|
||||
self.ctx = zmq.Context()
|
||||
setup_logger(self)
|
||||
|
||||
count = 250
|
||||
filter = [2,3]
|
||||
#Set up source a. One minute between events.
|
||||
args_a = tuple()
|
||||
kwargs_a = {
|
||||
'count' : 2*count,
|
||||
'sids' : [1,2,3],
|
||||
'start' : datetime(2002,1,3,15, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(hours = 6),
|
||||
'filter' : filter
|
||||
}
|
||||
self.source_a = SpecificEquityTrades(*args_a, **kwargs_a)
|
||||
|
||||
#Set up source b. Two minutes between events.
|
||||
args_b = tuple()
|
||||
kwargs_b = {
|
||||
'count' : count,
|
||||
'sids' : [2,3,4],
|
||||
'start' : datetime(2002,1,3,14, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(minutes = 5),
|
||||
'filter' : filter
|
||||
}
|
||||
self.source_b = SpecificEquityTrades(*args_b, **kwargs_b)
|
||||
|
||||
self.environment = create_trading_environment(year = 2002)
|
||||
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
teardown_logger(self)
|
||||
|
||||
@@ -187,47 +216,24 @@ class ComponentTestCase(TestCase):
|
||||
def test_full(self):
|
||||
monitor = create_monitor(allocator)
|
||||
|
||||
filter = [2,3]
|
||||
#Set up source a. One minute between events.
|
||||
args_a = tuple()
|
||||
kwargs_a = {
|
||||
'count' : 325,
|
||||
'sids' : [1,2,3],
|
||||
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(hours = 6),
|
||||
'filter' : filter
|
||||
}
|
||||
source_a = SpecificEquityTrades(*args_a, **kwargs_a)
|
||||
|
||||
#Set up source b. Two minutes between events.
|
||||
args_b = tuple()
|
||||
kwargs_b = {
|
||||
'count' : 7500,
|
||||
'sids' : [2,3,4],
|
||||
'start' : datetime(2012,1,3,14, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(minutes = 5),
|
||||
'filter' : filter
|
||||
}
|
||||
source_b = SpecificEquityTrades(*args_b, **kwargs_b)
|
||||
|
||||
# ------------------------
|
||||
# Run sources in dedicated processes
|
||||
comp_a = Component(
|
||||
source_a,
|
||||
self.source_a,
|
||||
monitor,
|
||||
allocator.lease(1)[0],
|
||||
DATASOURCE_FRAME,
|
||||
DATASOURCE_UNFRAME,
|
||||
source_a.get_hash()
|
||||
self.source_a.get_hash()
|
||||
)
|
||||
|
||||
comp_b = Component(
|
||||
source_b,
|
||||
self.source_b,
|
||||
monitor,
|
||||
allocator.lease(1)[0],
|
||||
DATASOURCE_FRAME,
|
||||
DATASOURCE_UNFRAME,
|
||||
source_b.get_hash()
|
||||
self.source_b.get_hash()
|
||||
)
|
||||
|
||||
# Date sort the sources, and run the sort in a dedicated
|
||||
@@ -272,13 +278,23 @@ class ComponentTestCase(TestCase):
|
||||
)
|
||||
|
||||
algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3])
|
||||
environment = create_trading_environment(year = 2012)
|
||||
|
||||
style = SIMULATION_STYLE.FIXED_SLIPPAGE
|
||||
|
||||
trading_client = tsc(algo, environment, style)
|
||||
trading_client = tsc(algo, self.environment, style)
|
||||
tsc_gen = trading_client.simulate(merged)
|
||||
|
||||
tsc_comp = Component(
|
||||
tsc_gen,
|
||||
monitor,
|
||||
allocator.lease(1)[0],
|
||||
PERF_FRAME,
|
||||
BT_UPDATE_UNFRAME,
|
||||
"tsc"
|
||||
)
|
||||
|
||||
launch_monitor(monitor)
|
||||
for message in trading_client.simulate(merged):
|
||||
for message in tsc_comp:
|
||||
log.info(pf(message))
|
||||
|
||||
|
||||
@@ -289,48 +305,69 @@ class ComponentTestCase(TestCase):
|
||||
merged.proc.join()
|
||||
return
|
||||
|
||||
def test_single_thread(self):
|
||||
|
||||
#Set up source c. Three minutes between events.
|
||||
|
||||
sorted = date_sorted_sources(self.source_a, self.source_b)
|
||||
|
||||
passthrough = StatefulTransform(Passthrough)
|
||||
mavg_price = StatefulTransform(MovingAverage, timedelta(minutes = 20), ['price'])
|
||||
|
||||
merged = merged_transforms(sorted, passthrough, mavg_price)
|
||||
|
||||
algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3])
|
||||
style = SIMULATION_STYLE.FIXED_SLIPPAGE
|
||||
|
||||
trading_client = tsc(algo, self.environment, style)
|
||||
for message in trading_client.simulate(merged):
|
||||
log.info(pf(message))
|
||||
|
||||
def test_compound(self):
|
||||
monitor = create_monitor(allocator)
|
||||
|
||||
filter = [2,3]
|
||||
#Set up source a. One minute between events.
|
||||
args_a = tuple()
|
||||
kwargs_a = {
|
||||
'count' : 325,
|
||||
'sids' : [1,2,3],
|
||||
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(hours = 6),
|
||||
'filter' : filter
|
||||
}
|
||||
source_a = SpecificEquityTrades(*args_a, **kwargs_a)
|
||||
|
||||
#Set up source b. Two minutes between events.
|
||||
args_b = tuple()
|
||||
kwargs_b = {
|
||||
'count' : 7500,
|
||||
'sids' : [2,3,4],
|
||||
'start' : datetime(2012,1,3,14, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(minutes = 5),
|
||||
'filter' : filter
|
||||
}
|
||||
source_b = SpecificEquityTrades(*args_b, **kwargs_b)
|
||||
|
||||
sorted_out = date_sorted_sources(source_a, source_b)
|
||||
sorted_out = date_sorted_sources(self.source_a, self.source_b)
|
||||
|
||||
sorted = Component(
|
||||
sorted_out,
|
||||
monitor,
|
||||
allocator.lease(1)[0],
|
||||
FEED_FRAME,
|
||||
FEED_UNFRAME
|
||||
FEED_UNFRAME,
|
||||
"feed"
|
||||
)
|
||||
|
||||
passthrough = StatefulTransform(Passthrough)
|
||||
mavg_price = StatefulTransform(
|
||||
MovingAverage,
|
||||
timedelta(minutes = 20),
|
||||
['price']
|
||||
)
|
||||
|
||||
merged_gen = merged_transforms(sorted, passthrough, mavg_price)
|
||||
|
||||
merged = Component(
|
||||
merged_gen,
|
||||
monitor,
|
||||
allocator.lease(1)[0],
|
||||
MERGE_FRAME,
|
||||
MERGE_UNFRAME,
|
||||
"merge"
|
||||
)
|
||||
|
||||
algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3])
|
||||
style = SIMULATION_STYLE.FIXED_SLIPPAGE
|
||||
|
||||
trading_client = tsc(algo, self.environment, style)
|
||||
tsc_gen = trading_client.simulate(merged)
|
||||
|
||||
|
||||
launch_monitor(monitor)
|
||||
|
||||
for event in sorted:
|
||||
log.info(event)
|
||||
for message in tsc_gen:
|
||||
log.info(pf(message))
|
||||
|
||||
|
||||
# wait for processes to finish
|
||||
sorted.proc.join()
|
||||
merged.proc.join()
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user