diff --git a/docs/source/appendix.rst b/docs/source/appendix.rst index b5792fd5..6570b803 100644 --- a/docs/source/appendix.rst +++ b/docs/source/appendix.rst @@ -215,6 +215,15 @@ Pipeline API .. autoclass:: zipline.pipeline.factors.BollingerBands :members: +.. autoclass:: zipline.pipeline.factors.RollingPearsonOfReturns + :members: + +.. autoclass:: zipline.pipeline.factors.RollingSpearmanOfReturns + :members: + +.. autoclass:: zipline.pipeline.factors.RollingLinearRegressionOfReturns + :members: + .. autoclass:: zipline.pipeline.filters.Filter :members: __and__, __or__ :exclude-members: dtype diff --git a/docs/source/whatsnew/1.0.0.txt b/docs/source/whatsnew/1.0.0.txt index 4dc4cea3..2ae37f84 100644 --- a/docs/source/whatsnew/1.0.0.txt +++ b/docs/source/whatsnew/1.0.0.txt @@ -173,6 +173,11 @@ Enhancements * Fetcher has been moved from Quantopian internal code into Zipline (:issue:`1105`). +* Added new built-in factors, + :class:`~zipline.pipeline.factors.RollingPearsonOfReturns`, + :class:`~zipline.pipeline.factors.RollingSpearmanOfReturns` and + :class:`~zipline.pipeline.factors.RollingLinearRegressionOfReturns` + (:issue:`1154`). Experimental Features ~~~~~~~~~~~~~~~~~~~~~ diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index 254936ff..3cef954b 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -14,6 +14,7 @@ from numpy import ( float32, float64, full, + full_like, log, nan, tile, @@ -36,6 +37,7 @@ from pandas import ( ) from pandas.compat.chainmap import ChainMap from pandas.util.testing import assert_frame_equal +from scipy.stats.stats import linregress, pearsonr, spearmanr from six import iteritems, itervalues from toolz import merge @@ -53,6 +55,10 @@ from zipline.pipeline.factors import ( ExponentialWeightedMovingAverage, ExponentialWeightedMovingStdDev, MaxDrawdown, + Returns, + RollingLinearRegressionOfReturns, + RollingPearsonOfReturns, + RollingSpearmanOfReturns, SimpleMovingAverage, ) from 
zipline.pipeline.loaders.equity_pricing_loader import ( @@ -66,8 +72,9 @@ from zipline.pipeline.loaders.synthetic import ( ) from zipline.pipeline.term import NotSpecified from zipline.testing import ( - product_upper_triangle, check_arrays, + parameter_space, + product_upper_triangle, ) from zipline.testing.fixtures import ( WithAdjustmentReader, @@ -1242,6 +1249,188 @@ class ParameterizedFactorTestCase(WithTradingEnvironment, ZiplineTestCase): expected_5 = rolling_mean((self.raw_data ** 2) * 2, window=5)[5:] assert_frame_equal(results['dv5'].unstack(), expected_5) + @parameter_space(returns_length=[2, 3], correlation_length=[3, 4]) + def test_correlation_factors(self, returns_length, correlation_length): + """ + Tests for the built-in factors `RollingPearsonOfReturns` and + `RollingSpearmanOfReturns`. + """ + my_asset_column = 0 + start_date_index = 6 + end_date_index = 10 + + assets = self.asset_finder.retrieve_all(self.sids) + my_asset = assets[my_asset_column] + my_asset_filter = (AssetID() != (my_asset_column + 1)) + num_days = end_date_index - start_date_index + 1 + + # Our correlation factors require that their target asset is not + # filtered out, so make sure that masking out our target asset does not + # take effect. That is, a filter which filters out only our target + # asset should produce the same result as if no mask was passed at all. 
+ for mask in (NotSpecified, my_asset_filter): + pearson_factor = RollingPearsonOfReturns( + target=my_asset, + returns_length=returns_length, + correlation_length=correlation_length, + mask=mask, + ) + spearman_factor = RollingSpearmanOfReturns( + target=my_asset, + returns_length=returns_length, + correlation_length=correlation_length, + mask=mask, + ) + + results = self.engine.run_pipeline( + Pipeline( + columns={ + 'pearson_factor': pearson_factor, + 'spearman_factor': spearman_factor, + }, + ), + self.dates[start_date_index], + self.dates[end_date_index], + ) + pearson_results = results['pearson_factor'].unstack() + spearman_results = results['spearman_factor'].unstack() + + # Run a separate pipeline that calculates returns starting + # (correlation_length - 1) days prior to our start date. This is + # because we need (correlation_length - 1) extra days of returns to + # compute our expected correlations. + returns = Returns(window_length=returns_length) + results = self.engine.run_pipeline( + Pipeline(columns={'returns': returns}), + self.dates[start_date_index - (correlation_length - 1)], + self.dates[end_date_index], + ) + returns_results = results['returns'].unstack() + + # On each day, calculate the expected correlation coefficients + # between the asset we are interested in and each other asset. Each + # correlation is calculated over `correlation_length` days. 
+ expected_pearson_results = full_like(pearson_results, nan) + expected_spearman_results = full_like(spearman_results, nan) + for day in range(num_days): + todays_returns = returns_results.iloc[ + day:day + correlation_length + ] + my_asset_returns = todays_returns.iloc[:, my_asset_column] + for asset, other_asset_returns in todays_returns.iteritems(): + asset_column = int(asset) - 1 + expected_pearson_results[day, asset_column] = pearsonr( + my_asset_returns, other_asset_returns, + )[0] + expected_spearman_results[day, asset_column] = spearmanr( + my_asset_returns, other_asset_returns, + )[0] + + assert_frame_equal( + pearson_results, + DataFrame( + expected_pearson_results, + index=self.dates[start_date_index:end_date_index + 1], + columns=assets, + ), + ) + assert_frame_equal( + spearman_results, + DataFrame( + expected_spearman_results, + index=self.dates[start_date_index:end_date_index + 1], + columns=assets, + ), + ) + + @parameter_space(returns_length=[2, 3], regression_length=[3, 4]) + def test_regression_of_returns_factor(self, + returns_length, + regression_length): + """ + Tests for the built-in factor `RollingLinearRegressionOfReturns`. + """ + my_asset_column = 0 + start_date_index = 6 + end_date_index = 10 + + assets = self.asset_finder.retrieve_all(self.sids) + my_asset = assets[my_asset_column] + my_asset_filter = (AssetID() != (my_asset_column + 1)) + num_days = end_date_index - start_date_index + 1 + + # The order of these is meant to align with the output of `linregress`. + outputs = ['beta', 'alpha', 'r_value', 'p_value', 'stderr'] + + # Our regression factor requires that its target asset is not filtered + # out, so make sure that masking out our target asset does not take + # effect. That is, a filter which filters out only our target asset + # should produce the same result as if no mask was passed at all. 
+ for mask in (NotSpecified, my_asset_filter): + regression_factor = RollingLinearRegressionOfReturns( + target=my_asset, + returns_length=returns_length, + regression_length=regression_length, + mask=mask, + ) + results = self.engine.run_pipeline( + Pipeline( + columns={ + output: getattr(regression_factor, output) + for output in outputs + }, + ), + self.dates[start_date_index], + self.dates[end_date_index], + ) + output_results = {} + expected_output_results = {} + for output in outputs: + output_results[output] = results[output].unstack() + expected_output_results[output] = full_like( + output_results[output], nan, + ) + + # Run a separate pipeline that calculates returns starting + # (regression_length - 1) days prior to our start date. This is + # because we need (regression_length - 1) extra days of returns to + # compute our expected regressions. + returns = Returns(window_length=returns_length) + results = self.engine.run_pipeline( + Pipeline(columns={'returns': returns}), + self.dates[start_date_index - (regression_length - 1)], + self.dates[end_date_index], + ) + returns_results = results['returns'].unstack() + + # On each day, calculate the expected regression results for Y ~ X + # where X is the asset we are interested in and Y is each other + # asset. Each regression is calculated over `regression_length` + # days of data. 
+ for day in range(num_days): + todays_returns = returns_results.iloc[ + day:day + regression_length + ] + my_asset_returns = todays_returns.iloc[:, my_asset_column] + for asset, other_asset_returns in todays_returns.iteritems(): + asset_column = int(asset) - 1 + expected_regression_results = linregress( + y=other_asset_returns, x=my_asset_returns, + ) + for i, output in enumerate(outputs): + expected_output_results[output][day, asset_column] = \ + expected_regression_results[i] + + for output in outputs: + assert_frame_equal( + output_results[output], + DataFrame( + expected_output_results[output], + index=self.dates[start_date_index:end_date_index + 1], + columns=assets, + ), + ) + class StringColumnTestCase(WithSeededRandomPipelineEngine, ZiplineTestCase): diff --git a/tests/pipeline/test_term.py b/tests/pipeline/test_term.py index 062bd9e4..1a02fbac 100644 --- a/tests/pipeline/test_term.py +++ b/tests/pipeline/test_term.py @@ -7,7 +7,7 @@ from unittest import TestCase from zipline.errors import ( DTypeNotSpecified, - WindowedInputToWindowedTerm, + NonWindowSafeInput, NotDType, TermInputsNotSpecified, TermOutputsEmpty, @@ -198,7 +198,7 @@ class DependencyResolutionTestCase(TestCase): def test_disallow_recursive_lookback(self): - with self.assertRaises(WindowedInputToWindowedTerm): + with self.assertRaises(NonWindowSafeInput): SomeFactor(inputs=[SomeFactor(), SomeDataSet.foo]) diff --git a/zipline/errors.py b/zipline/errors.py index 675c3eac..44b88c03 100644 --- a/zipline/errors.py +++ b/zipline/errors.py @@ -418,10 +418,10 @@ class WindowLengthNotPositive(ZiplineError): ).strip() -class WindowedInputToWindowedTerm(ZiplineError): +class NonWindowSafeInput(ZiplineError): """ - Raised when a windowed Pipeline API term is specified as an input to - another windowed term. + Raised when a Pipeline API term that is not deemed window safe is specified + as an input to another windowed term. 
This is an error because it's generally not safe to compose windowed functions on split/dividend adjusted data. @@ -617,3 +617,10 @@ class HistoryWindowStartsBeforeData(ZiplineError): "History window extends before {first_trading_day}. To use this " "history window, start the backtest on or after {suggested_start_day}." ) + + +class NonExistentAssetInTimeFrame(ZiplineError): + msg = ( + "The target asset '{asset}' does not exist for the entire timeframe " + "between {start_date} and {end_date}." + ) diff --git a/zipline/lib/adjusted_array.py b/zipline/lib/adjusted_array.py index 40c6803e..44645acd 100644 --- a/zipline/lib/adjusted_array.py +++ b/zipline/lib/adjusted_array.py @@ -243,6 +243,20 @@ class AdjustedArray(object): ) +def ensure_adjusted_array(ndarray_or_adjusted_array, missing_value): + if isinstance(ndarray_or_adjusted_array, AdjustedArray): + return ndarray_or_adjusted_array + elif isinstance(ndarray_or_adjusted_array, ndarray): + return AdjustedArray( + ndarray_or_adjusted_array, NOMASK, {}, missing_value, + ) + else: + raise TypeError( + "Can't convert %s to AdjustedArray" % + type(ndarray_or_adjusted_array).__name__ + ) + + def ensure_ndarray(ndarray_or_adjusted_array): """ Return the input as a numpy ndarray. 
diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index dc867ae2..0fd19cfe 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -113,6 +113,7 @@ class BoundColumn(LoadableTerm): """ mask = AssetExists() inputs = () + window_safe = True def __new__(cls, dtype, missing_value, dataset, name): return super(BoundColumn, cls).__new__( diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index cecb9caf..c0b87887 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -20,7 +20,7 @@ from pandas import ( from toolz import groupby, juxt from toolz.curried.operator import getitem -from zipline.lib.adjusted_array import ensure_ndarray +from zipline.lib.adjusted_array import ensure_adjusted_array, ensure_ndarray from zipline.errors import NoFurtherDataError from zipline.utils.numpy_utils import repeat_first_axis, repeat_last_axis from zipline.utils.pandas_utils import explode @@ -265,28 +265,31 @@ class SimplePipelineEngine(object): that input. """ offsets = graph.offset + out = [] if term.windowed: # If term is windowed, then all input data should be instances of # AdjustedArray. - return [ - workspace[input_].traverse( - window_length=term.window_length, - offset=offsets[term, input_] + for input_ in term.inputs: + adjusted_array = ensure_adjusted_array( + workspace[input_], input_.missing_value, ) - for input_ in term.inputs - ] - - # If term is not windowed, input_data may be an AdjustedArray or - # np.ndarray. Coerce the former to the latter. - out = [] - for input_ in term.inputs: - input_data = ensure_ndarray(workspace[input_]) - offset = offsets[term, input_] - # OPTIMIZATION: Don't make a copy by doing input_data[0:] if - # offset is zero. 
- if offset: - input_data = input_data[offset:] - out.append(input_data) + out.append( + adjusted_array.traverse( + window_length=term.window_length, + offset=offsets[term, input_], + ) + ) + else: + # If term is not windowed, input_data may be an AdjustedArray or + # np.ndarray. Coerce the former to the latter. + for input_ in term.inputs: + input_data = ensure_ndarray(workspace[input_]) + offset = offsets[term, input_] + # OPTIMIZATION: Don't make a copy by doing input_data[0:] if + # offset is zero. + if offset: + input_data = input_data[offset:] + out.append(input_data) return out def get_loader(self, term): diff --git a/zipline/pipeline/factors/__init__.py b/zipline/pipeline/factors/__init__.py index d59de274..2e6dbdd3 100644 --- a/zipline/pipeline/factors/__init__.py +++ b/zipline/pipeline/factors/__init__.py @@ -8,10 +8,10 @@ from .events import ( BusinessDaysSince13DFilingsDate, BusinessDaysSinceBuybackAuth, BusinessDaysSinceDividendAnnouncement, - BusinessDaysUntilNextExDate, + BusinessDaysSincePreviousEarnings, BusinessDaysSincePreviousExDate, BusinessDaysUntilNextEarnings, - BusinessDaysSincePreviousEarnings, + BusinessDaysUntilNextExDate, ) from .technical import ( AverageDollarVolume, @@ -21,24 +21,27 @@ from .technical import ( ExponentialWeightedMovingAverage, ExponentialWeightedMovingStdDev, MaxDrawdown, - RSI, Returns, + RollingLinearRegressionOfReturns, + RollingPearsonOfReturns, + RollingSpearmanOfReturns, + RSI, SimpleMovingAverage, VWAP, WeightedAverageValue, ) __all__ = [ + 'AverageDollarVolume', 'BollingerBands', 'BusinessDaysSince13DFilingsDate', 'BusinessDaysSinceBuybackAuth', 'BusinessDaysSinceDividendAnnouncement', - 'BusinessDaysUntilNextExDate', + 'BusinessDaysSincePreviousEarnings', 'BusinessDaysSincePreviousExDate', 'BusinessDaysUntilNextEarnings', - 'BusinessDaysSincePreviousEarnings', + 'BusinessDaysUntilNextExDate', 'CustomFactor', - 'AverageDollarVolume', 'EWMA', 'EWMSTD', 'ExponentialWeightedMovingAverage', @@ -46,9 +49,12 @@ 
__all__ = [ 'Factor', 'Latest', 'MaxDrawdown', - 'RSI', 'RecarrayField', 'Returns', + 'RollingLinearRegressionOfReturns', + 'RollingPearsonOfReturns', + 'RollingSpearmanOfReturns', + 'RSI', 'SimpleMovingAverage', 'VWAP', 'WeightedAverageValue', diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index 6a0b0b08..cbb39e5a 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -577,6 +577,7 @@ class Factor(RestrictedDTypeMixin, ComputableTerm): factor=self, mask=mask, groupby=groupby, + window_safe=True, ) def rank(self, method='ordinal', ascending=True, mask=NotSpecified): @@ -908,7 +909,7 @@ class GroupedRowTransform(Factor): """ window_length = 0 - def __new__(cls, transform, factor, mask, groupby): + def __new__(cls, transform, factor, mask, groupby, **kwargs): if mask is NotSpecified: mask = factor.mask @@ -925,6 +926,7 @@ class GroupedRowTransform(Factor): missing_value=factor.missing_value, mask=mask, dtype=factor.dtype, + **kwargs ) def _init(self, transform, *args, **kwargs): @@ -1001,6 +1003,7 @@ class Rank(SingleInputMixin, Factor): """ window_length = 0 dtype = float64_dtype + window_safe = True def __new__(cls, factor, method, ascending, mask): return super(Rank, cls).__new__( diff --git a/zipline/pipeline/factors/technical.py b/zipline/pipeline/factors/technical.py index 74b02c2b..008f2627 100644 --- a/zipline/pipeline/factors/technical.py +++ b/zipline/pipeline/factors/technical.py @@ -8,6 +8,7 @@ from numpy import ( arange, average, clip, + corrcoef, diff, exp, fmax, @@ -16,13 +17,17 @@ from numpy import ( isnan, log, NINF, + searchsorted, sqrt, sum as np_sum, ) from numexpr import evaluate +from scipy.stats import linregress, spearmanr from zipline.pipeline.data import USEquityPricing +from zipline.pipeline.filters import SingleAsset from zipline.pipeline.mixins import SingleInputMixin +from zipline.pipeline.term import NotSpecified from zipline.utils.numpy_utils import 
ignore_nanwarnings from zipline.utils.input_validation import expect_types from zipline.utils.math_utils import ( @@ -42,6 +47,16 @@ class Returns(CustomFactor): **Default Inputs**: [USEquityPricing.close] """ inputs = [USEquityPricing.close] + window_safe = True + + def _validate(self): + super(Returns, self)._validate() + if self.window_length < 2: + raise ValueError( + "'Returns' expected a window length of at least 2, but was " + "given {window_length}. For daily returns, use a window " + "length of 2.".format(window_length=self.window_length) + ) def compute(self, today, assets, out, close): out[:] = (close[-1] - close[0]) / close[0] @@ -145,6 +160,260 @@ class AverageDollarVolume(CustomFactor): out[:] = nanmean(close * volume, axis=0) +class _RollingCorrelationOfReturns(CustomFactor, SingleInputMixin): + """ + Base class for factors computing a rolling correlation over a window of + Returns. + + Parameters + ---------- + target : zipline.assets.Asset + The asset to correlate with all other assets. + returns_length : int >= 2 + Length of the lookback window over which to compute returns. Daily + returns require a window length of 2. + correlation_length : int >= 1 + Length of the lookback window over which to compute each correlation + coefficient. + """ + params = ['target'] + + def __new__(cls, + target, + returns_length, + correlation_length, + mask=NotSpecified, + **kwargs): + if mask is not NotSpecified: + # Make sure we do not filter out the asset of interest. + mask = mask | SingleAsset(asset=target) + return super(_RollingCorrelationOfReturns, cls).__new__( + cls, + target=target, + inputs=[Returns(window_length=returns_length)], + window_length=correlation_length, + mask=mask, + **kwargs + ) + + +class RollingPearsonOfReturns(_RollingCorrelationOfReturns): + """ + Calculates the Pearson product-moment correlation coefficient of the + returns of the given asset with the returns of all other assets. 
+ + Pearson correlation is what most people mean when they say "correlation + coefficient" or "R-value". + + Parameters + ---------- + target : zipline.assets.Asset + The asset to correlate with all other assets. + returns_length : int >= 2 + Length of the lookback window over which to compute returns. Daily + returns require a window length of 2. + correlation_length : int >= 1 + Length of the lookback window over which to compute each correlation + coefficient. + + Example + ------- + Let the following be example 10-day returns for three different assets:: + + SPY MSFT FB + 2017-03-13 -.03 .03 .04 + 2017-03-14 -.02 -.03 .02 + 2017-03-15 -.01 .02 .01 + 2017-03-16 0 -.02 .01 + 2017-03-17 .01 .04 -.01 + 2017-03-20 .02 -.03 -.02 + 2017-03-21 .03 .01 -.02 + 2017-03-22 .04 -.02 -.02 + + Suppose we are interested in SPY's rolling returns correlation with each + stock from 2017-03-17 to 2017-03-22, using a 5-day look back window (that + is, we calculate each correlation coefficient over 5 days of data). We can + achieve this by doing:: + + rolling_correlations = RollingPearsonOfReturns( + target=Equity(8554), + returns_length=10, + correlation_length=5, + ) + + The result of computing ``rolling_correlations`` from 2017-03-17 to + 2017-03-22 gives:: + + SPY MSFT FB + 2017-03-17 1 .15 -.96 + 2017-03-20 1 .10 -.96 + 2017-03-21 1 -.16 -.94 + 2017-03-22 1 -.16 -.85 + + Note that the column for SPY is all 1's, as the correlation of any data + series with itself is always 1. To understand how each of the other values + were calculated, take for example the .15 in MSFT's column. This is the + correlation coefficient between SPY's returns looking back from 2017-03-17 + (-.03, -.02, -.01, 0, .01) and MSFT's returns (.03, -.03, .02, -.02, .04). 
+ + See Also + -------- + :class:`zipline.pipeline.factors.technical.RollingSpearmanOfReturns` + :class:`zipline.pipeline.factors.technical.RollingLinearRegressionOfReturns` + """ + def compute(self, today, assets, out, data, target): + asset_col = searchsorted(assets.values, target.sid) + out[:] = corrcoef(data, rowvar=0)[asset_col] + + +class RollingSpearmanOfReturns(_RollingCorrelationOfReturns): + """ + Calculates the Spearman rank correlation coefficient of the returns of the + given asset with the returns of all other assets. + + Parameters + ---------- + target : zipline.assets.Asset + The asset to correlate with all other assets. + returns_length : int >= 2 + Length of the lookback window over which to compute returns. Daily + returns require a window length of 2. + correlation_length : int >= 1 + Length of the lookback window over which to compute each correlation + coefficient. + + See Also + -------- + :class:`zipline.pipeline.factors.technical.RollingPearsonOfReturns` + :class:`zipline.pipeline.factors.technical.RollingLinearRegressionOfReturns` + """ + def compute(self, today, assets, out, data, target): + asset_col = searchsorted(assets.values, target.sid) + out[:] = spearmanr(data)[0][asset_col] + + +class RollingLinearRegressionOfReturns(CustomFactor, SingleInputMixin): + """ + Perform an ordinary least-squares regression predicting the returns of all + other assets on the given asset. + + Parameters + ---------- + target : zipline.assets.Asset + The asset to regress against all other assets. + returns_length : int >= 2 + Length of the lookback window over which to compute returns. Daily + returns require a window length of 2. + regression_length : int >= 1 + Length of the lookback window over which to compute each regression. + + Note + ---- + This factor is designed to return five outputs: + - alpha, a factor that computes the intercepts of each regression. + - beta, a factor that computes the slopes of each regression. 
+ - r_value, a factor that computes the correlation coefficient of each + regression. + - p_value, a factor that computes, for each regression, the two-sided + p-value for a hypothesis test whose null hypothesis is that the slope + is zero. + - stderr, a factor that computes the standard error of the estimated + slope of each regression. + + Example + ------- + Let the following be example 10-day returns for three different assets:: + + SPY MSFT FB + 2017-03-13 -.03 .03 .04 + 2017-03-14 -.02 -.03 .02 + 2017-03-15 -.01 .02 .01 + 2017-03-16 0 -.02 .01 + 2017-03-17 .01 .04 -.01 + 2017-03-20 .02 -.03 -.02 + 2017-03-21 .03 .01 -.02 + 2017-03-22 .04 -.02 -.02 + + Suppose we are interested in predicting each stock's returns from SPY's + over rolling 5-day look back windows. We can compute rolling regression + coefficients (alpha and beta) from 2017-03-17 to 2017-03-22 by doing:: + + regression_factor = RollingLinearRegressionOfReturns( + target=Equity(8554), + returns_length=10, + regression_length=5, + ) + alpha = regression_factor.alpha + beta = regression_factor.beta + + The result of computing ``alpha`` from 2017-03-17 to 2017-03-22 gives:: + + SPY MSFT FB + 2017-03-17 0 .011 .003 + 2017-03-20 0 -.004 .004 + 2017-03-21 0 .007 .006 + 2017-03-22 0 .002 .008 + + And the result of computing ``beta`` from 2017-03-17 to 2017-03-22 gives:: + + SPY MSFT FB + 2017-03-17 1 .3 -1.1 + 2017-03-20 1 .2 -1 + 2017-03-21 1 -.3 -1 + 2017-03-22 1 -.3 -.9 + + Note that SPY's column for alpha is all 0's and for beta is all 1's, as the + regression line of SPY with itself is simply the function y = x. + + To understand how each of the other values was calculated, take for + example MSFT's ``alpha`` and ``beta`` values on 2017-03-17 (.011 and .3, + respectively). These values are the result of running a linear regression + predicting MSFT's returns from SPY's returns, using values starting at + 2017-03-17 and looking back 5 days. 
That is, the regression was run with + x = [-.03, -.02, -.01, 0, .01] and y = [.03, -.03, .02, -.02, .04], and it + produced a slope of .3 and an intercept of .011. + + See Also + -------- + :class:`zipline.pipeline.factors.technical.RollingPearsonOfReturns` + :class:`zipline.pipeline.factors.technical.RollingSpearmanOfReturns` + """ + outputs = ['alpha', 'beta', 'r_value', 'p_value', 'stderr'] + params = ['target'] + + def __new__(cls, + target, + returns_length, + regression_length, + mask=NotSpecified, + **kwargs): + if mask is not NotSpecified: + # Make sure we do not filter out the asset of interest. + mask = mask | SingleAsset(asset=target) + return super(RollingLinearRegressionOfReturns, cls).__new__( + cls, + target=target, + inputs=[Returns(window_length=returns_length)], + window_length=regression_length, + mask=mask, + **kwargs + ) + + def compute(self, today, assets, out, returns, target): + asset_col = searchsorted(assets.values, target.sid) + my_asset = returns[:, asset_col] + for i in range(len(out)): + other_asset = returns[:, i] + regr_results = linregress(y=other_asset, x=my_asset) + # `linregress` returns its results in the following order: + # slope, intercept, r-value, p-value, stderr + out.alpha[i] = regr_results[1] + out.beta[i] = regr_results[0] + out.r_value[i] = regr_results[2] + out.p_value[i] = regr_results[3] + out.stderr[i] = regr_results[4] + + class _ExponentialWeightedFactor(SingleInputMixin, CustomFactor): """ Base class for factors implementing exponential-weighted operations. 
diff --git a/zipline/pipeline/filters/__init__.py b/zipline/pipeline/filters/__init__.py index be7a63a3..c3589d99 100644 --- a/zipline/pipeline/filters/__init__.py +++ b/zipline/pipeline/filters/__init__.py @@ -6,6 +6,7 @@ from .filter import ( NullFilter, NumExprFilter, PercentileFilter, + SingleAsset, ) __all__ = [ @@ -16,4 +17,5 @@ __all__ = [ 'NullFilter', 'NumExprFilter', 'PercentileFilter', + 'SingleAsset', ] diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py index 053b794e..d39a9d67 100644 --- a/zipline/pipeline/filters/filter.py +++ b/zipline/pipeline/filters/filter.py @@ -12,6 +12,7 @@ from numpy import ( ) from zipline.errors import ( BadPercentileBounds, + NonExistentAssetInTimeFrame, UnsupportedDataType, ) from zipline.lib.labelarray import LabelArray @@ -31,7 +32,7 @@ from zipline.pipeline.expression import ( NumericalExpression, ) from zipline.utils.input_validation import expect_types -from zipline.utils.numpy_utils import bool_dtype +from zipline.utils.numpy_utils import bool_dtype, repeat_first_axis def concat_tuples(*tuples): @@ -427,3 +428,35 @@ class Latest(LatestMixin, CustomFilter): Filter producing the most recently-known value of `inputs[0]` on each day. """ pass + + +class SingleAsset(Filter): + """ + A Filter that computes to True only for the given asset. 
+ """ + inputs = [] + window_length = 1 + + def __new__(cls, asset): + return super(SingleAsset, cls).__new__(cls, asset=asset) + + def _init(self, asset, *args, **kwargs): + self._asset = asset + return super(SingleAsset, self)._init(*args, **kwargs) + + @classmethod + def static_identity(cls, asset, *args, **kwargs): + return ( + super(SingleAsset, cls).static_identity(*args, **kwargs), asset, + ) + + def _compute(self, arrays, dates, assets, mask): + is_my_asset = (assets == self._asset.sid) + out = repeat_first_axis(is_my_asset, len(mask)) + # Raise an exception if `self._asset` does not exist for the entirety + # of the timeframe over which we are computing. + if (is_my_asset.sum() != 1) or ((out & mask).sum() != len(mask)): + raise NonExistentAssetInTimeFrame( + asset=self._asset, start_date=dates[0], end_date=dates[-1], + ) + return out diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 529e2459..510776ad 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -62,30 +62,49 @@ class TermGraph(DiGraph): def offset(self): """ For all pairs (term, input) such that `input` is an input to `term`, - compute a mapping: + compute a mapping:: (term, input) -> offset(term, input) - where `offset(term, input)` is defined as + where ``offset(term, input)`` is the number of rows that ``term`` + should truncate off the raw array produced for ``input`` before using + it. We compute this value as follows:: - Max number of extra rows needed by any term depending on `input` - minus - Number of extra rows needed by `term`. + offset(term, input) = (extra_rows_computed(input) + - extra_rows_computed(term) + - requested_extra_rows(term, input)) + Examples + -------- - Example - ------- + Case 1 + ~~~~~~ Factor A needs 5 extra rows of USEquityPricing.close, and Factor B needs 3 extra rows of the same. Factor A also requires 5 extra rows of - USEquityPricing.high, which no other Factor uses. + USEquityPricing.high, which no other Factor uses. 
We don't require any + extra rows of Factor A or Factor B. We load 5 extra rows of both `price` and `high` to ensure we can - service Factor A, and the following offsets get computed: + service Factor A, and the following offsets get computed:: - self.offset[Factor A, USEquityPricing.close] == 0 - self.offset[Factor A, USEquityPricing.high] == 0 - self.offset[Factor B, USEquityPricing.close] == 2 - self.offset[Factor B, USEquityPricing.high] raises KeyError. + offset[Factor A, USEquityPricing.close] == (5 - 0) - 5 == 0 + offset[Factor A, USEquityPricing.high] == (5 - 0) - 5 == 0 + offset[Factor B, USEquityPricing.close] == (5 - 0) - 3 == 2 + offset[Factor B, USEquityPricing.high] raises KeyError. + + Case 2 + ~~~~~~ + + Factor A needs 5 extra rows of USEquityPricing.close, and Factor B + needs 3 extra rows of Factor A, and Factor B needs 2 extra rows of + USEquityPricing.close. + + We load 8 extra rows of USEquityPricing.close (enough to compute 3 + extra rows of Factor A), and the following offsets get computed:: + + offset[Factor A, USEquityPricing.close] == (8 - 3) - 5 == 0 + offset[Factor B, USEquityPricing.close] == (8 - 0) - 2 == 6 + offset[Factor B, Factor A] == (3 - 0) - 3 == 0 Notes ----- @@ -104,9 +123,15 @@ class TermGraph(DiGraph): zipline.pipeline.engine.SimplePipelineEngine._inputs_for_term zipline.pipeline.engine.SimplePipelineEngine._mask_and_dates_for_term """ - return {(term, dep): self.extra_rows[dep] - additional_extra_rows - for term in self - for dep, additional_extra_rows in term.dependencies.items()} + extra = self.extra_rows + return { + # Another way of thinking about this is: + # How much bigger is the array for ``dep`` compared to ``term``? + # How much of that difference did I ask for? 
+ (term, dep): (extra[dep] - extra[term]) - requested_extra_rows + for term in self + for dep, requested_extra_rows in term.dependencies.items() + } @lazyval def extra_rows(self): diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 942c7182..c6091e2a 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -8,7 +8,7 @@ from numpy import array, dtype as dtype_class, ndarray from six import with_metaclass from zipline.errors import ( DTypeNotSpecified, - WindowedInputToWindowedTerm, + NonWindowSafeInput, NotDType, TermInputsNotSpecified, TermOutputsEmpty, @@ -48,12 +48,16 @@ class Term(with_metaclass(ABCMeta, object)): # no params. params = () + # Determines if a term is safe to be used as a windowed input. + window_safe = False + _term_cache = WeakValueDictionary() def __new__(cls, domain=domain, dtype=dtype, missing_value=missing_value, + window_safe=NotSpecified, # params is explicitly not allowed to be passed to an instance. *args, **kwargs): @@ -75,6 +79,8 @@ class Term(with_metaclass(ABCMeta, object)): dtype = cls.dtype if missing_value is NotSpecified: missing_value = cls.missing_value + if window_safe is NotSpecified: + window_safe = cls.window_safe dtype, missing_value = cls.validate_dtype( cls.__name__, @@ -87,6 +93,7 @@ class Term(with_metaclass(ABCMeta, object)): domain=domain, dtype=dtype, missing_value=missing_value, + window_safe=window_safe, params=params, *args, **kwargs ) @@ -99,6 +106,7 @@ class Term(with_metaclass(ABCMeta, object)): domain=domain, dtype=dtype, missing_value=missing_value, + window_safe=window_safe, params=params, *args, **kwargs ) @@ -236,7 +244,12 @@ class Term(with_metaclass(ABCMeta, object)): pass @classmethod - def static_identity(cls, domain, dtype, missing_value, params): + def static_identity(cls, + domain, + dtype, + missing_value, + window_safe, + params): """ Return the identity of the Term that would be constructed from the given arguments. 
@@ -248,9 +261,9 @@ class Term(with_metaclass(ABCMeta, object)): This is a classmethod so that it can be called from Term.__new__ to determine whether to produce a new instance. """ - return (cls, domain, dtype, missing_value, params) + return (cls, domain, dtype, missing_value, window_safe, params) - def _init(self, domain, dtype, missing_value, params): + def _init(self, domain, dtype, missing_value, window_safe, params): """ Parameters ---------- @@ -264,6 +277,7 @@ class Term(with_metaclass(ABCMeta, object)): self.domain = domain self.dtype = dtype self.missing_value = missing_value + self.window_safe = window_safe for name, value in params: if hasattr(self, name): @@ -464,8 +478,8 @@ class ComputableTerm(Term): if self.window_length: for child in self.inputs: - if child.windowed: - raise WindowedInputToWindowedTerm(parent=self, child=child) + if not child.window_safe: + raise NonWindowSafeInput(parent=self, child=child) def _compute(self, inputs, dates, assets, mask): """