ENH: add dividends to pipeline

MAINT: remove record date - not needed.

MAINT: restructure dividends dataset.

MAINT: restructure dividends factors.

WIP: update dividends tests.

MAINT: correct the way to get the 'next' event frame.
This commit is contained in:
Maya Tydykov
2016-03-09 10:29:10 -05:00
parent 8c059e6da6
commit 8a28e82d32
9 changed files with 1016 additions and 12 deletions
+542
View File
@@ -0,0 +1,542 @@
"""
Tests for the reference loader for Dividends datasets.
"""
from functools import partial
from unittest import TestCase
import blaze as bz
from blaze.compute.core import swap_resources_into_scope
from contextlib2 import ExitStack
import itertools
import pandas as pd
from six import iteritems
from tests.pipeline.base import EventLoaderCommonMixin
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT,
DAYS_SINCE_PREV_EX_DATE,
DAYS_TO_NEXT_EX_DATE,
NEXT_AMOUNT,
NEXT_EX_DATE,
NEXT_PAY_DATE,
PREVIOUS_ANNOUNCEMENT,
PREVIOUS_EX_DATE,
PREVIOUS_PAY_DATE,
PREVIOUS_AMOUNT,
SID_FIELD_NAME,
TS_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME,
EX_DATE_FIELD_NAME,
PAY_DATE_FIELD_NAME
)
from zipline.pipeline.data.dividends import DividendsByAnnouncementDate, \
DividendsByExDate, DividendsByPayDate
from zipline.pipeline.factors.events import (
BusinessDaysSinceDividendAnnouncement,
BusinessDaysSincePreviousExDate,
BusinessDaysUntilNextExDate
)
from zipline.pipeline.loaders.blaze.dividends import \
BlazeDividendsByAnnouncementDateLoader, BlazeDividendsByPayDateLoader, \
BlazeDividendsByExDateLoader
from zipline.pipeline.loaders.dividends import DividendsByAnnouncementDateLoader, \
DividendsByExDateLoader, DividendsByPayDateLoader
from zipline.utils.test_utils import (
make_simple_equity_info,
tmp_asset_finder,
)
dividends_cases = [
# K1--K2--A1--A2.
pd.DataFrame({
CASH_AMOUNT_FIELD_NAME: [1, 15],
EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-15', '2014-01-20']),
PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-15', '2014-01-20']),
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-09'])
}),
# K1--K2--A2--A1.
pd.DataFrame({
CASH_AMOUNT_FIELD_NAME: [7, 13],
EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-20', '2014-01-15']),
PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-20', '2014-01-15']),
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-09'])
}),
# K1--A1--K2--A2.
pd.DataFrame({
CASH_AMOUNT_FIELD_NAME: [3, 1],
EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-20']),
PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-20']),
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-14'])
}),
# K1 == K2.
pd.DataFrame({
CASH_AMOUNT_FIELD_NAME: [6, 23],
EX_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-15']),
PAY_DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-15']),
TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-04', '2014-01-04'])
}),
pd.DataFrame(
columns=[CASH_AMOUNT_FIELD_NAME,
EX_DATE_FIELD_NAME,
PAY_DATE_FIELD_NAME,
TS_FIELD_NAME,
ANNOUNCEMENT_FIELD_NAME],
dtype='datetime64[ns]'
),
]
prev_date_intervals = [
[
[None, '2014-01-14'], ['2014-01-15', '2014-01-19'],
['2014-01-20', None]
],
[
[None, '2014-01-14'], ['2014-01-15', '2014-01-19'],
['2014-01-20', None]
],
[
[None, '2014-01-09'], ['2014-01-10', '2014-01-19'],
['2014-01-20', None]
],
[
[None, '2014-01-09'], ['2014-01-10', '2014-01-14'],
['2014-01-15', None]
]
]
next_date_intervals = [
[
[None, '2014-01-04'], ['2014-01-05', '2014-01-15'],
['2014-01-16', '2014-01-20'], ['2014-01-21', None]
],
[
[None, '2014-01-04'], ['2014-01-05', '2014-01-09'],
['2014-01-10', '2014-01-15'], ['2014-01-16', '2014-01-20'],
['2014-01-21', None]
],
[
[None, '2014-01-04'], ['2014-01-05', '2014-01-10'],
['2014-01-11', '2014-01-14'], ['2014-01-15', '2014-01-20'],
['2014-01-21', None]
],
[
[None, '2014-01-04'], ['2014-01-05', '2014-01-10'],
['2014-01-11', '2014-01-15'], ['2014-01-16', None]
]
]
next_ex_and_pay_dates = [['NaT', '2014-01-15', '2014-01-20', 'NaT'],
['NaT', '2014-01-20', '2014-01-15', '2014-01-20',
'NaT'],
['NaT', '2014-01-10', 'NaT', '2014-01-20', 'NaT'],
['NaT', '2014-01-10', '2014-01-15', 'NaT']]
prev_ex_and_pay_dates = [['NaT', '2014-01-15', '2014-01-20'],
['NaT', '2014-01-15', '2014-01-20'],
['NaT', '2014-01-10', '2014-01-20'],
['NaT', '2014-01-10', '2014-01-15']]
prev_amounts = [['NaN', 1, 15],
['NaN', 13, 7],
['NaN', 3, 1],
['NaN', 6, 23]]
next_amounts = [['NaN', 1, 15, 'NaN'],
['NaN', 7, 13, 7, 'NaN'],
['NaN', 3, 'NaN', 1, 'NaN'],
['NaN', 6, 23, 'NaN']]
def get_values_for_date_ranges(zip_vals_dates,
num_days_between_dates,
vals_for_date_intervals,
date_intervals):
# Fill in given values for given date ranges.
return zip_vals_dates(
list(
itertools.chain(*[
[val] * num_days_between_dates(*date_intervals[i])
for i, val in enumerate(vals_for_date_intervals)
])
)
)
def get_vals_for_dates(zip_with_floats_dates,
num_days_between_dates,
dates,
date_invervals,
vals):
return pd.DataFrame({
0: get_values_for_date_ranges(zip_with_floats_dates,
num_days_between_dates,
vals[0],
date_invervals[0]),
1: get_values_for_date_ranges(zip_with_floats_dates,
num_days_between_dates,
vals[1],
date_invervals[1]),
2: get_values_for_date_ranges(zip_with_floats_dates,
num_days_between_dates,
vals[2],
date_invervals[2]),
# Assume the latest of 2 cash values is used if we find out about 2
# announcements that happened on the same day for the same sid.
3: get_values_for_date_ranges(zip_with_floats_dates,
num_days_between_dates,
vals[3],
date_invervals[3]),
4: zip_with_floats_dates(['NaN'] * len(dates)),
}, index=dates)
class DividendsByAnnouncementDateTestCase(TestCase, EventLoaderCommonMixin):
"""
Tests for loading the dividends by announcement date data.
"""
pipeline_columns = {
PREVIOUS_ANNOUNCEMENT:
DividendsByAnnouncementDate.previous_announcement_date.latest,
PREVIOUS_AMOUNT: DividendsByAnnouncementDate.previous_amount.latest,
DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT:
BusinessDaysSinceDividendAnnouncement(),
}
@classmethod
def get_sids(cls):
return range(0, 5)
@classmethod
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
equity_info = make_simple_equity_info(
cls.get_sids(),
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
cls.cols = {}
cls.dataset = {sid:
frame.drop([EX_DATE_FIELD_NAME,
PAY_DATE_FIELD_NAME], axis=1)
for sid, frame
in enumerate(dividends_cases)}
cls.finder = stack.enter_context(
tmp_asset_finder(equities=equity_info),
)
cls.loader_type = DividendsByAnnouncementDateLoader
@classmethod
def tearDownClass(cls):
cls._cleanup_stack.close()
def setup(self, dates):
zip_with_floats_dates = partial(self.zip_with_floats, dates)
num_days_between_dates = partial(self.num_days_between, dates)
num_days_between_for_dates = partial(self.num_days_between, dates)
zip_with_dates_for_dates = partial(self.zip_with_dates, dates)
date_intervals = [
[
[None, '2014-01-04'], ['2014-01-05', '2014-01-09'],
['2014-01-10', None]
],
[
[None, '2014-01-04'], ['2014-01-05', '2014-01-09'],
['2014-01-10', None]
],
[
[None, '2014-01-04'], ['2014-01-05', '2014-01-14'],
['2014-01-15', None]
],
[
[None, '2014-01-04'], ['2014-01-05', None]
]
]
announcement_dates = [['NaT', '2014-01-04', '2014-01-09'],
['NaT', '2014-01-04', '2014-01-09'],
['NaT', '2014-01-04', '2014-01-14'],
['NaT', '2014-01-04']]
amounts = [['NaN', 1, 15], ['NaN', 7, 13], ['NaN', 3, 1], ['NaN', 23]]
self.cols[PREVIOUS_ANNOUNCEMENT] = get_vals_for_dates(
zip_with_dates_for_dates, num_days_between_for_dates, dates,
date_intervals, announcement_dates
)
self.cols[PREVIOUS_AMOUNT] = get_vals_for_dates(
zip_with_floats_dates, num_days_between_dates, dates,
date_intervals, amounts
)
self.cols[
DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT
] = self._compute_busday_offsets(self.cols[PREVIOUS_ANNOUNCEMENT])
class BlazeDividendsByAnnouncementDateTestCase(
DividendsByAnnouncementDateTestCase
):
@classmethod
def setUpClass(cls):
super(BlazeDividendsByAnnouncementDateTestCase, cls).setUpClass()
cls.loader_type = BlazeDividendsByAnnouncementDateLoader
def loader_args(self, dates):
_, mapping = super(
BlazeDividendsByAnnouncementDateTestCase,
self,
).loader_args(dates)
return (bz.Data(pd.concat(
pd.DataFrame({
ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME],
TS_FIELD_NAME: df[TS_FIELD_NAME],
SID_FIELD_NAME: sid,
CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME]
})
for sid, df in iteritems(mapping)
).reset_index(drop=True)),)
class BlazeDividendsByAnnouncementDateNotInteractiveTestCase(
BlazeDividendsByAnnouncementDateTestCase):
"""Test case for passing a non-interactive symbol and a dict of resources.
"""
@classmethod
def setUpClass(cls):
super(BlazeDividendsByAnnouncementDateNotInteractiveTestCase,
cls).setUpClass()
cls.loader_type = BlazeDividendsByAnnouncementDateLoader
def loader_args(self, dates):
(bound_expr,) = super(
BlazeDividendsByAnnouncementDateNotInteractiveTestCase,
self,
).loader_args(dates)
return swap_resources_into_scope(bound_expr, {})
class DividendsByExDateTestCase(TestCase, EventLoaderCommonMixin):
"""
Tests for loading the dividends by ex date data.
"""
pipeline_columns = {
NEXT_EX_DATE: DividendsByExDate.previous_ex_date.latest,
PREVIOUS_EX_DATE: DividendsByExDate.next_ex_date.latest,
NEXT_AMOUNT: DividendsByExDate.next_amount.latest,
PREVIOUS_AMOUNT: DividendsByExDate.previous_amount.latest,
DAYS_TO_NEXT_EX_DATE: BusinessDaysUntilNextExDate(),
DAYS_SINCE_PREV_EX_DATE: BusinessDaysSincePreviousExDate()
}
@classmethod
def get_sids(cls):
return range(0, 5)
@classmethod
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
equity_info = make_simple_equity_info(
cls.get_sids(),
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
cls.cols = {}
cls.dataset = {sid:
frame.drop([ANNOUNCEMENT_FIELD_NAME,
PAY_DATE_FIELD_NAME], axis=1)
for sid, frame
in enumerate(dividends_cases)}
cls.finder = stack.enter_context(
tmp_asset_finder(equities=equity_info),
)
cls.loader_type = DividendsByExDateLoader
@classmethod
def tearDownClass(cls):
cls._cleanup_stack.close()
def setup(self, dates):
zip_with_floats_dates = partial(self.zip_with_floats, dates)
num_days_between_dates = partial(self.num_days_between, dates)
num_days_between_for_dates = partial(self.num_days_between, dates)
zip_with_dates_for_dates = partial(self.zip_with_dates, dates)
self.cols[NEXT_EX_DATE] = get_vals_for_dates(
zip_with_dates_for_dates, num_days_between_for_dates, dates,
next_date_intervals, next_ex_and_pay_dates
)
self.cols[PREVIOUS_EX_DATE] = get_vals_for_dates(
zip_with_dates_for_dates, num_days_between_for_dates, dates,
prev_date_intervals, prev_ex_and_pay_dates
)
self.cols[NEXT_AMOUNT] = get_vals_for_dates(
zip_with_floats_dates, num_days_between_dates,
dates, next_date_intervals, next_amounts
)
self.cols[PREVIOUS_AMOUNT] = get_vals_for_dates(
zip_with_floats_dates, num_days_between_dates,
dates, prev_date_intervals, prev_amounts
)
self.cols[DAYS_TO_NEXT_EX_DATE] = self._compute_busday_offsets(
self.cols[NEXT_EX_DATE]
)
self.cols[DAYS_SINCE_PREV_EX_DATE] = self._compute_busday_offsets(
self.cols[PREVIOUS_EX_DATE]
)
class BlazeDividendsByExDateLoaderTestCase(DividendsByExDateTestCase):
@classmethod
def setUpClass(cls):
super(BlazeDividendsByExDateLoaderTestCase, cls).setUpClass()
cls.loader_type = BlazeDividendsByExDateLoader
def loader_args(self, dates):
_, mapping = super(
BlazeDividendsByExDateLoaderTestCase,
self,
).loader_args(dates)
return (bz.Data(pd.concat(
pd.DataFrame({
EX_DATE_FIELD_NAME: df[EX_DATE_FIELD_NAME],
TS_FIELD_NAME: df[TS_FIELD_NAME],
SID_FIELD_NAME: sid,
CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME]
})
for sid, df in iteritems(mapping)
).reset_index(drop=True)),)
class BlazeDividendsByExDateLoaderNotInteractiveTestCase(
BlazeDividendsByExDateLoaderTestCase):
"""Test case for passing a non-interactive symbol and a dict of resources.
"""
@classmethod
def setUpClass(cls):
super(BlazeDividendsByExDateLoaderNotInteractiveTestCase,
cls).setUpClass()
cls.loader_type = DividendsByExDateLoader
def loader_args(self, dates):
(bound_expr,) = super(
BlazeDividendsByExDateLoaderNotInteractiveTestCase,
self,
).loader_args(dates)
return swap_resources_into_scope(bound_expr, {})
class DividendsByPayDateTestCase(TestCase, EventLoaderCommonMixin):
"""
Tests for loading the dividends by pay date data.
"""
pipeline_columns = {
NEXT_PAY_DATE: DividendsByPayDate.next_pay_date.latest,
PREVIOUS_PAY_DATE: DividendsByPayDate.previous_pay_date.latest,
NEXT_AMOUNT: DividendsByPayDate.next_amount.latest,
PREVIOUS_AMOUNT: DividendsByPayDate.previous_amount.latest,
}
@classmethod
def get_sids(cls):
return range(0, 5)
@classmethod
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
equity_info = make_simple_equity_info(
cls.get_sids(),
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
cls.cols = {}
cls.dataset = {sid:
frame.drop([ANNOUNCEMENT_FIELD_NAME,
EX_DATE_FIELD_NAME], axis=1)
for sid, frame
in enumerate(dividends_cases)}
cls.finder = stack.enter_context(
tmp_asset_finder(equities=equity_info),
)
cls.loader_type = DividendsByPayDateLoader
@classmethod
def tearDownClass(cls):
cls._cleanup_stack.close()
def setup(self, dates):
zip_with_floats_dates = partial(self.zip_with_floats, dates)
num_days_between_dates = partial(self.num_days_between, dates)
num_days_between_for_dates = partial(self.num_days_between, dates)
zip_with_dates_for_dates = partial(self.zip_with_dates, dates)
self.cols[NEXT_PAY_DATE] = get_vals_for_dates(
zip_with_dates_for_dates, num_days_between_for_dates, dates,
next_date_intervals, next_ex_and_pay_dates
)
self.cols[PREVIOUS_PAY_DATE] = get_vals_for_dates(
zip_with_dates_for_dates, num_days_between_for_dates, dates,
prev_date_intervals, prev_ex_and_pay_dates
)
self.cols[NEXT_AMOUNT] = get_vals_for_dates(
zip_with_floats_dates, num_days_between_dates,
dates, next_date_intervals, next_amounts
)
self.cols[PREVIOUS_AMOUNT] = get_vals_for_dates(
zip_with_floats_dates, num_days_between_dates,
dates, prev_date_intervals, prev_amounts
)
class BlazeDividendsByPayDateLoaderTestCase(DividendsByPayDateTestCase):
@classmethod
def setUpClass(cls):
super(BlazeDividendsByPayDateLoaderTestCase, cls).setUpClass()
cls.loader_type = BlazeDividendsByPayDateLoader
def loader_args(self, dates):
_, mapping = super(
BlazeDividendsByPayDateLoaderTestCase,
self,
).loader_args(dates)
return (bz.Data(pd.concat(
pd.DataFrame({
PAY_DATE_FIELD_NAME: df[PAY_DATE_FIELD_NAME],
TS_FIELD_NAME: df[TS_FIELD_NAME],
SID_FIELD_NAME: sid,
CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME]
})
for sid, df in iteritems(mapping)
).reset_index(drop=True)),)
class BlazeDividendsByPayDateLoaderNotInteractiveTestCase(
BlazeDividendsByPayDateLoaderTestCase):
"""Test case for passing a non-interactive symbol and a dict of resources.
"""
@classmethod
def setUpClass(cls):
super(BlazeDividendsByPayDateLoaderNotInteractiveTestCase,
cls).setUpClass()
cls.loader_type = BlazeDividendsByPayDateLoader
def loader_args(self, dates):
(bound_expr,) = super(
BlazeDividendsByPayDateLoaderNotInteractiveTestCase,
self,
).loader_args(dates)
return swap_resources_into_scope(bound_expr, {})
+3
View File
@@ -29,6 +29,9 @@ from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
from zipline.pipeline.loaders.blaze import (
BlazeEarningsCalendarLoader,
)
from zipline.utils.test_utils import (
tmp_asset_finder,
)
from zipline.testing import tmp_asset_finder
+12
View File
@@ -4,14 +4,26 @@ Common constants for Pipeline.
AD_FIELD_NAME = 'asof_date'
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
CASH_FIELD_NAME = 'cash'
CASH_AMOUNT_FIELD_NAME = 'cash_amount'
BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_date'
DAYS_SINCE_PREV = 'days_since_prev'
DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT = 'days_since_prev_dividend_announcement'
DAYS_SINCE_PREV_EX_DATE = 'days_since_prev_ex_date'
DAYS_TO_NEXT = 'days_to_next'
DAYS_TO_NEXT_EX_DATE = 'days_to_next_ex_date'
EX_DATE_FIELD_NAME = 'ex_date'
NEXT_AMOUNT = 'next_amount'
NEXT_ANNOUNCEMENT = 'next_announcement'
NEXT_EX_DATE = 'next_ex_date'
NEXT_PAY_DATE = 'next_pay_date'
PAY_DATE_FIELD_NAME = 'pay_date'
PREVIOUS_AMOUNT = 'previous_amount'
PREVIOUS_ANNOUNCEMENT = 'previous_announcement'
PREVIOUS_BUYBACK_ANNOUNCEMENT = 'previous_buyback_announcement'
PREVIOUS_BUYBACK_CASH = 'previous_buyback_cash'
PREVIOUS_BUYBACK_SHARE_COUNT = 'previous_buyback_share_count'
PREVIOUS_EX_DATE = 'previous_ex_date'
PREVIOUS_PAY_DATE = 'previous_pay_date'
SHARE_COUNT_FIELD_NAME = 'share_count'
SID_FIELD_NAME = 'sid'
TS_FIELD_NAME = 'timestamp'
+25
View File
@@ -0,0 +1,25 @@
"""
Dataset representing dates of upcoming dividends.
"""
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
from .dataset import Column, DataSet
class DividendsByExDate(DataSet):
next_ex_date = Column(datetime64ns_dtype)
previous_ex_date = Column(datetime64ns_dtype)
next_amount = Column(float64_dtype)
previous_amount = Column(float64_dtype)
class DividendsByPayDate(DataSet):
next_pay_date = Column(datetime64ns_dtype)
previous_pay_date = Column(datetime64ns_dtype)
next_amount = Column(float64_dtype)
previous_amount = Column(float64_dtype)
class DividendsByAnnouncementDate(DataSet):
previous_announcement_date = Column(datetime64ns_dtype)
previous_amount = Column(float64_dtype)
+49
View File
@@ -7,6 +7,10 @@ from zipline.pipeline.data.buyback_auth import (
CashBuybackAuthorizations,
ShareBuybackAuthorizations
)
from zipline.pipeline.data.dividends import (
DividendsByAnnouncementDate,
DividendsByExDate
)
from zipline.pipeline.data.earnings import EarningsCalendar
from zipline.utils.numpy_utils import (
NaTD,
@@ -156,3 +160,48 @@ class BusinessDaysSinceShareBuybackAuth(
zipline.pipeline.factors.BusinessDaysSinceShareBuybackAuth
"""
inputs = [ShareBuybackAuthorizations.announcement_date]
class BusinessDaysSinceDividendAnnouncement(
BusinessDaysSincePreviousEvents
):
"""
Factor returning the number of **business days** (not trading days!) since
the most recent dividend announcement for each asset.
See Also
--------
zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement
"""
inputs = [DividendsByAnnouncementDate.previous_announcement_date]
class BusinessDaysUntilNextExDate(
BusinessDaysUntilNextEvents
):
"""
Factor returning the number of **business days** (not trading days!) until
the next ex date for each asset.
See Also
--------
zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement
"""
inputs = [DividendsByExDate.next_ex_date]
class BusinessDaysSincePreviousExDate(
BusinessDaysSincePreviousEvents
):
"""
Factor returning the number of **business days** (not trading days!) since
the most recent ex date for each asset.
See Also
--------
zipline.pipeline.factors.BusinessDaysSinceDividendAnnouncement
"""
inputs = [DividendsByExDate.previous_ex_date]
+223
View File
@@ -0,0 +1,223 @@
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME,
EX_DATE_FIELD_NAME,
PAY_DATE_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.data.dividends import DividendsByExDate, \
DividendsByAnnouncementDate, DividendsByPayDate
from zipline.pipeline.loaders.dividends import DividendsByAnnouncementDateLoader, \
DividendsByPayDateLoader, DividendsByExDateLoader
from .events import BlazeEventsLoader
class BlazeDividendsByAnnouncementDateLoader(BlazeEventsLoader):
"""A pipeline loader for the ``DividendsByAnnouncementDate`` dataset that
loads data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
dataset: DataSet
The DataSet object for which this loader loads data.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
{CASH_AMOUNT_FIELD_NAME}: ?datetime,
{ANNOUNCEMENT_FIELD_NAME}: ?datetime,
}}
Where each row of the table is a record including the sid to identify the
company, the timestamp where we learned about the announcement, the
date when the dividends will be announced, and the cash amount.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME,
ANNOUNCEMENT_FIELD_NAME=ANNOUNCEMENT_FIELD_NAME
)
_expected_fields = frozenset({
TS_FIELD_NAME,
SID_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME,
ANNOUNCEMENT_FIELD_NAME
})
concrete_loader = DividendsByAnnouncementDateLoader
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=DividendsByAnnouncementDate,
**kwargs):
super(
BlazeDividendsByAnnouncementDateLoader, self
).__init__(expr, dataset=dataset,
resources=resources, odo_kwargs=odo_kwargs,
data_query_time=data_query_time,
data_query_tz=data_query_tz, **kwargs)
class BlazeDividendsByExDateLoader(BlazeEventsLoader):
"""A pipeline loader for the ``DividendsByExDate`` dataset that loads
data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
dataset: DataSet
The DataSet object for which this loader loads data.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
{EX_DATE_FIELD_NAME}: ?datetime,
{CASH_AMOUNT_FIELD_NAME}: ?datetime,
}}
Where each row of the table is a record including the sid to identify the
company, the timestamp where we learned about the ex date, the
ex date, and the associated cash amount.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
EX_DATE_FIELD_NAME=EX_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME,
)
_expected_fields = frozenset({
TS_FIELD_NAME,
SID_FIELD_NAME,
EX_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME,
})
concrete_loader = DividendsByExDateLoader
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=DividendsByExDate,
**kwargs):
super(
BlazeDividendsByExDateLoader, self
).__init__(expr, dataset=dataset,
resources=resources, odo_kwargs=odo_kwargs,
data_query_time=data_query_time,
data_query_tz=data_query_tz, **kwargs)
class BlazeDividendsByPayDateLoader(BlazeEventsLoader):
"""A pipeline loader for the ``DividendsByPayDate`` dataset that loads
data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
dataset: DataSet
The DataSet object for which this loader loads data.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
{PAY_DATE_FIELD_NAME}: ?datetime,
{CASH_AMOUNT_FIELD_NAME}: ?datetime,
}}
Where each row of the table is a record including the sid to identify the
company, the timestamp where we learned about the pay date, the pay date,
and the associated cash amount.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
PAY_DATE_FIELD_NAME=PAY_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME,
)
_expected_fields = frozenset({
TS_FIELD_NAME,
SID_FIELD_NAME,
PAY_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME,
})
concrete_loader = DividendsByPayDateLoader
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=DividendsByPayDate,
**kwargs):
super(
BlazeDividendsByPayDateLoader, self
).__init__(expr, dataset=dataset,
resources=resources, odo_kwargs=odo_kwargs,
data_query_time=data_query_time,
data_query_tz=data_query_tz, **kwargs)
+116
View File
@@ -0,0 +1,116 @@
from zipline.pipeline.common import (
EX_DATE_FIELD_NAME,
PAY_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME,
ANNOUNCEMENT_FIELD_NAME
)
from zipline.pipeline.loaders.events import EventsLoader
from zipline.pipeline.data.dividends import (
DividendsByExDate,
DividendsByAnnouncementDate,
DividendsByPayDate
)
from zipline.utils.memoize import lazyval
class DividendsByAnnouncementDateLoader(EventsLoader):
expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME])
def __init__(self, all_dates, events_by_sid,
infer_timestamps=False,
dataset=DividendsByAnnouncementDate):
super(DividendsByAnnouncementDateLoader, self).__init__(
all_dates, events_by_sid, infer_timestamps, dataset=dataset,
)
@lazyval
def previous_announcement_date_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_announcement_date,
ANNOUNCEMENT_FIELD_NAME
)
@lazyval
def previous_amount_loader(self):
return self._previous_event_value_loader(
self.dataset.previous_amount,
ANNOUNCEMENT_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME
)
class DividendsByPayDateLoader(EventsLoader):
expected_cols = frozenset([PAY_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME])
def __init__(self, all_dates, events_by_sid,
infer_timestamps=False,
dataset=DividendsByPayDate):
super(DividendsByPayDateLoader, self).__init__(
all_dates, events_by_sid, infer_timestamps, dataset=dataset,
)
@lazyval
def next_pay_date_loader(self):
return self._next_event_date_loader(self.dataset.next_pay_date,
PAY_DATE_FIELD_NAME)
@lazyval
def previous_pay_date_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_pay_date,
PAY_DATE_FIELD_NAME
)
@lazyval
def next_amount_loader(self):
return self._next_event_value_loader(self.dataset.next_amount,
PAY_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME)
@lazyval
def previous_amount_loader(self):
return self._previous_event_value_loader(
self.dataset.previous_amount,
PAY_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME
)
class DividendsByExDateLoader(EventsLoader):
expected_cols = frozenset([EX_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME])
def __init__(self, all_dates, events_by_sid,
infer_timestamps=False,
dataset=DividendsByExDate):
super(DividendsByExDateLoader, self).__init__(
all_dates, events_by_sid, infer_timestamps, dataset=dataset,
)
@lazyval
def next_ex_date_loader(self):
return self._next_event_date_loader(self.dataset.next_ex_date,
EX_DATE_FIELD_NAME)
@lazyval
def previous_ex_date_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_ex_date,
EX_DATE_FIELD_NAME
)
@lazyval
def next_amount_loader(self):
return self._next_event_value_loader(self.dataset.next_amount,
EX_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME)
@lazyval
def previous_amount_loader(self):
return self._previous_event_value_loader(
self.dataset.previous_amount,
EX_DATE_FIELD_NAME,
CASH_AMOUNT_FIELD_NAME
)
+23 -3
View File
@@ -5,7 +5,7 @@ from toolz import merge
from .base import PipelineLoader
from .frame import DataFrameLoader
from .utils import previous_event_frame, next_date_frame
from .utils import previous_event_frame, next_event_frame
from zipline.pipeline.common import TS_FIELD_NAME
from zipline.utils.numpy_utils import NaTD
@@ -167,14 +167,34 @@ class EventsLoader(PipelineLoader):
def _next_event_date_loader(self, next_date_field, event_date_field_name):
return DataFrameLoader(
next_date_field,
next_date_frame(
self.all_dates,
next_event_frame(
self.events_by_sid,
self.all_dates,
next_date_field.missing_value,
next_date_field.dtype,
event_date_field_name,
event_date_field_name
),
adjustments=None,
)
def _next_event_value_loader(self,
next_value_field,
event_date_field_name,
value_field_name):
return DataFrameLoader(
next_value_field,
next_event_frame(
self.events_by_sid,
self.all_dates,
next_value_field.missing_value,
next_value_field.dtype,
event_date_field_name,
value_field_name
),
adjustments=None,
)
def _previous_event_date_loader(self,
prev_date_field,
event_date_field_name):
+23 -9
View File
@@ -8,9 +8,15 @@ from six.moves import zip
from zipline.utils.numpy_utils import NaTns
def next_date_frame(dates, events_by_sid, event_date_field_name):
def next_event_frame(events_by_sid,
dates,
missing_value,
field_dtype,
event_date_field_name,
return_field_name):
"""
Make a DataFrame representing the simulated next known date for an event.
Make a DataFrame representing the simulated next known dates or values
for an event.
Parameters
----------
@@ -36,28 +42,36 @@ def next_date_frame(dates, events_by_sid, event_date_field_name):
--------
previous_date_frame
"""
cols = {
date_cols = {
equity: np.full_like(dates, NaTns) for equity in events_by_sid
}
value_cols = {
equity: np.full(len(dates), missing_value, dtype=field_dtype) for equity
in
events_by_sid
}
raw_dates = dates.values
for equity, df in iteritems(events_by_sid):
event_dates = df[event_date_field_name]
data = cols[equity]
values = df[return_field_name]
data = date_cols[equity]
if not event_dates.index.is_monotonic_increasing:
event_dates = event_dates.sort_index()
# Iterate over the raw Series values, since we're comparing against
# numpy arrays anyway.
iterkv = zip(event_dates.index.values, event_dates.values)
for knowledge_date, event_date in iterkv:
iterkv = zip(event_dates.index.values, event_dates.values, values)
for knowledge_date, event_date, value in iterkv:
date_mask = (
(knowledge_date <= raw_dates) &
(raw_dates <= event_date)
)
value_mask = (event_date <= data) | (data == NaTns)
data[date_mask & value_mask] = event_date
return pd.DataFrame(index=dates, data=cols)
data_indeces = np.where(date_mask & value_mask)
data[data_indeces] = event_date
value_cols[equity][data_indeces] = value
return pd.DataFrame(index=dates, data=value_cols)
def previous_event_frame(events_by_sid,