MAINT: modify next_date_frame and prev_date_frame to mirror previous_value.

MAINT: clean up and improve docs.

BUG: fix imports.

MAINT: refactor test.

MAINT: change class name.

MAINT: remove error since won't be reached.

TST: improve and expand tests.

MAINT: change class name.

MAINT: change class name.

MAINT: extract string constants and remove error that won't be reached.

STY: fix line length.

MAINT: undo name change.
This commit is contained in:
Maya Tydykov
2016-02-19 11:48:54 -05:00
parent 2fe126135c
commit ae922bf3ee
9 changed files with 280 additions and 140 deletions
-47
View File
@@ -462,50 +462,3 @@ class BlazeCashBuybackAuthLoaderNotInteractiveTestCase(
self,
).loader_args(dates)
return swap_resources_into_scope(bound_expr, {})
dtx = pd.date_range('2014-01-01', '2014-01-10')


class BuybackAuthLoaderInferTimestampTestCase(TestCase):
    """Timestamp inference for the cash and share buyback loaders."""

    # Each case pairs a loader class with exactly the columns that loader
    # expects, because the loader constructor validates the column set.
    @parameterized.expand([
        [
            CashBuybackAuthorizationsLoader,
            {
                BUYBACK_ANNOUNCEMENT_FIELD_NAME: dtx,
                CASH_FIELD_NAME: [0] * 10,
            },
        ],
        [
            ShareBuybackAuthorizationsLoader,
            {
                BUYBACK_ANNOUNCEMENT_FIELD_NAME: dtx,
                SHARE_COUNT_FIELD_NAME: [0] * 10,
            },
        ],
    ])
    def test_infer_timestamp(self, loader, fields):
        """Frames with and without a timestamp column both get an index."""
        fields_with_ts = dict(fields)
        fields_with_ts[TS_FIELD_NAME] = dtx
        events_by_sid = {
            # No timestamp column - should index by first given date
            0: pd.DataFrame(fields),
            # timestamp column exists - should index by it
            1: pd.DataFrame(fields_with_ts),
        }

        built = loader(dtx, events_by_sid, infer_timestamps=True)
        self.assertEqual(
            built.events_by_sid.keys(),
            events_by_sid.keys(),
        )

        # sid 0: every row is expected to be indexed by the first date.
        inferred = pd.Series(
            data=dtx,
            index=[dtx[0]] * 10,
            name=BUYBACK_ANNOUNCEMENT_FIELD_NAME,
        )
        assert_series_equal(
            built.events_by_sid[0][BUYBACK_ANNOUNCEMENT_FIELD_NAME],
            inferred,
        )

        # sid 1: the timestamp column is expected to have become the index.
        source_frame = events_by_sid[1]
        explicit = pd.Series(
            data=np.array(source_frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME]),
            index=source_frame[TS_FIELD_NAME],
            name=BUYBACK_ANNOUNCEMENT_FIELD_NAME,
        )
        assert_series_equal(
            built.events_by_sid[1][BUYBACK_ANNOUNCEMENT_FIELD_NAME],
            explicit,
        )
