MAINT: refactor constants to common file.

TST: refactor constants and clarify comments.

TST: clean up/extract constants in tests.

MAINT: add/modify constants.

MAINT: remove obsolete and alphabetize.

MAINT: clean up.

MAINT: modify constants to have named params.

MAINT: extract constants.

STY: fix indentation.

MAINT: refactor common part out of buyback_auth.

MAINT: refactor earnings test logic.
This commit is contained in:
Maya Tydykov
2016-02-22 09:15:02 -05:00
parent ae922bf3ee
commit e257dc1da9
12 changed files with 403 additions and 595 deletions
+62 -224
View File
@@ -12,8 +12,19 @@ import numpy as np
import pandas as pd
from pandas.util.testing import assert_series_equal
from six import iteritems
from tests.pipeline.test_events import param_dates, EventLoaderCommonTest
from zipline.pipeline import Pipeline
from zipline.pipeline.common import(
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME,
DAYS_SINCE_PREV,
PREVIOUS_BUYBACK_ANNOUNCEMENT,
PREVIOUS_BUYBACK_CASH,
PREVIOUS_BUYBACK_SHARE_COUNT,
SHARE_COUNT_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME)
from zipline.pipeline.data import (CashBuybackAuthorizations,
ShareBuybackAuthorizations)
from zipline.pipeline.engine import SimplePipelineEngine
@@ -26,11 +37,6 @@ from zipline.pipeline.loaders.buyback_auth import \
from zipline.pipeline.loaders.blaze import (
BlazeCashBuybackAuthorizationsLoader,
BlazeShareBuybackAuthorizationsLoader,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
CASH_FIELD_NAME
)
from zipline.utils.numpy_utils import make_datetime64D, NaTD
from zipline.utils.test_utils import (
@@ -41,34 +47,26 @@ from zipline.utils.test_utils import (
)
sids = A, B, C, D, E = range(5)
equity_info = make_simple_equity_info(
sids,
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
buyback_authorizations = {
# K1--K2--A1--A2--SC1--SC2--V1--V2.
A: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']),
buyback_authorizations = [
# K1--K2--A1--A2.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-15',
'2014-01-20']),
SHARE_COUNT_FIELD_NAME: [1, 15],
CASH_FIELD_NAME: [10, 20]
}),
# K1--K2--E2--E1.
B: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']),
# K1--K2--A2--A1.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([
'2014-01-20', '2014-01-15'
]),
SHARE_COUNT_FIELD_NAME: [7, 13], CASH_FIELD_NAME: [10, 22]
}),
# K1--E1--K2--E2.
C: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-15']),
# K1--A1--K2--A2.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']),
BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([
'2014-01-10', '2014-01-20'
]),
@@ -76,196 +74,34 @@ buyback_authorizations = {
CASH_FIELD_NAME: [4, 7]
}),
# K1 == K2.
D: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05'] * 2),
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2),
BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([
'2014-01-10', '2014-01-15'
]),
SHARE_COUNT_FIELD_NAME: [6, 23],
CASH_FIELD_NAME: [1, 2]
}),
E: pd.DataFrame(
columns=["timestamp",
pd.DataFrame(
columns=[TS_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME,
CASH_FIELD_NAME],
dtype='datetime64[ns]'
),
}
# Must be a list - can't use generator since this needs to be used more than
# once.
param_dates = list(gen_calendars(
'2014-01-01',
'2014-01-31',
critical_dates=pd.to_datetime([
'2014-01-05',
'2014-01-10',
'2014-01-15',
'2014-01-20',
], utc=True),
))
]
def zip_with_floats(dates, flts):
return pd.Series(flts, index=dates).astype('float')
def num_days_between(dates, start_date, end_date):
return num_days_in_range(dates, start_date, end_date)
def zip_with_dates(index_dates, dts):
return pd.Series(pd.to_datetime(dts), index=index_dates)
class BuybackAuthLoaderCommonTest(object):
"""
Tests for loading the buyback authorization announcement data.
"""
def loader_args(self, dates):
"""Construct the base buyback authorizations object to pass to the
loader.
Parameters
----------
dates : pd.DatetimeIndex
The dates we can serve.
Returns
-------
args : tuple[any]
The arguments to forward to the loader positionally.
"""
return dates, self.buyback_authorizations
def setup_engine(self, dates):
"""
Make a Pipeline Enigne object based on the given dates.
"""
loader = self.loader_type(*self.loader_args(dates))
return SimplePipelineEngine(lambda _: loader, dates, self.finder)
def setup_expected_cols(self, dates):
"""
Make expectation functions for the given dates calendar.
This exists to make it easy to test our various cases with critical
dates missing from the calendar.
"""
num_days_between_for_dates = partial(num_days_between, dates)
zip_with_dates_for_dates = partial(zip_with_dates, dates)
_expected_previous_buyback_announcement = pd.DataFrame({
A: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
B: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
C: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-09') +
['2014-01-10'] * num_days_between_for_dates('2014-01-10',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
D: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-09') +
['2014-01-10'] * num_days_between_for_dates('2014-01-10',
'2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
None),
),
E: zip_with_dates_for_dates(['NaT'] * len(dates)),
}, index=dates)
_expected_previous_busday_offsets = self._compute_busday_offsets(
_expected_previous_buyback_announcement
)
# Common cols for buyback authorization datasets are announcement
# date and days since previous.
self.cols[
'previous_buyback_announcement'
] = _expected_previous_buyback_announcement
self.cols['days_since_prev'] = _expected_previous_busday_offsets
@staticmethod
def _compute_busday_offsets(announcement_dates):
"""
Compute expected business day offsets from a DataFrame of announcement
dates.
"""
# Column-vector of dates on which factor `compute` will be called.
raw_call_dates = announcement_dates.index.values.astype(
'datetime64[D]'
)[:, None]
# 2D array of dates containining expected nexg announcement.
raw_announce_dates = (
announcement_dates.values.astype('datetime64[D]')
)
# Set NaTs to 0 temporarily because busday_count doesn't support NaT.
# We fill these entries with NaNs later.
whereNaT = raw_announce_dates == NaTD
raw_announce_dates[whereNaT] = make_datetime64D(0)
# The abs call here makes it so that we can use this function to
# compute offsets for both next and previous earnings (previous
# earnings offsets come back negative).
expected = abs(np.busday_count(
raw_call_dates,
raw_announce_dates
).astype(float))
expected[whereNaT] = np.nan
return pd.DataFrame(
data=expected,
columns=announcement_dates.columns,
index=announcement_dates.index,
)
def _test_compute_buyback_auth(self, dates):
engine = self.setup_engine(dates)
self.setup_expected_cols(dates)
pipe = Pipeline(
columns=self.pipeline_columns
)
result = engine.run_pipeline(
pipe,
start_date=dates[0],
end_date=dates[-1],
)
for sid in sids:
for col_name in self.cols.keys():
assert_series_equal(result[col_name].xs(sid, level=1),
self.cols[col_name][sid],
check_names=False)
class CashBuybackAuthLoaderTestCase(TestCase, BuybackAuthLoaderCommonTest):
class CashBuybackAuthLoaderTestCase(TestCase, EventLoaderCommonTest):
"""
Test for cash buyback authorizations dataset.
"""
pipeline_columns = {
'previous_buyback_cash':
('%s' % PREVIOUS_BUYBACK_CASH):
CashBuybackAuthorizations.previous_value.latest,
'previous_buyback_announcement':
PREVIOUS_BUYBACK_ANNOUNCEMENT:
CashBuybackAuthorizations.previous_announcement_date.latest,
'days_since_prev':
DAYS_SINCE_PREV:
BusinessDaysSincePreviousCashBuybackAuth(),
}
@@ -273,12 +109,11 @@ class CashBuybackAuthLoaderTestCase(TestCase, BuybackAuthLoaderCommonTest):
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
cls.finder = stack.enter_context(
tmp_asset_finder(equities=equity_info),
tmp_asset_finder(equities=cls.equity_info),
)
cls.cols = {}
cls.buyback_authorizations = {sid: df.drop(SHARE_COUNT_FIELD_NAME, 1)
for sid, df in
iteritems(buyback_authorizations)}
cls.dataset = {sid: df.drop(SHARE_COUNT_FIELD_NAME, 1)
for sid, df in enumerate(buyback_authorizations)}
cls.loader_type = CashBuybackAuthorizationsLoader
@classmethod
@@ -286,51 +121,53 @@ class CashBuybackAuthLoaderTestCase(TestCase, BuybackAuthLoaderCommonTest):
cls._cleanup_stack.close()
def setup(self, dates):
zip_with_floats_dates = partial(zip_with_floats, dates)
num_days_between_dates = partial(num_days_between, dates)
super(CashBuybackAuthLoaderTestCase, self).setup_expected_cols(dates)
zip_with_floats_dates = partial(self.zip_with_floats, dates)
num_days_between_dates = partial(self.num_days_between, dates)
_expected_previous_cash = pd.DataFrame({
# TODO if the next knowledge date is 10, why is the range
# until 15?
A: zip_with_floats_dates(
['NaN'] * num_days_between(dates, None, '2014-01-14') +
0: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-14') +
[10] * num_days_between_dates('2014-01-15', '2014-01-19') +
[20] * num_days_between_dates('2014-01-20', None)
),
B: zip_with_floats_dates(
1: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-14') +
[22] * num_days_between_dates('2014-01-15', '2014-01-19') +
[10] * num_days_between_dates('2014-01-20', None)
),
C: zip_with_floats_dates(
2: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-09') +
[4] * num_days_between_dates('2014-01-10', '2014-01-19') +
[7] * num_days_between_dates('2014-01-20', None)
),
D: zip_with_floats_dates(
3: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-09') +
[1] * num_days_between_dates('2014-01-10', '2014-01-14') +
[2] * num_days_between_dates('2014-01-15', None)
),
E: zip_with_floats_dates(['NaN'] * len(dates)),
4: zip_with_floats_dates(['NaN'] * len(dates)),
}, index=dates)
self.cols['previous_buyback_cash'] = _expected_previous_cash
self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] = self.get_expected_previous(
dates)
self.cols[PREVIOUS_BUYBACK_CASH] = _expected_previous_cash
@parameterized.expand(param_dates)
def test_compute_cash_buyback_auth(self, dates):
self._test_compute_buyback_auth(dates)
class ShareBuybackAuthLoaderTestCase(BuybackAuthLoaderCommonTest, TestCase):
class ShareBuybackAuthLoaderTestCase(EventLoaderCommonTest, TestCase):
"""
Test for share buyback authorizations dataset.
"""
pipeline_columns = {
'previous_buyback_share_count':
('%s' % PREVIOUS_BUYBACK_SHARE_COUNT):
ShareBuybackAuthorizations.previous_share_count.latest,
'previous_buyback_announcement':
('%s' % PREVIOUS_BUYBACK_ANNOUNCEMENT):
ShareBuybackAuthorizations.previous_announcement_date.latest,
'days_since_prev':
DAYS_SINCE_PREV:
BusinessDaysSincePreviousShareBuybackAuth(),
}
@@ -338,12 +175,12 @@ class ShareBuybackAuthLoaderTestCase(BuybackAuthLoaderCommonTest, TestCase):
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
cls.finder = stack.enter_context(
tmp_asset_finder(equities=equity_info),
tmp_asset_finder(equities=cls.equity_info),
)
cls.cols = {}
cls.buyback_authorizations = {sid: df.drop(CASH_FIELD_NAME, 1)
cls.dataset = {sid: df.drop(CASH_FIELD_NAME, 1)
for sid, df in
iteritems(buyback_authorizations)}
enumerate(buyback_authorizations)}
cls.loader_type = ShareBuybackAuthorizationsLoader
@classmethod
@@ -351,35 +188,36 @@ class ShareBuybackAuthLoaderTestCase(BuybackAuthLoaderCommonTest, TestCase):
cls._cleanup_stack.close()
def setup(self, dates):
zip_with_floats_dates = partial(zip_with_floats, dates)
num_days_between_dates = partial(num_days_between, dates)
super(ShareBuybackAuthLoaderTestCase, self).setup_expected_cols(dates)
zip_with_floats_dates = partial(self.zip_with_floats, dates)
num_days_between_dates = partial(self.num_days_between, dates)
_expected_previous_buyback_share_count = pd.DataFrame({
A: zip_with_floats_dates(
0: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-14') +
[1] * num_days_between_dates('2014-01-15', '2014-01-19') +
[15] * num_days_between_dates('2014-01-20', None)
),
B: zip_with_floats_dates(
1: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-14') +
[13] * num_days_between_dates('2014-01-15', '2014-01-19') +
[7] * num_days_between_dates('2014-01-20', None)
),
C: zip_with_floats_dates(
2: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-09') +
[3] * num_days_between_dates('2014-01-10', '2014-01-19') +
[1] * num_days_between_dates('2014-01-20', None)
),
D: zip_with_floats_dates(
3: zip_with_floats_dates(
['NaN'] * num_days_between_dates(None, '2014-01-09') +
[6] * num_days_between_dates('2014-01-10', '2014-01-14') +
[23] * num_days_between_dates('2014-01-15', None)
),
E: zip_with_floats_dates(['NaN'] * len(dates)),
4: zip_with_floats_dates(['NaN'] * len(dates)),
}, index=dates)
self.cols[
'previous_buyback_share_count'
PREVIOUS_BUYBACK_SHARE_COUNT
] = _expected_previous_buyback_share_count
self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] = \
self.get_expected_previous(dates)
@parameterized.expand(param_dates)
def test_compute_share_buyback_auth(self, dates):
+68 -278
View File
@@ -7,173 +7,101 @@ import blaze as bz
from blaze.compute.core import swap_resources_into_scope
from contextlib2 import ExitStack
from nose_parameterized import parameterized
import numpy as np
import pandas as pd
from pandas.util.testing import assert_series_equal
from six import iteritems
from tests.pipeline.test_events import EventLoaderCommonTest, param_dates
from zipline.pipeline import Pipeline
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
DAYS_SINCE_PREV,
DAYS_TO_NEXT,
NEXT_ANNOUNCEMENT,
PREVIOUS_ANNOUNCEMENT,
SID_FIELD_NAME,
TS_FIELD_NAME
)
from zipline.pipeline.data import EarningsCalendar
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.factors.events import (
BusinessDaysSincePreviousEarnings,
BusinessDaysUntilNextEarnings,
)
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
from zipline.pipeline.loaders.blaze import (
ANNOUNCEMENT_FIELD_NAME,
BlazeEarningsCalendarLoader,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.utils.numpy_utils import make_datetime64D, NaTD
from zipline.utils.test_utils import (
gen_calendars,
make_simple_equity_info,
num_days_in_range,
tmp_asset_finder,
)
earnings_dates = [
# K1--K2--E1--E2.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-15',
'2014-01-20'])
}),
# K1--K2--E2--E1.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-20',
'2014-01-15'])
}),
# K1--E1--K2--E2.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-10',
'2014-01-20'])
}),
# K1 == K2.
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-10',
'2014-01-15'])
}),
pd.DataFrame({
TS_FIELD_NAME: pd.to_datetime([]),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([])
})
]
class EarningsCalendarLoaderTestCase(TestCase):
class EarningsCalendarLoaderTestCase(TestCase, EventLoaderCommonTest):
"""
Tests for loading the earnings announcement data.
"""
loader_type = EarningsCalendarLoader
pipeline_columns = {
NEXT_ANNOUNCEMENT: EarningsCalendar.next_announcement.latest,
PREVIOUS_ANNOUNCEMENT: EarningsCalendar.previous_announcement.latest,
DAYS_SINCE_PREV: BusinessDaysSincePreviousEarnings(),
DAYS_TO_NEXT: BusinessDaysUntilNextEarnings(),
}
@classmethod
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
cls.sids = A, B, C, D, E = range(5)
equity_info = make_simple_equity_info(
cls.sids,
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
cls.cols = {}
cls.dataset = {sid: df for sid, df in enumerate(earnings_dates)}
cls.finder = stack.enter_context(
tmp_asset_finder(equities=equity_info),
)
cls.earnings_dates = {
# K1--K2--E1--E2.
A: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-15',
'2014-01-20'])
}),
# K1--K2--E2--E1.
B: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-20',
'2014-01-15'])
}),
# K1--E1--K2--E2.
C: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-15']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-10',
'2014-01-20'])
}),
# K1 == K2.
D: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05'] * 2),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-10',
'2014-01-15'])
}),
E: pd.DataFrame({
"timestamp": pd.to_datetime([]),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([])
})
}
cls.loader_type = EarningsCalendarLoader
@classmethod
def tearDownClass(cls):
cls._cleanup_stack.close()
def loader_args(self, dates):
"""Construct the base earnings announcements object to pass to the
loader.
Parameters
----------
dates : pd.DatetimeIndex
The dates we can serve.
Returns
-------
args : tuple[any]
The arguments to forward to the loader positionally.
"""
return dates, self.earnings_dates
def setup(self, dates):
"""
Make a PipelineEngine and expectation functions for the given dates
calendar.
_expected_next_announce = self.get_expected_next_event_dates(dates)
This exists to make it easy to test our various cases with critical
dates missing from the calendar.
"""
A, B, C, D, E = self.sids
def num_days_between(start_date, end_date):
return num_days_in_range(dates, start_date, end_date)
def zip_with_dates(dts):
return pd.Series(pd.to_datetime(dts), index=dates)
_expected_next_announce = pd.DataFrame({
A: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
['2014-01-15'] * num_days_between('2014-01-05', '2014-01-15') +
['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') +
['NaT'] * num_days_between('2014-01-21', None)
),
B: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
['2014-01-20'] * num_days_between('2014-01-05', '2014-01-09') +
['2014-01-15'] * num_days_between('2014-01-10', '2014-01-15') +
['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') +
['NaT'] * num_days_between('2014-01-21', None)
),
C: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') +
['NaT'] * num_days_between('2014-01-11', '2014-01-14') +
['2014-01-20'] * num_days_between('2014-01-15', '2014-01-20') +
['NaT'] * num_days_between('2014-01-21', None)
),
D: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') +
['2014-01-15'] * num_days_between('2014-01-11', '2014-01-15') +
['NaT'] * num_days_between('2014-01-16', None)
),
E: zip_with_dates(['NaT'] * len(dates)),
}, index=dates)
_expected_previous_announce = pd.DataFrame({
A: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-14') +
['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') +
['2014-01-20'] * num_days_between('2014-01-20', None)
),
B: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-14') +
['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') +
['2014-01-20'] * num_days_between('2014-01-20', None)
),
C: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-09') +
['2014-01-10'] * num_days_between('2014-01-10', '2014-01-19') +
['2014-01-20'] * num_days_between('2014-01-20', None)
),
D: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-09') +
['2014-01-10'] * num_days_between('2014-01-10', '2014-01-14') +
['2014-01-15'] * num_days_between('2014-01-15', None)
),
E: zip_with_dates(['NaT'] * len(dates)),
}, index=dates)
_expected_previous_announce = self.get_expected_previous_event_dates(dates)
_expected_next_busday_offsets = self._compute_busday_offsets(
_expected_next_announce
@@ -181,164 +109,21 @@ class EarningsCalendarLoaderTestCase(TestCase):
_expected_previous_busday_offsets = self._compute_busday_offsets(
_expected_previous_announce
)
self.cols[PREVIOUS_ANNOUNCEMENT] = _expected_previous_announce
self.cols[NEXT_ANNOUNCEMENT] = _expected_next_announce
self.cols[DAYS_TO_NEXT] = _expected_next_busday_offsets
self.cols[DAYS_SINCE_PREV] = _expected_previous_busday_offsets
def expected_next_announce(sid):
"""
Return the expected next announcement dates for ``sid``.
"""
return _expected_next_announce[sid]
def expected_next_busday_offset(sid):
"""
Return the expected number of days to the next announcement for
``sid``.
"""
return _expected_next_busday_offsets[sid]
def expected_previous_announce(sid):
"""
Return the expected previous announcement dates for ``sid``.
"""
return _expected_previous_announce[sid]
def expected_previous_busday_offset(sid):
"""
Return the expected number of days to the next announcement for
``sid``.
"""
return _expected_previous_busday_offsets[sid]
loader = self.loader_type(*self.loader_args(dates))
engine = SimplePipelineEngine(lambda _: loader, dates, self.finder)
return (
engine,
expected_next_announce,
expected_next_busday_offset,
expected_previous_announce,
expected_previous_busday_offset,
)
@staticmethod
def _compute_busday_offsets(announcement_dates):
"""
Compute expected business day offsets from a DataFrame of announcement
dates.
"""
# Column-vector of dates on which factor `compute` will be called.
raw_call_dates = announcement_dates.index.values.astype(
'datetime64[D]'
)[:, None]
# 2D array of dates containining expected nexg announcement.
raw_announce_dates = (
announcement_dates.values.astype('datetime64[D]')
)
# Set NaTs to 0 temporarily because busday_count doesn't support NaT.
# We fill these entries with NaNs later.
whereNaT = raw_announce_dates == NaTD
raw_announce_dates[whereNaT] = make_datetime64D(0)
# The abs call here makes it so that we can use this function to
# compute offsets for both next and previous earnings (previous
# earnings offsets come back negative).
expected = abs(np.busday_count(
raw_call_dates,
raw_announce_dates
).astype(float))
expected[whereNaT] = np.nan
return pd.DataFrame(
data=expected,
columns=announcement_dates.columns,
index=announcement_dates.index,
)
@parameterized.expand(gen_calendars(
'2014-01-01',
'2014-01-31',
critical_dates=pd.to_datetime([
'2014-01-05',
'2014-01-10',
'2014-01-15',
'2014-01-20',
], utc=True),
))
@parameterized.expand(param_dates)
def test_compute_earnings(self, dates):
(
engine,
expected_next,
expected_next_busday_offset,
expected_previous,
expected_previous_busday_offset,
) = self.setup(dates)
pipe = Pipeline(
columns={
'next': EarningsCalendar.next_announcement.latest,
'previous': EarningsCalendar.previous_announcement.latest,
'days_to_next': BusinessDaysUntilNextEarnings(),
'days_since_prev': BusinessDaysSincePreviousEarnings(),
}
)
result = engine.run_pipeline(
pipe,
start_date=dates[0],
end_date=dates[-1],
)
computed_next = result['next']
computed_previous = result['previous']
computed_next_busday_offset = result['days_to_next']
computed_previous_busday_offset = result['days_since_prev']
# NaTs in next/prev should correspond to NaNs in offsets.
assert_series_equal(
computed_next.isnull(),
computed_next_busday_offset.isnull(),
check_names=False,
)
assert_series_equal(
computed_previous.isnull(),
computed_previous_busday_offset.isnull(),
check_names=False,
)
for sid in self.sids:
assert_series_equal(
computed_next.xs(sid, level=1),
expected_next(sid),
sid,
check_names=False,
)
assert_series_equal(
computed_previous.xs(sid, level=1),
expected_previous(sid),
sid,
check_names=False,
)
assert_series_equal(
computed_next_busday_offset.xs(sid, level=1),
expected_next_busday_offset(sid),
sid,
check_names=False,
)
assert_series_equal(
computed_previous_busday_offset.xs(sid, level=1),
expected_previous_busday_offset(sid),
sid,
check_names=False,
)
self._test_compute(dates)
class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase):
loader_type = BlazeEarningsCalendarLoader
@classmethod
def setUpClass(cls):
super(BlazeEarningsCalendarLoaderTestCase, cls).setUpClass()
cls.loader_type = BlazeEarningsCalendarLoader
def loader_args(self, dates):
_, mapping = super(
@@ -359,6 +144,11 @@ class BlazeEarningsCalendarLoaderNotInteractiveTestCase(
BlazeEarningsCalendarLoaderTestCase):
"""Test case for passing a non-interactive symbol and a dict of resources.
"""
@classmethod
def setUpClass(cls):
super(BlazeEarningsCalendarLoaderNotInteractiveTestCase, cls).setUpClass()
cls.loader_type = BlazeEarningsCalendarLoader
def loader_args(self, dates):
(bound_expr,) = super(
BlazeEarningsCalendarLoaderNotInteractiveTestCase,
+201 -28
View File
@@ -1,12 +1,20 @@
"""
Tests for setting up an EventsLoader and a BlazeEventsLoader.
"""
from functools import partial
from nose_parameterized import parameterized
import blaze as bz
import numpy as np
import pandas as pd
from pandas.util.testing import assert_series_equal, TestCase, assertRaises
from pandas.util.testing import assert_series_equal, TestCase
from zipline.pipeline import SimplePipelineEngine, Pipeline
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME
)
from zipline.pipeline.data import DataSet, Column
from zipline.pipeline.loaders.blaze.events import BlazeEventsLoader
from zipline.pipeline.loaders.events import (
@@ -15,21 +23,15 @@ from zipline.pipeline.loaders.events import (
DTINDEX_NOT_INFER_TS_ERROR,
EventsLoader,
SERIES_NO_DTINDEX_ERROR,
SID_FIELD_NAME,
TS_FIELD_NAME,
WRONG_COLS_ERROR,
)
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import datetime64ns_dtype
from zipline.utils.numpy_utils import datetime64ns_dtype, NaTD, make_datetime64D
from zipline.utils.test_utils import gen_calendars, num_days_in_range, \
make_simple_equity_info
ABSTRACT_METHODS_ERROR = 'abstract methods concrete_loader'
DAYS_SINCE_PREV = 'days_since_prev'
PREVIOUS_ANNOUNCEMENT = 'previous_announcement'
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
class EventDataSet(DataSet):
previous_announcement = Column(datetime64ns_dtype)
@@ -86,28 +88,32 @@ class EventDataSetLoaderNoExpectedCols(EventsLoader):
dtx = pd.date_range('2014-01-01', '2014-01-10')
def assert_loader_error(events_by_sid, error, msg, infer_timestamps=True):
with assertRaises(error) as context:
def assert_loader_error(events_by_sid, error, msg, infer_timestamps):
with TestCase.assertRaises(error) as context:
EventDataSetLoader(
dtx, events_by_sid, infer_timestamps=infer_timestamps,
)
assert msg in context.exception
TestCase.assertTrue(msg in context.exception)
class EventLoaderTestCase(TestCase):
def test_no_expected_cols_defined(self):
events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx})}
assert_loader_error(events_by_sid, TypeError, ABSTRACT_METHODS_ERROR)
assert_loader_error(events_by_sid, TypeError, ABSTRACT_METHODS_ERROR,
True)
def test_wrong_cols(self):
wrong_col_name = 'some_other_col'
# Test wrong cols (cols != expected)
events_by_sid = {0: pd.DataFrame({wrong_col_name: dtx})}
assert_loader_error(
events_by_sid, ValueError, WRONG_COLS_ERROR % (
EventDataSetLoader.expected_cols, 0, wrong_col_name
)
events_by_sid, ValueError, WRONG_COLS_ERROR.format(
expected_columns=EventDataSetLoader.expected_cols,
sid=0,
resulting_columns=wrong_col_name,
),
True
)
@parameterized.expand([
@@ -135,29 +141,36 @@ class EventLoaderTestCase(TestCase):
)
if infer_timestamps:
expected = pd.Series(index=[dtx[0]] * 10, data=dtx, )
expected = pd.Series(index=[dtx[0]] * 10, data=dtx,
name=ANNOUNCEMENT_FIELD_NAME)
else:
expected = pd.Series(index=dtx, data=dtx,)
expected = pd.Series(index=dtx, data=dtx,
name=ANNOUNCEMENT_FIELD_NAME)
expected.index.name = TS_FIELD_NAME
# Check that index by first given date has been added
assert_series_equal(
loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME],
expected,
check_names=False
)
@parameterized.expand([
# DataFrame without timestamp column and infer_timestamps = True
[pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), False,
DF_NO_TS_NOT_INFER_TS_ERROR % (TS_FIELD_NAME, 0)],
[pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}),
False,
DF_NO_TS_NOT_INFER_TS_ERROR.format(
timestamp_column_name=TS_FIELD_NAME,
sid=0
)
],
# DatetimeIndex with infer_timestamps = False
[pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME), False,
DTINDEX_NOT_INFER_TS_ERROR % 0],
DTINDEX_NOT_INFER_TS_ERROR.format(sid=0)],
# Series with DatetimeIndex as index and infer_timestamps = False
[pd.Series(dtx, name=ANNOUNCEMENT_FIELD_NAME), False,
SERIES_NO_DTINDEX_ERROR % 0],
SERIES_NO_DTINDEX_ERROR.format(sid=0)],
# Some other data structure that is not expected
[dtx, False, BAD_DATA_FORMAT_ERROR % 0],
[dtx, True, BAD_DATA_FORMAT_ERROR % 0]
[dtx, False, BAD_DATA_FORMAT_ERROR.format(sid=0)],
[dtx, True, BAD_DATA_FORMAT_ERROR.format(sid=0)]
])
def test_bad_conversion_to_df(self, df, infer_timestamps, msg):
events_by_sid = {0: df}
@@ -180,7 +193,7 @@ class BlazeEventDataSetLoaderNoConcreteLoader(BlazeEventsLoader):
class BlazeEventLoaderTestCase(TestCase):
# Blaze loader: need to test failure if no concrete loader
def test_no_concrete_loader_defined(self):
with assertRaises(TypeError) as context:
with TestCase.assertRaises(TypeError) as context:
BlazeEventDataSetLoaderNoConcreteLoader(
bz.Data(
pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx,
@@ -188,4 +201,164 @@ class BlazeEventLoaderTestCase(TestCase):
})
)
)
assert ABSTRACT_METHODS_ERROR in context.exception
TestCase.assertTrue(ABSTRACT_METHODS_ERROR in context.exception)
##########################
# Must be a list - can't use generator since this needs to be used more than
# once.
param_dates = list(gen_calendars(
'2014-01-01',
'2014-01-31',
critical_dates=pd.to_datetime([
'2014-01-05',
'2014-01-10',
'2014-01-15',
'2014-01-20',
], utc=True),
))
class EventLoaderCommonTest(object):
sids = A, B, C, D, E = range(5)
equity_info = make_simple_equity_info(
sids,
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
def zip_with_floats(self, dates, flts):
return pd.Series(flts, index=dates).astype('float')
def num_days_between(self, dates, start_date, end_date):
return num_days_in_range(dates, start_date, end_date)
def zip_with_dates(self, index_dates, dts):
return pd.Series(pd.to_datetime(dts), index=index_dates)
def loader_args(self, dates):
"""Construct the base object to pass to the loader.
Parameters
----------
dates : pd.DatetimeIndex
The dates we can serve.
Returns
-------
args : tuple[any]
The arguments to forward to the loader positionally.
"""
return dates, self.dataset
def setup_engine(self, dates):
"""
Make a Pipeline Enigne object based on the given dates.
"""
loader = self.loader_type(*self.loader_args(dates))
return SimplePipelineEngine(lambda _: loader, dates, self.finder)
def get_expected_previous(self, dates):
num_days_between_for_dates = partial(self.num_days_between, dates)
zip_with_dates_for_dates = partial(self.zip_with_dates, dates)
return pd.DataFrame({
0: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
1: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
2: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-09') +
['2014-01-10'] * num_days_between_for_dates('2014-01-10',
'2014-01-19') +
['2014-01-20'] * num_days_between_for_dates('2014-01-20',
None),
),
3: zip_with_dates_for_dates(
['NaT'] * num_days_between_for_dates(None, '2014-01-09') +
['2014-01-10'] * num_days_between_for_dates('2014-01-10',
'2014-01-14') +
['2014-01-15'] * num_days_between_for_dates('2014-01-15',
None),
),
4: zip_with_dates_for_dates(['NaT'] * len(dates)),
}, index=dates)
@staticmethod
def _compute_busday_offsets(announcement_dates):
"""
Compute expected business day offsets from a DataFrame of announcement
dates.
"""
# Column-vector of dates on which factor `compute` will be called.
raw_call_dates = announcement_dates.index.values.astype(
'datetime64[D]'
)[:, None]
# 2D array of dates containining expected nexg announcement.
raw_announce_dates = (
announcement_dates.values.astype('datetime64[D]')
)
# Set NaTs to 0 temporarily because busday_count doesn't support NaT.
# We fill these entries with NaNs later.
whereNaT = raw_announce_dates == NaTD
raw_announce_dates[whereNaT] = make_datetime64D(0)
# The abs call here makes it so that we can use this function to
# compute offsets for both next and previous earnings (previous
# earnings offsets come back negative).
expected = abs(np.busday_count(
raw_call_dates,
raw_announce_dates
).astype(float))
expected[whereNaT] = np.nan
return pd.DataFrame(
data=expected,
columns=announcement_dates.columns,
index=announcement_dates.index,
)
def _test_compute_buyback_auth(self, dates):
engine = self.setup_engine(dates)
self.setup(dates)
pipe = Pipeline(
columns=self.pipeline_columns
)
result = engine.run_pipeline(
pipe,
start_date=dates[0],
end_date=dates[-1],
)
for sid in self.sids:
for col_name in self.cols.keys():
assert_series_equal(result[col_name].xs(sid, level=1),
self.cols[col_name][sid],
check_names=False)
+17
View File
@@ -0,0 +1,17 @@
"""
Common constants for Pipeline.
"""
AD_FIELD_NAME = 'asof_date'
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
CASH_FIELD_NAME = 'cash'
BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_date'
DAYS_SINCE_PREV = 'days_since_prev'
DAYS_TO_NEXT = 'days_to_next'
NEXT_ANNOUNCEMENT = 'next_announcement'
PREVIOUS_ANNOUNCEMENT = 'previous_announcement'
PREVIOUS_BUYBACK_ANNOUNCEMENT = 'previous_buyback_announcement'
PREVIOUS_BUYBACK_CASH = 'previous_buyback_cash'
PREVIOUS_BUYBACK_SHARE_COUNT = 'previous_buyback_share_count'
SHARE_COUNT_FIELD_NAME = 'share_count'
SID_FIELD_NAME = 'sid'
TS_FIELD_NAME = 'timestamp'
+2 -17
View File
@@ -4,37 +4,22 @@ from .buyback_auth import (
BlazeShareBuybackAuthorizationsLoader
)
from .core import (
AD_FIELD_NAME,
BlazeLoader,
NoDeltasWarning,
SID_FIELD_NAME,
TS_FIELD_NAME,
from_blaze,
global_loader,
)
from .buyback_auth import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME,
CASH_FIELD_NAME
)
from .earnings import (
ANNOUNCEMENT_FIELD_NAME,
BlazeEarningsCalendarLoader,
)
__all__ = (
'AD_FIELD_NAME',
'ANNOUNCEMENT_FIELD_NAME',
'BlazeCashBuybackAuthorizationsLoader',
'BlazeEarningsCalendarLoader',
'BlazeLoader',
'BlazeShareBuybackAuthorizationsLoader',
'BUYBACK_ANNOUNCEMENT_FIELD_NAME',
'NoDeltasWarning',
'SHARE_COUNT_FIELD_NAME',
'SID_FIELD_NAME',
'TS_FIELD_NAME',
'CASH_FIELD_NAME',
'from_blaze',
'global_loader',
'NoDeltasWarning',
)
@@ -2,14 +2,16 @@ from .core import (
TS_FIELD_NAME,
SID_FIELD_NAME,
)
from zipline.pipeline.common import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME,
SHARE_COUNT_FIELD_NAME
)
from zipline.pipeline.data import (CashBuybackAuthorizations,
ShareBuybackAuthorizations)
from zipline.pipeline.loaders.buyback_auth import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CashBuybackAuthorizationsLoader,
CASH_FIELD_NAME,
ShareBuybackAuthorizationsLoader,
SHARE_COUNT_FIELD_NAME
)
from .events import BlazeEventsLoader
+5 -5
View File
@@ -158,7 +158,11 @@ from toolz import (
)
import toolz.curried.operator as op
from zipline.pipeline.common import (
AD_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME
)
from zipline.pipeline.data.dataset import DataSet, Column
from zipline.pipeline.loaders.utils import (
check_data_query_args,
@@ -179,10 +183,6 @@ from zipline.utils.pandas_utils import sort_values
from zipline.utils.preprocess import preprocess
AD_FIELD_NAME = 'asof_date'
TS_FIELD_NAME = 'timestamp'
SID_FIELD_NAME = 'sid'
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
valid_deltas_node_types = (
bz.expr.Field,
bz.expr.ReLabel,
+2 -4
View File
@@ -1,7 +1,7 @@
from .core import (
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
SID_FIELD_NAME
)
from zipline.pipeline.data import EarningsCalendar
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
@@ -26,8 +26,6 @@ class BlazeEarningsCalendarLoader(BlazeEventsLoader):
The timezeone to use for the data query cutoff.
dataset: DataSet
The DataSet object for which this loader loads data.
concrete_loader: EventsLoader
The reference loader to use for this dataset.
Notes
-----
+7 -4
View File
@@ -3,12 +3,14 @@ import abc
from datashape import istabular
from .core import (
TS_FIELD_NAME,
SID_FIELD_NAME,
bind_expression_to_resources,
ffill_query_in_range,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.common import (
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.utils import (
check_data_query_args,
normalize_data_query_bounds,
@@ -36,6 +38,7 @@ class BlazeEventsLoader(PipelineLoader):
The timezeone to use for the data query cutoff.
dataset : DataSet
The DataSet object for which this loader loads data.
Notes
-----
The expression should have a tabular dshape of::
@@ -47,7 +50,7 @@ class BlazeEventsLoader(PipelineLoader):
And other dataset-specific fields, where each row of the table is a
record including the sid to identify the company, the timestamp where we
learned about the announcement, and the date when the earnings will be
learned about the announcement, and the date when the earnings will be z
announced.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
@@ -82,7 +85,7 @@ class BlazeEventsLoader(PipelineLoader):
@abc.abstractproperty
def concrete_loader(self):
pass
NotImplementedError('concrete_loader')
def load_adjusted_array(self, columns, dates, assets, mask):
data_query_time = self._data_query_time
+9 -9
View File
@@ -1,5 +1,5 @@
"""
Reference implementation for EarningsCalendar loaders.
Reference implementation for buyback auth loaders.
"""
from ..data.buyback_auth import (
@@ -7,18 +7,18 @@ from ..data.buyback_auth import (
ShareBuybackAuthorizations
)
from .events import EventsLoader
from zipline.pipeline.common import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CASH_FIELD_NAME,
SHARE_COUNT_FIELD_NAME
)
from zipline.utils.memoize import lazyval
BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_date'
SHARE_COUNT_FIELD_NAME = 'share_count'
CASH_FIELD_NAME = 'cash'
class CashBuybackAuthorizationsLoader(EventsLoader):
"""
Reference loader for
:class:`zipline.pipeline.data.earnings.CashBuybackAuthorizations`.
:class:`zipline.pipeline.data.CashBuybackAuthorizations`.
events_by_sid: dict[sid -> pd.DataFrame(knowledge date,
event date, cash value)]
@@ -60,13 +60,13 @@ class CashBuybackAuthorizationsLoader(EventsLoader):
class ShareBuybackAuthorizationsLoader(EventsLoader):
"""
Reference loader for
:class:`zipline.pipeline.data.earnings.ShareBuybackAuthorizations`.
:class:`zipline.pipeline.data.ShareBuybackAuthorizations`.
Does not currently support adjustments to the dates of known buyback
authorizations.
events_by_sid: dict[sid -> pd.DataFrame(knowledge date,
event date, share value)]
event date, share value)]
"""
+1 -2
View File
@@ -4,10 +4,9 @@ Reference implementation for EarningsCalendar loaders.
from ..data.earnings import EarningsCalendar
from .events import EventsLoader
from zipline.pipeline.common import ANNOUNCEMENT_FIELD_NAME
from zipline.utils.memoize import lazyval
ANNOUNCEMENT_FIELD_NAME = "announcement_date"
class EarningsCalendarLoader(EventsLoader):
+24 -21
View File
@@ -6,27 +6,27 @@ from toolz import merge
from .base import PipelineLoader
from .frame import DataFrameLoader
from .utils import next_date_frame, previous_date_frame, previous_value
from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME
WRONG_COLS_ERROR = "Expected columns %s for sid %s but got columns %s."
WRONG_COLS_ERROR = "Expected columns {expected_columns} for sid {sid} but " \
"got columns {resulting_columns}."
BAD_DATA_FORMAT_ERROR = ("Data for sid %s must be in DataFrame, "
BAD_DATA_FORMAT_ERROR = ("Data for sid {sid} must be in DataFrame, "
"Series, or DatetimeIndex.")
SERIES_NO_DTINDEX_ERROR = ("Got Series for sid %d, but index was not "
SERIES_NO_DTINDEX_ERROR = ("Got Series for sid {sid}, but index was not "
"DatetimeIndex.")
DTINDEX_NOT_INFER_TS_ERROR = ("Got DatetimeIndex for sid %d.\n"
DTINDEX_NOT_INFER_TS_ERROR = ("Got DatetimeIndex for sid {sid}.\n"
"Pass `infer_timestamps=True` to use the first "
"date in `all_dates` as implicit timestamp.")
DF_NO_TS_NOT_INFER_TS_ERROR = ("Got DataFrame without a '%r' column for sid "
"%d.\nPass `infer_timestamps=True` to use the "
DF_NO_TS_NOT_INFER_TS_ERROR = ("Got DataFrame without a '{"
"timestamp_column_name}' column for sid {sid}."
"\nPass `infer_timestamps=True` to use the "
"first date in `all_dates` as implicit "
"timestamp.")
TS_FIELD_NAME = "timestamp"
SID_FIELD_NAME = "sid"
class EventsLoader(PipelineLoader):
"""
@@ -38,8 +38,7 @@ class EventsLoader(PipelineLoader):
----------
all_dates : pd.DatetimeIndex
Index of dates for which we can serve queries.
events_by_sid : dict[int -> pd.DataFrame], dict[int -> pd.Series],
or dict[int -> pd.DatetimeIndex]
events_by_sid : dict[int -> pd.DataFrame or pd.Series or pd.DatetimeIndex]
Dict mapping sids to objects representing dates on which earnings
occurred.
@@ -65,13 +64,12 @@ class EventsLoader(PipelineLoader):
Whether to allow omitting the "timestamp" column.
dataset : DataSet
The DataSet object for which this loader loads data.
expected_cols : frozenset
Set of expected columns for the dataset, without timestamp.
"""
@abc.abstractproperty
def expected_cols(self):
pass
raise NotImplemented('expected_cols')
def __init__(self,
all_dates,
@@ -91,13 +89,13 @@ class EventsLoader(PipelineLoader):
if isinstance(v, pd.Series):
if not isinstance(v.index, pd.DatetimeIndex):
raise ValueError(
SERIES_NO_DTINDEX_ERROR % k
SERIES_NO_DTINDEX_ERROR.format(sid=k)
)
self.events_by_sid[k] = v = pd.DataFrame(v)
elif isinstance(v, pd.DatetimeIndex):
if not infer_timestamps:
raise ValueError(
DTINDEX_NOT_INFER_TS_ERROR % k
DTINDEX_NOT_INFER_TS_ERROR.format(sid=k)
)
self.events_by_sid[k] = v = pd.DataFrame(
v, index=[dates[0]] * len(v)
@@ -107,15 +105,17 @@ class EventsLoader(PipelineLoader):
if TS_FIELD_NAME not in v.columns:
if not infer_timestamps:
raise ValueError(
DF_NO_TS_NOT_INFER_TS_ERROR %
(TS_FIELD_NAME, k)
DF_NO_TS_NOT_INFER_TS_ERROR.format(
timestamp_column_name=TS_FIELD_NAME,
sid=k
)
)
self.events_by_sid[k] = v = v.copy()
v.index = [dates[0]] * len(v)
else:
self.events_by_sid[k] = v.set_index(TS_FIELD_NAME)
else:
raise ValueError(BAD_DATA_FORMAT_ERROR % k)
raise ValueError(BAD_DATA_FORMAT_ERROR.format(sid=k))
# Once data is in a DF, make sure columns are correct.
cols_except_ts = (set(v.columns) -
{TS_FIELD_NAME} -
@@ -123,8 +123,11 @@ class EventsLoader(PipelineLoader):
# Check that all columns other than timestamp are as expected.
if cols_except_ts != self.expected_cols:
raise ValueError(
WRONG_COLS_ERROR %
(self.expected_cols, k, v.columns.values)
WRONG_COLS_ERROR .format(
expected_columns=self.expected_cols,
sid=k,
resulting_columns=v.columns.values
)
)
self.dataset = dataset