From 3142fa516fb190eede217f7d7a34ba35b87ce422 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Fri, 15 Jan 2016 16:58:04 -0500 Subject: [PATCH] ENH: add buyback_auth loader. WIP: finish refactoring blaze events loader. WIP: tests passing for earnings. BUG: pass all kwargs explicitly for BlazeEventsCalendarLoader. If this is not done, resources are not bound correctly. MAINT: refactor for buyback_auth. --- tests/pipeline/test_buyback_auth.py | 404 ++++++++++++++++++ tests/pipeline/test_earnings.py | 72 ++-- zipline/pipeline/data/__init__.py | 3 + zipline/pipeline/data/buyback_auth.py | 12 +- zipline/pipeline/factors/events.py | 112 +++-- zipline/pipeline/loaders/blaze/__init__.py | 14 + .../pipeline/loaders/blaze/buyback_auth.py | 134 ++++++ zipline/pipeline/loaders/blaze/core.py | 1 + zipline/pipeline/loaders/blaze/earnings.py | 98 +---- zipline/pipeline/loaders/blaze/events.py | 120 ++++++ zipline/pipeline/loaders/buyback_auth.py | 123 +++++- zipline/pipeline/loaders/earnings.py | 92 +--- zipline/pipeline/loaders/events.py | 103 ++++- zipline/pipeline/loaders/utils.py | 60 ++- 14 files changed, 1090 insertions(+), 258 deletions(-) create mode 100644 tests/pipeline/test_buyback_auth.py create mode 100644 zipline/pipeline/loaders/blaze/buyback_auth.py create mode 100644 zipline/pipeline/loaders/blaze/events.py diff --git a/tests/pipeline/test_buyback_auth.py b/tests/pipeline/test_buyback_auth.py new file mode 100644 index 00000000..c76167ae --- /dev/null +++ b/tests/pipeline/test_buyback_auth.py @@ -0,0 +1,404 @@ +""" +Tests for the reference loader for EarningsCalendar. 
+""" +from unittest import TestCase + +import blaze as bz +from blaze.compute.core import swap_resources_into_scope +from contextlib2 import ExitStack +from nose_parameterized import parameterized +import pandas as pd +import numpy as np +from pandas.util.testing import assert_series_equal +from six import iteritems + +from zipline.pipeline import Pipeline +from zipline.pipeline.data import (CashBuybackAuthorizations, + ShareBuybackAuthorizations) +from zipline.pipeline.engine import SimplePipelineEngine +from zipline.pipeline.factors.events import ( + BusinessDaysSincePreviousCashBuybackAuth, + BusinessDaysSincePreviousShareBuybackAuth +) +from zipline.pipeline.loaders.buyback_auth import \ + CashBuybackAuthorizationsLoader, ShareBuybackAuthorizationsLoader +from zipline.pipeline.loaders.blaze import ( + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + CashBuybackAuthorizationsLoader, + SHARE_COUNT_FIELD_NAME, + SID_FIELD_NAME, + ShareBuybackAuthorizationsLoader, + TS_FIELD_NAME, + VALUE_FIELD_NAME +) +from zipline.utils.numpy_utils import make_datetime64D, np_NaT +from zipline.utils.test_utils import ( + make_simple_equity_info, + tmp_asset_finder, + gen_calendars, + num_days_in_range, +) + + +sids = A, B, C, D, E = range(5) + +equity_info = make_simple_equity_info( + sids, + start_date=pd.Timestamp('2013-01-01', tz='UTC'), + end_date=pd.Timestamp('2015-01-01', tz='UTC'), + ) + +buyback_authorizations = { + # K1--K2--A1--A2--SC1--SC2--V1--V2. + A: pd.DataFrame({ + "timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']), + BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-15', + '2014-01-20']), + SHARE_COUNT_FIELD_NAME: [1, 15], + VALUE_FIELD_NAME: [10, 20] + }), + # K1--K2--E2--E1. + B: pd.DataFrame({ + "timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']), + BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([ + '2014-01-20', '2014-01-15']), + SHARE_COUNT_FIELD_NAME: [7, 13], VALUE_FIELD_NAME: [10, 22] + }), + # K1--E1--K2--E2. 
+ C: pd.DataFrame({ + "timestamp": pd.to_datetime(['2014-01-05', '2014-01-15']), + BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([ + '2014-01-10', '2014-01-20']), + SHARE_COUNT_FIELD_NAME: [3, 1], + VALUE_FIELD_NAME: [4, 7] + }), + # K1 == K2. + D: pd.DataFrame({ + "timestamp": pd.to_datetime(['2014-01-05'] * 2), + BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([ + '2014-01-10', '2014-01-15']), + SHARE_COUNT_FIELD_NAME: [6, 23], + VALUE_FIELD_NAME: [1, 2] + }), + E: pd.DataFrame( + columns=["timestamp", + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME, + VALUE_FIELD_NAME], + dtype='datetime64[ns]' + ), + } + +param_dates = gen_calendars( + '2014-01-01', + '2014-01-31', + critical_dates=pd.to_datetime([ + '2014-01-05', + '2014-01-10', + '2014-01-15', + '2014-01-20', + ]), + ) + + +def zip_with_floats(flts, dates): + return pd.Series(flts, index=dates).astype('float') + + +def num_days_between(dates, start_date, end_date): + return num_days_in_range(dates, start_date, end_date) + + +def zip_with_dates(dts, dates): + return pd.Series(pd.to_datetime(dts), index=dates) + + +class BuybackAuthLoaderTestCase(TestCase): + """ + Tests for loading the earnings announcement data. + """ + + @classmethod + def setUpClass(cls): + cls._cleanup_stack = stack = ExitStack() + + cls.finder = stack.enter_context( + tmp_asset_finder(equities=equity_info), + ) + cls.cols = {} + cls.buyback_authorizations = None + + + @classmethod + def tearDownClass(cls): + cls._cleanup_stack.close() + + def loader_args(self, dates): + """Construct the base buyback authorizations object to pass to the + loader. + + Parameters + ---------- + dates : pd.DatetimeIndex + The dates we can serve. + + Returns + ------- + args : tuple[any] + The arguments to forward to the loader positionally. + """ + return dates, self.buyback_authorizations + + def setup(self, dates): + """ + Make a PipelineEngine and expectation functions for the given dates + calendar. 
+ + This exists to make it easy to test our various cases with critical + dates missing from the calendar. + """ + + _expected_previous_buyback_announcement = pd.DataFrame({ + A: zip_with_dates( + ['NaT'] * num_days_between(dates, None, '2014-01-14') + + ['2014-01-15'] * num_days_between(dates, '2014-01-15', '2014-01-19') + + ['2014-01-20'] * num_days_between(dates, '2014-01-20', None), + dates + ), + B: zip_with_dates( + ['NaT'] * num_days_between(dates, None, '2014-01-14') + + ['2014-01-15'] * num_days_between(dates, '2014-01-15', '2014-01-19') + + ['2014-01-20'] * num_days_between(dates, '2014-01-20', None), + dates + ), + C: zip_with_dates( + ['NaT'] * num_days_between(dates, None, '2014-01-09') + + ['2014-01-10'] * num_days_between(dates, '2014-01-10', '2014-01-19') + + ['2014-01-20'] * num_days_between(dates, '2014-01-20', None), + dates + ), + D: zip_with_dates( + ['NaT'] * num_days_between(dates, None, '2014-01-09') + + ['2014-01-10'] * num_days_between(dates, '2014-01-10', '2014-01-14') + + ['2014-01-15'] * num_days_between(dates, '2014-01-15', None), + dates + ), + E: zip_with_dates(['NaT'] * len(dates), dates), + }, index=dates) + + _expected_previous_busday_offsets = self._compute_busday_offsets( + _expected_previous_buyback_announcement + ) + + self.cols['previous_buyback_announcement'] = _expected_previous_buyback_announcement + self.cols['days_since_prev'] = _expected_previous_busday_offsets + + loader = self.loader_type(*self.loader_args(dates)) + engine = SimplePipelineEngine(lambda _: loader, dates, self.finder) + return engine + + @staticmethod + def _compute_busday_offsets(announcement_dates): + """ + Compute expected business day offsets from a DataFrame of announcement + dates. + """ + # Column-vector of dates on which factor `compute` will be called. + raw_call_dates = announcement_dates.index.values.astype( + 'datetime64[D]' + )[:, None] + + # 2D array of dates containining expected nexg announcement. 
+ raw_announce_dates = ( + announcement_dates.values.astype('datetime64[D]') + ) + + # Set NaTs to 0 temporarily because busday_count doesn't support NaT. + # We fill these entries with NaNs later. + whereNaT = raw_announce_dates == np_NaT + raw_announce_dates[whereNaT] = make_datetime64D(0) + + # The abs call here makes it so that we can use this function to + # compute offsets for both next and previous earnings (previous + # earnings offsets come back negative). + expected = abs(np.busday_count( + raw_call_dates, + raw_announce_dates + ).astype(float)) + + expected[whereNaT] = np.nan + return pd.DataFrame( + data=expected, + columns=announcement_dates.columns, + index=announcement_dates.index, + ) + + def _test_compute_buyback_auth(self, dates): + engine = self.setup(dates) + + pipe = Pipeline( + columns=self.pipeline_columns + ) + + result = engine.run_pipeline( + pipe, + start_date=dates[0], + end_date=dates[-1], + ) + + for sid in sids: + for col_name in self.cols.keys(): + assert_series_equal(result[col_name].xs(sid, level=1), + self.cols[col_name][sid], + sid) + + +class ShareBuybackAuthLoaderTestCase(BuybackAuthLoaderTestCase): + buyback_authorizations = {sid: df.drop(VALUE_FIELD_NAME, 1) + for sid, df in iteritems(buyback_authorizations)} + pipeline_columns = { + 'previous_buyback_share_count': + ShareBuybackAuthorizations.previous_share_count.latest, + 'previous_buyback_announcement': + ShareBuybackAuthorizations.previous_announcement_date.latest, + 'days_since_prev': + BusinessDaysSincePreviousShareBuybackAuth(), + } + + @classmethod + def setUpClass(cls): + super(ShareBuybackAuthLoaderTestCase, cls).setUpClass() + cls.buyback_authorizations = buyback_authorizations + cls.loader_type = ShareBuybackAuthorizationsLoader + + def setup(self, dates): + engine = super(ShareBuybackAuthLoaderTestCase, self).setup(dates) + _expected_previous_buyback_share_count = pd.DataFrame({ + A: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-14') + + [1] * 
num_days_between(dates, '2014-01-15', '2014-01-19') + + [15] * num_days_between(dates, '2014-01-20', None), dates), + B: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-14') + + [13] * num_days_between(dates, '2014-01-15', '2014-01-19') + + [7] * num_days_between(dates, '2014-01-20', None), dates), + C: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-09') + + [3] * num_days_between(dates, '2014-01-10', '2014-01-19') + + [1] * num_days_between(dates, '2014-01-20', None), dates), + D: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-09') + + [6] * num_days_between(dates, '2014-01-10', '2014-01-14') + + [23] * num_days_between(dates, '2014-01-15', None), dates), + E: zip_with_floats(['NaN'] * len(dates), dates), + }, index=dates) + self.cols['previous_buyback_share_count'] = _expected_previous_buyback_share_count + return engine + + @parameterized.expand(param_dates) + def test_compute_buyback_auth(self, dates): + self._test_compute_buyback_auth(dates) + + +class CashBuybackAuthLoaderTestCase(BuybackAuthLoaderTestCase): + buyback_authorizations = {sid: df.drop(SHARE_COUNT_FIELD_NAME, 1) + for sid, df in iteritems(buyback_authorizations)} + pipeline_columns = { + 'previous_buyback_value': + CashBuybackAuthorizations.previous_value.latest, + 'previous_buyback_announcement': + CashBuybackAuthorizations.previous_announcement_date.latest, + 'days_since_prev': + BusinessDaysSincePreviousCashBuybackAuth(), + } + + @classmethod + def setUpClass(cls): + super(CashBuybackAuthLoaderTestCase, cls).setUpClass() + cls.buyback_authorizations = buyback_authorizations + cls.loader_type = CashBuybackAuthLoaderTestCase + + def setup(self, dates): + engine = super(ShareBuybackAuthLoaderTestCase, self).setup(dates) + _expected_previous_value = pd.DataFrame({ + # TODO if the next knowledge date is 10, why is the range + # until 15? 
+ A: zip_with_floats( + ['NaN'] * num_days_between(dates, None, '2014-01-14') + + [10] * num_days_between(dates, '2014-01-15', '2014-01-19') + + [20] * num_days_between(dates, '2014-01-20', None), dates), + B: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-14') + + [22] * num_days_between(dates, '2014-01-15', '2014-01-19') + + [10] * num_days_between(dates, '2014-01-20', None), dates), + C: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-09') + + [4] * num_days_between(dates, '2014-01-10', '2014-01-19') + + [7] * num_days_between(dates, '2014-01-20', None), dates), + D: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-09') + + [1] * num_days_between(dates, '2014-01-10', '2014-01-14') + + [2] * num_days_between(dates, '2014-01-15', None), dates), + E: zip_with_floats(['NaN'] * len(dates), dates), + }, index=dates) + self.cols['previous_buyback_value'] = _expected_previous_value + return engine + + @parameterized.expand(param_dates) + def test_compute_buyback_auth(self, dates): + self._test_compute_buyback_auth(dates) + + +# class BlazeBuybackAuthLoaderTestCase(BuybackAuthLoaderTestCase): +# loader_type = BlazeBuybackAuthorizationsLoader +# +# def loader_args(self, dates): +# _, mapping = super( +# BlazeBuybackAuthLoaderTestCase, +# self, +# ).loader_args(dates) +# return (bz.Data(pd.concat( +# pd.DataFrame({ +# BUYBACK_ANNOUNCEMENT_FIELD_NAME: +# frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME], +# SHARE_COUNT_FIELD_NAME: frame[SHARE_COUNT_FIELD_NAME], +# VALUE_FIELD_NAME: frame[VALUE_FIELD_NAME], +# TS_FIELD_NAME: frame.index, +# SID_FIELD_NAME: sid, +# }) +# for sid, frame in iteritems(mapping) +# ).reset_index(drop=True)),) +# +# +# class BlazeEarningsCalendarLoaderNotInteractiveTestCase( +# BlazeBuybackAuthLoaderTestCase): +# """Test case for passing a non-interactive symbol and a dict of resources. 
+# """ +# def loader_args(self, dates): +# (bound_expr,) = super( +# BlazeEarningsCalendarLoaderNotInteractiveTestCase, +# self, +# ).loader_args(dates) +# return swap_resources_into_scope(bound_expr, {}) +# +# +# class BuybackAuthLoaderInferTimestampTestCase(TestCase): +# def test_infer_timestamp(self): +# dtx = pd.date_range('2014-01-01', '2014-01-10') +# events_by_sid = { +# 0: pd.DataFrame({BUYBACK_ANNOUNCEMENT_FIELD_NAME: dtx}), +# 1: pd.DataFrame( +# {BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.Series(dtx, dtx)}, +# index=dtx +# ) +# } +# loader = BuybackAuthorizationsLoader( +# dtx, +# events_by_sid, +# infer_timestamps=True, +# ) +# self.assertEqual( +# loader.events_by_sid.keys(), +# events_by_sid.keys(), +# ) +# assert_series_equal( +# loader.events_by_sid[0][BUYBACK_ANNOUNCEMENT_FIELD_NAME], +# pd.Series(index=[dtx[0]] * 10, data=dtx), +# ) +# assert_series_equal( +# loader.events_by_sid[1][BUYBACK_ANNOUNCEMENT_FIELD_NAME], +# events_by_sid[1][BUYBACK_ANNOUNCEMENT_FIELD_NAME], +# ) diff --git a/tests/pipeline/test_earnings.py b/tests/pipeline/test_earnings.py index 57b72420..4f78fb68 100644 --- a/tests/pipeline/test_earnings.py +++ b/tests/pipeline/test_earnings.py @@ -57,30 +57,33 @@ class EarningsCalendarLoaderTestCase(TestCase): cls.earnings_dates = { # K1--K2--E1--E2. - A: to_series( - knowledge_dates=['2014-01-05', '2014-01-10'], - earning_dates=['2014-01-15', '2014-01-20'], - ), + A: pd.DataFrame({ + "timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-15', + '2014-01-20']) + }), # K1--K2--E2--E1. - B: to_series( - knowledge_dates=['2014-01-05', '2014-01-10'], - earning_dates=['2014-01-20', '2014-01-15'] - ), + B: pd.DataFrame({ + "timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-20', + '2014-01-15']) + }), # K1--E1--K2--E2. 
- C: to_series( - knowledge_dates=['2014-01-05', '2014-01-15'], - earning_dates=['2014-01-10', '2014-01-20'] - ), + C: pd.DataFrame({ + "timestamp": pd.to_datetime(['2014-01-05', '2014-01-15']), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-10', + '2014-01-20']) + }), # K1 == K2. - D: to_series( - knowledge_dates=['2014-01-05'] * 2, - earning_dates=['2014-01-10', '2014-01-15'], - ), - E: pd.Series( - data=[], - index=pd.DatetimeIndex([]), - dtype='datetime64[ns]', - ), + D: pd.DataFrame({ + "timestamp": pd.to_datetime(['2014-01-05'] * 2), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-10', + '2014-01-15']) + }), + E: pd.DataFrame({ + "timestamp": pd.to_datetime([]), + ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([]) + }) } @classmethod @@ -118,7 +121,8 @@ class EarningsCalendarLoaderTestCase(TestCase): def zip_with_dates(dts): return pd.Series(pd.to_datetime(dts), index=dates) - + # TODO: tests will break because I now need mappings of sid -> + # dataframe instead of sid -> series _expected_next_announce = pd.DataFrame({ A: zip_with_dates( ['NaT'] * num_days_between(None, '2014-01-04') + @@ -345,11 +349,11 @@ class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase): ).loader_args(dates) return (bz.Data(pd.concat( pd.DataFrame({ - ANNOUNCEMENT_FIELD_NAME: earning_dates, - TS_FIELD_NAME: earning_dates.index, + ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME], + TS_FIELD_NAME: df[TS_FIELD_NAME], SID_FIELD_NAME: sid, }) - for sid, earning_dates in iteritems(mapping) + for sid, df in iteritems(mapping) ).reset_index(drop=True)),) @@ -369,8 +373,8 @@ class EarningsCalendarLoaderInferTimestampTestCase(TestCase): def test_infer_timestamp(self): dtx = pd.date_range('2014-01-01', '2014-01-10') announcement_dates = { - 0: dtx, - 1: pd.Series(dtx, dtx), + 0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), + 1: pd.DataFrame({TS_FIELD_NAME: dtx, ANNOUNCEMENT_FIELD_NAME: dtx}), } loader = EarningsCalendarLoader( dtx, @@ -378,14 +382,18 @@ class 
EarningsCalendarLoaderInferTimestampTestCase(TestCase): infer_timestamps=True, ) self.assertEqual( - loader.announcement_dates.keys(), + loader.events_by_sid.keys(), announcement_dates.keys(), ) assert_series_equal( - loader.announcement_dates[0], - pd.Series(index=[dtx[0]] * 10, data=dtx), + pd.Series(loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME]), + pd.Series(index=[dtx[0]] * 10, data=dtx, + name=ANNOUNCEMENT_FIELD_NAME), ) assert_series_equal( - loader.announcement_dates[1], - announcement_dates[1], + pd.Series(loader.events_by_sid[1][ANNOUNCEMENT_FIELD_NAME]), + pd.Series(index=announcement_dates[1][TS_FIELD_NAME], + data=np.array(announcement_dates[1][ + ANNOUNCEMENT_FIELD_NAME]), + name=ANNOUNCEMENT_FIELD_NAME) ) diff --git a/zipline/pipeline/data/__init__.py b/zipline/pipeline/data/__init__.py index 894bc86e..f7357f09 100644 --- a/zipline/pipeline/data/__init__.py +++ b/zipline/pipeline/data/__init__.py @@ -1,11 +1,14 @@ +from .buyback_auth import CashBuybackAuthorizations, ShareBuybackAuthorizations from .earnings import EarningsCalendar from .equity_pricing import USEquityPricing from .dataset import DataSet, Column, BoundColumn __all__ = [ 'BoundColumn', + 'CashBuybackAuthorizations', 'Column', 'DataSet', 'EarningsCalendar', + 'ShareBuybackAuthorizations', 'USEquityPricing', ] diff --git a/zipline/pipeline/data/buyback_auth.py b/zipline/pipeline/data/buyback_auth.py index dd58e9b2..7c1cf952 100644 --- a/zipline/pipeline/data/buyback_auth.py +++ b/zipline/pipeline/data/buyback_auth.py @@ -6,12 +6,14 @@ from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype from .dataset import Column, DataSet -class BuybackAuthorizations(DataSet): +class CashBuybackAuthorizations(DataSet): """ Dataset representing dates of recently announced buyback authorization. 
""" - previous_buyback_value = Column(float64_dtype) - previous_buyback_share_count = Column(float64_dtype) - previous_buyback_value_announcement = Column(datetime64ns_dtype) - previous_buyback_share_count_announcement = Column(datetime64ns_dtype) + previous_value = Column(float64_dtype) + previous_announcement_date = Column(datetime64ns_dtype) + +class ShareBuybackAuthorizations(DataSet): + previous_share_count = Column(float64_dtype) + previous_announcement_date = Column(datetime64ns_dtype) diff --git a/zipline/pipeline/factors/events.py b/zipline/pipeline/factors/events.py index 2491efc8..481e0e8a 100644 --- a/zipline/pipeline/factors/events.py +++ b/zipline/pipeline/factors/events.py @@ -3,6 +3,10 @@ Factors describing information about event data (e.g. earnings announcements, acquisitions, dividends, etc.). """ from numpy import newaxis +from zipline.pipeline.data.buyback_auth import ( + CashBuybackAuthorizations, + ShareBuybackAuthorizations +) from zipline.pipeline.data.earnings import EarningsCalendar from zipline.utils.numpy_utils import ( NaTD, @@ -14,10 +18,42 @@ from zipline.utils.numpy_utils import ( from .factor import Factor -class BusinessDaysUntilNextEarnings(Factor): +class BusinessDaysSincePreviousEvents(Factor): """ - Factor returning the number of **business days** (not trading days!) until - the next known earnings date for each asset. + Abstract class for business days since a previous event. + Returns the number of **business days** (not trading days!) since + the most recent event date for each asset. + + This doesn't use trading days for symmetry with + BusinessDaysUntilNextEarnings. + + Assets which announced or will announce the event today will produce a value + of 0.0. Assets that announced the event on the previous business day will + produce a value of 1.0. + + Assets for which the event date is `NaT` will produce a value of `NaN`. 
+ """ + window_length = 0 + dtype = float64_dtype + + def _compute(self, arrays, dates, assets, mask): + + # Coerce from [ns] to [D] for numpy busday_count. + announce_dates = arrays[0].astype(datetime64D_dtype) + + # Set masked values to NaT. + announce_dates[~mask] = np_NaT + + # Convert row labels into a column vector for broadcasted comparison. + reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis] + return busday_count_mask_NaT(announce_dates, reference_dates) + + +class BusinessDaysUntilNextEvents(Factor): + """ + Abstract class for business days until the next event. + Returns the number of **business days** (not trading days!) until + the next known event date for each asset. This doesn't use trading days because the trading calendar includes information that may not have been available to the algorithm at the time @@ -26,19 +62,12 @@ class BusinessDaysUntilNextEarnings(Factor): For example, the NYSE closings September 11th 2001, would not have been known to the algorithm on September 10th. - Assets that announced or will announce earnings today will produce a value - of 0.0. Assets that will announce earnings on the next upcoming business + Assets that announced or will announce the event today will produce a value + of 0.0. Assets that will announce the event on the next upcoming business day will produce a value of 1.0. - Assets for which `EarningsCalendar.next_announcement` is `NaT` will produce - a value of `NaN`. - - - See Also - -------- - zipline.pipeline.factors.BusinessDaysSincePreviousEarnings + Assets for which the event date is `NaT` will produce a value of `NaN`. 
""" - inputs = [EarningsCalendar.next_announcement] window_length = 0 dtype = float64_dtype @@ -55,37 +84,52 @@ class BusinessDaysUntilNextEarnings(Factor): return busday_count_mask_NaT(reference_dates, announce_dates) -class BusinessDaysSincePreviousEarnings(Factor): +class BusinessDaysUntilNextEarnings(BusinessDaysUntilNextEvents): + """ + Factor returning the number of **business days** (not trading days!) until + the next known earnings date for each asset. + + See Also + -------- + zipline.pipeline.factors.BusinessDaysSincePreviousEarnings + """ + inputs = [EarningsCalendar.next_announcement] + + +class BusinessDaysSincePreviousEarnings(BusinessDaysSincePreviousEvents): """ Factor returning the number of **business days** (not trading days!) since the most recent earnings date for each asset. - This doesn't use trading days for symmetry with - BusinessDaysUntilNextEarnings. - - Assets which announced or will announce earnings today will produce a value - of 0.0. Assets that announced earnings on the previous business day will - produce a value of 1.0. - - Assets for which `EarningsCalendar.previous_announcement` is `NaT` will - produce a value of `NaN`. - See Also -------- zipline.pipeline.factors.BusinessDaysUntilNextEarnings """ inputs = [EarningsCalendar.previous_announcement] - window_length = 0 - dtype = float64_dtype - def _compute(self, arrays, dates, assets, mask): - # Coerce from [ns] to [D] for numpy busday_count. - announce_dates = arrays[0].astype(datetime64D_dtype) +class BusinessDaysSincePreviousCashBuybackAuth(BusinessDaysSincePreviousEvents): + """ + Factor returning the number of **business days** (not trading days!) since + the most recent cash buyback authorization for each asset. - # Set masked values to NaT. 
- announce_dates[~mask] = NaTD + See Also + -------- + zipline.pipeline.factors.BusinessDaysUntilNextEarnings + """ + inputs = [CashBuybackAuthorizations.previous_announcement_date] - # Convert row labels into a column vector for broadcasted comparison. - reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis] - return busday_count_mask_NaT(announce_dates, reference_dates) + +class BusinessDaysSincePreviousShareBuybackAuth( + BusinessDaysSincePreviousEvents +): + """ + Factor returning the number of **business days** (not trading days!) since + the most recent share buyback authorization for each asset. + + + See Also + -------- + zipline.pipeline.factors.BusinessDaysUntilNextEarnings + """ + inputs = [ShareBuybackAuthorizations.previous_announcement_date] diff --git a/zipline/pipeline/loaders/blaze/__init__.py b/zipline/pipeline/loaders/blaze/__init__.py index 200fa583..9702d5ea 100644 --- a/zipline/pipeline/loaders/blaze/__init__.py +++ b/zipline/pipeline/loaders/blaze/__init__.py @@ -1,3 +1,7 @@ +from .buyback_auth import ( + CashBuybackAuthorizationsLoader, + ShareBuybackAuthorizationsLoader +) from .core import ( AD_FIELD_NAME, BlazeLoader, @@ -7,6 +11,11 @@ from .core import ( from_blaze, global_loader, ) +from .buyback_auth import ( + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME, + VALUE_FIELD_NAME +) from .earnings import ( ANNOUNCEMENT_FIELD_NAME, BlazeEarningsCalendarLoader, @@ -17,9 +26,14 @@ __all__ = ( 'ANNOUNCEMENT_FIELD_NAME', 'BlazeEarningsCalendarLoader', 'BlazeLoader', + 'BUYBACK_ANNOUNCEMENT_FIELD_NAME', + 'CashBuybackAuthorizationsLoader', 'NoDeltasWarning', + 'SHARE_COUNT_FIELD_NAME', 'SID_FIELD_NAME', + 'ShareBuybackAuthorizationsLoader', 'TS_FIELD_NAME', + 'VALUE_FIELD_NAME', 'from_blaze', 'global_loader', ) diff --git a/zipline/pipeline/loaders/blaze/buyback_auth.py b/zipline/pipeline/loaders/blaze/buyback_auth.py new file mode 100644 index 00000000..b98a03aa --- /dev/null +++ 
b/zipline/pipeline/loaders/blaze/buyback_auth.py @@ -0,0 +1,134 @@ +from .core import ( + TS_FIELD_NAME, + SID_FIELD_NAME, +) +from zipline.pipeline.data import (CashBuybackAuthorizations, + ShareBuybackAuthorizations) +from zipline.pipeline.loaders.buyback_auth import ( + CashBuybackAuthorizationsLoader, + ShareBuybackAuthorizationsLoader +) +from .events import BlazeEventsCalendarLoader + + +BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_dates' +SHARE_COUNT_FIELD_NAME = 'share_counts' +VALUE_FIELD_NAME = 'values' + + +class BlazeCashBuybackAuthorizationsLoader(BlazeEventsCalendarLoader): + """A pipeline loader for the ``BuybackAuth`` dataset that loads + data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezone to use for the data query cutoff. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + {BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime, + {VALUE_FIELD_NAME}: ?float64 + }} + + Where each row of the table is a record including the sid to identify the + company, the timestamp where we learned about the announcement, the + date when the buyback was announced, and the value. + + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. 
+ """ + __doc__ = __doc__.format( + TS_FIELD_NAME=TS_FIELD_NAME, + SID_FIELD_NAME=SID_FIELD_NAME, + BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME, + VALUE_FIELD_NAME=VALUE_FIELD_NAME + ) + + _expected_fields = frozenset({ + TS_FIELD_NAME, + SID_FIELD_NAME, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + VALUE_FIELD_NAME + }) + + def __init__(self, + expr, + dataset=CashBuybackAuthorizations, + loader=CashBuybackAuthorizationsLoader, + **kwargs): + super( + BlazeCashBuybackAuthorizationsLoader, self + ).__init__(expr, dataset=dataset, loader=loader, **kwargs) + + +class BlazeShareBuybackAuthorizationsLoader(BlazeEventsCalendarLoader): + """A pipeline loader for the ``BuybackAuth`` dataset that loads + data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezone to use for the data query cutoff. + + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + {BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime, + {SHARE_COUNT_FIELD_NAME}: ?float64, + }} + + Where each row of the table is a record including the sid to identify the + company, the timestamp where we learned about the announcement, the + date when the buyback was announced, and the share count. + + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. 
+ """ + __doc__ = __doc__.format( + TS_FIELD_NAME=TS_FIELD_NAME, + SID_FIELD_NAME=SID_FIELD_NAME, + BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME=SHARE_COUNT_FIELD_NAME, + ) + + _expected_fields = frozenset({ + TS_FIELD_NAME, + SID_FIELD_NAME, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME, + }) + + def __init__(self, + expr, + dataset=ShareBuybackAuthorizations, + loader=ShareBuybackAuthorizationsLoader, + **kwargs): + super( + BlazeShareBuybackAuthorizationsLoader, self + ).__init__(expr, dataset=dataset, loader=loader, **kwargs) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 0fcd52e0..df34c975 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -182,6 +182,7 @@ from zipline.utils.preprocess import preprocess AD_FIELD_NAME = 'asof_date' TS_FIELD_NAME = 'timestamp' SID_FIELD_NAME = 'sid' +ANNOUNCEMENT_FIELD_NAME = 'announcement_date' valid_deltas_node_types = ( bz.expr.Field, bz.expr.ReLabel, diff --git a/zipline/pipeline/loaders/blaze/earnings.py b/zipline/pipeline/loaders/blaze/earnings.py index 9f5137f8..287949da 100644 --- a/zipline/pipeline/loaders/blaze/earnings.py +++ b/zipline/pipeline/loaders/blaze/earnings.py @@ -1,29 +1,14 @@ -from datashape import istabular -import pandas as pd -from toolz import valmap - from .core import ( + ANNOUNCEMENT_FIELD_NAME, TS_FIELD_NAME, - SID_FIELD_NAME, - bind_expression_to_resources, - ffill_query_in_range, + SID_FIELD_NAME ) from zipline.pipeline.data import EarningsCalendar -from zipline.pipeline.loaders.base import PipelineLoader from zipline.pipeline.loaders.earnings import EarningsCalendarLoader -from zipline.pipeline.loaders.utils import ( - check_data_query_args, - normalize_data_query_bounds, - normalize_timestamp_to_query_time, -) -from zipline.utils.input_validation import ensure_timezone, optionally -from zipline.utils.preprocess import preprocess +from 
.events import BlazeEventsCalendarLoader -ANNOUNCEMENT_FIELD_NAME = 'announcement_date' - - -class BlazeEarningsCalendarLoader(PipelineLoader): +class BlazeEarningsCalendarLoader(BlazeEventsCalendarLoader): """A pipeline loader for the ``EarningsCalendar`` dataset that loads data from a blaze expression. @@ -57,6 +42,7 @@ class BlazeEarningsCalendarLoader(PipelineLoader): If the '{TS_FIELD_NAME}' field is not included it is assumed that we start the backtest with knowledge of all announcements. """ + __doc__ = __doc__.format( TS_FIELD_NAME=TS_FIELD_NAME, SID_FIELD_NAME=SID_FIELD_NAME, @@ -69,75 +55,17 @@ class BlazeEarningsCalendarLoader(PipelineLoader): ANNOUNCEMENT_FIELD_NAME, }) - @preprocess(data_query_tz=optionally(ensure_timezone)) def __init__(self, expr, resources=None, odo_kwargs=None, data_query_time=None, data_query_tz=None, - dataset=EarningsCalendar): - dshape = expr.dshape - - if not istabular(dshape): - raise ValueError( - 'expression dshape must be tabular, got: %s' % dshape, - ) - - expected_fields = self._expected_fields - self._expr = bind_expression_to_resources( - expr[list(expected_fields)], - resources, - ) - self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {} - self._dataset = dataset - check_data_query_args(data_query_time, data_query_tz) - self._data_query_time = data_query_time - self._data_query_tz = data_query_tz - - def load_adjusted_array(self, columns, dates, assets, mask): - data_query_time = self._data_query_time - data_query_tz = self._data_query_tz - lower_dt, upper_dt = normalize_data_query_bounds( - dates[0], - dates[-1], - data_query_time, - data_query_tz, - ) - - raw = ffill_query_in_range( - self._expr, - lower_dt, - upper_dt, - self._odo_kwargs, - ) - sids = raw.loc[:, SID_FIELD_NAME] - raw.drop( - sids[~sids.isin(assets)].index, - inplace=True - ) - if data_query_time is not None: - normalize_timestamp_to_query_time( - raw, - data_query_time, - data_query_tz, - inplace=True, - ts_field=TS_FIELD_NAME, - ) - - gb 
= raw.groupby(SID_FIELD_NAME) - - def mkseries(idx, raw_loc=raw.loc): - vs = raw_loc[ - idx, [TS_FIELD_NAME, ANNOUNCEMENT_FIELD_NAME] - ].values - return pd.Series( - index=pd.DatetimeIndex(vs[:, 0]), - data=vs[:, 1], - ) - - return EarningsCalendarLoader( - dates, - valmap(mkseries, gb.groups), - dataset=self._dataset, - ).load_adjusted_array(columns, dates, assets, mask) + dataset=EarningsCalendar, + loader=EarningsCalendarLoader, + **kwargs): + super( + BlazeEarningsCalendarLoader, self + ).__init__(expr, dataset=dataset, loader=loader, resources=resources, + odo_kwargs=odo_kwargs, data_query_time=data_query_time, + data_query_tz=data_query_tz, **kwargs) diff --git a/zipline/pipeline/loaders/blaze/events.py b/zipline/pipeline/loaders/blaze/events.py new file mode 100644 index 00000000..c74cad5b --- /dev/null +++ b/zipline/pipeline/loaders/blaze/events.py @@ -0,0 +1,120 @@ +from datashape import istabular + +from .core import ( + TS_FIELD_NAME, + SID_FIELD_NAME, + bind_expression_to_resources, + ffill_query_in_range, +) +from zipline.pipeline.loaders.base import PipelineLoader +from zipline.pipeline.loaders.utils import ( + check_data_query_args, + normalize_data_query_bounds, + normalize_timestamp_to_query_time, +) +from zipline.utils.input_validation import ensure_timezone, optionally +from zipline.utils.preprocess import preprocess + + + +class BlazeEventsCalendarLoader(PipelineLoader): + """An abstract pipeline loader for the events datasets that loads + data from a blaze expression. + + Parameters + ---------- + expr : Expr + The expression representing the data to load. + resources : dict, optional + Mapping from the atomic terms of ``expr`` to actual data resources. + odo_kwargs : dict, optional + Extra keyword arguments to pass to odo when executing the expression. + data_query_time : time, optional + The time to use for the data query cutoff. + data_query_tz : tzinfo or str + The timezone to use for the data query cutoff. 
+ + Notes + ----- + The expression should have a tabular dshape of:: + + Dim * {{ + {SID_FIELD_NAME}: int64, + {TS_FIELD_NAME}: datetime, + }} + + And other dataset-specific fields, where each row of the table is a + record including the sid to identify the company, the timestamp where we + learned about the announcement, and the date when the earnings will be + announced. + + If the '{TS_FIELD_NAME}' field is not included it is assumed that we + start the backtest with knowledge of all announcements. + """ + + @preprocess(data_query_tz=optionally(ensure_timezone)) + def __init__(self, + expr, + resources=None, + odo_kwargs=None, + data_query_time=None, + data_query_tz=None, + dataset=None, + loader=None): + dshape = expr.dshape + + if not istabular(dshape): + raise ValueError( + 'expression dshape must be tabular, got: %s' % dshape, + ) + + expected_fields = self._expected_fields + self._expr = bind_expression_to_resources( + expr[list(expected_fields)], + resources, + ) + self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {} + self._dataset = dataset + check_data_query_args(data_query_time, data_query_tz) + self._data_query_time = data_query_time + self._data_query_tz = data_query_tz + self._loader = loader + + def load_adjusted_array(self, columns, dates, assets, mask): + data_query_time = self._data_query_time + data_query_tz = self._data_query_tz + lower_dt, upper_dt = normalize_data_query_bounds( + dates[0], + dates[-1], + data_query_time, + data_query_tz, + ) + + raw = ffill_query_in_range( + self._expr, + lower_dt, + upper_dt, + self._odo_kwargs, + ) + sids = raw.loc[:, SID_FIELD_NAME] + raw.drop( + sids[~sids.isin(assets)].index, + inplace=True + ) + if data_query_time is not None: + normalize_timestamp_to_query_time( + raw, + data_query_time, + data_query_tz, + inplace=True, + ts_field=TS_FIELD_NAME, + ) + gb = raw.groupby(SID_FIELD_NAME) + return self._loader( + dates, + self.prepare_data(raw, gb), + dataset=self._dataset, + 
+ ).load_adjusted_array(columns, dates, assets, mask) + + def prepare_data(self, raw, gb): + return {sid: raw.loc[group] for sid, group in gb.groups.iteritems()} diff --git a/zipline/pipeline/loaders/buyback_auth.py b/zipline/pipeline/loaders/buyback_auth.py index 8710fd29..02727e53 100644 --- a/zipline/pipeline/loaders/buyback_auth.py +++ b/zipline/pipeline/loaders/buyback_auth.py @@ -1 +1,122 @@ -__author__ = 'mtydykov' +""" +Reference implementation for buyback authorization loaders. +""" + +from ..data.buyback_auth import CashBuybackAuthorizations, \ + ShareBuybackAuthorizations +from events import EventsLoader +from zipline.utils.memoize import lazyval + + +BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_dates' +SHARE_COUNT_FIELD_NAME = 'share_counts' +VALUE_FIELD_NAME = 'values' + + +# TODO: split into 2 datasets - or just think about how to generalize since +# we will often have cases where we have a knowledge date and, optionally, +# a value for that event; having no value (like earnings) is a special case. +class CashBuybackAuthorizationsLoader(EventsLoader): + """ + Reference loader for + :class:`zipline.pipeline.data.buyback_auth.CashBuybackAuthorizations`. + + Does not currently support adjustments to the dates of known buyback + authorizations. + + events_by_sid: dict[sid -> pd.DataFrame(knowledge date, + event date, value)] + + """ + + def __init__(self, + all_dates, + events_by_sid, + infer_timestamps=False, + dataset=CashBuybackAuthorizations): + super(CashBuybackAuthorizationsLoader, self).__init__( + all_dates, + events_by_sid, + infer_timestamps=infer_timestamps, + dataset=dataset + ) + + + def get_loader(self, column): + """dispatch to the loader for ``column``. + """ + if column is self.dataset.previous_value: + return self.previous_buyback_value_loader + elif column is self.dataset.previous_announcement_date: + return self.previous_event_date_loader + else: + raise ValueError("Don't know how to load column '%s'." 
% column) + + + @lazyval + def previous_buyback_value_loader(self): + return self._previous_event_value_loader( + self.dataset.previous_value, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + VALUE_FIELD_NAME + ) + + @lazyval + def previous_event_date_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement_date, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + ) + + +class ShareBuybackAuthorizationsLoader(EventsLoader): + """ + Reference loader for + :class:`zipline.pipeline.data.earnings.BuybackAuthorizations`. + + Does not currently support adjustments to the dates of known buyback + authorizations. + + events_by_sid: dict[sid -> pd.DataFrame(knowledge date, + event date, value)] + + """ + + def __init__(self, + all_dates, + events_by_sid, + infer_timestamps=False, + dataset=ShareBuybackAuthorizations): + super(ShareBuybackAuthorizationsLoader, self).__init__( + all_dates, + events_by_sid, + infer_timestamps=infer_timestamps, + dataset=dataset + ) + + + def get_loader(self, column): + """dispatch to the loader for ``column``. + """ + if column is self.dataset.previous_share_count: + return self.previous_buyback_share_count_loader + elif column is self.dataset.previous_announcement_date: + return self.previous_event_date_loader + else: + raise ValueError("Don't know how to load column '%s'." 
% column) + + + @lazyval + def previous_buyback_share_count_loader(self): + return self._previous_event_value_loader( + self.dataset.previous_share_count, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + SHARE_COUNT_FIELD_NAME + ) + + @lazyval + def previous_event_date_loader(self): + return self._previous_event_date_loader( + self.dataset.previous_announcement_date, + BUYBACK_ANNOUNCEMENT_FIELD_NAME, + ) diff --git a/zipline/pipeline/loaders/earnings.py b/zipline/pipeline/loaders/earnings.py index 9912faff..f93645d0 100644 --- a/zipline/pipeline/loaders/earnings.py +++ b/zipline/pipeline/loaders/earnings.py @@ -1,71 +1,21 @@ """ Reference implementation for EarningsCalendar loaders. """ -from itertools import repeat -import pandas as pd -from six import iteritems -from toolz import merge - -from .base import PipelineLoader -from .frame import DataFrameLoader -from .utils import next_date_frame, previous_date_frame +from events import EventsLoader from ..data.earnings import EarningsCalendar from zipline.utils.memoize import lazyval +ANNOUNCEMENT_FIELD_NAME = "announcement_date" -class EarningsCalendarLoader(PipelineLoader): - """ - Reference loader for - :class:`zipline.pipeline.data.earnings.EarningsCalendar`. - Does not currently support adjustments to the dates of known earnings. - - Parameters - ---------- - all_dates : pd.DatetimeIndex - Index of dates for which we can serve queries. - announcement_dates : dict[int -> pd.Series or pd.DatetimeIndex] - Dict mapping sids to objects representing dates on which earnings - occurred. - - If a dict value is a Series, it's interpreted as a mapping from the - date on which we learned an announcement was coming to the date on - which the announcement was made. - - If a dict value is a DatetimeIndex, it's interpreted as just containing - the dates that announcements were made, and we assume we knew about the - announcement on all prior dates. 
This mode is only supported if - ``infer_timestamp`` is explicitly passed as a truthy value. - - infer_timestamps : bool, optional - Whether to allow passing ``DatetimeIndex`` values in - ``announcement_dates``. - """ - def __init__(self, - all_dates, - announcement_dates, - infer_timestamps=False, +class EarningsCalendarLoader(EventsLoader): + def __init__(self, all_dates, events_by_sid, infer_timestamps=False, dataset=EarningsCalendar): - self.all_dates = all_dates - self.announcement_dates = announcement_dates = ( - announcement_dates.copy() - ) - dates = self.all_dates.values - for k, v in iteritems(announcement_dates): - if isinstance(v, pd.DatetimeIndex): - if not infer_timestamps: - raise ValueError( - "Got DatetimeIndex of announcement dates for sid %d.\n" - "Pass `infer_timestamps=True` to use the first date in" - " `all_dates` as implicit timestamp." - ) - # If we are passed a DatetimeIndex, we always have - # knowledge of the announcements. - announcement_dates[k] = pd.Series( - v, index=repeat(dates[0], len(v)), - ) - self.dataset = dataset + super(EarningsCalendarLoader, self).__init__(all_dates, + events_by_sid, + infer_timestamps, + dataset=dataset) def get_loader(self, column): """Dispatch to the loader for ``column``. 
@@ -79,30 +29,12 @@ class EarningsCalendarLoader(PipelineLoader): @lazyval def next_announcement_loader(self): - return DataFrameLoader( - self.dataset.next_announcement, - next_date_frame( - self.all_dates, - self.announcement_dates, - ), - adjustments=None, - ) + return self._next_event_date_loader(self.dataset.next_announcement, + ANNOUNCEMENT_FIELD_NAME) @lazyval def previous_announcement_loader(self): - return DataFrameLoader( + return self._previous_event_date_loader( self.dataset.previous_announcement, - previous_date_frame( - self.all_dates, - self.announcement_dates, - ), - adjustments=None, - ) - - def load_adjusted_array(self, columns, dates, assets, mask): - return merge( - self.get_loader(column).load_adjusted_array( - [column], dates, assets, mask - ) - for column in columns + ANNOUNCEMENT_FIELD_NAME ) diff --git a/zipline/pipeline/loaders/events.py b/zipline/pipeline/loaders/events.py index 1e6fe7e0..8c6e2a5e 100644 --- a/zipline/pipeline/loaders/events.py +++ b/zipline/pipeline/loaders/events.py @@ -1,39 +1,71 @@ -from abc import abstractmethod -from itertools import repeat +from abc import ABCMeta, abstractmethod +import numpy as np import pandas as pd from six import iteritems from toolz import merge from .base import PipelineLoader from .frame import DataFrameLoader -from zipline.utils.memoize import lazyval +from .utils import next_date_frame, previous_date_frame, previous_value + +TS_FIELD_NAME = "timestamp" class EventsLoader(PipelineLoader): + """ + Abstract loader. + + Does not currently support adjustments to the dates of known events. + + Parameters + ---------- + all_dates : pd.DatetimeIndex + Index of dates for which we can serve queries. + events_by_sid : dict[int -> pd.Series] + Dict mapping sids to objects representing dates on which events + occurred. + + If a dict value is a Series, it's interpreted as a mapping from the + date on which we learned an announcement was coming to the date on + which the announcement was made. 
+ + If a dict value is a DatetimeIndex, it's interpreted as just containing + the dates that announcements were made, and we assume we knew about the + announcement on all prior dates. This mode is only supported if + ``infer_timestamps`` is explicitly passed as a truthy value. + + infer_timestamps : bool, optional + Whether to allow passing ``DatetimeIndex`` values in + ``events_by_sid``. + """ + def __init__(self, all_dates, - announcement_dates, + events_by_sid, infer_timestamps=False, dataset=None): self.all_dates = all_dates - self.announcement_dates = ( - announcement_dates.copy() + # TODO: why are we making a copy here? We end up with a copy that we + # modify and then don't use, and an unmodified original which we do use. + self.events_by_sid = ( + events_by_sid.copy() ) dates = self.all_dates.values - for k, v in iteritems(announcement_dates): - if isinstance(v, pd.DatetimeIndex): + + for k, v in iteritems(events_by_sid): + if "timestamp" not in v.columns: if not infer_timestamps: raise ValueError( "Got DatetimeIndex of announcement dates for sid %d.\n" "Pass `infer_timestamps=True` to use the first date in" " `all_dates` as implicit timestamp." ) - # If we are passed a DatetimeIndex, we always have - # knowledge of the announcements. 
- announcement_dates[k] = pd.Series( - v, index=repeat(dates[0], len(v)), - ) + self.events_by_sid[k] = v = v.copy() + v.index = [dates[0]] * len(v) + else: + self.events_by_sid[k] = v.set_index("timestamp") + self.dataset = dataset @@ -50,13 +82,48 @@ class EventsLoader(PipelineLoader): for column in columns ) - @lazyval - def date_frame_loader(self, col_name, next_or_prev): + def mk_date_series(self, date_field_name): + return {sid: pd.Series(index=event.index, + data=np.array(event[date_field_name])) + for sid, event in iteritems(self.events_by_sid)} + + def _next_event_date_loader(self, next_date_field, event_date_field_name): return DataFrameLoader( - col_name, - next_or_prev( + next_date_field, + next_date_frame( self.all_dates, - self.announcement_dates, + self.mk_date_series(event_date_field_name), ), adjustments=None, ) + + def _previous_event_date_loader(self, prev_date_field, event_date_field_name): + return DataFrameLoader( + prev_date_field, + previous_date_frame( + self.all_dates, + self.mk_date_series(event_date_field_name), + ), + adjustments=None, + ) + + def _previous_event_value_loader(self, + previous_value_field, + event_date_field_name, + value_field_name): + return DataFrameLoader( + previous_value_field, + previous_value( + self.all_dates, + self.events_by_sid, + event_date_field_name, + value_field_name, + previous_value_field.dtype, + # TODO: need to get actual name/method to use to get missing + # value + None + ), + adjustments=None, + ) + + diff --git a/zipline/pipeline/loaders/utils.py b/zipline/pipeline/loaders/utils.py index 7f9448e7..ff4ee899 100644 --- a/zipline/pipeline/loaders/utils.py +++ b/zipline/pipeline/loaders/utils.py @@ -1,6 +1,7 @@ import datetime import numpy as np +from numpy import NaN import pandas as pd from six import iteritems from six.moves import zip @@ -82,14 +83,14 @@ def previous_date_frame(date_index, events_by_sid): next_date_frame """ sids = list(events_by_sid) - out = np.full((len(date_index), len(sids)), 
NaTns, dtype='datetime64[ns]') - dn = date_index[-1].asm8 + out = np.full((len(date_index), len(sids)), np_NaT, dtype='datetime64[ns]') + d_n = date_index[-1].asm8 for col_idx, sid in enumerate(sids): # events_by_sid[sid] is Series mapping knowledge_date to actual # event_date. We don't care about the knowledge date for # computing previous earnings. values = events_by_sid[sid].values - values = values[values <= dn] + values = values[values <= d_n] out[date_index.searchsorted(values), col_idx] = values frame = pd.DataFrame(out, index=date_index, columns=sids) @@ -97,6 +98,59 @@ def previous_date_frame(date_index, events_by_sid): return frame +def previous_value(date_index, events_by_sid, event_date_field, value_field, + value_field_dtype, missing_value): + """ + Make a DataFrame of the previous event's value for each date in date_index. + + Parameters + ---------- + date_index : DatetimeIndex. + The index of the returned DataFrame. + events_by_sid : dict[int -> DatetimeIndex] + Dict mapping sids to a series of dates. Each k:v pair of the series + represents the date we learned of the event mapping to the date the + event will occur. + + Returns + ------- + previous_events: pd.DataFrame + A DataFrame where each column is a security from `events_by_sid` where + the values are those of the previous event that occurred on the date + of the index. Entries falling before the first date will have `NaN` as + the result in the output. + + See Also + -------- + next_date_frame + """ + sids = list(events_by_sid) + # TODO: generalize; need to use dtype of column and missing value for that + # column; so pass + # in the pipeline column's attributes for these (replace NaN and dtype + # below) + out = np.full( + (len(date_index), len(sids)), + # TODO; replace with missing_value + NaN, + dtype=value_field_dtype + ) + d_n = date_index[-1].asm8 + for col_idx, sid in enumerate(sids): + # events_by_sid[sid] is DataFrame mapping knowledge_date to event + # date and value. 
We don't care about the knowledge date for computing + # previous values. + df = events_by_sid[sid] + df = df[df[event_date_field] <= d_n] + out[ + date_index.searchsorted(df[event_date_field].values), col_idx + ] = df[value_field] + + frame = pd.DataFrame(out, index=date_index, columns=sids) + frame.ffill(inplace=True) + return frame + + def normalize_data_query_time(dt, time, tz): """Apply the correct time and timezone to a date.