From b40ebdcfce7b6e3fc18735505cc0f26a8b71bbdf Mon Sep 17 00:00:00 2001
From: Scott Sanderson
Date: Mon, 15 Aug 2016 22:58:40 -0400
Subject: [PATCH] ENH: Add support for downsampling.

Adds a new ``downsample`` method to all computable terms.

Computable terms (Filters, Factors, and Classifiers) can be downsampled to
yearly, quarterly, monthly, or weekly frequency. The result of
``term.downsample`` is a new term of the same family
(Filter/Factor/Classifier) as ``term``. The downsampled term computes by
delegating to the original term, repeatedly calling its ``compute`` method
with length-1 date ranges.

Downsampled terms take advantage of a new ``compute_extra_rows`` Term
method, which allows terms to dynamically request that additional extra
rows of themselves be computed based on the dates for which they're being
computed. This ensures, for example, that a monthly-downsampled term always
computes at the start of a month, even when a naively-calculated pipeline
window would end in the middle of the month.
---
 tests/pipeline/test_downsampling.py        | 615 +++++++++++++++++++++
 zipline/pipeline/classifiers/classifier.py |  17 +
 zipline/pipeline/engine.py                 |  12 +-
 zipline/pipeline/factors/factor.py         |  16 +
 zipline/pipeline/filters/filter.py         |  16 +
 zipline/pipeline/mixins.py                 | 221 +++++++-
 zipline/pipeline/term.py                   |  56 +-
 7 files changed, 942 insertions(+), 11 deletions(-)
 create mode 100644 tests/pipeline/test_downsampling.py

