Merge pull request #990 from quantopian/buyback_auth_in_pipeline

Buyback auth in pipeline
This commit is contained in:
Maya Tydykov
2016-02-26 10:13:19 -05:00
16 changed files with 1641 additions and 542 deletions
+286
View File
@@ -0,0 +1,286 @@
"""
Tests for the reference loader for Buyback Authorizations.
"""
from functools import partial
from unittest import TestCase
import blaze as bz
from blaze.compute.core import swap_resources_into_scope
from contextlib2 import ExitStack
import pandas as pd
from six import iteritems
from tests.pipeline.test_events import EventLoaderCommonMixin, DATE_FIELD_NAME
from zipline.pipeline.common import(
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME,
DAYS_SINCE_PREV,
PREVIOUS_BUYBACK_ANNOUNCEMENT,
PREVIOUS_BUYBACK_CASH,
PREVIOUS_BUYBACK_SHARE_COUNT,
SHARE_COUNT_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME)
from zipline.pipeline.data import (CashBuybackAuthorizations,
ShareBuybackAuthorizations)
from zipline.pipeline.factors.events import (
BusinessDaysSincePreviousCashBuybackAuth,
BusinessDaysSincePreviousShareBuybackAuth
)
from zipline.pipeline.loaders.buyback_auth import \
CashBuybackAuthorizationsLoader, ShareBuybackAuthorizationsLoader
from zipline.pipeline.loaders.blaze import (
BlazeCashBuybackAuthorizationsLoader,
BlazeShareBuybackAuthorizationsLoader,
)
from zipline.utils.test_utils import (
tmp_asset_finder,
)
buyback_authorizations = [
# K1--K2--A1--A2.
pd.DataFrame({
SHARE_COUNT_FIELD_NAME: [1, 15],
CASH_FIELD_NAME: [10, 20]
}),
# K1--K2--A2--A1.
pd.DataFrame({
SHARE_COUNT_FIELD_NAME: [7, 13],
CASH_FIELD_NAME: [10, 22]
}),
# K1--A1--K2--A2.
pd.DataFrame({
SHARE_COUNT_FIELD_NAME: [3, 1],
CASH_FIELD_NAME: [4, 7]
}),
# K1 == K2.
pd.DataFrame({
SHARE_COUNT_FIELD_NAME: [6, 23],
CASH_FIELD_NAME: [1, 2]
}),
pd.DataFrame(
columns=[SHARE_COUNT_FIELD_NAME,
CASH_FIELD_NAME],
dtype='datetime64[ns]'
),
]
def create_buyback_auth_tst_frame(cases, field_to_drop):
buyback_auth_df = {
sid:
pd.concat([df, buyback_authorizations[sid]], axis=1).drop(
field_to_drop, 1)
for sid, df
in enumerate(case.rename(columns={DATE_FIELD_NAME:
BUYBACK_ANNOUNCEMENT_FIELD_NAME}
)
for case in cases
)
}
return buyback_auth_df
class CashBuybackAuthLoaderTestCase(TestCase, EventLoaderCommonMixin):
"""
Test for cash buyback authorizations dataset.
"""
pipeline_columns = {
PREVIOUS_BUYBACK_CASH:
CashBuybackAuthorizations.previous_value.latest,
PREVIOUS_BUYBACK_ANNOUNCEMENT:
CashBuybackAuthorizations.previous_announcement_date.latest,
DAYS_SINCE_PREV:
BusinessDaysSincePreviousCashBuybackAuth(),
}
@classmethod
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
cls.finder = stack.enter_context(
tmp_asset_finder(equities=cls.equity_info),
)
cls.cols = {}
cls.dataset = create_buyback_auth_tst_frame(cls.event_dates_cases,
SHARE_COUNT_FIELD_NAME)
cls.loader_type = CashBuybackAuthorizationsLoader
@classmethod
def tearDownClass(cls):
cls._cleanup_stack.close()
def setup(self, dates):
zip_with_floats_dates = partial(self.zip_with_floats, dates)
num_days_between_dates = partial(self.num_days_between, dates)
_expected_previous_cash = pd.DataFrame({
0: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-14') +
[10] * num_days_between_dates('2014-01-15', '2014-01-19') +
[20] * num_days_between_dates('2014-01-20', None)
),
1: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-14') +
[22] * num_days_between_dates('2014-01-15', '2014-01-19') +
[10] * num_days_between_dates('2014-01-20', None)
),
2: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-09') +
[4] * num_days_between_dates('2014-01-10', '2014-01-19') +
[7] * num_days_between_dates('2014-01-20', None)
),
3: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-09') +
[1] * num_days_between_dates('2014-01-10', '2014-01-14') +
[2] * num_days_between_dates('2014-01-15', None)
),
4: zip_with_floats_dates(['NaN'] * len(dates)),
}, index=dates)
self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] = \
self.get_expected_previous_event_dates(dates)
self.cols[PREVIOUS_BUYBACK_CASH] = _expected_previous_cash
self.cols[DAYS_SINCE_PREV] = self._compute_busday_offsets(
self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT]
)
class ShareBuybackAuthLoaderTestCase(TestCase, EventLoaderCommonMixin):
"""
Test for share buyback authorizations dataset.
"""
pipeline_columns = {
PREVIOUS_BUYBACK_SHARE_COUNT:
ShareBuybackAuthorizations.previous_share_count.latest,
PREVIOUS_BUYBACK_ANNOUNCEMENT:
ShareBuybackAuthorizations.previous_announcement_date.latest,
DAYS_SINCE_PREV:
BusinessDaysSincePreviousShareBuybackAuth(),
}
@classmethod
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
cls.finder = stack.enter_context(
tmp_asset_finder(equities=cls.equity_info),
)
cls.cols = {}
cls.dataset = create_buyback_auth_tst_frame(cls.event_dates_cases,
CASH_FIELD_NAME)
cls.loader_type = ShareBuybackAuthorizationsLoader
@classmethod
def tearDownClass(cls):
cls._cleanup_stack.close()
def setup(self, dates):
zip_with_floats_dates = partial(self.zip_with_floats, dates)
num_days_between_dates = partial(self.num_days_between, dates)
_expected_previous_buyback_share_count = pd.DataFrame({
0: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-14') +
[1] * num_days_between_dates('2014-01-15', '2014-01-19') +
[15] * num_days_between_dates('2014-01-20', None)
),
1: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-14') +
[13] * num_days_between_dates('2014-01-15', '2014-01-19') +
[7] * num_days_between_dates('2014-01-20', None)
),
2: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-09') +
[3] * num_days_between_dates('2014-01-10', '2014-01-19') +
[1] * num_days_between_dates('2014-01-20', None)
),
3: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-09') +
[6] * num_days_between_dates('2014-01-10', '2014-01-14') +
[23] * num_days_between_dates('2014-01-15', None)
),
4: zip_with_floats_dates(['NaN'] * len(dates)),
}, index=dates)
self.cols[
PREVIOUS_BUYBACK_SHARE_COUNT
] = _expected_previous_buyback_share_count
self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] = \
self.get_expected_previous_event_dates(dates)
self.cols[DAYS_SINCE_PREV] = self._compute_busday_offsets(
self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT]
)
class BlazeCashBuybackAuthLoaderTestCase(CashBuybackAuthLoaderTestCase):
""" Test case for loading via blaze.
"""
@classmethod
def setUpClass(cls):
super(BlazeCashBuybackAuthLoaderTestCase, cls).setUpClass()
cls.loader_type = BlazeCashBuybackAuthorizationsLoader
def loader_args(self, dates):
_, mapping = super(
BlazeCashBuybackAuthLoaderTestCase,
self,
).loader_args(dates)
return (bz.Data(pd.concat(
pd.DataFrame({
BUYBACK_ANNOUNCEMENT_FIELD_NAME:
frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME],
CASH_FIELD_NAME:
frame[CASH_FIELD_NAME],
TS_FIELD_NAME:
frame[TS_FIELD_NAME],
SID_FIELD_NAME: sid,
})
for sid, frame in iteritems(mapping)
).reset_index(drop=True)),)
class BlazeShareBuybackAuthLoaderTestCase(ShareBuybackAuthLoaderTestCase):
""" Test case for loading via blaze.
"""
@classmethod
def setUpClass(cls):
super(BlazeShareBuybackAuthLoaderTestCase, cls).setUpClass()
cls.loader_type = BlazeShareBuybackAuthorizationsLoader
def loader_args(self, dates):
_, mapping = super(
BlazeShareBuybackAuthLoaderTestCase,
self,
).loader_args(dates)
return (bz.Data(pd.concat(
pd.DataFrame({
BUYBACK_ANNOUNCEMENT_FIELD_NAME:
frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME],
SHARE_COUNT_FIELD_NAME:
frame[SHARE_COUNT_FIELD_NAME],
TS_FIELD_NAME:
frame[TS_FIELD_NAME],
SID_FIELD_NAME: sid,
})
for sid, frame in iteritems(mapping)
).reset_index(drop=True)),)
class BlazeShareBuybackAuthLoaderNotInteractiveTestCase(
BlazeShareBuybackAuthLoaderTestCase):
"""Test case for passing a non-interactive symbol and a dict of resources.
"""
def loader_args(self, dates):
(bound_expr,) = super(
BlazeShareBuybackAuthLoaderNotInteractiveTestCase,
self,
).loader_args(dates)
return swap_resources_into_scope(bound_expr, {})
class BlazeCashBuybackAuthLoaderNotInteractiveTestCase(
BlazeCashBuybackAuthLoaderTestCase):
"""Test case for passing a non-interactive symbol and a dict of resources.
"""
def loader_args(self, dates):
(bound_expr,) = super(
BlazeCashBuybackAuthLoaderNotInteractiveTestCase,
self,
).loader_args(dates)
return swap_resources_into_scope(bound_expr, {})
+45 -310
View File
@@ -6,172 +6,74 @@ from unittest import TestCase
import blaze as bz
from blaze.compute.core import swap_resources_into_scope
from contextlib2 import ExitStack
from nose_parameterized import parameterized
import pandas as pd
import numpy as np
from pandas.util.testing import assert_series_equal
from six import iteritems
from tests.pipeline.test_events import EventLoaderCommonMixin, DATE_FIELD_NAME
from zipline.pipeline import Pipeline
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
DAYS_SINCE_PREV,
DAYS_TO_NEXT,
NEXT_ANNOUNCEMENT,
PREVIOUS_ANNOUNCEMENT,
SID_FIELD_NAME,
TS_FIELD_NAME
)
from zipline.pipeline.data import EarningsCalendar
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.factors.events import (
BusinessDaysUntilNextEarnings,
BusinessDaysSincePreviousEarnings,
BusinessDaysUntilNextEarnings,
)
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
from zipline.pipeline.loaders.blaze import (
ANNOUNCEMENT_FIELD_NAME,
BlazeEarningsCalendarLoader,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.utils.numpy_utils import make_datetime64D, NaTD
from zipline.utils.test_utils import (
make_simple_equity_info,
tmp_asset_finder,
gen_calendars,
to_series,
num_days_in_range,
)
class EarningsCalendarLoaderTestCase(TestCase):
class EarningsCalendarLoaderTestCase(TestCase, EventLoaderCommonMixin):
"""
Tests for loading the earnings announcement data.
"""
loader_type = EarningsCalendarLoader
pipeline_columns = {
NEXT_ANNOUNCEMENT: EarningsCalendar.next_announcement.latest,
PREVIOUS_ANNOUNCEMENT: EarningsCalendar.previous_announcement.latest,
DAYS_SINCE_PREV: BusinessDaysSincePreviousEarnings(),
DAYS_TO_NEXT: BusinessDaysUntilNextEarnings(),
}
@classmethod
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
cls.sids = A, B, C, D, E = range(5)
equity_info = make_simple_equity_info(
cls.sids,
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
cls.cols = {}
cls.dataset = {sid: df for sid, df in enumerate(
case.rename(
columns={DATE_FIELD_NAME: ANNOUNCEMENT_FIELD_NAME}
) for case in cls.event_dates_cases)}
cls.finder = stack.enter_context(
tmp_asset_finder(equities=equity_info),
)
cls.earnings_dates = {
# K1--K2--E1--E2.
A: to_series(
knowledge_dates=['2014-01-05', '2014-01-10'],
earning_dates=['2014-01-15', '2014-01-20'],
),
# K1--K2--E2--E1.
B: to_series(
knowledge_dates=['2014-01-05', '2014-01-10'],
earning_dates=['2014-01-20', '2014-01-15']
),
# K1--E1--K2--E2.
C: to_series(
knowledge_dates=['2014-01-05', '2014-01-15'],
earning_dates=['2014-01-10', '2014-01-20']
),
# K1 == K2.
D: to_series(
knowledge_dates=['2014-01-05'] * 2,
earning_dates=['2014-01-10', '2014-01-15'],
),
E: pd.Series(
data=[],
index=pd.DatetimeIndex([]),
dtype='datetime64[ns]',
),
}
cls.loader_type = EarningsCalendarLoader
@classmethod
def tearDownClass(cls):
cls._cleanup_stack.close()
def loader_args(self, dates):
"""Construct the base earnings announcements object to pass to the
loader.
Parameters
----------
dates : pd.DatetimeIndex
The dates we can serve.
Returns
-------
args : tuple[any]
The arguments to forward to the loader positionally.
"""
return dates, self.earnings_dates
def setup(self, dates):
"""
Make a PipelineEngine and expectation functions for the given dates
calendar.
_expected_next_announce = self.get_expected_next_event_dates(dates)
This exists to make it easy to test our various cases with critical
dates missing from the calendar.
"""
A, B, C, D, E = self.sids
def num_days_between(start_date, end_date):
return num_days_in_range(dates, start_date, end_date)
def zip_with_dates(dts):
return pd.Series(pd.to_datetime(dts), index=dates)
_expected_next_announce = pd.DataFrame({
A: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
['2014-01-15'] * num_days_between('2014-01-05', '2014-01-15') +
['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') +
['NaT'] * num_days_between('2014-01-21', None)
),
B: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
['2014-01-20'] * num_days_between('2014-01-05', '2014-01-09') +
['2014-01-15'] * num_days_between('2014-01-10', '2014-01-15') +
['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') +
['NaT'] * num_days_between('2014-01-21', None)
),
C: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') +
['NaT'] * num_days_between('2014-01-11', '2014-01-14') +
['2014-01-20'] * num_days_between('2014-01-15', '2014-01-20') +
['NaT'] * num_days_between('2014-01-21', None)
),
D: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') +
['2014-01-15'] * num_days_between('2014-01-11', '2014-01-15') +
['NaT'] * num_days_between('2014-01-16', None)
),
E: zip_with_dates(['NaT'] * len(dates)),
}, index=dates)
_expected_previous_announce = pd.DataFrame({
A: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-14') +
['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') +
['2014-01-20'] * num_days_between('2014-01-20', None)
),
B: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-14') +
['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') +
['2014-01-20'] * num_days_between('2014-01-20', None)
),
C: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-09') +
['2014-01-10'] * num_days_between('2014-01-10', '2014-01-19') +
['2014-01-20'] * num_days_between('2014-01-20', None)
),
D: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-09') +
['2014-01-10'] * num_days_between('2014-01-10', '2014-01-14') +
['2014-01-15'] * num_days_between('2014-01-15', None)
),
E: zip_with_dates(['NaT'] * len(dates)),
}, index=dates)
_expected_previous_announce = self.get_expected_previous_event_dates(
dates
)
_expected_next_busday_offsets = self._compute_busday_offsets(
_expected_next_announce
@@ -179,164 +81,17 @@ class EarningsCalendarLoaderTestCase(TestCase):
_expected_previous_busday_offsets = self._compute_busday_offsets(
_expected_previous_announce
)
def expected_next_announce(sid):
"""
Return the expected next announcement dates for ``sid``.
"""
return _expected_next_announce[sid]
def expected_next_busday_offset(sid):
"""
Return the expected number of days to the next announcement for
``sid``.
"""
return _expected_next_busday_offsets[sid]
def expected_previous_announce(sid):
"""
Return the expected previous announcement dates for ``sid``.
"""
return _expected_previous_announce[sid]
def expected_previous_busday_offset(sid):
"""
Return the expected number of days to the next announcement for
``sid``.
"""
return _expected_previous_busday_offsets[sid]
loader = self.loader_type(*self.loader_args(dates))
engine = SimplePipelineEngine(lambda _: loader, dates, self.finder)
return (
engine,
expected_next_announce,
expected_next_busday_offset,
expected_previous_announce,
expected_previous_busday_offset,
)
@staticmethod
def _compute_busday_offsets(announcement_dates):
"""
Compute expected business day offsets from a DataFrame of announcement
dates.
"""
# Column-vector of dates on which factor `compute` will be called.
raw_call_dates = announcement_dates.index.values.astype(
'datetime64[D]'
)[:, None]
# 2D array of dates containining expected nexg announcement.
raw_announce_dates = (
announcement_dates.values.astype('datetime64[D]')
)
# Set NaTs to 0 temporarily because busday_count doesn't support NaT.
# We fill these entries with NaNs later.
whereNaT = raw_announce_dates == NaTD
raw_announce_dates[whereNaT] = make_datetime64D(0)
# The abs call here makes it so that we can use this function to
# compute offsets for both next and previous earnings (previous
# earnings offsets come back negative).
expected = abs(np.busday_count(
raw_call_dates,
raw_announce_dates
).astype(float))
expected[whereNaT] = np.nan
return pd.DataFrame(
data=expected,
columns=announcement_dates.columns,
index=announcement_dates.index,
)
@parameterized.expand(gen_calendars(
'2014-01-01',
'2014-01-31',
critical_dates=pd.to_datetime([
'2014-01-05',
'2014-01-10',
'2014-01-15',
'2014-01-20',
], utc=True),
))
def test_compute_earnings(self, dates):
(
engine,
expected_next,
expected_next_busday_offset,
expected_previous,
expected_previous_busday_offset,
) = self.setup(dates)
pipe = Pipeline(
columns={
'next': EarningsCalendar.next_announcement.latest,
'previous': EarningsCalendar.previous_announcement.latest,
'days_to_next': BusinessDaysUntilNextEarnings(),
'days_since_prev': BusinessDaysSincePreviousEarnings(),
}
)
result = engine.run_pipeline(
pipe,
start_date=dates[0],
end_date=dates[-1],
)
computed_next = result['next']
computed_previous = result['previous']
computed_next_busday_offset = result['days_to_next']
computed_previous_busday_offset = result['days_since_prev']
# NaTs in next/prev should correspond to NaNs in offsets.
assert_series_equal(
computed_next.isnull(),
computed_next_busday_offset.isnull(),
check_names=False,
)
assert_series_equal(
computed_previous.isnull(),
computed_previous_busday_offset.isnull(),
check_names=False,
)
for sid in self.sids:
assert_series_equal(
computed_next.xs(sid, level=1),
expected_next(sid),
sid,
check_names=False,
)
assert_series_equal(
computed_previous.xs(sid, level=1),
expected_previous(sid),
sid,
check_names=False,
)
assert_series_equal(
computed_next_busday_offset.xs(sid, level=1),
expected_next_busday_offset(sid),
sid,
check_names=False,
)
assert_series_equal(
computed_previous_busday_offset.xs(sid, level=1),
expected_previous_busday_offset(sid),
sid,
check_names=False,
)
self.cols[PREVIOUS_ANNOUNCEMENT] = _expected_previous_announce
self.cols[NEXT_ANNOUNCEMENT] = _expected_next_announce
self.cols[DAYS_TO_NEXT] = _expected_next_busday_offsets
self.cols[DAYS_SINCE_PREV] = _expected_previous_busday_offsets
class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase):
loader_type = BlazeEarningsCalendarLoader
@classmethod
def setUpClass(cls):
super(BlazeEarningsCalendarLoaderTestCase, cls).setUpClass()
cls.loader_type = BlazeEarningsCalendarLoader
def loader_args(self, dates):
_, mapping = super(
@@ -345,11 +100,11 @@ class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase):
).loader_args(dates)
return (bz.Data(pd.concat(
pd.DataFrame({
ANNOUNCEMENT_FIELD_NAME: earning_dates,
TS_FIELD_NAME: earning_dates.index,
ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME],
TS_FIELD_NAME: df[TS_FIELD_NAME],
SID_FIELD_NAME: sid,
})
for sid, earning_dates in iteritems(mapping)
for sid, df in iteritems(mapping)
).reset_index(drop=True)),)
@@ -357,35 +112,15 @@ class BlazeEarningsCalendarLoaderNotInteractiveTestCase(
BlazeEarningsCalendarLoaderTestCase):
"""Test case for passing a non-interactive symbol and a dict of resources.
"""
@classmethod
def setUpClass(cls):
super(BlazeEarningsCalendarLoaderNotInteractiveTestCase,
cls).setUpClass()
cls.loader_type = BlazeEarningsCalendarLoader
def loader_args(self, dates):
(bound_expr,) = super(
BlazeEarningsCalendarLoaderNotInteractiveTestCase,
self,
).loader_args(dates)
return swap_resources_into_scope(bound_expr, {})
class EarningsCalendarLoaderInferTimestampTestCase(TestCase):
def test_infer_timestamp(self):
dtx = pd.date_range('2014-01-01', '2014-01-10')
announcement_dates = {
0: dtx,
1: pd.Series(dtx, dtx),
}
loader = EarningsCalendarLoader(
dtx,
announcement_dates,
infer_timestamps=True,
)
self.assertEqual(
loader.announcement_dates.keys(),
announcement_dates.keys(),
)
assert_series_equal(
loader.announcement_dates[0],
pd.Series(index=[dtx[0]] * 10, data=dtx),
)
assert_series_equal(
loader.announcement_dates[1],
announcement_dates[1],
)
+486
View File
@@ -0,0 +1,486 @@
"""
Tests for setting up an EventsLoader and a BlazeEventsLoader.
"""
from functools import partial
from nose_parameterized import parameterized
import re
from unittest import TestCase
import blaze as bz
import numpy as np
import pandas as pd
from pandas.util.testing import assert_series_equal
from zipline.pipeline import SimplePipelineEngine, Pipeline
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME
)
from zipline.pipeline.data import DataSet, Column
from zipline.pipeline.loaders.blaze.events import BlazeEventsLoader
from zipline.pipeline.loaders.events import (
DF_NO_TS_NOT_INFER_TS_ERROR,
DTINDEX_NOT_INFER_TS_ERROR,
EventsLoader,
SERIES_NO_DTINDEX_ERROR,
WRONG_COLS_ERROR,
WRONG_MANY_COL_DATA_FORMAT_ERROR,
WRONG_SINGLE_COL_DATA_FORMAT_ERROR
)
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import (
datetime64ns_dtype,
NaTD,
make_datetime64D
)
from zipline.utils.test_utils import (
gen_calendars,
num_days_in_range,
make_simple_equity_info
)
ABSTRACT_CONCRETE_LOADER_ERROR = 'abstract methods concrete_loader'
ABSTRACT_EXPECTED_COLS_ERROR = 'abstract methods expected_cols'
DATE_FIELD_NAME = "event_date"
class EventDataSet(DataSet):
previous_announcement = Column(datetime64ns_dtype)
class EventDataSetLoader(EventsLoader):
expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME])
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=EventDataSet):
super(EventDataSetLoader, self).__init__(
all_dates,
events_by_sid,
infer_timestamps=infer_timestamps,
dataset=dataset,
)
@lazyval
def previous_announcement_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_announcement,
ANNOUNCEMENT_FIELD_NAME,
)
@lazyval
def next_announcement_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_announcement,
ANNOUNCEMENT_FIELD_NAME,
)
# Test case just for catching an error when multiple columns are in the wrong
# data format, so no loader defined.
class EventDataSetLoaderMultipleExpectedCols(EventsLoader):
expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME, "other_field"])
class EventDataSetLoaderNoExpectedCols(EventsLoader):
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=EventDataSet):
super(EventDataSetLoaderNoExpectedCols, self).__init__(
all_dates,
events_by_sid,
infer_timestamps=infer_timestamps,
dataset=dataset,
)
dtx = pd.date_range('2014-01-01', '2014-01-10')
class EventLoaderTestCase(TestCase):
def assert_loader_error(self, events_by_sid, error, msg,
infer_timestamps, loader):
with self.assertRaisesRegexp(error, re.escape(msg)):
loader(
dtx, events_by_sid, infer_timestamps=infer_timestamps,
)
def test_no_expected_cols_defined(self):
events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx})}
self.assert_loader_error(events_by_sid, TypeError,
ABSTRACT_EXPECTED_COLS_ERROR,
True, EventDataSetLoaderNoExpectedCols)
def test_wrong_cols(self):
wrong_col_name = 'some_other_col'
# Test wrong cols (cols != expected)
events_by_sid = {0: pd.DataFrame({wrong_col_name: dtx})}
self.assert_loader_error(
events_by_sid, ValueError, WRONG_COLS_ERROR.format(
expected_columns=list(EventDataSetLoader.expected_cols),
sid=0,
resulting_columns=[wrong_col_name],
),
True,
EventDataSetLoader
)
@parameterized.expand([
# DataFrame without timestamp column and infer_timestamps = True
[pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), True],
# DataFrame with timestamp column
[pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx,
TS_FIELD_NAME: dtx}), False],
# DatetimeIndex with infer_timestamps = True
[pd.DatetimeIndex(dtx), True],
# Series with DatetimeIndex as index and infer_timestamps = False
[pd.Series(dtx, index=dtx), False]
])
def test_conversion_to_df(self, df, infer_timestamps):
events_by_sid = {0: df}
loader = EventDataSetLoader(
dtx,
events_by_sid,
infer_timestamps=infer_timestamps,
)
self.assertEqual(
loader.events_by_sid.keys(),
events_by_sid.keys(),
)
if infer_timestamps:
expected = pd.Series(index=[dtx[0]] * 10, data=dtx,
name=ANNOUNCEMENT_FIELD_NAME)
else:
expected = pd.Series(index=dtx, data=dtx,
name=ANNOUNCEMENT_FIELD_NAME)
expected.index.name = TS_FIELD_NAME
# Check that index by first given date has been added
assert_series_equal(
loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME],
expected,
)
@parameterized.expand(
[
# DataFrame without timestamp column and infer_timestamps = True
[
pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}),
False,
DF_NO_TS_NOT_INFER_TS_ERROR.format(
timestamp_column_name=TS_FIELD_NAME,
sid=0
),
EventDataSetLoader
],
# DatetimeIndex with infer_timestamps = False
[
pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME),
False,
DTINDEX_NOT_INFER_TS_ERROR.format(sid=0),
EventDataSetLoader
],
# Series with DatetimeIndex as index and infer_timestamps = False
[
pd.Series(dtx, name=ANNOUNCEMENT_FIELD_NAME),
False,
SERIES_NO_DTINDEX_ERROR.format(sid=0),
EventDataSetLoader
],
# Below, 2 cases repeated for infer_timestamps = True and False.
# Shouldn't make a difference in the outcome.
# We expected 1 column but got a data structure other than a
# DataFrame, Series, or DatetimeIndex
[
[dtx],
True,
WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=0),
EventDataSetLoader
],
# We expected multiple columns but got a data structure other
# than a DataFrame
[
[dtx, dtx],
True,
WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=0),
EventDataSetLoaderMultipleExpectedCols
],
[
[dtx],
False,
WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=0),
EventDataSetLoader
],
# We expected multiple columns but got a data structure other
# than a DataFrame
[
[dtx, dtx],
False,
WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=0),
EventDataSetLoaderMultipleExpectedCols
]
]
)
def test_bad_conversion_to_df(self, df, infer_timestamps, msg, loader):
events_by_sid = {0: df}
self.assert_loader_error(events_by_sid, ValueError, msg,
infer_timestamps, loader)
class BlazeEventDataSetLoaderNoConcreteLoader(BlazeEventsLoader):
def __init__(self,
expr,
dataset=EventDataSet,
**kwargs):
super(
BlazeEventDataSetLoaderNoConcreteLoader, self
).__init__(expr,
dataset=dataset,
**kwargs)
class BlazeEventLoaderTestCase(TestCase):
# Blaze loader: need to test failure if no concrete loader
def test_no_concrete_loader_defined(self):
with self.assertRaisesRegexp(
TypeError, re.escape(ABSTRACT_CONCRETE_LOADER_ERROR)
):
BlazeEventDataSetLoaderNoConcreteLoader(
bz.Data(
pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx,
SID_FIELD_NAME: 0
})
)
)
# Must be a list - can't use generator since this needs to be used more than
# once.
param_dates = list(gen_calendars(
'2014-01-01',
'2014-01-31',
critical_dates=pd.to_datetime([
'2014-01-05',
'2014-01-10',
'2014-01-15',
'2014-01-20',
], utc=True),
))
class EventLoaderCommonMixin(object):
sids = A, B, C, D, E = range(5)
equity_info = make_simple_equity_info(
sids,
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
event_dates_cases = [
# K1--K2--E1--E2.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
DATE_FIELD_NAME: pd.to_datetime(['2014-01-15', '2014-01-20'])
}),
# K1--K2--E2--E1.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
DATE_FIELD_NAME: pd.to_datetime(['2014-01-20', '2014-01-15'])
}),
# K1--E1--K2--E2.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']),
DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-20'])
}),
# K1 == K2.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2),
DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-15'])
}),
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime([]),
DATE_FIELD_NAME: pd.to_datetime([])
})
]
def zip_with_floats(self, dates, flts):
return pd.Series(flts, index=dates).astype('float')
def num_days_between(self, dates, start_date, end_date):
return num_days_in_range(dates, start_date, end_date)
def zip_with_dates(self, index_dates, dts):
return pd.Series(pd.to_datetime(dts), index=index_dates)
def loader_args(self, dates):
"""Construct the base object to pass to the loader.
Parameters
----------
dates : pd.DatetimeIndex
The dates we can serve.
Returns
-------
args : tuple[any]
The arguments to forward to the loader positionally.
"""
return dates, self.dataset
def setup_engine(self, dates):
"""
Make a Pipeline Enigne object based on the given dates.
"""
loader = self.loader_type(*self.loader_args(dates))
return SimplePipelineEngine(lambda _: loader, dates, self.finder)
def get_expected_next_event_dates(self, dates):
num_days_between_for_dates = partial(self.num_days_between, dates)
zip_with_dates_for_dates = partial(self.zip_with_dates, dates)
return pd.DataFrame({
0: zip_with_dates_for_dates(
['NaT'] *
num_days_between_for_dates(None, '2014-01-04') +
['2014-01-15'] *
num_days_between_for_dates('2014-01-05', '2014-01-15') +
['2014-01-20'] *
num_days_between_for_dates('2014-01-16', '2014-01-20') +
['NaT'] *
num_days_between_for_dates('2014-01-21', None)
),
1: zip_with_dates_for_dates(
['NaT'] *
num_days_between_for_dates(None, '2014-01-04') +
['2014-01-20'] *
num_days_between_for_dates('2014-01-05', '2014-01-09') +
['2014-01-15'] *
num_days_between_for_dates('2014-01-10', '2014-01-15') +
['2014-01-20'] *
num_days_between_for_dates('2014-01-16', '2014-01-20') +
['NaT'] *
num_days_between_for_dates('2014-01-21', None)
),
2: zip_with_dates_for_dates(
['NaT'] *
num_days_between_for_dates(None, '2014-01-04') +
['2014-01-10'] *
num_days_between_for_dates('2014-01-05', '2014-01-10') +
['NaT'] *
num_days_between_for_dates('2014-01-11', '2014-01-14') +
['2014-01-20'] *
num_days_between_for_dates('2014-01-15', '2014-01-20') +
['NaT'] *
num_days_between_for_dates('2014-01-21', None)
),
3: zip_with_dates_for_dates(
['NaT'] *
num_days_between_for_dates(None, '2014-01-04') +
['2014-01-10'] *
num_days_between_for_dates('2014-01-05', '2014-01-10') +
['2014-01-15'] *
num_days_between_for_dates('2014-01-11', '2014-01-15') +
['NaT'] *
num_days_between_for_dates('2014-01-16', None)
),
4: zip_with_dates_for_dates(['NaT'] *
len(dates)),
}, index=dates)
def get_expected_previous_event_dates(self, dates):
num_days_between_for_dates = partial(self.num_days_between, dates)
zip_with_dates_for_dates = partial(self.zip_with_dates, dates)
return pd.DataFrame({
0: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
1: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
2: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-09') +
['2014-01-10'] * num_days_between_for_dates('2014-01-10',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
3: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-09') +
['2014-01-10'] * num_days_between_for_dates('2014-01-10',
'2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
None),
),
4: zip_with_dates_for_dates(['NaT'] * len(dates)),
}, index=dates)
@staticmethod
def _compute_busday_offsets(announcement_dates):
"""
Compute expected business day offsets from a DataFrame of announcement
dates.
"""
# Column-vector of dates on which factor `compute` will be called.
raw_call_dates = announcement_dates.index.values.astype(
'datetime64[D]'
)[:, None]
# 2D array of dates containining expected nexg announcement.
raw_announce_dates = (
announcement_dates.values.astype('datetime64[D]')
)
# Set NaTs to 0 temporarily because busday_count doesn't support NaT.
# We fill these entries with NaNs later.
whereNaT = raw_announce_dates == NaTD
raw_announce_dates[whereNaT] = make_datetime64D(0)
# The abs call here makes it so that we can use this function to
# compute offsets for both next and previous earnings (previous
# earnings offsets come back negative).
expected = abs(np.busday_count(
raw_call_dates,
raw_announce_dates
).astype(float))
expected[whereNaT] = np.nan
return pd.DataFrame(
data=expected,
columns=announcement_dates.columns,
index=announcement_dates.index,
)
@parameterized.expand(param_dates)
def test_compute(self, dates):
engine = self.setup_engine(dates)
self.setup(dates)
pipe = Pipeline(
columns=self.pipeline_columns
)
result = engine.run_pipeline(
pipe,
start_date=dates[0],
end_date=dates[-1],
)
for sid in self.sids:
for col_name in self.cols.keys():
assert_series_equal(result[col_name].xs(sid, level=1),
self.cols[col_name][sid],
check_names=False)
+17
View File
@@ -0,0 +1,17 @@
"""
Common constants for Pipeline.
"""
AD_FIELD_NAME = 'asof_date'
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
CASH_FIELD_NAME = 'cash'
BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_date'
DAYS_SINCE_PREV = 'days_since_prev'
DAYS_TO_NEXT = 'days_to_next'
NEXT_ANNOUNCEMENT = 'next_announcement'
PREVIOUS_ANNOUNCEMENT = 'previous_announcement'
PREVIOUS_BUYBACK_ANNOUNCEMENT = 'previous_buyback_announcement'
PREVIOUS_BUYBACK_CASH = 'previous_buyback_cash'
PREVIOUS_BUYBACK_SHARE_COUNT = 'previous_buyback_share_count'
SHARE_COUNT_FIELD_NAME = 'share_count'
SID_FIELD_NAME = 'sid'
TS_FIELD_NAME = 'timestamp'
+3
View File
@@ -1,11 +1,14 @@
from .buyback_auth import CashBuybackAuthorizations, ShareBuybackAuthorizations
from .earnings import EarningsCalendar
from .equity_pricing import USEquityPricing
from .dataset import DataSet, Column, BoundColumn
__all__ = [
'BoundColumn',
'CashBuybackAuthorizations',
'Column',
'DataSet',
'EarningsCalendar',
'ShareBuybackAuthorizations',
'USEquityPricing',
]
+24
View File
@@ -0,0 +1,24 @@
"""
Datasets representing dates of recently announced buyback authorizations.
"""
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
from .dataset import Column, DataSet
class CashBuybackAuthorizations(DataSet):
"""
Dataset representing dates of recently announced cash buyback
authorizations.
"""
previous_value = Column(float64_dtype)
previous_announcement_date = Column(datetime64ns_dtype)
class ShareBuybackAuthorizations(DataSet):
"""
Dataset representing dates of recently announced share buyback
authorizations.
"""
previous_share_count = Column(float64_dtype)
previous_announcement_date = Column(datetime64ns_dtype)
+80 -34
View File
@@ -3,6 +3,10 @@ Factors describing information about event data (e.g. earnings
announcements, acquisitions, dividends, etc.).
"""
from numpy import newaxis
from zipline.pipeline.data.buyback_auth import (
CashBuybackAuthorizations,
ShareBuybackAuthorizations
)
from zipline.pipeline.data.earnings import EarningsCalendar
from zipline.utils.numpy_utils import (
NaTD,
@@ -14,10 +18,42 @@ from zipline.utils.numpy_utils import (
from .factor import Factor
class BusinessDaysUntilNextEarnings(Factor):
class BusinessDaysSincePreviousEvents(Factor):
"""
Factor returning the number of **business days** (not trading days!) until
the next known earnings date for each asset.
Abstract class for business days since a previous event.
Returns the number of **business days** (not trading days!) since
the most recent event date for each asset.
This doesn't use trading days for symmetry with
BusinessDaysUntilNextEarnings.
Assets which announced or will announce the event today will produce a
value of 0.0. Assets that announced the event on the previous business
day will produce a value of 1.0.
Assets for which the event date is `NaT` will produce a value of `NaN`.
"""
window_length = 0
dtype = float64_dtype
def _compute(self, arrays, dates, assets, mask):
# Coerce from [ns] to [D] for numpy busday_count.
announce_dates = arrays[0].astype(datetime64D_dtype)
# Set masked values to NaT.
announce_dates[~mask] = NaTD
# Convert row labels into a column vector for broadcasted comparison.
reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis]
return busday_count_mask_NaT(announce_dates, reference_dates)
class BusinessDaysUntilNextEvents(Factor):
"""
Abstract class for business days since a next event.
Returns the number of **business days** (not trading days!) until
the next known event date for each asset.
This doesn't use trading days because the trading calendar includes
information that may not have been available to the algorithm at the time
@@ -26,19 +62,12 @@ class BusinessDaysUntilNextEarnings(Factor):
For example, the NYSE closings September 11th 2001, would not have been
known to the algorithm on September 10th.
Assets that announced or will announce earnings today will produce a value
of 0.0. Assets that will announce earnings on the next upcoming business
Assets that announced or will announce the event today will produce a value
of 0.0. Assets that will announce the event on the next upcoming business
day will produce a value of 1.0.
Assets for which `EarningsCalendar.next_announcement` is `NaT` will produce
a value of `NaN`.
See Also
--------
zipline.pipeline.factors.BusinessDaysSincePreviousEarnings
Assets for which the event date is `NaT` will produce a value of `NaN`.
"""
inputs = [EarningsCalendar.next_announcement]
window_length = 0
dtype = float64_dtype
@@ -55,37 +84,54 @@ class BusinessDaysUntilNextEarnings(Factor):
return busday_count_mask_NaT(reference_dates, announce_dates)
class BusinessDaysSincePreviousEarnings(Factor):
class BusinessDaysUntilNextEarnings(BusinessDaysUntilNextEvents):
"""
Factor returning the number of **business days** (not trading days!) until
the next known earnings date for each asset.
See Also
--------
zipline.pipeline.factors.BusinessDaysSincePreviousEarnings
"""
inputs = [EarningsCalendar.next_announcement]
class BusinessDaysSincePreviousEarnings(BusinessDaysSincePreviousEvents):
"""
Factor returning the number of **business days** (not trading days!) since
the most recent earnings date for each asset.
This doesn't use trading days for symmetry with
BusinessDaysUntilNextEarnings.
Assets which announced or will announce earnings today will produce a value
of 0.0. Assets that announced earnings on the previous business day will
produce a value of 1.0.
Assets for which `EarningsCalendar.previous_announcement` is `NaT` will
produce a value of `NaN`.
See Also
--------
zipline.pipeline.factors.BusinessDaysUntilNextEarnings
"""
inputs = [EarningsCalendar.previous_announcement]
window_length = 0
dtype = float64_dtype
def _compute(self, arrays, dates, assets, mask):
# Coerce from [ns] to [D] for numpy busday_count.
announce_dates = arrays[0].astype(datetime64D_dtype)
class BusinessDaysSincePreviousCashBuybackAuth(
BusinessDaysSincePreviousEvents
):
"""
Factor returning the number of **business days** (not trading days!) since
the most recent cash buyback authorization for each asset.
# Set masked values to NaT.
announce_dates[~mask] = NaTD
See Also
--------
zipline.pipeline.factors.BusinessDaysSincePreviousCashBuybackAuth
"""
inputs = [CashBuybackAuthorizations.previous_announcement_date]
# Convert row labels into a column vector for broadcasted comparison.
reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis]
return busday_count_mask_NaT(announce_dates, reference_dates)
class BusinessDaysSincePreviousShareBuybackAuth(
BusinessDaysSincePreviousEvents
):
"""
Factor returning the number of **business days** (not trading days!) since
the most recent share buyback authorization for each asset.
See Also
--------
zipline.pipeline.factors.BusinessDaysSincePreviousShareBuybackAuth
"""
inputs = [ShareBuybackAuthorizations.previous_announcement_date]
+9 -9
View File
@@ -1,25 +1,25 @@
from .buyback_auth import (
BlazeCashBuybackAuthorizationsLoader,
BlazeShareBuybackAuthorizationsLoader
)
from .core import (
AD_FIELD_NAME,
BlazeLoader,
NoDeltasWarning,
SID_FIELD_NAME,
TS_FIELD_NAME,
from_blaze,
global_loader,
)
from .earnings import (
ANNOUNCEMENT_FIELD_NAME,
BlazeEarningsCalendarLoader,
)
__all__ = (
'AD_FIELD_NAME',
'ANNOUNCEMENT_FIELD_NAME',
'BlazeCashBuybackAuthorizationsLoader',
'BlazeEarningsCalendarLoader',
'BlazeLoader',
'NoDeltasWarning',
'SID_FIELD_NAME',
'TS_FIELD_NAME',
'BlazeShareBuybackAuthorizationsLoader',
'from_blaze',
'global_loader',
'NoDeltasWarning',
)
@@ -0,0 +1,160 @@
from .core import (
TS_FIELD_NAME,
SID_FIELD_NAME,
)
from zipline.pipeline.common import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME,
SHARE_COUNT_FIELD_NAME
)
from zipline.pipeline.data import (CashBuybackAuthorizations,
ShareBuybackAuthorizations)
from zipline.pipeline.loaders.buyback_auth import (
CashBuybackAuthorizationsLoader,
ShareBuybackAuthorizationsLoader,
)
from .events import BlazeEventsLoader
class BlazeCashBuybackAuthorizationsLoader(BlazeEventsLoader):
"""A pipeline loader for the ``CashBuybackAuthorizations`` dataset that
loads data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
dataset: DataSet
The DataSet object for which this loader loads data.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
{BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime,
{CASH_FIELD_NAME}: ?float64
}}
Where each row of the table is a record including the sid to identify the
company, the timestamp where we learned about the announcement, the
date when the buyback was announced, the share count, and the cash amount.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME=CASH_FIELD_NAME
)
_expected_fields = frozenset({
TS_FIELD_NAME,
SID_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME
})
concrete_loader = CashBuybackAuthorizationsLoader
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=CashBuybackAuthorizations,
**kwargs):
super(
BlazeCashBuybackAuthorizationsLoader, self
).__init__(expr,
resources=resources,
odo_kwargs=odo_kwargs,
data_query_time=data_query_time,
data_query_tz=data_query_tz,
dataset=dataset,
**kwargs)
class BlazeShareBuybackAuthorizationsLoader(BlazeEventsLoader):
"""A pipeline loader for the ``ShareBuybackAuthorizations`` dataset that
loads data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
dataset: DataSet
The DataSet object for which this loader loads data.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
{BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime,
{SHARE_COUNT_FIELD_NAME}: ?float64,
}}
Where each row of the table is a record including the sid to identify the
company, the timestamp where we learned about the announcement, the
date when the buyback was announced, the share count, and the value.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME=SHARE_COUNT_FIELD_NAME,
)
_expected_fields = frozenset({
TS_FIELD_NAME,
SID_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME,
})
concrete_loader = ShareBuybackAuthorizationsLoader
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=ShareBuybackAuthorizations,
**kwargs):
super(
BlazeShareBuybackAuthorizationsLoader, self
).__init__(expr,
resources=resources,
odo_kwargs=odo_kwargs,
data_query_time=data_query_time,
data_query_tz=data_query_tz,
dataset=dataset,
**kwargs)
+5 -4
View File
@@ -158,7 +158,11 @@ from toolz import (
)
import toolz.curried.operator as op
from zipline.pipeline.common import (
AD_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME
)
from zipline.pipeline.data.dataset import DataSet, Column
from zipline.pipeline.loaders.utils import (
check_data_query_args,
@@ -179,9 +183,6 @@ from zipline.utils.pandas_utils import sort_values
from zipline.utils.preprocess import preprocess
AD_FIELD_NAME = 'asof_date'
TS_FIELD_NAME = 'timestamp'
SID_FIELD_NAME = 'sid'
valid_deltas_node_types = (
bz.expr.Field,
bz.expr.ReLabel,
+18 -86
View File
@@ -1,29 +1,14 @@
from datashape import istabular
import pandas as pd
from toolz import valmap
from .core import (
TS_FIELD_NAME,
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
SID_FIELD_NAME,
bind_expression_to_resources,
ffill_query_in_range,
TS_FIELD_NAME,
)
from zipline.pipeline.data import EarningsCalendar
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
from zipline.pipeline.loaders.utils import (
check_data_query_args,
normalize_data_query_bounds,
normalize_timestamp_to_query_time,
)
from zipline.utils.input_validation import ensure_timezone, optionally
from zipline.utils.preprocess import preprocess
from .events import BlazeEventsLoader
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
class BlazeEarningsCalendarLoader(PipelineLoader):
class BlazeEarningsCalendarLoader(BlazeEventsLoader):
"""A pipeline loader for the ``EarningsCalendar`` dataset that loads
data from a blaze expression.
@@ -39,6 +24,8 @@ class BlazeEarningsCalendarLoader(PipelineLoader):
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
dataset: DataSet
The DataSet object for which this loader loads data.
Notes
-----
@@ -57,6 +44,7 @@ class BlazeEarningsCalendarLoader(PipelineLoader):
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
@@ -69,75 +57,19 @@ class BlazeEarningsCalendarLoader(PipelineLoader):
ANNOUNCEMENT_FIELD_NAME,
})
@preprocess(data_query_tz=optionally(ensure_timezone))
concrete_loader = EarningsCalendarLoader
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=EarningsCalendar):
dshape = expr.dshape
if not istabular(dshape):
raise ValueError(
'expression dshape must be tabular, got: %s' % dshape,
)
expected_fields = self._expected_fields
self._expr = bind_expression_to_resources(
expr[list(expected_fields)],
resources,
)
self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {}
self._dataset = dataset
check_data_query_args(data_query_time, data_query_tz)
self._data_query_time = data_query_time
self._data_query_tz = data_query_tz
def load_adjusted_array(self, columns, dates, assets, mask):
data_query_time = self._data_query_time
data_query_tz = self._data_query_tz
lower_dt, upper_dt = normalize_data_query_bounds(
dates[0],
dates[-1],
data_query_time,
data_query_tz,
)
raw = ffill_query_in_range(
self._expr,
lower_dt,
upper_dt,
self._odo_kwargs,
)
sids = raw.loc[:, SID_FIELD_NAME]
raw.drop(
sids[~sids.isin(assets)].index,
inplace=True
)
if data_query_time is not None:
normalize_timestamp_to_query_time(
raw,
data_query_time,
data_query_tz,
inplace=True,
ts_field=TS_FIELD_NAME,
)
gb = raw.groupby(SID_FIELD_NAME)
def mkseries(idx, raw_loc=raw.loc):
vs = raw_loc[
idx, [TS_FIELD_NAME, ANNOUNCEMENT_FIELD_NAME]
].values
return pd.Series(
index=pd.DatetimeIndex(vs[:, 0]),
data=vs[:, 1],
)
return EarningsCalendarLoader(
dates,
valmap(mkseries, gb.groups),
dataset=self._dataset,
).load_adjusted_array(columns, dates, assets, mask)
dataset=EarningsCalendar,
**kwargs):
super(
BlazeEarningsCalendarLoader, self
).__init__(expr, dataset=dataset,
resources=resources, odo_kwargs=odo_kwargs,
data_query_time=data_query_time,
data_query_tz=data_query_tz, **kwargs)
+128
View File
@@ -0,0 +1,128 @@
import abc
from datashape import istabular
from .core import (
bind_expression_to_resources,
ffill_query_in_range,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.common import (
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.utils import (
check_data_query_args,
normalize_data_query_bounds,
normalize_timestamp_to_query_time,
)
from zipline.utils.input_validation import ensure_timezone, optionally
from zipline.utils.preprocess import preprocess
class BlazeEventsLoader(PipelineLoader):
"""An abstract pipeline loader for the events datasets that loads
data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
dataset : DataSet
The DataSet object for which this loader loads data.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
}}
And other dataset-specific fields, where each row of the table is a
record including the sid to identify the company, the timestamp where we
learned about the announcement, and the date when the earnings will be z
announced.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
@preprocess(data_query_tz=optionally(ensure_timezone))
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=None):
dshape = expr.dshape
if not istabular(dshape):
raise ValueError(
'expression dshape must be tabular, got: %s' % dshape,
)
expected_fields = self._expected_fields
self._expr = bind_expression_to_resources(
expr[list(expected_fields)],
resources,
)
self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {}
self._dataset = dataset
check_data_query_args(data_query_time, data_query_tz)
self._data_query_time = data_query_time
self._data_query_tz = data_query_tz
@abc.abstractproperty
def concrete_loader(self):
NotImplementedError('concrete_loader')
def load_adjusted_array(self, columns, dates, assets, mask):
data_query_time = self._data_query_time
data_query_tz = self._data_query_tz
lower_dt, upper_dt = normalize_data_query_bounds(
dates[0],
dates[-1],
data_query_time,
data_query_tz,
)
raw = ffill_query_in_range(
self._expr,
lower_dt,
upper_dt,
self._odo_kwargs,
)
sids = raw.loc[:, SID_FIELD_NAME]
raw.drop(
sids[~sids.isin(assets)].index,
inplace=True
)
if data_query_time is not None:
normalize_timestamp_to_query_time(
raw,
data_query_time,
data_query_tz,
inplace=True,
ts_field=TS_FIELD_NAME,
)
gb = raw.groupby(SID_FIELD_NAME)
return self.concrete_loader(
dates,
self.prepare_data(raw, gb),
dataset=self._dataset,
).load_adjusted_array(columns, dates, assets, mask)
def prepare_data(self, raw, gb):
return {sid: raw.loc[group].drop(SID_FIELD_NAME, axis=1) for sid, group
in gb.groups.items()}
+98
View File
@@ -0,0 +1,98 @@
"""
Reference implementation for buyback auth loaders.
"""
from ..data.buyback_auth import (
CashBuybackAuthorizations,
ShareBuybackAuthorizations
)
from .events import EventsLoader
from zipline.pipeline.common import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME,
SHARE_COUNT_FIELD_NAME
)
from zipline.utils.memoize import lazyval
class CashBuybackAuthorizationsLoader(EventsLoader):
"""
Reference loader for
:class:`zipline.pipeline.data.CashBuybackAuthorizations`.
events_by_sid: dict[sid -> pd.DataFrame(knowledge date,
event date, cash value)]
"""
expected_cols = frozenset([BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME])
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=CashBuybackAuthorizations):
super(CashBuybackAuthorizationsLoader, self).__init__(
all_dates,
events_by_sid,
infer_timestamps=infer_timestamps,
dataset=dataset,
)
@lazyval
def previous_value_loader(self):
return self._previous_event_value_loader(
self.dataset.previous_value,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME
)
@lazyval
def previous_announcement_date_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_announcement_date,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
)
class ShareBuybackAuthorizationsLoader(EventsLoader):
"""
Reference loader for
:class:`zipline.pipeline.data.ShareBuybackAuthorizations`.
Does not currently support adjustments to the dates of known buyback
authorizations.
events_by_sid: dict[sid -> pd.DataFrame(knowledge date,
event date, share value)]
"""
expected_cols = frozenset([BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME])
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=ShareBuybackAuthorizations):
super(ShareBuybackAuthorizationsLoader, self).__init__(
all_dates,
events_by_sid,
infer_timestamps=infer_timestamps,
dataset=dataset,
)
@lazyval
def previous_share_count_loader(self):
return self._previous_event_value_loader(
self.dataset.previous_share_count,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME
)
@lazyval
def previous_announcement_date_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_announcement_date,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
)
+11 -87
View File
@@ -1,108 +1,32 @@
"""
Reference implementation for EarningsCalendar loaders.
"""
from itertools import repeat
import pandas as pd
from six import iteritems
from toolz import merge
from .base import PipelineLoader
from .frame import DataFrameLoader
from .utils import next_date_frame, previous_date_frame
from ..data.earnings import EarningsCalendar
from .events import EventsLoader
from zipline.pipeline.common import ANNOUNCEMENT_FIELD_NAME
from zipline.utils.memoize import lazyval
class EarningsCalendarLoader(PipelineLoader):
"""
Reference loader for
:class:`zipline.pipeline.data.earnings.EarningsCalendar`.
class EarningsCalendarLoader(EventsLoader):
Does not currently support adjustments to the dates of known earnings.
expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME])
Parameters
----------
all_dates : pd.DatetimeIndex
Index of dates for which we can serve queries.
announcement_dates : dict[int -> pd.Series or pd.DatetimeIndex]
Dict mapping sids to objects representing dates on which earnings
occurred.
If a dict value is a Series, it's interpreted as a mapping from the
date on which we learned an announcement was coming to the date on
which the announcement was made.
If a dict value is a DatetimeIndex, it's interpreted as just containing
the dates that announcements were made, and we assume we knew about the
announcement on all prior dates. This mode is only supported if
``infer_timestamp`` is explicitly passed as a truthy value.
infer_timestamps : bool, optional
Whether to allow passing ``DatetimeIndex`` values in
``announcement_dates``.
"""
def __init__(self,
all_dates,
announcement_dates,
def __init__(self, all_dates, events_by_sid,
infer_timestamps=False,
dataset=EarningsCalendar):
self.all_dates = all_dates
self.announcement_dates = announcement_dates = (
announcement_dates.copy()
super(EarningsCalendarLoader, self).__init__(
all_dates, events_by_sid, infer_timestamps, dataset=dataset,
)
dates = self.all_dates.values
for k, v in iteritems(announcement_dates):
if isinstance(v, pd.DatetimeIndex):
if not infer_timestamps:
raise ValueError(
"Got DatetimeIndex of announcement dates for sid %d.\n"
"Pass `infer_timestamps=True` to use the first date in"
" `all_dates` as implicit timestamp."
)
# If we are passed a DatetimeIndex, we always have
# knowledge of the announcements.
announcement_dates[k] = pd.Series(
v, index=repeat(dates[0], len(v)),
)
self.dataset = dataset
def get_loader(self, column):
"""Dispatch to the loader for ``column``.
"""
if column is self.dataset.next_announcement:
return self.next_announcement_loader
elif column is self.dataset.previous_announcement:
return self.previous_announcement_loader
else:
raise ValueError("Don't know how to load column '%s'." % column)
@lazyval
def next_announcement_loader(self):
return DataFrameLoader(
self.dataset.next_announcement,
next_date_frame(
self.all_dates,
self.announcement_dates,
),
adjustments=None,
)
return self._next_event_date_loader(self.dataset.next_announcement,
ANNOUNCEMENT_FIELD_NAME)
@lazyval
def previous_announcement_loader(self):
return DataFrameLoader(
return self._previous_event_date_loader(
self.dataset.previous_announcement,
previous_date_frame(
self.all_dates,
self.announcement_dates,
),
adjustments=None,
)
def load_adjusted_array(self, columns, dates, assets, mask):
return merge(
self.get_loader(column).load_adjusted_array(
[column], dates, assets, mask
)
for column in columns
ANNOUNCEMENT_FIELD_NAME
)
+205
View File
@@ -0,0 +1,205 @@
import abc
import pandas as pd
from six import iteritems
from toolz import merge
from .base import PipelineLoader
from .frame import DataFrameLoader
from .utils import next_date_frame, previous_date_frame, previous_value
from zipline.pipeline.common import TS_FIELD_NAME
WRONG_COLS_ERROR = "Expected columns {expected_columns} for sid {sid} but " \
"got columns {resulting_columns}."
WRONG_SINGLE_COL_DATA_FORMAT_ERROR = ("Data for sid {sid} is expected to have "
"1 column and to be in a DataFrame, "
"Series, or DatetimeIndex.")
WRONG_MANY_COL_DATA_FORMAT_ERROR = ("Data for sid {sid} is expected to have "
"more than 1 column and to be in a "
"DataFrame.")
SERIES_NO_DTINDEX_ERROR = ("Got Series for sid {sid}, but index was not "
"DatetimeIndex.")
DTINDEX_NOT_INFER_TS_ERROR = ("Got DatetimeIndex for sid {sid}.\n"
"Pass `infer_timestamps=True` to use the first "
"date in `all_dates` as implicit timestamp.")
DF_NO_TS_NOT_INFER_TS_ERROR = ("Got DataFrame without a '{"
"timestamp_column_name}' column for sid {sid}."
"\nPass `infer_timestamps=True` to use the "
"first date in `all_dates` as implicit "
"timestamp.")
class EventsLoader(PipelineLoader):
"""
Abstract loader.
Does not currently support adjustments to the dates of known events.
Parameters
----------
all_dates : pd.DatetimeIndex
Index of dates for which we can serve queries.
events_by_sid : dict[int -> pd.DataFrame or pd.Series or pd.DatetimeIndex]
Dict mapping sids to objects representing dates on which earnings
occurred.
If a dict value is a Series, it's interpreted as a mapping from the
date on which we learned an announcement was coming to the date on
which the announcement was made.
If a dict value is a DatetimeIndex, it's interpreted as just containing
the dates that announcements were made, and we assume we knew about the
announcement on all prior dates. This mode is only supported if
``infer_timestamp`` is explicitly passed as a truthy value.
Dict mapping sids to DataFrames, Series, or DatetimeIndexes.
If the value is a DataFrame, it then represents dates on which events
occurred along with other associated values. If the DataFrame
contains a "timestamp" column, that column is interpreted as the date
on which we learned about the event. If the DataFrames do not contain a
"timestamp" column, we assume we knew about the event on all prior
dates. This mode is only supported if ``infer_timestamp`` is
explicitly passed as a truthy value.
infer_timestamps : bool, optional
Whether to allow omitting the "timestamp" column.
dataset : DataSet
The DataSet object for which this loader loads data.
"""
@abc.abstractproperty
def expected_cols(self):
raise NotImplemented('expected_cols')
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=None):
self.all_dates = all_dates
# Do not modify the original in place, since it may be used for other
# purposes.
self.events_by_sid = (
events_by_sid.copy()
)
dates = self.all_dates.values
for k, v in iteritems(events_by_sid):
# Already a DataFrame
if isinstance(v, pd.DataFrame):
if TS_FIELD_NAME not in v.columns:
if not infer_timestamps:
raise ValueError(
DF_NO_TS_NOT_INFER_TS_ERROR.format(
timestamp_column_name=TS_FIELD_NAME,
sid=k
)
)
self.events_by_sid[k] = v = v.copy()
v.index = [dates[0]] * len(v)
else:
self.events_by_sid[k] = v.set_index(TS_FIELD_NAME)
# Once data is in a DF, make sure columns are correct.
cols_except_ts = (set(v.columns) -
{TS_FIELD_NAME})
# Check that all columns other than timestamp are as expected.
if cols_except_ts != self.expected_cols:
raise ValueError(
WRONG_COLS_ERROR.format(
expected_columns=list(self.expected_cols),
sid=k,
resulting_columns=v.columns.values
)
)
# Not a DataFrame and we only expect 1 column
elif len(self.expected_cols) == 1:
# First, must convert to DataFrame.
if isinstance(v, pd.Series):
if not isinstance(v.index, pd.DatetimeIndex):
raise ValueError(
SERIES_NO_DTINDEX_ERROR.format(sid=k)
)
self.events_by_sid[k] = pd.DataFrame({
list(self.expected_cols)[0]: v})
elif isinstance(v, pd.DatetimeIndex):
if not infer_timestamps:
raise ValueError(
DTINDEX_NOT_INFER_TS_ERROR.format(sid=k)
)
self.events_by_sid[k] = pd.DataFrame({
list(self.expected_cols)[0]: v
}, index=[dates[0]] * len(v))
else:
# We expect 1 column, but we got something other than a
# Series, DatetimeIndex, or DataFrame.
raise ValueError(
WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=k)
)
else:
# We expected multiple columns, but we got something other
# than a DataFrame.
raise ValueError(
WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=k)
)
self.dataset = dataset
def get_loader(self, column):
if column in self.dataset.columns:
return getattr(self, "%s_loader" % column.name)
raise ValueError("Don't know how to load column '%s'." % column)
def load_adjusted_array(self, columns, dates, assets, mask):
return merge(
self.get_loader(column).load_adjusted_array(
[column], dates, assets, mask
)
for column in columns
)
def _next_event_date_loader(self, next_date_field, event_date_field_name):
return DataFrameLoader(
next_date_field,
next_date_frame(
self.all_dates,
self.events_by_sid,
event_date_field_name
),
adjustments=None,
)
def _previous_event_date_loader(self,
prev_date_field,
event_date_field_name):
return DataFrameLoader(
prev_date_field,
previous_date_frame(
self.all_dates,
self.events_by_sid,
event_date_field_name,
),
adjustments=None,
)
def _previous_event_value_loader(self,
previous_value_field,
event_date_field_name,
value_field_name):
return DataFrameLoader(
previous_value_field,
previous_value(
self.all_dates,
self.events_by_sid,
event_date_field_name,
value_field_name,
previous_value_field.dtype,
previous_value_field.missing_value
),
adjustments=None,
)
+66 -12
View File
@@ -5,10 +5,10 @@ import pandas as pd
from six import iteritems
from six.moves import zip
from zipline.utils.numpy_utils import NaTns
from zipline.utils.numpy_utils import NaTns, NaTD
def next_date_frame(dates, events_by_sid):
def next_date_frame(dates, events_by_sid, event_date_field_name):
"""
Make a DataFrame representing the simulated next known date for an event.
@@ -20,6 +20,9 @@ def next_date_frame(dates, events_by_sid):
Dict mapping sids to a series of dates. Each k:v pair of the series
represents the date we learned of the event mapping to the date the
event will occur.
event_date_field_name : str
The name of the date field that marks when the event occurred.
Returns
-------
next_events: pd.DataFrame
@@ -37,7 +40,8 @@ def next_date_frame(dates, events_by_sid):
equity: np.full_like(dates, NaTns) for equity in events_by_sid
}
raw_dates = dates.values
for equity, event_dates in iteritems(events_by_sid):
for equity, df in iteritems(events_by_sid):
event_dates = df[event_date_field_name]
data = cols[equity]
if not event_dates.index.is_monotonic_increasing:
event_dates = event_dates.sort_index()
@@ -56,7 +60,51 @@ def next_date_frame(dates, events_by_sid):
return pd.DataFrame(index=dates, data=cols)
def previous_date_frame(date_index, events_by_sid):
def previous_date_frame(date_index, events_by_sid, event_date_field_name):
"""
Make a DataFrame representing simulated next earnings date_index.
Parameters
----------
date_index : DatetimeIndex.
The index of the returned DataFrame.
events_by_sid : dict[int -> pd.DataFrame]
Dict mapping sids to a DataFrame. The index of the DataFrame
represents the date we learned of the event mapping to the event
data.
event_date_field_name : str
The name of the date field that marks when the event occurred.
Returns
-------
previous_events: pd.DataFrame
A DataFrame where each column is a security from `events_by_sid` where
the values are the dates of the previous event that occurred on the
date of the index. Entries falling before the first date will have
`NaT` as the result in the output.
See Also
--------
next_date_frame
"""
sids = list(events_by_sid)
out = np.full((len(date_index), len(sids)), NaTD, dtype='datetime64[ns]')
d_n = date_index[-1].asm8
for col_idx, sid in enumerate(sids):
# events_by_sid[sid] is Series mapping knowledge_date to actual
# event_date. We don't care about the knowledge date for
# computing previous earnings.
values = events_by_sid[sid][event_date_field_name].values
values = values[values <= d_n]
out[date_index.searchsorted(values), col_idx] = values
frame = pd.DataFrame(out, index=date_index, columns=sids)
frame.ffill(inplace=True)
return frame
def previous_value(date_index, events_by_sid, event_date_field, value_field,
value_field_dtype, missing_value):
"""
Make a DataFrame representing simulated next earnings date_index.
@@ -82,15 +130,21 @@ def previous_date_frame(date_index, events_by_sid):
next_date_frame
"""
sids = list(events_by_sid)
out = np.full((len(date_index), len(sids)), NaTns, dtype='datetime64[ns]')
dn = date_index[-1].asm8
out = np.full(
(len(date_index), len(sids)),
missing_value,
dtype=value_field_dtype
)
d_n = date_index[-1].asm8
for col_idx, sid in enumerate(sids):
# events_by_sid[sid] is Series mapping knowledge_date to actual
# event_date. We don't care about the knowledge date for
# computing previous earnings.
values = events_by_sid[sid].values
values = values[values <= dn]
out[date_index.searchsorted(values), col_idx] = values
# events_by_sid[sid] is DataFrame mapping knowledge_date to event
# date and value. We don't care about the knowledge date for computing
# previous values.
df = events_by_sid[sid]
df = df[df[event_date_field] <= d_n]
out[
date_index.searchsorted(df[event_date_field].values), col_idx
] = df[value_field]
frame = pd.DataFrame(out, index=date_index, columns=sids)
frame.ffill(inplace=True)