ENH: Add support for downsampling.

Adds a new ``downsample`` method to all computable terms.  Computable
terms (Filters, Factors, and Classifiers) can be downsampled to yearly,
quarterly, monthly, or weekly frequency.

The result of ``term.downsample`` is a new term of the same
family (Filter/Factor/Classifier) as ``term``.  The downsampled term
computes by delegating to the original term; repeatedly calling its
``compute`` method with length-1 date ranges.

Downsampled terms take advantage of a new ``compute_extra_rows`` Term
method, which allows terms to dynamically request that additional extra
rows of themselves be computed based on the dates for which they're
being computed.  This ensures, for example, that a monthly-downsampled
term always computes at the start of a month, even when a
naively-calculated pipeline window would end in the middle of the month.
This commit is contained in:
Scott Sanderson
2016-08-15 22:58:40 -04:00
parent 790ee59455
commit b40ebdcfce
7 changed files with 942 additions and 11 deletions
+615
View File
@@ -0,0 +1,615 @@
"""
Tests for Downsampled Filters/Factors/Classifiers
"""
import pandas as pd
from pandas.util.testing import assert_frame_equal
from zipline.pipeline import (
Pipeline,
CustomFactor,
CustomFilter,
CustomClassifier,
)
from zipline.pipeline.data.testing import TestingDataSet
from zipline.pipeline.factors import SimpleMovingAverage
from zipline.testing import ZiplineTestCase, parameter_space
from zipline.testing.fixtures import (
WithTradingSessions,
WithSeededRandomPipelineEngine,
)
class NDaysAgoFactor(CustomFactor):
inputs = [TestingDataSet.float_col]
def compute(self, today, assets, out, floats):
out[:] = floats[0]
class NDaysAgoFilter(CustomFilter):
inputs = [TestingDataSet.bool_col]
def compute(self, today, assets, out, bools):
out[:] = bools[0]
class NDaysAgoClassifier(CustomClassifier):
inputs = [TestingDataSet.categorical_col]
dtype = TestingDataSet.categorical_col.dtype
def compute(self, today, assets, out, cats):
out[:] = cats[0]
class ComputeExtraRowsTestcase(WithTradingSessions, ZiplineTestCase):
DATA_MIN_DAY = pd.Timestamp('2012-06', tz='UTC')
DATA_MAX_DAY = pd.Timestamp('2015', tz='UTC')
TRADING_CALENDAR_STRS = ('NYSE',)
# Test with different window_lengths to ensure that window length is not
# used when calculating exra rows for the top-level term.
factor1 = TestingDataSet.float_col.latest
factor11 = NDaysAgoFactor(window_length=11)
factor91 = NDaysAgoFactor(window_length=91)
filter1 = TestingDataSet.bool_col.latest
filter11 = NDaysAgoFilter(window_length=11)
filter91 = NDaysAgoFilter(window_length=91)
classifier1 = TestingDataSet.categorical_col.latest
classifier11 = NDaysAgoClassifier(window_length=11)
classifier91 = NDaysAgoClassifier(window_length=91)
all_terms = [
factor1,
factor11,
factor91,
filter1,
filter11,
filter91,
classifier1,
classifier11,
classifier91,
]
@parameter_space(
calendar_name=TRADING_CALENDAR_STRS,
base_terms=[
(factor1, factor11, factor91),
(filter1, filter11, filter91),
(classifier1, classifier11, classifier91),
],
__fail_fast=True
)
def test_yearly(self, base_terms, calendar_name):
downsampled_terms = tuple(t.downsample('Y') for t in base_terms)
all_terms = base_terms + downsampled_terms
all_sessions = self.trading_sessions[calendar_name]
end_session = all_sessions[-1]
years = all_sessions.year
sessions_in_2012 = all_sessions[years == 2012]
sessions_in_2013 = all_sessions[years == 2013]
sessions_in_2014 = all_sessions[years == 2014]
# Simulate requesting computation where the unaltered lookback would
# land exactly on the first date in 2014. We shouldn't request any
# additional rows for the regular terms or the downsampled terms.
for i in range(0, 30, 5):
start_session = sessions_in_2014[i]
self.check_extra_row_calculations(
all_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i,
)
# Simulate requesting computation where the unaltered lookback would
# land on the second date in 2014. We should request one more extra
# row in the downsampled terms to push us back to the first date in
# 2014.
for i in range(0, 30, 5):
start_session = sessions_in_2014[i + 1]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i + 1,
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i,
)
# Simulate requesting computation where the unaltered lookback would
# land on the last date of 2013. The downsampled terms should request
# enough extra rows to push us back to the start of 2013.
for i in range(0, 30, 5):
start_session = sessions_in_2014[i]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + len(sessions_in_2013),
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + 1,
)
# Simulate requesting computation where the unaltered lookback would
# land on the last date of 2012. The downsampled terms should request
# enough extra rows to push us back to the first known date, which is
# in the middle of 2012
for i in range(0, 30, 5):
start_session = sessions_in_2013[i]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + len(sessions_in_2012),
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + 1,
)
@parameter_space(
calendar_name=TRADING_CALENDAR_STRS,
base_terms=[
(factor1, factor11, factor91),
(filter1, filter11, filter91),
(classifier1, classifier11, classifier91),
],
__fail_fast=True
)
def test_quarterly(self, calendar_name, base_terms):
downsampled_terms = tuple(t.downsample('Q') for t in base_terms)
all_terms = base_terms + downsampled_terms
# This region intersects with Q4 2013, Q1 2014, and Q2 2014.
tmp = self.trading_sessions[calendar_name]
all_sessions = tmp[tmp.slice_indexer('2013-12-15', '2014-04-30')]
end_session = all_sessions[-1]
months = all_sessions.month
Q4_2013 = all_sessions[months == 12]
Q1_2014 = all_sessions[(months == 1) | (months == 2) | (months == 3)]
Q2_2014 = all_sessions[months == 4]
# Simulate requesting computation where the unaltered lookback would
# land exactly on the first date in Q2 2014. We shouldn't request any
# additional rows for the regular terms or the downsampled terms.
for i in range(0, 15, 5):
start_session = Q2_2014[i]
self.check_extra_row_calculations(
all_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i,
)
# Simulate requesting computation where the unaltered lookback would
# land exactly on the second date in Q2 2014.
# The downsampled terms should request one more extra row.
for i in range(0, 15, 5):
start_session = Q2_2014[i + 1]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i + 1,
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i,
)
# Simulate requesting computation where the unaltered lookback would
# land exactly on the last date in Q1 2014. The downsampled terms
# should request enough extra rows to push us back to the first date of
# Q1 2014.
for i in range(0, 15, 5):
start_session = Q2_2014[i]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + len(Q1_2014),
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + 1,
)
# Simulate requesting computation where the unaltered lookback would
# land exactly on the last date in Q4 2013. The downsampled terms
# should request enough extra rows to push us back to the first known
# date, which is in the middle of december 2013.
for i in range(0, 15, 5):
start_session = Q1_2014[i]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + len(Q4_2013),
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + 1,
)
@parameter_space(
calendar_name=TRADING_CALENDAR_STRS,
base_terms=[
(factor1, factor11, factor91),
(filter1, filter11, filter91),
(classifier1, classifier11, classifier91),
],
__fail_fast=True
)
def test_monthly(self, calendar_name, base_terms):
downsampled_terms = tuple(t.downsample('M') for t in base_terms)
all_terms = base_terms + downsampled_terms
# This region intersects with Dec 2013, Jan 2014, and Feb 2014.
tmp = self.trading_sessions[calendar_name]
all_sessions = tmp[tmp.slice_indexer('2013-12-15', '2014-02-28')]
end_session = all_sessions[-1]
months = all_sessions.month
dec2013 = all_sessions[months == 12]
jan2014 = all_sessions[months == 1]
feb2014 = all_sessions[months == 2]
# Simulate requesting computation where the unaltered lookback would
# land exactly on the first date in feb 2014. We shouldn't request any
# additional rows for the regular terms or the downsampled terms.
for i in range(0, 10, 2):
start_session = feb2014[i]
self.check_extra_row_calculations(
all_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i,
)
# Simulate requesting computation where the unaltered lookback would
# land on the second date in feb 2014. We should request one more
# extra row in the downsampled terms to push us back to the first date
# in 2014.
for i in range(0, 10, 2):
start_session = feb2014[i + 1]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i + 1,
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i,
)
# Simulate requesting computation where the unaltered lookback would
# land on the last date of jan 2014. The downsampled terms should
# request enough extra rows to push us back to the start of jan 2014.
for i in range(0, 10, 2):
start_session = feb2014[i]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + len(jan2014),
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + 1,
)
# Simulate requesting computation where the unaltered lookback would
# land on the last date of dec 2013. The downsampled terms should
# request enough extra rows to push us back to the first known date,
# which is in the middle of december 2013.
for i in range(0, 10, 2):
start_session = jan2014[i]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + len(dec2013),
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + 1,
)
@parameter_space(
calendar_name=TRADING_CALENDAR_STRS,
base_terms=[
(factor1, factor11, factor91),
(filter1, filter11, filter91),
(classifier1, classifier11, classifier91),
],
__fail_fast=True
)
def test_weekly(self, calendar_name, base_terms):
downsampled_terms = tuple(t.downsample('W') for t in base_terms)
all_terms = base_terms + downsampled_terms
# December 2013
# Mo Tu We Th Fr Sa Su
# 1
# 2 3 4 5 6 7 8
# 9 10 11 12 13 14 15
# 16 17 18 19 20 21 22
# 23 24 25 26 27 28 29
# 30 31
# January 2014
# Mo Tu We Th Fr Sa Su
# 1 2 3 4 5
# 6 7 8 9 10 11 12
# 13 14 15 16 17 18 19
# 20 21 22 23 24 25 26
# 27 28 29 30 31
# This region intersects with the last full week of 2013, the week
# shared by 2013 and 2014, and the first full week of 2014.
tmp = self.trading_sessions[calendar_name]
all_sessions = tmp[tmp.slice_indexer('2013-12-27', '2014-01-12')]
end_session = all_sessions[-1]
week0 = all_sessions[
all_sessions.slice_indexer('2013-12-27', '2013-12-29')
]
week1 = all_sessions[
all_sessions.slice_indexer('2013-12-30', '2014-01-05')
]
week2 = all_sessions[
all_sessions.slice_indexer('2014-01-06', '2014-01-12')
]
# Simulate requesting computation where the unaltered lookback would
# land exactly on the first date in week 2. We shouldn't request any
# additional rows for the regular terms or the downsampled terms.
for i in range(3):
start_session = week2[i]
self.check_extra_row_calculations(
all_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i,
)
# Simulate requesting computation where the unaltered lookback would
# land exactly on the second date in week 2. The downsampled terms
# should request one more extra row.
for i in range(3):
start_session = week2[i + 1]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i + 1,
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i,
expected_extra_rows=i,
)
# Simulate requesting computation where the unaltered lookback would
# land exactly on the last date in week 1. The downsampled terms
# should request enough extra rows to push us back to the first date of
# week 1.
for i in range(3):
start_session = week2[i]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + len(week1),
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + 1,
)
# Simulate requesting computation where the unaltered lookback would
# land exactly on the last date in week0. The downsampled terms
# should request enough extra rows to push us back to the first known
# date, which is in the middle of december 2013.
for i in range(3):
start_session = week1[i]
self.check_extra_row_calculations(
downsampled_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + len(week0),
)
self.check_extra_row_calculations(
base_terms,
all_sessions,
start_session,
end_session,
min_extra_rows=i + 1,
expected_extra_rows=i + 1,
)
def check_extra_row_calculations(self,
terms,
all_sessions,
start_session,
end_session,
min_extra_rows,
expected_extra_rows):
"""
Check that each term in ``terms`` computes an expected number of extra
rows for the given parameters.
"""
for term in terms:
result = term.compute_extra_rows(
all_sessions,
start_session,
end_session,
min_extra_rows,
)
self.assertEqual(
result,
expected_extra_rows,
"Expected {} extra_rows from {}, but got {}.".format(
expected_extra_rows,
term,
result,
)
)
class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine,
ZiplineTestCase):
# Extend into the last few days of 2013 to test year/quarter boundaries.
START_DATE = pd.Timestamp('2013-12-15', tz='UTC')
# Extend into the first few days of 2015 to test year/quarter boundaries.
END_DATE = pd.Timestamp('2015-01-06', tz='UTC')
def test_downsample_windowed_factor(self):
f = SimpleMovingAverage(
inputs=[TestingDataSet.float_col],
window_length=5,
)
# June 2014
# Mo Tu We Th Fr Sa Su
# 1
# 2 3 4 5 6 7 8
# 9 10 11 12 13 14 15
# 16 17 18 19 20 21 22
# 23 24 25 26 27 28 29
# 30
all_sessions = self.nyse_sessions
compute_dates = all_sessions[
all_sessions.slice_indexer('2014-06-05', '2015-01-06')
]
start_date, end_date = compute_dates[[0, -1]]
pipe = Pipeline({
'year': f.downsample(frequency='Y'),
'quarter': f.downsample(frequency='Q'),
'month': f.downsample(frequency='M'),
'week': f.downsample(frequency='W'),
})
# Raw values for f, computed each day from 2014 to the end of the
# target period.
raw_f_results = self.run_pipeline(
Pipeline({'f': f}),
start_date=pd.Timestamp('2014-01-02', tz='UTC'),
end_date=pd.Timestamp('2015-01-06', tz='UTC'),
)['f'].unstack()
expected_results = {
'year': (raw_f_results
.groupby(pd.TimeGrouper('AS'))
.first()
.reindex(compute_dates, method='ffill')),
'quarter': (raw_f_results
.groupby(pd.TimeGrouper('QS'))
.first()
.reindex(compute_dates, method='ffill')),
'month': (raw_f_results
.groupby(pd.TimeGrouper('MS'))
.first()
.reindex(compute_dates, method='ffill')),
'week': (raw_f_results
.groupby(pd.TimeGrouper('W', label='left'))
.first()
.reindex(compute_dates, method='ffill')),
}
results = self.run_pipeline(pipe, start_date, end_date)
for frequency in expected_results:
result = results[frequency].unstack()
expected = expected_results[frequency]
assert_frame_equal(result, expected)
@@ -23,6 +23,7 @@ from zipline.utils.numpy_utils import (
from ..filters import ArrayPredicate, NotNullFilter, NullFilter, NumExprFilter
from ..mixins import (
CustomTermMixin,
DownsampledMixin,
LatestMixin,
PositiveWindowLengthMixin,
RestrictedDTypeMixin,
@@ -301,6 +302,10 @@ class Classifier(RestrictedDTypeMixin, ComputableTerm):
raise AssertionError("Expected a LabelArray, got %s." % type(data))
return data.as_categorical()
@property
def _downsampled_type(self):
return DownsampledClassifier
class Everything(Classifier):
"""
@@ -386,6 +391,18 @@ class Latest(LatestMixin, CustomClassifier):
pass
class DownsampledClassifier(DownsampledMixin, Classifier):
"""
A Classifier that defers to another Classifier at lower-than-daily
frequency.
Parameters
----------
term : zipline.Classifier
freq : {'Y', 'Q', 'M', 'W'}
"""
class InvalidClassifierComparison(TypeError):
def __init__(self, classifier, compval):
super(InvalidClassifierComparison, self).__init__(
+5 -7
View File
@@ -222,13 +222,11 @@ class SimplePipelineEngine(object):
finder = self._finder
start_idx, end_idx = self._calendar.slice_locs(start_date, end_date)
if start_idx < extra_rows:
raise NoFurtherDataError(
msg="Insufficient data to compute Pipeline.\n\n"
"start date was %s\n"
"earliest known date was %s\n"
"%d extra rows were required" % (
start_date, calendar[0], extra_rows,
),
raise NoFurtherDataError.from_lookback_window(
initial_message="Insufficient data to compute Pipeline:",
first_date=calendar[0],
lookback_start=start_date,
lookback_length=extra_rows,
)
# Build lifetimes matrix reaching back to `extra_rows` days before
+16
View File
@@ -33,6 +33,7 @@ from zipline.pipeline.filters import (
)
from zipline.pipeline.mixins import (
CustomTermMixin,
DownsampledMixin,
LatestMixin,
PositiveWindowLengthMixin,
RestrictedDTypeMixin,
@@ -1071,6 +1072,10 @@ class Factor(RestrictedDTypeMixin, ComputableTerm):
"""
return (-inf < self) & (self < inf)
@property
def _downsampled_type(self):
return DownsampledFactor
class NumExprFactor(NumericalExpression, Factor):
"""
@@ -1510,6 +1515,17 @@ class Latest(LatestMixin, CustomFactor):
out[:] = data[-1]
class DownsampledFactor(DownsampledMixin, Factor):
"""
A Factor that defers to another Factor at lower-than-daily frequency.
Parameters
----------
term : zipline.pipeline.Factor
freq : {'Y', 'Q', 'M', 'W'}
"""
# Functions to be passed to GroupedRowTransform. These aren't defined inline
# because the transformation function is part of the instance hash key.
def demean(row):
+16
View File
@@ -25,6 +25,7 @@ from zipline.pipeline.expression import (
)
from zipline.pipeline.mixins import (
CustomTermMixin,
DownsampledMixin,
LatestMixin,
PositiveWindowLengthMixin,
RestrictedDTypeMixin,
@@ -201,6 +202,10 @@ class Filter(RestrictedDTypeMixin, ComputableTerm):
)
return retval
@property
def _downsampled_type(self):
return DownsampledFilter
class NumExprFilter(NumericalExpression, Filter):
"""
@@ -458,6 +463,17 @@ class Latest(LatestMixin, CustomFilter):
pass
class DownsampledFilter(DownsampledMixin, Filter):
"""
A Filter that defers to another Filter at lower-than-daily frequency.
Parameters
----------
term : zipline.pipeline.Filter
freq : {'Y', 'Q', 'M', 'W'}
"""
class SingleAsset(Filter):
"""
A Filter that computes to True only for the given asset.
+220 -1
View File
@@ -1,16 +1,28 @@
"""
Mixins classes for use with Filters and Factors.
"""
from operator import attrgetter
from numpy import (
array,
full,
recarray,
vstack,
)
from pandas import NaT as pd_NaT
from zipline.errors import (
WindowLengthNotPositive,
UnsupportedDataType,
NoFurtherDataError,
)
from zipline.utils.control_flow import nullctx
from zipline.errors import WindowLengthNotPositive, UnsupportedDataType
from zipline.utils.input_validation import expect_element, expect_types
from zipline.utils.numpy_utils import changed_locations
from zipline.utils.pandas_utils import nearest_unequal_elements
from .sentinels import NotSpecified
from .term import Term
class PositiveWindowLengthMixin(object):
@@ -218,3 +230,210 @@ class LatestMixin(SingleInputMixin):
actual=self.inputs[0].dtype,
)
)
_dt_to_period = {
'Y': attrgetter('year'),
'Q': attrgetter('quarter'),
'M': attrgetter('month'),
'W': attrgetter('week'),
}
def select_sampling_indices(dates, frequency):
"""
Choose entries from ``dates`` to use for downsampling at ``frequency``.
Parameters
----------
dates : pd.DatetimeIndex
Dates from which to select sample choices.
frequency : {'Y', 'Q', 'M', 'W'}
Frequency at which samples are to be taken.
Returns
-------
indices : np.array[int64]
An array condtaining indices of dates on which samples should be taken.
The resulting index will always include 0 as a sample index, and it
will include the first date of each subsequent year/quarter/month/week,
as determined by ``frequency``.
Notes
-----
This function assumes that ``dates`` does not have large gaps.
In particular, it assumes that the maximum distance between any two entries
in ``dates`` is never greater than a year, which we rely on because we use
``np.diff(dates.{quarter,month,week})`` to find dates where the sampling
period has changed.
"""
return changed_locations(
_dt_to_period[frequency](dates),
include_first=True
)
class DownsampledMixin(StandardOutputs):
"""
Mixin for behavior shared by Downsampled{Factor,Filter,Classifier}
A downsampled term is a wrapper around the "real" term that performs actual
computation. The downsampler is responsible for calling the real term's
`compute` method at selected intervals and forward-filling the computed
values.
Downsampling is not currently supported for terms with multiple outputs.
"""
# There's no reason to take a window of a downsampled term. The whole
# point is that you're re-using the same result multiple times.
window_safe = False
@expect_types(term=Term)
@expect_element(frequency=frozenset(_dt_to_period))
def __new__(cls, term, frequency):
return super(DownsampledMixin, cls).__new__(
cls,
inputs=term.inputs,
outputs=term.outputs,
window_length=term.window_length,
mask=term.mask,
frequency=frequency,
wrapped_term=term,
dtype=term.dtype,
missing_value=term.missing_value,
ndim=term.ndim,
)
def _init(self, frequency, wrapped_term, *args, **kwargs):
self._frequency = frequency
self._wrapped_term = wrapped_term
return super(DownsampledMixin, self)._init(*args, **kwargs)
@classmethod
def _static_identity(cls, frequency, wrapped_term, *args, **kwargs):
return (
super(DownsampledMixin, cls)._static_identity(*args, **kwargs),
frequency,
wrapped_term,
)
def compute_extra_rows(self,
all_dates,
start_date,
end_date,
min_extra_rows):
"""
Ensure that min_extra_rows pushes us back to a computation date.
Parameters
----------
all_dates : pd.DatetimeIndex
The trading sessions against which ``self`` will be computed.
start_date : pd.Timestamp
The first date for which final output is requested.
end_date : pd.Timestamp
The last date for which final output is requested.
min_extra_rows : int
The minimum number of extra rows required of ``self``, as
determined by other terms that depend on ``self``.
Returns
-------
extra_rows : int
The number of extra rows to compute. This will be the minimum
number of rows required to make our computed start_date fall on a
recomputation date.
"""
try:
current_start_pos = all_dates.get_loc(start_date) - min_extra_rows
if current_start_pos < 0:
raise NoFurtherDataError(
initial_message="Insufficient data to compute Pipeline:",
first_date=all_dates[0],
lookback_start=start_date,
lookback_length=min_extra_rows,
)
except KeyError:
before, after = nearest_unequal_elements(all_dates, start_date)
raise ValueError(
"Pipeline start_date {start_date} is not in calendar.\n"
"Latest date before start_date is {before}.\n"
"Earliest date after start_date is {after}.".format(
start_date=start_date,
before=before,
after=after,
)
)
# Our possible target dates are all the dates on or before the current
# starting position.
# TODO: Consider bounding this below by self.window_length
candidates = all_dates[:current_start_pos + 1]
# Choose the latest date in the candidates that is the start of a new
# period at our frequency.
choices = select_sampling_indices(candidates, self._frequency)
# If we have choices, the last choice is the first date if the
# period containing current_start_date. Choose it.
new_start_date = candidates[choices[-1]]
# Add the difference between the new and old start dates to get the
# number of rows for the new start_date.
new_start_pos = all_dates.get_loc(new_start_date)
assert new_start_pos <= current_start_pos, \
"Computed negative extra rows!"
return min_extra_rows + (current_start_pos - new_start_pos)
def _compute(self, windows, dates, assets, mask):
"""
Compute by delegating to self._wrapped_term._compute on sample dates.
On non-sample dates, forward-fill from previously-computed samples.
"""
to_sample = dates[select_sampling_indices(dates, self._frequency)]
assert to_sample[0] == dates[0], \
"Misaligned sampling dates in %s." % type(self).__name__
real_compute = self._wrapped_term._compute
results = []
samples = iter(to_sample)
next_sample = next(samples)
for i, compute_date in enumerate(dates):
if next_sample == compute_date:
results.append(
real_compute(
windows,
dates[i:i + 1],
assets,
mask[i:i + 1],
)
)
try:
next_sample = next(samples)
except StopIteration:
# No more samples to take. Set next_sample to Nat, which
# compares False with any other datetime.
next_sample = pd_NaT
else:
# Copy results from previous sample period.
results.append(results[-1])
# Force adjusted arrays forward one tick.
for w in windows:
next(w)
# We should have exhausted our sample dates.
try:
next_sample = next(samples)
except StopIteration:
pass
else:
raise AssertionError("Unconsumed sample date: %s" % next_sample)
# Concatenate stored results.
return vstack(results)
+53 -3
View File
@@ -287,9 +287,28 @@ class Term(with_metaclass(ABCMeta, object)):
"""
Calculate the number of extra rows needed to compute ``self``.
Must return at least ``min_extra_rows``, but can optionally require
more. This is used by downsampled terms to ensure that the first date
computed is a recomputation date.
Must return at least ``min_extra_rows``, and the default implementation
is to just return ``min_extra_rows``. This is overridden by
downsampled terms to ensure that the first date computed is a
recomputation date.
Parameters
----------
all_dates : pd.DatetimeIndex
The trading sessions against which ``self`` will be computed.
start_date : pd.Timestamp
The first date for which final output is requested.
end_date : pd.Timestamp
The last date for which final output is requested.
min_extra_rows : int
The minimum number of extra rows required of ``self``, as
determined by other terms that depend on ``self``.
Returns
-------
extra_rows : int
The number of extra rows to compute. Must be at least
``min_extra_rows``.
"""
return min_extra_rows
@@ -566,6 +585,31 @@ class ComputableTerm(Term):
"""
return data
def _downsampled_type(self):
"""
The expression type to return from self.downsample().
"""
raise NotImplementedError(
"downsampling is not yet implemented "
"for instances of %s." % type(self).__name__
)
def downsample(self, frequency):
"""
Make a term that computes from ``self`` at lower-than-daily frequency.
Parameters
----------
frequency : str, {'Y', 'Q', 'M', 'W'}
A string indicating the desired sampling rate.
'Y' -> sample on the first trading day of each calendar year
'Q' -> sample on the first trading day of
January, April, July, and October
'M' -> sample on the first trading day of each month
'W' -> sample on the first trading day of each week
"""
return self._downsampled_type(term=self, frequency=frequency)
def __repr__(self):
return (
"{type}({inputs}, window_length={window_length})"
@@ -632,6 +676,12 @@ class Slice(ComputableTerm):
# column.
return windows[0][:, [asset_column]]
@property
def _downsampled_type(self):
raise NotImplementedError(
'downsampling of slices is not yet supported'
)
def validate_dtype(termname, dtype, missing_value):
"""