Files
catalyst/tests/pipeline/test_statistical.py
2017-06-19 14:43:10 -07:00

805 lines
32 KiB
Python

"""
Tests for statistical pipeline terms.
"""
from numpy import (
arange,
full,
full_like,
nan,
where,
)
from pandas import (
DataFrame,
date_range,
Int64Index,
Timestamp,
)
from pandas.util.testing import assert_frame_equal
from scipy.stats import linregress, pearsonr, spearmanr
from catalyst.assets import Equity
from catalyst.errors import IncompatibleTerms, NonExistentAssetInTimeFrame
from catalyst.pipeline import CustomFactor, Pipeline
from catalyst.pipeline.data import USEquityPricing
from catalyst.pipeline.data.testing import TestingDataSet
from catalyst.pipeline.engine import SimplePipelineEngine
from catalyst.pipeline.factors.equity import (
Returns,
RollingLinearRegressionOfReturns,
RollingPearsonOfReturns,
RollingSpearmanOfReturns,
)
from catalyst.pipeline.loaders.frame import DataFrameLoader
from catalyst.pipeline.sentinels import NotSpecified
from catalyst.testing import (
AssetID,
AssetIDPlusDay,
check_arrays,
make_alternating_boolean_array,
make_cascading_boolean_array,
parameter_space,
)
from catalyst.testing.fixtures import (
WithSeededRandomPipelineEngine,
WithTradingEnvironment,
ZiplineTestCase,
)
from catalyst.utils.numpy_utils import (
bool_dtype,
datetime64ns_dtype,
float64_dtype,
)
class StatisticalBuiltInsTestCase(WithTradingEnvironment, ZiplineTestCase):
sids = ASSET_FINDER_EQUITY_SIDS = Int64Index([1, 2, 3])
START_DATE = Timestamp('2015-01-31', tz='UTC')
END_DATE = Timestamp('2015-03-01', tz='UTC')
@classmethod
def init_class_fixtures(cls):
super(StatisticalBuiltInsTestCase, cls).init_class_fixtures()
day = cls.trading_calendar.day
cls.dates = dates = date_range(
'2015-02-01', '2015-02-28', freq=day, tz='UTC',
)
# Using these start and end dates because they are a contigous span of
# 5 days (Monday - Friday) and they allow for plenty of days to look
# back on when computing correlations and regressions.
cls.start_date_index = start_date_index = 14
cls.end_date_index = end_date_index = 18
cls.pipeline_start_date = dates[start_date_index]
cls.pipeline_end_date = dates[end_date_index]
cls.num_days = num_days = end_date_index - start_date_index + 1
sids = cls.sids
cls.assets = assets = cls.asset_finder.retrieve_all(sids)
cls.my_asset_column = my_asset_column = 0
cls.my_asset = assets[my_asset_column]
cls.num_assets = num_assets = len(assets)
cls.raw_data = raw_data = DataFrame(
data=arange(len(dates) * len(sids), dtype=float64_dtype).reshape(
len(dates), len(sids),
),
index=dates,
columns=assets,
)
# Using mock 'close' data here because the correlation and regression
# built-ins use USEquityPricing.close as the input to their `Returns`
# factors. Since there is no way to change that when constructing an
# instance of these built-ins, we need to test with mock 'close' data
# to most accurately reflect their true behavior and results.
close_loader = DataFrameLoader(USEquityPricing.close, raw_data)
cls.run_pipeline = SimplePipelineEngine(
{USEquityPricing.close: close_loader}.__getitem__,
dates,
cls.asset_finder,
).run_pipeline
cls.cascading_mask = \
AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day)
cls.expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_days, num_assets),
)
cls.alternating_mask = (AssetIDPlusDay() % 2).eq(0)
cls.expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_days, num_assets),
)
cls.expected_no_mask_result = full(
shape=(num_days, num_assets), fill_value=True, dtype=bool_dtype,
)
@parameter_space(returns_length=[2, 3], correlation_length=[3, 4])
def test_correlation_factors(self, returns_length, correlation_length):
"""
Tests for the built-in factors `RollingPearsonOfReturns` and
`RollingSpearmanOfReturns`.
"""
assets = self.assets
my_asset = self.my_asset
my_asset_column = self.my_asset_column
dates = self.dates
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
start_date_index = self.start_date_index
end_date_index = self.end_date_index
num_days = self.num_days
run_pipeline = self.run_pipeline
returns = Returns(window_length=returns_length)
masks = (self.cascading_mask, self.alternating_mask, NotSpecified)
expected_mask_results = (
self.expected_cascading_mask_result,
self.expected_alternating_mask_result,
self.expected_no_mask_result,
)
for mask, expected_mask in zip(masks, expected_mask_results):
pearson_factor = RollingPearsonOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
mask=mask,
)
spearman_factor = RollingSpearmanOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
mask=mask,
)
columns = {
'pearson_factor': pearson_factor,
'spearman_factor': spearman_factor,
}
pipeline = Pipeline(columns=columns)
if mask is not NotSpecified:
pipeline.add(mask, 'mask')
results = run_pipeline(pipeline, start_date, end_date)
pearson_results = results['pearson_factor'].unstack()
spearman_results = results['spearman_factor'].unstack()
if mask is not NotSpecified:
mask_results = results['mask'].unstack()
check_arrays(mask_results.values, expected_mask)
# Run a separate pipeline that calculates returns starting
# (correlation_length - 1) days prior to our start date. This is
# because we need (correlation_length - 1) extra days of returns to
# compute our expected correlations.
results = run_pipeline(
Pipeline(columns={'returns': returns}),
dates[start_date_index - (correlation_length - 1)],
dates[end_date_index],
)
returns_results = results['returns'].unstack()
# On each day, calculate the expected correlation coefficients
# between the asset we are interested in and each other asset. Each
# correlation is calculated over `correlation_length` days.
expected_pearson_results = full_like(pearson_results, nan)
expected_spearman_results = full_like(spearman_results, nan)
for day in range(num_days):
todays_returns = returns_results.iloc[
day:day + correlation_length
]
my_asset_returns = todays_returns.iloc[:, my_asset_column]
for asset, other_asset_returns in todays_returns.iteritems():
asset_column = int(asset) - 1
expected_pearson_results[day, asset_column] = pearsonr(
my_asset_returns, other_asset_returns,
)[0]
expected_spearman_results[day, asset_column] = spearmanr(
my_asset_returns, other_asset_returns,
)[0]
expected_pearson_results = DataFrame(
data=where(expected_mask, expected_pearson_results, nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(pearson_results, expected_pearson_results)
expected_spearman_results = DataFrame(
data=where(expected_mask, expected_spearman_results, nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(spearman_results, expected_spearman_results)
@parameter_space(returns_length=[2, 3], regression_length=[3, 4])
def test_regression_of_returns_factor(self,
returns_length,
regression_length):
"""
Tests for the built-in factor `RollingLinearRegressionOfReturns`.
"""
assets = self.assets
my_asset = self.my_asset
my_asset_column = self.my_asset_column
dates = self.dates
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
start_date_index = self.start_date_index
end_date_index = self.end_date_index
num_days = self.num_days
run_pipeline = self.run_pipeline
# The order of these is meant to align with the output of `linregress`.
outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr']
returns = Returns(window_length=returns_length)
masks = self.cascading_mask, self.alternating_mask, NotSpecified
expected_mask_results = (
self.expected_cascading_mask_result,
self.expected_alternating_mask_result,
self.expected_no_mask_result,
)
for mask, expected_mask in zip(masks, expected_mask_results):
regression_factor = RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=returns_length,
regression_length=regression_length,
mask=mask,
)
columns = {
output: getattr(regression_factor, output)
for output in outputs
}
pipeline = Pipeline(columns=columns)
if mask is not NotSpecified:
pipeline.add(mask, 'mask')
results = run_pipeline(pipeline, start_date, end_date)
if mask is not NotSpecified:
mask_results = results['mask'].unstack()
check_arrays(mask_results.values, expected_mask)
output_results = {}
expected_output_results = {}
for output in outputs:
output_results[output] = results[output].unstack()
expected_output_results[output] = full_like(
output_results[output], nan,
)
# Run a separate pipeline that calculates returns starting
# (regression_length - 1) days prior to our start date. This is
# because we need (regression_length - 1) extra days of returns to
# compute our expected regressions.
results = run_pipeline(
Pipeline(columns={'returns': returns}),
dates[start_date_index - (regression_length - 1)],
dates[end_date_index],
)
returns_results = results['returns'].unstack()
# On each day, calculate the expected regression results for Y ~ X
# where Y is the asset we are interested in and X is each other
# asset. Each regression is calculated over `regression_length`
# days of data.
for day in range(num_days):
todays_returns = returns_results.iloc[
day:day + regression_length
]
my_asset_returns = todays_returns.iloc[:, my_asset_column]
for asset, other_asset_returns in todays_returns.iteritems():
asset_column = int(asset) - 1
expected_regression_results = linregress(
y=other_asset_returns, x=my_asset_returns,
)
for i, output in enumerate(outputs):
expected_output_results[output][day, asset_column] = \
expected_regression_results[i]
for output in outputs:
output_result = output_results[output]
expected_output_result = DataFrame(
where(expected_mask, expected_output_results[output], nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(output_result, expected_output_result)
def test_correlation_and_regression_with_bad_asset(self):
"""
Test that `RollingPearsonOfReturns`, `RollingSpearmanOfReturns` and
`RollingLinearRegressionOfReturns` raise the proper exception when
given a nonexistent target asset.
"""
my_asset = Equity(0, exchange="TEST")
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
run_pipeline = self.run_pipeline
# This filter is arbitrary; the important thing is that we test each
# factor both with and without a specified mask.
my_asset_filter = AssetID().eq(1)
for mask in (NotSpecified, my_asset_filter):
pearson_factor = RollingPearsonOfReturns(
target=my_asset,
returns_length=3,
correlation_length=3,
mask=mask,
)
spearman_factor = RollingSpearmanOfReturns(
target=my_asset,
returns_length=3,
correlation_length=3,
mask=mask,
)
regression_factor = RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=3,
regression_length=3,
mask=mask,
)
with self.assertRaises(NonExistentAssetInTimeFrame):
run_pipeline(
Pipeline(columns={'pearson_factor': pearson_factor}),
start_date,
end_date,
)
with self.assertRaises(NonExistentAssetInTimeFrame):
run_pipeline(
Pipeline(columns={'spearman_factor': spearman_factor}),
start_date,
end_date,
)
with self.assertRaises(NonExistentAssetInTimeFrame):
run_pipeline(
Pipeline(columns={'regression_factor': regression_factor}),
start_date,
end_date,
)
def test_require_length_greater_than_one(self):
my_asset = Equity(0, exchange="TEST")
with self.assertRaises(ValueError):
RollingPearsonOfReturns(
target=my_asset,
returns_length=3,
correlation_length=1,
)
with self.assertRaises(ValueError):
RollingSpearmanOfReturns(
target=my_asset,
returns_length=3,
correlation_length=1,
)
with self.assertRaises(ValueError):
RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=3,
regression_length=1,
)
class StatisticalMethodsTestCase(WithSeededRandomPipelineEngine,
ZiplineTestCase):
sids = ASSET_FINDER_EQUITY_SIDS = Int64Index([1, 2, 3])
START_DATE = Timestamp('2015-01-31', tz='UTC')
END_DATE = Timestamp('2015-03-01', tz='UTC')
@classmethod
def init_class_fixtures(cls):
super(StatisticalMethodsTestCase, cls).init_class_fixtures()
# Using these start and end dates because they are a contigous span of
# 5 days (Monday - Friday) and they allow for plenty of days to look
# back on when computing correlations and regressions.
cls.dates = dates = cls.trading_days
cls.start_date_index = start_date_index = 14
cls.end_date_index = end_date_index = 18
cls.pipeline_start_date = cls.trading_days[start_date_index]
cls.pipeline_end_date = cls.trading_days[end_date_index]
sids = cls.sids
cls.assets = assets = cls.asset_finder.retrieve_all(sids)
cls.my_asset_column = my_asset_column = 0
cls.my_asset = assets[my_asset_column]
cls.num_days = num_days = end_date_index - start_date_index + 1
cls.num_assets = num_assets = len(assets)
cls.cascading_mask = \
AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day)
cls.expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_days, num_assets),
)
cls.alternating_mask = (AssetIDPlusDay() % 2).eq(0)
cls.expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_days, num_assets),
)
cls.expected_no_mask_result = full(
shape=(num_days, num_assets), fill_value=True, dtype=bool_dtype,
)
# Random input for factors.
cls.col = TestingDataSet.float_col
@parameter_space(returns_length=[2, 3], correlation_length=[3, 4])
def test_factor_correlation_methods(self,
returns_length,
correlation_length):
"""
Ensure that `Factor.pearsonr` and `Factor.spearmanr` are consistent
with the built-in factors `RollingPearsonOfReturns` and
`RollingSpearmanOfReturns`.
"""
my_asset = self.my_asset
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
run_pipeline = self.run_pipeline
returns = Returns(window_length=returns_length, inputs=[self.col])
returns_slice = returns[my_asset]
pearson = returns.pearsonr(
target=returns_slice, correlation_length=correlation_length,
)
spearman = returns.spearmanr(
target=returns_slice, correlation_length=correlation_length,
)
expected_pearson = RollingPearsonOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
)
expected_spearman = RollingSpearmanOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
)
# These built-ins construct their own Returns factor to use as inputs,
# so the only way to set our own inputs is to do so after the fact.
# This should not be done in practice. It is necessary here because we
# want Returns to use our random data as an input, but by default it is
# using USEquityPricing.close.
expected_pearson.inputs = [returns, returns_slice]
expected_spearman.inputs = [returns, returns_slice]
columns = {
'pearson': pearson,
'spearman': spearman,
'expected_pearson': expected_pearson,
'expected_spearman': expected_spearman,
}
results = run_pipeline(Pipeline(columns=columns), start_date, end_date)
pearson_results = results['pearson'].unstack()
spearman_results = results['spearman'].unstack()
expected_pearson_results = results['expected_pearson'].unstack()
expected_spearman_results = results['expected_spearman'].unstack()
assert_frame_equal(pearson_results, expected_pearson_results)
assert_frame_equal(spearman_results, expected_spearman_results)
def test_correlation_methods_bad_type(self):
"""
Make sure we cannot call the Factor correlation methods on factors or
slices that are not of float or int dtype.
"""
# These are arbitrary for the purpose of this test.
returns_length = 2
correlation_length = 10
returns = Returns(window_length=returns_length, inputs=[self.col])
returns_slice = returns[self.my_asset]
class BadTypeFactor(CustomFactor):
inputs = []
window_length = 1
dtype = datetime64ns_dtype
window_safe = True
def compute(self, today, assets, out):
pass
bad_type_factor = BadTypeFactor()
bad_type_factor_slice = bad_type_factor[self.my_asset]
with self.assertRaises(TypeError):
bad_type_factor.pearsonr(
target=returns_slice, correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
bad_type_factor.spearmanr(
target=returns_slice, correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
returns.pearsonr(
target=bad_type_factor_slice,
correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
returns.spearmanr(
target=bad_type_factor_slice,
correlation_length=correlation_length,
)
@parameter_space(returns_length=[2, 3], regression_length=[3, 4])
def test_factor_regression_method(self, returns_length, regression_length):
"""
Ensure that `Factor.linear_regression` is consistent with the built-in
factor `RollingLinearRegressionOfReturns`.
"""
my_asset = self.my_asset
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
run_pipeline = self.run_pipeline
returns = Returns(window_length=returns_length, inputs=[self.col])
returns_slice = returns[my_asset]
regression = returns.linear_regression(
target=returns_slice, regression_length=regression_length,
)
expected_regression = RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=returns_length,
regression_length=regression_length,
)
# This built-in constructs its own Returns factor to use as an input,
# so the only way to set our own input is to do so after the fact. This
# should not be done in practice. It is necessary here because we want
# Returns to use our random data as an input, but by default it is
# using USEquityPricing.close.
expected_regression.inputs = [returns, returns_slice]
columns = {
'regression': regression,
'expected_regression': expected_regression,
}
results = run_pipeline(Pipeline(columns=columns), start_date, end_date)
regression_results = results['regression'].unstack()
expected_regression_results = results['expected_regression'].unstack()
assert_frame_equal(regression_results, expected_regression_results)
def test_regression_method_bad_type(self):
"""
Make sure we cannot call the Factor linear regression method on factors
or slices that are not of float or int dtype.
"""
# These are arbitrary for the purpose of this test.
returns_length = 2
regression_length = 10
returns = Returns(window_length=returns_length, inputs=[self.col])
returns_slice = returns[self.my_asset]
class BadTypeFactor(CustomFactor):
window_length = 1
inputs = []
dtype = datetime64ns_dtype
window_safe = True
def compute(self, today, assets, out):
pass
bad_type_factor = BadTypeFactor()
bad_type_factor_slice = bad_type_factor[self.my_asset]
with self.assertRaises(TypeError):
bad_type_factor.linear_regression(
target=returns_slice, regression_length=regression_length,
)
with self.assertRaises(TypeError):
returns.linear_regression(
target=bad_type_factor_slice,
regression_length=regression_length,
)
@parameter_space(correlation_length=[2, 3, 4])
def test_factor_correlation_methods_two_factors(self, correlation_length):
"""
Tests for `Factor.pearsonr` and `Factor.spearmanr` when passed another
2D factor instead of a Slice.
"""
assets = self.assets
dates = self.dates
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
start_date_index = self.start_date_index
end_date_index = self.end_date_index
num_days = self.num_days
run_pipeline = self.run_pipeline
# Ensure that the correlation methods cannot be called with two 2D
# factors which have different masks.
returns_masked_1 = Returns(
window_length=5, inputs=[self.col], mask=AssetID().eq(1),
)
returns_masked_2 = Returns(
window_length=5, inputs=[self.col], mask=AssetID().eq(2),
)
with self.assertRaises(IncompatibleTerms):
returns_masked_1.pearsonr(
target=returns_masked_2, correlation_length=correlation_length,
)
with self.assertRaises(IncompatibleTerms):
returns_masked_1.spearmanr(
target=returns_masked_2, correlation_length=correlation_length,
)
returns_5 = Returns(window_length=5, inputs=[self.col])
returns_10 = Returns(window_length=10, inputs=[self.col])
pearson_factor = returns_5.pearsonr(
target=returns_10, correlation_length=correlation_length,
)
spearman_factor = returns_5.spearmanr(
target=returns_10, correlation_length=correlation_length,
)
columns = {
'pearson_factor': pearson_factor,
'spearman_factor': spearman_factor,
}
pipeline = Pipeline(columns=columns)
results = run_pipeline(pipeline, start_date, end_date)
pearson_results = results['pearson_factor'].unstack()
spearman_results = results['spearman_factor'].unstack()
# Run a separate pipeline that calculates returns starting
# (correlation_length - 1) days prior to our start date. This is
# because we need (correlation_length - 1) extra days of returns to
# compute our expected correlations.
columns = {'returns_5': returns_5, 'returns_10': returns_10}
results = run_pipeline(
Pipeline(columns=columns),
dates[start_date_index - (correlation_length - 1)],
dates[end_date_index],
)
returns_5_results = results['returns_5'].unstack()
returns_10_results = results['returns_10'].unstack()
# On each day, calculate the expected correlation coefficients
# between each asset's 5 and 10 day rolling returns. Each correlation
# is calculated over `correlation_length` days.
expected_pearson_results = full_like(pearson_results, nan)
expected_spearman_results = full_like(spearman_results, nan)
for day in range(num_days):
todays_returns_5 = returns_5_results.iloc[
day:day + correlation_length
]
todays_returns_10 = returns_10_results.iloc[
day:day + correlation_length
]
for asset, asset_returns_5 in todays_returns_5.iteritems():
asset_column = int(asset) - 1
asset_returns_10 = todays_returns_10[asset]
expected_pearson_results[day, asset_column] = pearsonr(
asset_returns_5, asset_returns_10,
)[0]
expected_spearman_results[day, asset_column] = spearmanr(
asset_returns_5, asset_returns_10,
)[0]
expected_pearson_results = DataFrame(
data=expected_pearson_results,
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(pearson_results, expected_pearson_results)
expected_spearman_results = DataFrame(
data=expected_spearman_results,
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(spearman_results, expected_spearman_results)
@parameter_space(regression_length=[2, 3, 4])
def test_factor_regression_method_two_factors(self, regression_length):
"""
Tests for `Factor.linear_regression` when passed another 2D factor
instead of a Slice.
"""
assets = self.assets
dates = self.dates
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
start_date_index = self.start_date_index
end_date_index = self.end_date_index
num_days = self.num_days
run_pipeline = self.run_pipeline
# The order of these is meant to align with the output of `linregress`.
outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr']
# Ensure that the `linear_regression` method cannot be called with two
# 2D factors which have different masks.
returns_masked_1 = Returns(
window_length=5, inputs=[self.col], mask=AssetID().eq(1),
)
returns_masked_2 = Returns(
window_length=5, inputs=[self.col], mask=AssetID().eq(2),
)
with self.assertRaises(IncompatibleTerms):
returns_masked_1.linear_regression(
target=returns_masked_2, regression_length=regression_length,
)
returns_5 = Returns(window_length=5, inputs=[self.col])
returns_10 = Returns(window_length=10, inputs=[self.col])
regression_factor = returns_5.linear_regression(
target=returns_10, regression_length=regression_length,
)
columns = {
output: getattr(regression_factor, output)
for output in outputs
}
pipeline = Pipeline(columns=columns)
results = run_pipeline(pipeline, start_date, end_date)
output_results = {}
expected_output_results = {}
for output in outputs:
output_results[output] = results[output].unstack()
expected_output_results[output] = full_like(
output_results[output], nan,
)
# Run a separate pipeline that calculates returns starting
# (regression_length - 1) days prior to our start date. This is because
# we need (regression_length - 1) extra days of returns to compute our
# expected regressions.
columns = {'returns_5': returns_5, 'returns_10': returns_10}
results = run_pipeline(
Pipeline(columns=columns),
dates[start_date_index - (regression_length - 1)],
dates[end_date_index],
)
returns_5_results = results['returns_5'].unstack()
returns_10_results = results['returns_10'].unstack()
# On each day, for each asset, calculate the expected regression
# results of Y ~ X where Y is the asset's rolling 5 day returns and X
# is the asset's rolling 10 day returns. Each regression is calculated
# over `regression_length` days of data.
for day in range(num_days):
todays_returns_5 = returns_5_results.iloc[
day:day + regression_length
]
todays_returns_10 = returns_10_results.iloc[
day:day + regression_length
]
for asset, asset_returns_5 in todays_returns_5.iteritems():
asset_column = int(asset) - 1
asset_returns_10 = todays_returns_10[asset]
expected_regression_results = linregress(
y=asset_returns_5, x=asset_returns_10,
)
for i, output in enumerate(outputs):
expected_output_results[output][day, asset_column] = \
expected_regression_results[i]
for output in outputs:
output_result = output_results[output]
expected_output_result = DataFrame(
expected_output_results[output],
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(output_result, expected_output_result)