diff --git a/zipline/pipeline/loaders/earnings_estimates.py b/zipline/pipeline/loaders/earnings_estimates.py index 3397d105..6790e2c4 100644 --- a/zipline/pipeline/loaders/earnings_estimates.py +++ b/zipline/pipeline/loaders/earnings_estimates.py @@ -1,5 +1,7 @@ -from collections import defaultdict from abc import abstractmethod, abstractproperty +from collections import defaultdict + +import numpy as np import pandas as pd from six import viewvalues from toolz import groupby @@ -136,11 +138,15 @@ class EarningsEstimatesLoader(PipelineLoader): self.estimates[FISCAL_YEAR_FIELD_NAME], self.estimates[FISCAL_QUARTER_FIELD_NAME], ) - self.array_overwrites_dict = {datetime64ns_dtype: - Datetime641DArrayOverwrite, - float64_dtype: Float641DArrayOverwrite} - self.scalar_overwrites_dict = {datetime64ns_dtype: Datetime64Overwrite, - float64_dtype: Float64Overwrite} + + self.array_overwrites_dict = { + datetime64ns_dtype: Datetime641DArrayOverwrite, + float64_dtype: Float641DArrayOverwrite, + } + self.scalar_overwrites_dict = { + datetime64ns_dtype: Datetime64Overwrite, + float64_dtype: Float64Overwrite, + } self.name_map = name_map @@ -167,18 +173,25 @@ class EarningsEstimatesLoader(PipelineLoader): def searchsorted_side(self): return NotImplementedError('searchsorted_side') - def get_requested_quarter_data(self, stacked_last_per_qtr, idx, dates): + def get_requested_quarter_data(self, + zero_qtr_data, + zeroth_quarter_idx, + stacked_last_per_qtr, + num_quarters, + dates): """ Selects the requested data for each date. Parameters ---------- + zero_qtr_data : pd.DataFrame + The 'time zero' data for each calendar date per sid. + zeroth_quarter_idx : pd.Index + An index of calendar dates, sid, and normalized quarters, for only + the rows that have a next or previous earnings estimate. stacked_last_per_qtr : pd.DataFrame - The latest estimate known with the dates, normalized quarter, and + The latest estimate known with the dates, normalized quarter, and sid as the index. - idx : pd.MultiIndex - The index of the row of the requested quarter from each date for - each sid. dates : pd.DatetimeIndex The calendar dates for which estimates data is requested. @@ -189,13 +202,27 @@ class EarningsEstimatesLoader(PipelineLoader): for all columns; `dates` are the index and columns are a MultiIndex with sids at the top level and the dataset columns on the bottom. """ - requested_qtr_data = stacked_last_per_qtr.loc[idx] - # We've lost the index names when doing `loc`, so set them here. - requested_qtr_data.index = requested_qtr_data.index.set_names( - idx.names + zero_qtr_data_idx = zero_qtr_data.index + requested_qtr_idx = pd.MultiIndex.from_arrays( + [ + zero_qtr_data_idx.get_level_values(0), + zero_qtr_data_idx.get_level_values(1), + self.get_shifted_qtrs( + zeroth_quarter_idx.get_level_values( + NORMALIZED_QUARTERS, + ), + num_quarters, + ), + ], + names=[ + zero_qtr_data_idx.names[0], + zero_qtr_data_idx.names[1], + SHIFTED_NORMALIZED_QTRS, + ], ) + requested_qtr_data = stacked_last_per_qtr.loc[requested_qtr_idx] requested_qtr_data = requested_qtr_data.reset_index( - SHIFTED_NORMALIZED_QTRS + SHIFTED_NORMALIZED_QTRS, ) # Calculate the actual year/quarter being requested and add those in # as columns. @@ -244,54 +271,44 @@ class EarningsEstimatesLoader(PipelineLoader): col_to_overwrites = defaultdict(dict) # We no longer need NORMALIZED_QUARTERS in the index, but we do need it # as a column to calculate adjustments. - zero_qtr_data = zero_qtr_data.reset_index(NORMALIZED_QUARTERS) + zero_qtr_data = zero_qtr_data.reset_index(level=NORMALIZED_QUARTERS) + zero_qtr_data.sort_index(inplace=True) - for sid_idx, sid in enumerate(assets): - zero_qtr_sid_data = zero_qtr_data[ - zero_qtr_data.index.get_level_values(SID_FIELD_NAME) == sid + quarter_shifts = zero_qtr_data.loc[ + zero_qtr_data.index[ + zero_qtr_data.groupby(level=SID_FIELD_NAME)[ + NORMALIZED_QUARTERS + ].diff().nonzero() ] - # Determine where quarters are changing for this sid. - qtr_shifts = zero_qtr_sid_data[ - zero_qtr_sid_data[NORMALIZED_QUARTERS] != - zero_qtr_sid_data[NORMALIZED_QUARTERS].shift(1) - ] - # On dates where we don't have any information about quarters, - # we will get nulls, and each of these will be interpreted as - # quarter shifts. We need to remove these here. - qtr_shifts = qtr_shifts[ - qtr_shifts[NORMALIZED_QUARTERS].notnull() - ] - # For the given sid, determine which quarters we have estimates - # for. - qtrs_with_estimates_for_sid = last_per_qtr.xs( - sid, axis=1, level=SID_FIELD_NAME - ).groupby(axis=1, level=1).first().columns.values - for row_indexer in list(qtr_shifts.index): - # Find the starting index of the quarter that comes right - # after this row. This isn't the starting index of the - # requested quarter, but simply the date we cross over into a - # new quarter. - next_qtr_start_idx = dates.searchsorted( - zero_qtr_data.loc[ - row_indexer - ][EVENT_DATE_FIELD_NAME], - side=self.searchsorted_side - ) - # Only add adjustments if the next quarter starts somewhere in - # our date index for this sid. Our 'next' quarter can never - # start at index 0; a starting index of 0 means that the next - # quarter's event date was NaT. - if 0 < next_qtr_start_idx < len(dates): + ] + + sid_to_idx = dict(zip(assets, range(len(assets)))) + + def collect_adjustments(group): + next_qtr_start_indices = dates.searchsorted( + group[EVENT_DATE_FIELD_NAME].values, + side=self.searchsorted_side, + ) + sid = int(group.name) + qtrs_with_estimates = group[NORMALIZED_QUARTERS].values + for idx in next_qtr_start_indices: + if 0 < idx < len(dates): + # Only add adjustments if the next quarter starts somewhere + # in our date index for this sid. Our 'next' quarter can + # never start at index 0; a starting index of 0 means that + # the next quarter's event date was NaT. self.create_overwrite_for_quarter( col_to_overwrites, - next_qtr_start_idx, + idx, last_per_qtr, - qtrs_with_estimates_for_sid, + qtrs_with_estimates, requested_qtr_data, sid, - sid_idx, + sid_to_idx[sid], columns, ) + + quarter_shifts.groupby(level=SID_FIELD_NAME).apply(collect_adjustments) return col_to_overwrites def create_overwrite_for_quarter(self, @@ -335,16 +352,16 @@ class EarningsEstimatesLoader(PipelineLoader): # Find the quarter being requested in the quarter we're # crossing into. requested_quarter = requested_qtr_data[ - SHIFTED_NORMALIZED_QTRS - ][sid].iloc[next_qtr_start_idx] + SHIFTED_NORMALIZED_QTRS, sid, + ].iloc[next_qtr_start_idx] for col in columns: column_name = self.name_map[col.name] # If there are estimates for the requested quarter, # overwrite all values going up to the starting index of # that quarter with estimates for that quarter. if requested_quarter in quarters_with_estimates_for_sid: - col_to_overwrites[column_name][next_qtr_start_idx] = \ - [self.create_overwrite_for_estimate( + col_to_overwrites[column_name][next_qtr_start_idx] = [ + self.create_overwrite_for_estimate( col, column_name, last_per_qtr, @@ -352,18 +369,20 @@ class EarningsEstimatesLoader(PipelineLoader): requested_quarter, sid, sid_idx - )] + ), + ] # There are no estimates for the quarter. Overwrite all # values going up to the starting index of that quarter # with the missing value for this column. else: - col_to_overwrites[column_name][next_qtr_start_idx] =\ - [self.overwrite_with_null( + col_to_overwrites[column_name][next_qtr_start_idx] = [ + self.overwrite_with_null( col, last_per_qtr.index, next_qtr_start_idx, sid_idx - )] + ), + ] def overwrite_with_null(self, column, @@ -403,34 +422,23 @@ class EarningsEstimatesLoader(PipelineLoader): # To optimize performance, only work below on assets that are # actually in the raw data. assets_with_data = set(assets) & set(self.estimates[SID_FIELD_NAME]) + last_per_qtr, stacked_last_per_qtr = self.get_last_data_per_qtr( + assets_with_data, + columns, + dates + ) + # Determine which quarter is immediately next/previous for each + # date. + zeroth_quarter_idx = self.get_zeroth_quarter_idx(stacked_last_per_qtr) + zero_qtr_data = stacked_last_per_qtr.loc[zeroth_quarter_idx] + for num_quarters, columns in groups.items(): - last_per_qtr, stacked_last_per_qtr = self.get_last_data_per_qtr( - assets_with_data, columns, dates - ) - # Determine which quarter is immediately next/previous for each - # date. - zeroth_quarter_idx = self.get_zeroth_quarter_idx( - num_quarters, stacked_last_per_qtr - ) - zero_qtr_data = stacked_last_per_qtr.loc[zeroth_quarter_idx] - # Doing it this way because creating a MultiIndex from scratch - # results in being unable to unstack sids because of duplicate - # values, even though the MultiIndex is created with the same - # exact values as below - possible pandas bug. - requested_qtr_idx = zero_qtr_data.reset_index( - NORMALIZED_QUARTERS - ).set_index( - self.get_shifted_qtrs( - zeroth_quarter_idx.get_level_values(NORMALIZED_QUARTERS), - num_quarters - ), - append=True - ).index - requested_qtr_idx = requested_qtr_idx.rename( - SHIFTED_NORMALIZED_QTRS, -1 - ) requested_qtr_data = self.get_requested_quarter_data( - stacked_last_per_qtr, requested_qtr_idx, dates + zero_qtr_data, + zeroth_quarter_idx, + stacked_last_per_qtr, + num_quarters, + dates, ) # Calculate all adjustments for the given quarter and accumulate @@ -441,17 +449,33 @@ class EarningsEstimatesLoader(PipelineLoader): dates, assets_with_data, columns) + + # Lookup the asset indexer once, this is so we can reindex + # the assets returned into the assets requested for each column. + # This depends on the fact that our column multiindex has the same + # sids for each field. This allows us to do the lookup once on + # level 1 instead of doing the lookup each time per value in + # level 0. + asset_indexer = assets.get_indexer_for( + requested_qtr_data.columns.levels[1], + ) for col in columns: column_name = self.name_map[col.name] - # We may have dropped assets if they never have any data for - # the requested quarter. - df = pd.DataFrame(data=requested_qtr_data[column_name], - index=dates, - columns=assets, - dtype=col.dtype) + # allocate the empty output with the correct missing value + output_array = np.full( + (len(dates), len(assets)), + col.missing_value, + dtype=col.dtype, + ) + # overwrite the missing value with values from the computed + # data + output_array[ + :, + asset_indexer, + ] = requested_qtr_data[column_name].values out[col] = AdjustedArray( - df.values.astype(col.dtype), + output_array, mask, dict(col_to_adjustments[column_name]), col.missing_value, @@ -487,20 +511,26 @@ class EarningsEstimatesLoader(PipelineLoader): # self.estimates.columns, normalized_quarters, sid], where each cell # contains the latest data for that day. last_per_qtr = last_in_date_group( - self.estimates, dates, assets_with_data, reindex=True, - extra_groupers=[NORMALIZED_QUARTERS] + self.estimates, + dates, + assets_with_data, + reindex=True, + extra_groupers=[NORMALIZED_QUARTERS], ) # Forward fill values for each quarter/sid/dataset column. ffill_across_cols(last_per_qtr, columns, self.name_map) # Stack quarter and sid into the index. - stacked_last_per_qtr = last_per_qtr.stack([SID_FIELD_NAME, - NORMALIZED_QUARTERS]) + stacked_last_per_qtr = last_per_qtr.stack( + [SID_FIELD_NAME, NORMALIZED_QUARTERS], + ) # Set date index name for ease of reference - stacked_last_per_qtr.index.set_names(SIMULTATION_DATES, - level=0, - inplace=True) + stacked_last_per_qtr.index.set_names( + SIMULTATION_DATES, + level=0, + inplace=True, + ) stacked_last_per_qtr = stacked_last_per_qtr.sort( - EVENT_DATE_FIELD_NAME + EVENT_DATE_FIELD_NAME, ) stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] = pd.to_datetime( stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] @@ -509,9 +539,7 @@ class EarningsEstimatesLoader(PipelineLoader): class NextEarningsEstimatesLoader(EarningsEstimatesLoader): - @property - def searchsorted_side(self): - return 'right' + searchsorted_side = 'right' def create_overwrite_for_estimate(self, column, @@ -530,13 +558,14 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader): last_per_qtr[ column_name, requested_quarter, - sid - ][:next_qtr_start_idx].values) + sid, + ].values[:next_qtr_start_idx], + ) def get_shifted_qtrs(self, zero_qtrs, num_quarters): return zero_qtrs + (num_quarters - 1) - def get_zeroth_quarter_idx(self, num_quarters, stacked_last_per_qtr): + def get_zeroth_quarter_idx(self, stacked_last_per_qtr): """ Filters for releases that are on or after each simulation date and determines the next quarter by picking out the upcoming release for @@ -544,8 +573,6 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader): Parameters ---------- - num_quarters : int - Number of quarters to go out in the future. stacked_last_per_qtr : pd.DataFrame A DataFrame with index of calendar dates, sid, and normalized quarters with each row being the latest estimate for the row's @@ -562,15 +589,14 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader): stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >= stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES) ].groupby( - level=[SIMULTATION_DATES, SID_FIELD_NAME], as_index=False + level=[SIMULTATION_DATES, SID_FIELD_NAME], + as_index=False, ).nth(0) return next_releases_per_date.index class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): - @property - def searchsorted_side(self): - return 'left' + searchsorted_side = 'left' def create_overwrite_for_estimate(self, column, @@ -580,15 +606,17 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): requested_quarter, sid, sid_idx): - return self.overwrite_with_null(column, - dates, - next_qtr_start_idx, - sid_idx) + return self.overwrite_with_null( + column, + dates, + next_qtr_start_idx, + sid_idx, + ) def get_shifted_qtrs(self, zero_qtrs, num_quarters): return zero_qtrs - (num_quarters - 1) - def get_zeroth_quarter_idx(self, num_quarters, stacked_last_per_qtr): + def get_zeroth_quarter_idx(self, stacked_last_per_qtr): """ Filters for releases that are on or after each simulation date and determines the previous quarter by picking out the most recent @@ -596,8 +624,6 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): Parameters ---------- - num_quarters : int - Number of quarters to go out in the past. stacked_last_per_qtr : pd.DataFrame A DataFrame with index of calendar dates, sid, and normalized quarters with each row being the latest estimate for the row's @@ -614,6 +640,7 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader): stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <= stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES) ].groupby( - level=[SIMULTATION_DATES, SID_FIELD_NAME], as_index=False + level=[SIMULTATION_DATES, SID_FIELD_NAME], + as_index=False, ).nth(-1) return previous_releases_per_date.index diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index a81df8fe..028da74c 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -276,7 +276,11 @@ def check_data_query_args(data_query_time, data_query_tz): ) -def last_in_date_group(df, dates, assets, reindex=True, have_sids=True, +def last_in_date_group(df, + dates, + assets, + reindex=True, + have_sids=True, extra_groupers=[]): """ Determine the last piece of information known on each date in the date