diff --git a/tests/pipeline/test_quarters_estimates.py b/tests/pipeline/test_quarters_estimates.py index 56e7827b..4f7b8366 100644 --- a/tests/pipeline/test_quarters_estimates.py +++ b/tests/pipeline/test_quarters_estimates.py @@ -4,7 +4,6 @@ from nose.tools import assert_true from nose_parameterized import parameterized import numpy as np import pandas as pd -from pandas.util.testing import assert_frame_equal, assert_series_equal from toolz import merge from zipline.pipeline import SimplePipelineEngine, Pipeline, CustomFactor @@ -21,11 +20,11 @@ from zipline.pipeline.loaders.blaze.estimates import ( BlazeNextEstimatesLoader, BlazePreviousEstimatesLoader ) -from zipline.pipeline.loaders.quarter_estimates import ( +from zipline.pipeline.loaders.earnings_estimates import ( INVALID_NUM_QTRS_MESSAGE, - NextQuartersEstimatesLoader, + NextEarningsEstimatesLoader, normalize_quarters, - PreviousQuartersEstimatesLoader, + PreviousEarningsEstimatesLoader, split_normalized_quarters, ) from zipline.testing.fixtures import ( @@ -78,7 +77,7 @@ class WithEstimates(WithTradingSessions, WithAssetFinder): # Short window defined in order for test to run faster. 
START_DATE = pd.Timestamp('2014-12-28') - END_DATE = pd.Timestamp('2015-02-03') + END_DATE = pd.Timestamp('2015-02-04') @classmethod def make_loader(cls, events, columns): @@ -88,10 +87,14 @@ class WithEstimates(WithTradingSessions, WithAssetFinder): def make_events(cls): raise NotImplementedError('make_events') + @classmethod + def get_sids(cls): + return cls.events[SID_FIELD_NAME].unique() + @classmethod def init_class_fixtures(cls): cls.events = cls.make_events() - cls.sids = cls.events[SID_FIELD_NAME].unique() + cls.ASSET_FINDER_EQUITY_SIDS = cls.get_sids() cls.columns = { Estimates.event_date: 'event_date', Estimates.fiscal_quarter: 'fiscal_quarter', @@ -101,9 +104,6 @@ class WithEstimates(WithTradingSessions, WithAssetFinder): cls.loader = cls.make_loader(cls.events, {column.name: val for column, val in cls.columns.items()}) - cls.ASSET_FINDER_EQUITY_SIDS = list( - cls.events[SID_FIELD_NAME].unique() - ) cls.ASSET_FINDER_EQUITY_SYMBOLS = [ 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS ] @@ -190,7 +190,7 @@ class PreviousWithWrongNumQuarters(WithWrongLoaderDefinition, """ @classmethod def make_loader(cls, events, columns): - return PreviousQuartersEstimatesLoader(events, columns) + return PreviousEarningsEstimatesLoader(events, columns) class NextWithWrongNumQuarters(WithWrongLoaderDefinition, @@ -201,7 +201,7 @@ class NextWithWrongNumQuarters(WithWrongLoaderDefinition, """ @classmethod def make_loader(cls, events, columns): - return NextQuartersEstimatesLoader(events, columns) + return NextEarningsEstimatesLoader(events, columns) class WithEstimatesTimeZero(WithEstimates): @@ -234,24 +234,27 @@ class WithEstimatesTimeZero(WithEstimates): Tests that we get the right 'time zero' value on each day for each sid and for each column. 
""" + # Shorter date range for performance + END_DATE = pd.Timestamp('2015-01-28') + q1_knowledge_dates = [pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-04'), - pd.Timestamp('2015-01-08'), - pd.Timestamp('2015-01-12')] - q2_knowledge_dates = [pd.Timestamp('2015-01-16'), + pd.Timestamp('2015-01-07'), + pd.Timestamp('2015-01-11')] + q2_knowledge_dates = [pd.Timestamp('2015-01-14'), + pd.Timestamp('2015-01-17'), pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-01-24'), - pd.Timestamp('2015-01-28')] + pd.Timestamp('2015-01-23')] # We want to model the possibility of an estimate predicting a release date # that doesn't match the actual release. This could be done by dynamically # generating more combinations with different release dates, but that # significantly increases the amount of time it takes to run the tests. # These hard-coded cases are sufficient to know that we can update our # beliefs when we get new information. - q1_release_dates = [pd.Timestamp('2015-01-15'), - pd.Timestamp('2015-01-16')] # One day late - q2_release_dates = [pd.Timestamp('2015-01-30'), # One day early - pd.Timestamp('2015-01-31')] + q1_release_dates = [pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-14')] # One day late + q2_release_dates = [pd.Timestamp('2015-01-25'), # One day early + pd.Timestamp('2015-01-26')] @classmethod def make_events(cls): @@ -300,8 +303,15 @@ class WithEstimatesTimeZero(WithEstimates): q2e2, sid)) sid_releases.append(cls.create_releases_df(sid)) + return pd.concat(sid_estimates + + sid_releases).reset_index(drop=True) - return pd.concat(sid_estimates + sid_releases).reset_index(drop=True) + @classmethod + def get_sids(cls): + sids = cls.events[SID_FIELD_NAME].unique() + # Tack on an extra sid to make sure that sids with no data are + # included but have all-null columns. 
+ return list(sids) + [max(sids) + 1] @classmethod def create_releases_df(cls, sid): @@ -309,10 +319,10 @@ class WithEstimatesTimeZero(WithEstimates): # ranges in order to reduce the number of dates we need to iterate # through when testing. return pd.DataFrame({ - TS_FIELD_NAME: [pd.Timestamp('2015-01-15'), - pd.Timestamp('2015-01-31')], - EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-15'), - pd.Timestamp('2015-01-31')], + TS_FIELD_NAME: [pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-26')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-13'), + pd.Timestamp('2015-01-26')], 'estimate': [0.5, 0.8], FISCAL_QUARTER_FIELD_NAME: [1.0, 2.0], FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0], @@ -337,8 +347,6 @@ class WithEstimatesTimeZero(WithEstimates): @classmethod def init_class_fixtures(cls): - # Must be generated before call to super since super uses `events`. - cls.events = cls.make_events() super(WithEstimatesTimeZero, cls).init_class_fixtures() def get_expected_estimate(self, @@ -356,58 +364,42 @@ class WithEstimatesTimeZero(WithEstimates): ) results = engine.run_pipeline( Pipeline({c.name: c.latest for c in dataset.columns}), - start_date=self.trading_days[0], - end_date=self.trading_days[-1], + start_date=self.trading_days[1], + end_date=self.trading_days[-2], ) - for sid in self.sids: + for sid in self.ASSET_FINDER_EQUITY_SIDS: sid_estimates = results.xs(sid, level=1) - ts_sorted_estimates = self.events[ - self.events[SID_FIELD_NAME] == sid - ].sort(TS_FIELD_NAME) - for i, date in enumerate(sid_estimates.index): - comparable_date = date.tz_localize(None) - # Filter out estimates we don't know about yet. - ts_eligible_estimates = ts_sorted_estimates[ - ts_sorted_estimates[TS_FIELD_NAME] <= comparable_date + # Separate assertion for all-null DataFrame to avoid setting + # column dtypes on `all_expected`. 
+ if sid == max(self.ASSET_FINDER_EQUITY_SIDS): + assert_true(sid_estimates.isnull().all().all()) + else: + ts_sorted_estimates = self.events[ + self.events[SID_FIELD_NAME] == sid + ].sort(TS_FIELD_NAME) + q1_knowledge = ts_sorted_estimates[ + ts_sorted_estimates[FISCAL_QUARTER_FIELD_NAME] == 1 ] - # If there are estimates we know about: - if not ts_eligible_estimates.empty: - # Determine the last piece of information we know about - # for q1 and q2. This takes advantage of the fact that we - # only have 2 quarters in the test data. - q1_knowledge = ts_eligible_estimates[ - ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 1 - ] - q2_knowledge = ts_eligible_estimates[ - ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 2 - ] - expected_estimate = self.get_expected_estimate( - q1_knowledge, - q2_knowledge, - comparable_date, - ) - # Have to explicitly check for None because - # `expected_estimate` might be a DataFrame. - if expected_estimate is not None: - assert_series_equal( - sid_estimates.iloc[i], - expected_estimate[sid_estimates.columns], - check_names=False - ) - else: - # There are no eligible 'next'/'previous' estimates on - # this day; everything should be null. - assert_true(sid_estimates.iloc[i].isnull().all()) - else: - # We don't know about any estimates on this day; - # everything should be null. 
- assert_true(sid_estimates.iloc[i].isnull().all()) + q2_knowledge = ts_sorted_estimates[ + ts_sorted_estimates[FISCAL_QUARTER_FIELD_NAME] == 2 + ] + all_expected = pd.concat( + [self.get_expected_estimate( + q1_knowledge[q1_knowledge[TS_FIELD_NAME] <= + date.tz_localize(None)], + q2_knowledge[q2_knowledge[TS_FIELD_NAME] <= + date.tz_localize(None)], + date.tz_localize(None), + ).set_index([[date]]) for date in sid_estimates.index], + axis=0) + assert_equal(all_expected[sid_estimates.columns], + sid_estimates) class NextEstimate(WithEstimatesTimeZero, ZiplineTestCase): @classmethod def make_loader(cls, events, columns): - return NextQuartersEstimatesLoader(events, columns) + return NextEarningsEstimatesLoader(events, columns) def get_expected_estimate(self, q1_knowledge, @@ -419,15 +411,16 @@ class NextEstimate(WithEstimatesTimeZero, ZiplineTestCase): if (not q1_knowledge.empty and q1_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] >= comparable_date): - return q1_knowledge.iloc[-1] + return q1_knowledge.iloc[-1:] # If q1 has already happened or we don't know about it # yet and our latest knowledge indicates that q2 hasn't # happened yet, then that's the estimate we want to use. 
elif (not q2_knowledge.empty and q2_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] >= comparable_date): - return q2_knowledge.iloc[-1] - return None + return q2_knowledge.iloc[-1:] + return pd.DataFrame(columns=q1_knowledge.columns, + index=[comparable_date]) class BlazeNextEstimateLoaderTestCase(NextEstimate): @@ -446,7 +439,7 @@ class BlazeNextEstimateLoaderTestCase(NextEstimate): class PreviousEstimate(WithEstimatesTimeZero, ZiplineTestCase): @classmethod def make_loader(cls, events, columns): - return PreviousQuartersEstimatesLoader(events, columns) + return PreviousEarningsEstimatesLoader(events, columns) def get_expected_estimate(self, q1_knowledge, @@ -460,12 +453,13 @@ class PreviousEstimate(WithEstimatesTimeZero, ZiplineTestCase): if (not q2_knowledge.empty and q2_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] <= comparable_date): - return q2_knowledge.iloc[-1] + return q2_knowledge.iloc[-1:] elif (not q1_knowledge.empty and q1_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] <= comparable_date): - return q1_knowledge.iloc[-1] - return None + return q1_knowledge.iloc[-1:] + return pd.DataFrame(columns=q1_knowledge.columns, + index=[comparable_date]) class BlazePreviousEstimateLoaderTestCase(PreviousEstimate): @@ -572,8 +566,8 @@ class WithEstimateMultipleQuarters(WithEstimates): # quarters out for each of the dataset columns. 
assert_equal(sorted(np.array(q1_columns + q2_columns)), sorted(results.columns.values)) - assert_frame_equal(self.expected_out.sort(axis=1), - results.xs(0, level=1).sort(axis=1)) + assert_equal(self.expected_out.sort(axis=1), + results.xs(0, level=1).sort(axis=1)) class NextEstimateMultipleQuarters( @@ -581,17 +575,19 @@ class NextEstimateMultipleQuarters( ): @classmethod def make_loader(cls, events, columns): - return NextQuartersEstimatesLoader(events, columns) + return NextEarningsEstimatesLoader(events, columns) @classmethod def fill_expected_out(cls, expected): # Fill columns for 1 Q out for raw_name in cls.columns.values(): - expected[raw_name + '1'].loc[ - pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-11') + expected.loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-11'), + raw_name + '1' ] = cls.events[raw_name].iloc[0] - expected[raw_name + '1'].loc[ - pd.Timestamp('2015-01-11'):pd.Timestamp('2015-01-20') + expected.loc[ + pd.Timestamp('2015-01-11'):pd.Timestamp('2015-01-20'), + raw_name + '1' ] = cls.events[raw_name].iloc[1] # Fill columns for 2 Q out @@ -599,19 +595,23 @@ class NextEstimateMultipleQuarters( # Q1's event happens; after Q1's event, we know 1 Q out but not 2 Qs # out. 
for col_name in ['estimate', 'event_date']: - expected[col_name + '2'].loc[ - pd.Timestamp('2015-01-06'):pd.Timestamp('2015-01-10') + expected.loc[ + pd.Timestamp('2015-01-06'):pd.Timestamp('2015-01-10'), + col_name + '2' ] = cls.events[col_name].iloc[1] # But we know what FQ and FY we'd need in both Q1 and Q2 # because we know which FQ is next and can calculate from there - expected[FISCAL_QUARTER_FIELD_NAME + '2'].loc[ - pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-09') + expected.loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-09'), + FISCAL_QUARTER_FIELD_NAME + '2' ] = 2 - expected[FISCAL_QUARTER_FIELD_NAME + '2'].loc[ - pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20') + expected.loc[ + pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20'), + FISCAL_QUARTER_FIELD_NAME + '2' ] = 3 - expected[FISCAL_YEAR_FIELD_NAME + '2'].loc[ - pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-20') + expected.loc[ + pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-20'), + FISCAL_YEAR_FIELD_NAME + '2' ] = 2015 return expected @@ -624,7 +624,7 @@ class PreviousEstimateMultipleQuarters( @classmethod def make_loader(cls, events, columns): - return PreviousQuartersEstimatesLoader(events, columns) + return PreviousEarningsEstimatesLoader(events, columns) @classmethod def fill_expected_out(cls, expected): @@ -804,7 +804,7 @@ class WithEstimateWindows(WithEstimates): class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_loader(cls, events, columns): - return PreviousQuartersEstimatesLoader(events, columns) + return PreviousEarningsEstimatesLoader(events, columns) @classmethod def make_expected_timelines(cls): @@ -867,7 +867,7 @@ class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase): class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase): @classmethod def make_loader(cls, events, columns): - return NextQuartersEstimatesLoader(events, columns) + return NextEarningsEstimatesLoader(events, columns) @classmethod def 
make_expected_timelines(cls): diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 61782c31..d5c3e40b 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -1028,7 +1028,6 @@ class BlazeLoader(dict): return odo(e[predicate][colnames], pd.DataFrame, **odo_kwargs) - lower, materialized_checkpoints = get_materialized_checkpoints( checkpoints, colnames, lower_dt, odo_kwargs ) @@ -1158,6 +1157,22 @@ def bind_expression_to_resources(expr, resources): def get_materialized_checkpoints(checkpoints, colnames, lower_dt, odo_kwargs): + """ + Computes a lower bound and a DataFrame of checkpoints. + + Parameters + ---------- + checkpoints : Expr + Bound blaze expression for a checkpoints table from which to get a + computed lower bound. + colnames : iterable of str + The names of the columns for which checkpoints should be computed. + lower_dt : pd.Timestamp + The lower date being queried for that serves as an upper bound for + checkpoints. + odo_kwargs : dict, optional + The extra keyword arguments to pass to ``odo``. + """ if checkpoints is not None: ts = checkpoints[TS_FIELD_NAME] checkpoints_ts = odo(ts[ts <= lower_dt].max(), pd.Timestamp) @@ -1194,12 +1209,13 @@ def ffill_query_in_range(expr, The lower date to query for. upper : datetime The upper date to query for. + checkpoints : Expr, optional + Bound blaze expression for a checkpoints table from which to get a + computed lower bound. odo_kwargs : dict, optional The extra keyword arguments to pass to ``odo``. ts_field : str, optional The name of the timestamp field in the given blaze expression. - sid_field : str, optional - The name of the sid field in the given blaze expression.
Returns ------- diff --git a/zipline/pipeline/loaders/blaze/estimates.py b/zipline/pipeline/loaders/blaze/estimates.py index b991c62c..b63c58a6 100644 --- a/zipline/pipeline/loaders/blaze/estimates.py +++ b/zipline/pipeline/loaders/blaze/estimates.py @@ -3,11 +3,18 @@ from datashape import istabular from .core import ( bind_expression_to_resources, ) +from zipline.pipeline.common import ( + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.blaze.utils import load_raw_data -from zipline.pipeline.loaders.quarter_estimates import ( - NextQuartersEstimatesLoader, - PreviousQuartersEstimatesLoader, +from zipline.pipeline.loaders.earnings_estimates import ( + NextEarningsEstimatesLoader, + PreviousEarningsEstimatesLoader, required_estimates_fields, ) from zipline.pipeline.loaders.utils import ( @@ -35,6 +42,9 @@ class BlazeEstimatesLoader(PipelineLoader): The time to use for the data query cutoff. data_query_tz : tzinfo or str The timezeone to use for the data query cutoff. + checkpoints : Expr, optional + The expression representing checkpointed data to be used for faster + forward-filling of data from `expr`. Notes ----- @@ -55,6 +65,13 @@ class BlazeEstimatesLoader(PipelineLoader): If the '{TS_FIELD_NAME}' field is not included it is assumed that we start the backtest with knowledge of all announcements.
""" + __doc__ = __doc__.format( + SID_FIELD_NAME=SID_FIELD_NAME, + TS_FIELD_NAME=TS_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME=FISCAL_YEAR_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME=FISCAL_QUARTER_FIELD_NAME, + EVENT_DATE_FIELD_NAME=EVENT_DATE_FIELD_NAME, + ) @preprocess(data_query_tz=optionally(ensure_timezone)) def __init__(self, @@ -107,8 +124,8 @@ class BlazeEstimatesLoader(PipelineLoader): class BlazeNextEstimatesLoader(BlazeEstimatesLoader): - loader = NextQuartersEstimatesLoader + loader = NextEarningsEstimatesLoader class BlazePreviousEstimatesLoader(BlazeEstimatesLoader): - loader = PreviousQuartersEstimatesLoader + loader = PreviousEarningsEstimatesLoader diff --git a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py index 56c050f2..6ecf3c6f 100644 --- a/zipline/pipeline/loaders/blaze/events.py +++ b/zipline/pipeline/loaders/blaze/events.py @@ -3,6 +3,8 @@ from datashape import istabular from .core import ( bind_expression_to_resources, ) +from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME, \ + EVENT_DATE_FIELD_NAME from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.blaze.utils import load_raw_data from zipline.pipeline.loaders.events import ( @@ -55,6 +57,10 @@ class BlazeEventsLoader(PipelineLoader): start the backtest with knowledge of all announcements.
""" + __doc__ = __doc__.format(SID_FIELD_NAME=SID_FIELD_NAME, + TS_FIELD_NAME=TS_FIELD_NAME, + EVENT_DATE_FIELD_NAME=EVENT_DATE_FIELD_NAME) + @preprocess(data_query_tz=optionally(ensure_timezone)) def __init__(self, expr, diff --git a/zipline/pipeline/loaders/blaze/utils.py b/zipline/pipeline/loaders/blaze/utils.py index b5be9cd9..ab186c82 100644 --- a/zipline/pipeline/loaders/blaze/utils.py +++ b/zipline/pipeline/loaders/blaze/utils.py @@ -14,10 +14,11 @@ def load_raw_data(assets, odo_kwargs, checkpoints=None): """ - given an expression representing data to load, perform normalization and - forward-filling and return the data, materialized. + Given an expression representing data to load, perform normalization and + forward-filling and return the data, materialized. Only accepts data with a + `sid` field. - parameters + Parameters ---------- assets : pd.int64index the assets to load data for. @@ -32,8 +33,10 @@ def load_raw_data(assets, the expression representing the data to load. odo_kwargs : dict extra keyword arguments to pass to odo when executing the expression. + checkpoints : expr, optional + the expression representing the checkpointed data for `expr`.
- returns + Returns ------- raw : pd.dataframe The result of computing expr and materializing the result as a diff --git a/zipline/pipeline/loaders/earnings_estimates.py b/zipline/pipeline/loaders/earnings_estimates.py new file mode 100644 index 00000000..3397d105 --- /dev/null +++ b/zipline/pipeline/loaders/earnings_estimates.py @@ -0,0 +1,619 @@ +from collections import defaultdict +from abc import abstractmethod, abstractproperty +import pandas as pd +from six import viewvalues +from toolz import groupby + +from zipline.lib.adjusted_array import AdjustedArray +from zipline.lib.adjustment import ( + Datetime641DArrayOverwrite, + Datetime64Overwrite, + Float641DArrayOverwrite, + Float64Overwrite, +) + +from zipline.pipeline.common import ( + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) +from zipline.pipeline.loaders.base import PipelineLoader +from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype +from zipline.pipeline.loaders.utils import ( + ffill_across_cols, + last_in_date_group +) + + +INVALID_NUM_QTRS_MESSAGE = "Passed invalid number of quarters %s; " \ + "must pass a number of quarters >= 0" +NEXT_FISCAL_QUARTER = 'next_fiscal_quarter' +NEXT_FISCAL_YEAR = 'next_fiscal_year' +NORMALIZED_QUARTERS = 'normalized_quarters' +PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter' +PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year' +SHIFTED_NORMALIZED_QTRS = 'shifted_normalized_quarters' +SIMULTATION_DATES = 'dates' + + +def normalize_quarters(years, quarters): + return years * 4 + quarters - 1 + + +def split_normalized_quarters(normalized_quarters): + years = normalized_quarters // 4 + quarters = normalized_quarters % 4 + return years, quarters + 1 + + +def required_estimates_fields(columns): + """ + Compute the set of resource columns required to serve + `columns`. + """ + # These metadata columns are used to align event indexers. 
+ return { + TS_FIELD_NAME, + SID_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME + }.union( + # We also expect any of the field names that our loadable columns + # are mapped to. + viewvalues(columns), + ) + + +def validate_column_specs(events, columns): + """ + Verify that the columns of ``events`` can be used by an + EarningsEstimatesLoader to serve the BoundColumns described by + `columns`. + """ + required = required_estimates_fields(columns) + received = set(events.columns) + missing = required - received + if missing: + raise ValueError( + "EarningsEstimatesLoader missing required columns {missing}.\n" + "Got Columns: {received}\n" + "Expected Columns: {required}".format( + missing=sorted(missing), + received=sorted(received), + required=sorted(required), + ) + ) + + +class EarningsEstimatesLoader(PipelineLoader): + """ + An abstract pipeline loader for estimates data that can load data a + variable number of quarters forwards/backwards from calendar dates + depending on the `num_quarters` attribute of the columns' dataset. + + Parameters + ---------- + estimates : pd.DataFrame + The raw estimates data. + ``estimates`` must contain at least 5 columns: + sid : int64 + The asset id associated with each estimate. + + event_date : datetime64[ns] + The date on which the event that the estimate is for will/has + occurred. + + timestamp : datetime64[ns] + The date on which we learned about the estimate. + + fiscal_quarter : int64 + The quarter during which the event has/will occur. + + fiscal_year : int64 + The year during which the event has/will occur. + + name_map : dict[str -> str] + A map of names of BoundColumns that this loader will load to the + names of the corresponding columns in `estimates`.
+ """ + def __init__(self, + estimates, + name_map): + validate_column_specs( + estimates, + name_map + ) + + self.estimates = estimates[ + estimates[EVENT_DATE_FIELD_NAME].notnull() & + estimates[FISCAL_QUARTER_FIELD_NAME].notnull() & + estimates[FISCAL_YEAR_FIELD_NAME].notnull() + ].copy() + self.estimates[NORMALIZED_QUARTERS] = normalize_quarters( + self.estimates[FISCAL_YEAR_FIELD_NAME], + self.estimates[FISCAL_QUARTER_FIELD_NAME], + ) + self.array_overwrites_dict = {datetime64ns_dtype: + Datetime641DArrayOverwrite, + float64_dtype: Float641DArrayOverwrite} + self.scalar_overwrites_dict = {datetime64ns_dtype: Datetime64Overwrite, + float64_dtype: Float64Overwrite} + + self.name_map = name_map + + @abstractmethod + def get_zeroth_quarter_idx(self, num_quarters, last, dates): + raise NotImplementedError('get_zeroth_quarter_idx') + + @abstractmethod + def get_shifted_qtrs(self, zero_qtrs, num_quarters): + raise NotImplementedError('get_shifted_qtrs') + + @abstractmethod + def create_overwrite_for_estimate(self, + column, + column_name, + last_per_qtr, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx): + raise NotImplementedError('create_overwrite_for_estimate') + + @abstractproperty + def searchsorted_side(self): + raise NotImplementedError('searchsorted_side') + + def get_requested_quarter_data(self, stacked_last_per_qtr, idx, dates): + """ + Selects the requested data for each date. + + Parameters + ---------- + stacked_last_per_qtr : pd.DataFrame + The latest estimate known with the dates, normalized quarter, and + sid as the index. + idx : pd.MultiIndex + The index of the row of the requested quarter from each date for + each sid. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested.
+ + Returns + -------- + requested_qtr_data : pd.DataFrame + The DataFrame with the latest values for the requested quarter + for all columns; `dates` are the index and columns are a MultiIndex + with sids at the top level and the dataset columns on the bottom. + """ + requested_qtr_data = stacked_last_per_qtr.loc[idx] + # We've lost the index names when doing `loc`, so set them here. + requested_qtr_data.index = requested_qtr_data.index.set_names( + idx.names + ) + requested_qtr_data = requested_qtr_data.reset_index( + SHIFTED_NORMALIZED_QTRS + ) + # Calculate the actual year/quarter being requested and add those in + # as columns. + (requested_qtr_data[FISCAL_YEAR_FIELD_NAME], + requested_qtr_data[FISCAL_QUARTER_FIELD_NAME]) = \ + split_normalized_quarters( + requested_qtr_data[SHIFTED_NORMALIZED_QTRS] + ) + # Once we're left with just dates as the index, we can reindex by all + # dates so that we have a value for each calendar date. + return requested_qtr_data.unstack(SID_FIELD_NAME).reindex(dates) + + def get_adjustments(self, + zero_qtr_data, + requested_qtr_data, + last_per_qtr, + dates, + assets, + columns): + """ + Creates an AdjustedArray from the given estimates data for the given + dates. + + Parameters + ---------- + zero_qtr_data : pd.DataFrame + The 'time zero' data for each calendar date per sid. + requested_qtr_data : pd.DataFrame + The requested quarter data for each calendar date per sid. + last_per_qtr : pd.DataFrame + A DataFrame with a column MultiIndex of [self.estimates.columns, + normalized_quarters, sid] that allows easily getting the timeline + of estimates for a particular sid for a particular quarter. + dates : pd.DatetimeIndex + The calendar dates for which estimates data is requested. + assets : pd.Int64Index + An index of all the assets from the raw data. + columns : list of BoundColumn + The columns for which adjustments need to be calculated. 
+ + Returns + ------- + col_to_overwrites : dict[str -> dict] + The overwrites accumulated for each column name. + """ + col_to_overwrites = defaultdict(dict) + # We no longer need NORMALIZED_QUARTERS in the index, but we do need it + # as a column to calculate adjustments. + zero_qtr_data = zero_qtr_data.reset_index(NORMALIZED_QUARTERS) + + for sid_idx, sid in enumerate(assets): + zero_qtr_sid_data = zero_qtr_data[ + zero_qtr_data.index.get_level_values(SID_FIELD_NAME) == sid + ] + # Determine where quarters are changing for this sid. + qtr_shifts = zero_qtr_sid_data[ + zero_qtr_sid_data[NORMALIZED_QUARTERS] != + zero_qtr_sid_data[NORMALIZED_QUARTERS].shift(1) + ] + # On dates where we don't have any information about quarters, + # we will get nulls, and each of these will be interpreted as + # quarter shifts. We need to remove these here. + qtr_shifts = qtr_shifts[ + qtr_shifts[NORMALIZED_QUARTERS].notnull() + ] + # For the given sid, determine which quarters we have estimates + # for. + qtrs_with_estimates_for_sid = last_per_qtr.xs( + sid, axis=1, level=SID_FIELD_NAME + ).groupby(axis=1, level=1).first().columns.values + for row_indexer in list(qtr_shifts.index): + # Find the starting index of the quarter that comes right + # after this row. This isn't the starting index of the + # requested quarter, but simply the date we cross over into a + # new quarter. + next_qtr_start_idx = dates.searchsorted( + zero_qtr_data.loc[ + row_indexer + ][EVENT_DATE_FIELD_NAME], + side=self.searchsorted_side + ) + # Only add adjustments if the next quarter starts somewhere in + # our date index for this sid. Our 'next' quarter can never + # start at index 0; a starting index of 0 means that the next + # quarter's event date was NaT.
+ if 0 < next_qtr_start_idx < len(dates): + self.create_overwrite_for_quarter( + col_to_overwrites, + next_qtr_start_idx, + last_per_qtr, + qtrs_with_estimates_for_sid, + requested_qtr_data, + sid, + sid_idx, + columns, + ) + return col_to_overwrites + + def create_overwrite_for_quarter(self, + col_to_overwrites, + next_qtr_start_idx, + last_per_qtr, + quarters_with_estimates_for_sid, + requested_qtr_data, + sid, + sid_idx, + columns): + """ + Add entries to the dictionary of columns to adjustments for the given + sid and the given quarter. + + Parameters + ---------- + col_to_overwrites : dict [column_name -> list of ArrayAdjustment] + A dictionary mapping column names to all overwrites for those + columns. + next_qtr_start_idx : int + The index of the first day of the next quarter in the calendar + dates. + last_per_qtr : pd.DataFrame + A DataFrame with a column MultiIndex of [self.estimates.columns, + normalized_quarters, sid] that allows easily getting the timeline + of estimates for a particular sid for a particular quarter; this + is particularly useful for getting adjustments for 'next' + estimates. + quarters_with_estimates_for_sid : np.array + An array of all quarters for which there are estimates for the + given sid. + sid : int + The sid for which to create overwrites. + sid_idx : int + The index of the sid in `assets`. + columns : list of BoundColumn + The columns for which to create overwrites. + """ + + # Find the quarter being requested in the quarter we're + # crossing into. + requested_quarter = requested_qtr_data[ + SHIFTED_NORMALIZED_QTRS + ][sid].iloc[next_qtr_start_idx] + for col in columns: + column_name = self.name_map[col.name] + # If there are estimates for the requested quarter, + # overwrite all values going up to the starting index of + # that quarter with estimates for that quarter. 
+ if requested_quarter in quarters_with_estimates_for_sid: + col_to_overwrites[column_name][next_qtr_start_idx] = \ + [self.create_overwrite_for_estimate( + col, + column_name, + last_per_qtr, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx + )] + # There are no estimates for the quarter. Overwrite all + # values going up to the starting index of that quarter + # with the missing value for this column. + else: + col_to_overwrites[column_name][next_qtr_start_idx] =\ + [self.overwrite_with_null( + col, + last_per_qtr.index, + next_qtr_start_idx, + sid_idx + )] + + def overwrite_with_null(self, + column, + dates, + next_qtr_start_idx, + sid_idx): + return self.scalar_overwrites_dict[column.dtype]( + 0, + next_qtr_start_idx - 1, + sid_idx, + sid_idx, + column.missing_value + ) + + def load_adjusted_array(self, columns, dates, assets, mask): + # Separate out getting the columns' datasets and the datasets' + # num_quarters attributes to ensure that we're catching the right + # AttributeError. + col_to_datasets = {col: col.dataset for col in columns} + try: + groups = groupby(lambda col: col_to_datasets[col].num_quarters, + col_to_datasets) + except AttributeError: + raise AttributeError("Datasets loaded via the " + "EarningsEstimatesLoader must define a " + "`num_quarters` attribute that defines how " + "many quarters out the loader should load " + "the data relative to `dates`.") + if any(num_qtr < 0 for num_qtr in groups): + raise ValueError( + INVALID_NUM_QTRS_MESSAGE % ','.join( + str(qtr) for qtr in groups if qtr < 0 + ) + + ) + out = {} + # To optimize performance, only work below on assets that are + # actually in the raw data. + assets_with_data = set(assets) & set(self.estimates[SID_FIELD_NAME]) + for num_quarters, columns in groups.items(): + last_per_qtr, stacked_last_per_qtr = self.get_last_data_per_qtr( + assets_with_data, columns, dates + ) + # Determine which quarter is immediately next/previous for each + # date. 
+ zeroth_quarter_idx = self.get_zeroth_quarter_idx( + num_quarters, stacked_last_per_qtr + ) + zero_qtr_data = stacked_last_per_qtr.loc[zeroth_quarter_idx] + # Doing it this way because creating a MultiIndex from scratch + # results in being unable to unstack sids because of duplicate + # values, even though the MultiIndex is created with the same + # exact values as below - possible pandas bug. + requested_qtr_idx = zero_qtr_data.reset_index( + NORMALIZED_QUARTERS + ).set_index( + self.get_shifted_qtrs( + zeroth_quarter_idx.get_level_values(NORMALIZED_QUARTERS), + num_quarters + ), + append=True + ).index + requested_qtr_idx = requested_qtr_idx.rename( + SHIFTED_NORMALIZED_QTRS, -1 + ) + requested_qtr_data = self.get_requested_quarter_data( + stacked_last_per_qtr, requested_qtr_idx, dates + ) + + # Calculate all adjustments for the given quarter and accumulate + # them for each column. + col_to_adjustments = self.get_adjustments(zero_qtr_data, + requested_qtr_data, + last_per_qtr, + dates, + assets_with_data, + columns) + for col in columns: + column_name = self.name_map[col.name] + # We may have dropped assets if they never have any data for + # the requested quarter. + df = pd.DataFrame(data=requested_qtr_data[column_name], + index=dates, + columns=assets, + dtype=col.dtype) + + out[col] = AdjustedArray( + df.values.astype(col.dtype), + mask, + dict(col_to_adjustments[column_name]), + col.missing_value, + ) + return out + + def get_last_data_per_qtr(self, assets_with_data, columns, dates): + """ + Determine the last piece of information we know for each column on each + date in the index for each sid and quarter. + + Parameters + ---------- + assets_with_data : pd.Index + Index of all assets that appear in the raw data given to the + loader. + columns : iterable of BoundColumn + The columns that need to be loaded from the raw data. + dates : pd.DatetimeIndex + The calendar of dates for which data should be loaded. 
+ + Returns + ------- + last_per_qtr : pd.DataFrame + A DataFrame with columns that are a MultiIndex of [ + self.estimates.columns, normalized_quarters, sid]. + stacked_last_per_qtr : pd.DataFrame + A DataFrame indexed by [dates, sid, normalized_quarters] that has + the latest information for each row of the index, sorted by event + date. + """ + # Get a DataFrame indexed by date with a MultiIndex of columns of [ + # self.estimates.columns, normalized_quarters, sid], where each cell + # contains the latest data for that day. + last_per_qtr = last_in_date_group( + self.estimates, dates, assets_with_data, reindex=True, + extra_groupers=[NORMALIZED_QUARTERS] + ) + # Forward fill values for each quarter/sid/dataset column. + ffill_across_cols(last_per_qtr, columns, self.name_map) + # Stack quarter and sid into the index. + stacked_last_per_qtr = last_per_qtr.stack([SID_FIELD_NAME, + NORMALIZED_QUARTERS]) + # Set date index name for ease of reference + stacked_last_per_qtr.index.set_names(SIMULTATION_DATES, + level=0, + inplace=True) + stacked_last_per_qtr = stacked_last_per_qtr.sort( + EVENT_DATE_FIELD_NAME + ) + stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] = pd.to_datetime( + stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] + ) + return last_per_qtr, stacked_last_per_qtr + + +class NextEarningsEstimatesLoader(EarningsEstimatesLoader): + @property + def searchsorted_side(self): + return 'right' + + def create_overwrite_for_estimate(self, + column, + column_name, + last_per_qtr, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx): + return self.array_overwrites_dict[column.dtype]( + 0, + # overwrite thru last qtr + next_qtr_start_idx - 1, + sid_idx, + sid_idx, + last_per_qtr[ + column_name, + requested_quarter, + sid + ][:next_qtr_start_idx].values) + + def get_shifted_qtrs(self, zero_qtrs, num_quarters): + return zero_qtrs + (num_quarters - 1) + + def get_zeroth_quarter_idx(self, num_quarters, stacked_last_per_qtr): + """ + Filters for releases that are on or after 
each simulation date and + determines the next quarter by picking out the upcoming release for + each date in the index. + + Parameters + ---------- + num_quarters : int + Number of quarters to go out in the future. + stacked_last_per_qtr : pd.DataFrame + A DataFrame with index of calendar dates, sid, and normalized + quarters with each row being the latest estimate for the row's + index values, sorted by event date. + + Returns + ------- + next_releases_per_date_index : pd.MultiIndex + An index of calendar dates, sid, and normalized quarters, for only + the rows that have a next event. + """ + + next_releases_per_date = stacked_last_per_qtr.loc[ + stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >= + stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES) + ].groupby( + level=[SIMULTATION_DATES, SID_FIELD_NAME], as_index=False + ).nth(0) + return next_releases_per_date.index + + +class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): + @property + def searchsorted_side(self): + return 'left' + + def create_overwrite_for_estimate(self, + column, + column_name, + dates, + next_qtr_start_idx, + requested_quarter, + sid, + sid_idx): + return self.overwrite_with_null(column, + dates, + next_qtr_start_idx, + sid_idx) + + def get_shifted_qtrs(self, zero_qtrs, num_quarters): + return zero_qtrs - (num_quarters - 1) + + def get_zeroth_quarter_idx(self, num_quarters, stacked_last_per_qtr): + """ + Filters for releases that are on or before each simulation date and + determines the previous quarter by picking out the most recent + release relative to each date in the index. + + Parameters + ---------- + num_quarters : int + Number of quarters to go out in the past. + stacked_last_per_qtr : pd.DataFrame + A DataFrame with index of calendar dates, sid, and normalized + quarters with each row being the latest estimate for the row's + index values, sorted by event date. 
+ + Returns + ------- + previous_releases_per_date_index : pd.MultiIndex + An index of calendar dates, sid, and normalized quarters, for only + the rows that have a previous event. + """ + + previous_releases_per_date = stacked_last_per_qtr.loc[ + stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <= + stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES) + ].groupby( + level=[SIMULTATION_DATES, SID_FIELD_NAME], as_index=False + ).nth(-1) + return previous_releases_per_date.index diff --git a/zipline/pipeline/loaders/quarter_estimates.py b/zipline/pipeline/loaders/quarter_estimates.py deleted file mode 100644 index 71ff3a32..00000000 --- a/zipline/pipeline/loaders/quarter_estimates.py +++ /dev/null @@ -1,529 +0,0 @@ -from collections import defaultdict -from abc import abstractmethod -import numpy as np -import pandas as pd -from six import viewvalues -from toolz import groupby - -from zipline.lib.adjusted_array import AdjustedArray -from zipline.lib.adjustment import (Datetime641DArrayOverwrite, - Float641DArrayOverwrite) - -from zipline.pipeline.common import ( - EVENT_DATE_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME, - SID_FIELD_NAME, - TS_FIELD_NAME, -) -from zipline.pipeline.loaders.base import PipelineLoader -from zipline.utils.numpy_utils import datetime64ns_dtype -from zipline.pipeline.loaders.utils import ( - ffill_across_cols, - last_in_date_group -) - - -INVALID_NUM_QTRS_MESSAGE = "Passed invalid number of quarters %s; " \ - "must pass a number of quarters >= 0" -NEXT_FISCAL_QUARTER = 'next_fiscal_quarter' -NEXT_FISCAL_YEAR = 'next_fiscal_year' -NORMALIZED_QUARTERS = 'normalized_quarters' -PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter' -PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year' -SHIFTED_NORMALIZED_QTRS = 'shifted_normalized_quarters' -SIMULTATION_DATES = 'dates' - - -def normalize_quarters(years, quarters): - return years * 4 + quarters - 1 - - -def split_normalized_quarters(normalized_quarters): - years = 
normalized_quarters // 4 - quarters = normalized_quarters % 4 - return years, quarters + 1 - - -def required_estimates_fields(columns): - """ - Compute the set of resource columns required to serve - `columns`. - """ - # These metadata columns are used to align event indexers. - return { - TS_FIELD_NAME, - SID_FIELD_NAME, - EVENT_DATE_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME - }.union( - # We also expect any of the field names that our loadable columns - # are mapped to. - viewvalues(columns), - ) - - -def validate_column_specs(events, columns): - """ - Verify that the columns of ``events`` can be used by a - QuarterEstimatesLoader to serve the BoundColumns described by - `columns`. - """ - required = required_estimates_fields(columns) - received = set(events.columns) - missing = required - received - if missing: - raise ValueError( - "QuarterEstimatesLoader missing required columns {missing}.\n" - "Got Columns: {received}\n" - "Expected Columns: {required}".format( - missing=sorted(missing), - received=sorted(received), - required=sorted(required), - ) - ) - - -class QuarterEstimatesLoader(PipelineLoader): - def __init__(self, - estimates, - name_map): - validate_column_specs( - estimates, - name_map - ) - - self.estimates = estimates[ - estimates[EVENT_DATE_FIELD_NAME].notnull() & - estimates[FISCAL_QUARTER_FIELD_NAME].notnull() & - estimates[FISCAL_YEAR_FIELD_NAME].notnull() - ] - self.estimates[NORMALIZED_QUARTERS] = normalize_quarters( - self.estimates[FISCAL_YEAR_FIELD_NAME], - self.estimates[FISCAL_QUARTER_FIELD_NAME], - ) - - self.name_map = name_map - - @abstractmethod - def load_quarters(self, num_quarters, last, dates): - raise NotImplementedError('load_quarters') - - def get_requested_data_for_col(self, stacked_last_per_qtr, idx, dates): - """ - Selects the requested data for each date. 
- - Parameters - ---------- - stacked_last_per_qtr : pd.DataFrame - The latest estimate known with the dates, normalized quarter, and - sid as the index. - idx : pd.MultiIndex - The index of the row of the requested quarter from each date for - each sid. - dates : pd.DatetimeIndex - The calendar dates for which estimates data is requested. - - Returns - -------- - requested_qtr_data : pd.DataFrame - The DataFrame with the latest values for the requested quarter - for all columns; `dates` are the index and columns are a MultiIndex - with sids at the top level and the dataset columns on the bottom. - """ - requested_qtr_data = stacked_last_per_qtr.loc[idx] - # We no longer need the shifted normalized quarters in the index, but - # we do need it as a column to calculate adjustments. - requested_qtr_data = requested_qtr_data.reset_index( - SHIFTED_NORMALIZED_QTRS - ) - # Calculate the actual year/quarter being requested and add those in - # as columns. - (requested_qtr_data[FISCAL_YEAR_FIELD_NAME], - requested_qtr_data[FISCAL_QUARTER_FIELD_NAME]) = \ - split_normalized_quarters( - requested_qtr_data[SHIFTED_NORMALIZED_QTRS] - ) - # Move sids into the columns. Once we're left with just dates - # as the index, we can reindex by all dates so that we have a - # value for each calendar date. - requested_qtr_data = requested_qtr_data.unstack( - SID_FIELD_NAME - ).reindex(dates) - return requested_qtr_data - - def get_adjustments(self, - zero_qtr_data, - requested_qtr_data, - last_per_qtr, - dates, - assets, - columns): - """ - Creates an AdjustedArray from the given estimates data for the given - dates. - - Parameters - ---------- - zero_qtr_data : pd.DataFrame - The 'time zero' data for each date/sid. - zero_qtr_data : pd.DataFrame - The data for the requested quarter. 
- last_per_qtr : pd.DataFrame - The latest estimate known per sid per date per quarter with - dates as the index and normalized quarter and sid in the columns - MultiIndex; allows easy access to the timeline of estimates - across all dates for a sid for a particular quarter. - dates : pd.DatetimeIndex - The calendar dates for which estimates data is requested. - column_name : string - The name of the column for which the AdjustedArray is being - computed. - column : BoundColumn - The column for which the AdjustedArray is being computed. - mask : np.array - Mask array of dimensions len(dates) X len(assets). - assets : pd.Int64Index - An index of all the assets from the raw data. - - Returns - ------- - adjusted_array : AdjustedArray - The array of data and overwrites for the given column. - """ - col_to_adjustments = defaultdict(dict) - # We no longer need this in the index, but we do need it as a column - # to calculate adjustments. - zero_qtr_data = zero_qtr_data.reset_index(NORMALIZED_QUARTERS) - - for sid_idx, sid in enumerate(assets): - zero_qtr_sid_data = zero_qtr_data[ - zero_qtr_data.index.get_level_values(SID_FIELD_NAME) == sid - ] - # Determine where quarters are changing for this sid. - qtr_shifts = zero_qtr_sid_data[ - zero_qtr_sid_data[NORMALIZED_QUARTERS] != - zero_qtr_sid_data[NORMALIZED_QUARTERS].shift(1) - ] - # On dates where we don't have any information about quarters, - # we will get nulls, and each of these will be interpreted as - # quarter shifts. We need to remove these here. - qtr_shifts = qtr_shifts[ - qtr_shifts[NORMALIZED_QUARTERS].notnull() - ] - # For the given sid, determine which quarters we have estimates - # for. - qtrs_with_estimates_for_sid = last_per_qtr.xs( - sid, axis=1, level=SID_FIELD_NAME - ).groupby(axis=1, level=1).first().columns.values - for row_indexer in list(qtr_shifts.index): - # Find the starting index of the quarter that comes right - # after this row. 
This isn't the starting index of the - # requested quarter, but simply the date we cross over into a - # new quarter. - next_qtr_start_idx = dates.searchsorted( - zero_qtr_data.loc[ - row_indexer - ][EVENT_DATE_FIELD_NAME], - side='left' - if isinstance(self, PreviousQuartersEstimatesLoader) - else 'right' - ) - self.create_overwrite_for_quarter( - col_to_adjustments, - next_qtr_start_idx, - dates, - last_per_qtr, - qtrs_with_estimates_for_sid, - requested_qtr_data, - sid, - sid_idx, - columns, - ) - return col_to_adjustments - - def create_overwrite_for_quarter(self, - col_to_adjustments, - next_qtr_start_idx, - dates, - last_per_qtr, - quarters_with_estimates_for_sid, - requested_qtr_data, - sid, - sid_idx, - columns): - overwrites_dict = {} - for col in columns: - if col.dtype == datetime64ns_dtype: - overwrites_dict[col] = Datetime641DArrayOverwrite - else: - overwrites_dict[col] = Float641DArrayOverwrite - # Only add adjustments if the next quarter starts somewhere in - # our date index for this sid. Our 'next' quarter can never - # start at index 0; a starting index of 0 means that the next - # quarter's event date was NaT. - if 0 < next_qtr_start_idx < len(dates): - # Find the quarter being requested in the quarter we're - # crossing into. - requested_quarter = requested_qtr_data[ - SHIFTED_NORMALIZED_QTRS - ][sid].iloc[next_qtr_start_idx] - for col in columns: - column_name = self.name_map[col.name] - # If there are estimates for the requested quarter, - # overwrite all values going up to the starting index of - # that quarter with estimates for that quarter. - if requested_quarter in quarters_with_estimates_for_sid: - col_to_adjustments[column_name][next_qtr_start_idx] = \ - self.create_overwrite_for_estimate( - col, - column_name, - last_per_qtr, - next_qtr_start_idx, - overwrites_dict[col], - requested_quarter, - sid, - sid_idx - ) - # There are no estimates for the quarter. 
Overwrite all - # values going up to the starting index of that quarter - # with the missing value for this column. - else: - col_to_adjustments[column_name][next_qtr_start_idx] =\ - self.overwrite_with_null( - col, - last_per_qtr, - next_qtr_start_idx, - overwrites_dict[col], - sid_idx - ) - - def overwrite_with_null(self, - column, - last_per_qtr, - next_qtr_start_idx, - overwrite, - sid_idx): - return [overwrite( - 0, - next_qtr_start_idx - 1, - sid_idx, - sid_idx, - np.full( - len( - last_per_qtr.index[:next_qtr_start_idx] - ), - column.missing_value, - dtype=column.dtype - ))] - - def load_adjusted_array(self, columns, dates, assets, mask): - # Separate out getting the columns' datasets and the datasets' - # num_quarters attributes to ensure that we're catching the right - # AttributeError. - col_to_datasets = {col: col.dataset for col in columns} - try: - groups = groupby(lambda col: col_to_datasets[col].num_quarters, - col_to_datasets) - except AttributeError: - raise AttributeError("Datasets loaded via the " - "QuarterEstimatesLoader must define a " - "`num_quarters` attribute that defines how " - "many quarters out the loader should load " - "the data relative to `dates`.") - if any(num_qtr < 0 for num_qtr in groups): - raise ValueError( - INVALID_NUM_QTRS_MESSAGE % ','.join( - str(qtr) for qtr in groups if qtr < 0 - ) - - ) - out = {} - for num_quarters, columns in groups.items(): - # Determine the last piece of information we know for each column - # on each date in the index for each sid and quarter. - last_per_qtr = last_in_date_group( - self.estimates, dates, assets, reindex=True, - extra_groupers=[NORMALIZED_QUARTERS] - ) - - # Forward fill values for each quarter/sid/dataset column. - ffill_across_cols(last_per_qtr, columns, self.name_map) - # Stack quarter and sid into the index. 
- stacked_last_per_qtr = last_per_qtr.stack([SID_FIELD_NAME, - NORMALIZED_QUARTERS]) - # Set date index name for ease of reference - stacked_last_per_qtr.index.set_names(SIMULTATION_DATES, - level=0, - inplace=True) - # We want to know the most recent/next event relative to each date. - stacked_last_per_qtr = stacked_last_per_qtr.sort( - EVENT_DATE_FIELD_NAME - ) - # Determine which quarter is next/previous for each date. - shifted_qtr_data = self.load_quarters(num_quarters, - stacked_last_per_qtr) - zero_qtr_idx = shifted_qtr_data.index - requested_qtr_idx = shifted_qtr_data.set_index([ - shifted_qtr_data.index.get_level_values( - SIMULTATION_DATES - ), - shifted_qtr_data.index.get_level_values( - SID_FIELD_NAME - ), - shifted_qtr_data[SHIFTED_NORMALIZED_QTRS] - ]).index - requested_qtr_data = self.get_requested_data_for_col( - stacked_last_per_qtr, requested_qtr_idx, dates - ) - - zero_qtr_data = stacked_last_per_qtr.loc[zero_qtr_idx] - - col_to_adjustments = self.get_adjustments(zero_qtr_data, - requested_qtr_data, - last_per_qtr, - dates, - assets, - columns) - for col in columns: - column_name = self.name_map[col.name] - # We may have dropped assets if they never have any data for the - # requested quarter. 
- df = pd.DataFrame(data=requested_qtr_data[column_name], - index=dates, - columns=assets, - dtype=col.dtype) - - out[col] = AdjustedArray( - df.values.astype(col.dtype), - mask, - dict(col_to_adjustments[column_name]), - col.missing_value, - ) - return out - - -class NextQuartersEstimatesLoader(QuarterEstimatesLoader): - def create_overwrite_for_estimate(self, - column, - column_name, - last_per_qtr, - next_qtr_start_idx, - overwrite, - requested_quarter, - sid, - sid_idx): - return [overwrite( - 0, - # overwrite thru last qtr - next_qtr_start_idx - 1, - sid_idx, - sid_idx, - last_per_qtr[ - column_name, - requested_quarter, - sid - ][0:next_qtr_start_idx].values)] - - def load_quarters(self, num_quarters, stacked_last_per_qtr): - """ - Filters for releases that are on or after each simulation date and - determines the next quarter by picking out the upcoming release for - each date in the index. Adda a SHIFTED_NORMALIZED_QTRS column which - contains the requested next quarter for each calendar date and sid. - - Parameters - ---------- - num_quarters : int - Number of quarters to go out in the future. - stacked_last_per_qtr : pd.DataFrame - A DataFrame with index of calendar dates, sid, and normalized - quarters with each row being the latest estimate for the row's - index values, sorted by event date. - - Returns - ------- - next_releases_per_date : pd.DataFrame - A DataFrame with index of calendar dates, sid, and normalized - quarters, keeping only rows with next event information relative to - the index values and with an added column for - SHIFTED_NORMALIZED_QTRS, which contains the requested quarter for - each row. - """ - - # We reset the index here because in pandas3, a groupby on the index - # will set the index to just the items in the groupby, so we will lose - # the normalized quarters. 
- next_releases_per_date = stacked_last_per_qtr.loc[ - stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >= - stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES) - ].reset_index(NORMALIZED_QUARTERS).groupby( - level=[SIMULTATION_DATES, SID_FIELD_NAME] - ).nth(0).set_index(NORMALIZED_QUARTERS, append=True) - next_releases_per_date[ - SHIFTED_NORMALIZED_QTRS - ] = next_releases_per_date.index.get_level_values( - NORMALIZED_QUARTERS - ) + (num_quarters - 1) - return next_releases_per_date - - -class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader): - def create_overwrite_for_estimate(self, - column, - column_name, - last_per_qtr, - next_qtr_start_idx, - overwrite, - requested_quarter, - sid, - sid_idx): - return self.overwrite_with_null(column, - last_per_qtr, - next_qtr_start_idx, - overwrite, - sid_idx) - - def load_quarters(self, num_quarters, stacked_last_per_qtr): - """ - Filters for releases that are on or after each simulation date and - determines the previous quarter by picking out the most recent - release relative to each date in the index. Adds a - SHIFTED_NORMALIZED_QTRS column which contains the requested previous - quarter for each calendar date and sid. - - Parameters - ---------- - num_quarters : int - Number of quarters to go out in the past. - stacked_last_per_qtr : pd.DataFrame - A DataFrame with index of calendar dates, sid, and normalized - quarters with each row being the latest estimate for the row's - index values, sorted by event date. - - Returns - ------- - next_releases_per_date : pd.DataFrame - A DataFrame with index of calendar dates, sid, and normalized - quarters, keeping only rows with have a previous event relative - to the index values and with an added column for - SHIFTED_NORMALIZED_QTRS, which contains the requested quarter for - each row. - """ - - # We reset the index here because in pandas3, a groupby on the index - # will set the index to just the items in the groupby, so we will lose - # the normalized quarters. 
- previous_releases_per_date = stacked_last_per_qtr.loc[ - stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <= - stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES) - ].reset_index(NORMALIZED_QUARTERS).groupby( - level=[SIMULTATION_DATES, SID_FIELD_NAME] - ).nth(-1).set_index(NORMALIZED_QUARTERS, append=True) - previous_releases_per_date[ - SHIFTED_NORMALIZED_QTRS - ] = previous_releases_per_date.index.get_level_values( - NORMALIZED_QUARTERS - ) - (num_quarters - 1) - return previous_releases_per_date diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 125dacfb..a81df8fe 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -320,8 +320,11 @@ def last_in_date_group(df, dates, assets, reindex=True, have_sids=True, ).last() # For the number of things that we're grouping by (except TS), unstack - # the df - last_in_group = last_in_group.unstack(list(range(-1, -len(idx), -1))) + # the df. Done this way because of an unresolved pandas bug whereby + # passing a list of levels with mixed dtypes to unstack causes the + # resulting DataFrame to have all object-type columns. 
+ for _ in range(len(idx) - 1): + last_in_group = last_in_group.unstack(-1) if reindex: if have_sids: diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index 1ce01627..f0e2aaa5 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -34,14 +34,13 @@ from ..finance.trading import TradingEnvironment from ..utils import factory from ..utils.classproperty import classproperty from ..utils.final import FinalMeta, final -from .core import (tmp_asset_finder, make_simple_equity_info) +from .core import tmp_asset_finder, make_simple_equity_info from zipline.assets import Equity, Future from zipline.pipeline import SimplePipelineEngine from zipline.pipeline.loaders.testing import make_seeded_random_loader from zipline.utils.calendars import ( get_calendar, - register_calendar -) + register_calendar) class ZiplineTestCase(with_metaclass(FinalMeta, TestCase)):