From ef350f38894f64f2cb24d3eb17fb006bad9ae322 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Tue, 16 Aug 2016 08:28:50 -0400 Subject: [PATCH] TST: add cases for shifting release dates BUG: fix bugs in blaze loader BUG: call correct method MAINT: explicitly cast dates column MAINT: modify code to comply with pandas 0.16.1 --- tests/pipeline/test_quarters_estimates.py | 257 ++++++++++++------ zipline/pipeline/loaders/blaze/core.py | 32 +-- zipline/pipeline/loaders/blaze/estimates.py | 62 ++--- zipline/pipeline/loaders/blaze/events.py | 8 +- zipline/pipeline/loaders/blaze/utils.py | 61 +++++ zipline/pipeline/loaders/events.py | 23 +- zipline/pipeline/loaders/quarter_estimates.py | 234 +++++++--------- zipline/pipeline/loaders/utils.py | 18 -- 8 files changed, 372 insertions(+), 323 deletions(-) create mode 100644 zipline/pipeline/loaders/blaze/utils.py diff --git a/tests/pipeline/test_quarters_estimates.py b/tests/pipeline/test_quarters_estimates.py index 2328f965..f343bfb8 100644 --- a/tests/pipeline/test_quarters_estimates.py +++ b/tests/pipeline/test_quarters_estimates.py @@ -1,24 +1,30 @@ +import blaze as bz import itertools import numpy as np import pandas as pd -from pandas.util.testing import assert_series_equal -from zipline.pipeline import SimplePipelineEngine, Pipeline +from zipline.pipeline import SimplePipelineEngine, Pipeline +from zipline.pipeline.common import ( + EVENT_DATE_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) from zipline.pipeline.data import DataSet, Column +from zipline.pipeline.loaders.blaze.estimates import ( + BlazeNextEstimatesLoader, + BlazePreviousEstimatesLoader +) from zipline.pipeline.loaders.quarter_estimates import ( NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader ) -from zipline.pipeline.loaders.quarter_estimates import ( - calc_forward_shift, - calc_backward_shift -) +from zipline.pipeline.loaders.quarter_estimates import shift_quarters from 
zipline.testing import ZiplineTestCase from zipline.testing.fixtures import WithAssetFinder, WithTradingSessions from zipline.testing.predicates import assert_equal from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype -import line_profiler -prof = line_profiler.LineProfiler() class Estimates(DataSet): @@ -39,12 +45,13 @@ def QuartersEstimates(num_qtr): # in order to reduce the number of dates we need to iterate through when # testing. releases = pd.DataFrame({ - 'timestamp': [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')], - 'event_date': [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')], + TS_FIELD_NAME: [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-31')], 'estimate': [0.5, 0.8], 'value': [0.6, 0.9], - 'fiscal_quarter': [1.0, 2.0], - 'fiscal_year': [2015.0, 2015.0] + FISCAL_QUARTER_FIELD_NAME: [1.0, 2.0], + FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0] }) q1_knowledge_dates = [pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-04'), @@ -52,40 +59,43 @@ q1_knowledge_dates = [pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-04'), q2_knowledge_dates = [pd.Timestamp('2015-01-16'), pd.Timestamp('2015-01-20'), pd.Timestamp('2015-01-24'), pd.Timestamp('2015-01-28')] # We want to model the possibility of an estimate predicting a release date -# that gets shifted forward/backward. -q1_release_dates = [pd.Timestamp('2015-01-13'), pd.Timestamp('2015-01-15')] -q2_release_dates = [pd.Timestamp('2015-01-28'), pd.Timestamp('2015-01-30')] +# that doesn't match the actual release. This could be done by dynamically +# generating more combinations with different release dates, but that +# significantly increases the amount of time it takes to run the tests. These +# hard-coded cases are sufficient to know that we can update our beliefs when +# we get new information. 
+q1_release_dates = [pd.Timestamp('2015-01-15'), + pd.Timestamp('2015-01-16')] # One day late +q2_release_dates = [pd.Timestamp('2015-01-30'), # One day early + pd.Timestamp('2015-01-31')] estimates = pd.DataFrame({ + EVENT_DATE_FIELD_NAME: q1_release_dates + q2_release_dates, 'estimate': [.1, .2, .3, .4], 'value': [np.NaN, np.NaN, np.NaN, np.NaN], - 'fiscal_quarter': [1.0, 1.0, 2.0, 2.0], - 'fiscal_year': [2015.0, 2015.0, 2015.0, 2015.0] + FISCAL_QUARTER_FIELD_NAME: [1.0, 1.0, 2.0, 2.0], + FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0, 2015.0, 2015.0] }) def gen_estimates(): sid_estimates = [] sid_releases = [] - release_dates = list(itertools.product(q1_release_dates, q2_release_dates)) - knowledge_permutations = list(itertools.permutations(q1_knowledge_dates + - q2_knowledge_dates, - 4)) - all_permutations = itertools.product(knowledge_permutations, - release_dates) - for sid, ((q1e1, q1e2, q2e1, q2e2), (rd1, rd2)) in enumerate( - all_permutations): + for sid, (q1e1, q1e2, q2e1, q2e2) in enumerate( + itertools.permutations(q1_knowledge_dates + q2_knowledge_dates, + 4) + ): # We're assuming that estimates must come before the relevant release. 
- if q1e1 < q1e2 and q2e1 < q2e2 and q1e1 < rd1 and q1e2 < \ - rd2: + if (q1e1 < q1e2 and + q2e1 < q2e2 and + q1e1 < q1_release_dates[0] and + q1e2 < q1_release_dates[1]): sid_estimate = estimates.copy(True) - sid_estimate['timestamp'] = [q1e1, q1e2, q2e1, q2e2] - sid_estimate['event_date'] = [rd1]*2 + [rd2] * 2 - sid_estimate['sid'] = sid + sid_estimate[TS_FIELD_NAME] = [q1e1, q1e2, q2e1, q2e2] + sid_estimate[SID_FIELD_NAME] = sid sid_estimates += [sid_estimate] sid_release = releases.copy(True) - sid_release['sid'] = sid_estimate['sid'] + sid_release[SID_FIELD_NAME] = sid_estimate[SID_FIELD_NAME] sid_releases += [sid_release] - return pd.concat(sid_estimates + sid_releases).reset_index(drop=True) @@ -105,28 +115,44 @@ class EstimateTestCase(WithAssetFinder, cls.sids = cls.events['sid'].unique() cls.columns = { Estimates.estimate: 'estimate', - Estimates.event_date: 'event_date', - Estimates.fiscal_quarter: 'fiscal_quarter', - Estimates.fiscal_year: 'fiscal_year', + Estimates.event_date: EVENT_DATE_FIELD_NAME, + Estimates.fiscal_quarter: FISCAL_QUARTER_FIELD_NAME, + Estimates.fiscal_year: FISCAL_YEAR_FIELD_NAME, Estimates.value: 'value', } cls.loader = cls.make_loader( events=cls.events, columns=cls.columns ) - cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique()) + cls.ASSET_FINDER_EQUITY_SIDS = list( + cls.events[SID_FIELD_NAME].unique() + ) cls.ASSET_FINDER_EQUITY_SYMBOLS = [ 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS ] super(EstimateTestCase, cls).init_class_fixtures() + def _test_wrong_num_quarters_passed(self): + with self.assertRaises(ValueError): + dataset = QuartersEstimates(-1) + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + + engine.run_pipeline( + Pipeline({c.name: c.latest for c in dataset.columns}), + start_date=self.trading_days[0], + end_date=self.trading_days[-1], + ) + class NextEstimateTestCase(EstimateTestCase): @classmethod def make_loader(cls, events, columns): return 
NextQuartersEstimatesLoader(events, columns) - #@profile def test_next_estimates(self): """ The goal of this test is to make sure that we select the right @@ -145,30 +171,62 @@ class NextEstimateTestCase(EstimateTestCase): end_date=self.trading_days[-1], ) for sid in self.sids: - sid_events = results.xs(sid, level=1) - ed_sorted_events = self.events[ - self.events['sid'] == sid - ] - ed_sorted_events['key'] = 1 - all_dates = pd.DataFrame({'all_dates': sid_events.index}) - all_dates['key'] = 1 - crossproduct = pd.merge(all_dates, ed_sorted_events, on='key') - crossproduct = crossproduct[crossproduct['timestamp'] <= - crossproduct['all_dates']] - crossproduct = crossproduct[crossproduct['event_date'] >= - crossproduct['all_dates']] - final = crossproduct.sort_values(by=['all_dates', - 'event_date', - 'timestamp'], - ascending=[True, True, - False]).groupby([ - 'all_dates', 'sid']).first().reset_index() - final = pd.merge(final, all_dates, - how='right').sort_values(by='all_dates').set_index( - 'all_dates') - final.index.name = None - for colname in sid_events.columns: - assert_series_equal(final[colname], sid_events[colname]) + sid_estimates = results.xs(sid, level=1) + ts_sorted_estimates = self.events[ + self.events[SID_FIELD_NAME] == sid + ].sort(TS_FIELD_NAME) + for i, date in enumerate(sid_estimates.index): + comparable_date = date.tz_localize(None) + # Filter out estimates we don't know about yet. + ts_eligible_estimates = ts_sorted_estimates[ + ts_sorted_estimates[TS_FIELD_NAME] <= comparable_date + ] + expected_estimate = pd.DataFrame() + if not ts_eligible_estimates.empty: + q1_knowledge = ts_eligible_estimates[ + ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 1 + ] + q2_knowledge = ts_eligible_estimates[ + ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 2 + ] + + # If our latest knowledge of q1 is that the release is + # happening on this simulation date or later, then that's + # the estimate we want to use. 
+ if (not q1_knowledge.empty and + q1_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] >= + comparable_date): + expected_estimate = q1_knowledge.iloc[-1] + # If q1 has already happened or we don't know about it + # yet and our latest knowledge indicates that q2 hasn't + # happened yet, then that's the estimate we want to use. + elif (not q2_knowledge.empty and + q2_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] >= + comparable_date): + expected_estimate = q2_knowledge.iloc[-1] + if not expected_estimate.empty: + for colname in sid_estimates.columns: + expected_value = expected_estimate[colname] + computed_value = sid_estimates.iloc[i][colname] + assert_equal(expected_value, computed_value) + else: + assert sid_estimates.iloc[i].isnull().all() + + def test_wrong_num_quarters_passed(self): + self._test_wrong_num_quarters_passed() + + +class BlazeNextEstimateLoaderTestCase(NextEstimateTestCase): + """ + Run NextEstimateTestCase's tests with a BlazeNextEstimatesLoader. + """ + + @classmethod + def make_loader(cls, events, columns): + return BlazeNextEstimatesLoader( + bz.data(events), + columns, + ) class PreviousEstimateTestCase(EstimateTestCase): @@ -194,24 +252,62 @@ class PreviousEstimateTestCase(EstimateTestCase): end_date=self.trading_days[-1], ) for sid in self.sids: - sid_events = results.xs(sid, level=1) - ed_sorted_events = self.events[ - self.events['sid'] == sid - ].sort_values(by=['event_date', 'timestamp']) - for i, date in enumerate(sid_events.index): - # Filter for events that happened on or before the simulation - # date and that we knew about on or before the simulation date. - ed_eligible_events = ed_sorted_events[ed_sorted_events['event_date'] <= date] - ts_eligible_events = ed_eligible_events[ed_eligible_events['timestamp'] <= date] - if not ts_eligible_events.empty: - # The expected event is the one we knew about last. 
- expected_event = ts_eligible_events.iloc[-1] - for colname in sid_events.columns: - expected_value = expected_event[colname] - computed_value = sid_events.iloc[i][colname] + sid_estimates = results.xs(sid, level=1) + ts_sorted_estimates = self.events[ + self.events[SID_FIELD_NAME] == sid + ].sort(TS_FIELD_NAME) + for i, date in enumerate(sid_estimates.index): + comparable_date = date.tz_localize(None) + # Filter out estimates we don't know about yet. + ts_eligible_estimates = ts_sorted_estimates[ + ts_sorted_estimates[TS_FIELD_NAME] <= comparable_date + ] + expected_estimate = pd.DataFrame() + if not ts_eligible_estimates.empty: + # Determine the last piece of information we know about + # for q1 and q2. This takes advantage of the fact that we + # only have 2 quarters in the test data. + q1_knowledge = ts_eligible_estimates[ + ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 1 + ] + q2_knowledge = ts_eligible_estimates[ + ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 2 + ] + # The expected estimate will be for q2 if the last thing + # we've seen is that the release date already happened. + # Otherwise, it'll be for q1, as long as the release date + # for q1 has already happened. 
+ if (not q2_knowledge.empty and + q2_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] <= + comparable_date): + expected_estimate = q2_knowledge.iloc[-1] + elif (not q1_knowledge.empty and + q1_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] <= + comparable_date): + expected_estimate = q1_knowledge.iloc[-1] + if not expected_estimate.empty: + for colname in sid_estimates.columns: + expected_value = expected_estimate[colname] + computed_value = sid_estimates.iloc[i][colname] assert_equal(expected_value, computed_value) else: - assert sid_events.iloc[i].isnull().all() + assert sid_estimates.iloc[i].isnull().all() + + def test_wrong_num_quarters_passed(self): + self._test_wrong_num_quarters_passed() + + +class BlazePreviousEstimateLoaderTestCase(PreviousEstimateTestCase): + """ + Run PreviousEstimateTestCase's tests with a BlazePreviousEstimatesLoader. + """ + + @classmethod + def make_loader(cls, events, columns): + return BlazePreviousEstimatesLoader( + bz.data(events), + columns, + ) class QuarterShiftTestCase(ZiplineTestCase): @@ -225,20 +321,19 @@ class QuarterShiftTestCase(ZiplineTestCase): expected = pd.DataFrame(([yr, qtr] for yr in range(0, 4) for qtr in range(1, 5))) for i in range(0, 8): - years, quarters = calc_forward_shift(input_yrs, input_qtrs, i) + years, quarters = shift_quarters(i, input_yrs, input_qtrs) # Can't use assert_series_equal here with check_names=False # because that still fails due to name differences. 
assert years.equals(expected[i:i+4].reset_index(drop=True)[0]) assert quarters.equals(expected[i:i+4].reset_index(drop=True)[1]) - def test_calc_backward_shift(self): input_yrs = pd.Series([0] * 4) input_qtrs = pd.Series(range(4, 0, -1)) expected = pd.DataFrame(([yr, qtr] for yr in range(0, -4, -1) for qtr in range(4, 0, -1))) - for i in range(0, 8): - years, quarters = calc_backward_shift(input_yrs, input_qtrs, i) + for i in range(0, 8, 1): + years, quarters = shift_quarters(-i, input_yrs, input_qtrs) # Can't use assert_series_equal here with check_names=False # because that still fails due to name differences. assert years.equals(expected[i:i+4].reset_index(drop=True)[0]) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index fc3252e4..01e1e659 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -1219,36 +1219,6 @@ def bind_expression_to_resources(expr, resources): }) -def load_raw_data(assets, dates, data_query_time, data_query_tz, expr, - odo_kwargs): - lower_dt, upper_dt = normalize_data_query_bounds( - dates[0], - dates[-1], - data_query_time, - data_query_tz, - ) - raw = ffill_query_in_range( - expr, - lower_dt, - upper_dt, - odo_kwargs, - ) - sids = raw.loc[:, SID_FIELD_NAME] - raw.drop( - sids[~sids.isin(assets)].index, - inplace=True - ) - if data_query_time is not None: - normalize_timestamp_to_query_time( - raw, - data_query_time, - data_query_tz, - inplace=True, - ts_field=TS_FIELD_NAME, - ) - return raw - - def ffill_query_in_range(expr, lower, upper, @@ -1303,4 +1273,4 @@ def ffill_query_in_range(expr, **odo_kwargs ) raw.loc[:, ts_field] = raw.loc[:, ts_field].astype('datetime64[ns]') - return raw \ No newline at end of file + return raw diff --git a/zipline/pipeline/loaders/blaze/estimates.py b/zipline/pipeline/loaders/blaze/estimates.py index c2341164..81554b0d 100644 --- a/zipline/pipeline/loaders/blaze/estimates.py +++ 
b/zipline/pipeline/loaders/blaze/estimates.py @@ -2,14 +2,14 @@ from datashape import istabular from .core import ( bind_expression_to_resources, - load_raw_data, ) from zipline.pipeline.loaders.base import PipelineLoader -from zipline.pipeline.loaders.events import ( - required_event_fields, +from zipline.pipeline.loaders.blaze.utils import load_raw_data +from zipline.pipeline.loaders.quarter_estimates import ( + NextQuartersEstimatesLoader, + PreviousQuartersEstimatesLoader, + required_estimates_fields, ) -from zipline.pipeline.loaders.quarter_estimates import \ - NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader from zipline.pipeline.loaders.utils import ( check_data_query_args, ) @@ -47,7 +47,7 @@ class BlazeEstimatesLoader(PipelineLoader): And other dataset-specific fields, where each row of the table is a record including the sid to identify the company, the timestamp where we - learned about the announcement, and the date when the earnings will be z + learned about the announcement, and the date when the earnings will be announced. 
If the '{TS_FIELD_NAME}' field is not included it is assumed that we @@ -61,8 +61,7 @@ class BlazeEstimatesLoader(PipelineLoader): resources=None, odo_kwargs=None, data_query_time=None, - data_query_tz=None, - loader=None): + data_query_tz=None): dshape = expr.dshape if not istabular(dshape): @@ -71,7 +70,7 @@ class BlazeEstimatesLoader(PipelineLoader): ) required_cols = list( - required_event_fields(columns) + required_estimates_fields(columns) ) self._expr = bind_expression_to_resources( expr[required_cols], @@ -82,15 +81,18 @@ class BlazeEstimatesLoader(PipelineLoader): check_data_query_args(data_query_time, data_query_tz) self._data_query_time = data_query_time self._data_query_tz = data_query_tz - self.loader = loader def load_adjusted_array(self, columns, dates, assets, mask): - raw = load_raw_data(assets, dates, self._data_query_time, - self._data_query_tz, self._exp, self._odo_kwargs) + raw = load_raw_data(assets, + dates, + self._data_query_time, + self._data_query_tz, + self._expr, + self._odo_kwargs) return self.loader( - events=raw, - next_value_columns=self._columns, + raw, + self._columns, ).load_adjusted_array( columns, dates, @@ -102,38 +104,6 @@ class BlazeEstimatesLoader(PipelineLoader): class BlazeNextEstimatesLoader(BlazeEstimatesLoader): loader = NextQuartersEstimatesLoader - def __init__(self, - expr, - columns, - resources=None, - odo_kwargs=None, - data_query_time=None, - data_query_tz=None, - loader=None): - super(BlazeNextEstimatesLoader).__init__(expr, - columns, - resources, - odo_kwargs, - data_query_time, - data_query_tz, - loader) - class BlazePreviousEstimatesLoader(BlazeEstimatesLoader): loader = PreviousQuartersEstimatesLoader - - def __init__(self, - expr, - columns, - resources=None, - odo_kwargs=None, - data_query_time=None, - data_query_tz=None, - loader=None): - super(BlazeNextEstimatesLoader).__init__(expr, - columns, - resources, - odo_kwargs, - data_query_time, - data_query_tz, - loader) diff --git 
a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py index d5ac8e37..39e5dac6 100644 --- a/zipline/pipeline/loaders/blaze/events.py +++ b/zipline/pipeline/loaders/blaze/events.py @@ -2,14 +2,16 @@ from datashape import istabular from .core import ( bind_expression_to_resources, - load_raw_data, ) from zipline.pipeline.loaders.base import PipelineLoader +from zipline.pipeline.loaders.blaze.utils import load_raw_data from zipline.pipeline.loaders.events import ( EventsLoader, required_event_fields, ) -from zipline.pipeline.loaders.utils import check_data_query_args +from zipline.pipeline.loaders.utils import ( + check_data_query_args, +) from zipline.utils.input_validation import ensure_timezone, optionally from zipline.utils.preprocess import preprocess @@ -29,7 +31,7 @@ class BlazeEventsLoader(PipelineLoader): data_query_time : time, optional The time to use for the data query cutoff. data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. + The timezone to use for the data query cutoff. dataset : DataSet The DataSet object for which this loader loads data. diff --git a/zipline/pipeline/loaders/blaze/utils.py b/zipline/pipeline/loaders/blaze/utils.py new file mode 100644 index 00000000..6455f76c --- /dev/null +++ b/zipline/pipeline/loaders/blaze/utils.py @@ -0,0 +1,61 @@ +from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME +from zipline.pipeline.loaders.blaze.core import ffill_query_in_range +from zipline.pipeline.loaders.utils import ( + normalize_data_query_bounds, + normalize_timestamp_to_query_time, +) + + +def load_raw_data(assets, dates, data_query_time, data_query_tz, expr, + odo_kwargs): + """ + Given an expression representing data to load, perform normalization and + forward-filling and return the data, materialized. + + Parameters + ---------- + assets : pd.Int64Index + The assets to load data for. + dates : pd.DatetimeIndex + The simulation dates to load data for. 
+ data_query_time : datetime.time + The time used as cutoff for new information. + data_query_tz : tzinfo + The timezone to normalize your dates to before comparing against + `data_query_time`. + expr : Expr + The expression representing the data to load. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + + Returns + ------- + raw : pd.DataFrame + The data symbolized by `expr` materialized in a DataFrame. + """ + lower_dt, upper_dt = normalize_data_query_bounds( + dates[0], + dates[-1], + data_query_time, + data_query_tz, + ) + raw = ffill_query_in_range( + expr, + lower_dt, + upper_dt, + odo_kwargs, + ) + sids = raw.loc[:, SID_FIELD_NAME] + raw.drop( + sids[~sids.isin(assets)].index, + inplace=True + ) + if data_query_time is not None: + normalize_timestamp_to_query_time( + raw, + data_query_time, + data_query_tz, + inplace=True, + ts_field=TS_FIELD_NAME, + ) + return raw diff --git a/zipline/pipeline/loaders/events.py b/zipline/pipeline/loaders/events.py index af11499e..cb33b3b2 100644 --- a/zipline/pipeline/loaders/events.py +++ b/zipline/pipeline/loaders/events.py @@ -5,14 +5,13 @@ from six import viewvalues from toolz import groupby, merge from .base import PipelineLoader -from .frame import DataFrameLoader from zipline.pipeline.common import ( EVENT_DATE_FIELD_NAME, SID_FIELD_NAME, TS_FIELD_NAME, ) +from zipline.pipeline.loaders.frame import DataFrameLoader from zipline.pipeline.loaders.utils import ( - choose_rows_by_indexer, next_event_indexer, previous_event_indexer, ) @@ -167,7 +166,7 @@ class EventsLoader(PipelineLoader): if not columns: return {} - return choose_rows_by_indexer( + return self._load_events( rows=self.events, name_map=self.next_value_columns, indexer=self.next_event_indexer(dates, sids), @@ -181,7 +180,7 @@ class EventsLoader(PipelineLoader): if not columns: return {} - return choose_rows_by_indexer( + return self._load_events( rows=self.events, name_map=self.previous_value_columns, 
indexer=self.previous_event_indexer(dates, sids), @@ -191,6 +190,22 @@ class EventsLoader(PipelineLoader): mask=mask, ) + def _load_events(self, name_map, indexer, columns, dates, sids, mask): + def to_frame(array): + return pd.DataFrame(array, index=dates, columns=sids) + + out = {} + for c in columns: + raw = self.events[name_map[c]][indexer] + # indexer will be -1 for locations where we don't have a known + # value. + raw[indexer < 0] = c.missing_value + + # Delegate the actual array formatting logic to a DataFrameLoader. + loader = DataFrameLoader(c, to_frame(raw), adjustments=None) + out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c] + return out + def load_adjusted_array(self, columns, dates, sids, mask): n, p = self.split_next_and_previous_event_columns(columns) return merge( diff --git a/zipline/pipeline/loaders/quarter_estimates.py b/zipline/pipeline/loaders/quarter_estimates.py index fa480b4d..7837e57d 100644 --- a/zipline/pipeline/loaders/quarter_estimates.py +++ b/zipline/pipeline/loaders/quarter_estimates.py @@ -1,7 +1,8 @@ -import numpy as np +from abc import abstractmethod import pandas as pd from six import viewvalues from toolz import groupby + from zipline.pipeline.common import ( EVENT_DATE_FIELD_NAME, FISCAL_QUARTER_FIELD_NAME, @@ -11,90 +12,33 @@ from zipline.pipeline.common import ( ) from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.frame import DataFrameLoader - -import line_profiler -from zipline.pipeline.loaders.utils import choose_rows_by_indexer - -PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter' - -PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year' +from zipline.utils.pandas_utils import cross_product NEXT_FISCAL_QUARTER = 'next_fiscal_quarter' - NEXT_FISCAL_YEAR = 'next_fiscal_year' - -FISCAL_QUARTER = 'fiscal_quarter' - -FISCAL_YEAR = 'fiscal_year' - -ALL_DATES = 'dates' - -prof = line_profiler.LineProfiler() +PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter' +PREVIOUS_FISCAL_YEAR = 
'previous_fiscal_year' +SIMULTATION_DATES = 'dates' -#@profile -def calc_forward_shift(yrs, qtrs, num_qtrs_shift): - """ - Calculate the number of years to shift forward and the new quarter in the - shifted year. - - Parameters - ---------- - qtr : int - The starting quarter. - num_qtr_shift : int - The number of quarters to shift forward. - yr : int - The starting year. - - Returns - ------- - s : pd.Series - A series containins the new year and quarter. - """ - - result_qtrs = (qtrs + num_qtrs_shift) % 4 - result_years = yrs + (qtrs + num_qtrs_shift) // 4 - to_adjust = result_qtrs[result_qtrs == 0].index - result_years.iloc[to_adjust] -= 1 - result_qtrs.iloc[to_adjust] = 4 - return result_years, result_qtrs +def normalize_quarters(years, quarters): + return years * 4 + quarters - 1 -#@profile -def calc_backward_shift(yrs, qtrs, num_qtrs_shift): - """ - Calculate the number of years to shift backward and the new quarter in the - shifted year. - - Parameters - ---------- - qtr : int - The starting quarter. - num_qtr_shift : int - The number of quarters to shift backward. - yr : int - The starting year. - - Returns - ------- - s : pd.Series - A series containins the new year and quarter. 
- """ - result_qtrs = 4 - (num_qtrs_shift - qtrs) % 4 - # Must subtract 1 year since we go backwards at least `qtr` number of - # quarters - result_years = yrs - (num_qtrs_shift - qtrs) // 4 - 1 - no_yr_boundary_crossed = qtrs[qtrs > num_qtrs_shift].index - result_years.iloc[no_yr_boundary_crossed] = yrs.iloc[no_yr_boundary_crossed] - result_qtrs.iloc[no_yr_boundary_crossed] = qtrs.iloc[no_yr_boundary_crossed] - num_qtrs_shift - return result_years, result_qtrs +def split_normalized_quarters(normalized_quarters): + years = normalized_quarters // 4 + quarters = normalized_quarters % 4 + return years, quarters + 1 -def required_event_fields(columns): +def shift_quarters(by, years, quarters): + return split_normalized_quarters(normalize_quarters(years, quarters) + by) + + +def required_estimates_fields(columns): """ Compute the set of resource columns required to serve - ``next_value_columns`` and ``previous_value_columns``. + `columns`. """ # These metadata columns are used to align event indexers. return { @@ -112,16 +56,16 @@ def required_event_fields(columns): def validate_column_specs(events, columns): """ - Verify that the columns of ``events`` can be used by an EventsLoader to - serve the BoundColumns described by ``next_value_columns`` and - ``previous_value_columns``. + Verify that the columns of ``events`` can be used by a + QuarterEstimatesLoader to serve the BoundColumns described by + `columns`. 
""" - required = required_event_fields(columns) + required = required_estimates_fields(columns) received = set(events.columns) missing = required - received if missing: raise ValueError( - "EventsLoader missing required columns {missing}.\n" + "QuarterEstimatesLoader missing required columns {missing}.\n" "Got Columns: {received}\n" "Expected Columns: {required}".format( missing=sorted(missing), @@ -148,35 +92,45 @@ class QuarterEstimatesLoader(PipelineLoader): self.base_column_name_map = base_column_name_map + @abstractmethod def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): pass - #@profile def load_adjusted_array(self, columns, dates, assets, mask): + # TODO: how can we enforce that datasets have the num_quarters + # attribute, given that they're created dynamically? groups = groupby(lambda x: x.dataset.num_quarters, columns) + groups_columns = dict(groups) + if (pd.Series(groups_columns.keys()) < 0).any(): + raise ValueError("Must pass a number of quarters >= 0") out = {} - date_values = pd.DataFrame({'dates': dates}) - date_values['key'] = 1 - self.estimates['key'] = 1 - merged = pd.merge(date_values, self.estimates, on='key') + date_values = pd.DataFrame({SIMULTATION_DATES: dates}) + # dates column must be of type datetime64[ns] in order for subsequent + # comparisons to work correctly. 
+ date_values[SIMULTATION_DATES] = date_values[ + SIMULTATION_DATES + ].astype('datetime64[ns]') + estimates_all_dates = cross_product(date_values, self.estimates) asset_df = pd.DataFrame({SID_FIELD_NAME: assets}) - asset_df['key'] = 1 - dates_sids = pd.merge(date_values, asset_df, on='key') - merged.drop('key', axis=1, inplace=True) - dates_sids.drop('key', axis=1, inplace=True) - for num_quarters in groups: - name_map = {c: self.base_column_name_map[getattr(c.dataset.__base__, c.name)] for c in columns} + dates_sids = cross_product(date_values, asset_df) + for num_quarters, columns in groups_columns.iteritems(): + name_map = {c: + self.base_column_name_map[ + getattr(c.dataset.__base__, c.name) + ] for c in columns} - columns = groups[num_quarters] - # First, group by sid, fiscal year, and fiscal quarter and only - # keep the last estimate made. - final_releases_per_qtr = merged[merged[TS_FIELD_NAME] <= - merged.dates].sort( - ['dates', TS_FIELD_NAME] - ).groupby( - ['dates', SID_FIELD_NAME, FISCAL_YEAR, FISCAL_QUARTER] - ).last() - final_releases_per_qtr = final_releases_per_qtr.reset_index() + # First, determine which estimates we would have known about on + # each date. Then, Sort by timestamp and group to find the latest + # estimate for each quarter. + final_releases_per_qtr = estimates_all_dates[ + estimates_all_dates[TS_FIELD_NAME] <= + estimates_all_dates.dates + ].sort([TS_FIELD_NAME]).groupby( + [SIMULTATION_DATES, + SID_FIELD_NAME, + FISCAL_YEAR_FIELD_NAME, + FISCAL_QUARTER_FIELD_NAME] + ).nth(-1).reset_index() result = self.load_quarters(num_quarters, dates_sids, @@ -184,50 +138,51 @@ class QuarterEstimatesLoader(PipelineLoader): for c in columns: column_name = name_map[c] - # Need to pass a DataFrame that has dates as the index and - # all sids as columns with column values being the value in - # 'result' for column c + # Pivot to get a DataFrame with dates as the index and + # sids as the columns. 
loader = DataFrameLoader( c, - result.pivot(index='dates', + result.pivot(index=SIMULTATION_DATES, columns=SID_FIELD_NAME, values=column_name), adjustments=None ) - out[c] = loader.load_adjusted_array([c], dates, assets, mask)[c] + out[c] = loader.load_adjusted_array([c], + dates, + assets, + mask)[c] return out class NextQuartersEstimatesLoader(QuarterEstimatesLoader): - #@profile def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): - # Filter for releases that are after each simulation date. + # Filter for releases that are on or after each simulation date. eligible_next_releases = final_releases_per_qtr[ final_releases_per_qtr[EVENT_DATE_FIELD_NAME] >= - final_releases_per_qtr['dates'] + final_releases_per_qtr[SIMULTATION_DATES] ] - + # For each sid, get the upcoming release. eligible_next_releases.sort(EVENT_DATE_FIELD_NAME) - # For each sid, get the upcoming release/year/quarter. next_releases = eligible_next_releases.groupby( - ['dates', SID_FIELD_NAME] + [SIMULTATION_DATES, SID_FIELD_NAME] ).nth(0).reset_index() # We use nth here to avoid forward filling # NaNs, which `first()` will do. next_releases = next_releases.rename( - columns={FISCAL_YEAR: NEXT_FISCAL_YEAR, - FISCAL_QUARTER: NEXT_FISCAL_QUARTER} + columns={FISCAL_YEAR_FIELD_NAME: NEXT_FISCAL_YEAR, + FISCAL_QUARTER_FIELD_NAME: NEXT_FISCAL_QUARTER} ) - # `next_qtr` is already the next quarter over, - # so we should offest `num_shifts` by 1. - (next_releases[FISCAL_YEAR], - next_releases[FISCAL_QUARTER]) = calc_forward_shift( + # The next fiscal quarter is already our starting point, + # so we should offset `num_quarters` by 1. + (next_releases[FISCAL_YEAR_FIELD_NAME], + next_releases[FISCAL_QUARTER_FIELD_NAME]) = shift_quarters( + (num_quarters - 1), next_releases[NEXT_FISCAL_YEAR], - next_releases[NEXT_FISCAL_QUARTER], (num_quarters - 1) + next_releases[NEXT_FISCAL_QUARTER], ) - # Merge to get the rows we care about for each date + # Do a left merge to get values for each date. 
result = dates_sids.merge(next_releases, - on=(['dates', SID_FIELD_NAME]), + on=([SIMULTATION_DATES, SID_FIELD_NAME]), how='left') return result @@ -236,37 +191,36 @@ class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader): def __init__(self, estimates, columns): - super(PreviousQuartersEstimatesLoader, self).__init__(estimates, columns) + super(PreviousQuartersEstimatesLoader, self).__init__(estimates, + columns) - #@profile def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr): - # Filter for releases that are before each simulation date. + # Filter for releases that are on or before each simulation date. eligible_previous_releases = final_releases_per_qtr[ final_releases_per_qtr[EVENT_DATE_FIELD_NAME] <= - final_releases_per_qtr['dates'] + final_releases_per_qtr[SIMULTATION_DATES] ] - + # For each sid, get the latest release. eligible_previous_releases.sort(EVENT_DATE_FIELD_NAME) - # For each sid, get the latest release we knew about prior to - # each simulation date. previous_releases = eligible_previous_releases.groupby( - ['dates', SID_FIELD_NAME] + [SIMULTATION_DATES, SID_FIELD_NAME] ).nth(-1).reset_index() # We use nth here to avoid forward filling # NaNs, which `last()` will do. - previous_releases = previous_releases.rename(columns={ - FISCAL_YEAR: PREVIOUS_FISCAL_YEAR, - FISCAL_QUARTER: PREVIOUS_FISCAL_QUARTER + FISCAL_YEAR_FIELD_NAME: PREVIOUS_FISCAL_YEAR, + FISCAL_QUARTER_FIELD_NAME: PREVIOUS_FISCAL_QUARTER }) - - (previous_releases[FISCAL_YEAR], - previous_releases[FISCAL_QUARTER]) = \ - calc_backward_shift( - previous_releases[PREVIOUS_FISCAL_YEAR], previous_releases[ - PREVIOUS_FISCAL_QUARTER], (num_quarters - 1) + # The previous fiscal quarter is already our starting point, + # so we should offset `num_quarters` by 1. 
+ (previous_releases[FISCAL_YEAR_FIELD_NAME], + previous_releases[FISCAL_QUARTER_FIELD_NAME]) = shift_quarters( + -(num_quarters - 1), + previous_releases[PREVIOUS_FISCAL_YEAR], + previous_releases[PREVIOUS_FISCAL_QUARTER], ) - # Merge to get the rows we care about for each date + # Do a left merge to get values for each date. result = dates_sids.merge(previous_releases, - on=(['dates', SID_FIELD_NAME]), how='left') + on=([SIMULTATION_DATES, + SID_FIELD_NAME]), + how='left') return result - diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 385022a8..77a9f447 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -2,7 +2,6 @@ import datetime import numpy as np import pandas as pd -from zipline.pipeline.loaders.frame import DataFrameLoader from zipline.utils.pandas_utils import mask_between_time @@ -273,20 +272,3 @@ def check_data_query_args(data_query_time, data_query_tz): data_query_tz, ), ) - - -def choose_rows_by_indexer(rows, name_map, indexer, columns, dates, sids, mask): - def to_frame(array): - return pd.DataFrame(array, index=dates, columns=sids) - - out = {} - for c in columns: - raw = rows[name_map[c]][indexer] - # indexer will be -1 for locations where we don't have a known - # value. - raw[indexer < 0] = c.missing_value - - # Delegate the actual array formatting logic to a DataFrameLoader. - loader = DataFrameLoader(c, to_frame(raw), adjustments=None) - out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c] - return out \ No newline at end of file