-36
View File
@@ -365,39 +365,3 @@ class BlazeEarningsCalendarLoaderNotInteractiveTestCase(
self,
).loader_args(dates)
return swap_resources_into_scope(bound_expr, {})
class EarningsCalendarLoaderInferTimestampTestCase(TestCase):
    """Timestamp inference for ``EarningsCalendarLoader``."""

    def test_infer_timestamp(self):
        """Frames with and without a timestamp column both get an index."""
        dtx = pd.date_range('2014-01-01', '2014-01-10')
        without_ts = pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx})
        with_ts = pd.DataFrame(
            {TS_FIELD_NAME: dtx, ANNOUNCEMENT_FIELD_NAME: dtx}
        )
        announcement_dates = {0: without_ts, 1: with_ts}

        loader = EarningsCalendarLoader(
            dtx,
            announcement_dates,
            infer_timestamps=True,
        )
        self.assertEqual(
            loader.events_by_sid.keys(),
            announcement_dates.keys(),
        )

        # sid 0 had no timestamp column: every row is expected to be indexed
        # by the first date.
        inferred = pd.Series(
            data=dtx,
            index=[dtx[0]] * 10,
            name=ANNOUNCEMENT_FIELD_NAME,
        )
        assert_series_equal(
            loader.events_by_sid[0].loc[:, ANNOUNCEMENT_FIELD_NAME],
            inferred,
        )

        # sid 1 had a timestamp column: it is expected to be the index.
        source_frame = announcement_dates[1]
        explicit = pd.Series(
            data=np.array(source_frame[ANNOUNCEMENT_FIELD_NAME]),
            index=source_frame[TS_FIELD_NAME],
            name=ANNOUNCEMENT_FIELD_NAME,
        )
        assert_series_equal(
            loader.events_by_sid[1][ANNOUNCEMENT_FIELD_NAME],
            explicit,
        )
+191 -1
View File
@@ -1 +1,191 @@
__author__ = 'mtydykov'
"""
Tests for setting up an EventsLoader and a BlazeEventsLoader.
"""
from nose_parameterized import parameterized
import blaze as bz
import pandas as pd
from pandas.util.testing import assert_series_equal, TestCase, assertRaises
from zipline.pipeline.data import DataSet, Column
from zipline.pipeline.loaders.blaze.events import BlazeEventsLoader
from zipline.pipeline.loaders.events import (
BAD_DATA_FORMAT_ERROR,
DF_NO_TS_NOT_INFER_TS_ERROR,
DTINDEX_NOT_INFER_TS_ERROR,
EventsLoader,
SERIES_NO_DTINDEX_ERROR,
SID_FIELD_NAME,
TS_FIELD_NAME,
WRONG_COLS_ERROR,
)
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import datetime64ns_dtype
# Fragment of the TypeError message expected when a loader that lacks
# `concrete_loader` is instantiated.
ABSTRACT_METHODS_ERROR = 'abstract methods concrete_loader'
# NOTE(review): the next two names are not referenced anywhere in the visible
# part of this module - confirm they are still needed.
DAYS_SINCE_PREV = 'days_since_prev'
PREVIOUS_ANNOUNCEMENT = 'previous_announcement'
# Name of the event-date column used by the test frames in this module.
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
class EventDataSet(DataSet):
    """Minimal dataset used to exercise the events loaders in these tests.

    It declares one date column per direction so that the previous- and
    next-announcement loaders below each have their own column to serve.
    """
    previous_announcement = Column(datetime64ns_dtype)
    next_announcement = Column(datetime64ns_dtype)


class EventDataSetLoader(EventsLoader):
    """Concrete ``EventsLoader`` over ``EventDataSet`` for these tests.

    Parameters
    ----------
    all_dates, events_by_sid, infer_timestamps
        Forwarded unchanged to ``EventsLoader.__init__``.
    dataset : DataSet, optional
        Dataset whose columns are served; defaults to ``EventDataSet``.
    """

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=EventDataSet):
        super(EventDataSetLoader, self).__init__(
            all_dates,
            events_by_sid,
            infer_timestamps=infer_timestamps,
            dataset=dataset,
        )

    @property
    def expected_cols(self):
        """The only non-timestamp column accepted in ``events_by_sid``."""
        return frozenset([ANNOUNCEMENT_FIELD_NAME])

    @lazyval
    def previous_announcement_loader(self):
        """Loader for the date of the most recent announcement."""
        return self._previous_event_date_loader(
            self.dataset.previous_announcement,
            ANNOUNCEMENT_FIELD_NAME,
        )

    @lazyval
    def next_announcement_loader(self):
        """Loader for the date of the upcoming announcement."""
        # This used to duplicate `previous_announcement_loader` verbatim; it
        # now uses the "next" helper and its own column.
        # NOTE(review): presumably EventsLoader looks loaders up by
        # `<column name>_loader` - confirm against `EventsLoader`.
        return self._next_event_date_loader(
            self.dataset.next_announcement,
            ANNOUNCEMENT_FIELD_NAME,
        )
