Files
catalyst/tests/pipeline/test_slice.py
2017-06-19 14:43:10 -07:00

517 lines
20 KiB
Python

"""
Tests for slicing pipeline terms.
"""
from numpy import where
from pandas import Int64Index, Timestamp
from pandas.util.testing import assert_frame_equal
from catalyst.assets import Asset
from catalyst.errors import (
NonExistentAssetInTimeFrame,
NonSliceableTerm,
NonWindowSafeInput,
UnsupportedPipelineOutput,
)
from catalyst.pipeline import CustomFactor, Pipeline
from catalyst.pipeline.data import USEquityPricing
from catalyst.pipeline.data.testing import TestingDataSet
from catalyst.pipeline.factors.equity import (
Returns,
RollingLinearRegressionOfReturns,
RollingPearsonOfReturns,
RollingSpearmanOfReturns,
SimpleMovingAverage,
)
from catalyst.testing import (
AssetID,
AssetIDPlusDay,
check_arrays,
OpenPrice,
parameter_space,
)
from catalyst.testing.fixtures import (
WithSeededRandomPipelineEngine,
ZiplineTestCase,
)
from catalyst.utils.numpy_utils import datetime64ns_dtype
class SliceTestCase(WithSeededRandomPipelineEngine, ZiplineTestCase):
sids = ASSET_FINDER_EQUITY_SIDS = Int64Index([1, 2, 3])
START_DATE = Timestamp('2015-01-31', tz='UTC')
END_DATE = Timestamp('2015-03-01', tz='UTC')
@classmethod
def init_class_fixtures(cls):
super(SliceTestCase, cls).init_class_fixtures()
# Using the date at index 14 as the start date because when running
# pipelines, especially those involving correlations or regressions, we
# want to make sure there are enough days to look back on. The end date
# at index 18 is chosen for convenience, as it makes for a contiguous
# five day span.
cls.pipeline_start_date = cls.trading_days[14]
cls.pipeline_end_date = cls.trading_days[18]
# Random input for factors.
cls.col = TestingDataSet.float_col
@parameter_space(my_asset_column=[0, 1, 2], window_length_=[1, 2, 3])
def test_slice(self, my_asset_column, window_length_):
"""
Test that slices can be created by indexing into a term, and that they
have the correct shape when used as inputs.
"""
sids = self.sids
my_asset = self.asset_finder.retrieve_asset(self.sids[my_asset_column])
returns = Returns(window_length=2, inputs=[self.col])
returns_slice = returns[my_asset]
class UsesSlicedInput(CustomFactor):
window_length = window_length_
inputs = [returns, returns_slice]
def compute(self, today, assets, out, returns, returns_slice):
# Make sure that our slice is the correct shape (i.e. has only
# one column) and that it has the same values as the original
# returns factor from which it is derived.
assert returns_slice.shape == (self.window_length, 1)
assert returns.shape == (self.window_length, len(sids))
check_arrays(returns_slice[:, 0], returns[:, my_asset_column])
# Assertions about the expected slice data are made in the `compute`
# function of our custom factor above.
self.run_pipeline(
Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}),
self.pipeline_start_date,
self.pipeline_end_date,
)
@parameter_space(unmasked_column=[0, 1, 2], slice_column=[0, 1, 2])
def test_slice_with_masking(self, unmasked_column, slice_column):
"""
Test that masking a factor that uses slices as inputs does not mask the
slice data.
"""
sids = self.sids
asset_finder = self.asset_finder
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
# Create a filter that masks out all but a single asset.
unmasked_asset = asset_finder.retrieve_asset(sids[unmasked_column])
unmasked_asset_only = (AssetID().eq(unmasked_asset.sid))
# Asset used to create our slice. In the cases where this is different
# than `unmasked_asset`, our slice should still have non-missing data
# when used as an input to our custom factor. That is, it should not be
# masked out.
slice_asset = asset_finder.retrieve_asset(sids[slice_column])
returns = Returns(window_length=2, inputs=[self.col])
returns_slice = returns[slice_asset]
returns_results = self.run_pipeline(
Pipeline(columns={'returns': returns}), start_date, end_date,
)
returns_results = returns_results['returns'].unstack()
class UsesSlicedInput(CustomFactor):
window_length = 1
inputs = [returns, returns_slice]
def compute(self, today, assets, out, returns, returns_slice):
# Ensure that our mask correctly affects the `returns` input
# and does not affect the `returns_slice` input.
assert returns.shape == (1, 1)
assert returns_slice.shape == (1, 1)
assert returns[0, 0] == \
returns_results.loc[today, unmasked_asset]
assert returns_slice[0, 0] == \
returns_results.loc[today, slice_asset]
columns = {'masked': UsesSlicedInput(mask=unmasked_asset_only)}
# Assertions about the expected data are made in the `compute` function
# of our custom factor above.
self.run_pipeline(Pipeline(columns=columns), start_date, end_date)
def test_adding_slice_column(self):
"""
Test that slices cannot be added as a pipeline column.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
open_slice = OpenPrice()[my_asset]
with self.assertRaises(UnsupportedPipelineOutput):
Pipeline(columns={'open_slice': open_slice})
pipe = Pipeline(columns={})
with self.assertRaises(UnsupportedPipelineOutput):
pipe.add(open_slice, 'open_slice')
def test_loadable_term_slices(self):
"""
Test that slicing loadable terms raises the proper error.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
with self.assertRaises(NonSliceableTerm):
USEquityPricing.close[my_asset]
def test_non_existent_asset(self):
"""
Test that indexing into a term with a non-existent asset raises the
proper exception.
"""
my_asset = Asset(0, exchange="TEST")
returns = Returns(window_length=2, inputs=[self.col])
returns_slice = returns[my_asset]
class UsesSlicedInput(CustomFactor):
window_length = 1
inputs = [returns_slice]
def compute(self, today, assets, out, returns_slice):
pass
with self.assertRaises(NonExistentAssetInTimeFrame):
self.run_pipeline(
Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}),
self.pipeline_start_date,
self.pipeline_end_date,
)
def test_window_safety_of_slices(self):
"""
Test that slices correctly inherit the `window_safe` property of the
term from which they are derived.
"""
col = self.col
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
# SimpleMovingAverage is not window safe.
sma = SimpleMovingAverage(inputs=[self.col], window_length=10)
sma_slice = sma[my_asset]
class UsesSlicedInput(CustomFactor):
window_length = 1
inputs = [sma_slice]
def compute(self, today, assets, out, sma_slice):
pass
with self.assertRaises(NonWindowSafeInput):
self.run_pipeline(
Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}),
self.pipeline_start_date,
self.pipeline_end_date,
)
# Make sure that slices of custom factors are not window safe.
class MyUnsafeFactor(CustomFactor):
window_length = 1
inputs = [col]
def compute(self, today, assets, out, col):
pass
my_unsafe_factor = MyUnsafeFactor()
my_unsafe_factor_slice = my_unsafe_factor[my_asset]
class UsesSlicedInput(CustomFactor):
window_length = 1
inputs = [my_unsafe_factor_slice]
def compute(self, today, assets, out, my_unsafe_factor_slice):
pass
with self.assertRaises(NonWindowSafeInput):
self.run_pipeline(
Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}),
self.pipeline_start_date,
self.pipeline_end_date,
)
# Create a window safe factor.
class MySafeFactor(CustomFactor):
window_length = 1
inputs = [col]
window_safe = True
def compute(self, today, assets, out, col):
pass
my_safe_factor = MySafeFactor()
my_safe_factor_slice = my_safe_factor[my_asset]
# Make sure that correlations are not safe if either the factor *or*
# the target slice are not window safe.
with self.assertRaises(NonWindowSafeInput):
my_unsafe_factor.pearsonr(
target=my_safe_factor_slice, correlation_length=10,
)
with self.assertRaises(NonWindowSafeInput):
my_safe_factor.pearsonr(
target=my_unsafe_factor_slice, correlation_length=10,
)
def test_single_column_output(self):
"""
Tests for custom factors that compute a 1D out.
"""
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
cascading_mask = AssetIDPlusDay() < (self.sids[-1] + start_date.day)
class SingleColumnOutput(CustomFactor):
window_length = 1
inputs = [self.col]
window_safe = True
ndim = 1
def compute(self, today, assets, out, col):
# Because we specified ndim as 1, `out` should be a singleton
# array but `close` should be a regular sized input.
assert out.shape == (1,)
assert col.shape == (1, 3)
out[:] = col.sum()
# Since we cannot add single column output factors as pipeline
# columns, we have to test its output through another factor.
class UsesSingleColumnOutput(CustomFactor):
window_length = 1
inputs = [SingleColumnOutput()]
def compute(self, today, assets, out, single_column_output):
# Make sure that `single_column` has the correct shape. That
# is, it should always have one column regardless of any mask
# passed to `UsesSingleColumnInput`.
assert single_column_output.shape == (1, 1)
for mask in (alternating_mask, cascading_mask):
columns = {
'uses_single_column_output': UsesSingleColumnOutput(),
'uses_single_column_output_masked': UsesSingleColumnOutput(
mask=mask,
),
}
# Assertions about the expected shapes of our data are made in the
# `compute` function of our custom factors above.
self.run_pipeline(Pipeline(columns=columns), start_date, end_date)
def test_masked_single_column_output(self):
"""
Tests for masking custom factors that compute a 1D out.
"""
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
cascading_mask = AssetIDPlusDay() < (self.sids[-1] + start_date.day)
alternating_mask.window_safe = True
cascading_mask.window_safe = True
for mask in (alternating_mask, cascading_mask):
class SingleColumnOutput(CustomFactor):
window_length = 1
inputs = [self.col, mask]
window_safe = True
ndim = 1
def compute(self, today, assets, out, col, mask):
# Because we specified ndim as 1, `out` should always be a
# singleton array but `close` should be a sized based on
# the mask we passed.
assert out.shape == (1,)
assert col.shape == (1, mask.sum())
out[:] = col.sum()
# Since we cannot add single column output factors as pipeline
# columns, we have to test its output through another factor.
class UsesSingleColumnInput(CustomFactor):
window_length = 1
inputs = [self.col, mask, SingleColumnOutput(mask=mask)]
def compute(self,
today,
assets,
out,
col,
mask,
single_column_output):
# Make sure that `single_column` has the correct value
# based on the masked it used.
assert single_column_output.shape == (1, 1)
single_column_output_value = single_column_output[0][0]
expected_value = where(mask, col, 0).sum()
assert single_column_output_value == expected_value
columns = {'uses_single_column_input': UsesSingleColumnInput()}
# Assertions about the expected shapes of our data are made in the
# `compute` function of our custom factors above.
self.run_pipeline(Pipeline(columns=columns), start_date, end_date)
@parameter_space(returns_length=[2, 3], correlation_length=[3, 4])
def test_factor_correlation_methods(self,
returns_length,
correlation_length):
"""
Ensure that `Factor.pearsonr` and `Factor.spearmanr` are consistent
with the built-in factors `RollingPearsonOfReturns` and
`RollingSpearmanOfReturns`.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
returns = Returns(window_length=returns_length, inputs=[self.col])
returns_slice = returns[my_asset]
pearson = returns.pearsonr(
target=returns_slice, correlation_length=correlation_length,
)
spearman = returns.spearmanr(
target=returns_slice, correlation_length=correlation_length,
)
expected_pearson = RollingPearsonOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
)
expected_spearman = RollingSpearmanOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
)
# These built-ins construct their own Returns factor to use as inputs,
# so the only way to set our own inputs is to do so after the fact.
# This should not be done in practice. It is necessary here because we
# want Returns to use our random data as an input, but by default it is
# using USEquityPricing.close.
expected_pearson.inputs = [returns, returns_slice]
expected_spearman.inputs = [returns, returns_slice]
columns = {
'pearson': pearson,
'spearman': spearman,
'expected_pearson': expected_pearson,
'expected_spearman': expected_spearman,
}
results = self.run_pipeline(
Pipeline(columns=columns),
self.pipeline_start_date,
self.pipeline_end_date,
)
pearson_results = results['pearson'].unstack()
spearman_results = results['spearman'].unstack()
expected_pearson_results = results['expected_pearson'].unstack()
expected_spearman_results = results['expected_spearman'].unstack()
assert_frame_equal(pearson_results, expected_pearson_results)
assert_frame_equal(spearman_results, expected_spearman_results)
# Make sure we cannot call the correlation methods on factors or slices
# of dtype `datetime64[ns]`.
class DateFactor(CustomFactor):
window_length = 1
inputs = []
dtype = datetime64ns_dtype
window_safe = True
def compute(self, today, assets, out):
pass
date_factor = DateFactor()
date_factor_slice = date_factor[my_asset]
with self.assertRaises(TypeError):
date_factor.pearsonr(
target=returns_slice, correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
date_factor.spearmanr(
target=returns_slice, correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
returns.pearsonr(
target=date_factor_slice,
correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
returns.pearsonr(
target=date_factor_slice,
correlation_length=correlation_length,
)
@parameter_space(returns_length=[2, 3], regression_length=[3, 4])
def test_factor_regression_method(self, returns_length, regression_length):
"""
Ensure that `Factor.linear_regression` is consistent with the built-in
factor `RollingLinearRegressionOfReturns`.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
returns = Returns(window_length=returns_length, inputs=[self.col])
returns_slice = returns[my_asset]
regression = returns.linear_regression(
target=returns_slice, regression_length=regression_length,
)
expected_regression = RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=returns_length,
regression_length=regression_length,
)
# These built-ins construct their own Returns factor to use as inputs,
# so the only way to set our own inputs is to do so after the fact.
# This should not be done in practice. It is necessary here because we
# want Returns to use our random data as an input, but by default it is
# using USEquityPricing.close.
expected_regression.inputs = [returns, returns_slice]
columns = {
'regression': regression,
'expected_regression': expected_regression,
}
results = self.run_pipeline(
Pipeline(columns=columns),
self.pipeline_start_date,
self.pipeline_end_date,
)
regression_results = results['regression'].unstack()
expected_regression_results = results['expected_regression'].unstack()
assert_frame_equal(regression_results, expected_regression_results)
# Make sure we cannot call the linear regression method on factors or
# slices of dtype `datetime64[ns]`.
class DateFactor(CustomFactor):
window_length = 1
inputs = []
dtype = datetime64ns_dtype
window_safe = True
def compute(self, today, assets, out):
pass
date_factor = DateFactor()
date_factor_slice = date_factor[my_asset]
with self.assertRaises(TypeError):
date_factor.linear_regression(
target=returns_slice, regression_length=regression_length,
)
with self.assertRaises(TypeError):
returns.linear_regression(
target=date_factor_slice, regression_length=regression_length,
)