From bfc01546aef157a0a047fbae11b3dfedaab60a4e Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Wed, 5 Oct 2016 10:33:24 -0400 Subject: [PATCH] ENH: allow estimates to be split-adjusted. This modification to the estimates loader allows the caller to pass in an equity pricing loader which can then be used to get split data for sids. That split data is then used to do point-in-time adjustments of estimates data. TST: add test for multiple estimates columns TST: add test for multiple datasets requesting different columns TST: add blaze versions for all next/previous tests --- tests/pipeline/test_quarters_estimates.py | 1848 +++++++++++++++-- zipline/pipeline/loaders/blaze/estimates.py | 68 +- .../pipeline/loaders/earnings_estimates.py | 1032 ++++++++- 3 files changed, 2694 insertions(+), 254 deletions(-) diff --git a/tests/pipeline/test_quarters_estimates.py b/tests/pipeline/test_quarters_estimates.py index 109e7904..52cd08e1 100644 --- a/tests/pipeline/test_quarters_estimates.py +++ b/tests/pipeline/test_quarters_estimates.py @@ -1,9 +1,14 @@ +from __future__ import division + +from datetime import timedelta +from functools import partial + import blaze as bz import itertools from nose.tools import assert_true from nose_parameterized import parameterized import numpy as np -from numpy.testing import assert_array_equal +from numpy.testing import assert_array_equal, assert_almost_equal import pandas as pd from toolz import merge @@ -19,21 +24,25 @@ from zipline.pipeline.data import DataSet from zipline.pipeline.data import Column from zipline.pipeline.loaders.blaze.estimates import ( BlazeNextEstimatesLoader, - BlazePreviousEstimatesLoader -) + BlazePreviousEstimatesLoader, + BlazeNextSplitAdjustedEstimatesLoader, + BlazePreviousSplitAdjustedEstimatesLoader) from zipline.pipeline.loaders.earnings_estimates import ( INVALID_NUM_QTRS_MESSAGE, NextEarningsEstimatesLoader, + NextSplitAdjustedEarningsEstimatesLoader, normalize_quarters, PreviousEarningsEstimatesLoader, +
PreviousSplitAdjustedEarningsEstimatesLoader, split_normalized_quarters, ) from zipline.testing.fixtures import ( - WithAssetFinder, + WithAdjustmentReader, WithTradingSessions, ZiplineTestCase, ) from zipline.testing.predicates import assert_equal, assert_raises_regex +from zipline.testing.predicates import assert_frame_equal from zipline.utils.numpy_utils import datetime64ns_dtype from zipline.utils.numpy_utils import float64_dtype @@ -45,6 +54,14 @@ class Estimates(DataSet): estimate = Column(dtype=float64_dtype) +class MultipleColumnsEstimates(DataSet): + event_date = Column(dtype=datetime64ns_dtype) + fiscal_quarter = Column(dtype=float64_dtype) + fiscal_year = Column(dtype=float64_dtype) + estimate1 = Column(dtype=float64_dtype) + estimate2 = Column(dtype=float64_dtype) + + def QuartersEstimates(announcements_out): class QtrEstimates(Estimates): num_announcements = announcements_out @@ -52,13 +69,48 @@ def QuartersEstimates(announcements_out): return QtrEstimates +def MultipleColumnsQuartersEstimates(announcements_out): + class QtrEstimates(MultipleColumnsEstimates): + num_announcements = announcements_out + name = Estimates + return QtrEstimates + + def QuartersEstimatesNoNumQuartersAttr(num_qtr): class QtrEstimates(Estimates): name = Estimates return QtrEstimates -class WithEstimates(WithTradingSessions, WithAssetFinder): +def create_expected_df_for_factor_compute(start_date, + sids, + tuples, + end_date): + """ + Given a list of tuples of new data we get for each sid on each critical + date (when information changes), create a DataFrame that fills that + data through a date range ending at `end_date`. + """ + df = pd.DataFrame(tuples, + columns=[SID_FIELD_NAME, + 'estimate', + 'knowledge_date']) + df = df.pivot_table(columns=SID_FIELD_NAME, + values='estimate', + index='knowledge_date') + df = df.reindex( + pd.date_range(start_date, end_date) + ) + # Index name is lost during reindex. 
+ df.index = df.index.rename('knowledge_date') + df['at_date'] = end_date.tz_localize('utc') + df = df.set_index(['at_date', df.index.tz_localize('utc')]).ffill() + new_sids = set(sids) - set(df.columns) + df = df.reindex(columns=df.columns.union(new_sids)) + return df + + +class WithEstimates(WithTradingSessions, WithAdjustmentReader): """ ZiplineTestCase mixin providing cls.loader and cls.events as class level fixtures. @@ -74,6 +126,9 @@ class WithEstimates(WithTradingSessions, WithAssetFinder): columns : dict[str -> str] The dictionary mapping the names of BoundColumns to the associated column name in the events DataFrame. + make_columns() -> dict[BoundColumn -> str] + Method which returns a dictionary of BoundColumns mapped to the + associated column names in the raw data. """ # Short window defined in order for test to run faster. @@ -93,22 +148,40 @@ class WithEstimates(WithTradingSessions, WithAssetFinder): return cls.events[SID_FIELD_NAME].unique() @classmethod - def init_class_fixtures(cls): - cls.events = cls.make_events() - cls.ASSET_FINDER_EQUITY_SIDS = cls.get_sids() - cls.columns = { + def make_columns(cls): + return { Estimates.event_date: 'event_date', Estimates.fiscal_quarter: 'fiscal_quarter', Estimates.fiscal_year: 'fiscal_year', Estimates.estimate: 'estimate' } - cls.loader = cls.make_loader(cls.events, {column.name: val for - column, val in - cls.columns.items()}) + + @classmethod + def init_class_fixtures(cls): + cls.events = cls.make_events() + cls.ASSET_FINDER_EQUITY_SIDS = cls.get_sids() cls.ASSET_FINDER_EQUITY_SYMBOLS = [ 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS ] + # We need to instantiate certain constants needed by supers of + # `WithEstimates` before we call their `init_class_fixtures`. super(WithEstimates, cls).init_class_fixtures() + cls.columns = cls.make_columns() + # Some tests require `WithAdjustmentReader` to be set up by the time we + # make the loader. 
+ cls.loader = cls.make_loader(cls.events, {column.name: val for + column, val in + cls.columns.items()}) + + +dummy_df = pd.DataFrame({SID_FIELD_NAME: 0}, + columns=[SID_FIELD_NAME, + TS_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + 'estimate'], + index=[0]) class WithWrongLoaderDefinition(WithEstimates): @@ -133,14 +206,7 @@ class WithWrongLoaderDefinition(WithEstimates): @classmethod def make_events(cls): - return pd.DataFrame({SID_FIELD_NAME: 0}, - columns=[SID_FIELD_NAME, - TS_FIELD_NAME, - EVENT_DATE_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME, - 'estimate'], - index=[0]) + return dummy_df def test_wrong_num_announcements_passed(self): bad_dataset1 = QuartersEstimates(-1) @@ -205,6 +271,47 @@ class NextWithWrongNumQuarters(WithWrongLoaderDefinition, return NextEarningsEstimatesLoader(events, columns) +options = ["split_adjustments_loader", + "split_adjusted_column_names", + "split_adjusted_asof"] + + +class WrongSplitsLoaderDefinition(WithEstimates, ZiplineTestCase): + """ + Test class that tests that loaders break correctly when incorrectly + instantiated. + + Tests + ----- + test_extra_splits_columns_passed(SplitAdjustedEstimatesLoader) + A test that checks that the loader correctly breaks when an + unexpected column is passed in the list of split-adjusted columns. 
+ """ + @classmethod + def init_class_fixtures(cls): + super(WithEstimates, cls).init_class_fixtures() + + @parameterized.expand(itertools.product( + (NextSplitAdjustedEarningsEstimatesLoader, + PreviousSplitAdjustedEarningsEstimatesLoader), + )) + def test_extra_splits_columns_passed(self, loader): + columns = { + Estimates.event_date: 'event_date', + Estimates.fiscal_quarter: 'fiscal_quarter', + Estimates.fiscal_year: 'fiscal_year', + Estimates.estimate: 'estimate' + } + + with self.assertRaises(ValueError): + loader(dummy_df, + {column.name: val for column, val in + columns.items()}, + split_adjustments_loader=self.adjustment_reader, + split_adjusted_column_names=["estimate", "extra_col"], + split_adjusted_asof=pd.Timestamp("2015-01-01")) + + class WithEstimatesTimeZero(WithEstimates): """ ZiplineTestCase mixin providing cls.events as a class level fixture and @@ -618,6 +725,15 @@ class NextEstimateMultipleQuarters( return expected +class BlazeNextEstimateMultipleQuarters(NextEstimateMultipleQuarters): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader( + bz.data(events), + columns, + ) + + class PreviousEstimateMultipleQuarters( WithEstimateMultipleQuarters, ZiplineTestCase @@ -658,6 +774,15 @@ class PreviousEstimateMultipleQuarters( return expected +class BlazePreviousEstimateMultipleQuarters(PreviousEstimateMultipleQuarters): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader( + bz.data(events), + columns, + ) + + class WithVaryingNumEstimates(WithEstimates): """ ZiplineTestCase mixin providing fixtures and a test to ensure that we @@ -749,6 +874,15 @@ class PreviousVaryingNumEstimates( return PreviousEarningsEstimatesLoader(events, columns) +class BlazePreviousVaryingNumEstimates(PreviousVaryingNumEstimates): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader( + bz.data(events), + columns, + ) + + class NextVaryingNumEstimates( 
WithVaryingNumEstimates, ZiplineTestCase @@ -771,6 +905,15 @@ class NextVaryingNumEstimates( return NextEarningsEstimatesLoader(events, columns) +class BlazeNextVaryingNumEstimates(NextVaryingNumEstimates): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader( + bz.data(events), + columns, + ) + + class WithEstimateWindows(WithEstimates): """ ZiplineTestCase mixin providing fixures and a test to test running a @@ -797,60 +940,69 @@ class WithEstimateWindows(WithEstimates): Tests that we overwrite values with the correct quarter's estimate at the correct dates when we have a factor that asks for a window of data. """ - + END_DATE = pd.Timestamp('2015-02-10') window_test_start_date = pd.Timestamp('2015-01-05') critical_dates = [pd.Timestamp('2015-01-09', tz='utc'), - pd.Timestamp('2015-01-12', tz='utc'), pd.Timestamp('2015-01-15', tz='utc'), - pd.Timestamp('2015-01-20', tz='utc')] - # window length, starting date, num quarters out, timeline. Parameterizes - # over number of quarters out. + pd.Timestamp('2015-01-20', tz='utc'), + pd.Timestamp('2015-01-26', tz='utc'), + pd.Timestamp('2015-02-05', tz='utc'), + pd.Timestamp('2015-02-10', tz='utc')] + # Starting date, number of announcements out. window_test_cases = list(itertools.product(critical_dates, (1, 2))) @classmethod def make_events(cls): + # Typical case: 2 consecutive quarters. sid_0_timeline = pd.DataFrame({ - TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), - pd.Timestamp('2015-01-07'), - pd.Timestamp('2015-01-05'), - pd.Timestamp('2015-01-17')], + TS_FIELD_NAME: [cls.window_test_start_date, + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-02-10'), + # We want a case where we get info for a later + # quarter before the current quarter is over but + # after the split_asof_date to make sure that + # we choose the correct date to overwrite until. 
+ pd.Timestamp('2015-01-18')], EVENT_DATE_FIELD_NAME: - [pd.Timestamp('2015-01-10'), - pd.Timestamp('2015-01-10'), + [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-01-20')], - 'estimate': [100., 101.] + [200., 201.], - FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2, + pd.Timestamp('2015-02-10'), + pd.Timestamp('2015-02-10'), + pd.Timestamp('2015-04-01')], + 'estimate': [100., 101.] + [200., 201.] + [400], + FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2 + [4], FISCAL_YEAR_FIELD_NAME: 2015, SID_FIELD_NAME: 0, }) + # We want a case where we skip a quarter. We never find out about Q2. sid_10_timeline = pd.DataFrame({ TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), pd.Timestamp('2015-01-12'), pd.Timestamp('2015-01-09'), pd.Timestamp('2015-01-15')], EVENT_DATE_FIELD_NAME: - [pd.Timestamp('2015-01-12'), pd.Timestamp('2015-01-12'), - pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-15')], + [pd.Timestamp('2015-01-22'), pd.Timestamp('2015-01-22'), + pd.Timestamp('2015-02-05'), pd.Timestamp('2015-02-05')], 'estimate': [110., 111.] + [310., 311.], FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [3] * 2, FISCAL_YEAR_FIELD_NAME: 2015, SID_FIELD_NAME: 10 }) - # Extra sid to make sure we have correct overwrites when sid quarter - # boundaries collide. + # We want to make sure we have correct overwrites when sid quarter + # boundaries collide. This sid's quarter boundaries collide with sid 0. sid_20_timeline = pd.DataFrame({ - TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), + TS_FIELD_NAME: [cls.window_test_start_date, pd.Timestamp('2015-01-07'), - pd.Timestamp('2015-01-05'), + cls.window_test_start_date, pd.Timestamp('2015-01-17')], EVENT_DATE_FIELD_NAME: - [pd.Timestamp('2015-01-10'), - pd.Timestamp('2015-01-10'), + [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-01-20')], + pd.Timestamp('2015-02-10'), + pd.Timestamp('2015-02-10')], 'estimate': [120., 121.] 
+ [220., 221.], FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2, FISCAL_YEAR_FIELD_NAME: 2015, @@ -877,36 +1029,16 @@ class WithEstimateWindows(WithEstimates): @classmethod def init_class_fixtures(cls): super(WithEstimateWindows, cls).init_class_fixtures() - cls.timelines = cls.make_expected_timelines() - - @classmethod - def create_expected_df(cls, tuples, end_date): - """ - Given a list of tuples of new data we get for each sid on each critical - date (when information changes), create a DataFrame that fills that - data through a date range ending at `end_date`. - """ - df = pd.DataFrame(tuples, - columns=[SID_FIELD_NAME, - 'estimate', - 'knowledge_date']) - df = df.pivot_table(columns=SID_FIELD_NAME, - values='estimate', - index='knowledge_date') - df = df.reindex( - pd.date_range(cls.window_test_start_date, end_date) + cls.create_expected_df_for_factor_compute = partial( + create_expected_df_for_factor_compute, + cls.window_test_start_date, + cls.get_sids() ) - # Index name is lost during reindex. - df.index = df.index.rename('knowledge_date') - df['at_date'] = end_date.tz_localize('utc') - df = df.set_index(['at_date', df.index.tz_localize('utc')]).ffill() - new_sids = set(cls.get_sids()) - set(df.columns) - df = df.reindex(columns=df.columns.union(new_sids)) - return df + cls.timelines = cls.make_expected_timelines() @parameterized.expand(window_test_cases) def test_estimate_windows_at_quarter_boundaries(self, - start_idx, + start_date, num_announcements_out): dataset = QuartersEstimates(num_announcements_out) trading_days = self.trading_days @@ -916,7 +1048,7 @@ class WithEstimateWindows(WithEstimates): # progress through the timeline, all data we got, starting from that # first date, is correctly overwritten. 
window_len = ( - self.trading_days.get_loc(start_idx) - + self.trading_days.get_loc(start_date) - self.trading_days.get_loc(self.window_test_start_date) + 1 ) @@ -932,8 +1064,9 @@ class WithEstimateWindows(WithEstimates): trading_days[:today_idx + 1] ).values timeline_start_idx = (len(today_timeline) - window_len) - assert_equal(estimate, - today_timeline[timeline_start_idx:]) + assert_almost_equal(estimate, + today_timeline[timeline_start_idx:]) + engine = SimplePipelineEngine( lambda x: self.loader, self.trading_days, @@ -941,9 +1074,9 @@ class WithEstimateWindows(WithEstimates): ) engine.run_pipeline( Pipeline({'est': SomeFactor()}), - start_date=start_idx, + start_date=start_date, # last event date we have - end_date=pd.Timestamp('2015-01-20', tz='utc'), + end_date=pd.Timestamp('2015-02-10', tz='utc'), ) @@ -955,62 +1088,65 @@ class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_expected_timelines(cls): oneq_previous = pd.concat([ - cls.create_expected_df( - [(0, np.NaN, cls.window_test_start_date), - (10, np.NaN, cls.window_test_start_date), - (20, np.NaN, cls.window_test_start_date)], - pd.Timestamp('2015-01-09') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 111, pd.Timestamp('2015-01-12')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-12') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 111, pd.Timestamp('2015-01-12')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-13') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 111, pd.Timestamp('2015-01-12')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-14') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 311, pd.Timestamp('2015-01-15')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-15') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 311, 
pd.Timestamp('2015-01-15')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-16') - ), - cls.create_expected_df( - [(0, 201, pd.Timestamp('2015-01-17')), - (10, 311, pd.Timestamp('2015-01-15')), - (20, 221, pd.Timestamp('2015-01-17'))], - pd.Timestamp('2015-01-20') - ), - ]) - - twoq_previous = pd.concat( - [cls.create_expected_df( - [(0, np.NaN, cls.window_test_start_date), - (10, np.NaN, cls.window_test_start_date), - (20, np.NaN, cls.window_test_start_date)], - end_date - ) for end_date in pd.date_range('2015-01-09', '2015-01-19')] + - [cls.create_expected_df( + pd.concat([ + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date) + ], end_date) + for end_date in pd.date_range('2015-01-09', '2015-01-19') + ]), + cls.create_expected_df_for_factor_compute( [(0, 101, pd.Timestamp('2015-01-20')), (10, np.NaN, cls.window_test_start_date), (20, 121, pd.Timestamp('2015-01-20'))], pd.Timestamp('2015-01-20') + ), + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, np.NaN, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-20'))], + pd.Timestamp('2015-01-21') + ), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, 111, pd.Timestamp('2015-01-22')), + (20, 121, pd.Timestamp('2015-01-20'))], + end_date + ) for end_date in pd.date_range('2015-01-22', '2015-02-04') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, 311, pd.Timestamp('2015-02-05')), + (20, 121, pd.Timestamp('2015-01-20'))], + end_date + ) for end_date in pd.date_range('2015-02-05', '2015-02-09') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 201, pd.Timestamp('2015-02-10')), + (10, 311, pd.Timestamp('2015-02-05')), + (20, 221, pd.Timestamp('2015-02-10'))], + pd.Timestamp('2015-02-10') + ), + ]) + + 
twoq_previous = pd.concat( + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-09', '2015-02-09')] + + # We never get estimates for S1 for 2Q ago because once Q3 + # becomes our previous quarter, 2Q ago would be Q2, and we have + # no data on it. + [cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-02-10')), + (10, np.NaN, pd.Timestamp('2015-02-05')), + (20, 121, pd.Timestamp('2015-02-10'))], + pd.Timestamp('2015-02-10') )] ) return { @@ -1019,6 +1155,12 @@ class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase): } +class BlazePreviousEstimateWindows(PreviousEstimateWindows): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader(bz.data(events), columns) + + class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_loader(cls, events, columns): @@ -1027,69 +1169,97 @@ class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_expected_timelines(cls): oneq_next = pd.concat([ - cls.create_expected_df( + cls.create_expected_df_for_factor_compute( [(0, 100, cls.window_test_start_date), - (0, 101, pd.Timestamp('2015-01-07')), (10, 110, pd.Timestamp('2015-01-09')), (20, 120, cls.window_test_start_date), (20, 121, pd.Timestamp('2015-01-07'))], pd.Timestamp('2015-01-09') ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 100, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 120, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-07'))], + end_date + ) for end_date in pd.date_range('2015-01-12', '2015-01-19') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 100, cls.window_test_start_date), + 
(0, 101, pd.Timestamp('2015-01-20')), (10, 110, pd.Timestamp('2015-01-09')), (10, 111, pd.Timestamp('2015-01-12')), - (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-12') + (20, 120, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-07'))], + pd.Timestamp('2015-01-20') ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), - (10, 310, pd.Timestamp('2015-01-09')), - (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-13') - ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), - (10, 310, pd.Timestamp('2015-01-09')), - (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-14') - ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), - (10, 310, pd.Timestamp('2015-01-09')), - (10, 311, pd.Timestamp('2015-01-15')), - (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-15') - ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 220, cls.window_test_start_date), + (20, 221, pd.Timestamp('2015-01-17'))], + end_date + ) for end_date in pd.date_range('2015-01-21', '2015-01-22') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, 310, pd.Timestamp('2015-01-09')), + (10, 311, pd.Timestamp('2015-01-15')), + (20, 220, cls.window_test_start_date), + (20, 221, pd.Timestamp('2015-01-17'))], + end_date + ) for end_date in pd.date_range('2015-01-23', '2015-02-05') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220, cls.window_test_start_date), + (20, 221, pd.Timestamp('2015-01-17'))], + end_date + ) for end_date in pd.date_range('2015-02-06', '2015-02-09') + ]), + 
cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (0, 201, pd.Timestamp('2015-02-10')), + (10, np.NaN, cls.window_test_start_date), + (20, 220, cls.window_test_start_date), + (20, 221, pd.Timestamp('2015-01-17'))], + pd.Timestamp('2015-02-10') + ) + ]) + + twoq_next = pd.concat( + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), (10, np.NaN, cls.window_test_start_date), (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-16') - ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), - (0, 201, pd.Timestamp('2015-01-17')), + end_date + ) for end_date in pd.date_range('2015-01-09', '2015-01-11')] + + [cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-12', '2015-01-16')] + + [cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), (10, np.NaN, cls.window_test_start_date), (20, 220, cls.window_test_start_date), (20, 221, pd.Timestamp('2015-01-17'))], pd.Timestamp('2015-01-20') - ), - ]) - - twoq_next = pd.concat( - [cls.create_expected_df( - [(0, 200, pd.Timestamp(cls.window_test_start_date)), - (10, np.NaN, pd.Timestamp(cls.window_test_start_date)), - (20, 220, pd.Timestamp(cls.window_test_start_date))], - pd.Timestamp('2015-01-09') )] + - [cls.create_expected_df( - [(0, np.NaN, pd.Timestamp(cls.window_test_start_date)), - (10, np.NaN, pd.Timestamp(cls.window_test_start_date)), - (20, np.NaN, pd.Timestamp(cls.window_test_start_date))], + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date)], end_date - ) for end_date in pd.date_range('2015-01-12', '2015-01-20')] + ) for end_date in pd.date_range('2015-01-21', '2015-02-10')] ) return { @@ -1098,6 
+1268,1340 @@ class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): } +class BlazeNextEstimateWindows(NextEstimateWindows): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader(bz.data(events), columns) + + +class WithSplitAdjustedWindows(WithEstimateWindows): + """ + ZiplineTestCase mixin providing fixtures and a test to test running a + Pipeline with an estimates loader over differently-sized windows and with + split adjustments. + """ + + split_adjusted_asof_date = pd.Timestamp('2015-01-14') + + @classmethod + def make_events(cls): + # Add an extra sid that has a release before the split-asof-date in + # order to test that we're reversing splits correctly in the previous + # case (without an overwrite) and in the next case (with an overwrite). + sid_30 = pd.DataFrame({ + TS_FIELD_NAME: [cls.window_test_start_date, + pd.Timestamp('2015-01-09'), + # For Q2, we want it to start early enough + # that we can have several adjustments before + # the end of the first quarter so that we + # can test un-adjusting & readjusting with an + # overwrite. + cls.window_test_start_date, + # We want the Q2 event date to be enough past + # the split-asof-date that we can have + # several splits and can make sure that they + # are applied correctly. + pd.Timestamp('2015-01-20')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-20')], + 'estimate': [130., 131., 230., 231.], + FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 30 + }) + + # An extra sid to test no splits before the split-adjusted-asof-date. + # We want an event before and after the split-adjusted-asof-date & + # timestamps for data points also before and after + # split-adjusted-asof-date (but also before the split dates, so that + # we can test that splits actually get applied at the correct times).
+ sid_40 = pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-15')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-02-10')], + 'estimate': [140., 240.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 40 + }) + + # An extra sid to test all splits before the + # split-adjusted-asof-date. All timestamps should be before that date + # so that we have cases where we un-apply and re-apply splits. + sid_50 = pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-12')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-02-10')], + 'estimate': [150., 250.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 50 + }) + + return pd.concat([ + # Slightly hacky, but want to make sure we're using the same + # events as WithEstimateWindows. + cls.__base__.make_events(), + sid_30, + sid_40, + sid_50, + ]) + + @classmethod + def make_splits_data(cls): + # For sid 0, we want to apply a series of splits before and after the + # split-adjusted-asof-date as well as between quarters (for the + # previous case, where we won't see any values until after the event + # happens).
+ sid_0_splits = pd.DataFrame({ + SID_FIELD_NAME: 0, + 'ratio': (-1., 2., 3., 4., 5., 6., 7., 100), + 'effective_date': (pd.Timestamp('2014-01-01'), # Filter out + # Split before Q1 event & after first estimate + pd.Timestamp('2015-01-07'), + # Split before Q1 event + pd.Timestamp('2015-01-09'), + # Split before Q1 event + pd.Timestamp('2015-01-13'), + # Split before Q1 event + pd.Timestamp('2015-01-15'), + # Split before Q1 event + pd.Timestamp('2015-01-18'), + # Split after Q1 event and before Q2 event + pd.Timestamp('2015-01-30'), + # Filter out - this is after our date index + pd.Timestamp('2016-01-01')) + }) + + sid_10_splits = pd.DataFrame({ + SID_FIELD_NAME: 10, + 'ratio': (.2, .3), + 'effective_date': ( + # We want a split before the first estimate and before the + # split-adjusted-asof-date but within our calendar index so + # that we can test that the split is NEVER applied. + pd.Timestamp('2015-01-07'), + # Apply a single split before Q1 event. + pd.Timestamp('2015-01-20')), + }) + + # We want a sid with split dates that collide with another sid (0) to + # make sure splits are correctly applied for both sids. + sid_20_splits = pd.DataFrame({ + SID_FIELD_NAME: 20, + 'ratio': (.4, .5, .6, .7, .8, .9,), + 'effective_date': ( + pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-18'), + pd.Timestamp('2015-01-30')), + }) + + # This sid has event dates that are shifted back so that we can test + # cases where an event occurs before the split-asof-date. + sid_30_splits = pd.DataFrame({ + SID_FIELD_NAME: 30, + 'ratio': (8, 9, 10, 11, 12), + 'effective_date': ( + # Split before the event and before the + # split-asof-date. + pd.Timestamp('2015-01-07'), + # Split on date of event but before the + # split-asof-date. + pd.Timestamp('2015-01-09'), + # Split after the event, but before the + # split-asof-date. 
+ pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-18')), + }) + + # No splits for a sid before the split-adjusted-asof-date. + sid_40_splits = pd.DataFrame({ + SID_FIELD_NAME: 40, + 'ratio': (13, 14), + 'effective_date': ( + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-22') + ) + }) + + # No splits for a sid after the split-adjusted-asof-date. + sid_50_splits = pd.DataFrame({ + SID_FIELD_NAME: 50, + 'ratio': (15, 16), + 'effective_date': ( + pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-14') + ) + }) + + return pd.concat([ + sid_0_splits, + sid_10_splits, + sid_20_splits, + sid_30_splits, + sid_40_splits, + sid_50_splits, + ]) + + +class PreviousWithSplitAdjustedWindows(WithSplitAdjustedWindows, + ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return PreviousSplitAdjustedEarningsEstimatesLoader( + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate'], + split_adjusted_asof=cls.split_adjusted_asof_date, + ) + + @classmethod + def make_expected_timelines(cls): + oneq_previous = pd.concat([ + pd.concat([ + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + # Undo all adjustments that haven't happened yet. + (30, 131*1/10, pd.Timestamp('2015-01-09')), + (40, 140., pd.Timestamp('2015-01-09')), + (50, 150 * 1 / 15 * 1 / 16, pd.Timestamp('2015-01-09')), + ], end_date) + for end_date in pd.date_range('2015-01-09', '2015-01-12') + ]), + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, 131, pd.Timestamp('2015-01-09')), + (40, 140., pd.Timestamp('2015-01-09')), + (50, 150. 
* 1 / 16, pd.Timestamp('2015-01-09')), + ], pd.Timestamp('2015-01-13')), + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, 131, pd.Timestamp('2015-01-09')), + (40, 140., pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09')) + ], pd.Timestamp('2015-01-14')), + pd.concat([ + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, 131*11, pd.Timestamp('2015-01-09')), + (40, 140., pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09')), + ], end_date) + for end_date in pd.date_range('2015-01-15', '2015-01-16') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, np.NaN, cls.window_test_start_date), + (20, 121*.7*.8, pd.Timestamp('2015-01-20')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 140.*13, pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09'))], + end_date + ) for end_date in pd.date_range('2015-01-20', '2015-01-21') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, 111*.3, pd.Timestamp('2015-01-22')), + (20, 121*.7*.8, pd.Timestamp('2015-01-20')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 140.*13*14, pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09'))], + end_date + ) for end_date in pd.date_range('2015-01-22', '2015-01-29') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101*7, pd.Timestamp('2015-01-20')), + (10, 111*.3, pd.Timestamp('2015-01-22')), + (20, 121*.7*.8*.9, pd.Timestamp('2015-01-20')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 140.*13*14, pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09'))], + end_date + ) for end_date in pd.date_range('2015-01-30', 
'2015-02-04') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101*7, pd.Timestamp('2015-01-20')), + (10, 311*.3, pd.Timestamp('2015-02-05')), + (20, 121*.7*.8*.9, pd.Timestamp('2015-01-20')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 140.*13*14, pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09'))], + end_date + ) for end_date in pd.date_range('2015-02-05', '2015-02-09') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 201, pd.Timestamp('2015-02-10')), + (10, 311*.3, pd.Timestamp('2015-02-05')), + (20, 221*.8*.9, pd.Timestamp('2015-02-10')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 240.*13*14, pd.Timestamp('2015-02-10')), + (50, 250., pd.Timestamp('2015-02-10'))], + pd.Timestamp('2015-02-10') + ), + ]) + + twoq_previous = pd.concat( + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-09', '2015-01-19')] + + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, 131*11*12, pd.Timestamp('2015-01-20'))], + end_date + ) for end_date in pd.date_range('2015-01-20', '2015-02-09')] + + # We never get estimates for S1 for 2Q ago because once Q3 + # becomes our previous quarter, 2Q ago would be Q2, and we have + # no data on it. + [cls.create_expected_df_for_factor_compute( + [(0, 101*7, pd.Timestamp('2015-02-10')), + (10, np.NaN, pd.Timestamp('2015-02-05')), + (20, 121*.7*.8*.9, pd.Timestamp('2015-02-10')), + (30, 131*11*12, pd.Timestamp('2015-01-20')), + (40, 140. 
* 13 * 14, pd.Timestamp('2015-02-10')), + (50, 150., pd.Timestamp('2015-02-10'))], + pd.Timestamp('2015-02-10') + )] + ) + return { + 1: oneq_previous, + 2: twoq_previous + } + + +class BlazePreviousWithSplitAdjustedWindows(PreviousWithSplitAdjustedWindows): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousSplitAdjustedEstimatesLoader( + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate'], + split_adjusted_asof=cls.split_adjusted_asof_date, + ) + + +class NextWithSplitAdjustedWindows(WithSplitAdjustedWindows, ZiplineTestCase): + + @classmethod + def make_loader(cls, events, columns): + return NextSplitAdjustedEarningsEstimatesLoader( + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate'], + split_adjusted_asof=cls.split_adjusted_asof_date, + ) + + @classmethod + def make_expected_timelines(cls): + oneq_next = pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 100*1/4, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (20, 120*5/3, cls.window_test_start_date), + (20, 121*5/3, pd.Timestamp('2015-01-07')), + (30, 130*1/10, cls.window_test_start_date), + (30, 131*1/10, pd.Timestamp('2015-01-09')), + (40, 140, pd.Timestamp('2015-01-09')), + (50, 150.*1/15*1/16, pd.Timestamp('2015-01-09'))], + pd.Timestamp('2015-01-09') + ), + cls.create_expected_df_for_factor_compute( + [(0, 100*1/4, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 120*5/3, cls.window_test_start_date), + (20, 121*5/3, pd.Timestamp('2015-01-07')), + (30, 230*1/10, cls.window_test_start_date), + (40, np.NaN, pd.Timestamp('2015-01-10')), + (50, 250.*1/15*1/16, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-12') + ), + cls.create_expected_df_for_factor_compute( + [(0, 100, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + 
(10, 111, pd.Timestamp('2015-01-12')), + (20, 120, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-07')), + (30, 230, cls.window_test_start_date), + (40, np.NaN, pd.Timestamp('2015-01-10')), + (50, 250.*1/16, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-13') + ), + cls.create_expected_df_for_factor_compute( + [(0, 100, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 120, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-07')), + (30, 230, cls.window_test_start_date), + (40, np.NaN, pd.Timestamp('2015-01-10')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-14') + ), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 100*5, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 120*.7, cls.window_test_start_date), + (20, 121*.7, pd.Timestamp('2015-01-07')), + (30, 230*11, cls.window_test_start_date), + (40, 240, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + end_date + ) for end_date in pd.date_range('2015-01-15', '2015-01-16') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 100*5*6, cls.window_test_start_date), + (0, 101, pd.Timestamp('2015-01-20')), + (10, 110*.3, pd.Timestamp('2015-01-09')), + (10, 111*.3, pd.Timestamp('2015-01-12')), + (20, 120*.7*.8, cls.window_test_start_date), + (20, 121*.7*.8, pd.Timestamp('2015-01-07')), + (30, 230*11*12, cls.window_test_start_date), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 240*13, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-20') + ), + cls.create_expected_df_for_factor_compute( + [(0, 200 * 5 * 6, pd.Timestamp('2015-01-12')), + (10, 110 * .3, pd.Timestamp('2015-01-09')), + (10, 111 * .3, pd.Timestamp('2015-01-12')), + (20, 220 * .7 * .8, cls.window_test_start_date), + (20, 221 * .8, pd.Timestamp('2015-01-17')), + (40, 240 * 13, 
pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-21') + ), + cls.create_expected_df_for_factor_compute( + [(0, 200 * 5 * 6, pd.Timestamp('2015-01-12')), + (10, 110 * .3, pd.Timestamp('2015-01-09')), + (10, 111 * .3, pd.Timestamp('2015-01-12')), + (20, 220 * .7 * .8, cls.window_test_start_date), + (20, 221 * .8, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-22') + ), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200*5*6, pd.Timestamp('2015-01-12')), + (10, 310*.3, pd.Timestamp('2015-01-09')), + (10, 311*.3, pd.Timestamp('2015-01-15')), + (20, 220*.7*.8, cls.window_test_start_date), + (20, 221*.8, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + end_date + ) for end_date in pd.date_range('2015-01-23', '2015-01-29') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200*5*6*7, pd.Timestamp('2015-01-12')), + (10, 310*.3, pd.Timestamp('2015-01-09')), + (10, 311*.3, pd.Timestamp('2015-01-15')), + (20, 220*.7*.8*.9, cls.window_test_start_date), + (20, 221*.8*.9, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + end_date + ) for end_date in pd.date_range('2015-01-30', '2015-02-05') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200*5*6*7, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220*.7*.8*.9, cls.window_test_start_date), + (20, 221*.8*.9, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + end_date + ) for end_date in pd.date_range('2015-02-06', '2015-02-09') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 200*5*6*7, pd.Timestamp('2015-01-12')), + (0, 201, pd.Timestamp('2015-02-10')), + (10, 
np.NaN, cls.window_test_start_date), + (20, 220*.7*.8*.9, cls.window_test_start_date), + (20, 221*.8*.9, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-02-10') + ) + ]) + + twoq_next = pd.concat( + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, 220*5/3, cls.window_test_start_date), + (30, 230*1/10, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date), + (50, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-09') + )] + + [cls.create_expected_df_for_factor_compute( + [(0, 200*1/4, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220*5/3, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-12') + )] + + [cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-13', '2015-01-14')] + + [cls.create_expected_df_for_factor_compute( + [(0, 200*5, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220*.7, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-15', '2015-01-16')] + + [cls.create_expected_df_for_factor_compute( + [(0, 200*5*6, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220*.7*.8, cls.window_test_start_date), + (20, 221*.8, pd.Timestamp('2015-01-17')), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-20') + )] + + 
[cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-21', '2015-02-10')] + ) + + return { + 1: oneq_next, + 2: twoq_next + } + + +class BlazeNextWithSplitAdjustedWindows(NextWithSplitAdjustedWindows): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextSplitAdjustedEstimatesLoader( + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate'], + split_adjusted_asof=cls.split_adjusted_asof_date, + ) + + +class WithSplitAdjustedMultipleEstimateColumns(WithEstimates): + """ + ZiplineTestCase mixin for having multiple estimate columns that are + split-adjusted to make sure that adjustments are applied correctly. + + Attributes + ---------- + test_start_date : pd.Timestamp + The start date of the test. + test_end_date : pd.Timestamp + The end date of the test. + split_adjusted_asof : pd.Timestamp + The split-adjusted-asof-date of the data used in the test, to be used + to create all loaders of test classes that subclass this mixin. + + Methods + ------- + make_expected_timelines_1q_out -> dict[pd.Timestamp -> dict[str -> + np.array]] + The expected array of results for each date of the date range for + each column. Only for 1 quarter out. + + make_expected_timelines_2q_out -> dict[pd.Timestamp -> dict[str -> + np.array]] + The expected array of results for each date of the date range. For 2 + quarters out, so only for the column that is requested to be loaded + with 2 quarters out. + + Tests + ----- + test_adjustments_with_multiple_adjusted_columns + Tests that if you have multiple columns, we still split-adjust + correctly. 
+ + test_multiple_datasets_different_num_announcements + Tests that if you have multiple datasets that ask for a different + number of quarters out, and each asks for a different estimates column, + we still split-adjust correctly. + """ + END_DATE = pd.Timestamp('2015-02-10') + test_start_date = pd.Timestamp('2015-01-06', tz='utc') + test_end_date = pd.Timestamp('2015-01-12', tz='utc') + split_adjusted_asof = pd.Timestamp('2015-01-08') + + @classmethod + def make_columns(cls): + return { + MultipleColumnsEstimates.event_date: 'event_date', + MultipleColumnsEstimates.fiscal_quarter: 'fiscal_quarter', + MultipleColumnsEstimates.fiscal_year: 'fiscal_year', + MultipleColumnsEstimates.estimate1: 'estimate1', + MultipleColumnsEstimates.estimate2: 'estimate2' + } + + @classmethod + def make_events(cls): + sid_0_events = pd.DataFrame({ + # We only want a stale KD here so that adjustments + # will be applied. + TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), + pd.Timestamp('2015-01-05')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-12')], + 'estimate1': [1100., 1200.], + 'estimate2': [2100., 2200.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 0, + }) + + # This is just an extra sid to make sure that we apply adjustments + # correctly for multiple columns when we have multiple sids. + sid_1_events = pd.DataFrame({ + # We only want a stale KD here so that adjustments + # will be applied. 
+ TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), + pd.Timestamp('2015-01-05')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-08'), + pd.Timestamp('2015-01-11')], + 'estimate1': [1110., 1210.], + 'estimate2': [2110., 2210.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 1, + }) + return pd.concat([sid_0_events, sid_1_events]) + + @classmethod + def make_splits_data(cls): + sid_0_splits = pd.DataFrame({ + SID_FIELD_NAME: 0, + 'ratio': (.3, 3.), + 'effective_date': (pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-09')), + }) + + sid_1_splits = pd.DataFrame({ + SID_FIELD_NAME: 1, + 'ratio': (.4, 4.), + 'effective_date': (pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-09')), + }) + + return pd.concat([sid_0_splits, sid_1_splits]) + + @classmethod + def make_expected_timelines_1q_out(cls): + return {} + + @classmethod + def make_expected_timelines_2q_out(cls): + return {} + + @classmethod + def init_class_fixtures(cls): + super( + WithSplitAdjustedMultipleEstimateColumns, cls + ).init_class_fixtures() + cls.timelines_1q_out = cls.make_expected_timelines_1q_out() + cls.timelines_2q_out = cls.make_expected_timelines_2q_out() + + def test_adjustments_with_multiple_adjusted_columns(self): + dataset = MultipleColumnsQuartersEstimates(1) + timelines = self.timelines_1q_out + window_len = 3 + + class SomeFactor(CustomFactor): + inputs = [dataset.estimate1, dataset.estimate2] + window_length = window_len + + def compute(self, today, assets, out, estimate1, estimate2): + assert_almost_equal(estimate1, timelines[today]['estimate1']) + assert_almost_equal(estimate2, timelines[today]['estimate2']) + + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + engine.run_pipeline( + Pipeline({'est': SomeFactor()}), + start_date=self.test_start_date, + # last event date we have + end_date=self.test_end_date, + ) + + def test_multiple_datasets_different_num_announcements(self): + dataset1 = 
MultipleColumnsQuartersEstimates(1) + dataset2 = MultipleColumnsQuartersEstimates(2) + timelines_1q_out = self.timelines_1q_out + timelines_2q_out = self.timelines_2q_out + window_len = 3 + + class SomeFactor1(CustomFactor): + inputs = [dataset1.estimate1] + window_length = window_len + + def compute(self, today, assets, out, estimate1): + assert_almost_equal( + estimate1, timelines_1q_out[today]['estimate1'] + ) + + class SomeFactor2(CustomFactor): + inputs = [dataset2.estimate2] + window_length = window_len + + def compute(self, today, assets, out, estimate2): + assert_almost_equal( + estimate2, timelines_2q_out[today]['estimate2'] + ) + + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + engine.run_pipeline( + Pipeline({'est1': SomeFactor1(), 'est2': SomeFactor2()}), + start_date=self.test_start_date, + # last event date we have + end_date=self.test_end_date, + ) + + +class PreviousWithSplitAdjustedMultipleEstimateColumns( + WithSplitAdjustedMultipleEstimateColumns, ZiplineTestCase +): + @classmethod + def make_loader(cls, events, columns): + return PreviousSplitAdjustedEarningsEstimatesLoader( + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate1', 'estimate2'], + split_adjusted_asof=cls.split_adjusted_asof, + ) + + @classmethod + def make_expected_timelines_1q_out(cls): + return { + pd.Timestamp('2015-01-06', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] * 3), + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-07', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] * 3), + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-08', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] * 2 + + [[np.NaN, 1110.]]), + 'estimate2': np.array([[np.NaN, np.NaN]] * 2 + + [[np.NaN, 2110.]]) + }, + pd.Timestamp('2015-01-09', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] + + [[np.NaN, 
1110. * 4]] + + [[1100 * 3., 1110. * 4]]), + 'estimate2': np.array([[np.NaN, np.NaN]] + + [[np.NaN, 2110. * 4]] + + [[2100 * 3., 2110. * 4]]) + }, + pd.Timestamp('2015-01-12', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] * 2 + + [[1200 * 3., 1210. * 4]]), + 'estimate2': np.array([[np.NaN, np.NaN]] * 2 + + [[2200 * 3., 2210. * 4]]) + } + } + + @classmethod + def make_expected_timelines_2q_out(cls): + return { + pd.Timestamp('2015-01-06', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-07', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-08', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-09', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-12', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 2 + + [[2100 * 3., 2110. * 4]]) + } + } + + +class BlazePreviousWithMultipleEstimateColumns( + PreviousWithSplitAdjustedMultipleEstimateColumns +): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousSplitAdjustedEstimatesLoader( + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate1', 'estimate2'], + split_adjusted_asof=cls.split_adjusted_asof, + ) + + +class NextWithSplitAdjustedMultipleEstimateColumns( + WithSplitAdjustedMultipleEstimateColumns, ZiplineTestCase +): + @classmethod + def make_loader(cls, events, columns): + return NextSplitAdjustedEarningsEstimatesLoader( + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate1', 'estimate2'], + split_adjusted_asof=cls.split_adjusted_asof, + ) + + @classmethod + def make_expected_timelines_1q_out(cls): + return { + pd.Timestamp('2015-01-06', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] + + [[1100. * 1/.3, 1110. * 1/.4]] * 2), + 'estimate2': np.array([[np.NaN, np.NaN]] + + [[2100. 
* 1/.3, 2110. * 1/.4]] * 2), + }, + pd.Timestamp('2015-01-07', tz='utc'): { + 'estimate1': np.array([[1100., 1110.]] * 3), + 'estimate2': np.array([[2100., 2110.]] * 3) + }, + pd.Timestamp('2015-01-08', tz='utc'): { + 'estimate1': np.array([[1100., 1110.]] * 3), + 'estimate2': np.array([[2100., 2110.]] * 3) + }, + pd.Timestamp('2015-01-09', tz='utc'): { + 'estimate1': np.array([[1100 * 3., 1210. * 4]] * 3), + 'estimate2': np.array([[2100 * 3., 2210. * 4]] * 3) + }, + pd.Timestamp('2015-01-12', tz='utc'): { + 'estimate1': np.array([[1200 * 3., np.NaN]] * 3), + 'estimate2': np.array([[2200 * 3., np.NaN]] * 3) + } + } + + @classmethod + def make_expected_timelines_2q_out(cls): + return { + pd.Timestamp('2015-01-06', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] + + [[2200 * 1/.3, 2210. * 1/.4]] * 2) + }, + pd.Timestamp('2015-01-07', tz='utc'): { + 'estimate2': np.array([[2200., 2210.]] * 3) + }, + pd.Timestamp('2015-01-08', tz='utc'): { + 'estimate2': np.array([[2200, 2210.]] * 3) + }, + pd.Timestamp('2015-01-09', tz='utc'): { + 'estimate2': np.array([[2200 * 3., np.NaN]] * 3) + }, + pd.Timestamp('2015-01-12', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + } + } + + +class BlazeNextWithMultipleEstimateColumns( + NextWithSplitAdjustedMultipleEstimateColumns +): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextSplitAdjustedEstimatesLoader( + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate1', 'estimate2'], + split_adjusted_asof=cls.split_adjusted_asof, + ) + + +class WithAdjustmentBoundaries(WithEstimates): + """ + ZiplineTestCase mixin providing class-level attributes, methods, + and a test to make sure that when the split-adjusted-asof-date is not + strictly within the date index, we can still apply adjustments correctly. 
+ + Attributes + ---------- + split_adjusted_before_start : pd.Timestamp + A split-adjusted-asof-date before the start date of the test. + split_adjusted_after_end : pd.Timestamp + A split-adjusted-asof-date after the end date of the test. + split_adjusted_asof_dates : list of tuples of pd.Timestamp + All the split-adjusted-asof-dates over which we want to parameterize + the test. + + Methods + ------- + make_expected_out -> dict[pd.Timestamp -> pd.DataFrame] + A dictionary of the expected output of the pipeline at each of the + dates of interest. + """ + START_DATE = pd.Timestamp('2015-01-04') + # We want to run the pipeline starting from `START_DATE`, but the + # pipeline results will start from the next day, which is + # `test_start_date`. + test_start_date = pd.Timestamp('2015-01-05') + END_DATE = test_end_date = pd.Timestamp('2015-01-12') + split_adjusted_before_start = ( + test_start_date - timedelta(days=1) + ) + split_adjusted_after_end = ( + test_end_date + timedelta(days=1) + ) + # Must parametrize over this because there can only be 1 such date for + # each set of data. + split_adjusted_asof_dates = [(test_start_date,), + (test_end_date,), + (split_adjusted_before_start,), + (split_adjusted_after_end,)] + + @classmethod + def init_class_fixtures(cls): + super(WithAdjustmentBoundaries, cls).init_class_fixtures() + cls.s0 = cls.asset_finder.retrieve_asset(0) + cls.s1 = cls.asset_finder.retrieve_asset(1) + cls.s2 = cls.asset_finder.retrieve_asset(2) + cls.s3 = cls.asset_finder.retrieve_asset(3) + cls.s4 = cls.asset_finder.retrieve_asset(4) + cls.expected = cls.make_expected_out() + + @classmethod + def make_events(cls): + # We can create a sid for each configuration of dates for KDs, events, + # and splits. For this test we don't care about overwrites so we only + # test 1 quarter. 
+ sid_0_timeline = pd.DataFrame({ + # KD on first date of index + TS_FIELD_NAME: cls.test_start_date, + EVENT_DATE_FIELD_NAME: pd.Timestamp('2015-01-09'), + 'estimate': 10., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 0, + }, index=[0]) + + sid_1_timeline = pd.DataFrame({ + TS_FIELD_NAME: cls.test_start_date, + # event date on first date of index + EVENT_DATE_FIELD_NAME: cls.test_start_date, + 'estimate': 11., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 1, + }, index=[0]) + + sid_2_timeline = pd.DataFrame({ + # KD on last date of index + TS_FIELD_NAME: cls.test_end_date, + EVENT_DATE_FIELD_NAME: cls.test_end_date + timedelta(days=1), + 'estimate': 12., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 2, + }, index=[0]) + + sid_3_timeline = pd.DataFrame({ + TS_FIELD_NAME: cls.test_end_date - timedelta(days=1), + EVENT_DATE_FIELD_NAME: cls.test_end_date, + 'estimate': 13., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 3, + }, index=[0]) + + # KD and event date don't fall on date index boundaries + sid_4_timeline = pd.DataFrame({ + TS_FIELD_NAME: cls.test_end_date - timedelta(days=1), + EVENT_DATE_FIELD_NAME: cls.test_end_date - timedelta(days=1), + 'estimate': 14., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 4, + }, index=[0]) + + return pd.concat([sid_0_timeline, + sid_1_timeline, + sid_2_timeline, + sid_3_timeline, + sid_4_timeline]) + + @classmethod + def make_splits_data(cls): + # Here we want splits that collide + sid_0_splits = pd.DataFrame({ + SID_FIELD_NAME: 0, + 'ratio': .10, + 'effective_date': cls.test_start_date, + }, index=[0]) + + sid_1_splits = pd.DataFrame({ + SID_FIELD_NAME: 1, + 'ratio': .11, + 'effective_date': cls.test_start_date, + }, index=[0]) + + sid_2_splits = pd.DataFrame({ + SID_FIELD_NAME: 2, + 'ratio': .12, + 'effective_date': cls.test_end_date, + }, 
index=[0]) + + sid_3_splits = pd.DataFrame({ + SID_FIELD_NAME: 3, + 'ratio': .13, + 'effective_date': cls.test_end_date, + }, index=[0]) + + # We want 2 splits here - at the starting boundary and at the end + # boundary - while there is no collision with KD/event date for the + # sid. + sid_4_splits = pd.DataFrame({ + SID_FIELD_NAME: 4, + 'ratio': (.14, .15), + 'effective_date': (cls.test_start_date, cls.test_end_date), + }) + + return pd.concat([sid_0_splits, + sid_1_splits, + sid_2_splits, + sid_3_splits, + sid_4_splits]) + + @parameterized.expand(split_adjusted_asof_dates) + def test_boundaries(self, split_date): + dataset = QuartersEstimates(1) + loader = self.loader(split_adjusted_asof=split_date) + engine = SimplePipelineEngine( + lambda x: loader, + self.trading_days, + self.asset_finder, + ) + result = engine.run_pipeline( + Pipeline({'estimate': dataset.estimate.latest}), + start_date=self.trading_days[0], + # last event date we have + end_date=self.trading_days[-1], + ) + expected = self.expected[split_date] + assert_frame_equal(result, expected, check_names=False) + + @classmethod + def make_expected_out(cls): + return {} + + +class PreviousWithAdjustmentBoundaries(WithAdjustmentBoundaries, + ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return partial(PreviousSplitAdjustedEarningsEstimatesLoader, + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate']) + + @classmethod + def make_expected_out(cls): + split_adjusted_at_start_boundary = pd.concat([ + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': np.NaN, + }, index=pd.date_range( + cls.test_start_date, + pd.Timestamp('2015-01-08'), + tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': 10., + }, index=pd.date_range( + pd.Timestamp('2015-01-09'), cls.test_end_date, tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s1, + 'estimate': 11., + }, index=pd.date_range(cls.test_start_date, 
cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s2, + 'estimate': np.NaN + }, index=pd.date_range(cls.test_start_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': np.NaN + }, index=pd.date_range( + cls.test_start_date, cls.test_end_date - timedelta(1), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': 13. * .13 + }, index=pd.date_range(cls.test_end_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': np.NaN + }, index=pd.date_range( + cls.test_start_date, cls.test_end_date - timedelta(2), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': 14. * .15 + }, index=pd.date_range( + cls.test_end_date - timedelta(1), cls.test_end_date, tz='utc' + )), + ]).set_index(SID_FIELD_NAME, append=True).unstack( + SID_FIELD_NAME).reindex(cls.trading_days).stack( + SID_FIELD_NAME, dropna=False) + + split_adjusted_at_end_boundary = pd.concat([ + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': np.NaN, + }, index=pd.date_range( + cls.test_start_date, pd.Timestamp('2015-01-08'), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': 10., + }, index=pd.date_range( + pd.Timestamp('2015-01-09'), cls.test_end_date, tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s1, + 'estimate': 11., + }, index=pd.date_range(cls.test_start_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s2, + 'estimate': np.NaN + }, index=pd.date_range(cls.test_start_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': np.NaN + }, index=pd.date_range( + cls.test_start_date, cls.test_end_date - timedelta(1), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': 13. 
+ }, index=pd.date_range(cls.test_end_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': np.NaN + }, index=pd.date_range( + cls.test_start_date, cls.test_end_date - timedelta(2), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': 14. + }, index=pd.date_range(cls.test_end_date - timedelta(1), + cls.test_end_date, + tz='utc')), + ]).set_index(SID_FIELD_NAME, append=True).unstack( + SID_FIELD_NAME).reindex(cls.trading_days).stack(SID_FIELD_NAME, + dropna=False) + + split_adjusted_before_start_boundary = split_adjusted_at_start_boundary + split_adjusted_after_end_boundary = split_adjusted_at_end_boundary + + return {cls.test_start_date: + split_adjusted_at_start_boundary, + cls.split_adjusted_before_start: + split_adjusted_before_start_boundary, + cls.test_end_date: + split_adjusted_at_end_boundary, + cls.split_adjusted_after_end: + split_adjusted_after_end_boundary} + + +class BlazePreviousWithAdjustmentBoundaries(PreviousWithAdjustmentBoundaries): + @classmethod + def make_loader(cls, events, columns): + return partial(BlazePreviousSplitAdjustedEstimatesLoader, + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate']) + + +class NextWithAdjustmentBoundaries(WithAdjustmentBoundaries, + ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return partial(NextSplitAdjustedEarningsEstimatesLoader, + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate']) + + @classmethod + def make_expected_out(cls): + split_adjusted_at_start_boundary = pd.concat([ + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': 10, + }, index=pd.date_range( + cls.test_start_date, pd.Timestamp('2015-01-09'), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s1, + 'estimate': 11., + }, index=pd.date_range(cls.test_start_date, + cls.test_start_date, + tz='utc')), + pd.DataFrame({ + 
SID_FIELD_NAME: cls.s2, + 'estimate': 12., + }, index=pd.date_range(cls.test_end_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': 13. * .13, + }, index=pd.date_range( + cls.test_end_date - timedelta(1), cls.test_end_date, tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': 14., + }, index=pd.date_range( + cls.test_end_date - timedelta(1), + cls.test_end_date - timedelta(1), + tz='utc' + )), + ]).set_index(SID_FIELD_NAME, append=True).unstack( + SID_FIELD_NAME).reindex(cls.trading_days).stack( + SID_FIELD_NAME, dropna=False) + + split_adjusted_at_end_boundary = pd.concat([ + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': 10, + }, index=pd.date_range( + cls.test_start_date, pd.Timestamp('2015-01-09'), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s1, + 'estimate': 11., + }, index=pd.date_range(cls.test_start_date, + cls.test_start_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s2, + 'estimate': 12., + }, index=pd.date_range(cls.test_end_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': 13., + }, index=pd.date_range( + cls.test_end_date - timedelta(1), cls.test_end_date, tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': 14., + }, index=pd.date_range( + cls.test_end_date - timedelta(1), + cls.test_end_date - timedelta(1), + tz='utc' + )), + ]).set_index(SID_FIELD_NAME, append=True).unstack( + SID_FIELD_NAME).reindex(cls.trading_days).stack( + SID_FIELD_NAME, dropna=False) + + split_adjusted_before_start_boundary = split_adjusted_at_start_boundary + split_adjusted_after_end_boundary = split_adjusted_at_end_boundary + + return {cls.test_start_date: + split_adjusted_at_start_boundary, + cls.split_adjusted_before_start: + split_adjusted_before_start_boundary, + cls.test_end_date: + split_adjusted_at_end_boundary, + cls.split_adjusted_after_end: + split_adjusted_after_end_boundary} + + +class 
BlazeNextWithAdjustmentBoundaries(NextWithAdjustmentBoundaries): + @classmethod + def make_loader(cls, events, columns): + return partial(BlazeNextSplitAdjustedEstimatesLoader, + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate']) + + class QuarterShiftTestCase(ZiplineTestCase): """ This tests, in isolation, quarter calculation logic for shifting quarters diff --git a/zipline/pipeline/loaders/blaze/estimates.py b/zipline/pipeline/loaders/blaze/estimates.py index 3cb1064a..bf046a9b 100644 --- a/zipline/pipeline/loaders/blaze/estimates.py +++ b/zipline/pipeline/loaders/blaze/estimates.py @@ -17,7 +17,8 @@ from zipline.pipeline.loaders.earnings_estimates import ( PreviousEarningsEstimatesLoader, required_estimates_fields, metadata_columns, -) + PreviousSplitAdjustedEarningsEstimatesLoader, + NextSplitAdjustedEarningsEstimatesLoader) from zipline.pipeline.loaders.utils import ( check_data_query_args, ) @@ -108,6 +109,7 @@ class BlazeEstimatesLoader(PipelineLoader): # Only load requested columns. 
requested_column_names = [self._columns[column.name] for column in columns] + raw = load_raw_data( assets, dates, @@ -120,7 +122,7 @@ class BlazeEstimatesLoader(PipelineLoader): return self.loader( raw, - {column.name: self._columns[column.name] for column in columns} + {column.name: self._columns[column.name] for column in columns}, ).load_adjusted_array( columns, dates, @@ -135,3 +137,65 @@ class BlazeNextEstimatesLoader(BlazeEstimatesLoader): class BlazePreviousEstimatesLoader(BlazeEstimatesLoader): loader = PreviousEarningsEstimatesLoader + + +class BlazeSplitAdjustedEstimatesLoader(BlazeEstimatesLoader): + def __init__(self, + expr, + columns, + split_adjustments_loader, + split_adjusted_column_names, + split_adjusted_asof, + **kwargs): + self._split_adjustments = split_adjustments_loader + self._split_adjusted_column_names = split_adjusted_column_names + self._split_adjusted_asof = split_adjusted_asof + super(BlazeSplitAdjustedEstimatesLoader, self).__init__( + expr, + columns, + **kwargs + ) + + def load_adjusted_array(self, columns, dates, assets, mask): + # Only load requested columns. 
+ requested_column_names = [self._columns[column.name] + for column in columns] + + requested_spilt_adjusted_columns = [ + column_name + for column_name in self._split_adjusted_column_names + if column_name in requested_column_names + ] + + raw = load_raw_data( + assets, + dates, + self._data_query_time, + self._data_query_tz, + self._expr[sorted(metadata_columns.union(requested_column_names))], + self._odo_kwargs, + checkpoints=self._checkpoints, + ) + + return self.loader( + raw, + {column.name: self._columns[column.name] for column in columns}, + self._split_adjustments, + requested_spilt_adjusted_columns, + self._split_adjusted_asof, + ).load_adjusted_array( + columns, + dates, + assets, + mask, + ) + + +class BlazeNextSplitAdjustedEstimatesLoader(BlazeSplitAdjustedEstimatesLoader): + loader = NextSplitAdjustedEarningsEstimatesLoader + + +class BlazePreviousSplitAdjustedEstimatesLoader( + BlazeSplitAdjustedEstimatesLoader +): + loader = PreviousSplitAdjustedEarningsEstimatesLoader diff --git a/zipline/pipeline/loaders/earnings_estimates.py b/zipline/pipeline/loaders/earnings_estimates.py index 1fd6dcb9..367301a9 100644 --- a/zipline/pipeline/loaders/earnings_estimates.py +++ b/zipline/pipeline/loaders/earnings_estimates.py @@ -1,5 +1,4 @@ from abc import abstractmethod, abstractproperty -from collections import defaultdict import numpy as np import pandas as pd @@ -11,6 +10,7 @@ from zipline.lib.adjustment import ( Datetime641DArrayOverwrite, Datetime64Overwrite, Float641DArrayOverwrite, + Float64Multiply, Float64Overwrite, ) @@ -91,11 +91,23 @@ def validate_column_specs(events, columns): ) +def add_new_adjustments(adjustments_dict, + adjustments, + column_name, + ts): + try: + adjustments_dict[column_name][ts].extend(adjustments) + except KeyError: + adjustments_dict[column_name][ts] = adjustments + + class EarningsEstimatesLoader(PipelineLoader): """ An abstract pipeline loader for estimates data that can load data a variable number of quarters 
forwards/backwards from calendar dates depending on the `num_announcements` attribute of the columns' dataset. + If split adjustments are to be applied, a loader, split-adjusted columns, + and the split-adjusted asof-date must be supplied. Parameters ---------- @@ -152,7 +164,7 @@ class EarningsEstimatesLoader(PipelineLoader): self.name_map = name_map @abstractmethod - def get_zeroth_quarter_idx(self, num_announcements, last, dates): + def get_zeroth_quarter_idx(self, stacked_last_per_qtr): raise NotImplementedError('get_zeroth_quarter_idx') @abstractmethod @@ -167,7 +179,9 @@ class EarningsEstimatesLoader(PipelineLoader): next_qtr_start_idx, requested_quarter, sid, - sid_idx): + sid_idx, + col_to_split_adjustments=None, + split_adjusted_asof_idx=None): raise NotImplementedError('create_overwrite_for_estimate') @abstractproperty @@ -193,6 +207,9 @@ class EarningsEstimatesLoader(PipelineLoader): stacked_last_per_qtr : pd.DataFrame The latest estimate known with the dates, normalized quarter, and sid as the index. + num_announcements : int + The number of announcements out the user requested relative to + each date in the calendar dates. dates : pd.DatetimeIndex The calendar dates for which estimates data is requested. @@ -236,13 +253,185 @@ class EarningsEstimatesLoader(PipelineLoader): # dates so that we have a value for each calendar date. return requested_qtr_data.unstack(SID_FIELD_NAME).reindex(dates) + def get_split_adjusted_asof_idx(self, dates): + """ + Compute the index in `dates` where the split-adjusted-asof-date + falls. This is the date up to which, and including which, we will + need to unapply all adjustments for and then re-apply them as they + come in. After this date, adjustments are applied as normal. + + Parameters + ---------- + dates : pd.DatetimeIndex + The calendar dates over which the Pipeline is being computed. + + Returns + ------- + split_adjusted_asof_idx : int + The index in `dates` at which the data should be split.
+ """ + split_adjusted_asof_idx = dates.searchsorted( + self._split_adjusted_asof + ) + # The split-asof date is after the date index. + if split_adjusted_asof_idx == len(dates): + split_adjusted_asof_idx = len(dates) - 1 + elif self._split_adjusted_asof < dates[0].tz_localize(None): + split_adjusted_asof_idx = -1 + return split_adjusted_asof_idx + + def collect_overwrites_for_sid(self, + group, + dates, + requested_qtr_data, + last_per_qtr, + sid_idx, + columns, + all_adjustments_for_sid, + sid): + """ + Given a sid, collect all overwrites that should be applied for this + sid at each quarter boundary. + + Parameters + ---------- + group : pd.DataFrame + The data for `sid`. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + requested_qtr_data : pd.DataFrame + The DataFrame with the latest values for the requested quarter + for all columns. + last_per_qtr : pd.DataFrame + A DataFrame with a column MultiIndex of [self.estimates.columns, + normalized_quarters, sid] that allows easily getting the timeline + of estimates for a particular sid for a particular quarter. + sid_idx : int + The sid's index in the asset index. + columns : list of BoundColumn + The columns for which the overwrites should be computed. + all_adjustments_for_sid : dict[int -> AdjustedArray] + A dictionary of the integer index of each timestamp into the date + index, mapped to adjustments that should be applied at that + index for the given sid (`sid`). This dictionary is modified as + adjustments are collected. + sid : int + The sid for which overwrites should be computed. + """ + next_qtr_start_indices = dates.searchsorted( + group[EVENT_DATE_FIELD_NAME].values, + side=self.searchsorted_side, + ) + + qtrs_with_estimates = group.index.get_level_values( + NORMALIZED_QUARTERS + ).values + for idx in next_qtr_start_indices: + if 0 < idx < len(dates): + # Find the quarter being requested in the quarter we're + # crossing into. 
+ requested_quarter = requested_qtr_data[ + SHIFTED_NORMALIZED_QTRS, sid, + ].iloc[idx] + # Only add adjustments if the next quarter starts somewhere + # in our date index for this sid. Our 'next' quarter can + # never start at index 0; a starting index of 0 means that + # the next quarter's event date was NaT. + self.create_overwrites_for_quarter( + all_adjustments_for_sid, + idx, + last_per_qtr, + qtrs_with_estimates, + requested_quarter, + sid, + sid_idx, + columns + ) + + def get_adjustments_for_sid(self, + group, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx, + columns, + col_to_all_adjustments, + **kwargs): + """Collect one sid's overwrites and merge them into the shared dict. + + Parameters + ---------- + group : pd.DataFrame + The data for the given sid. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + requested_qtr_data : pd.DataFrame + The DataFrame with the latest values for the requested quarter + for all columns. + last_per_qtr : pd.DataFrame + A DataFrame with a column MultiIndex of [self.estimates.columns, + normalized_quarters, sid] that allows easily getting the timeline + of estimates for a particular sid for a particular quarter. + sid_to_idx : dict[int -> int] + A dictionary mapping sid to the sid's index in the asset index. + columns : list of BoundColumn + The columns for which the overwrites should be computed. + col_to_all_adjustments : dict[str -> dict[int -> list]] + A dictionary of column name to the integer index of each timestamp + into the date index, mapped to adjustments that should be applied + at that index. This dictionary is for adjustments for ALL sids. It + is modified as adjustments are collected. + kwargs : + Additional arguments used in collecting adjustments; unused here. + """ + # Collect all adjustments for a given sid.
+ all_adjustments_for_sid = {} + sid = int(group.name) + self.collect_overwrites_for_sid(group, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx[sid], + columns, + all_adjustments_for_sid, + sid) + self.merge_into_adjustments_for_all_sids( + all_adjustments_for_sid, col_to_all_adjustments + ) + + def merge_into_adjustments_for_all_sids(self, + all_adjustments_for_sid, + col_to_all_adjustments): + """ + Merge adjustments for a particular sid into a dictionary containing + adjustments for all sids. + + Parameters + ---------- + all_adjustments_for_sid : dict[int -> AdjustedArray] + All adjustments for a particular sid. + col_to_all_adjustments : dict[int -> AdjustedArray] + All adjustments for all sids. + """ + + for col_name in all_adjustments_for_sid: + if col_name not in col_to_all_adjustments: + col_to_all_adjustments[col_name] = {} + for ts in all_adjustments_for_sid[col_name]: + adjs = all_adjustments_for_sid[col_name][ts] + add_new_adjustments(col_to_all_adjustments, + adjs, + col_name, + ts) + def get_adjustments(self, zero_qtr_data, requested_qtr_data, last_per_qtr, dates, assets, - columns): + columns, + **kwargs): """ Creates an AdjustedArray from the given estimates data for the given dates. @@ -263,13 +452,17 @@ class EarningsEstimatesLoader(PipelineLoader): An index of all the assets from the raw data. columns : list of BoundColumn The columns for which adjustments need to be calculated. + kwargs : + Additional keyword arguments that should be forwarded to + `get_adjustments_for_sid` and to be used in computing adjustments + for each sid. Returns ------- - adjusted_array : AdjustedArray - The array of data and overwrites for the given column. + col_to_all_adjustments : dict[int -> AdjustedArray] + A dictionary of all adjustments that should be applied. """ - col_to_overwrites = defaultdict(dict) + zero_qtr_data.sort_index(inplace=True) # Here we want to get the LAST record from each group of records # corresponding to a single quarter. 
This is to ensure that we select @@ -278,50 +471,29 @@ class EarningsEstimatesLoader(PipelineLoader): level=[SID_FIELD_NAME, NORMALIZED_QUARTERS] ).nth(-1) + col_to_all_adjustments = {} sid_to_idx = dict(zip(assets, range(len(assets)))) + quarter_shifts.groupby(level=SID_FIELD_NAME).apply( + self.get_adjustments_for_sid, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx, + columns, + col_to_all_adjustments, + **kwargs + ) + return col_to_all_adjustments - for column in columns: - column_name = self.name_map[column.name] - col_to_overwrites[column_name] = defaultdict(list) - - def collect_adjustments(group): - next_qtr_start_indices = dates.searchsorted( - group[EVENT_DATE_FIELD_NAME].values, - side=self.searchsorted_side, - ) - sid = int(group.name) - qtrs_with_estimates = group.index.get_level_values( - NORMALIZED_QUARTERS - ).values - for idx in next_qtr_start_indices: - if 0 < idx < len(dates): - # Only add adjustments if the next quarter starts somewhere - # in our date index for this sid. Our 'next' quarter can - # never start at index 0; a starting index of 0 means that - # the next quarter's event date was NaT. - self.create_overwrite_for_quarter( - col_to_overwrites, - idx, - last_per_qtr, - qtrs_with_estimates, - requested_qtr_data, - sid, - sid_to_idx[sid], - columns, - ) - - quarter_shifts.groupby(level=SID_FIELD_NAME).apply(collect_adjustments) - return col_to_overwrites - - def create_overwrite_for_quarter(self, - col_to_overwrites, - next_qtr_start_idx, - last_per_qtr, - quarters_with_estimates_for_sid, - requested_qtr_data, - sid, - sid_idx, - columns): + def create_overwrites_for_quarter(self, + col_to_overwrites, + next_qtr_start_idx, + last_per_qtr, + quarters_with_estimates_for_sid, + requested_quarter, + sid, + sid_idx, + columns): """ Add entries to the dictionary of columns to adjustments for the given sid and the given quarter. 
@@ -343,6 +515,8 @@ class EarningsEstimatesLoader(PipelineLoader): quarters_with_estimates_for_sid : np.array An array of all quarters for which there are estimates for the given sid. + requested_quarter : float + The quarter for which the overwrite should be created. sid : int The sid for which to create overwrites. sid_idx : int @@ -350,45 +524,42 @@ class EarningsEstimatesLoader(PipelineLoader): columns : list of BoundColumn The columns for which to create overwrites. """ - - # Find the quarter being requested in the quarter we're - # crossing into. - requested_quarter = requested_qtr_data[ - SHIFTED_NORMALIZED_QTRS, sid, - ].iloc[next_qtr_start_idx] for col in columns: column_name = self.name_map[col.name] + if column_name not in col_to_overwrites: + col_to_overwrites[column_name] = {} # If there are estimates for the requested quarter, # overwrite all values going up to the starting index of # that quarter with estimates for that quarter. if requested_quarter in quarters_with_estimates_for_sid: - col_to_overwrites[column_name][next_qtr_start_idx].append( - self.create_overwrite_for_estimate( - col, - column_name, - last_per_qtr, - next_qtr_start_idx, - requested_quarter, - sid, - sid_idx - ), + adjs = self.create_overwrite_for_estimate( + col, + column_name, + last_per_qtr, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx, ) + add_new_adjustments(col_to_overwrites, + adjs, + column_name, + next_qtr_start_idx) # There are no estimates for the quarter. Overwrite all # values going up to the starting index of that quarter # with the missing value for this column. 
else: - col_to_overwrites[column_name][next_qtr_start_idx].append( - self.overwrite_with_null( + adjs = [self.overwrite_with_null( col, - last_per_qtr.index, next_qtr_start_idx, - sid_idx - ), - ) + sid_idx)] + add_new_adjustments(col_to_overwrites, + adjs, + column_name, + next_qtr_start_idx) def overwrite_with_null(self, column, - dates, next_qtr_start_idx, sid_idx): return self.scalar_overwrites_dict[column.dtype]( @@ -553,10 +724,13 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader): next_qtr_start_idx, requested_quarter, sid, - sid_idx): - return self.array_overwrites_dict[column.dtype]( + sid_idx, + col_to_split_adjustments=None, + split_adjusted_asof_idx=None): + # if not isinstance(sid_idx, int): + # import pdb; pdb.set_trace() + return [self.array_overwrites_dict[column.dtype]( 0, - # overwrite thru last qtr next_qtr_start_idx - 1, sid_idx, sid_idx, @@ -565,7 +739,7 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader): requested_quarter, sid, ].values[:next_qtr_start_idx], - ) + )] def get_shifted_qtrs(self, zero_qtrs, num_announcements): return zero_qtrs + (num_announcements - 1) @@ -611,13 +785,15 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): next_qtr_start_idx, requested_quarter, sid, - sid_idx): - return self.overwrite_with_null( + sid_idx, + col_to_split_adjustments=None, + split_adjusted_asof_idx=None, + split_dict=None): + return [self.overwrite_with_null( column, - dates, next_qtr_start_idx, sid_idx, - ) + )] def get_shifted_qtrs(self, zero_qtrs, num_announcements): return zero_qtrs - (num_announcements - 1) @@ -651,3 +827,699 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): # sorted by event date. 
).nth(-1) return previous_releases_per_date.index + + +def validate_split_adjusted_column_specs(name_map, columns): + to_be_split = set(columns) + available = set(name_map.keys()) + extra = to_be_split - available + if extra: + raise ValueError( + "EarningsEstimatesLoader got the following extra columns to be " + "split-adjusted: {extra}.\n" + "Got Columns: {to_be_split}\n" + "Available Columns: {available}".format( + extra=sorted(extra), + to_be_split=sorted(to_be_split), + available=sorted(available), + ) + ) + + +class SplitAdjustedEstimatesLoader(EarningsEstimatesLoader): + """ + Estimates loader that loads data that needs to be split-adjusted. + + Parameters + ---------- + split_adjustments_loader : SQLiteAdjustmentReader + The loader to use for reading split adjustments. + split_adjusted_column_names : iterable of str + The column names that should be split-adjusted. + split_adjusted_asof : pd.Timestamp + The date that separates data into 2 halves: the first half is the set + of dates up to and including the split_adjusted_asof date. All + adjustments occurring during this first half are applied to all + dates in this first half. The second half is the set of dates after + the split_adjusted_asof date. All adjustments occurring during this + second half are applied sequentially as they appear in the timeline. 
+ """ + def __init__(self, + estimates, + name_map, + split_adjustments_loader, + split_adjusted_column_names, + split_adjusted_asof): + validate_split_adjusted_column_specs(name_map, + split_adjusted_column_names) + self._split_adjustments = split_adjustments_loader + self._split_adjusted_column_names = split_adjusted_column_names + self._split_adjusted_asof = split_adjusted_asof + self._split_adjustment_dict = {} + super(SplitAdjustedEstimatesLoader, self).__init__( + estimates, + name_map + ) + + @abstractmethod + def collect_split_adjustments(self, + adjustments_for_sid, + requested_qtr_data, + dates, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns): + raise NotImplementedError('collect_split_adjustments') + + def get_adjustments_for_sid(self, + group, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx, + columns, + col_to_all_adjustments, + split_adjusted_asof_idx=None, + split_adjusted_cols_for_group=None): + """ + Collects both overwrites and adjustments for a particular sid. + + Parameters + ---------- + split_adjusted_asof_idx : int + The integer index of the date on which the data was split-adjusted. + split_adjusted_cols_for_group : list of str + The names of requested columns that should also be split-adjusted. + """ + all_adjustments_for_sid = {} + sid = int(group.name) + self.collect_overwrites_for_sid(group, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx[sid], + columns, + all_adjustments_for_sid, + sid) + (pre_adjustments, + post_adjustments) = self.retrieve_split_adjustment_data_for_sid( + dates, sid, split_adjusted_asof_idx + ) + sid_estimates = self.estimates[ + self.estimates[SID_FIELD_NAME] == sid + ] + # We might not have any overwrites but still have + # adjustments, and we will need to manually add columns if + # that is the case. 
+ for col_name in split_adjusted_cols_for_group: + if col_name not in all_adjustments_for_sid: + all_adjustments_for_sid[col_name] = {} + + self.collect_split_adjustments( + all_adjustments_for_sid, + requested_qtr_data, + dates, + sid, + sid_to_idx[sid], + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + split_adjusted_cols_for_group + ) + self.merge_into_adjustments_for_all_sids( + all_adjustments_for_sid, col_to_all_adjustments + ) + + def get_adjustments(self, + zero_qtr_data, + requested_qtr_data, + last_per_qtr, + dates, + assets, + columns, + **kwargs): + """ + Calculates both split adjustments and overwrites for all sids. + """ + split_adjusted_cols_for_group = [ + self.name_map[col.name] + for col in columns + if self.name_map[col.name] in self._split_adjusted_column_names + ] + # Find where the split-adjusted-asof-date falls in the calendar dates. + split_adjusted_asof_idx = self.get_split_adjusted_asof_idx( + dates + ) + return super(SplitAdjustedEstimatesLoader, self).get_adjustments( + zero_qtr_data, + requested_qtr_data, + last_per_qtr, + dates, + assets, + columns, + split_adjusted_cols_for_group=split_adjusted_cols_for_group, + split_adjusted_asof_idx=split_adjusted_asof_idx + ) + + def determine_end_idx_for_adjustment(self, + adjustment_ts, + dates, + upper_bound, + requested_quarter, + sid_estimates): + """ + Determines the date until which the adjustment at the given date + index should be applied for the given quarter. + + Parameters + ---------- + adjustment_ts : pd.Timestamp + The timestamp at which the adjustment occurs. + dates : pd.DatetimeIndex + The calendar dates over which the Pipeline is being computed. + upper_bound : int + The index of the upper bound in the calendar dates. This is the + index until which the adjustment will be applied unless there is + information for the requested quarter that comes in on or before + that date.
+ requested_quarter : float + The quarter for which we are determining how the adjustment + should be applied. + sid_estimates : pd.DataFrame + The DataFrame of estimates data for the sid for which we're + applying the given adjustment. + + Returns + ------- + end_idx : int + The last index to which the adjustment should be applied for the + given quarter/sid. + """ + end_idx = upper_bound + # Find the next newest kd that happens on or after + # the date of this adjustment + newest_kd_for_qtr = sid_estimates[ + (sid_estimates[NORMALIZED_QUARTERS] == requested_quarter) & + (sid_estimates[TS_FIELD_NAME] >= adjustment_ts) + ][TS_FIELD_NAME].min() + if pd.notnull(newest_kd_for_qtr): + newest_kd_idx = dates.searchsorted( + newest_kd_for_qtr + ) + # We have fresh information that comes in + # before the end of the overwrite and + # presumably is already split-adjusted to the + # current split. We should stop applying the + # adjustment the day before this new + # information comes in. + if newest_kd_idx <= upper_bound: + end_idx = newest_kd_idx - 1 + return end_idx + + def collect_pre_split_asof_date_adjustments( + self, + split_adjusted_asof_date_idx, + sid_idx, + pre_adjustments, + requested_split_adjusted_columns + ): + """ + Collect split adjustments that occur before the + split-adjusted-asof-date. All those adjustments must first be + UN-applied at the first date index and then re-applied on the + appropriate dates in order to match point in time share pricing data. + + Parameters + ---------- + split_adjusted_asof_date_idx : int + The index in the calendar dates as-of which all data was + split-adjusted. + sid_idx : int + The index of the sid for which adjustments should be collected in + the adjusted array. + pre_adjustments : tuple(list(float), list(int)) + The adjustment values and indexes in `dates` for + adjustments that happened on or before the split-asof-date. + requested_split_adjusted_columns : list of str + The requested split adjusted columns.
+ + Returns + ------- + col_to_split_adjustments : dict[str -> dict[int -> list of Adjustment]] + The adjustments for this sid that occurred on or before the + split-asof-date. + """ + col_to_split_adjustments = {} + if len(pre_adjustments[0]): + adjustment_values, date_indexes = pre_adjustments + for column_name in requested_split_adjusted_columns: + col_to_split_adjustments[column_name] = {} + # We need to undo all adjustments that happen before the + # split_asof_date here by reversing the split ratio. + col_to_split_adjustments[column_name][0] = [Float64Multiply( + 0, + split_adjusted_asof_date_idx, + sid_idx, + sid_idx, + 1 / future_adjustment + ) for future_adjustment in adjustment_values] + + for adjustment, date_index in zip(adjustment_values, + date_indexes): + adj = Float64Multiply( + 0, + split_adjusted_asof_date_idx, + sid_idx, + sid_idx, + adjustment + ) + add_new_adjustments(col_to_split_adjustments, + [adj], + column_name, + date_index) + + return col_to_split_adjustments + + def collect_post_asof_split_adjustments(self, + post_adjustments, + requested_qtr_data, + sid, + sid_idx, + sid_estimates, + requested_split_adjusted_columns): + """ + Collect split adjustments that occur after the + split-adjusted-asof-date. Each adjustment needs to be applied to all + dates on which knowledge for the requested quarter was older than the + date of the adjustment. + + Parameters + ---------- + post_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values, indexes in `dates`, and timestamps for + adjustments that happened after the split-asof-date. + requested_qtr_data : pd.DataFrame + The requested quarter data for each calendar date per sid. + sid : int + The sid for which adjustments need to be collected. + sid_idx : int + The index of `sid` in the adjusted array. + sid_estimates : pd.DataFrame + The raw estimates data for this sid. + requested_split_adjusted_columns : list of str + The requested split adjusted columns. 
+ Returns + ------- + col_to_split_adjustments : dict[str -> dict[int -> list of Adjustment]] + The adjustments for this sid that occurred after the + split-asof-date. + """ + col_to_split_adjustments = {} + if post_adjustments: + # Get an integer index + requested_qtr_timeline = requested_qtr_data[ + SHIFTED_NORMALIZED_QTRS + ][sid].reset_index() + requested_qtr_timeline = requested_qtr_timeline[ + requested_qtr_timeline[sid].notnull() + ] + + # Split the data into range by quarter and determine which quarter + # was being requested in each range. + # Split integer indexes up by quarter range + qtr_ranges_idxs = np.split( + requested_qtr_timeline.index, + np.where(np.diff(requested_qtr_timeline[sid]) != 0)[0] + 1 + ) + requested_quarters_per_range = [requested_qtr_timeline[sid][r[0]] + for r in qtr_ranges_idxs] + # Try to apply each adjustment to each quarter range. + for i, qtr_range in enumerate(qtr_ranges_idxs): + for adjustment, date_index, timestamp in zip( + *post_adjustments + ): + # In the default case, apply through the end of the quarter + upper_bound = qtr_range[-1] + # Find the smallest KD in estimates that is on or after the + # date of the given adjustment. Apply the given adjustment + # until that KD. + end_idx = self.determine_end_idx_for_adjustment( + timestamp, + requested_qtr_data.index, + upper_bound, + requested_quarters_per_range[i], + sid_estimates + ) + # In the default case, apply adjustment on the first day of + # the quarter. + start_idx = qtr_range[0] + # If the adjustment happens during this quarter, apply the + # adjustment on the day it happens. + if date_index > start_idx: + start_idx = date_index + # We only want to apply the adjustment if we have any stale + # data to apply it to. 
+ if qtr_range[0] <= end_idx: + for column_name in requested_split_adjusted_columns: + if column_name not in col_to_split_adjustments: + col_to_split_adjustments[column_name] = {} + adj = Float64Multiply( + # Always apply from first day of qtr + qtr_range[0], + end_idx, + sid_idx, + sid_idx, + adjustment + ) + add_new_adjustments( + col_to_split_adjustments, + [adj], + column_name, + start_idx + ) + + return col_to_split_adjustments + + def retrieve_split_adjustment_data_for_sid(self, + dates, + sid, + split_adjusted_asof_idx): + """ + dates : pd.DatetimeIndex + The calendar dates. + sid : int + The sid for which we want to retrieve adjustments. + split_adjusted_asof_idx : int + The index in `dates` as-of which the data is split adjusted. + + Returns + ------- + pre_adjustments : tuple(list(float), list(int)) + The adjustment values and indexes in `dates` for + adjustments that happened on or before the split-asof-date. + post_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values, indexes in `dates`, and timestamps for + adjustments that happened after the split-asof-date. + """ + adjustments = self._split_adjustments.get_adjustments_for_sid( + 'splits', sid + ) + adjustments = sorted(adjustments, key=lambda adj: adj[0]) + # Get rid of any adjustments that happen outside of our date index. + adjustments = list(filter(lambda x: dates[0] <= x[0] <= dates[-1], + adjustments)) + adjustment_values = np.array([adj[1] for adj in adjustments]) + timestamps = pd.DatetimeIndex([adj[0] for adj in adjustments]) + # We need the first date on which we would have known about each + # adjustment.
+ date_indexes = dates.searchsorted(timestamps) + pre_adjustment_idxs = np.where( + date_indexes <= split_adjusted_asof_idx + )[0] + last_adjustment_split_asof_idx = -1 + if len(pre_adjustment_idxs): + last_adjustment_split_asof_idx = pre_adjustment_idxs.max() + pre_adjustments = ( + adjustment_values[:last_adjustment_split_asof_idx + 1], + date_indexes[:last_adjustment_split_asof_idx + 1] + ) + post_adjustments = ( + adjustment_values[last_adjustment_split_asof_idx + 1:], + date_indexes[last_adjustment_split_asof_idx + 1:], + timestamps[last_adjustment_split_asof_idx + 1:] + ) + return pre_adjustments, post_adjustments + + def _collect_adjustments(self, + requested_qtr_data, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns): + + pre_adjustments_dict = self.collect_pre_split_asof_date_adjustments( + split_adjusted_asof_idx, + sid_idx, + pre_adjustments, + requested_split_adjusted_columns + ) + + post_adjustments_dict = self.collect_post_asof_split_adjustments( + post_adjustments, + requested_qtr_data, + sid, + sid_idx, + sid_estimates, + requested_split_adjusted_columns + ) + return pre_adjustments_dict, post_adjustments_dict + + def merge_split_adjustments_with_overwrites( + self, + pre, + post, + overwrites, + requested_split_adjusted_columns + ): + """ + Merge split adjustments with the dict containing overwrites. + + Parameters + ---------- + pre : dict[str -> dict[int -> list]] + The adjustments that occur before the split-adjusted-asof-date. + post : dict[str -> dict[int -> list]] + The adjustments that occur after the split-adjusted-asof-date. + overwrites : dict[str -> dict[int -> list]] + The overwrites across all time. Adjustments will be merged into + this dictionary. + requested_split_adjusted_columns : list of str + List of names of split adjusted columns that are being requested. 
+ """ + for column_name in requested_split_adjusted_columns: + # We can do a merge here because the timestamps in 'pre' and + # 'post' are guaranteed to not overlap. + if pre: + # Either empty or contains all columns. + for ts in pre[column_name]: + add_new_adjustments( + overwrites, + pre[column_name][ts], + column_name, + ts + ) + if post: + # Either empty or contains all columns. + for ts in post[column_name]: + add_new_adjustments( + overwrites, + post[column_name][ts], + column_name, + ts + ) + + +class PreviousSplitAdjustedEarningsEstimatesLoader( + SplitAdjustedEstimatesLoader, PreviousEarningsEstimatesLoader +): + def collect_split_adjustments(self, + adjustments_for_sid, + requested_qtr_data, + dates, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns): + """ + Collect split adjustments for previous quarters and apply them to the + given dictionary of splits for the given sid. Since overwrites just + replace all estimates before the new quarter with NaN, we don't need to + worry about re-applying split adjustments. + + Parameters + ---------- + adjustments_for_sid : dict[str -> dict[int -> list]] + The dictionary of adjustments to which splits need to be added. + Initially it contains only overwrites. + requested_qtr_data : pd.DataFrame + The requested quarter data for each calendar date per sid. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + sid : int + The sid for which adjustments need to be collected. + sid_idx : int + The index of `sid` in the adjusted array. + sid_estimates : pd.DataFrame + The raw estimates data for the given sid. + split_adjusted_asof_idx : int + The index in `dates` as-of which the data is split adjusted. + pre_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values and indexes in `dates` for + adjustments that happened before the split-asof-date. 
class PreviousSplitAdjustedEarningsEstimatesLoader(
    SplitAdjustedEstimatesLoader, PreviousEarningsEstimatesLoader
):
    """
    Previous-quarters estimates loader whose split adjustments are merged
    straight into the overwrites for each sid.
    """
    def collect_split_adjustments(self,
                                  adjustments_for_sid,
                                  requested_qtr_data,
                                  dates,
                                  sid,
                                  sid_idx,
                                  sid_estimates,
                                  split_adjusted_asof_idx,
                                  pre_adjustments,
                                  post_adjustments,
                                  requested_split_adjusted_columns):
        """
        Collect the split adjustments for previous quarters and merge
        them into the given dictionary of adjustments for the given sid.

        Overwrites for previous quarters just replace all estimates
        before the new quarter with NaN, so no split adjustment has to be
        re-applied on top of them; collecting and merging is enough.

        Parameters
        ----------
        adjustments_for_sid : dict[str -> dict[int -> list]]
            The dictionary of adjustments to which splits need to be
            added. Initially it contains only overwrites. Mutated in
            place.
        requested_qtr_data : pd.DataFrame
            The requested quarter data for each calendar date per sid.
        dates : pd.DatetimeIndex
            The calendar dates for which estimates data is requested.
            Not read by this implementation.
        sid : int
            The sid for which adjustments need to be collected.
        sid_idx : int
            The index of `sid` in the adjusted array.
        sid_estimates : pd.DataFrame
            The raw estimates data for the given sid.
        split_adjusted_asof_idx : int
            The index in `dates` as-of which the data is split adjusted.
        pre_adjustments : tuple(list(float), list(int))
            The adjustment values and indexes in `dates` for
            adjustments that happened before the split-asof-date.
        post_adjustments : tuple(list(float), list(int), pd.DatetimeIndex)
            The adjustment values, indexes in `dates`, and timestamps for
            adjustments that happened after the split-asof-date.
        requested_split_adjusted_columns : list of str
            List of requested split adjusted column names.
        """
        pre_by_column, post_by_column = self._collect_adjustments(
            requested_qtr_data=requested_qtr_data,
            sid=sid,
            sid_idx=sid_idx,
            sid_estimates=sid_estimates,
            split_adjusted_asof_idx=split_adjusted_asof_idx,
            pre_adjustments=pre_adjustments,
            post_adjustments=post_adjustments,
            requested_split_adjusted_columns=(
                requested_split_adjusted_columns
            ),
        )
        self.merge_split_adjustments_with_overwrites(
            pre=pre_by_column,
            post=post_by_column,
            overwrites=adjustments_for_sid,
            requested_split_adjusted_columns=(
                requested_split_adjusted_columns
            ),
        )