class EventDataSetLoaderNoExpectedCols(EventsLoader):
    """``EventsLoader`` subclass that does not define ``expected_cols``.

    The constructor only forwards its arguments to ``EventsLoader``, with
    ``EventDataSet`` as the default dataset.
    """

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=EventDataSet):
        parent = super(EventDataSetLoaderNoExpectedCols, self)
        parent.__init__(
            all_dates,
            events_by_sid,
            infer_timestamps=infer_timestamps,
            dataset=dataset,
        )
# Ten consecutive daily timestamps shared by the tests in this module, used
# both as the loaders' `all_dates` and as the event dates themselves.
dtx = pd.date_range('2014-01-01', '2014-01-10')
# Build an EventDataSetLoader over `dtx` from `events_by_sid` and expect the
# construction to raise `error`; `msg` is the text the error should carry.
# NOTE(review): this view of the file has lost its indentation, so it is not
# possible to tell whether the final `assert` sits inside the `with` block. If
# it does, it can never run once the constructor raises and `msg` goes
# unchecked - confirm.
# NOTE(review): `msg in context.exception` tests membership against whatever
# object the context manager exposes as `.exception`, not against a string;
# checking `msg in str(<caught exception>)` is probably what was meant -
# confirm against pandas' `assertRaises` before relying on the message check.
def assert_loader_error(events_by_sid, error, msg, infer_timestamps=True):
with assertRaises(error) as context:
EventDataSetLoader(
dtx, events_by_sid, infer_timestamps=infer_timestamps,
)
assert msg in context.exception
class EventLoaderTestCase(TestCase):
    """Checks how ``EventsLoader`` validates and converts ``events_by_sid``."""

    def test_no_expected_cols_defined(self):
        """A subclass without ``expected_cols`` cannot be instantiated."""
        events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx})}
        # Instantiate the subclass that really lacks `expected_cols`; going
        # through `assert_loader_error` would build `EventDataSetLoader`,
        # which defines the property and so is not abstract.
        # NOTE(review): relies on `TestCase` providing unittest's
        # `assertRaises` context manager - `self.assertEqual` is already used
        # in this class, so this is presumably a unittest.TestCase subclass.
        with self.assertRaises(TypeError) as context:
            EventDataSetLoaderNoExpectedCols(dtx, events_by_sid)
        # Only the member name is checked, because the surrounding wording of
        # the abstract-class error differs between Python versions.
        self.assertIn('expected_cols', str(context.exception))

    def test_wrong_cols(self):
        """Columns other than the expected ones are rejected."""
        wrong_col_name = 'some_other_col'
        # Test wrong cols (cols != expected)
        events_by_sid = {0: pd.DataFrame({wrong_col_name: dtx})}
        # Format the message from the same kinds of values the loader uses:
        # the expected frozenset, the sid, and the frame's column array.
        # Reading `expected_cols` off the class would yield a property
        # object rather than the set of names.
        expected_msg = WRONG_COLS_ERROR % (
            frozenset([ANNOUNCEMENT_FIELD_NAME]),
            0,
            events_by_sid[0].columns.values,
        )
        assert_loader_error(events_by_sid, ValueError, expected_msg)

    @parameterized.expand([
        # DataFrame without timestamp column and infer_timestamps = True
        [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), True],
        # DataFrame with timestamp column and infer_timestamps = False
        [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx,
                       TS_FIELD_NAME: dtx}), False],
        # DatetimeIndex with infer_timestamps = True
        [pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME), True],
        # Series with DatetimeIndex as index and infer_timestamps = False
        [pd.Series(dtx, index=dtx, name=ANNOUNCEMENT_FIELD_NAME), False],
    ])
    def test_conversion_to_df(self, df, infer_timestamps):
        """Every supported input type ends up as a date-indexed frame."""
        events_by_sid = {0: df}
        loader = EventDataSetLoader(
            dtx,
            events_by_sid,
            infer_timestamps=infer_timestamps,
        )
        self.assertEqual(
            loader.events_by_sid.keys(),
            events_by_sid.keys(),
        )
        # With inferred timestamps every row is indexed by the first date;
        # otherwise the index is the timestamp data that was supplied.
        if infer_timestamps:
            expected_index = [dtx[0]] * len(dtx)
        else:
            expected_index = dtx
        expected = pd.Series(index=expected_index, data=dtx)
        assert_series_equal(
            loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME],
            expected,
            check_names=False,
        )

    @parameterized.expand([
        # DataFrame without timestamp column and infer_timestamps = False
        [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), False,
         DF_NO_TS_NOT_INFER_TS_ERROR % (TS_FIELD_NAME, 0)],
        # DatetimeIndex with infer_timestamps = False
        [pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME), False,
         DTINDEX_NOT_INFER_TS_ERROR % 0],
        # Series whose index is not a DatetimeIndex
        [pd.Series(dtx, name=ANNOUNCEMENT_FIELD_NAME), False,
         SERIES_NO_DTINDEX_ERROR % 0],
        # Some other data structure that is not expected. A plain list is
        # used because `dtx` itself is a DatetimeIndex, which is one of the
        # supported input types.
        [list(dtx), False, BAD_DATA_FORMAT_ERROR % 0],
        [list(dtx), True, BAD_DATA_FORMAT_ERROR % 0],
    ])
    def test_bad_conversion_to_df(self, df, infer_timestamps, msg):
        """Unsupported or under-specified inputs raise ``ValueError``."""
        events_by_sid = {0: df}
        assert_loader_error(events_by_sid, ValueError, msg,
                            infer_timestamps=infer_timestamps)
