diff --git a/tests/pipeline/test_quarters_estimates.py b/tests/pipeline/test_quarters_estimates.py index 02666964..2328f965 100644 --- a/tests/pipeline/test_quarters_estimates.py +++ b/tests/pipeline/test_quarters_estimates.py @@ -1,15 +1,24 @@ -from itertools import product +import itertools import numpy as np import pandas as pd +from pandas.util.testing import assert_series_equal from zipline.pipeline import SimplePipelineEngine, Pipeline from zipline.pipeline.data import DataSet, Column -from zipline.pipeline.loaders.quarter_estimates import \ - NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader +from zipline.pipeline.loaders.quarter_estimates import ( + NextQuartersEstimatesLoader, + PreviousQuartersEstimatesLoader +) +from zipline.pipeline.loaders.quarter_estimates import ( + calc_forward_shift, + calc_backward_shift +) from zipline.testing import ZiplineTestCase from zipline.testing.fixtures import WithAssetFinder, WithTradingSessions from zipline.testing.predicates import assert_equal from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype +import line_profiler +prof = line_profiler.LineProfiler() class Estimates(DataSet): @@ -23,70 +32,106 @@ class Estimates(DataSet): def QuartersEstimates(num_qtr): class QtrEstimates(Estimates): num_quarters = num_qtr - name=Estimates + name = Estimates return QtrEstimates -# Final release dates never change +# Final release dates never change. The quarters have very tight date ranges +# in order to reduce the number of dates we need to iterate through when +# testing. releases = pd.DataFrame({ - 'sid': [1, 1], - 'timestamp': [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-4-20')], - 'event_date': [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-04-20')], + 'timestamp': [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')], + 'event_date': [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')], 'estimate': [0.5, 0.8], 'value': [0.6, 0.9], - 'fiscal_quarter': [1, 2], - 'fiscal_year': [2015, 2015] + 'fiscal_quarter': [1.0, 2.0], + 'fiscal_year': [2015.0, 2015.0] }) +q1_knowledge_dates = [pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-04'), + pd.Timestamp('2015-01-08'), pd.Timestamp('2015-01-12')] +q2_knowledge_dates = [pd.Timestamp('2015-01-16'), pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-24'), pd.Timestamp('2015-01-28')] +# We want to model the possibility of an estimate predicting a release date +# that gets shifted forward/backward. +q1_release_dates = [pd.Timestamp('2015-01-13'), pd.Timestamp('2015-01-15')] +q2_release_dates = [pd.Timestamp('2015-01-28'), pd.Timestamp('2015-01-30')] estimates = pd.DataFrame({ - 'sid': [1, 1, 1, 1], - 'timestamp': [pd.Timestamp('2015-01-02'), - pd.Timestamp('2015-01-10'), - pd.Timestamp('2015-04-02'), - pd.Timestamp('2015-4-10')], - 'event_date': [pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-01-20'), - pd.Timestamp('2015-04-20'), - pd.Timestamp('2015-04-20')], 'estimate': [.1, .2, .3, .4], 'value': [np.NaN, np.NaN, np.NaN, np.NaN], - 'fiscal_quarter': [1, 1, 2, 2], - 'fiscal_year': [2015, 2015, 2015, 2015] + 'fiscal_quarter': [1.0, 1.0, 2.0, 2.0], + 'fiscal_year': [2015.0, 2015.0, 2015.0, 2015.0] }) -events = pd.concat([releases, estimates]) + +def gen_estimates(): + sid_estimates = [] + sid_releases = [] + release_dates = list(itertools.product(q1_release_dates, q2_release_dates)) + knowledge_permutations = list(itertools.permutations(q1_knowledge_dates + + q2_knowledge_dates, + 4)) + all_permutations = itertools.product(knowledge_permutations, + release_dates) + for sid, ((q1e1, q1e2, q2e1, q2e2), (rd1, rd2)) in enumerate( + all_permutations): + # We're assuming that estimates must come before the relevant release. + if q1e1 < q1e2 and q2e1 < q2e2 and q1e1 < rd1 and q1e2 < \ + rd2: + sid_estimate = estimates.copy(True) + sid_estimate['timestamp'] = [q1e1, q1e2, q2e1, q2e2] + sid_estimate['event_date'] = [rd1]*2 + [rd2] * 2 + sid_estimate['sid'] = sid + sid_estimates += [sid_estimate] + sid_release = releases.copy(True) + sid_release['sid'] = sid_estimate['sid'] + sid_releases += [sid_release] + + return pd.concat(sid_estimates + sid_releases).reset_index(drop=True) -class NextEstimateTestCase(WithAssetFinder, - WithTradingSessions, - ZiplineTestCase): - START_DATE = pd.Timestamp('2015-01-01') - END_DATE = pd.Timestamp('2015-04-30') +class EstimateTestCase(WithAssetFinder, + WithTradingSessions, + ZiplineTestCase): + START_DATE = pd.Timestamp('2014-12-28') + END_DATE = pd.Timestamp('2015-02-03') + @classmethod + def make_loader(cls, events, columns): + pass + + @classmethod + def init_class_fixtures(cls): + cls.events = gen_estimates() + cls.sids = cls.events['sid'].unique() + cls.columns = { + Estimates.estimate: 'estimate', + Estimates.event_date: 'event_date', + Estimates.fiscal_quarter: 'fiscal_quarter', + Estimates.fiscal_year: 'fiscal_year', + Estimates.value: 'value', + } + cls.loader = cls.make_loader( + events=cls.events, + columns=cls.columns + ) + cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique()) + cls.ASSET_FINDER_EQUITY_SYMBOLS = [ + 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS + ] + super(EstimateTestCase, cls).init_class_fixtures() + + +class NextEstimateTestCase(EstimateTestCase): @classmethod def make_loader(cls, events, columns): return NextQuartersEstimatesLoader(events, columns) - @classmethod - def init_class_fixtures(cls): - cls.events = events - cls.columns = { - Estimates.estimate: 'estimate', - Estimates.event_date: 'event_date', - Estimates.fiscal_quarter: 'fiscal_quarter', - Estimates.fiscal_year: 'fiscal_year', - Estimates.value: 'value', - } - cls.loader = cls.make_loader( - events=cls.events, - columns=cls.columns - ) - cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique()) - cls.ASSET_FINDER_EQUITY_SYMBOLS = [ - 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS - ] - super(NextEstimateTestCase, cls).init_class_fixtures() - - def test_regular(self): + #@profile + def test_next_estimates(self): + """ + The goal of this test is to make sure that we select the right + datapoint as our 'next' w.r.t each date. + """ dataset = QuartersEstimates(1) engine = SimplePipelineEngine( lambda x: self.loader, @@ -99,55 +144,43 @@ class NextEstimateTestCase(WithAssetFinder, start_date=self.trading_days[0], end_date=self.trading_days[-1], ) - sid_events = results.xs(1, level=1) - ed_sorted_events = self.events.sort(['event_date', 'timestamp']) - for i, date in enumerate(sid_events.index): - # Get all upcoming events that we know about on 'date' - eligible_timestamps = ed_sorted_events[ed_sorted_events['timestamp'] - <= date] - eligible_events = eligible_timestamps[eligible_timestamps['event_date'] >= date] - if not eligible_events.empty: - smallest_event_date = eligible_events.iloc[0]['event_date'] - expected_event = eligible_events[eligible_events['event_date'] == smallest_event_date].iloc[-1] - for colname in sid_events.columns: - expected_value = expected_event[colname] - computed_value = sid_events.iloc[i][colname] - assert_equal(expected_value, computed_value) - else: - assert sid_events.iloc[i].isnull().all() + for sid in self.sids: + sid_events = results.xs(sid, level=1) + ed_sorted_events = self.events[ + self.events['sid'] == sid + ] + ed_sorted_events['key'] = 1 + all_dates = pd.DataFrame({'all_dates': sid_events.index}) + all_dates['key'] = 1 + crossproduct = pd.merge(all_dates, ed_sorted_events, on='key') + crossproduct = crossproduct[crossproduct['timestamp'] <= + crossproduct['all_dates']] + crossproduct = crossproduct[crossproduct['event_date'] >= + crossproduct['all_dates']] + final = crossproduct.sort_values(by=['all_dates', + 'event_date', + 'timestamp'], + ascending=[True, True, + False]).groupby([ + 'all_dates', 'sid']).first().reset_index() + final = pd.merge(final, all_dates, + how='right').sort_values(by='all_dates').set_index( + 'all_dates') + final.index.name = None + for colname in sid_events.columns: + assert_series_equal(final[colname], sid_events[colname]) -class PreviousEstimateTestCase(WithAssetFinder, - WithTradingSessions, - ZiplineTestCase): - START_DATE = pd.Timestamp('2015-01-01') - END_DATE = pd.Timestamp('2015-04-30') - +class PreviousEstimateTestCase(EstimateTestCase): @classmethod def make_loader(cls, events, columns): return PreviousQuartersEstimatesLoader(events, columns) - @classmethod - def init_class_fixtures(cls): - cls.events = events - cls.columns = { - Estimates.estimate: 'estimate', - Estimates.event_date: 'event_date', - Estimates.fiscal_quarter: 'fiscal_quarter', - Estimates.fiscal_year: 'fiscal_year', - Estimates.value: 'value', - } - cls.loader = cls.make_loader( - events=cls.events, - columns=cls.columns - ) - cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique()) - cls.ASSET_FINDER_EQUITY_SYMBOLS = [ - 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS - ] - super(PreviousEstimateTestCase, cls).init_class_fixtures() - - def test_regular(self): + def test_previous_estimates(self): + """ + The goal of this test is to make sure that we select the right + datapoint as our 'previous' w.r.t each date. + """ dataset = QuartersEstimates(1) engine = SimplePipelineEngine( lambda x: self.loader, @@ -160,19 +193,53 @@ class PreviousEstimateTestCase(WithAssetFinder, start_date=self.trading_days[0], end_date=self.trading_days[-1], ) - sid_events = results.xs(1, level=1) - ed_sorted_events = self.events.sort(['event_date', 'timestamp']) - for i, date in enumerate(sid_events.index): - # Filter for events that happened on or before the simulation - # date and that we knew about on or before the simulation date. - ed_eligible_events = ed_sorted_events[ed_sorted_events['event_date'] <= date] - ts_eligible_events = ed_eligible_events[ed_eligible_events['timestamp'] <= date] - if not ts_eligible_events.empty: - # The expected event is the one we knew about last. - expected_event = ts_eligible_events.iloc[-1] - for colname in sid_events.columns: - expected_value = expected_event[colname] - computed_value = sid_events.iloc[i][colname] - assert_equal(expected_value, computed_value) - else: - assert sid_events.iloc[i].isnull().all() + for sid in self.sids: + sid_events = results.xs(sid, level=1) + ed_sorted_events = self.events[ + self.events['sid'] == sid + ].sort_values(by=['event_date', 'timestamp']) + for i, date in enumerate(sid_events.index): + # Filter for events that happened on or before the simulation + # date and that we knew about on or before the simulation date. + ed_eligible_events = ed_sorted_events[ed_sorted_events['event_date'] <= date] + ts_eligible_events = ed_eligible_events[ed_eligible_events['timestamp'] <= date] + if not ts_eligible_events.empty: + # The expected event is the one we knew about last. + expected_event = ts_eligible_events.iloc[-1] + for colname in sid_events.columns: + expected_value = expected_event[colname] + computed_value = sid_events.iloc[i][colname] + assert_equal(expected_value, computed_value) + else: + assert sid_events.iloc[i].isnull().all() + + +class QuarterShiftTestCase(ZiplineTestCase): + """ + This tests, in isolation, quarter calculation logic for shifting quarters + backwards/forwards from a starting point. + """ + def test_calc_forward_shift(self): + input_yrs = pd.Series([0] * 4) + input_qtrs = pd.Series(range(1, 5)) + expected = pd.DataFrame(([yr, qtr] for yr in range(0, 4) for qtr + in range(1, 5))) + for i in range(0, 8): + years, quarters = calc_forward_shift(input_yrs, input_qtrs, i) + # Can't use assert_series_equal here with check_names=False + # because that still fails due to name differences. + assert years.equals(expected[i:i+4].reset_index(drop=True)[0]) + assert quarters.equals(expected[i:i+4].reset_index(drop=True)[1]) + + + def test_calc_backward_shift(self): + input_yrs = pd.Series([0] * 4) + input_qtrs = pd.Series(range(4, 0, -1)) + expected = pd.DataFrame(([yr, qtr] for yr in range(0, -4, -1) for qtr + in range(4, 0, -1))) + for i in range(0, 8): + years, quarters = calc_backward_shift(input_yrs, input_qtrs, i) + # Can't use assert_series_equal here with check_names=False + # because that still fails due to name differences. + assert years.equals(expected[i:i+4].reset_index(drop=True)[0]) + assert quarters.equals(expected[i:i+4].reset_index(drop=True)[1]) diff --git a/zipline/pipeline/loaders/events.py b/zipline/pipeline/loaders/events.py index 1c49779c..af11499e 100644 --- a/zipline/pipeline/loaders/events.py +++ b/zipline/pipeline/loaders/events.py @@ -12,6 +12,7 @@ from zipline.pipeline.common import ( TS_FIELD_NAME, ) from zipline.pipeline.loaders.utils import ( + choose_rows_by_indexer, next_event_indexer, previous_event_indexer, ) @@ -166,7 +167,8 @@ class EventsLoader(PipelineLoader): if not columns: return {} - return self._load_events( + return choose_rows_by_indexer( + rows=self.events, name_map=self.next_value_columns, indexer=self.next_event_indexer(dates, sids), columns=columns, @@ -179,7 +181,8 @@ class EventsLoader(PipelineLoader): if not columns: return {} - return self._load_events( + return choose_rows_by_indexer( + rows=self.events, name_map=self.previous_value_columns, indexer=self.previous_event_indexer(dates, sids), columns=columns, @@ -188,22 +191,6 @@ class EventsLoader(PipelineLoader): mask=mask, ) - def _load_events(self, name_map, indexer, columns, dates, sids, mask): - def to_frame(array): - return pd.DataFrame(array, index=dates, columns=sids) - - out = {} - for c in columns: - raw = self.events[name_map[c]][indexer] - # indexer will be -1 for locations where we don't have a known - # value. - raw[indexer < 0] = c.missing_value - - # Delegate the actual array formatting logic to a DataFrameLoader. - loader = DataFrameLoader(c, to_frame(raw), adjustments=None) - out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c] - return out - def load_adjusted_array(self, columns, dates, sids, mask): n, p = self.split_next_and_previous_event_columns(columns) return merge( diff --git a/zipline/pipeline/loaders/quarter_estimates.py b/zipline/pipeline/loaders/quarter_estimates.py index 6fb2d5d8..fa480b4d 100644 --- a/zipline/pipeline/loaders/quarter_estimates.py +++ b/zipline/pipeline/loaders/quarter_estimates.py @@ -1,3 +1,4 @@ +import numpy as np import pandas as pd from six import viewvalues from toolz import groupby @@ -10,8 +11,84 @@ from zipline.pipeline.common import ( ) from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.frame import DataFrameLoader -from zipline.pipeline.loaders.utils import calc_backward_shift, \ - calc_forward_shift + +import line_profiler +from zipline.pipeline.loaders.utils import choose_rows_by_indexer + +PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter' + +PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year' + +NEXT_FISCAL_QUARTER = 'next_fiscal_quarter' + +NEXT_FISCAL_YEAR = 'next_fiscal_year' + +FISCAL_QUARTER = 'fiscal_quarter' + +FISCAL_YEAR = 'fiscal_year' + +ALL_DATES = 'dates' + +prof = line_profiler.LineProfiler() + + +#@profile +def calc_forward_shift(yrs, qtrs, num_qtrs_shift): + """ + Calculate the number of years to shift forward and the new quarter in the + shifted year. + + Parameters + ---------- + qtr : int + The starting quarter. + num_qtr_shift : int + The number of quarters to shift forward. + yr : int + The starting year. + + Returns + ------- + s : pd.Series + A series containins the new year and quarter. + """ + + result_qtrs = (qtrs + num_qtrs_shift) % 4 + result_years = yrs + (qtrs + num_qtrs_shift) // 4 + to_adjust = result_qtrs[result_qtrs == 0].index + result_years.iloc[to_adjust] -= 1 + result_qtrs.iloc[to_adjust] = 4 + return result_years, result_qtrs + + +#@profile +def calc_backward_shift(yrs, qtrs, num_qtrs_shift): + """ + Calculate the number of years to shift backward and the new quarter in the + shifted year. + + Parameters + ---------- + qtr : int + The starting quarter. + num_qtr_shift : int + The number of quarters to shift backward. + yr : int + The starting year. + + Returns + ------- + s : pd.Series + A series containins the new year and quarter. + """ + result_qtrs = 4 - (num_qtrs_shift - qtrs) % 4 + # Must subtract 1 year since we go backwards at least `qtr` number of + # quarters + result_years = yrs - (num_qtrs_shift - qtrs) // 4 - 1 + no_yr_boundary_crossed = qtrs[qtrs > num_qtrs_shift].index + result_years.iloc[no_yr_boundary_crossed] = yrs.iloc[no_yr_boundary_crossed] + result_qtrs.iloc[no_yr_boundary_crossed] = qtrs.iloc[no_yr_boundary_crossed] - num_qtrs_shift + return result_years, result_qtrs def required_event_fields(columns): @@ -56,35 +133,40 @@ def validate_column_specs(events, columns): class QuarterEstimatesLoader(PipelineLoader): def __init__(self, - events, - columns): + estimates, + base_column_name_map): validate_column_specs( - events, - columns + estimates, + base_column_name_map ) - self.events = events[ - events[EVENT_DATE_FIELD_NAME].notnull() & - events[FISCAL_QUARTER_FIELD_NAME].notnull() & - events[FISCAL_YEAR_FIELD_NAME].notnull() + self.estimates = estimates[ + estimates[EVENT_DATE_FIELD_NAME].notnull() & + estimates[FISCAL_QUARTER_FIELD_NAME].notnull() & + estimates[FISCAL_YEAR_FIELD_NAME].notnull() ] - self.columns = columns + self.base_column_name_map = base_column_name_map def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): pass + #@profile def load_adjusted_array(self, columns, dates, assets, mask): groups = groupby(lambda x: x.dataset.num_quarters, columns) out = {} date_values = pd.DataFrame({'dates': dates}) date_values['key'] = 1 - self.events['key'] = 1 - merged = pd.merge(date_values, self.events, on='key') - asset_df = pd.DataFrame({'sid': assets}) + self.estimates['key'] = 1 + merged = pd.merge(date_values, self.estimates, on='key') + asset_df = pd.DataFrame({SID_FIELD_NAME: assets}) asset_df['key'] = 1 dates_sids = pd.merge(date_values, asset_df, on='key') + merged.drop('key', axis=1, inplace=True) + dates_sids.drop('key', axis=1, inplace=True) for num_quarters in groups: + name_map = {c: self.base_column_name_map[getattr(c.dataset.__base__, c.name)] for c in columns} + columns = groups[num_quarters] # First, group by sid, fiscal year, and fiscal quarter and only # keep the last estimate made. @@ -92,7 +174,7 @@ class QuarterEstimatesLoader(PipelineLoader): merged.dates].sort( ['dates', TS_FIELD_NAME] ).groupby( - ['dates', 'sid', 'fiscal_year', 'fiscal_quarter'] + ['dates', SID_FIELD_NAME, FISCAL_YEAR, FISCAL_QUARTER] ).last() final_releases_per_qtr = final_releases_per_qtr.reset_index() @@ -101,15 +183,14 @@ class QuarterEstimatesLoader(PipelineLoader): final_releases_per_qtr) for c in columns: - super_col = getattr(c.dataset.__base__, c.name) - column_name = self.columns[super_col] + column_name = name_map[c] # Need to pass a DataFrame that has dates as the index and # all sids as columns with column values being the value in # 'result' for column c loader = DataFrameLoader( c, result.pivot(index='dates', - columns='sid', + columns=SID_FIELD_NAME, values=column_name), adjustments=None ) @@ -118,11 +199,8 @@ class QuarterEstimatesLoader(PipelineLoader): class NextQuartersEstimatesLoader(QuarterEstimatesLoader): - def __init__(self, - events, - columns): - super(NextQuartersEstimatesLoader, self).__init__(events, columns) + #@profile def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): # Filter for releases that are after each simulation date. eligible_next_releases = final_releases_per_qtr[ @@ -133,39 +211,34 @@ class NextQuartersEstimatesLoader(QuarterEstimatesLoader): eligible_next_releases.sort(EVENT_DATE_FIELD_NAME) # For each sid, get the upcoming release/year/quarter. next_releases = eligible_next_releases.groupby( - ['dates', 'sid'] - ).min() + ['dates', SID_FIELD_NAME] + ).nth(0).reset_index() # We use nth here to avoid forward filling + # NaNs, which `first()` will do. next_releases = next_releases.rename( - columns={'fiscal_year': 'next_fiscal_year', - 'fiscal_quarter': 'next_fiscal_quarter'} + columns={FISCAL_YEAR: NEXT_FISCAL_YEAR, + FISCAL_QUARTER: NEXT_FISCAL_QUARTER} ) # `next_qtr` is already the next quarter over, # so we should offest `num_shifts` by 1. - next_releases['fiscal_quarter'] = next_releases.apply( - lambda x: calc_forward_shift(x['next_fiscal_quarter'], - num_quarters - 1)[1], - axis=1 - ) - next_releases['fiscal_year'] = next_releases.apply( - lambda x: - x['next_fiscal_year'] + - calc_forward_shift(x['next_fiscal_quarter'], - num_quarters - 1)[0], - axis=1 + (next_releases[FISCAL_YEAR], + next_releases[FISCAL_QUARTER]) = calc_forward_shift( + next_releases[NEXT_FISCAL_YEAR], + next_releases[NEXT_FISCAL_QUARTER], (num_quarters - 1) ) # Merge to get the rows we care about for each date - result = dates_sids.merge(next_releases.reset_index(), - on=(['dates', 'sid']), + result = dates_sids.merge(next_releases, + on=(['dates', SID_FIELD_NAME]), how='left') return result class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader): def __init__(self, - events, + estimates, columns): - super(PreviousQuartersEstimatesLoader, self).__init__(events, columns) + super(PreviousQuartersEstimatesLoader, self).__init__(estimates, columns) + #@profile def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): # Filter for releases that are before each simulation date. eligible_previous_releases = final_releases_per_qtr[ @@ -177,26 +250,23 @@ class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader): # For each sid, get the latest release we knew about prior to # each simulation date. previous_releases = eligible_previous_releases.groupby( - ['dates', 'sid'] - ).max() + ['dates', SID_FIELD_NAME] + ).nth(-1).reset_index() # We use nth here to avoid forward filling + # NaNs, which `last()` will do. previous_releases = previous_releases.rename(columns={ - 'fiscal_year': 'previous_fiscal_year', - 'fiscal_quarter': 'previous_fiscal_quarter' + FISCAL_YEAR: PREVIOUS_FISCAL_YEAR, + FISCAL_QUARTER: PREVIOUS_FISCAL_QUARTER }) - previous_releases['fiscal_quarter'] = previous_releases.apply( - lambda x: calc_backward_shift(x['previous_fiscal_quarter'], - (num_quarters - 1))[1], - axis=1 - ) - previous_releases['fiscal_year'] = previous_releases.apply( - lambda x: - x['previous_fiscal_year'] - - calc_backward_shift(x['previous_fiscal_quarter'], - (num_quarters - 1))[0], - axis=1 + + (previous_releases[FISCAL_YEAR], + previous_releases[FISCAL_QUARTER]) = \ + calc_backward_shift( + previous_releases[PREVIOUS_FISCAL_YEAR], previous_releases[ + PREVIOUS_FISCAL_QUARTER], (num_quarters - 1) ) # Merge to get the rows we care about for each date - result = dates_sids.merge(previous_releases.reset_index(), - on=(['dates', 'sid']), how='left') + result = dates_sids.merge(previous_releases, + on=(['dates', SID_FIELD_NAME]), how='left') return result + diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 4e999b07..385022a8 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -2,6 +2,7 @@ import datetime import numpy as np import pandas as pd +from zipline.pipeline.loaders.frame import DataFrameLoader from zipline.utils.pandas_utils import mask_between_time @@ -274,59 +275,18 @@ def check_data_query_args(data_query_time, data_query_tz): ) -def calc_forward_shift(qtr, num_qtrs_shift): - """ - Calculate the number of years to shift forward and the new quarter in the - shifted year. +def choose_rows_by_indexer(rows, name_map, indexer, columns, dates, sids, mask): + def to_frame(array): + return pd.DataFrame(array, index=dates, columns=sids) - Parameters - ---------- - qtr : int - The starting quarter. - num_qtr_shift : int - The number of quarters to shift forward. + out = {} + for c in columns: + raw = rows[name_map[c]][indexer] + # indexer will be -1 for locations where we don't have a known + # value. + raw[indexer < 0] = c.missing_value - Returns - ------- - yrs_to_shift : int - The number of years to shift forward. - new_qtr : int - The quarter number of the new quarter after shifting num_qtrs_shift - forward from qtr. - """ - yrs_to_shift, new_qtr = divmod(qtr + num_qtrs_shift, 4) - if new_qtr == 0: - yrs_to_shift -= 1 - new_qtr = 4 - return yrs_to_shift, new_qtr - - -def calc_backward_shift(qtr, num_qtrs_shift): - """ - Calculate the number of years to shift backward and the new quarter in the - shifted year. - - Parameters - ---------- - qtr : int - The starting quarter. - num_qtr_shift : int - The number of quarters to shift backward. - - Returns - ------- - yrs_to_shift : int - The number of years to shift backward. - new_qtr : int - The quarter number of the new quarter after shifting num_qtrs_shift - backward from qtr. - """ - if qtr > num_qtrs_shift: - return 0, qtr - num_qtrs_shift - # num_qtrs_shift >= qtr; subtract to offset qtr, then calculate how many - # years/quarters to subtract. - yrs_to_shift, subtract_qtr = divmod(abs(num_qtrs_shift - qtr), 4) - # Must add 1 year since we go backwards at least `qtr` number of quarters - yrs_to_shift += 1 - new_qtr = 4 - subtract_qtr - return yrs_to_shift, new_qtr \ No newline at end of file + # Delegate the actual array formatting logic to a DataFrameLoader. + loader = DataFrameLoader(c, to_frame(raw), adjustments=None) + out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c] + return out \ No newline at end of file