diff --git a/tests/pipeline/test_downsampling.py b/tests/pipeline/test_downsampling.py
new file mode 100644
index 00000000..47f35eee
--- /dev/null
+++ b/tests/pipeline/test_downsampling.py
@@ -0,0 +1,615 @@
+"""
+Tests for Downsampled Filters/Factors/Classifiers
+"""
+import pandas as pd
+from pandas.util.testing import assert_frame_equal
+
+from zipline.pipeline import (
+    Pipeline,
+    CustomFactor,
+    CustomFilter,
+    CustomClassifier,
+)
+from zipline.pipeline.data.testing import TestingDataSet
+from zipline.pipeline.factors import SimpleMovingAverage
+from zipline.testing import ZiplineTestCase, parameter_space
+from zipline.testing.fixtures import (
+    WithTradingSessions,
+    WithSeededRandomPipelineEngine,
+)
+
+
+class NDaysAgoFactor(CustomFactor):
+    inputs = [TestingDataSet.float_col]
+
+    def compute(self, today, assets, out, floats):
+        out[:] = floats[0]
+
+
+class NDaysAgoFilter(CustomFilter):
+    inputs = [TestingDataSet.bool_col]
+
+    def compute(self, today, assets, out, bools):
+        out[:] = bools[0]
+
+
+class NDaysAgoClassifier(CustomClassifier):
+    inputs = [TestingDataSet.categorical_col]
+    dtype = TestingDataSet.categorical_col.dtype
+
+    def compute(self, today, assets, out, cats):
+        out[:] = cats[0]
+
+
+class ComputeExtraRowsTestcase(WithTradingSessions, ZiplineTestCase):
+
+    DATA_MIN_DAY = pd.Timestamp('2012-06', tz='UTC')
+    DATA_MAX_DAY = pd.Timestamp('2015', tz='UTC')
+    TRADING_CALENDAR_STRS = ('NYSE',)
+
+    # Test with different window_lengths to ensure that window length is not
+    # used when calculating extra rows for the top-level term.
+    factor1 = TestingDataSet.float_col.latest
+    factor11 = NDaysAgoFactor(window_length=11)
+    factor91 = NDaysAgoFactor(window_length=91)
+
+    filter1 = TestingDataSet.bool_col.latest
+    filter11 = NDaysAgoFilter(window_length=11)
+    filter91 = NDaysAgoFilter(window_length=91)
+
+    classifier1 = TestingDataSet.categorical_col.latest
+    classifier11 = NDaysAgoClassifier(window_length=11)
+    classifier91 = NDaysAgoClassifier(window_length=91)
+
+    all_terms = [
+        factor1,
+        factor11,
+        factor91,
+        filter1,
+        filter11,
+        filter91,
+        classifier1,
+        classifier11,
+        classifier91,
+    ]
+
+    @parameter_space(
+        calendar_name=TRADING_CALENDAR_STRS,
+        base_terms=[
+            (factor1, factor11, factor91),
+            (filter1, filter11, filter91),
+            (classifier1, classifier11, classifier91),
+        ],
+        __fail_fast=True
+    )
+    def test_yearly(self, base_terms, calendar_name):
+        downsampled_terms = tuple(t.downsample('Y') for t in base_terms)
+        all_terms = base_terms + downsampled_terms
+
+        all_sessions = self.trading_sessions[calendar_name]
+        end_session = all_sessions[-1]
+
+        years = all_sessions.year
+        sessions_in_2012 = all_sessions[years == 2012]
+        sessions_in_2013 = all_sessions[years == 2013]
+        sessions_in_2014 = all_sessions[years == 2014]
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the first date in 2014. We shouldn't request any
+        # additional rows for the regular terms or the downsampled terms.
+        for i in range(0, 30, 5):
+            start_session = sessions_in_2014[i]
+            self.check_extra_row_calculations(
+                all_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land on the second date in 2014. We should request one more extra
+        # row in the downsampled terms to push us back to the first date in
+        # 2014.
+        for i in range(0, 30, 5):
+            start_session = sessions_in_2014[i + 1]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i + 1,
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land on the last date of 2013. The downsampled terms should request
+        # enough extra rows to push us back to the start of 2013.
+        for i in range(0, 30, 5):
+            start_session = sessions_in_2014[i]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + len(sessions_in_2013),
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + 1,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land on the last date of 2012. The downsampled terms should request
+        # enough extra rows to push us back to the first known date, which is
+        # in the middle of 2012
+        for i in range(0, 30, 5):
+            start_session = sessions_in_2013[i]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + len(sessions_in_2012),
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + 1,
+            )
+
+    @parameter_space(
+        calendar_name=TRADING_CALENDAR_STRS,
+        base_terms=[
+            (factor1, factor11, factor91),
+            (filter1, filter11, filter91),
+            (classifier1, classifier11, classifier91),
+        ],
+        __fail_fast=True
+    )
+    def test_quarterly(self, calendar_name, base_terms):
+        downsampled_terms = tuple(t.downsample('Q') for t in base_terms)
+        all_terms = base_terms + downsampled_terms
+
+        # This region intersects with Q4 2013, Q1 2014, and Q2 2014.
+        tmp = self.trading_sessions[calendar_name]
+        all_sessions = tmp[tmp.slice_indexer('2013-12-15', '2014-04-30')]
+        end_session = all_sessions[-1]
+
+        months = all_sessions.month
+        Q4_2013 = all_sessions[months == 12]
+        Q1_2014 = all_sessions[(months == 1) | (months == 2) | (months == 3)]
+        Q2_2014 = all_sessions[months == 4]
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the first date in Q2 2014. We shouldn't request any
+        # additional rows for the regular terms or the downsampled terms.
+        for i in range(0, 15, 5):
+            start_session = Q2_2014[i]
+            self.check_extra_row_calculations(
+                all_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the second date in Q2 2014.
+        # The downsampled terms should request one more extra row.
+        for i in range(0, 15, 5):
+            start_session = Q2_2014[i + 1]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i + 1,
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the last date in Q1 2014. The downsampled terms
+        # should request enough extra rows to push us back to the first date of
+        # Q1 2014.
+        for i in range(0, 15, 5):
+            start_session = Q2_2014[i]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + len(Q1_2014),
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + 1,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the last date in Q4 2013. The downsampled terms
+        # should request enough extra rows to push us back to the first known
+        # date, which is in the middle of december 2013.
+        for i in range(0, 15, 5):
+            start_session = Q1_2014[i]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + len(Q4_2013),
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + 1,
+            )
+
+    @parameter_space(
+        calendar_name=TRADING_CALENDAR_STRS,
+        base_terms=[
+            (factor1, factor11, factor91),
+            (filter1, filter11, filter91),
+            (classifier1, classifier11, classifier91),
+        ],
+        __fail_fast=True
+    )
+    def test_monthly(self, calendar_name, base_terms):
+        downsampled_terms = tuple(t.downsample('M') for t in base_terms)
+        all_terms = base_terms + downsampled_terms
+
+        # This region intersects with Dec 2013, Jan 2014, and Feb 2014.
+        tmp = self.trading_sessions[calendar_name]
+        all_sessions = tmp[tmp.slice_indexer('2013-12-15', '2014-02-28')]
+        end_session = all_sessions[-1]
+
+        months = all_sessions.month
+        dec2013 = all_sessions[months == 12]
+        jan2014 = all_sessions[months == 1]
+        feb2014 = all_sessions[months == 2]
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the first date in feb 2014. We shouldn't request any
+        # additional rows for the regular terms or the downsampled terms.
+        for i in range(0, 10, 2):
+            start_session = feb2014[i]
+            self.check_extra_row_calculations(
+                all_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land on the second date in feb 2014. We should request one more
+        # extra row in the downsampled terms to push us back to the first date
+        # in feb 2014.
+        for i in range(0, 10, 2):
+            start_session = feb2014[i + 1]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i + 1,
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land on the last date of jan 2014. The downsampled terms should
+        # request enough extra rows to push us back to the start of jan 2014.
+        for i in range(0, 10, 2):
+            start_session = feb2014[i]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + len(jan2014),
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + 1,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land on the last date of dec 2013. The downsampled terms should
+        # request enough extra rows to push us back to the first known date,
+        # which is in the middle of december 2013.
+        for i in range(0, 10, 2):
+            start_session = jan2014[i]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + len(dec2013),
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + 1,
+            )
+
+    @parameter_space(
+        calendar_name=TRADING_CALENDAR_STRS,
+        base_terms=[
+            (factor1, factor11, factor91),
+            (filter1, filter11, filter91),
+            (classifier1, classifier11, classifier91),
+        ],
+        __fail_fast=True
+    )
+    def test_weekly(self, calendar_name, base_terms):
+        downsampled_terms = tuple(t.downsample('W') for t in base_terms)
+        all_terms = base_terms + downsampled_terms
+
+        #    December 2013
+        # Mo Tu We Th Fr Sa Su
+        #                    1
+        #  2  3  4  5  6  7  8
+        #  9 10 11 12 13 14 15
+        # 16 17 18 19 20 21 22
+        # 23 24 25 26 27 28 29
+        # 30 31
+
+        #    January 2014
+        # Mo Tu We Th Fr Sa Su
+        #        1  2  3  4  5
+        #  6  7  8  9 10 11 12
+        # 13 14 15 16 17 18 19
+        # 20 21 22 23 24 25 26
+        # 27 28 29 30 31
+
+        # This region intersects with the last full week of 2013, the week
+        # shared by 2013 and 2014, and the first full week of 2014.
+        tmp = self.trading_sessions[calendar_name]
+        all_sessions = tmp[tmp.slice_indexer('2013-12-27', '2014-01-12')]
+        end_session = all_sessions[-1]
+
+        week0 = all_sessions[
+            all_sessions.slice_indexer('2013-12-27', '2013-12-29')
+        ]
+        week1 = all_sessions[
+            all_sessions.slice_indexer('2013-12-30', '2014-01-05')
+        ]
+        week2 = all_sessions[
+            all_sessions.slice_indexer('2014-01-06', '2014-01-12')
+        ]
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the first date in week 2. We shouldn't request any
+        # additional rows for the regular terms or the downsampled terms.
+        for i in range(3):
+            start_session = week2[i]
+            self.check_extra_row_calculations(
+                all_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the second date in week 2. The downsampled terms
+        # should request one more extra row.
+        for i in range(3):
+            start_session = week2[i + 1]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i + 1,
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i,
+                expected_extra_rows=i,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the last date in week 1. The downsampled terms
+        # should request enough extra rows to push us back to the first date of
+        # week 1.
+        for i in range(3):
+            start_session = week2[i]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + len(week1),
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + 1,
+            )
+
+        # Simulate requesting computation where the unaltered lookback would
+        # land exactly on the last date in week0. The downsampled terms
+        # should request enough extra rows to push us back to the first known
+        # date, which is in the middle of december 2013.
+        for i in range(3):
+            start_session = week1[i]
+            self.check_extra_row_calculations(
+                downsampled_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + len(week0),
+            )
+            self.check_extra_row_calculations(
+                base_terms,
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows=i + 1,
+                expected_extra_rows=i + 1,
+            )
+
+    def check_extra_row_calculations(self,
+                                     terms,
+                                     all_sessions,
+                                     start_session,
+                                     end_session,
+                                     min_extra_rows,
+                                     expected_extra_rows):
+        """
+        Check that each term in ``terms`` computes an expected number of extra
+        rows for the given parameters.
+        """
+        for term in terms:
+            result = term.compute_extra_rows(
+                all_sessions,
+                start_session,
+                end_session,
+                min_extra_rows,
+            )
+            self.assertEqual(
+                result,
+                expected_extra_rows,
+                "Expected {} extra_rows from {}, but got {}.".format(
+                    expected_extra_rows,
+                    term,
+                    result,
+                )
+            )
+
+
+class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine,
+                                  ZiplineTestCase):
+
+    # Extend into the last few days of 2013 to test year/quarter boundaries.
+    START_DATE = pd.Timestamp('2013-12-15', tz='UTC')
+
+    # Extend into the first few days of 2015 to test year/quarter boundaries.
+    END_DATE = pd.Timestamp('2015-01-06', tz='UTC')
+
+    def test_downsample_windowed_factor(self):
+
+        f = SimpleMovingAverage(
+            inputs=[TestingDataSet.float_col],
+            window_length=5,
+        )
+
+        #      June 2014
+        # Mo Tu We Th Fr Sa Su
+        #                    1
+        #  2  3  4  5  6  7  8
+        #  9 10 11 12 13 14 15
+        # 16 17 18 19 20 21 22
+        # 23 24 25 26 27 28 29
+        # 30
+        all_sessions = self.nyse_sessions
+        compute_dates = all_sessions[
+            all_sessions.slice_indexer('2014-06-05', '2015-01-06')
+        ]
+        start_date, end_date = compute_dates[[0, -1]]
+
+        pipe = Pipeline({
+            'year': f.downsample(frequency='Y'),
+            'quarter': f.downsample(frequency='Q'),
+            'month': f.downsample(frequency='M'),
+            'week': f.downsample(frequency='W'),
+        })
+
+        # Raw values for f, computed each day from 2014 to the end of the
+        # target period.
+        raw_f_results = self.run_pipeline(
+            Pipeline({'f': f}),
+            start_date=pd.Timestamp('2014-01-02', tz='UTC'),
+            end_date=pd.Timestamp('2015-01-06', tz='UTC'),
+        )['f'].unstack()
+
+        expected_results = {
+            'year': (raw_f_results
+                     .groupby(pd.TimeGrouper('AS'))
+                     .first()
+                     .reindex(compute_dates, method='ffill')),
+            'quarter': (raw_f_results
+                        .groupby(pd.TimeGrouper('QS'))
+                        .first()
+                        .reindex(compute_dates, method='ffill')),
+            'month': (raw_f_results
+                      .groupby(pd.TimeGrouper('MS'))
+                      .first()
+                      .reindex(compute_dates, method='ffill')),
+            'week': (raw_f_results
+                     .groupby(pd.TimeGrouper('W', label='left'))
+                     .first()
+                     .reindex(compute_dates, method='ffill')),
+        }
+
+        results = self.run_pipeline(pipe, start_date, end_date)
+
+        for frequency in expected_results:
+            result = results[frequency].unstack()
+            expected = expected_results[frequency]
+            assert_frame_equal(result, expected)
diff --git a/zipline/pipeline/classifiers/classifier.py b/zipline/pipeline/classifiers/classifier.py
index 1ae66dbc..12b3df6b 100644
--- a/zipline/pipeline/classifiers/classifier.py
+++ b/zipline/pipeline/classifiers/classifier.py
@@ -23,6 +23,7 @@ from zipline.utils.numpy_utils import (
 from ..filters import ArrayPredicate, NotNullFilter, NullFilter, NumExprFilter
 from ..mixins import (
     CustomTermMixin,
+    DownsampledMixin,
     LatestMixin,
     PositiveWindowLengthMixin,
     RestrictedDTypeMixin,
@@ -301,6 +302,10 @@ class Classifier(RestrictedDTypeMixin, ComputableTerm):
             raise AssertionError("Expected a LabelArray, got %s." % type(data))
         return data.as_categorical()
 
+    @property
+    def _downsampled_type(self):
+        return DownsampledClassifier
+
 
 class Everything(Classifier):
     """
@@ -386,6 +391,18 @@ class Latest(LatestMixin, CustomClassifier):
     pass
 
 
+class DownsampledClassifier(DownsampledMixin, Classifier):
+    """
+    A Classifier that defers to another Classifier at lower-than-daily
+    frequency.
+
+    Parameters
+    ----------
+    term : zipline.pipeline.Classifier
+    frequency : {'Y', 'Q', 'M', 'W'}
+    """
+
+
 class InvalidClassifierComparison(TypeError):
     def __init__(self, classifier, compval):
         super(InvalidClassifierComparison, self).__init__(
diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py
index 616178c7..dcda762a 100644
--- a/zipline/pipeline/engine.py
+++ b/zipline/pipeline/engine.py
@@ -222,13 +222,11 @@ class SimplePipelineEngine(object):
         finder = self._finder
         start_idx, end_idx = self._calendar.slice_locs(start_date, end_date)
         if start_idx < extra_rows:
-            raise NoFurtherDataError(
-                msg="Insufficient data to compute Pipeline.\n\n"
-                    "start date was %s\n"
-                    "earliest known date was %s\n"
-                    "%d extra rows were required" % (
-                        start_date, calendar[0], extra_rows,
-                    ),
+            raise NoFurtherDataError.from_lookback_window(
+                initial_message="Insufficient data to compute Pipeline:",
+                first_date=calendar[0],
+                lookback_start=start_date,
+                lookback_length=extra_rows,
             )
 
         # Build lifetimes matrix reaching back to `extra_rows` days before
diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py
index 3410a60d..c78c074c 100644
--- a/zipline/pipeline/factors/factor.py
+++ b/zipline/pipeline/factors/factor.py
@@ -33,6 +33,7 @@ from zipline.pipeline.filters import (
 )
 from zipline.pipeline.mixins import (
     CustomTermMixin,
+    DownsampledMixin,
     LatestMixin,
     PositiveWindowLengthMixin,
     RestrictedDTypeMixin,
@@ -1071,6 +1072,10 @@ class Factor(RestrictedDTypeMixin, ComputableTerm):
         """
         return (-inf < self) & (self < inf)
 
+    @property
+    def _downsampled_type(self):
+        return DownsampledFactor
+
 
 class NumExprFactor(NumericalExpression, Factor):
     """
@@ -1510,6 +1515,17 @@ class Latest(LatestMixin, CustomFactor):
         out[:] = data[-1]
 
 
+class DownsampledFactor(DownsampledMixin, Factor):
+    """
+    A Factor that defers to another Factor at lower-than-daily frequency.
+
+    Parameters
+    ----------
+    term : zipline.pipeline.Factor
+    frequency : {'Y', 'Q', 'M', 'W'}
+    """
+
+
 # Functions to be passed to GroupedRowTransform.  These aren't defined inline
 # because the transformation function is part of the instance hash key.
 def demean(row):
diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py
index 502fd031..c600fc15 100644
--- a/zipline/pipeline/filters/filter.py
+++ b/zipline/pipeline/filters/filter.py
@@ -25,6 +25,7 @@ from zipline.pipeline.expression import (
 )
 from zipline.pipeline.mixins import (
     CustomTermMixin,
+    DownsampledMixin,
     LatestMixin,
     PositiveWindowLengthMixin,
     RestrictedDTypeMixin,
@@ -201,6 +202,10 @@ class Filter(RestrictedDTypeMixin, ComputableTerm):
             )
         return retval
 
+    @property
+    def _downsampled_type(self):
+        return DownsampledFilter
+
 
 class NumExprFilter(NumericalExpression, Filter):
     """
@@ -458,6 +463,17 @@ class Latest(LatestMixin, CustomFilter):
     pass
 
 
+class DownsampledFilter(DownsampledMixin, Filter):
+    """
+    A Filter that defers to another Filter at lower-than-daily frequency.
+
+    Parameters
+    ----------
+    term : zipline.pipeline.Filter
+    frequency : {'Y', 'Q', 'M', 'W'}
+    """
+
+
 class SingleAsset(Filter):
     """
     A Filter that computes to True only for the given asset.
diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py
index 3eb0cfd4..73b6d111 100644
--- a/zipline/pipeline/mixins.py
+++ b/zipline/pipeline/mixins.py
@@ -1,16 +1,28 @@
 """
 Mixins classes for use with Filters and Factors.
 """
+from operator import attrgetter
+
 from numpy import (
     array,
     full,
     recarray,
+    vstack,
 )
+from pandas import NaT as pd_NaT
 
+from zipline.errors import (
+    WindowLengthNotPositive,
+    UnsupportedDataType,
+    NoFurtherDataError,
+)
 from zipline.utils.control_flow import nullctx
-from zipline.errors import WindowLengthNotPositive, UnsupportedDataType
+from zipline.utils.input_validation import expect_element, expect_types
+from zipline.utils.numpy_utils import changed_locations
+from zipline.utils.pandas_utils import nearest_unequal_elements
 
 from .sentinels import NotSpecified
+from .term import Term
 
 
 class PositiveWindowLengthMixin(object):
@@ -218,3 +230,210 @@ class LatestMixin(SingleInputMixin):
                     actual=self.inputs[0].dtype,
                 )
             )
+
+
+_dt_to_period = {
+    'Y': attrgetter('year'),
+    'Q': attrgetter('quarter'),
+    'M': attrgetter('month'),
+    'W': attrgetter('week'),
+}
+
+
+def select_sampling_indices(dates, frequency):
+    """
+    Choose entries from ``dates`` to use for downsampling at ``frequency``.
+
+    Parameters
+    ----------
+    dates : pd.DatetimeIndex
+        Dates from which to select sample choices.
+    frequency : {'Y', 'Q', 'M', 'W'}
+        Frequency at which samples are to be taken.
+
+    Returns
+    -------
+    indices : np.array[int64]
+        An array containing indices of dates on which samples should be taken.
+
+        The resulting index will always include 0 as a sample index, and it
+        will include the first date of each subsequent year/quarter/month/week,
+        as determined by ``frequency``.
+
+    Notes
+    -----
+    This function assumes that ``dates`` does not have large gaps.
+
+    In particular, it assumes that the maximum distance between any two entries
+    in ``dates`` is never greater than a year, which we rely on because we use
+    ``np.diff(dates.{quarter,month,week})`` to find dates where the sampling
+    period has changed.
+    """
+    return changed_locations(
+        _dt_to_period[frequency](dates),
+        include_first=True
+    )
+
+
+class DownsampledMixin(StandardOutputs):
+    """
+    Mixin for behavior shared by Downsampled{Factor,Filter,Classifier}
+
+    A downsampled term is a wrapper around the "real" term that performs actual
+    computation. The downsampler is responsible for calling the real term's
+    `compute` method at selected intervals and forward-filling the computed
+    values.
+
+    Downsampling is not currently supported for terms with multiple outputs.
+    """
+    # There's no reason to take a window of a downsampled term.  The whole
+    # point is that you're re-using the same result multiple times.
+    window_safe = False
+
+    @expect_types(term=Term)
+    @expect_element(frequency=frozenset(_dt_to_period))
+    def __new__(cls, term, frequency):
+        return super(DownsampledMixin, cls).__new__(
+            cls,
+            inputs=term.inputs,
+            outputs=term.outputs,
+            window_length=term.window_length,
+            mask=term.mask,
+            frequency=frequency,
+            wrapped_term=term,
+            dtype=term.dtype,
+            missing_value=term.missing_value,
+            ndim=term.ndim,
+        )
+
+    def _init(self, frequency, wrapped_term, *args, **kwargs):
+        self._frequency = frequency
+        self._wrapped_term = wrapped_term
+        return super(DownsampledMixin, self)._init(*args, **kwargs)
+
+    @classmethod
+    def _static_identity(cls, frequency, wrapped_term, *args, **kwargs):
+        return (
+            super(DownsampledMixin, cls)._static_identity(*args, **kwargs),
+            frequency,
+            wrapped_term,
+        )
+
+    def compute_extra_rows(self,
+                           all_dates,
+                           start_date,
+                           end_date,
+                           min_extra_rows):
+        """
+        Ensure that min_extra_rows pushes us back to a computation date.
+
+        Parameters
+        ----------
+        all_dates : pd.DatetimeIndex
+            The trading sessions against which ``self`` will be computed.
+        start_date : pd.Timestamp
+            The first date for which final output is requested.
+        end_date : pd.Timestamp
+            The last date for which final output is requested.
+        min_extra_rows : int
+            The minimum number of extra rows required of ``self``, as
+            determined by other terms that depend on ``self``.
+
+        Returns
+        -------
+        extra_rows : int
+            The number of extra rows to compute.  This will be the minimum
+            number of rows required to make our computed start_date fall on a
+            recomputation date.
+        """
+        try:
+            current_start_pos = all_dates.get_loc(start_date) - min_extra_rows
+            if current_start_pos < 0:
+                raise NoFurtherDataError.from_lookback_window(
+                    initial_message="Insufficient data to compute Pipeline:",
+                    first_date=all_dates[0],
+                    lookback_start=start_date,
+                    lookback_length=min_extra_rows,
+                )
+        except KeyError:
+            before, after = nearest_unequal_elements(all_dates, start_date)
+            raise ValueError(
+                "Pipeline start_date {start_date} is not in calendar.\n"
+                "Latest date before start_date is {before}.\n"
+                "Earliest date after start_date is {after}.".format(
+                    start_date=start_date,
+                    before=before,
+                    after=after,
+                )
+            )
+
+        # Our possible target dates are all the dates on or before the current
+        # starting position.
+        # TODO: Consider bounding this below by self.window_length
+        candidates = all_dates[:current_start_pos + 1]
+
+        # Choose the latest date in the candidates that is the start of a new
+        # period at our frequency.
+        choices = select_sampling_indices(candidates, self._frequency)
+
+        # If we have choices, the last choice is the first date of the
+        # period containing current_start_date.  Choose it.
+        new_start_date = candidates[choices[-1]]
+
+        # Add the difference between the new and old start dates to get the
+        # number of rows for the new start_date.
+        new_start_pos = all_dates.get_loc(new_start_date)
+        assert new_start_pos <= current_start_pos, \
+            "Computed negative extra rows!"
+
+        return min_extra_rows + (current_start_pos - new_start_pos)
+
+    def _compute(self, windows, dates, assets, mask):
+        """
+        Compute by delegating to self._wrapped_term._compute on sample dates.
+
+        On non-sample dates, forward-fill from previously-computed samples.
+        """
+        to_sample = dates[select_sampling_indices(dates, self._frequency)]
+        assert to_sample[0] == dates[0], \
+            "Misaligned sampling dates in %s." % type(self).__name__
+
+        real_compute = self._wrapped_term._compute
+
+        results = []
+        samples = iter(to_sample)
+        next_sample = next(samples)
+        for i, compute_date in enumerate(dates):
+            if next_sample == compute_date:
+                results.append(
+                    real_compute(
+                        windows,
+                        dates[i:i + 1],
+                        assets,
+                        mask[i:i + 1],
+                    )
+                )
+                try:
+                    next_sample = next(samples)
+                except StopIteration:
+                    # No more samples to take. Set next_sample to NaT, which
+                    # compares False with any other datetime.
+                    next_sample = pd_NaT
+            else:
+                # Copy results from previous sample period.
+                results.append(results[-1])
+
+                # Force adjusted arrays forward one tick.
+                for w in windows:
+                    next(w)
+
+        # We should have exhausted our sample dates.
+        try:
+            next_sample = next(samples)
+        except StopIteration:
+            pass
+        else:
+            raise AssertionError("Unconsumed sample date: %s" % next_sample)
+
+        # Concatenate stored results.
+        return vstack(results)
diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py
index 5c7218e2..98941fe8 100644
--- a/zipline/pipeline/term.py
+++ b/zipline/pipeline/term.py
@@ -287,9 +287,28 @@ class Term(with_metaclass(ABCMeta, object)):
         """
         Calculate the number of extra rows needed to compute ``self``.
 
-        Must return at least ``min_extra_rows``, but can optionally require
-        more.  This is used by downsampled terms to ensure that the first date
-        computed is a recomputation date.
+        Must return at least ``min_extra_rows``, and the default implementation
+        is to just return ``min_extra_rows``.  This is overridden by
+        downsampled terms to ensure that the first date computed is a
+        recomputation date.
+
+        Parameters
+        ----------
+        all_dates : pd.DatetimeIndex
+            The trading sessions against which ``self`` will be computed.
+        start_date : pd.Timestamp
+            The first date for which final output is requested.
+        end_date : pd.Timestamp
+            The last date for which final output is requested.
+        min_extra_rows : int
+            The minimum number of extra rows required of ``self``, as
+            determined by other terms that depend on ``self``.
+
+        Returns
+        -------
+        extra_rows : int
+            The number of extra rows to compute.  Must be at least
+            ``min_extra_rows``.
         """
         return min_extra_rows
 
@@ -566,6 +585,31 @@ class ComputableTerm(Term):
         """
         return data
 
+    def _downsampled_type(self, **kwargs):
+        """
+        The expression type to return from self.downsample().
+        """
+        raise NotImplementedError(
+            "downsampling is not yet implemented "
+            "for instances of %s." % type(self).__name__
+        )
+
+    def downsample(self, frequency):
+        """
+        Make a term that computes from ``self`` at lower-than-daily frequency.
+
+        Parameters
+        ----------
+        frequency : str, {'Y', 'Q', 'M', 'W'}
+            A string indicating the desired sampling rate.
+            'Y' -> sample on the first trading day of each calendar year
+            'Q' -> sample on the first trading day of
+                   January, April, July, and October
+            'M' -> sample on the first trading day of each month
+            'W' -> sample on the first trading day of each week
+        """
+        return self._downsampled_type(term=self, frequency=frequency)
+
     def __repr__(self):
         return (
             "{type}({inputs}, window_length={window_length})"
@@ -632,6 +676,12 @@ class Slice(ComputableTerm):
         # column.
         return windows[0][:, [asset_column]]
 
+    @property
+    def _downsampled_type(self):
+        raise NotImplementedError(
+            'downsampling of slices is not yet supported'
+        )
+
 
 def validate_dtype(termname, dtype, missing_value):
     """