class BlazeEventDataSetLoaderNoConcreteLoader(BlazeEventsLoader):
    """``BlazeEventsLoader`` subclass that does not define ``concrete_loader``.

    The constructor only forwards its arguments to ``BlazeEventsLoader``,
    with ``EventDataSet`` as the default dataset.
    """

    def __init__(self, expr, dataset=EventDataSet, **kwargs):
        parent = super(BlazeEventDataSetLoaderNoConcreteLoader, self)
        parent.__init__(expr, dataset=dataset, **kwargs)
class BlazeEventLoaderTestCase(TestCase):
    """Checks that a blaze events loader must supply ``concrete_loader``."""

    # Blaze loader: need to test failure if no concrete loader
    def test_no_concrete_loader_defined(self):
        """Instantiating a loader without ``concrete_loader`` must fail."""
        # Build the expression outside the `assertRaises` block so that a
        # TypeError from blaze cannot be mistaken for the expected one.
        expr = bz.Data(
            pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx,
                          SID_FIELD_NAME: 0})
        )
        # NOTE(review): relies on `TestCase` providing unittest's
        # `assertRaises` context manager, whose `.exception` is the caught
        # instance - confirm `TestCase` is a unittest.TestCase subclass.
        with self.assertRaises(TypeError) as context:
            BlazeEventDataSetLoaderNoConcreteLoader(expr)
        # Compare against the message text, after the `with` block, so the
        # check actually runs.
        # NOTE(review): the wording of the abstract-class error differs
        # between Python versions - confirm ABSTRACT_METHODS_ERROR matches
        # the interpreters this suite runs on.
        self.assertIn(ABSTRACT_METHODS_ERROR, str(context.exception))
