From 2a09160ca803760d1fa2c0e0ce702e5da0fd6d1b Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Tue, 30 Aug 2016 10:13:38 -0400 Subject: [PATCH] TST: add test to check previous columns w/ multiple qtrs MAINT: pass column to name dict MAINT: make check for invalid num columns py3-compatible --- tests/pipeline/test_adjusted_array.py | 197 ++- tests/pipeline/test_quarters_estimates.py | 1173 ++++++++++------- zipline/lib/adjustment.pyx | 50 +- zipline/pipeline/loaders/blaze/core.py | 9 +- zipline/pipeline/loaders/blaze/estimates.py | 10 +- zipline/pipeline/loaders/blaze/events.py | 18 +- zipline/pipeline/loaders/blaze/utils.py | 13 +- zipline/pipeline/loaders/quarter_estimates.py | 381 ++++-- zipline/pipeline/loaders/utils.py | 32 +- zipline/testing/core.py | 6 + zipline/testing/fixtures.py | 5 +- 11 files changed, 1154 insertions(+), 740 deletions(-) diff --git a/tests/pipeline/test_adjusted_array.py b/tests/pipeline/test_adjusted_array.py index a5bf3ed0..730af952 100644 --- a/tests/pipeline/test_adjusted_array.py +++ b/tests/pipeline/test_adjusted_array.py @@ -202,11 +202,7 @@ def _gen_multiplicative_adjustment_cases(dtype): ) -def _gen_overwrite_adjustment_cases(name, - make_input, - make_expected_output, - dtype, - missing_value): +def _gen_overwrite_adjustment_cases(dtype): """ Generate test cases for overwrite adjustments. @@ -226,6 +222,8 @@ def _gen_overwrite_adjustment_cases(name, unicode_dtype: ObjectOverwrite, object_dtype: ObjectOverwrite, }[dtype] + make_expected_dtype = as_dtype(dtype) + missing_value = default_missing_value_for_dtype(datetime64ns_dtype) if dtype == object_dtype: # When we're testing object dtypes, we expect to have strings, but @@ -237,30 +235,30 @@ def _gen_overwrite_adjustment_cases(name, adjustments = {} buffer_as_of = [None] * 6 - baseline = make_input([[2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + baseline = make_expected_dtype([[2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) - buffer_as_of[0] = make_expected_output([[2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[0] = make_expected_dtype([[2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) # Note that row indices are inclusive! adjustments[1] = [ adjustment_type(0, 0, 0, 0, make_overwrite_value(dtype, 1)), ] - buffer_as_of[1] = make_expected_output([[1, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[1] = make_expected_dtype([[1, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) # No adjustment at index 2. buffer_as_of[2] = buffer_as_of[1] @@ -269,33 +267,33 @@ def _gen_overwrite_adjustment_cases(name, adjustment_type(1, 2, 1, 1, make_overwrite_value(dtype, 3)), adjustment_type(0, 1, 0, 0, make_overwrite_value(dtype, 4)), ] - buffer_as_of[3] = make_expected_output([[4, 2, 2], - [4, 3, 2], - [2, 3, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[3] = make_expected_dtype([[4, 2, 2], + [4, 3, 2], + [2, 3, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) adjustments[4] = [ adjustment_type(0, 3, 2, 2, make_overwrite_value(dtype, 5)) ] - buffer_as_of[4] = make_expected_output([[4, 2, 5], - [4, 3, 5], - [2, 3, 5], - [2, 2, 5], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[4] = make_expected_dtype([[4, 2, 5], + [4, 3, 5], + [2, 3, 5], + [2, 2, 5], + [2, 2, 2], + [2, 2, 2]]) adjustments[5] = [ adjustment_type(0, 4, 1, 1, make_overwrite_value(dtype, 6)), adjustment_type(2, 2, 2, 2, make_overwrite_value(dtype, 7)), ] - buffer_as_of[5] = make_expected_output([[4, 6, 5], - [4, 6, 5], - [2, 6, 7], - [2, 6, 5], - [2, 6, 2], - [2, 2, 2]]) + buffer_as_of[5] = make_expected_dtype([[4, 6, 5], + [4, 6, 5], + [2, 6, 7], + [2, 6, 5], + [2, 6, 2], + [2, 2, 2]]) return _gen_expectations( baseline, @@ -306,11 +304,7 @@ def _gen_overwrite_adjustment_cases(name, ) -def _gen_overwrite_1d_array_adjustment_case(name, - make_input, - make_expected_output, - dtype, - missing_value): +def _gen_overwrite_1d_array_adjustment_case(dtype): """ Generate test cases for overwrite adjustments. @@ -327,21 +321,24 @@ def _gen_overwrite_1d_array_adjustment_case(name, float64_dtype: Float641DArrayOverwrite, datetime64ns_dtype: Datetime641DArrayOverwrite, }[dtype] + make_expected_dtype = as_dtype(dtype) + missing_value = default_missing_value_for_dtype(datetime64ns_dtype) + adjustments = {} buffer_as_of = [None] * 6 - baseline = make_input([[2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + baseline = make_expected_dtype([[2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) - buffer_as_of[0] = make_expected_output([[2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[0] = make_expected_dtype([[2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) vals1 = [1] # Note that row indices are inclusive! @@ -351,12 +348,12 @@ def _gen_overwrite_1d_array_adjustment_case(name, array([coerce_to_dtype(dtype, val) for val in vals1]) ) ] - buffer_as_of[1] = make_input([[1, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[1] = make_expected_dtype([[1, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) # No adjustment at index 2. buffer_as_of[2] = buffer_as_of[1] @@ -368,12 +365,12 @@ def _gen_overwrite_1d_array_adjustment_case(name, array([coerce_to_dtype(dtype, val) for val in vals3]) ) ] - buffer_as_of[3] = make_input([[4, 2, 2], - [4, 2, 2], - [1, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[3] = make_expected_dtype([[4, 2, 2], + [4, 2, 2], + [1, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) vals4 = [5] * 4 adjustments[4] = [ @@ -381,12 +378,12 @@ def _gen_overwrite_1d_array_adjustment_case(name, 0, 3, 2, 2, array([coerce_to_dtype(dtype, val) for val in vals4])) ] - buffer_as_of[4] = make_input([[4, 2, 5], - [4, 2, 5], - [1, 2, 5], - [2, 2, 5], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[4] = make_expected_dtype([[4, 2, 5], + [4, 2, 5], + [1, 2, 5], + [2, 2, 5], + [2, 2, 2], + [2, 2, 2]]) vals5 = range(1, 6) adjustments[5] = [ @@ -394,12 +391,12 @@ def _gen_overwrite_1d_array_adjustment_case(name, 0, 4, 1, 1, array([coerce_to_dtype(dtype, val) for val in vals5])), ] - buffer_as_of[5] = make_input([[4, 1, 5], - [4, 2, 5], - [1, 3, 5], - [2, 4, 5], - [2, 5, 2], - [2, 2, 2]]) + buffer_as_of[5] = make_expected_dtype([[4, 1, 5], + [4, 2, 5], + [1, 3, 5], + [2, 4, 5], + [2, 5, 2], + [2, 2, 2]]) return _gen_expectations( baseline, @@ -532,38 +529,10 @@ class AdjustedArrayTestCase(TestCase): @parameterized.expand( chain( - _gen_overwrite_adjustment_cases( - 'float', - make_input=as_dtype(float64_dtype), - make_expected_output=as_dtype(float64_dtype), - dtype=float64_dtype, - missing_value=default_missing_value_for_dtype(float64_dtype), - ), - _gen_overwrite_adjustment_cases( - 'datetime', - make_input=as_dtype(datetime64ns_dtype), - make_expected_output=as_dtype(datetime64ns_dtype), - dtype=datetime64ns_dtype, - missing_value=default_missing_value_for_dtype( - datetime64ns_dtype, - ), - ), - _gen_overwrite_1d_array_adjustment_case( - 'float', - make_input=as_dtype(float64_dtype), - make_expected_output=as_dtype(float64_dtype), - dtype=float64_dtype, - missing_value=default_missing_value_for_dtype(float64_dtype), - ), - _gen_overwrite_1d_array_adjustment_case( - 'datetime', - make_input=as_dtype(datetime64ns_dtype), - make_expected_output=as_dtype(datetime64ns_dtype), - dtype=datetime64ns_dtype, - missing_value=default_missing_value_for_dtype( - datetime64ns_dtype, - ), - ), + _gen_overwrite_adjustment_cases(float64_dtype), + _gen_overwrite_adjustment_cases(datetime64ns_dtype), + _gen_overwrite_1d_array_adjustment_case(float64_dtype), + _gen_overwrite_1d_array_adjustment_case(datetime64ns_dtype), # There are six cases here: # Using np.bytes/np.unicode/object arrays as inputs. # Passing np.bytes/np.unicode/object arrays to LabelArray, diff --git a/tests/pipeline/test_quarters_estimates.py b/tests/pipeline/test_quarters_estimates.py index bb263f22..9217df77 100644 --- a/tests/pipeline/test_quarters_estimates.py +++ b/tests/pipeline/test_quarters_estimates.py @@ -1,8 +1,10 @@ import blaze as bz import itertools +from nose.tools import assert_true from nose_parameterized import parameterized import numpy as np import pandas as pd +from pandas.util.testing import assert_frame_equal from toolz import merge from zipline.pipeline import SimplePipelineEngine, Pipeline, CustomFactor @@ -13,19 +15,26 @@ from zipline.pipeline.common import ( SID_FIELD_NAME, TS_FIELD_NAME, ) -from zipline.pipeline.data import DataSet, Column +from zipline.pipeline.data import DataSet +from zipline.pipeline.data import Column from zipline.pipeline.loaders.blaze.estimates import ( BlazeNextEstimatesLoader, BlazePreviousEstimatesLoader ) from zipline.pipeline.loaders.quarter_estimates import ( NextQuartersEstimatesLoader, + normalize_quarters, PreviousQuartersEstimatesLoader, - split_normalized_quarters, normalize_quarters) -from zipline.testing import ZiplineTestCase -from zipline.testing.fixtures import WithAssetFinder, WithTradingSessions + split_normalized_quarters, +) +from zipline.testing.fixtures import ( + WithAssetFinder, + WithTradingSessions, + ZiplineTestCase, +) from zipline.testing.predicates import assert_equal -from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype +from zipline.utils.numpy_utils import datetime64ns_dtype +from zipline.utils.numpy_utils import float64_dtype class Estimates(DataSet): @@ -42,276 +51,267 @@ def QuartersEstimates(num_qtr): return QtrEstimates -# 0Q1: 2015-01-05.Q1.e1.2015-01-06, 2015-01-10.Q1.e1.2015-01-11, -# 0Q2: 2015-01-15.Q2.e1.2015-01-16, 2015-01-20.Q2.e1.2015-01-21, -# 0Q4: 2015-02-05.Q4.e1.2015-02-06, 2015-02-10.Q4.e1.2015-02-11, -# Skip Q3 to make sure we handle skipped quarter data correctly. -estimates_timeline = pd.DataFrame({ - TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), pd.Timestamp('2015-01-07'), - pd.Timestamp('2015-01-05'), pd.Timestamp('2015-01-17'), - pd.Timestamp('2015-01-05'), pd.Timestamp('2015-01-17'), - pd.Timestamp('2015-01-22'), pd.Timestamp('2015-02-02')], - EVENT_DATE_FIELD_NAME: - [pd.Timestamp('2015-01-10'), pd.Timestamp('2015-01-10'), - pd.Timestamp('2015-01-20'), pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-02-10'), pd.Timestamp('2015-02-10'), - pd.Timestamp('2015-02-10'), pd.Timestamp('2015-02-10')], - 'estimate': [1.]*2 + [2.] * 2 + [4.] * 4, - FISCAL_QUARTER_FIELD_NAME: [1]*2 + [2] * 2 + [4] * 4, - FISCAL_YEAR_FIELD_NAME: [2015]*8, - SID_FIELD_NAME: [0]*8 -}) +class WithEstimates(WithTradingSessions, WithAssetFinder): + """ + ZiplineTestCase mixin providing cls.loader and cls.events as class + level fixtures. -# Final release dates never change. The quarters have very tight date ranges -# in order to reduce the number of dates we need to iterate through when -# testing. -releases = pd.DataFrame({ - TS_FIELD_NAME: [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')], - EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-15'), - pd.Timestamp('2015-01-31')], - 'estimate': [0.5, 0.8], - FISCAL_QUARTER_FIELD_NAME: [1.0, 2.0], - FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0] -}) + Methods + ------- + make_loader(events, columns) -> PipelineLoader + Method which returns the loader to be used throughout tests. -q1_knowledge_dates = [pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-04'), - pd.Timestamp('2015-01-08'), pd.Timestamp('2015-01-12')] -q2_knowledge_dates = [pd.Timestamp('2015-01-16'), pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-01-24'), pd.Timestamp('2015-01-28')] -# We want to model the possibility of an estimate predicting a release date -# that doesn't match the actual release. This could be done by dynamically -# generating more combinations with different release dates, but that -# significantly increases the amount of time it takes to run the tests. These -# hard-coded cases are sufficient to know that we can update our beliefs when -# we get new information. -q1_release_dates = [pd.Timestamp('2015-01-15'), - pd.Timestamp('2015-01-16')] # One day late -q2_release_dates = [pd.Timestamp('2015-01-30'), # One day early - pd.Timestamp('2015-01-31')] -estimates = pd.DataFrame({ - EVENT_DATE_FIELD_NAME: q1_release_dates + q2_release_dates, - 'estimate': [.1, .2, .3, .4], - FISCAL_QUARTER_FIELD_NAME: [1.0, 1.0, 2.0, 2.0], - FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0, 2015.0, 2015.0] -}) + events : pd.DataFrame + The raw events to be used as input to the pipeline loader. + columns : dict[str -> str] + The dictionary mapping the names of BoundColumns to the + associated column name in the events DataFrame. + """ - -def gen_estimates(): - sid_estimates = [] - sid_releases = [] - for sid, (q1e1, q1e2, q2e1, q2e2) in enumerate( - itertools.permutations(q1_knowledge_dates + q2_knowledge_dates, - 4) - ): - # We're assuming that estimates must come before the relevant release. - if (q1e1 < q1e2 and - q2e1 < q2e2 and - q1e1 < q1_release_dates[0] and - q1e2 < q1_release_dates[1]): - sid_estimate = estimates.copy(True) - sid_estimate[TS_FIELD_NAME] = [q1e1, q1e2, q2e1, q2e2] - sid_estimate[SID_FIELD_NAME] = sid - sid_estimates += [sid_estimate] - sid_release = releases.copy(True) - sid_release[SID_FIELD_NAME] = sid_estimate[SID_FIELD_NAME] - sid_releases += [sid_release] - return pd.concat(sid_estimates + sid_releases).reset_index(drop=True) - - -class EstimateTestCase(WithAssetFinder, - WithTradingSessions, - ZiplineTestCase): + # Short window defined in order for test to run faster. START_DATE = pd.Timestamp('2014-12-28') END_DATE = pd.Timestamp('2015-02-03') @classmethod def make_loader(cls, events, columns): - pass + raise NotImplementedError('make_loader') @classmethod def init_class_fixtures(cls): - cls.sids = cls.events['sid'].unique() + cls.sids = cls.events[SID_FIELD_NAME].unique() cls.columns = { - Estimates.estimate: 'estimate', - Estimates.event_date: EVENT_DATE_FIELD_NAME, - Estimates.fiscal_quarter: FISCAL_QUARTER_FIELD_NAME, - Estimates.fiscal_year: FISCAL_YEAR_FIELD_NAME, + Estimates.event_date: 'event_date', + Estimates.fiscal_quarter: 'fiscal_quarter', + Estimates.fiscal_year: 'fiscal_year', + Estimates.estimate: 'estimate' } - cls.loader = cls.make_loader(cls.events, cls.columns) - + cls.loader = cls.make_loader(cls.events, {column.name: val for + column, val in + cls.columns.items()}) cls.ASSET_FINDER_EQUITY_SIDS = list( cls.events[SID_FIELD_NAME].unique() ) cls.ASSET_FINDER_EQUITY_SYMBOLS = [ 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS ] - super(EstimateTestCase, cls).init_class_fixtures() + super(WithEstimates, cls).init_class_fixtures() + + +class WithWrongNumQuarters(WithEstimates): + """ + ZiplineTestCase mixin providing cls.events as a class level fixture and + defining a test for all inheritors to use. + + Attributes + ---------- + events : pd.DataFrame + A simple DataFrame with columns needed for estimates and a single sid + and no other data. + + Tests + ------ + test_wrong_num_quarters_passed() + Tests that loading with an incorrect quarter number raises an error. + """ + events = pd.DataFrame({SID_FIELD_NAME: 0}, + columns=[SID_FIELD_NAME, + TS_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + 'estimate'], + index=[0]) + + def test_wrong_num_quarters_passed(self): + dataset = QuartersEstimates(-1) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + p = Pipeline({c.name: c.latest for c in dataset.columns}) - def _test_wrong_num_quarters_passed(self): with self.assertRaises(ValueError): - dataset = QuartersEstimates(-1) - engine = SimplePipelineEngine( - lambda x: self.loader, - self.trading_days, - self.asset_finder, - ) - engine.run_pipeline( - Pipeline({c.name: c.latest for c in dataset.columns}), + p, start_date=self.trading_days[0], end_date=self.trading_days[-1], ) -window_test_cases = [ - (window_len, start_idx, num_quarters_out) for - (window_len, start_idx), num_quarters_out in - itertools.product( - [[5, pd.Timestamp('2015-01-09').tz_localize('utc')], - [6, pd.Timestamp('2015-01-12').tz_localize('utc')], - [11, pd.Timestamp('2015-01-20').tz_localize('utc')], - [19, pd.Timestamp('2015-01-30').tz_localize('utc')], - [26, pd.Timestamp('2015-02-10').tz_localize('utc')]], - [1, 2, 3, 4]) -] - - -class NextEstimateWindowsTestCase(EstimateTestCase): - START_DATE = pd.Timestamp('2014-12-31') - END_DATE = pd.Timestamp('2015-02-15') - events = estimates_timeline - - @classmethod - def make_loader(cls, events, columns): - return NextQuartersEstimatesLoader(events, columns) - - @parameterized.expand(window_test_cases) - def test_next_estimate_windows_at_quarter_boundaries(self, - window_len, - start_idx, - num_quarters_out): - """ - Tests that we overwrite values with the correct quarter's estimate at - the correct dates. - """ - dataset = QuartersEstimates(num_quarters_out) - - class SomeFactor(CustomFactor): - inputs = [dataset.estimate] - window_length = window_len - - def compute(self, today, assets, out, *inputs): - unique_inputs = np.unique(inputs).tolist() - requested_quarter = None - if (pd.Timestamp('2015-02-10').tz_localize('utc') >= today >= - pd.Timestamp('2015-01-05').tz_localize('utc')): - next_quarter = estimates_timeline[ - estimates_timeline[EVENT_DATE_FIELD_NAME] >= today - ].min()[FISCAL_QUARTER_FIELD_NAME] - requested_quarter = next_quarter + num_quarters_out - 1 - - # If we know something about the requested quarter, assert - # that all our estimates in the window are about that quarter. - if requested_quarter and requested_quarter <= 4 and \ - requested_quarter != 3: - assert np.equal(unique_inputs, requested_quarter).all() - else: - # We don't have any information yet about the next quarter - # or about the requested quarter; in that case, all our - # estimates in the window should be NaN across time. - assert np.isnan(unique_inputs).all() - - engine = SimplePipelineEngine( - lambda x: self.loader, - self.trading_days, - self.asset_finder, - ) - engine.run_pipeline( - Pipeline({'est': SomeFactor()}), - start_date=start_idx, - end_date=self.trading_days[-1], - ) - - -class PreviousEstimateWindowsTestCase(EstimateTestCase): - START_DATE = pd.Timestamp('2014-12-31') - END_DATE = pd.Timestamp('2015-02-15') - events = estimates_timeline - +class PreviousWithWrongNumQuarters(WithWrongNumQuarters, + ZiplineTestCase): + """ + Tests that previous quarter loader correctly breaks if an incorrect + number of quarters is passed. + """ @classmethod def make_loader(cls, events, columns): return PreviousQuartersEstimatesLoader(events, columns) - @parameterized.expand(window_test_cases) - def test_previous_estimate_windows_at_quarter_boundaries(self, - window_len, - start_idx, - num_quarters_out): - """ - Tests that we overwrite values with the correct quarter's estimate at - the correct dates. - """ - dataset = QuartersEstimates(num_quarters_out) - - class SomeFactor(CustomFactor): - inputs = [dataset.estimate] - window_length = window_len - - def compute(self, today, assets, out, *inputs): - unique_inputs = np.unique(inputs).tolist() - requested_quarter = None - if today >= pd.Timestamp('2015-01-12').tz_localize('utc'): - previous_quarter = estimates_timeline[ - estimates_timeline[EVENT_DATE_FIELD_NAME] <= today - ].max()[FISCAL_QUARTER_FIELD_NAME] - requested_quarter = ( - previous_quarter - (num_quarters_out - 1) - ) - - # If we know something about the requested quarter, assert - # that all our estimates in the window are about that quarter. - if requested_quarter and requested_quarter >= 0 and \ - requested_quarter != 3: - assert np.equal(unique_inputs, requested_quarter).all() - else: - # We don't have any information yet about the previous - # quarter - # or about the requested quarter; in that case, all our - # estimates in the window should be NaN across time. - assert np.isnan(unique_inputs).all() - - engine = SimplePipelineEngine( - lambda x: self.loader, - self.trading_days, - self.asset_finder, - ) - engine.run_pipeline( - Pipeline({'est': SomeFactor()}), - start_date=start_idx, - end_date=self.trading_days[-1], - ) - - -class NextEstimateTestCase(EstimateTestCase): - events = gen_estimates() +class NextWithWrongNumQuarters(WithWrongNumQuarters, + ZiplineTestCase): + """ + Tests that next quarter loader correctly breaks if an incorrect + number of quarters is passed. + """ @classmethod def make_loader(cls, events, columns): return NextQuartersEstimatesLoader(events, columns) - def test_next_estimates(self): + +class WithEstimatesT0(WithEstimates): + """ + ZiplineTestCase mixin providing cls.events as a class level fixture and + defining a test for all inheritors to use. + + Attributes + ---------- + cls.events : pd.DataFrame + Generated dynamically in order to test inter-leavings of estimates and + event dates for multiple quarters to make sure that we select the + right immediate 'next' or 'previous' quarter relative to each date - + i.e., the right 't0' on the timeline. We care about selecting the + right 't0' because we use that to calculate which quarter's data needs + to be returned for each day. + + Methods + ------- + get_expected_estimate(q1_knowledge, + q2_knowledge, + comparable_date) -> pd.DataFrame + Retrieves the expected estimate given the latest knowledge about each + quarter and the date on which the estimate is being requested. If + there is no expected estimate, returns an empty DataFrame. + + Tests + ------ + test_estimates() + Tests that we get the right 't0' value on each day for each sid and + for each column. + """ + q1_knowledge_dates = [pd.Timestamp('2015-01-01'), + pd.Timestamp('2015-01-04'), + pd.Timestamp('2015-01-08'), + pd.Timestamp('2015-01-12')] + q2_knowledge_dates = [pd.Timestamp('2015-01-16'), + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-24'), + pd.Timestamp('2015-01-28')] + # We want to model the possibility of an estimate predicting a release date + # that doesn't match the actual release. This could be done by dynamically + # generating more combinations with different release dates, but that + # significantly increases the amount of time it takes to run the tests. + # These hard-coded cases are sufficient to know that we can update our + # beliefs when we get new information. + q1_release_dates = [pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-16')] # One day late + q2_release_dates = [pd.Timestamp('2015-01-30'), # One day early + pd.Timestamp('2015-01-31')] + + @classmethod + def gen_estimates(cls): """ - The goal of this test is to make sure that we select the right - datapoint as our 'next' w.r.t each date. + In order to determine which estimate we care about for a particular + sid, we need to look at all estimates that we have for that sid and + their associated event dates. + + We define q1 < q2, and thus event1 < event2 since event1 occurs + during q1 and event2 occurs during q2 and we assume that there can + only be 1 event per quarter. We assume that there can be multiple + estimates per quarter leading up to the event. We assume that estimates + will not surpass the relevant event date. We will look at 2 estimates + for an event before the event occurs, since that is the simplest + scenario that covers the interesting edge cases: + - estimate values changing + - a release date changing + - estimates for different quarters interleaving + + Thus, we generate all possible inter-leavings of 2 estimates per + quarter-event where estimate1 < estimate2 and all estimates are < the + relevant event and assign each of these inter-leavings to a + different sid. """ + + sid_estimates = [] + sid_releases = [] + # We want all permutations of 2 knowledge dates per quarter. + it = enumerate( + itertools.permutations(cls.q1_knowledge_dates + + cls.q2_knowledge_dates, + 4) + ) + for sid, (q1e1, q1e2, q2e1, q2e2) in it: + # We're assuming that estimates must come before the relevant + # release. + if (q1e1 < q1e2 and + q2e1 < q2e2 and + # All estimates are < Q2's event, so just constrain Q1 + # estimates. + q1e1 < cls.q1_release_dates[0] and + q1e2 < cls.q1_release_dates[0]): + sid_estimates.append(cls.create_estimates_df(q1e1, + q1e2, + q2e1, + q2e2, + sid)) + sid_releases.append(cls.create_releases_df(sid)) + + return pd.concat(sid_estimates + sid_releases).reset_index(drop=True) + + @classmethod + def create_releases_df(cls, sid): + # Final release dates never change. The quarters have very tight date + # ranges in order to reduce the number of dates we need to iterate + # through when testing. + return pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-31')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-31')], + 'estimate': [0.5, 0.8], + FISCAL_QUARTER_FIELD_NAME: [1.0, 2.0], + FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0], + SID_FIELD_NAME: sid + }) + + @classmethod + def create_estimates_df(cls, + q1e1, + q1e2, + q2e1, + q2e2, + sid): + return pd.DataFrame({ + EVENT_DATE_FIELD_NAME: cls.q1_release_dates + cls.q2_release_dates, + 'estimate': [.1, .2, .3, .4], + FISCAL_QUARTER_FIELD_NAME: [1.0, 1.0, 2.0, 2.0], + FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0, 2015.0, 2015.0], + TS_FIELD_NAME: [q1e1, q1e2, q2e1, q2e2], + SID_FIELD_NAME: sid, + }) + + @classmethod + def init_class_fixtures(cls): + # Must be generated before call to super since super uses `events`. + cls.events = cls.gen_estimates() + super(WithEstimatesT0, cls).init_class_fixtures() + + def get_expected_estimate(self, + q1_knowledge, + q2_knowledge, + comparable_date): + return pd.DataFrame() + + def test_estimates(self): dataset = QuartersEstimates(1) engine = SimplePipelineEngine( lambda x: self.loader, self.trading_days, self.asset_finder, ) - results = engine.run_pipeline( Pipeline({c.name: c.latest for c in dataset.columns}), start_date=self.trading_days[0], @@ -328,63 +328,179 @@ class NextEstimateTestCase(EstimateTestCase): ts_eligible_estimates = ts_sorted_estimates[ ts_sorted_estimates[TS_FIELD_NAME] <= comparable_date ] - expected_estimate = pd.DataFrame() + # If there are estimates we know about: if not ts_eligible_estimates.empty: + # Determine the last piece of information we know about + # for q1 and q2. This takes advantage of the fact that we + # only have 2 quarters in the test data. q1_knowledge = ts_eligible_estimates[ ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 1 ] q2_knowledge = ts_eligible_estimates[ ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 2 ] - - # If our latest knowledge of q1 is that the release is - # happening on this simulation date or later, then that's - # the estimate we want to use. - if (not q1_knowledge.empty and - q1_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] >= - comparable_date): - expected_estimate = q1_knowledge.iloc[-1] - # If q1 has already happened or we don't know about it - # yet and our latest knowledge indicates that q2 hasn't - # happend yet, then that's the estimate we want to use. - elif (not q2_knowledge.empty and - q2_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] >= - comparable_date): - expected_estimate = q2_knowledge.iloc[-1] - if not expected_estimate.empty: - for colname in sid_estimates.columns: - expected_value = expected_estimate[colname] - computed_value = sid_estimates.iloc[i][colname] - assert_equal(expected_value, computed_value) + expected_estimate = self.get_expected_estimate( + q1_knowledge, + q2_knowledge, + comparable_date, + ) + if not expected_estimate.empty: + for colname in sid_estimates.columns: + expected_value = expected_estimate[colname] + computed_value = sid_estimates.iloc[i][colname] + assert_equal(expected_value, computed_value) + else: + # There are no eligible 'next' estimates on this day; + # everything should be null. + assert_true(sid_estimates.iloc[i].isnull().all()) else: - assert sid_estimates.iloc[i].isnull().all() - - def test_wrong_num_quarters_passed(self): - self._test_wrong_num_quarters_passed() + # We don't know about any estimates on this day; + # everything should be null. + assert_true(sid_estimates.iloc[i].isnull().all()) -class NextEstimateMultipleQuartersTestCase(EstimateTestCase): - events = pd.DataFrame({ - SID_FIELD_NAME: [0] * 2, - TS_FIELD_NAME: [pd.Timestamp('2015-01-01'), - pd.Timestamp('2015-01-06')], - EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-10'), - pd.Timestamp('2015-01-20')], - 'estimate': [1., 2.], - FISCAL_QUARTER_FIELD_NAME: [1, 2], - FISCAL_YEAR_FIELD_NAME: [2015, 2015] - }) - +class NextEstimate(WithEstimatesT0, ZiplineTestCase): @classmethod def make_loader(cls, events, columns): return NextQuartersEstimatesLoader(events, columns) + def get_expected_estimate(self, + q1_knowledge, + q2_knowledge, + comparable_date): + # If our latest knowledge of q1 is that the release is + # happening on this simulation date or later, then that's + # the estimate we want to use. + if (not q1_knowledge.empty and + q1_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] >= + comparable_date): + return q1_knowledge.iloc[-1] + # If q1 has already happened or we don't know about it + # yet and our latest knowledge indicates that q2 hasn't + # happened yet, then that's the estimate we want to use. + elif (not q2_knowledge.empty and + q2_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] >= + comparable_date): + return q2_knowledge.iloc[-1] + return pd.DataFrame() + + +class BlazeNextEstimateLoaderTestCase(NextEstimate): + """ + Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader. + """ + + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader( + bz.data(events), + columns, + ) + + +class PreviousEstimate(WithEstimatesT0, ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return PreviousQuartersEstimatesLoader(events, columns) + + def get_expected_estimate(self, + q1_knowledge, + q2_knowledge, + comparable_date): + + # The expected estimate will be for q2 if the last thing + # we've seen is that the release date already happened. + # Otherwise, it'll be for q1, as long as the release date + # for q1 has already happened. + if (not q2_knowledge.empty and + q2_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] <= + comparable_date): + return q2_knowledge.iloc[-1] + elif (not q1_knowledge.empty and + q1_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] <= + comparable_date): + return q1_knowledge.iloc[-1] + return pd.DataFrame() + + +class BlazePreviousEstimateLoaderTestCase(PreviousEstimate): + """ + Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader. + """ + + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader( + bz.data(events), + columns, + ) + + +class WithEstimateMultipleQuarters(WithEstimates): + """ + ZiplineTestCase mixin providing cls.events, cls.make_expected_out as + class-level fixtures and self.test_multiple_qtrs_requested as a test. + + Attributes + ---------- + events : pd.DataFrame + Simple DataFrame with estimates for 2 quarters for a single sid. + + Methods + ------- + make_expected_out() --> pd.DataFrame + Returns the DataFrame that is expected as a result of running a + Pipeline where estimates are requested for multiple quarters out. + fill_expected_out(expected) + Fills the expected DataFrame with data. + + Tests + ------ + test_multiple_qtrs_requested() + Runs a Pipeline that calculate which estimates for multiple quarters + out and checks that the returned columns contain data for the correct + number of quarters out. + """ + events = pd.DataFrame({ + SID_FIELD_NAME: [0] * 2, + TS_FIELD_NAME: [pd.Timestamp('2015-01-01'), + pd.Timestamp('2015-01-06')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-10'), + pd.Timestamp('2015-01-20')], + 'estimate': [1., 2.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: [2015, 2015] + }) + + @classmethod + def init_class_fixtures(cls): + super(WithEstimateMultipleQuarters, cls).init_class_fixtures() + cls.expected_out = cls.make_expected_out() + + @classmethod + def make_expected_out(cls): + expected = pd.DataFrame(columns=[cls.columns[col] + '1' + for col in cls.columns] + + [cls.columns[col] + '2' + for col in cls.columns], + index=cls.trading_days) + + for (col, raw_name), suffix in itertools.product( + cls.columns.items(), ('1', '2') + ): + expected_name = raw_name + suffix + if col.dtype == datetime64ns_dtype: + expected[expected_name] = pd.to_datetime( + expected[expected_name] + ) + else: + expected[expected_name] = expected[ + expected_name + ].astype(col.dtype) + cls.fill_expected_out(expected) + return expected.reindex(cls.trading_days) + def test_multiple_qtrs_requested(self): - """ - This test asks for datasets that calculate which estimates to - return for multiple quarters out and checks that the returned columns - contain data for the correct number of quarters out. - """ dataset1 = QuartersEstimates(1) dataset2 = QuartersEstimates(2) engine = SimplePipelineEngine( @@ -406,202 +522,363 @@ class NextEstimateMultipleQuartersTestCase(EstimateTestCase): # We now expect a column for 1 quarter out and a column for 2 # quarters out for each of the dataset columns. - assert np.array_equal(sorted(np.array(q1_columns + q2_columns)), - sorted(results.columns.values)) - - def check_null_range(start_date, stop_date, col_name): - # Make sure that values in the given column/range are all null. - assert ( - results.loc[ - start_date:stop_date - ][col_name].isnull() - ).all() - - def check_values(start_date, end_date, col_name, qtr, event_idx): - # Make sure that values in the given column/range are all equal - # to the value at the given index from the raw data. - assert ( - results.loc[ - start_date:end_date - ][col_name + qtr] == - self.events[col_name][event_idx] - ).all() - - # Although it's painful to check the ranges one by one for different - # columns, it's important to do this so that we have a clear - # understanding of how knowledge/event dates interact and give us - # values for 1Q out and 2Q out. - for col in self.columns: - # 1Q out cols - check_null_range(self.START_DATE, - pd.Timestamp('2014-12-31'), - col.name + '1') - check_values(pd.Timestamp('2015-01-02'), - pd.Timestamp('2015-01-10'), - col.name, - '1', - 0) # First event is our 1Q out - check_values(pd.Timestamp('2015-01-11'), - pd.Timestamp('2015-01-20'), - col.name, - '1', - 1) # Second event becomes our 1Q out - check_null_range(pd.Timestamp('2015-01-21'), - self.END_DATE, - col.name + '1') - - # Fiscal year and quarter are different for 2Q out because even when we - # have no data for 2Q out, we still know which fiscal year/quarter we - # want data for as long as we have data for 1Q out. - for col in set(self.columns.keys()) - {Estimates.fiscal_year, - Estimates.fiscal_quarter}: - # 2Q out cols - check_null_range(self.START_DATE, - pd.Timestamp('2015-01-05'), - col.name + '2') - # We have data for 2Q out when our knowledge of - # the next quarter and the quarter after that - # overlaps and before the next quarter's event - # happens. - check_values(pd.Timestamp('2015-01-06'), - pd.Timestamp('2015-01-10'), - col.name, - '2', - 1) - check_null_range(pd.Timestamp('2015-01-11'), - self.END_DATE, - col.name + '2') - - # Check fiscal year/quarter for 2Q out. - check_null_range(self.START_DATE, - pd.Timestamp('2015-01-01'), - Estimates.fiscal_quarter.name + '2') - check_null_range(self.START_DATE, - pd.Timestamp('2015-01-01'), - Estimates.fiscal_year.name + '2') - # We have a different quarter number than the quarter numbers we have - # in our data for 2Q out, so assert manually. - assert ( - results.loc[ - pd.Timestamp('2015-01-02'):pd.Timestamp('2015-01-10') - ][Estimates.fiscal_quarter.name + '2'] == - 2 - ).all() - assert ( - results.loc[ - pd.Timestamp('2015-01-10'):pd.Timestamp('2015-01-20') - ][Estimates.fiscal_quarter.name + '2'] == - 3 - ).all() - # We have the same fiscal year, 2-15, for 2Q out over the date range of - # interest. - check_values(pd.Timestamp('2015-01-02'), - pd.Timestamp('2015-01-20'), - Estimates.fiscal_year.name, - '2', - 1) - check_null_range(pd.Timestamp('2015-01-21'), - self.END_DATE, - Estimates.fiscal_quarter.name + '2') - check_null_range(pd.Timestamp('2015-01-21'), - self.END_DATE, - Estimates.fiscal_year.name + '2') + assert_equal(sorted(np.array(q1_columns + q2_columns)), + sorted(results.columns.values)) + assert_frame_equal(self.expected_out.sort(axis=1), + results.xs(0, level=1).sort(axis=1)) -class BlazeNextEstimateLoaderTestCase(NextEstimateTestCase): - """ - Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader. - """ - +class NextEstimateMultipleQuarters( + WithEstimateMultipleQuarters, ZiplineTestCase +): @classmethod def make_loader(cls, events, columns): - return BlazeNextEstimatesLoader( - bz.data(events), - columns, - ) + return NextQuartersEstimatesLoader(events, columns) + + @classmethod + def fill_expected_out(cls, expected): + # Fill columns for 1 Q out + for raw_name in cls.columns.values(): + expected[raw_name + '1'].loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-11') + ] = cls.events[raw_name].iloc[0] + expected[raw_name + '1'].loc[ + pd.Timestamp('2015-01-11'):pd.Timestamp('2015-01-20') + ] = cls.events[raw_name].iloc[1] + + # Fill columns for 2 Q out + # We only have an estimate and event date for 2 quarters out before + # Q1's event happens; after Q1's event, we know 1 Q out but not 2 Qs + # out. + for col_name in ['estimate', 'event_date']: + expected[col_name + '2'].loc[ + pd.Timestamp('2015-01-06'):pd.Timestamp('2015-01-10') + ] = cls.events[col_name].iloc[1] + # But we know what FQ and FY we'd need in both Q1 and Q2 + # because we know which FQ is next and can calculate from there + expected[FISCAL_QUARTER_FIELD_NAME + '2'].loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-09') + ] = 2 + expected[FISCAL_QUARTER_FIELD_NAME + '2'].loc[ + pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20') + ] = 3 + expected[FISCAL_YEAR_FIELD_NAME + '2'].loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-20') + ] = 2015 + + return expected -class PreviousEstimateTestCase(EstimateTestCase): - events = gen_estimates() +class PreviousEstimateMultipleQuarters( + WithEstimateMultipleQuarters, + ZiplineTestCase +): @classmethod def make_loader(cls, events, columns): return PreviousQuartersEstimatesLoader(events, columns) - def test_previous_estimates(self): + @classmethod + def fill_expected_out(cls, expected): + # Fill columns for 1 Q out + for raw_name in cls.columns.values(): + expected[raw_name + '1'].loc[ + pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-19') + ] = cls.events[raw_name].iloc[0] + expected[raw_name + '1'].loc[ + pd.Timestamp('2015-01-20'): + ] = cls.events[raw_name].iloc[1] + + # Fill columns for 2 Q out + for col_name in ['estimate', 'event_date']: + expected[col_name + '2'].loc[ + pd.Timestamp('2015-01-20'): + ] = cls.events[col_name].iloc[0] + expected[ + FISCAL_QUARTER_FIELD_NAME + '2' + ].loc[pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20')] = 4 + expected[ + FISCAL_YEAR_FIELD_NAME + '2' + ].loc[pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20')] = 2014 + expected[ + FISCAL_QUARTER_FIELD_NAME + '2' + ].loc[pd.Timestamp('2015-01-20'):] = 1 + expected[ + FISCAL_YEAR_FIELD_NAME + '2' + ].loc[pd.Timestamp('2015-01-20'):] = 2015 + return expected + + +class WithEstimateWindows(WithEstimates): + """ + ZiplineTestCase mixin providing fixures and a test to test running a + Pipeline with an estimates loader over differently-sized windows. + + Attributes + ---------- + events : pd.DataFrame + DataFrame with estimates for 2 quarters for 2 sids. + window_test_start_date : pd.Timestamp + The date from which the window should start. + timelines : dict[int -> pd.DataFrame] + A dictionary mapping to the number of quarters out to + snapshots of how the data should look on each date in the date range. + + Methods + ------- + make_expected_timelines() -> dict[int -> pd.DataFrame] + Creates a dictionary of expected data. See `timelines`, above. + + Tests + ----- + test_estimate_windows_at_quarter_boundaries() + Tests that we overwrite values with the correct quarter's estimate at + the correct dates when we have a factor that asks for a window of data. + """ + sid_0_timeline = pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), + pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-05'), + pd.Timestamp('2015-01-17')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-10'), + pd.Timestamp('2015-01-10'), + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-20')], + 'estimate': [10., 11.] + [20., 21.], + FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 0, + }) + + sid_1_timeline = pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-15')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-12'), pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-15')], + 'estimate': [10., 11.] + [30., 31.], + FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [3] * 2, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 1 + }) + + window_test_start_date = pd.Timestamp('2015-01-05') + critical_dates = [pd.Timestamp('2015-01-09', tz='utc'), + pd.Timestamp('2015-01-12', tz='utc'), + pd.Timestamp('2015-01-15', tz='utc'), + pd.Timestamp('2015-01-20', tz='utc')] + # window length, starting date, num quarters out, timeline. Parameterizes + # over number of quarters out. + window_test_cases = list(itertools.product(critical_dates, (1, 2))) + events = pd.concat([sid_0_timeline, sid_1_timeline]) + + @classmethod + def make_expected_timelines(cls): + return {} + + @classmethod + def init_class_fixtures(cls): + super(WithEstimateWindows, cls).init_class_fixtures() + cls.timelines = cls.make_expected_timelines() + + @classmethod + def create_expected_df(cls, tuples, end_date): """ - The goal of this test is to make sure that we select the right - datapoint as our 'previous' w.r.t each date. + Given a list of tuples of new data we get for each sid on each critical + date (when information changes), create a DataFrame that fills that + data through a date range ending at `end_date`. """ - dataset = QuartersEstimates(1) + df = pd.DataFrame(tuples, + columns=[SID_FIELD_NAME, + 'estimate', + 'knowledge_date']) + df = df.pivot_table(columns='sid', + values='estimate', + index='knowledge_date') + df = df.reindex( + pd.date_range(cls.window_test_start_date, end_date) + ) + # Index name is lost during reindex. + df.index = df.index.rename('knowledge_date') + df['at_date'] = end_date.tz_localize('utc') + df = df.set_index(['at_date', df.index.tz_localize('utc')]).ffill() + return df + + @parameterized.expand(window_test_cases) + def test_estimate_windows_at_quarter_boundaries(self, + start_idx, + num_quarters_out): + dataset = QuartersEstimates(num_quarters_out) + trading_days = self.trading_days + timelines = self.timelines + # The window length should be from the starting index back to the first + # date on which we got data. The goal is to ensure that as we + # progress through the timeline, all data we got, starting from that + # first date, is correctly overwritten. + window_len = ( + self.trading_days.get_loc(start_idx) - + self.trading_days.get_loc(self.window_test_start_date) + 1 + ) + + class SomeFactor(CustomFactor): + inputs = [dataset.estimate] + window_length = window_len + + def compute(self, today, assets, out, estimate): + today_idx = trading_days.get_loc(today) + today_timeline = timelines[ + num_quarters_out + ].loc[today].reindex( + trading_days[:today_idx + 1] + ).values + timeline_start_idx = (len(today_timeline) - window_len) + assert_equal(estimate, + today_timeline[timeline_start_idx:]) engine = SimplePipelineEngine( lambda x: self.loader, self.trading_days, self.asset_finder, ) - - results = engine.run_pipeline( - Pipeline({c.name: c.latest for c in dataset.columns}), - start_date=self.trading_days[0], - end_date=self.trading_days[-1], + engine.run_pipeline( + Pipeline({'est': SomeFactor()}), + start_date=start_idx, + end_date=pd.Timestamp('2015-01-20', tz='utc'), # last event date + # we have ) - for sid in self.sids: - sid_estimates = results.xs(sid, level=1) - ts_sorted_estimates = self.events[ - self.events[SID_FIELD_NAME] == sid - ].sort(TS_FIELD_NAME) - for i, date in enumerate(sid_estimates.index): - comparable_date = date.tz_localize(None) - # Filter out estimates we don't know about yet. - ts_eligible_estimates = ts_sorted_estimates[ - ts_sorted_estimates[TS_FIELD_NAME] <= comparable_date - ] - expected_estimate = pd.DataFrame() - if not ts_eligible_estimates.empty: - # Determine the last piece of information we know about - # for q1 and q2. This takes advantage of the fact that we - # only have 2 quarters in the test data. - q1_knowledge = ts_eligible_estimates[ - ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 1 - ] - q2_knowledge = ts_eligible_estimates[ - ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 2 - ] - # The expected estimate will be for q2 if the last thing - # we've seen is that the release date already happened. - # Otherwise, it'll be for q1, as long as the release date - # for q1 has already happened. - if (not q2_knowledge.empty and - q2_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] <= - comparable_date): - expected_estimate = q2_knowledge.iloc[-1] - elif (not q1_knowledge.empty and - q1_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] <= - comparable_date): - expected_estimate = q1_knowledge.iloc[-1] - if not expected_estimate.empty: - for colname in sid_estimates.columns: - expected_value = expected_estimate[colname] - computed_value = sid_estimates.iloc[i][colname] - assert_equal(expected_value, computed_value) - else: - assert sid_estimates.iloc[i].isnull().all() - - def test_wrong_num_quarters_passed(self): - self._test_wrong_num_quarters_passed() -class BlazePreviousEstimateLoaderTestCase(PreviousEstimateTestCase): - """ - Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader. - """ - +class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_loader(cls, events, columns): - return BlazePreviousEstimatesLoader( - bz.data(events), - columns, + return PreviousQuartersEstimatesLoader(events, columns) + + @classmethod + def make_expected_timelines(cls): + oneq_previous = pd.concat([ + cls.create_expected_df( + [(0, np.NaN, cls.window_test_start_date), + (1, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-09') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 11, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-12') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 11, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-13') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 11, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-14') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 31, pd.Timestamp('2015-01-15'))], + pd.Timestamp('2015-01-15') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 31, pd.Timestamp('2015-01-15'))], + pd.Timestamp('2015-01-16') + ), + cls.create_expected_df( + [(0, 21, pd.Timestamp('2015-01-17')), + (1, 31, pd.Timestamp('2015-01-15'))], + pd.Timestamp('2015-01-20') + ), + ]) + + twoq_previous = pd.concat( + [cls.create_expected_df( + [(0, np.NaN, cls.window_test_start_date), + (1, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-09', '2015-01-19')] + + [cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-20')), + (1, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-20') + )] ) + return { + 1: oneq_previous, + 2: twoq_previous + } + + +class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return NextQuartersEstimatesLoader(events, columns) + + @classmethod + def make_expected_timelines(cls): + oneq_next = pd.concat([ + cls.create_expected_df( + [(0, 10, cls.window_test_start_date), + (0, 11, pd.Timestamp('2015-01-07')), + (1, 10, pd.Timestamp('2015-01-09'))], + pd.Timestamp('2015-01-09') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, 10, pd.Timestamp('2015-01-09')), + (1, 11, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-12') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, 30, pd.Timestamp('2015-01-09'))], + pd.Timestamp('2015-01-13') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, 30, pd.Timestamp('2015-01-09'))], + pd.Timestamp('2015-01-14') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, 30, pd.Timestamp('2015-01-09')), + (1, 31, pd.Timestamp('2015-01-15'))], + pd.Timestamp('2015-01-15') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-16') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (0, 21, pd.Timestamp('2015-01-17')), + (1, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-20') + ), + ]) + + twoq_next = pd.concat( + [cls.create_expected_df( + [(0, 20, pd.Timestamp(cls.window_test_start_date)), + (1, np.NaN, pd.Timestamp(cls.window_test_start_date))], + pd.Timestamp('2015-01-09') + )] + + [cls.create_expected_df( + [(0, np.NaN, pd.Timestamp(cls.window_test_start_date)), + (1, np.NaN, pd.Timestamp(cls.window_test_start_date))], + end_date + ) for end_date in pd.date_range('2015-01-12', '2015-01-20')] + ) + + return { + 1: oneq_next, + 2: twoq_next + } class QuarterShiftTestCase(ZiplineTestCase): @@ -610,12 +887,12 @@ class QuarterShiftTestCase(ZiplineTestCase): backwards/forwards from a starting point. """ def test_quarter_normalization(self): - input_yrs = pd.Series([0] * 4) - input_qtrs = pd.Series(range(1, 5)) + input_yrs = pd.Series([0] * 4, dtype=np.int64) + input_qtrs = pd.Series(range(1, 5), dtype=np.int64) result_years, result_quarters = split_normalized_quarters( normalize_quarters(input_yrs, input_qtrs) ) # Can't use assert_series_equal here with check_names=False # because that still fails due to name differences. - assert input_yrs.equals(result_years) - assert input_qtrs.equals(result_quarters) + assert_equal(input_yrs, result_years) + assert_equal(input_qtrs, result_quarters) diff --git a/zipline/lib/adjustment.pyx b/zipline/lib/adjustment.pyx index 8839ece9..aea8df5c 100644 --- a/zipline/lib/adjustment.pyx +++ b/zipline/lib/adjustment.pyx @@ -3,7 +3,7 @@ from cpython cimport Py_EQ from pandas import isnull, Timestamp from numpy cimport float64_t, uint8_t, int64_t -from numpy import asarray, datetime64, float64 +from numpy import asarray, datetime64, float64, int64 # Purely for readability. There aren't C-level declarations for these types. ctypedef object Int64Index_t ctypedef object DatetimeIndex_t @@ -451,28 +451,32 @@ cdef class Datetime641DArrayOverwrite(ArrayAdjustment): Example ------- - >>> import numpy as np - >>> arr = np.arange(25, dtype=float).reshape(5, 5) - >>> arr - array([[ 0., 1., 2., 3., 4.], - [ 5., 6., 7., 8., 9.], - [ 10., 11., 12., 13., 14.], - [ 15., 16., 17., 18., 19.], - [ 20., 21., 22., 23., 24.]]) + >>> import numpy as np; import pandas as pd + >>> dts = pd.date_range('2014', freq='D', periods=9, tz='UTC') + >>> arr = dts.values.reshape(3, 3) + >>> arr == np.datetime64(0, 'ns') + array([[False, False, False], + [False, False, False], + [False, False, False]], dtype=bool) >>> adj = Datetime641DArrayOverwrite( - ... row_start=0, - ... row_end=3, - ... column_start=0, - ... column_end=0, - ... values=np.array([1, 2, 3, 4]), - ) - >>> adj.mutate(arr) - >>> arr - array([[ 1., 1., 2., 3., 4.], - [ 2., 6., 7., 8., 9.], - [ 3., 11., 12., 13., 14.], - [ 4., 16., 17., 18., 19.], - [ 20., 21., 22., 23., 24.]]) + ... first_row=1, + ... last_row=2, + ... first_col=1, + ... last_col=2, + ... values=np.array([ + ... np.datetime64(0, 'ns'), + ... np.datetime64(1, 'ns') + ... ]) + ... ) + >>> adj.mutate(arr.view(np.int64)) + >>> arr == np.datetime64(0, 'ns') + array([[False, False, False], + [False, True, True], + [False, False, False]], dtype=bool) + >>> arr == np.datetime64(1, 'ns') + array([[False, False, False], + [False, False, False], + [False, True, True]], dtype=bool) """ cdef: readonly int64_t[:] values @@ -598,7 +602,7 @@ cdef datetime_to_int(object datetimelike): datetimelike.dtype.name, ) - return datetimelike.astype(int) + return datetimelike.astype(int64) cdef class Datetime64Adjustment(_Int64Adjustment): diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 24ce3e26..5264a954 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -1096,12 +1096,15 @@ class BlazeLoader(dict): sparse_deltas = last_in_date_group(non_novel_deltas, dates, assets, - reindex=False) + reindex=False, + have_sids=have_sids) dense_output = last_in_date_group(sparse_output, dates, assets, - reindex=True) - ffill_across_cols(dense_output, columns) + reindex=True, + have_sids=have_sids) + ffill_across_cols(dense_output, columns, {c.name: c.name + for c in columns}) if have_sids: adjustments_from_deltas = adjustments_from_deltas_with_sids column_view = identity diff --git a/zipline/pipeline/loaders/blaze/estimates.py b/zipline/pipeline/loaders/blaze/estimates.py index 81554b0d..864b05a7 100644 --- a/zipline/pipeline/loaders/blaze/estimates.py +++ b/zipline/pipeline/loaders/blaze/estimates.py @@ -25,6 +25,8 @@ class BlazeEstimatesLoader(PipelineLoader): ---------- expr : Expr The expression representing the data to load. + columns : dict[str -> str] + A dict mapping BoundColumn names to the associated names in `expr`. resources : dict, optional Mapping from the loadable terms of ``expr`` to actual data resources. odo_kwargs : dict, optional @@ -33,8 +35,6 @@ class BlazeEstimatesLoader(PipelineLoader): The time to use for the data query cutoff. data_query_tz : tzinfo or str The timezeone to use for the data query cutoff. - dataset : DataSet - The DataSet object for which this loader loads data. Notes ----- @@ -43,12 +43,14 @@ class BlazeEstimatesLoader(PipelineLoader): Dim * {{ {SID_FIELD_NAME}: int64, {TS_FIELD_NAME}: datetime, + {FISCAL_YEAR_FIELD_NAME}: float64, + {FISCAL_QUARTER_FIELD_NAME}: float64, + {EVENT_DATE_FIELD_NAME}: datetime, }} And other dataset-specific fields, where each row of the table is a record including the sid to identify the company, the timestamp where we - learned about the announcement, and the date when the earnings will be - announced. + learned about the announcement, and the date of the event. If the '{TS_FIELD_NAME}' field is not included it is assumed that we start the backtest with knowledge of all announcements. diff --git a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py index 39e5dac6..56c050f2 100644 --- a/zipline/pipeline/loaders/blaze/events.py +++ b/zipline/pipeline/loaders/blaze/events.py @@ -24,6 +24,10 @@ class BlazeEventsLoader(PipelineLoader): ---------- expr : Expr The expression representing the data to load. + next_value_columns : dict[BoundColumn -> raw column name] + A dict mapping 'next' BoundColumns to their column names in `expr`. + previous_value_columns : dict[BoundColumn -> raw column name] + A dict mapping 'previous' BoundColumns to their column names in `expr`. resources : dict, optional Mapping from the loadable terms of ``expr`` to actual data resources. odo_kwargs : dict, optional @@ -32,8 +36,6 @@ class BlazeEventsLoader(PipelineLoader): The time to use for the data query cutoff. data_query_tz : tzinfo or str The timezone to use for the data query cutoff. - dataset : DataSet - The DataSet object for which this loader loads data. Notes ----- @@ -42,12 +44,12 @@ class BlazeEventsLoader(PipelineLoader): Dim * {{ {SID_FIELD_NAME}: int64, {TS_FIELD_NAME}: datetime, + {EVENT_DATE_FIELD_NAME}: datetime, }} And other dataset-specific fields, where each row of the table is a record including the sid to identify the company, the timestamp where we - learned about the announcement, and the date when the earnings will be z - announced. + learned about the announcement, and the event date. If the '{TS_FIELD_NAME}' field is not included it is assumed that we start the backtest with knowledge of all announcements. @@ -84,8 +86,12 @@ class BlazeEventsLoader(PipelineLoader): self._data_query_tz = data_query_tz def load_adjusted_array(self, columns, dates, assets, mask): - raw = load_raw_data(assets, dates, self._data_query_time, - self._data_query_tz, self._expr, self._odo_kwargs) + raw = load_raw_data(assets, + dates, + self._data_query_time, + self._data_query_tz, + self._expr, + self._odo_kwargs) return EventsLoader( events=raw, diff --git a/zipline/pipeline/loaders/blaze/utils.py b/zipline/pipeline/loaders/blaze/utils.py index 6455f76c..963d9f9a 100644 --- a/zipline/pipeline/loaders/blaze/utils.py +++ b/zipline/pipeline/loaders/blaze/utils.py @@ -6,7 +6,11 @@ from zipline.pipeline.loaders.utils import ( ) -def load_raw_data(assets, dates, data_query_time, data_query_tz, expr, +def load_raw_data(assets, + dates, + data_query_time, + data_query_tz, + expr, odo_kwargs): """ given an expression representing data to load, perform normalization and @@ -25,13 +29,14 @@ def load_raw_data(assets, dates, data_query_time, data_query_tz, expr, `time`. expr : expr the expression representing the data to load. - odo_kwargs : dict, optional + odo_kwargs : dict extra keyword arguments to pass to odo when executing the expression. returns ------- raw : pd.dataframe - the data symbolized by `expr` materialized in a dataframe. + The result of computing expr and materializing the result as a + dataframe. """ lower_dt, upper_dt = normalize_data_query_bounds( dates[0], @@ -45,7 +50,7 @@ def load_raw_data(assets, dates, data_query_time, data_query_tz, expr, upper_dt, odo_kwargs, ) - sids = raw.loc[:, SID_FIELD_NAME] + sids = raw[SID_FIELD_NAME] raw.drop( sids[~sids.isin(assets)].index, inplace=True diff --git a/zipline/pipeline/loaders/quarter_estimates.py b/zipline/pipeline/loaders/quarter_estimates.py index 6ef8ae7c..54686a6e 100644 --- a/zipline/pipeline/loaders/quarter_estimates.py +++ b/zipline/pipeline/loaders/quarter_estimates.py @@ -1,9 +1,8 @@ from abc import abstractmethod -from collections import defaultdict import numpy as np -import pandas as pd from six import viewvalues from toolz import groupby + from zipline.lib.adjusted_array import AdjustedArray from zipline.lib.adjustment import (Datetime641DArrayOverwrite, Float641DArrayOverwrite) @@ -22,14 +21,15 @@ from zipline.pipeline.loaders.utils import ( last_in_date_group ) -NORMALIZED_QUARTERS = 'normalized_quarters' - -SHIFTED_NORMALIZED_QTRS = 'shifted_normalized_quarters' +INVALID_NUM_QTRS_MESSAGE = "Passed invalid number of quarters %s; " \ + "must pass a number of quarters >= 0" NEXT_FISCAL_QUARTER = 'next_fiscal_quarter' NEXT_FISCAL_YEAR = 'next_fiscal_year' +NORMALIZED_QUARTERS = 'normalized_quarters' PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter' PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year' +SHIFTED_NORMALIZED_QTRS = 'shifted_normalized_quarters' SIMULTATION_DATES = 'dates' @@ -86,10 +86,10 @@ def validate_column_specs(events, columns): class QuarterEstimatesLoader(PipelineLoader): def __init__(self, estimates, - base_column_name_map): + name_map): validate_column_specs( estimates, - base_column_name_map + name_map ) self.estimates = estimates[ @@ -97,12 +97,16 @@ class QuarterEstimatesLoader(PipelineLoader): estimates[FISCAL_QUARTER_FIELD_NAME].notnull() & estimates[FISCAL_YEAR_FIELD_NAME].notnull() ] + self.estimates[NORMALIZED_QUARTERS] = normalize_quarters( + self.estimates[FISCAL_YEAR_FIELD_NAME], + self.estimates[FISCAL_QUARTER_FIELD_NAME], + ) - self.base_column_name_map = base_column_name_map + self.name_map = name_map @abstractmethod def load_quarters(self, num_quarters, last, dates): - pass + raise NotImplementedError('load_quarters') def get_requested_data_for_col(self, stacked_last_per_qtr, idx, dates): """ @@ -111,8 +115,8 @@ class QuarterEstimatesLoader(PipelineLoader): Parameters ---------- stacked_last_per_qtr : pd.DataFrame - The latest estimate known per sid per date per quarter with the - dates, normalized quarter, and sid as the index. + The latest estimate known with the dates, normalized quarter, and + sid as the index. idx : pd.MultiIndex The index of the row of the requested quarter from each date for each sid. @@ -122,16 +126,18 @@ class QuarterEstimatesLoader(PipelineLoader): Returns -------- requested_qtr_data : pd.DataFrame - The DataFrame with final values for the requested quarter for all - columns; `dates` are the index and columns are a MultiIndex with - sids at the top level and the dataset columns on the bottom. + The DataFrame with the latest values for the requested quarter + for all columns; `dates` are the index and columns are a MultiIndex + with sids at the top level and the dataset columns on the bottom. """ requested_qtr_data = stacked_last_per_qtr.loc[idx] - # We no longer need this in the index, but we do need it as a column - # to calculate adjustments. + # We no longer need the shifted normalized quarters in the index, but + # we do need it as a column to calculate adjustments. requested_qtr_data = requested_qtr_data.reset_index( SHIFTED_NORMALIZED_QTRS ) + # Calculate the actual year/quarter being requested and add those in + # as columns. (requested_qtr_data[FISCAL_YEAR_FIELD_NAME], requested_qtr_data[FISCAL_QUARTER_FIELD_NAME]) = \ split_normalized_quarters( @@ -154,8 +160,7 @@ class QuarterEstimatesLoader(PipelineLoader): column_name, column, mask, - assets, - qtr_crossover_point): + assets): """ Creates an AdjustedArray from the given estimates data for the given dates. @@ -183,18 +188,17 @@ class QuarterEstimatesLoader(PipelineLoader): computed. column : BoundColumn The column for which the AdjustedArray is being computed. - mask : - assets : - qtr_crossover_point : - Whether we should use the 'right' or 'left' side when doing - searchsorted on the dates for quarter boundaries. + mask : np.array + Mask array of dimensions len(dates) X len(assets). + assets : pd.Int64Index + An index of all the assets from the raw data. Returns ------- adjusted_array : AdjustedArray The array of data and overwrites for the given column. """ - adjustments = defaultdict(list) + adjustments = {} requested_qtr_data = self.get_requested_data_for_col( stacked_last_per_qtr, requested_qtr_idx, dates ) @@ -204,10 +208,8 @@ class QuarterEstimatesLoader(PipelineLoader): zero_qtr_data = zero_qtr_data.reset_index(NORMALIZED_QUARTERS) if column.dtype == datetime64ns_dtype: overwrite = Datetime641DArrayOverwrite - missing_value = np.datetime64('NaT', 'ns') else: overwrite = Float641DArrayOverwrite - missing_value = np.NaN for sid_idx, sid in enumerate(assets): zero_qtr_sid_data = zero_qtr_data[ zero_qtr_data.index.get_level_values(SID_FIELD_NAME) == sid @@ -225,7 +227,7 @@ class QuarterEstimatesLoader(PipelineLoader): ] # For the given sid, determine which quarters we have estimates # for. - quarters_with_estimates_for_sid = last_per_qtr.xs( + qtrs_with_estimates_for_sid = last_per_qtr.xs( sid, axis=1, level=SID_FIELD_NAME ).groupby(axis=1, level=1).first().columns.values for row_indexer in list(qtr_shifts.index): @@ -233,108 +235,162 @@ class QuarterEstimatesLoader(PipelineLoader): # after this row. This isn't the starting index of the # requested quarter, but simply the date we cross over into a # new quarter. - qtr_start_idx = dates.searchsorted( + next_qtr_start_idx = dates.searchsorted( zero_qtr_data.loc[ row_indexer ][EVENT_DATE_FIELD_NAME], - side=qtr_crossover_point + side='left' + if isinstance(self, PreviousQuartersEstimatesLoader) + else 'right' ) - - # Only add adjustments if the next quarter starts somewhere in - # our date index for this sid. Our 'next' quarter can never - # start at index 0; a starting index of 0 means that the next - # quarter's event date was NaT. - if 0 < qtr_start_idx < len(dates): - # Find the quarter being requested in the quarter we're - # crossing into. - requested_quarter = requested_qtr_data[ - SHIFTED_NORMALIZED_QTRS - ][sid].iloc[qtr_start_idx] - - # If there are estimates for the requested quarter, - # overwrite all values going up to the starting index of - # that quarter with estimates for that quarter. - if requested_quarter in quarters_with_estimates_for_sid: - adjustments[qtr_start_idx] = \ - [overwrite( - 0, - qtr_start_idx - 1, # overwrite thru last qtr - sid_idx, - sid_idx, - last_per_qtr[column_name, - requested_quarter, - sid][:qtr_start_idx].values)] - # There are no estimates for the quarter. Overwrite all - # values going up to the starting index of that quarter - # with the missing value for this column. - else: - adjustments[qtr_start_idx] = [ - overwrite( - 0, - qtr_start_idx - 1, - sid_idx, - sid_idx, - np.array( - [missing_value] * - len(last_per_qtr.index[:qtr_start_idx])) - ) - ] + adjustments[next_qtr_start_idx] = \ + self.create_overwrite_for_quarter( + next_qtr_start_idx, + column, + column_name, + dates, + last_per_qtr, + overwrite, + qtrs_with_estimates_for_sid, + requested_qtr_data, + sid, + sid_idx, + ) return AdjustedArray( - requested_qtr_data[column_name].values.astype(column.dtype), - mask, - dict(adjustments), - column.missing_value, - ) + requested_qtr_data[column_name].values.astype(column.dtype), + mask, + dict(adjustments), + column.missing_value, + ) + + def create_overwrite_for_quarter(self, + next_qtr_start_idx, + column, + column_name, + dates, + last_per_qtr, + overwrite, + quarters_with_estimates_for_sid, + requested_qtr_data, + sid, + sid_idx): + # Only add adjustments if the next quarter starts somewhere in + # our date index for this sid. Our 'next' quarter can never + # start at index 0; a starting index of 0 means that the next + # quarter's event date was NaT. + if 0 < next_qtr_start_idx < len(dates): + # Find the quarter being requested in the quarter we're + # crossing into. + requested_quarter = requested_qtr_data[ + SHIFTED_NORMALIZED_QTRS + ][sid].iloc[next_qtr_start_idx] + + # If there are estimates for the requested quarter, + # overwrite all values going up to the starting index of + # that quarter with estimates for that quarter. + if requested_quarter in quarters_with_estimates_for_sid: + return self.create_overwrite_for_estimate( + column, + column_name, + last_per_qtr, + next_qtr_start_idx, + overwrite, + requested_quarter, + sid, + sid_idx + ) + # There are no estimates for the quarter. Overwrite all + # values going up to the starting index of that quarter + # with the missing value for this column. + else: + return self.overwrite_with_null( + column, + last_per_qtr, + next_qtr_start_idx, + overwrite, + sid_idx + ) + + def overwrite_with_null(self, + column, + last_per_qtr, + next_qtr_start_idx, + overwrite, + sid_idx): + return [overwrite( + 0, + next_qtr_start_idx - 1, + sid_idx, + sid_idx, + np.full( + len( + last_per_qtr.index[:next_qtr_start_idx] + ), + column.missing_value, + dtype=column.dtype + ))] def load_adjusted_array(self, columns, dates, assets, mask): - # TODO: how can we enforce that datasets have the num_quarters - # attribute, given that they're created dynamically? - groups = groupby(lambda x: x.dataset.num_quarters, columns) - groups_columns = dict(groups) - if (pd.Series(groups_columns.keys()) < 0).any(): - raise ValueError("Must pass a number of quarters >= 0") + # Separate out getting the columns' datasets and the datasets' + # num_quarters attributes to ensure that we're catching the right + # AttributeError. + col_to_datasets = {col: col.dataset for col in columns} + try: + groups = groupby(lambda col: col_to_datasets[col].num_quarters, + col_to_datasets) + except AttributeError: + raise AttributeError("Datasets loaded via the " + "QuarterEstimatesLoader must define a " + "`num_quarters` attribute that defines how " + "many quarters out the loader should load " + "the data relative to `dates`.") + if any(num_qtr < 0 for num_qtr in groups): + raise ValueError( + INVALID_NUM_QTRS_MESSAGE % ','.join( + str(qtr) for qtr in groups if qtr < 0 + ) + + ) out = {} - self.estimates[NORMALIZED_QUARTERS] = normalize_quarters( - self.estimates[FISCAL_YEAR_FIELD_NAME], - self.estimates[FISCAL_QUARTER_FIELD_NAME], - ) - for num_quarters, columns in groups_columns.items(): - # The column's dataset is itself dynamic and the mapping we - # actually want is to its dataset's parent's column name. - name_map = {c: self.base_column_name_map[ - getattr(c.dataset.__base__, c.name) - ] for c in columns} + + for num_quarters, columns in groups.items(): # Determine the last piece of information we know for each column # on each date in the index for each sid and quarter. last_per_qtr = last_in_date_group( - self.estimates, True, dates, assets, + self.estimates, dates, assets, reindex=True, extra_groupers=[NORMALIZED_QUARTERS] ) # Forward fill values for each quarter/sid/dataset column. - ffill_across_cols(last_per_qtr, columns) + ffill_across_cols(last_per_qtr, columns, self.name_map) # Stack quarter and sid into the index. - stacked_last_per_qtr = last_per_qtr.stack([NORMALIZED_QUARTERS, - SID_FIELD_NAME]) + stacked_last_per_qtr = last_per_qtr.stack([SID_FIELD_NAME, + NORMALIZED_QUARTERS]) # Set date index name for ease of reference - stacked_last_per_qtr.index.set_names(SIMULTATION_DATES, 0, True) + stacked_last_per_qtr.index.set_names(SIMULTATION_DATES, + level=0, + inplace=True) + # We want to know the most recent/next event relative to each date. + stacked_last_per_qtr = stacked_last_per_qtr.sort( + EVENT_DATE_FIELD_NAME + ) # Determine which quarter is next/previous for each date. shifted_qtr_data = self.load_quarters(num_quarters, stacked_last_per_qtr) zero_qtr_idx = shifted_qtr_data.index requested_qtr_idx = shifted_qtr_data.set_index([ - shifted_qtr_data.index.get_level_values( - SIMULTATION_DATES - ), - shifted_qtr_data[SHIFTED_NORMALIZED_QTRS], - shifted_qtr_data.index.get_level_values( - SID_FIELD_NAME - )] - ).index + shifted_qtr_data.index.get_level_values( + SIMULTATION_DATES + ), + shifted_qtr_data.index.get_level_values( + SID_FIELD_NAME + ), + shifted_qtr_data[SHIFTED_NORMALIZED_QTRS] + ]).index for c in columns: - column_name = name_map[c] + column_name = self.name_map[c.name] adjusted_array = self.get_adjustments(zero_qtr_idx, requested_qtr_idx, stacked_last_per_qtr, @@ -343,26 +399,68 @@ class QuarterEstimatesLoader(PipelineLoader): column_name, c, mask, - assets, - self.qtr_crossover_point) + assets) out[c] = adjusted_array return out class NextQuartersEstimatesLoader(QuarterEstimatesLoader): - qtr_crossover_point = 'right' + def create_overwrite_for_estimate(self, + column, + column_name, + last_per_qtr, + next_qtr_start_idx, + overwrite, + requested_quarter, + sid, + sid_idx): + return [overwrite( + 0, + # overwrite thru last qtr + next_qtr_start_idx - 1, + sid_idx, + sid_idx, + last_per_qtr[ + column_name, + requested_quarter, + sid + ][0:next_qtr_start_idx].values)] def load_quarters(self, num_quarters, stacked_last_per_qtr): - # Filter for releases that are on or after each simulation date and - # determine the next quarter by picking out the upcoming release for - # each date in the index. - stacked_last_per_qtr = stacked_last_per_qtr.sort( - EVENT_DATE_FIELD_NAME - ) + """ + Filters for releases that are on or after each simulation date and + determines the next quarter by picking out the upcoming release for + each date in the index. Adda a SHIFTED_NORMALIZED_QTRS column which + contains the requested next quarter for each calendar date and sid. + + Parameters + ---------- + num_quarters : int + Number of quarters to go out in the future. + stacked_last_per_qtr : pd.DataFrame + A DataFrame with index of calendar dates, sid, and normalized + quarters with each row being the latest estimate for the row's + index values, sorted by event date. + + Returns + ------- + next_releases_per_date : pd.DataFrame + A DataFrame with index of calendar dates, sid, and normalized + quarters, keeping only rows with next event information relative to + the index values and with an added column for + SHIFTED_NORMALIZED_QTRS, which contains the requested quarter for + each row. + """ + + # We reset the index here because in pandas3, a groupby on the index + # will set the index to just the items in the groupby, so we will lose + # the normalized quarters. next_releases_per_date = stacked_last_per_qtr.loc[ stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >= stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES) - ].groupby(level=[SIMULTATION_DATES, SID_FIELD_NAME]).nth(0) + ].reset_index(NORMALIZED_QUARTERS).groupby( + level=[SIMULTATION_DATES, SID_FIELD_NAME] + ).nth(0).set_index(NORMALIZED_QUARTERS, append=True) next_releases_per_date[ SHIFTED_NORMALIZED_QTRS ] = next_releases_per_date.index.get_level_values( @@ -372,18 +470,57 @@ class NextQuartersEstimatesLoader(QuarterEstimatesLoader): class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader): - qtr_crossover_point = 'left' + def create_overwrite_for_estimate(self, + column, + column_name, + last_per_qtr, + next_qtr_start_idx, + overwrite, + requested_quarter, + sid, + sid_idx): + return self.overwrite_with_null(column, + last_per_qtr, + next_qtr_start_idx, + overwrite, + sid_idx) def load_quarters(self, num_quarters, stacked_last_per_qtr): - # Filter for releases that are on or before each simulation date and - # determine the previous quarter by picking out the upcoming release - # for each date in the index. - stacked_last_per_qtr = stacked_last_per_qtr.sort(EVENT_DATE_FIELD_NAME) + """ + Filters for releases that are on or after each simulation date and + determines the previous quarter by picking out the most recent + release relative to each date in the index. Adds a + SHIFTED_NORMALIZED_QTRS column which contains the requested previous + quarter for each calendar date and sid. + + Parameters + ---------- + num_quarters : int + Number of quarters to go out in the past. + stacked_last_per_qtr : pd.DataFrame + A DataFrame with index of calendar dates, sid, and normalized + quarters with each row being the latest estimate for the row's + index values, sorted by event date. + + Returns + ------- + next_releases_per_date : pd.DataFrame + A DataFrame with index of calendar dates, sid, and normalized + quarters, keeping only rows with have a previous event relative + to the index values and with an added column for + SHIFTED_NORMALIZED_QTRS, which contains the requested quarter for + each row. + """ + + # We reset the index here because in pandas3, a groupby on the index + # will set the index to just the items in the groupby, so we will lose + # the normalized quarters. previous_releases_per_date = stacked_last_per_qtr.loc[ stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <= - stacked_last_per_qtr.index.get_level_values( - SIMULTATION_DATES - )].groupby(level=[SIMULTATION_DATES, SID_FIELD_NAME]).nth(-1) + stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES) + ].reset_index(NORMALIZED_QUARTERS).groupby( + level=[SIMULTATION_DATES, SID_FIELD_NAME] + ).nth(-1).set_index(NORMALIZED_QUARTERS, append=True) previous_releases_per_date[ SHIFTED_NORMALIZED_QTRS ] = previous_releases_per_date.index.get_level_values( diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 5bd91132..125dacfb 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -276,7 +276,7 @@ def check_data_query_args(data_query_time, data_query_tz): ) -def last_in_date_group(df, reindex, dates, assets, have_sids=True, +def last_in_date_group(df, dates, assets, reindex=True, have_sids=True, extra_groupers=[]): """ Determine the last piece of information known on each date in the date @@ -286,14 +286,14 @@ def last_in_date_group(df, reindex, dates, assets, have_sids=True, ---------- df : pd.DataFrame The DataFrame containing the data to be grouped. - reindex : bool - Whether or not the DataFrame should be reindexed against the date - index. This will add back any dates to the index that were grouped - away. dates : pd.DatetimeIndex The dates to use for grouping and reindexing. assets : pd.Int64Index The assets that should be included in the column multiindex. + reindex : bool + Whether or not the DataFrame should be reindexed against the date + index. This will add back any dates to the index that were grouped + away. have_sids : bool Whether or not the DataFrame has sids. If it does, they will be used in the groupby. @@ -307,11 +307,11 @@ def last_in_date_group(df, reindex, dates, assets, have_sids=True, levels of a multiindex of columns. """ - idx = dates[dates.searchsorted( + idx = [dates[dates.searchsorted( df[TS_FIELD_NAME].values.astype('datetime64[D]') - )] + )]] if have_sids: - idx = [idx, SID_FIELD_NAME] + idx += [SID_FIELD_NAME] idx += extra_groupers last_in_group = df.drop(TS_FIELD_NAME, axis=1).groupby( @@ -321,7 +321,7 @@ def last_in_date_group(df, reindex, dates, assets, have_sids=True, # For the number of things that we're grouping by (except TS), unstack # the df - last_in_group = last_in_group.unstack([-1, -2]) + last_in_group = last_in_group.unstack(list(range(-1, -len(idx), -1))) if reindex: if have_sids: @@ -339,7 +339,7 @@ def last_in_date_group(df, reindex, dates, assets, have_sids=True, return last_in_group -def ffill_across_cols(df, columns): +def ffill_across_cols(df, columns, name_map): """ Forward fill values in a DataFrame with special logic to handle cases that pd.DataFrame.ffill cannot and cast columns to appropriate types. @@ -351,6 +351,9 @@ def ffill_across_cols(df, columns): columns : list of BoundColumn The BoundColumns that correspond to columns in the DataFrame to which special filling and/or casting logic should be applied. + name_map: map of string -> string + Mapping from the name of each BoundColumn to the associated column + name in `df`. """ df.ffill(inplace=True) @@ -369,18 +372,19 @@ def ffill_across_cols(df, columns): # pandas to replace NaNs in an object column with None using fillna, # so we have to roll our own instead using df.where. for column in columns: + column_name = name_map[column.name] # Special logic for strings since `fillna` doesn't work if the # missing value is `None`. if column.dtype == categorical_dtype: - df[column.name] = df[ + df[column_name] = df[ column.name - ].where(pd.notnull(df[column.name]), + ].where(pd.notnull(df[column_name]), column.missing_value) else: # We need to execute `fillna` before `astype` in case the # column contains NaNs and needs to be cast to bool or int. # This is so that the NaNs are replaced first, since pandas # can't convert NaNs for those types. - df[column.name] = df[ - column.name + df[column_name] = df[ + column_name ].fillna(column.missing_value).astype(column.dtype) diff --git a/zipline/testing/core.py b/zipline/testing/core.py index f9832767..19240a5b 100644 --- a/zipline/testing/core.py +++ b/zipline/testing/core.py @@ -49,8 +49,14 @@ from zipline.pipeline.loaders.testing import make_seeded_random_loader from zipline.utils import security_list from zipline.utils.calendars import get_calendar from zipline.utils.input_validation import expect_dimensions +<<<<<<< HEAD from zipline.utils.numpy_utils import as_column, isnat from zipline.utils.pandas_utils import timedelta_to_integral_seconds +======= +from zipline.utils.numpy_utils import ( + as_column, +) +>>>>>>> WIP from zipline.utils.sentinel import sentinel import numpy as np diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index f0e2aaa5..1ce01627 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -34,13 +34,14 @@ from ..finance.trading import TradingEnvironment from ..utils import factory from ..utils.classproperty import classproperty from ..utils.final import FinalMeta, final -from .core import tmp_asset_finder, make_simple_equity_info +from .core import (tmp_asset_finder, make_simple_equity_info) from zipline.assets import Equity, Future from zipline.pipeline import SimplePipelineEngine from zipline.pipeline.loaders.testing import make_seeded_random_loader from zipline.utils.calendars import ( get_calendar, - register_calendar) + register_calendar +) class ZiplineTestCase(with_metaclass(FinalMeta, TestCase)):