From 8a28e82d322123a45a8cfd550ac73ed582fc553f Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Wed, 9 Mar 2016 10:29:10 -0500 Subject: [PATCH] ENH: add dividends to pipeline MAINT: remove record date - not needed. MAINT: restructure dividends dataset. MAINT: restructure dividends factors. WIP: update dividends tests. MAINT: correct the way to get the 'next' event frame. --- tests/pipeline/test_dividends.py | 542 ++++++++++++++++++++ tests/pipeline/test_earnings.py | 3 + zipline/pipeline/common.py | 12 + zipline/pipeline/data/dividends.py | 25 + zipline/pipeline/factors/events.py | 49 ++ zipline/pipeline/loaders/blaze/dividends.py | 223 ++++++++ zipline/pipeline/loaders/dividends.py | 116 +++++ zipline/pipeline/loaders/events.py | 26 +- zipline/pipeline/loaders/utils.py | 32 +- 9 files changed, 1016 insertions(+), 12 deletions(-) create mode 100644 tests/pipeline/test_dividends.py create mode 100644 zipline/pipeline/data/dividends.py create mode 100644 zipline/pipeline/loaders/blaze/dividends.py create mode 100644 zipline/pipeline/loaders/dividends.py diff --git a/tests/pipeline/test_dividends.py b/tests/pipeline/test_dividends.py new file mode 100644 index 00000000..099bf67e --- /dev/null +++ b/tests/pipeline/test_dividends.py @@ -0,0 +1,542 @@ +""" +Tests for the reference loader for Dividends datasets. +""" +from functools import partial +from unittest import TestCase + +import blaze as bz +from blaze.compute.core import swap_resources_into_scope +from contextlib2 import ExitStack +import itertools +import pandas as pd +from six import iteritems +from tests.pipeline.base import EventLoaderCommonMixin + +from zipline.pipeline.common import ( + ANNOUNCEMENT_FIELD_NAME, + DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT, + DAYS_SINCE_PREV_EX_DATE, + DAYS_TO_NEXT_EX_DATE, + NEXT_AMOUNT, + NEXT_EX_DATE, + NEXT_PAY_DATE, + PREVIOUS_ANNOUNCEMENT, + PREVIOUS_EX_DATE, + PREVIOUS_PAY_DATE, + PREVIOUS_AMOUNT, + SID_FIELD_NAME, + TS_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME, + EX_DATE_FIELD_NAME, + PAY_DATE_FIELD_NAME +) +from zipline.pipeline.data.dividends import DividendsByAnnouncementDate, \ + DividendsByExDate, DividendsByPayDate +from zipline.pipeline.factors.events import ( + BusinessDaysSinceDividendAnnouncement, + BusinessDaysSincePreviousExDate, + BusinessDaysUntilNextExDate +) +from zipline.pipeline.loaders.blaze.dividends import \ + BlazeDividendsByAnnouncementDateLoader, BlazeDividendsByPayDateLoader, \ + BlazeDividendsByExDateLoader +from zipline.pipeline.loaders.dividends import DividendsByAnnouncementDateLoader, \ + DividendsByExDateLoader, DividendsByPayDateLoader +from zipline.utils.test_utils import ( + make_simple_equity_info, + tmp_asset_finder, +) + + +dividends_cases = [ + # K1--K2--A1--A2. + pd.DataFrame({ + CASH_AMOUNT_FIELD_NAME: [1, 15], + EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-15', '2014-01-20']), + PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-15', '2014-01-20']), + TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-09']) + }), + # K1--K2--A2--A1. + pd.DataFrame({ + CASH_AMOUNT_FIELD_NAME: [7, 13], + EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-20', '2014-01-15']), + PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-20', '2014-01-15']), + TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-09']) + }), + # K1--A1--K2--A2. + pd.DataFrame({ + CASH_AMOUNT_FIELD_NAME: [3, 1], + EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-20']), + PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-20']), + TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-14']) + }), + # K1 == K2. + pd.DataFrame({ + CASH_AMOUNT_FIELD_NAME: [6, 23], + EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-15']), + PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-15']), + TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-04']) + }), + pd.DataFrame( + columns=[CASH_AMOUNT_FIELD_NAME, + EX_DATE_FIELD_NAME, + PAY_DATE_FIELD_NAME, + TS_FIELD_NAME, + ANNOUNCEMENT_FIELD_NAME], + dtype='datetime64[ns]' + ), +] + +prev_date_intervals = [ + [ + [None, '2014-01-14'], ['2014-01-15', '2014-01-19'], + ['2014-01-20', None] + ], + [ + [None, '2014-01-14'], ['2014-01-15', '2014-01-19'], + ['2014-01-20', None] + ], + [ + [None, '2014-01-09'], ['2014-01-10', '2014-01-19'], + ['2014-01-20', None] + ], + [ + [None, '2014-01-09'], ['2014-01-10', '2014-01-14'], + ['2014-01-15', None] + ] + ] + +next_date_intervals = [ + [ + [None, '2014-01-04'], ['2014-01-05', '2014-01-15'], + ['2014-01-16', '2014-01-20'], ['2014-01-21', None] + ], + [ + [None, '2014-01-04'], ['2014-01-05', '2014-01-09'], + ['2014-01-10', '2014-01-15'], ['2014-01-16', '2014-01-20'], + ['2014-01-21', None] + ], + [ + [None, '2014-01-04'], ['2014-01-05', '2014-01-10'], + ['2014-01-11', '2014-01-14'], ['2014-01-15', '2014-01-20'], + ['2014-01-21', None] + ], + [ + [None, '2014-01-04'], ['2014-01-05', '2014-01-10'], + ['2014-01-11', '2014-01-15'], ['2014-01-16', None] + ] +] + +next_ex_and_pay_dates = [['NaT', '2014-01-15', '2014-01-20', 'NaT'], + ['NaT', '2014-01-20', '2014-01-15', '2014-01-20', + 'NaT'], + ['NaT', '2014-01-10', 'NaT', '2014-01-20', 'NaT'], + ['NaT', '2014-01-10', '2014-01-15', 'NaT']] + +prev_ex_and_pay_dates = [['NaT', '2014-01-15', '2014-01-20'], + ['NaT', '2014-01-15', '2014-01-20'], + ['NaT', '2014-01-10', '2014-01-20'], + ['NaT', '2014-01-10', '2014-01-15']] + +prev_amounts = [['NaN', 1, 15], + ['NaN', 13, 7], + ['NaN', 3, 1], + ['NaN', 6, 23]] + +next_amounts = [['NaN', 1, 15, 'NaN'], + ['NaN', 7, 13, 7, 'NaN'], + ['NaN', 3, 'NaN', 1, 'NaN'], + ['NaN', 6, 23, 'NaN']] + + +def get_values_for_date_ranges(zip_vals_dates, + num_days_between_dates, + vals_for_date_intervals, + date_intervals): + # Fill in given values for given date ranges. + return zip_vals_dates( + list( + itertools.chain(*[ + [val] * num_days_between_dates(*date_intervals[i]) + for i, val in enumerate(vals_for_date_intervals) + ]) + ) + ) + + +def get_vals_for_dates(zip_with_floats_dates, + num_days_between_dates, + dates, + date_invervals, + vals): + return pd.DataFrame({ + 0: get_values_for_date_ranges(zip_with_floats_dates, + num_days_between_dates, + vals[0], + date_invervals[0]), + 1: get_values_for_date_ranges(zip_with_floats_dates, + num_days_between_dates, + vals[1], + date_invervals[1]), + 2: get_values_for_date_ranges(zip_with_floats_dates, + num_days_between_dates, + vals[2], + date_invervals[2]), + # Assume the latest of 2 cash values is used if we find out about 2 + # announcements that happened on the same day for the same sid. + 3: get_values_for_date_ranges(zip_with_floats_dates, + num_days_between_dates, + vals[3], + date_invervals[3]), + 4: zip_with_floats_dates(['NaN'] * len(dates)), + }, index=dates) + + +class DividendsByAnnouncementDateTestCase(TestCase, EventLoaderCommonMixin): + """ + Tests for loading the dividends by announcement date data. + """ + pipeline_columns = { + PREVIOUS_ANNOUNCEMENT: + DividendsByAnnouncementDate.previous_announcement_date.latest, + PREVIOUS_AMOUNT: DividendsByAnnouncementDate.previous_amount.latest, + DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT: + BusinessDaysSinceDividendAnnouncement(), + } + + @classmethod + def get_sids(cls): + return range(0, 5) + + @classmethod + def setUpClass(cls): + cls._cleanup_stack = stack = ExitStack() + equity_info = make_simple_equity_info( + cls.get_sids(), + start_date=pd.Timestamp('2013-01-01', tz='UTC'), + end_date=pd.Timestamp('2015-01-01', tz='UTC'), + ) + cls.cols = {} + cls.dataset = {sid: + frame.drop([EX_DATE_FIELD_NAME, + PAY_DATE_FIELD_NAME], axis=1) + for sid, frame + in enumerate(dividends_cases)} + cls.finder = stack.enter_context( + tmp_asset_finder(equities=equity_info), + ) + + cls.loader_type = DividendsByAnnouncementDateLoader + + @classmethod + def tearDownClass(cls): + cls._cleanup_stack.close() + + def setup(self, dates): + zip_with_floats_dates = partial(self.zip_with_floats, dates) + num_days_between_dates = partial(self.num_days_between, dates) + num_days_between_for_dates = partial(self.num_days_between, dates) + zip_with_dates_for_dates = partial(self.zip_with_dates, dates) + date_intervals = [ + [ + [None, '2014-01-04'], ['2014-01-05', '2014-01-09'], + ['2014-01-10', None] + ], + [ + [None, '2014-01-04'], ['2014-01-05', '2014-01-09'], + ['2014-01-10', None] + ], + [ + [None, '2014-01-04'], ['2014-01-05', '2014-01-14'], + ['2014-01-15', None] + ], + [ + [None, '2014-01-04'], ['2014-01-05', None] + ] + ] + announcement_dates = [['NaT', '2014-01-04', '2014-01-09'], + ['NaT', '2014-01-04', '2014-01-09'], + ['NaT', '2014-01-04', '2014-01-14'], + ['NaT', '2014-01-04']] + amounts = [['NaN', 1, 15], ['NaN', 7, 13], ['NaN', 3, 1], ['NaN', 23]] + + self.cols[PREVIOUS_ANNOUNCEMENT] = get_vals_for_dates( + zip_with_dates_for_dates, num_days_between_for_dates, dates, + date_intervals, announcement_dates + ) + + self.cols[PREVIOUS_AMOUNT] = get_vals_for_dates( + zip_with_floats_dates, num_days_between_dates, dates, + date_intervals, amounts + ) + + self.cols[ + DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT + ] = self._compute_busday_offsets(self.cols[PREVIOUS_ANNOUNCEMENT]) + + +class BlazeDividendsByAnnouncementDateTestCase( + DividendsByAnnouncementDateTestCase +): + @classmethod + def setUpClass(cls): + super(BlazeDividendsByAnnouncementDateTestCase, cls).setUpClass() + cls.loader_type = BlazeDividendsByAnnouncementDateLoader + + def loader_args(self, dates): + _, mapping = super( + BlazeDividendsByAnnouncementDateTestCase, + self, + ).loader_args(dates) + return (bz.Data(pd.concat( + pd.DataFrame({ + ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME], + TS_FIELD_NAME: df[TS_FIELD_NAME], + SID_FIELD_NAME: sid, + CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME] + }) + for sid, df in iteritems(mapping) + ).reset_index(drop=True)),) + + +class BlazeDividendsByAnnouncementDateNotInteractiveTestCase( + BlazeDividendsByAnnouncementDateTestCase): + """Test case for passing a non-interactive symbol and a dict of resources. + """ + @classmethod + def setUpClass(cls): + super(BlazeDividendsByAnnouncementDateNotInteractiveTestCase, + cls).setUpClass() + cls.loader_type = BlazeDividendsByAnnouncementDateLoader + + def loader_args(self, dates): + (bound_expr,) = super( + BlazeDividendsByAnnouncementDateNotInteractiveTestCase, + self, + ).loader_args(dates) + return swap_resources_into_scope(bound_expr, {}) + + +class DividendsByExDateTestCase(TestCase, EventLoaderCommonMixin): + """ + Tests for loading the dividends by ex date data. + """ + pipeline_columns = { + NEXT_EX_DATE: DividendsByExDate.previous_ex_date.latest, + PREVIOUS_EX_DATE: DividendsByExDate.next_ex_date.latest, + NEXT_AMOUNT: DividendsByExDate.next_amount.latest, + PREVIOUS_AMOUNT: DividendsByExDate.previous_amount.latest, + DAYS_TO_NEXT_EX_DATE: BusinessDaysUntilNextExDate(), + DAYS_SINCE_PREV_EX_DATE: BusinessDaysSincePreviousExDate() + } + + @classmethod + def get_sids(cls): + return range(0, 5) + + @classmethod + def setUpClass(cls): + cls._cleanup_stack = stack = ExitStack() + equity_info = make_simple_equity_info( + cls.get_sids(), + start_date=pd.Timestamp('2013-01-01', tz='UTC'), + end_date=pd.Timestamp('2015-01-01', tz='UTC'), + ) + cls.cols = {} + cls.dataset = {sid: + frame.drop([ANNOUNCEMENT_FIELD_NAME, + PAY_DATE_FIELD_NAME], axis=1) + for sid, frame + in enumerate(dividends_cases)} + cls.finder = stack.enter_context( + tmp_asset_finder(equities=equity_info), + ) + + cls.loader_type = DividendsByExDateLoader + + @classmethod + def tearDownClass(cls): + cls._cleanup_stack.close() + + def setup(self, dates): + zip_with_floats_dates = partial(self.zip_with_floats, dates) + num_days_between_dates = partial(self.num_days_between, dates) + num_days_between_for_dates = partial(self.num_days_between, dates) + zip_with_dates_for_dates = partial(self.zip_with_dates, dates) + + self.cols[NEXT_EX_DATE] = get_vals_for_dates( + zip_with_dates_for_dates, num_days_between_for_dates, dates, + next_date_intervals, next_ex_and_pay_dates + ) + + self.cols[PREVIOUS_EX_DATE] = get_vals_for_dates( + zip_with_dates_for_dates, num_days_between_for_dates, dates, + prev_date_intervals, prev_ex_and_pay_dates + ) + + self.cols[NEXT_AMOUNT] = get_vals_for_dates( + zip_with_floats_dates, num_days_between_dates, + dates, next_date_intervals, next_amounts + ) + + self.cols[PREVIOUS_AMOUNT] = get_vals_for_dates( + zip_with_floats_dates, num_days_between_dates, + dates, prev_date_intervals, prev_amounts + ) + + self.cols[DAYS_TO_NEXT_EX_DATE] = self._compute_busday_offsets( + self.cols[NEXT_EX_DATE] + ) + + self.cols[DAYS_SINCE_PREV_EX_DATE] = self._compute_busday_offsets( + self.cols[PREVIOUS_EX_DATE] + ) + + +class BlazeDividendsByExDateLoaderTestCase(DividendsByExDateTestCase): + @classmethod + def setUpClass(cls): + super(BlazeDividendsByExDateLoaderTestCase, cls).setUpClass() + cls.loader_type = BlazeDividendsByExDateLoader + + def loader_args(self, dates): + _, mapping = super( + BlazeDividendsByExDateLoaderTestCase, + self, + ).loader_args(dates) + return (bz.Data(pd.concat( + pd.DataFrame({ + EX_DATE_FIELD_NAME: df[EX_DATE_FIELD_NAME], + TS_FIELD_NAME: df[TS_FIELD_NAME], + SID_FIELD_NAME: sid, + CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME] + }) + for sid, df in iteritems(mapping) + ).reset_index(drop=True)),) + + +class BlazeDividendsByExDateLoaderNotInteractiveTestCase( + BlazeDividendsByExDateLoaderTestCase): + """Test case for passing a non-interactive symbol and a dict of resources. + """ + @classmethod + def setUpClass(cls): + super(BlazeDividendsByExDateLoaderNotInteractiveTestCase, + cls).setUpClass() + cls.loader_type = DividendsByExDateLoader + + def loader_args(self, dates): + (bound_expr,) = super( + BlazeDividendsByExDateLoaderNotInteractiveTestCase, + self, + ).loader_args(dates) + return swap_resources_into_scope(bound_expr, {}) + + +class DividendsByPayDateTestCase(TestCase, EventLoaderCommonMixin): + """ + Tests for loading the dividends by pay date data. + """ + pipeline_columns = { + NEXT_PAY_DATE: DividendsByPayDate.next_pay_date.latest, + PREVIOUS_PAY_DATE: DividendsByPayDate.previous_pay_date.latest, + NEXT_AMOUNT: DividendsByPayDate.next_amount.latest, + PREVIOUS_AMOUNT: DividendsByPayDate.previous_amount.latest, + } + + @classmethod + def get_sids(cls): + return range(0, 5) + + @classmethod + def setUpClass(cls): + cls._cleanup_stack = stack = ExitStack() + equity_info = make_simple_equity_info( + cls.get_sids(), + start_date=pd.Timestamp('2013-01-01', tz='UTC'), + end_date=pd.Timestamp('2015-01-01', tz='UTC'), + ) + cls.cols = {} + cls.dataset = {sid: + frame.drop([ANNOUNCEMENT_FIELD_NAME, + EX_DATE_FIELD_NAME], axis=1) + for sid, frame + in enumerate(dividends_cases)} + cls.finder = stack.enter_context( + tmp_asset_finder(equities=equity_info), + ) + + cls.loader_type = DividendsByPayDateLoader + + @classmethod + def tearDownClass(cls): + cls._cleanup_stack.close() + + def setup(self, dates): + zip_with_floats_dates = partial(self.zip_with_floats, dates) + num_days_between_dates = partial(self.num_days_between, dates) + num_days_between_for_dates = partial(self.num_days_between, dates) + zip_with_dates_for_dates = partial(self.zip_with_dates, dates) + + self.cols[NEXT_PAY_DATE] = get_vals_for_dates( + zip_with_dates_for_dates, num_days_between_for_dates, dates, + next_date_intervals, next_ex_and_pay_dates + ) + self.cols[PREVIOUS_PAY_DATE] = get_vals_for_dates( + zip_with_dates_for_dates, num_days_between_for_dates, dates, + prev_date_intervals, prev_ex_and_pay_dates + ) + + self.cols[NEXT_AMOUNT] = get_vals_for_dates( + zip_with_floats_dates, num_days_between_dates, + dates, next_date_intervals, next_amounts + ) + + self.cols[PREVIOUS_AMOUNT] = get_vals_for_dates( + zip_with_floats_dates, num_days_between_dates, + dates, prev_date_intervals, prev_amounts + ) + + +class BlazeDividendsByPayDateLoaderTestCase(DividendsByPayDateTestCase): + @classmethod + def setUpClass(cls): + super(BlazeDividendsByPayDateLoaderTestCase, cls).setUpClass() + cls.loader_type = BlazeDividendsByPayDateLoader + + def loader_args(self, dates): + _, mapping = super( + BlazeDividendsByPayDateLoaderTestCase, + self, + ).loader_args(dates) + return (bz.Data(pd.concat( + pd.DataFrame({ + PAY_DATE_FIELD_NAME: df[PAY_DATE_FIELD_NAME], + TS_FIELD_NAME: df[TS_FIELD_NAME], + SID_FIELD_NAME: sid, + CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME] + }) + for sid, df in iteritems(mapping) + ).reset_index(drop=True)),) + + +class BlazeDividendsByPayDateLoaderNotInteractiveTestCase( + BlazeDividendsByPayDateLoaderTestCase): + """Test case for passing a non-interactive symbol and a dict of resources. + """ + @classmethod + def setUpClass(cls): + super(BlazeDividendsByPayDateLoaderNotInteractiveTestCase, + cls).setUpClass() + cls.loader_type = BlazeDividendsByPayDateLoader + + def loader_args(self, dates): + (bound_expr,) = super( + BlazeDividendsByPayDateLoaderNotInteractiveTestCase, + self, + ).loader_args(dates) + return swap_resources_into_scope(bound_expr, {}) diff --git a/tests/pipeline/test_earnings.py b/tests/pipeline/test_earnings.py index 18ec67ca..c67bfa03 100644 --- a/tests/pipeline/test_earnings.py +++ b/tests/pipeline/test_earnings.py @@ -29,6 +29,9 @@ from zipline.pipeline.loaders.earnings import EarningsCalendarLoader from zipline.pipeline.loaders.blaze import ( BlazeEarningsCalendarLoader, ) +from zipline.utils.test_utils import ( + tmp_asset_finder, +) from zipline.testing import tmp_asset_finder diff --git a/zipline/pipeline/common.py b/zipline/pipeline/common.py index de225409..aa71d3a9 100644 --- a/zipline/pipeline/common.py +++ b/zipline/pipeline/common.py @@ -4,14 +4,26 @@ Common constants for Pipeline. AD_FIELD_NAME = 'asof_date' ANNOUNCEMENT_FIELD_NAME = 'announcement_date' CASH_FIELD_NAME = 'cash' +CASH_AMOUNT_FIELD_NAME = 'cash_amount' BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_date' DAYS_SINCE_PREV = 'days_since_prev' +DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT = 'days_since_prev_dividend_announcement' +DAYS_SINCE_PREV_EX_DATE = 'days_since_prev_ex_date' DAYS_TO_NEXT = 'days_to_next' +DAYS_TO_NEXT_EX_DATE = 'days_to_next_ex_date' +EX_DATE_FIELD_NAME = 'ex_date' +NEXT_AMOUNT = 'next_amount' NEXT_ANNOUNCEMENT = 'next_announcement' +NEXT_EX_DATE = 'next_ex_date' +NEXT_PAY_DATE = 'next_pay_date' +PAY_DATE_FIELD_NAME = 'pay_date' +PREVIOUS_AMOUNT = 'previous_amount' PREVIOUS_ANNOUNCEMENT = 'previous_announcement' PREVIOUS_BUYBACK_ANNOUNCEMENT = 'previous_buyback_announcement' PREVIOUS_BUYBACK_CASH = 'previous_buyback_cash' PREVIOUS_BUYBACK_SHARE_COUNT = 'previous_buyback_share_count' +PREVIOUS_EX_DATE = 'previous_ex_date' +PREVIOUS_PAY_DATE = 'previous_pay_date' SHARE_COUNT_FIELD_NAME = 'share_count' SID_FIELD_NAME = 'sid' TS_FIELD_NAME = 'timestamp' diff --git a/zipline/pipeline/data/dividends.py b/zipline/pipeline/data/dividends.py new file mode 100644 index 00000000..b959e2a9 --- /dev/null +++ b/zipline/pipeline/data/dividends.py @@ -0,0 +1,25 @@ +""" +Dataset representing dates of upcoming dividends. +""" +from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype + +from .dataset import Column, DataSet + + +class DividendsByExDate(DataSet): + next_ex_date = Column(datetime64ns_dtype) + previous_ex_date = Column(datetime64ns_dtype) + next_amount = Column(float64_dtype) + previous_amount = Column(float64_dtype) + + +class DividendsByPayDate(DataSet): + next_pay_date = Column(datetime64ns_dtype) + previous_pay_date = Column(datetime64ns_dtype) + next_amount = Column(float64_dtype) + previous_amount = Column(float64_dtype) + + +class DividendsByAnnouncementDate(DataSet): + previous_announcement_date = Column(datetime64ns_dtype) + previous_amount = Column(float64_dtype) diff --git a/zipline/pipeline/factors/events.py b/zipline/pipeline/factors/events.py index a64fe3e2..100b7bfc 100644 --- a/zipline/pipeline/factors/events.py +++ b/zipline/pipeline/factors/events.py @@ -7,6 +7,10 @@ from zipline.pipeline.data.buyback_auth import ( CashBuybackAuthorizations, ShareBuybackAuthorizations ) +from zipline.pipeline.data.dividends import ( + DividendsByAnnouncementDate, + DividendsByExDate +) from zipline.pipeline.data.earnings import EarningsCalendar from zipline.utils.numpy_utils import ( NaTD, @@ -156,3 +160,48 @@ class BusinessDaysSinceShareBuybackAuth( zipline.pipeline.factors.BusinessDaysSinceShareBuybackAuth """ inputs = [ShareBuybackAuthorizations.announcement_date] + + +class BusinessDaysSinceDividendAnnouncement( + BusinessDaysSincePreviousEvents +): + """ + Factor returning the number of **business days** (not trading days!) since + the most recent dividend announcement for each asset. + + + See Also + -------- + zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement + """ + inputs = [DividendsByAnnouncementDate.previous_announcement_date] + + +class BusinessDaysUntilNextExDate( + BusinessDaysUntilNextEvents +): + """ + Factor returning the number of **business days** (not trading days!) until + the next ex date for each asset. + + + See Also + -------- + zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement + """ + inputs = [DividendsByExDate.next_ex_date] + + +class BusinessDaysSincePreviousExDate( + BusinessDaysSincePreviousEvents +): + """ + Factor returning the number of **business days** (not trading days!) since + the most recent ex date for each asset. + + + See Also + -------- + zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement + """ + inputs = [DividendsByExDate.previous_ex_date] diff --git a/zipline/pipeline/loaders/blaze/dividends.py b/zipline/pipeline/loaders/blaze/dividends.py new file mode 100644 index 00000000..9971511e --- /dev/null +++ b/zipline/pipeline/loaders/blaze/dividends.py @@ -0,0 +1,223 @@ +from zipline.pipeline.common import ( + ANNOUNCEMENT_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME, + EX_DATE_FIELD_NAME, + PAY_DATE_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME, +) +from zipline.pipeline.data.dividends import DividendsByExDate, \ + DividendsByAnnouncementDate, DividendsByPayDate +from zipline.pipeline.loaders.dividends import DividendsByAnnouncementDateLoader, \ + DividendsByPayDateLoader, DividendsByExDateLoader +from .events import BlazeEventsLoader + + +class BlazeDividendsByAnnouncementDateLoader(BlazeEventsLoader): + """A pipeline loader for the ``DividendsByAnnouncementDate`` dataset that + loads data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezeone to use for the data query cutoff. + dataset: DataSet + The DataSet object for which this loader loads data. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + {CASH_AMOUNT_FIELD_NAME}: ?datetime, + {ANNOUNCEMENT_FIELD_NAME}: ?datetime, + }} + + Where each row of the table is a record including the sid to identify the + company, the timestamp where we learned about the announcement, the + date when the dividends will be announced, and the cash amount. + + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. + """ + + __doc__ = __doc__.format( + TS_FIELD_NAME=TS_FIELD_NAME, + SID_FIELD_NAME=SID_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME, + ANNOUNCEMENT_FIELD_NAME=ANNOUNCEMENT_FIELD_NAME + ) + + _expected_fields = frozenset({ + TS_FIELD_NAME, + SID_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME, + ANNOUNCEMENT_FIELD_NAME + }) + + concrete_loader = DividendsByAnnouncementDateLoader + + def __init__(self, + expr, + resources=None, + odo_kwargs=None, + data_query_time=None, + data_query_tz=None, + dataset=DividendsByAnnouncementDate, + **kwargs): + super( + BlazeDividendsByAnnouncementDateLoader, self + ).__init__(expr, dataset=dataset, + resources=resources, odo_kwargs=odo_kwargs, + data_query_time=data_query_time, + data_query_tz=data_query_tz, **kwargs) + + +class BlazeDividendsByExDateLoader(BlazeEventsLoader): + """A pipeline loader for the ``DividendsByExDate`` dataset that loads + data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezeone to use for the data query cutoff. + dataset: DataSet + The DataSet object for which this loader loads data. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + {EX_DATE_FIELD_NAME}: ?datetime, + {CASH_AMOUNT_FIELD_NAME}: ?datetime, + }} + + Where each row of the table is a record including the sid to identify the + company, the timestamp where we learned about the ex date, the + ex date, and the associated cash amount. + + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. + """ + + __doc__ = __doc__.format( + TS_FIELD_NAME=TS_FIELD_NAME, + SID_FIELD_NAME=SID_FIELD_NAME, + EX_DATE_FIELD_NAME=EX_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME, + ) + + _expected_fields = frozenset({ + TS_FIELD_NAME, + SID_FIELD_NAME, + EX_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME, + }) + + concrete_loader = DividendsByExDateLoader + + def __init__(self, + expr, + resources=None, + odo_kwargs=None, + data_query_time=None, + data_query_tz=None, + dataset=DividendsByExDate, + **kwargs): + super( + BlazeDividendsByExDateLoader, self + ).__init__(expr, dataset=dataset, + resources=resources, odo_kwargs=odo_kwargs, + data_query_time=data_query_time, + data_query_tz=data_query_tz, **kwargs) + + +class BlazeDividendsByPayDateLoader(BlazeEventsLoader): + """A pipeline loader for the ``DividendsByPayDate`` dataset that loads + data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezeone to use for the data query cutoff. + dataset: DataSet + The DataSet object for which this loader loads data. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + {PAY_DATE_FIELD_NAME}: ?datetime, + {CASH_AMOUNT_FIELD_NAME}: ?datetime, + }} + + Where each row of the table is a record including the sid to identify the + company, the timestamp where we learned about the pay date, the pay date, + and the associated cash amount. + + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. + """ + + __doc__ = __doc__.format( + TS_FIELD_NAME=TS_FIELD_NAME, + SID_FIELD_NAME=SID_FIELD_NAME, + PAY_DATE_FIELD_NAME=PAY_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME, + ) + + _expected_fields = frozenset({ + TS_FIELD_NAME, + SID_FIELD_NAME, + PAY_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME, + }) + + concrete_loader = DividendsByPayDateLoader + + def __init__(self, + expr, + resources=None, + odo_kwargs=None, + data_query_time=None, + data_query_tz=None, + dataset=DividendsByPayDate, + **kwargs): + super( + BlazeDividendsByPayDateLoader, self + ).__init__(expr, dataset=dataset, + resources=resources, odo_kwargs=odo_kwargs, + data_query_time=data_query_time, + data_query_tz=data_query_tz, **kwargs) diff --git a/zipline/pipeline/loaders/dividends.py b/zipline/pipeline/loaders/dividends.py new file mode 100644 index 00000000..4f80bf65 --- /dev/null +++ b/zipline/pipeline/loaders/dividends.py @@ -0,0 +1,116 @@ +from zipline.pipeline.common import ( + EX_DATE_FIELD_NAME, + PAY_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME, + ANNOUNCEMENT_FIELD_NAME +) +from zipline.pipeline.loaders.events import EventsLoader +from zipline.pipeline.data.dividends import ( + DividendsByExDate, + DividendsByAnnouncementDate, + DividendsByPayDate +) +from zipline.utils.memoize import lazyval + + +class DividendsByAnnouncementDateLoader(EventsLoader): + expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME]) + + def __init__(self, all_dates, events_by_sid, + infer_timestamps=False, + dataset=DividendsByAnnouncementDate): + super(DividendsByAnnouncementDateLoader, self).__init__( + all_dates, events_by_sid, infer_timestamps, dataset=dataset, + ) + + @lazyval + def previous_announcement_date_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement_date, + ANNOUNCEMENT_FIELD_NAME + ) + + @lazyval + def previous_amount_loader(self): + return self._previous_event_value_loader( + self.dataset.previous_amount, + ANNOUNCEMENT_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME + ) + + +class DividendsByPayDateLoader(EventsLoader): + expected_cols = frozenset([PAY_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME]) + + def __init__(self, all_dates, events_by_sid, + infer_timestamps=False, + dataset=DividendsByPayDate): + super(DividendsByPayDateLoader, self).__init__( + all_dates, events_by_sid, infer_timestamps, dataset=dataset, + ) + + @lazyval + def next_pay_date_loader(self): + return self._next_event_date_loader(self.dataset.next_pay_date, + PAY_DATE_FIELD_NAME) + + @lazyval + def previous_pay_date_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_pay_date, + PAY_DATE_FIELD_NAME + ) + + @lazyval + def next_amount_loader(self): + return self._next_event_value_loader(self.dataset.next_amount, + PAY_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME) + + @lazyval + def previous_amount_loader(self): + return self._previous_event_value_loader( + self.dataset.previous_amount, + PAY_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME + ) + + +class DividendsByExDateLoader(EventsLoader): + expected_cols = frozenset([EX_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME]) + + def __init__(self, all_dates, events_by_sid, + infer_timestamps=False, + dataset=DividendsByExDate): + super(DividendsByExDateLoader, self).__init__( + all_dates, events_by_sid, infer_timestamps, dataset=dataset, + ) + + @lazyval + def next_ex_date_loader(self): + return self._next_event_date_loader(self.dataset.next_ex_date, + EX_DATE_FIELD_NAME) + + @lazyval + def previous_ex_date_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_ex_date, + EX_DATE_FIELD_NAME + ) + + @lazyval + def next_amount_loader(self): + return self._next_event_value_loader(self.dataset.next_amount, + EX_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME) + + @lazyval + def previous_amount_loader(self): + return self._previous_event_value_loader( + self.dataset.previous_amount, + EX_DATE_FIELD_NAME, + CASH_AMOUNT_FIELD_NAME + ) diff --git a/zipline/pipeline/loaders/events.py b/zipline/pipeline/loaders/events.py index 27cebd50..e88d7966 100644 --- a/zipline/pipeline/loaders/events.py +++ b/zipline/pipeline/loaders/events.py @@ -5,7 +5,7 @@ from toolz import merge from .base import PipelineLoader from .frame import DataFrameLoader -from .utils import previous_event_frame, next_date_frame +from .utils import previous_event_frame, next_event_frame from zipline.pipeline.common import TS_FIELD_NAME from zipline.utils.numpy_utils import NaTD @@ -167,14 +167,34 @@ class EventsLoader(PipelineLoader): def _next_event_date_loader(self, next_date_field, event_date_field_name): return DataFrameLoader( next_date_field, - next_date_frame( - self.all_dates, + next_event_frame( self.events_by_sid, + self.all_dates, + next_date_field.missing_value, + next_date_field.dtype, + event_date_field_name, event_date_field_name ), adjustments=None, ) + def _next_event_value_loader(self, + next_value_field, + event_date_field_name, + value_field_name): + return DataFrameLoader( + next_value_field, + next_event_frame( + self.events_by_sid, + self.all_dates, + next_value_field.missing_value, + next_value_field.dtype, + event_date_field_name, + value_field_name + ), + adjustments=None, + ) + def _previous_event_date_loader(self, prev_date_field, event_date_field_name): diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 8b2cc9a2..494c4992 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -8,9 +8,15 @@ from six.moves import zip from zipline.utils.numpy_utils import NaTns -def next_date_frame(dates, events_by_sid, event_date_field_name): +def next_event_frame(events_by_sid, + dates, + missing_value, + field_dtype, + event_date_field_name, + return_field_name): """ - Make a DataFrame representing the simulated next known date for an event. + Make a DataFrame representing the simulated next known dates or values + for an event. Parameters ---------- @@ -36,28 +42,36 @@ def next_date_frame(dates, events_by_sid, event_date_field_name): -------- previous_date_frame """ - cols = { + date_cols = { equity: np.full_like(dates, NaTns) for equity in events_by_sid } + value_cols = { + equity: np.full(len(dates), missing_value, dtype=field_dtype) for equity + in + events_by_sid + } + raw_dates = dates.values for equity, df in iteritems(events_by_sid): event_dates = df[event_date_field_name] - data = cols[equity] + values = df[return_field_name] + data = date_cols[equity] if not event_dates.index.is_monotonic_increasing: event_dates = event_dates.sort_index() # Iterate over the raw Series values, since we're comparing against # numpy arrays anyway. - iterkv = zip(event_dates.index.values, event_dates.values) - for knowledge_date, event_date in iterkv: + iterkv = zip(event_dates.index.values, event_dates.values, values) + for knowledge_date, event_date, value in iterkv: date_mask = ( (knowledge_date <= raw_dates) & (raw_dates <= event_date) ) value_mask = (event_date <= data) | (data == NaTns) - data[date_mask & value_mask] = event_date - - return pd.DataFrame(index=dates, data=cols) + data_indeces = np.where(date_mask & value_mask) + data[data_indeces] = event_date + value_cols[equity][data_indeces] = value + return pd.DataFrame(index=dates, data=value_cols) def previous_event_frame(events_by_sid,