diff --git a/tests/pipeline/test_buyback_auth.py b/tests/pipeline/test_buyback_auth.py index 6c023e9c..26fff9a3 100644 --- a/tests/pipeline/test_buyback_auth.py +++ b/tests/pipeline/test_buyback_auth.py @@ -462,50 +462,3 @@ class BlazeCashBuybackAuthLoaderNotInteractiveTestCase( self, ).loader_args(dates) return swap_resources_into_scope(bound_expr, {}) - -dtx = pd.date_range('2014-01-01', '2014-01-10') - - -class BuybackAuthLoaderInferTimestampTestCase(TestCase): - # 'fields' needs to match expected fields for the given loader to - # satisfy column check in constructor. - @parameterized.expand([[CashBuybackAuthorizationsLoader, - {BUYBACK_ANNOUNCEMENT_FIELD_NAME: dtx, - CASH_FIELD_NAME: [0] * 10}], - [ShareBuybackAuthorizationsLoader, - {BUYBACK_ANNOUNCEMENT_FIELD_NAME: dtx, - SHARE_COUNT_FIELD_NAME: [0] * 10}]]) - def test_infer_timestamp(self, loader, fields): - events_by_sid = { - # No timestamp column - should index by first given date - 0: pd.DataFrame(fields), - # timestamp column exists - should index by it - 1: pd.DataFrame(dict(fields, **{TS_FIELD_NAME: dtx})) - } - loader = loader( - dtx, - events_by_sid, - infer_timestamps=True, - ) - self.assertEqual( - loader.events_by_sid.keys(), - events_by_sid.keys(), - ) - - # Check that index by first given date has been added - assert_series_equal( - loader.events_by_sid[0][BUYBACK_ANNOUNCEMENT_FIELD_NAME], - pd.Series(index=[dtx[0]] * 10, - data=dtx, - name=BUYBACK_ANNOUNCEMENT_FIELD_NAME), - ) - - # Check that timestamp column was turned into index - modified_events_by_sid_date_col = pd.Series(data=np.array( - events_by_sid[1][BUYBACK_ANNOUNCEMENT_FIELD_NAME]), - index=events_by_sid[1][TS_FIELD_NAME], - name=BUYBACK_ANNOUNCEMENT_FIELD_NAME) - assert_series_equal( - loader.events_by_sid[1][BUYBACK_ANNOUNCEMENT_FIELD_NAME], - modified_events_by_sid_date_col, - ) diff --git a/tests/pipeline/test_earnings.py b/tests/pipeline/test_earnings.py index 185801ba..7f9986b8 100644 --- a/tests/pipeline/test_earnings.py +++ b/tests/pipeline/test_earnings.py @@ -365,39 +365,3 @@ class BlazeEarningsCalendarLoaderNotInteractiveTestCase( self, ).loader_args(dates) return swap_resources_into_scope(bound_expr, {}) - - -class EarningsCalendarLoaderInferTimestampTestCase(TestCase): - def test_infer_timestamp(self): - dtx = pd.date_range('2014-01-01', '2014-01-10') - announcement_dates = { - 0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), - 1: pd.DataFrame( - {TS_FIELD_NAME: dtx, ANNOUNCEMENT_FIELD_NAME: dtx} - ), - } - loader = EarningsCalendarLoader( - dtx, - announcement_dates, - infer_timestamps=True, - ) - self.assertEqual( - loader.events_by_sid.keys(), - announcement_dates.keys(), - ) - assert_series_equal( - loader.events_by_sid[0].loc[:, ANNOUNCEMENT_FIELD_NAME], - pd.Series(index=[dtx[0]] * 10, - data=dtx, - name=ANNOUNCEMENT_FIELD_NAME), - ) - assert_series_equal( - loader.events_by_sid[1][ANNOUNCEMENT_FIELD_NAME], - pd.Series( - index=announcement_dates[1][TS_FIELD_NAME], - data=np.array( - announcement_dates[1][ANNOUNCEMENT_FIELD_NAME] - ), - name=ANNOUNCEMENT_FIELD_NAME - ) - ) diff --git a/tests/pipeline/test_events.py b/tests/pipeline/test_events.py index 8710fd29..62df1ea3 100644 --- a/tests/pipeline/test_events.py +++ b/tests/pipeline/test_events.py @@ -1 +1,191 @@ -__author__ = 'mtydykov' +""" +Tests for setting up an EventsLoader and a BlazeEventsLoader. +""" +from nose_parameterized import parameterized + +import blaze as bz +import pandas as pd +from pandas.util.testing import assert_series_equal, TestCase, assertRaises + +from zipline.pipeline.data import DataSet, Column +from zipline.pipeline.loaders.blaze.events import BlazeEventsLoader +from zipline.pipeline.loaders.events import ( + BAD_DATA_FORMAT_ERROR, + DF_NO_TS_NOT_INFER_TS_ERROR, + DTINDEX_NOT_INFER_TS_ERROR, + EventsLoader, + SERIES_NO_DTINDEX_ERROR, + SID_FIELD_NAME, + TS_FIELD_NAME, + WRONG_COLS_ERROR, +) +from zipline.utils.memoize import lazyval +from zipline.utils.numpy_utils import datetime64ns_dtype + +ABSTRACT_METHODS_ERROR = 'abstract methods concrete_loader' + +DAYS_SINCE_PREV = 'days_since_prev' + +PREVIOUS_ANNOUNCEMENT = 'previous_announcement' + +ANNOUNCEMENT_FIELD_NAME = 'announcement_date' + + +class EventDataSet(DataSet): + previous_announcement = Column(datetime64ns_dtype) + + +class EventDataSetLoader(EventsLoader): + + def __init__(self, + all_dates, + events_by_sid, + infer_timestamps=False, + dataset=EventDataSet): + super(EventDataSetLoader, self).__init__( + all_dates, + events_by_sid, + infer_timestamps=infer_timestamps, + dataset=dataset, + ) + + @property + def expected_cols(self): + return frozenset([ANNOUNCEMENT_FIELD_NAME]) + + @lazyval + def previous_announcement_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement, + ANNOUNCEMENT_FIELD_NAME, + ) + + @lazyval + def next_announcement_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement, + ANNOUNCEMENT_FIELD_NAME, + ) + + +class EventDataSetLoaderNoExpectedCols(EventsLoader): + + def __init__(self, + all_dates, + events_by_sid, + infer_timestamps=False, + dataset=EventDataSet): + super(EventDataSetLoaderNoExpectedCols, self).__init__( + all_dates, + events_by_sid, + infer_timestamps=infer_timestamps, + dataset=dataset, + ) + + +dtx = pd.date_range('2014-01-01', '2014-01-10') + + +def assert_loader_error(events_by_sid, error, msg, infer_timestamps=True): + with assertRaises(error) as context: + EventDataSetLoader( + dtx, events_by_sid, infer_timestamps=infer_timestamps, + ) + assert msg in context.exception + + +class EventLoaderTestCase(TestCase): + + def test_no_expected_cols_defined(self): + events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx})} + assert_loader_error(events_by_sid, TypeError, ABSTRACT_METHODS_ERROR) + + def test_wrong_cols(self): + wrong_col_name = 'some_other_col' + # Test wrong cols (cols != expected) + events_by_sid = {0: pd.DataFrame({wrong_col_name: dtx})} + assert_loader_error( + events_by_sid, ValueError, WRONG_COLS_ERROR % ( + EventDataSetLoader.expected_cols, 0, wrong_col_name + ) + ) + + @parameterized.expand([ + # DataFrame without timestamp column and infer_timestamps = True + [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), True], + # DataFrame with timestamp column + [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx, + TS_FIELD_NAME: dtx}), False], + # DatetimeIndex with infer_timestamps = True + [pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME), True], + # Series with DatetimeIndex as index and infer_timestamps = False + [pd.Series(dtx, index=dtx, name=ANNOUNCEMENT_FIELD_NAME), False] + ]) + def test_conversion_to_df(self, df, infer_timestamps): + + events_by_sid = {0: df} + loader = EventDataSetLoader( + dtx, + events_by_sid, + infer_timestamps=infer_timestamps, + ) + self.assertEqual( + loader.events_by_sid.keys(), + events_by_sid.keys(), + ) + + if infer_timestamps: + expected = pd.Series(index=[dtx[0]] * 10, data=dtx, ) + else: + expected = pd.Series(index=dtx, data=dtx,) + # Check that index by first given date has been added + assert_series_equal( + loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME], + expected, + check_names=False + ) + + @parameterized.expand([ + # DataFrame without timestamp column and infer_timestamps = True + [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), False, + DF_NO_TS_NOT_INFER_TS_ERROR % (TS_FIELD_NAME, 0)], + # DatetimeIndex with infer_timestamps = False + [pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME), False, + DTINDEX_NOT_INFER_TS_ERROR % 0], + # Series with DatetimeIndex as index and infer_timestamps = False + [pd.Series(dtx, name=ANNOUNCEMENT_FIELD_NAME), False, + SERIES_NO_DTINDEX_ERROR % 0], + # Some other data structure that is not expected + [dtx, False, BAD_DATA_FORMAT_ERROR % 0], + [dtx, True, BAD_DATA_FORMAT_ERROR % 0] + ]) + def test_bad_conversion_to_df(self, df, infer_timestamps, msg): + events_by_sid = {0: df} + assert_loader_error(events_by_sid, ValueError, msg, + infer_timestamps=infer_timestamps) + + +class BlazeEventDataSetLoaderNoConcreteLoader(BlazeEventsLoader): + def __init__(self, + expr, + dataset=EventDataSet, + **kwargs): + super( + BlazeEventDataSetLoaderNoConcreteLoader, self + ).__init__(expr, + dataset=dataset, + **kwargs) + + +class BlazeEventLoaderTestCase(TestCase): + # Blaze loader: need to test failure if no concrete loader + def test_no_concrete_loader_defined(self): + with assertRaises(TypeError) as context: + BlazeEventDataSetLoaderNoConcreteLoader( + bz.Data( + pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx, + SID_FIELD_NAME: 0 + }) + ) + ) + assert ABSTRACT_METHODS_ERROR in context.exception diff --git a/zipline/pipeline/loaders/blaze/buyback_auth.py b/zipline/pipeline/loaders/blaze/buyback_auth.py index 42e00dd7..20dbfcda 100644 --- a/zipline/pipeline/loaders/blaze/buyback_auth.py +++ b/zipline/pipeline/loaders/blaze/buyback_auth.py @@ -11,10 +11,10 @@ from zipline.pipeline.loaders.buyback_auth import ( ShareBuybackAuthorizationsLoader, SHARE_COUNT_FIELD_NAME ) -from .events import BlazeEventsCalendarLoader +from .events import BlazeEventsLoader -class BlazeCashBuybackAuthorizationsLoader(BlazeEventsCalendarLoader): +class BlazeCashBuybackAuthorizationsLoader(BlazeEventsLoader): """A pipeline loader for the ``CashBuybackAuthorizations`` dataset that loads data from a blaze expression. @@ -90,7 +90,7 @@ class BlazeCashBuybackAuthorizationsLoader(BlazeEventsCalendarLoader): return CashBuybackAuthorizationsLoader -class BlazeShareBuybackAuthorizationsLoader(BlazeEventsCalendarLoader): +class BlazeShareBuybackAuthorizationsLoader(BlazeEventsLoader): """A pipeline loader for the ``ShareBuybackAuthorizations`` dataset that loads data from a blaze expression. @@ -143,7 +143,6 @@ class BlazeShareBuybackAuthorizationsLoader(BlazeEventsCalendarLoader): SHARE_COUNT_FIELD_NAME, }) - def __init__(self, expr, resources=None, @@ -164,4 +163,4 @@ class BlazeShareBuybackAuthorizationsLoader(BlazeEventsCalendarLoader): @property def concrete_loader(self): - return ShareBuybackAuthorizationsLoader \ No newline at end of file + return ShareBuybackAuthorizationsLoader diff --git a/zipline/pipeline/loaders/blaze/earnings.py b/zipline/pipeline/loaders/blaze/earnings.py index 8588ff03..06d16738 100644 --- a/zipline/pipeline/loaders/blaze/earnings.py +++ b/zipline/pipeline/loaders/blaze/earnings.py @@ -5,10 +5,10 @@ from .core import ( ) from zipline.pipeline.data import EarningsCalendar from zipline.pipeline.loaders.earnings import EarningsCalendarLoader -from .events import BlazeEventsCalendarLoader +from .events import BlazeEventsLoader -class BlazeEarningsCalendarLoader(BlazeEventsCalendarLoader): +class BlazeEarningsCalendarLoader(BlazeEventsLoader): """A pipeline loader for the ``EarningsCalendar`` dataset that loads data from a blaze expression. @@ -76,4 +76,4 @@ class BlazeEarningsCalendarLoader(BlazeEventsCalendarLoader): @property def concrete_loader(self): - return EarningsCalendarLoader \ No newline at end of file + return EarningsCalendarLoader diff --git a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py index 3da16744..70850251 100644 --- a/zipline/pipeline/loaders/blaze/events.py +++ b/zipline/pipeline/loaders/blaze/events.py @@ -18,7 +18,7 @@ from zipline.utils.input_validation import ensure_timezone, optionally from zipline.utils.preprocess import preprocess -class BlazeEventsCalendarLoader(PipelineLoader): +class BlazeEventsLoader(PipelineLoader): """An abstract pipeline loader for the events datasets that loads data from a blaze expression. @@ -82,7 +82,7 @@ class BlazeEventsCalendarLoader(PipelineLoader): @abc.abstractproperty def concrete_loader(self): - raise NotImplementedError("Must specify `concrete_loader`.") + pass def load_adjusted_array(self, columns, dates, assets, mask): data_query_time = self._data_query_time diff --git a/zipline/pipeline/loaders/buyback_auth.py b/zipline/pipeline/loaders/buyback_auth.py index ba207697..eb14689e 100644 --- a/zipline/pipeline/loaders/buyback_auth.py +++ b/zipline/pipeline/loaders/buyback_auth.py @@ -87,7 +87,6 @@ class ShareBuybackAuthorizationsLoader(EventsLoader): return frozenset([BUYBACK_ANNOUNCEMENT_FIELD_NAME, SHARE_COUNT_FIELD_NAME]) - @lazyval def previous_share_count_loader(self): return self._previous_event_value_loader( diff --git a/zipline/pipeline/loaders/events.py b/zipline/pipeline/loaders/events.py index 1cf4f6f4..9c18611a 100644 --- a/zipline/pipeline/loaders/events.py +++ b/zipline/pipeline/loaders/events.py @@ -1,4 +1,4 @@ -import numpy as np +import abc import pandas as pd from six import iteritems from toolz import merge @@ -7,6 +7,23 @@ from .base import PipelineLoader from .frame import DataFrameLoader from .utils import next_date_frame, previous_date_frame, previous_value +WRONG_COLS_ERROR = "Expected columns %s for sid %s but got columns %s." + +BAD_DATA_FORMAT_ERROR = ("Data for sid %s must be in DataFrame, " + "Series, or DatetimeIndex.") + +SERIES_NO_DTINDEX_ERROR = ("Got Series for sid %d, but index was not " + "DatetimeIndex.") + +DTINDEX_NOT_INFER_TS_ERROR = ("Got DatetimeIndex for sid %d.\n" + "Pass `infer_timestamps=True` to use the first " + "date in `all_dates` as implicit timestamp.") + +DF_NO_TS_NOT_INFER_TS_ERROR = ("Got DataFrame without a '%r' column for sid " + "%d.\nPass `infer_timestamps=True` to use the " + "first date in `all_dates` as implicit " + "timestamp.") + TS_FIELD_NAME = "timestamp" SID_FIELD_NAME = "sid" @@ -21,16 +38,29 @@ class EventsLoader(PipelineLoader): ---------- all_dates : pd.DatetimeIndex Index of dates for which we can serve queries. - events_by_sid : dict[int -> pd.DataFrame] - Dict mapping sids to DataFrames representing dates on which events - occurred along with other associated values. + events_by_sid : dict[int -> pd.DataFrame], dict[int -> pd.Series], + or dict[int -> pd.DatetimeIndex] + Dict mapping sids to objects representing dates on which earnings + occurred. - If the DataFrames contain a "timestamp" column, that column is - interpreted as the date on which we learned about the event. + If a dict value is a Series, it's interpreted as a mapping from the + date on which we learned an announcement was coming to the date on + which the announcement was made. + + If a dict value is a DatetimeIndex, it's interpreted as just containing + the dates that announcements were made, and we assume we knew about the + announcement on all prior dates. This mode is only supported if + ``infer_timestamp`` is explicitly passed as a truthy value. + Dict mapping sids to DataFrames, Series, or DatetimeIndexes. + + If the value is a DataFrame, it then represents dates on which events + occurred along with other associated values. If the DataFrame + contains a "timestamp" column, that column is interpreted as the date + on which we learned about the event. If the DataFrames do not contain a + "timestamp" column, we assume we knew about the event on all prior + dates. This mode is only supported if ``infer_timestamp`` is + explicitly passed as a truthy value. - If the DataFrames do not contain a "timestamp" column, we assume we - knew about the event on all prior dates. This mode is only supported - if ``infer_timestamp`` is explicitly passed as a truthy value. infer_timestamps : bool, optional Whether to allow omitting the "timestamp" column. dataset : DataSet @@ -39,12 +69,15 @@ class EventsLoader(PipelineLoader): Set of expected columns for the dataset, without timestamp. """ + @abc.abstractproperty + def expected_cols(self): + pass + def __init__(self, all_dates, events_by_sid, infer_timestamps=False, - dataset=None, - expected_cols=frozenset()): + dataset=None): self.all_dates = all_dates # Do not modify the original in place, since it may be used for other # purposes. @@ -56,25 +89,25 @@ class EventsLoader(PipelineLoader): for k, v in iteritems(events_by_sid): # First, must convert to DataFrame. if isinstance(v, pd.Series): - # If Series was passed, DateTime index is assumed. - self.events_by_sid[k] = pd.DataFrame(v) + if not isinstance(v.index, pd.DatetimeIndex): + raise ValueError( + SERIES_NO_DTINDEX_ERROR % k + ) + self.events_by_sid[k] = v = pd.DataFrame(v) elif isinstance(v, pd.DatetimeIndex): if not infer_timestamps: raise ValueError( - "Got DatetimeIndex for sid %d.\n" - "Pass `infer_timestamps=True` to use the first date in" - " `all_dates` as implicit timestamp." % k + DTINDEX_NOT_INFER_TS_ERROR % k ) - self.events_by_sid[k] = pd.DataFrame(v) - v.index = [dates[0]] * len(v) + self.events_by_sid[k] = v = pd.DataFrame( + v, index=[dates[0]] * len(v) + ) # Already a DataFrame elif isinstance(v, pd.DataFrame): if TS_FIELD_NAME not in v.columns: if not infer_timestamps: raise ValueError( - "Got DataFrame without a '%s' column for sid %d.\n" - "Pass `infer_timestamps=True` to use the first " - "date in `all_dates` as implicit timestamp." % + DF_NO_TS_NOT_INFER_TS_ERROR % (TS_FIELD_NAME, k) ) self.events_by_sid[k] = v = v.copy() @@ -82,17 +115,16 @@ class EventsLoader(PipelineLoader): else: self.events_by_sid[k] = v.set_index(TS_FIELD_NAME) else: - raise ValueError("Data for sid %s must be in DataFrame, " - "Series, or DatetimeIndex." % k) + raise ValueError(BAD_DATA_FORMAT_ERROR % k) # Once data is in a DF, make sure columns are correct. - cols_except_ts = (set(v.columns.values) - + cols_except_ts = (set(v.columns) - {TS_FIELD_NAME} - {SID_FIELD_NAME}) # Check that all columns other than timestamp are as expected. - if cols_except_ts != expected_cols: + if cols_except_ts != self.expected_cols: raise ValueError( - "Expected columns %s for sid %s but got columns %s." % - (expected_cols, k, v.columns.values) + WRONG_COLS_ERROR % + (self.expected_cols, k, v.columns.values) ) self.dataset = dataset @@ -109,17 +141,13 @@ class EventsLoader(PipelineLoader): for column in columns ) - def mk_date_series(self, date_field_name): - return {sid: pd.Series(index=event.index, - data=np.array(event[date_field_name])) - for sid, event in iteritems(self.events_by_sid)} - def _next_event_date_loader(self, next_date_field, event_date_field_name): return DataFrameLoader( next_date_field, next_date_frame( self.all_dates, - self.mk_date_series(event_date_field_name), + self.events_by_sid, + event_date_field_name ), adjustments=None, ) @@ -131,7 +159,8 @@ class EventsLoader(PipelineLoader): prev_date_field, previous_date_frame( self.all_dates, - self.mk_date_series(event_date_field_name), + self.events_by_sid, + event_date_field_name, ), adjustments=None, ) diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 1dd685dc..73d0ad3f 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -8,7 +8,7 @@ from six.moves import zip from zipline.utils.numpy_utils import NaTns, NaTD -def next_date_frame(dates, events_by_sid): +def next_date_frame(dates, events_by_sid, event_date_field_name): """ Make a DataFrame representing the simulated next known date for an event. @@ -20,6 +20,9 @@ def next_date_frame(dates, events_by_sid): Dict mapping sids to a series of dates. Each k:v pair of the series represents the date we learned of the event mapping to the date the event will occur. + event_date_field_name : str + The name of the date field that marks when the event occurred. + Returns ------- next_events: pd.DataFrame @@ -37,7 +40,8 @@ def next_date_frame(dates, events_by_sid): equity: np.full_like(dates, NaTns) for equity in events_by_sid } raw_dates = dates.values - for equity, event_dates in iteritems(events_by_sid): + for equity, df in iteritems(events_by_sid): + event_dates = df[event_date_field_name] data = cols[equity] if not event_dates.index.is_monotonic_increasing: event_dates = event_dates.sort_index() @@ -56,7 +60,7 @@ def next_date_frame(dates, events_by_sid): return pd.DataFrame(index=dates, data=cols) -def previous_date_frame(date_index, events_by_sid): +def previous_date_frame(date_index, events_by_sid, event_date_field_name): """ Make a DataFrame representing simulated next earnings date_index. @@ -64,18 +68,20 @@ def previous_date_frame(date_index, events_by_sid): ---------- date_index : DatetimeIndex. The index of the returned DataFrame. - events_by_sid : dict[int -> DatetimeIndex] - Dict mapping sids to a series of dates. Each k:v pair of the series - represents the date we learned of the event mapping to the date the - event will occur. + events_by_sid : dict[int -> pd.DataFrame] + Dict mapping sids to a DataFrame. The index of the DataFrame + represents the date we learned of the event mapping to the event + data. + event_date_field_name : str + The name of the date field that marks when the event occurred. Returns ------- previous_events: pd.DataFrame A DataFrame where each column is a security from `events_by_sid` where - the values are the dates of the previous event that occured on the date - of the index. Entries falling before the first date will have `NaT` as - the result in the output. + the values are the dates of the previous event that occurred on the + date of the index. Entries falling before the first date will have + `NaT` as the result in the output. See Also -------- @@ -88,7 +94,7 @@ def previous_date_frame(date_index, events_by_sid): # events_by_sid[sid] is Series mapping knowledge_date to actual # event_date. We don't care about the knowledge date for # computing previous earnings. - values = events_by_sid[sid].values + values = events_by_sid[sid][event_date_field_name].values values = values[values <= d_n] out[date_index.searchsorted(values), col_idx] = values