diff --git a/tests/pipeline/test_quarters_estimates.py b/tests/pipeline/test_quarters_estimates.py index 109e7904..52cd08e1 100644 --- a/tests/pipeline/test_quarters_estimates.py +++ b/tests/pipeline/test_quarters_estimates.py @@ -1,9 +1,14 @@ +from __future__ import division + +from datetime import timedelta +from functools import partial + import blaze as bz import itertools from nose.tools import assert_true from nose_parameterized import parameterized import numpy as np -from numpy.testing import assert_array_equal +from numpy.testing import assert_array_equal, assert_almost_equal import pandas as pd from toolz import merge @@ -19,21 +24,25 @@ from zipline.pipeline.data import DataSet from zipline.pipeline.data import Column from zipline.pipeline.loaders.blaze.estimates import ( BlazeNextEstimatesLoader, - BlazePreviousEstimatesLoader -) + BlazePreviousEstimatesLoader, + BlazeNextSplitAdjustedEstimatesLoader, + BlazePreviousSplitAdjustedEstimatesLoader) from zipline.pipeline.loaders.earnings_estimates import ( INVALID_NUM_QTRS_MESSAGE, NextEarningsEstimatesLoader, + NextSplitAdjustedEarningsEstimatesLoader, normalize_quarters, PreviousEarningsEstimatesLoader, + PreviousSplitAdjustedEarningsEstimatesLoader, split_normalized_quarters, ) from zipline.testing.fixtures import ( - WithAssetFinder, + WithAdjustmentReader, WithTradingSessions, ZiplineTestCase, ) from zipline.testing.predicates import assert_equal, assert_raises_regex +from zipline.testing.predicates import assert_frame_equal from zipline.utils.numpy_utils import datetime64ns_dtype from zipline.utils.numpy_utils import float64_dtype @@ -45,6 +54,14 @@ class Estimates(DataSet): estimate = Column(dtype=float64_dtype) +class MultipleColumnsEstimates(DataSet): + event_date = Column(dtype=datetime64ns_dtype) + fiscal_quarter = Column(dtype=float64_dtype) + fiscal_year = Column(dtype=float64_dtype) + estimate1 = Column(dtype=float64_dtype) + estimate2 = Column(dtype=float64_dtype) + + def 
QuartersEstimates(announcements_out): class QtrEstimates(Estimates): num_announcements = announcements_out @@ -52,13 +69,48 @@ def QuartersEstimates(announcements_out): return QtrEstimates +def MultipleColumnsQuartersEstimates(announcements_out): + class QtrEstimates(MultipleColumnsEstimates): + num_announcements = announcements_out + name = Estimates + return QtrEstimates + + def QuartersEstimatesNoNumQuartersAttr(num_qtr): class QtrEstimates(Estimates): name = Estimates return QtrEstimates -class WithEstimates(WithTradingSessions, WithAssetFinder): +def create_expected_df_for_factor_compute(start_date, + sids, + tuples, + end_date): + """ + Given a list of tuples of new data we get for each sid on each critical + date (when information changes), create a DataFrame that fills that + data through a date range ending at `end_date`. + """ + df = pd.DataFrame(tuples, + columns=[SID_FIELD_NAME, + 'estimate', + 'knowledge_date']) + df = df.pivot_table(columns=SID_FIELD_NAME, + values='estimate', + index='knowledge_date') + df = df.reindex( + pd.date_range(start_date, end_date) + ) + # Index name is lost during reindex. + df.index = df.index.rename('knowledge_date') + df['at_date'] = end_date.tz_localize('utc') + df = df.set_index(['at_date', df.index.tz_localize('utc')]).ffill() + new_sids = set(sids) - set(df.columns) + df = df.reindex(columns=df.columns.union(new_sids)) + return df + + +class WithEstimates(WithTradingSessions, WithAdjustmentReader): """ ZiplineTestCase mixin providing cls.loader and cls.events as class level fixtures. @@ -74,6 +126,9 @@ class WithEstimates(WithTradingSessions, WithAssetFinder): columns : dict[str -> str] The dictionary mapping the names of BoundColumns to the associated column name in the events DataFrame. + make_columns() -> dict[BoundColumn -> str] + Method which returns a dictionary of BoundColumns mapped to the + associated column names in the raw data. """ # Short window defined in order for test to run faster. 
@@ -93,22 +148,40 @@ class WithEstimates(WithTradingSessions, WithAssetFinder): return cls.events[SID_FIELD_NAME].unique() @classmethod - def init_class_fixtures(cls): - cls.events = cls.make_events() - cls.ASSET_FINDER_EQUITY_SIDS = cls.get_sids() - cls.columns = { + def make_columns(cls): + return { Estimates.event_date: 'event_date', Estimates.fiscal_quarter: 'fiscal_quarter', Estimates.fiscal_year: 'fiscal_year', Estimates.estimate: 'estimate' } - cls.loader = cls.make_loader(cls.events, {column.name: val for - column, val in - cls.columns.items()}) + + @classmethod + def init_class_fixtures(cls): + cls.events = cls.make_events() + cls.ASSET_FINDER_EQUITY_SIDS = cls.get_sids() cls.ASSET_FINDER_EQUITY_SYMBOLS = [ 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS ] + # We need to instantiate certain constants needed by supers of + # `WithEstimates` before we call their `init_class_fixtures`. super(WithEstimates, cls).init_class_fixtures() + cls.columns = cls.make_columns() + # Some tests require `WithAdjustmentReader` to be set up by the time we + # make the loader. 
+ cls.loader = cls.make_loader(cls.events, {column.name: val for + column, val in + cls.columns.items()}) + + +dummy_df = pd.DataFrame({SID_FIELD_NAME: 0}, + columns=[SID_FIELD_NAME, + TS_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + 'estimate'], + index=[0]) class WithWrongLoaderDefinition(WithEstimates): @@ -133,14 +206,7 @@ class WithWrongLoaderDefinition(WithEstimates): @classmethod def make_events(cls): - return pd.DataFrame({SID_FIELD_NAME: 0}, - columns=[SID_FIELD_NAME, - TS_FIELD_NAME, - EVENT_DATE_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME, - 'estimate'], - index=[0]) + return dummy_df def test_wrong_num_announcements_passed(self): bad_dataset1 = QuartersEstimates(-1) @@ -205,6 +271,47 @@ class NextWithWrongNumQuarters(WithWrongLoaderDefinition, return NextEarningsEstimatesLoader(events, columns) +options = ["split_adjustments_loader", + "split_adjusted_column_names", + "split_adjusted_asof"] + + +class WrongSplitsLoaderDefinition(WithEstimates, ZiplineTestCase): + """ + Test class that tests that loaders break correctly when incorrectly + instantiated. + + Tests + ----- + test_extra_splits_columns_passed(SplitAdjustedEstimatesLoader) + A test that checks that the loader correctly breaks when an + unexpected column is passed in the list of split-adjusted columns. 
+ """ + @classmethod + def init_class_fixtures(cls): + super(WithEstimates, cls).init_class_fixtures() + + @parameterized.expand(itertools.product( + (NextSplitAdjustedEarningsEstimatesLoader, + PreviousSplitAdjustedEarningsEstimatesLoader), + )) + def test_extra_splits_columns_passed(self, loader): + columns = { + Estimates.event_date: 'event_date', + Estimates.fiscal_quarter: 'fiscal_quarter', + Estimates.fiscal_year: 'fiscal_year', + Estimates.estimate: 'estimate' + } + + with self.assertRaises(ValueError): + loader(dummy_df, + {column.name: val for column, val in + columns.items()}, + split_adjustments_loader=self.adjustment_reader, + split_adjusted_column_names=["estimate", "extra_col"], + split_adjusted_asof=pd.Timestamp("2015-01-01")) + + class WithEstimatesTimeZero(WithEstimates): """ ZiplineTestCase mixin providing cls.events as a class level fixture and @@ -618,6 +725,15 @@ class NextEstimateMultipleQuarters( return expected +class BlazeNextEstimateMultipleQuarters(NextEstimateMultipleQuarters): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader( + bz.data(events), + columns, + ) + + class PreviousEstimateMultipleQuarters( WithEstimateMultipleQuarters, ZiplineTestCase @@ -658,6 +774,15 @@ class PreviousEstimateMultipleQuarters( return expected +class BlazePreviousEstimateMultipleQuarters(PreviousEstimateMultipleQuarters): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader( + bz.data(events), + columns, + ) + + class WithVaryingNumEstimates(WithEstimates): """ ZiplineTestCase mixin providing fixtures and a test to ensure that we @@ -749,6 +874,15 @@ class PreviousVaryingNumEstimates( return PreviousEarningsEstimatesLoader(events, columns) +class BlazePreviousVaryingNumEstimates(PreviousVaryingNumEstimates): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader( + bz.data(events), + columns, + ) + + class NextVaryingNumEstimates( 
WithVaryingNumEstimates, ZiplineTestCase @@ -771,6 +905,15 @@ class NextVaryingNumEstimates( return NextEarningsEstimatesLoader(events, columns) +class BlazeNextVaryingNumEstimates(NextVaryingNumEstimates): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader( + bz.data(events), + columns, + ) + + class WithEstimateWindows(WithEstimates): """ ZiplineTestCase mixin providing fixures and a test to test running a @@ -797,60 +940,69 @@ class WithEstimateWindows(WithEstimates): Tests that we overwrite values with the correct quarter's estimate at the correct dates when we have a factor that asks for a window of data. """ - + END_DATE = pd.Timestamp('2015-02-10') window_test_start_date = pd.Timestamp('2015-01-05') critical_dates = [pd.Timestamp('2015-01-09', tz='utc'), - pd.Timestamp('2015-01-12', tz='utc'), pd.Timestamp('2015-01-15', tz='utc'), - pd.Timestamp('2015-01-20', tz='utc')] - # window length, starting date, num quarters out, timeline. Parameterizes - # over number of quarters out. + pd.Timestamp('2015-01-20', tz='utc'), + pd.Timestamp('2015-01-26', tz='utc'), + pd.Timestamp('2015-02-05', tz='utc'), + pd.Timestamp('2015-02-10', tz='utc')] + # Starting date, number of announcements out. window_test_cases = list(itertools.product(critical_dates, (1, 2))) @classmethod def make_events(cls): + # Typical case: 2 consecutive quarters. sid_0_timeline = pd.DataFrame({ - TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), - pd.Timestamp('2015-01-07'), - pd.Timestamp('2015-01-05'), - pd.Timestamp('2015-01-17')], + TS_FIELD_NAME: [cls.window_test_start_date, + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-02-10'), + # We want a case where we get info for a later + # quarter before the current quarter is over but + # after the split_asof_date to make sure that + # we choose the correct date to overwrite until. 
+ pd.Timestamp('2015-01-18')], EVENT_DATE_FIELD_NAME: - [pd.Timestamp('2015-01-10'), - pd.Timestamp('2015-01-10'), + [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-01-20')], - 'estimate': [100., 101.] + [200., 201.], - FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2, + pd.Timestamp('2015-02-10'), + pd.Timestamp('2015-02-10'), + pd.Timestamp('2015-04-01')], + 'estimate': [100., 101.] + [200., 201.] + [400], + FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2 + [4], FISCAL_YEAR_FIELD_NAME: 2015, SID_FIELD_NAME: 0, }) + # We want a case where we skip a quarter. We never find out about Q2. sid_10_timeline = pd.DataFrame({ TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), pd.Timestamp('2015-01-12'), pd.Timestamp('2015-01-09'), pd.Timestamp('2015-01-15')], EVENT_DATE_FIELD_NAME: - [pd.Timestamp('2015-01-12'), pd.Timestamp('2015-01-12'), - pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-15')], + [pd.Timestamp('2015-01-22'), pd.Timestamp('2015-01-22'), + pd.Timestamp('2015-02-05'), pd.Timestamp('2015-02-05')], 'estimate': [110., 111.] + [310., 311.], FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [3] * 2, FISCAL_YEAR_FIELD_NAME: 2015, SID_FIELD_NAME: 10 }) - # Extra sid to make sure we have correct overwrites when sid quarter - # boundaries collide. + # We want to make sure we have correct overwrites when sid quarter + # boundaries collide. This sid's quarter boundaries collide with sid 0. sid_20_timeline = pd.DataFrame({ - TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), + TS_FIELD_NAME: [cls.window_test_start_date, pd.Timestamp('2015-01-07'), - pd.Timestamp('2015-01-05'), + cls.window_test_start_date, pd.Timestamp('2015-01-17')], EVENT_DATE_FIELD_NAME: - [pd.Timestamp('2015-01-10'), - pd.Timestamp('2015-01-10'), + [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-01-20')], + pd.Timestamp('2015-02-10'), + pd.Timestamp('2015-02-10')], 'estimate': [120., 121.] 
+ [220., 221.], FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2, FISCAL_YEAR_FIELD_NAME: 2015, @@ -877,36 +1029,16 @@ class WithEstimateWindows(WithEstimates): @classmethod def init_class_fixtures(cls): super(WithEstimateWindows, cls).init_class_fixtures() - cls.timelines = cls.make_expected_timelines() - - @classmethod - def create_expected_df(cls, tuples, end_date): - """ - Given a list of tuples of new data we get for each sid on each critical - date (when information changes), create a DataFrame that fills that - data through a date range ending at `end_date`. - """ - df = pd.DataFrame(tuples, - columns=[SID_FIELD_NAME, - 'estimate', - 'knowledge_date']) - df = df.pivot_table(columns=SID_FIELD_NAME, - values='estimate', - index='knowledge_date') - df = df.reindex( - pd.date_range(cls.window_test_start_date, end_date) + cls.create_expected_df_for_factor_compute = partial( + create_expected_df_for_factor_compute, + cls.window_test_start_date, + cls.get_sids() ) - # Index name is lost during reindex. - df.index = df.index.rename('knowledge_date') - df['at_date'] = end_date.tz_localize('utc') - df = df.set_index(['at_date', df.index.tz_localize('utc')]).ffill() - new_sids = set(cls.get_sids()) - set(df.columns) - df = df.reindex(columns=df.columns.union(new_sids)) - return df + cls.timelines = cls.make_expected_timelines() @parameterized.expand(window_test_cases) def test_estimate_windows_at_quarter_boundaries(self, - start_idx, + start_date, num_announcements_out): dataset = QuartersEstimates(num_announcements_out) trading_days = self.trading_days @@ -916,7 +1048,7 @@ class WithEstimateWindows(WithEstimates): # progress through the timeline, all data we got, starting from that # first date, is correctly overwritten. 
window_len = ( - self.trading_days.get_loc(start_idx) - + self.trading_days.get_loc(start_date) - self.trading_days.get_loc(self.window_test_start_date) + 1 ) @@ -932,8 +1064,9 @@ class WithEstimateWindows(WithEstimates): trading_days[:today_idx + 1] ).values timeline_start_idx = (len(today_timeline) - window_len) - assert_equal(estimate, - today_timeline[timeline_start_idx:]) + assert_almost_equal(estimate, + today_timeline[timeline_start_idx:]) + engine = SimplePipelineEngine( lambda x: self.loader, self.trading_days, @@ -941,9 +1074,9 @@ class WithEstimateWindows(WithEstimates): ) engine.run_pipeline( Pipeline({'est': SomeFactor()}), - start_date=start_idx, + start_date=start_date, # last event date we have - end_date=pd.Timestamp('2015-01-20', tz='utc'), + end_date=pd.Timestamp('2015-02-10', tz='utc'), ) @@ -955,62 +1088,65 @@ class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_expected_timelines(cls): oneq_previous = pd.concat([ - cls.create_expected_df( - [(0, np.NaN, cls.window_test_start_date), - (10, np.NaN, cls.window_test_start_date), - (20, np.NaN, cls.window_test_start_date)], - pd.Timestamp('2015-01-09') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 111, pd.Timestamp('2015-01-12')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-12') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 111, pd.Timestamp('2015-01-12')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-13') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 111, pd.Timestamp('2015-01-12')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-14') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 311, pd.Timestamp('2015-01-15')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-15') - ), - cls.create_expected_df( - [(0, 101, pd.Timestamp('2015-01-10')), - (10, 311, 
pd.Timestamp('2015-01-15')), - (20, 121, pd.Timestamp('2015-01-10'))], - pd.Timestamp('2015-01-16') - ), - cls.create_expected_df( - [(0, 201, pd.Timestamp('2015-01-17')), - (10, 311, pd.Timestamp('2015-01-15')), - (20, 221, pd.Timestamp('2015-01-17'))], - pd.Timestamp('2015-01-20') - ), - ]) - - twoq_previous = pd.concat( - [cls.create_expected_df( - [(0, np.NaN, cls.window_test_start_date), - (10, np.NaN, cls.window_test_start_date), - (20, np.NaN, cls.window_test_start_date)], - end_date - ) for end_date in pd.date_range('2015-01-09', '2015-01-19')] + - [cls.create_expected_df( + pd.concat([ + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date) + ], end_date) + for end_date in pd.date_range('2015-01-09', '2015-01-19') + ]), + cls.create_expected_df_for_factor_compute( [(0, 101, pd.Timestamp('2015-01-20')), (10, np.NaN, cls.window_test_start_date), (20, 121, pd.Timestamp('2015-01-20'))], pd.Timestamp('2015-01-20') + ), + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, np.NaN, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-20'))], + pd.Timestamp('2015-01-21') + ), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, 111, pd.Timestamp('2015-01-22')), + (20, 121, pd.Timestamp('2015-01-20'))], + end_date + ) for end_date in pd.date_range('2015-01-22', '2015-02-04') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, 311, pd.Timestamp('2015-02-05')), + (20, 121, pd.Timestamp('2015-01-20'))], + end_date + ) for end_date in pd.date_range('2015-02-05', '2015-02-09') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 201, pd.Timestamp('2015-02-10')), + (10, 311, pd.Timestamp('2015-02-05')), + (20, 221, pd.Timestamp('2015-02-10'))], + pd.Timestamp('2015-02-10') + ), + ]) + + 
twoq_previous = pd.concat( + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-09', '2015-02-09')] + + # We never get estimates for S1 for 2Q ago because once Q3 + # becomes our previous quarter, 2Q ago would be Q2, and we have + # no data on it. + [cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-02-10')), + (10, np.NaN, pd.Timestamp('2015-02-05')), + (20, 121, pd.Timestamp('2015-02-10'))], + pd.Timestamp('2015-02-10') )] ) return { @@ -1019,6 +1155,12 @@ class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase): } +class BlazePreviousEstimateWindows(PreviousEstimateWindows): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader(bz.data(events), columns) + + class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_loader(cls, events, columns): @@ -1027,69 +1169,97 @@ class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_expected_timelines(cls): oneq_next = pd.concat([ - cls.create_expected_df( + cls.create_expected_df_for_factor_compute( [(0, 100, cls.window_test_start_date), - (0, 101, pd.Timestamp('2015-01-07')), (10, 110, pd.Timestamp('2015-01-09')), (20, 120, cls.window_test_start_date), (20, 121, pd.Timestamp('2015-01-07'))], pd.Timestamp('2015-01-09') ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 100, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 120, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-07'))], + end_date + ) for end_date in pd.date_range('2015-01-12', '2015-01-19') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 100, cls.window_test_start_date), + 
(0, 101, pd.Timestamp('2015-01-20')), (10, 110, pd.Timestamp('2015-01-09')), (10, 111, pd.Timestamp('2015-01-12')), - (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-12') + (20, 120, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-07'))], + pd.Timestamp('2015-01-20') ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), - (10, 310, pd.Timestamp('2015-01-09')), - (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-13') - ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), - (10, 310, pd.Timestamp('2015-01-09')), - (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-14') - ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), - (10, 310, pd.Timestamp('2015-01-09')), - (10, 311, pd.Timestamp('2015-01-15')), - (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-15') - ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 220, cls.window_test_start_date), + (20, 221, pd.Timestamp('2015-01-17'))], + end_date + ) for end_date in pd.date_range('2015-01-21', '2015-01-22') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, 310, pd.Timestamp('2015-01-09')), + (10, 311, pd.Timestamp('2015-01-15')), + (20, 220, cls.window_test_start_date), + (20, 221, pd.Timestamp('2015-01-17'))], + end_date + ) for end_date in pd.date_range('2015-01-23', '2015-02-05') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220, cls.window_test_start_date), + (20, 221, pd.Timestamp('2015-01-17'))], + end_date + ) for end_date in pd.date_range('2015-02-06', '2015-02-09') + ]), + 
cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (0, 201, pd.Timestamp('2015-02-10')), + (10, np.NaN, cls.window_test_start_date), + (20, 220, cls.window_test_start_date), + (20, 221, pd.Timestamp('2015-01-17'))], + pd.Timestamp('2015-02-10') + ) + ]) + + twoq_next = pd.concat( + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), (10, np.NaN, cls.window_test_start_date), (20, 220, cls.window_test_start_date)], - pd.Timestamp('2015-01-16') - ), - cls.create_expected_df( - [(0, 200, cls.window_test_start_date), - (0, 201, pd.Timestamp('2015-01-17')), + end_date + ) for end_date in pd.date_range('2015-01-09', '2015-01-11')] + + [cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-12', '2015-01-16')] + + [cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), (10, np.NaN, cls.window_test_start_date), (20, 220, cls.window_test_start_date), (20, 221, pd.Timestamp('2015-01-17'))], pd.Timestamp('2015-01-20') - ), - ]) - - twoq_next = pd.concat( - [cls.create_expected_df( - [(0, 200, pd.Timestamp(cls.window_test_start_date)), - (10, np.NaN, pd.Timestamp(cls.window_test_start_date)), - (20, 220, pd.Timestamp(cls.window_test_start_date))], - pd.Timestamp('2015-01-09') )] + - [cls.create_expected_df( - [(0, np.NaN, pd.Timestamp(cls.window_test_start_date)), - (10, np.NaN, pd.Timestamp(cls.window_test_start_date)), - (20, np.NaN, pd.Timestamp(cls.window_test_start_date))], + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date)], end_date - ) for end_date in pd.date_range('2015-01-12', '2015-01-20')] + ) for end_date in pd.date_range('2015-01-21', '2015-02-10')] ) return { @@ -1098,6 
+1268,1340 @@ class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): } +class BlazeNextEstimateWindows(NextEstimateWindows): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader(bz.data(events), columns) + + +class WithSplitAdjustedWindows(WithEstimateWindows): + """ + ZiplineTestCase mixin providing fixures and a test to test running a + Pipeline with an estimates loader over differently-sized windows and with + split adjustments. + """ + + split_adjusted_asof_date = pd.Timestamp('2015-01-14') + + @classmethod + def make_events(cls): + # Add an extra sid that has a release before the split-asof-date in + # order to test that we're reversing splits correctly in the previous + # case (without an overwrite) and in the next case (with an overwrite). + sid_30 = pd.DataFrame({ + TS_FIELD_NAME: [cls.window_test_start_date, + pd.Timestamp('2015-01-09'), + # For Q2, we want it to start early enough + # that we can have several adjustments before + # the end of the first quarter so that we + # can test un-adjusting & readjusting with an + # overwrite. + cls.window_test_start_date, + # We want the Q2 event date to be enough past + # the split-asof-date that we can have + # several splits and can make sure that they + # are applied correctly. + pd.Timestamp('2015-01-20')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-20')], + 'estimate': [130., 131., 230., 231.], + FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 30 + }) + + # An extra sid to test no splits before the split-adjusted-asof-date. + # We want an event before and after the split-adjusted-asof-date & + # timestamps for data points also before and after + # split-adjsuted-asof-date (but also before the split dates, so that + # we can test that splits actually get applied at the correct times). 
+ sid_40 = pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-15')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-02-10')], + 'estimate': [140., 240.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 40 + }) + + # An extra sid to test all splits before the + # split-adjusted-asof-date. All timestamps should be before that date + # so that we have cases where we un-apply and re-apply splits. + sid_50 = pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-12')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-02-10')], + 'estimate': [150., 250.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 50 + }) + + return pd.concat([ + # Slightly hacky, but want to make sure we're using the same + # events as WithEstimateWindows. + cls.__base__.make_events(), + sid_30, + sid_40, + sid_50, + ]) + + @classmethod + def make_splits_data(cls): + # For sid 0, we want to apply a series of splits before and after the + # split-adjusted-asof-date we well as between quarters (for the + # previous case, where we won't see any values until after the event + # happens). 
+ sid_0_splits = pd.DataFrame({ + SID_FIELD_NAME: 0, + 'ratio': (-1., 2., 3., 4., 5., 6., 7., 100), + 'effective_date': (pd.Timestamp('2014-01-01'), # Filter out + # Split before Q1 event & after first estimate + pd.Timestamp('2015-01-07'), + # Split before Q1 event + pd.Timestamp('2015-01-09'), + # Split before Q1 event + pd.Timestamp('2015-01-13'), + # Split before Q1 event + pd.Timestamp('2015-01-15'), + # Split before Q1 event + pd.Timestamp('2015-01-18'), + # Split after Q1 event and before Q2 event + pd.Timestamp('2015-01-30'), + # Filter out - this is after our date index + pd.Timestamp('2016-01-01')) + }) + + sid_10_splits = pd.DataFrame({ + SID_FIELD_NAME: 10, + 'ratio': (.2, .3), + 'effective_date': ( + # We want a split before the first estimate and before the + # split-adjusted-asof-date but within our calendar index so + # that we can test that the split is NEVER applied. + pd.Timestamp('2015-01-07'), + # Apply a single split before Q1 event. + pd.Timestamp('2015-01-20')), + }) + + # We want a sid with split dates that collide with another sid (0) to + # make sure splits are correctly applied for both sids. + sid_20_splits = pd.DataFrame({ + SID_FIELD_NAME: 20, + 'ratio': (.4, .5, .6, .7, .8, .9,), + 'effective_date': ( + pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-18'), + pd.Timestamp('2015-01-30')), + }) + + # This sid has event dates that are shifted back so that we can test + # cases where an event occurs before the split-asof-date. + sid_30_splits = pd.DataFrame({ + SID_FIELD_NAME: 30, + 'ratio': (8, 9, 10, 11, 12), + 'effective_date': ( + # Split before the event and before the + # split-asof-date. + pd.Timestamp('2015-01-07'), + # Split on date of event but before the + # split-asof-date. + pd.Timestamp('2015-01-09'), + # Split after the event, but before the + # split-asof-date. 
+ pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-18')), + }) + + # No splits for a sid before the split-adjusted-asof-date. + sid_40_splits = pd.DataFrame({ + SID_FIELD_NAME: 40, + 'ratio': (13, 14), + 'effective_date': ( + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-22') + ) + }) + + # No splits for a sid after the split-adjusted-asof-date. + sid_50_splits = pd.DataFrame({ + SID_FIELD_NAME: 50, + 'ratio': (15, 16), + 'effective_date': ( + pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-14') + ) + }) + + return pd.concat([ + sid_0_splits, + sid_10_splits, + sid_20_splits, + sid_30_splits, + sid_40_splits, + sid_50_splits, + ]) + + +class PreviousWithSplitAdjustedWindows(WithSplitAdjustedWindows, + ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return PreviousSplitAdjustedEarningsEstimatesLoader( + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate'], + split_adjusted_asof=cls.split_adjusted_asof_date, + ) + + @classmethod + def make_expected_timelines(cls): + oneq_previous = pd.concat([ + pd.concat([ + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + # Undo all adjustments that haven't happened yet. + (30, 131*1/10, pd.Timestamp('2015-01-09')), + (40, 140., pd.Timestamp('2015-01-09')), + (50, 150 * 1 / 15 * 1 / 16, pd.Timestamp('2015-01-09')), + ], end_date) + for end_date in pd.date_range('2015-01-09', '2015-01-12') + ]), + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, 131, pd.Timestamp('2015-01-09')), + (40, 140., pd.Timestamp('2015-01-09')), + (50, 150. 
* 1 / 16, pd.Timestamp('2015-01-09')), + ], pd.Timestamp('2015-01-13')), + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, 131, pd.Timestamp('2015-01-09')), + (40, 140., pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09')) + ], pd.Timestamp('2015-01-14')), + pd.concat([ + cls.create_expected_df_for_factor_compute([ + (0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, 131*11, pd.Timestamp('2015-01-09')), + (40, 140., pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09')), + ], end_date) + for end_date in pd.date_range('2015-01-15', '2015-01-16') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, np.NaN, cls.window_test_start_date), + (20, 121*.7*.8, pd.Timestamp('2015-01-20')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 140.*13, pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09'))], + end_date + ) for end_date in pd.date_range('2015-01-20', '2015-01-21') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101, pd.Timestamp('2015-01-20')), + (10, 111*.3, pd.Timestamp('2015-01-22')), + (20, 121*.7*.8, pd.Timestamp('2015-01-20')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 140.*13*14, pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09'))], + end_date + ) for end_date in pd.date_range('2015-01-22', '2015-01-29') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101*7, pd.Timestamp('2015-01-20')), + (10, 111*.3, pd.Timestamp('2015-01-22')), + (20, 121*.7*.8*.9, pd.Timestamp('2015-01-20')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 140.*13*14, pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09'))], + end_date + ) for end_date in pd.date_range('2015-01-30', 
'2015-02-04') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 101*7, pd.Timestamp('2015-01-20')), + (10, 311*.3, pd.Timestamp('2015-02-05')), + (20, 121*.7*.8*.9, pd.Timestamp('2015-01-20')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 140.*13*14, pd.Timestamp('2015-01-09')), + (50, 150., pd.Timestamp('2015-01-09'))], + end_date + ) for end_date in pd.date_range('2015-02-05', '2015-02-09') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 201, pd.Timestamp('2015-02-10')), + (10, 311*.3, pd.Timestamp('2015-02-05')), + (20, 221*.8*.9, pd.Timestamp('2015-02-10')), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 240.*13*14, pd.Timestamp('2015-02-10')), + (50, 250., pd.Timestamp('2015-02-10'))], + pd.Timestamp('2015-02-10') + ), + ]) + + twoq_previous = pd.concat( + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-09', '2015-01-19')] + + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, 131*11*12, pd.Timestamp('2015-01-20'))], + end_date + ) for end_date in pd.date_range('2015-01-20', '2015-02-09')] + + # We never get estimates for S1 for 2Q ago because once Q3 + # becomes our previous quarter, 2Q ago would be Q2, and we have + # no data on it. + [cls.create_expected_df_for_factor_compute( + [(0, 101*7, pd.Timestamp('2015-02-10')), + (10, np.NaN, pd.Timestamp('2015-02-05')), + (20, 121*.7*.8*.9, pd.Timestamp('2015-02-10')), + (30, 131*11*12, pd.Timestamp('2015-01-20')), + (40, 140. 
* 13 * 14, pd.Timestamp('2015-02-10')), + (50, 150., pd.Timestamp('2015-02-10'))], + pd.Timestamp('2015-02-10') + )] + ) + return { + 1: oneq_previous, + 2: twoq_previous + } + + +class BlazePreviousWithSplitAdjustedWindows(PreviousWithSplitAdjustedWindows): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousSplitAdjustedEstimatesLoader( + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate'], + split_adjusted_asof=cls.split_adjusted_asof_date, + ) + + +class NextWithSplitAdjustedWindows(WithSplitAdjustedWindows, ZiplineTestCase): + + @classmethod + def make_loader(cls, events, columns): + return NextSplitAdjustedEarningsEstimatesLoader( + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate'], + split_adjusted_asof=cls.split_adjusted_asof_date, + ) + + @classmethod + def make_expected_timelines(cls): + oneq_next = pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 100*1/4, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (20, 120*5/3, cls.window_test_start_date), + (20, 121*5/3, pd.Timestamp('2015-01-07')), + (30, 130*1/10, cls.window_test_start_date), + (30, 131*1/10, pd.Timestamp('2015-01-09')), + (40, 140, pd.Timestamp('2015-01-09')), + (50, 150.*1/15*1/16, pd.Timestamp('2015-01-09'))], + pd.Timestamp('2015-01-09') + ), + cls.create_expected_df_for_factor_compute( + [(0, 100*1/4, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 120*5/3, cls.window_test_start_date), + (20, 121*5/3, pd.Timestamp('2015-01-07')), + (30, 230*1/10, cls.window_test_start_date), + (40, np.NaN, pd.Timestamp('2015-01-10')), + (50, 250.*1/15*1/16, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-12') + ), + cls.create_expected_df_for_factor_compute( + [(0, 100, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + 
(10, 111, pd.Timestamp('2015-01-12')), + (20, 120, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-07')), + (30, 230, cls.window_test_start_date), + (40, np.NaN, pd.Timestamp('2015-01-10')), + (50, 250.*1/16, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-13') + ), + cls.create_expected_df_for_factor_compute( + [(0, 100, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 120, cls.window_test_start_date), + (20, 121, pd.Timestamp('2015-01-07')), + (30, 230, cls.window_test_start_date), + (40, np.NaN, pd.Timestamp('2015-01-10')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-14') + ), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 100*5, cls.window_test_start_date), + (10, 110, pd.Timestamp('2015-01-09')), + (10, 111, pd.Timestamp('2015-01-12')), + (20, 120*.7, cls.window_test_start_date), + (20, 121*.7, pd.Timestamp('2015-01-07')), + (30, 230*11, cls.window_test_start_date), + (40, 240, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + end_date + ) for end_date in pd.date_range('2015-01-15', '2015-01-16') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 100*5*6, cls.window_test_start_date), + (0, 101, pd.Timestamp('2015-01-20')), + (10, 110*.3, pd.Timestamp('2015-01-09')), + (10, 111*.3, pd.Timestamp('2015-01-12')), + (20, 120*.7*.8, cls.window_test_start_date), + (20, 121*.7*.8, pd.Timestamp('2015-01-07')), + (30, 230*11*12, cls.window_test_start_date), + (30, 231, pd.Timestamp('2015-01-20')), + (40, 240*13, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-20') + ), + cls.create_expected_df_for_factor_compute( + [(0, 200 * 5 * 6, pd.Timestamp('2015-01-12')), + (10, 110 * .3, pd.Timestamp('2015-01-09')), + (10, 111 * .3, pd.Timestamp('2015-01-12')), + (20, 220 * .7 * .8, cls.window_test_start_date), + (20, 221 * .8, pd.Timestamp('2015-01-17')), + (40, 240 * 13, 
pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-21') + ), + cls.create_expected_df_for_factor_compute( + [(0, 200 * 5 * 6, pd.Timestamp('2015-01-12')), + (10, 110 * .3, pd.Timestamp('2015-01-09')), + (10, 111 * .3, pd.Timestamp('2015-01-12')), + (20, 220 * .7 * .8, cls.window_test_start_date), + (20, 221 * .8, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-22') + ), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200*5*6, pd.Timestamp('2015-01-12')), + (10, 310*.3, pd.Timestamp('2015-01-09')), + (10, 311*.3, pd.Timestamp('2015-01-15')), + (20, 220*.7*.8, cls.window_test_start_date), + (20, 221*.8, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + end_date + ) for end_date in pd.date_range('2015-01-23', '2015-01-29') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200*5*6*7, pd.Timestamp('2015-01-12')), + (10, 310*.3, pd.Timestamp('2015-01-09')), + (10, 311*.3, pd.Timestamp('2015-01-15')), + (20, 220*.7*.8*.9, cls.window_test_start_date), + (20, 221*.8*.9, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + end_date + ) for end_date in pd.date_range('2015-01-30', '2015-02-05') + ]), + pd.concat([ + cls.create_expected_df_for_factor_compute( + [(0, 200*5*6*7, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220*.7*.8*.9, cls.window_test_start_date), + (20, 221*.8*.9, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + end_date + ) for end_date in pd.date_range('2015-02-06', '2015-02-09') + ]), + cls.create_expected_df_for_factor_compute( + [(0, 200*5*6*7, pd.Timestamp('2015-01-12')), + (0, 201, pd.Timestamp('2015-02-10')), + (10, 
np.NaN, cls.window_test_start_date), + (20, 220*.7*.8*.9, cls.window_test_start_date), + (20, 221*.8*.9, pd.Timestamp('2015-01-17')), + (40, 240 * 13 * 14, pd.Timestamp('2015-01-15')), + (50, 250., pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-02-10') + ) + ]) + + twoq_next = pd.concat( + [cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, 220*5/3, cls.window_test_start_date), + (30, 230*1/10, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date), + (50, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-09') + )] + + [cls.create_expected_df_for_factor_compute( + [(0, 200*1/4, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220*5/3, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-12') + )] + + [cls.create_expected_df_for_factor_compute( + [(0, 200, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-13', '2015-01-14')] + + [cls.create_expected_df_for_factor_compute( + [(0, 200*5, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220*.7, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-15', '2015-01-16')] + + [cls.create_expected_df_for_factor_compute( + [(0, 200*5*6, pd.Timestamp('2015-01-12')), + (10, np.NaN, cls.window_test_start_date), + (20, 220*.7*.8, cls.window_test_start_date), + (20, 221*.8, pd.Timestamp('2015-01-17')), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-20') + )] + + 
[cls.create_expected_df_for_factor_compute( + [(0, np.NaN, cls.window_test_start_date), + (10, np.NaN, cls.window_test_start_date), + (20, np.NaN, cls.window_test_start_date), + (30, np.NaN, cls.window_test_start_date), + (40, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-21', '2015-02-10')] + ) + + return { + 1: oneq_next, + 2: twoq_next + } + + +class BlazeNextWithSplitAdjustedWindows(NextWithSplitAdjustedWindows): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextSplitAdjustedEstimatesLoader( + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate'], + split_adjusted_asof=cls.split_adjusted_asof_date, + ) + + +class WithSplitAdjustedMultipleEstimateColumns(WithEstimates): + """ + ZiplineTestCase mixin for having multiple estimate columns that are + split-adjusted to make sure that adjustments are applied correctly. + + Attributes + ---------- + test_start_date : pd.Timestamp + The start date of the test. + test_end_date : pd.Timestamp + The end date of the test. + split_adjusted_asof : pd.Timestamp + The split-adjusted-asof-date of the data used in the test, to be used + to create all loaders of test classes that subclass this mixin. + + Methods + ------- + make_expected_timelines_1q_out -> dict[pd.Timestamp -> dict[str -> + np.array]] + The expected array of results for each date of the date range for + each column. Only for 1 quarter out. + + make_expected_timelines_2q_out -> dict[pd.Timestamp -> dict[str -> + np.array]] + The expected array of results for each date of the date range. For 2 + quarters out, so only for the column that is requested to be loaded + with 2 quarters out. + + Tests + ----- + test_adjustments_with_multiple_adjusted_columns + Tests that if you have multiple columns, we still split-adjust + correctly. 
+ + test_multiple_datasets_different_num_announcements + Tests that if you have multiple datasets that ask for a different + number of quarters out, and each asks for a different estimates column, + we still split-adjust correctly. + """ + END_DATE = pd.Timestamp('2015-02-10') + test_start_date = pd.Timestamp('2015-01-06', tz='utc') + test_end_date = pd.Timestamp('2015-01-12', tz='utc') + split_adjusted_asof = pd.Timestamp('2015-01-08') + + @classmethod + def make_columns(cls): + return { + MultipleColumnsEstimates.event_date: 'event_date', + MultipleColumnsEstimates.fiscal_quarter: 'fiscal_quarter', + MultipleColumnsEstimates.fiscal_year: 'fiscal_year', + MultipleColumnsEstimates.estimate1: 'estimate1', + MultipleColumnsEstimates.estimate2: 'estimate2' + } + + @classmethod + def make_events(cls): + sid_0_events = pd.DataFrame({ + # We only want a stale KD here so that adjustments + # will be applied. + TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), + pd.Timestamp('2015-01-05')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-12')], + 'estimate1': [1100., 1200.], + 'estimate2': [2100., 2200.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 0, + }) + + # This is just an extra sid to make sure that we apply adjustments + # correctly for multiple columns when we have multiple sids. + sid_1_events = pd.DataFrame({ + # We only want a stale KD here so that adjustments + # will be applied. 
+ TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), + pd.Timestamp('2015-01-05')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-08'), + pd.Timestamp('2015-01-11')], + 'estimate1': [1110., 1210.], + 'estimate2': [2110., 2210.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 1, + }) + return pd.concat([sid_0_events, sid_1_events]) + + @classmethod + def make_splits_data(cls): + sid_0_splits = pd.DataFrame({ + SID_FIELD_NAME: 0, + 'ratio': (.3, 3.), + 'effective_date': (pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-09')), + }) + + sid_1_splits = pd.DataFrame({ + SID_FIELD_NAME: 1, + 'ratio': (.4, 4.), + 'effective_date': (pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-09')), + }) + + return pd.concat([sid_0_splits, sid_1_splits]) + + @classmethod + def make_expected_timelines_1q_out(cls): + return {} + + @classmethod + def make_expected_timelines_2q_out(cls): + return {} + + @classmethod + def init_class_fixtures(cls): + super( + WithSplitAdjustedMultipleEstimateColumns, cls + ).init_class_fixtures() + cls.timelines_1q_out = cls.make_expected_timelines_1q_out() + cls.timelines_2q_out = cls.make_expected_timelines_2q_out() + + def test_adjustments_with_multiple_adjusted_columns(self): + dataset = MultipleColumnsQuartersEstimates(1) + timelines = self.timelines_1q_out + window_len = 3 + + class SomeFactor(CustomFactor): + inputs = [dataset.estimate1, dataset.estimate2] + window_length = window_len + + def compute(self, today, assets, out, estimate1, estimate2): + assert_almost_equal(estimate1, timelines[today]['estimate1']) + assert_almost_equal(estimate2, timelines[today]['estimate2']) + + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + engine.run_pipeline( + Pipeline({'est': SomeFactor()}), + start_date=self.test_start_date, + # last event date we have + end_date=self.test_end_date, + ) + + def test_multiple_datasets_different_num_announcements(self): + dataset1 = 
MultipleColumnsQuartersEstimates(1) + dataset2 = MultipleColumnsQuartersEstimates(2) + timelines_1q_out = self.timelines_1q_out + timelines_2q_out = self.timelines_2q_out + window_len = 3 + + class SomeFactor1(CustomFactor): + inputs = [dataset1.estimate1] + window_length = window_len + + def compute(self, today, assets, out, estimate1): + assert_almost_equal( + estimate1, timelines_1q_out[today]['estimate1'] + ) + + class SomeFactor2(CustomFactor): + inputs = [dataset2.estimate2] + window_length = window_len + + def compute(self, today, assets, out, estimate2): + assert_almost_equal( + estimate2, timelines_2q_out[today]['estimate2'] + ) + + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + engine.run_pipeline( + Pipeline({'est1': SomeFactor1(), 'est2': SomeFactor2()}), + start_date=self.test_start_date, + # last event date we have + end_date=self.test_end_date, + ) + + +class PreviousWithSplitAdjustedMultipleEstimateColumns( + WithSplitAdjustedMultipleEstimateColumns, ZiplineTestCase +): + @classmethod + def make_loader(cls, events, columns): + return PreviousSplitAdjustedEarningsEstimatesLoader( + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate1', 'estimate2'], + split_adjusted_asof=cls.split_adjusted_asof, + ) + + @classmethod + def make_expected_timelines_1q_out(cls): + return { + pd.Timestamp('2015-01-06', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] * 3), + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-07', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] * 3), + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-08', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] * 2 + + [[np.NaN, 1110.]]), + 'estimate2': np.array([[np.NaN, np.NaN]] * 2 + + [[np.NaN, 2110.]]) + }, + pd.Timestamp('2015-01-09', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] + + [[np.NaN, 
1110. * 4]] + + [[1100 * 3., 1110. * 4]]), + 'estimate2': np.array([[np.NaN, np.NaN]] + + [[np.NaN, 2110. * 4]] + + [[2100 * 3., 2110. * 4]]) + }, + pd.Timestamp('2015-01-12', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] * 2 + + [[1200 * 3., 1210. * 4]]), + 'estimate2': np.array([[np.NaN, np.NaN]] * 2 + + [[2200 * 3., 2210. * 4]]) + } + } + + @classmethod + def make_expected_timelines_2q_out(cls): + return { + pd.Timestamp('2015-01-06', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-07', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-08', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-09', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + }, + pd.Timestamp('2015-01-12', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 2 + + [[2100 * 3., 2110. * 4]]) + } + } + + +class BlazePreviousWithMultipleEstimateColumns( + PreviousWithSplitAdjustedMultipleEstimateColumns +): + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousSplitAdjustedEstimatesLoader( + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate1', 'estimate2'], + split_adjusted_asof=cls.split_adjusted_asof, + ) + + +class NextWithSplitAdjustedMultipleEstimateColumns( + WithSplitAdjustedMultipleEstimateColumns, ZiplineTestCase +): + @classmethod + def make_loader(cls, events, columns): + return NextSplitAdjustedEarningsEstimatesLoader( + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate1', 'estimate2'], + split_adjusted_asof=cls.split_adjusted_asof, + ) + + @classmethod + def make_expected_timelines_1q_out(cls): + return { + pd.Timestamp('2015-01-06', tz='utc'): { + 'estimate1': np.array([[np.NaN, np.NaN]] + + [[1100. * 1/.3, 1110. * 1/.4]] * 2), + 'estimate2': np.array([[np.NaN, np.NaN]] + + [[2100. 
* 1/.3, 2110. * 1/.4]] * 2), + }, + pd.Timestamp('2015-01-07', tz='utc'): { + 'estimate1': np.array([[1100., 1110.]] * 3), + 'estimate2': np.array([[2100., 2110.]] * 3) + }, + pd.Timestamp('2015-01-08', tz='utc'): { + 'estimate1': np.array([[1100., 1110.]] * 3), + 'estimate2': np.array([[2100., 2110.]] * 3) + }, + pd.Timestamp('2015-01-09', tz='utc'): { + 'estimate1': np.array([[1100 * 3., 1210. * 4]] * 3), + 'estimate2': np.array([[2100 * 3., 2210. * 4]] * 3) + }, + pd.Timestamp('2015-01-12', tz='utc'): { + 'estimate1': np.array([[1200 * 3., np.NaN]] * 3), + 'estimate2': np.array([[2200 * 3., np.NaN]] * 3) + } + } + + @classmethod + def make_expected_timelines_2q_out(cls): + return { + pd.Timestamp('2015-01-06', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] + + [[2200 * 1/.3, 2210. * 1/.4]] * 2) + }, + pd.Timestamp('2015-01-07', tz='utc'): { + 'estimate2': np.array([[2200., 2210.]] * 3) + }, + pd.Timestamp('2015-01-08', tz='utc'): { + 'estimate2': np.array([[2200, 2210.]] * 3) + }, + pd.Timestamp('2015-01-09', tz='utc'): { + 'estimate2': np.array([[2200 * 3., np.NaN]] * 3) + }, + pd.Timestamp('2015-01-12', tz='utc'): { + 'estimate2': np.array([[np.NaN, np.NaN]] * 3) + } + } + + +class BlazeNextWithMultipleEstimateColumns( + NextWithSplitAdjustedMultipleEstimateColumns +): + @classmethod + def make_loader(cls, events, columns): + return BlazeNextSplitAdjustedEstimatesLoader( + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate1', 'estimate2'], + split_adjusted_asof=cls.split_adjusted_asof, + ) + + +class WithAdjustmentBoundaries(WithEstimates): + """ + ZiplineTestCase mixin providing class-level attributes, methods, + and a test to make sure that when the split-adjusted-asof-date is not + strictly within the date index, we can still apply adjustments correctly. 
+ + Attributes + ---------- + split_adjusted_before_start : pd.Timestamp + A split-adjusted-asof-date before the start date of the test. + split_adjusted_after_end : pd.Timestamp + A split-adjusted-asof-date after the end date of the test. + split_adjusted_asof_dates : list of tuples of pd.Timestamp + All the split-adjusted-asof-dates over which we want to parameterize + the test. + + Methods + ------- + make_expected_out -> dict[pd.Timestamp -> pd.DataFrame] + A dictionary of the expected output of the pipeline at each of the + dates of interest. + """ + START_DATE = pd.Timestamp('2015-01-04') + # We want to run the pipeline starting from `START_DATE`, but the + # pipeline results will start from the next day, which is + # `test_start_date`. + test_start_date = pd.Timestamp('2015-01-05') + END_DATE = test_end_date = pd.Timestamp('2015-01-12') + split_adjusted_before_start = ( + test_start_date - timedelta(days=1) + ) + split_adjusted_after_end = ( + test_end_date + timedelta(days=1) + ) + # Must parametrize over this because there can only be 1 such date for + # each set of data. + split_adjusted_asof_dates = [(test_start_date,), + (test_end_date,), + (split_adjusted_before_start,), + (split_adjusted_after_end,)] + + @classmethod + def init_class_fixtures(cls): + super(WithAdjustmentBoundaries, cls).init_class_fixtures() + cls.s0 = cls.asset_finder.retrieve_asset(0) + cls.s1 = cls.asset_finder.retrieve_asset(1) + cls.s2 = cls.asset_finder.retrieve_asset(2) + cls.s3 = cls.asset_finder.retrieve_asset(3) + cls.s4 = cls.asset_finder.retrieve_asset(4) + cls.expected = cls.make_expected_out() + + @classmethod + def make_events(cls): + # We can create a sid for each configuration of dates for KDs, events, + # and splits. For this test we don't care about overwrites so we only + # test 1 quarter. 
+ sid_0_timeline = pd.DataFrame({ + # KD on first date of index + TS_FIELD_NAME: cls.test_start_date, + EVENT_DATE_FIELD_NAME: pd.Timestamp('2015-01-09'), + 'estimate': 10., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 0, + }, index=[0]) + + sid_1_timeline = pd.DataFrame({ + TS_FIELD_NAME: cls.test_start_date, + # event date on first date of index + EVENT_DATE_FIELD_NAME: cls.test_start_date, + 'estimate': 11., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 1, + }, index=[0]) + + sid_2_timeline = pd.DataFrame({ + # KD on last date of index + TS_FIELD_NAME: cls.test_end_date, + EVENT_DATE_FIELD_NAME: cls.test_end_date + timedelta(days=1), + 'estimate': 12., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 2, + }, index=[0]) + + sid_3_timeline = pd.DataFrame({ + TS_FIELD_NAME: cls.test_end_date - timedelta(days=1), + EVENT_DATE_FIELD_NAME: cls.test_end_date, + 'estimate': 13., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 3, + }, index=[0]) + + # KD and event date don't fall on date index boundaries + sid_4_timeline = pd.DataFrame({ + TS_FIELD_NAME: cls.test_end_date - timedelta(days=1), + EVENT_DATE_FIELD_NAME: cls.test_end_date - timedelta(days=1), + 'estimate': 14., + FISCAL_QUARTER_FIELD_NAME: 1, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 4, + }, index=[0]) + + return pd.concat([sid_0_timeline, + sid_1_timeline, + sid_2_timeline, + sid_3_timeline, + sid_4_timeline]) + + @classmethod + def make_splits_data(cls): + # Here we want splits that collide + sid_0_splits = pd.DataFrame({ + SID_FIELD_NAME: 0, + 'ratio': .10, + 'effective_date': cls.test_start_date, + }, index=[0]) + + sid_1_splits = pd.DataFrame({ + SID_FIELD_NAME: 1, + 'ratio': .11, + 'effective_date': cls.test_start_date, + }, index=[0]) + + sid_2_splits = pd.DataFrame({ + SID_FIELD_NAME: 2, + 'ratio': .12, + 'effective_date': cls.test_end_date, + }, 
index=[0]) + + sid_3_splits = pd.DataFrame({ + SID_FIELD_NAME: 3, + 'ratio': .13, + 'effective_date': cls.test_end_date, + }, index=[0]) + + # We want 2 splits here - at the starting boundary and at the end + # boundary - while there is no collision with KD/event date for the + # sid. + sid_4_splits = pd.DataFrame({ + SID_FIELD_NAME: 4, + 'ratio': (.14, .15), + 'effective_date': (cls.test_start_date, cls.test_end_date), + }) + + return pd.concat([sid_0_splits, + sid_1_splits, + sid_2_splits, + sid_3_splits, + sid_4_splits]) + + @parameterized.expand(split_adjusted_asof_dates) + def test_boundaries(self, split_date): + dataset = QuartersEstimates(1) + loader = self.loader(split_adjusted_asof=split_date) + engine = SimplePipelineEngine( + lambda x: loader, + self.trading_days, + self.asset_finder, + ) + result = engine.run_pipeline( + Pipeline({'estimate': dataset.estimate.latest}), + start_date=self.trading_days[0], + # last event date we have + end_date=self.trading_days[-1], + ) + expected = self.expected[split_date] + assert_frame_equal(result, expected, check_names=False) + + @classmethod + def make_expected_out(cls): + return {} + + +class PreviousWithAdjustmentBoundaries(WithAdjustmentBoundaries, + ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return partial(PreviousSplitAdjustedEarningsEstimatesLoader, + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate']) + + @classmethod + def make_expected_out(cls): + split_adjusted_at_start_boundary = pd.concat([ + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': np.NaN, + }, index=pd.date_range( + cls.test_start_date, + pd.Timestamp('2015-01-08'), + tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': 10., + }, index=pd.date_range( + pd.Timestamp('2015-01-09'), cls.test_end_date, tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s1, + 'estimate': 11., + }, index=pd.date_range(cls.test_start_date, 
cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s2, + 'estimate': np.NaN + }, index=pd.date_range(cls.test_start_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': np.NaN + }, index=pd.date_range( + cls.test_start_date, cls.test_end_date - timedelta(1), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': 13. * .13 + }, index=pd.date_range(cls.test_end_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': np.NaN + }, index=pd.date_range( + cls.test_start_date, cls.test_end_date - timedelta(2), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': 14. * .15 + }, index=pd.date_range( + cls.test_end_date - timedelta(1), cls.test_end_date, tz='utc' + )), + ]).set_index(SID_FIELD_NAME, append=True).unstack( + SID_FIELD_NAME).reindex(cls.trading_days).stack( + SID_FIELD_NAME, dropna=False) + + split_adjusted_at_end_boundary = pd.concat([ + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': np.NaN, + }, index=pd.date_range( + cls.test_start_date, pd.Timestamp('2015-01-08'), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': 10., + }, index=pd.date_range( + pd.Timestamp('2015-01-09'), cls.test_end_date, tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s1, + 'estimate': 11., + }, index=pd.date_range(cls.test_start_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s2, + 'estimate': np.NaN + }, index=pd.date_range(cls.test_start_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': np.NaN + }, index=pd.date_range( + cls.test_start_date, cls.test_end_date - timedelta(1), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': 13. 
+ }, index=pd.date_range(cls.test_end_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': np.NaN + }, index=pd.date_range( + cls.test_start_date, cls.test_end_date - timedelta(2), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': 14. + }, index=pd.date_range(cls.test_end_date - timedelta(1), + cls.test_end_date, + tz='utc')), + ]).set_index(SID_FIELD_NAME, append=True).unstack( + SID_FIELD_NAME).reindex(cls.trading_days).stack(SID_FIELD_NAME, + dropna=False) + + split_adjusted_before_start_boundary = split_adjusted_at_start_boundary + split_adjusted_after_end_boundary = split_adjusted_at_end_boundary + + return {cls.test_start_date: + split_adjusted_at_start_boundary, + cls.split_adjusted_before_start: + split_adjusted_before_start_boundary, + cls.test_end_date: + split_adjusted_at_end_boundary, + cls.split_adjusted_after_end: + split_adjusted_after_end_boundary} + + +class BlazePreviousWithAdjustmentBoundaries(PreviousWithAdjustmentBoundaries): + @classmethod + def make_loader(cls, events, columns): + return partial(BlazePreviousSplitAdjustedEstimatesLoader, + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate']) + + +class NextWithAdjustmentBoundaries(WithAdjustmentBoundaries, + ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return partial(NextSplitAdjustedEarningsEstimatesLoader, + events, + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate']) + + @classmethod + def make_expected_out(cls): + split_adjusted_at_start_boundary = pd.concat([ + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': 10, + }, index=pd.date_range( + cls.test_start_date, pd.Timestamp('2015-01-09'), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s1, + 'estimate': 11., + }, index=pd.date_range(cls.test_start_date, + cls.test_start_date, + tz='utc')), + pd.DataFrame({ + 
SID_FIELD_NAME: cls.s2, + 'estimate': 12., + }, index=pd.date_range(cls.test_end_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': 13. * .13, + }, index=pd.date_range( + cls.test_end_date - timedelta(1), cls.test_end_date, tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': 14., + }, index=pd.date_range( + cls.test_end_date - timedelta(1), + cls.test_end_date - timedelta(1), + tz='utc' + )), + ]).set_index(SID_FIELD_NAME, append=True).unstack( + SID_FIELD_NAME).reindex(cls.trading_days).stack( + SID_FIELD_NAME, dropna=False) + + split_adjusted_at_end_boundary = pd.concat([ + pd.DataFrame({ + SID_FIELD_NAME: cls.s0, + 'estimate': 10, + }, index=pd.date_range( + cls.test_start_date, pd.Timestamp('2015-01-09'), tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s1, + 'estimate': 11., + }, index=pd.date_range(cls.test_start_date, + cls.test_start_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s2, + 'estimate': 12., + }, index=pd.date_range(cls.test_end_date, + cls.test_end_date, + tz='utc')), + pd.DataFrame({ + SID_FIELD_NAME: cls.s3, + 'estimate': 13., + }, index=pd.date_range( + cls.test_end_date - timedelta(1), cls.test_end_date, tz='utc' + )), + pd.DataFrame({ + SID_FIELD_NAME: cls.s4, + 'estimate': 14., + }, index=pd.date_range( + cls.test_end_date - timedelta(1), + cls.test_end_date - timedelta(1), + tz='utc' + )), + ]).set_index(SID_FIELD_NAME, append=True).unstack( + SID_FIELD_NAME).reindex(cls.trading_days).stack( + SID_FIELD_NAME, dropna=False) + + split_adjusted_before_start_boundary = split_adjusted_at_start_boundary + split_adjusted_after_end_boundary = split_adjusted_at_end_boundary + + return {cls.test_start_date: + split_adjusted_at_start_boundary, + cls.split_adjusted_before_start: + split_adjusted_before_start_boundary, + cls.test_end_date: + split_adjusted_at_end_boundary, + cls.split_adjusted_after_end: + split_adjusted_after_end_boundary} + + +class 
BlazeNextWithAdjustmentBoundaries(NextWithAdjustmentBoundaries): + @classmethod + def make_loader(cls, events, columns): + return partial(BlazeNextSplitAdjustedEstimatesLoader, + bz.data(events), + columns, + split_adjustments_loader=cls.adjustment_reader, + split_adjusted_column_names=['estimate']) + + class QuarterShiftTestCase(ZiplineTestCase): """ This tests, in isolation, quarter calculation logic for shifting quarters diff --git a/zipline/pipeline/loaders/blaze/estimates.py b/zipline/pipeline/loaders/blaze/estimates.py index 3cb1064a..bf046a9b 100644 --- a/zipline/pipeline/loaders/blaze/estimates.py +++ b/zipline/pipeline/loaders/blaze/estimates.py @@ -17,7 +17,8 @@ from zipline.pipeline.loaders.earnings_estimates import ( PreviousEarningsEstimatesLoader, required_estimates_fields, metadata_columns, -) + PreviousSplitAdjustedEarningsEstimatesLoader, + NextSplitAdjustedEarningsEstimatesLoader) from zipline.pipeline.loaders.utils import ( check_data_query_args, ) @@ -108,6 +109,7 @@ class BlazeEstimatesLoader(PipelineLoader): # Only load requested columns. 
requested_column_names = [self._columns[column.name] for column in columns] + raw = load_raw_data( assets, dates, @@ -120,7 +122,7 @@ class BlazeEstimatesLoader(PipelineLoader): return self.loader( raw, - {column.name: self._columns[column.name] for column in columns} + {column.name: self._columns[column.name] for column in columns}, ).load_adjusted_array( columns, dates, @@ -135,3 +137,65 @@ class BlazeNextEstimatesLoader(BlazeEstimatesLoader): class BlazePreviousEstimatesLoader(BlazeEstimatesLoader): loader = PreviousEarningsEstimatesLoader + + +class BlazeSplitAdjustedEstimatesLoader(BlazeEstimatesLoader): + def __init__(self, + expr, + columns, + split_adjustments_loader, + split_adjusted_column_names, + split_adjusted_asof, + **kwargs): + self._split_adjustments = split_adjustments_loader + self._split_adjusted_column_names = split_adjusted_column_names + self._split_adjusted_asof = split_adjusted_asof + super(BlazeSplitAdjustedEstimatesLoader, self).__init__( + expr, + columns, + **kwargs + ) + + def load_adjusted_array(self, columns, dates, assets, mask): + # Only load requested columns. 
+ requested_column_names = [self._columns[column.name] + for column in columns] + + requested_spilt_adjusted_columns = [ + column_name + for column_name in self._split_adjusted_column_names + if column_name in requested_column_names + ] + + raw = load_raw_data( + assets, + dates, + self._data_query_time, + self._data_query_tz, + self._expr[sorted(metadata_columns.union(requested_column_names))], + self._odo_kwargs, + checkpoints=self._checkpoints, + ) + + return self.loader( + raw, + {column.name: self._columns[column.name] for column in columns}, + self._split_adjustments, + requested_spilt_adjusted_columns, + self._split_adjusted_asof, + ).load_adjusted_array( + columns, + dates, + assets, + mask, + ) + + +class BlazeNextSplitAdjustedEstimatesLoader(BlazeSplitAdjustedEstimatesLoader): + loader = NextSplitAdjustedEarningsEstimatesLoader + + +class BlazePreviousSplitAdjustedEstimatesLoader( + BlazeSplitAdjustedEstimatesLoader +): + loader = PreviousSplitAdjustedEarningsEstimatesLoader diff --git a/zipline/pipeline/loaders/earnings_estimates.py b/zipline/pipeline/loaders/earnings_estimates.py index 1fd6dcb9..367301a9 100644 --- a/zipline/pipeline/loaders/earnings_estimates.py +++ b/zipline/pipeline/loaders/earnings_estimates.py @@ -1,5 +1,4 @@ from abc import abstractmethod, abstractproperty -from collections import defaultdict import numpy as np import pandas as pd @@ -11,6 +10,7 @@ from zipline.lib.adjustment import ( Datetime641DArrayOverwrite, Datetime64Overwrite, Float641DArrayOverwrite, + Float64Multiply, Float64Overwrite, ) @@ -91,11 +91,23 @@ def validate_column_specs(events, columns): ) +def add_new_adjustments(adjustments_dict, + adjustments, + column_name, + ts): + try: + adjustments_dict[column_name][ts].extend(adjustments) + except KeyError: + adjustments_dict[column_name][ts] = adjustments + + class EarningsEstimatesLoader(PipelineLoader): """ An abstract pipeline loader for estimates data that can load data a variable number of quarters 
forwards/backwards from calendar dates depending on the `num_announcements` attribute of the columns' dataset. + If split adjustments are to be applied, a loader, split-adjusted columns, + and the split-adjusted asof-date must be supplied. Parameters ---------- @@ -152,7 +164,7 @@ class EarningsEstimatesLoader(PipelineLoader): self.name_map = name_map @abstractmethod - def get_zeroth_quarter_idx(self, num_announcements, last, dates): + def get_zeroth_quarter_idx(self, stacked_last_per_qtr): raise NotImplementedError('get_zeroth_quarter_idx') @abstractmethod @@ -167,7 +179,9 @@ class EarningsEstimatesLoader(PipelineLoader): next_qtr_start_idx, requested_quarter, sid, - sid_idx): + sid_idx, + col_to_split_adjustments, + split_adjusted_asof_idx): raise NotImplementedError('create_overwrite_for_estimate') @abstractproperty @@ -193,6 +207,9 @@ class EarningsEstimatesLoader(PipelineLoader): stacked_last_per_qtr : pd.DataFrame The latest estimate known with the dates, normalized quarter, and sid as the index. + num_announcements : int + The number of annoucements out the user requested relative to + each date in the calendar dates. dates : pd.DatetimeIndex The calendar dates for which estimates data is requested. @@ -236,13 +253,185 @@ class EarningsEstimatesLoader(PipelineLoader): # dates so that we have a value for each calendar date. return requested_qtr_data.unstack(SID_FIELD_NAME).reindex(dates) + def get_split_adjusted_asof_idx(self, dates): + """ + Compute the index in `dates` where the split-adjusted-asof-date + falls. This is the date up to which, and including which, we will + need to unapply all adjustments for and then re-apply them as they + come in. After this date, adjustments are applied as normal. + + Parameters + ---------- + dates : pd.DatetimeIndex + The calendar dates over which the Pipeline is being computed. + + Returns + ------- + split_adjusted_asof_idx : int + The index in `dates` at which the data should be split. 
+ """ + split_adjusted_asof_idx = dates.searchsorted( + self._split_adjusted_asof + ) + # The split-asof date is after the date index. + if split_adjusted_asof_idx == len(dates): + split_adjusted_asof_idx = len(dates) - 1 + elif self._split_adjusted_asof < dates[0].tz_localize(None): + split_adjusted_asof_idx = -1 + return split_adjusted_asof_idx + + def collect_overwrites_for_sid(self, + group, + dates, + requested_qtr_data, + last_per_qtr, + sid_idx, + columns, + all_adjustments_for_sid, + sid): + """ + Given a sid, collect all overwrites that should be applied for this + sid at each quarter boundary. + + Parameters + ---------- + group : pd.DataFrame + The data for `sid`. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + requested_qtr_data : pd.DataFrame + The DataFrame with the latest values for the requested quarter + for all columns. + last_per_qtr : pd.DataFrame + A DataFrame with a column MultiIndex of [self.estimates.columns, + normalized_quarters, sid] that allows easily getting the timeline + of estimates for a particular sid for a particular quarter. + sid_idx : int + The sid's index in the asset index. + columns : list of BoundColumn + The columns for which the overwrites should be computed. + all_adjustments_for_sid : dict[int -> AdjustedArray] + A dictionary of the integer index of each timestamp into the date + index, mapped to adjustments that should be applied at that + index for the given sid (`sid`). This dictionary is modified as + adjustments are collected. + sid : int + The sid for which overwrites should be computed. + """ + next_qtr_start_indices = dates.searchsorted( + group[EVENT_DATE_FIELD_NAME].values, + side=self.searchsorted_side, + ) + + qtrs_with_estimates = group.index.get_level_values( + NORMALIZED_QUARTERS + ).values + for idx in next_qtr_start_indices: + if 0 < idx < len(dates): + # Find the quarter being requested in the quarter we're + # crossing into. 
+ requested_quarter = requested_qtr_data[ + SHIFTED_NORMALIZED_QTRS, sid, + ].iloc[idx] + # Only add adjustments if the next quarter starts somewhere + # in our date index for this sid. Our 'next' quarter can + # never start at index 0; a starting index of 0 means that + # the next quarter's event date was NaT. + self.create_overwrites_for_quarter( + all_adjustments_for_sid, + idx, + last_per_qtr, + qtrs_with_estimates, + requested_quarter, + sid, + sid_idx, + columns + ) + + def get_adjustments_for_sid(self, + group, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx, + columns, + col_to_all_adjustments, + **kwargs): + """ + + Parameters + ---------- + group : pd.DataFrame + The data for the given sid. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + requested_qtr_data : pd.DataFrame + The DataFrame with the latest values for the requested quarter + for all columns. + last_per_qtr : pd.DataFrame + A DataFrame with a column MultiIndex of [self.estimates.columns, + normalized_quarters, sid] that allows easily getting the timeline + of estimates for a particular sid for a particular quarter. + sid_to_idx : dict[int -> int] + A dictionary mapping sid to he sid's index in the asset index. + columns : list of BoundColumn + The columns for which the overwrites should be computed. + col_to_all_adjustments : dict[int -> AdjustedArray] + A dictionary of the integer index of each timestamp into the date + index, mapped to adjustments that should be applied at that + index. This dictionary is for adjustments for ALL sids. It is + modified as adjustments are collected. + kwargs : + Additional arguments used in collecting adjustments; unused here. + """ + # Collect all adjustments for a given sid. 
+ all_adjustments_for_sid = {} + sid = int(group.name) + self.collect_overwrites_for_sid(group, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx[sid], + columns, + all_adjustments_for_sid, + sid) + self.merge_into_adjustments_for_all_sids( + all_adjustments_for_sid, col_to_all_adjustments + ) + + def merge_into_adjustments_for_all_sids(self, + all_adjustments_for_sid, + col_to_all_adjustments): + """ + Merge adjustments for a particular sid into a dictionary containing + adjustments for all sids. + + Parameters + ---------- + all_adjustments_for_sid : dict[int -> AdjustedArray] + All adjustments for a particular sid. + col_to_all_adjustments : dict[int -> AdjustedArray] + All adjustments for all sids. + """ + + for col_name in all_adjustments_for_sid: + if col_name not in col_to_all_adjustments: + col_to_all_adjustments[col_name] = {} + for ts in all_adjustments_for_sid[col_name]: + adjs = all_adjustments_for_sid[col_name][ts] + add_new_adjustments(col_to_all_adjustments, + adjs, + col_name, + ts) + def get_adjustments(self, zero_qtr_data, requested_qtr_data, last_per_qtr, dates, assets, - columns): + columns, + **kwargs): """ Creates an AdjustedArray from the given estimates data for the given dates. @@ -263,13 +452,17 @@ class EarningsEstimatesLoader(PipelineLoader): An index of all the assets from the raw data. columns : list of BoundColumn The columns for which adjustments need to be calculated. + kwargs : + Additional keyword arguments that should be forwarded to + `get_adjustments_for_sid` and to be used in computing adjustments + for each sid. Returns ------- - adjusted_array : AdjustedArray - The array of data and overwrites for the given column. + col_to_all_adjustments : dict[int -> AdjustedArray] + A dictionary of all adjustments that should be applied. """ - col_to_overwrites = defaultdict(dict) + zero_qtr_data.sort_index(inplace=True) # Here we want to get the LAST record from each group of records # corresponding to a single quarter. 
This is to ensure that we select @@ -278,50 +471,29 @@ class EarningsEstimatesLoader(PipelineLoader): level=[SID_FIELD_NAME, NORMALIZED_QUARTERS] ).nth(-1) + col_to_all_adjustments = {} sid_to_idx = dict(zip(assets, range(len(assets)))) + quarter_shifts.groupby(level=SID_FIELD_NAME).apply( + self.get_adjustments_for_sid, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx, + columns, + col_to_all_adjustments, + **kwargs + ) + return col_to_all_adjustments - for column in columns: - column_name = self.name_map[column.name] - col_to_overwrites[column_name] = defaultdict(list) - - def collect_adjustments(group): - next_qtr_start_indices = dates.searchsorted( - group[EVENT_DATE_FIELD_NAME].values, - side=self.searchsorted_side, - ) - sid = int(group.name) - qtrs_with_estimates = group.index.get_level_values( - NORMALIZED_QUARTERS - ).values - for idx in next_qtr_start_indices: - if 0 < idx < len(dates): - # Only add adjustments if the next quarter starts somewhere - # in our date index for this sid. Our 'next' quarter can - # never start at index 0; a starting index of 0 means that - # the next quarter's event date was NaT. - self.create_overwrite_for_quarter( - col_to_overwrites, - idx, - last_per_qtr, - qtrs_with_estimates, - requested_qtr_data, - sid, - sid_to_idx[sid], - columns, - ) - - quarter_shifts.groupby(level=SID_FIELD_NAME).apply(collect_adjustments) - return col_to_overwrites - - def create_overwrite_for_quarter(self, - col_to_overwrites, - next_qtr_start_idx, - last_per_qtr, - quarters_with_estimates_for_sid, - requested_qtr_data, - sid, - sid_idx, - columns): + def create_overwrites_for_quarter(self, + col_to_overwrites, + next_qtr_start_idx, + last_per_qtr, + quarters_with_estimates_for_sid, + requested_quarter, + sid, + sid_idx, + columns): """ Add entries to the dictionary of columns to adjustments for the given sid and the given quarter. 
@@ -343,6 +515,8 @@ class EarningsEstimatesLoader(PipelineLoader): quarters_with_estimates_for_sid : np.array An array of all quarters for which there are estimates for the given sid. + requested_quarter : float + The quarter for which the overwrite should be created. sid : int The sid for which to create overwrites. sid_idx : int @@ -350,45 +524,42 @@ class EarningsEstimatesLoader(PipelineLoader): columns : list of BoundColumn The columns for which to create overwrites. """ - - # Find the quarter being requested in the quarter we're - # crossing into. - requested_quarter = requested_qtr_data[ - SHIFTED_NORMALIZED_QTRS, sid, - ].iloc[next_qtr_start_idx] for col in columns: column_name = self.name_map[col.name] + if column_name not in col_to_overwrites: + col_to_overwrites[column_name] = {} # If there are estimates for the requested quarter, # overwrite all values going up to the starting index of # that quarter with estimates for that quarter. if requested_quarter in quarters_with_estimates_for_sid: - col_to_overwrites[column_name][next_qtr_start_idx].append( - self.create_overwrite_for_estimate( - col, - column_name, - last_per_qtr, - next_qtr_start_idx, - requested_quarter, - sid, - sid_idx - ), + adjs = self.create_overwrite_for_estimate( + col, + column_name, + last_per_qtr, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx, ) + add_new_adjustments(col_to_overwrites, + adjs, + column_name, + next_qtr_start_idx) # There are no estimates for the quarter. Overwrite all # values going up to the starting index of that quarter # with the missing value for this column. 
else: - col_to_overwrites[column_name][next_qtr_start_idx].append( - self.overwrite_with_null( + adjs = [self.overwrite_with_null( col, - last_per_qtr.index, next_qtr_start_idx, - sid_idx - ), - ) + sid_idx)] + add_new_adjustments(col_to_overwrites, + adjs, + column_name, + next_qtr_start_idx) def overwrite_with_null(self, column, - dates, next_qtr_start_idx, sid_idx): return self.scalar_overwrites_dict[column.dtype]( @@ -553,10 +724,13 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader): next_qtr_start_idx, requested_quarter, sid, - sid_idx): - return self.array_overwrites_dict[column.dtype]( + sid_idx, + col_to_split_adjustments=None, + split_adjusted_asof_idx=None): + # Overwrite rows 0 through next_qtr_start_idx - 1, i.e. every + # row before the start of the next quarter. + return [self.array_overwrites_dict[column.dtype]( 0, - # overwrite thru last qtr next_qtr_start_idx - 1, sid_idx, sid_idx, @@ -565,7 +739,7 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader): requested_quarter, sid, ].values[:next_qtr_start_idx], - ) + )] def get_shifted_qtrs(self, zero_qtrs, num_announcements): return zero_qtrs + (num_announcements - 1) @@ -611,13 +785,15 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): next_qtr_start_idx, requested_quarter, sid, - sid_idx): - return self.overwrite_with_null( + sid_idx, + col_to_split_adjustments=None, + split_adjusted_asof_idx=None, + split_dict=None): + return [self.overwrite_with_null( column, - dates, next_qtr_start_idx, sid_idx, - ) + )] def get_shifted_qtrs(self, zero_qtrs, num_announcements): return zero_qtrs - (num_announcements - 1) @@ -651,3 +827,699 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): # sorted by event date.
).nth(-1) return previous_releases_per_date.index + + +def validate_split_adjusted_column_specs(name_map, columns): + to_be_split = set(columns) + available = set(name_map.keys()) + extra = to_be_split - available + if extra: + raise ValueError( + "EarningsEstimatesLoader got the following extra columns to be " + "split-adjusted: {extra}.\n" + "Got Columns: {to_be_split}\n" + "Available Columns: {available}".format( + extra=sorted(extra), + to_be_split=sorted(to_be_split), + available=sorted(available), + ) + ) + + +class SplitAdjustedEstimatesLoader(EarningsEstimatesLoader): + """ + Estimates loader that loads data that needs to be split-adjusted. + + Parameters + ---------- + split_adjustments_loader : SQLiteAdjustmentReader + The loader to use for reading split adjustments. + split_adjusted_column_names : iterable of str + The column names that should be split-adjusted. + split_adjusted_asof : pd.Timestamp + The date that separates data into 2 halves: the first half is the set + of dates up to and including the split_adjusted_asof date. All + adjustments occurring during this first half are applied to all + dates in this first half. The second half is the set of dates after + the split_adjusted_asof date. All adjustments occurring during this + second half are applied sequentially as they appear in the timeline. 
+ """ + def __init__(self, + estimates, + name_map, + split_adjustments_loader, + split_adjusted_column_names, + split_adjusted_asof): + validate_split_adjusted_column_specs(name_map, + split_adjusted_column_names) + self._split_adjustments = split_adjustments_loader + self._split_adjusted_column_names = split_adjusted_column_names + self._split_adjusted_asof = split_adjusted_asof + self._split_adjustment_dict = {} + super(SplitAdjustedEstimatesLoader, self).__init__( + estimates, + name_map + ) + + @abstractmethod + def collect_split_adjustments(self, + adjustments_for_sid, + requested_qtr_data, + dates, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns): + raise NotImplementedError('collect_split_adjustments') + + def get_adjustments_for_sid(self, + group, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx, + columns, + col_to_all_adjustments, + split_adjusted_asof_idx=None, + split_adjusted_cols_for_group=None): + """ + Collects both overwrites and adjustments for a particular sid. + + Parameters + ---------- + split_adjusted_asof_idx : int + The integer index of the date on which the data was split-adjusted. + split_adjusted_cols_for_group : list of str + The names of requested columns that should also be split-adjusted. + """ + all_adjustments_for_sid = {} + sid = int(group.name) + self.collect_overwrites_for_sid(group, + dates, + requested_qtr_data, + last_per_qtr, + sid_to_idx[sid], + columns, + all_adjustments_for_sid, + sid) + (pre_adjustments, + post_adjustments) = self.retrieve_split_adjustment_data_for_sid( + dates, sid, split_adjusted_asof_idx + ) + sid_estimates = self.estimates[ + self.estimates[SID_FIELD_NAME] == sid + ] + # We might not have any overwrites but still have + # adjustments, and we will need to manually add columns if + # that is the case. 
+ for col_name in split_adjusted_cols_for_group: + if col_name not in all_adjustments_for_sid: + all_adjustments_for_sid[col_name] = {} + + self.collect_split_adjustments( + all_adjustments_for_sid, + requested_qtr_data, + dates, + sid, + sid_to_idx[sid], + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + split_adjusted_cols_for_group + ) + self.merge_into_adjustments_for_all_sids( + all_adjustments_for_sid, col_to_all_adjustments + ) + + def get_adjustments(self, + zero_qtr_data, + requested_qtr_data, + last_per_qtr, + dates, + assets, + columns, + **kwargs): + """ + Calculates both split adjustments and overwrites for all sids. + """ + split_adjusted_cols_for_group = [ + self.name_map[col.name] + for col in columns + if self.name_map[col.name] in self._split_adjusted_column_names + ] + # Locate the split-adjusted-asof date within the calendar dates. + split_adjusted_asof_idx = self.get_split_adjusted_asof_idx( + dates + ) + return super(SplitAdjustedEstimatesLoader, self).get_adjustments( + zero_qtr_data, + requested_qtr_data, + last_per_qtr, + dates, + assets, + columns, + split_adjusted_cols_for_group=split_adjusted_cols_for_group, + split_adjusted_asof_idx=split_adjusted_asof_idx + ) + + def determine_end_idx_for_adjustment(self, + adjustment_ts, + dates, + upper_bound, + requested_quarter, + sid_estimates): + """ + Determines the date until which the adjustment at the given date + index should be applied for the given quarter. + + Parameters + ---------- + adjustment_ts : pd.Timestamp + The timestamp at which the adjustment occurs. + dates : pd.DatetimeIndex + The calendar dates over which the Pipeline is being computed. + upper_bound : int + The index of the upper bound in the calendar dates. This is the + index until which the adjustment will be applied unless there is + information for the requested quarter that comes in on or before + that date.
+ requested_quarter : float + The quarter for which we are determining how the adjustment + should be applied. + sid_estimates : pd.DataFrame + The DataFrame of estimates data for the sid for which we're + applying the given adjustment. + + Returns + ------- + end_idx : int + The last index to which the adjustment should be applied for the + given quarter/sid. + """ + end_idx = upper_bound + # Find the next newest kd that happens on or after + # the date of this adjustment + newest_kd_for_qtr = sid_estimates[ + (sid_estimates[NORMALIZED_QUARTERS] == requested_quarter) & + (sid_estimates[TS_FIELD_NAME] >= adjustment_ts) + ][TS_FIELD_NAME].min() + if pd.notnull(newest_kd_for_qtr): + newest_kd_idx = dates.searchsorted( + newest_kd_for_qtr + ) + # We have fresh information that comes in + # before the end of the overwrite and + # presumably is already split-adjusted to the + # current split. We should stop applying the + # adjustment the day before this new + # information comes in. + if newest_kd_idx <= upper_bound: + end_idx = newest_kd_idx - 1 + return end_idx + + def collect_pre_split_asof_date_adjustments( + self, + split_adjusted_asof_date_idx, + sid_idx, + pre_adjustments, + requested_split_adjusted_columns + ): + """ + Collect split adjustments that occur before the + split-adjusted-asof-date. All those adjustments must first be + UN-applied at the first date index and then re-applied on the + appropriate dates in order to match point in time share pricing data. + + Parameters + ---------- + split_adjusted_asof_date_idx : int + The index in the calendar dates as-of which all data was + split-adjusted. + sid_idx : int + The index of the sid for which adjustments should be collected in + the adjusted array. + pre_adjustments : tuple(list(float), list(int)) + The adjustment values and indexes in `dates` for + adjustments that happened on or before the split-asof-date. + requested_split_adjusted_columns : list of str + The requested split adjusted columns.
+ + Returns + ------- + col_to_split_adjustments : dict[str -> dict[int -> list of Adjustment]] + The adjustments for this sid that occurred on or before the + split-asof-date. + """ + col_to_split_adjustments = {} + if len(pre_adjustments[0]): + adjustment_values, date_indexes = pre_adjustments + for column_name in requested_split_adjusted_columns: + col_to_split_adjustments[column_name] = {} + # We need to undo all adjustments that happen before the + # split_asof_date here by reversing the split ratio. + col_to_split_adjustments[column_name][0] = [Float64Multiply( + 0, + split_adjusted_asof_date_idx, + sid_idx, + sid_idx, + 1 / future_adjustment + ) for future_adjustment in adjustment_values] + + for adjustment, date_index in zip(adjustment_values, + date_indexes): + adj = Float64Multiply( + 0, + split_adjusted_asof_date_idx, + sid_idx, + sid_idx, + adjustment + ) + add_new_adjustments(col_to_split_adjustments, + [adj], + column_name, + date_index) + + return col_to_split_adjustments + + def collect_post_asof_split_adjustments(self, + post_adjustments, + requested_qtr_data, + sid, + sid_idx, + sid_estimates, + requested_split_adjusted_columns): + """ + Collect split adjustments that occur after the + split-adjusted-asof-date. Each adjustment needs to be applied to all + dates on which knowledge for the requested quarter was older than the + date of the adjustment. + + Parameters + ---------- + post_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values, indexes in `dates`, and timestamps for + adjustments that happened after the split-asof-date. + requested_qtr_data : pd.DataFrame + The requested quarter data for each calendar date per sid. + sid : int + The sid for which adjustments need to be collected. + sid_idx : int + The index of `sid` in the adjusted array. + sid_estimates : pd.DataFrame + The raw estimates data for this sid. + requested_split_adjusted_columns : list of str + The requested split adjusted columns. 
+ Returns + ------- + col_to_split_adjustments : dict[str -> dict[int -> list of Adjustment]] + The adjustments for this sid that occurred after the + split-asof-date. + """ + col_to_split_adjustments = {} + if len(post_adjustments[0]): + # Get an integer index + requested_qtr_timeline = requested_qtr_data[ + SHIFTED_NORMALIZED_QTRS + ][sid].reset_index() + requested_qtr_timeline = requested_qtr_timeline[ + requested_qtr_timeline[sid].notnull() + ] + + # Split the data into range by quarter and determine which quarter + # was being requested in each range. + # Split integer indexes up by quarter range + qtr_ranges_idxs = np.split( + requested_qtr_timeline.index, + np.where(np.diff(requested_qtr_timeline[sid]) != 0)[0] + 1 + ) + requested_quarters_per_range = [requested_qtr_timeline[sid][r[0]] + for r in qtr_ranges_idxs] + # Try to apply each adjustment to each quarter range. + for i, qtr_range in enumerate(qtr_ranges_idxs): + for adjustment, date_index, timestamp in zip( + *post_adjustments + ): + # In the default case, apply through the end of the quarter + upper_bound = qtr_range[-1] + # Find the smallest KD in estimates that is on or after the + # date of the given adjustment. Apply the given adjustment + # until that KD. + end_idx = self.determine_end_idx_for_adjustment( + timestamp, + requested_qtr_data.index, + upper_bound, + requested_quarters_per_range[i], + sid_estimates + ) + # In the default case, apply adjustment on the first day of + # the quarter. + start_idx = qtr_range[0] + # If the adjustment happens during this quarter, apply the + # adjustment on the day it happens. + if date_index > start_idx: + start_idx = date_index + # We only want to apply the adjustment if we have any stale + # data to apply it to.
+ if qtr_range[0] <= end_idx: + for column_name in requested_split_adjusted_columns: + if column_name not in col_to_split_adjustments: + col_to_split_adjustments[column_name] = {} + adj = Float64Multiply( + # Always apply from first day of qtr + qtr_range[0], + end_idx, + sid_idx, + sid_idx, + adjustment + ) + add_new_adjustments( + col_to_split_adjustments, + [adj], + column_name, + start_idx + ) + + return col_to_split_adjustments + + def retrieve_split_adjustment_data_for_sid(self, + dates, + sid, + split_adjusted_asof_idx): + """ + dates : pd.DatetimeIndex + The calendar dates. + sid : int + The sid for which we want to retrieve adjustments. + split_adjusted_asof_idx : int + The index in `dates` as-of which the data is split adjusted. + + Returns + ------- + pre_adjustments : tuple(list(float), list(int)) + The adjustment values and indexes in `dates` for + adjustments that happened on or before the split-asof-date. + post_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values, indexes in `dates`, and timestamps for + adjustments that happened after the split-asof-date. + """ + adjustments = self._split_adjustments.get_adjustments_for_sid( + 'splits', sid + ) + adjustments = sorted(adjustments, key=lambda adj: adj[0]) + # Get rid of any adjustments that happen outside of our date index. + adjustments = list(filter(lambda x: dates[0] <= x[0] <= dates[-1], + adjustments)) + adjustment_values = np.array([adj[1] for adj in adjustments]) + timestamps = pd.DatetimeIndex([adj[0] for adj in adjustments]) + # We need the first date on which we would have known about each + # adjustment.
+ date_indexes = dates.searchsorted(timestamps) + pre_adjustment_idxs = np.where( + date_indexes <= split_adjusted_asof_idx + )[0] + last_adjustment_split_asof_idx = -1 + if len(pre_adjustment_idxs): + last_adjustment_split_asof_idx = pre_adjustment_idxs.max() + pre_adjustments = ( + adjustment_values[:last_adjustment_split_asof_idx + 1], + date_indexes[:last_adjustment_split_asof_idx + 1] + ) + post_adjustments = ( + adjustment_values[last_adjustment_split_asof_idx + 1:], + date_indexes[last_adjustment_split_asof_idx + 1:], + timestamps[last_adjustment_split_asof_idx + 1:] + ) + return pre_adjustments, post_adjustments + + def _collect_adjustments(self, + requested_qtr_data, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns): + + pre_adjustments_dict = self.collect_pre_split_asof_date_adjustments( + split_adjusted_asof_idx, + sid_idx, + pre_adjustments, + requested_split_adjusted_columns + ) + + post_adjustments_dict = self.collect_post_asof_split_adjustments( + post_adjustments, + requested_qtr_data, + sid, + sid_idx, + sid_estimates, + requested_split_adjusted_columns + ) + return pre_adjustments_dict, post_adjustments_dict + + def merge_split_adjustments_with_overwrites( + self, + pre, + post, + overwrites, + requested_split_adjusted_columns + ): + """ + Merge split adjustments with the dict containing overwrites. + + Parameters + ---------- + pre : dict[str -> dict[int -> list]] + The adjustments that occur before the split-adjusted-asof-date. + post : dict[str -> dict[int -> list]] + The adjustments that occur after the split-adjusted-asof-date. + overwrites : dict[str -> dict[int -> list]] + The overwrites across all time. Adjustments will be merged into + this dictionary. + requested_split_adjusted_columns : list of str + List of names of split adjusted columns that are being requested. 
+ """ + for column_name in requested_split_adjusted_columns: + # We can do a merge here because the timestamps in 'pre' and + # 'post' are guaranteed to not overlap. + if pre: + # Either empty or contains all columns. + for ts in pre[column_name]: + add_new_adjustments( + overwrites, + pre[column_name][ts], + column_name, + ts + ) + if post: + # Either empty or contains all columns. + for ts in post[column_name]: + add_new_adjustments( + overwrites, + post[column_name][ts], + column_name, + ts + ) + + +class PreviousSplitAdjustedEarningsEstimatesLoader( + SplitAdjustedEstimatesLoader, PreviousEarningsEstimatesLoader +): + def collect_split_adjustments(self, + adjustments_for_sid, + requested_qtr_data, + dates, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns): + """ + Collect split adjustments for previous quarters and apply them to the + given dictionary of splits for the given sid. Since overwrites just + replace all estimates before the new quarter with NaN, we don't need to + worry about re-applying split adjustments. + + Parameters + ---------- + adjustments_for_sid : dict[str -> dict[int -> list]] + The dictionary of adjustments to which splits need to be added. + Initially it contains only overwrites. + requested_qtr_data : pd.DataFrame + The requested quarter data for each calendar date per sid. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + sid : int + The sid for which adjustments need to be collected. + sid_idx : int + The index of `sid` in the adjusted array. + sid_estimates : pd.DataFrame + The raw estimates data for the given sid. + split_adjusted_asof_idx : int + The index in `dates` as-of which the data is split adjusted. + pre_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values and indexes in `dates` for + adjustments that happened before the split-asof-date. 
+ post_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values, indexes in `dates`, and timestamps for + adjustments that happened after the split-asof-date. + requested_split_adjusted_columns : list of str + List of requested split adjusted column names. + """ + (pre_adjustments_dict, + post_adjustments_dict) = self._collect_adjustments( + requested_qtr_data, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns + ) + self.merge_split_adjustments_with_overwrites( + pre_adjustments_dict, + post_adjustments_dict, + adjustments_for_sid, + requested_split_adjusted_columns + ) + + +class NextSplitAdjustedEarningsEstimatesLoader( + SplitAdjustedEstimatesLoader, NextEarningsEstimatesLoader +): + def collect_split_adjustments(self, + adjustments_for_sid, + requested_qtr_data, + dates, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns): + """ + Collect split adjustments for future quarters. Re-apply adjustments + that would be overwritten by overwrites. Merge split adjustments with + overwrites into the given dictionary of splits for the given sid. + + Parameters + ---------- + adjustments_for_sid : dict[str -> dict[int -> list]] + The dictionary of adjustments to which splits need to be added. + Initially it contains only overwrites. + requested_qtr_data : pd.DataFrame + The requested quarter data for each calendar date per sid. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + sid : int + The sid for which adjustments need to be collected. + sid_idx : int + The index of `sid` in the adjusted array. + sid_estimates : pd.DataFrame + The raw estimates data for the given sid. + split_adjusted_asof_idx : int + The index in `dates` as-of which the data is split adjusted. 
+ pre_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values and indexes in `dates` for + adjustments that happened before the split-asof-date. + post_adjustments : tuple(list(float), list(int), pd.DatetimeIndex) + The adjustment values, indexes in `dates`, and timestamps for + adjustments that happened after the split-asof-date. + requested_split_adjusted_columns : list of str + List of requested split adjusted column names. + """ + (pre_adjustments_dict, + post_adjustments_dict) = self._collect_adjustments( + requested_qtr_data, + sid, + sid_idx, + sid_estimates, + split_adjusted_asof_idx, + pre_adjustments, + post_adjustments, + requested_split_adjusted_columns, + ) + for column_name in requested_split_adjusted_columns: + for overwrite_ts in adjustments_for_sid[column_name]: + # We need to cumulatively re-apply all adjustments up to the + # split-adjusted-asof-date. We might not have any + # pre-adjustments, so we should check for that. + if overwrite_ts <= split_adjusted_asof_idx \ + and pre_adjustments_dict: + for split_ts in pre_adjustments_dict[column_name]: + # The split has to have occurred during the span of + # the overwrite. + if split_ts < overwrite_ts: + # Create new adjustments here so that we can + # re-apply all applicable adjustments to ONLY + # the dates being overwritten. + adjustments_for_sid[ + column_name + ][overwrite_ts].extend([ + Float64Multiply( + 0, + overwrite_ts - 1, + sid_idx, + sid_idx, + adjustment.value + ) + for adjustment + in pre_adjustments_dict[ + column_name + ][split_ts] + ]) + # After the split-adjusted-asof-date, we need to re-apply all + # adjustments that occur after that date and within the + # bounds of the overwrite. They need to be applied starting + # from the first date and until an end date. 
The end date is + # the date of the newest information we get about + # `requested_quarter` that is >= `split_ts`, or if there is no + # new knowledge before `overwrite_ts`, then it is the date + # before `overwrite_ts`. + else: + # Overwrites happen at the first index of a new quarter, + # so determine here which quarter that is. + requested_quarter = requested_qtr_data[ + SHIFTED_NORMALIZED_QTRS, sid + ].iloc[overwrite_ts] + + for adjustment_value, date_index, timestamp in zip( + *post_adjustments + ): + if split_adjusted_asof_idx < date_index < overwrite_ts: + # Assume the entire overwrite contains stale data + upper_bound = overwrite_ts - 1 + end_idx = self.determine_end_idx_for_adjustment( + timestamp, + dates, + upper_bound, + requested_quarter, + sid_estimates + ) + adjustments_for_sid[ + column_name + ][overwrite_ts].append( + Float64Multiply( + 0, + end_idx, + sid_idx, + sid_idx, + adjustment_value + ) + ) + + self.merge_split_adjustments_with_overwrites( + pre_adjustments_dict, + post_adjustments_dict, + adjustments_for_sid, + requested_split_adjusted_columns + )