PERF: vectorize earnings estimates

This commit is contained in:
Joe Jevnik
2016-09-19 21:13:28 -04:00
committed by Maya Tydykov
parent cbd9bd068c
commit 25b38520fa
2 changed files with 156 additions and 125 deletions
+151 -124
View File
@@ -1,5 +1,7 @@
from collections import defaultdict
from abc import abstractmethod, abstractproperty
from collections import defaultdict
import numpy as np
import pandas as pd
from six import viewvalues
from toolz import groupby
@@ -136,11 +138,15 @@ class EarningsEstimatesLoader(PipelineLoader):
self.estimates[FISCAL_YEAR_FIELD_NAME],
self.estimates[FISCAL_QUARTER_FIELD_NAME],
)
self.array_overwrites_dict = {datetime64ns_dtype:
Datetime641DArrayOverwrite,
float64_dtype: Float641DArrayOverwrite}
self.scalar_overwrites_dict = {datetime64ns_dtype: Datetime64Overwrite,
float64_dtype: Float64Overwrite}
self.array_overwrites_dict = {
datetime64ns_dtype: Datetime641DArrayOverwrite,
float64_dtype: Float641DArrayOverwrite,
}
self.scalar_overwrites_dict = {
datetime64ns_dtype: Datetime64Overwrite,
float64_dtype: Float64Overwrite,
}
self.name_map = name_map
@@ -167,18 +173,25 @@ class EarningsEstimatesLoader(PipelineLoader):
def searchsorted_side(self):
return NotImplementedError('searchsorted_side')
def get_requested_quarter_data(self, stacked_last_per_qtr, idx, dates):
def get_requested_quarter_data(self,
zero_qtr_data,
zeroth_quarter_idx,
stacked_last_per_qtr,
num_quarters,
dates):
"""
Selects the requested data for each date.
Parameters
----------
zero_qtr_data : pd.DataFrame
The 'time zero' data for each calendar date per sid.
zeroth_quarter_idx : pd.Index
An index of calendar dates, sid, and normalized quarters, for only
the rows that have a next or previous earnings estimate.
stacked_last_per_qtr : pd.DataFrame
The latest estimate known with the dates, normalized quarter, and
The latest estimate known with the dates, normalized quarter, and
sid as the index.
idx : pd.MultiIndex
The index of the row of the requested quarter from each date for
each sid.
dates : pd.DatetimeIndex
The calendar dates for which estimates data is requested.
@@ -189,13 +202,27 @@ class EarningsEstimatesLoader(PipelineLoader):
for all columns; `dates` are the index and columns are a MultiIndex
with sids at the top level and the dataset columns on the bottom.
"""
requested_qtr_data = stacked_last_per_qtr.loc[idx]
# We've lost the index names when doing `loc`, so set them here.
requested_qtr_data.index = requested_qtr_data.index.set_names(
idx.names
zero_qtr_data_idx = zero_qtr_data.index
requested_qtr_idx = pd.MultiIndex.from_arrays(
[
zero_qtr_data_idx.get_level_values(0),
zero_qtr_data_idx.get_level_values(1),
self.get_shifted_qtrs(
zeroth_quarter_idx.get_level_values(
NORMALIZED_QUARTERS,
),
num_quarters,
),
],
names=[
zero_qtr_data_idx.names[0],
zero_qtr_data_idx.names[1],
SHIFTED_NORMALIZED_QTRS,
],
)
requested_qtr_data = stacked_last_per_qtr.loc[requested_qtr_idx]
requested_qtr_data = requested_qtr_data.reset_index(
SHIFTED_NORMALIZED_QTRS
SHIFTED_NORMALIZED_QTRS,
)
# Calculate the actual year/quarter being requested and add those in
# as columns.
@@ -244,54 +271,44 @@ class EarningsEstimatesLoader(PipelineLoader):
col_to_overwrites = defaultdict(dict)
# We no longer need NORMALIZED_QUARTERS in the index, but we do need it
# as a column to calculate adjustments.
zero_qtr_data = zero_qtr_data.reset_index(NORMALIZED_QUARTERS)
zero_qtr_data = zero_qtr_data.reset_index(level=NORMALIZED_QUARTERS)
zero_qtr_data.sort_index(inplace=True)
for sid_idx, sid in enumerate(assets):
zero_qtr_sid_data = zero_qtr_data[
zero_qtr_data.index.get_level_values(SID_FIELD_NAME) == sid
quarter_shifts = zero_qtr_data.loc[
zero_qtr_data.index[
zero_qtr_data.groupby(level=SID_FIELD_NAME)[
NORMALIZED_QUARTERS
].diff().nonzero()
]
# Determine where quarters are changing for this sid.
qtr_shifts = zero_qtr_sid_data[
zero_qtr_sid_data[NORMALIZED_QUARTERS] !=
zero_qtr_sid_data[NORMALIZED_QUARTERS].shift(1)
]
# On dates where we don't have any information about quarters,
# we will get nulls, and each of these will be interpreted as
# quarter shifts. We need to remove these here.
qtr_shifts = qtr_shifts[
qtr_shifts[NORMALIZED_QUARTERS].notnull()
]
# For the given sid, determine which quarters we have estimates
# for.
qtrs_with_estimates_for_sid = last_per_qtr.xs(
sid, axis=1, level=SID_FIELD_NAME
).groupby(axis=1, level=1).first().columns.values
for row_indexer in list(qtr_shifts.index):
# Find the starting index of the quarter that comes right
# after this row. This isn't the starting index of the
# requested quarter, but simply the date we cross over into a
# new quarter.
next_qtr_start_idx = dates.searchsorted(
zero_qtr_data.loc[
row_indexer
][EVENT_DATE_FIELD_NAME],
side=self.searchsorted_side
)
# Only add adjustments if the next quarter starts somewhere in
# our date index for this sid. Our 'next' quarter can never
# start at index 0; a starting index of 0 means that the next
# quarter's event date was NaT.
if 0 < next_qtr_start_idx < len(dates):
]
sid_to_idx = dict(zip(assets, range(len(assets))))
def collect_adjustments(group):
next_qtr_start_indices = dates.searchsorted(
group[EVENT_DATE_FIELD_NAME].values,
side=self.searchsorted_side,
)
sid = int(group.name)
qtrs_with_estimates = group[NORMALIZED_QUARTERS].values
for idx in next_qtr_start_indices:
if 0 < idx < len(dates):
# Only add adjustments if the next quarter starts somewhere
# in our date index for this sid. Our 'next' quarter can
# never start at index 0; a starting index of 0 means that
# the next quarter's event date was NaT.
self.create_overwrite_for_quarter(
col_to_overwrites,
next_qtr_start_idx,
idx,
last_per_qtr,
qtrs_with_estimates_for_sid,
qtrs_with_estimates,
requested_qtr_data,
sid,
sid_idx,
sid_to_idx[sid],
columns,
)
quarter_shifts.groupby(level=SID_FIELD_NAME).apply(collect_adjustments)
return col_to_overwrites
def create_overwrite_for_quarter(self,
@@ -335,16 +352,16 @@ class EarningsEstimatesLoader(PipelineLoader):
# Find the quarter being requested in the quarter we're
# crossing into.
requested_quarter = requested_qtr_data[
SHIFTED_NORMALIZED_QTRS
][sid].iloc[next_qtr_start_idx]
SHIFTED_NORMALIZED_QTRS, sid,
].iloc[next_qtr_start_idx]
for col in columns:
column_name = self.name_map[col.name]
# If there are estimates for the requested quarter,
# overwrite all values going up to the starting index of
# that quarter with estimates for that quarter.
if requested_quarter in quarters_with_estimates_for_sid:
col_to_overwrites[column_name][next_qtr_start_idx] = \
[self.create_overwrite_for_estimate(
col_to_overwrites[column_name][next_qtr_start_idx] = [
self.create_overwrite_for_estimate(
col,
column_name,
last_per_qtr,
@@ -352,18 +369,20 @@ class EarningsEstimatesLoader(PipelineLoader):
requested_quarter,
sid,
sid_idx
)]
),
]
# There are no estimates for the quarter. Overwrite all
# values going up to the starting index of that quarter
# with the missing value for this column.
else:
col_to_overwrites[column_name][next_qtr_start_idx] =\
[self.overwrite_with_null(
col_to_overwrites[column_name][next_qtr_start_idx] = [
self.overwrite_with_null(
col,
last_per_qtr.index,
next_qtr_start_idx,
sid_idx
)]
),
]
def overwrite_with_null(self,
column,
@@ -403,34 +422,23 @@ class EarningsEstimatesLoader(PipelineLoader):
# To optimize performance, only work below on assets that are
# actually in the raw data.
assets_with_data = set(assets) & set(self.estimates[SID_FIELD_NAME])
last_per_qtr, stacked_last_per_qtr = self.get_last_data_per_qtr(
assets_with_data,
columns,
dates
)
# Determine which quarter is immediately next/previous for each
# date.
zeroth_quarter_idx = self.get_zeroth_quarter_idx(stacked_last_per_qtr)
zero_qtr_data = stacked_last_per_qtr.loc[zeroth_quarter_idx]
for num_quarters, columns in groups.items():
last_per_qtr, stacked_last_per_qtr = self.get_last_data_per_qtr(
assets_with_data, columns, dates
)
# Determine which quarter is immediately next/previous for each
# date.
zeroth_quarter_idx = self.get_zeroth_quarter_idx(
num_quarters, stacked_last_per_qtr
)
zero_qtr_data = stacked_last_per_qtr.loc[zeroth_quarter_idx]
# Doing it this way because creating a MultiIndex from scratch
# results in being unable to unstack sids because of duplicate
# values, even though the MultiIndex is created with the same
# exact values as below - possible pandas bug.
requested_qtr_idx = zero_qtr_data.reset_index(
NORMALIZED_QUARTERS
).set_index(
self.get_shifted_qtrs(
zeroth_quarter_idx.get_level_values(NORMALIZED_QUARTERS),
num_quarters
),
append=True
).index
requested_qtr_idx = requested_qtr_idx.rename(
SHIFTED_NORMALIZED_QTRS, -1
)
requested_qtr_data = self.get_requested_quarter_data(
stacked_last_per_qtr, requested_qtr_idx, dates
zero_qtr_data,
zeroth_quarter_idx,
stacked_last_per_qtr,
num_quarters,
dates,
)
# Calculate all adjustments for the given quarter and accumulate
@@ -441,17 +449,33 @@ class EarningsEstimatesLoader(PipelineLoader):
dates,
assets_with_data,
columns)
# Lookup the asset indexer once, this is so we can reindex
# the assets returned into the assets requested for each column.
# This depends on the fact that our column multiindex has the same
# sids for each field. This allows us to do the lookup once on
# level 1 instead of doing the lookup each time per value in
# level 0.
asset_indexer = assets.get_indexer_for(
requested_qtr_data.columns.levels[1],
)
for col in columns:
column_name = self.name_map[col.name]
# We may have dropped assets if they never have any data for
# the requested quarter.
df = pd.DataFrame(data=requested_qtr_data[column_name],
index=dates,
columns=assets,
dtype=col.dtype)
# allocate the empty output with the correct missing value
output_array = np.full(
(len(dates), len(assets)),
col.missing_value,
dtype=col.dtype,
)
# overwrite the missing value with values from the computed
# data
output_array[
:,
asset_indexer,
] = requested_qtr_data[column_name].values
out[col] = AdjustedArray(
df.values.astype(col.dtype),
output_array,
mask,
dict(col_to_adjustments[column_name]),
col.missing_value,
@@ -487,20 +511,26 @@ class EarningsEstimatesLoader(PipelineLoader):
# self.estimates.columns, normalized_quarters, sid], where each cell
# contains the latest data for that day.
last_per_qtr = last_in_date_group(
self.estimates, dates, assets_with_data, reindex=True,
extra_groupers=[NORMALIZED_QUARTERS]
self.estimates,
dates,
assets_with_data,
reindex=True,
extra_groupers=[NORMALIZED_QUARTERS],
)
# Forward fill values for each quarter/sid/dataset column.
ffill_across_cols(last_per_qtr, columns, self.name_map)
# Stack quarter and sid into the index.
stacked_last_per_qtr = last_per_qtr.stack([SID_FIELD_NAME,
NORMALIZED_QUARTERS])
stacked_last_per_qtr = last_per_qtr.stack(
[SID_FIELD_NAME, NORMALIZED_QUARTERS],
)
# Set date index name for ease of reference
stacked_last_per_qtr.index.set_names(SIMULTATION_DATES,
level=0,
inplace=True)
stacked_last_per_qtr.index.set_names(
SIMULTATION_DATES,
level=0,
inplace=True,
)
stacked_last_per_qtr = stacked_last_per_qtr.sort(
EVENT_DATE_FIELD_NAME
EVENT_DATE_FIELD_NAME,
)
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] = pd.to_datetime(
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME]
@@ -509,9 +539,7 @@ class EarningsEstimatesLoader(PipelineLoader):
class NextEarningsEstimatesLoader(EarningsEstimatesLoader):
@property
def searchsorted_side(self):
return 'right'
searchsorted_side = 'right'
def create_overwrite_for_estimate(self,
column,
@@ -530,13 +558,14 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader):
last_per_qtr[
column_name,
requested_quarter,
sid
][:next_qtr_start_idx].values)
sid,
].values[:next_qtr_start_idx],
)
def get_shifted_qtrs(self, zero_qtrs, num_quarters):
return zero_qtrs + (num_quarters - 1)
def get_zeroth_quarter_idx(self, num_quarters, stacked_last_per_qtr):
def get_zeroth_quarter_idx(self, stacked_last_per_qtr):
"""
Filters for releases that are on or after each simulation date and
determines the next quarter by picking out the upcoming release for
@@ -544,8 +573,6 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader):
Parameters
----------
num_quarters : int
Number of quarters to go out in the future.
stacked_last_per_qtr : pd.DataFrame
A DataFrame with index of calendar dates, sid, and normalized
quarters with each row being the latest estimate for the row's
@@ -562,15 +589,14 @@ class NextEarningsEstimatesLoader(EarningsEstimatesLoader):
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >=
stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES)
].groupby(
level=[SIMULTATION_DATES, SID_FIELD_NAME], as_index=False
level=[SIMULTATION_DATES, SID_FIELD_NAME],
as_index=False,
).nth(0)
return next_releases_per_date.index
class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader):
@property
def searchsorted_side(self):
return 'left'
searchsorted_side = 'left'
def create_overwrite_for_estimate(self,
column,
@@ -580,15 +606,17 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader):
requested_quarter,
sid,
sid_idx):
return self.overwrite_with_null(column,
dates,
next_qtr_start_idx,
sid_idx)
return self.overwrite_with_null(
column,
dates,
next_qtr_start_idx,
sid_idx,
)
def get_shifted_qtrs(self, zero_qtrs, num_quarters):
return zero_qtrs - (num_quarters - 1)
def get_zeroth_quarter_idx(self, num_quarters, stacked_last_per_qtr):
def get_zeroth_quarter_idx(self, stacked_last_per_qtr):
"""
Filters for releases that are on or after each simulation date and
determines the previous quarter by picking out the most recent
@@ -596,8 +624,6 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader):
Parameters
----------
num_quarters : int
Number of quarters to go out in the past.
stacked_last_per_qtr : pd.DataFrame
A DataFrame with index of calendar dates, sid, and normalized
quarters with each row being the latest estimate for the row's
@@ -614,6 +640,7 @@ class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader):
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <=
stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES)
].groupby(
level=[SIMULTATION_DATES, SID_FIELD_NAME], as_index=False
level=[SIMULTATION_DATES, SID_FIELD_NAME],
as_index=False,
).nth(-1)
return previous_releases_per_date.index
+5 -1
View File
@@ -276,7 +276,11 @@ def check_data_query_args(data_query_time, data_query_tz):
)
def last_in_date_group(df, dates, assets, reindex=True, have_sids=True,
def last_in_date_group(df,
dates,
assets,
reindex=True,
have_sids=True,
extra_groupers=[]):
"""
Determine the last piece of information known on each date in the date