ENH: add buyback_auth loader.

WIP: finish refactoring blaze events loader.

WIP: tests passing for earnings.

BUG: pass all kwargs explicitly for BlazeEventsCalendarLoader.

If this is not done, resources are not bound correctly.

MAINT: refactor for buyback_auth.
This commit is contained in:
Maya Tydykov
2016-01-15 16:58:04 -05:00
parent 534f820a91
commit 3142fa516f
14 changed files with 1090 additions and 258 deletions
+404
View File
@@ -0,0 +1,404 @@
"""
Tests for the reference loader for EarningsCalendar.
"""
from unittest import TestCase
import blaze as bz
from blaze.compute.core import swap_resources_into_scope
from contextlib2 import ExitStack
from nose_parameterized import parameterized
import pandas as pd
import numpy as np
from pandas.util.testing import assert_series_equal
from six import iteritems
from zipline.pipeline import Pipeline
from zipline.pipeline.data import (CashBuybackAuthorizations,
ShareBuybackAuthorizations)
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.factors.events import (
BusinessDaysSincePreviousCashBuybackAuth,
BusinessDaysSincePreviousShareBuybackAuth
)
from zipline.pipeline.loaders.buyback_auth import \
CashBuybackAuthorizationsLoader, ShareBuybackAuthorizationsLoader
from zipline.pipeline.loaders.blaze import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
CashBuybackAuthorizationsLoader,
SHARE_COUNT_FIELD_NAME,
SID_FIELD_NAME,
ShareBuybackAuthorizationsLoader,
TS_FIELD_NAME,
VALUE_FIELD_NAME
)
from zipline.utils.numpy_utils import make_datetime64D, np_NaT
from zipline.utils.test_utils import (
make_simple_equity_info,
tmp_asset_finder,
gen_calendars,
num_days_in_range,
)
sids = A, B, C, D, E = range(5)
equity_info = make_simple_equity_info(
sids,
start_date=pd.Timestamp('2013-01-01', tz='UTC'),
end_date=pd.Timestamp('2015-01-01', tz='UTC'),
)
buyback_authorizations = {
# K1--K2--A1--A2--SC1--SC2--V1--V2.
A: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']),
BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-15',
'2014-01-20']),
SHARE_COUNT_FIELD_NAME: [1, 15],
VALUE_FIELD_NAME: [10, 20]
}),
# K1--K2--E2--E1.
B: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']),
BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([
'2014-01-20', '2014-01-15']),
SHARE_COUNT_FIELD_NAME: [7, 13], VALUE_FIELD_NAME: [10, 22]
}),
# K1--E1--K2--E2.
C: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-15']),
BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([
'2014-01-10', '2014-01-20']),
SHARE_COUNT_FIELD_NAME: [3, 1],
VALUE_FIELD_NAME: [4, 7]
}),
# K1 == K2.
D: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05'] * 2),
BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([
'2014-01-10', '2014-01-15']),
SHARE_COUNT_FIELD_NAME: [6, 23],
VALUE_FIELD_NAME: [1, 2]
}),
E: pd.DataFrame(
columns=["timestamp",
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME,
VALUE_FIELD_NAME],
dtype='datetime64[ns]'
),
}
param_dates = gen_calendars(
'2014-01-01',
'2014-01-31',
critical_dates=pd.to_datetime([
'2014-01-05',
'2014-01-10',
'2014-01-15',
'2014-01-20',
]),
)
def zip_with_floats(flts, dates):
return pd.Series(flts, index=dates).astype('float')
def num_days_between(dates, start_date, end_date):
return num_days_in_range(dates, start_date, end_date)
def zip_with_dates(dts, dates):
return pd.Series(pd.to_datetime(dts), index=dates)
class BuybackAuthLoaderTestCase(TestCase):
"""
Tests for loading the earnings announcement data.
"""
@classmethod
def setUpClass(cls):
cls._cleanup_stack = stack = ExitStack()
cls.finder = stack.enter_context(
tmp_asset_finder(equities=equity_info),
)
cls.cols = {}
cls.buyback_authorizations = None
@classmethod
def tearDownClass(cls):
cls._cleanup_stack.close()
def loader_args(self, dates):
"""Construct the base buyback authorizations object to pass to the
loader.
Parameters
----------
dates : pd.DatetimeIndex
The dates we can serve.
Returns
-------
args : tuple[any]
The arguments to forward to the loader positionally.
"""
return dates, self.buyback_authorizations
def setup(self, dates):
"""
Make a PipelineEngine and expectation functions for the given dates
calendar.
This exists to make it easy to test our various cases with critical
dates missing from the calendar.
"""
_expected_previous_buyback_announcement = pd.DataFrame({
A: zip_with_dates(
['NaT'] * num_days_between(dates, None, '2014-01-14') +
['2014-01-15'] * num_days_between(dates, '2014-01-15', '2014-01-19') +
['2014-01-20'] * num_days_between(dates, '2014-01-20', None),
dates
),
B: zip_with_dates(
['NaT'] * num_days_between(dates, None, '2014-01-14') +
['2014-01-15'] * num_days_between(dates, '2014-01-15', '2014-01-19') +
['2014-01-20'] * num_days_between(dates, '2014-01-20', None),
dates
),
C: zip_with_dates(
['NaT'] * num_days_between(dates, None, '2014-01-09') +
['2014-01-10'] * num_days_between(dates, '2014-01-10', '2014-01-19') +
['2014-01-20'] * num_days_between(dates, '2014-01-20', None),
dates
),
D: zip_with_dates(
['NaT'] * num_days_between(dates, None, '2014-01-09') +
['2014-01-10'] * num_days_between(dates, '2014-01-10', '2014-01-14') +
['2014-01-15'] * num_days_between(dates, '2014-01-15', None),
dates
),
E: zip_with_dates(['NaT'] * len(dates), dates),
}, index=dates)
_expected_previous_busday_offsets = self._compute_busday_offsets(
_expected_previous_buyback_announcement
)
self.cols['previous_buyback_announcement'] = _expected_previous_buyback_announcement
self.cols['days_since_prev'] = _expected_previous_busday_offsets
loader = self.loader_type(*self.loader_args(dates))
engine = SimplePipelineEngine(lambda _: loader, dates, self.finder)
return engine
@staticmethod
def _compute_busday_offsets(announcement_dates):
"""
Compute expected business day offsets from a DataFrame of announcement
dates.
"""
# Column-vector of dates on which factor `compute` will be called.
raw_call_dates = announcement_dates.index.values.astype(
'datetime64[D]'
)[:, None]
# 2D array of dates containining expected nexg announcement.
raw_announce_dates = (
announcement_dates.values.astype('datetime64[D]')
)
# Set NaTs to 0 temporarily because busday_count doesn't support NaT.
# We fill these entries with NaNs later.
whereNaT = raw_announce_dates == np_NaT
raw_announce_dates[whereNaT] = make_datetime64D(0)
# The abs call here makes it so that we can use this function to
# compute offsets for both next and previous earnings (previous
# earnings offsets come back negative).
expected = abs(np.busday_count(
raw_call_dates,
raw_announce_dates
).astype(float))
expected[whereNaT] = np.nan
return pd.DataFrame(
data=expected,
columns=announcement_dates.columns,
index=announcement_dates.index,
)
def _test_compute_buyback_auth(self, dates):
engine = self.setup(dates)
pipe = Pipeline(
columns=self.pipeline_columns
)
result = engine.run_pipeline(
pipe,
start_date=dates[0],
end_date=dates[-1],
)
for sid in sids:
for col_name in self.cols.keys():
assert_series_equal(result[col_name].xs(sid, level=1),
self.cols[col_name][sid],
sid)
class ShareBuybackAuthLoaderTestCase(BuybackAuthLoaderTestCase):
buyback_authorizations = {sid: df.drop(VALUE_FIELD_NAME, 1)
for sid, df in iteritems(buyback_authorizations)}
pipeline_columns = {
'previous_buyback_share_count':
ShareBuybackAuthorizations.previous_share_count.latest,
'previous_buyback_announcement':
ShareBuybackAuthorizations.previous_announcement_date.latest,
'days_since_prev':
BusinessDaysSincePreviousShareBuybackAuth(),
}
@classmethod
def setUpClass(cls):
super(ShareBuybackAuthLoaderTestCase, cls).setUpClass()
cls.buyback_authorizations = buyback_authorizations
cls.loader_type = ShareBuybackAuthorizationsLoader
def setup(self, dates):
engine = super(ShareBuybackAuthLoaderTestCase, self).setup(dates)
_expected_previous_buyback_share_count = pd.DataFrame({
A: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-14') +
[1] * num_days_between(dates, '2014-01-15', '2014-01-19') +
[15] * num_days_between(dates, '2014-01-20', None), dates),
B: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-14') +
[13] * num_days_between(dates, '2014-01-15', '2014-01-19') +
[7] * num_days_between(dates, '2014-01-20', None), dates),
C: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-09') +
[3] * num_days_between(dates, '2014-01-10', '2014-01-19') +
[1] * num_days_between(dates, '2014-01-20', None), dates),
D: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-09') +
[6] * num_days_between(dates, '2014-01-10', '2014-01-14') +
[23] * num_days_between(dates, '2014-01-15', None), dates),
E: zip_with_floats(['NaN'] * len(dates), dates),
}, index=dates)
self.cols['previous_buyback_share_count'] = _expected_previous_buyback_share_count
return engine
@parameterized.expand(param_dates)
def test_compute_buyback_auth(self, dates):
self._test_compute_buyback_auth(dates)
class CashBuybackAuthLoaderTestCase(BuybackAuthLoaderTestCase):
buyback_authorizations = {sid: df.drop(SHARE_COUNT_FIELD_NAME, 1)
for sid, df in iteritems(buyback_authorizations)}
pipeline_columns = {
'previous_buyback_value':
CashBuybackAuthorizations.previous_value.latest,
'previous_buyback_announcement':
CashBuybackAuthorizations.previous_announcement_date.latest,
'days_since_prev':
BusinessDaysSincePreviousCashBuybackAuth(),
}
@classmethod
def setUpClass(cls):
super(CashBuybackAuthLoaderTestCase, cls).setUpClass()
cls.buyback_authorizations = buyback_authorizations
cls.loader_type = CashBuybackAuthLoaderTestCase
def setup(self, dates):
engine = super(ShareBuybackAuthLoaderTestCase, self).setup(dates)
_expected_previous_value = pd.DataFrame({
# TODO if the next knowledge date is 10, why is the range
# until 15?
A: zip_with_floats(
['NaN'] * num_days_between(dates, None, '2014-01-14') +
[10] * num_days_between(dates, '2014-01-15', '2014-01-19') +
[20] * num_days_between(dates, '2014-01-20', None), dates),
B: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-14') +
[22] * num_days_between(dates, '2014-01-15', '2014-01-19') +
[10] * num_days_between(dates, '2014-01-20', None), dates),
C: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-09') +
[4] * num_days_between(dates, '2014-01-10', '2014-01-19') +
[7] * num_days_between(dates, '2014-01-20', None), dates),
D: zip_with_floats(['NaN'] * num_days_between(dates, None, '2014-01-09') +
[1] * num_days_between(dates, '2014-01-10', '2014-01-14') +
[2] * num_days_between(dates, '2014-01-15', None), dates),
E: zip_with_floats(['NaN'] * len(dates), dates),
}, index=dates)
self.cols['previous_buyback_value'] = _expected_previous_value
return engine
@parameterized.expand(param_dates)
def test_compute_buyback_auth(self, dates):
self._test_compute_buyback_auth(dates)
# class BlazeBuybackAuthLoaderTestCase(BuybackAuthLoaderTestCase):
# loader_type = BlazeBuybackAuthorizationsLoader
#
# def loader_args(self, dates):
# _, mapping = super(
# BlazeBuybackAuthLoaderTestCase,
# self,
# ).loader_args(dates)
# return (bz.Data(pd.concat(
# pd.DataFrame({
# BUYBACK_ANNOUNCEMENT_FIELD_NAME:
# frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME],
# SHARE_COUNT_FIELD_NAME: frame[SHARE_COUNT_FIELD_NAME],
# VALUE_FIELD_NAME: frame[VALUE_FIELD_NAME],
# TS_FIELD_NAME: frame.index,
# SID_FIELD_NAME: sid,
# })
# for sid, frame in iteritems(mapping)
# ).reset_index(drop=True)),)
#
#
# class BlazeEarningsCalendarLoaderNotInteractiveTestCase(
# BlazeBuybackAuthLoaderTestCase):
# """Test case for passing a non-interactive symbol and a dict of resources.
# """
# def loader_args(self, dates):
# (bound_expr,) = super(
# BlazeEarningsCalendarLoaderNotInteractiveTestCase,
# self,
# ).loader_args(dates)
# return swap_resources_into_scope(bound_expr, {})
#
#
# class BuybackAuthLoaderInferTimestampTestCase(TestCase):
# def test_infer_timestamp(self):
# dtx = pd.date_range('2014-01-01', '2014-01-10')
# events_by_sid = {
# 0: pd.DataFrame({BUYBACK_ANNOUNCEMENT_FIELD_NAME: dtx}),
# 1: pd.DataFrame(
# {BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.Series(dtx, dtx)},
# index=dtx
# )
# }
# loader = BuybackAuthorizationsLoader(
# dtx,
# events_by_sid,
# infer_timestamps=True,
# )
# self.assertEqual(
# loader.events_by_sid.keys(),
# events_by_sid.keys(),
# )
# assert_series_equal(
# loader.events_by_sid[0][BUYBACK_ANNOUNCEMENT_FIELD_NAME],
# pd.Series(index=[dtx[0]] * 10, data=dtx),
# )
# assert_series_equal(
# loader.events_by_sid[1][BUYBACK_ANNOUNCEMENT_FIELD_NAME],
# events_by_sid[1][BUYBACK_ANNOUNCEMENT_FIELD_NAME],
# )
+40 -32
View File
@@ -57,30 +57,33 @@ class EarningsCalendarLoaderTestCase(TestCase):
cls.earnings_dates = {
# K1--K2--E1--E2.
A: to_series(
knowledge_dates=['2014-01-05', '2014-01-10'],
earning_dates=['2014-01-15', '2014-01-20'],
),
A: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-15',
'2014-01-20'])
}),
# K1--K2--E2--E1.
B: to_series(
knowledge_dates=['2014-01-05', '2014-01-10'],
earning_dates=['2014-01-20', '2014-01-15']
),
B: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-10']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-20',
'2014-01-15'])
}),
# K1--E1--K2--E2.
C: to_series(
knowledge_dates=['2014-01-05', '2014-01-15'],
earning_dates=['2014-01-10', '2014-01-20']
),
C: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05', '2014-01-15']),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-10',
'2014-01-20'])
}),
# K1 == K2.
D: to_series(
knowledge_dates=['2014-01-05'] * 2,
earning_dates=['2014-01-10', '2014-01-15'],
),
E: pd.Series(
data=[],
index=pd.DatetimeIndex([]),
dtype='datetime64[ns]',
),
D: pd.DataFrame({
"timestamp": pd.to_datetime(['2014-01-05'] * 2),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(['2014-01-10',
'2014-01-15'])
}),
E: pd.DataFrame({
"timestamp": pd.to_datetime([]),
ANNOUNCEMENT_FIELD_NAME: pd.to_datetime([])
})
}
@classmethod
@@ -118,7 +121,8 @@ class EarningsCalendarLoaderTestCase(TestCase):
def zip_with_dates(dts):
return pd.Series(pd.to_datetime(dts), index=dates)
# TODO: tests will break because I now need mappings of sid ->
# dataframe instead of sid -> series
_expected_next_announce = pd.DataFrame({
A: zip_with_dates(
['NaT'] * num_days_between(None, '2014-01-04') +
@@ -345,11 +349,11 @@ class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase):
).loader_args(dates)
return (bz.Data(pd.concat(
pd.DataFrame({
ANNOUNCEMENT_FIELD_NAME: earning_dates,
TS_FIELD_NAME: earning_dates.index,
ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME],
TS_FIELD_NAME: df[TS_FIELD_NAME],
SID_FIELD_NAME: sid,
})
for sid, earning_dates in iteritems(mapping)
for sid, df in iteritems(mapping)
).reset_index(drop=True)),)
@@ -369,8 +373,8 @@ class EarningsCalendarLoaderInferTimestampTestCase(TestCase):
def test_infer_timestamp(self):
dtx = pd.date_range('2014-01-01', '2014-01-10')
announcement_dates = {
0: dtx,
1: pd.Series(dtx, dtx),
0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}),
1: pd.DataFrame({TS_FIELD_NAME: dtx, ANNOUNCEMENT_FIELD_NAME: dtx}),
}
loader = EarningsCalendarLoader(
dtx,
@@ -378,14 +382,18 @@ class EarningsCalendarLoaderInferTimestampTestCase(TestCase):
infer_timestamps=True,
)
self.assertEqual(
loader.announcement_dates.keys(),
loader.events_by_sid.keys(),
announcement_dates.keys(),
)
assert_series_equal(
loader.announcement_dates[0],
pd.Series(index=[dtx[0]] * 10, data=dtx),
pd.Series(loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME]),
pd.Series(index=[dtx[0]] * 10, data=dtx,
name=ANNOUNCEMENT_FIELD_NAME),
)
assert_series_equal(
loader.announcement_dates[1],
announcement_dates[1],
pd.Series(loader.events_by_sid[1][ANNOUNCEMENT_FIELD_NAME]),
pd.Series(index=announcement_dates[1][TS_FIELD_NAME],
data=np.array(announcement_dates[1][
ANNOUNCEMENT_FIELD_NAME]),
name=ANNOUNCEMENT_FIELD_NAME)
)
+3
View File
@@ -1,11 +1,14 @@
from .buyback_auth import CashBuybackAuthorizations, ShareBuybackAuthorizations
from .earnings import EarningsCalendar
from .equity_pricing import USEquityPricing
from .dataset import DataSet, Column, BoundColumn
__all__ = [
'BoundColumn',
'CashBuybackAuthorizations',
'Column',
'DataSet',
'EarningsCalendar',
'ShareBuybackAuthorizations',
'USEquityPricing',
]
+7 -5
View File
@@ -6,12 +6,14 @@ from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
from .dataset import Column, DataSet
class BuybackAuthorizations(DataSet):
class CashBuybackAuthorizations(DataSet):
"""
Dataset representing dates of recently announced buyback authorization.
"""
previous_buyback_value = Column(float64_dtype)
previous_buyback_share_count = Column(float64_dtype)
previous_buyback_value_announcement = Column(datetime64ns_dtype)
previous_buyback_share_count_announcement = Column(datetime64ns_dtype)
previous_value = Column(float64_dtype)
previous_announcement_date = Column(datetime64ns_dtype)
class ShareBuybackAuthorizations(DataSet):
previous_share_count = Column(float64_dtype)
previous_announcement_date = Column(datetime64ns_dtype)
+78 -34
View File
@@ -3,6 +3,10 @@ Factors describing information about event data (e.g. earnings
announcements, acquisitions, dividends, etc.).
"""
from numpy import newaxis
from zipline.pipeline.data.buyback_auth import (
CashBuybackAuthorizations,
ShareBuybackAuthorizations
)
from zipline.pipeline.data.earnings import EarningsCalendar
from zipline.utils.numpy_utils import (
NaTD,
@@ -14,10 +18,42 @@ from zipline.utils.numpy_utils import (
from .factor import Factor
class BusinessDaysUntilNextEarnings(Factor):
class BusinessDaysSincePreviousEvents(Factor):
"""
Factor returning the number of **business days** (not trading days!) until
the next known earnings date for each asset.
Abstract class for business days since a previous event.
Returns the number of **business days** (not trading days!) since
the most recent event date for each asset.
This doesn't use trading days for symmetry with
BusinessDaysUntilNextEarnings.
Assets which announced or will announce the event today will produce a value
of 0.0. Assets that announced the event on the previous business day will
produce a value of 1.0.
Assets for which the event date is `NaT` will produce a value of `NaN`.
"""
window_length = 0
dtype = float64_dtype
def _compute(self, arrays, dates, assets, mask):
# Coerce from [ns] to [D] for numpy busday_count.
announce_dates = arrays[0].astype(datetime64D_dtype)
# Set masked values to NaT.
announce_dates[~mask] = np_NaT
# Convert row labels into a column vector for broadcasted comparison.
reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis]
return busday_count_mask_NaT(announce_dates, reference_dates)
class BusinessDaysUntilNextEvents(Factor):
"""
Abstract class for business days since a next event.
Returns the number of **business days** (not trading days!) until
the next known event date for each asset.
This doesn't use trading days because the trading calendar includes
information that may not have been available to the algorithm at the time
@@ -26,19 +62,12 @@ class BusinessDaysUntilNextEarnings(Factor):
For example, the NYSE closings September 11th 2001, would not have been
known to the algorithm on September 10th.
Assets that announced or will announce earnings today will produce a value
of 0.0. Assets that will announce earnings on the next upcoming business
Assets that announced or will announce the event today will produce a value
of 0.0. Assets that will announce the event on the next upcoming business
day will produce a value of 1.0.
Assets for which `EarningsCalendar.next_announcement` is `NaT` will produce
a value of `NaN`.
See Also
--------
zipline.pipeline.factors.BusinessDaysSincePreviousEarnings
Assets for which the event date is `NaT` will produce a value of `NaN`.
"""
inputs = [EarningsCalendar.next_announcement]
window_length = 0
dtype = float64_dtype
@@ -55,37 +84,52 @@ class BusinessDaysUntilNextEarnings(Factor):
return busday_count_mask_NaT(reference_dates, announce_dates)
class BusinessDaysSincePreviousEarnings(Factor):
class BusinessDaysUntilNextEarnings(BusinessDaysUntilNextEvents):
"""
Factor returning the number of **business days** (not trading days!) until
the next known earnings date for each asset.
See Also
--------
zipline.pipeline.factors.BusinessDaysSincePreviousEarnings
"""
inputs = [EarningsCalendar.next_announcement]
class BusinessDaysSincePreviousEarnings(BusinessDaysSincePreviousEvents):
"""
Factor returning the number of **business days** (not trading days!) since
the most recent earnings date for each asset.
This doesn't use trading days for symmetry with
BusinessDaysUntilNextEarnings.
Assets which announced or will announce earnings today will produce a value
of 0.0. Assets that announced earnings on the previous business day will
produce a value of 1.0.
Assets for which `EarningsCalendar.previous_announcement` is `NaT` will
produce a value of `NaN`.
See Also
--------
zipline.pipeline.factors.BusinessDaysUntilNextEarnings
"""
inputs = [EarningsCalendar.previous_announcement]
window_length = 0
dtype = float64_dtype
def _compute(self, arrays, dates, assets, mask):
# Coerce from [ns] to [D] for numpy busday_count.
announce_dates = arrays[0].astype(datetime64D_dtype)
class BusinessDaysSincePreviousCashBuybackAuth(BusinessDaysSincePreviousEvents):
"""
Factor returning the number of **business days** (not trading days!) since
the most recent cash buyback authorization for each asset.
# Set masked values to NaT.
announce_dates[~mask] = NaTD
See Also
--------
zipline.pipeline.factors.BusinessDaysUntilNextEarnings
"""
inputs = [CashBuybackAuthorizations.previous_announcement_date]
# Convert row labels into a column vector for broadcasted comparison.
reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis]
return busday_count_mask_NaT(announce_dates, reference_dates)
class BusinessDaysSincePreviousShareBuybackAuth(
BusinessDaysSincePreviousEvents
):
"""
Factor returning the number of **business days** (not trading days!) since
the most recent share buyback authorization for each asset.
See Also
--------
zipline.pipeline.factors.BusinessDaysUntilNextEarnings
"""
inputs = [ShareBuybackAuthorizations.previous_announcement_date]
@@ -1,3 +1,7 @@
from .buyback_auth import (
CashBuybackAuthorizationsLoader,
ShareBuybackAuthorizationsLoader
)
from .core import (
AD_FIELD_NAME,
BlazeLoader,
@@ -7,6 +11,11 @@ from .core import (
from_blaze,
global_loader,
)
from .buyback_auth import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME,
VALUE_FIELD_NAME
)
from .earnings import (
ANNOUNCEMENT_FIELD_NAME,
BlazeEarningsCalendarLoader,
@@ -17,9 +26,14 @@ __all__ = (
'ANNOUNCEMENT_FIELD_NAME',
'BlazeEarningsCalendarLoader',
'BlazeLoader',
'BUYBACK_ANNOUNCEMENT_FIELD_NAME',
'CashBuybackAuthorizationsLoader',
'NoDeltasWarning',
'SHARE_COUNT_FIELD_NAME',
'SID_FIELD_NAME',
'ShareBuybackAuthorizationsLoader',
'TS_FIELD_NAME',
'VALUE_FIELD_NAME',
'from_blaze',
'global_loader',
)
@@ -0,0 +1,134 @@
from .core import (
TS_FIELD_NAME,
SID_FIELD_NAME,
)
from zipline.pipeline.data import (CashBuybackAuthorizations,
ShareBuybackAuthorizations)
from zipline.pipeline.loaders.buyback_auth import (
CashBuybackAuthorizationsLoader,
ShareBuybackAuthorizationsLoader
)
from .events import BlazeEventsCalendarLoader
BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_dates'
SHARE_COUNT_FIELD_NAME = 'share_counts'
VALUE_FIELD_NAME = 'values'
class BlazeCashBuybackAuthorizationsLoader(BlazeEventsCalendarLoader):
"""A pipeline loader for the ``BuybackAuth`` dataset that loads
data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
{BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime,
{VALUE_FIELD_NAME}: ?float64
}}
Where each row of the table is a record including the sid to identify the
company, the timestamp where we learned about the announcement, the
date when the buyback was announced, the share count, and the value.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME,
VALUE_FIELD_NAME=VALUE_FIELD_NAME
)
_expected_fields = frozenset({
TS_FIELD_NAME,
SID_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
VALUE_FIELD_NAME
})
def __init__(self,
expr,
dataset=CashBuybackAuthorizations,
loader=CashBuybackAuthorizationsLoader,
**kwargs):
super(
BlazeCashBuybackAuthorizationsLoader, self
).__init__(expr, dataset=dataset, loader=loader, **kwargs)
class BlazeShareBuybackAuthorizationsLoader(BlazeEventsCalendarLoader):
"""A pipeline loader for the ``BuybackAuth`` dataset that loads
data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
{BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime,
{SHARE_COUNT_FIELD_NAME}: ?float64,
}}
Where each row of the table is a record including the sid to identify the
company, the timestamp where we learned about the announcement, the
date when the buyback was announced, the share count, and the value.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME=SHARE_COUNT_FIELD_NAME,
)
_expected_fields = frozenset({
TS_FIELD_NAME,
SID_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME,
})
def __init__(self,
expr,
dataset=ShareBuybackAuthorizations,
loader=ShareBuybackAuthorizationsLoader,
**kwargs):
super(
BlazeShareBuybackAuthorizationsLoader, self
).__init__(expr, dataset=dataset, loader=loader, **kwargs)
+1
View File
@@ -182,6 +182,7 @@ from zipline.utils.preprocess import preprocess
AD_FIELD_NAME = 'asof_date'
TS_FIELD_NAME = 'timestamp'
SID_FIELD_NAME = 'sid'
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
valid_deltas_node_types = (
bz.expr.Field,
bz.expr.ReLabel,
+13 -85
View File
@@ -1,29 +1,14 @@
from datashape import istabular
import pandas as pd
from toolz import valmap
from .core import (
ANNOUNCEMENT_FIELD_NAME,
TS_FIELD_NAME,
SID_FIELD_NAME,
bind_expression_to_resources,
ffill_query_in_range,
SID_FIELD_NAME
)
from zipline.pipeline.data import EarningsCalendar
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
from zipline.pipeline.loaders.utils import (
check_data_query_args,
normalize_data_query_bounds,
normalize_timestamp_to_query_time,
)
from zipline.utils.input_validation import ensure_timezone, optionally
from zipline.utils.preprocess import preprocess
from .events import BlazeEventsCalendarLoader
ANNOUNCEMENT_FIELD_NAME = 'announcement_date'
class BlazeEarningsCalendarLoader(PipelineLoader):
class BlazeEarningsCalendarLoader(BlazeEventsCalendarLoader):
"""A pipeline loader for the ``EarningsCalendar`` dataset that loads
data from a blaze expression.
@@ -57,6 +42,7 @@ class BlazeEarningsCalendarLoader(PipelineLoader):
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
TS_FIELD_NAME=TS_FIELD_NAME,
SID_FIELD_NAME=SID_FIELD_NAME,
@@ -69,75 +55,17 @@ class BlazeEarningsCalendarLoader(PipelineLoader):
ANNOUNCEMENT_FIELD_NAME,
})
@preprocess(data_query_tz=optionally(ensure_timezone))
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=EarningsCalendar):
dshape = expr.dshape
if not istabular(dshape):
raise ValueError(
'expression dshape must be tabular, got: %s' % dshape,
)
expected_fields = self._expected_fields
self._expr = bind_expression_to_resources(
expr[list(expected_fields)],
resources,
)
self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {}
self._dataset = dataset
check_data_query_args(data_query_time, data_query_tz)
self._data_query_time = data_query_time
self._data_query_tz = data_query_tz
def load_adjusted_array(self, columns, dates, assets, mask):
data_query_time = self._data_query_time
data_query_tz = self._data_query_tz
lower_dt, upper_dt = normalize_data_query_bounds(
dates[0],
dates[-1],
data_query_time,
data_query_tz,
)
raw = ffill_query_in_range(
self._expr,
lower_dt,
upper_dt,
self._odo_kwargs,
)
sids = raw.loc[:, SID_FIELD_NAME]
raw.drop(
sids[~sids.isin(assets)].index,
inplace=True
)
if data_query_time is not None:
normalize_timestamp_to_query_time(
raw,
data_query_time,
data_query_tz,
inplace=True,
ts_field=TS_FIELD_NAME,
)
gb = raw.groupby(SID_FIELD_NAME)
def mkseries(idx, raw_loc=raw.loc):
vs = raw_loc[
idx, [TS_FIELD_NAME, ANNOUNCEMENT_FIELD_NAME]
].values
return pd.Series(
index=pd.DatetimeIndex(vs[:, 0]),
data=vs[:, 1],
)
return EarningsCalendarLoader(
dates,
valmap(mkseries, gb.groups),
dataset=self._dataset,
).load_adjusted_array(columns, dates, assets, mask)
dataset=EarningsCalendar,
loader=EarningsCalendarLoader,
**kwargs):
super(
BlazeEarningsCalendarLoader, self
).__init__(expr, dataset=dataset, loader=loader, resources=resources,
odo_kwargs=odo_kwargs, data_query_time=data_query_time,
data_query_tz=data_query_tz, **kwargs)
+120
View File
@@ -0,0 +1,120 @@
from datashape import istabular
from .core import (
TS_FIELD_NAME,
SID_FIELD_NAME,
bind_expression_to_resources,
ffill_query_in_range,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.utils import (
check_data_query_args,
normalize_data_query_bounds,
normalize_timestamp_to_query_time,
)
from zipline.utils.input_validation import ensure_timezone, optionally
from zipline.utils.preprocess import preprocess
class BlazeEventsCalendarLoader(PipelineLoader):
"""An abstract pipeline loader for the events datasets that loads
data from a blaze expression.
Parameters
----------
expr : Expr
The expression representing the data to load.
resources : dict, optional
Mapping from the atomic terms of ``expr`` to actual data resources.
odo_kwargs : dict, optional
Extra keyword arguments to pass to odo when executing the expression.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
Notes
-----
The expression should have a tabular dshape of::
Dim * {{
{SID_FIELD_NAME}: int64,
{TS_FIELD_NAME}: datetime,
}}
And other dataset-specific fields, where each row of the table is a
record including the sid to identify the company, the timestamp where we
learned about the announcement, and the date when the earnings will be
announced.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
@preprocess(data_query_tz=optionally(ensure_timezone))
def __init__(self,
expr,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=None,
loader=None):
dshape = expr.dshape
if not istabular(dshape):
raise ValueError(
'expression dshape must be tabular, got: %s' % dshape,
)
expected_fields = self._expected_fields
self._expr = bind_expression_to_resources(
expr[list(expected_fields)],
resources,
)
self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {}
self._dataset = dataset
check_data_query_args(data_query_time, data_query_tz)
self._data_query_time = data_query_time
self._data_query_tz = data_query_tz
self._loader = loader
def load_adjusted_array(self, columns, dates, assets, mask):
data_query_time = self._data_query_time
data_query_tz = self._data_query_tz
lower_dt, upper_dt = normalize_data_query_bounds(
dates[0],
dates[-1],
data_query_time,
data_query_tz,
)
raw = ffill_query_in_range(
self._expr,
lower_dt,
upper_dt,
self._odo_kwargs,
)
sids = raw.loc[:, SID_FIELD_NAME]
raw.drop(
sids[~sids.isin(assets)].index,
inplace=True
)
if data_query_time is not None:
normalize_timestamp_to_query_time(
raw,
data_query_time,
data_query_tz,
inplace=True,
ts_field=TS_FIELD_NAME,
)
gb = raw.groupby(SID_FIELD_NAME)
return self._loader(
dates,
self.prepare_data(raw, gb),
dataset=self._dataset,
).load_adjusted_array(columns, dates, assets, mask)
def prepare_data(self, raw, gb):
return {sid: raw.loc[group] for sid, group in gb.groups.iteritems()}
+122 -1
View File
@@ -1 +1,122 @@
__author__ = 'mtydykov'
"""
Reference implementation for EarningsCalendar loaders.
"""
from ..data.buyback_auth import CashBuybackAuthorizations, \
ShareBuybackAuthorizations
from events import EventsLoader
from zipline.utils.memoize import lazyval
BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_dates'
SHARE_COUNT_FIELD_NAME = 'share_counts'
VALUE_FIELD_NAME = 'values'
# TODO: split into 2 datasets - or just think about how to generalize since
# we will often have cases where we have a knowledge date and, optionally,
# a value for that event; having no value (like earnings) is a special case.
class CashBuybackAuthorizationsLoader(EventsLoader):
"""
Reference loader for
:class:`zipline.pipeline.data.earnings.BuybackAuthorizations`.
Does not currently support adjustments to the dates of known buyback
authorizations.
events_by_sid: dict[sid -> pd.DataFrame(knowledge date,
event date, value)]
"""
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=CashBuybackAuthorizations):
super(CashBuybackAuthorizationsLoader, self).__init__(
all_dates,
events_by_sid,
infer_timestamps=infer_timestamps,
dataset=dataset
)
def get_loader(self, column):
"""dispatch to the loader for ``column``.
"""
if column is self.dataset.previous_value:
return self.previous_buyback_value_loader
elif column is self.dataset.previous_announcement_date:
return self.previous_event_date_loader
else:
raise ValueError("Don't know how to load column '%s'." % column)
@lazyval
def previous_buyback_value_loader(self):
return self._previous_event_value_loader(
self.dataset.previous_value,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
VALUE_FIELD_NAME
)
@lazyval
def previous_event_date_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_announcement_date,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
)
class ShareBuybackAuthorizationsLoader(EventsLoader):
"""
Reference loader for
:class:`zipline.pipeline.data.earnings.BuybackAuthorizations`.
Does not currently support adjustments to the dates of known buyback
authorizations.
events_by_sid: dict[sid -> pd.DataFrame(knowledge date,
event date, value)]
"""
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=ShareBuybackAuthorizations):
super(ShareBuybackAuthorizationsLoader, self).__init__(
all_dates,
events_by_sid,
infer_timestamps=infer_timestamps,
dataset=dataset
)
def get_loader(self, column):
"""dispatch to the loader for ``column``.
"""
if column is self.dataset.previous_share_count:
return self.previous_buyback_share_count_loader
elif column is self.dataset.previous_announcement_date:
return self.previous_event_date_loader
else:
raise ValueError("Don't know how to load column '%s'." % column)
@lazyval
def previous_buyback_share_count_loader(self):
return self._previous_event_value_loader(
self.dataset.previous_share_count,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
SHARE_COUNT_FIELD_NAME
)
@lazyval
def previous_event_date_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_announcement_date,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
)
+12 -80
View File
@@ -1,71 +1,21 @@
"""
Reference implementation for EarningsCalendar loaders.
"""
from itertools import repeat
import pandas as pd
from six import iteritems
from toolz import merge
from .base import PipelineLoader
from .frame import DataFrameLoader
from .utils import next_date_frame, previous_date_frame
from events import EventsLoader
from ..data.earnings import EarningsCalendar
from zipline.utils.memoize import lazyval
ANNOUNCEMENT_FIELD_NAME = "announcement_date"
class EarningsCalendarLoader(PipelineLoader):
"""
Reference loader for
:class:`zipline.pipeline.data.earnings.EarningsCalendar`.
Does not currently support adjustments to the dates of known earnings.
Parameters
----------
all_dates : pd.DatetimeIndex
Index of dates for which we can serve queries.
announcement_dates : dict[int -> pd.Series or pd.DatetimeIndex]
Dict mapping sids to objects representing dates on which earnings
occurred.
If a dict value is a Series, it's interpreted as a mapping from the
date on which we learned an announcement was coming to the date on
which the announcement was made.
If a dict value is a DatetimeIndex, it's interpreted as just containing
the dates that announcements were made, and we assume we knew about the
announcement on all prior dates. This mode is only supported if
``infer_timestamp`` is explicitly passed as a truthy value.
infer_timestamps : bool, optional
Whether to allow passing ``DatetimeIndex`` values in
``announcement_dates``.
"""
def __init__(self,
all_dates,
announcement_dates,
infer_timestamps=False,
class EarningsCalendarLoader(EventsLoader):
def __init__(self, all_dates, events_by_sid, infer_timestamps=False,
dataset=EarningsCalendar):
self.all_dates = all_dates
self.announcement_dates = announcement_dates = (
announcement_dates.copy()
)
dates = self.all_dates.values
for k, v in iteritems(announcement_dates):
if isinstance(v, pd.DatetimeIndex):
if not infer_timestamps:
raise ValueError(
"Got DatetimeIndex of announcement dates for sid %d.\n"
"Pass `infer_timestamps=True` to use the first date in"
" `all_dates` as implicit timestamp."
)
# If we are passed a DatetimeIndex, we always have
# knowledge of the announcements.
announcement_dates[k] = pd.Series(
v, index=repeat(dates[0], len(v)),
)
self.dataset = dataset
super(EarningsCalendarLoader, self).__init__(all_dates,
events_by_sid,
infer_timestamps,
dataset=dataset)
def get_loader(self, column):
"""Dispatch to the loader for ``column``.
@@ -79,30 +29,12 @@ class EarningsCalendarLoader(PipelineLoader):
@lazyval
def next_announcement_loader(self):
return DataFrameLoader(
self.dataset.next_announcement,
next_date_frame(
self.all_dates,
self.announcement_dates,
),
adjustments=None,
)
return self._next_event_date_loader(self.dataset.next_announcement,
ANNOUNCEMENT_FIELD_NAME)
@lazyval
def previous_announcement_loader(self):
return DataFrameLoader(
return self._previous_event_date_loader(
self.dataset.previous_announcement,
previous_date_frame(
self.all_dates,
self.announcement_dates,
),
adjustments=None,
)
def load_adjusted_array(self, columns, dates, assets, mask):
return merge(
self.get_loader(column).load_adjusted_array(
[column], dates, assets, mask
)
for column in columns
ANNOUNCEMENT_FIELD_NAME
)
+85 -18
View File
@@ -1,39 +1,71 @@
from abc import abstractmethod
from itertools import repeat
from abc import ABCMeta, abstractmethod
import numpy as np
import pandas as pd
from six import iteritems
from toolz import merge
from .base import PipelineLoader
from .frame import DataFrameLoader
from zipline.utils.memoize import lazyval
from .utils import next_date_frame, previous_date_frame, previous_value
TS_FIELD_NAME = "timestamp"
class EventsLoader(PipelineLoader):
"""
Abstract loader.
Does not currently support adjustments to the dates of known events.
Parameters
----------
all_dates : pd.DatetimeIndex
Index of dates for which we can serve queries.
events_by_sid : dict[int -> pd.Series]
Dict mapping sids to objects representing dates on which events
occurred.
If a dict value is a Series, it's interpreted as a mapping from the
date on which we learned an announcement was coming to the date on
which the announcement was made.
If a dict value is a DatetimeIndex, it's interpreted as just containing
the dates that announcements were made, and we assume we knew about the
announcement on all prior dates. This mode is only supported if
``infer_timestamp`` is explicitly passed as a truthy value.
infer_timestamps : bool, optional
Whether to allow passing ``DatetimeIndex`` values in
``announcement_dates``.
"""
def __init__(self,
all_dates,
announcement_dates,
events_by_sid,
infer_timestamps=False,
dataset=None):
self.all_dates = all_dates
self.announcement_dates = (
announcement_dates.copy()
# TODO: why are we making a copy here? We end up with a copy that we
# modify and then don't use, and an unmodified original which we do use.
self.events_by_sid = (
events_by_sid.copy()
)
dates = self.all_dates.values
for k, v in iteritems(announcement_dates):
if isinstance(v, pd.DatetimeIndex):
for k, v in iteritems(events_by_sid):
if "timestamp" not in v.columns:
if not infer_timestamps:
raise ValueError(
"Got DatetimeIndex of announcement dates for sid %d.\n"
"Pass `infer_timestamps=True` to use the first date in"
" `all_dates` as implicit timestamp."
)
# If we are passed a DatetimeIndex, we always have
# knowledge of the announcements.
announcement_dates[k] = pd.Series(
v, index=repeat(dates[0], len(v)),
)
self.events_by_sid[k] = v = v.copy()
v.index = [dates[0]] * len(v)
else:
self.events_by_sid[k] = v.set_index("timestamp")
self.dataset = dataset
@@ -50,13 +82,48 @@ class EventsLoader(PipelineLoader):
for column in columns
)
@lazyval
def date_frame_loader(self, col_name, next_or_prev):
def mk_date_series(self, date_field_name):
return {sid: pd.Series(index=event.index,
data=np.array(event[date_field_name]))
for sid, event in iteritems(self.events_by_sid)}
def _next_event_date_loader(self, next_date_field, event_date_field_name):
return DataFrameLoader(
col_name,
next_or_prev(
next_date_field,
next_date_frame(
self.all_dates,
self.announcement_dates,
self.mk_date_series(event_date_field_name),
),
adjustments=None,
)
def _previous_event_date_loader(self, prev_date_field, event_date_field_name):
return DataFrameLoader(
prev_date_field,
previous_date_frame(
self.all_dates,
self.mk_date_series(event_date_field_name),
),
adjustments=None,
)
def _previous_event_value_loader(self,
previous_value_field,
event_date_field_name,
value_field_name):
return DataFrameLoader(
previous_value_field,
previous_value(
self.all_dates,
self.events_by_sid,
event_date_field_name,
value_field_name,
previous_value_field.dtype,
# TODO: need to get actual name/method to use to get missing
# value
None
),
adjustments=None,
)
+57 -3
View File
@@ -1,6 +1,7 @@
import datetime
import numpy as np
from numpy import NaN
import pandas as pd
from six import iteritems
from six.moves import zip
@@ -82,14 +83,14 @@ def previous_date_frame(date_index, events_by_sid):
next_date_frame
"""
sids = list(events_by_sid)
out = np.full((len(date_index), len(sids)), NaTns, dtype='datetime64[ns]')
dn = date_index[-1].asm8
out = np.full((len(date_index), len(sids)), np_NaT, dtype='datetime64[ns]')
d_n = date_index[-1].asm8
for col_idx, sid in enumerate(sids):
# events_by_sid[sid] is Series mapping knowledge_date to actual
# event_date. We don't care about the knowledge date for
# computing previous earnings.
values = events_by_sid[sid].values
values = values[values <= dn]
values = values[values <= d_n]
out[date_index.searchsorted(values), col_idx] = values
frame = pd.DataFrame(out, index=date_index, columns=sids)
@@ -97,6 +98,59 @@ def previous_date_frame(date_index, events_by_sid):
return frame
def previous_value(date_index, events_by_sid, event_date_field, value_field,
value_field_dtype, missing_value):
"""
Make a DataFrame representing simulated next earnings date_index.
Parameters
----------
date_index : DatetimeIndex.
The index of the returned DataFrame.
events_by_sid : dict[int -> DatetimeIndex]
Dict mapping sids to a series of dates. Each k:v pair of the series
represents the date we learned of the event mapping to the date the
event will occur.
Returns
-------
previous_events: pd.DataFrame
A DataFrame where each column is a security from `events_by_sid` where
the values are the dates of the previous event that occured on the date
of the index. Entries falling before the first date will have `NaT` as
the result in the output.
See Also
--------
next_date_frame
"""
sids = list(events_by_sid)
# TODO: generalize; need to use dtype of column and missing value for that
# column; so pass
# in the pipeline column's attributes for these (replace NaN and dtype
# below)
out = np.full(
(len(date_index), len(sids)),
# TODO; replace with missing_value
NaN,
dtype=value_field_dtype
)
d_n = date_index[-1].asm8
for col_idx, sid in enumerate(sids):
# events_by_sid[sid] is DataFrame mapping knowledge_date to event
# date and value. We don't care about the knowledge date for computing
# previous values.
df = events_by_sid[sid]
df = df[df[event_date_field] <= d_n]
out[
date_index.searchsorted(df[event_date_field].values), col_idx
] = df[value_field]
frame = pd.DataFrame(out, index=date_index, columns=sids)
frame.ffill(inplace=True)
return frame
def normalize_data_query_time(dt, time, tz):
"""Apply the correct time and timezone to a date.