class NextSplitAdjustedEarningsEstimatesLoader(
    SplitAdjustedEstimatesLoader, NextEarningsEstimatesLoader
):
    """
    Next-quarters estimates loader that re-applies split adjustments on
    top of overwrites before merging the splits themselves.
    """
    def collect_split_adjustments(self,
                                  adjustments_for_sid,
                                  requested_qtr_data,
                                  dates,
                                  sid,
                                  sid_idx,
                                  sid_estimates,
                                  split_adjusted_asof_idx,
                                  pre_adjustments,
                                  post_adjustments,
                                  requested_split_adjusted_columns):
        """
        Collect split adjustments for future quarters, re-apply the ones
        that overwrites would otherwise wipe out, and merge everything
        into the given dictionary of adjustments for the given sid.

        Parameters
        ----------
        adjustments_for_sid : dict[str -> dict[int -> list]]
            The dictionary of adjustments to which splits need to be
            added. Initially it contains only overwrites. Mutated in
            place.
        requested_qtr_data : pd.DataFrame
            The requested quarter data for each calendar date per sid.
        dates : pd.DatetimeIndex
            The calendar dates for which estimates data is requested.
        sid : int
            The sid for which adjustments need to be collected.
        sid_idx : int
            The index of `sid` in the adjusted array.
        sid_estimates : pd.DataFrame
            The raw estimates data for the given sid.
        split_adjusted_asof_idx : int
            The index in `dates` as-of which the data is split adjusted.
        pre_adjustments : tuple(list(float), list(int))
            The adjustment values and indexes in `dates` for
            adjustments that happened before the split-asof-date.
        post_adjustments : tuple(list(float), list(int), pd.DatetimeIndex)
            The adjustment values, indexes in `dates`, and timestamps for
            adjustments that happened after the split-asof-date.
        requested_split_adjusted_columns : list of str
            List of requested split adjusted column names.
        """
        pre_by_column, post_by_column = self._collect_adjustments(
            requested_qtr_data,
            sid,
            sid_idx,
            sid_estimates,
            split_adjusted_asof_idx,
            pre_adjustments,
            post_adjustments,
            requested_split_adjusted_columns,
        )
        for column_name in requested_split_adjusted_columns:
            overwrites_by_ts = adjustments_for_sid[column_name]
            # Only the lists stored under each overwrite index are grown;
            # no key is added or removed while iterating.
            for overwrite_ts, reapplied in overwrites_by_ts.items():
                # Up to the split-adjusted-asof-date, every earlier split
                # has to be cumulatively re-applied. There may be no
                # pre-adjustments at all, hence the second condition.
                if overwrite_ts <= split_adjusted_asof_idx \
                        and pre_by_column:
                    splits_by_ts = pre_by_column[column_name]
                    for split_ts, split_adjustments in splits_by_ts.items():
                        # The split has to have occurred during the span
                        # of the overwrite.
                        if split_ts < overwrite_ts:
                            # Fresh adjustments are built so that they
                            # cover ONLY the dates being overwritten.
                            reapplied.extend([
                                Float64Multiply(
                                    0,
                                    overwrite_ts - 1,
                                    sid_idx,
                                    sid_idx,
                                    adjustment.value
                                )
                                for adjustment in split_adjustments
                            ])
                # Past the split-adjusted-asof-date, re-apply every split
                # that falls after that date and inside the overwrite.
                # Each one runs from the first date up to an end date:
                # the date of the newest information about
                # `requested_quarter` that is >= the split, or, when no
                # new knowledge arrives before `overwrite_ts`, the date
                # just before `overwrite_ts`.
                # This branch is also taken for an overwrite at or before
                # the asof index when there are no pre-adjustments; the
                # range check below can never pass then, so nothing is
                # appended.
                else:
                    # Overwrites happen at the first index of a new
                    # quarter, so look up which quarter that is.
                    qtrs_for_sid = requested_qtr_data[
                        SHIFTED_NORMALIZED_QTRS, sid
                    ]
                    requested_quarter = qtrs_for_sid.iloc[overwrite_ts]

                    for adjustment_value, date_index, timestamp in zip(
                        *post_adjustments
                    ):
                        in_overwrite_span = (
                            split_adjusted_asof_idx
                            < date_index
                            < overwrite_ts
                        )
                        if not in_overwrite_span:
                            continue
                        # Assume the entire overwrite contains stale
                        # data, so the latest possible end is the index
                        # just before the overwrite.
                        end_idx = self.determine_end_idx_for_adjustment(
                            timestamp,
                            dates,
                            overwrite_ts - 1,
                            requested_quarter,
                            sid_estimates
                        )
                        reapplied.append(
                            Float64Multiply(
                                0,
                                end_idx,
                                sid_idx,
                                sid_idx,
                                adjustment_value
                            )
                        )

        self.merge_split_adjustments_with_overwrites(
            pre_by_column,
            post_by_column,
            adjustments_for_sid,
            requested_split_adjusted_columns
        )