@@ -11,10 +11,10 @@ from zipline.pipeline.loaders.buyback_auth import (
ShareBuybackAuthorizationsLoader,
SHARE_COUNT_FIELD_NAME
)
from .events import BlazeEventsCalendarLoader
from .events import BlazeEventsLoader
class BlazeCashBuybackAuthorizationsLoader(BlazeEventsCalendarLoader):
class BlazeCashBuybackAuthorizationsLoader(BlazeEventsLoader):
"""A pipeline loader for the ``CashBuybackAuthorizations`` dataset that loads
data from a blaze expression.
@@ -90,7 +90,7 @@ class BlazeCashBuybackAuthorizationsLoader(BlazeEventsCalendarLoader):
return CashBuybackAuthorizationsLoader
class BlazeShareBuybackAuthorizationsLoader(BlazeEventsCalendarLoader):
class BlazeShareBuybackAuthorizationsLoader(BlazeEventsLoader):
"""A pipeline loader for the ``ShareBuybackAuthorizations`` dataset that loads
data from a blaze expression.
@@ -143,7 +143,6 @@ class BlazeShareBuybackAuthorizationsLoader(BlazeEventsCalendarLoader):
SHARE_COUNT_FIELD_NAME,
})
def __init__(self,
expr,
resources=None,
@@ -164,4 +163,4 @@ class BlazeShareBuybackAuthorizationsLoader(BlazeEventsCalendarLoader):
@property
def concrete_loader(self):
return ShareBuybackAuthorizationsLoader
return ShareBuybackAuthorizationsLoader
+3 -3
View File
@@ -5,10 +5,10 @@ from .core import (
)
from zipline.pipeline.data import EarningsCalendar
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
from .events import BlazeEventsCalendarLoader
from .events import BlazeEventsLoader
class BlazeEarningsCalendarLoader(BlazeEventsCalendarLoader):
class BlazeEarningsCalendarLoader(BlazeEventsLoader):
"""A pipeline loader for the ``EarningsCalendar`` dataset that loads
data from a blaze expression.
@@ -76,4 +76,4 @@ class BlazeEarningsCalendarLoader(BlazeEventsCalendarLoader):
@property
def concrete_loader(self):
return EarningsCalendarLoader
return EarningsCalendarLoader
+2 -2
View File
@@ -18,7 +18,7 @@ from zipline.utils.input_validation import ensure_timezone, optionally
from zipline.utils.preprocess import preprocess
class BlazeEventsCalendarLoader(PipelineLoader):
class BlazeEventsLoader(PipelineLoader):
"""An abstract pipeline loader for the events datasets that loads
data from a blaze expression.
@@ -82,7 +82,7 @@ class BlazeEventsCalendarLoader(PipelineLoader):
@abc.abstractproperty
def concrete_loader(self):
raise NotImplementedError("Must specify `concrete_loader`.")
pass
def load_adjusted_array(self, columns, dates, assets, mask):
data_query_time = self._data_query_time
-1
View File
@@ -87,7 +87,6 @@ class ShareBuybackAuthorizationsLoader(EventsLoader):
return frozenset([BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME])
@lazyval
def previous_share_count_loader(self):
return self._previous_event_value_loader(
+63 -34
View File
@@ -1,4 +1,4 @@
import numpy as np
import abc
import pandas as pd
from six import iteritems
from toolz import merge
@@ -7,6 +7,23 @@ from .base import PipelineLoader
from .frame import DataFrameLoader
from .utils import next_date_frame, previous_date_frame, previous_value
WRONG_COLS_ERROR = "Expected columns %s for sid %s but got columns %s."
BAD_DATA_FORMAT_ERROR = ("Data for sid %s must be in DataFrame, "
"Series, or DatetimeIndex.")
SERIES_NO_DTINDEX_ERROR = ("Got Series for sid %d, but index was not "
"DatetimeIndex.")
DTINDEX_NOT_INFER_TS_ERROR = ("Got DatetimeIndex for sid %d.\n"
"Pass `infer_timestamps=True` to use the first "
"date in `all_dates` as implicit timestamp.")
DF_NO_TS_NOT_INFER_TS_ERROR = ("Got DataFrame without a '%r' column for sid "
"%d.\nPass `infer_timestamps=True` to use the "
"first date in `all_dates` as implicit "
"timestamp.")
TS_FIELD_NAME = "timestamp"
SID_FIELD_NAME = "sid"
@@ -21,16 +38,29 @@ class EventsLoader(PipelineLoader):
----------
all_dates : pd.DatetimeIndex
Index of dates for which we can serve queries.
events_by_sid : dict[int -> pd.DataFrame]
Dict mapping sids to DataFrames representing dates on which events
occurred along with other associated values.
events_by_sid : dict[int -> pd.DataFrame], dict[int -> pd.Series],
or dict[int -> pd.DatetimeIndex]
Dict mapping sids to objects representing dates on which earnings
occurred.
If the DataFrames contain a "timestamp" column, that column is
interpreted as the date on which we learned about the event.
If a dict value is a Series, it's interpreted as a mapping from the
date on which we learned an announcement was coming to the date on
which the announcement was made.
If a dict value is a DatetimeIndex, it's interpreted as just containing
the dates that announcements were made, and we assume we knew about the
announcement on all prior dates. This mode is only supported if
``infer_timestamp`` is explicitly passed as a truthy value.
Dict mapping sids to DataFrames, Series, or DatetimeIndexes.
If the value is a DataFrame, it then represents dates on which events
occurred along with other associated values. If the DataFrame
contains a "timestamp" column, that column is interpreted as the date
on which we learned about the event. If the DataFrames do not contain a
"timestamp" column, we assume we knew about the event on all prior
dates. This mode is only supported if ``infer_timestamp`` is
explicitly passed as a truthy value.
If the DataFrames do not contain a "timestamp" column, we assume we
knew about the event on all prior dates. This mode is only supported
if ``infer_timestamp`` is explicitly passed as a truthy value.
infer_timestamps : bool, optional
Whether to allow omitting the "timestamp" column.
dataset : DataSet
@@ -39,12 +69,15 @@ class EventsLoader(PipelineLoader):
Set of expected columns for the dataset, without timestamp.
"""
@abc.abstractproperty
def expected_cols(self):
pass
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=None,
expected_cols=frozenset()):
dataset=None):
self.all_dates = all_dates
# Do not modify the original in place, since it may be used for other
# purposes.
@@ -56,25 +89,25 @@ class EventsLoader(PipelineLoader):
for k, v in iteritems(events_by_sid):
# First, must convert to DataFrame.
if isinstance(v, pd.Series):
# If Series was passed, DateTime index is assumed.
self.events_by_sid[k] = pd.DataFrame(v)
if not isinstance(v.index, pd.DatetimeIndex):
raise ValueError(
SERIES_NO_DTINDEX_ERROR % k
)
self.events_by_sid[k] = v = pd.DataFrame(v)
elif isinstance(v, pd.DatetimeIndex):
if not infer_timestamps:
raise ValueError(
"Got DatetimeIndex for sid %d.\n"
"Pass `infer_timestamps=True` to use the first date in"
" `all_dates` as implicit timestamp." % k
DTINDEX_NOT_INFER_TS_ERROR % k
)
self.events_by_sid[k] = pd.DataFrame(v)
v.index = [dates[0]] * len(v)
self.events_by_sid[k] = v = pd.DataFrame(
v, index=[dates[0]] * len(v)
)
# Already a DataFrame
elif isinstance(v, pd.DataFrame):
if TS_FIELD_NAME not in v.columns:
if not infer_timestamps:
raise ValueError(
"Got DataFrame without a '%s' column for sid %d.\n"
"Pass `infer_timestamps=True` to use the first "
"date in `all_dates` as implicit timestamp." %
DF_NO_TS_NOT_INFER_TS_ERROR %
(TS_FIELD_NAME, k)
)
self.events_by_sid[k] = v = v.copy()
@@ -82,17 +115,16 @@ class EventsLoader(PipelineLoader):
else:
self.events_by_sid[k] = v.set_index(TS_FIELD_NAME)
else:
raise ValueError("Data for sid %s must be in DataFrame, "
"Series, or DatetimeIndex." % k)
raise ValueError(BAD_DATA_FORMAT_ERROR % k)
# Once data is in a DF, make sure columns are correct.
cols_except_ts = (set(v.columns.values) -
cols_except_ts = (set(v.columns) -
{TS_FIELD_NAME} -
{SID_FIELD_NAME})
# Check that all columns other than timestamp are as expected.
if cols_except_ts != expected_cols:
if cols_except_ts != self.expected_cols:
raise ValueError(
"Expected columns %s for sid %s but got columns %s." %
(expected_cols, k, v.columns.values)
WRONG_COLS_ERROR %
(self.expected_cols, k, v.columns.values)
)
self.dataset = dataset
@@ -109,17 +141,13 @@ class EventsLoader(PipelineLoader):
for column in columns
)
def mk_date_series(self, date_field_name):
return {sid: pd.Series(index=event.index,
data=np.array(event[date_field_name]))
for sid, event in iteritems(self.events_by_sid)}
def _next_event_date_loader(self, next_date_field, event_date_field_name):
return DataFrameLoader(
next_date_field,
next_date_frame(
self.all_dates,
self.mk_date_series(event_date_field_name),
self.events_by_sid,
event_date_field_name
),
adjustments=None,
)
@@ -131,7 +159,8 @@ class EventsLoader(PipelineLoader):
prev_date_field,
previous_date_frame(
self.all_dates,
self.mk_date_series(event_date_field_name),
self.events_by_sid,
event_date_field_name,
),
adjustments=None,
)
+17 -11
View File
@@ -8,7 +8,7 @@ from six.moves import zip
from zipline.utils.numpy_utils import NaTns, NaTD
def next_date_frame(dates, events_by_sid):
def next_date_frame(dates, events_by_sid, event_date_field_name):
"""
Make a DataFrame representing the simulated next known date for an event.
@@ -20,6 +20,9 @@ def next_date_frame(dates, events_by_sid):
Dict mapping sids to a series of dates. Each k:v pair of the series
represents the date we learned of the event mapping to the date the
event will occur.
event_date_field_name : str
The name of the date field that marks when the event occurred.
Returns
-------
next_events: pd.DataFrame
@@ -37,7 +40,8 @@ def next_date_frame(dates, events_by_sid):
equity: np.full_like(dates, NaTns) for equity in events_by_sid
}
raw_dates = dates.values
for equity, event_dates in iteritems(events_by_sid):
for equity, df in iteritems(events_by_sid):
event_dates = df[event_date_field_name]
data = cols[equity]
if not event_dates.index.is_monotonic_increasing:
event_dates = event_dates.sort_index()
@@ -56,7 +60,7 @@ def next_date_frame(dates, events_by_sid):
return pd.DataFrame(index=dates, data=cols)
def previous_date_frame(date_index, events_by_sid):
def previous_date_frame(date_index, events_by_sid, event_date_field_name):
"""
Make a DataFrame representing simulated next earnings date_index.
@@ -64,18 +68,20 @@ def previous_date_frame(date_index, events_by_sid):
----------
date_index : DatetimeIndex.
The index of the returned DataFrame.
events_by_sid : dict[int -> DatetimeIndex]
Dict mapping sids to a series of dates. Each k:v pair of the series
represents the date we learned of the event mapping to the date the
event will occur.
events_by_sid : dict[int -> pd.DataFrame]
Dict mapping sids to a DataFrame. The index of the DataFrame
represents the date we learned of the event mapping to the event
data.
event_date_field_name : str
The name of the date field that marks when the event occurred.
Returns
-------
previous_events: pd.DataFrame
A DataFrame where each column is a security from `events_by_sid` where
the values are the dates of the previous event that occured on the date
of the index. Entries falling before the first date will have `NaT` as
the result in the output.
the values are the dates of the previous event that occurred on the
date of the index. Entries falling before the first date will have
`NaT` as the result in the output.
See Also
--------
@@ -88,7 +94,7 @@ def previous_date_frame(date_index, events_by_sid):
# events_by_sid[sid] is Series mapping knowledge_date to actual
# event_date. We don't care about the knowledge date for
# computing previous earnings.
values = events_by_sid[sid].values
values = events_by_sid[sid][event_date_field_name].values
values = values[values <= d_n]
out[date_index.searchsorted(values), col_idx] = values