diff --git a/tests/pipeline/test_adjusted_array.py b/tests/pipeline/test_adjusted_array.py index cea6f090..730af952 100644 --- a/tests/pipeline/test_adjusted_array.py +++ b/tests/pipeline/test_adjusted_array.py @@ -20,8 +20,10 @@ from toolz import curry from zipline.errors import WindowLengthNotPositive, WindowLengthTooLong from zipline.lib.adjustment import ( Datetime64Overwrite, + Datetime641DArrayOverwrite, Float64Multiply, Float64Overwrite, + Float641DArrayOverwrite, ObjectOverwrite, ) from zipline.lib.adjusted_array import AdjustedArray, NOMASK @@ -200,11 +202,7 @@ def _gen_multiplicative_adjustment_cases(dtype): ) -def _gen_overwrite_adjustment_cases(name, - make_input, - make_expected_output, - dtype, - missing_value): +def _gen_overwrite_adjustment_cases(dtype): """ Generate test cases for overwrite adjustments. @@ -224,6 +222,8 @@ def _gen_overwrite_adjustment_cases(name, unicode_dtype: ObjectOverwrite, object_dtype: ObjectOverwrite, }[dtype] + make_expected_dtype = as_dtype(dtype) + missing_value = default_missing_value_for_dtype(datetime64ns_dtype) if dtype == object_dtype: # When we're testing object dtypes, we expect to have strings, but @@ -235,30 +235,30 @@ def _gen_overwrite_adjustment_cases(name, adjustments = {} buffer_as_of = [None] * 6 - baseline = make_input([[2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + baseline = make_expected_dtype([[2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) - buffer_as_of[0] = make_expected_output([[2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[0] = make_expected_dtype([[2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) # Note that row indices are inclusive! adjustments[1] = [ adjustment_type(0, 0, 0, 0, make_overwrite_value(dtype, 1)), ] - buffer_as_of[1] = make_expected_output([[1, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[1] = make_expected_dtype([[1, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) # No adjustment at index 2. buffer_as_of[2] = buffer_as_of[1] @@ -267,33 +267,136 @@ def _gen_overwrite_adjustment_cases(name, adjustment_type(1, 2, 1, 1, make_overwrite_value(dtype, 3)), adjustment_type(0, 1, 0, 0, make_overwrite_value(dtype, 4)), ] - buffer_as_of[3] = make_expected_output([[4, 2, 2], - [4, 3, 2], - [2, 3, 2], - [2, 2, 2], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[3] = make_expected_dtype([[4, 2, 2], + [4, 3, 2], + [2, 3, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) adjustments[4] = [ adjustment_type(0, 3, 2, 2, make_overwrite_value(dtype, 5)) ] - buffer_as_of[4] = make_expected_output([[4, 2, 5], - [4, 3, 5], - [2, 3, 5], - [2, 2, 5], - [2, 2, 2], - [2, 2, 2]]) + buffer_as_of[4] = make_expected_dtype([[4, 2, 5], + [4, 3, 5], + [2, 3, 5], + [2, 2, 5], + [2, 2, 2], + [2, 2, 2]]) adjustments[5] = [ adjustment_type(0, 4, 1, 1, make_overwrite_value(dtype, 6)), adjustment_type(2, 2, 2, 2, make_overwrite_value(dtype, 7)), ] - buffer_as_of[5] = make_expected_output([[4, 6, 5], - [4, 6, 5], - [2, 6, 7], - [2, 6, 5], - [2, 6, 2], - [2, 2, 2]]) + buffer_as_of[5] = make_expected_dtype([[4, 6, 5], + [4, 6, 5], + [2, 6, 7], + [2, 6, 5], + [2, 6, 2], + [2, 2, 2]]) + + return _gen_expectations( + baseline, + missing_value, + adjustments, + buffer_as_of, + nrows=6, + ) + + +def _gen_overwrite_1d_array_adjustment_case(dtype): + """ + Generate test cases for overwrite adjustments. + + The algorithm used here is the same as the one used above for + multiplicative adjustments. The only difference is the semantics of how + the adjustments are expected to modify the arrays. + + This is parameterized on `make_input` and `make_expected_output` functions, + which take 1-D lists of values and transform them into desired input/output + arrays. We do this so that we can easily test both vanilla numpy ndarrays + and our own LabelArray class for strings. + """ + adjustment_type = { + float64_dtype: Float641DArrayOverwrite, + datetime64ns_dtype: Datetime641DArrayOverwrite, + }[dtype] + make_expected_dtype = as_dtype(dtype) + missing_value = default_missing_value_for_dtype(datetime64ns_dtype) + + adjustments = {} + buffer_as_of = [None] * 6 + baseline = make_expected_dtype([[2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) + + buffer_as_of[0] = make_expected_dtype([[2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) + + vals1 = [1] + # Note that row indices are inclusive! + adjustments[1] = [ + adjustment_type( + 0, 0, 0, 0, + array([coerce_to_dtype(dtype, val) for val in vals1]) + ) + ] + buffer_as_of[1] = make_expected_dtype([[1, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) + + # No adjustment at index 2. + buffer_as_of[2] = buffer_as_of[1] + + vals3 = [4, 4, 1] + adjustments[3] = [ + adjustment_type( + 0, 2, 0, 0, + array([coerce_to_dtype(dtype, val) for val in vals3]) + ) + ] + buffer_as_of[3] = make_expected_dtype([[4, 2, 2], + [4, 2, 2], + [1, 2, 2], + [2, 2, 2], + [2, 2, 2], + [2, 2, 2]]) + + vals4 = [5] * 4 + adjustments[4] = [ + adjustment_type( + 0, 3, 2, 2, + array([coerce_to_dtype(dtype, val) for val in vals4])) + ] + buffer_as_of[4] = make_expected_dtype([[4, 2, 5], + [4, 2, 5], + [1, 2, 5], + [2, 2, 5], + [2, 2, 2], + [2, 2, 2]]) + + vals5 = range(1, 6) + adjustments[5] = [ + adjustment_type( + 0, 4, 1, 1, + array([coerce_to_dtype(dtype, val) for val in vals5])), + ] + buffer_as_of[5] = make_expected_dtype([[4, 1, 5], + [4, 2, 5], + [1, 3, 5], + [2, 4, 5], + [2, 5, 2], + [2, 2, 2]]) return _gen_expectations( baseline, @@ -426,22 +529,10 @@ class AdjustedArrayTestCase(TestCase): @parameterized.expand( chain( - _gen_overwrite_adjustment_cases( - 'float', - make_input=as_dtype(float64_dtype), - make_expected_output=as_dtype(float64_dtype), - dtype=float64_dtype, - missing_value=default_missing_value_for_dtype(float64_dtype), - ), - _gen_overwrite_adjustment_cases( - 'datetime', - make_input=as_dtype(datetime64ns_dtype), - make_expected_output=as_dtype(datetime64ns_dtype), - dtype=datetime64ns_dtype, - missing_value=default_missing_value_for_dtype( - datetime64ns_dtype, - ), - ), + _gen_overwrite_adjustment_cases(float64_dtype), + _gen_overwrite_adjustment_cases(datetime64ns_dtype), + _gen_overwrite_1d_array_adjustment_case(float64_dtype), + _gen_overwrite_1d_array_adjustment_case(datetime64ns_dtype), # There are six cases here: # Using np.bytes/np.unicode/object arrays as inputs. # Passing np.bytes/np.unicode/object arrays to LabelArray, diff --git a/tests/pipeline/test_quarters_estimates.py b/tests/pipeline/test_quarters_estimates.py new file mode 100644 index 00000000..8f0cf777 --- /dev/null +++ b/tests/pipeline/test_quarters_estimates.py @@ -0,0 +1,1063 @@ +import blaze as bz +import itertools +from nose.tools import assert_true +from nose_parameterized import parameterized +import numpy as np +from numpy.testing import assert_array_equal +import pandas as pd +from toolz import merge + +from zipline.pipeline import SimplePipelineEngine, Pipeline, CustomFactor +from zipline.pipeline.common import ( + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) +from zipline.pipeline.data import DataSet +from zipline.pipeline.data import Column +from zipline.pipeline.loaders.blaze.estimates import ( + BlazeNextEstimatesLoader, + BlazePreviousEstimatesLoader +) +from zipline.pipeline.loaders.earnings_estimates import ( + INVALID_NUM_QTRS_MESSAGE, + NextEarningsEstimatesLoader, + normalize_quarters, + PreviousEarningsEstimatesLoader, + split_normalized_quarters, +) +from zipline.testing.fixtures import ( + WithAssetFinder, + WithTradingSessions, + ZiplineTestCase, +) +from zipline.testing.predicates import assert_equal, assert_raises_regex +from zipline.utils.numpy_utils import datetime64ns_dtype +from zipline.utils.numpy_utils import float64_dtype + + +class Estimates(DataSet): + event_date = Column(dtype=datetime64ns_dtype) + fiscal_quarter = Column(dtype=float64_dtype) + fiscal_year = Column(dtype=float64_dtype) + estimate = Column(dtype=float64_dtype) + + +def QuartersEstimates(announcements_out): + class QtrEstimates(Estimates): + num_announcements = announcements_out + name = Estimates + return QtrEstimates + + +def QuartersEstimatesNoNumQuartersAttr(num_qtr): + class QtrEstimates(Estimates): + name = Estimates + return QtrEstimates + + +class WithEstimates(WithTradingSessions, WithAssetFinder): + """ + ZiplineTestCase mixin providing cls.loader and cls.events as class + level fixtures. + + + Methods + ------- + make_loader(events, columns) -> PipelineLoader + Method which returns the loader to be used throughout tests. + + events : pd.DataFrame + The raw events to be used as input to the pipeline loader. + columns : dict[str -> str] + The dictionary mapping the names of BoundColumns to the + associated column name in the events DataFrame. + """ + + # Short window defined in order for test to run faster. + START_DATE = pd.Timestamp('2014-12-28') + END_DATE = pd.Timestamp('2015-02-04') + + @classmethod + def make_loader(cls, events, columns): + raise NotImplementedError('make_loader') + + @classmethod + def make_events(cls): + raise NotImplementedError('make_events') + + @classmethod + def get_sids(cls): + return cls.events[SID_FIELD_NAME].unique() + + @classmethod + def init_class_fixtures(cls): + cls.events = cls.make_events() + cls.ASSET_FINDER_EQUITY_SIDS = cls.get_sids() + cls.columns = { + Estimates.event_date: 'event_date', + Estimates.fiscal_quarter: 'fiscal_quarter', + Estimates.fiscal_year: 'fiscal_year', + Estimates.estimate: 'estimate' + } + cls.loader = cls.make_loader(cls.events, {column.name: val for + column, val in + cls.columns.items()}) + cls.ASSET_FINDER_EQUITY_SYMBOLS = [ + 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS + ] + super(WithEstimates, cls).init_class_fixtures() + + +class WithWrongLoaderDefinition(WithEstimates): + """ + ZiplineTestCase mixin providing cls.events as a class level fixture and + defining a test for all inheritors to use. + + Attributes + ---------- + events : pd.DataFrame + A simple DataFrame with columns needed for estimates and a single sid + and no other data. + + Tests + ------ + test_wrong_num_announcements_passed() + Tests that loading with an incorrect quarter number raises an error. + test_no_num_announcements_attr() + Tests that the loader throws an AssertionError if the dataset being + loaded has no `num_announcements` attribute. + """ + + @classmethod + def make_events(cls): + return pd.DataFrame({SID_FIELD_NAME: 0}, + columns=[SID_FIELD_NAME, + TS_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + 'estimate'], + index=[0]) + + def test_wrong_num_announcements_passed(self): + bad_dataset1 = QuartersEstimates(-1) + bad_dataset2 = QuartersEstimates(-2) + good_dataset = QuartersEstimates(1) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + columns = {c.name + str(dataset.num_announcements): c.latest + for dataset in (bad_dataset1, + bad_dataset2, + good_dataset) + for c in dataset.columns} + p = Pipeline(columns) + + with self.assertRaises(ValueError) as e: + engine.run_pipeline( + p, + start_date=self.trading_days[0], + end_date=self.trading_days[-1], + ) + assert_raises_regex(e, INVALID_NUM_QTRS_MESSAGE % "-1,-2") + + def test_no_num_announcements_attr(self): + dataset = QuartersEstimatesNoNumQuartersAttr(1) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + p = Pipeline({c.name: c.latest for c in dataset.columns}) + + with self.assertRaises(AttributeError): + engine.run_pipeline( + p, + start_date=self.trading_days[0], + end_date=self.trading_days[-1], + ) + + +class PreviousWithWrongNumQuarters(WithWrongLoaderDefinition, + ZiplineTestCase): + """ + Tests that previous quarter loader correctly breaks if an incorrect + number of quarters is passed. + """ + @classmethod + def make_loader(cls, events, columns): + return PreviousEarningsEstimatesLoader(events, columns) + + +class NextWithWrongNumQuarters(WithWrongLoaderDefinition, + ZiplineTestCase): + """ + Tests that next quarter loader correctly breaks if an incorrect + number of quarters is passed. + """ + @classmethod + def make_loader(cls, events, columns): + return NextEarningsEstimatesLoader(events, columns) + + +class WithEstimatesTimeZero(WithEstimates): + """ + ZiplineTestCase mixin providing cls.events as a class level fixture and + defining a test for all inheritors to use. + + Attributes + ---------- + cls.events : pd.DataFrame + Generated dynamically in order to test inter-leavings of estimates and + event dates for multiple quarters to make sure that we select the + right immediate 'next' or 'previous' quarter relative to each date - + i.e., the right 'time zero' on the timeline. We care about selecting + the right 'time zero' because we use that to calculate which quarter's + data needs to be returned for each day. + + Methods + ------- + get_expected_estimate(q1_knowledge, + q2_knowledge, + comparable_date) -> pd.DataFrame + Retrieves the expected estimate given the latest knowledge about each + quarter and the date on which the estimate is being requested. If + there is no expected estimate, returns an empty DataFrame. + + Tests + ------ + test_estimates() + Tests that we get the right 'time zero' value on each day for each + sid and for each column. + """ + # Shorter date range for performance + END_DATE = pd.Timestamp('2015-01-28') + + q1_knowledge_dates = [pd.Timestamp('2015-01-01'), + pd.Timestamp('2015-01-04'), + pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-11')] + q2_knowledge_dates = [pd.Timestamp('2015-01-14'), + pd.Timestamp('2015-01-17'), + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-23')] + # We want to model the possibility of an estimate predicting a release date + # that doesn't match the actual release. This could be done by dynamically + # generating more combinations with different release dates, but that + # significantly increases the amount of time it takes to run the tests. + # These hard-coded cases are sufficient to know that we can update our + # beliefs when we get new information. + q1_release_dates = [pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-14')] # One day late + q2_release_dates = [pd.Timestamp('2015-01-25'), # One day early + pd.Timestamp('2015-01-26')] + + @classmethod + def make_events(cls): + """ + In order to determine which estimate we care about for a particular + sid, we need to look at all estimates that we have for that sid and + their associated event dates. + + We define q1 < q2, and thus event1 < event2 since event1 occurs + during q1 and event2 occurs during q2 and we assume that there can + only be 1 event per quarter. We assume that there can be multiple + estimates per quarter leading up to the event. We assume that estimates + will not surpass the relevant event date. We will look at 2 estimates + for an event before the event occurs, since that is the simplest + scenario that covers the interesting edge cases: + - estimate values changing + - a release date changing + - estimates for different quarters interleaving + + Thus, we generate all possible inter-leavings of 2 estimates per + quarter-event where estimate1 < estimate2 and all estimates are < the + relevant event and assign each of these inter-leavings to a + different sid. + """ + + sid_estimates = [] + sid_releases = [] + # We want all permutations of 2 knowledge dates per quarter. + it = enumerate( + itertools.permutations(cls.q1_knowledge_dates + + cls.q2_knowledge_dates, + 4) + ) + for sid, (q1e1, q1e2, q2e1, q2e2) in it: + # We're assuming that estimates must come before the relevant + # release. + if (q1e1 < q1e2 and + q2e1 < q2e2 and + # All estimates are < Q2's event, so just constrain Q1 + # estimates. + q1e1 < cls.q1_release_dates[0] and + q1e2 < cls.q1_release_dates[0]): + sid_estimates.append(cls.create_estimates_df(q1e1, + q1e2, + q2e1, + q2e2, + sid)) + sid_releases.append(cls.create_releases_df(sid)) + return pd.concat(sid_estimates + + sid_releases).reset_index(drop=True) + + @classmethod + def get_sids(cls): + sids = cls.events[SID_FIELD_NAME].unique() + # Tack on an extra sid to make sure that sids with no data are + # included but have all-null columns. + return list(sids) + [max(sids) + 1] + + @classmethod + def create_releases_df(cls, sid): + # Final release dates never change. The quarters have very tight date + # ranges in order to reduce the number of dates we need to iterate + # through when testing. + return pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-26')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-26')], + 'estimate': [0.5, 0.8], + FISCAL_QUARTER_FIELD_NAME: [1.0, 2.0], + FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0], + SID_FIELD_NAME: sid + }) + + @classmethod + def create_estimates_df(cls, + q1e1, + q1e2, + q2e1, + q2e2, + sid): + return pd.DataFrame({ + EVENT_DATE_FIELD_NAME: cls.q1_release_dates + cls.q2_release_dates, + 'estimate': [.1, .2, .3, .4], + FISCAL_QUARTER_FIELD_NAME: [1.0, 1.0, 2.0, 2.0], + FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0, 2015.0, 2015.0], + TS_FIELD_NAME: [q1e1, q1e2, q2e1, q2e2], + SID_FIELD_NAME: sid, + }) + + @classmethod + def init_class_fixtures(cls): + super(WithEstimatesTimeZero, cls).init_class_fixtures() + + def get_expected_estimate(self, + q1_knowledge, + q2_knowledge, + comparable_date): + return pd.DataFrame() + + def test_estimates(self): + dataset = QuartersEstimates(1) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + results = engine.run_pipeline( + Pipeline({c.name: c.latest for c in dataset.columns}), + start_date=self.trading_days[1], + end_date=self.trading_days[-2], + ) + for sid in self.ASSET_FINDER_EQUITY_SIDS: + sid_estimates = results.xs(sid, level=1) + # Separate assertion for all-null DataFrame to avoid setting + # column dtypes on `all_expected`. + if sid == max(self.ASSET_FINDER_EQUITY_SIDS): + assert_true(sid_estimates.isnull().all().all()) + else: + ts_sorted_estimates = self.events[ + self.events[SID_FIELD_NAME] == sid + ].sort(TS_FIELD_NAME) + q1_knowledge = ts_sorted_estimates[ + ts_sorted_estimates[FISCAL_QUARTER_FIELD_NAME] == 1 + ] + q2_knowledge = ts_sorted_estimates[ + ts_sorted_estimates[FISCAL_QUARTER_FIELD_NAME] == 2 + ] + all_expected = pd.concat( + [self.get_expected_estimate( + q1_knowledge[q1_knowledge[TS_FIELD_NAME] <= + date.tz_localize(None)], + q2_knowledge[q2_knowledge[TS_FIELD_NAME] <= + date.tz_localize(None)], + date.tz_localize(None), + ).set_index([[date]]) for date in sid_estimates.index], + axis=0) + assert_equal(all_expected[sid_estimates.columns], + sid_estimates) + + +class NextEstimate(WithEstimatesTimeZero, ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return NextEarningsEstimatesLoader(events, columns) + + def get_expected_estimate(self, + q1_knowledge, + q2_knowledge, + comparable_date): + # If our latest knowledge of q1 is that the release is + # happening on this simulation date or later, then that's + # the estimate we want to use. + if (not q1_knowledge.empty and + q1_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] >= + comparable_date): + return q1_knowledge.iloc[-1:] + # If q1 has already happened or we don't know about it + # yet and our latest knowledge indicates that q2 hasn't + # happened yet, then that's the estimate we want to use. + elif (not q2_knowledge.empty and + q2_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] >= + comparable_date): + return q2_knowledge.iloc[-1:] + return pd.DataFrame(columns=q1_knowledge.columns, + index=[comparable_date]) + + +class BlazeNextEstimateLoaderTestCase(NextEstimate): + """ + Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader. + """ + + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader( + bz.data(events), + columns, + ) + + +class PreviousEstimate(WithEstimatesTimeZero, ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return PreviousEarningsEstimatesLoader(events, columns) + + def get_expected_estimate(self, + q1_knowledge, + q2_knowledge, + comparable_date): + + # The expected estimate will be for q2 if the last thing + # we've seen is that the release date already happened. + # Otherwise, it'll be for q1, as long as the release date + # for q1 has already happened. + if (not q2_knowledge.empty and + q2_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] <= + comparable_date): + return q2_knowledge.iloc[-1:] + elif (not q1_knowledge.empty and + q1_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] <= + comparable_date): + return q1_knowledge.iloc[-1:] + return pd.DataFrame(columns=q1_knowledge.columns, + index=[comparable_date]) + + +class BlazePreviousEstimateLoaderTestCase(PreviousEstimate): + """ + Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader. + """ + + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader( + bz.data(events), + columns, + ) + + +class WithEstimateMultipleQuarters(WithEstimates): + """ + ZiplineTestCase mixin providing cls.events, cls.make_expected_out as + class-level fixtures and self.test_multiple_qtrs_requested as a test. + + Attributes + ---------- + events : pd.DataFrame + Simple DataFrame with estimates for 2 quarters for a single sid. + + Methods + ------- + make_expected_out() --> pd.DataFrame + Returns the DataFrame that is expected as a result of running a + Pipeline where estimates are requested for multiple quarters out. + fill_expected_out(expected) + Fills the expected DataFrame with data. + + Tests + ------ + test_multiple_qtrs_requested() + Runs a Pipeline that calculate which estimates for multiple quarters + out and checks that the returned columns contain data for the correct + number of quarters out. + """ + + @classmethod + def make_events(cls): + return pd.DataFrame({ + SID_FIELD_NAME: [0] * 2, + TS_FIELD_NAME: [pd.Timestamp('2015-01-01'), + pd.Timestamp('2015-01-06')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-10'), + pd.Timestamp('2015-01-20')], + 'estimate': [1., 2.], + FISCAL_QUARTER_FIELD_NAME: [1, 2], + FISCAL_YEAR_FIELD_NAME: [2015, 2015] + }) + + @classmethod + def init_class_fixtures(cls): + super(WithEstimateMultipleQuarters, cls).init_class_fixtures() + cls.expected_out = cls.make_expected_out() + + @classmethod + def make_expected_out(cls): + expected = pd.DataFrame(columns=[cls.columns[col] + '1' + for col in cls.columns] + + [cls.columns[col] + '2' + for col in cls.columns], + index=cls.trading_days) + + for (col, raw_name), suffix in itertools.product( + cls.columns.items(), ('1', '2') + ): + expected_name = raw_name + suffix + if col.dtype == datetime64ns_dtype: + expected[expected_name] = pd.to_datetime( + expected[expected_name] + ) + else: + expected[expected_name] = expected[ + expected_name + ].astype(col.dtype) + cls.fill_expected_out(expected) + return expected.reindex(cls.trading_days) + + def test_multiple_qtrs_requested(self): + dataset1 = QuartersEstimates(1) + dataset2 = QuartersEstimates(2) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + + results = engine.run_pipeline( + Pipeline( + merge([{c.name + '1': c.latest for c in dataset1.columns}, + {c.name + '2': c.latest for c in dataset2.columns}]) + ), + start_date=self.trading_days[0], + end_date=self.trading_days[-1], + ) + q1_columns = [col.name + '1' for col in self.columns] + q2_columns = [col.name + '2' for col in self.columns] + + # We now expect a column for 1 quarter out and a column for 2 + # quarters out for each of the dataset columns. + assert_equal(sorted(np.array(q1_columns + q2_columns)), + sorted(results.columns.values)) + assert_equal(self.expected_out.sort(axis=1), + results.xs(0, level=1).sort(axis=1)) + + +class NextEstimateMultipleQuarters( + WithEstimateMultipleQuarters, ZiplineTestCase +): + @classmethod + def make_loader(cls, events, columns): + return NextEarningsEstimatesLoader(events, columns) + + @classmethod + def fill_expected_out(cls, expected): + # Fill columns for 1 Q out + for raw_name in cls.columns.values(): + expected.loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-11'), + raw_name + '1' + ] = cls.events[raw_name].iloc[0] + expected.loc[ + pd.Timestamp('2015-01-11'):pd.Timestamp('2015-01-20'), + raw_name + '1' + ] = cls.events[raw_name].iloc[1] + + # Fill columns for 2 Q out + # We only have an estimate and event date for 2 quarters out before + # Q1's event happens; after Q1's event, we know 1 Q out but not 2 Qs + # out. + for col_name in ['estimate', 'event_date']: + expected.loc[ + pd.Timestamp('2015-01-06'):pd.Timestamp('2015-01-10'), + col_name + '2' + ] = cls.events[col_name].iloc[1] + # But we know what FQ and FY we'd need in both Q1 and Q2 + # because we know which FQ is next and can calculate from there + expected.loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-09'), + FISCAL_QUARTER_FIELD_NAME + '2' + ] = 2 + expected.loc[ + pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20'), + FISCAL_QUARTER_FIELD_NAME + '2' + ] = 3 + expected.loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-20'), + FISCAL_YEAR_FIELD_NAME + '2' + ] = 2015 + + return expected + + +class PreviousEstimateMultipleQuarters( + WithEstimateMultipleQuarters, + ZiplineTestCase +): + + @classmethod + def make_loader(cls, events, columns): + return PreviousEarningsEstimatesLoader(events, columns) + + @classmethod + def fill_expected_out(cls, expected): + # Fill columns for 1 Q out + for raw_name in cls.columns.values(): + expected[raw_name + '1'].loc[ + pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-19') + ] = cls.events[raw_name].iloc[0] + expected[raw_name + '1'].loc[ + pd.Timestamp('2015-01-20'): + ] = cls.events[raw_name].iloc[1] + + # Fill columns for 2 Q out + for col_name in ['estimate', 'event_date']: + expected[col_name + '2'].loc[ + pd.Timestamp('2015-01-20'): + ] = cls.events[col_name].iloc[0] + expected[ + FISCAL_QUARTER_FIELD_NAME + '2' + ].loc[pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20')] = 4 + expected[ + FISCAL_YEAR_FIELD_NAME + '2' + ].loc[pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20')] = 2014 + expected[ + FISCAL_QUARTER_FIELD_NAME + '2' + ].loc[pd.Timestamp('2015-01-20'):] = 1 + expected[ + FISCAL_YEAR_FIELD_NAME + '2' + ].loc[pd.Timestamp('2015-01-20'):] = 2015 + return expected + + +class WithVaryingNumEstimates(WithEstimates): + """ + ZiplineTestCase mixin providing fixtures and a test to ensure that we + have the correct overwrites when the event date changes. We want to make + sure that if we have a quarter with an event date that gets pushed back, + we don't start overwriting for the next quarter early. Likewise, + if we have a quarter with an event date that gets pushed forward, we want + to make sure that we start applying adjustments at the appropriate, earlier + date, rather than the later date. + + Methods + ------- + assert_compute() + Defines how to determine that results computed for the `SomeFactor` + factor are correct. + + Tests + ----- + test_windows_with_varying_num_estimates() + Tests that we create the correct overwrites from 2015-01-13 to + 2015-01-14 regardless of how event dates were updated for each + quarter for each sid. + """ + + @classmethod + def make_events(cls): + return pd.DataFrame({ + SID_FIELD_NAME: [0] * 3 + [1] * 3, + TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-01-13')] * 2, + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-01-20')], + 'estimate': [11., 12., 21.] * 2, + FISCAL_QUARTER_FIELD_NAME: [1, 1, 2] * 2, + FISCAL_YEAR_FIELD_NAME: [2015] * 6 + }) + + @classmethod + def assert_compute(cls, estimate, today): + raise NotImplementedError('assert_compute') + + def test_windows_with_varying_num_estimates(self): + dataset = QuartersEstimates(1) + assert_compute = self.assert_compute + + class SomeFactor(CustomFactor): + inputs = [dataset.estimate] + window_length = 3 + + def compute(self, today, assets, out, estimate): + assert_compute(estimate, today) + + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + engine.run_pipeline( + Pipeline({'est': SomeFactor()}), + start_date=pd.Timestamp('2015-01-13', tz='utc'), + # last event date we have + end_date=pd.Timestamp('2015-01-14', tz='utc'), + ) + + +class PreviousVaryingNumEstimates( + WithVaryingNumEstimates, + ZiplineTestCase +): + def assert_compute(self, estimate, today): + if today == pd.Timestamp('2015-01-13', tz='utc'): + assert_array_equal(estimate[:, 0], + np.array([np.NaN, np.NaN, 12])) + assert_array_equal(estimate[:, 1], + np.array([np.NaN, 12, 12])) + else: + assert_array_equal(estimate[:, 0], + np.array([np.NaN, 12, 12])) + assert_array_equal(estimate[:, 1], + np.array([12, 12, 12])) + + @classmethod + def make_loader(cls, events, columns): + return PreviousEarningsEstimatesLoader(events, columns) + + +class NextVaryingNumEstimates( + WithVaryingNumEstimates, + ZiplineTestCase +): + + def assert_compute(self, estimate, today): + if today == pd.Timestamp('2015-01-13', tz='utc'): + assert_array_equal(estimate[:, 0], + np.array([11, 12, 12])) + assert_array_equal(estimate[:, 1], + np.array([np.NaN, np.NaN, 21])) + else: + assert_array_equal(estimate[:, 0], + np.array([np.NaN, 21, 21])) + assert_array_equal(estimate[:, 1], + np.array([np.NaN, 21, 21])) + + @classmethod + def make_loader(cls, events, columns): + return NextEarningsEstimatesLoader(events, columns) + + +class WithEstimateWindows(WithEstimates): + """ + ZiplineTestCase mixin providing fixures and a test to test running a + Pipeline with an estimates loader over differently-sized windows. + + Attributes + ---------- + events : pd.DataFrame + DataFrame with estimates for 2 quarters for 2 sids. + window_test_start_date : pd.Timestamp + The date from which the window should start. + timelines : dict[int -> pd.DataFrame] + A dictionary mapping to the number of quarters out to + snapshots of how the data should look on each date in the date range. + + Methods + ------- + make_expected_timelines() -> dict[int -> pd.DataFrame] + Creates a dictionary of expected data. See `timelines`, above. + + Tests + ----- + test_estimate_windows_at_quarter_boundaries() + Tests that we overwrite values with the correct quarter's estimate at + the correct dates when we have a factor that asks for a window of data. + """ + + window_test_start_date = pd.Timestamp('2015-01-05') + critical_dates = [pd.Timestamp('2015-01-09', tz='utc'), + pd.Timestamp('2015-01-12', tz='utc'), + pd.Timestamp('2015-01-15', tz='utc'), + pd.Timestamp('2015-01-20', tz='utc')] + # window length, starting date, num quarters out, timeline. Parameterizes + # over number of quarters out. + window_test_cases = list(itertools.product(critical_dates, (1, 2))) + + @classmethod + def make_events(cls): + sid_0_timeline = pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-05'), + pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-05'), + pd.Timestamp('2015-01-17')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-10'), + pd.Timestamp('2015-01-10'), + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-20')], + 'estimate': [10., 11.] + [20., 21.], + FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [2] * 2, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 0, + }) + + sid_1_timeline = pd.DataFrame({ + TS_FIELD_NAME: [pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-01-09'), + pd.Timestamp('2015-01-15')], + EVENT_DATE_FIELD_NAME: + [pd.Timestamp('2015-01-12'), pd.Timestamp('2015-01-12'), + pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-15')], + 'estimate': [10., 11.] + [30., 31.], + FISCAL_QUARTER_FIELD_NAME: [1] * 2 + [3] * 2, + FISCAL_YEAR_FIELD_NAME: 2015, + SID_FIELD_NAME: 1 + }) + return pd.concat([sid_0_timeline, sid_1_timeline]) + + @classmethod + def make_expected_timelines(cls): + return {} + + @classmethod + def init_class_fixtures(cls): + super(WithEstimateWindows, cls).init_class_fixtures() + cls.timelines = cls.make_expected_timelines() + + @classmethod + def create_expected_df(cls, tuples, end_date): + """ + Given a list of tuples of new data we get for each sid on each critical + date (when information changes), create a DataFrame that fills that + data through a date range ending at `end_date`. + """ + df = pd.DataFrame(tuples, + columns=[SID_FIELD_NAME, + 'estimate', + 'knowledge_date']) + df = df.pivot_table(columns=SID_FIELD_NAME, + values='estimate', + index='knowledge_date') + df = df.reindex( + pd.date_range(cls.window_test_start_date, end_date) + ) + # Index name is lost during reindex. + df.index = df.index.rename('knowledge_date') + df['at_date'] = end_date.tz_localize('utc') + df = df.set_index(['at_date', df.index.tz_localize('utc')]).ffill() + return df + + @parameterized.expand(window_test_cases) + def test_estimate_windows_at_quarter_boundaries(self, + start_idx, + num_announcements_out): + dataset = QuartersEstimates(num_announcements_out) + trading_days = self.trading_days + timelines = self.timelines + # The window length should be from the starting index back to the first + # date on which we got data. The goal is to ensure that as we + # progress through the timeline, all data we got, starting from that + # first date, is correctly overwritten. + window_len = ( + self.trading_days.get_loc(start_idx) - + self.trading_days.get_loc(self.window_test_start_date) + 1 + ) + + class SomeFactor(CustomFactor): + inputs = [dataset.estimate] + window_length = window_len + + def compute(self, today, assets, out, estimate): + today_idx = trading_days.get_loc(today) + today_timeline = timelines[ + num_announcements_out + ].loc[today].reindex( + trading_days[:today_idx + 1] + ).values + timeline_start_idx = (len(today_timeline) - window_len) + assert_equal(estimate, + today_timeline[timeline_start_idx:]) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + engine.run_pipeline( + Pipeline({'est': SomeFactor()}), + start_date=start_idx, + # last event date we have + end_date=pd.Timestamp('2015-01-20', tz='utc'), + ) + + +class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return PreviousEarningsEstimatesLoader(events, columns) + + @classmethod + def make_expected_timelines(cls): + oneq_previous = pd.concat([ + cls.create_expected_df( + [(0, np.NaN, cls.window_test_start_date), + (1, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-09') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 11, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-12') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 11, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-13') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 11, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-14') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 31, pd.Timestamp('2015-01-15'))], + pd.Timestamp('2015-01-15') + ), + cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-10')), + (1, 31, pd.Timestamp('2015-01-15'))], + pd.Timestamp('2015-01-16') + ), + cls.create_expected_df( + [(0, 21, pd.Timestamp('2015-01-17')), + (1, 31, pd.Timestamp('2015-01-15'))], + pd.Timestamp('2015-01-20') + ), + ]) + + twoq_previous = pd.concat( + [cls.create_expected_df( + [(0, np.NaN, cls.window_test_start_date), + (1, np.NaN, cls.window_test_start_date)], + end_date + ) for end_date in pd.date_range('2015-01-09', '2015-01-19')] + + [cls.create_expected_df( + [(0, 11, pd.Timestamp('2015-01-20')), + (1, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-20') + )] + ) + return { + 1: oneq_previous, + 2: twoq_previous + } + + +class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): + @classmethod + def make_loader(cls, events, columns): + return NextEarningsEstimatesLoader(events, columns) + + @classmethod + def make_expected_timelines(cls): + oneq_next = pd.concat([ + cls.create_expected_df( + [(0, 10, cls.window_test_start_date), + (0, 11, pd.Timestamp('2015-01-07')), + (1, 10, pd.Timestamp('2015-01-09'))], + pd.Timestamp('2015-01-09') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, 10, pd.Timestamp('2015-01-09')), + (1, 11, pd.Timestamp('2015-01-12'))], + pd.Timestamp('2015-01-12') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, 30, pd.Timestamp('2015-01-09'))], + pd.Timestamp('2015-01-13') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, 30, pd.Timestamp('2015-01-09'))], + pd.Timestamp('2015-01-14') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, 30, pd.Timestamp('2015-01-09')), + (1, 31, pd.Timestamp('2015-01-15'))], + pd.Timestamp('2015-01-15') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (1, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-16') + ), + cls.create_expected_df( + [(0, 20, cls.window_test_start_date), + (0, 21, pd.Timestamp('2015-01-17')), + (1, np.NaN, cls.window_test_start_date)], + pd.Timestamp('2015-01-20') + ), + ]) + + twoq_next = pd.concat( + [cls.create_expected_df( + [(0, 20, pd.Timestamp(cls.window_test_start_date)), + (1, np.NaN, pd.Timestamp(cls.window_test_start_date))], + pd.Timestamp('2015-01-09') + )] + + [cls.create_expected_df( + [(0, np.NaN, pd.Timestamp(cls.window_test_start_date)), + (1, np.NaN, pd.Timestamp(cls.window_test_start_date))], + end_date + ) for end_date in pd.date_range('2015-01-12', '2015-01-20')] + ) + + return { + 1: oneq_next, + 2: twoq_next + } + + +class QuarterShiftTestCase(ZiplineTestCase): + """ + This tests, in isolation, quarter calculation logic for shifting quarters + backwards/forwards from a starting point. + """ + def test_quarter_normalization(self): + input_yrs = pd.Series(range(2011, 2015), dtype=np.int64) + input_qtrs = pd.Series(range(1, 5), dtype=np.int64) + result_years, result_quarters = split_normalized_quarters( + normalize_quarters(input_yrs, input_qtrs) + ) + # Can't use assert_series_equal here with check_names=False + # because that still fails due to name differences. + assert_equal(input_yrs, result_years) + assert_equal(input_qtrs, result_quarters) diff --git a/zipline/lib/adjustment.pyx b/zipline/lib/adjustment.pyx index b0ba1873..aea8df5c 100644 --- a/zipline/lib/adjustment.pyx +++ b/zipline/lib/adjustment.pyx @@ -3,7 +3,7 @@ from cpython cimport Py_EQ from pandas import isnull, Timestamp from numpy cimport float64_t, uint8_t, int64_t -from numpy import datetime64, float64 +from numpy import asarray, datetime64, float64, int64 # Purely for readability. There aren't C-level declarations for these types. ctypedef object Int64Index_t ctypedef object DatetimeIndex_t @@ -364,6 +364,150 @@ cdef class Float64Overwrite(Float64Adjustment): data[row, col] = value +cdef class ArrayAdjustment(Adjustment): + """ + Base class for ArrayAdjustments. + + Subclasses should inherit and provide a `values` attribute and a `mutate` + method. + """ + def __repr__(self): + return ( + "%s(first_row=%d, last_row=%d," + " first_col=%d, last_col=%d, values=%s)" % ( + type(self).__name__, + self.first_row, + self.last_row, + self.first_col, + self.last_col, + asarray(self.values), + ) + ) + +cdef class Float641DArrayOverwrite(ArrayAdjustment): + """ + An adjustment that overwrites subarrays with a value for each subarray. + + Example + ------- + + >>> import numpy as np + >>> arr = np.arange(25, dtype=float).reshape(5, 5) + >>> arr + array([[ 0., 1., 2., 3., 4.], + [ 5., 6., 7., 8., 9.], + [ 10., 11., 12., 13., 14.], + [ 15., 16., 17., 18., 19.], + [ 20., 21., 22., 23., 24.]]) + >>> adj = Float641DArrayOverwrite( + ... row_start=0, + ... row_end=3, + ... column_start=0, + ... column_end=0, + ... values=np.array([1, 2, 3, 4]), + ) + >>> adj.mutate(arr) + >>> arr + array([[ 1., 1., 2., 3., 4.], + [ 2., 6., 7., 8., 9.], + [ 3., 11., 12., 13., 14.], + [ 4., 16., 17., 18., 19.], + [ 20., 21., 22., 23., 24.]]) + """ + cdef: + readonly float64_t[:] values + + def __init__(self, + int64_t first_row, + int64_t last_row, + int64_t first_col, + int64_t last_col, + float64_t[:] values): + super(Float641DArrayOverwrite, self).__init__( + first_row=first_row, + last_row=last_row, + first_col=first_col, + last_col=last_col, + ) + if last_row + 1 - first_row != len(values): + raise ValueError( + "Mismatch: got %d values for rows starting at index %d and " + "ending at index %d." % (len(values), first_row, last_row) + ) + self.values = values + + cpdef mutate(self, float64_t[:, :] data): + cdef Py_ssize_t fill_range, row, col + cdef float64_t[:] values = self.values + for col in range(self.first_col, self.last_col + 1): + for i, row in enumerate(range(self.first_row, self.last_row + 1)): + data[row, col] = values[i] + + +cdef class Datetime641DArrayOverwrite(ArrayAdjustment): + """ + An adjustment that overwrites subarrays with a value for each subarray. + + Example + ------- + + >>> import numpy as np; import pandas as pd + >>> dts = pd.date_range('2014', freq='D', periods=9, tz='UTC') + >>> arr = dts.values.reshape(3, 3) + >>> arr == np.datetime64(0, 'ns') + array([[False, False, False], + [False, False, False], + [False, False, False]], dtype=bool) + >>> adj = Datetime641DArrayOverwrite( + ... first_row=1, + ... last_row=2, + ... first_col=1, + ... last_col=2, + ... values=np.array([ + ... np.datetime64(0, 'ns'), + ... np.datetime64(1, 'ns') + ... ]) + ... ) + >>> adj.mutate(arr.view(np.int64)) + >>> arr == np.datetime64(0, 'ns') + array([[False, False, False], + [False, True, True], + [False, False, False]], dtype=bool) + >>> arr == np.datetime64(1, 'ns') + array([[False, False, False], + [False, False, False], + [False, True, True]], dtype=bool) + """ + cdef: + readonly int64_t[:] values + + def __init__(self, + int64_t first_row, + int64_t last_row, + int64_t first_col, + int64_t last_col, + object values): + super(Datetime641DArrayOverwrite, self).__init__( + first_row=first_row, + last_row=last_row, + first_col=first_col, + last_col=last_col, + ) + if last_row + 1 - first_row != len(values): + raise ValueError("Mismatch: got %d values for rows starting at" + " index %d and ending at index %d." % ( + len(values), first_row, last_row) + ) + self.values = asarray([datetime_to_int(value) for value in values]) + + cpdef mutate(self, int64_t[:, :] data): + cdef Py_ssize_t row, col + cdef int64_t[:] values = self.values + for col in range(self.first_col, self.last_col + 1): + for i, row in enumerate(range(self.first_row, self.last_row + 1)): + data[row, col] = values[i] + + cdef class Float64Add(Float64Adjustment): """ An adjustment that adds a float. @@ -458,7 +602,7 @@ cdef datetime_to_int(object datetimelike): datetimelike.dtype.name, ) - return datetimelike.astype(int) + return datetimelike.astype(int64) cdef class Datetime64Adjustment(_Int64Adjustment): diff --git a/zipline/pipeline/common.py b/zipline/pipeline/common.py index e64b1dc9..7b48e271 100644 --- a/zipline/pipeline/common.py +++ b/zipline/pipeline/common.py @@ -6,6 +6,8 @@ ANNOUNCEMENT_FIELD_NAME = 'announcement_date' CASH_FIELD_NAME = 'cash' DAYS_SINCE_PREV = 'days_since_prev' DAYS_TO_NEXT = 'days_to_next' +FISCAL_QUARTER_FIELD_NAME = 'fiscal_quarter' +FISCAL_YEAR_FIELD_NAME = 'fiscal_year' NEXT_ANNOUNCEMENT = 'next_announcement' PREVIOUS_AMOUNT = 'previous_amount' PREVIOUS_ANNOUNCEMENT = 'previous_announcement' diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 01e1e659..925faddf 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -175,8 +175,10 @@ from zipline.pipeline.common import ( from zipline.pipeline.data.dataset import DataSet, Column from zipline.pipeline.loaders.utils import ( check_data_query_args, + last_in_date_group, normalize_data_query_bounds, normalize_timestamp_to_query_time, + ffill_across_cols ) from zipline.pipeline.sentinels import NotSpecified from zipline.lib.adjusted_array import AdjustedArray, can_represent_dtype @@ -186,7 +188,7 @@ from zipline.utils.input_validation import ( ensure_timezone, optionally, ) -from zipline.utils.numpy_utils import bool_dtype, categorical_dtype +from zipline.utils.numpy_utils import bool_dtype from zipline.utils.pool import SequentialPool from zipline.utils.preprocess import preprocess @@ -869,9 +871,9 @@ def adjustments_from_deltas_with_sids(dense_dates, Parameters ---------- - dates : pd.DatetimeIndex - The dates requested by the loader. dense_dates : pd.DatetimeIndex + The dates requested by the loader. + sparse_dates : pd.DatetimeIndex The dates that were in the raw data. column_idx : int The index of the column in the dataset. @@ -1026,22 +1028,9 @@ class BlazeLoader(dict): return odo(e[predicate][colnames], pd.DataFrame, **odo_kwargs) - if checkpoints is not None: - ts = checkpoints[TS_FIELD_NAME] - checkpoints_ts = odo(ts[ts <= lower_dt].max(), pd.Timestamp) - if pd.isnull(checkpoints_ts): - materialized_checkpoints = pd.DataFrame(columns=colnames) - lower = None - else: - materialized_checkpoints = odo( - checkpoints[ts == checkpoints_ts][colnames], - pd.DataFrame, - **odo_kwargs - ) - lower = checkpoints_ts - else: - materialized_checkpoints = pd.DataFrame(columns=colnames) - lower = None + lower, materialized_checkpoints = get_materialized_checkpoints( + checkpoints, colnames, lower_dt, odo_kwargs + ) materialized_expr = self.pool.apply_async(collect_expr, (expr, lower)) materialized_deltas = ( @@ -1091,71 +1080,18 @@ class BlazeLoader(dict): ) sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True) - def last_in_date_group(df, reindex, have_sids=have_sids): - idx = dates[dates.searchsorted( - df[TS_FIELD_NAME].values.astype('datetime64[D]') - )] - if have_sids: - idx = [idx, SID_FIELD_NAME] - - last_in_group = df.drop(TS_FIELD_NAME, axis=1).groupby( - idx, - sort=False, - ).last() - - if have_sids: - last_in_group = last_in_group.unstack() - - if reindex: - if have_sids: - cols = last_in_group.columns - last_in_group = last_in_group.reindex( - index=dates, - columns=pd.MultiIndex.from_product( - (cols.levels[0], assets), - names=cols.names, - ), - ) - else: - last_in_group = last_in_group.reindex(dates) - - return last_in_group - - sparse_deltas = last_in_date_group(non_novel_deltas, reindex=False) - dense_output = last_in_date_group(sparse_output, reindex=True) - dense_output.ffill(inplace=True) - - # Fill in missing values specified by each column. This is made - # significantly more complex by the fact that we need to work around - # two pandas issues: - - # 1) When we have sids, if there are no records for a given sid for any - # dates, pandas will generate a column full of NaNs for that sid. - # This means that some of the columns in `dense_output` are now - # float instead of the intended dtype, so we have to coerce back to - # our expected type and convert NaNs into the desired missing value. - - # 2) DataFrame.ffill assumes that receiving None as a fill-value means - # that no value was passed. Consequently, there's no way to tell - # pandas to replace NaNs in an object column with None using fillna, - # so we have to roll our own instead using df.where. - for column in columns: - # Special logic for strings since `fillna` doesn't work if the - # missing value is `None`. - if column.dtype == categorical_dtype: - dense_output[column.name] = dense_output[ - column.name - ].where(pd.notnull(dense_output[column.name]), - column.missing_value) - else: - # We need to execute `fillna` before `astype` in case the - # column contains NaNs and needs to be cast to bool or int. - # This is so that the NaNs are replaced first, since pandas - # can't convert NaNs for those types. - dense_output[column.name] = dense_output[ - column.name - ].fillna(column.missing_value).astype(column.dtype) - + sparse_deltas = last_in_date_group(non_novel_deltas, + dates, + assets, + reindex=False, + have_sids=have_sids) + dense_output = last_in_date_group(sparse_output, + dates, + assets, + reindex=True, + have_sids=have_sids) + ffill_across_cols(dense_output, columns, {c.name: c.name + for c in columns}) if have_sids: adjustments_from_deltas = adjustments_from_deltas_with_sids column_view = identity @@ -1188,6 +1124,7 @@ class BlazeLoader(dict): for column_idx, column in enumerate(columns) } + global_loader = BlazeLoader.global_instance() @@ -1219,12 +1156,48 @@ def bind_expression_to_resources(expr, resources): }) +def get_materialized_checkpoints(checkpoints, colnames, lower_dt, odo_kwargs): + """ + Computes a lower bound and a DataFrame checkpoints. + + Parameters + ---------- + checkpoints : Expr + Bound blaze expression for a checkpoints table from which to get a + computed lower bound. + colnames : iterable of str + The names of the columns for which checkpoints should be computed. + lower_dt : pd.Timestamp + The lower date being queried for that serves as an upper bound for + checkpoints. + odo_kwargs : dict, optional + The extra keyword arguments to pass to ``odo``. + """ + if checkpoints is not None: + ts = checkpoints[TS_FIELD_NAME] + checkpoints_ts = odo(ts[ts <= lower_dt].max(), pd.Timestamp) + if pd.isnull(checkpoints_ts): + materialized_checkpoints = pd.DataFrame(columns=colnames) + lower = None + else: + materialized_checkpoints = odo( + checkpoints[ts == checkpoints_ts][colnames], + pd.DataFrame, + **odo_kwargs + ) + lower = checkpoints_ts + else: + materialized_checkpoints = pd.DataFrame(columns=colnames) + lower = None + return lower, materialized_checkpoints + + def ffill_query_in_range(expr, lower, upper, + checkpoints=None, odo_kwargs=None, - ts_field=TS_FIELD_NAME, - sid_field=SID_FIELD_NAME): + ts_field=TS_FIELD_NAME): """Query a blaze expression in a given time range properly forward filling from values that fall before the lower date. @@ -1236,12 +1209,13 @@ def ffill_query_in_range(expr, The lower date to query for. upper : datetime The upper date to query for. + checkpoints : Expr, optional + Bound blaze expression for a checkpoints table from which to get a + computed lower bound. odo_kwargs : dict, optional The extra keyword arguments to pass to ``odo``. ts_field : str, optional The name of the timestamp field in the given blaze expression. - sid_field : str, optional - The name of the sid field in the given blaze expression. Returns ------- @@ -1250,27 +1224,24 @@ def ffill_query_in_range(expr, start before the requested start date if a value is needed to ffill. """ odo_kwargs = odo_kwargs or {} - filtered = expr[expr[ts_field] <= lower] - computed_lower = odo( - bz.by( - filtered[sid_field], - timestamp=filtered[ts_field].max(), - ).timestamp.min(), - pd.Timestamp, - **odo_kwargs + computed_lower, materialized_checkpoints = get_materialized_checkpoints( + checkpoints, expr.fields, lower, odo_kwargs ) if pd.isnull(computed_lower): # If there is no lower date, just query for data in the date # range. It must all be null anyways. computed_lower = lower - raw = odo( - expr[ - (expr[ts_field] >= computed_lower) & - (expr[ts_field] <= upper) - ], - pd.DataFrame, - **odo_kwargs + raw = pd.concat( + [materialized_checkpoints, + odo( + expr[ + (expr[ts_field] >= computed_lower) & + (expr[ts_field] <= upper) + ], + pd.DataFrame, + **odo_kwargs + )] ) raw.loc[:, ts_field] = raw.loc[:, ts_field].astype('datetime64[ns]') return raw diff --git a/zipline/pipeline/loaders/blaze/estimates.py b/zipline/pipeline/loaders/blaze/estimates.py new file mode 100644 index 00000000..3cb1064a --- /dev/null +++ b/zipline/pipeline/loaders/blaze/estimates.py @@ -0,0 +1,137 @@ +from datashape import istabular + +from .core import ( + bind_expression_to_resources, +) +from zipline.pipeline.common import ( + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) +from zipline.pipeline.loaders.base import PipelineLoader +from zipline.pipeline.loaders.blaze.utils import load_raw_data +from zipline.pipeline.loaders.earnings_estimates import ( + NextEarningsEstimatesLoader, + PreviousEarningsEstimatesLoader, + required_estimates_fields, + metadata_columns, +) +from zipline.pipeline.loaders.utils import ( + check_data_query_args, +) +from zipline.utils.input_validation import ensure_timezone, optionally +from zipline.utils.preprocess import preprocess + + +class BlazeEstimatesLoader(PipelineLoader): + """An abstract pipeline loader for the estimates datasets that loads + data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + columns : dict[str -> str] + A dict mapping BoundColumn names to the associated names in `expr`. + resources : dict, optional + Mapping from the loadable terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezeone to use for the data query cutoff. + checkpoints : Expr, optional + The expression representing checkpointed data to be used for faster + forward-filling of data from `expr`. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + {FISCAL_YEAR_FIELD_NAME}: float64, + {FISCAL_QUARTER_FIELD_NAME}: float64, + {EVENT_DATE_FIELD_NAME}: datetime, + }} + + And other dataset-specific fields, where each row of the table is a + record including the sid to identify the company, the timestamp where we + learned about the announcement, and the date of the event. + + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. + """ + __doc__ = __doc__.format( + SID_FIELD_NAME=SID_FIELD_NAME, + TS_FIELD_NAME=TS_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME=FISCAL_YEAR_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME=FISCAL_QUARTER_FIELD_NAME, + EVENT_DATE_FIELD_NAME=EVENT_DATE_FIELD_NAME, + ) + + @preprocess(data_query_tz=optionally(ensure_timezone)) + def __init__(self, + expr, + columns, + resources=None, + odo_kwargs=None, + data_query_time=None, + data_query_tz=None, + checkpoints=None): + + dshape = expr.dshape + if not istabular(dshape): + raise ValueError( + 'expression dshape must be tabular, got: %s' % dshape, + ) + + required_cols = list( + required_estimates_fields(columns) + ) + self._expr = bind_expression_to_resources( + expr[required_cols], + resources, + ) + self._columns = columns + self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {} + check_data_query_args(data_query_time, data_query_tz) + self._data_query_time = data_query_time + self._data_query_tz = data_query_tz + self._checkpoints = checkpoints + + def load_adjusted_array(self, columns, dates, assets, mask): + # Only load requested columns. + requested_column_names = [self._columns[column.name] + for column in columns] + raw = load_raw_data( + assets, + dates, + self._data_query_time, + self._data_query_tz, + self._expr[sorted(metadata_columns.union(requested_column_names))], + self._odo_kwargs, + checkpoints=self._checkpoints, + ) + + return self.loader( + raw, + {column.name: self._columns[column.name] for column in columns} + ).load_adjusted_array( + columns, + dates, + assets, + mask, + ) + + +class BlazeNextEstimatesLoader(BlazeEstimatesLoader): + loader = NextEarningsEstimatesLoader + + +class BlazePreviousEstimatesLoader(BlazeEstimatesLoader): + loader = PreviousEarningsEstimatesLoader diff --git a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py index c71646cb..cc778d5c 100644 --- a/zipline/pipeline/loaders/blaze/events.py +++ b/zipline/pipeline/loaders/blaze/events.py @@ -2,21 +2,17 @@ from datashape import istabular from .core import ( bind_expression_to_resources, - ffill_query_in_range, ) +from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME, \ + EVENT_DATE_FIELD_NAME from zipline.pipeline.loaders.base import PipelineLoader +from zipline.pipeline.loaders.blaze.utils import load_raw_data from zipline.pipeline.loaders.events import ( EventsLoader, required_event_fields, ) -from zipline.pipeline.common import ( - SID_FIELD_NAME, - TS_FIELD_NAME, -) from zipline.pipeline.loaders.utils import ( check_data_query_args, - normalize_data_query_bounds, - normalize_timestamp_to_query_time, ) from zipline.utils.input_validation import ensure_timezone, optionally from zipline.utils.preprocess import preprocess @@ -30,6 +26,10 @@ class BlazeEventsLoader(PipelineLoader): ---------- expr : Expr The expression representing the data to load. + next_value_columns : dict[BoundColumn -> raw column name] + A dict mapping 'next' BoundColumns to their column names in `expr`. + previous_value_columns : dict[BoundColumn -> raw column name] + A dict mapping 'previous' BoundColumns to their column names in `expr`. resources : dict, optional Mapping from the loadable terms of ``expr`` to actual data resources. odo_kwargs : dict, optional @@ -37,9 +37,7 @@ class BlazeEventsLoader(PipelineLoader): data_query_time : time, optional The time to use for the data query cutoff. data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. - dataset : DataSet - The DataSet object for which this loader loads data. + The timezone to use for the data query cutoff. Notes ----- @@ -48,17 +46,21 @@ class BlazeEventsLoader(PipelineLoader): Dim * {{ {SID_FIELD_NAME}: int64, {TS_FIELD_NAME}: datetime, + {EVENT_DATE_FIELD_NAME}: datetime, }} And other dataset-specific fields, where each row of the table is a record including the sid to identify the company, the timestamp where we - learned about the announcement, and the date when the earnings will be z - announced. + learned about the announcement, and the event date. If the '{TS_FIELD_NAME}' field is not included it is assumed that we start the backtest with knowledge of all announcements. """ + __doc__ = __doc__.format(SID_FIELD_NAME=SID_FIELD_NAME, + TS_FIELD_NAME=TS_FIELD_NAME, + EVENT_DATE_FIELD_NAME=EVENT_DATE_FIELD_NAME) + @preprocess(data_query_tz=optionally(ensure_timezone)) def __init__(self, expr, @@ -90,34 +92,12 @@ class BlazeEventsLoader(PipelineLoader): self._data_query_tz = data_query_tz def load_adjusted_array(self, columns, dates, assets, mask): - data_query_time = self._data_query_time - data_query_tz = self._data_query_tz - lower_dt, upper_dt = normalize_data_query_bounds( - dates[0], - dates[-1], - data_query_time, - data_query_tz, - ) - - raw = ffill_query_in_range( - self._expr, - lower_dt, - upper_dt, - self._odo_kwargs, - ) - sids = raw.loc[:, SID_FIELD_NAME] - raw.drop( - sids[~sids.isin(assets)].index, - inplace=True - ) - if data_query_time is not None: - normalize_timestamp_to_query_time( - raw, - data_query_time, - data_query_tz, - inplace=True, - ts_field=TS_FIELD_NAME, - ) + raw = load_raw_data(assets, + dates, + self._data_query_time, + self._data_query_tz, + self._expr, + self._odo_kwargs) return EventsLoader( events=raw, diff --git a/zipline/pipeline/loaders/blaze/utils.py b/zipline/pipeline/loaders/blaze/utils.py new file mode 100644 index 00000000..ab186c82 --- /dev/null +++ b/zipline/pipeline/loaders/blaze/utils.py @@ -0,0 +1,71 @@ +from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME +from zipline.pipeline.loaders.blaze.core import ffill_query_in_range +from zipline.pipeline.loaders.utils import ( + normalize_data_query_bounds, + normalize_timestamp_to_query_time, +) + + +def load_raw_data(assets, + dates, + data_query_time, + data_query_tz, + expr, + odo_kwargs, + checkpoints=None): + """ + Given an expression representing data to load, perform normalization and + forward-filling and return the data, materialized. Only accepts data with a + `sid` field. + + Parameters + ---------- + assets : pd.int64index + the assets to load data for. + dates : pd.datetimeindex + the simulation dates to load data for. + data_query_time : datetime.time + the time used as cutoff for new information. + data_query_tz : tzinfo + the timezone to normalize your dates to before comparing against + `time`. + expr : expr + the expression representing the data to load. + odo_kwargs : dict + extra keyword arguments to pass to odo when executing the expression. + checkpoints : expr, optional + the expression representing the checkpointed data for `expr`. + + Returns + ------- + raw : pd.dataframe + The result of computing expr and materializing the result as a + dataframe. + """ + lower_dt, upper_dt = normalize_data_query_bounds( + dates[0], + dates[-1], + data_query_time, + data_query_tz, + ) + raw = ffill_query_in_range( + expr, + lower_dt, + upper_dt, + checkpoints=checkpoints, + odo_kwargs=odo_kwargs, + ) + sids = raw[SID_FIELD_NAME] + raw.drop( + sids[~sids.isin(assets)].index, + inplace=True + ) + if data_query_time is not None: + normalize_timestamp_to_query_time( + raw, + data_query_time, + data_query_tz, + inplace=True, + ts_field=TS_FIELD_NAME, + ) + return raw diff --git a/zipline/pipeline/loaders/earnings_estimates.py b/zipline/pipeline/loaders/earnings_estimates.py new file mode 100644 index 00000000..5a794683 --- /dev/null +++ b/zipline/pipeline/loaders/earnings_estimates.py @@ -0,0 +1,647 @@ +from abc import abstractmethod, abstractproperty +from collections import defaultdict + +import numpy as np +import pandas as pd +from six import viewvalues +from toolz import groupby + +from zipline.lib.adjusted_array import AdjustedArray +from zipline.lib.adjustment import ( + Datetime641DArrayOverwrite, + Datetime64Overwrite, + Float641DArrayOverwrite, + Float64Overwrite, +) + +from zipline.pipeline.common import ( + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) +from zipline.pipeline.loaders.base import PipelineLoader +from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype +from zipline.pipeline.loaders.utils import ( + ffill_across_cols, + last_in_date_group +) + + +INVALID_NUM_QTRS_MESSAGE = "Passed invalid number of quarters %s; " \ + "must pass a number of quarters >= 0" +NEXT_FISCAL_QUARTER = 'next_fiscal_quarter' +NEXT_FISCAL_YEAR = 'next_fiscal_year' +NORMALIZED_QUARTERS = 'normalized_quarters' +PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter' +PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year' +SHIFTED_NORMALIZED_QTRS = 'shifted_normalized_quarters' +SIMULATION_DATES = 'dates' + + +def normalize_quarters(years, quarters): + return years * 4 + quarters - 1 + + +def split_normalized_quarters(normalized_quarters): + years = normalized_quarters // 4 + quarters = normalized_quarters % 4 + return years, quarters + 1 + + +# These metadata columns are used to align event indexers. +metadata_columns = frozenset({ + TS_FIELD_NAME, + SID_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, +}) + + +def required_estimates_fields(columns): + """ + Compute the set of resource columns required to serve + `columns`. + """ + # We also expect any of the field names that our loadable columns + # are mapped to. + return metadata_columns.union(viewvalues(columns)) + + +def validate_column_specs(events, columns): + """ + Verify that the columns of ``events`` can be used by a + EarningsEstimatesLoader to serve the BoundColumns described by + `columns`. + """ + required = required_estimates_fields(columns) + received = set(events.columns) + missing = required - received + if missing: + raise ValueError( + "EarningsEstimatesLoader missing required columns {missing}.\n" + "Got Columns: {received}\n" + "Expected Columns: {required}".format( + missing=sorted(missing), + received=sorted(received), + required=sorted(required), + ) + ) + + +class EarningsEstimatesLoader(PipelineLoader): + """ + An abstract pipeline loader for estimates data that can load data a + variable number of quarters forwards/backwards from calendar dates + depending on the `num_announcements` attribute of the columns' dataset. + + Parameters + ---------- + estimates : pd.DataFrame + The raw estimates data. + ``estimates`` must contain at least 5 columns: + sid : int64 + The asset id associated with each estimate. + + event_date : datetime64[ns] + The date on which the event that the estimate is for will/has + occurred.. + + timestamp : datetime64[ns] + The date on which we learned about the estimate. + + fiscal_quarter : int64 + The quarter during which the event has/will occur. + + fiscal_year : int64 + The year during which the event has/will occur. + + name_map : dict[str -> str] + A map of names of BoundColumns that this loader will load to the + names of the corresponding columns in `events`. + """ + def __init__(self, + estimates, + name_map): + validate_column_specs( + estimates, + name_map + ) + + self.estimates = estimates[ + estimates[EVENT_DATE_FIELD_NAME].notnull() & + estimates[FISCAL_QUARTER_FIELD_NAME].notnull() & + estimates[FISCAL_YEAR_FIELD_NAME].notnull() + ] + self.estimates[NORMALIZED_QUARTERS] = normalize_quarters( + self.estimates[FISCAL_YEAR_FIELD_NAME], + self.estimates[FISCAL_QUARTER_FIELD_NAME], + ) + + self.array_overwrites_dict = { + datetime64ns_dtype: Datetime641DArrayOverwrite, + float64_dtype: Float641DArrayOverwrite, + } + self.scalar_overwrites_dict = { + datetime64ns_dtype: Datetime64Overwrite, + float64_dtype: Float64Overwrite, + } + + self.name_map = name_map + + @abstractmethod + def get_zeroth_quarter_idx(self, num_announcements, last, dates): + raise NotImplementedError('get_zeroth_quarter_idx') + + @abstractmethod + def get_shifted_qtrs(self, zero_qtrs, num_announcements): + raise NotImplementedError('get_shifted_qtrs') + + @abstractmethod + def create_overwrite_for_estimate(self, + column, + column_name, + last_per_qtr, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx): + raise NotImplementedError('create_overwrite_for_estimate') + + @abstractproperty + def searchsorted_side(self): + return NotImplementedError('searchsorted_side') + + def get_requested_quarter_data(self, + zero_qtr_data, + zeroth_quarter_idx, + stacked_last_per_qtr, + num_announcements, + dates): + """ + Selects the requested data for each date. + + Parameters + ---------- + zero_qtr_data : pd.DataFrame + The 'time zero' data for each calendar date per sid. + zeroth_quarter_idx : pd.Index + An index of calendar dates, sid, and normalized quarters, for only + the rows that have a next or previous earnings estimate. + stacked_last_per_qtr : pd.DataFrame + The latest estimate known with the dates, normalized quarter, and + sid as the index. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + + Returns + -------- + requested_qtr_data : pd.DataFrame + The DataFrame with the latest values for the requested quarter + for all columns; `dates` are the index and columns are a MultiIndex + with sids at the top level and the dataset columns on the bottom. + """ + zero_qtr_data_idx = zero_qtr_data.index + requested_qtr_idx = pd.MultiIndex.from_arrays( + [ + zero_qtr_data_idx.get_level_values(0), + zero_qtr_data_idx.get_level_values(1), + self.get_shifted_qtrs( + zeroth_quarter_idx.get_level_values( + NORMALIZED_QUARTERS, + ), + num_announcements, + ), + ], + names=[ + zero_qtr_data_idx.names[0], + zero_qtr_data_idx.names[1], + SHIFTED_NORMALIZED_QTRS, + ], + ) + requested_qtr_data = stacked_last_per_qtr.loc[requested_qtr_idx] + requested_qtr_data = requested_qtr_data.reset_index( + SHIFTED_NORMALIZED_QTRS, + ) + # Calculate the actual year/quarter being requested and add those in + # as columns. + (requested_qtr_data[FISCAL_YEAR_FIELD_NAME], + requested_qtr_data[FISCAL_QUARTER_FIELD_NAME]) = \ + split_normalized_quarters( + requested_qtr_data[SHIFTED_NORMALIZED_QTRS] + ) + # Once we're left with just dates as the index, we can reindex by all + # dates so that we have a value for each calendar date. + return requested_qtr_data.unstack(SID_FIELD_NAME).reindex(dates) + + def get_adjustments(self, + zero_qtr_data, + requested_qtr_data, + last_per_qtr, + dates, + assets, + columns): + """ + Creates an AdjustedArray from the given estimates data for the given + dates. + + Parameters + ---------- + zero_qtr_data : pd.DataFrame + The 'time zero' data for each calendar date per sid. + requested_qtr_data : pd.DataFrame + The requested quarter data for each calendar date per sid. + last_per_qtr : pd.DataFrame + A DataFrame with a column MultiIndex of [self.estimates.columns, + normalized_quarters, sid] that allows easily getting the timeline + of estimates for a particular sid for a particular quarter. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + assets : pd.Int64Index + An index of all the assets from the raw data. + columns : list of BoundColumn + The columns for which adjustments need to be calculated. + + Returns + ------- + adjusted_array : AdjustedArray + The array of data and overwrites for the given column. + """ + col_to_overwrites = defaultdict(dict) + zero_qtr_data.sort_index(inplace=True) + # Here we want to get the LAST record from each group of records + # corresponding to a single quarter. This is to ensure that we select + # the most up-to-date event date in case the event date changes. + quarter_shifts = zero_qtr_data.groupby( + level=[SID_FIELD_NAME, NORMALIZED_QUARTERS] + ).nth(-1) + + sid_to_idx = dict(zip(assets, range(len(assets)))) + + def collect_adjustments(group): + next_qtr_start_indices = dates.searchsorted( + group[EVENT_DATE_FIELD_NAME].values, + side=self.searchsorted_side, + ) + sid = int(group.name) + qtrs_with_estimates = group.index.get_level_values( + NORMALIZED_QUARTERS + ).values + for idx in next_qtr_start_indices: + if 0 < idx < len(dates): + # Only add adjustments if the next quarter starts somewhere + # in our date index for this sid. Our 'next' quarter can + # never start at index 0; a starting index of 0 means that + # the next quarter's event date was NaT. + self.create_overwrite_for_quarter( + col_to_overwrites, + idx, + last_per_qtr, + qtrs_with_estimates, + requested_qtr_data, + sid, + sid_to_idx[sid], + columns, + ) + + quarter_shifts.groupby(level=SID_FIELD_NAME).apply(collect_adjustments) + return col_to_overwrites + + def create_overwrite_for_quarter(self, + col_to_overwrites, + next_qtr_start_idx, + last_per_qtr, + quarters_with_estimates_for_sid, + requested_qtr_data, + sid, + sid_idx, + columns): + """ + Add entries to the dictionary of columns to adjustments for the given + sid and the given quarter. + + Parameters + ---------- + col_to_overwrites : dict [column_name -> list of ArrayAdjustment] + A dictionary mapping column names to all overwrites for those + columns. + next_qtr_start_idx : int + The index of the first day of the next quarter in the calendar + dates. + last_per_qtr : pd.DataFrame + A DataFrame with a column MultiIndex of [self.estimates.columns, + normalized_quarters, sid] that allows easily getting the timeline + of estimates for a particular sid for a particular quarter; this + is particularly useful for getting adjustments for 'next' + estimates. + quarters_with_estimates_for_sid : np.array + An array of all quarters for which there are estimates for the + given sid. + sid : int + The sid for which to create overwrites. + sid_idx : int + The index of the sid in `assets`. + columns : list of BoundColumn + The columns for which to create overwrites. + """ + + # Find the quarter being requested in the quarter we're + # crossing into. + requested_quarter = requested_qtr_data[ + SHIFTED_NORMALIZED_QTRS, sid, + ].iloc[next_qtr_start_idx] + for col in columns: + column_name = self.name_map[col.name] + # If there are estimates for the requested quarter, + # overwrite all values going up to the starting index of + # that quarter with estimates for that quarter. + if requested_quarter in quarters_with_estimates_for_sid: + col_to_overwrites[column_name][next_qtr_start_idx] = [ + self.create_overwrite_for_estimate( + col, + column_name, + last_per_qtr, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx + ), + ] + # There are no estimates for the quarter. Overwrite all + # values going up to the starting index of that quarter + # with the missing value for this column. + else: + col_to_overwrites[column_name][next_qtr_start_idx] = [ + self.overwrite_with_null( + col, + last_per_qtr.index, + next_qtr_start_idx, + sid_idx + ), + ] + + def overwrite_with_null(self, + column, + dates, + next_qtr_start_idx, + sid_idx): + return self.scalar_overwrites_dict[column.dtype]( + 0, + next_qtr_start_idx - 1, + sid_idx, + sid_idx, + column.missing_value + ) + + def load_adjusted_array(self, columns, dates, assets, mask): + # Separate out getting the columns' datasets and the datasets' + # num_announcements attributes to ensure that we're catching the right + # AttributeError. + col_to_datasets = {col: col.dataset for col in columns} + try: + groups = groupby(lambda col: + col_to_datasets[col].num_announcements, + col_to_datasets) + except AttributeError: + raise AttributeError("Datasets loaded via the " + "EarningsEstimatesLoader must define a " + "`num_announcements` attribute that defines " + "how many quarters out the loader should load" + " the data relative to `dates`.") + if any(num_qtr < 0 for num_qtr in groups): + raise ValueError( + INVALID_NUM_QTRS_MESSAGE % ','.join( + str(qtr) for qtr in groups if qtr < 0 + ) + + ) + out = {} + # To optimize performance, only work below on assets that are + # actually in the raw data. + assets_with_data = set(assets) & set(self.estimates[SID_FIELD_NAME]) + last_per_qtr, stacked_last_per_qtr = self.get_last_data_per_qtr( + assets_with_data, + columns, + dates + ) + # Determine which quarter is immediately next/previous for each + # date. + zeroth_quarter_idx = self.get_zeroth_quarter_idx(stacked_last_per_qtr) + zero_qtr_data = stacked_last_per_qtr.loc[zeroth_quarter_idx] + + for num_announcements, columns in groups.items(): + requested_qtr_data = self.get_requested_quarter_data( + zero_qtr_data, + zeroth_quarter_idx, + stacked_last_per_qtr, + num_announcements, + dates, + ) + + # Calculate all adjustments for the given quarter and accumulate + # them for each column. + col_to_adjustments = self.get_adjustments(zero_qtr_data, + requested_qtr_data, + last_per_qtr, + dates, + assets_with_data, + columns) + + # Lookup the asset indexer once, this is so we can reindex + # the assets returned into the assets requested for each column. + # This depends on the fact that our column multiindex has the same + # sids for each field. This allows us to do the lookup once on + # level 1 instead of doing the lookup each time per value in + # level 0. + asset_indexer = assets.get_indexer_for( + requested_qtr_data.columns.levels[1], + ) + for col in columns: + column_name = self.name_map[col.name] + # allocate the empty output with the correct missing value + output_array = np.full( + (len(dates), len(assets)), + col.missing_value, + dtype=col.dtype, + ) + # overwrite the missing value with values from the computed + # data + output_array[ + :, + asset_indexer, + ] = requested_qtr_data[column_name].values + + out[col] = AdjustedArray( + output_array, + mask, + dict(col_to_adjustments[column_name]), + col.missing_value, + ) + return out + + def get_last_data_per_qtr(self, assets_with_data, columns, dates): + """ + Determine the last piece of information we know for each column on each + date in the index for each sid and quarter. + + Parameters + ---------- + assets_with_data : pd.Index + Index of all assets that appear in the raw data given to the + loader. + columns : iterable of BoundColumn + The columns that need to be loaded from the raw data. + dates : pd.DatetimeIndex + The calendar of dates for which data should be loaded. + + Returns + ------- + stacked_last_per_qtr : pd.DataFrame + A DataFrame indexed by [dates, sid, normalized_quarters] that has + the latest information for each row of the index, sorted by event + date. + last_per_qtr : pd.DataFrame + A DataFrame with columns that are a MultiIndex of [ + self.estimates.columns, normalized_quarters, sid]. + """ + # Get a DataFrame indexed by date with a MultiIndex of columns of [ + # self.estimates.columns, normalized_quarters, sid], where each cell + # contains the latest data for that day. + last_per_qtr = last_in_date_group( + self.estimates, + dates, + assets_with_data, + reindex=True, + extra_groupers=[NORMALIZED_QUARTERS], + ) + # Forward fill values for each quarter/sid/dataset column. + ffill_across_cols(last_per_qtr, columns, self.name_map) + # Stack quarter and sid into the index. + stacked_last_per_qtr = last_per_qtr.stack( + [SID_FIELD_NAME, NORMALIZED_QUARTERS], + ) + # Set date index name for ease of reference + stacked_last_per_qtr.index.set_names( + SIMULATION_DATES, + level=0, + inplace=True, + ) + stacked_last_per_qtr = stacked_last_per_qtr.sort( + EVENT_DATE_FIELD_NAME, + ) + stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] = pd.to_datetime( + stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] + ) + return last_per_qtr, stacked_last_per_qtr + + +class NextEarningsEstimatesLoader(EarningsEstimatesLoader): + searchsorted_side = 'right' + + def create_overwrite_for_estimate(self, + column, + column_name, + last_per_qtr, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx): + return self.array_overwrites_dict[column.dtype]( + 0, + # overwrite thru last qtr + next_qtr_start_idx - 1, + sid_idx, + sid_idx, + last_per_qtr[ + column_name, + requested_quarter, + sid, + ].values[:next_qtr_start_idx], + ) + + def get_shifted_qtrs(self, zero_qtrs, num_announcements): + return zero_qtrs + (num_announcements - 1) + + def get_zeroth_quarter_idx(self, stacked_last_per_qtr): + """ + Filters for releases that are on or after each simulation date and + determines the next quarter by picking out the upcoming release for + each date in the index. + + Parameters + ---------- + stacked_last_per_qtr : pd.DataFrame + A DataFrame with index of calendar dates, sid, and normalized + quarters with each row being the latest estimate for the row's + index values, sorted by event date. + + Returns + ------- + next_releases_per_date_index : pd.MultiIndex + An index of calendar dates, sid, and normalized quarters, for only + the rows that have a next event. + """ + next_releases_per_date = stacked_last_per_qtr.loc[ + stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >= + stacked_last_per_qtr.index.get_level_values(SIMULATION_DATES) + ].groupby( + level=[SIMULATION_DATES, SID_FIELD_NAME], + as_index=False, + # Here we take advantage of the fact that `stacked_last_per_qtr` is + # sorted by event date. + ).nth(0) + return next_releases_per_date.index + + +class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): + searchsorted_side = 'left' + + def create_overwrite_for_estimate(self, + column, + column_name, + dates, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx): + return self.overwrite_with_null( + column, + dates, + next_qtr_start_idx, + sid_idx, + ) + + def get_shifted_qtrs(self, zero_qtrs, num_announcements): + return zero_qtrs - (num_announcements - 1) + + def get_zeroth_quarter_idx(self, stacked_last_per_qtr): + """ + Filters for releases that are on or after each simulation date and + determines the previous quarter by picking out the most recent + release relative to each date in the index. + + Parameters + ---------- + stacked_last_per_qtr : pd.DataFrame + A DataFrame with index of calendar dates, sid, and normalized + quarters with each row being the latest estimate for the row's + index values, sorted by event date. + + Returns + ------- + previous_releases_per_date_index : pd.MultiIndex + An index of calendar dates, sid, and normalized quarters, for only + the rows that have a previous event. + """ + previous_releases_per_date = stacked_last_per_qtr.loc[ + stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <= + stacked_last_per_qtr.index.get_level_values(SIMULATION_DATES) + ].groupby( + level=[SIMULATION_DATES, SID_FIELD_NAME], + as_index=False, + # Here we take advantage of the fact that `stacked_last_per_qtr` is + # sorted by event date. + ).nth(-1) + return previous_releases_per_date.index diff --git a/zipline/pipeline/loaders/events.py b/zipline/pipeline/loaders/events.py index 645143d8..4249d145 100644 --- a/zipline/pipeline/loaders/events.py +++ b/zipline/pipeline/loaders/events.py @@ -5,12 +5,12 @@ from six import viewvalues from toolz import groupby, merge from .base import PipelineLoader -from .frame import DataFrameLoader from zipline.pipeline.common import ( EVENT_DATE_FIELD_NAME, SID_FIELD_NAME, TS_FIELD_NAME, ) +from zipline.pipeline.loaders.frame import DataFrameLoader from zipline.pipeline.loaders.utils import ( next_event_indexer, previous_event_indexer, @@ -41,16 +41,8 @@ def validate_column_specs(events, next_value_columns, previous_value_columns): serve the BoundColumns described by ``next_value_columns`` and ``previous_value_columns``. """ - required = { - TS_FIELD_NAME, - SID_FIELD_NAME, - EVENT_DATE_FIELD_NAME, - }.union( - # We also expect any of the field names that our loadable columns - # are mapped to. - viewvalues(next_value_columns), - viewvalues(previous_value_columns), - ) + required = required_event_fields(next_value_columns, + previous_value_columns) received = set(events.columns) missing = required - received if missing: diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 77a9f447..028da74c 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -2,6 +2,8 @@ import datetime import numpy as np import pandas as pd +from zipline.pipeline.common import TS_FIELD_NAME, SID_FIELD_NAME +from zipline.utils.numpy_utils import categorical_dtype from zipline.utils.pandas_utils import mask_between_time @@ -272,3 +274,124 @@ def check_data_query_args(data_query_time, data_query_tz): data_query_tz, ), ) + + +def last_in_date_group(df, + dates, + assets, + reindex=True, + have_sids=True, + extra_groupers=[]): + """ + Determine the last piece of information known on each date in the date + index for each group. + + Parameters + ---------- + df : pd.DataFrame + The DataFrame containing the data to be grouped. + dates : pd.DatetimeIndex + The dates to use for grouping and reindexing. + assets : pd.Int64Index + The assets that should be included in the column multiindex. + reindex : bool + Whether or not the DataFrame should be reindexed against the date + index. This will add back any dates to the index that were grouped + away. + have_sids : bool + Whether or not the DataFrame has sids. If it does, they will be used + in the groupby. + extra_groupers : list of str + Any extra field names that should be included in the groupby. + + Returns + ------- + last_in_group : pd.DataFrame + A DataFrame with dates as the index and fields used in the groupby as + levels of a multiindex of columns. + + """ + idx = [dates[dates.searchsorted( + df[TS_FIELD_NAME].values.astype('datetime64[D]') + )]] + if have_sids: + idx += [SID_FIELD_NAME] + idx += extra_groupers + + last_in_group = df.drop(TS_FIELD_NAME, axis=1).groupby( + idx, + sort=False, + ).last() + + # For the number of things that we're grouping by (except TS), unstack + # the df. Done this way because of an unresolved pandas bug whereby + # passing a list of levels with mixed dtypes to unstack causes the + # resulting DataFrame to have all object-type columns. + for _ in range(len(idx) - 1): + last_in_group = last_in_group.unstack(-1) + + if reindex: + if have_sids: + cols = last_in_group.columns + last_in_group = last_in_group.reindex( + index=dates, + columns=pd.MultiIndex.from_product( + tuple(cols.levels[0:len(extra_groupers) + 1]) + (assets,), + names=cols.names, + ), + ) + else: + last_in_group = last_in_group.reindex(dates) + + return last_in_group + + +def ffill_across_cols(df, columns, name_map): + """ + Forward fill values in a DataFrame with special logic to handle cases + that pd.DataFrame.ffill cannot and cast columns to appropriate types. + + Parameters + ---------- + df : pd.DataFrame + The DataFrame to do forward-filling on. + columns : list of BoundColumn + The BoundColumns that correspond to columns in the DataFrame to which + special filling and/or casting logic should be applied. + name_map: map of string -> string + Mapping from the name of each BoundColumn to the associated column + name in `df`. + """ + df.ffill(inplace=True) + + # Fill in missing values specified by each column. This is made + # significantly more complex by the fact that we need to work around + # two pandas issues: + + # 1) When we have sids, if there are no records for a given sid for any + # dates, pandas will generate a column full of NaNs for that sid. + # This means that some of the columns in `dense_output` are now + # float instead of the intended dtype, so we have to coerce back to + # our expected type and convert NaNs into the desired missing value. + + # 2) DataFrame.ffill assumes that receiving None as a fill-value means + # that no value was passed. Consequently, there's no way to tell + # pandas to replace NaNs in an object column with None using fillna, + # so we have to roll our own instead using df.where. + for column in columns: + column_name = name_map[column.name] + # Special logic for strings since `fillna` doesn't work if the + # missing value is `None`. + if column.dtype == categorical_dtype: + df[column_name] = df[ + column.name + ].where(pd.notnull(df[column_name]), + column.missing_value) + else: + # We need to execute `fillna` before `astype` in case the + # column contains NaNs and needs to be cast to bool or int. + # This is so that the NaNs are replaced first, since pandas + # can't convert NaNs for those types. + df[column_name] = df[ + column_name + ].fillna(column.missing_value).astype(column.dtype)