diff --git a/tests/pipeline/test_13d_filings.py b/tests/pipeline/test_13d_filings.py deleted file mode 100644 index 9bf1964a..00000000 --- a/tests/pipeline/test_13d_filings.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -Tests for the reference loader for 13d filings. -""" -import pandas as pd - -from zipline.pipeline.common import TS_FIELD_NAME -from zipline.pipeline.data import _13DFilings -from zipline.pipeline.factors.events import BusinessDaysSince13DFilingsDate -from zipline.pipeline.loaders._13d_filings import ( - _13DFilingsLoader, - DISCLOSURE_DATE, - NUM_SHARES, - PERCENT_SHARES, -) -from zipline.pipeline.loaders.utils import ( - zip_with_floats, - zip_with_dates -) -from zipline.testing.fixtures import WithPipelineEventDataLoader -from zipline.testing.fixtures import ZiplineTestCase - -DAYS_SINCE_PREV_DISCLOSURE = 'days_since_prev_disclosure' -PREVIOUS_DISCLOSURE_DATE = 'previous_disclosure_date' -PREVIOUS_NUM_SHARES = 'previous_number_shares' -PREVIOUS_PERCENT_SHARES = 'previous_percentage' - - -date_intervals = [ - [['2014-01-01', '2014-01-04'], - ['2014-01-05', '2014-01-09'], - ['2014-01-10', '2014-01-31']] -] - -empty_df = pd.DataFrame( - columns=[NUM_SHARES, - PERCENT_SHARES, - DISCLOSURE_DATE, - TS_FIELD_NAME], -) - -empty_df[NUM_SHARES] = empty_df[NUM_SHARES].astype('float') -empty_df[PERCENT_SHARES] = empty_df[PERCENT_SHARES].astype('float') -empty_df[TS_FIELD_NAME] = empty_df[TS_FIELD_NAME].astype('datetime64[ns]') -empty_df[DISCLOSURE_DATE] = empty_df[DISCLOSURE_DATE].astype('datetime64[ns]') - -_13d_filings_cases = [ - pd.DataFrame({ - NUM_SHARES: [1, 15], - PERCENT_SHARES: [10, 20], - TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), - DISCLOSURE_DATE: pd.to_datetime(['2014-01-04', '2014-01-09']) - }), - empty_df -] - - -class _13DFilingsLoaderTestCase(WithPipelineEventDataLoader, - ZiplineTestCase): - """ - Test for _13_filings dataset. 
- """ - pipeline_columns = { - PREVIOUS_NUM_SHARES: - _13DFilings.number_shares.latest, - PREVIOUS_PERCENT_SHARES: - _13DFilings.percent_shares.latest, - PREVIOUS_DISCLOSURE_DATE: - _13DFilings.disclosure_date.latest, - DAYS_SINCE_PREV_DISCLOSURE: - BusinessDaysSince13DFilingsDate(), - } - - @classmethod - def get_sids(cls): - return range(2) - - @classmethod - def get_dataset(cls): - return {sid: frame - for sid, frame - in enumerate(_13d_filings_cases)} - - loader_type = _13DFilingsLoader - - def setup(self, dates): - cols = { - PREVIOUS_DISCLOSURE_DATE: self.get_sids_to_frames( - zip_with_dates, - [['NaT', '2014-01-04', '2014-01-09']], - date_intervals, - dates, - 'datetime64[ns]', - 'NaN' - ), - PREVIOUS_NUM_SHARES: self.get_sids_to_frames( - zip_with_floats, - [['NaN', 1, 15]], - date_intervals, - dates, - 'float', - 'NaN' - ), - PREVIOUS_PERCENT_SHARES: self.get_sids_to_frames( - zip_with_floats, - [['NaN', 10, 20]], - date_intervals, - dates, - 'float', - 'NaN' - ) - } - cols[DAYS_SINCE_PREV_DISCLOSURE] = self._compute_busday_offsets( - cols[PREVIOUS_DISCLOSURE_DATE] - ) - return cols diff --git a/tests/pipeline/test_buyback_auth.py b/tests/pipeline/test_buyback_auth.py deleted file mode 100644 index debb24e1..00000000 --- a/tests/pipeline/test_buyback_auth.py +++ /dev/null @@ -1,172 +0,0 @@ -""" -Tests for the reference loader for Buyback Authorizations. 
-""" -import blaze as bz -from blaze.compute.core import swap_resources_into_scope -import pandas as pd -from six import iteritems - -from zipline.pipeline.common import( - DAYS_SINCE_PREV, - SID_FIELD_NAME, - TS_FIELD_NAME, -) -from zipline.pipeline.data import BuybackAuthorizations -from zipline.pipeline.factors.events import BusinessDaysSinceBuybackAuth -from zipline.pipeline.loaders.buyback_auth import ( - BUYBACK_AMOUNT_FIELD_NAME, - BUYBACK_ANNOUNCEMENT_FIELD_NAME, - BUYBACK_TYPE_FIELD_NAME, - BUYBACK_UNIT_FIELD_NAME, - BuybackAuthorizationsLoader, -) -from zipline.pipeline.loaders.blaze import BlazeBuybackAuthorizationsLoader -from zipline.pipeline.loaders.utils import ( - zip_with_dates, - zip_with_floats, - zip_with_strs -) -from zipline.testing.fixtures import ( - WithPipelineEventDataLoader, ZiplineTestCase -) - -PREVIOUS_BUYBACK_AMOUNT = 'previous_value' -PREVIOUS_BUYBACK_ANNOUNCEMENT = 'previous_buyback_announcement' -PREVIOUS_BUYBACK_CASH = 'previous_buyback_cash' -PREVIOUS_BUYBACK_SHARE_COUNT = 'previous_buyback_share_count' -PREVIOUS_BUYBACK_TYPE = 'previous_buyback_type' -PREVIOUS_BUYBACK_UNIT = 'previous_buyback_unit' - -date_intervals = [ - [['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-09'], - ['2014-01-10', '2014-01-31']] -] - -buyback_authorizations_cases = [ - pd.DataFrame({ - BUYBACK_AMOUNT_FIELD_NAME: [1, 15], - BUYBACK_UNIT_FIELD_NAME: ["$M", "Mshares"], - BUYBACK_TYPE_FIELD_NAME: ["New", "Additional"], - TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), - BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', - '2014-01-09']) - }), - pd.DataFrame( - columns=[BUYBACK_AMOUNT_FIELD_NAME, - BUYBACK_UNIT_FIELD_NAME, - BUYBACK_TYPE_FIELD_NAME, - BUYBACK_ANNOUNCEMENT_FIELD_NAME, - TS_FIELD_NAME], - dtype='datetime64[ns]' - ), -] - - -class BuybackAuthLoaderTestCase(WithPipelineEventDataLoader, ZiplineTestCase): - """ - Test for cash buyback authorizations dataset. 
- """ - pipeline_columns = { - PREVIOUS_BUYBACK_AMOUNT: - BuybackAuthorizations.previous_amount.latest, - PREVIOUS_BUYBACK_ANNOUNCEMENT: - BuybackAuthorizations.previous_date.latest, - PREVIOUS_BUYBACK_UNIT: - BuybackAuthorizations.previous_unit.latest, - PREVIOUS_BUYBACK_TYPE: - BuybackAuthorizations.previous_type.latest, - DAYS_SINCE_PREV: - BusinessDaysSinceBuybackAuth(), - } - - @classmethod - def get_sids(cls): - return range(2) - - @classmethod - def get_dataset(cls): - return {sid: frame - for sid, frame - in enumerate(buyback_authorizations_cases)} - - loader_type = BuybackAuthorizationsLoader - - def setup(self, dates): - cols = { - PREVIOUS_BUYBACK_AMOUNT: self.get_sids_to_frames(zip_with_floats, - [['NaN', 1, 15]], - date_intervals, - dates, - 'float', - 'NaN'), - PREVIOUS_BUYBACK_ANNOUNCEMENT: self.get_sids_to_frames( - zip_with_dates, - [['NaT', '2014-01-04', '2014-01-09']], - date_intervals, - dates, - 'datetime64[ns]', - 'NaN' - ), - PREVIOUS_BUYBACK_UNIT: self.get_sids_to_frames( - zip_with_strs, - [[None, "$M", "Mshares"]], - date_intervals, - dates, - 'category', - None - ), - PREVIOUS_BUYBACK_TYPE: self.get_sids_to_frames( - zip_with_strs, - [[None, "New", "Additional"]], - date_intervals, - dates, - 'category', - None - ) - } - - cols[DAYS_SINCE_PREV] = self._compute_busday_offsets( - cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] - ) - return cols - - -class BlazeBuybackAuthLoaderTestCase(BuybackAuthLoaderTestCase): - """ Test case for loading via blaze. 
- """ - loader_type = BlazeBuybackAuthorizationsLoader - - def pipeline_event_loader_args(self, dates): - _, mapping = super( - BlazeBuybackAuthLoaderTestCase, - self, - ).pipeline_event_loader_args(dates) - return (bz.data(pd.concat( - pd.DataFrame({ - BUYBACK_ANNOUNCEMENT_FIELD_NAME: - frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME], - BUYBACK_AMOUNT_FIELD_NAME: - frame[BUYBACK_AMOUNT_FIELD_NAME], - BUYBACK_UNIT_FIELD_NAME: - frame[BUYBACK_UNIT_FIELD_NAME], - BUYBACK_TYPE_FIELD_NAME: - frame[BUYBACK_TYPE_FIELD_NAME], - TS_FIELD_NAME: - frame[TS_FIELD_NAME], - SID_FIELD_NAME: sid, - }) - for sid, frame in iteritems(mapping) - ).reset_index(drop=True)),) - - -class BlazeBuybackAuthLoaderNotInteractiveTestCase( - BlazeBuybackAuthLoaderTestCase -): - """Test case for passing a non-interactive symbol and a dict of resources. - """ - def pipeline_event_loader_args(self, dates): - (bound_expr,) = super( - BlazeBuybackAuthLoaderNotInteractiveTestCase, - self, - ).pipeline_event_loader_args(dates) - return swap_resources_into_scope(bound_expr, {}) diff --git a/tests/pipeline/test_consensus_estimates.py b/tests/pipeline/test_consensus_estimates.py deleted file mode 100644 index 9a3806ea..00000000 --- a/tests/pipeline/test_consensus_estimates.py +++ /dev/null @@ -1,353 +0,0 @@ -""" -Tests for the reference loader for ConsensusEstimates. 
-""" -import blaze as bz -from blaze.compute.core import swap_resources_into_scope -import pandas as pd -from six import iteritems - -from zipline.pipeline.common import SID_FIELD_NAME -from zipline.pipeline.data import ConsensusEstimates -from zipline.pipeline.loaders.consensus_estimates import ( - ACTUAL_VALUE_FIELD_NAME, - ConsensusEstimatesLoader, - COUNT_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME, - HIGH_FIELD_NAME, - LOW_FIELD_NAME, - MEAN_FIELD_NAME, - RELEASE_DATE_FIELD_NAME, - STANDARD_DEVIATION_FIELD_NAME, -) -from zipline.pipeline.loaders.blaze import BlazeConsensusEstimatesLoader -from zipline.pipeline.loaders.utils import ( - zip_with_floats -) -from zipline.testing.fixtures import ( - ZiplineTestCase, - WithNextAndPreviousEventDataLoader -) - -NEXT_COUNT = 'next_count' -NEXT_FISCAL_QUARTER = 'next_fiscal_quarter' -NEXT_FISCAL_YEAR = 'next_fiscal_year' -NEXT_HIGH = 'next_high' -NEXT_LOW = 'next_low' -NEXT_MEAN = 'next_mean' -NEXT_RELEASE_DATE = 'next_release_date' -NEXT_STANDARD_DEVIATION = 'next_standard_deviation' -PREVIOUS_ACTUAL_VALUE = 'previous_actual_value' -PREVIOUS_COUNT = 'previous_count' -PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter' -PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year' -PREVIOUS_HIGH = 'previous_high' -PREVIOUS_LOW = 'previous_low' -PREVIOUS_MEAN = 'previous_mean' -PREVIOUS_RELEASE_DATE = 'previous_release_date' -PREVIOUS_STANDARD_DEVIATION = 'previous_standard_deviation' - - -consensus_estimates_cases = [ - # K1--K2--A1--A2. - pd.DataFrame({ - ACTUAL_VALUE_FIELD_NAME: (100, 200), - STANDARD_DEVIATION_FIELD_NAME: (.5, .6), - COUNT_FIELD_NAME: (1, 2), - FISCAL_QUARTER_FIELD_NAME: (1, 1), - HIGH_FIELD_NAME: (.6, .7), - MEAN_FIELD_NAME: (.1, .2), - FISCAL_YEAR_FIELD_NAME: (2014, 2014), - LOW_FIELD_NAME: (.05, .06), - }), - # K1--K2--A2--A1. 
- pd.DataFrame({ - ACTUAL_VALUE_FIELD_NAME: (200, 300), - STANDARD_DEVIATION_FIELD_NAME: (.6, .7), - COUNT_FIELD_NAME: (2, 3), - FISCAL_QUARTER_FIELD_NAME: (1, 1), - HIGH_FIELD_NAME: (.7, .8), - MEAN_FIELD_NAME: (.2, .3), - FISCAL_YEAR_FIELD_NAME: (2014, 2014), - LOW_FIELD_NAME: (.06, .07), - }), - # K1--A1--K2--A2. - pd.DataFrame({ - ACTUAL_VALUE_FIELD_NAME: (300, 400), - STANDARD_DEVIATION_FIELD_NAME: (.7, .8), - COUNT_FIELD_NAME: (3, 4), - FISCAL_QUARTER_FIELD_NAME: (1, 1), - HIGH_FIELD_NAME: (.8, .9), - MEAN_FIELD_NAME: (.3, .4), - FISCAL_YEAR_FIELD_NAME: (2014, 2014), - LOW_FIELD_NAME: (.07, .08), - }), - # K1 == K2. - pd.DataFrame({ - ACTUAL_VALUE_FIELD_NAME: (400, 500), - STANDARD_DEVIATION_FIELD_NAME: (.8, .9), - COUNT_FIELD_NAME: (4, 5), - FISCAL_QUARTER_FIELD_NAME: (1, 1), - HIGH_FIELD_NAME: (.9, 1.0), - MEAN_FIELD_NAME: (.4, .5), - FISCAL_YEAR_FIELD_NAME: (2014, 2014), - LOW_FIELD_NAME: (.08, .09), - }), - pd.DataFrame( - columns=[ACTUAL_VALUE_FIELD_NAME, - STANDARD_DEVIATION_FIELD_NAME, - COUNT_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - HIGH_FIELD_NAME, - MEAN_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME, - LOW_FIELD_NAME], - dtype='datetime64[ns]' - ), -] - -prev_actual_value = [ - ['NaN', 100, 200], - ['NaN', 300, 200], - ['NaN', 300, 400], - ['NaN', 400, 500], - ['NaN'] -] - -next_standard_deviation = [ - ['NaN', .5, .6, 'NaN'], - ['NaN', .6, .7, .6, 'NaN'], - ['NaN', .7, 'NaN', .8, 'NaN'], - ['NaN', .8, .9, 'NaN'], - ['NaN'] -] - -prev_standard_deviation = [ - ['NaN', .5, .6], - ['NaN', .7, .6], - ['NaN', .7, .8], - ['NaN', .8, .9], - ['NaN'] -] - -next_count = [ - ['NaN', 1, 2, 'NaN'], - ['NaN', 2, 3, 2, 'NaN'], - ['NaN', 3, 'NaN', 4, 'NaN'], - ['NaN', 4, 5, 'NaN'], - ['NaN'] -] - -prev_count = [ - ['NaN', 1, 2], - ['NaN', 3, 2], - ['NaN', 3, 4], - ['NaN', 4, 5], - ['NaN'] -] - -next_fiscal_quarter = [ - ['NaN', 1, 1, 'NaN'], - ['NaN', 1, 1, 1, 'NaN'], - ['NaN', 1, 'NaN', 1, 'NaN'], - ['NaN', 1, 1, 'NaN'], - ['NaN'] -] - -prev_fiscal_quarter = [ - ['NaN', 
1, 1], - ['NaN', 1, 1], - ['NaN', 1, 1], - ['NaN', 1, 1], - ['NaN'] -] - -next_high = [ - ['NaN', .6, .7, 'NaN'], - ['NaN', .7, .8, .7, 'NaN'], - ['NaN', .8, 'NaN', .9, 'NaN'], - ['NaN', .9, 1.0, 'NaN'], - ['NaN'] -] - -prev_high = [ - ['NaN', .6, .7], - ['NaN', .8, .7], - ['NaN', .8, .9], - ['NaN', .9, 1.0], - ['NaN'] -] - -next_mean = [ - ['NaN', .1, .2, 'NaN'], - ['NaN', .2, .3, .2, 'NaN'], - ['NaN', .3, 'NaN', .4, 'NaN'], - ['NaN', .4, .5, 'NaN'], - ['NaN'] -] - -prev_mean = [ - ['NaN', .1, .2], - ['NaN', .3, .2], - ['NaN', .3, .4], - ['NaN', .4, .5], - ['NaN'] -] - -next_fiscal_year = [ - ['NaN', 2014, 2014, 'NaN'], - ['NaN', 2014, 2014, 2014, 'NaN'], - ['NaN', 2014, 'NaN', 2014, 'NaN'], - ['NaN', 2014, 2014, 'NaN'], - ['NaN'] -] - -prev_fiscal_year = [ - ['NaN', 2014, 2014], - ['NaN', 2014, 2014], - ['NaN', 2014, 2014], - ['NaN', 2014, 2014], - ['NaN'] -] - -next_low = [ - ['NaN', .05, .06, 'NaN'], - ['NaN', .06, .07, .06, 'NaN'], - ['NaN', .07, 'NaN', .08, 'NaN'], - ['NaN', .08, .09, 'NaN'], - ['NaN'] -] - -prev_low = [ - ['NaN', .05, .06], - ['NaN', .07, .06], - ['NaN', .07, .08], - ['NaN', .08, .09], - ['NaN'] -] - -field_name_to_expected_col = { - PREVIOUS_ACTUAL_VALUE: prev_actual_value, - PREVIOUS_STANDARD_DEVIATION: prev_standard_deviation, - NEXT_STANDARD_DEVIATION: next_standard_deviation, - PREVIOUS_COUNT: prev_count, - NEXT_COUNT: next_count, - PREVIOUS_FISCAL_QUARTER: prev_fiscal_quarter, - NEXT_FISCAL_QUARTER: next_fiscal_quarter, - PREVIOUS_HIGH: prev_high, - NEXT_HIGH: next_high, - PREVIOUS_MEAN: prev_mean, - NEXT_MEAN: next_mean, - PREVIOUS_FISCAL_YEAR: prev_fiscal_year, - NEXT_FISCAL_YEAR: next_fiscal_year, - PREVIOUS_LOW: prev_low, - NEXT_LOW: next_low -} - - -class ConsensusEstimatesLoaderTestCase(WithNextAndPreviousEventDataLoader, - ZiplineTestCase): - """ - Tests for loading the consensus estimates data. 
- """ - pipeline_columns = { - PREVIOUS_ACTUAL_VALUE: - ConsensusEstimates.previous_actual_value.latest, - NEXT_RELEASE_DATE: - ConsensusEstimates.next_release_date.latest, - PREVIOUS_RELEASE_DATE: - ConsensusEstimates.previous_release_date.latest, - PREVIOUS_STANDARD_DEVIATION: - ConsensusEstimates.previous_standard_deviation.latest, - NEXT_STANDARD_DEVIATION: - ConsensusEstimates.next_standard_deviation.latest, - PREVIOUS_COUNT: - ConsensusEstimates.previous_count.latest, - NEXT_COUNT: - ConsensusEstimates.next_count.latest, - PREVIOUS_FISCAL_QUARTER: - ConsensusEstimates.previous_fiscal_quarter.latest, - NEXT_FISCAL_QUARTER: - ConsensusEstimates.next_fiscal_quarter.latest, - PREVIOUS_HIGH: - ConsensusEstimates.previous_high.latest, - NEXT_HIGH: - ConsensusEstimates.next_high.latest, - PREVIOUS_MEAN: - ConsensusEstimates.previous_mean.latest, - NEXT_MEAN: - ConsensusEstimates.next_mean.latest, - PREVIOUS_FISCAL_YEAR: - ConsensusEstimates.previous_fiscal_year.latest, - NEXT_FISCAL_YEAR: - ConsensusEstimates.next_fiscal_year.latest, - PREVIOUS_LOW: - ConsensusEstimates.previous_low.latest, - NEXT_LOW: - ConsensusEstimates.next_low.latest - } - - @classmethod - def get_dataset(cls): - return {sid: - pd.concat([ - cls.base_cases[sid].rename(columns={ - 'other_date': RELEASE_DATE_FIELD_NAME - }), - df - ], axis=1) - for sid, df in enumerate(consensus_estimates_cases)} - - loader_type = ConsensusEstimatesLoader - - def setup(self, dates): - cols = { - PREVIOUS_RELEASE_DATE: - self.get_expected_previous_event_dates( - dates, 'datetime64[ns]', 'NaN' - ), - NEXT_RELEASE_DATE: self.get_expected_next_event_dates( - dates, 'datetime64[ns]', 'NaN' - ) - } - for field_name in field_name_to_expected_col: - cols[field_name] = self.get_sids_to_frames( - zip_with_floats, field_name_to_expected_col[field_name], - self.prev_date_intervals - if field_name.startswith("previous") - else self.next_date_intervals, - dates, - 'float', - 'NaN' - ) - return cols - - -class 
BlazeConsensusEstimatesLoaderTestCase(ConsensusEstimatesLoaderTestCase): - loader_type = BlazeConsensusEstimatesLoader - - def pipeline_event_loader_args(self, dates): - _, mapping = super( - BlazeConsensusEstimatesLoaderTestCase, - self, - ).pipeline_event_loader_args(dates) - frames = [] - for sid, df in iteritems(mapping): - frame = df.copy() - frame[SID_FIELD_NAME] = sid - frames.append(frame) - return bz.data(pd.concat(frames).reset_index(drop=True)), - - -class BlazeConsensusEstimatesLoaderNotInteractiveTestCase( - BlazeConsensusEstimatesLoaderTestCase -): - """Test case for passing a non-interactive symbol and a dict of resources. - """ - - def pipeline_event_loader_args(self, dates): - (bound_expr,) = super( - BlazeConsensusEstimatesLoaderNotInteractiveTestCase, - self, - ).pipeline_event_loader_args(dates) - return swap_resources_into_scope(bound_expr, {}) diff --git a/tests/pipeline/test_dividends.py b/tests/pipeline/test_dividends.py deleted file mode 100644 index 3e1864ce..00000000 --- a/tests/pipeline/test_dividends.py +++ /dev/null @@ -1,517 +0,0 @@ -""" -Tests for the reference loader for Dividends datasets. 
-""" -import blaze as bz -from blaze.compute.core import swap_resources_into_scope -import pandas as pd -from six import iteritems - -from zipline.pipeline.common import ( - ANNOUNCEMENT_FIELD_NAME, - PREVIOUS_AMOUNT, - PREVIOUS_ANNOUNCEMENT, - SID_FIELD_NAME, - TS_FIELD_NAME, -) -from zipline.pipeline.data.dividends import ( - DividendsByAnnouncementDate, - DividendsByExDate, - DividendsByPayDate -) -from zipline.pipeline.factors.events import ( - BusinessDaysSinceDividendAnnouncement, - BusinessDaysSincePreviousExDate, - BusinessDaysUntilNextExDate -) -from zipline.pipeline.loaders.blaze.dividends import ( - BlazeDividendsByAnnouncementDateLoader, - BlazeDividendsByPayDateLoader, - BlazeDividendsByExDateLoader -) -from zipline.pipeline.loaders.dividends import ( - CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME, - DividendsByAnnouncementDateLoader, - DividendsByExDateLoader, - DividendsByPayDateLoader, - EX_DATE_FIELD_NAME, - PAY_DATE_FIELD_NAME, -) -from zipline.pipeline.loaders.utils import ( - zip_with_dates, - zip_with_floats, - zip_with_strs, -) -from zipline.testing.fixtures import ( - WithPipelineEventDataLoader, - ZiplineTestCase -) - -DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT = 'days_since_prev_dividend_announcement' -DAYS_SINCE_PREV_EX_DATE = 'days_since_prev_ex_date' -DAYS_TO_NEXT_EX_DATE = 'days_to_next_ex_date' -NEXT_AMOUNT = 'next_amount' -NEXT_CURRENCY_TYPE = 'next_currency_type' -NEXT_DIVIDEND_TYPE = 'next_dividend_type' -NEXT_EX_DATE = 'next_ex_date' -NEXT_PAY_DATE = 'next_pay_date' -PREVIOUS_CURRENCY_TYPE = 'previous_currency_type' -PREVIOUS_DIVIDEND_TYPE = 'previous_dividend_type' -PREVIOUS_EX_DATE = 'previous_ex_date' -PREVIOUS_PAY_DATE = 'previous_pay_date' - - -dividends_cases = [ - # K1--K2--A1--A2. 
- pd.DataFrame({ - CASH_AMOUNT_FIELD_NAME: [1, 15], - EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-15', '2014-01-20']), - PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-15', '2014-01-20']), - TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), - ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-09']), - CURRENCY_FIELD_NAME: ["$", "EUR"], - DIVIDEND_TYPE_FIELD_NAME: ["Stock", "Mixed"] - }), - # K1--K2--A2--A1. - pd.DataFrame({ - CASH_AMOUNT_FIELD_NAME: [7, 13], - EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-20', '2014-01-15']), - PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-20', '2014-01-15']), - TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), - ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-09']), - CURRENCY_FIELD_NAME: ["EUR", "$"], - DIVIDEND_TYPE_FIELD_NAME: ["Mixed", "Stock"] - }), - # K1--A1--K2--A2. - pd.DataFrame({ - CASH_AMOUNT_FIELD_NAME: [3, 1], - EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-20']), - PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-20']), - TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']), - ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-14']), - CURRENCY_FIELD_NAME: ["$", "EUR"], - DIVIDEND_TYPE_FIELD_NAME: ["Stock", "Mixed"] - }), - # K1 == K2. 
- pd.DataFrame({ - CASH_AMOUNT_FIELD_NAME: [6, 23], - EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-15']), - PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-15']), - TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2), - ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-04']), - CURRENCY_FIELD_NAME: ["$", "EUR"], - DIVIDEND_TYPE_FIELD_NAME: ["Stock", "Mixed"] - }), - pd.DataFrame( - columns=[CASH_AMOUNT_FIELD_NAME, - EX_DATE_FIELD_NAME, - PAY_DATE_FIELD_NAME, - TS_FIELD_NAME, - ANNOUNCEMENT_FIELD_NAME, - CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME], - dtype='datetime64[ns]' - ), -] - -prev_date_intervals = [ - [ - ['2014-01-01', '2014-01-14'], ['2014-01-15', '2014-01-19'], - ['2014-01-20', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-14'], ['2014-01-15', '2014-01-19'], - ['2014-01-20', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-09'], ['2014-01-10', '2014-01-19'], - ['2014-01-20', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-09'], ['2014-01-10', '2014-01-14'], - ['2014-01-15', '2014-01-31'] - ] -] - -next_date_intervals = [ - [ - ['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-15'], - ['2014-01-16', '2014-01-20'], ['2014-01-21', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-09'], - ['2014-01-10', '2014-01-15'], ['2014-01-16', '2014-01-20'], - ['2014-01-21', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-10'], - ['2014-01-11', '2014-01-14'], ['2014-01-15', '2014-01-20'], - ['2014-01-21', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-10'], - ['2014-01-11', '2014-01-15'], ['2014-01-16', '2014-01-31'] - ] -] - -next_ex_and_pay_dates = [['NaT', '2014-01-15', '2014-01-20', 'NaT'], - ['NaT', '2014-01-20', '2014-01-15', '2014-01-20', - 'NaT'], - ['NaT', '2014-01-10', 'NaT', '2014-01-20', 'NaT'], - ['NaT', '2014-01-10', '2014-01-15', 'NaT']] - -prev_ex_and_pay_dates = [['NaT', '2014-01-15', '2014-01-20'], - ['NaT', 
'2014-01-15', '2014-01-20'], - ['NaT', '2014-01-10', '2014-01-20'], - ['NaT', '2014-01-10', '2014-01-15']] - -prev_amounts = [['NaN', 1, 15], - ['NaN', 13, 7], - ['NaN', 3, 1], - ['NaN', 6, 23]] - -next_amounts = [['NaN', 1, 15, 'NaN'], - ['NaN', 7, 13, 7, 'NaN'], - ['NaN', 3, 'NaN', 1, 'NaN'], - ['NaN', 6, 23, 'NaN']] - -prev_currency_types = [[None, "$", "EUR"], - [None, "$", "EUR"], - [None, "$", "EUR"], - [None, "$", "EUR"]] - -next_currency_types = [[None, "$", "EUR", None], - [None, "EUR", "$", "EUR", None], - [None, "$", None, "EUR", None], - [None, "$", "EUR", None]] - -prev_dividend_types = [[None, "Stock", "Mixed"], - [None, "Stock", "Mixed"], - [None, "Stock", "Mixed"], - [None, "Stock", "Mixed"]] - -next_dividend_types = [[None, "Stock", "Mixed", None], - [None, "Mixed", "Stock", "Mixed", None], - [None, "Stock", None, "Mixed", None], - [None, "Stock", "Mixed", None]] - - -class DividendsByAnnouncementDateTestCase(WithPipelineEventDataLoader, - ZiplineTestCase): - """ - Tests for loading the dividends by announcement date data. 
- """ - pipeline_columns = { - PREVIOUS_ANNOUNCEMENT: - DividendsByAnnouncementDate.previous_announcement_date.latest, - PREVIOUS_AMOUNT: DividendsByAnnouncementDate.previous_amount.latest, - DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT: - BusinessDaysSinceDividendAnnouncement(), - PREVIOUS_CURRENCY_TYPE: - DividendsByAnnouncementDate.previous_currency.latest, - PREVIOUS_DIVIDEND_TYPE: - DividendsByAnnouncementDate.previous_type.latest, - } - - @classmethod - def get_dataset(cls): - return {sid: - frame.drop([EX_DATE_FIELD_NAME, - PAY_DATE_FIELD_NAME], axis=1) - for sid, frame - in enumerate(dividends_cases)} - - loader_type = DividendsByAnnouncementDateLoader - - def setup(self, dates): - date_intervals = [ - [ - ['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-09'], - ['2014-01-10', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-09'], - ['2014-01-10', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-14'], - ['2014-01-15', '2014-01-31'] - ], - [ - ['2014-01-01', '2014-01-04'], ['2014-01-05', '2014-01-31'] - ] - ] - announcement_dates = [['NaT', '2014-01-04', '2014-01-09'], - ['NaT', '2014-01-04', '2014-01-09'], - ['NaT', '2014-01-04', '2014-01-14'], - ['NaT', '2014-01-04']] - amounts = [['NaN', 1, 15], ['NaN', 7, 13], ['NaN', 3, 1], ['NaN', 23]] - currency_types = [[None, "$", "EUR"], [None, "EUR", "$"], - [None, "$", "EUR"], [None, "EUR"]] - dividend_types = [[None, "Stock", "Mixed"], [None, "Mixed", "Stock"], - [None, "Stock", "Mixed"], [None, "Mixed"]] - cols = { - PREVIOUS_ANNOUNCEMENT: self.get_sids_to_frames( - zip_with_dates, announcement_dates, date_intervals, dates, - 'datetime64[ns]', 'NaN' - ), - PREVIOUS_AMOUNT: self.get_sids_to_frames( - zip_with_floats, amounts, date_intervals, dates, 'float', 'NaN' - ), - PREVIOUS_CURRENCY_TYPE: self.get_sids_to_frames( - zip_with_strs, currency_types, date_intervals, dates, - 'category', None - ), - PREVIOUS_DIVIDEND_TYPE: self.get_sids_to_frames( - 
zip_with_strs, dividend_types, date_intervals, dates, - 'category', None - ), - } - - cols[ - DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT - ] = self._compute_busday_offsets(cols[PREVIOUS_ANNOUNCEMENT]) - return cols - - -class BlazeDividendsByAnnouncementDateTestCase( - DividendsByAnnouncementDateTestCase -): - loader_type = BlazeDividendsByAnnouncementDateLoader - - def pipeline_event_loader_args(self, dates): - _, mapping = super( - BlazeDividendsByAnnouncementDateTestCase, - self, - ).pipeline_event_loader_args(dates) - return (bz.Data(pd.concat( - pd.DataFrame({ - ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME], - TS_FIELD_NAME: df[TS_FIELD_NAME], - SID_FIELD_NAME: sid, - CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME], - CURRENCY_FIELD_NAME: df[CURRENCY_FIELD_NAME], - DIVIDEND_TYPE_FIELD_NAME: df[DIVIDEND_TYPE_FIELD_NAME], - }) - for sid, df in iteritems(mapping) - ).reset_index(drop=True)),) - - -class BlazeDividendsByAnnouncementDateNotInteractiveTestCase( - BlazeDividendsByAnnouncementDateTestCase): - """Test case for passing a non-interactive symbol and a dict of resources. - """ - - def pipeline_event_loader_args(self, dates): - (bound_expr,) = super( - BlazeDividendsByAnnouncementDateNotInteractiveTestCase, - self, - ).pipeline_event_loader_args(dates) - return swap_resources_into_scope(bound_expr, {}) - - -class DividendsByExDateTestCase(WithPipelineEventDataLoader, ZiplineTestCase): - """ - Tests for loading the dividends by ex date data. 
- """ - pipeline_columns = { - NEXT_EX_DATE: DividendsByExDate.next_date.latest, - PREVIOUS_EX_DATE: DividendsByExDate.previous_date.latest, - NEXT_AMOUNT: DividendsByExDate.next_amount.latest, - PREVIOUS_AMOUNT: DividendsByExDate.previous_amount.latest, - PREVIOUS_CURRENCY_TYPE: DividendsByExDate.previous_currency.latest, - NEXT_CURRENCY_TYPE: DividendsByExDate.next_currency.latest, - PREVIOUS_DIVIDEND_TYPE: DividendsByExDate.previous_type.latest, - NEXT_DIVIDEND_TYPE: DividendsByExDate.next_type.latest, - DAYS_TO_NEXT_EX_DATE: BusinessDaysUntilNextExDate(), - DAYS_SINCE_PREV_EX_DATE: BusinessDaysSincePreviousExDate() - } - - @classmethod - def get_dataset(cls): - return {sid: - frame.drop([ANNOUNCEMENT_FIELD_NAME, - PAY_DATE_FIELD_NAME], axis=1) - for sid, frame - in enumerate(dividends_cases)} - - loader_type = DividendsByExDateLoader - - def setup(self, dates): - cols = { - NEXT_EX_DATE: self.get_sids_to_frames( - zip_with_dates, next_ex_and_pay_dates, next_date_intervals, - dates, - 'datetime64[ns]', 'NaN' - ), - PREVIOUS_EX_DATE: self.get_sids_to_frames( - zip_with_dates, prev_ex_and_pay_dates, prev_date_intervals, - dates, - 'datetime64[ns]', 'NaN' - ), - NEXT_AMOUNT: self.get_sids_to_frames( - zip_with_floats, next_amounts, next_date_intervals, dates, - 'float', 'NaN' - ), - PREVIOUS_AMOUNT: self.get_sids_to_frames( - zip_with_floats, prev_amounts, prev_date_intervals, dates, - 'float', 'NaN' - ), - PREVIOUS_CURRENCY_TYPE: self.get_sids_to_frames( - zip_with_strs, prev_currency_types, prev_date_intervals, dates, - 'category', None - ), - NEXT_CURRENCY_TYPE: self.get_sids_to_frames( - zip_with_strs, next_currency_types, next_date_intervals, dates, - 'category', None - ), - PREVIOUS_DIVIDEND_TYPE: self.get_sids_to_frames( - zip_with_strs, prev_dividend_types, prev_date_intervals, dates, - 'category', None - ), - NEXT_DIVIDEND_TYPE: self.get_sids_to_frames( - zip_with_strs, next_dividend_types, next_date_intervals, dates, - 'category', None - ), - } - - 
cols[DAYS_TO_NEXT_EX_DATE] = self._compute_busday_offsets( - cols[NEXT_EX_DATE] - ) - cols[DAYS_SINCE_PREV_EX_DATE] = self._compute_busday_offsets( - cols[PREVIOUS_EX_DATE] - ) - return cols - - -class BlazeDividendsByExDateLoaderTestCase(DividendsByExDateTestCase): - loader_type = BlazeDividendsByExDateLoader - - def pipeline_event_loader_args(self, dates): - _, mapping = super( - BlazeDividendsByExDateLoaderTestCase, - self, - ).pipeline_event_loader_args(dates) - return (bz.Data(pd.concat( - pd.DataFrame({ - EX_DATE_FIELD_NAME: df[EX_DATE_FIELD_NAME], - TS_FIELD_NAME: df[TS_FIELD_NAME], - SID_FIELD_NAME: sid, - CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME], - CURRENCY_FIELD_NAME: df[CURRENCY_FIELD_NAME], - DIVIDEND_TYPE_FIELD_NAME: df[DIVIDEND_TYPE_FIELD_NAME], - }) - for sid, df in iteritems(mapping) - ).reset_index(drop=True)),) - - -class BlazeDividendsByExDateLoaderNotInteractiveTestCase( - BlazeDividendsByExDateLoaderTestCase): - """Test case for passing a non-interactive symbol and a dict of resources. - """ - - def pipeline_event_loader_args(self, dates): - (bound_expr,) = super( - BlazeDividendsByExDateLoaderNotInteractiveTestCase, - self, - ).pipeline_event_loader_args(dates) - return swap_resources_into_scope(bound_expr, {}) - - -class DividendsByPayDateTestCase(WithPipelineEventDataLoader, ZiplineTestCase): - """ - Tests for loading the dividends by pay date data. 
- """ - pipeline_columns = { - NEXT_PAY_DATE: DividendsByPayDate.next_date.latest, - PREVIOUS_PAY_DATE: DividendsByPayDate.previous_date.latest, - NEXT_AMOUNT: DividendsByPayDate.next_amount.latest, - PREVIOUS_AMOUNT: DividendsByPayDate.previous_amount.latest, - PREVIOUS_CURRENCY_TYPE: DividendsByPayDate.previous_currency.latest, - NEXT_CURRENCY_TYPE: DividendsByPayDate.next_currency.latest, - PREVIOUS_DIVIDEND_TYPE: DividendsByPayDate.previous_type.latest, - NEXT_DIVIDEND_TYPE: DividendsByPayDate.next_type.latest, - } - - @classmethod - def get_dataset(cls): - return {sid: - frame.drop([ANNOUNCEMENT_FIELD_NAME, - EX_DATE_FIELD_NAME], axis=1) - for sid, frame - in enumerate(dividends_cases)} - - loader_type = DividendsByPayDateLoader - - def setup(self, dates): - return { - NEXT_PAY_DATE: self.get_sids_to_frames( - zip_with_dates, next_ex_and_pay_dates, next_date_intervals, - dates, - 'datetime64[ns]', 'NaN' - ), - PREVIOUS_PAY_DATE: self.get_sids_to_frames( - zip_with_dates, prev_ex_and_pay_dates, prev_date_intervals, - dates, - 'datetime64[ns]', 'NaN' - ), - NEXT_AMOUNT: self.get_sids_to_frames( - zip_with_floats, next_amounts, next_date_intervals, dates, - 'float', 'NaN' - ), - PREVIOUS_AMOUNT: self.get_sids_to_frames( - zip_with_floats, prev_amounts, prev_date_intervals, dates, - 'float', 'NaN' - ), - PREVIOUS_CURRENCY_TYPE: self.get_sids_to_frames( - zip_with_strs, prev_currency_types, prev_date_intervals, dates, - 'category', None - ), - NEXT_CURRENCY_TYPE: self.get_sids_to_frames( - zip_with_strs, next_currency_types, next_date_intervals, dates, - 'category', None - ), - PREVIOUS_DIVIDEND_TYPE: self.get_sids_to_frames( - zip_with_strs, prev_dividend_types, prev_date_intervals, dates, - 'category', None - ), - NEXT_DIVIDEND_TYPE: self.get_sids_to_frames( - zip_with_strs, next_dividend_types, next_date_intervals, dates, - 'category', None - ), - } - - -class BlazeDividendsByPayDateLoaderTestCase(DividendsByPayDateTestCase): - loader_type = 
BlazeDividendsByPayDateLoader - - def pipeline_event_loader_args(self, dates): - _, mapping = super( - BlazeDividendsByPayDateLoaderTestCase, - self, - ).pipeline_event_loader_args(dates) - return (bz.Data(pd.concat( - pd.DataFrame({ - PAY_DATE_FIELD_NAME: df[PAY_DATE_FIELD_NAME], - TS_FIELD_NAME: df[TS_FIELD_NAME], - SID_FIELD_NAME: sid, - CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME], - CURRENCY_FIELD_NAME: df[CURRENCY_FIELD_NAME], - DIVIDEND_TYPE_FIELD_NAME: df[DIVIDEND_TYPE_FIELD_NAME], - }) - for sid, df in iteritems(mapping) - ).reset_index(drop=True)),) - - -class BlazeDividendsByPayDateLoaderNotInteractiveTestCase( - BlazeDividendsByPayDateLoaderTestCase): - """Test case for passing a non-interactive symbol and a dict of resources. - """ - - def pipeline_event_loader_args(self, dates): - (bound_expr,) = super( - BlazeDividendsByPayDateLoaderNotInteractiveTestCase, - self, - ).pipeline_event_loader_args(dates) - return swap_resources_into_scope(bound_expr, {}) diff --git a/tests/pipeline/test_earnings.py b/tests/pipeline/test_earnings.py deleted file mode 100644 index 74f58597..00000000 --- a/tests/pipeline/test_earnings.py +++ /dev/null @@ -1,98 +0,0 @@ -""" -Tests for the reference loader for EarningsCalendar. 
-""" -import blaze as bz -from blaze.compute.core import swap_resources_into_scope -import pandas as pd -from six import iteritems - -from zipline.pipeline.common import ( - ANNOUNCEMENT_FIELD_NAME, - DAYS_SINCE_PREV, - DAYS_TO_NEXT, - NEXT_ANNOUNCEMENT, - PREVIOUS_ANNOUNCEMENT, - SID_FIELD_NAME, - TS_FIELD_NAME -) -from zipline.pipeline.data import EarningsCalendar -from zipline.pipeline.factors.events import ( - BusinessDaysSincePreviousEarnings, - BusinessDaysUntilNextEarnings, -) -from zipline.pipeline.loaders.earnings import EarningsCalendarLoader -from zipline.pipeline.loaders.blaze import BlazeEarningsCalendarLoader -from zipline.testing.fixtures import ( - ZiplineTestCase, - WithNextAndPreviousEventDataLoader -) - - -class EarningsCalendarLoaderTestCase(WithNextAndPreviousEventDataLoader, - ZiplineTestCase): - """ - Tests for loading the earnings announcement data. - """ - pipeline_columns = { - NEXT_ANNOUNCEMENT: EarningsCalendar.next_announcement.latest, - PREVIOUS_ANNOUNCEMENT: EarningsCalendar.previous_announcement.latest, - DAYS_SINCE_PREV: BusinessDaysSincePreviousEarnings(), - DAYS_TO_NEXT: BusinessDaysUntilNextEarnings(), - } - - @classmethod - def get_dataset(cls): - return {sid: df.rename( - columns={'other_date': ANNOUNCEMENT_FIELD_NAME} - ) for sid, df in enumerate(cls.base_cases)} - - loader_type = EarningsCalendarLoader - - def setup(self, dates): - cols = { - PREVIOUS_ANNOUNCEMENT: self.get_expected_previous_event_dates( - dates, - 'datetime64[ns]', 'NaN' - ), - NEXT_ANNOUNCEMENT: self.get_expected_next_event_dates( - dates, 'datetime64[ns]', 'NaN' - ), - } - cols[DAYS_TO_NEXT] = self._compute_busday_offsets( - cols[NEXT_ANNOUNCEMENT] - ) - cols[DAYS_SINCE_PREV] = self._compute_busday_offsets( - cols[PREVIOUS_ANNOUNCEMENT] - ) - return cols - - -class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase): - loader_type = BlazeEarningsCalendarLoader - - def pipeline_event_loader_args(self, dates): - _, mapping = super( - 
BlazeEarningsCalendarLoaderTestCase, - self, - ).pipeline_event_loader_args(dates) - return (bz.data(pd.concat( - pd.DataFrame({ - ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME], - TS_FIELD_NAME: df[TS_FIELD_NAME], - SID_FIELD_NAME: sid, - }) - for sid, df in iteritems(mapping) - ).reset_index(drop=True)),) - - -class BlazeEarningsCalendarLoaderNotInteractiveTestCase( - BlazeEarningsCalendarLoaderTestCase): - """Test case for passing a non-interactive symbol and a dict of resources. - """ - - def pipeline_event_loader_args(self, dates): - (bound_expr,) = super( - BlazeEarningsCalendarLoaderNotInteractiveTestCase, - self, - ).pipeline_event_loader_args(dates) - return swap_resources_into_scope(bound_expr, {}) diff --git a/tests/pipeline/test_events.py b/tests/pipeline/test_events.py index 9663b94e..5747f13c 100644 --- a/tests/pipeline/test_events.py +++ b/tests/pipeline/test_events.py @@ -1,299 +1,458 @@ """ Tests for setting up an EventsLoader and a BlazeEventsLoader. """ -import re -from unittest import TestCase +from itertools import product import blaze as bz -from nose_parameterized import parameterized import numpy as np -from numpy.testing import assert_array_equal import pandas as pd -from pandas.util.testing import assert_series_equal +from zipline.pipeline import Pipeline, SimplePipelineEngine from zipline.pipeline.common import ( - ANNOUNCEMENT_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + TS_FIELD_NAME, SID_FIELD_NAME, - TS_FIELD_NAME ) from zipline.pipeline.data import DataSet, Column +from zipline.pipeline.loaders.events import EventsLoader from zipline.pipeline.loaders.blaze.events import BlazeEventsLoader -from zipline.pipeline.loaders.events import ( - DF_NO_TS_NOT_INFER_TS_ERROR, - DTINDEX_NOT_INFER_TS_ERROR, - EventsLoader, - SERIES_NO_DTINDEX_ERROR, - WRONG_COLS_ERROR, - WRONG_MANY_COL_DATA_FORMAT_ERROR, - WRONG_SINGLE_COL_DATA_FORMAT_ERROR +from zipline.pipeline.loaders.utils import ( + previous_event_indexer, + next_event_indexer, +) +from 
zipline.testing import ZiplineTestCase +from zipline.testing.fixtures import ( + WithAssetFinder, + WithNYSETradingDays, +) +from zipline.testing.predicates import assert_equal +from zipline.utils.numpy_utils import ( + categorical_dtype, + datetime64ns_dtype, + float64_dtype, + int64_dtype, ) -from zipline.utils.memoize import lazyval -from zipline.utils.numpy_utils import datetime64ns_dtype - -OTHER_FIELD = "other_field" - -ABSTRACT_CONCRETE_LOADER_ERROR = 'abstract methods concrete_loader' -ABSTRACT_EXPECTED_COLS_ERROR = 'abstract methods event_date_col, expected_cols' class EventDataSet(DataSet): - previous_announcement = Column(datetime64ns_dtype) - next_announcement = Column(datetime64ns_dtype) + previous_event_date = Column(dtype=datetime64ns_dtype) + next_event_date = Column(dtype=datetime64ns_dtype) -class EventDataSetLoader(EventsLoader): - expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME]) + previous_float = Column(dtype=float64_dtype) + next_float = Column(dtype=float64_dtype) - event_date_col = ANNOUNCEMENT_FIELD_NAME + previous_datetime = Column(dtype=datetime64ns_dtype) + next_datetime = Column(dtype=datetime64ns_dtype) - def __init__(self, - all_dates, - events_by_sid, - infer_timestamps=False, - dataset=EventDataSet): - super(EventDataSetLoader, self).__init__( - all_dates, - events_by_sid, - infer_timestamps=infer_timestamps, - dataset=dataset, - ) + previous_int = Column(dtype=int64_dtype, missing_value=-1) + next_int = Column(dtype=int64_dtype, missing_value=-1) - @lazyval - def previous_announcement_loader(self): - return self._previous_event_date_loader( - self.dataset.previous_announcement, - ) + previous_string = Column(dtype=categorical_dtype, missing_value=None) + next_string = Column(dtype=categorical_dtype, missing_value=None) - @lazyval - def next_announcement_loader(self): - return self._next_event_date_loader( - self.dataset.next_announcement, - ) - - -# Test case just for catching an error when multiple columns are in the wrong -# 
data format, so no loader defined. -class EventDataSetLoaderMultipleExpectedColsNoColumnLoaders(EventsLoader): - expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME, OTHER_FIELD]) - - event_date_col = ANNOUNCEMENT_FIELD_NAME - - -class EventDataSetLoaderNoExpectedCols(EventsLoader): - - def __init__(self, - all_dates, - events_by_sid, - infer_timestamps=False, - dataset=EventDataSet): - super(EventDataSetLoaderNoExpectedCols, self).__init__( - all_dates, - events_by_sid, - infer_timestamps=infer_timestamps, - dataset=dataset, - ) - - -dtx = pd.date_range('2014-01-01', '2014-01-10') - - -class EventLoaderTestCase(TestCase): - def test_null_in_event_date_col(self): - # Tests that if there is a null date in the event date column, it is - # filtered out and does not break on loading the adjusted array. - dates_with_null = pd.Series(dtx) - dates_with_null[2] = pd.NaT - events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: - dates_with_null, - TS_FIELD_NAME: dtx})} - loader = EventDataSetLoader( - dtx, - events_by_sid, - ) - - prev_result = loader.load_adjusted_array({ - EventDataSet.previous_announcement - }, dtx, [0], [True])[EventDataSet.previous_announcement].data[:, 0] - - next_result = loader.load_adjusted_array({ - EventDataSet.next_announcement - }, dtx, [0], [True])[EventDataSet.next_announcement].data[:, 0] - - expected_prev = dates_with_null[:] - expected_prev[2] = dtx[1] - assert_array_equal(prev_result, expected_prev) - expected_next = dates_with_null[:] - expected_next[2] = np.datetime64('NaT') - assert_array_equal(next_result, expected_next) - - def assert_loader_error(self, events_by_sid, error, msg, - infer_timestamps, loader): - with self.assertRaisesRegexp(error, re.escape(msg)): - loader( - dtx, events_by_sid, infer_timestamps=infer_timestamps, - ) - - def test_no_expected_cols_defined(self): - events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx})} - self.assert_loader_error(events_by_sid, TypeError, - ABSTRACT_EXPECTED_COLS_ERROR, - 
True, EventDataSetLoaderNoExpectedCols) - - def test_wrong_cols(self): - wrong_col_name = 'some_other_col' - # Test wrong cols (cols != expected) - events_by_sid = {0: pd.DataFrame({wrong_col_name: dtx})} - self.assert_loader_error( - events_by_sid, ValueError, WRONG_COLS_ERROR.format( - expected_columns=list(EventDataSetLoader.expected_cols), - sid=0, - resulting_columns=[wrong_col_name], - ), - True, - EventDataSetLoader - ) - - @parameterized.expand([ - # DataFrame without timestamp column and infer_timestamps = True - [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), True], - # DataFrame with timestamp column - [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx, - TS_FIELD_NAME: dtx}), False], - # DatetimeIndex with infer_timestamps = True - [pd.DatetimeIndex(dtx), True], - # Series with DatetimeIndex as index and infer_timestamps = False - [pd.Series(dtx, index=dtx), False] - ]) - def test_conversion_to_df(self, df, infer_timestamps): - - events_by_sid = {0: df} - loader = EventDataSetLoader( - dtx, - events_by_sid, - infer_timestamps=infer_timestamps, - ) - self.assertEqual( - loader.events_by_sid.keys(), - events_by_sid.keys(), - ) - - if infer_timestamps: - expected = pd.Series(index=[dtx[0]] * 10, data=dtx, - name=ANNOUNCEMENT_FIELD_NAME) - else: - expected = pd.Series(index=dtx, data=dtx, - name=ANNOUNCEMENT_FIELD_NAME) - expected.index.name = TS_FIELD_NAME - # Check that index by first given date has been added - assert_series_equal( - loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME], - expected, - ) - - @parameterized.expand( - [ - # DataFrame without timestamp column and infer_timestamps = True - [ - pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), - False, - DF_NO_TS_NOT_INFER_TS_ERROR.format( - timestamp_column_name=TS_FIELD_NAME, - sid=0 - ), - EventDataSetLoader - ], - # DatetimeIndex with infer_timestamps = False - [ - pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME), - False, - DTINDEX_NOT_INFER_TS_ERROR.format(sid=0), - EventDataSetLoader - ], - # Series 
with DatetimeIndex as index and infer_timestamps = False - [ - pd.Series(dtx, name=ANNOUNCEMENT_FIELD_NAME), - False, - SERIES_NO_DTINDEX_ERROR.format(sid=0), - EventDataSetLoader - ], - # Below, 2 cases repeated for infer_timestamps = True and False. - # Shouldn't make a difference in the outcome. - # We expected 1 column but got a data structure other than a - # DataFrame, Series, or DatetimeIndex - [ - [dtx], - True, - WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=0), - EventDataSetLoader - ], - # We expected multiple columns but got a data structure other - # than a DataFrame - [ - [dtx, dtx], - True, - WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=0), - EventDataSetLoaderMultipleExpectedColsNoColumnLoaders - ], - [ - [dtx], - False, - WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=0), - EventDataSetLoader - ], - # We expected multiple columns but got a data structure other - # than a DataFrame - [ - [dtx, dtx], - False, - WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=0), - EventDataSetLoaderMultipleExpectedColsNoColumnLoaders - ] - ] + previous_string_custom_missing = Column( + dtype=categorical_dtype, + missing_value=u"<>", + ) + next_string_custom_missing = Column( + dtype=categorical_dtype, + missing_value=u"<>", ) - def test_bad_conversion_to_df(self, df, infer_timestamps, msg, loader): - events_by_sid = {0: df} - self.assert_loader_error(events_by_sid, ValueError, msg, - infer_timestamps, loader) -class BlazeEventDataSetLoaderNoConcreteLoader(BlazeEventsLoader): - def __init__(self, - expr, - dataset=EventDataSet, - **kwargs): - super( - BlazeEventDataSetLoaderNoConcreteLoader, self - ).__init__(expr, - dataset=dataset, - **kwargs) +critical_dates = pd.to_datetime([ + '2014-01-05', + '2014-01-10', + '2014-01-15', + '2014-01-20', +]) -class BlazeEventLoaderTestCase(TestCase): - # Blaze loader: need to test failure if no concrete loader - def test_no_concrete_loader_defined(self): - with self.assertRaisesRegexp( - TypeError, 
re.escape(ABSTRACT_CONCRETE_LOADER_ERROR) - ): - BlazeEventDataSetLoaderNoConcreteLoader( - bz.data( - pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx, - SID_FIELD_NAME: 0}) +def make_events_for_sid(sid, event_dates, event_timestamps): + num_events = len(event_dates) + return pd.DataFrame({ + 'sid': np.full(num_events, sid, dtype=np.int64), + 'timestamp': event_timestamps, + 'event_date': event_dates, + 'float': np.arange(num_events, dtype=np.float64) + sid, + 'int': np.arange(num_events) + sid, + 'datetime': pd.date_range('1990-01-01', periods=num_events).shift(sid), + 'string': ['-'.join([str(sid), str(i)]) for i in range(num_events)], + }) + + +def make_null_event_date_events(all_sids, timestamp): + """ + Make an event with a null event_date for all sids. + + Used to test that EventsLoaders filter out null events. + """ + return pd.DataFrame({ + 'sid': all_sids, + 'timestamp': timestamp, + 'event_date': pd.Timestamp('NaT'), + 'float': -9999.0, + 'int': -9999, + 'datetime': pd.Timestamp('1980'), + 'string': 'should be ignored', + }) + + +def make_events(add_nulls): + """ + Every event has at least three pieces of data associated with it: + + 1. sid : The ID of the asset associated with the event. + 2. event_date : The date on which an event occurred. + 3. timestamp : The date on which we learned about the event. + This can be before the occurence_date in the case of an + announcement about an upcoming event. + + Events for two different sids shouldn't interact in any way, so the + interesting cases are determined by the possible interleavings of + event_date and timestamp for a single sid. + + Fix two events with dates e1, e2 and timestamps t1 and t2. + + Without loss of generality, assume that e1 < e2. (If two events have the + same occurrence date, the behavior of next/previous event is undefined). + + The remaining possible sequences of events are given by taking all possible + 4-tuples of four ascending dates. 
For each possible interleaving, we + generate a set of fake events with those dates and assign them to a new + sid. + """ + def gen_date_interleavings(): + for e1, e2, t1, t2 in product(*[critical_dates] * 4): + if e1 < e2: + yield (e1, e2, t1, t2) + + event_frames = [] + for sid, (e1, e2, t1, t2) in enumerate(gen_date_interleavings()): + event_frames.append(make_events_for_sid(sid, [e1, e2], [t1, t2])) + + if add_nulls: + for date in critical_dates: + event_frames.append( + make_null_event_date_events( + np.arange(sid + 1), + timestamp=date, ) ) + return pd.concat(event_frames, ignore_index=True) -class BlazeEventDataSetLoader(BlazeEventsLoader): - concrete_loader = EventDataSetLoader - _expected_fields = frozenset({ANNOUNCEMENT_FIELD_NAME, - TS_FIELD_NAME, - SID_FIELD_NAME}) - def __init__(self, - expr, - dataset=EventDataSet, - **kwargs): - super( - BlazeEventDataSetLoader, self - ).__init__(expr, - dataset=dataset, - **kwargs) +class EventIndexerTestCase(ZiplineTestCase): + + @classmethod + def init_class_fixtures(cls): + super(EventIndexerTestCase, cls).init_class_fixtures() + cls.events = make_events(add_nulls=False).sort('event_date') + cls.events.reset_index(inplace=True) + + def test_previous_event_indexer(self): + events = self.events + event_sids = events['sid'].values + event_dates = events['event_date'].values + event_timestamps = events['timestamp'].values + + all_dates = pd.date_range('2014', '2014-01-31') + all_sids = np.unique(event_sids) + + indexer = previous_event_indexer( + all_dates, + all_sids, + event_dates, + event_timestamps, + event_sids, + ) + + # Compute expected results without knowledge of null events. 
+ for i, sid in enumerate(all_sids): + self.check_previous_event_indexer( + events, + all_dates, + sid, + indexer[:, i], + ) + + def check_previous_event_indexer(self, + events, + all_dates, + sid, + indexer): + relevant_events = events[events.sid == sid] + self.assertEqual(len(relevant_events), 2) + + ix1, ix2 = relevant_events.index + + # An event becomes a possible value once we're past both its event_date + # and its timestamp. + event1_first_eligible = max( + relevant_events.loc[ix1, ['event_date', 'timestamp']], + ) + event2_first_eligible = max( + relevant_events.loc[ix2, ['event_date', 'timestamp']], + ) + + for date, computed_index in zip(all_dates, indexer): + if date >= event2_first_eligible: + # If we've seen event 2, it should win even if we've seen event + # 1, because events are sorted by event_date. + self.assertEqual(computed_index, ix2) + elif date >= event1_first_eligible: + # If we've seen event 1 but not event 2, event 1 should win. + self.assertEqual(computed_index, ix1) + else: + # If we haven't seen either event, then we should have -1 as + # sentinel. + self.assertEqual(computed_index, -1) + + def test_next_event_indexer(self): + events = self.events + event_sids = events['sid'].values + event_dates = events['event_date'].values + event_timestamps = events['timestamp'].values + + all_dates = pd.date_range('2014', '2014-01-31') + all_sids = np.unique(event_sids) + + indexer = next_event_indexer( + all_dates, + all_sids, + event_dates, + event_timestamps, + event_sids, + ) + + # Compute expected results without knowledge of null events. 
+ for i, sid in enumerate(all_sids): + self.check_next_event_indexer( + events, + all_dates, + sid, + indexer[:, i], + ) + + def check_next_event_indexer(self, + events, + all_dates, + sid, + indexer): + relevant_events = events[events.sid == sid] + self.assertEqual(len(relevant_events), 2) + + ix1, ix2 = relevant_events.index + e1, e2 = relevant_events['event_date'] + t1, t2 = relevant_events['timestamp'] + + for date, computed_index in zip(all_dates, indexer): + # An event is eligible to be the next event if it's between the + # timestamp and the event_date, inclusive. + if t1 <= date <= e1: + # If e1 is eligible, it should be chosen even if e2 is + # eligible, since it's earlier. + self.assertEqual(computed_index, ix1) + elif t2 <= date <= e2: + # e2 is eligible and e1 is not, so e2 should be chosen. + self.assertEqual(computed_index, ix2) + else: + # Neither event is eligible. Return -1 as a sentinel. + self.assertEqual(computed_index, -1) + + +class EventsLoaderTestCase(WithAssetFinder, + WithNYSETradingDays, + ZiplineTestCase): + + START_DATE = pd.Timestamp('2014-01-01') + END_DATE = pd.Timestamp('2014-01-30') + + @classmethod + def init_class_fixtures(cls): + # This is a rare case where we actually want to do work **before** we + # call init_class_fixtures. We choose our sids for WithAssetFinder + # based on the events generated by make_event_data. 
+ cls.raw_events = make_events(add_nulls=True) + cls.raw_events_no_nulls = cls.raw_events[ + cls.raw_events['event_date'].notnull() + ] + cls.next_value_columns = { + EventDataSet.next_datetime: 'datetime', + EventDataSet.next_event_date: 'event_date', + EventDataSet.next_float: 'float', + EventDataSet.next_int: 'int', + EventDataSet.next_string: 'string', + EventDataSet.next_string_custom_missing: 'string' + } + cls.previous_value_columns = { + EventDataSet.previous_datetime: 'datetime', + EventDataSet.previous_event_date: 'event_date', + EventDataSet.previous_float: 'float', + EventDataSet.previous_int: 'int', + EventDataSet.previous_string: 'string', + EventDataSet.previous_string_custom_missing: 'string' + } + cls.loader = cls.make_loader( + events=cls.raw_events, + next_value_columns=cls.next_value_columns, + previous_value_columns=cls.previous_value_columns, + ) + cls.ASSET_FINDER_EQUITY_SIDS = list(cls.raw_events['sid'].unique()) + cls.ASSET_FINDER_EQUITY_SYMBOLS = [ + 's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS + ] + super(EventsLoaderTestCase, cls).init_class_fixtures() + + @classmethod + def make_loader(cls, events, next_value_columns, previous_value_columns): + # This method exists to be overridden by BlazeEventsLoaderTestCase + return EventsLoader(events, next_value_columns, previous_value_columns) + + def test_load_with_trading_calendar(self): + engine = SimplePipelineEngine( + lambda x: self.loader, + self.trading_days, + self.asset_finder, + ) + + results = engine.run_pipeline( + Pipeline({c.name: c.latest for c in EventDataSet.columns}), + start_date=self.trading_days[0], + end_date=self.trading_days[-1], + ) + + for c in EventDataSet.columns: + if c in self.next_value_columns: + self.check_next_value_results(c, results[c.name].unstack()) + elif c in self.previous_value_columns: + self.check_previous_value_results(c, results[c.name].unstack()) + else: + raise AssertionError("Unexpected column %s." 
% c) + + def assert_result_contains_all_sids(self, results): + assert_equal( + list(map(int, results.columns)), + self.ASSET_FINDER_EQUITY_SIDS, + ) + + def check_previous_value_results(self, column, results): + """ + Check previous value results for a single column. + """ + # Verify that we got a result for every sid. + self.assert_result_contains_all_sids(results) + + events = self.raw_events_no_nulls + # Remove timezone info from trading days, since the outputs + # from pandas won't be tz_localized. + dates = self.trading_days.tz_localize(None) + + for asset, asset_result in results.iterkv(): + relevant_events = events[events.sid == asset.sid] + self.assertEqual(len(relevant_events), 2) + + v1, v2 = relevant_events[self.previous_value_columns[column]] + event1_first_eligible = max( + # .ix doesn't work here because the frame index contains + # integers, so 0 is still interpreted as a key. + relevant_events.iloc[0].loc[['event_date', 'timestamp']], + ) + event2_first_eligible = max( + relevant_events.iloc[1].loc[['event_date', 'timestamp']] + ) + + for date, computed_value in zip(dates, asset_result): + if date >= event2_first_eligible: + # If we've seen event 2, it should win even if we've seen + # event 1, because events are sorted by event_date. + self.assertEqual(computed_value, v2) + elif date >= event1_first_eligible: + # If we've seen event 1 but not event 2, event 1 should + # win. + self.assertEqual(computed_value, v1) + else: + # If we haven't seen either event, then we should have + # column.missing_value. + assert_equal( + computed_value, + column.missing_value, + # Coerce from Timestamp to datetime64. + allow_datetime_coercions=True, + ) + + def check_next_value_results(self, column, results): + """ + Check results for a single column. + """ + self.assert_result_contains_all_sids(results) + + events = self.raw_events_no_nulls + # Remove timezone info from trading days, since the outputs + # from pandas won't be tz_localized. 
+ dates = self.trading_days.tz_localize(None) + for asset, asset_result in results.iterkv(): + relevant_events = events[events.sid == asset.sid] + self.assertEqual(len(relevant_events), 2) + + v1, v2 = relevant_events[self.next_value_columns[column]] + e1, e2 = relevant_events['event_date'] + t1, t2 = relevant_events['timestamp'] + + for date, computed_value in zip(dates, asset_result): + if t1 <= date <= e1: + # If we've seen event 2, it should win even if we've seen + # event 1, because events are sorted by event_date. + self.assertEqual(computed_value, v1) + elif t2 <= date <= e2: + # If we've seen event 1 but not event 2, event 1 should + # win. + self.assertEqual(computed_value, v2) + else: + # If we haven't seen either event, then we should have + # column.missing_value. + assert_equal( + computed_value, + column.missing_value, + # Coerce from Timestamp to datetime64. + allow_datetime_coercions=True, + ) + + def test_wrong_cols(self): + # Test wrong cols (cols != expected) + events = pd.DataFrame({ + 'c': [5], + SID_FIELD_NAME: [1], + TS_FIELD_NAME: [pd.Timestamp('2014')], + EVENT_DATE_FIELD_NAME: [pd.Timestamp('2014')], + }) + + EventsLoader(events, {EventDataSet.next_float: 'c'}, {}) + EventsLoader(events, {}, {EventDataSet.previous_float: 'c'}) + + with self.assertRaises(ValueError) as e: + EventsLoader(events, {EventDataSet.next_float: 'd'}, {}) + + msg = str(e.exception) + expected = ( + "EventsLoader missing required columns ['d'].\n" + "Got Columns: ['c', 'event_date', 'sid', 'timestamp']\n" + "Expected Columns: ['d', 'event_date', 'sid', 'timestamp']" + ) + self.assertEqual(msg, expected) + + +class BlazeEventsLoaderTestCase(EventsLoaderTestCase): + """ + Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader. 
+ """ + + @classmethod + def make_loader(cls, events, next_value_columns, previous_value_columns): + return BlazeEventsLoader( + bz.data(events), + next_value_columns, + previous_value_columns, + ) diff --git a/zipline/pipeline/common.py b/zipline/pipeline/common.py index cfa08a71..e64b1dc9 100644 --- a/zipline/pipeline/common.py +++ b/zipline/pipeline/common.py @@ -9,5 +9,7 @@ DAYS_TO_NEXT = 'days_to_next' NEXT_ANNOUNCEMENT = 'next_announcement' PREVIOUS_AMOUNT = 'previous_amount' PREVIOUS_ANNOUNCEMENT = 'previous_announcement' + +EVENT_DATE_FIELD_NAME = 'event_date' SID_FIELD_NAME = 'sid' TS_FIELD_NAME = 'timestamp' diff --git a/zipline/pipeline/data/_13d_filings.py b/zipline/pipeline/data/_13d_filings.py deleted file mode 100644 index 891d25f1..00000000 --- a/zipline/pipeline/data/_13d_filings.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -Dataset representing recently disclosed 13d filings. -""" -from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype - -from .dataset import Column, DataSet - - -class _13DFilings(DataSet): - """ - Dataset representing dates of recently disclosed 13d filings. 
- """ - number_shares = Column(float64_dtype) - percent_shares = Column(float64_dtype) - disclosure_date = Column(datetime64ns_dtype) diff --git a/zipline/pipeline/data/__init__.py b/zipline/pipeline/data/__init__.py index 063316ad..b4066453 100644 --- a/zipline/pipeline/data/__init__.py +++ b/zipline/pipeline/data/__init__.py @@ -1,25 +1,9 @@ -from ._13d_filings import _13DFilings -from .buyback_auth import BuybackAuthorizations -from .dividends import ( - DividendsByAnnouncementDate, - DividendsByExDate, - DividendsByPayDate, -) -from .earnings import EarningsCalendar -from .consensus_estimates import ConsensusEstimates from .equity_pricing import USEquityPricing from .dataset import DataSet, Column, BoundColumn __all__ = [ - '_13DFilings', 'BoundColumn', - 'BuybackAuthorizations', 'Column', 'DataSet', - 'DividendsByAnnouncementDate', - 'DividendsByExDate', - 'DividendsByPayDate', - 'EarningsCalendar', - 'ConsensusEstimates', 'USEquityPricing', ] diff --git a/zipline/pipeline/data/buyback_auth.py b/zipline/pipeline/data/buyback_auth.py deleted file mode 100644 index d0174c9d..00000000 --- a/zipline/pipeline/data/buyback_auth.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Datasets representing dates of recently announced buyback authorizations. -""" -from zipline.utils.numpy_utils import ( - datetime64ns_dtype, - float64_dtype, - categorical_dtype -) - -from .dataset import Column, DataSet - - -class BuybackAuthorizations(DataSet): - """ - Dataset representing dates of recently announced cash buyback - authorizations. 
- """ - previous_amount = Column(float64_dtype) - previous_date = Column(datetime64ns_dtype) - previous_unit = Column(categorical_dtype, missing_value=None) - previous_type = Column(categorical_dtype, missing_value=None) diff --git a/zipline/pipeline/data/consensus_estimates.py b/zipline/pipeline/data/consensus_estimates.py deleted file mode 100644 index ee784290..00000000 --- a/zipline/pipeline/data/consensus_estimates.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -Datasets representing consensus estimates data. -""" -from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype - -from .dataset import Column, DataSet - - -class ConsensusEstimates(DataSet): - """ - Dataset representing consensus estimates data. - """ - previous_release_date = Column(datetime64ns_dtype) - next_release_date = Column(datetime64ns_dtype) - previous_standard_deviation = Column(float64_dtype) - next_standard_deviation = Column(float64_dtype) - previous_count = Column(float64_dtype) - next_count = Column(float64_dtype) - previous_fiscal_quarter = Column(float64_dtype) - next_fiscal_quarter = Column(float64_dtype) - previous_high = Column(float64_dtype) - next_high = Column(float64_dtype) - previous_mean = Column(float64_dtype) - next_mean = Column(float64_dtype) - previous_fiscal_year = Column(float64_dtype) - next_fiscal_year = Column(float64_dtype) - previous_low = Column(float64_dtype) - next_low = Column(float64_dtype) - previous_actual_value = Column(float64_dtype) diff --git a/zipline/pipeline/data/dividends.py b/zipline/pipeline/data/dividends.py deleted file mode 100644 index b3975215..00000000 --- a/zipline/pipeline/data/dividends.py +++ /dev/null @@ -1,39 +0,0 @@ -""" -Dataset representing dates of upcoming dividends. 
-""" -from zipline.utils.numpy_utils import ( - categorical_dtype, - datetime64ns_dtype, - float64_dtype, -) - -from .dataset import Column, DataSet - - -class DividendsByExDate(DataSet): - next_date = Column(datetime64ns_dtype) - previous_date = Column(datetime64ns_dtype) - next_amount = Column(float64_dtype) - previous_amount = Column(float64_dtype) - next_currency = Column(categorical_dtype) - previous_currency = Column(categorical_dtype) - next_type = Column(categorical_dtype) - previous_type = Column(categorical_dtype) - - -class DividendsByPayDate(DataSet): - next_date = Column(datetime64ns_dtype) - previous_date = Column(datetime64ns_dtype) - next_amount = Column(float64_dtype) - previous_amount = Column(float64_dtype) - next_currency = Column(categorical_dtype) - previous_currency = Column(categorical_dtype) - next_type = Column(categorical_dtype) - previous_type = Column(categorical_dtype) - - -class DividendsByAnnouncementDate(DataSet): - previous_announcement_date = Column(datetime64ns_dtype) - previous_amount = Column(float64_dtype) - previous_currency = Column(categorical_dtype) - previous_type = Column(categorical_dtype) diff --git a/zipline/pipeline/data/earnings.py b/zipline/pipeline/data/earnings.py deleted file mode 100644 index c6abcc49..00000000 --- a/zipline/pipeline/data/earnings.py +++ /dev/null @@ -1,17 +0,0 @@ -""" -Dataset representing dates of upcoming earnings. -""" -from zipline.utils.numpy_utils import datetime64ns_dtype - -from .dataset import Column, DataSet - - -class EarningsCalendar(DataSet): - """ - Dataset representing dates of upcoming or recently announced earnings. - """ - next_announcement = Column(datetime64ns_dtype) - previous_announcement = Column(datetime64ns_dtype) - - # TODO: Provide categorical columns for when during the day the - # announcement occurred. 
diff --git a/zipline/pipeline/factors/__init__.py b/zipline/pipeline/factors/__init__.py index 7858920c..f4604711 100644 --- a/zipline/pipeline/factors/__init__.py +++ b/zipline/pipeline/factors/__init__.py @@ -5,13 +5,8 @@ from .factor import ( RecarrayField, ) from .events import ( - BusinessDaysSince13DFilingsDate, - BusinessDaysSinceBuybackAuth, - BusinessDaysSinceDividendAnnouncement, - BusinessDaysSincePreviousEarnings, - BusinessDaysSincePreviousExDate, - BusinessDaysUntilNextEarnings, - BusinessDaysUntilNextExDate, + BusinessDaysSincePreviousEvent, + BusinessDaysUntilNextEvent, ) from .technical import ( Aroon, @@ -37,13 +32,8 @@ __all__ = [ 'Aroon', 'AverageDollarVolume', 'BollingerBands', - 'BusinessDaysSince13DFilingsDate', - 'BusinessDaysSinceBuybackAuth', - 'BusinessDaysSinceDividendAnnouncement', - 'BusinessDaysSincePreviousEarnings', - 'BusinessDaysSincePreviousExDate', - 'BusinessDaysUntilNextEarnings', - 'BusinessDaysUntilNextExDate', + 'BusinessDaysSincePreviousEvent', + 'BusinessDaysUntilNextEvent', 'CustomFactor', 'EWMA', 'EWMSTD', diff --git a/zipline/pipeline/factors/events.py b/zipline/pipeline/factors/events.py index bed4f9a8..8f568e09 100644 --- a/zipline/pipeline/factors/events.py +++ b/zipline/pipeline/factors/events.py @@ -3,13 +3,6 @@ Factors describing information about event data (e.g. earnings announcements, acquisitions, dividends, etc.). """ from numpy import newaxis -from ..data import ( - _13DFilings, - BuybackAuthorizations, - DividendsByAnnouncementDate, - DividendsByExDate, - EarningsCalendar -) from zipline.utils.numpy_utils import ( NaTD, busday_count_mask_NaT, @@ -20,7 +13,7 @@ from zipline.utils.numpy_utils import ( from .factor import Factor -class BusinessDaysSincePreviousEvents(Factor): +class BusinessDaysSincePreviousEvent(Factor): """ Abstract class for business days since a previous event. Returns the number of **business days** (not trading days!) 
since @@ -51,7 +44,7 @@ class BusinessDaysSincePreviousEvents(Factor): return busday_count_mask_NaT(announce_dates, reference_dates) -class BusinessDaysUntilNextEvents(Factor): +class BusinessDaysUntilNextEvent(Factor): """ Abstract class for business days since a next event. Returns the number of **business days** (not trading days!) until @@ -84,118 +77,3 @@ class BusinessDaysUntilNextEvents(Factor): # Convert row labels into a column vector for broadcasted comparison. reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis] return busday_count_mask_NaT(reference_dates, announce_dates) - - -class BusinessDaysUntilNextEarnings(BusinessDaysUntilNextEvents): - """ - Factor returning the number of **business days** (not trading days!) until - the next known earnings date for each asset. - - Assets that announced or will announce earnings on the day of ``compute`` - will produce a value of 0.0. Assets that will announce the event on the - next upcoming business day will produce a value of 1.0. - - Assets for which the date of the next earnings announcement is ``NaT`` will - produce a value of ``NaN``. This most commonly occurs because many - companies do not publish the exact date of their upcoming earnings - announcements until a few weeks before the announcement. - - See Also - -------- - BusinessDaysSincePreviousEarnings - """ - inputs = [EarningsCalendar.next_announcement] - - -class BusinessDaysSincePreviousEarnings(BusinessDaysSincePreviousEvents): - """ - Factor returning the number of **business days** (not trading days!) since - the most recent earnings date for each asset. - - Assets that announced or will announce earnings on the day of ``compute`` - will produce a value of 0.0. Assets that will announce the event on the - next upcoming business day will produce a value of 1.0. - - Assets which announced or will announce the earnings today will produce a - value of 0.0. 
Assets that announced the on the previous business day will - produce a value of 1.0. - - Assets for which the previous earnings date is `NaT` will produce a value - of `NaN`. This will happen in the interval between IPO and first earnings - for most companies. - - See Also - -------- - BusinessDaysUntilNextEarnings - """ - inputs = [EarningsCalendar.previous_announcement] - - -class BusinessDaysSinceBuybackAuth( - BusinessDaysSincePreviousEvents -): - """ - Factor returning the number of **business days** (not trading days!) since - the most recent buyback authorization for each asset. - - See Also - -------- - zipline.pipeline.factors.BusinessDaysSinceBuybackAuth - """ - inputs = [BuybackAuthorizations.previous_date] - - -class BusinessDaysSinceDividendAnnouncement( - BusinessDaysSincePreviousEvents -): - """ - Factor returning the number of **business days** (not trading days!) since - the most recent dividend announcement for each asset. - - - See Also - -------- - zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement - """ - inputs = [DividendsByAnnouncementDate.previous_announcement_date] - - -class BusinessDaysUntilNextExDate( - BusinessDaysUntilNextEvents -): - """ - Factor returning the number of **business days** (not trading days!) until - the next ex date for each asset. - - - See Also - -------- - zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement - """ - inputs = [DividendsByExDate.next_date] - - -class BusinessDaysSincePreviousExDate( - BusinessDaysSincePreviousEvents -): - """ - Factor returning the number of **business days** (not trading days!) since - the most recent ex date for each asset. - - - See Also - -------- - zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement - """ - inputs = [DividendsByExDate.previous_date] - - -class BusinessDaysSince13DFilingsDate( - BusinessDaysSincePreviousEvents -): - """ - Factor returning the number of **business days** (not trading days!) 
since - the most recent 13d filings for each asset. - - """ - inputs = [_13DFilings.disclosure_date] diff --git a/zipline/pipeline/loaders/_13d_filings.py b/zipline/pipeline/loaders/_13d_filings.py deleted file mode 100644 index 4a45d879..00000000 --- a/zipline/pipeline/loaders/_13d_filings.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Reference implementation for 13d filings loaders. -""" - -from zipline.pipeline.data import _13DFilings -from zipline.pipeline.loaders.events import EventsLoader -from zipline.utils.memoize import lazyval - - -DISCLOSURE_DATE = 'disclosure_date' -NUM_SHARES = 'number_shares' -PERCENT_SHARES = 'percent_shares' - - -class _13DFilingsLoader(EventsLoader): - """ - Reference loader for - :class:`zipline.pipeline.data._13DFilings`. - - events_by_sid: dict[sid -> pd.DataFrame(knowledge date, - disclosure date, percent shares, number of shares)] - - """ - expected_cols = frozenset([DISCLOSURE_DATE, - PERCENT_SHARES, - NUM_SHARES]) - event_date_col = DISCLOSURE_DATE - - def __init__(self, all_dates, events_by_sid, - infer_timestamps=False, - dataset=_13DFilings): - super(_13DFilingsLoader, self).__init__( - all_dates, events_by_sid, infer_timestamps, dataset=dataset, - ) - - @lazyval - def disclosure_date_loader(self): - return self._previous_event_date_loader( - self.dataset.disclosure_date, - ) - - @lazyval - def percent_shares_loader(self): - return self._previous_event_value_loader( - self.dataset.percent_shares, - PERCENT_SHARES - ) - - @lazyval - def number_shares_loader(self): - return self._previous_event_value_loader( - self.dataset.number_shares, - NUM_SHARES - ) diff --git a/zipline/pipeline/loaders/__init__.py b/zipline/pipeline/loaders/__init__.py index 270cb9ce..0e94c6c0 100644 --- a/zipline/pipeline/loaders/__init__.py +++ b/zipline/pipeline/loaders/__init__.py @@ -1,21 +1,5 @@ -from ._13d_filings import _13DFilingsLoader -from .buyback_auth import BuybackAuthorizationsLoader -from .consensus_estimates import ConsensusEstimatesLoader 
-from .earnings import EarningsCalendarLoader -from .dividends import ( - DividendsByAnnouncementDateLoader, - DividendsByExDateLoader, - DividendsByPayDateLoader, -) from .equity_pricing_loader import USEquityPricingLoader __all__ = [ - '_13DFilingsLoader', - 'BuybackAuthorizationsLoader', - 'DividendsByAnnouncementDateLoader', - 'DividendsByExDateLoader', - 'DividendsByPayDateLoader', - 'EarningsCalendarLoader', - 'ConsensusEstimatesLoader', 'USEquityPricingLoader', ] diff --git a/zipline/pipeline/loaders/blaze/_13d_filings.py b/zipline/pipeline/loaders/blaze/_13d_filings.py deleted file mode 100644 index 592d847e..00000000 --- a/zipline/pipeline/loaders/blaze/_13d_filings.py +++ /dev/null @@ -1,68 +0,0 @@ -from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME -from zipline.pipeline.data import _13DFilings -from zipline.pipeline.loaders import _13DFilingsLoader -from .events import BlazeEventsLoader -from zipline.pipeline.loaders._13d_filings import ( - DISCLOSURE_DATE, - NUM_SHARES, - PERCENT_SHARES, -) - - -class Blaze_13DFilingsLoader(BlazeEventsLoader): - """A pipeline loader for the ``_13DFilings`` dataset that - loads data from a blaze expression. - - Parameters - ---------- - expr : Expr - The expression representing the data to load. - resources : dict, optional - Mapping from the atomic terms of ``expr`` to actual data resources. - odo_kwargs : dict, optional - Extra keyword arguments to pass to odo when executing the expression. - data_query_time : time, optional - The time to use for the data query cutoff. - data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. - dataset: DataSet - The DataSet object for which this loader loads data. 
- - Notes - ----- - The expression should have a tabular dshape of:: - - Dim * {{ - {SID_FIELD_NAME}: int64, - {TS_FIELD_NAME}: datetime, - {PERCENTAGE}: float64, - {NUM_SHARES}: float64, - {DISCLOSURE_DATE}: ?datetime, - }} - - Where each row of the table is a record including the sid to identify the - company, the timestamp where we learned about the disclosure, the - date of the disclosure, the percentage, and the number of shares. - - If the '{TS_FIELD_NAME}' field is not included it is assumed that we - start the backtest with knowledge of all disclosures. - """ - - __doc__ = __doc__.format( - TS_FIELD_NAME=TS_FIELD_NAME, - SID_FIELD_NAME=SID_FIELD_NAME, - PERCENTAGE=PERCENT_SHARES, - NUM_SHARES=NUM_SHARES, - DISCLOSURE_DATE=DISCLOSURE_DATE - ) - - _expected_fields = frozenset({ - TS_FIELD_NAME, - SID_FIELD_NAME, - PERCENT_SHARES, - NUM_SHARES, - DISCLOSURE_DATE - }) - - concrete_loader = _13DFilingsLoader - concrete_dataset = _13DFilings diff --git a/zipline/pipeline/loaders/blaze/__init__.py b/zipline/pipeline/loaders/blaze/__init__.py index 1dfd0dd7..ec88396e 100644 --- a/zipline/pipeline/loaders/blaze/__init__.py +++ b/zipline/pipeline/loaders/blaze/__init__.py @@ -1,30 +1,11 @@ -from ._13d_filings import Blaze_13DFilingsLoader - -from .buyback_auth import BlazeBuybackAuthorizationsLoader from .core import ( BlazeLoader, NoDeltasWarning, from_blaze, global_loader, ) -from .dividends import ( - BlazeDividendsByAnnouncementDateLoader, - BlazeDividendsByExDateLoader, - BlazeDividendsByPayDateLoader -) -from .earnings import ( - BlazeEarningsCalendarLoader, -) -from .consensus_estimates import BlazeConsensusEstimatesLoader __all__ = ( - 'Blaze_13DFilingsLoader', - 'BlazeBuybackAuthorizationsLoader', - 'BlazeDividendsByAnnouncementDateLoader', - 'BlazeConsensusEstimatesLoader', - 'BlazeDividendsByExDateLoader', - 'BlazeDividendsByPayDateLoader', - 'BlazeEarningsCalendarLoader', 'BlazeLoader', 'from_blaze', 'global_loader', diff --git 
a/zipline/pipeline/loaders/blaze/buyback_auth.py b/zipline/pipeline/loaders/blaze/buyback_auth.py deleted file mode 100644 index db5cd3cc..00000000 --- a/zipline/pipeline/loaders/blaze/buyback_auth.py +++ /dev/null @@ -1,76 +0,0 @@ -from .core import ( - SID_FIELD_NAME, - TS_FIELD_NAME, -) - -from zipline.pipeline.data import BuybackAuthorizations -from zipline.pipeline.loaders import BuybackAuthorizationsLoader -from .events import BlazeEventsLoader -from zipline.pipeline.loaders.buyback_auth import ( - BUYBACK_ANNOUNCEMENT_FIELD_NAME, - BUYBACK_TYPE_FIELD_NAME, - BUYBACK_UNIT_FIELD_NAME, -) -from zipline.pipeline.loaders.buyback_auth import BUYBACK_AMOUNT_FIELD_NAME - - -class BlazeBuybackAuthorizationsLoader(BlazeEventsLoader): - """A pipeline loader for the ``BuybackAuthorizations`` dataset that - loads data from a blaze expression. - - Parameters - ---------- - expr : Expr - The expression representing the data to load. - resources : dict, optional - Mapping from the loadable terms of ``expr`` to actual data resources. - odo_kwargs : dict, optional - Extra keyword arguments to pass to odo when executing the expression. - data_query_time : time, optional - The time to use for the data query cutoff. - data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. - dataset: DataSet - The DataSet object for which this loader loads data. - - Notes - ----- - The expression should have a tabular dshape of:: - - Dim * {{ - {SID_FIELD_NAME}: int64, - {TS_FIELD_NAME}: datetime, - {BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime, - {BUYBACK_AMOUNT_FIELD_NAME}: ?float64, - {BUYBACK_UNIT_FIELD_NAME}: ?str, - {BUYBACK_TYPE_FIELD_NAME}: ?str, - }} - - Where each row of the table is a record including the sid to identify the - company, the timestamp where we learned about the announcement, the - date when the buyback was announced, the buyback amount, the buyback unit, - and the buyback type. 
- - If the '{TS_FIELD_NAME}' field is not included it is assumed that we - start the backtest with knowledge of all announcements. - """ - __doc__ = __doc__.format( - TS_FIELD_NAME=TS_FIELD_NAME, - SID_FIELD_NAME=SID_FIELD_NAME, - BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME, - BUYBACK_AMOUNT_FIELD_NAME=BUYBACK_AMOUNT_FIELD_NAME, - BUYBACK_UNIT_FIELD_NAME=BUYBACK_UNIT_FIELD_NAME, - BUYBACK_TYPE_FIELD_NAME=BUYBACK_TYPE_FIELD_NAME - ) - - _expected_fields = frozenset({ - TS_FIELD_NAME, - SID_FIELD_NAME, - BUYBACK_ANNOUNCEMENT_FIELD_NAME, - BUYBACK_AMOUNT_FIELD_NAME, - BUYBACK_UNIT_FIELD_NAME, - BUYBACK_TYPE_FIELD_NAME - }) - - concrete_loader = BuybackAuthorizationsLoader - default_dataset = BuybackAuthorizations diff --git a/zipline/pipeline/loaders/blaze/consensus_estimates.py b/zipline/pipeline/loaders/blaze/consensus_estimates.py deleted file mode 100644 index 78686738..00000000 --- a/zipline/pipeline/loaders/blaze/consensus_estimates.py +++ /dev/null @@ -1,94 +0,0 @@ -from .events import BlazeEventsLoader -from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME -from zipline.pipeline.data import ConsensusEstimates -from zipline.pipeline.loaders import ConsensusEstimatesLoader -from zipline.pipeline.loaders.consensus_estimates import ( - ACTUAL_VALUE_FIELD_NAME, - COUNT_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME, - HIGH_FIELD_NAME, - LOW_FIELD_NAME, - MEAN_FIELD_NAME, - RELEASE_DATE_FIELD_NAME, - STANDARD_DEVIATION_FIELD_NAME, -) - - -class BlazeConsensusEstimatesLoader(BlazeEventsLoader): - """A pipeline loader for the ``ConsensusEstimates`` dataset that - loads - data from a blaze expression. - - Parameters - ---------- - expr : Expr - The expression representing the data to load. - resources : dict, optional - Mapping from the loadable terms of ``expr`` to actual data resources. - odo_kwargs : dict, optional - Extra keyword arguments to pass to odo when executing the expression. 
- data_query_time : time, optional - The time to use for the data query cutoff. - data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. - dataset: DataSet - The DataSet object for which this loader loads data. - - Notes - ----- - The expression should have a tabular dshape of:: - - Dim * {{ - {SID_FIELD_NAME}: int64, - {TS_FIELD_NAME}: datetime, - {RELEASE_DATE_FIELD_NAME}: ?datetime, - {STANDARD_DEVIATION_FIELD_NAME}: ?float64, - {COUNT_FIELD_NAME}: ?float64, - {FISCAL_QUARTER_FIELD_NAME}: ?float64, - {HIGH_FIELD_NAME}: ?float64, - {MEAN_FIELD_NAME}: ?float64, - {FISCAL_YEAR_FIELD_NAME}: ?float64, - {LOW_FIELD_NAME}: ?float64, - {ACTUAL_VALUE_FIELD_NAME}: ?float64 - }} - - Where each row of the table is a record including the sid to identify the - company, the timestamp where we learned about the announcement, - the release date for the corresponding estimate, and other estimate - information. - - If the '{TS_FIELD_NAME}' field is not included it is assumed that we - start the backtest with knowledge of all announcements. 
- """ - - __doc__ = __doc__.format( - TS_FIELD_NAME=TS_FIELD_NAME, - SID_FIELD_NAME=SID_FIELD_NAME, - RELEASE_DATE_FIELD_NAME=RELEASE_DATE_FIELD_NAME, - STANDARD_DEVIATION_FIELD_NAME=STANDARD_DEVIATION_FIELD_NAME, - COUNT_FIELD_NAME=COUNT_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME=FISCAL_QUARTER_FIELD_NAME, - HIGH_FIELD_NAME=HIGH_FIELD_NAME, - MEAN_FIELD_NAME=MEAN_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME=FISCAL_YEAR_FIELD_NAME, - LOW_FIELD_NAME=LOW_FIELD_NAME, - ACTUAL_VALUE_FIELD_NAME=ACTUAL_VALUE_FIELD_NAME - ) - - _expected_fields = frozenset({ - TS_FIELD_NAME, - SID_FIELD_NAME, - RELEASE_DATE_FIELD_NAME, - STANDARD_DEVIATION_FIELD_NAME, - COUNT_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - HIGH_FIELD_NAME, - MEAN_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME, - LOW_FIELD_NAME, - ACTUAL_VALUE_FIELD_NAME - }) - - concrete_loader = ConsensusEstimatesLoader - default_dataset = ConsensusEstimates diff --git a/zipline/pipeline/loaders/blaze/dividends.py b/zipline/pipeline/loaders/blaze/dividends.py deleted file mode 100644 index 6ee8b34e..00000000 --- a/zipline/pipeline/loaders/blaze/dividends.py +++ /dev/null @@ -1,209 +0,0 @@ -from zipline.pipeline.common import ( - ANNOUNCEMENT_FIELD_NAME, - SID_FIELD_NAME, - TS_FIELD_NAME, -) -from zipline.pipeline.data import ( - DividendsByExDate, - DividendsByAnnouncementDate, - DividendsByPayDate -) -from zipline.pipeline.loaders import ( - DividendsByAnnouncementDateLoader, - DividendsByPayDateLoader, - DividendsByExDateLoader -) -from .events import BlazeEventsLoader -from zipline.pipeline.loaders.dividends import ( - CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME, - EX_DATE_FIELD_NAME, - PAY_DATE_FIELD_NAME, -) - - -class BlazeDividendsByAnnouncementDateLoader(BlazeEventsLoader): - """A pipeline loader for the ``DividendsByAnnouncementDate`` dataset that - loads data from a blaze expression. - - Parameters - ---------- - expr : Expr - The expression representing the data to load. 
- resources : dict, optional - Mapping from the atomic terms of ``expr`` to actual data resources. - odo_kwargs : dict, optional - Extra keyword arguments to pass to odo when executing the expression. - data_query_time : time, optional - The time to use for the data query cutoff. - data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. - dataset: DataSet - The DataSet object for which this loader loads data. - - Notes - ----- - The expression should have a tabular dshape of:: - - Dim * {{ - {SID_FIELD_NAME}: int64, - {TS_FIELD_NAME}: datetime, - {CASH_AMOUNT_FIELD_NAME}: ?float64, - {ANNOUNCEMENT_FIELD_NAME}: ?datetime, - {CURRENCY_FIELD_NAME}: ?string, - {DIVIDEND_TYPE_FIELD_NAME}: ?string, - }} - - Where each row of the table is a record including the sid to identify the - company, the timestamp where we learned about the announcement, the - date when the dividends will be announced, and the cash amount. - - If the '{TS_FIELD_NAME}' field is not included it is assumed that we - start the backtest with knowledge of all announcements. - """ - - __doc__ = __doc__.format( - TS_FIELD_NAME=TS_FIELD_NAME, - SID_FIELD_NAME=SID_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME, - ANNOUNCEMENT_FIELD_NAME=ANNOUNCEMENT_FIELD_NAME, - CURRENCY_FIELD_NAME=CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME=DIVIDEND_TYPE_FIELD_NAME, - ) - - _expected_fields = frozenset({ - TS_FIELD_NAME, - SID_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME, - ANNOUNCEMENT_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME - }) - - concrete_loader = DividendsByAnnouncementDateLoader - default_dataset = DividendsByAnnouncementDate - - -class BlazeDividendsByExDateLoader(BlazeEventsLoader): - """A pipeline loader for the ``DividendsByExDate`` dataset that loads - data from a blaze expression. - - Parameters - ---------- - expr : Expr - The expression representing the data to load. 
- resources : dict, optional - Mapping from the atomic terms of ``expr`` to actual data resources. - odo_kwargs : dict, optional - Extra keyword arguments to pass to odo when executing the expression. - data_query_time : time, optional - The time to use for the data query cutoff. - data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. - dataset: DataSet - The DataSet object for which this loader loads data. - - Notes - ----- - The expression should have a tabular dshape of:: - - Dim * {{ - {SID_FIELD_NAME}: int64, - {TS_FIELD_NAME}: datetime, - {EX_DATE_FIELD_NAME}: ?datetime, - {CASH_AMOUNT_FIELD_NAME}: ?datetime, - {CURRENCY_FIELD_NAME}: ?string, - {DIVIDEND_TYPE_FIELD_NAME}: ?string, - }} - - Where each row of the table is a record including the sid to identify the - company, the timestamp where we learned about the ex date, the - ex date, and the associated cash amount. - - If the '{TS_FIELD_NAME}' field is not included it is assumed that we - start the backtest with knowledge of all announcements. - """ - - __doc__ = __doc__.format( - TS_FIELD_NAME=TS_FIELD_NAME, - SID_FIELD_NAME=SID_FIELD_NAME, - EX_DATE_FIELD_NAME=EX_DATE_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME=CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME=DIVIDEND_TYPE_FIELD_NAME, - ) - - _expected_fields = frozenset({ - TS_FIELD_NAME, - SID_FIELD_NAME, - EX_DATE_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME, - }) - - concrete_loader = DividendsByExDateLoader - default_dataset = DividendsByExDate - - -class BlazeDividendsByPayDateLoader(BlazeEventsLoader): - """A pipeline loader for the ``DividendsByPayDate`` dataset that loads - data from a blaze expression. - - Parameters - ---------- - expr : Expr - The expression representing the data to load. - resources : dict, optional - Mapping from the atomic terms of ``expr`` to actual data resources. 
- odo_kwargs : dict, optional - Extra keyword arguments to pass to odo when executing the expression. - data_query_time : time, optional - The time to use for the data query cutoff. - data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. - dataset: DataSet - The DataSet object for which this loader loads data. - - Notes - ----- - The expression should have a tabular dshape of:: - - Dim * {{ - {SID_FIELD_NAME}: int64, - {TS_FIELD_NAME}: datetime, - {PAY_DATE_FIELD_NAME}: ?datetime, - {CASH_AMOUNT_FIELD_NAME}: ?datetime, - {CURRENCY_FIELD_NAME}: ?string, - {DIVIDEND_TYPE_FIELD_NAME}: ?string, - }} - - Where each row of the table is a record including the sid to identify the - company, the timestamp where we learned about the pay date, the pay date, - and the associated cash amount. - - If the '{TS_FIELD_NAME}' field is not included it is assumed that we - start the backtest with knowledge of all announcements. - """ - - __doc__ = __doc__.format( - TS_FIELD_NAME=TS_FIELD_NAME, - SID_FIELD_NAME=SID_FIELD_NAME, - PAY_DATE_FIELD_NAME=PAY_DATE_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME=CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME=DIVIDEND_TYPE_FIELD_NAME - ) - - _expected_fields = frozenset({ - TS_FIELD_NAME, - SID_FIELD_NAME, - PAY_DATE_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME, - }) - - concrete_loader = DividendsByPayDateLoader - default_dataset = DividendsByPayDate diff --git a/zipline/pipeline/loaders/blaze/earnings.py b/zipline/pipeline/loaders/blaze/earnings.py deleted file mode 100644 index ee412edf..00000000 --- a/zipline/pipeline/loaders/blaze/earnings.py +++ /dev/null @@ -1,61 +0,0 @@ -from zipline.pipeline.common import ( - ANNOUNCEMENT_FIELD_NAME, - SID_FIELD_NAME, - TS_FIELD_NAME, -) -from zipline.pipeline.data import EarningsCalendar -from zipline.pipeline.loaders import EarningsCalendarLoader -from .events import BlazeEventsLoader - - -class 
BlazeEarningsCalendarLoader(BlazeEventsLoader): - """A pipeline loader for the ``EarningsCalendar`` dataset that loads - data from a blaze expression. - - Parameters - ---------- - expr : Expr - The expression representing the data to load. - resources : dict, optional - Mapping from the loadable terms of ``expr`` to actual data resources. - odo_kwargs : dict, optional - Extra keyword arguments to pass to odo when executing the expression. - data_query_time : time, optional - The time to use for the data query cutoff. - data_query_tz : tzinfo or str - The timezeone to use for the data query cutoff. - dataset: DataSet - The DataSet object for which this loader loads data. - - Notes - ----- - The expression should have a tabular dshape of:: - - Dim * {{ - {SID_FIELD_NAME}: int64, - {TS_FIELD_NAME}: datetime, - {ANNOUNCEMENT_FIELD_NAME}: ?datetime, - }} - - Where each row of the table is a record including the sid to identify the - company, the timestamp where we learned about the announcement, and the - date when the earnings will be announced. - - If the '{TS_FIELD_NAME}' field is not included it is assumed that we - start the backtest with knowledge of all announcements. 
- """ - - __doc__ = __doc__.format( - TS_FIELD_NAME=TS_FIELD_NAME, - SID_FIELD_NAME=SID_FIELD_NAME, - ANNOUNCEMENT_FIELD_NAME=ANNOUNCEMENT_FIELD_NAME, - ) - - _expected_fields = frozenset({ - TS_FIELD_NAME, - SID_FIELD_NAME, - ANNOUNCEMENT_FIELD_NAME, - }) - - concrete_loader = EarningsCalendarLoader - default_dataset = EarningsCalendar diff --git a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py index 6fae418f..c71646cb 100644 --- a/zipline/pipeline/loaders/blaze/events.py +++ b/zipline/pipeline/loaders/blaze/events.py @@ -1,5 +1,3 @@ -import abc - from datashape import istabular from .core import ( @@ -7,6 +5,10 @@ from .core import ( ffill_query_in_range, ) from zipline.pipeline.loaders.base import PipelineLoader +from zipline.pipeline.loaders.events import ( + EventsLoader, + required_event_fields, +) from zipline.pipeline.common import ( SID_FIELD_NAME, TS_FIELD_NAME, @@ -56,41 +58,37 @@ class BlazeEventsLoader(PipelineLoader): If the '{TS_FIELD_NAME}' field is not included it is assumed that we start the backtest with knowledge of all announcements. 
""" - default_dataset = None @preprocess(data_query_tz=optionally(ensure_timezone)) def __init__(self, expr, + next_value_columns, + previous_value_columns, resources=None, odo_kwargs=None, data_query_time=None, - data_query_tz=None, - dataset=default_dataset): - if dataset is None: - dataset = self.default_dataset + data_query_tz=None): dshape = expr.dshape - if not istabular(dshape): raise ValueError( 'expression dshape must be tabular, got: %s' % dshape, ) - expected_fields = self._expected_fields + required_cols = list( + required_event_fields(next_value_columns, previous_value_columns) + ) self._expr = bind_expression_to_resources( - expr[list(expected_fields)], + expr[required_cols], resources, ) + self._next_value_columns = next_value_columns + self._previous_value_columns = previous_value_columns self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {} - self._dataset = dataset check_data_query_args(data_query_time, data_query_tz) self._data_query_time = data_query_time self._data_query_tz = data_query_tz - @abc.abstractproperty - def concrete_loader(self): - NotImplementedError('concrete_loader') - def load_adjusted_array(self, columns, dates, assets, mask): data_query_time = self._data_query_time data_query_tz = self._data_query_tz @@ -120,13 +118,14 @@ class BlazeEventsLoader(PipelineLoader): inplace=True, ts_field=TS_FIELD_NAME, ) - gb = raw.groupby(SID_FIELD_NAME) - return self.concrete_loader( - dates, - self.prepare_data(raw, gb), - dataset=self._dataset, - ).load_adjusted_array(columns, dates, assets, mask) - def prepare_data(self, raw, gb): - return {sid: raw.loc[group].drop(SID_FIELD_NAME, axis=1) for sid, group - in gb.groups.items()} + return EventsLoader( + events=raw, + next_value_columns=self._next_value_columns, + previous_value_columns=self._previous_value_columns, + ).load_adjusted_array( + columns, + dates, + assets, + mask, + ) diff --git a/zipline/pipeline/loaders/buyback_auth.py b/zipline/pipeline/loaders/buyback_auth.py deleted 
file mode 100644 index 0cc55e04..00000000 --- a/zipline/pipeline/loaders/buyback_auth.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -Reference implementation for buyback auth loaders. -""" - -from ..data import BuybackAuthorizations -from .events import EventsLoader - -from zipline.utils.memoize import lazyval - -BUYBACK_AMOUNT_FIELD_NAME = 'buyback_amount' -BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_date' -BUYBACK_TYPE_FIELD_NAME = 'buyback_type' -BUYBACK_UNIT_FIELD_NAME = 'buyback_unit' - - -class BuybackAuthorizationsLoader(EventsLoader): - """ - Reference loader for - :class:`zipline.pipeline.data.BuybackAuthorizations`. - - events_by_sid: dict[sid -> pd.DataFrame(knowledge date, - event date, buyback amount, buyback unit, buyback type)] - - """ - expected_cols = frozenset([BUYBACK_ANNOUNCEMENT_FIELD_NAME, - BUYBACK_AMOUNT_FIELD_NAME, - BUYBACK_UNIT_FIELD_NAME, - BUYBACK_TYPE_FIELD_NAME]) - - event_date_col = BUYBACK_ANNOUNCEMENT_FIELD_NAME - - def __init__(self, - all_dates, - events_by_sid, - infer_timestamps=False, - dataset=BuybackAuthorizations): - super(BuybackAuthorizationsLoader, self).__init__( - all_dates, - events_by_sid, - infer_timestamps=infer_timestamps, - dataset=dataset, - ) - - @lazyval - def previous_amount_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_amount, - BUYBACK_AMOUNT_FIELD_NAME - ) - - @lazyval - def previous_date_loader(self): - return self._previous_event_date_loader( - self.dataset.previous_date, - ) - - @lazyval - def previous_unit_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_unit, - BUYBACK_UNIT_FIELD_NAME, - ) - - @lazyval - def previous_type_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_type, - BUYBACK_TYPE_FIELD_NAME, - ) diff --git a/zipline/pipeline/loaders/consensus_estimates.py b/zipline/pipeline/loaders/consensus_estimates.py deleted file mode 100644 index e9eed4e9..00000000 --- 
a/zipline/pipeline/loaders/consensus_estimates.py +++ /dev/null @@ -1,156 +0,0 @@ -""" -Reference implementation for ConsensusEstimates loaders. -""" - -from ..data import ConsensusEstimates -from .events import EventsLoader -from zipline.utils.memoize import lazyval - -ACTUAL_VALUE_FIELD_NAME = 'actual_value' -COUNT_FIELD_NAME = 'count' -FISCAL_QUARTER_FIELD_NAME = 'fiscal_quarter' -FISCAL_YEAR_FIELD_NAME = 'fiscal_year' -HIGH_FIELD_NAME = 'high' -LOW_FIELD_NAME = 'low' -MEAN_FIELD_NAME = 'mean' -RELEASE_DATE_FIELD_NAME = 'release_date' -STANDARD_DEVIATION_FIELD_NAME = 'standard_deviation' - - -class ConsensusEstimatesLoader(EventsLoader): - - expected_cols = frozenset([RELEASE_DATE_FIELD_NAME, - STANDARD_DEVIATION_FIELD_NAME, - COUNT_FIELD_NAME, - FISCAL_QUARTER_FIELD_NAME, - HIGH_FIELD_NAME, - MEAN_FIELD_NAME, - FISCAL_YEAR_FIELD_NAME, - LOW_FIELD_NAME, - ACTUAL_VALUE_FIELD_NAME]) - - event_date_col = RELEASE_DATE_FIELD_NAME - - def __init__(self, all_dates, events_by_sid, - infer_timestamps=False, - dataset=ConsensusEstimates): - super(ConsensusEstimatesLoader, self).__init__( - all_dates, events_by_sid, infer_timestamps, dataset=dataset, - ) - - @lazyval - def next_release_date_loader(self): - return self._next_event_date_loader( - self.dataset.next_release_date, - ) - - @lazyval - def previous_release_date_loader(self): - return self._previous_event_date_loader( - self.dataset.previous_release_date, - ) - - @lazyval - def next_standard_deviation_loader(self): - return self._next_event_value_loader( - self.dataset.next_standard_deviation, - STANDARD_DEVIATION_FIELD_NAME, - ) - - @lazyval - def previous_standard_deviation_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_standard_deviation, - STANDARD_DEVIATION_FIELD_NAME, - ) - - @lazyval - def next_count_loader(self): - return self._next_event_value_loader( - self.dataset.next_count, - COUNT_FIELD_NAME, - ) - - @lazyval - def previous_count_loader(self): - return 
self._previous_event_value_loader( - self.dataset.previous_count, - COUNT_FIELD_NAME, - ) - - @lazyval - def next_fiscal_quarter_loader(self): - return self._next_event_value_loader( - self.dataset.next_fiscal_quarter, - FISCAL_QUARTER_FIELD_NAME, - ) - - @lazyval - def previous_fiscal_quarter_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_fiscal_quarter, - FISCAL_QUARTER_FIELD_NAME, - ) - - @lazyval - def next_high_loader(self): - return self._next_event_value_loader( - self.dataset.next_high, - HIGH_FIELD_NAME, - ) - - @lazyval - def previous_high_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_high, - HIGH_FIELD_NAME, - ) - - @lazyval - def next_mean_loader(self): - return self._next_event_value_loader( - self.dataset.next_mean, - MEAN_FIELD_NAME, - ) - - @lazyval - def previous_mean_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_mean, - MEAN_FIELD_NAME, - ) - - @lazyval - def next_fiscal_year_loader(self): - return self._next_event_value_loader( - self.dataset.next_fiscal_year, - FISCAL_YEAR_FIELD_NAME, - ) - - @lazyval - def previous_fiscal_year_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_fiscal_year, - FISCAL_YEAR_FIELD_NAME, - ) - - @lazyval - def next_low_loader(self): - return self._next_event_value_loader( - self.dataset.next_low, - LOW_FIELD_NAME, - ) - - @lazyval - def previous_low_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_low, - LOW_FIELD_NAME, - ) - - @lazyval - def previous_actual_value_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_actual_value, - ACTUAL_VALUE_FIELD_NAME, - ) diff --git a/zipline/pipeline/loaders/dividends.py b/zipline/pipeline/loaders/dividends.py deleted file mode 100644 index c68d95b4..00000000 --- a/zipline/pipeline/loaders/dividends.py +++ /dev/null @@ -1,191 +0,0 @@ -from zipline.pipeline.common import 
ANNOUNCEMENT_FIELD_NAME -from zipline.pipeline.loaders.events import EventsLoader -from ..data import ( - DividendsByAnnouncementDate, - DividendsByExDate, - DividendsByPayDate, -) -from zipline.utils.memoize import lazyval - - -CASH_AMOUNT_FIELD_NAME = 'cash_amount' -CASH_AMOUNT_FIELD_NAME = 'cash_amount' -CURRENCY_FIELD_NAME = 'currency_type' -DIVIDEND_TYPE_FIELD_NAME = 'dividend_type' -EX_DATE_FIELD_NAME = 'ex_date' -PAY_DATE_FIELD_NAME = 'pay_date' - - -class DividendsByAnnouncementDateLoader(EventsLoader): - expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME]) - - event_date_col = ANNOUNCEMENT_FIELD_NAME - - def __init__(self, all_dates, events_by_sid, - infer_timestamps=False, - dataset=DividendsByAnnouncementDate): - super(DividendsByAnnouncementDateLoader, self).__init__( - all_dates, events_by_sid, infer_timestamps, dataset=dataset, - ) - - @lazyval - def previous_announcement_date_loader(self): - return self._previous_event_date_loader( - self.dataset.previous_announcement_date, - ) - - @lazyval - def previous_amount_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_amount, - CASH_AMOUNT_FIELD_NAME - ) - - @lazyval - def previous_currency_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_currency, - CURRENCY_FIELD_NAME - ) - - @lazyval - def previous_type_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_type, - DIVIDEND_TYPE_FIELD_NAME - ) - - -class DividendsByPayDateLoader(EventsLoader): - expected_cols = frozenset([PAY_DATE_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME]) - - event_date_col = PAY_DATE_FIELD_NAME - - def __init__(self, all_dates, events_by_sid, - infer_timestamps=False, - dataset=DividendsByPayDate): - super(DividendsByPayDateLoader, self).__init__( - all_dates, events_by_sid, infer_timestamps, dataset=dataset, - ) - - 
@lazyval - def next_date_loader(self): - return self._next_event_date_loader(self.dataset.next_date) - - @lazyval - def previous_date_loader(self): - return self._previous_event_date_loader( - self.dataset.previous_date, - ) - - @lazyval - def next_amount_loader(self): - return self._next_event_value_loader(self.dataset.next_amount, - CASH_AMOUNT_FIELD_NAME) - - @lazyval - def previous_amount_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_amount, - CASH_AMOUNT_FIELD_NAME - ) - - @lazyval - def previous_currency_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_currency, - CURRENCY_FIELD_NAME - ) - - @lazyval - def next_currency_loader(self): - return self._next_event_value_loader( - self.dataset.next_currency, - CURRENCY_FIELD_NAME - ) - - @lazyval - def previous_type_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_type, - DIVIDEND_TYPE_FIELD_NAME - ) - - @lazyval - def next_type_loader(self): - return self._next_event_value_loader( - self.dataset.next_type, - DIVIDEND_TYPE_FIELD_NAME - ) - - -class DividendsByExDateLoader(EventsLoader): - expected_cols = frozenset([EX_DATE_FIELD_NAME, - CASH_AMOUNT_FIELD_NAME, - CURRENCY_FIELD_NAME, - DIVIDEND_TYPE_FIELD_NAME]) - - event_date_col = EX_DATE_FIELD_NAME - - def __init__(self, all_dates, events_by_sid, - infer_timestamps=False, - dataset=DividendsByExDate): - super(DividendsByExDateLoader, self).__init__( - all_dates, events_by_sid, infer_timestamps, dataset=dataset, - ) - - @lazyval - def next_date_loader(self): - return self._next_event_date_loader(self.dataset.next_date) - - @lazyval - def previous_date_loader(self): - return self._previous_event_date_loader( - self.dataset.previous_date, - ) - - @lazyval - def next_amount_loader(self): - return self._next_event_value_loader(self.dataset.next_amount, - CASH_AMOUNT_FIELD_NAME) - - @lazyval - def previous_amount_loader(self): - return 
self._previous_event_value_loader( - self.dataset.previous_amount, - CASH_AMOUNT_FIELD_NAME - ) - - @lazyval - def previous_currency_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_currency, - CURRENCY_FIELD_NAME - ) - - @lazyval - def next_currency_loader(self): - return self._next_event_value_loader( - self.dataset.next_currency, - CURRENCY_FIELD_NAME - ) - - @lazyval - def previous_type_loader(self): - return self._previous_event_value_loader( - self.dataset.previous_type, - DIVIDEND_TYPE_FIELD_NAME - ) - - @lazyval - def next_type_loader(self): - return self._next_event_value_loader( - self.dataset.next_type, - DIVIDEND_TYPE_FIELD_NAME - ) diff --git a/zipline/pipeline/loaders/earnings.py b/zipline/pipeline/loaders/earnings.py deleted file mode 100644 index d4ed9bb6..00000000 --- a/zipline/pipeline/loaders/earnings.py +++ /dev/null @@ -1,32 +0,0 @@ -""" -Reference implementation for EarningsCalendar loaders. -""" - -from ..data import EarningsCalendar -from .events import EventsLoader -from zipline.pipeline.common import ANNOUNCEMENT_FIELD_NAME -from zipline.utils.memoize import lazyval - - -class EarningsCalendarLoader(EventsLoader): - - expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME]) - - event_date_col = ANNOUNCEMENT_FIELD_NAME - - def __init__(self, all_dates, events_by_sid, - infer_timestamps=False, - dataset=EarningsCalendar): - super(EarningsCalendarLoader, self).__init__( - all_dates, events_by_sid, infer_timestamps, dataset=dataset, - ) - - @lazyval - def next_announcement_loader(self): - return self._next_event_date_loader(self.dataset.next_announcement) - - @lazyval - def previous_announcement_loader(self): - return self._previous_event_date_loader( - self.dataset.previous_announcement, - ) diff --git a/zipline/pipeline/loaders/events.py b/zipline/pipeline/loaders/events.py index be1a8ba3..f757248e 100644 --- a/zipline/pipeline/loaders/events.py +++ b/zipline/pipeline/loaders/events.py @@ -1,231 +1,218 @@ -import 
abc +import numpy as np import pandas as pd -from six import iteritems -from toolz import merge + +from six import viewvalues +from toolz import groupby, merge from .base import PipelineLoader from .frame import DataFrameLoader -from .utils import previous_event_frame, next_event_frame -from zipline.pipeline.common import TS_FIELD_NAME -from zipline.utils.numpy_utils import NaTD +from zipline.pipeline.common import ( + EVENT_DATE_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) +from zipline.pipeline.loaders.utils import ( + next_event_indexer, + previous_event_indexer, +) -WRONG_COLS_ERROR = "Expected columns {expected_columns} for sid {sid} but " \ - "got columns {resulting_columns}." -WRONG_SINGLE_COL_DATA_FORMAT_ERROR = ("Data for sid {sid} is expected to have " - "1 column and to be in a DataFrame, " - "Series, or DatetimeIndex.") +def required_event_fields(next_value_columns, previous_value_columns): + """ + Compute the set of resource columns required to serve + ``next_value_columns`` and ``previous_value_columns``. + """ + # These metadata columns are used to align event indexers. + return { + TS_FIELD_NAME, + SID_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + }.union( + # We also expect any of the field names that our loadable columns + # are mapped to. + viewvalues(next_value_columns), + viewvalues(previous_value_columns), + ) -WRONG_MANY_COL_DATA_FORMAT_ERROR = ("Data for sid {sid} is expected to have " - "more than 1 column and to be in a " - "DataFrame.") -SERIES_NO_DTINDEX_ERROR = ("Got Series for sid {sid}, but index was not " - "DatetimeIndex.") - -DTINDEX_NOT_INFER_TS_ERROR = ("Got DatetimeIndex for sid {sid}.\n" - "Pass `infer_timestamps=True` to use the first " - "date in `all_dates` as implicit timestamp.") - -DF_NO_TS_NOT_INFER_TS_ERROR = ("Got DataFrame without a '{" - "timestamp_column_name}' column for sid {sid}." 
- "\nPass `infer_timestamps=True` to use the " - "first date in `all_dates` as implicit " - "timestamp.") +def validate_column_specs(events, next_value_columns, previous_value_columns): + """ + Verify that the columns of ``events`` can be used by an EventsLoader to + serve the BoundColumns described by ``next_value_columns`` and + ``previous_value_columns``. + """ + required = { + TS_FIELD_NAME, + SID_FIELD_NAME, + EVENT_DATE_FIELD_NAME, + }.union( + # We also expect any of the field names that our loadable columns + # are mapped to. + viewvalues(next_value_columns), + viewvalues(previous_value_columns), + ) + received = set(events.columns) + missing = required - received + if missing: + raise ValueError( + "EventsLoader missing required columns {missing}.\n" + "Got Columns: {received}\n" + "Expected Columns: {required}".format( + missing=sorted(missing), + received=sorted(received), + required=sorted(required), + ) + ) class EventsLoader(PipelineLoader): """ - Abstract loader. + Base class for PipelineLoaders that supports loading the next and previous + value of an event field. - Does not currently support adjustments to the dates of known events. + Does not currently support adjustments. Parameters ---------- - all_dates : pd.DatetimeIndex - Index of dates for which we can serve queries. - events_by_sid : dict[int -> pd.DataFrame or pd.Series or pd.DatetimeIndex] - Dict mapping sids to objects representing dates on which earnings - occurred. + events : pd.DataFrame + A DataFrame representing events (e.g. share buybacks or + earnings announcements) associated with particular companies. - If a dict value is a Series, it's interpreted as a mapping from the - date on which we learned an announcement was coming to the date on - which the announcement was made. + ``events`` must contain at least three columns:: + sid : int64 + The asset id associated with each event. 
- If a dict value is a DatetimeIndex, it's interpreted as just containing - the dates that announcements were made, and we assume we knew about the - announcement on all prior dates. This mode is only supported if - ``infer_timestamp`` is explicitly passed as a truthy value. - Dict mapping sids to DataFrames, Series, or DatetimeIndexes. + event_date : datetime64[ns] + The date on which the event occurred. - If the value is a DataFrame, it then represents dates on which events - occurred along with other associated values. If the DataFrame - contains a "timestamp" column, that column is interpreted as the date - on which we learned about the event. If the DataFrames do not contain a - "timestamp" column, we assume we knew about the event on all prior - dates. This mode is only supported if ``infer_timestamp`` is - explicitly passed as a truthy value. + timestamp : datetime64[ns] + The date on which we learned about the event. - infer_timestamps : bool, optional - Whether to allow omitting the "timestamp" column. - dataset : DataSet - The DataSet object for which this loader loads data. + next_value_columns : dict[BoundColumn -> str] + Map from dataset columns to raw field names that should be used when + searching for a next event value. + previous_value_columns : dict[BoundColumn -> str] + Map from dataset columns to raw field names that should be used when + searching for a previous event value. """ - - @abc.abstractproperty - def expected_cols(self): - raise NotImplemented('expected_cols') - - @abc.abstractproperty - def event_date_col(self): - raise NotImplemented('event_date_col') - def __init__(self, - all_dates, - events_by_sid, - infer_timestamps=False, - dataset=None): - self.all_dates = all_dates - # Do not modify the original in place, since it may be used for other - # purposes. 
- self.events_by_sid = ( - events_by_sid.copy() + events, + next_value_columns, + previous_value_columns): + validate_column_specs( + events, + next_value_columns, + previous_value_columns, ) - dates = self.all_dates.values - for k, v in iteritems(events_by_sid): - # Already a DataFrame - if isinstance(v, pd.DataFrame): - if TS_FIELD_NAME not in v.columns: - if not infer_timestamps: - raise ValueError( - DF_NO_TS_NOT_INFER_TS_ERROR.format( - timestamp_column_name=TS_FIELD_NAME, - sid=k - ) - ) - self.events_by_sid[k] = v = v.copy() - v.index = [dates[0]] * len(v) - else: - self.events_by_sid[k] = v.set_index(TS_FIELD_NAME) - # Once data is in a DF, make sure columns are correct. - cols_except_ts = (set(v.columns) - - {TS_FIELD_NAME}) + events = events[events[EVENT_DATE_FIELD_NAME].notnull()] - # Check that all columns other than timestamp are as expected. - if cols_except_ts != self.expected_cols: - raise ValueError( - WRONG_COLS_ERROR.format( - expected_columns=list(self.expected_cols), - sid=k, - resulting_columns=v.columns.values - ) - ) - # Not a DataFrame and we only expect 1 column - elif len(self.expected_cols) == 1: - # First, must convert to DataFrame. - if isinstance(v, pd.Series): - if not isinstance(v.index, pd.DatetimeIndex): - raise ValueError( - SERIES_NO_DTINDEX_ERROR.format(sid=k) - ) - self.events_by_sid[k] = pd.DataFrame({ - list(self.expected_cols)[0]: v}) - elif isinstance(v, pd.DatetimeIndex): - if not infer_timestamps: - raise ValueError( - DTINDEX_NOT_INFER_TS_ERROR.format(sid=k) - ) - self.events_by_sid[k] = pd.DataFrame({ - list(self.expected_cols)[0]: v - }, index=[dates[0]] * len(v)) - else: - # We expect 1 column, but we got something other than a - # Series, DatetimeIndex, or DataFrame. - raise ValueError( - WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=k) - ) - else: - # We expected multiple columns, but we got something other - # than a DataFrame. 
- raise ValueError( - WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=k) - ) - self.events_by_sid = {sid: df.dropna(subset=[self.event_date_col]) for - sid, df in self.events_by_sid.items()} - self.dataset = dataset + # We always work with entries from ``events`` directly as numpy arrays, + # so we coerce from a frame here. + self.events = { + name: np.asarray(series) + for name, series in events.sort(EVENT_DATE_FIELD_NAME).iteritems() + } - def get_loader(self, column): - if column in self.dataset.columns: - return getattr(self, "%s_loader" % column.name) - raise ValueError("Don't know how to load column '%s'." % column) + # Columns to load with self.load_next_events. + self.next_value_columns = next_value_columns - def load_adjusted_array(self, columns, dates, assets, mask): - return merge( - self.get_loader(column).load_adjusted_array( - [column], dates, assets, mask + # Columns to load with self.load_previous_events. + self.previous_value_columns = previous_value_columns + + def split_next_and_previous_event_columns(self, requested_columns): + """ + Split requested columns into columns that should load the next known + value and columns that should load the previous known value. 
+ + Parameters + ---------- + requested_columns : iterable[BoundColumn] + + Returns + ------- + next_cols, previous_cols : iterable[BoundColumn], iterable[BoundColumn] + ``requested_columns``, partitioned into sub-sequences based on + whether the column should produce values from the next event or the + previous event + """ + def next_or_previous(c): + if c in self.next_value_columns: + return 'next' + elif c in self.previous_value_columns: + return 'previous' + + raise ValueError( + "{c} not found in next_value_columns " + "or previous_value_columns".format(c=c) ) - for column in columns + groups = groupby(next_or_previous, requested_columns) + return groups.get('next', ()), groups.get('previous', ()) + + def next_event_indexer(self, dates, sids): + return next_event_indexer( + dates, + sids, + self.events[EVENT_DATE_FIELD_NAME], + self.events[TS_FIELD_NAME], + self.events[SID_FIELD_NAME], ) - def _next_event_date_loader(self, next_date_field): - return DataFrameLoader( - next_date_field, - next_event_frame( - self.events_by_sid, - self.all_dates, - next_date_field.missing_value, - next_date_field.dtype, - self.event_date_col, - self.event_date_col - ), - adjustments=None, + def previous_event_indexer(self, dates, sids): + return previous_event_indexer( + dates, + sids, + self.events[EVENT_DATE_FIELD_NAME], + self.events[TS_FIELD_NAME], + self.events[SID_FIELD_NAME], ) - def _next_event_value_loader(self, - next_value_field, - value_field_name): - return DataFrameLoader( - next_value_field, - next_event_frame( - self.events_by_sid, - self.all_dates, - next_value_field.missing_value, - next_value_field.dtype, - self.event_date_col, - value_field_name - ), - adjustments=None, + def load_next_events(self, columns, dates, sids, mask): + if not columns: + return {} + + return self._load_events( + name_map=self.next_value_columns, + indexer=self.next_event_indexer(dates, sids), + columns=columns, + dates=dates, + sids=sids, + mask=mask, ) - def 
_previous_event_date_loader(self, - prev_date_field): - return DataFrameLoader( - prev_date_field, - previous_event_frame( - self.events_by_sid, - self.all_dates, - NaTD, - 'datetime64[ns]', - self.event_date_col, - self.event_date_col - ), - adjustments=None, + def load_previous_events(self, columns, dates, sids, mask): + if not columns: + return {} + + return self._load_events( + name_map=self.previous_value_columns, + indexer=self.previous_event_indexer(dates, sids), + columns=columns, + dates=dates, + sids=sids, + mask=mask, ) - def _previous_event_value_loader(self, - previous_value_field, - value_field_name): - return DataFrameLoader( - previous_value_field, - previous_event_frame( - self.events_by_sid, - self.all_dates, - previous_value_field.missing_value, - previous_value_field.dtype, - self.event_date_col, - value_field_name - ), - adjustments=None, + def _load_events(self, name_map, indexer, columns, dates, sids, mask): + def to_frame(array): + return pd.DataFrame(array, index=dates, columns=sids) + + out = {} + for c in columns: + raw = self.events[name_map[c]][indexer] + # indexer will be -1 for locations where we don't have a known + # value. + raw[indexer < 0] = c.missing_value + + # Delegate the actual array formatting logic to a DataFrameLoader. 
+ loader = DataFrameLoader(c, to_frame(raw), adjustments=None) + out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c] + return out + + def load_adjusted_array(self, columns, dates, sids, mask): + n, p = self.split_next_and_previous_event_columns(columns) + return merge( + self.load_next_events(n, dates, sids, mask), + self.load_previous_events(p, dates, sids, mask), ) diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 1eb5b371..6036d421 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -2,150 +2,131 @@ import datetime import numpy as np import pandas as pd -from six import iteritems -from six.moves import zip - -from zipline.utils.numpy_utils import categorical_dtype, NaTns from zipline.utils.pandas_utils import mask_between_time -def next_event_frame(events_by_sid, - dates, - missing_value, - field_dtype, - event_date_field_name, - return_field_name): +def is_sorted_ascending(a): + """Check if a numpy array is sorted.""" + return (np.fmax.accumulate(a) <= a).all() + + +def validate_event_metadata(event_dates, + event_timestamps, + event_sids): + assert is_sorted_ascending(event_dates), "event dates must be sorted" + assert len(event_sids) == len(event_dates) == len(event_timestamps), \ + "mismatched arrays: %d != %d != %d" % ( + len(event_sids), + len(event_dates), + len(event_timestamps), + ) + + +def next_event_indexer(all_dates, + all_sids, + event_dates, + event_timestamps, + event_sids): """ - Make a DataFrame representing the simulated next known dates or values - for an event. + Construct an index array that, when applied to an array of values, produces + a 2D array containing the values associated with the next event for each + sid at each moment in time. + + Locations where no next event was known will be filled with -1. Parameters ---------- - dates : pd.DatetimeIndex. - The index of the returned DataFrame. 
- events_by_sid : dict[int -> pd.Series] - Dict mapping sids to a series of dates. Each k:v pair of the series - represents the date we learned of the event mapping to the date the - event will occur. - event_date_field_name : str - The name of the date field that marks when the event occurred. + all_dates : ndarray[datetime64[ns], ndim=1] + Row labels for the target output. + all_sids : ndarray[int, ndim=1] + Column labels for the target output. + event_dates : ndarray[datetime64[ns], ndim=1] + Dates on which each input events occurred/will occur. ``event_dates`` + must be in sorted order, and may not contain any NaT values. + event_timestamps : ndarray[datetime64[ns], ndim=1] + Dates on which we learned about each input event. + event_sids : ndarray[int, ndim=1] + Sids assocated with each input event. Returns ------- - next_events: pd.DataFrame - A DataFrame where each column is a security from `events_by_sid` where - the values are the dates of the next known event with the knowledge we - had on the date of the index. Entries falling after the last date will - have `NaT` as the result in the output. - - - See Also - -------- - previous_date_frame + indexer : ndarray[int, ndim=2] + An array of shape (len(all_dates), len(all_sids)) of indices into + ``event_{dates,timestamps,sids}``. 
""" - date_cols = { - equity: np.full_like(dates, NaTns) for equity in events_by_sid - } - value_cols = { - equity: np.full(len(dates), missing_value, dtype=field_dtype) - for equity in events_by_sid - } + validate_event_metadata(event_dates, event_timestamps, event_sids) + out = np.full((len(all_dates), len(all_sids)), -1, dtype=np.int64) - raw_dates = dates.values - for equity, df in iteritems(events_by_sid): - event_dates = df[event_date_field_name] - values = df[return_field_name] - data = date_cols[equity] - if not event_dates.index.is_monotonic_increasing: - event_dates = event_dates.sort_index() + sid_ixs = all_sids.searchsorted(event_sids) + # side='right' here ensures that we include the event date itself + # if it's in all_dates. + dt_ixs = all_dates.searchsorted(event_dates, side='right') + ts_ixs = all_dates.searchsorted(event_timestamps) - # Iterate over the raw Series values, since we're comparing against - # numpy arrays anyway. - iter_date_vals = zip(event_dates.index.values, event_dates.values, - values) - for knowledge_date, event_date, value in iter_date_vals: - date_mask = ( - (knowledge_date <= raw_dates) & - (raw_dates <= event_date) - ) - value_mask = (event_date <= data) | (data == NaTns) - data_indices = np.where(date_mask & value_mask) - data[data_indices] = event_date - value_cols[equity][data_indices] = value - return pd.DataFrame(index=dates, data=value_cols) + # Walk backward through the events, writing the index of the event into + # slots ranging from the event's timestamp to its asof. This depends for + # correctness on the fact that event_dates is sorted in ascending order, + # because we need to overwrite later events with earlier ones if their + # eligible windows overlap. 
+ for i in range(len(event_sids) - 1, -1, -1): + start_ix = ts_ixs[i] + end_ix = dt_ixs[i] + out[start_ix:end_ix, sid_ixs[i]] = i + + return out -def previous_event_frame(events_by_sid, - date_index, - missing_value, - field_dtype, - event_date_field, - previous_return_field): +def previous_event_indexer(all_dates, + all_sids, + event_dates, + event_timestamps, + event_sids): """ - Make a DataFrame representing simulated previous dates or values for an - event. + Construct an index array that, when applied to an array of values, produces + a 2D array containing the values associated with the previous event for + each sid at each moment in time. + + Locations where no previous event was known will be filled with -1. Parameters ---------- - events_by_sid : dict[int -> DatetimeIndex] - Dict mapping sids to a series of dates. Each k:v pair of the series - represents the date we learned of the event mapping to the date the - event will occur. - date_index : DatetimeIndex. - The index of the returned DataFrame. - missing_value : any - Data which missing values should be filled with. - field_dtype: any - The dtype of the field for which the previous values are being - retrieved. - event_date_field: str - The name of the date field that marks when the event occurred. - return_field: str - The name of the field for which the previous values are being - retrieved. + all_dates : ndarray[datetime64[ns], ndim=1] + Row labels for the target output. + all_sids : ndarray[int, ndim=1] + Column labels for the target output. + event_dates : ndarray[datetime64[ns], ndim=1] + Dates on which each input events occurred/will occur. ``event_dates`` + must be in sorted order, and may not contain any NaT values. + event_timestamps : ndarray[datetime64[ns], ndim=1] + Dates on which we learned about each input event. + event_sids : ndarray[int, ndim=1] + Sids assocated with each input event. 
Returns ------- - previous_events: pd.DataFrame - A DataFrame where each column is a security from `events_by_sid` and - the values are the values for the previous event that occurred on the - date of the index. Entries falling before the first date will have - `missing_value` filled in as the result in the output. - - See Also - -------- - next_date_frame + indexer : ndarray[int, ndim=2] + An array of shape (len(all_dates), len(all_sids)) of indices into + ``event_{dates,timestamps,sids}``. """ - sids = list(events_by_sid) - populate_value = None if field_dtype == categorical_dtype else \ - missing_value - out = np.full( - (len(date_index), len(sids)), - populate_value, - dtype=field_dtype - ) - d_n = date_index[-1].asm8 - for col_idx, sid in enumerate(sids): - # events_by_sid[sid] is a DataFrame mapping knowledge_date to event - # date and values. - df = events_by_sid[sid] - df = df[df[event_date_field] <= d_n] - event_date_vals = df[event_date_field].values - # Get knowledge dates corresponding to the values in which we are - # interested - kd_vals = df[df[event_date_field] <= d_n].index.values - # The date at which a previous event is first known is the max of the - # kd and the event date. - index_dates = np.maximum(kd_vals, event_date_vals) - out[ - date_index.searchsorted(index_dates), col_idx - ] = df[previous_return_field] + validate_event_metadata(event_dates, event_timestamps, event_sids) + out = np.full((len(all_dates), len(all_sids)), -1, dtype=np.int64) - frame = pd.DataFrame(out, index=date_index, columns=sids) - frame.ffill(inplace=True) - if field_dtype == categorical_dtype: - frame[frame.isnull()] = missing_value - return frame + eff_dts = np.maximum(event_dates, event_timestamps) + sid_ixs = all_sids.searchsorted(event_sids) + dt_ixs = all_dates.searchsorted(eff_dts) + + # Walk backwards through the events, writing the index of the event into + # slots ranging from max(event_date, event_timestamp) to the start of the + # previously-written event. 
This depends for correctness on the fact that + # event_dates is sorted in ascending order, because we need to have written + # later events so we know where to stop forward-filling earlier events. + last_written = {} + for i in range(len(event_dates) - 1, -1, -1): + sid_ix = sid_ixs[i] + dt_ix = dt_ixs[i] + out[dt_ix:last_written.get(sid_ix, None), sid_ix] = i + last_written[sid_ix] = dt_ix + return out def normalize_data_query_time(dt, time, tz): @@ -287,66 +268,3 @@ def check_data_query_args(data_query_time, data_query_tz): data_query_tz, ), ) - - -def zip_with_floats(dates, flts): - return pd.Series(flts, index=dates, dtype='float') - - -def zip_with_strs(dates, strs): - return pd.Series(strs, index=dates, dtype='object') - - -def zip_with_dates(index_dates, dts): - return pd.Series(pd.to_datetime(dts), index=index_dates) - - -def get_values_for_date_ranges(zip_date_index_with_vals, - vals_for_date_intervals, - starts, - ends, - date_index): - """ - Returns a Series of values indexed by date based on the intervals defined - by the start and end dates. - - Parameters - ---------- - zip_date_index_with_vals : callable - A function that takes in a list of dates and a list of values and - returns a pd.Series with the values indexed by the dates. - vals_for_date_intervals : list - A list of values for each date interval in `date_intervals`. - starts : DatetimeIndex - A DatetimeIndex of start dates. - ends : list - A DatetimeIndex of end dates. - date_index : DatetimeIndex - The DatetimeIndex containing all dates for which values were requested. - - Returns - ------- - date_index_with_vals : pd.Series - A Series indexed by the given DatetimeIndex and with values assigned - to dates based on the given date intervals. - """ - # Fill in given values for given date ranges. 
- end_indexes = date_index.values.searchsorted(ends) - start_indexes = date_index.values.searchsorted(starts) - num_days = (end_indexes - start_indexes) + 1 - - # In case any of the end dates falls on days missing from the date_index, - # searchsorted will have placed their index within `date_index` to the - # index of the next start date, so we will have added 1 extra day for - # each of these. Subtract those extra days, but ignore any cases where the - # start and end dates are equal. Note: if any of the start dates is - # missing, it won't affect calculations because searchsorted will advance - # the index to the next date within the same range. - num_days[np.where(~np.in1d(ends, date_index) & (num_days != 0))] -= 1 - return zip_date_index_with_vals( - date_index, - np.repeat( - vals_for_date_intervals, - num_days, - ) - ) diff --git a/zipline/testing/__init__.py b/zipline/testing/__init__.py index c52ff03e..66b5e5b9 100644 --- a/zipline/testing/__init__.py +++ b/zipline/testing/__init__.py @@ -23,7 +23,6 @@ from .core import ( # noqa empty_asset_finder, empty_assets_db, empty_trading_env, - gen_calendars, make_test_handler, make_trade_data_for_asset_info, parameter_space, diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index 01bc9de5..69c9851f 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -1,14 +1,10 @@ -from abc import ABCMeta, abstractproperty import sqlite3 from unittest import TestCase from contextlib2 import ExitStack from logbook import NullHandler, Logger -from nose_parameterized import parameterized -from pandas.util.testing import assert_series_equal from six import with_metaclass from toolz import flip -import numpy as np import pandas as pd import responses @@ -36,17 +32,9 @@ from ..finance.trading import TradingEnvironment from ..utils import factory from ..utils.classproperty import classproperty from ..utils.final import FinalMeta, final -from ..utils.metautils import with_metaclasses -from 
.core import tmp_asset_finder, make_simple_equity_info, gen_calendars -from zipline.pipeline import Pipeline, SimplePipelineEngine +from .core import tmp_asset_finder, make_simple_equity_info +from zipline.pipeline import SimplePipelineEngine from zipline.pipeline.loaders.testing import make_seeded_random_loader -from zipline.utils.numpy_utils import make_datetime64D -from zipline.utils.numpy_utils import NaTD -from zipline.pipeline.common import TS_FIELD_NAME -from zipline.pipeline.loaders.utils import ( - get_values_for_date_ranges, - zip_with_dates -) from zipline.utils.calendars import ( get_calendar, ExchangeTradingSchedule, @@ -908,182 +896,6 @@ class WithAdjustmentReader(WithBcolzDailyBarReader): cls.adjustment_reader = SQLiteAdjustmentReader(conn) -class WithPipelineEventDataLoader( - with_metaclasses((type(ZiplineTestCase), ABCMeta), WithAssetFinder)): - """ - ZiplineTestCase mixin providing common test methods/behaviors for event - data loaders. - - Attributes - ---------- - loader_type : PipelineLoader - The type of loader to use. This must be overridden by subclasses. - - Methods - ------- - get_sids() -> iterable[int] - Class method which returns the sids that need to be available to the - tests. - get_dataset() -> dict[int -> pd.DataFrmae] - Class method which returns a mapping from sid to data for that sid. - By default this is empty for every sid. - pipeline_event_loader_args(dates: pd.DatetimeIndex) -> tuple[any] - The arguments to pass to the ``loader_type`` to construct the pipeline - loader for this test. 
- """ - @classmethod - def get_sids(cls): - return range(0, 5) - - @classmethod - def get_dataset(cls): - return {sid: pd.DataFrame() for sid in cls.get_sids()} - - @abstractproperty - def loader_type(self): - raise NotImplementedError('loader_type') - - @classmethod - def make_equity_info(cls): - return make_simple_equity_info( - cls.get_sids(), - start_date=pd.Timestamp('2013-01-01', tz='UTC'), - end_date=pd.Timestamp('2015-01-01', tz='UTC'), - ) - - def pipeline_event_loader_args(self, dates): - """Construct the base object to pass to the loader. - - Parameters - ---------- - dates : pd.DatetimeIndex - The dates we can serve. - - Returns - ------- - args : tuple[any] - The arguments to forward to the loader positionally. - """ - return dates, self.get_dataset() - - def pipeline_event_setup_engine(self, dates): - """ - Make a Pipeline Enigne object based on the given dates. - """ - loader = self.loader_type(*self.pipeline_event_loader_args(dates)) - return SimplePipelineEngine(lambda _: loader, dates, self.asset_finder) - - def get_sids_to_frames(self, - zip_date_index_with_vals, - vals, - date_intervals, - dates, - dtype_name, - missing_dtype): - """ - Construct a DataFrame that maps sid to the expected values for the - given dates. - - Parameters - ---------- - zip_date_index_with_vals: callable - A function that returns a series of `vals` repeated based on the - number of days in the date interval for each val, indexed by the - dates in `dates`. - vals: iterable - An iterable with values that correspond to each interval in - `date_intervals`. - date_intervals: list - A list of date intervals for each sid that correspond to values in - `vals`. - dates: DatetimeIndex - The dates which will serve as the index for each Series for each - sid in the DataFrame. - dtype_name: str - The name of the dtype of the values in `vals`. - missing_dtype: str - The name of the value that should be used as the missing value - for the dtype of `vals` - e.g., 'NaN' for floats. 
- """ - frame = pd.DataFrame({sid: get_values_for_date_ranges( - zip_date_index_with_vals, - vals[sid], - pd.DatetimeIndex(list(zip(*date_intervals[sid]))[0]), - pd.DatetimeIndex(list(zip(*date_intervals[sid]))[1]), - dates - ).astype(dtype_name) for sid in self.get_sids()[:-1]}) - frame[self.get_sids()[-1]] = zip_date_index_with_vals( - dates, [missing_dtype] * len(dates) - ).astype(dtype_name) - return frame - - @staticmethod - def _compute_busday_offsets(announcement_dates): - """ - Compute expected business day offsets from a DataFrame of announcement - dates. - """ - # Column-vector of dates on which factor `compute` will be called. - raw_call_dates = announcement_dates.index.values.astype( - 'datetime64[D]' - )[:, None] - - # 2D array of dates containining expected nexg announcement. - raw_announce_dates = ( - announcement_dates.values.astype('datetime64[D]') - ) - - # Set NaTs to 0 temporarily because busday_count doesn't support NaT. - # We fill these entries with NaNs later. - whereNaT = raw_announce_dates == NaTD - raw_announce_dates[whereNaT] = make_datetime64D(0) - - # The abs call here makes it so that we can use this function to - # compute offsets for both next and previous earnings (previous - # earnings offsets come back negative). 
- expected = abs(np.busday_count( - raw_call_dates, - raw_announce_dates - ).astype(float)) - - expected[whereNaT] = np.nan - return pd.DataFrame( - data=expected, - columns=announcement_dates.columns, - index=announcement_dates.index, - ) - - @parameterized.expand(gen_calendars( - '2014-01-01', - '2014-01-31', - critical_dates=pd.to_datetime([ - '2014-01-05', - '2014-01-10', - '2014-01-15', - '2014-01-20', - ], utc=True), - )) - def test_compute(self, dates): - engine = self.pipeline_event_setup_engine(dates) - cols = self.setup(dates) - - pipe = Pipeline( - columns=self.pipeline_columns - ) - - result = engine.run_pipeline( - pipe, - start_date=dates[0], - end_date=dates[-1], - ) - - for sid in self.get_sids(): - for col_name in cols.keys(): - assert_series_equal(result[col_name].unstack(1)[sid], - cols[col_name][sid], - check_names=False) - - class WithSeededRandomPipelineEngine(WithNYSETradingDays, WithAssetFinder): """ ZiplineTestCase mixin providing class-level fixtures for running pipelines @@ -1239,127 +1051,3 @@ class WithResponses(object): self.responses = self.enter_instance_context( responses.RequestsMock(), ) - - -class WithNextAndPreviousEventDataLoader(WithPipelineEventDataLoader): - """ - ZiplineTestCase mixin extending common functionality for event data - loader tests that have both next and previous events. - - `base_cases` should be used as the template to test cases that combine - knowledge date (timestamp) and some 'other_date' in various ways. - `next_date_intervals` gives the date intervals for the next event based - on the dates given in `base_cases`. - `next_dates` gives the next date from `other_date` which is known about at - each interval. - `prev_date_intervals` gives the date intervals for each sid for the - previous event based on the dates given in `base_cases`. - `prev_dates` gives the previous date from `other_date` which is known - about at each interval. 
- `get_expected_previous_event_dates` is a convenience function that fills - a DataFrame with the previously known dates for each sid for the given - dates. - `get_expected_next_event_dates` is a convenience function that fills - a DataFrame with the next known dates for each sid for the given - dates. - """ - base_cases = [ - # K1--K2--A1--A2. - pd.DataFrame({ - TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), - 'other_date': pd.to_datetime(['2014-01-15', '2014-01-20']), - }), - # K1--K2--A2--A1. - pd.DataFrame({ - TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), - 'other_date': pd.to_datetime(['2014-01-20', '2014-01-15']), - }), - # K1--A1--K2--A2. - pd.DataFrame({ - TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']), - 'other_date': pd.to_datetime(['2014-01-10', '2014-01-20']), - }), - # K1 == K2. - pd.DataFrame({ - TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2), - 'other_date': pd.to_datetime(['2014-01-10', '2014-01-15']), - }), - pd.DataFrame( - columns=['other_date', - TS_FIELD_NAME], - dtype='datetime64[ns]' - ), - ] - - next_date_intervals = [ - [['2014-01-01', '2014-01-04'], - ['2014-01-05', '2014-01-15'], - ['2014-01-16', '2014-01-20'], - ['2014-01-21', '2014-01-31']], - [['2014-01-01', '2014-01-04'], - ['2014-01-05', '2014-01-09'], - ['2014-01-10', '2014-01-15'], - ['2014-01-16', '2014-01-20'], - ['2014-01-21', '2014-01-31']], - [['2014-01-01', '2014-01-04'], - ['2014-01-05', '2014-01-10'], - ['2014-01-11', '2014-01-14'], - ['2014-01-15', '2014-01-20'], - ['2014-01-21', '2014-01-31']], - [['2014-01-01', '2014-01-04'], - ['2014-01-05', '2014-01-10'], - ['2014-01-11', '2014-01-15'], - ['2014-01-16', '2014-01-31']] - ] - - next_dates = [ - ['NaT', '2014-01-15', '2014-01-20', 'NaT'], - ['NaT', '2014-01-20', '2014-01-15', '2014-01-20', 'NaT'], - ['NaT', '2014-01-10', 'NaT', '2014-01-20', 'NaT'], - ['NaT', '2014-01-10', '2014-01-15', 'NaT'], - ['NaT'] - ] - - prev_date_intervals = [ - [['2014-01-01', '2014-01-14'], - 
['2014-01-15', '2014-01-19'], - ['2014-01-20', '2014-01-31']], - [['2014-01-01', '2014-01-14'], - ['2014-01-15', '2014-01-19'], - ['2014-01-20', '2014-01-31']], - [['2014-01-01', '2014-01-09'], - ['2014-01-10', '2014-01-19'], - ['2014-01-20', '2014-01-31']], - [['2014-01-01', '2014-01-09'], - ['2014-01-10', '2014-01-14'], - ['2014-01-15', '2014-01-31']] - ] - - prev_dates = [ - ['NaT', '2014-01-15', '2014-01-20'], - ['NaT', '2014-01-15', '2014-01-20'], - ['NaT', '2014-01-10', '2014-01-20'], - ['NaT', '2014-01-10', '2014-01-15'], - ['NaT'] - ] - - def get_expected_previous_event_dates(self, dates, dtype_name, - missing_dtype): - return self.get_sids_to_frames( - zip_with_dates, - self.prev_dates, - self.prev_date_intervals, - dates, - dtype_name, - missing_dtype - ) - - def get_expected_next_event_dates(self, dates, dtype_name, missing_dtype): - return self.get_sids_to_frames( - zip_with_dates, - self.next_dates, - self.next_date_intervals, - dates, - dtype_name, - missing_dtype - ) diff --git a/zipline/testing/predicates.py b/zipline/testing/predicates.py index b83bdbf8..3bc35346 100644 --- a/zipline/testing/predicates.py +++ b/zipline/testing/predicates.py @@ -1,3 +1,4 @@ +import datetime from functools import partial import inspect @@ -339,6 +340,46 @@ def assert_adjustment_equal(result, expected, path=(), **kwargs): ) +@assert_equal.register( + (datetime.datetime, np.datetime64), + (datetime.datetime, np.datetime64), +) +def assert_timestamp_and_datetime_equal(result, + expected, + path=(), + msg='', + allow_datetime_coercions=False, + compare_nat_equal=True, + **kwargs): + """ + Branch for comparing python datetime (which includes pandas Timestamp) and + np.datetime64 as equal. + + Raises when the types differ unless ``allow_datetime_coercions`` is True.
+ """ + assert allow_datetime_coercions or type(result) == type(expected), ( + "%sdatetime types (%s, %s) don't match and " + "allow_datetime_coercions was not set.\n%s" % ( + _fmt_msg(msg), + type(result), + type(expected), + _fmt_path(path), + ) + ) + + result = pd.Timestamp(result) + expected = pd.Timestamp(expected) + if compare_nat_equal and pd.isnull(result) and pd.isnull(expected): + return + + assert_equal.dispatch(object, object)( + result, + expected, + path=path, + **kwargs + ) + + try: # pull the dshape cases in from datashape.util.testing import assert_dshape_equal