diff --git a/tests/pipeline/test_buyback_auth.py b/tests/pipeline/test_buyback_auth.py new file mode 100644 index 00000000..fbdd9797 --- /dev/null +++ b/tests/pipeline/test_buyback_auth.py @@ -0,0 +1,286 @@ +""" +Tests for the reference loader for Buyback Authorizations. +""" +from functools import partial +from unittest import TestCase + +import blaze as bz +from blaze.compute.core import swap_resources_into_scope +from contextlib2 import ExitStack +import pandas as pd +from six import iteritems +from tests.pipeline.test_events import EventLoaderCommonMixin, DATE_FIELD_NAME + +from zipline.pipeline.common import( + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + CASH_FIELD_NAME, + DAYS_SINCE_PREV, + PREVIOUS_BUYBACK_ANNOUNCEMENT, + PREVIOUS_BUYBACK_CASH, + PREVIOUS_BUYBACK_SHARE_COUNT, + SHARE_COUNT_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME) +from zipline.pipeline.data import (CashBuybackAuthorizations, + ShareBuybackAuthorizations) +from zipline.pipeline.factors.events import ( + BusinessDaysSincePreviousCashBuybackAuth, + BusinessDaysSincePreviousShareBuybackAuth +) +from zipline.pipeline.loaders.buyback_auth import \ + CashBuybackAuthorizationsLoader, ShareBuybackAuthorizationsLoader +from zipline.pipeline.loaders.blaze import ( + BlazeCashBuybackAuthorizationsLoader, + BlazeShareBuybackAuthorizationsLoader, +) +from zipline.utils.test_utils import ( + tmp_asset_finder, +) + + +buyback_authorizations = [ + # K1--K2--A1--A2. + pd.DataFrame({ + SHARE_COUNT_FIELD_NAME: [1, 15], + CASH_FIELD_NAME: [10, 20] + }), + # K1--K2--A2--A1. + pd.DataFrame({ + SHARE_COUNT_FIELD_NAME: [7, 13], + CASH_FIELD_NAME: [10, 22] + }), + # K1--A1--K2--A2. + pd.DataFrame({ + SHARE_COUNT_FIELD_NAME: [3, 1], + CASH_FIELD_NAME: [4, 7] + }), + # K1 == K2. 
+ pd.DataFrame({ + SHARE_COUNT_FIELD_NAME: [6, 23], + CASH_FIELD_NAME: [1, 2] + }), + pd.DataFrame( + columns=[SHARE_COUNT_FIELD_NAME, + CASH_FIELD_NAME], + dtype='datetime64[ns]' + ), +] + + +def create_buyback_auth_tst_frame(cases, field_to_drop): + buyback_auth_df = { + sid: + pd.concat([df, buyback_authorizations[sid]], axis=1).drop( + field_to_drop, 1) + for sid, df + in enumerate(case.rename(columns={DATE_FIELD_NAME: + BUYBACK_ANNOUNCEMENT_FIELD_NAME} + ) + for case in cases + ) + } + return buyback_auth_df + + +class CashBuybackAuthLoaderTestCase(TestCase, EventLoaderCommonMixin): + """ + Test for cash buyback authorizations dataset. + """ + pipeline_columns = { + PREVIOUS_BUYBACK_CASH: + CashBuybackAuthorizations.previous_value.latest, + PREVIOUS_BUYBACK_ANNOUNCEMENT: + CashBuybackAuthorizations.previous_announcement_date.latest, + DAYS_SINCE_PREV: + BusinessDaysSincePreviousCashBuybackAuth(), + } + + @classmethod + def setUpClass(cls): + cls._cleanup_stack = stack = ExitStack() + cls.finder = stack.enter_context( + tmp_asset_finder(equities=cls.equity_info), + ) + cls.cols = {} + cls.dataset = create_buyback_auth_tst_frame(cls.event_dates_cases, + SHARE_COUNT_FIELD_NAME) + cls.loader_type = CashBuybackAuthorizationsLoader + + @classmethod + def tearDownClass(cls): + cls._cleanup_stack.close() + + def setup(self, dates): + zip_with_floats_dates = partial(self.zip_with_floats, dates) + num_days_between_dates = partial(self.num_days_between, dates) + _expected_previous_cash = pd.DataFrame({ + 0: zip_with_floats_dates( + ['NaN'] * num_days_between_dates(None, '2014-01-14') + + [10] * num_days_between_dates('2014-01-15', '2014-01-19') + + [20] * num_days_between_dates('2014-01-20', None) + ), + 1: zip_with_floats_dates( + ['NaN'] * num_days_between_dates(None, '2014-01-14') + + [22] * num_days_between_dates('2014-01-15', '2014-01-19') + + [10] * num_days_between_dates('2014-01-20', None) + ), + 2: zip_with_floats_dates( + ['NaN'] * num_days_between_dates(None, 
'2014-01-09') + + [4] * num_days_between_dates('2014-01-10', '2014-01-19') + + [7] * num_days_between_dates('2014-01-20', None) + ), + 3: zip_with_floats_dates( + ['NaN'] * num_days_between_dates(None, '2014-01-09') + + [1] * num_days_between_dates('2014-01-10', '2014-01-14') + + [2] * num_days_between_dates('2014-01-15', None) + ), + 4: zip_with_floats_dates(['NaN'] * len(dates)), + }, index=dates) + self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] = \ + self.get_expected_previous_event_dates(dates) + self.cols[PREVIOUS_BUYBACK_CASH] = _expected_previous_cash + self.cols[DAYS_SINCE_PREV] = self._compute_busday_offsets( + self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] + ) + + +class ShareBuybackAuthLoaderTestCase(TestCase, EventLoaderCommonMixin): + """ + Test for share buyback authorizations dataset. + """ + pipeline_columns = { + PREVIOUS_BUYBACK_SHARE_COUNT: + ShareBuybackAuthorizations.previous_share_count.latest, + PREVIOUS_BUYBACK_ANNOUNCEMENT: + ShareBuybackAuthorizations.previous_announcement_date.latest, + DAYS_SINCE_PREV: + BusinessDaysSincePreviousShareBuybackAuth(), + } + + @classmethod + def setUpClass(cls): + cls._cleanup_stack = stack = ExitStack() + cls.finder = stack.enter_context( + tmp_asset_finder(equities=cls.equity_info), + ) + cls.cols = {} + cls.dataset = create_buyback_auth_tst_frame(cls.event_dates_cases, + CASH_FIELD_NAME) + cls.loader_type = ShareBuybackAuthorizationsLoader + + @classmethod + def tearDownClass(cls): + cls._cleanup_stack.close() + + def setup(self, dates): + zip_with_floats_dates = partial(self.zip_with_floats, dates) + num_days_between_dates = partial(self.num_days_between, dates) + _expected_previous_buyback_share_count = pd.DataFrame({ + 0: zip_with_floats_dates( + ['NaN'] * num_days_between_dates(None, '2014-01-14') + + [1] * num_days_between_dates('2014-01-15', '2014-01-19') + + [15] * num_days_between_dates('2014-01-20', None) + ), + 1: zip_with_floats_dates( + ['NaN'] * num_days_between_dates(None, '2014-01-14') + + [13] * 
num_days_between_dates('2014-01-15', '2014-01-19') + + [7] * num_days_between_dates('2014-01-20', None) + ), + 2: zip_with_floats_dates( + ['NaN'] * num_days_between_dates(None, '2014-01-09') + + [3] * num_days_between_dates('2014-01-10', '2014-01-19') + + [1] * num_days_between_dates('2014-01-20', None) + ), + 3: zip_with_floats_dates( + ['NaN'] * num_days_between_dates(None, '2014-01-09') + + [6] * num_days_between_dates('2014-01-10', '2014-01-14') + + [23] * num_days_between_dates('2014-01-15', None) + ), + 4: zip_with_floats_dates(['NaN'] * len(dates)), + }, index=dates) + self.cols[ + PREVIOUS_BUYBACK_SHARE_COUNT + ] = _expected_previous_buyback_share_count + self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] = \ + self.get_expected_previous_event_dates(dates) + self.cols[DAYS_SINCE_PREV] = self._compute_busday_offsets( + self.cols[PREVIOUS_BUYBACK_ANNOUNCEMENT] + ) + + +class BlazeCashBuybackAuthLoaderTestCase(CashBuybackAuthLoaderTestCase): + """ Test case for loading via blaze. + """ + @classmethod + def setUpClass(cls): + super(BlazeCashBuybackAuthLoaderTestCase, cls).setUpClass() + cls.loader_type = BlazeCashBuybackAuthorizationsLoader + + def loader_args(self, dates): + _, mapping = super( + BlazeCashBuybackAuthLoaderTestCase, + self, + ).loader_args(dates) + return (bz.Data(pd.concat( + pd.DataFrame({ + BUYBACK_ANNOUNCEMENT_FIELD_NAME: + frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME], + CASH_FIELD_NAME: + frame[CASH_FIELD_NAME], + TS_FIELD_NAME: + frame[TS_FIELD_NAME], + SID_FIELD_NAME: sid, + }) + for sid, frame in iteritems(mapping) + ).reset_index(drop=True)),) + + +class BlazeShareBuybackAuthLoaderTestCase(ShareBuybackAuthLoaderTestCase): + """ Test case for loading via blaze. 
+ """ + @classmethod + def setUpClass(cls): + super(BlazeShareBuybackAuthLoaderTestCase, cls).setUpClass() + cls.loader_type = BlazeShareBuybackAuthorizationsLoader + + def loader_args(self, dates): + _, mapping = super( + BlazeShareBuybackAuthLoaderTestCase, + self, + ).loader_args(dates) + return (bz.Data(pd.concat( + pd.DataFrame({ + BUYBACK_ANNOUNCEMENT_FIELD_NAME: + frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME], + SHARE_COUNT_FIELD_NAME: + frame[SHARE_COUNT_FIELD_NAME], + TS_FIELD_NAME: + frame[TS_FIELD_NAME], + SID_FIELD_NAME: sid, + }) + for sid, frame in iteritems(mapping) + ).reset_index(drop=True)),) + + +class BlazeShareBuybackAuthLoaderNotInteractiveTestCase( + BlazeShareBuybackAuthLoaderTestCase): + """Test case for passing a non-interactive symbol and a dict of resources. + """ + def loader_args(self, dates): + (bound_expr,) = super( + BlazeShareBuybackAuthLoaderNotInteractiveTestCase, + self, + ).loader_args(dates) + return swap_resources_into_scope(bound_expr, {}) + + +class BlazeCashBuybackAuthLoaderNotInteractiveTestCase( + BlazeCashBuybackAuthLoaderTestCase): + """Test case for passing a non-interactive symbol and a dict of resources. 
+ """ + def loader_args(self, dates): + (bound_expr,) = super( + BlazeCashBuybackAuthLoaderNotInteractiveTestCase, + self, + ).loader_args(dates) + return swap_resources_into_scope(bound_expr, {}) diff --git a/tests/pipeline/test_earnings.py b/tests/pipeline/test_earnings.py index 57b72420..ce39220d 100644 --- a/tests/pipeline/test_earnings.py +++ b/tests/pipeline/test_earnings.py @@ -6,172 +6,74 @@ from unittest import TestCase import blaze as bz from blaze.compute.core import swap_resources_into_scope from contextlib2 import ExitStack -from nose_parameterized import parameterized import pandas as pd -import numpy as np -from pandas.util.testing import assert_series_equal from six import iteritems +from tests.pipeline.test_events import EventLoaderCommonMixin, DATE_FIELD_NAME -from zipline.pipeline import Pipeline +from zipline.pipeline.common import ( + ANNOUNCEMENT_FIELD_NAME, + DAYS_SINCE_PREV, + DAYS_TO_NEXT, + NEXT_ANNOUNCEMENT, + PREVIOUS_ANNOUNCEMENT, + SID_FIELD_NAME, + TS_FIELD_NAME +) from zipline.pipeline.data import EarningsCalendar -from zipline.pipeline.engine import SimplePipelineEngine from zipline.pipeline.factors.events import ( - BusinessDaysUntilNextEarnings, BusinessDaysSincePreviousEarnings, + BusinessDaysUntilNextEarnings, ) from zipline.pipeline.loaders.earnings import EarningsCalendarLoader from zipline.pipeline.loaders.blaze import ( - ANNOUNCEMENT_FIELD_NAME, BlazeEarningsCalendarLoader, - SID_FIELD_NAME, - TS_FIELD_NAME, ) -from zipline.utils.numpy_utils import make_datetime64D, NaTD from zipline.utils.test_utils import ( make_simple_equity_info, tmp_asset_finder, - gen_calendars, - to_series, - num_days_in_range, ) -class EarningsCalendarLoaderTestCase(TestCase): +class EarningsCalendarLoaderTestCase(TestCase, EventLoaderCommonMixin): """ Tests for loading the earnings announcement data. 
""" - loader_type = EarningsCalendarLoader + pipeline_columns = { + NEXT_ANNOUNCEMENT: EarningsCalendar.next_announcement.latest, + PREVIOUS_ANNOUNCEMENT: EarningsCalendar.previous_announcement.latest, + DAYS_SINCE_PREV: BusinessDaysSincePreviousEarnings(), + DAYS_TO_NEXT: BusinessDaysUntilNextEarnings(), + } @classmethod def setUpClass(cls): cls._cleanup_stack = stack = ExitStack() - cls.sids = A, B, C, D, E = range(5) equity_info = make_simple_equity_info( cls.sids, start_date=pd.Timestamp('2013-01-01', tz='UTC'), end_date=pd.Timestamp('2015-01-01', tz='UTC'), ) + cls.cols = {} + cls.dataset = {sid: df for sid, df in enumerate( + case.rename( + columns={DATE_FIELD_NAME: ANNOUNCEMENT_FIELD_NAME} + ) for case in cls.event_dates_cases)} cls.finder = stack.enter_context( tmp_asset_finder(equities=equity_info), ) - cls.earnings_dates = { - # K1--K2--E1--E2. - A: to_series( - knowledge_dates=['2014-01-05', '2014-01-10'], - earning_dates=['2014-01-15', '2014-01-20'], - ), - # K1--K2--E2--E1. - B: to_series( - knowledge_dates=['2014-01-05', '2014-01-10'], - earning_dates=['2014-01-20', '2014-01-15'] - ), - # K1--E1--K2--E2. - C: to_series( - knowledge_dates=['2014-01-05', '2014-01-15'], - earning_dates=['2014-01-10', '2014-01-20'] - ), - # K1 == K2. - D: to_series( - knowledge_dates=['2014-01-05'] * 2, - earning_dates=['2014-01-10', '2014-01-15'], - ), - E: pd.Series( - data=[], - index=pd.DatetimeIndex([]), - dtype='datetime64[ns]', - ), - } + cls.loader_type = EarningsCalendarLoader @classmethod def tearDownClass(cls): cls._cleanup_stack.close() - def loader_args(self, dates): - """Construct the base earnings announcements object to pass to the - loader. - - Parameters - ---------- - dates : pd.DatetimeIndex - The dates we can serve. - - Returns - ------- - args : tuple[any] - The arguments to forward to the loader positionally. 
- """ - return dates, self.earnings_dates - def setup(self, dates): - """ - Make a PipelineEngine and expectation functions for the given dates - calendar. + _expected_next_announce = self.get_expected_next_event_dates(dates) - This exists to make it easy to test our various cases with critical - dates missing from the calendar. - """ - A, B, C, D, E = self.sids - - def num_days_between(start_date, end_date): - return num_days_in_range(dates, start_date, end_date) - - def zip_with_dates(dts): - return pd.Series(pd.to_datetime(dts), index=dates) - - _expected_next_announce = pd.DataFrame({ - A: zip_with_dates( - ['NaT'] * num_days_between(None, '2014-01-04') + - ['2014-01-15'] * num_days_between('2014-01-05', '2014-01-15') + - ['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') + - ['NaT'] * num_days_between('2014-01-21', None) - ), - B: zip_with_dates( - ['NaT'] * num_days_between(None, '2014-01-04') + - ['2014-01-20'] * num_days_between('2014-01-05', '2014-01-09') + - ['2014-01-15'] * num_days_between('2014-01-10', '2014-01-15') + - ['2014-01-20'] * num_days_between('2014-01-16', '2014-01-20') + - ['NaT'] * num_days_between('2014-01-21', None) - ), - C: zip_with_dates( - ['NaT'] * num_days_between(None, '2014-01-04') + - ['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') + - ['NaT'] * num_days_between('2014-01-11', '2014-01-14') + - ['2014-01-20'] * num_days_between('2014-01-15', '2014-01-20') + - ['NaT'] * num_days_between('2014-01-21', None) - ), - D: zip_with_dates( - ['NaT'] * num_days_between(None, '2014-01-04') + - ['2014-01-10'] * num_days_between('2014-01-05', '2014-01-10') + - ['2014-01-15'] * num_days_between('2014-01-11', '2014-01-15') + - ['NaT'] * num_days_between('2014-01-16', None) - ), - E: zip_with_dates(['NaT'] * len(dates)), - }, index=dates) - - _expected_previous_announce = pd.DataFrame({ - A: zip_with_dates( - ['NaT'] * num_days_between(None, '2014-01-14') + - ['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') 
+ - ['2014-01-20'] * num_days_between('2014-01-20', None) - ), - B: zip_with_dates( - ['NaT'] * num_days_between(None, '2014-01-14') + - ['2014-01-15'] * num_days_between('2014-01-15', '2014-01-19') + - ['2014-01-20'] * num_days_between('2014-01-20', None) - ), - C: zip_with_dates( - ['NaT'] * num_days_between(None, '2014-01-09') + - ['2014-01-10'] * num_days_between('2014-01-10', '2014-01-19') + - ['2014-01-20'] * num_days_between('2014-01-20', None) - ), - D: zip_with_dates( - ['NaT'] * num_days_between(None, '2014-01-09') + - ['2014-01-10'] * num_days_between('2014-01-10', '2014-01-14') + - ['2014-01-15'] * num_days_between('2014-01-15', None) - ), - E: zip_with_dates(['NaT'] * len(dates)), - }, index=dates) + _expected_previous_announce = self.get_expected_previous_event_dates( + dates + ) _expected_next_busday_offsets = self._compute_busday_offsets( _expected_next_announce @@ -179,164 +81,17 @@ class EarningsCalendarLoaderTestCase(TestCase): _expected_previous_busday_offsets = self._compute_busday_offsets( _expected_previous_announce ) - - def expected_next_announce(sid): - """ - Return the expected next announcement dates for ``sid``. - """ - return _expected_next_announce[sid] - - def expected_next_busday_offset(sid): - """ - Return the expected number of days to the next announcement for - ``sid``. - """ - return _expected_next_busday_offsets[sid] - - def expected_previous_announce(sid): - """ - Return the expected previous announcement dates for ``sid``. - """ - return _expected_previous_announce[sid] - - def expected_previous_busday_offset(sid): - """ - Return the expected number of days to the next announcement for - ``sid``. 
- """ - return _expected_previous_busday_offsets[sid] - - loader = self.loader_type(*self.loader_args(dates)) - engine = SimplePipelineEngine(lambda _: loader, dates, self.finder) - return ( - engine, - expected_next_announce, - expected_next_busday_offset, - expected_previous_announce, - expected_previous_busday_offset, - ) - - @staticmethod - def _compute_busday_offsets(announcement_dates): - """ - Compute expected business day offsets from a DataFrame of announcement - dates. - """ - # Column-vector of dates on which factor `compute` will be called. - raw_call_dates = announcement_dates.index.values.astype( - 'datetime64[D]' - )[:, None] - - # 2D array of dates containining expected nexg announcement. - raw_announce_dates = ( - announcement_dates.values.astype('datetime64[D]') - ) - - # Set NaTs to 0 temporarily because busday_count doesn't support NaT. - # We fill these entries with NaNs later. - whereNaT = raw_announce_dates == NaTD - raw_announce_dates[whereNaT] = make_datetime64D(0) - - # The abs call here makes it so that we can use this function to - # compute offsets for both next and previous earnings (previous - # earnings offsets come back negative). 
- expected = abs(np.busday_count( - raw_call_dates, - raw_announce_dates - ).astype(float)) - - expected[whereNaT] = np.nan - return pd.DataFrame( - data=expected, - columns=announcement_dates.columns, - index=announcement_dates.index, - ) - - @parameterized.expand(gen_calendars( - '2014-01-01', - '2014-01-31', - critical_dates=pd.to_datetime([ - '2014-01-05', - '2014-01-10', - '2014-01-15', - '2014-01-20', - ], utc=True), - )) - def test_compute_earnings(self, dates): - - ( - engine, - expected_next, - expected_next_busday_offset, - expected_previous, - expected_previous_busday_offset, - ) = self.setup(dates) - - pipe = Pipeline( - columns={ - 'next': EarningsCalendar.next_announcement.latest, - 'previous': EarningsCalendar.previous_announcement.latest, - 'days_to_next': BusinessDaysUntilNextEarnings(), - 'days_since_prev': BusinessDaysSincePreviousEarnings(), - } - ) - - result = engine.run_pipeline( - pipe, - start_date=dates[0], - end_date=dates[-1], - ) - - computed_next = result['next'] - computed_previous = result['previous'] - computed_next_busday_offset = result['days_to_next'] - computed_previous_busday_offset = result['days_since_prev'] - - # NaTs in next/prev should correspond to NaNs in offsets. 
- assert_series_equal( - computed_next.isnull(), - computed_next_busday_offset.isnull(), - check_names=False, - ) - assert_series_equal( - computed_previous.isnull(), - computed_previous_busday_offset.isnull(), - check_names=False, - ) - - for sid in self.sids: - - assert_series_equal( - computed_next.xs(sid, level=1), - expected_next(sid), - sid, - check_names=False, - ) - - assert_series_equal( - computed_previous.xs(sid, level=1), - expected_previous(sid), - sid, - check_names=False, - ) - - assert_series_equal( - computed_next_busday_offset.xs(sid, level=1), - expected_next_busday_offset(sid), - sid, - check_names=False, - ) - - assert_series_equal( - computed_previous_busday_offset.xs(sid, level=1), - expected_previous_busday_offset(sid), - sid, - check_names=False, - ) + self.cols[PREVIOUS_ANNOUNCEMENT] = _expected_previous_announce + self.cols[NEXT_ANNOUNCEMENT] = _expected_next_announce + self.cols[DAYS_TO_NEXT] = _expected_next_busday_offsets + self.cols[DAYS_SINCE_PREV] = _expected_previous_busday_offsets class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase): - loader_type = BlazeEarningsCalendarLoader + @classmethod + def setUpClass(cls): + super(BlazeEarningsCalendarLoaderTestCase, cls).setUpClass() + cls.loader_type = BlazeEarningsCalendarLoader def loader_args(self, dates): _, mapping = super( @@ -345,11 +100,11 @@ class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase): ).loader_args(dates) return (bz.Data(pd.concat( pd.DataFrame({ - ANNOUNCEMENT_FIELD_NAME: earning_dates, - TS_FIELD_NAME: earning_dates.index, + ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME], + TS_FIELD_NAME: df[TS_FIELD_NAME], SID_FIELD_NAME: sid, }) - for sid, earning_dates in iteritems(mapping) + for sid, df in iteritems(mapping) ).reset_index(drop=True)),) @@ -357,35 +112,15 @@ class BlazeEarningsCalendarLoaderNotInteractiveTestCase( BlazeEarningsCalendarLoaderTestCase): """Test case for passing a non-interactive symbol and a dict of 
resources. """ + @classmethod + def setUpClass(cls): + super(BlazeEarningsCalendarLoaderNotInteractiveTestCase, + cls).setUpClass() + cls.loader_type = BlazeEarningsCalendarLoader + def loader_args(self, dates): (bound_expr,) = super( BlazeEarningsCalendarLoaderNotInteractiveTestCase, self, ).loader_args(dates) return swap_resources_into_scope(bound_expr, {}) - - -class EarningsCalendarLoaderInferTimestampTestCase(TestCase): - def test_infer_timestamp(self): - dtx = pd.date_range('2014-01-01', '2014-01-10') - announcement_dates = { - 0: dtx, - 1: pd.Series(dtx, dtx), - } - loader = EarningsCalendarLoader( - dtx, - announcement_dates, - infer_timestamps=True, - ) - self.assertEqual( - loader.announcement_dates.keys(), - announcement_dates.keys(), - ) - assert_series_equal( - loader.announcement_dates[0], - pd.Series(index=[dtx[0]] * 10, data=dtx), - ) - assert_series_equal( - loader.announcement_dates[1], - announcement_dates[1], - ) diff --git a/tests/pipeline/test_events.py b/tests/pipeline/test_events.py new file mode 100644 index 00000000..48cd4d07 --- /dev/null +++ b/tests/pipeline/test_events.py @@ -0,0 +1,486 @@ +""" +Tests for setting up an EventsLoader and a BlazeEventsLoader. 
+""" +from functools import partial +from nose_parameterized import parameterized +import re +from unittest import TestCase + +import blaze as bz +import numpy as np +import pandas as pd +from pandas.util.testing import assert_series_equal +from zipline.pipeline import SimplePipelineEngine, Pipeline + +from zipline.pipeline.common import ( + ANNOUNCEMENT_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME +) +from zipline.pipeline.data import DataSet, Column +from zipline.pipeline.loaders.blaze.events import BlazeEventsLoader +from zipline.pipeline.loaders.events import ( + DF_NO_TS_NOT_INFER_TS_ERROR, + DTINDEX_NOT_INFER_TS_ERROR, + EventsLoader, + SERIES_NO_DTINDEX_ERROR, + WRONG_COLS_ERROR, + WRONG_MANY_COL_DATA_FORMAT_ERROR, + WRONG_SINGLE_COL_DATA_FORMAT_ERROR +) +from zipline.utils.memoize import lazyval +from zipline.utils.numpy_utils import ( + datetime64ns_dtype, + NaTD, + make_datetime64D +) +from zipline.utils.test_utils import ( + gen_calendars, + num_days_in_range, + make_simple_equity_info +) + +ABSTRACT_CONCRETE_LOADER_ERROR = 'abstract methods concrete_loader' +ABSTRACT_EXPECTED_COLS_ERROR = 'abstract methods expected_cols' +DATE_FIELD_NAME = "event_date" + + +class EventDataSet(DataSet): + previous_announcement = Column(datetime64ns_dtype) + + +class EventDataSetLoader(EventsLoader): + expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME]) + + def __init__(self, + all_dates, + events_by_sid, + infer_timestamps=False, + dataset=EventDataSet): + super(EventDataSetLoader, self).__init__( + all_dates, + events_by_sid, + infer_timestamps=infer_timestamps, + dataset=dataset, + ) + + @lazyval + def previous_announcement_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement, + ANNOUNCEMENT_FIELD_NAME, + ) + + @lazyval + def next_announcement_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement, + ANNOUNCEMENT_FIELD_NAME, + ) + + +# Test case just for catching an error when multiple 
columns are in the wrong +# data format, so no loader defined. +class EventDataSetLoaderMultipleExpectedCols(EventsLoader): + expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME, "other_field"]) + + +class EventDataSetLoaderNoExpectedCols(EventsLoader): + + def __init__(self, + all_dates, + events_by_sid, + infer_timestamps=False, + dataset=EventDataSet): + super(EventDataSetLoaderNoExpectedCols, self).__init__( + all_dates, + events_by_sid, + infer_timestamps=infer_timestamps, + dataset=dataset, + ) + + +dtx = pd.date_range('2014-01-01', '2014-01-10') + + +class EventLoaderTestCase(TestCase): + def assert_loader_error(self, events_by_sid, error, msg, + infer_timestamps, loader): + with self.assertRaisesRegexp(error, re.escape(msg)): + loader( + dtx, events_by_sid, infer_timestamps=infer_timestamps, + ) + + def test_no_expected_cols_defined(self): + events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx})} + self.assert_loader_error(events_by_sid, TypeError, + ABSTRACT_EXPECTED_COLS_ERROR, + True, EventDataSetLoaderNoExpectedCols) + + def test_wrong_cols(self): + wrong_col_name = 'some_other_col' + # Test wrong cols (cols != expected) + events_by_sid = {0: pd.DataFrame({wrong_col_name: dtx})} + self.assert_loader_error( + events_by_sid, ValueError, WRONG_COLS_ERROR.format( + expected_columns=list(EventDataSetLoader.expected_cols), + sid=0, + resulting_columns=[wrong_col_name], + ), + True, + EventDataSetLoader + ) + + @parameterized.expand([ + # DataFrame without timestamp column and infer_timestamps = True + [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), True], + # DataFrame with timestamp column + [pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx, + TS_FIELD_NAME: dtx}), False], + # DatetimeIndex with infer_timestamps = True + [pd.DatetimeIndex(dtx), True], + # Series with DatetimeIndex as index and infer_timestamps = False + [pd.Series(dtx, index=dtx), False] + ]) + def test_conversion_to_df(self, df, infer_timestamps): + + events_by_sid = {0: df} + loader = 
EventDataSetLoader( + dtx, + events_by_sid, + infer_timestamps=infer_timestamps, + ) + self.assertEqual( + loader.events_by_sid.keys(), + events_by_sid.keys(), + ) + + if infer_timestamps: + expected = pd.Series(index=[dtx[0]] * 10, data=dtx, + name=ANNOUNCEMENT_FIELD_NAME) + else: + expected = pd.Series(index=dtx, data=dtx, + name=ANNOUNCEMENT_FIELD_NAME) + expected.index.name = TS_FIELD_NAME + # Check that index by first given date has been added + assert_series_equal( + loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME], + expected, + ) + + @parameterized.expand( + [ + # DataFrame without timestamp column and infer_timestamps = False + [ + pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), + False, + DF_NO_TS_NOT_INFER_TS_ERROR.format( + timestamp_column_name=TS_FIELD_NAME, + sid=0 + ), + EventDataSetLoader + ], + # DatetimeIndex with infer_timestamps = False + [ + pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME), + False, + DTINDEX_NOT_INFER_TS_ERROR.format(sid=0), + EventDataSetLoader + ], + # Series without a DatetimeIndex and infer_timestamps = False + [ + pd.Series(dtx, name=ANNOUNCEMENT_FIELD_NAME), + False, + SERIES_NO_DTINDEX_ERROR.format(sid=0), + EventDataSetLoader + ], + # Below, 2 cases repeated for infer_timestamps = True and False. + # Shouldn't make a difference in the outcome. 
+ # We expected 1 column but got a data structure other than a + # DataFrame, Series, or DatetimeIndex + [ + [dtx], + True, + WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=0), + EventDataSetLoader + ], + # We expected multiple columns but got a data structure other + # than a DataFrame + [ + [dtx, dtx], + True, + WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=0), + EventDataSetLoaderMultipleExpectedCols + ], + [ + [dtx], + False, + WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=0), + EventDataSetLoader + ], + # We expected multiple columns but got a data structure other + # than a DataFrame + [ + [dtx, dtx], + False, + WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=0), + EventDataSetLoaderMultipleExpectedCols + ] + ] + ) + def test_bad_conversion_to_df(self, df, infer_timestamps, msg, loader): + events_by_sid = {0: df} + self.assert_loader_error(events_by_sid, ValueError, msg, + infer_timestamps, loader) + + +class BlazeEventDataSetLoaderNoConcreteLoader(BlazeEventsLoader): + def __init__(self, + expr, + dataset=EventDataSet, + **kwargs): + super( + BlazeEventDataSetLoaderNoConcreteLoader, self + ).__init__(expr, + dataset=dataset, + **kwargs) + + +class BlazeEventLoaderTestCase(TestCase): + # Blaze loader: need to test failure if no concrete loader + def test_no_concrete_loader_defined(self): + with self.assertRaisesRegexp( + TypeError, re.escape(ABSTRACT_CONCRETE_LOADER_ERROR) + ): + BlazeEventDataSetLoaderNoConcreteLoader( + bz.Data( + pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx, + SID_FIELD_NAME: 0 + }) + ) + ) + + +# Must be a list - can't use generator since this needs to be used more than +# once. 
+param_dates = list(gen_calendars( + '2014-01-01', + '2014-01-31', + critical_dates=pd.to_datetime([ + '2014-01-05', + '2014-01-10', + '2014-01-15', + '2014-01-20', + ], utc=True), +)) + + +class EventLoaderCommonMixin(object): + sids = A, B, C, D, E = range(5) + equity_info = make_simple_equity_info( + sids, + start_date=pd.Timestamp('2013-01-01', tz='UTC'), + end_date=pd.Timestamp('2015-01-01', tz='UTC'), + ) + + event_dates_cases = [ + # K1--K2--E1--E2. + pd.DataFrame({ + TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), + DATE_FIELD_NAME: pd.to_datetime(['2014-01-15', '2014-01-20']) + }), + # K1--K2--E2--E1. + pd.DataFrame({ + TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']), + DATE_FIELD_NAME: pd.to_datetime(['2014-01-20', '2014-01-15']) + }), + # K1--E1--K2--E2. + pd.DataFrame({ + TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']), + DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-20']) + }), + # K1 == K2. + pd.DataFrame({ + TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2), + DATE_FIELD_NAME: pd.to_datetime(['2014-01-10', '2014-01-15']) + }), + pd.DataFrame({ + TS_FIELD_NAME: pd.to_datetime([]), + DATE_FIELD_NAME: pd.to_datetime([]) + }) + ] + + def zip_with_floats(self, dates, flts): + return pd.Series(flts, index=dates).astype('float') + + def num_days_between(self, dates, start_date, end_date): + return num_days_in_range(dates, start_date, end_date) + + def zip_with_dates(self, index_dates, dts): + return pd.Series(pd.to_datetime(dts), index=index_dates) + + def loader_args(self, dates): + """Construct the base object to pass to the loader. + + Parameters + ---------- + dates : pd.DatetimeIndex + The dates we can serve. + + Returns + ------- + args : tuple[any] + The arguments to forward to the loader positionally. + """ + return dates, self.dataset + + def setup_engine(self, dates): + """ + Make a Pipeline Engine object based on the given dates. 
+ """ + loader = self.loader_type(*self.loader_args(dates)) + return SimplePipelineEngine(lambda _: loader, dates, self.finder) + + def get_expected_next_event_dates(self, dates): + num_days_between_for_dates = partial(self.num_days_between, dates) + zip_with_dates_for_dates = partial(self.zip_with_dates, dates) + return pd.DataFrame({ + 0: zip_with_dates_for_dates( + ['NaT'] * + num_days_between_for_dates(None, '2014-01-04') + + ['2014-01-15'] * + num_days_between_for_dates('2014-01-05', '2014-01-15') + + ['2014-01-20'] * + num_days_between_for_dates('2014-01-16', '2014-01-20') + + ['NaT'] * + num_days_between_for_dates('2014-01-21', None) + ), + 1: zip_with_dates_for_dates( + ['NaT'] * + num_days_between_for_dates(None, '2014-01-04') + + ['2014-01-20'] * + num_days_between_for_dates('2014-01-05', '2014-01-09') + + ['2014-01-15'] * + num_days_between_for_dates('2014-01-10', '2014-01-15') + + ['2014-01-20'] * + num_days_between_for_dates('2014-01-16', '2014-01-20') + + ['NaT'] * + num_days_between_for_dates('2014-01-21', None) + ), + 2: zip_with_dates_for_dates( + ['NaT'] * + num_days_between_for_dates(None, '2014-01-04') + + ['2014-01-10'] * + num_days_between_for_dates('2014-01-05', '2014-01-10') + + ['NaT'] * + num_days_between_for_dates('2014-01-11', '2014-01-14') + + ['2014-01-20'] * + num_days_between_for_dates('2014-01-15', '2014-01-20') + + ['NaT'] * + num_days_between_for_dates('2014-01-21', None) + ), + 3: zip_with_dates_for_dates( + ['NaT'] * + num_days_between_for_dates(None, '2014-01-04') + + ['2014-01-10'] * + num_days_between_for_dates('2014-01-05', '2014-01-10') + + ['2014-01-15'] * + num_days_between_for_dates('2014-01-11', '2014-01-15') + + ['NaT'] * + num_days_between_for_dates('2014-01-16', None) + ), + 4: zip_with_dates_for_dates(['NaT'] * + len(dates)), + }, index=dates) + + def get_expected_previous_event_dates(self, dates): + num_days_between_for_dates = partial(self.num_days_between, dates) + zip_with_dates_for_dates = 
partial(self.zip_with_dates, dates) + return pd.DataFrame({ + 0: zip_with_dates_for_dates( + ['NaT'] * num_days_between_for_dates(None, '2014-01-14') + + ['2014-01-15'] * num_days_between_for_dates('2014-01-15', + '2014-01-19') + + ['2014-01-20'] * num_days_between_for_dates('2014-01-20', + None), + ), + 1: zip_with_dates_for_dates( + ['NaT'] * num_days_between_for_dates(None, '2014-01-14') + + ['2014-01-15'] * num_days_between_for_dates('2014-01-15', + '2014-01-19') + + ['2014-01-20'] * num_days_between_for_dates('2014-01-20', + None), + ), + 2: zip_with_dates_for_dates( + ['NaT'] * num_days_between_for_dates(None, '2014-01-09') + + ['2014-01-10'] * num_days_between_for_dates('2014-01-10', + '2014-01-19') + + ['2014-01-20'] * num_days_between_for_dates('2014-01-20', + None), + ), + 3: zip_with_dates_for_dates( + ['NaT'] * num_days_between_for_dates(None, '2014-01-09') + + ['2014-01-10'] * num_days_between_for_dates('2014-01-10', + '2014-01-14') + + ['2014-01-15'] * num_days_between_for_dates('2014-01-15', + None), + ), + 4: zip_with_dates_for_dates(['NaT'] * len(dates)), + }, index=dates) + + @staticmethod + def _compute_busday_offsets(announcement_dates): + """ + Compute expected business day offsets from a DataFrame of announcement + dates. + """ + # Column-vector of dates on which factor `compute` will be called. + raw_call_dates = announcement_dates.index.values.astype( + 'datetime64[D]' + )[:, None] + + # 2D array of dates containing expected next announcement. + raw_announce_dates = ( + announcement_dates.values.astype('datetime64[D]') + ) + + # Set NaTs to 0 temporarily because busday_count doesn't support NaT. + # We fill these entries with NaNs later. + whereNaT = raw_announce_dates == NaTD + raw_announce_dates[whereNaT] = make_datetime64D(0) + + # The abs call here makes it so that we can use this function to + # compute offsets for both next and previous earnings (previous + # earnings offsets come back negative). 
+ expected = abs(np.busday_count( + raw_call_dates, + raw_announce_dates + ).astype(float)) + + expected[whereNaT] = np.nan + return pd.DataFrame( + data=expected, + columns=announcement_dates.columns, + index=announcement_dates.index, + ) + + @parameterized.expand(param_dates) + def test_compute(self, dates): + engine = self.setup_engine(dates) + self.setup(dates) + + pipe = Pipeline( + columns=self.pipeline_columns + ) + + result = engine.run_pipeline( + pipe, + start_date=dates[0], + end_date=dates[-1], + ) + + for sid in self.sids: + for col_name in self.cols.keys(): + assert_series_equal(result[col_name].xs(sid, level=1), + self.cols[col_name][sid], + check_names=False) diff --git a/zipline/pipeline/common.py b/zipline/pipeline/common.py new file mode 100644 index 00000000..de225409 --- /dev/null +++ b/zipline/pipeline/common.py @@ -0,0 +1,17 @@ +""" +Common constants for Pipeline. +""" +AD_FIELD_NAME = 'asof_date' +ANNOUNCEMENT_FIELD_NAME = 'announcement_date' +CASH_FIELD_NAME = 'cash' +BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_date' +DAYS_SINCE_PREV = 'days_since_prev' +DAYS_TO_NEXT = 'days_to_next' +NEXT_ANNOUNCEMENT = 'next_announcement' +PREVIOUS_ANNOUNCEMENT = 'previous_announcement' +PREVIOUS_BUYBACK_ANNOUNCEMENT = 'previous_buyback_announcement' +PREVIOUS_BUYBACK_CASH = 'previous_buyback_cash' +PREVIOUS_BUYBACK_SHARE_COUNT = 'previous_buyback_share_count' +SHARE_COUNT_FIELD_NAME = 'share_count' +SID_FIELD_NAME = 'sid' +TS_FIELD_NAME = 'timestamp' diff --git a/zipline/pipeline/data/__init__.py b/zipline/pipeline/data/__init__.py index 894bc86e..f7357f09 100644 --- a/zipline/pipeline/data/__init__.py +++ b/zipline/pipeline/data/__init__.py @@ -1,11 +1,14 @@ +from .buyback_auth import CashBuybackAuthorizations, ShareBuybackAuthorizations from .earnings import EarningsCalendar from .equity_pricing import USEquityPricing from .dataset import DataSet, Column, BoundColumn __all__ = [ 'BoundColumn', + 'CashBuybackAuthorizations', 'Column', 'DataSet', 
'EarningsCalendar', + 'ShareBuybackAuthorizations', 'USEquityPricing', ] diff --git a/zipline/pipeline/data/buyback_auth.py b/zipline/pipeline/data/buyback_auth.py new file mode 100644 index 00000000..8541d2a8 --- /dev/null +++ b/zipline/pipeline/data/buyback_auth.py @@ -0,0 +1,24 @@ +""" +Datasets representing dates of recently announced buyback authorizations. +""" +from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype + +from .dataset import Column, DataSet + + +class CashBuybackAuthorizations(DataSet): + """ + Dataset representing dates of recently announced cash buyback + authorizations. + """ + previous_value = Column(float64_dtype) + previous_announcement_date = Column(datetime64ns_dtype) + + +class ShareBuybackAuthorizations(DataSet): + """ + Dataset representing dates of recently announced share buyback + authorizations. + """ + previous_share_count = Column(float64_dtype) + previous_announcement_date = Column(datetime64ns_dtype) diff --git a/zipline/pipeline/factors/events.py b/zipline/pipeline/factors/events.py index 2491efc8..cee82d2d 100644 --- a/zipline/pipeline/factors/events.py +++ b/zipline/pipeline/factors/events.py @@ -3,6 +3,10 @@ Factors describing information about event data (e.g. earnings announcements, acquisitions, dividends, etc.). """ from numpy import newaxis +from zipline.pipeline.data.buyback_auth import ( + CashBuybackAuthorizations, + ShareBuybackAuthorizations +) from zipline.pipeline.data.earnings import EarningsCalendar from zipline.utils.numpy_utils import ( NaTD, @@ -14,10 +18,42 @@ from zipline.utils.numpy_utils import ( from .factor import Factor -class BusinessDaysUntilNextEarnings(Factor): +class BusinessDaysSincePreviousEvents(Factor): """ - Factor returning the number of **business days** (not trading days!) until - the next known earnings date for each asset. + Abstract class for business days since a previous event. + Returns the number of **business days** (not trading days!) 
since + the most recent event date for each asset. + + This doesn't use trading days for symmetry with + BusinessDaysUntilNextEarnings. + + Assets which announced or will announce the event today will produce a + value of 0.0. Assets that announced the event on the previous business + day will produce a value of 1.0. + + Assets for which the event date is `NaT` will produce a value of `NaN`. + """ + window_length = 0 + dtype = float64_dtype + + def _compute(self, arrays, dates, assets, mask): + + # Coerce from [ns] to [D] for numpy busday_count. + announce_dates = arrays[0].astype(datetime64D_dtype) + + # Set masked values to NaT. + announce_dates[~mask] = NaTD + + # Convert row labels into a column vector for broadcasted comparison. + reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis] + return busday_count_mask_NaT(announce_dates, reference_dates) + + +class BusinessDaysUntilNextEvents(Factor): + """ + Abstract class for business days since a next event. + Returns the number of **business days** (not trading days!) until + the next known event date for each asset. This doesn't use trading days because the trading calendar includes information that may not have been available to the algorithm at the time @@ -26,19 +62,12 @@ class BusinessDaysUntilNextEarnings(Factor): For example, the NYSE closings September 11th 2001, would not have been known to the algorithm on September 10th. - Assets that announced or will announce earnings today will produce a value - of 0.0. Assets that will announce earnings on the next upcoming business + Assets that announced or will announce the event today will produce a value + of 0.0. Assets that will announce the event on the next upcoming business day will produce a value of 1.0. - Assets for which `EarningsCalendar.next_announcement` is `NaT` will produce - a value of `NaN`. 
- - - See Also - -------- - zipline.pipeline.factors.BusinessDaysSincePreviousEarnings + Assets for which the event date is `NaT` will produce a value of `NaN`. """ - inputs = [EarningsCalendar.next_announcement] window_length = 0 dtype = float64_dtype @@ -55,37 +84,54 @@ class BusinessDaysUntilNextEarnings(Factor): return busday_count_mask_NaT(reference_dates, announce_dates) -class BusinessDaysSincePreviousEarnings(Factor): +class BusinessDaysUntilNextEarnings(BusinessDaysUntilNextEvents): + """ + Factor returning the number of **business days** (not trading days!) until + the next known earnings date for each asset. + + See Also + -------- + zipline.pipeline.factors.BusinessDaysSincePreviousEarnings + """ + inputs = [EarningsCalendar.next_announcement] + + +class BusinessDaysSincePreviousEarnings(BusinessDaysSincePreviousEvents): """ Factor returning the number of **business days** (not trading days!) since the most recent earnings date for each asset. - This doesn't use trading days for symmetry with - BusinessDaysUntilNextEarnings. - - Assets which announced or will announce earnings today will produce a value - of 0.0. Assets that announced earnings on the previous business day will - produce a value of 1.0. - - Assets for which `EarningsCalendar.previous_announcement` is `NaT` will - produce a value of `NaN`. - See Also -------- zipline.pipeline.factors.BusinessDaysUntilNextEarnings """ inputs = [EarningsCalendar.previous_announcement] - window_length = 0 - dtype = float64_dtype - def _compute(self, arrays, dates, assets, mask): - # Coerce from [ns] to [D] for numpy busday_count. - announce_dates = arrays[0].astype(datetime64D_dtype) +class BusinessDaysSincePreviousCashBuybackAuth( + BusinessDaysSincePreviousEvents +): + """ + Factor returning the number of **business days** (not trading days!) since + the most recent cash buyback authorization for each asset. - # Set masked values to NaT. 
- announce_dates[~mask] = NaTD + See Also + -------- + zipline.pipeline.factors.BusinessDaysSincePreviousCashBuybackAuth + """ + inputs = [CashBuybackAuthorizations.previous_announcement_date] - # Convert row labels into a column vector for broadcasted comparison. - reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis] - return busday_count_mask_NaT(announce_dates, reference_dates) + +class BusinessDaysSincePreviousShareBuybackAuth( + BusinessDaysSincePreviousEvents +): + """ + Factor returning the number of **business days** (not trading days!) since + the most recent share buyback authorization for each asset. + + + See Also + -------- + zipline.pipeline.factors.BusinessDaysSincePreviousShareBuybackAuth + """ + inputs = [ShareBuybackAuthorizations.previous_announcement_date] diff --git a/zipline/pipeline/loaders/blaze/__init__.py b/zipline/pipeline/loaders/blaze/__init__.py index 200fa583..838e3577 100644 --- a/zipline/pipeline/loaders/blaze/__init__.py +++ b/zipline/pipeline/loaders/blaze/__init__.py @@ -1,25 +1,25 @@ + +from .buyback_auth import ( + BlazeCashBuybackAuthorizationsLoader, + BlazeShareBuybackAuthorizationsLoader +) from .core import ( - AD_FIELD_NAME, BlazeLoader, NoDeltasWarning, - SID_FIELD_NAME, - TS_FIELD_NAME, from_blaze, global_loader, ) + from .earnings import ( - ANNOUNCEMENT_FIELD_NAME, BlazeEarningsCalendarLoader, ) __all__ = ( - 'AD_FIELD_NAME', - 'ANNOUNCEMENT_FIELD_NAME', + 'BlazeCashBuybackAuthorizationsLoader', 'BlazeEarningsCalendarLoader', 'BlazeLoader', - 'NoDeltasWarning', - 'SID_FIELD_NAME', - 'TS_FIELD_NAME', + 'BlazeShareBuybackAuthorizationsLoader', 'from_blaze', 'global_loader', + 'NoDeltasWarning', ) diff --git a/zipline/pipeline/loaders/blaze/buyback_auth.py b/zipline/pipeline/loaders/blaze/buyback_auth.py new file mode 100644 index 00000000..cd83e1d1 --- /dev/null +++ b/zipline/pipeline/loaders/blaze/buyback_auth.py @@ -0,0 +1,160 @@ +from .core import ( + TS_FIELD_NAME, + SID_FIELD_NAME, +) +from 
zipline.pipeline.common import ( + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + CASH_FIELD_NAME, + SHARE_COUNT_FIELD_NAME +) +from zipline.pipeline.data import (CashBuybackAuthorizations, + ShareBuybackAuthorizations) +from zipline.pipeline.loaders.buyback_auth import ( + CashBuybackAuthorizationsLoader, + ShareBuybackAuthorizationsLoader, +) +from .events import BlazeEventsLoader + + +class BlazeCashBuybackAuthorizationsLoader(BlazeEventsLoader): + """A pipeline loader for the ``CashBuybackAuthorizations`` dataset that + loads data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezone to use for the data query cutoff. + dataset: DataSet + The DataSet object for which this loader loads data. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + {BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime, + {CASH_FIELD_NAME}: ?float64 + }} + + Where each row of the table is a record including the sid to identify the + company, the timestamp where we learned about the announcement, the + date when the buyback was announced, and the cash amount. + + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. 
+ """ + __doc__ = __doc__.format( + TS_FIELD_NAME=TS_FIELD_NAME, + SID_FIELD_NAME=SID_FIELD_NAME, + BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME, + CASH_FIELD_NAME=CASH_FIELD_NAME + ) + + _expected_fields = frozenset({ + TS_FIELD_NAME, + SID_FIELD_NAME, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + CASH_FIELD_NAME + }) + + concrete_loader = CashBuybackAuthorizationsLoader + + def __init__(self, + expr, + resources=None, + odo_kwargs=None, + data_query_time=None, + data_query_tz=None, + dataset=CashBuybackAuthorizations, + **kwargs): + super( + BlazeCashBuybackAuthorizationsLoader, self + ).__init__(expr, + resources=resources, + odo_kwargs=odo_kwargs, + data_query_time=data_query_time, + data_query_tz=data_query_tz, + dataset=dataset, + **kwargs) + + +class BlazeShareBuybackAuthorizationsLoader(BlazeEventsLoader): + """A pipeline loader for the ``ShareBuybackAuthorizations`` dataset that + loads data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezone to use for the data query cutoff. + dataset: DataSet + The DataSet object for which this loader loads data. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + {BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime, + {SHARE_COUNT_FIELD_NAME}: ?float64, + }} + + Where each row of the table is a record including the sid to identify the + company, the timestamp where we learned about the announcement, the + date when the buyback was announced, and the share count. 
+ + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. + """ + __doc__ = __doc__.format( + TS_FIELD_NAME=TS_FIELD_NAME, + SID_FIELD_NAME=SID_FIELD_NAME, + BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME=SHARE_COUNT_FIELD_NAME, + ) + + _expected_fields = frozenset({ + TS_FIELD_NAME, + SID_FIELD_NAME, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME, + }) + + concrete_loader = ShareBuybackAuthorizationsLoader + + def __init__(self, + expr, + resources=None, + odo_kwargs=None, + data_query_time=None, + data_query_tz=None, + dataset=ShareBuybackAuthorizations, + **kwargs): + super( + BlazeShareBuybackAuthorizationsLoader, self + ).__init__(expr, + resources=resources, + odo_kwargs=odo_kwargs, + data_query_time=data_query_time, + data_query_tz=data_query_tz, + dataset=dataset, + **kwargs) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 0fcd52e0..6a97540e 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -158,7 +158,11 @@ from toolz import ( ) import toolz.curried.operator as op - +from zipline.pipeline.common import ( + AD_FIELD_NAME, + SID_FIELD_NAME, + TS_FIELD_NAME +) from zipline.pipeline.data.dataset import DataSet, Column from zipline.pipeline.loaders.utils import ( check_data_query_args, @@ -179,9 +183,6 @@ from zipline.utils.pandas_utils import sort_values from zipline.utils.preprocess import preprocess -AD_FIELD_NAME = 'asof_date' -TS_FIELD_NAME = 'timestamp' -SID_FIELD_NAME = 'sid' valid_deltas_node_types = ( bz.expr.Field, bz.expr.ReLabel, diff --git a/zipline/pipeline/loaders/blaze/earnings.py b/zipline/pipeline/loaders/blaze/earnings.py index 9f5137f8..39724e76 100644 --- a/zipline/pipeline/loaders/blaze/earnings.py +++ b/zipline/pipeline/loaders/blaze/earnings.py @@ -1,29 +1,14 @@ -from datashape import istabular -import pandas as 
pd -from toolz import valmap - -from .core import ( - TS_FIELD_NAME, +from zipline.pipeline.common import ( + ANNOUNCEMENT_FIELD_NAME, SID_FIELD_NAME, - bind_expression_to_resources, - ffill_query_in_range, + TS_FIELD_NAME, ) from zipline.pipeline.data import EarningsCalendar -from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.earnings import EarningsCalendarLoader -from zipline.pipeline.loaders.utils import ( - check_data_query_args, - normalize_data_query_bounds, - normalize_timestamp_to_query_time, -) -from zipline.utils.input_validation import ensure_timezone, optionally -from zipline.utils.preprocess import preprocess +from .events import BlazeEventsLoader -ANNOUNCEMENT_FIELD_NAME = 'announcement_date' - - -class BlazeEarningsCalendarLoader(PipelineLoader): +class BlazeEarningsCalendarLoader(BlazeEventsLoader): """A pipeline loader for the ``EarningsCalendar`` dataset that loads data from a blaze expression. @@ -39,6 +24,8 @@ class BlazeEarningsCalendarLoader(PipelineLoader): The time to use for the data query cutoff. data_query_tz : tzinfo or str The timezeone to use for the data query cutoff. + dataset: DataSet + The DataSet object for which this loader loads data. Notes ----- @@ -57,6 +44,7 @@ class BlazeEarningsCalendarLoader(PipelineLoader): If the '{TS_FIELD_NAME}' field is not included it is assumed that we start the backtest with knowledge of all announcements. 
""" + __doc__ = __doc__.format( TS_FIELD_NAME=TS_FIELD_NAME, SID_FIELD_NAME=SID_FIELD_NAME, @@ -69,75 +57,19 @@ class BlazeEarningsCalendarLoader(PipelineLoader): ANNOUNCEMENT_FIELD_NAME, }) - @preprocess(data_query_tz=optionally(ensure_timezone)) + concrete_loader = EarningsCalendarLoader + def __init__(self, expr, resources=None, odo_kwargs=None, data_query_time=None, data_query_tz=None, - dataset=EarningsCalendar): - dshape = expr.dshape - - if not istabular(dshape): - raise ValueError( - 'expression dshape must be tabular, got: %s' % dshape, - ) - - expected_fields = self._expected_fields - self._expr = bind_expression_to_resources( - expr[list(expected_fields)], - resources, - ) - self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {} - self._dataset = dataset - check_data_query_args(data_query_time, data_query_tz) - self._data_query_time = data_query_time - self._data_query_tz = data_query_tz - - def load_adjusted_array(self, columns, dates, assets, mask): - data_query_time = self._data_query_time - data_query_tz = self._data_query_tz - lower_dt, upper_dt = normalize_data_query_bounds( - dates[0], - dates[-1], - data_query_time, - data_query_tz, - ) - - raw = ffill_query_in_range( - self._expr, - lower_dt, - upper_dt, - self._odo_kwargs, - ) - sids = raw.loc[:, SID_FIELD_NAME] - raw.drop( - sids[~sids.isin(assets)].index, - inplace=True - ) - if data_query_time is not None: - normalize_timestamp_to_query_time( - raw, - data_query_time, - data_query_tz, - inplace=True, - ts_field=TS_FIELD_NAME, - ) - - gb = raw.groupby(SID_FIELD_NAME) - - def mkseries(idx, raw_loc=raw.loc): - vs = raw_loc[ - idx, [TS_FIELD_NAME, ANNOUNCEMENT_FIELD_NAME] - ].values - return pd.Series( - index=pd.DatetimeIndex(vs[:, 0]), - data=vs[:, 1], - ) - - return EarningsCalendarLoader( - dates, - valmap(mkseries, gb.groups), - dataset=self._dataset, - ).load_adjusted_array(columns, dates, assets, mask) + dataset=EarningsCalendar, + **kwargs): + super( + 
BlazeEarningsCalendarLoader, self + ).__init__(expr, dataset=dataset, + resources=resources, odo_kwargs=odo_kwargs, + data_query_time=data_query_time, + data_query_tz=data_query_tz, **kwargs) diff --git a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py new file mode 100644 index 00000000..93ed6b72 --- /dev/null +++ b/zipline/pipeline/loaders/blaze/events.py @@ -0,0 +1,128 @@ +import abc + +from datashape import istabular + +from .core import ( + bind_expression_to_resources, + ffill_query_in_range, +) +from zipline.pipeline.loaders.base import PipelineLoader +from zipline.pipeline.common import ( + SID_FIELD_NAME, + TS_FIELD_NAME, +) +from zipline.pipeline.loaders.utils import ( + check_data_query_args, + normalize_data_query_bounds, + normalize_timestamp_to_query_time, +) +from zipline.utils.input_validation import ensure_timezone, optionally +from zipline.utils.preprocess import preprocess + + +class BlazeEventsLoader(PipelineLoader): + """An abstract pipeline loader for the events datasets that loads + data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezone to use for the data query cutoff. + dataset : DataSet + The DataSet object for which this loader loads data. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + }} + + And other dataset-specific fields, where each row of the table is a + record including the sid to identify the company, the timestamp where we + learned about the announcement, and the date when the earnings will be + announced. 
+ + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. + """ + + @preprocess(data_query_tz=optionally(ensure_timezone)) + def __init__(self, + expr, + resources=None, + odo_kwargs=None, + data_query_time=None, + data_query_tz=None, + dataset=None): + dshape = expr.dshape + + if not istabular(dshape): + raise ValueError( + 'expression dshape must be tabular, got: %s' % dshape, + ) + + expected_fields = self._expected_fields + self._expr = bind_expression_to_resources( + expr[list(expected_fields)], + resources, + ) + self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {} + self._dataset = dataset + check_data_query_args(data_query_time, data_query_tz) + self._data_query_time = data_query_time + self._data_query_tz = data_query_tz + + @abc.abstractproperty + def concrete_loader(self): + NotImplementedError('concrete_loader') + + def load_adjusted_array(self, columns, dates, assets, mask): + data_query_time = self._data_query_time + data_query_tz = self._data_query_tz + lower_dt, upper_dt = normalize_data_query_bounds( + dates[0], + dates[-1], + data_query_time, + data_query_tz, + ) + + raw = ffill_query_in_range( + self._expr, + lower_dt, + upper_dt, + self._odo_kwargs, + ) + sids = raw.loc[:, SID_FIELD_NAME] + raw.drop( + sids[~sids.isin(assets)].index, + inplace=True + ) + if data_query_time is not None: + normalize_timestamp_to_query_time( + raw, + data_query_time, + data_query_tz, + inplace=True, + ts_field=TS_FIELD_NAME, + ) + gb = raw.groupby(SID_FIELD_NAME) + return self.concrete_loader( + dates, + self.prepare_data(raw, gb), + dataset=self._dataset, + ).load_adjusted_array(columns, dates, assets, mask) + + def prepare_data(self, raw, gb): + return {sid: raw.loc[group].drop(SID_FIELD_NAME, axis=1) for sid, group + in gb.groups.items()} diff --git a/zipline/pipeline/loaders/buyback_auth.py b/zipline/pipeline/loaders/buyback_auth.py new file mode 100644 index 00000000..d46a207a 
--- /dev/null +++ b/zipline/pipeline/loaders/buyback_auth.py @@ -0,0 +1,98 @@ +""" +Reference implementation for buyback auth loaders. +""" + +from ..data.buyback_auth import ( + CashBuybackAuthorizations, + ShareBuybackAuthorizations +) +from .events import EventsLoader +from zipline.pipeline.common import ( + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + CASH_FIELD_NAME, + SHARE_COUNT_FIELD_NAME +) +from zipline.utils.memoize import lazyval + + +class CashBuybackAuthorizationsLoader(EventsLoader): + """ + Reference loader for + :class:`zipline.pipeline.data.CashBuybackAuthorizations`. + + events_by_sid: dict[sid -> pd.DataFrame(knowledge date, + event date, cash value)] + + """ + expected_cols = frozenset([BUYBACK_ANNOUNCEMENT_FIELD_NAME, + CASH_FIELD_NAME]) + + def __init__(self, + all_dates, + events_by_sid, + infer_timestamps=False, + dataset=CashBuybackAuthorizations): + super(CashBuybackAuthorizationsLoader, self).__init__( + all_dates, + events_by_sid, + infer_timestamps=infer_timestamps, + dataset=dataset, + ) + + @lazyval + def previous_value_loader(self): + return self._previous_event_value_loader( + self.dataset.previous_value, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + CASH_FIELD_NAME + ) + + @lazyval + def previous_announcement_date_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement_date, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + ) + + +class ShareBuybackAuthorizationsLoader(EventsLoader): + """ + Reference loader for + :class:`zipline.pipeline.data.ShareBuybackAuthorizations`. + + Does not currently support adjustments to the dates of known buyback + authorizations. 
+ + events_by_sid: dict[sid -> pd.DataFrame(knowledge date, + event date, share value)] + + """ + expected_cols = frozenset([BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME]) + + def __init__(self, + all_dates, + events_by_sid, + infer_timestamps=False, + dataset=ShareBuybackAuthorizations): + super(ShareBuybackAuthorizationsLoader, self).__init__( + all_dates, + events_by_sid, + infer_timestamps=infer_timestamps, + dataset=dataset, + ) + + @lazyval + def previous_share_count_loader(self): + return self._previous_event_value_loader( + self.dataset.previous_share_count, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME + ) + + @lazyval + def previous_announcement_date_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement_date, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + ) diff --git a/zipline/pipeline/loaders/earnings.py b/zipline/pipeline/loaders/earnings.py index 9912faff..89d376c9 100644 --- a/zipline/pipeline/loaders/earnings.py +++ b/zipline/pipeline/loaders/earnings.py @@ -1,108 +1,32 @@ """ Reference implementation for EarningsCalendar loaders. """ -from itertools import repeat -import pandas as pd -from six import iteritems -from toolz import merge - -from .base import PipelineLoader -from .frame import DataFrameLoader -from .utils import next_date_frame, previous_date_frame from ..data.earnings import EarningsCalendar +from .events import EventsLoader +from zipline.pipeline.common import ANNOUNCEMENT_FIELD_NAME from zipline.utils.memoize import lazyval -class EarningsCalendarLoader(PipelineLoader): - """ - Reference loader for - :class:`zipline.pipeline.data.earnings.EarningsCalendar`. +class EarningsCalendarLoader(EventsLoader): - Does not currently support adjustments to the dates of known earnings. + expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME]) - Parameters - ---------- - all_dates : pd.DatetimeIndex - Index of dates for which we can serve queries. 
- announcement_dates : dict[int -> pd.Series or pd.DatetimeIndex] - Dict mapping sids to objects representing dates on which earnings - occurred. - - If a dict value is a Series, it's interpreted as a mapping from the - date on which we learned an announcement was coming to the date on - which the announcement was made. - - If a dict value is a DatetimeIndex, it's interpreted as just containing - the dates that announcements were made, and we assume we knew about the - announcement on all prior dates. This mode is only supported if - ``infer_timestamp`` is explicitly passed as a truthy value. - - infer_timestamps : bool, optional - Whether to allow passing ``DatetimeIndex`` values in - ``announcement_dates``. - """ - def __init__(self, - all_dates, - announcement_dates, + def __init__(self, all_dates, events_by_sid, infer_timestamps=False, dataset=EarningsCalendar): - self.all_dates = all_dates - self.announcement_dates = announcement_dates = ( - announcement_dates.copy() + super(EarningsCalendarLoader, self).__init__( + all_dates, events_by_sid, infer_timestamps, dataset=dataset, ) - dates = self.all_dates.values - for k, v in iteritems(announcement_dates): - if isinstance(v, pd.DatetimeIndex): - if not infer_timestamps: - raise ValueError( - "Got DatetimeIndex of announcement dates for sid %d.\n" - "Pass `infer_timestamps=True` to use the first date in" - " `all_dates` as implicit timestamp." - ) - # If we are passed a DatetimeIndex, we always have - # knowledge of the announcements. - announcement_dates[k] = pd.Series( - v, index=repeat(dates[0], len(v)), - ) - self.dataset = dataset - - def get_loader(self, column): - """Dispatch to the loader for ``column``. - """ - if column is self.dataset.next_announcement: - return self.next_announcement_loader - elif column is self.dataset.previous_announcement: - return self.previous_announcement_loader - else: - raise ValueError("Don't know how to load column '%s'." 
% column) @lazyval def next_announcement_loader(self): - return DataFrameLoader( - self.dataset.next_announcement, - next_date_frame( - self.all_dates, - self.announcement_dates, - ), - adjustments=None, - ) + return self._next_event_date_loader(self.dataset.next_announcement, + ANNOUNCEMENT_FIELD_NAME) @lazyval def previous_announcement_loader(self): - return DataFrameLoader( + return self._previous_event_date_loader( self.dataset.previous_announcement, - previous_date_frame( - self.all_dates, - self.announcement_dates, - ), - adjustments=None, - ) - - def load_adjusted_array(self, columns, dates, assets, mask): - return merge( - self.get_loader(column).load_adjusted_array( - [column], dates, assets, mask - ) - for column in columns + ANNOUNCEMENT_FIELD_NAME ) diff --git a/zipline/pipeline/loaders/events.py b/zipline/pipeline/loaders/events.py new file mode 100644 index 00000000..84d0735c --- /dev/null +++ b/zipline/pipeline/loaders/events.py @@ -0,0 +1,205 @@ +import abc +import pandas as pd +from six import iteritems +from toolz import merge + +from .base import PipelineLoader +from .frame import DataFrameLoader +from .utils import next_date_frame, previous_date_frame, previous_value +from zipline.pipeline.common import TS_FIELD_NAME + +WRONG_COLS_ERROR = "Expected columns {expected_columns} for sid {sid} but " \ + "got columns {resulting_columns}." 
WRONG_SINGLE_COL_DATA_FORMAT_ERROR = ("Data for sid {sid} is expected to have "
                                      "1 column and to be in a DataFrame, "
                                      "Series, or DatetimeIndex.")

WRONG_MANY_COL_DATA_FORMAT_ERROR = ("Data for sid {sid} is expected to have "
                                    "more than 1 column and to be in a "
                                    "DataFrame.")

SERIES_NO_DTINDEX_ERROR = ("Got Series for sid {sid}, but index was not "
                           "DatetimeIndex.")

DTINDEX_NOT_INFER_TS_ERROR = ("Got DatetimeIndex for sid {sid}.\n"
                              "Pass `infer_timestamps=True` to use the first "
                              "date in `all_dates` as implicit timestamp.")

DF_NO_TS_NOT_INFER_TS_ERROR = ("Got DataFrame without a '{"
                               "timestamp_column_name}' column for sid {sid}."
                               "\nPass `infer_timestamps=True` to use the "
                               "first date in `all_dates` as implicit "
                               "timestamp.")


class EventsLoader(PipelineLoader):
    """
    Abstract loader for datasets made of dated events.

    On construction every entry of ``events_by_sid`` is normalized to a
    DataFrame whose index is the date on which each event became known.  The
    ``_next_event_date_loader``, ``_previous_event_date_loader`` and
    ``_previous_event_value_loader`` helpers build ``DataFrameLoader``
    instances from that normalized data.

    Subclasses must define ``expected_cols`` and expose one
    ``<column name>_loader`` attribute per dataset column; ``get_loader``
    looks loaders up by that name.

    Does not currently support adjustments to the dates of known events.

    Parameters
    ----------
    all_dates : pd.DatetimeIndex
        Index of dates for which we can serve queries.
    events_by_sid : dict[int -> pd.DataFrame or pd.Series or pd.DatetimeIndex]
        Dict mapping sids to DataFrames, Series, or DatetimeIndexes.  The
        dict itself is shallow-copied; the caller's dict is not modified.

        If a value is a DataFrame, it represents dates on which events
        occurred along with other associated values.  If the DataFrame
        contains a ``TS_FIELD_NAME`` column, that column becomes the index
        and is interpreted as the date on which we learned about the event.
        If it does not, every row is indexed by the first date in
        ``all_dates`` (i.e. we assume we knew about the event on all dates);
        this mode is only supported if ``infer_timestamps`` is truthy.  The
        remaining columns must equal ``expected_cols``.

        Series and DatetimeIndex values are only accepted when the loader
        expects exactly one column.  A Series must be indexed by a
        DatetimeIndex and is interpreted as a mapping from the date on which
        we learned of the event to the event date.  A DatetimeIndex is
        interpreted as just the event dates, indexed by the first date in
        ``all_dates``; this mode is only supported if ``infer_timestamps``
        is truthy.
    infer_timestamps : bool, optional
        Whether to allow omitting the timestamp information.
    dataset : DataSet
        The DataSet object for which this loader loads data.

    Raises
    ------
    ValueError
        If a value has an unsupported type, is missing timestamps while
        ``infer_timestamps`` is falsy, or has unexpected columns.
    """

    @abc.abstractproperty
    def expected_cols(self):
        """
        Names of the non-timestamp columns every event frame must carry.

        The value is compared for equality against a ``set`` of column
        names, so in practice it must be a set or frozenset.
        """
        # ``NotImplemented`` is a non-callable constant; the exception type
        # is ``NotImplementedError``.
        raise NotImplementedError('expected_cols')

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=None):
        self.all_dates = all_dates
        # Do not modify the original in place, since it may be used for other
        # purposes.
        self.events_by_sid = events_by_sid.copy()
        dates = self.all_dates.values

        for k, v in iteritems(events_by_sid):
            # Already a DataFrame
            if isinstance(v, pd.DataFrame):
                if TS_FIELD_NAME not in v.columns:
                    if not infer_timestamps:
                        raise ValueError(
                            DF_NO_TS_NOT_INFER_TS_ERROR.format(
                                timestamp_column_name=TS_FIELD_NAME,
                                sid=k
                            )
                        )
                    # Copy before re-indexing so the caller's frame is left
                    # untouched.
                    self.events_by_sid[k] = v = v.copy()
                    v.index = [dates[0]] * len(v)
                else:
                    self.events_by_sid[k] = v.set_index(TS_FIELD_NAME)
                # Once data is in a DF, make sure columns are correct.
                cols_except_ts = set(v.columns) - {TS_FIELD_NAME}

                # Check that all columns other than timestamp are as expected.
                if cols_except_ts != self.expected_cols:
                    raise ValueError(
                        WRONG_COLS_ERROR.format(
                            expected_columns=list(self.expected_cols),
                            sid=k,
                            resulting_columns=v.columns.values
                        )
                    )
            # Not a DataFrame and we only expect 1 column
            elif len(self.expected_cols) == 1:
                # First, must convert to DataFrame.
                if isinstance(v, pd.Series):
                    if not isinstance(v.index, pd.DatetimeIndex):
                        raise ValueError(
                            SERIES_NO_DTINDEX_ERROR.format(sid=k)
                        )
                    self.events_by_sid[k] = pd.DataFrame({
                        list(self.expected_cols)[0]: v})
                elif isinstance(v, pd.DatetimeIndex):
                    if not infer_timestamps:
                        raise ValueError(
                            DTINDEX_NOT_INFER_TS_ERROR.format(sid=k)
                        )
                    self.events_by_sid[k] = pd.DataFrame({
                        list(self.expected_cols)[0]: v
                    }, index=[dates[0]] * len(v))
                else:
                    # We expect 1 column, but we got something other than a
                    # Series, DatetimeIndex, or DataFrame.
                    raise ValueError(
                        WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=k)
                    )
            else:
                # We expected multiple columns, but we got something other
                # than a DataFrame.
                raise ValueError(
                    WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=k)
                )

        self.dataset = dataset

    def get_loader(self, column):
        """
        Return the loader attribute named ``<column.name>_loader``.

        Raises
        ------
        ValueError
            If ``column`` is not one of ``self.dataset.columns``.
        """
        if column in self.dataset.columns:
            return getattr(self, "%s_loader" % column.name)
        raise ValueError("Don't know how to load column '%s'." % column)

    def load_adjusted_array(self, columns, dates, assets, mask):
        """
        Load each requested column with its own loader and merge the
        per-column result dicts into one.
        """
        return merge(
            self.get_loader(column).load_adjusted_array(
                [column], dates, assets, mask
            )
            for column in columns
        )

    def _next_event_date_loader(self, next_date_field, event_date_field_name):
        """
        Build a DataFrameLoader for ``next_date_field`` from the frame
        produced by ``next_date_frame`` over ``event_date_field_name``.
        """
        return DataFrameLoader(
            next_date_field,
            next_date_frame(
                self.all_dates,
                self.events_by_sid,
                event_date_field_name
            ),
            adjustments=None,
        )

    def _previous_event_date_loader(self,
                                    prev_date_field,
                                    event_date_field_name):
        """
        Build a DataFrameLoader for ``prev_date_field`` from the frame
        produced by ``previous_date_frame`` over ``event_date_field_name``.
        """
        return DataFrameLoader(
            prev_date_field,
            previous_date_frame(
                self.all_dates,
                self.events_by_sid,
                event_date_field_name,
            ),
            adjustments=None,
        )

    def _previous_event_value_loader(self,
                                     previous_value_field,
                                     event_date_field_name,
                                     value_field_name):
        """
        Build a DataFrameLoader for ``previous_value_field`` from the frame
        produced by ``previous_value``.

        The field's own ``dtype`` and ``missing_value`` are forwarded to
        ``previous_value``.
        """
        return DataFrameLoader(
            previous_value_field,
            previous_value(
                self.all_dates,
                self.events_by_sid,
                event_date_field_name,
                value_field_name,
                previous_value_field.dtype,
                previous_value_field.missing_value
            ),
            adjustments=None,
        )
def previous_date_frame(date_index, events_by_sid, event_date_field_name):
    """
    Make a DataFrame representing the most recent past event date as of each
    date in ``date_index``.

    Parameters
    ----------
    date_index : DatetimeIndex.
        The index of the returned DataFrame.
    events_by_sid : dict[int -> pd.DataFrame]
        Dict mapping sids to a DataFrame. The index of the DataFrame
        represents the date we learned of the event mapping to the event
        data.
    event_date_field_name : str
        The name of the date field that marks when the event occurred.

    Returns
    -------
    previous_events: pd.DataFrame
        A DataFrame with one column per sid in `events_by_sid`.  Each event
        date is written at the first row whose date is on or after it and
        then forward-filled, so rows preceding a sid's first event hold
        `NaT`.  Events dated after the last date in `date_index` are
        ignored.  When several events land on the same row, the latest
        event date is kept.

    See Also
    --------
    next_date_frame
    """
    sids = list(events_by_sid)
    # Use an ns-unit NaT so the fill value matches the array dtype exactly.
    out = np.full(
        (len(date_index), len(sids)),
        np.datetime64('NaT', 'ns'),
        dtype='datetime64[ns]',
    )
    if not len(date_index):
        # No dates to serve: ``date_index[-1]`` below would raise.
        return pd.DataFrame(out, index=date_index, columns=sids)

    d_n = date_index[-1].asm8
    for col_idx, sid in enumerate(sids):
        # Only the event dates matter here; the knowledge dates in the
        # frame's index are irrelevant for "previous event" lookups.
        values = events_by_sid[sid][event_date_field_name].values
        # Comparison with NaT is False, so missing dates are dropped too.
        values = np.sort(values[values <= d_n])
        if not len(values):
            continue
        rows = date_index.searchsorted(values)
        # ``values`` is sorted, so ``rows`` is non-decreasing.  Keep only the
        # last (latest) event of each run of equal rows: NumPy gives no
        # ordering guarantee when an index is assigned more than once.
        latest = np.append(rows[1:] != rows[:-1], True)
        out[rows[latest], col_idx] = values[latest]

    frame = pd.DataFrame(out, index=date_index, columns=sids)
    frame.ffill(inplace=True)
    return frame