TST: Refactor test_handle_adjustment.

Pull all the expected_result calculations into a helper.
This commit is contained in:
Scott Sanderson
2015-09-29 19:21:23 -04:00
parent eb997bb5f8
commit 98d2ed9940
+19 -10
View File
@@ -332,6 +332,7 @@ class FFCAlgorithmTestCase(TestCase):
raise
cls.dates = cls.raw_data[cls.AAPL].index.tz_localize('UTC')
cls.AAPL_split_date = Timestamp("2014-06-09", tz='UTC')
@classmethod
def tearDownClass(cls):
@@ -386,13 +387,13 @@ class FFCAlgorithmTestCase(TestCase):
def make_source(self):
return Panel(self.raw_data).tz_localize('UTC', axis=1)
def test_handle_adjustment(self):
AAPL, MSFT, BRK_A = assets = self.AAPL, self.MSFT, self.BRK_A
def compute_expected_vwaps(self, window_lengths):
AAPL, MSFT, BRK_A = self.AAPL, self.MSFT, self.BRK_A
# Our view of the data before AAPL's split on June 9, 2014.
raw = {k: v.copy() for k, v in iteritems(self.raw_data)}
split_date = Timestamp("2014-06-09", tz='UTC')
split_date = self.AAPL_split_date
split_loc = self.dates.get_loc(split_date)
split_ratio = 7.0
@@ -404,12 +405,9 @@ class FFCAlgorithmTestCase(TestCase):
adj[AAPL].ix[:split_loc, column] /= split_ratio
adj[AAPL].ix[:split_loc, 'volume'] *= split_ratio
window_lengths = [1, 2, 5, 10]
# length -> asset -> expected vwap
vwaps = {length: {} for length in window_lengths}
vwap_keys = {}
for length in window_lengths:
vwap_keys[length] = "vwap_%d" % length
for asset in AAPL, MSFT, BRK_A:
raw_vwap = rolling_vwap(raw[asset], length)
adj_vwap = rolling_vwap(adj[asset], length)
@@ -462,21 +460,32 @@ class FFCAlgorithmTestCase(TestCase):
decimal=2,
)
return vwaps
def test_handle_adjustment(self):
AAPL, MSFT, BRK_A = assets = self.AAPL, self.MSFT, self.BRK_A
window_lengths = [1, 2, 5, 10]
vwaps = self.compute_expected_vwaps(window_lengths)
def vwap_key(length):
return "vwap_%d" % length
def initialize(context):
pipeline = Pipeline('test')
context.vwaps = []
for length, key in iteritems(vwap_keys):
for length in vwaps:
context.vwaps.append(VWAP(window_length=length))
pipeline.add(context.vwaps[-1], name=key)
pipeline.add(context.vwaps[-1], name=vwap_key(length))
attach_pipeline(pipeline)
def handle_data(context, data):
today = get_datetime()
results = drain_pipeline('test')
for length, key in iteritems(vwap_keys):
for length in vwaps:
for asset in assets:
computed = results.loc[asset, key]
computed = results.loc[asset, vwap_key(length)]
expected = vwaps[length][asset].loc[today]
# Only having two places of precision here is a bit