ENH: Add single-column input/output capabilities to pipeline terms

This commit is contained in:
dmichalowicz
2016-06-02 12:43:32 -04:00
parent e510cbbf7b
commit 393f82e81e
21 changed files with 1681 additions and 489 deletions
+4
View File
@@ -17,6 +17,10 @@ Enhancements
- Added support for non-float columns to Blaze-backed Pipeline
datasets (:issue:`1201`).
- Added :class:`zipline.pipeline.slice.Slice`, a new pipeline term designed to
extract a single column from another term. Slices can be created by indexing
into a term, keyed by asset. (:issue:`1267`)
Bug Fixes
~~~~~~~~~
+134 -130
View File
@@ -72,9 +72,14 @@ from zipline.pipeline.loaders.synthetic import (
make_bar_data,
expected_bar_values_2d,
)
from zipline.pipeline.term import NotSpecified
from zipline.pipeline.sentinels import NotSpecified
from zipline.testing import (
AssetID,
AssetIDPlusDay,
check_arrays,
make_alternating_boolean_array,
make_cascading_boolean_array,
OpenPrice,
parameter_space,
product_upper_triangle,
)
@@ -95,38 +100,6 @@ class RollingSumDifference(CustomFactor):
out[:] = (open - close).sum(axis=0)
class AssetID(CustomFactor):
"""
CustomFactor that returns the AssetID of each asset.
Useful for providing a Factor that produces a different value for each
asset.
"""
window_length = 1
# HACK: We currently decide whether to load or compute a Term based on the
# length of its inputs. This means we have to provide a dummy input.
inputs = [USEquityPricing.close]
def compute(self, today, assets, out, close):
out[:] = assets
class AssetIDPlusDay(CustomFactor):
window_length = 1
inputs = [USEquityPricing.close]
def compute(self, today, assets, out, close):
out[:] = assets + today.day
class OpenPrice(CustomFactor):
window_length = 1
inputs = [USEquityPricing.open]
def compute(self, today, assets, out, open):
out[:] = open
class MultipleOutputs(CustomFactor):
window_length = 1
inputs = [USEquityPricing.open, USEquityPricing.close]
@@ -421,6 +394,8 @@ class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase):
assets = self.assets
asset_ids = self.asset_ids
constants = self.constants
num_dates = len(dates)
num_assets = len(assets)
open = USEquityPricing.open
close = USEquityPricing.close
engine = SimplePipelineEngine(
@@ -435,19 +410,13 @@ class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase):
return DataFrame(expected_values, index=dates, columns=assets)
cascading_mask = AssetIDPlusDay() < (asset_ids[-1] + dates[0].day)
expected_cascading_mask_result = array(
[[True, True, True, False],
[True, True, False, False],
[True, False, False, False]],
dtype=bool,
expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_dates, num_assets),
)
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
expected_alternating_mask_result = array(
[[False, True, False, True],
[True, False, True, False],
[False, True, False, True]],
dtype=bool,
expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_dates, num_assets), first_value=False,
)
masks = cascading_mask, alternating_mask
@@ -592,6 +561,8 @@ class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase):
assets = self.assets
asset_ids = self.asset_ids
constants = self.constants
num_dates = len(dates)
num_assets = len(assets)
open = USEquityPricing.open
close = USEquityPricing.close
engine = SimplePipelineEngine(
@@ -603,32 +574,17 @@ class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase):
return DataFrame(expected_values, index=dates, columns=assets)
cascading_mask = AssetIDPlusDay() < (asset_ids[-1] + dates[0].day)
expected_cascading_mask_result = array(
[[True, True, True, False],
[True, True, False, False],
[True, False, False, False],
[False, False, False, False],
[False, False, False, False]],
dtype=bool,
expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_dates, num_assets),
)
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
expected_alternating_mask_result = array(
[[False, True, False, True],
[True, False, True, False],
[False, True, False, True],
[True, False, True, False],
[False, True, False, True]],
dtype=bool,
expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_dates, num_assets), first_value=False,
)
expected_no_mask_result = array(
[[True, True, True, True],
[True, True, True, True],
[True, True, True, True],
[True, True, True, True],
[True, True, True, True]],
dtype=bool,
expected_no_mask_result = full(
shape=(num_dates, num_assets), fill_value=True, dtype=bool,
)
masks = cascading_mask, alternating_mask, NotSpecified
@@ -1258,19 +1214,39 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase):
`RollingSpearmanOfReturns`.
"""
my_asset_column = 0
start_date_index = 6
end_date_index = 10
start_date_index = 14
end_date_index = 18
assets = self.asset_finder.retrieve_all(self.sids)
sids = self.sids
dates = self.dates
assets = self.asset_finder.retrieve_all(sids)
my_asset = assets[my_asset_column]
my_asset_filter = (AssetID() != (my_asset_column + 1))
num_days = end_date_index - start_date_index + 1
num_assets = len(assets)
# Our correlation factors require that their target asset is not
# filtered out, so make sure that masking out our target asset does not
# take effect. That is, a filter which filters out only our target
# asset should produce the same result as if no mask was passed at all.
for mask in (NotSpecified, my_asset_filter):
cascading_mask = \
AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day)
expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_days, num_assets),
)
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_days, num_assets),
)
expected_no_mask_result = full(
shape=(num_days, num_assets), fill_value=True, dtype=bool,
)
masks = cascading_mask, alternating_mask, NotSpecified
expected_mask_results = (
expected_cascading_mask_result,
expected_alternating_mask_result,
expected_no_mask_result,
)
for mask, expected_mask in zip(masks, expected_mask_results):
pearson_factor = RollingPearsonOfReturns(
target=my_asset,
returns_length=returns_length,
@@ -1284,18 +1260,23 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase):
mask=mask,
)
pipeline = Pipeline(
columns={
'pearson_factor': pearson_factor,
'spearman_factor': spearman_factor,
},
)
if mask is not NotSpecified:
pipeline.add(mask, 'mask')
results = self.engine.run_pipeline(
Pipeline(
columns={
'pearson_factor': pearson_factor,
'spearman_factor': spearman_factor,
},
),
self.dates[start_date_index],
self.dates[end_date_index],
pipeline, dates[start_date_index], dates[end_date_index],
)
pearson_results = results['pearson_factor'].unstack()
spearman_results = results['spearman_factor'].unstack()
if mask is not NotSpecified:
mask_results = results['mask'].unstack()
check_arrays(mask_results.values, expected_mask)
# Run a separate pipeline that calculates returns starting
# (correlation_length - 1) days prior to our start date. This is
@@ -1304,8 +1285,8 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase):
returns = Returns(window_length=returns_length)
results = self.engine.run_pipeline(
Pipeline(columns={'returns': returns}),
self.dates[start_date_index - (correlation_length - 1)],
self.dates[end_date_index],
dates[start_date_index - (correlation_length - 1)],
dates[end_date_index],
)
returns_results = results['returns'].unstack()
@@ -1328,22 +1309,19 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase):
my_asset_returns, other_asset_returns,
)[0]
assert_frame_equal(
pearson_results,
DataFrame(
expected_pearson_results,
index=self.dates[start_date_index:end_date_index + 1],
columns=assets,
),
expected_pearson_results = DataFrame(
data=where(expected_mask, expected_pearson_results, nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(
spearman_results,
DataFrame(
expected_spearman_results,
index=self.dates[start_date_index:end_date_index + 1],
columns=assets,
),
assert_frame_equal(pearson_results, expected_pearson_results)
expected_spearman_results = DataFrame(
data=where(expected_mask, expected_spearman_results, nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(spearman_results, expected_spearman_results)
@parameter_space(returns_length=[2, 3], regression_length=[3, 4])
def test_regression_of_returns_factor(self,
@@ -1353,38 +1331,65 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase):
Tests for the built-in factor `RollingLinearRegressionOfReturns`.
"""
my_asset_column = 0
start_date_index = 6
end_date_index = 10
start_date_index = 14
end_date_index = 18
assets = self.asset_finder.retrieve_all(self.sids)
sids = self.sids
dates = self.dates
assets = self.asset_finder.retrieve_all(sids)
my_asset = assets[my_asset_column]
my_asset_filter = (AssetID() != (my_asset_column + 1))
num_days = end_date_index - start_date_index + 1
num_assets = len(assets)
cascading_mask = \
AssetIDPlusDay() < (sids[-1] + dates[start_date_index].day)
expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_days, num_assets),
)
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_days, num_assets),
)
expected_no_mask_result = full(
shape=(num_days, num_assets), fill_value=True, dtype=bool,
)
masks = cascading_mask, alternating_mask, NotSpecified
expected_mask_results = (
expected_cascading_mask_result,
expected_alternating_mask_result,
expected_no_mask_result,
)
# The order of these is meant to align with the output of `linregress`.
outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr']
# Our regression factor requires that its target asset is not filtered
# out, so make sure that masking out our target asset does not take
# effect. That is, a filter which filters out only our target asset
# should produce the same result as if no mask was passed at all.
for mask in (NotSpecified, my_asset_filter):
for mask, expected_mask in zip(masks, expected_mask_results):
regression_factor = RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=returns_length,
regression_length=regression_length,
mask=mask,
)
results = self.engine.run_pipeline(
Pipeline(
columns={
output: getattr(regression_factor, output)
for output in outputs
},
),
self.dates[start_date_index],
self.dates[end_date_index],
pipeline = Pipeline(
columns={
output: getattr(regression_factor, output)
for output in outputs
},
)
if mask is not NotSpecified:
pipeline.add(mask, 'mask')
results = self.engine.run_pipeline(
pipeline, dates[start_date_index], dates[end_date_index],
)
if mask is not NotSpecified:
mask_results = results['mask'].unstack()
check_arrays(mask_results.values, expected_mask)
output_results = {}
expected_output_results = {}
for output in outputs:
@@ -1393,15 +1398,15 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase):
output_results[output], nan,
)
# Run a separate pipeline that calculates returns starting 2 days
# prior to our start date. This is because we need
# (regression_length - 1) extra days of returns to compute our
# expected regressions.
# Run a separate pipeline that calculates returns starting
# (regression_length - 1) days prior to our start date. This is
# because we need (regression_length - 1) extra days of returns to
# compute our expected regressions.
returns = Returns(window_length=returns_length)
results = self.engine.run_pipeline(
Pipeline(columns={'returns': returns}),
self.dates[start_date_index - (regression_length - 1)],
self.dates[end_date_index],
dates[start_date_index - (regression_length - 1)],
dates[end_date_index],
)
returns_results = results['returns'].unstack()
@@ -1424,14 +1429,13 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase):
expected_regression_results[i]
for output in outputs:
assert_frame_equal(
output_results[output],
DataFrame(
expected_output_results[output],
index=self.dates[start_date_index:end_date_index + 1],
columns=assets,
),
output_result = output_results[output]
expected_output_result = DataFrame(
where(expected_mask, expected_output_results[output], nan),
index=dates[start_date_index:end_date_index + 1],
columns=assets,
)
assert_frame_equal(output_result, expected_output_result)
def test_correlation_and_regression_with_bad_asset(self):
"""
@@ -1439,8 +1443,8 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase):
`RollingLinearRegressionOfReturns` raise the proper exception when
given a nonexistent target asset.
"""
start_date_index = 6
end_date_index = 10
start_date_index = 14
end_date_index = 18
my_asset = Equity(0)
# This filter is arbitrary; the important thing is that we test each
+516
View File
@@ -0,0 +1,516 @@
"""
Tests for slicing pipeline terms.
"""
from numpy import where
from pandas import Int64Index, Timestamp
from pandas.util.testing import assert_frame_equal
from zipline.assets import Asset
from zipline.errors import (
NonExistentAssetInTimeFrame,
NonSliceableTerm,
NonWindowSafeInput,
UnsupportedPipelineOutput,
)
from zipline.pipeline import CustomFactor, Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.data.testing import TestingDataSet
from zipline.pipeline.factors import (
Returns,
RollingLinearRegressionOfReturns,
RollingPearsonOfReturns,
RollingSpearmanOfReturns,
SimpleMovingAverage,
)
from zipline.testing import (
AssetID,
AssetIDPlusDay,
check_arrays,
OpenPrice,
parameter_space,
)
from zipline.testing.fixtures import (
WithSeededRandomPipelineEngine,
ZiplineTestCase,
)
from zipline.utils.numpy_utils import datetime64ns_dtype
class SliceTestCase(WithSeededRandomPipelineEngine, ZiplineTestCase):
sids = ASSET_FINDER_EQUITY_SIDS = Int64Index([1, 2, 3])
START_DATE = Timestamp('2015-01-31', tz='UTC')
END_DATE = Timestamp('2015-03-01', tz='UTC')
@classmethod
def init_class_fixtures(cls):
super(SliceTestCase, cls).init_class_fixtures()
# Using the date at index 14 as the start date because when running
# pipelines, especially those involving correlations or regressions, we
# want to make sure there are enough days to look back on. The end date
# at index 18 is chosen for convenience, as it makes for a contiguous
# five day span.
cls.pipeline_start_date = cls.trading_days[14]
cls.pipeline_end_date = cls.trading_days[18]
# Random input for factors.
cls.col = TestingDataSet.float_col
@parameter_space(my_asset_column=[0, 1, 2], window_length_=[1, 2, 3])
def test_slice(self, my_asset_column, window_length_):
"""
Test that slices can be created by indexing into a term, and that they
have the correct shape when used as inputs.
"""
sids = self.sids
my_asset = self.asset_finder.retrieve_asset(self.sids[my_asset_column])
returns = Returns(window_length=2, inputs=[self.col])
returns_slice = returns[my_asset]
class UsesSlicedInput(CustomFactor):
window_length = window_length_
inputs = [returns, returns_slice]
def compute(self, today, assets, out, returns, returns_slice):
# Make sure that our slice is the correct shape (i.e. has only
# one column) and that it has the same values as the original
# returns factor from which it is derived.
assert returns_slice.shape == (self.window_length, 1)
assert returns.shape == (self.window_length, len(sids))
check_arrays(returns_slice[:, 0], returns[:, my_asset_column])
# Assertions about the expected slice data are made in the `compute`
# function of our custom factor above.
self.run_pipeline(
Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}),
self.pipeline_start_date,
self.pipeline_end_date,
)
@parameter_space(unmasked_column=[0, 1, 2], slice_column=[0, 1, 2])
def test_slice_with_masking(self, unmasked_column, slice_column):
"""
Test that masking a factor that uses slices as inputs does not mask the
slice data.
"""
sids = self.sids
asset_finder = self.asset_finder
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
# Create a filter that masks out all but a single asset.
unmasked_asset = asset_finder.retrieve_asset(sids[unmasked_column])
unmasked_asset_only = (AssetID().eq(unmasked_asset.sid))
# Asset used to create our slice. In the cases where this is different
# than `unmasked_asset`, our slice should still have non-missing data
# when used as an input to our custom factor. That is, it should not be
# masked out.
slice_asset = asset_finder.retrieve_asset(sids[slice_column])
returns = Returns(window_length=2, inputs=[self.col])
returns_slice = returns[slice_asset]
returns_results = self.run_pipeline(
Pipeline(columns={'returns': returns}), start_date, end_date,
)
returns_results = returns_results['returns'].unstack()
class UsesSlicedInput(CustomFactor):
window_length = 1
inputs = [returns, returns_slice]
def compute(self, today, assets, out, returns, returns_slice):
# Ensure that our mask correctly affects the `returns` input
# and does not affect the `returns_slice` input.
assert returns.shape == (1, 1)
assert returns_slice.shape == (1, 1)
assert returns[0, 0] == \
returns_results.loc[today, unmasked_asset]
assert returns_slice[0, 0] == \
returns_results.loc[today, slice_asset]
columns = {'masked': UsesSlicedInput(mask=unmasked_asset_only)}
# Assertions about the expected data are made in the `compute` function
# of our custom factor above.
self.run_pipeline(Pipeline(columns=columns), start_date, end_date)
def test_adding_slice_column(self):
"""
Test that slices cannot be added as a pipeline column.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
open_slice = OpenPrice()[my_asset]
with self.assertRaises(UnsupportedPipelineOutput):
Pipeline(columns={'open_slice': open_slice})
pipe = Pipeline(columns={})
with self.assertRaises(UnsupportedPipelineOutput):
pipe.add(open_slice, 'open_slice')
def test_loadable_term_slices(self):
"""
Test that slicing loadable terms raises the proper error.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
with self.assertRaises(NonSliceableTerm):
USEquityPricing.close[my_asset]
def test_non_existent_asset(self):
"""
Test that indexing into a term with a non-existent asset raises the
proper exception.
"""
my_asset = Asset(0)
returns = Returns(window_length=2, inputs=[self.col])
returns_slice = returns[my_asset]
class UsesSlicedInput(CustomFactor):
window_length = 1
inputs = [returns_slice]
def compute(self, today, assets, out, returns_slice):
pass
with self.assertRaises(NonExistentAssetInTimeFrame):
self.run_pipeline(
Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}),
self.pipeline_start_date,
self.pipeline_end_date,
)
def test_window_safety_of_slices(self):
"""
Test that slices correctly inherit the `window_safe` property of the
term from which they are derived.
"""
col = self.col
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
# SimpleMovingAverage is not window safe.
sma = SimpleMovingAverage(inputs=[self.col], window_length=10)
sma_slice = sma[my_asset]
class UsesSlicedInput(CustomFactor):
window_length = 1
inputs = [sma_slice]
def compute(self, today, assets, out, sma_slice):
pass
with self.assertRaises(NonWindowSafeInput):
self.run_pipeline(
Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}),
self.pipeline_start_date,
self.pipeline_end_date,
)
# Make sure that slices of custom factors are not window safe.
class MyUnsafeFactor(CustomFactor):
window_length = 1
inputs = [col]
def compute(self, today, assets, out, col):
pass
my_unsafe_factor = MyUnsafeFactor()
my_unsafe_factor_slice = my_unsafe_factor[my_asset]
class UsesSlicedInput(CustomFactor):
window_length = 1
inputs = [my_unsafe_factor_slice]
def compute(self, today, assets, out, my_unsafe_factor_slice):
pass
with self.assertRaises(NonWindowSafeInput):
self.run_pipeline(
Pipeline(columns={'uses_sliced_input': UsesSlicedInput()}),
self.pipeline_start_date,
self.pipeline_end_date,
)
# Create a window safe factor.
class MySafeFactor(CustomFactor):
window_length = 1
inputs = [col]
window_safe = True
def compute(self, today, assets, out, col):
pass
my_safe_factor = MySafeFactor()
my_safe_factor_slice = my_safe_factor[my_asset]
# Make sure that correlations are not safe if either the factor *or*
# the target slice are not window safe.
with self.assertRaises(NonWindowSafeInput):
my_unsafe_factor.pearsonr(
target=my_safe_factor_slice, correlation_length=10,
)
with self.assertRaises(NonWindowSafeInput):
my_safe_factor.pearsonr(
target=my_unsafe_factor_slice, correlation_length=10,
)
def test_single_column_output(self):
"""
Tests for custom factors that compute a 1D out.
"""
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
cascading_mask = AssetIDPlusDay() < (self.sids[-1] + start_date.day)
class SingleColumnOutput(CustomFactor):
window_length = 1
inputs = [self.col]
window_safe = True
ndim = 1
def compute(self, today, assets, out, col):
# Because we specified ndim as 1, `out` should be a singleton
# array but `close` should be a regular sized input.
assert out.shape == (1,)
assert col.shape == (1, 3)
out[:] = col.sum()
# Since we cannot add single column output factors as pipeline
# columns, we have to test its output through another factor.
class UsesSingleColumnOutput(CustomFactor):
window_length = 1
inputs = [SingleColumnOutput()]
def compute(self, today, assets, out, single_column_output):
# Make sure that `single_column` has the correct shape. That
# is, it should always have one column regardless of any mask
# passed to `UsesSingleColumnInput`.
assert single_column_output.shape == (1, 1)
for mask in (alternating_mask, cascading_mask):
columns = {
'uses_single_column_output': UsesSingleColumnOutput(),
'uses_single_column_output_masked': UsesSingleColumnOutput(
mask=mask,
),
}
# Assertions about the expected shapes of our data are made in the
# `compute` function of our custom factors above.
self.run_pipeline(Pipeline(columns=columns), start_date, end_date)
def test_masked_single_column_output(self):
"""
Tests for masking custom factors that compute a 1D out.
"""
start_date = self.pipeline_start_date
end_date = self.pipeline_end_date
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
cascading_mask = AssetIDPlusDay() < (self.sids[-1] + start_date.day)
alternating_mask.window_safe = True
cascading_mask.window_safe = True
for mask in (alternating_mask, cascading_mask):
class SingleColumnOutput(CustomFactor):
window_length = 1
inputs = [self.col, mask]
window_safe = True
ndim = 1
def compute(self, today, assets, out, col, mask):
# Because we specified ndim as 1, `out` should always be a
# singleton array but `close` should be a sized based on
# the mask we passed.
assert out.shape == (1,)
assert col.shape == (1, mask.sum())
out[:] = col.sum()
# Since we cannot add single column output factors as pipeline
# columns, we have to test its output through another factor.
class UsesSingleColumnInput(CustomFactor):
window_length = 1
inputs = [self.col, mask, SingleColumnOutput(mask=mask)]
def compute(self,
today,
assets,
out,
col,
mask,
single_column_output):
# Make sure that `single_column` has the correct value
# based on the masked it used.
assert single_column_output.shape == (1, 1)
single_column_output_value = single_column_output[0][0]
expected_value = where(mask, col, 0).sum()
assert single_column_output_value == expected_value
columns = {'uses_single_column_input': UsesSingleColumnInput()}
# Assertions about the expected shapes of our data are made in the
# `compute` function of our custom factors above.
self.run_pipeline(Pipeline(columns=columns), start_date, end_date)
@parameter_space(returns_length=[2, 3], correlation_length=[3, 4])
def test_factor_correlation_methods(self,
returns_length,
correlation_length):
"""
Ensure that `Factor.pearsonr` and `Factor.spearmanr` are consistent
with the built-in factors `RollingPearsonOfReturns` and
`RollingSpearmanOfReturns`.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
returns = Returns(window_length=returns_length, inputs=[self.col])
returns_slice = returns[my_asset]
pearson = returns.pearsonr(
target=returns_slice, correlation_length=correlation_length,
)
spearman = returns.spearmanr(
target=returns_slice, correlation_length=correlation_length,
)
expected_pearson = RollingPearsonOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
)
expected_spearman = RollingSpearmanOfReturns(
target=my_asset,
returns_length=returns_length,
correlation_length=correlation_length,
)
# These built-ins construct their own Returns factor to use as inputs,
# so the only way to set our own inputs is to do so after the fact.
# This should not be done in practice. It is necessary here because we
# want Returns to use our random data as an input, but by default it is
# using USEquityPricing.close.
expected_pearson.inputs = [returns, returns_slice]
expected_spearman.inputs = [returns, returns_slice]
columns = {
'pearson': pearson,
'spearman': spearman,
'expected_pearson': expected_pearson,
'expected_spearman': expected_spearman,
}
results = self.run_pipeline(
Pipeline(columns=columns),
self.pipeline_start_date,
self.pipeline_end_date,
)
pearson_results = results['pearson'].unstack()
spearman_results = results['spearman'].unstack()
expected_pearson_results = results['expected_pearson'].unstack()
expected_spearman_results = results['expected_spearman'].unstack()
assert_frame_equal(pearson_results, expected_pearson_results)
assert_frame_equal(spearman_results, expected_spearman_results)
# Make sure we cannot call the correlation methods on factors or slices
# of dtype `datetime64[ns]`.
class DateFactor(CustomFactor):
window_length = 1
inputs = []
dtype = datetime64ns_dtype
window_safe = True
def compute(self, today, assets, out):
pass
date_factor = DateFactor()
date_factor_slice = date_factor[my_asset]
with self.assertRaises(TypeError):
date_factor.pearsonr(
target=returns_slice, correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
date_factor.spearmanr(
target=returns_slice, correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
returns.pearsonr(
target=date_factor_slice,
correlation_length=correlation_length,
)
with self.assertRaises(TypeError):
returns.pearsonr(
target=date_factor_slice,
correlation_length=correlation_length,
)
@parameter_space(returns_length=[2, 3], regression_length=[3, 4])
def test_factor_regression_method(self, returns_length, regression_length):
"""
Ensure that `Factor.linear_regression` is consistent with the built-in
factor `RollingLinearRegressionOfReturns`.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
returns = Returns(window_length=returns_length, inputs=[self.col])
returns_slice = returns[my_asset]
regression = returns.linear_regression(
target=returns_slice, regression_length=regression_length,
)
expected_regression = RollingLinearRegressionOfReturns(
target=my_asset,
returns_length=returns_length,
regression_length=regression_length,
)
# These built-ins construct their own Returns factor to use as inputs,
# so the only way to set our own inputs is to do so after the fact.
# This should not be done in practice. It is necessary here because we
# want Returns to use our random data as an input, but by default it is
# using USEquityPricing.close.
expected_regression.inputs = [returns, returns_slice]
columns = {
'regression': regression,
'expected_regression': expected_regression,
}
results = self.run_pipeline(
Pipeline(columns=columns),
self.pipeline_start_date,
self.pipeline_end_date,
)
regression_results = results['regression'].unstack()
expected_regression_results = results['expected_regression'].unstack()
assert_frame_equal(regression_results, expected_regression_results)
# Make sure we cannot call the linear regression method on factors or
# slices of dtype `datetime64[ns]`.
class DateFactor(CustomFactor):
window_length = 1
inputs = []
dtype = datetime64ns_dtype
window_safe = True
def compute(self, today, assets, out):
pass
date_factor = DateFactor()
date_factor_slice = date_factor[my_asset]
with self.assertRaises(TypeError):
date_factor.linear_regression(
target=returns_slice, regression_length=regression_length,
)
with self.assertRaises(TypeError):
returns.linear_regression(
target=date_factor_slice, regression_length=regression_length,
)
+31 -2
View File
@@ -5,6 +5,7 @@ from collections import Counter
from itertools import product
from unittest import TestCase
from zipline.assets import Asset
from zipline.errors import (
DTypeNotSpecified,
InvalidOutputName,
@@ -25,9 +26,10 @@ from zipline.pipeline import (
)
from zipline.pipeline.data import Column, DataSet
from zipline.pipeline.data.testing import TestingDataSet
from zipline.pipeline.factors import RecarrayField
from zipline.pipeline.term import AssetExists, NotSpecified
from zipline.pipeline.expression import NUMEXPR_MATH_FUNCS
from zipline.pipeline.factors import RecarrayField
from zipline.pipeline.sentinels import NotSpecified
from zipline.pipeline.term import AssetExists, Slice
from zipline.testing import parameter_space
from zipline.testing.predicates import assert_equal, assert_raises
from zipline.utils.numpy_utils import (
@@ -96,6 +98,18 @@ class MultipleOutputs(CustomFactor):
return
class GenericFilter(Filter):
dtype = bool_dtype
window_length = 0
inputs = []
class GenericClassifier(Classifier):
dtype = categorical_dtype
window_length = 0
inputs = []
def gen_equivalent_factors():
"""
Return an iterator of SomeFactor instances that should all be the same
@@ -268,6 +282,21 @@ class ObjectIdentityTestCase(TestCase):
self.assertIs(alpha, multiple_outputs.alpha)
self.assertIs(beta, multiple_outputs.beta)
def test_instance_caching_of_slices(self):
my_asset = Asset(1)
f = GenericCustomFactor()
f_slice = f[my_asset]
self.assertIs(f_slice, Slice(GenericCustomFactor(), my_asset))
f = GenericFilter()
f_slice = f[my_asset]
self.assertIs(f_slice, Slice(GenericFilter(), my_asset))
c = GenericClassifier()
c_slice = c[my_asset]
self.assertIs(c_slice, Slice(GenericClassifier(), my_asset))
def test_instance_non_caching(self):
f = SomeFactor()
+72 -1
View File
@@ -4,7 +4,15 @@ Tests for our testing utilities.
from itertools import product
from unittest import TestCase
from zipline.testing import parameter_space
from numpy import array, empty
from zipline.testing import (
check_arrays,
make_alternating_boolean_array,
make_cascading_boolean_array,
parameter_space,
)
from zipline.utils.numpy_utils import bool_dtype
class TestParameterSpace(TestCase):
@@ -38,3 +46,66 @@ class TestParameterSpace(TestCase):
# our {setUp,tearDown}Class won't be called if, for example,
# `parameter_space` returns None.
pass
class TestMakeBooleanArray(TestCase):
def test_make_alternating_boolean_array(self):
check_arrays(
make_alternating_boolean_array((3, 3)),
array(
[[True, False, True],
[False, True, False],
[True, False, True]]
),
)
check_arrays(
make_alternating_boolean_array((3, 3), first_value=False),
array(
[[False, True, False],
[True, False, True],
[False, True, False]]
),
)
check_arrays(
make_alternating_boolean_array((1, 3)),
array([[True, False, True]]),
)
check_arrays(
make_alternating_boolean_array((3, 1)),
array([[True], [False], [True]]),
)
check_arrays(
make_alternating_boolean_array((3, 0)),
empty((3, 0), dtype=bool_dtype),
)
def test_make_cascading_boolean_array(self):
check_arrays(
make_cascading_boolean_array((3, 3)),
array(
[[True, True, False],
[True, False, False],
[False, False, False]]
),
)
check_arrays(
make_cascading_boolean_array((3, 3), first_value=False),
array(
[[False, False, True],
[False, True, True],
[True, True, True]]
),
)
check_arrays(
make_cascading_boolean_array((1, 3)),
array([[True, True, False]]),
)
check_arrays(
make_cascading_boolean_array((3, 1)),
array([[False], [False], [False]]),
)
check_arrays(
make_cascading_boolean_array((3, 0)),
empty((3, 0), dtype=bool_dtype),
)
+19
View File
@@ -666,3 +666,22 @@ class ScheduleFunctionWithoutCalendar(ZiplineError):
"To use schedule_function, the TradingAlgorithm must be running on an "
"ExchangeTradingSchedule, rather than {schedule}."
)
class UnsupportedPipelineOutput(ZiplineError):
"""
Raised when a 1D term is added as a column to a pipeline.
"""
msg = (
"Cannot add column {column_name!r} with term {term}. Adding slices or "
"single-column-output terms as pipeline columns is not currently "
"supported."
)
class NonSliceableTerm(ZiplineError):
"""
Raised when attempting to index into a non-sliceable term, e.g. instances
of `zipline.pipeline.term.LoadableTerm`.
"""
msg = "Taking slices of {term} is not currently supported."
+2 -1
View File
@@ -10,7 +10,8 @@ from numpy import where, isnan, nan, zeros
from zipline.lib.labelarray import LabelArray
from zipline.lib.quantiles import quantiles
from zipline.pipeline.api_utils import restrict_to_dtype
from zipline.pipeline.term import ComputableTerm, NotSpecified
from zipline.pipeline.sentinels import NotSpecified
from zipline.pipeline.term import ComputableTerm
from zipline.utils.compat import unicode
from zipline.utils.input_validation import expect_types
from zipline.utils.numpy_utils import (
+1 -1
View File
@@ -10,10 +10,10 @@ from six import (
from zipline.pipeline.classifiers import Classifier, Latest as LatestClassifier
from zipline.pipeline.factors import Factor, Latest as LatestFactor
from zipline.pipeline.filters import Filter, Latest as LatestFilter
from zipline.pipeline.sentinels import NotSpecified
from zipline.pipeline.term import (
AssetExists,
LoadableTerm,
NotSpecified,
validate_dtype,
)
from zipline.utils.input_validation import ensure_dtype
+4 -1
View File
@@ -361,7 +361,10 @@ class SimplePipelineEngine(object):
assets,
mask,
)
assert(workspace[term].shape == mask.shape)
if term.ndim == 2:
assert workspace[term].shape == mask.shape
else:
assert workspace[term].shape == (mask.shape[0], 1)
out = {}
graph_extra_rows = graph.extra_rows
+5 -3
View File
@@ -8,6 +8,11 @@ from .events import (
BusinessDaysSincePreviousEvent,
BusinessDaysUntilNextEvent,
)
from .statistical import (
RollingLinearRegressionOfReturns,
RollingPearsonOfReturns,
RollingSpearmanOfReturns,
)
from .technical import (
Aroon,
AverageDollarVolume,
@@ -19,9 +24,6 @@ from .technical import (
FastStochasticOscillator,
MaxDrawdown,
Returns,
RollingLinearRegressionOfReturns,
RollingPearsonOfReturns,
RollingSpearmanOfReturns,
RSI,
SimpleMovingAverage,
VWAP,
+200 -13
View File
@@ -12,19 +12,6 @@ from zipline.lib.normalize import naive_grouped_rowwise_apply
from zipline.lib.rank import masked_rankdata_2d
from zipline.pipeline.api_utils import restrict_to_dtype
from zipline.pipeline.classifiers import Classifier, Everything, Quantiles
from zipline.pipeline.mixins import (
CustomTermMixin,
LatestMixin,
PositiveWindowLengthMixin,
RestrictedDTypeMixin,
SingleInputMixin,
)
from zipline.pipeline.term import (
ComputableTerm,
NotSpecified,
NotSpecifiedType,
Term,
)
from zipline.pipeline.expression import (
BadBinaryOperator,
COMPARISONS,
@@ -42,6 +29,19 @@ from zipline.pipeline.filters import (
PercentileFilter,
NullFilter,
)
from zipline.pipeline.mixins import (
CustomTermMixin,
LatestMixin,
PositiveWindowLengthMixin,
RestrictedDTypeMixin,
SingleInputMixin,
)
from zipline.pipeline.sentinels import NotSpecified, NotSpecifiedType
from zipline.pipeline.term import (
ComputableTerm,
Slice,
Term,
)
from zipline.utils.functional import with_doc, with_name
from zipline.utils.input_validation import expect_types
from zipline.utils.math_utils import nanmean, nanstd
@@ -625,6 +625,193 @@ class Factor(RestrictedDTypeMixin, ComputableTerm):
"""
return Rank(self, method=method, ascending=ascending, mask=mask)
@expect_types(
target=Slice, correlation_length=int, mask=(Filter, NotSpecifiedType),
)
def pearsonr(self, target, correlation_length, mask=NotSpecified):
"""
Construct a new Factor that computes rolling pearson correlation
coefficients between `target` and the columns of `self`.
This method can only be called on factors which are deemed safe for use
as inputs to other factors. This includes `Returns` and any factors
created from `Factor.rank` or `Factor.zscore`.
Parameters
----------
target : zipline.pipeline.slice.Slice
The column of data with which to compute correlations against each
column of data produced by `self`.
correlation_length : int
Length of the lookback window over which to compute each
correlation coefficient.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should have their correlation with
the target slice computed each day.
Returns
-------
correlations : zipline.pipeline.factors.RollingPearson
A new Factor that will compute correlations between `target` and
the columns of `self`.
Example
-------
Suppose we want to create a factor that computes the correlation
between AAPL's 10-day returns and the 10-day returns of all other
assets, computing each correlation over 30 days. This can be achieved
by doing the following::
returns = Returns(window_length=10)
returns_slice = returns[Asset(24)]
aapl_correlations = returns.pearsonr(
target=returns_slice, correlation_length=30,
)
This is equivalent to doing::
aapl_correlations = RollingPearsonOfReturns(
target=Asset(24), returns_length=10, correlation_length=30,
)
See Also
--------
:func:`scipy.stats.pearsonr`
:class:`zipline.pipeline.factors.RollingPearsonOfReturns`
:meth:`Factor.spearmanr`
"""
from .statistical import RollingPearson
return RollingPearson(
target_factor=self,
target_slice=target,
correlation_length=correlation_length,
mask=mask,
)
@expect_types(
target=Slice, correlation_length=int, mask=(Filter, NotSpecifiedType),
)
def spearmanr(self, target, correlation_length, mask=NotSpecified):
"""
Construct a new Factor that computes rolling spearman rank correlation
coefficients between `target` and the columns of `self`.
This method can only be called on factors which are deemed safe for use
as inputs to other factors. This includes `Returns` and any factors
created from `Factor.rank` or `Factor.zscore`.
Parameters
----------
target : zipline.pipeline.slice.Slice
The column of data with which to compute correlations against each
column of data produced by `self`.
correlation_length : int
Length of the lookback window over which to compute each
correlation coefficient.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should have their correlation with
the target slice computed each day.
Returns
-------
correlations : zipline.pipeline.factors.RollingSpearman
A new Factor that will compute correlations between `target` and
the columns of `self`.
Example
-------
Suppose we want to create a factor that computes the correlation
between AAPL's 10-day returns and the 10-day returns of all other
assets, computing each correlation over 30 days. This can be achieved
by doing the following::
returns = Returns(window_length=10)
returns_slice = returns[Asset(24)]
aapl_correlations = returns.spearmanr(
target=returns_slice, correlation_length=30,
)
This is equivalent to doing::
aapl_correlations = RollingSpearmanOfReturns(
target=Asset(24), returns_length=10, correlation_length=30,
)
See Also
--------
:func:`scipy.stats.spearmanr`
:class:`zipline.pipeline.factors.RollingSpearmanOfReturns`
:meth:`Factor.pearsonr`
"""
from .statistical import RollingSpearman
return RollingSpearman(
target_factor=self,
target_slice=target,
correlation_length=correlation_length,
mask=mask,
)
@expect_types(
target=Slice, regression_length=int, mask=(Filter, NotSpecifiedType),
)
def linear_regression(self, target, regression_length, mask=NotSpecified):
"""
Construct a new Factor that performs an ordinary least-squares
regression predicting the columns of `self` from `target`.
This method can only be called on factors which are deemed safe for use
as inputs to other factors. This includes `Returns` and any factors
created from `Factor.rank` or `Factor.zscore`.
Parameters
----------
target : zipline.pipeline.slice.Slice
The column of data to use as the predictor/independent variable in
each regression.
correlation_length : int
Length of the lookback window over which to compute each
regression.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should be regressed with the
target slice each day.
Returns
-------
regressions : zipline.pipeline.factors.RollingLinearRegression
A new Factor that will compute linear regressions of `target`
against the columns of `self`.
Example
-------
Suppose we want to create a factor that regresses AAPL's 10-day returns
against the 10-day returns of all other assets, computing each
regression over 30 days. This can be achieved by doing the following::
returns = Returns(window_length=10)
returns_slice = returns[Asset(24)]
aapl_regressions = returns.linear_regression(
target=returns_slice, regression_length=30,
)
This is equivalent to doing::
aapl_regressions = RollingLinearRegressionOfReturns(
target=Asset(24), returns_length=10, regression_length=30,
)
See Also
--------
:func:`scipy.stats.linregress`
:class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns`
"""
from .statistical import RollingLinearRegression
return RollingLinearRegression(
target_factor=self,
target_slice=target,
regression_length=regression_length,
mask=mask,
)
@expect_types(bins=int, mask=(Filter, NotSpecifiedType))
def quantiles(self, bins, mask=NotSpecified):
"""
+435
View File
@@ -0,0 +1,435 @@
from scipy.stats import (
linregress,
pearsonr,
spearmanr,
)
from zipline.pipeline.factors import CustomFactor
from zipline.pipeline.filters import SingleAsset
from zipline.pipeline.mixins import SingleInputMixin
from zipline.pipeline.sentinels import NotSpecified
from zipline.pipeline.term import AssetExists
from zipline.utils.input_validation import expect_dtypes
from zipline.utils.numpy_utils import float64_dtype, int64_dtype
from .technical import Returns
ALLOWED_DTYPES = (float64_dtype, int64_dtype)
class _RollingCorrelation(CustomFactor, SingleInputMixin):
@expect_dtypes(target_factor=ALLOWED_DTYPES, target_slice=ALLOWED_DTYPES)
def __new__(cls,
target_factor,
target_slice,
correlation_length,
mask=NotSpecified):
return super(_RollingCorrelation, cls).__new__(
cls,
inputs=[target_factor, target_slice],
window_length=correlation_length,
mask=mask,
)
class RollingPearson(_RollingCorrelation):
"""
A Factor that computes pearson correlation coefficients between a single
column of data and the columns of another Factor.
Parameters
----------
target_factor : zipline.pipeline.factors.Factor
The factor for which to compute correlations of each of its columns
with `target_slice`.
target_slice : zipline.pipeline.slice.Slice
The column of data with which to compute correlations against each
column of data produced by `target_factor`.
correlation_length : int
Length of the lookback window over which to compute each
correlation coefficient.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets (columns) of `target_factor` should
have their correlation with `target_slice` computed each day.
See Also
--------
:func:`scipy.stats.pearsonr`
:meth:`Factor.pearsonr`
:class:`zipline.pipeline.factors.RollingPearsonOfReturns`
Notes
-----
Most users should call Factor.pearsonr rather than directly construct an
instance of this class.
"""
def compute(self, today, assets, out, factor_data, slice_data):
slice_data_column = slice_data[:, 0]
for i in range(len(out)):
# pearsonr returns the R-value and the P-value.
out[i] = pearsonr(factor_data[:, i], slice_data_column)[0]
class RollingSpearman(_RollingCorrelation):
"""
A Factor that computes spearman rank correlation coefficients between a
single column of data and the columns of another Factor.
Parameters
----------
target_factor : zipline.pipeline.factors.Factor
The factor for which to compute correlations of each of its columns
with `target_slice`.
target_slice : zipline.pipeline.slice.Slice
The column of data with which to compute correlations against each
column of data produced by `target_factor`.
correlation_length : int
Length of the lookback window over which to compute each
correlation coefficient.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets (columns) of `target_factor` should
have their correlation with `target_slice` computed each day.
See Also
--------
:func:`scipy.stats.spearmanr`
:meth:`Factor.spearmanr`
:class:`zipline.pipeline.factors.RollingSpearmanOfReturns`
Notes
-----
Most users should call Factor.spearmanr rather than directly construct an
instance of this class.
"""
def compute(self, today, assets, out, factor_data, slice_data):
slice_data_column = slice_data[:, 0]
for i in range(len(out)):
# spearmanr returns the R-value and the P-value.
out[i] = spearmanr(factor_data[:, i], slice_data_column)[0]
class RollingLinearRegression(CustomFactor, SingleInputMixin):
"""
A Factor that performs an ordinary least-squares regression predicting the
columns of another Factor from a single column of data.
Parameters
----------
target_factor : zipline.pipeline.factors.Factor
The factor whose columns are the predicted/dependent variable of each
regression with `target_slice`.
target_slice : zipline.pipeline.slice.Slice
The column of data to use as the predictor/independent variable in
each regression with the columns of `target_factor`.
correlation_length : int
Length of the lookback window over which to compute each regression.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets (columns) of `target_factor` should be
regressed against `target_slice` each day.
See Also
--------
:func:`scipy.stats.linregress`
:meth:`Factor.linear_regression`
:class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns`
Notes
-----
Most users should call Factor.linear_regression rather than directly
construct an instance of this class.
"""
outputs = ['alpha', 'beta', 'r_value', 'p_value', 'stderr']
@expect_dtypes(target_factor=ALLOWED_DTYPES, target_slice=ALLOWED_DTYPES)
def __new__(cls,
target_factor,
target_slice,
regression_length,
mask=NotSpecified):
return super(RollingLinearRegression, cls).__new__(
cls,
inputs=[target_factor, target_slice],
window_length=regression_length,
mask=mask,
)
def compute(self, today, assets, out, factor_data, slice_data):
slice_data_column = slice_data[:, 0]
alpha = out.alpha
beta = out.beta
r_value = out.r_value
p_value = out.p_value
stderr = out.stderr
for i in range(len(out)):
regr_results = linregress(y=factor_data[:, i], x=slice_data_column)
# `linregress` returns its results in the following order:
# slope, intercept, r-value, p-value, stderr
alpha[i] = regr_results[1]
beta[i] = regr_results[0]
r_value[i] = regr_results[2]
p_value[i] = regr_results[3]
stderr[i] = regr_results[4]
class RollingPearsonOfReturns(RollingPearson):
"""
Calculates the Pearson product-moment correlation coefficient of the
returns of the given asset with the returns of all other assets.
Pearson correlation is what most people mean when they say "correlation
coefficient" or "R-value".
Parameters
----------
target : zipline.assets.Asset
The asset to correlate with all other assets.
returns_length : int >= 2
Length of the lookback window over which to compute returns. Daily
returns require a window length of 2.
correlation_length : int >= 1
Length of the lookback window over which to compute each correlation
coefficient.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should have their correlation with the
target asset computed each day.
Note
----
Computing this factor over many assets can be time consuming. It is
recommended that a mask be used in order to limit the number of assets over
which correlations are computed.
Example
-------
Let the following be example 10-day returns for three different assets::
SPY MSFT FB
2017-03-13 -.03 .03 .04
2017-03-14 -.02 -.03 .02
2017-03-15 -.01 .02 .01
2017-03-16 0 -.02 .01
2017-03-17 .01 .04 -.01
2017-03-20 .02 -.03 -.02
2017-03-21 .03 .01 -.02
2017-03-22 .04 -.02 -.02
Suppose we are interested in SPY's rolling returns correlation with each
stock from 2017-03-17 to 2017-03-22, using a 5-day look back window (that
is, we calculate each correlation coefficient over 5 days of data). We can
achieve this by doing::
rolling_correlations = RollingPearsonOfReturns(
target=Equity(8554),
returns_length=10,
correlation_length=5,
)
The result of computing ``rolling_correlations`` from 2017-03-17 to
2017-03-22 gives::
SPY MSFT FB
2017-03-17 1 .15 -.96
2017-03-20 1 .10 -.96
2017-03-21 1 -.16 -.94
2017-03-22 1 -.16 -.85
Note that the column for SPY is all 1's, as the correlation of any data
series with itself is always 1. To understand how each of the other values
were calculated, take for example the .15 in MSFT's column. This is the
correlation coefficient between SPY's returns looking back from 2017-03-17
(-.03, -.02, -.01, 0, .01) and MSFT's returns (.03, -.03, .02, -.02, .04).
See Also
--------
:class:`zipline.pipeline.factors.RollingSpearmanOfReturns`
:class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns`
"""
def __new__(cls,
target,
returns_length,
correlation_length,
mask=NotSpecified):
# Use the `SingleAsset` filter here because it protects against
# inputting a non-existent target asset.
returns = Returns(
window_length=returns_length,
mask=(AssetExists() | SingleAsset(asset=target)),
)
return super(RollingPearsonOfReturns, cls).__new__(
cls,
target_factor=returns,
target_slice=returns[target],
correlation_length=correlation_length,
mask=mask,
)
class RollingSpearmanOfReturns(RollingSpearman):
"""
Calculates the Spearman rank correlation coefficient of the returns of the
given asset with the returns of all other assets.
Parameters
----------
target : zipline.assets.Asset
The asset to correlate with all other assets.
returns_length : int >= 2
Length of the lookback window over which to compute returns. Daily
returns require a window length of 2.
correlation_length : int >= 1
Length of the lookback window over which to compute each correlation
coefficient.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should have their correlation with the
target asset computed each day.
Note
----
Computing this factor over many assets can be time consuming. It is
recommended that a mask be used in order to limit the number of assets over
which correlations are computed.
See Also
--------
:class:`zipline.pipeline.factors.RollingPearsonOfReturns`
:class:`zipline.pipeline.factors.RollingLinearRegressionOfReturns`
"""
def __new__(cls,
target,
returns_length,
correlation_length,
mask=NotSpecified):
# Use the `SingleAsset` filter here because it protects against
# inputting a non-existent target asset.
returns = Returns(
window_length=returns_length,
mask=(AssetExists() | SingleAsset(asset=target)),
)
return super(RollingSpearmanOfReturns, cls).__new__(
cls,
target_factor=returns,
target_slice=returns[target],
correlation_length=correlation_length,
mask=mask,
)
class RollingLinearRegressionOfReturns(RollingLinearRegression):
"""
Perform an ordinary least-squares regression predicting the returns of all
other assets on the given asset.
Parameters
----------
target : zipline.assets.Asset
The asset to regress against all other assets.
returns_length : int >= 2
Length of the lookback window over which to compute returns. Daily
returns require a window length of 2.
regression_length : int >= 1
Length of the lookback window over which to compute each regression.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should be regressed against the target
asset each day.
Notes
-----
Computing this factor over many assets can be time consuming. It is
recommended that a mask be used in order to limit the number of assets over
which regressions are computed.
This factor is designed to return five outputs:
- alpha, a factor that computes the intercepts of each regression.
- beta, a factor that computes the slopes of each regression.
- r_value, a factor that computes the correlation coefficient of each
regression.
- p_value, a factor that computes, for each regression, the two-sided
p-value for a hypothesis test whose null hypothesis is that the slope is
zero.
- stderr, a factor that computes the standard error of the estimate of each
regression.
For more help on factors with multiple outputs, see
:class:`zipline.pipeline.factors.CustomFactor`.
Example
-------
Let the following be example 10-day returns for three different assets::
SPY MSFT FB
2017-03-13 -.03 .03 .04
2017-03-14 -.02 -.03 .02
2017-03-15 -.01 .02 .01
2017-03-16 0 -.02 .01
2017-03-17 .01 .04 -.01
2017-03-20 .02 -.03 -.02
2017-03-21 .03 .01 -.02
2017-03-22 .04 -.02 -.02
Suppose we are interested in predicting each stock's returns from SPY's
over rolling 5-day look back windows. We can compute rolling regression
coefficients (alpha and beta) from 2017-03-17 to 2017-03-22 by doing::
regression_factor = RollingRegressionOfReturns(
target=Equity(8554),
returns_length=10,
regression_length=5,
)
alpha = regression_factor.alpha
beta = regression_factor.beta
The result of computing ``alpha`` from 2017-03-17 to 2017-03-22 gives::
SPY MSFT FB
2017-03-17 0 .011 .003
2017-03-20 0 -.004 .004
2017-03-21 0 .007 .006
2017-03-22 0 .002 .008
And the result of computing ``beta`` from 2017-03-17 to 2017-03-22 gives::
SPY MSFT FB
2017-03-17 1 .3 -1.1
2017-03-20 1 .2 -1
2017-03-21 1 -.3 -1
2017-03-22 1 -.3 -.9
Note that SPY's column for alpha is all 0's and for beta is all 1's, as the
regression line of SPY with itself is simply the function y = x.
To understand how each of the other values were calculated, take for
example MSFT's ``alpha`` and ``beta`` values on 2017-03-17 (.011 and .3,
respectively). These values are the result of running a linear regression
predicting MSFT's returns from SPY's returns, using values starting at
2017-03-17 and looking back 5 days. That is, the regression was run with
x = [-.03, -.02, -.01, 0, .01] and y = [.03, -.03, .02, -.02, .04], and it
produced a slope of .3 and an intercept of .011.
See Also
--------
:class:`zipline.pipeline.factors.RollingPearsonOfReturns`
:class:`zipline.pipeline.factors.RollingSpearmanOfReturns`
"""
def __new__(cls,
target,
returns_length,
regression_length,
mask=NotSpecified):
# Use the `SingleAsset` filter here because it protects against
# inputting a non-existent target asset.
returns = Returns(
window_length=returns_length,
mask=(AssetExists() | SingleAsset(asset=target)),
)
return super(RollingLinearRegressionOfReturns, cls).__new__(
cls,
target_factor=returns,
target_slice=returns[target],
regression_length=regression_length,
mask=mask,
)
-303
View File
@@ -18,17 +18,13 @@ from numpy import (
isnan,
log,
NINF,
searchsorted,
sqrt,
sum as np_sum,
)
from numexpr import evaluate
from scipy.stats import linregress, pearsonr, spearmanr
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.filters import SingleAsset
from zipline.pipeline.mixins import SingleInputMixin
from zipline.pipeline.term import AssetExists, NotSpecified
from zipline.utils.numpy_utils import ignore_nanwarnings
from zipline.utils.input_validation import expect_types
from zipline.utils.math_utils import (
@@ -163,305 +159,6 @@ class AverageDollarVolume(CustomFactor):
out[:] = nanmean(close * volume, axis=0)
class _RollingCorrelationOfReturns(CustomFactor, SingleInputMixin):
"""
Base class for factors computing a rolling correlation over a window of
Returns.
Parameters
----------
target : zipline.assets.Asset
The asset to correlate with all other assets.
returns_length : int >= 2
Length of the lookback window over which to compute returns. Daily
returns require a window length of 2.
correlation_length : int >= 1
Length of the lookback window over which to compute each correlation
coefficient.
"""
params = ['target']
def __new__(cls,
target,
returns_length,
correlation_length,
mask=NotSpecified,
**kwargs):
if mask is NotSpecified:
mask = AssetExists()
# Make sure we do not filter out the asset of interest.
mask = mask | SingleAsset(asset=target)
return super(_RollingCorrelationOfReturns, cls).__new__(
cls,
target=target,
inputs=[Returns(window_length=returns_length)],
window_length=correlation_length,
mask=mask,
**kwargs
)
class RollingPearsonOfReturns(_RollingCorrelationOfReturns):
"""
Calculates the Pearson product-moment correlation coefficient of the
returns of the given asset with the returns of all other assets.
Pearson correlation is what most people mean when they say "correlation
coefficient" or "R-value".
Parameters
----------
target : zipline.assets.Asset
The asset to correlate with all other assets.
returns_length : int >= 2
Length of the lookback window over which to compute returns. Daily
returns require a window length of 2.
correlation_length : int >= 1
Length of the lookback window over which to compute each correlation
coefficient.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should have their correlation with the
target asset computed each day.
Note
----
Computing this factor over many assets can be time consuming. It is
recommended that a mask be used in order to limit the number of assets over
which correlations are computed.
Example
-------
Let the following be example 10-day returns for three different assets::
SPY MSFT FB
2017-03-13 -.03 .03 .04
2017-03-14 -.02 -.03 .02
2017-03-15 -.01 .02 .01
2017-03-16 0 -.02 .01
2017-03-17 .01 .04 -.01
2017-03-20 .02 -.03 -.02
2017-03-21 .03 .01 -.02
2017-03-22 .04 -.02 -.02
Suppose we are interested in SPY's rolling returns correlation with each
stock from 2017-03-17 to 2017-03-22, using a 5-day look back window (that
is, we calculate each correlation coefficient over 5 days of data). We can
achieve this by doing::
rolling_correlations = RollingPearsonOfReturns(
target=Equity(8554),
returns_length=10,
correlation_length=5,
)
The result of computing ``rolling_correlations`` from 2017-03-17 to
2017-03-22 gives::
SPY MSFT FB
2017-03-17 1 .15 -.96
2017-03-20 1 .10 -.96
2017-03-21 1 -.16 -.94
2017-03-22 1 -.16 -.85
Note that the column for SPY is all 1's, as the correlation of any data
series with itself is always 1. To understand how each of the other values
were calculated, take for example the .15 in MSFT's column. This is the
correlation coefficient between SPY's returns looking back from 2017-03-17
(-.03, -.02, -.01, 0, .01) and MSFT's returns (.03, -.03, .02, -.02, .04).
See Also
--------
:class:`zipline.pipeline.factors.technical.RollingSpearmanOfReturns`
:class:`zipline.pipeline.factors.technical.RollingLinearRegressionOfReturns`
"""
def compute(self, today, assets, out, data, target):
target_col = data[:, searchsorted(assets.values, target.sid)]
for i in range(len(out)):
# pearsonr returns the R-value and the P-value.
out[i] = pearsonr(data[:, i], target_col)[0]
class RollingSpearmanOfReturns(_RollingCorrelationOfReturns):
"""
Calculates the Spearman rank correlation coefficient of the returns of the
given asset with the returns of all other assets.
Parameters
----------
target : zipline.assets.Asset
The asset to correlate with all other assets.
returns_length : int >= 2
Length of the lookback window over which to compute returns. Daily
returns require a window length of 2.
correlation_length : int >= 1
Length of the lookback window over which to compute each correlation
coefficient.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should have their correlation with the
target asset computed each day.
Note
----
Computing this factor over many assets can be time consuming. It is
recommended that a mask be used in order to limit the number of assets over
which correlations are computed.
See Also
--------
:class:`zipline.pipeline.factors.technical.RollingPearsonOfReturns`
:class:`zipline.pipeline.factors.technical.RollingLinearRegressionOfReturns`
"""
def compute(self, today, assets, out, data, target):
target_col = data[:, searchsorted(assets.values, target.sid)]
for i in range(len(out)):
# spearmanr returns the R-value and the P-value.
out[i] = spearmanr(data[:, i], target_col)[0]
class RollingLinearRegressionOfReturns(CustomFactor, SingleInputMixin):
"""
Perform an ordinary least-squares regression predicting the returns of all
other assets on the given asset.
Parameters
----------
target : zipline.assets.Asset
The asset to regress against all other assets.
returns_length : int >= 2
Length of the lookback window over which to compute returns. Daily
returns require a window length of 2.
regression_length : int >= 1
Length of the lookback window over which to compute each regression.
mask : zipline.pipeline.Filter, optional
A Filter describing which assets should be regressed against the target
asset each day.
Notes
-----
Computing this factor over many assets can be time consuming. It is
recommended that a mask be used in order to limit the number of assets over
which regressions are computed.
This factor is designed to return five outputs:
- alpha, a factor that computes the intercepts of each regression.
- beta, a factor that computes the slopes of each regression.
- r_value, a factor that computes the correlation coefficient of each
regression.
- p_value, a factor that computes, for each regression, the two-sided
p-value for a hypothesis test whose null hypothesis is that the slope is
zero.
- stderr, a factor that computes the standard error of the estimate of each
regression.
For more help on factors with multiple outputs, see
:class:`zipline.pipeline.factors.CustomFactor`.
Example
-------
Let the following be example 10-day returns for three different assets::
SPY MSFT FB
2017-03-13 -.03 .03 .04
2017-03-14 -.02 -.03 .02
2017-03-15 -.01 .02 .01
2017-03-16 0 -.02 .01
2017-03-17 .01 .04 -.01
2017-03-20 .02 -.03 -.02
2017-03-21 .03 .01 -.02
2017-03-22 .04 -.02 -.02
Suppose we are interested in predicting each stock's returns from SPY's
over rolling 5-day look back windows. We can compute rolling regression
coefficients (alpha and beta) from 2017-03-17 to 2017-03-22 by doing::
regression_factor = RollingRegressionOfReturns(
target=Equity(8554),
returns_length=10,
regression_length=5,
)
alpha = regression_factor.alpha
beta = regression_factor.beta
The result of computing ``alpha`` from 2017-03-17 to 2017-03-22 gives::
SPY MSFT FB
2017-03-17 0 .011 .003
2017-03-20 0 -.004 .004
2017-03-21 0 .007 .006
2017-03-22 0 .002 .008
And the result of computing ``beta`` from 2017-03-17 to 2017-03-22 gives::
SPY MSFT FB
2017-03-17 1 .3 -1.1
2017-03-20 1 .2 -1
2017-03-21 1 -.3 -1
2017-03-22 1 -.3 -.9
Note that SPY's column for alpha is all 0's and for beta is all 1's, as the
regression line of SPY with itself is simply the function y = x.
To understand how each of the other values were calculated, take for
example MSFT's ``alpha`` and ``beta`` values on 2017-03-17 (.011 and .3,
respectively). These values are the result of running a linear regression
predicting MSFT's returns from SPY's returns, using values starting at
2017-03-17 and looking back 5 days. That is, the regression was run with
x = [-.03, -.02, -.01, 0, .01] and y = [.03, -.03, .02, -.02, .04], and it
produced a slope of .3 and an intercept of .011.
See Also
--------
:class:`zipline.pipeline.factors.technical.RollingPearsonOfReturns`
:class:`zipline.pipeline.factors.technical.RollingSpearmanOfReturns`
"""
outputs = ['alpha', 'beta', 'r_value', 'p_value', 'stderr']
params = ['target']
def __new__(cls,
target,
returns_length,
regression_length,
mask=NotSpecified,
**kwargs):
if mask is NotSpecified:
mask = AssetExists()
# Make sure we do not filter out the asset of interest.
mask = mask | SingleAsset(asset=target)
return super(RollingLinearRegressionOfReturns, cls).__new__(
cls,
target=target,
inputs=[Returns(window_length=returns_length)],
window_length=regression_length,
mask=mask,
**kwargs
)
def compute(self, today, assets, out, returns, target):
asset_col = searchsorted(assets.values, target.sid)
my_asset = returns[:, asset_col]
alpha = out.alpha
beta = out.beta
r_value = out.r_value
p_value = out.p_value
stderr = out.stderr
for i in range(len(out)):
other_asset = returns[:, i]
regr_results = linregress(y=other_asset, x=my_asset)
# `linregress` returns its results in the following order:
# slope, intercept, r-value, p-value, stderr
alpha[i] = regr_results[1]
beta[i] = regr_results[0]
r_value[i] = regr_results[2]
p_value[i] = regr_results[3]
stderr[i] = regr_results[4]
class _ExponentialWeightedFactor(SingleInputMixin, CustomFactor):
"""
Base class for factors implementing exponential-weighted operations.
+7 -7
View File
@@ -4,12 +4,12 @@ filter.py
from itertools import chain
from operator import attrgetter
from numpy import (
float64,
nan,
nanpercentile,
)
from zipline.errors import (
BadPercentileBounds,
NonExistentAssetInTimeFrame,
@@ -17,6 +17,12 @@ from zipline.errors import (
)
from zipline.lib.labelarray import LabelArray
from zipline.lib.rank import is_missing
from zipline.pipeline.expression import (
BadBinaryOperator,
FILTER_BINOPS,
method_name_for_op,
NumericalExpression,
)
from zipline.pipeline.mixins import (
CustomTermMixin,
LatestMixin,
@@ -25,12 +31,6 @@ from zipline.pipeline.mixins import (
SingleInputMixin,
)
from zipline.pipeline.term import ComputableTerm, Term
from zipline.pipeline.expression import (
BadBinaryOperator,
FILTER_BINOPS,
method_name_for_op,
NumericalExpression,
)
from zipline.utils.input_validation import expect_types
from zipline.utils.numpy_utils import bool_dtype, repeat_first_axis
+1 -1
View File
@@ -179,7 +179,7 @@ from zipline.pipeline.loaders.utils import (
normalize_data_query_bounds,
normalize_timestamp_to_query_time,
)
from zipline.pipeline.term import NotSpecified
from zipline.pipeline.sentinels import NotSpecified
from zipline.lib.adjusted_array import AdjustedArray, can_represent_dtype
from zipline.lib.adjustment import Float64Overwrite
from zipline.utils.input_validation import (
+33 -14
View File
@@ -1,12 +1,16 @@
"""
Mixins classes for use with Filters and Factors.
"""
from numpy import full, recarray
from numpy import (
array,
full,
recarray,
)
from zipline.utils.control_flow import nullctx
from zipline.errors import WindowLengthNotPositive, UnsupportedDataType
from .term import NotSpecified
from .sentinels import NotSpecified
class PositiveWindowLengthMixin(object):
@@ -148,29 +152,44 @@ class CustomTermMixin(object):
out = full(shape, missing_value, dtype=self.dtype)
return out
def _format_inputs(self, windows, column_mask):
inputs = []
for input_ in windows:
window = next(input_)
if window.shape[1] == 1:
# Do not mask single-column inputs.
inputs.append(window)
else:
inputs.append(window[:, column_mask])
return inputs
def _compute(self, windows, dates, assets, mask):
"""
Call the user's `compute` function on each window with a pre-built
output array.
"""
format_inputs = self._format_inputs
compute = self.compute
params = self.params
out = self._allocate_output(windows, mask.shape)
ndim = self.ndim
shape = (len(mask), 1) if ndim == 1 else mask.shape
out = self._allocate_output(windows, shape)
with self.ctx:
for idx, date in enumerate(dates):
col_mask = mask[idx]
masked_out = out[idx][col_mask]
masked_assets = assets[col_mask]
# Never apply a mask to 1D outputs.
out_mask = array([True]) if ndim == 1 else mask[idx]
compute(
date,
masked_assets,
masked_out,
*(next(w)[:, col_mask] for w in windows),
**params
)
out[idx][col_mask] = masked_out
# Mask our inputs as usual.
inputs_mask = mask[idx]
masked_assets = assets[inputs_mask]
out_row = out[idx][out_mask]
inputs = format_inputs(windows, inputs_mask)
compute(date, masked_assets, out_row, *inputs, **params)
out[idx][out_mask] = out_row
return out
def short_repr(self):
+13 -1
View File
@@ -1,3 +1,5 @@
from zipline.errors import UnsupportedPipelineOutput
from zipline.utils.input_validation import expect_types, optional
from .term import AssetExists, ComputableTerm, Term
@@ -34,10 +36,12 @@ class Pipeline(object):
screen=optional(Filter),
)
def __init__(self, columns=None, screen=None):
if columns is None:
columns = {}
validate_column = self.validate_column
for column_name, term in columns.items():
validate_column(column_name, term)
if not isinstance(term, ComputableTerm):
raise TypeError(
"Column {column_name!r} contains an invalid pipeline term "
@@ -45,6 +49,7 @@ class Pipeline(object):
column_name=column_name, term=term,
)
)
self._columns = columns
self._screen = screen
@@ -80,6 +85,8 @@ class Pipeline(object):
Whether to overwrite the existing entry if we already have a column
named `name`.
"""
self.validate_column(name, term)
columns = self.columns
if name in columns:
if overwrite:
@@ -178,3 +185,8 @@ class Pipeline(object):
return g.jpeg
else:
raise ValueError("Unknown graph format %r." % format)
@staticmethod
def validate_column(column_name, term):
if term.ndim == 1:
raise UnsupportedPipelineOutput(column_name=column_name, term=term)
+10
View File
@@ -0,0 +1,10 @@
from zipline.utils.sentinel import sentinel
NotSpecified = sentinel(
'NotSpecified',
'Singleton sentinel value used for Term defaults.',
)
NotSpecifiedType = type(NotSpecified)
+87 -11
View File
@@ -5,11 +5,20 @@ from abc import ABCMeta, abstractproperty
from bisect import insort
from weakref import WeakValueDictionary
from numpy import array, dtype as dtype_class, ndarray
from numpy import (
array,
dtype as dtype_class,
ndarray,
searchsorted,
)
from six import with_metaclass
from zipline.assets import Asset
from zipline.errors import (
DTypeNotSpecified,
InvalidOutputName,
NonExistentAssetInTimeFrame,
NonSliceableTerm,
NonWindowSafeInput,
NotDType,
TermInputsNotSpecified,
@@ -26,15 +35,9 @@ from zipline.utils.numpy_utils import (
categorical_dtype,
default_missing_value_for_dtype,
)
from zipline.utils.sentinel import sentinel
NotSpecified = sentinel(
'NotSpecified',
'Singleton sentinel value used for Term defaults.',
)
NotSpecifiedType = type(NotSpecified)
from .mixins import SingleInputMixin
from .sentinels import NotSpecified
class Term(with_metaclass(ABCMeta, object)):
@@ -53,6 +56,9 @@ class Term(with_metaclass(ABCMeta, object)):
# Determines if a term is safe to be used as a windowed input.
window_safe = False
# The dimensions of the term's output (1D or 2D).
ndim = 2
_term_cache = WeakValueDictionary()
def __new__(cls,
@@ -60,6 +66,7 @@ class Term(with_metaclass(ABCMeta, object)):
dtype=dtype,
missing_value=missing_value,
window_safe=NotSpecified,
ndim=NotSpecified,
# params is explicitly not allowed to be passed to an instance.
*args,
**kwargs):
@@ -81,6 +88,8 @@ class Term(with_metaclass(ABCMeta, object)):
dtype = cls.dtype
if missing_value is NotSpecified:
missing_value = cls.missing_value
if ndim is NotSpecified:
ndim = cls.ndim
if window_safe is NotSpecified:
window_safe = cls.window_safe
@@ -96,6 +105,7 @@ class Term(with_metaclass(ABCMeta, object)):
dtype=dtype,
missing_value=missing_value,
window_safe=window_safe,
ndim=ndim,
params=params,
*args, **kwargs
)
@@ -109,6 +119,7 @@ class Term(with_metaclass(ABCMeta, object)):
dtype=dtype,
missing_value=missing_value,
window_safe=window_safe,
ndim=ndim,
params=params,
*args, **kwargs
)
@@ -179,12 +190,19 @@ class Term(with_metaclass(ABCMeta, object)):
"""
pass
@expect_types(key=Asset)
def __getitem__(self, key):
if isinstance(self, LoadableTerm):
raise NonSliceableTerm(term=self)
return Slice(self, key)
@classmethod
def _static_identity(cls,
domain,
dtype,
missing_value,
window_safe,
ndim,
params):
"""
Return the identity of the Term that would be constructed from the
@@ -197,9 +215,9 @@ class Term(with_metaclass(ABCMeta, object)):
This is a classmethod so that it can be called from Term.__new__ to
determine whether to produce a new instance.
"""
return (cls, domain, dtype, missing_value, window_safe, params)
return (cls, domain, dtype, missing_value, window_safe, ndim, params)
def _init(self, domain, dtype, missing_value, window_safe, params):
def _init(self, domain, dtype, missing_value, window_safe, ndim, params):
"""
Parameters
----------
@@ -214,6 +232,7 @@ class Term(with_metaclass(ABCMeta, object)):
self.dtype = dtype
self.missing_value = missing_value
self.window_safe = window_safe
self.ndim = ndim
for name, value in params:
if hasattr(self, name):
@@ -500,6 +519,63 @@ class ComputableTerm(Term):
)
class Slice(ComputableTerm, SingleInputMixin):
"""
Term for extracting a single column of a another term's output.
Parameters
----------
term : zipline.pipeline.term.Term
The term from which to extract a column of data.
asset : zipline.assets.Asset
The asset corresponding to the column of `term` to be extracted.
Notes
-----
Users should rarely construct instances of `Slice` directly. Instead, they
should construct instances via indexing, e.g. `MyFactor()[Asset(24)]`.
"""
def __new__(cls, term, asset):
return super(Slice, cls).__new__(
cls,
asset=asset,
inputs=[term],
window_length=0,
mask=term.mask,
dtype=term.dtype,
missing_value=term.missing_value,
window_safe=term.window_safe,
ndim=1,
)
def __repr__(self):
return "{type}({parent_term}, column={asset})".format(
type=type(self).__name__,
parent_term=type(self.inputs[0]).__name__,
asset=self._asset,
)
def _init(self, asset, *args, **kwargs):
self._asset = asset
return super(Slice, self)._init(*args, **kwargs)
@classmethod
def _static_identity(cls, asset, *args, **kwargs):
return (super(Slice, cls)._static_identity(*args, **kwargs), asset)
def _compute(self, windows, dates, assets, mask):
asset = self._asset
asset_column = searchsorted(assets.values, asset.sid)
if assets[asset_column] != asset.sid:
raise NonExistentAssetInTimeFrame(
asset=asset, start_date=dates[0], end_date=dates[-1],
)
# Return a 2D array with one column rather than a 1D array of the
# column.
return windows[0][:, [asset_column]]
def validate_dtype(termname, dtype, missing_value):
"""
Validate a `dtype` and `missing_value` passed to Term.__new__.
+5
View File
@@ -1,9 +1,12 @@
from .core import ( # noqa
AssetID,
AssetIDPlusDay,
EPOCH,
ExplodingObject,
FakeDataPortal,
FetcherDataPortal,
MockDailyBarReader,
OpenPrice,
add_security_data,
all_pairs_matching_predicate,
all_subindices,
@@ -22,6 +25,8 @@ from .core import ( # noqa
empty_asset_finder,
empty_assets_db,
empty_trading_env,
make_alternating_boolean_array,
make_cascading_boolean_array,
make_test_handler,
make_trade_data_for_asset_info,
parameter_space,
+102
View File
@@ -42,7 +42,9 @@ from zipline.data.us_equity_pricing import (
from zipline.finance.trading import TradingEnvironment
from zipline.finance.order import ORDER_STATUS
from zipline.lib.labelarray import LabelArray
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.factors import CustomFactor
from zipline.pipeline.loaders.testing import make_seeded_random_loader
from zipline.utils import security_list
from zipline.utils.input_validation import expect_dimensions
@@ -1151,6 +1153,72 @@ def create_empty_splits_mergers_frame():
)
def make_alternating_boolean_array(shape, first_value=True):
"""
Create a 2D numpy array with the given shape containing alternating values
of False, True, False, True,... along each row and each column.
Examples
--------
>>> make_alternating_boolean_array((4,4))
array([[ True, False, True, False],
[False, True, False, True],
[ True, False, True, False],
[False, True, False, True]], dtype=bool)
>>> make_alternating_boolean_array((4,3), first_value=False)
array([[False, True, False],
[ True, False, True],
[False, True, False],
[ True, False, True]], dtype=bool)
"""
if len(shape) != 2:
raise ValueError(
'Shape must be 2-dimensional. Given shape was {}'.format(shape)
)
alternating = np.empty(shape, dtype=np.bool)
for row in alternating:
row[::2] = first_value
row[1::2] = not(first_value)
first_value = not(first_value)
return alternating
def make_cascading_boolean_array(shape, first_value=True):
"""
Create a numpy array with the given shape containing cascading boolean
values, with `first_value` being the top-left value.
Examples
--------
>>> make_cascading_boolean_array((4,4))
array([[ True, True, True, False],
[ True, True, False, False],
[ True, False, False, False],
[False, False, False, False]], dtype=bool)
>>> make_cascading_boolean_array((4,2))
array([[ True, False],
[False, False],
[False, False],
[False, False]], dtype=bool)
>>> make_cascading_boolean_array((2,4))
array([[ True, True, True, False],
[ True, True, False, False]], dtype=bool)
"""
if len(shape) != 2:
raise ValueError(
'Shape must be 2-dimensional. Given shape was {}'.format(shape)
)
cascading = np.full(shape, not(first_value), dtype=np.bool)
ending_col = shape[1] - 1
for row in cascading:
if ending_col > 0:
row[:ending_col] = first_value
ending_col -= 1
else:
break
return cascading
@expect_dimensions(array=2)
def permute_rows(seed, array):
"""
@@ -1400,3 +1468,37 @@ def ensure_doctest(f, name=None):
f.__name__ if name is None else name
] = f
return f
####################################
# Shared factors for pipeline tests.
####################################
class AssetID(CustomFactor):
"""
CustomFactor that returns the AssetID of each asset.
Useful for providing a Factor that produces a different value for each
asset.
"""
window_length = 1
inputs = ()
def compute(self, today, assets, out):
out[:] = assets
class AssetIDPlusDay(CustomFactor):
window_length = 1
inputs = ()
def compute(self, today, assets, out):
out[:] = assets + today.day
class OpenPrice(CustomFactor):
window_length = 1
inputs = [USEquityPricing.open]
def compute(self, today, assets, out, open):
out[:] = open