diff --git a/tests/pipeline/test_quarters_estimates.py b/tests/pipeline/test_quarters_estimates.py index 201011ea..02666964 100644 --- a/tests/pipeline/test_quarters_estimates.py +++ b/tests/pipeline/test_quarters_estimates.py @@ -1,14 +1,178 @@ -def test_shift_quarters_forward(): - quarters = list(range(1, 5)) - shifts = list(range(5)) - expected = [(x, i) for ] - expected = ((0, 1), (0, 2), (0, 3), (0, 4), (1, 1), - (0, 2), (0, 3), (0, 4), (1, 1), (1, 2)) - for quarter in quarters: - for shift in shifts: - yrs_to_shift, new_qtr = EstimizeLoader.calc_forward_shift(quarter, - shift) - if quarter + shift <= 4: - assert yrs_to_shift == 0 - assert new_qtr == quarter + shift +from itertools import product +import numpy as np +import pandas as pd +from zipline.pipeline import SimplePipelineEngine, Pipeline + +from zipline.pipeline.data import DataSet, Column +from zipline.pipeline.loaders.quarter_estimates import \ + NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader +from zipline.testing import ZiplineTestCase +from zipline.testing.fixtures import WithAssetFinder, WithTradingSessions +from zipline.testing.predicates import assert_equal +from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype + + +class Estimates(DataSet): + event_date = Column(dtype=datetime64ns_dtype) + fiscal_quarter = Column(dtype=float64_dtype) + fiscal_year = Column(dtype=float64_dtype) + estimate = Column(dtype=float64_dtype) + value = Column(dtype=float64_dtype) + + +def QuartersEstimates(num_qtr): + class QtrEstimates(Estimates): + num_quarters = num_qtr + name=Estimates + return QtrEstimates + +# Final release dates never change +releases = pd.DataFrame({ + 'sid': [1, 1], + 'timestamp': [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-4-20')], + 'event_date': [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-04-20')], + 'estimate': [0.5, 0.8], + 'value': [0.6, 0.9], + 'fiscal_quarter': [1, 2], + 'fiscal_year': [2015, 2015] +}) + +estimates = pd.DataFrame({ + 'sid': [1, 1, 1, 1], + 'timestamp': [pd.Timestamp('2015-01-02'), + pd.Timestamp('2015-01-10'), + pd.Timestamp('2015-04-02'), + pd.Timestamp('2015-4-10')], + 'event_date': [pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-01-20'), + pd.Timestamp('2015-04-20'), + pd.Timestamp('2015-04-20')], + 'estimate': [.1, .2, .3, .4], + 'value': [np.NaN, np.NaN, np.NaN, np.NaN], + 'fiscal_quarter': [1, 1, 2, 2], + 'fiscal_year': [2015, 2015, 2015, 2015] +}) + +events = pd.concat([releases, estimates]) + + +class NextEstimateTestCase(WithAssetFinder, + WithTradingSessions, + ZiplineTestCase): + START_DATE = pd.Timestamp('2015-01-01') + END_DATE = pd.Timestamp('2015-04-30') + + @classmethod + def make_loader(cls, events, columns): + return NextQuartersEstimatesLoader(events, columns) + + @classmethod + def init_class_fixtures(cls): + cls.events = events + cls.columns = { + Estimates.estimate: 'estimate', + Estimates.event_date: 'event_date', + Estimates.fiscal_quarter: 'fiscal_quarter', + Estimates.fiscal_year: 'fiscal_year', + Estimates.value: 'value', + } + cls.loader = cls.make_loader( + events=cls.events, + columns=cls.columns + ) + cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique()) + cls.ASSET_FINDER_EQUITY_SYMBOLS = [ + 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS + ] + super(NextEstimateTestCase, cls).init_class_fixtures() + + def test_regular(self): + dataset = QuartersEstimates(1) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + + results = engine.run_pipeline( + Pipeline({c.name: c.latest for c in dataset.columns}), + start_date=self.trading_days[0], + end_date=self.trading_days[-1], + ) + sid_events = results.xs(1, level=1) + ed_sorted_events = self.events.sort(['event_date', 'timestamp']) + for i, date in enumerate(sid_events.index): + # Get all upcoming events that we know about on 'date' + eligible_timestamps = ed_sorted_events[ed_sorted_events['timestamp'] + <= date] + eligible_events = eligible_timestamps[eligible_timestamps['event_date'] >= date] + if not eligible_events.empty: + smallest_event_date = eligible_events.iloc[0]['event_date'] + expected_event = eligible_events[eligible_events['event_date'] == smallest_event_date].iloc[-1] + for colname in sid_events.columns: + expected_value = expected_event[colname] + computed_value = sid_events.iloc[i][colname] + assert_equal(expected_value, computed_value) else: + assert sid_events.iloc[i].isnull().all() + + +class PreviousEstimateTestCase(WithAssetFinder, + WithTradingSessions, + ZiplineTestCase): + START_DATE = pd.Timestamp('2015-01-01') + END_DATE = pd.Timestamp('2015-04-30') + + @classmethod + def make_loader(cls, events, columns): + return PreviousQuartersEstimatesLoader(events, columns) + + @classmethod + def init_class_fixtures(cls): + cls.events = events + cls.columns = { + Estimates.estimate: 'estimate', + Estimates.event_date: 'event_date', + Estimates.fiscal_quarter: 'fiscal_quarter', + Estimates.fiscal_year: 'fiscal_year', + Estimates.value: 'value', + } + cls.loader = cls.make_loader( + events=cls.events, + columns=cls.columns + ) + cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique()) + cls.ASSET_FINDER_EQUITY_SYMBOLS = [ + 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS + ] + super(PreviousEstimateTestCase, cls).init_class_fixtures() + + def test_regular(self): + dataset = QuartersEstimates(1) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + + results = engine.run_pipeline( + Pipeline({c.name: c.latest for c in dataset.columns}), + start_date=self.trading_days[0], + end_date=self.trading_days[-1], + ) + sid_events = results.xs(1, level=1) + ed_sorted_events = self.events.sort(['event_date', 'timestamp']) + for i, date in enumerate(sid_events.index): + # Filter for events that happened on or before the simulation + # date and that we knew about on or before the simulation date. + ed_eligible_events = ed_sorted_events[ed_sorted_events['event_date'] <= date] + ts_eligible_events = ed_eligible_events[ed_eligible_events['timestamp'] <= date] + if not ts_eligible_events.empty: + # The expected event is the one we knew about last. + expected_event = ts_eligible_events.iloc[-1] + for colname in sid_events.columns: + expected_value = expected_event[colname] + computed_value = sid_events.iloc[i][colname] + assert_equal(expected_value, computed_value) + else: + assert sid_events.iloc[i].isnull().all() diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 01e1e659..fc3252e4 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -1219,6 +1219,36 @@ def bind_expression_to_resources(expr, resources): }) +def load_raw_data(assets, dates, data_query_time, data_query_tz, expr, + odo_kwargs): + lower_dt, upper_dt = normalize_data_query_bounds( + dates[0], + dates[-1], + data_query_time, + data_query_tz, + ) + raw = ffill_query_in_range( + expr, + lower_dt, + upper_dt, + odo_kwargs, + ) + sids = raw.loc[:, SID_FIELD_NAME] + raw.drop( + sids[~sids.isin(assets)].index, + inplace=True + ) + if data_query_time is not None: + normalize_timestamp_to_query_time( + raw, + data_query_time, + data_query_tz, + inplace=True, + ts_field=TS_FIELD_NAME, + ) + return raw + + def ffill_query_in_range(expr, lower, upper, @@ -1273,4 +1303,4 @@ def ffill_query_in_range(expr, **odo_kwargs ) raw.loc[:, ts_field] = raw.loc[:, ts_field].astype('datetime64[ns]') - return raw + return raw \ No newline at end of file diff --git a/zipline/pipeline/loaders/blaze/estimates.py b/zipline/pipeline/loaders/blaze/estimates.py index 6a89fa7b..c2341164 100644 --- a/zipline/pipeline/loaders/blaze/estimates.py +++ b/zipline/pipeline/loaders/blaze/estimates.py @@ -2,24 +2,17 @@ from datashape import istabular from .core import ( bind_expression_to_resources, - ffill_query_in_range, + load_raw_data, ) from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.events import ( - EventsLoader, required_event_fields, ) -from zipline.pipeline.common import ( - SID_FIELD_NAME, - TS_FIELD_NAME, -) from zipline.pipeline.loaders.quarter_estimates import \ NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader from zipline.pipeline.loaders.utils import ( check_data_query_args, - normalize_data_query_bounds, - normalize_timestamp_to_query_time, - load_raw_data) +) from zipline.utils.input_validation import ensure_timezone, optionally from zipline.utils.preprocess import preprocess diff --git a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py index 4165166b..d5ac8e37 100644 --- a/zipline/pipeline/loaders/blaze/events.py +++ b/zipline/pipeline/loaders/blaze/events.py @@ -2,22 +2,14 @@ from datashape import istabular from .core import ( bind_expression_to_resources, - ffill_query_in_range, + load_raw_data, ) from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.events import ( EventsLoader, required_event_fields, ) -from zipline.pipeline.common import ( - SID_FIELD_NAME, - TS_FIELD_NAME, -) -from zipline.pipeline.loaders.utils import ( - check_data_query_args, - normalize_data_query_bounds, - normalize_timestamp_to_query_time, - load_raw_data) +from zipline.pipeline.loaders.utils import check_data_query_args from zipline.utils.input_validation import ensure_timezone, optionally from zipline.utils.preprocess import preprocess diff --git a/zipline/pipeline/loaders/quarter_estimates.py b/zipline/pipeline/loaders/quarter_estimates.py index 495bc075..6fb2d5d8 100644 --- a/zipline/pipeline/loaders/quarter_estimates.py +++ b/zipline/pipeline/loaders/quarter_estimates.py @@ -1,11 +1,17 @@ -from itertools import groupby -import numpy as np import pandas as pd from six import viewvalues -from zipline.pipeline.common import AD_FIELD_NAME, SID_FIELD_NAME, \ - EVENT_DATE_FIELD_NAME, FISCAL_QUARTER_FIELD_NAME, FISCAL_YEAR_FIELD_NAME +from toolz import groupby +from zipline.pipeline.common import ( + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.frame import DataFrameLoader +from zipline.pipeline.loaders.utils import calc_backward_shift, \ + calc_forward_shift def required_event_fields(columns): @@ -15,7 +21,7 @@ def required_event_fields(columns): """ # These metadata columns are used to align event indexers. return { - AD_FIELD_NAME, + TS_FIELD_NAME, SID_FIELD_NAME, EVENT_DATE_FIELD_NAME, FISCAL_QUARTER_FIELD_NAME, @@ -48,23 +54,6 @@ def validate_column_specs(events, columns): ) -def calc_forward_shift(qtr, num_shifts): - yrs_to_shift, new_qtr = divmod(qtr + num_shifts, 4) - if yrs_to_shift == 1 and new_qtr == 0: - yrs_to_shift = 0 - new_qtr = 4 - return yrs_to_shift, new_qtr - - -def calc_backward_shift(qtr, num_shifts): - yrs_to_shift, new_qtr = divmod(abs(num_shifts - qtr), 4) - if yrs_to_shift == 0 and new_qtr == 0: - yrs_to_shift = 1 - new_qtr = 4 - yrs_to_shift = -yrs_to_shift - return yrs_to_shift, new_qtr - - class QuarterEstimatesLoader(PipelineLoader): def __init__(self, events, @@ -75,60 +64,45 @@ class QuarterEstimatesLoader(PipelineLoader): ) self.events = events[ - events[EVENT_DATE_FIELD_NAME].notnull() and - events[FISCAL_QUARTER_FIELD_NAME].notnull() and + events[EVENT_DATE_FIELD_NAME].notnull() & + events[FISCAL_QUARTER_FIELD_NAME].notnull() & events[FISCAL_YEAR_FIELD_NAME].notnull() ] self.columns = columns - def load_quarters(self, next_releases, num_quarters, dates_sids, gb): + def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): pass def load_adjusted_array(self, columns, dates, assets, mask): groups = groupby(lambda x: x.dataset.num_quarters, columns) out = {} - date_values = pd.DataFrame(dates, columns=['dates']) + date_values = pd.DataFrame({'dates': dates}) date_values['key'] = 1 self.events['key'] = 1 merged = pd.merge(date_values, self.events, on='key') - asset_df = pd.DataFrame(assets, columns=['sid']) + asset_df = pd.DataFrame({'sid': assets}) asset_df['key'] = 1 dates_sids = pd.merge(date_values, asset_df, on='key') for num_quarters in groups: columns = groups[num_quarters] # First, group by sid, fiscal year, and fiscal quarter and only # keep the last estimate made. - final_releases_per_qtr = merged[merged.asof_date <= + final_releases_per_qtr = merged[merged[TS_FIELD_NAME] <= merged.dates].sort( - ['dates', 'asof_date'] + ['dates', TS_FIELD_NAME] ).groupby( ['dates', 'sid', 'fiscal_year', 'fiscal_quarter'] ).last() - gb = final_releases_per_qtr.reset_index().groupby(['dates', 'sid']) - # Split the date-sid combinations into ones with a next release - # and ones without - eligible_next_releases = pd.concat([group[1] for group in gb if ( - group[1][EVENT_DATE_FIELD_NAME] >= group[1]['dates'] - ).any()]) + final_releases_per_qtr = final_releases_per_qtr.reset_index() - eligible_next_releases.sort(EVENT_DATE_FIELD_NAME) - # For each sid, get the next release/year/quarter that we care - # about. - next_releases = eligible_next_releases.groupby( - ['dates', 'sid'] - ).min() - next_releases = next_releases.rename( - columns={'fiscal_year': 'next_fiscal_year', - 'fiscal_quarter': 'next_fiscal_quarter'} - ) - - result = self.load_quarters(next_releases, - num_quarters, - dates_sids) + result = self.load_quarters(num_quarters, + dates_sids, + final_releases_per_qtr) for c in columns: - column_name = self.columns[c.name] + super_col = getattr(c.dataset.__base__, c.name) + column_name = self.columns[super_col] # Need to pass a DataFrame that has dates as the index and # all sids as columns with column values being the value in # 'result' for column c @@ -147,9 +121,24 @@ class NextQuartersEstimatesLoader(QuarterEstimatesLoader): def __init__(self, events, columns): - super(NextQuartersEstimatesLoader).__init__(events, columns) + super(NextQuartersEstimatesLoader, self).__init__(events, columns) - def load_quarters(self, next_releases, num_quarters, dates_sids, gb): + def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): + # Filter for releases that are after each simulation date. + eligible_next_releases = final_releases_per_qtr[ + final_releases_per_qtr[EVENT_DATE_FIELD_NAME] >= + final_releases_per_qtr['dates'] + ] + + eligible_next_releases.sort(EVENT_DATE_FIELD_NAME) + # For each sid, get the upcoming release/year/quarter. + next_releases = eligible_next_releases.groupby( + ['dates', 'sid'] + ).min() + next_releases = next_releases.rename( + columns={'fiscal_year': 'next_fiscal_year', + 'fiscal_quarter': 'next_fiscal_quarter'} + ) # `next_qtr` is already the next quarter over, # so we should offest `num_shifts` by 1. next_releases['fiscal_quarter'] = next_releases.apply( @@ -175,47 +164,39 @@ class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader): def __init__(self, events, columns): - super(PreviousQuartersEstimatesLoader).__init__(events, columns) + super(PreviousQuartersEstimatesLoader, self).__init__(events, columns) - def load_quarters(self, next_releases, num_quarters, dates_sids, gb): - next_releases['fiscal_quarter'] = next_releases.apply( - lambda x: calc_backward_shift(x['next_fiscal_quarter'], - num_quarters)[1], - axis=1 - ) - next_releases['fiscal_year'] = next_releases.apply( - lambda x: - x['next_fiscal_year'] + - calc_backward_shift(x['next_fiscal_quarter'], - num_quarters)[0], - axis=1 - ) - only_previous_releases = pd.concat([group[1] for group in gb if ( - group[1][EVENT_DATE_FIELD_NAME] < group[1]['dates'] - ).all()]) - only_previous_releases.sort(EVENT_DATE_FIELD_NAME) + def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): + # Filter for releases that are before each simulation date. + eligible_previous_releases = final_releases_per_qtr[ + final_releases_per_qtr[EVENT_DATE_FIELD_NAME] <= + final_releases_per_qtr['dates'] + ] + + eligible_previous_releases.sort(EVENT_DATE_FIELD_NAME) # For each sid, get the latest release we knew about prior to # each simulation date. - previous_releases = only_previous_releases.groupby(['dates', - 'sid']).max() + previous_releases = eligible_previous_releases.groupby( + ['dates', 'sid'] + ).max() + previous_releases = previous_releases.rename(columns={ 'fiscal_year': 'previous_fiscal_year', 'fiscal_quarter': 'previous_fiscal_quarter' }) previous_releases['fiscal_quarter'] = previous_releases.apply( lambda x: calc_backward_shift(x['previous_fiscal_quarter'], - num_quarters)[1], + (num_quarters - 1))[1], axis=1 ) previous_releases['fiscal_year'] = previous_releases.apply( lambda x: - x['previous_fiscal_year'] + + x['previous_fiscal_year'] - calc_backward_shift(x['previous_fiscal_quarter'], - num_quarters)[0], + (num_quarters - 1))[0], axis=1 ) - all_releases = pd.concat([next_releases, previous_releases]) # Merge to get the rows we care about for each date - result = dates_sids.merge(all_releases.reset_index(), + result = dates_sids.merge(previous_releases.reset_index(), on=(['dates', 'sid']), how='left') return result diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 2f388810..4e999b07 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -2,8 +2,6 @@ import datetime import numpy as np import pandas as pd -from zipline.pipeline.common import TS_FIELD_NAME, SID_FIELD_NAME -from zipline.pipeline.loaders.blaze.core import ffill_query_in_range from zipline.utils.pandas_utils import mask_between_time @@ -276,31 +274,59 @@ def check_data_query_args(data_query_time, data_query_tz): ) -def load_raw_data(assets, dates, data_query_time, data_query_tz, expr, - odo_kwargs): - lower_dt, upper_dt = normalize_data_query_bounds( - dates[0], - dates[-1], - data_query_time, - data_query_tz, - ) - raw = ffill_query_in_range( - expr, - lower_dt, - upper_dt, - odo_kwargs, - ) - sids = raw.loc[:, SID_FIELD_NAME] - raw.drop( - sids[~sids.isin(assets)].index, - inplace=True - ) - if data_query_time is not None: - normalize_timestamp_to_query_time( - raw, - data_query_time, - data_query_tz, - inplace=True, - ts_field=TS_FIELD_NAME, - ) - return raw +def calc_forward_shift(qtr, num_qtrs_shift): + """ + Calculate the number of years to shift forward and the new quarter in the + shifted year. + + Parameters + ---------- + qtr : int + The starting quarter. + num_qtr_shift : int + The number of quarters to shift forward. + + Returns + ------- + yrs_to_shift : int + The number of years to shift forward. + new_qtr : int + The quarter number of the new quarter after shifting num_qtrs_shift + forward from qtr. + """ + yrs_to_shift, new_qtr = divmod(qtr + num_qtrs_shift, 4) + if new_qtr == 0: + yrs_to_shift -= 1 + new_qtr = 4 + return yrs_to_shift, new_qtr + + +def calc_backward_shift(qtr, num_qtrs_shift): + """ + Calculate the number of years to shift backward and the new quarter in the + shifted year. + + Parameters + ---------- + qtr : int + The starting quarter. + num_qtr_shift : int + The number of quarters to shift backward. + + Returns + ------- + yrs_to_shift : int + The number of years to shift backward. + new_qtr : int + The quarter number of the new quarter after shifting num_qtrs_shift + backward from qtr. + """ + if qtr > num_qtrs_shift: + return 0, qtr - num_qtrs_shift + # num_qtrs_shift >= qtr; subtract to offset qtr, then calculate how many + # years/quarters to subtract. + yrs_to_shift, subtract_qtr = divmod(abs(num_qtrs_shift - qtr), 4) + # Must add 1 year since we go backwards at least `qtr` number of quarters + yrs_to_shift += 1 + new_qtr = 4 - subtract_qtr + return yrs_to_shift, new_qtr \ No newline at end of file