MAINT: Rework event datasets.

- Refactored EventsLoader and BlazeEventsLoader to not require a
  subclass per dataset.  Instead, you now pass a map from columns to
  event fields directly to the EventsLoader constructor.

- Removed a large number of Quantopian-specific datasets and associated
  tests.

- Rewrote the core logic of EventsLoader and BlazeEventsLoader to share
  index calculations across multiple requested columns.

- Fixed a bug where event fields were incorrectly forward-filled when
  null values were present in an event.
This commit is contained in:
Scott Sanderson
2016-06-09 01:54:13 -04:00
parent 5a6b870cd5
commit bc302beec9
33 changed files with 780 additions and 3556 deletions
-115
View File
@@ -1,115 +0,0 @@
"""
Tests for the reference loader for 13d filings.
"""
import pandas as pd
from zipline.pipeline.common import TS_FIELD_NAME
from zipline.pipeline.data import _13DFilings
from zipline.pipeline.factors.events import BusinessDaysSince13DFilingsDate
from zipline.pipeline.loaders._13d_filings import (
_13DFilingsLoader,
DISCLOSURE_DATE,
NUM_SHARES,
PERCENT_SHARES,
)
from zipline.pipeline.loaders.utils import (
zip_with_floats,
zip_with_dates
)
from zipline.testing.fixtures import WithPipelineEventDataLoader
from zipline.testing.fixtures import ZiplineTestCase
# Labels of the pipeline output columns requested by the 13D filings test case.
DAYS_SINCE_PREV_DISCLOSURE = "days_since_prev_disclosure"
PREVIOUS_DISCLOSURE_DATE = "previous_disclosure_date"
PREVIOUS_NUM_SHARES = "previous_number_shares"
PREVIOUS_PERCENT_SHARES = "previous_percentage"

# [start, end] date-string pairs; handed to get_sids_to_frames in setup()
# together with one expected value per pair.
date_intervals = [
    [
        ["2014-01-01", "2014-01-04"],
        ["2014-01-05", "2014-01-09"],
        ["2014-01-10", "2014-01-31"],
    ],
]
# Frame containing no events. Each column is cast on its own so the value
# columns are float and the date columns are datetime64[ns].
empty_df = pd.DataFrame(
    columns=[NUM_SHARES, PERCENT_SHARES, DISCLOSURE_DATE, TS_FIELD_NAME],
)
empty_df[NUM_SHARES] = empty_df[NUM_SHARES].astype("float")
empty_df[PERCENT_SHARES] = empty_df[PERCENT_SHARES].astype("float")
empty_df[TS_FIELD_NAME] = empty_df[TS_FIELD_NAME].astype("datetime64[ns]")
empty_df[DISCLOSURE_DATE] = empty_df[DISCLOSURE_DATE].astype("datetime64[ns]")

# Input frames for the loader; get_dataset() uses the list position as the sid.
_13d_filings_cases = [
    # Two filings.
    pd.DataFrame({
        NUM_SHARES: [1, 15],
        PERCENT_SHARES: [10, 20],
        TS_FIELD_NAME: pd.to_datetime(["2014-01-05", "2014-01-10"]),
        DISCLOSURE_DATE: pd.to_datetime(["2014-01-04", "2014-01-09"]),
    }),
    # No filings at all.
    empty_df,
]
class _13DFilingsLoaderTestCase(WithPipelineEventDataLoader,
                                ZiplineTestCase):
    """
    Test for the _13DFilings dataset as served by _13DFilingsLoader.
    """
    loader_type = _13DFilingsLoader

    # Output label -> pipeline term evaluated for that label.
    pipeline_columns = {
        PREVIOUS_NUM_SHARES: _13DFilings.number_shares.latest,
        PREVIOUS_PERCENT_SHARES: _13DFilings.percent_shares.latest,
        PREVIOUS_DISCLOSURE_DATE: _13DFilings.disclosure_date.latest,
        DAYS_SINCE_PREV_DISCLOSURE: BusinessDaysSince13DFilingsDate(),
    }

    @classmethod
    def get_sids(cls):
        """Return the sids under test (0 and 1)."""
        return range(2)

    @classmethod
    def get_dataset(cls):
        """Return a dict mapping each sid to its input events frame."""
        return dict(enumerate(_13d_filings_cases))

    def setup(self, dates):
        """
        Build the expected output for every label in ``pipeline_columns``.

        Returns a dict keyed by output label. Date/value columns come from
        ``get_sids_to_frames``; the business-day column is derived from the
        expected disclosure dates via ``_compute_busday_offsets``.
        """
        def expected(zipper, values, dtype):
            # All columns share the same intervals and the 'NaN' missing
            # marker; only the zipper, the values and the dtype vary.
            return self.get_sids_to_frames(
                zipper, [values], date_intervals, dates, dtype, 'NaN',
            )

        cols = {
            PREVIOUS_DISCLOSURE_DATE: expected(
                zip_with_dates,
                ['NaT', '2014-01-04', '2014-01-09'],
                'datetime64[ns]',
            ),
            PREVIOUS_NUM_SHARES: expected(
                zip_with_floats, ['NaN', 1, 15], 'float',
            ),
            PREVIOUS_PERCENT_SHARES: expected(
                zip_with_floats, ['NaN', 10, 20], 'float',
            ),
        }
        cols[DAYS_SINCE_PREV_DISCLOSURE] = self._compute_busday_offsets(
            cols[PREVIOUS_DISCLOSURE_DATE]
        )
        return cols
-172
View File
@@ -1,172 +0,0 @@
"""
Tests for the reference loader for Buyback Authorizations.
"""
import blaze as bz
from blaze.compute.core import swap_resources_into_scope
import pandas as pd
from six import iteritems
from zipline.pipeline.common import(
DAYS_SINCE_PREV,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.data import BuybackAuthorizations
from zipline.pipeline.factors.events import BusinessDaysSinceBuybackAuth
from zipline.pipeline.loaders.buyback_auth import (
BUYBACK_AMOUNT_FIELD_NAME,
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
BUYBACK_TYPE_FIELD_NAME,
BUYBACK_UNIT_FIELD_NAME,
BuybackAuthorizationsLoader,
)
from zipline.pipeline.loaders.blaze import BlazeBuybackAuthorizationsLoader
from zipline.pipeline.loaders.utils import (
zip_with_dates,
zip_with_floats,
zip_with_strs
)
from zipline.testing.fixtures import (
WithPipelineEventDataLoader, ZiplineTestCase
)
# Labels for buyback-authorization pipeline output columns.
PREVIOUS_BUYBACK_AMOUNT = "previous_value"
PREVIOUS_BUYBACK_ANNOUNCEMENT = "previous_buyback_announcement"
PREVIOUS_BUYBACK_CASH = "previous_buyback_cash"
PREVIOUS_BUYBACK_SHARE_COUNT = "previous_buyback_share_count"
PREVIOUS_BUYBACK_TYPE = "previous_buyback_type"
PREVIOUS_BUYBACK_UNIT = "previous_buyback_unit"

# [start, end] date-string pairs; handed to get_sids_to_frames in setup()
# together with one expected value per pair.
date_intervals = [
    [
        ["2014-01-01", "2014-01-04"],
        ["2014-01-05", "2014-01-09"],
        ["2014-01-10", "2014-01-31"],
    ],
]
# Input frames for the loader; get_dataset() uses the list position as the sid.
buyback_authorizations_cases = [
    # Two authorizations.
    pd.DataFrame({
        BUYBACK_AMOUNT_FIELD_NAME: [1, 15],
        BUYBACK_UNIT_FIELD_NAME: ["$M", "Mshares"],
        BUYBACK_TYPE_FIELD_NAME: ["New", "Additional"],
        TS_FIELD_NAME: pd.to_datetime(["2014-01-05", "2014-01-10"]),
        BUYBACK_ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(
            ["2014-01-04", "2014-01-09"]
        ),
    }),
    # No authorizations: an empty frame whose columns are all created with
    # the datetime64[ns] dtype.
    pd.DataFrame(
        columns=[
            BUYBACK_AMOUNT_FIELD_NAME,
            BUYBACK_UNIT_FIELD_NAME,
            BUYBACK_TYPE_FIELD_NAME,
            BUYBACK_ANNOUNCEMENT_FIELD_NAME,
            TS_FIELD_NAME,
        ],
        dtype="datetime64[ns]",
    ),
]
class BuybackAuthLoaderTestCase(WithPipelineEventDataLoader, ZiplineTestCase):
    """
    Test for the buyback authorizations dataset.
    """
    loader_type = BuybackAuthorizationsLoader

    # Output label -> pipeline term evaluated for that label.
    pipeline_columns = {
        PREVIOUS_BUYBACK_AMOUNT: BuybackAuthorizations.previous_amount.latest,
        PREVIOUS_BUYBACK_ANNOUNCEMENT:
            BuybackAuthorizations.previous_date.latest,
        PREVIOUS_BUYBACK_UNIT: BuybackAuthorizations.previous_unit.latest,
        PREVIOUS_BUYBACK_TYPE: BuybackAuthorizations.previous_type.latest,
        DAYS_SINCE_PREV: BusinessDaysSinceBuybackAuth(),
    }

    @classmethod
    def get_sids(cls):
        """Return the sids under test (0 and 1)."""
        return range(2)

    @classmethod
    def get_dataset(cls):
        """Return a dict mapping each sid to its input events frame."""
        return dict(enumerate(buyback_authorizations_cases))

    def setup(self, dates):
        """
        Build the expected output for every label in ``pipeline_columns``.

        Returns a dict keyed by output label. The business-day column is
        derived from the expected announcement dates via
        ``_compute_busday_offsets``.
        """
        def expected(zipper, values, dtype, missing):
            # Every column uses the module-level date_intervals; the rest
            # depends on the kind of data in the column.
            return self.get_sids_to_frames(
                zipper, [values], date_intervals, dates, dtype, missing,
            )

        cols = {
            PREVIOUS_BUYBACK_AMOUNT: expected(
                zip_with_floats, ['NaN', 1, 15], 'float', 'NaN',
            ),
            PREVIOUS_BUYBACK_ANNOUNCEMENT: expected(
                zip_with_dates,
                ['NaT', '2014-01-04', '2014-01-09'],
                'datetime64[ns]',
                'NaN',
            ),
            PREVIOUS_BUYBACK_UNIT: expected(
                zip_with_strs, [None, "$M", "Mshares"], 'category', None,
            ),
            PREVIOUS_BUYBACK_TYPE: expected(
                zip_with_strs, [None, "New", "Additional"], 'category', None,
            ),
        }
        cols[DAYS_SINCE_PREV] = self._compute_busday_offsets(
            cols[PREVIOUS_BUYBACK_ANNOUNCEMENT]
        )
        return cols
class BlazeBuybackAuthLoaderTestCase(BuybackAuthLoaderTestCase):
    """
    Test case for loading via blaze.
    """
    loader_type = BlazeBuybackAuthorizationsLoader

    def pipeline_event_loader_args(self, dates):
        """
        Return a 1-tuple holding a single blaze expression for all sids.

        The second item of the parent's arguments is iterated as a
        sid -> frame mapping; each frame is rebuilt with an added sid
        column, and the results are concatenated and wrapped in ``bz.data``.
        """
        parent_args = super(
            BlazeBuybackAuthLoaderTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        _, mapping = parent_args

        per_sid_frames = []
        for sid, frame in iteritems(mapping):
            per_sid_frames.append(pd.DataFrame({
                BUYBACK_ANNOUNCEMENT_FIELD_NAME:
                    frame[BUYBACK_ANNOUNCEMENT_FIELD_NAME],
                BUYBACK_AMOUNT_FIELD_NAME: frame[BUYBACK_AMOUNT_FIELD_NAME],
                BUYBACK_UNIT_FIELD_NAME: frame[BUYBACK_UNIT_FIELD_NAME],
                BUYBACK_TYPE_FIELD_NAME: frame[BUYBACK_TYPE_FIELD_NAME],
                TS_FIELD_NAME: frame[TS_FIELD_NAME],
                SID_FIELD_NAME: sid,
            }))

        combined = pd.concat(per_sid_frames).reset_index(drop=True)
        return (bz.data(combined),)
class BlazeBuybackAuthLoaderNotInteractiveTestCase(
    BlazeBuybackAuthLoaderTestCase
):
    """
    Test case for passing a non-interactive symbol and a dict of resources.
    """

    def pipeline_event_loader_args(self, dates):
        """
        Return the parent's bound expression after passing it through
        ``swap_resources_into_scope`` with an empty scope.
        """
        parent_args = super(
            BlazeBuybackAuthLoaderNotInteractiveTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # The parent returns exactly one item: the bound blaze expression.
        (bound_expr,) = parent_args
        return swap_resources_into_scope(bound_expr, {})
-353
View File
@@ -1,353 +0,0 @@
"""
Tests for the reference loader for ConsensusEstimates.
"""
import blaze as bz
from blaze.compute.core import swap_resources_into_scope
import pandas as pd
from six import iteritems
from zipline.pipeline.common import SID_FIELD_NAME
from zipline.pipeline.data import ConsensusEstimates
from zipline.pipeline.loaders.consensus_estimates import (
ACTUAL_VALUE_FIELD_NAME,
ConsensusEstimatesLoader,
COUNT_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME,
HIGH_FIELD_NAME,
LOW_FIELD_NAME,
MEAN_FIELD_NAME,
RELEASE_DATE_FIELD_NAME,
STANDARD_DEVIATION_FIELD_NAME,
)
from zipline.pipeline.loaders.blaze import BlazeConsensusEstimatesLoader
from zipline.pipeline.loaders.utils import (
zip_with_floats
)
from zipline.testing.fixtures import (
ZiplineTestCase,
WithNextAndPreviousEventDataLoader
)
# Labels for the "next event" pipeline output columns.
NEXT_COUNT = "next_count"
NEXT_FISCAL_QUARTER = "next_fiscal_quarter"
NEXT_FISCAL_YEAR = "next_fiscal_year"
NEXT_HIGH = "next_high"
NEXT_LOW = "next_low"
NEXT_MEAN = "next_mean"
NEXT_RELEASE_DATE = "next_release_date"
NEXT_STANDARD_DEVIATION = "next_standard_deviation"

# Labels for the "previous event" pipeline output columns. setup() relies on
# these values starting with "previous" to pick the matching date intervals.
PREVIOUS_ACTUAL_VALUE = "previous_actual_value"
PREVIOUS_COUNT = "previous_count"
PREVIOUS_FISCAL_QUARTER = "previous_fiscal_quarter"
PREVIOUS_FISCAL_YEAR = "previous_fiscal_year"
PREVIOUS_HIGH = "previous_high"
PREVIOUS_LOW = "previous_low"
PREVIOUS_MEAN = "previous_mean"
PREVIOUS_RELEASE_DATE = "previous_release_date"
PREVIOUS_STANDARD_DEVIATION = "previous_standard_deviation"
# Estimate fields for each sid. get_dataset() joins these column-wise onto
# the fixture's base_cases frame with the same list position.
consensus_estimates_cases = [
    # K1--K2--A1--A2.
    pd.DataFrame({
        ACTUAL_VALUE_FIELD_NAME: (100, 200),
        STANDARD_DEVIATION_FIELD_NAME: (0.5, 0.6),
        COUNT_FIELD_NAME: (1, 2),
        FISCAL_QUARTER_FIELD_NAME: (1, 1),
        HIGH_FIELD_NAME: (0.6, 0.7),
        MEAN_FIELD_NAME: (0.1, 0.2),
        FISCAL_YEAR_FIELD_NAME: (2014, 2014),
        LOW_FIELD_NAME: (0.05, 0.06),
    }),
    # K1--K2--A2--A1.
    pd.DataFrame({
        ACTUAL_VALUE_FIELD_NAME: (200, 300),
        STANDARD_DEVIATION_FIELD_NAME: (0.6, 0.7),
        COUNT_FIELD_NAME: (2, 3),
        FISCAL_QUARTER_FIELD_NAME: (1, 1),
        HIGH_FIELD_NAME: (0.7, 0.8),
        MEAN_FIELD_NAME: (0.2, 0.3),
        FISCAL_YEAR_FIELD_NAME: (2014, 2014),
        LOW_FIELD_NAME: (0.06, 0.07),
    }),
    # K1--A1--K2--A2.
    pd.DataFrame({
        ACTUAL_VALUE_FIELD_NAME: (300, 400),
        STANDARD_DEVIATION_FIELD_NAME: (0.7, 0.8),
        COUNT_FIELD_NAME: (3, 4),
        FISCAL_QUARTER_FIELD_NAME: (1, 1),
        HIGH_FIELD_NAME: (0.8, 0.9),
        MEAN_FIELD_NAME: (0.3, 0.4),
        FISCAL_YEAR_FIELD_NAME: (2014, 2014),
        LOW_FIELD_NAME: (0.07, 0.08),
    }),
    # K1 == K2.
    pd.DataFrame({
        ACTUAL_VALUE_FIELD_NAME: (400, 500),
        STANDARD_DEVIATION_FIELD_NAME: (0.8, 0.9),
        COUNT_FIELD_NAME: (4, 5),
        FISCAL_QUARTER_FIELD_NAME: (1, 1),
        HIGH_FIELD_NAME: (0.9, 1.0),
        MEAN_FIELD_NAME: (0.4, 0.5),
        FISCAL_YEAR_FIELD_NAME: (2014, 2014),
        LOW_FIELD_NAME: (0.08, 0.09),
    }),
    # No estimates: an empty frame whose columns are all created with the
    # datetime64[ns] dtype.
    pd.DataFrame(
        columns=[
            ACTUAL_VALUE_FIELD_NAME,
            STANDARD_DEVIATION_FIELD_NAME,
            COUNT_FIELD_NAME,
            FISCAL_QUARTER_FIELD_NAME,
            HIGH_FIELD_NAME,
            MEAN_FIELD_NAME,
            FISCAL_YEAR_FIELD_NAME,
            LOW_FIELD_NAME,
        ],
        dtype="datetime64[ns]",
    ),
]
# Expected values for each output column. Every table has one row per entry
# of consensus_estimates_cases (five rows, the last for the empty case), and
# each row lists the expected value for successive date intervals.

prev_actual_value = [
    ['NaN', 100, 200],
    ['NaN', 300, 200],
    ['NaN', 300, 400],
    ['NaN', 400, 500],
    ['NaN'],
]

next_standard_deviation = [
    ['NaN', 0.5, 0.6, 'NaN'],
    ['NaN', 0.6, 0.7, 0.6, 'NaN'],
    ['NaN', 0.7, 'NaN', 0.8, 'NaN'],
    ['NaN', 0.8, 0.9, 'NaN'],
    ['NaN'],
]
prev_standard_deviation = [
    ['NaN', 0.5, 0.6],
    ['NaN', 0.7, 0.6],
    ['NaN', 0.7, 0.8],
    ['NaN', 0.8, 0.9],
    ['NaN'],
]

next_count = [
    ['NaN', 1, 2, 'NaN'],
    ['NaN', 2, 3, 2, 'NaN'],
    ['NaN', 3, 'NaN', 4, 'NaN'],
    ['NaN', 4, 5, 'NaN'],
    ['NaN'],
]
prev_count = [
    ['NaN', 1, 2],
    ['NaN', 3, 2],
    ['NaN', 3, 4],
    ['NaN', 4, 5],
    ['NaN'],
]

next_fiscal_quarter = [
    ['NaN', 1, 1, 'NaN'],
    ['NaN', 1, 1, 1, 'NaN'],
    ['NaN', 1, 'NaN', 1, 'NaN'],
    ['NaN', 1, 1, 'NaN'],
    ['NaN'],
]
prev_fiscal_quarter = [
    ['NaN', 1, 1],
    ['NaN', 1, 1],
    ['NaN', 1, 1],
    ['NaN', 1, 1],
    ['NaN'],
]

next_high = [
    ['NaN', 0.6, 0.7, 'NaN'],
    ['NaN', 0.7, 0.8, 0.7, 'NaN'],
    ['NaN', 0.8, 'NaN', 0.9, 'NaN'],
    ['NaN', 0.9, 1.0, 'NaN'],
    ['NaN'],
]
prev_high = [
    ['NaN', 0.6, 0.7],
    ['NaN', 0.8, 0.7],
    ['NaN', 0.8, 0.9],
    ['NaN', 0.9, 1.0],
    ['NaN'],
]

next_mean = [
    ['NaN', 0.1, 0.2, 'NaN'],
    ['NaN', 0.2, 0.3, 0.2, 'NaN'],
    ['NaN', 0.3, 'NaN', 0.4, 'NaN'],
    ['NaN', 0.4, 0.5, 'NaN'],
    ['NaN'],
]
prev_mean = [
    ['NaN', 0.1, 0.2],
    ['NaN', 0.3, 0.2],
    ['NaN', 0.3, 0.4],
    ['NaN', 0.4, 0.5],
    ['NaN'],
]

next_fiscal_year = [
    ['NaN', 2014, 2014, 'NaN'],
    ['NaN', 2014, 2014, 2014, 'NaN'],
    ['NaN', 2014, 'NaN', 2014, 'NaN'],
    ['NaN', 2014, 2014, 'NaN'],
    ['NaN'],
]
prev_fiscal_year = [
    ['NaN', 2014, 2014],
    ['NaN', 2014, 2014],
    ['NaN', 2014, 2014],
    ['NaN', 2014, 2014],
    ['NaN'],
]

next_low = [
    ['NaN', 0.05, 0.06, 'NaN'],
    ['NaN', 0.06, 0.07, 0.06, 'NaN'],
    ['NaN', 0.07, 'NaN', 0.08, 'NaN'],
    ['NaN', 0.08, 0.09, 'NaN'],
    ['NaN'],
]
prev_low = [
    ['NaN', 0.05, 0.06],
    ['NaN', 0.07, 0.06],
    ['NaN', 0.07, 0.08],
    ['NaN', 0.08, 0.09],
    ['NaN'],
]
# Output label -> table of expected values for that label. setup() iterates
# this mapping to build every float-valued expected column.
field_name_to_expected_col = {
    PREVIOUS_ACTUAL_VALUE: prev_actual_value,
    PREVIOUS_STANDARD_DEVIATION: prev_standard_deviation,
    NEXT_STANDARD_DEVIATION: next_standard_deviation,
    PREVIOUS_COUNT: prev_count,
    NEXT_COUNT: next_count,
    PREVIOUS_FISCAL_QUARTER: prev_fiscal_quarter,
    NEXT_FISCAL_QUARTER: next_fiscal_quarter,
    PREVIOUS_HIGH: prev_high,
    NEXT_HIGH: next_high,
    PREVIOUS_MEAN: prev_mean,
    NEXT_MEAN: next_mean,
    PREVIOUS_FISCAL_YEAR: prev_fiscal_year,
    NEXT_FISCAL_YEAR: next_fiscal_year,
    PREVIOUS_LOW: prev_low,
    NEXT_LOW: next_low,
}
class ConsensusEstimatesLoaderTestCase(WithNextAndPreviousEventDataLoader,
                                       ZiplineTestCase):
    """
    Tests for loading the consensus estimates data.
    """
    loader_type = ConsensusEstimatesLoader

    # Output label -> pipeline term evaluated for that label.
    pipeline_columns = {
        PREVIOUS_ACTUAL_VALUE:
            ConsensusEstimates.previous_actual_value.latest,
        NEXT_RELEASE_DATE:
            ConsensusEstimates.next_release_date.latest,
        PREVIOUS_RELEASE_DATE:
            ConsensusEstimates.previous_release_date.latest,
        PREVIOUS_STANDARD_DEVIATION:
            ConsensusEstimates.previous_standard_deviation.latest,
        NEXT_STANDARD_DEVIATION:
            ConsensusEstimates.next_standard_deviation.latest,
        PREVIOUS_COUNT:
            ConsensusEstimates.previous_count.latest,
        NEXT_COUNT:
            ConsensusEstimates.next_count.latest,
        PREVIOUS_FISCAL_QUARTER:
            ConsensusEstimates.previous_fiscal_quarter.latest,
        NEXT_FISCAL_QUARTER:
            ConsensusEstimates.next_fiscal_quarter.latest,
        PREVIOUS_HIGH:
            ConsensusEstimates.previous_high.latest,
        NEXT_HIGH:
            ConsensusEstimates.next_high.latest,
        PREVIOUS_MEAN:
            ConsensusEstimates.previous_mean.latest,
        NEXT_MEAN:
            ConsensusEstimates.next_mean.latest,
        PREVIOUS_FISCAL_YEAR:
            ConsensusEstimates.previous_fiscal_year.latest,
        NEXT_FISCAL_YEAR:
            ConsensusEstimates.next_fiscal_year.latest,
        PREVIOUS_LOW:
            ConsensusEstimates.previous_low.latest,
        NEXT_LOW:
            ConsensusEstimates.next_low.latest,
    }

    @classmethod
    def get_dataset(cls):
        """
        Return a dict mapping each sid to its input events frame.

        Each frame is the fixture's base case for that sid, with its
        'other_date' column renamed to the release-date field, joined
        column-wise with the matching estimates frame.
        """
        dataset = {}
        for sid, estimates in enumerate(consensus_estimates_cases):
            base = cls.base_cases[sid].rename(
                columns={'other_date': RELEASE_DATE_FIELD_NAME},
            )
            dataset[sid] = pd.concat([base, estimates], axis=1)
        return dataset

    def setup(self, dates):
        """
        Build the expected output for every label in ``pipeline_columns``.

        Returns a dict keyed by output label.
        """
        cols = {
            PREVIOUS_RELEASE_DATE: self.get_expected_previous_event_dates(
                dates, 'datetime64[ns]', 'NaN'
            ),
            NEXT_RELEASE_DATE: self.get_expected_next_event_dates(
                dates, 'datetime64[ns]', 'NaN'
            ),
        }
        for field_name in field_name_to_expected_col:
            # "previous*" labels are laid out over the previous-event
            # intervals; every other label uses the next-event intervals.
            if field_name.startswith("previous"):
                intervals = self.prev_date_intervals
            else:
                intervals = self.next_date_intervals
            cols[field_name] = self.get_sids_to_frames(
                zip_with_floats,
                field_name_to_expected_col[field_name],
                intervals,
                dates,
                'float',
                'NaN',
            )
        return cols
class BlazeConsensusEstimatesLoaderTestCase(ConsensusEstimatesLoaderTestCase):
    """
    Test case for loading the consensus estimates data via blaze.
    """
    loader_type = BlazeConsensusEstimatesLoader

    def pipeline_event_loader_args(self, dates):
        """
        Return a 1-tuple holding a single blaze expression for all sids.

        The second item of the parent's arguments is iterated as a
        sid -> frame mapping; a copy of each frame gets a sid column, and
        the copies are concatenated and wrapped in ``bz.data``.
        """
        def with_sid(sid, df):
            # Copy first so the parent's frames are left untouched.
            tagged = df.copy()
            tagged[SID_FIELD_NAME] = sid
            return tagged

        _, mapping = super(
            BlazeConsensusEstimatesLoaderTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        frames = [with_sid(sid, df) for sid, df in iteritems(mapping)]
        combined = pd.concat(frames).reset_index(drop=True)
        return (bz.data(combined),)
class BlazeConsensusEstimatesLoaderNotInteractiveTestCase(
    BlazeConsensusEstimatesLoaderTestCase
):
    """
    Test case for passing a non-interactive symbol and a dict of resources.
    """

    def pipeline_event_loader_args(self, dates):
        """
        Return the parent's bound expression after passing it through
        ``swap_resources_into_scope`` with an empty scope.
        """
        parent_args = super(
            BlazeConsensusEstimatesLoaderNotInteractiveTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # The parent returns exactly one item: the bound blaze expression.
        (bound_expr,) = parent_args
        return swap_resources_into_scope(bound_expr, {})
-517
View File
@@ -1,517 +0,0 @@
"""
Tests for the reference loader for Dividends datasets.
"""
import blaze as bz
from blaze.compute.core import swap_resources_into_scope
import pandas as pd
from six import iteritems
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
PREVIOUS_AMOUNT,
PREVIOUS_ANNOUNCEMENT,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.data.dividends import (
DividendsByAnnouncementDate,
DividendsByExDate,
DividendsByPayDate
)
from zipline.pipeline.factors.events import (
BusinessDaysSinceDividendAnnouncement,
BusinessDaysSincePreviousExDate,
BusinessDaysUntilNextExDate
)
from zipline.pipeline.loaders.blaze.dividends import (
BlazeDividendsByAnnouncementDateLoader,
BlazeDividendsByPayDateLoader,
BlazeDividendsByExDateLoader
)
from zipline.pipeline.loaders.dividends import (
CASH_AMOUNT_FIELD_NAME,
CURRENCY_FIELD_NAME,
DIVIDEND_TYPE_FIELD_NAME,
DividendsByAnnouncementDateLoader,
DividendsByExDateLoader,
DividendsByPayDateLoader,
EX_DATE_FIELD_NAME,
PAY_DATE_FIELD_NAME,
)
from zipline.pipeline.loaders.utils import (
zip_with_dates,
zip_with_floats,
zip_with_strs,
)
from zipline.testing.fixtures import (
WithPipelineEventDataLoader,
ZiplineTestCase
)
# Labels for the business-day-count output columns.
DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT = "days_since_prev_dividend_announcement"
DAYS_SINCE_PREV_EX_DATE = "days_since_prev_ex_date"
DAYS_TO_NEXT_EX_DATE = "days_to_next_ex_date"

# Labels for the "next event" output columns.
NEXT_AMOUNT = "next_amount"
NEXT_CURRENCY_TYPE = "next_currency_type"
NEXT_DIVIDEND_TYPE = "next_dividend_type"
NEXT_EX_DATE = "next_ex_date"
NEXT_PAY_DATE = "next_pay_date"

# Labels for the "previous event" output columns.
PREVIOUS_CURRENCY_TYPE = "previous_currency_type"
PREVIOUS_DIVIDEND_TYPE = "previous_dividend_type"
PREVIOUS_EX_DATE = "previous_ex_date"
PREVIOUS_PAY_DATE = "previous_pay_date"
# Input frames shared by all dividend test cases; each get_dataset() uses the
# list position as the sid and drops the date columns it does not need.
dividends_cases = [
    # K1--K2--A1--A2.
    pd.DataFrame({
        CASH_AMOUNT_FIELD_NAME: [1, 15],
        EX_DATE_FIELD_NAME: pd.to_datetime(["2014-01-15", "2014-01-20"]),
        PAY_DATE_FIELD_NAME: pd.to_datetime(["2014-01-15", "2014-01-20"]),
        TS_FIELD_NAME: pd.to_datetime(["2014-01-05", "2014-01-10"]),
        ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(["2014-01-04", "2014-01-09"]),
        CURRENCY_FIELD_NAME: ["$", "EUR"],
        DIVIDEND_TYPE_FIELD_NAME: ["Stock", "Mixed"],
    }),
    # K1--K2--A2--A1.
    pd.DataFrame({
        CASH_AMOUNT_FIELD_NAME: [7, 13],
        EX_DATE_FIELD_NAME: pd.to_datetime(["2014-01-20", "2014-01-15"]),
        PAY_DATE_FIELD_NAME: pd.to_datetime(["2014-01-20", "2014-01-15"]),
        TS_FIELD_NAME: pd.to_datetime(["2014-01-05", "2014-01-10"]),
        ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(["2014-01-04", "2014-01-09"]),
        CURRENCY_FIELD_NAME: ["EUR", "$"],
        DIVIDEND_TYPE_FIELD_NAME: ["Mixed", "Stock"],
    }),
    # K1--A1--K2--A2.
    pd.DataFrame({
        CASH_AMOUNT_FIELD_NAME: [3, 1],
        EX_DATE_FIELD_NAME: pd.to_datetime(["2014-01-10", "2014-01-20"]),
        PAY_DATE_FIELD_NAME: pd.to_datetime(["2014-01-10", "2014-01-20"]),
        TS_FIELD_NAME: pd.to_datetime(["2014-01-05", "2014-01-15"]),
        ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(["2014-01-04", "2014-01-14"]),
        CURRENCY_FIELD_NAME: ["$", "EUR"],
        DIVIDEND_TYPE_FIELD_NAME: ["Stock", "Mixed"],
    }),
    # K1 == K2.
    pd.DataFrame({
        CASH_AMOUNT_FIELD_NAME: [6, 23],
        EX_DATE_FIELD_NAME: pd.to_datetime(["2014-01-10", "2014-01-15"]),
        PAY_DATE_FIELD_NAME: pd.to_datetime(["2014-01-10", "2014-01-15"]),
        TS_FIELD_NAME: pd.to_datetime(["2014-01-05", "2014-01-05"]),
        ANNOUNCEMENT_FIELD_NAME: pd.to_datetime(["2014-01-04", "2014-01-04"]),
        CURRENCY_FIELD_NAME: ["$", "EUR"],
        DIVIDEND_TYPE_FIELD_NAME: ["Stock", "Mixed"],
    }),
    # No dividends: an empty frame whose columns are all created with the
    # datetime64[ns] dtype.
    pd.DataFrame(
        columns=[
            CASH_AMOUNT_FIELD_NAME,
            EX_DATE_FIELD_NAME,
            PAY_DATE_FIELD_NAME,
            TS_FIELD_NAME,
            ANNOUNCEMENT_FIELD_NAME,
            CURRENCY_FIELD_NAME,
            DIVIDEND_TYPE_FIELD_NAME,
        ],
        dtype="datetime64[ns]",
    ),
]
# Expected-output tables for the ex-date and pay-date test cases. Each table
# has four rows, one per populated entry of dividends_cases, in the same
# order. A row of values lines up with the row of [start, end] date-string
# pairs at the same position in the matching *_date_intervals table.

prev_date_intervals = [
    [
        ["2014-01-01", "2014-01-14"],
        ["2014-01-15", "2014-01-19"],
        ["2014-01-20", "2014-01-31"],
    ],
    [
        ["2014-01-01", "2014-01-14"],
        ["2014-01-15", "2014-01-19"],
        ["2014-01-20", "2014-01-31"],
    ],
    [
        ["2014-01-01", "2014-01-09"],
        ["2014-01-10", "2014-01-19"],
        ["2014-01-20", "2014-01-31"],
    ],
    [
        ["2014-01-01", "2014-01-09"],
        ["2014-01-10", "2014-01-14"],
        ["2014-01-15", "2014-01-31"],
    ],
]

next_date_intervals = [
    [
        ["2014-01-01", "2014-01-04"],
        ["2014-01-05", "2014-01-15"],
        ["2014-01-16", "2014-01-20"],
        ["2014-01-21", "2014-01-31"],
    ],
    [
        ["2014-01-01", "2014-01-04"],
        ["2014-01-05", "2014-01-09"],
        ["2014-01-10", "2014-01-15"],
        ["2014-01-16", "2014-01-20"],
        ["2014-01-21", "2014-01-31"],
    ],
    [
        ["2014-01-01", "2014-01-04"],
        ["2014-01-05", "2014-01-10"],
        ["2014-01-11", "2014-01-14"],
        ["2014-01-15", "2014-01-20"],
        ["2014-01-21", "2014-01-31"],
    ],
    [
        ["2014-01-01", "2014-01-04"],
        ["2014-01-05", "2014-01-10"],
        ["2014-01-11", "2014-01-15"],
        ["2014-01-16", "2014-01-31"],
    ],
]

# The input frames give every event identical ex and pay dates, so the two
# test cases share these expected dates.
next_ex_and_pay_dates = [
    ["NaT", "2014-01-15", "2014-01-20", "NaT"],
    ["NaT", "2014-01-20", "2014-01-15", "2014-01-20", "NaT"],
    ["NaT", "2014-01-10", "NaT", "2014-01-20", "NaT"],
    ["NaT", "2014-01-10", "2014-01-15", "NaT"],
]
prev_ex_and_pay_dates = [
    ["NaT", "2014-01-15", "2014-01-20"],
    ["NaT", "2014-01-15", "2014-01-20"],
    ["NaT", "2014-01-10", "2014-01-20"],
    ["NaT", "2014-01-10", "2014-01-15"],
]

prev_amounts = [
    ["NaN", 1, 15],
    ["NaN", 13, 7],
    ["NaN", 3, 1],
    ["NaN", 6, 23],
]
next_amounts = [
    ["NaN", 1, 15, "NaN"],
    ["NaN", 7, 13, 7, "NaN"],
    ["NaN", 3, "NaN", 1, "NaN"],
    ["NaN", 6, 23, "NaN"],
]

prev_currency_types = [
    [None, "$", "EUR"],
    [None, "$", "EUR"],
    [None, "$", "EUR"],
    [None, "$", "EUR"],
]
next_currency_types = [
    [None, "$", "EUR", None],
    [None, "EUR", "$", "EUR", None],
    [None, "$", None, "EUR", None],
    [None, "$", "EUR", None],
]

prev_dividend_types = [
    [None, "Stock", "Mixed"],
    [None, "Stock", "Mixed"],
    [None, "Stock", "Mixed"],
    [None, "Stock", "Mixed"],
]
next_dividend_types = [
    [None, "Stock", "Mixed", None],
    [None, "Mixed", "Stock", "Mixed", None],
    [None, "Stock", None, "Mixed", None],
    [None, "Stock", "Mixed", None],
]
class DividendsByAnnouncementDateTestCase(WithPipelineEventDataLoader,
                                          ZiplineTestCase):
    """
    Tests for loading the dividends by announcement date data.
    """
    loader_type = DividendsByAnnouncementDateLoader

    # Output label -> pipeline term evaluated for that label.
    pipeline_columns = {
        PREVIOUS_ANNOUNCEMENT:
            DividendsByAnnouncementDate.previous_announcement_date.latest,
        PREVIOUS_AMOUNT: DividendsByAnnouncementDate.previous_amount.latest,
        DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT:
            BusinessDaysSinceDividendAnnouncement(),
        PREVIOUS_CURRENCY_TYPE:
            DividendsByAnnouncementDate.previous_currency.latest,
        PREVIOUS_DIVIDEND_TYPE:
            DividendsByAnnouncementDate.previous_type.latest,
    }

    @classmethod
    def get_dataset(cls):
        """
        Return a dict mapping each sid to its events frame, with the ex-date
        and pay-date columns dropped.
        """
        unused = [EX_DATE_FIELD_NAME, PAY_DATE_FIELD_NAME]
        dataset = {}
        for sid, frame in enumerate(dividends_cases):
            dataset[sid] = frame.drop(unused, axis=1)
        return dataset

    def setup(self, dates):
        """
        Build the expected output for every label in ``pipeline_columns``.

        Returns a dict keyed by output label. The business-day column is
        derived from the expected announcement dates via
        ``_compute_busday_offsets``.
        """
        # One row per populated case; each row pairs up position-by-position
        # with the same row of the value tables below.
        intervals = [
            [
                ['2014-01-01', '2014-01-04'],
                ['2014-01-05', '2014-01-09'],
                ['2014-01-10', '2014-01-31'],
            ],
            [
                ['2014-01-01', '2014-01-04'],
                ['2014-01-05', '2014-01-09'],
                ['2014-01-10', '2014-01-31'],
            ],
            [
                ['2014-01-01', '2014-01-04'],
                ['2014-01-05', '2014-01-14'],
                ['2014-01-15', '2014-01-31'],
            ],
            [
                ['2014-01-01', '2014-01-04'],
                ['2014-01-05', '2014-01-31'],
            ],
        ]
        announcement_dates = [
            ['NaT', '2014-01-04', '2014-01-09'],
            ['NaT', '2014-01-04', '2014-01-09'],
            ['NaT', '2014-01-04', '2014-01-14'],
            ['NaT', '2014-01-04'],
        ]
        amounts = [
            ['NaN', 1, 15],
            ['NaN', 7, 13],
            ['NaN', 3, 1],
            ['NaN', 23],
        ]
        currency_types = [
            [None, "$", "EUR"],
            [None, "EUR", "$"],
            [None, "$", "EUR"],
            [None, "EUR"],
        ]
        dividend_types = [
            [None, "Stock", "Mixed"],
            [None, "Mixed", "Stock"],
            [None, "Stock", "Mixed"],
            [None, "Mixed"],
        ]

        def expected(zipper, values, dtype, missing):
            return self.get_sids_to_frames(
                zipper, values, intervals, dates, dtype, missing,
            )

        cols = {
            PREVIOUS_ANNOUNCEMENT: expected(
                zip_with_dates, announcement_dates, 'datetime64[ns]', 'NaN',
            ),
            PREVIOUS_AMOUNT: expected(
                zip_with_floats, amounts, 'float', 'NaN',
            ),
            PREVIOUS_CURRENCY_TYPE: expected(
                zip_with_strs, currency_types, 'category', None,
            ),
            PREVIOUS_DIVIDEND_TYPE: expected(
                zip_with_strs, dividend_types, 'category', None,
            ),
        }
        cols[DAYS_SINCE_PREV_DIVIDEND_ANNOUNCEMENT] = (
            self._compute_busday_offsets(cols[PREVIOUS_ANNOUNCEMENT])
        )
        return cols
class BlazeDividendsByAnnouncementDateTestCase(
    DividendsByAnnouncementDateTestCase
):
    """
    Run the dividends-by-announcement-date tests through the blaze loader.
    """
    loader_type = BlazeDividendsByAnnouncementDateLoader

    def pipeline_event_loader_args(self, dates):
        """
        Return a 1-tuple holding a single blaze expression for all sids.

        The second item of the parent's arguments is iterated as a
        sid -> frame mapping; each frame is rebuilt with an added sid
        column, and the results are concatenated and wrapped in ``bz.data``.
        """
        _, mapping = super(
            BlazeDividendsByAnnouncementDateTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # Use the lowercase ``bz.data`` constructor, as the buyback,
        # consensus-estimates and earnings blaze tests do, rather than the
        # capitalised ``bz.Data`` spelling.
        return (bz.data(pd.concat(
            pd.DataFrame({
                ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME],
                TS_FIELD_NAME: df[TS_FIELD_NAME],
                SID_FIELD_NAME: sid,
                CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME],
                CURRENCY_FIELD_NAME: df[CURRENCY_FIELD_NAME],
                DIVIDEND_TYPE_FIELD_NAME: df[DIVIDEND_TYPE_FIELD_NAME],
            })
            for sid, df in iteritems(mapping)
        ).reset_index(drop=True)),)
class BlazeDividendsByAnnouncementDateNotInteractiveTestCase(
    BlazeDividendsByAnnouncementDateTestCase
):
    """
    Test case for passing a non-interactive symbol and a dict of resources.
    """

    def pipeline_event_loader_args(self, dates):
        """
        Return the parent's bound expression after passing it through
        ``swap_resources_into_scope`` with an empty scope.
        """
        parent_args = super(
            BlazeDividendsByAnnouncementDateNotInteractiveTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # The parent returns exactly one item: the bound blaze expression.
        (bound_expr,) = parent_args
        return swap_resources_into_scope(bound_expr, {})
class DividendsByExDateTestCase(WithPipelineEventDataLoader, ZiplineTestCase):
    """
    Tests for loading the dividends by ex date data.
    """
    loader_type = DividendsByExDateLoader

    # Output label -> pipeline term evaluated for that label.
    pipeline_columns = {
        NEXT_EX_DATE: DividendsByExDate.next_date.latest,
        PREVIOUS_EX_DATE: DividendsByExDate.previous_date.latest,
        NEXT_AMOUNT: DividendsByExDate.next_amount.latest,
        PREVIOUS_AMOUNT: DividendsByExDate.previous_amount.latest,
        PREVIOUS_CURRENCY_TYPE: DividendsByExDate.previous_currency.latest,
        NEXT_CURRENCY_TYPE: DividendsByExDate.next_currency.latest,
        PREVIOUS_DIVIDEND_TYPE: DividendsByExDate.previous_type.latest,
        NEXT_DIVIDEND_TYPE: DividendsByExDate.next_type.latest,
        DAYS_TO_NEXT_EX_DATE: BusinessDaysUntilNextExDate(),
        DAYS_SINCE_PREV_EX_DATE: BusinessDaysSincePreviousExDate(),
    }

    @classmethod
    def get_dataset(cls):
        """
        Return a dict mapping each sid to its events frame, with the
        announcement-date and pay-date columns dropped.
        """
        unused = [ANNOUNCEMENT_FIELD_NAME, PAY_DATE_FIELD_NAME]
        dataset = {}
        for sid, frame in enumerate(dividends_cases):
            dataset[sid] = frame.drop(unused, axis=1)
        return dataset

    def setup(self, dates):
        """
        Build the expected output for every label in ``pipeline_columns``.

        Returns a dict keyed by output label. The two business-day columns
        are derived from the expected ex dates via
        ``_compute_busday_offsets``.
        """
        def expected(zipper, values, intervals, dtype, missing):
            return self.get_sids_to_frames(
                zipper, values, intervals, dates, dtype, missing,
            )

        cols = {
            NEXT_EX_DATE: expected(
                zip_with_dates, next_ex_and_pay_dates, next_date_intervals,
                'datetime64[ns]', 'NaN',
            ),
            PREVIOUS_EX_DATE: expected(
                zip_with_dates, prev_ex_and_pay_dates, prev_date_intervals,
                'datetime64[ns]', 'NaN',
            ),
            NEXT_AMOUNT: expected(
                zip_with_floats, next_amounts, next_date_intervals,
                'float', 'NaN',
            ),
            PREVIOUS_AMOUNT: expected(
                zip_with_floats, prev_amounts, prev_date_intervals,
                'float', 'NaN',
            ),
            PREVIOUS_CURRENCY_TYPE: expected(
                zip_with_strs, prev_currency_types, prev_date_intervals,
                'category', None,
            ),
            NEXT_CURRENCY_TYPE: expected(
                zip_with_strs, next_currency_types, next_date_intervals,
                'category', None,
            ),
            PREVIOUS_DIVIDEND_TYPE: expected(
                zip_with_strs, prev_dividend_types, prev_date_intervals,
                'category', None,
            ),
            NEXT_DIVIDEND_TYPE: expected(
                zip_with_strs, next_dividend_types, next_date_intervals,
                'category', None,
            ),
        }
        cols[DAYS_TO_NEXT_EX_DATE] = self._compute_busday_offsets(
            cols[NEXT_EX_DATE]
        )
        cols[DAYS_SINCE_PREV_EX_DATE] = self._compute_busday_offsets(
            cols[PREVIOUS_EX_DATE]
        )
        return cols
class BlazeDividendsByExDateLoaderTestCase(DividendsByExDateTestCase):
    """
    Run the dividends-by-ex-date tests through the blaze loader.
    """
    loader_type = BlazeDividendsByExDateLoader

    def pipeline_event_loader_args(self, dates):
        """
        Return a 1-tuple holding a single blaze expression for all sids.

        The second item of the parent's arguments is iterated as a
        sid -> frame mapping; each frame is rebuilt with an added sid
        column, and the results are concatenated and wrapped in ``bz.data``.
        """
        _, mapping = super(
            BlazeDividendsByExDateLoaderTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # Use the lowercase ``bz.data`` constructor, as the buyback,
        # consensus-estimates and earnings blaze tests do, rather than the
        # capitalised ``bz.Data`` spelling.
        return (bz.data(pd.concat(
            pd.DataFrame({
                EX_DATE_FIELD_NAME: df[EX_DATE_FIELD_NAME],
                TS_FIELD_NAME: df[TS_FIELD_NAME],
                SID_FIELD_NAME: sid,
                CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME],
                CURRENCY_FIELD_NAME: df[CURRENCY_FIELD_NAME],
                DIVIDEND_TYPE_FIELD_NAME: df[DIVIDEND_TYPE_FIELD_NAME],
            })
            for sid, df in iteritems(mapping)
        ).reset_index(drop=True)),)
class BlazeDividendsByExDateLoaderNotInteractiveTestCase(
    BlazeDividendsByExDateLoaderTestCase
):
    """
    Test case for passing a non-interactive symbol and a dict of resources.
    """

    def pipeline_event_loader_args(self, dates):
        """
        Return the parent's bound expression after passing it through
        ``swap_resources_into_scope`` with an empty scope.
        """
        parent_args = super(
            BlazeDividendsByExDateLoaderNotInteractiveTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # The parent returns exactly one item: the bound blaze expression.
        (bound_expr,) = parent_args
        return swap_resources_into_scope(bound_expr, {})
class DividendsByPayDateTestCase(WithPipelineEventDataLoader, ZiplineTestCase):
    """
    Tests for loading the dividends by pay date data.
    """
    loader_type = DividendsByPayDateLoader

    # Output label -> pipeline term evaluated for that label.
    pipeline_columns = {
        NEXT_PAY_DATE: DividendsByPayDate.next_date.latest,
        PREVIOUS_PAY_DATE: DividendsByPayDate.previous_date.latest,
        NEXT_AMOUNT: DividendsByPayDate.next_amount.latest,
        PREVIOUS_AMOUNT: DividendsByPayDate.previous_amount.latest,
        PREVIOUS_CURRENCY_TYPE: DividendsByPayDate.previous_currency.latest,
        NEXT_CURRENCY_TYPE: DividendsByPayDate.next_currency.latest,
        PREVIOUS_DIVIDEND_TYPE: DividendsByPayDate.previous_type.latest,
        NEXT_DIVIDEND_TYPE: DividendsByPayDate.next_type.latest,
    }

    @classmethod
    def get_dataset(cls):
        """
        Return a dict mapping each sid to its events frame, with the
        announcement-date and ex-date columns dropped.
        """
        unused = [ANNOUNCEMENT_FIELD_NAME, EX_DATE_FIELD_NAME]
        dataset = {}
        for sid, frame in enumerate(dividends_cases):
            dataset[sid] = frame.drop(unused, axis=1)
        return dataset

    def setup(self, dates):
        """
        Build the expected output for every label in ``pipeline_columns``.

        Returns a dict keyed by output label.
        """
        def expected(zipper, values, intervals, dtype, missing):
            return self.get_sids_to_frames(
                zipper, values, intervals, dates, dtype, missing,
            )

        return {
            NEXT_PAY_DATE: expected(
                zip_with_dates, next_ex_and_pay_dates, next_date_intervals,
                'datetime64[ns]', 'NaN',
            ),
            PREVIOUS_PAY_DATE: expected(
                zip_with_dates, prev_ex_and_pay_dates, prev_date_intervals,
                'datetime64[ns]', 'NaN',
            ),
            NEXT_AMOUNT: expected(
                zip_with_floats, next_amounts, next_date_intervals,
                'float', 'NaN',
            ),
            PREVIOUS_AMOUNT: expected(
                zip_with_floats, prev_amounts, prev_date_intervals,
                'float', 'NaN',
            ),
            PREVIOUS_CURRENCY_TYPE: expected(
                zip_with_strs, prev_currency_types, prev_date_intervals,
                'category', None,
            ),
            NEXT_CURRENCY_TYPE: expected(
                zip_with_strs, next_currency_types, next_date_intervals,
                'category', None,
            ),
            PREVIOUS_DIVIDEND_TYPE: expected(
                zip_with_strs, prev_dividend_types, prev_date_intervals,
                'category', None,
            ),
            NEXT_DIVIDEND_TYPE: expected(
                zip_with_strs, next_dividend_types, next_date_intervals,
                'category', None,
            ),
        }
class BlazeDividendsByPayDateLoaderTestCase(DividendsByPayDateTestCase):
    """
    Run the dividends-by-pay-date tests through the blaze loader.
    """
    loader_type = BlazeDividendsByPayDateLoader

    def pipeline_event_loader_args(self, dates):
        """
        Return a 1-tuple holding a single blaze expression for all sids.

        The second item of the parent's arguments is iterated as a
        sid -> frame mapping; each frame is rebuilt with an added sid
        column, and the results are concatenated and wrapped in ``bz.data``.
        """
        _, mapping = super(
            BlazeDividendsByPayDateLoaderTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # Use the lowercase ``bz.data`` constructor, as the buyback,
        # consensus-estimates and earnings blaze tests do, rather than the
        # capitalised ``bz.Data`` spelling.
        return (bz.data(pd.concat(
            pd.DataFrame({
                PAY_DATE_FIELD_NAME: df[PAY_DATE_FIELD_NAME],
                TS_FIELD_NAME: df[TS_FIELD_NAME],
                SID_FIELD_NAME: sid,
                CASH_AMOUNT_FIELD_NAME: df[CASH_AMOUNT_FIELD_NAME],
                CURRENCY_FIELD_NAME: df[CURRENCY_FIELD_NAME],
                DIVIDEND_TYPE_FIELD_NAME: df[DIVIDEND_TYPE_FIELD_NAME],
            })
            for sid, df in iteritems(mapping)
        ).reset_index(drop=True)),)
class BlazeDividendsByPayDateLoaderNotInteractiveTestCase(
    BlazeDividendsByPayDateLoaderTestCase
):
    """
    Test case for passing a non-interactive symbol and a dict of resources.
    """

    def pipeline_event_loader_args(self, dates):
        """
        Return the parent's bound expression after passing it through
        ``swap_resources_into_scope`` with an empty scope.
        """
        parent_args = super(
            BlazeDividendsByPayDateLoaderNotInteractiveTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # The parent returns exactly one item: the bound blaze expression.
        (bound_expr,) = parent_args
        return swap_resources_into_scope(bound_expr, {})
-98
View File
@@ -1,98 +0,0 @@
"""
Tests for the reference loader for EarningsCalendar.
"""
import blaze as bz
from blaze.compute.core import swap_resources_into_scope
import pandas as pd
from six import iteritems
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
DAYS_SINCE_PREV,
DAYS_TO_NEXT,
NEXT_ANNOUNCEMENT,
PREVIOUS_ANNOUNCEMENT,
SID_FIELD_NAME,
TS_FIELD_NAME
)
from zipline.pipeline.data import EarningsCalendar
from zipline.pipeline.factors.events import (
BusinessDaysSincePreviousEarnings,
BusinessDaysUntilNextEarnings,
)
from zipline.pipeline.loaders.earnings import EarningsCalendarLoader
from zipline.pipeline.loaders.blaze import BlazeEarningsCalendarLoader
from zipline.testing.fixtures import (
ZiplineTestCase,
WithNextAndPreviousEventDataLoader
)
class EarningsCalendarLoaderTestCase(WithNextAndPreviousEventDataLoader,
                                     ZiplineTestCase):
    """
    Tests for loading the earnings announcement data.
    """
    loader_type = EarningsCalendarLoader

    # Output label -> pipeline term evaluated for that label.
    pipeline_columns = {
        NEXT_ANNOUNCEMENT: EarningsCalendar.next_announcement.latest,
        PREVIOUS_ANNOUNCEMENT: EarningsCalendar.previous_announcement.latest,
        DAYS_SINCE_PREV: BusinessDaysSincePreviousEarnings(),
        DAYS_TO_NEXT: BusinessDaysUntilNextEarnings(),
    }

    @classmethod
    def get_dataset(cls):
        """
        Return a dict mapping each sid to the fixture's base case for that
        position, with 'other_date' renamed to the announcement field.
        """
        renames = {'other_date': ANNOUNCEMENT_FIELD_NAME}
        dataset = {}
        for sid, df in enumerate(cls.base_cases):
            dataset[sid] = df.rename(columns=renames)
        return dataset

    def setup(self, dates):
        """
        Build the expected output for every label in ``pipeline_columns``.

        Returns a dict keyed by output label. The business-day columns are
        derived from the expected announcement dates via
        ``_compute_busday_offsets``.
        """
        previous_dates = self.get_expected_previous_event_dates(
            dates, 'datetime64[ns]', 'NaN'
        )
        next_dates = self.get_expected_next_event_dates(
            dates, 'datetime64[ns]', 'NaN'
        )
        cols = {
            PREVIOUS_ANNOUNCEMENT: previous_dates,
            NEXT_ANNOUNCEMENT: next_dates,
        }
        cols[DAYS_TO_NEXT] = self._compute_busday_offsets(next_dates)
        cols[DAYS_SINCE_PREV] = self._compute_busday_offsets(previous_dates)
        return cols
class BlazeEarningsCalendarLoaderTestCase(EarningsCalendarLoaderTestCase):
    """
    Run the earnings calendar tests through ``BlazeEarningsCalendarLoader``.
    """
    loader_type = BlazeEarningsCalendarLoader

    def pipeline_event_loader_args(self, dates):
        """
        Flatten the parent's per-sid mapping of frames into one frame with
        an explicit sid column, wrap it with ``bz.data`` and return it as a
        1-tuple of loader arguments.
        """
        parent_args = super(
            BlazeEarningsCalendarLoaderTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # Only the second element (sid -> frame mapping) is used.
        _, mapping = parent_args
        per_sid_frames = [
            pd.DataFrame({
                ANNOUNCEMENT_FIELD_NAME: df[ANNOUNCEMENT_FIELD_NAME],
                TS_FIELD_NAME: df[TS_FIELD_NAME],
                SID_FIELD_NAME: sid,
            })
            for sid, df in iteritems(mapping)
        ]
        combined = pd.concat(per_sid_frames).reset_index(drop=True)
        return (bz.data(combined),)
class BlazeEarningsCalendarLoaderNotInteractiveTestCase(
        BlazeEarningsCalendarLoaderTestCase):
    """
    Variant of the blaze earnings calendar test that hands the loader a
    non-interactive symbol together with a dict of resources.
    """

    def pipeline_event_loader_args(self, dates):
        """
        Take the single expression produced by the parent test case and
        return the result of ``swap_resources_into_scope`` applied to it,
        starting from an empty scope.
        """
        parent_args = super(
            BlazeEarningsCalendarLoaderNotInteractiveTestCase,
            self,
        ).pipeline_event_loader_args(dates)
        # The parent must return exactly one element; the unpacking below
        # raises if it does not.
        (interactive_expr,) = parent_args
        return swap_resources_into_scope(interactive_expr, {})
+425 -266
View File
@@ -1,299 +1,458 @@
"""
Tests for setting up an EventsLoader and a BlazeEventsLoader.
"""
import re
from unittest import TestCase
from itertools import product
import blaze as bz
from nose_parameterized import parameterized
import numpy as np
from numpy.testing import assert_array_equal
import pandas as pd
from pandas.util.testing import assert_series_equal
from zipline.pipeline import Pipeline, SimplePipelineEngine
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
EVENT_DATE_FIELD_NAME,
TS_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME
)
from zipline.pipeline.data import DataSet, Column
from zipline.pipeline.loaders.events import EventsLoader
from zipline.pipeline.loaders.blaze.events import BlazeEventsLoader
from zipline.pipeline.loaders.events import (
DF_NO_TS_NOT_INFER_TS_ERROR,
DTINDEX_NOT_INFER_TS_ERROR,
EventsLoader,
SERIES_NO_DTINDEX_ERROR,
WRONG_COLS_ERROR,
WRONG_MANY_COL_DATA_FORMAT_ERROR,
WRONG_SINGLE_COL_DATA_FORMAT_ERROR
from zipline.pipeline.loaders.utils import (
previous_event_indexer,
next_event_indexer,
)
from zipline.testing import ZiplineTestCase
from zipline.testing.fixtures import (
WithAssetFinder,
WithNYSETradingDays,
)
from zipline.testing.predicates import assert_equal
from zipline.utils.numpy_utils import (
categorical_dtype,
datetime64ns_dtype,
float64_dtype,
int64_dtype,
)
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import datetime64ns_dtype
OTHER_FIELD = "other_field"
ABSTRACT_CONCRETE_LOADER_ERROR = 'abstract methods concrete_loader'
ABSTRACT_EXPECTED_COLS_ERROR = 'abstract methods event_date_col, expected_cols'
class EventDataSet(DataSet):
previous_announcement = Column(datetime64ns_dtype)
next_announcement = Column(datetime64ns_dtype)
previous_event_date = Column(dtype=datetime64ns_dtype)
next_event_date = Column(dtype=datetime64ns_dtype)
class EventDataSetLoader(EventsLoader):
expected_cols = frozenset([ANNOUNCEMENT_FIELD_NAME])
previous_float = Column(dtype=float64_dtype)
next_float = Column(dtype=float64_dtype)
event_date_col = ANNOUNCEMENT_FIELD_NAME
previous_datetime = Column(dtype=datetime64ns_dtype)
next_datetime = Column(dtype=datetime64ns_dtype)
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=EventDataSet):
super(EventDataSetLoader, self).__init__(
all_dates,
events_by_sid,
infer_timestamps=infer_timestamps,
dataset=dataset,
)
previous_int = Column(dtype=int64_dtype, missing_value=-1)
next_int = Column(dtype=int64_dtype, missing_value=-1)
@lazyval
def previous_announcement_loader(self):
return self._previous_event_date_loader(
self.dataset.previous_announcement,
)
previous_string = Column(dtype=categorical_dtype, missing_value=None)
next_string = Column(dtype=categorical_dtype, missing_value=None)
@lazyval
def next_announcement_loader(self):
return self._next_event_date_loader(
self.dataset.next_announcement,
)
# Loader used only to trigger the error raised when data for a loader that
# expects several columns arrives in the wrong format, so it defines no
# column loaders.
class EventDataSetLoaderMultipleExpectedColsNoColumnLoaders(EventsLoader):
    event_date_col = ANNOUNCEMENT_FIELD_NAME
    expected_cols = frozenset((ANNOUNCEMENT_FIELD_NAME, OTHER_FIELD))
class EventDataSetLoaderNoExpectedCols(EventsLoader):
    """
    Loader that defines neither ``expected_cols`` nor ``event_date_col``.

    Its constructor only forwards its arguments to ``EventsLoader``.
    """

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=EventDataSet):
        parent = super(EventDataSetLoaderNoExpectedCols, self)
        parent.__init__(
            all_dates,
            events_by_sid,
            infer_timestamps=infer_timestamps,
            dataset=dataset,
        )
dtx = pd.date_range('2014-01-01', '2014-01-10')
class EventLoaderTestCase(TestCase):
def test_null_in_event_date_col(self):
# Tests that if there is a null date in the event date column, it is
# filtered out and does not break on loading the adjusted array.
dates_with_null = pd.Series(dtx)
dates_with_null[2] = pd.NaT
events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME:
dates_with_null,
TS_FIELD_NAME: dtx})}
loader = EventDataSetLoader(
dtx,
events_by_sid,
)
prev_result = loader.load_adjusted_array({
EventDataSet.previous_announcement
}, dtx, [0], [True])[EventDataSet.previous_announcement].data[:, 0]
next_result = loader.load_adjusted_array({
EventDataSet.next_announcement
}, dtx, [0], [True])[EventDataSet.next_announcement].data[:, 0]
expected_prev = dates_with_null[:]
expected_prev[2] = dtx[1]
assert_array_equal(prev_result, expected_prev)
expected_next = dates_with_null[:]
expected_next[2] = np.datetime64('NaT')
assert_array_equal(next_result, expected_next)
def assert_loader_error(self, events_by_sid, error, msg,
infer_timestamps, loader):
with self.assertRaisesRegexp(error, re.escape(msg)):
loader(
dtx, events_by_sid, infer_timestamps=infer_timestamps,
)
def test_no_expected_cols_defined(self):
events_by_sid = {0: pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx})}
self.assert_loader_error(events_by_sid, TypeError,
ABSTRACT_EXPECTED_COLS_ERROR,
True, EventDataSetLoaderNoExpectedCols)
def test_wrong_cols(self):
wrong_col_name = 'some_other_col'
# Test wrong cols (cols != expected)
events_by_sid = {0: pd.DataFrame({wrong_col_name: dtx})}
self.assert_loader_error(
events_by_sid, ValueError, WRONG_COLS_ERROR.format(
expected_columns=list(EventDataSetLoader.expected_cols),
sid=0,
resulting_columns=[wrong_col_name],
),
True,
EventDataSetLoader
)
@parameterized.expand([
# DataFrame without timestamp column and infer_timestamps = True
[pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}), True],
# DataFrame with timestamp column
[pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx,
TS_FIELD_NAME: dtx}), False],
# DatetimeIndex with infer_timestamps = True
[pd.DatetimeIndex(dtx), True],
# Series with DatetimeIndex as index and infer_timestamps = False
[pd.Series(dtx, index=dtx), False]
])
def test_conversion_to_df(self, df, infer_timestamps):
events_by_sid = {0: df}
loader = EventDataSetLoader(
dtx,
events_by_sid,
infer_timestamps=infer_timestamps,
)
self.assertEqual(
loader.events_by_sid.keys(),
events_by_sid.keys(),
)
if infer_timestamps:
expected = pd.Series(index=[dtx[0]] * 10, data=dtx,
name=ANNOUNCEMENT_FIELD_NAME)
else:
expected = pd.Series(index=dtx, data=dtx,
name=ANNOUNCEMENT_FIELD_NAME)
expected.index.name = TS_FIELD_NAME
# Check that index by first given date has been added
assert_series_equal(
loader.events_by_sid[0][ANNOUNCEMENT_FIELD_NAME],
expected,
)
@parameterized.expand(
[
# DataFrame without timestamp column and infer_timestamps = True
[
pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx}),
False,
DF_NO_TS_NOT_INFER_TS_ERROR.format(
timestamp_column_name=TS_FIELD_NAME,
sid=0
),
EventDataSetLoader
],
# DatetimeIndex with infer_timestamps = False
[
pd.DatetimeIndex(dtx, name=ANNOUNCEMENT_FIELD_NAME),
False,
DTINDEX_NOT_INFER_TS_ERROR.format(sid=0),
EventDataSetLoader
],
# Series with DatetimeIndex as index and infer_timestamps = False
[
pd.Series(dtx, name=ANNOUNCEMENT_FIELD_NAME),
False,
SERIES_NO_DTINDEX_ERROR.format(sid=0),
EventDataSetLoader
],
# Below, 2 cases repeated for infer_timestamps = True and False.
# Shouldn't make a difference in the outcome.
# We expected 1 column but got a data structure other than a
# DataFrame, Series, or DatetimeIndex
[
[dtx],
True,
WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=0),
EventDataSetLoader
],
# We expected multiple columns but got a data structure other
# than a DataFrame
[
[dtx, dtx],
True,
WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=0),
EventDataSetLoaderMultipleExpectedColsNoColumnLoaders
],
[
[dtx],
False,
WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=0),
EventDataSetLoader
],
# We expected multiple columns but got a data structure other
# than a DataFrame
[
[dtx, dtx],
False,
WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=0),
EventDataSetLoaderMultipleExpectedColsNoColumnLoaders
]
]
previous_string_custom_missing = Column(
dtype=categorical_dtype,
missing_value=u"<<NULL>>",
)
next_string_custom_missing = Column(
dtype=categorical_dtype,
missing_value=u"<<NULL>>",
)
def test_bad_conversion_to_df(self, df, infer_timestamps, msg, loader):
events_by_sid = {0: df}
self.assert_loader_error(events_by_sid, ValueError, msg,
infer_timestamps, loader)
class BlazeEventDataSetLoaderNoConcreteLoader(BlazeEventsLoader):
    """
    Blaze loader subclass that does not define ``concrete_loader``.

    Its constructor only forwards its arguments to ``BlazeEventsLoader``.
    """

    def __init__(self, expr, dataset=EventDataSet, **kwargs):
        parent = super(BlazeEventDataSetLoaderNoConcreteLoader, self)
        parent.__init__(expr, dataset=dataset, **kwargs)
critical_dates = pd.to_datetime([
'2014-01-05',
'2014-01-10',
'2014-01-15',
'2014-01-20',
])
class BlazeEventLoaderTestCase(TestCase):
# Blaze loader: need to test failure if no concrete loader
def test_no_concrete_loader_defined(self):
with self.assertRaisesRegexp(
TypeError, re.escape(ABSTRACT_CONCRETE_LOADER_ERROR)
):
BlazeEventDataSetLoaderNoConcreteLoader(
bz.data(
pd.DataFrame({ANNOUNCEMENT_FIELD_NAME: dtx,
SID_FIELD_NAME: 0})
def make_events_for_sid(sid, event_dates, event_timestamps):
    """
    Build a frame of fake events for a single sid.

    Parameters
    ----------
    sid : int
        Asset id written into every row; also used to offset the generated
        values so that different sids get different data.
    event_dates : sequence
        One event date per event; its length fixes the number of rows.
    event_timestamps : sequence
        One timestamp per event.

    Returns
    -------
    events : pd.DataFrame
        Columns ``sid``, ``timestamp``, ``event_date``, ``float``, ``int``,
        ``datetime`` and ``string``, with one row per event.
    """
    num_events = len(event_dates)
    # Daily dates starting 1990-01-01, shifted by ``sid`` periods.
    datetimes = pd.date_range('1990-01-01', periods=num_events).shift(sid)
    # Labels of the form "<sid>-<event number>".
    labels = ['{}-{}'.format(sid, i) for i in range(num_events)]
    columns = {
        'sid': np.full(num_events, sid, dtype=np.int64),
        'timestamp': event_timestamps,
        'event_date': event_dates,
        'float': np.arange(num_events, dtype=np.float64) + sid,
        'int': np.arange(num_events) + sid,
        'datetime': datetimes,
        'string': labels,
    }
    return pd.DataFrame(columns)
def make_null_event_date_events(all_sids, timestamp):
    """
    Build one event per sid whose ``event_date`` is NaT.

    Used to test that EventsLoaders filter out null events: every other
    field carries an easily recognised placeholder value.

    Parameters
    ----------
    all_sids : array-like
        Sids to generate a null event for; one row is produced per entry.
    timestamp : pd.Timestamp
        Timestamp assigned to every generated event.

    Returns
    -------
    events : pd.DataFrame
        Same columns as the frames built by the other event helpers.
    """
    null_date = pd.Timestamp('NaT')
    placeholder_datetime = pd.Timestamp('1980')
    # Scalars are broadcast against ``all_sids`` by the DataFrame
    # constructor.
    columns = {
        'sid': all_sids,
        'timestamp': timestamp,
        'event_date': null_date,
        'float': -9999.0,
        'int': -9999,
        'datetime': placeholder_datetime,
        'string': 'should be ignored',
    }
    return pd.DataFrame(columns)
def make_events(add_nulls):
    """
    Build a frame of fake events covering the interesting interleavings of
    event dates and timestamps.

    Every event has at least three pieces of data associated with it:

    1. sid : The ID of the asset associated with the event.
    2. event_date : The date on which an event occurred.
    3. timestamp : The date on which we learned about the event.
                   This can be before the event_date in the case of an
                   announcement about an upcoming event.

    Events for two different sids shouldn't interact in any way, so the
    interesting cases are determined by the possible interleavings of
    event_date and timestamp for a single sid.

    Fix two events with dates e1, e2 and timestamps t1 and t2.

    Without loss of generality, assume that e1 < e2. (If two events have the
    same occurrence date, the behavior of next/previous event is undefined).

    The cases generated are all assignments of ``critical_dates`` to
    (e1, e2, t1, t2) that satisfy e1 < e2. For each such assignment, we
    generate a pair of fake events with those dates and assign them to a new
    sid.

    Parameters
    ----------
    add_nulls : bool
        If true, additionally append, for each critical date, one event with
        a null event_date for every generated sid.

    Returns
    -------
    events : pd.DataFrame
        All generated event frames concatenated with a fresh integer index.
    """
    def gen_date_interleavings():
        for e1, e2, t1, t2 in product(*[critical_dates] * 4):
            if e1 < e2:
                yield (e1, e2, t1, t2)

    event_frames = [
        make_events_for_sid(sid, [e1, e2], [t1, t2])
        for sid, (e1, e2, t1, t2) in enumerate(gen_date_interleavings())
    ]

    if add_nulls:
        # Exactly one frame was generated per sid, numbered from 0, so the
        # number of frames is the number of sids. Capture it before
        # appending the null frames rather than relying on a leaked loop
        # variable.
        num_sids = len(event_frames)
        for date in critical_dates:
            event_frames.append(
                make_null_event_date_events(
                    np.arange(num_sids),
                    timestamp=date,
                )
            )

    return pd.concat(event_frames, ignore_index=True)
class BlazeEventDataSetLoader(BlazeEventsLoader):
    """
    Blaze loader for ``EventDataSet`` that uses ``EventDataSetLoader`` as
    its concrete loader.
    """
    concrete_loader = EventDataSetLoader
    # Field names declared as expected for this loader.
    _expected_fields = frozenset((
        ANNOUNCEMENT_FIELD_NAME,
        TS_FIELD_NAME,
        SID_FIELD_NAME,
    ))

    def __init__(self, expr, dataset=EventDataSet, **kwargs):
        parent = super(BlazeEventDataSetLoader, self)
        parent.__init__(expr, dataset=dataset, **kwargs)
class EventIndexerTestCase(ZiplineTestCase):
    """
    Check ``previous_event_indexer`` and ``next_event_indexer`` against a
    straightforward per-sid, per-date reference computation.
    """

    @classmethod
    def init_class_fixtures(cls):
        super(EventIndexerTestCase, cls).init_class_fixtures()
        # Sort by event_date, then renumber so that positions in the frame's
        # index follow that order.
        sorted_events = make_events(add_nulls=False).sort('event_date')
        sorted_events.reset_index(inplace=True)
        cls.events = sorted_events

    def _run_indexer_test(self, make_indexer, check_one_sid):
        """
        Compute an indexer over January 2014 for every sid in the fixture
        events and hand each sid's column to ``check_one_sid``.
        """
        events = self.events
        event_sids = events['sid'].values
        event_dates = events['event_date'].values
        event_timestamps = events['timestamp'].values

        all_dates = pd.date_range('2014', '2014-01-31')
        all_sids = np.unique(event_sids)

        indexer = make_indexer(
            all_dates,
            all_sids,
            event_dates,
            event_timestamps,
            event_sids,
        )

        # Compute expected results without knowledge of null events.
        for column, sid in enumerate(all_sids):
            check_one_sid(events, all_dates, sid, indexer[:, column])

    def test_previous_event_indexer(self):
        self._run_indexer_test(
            previous_event_indexer,
            self.check_previous_event_indexer,
        )

    def check_previous_event_indexer(self,
                                     events,
                                     all_dates,
                                     sid,
                                     indexer):
        """
        Verify one sid's column of a previous-event indexer.
        """
        relevant_events = events[events.sid == sid]
        self.assertEqual(len(relevant_events), 2)

        ix1, ix2 = relevant_events.index

        def first_eligible(ix):
            # An event becomes a possible value once we're past both its
            # event_date and its timestamp.
            return max(relevant_events.loc[ix, ['event_date', 'timestamp']])

        event1_first_eligible = first_eligible(ix1)
        event2_first_eligible = first_eligible(ix2)

        for date, computed_index in zip(all_dates, indexer):
            if date >= event2_first_eligible:
                # Event 2 wins whenever it has been seen, even if event 1
                # has been seen too, because events are sorted by
                # event_date.
                expected_index = ix2
            elif date >= event1_first_eligible:
                # Only event 1 has been seen.
                expected_index = ix1
            else:
                # Neither event has been seen: -1 is the sentinel.
                expected_index = -1
            self.assertEqual(computed_index, expected_index)

    def test_next_event_indexer(self):
        self._run_indexer_test(
            next_event_indexer,
            self.check_next_event_indexer,
        )

    def check_next_event_indexer(self,
                                 events,
                                 all_dates,
                                 sid,
                                 indexer):
        """
        Verify one sid's column of a next-event indexer.
        """
        relevant_events = events[events.sid == sid]
        self.assertEqual(len(relevant_events), 2)

        ix1, ix2 = relevant_events.index
        e1, e2 = relevant_events['event_date']
        t1, t2 = relevant_events['timestamp']

        for date, computed_index in zip(all_dates, indexer):
            # An event is eligible to be the next event if the date is
            # between its timestamp and its event_date, inclusive.
            if t1 <= date <= e1:
                # Event 1 is chosen whenever it is eligible, even if event 2
                # is also eligible, since it's earlier.
                expected_index = ix1
            elif t2 <= date <= e2:
                # Event 2 is eligible and event 1 is not.
                expected_index = ix2
            else:
                # Neither event is eligible: -1 is the sentinel.
                expected_index = -1
            self.assertEqual(computed_index, expected_index)
class EventsLoaderTestCase(WithAssetFinder,
                           WithNYSETradingDays,
                           ZiplineTestCase):
    """
    End-to-end tests that run an EventsLoader through a pipeline engine and
    compare every EventDataSet column against a per-sid, per-date reference
    computation.
    """

    START_DATE = pd.Timestamp('2014-01-01')
    END_DATE = pd.Timestamp('2014-01-30')

    @classmethod
    def init_class_fixtures(cls):
        # This is a rare case where we actually want to do work **before** we
        # call init_class_fixtures.  We choose our sids for WithAssetFinder
        # based on the events generated by make_events.
        cls.raw_events = make_events(add_nulls=True)
        cls.raw_events_no_nulls = cls.raw_events[
            cls.raw_events['event_date'].notnull()
        ]
        # Maps from dataset column to the raw event field it should expose.
        cls.next_value_columns = {
            EventDataSet.next_datetime: 'datetime',
            EventDataSet.next_event_date: 'event_date',
            EventDataSet.next_float: 'float',
            EventDataSet.next_int: 'int',
            EventDataSet.next_string: 'string',
            EventDataSet.next_string_custom_missing: 'string'
        }
        cls.previous_value_columns = {
            EventDataSet.previous_datetime: 'datetime',
            EventDataSet.previous_event_date: 'event_date',
            EventDataSet.previous_float: 'float',
            EventDataSet.previous_int: 'int',
            EventDataSet.previous_string: 'string',
            EventDataSet.previous_string_custom_missing: 'string'
        }
        cls.loader = cls.make_loader(
            events=cls.raw_events,
            next_value_columns=cls.next_value_columns,
            previous_value_columns=cls.previous_value_columns,
        )
        cls.ASSET_FINDER_EQUITY_SIDS = list(cls.raw_events['sid'].unique())
        cls.ASSET_FINDER_EQUITY_SYMBOLS = [
            's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
        ]
        super(EventsLoaderTestCase, cls).init_class_fixtures()

    @classmethod
    def make_loader(cls, events, next_value_columns, previous_value_columns):
        # This method exists to be overridden by BlazeEventsLoaderTestCase
        return EventsLoader(events, next_value_columns, previous_value_columns)

    def test_load_with_trading_calendar(self):
        """
        Run a pipeline requesting the latest value of every EventDataSet
        column and check each output column.
        """
        engine = SimplePipelineEngine(
            lambda x: self.loader,
            self.trading_days,
            self.asset_finder,
        )

        results = engine.run_pipeline(
            Pipeline({c.name: c.latest for c in EventDataSet.columns}),
            start_date=self.trading_days[0],
            end_date=self.trading_days[-1],
        )

        for c in EventDataSet.columns:
            if c in self.next_value_columns:
                self.check_next_value_results(c, results[c.name].unstack())
            elif c in self.previous_value_columns:
                self.check_previous_value_results(c, results[c.name].unstack())
            else:
                raise AssertionError("Unexpected column %s." % c)

    def assert_result_contains_all_sids(self, results):
        """
        Assert that the columns of ``results`` are exactly the test's sids.
        """
        assert_equal(
            list(map(int, results.columns)),
            self.ASSET_FINDER_EQUITY_SIDS,
        )

    def check_previous_value_results(self, column, results):
        """
        Check previous value results for a single column.
        """
        # Verify that we got a result for every sid.
        self.assert_result_contains_all_sids(results)

        events = self.raw_events_no_nulls
        # Remove timezone info from trading days, since the outputs
        # from pandas won't be tz_localized.
        dates = self.trading_days.tz_localize(None)

        # ``iteritems`` is the non-deprecated spelling of ``iterkv``.
        for asset, asset_result in results.iteritems():
            relevant_events = events[events.sid == asset.sid]
            self.assertEqual(len(relevant_events), 2)

            v1, v2 = relevant_events[self.previous_value_columns[column]]
            event1_first_eligible = max(
                # .ix doesn't work here because the frame index contains
                # integers, so 0 is still interpreted as a key.
                relevant_events.iloc[0].loc[['event_date', 'timestamp']],
            )
            event2_first_eligible = max(
                relevant_events.iloc[1].loc[['event_date', 'timestamp']]
            )

            for date, computed_value in zip(dates, asset_result):
                if date >= event2_first_eligible:
                    # If we've seen event 2, it should win even if we've seen
                    # event 1, because events are sorted by event_date.
                    self.assertEqual(computed_value, v2)
                elif date >= event1_first_eligible:
                    # If we've seen event 1 but not event 2, event 1 should
                    # win.
                    self.assertEqual(computed_value, v1)
                else:
                    # If we haven't seen either event, then we should have
                    # column.missing_value.
                    assert_equal(
                        computed_value,
                        column.missing_value,
                        # Coerce from Timestamp to datetime64.
                        allow_datetime_coercions=True,
                    )

    def check_next_value_results(self, column, results):
        """
        Check next value results for a single column.
        """
        # Verify that we got a result for every sid.
        self.assert_result_contains_all_sids(results)

        events = self.raw_events_no_nulls
        # Remove timezone info from trading days, since the outputs
        # from pandas won't be tz_localized.
        dates = self.trading_days.tz_localize(None)

        # ``iteritems`` is the non-deprecated spelling of ``iterkv``.
        for asset, asset_result in results.iteritems():
            relevant_events = events[events.sid == asset.sid]
            self.assertEqual(len(relevant_events), 2)

            v1, v2 = relevant_events[self.next_value_columns[column]]
            e1, e2 = relevant_events['event_date']
            t1, t2 = relevant_events['timestamp']

            for date, computed_value in zip(dates, asset_result):
                # An event is eligible to be the next event if the date is
                # between its timestamp and its event_date, inclusive.
                if t1 <= date <= e1:
                    # If event 1 is eligible, it should be chosen even if
                    # event 2 is also eligible, since it's earlier.
                    self.assertEqual(computed_value, v1)
                elif t2 <= date <= e2:
                    # Event 2 is eligible and event 1 is not, so event 2
                    # should be chosen.
                    self.assertEqual(computed_value, v2)
                else:
                    # Neither event is eligible, so we should have
                    # column.missing_value.
                    assert_equal(
                        computed_value,
                        column.missing_value,
                        # Coerce from Timestamp to datetime64.
                        allow_datetime_coercions=True,
                    )

    def test_wrong_cols(self):
        """
        An EventsLoader must reject column maps that reference a field
        missing from the events frame, with a descriptive message.
        """
        # Test wrong cols (cols != expected)
        events = pd.DataFrame({
            'c': [5],
            SID_FIELD_NAME: [1],
            TS_FIELD_NAME: [pd.Timestamp('2014')],
            EVENT_DATE_FIELD_NAME: [pd.Timestamp('2014')],
        })

        # Referencing an existing field must construct without error.
        EventsLoader(events, {EventDataSet.next_float: 'c'}, {})
        EventsLoader(events, {}, {EventDataSet.previous_float: 'c'})

        with self.assertRaises(ValueError) as e:
            EventsLoader(events, {EventDataSet.next_float: 'd'}, {})

        msg = str(e.exception)
        expected = (
            "EventsLoader missing required columns ['d'].\n"
            "Got Columns: ['c', 'event_date', 'sid', 'timestamp']\n"
            "Expected Columns: ['d', 'event_date', 'sid', 'timestamp']"
        )
        self.assertEqual(msg, expected)
class BlazeEventsLoaderTestCase(EventsLoaderTestCase):
    """
    Repeat every EventsLoaderTestCase test with a BlazeEventsLoader in
    place of the plain EventsLoader.
    """

    @classmethod
    def make_loader(cls, events, next_value_columns, previous_value_columns):
        # Wrap the raw frame in a blaze expression before building the
        # loader.
        events_expr = bz.data(events)
        return BlazeEventsLoader(
            events_expr,
            next_value_columns,
            previous_value_columns,
        )
+2
View File
@@ -9,5 +9,7 @@ DAYS_TO_NEXT = 'days_to_next'
NEXT_ANNOUNCEMENT = 'next_announcement'
PREVIOUS_AMOUNT = 'previous_amount'
PREVIOUS_ANNOUNCEMENT = 'previous_announcement'
EVENT_DATE_FIELD_NAME = 'event_date'
SID_FIELD_NAME = 'sid'
TS_FIELD_NAME = 'timestamp'
-15
View File
@@ -1,15 +0,0 @@
"""
Dataset representing recently disclosed 13d filings.
"""
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
from .dataset import Column, DataSet
class _13DFilings(DataSet):
    """
    Pipeline dataset for recently disclosed 13d filings.
    """
    # Float-valued columns.
    number_shares = Column(dtype=float64_dtype)
    percent_shares = Column(dtype=float64_dtype)
    # Datetime-valued column.
    disclosure_date = Column(dtype=datetime64ns_dtype)
-16
View File
@@ -1,25 +1,9 @@
from ._13d_filings import _13DFilings
from .buyback_auth import BuybackAuthorizations
from .dividends import (
DividendsByAnnouncementDate,
DividendsByExDate,
DividendsByPayDate,
)
from .earnings import EarningsCalendar
from .consensus_estimates import ConsensusEstimates
from .equity_pricing import USEquityPricing
from .dataset import DataSet, Column, BoundColumn
__all__ = [
'_13DFilings',
'BoundColumn',
'BuybackAuthorizations',
'Column',
'DataSet',
'DividendsByAnnouncementDate',
'DividendsByExDate',
'DividendsByPayDate',
'EarningsCalendar',
'ConsensusEstimates',
'USEquityPricing',
]
-21
View File
@@ -1,21 +0,0 @@
"""
Datasets representing dates of recently announced buyback authorizations.
"""
from zipline.utils.numpy_utils import (
datetime64ns_dtype,
float64_dtype,
categorical_dtype
)
from .dataset import Column, DataSet
class BuybackAuthorizations(DataSet):
    """
    Pipeline dataset for recently announced cash buyback authorizations.
    """
    previous_amount = Column(dtype=float64_dtype)
    previous_date = Column(dtype=datetime64ns_dtype)
    # Categorical columns use None as their missing value.
    previous_unit = Column(dtype=categorical_dtype, missing_value=None)
    previous_type = Column(dtype=categorical_dtype, missing_value=None)
@@ -1,29 +0,0 @@
"""
Datasets representing consensus estimates data.
"""
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
from .dataset import Column, DataSet
class ConsensusEstimates(DataSet):
    """
    Pipeline dataset for consensus estimates data.

    Columns come in previous/next pairs, except ``previous_actual_value``,
    which has no ``next`` counterpart.
    """
    # Datetime-valued columns.
    previous_release_date = Column(dtype=datetime64ns_dtype)
    next_release_date = Column(dtype=datetime64ns_dtype)

    # Float-valued columns.
    previous_standard_deviation = Column(dtype=float64_dtype)
    next_standard_deviation = Column(dtype=float64_dtype)
    previous_count = Column(dtype=float64_dtype)
    next_count = Column(dtype=float64_dtype)
    previous_fiscal_quarter = Column(dtype=float64_dtype)
    next_fiscal_quarter = Column(dtype=float64_dtype)
    previous_high = Column(dtype=float64_dtype)
    next_high = Column(dtype=float64_dtype)
    previous_mean = Column(dtype=float64_dtype)
    next_mean = Column(dtype=float64_dtype)
    previous_fiscal_year = Column(dtype=float64_dtype)
    next_fiscal_year = Column(dtype=float64_dtype)
    previous_low = Column(dtype=float64_dtype)
    next_low = Column(dtype=float64_dtype)
    previous_actual_value = Column(dtype=float64_dtype)
-39
View File
@@ -1,39 +0,0 @@
"""
Dataset representing dates of upcoming dividends.
"""
from zipline.utils.numpy_utils import (
categorical_dtype,
datetime64ns_dtype,
float64_dtype,
)
from .dataset import Column, DataSet
class DividendsByExDate(DataSet):
    """
    Dividends dataset exposing next/previous date, amount, currency and
    type columns.
    """
    next_date = Column(dtype=datetime64ns_dtype)
    previous_date = Column(dtype=datetime64ns_dtype)
    next_amount = Column(dtype=float64_dtype)
    previous_amount = Column(dtype=float64_dtype)
    next_currency = Column(dtype=categorical_dtype)
    previous_currency = Column(dtype=categorical_dtype)
    next_type = Column(dtype=categorical_dtype)
    previous_type = Column(dtype=categorical_dtype)
class DividendsByPayDate(DataSet):
    """
    Dividends dataset exposing next/previous date, amount, currency and
    type columns.
    """
    next_date = Column(dtype=datetime64ns_dtype)
    previous_date = Column(dtype=datetime64ns_dtype)
    next_amount = Column(dtype=float64_dtype)
    previous_amount = Column(dtype=float64_dtype)
    next_currency = Column(dtype=categorical_dtype)
    previous_currency = Column(dtype=categorical_dtype)
    next_type = Column(dtype=categorical_dtype)
    previous_type = Column(dtype=categorical_dtype)
class DividendsByAnnouncementDate(DataSet):
    """
    Dividends dataset exposing only ``previous_*`` columns: announcement
    date, amount, currency and type.
    """
    previous_announcement_date = Column(dtype=datetime64ns_dtype)
    previous_amount = Column(dtype=float64_dtype)
    previous_currency = Column(dtype=categorical_dtype)
    previous_type = Column(dtype=categorical_dtype)
-17
View File
@@ -1,17 +0,0 @@
"""
Dataset representing dates of upcoming earnings.
"""
from zipline.utils.numpy_utils import datetime64ns_dtype
from .dataset import Column, DataSet
class EarningsCalendar(DataSet):
    """
    Pipeline dataset for the dates of upcoming or recently announced
    earnings.
    """
    next_announcement = Column(dtype=datetime64ns_dtype)
    previous_announcement = Column(dtype=datetime64ns_dtype)

    # TODO: Provide categorical columns for when during the day the
    # announcement occurred.
+4 -14
View File
@@ -5,13 +5,8 @@ from .factor import (
RecarrayField,
)
from .events import (
BusinessDaysSince13DFilingsDate,
BusinessDaysSinceBuybackAuth,
BusinessDaysSinceDividendAnnouncement,
BusinessDaysSincePreviousEarnings,
BusinessDaysSincePreviousExDate,
BusinessDaysUntilNextEarnings,
BusinessDaysUntilNextExDate,
BusinessDaysSincePreviousEvent,
BusinessDaysUntilNextEvent,
)
from .technical import (
Aroon,
@@ -37,13 +32,8 @@ __all__ = [
'Aroon',
'AverageDollarVolume',
'BollingerBands',
'BusinessDaysSince13DFilingsDate',
'BusinessDaysSinceBuybackAuth',
'BusinessDaysSinceDividendAnnouncement',
'BusinessDaysSincePreviousEarnings',
'BusinessDaysSincePreviousExDate',
'BusinessDaysUntilNextEarnings',
'BusinessDaysUntilNextExDate',
'BusinessDaysSincePreviousEvent',
'BusinessDaysUntilNextEvent',
'CustomFactor',
'EWMA',
'EWMSTD',
+2 -124
View File
@@ -3,13 +3,6 @@ Factors describing information about event data (e.g. earnings
announcements, acquisitions, dividends, etc.).
"""
from numpy import newaxis
from ..data import (
_13DFilings,
BuybackAuthorizations,
DividendsByAnnouncementDate,
DividendsByExDate,
EarningsCalendar
)
from zipline.utils.numpy_utils import (
NaTD,
busday_count_mask_NaT,
@@ -20,7 +13,7 @@ from zipline.utils.numpy_utils import (
from .factor import Factor
class BusinessDaysSincePreviousEvents(Factor):
class BusinessDaysSincePreviousEvent(Factor):
"""
Abstract class for business days since a previous event.
Returns the number of **business days** (not trading days!) since
@@ -51,7 +44,7 @@ class BusinessDaysSincePreviousEvents(Factor):
return busday_count_mask_NaT(announce_dates, reference_dates)
class BusinessDaysUntilNextEvents(Factor):
class BusinessDaysUntilNextEvent(Factor):
"""
Abstract class for business days since a next event.
Returns the number of **business days** (not trading days!) until
@@ -84,118 +77,3 @@ class BusinessDaysUntilNextEvents(Factor):
# Convert row labels into a column vector for broadcasted comparison.
reference_dates = dates.values.astype(datetime64D_dtype)[:, newaxis]
return busday_count_mask_NaT(reference_dates, announce_dates)
class BusinessDaysUntilNextEarnings(BusinessDaysUntilNextEvents):
    """
    Number of **business days** (not trading days!) until each asset's next
    known earnings date.

    An asset whose earnings are announced on the day of ``compute`` produces
    0.0; one that will announce on the next upcoming business day produces
    1.0.

    When the next earnings date is ``NaT`` the result is ``NaN``. This most
    commonly occurs because many companies do not publish the exact date of
    their upcoming earnings announcements until a few weeks before the
    announcement.

    See Also
    --------
    BusinessDaysSincePreviousEarnings
    """
    inputs = [EarningsCalendar.next_announcement]
class BusinessDaysSincePreviousEarnings(BusinessDaysSincePreviousEvents):
    """
    Number of **business days** (not trading days!) since each asset's most
    recent earnings date.

    An asset which announced or will announce earnings today produces 0.0;
    one that announced on the previous business day produces 1.0.

    When the previous earnings date is ``NaT`` the result is ``NaN``. This
    will happen in the interval between IPO and first earnings for most
    companies.

    See Also
    --------
    BusinessDaysUntilNextEarnings
    """
    inputs = [EarningsCalendar.previous_announcement]
class BusinessDaysSinceBuybackAuth(BusinessDaysSincePreviousEvents):
    """
    Number of **business days** (not trading days!) since each asset's most
    recent buyback authorization.

    The dates are read from ``BuybackAuthorizations.previous_date``.

    See Also
    --------
    BusinessDaysSincePreviousEvents
    """
    inputs = [BuybackAuthorizations.previous_date]
class BusinessDaysSinceDividendAnnouncement(BusinessDaysSincePreviousEvents):
    """
    Number of **business days** (not trading days!) since each asset's most
    recent dividend announcement.

    The dates are read from
    ``DividendsByAnnouncementDate.previous_announcement_date``.

    See Also
    --------
    BusinessDaysSincePreviousEvents
    """
    inputs = [DividendsByAnnouncementDate.previous_announcement_date]
class BusinessDaysUntilNextExDate(BusinessDaysUntilNextEvents):
    """
    Number of **business days** (not trading days!) until each asset's next
    ex date.

    The dates are read from ``DividendsByExDate.next_date``.

    See Also
    --------
    BusinessDaysSincePreviousExDate
    """
    inputs = [DividendsByExDate.next_date]
class BusinessDaysSincePreviousExDate(BusinessDaysSincePreviousEvents):
    """
    Number of **business days** (not trading days!) since each asset's most
    recent ex date.

    The dates are read from ``DividendsByExDate.previous_date``.

    See Also
    --------
    BusinessDaysUntilNextExDate
    """
    inputs = [DividendsByExDate.previous_date]
class BusinessDaysSince13DFilingsDate(BusinessDaysSincePreviousEvents):
    """
    Number of **business days** (not trading days!) since each asset's most
    recent 13d filing.

    The dates are read from ``_13DFilings.disclosure_date``.
    """
    inputs = [_13DFilings.disclosure_date]
-54
View File
@@ -1,54 +0,0 @@
"""
Reference implementation for 13d filings loaders.
"""
from zipline.pipeline.data import _13DFilings
from zipline.pipeline.loaders.events import EventsLoader
from zipline.utils.memoize import lazyval
DISCLOSURE_DATE = 'disclosure_date'
NUM_SHARES = 'number_shares'
PERCENT_SHARES = 'percent_shares'
class _13DFilingsLoader(EventsLoader):
    """
    Reference loader for
    :class:`zipline.pipeline.data._13DFilings`.

    events_by_sid: dict[sid -> pd.DataFrame(knowledge date,
                         disclosure date, percent shares, number of shares)]
    """
    # The disclosure date is the event date; the remaining expected columns
    # are loaded as values attached to the previous event.
    event_date_col = DISCLOSURE_DATE
    expected_cols = frozenset((
        DISCLOSURE_DATE,
        PERCENT_SHARES,
        NUM_SHARES,
    ))

    def __init__(self, all_dates, events_by_sid,
                 infer_timestamps=False,
                 dataset=_13DFilings):
        parent = super(_13DFilingsLoader, self)
        parent.__init__(
            all_dates, events_by_sid, infer_timestamps, dataset=dataset,
        )

    @lazyval
    def disclosure_date_loader(self):
        """Loader for the dataset's ``disclosure_date`` column."""
        column = self.dataset.disclosure_date
        return self._previous_event_date_loader(column)

    @lazyval
    def percent_shares_loader(self):
        """Loader for the dataset's ``percent_shares`` column."""
        column = self.dataset.percent_shares
        return self._previous_event_value_loader(column, PERCENT_SHARES)

    @lazyval
    def number_shares_loader(self):
        """Loader for the dataset's ``number_shares`` column."""
        column = self.dataset.number_shares
        return self._previous_event_value_loader(column, NUM_SHARES)
-16
View File
@@ -1,21 +1,5 @@
from ._13d_filings import _13DFilingsLoader
from .buyback_auth import BuybackAuthorizationsLoader
from .consensus_estimates import ConsensusEstimatesLoader
from .earnings import EarningsCalendarLoader
from .dividends import (
DividendsByAnnouncementDateLoader,
DividendsByExDateLoader,
DividendsByPayDateLoader,
)
from .equity_pricing_loader import USEquityPricingLoader
__all__ = [
'_13DFilingsLoader',
'BuybackAuthorizationsLoader',
'DividendsByAnnouncementDateLoader',
'DividendsByExDateLoader',
'DividendsByPayDateLoader',
'EarningsCalendarLoader',
'ConsensusEstimatesLoader',
'USEquityPricingLoader',
]
@@ -1,68 +0,0 @@
from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME
from zipline.pipeline.data import _13DFilings
from zipline.pipeline.loaders import _13DFilingsLoader
from .events import BlazeEventsLoader
from zipline.pipeline.loaders._13d_filings import (
DISCLOSURE_DATE,
NUM_SHARES,
PERCENT_SHARES,
)
class Blaze_13DFilingsLoader(BlazeEventsLoader):
    """A pipeline loader for the ``_13DFilings`` dataset that
    loads data from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The expression representing the data to load.
    resources : dict, optional
        Mapping from the atomic terms of ``expr`` to actual data resources.
    odo_kwargs : dict, optional
        Extra keyword arguments to pass to odo when executing the expression.
    data_query_time : time, optional
        The time to use for the data query cutoff.
    data_query_tz : tzinfo or str, optional
        The timezone to use for the data query cutoff.
    dataset : DataSet, optional
        The DataSet object for which this loader loads data.

    Notes
    -----
    The expression should have a tabular dshape of::

       Dim * {{
           {SID_FIELD_NAME}: int64,
           {TS_FIELD_NAME}: datetime,
           {PERCENTAGE}: float64,
           {NUM_SHARES}: float64,
           {DISCLOSURE_DATE}: ?datetime,
       }}

    Where each row of the table is a record including the sid to identify the
    company, the timestamp where we learned about the disclosure, the
    date of the disclosure, the percentage, and the number of shares.

    If the '{TS_FIELD_NAME}' field is not included it is assumed that we
    start the backtest with knowledge of all disclosures.
    """

    __doc__ = __doc__.format(
        TS_FIELD_NAME=TS_FIELD_NAME,
        SID_FIELD_NAME=SID_FIELD_NAME,
        PERCENTAGE=PERCENT_SHARES,
        NUM_SHARES=NUM_SHARES,
        DISCLOSURE_DATE=DISCLOSURE_DATE
    )

    _expected_fields = frozenset({
        TS_FIELD_NAME,
        SID_FIELD_NAME,
        PERCENT_SHARES,
        NUM_SHARES,
        DISCLOSURE_DATE
    })

    concrete_loader = _13DFilingsLoader
    # BlazeEventsLoader.__init__ falls back to ``self.default_dataset`` when
    # no dataset is passed, so the dataset must be published under that name.
    default_dataset = _13DFilings
    # Kept as an alias for any caller still reading the old attribute name.
    concrete_dataset = default_dataset
@@ -1,30 +1,11 @@
from ._13d_filings import Blaze_13DFilingsLoader
from .buyback_auth import BlazeBuybackAuthorizationsLoader
from .core import (
BlazeLoader,
NoDeltasWarning,
from_blaze,
global_loader,
)
from .dividends import (
BlazeDividendsByAnnouncementDateLoader,
BlazeDividendsByExDateLoader,
BlazeDividendsByPayDateLoader
)
from .earnings import (
BlazeEarningsCalendarLoader,
)
from .consensus_estimates import BlazeConsensusEstimatesLoader
__all__ = (
'Blaze_13DFilingsLoader',
'BlazeBuybackAuthorizationsLoader',
'BlazeDividendsByAnnouncementDateLoader',
'BlazeConsensusEstimatesLoader',
'BlazeDividendsByExDateLoader',
'BlazeDividendsByPayDateLoader',
'BlazeEarningsCalendarLoader',
'BlazeLoader',
'from_blaze',
'global_loader',
@@ -1,76 +0,0 @@
from .core import (
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.data import BuybackAuthorizations
from zipline.pipeline.loaders import BuybackAuthorizationsLoader
from .events import BlazeEventsLoader
from zipline.pipeline.loaders.buyback_auth import (
BUYBACK_ANNOUNCEMENT_FIELD_NAME,
BUYBACK_TYPE_FIELD_NAME,
BUYBACK_UNIT_FIELD_NAME,
)
from zipline.pipeline.loaders.buyback_auth import BUYBACK_AMOUNT_FIELD_NAME
class BlazeBuybackAuthorizationsLoader(BlazeEventsLoader):
    """A pipeline loader for the ``BuybackAuthorizations`` dataset that
    loads data from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The expression representing the data to load.
    resources : dict, optional
        Mapping from the loadable terms of ``expr`` to actual data resources.
    odo_kwargs : dict, optional
        Extra keyword arguments to pass to odo when executing the expression.
    data_query_time : time, optional
        The time to use for the data query cutoff.
    data_query_tz : tzinfo or str, optional
        The timezone to use for the data query cutoff.
    dataset : DataSet, optional
        The DataSet object for which this loader loads data.

    Notes
    -----
    The expression should have a tabular dshape of::

       Dim * {{
           {SID_FIELD_NAME}: int64,
           {TS_FIELD_NAME}: datetime,
           {BUYBACK_ANNOUNCEMENT_FIELD_NAME}: ?datetime,
           {BUYBACK_AMOUNT_FIELD_NAME}: ?float64,
           {BUYBACK_UNIT_FIELD_NAME}: ?str,
           {BUYBACK_TYPE_FIELD_NAME}: ?str,
       }}

    Where each row of the table is a record including the sid to identify the
    company, the timestamp where we learned about the announcement, the
    date when the buyback was announced, the buyback amount, the buyback unit,
    and the buyback type.

    If the '{TS_FIELD_NAME}' field is not included it is assumed that we
    start the backtest with knowledge of all announcements.
    """

    __doc__ = __doc__.format(
        TS_FIELD_NAME=TS_FIELD_NAME,
        SID_FIELD_NAME=SID_FIELD_NAME,
        BUYBACK_ANNOUNCEMENT_FIELD_NAME=BUYBACK_ANNOUNCEMENT_FIELD_NAME,
        BUYBACK_AMOUNT_FIELD_NAME=BUYBACK_AMOUNT_FIELD_NAME,
        BUYBACK_UNIT_FIELD_NAME=BUYBACK_UNIT_FIELD_NAME,
        BUYBACK_TYPE_FIELD_NAME=BUYBACK_TYPE_FIELD_NAME
    )

    _expected_fields = frozenset({
        TS_FIELD_NAME,
        SID_FIELD_NAME,
        BUYBACK_ANNOUNCEMENT_FIELD_NAME,
        BUYBACK_AMOUNT_FIELD_NAME,
        BUYBACK_UNIT_FIELD_NAME,
        BUYBACK_TYPE_FIELD_NAME
    })

    # Loader instantiated by BlazeEventsLoader.load_adjusted_array, and the
    # dataset used when none is passed to the constructor.
    concrete_loader = BuybackAuthorizationsLoader
    default_dataset = BuybackAuthorizations
@@ -1,94 +0,0 @@
from .events import BlazeEventsLoader
from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME
from zipline.pipeline.data import ConsensusEstimates
from zipline.pipeline.loaders import ConsensusEstimatesLoader
from zipline.pipeline.loaders.consensus_estimates import (
ACTUAL_VALUE_FIELD_NAME,
COUNT_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME,
HIGH_FIELD_NAME,
LOW_FIELD_NAME,
MEAN_FIELD_NAME,
RELEASE_DATE_FIELD_NAME,
STANDARD_DEVIATION_FIELD_NAME,
)
class BlazeConsensusEstimatesLoader(BlazeEventsLoader):
    """A pipeline loader for the ``ConsensusEstimates`` dataset that loads
    data from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The expression representing the data to load.
    resources : dict, optional
        Mapping from the loadable terms of ``expr`` to actual data resources.
    odo_kwargs : dict, optional
        Extra keyword arguments to pass to odo when executing the expression.
    data_query_time : time, optional
        The time to use for the data query cutoff.
    data_query_tz : tzinfo or str, optional
        The timezone to use for the data query cutoff.
    dataset : DataSet, optional
        The DataSet object for which this loader loads data.

    Notes
    -----
    The expression should have a tabular dshape of::

       Dim * {{
           {SID_FIELD_NAME}: int64,
           {TS_FIELD_NAME}: datetime,
           {RELEASE_DATE_FIELD_NAME}: ?datetime,
           {STANDARD_DEVIATION_FIELD_NAME}: ?float64,
           {COUNT_FIELD_NAME}: ?float64,
           {FISCAL_QUARTER_FIELD_NAME}: ?float64,
           {HIGH_FIELD_NAME}: ?float64,
           {MEAN_FIELD_NAME}: ?float64,
           {FISCAL_YEAR_FIELD_NAME}: ?float64,
           {LOW_FIELD_NAME}: ?float64,
           {ACTUAL_VALUE_FIELD_NAME}: ?float64
       }}

    Where each row of the table is a record including the sid to identify the
    company, the timestamp where we learned about the announcement,
    the release date for the corresponding estimate, and other estimate
    information.

    If the '{TS_FIELD_NAME}' field is not included it is assumed that we
    start the backtest with knowledge of all announcements.
    """

    __doc__ = __doc__.format(
        TS_FIELD_NAME=TS_FIELD_NAME,
        SID_FIELD_NAME=SID_FIELD_NAME,
        RELEASE_DATE_FIELD_NAME=RELEASE_DATE_FIELD_NAME,
        STANDARD_DEVIATION_FIELD_NAME=STANDARD_DEVIATION_FIELD_NAME,
        COUNT_FIELD_NAME=COUNT_FIELD_NAME,
        FISCAL_QUARTER_FIELD_NAME=FISCAL_QUARTER_FIELD_NAME,
        HIGH_FIELD_NAME=HIGH_FIELD_NAME,
        MEAN_FIELD_NAME=MEAN_FIELD_NAME,
        FISCAL_YEAR_FIELD_NAME=FISCAL_YEAR_FIELD_NAME,
        LOW_FIELD_NAME=LOW_FIELD_NAME,
        ACTUAL_VALUE_FIELD_NAME=ACTUAL_VALUE_FIELD_NAME
    )

    _expected_fields = frozenset({
        TS_FIELD_NAME,
        SID_FIELD_NAME,
        RELEASE_DATE_FIELD_NAME,
        STANDARD_DEVIATION_FIELD_NAME,
        COUNT_FIELD_NAME,
        FISCAL_QUARTER_FIELD_NAME,
        HIGH_FIELD_NAME,
        MEAN_FIELD_NAME,
        FISCAL_YEAR_FIELD_NAME,
        LOW_FIELD_NAME,
        ACTUAL_VALUE_FIELD_NAME
    })

    # Loader instantiated by BlazeEventsLoader.load_adjusted_array, and the
    # dataset used when none is passed to the constructor.
    concrete_loader = ConsensusEstimatesLoader
    default_dataset = ConsensusEstimates
-209
View File
@@ -1,209 +0,0 @@
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.data import (
DividendsByExDate,
DividendsByAnnouncementDate,
DividendsByPayDate
)
from zipline.pipeline.loaders import (
DividendsByAnnouncementDateLoader,
DividendsByPayDateLoader,
DividendsByExDateLoader
)
from .events import BlazeEventsLoader
from zipline.pipeline.loaders.dividends import (
CASH_AMOUNT_FIELD_NAME,
CURRENCY_FIELD_NAME,
DIVIDEND_TYPE_FIELD_NAME,
EX_DATE_FIELD_NAME,
PAY_DATE_FIELD_NAME,
)
class BlazeDividendsByAnnouncementDateLoader(BlazeEventsLoader):
    """A pipeline loader for the ``DividendsByAnnouncementDate`` dataset that
    loads data from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The expression representing the data to load.
    resources : dict, optional
        Mapping from the atomic terms of ``expr`` to actual data resources.
    odo_kwargs : dict, optional
        Extra keyword arguments to pass to odo when executing the expression.
    data_query_time : time, optional
        The time to use for the data query cutoff.
    data_query_tz : tzinfo or str, optional
        The timezone to use for the data query cutoff.
    dataset : DataSet, optional
        The DataSet object for which this loader loads data.

    Notes
    -----
    The expression should have a tabular dshape of::

       Dim * {{
           {SID_FIELD_NAME}: int64,
           {TS_FIELD_NAME}: datetime,
           {CASH_AMOUNT_FIELD_NAME}: ?float64,
           {ANNOUNCEMENT_FIELD_NAME}: ?datetime,
           {CURRENCY_FIELD_NAME}: ?string,
           {DIVIDEND_TYPE_FIELD_NAME}: ?string,
       }}

    Where each row of the table is a record including the sid to identify the
    company, the timestamp where we learned about the announcement, the
    date when the dividends will be announced, and the cash amount.

    If the '{TS_FIELD_NAME}' field is not included it is assumed that we
    start the backtest with knowledge of all announcements.
    """

    __doc__ = __doc__.format(
        TS_FIELD_NAME=TS_FIELD_NAME,
        SID_FIELD_NAME=SID_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME,
        ANNOUNCEMENT_FIELD_NAME=ANNOUNCEMENT_FIELD_NAME,
        CURRENCY_FIELD_NAME=CURRENCY_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME=DIVIDEND_TYPE_FIELD_NAME,
    )

    _expected_fields = frozenset({
        TS_FIELD_NAME,
        SID_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME,
        CURRENCY_FIELD_NAME,
        ANNOUNCEMENT_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME
    })

    # Loader instantiated by BlazeEventsLoader.load_adjusted_array, and the
    # dataset used when none is passed to the constructor.
    concrete_loader = DividendsByAnnouncementDateLoader
    default_dataset = DividendsByAnnouncementDate
class BlazeDividendsByExDateLoader(BlazeEventsLoader):
    """A pipeline loader for the ``DividendsByExDate`` dataset that loads
    data from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The expression representing the data to load.
    resources : dict, optional
        Mapping from the atomic terms of ``expr`` to actual data resources.
    odo_kwargs : dict, optional
        Extra keyword arguments to pass to odo when executing the expression.
    data_query_time : time, optional
        The time to use for the data query cutoff.
    data_query_tz : tzinfo or str, optional
        The timezone to use for the data query cutoff.
    dataset : DataSet, optional
        The DataSet object for which this loader loads data.

    Notes
    -----
    The expression should have a tabular dshape of::

       Dim * {{
           {SID_FIELD_NAME}: int64,
           {TS_FIELD_NAME}: datetime,
           {EX_DATE_FIELD_NAME}: ?datetime,
           {CASH_AMOUNT_FIELD_NAME}: ?float64,
           {CURRENCY_FIELD_NAME}: ?string,
           {DIVIDEND_TYPE_FIELD_NAME}: ?string,
       }}

    Where each row of the table is a record including the sid to identify the
    company, the timestamp where we learned about the ex date, the
    ex date, and the associated cash amount.

    If the '{TS_FIELD_NAME}' field is not included it is assumed that we
    start the backtest with knowledge of all announcements.
    """

    __doc__ = __doc__.format(
        TS_FIELD_NAME=TS_FIELD_NAME,
        SID_FIELD_NAME=SID_FIELD_NAME,
        EX_DATE_FIELD_NAME=EX_DATE_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME,
        CURRENCY_FIELD_NAME=CURRENCY_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME=DIVIDEND_TYPE_FIELD_NAME,
    )

    _expected_fields = frozenset({
        TS_FIELD_NAME,
        SID_FIELD_NAME,
        EX_DATE_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME,
        CURRENCY_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME,
    })

    # Loader instantiated by BlazeEventsLoader.load_adjusted_array, and the
    # dataset used when none is passed to the constructor.
    concrete_loader = DividendsByExDateLoader
    default_dataset = DividendsByExDate
class BlazeDividendsByPayDateLoader(BlazeEventsLoader):
    """A pipeline loader for the ``DividendsByPayDate`` dataset that loads
    data from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The expression representing the data to load.
    resources : dict, optional
        Mapping from the atomic terms of ``expr`` to actual data resources.
    odo_kwargs : dict, optional
        Extra keyword arguments to pass to odo when executing the expression.
    data_query_time : time, optional
        The time to use for the data query cutoff.
    data_query_tz : tzinfo or str, optional
        The timezone to use for the data query cutoff.
    dataset : DataSet, optional
        The DataSet object for which this loader loads data.

    Notes
    -----
    The expression should have a tabular dshape of::

       Dim * {{
           {SID_FIELD_NAME}: int64,
           {TS_FIELD_NAME}: datetime,
           {PAY_DATE_FIELD_NAME}: ?datetime,
           {CASH_AMOUNT_FIELD_NAME}: ?float64,
           {CURRENCY_FIELD_NAME}: ?string,
           {DIVIDEND_TYPE_FIELD_NAME}: ?string,
       }}

    Where each row of the table is a record including the sid to identify the
    company, the timestamp where we learned about the pay date, the pay date,
    and the associated cash amount.

    If the '{TS_FIELD_NAME}' field is not included it is assumed that we
    start the backtest with knowledge of all announcements.
    """

    __doc__ = __doc__.format(
        TS_FIELD_NAME=TS_FIELD_NAME,
        SID_FIELD_NAME=SID_FIELD_NAME,
        PAY_DATE_FIELD_NAME=PAY_DATE_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME=CASH_AMOUNT_FIELD_NAME,
        CURRENCY_FIELD_NAME=CURRENCY_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME=DIVIDEND_TYPE_FIELD_NAME
    )

    _expected_fields = frozenset({
        TS_FIELD_NAME,
        SID_FIELD_NAME,
        PAY_DATE_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME,
        CURRENCY_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME,
    })

    # Loader instantiated by BlazeEventsLoader.load_adjusted_array, and the
    # dataset used when none is passed to the constructor.
    concrete_loader = DividendsByPayDateLoader
    default_dataset = DividendsByPayDate
@@ -1,61 +0,0 @@
from zipline.pipeline.common import (
ANNOUNCEMENT_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.data import EarningsCalendar
from zipline.pipeline.loaders import EarningsCalendarLoader
from .events import BlazeEventsLoader
class BlazeEarningsCalendarLoader(BlazeEventsLoader):
    """A pipeline loader for the ``EarningsCalendar`` dataset that loads
    data from a blaze expression.

    Parameters
    ----------
    expr : Expr
        The expression representing the data to load.
    resources : dict, optional
        Mapping from the loadable terms of ``expr`` to actual data resources.
    odo_kwargs : dict, optional
        Extra keyword arguments to pass to odo when executing the expression.
    data_query_time : time, optional
        The time to use for the data query cutoff.
    data_query_tz : tzinfo or str, optional
        The timezone to use for the data query cutoff.
    dataset : DataSet, optional
        The DataSet object for which this loader loads data.

    Notes
    -----
    The expression should have a tabular dshape of::

       Dim * {{
           {SID_FIELD_NAME}: int64,
           {TS_FIELD_NAME}: datetime,
           {ANNOUNCEMENT_FIELD_NAME}: ?datetime,
       }}

    Where each row of the table is a record including the sid to identify the
    company, the timestamp where we learned about the announcement, and the
    date when the earnings will be announced.

    If the '{TS_FIELD_NAME}' field is not included it is assumed that we
    start the backtest with knowledge of all announcements.
    """

    __doc__ = __doc__.format(
        TS_FIELD_NAME=TS_FIELD_NAME,
        SID_FIELD_NAME=SID_FIELD_NAME,
        ANNOUNCEMENT_FIELD_NAME=ANNOUNCEMENT_FIELD_NAME,
    )

    _expected_fields = frozenset({
        TS_FIELD_NAME,
        SID_FIELD_NAME,
        ANNOUNCEMENT_FIELD_NAME,
    })

    # Loader instantiated by BlazeEventsLoader.load_adjusted_array, and the
    # dataset used when none is passed to the constructor.
    concrete_loader = EarningsCalendarLoader
    default_dataset = EarningsCalendar
+23 -24
View File
@@ -1,5 +1,3 @@
import abc
from datashape import istabular
from .core import (
@@ -7,6 +5,10 @@ from .core import (
ffill_query_in_range,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.events import (
EventsLoader,
required_event_fields,
)
from zipline.pipeline.common import (
SID_FIELD_NAME,
TS_FIELD_NAME,
@@ -56,41 +58,37 @@ class BlazeEventsLoader(PipelineLoader):
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
default_dataset = None
@preprocess(data_query_tz=optionally(ensure_timezone))
def __init__(self,
expr,
next_value_columns,
previous_value_columns,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
dataset=default_dataset):
if dataset is None:
dataset = self.default_dataset
data_query_tz=None):
dshape = expr.dshape
if not istabular(dshape):
raise ValueError(
'expression dshape must be tabular, got: %s' % dshape,
)
expected_fields = self._expected_fields
required_cols = list(
required_event_fields(next_value_columns, previous_value_columns)
)
self._expr = bind_expression_to_resources(
expr[list(expected_fields)],
expr[required_cols],
resources,
)
self._next_value_columns = next_value_columns
self._previous_value_columns = previous_value_columns
self._odo_kwargs = odo_kwargs if odo_kwargs is not None else {}
self._dataset = dataset
check_data_query_args(data_query_time, data_query_tz)
self._data_query_time = data_query_time
self._data_query_tz = data_query_tz
@abc.abstractproperty
def concrete_loader(self):
NotImplementedError('concrete_loader')
def load_adjusted_array(self, columns, dates, assets, mask):
data_query_time = self._data_query_time
data_query_tz = self._data_query_tz
@@ -120,13 +118,14 @@ class BlazeEventsLoader(PipelineLoader):
inplace=True,
ts_field=TS_FIELD_NAME,
)
gb = raw.groupby(SID_FIELD_NAME)
return self.concrete_loader(
dates,
self.prepare_data(raw, gb),
dataset=self._dataset,
).load_adjusted_array(columns, dates, assets, mask)
def prepare_data(self, raw, gb):
return {sid: raw.loc[group].drop(SID_FIELD_NAME, axis=1) for sid, group
in gb.groups.items()}
return EventsLoader(
events=raw,
next_value_columns=self._next_value_columns,
previous_value_columns=self._previous_value_columns,
).load_adjusted_array(
columns,
dates,
assets,
mask,
)
-69
View File
@@ -1,69 +0,0 @@
"""
Reference implementation for buyback auth loaders.
"""
from ..data import BuybackAuthorizations
from .events import EventsLoader
from zipline.utils.memoize import lazyval
# Field names of the buyback-authorization event data. All four appear in
# ``BuybackAuthorizationsLoader.expected_cols``; the announcement field is
# also that loader's event date column.
BUYBACK_AMOUNT_FIELD_NAME = 'buyback_amount'
BUYBACK_ANNOUNCEMENT_FIELD_NAME = 'buyback_date'
BUYBACK_TYPE_FIELD_NAME = 'buyback_type'
BUYBACK_UNIT_FIELD_NAME = 'buyback_unit'
class BuybackAuthorizationsLoader(EventsLoader):
    """
    Reference loader for
    :class:`zipline.pipeline.data.BuybackAuthorizations`.

    events_by_sid: dict[sid -> pd.DataFrame(knowledge date,
                                            event date,
                                            buyback amount,
                                            buyback unit,
                                            buyback type)]
    """
    event_date_col = BUYBACK_ANNOUNCEMENT_FIELD_NAME
    expected_cols = frozenset((
        BUYBACK_ANNOUNCEMENT_FIELD_NAME,
        BUYBACK_AMOUNT_FIELD_NAME,
        BUYBACK_UNIT_FIELD_NAME,
        BUYBACK_TYPE_FIELD_NAME,
    ))

    def __init__(self, all_dates, events_by_sid,
                 infer_timestamps=False,
                 dataset=BuybackAuthorizations):
        # Everything is forwarded unchanged to the generic EventsLoader.
        super(BuybackAuthorizationsLoader, self).__init__(
            all_dates,
            events_by_sid,
            infer_timestamps=infer_timestamps,
            dataset=dataset,
        )

    @lazyval
    def previous_amount_loader(self):
        """Previous-event value loader for ``dataset.previous_amount``."""
        column = self.dataset.previous_amount
        return self._previous_event_value_loader(
            column, BUYBACK_AMOUNT_FIELD_NAME,
        )

    @lazyval
    def previous_date_loader(self):
        """Previous-event date loader for ``dataset.previous_date``."""
        column = self.dataset.previous_date
        return self._previous_event_date_loader(column)

    @lazyval
    def previous_unit_loader(self):
        """Previous-event value loader for ``dataset.previous_unit``."""
        column = self.dataset.previous_unit
        return self._previous_event_value_loader(
            column, BUYBACK_UNIT_FIELD_NAME,
        )

    @lazyval
    def previous_type_loader(self):
        """Previous-event value loader for ``dataset.previous_type``."""
        column = self.dataset.previous_type
        return self._previous_event_value_loader(
            column, BUYBACK_TYPE_FIELD_NAME,
        )
@@ -1,156 +0,0 @@
"""
Reference implementation for ConsensusEstimates loaders.
"""
from ..data import ConsensusEstimates
from .events import EventsLoader
from zipline.utils.memoize import lazyval
# Field names of the consensus-estimates event data. All nine appear in
# ``ConsensusEstimatesLoader.expected_cols``; the release date field is also
# that loader's event date column.
ACTUAL_VALUE_FIELD_NAME = 'actual_value'
COUNT_FIELD_NAME = 'count'
FISCAL_QUARTER_FIELD_NAME = 'fiscal_quarter'
FISCAL_YEAR_FIELD_NAME = 'fiscal_year'
HIGH_FIELD_NAME = 'high'
LOW_FIELD_NAME = 'low'
MEAN_FIELD_NAME = 'mean'
RELEASE_DATE_FIELD_NAME = 'release_date'
STANDARD_DEVIATION_FIELD_NAME = 'standard_deviation'
class ConsensusEstimatesLoader(EventsLoader):
    """
    Reference loader for the ``ConsensusEstimates`` dataset.

    Each ``*_loader`` attribute is a lazily computed loader for the dataset
    column of the same name, built from the matching event field.
    """
    event_date_col = RELEASE_DATE_FIELD_NAME
    expected_cols = frozenset((
        RELEASE_DATE_FIELD_NAME,
        STANDARD_DEVIATION_FIELD_NAME,
        COUNT_FIELD_NAME,
        FISCAL_QUARTER_FIELD_NAME,
        HIGH_FIELD_NAME,
        MEAN_FIELD_NAME,
        FISCAL_YEAR_FIELD_NAME,
        LOW_FIELD_NAME,
        ACTUAL_VALUE_FIELD_NAME,
    ))

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=ConsensusEstimates):
        # Everything is forwarded unchanged to the generic EventsLoader.
        super(ConsensusEstimatesLoader, self).__init__(
            all_dates,
            events_by_sid,
            infer_timestamps,
            dataset=dataset,
        )

    # -- release date ------------------------------------------------------

    @lazyval
    def next_release_date_loader(self):
        """Next-event date loader for ``dataset.next_release_date``."""
        column = self.dataset.next_release_date
        return self._next_event_date_loader(column)

    @lazyval
    def previous_release_date_loader(self):
        """Previous-event date loader for
        ``dataset.previous_release_date``."""
        column = self.dataset.previous_release_date
        return self._previous_event_date_loader(column)

    # -- standard deviation ------------------------------------------------

    @lazyval
    def next_standard_deviation_loader(self):
        """Next-event value loader for
        ``dataset.next_standard_deviation``."""
        column = self.dataset.next_standard_deviation
        return self._next_event_value_loader(
            column, STANDARD_DEVIATION_FIELD_NAME,
        )

    @lazyval
    def previous_standard_deviation_loader(self):
        """Previous-event value loader for
        ``dataset.previous_standard_deviation``."""
        column = self.dataset.previous_standard_deviation
        return self._previous_event_value_loader(
            column, STANDARD_DEVIATION_FIELD_NAME,
        )

    # -- count ---------------------------------------------------------------

    @lazyval
    def next_count_loader(self):
        """Next-event value loader for ``dataset.next_count``."""
        column = self.dataset.next_count
        return self._next_event_value_loader(column, COUNT_FIELD_NAME)

    @lazyval
    def previous_count_loader(self):
        """Previous-event value loader for ``dataset.previous_count``."""
        column = self.dataset.previous_count
        return self._previous_event_value_loader(column, COUNT_FIELD_NAME)

    # -- fiscal quarter ------------------------------------------------------

    @lazyval
    def next_fiscal_quarter_loader(self):
        """Next-event value loader for ``dataset.next_fiscal_quarter``."""
        column = self.dataset.next_fiscal_quarter
        return self._next_event_value_loader(
            column, FISCAL_QUARTER_FIELD_NAME,
        )

    @lazyval
    def previous_fiscal_quarter_loader(self):
        """Previous-event value loader for
        ``dataset.previous_fiscal_quarter``."""
        column = self.dataset.previous_fiscal_quarter
        return self._previous_event_value_loader(
            column, FISCAL_QUARTER_FIELD_NAME,
        )

    # -- high ----------------------------------------------------------------

    @lazyval
    def next_high_loader(self):
        """Next-event value loader for ``dataset.next_high``."""
        column = self.dataset.next_high
        return self._next_event_value_loader(column, HIGH_FIELD_NAME)

    @lazyval
    def previous_high_loader(self):
        """Previous-event value loader for ``dataset.previous_high``."""
        column = self.dataset.previous_high
        return self._previous_event_value_loader(column, HIGH_FIELD_NAME)

    # -- mean ----------------------------------------------------------------

    @lazyval
    def next_mean_loader(self):
        """Next-event value loader for ``dataset.next_mean``."""
        column = self.dataset.next_mean
        return self._next_event_value_loader(column, MEAN_FIELD_NAME)

    @lazyval
    def previous_mean_loader(self):
        """Previous-event value loader for ``dataset.previous_mean``."""
        column = self.dataset.previous_mean
        return self._previous_event_value_loader(column, MEAN_FIELD_NAME)

    # -- fiscal year ---------------------------------------------------------

    @lazyval
    def next_fiscal_year_loader(self):
        """Next-event value loader for ``dataset.next_fiscal_year``."""
        column = self.dataset.next_fiscal_year
        return self._next_event_value_loader(
            column, FISCAL_YEAR_FIELD_NAME,
        )

    @lazyval
    def previous_fiscal_year_loader(self):
        """Previous-event value loader for
        ``dataset.previous_fiscal_year``."""
        column = self.dataset.previous_fiscal_year
        return self._previous_event_value_loader(
            column, FISCAL_YEAR_FIELD_NAME,
        )

    # -- low -----------------------------------------------------------------

    @lazyval
    def next_low_loader(self):
        """Next-event value loader for ``dataset.next_low``."""
        column = self.dataset.next_low
        return self._next_event_value_loader(column, LOW_FIELD_NAME)

    @lazyval
    def previous_low_loader(self):
        """Previous-event value loader for ``dataset.previous_low``."""
        column = self.dataset.previous_low
        return self._previous_event_value_loader(column, LOW_FIELD_NAME)

    # -- actual value (previous only) ----------------------------------------

    @lazyval
    def previous_actual_value_loader(self):
        """Previous-event value loader for
        ``dataset.previous_actual_value``."""
        column = self.dataset.previous_actual_value
        return self._previous_event_value_loader(
            column, ACTUAL_VALUE_FIELD_NAME,
        )
-191
View File
@@ -1,191 +0,0 @@
from zipline.pipeline.common import ANNOUNCEMENT_FIELD_NAME
from zipline.pipeline.loaders.events import EventsLoader
from ..data import (
DividendsByAnnouncementDate,
DividendsByExDate,
DividendsByPayDate,
)
from zipline.utils.memoize import lazyval
# Field names of the dividend event data, shared by the three dividend
# loaders defined in this module.
CASH_AMOUNT_FIELD_NAME = 'cash_amount'
CURRENCY_FIELD_NAME = 'currency_type'
DIVIDEND_TYPE_FIELD_NAME = 'dividend_type'
EX_DATE_FIELD_NAME = 'ex_date'
PAY_DATE_FIELD_NAME = 'pay_date'
class DividendsByAnnouncementDateLoader(EventsLoader):
    """
    Events loader whose default dataset is ``DividendsByAnnouncementDate``.

    Events are keyed on the announcement date field; only previous-event
    loaders are provided.
    """
    event_date_col = ANNOUNCEMENT_FIELD_NAME
    expected_cols = frozenset((
        ANNOUNCEMENT_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME,
        CURRENCY_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME,
    ))

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=DividendsByAnnouncementDate):
        # Everything is forwarded unchanged to the generic EventsLoader.
        super(DividendsByAnnouncementDateLoader, self).__init__(
            all_dates,
            events_by_sid,
            infer_timestamps,
            dataset=dataset,
        )

    @lazyval
    def previous_announcement_date_loader(self):
        """Previous-event date loader for
        ``dataset.previous_announcement_date``."""
        column = self.dataset.previous_announcement_date
        return self._previous_event_date_loader(column)

    @lazyval
    def previous_amount_loader(self):
        """Previous-event value loader for ``dataset.previous_amount``."""
        column = self.dataset.previous_amount
        return self._previous_event_value_loader(
            column, CASH_AMOUNT_FIELD_NAME,
        )

    @lazyval
    def previous_currency_loader(self):
        """Previous-event value loader for ``dataset.previous_currency``."""
        column = self.dataset.previous_currency
        return self._previous_event_value_loader(column, CURRENCY_FIELD_NAME)

    @lazyval
    def previous_type_loader(self):
        """Previous-event value loader for ``dataset.previous_type``."""
        column = self.dataset.previous_type
        return self._previous_event_value_loader(
            column, DIVIDEND_TYPE_FIELD_NAME,
        )
class DividendsByPayDateLoader(EventsLoader):
    """
    Events loader whose default dataset is ``DividendsByPayDate``.

    Events are keyed on the pay date field; both next-event and
    previous-event loaders are provided.
    """
    event_date_col = PAY_DATE_FIELD_NAME
    expected_cols = frozenset((
        PAY_DATE_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME,
        CURRENCY_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME,
    ))

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=DividendsByPayDate):
        # Everything is forwarded unchanged to the generic EventsLoader.
        super(DividendsByPayDateLoader, self).__init__(
            all_dates,
            events_by_sid,
            infer_timestamps,
            dataset=dataset,
        )

    @lazyval
    def next_date_loader(self):
        """Next-event date loader for ``dataset.next_date``."""
        column = self.dataset.next_date
        return self._next_event_date_loader(column)

    @lazyval
    def previous_date_loader(self):
        """Previous-event date loader for ``dataset.previous_date``."""
        column = self.dataset.previous_date
        return self._previous_event_date_loader(column)

    @lazyval
    def next_amount_loader(self):
        """Next-event value loader for ``dataset.next_amount``."""
        column = self.dataset.next_amount
        return self._next_event_value_loader(column, CASH_AMOUNT_FIELD_NAME)

    @lazyval
    def previous_amount_loader(self):
        """Previous-event value loader for ``dataset.previous_amount``."""
        column = self.dataset.previous_amount
        return self._previous_event_value_loader(
            column, CASH_AMOUNT_FIELD_NAME,
        )

    @lazyval
    def previous_currency_loader(self):
        """Previous-event value loader for ``dataset.previous_currency``."""
        column = self.dataset.previous_currency
        return self._previous_event_value_loader(column, CURRENCY_FIELD_NAME)

    @lazyval
    def next_currency_loader(self):
        """Next-event value loader for ``dataset.next_currency``."""
        column = self.dataset.next_currency
        return self._next_event_value_loader(column, CURRENCY_FIELD_NAME)

    @lazyval
    def previous_type_loader(self):
        """Previous-event value loader for ``dataset.previous_type``."""
        column = self.dataset.previous_type
        return self._previous_event_value_loader(
            column, DIVIDEND_TYPE_FIELD_NAME,
        )

    @lazyval
    def next_type_loader(self):
        """Next-event value loader for ``dataset.next_type``."""
        column = self.dataset.next_type
        return self._next_event_value_loader(
            column, DIVIDEND_TYPE_FIELD_NAME,
        )
class DividendsByExDateLoader(EventsLoader):
    """
    Events loader whose default dataset is ``DividendsByExDate``.

    Events are keyed on the ex date field; both next-event and
    previous-event loaders are provided.
    """
    event_date_col = EX_DATE_FIELD_NAME
    expected_cols = frozenset((
        EX_DATE_FIELD_NAME,
        CASH_AMOUNT_FIELD_NAME,
        CURRENCY_FIELD_NAME,
        DIVIDEND_TYPE_FIELD_NAME,
    ))

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=DividendsByExDate):
        # Everything is forwarded unchanged to the generic EventsLoader.
        super(DividendsByExDateLoader, self).__init__(
            all_dates,
            events_by_sid,
            infer_timestamps,
            dataset=dataset,
        )

    @lazyval
    def next_date_loader(self):
        """Next-event date loader for ``dataset.next_date``."""
        column = self.dataset.next_date
        return self._next_event_date_loader(column)

    @lazyval
    def previous_date_loader(self):
        """Previous-event date loader for ``dataset.previous_date``."""
        column = self.dataset.previous_date
        return self._previous_event_date_loader(column)

    @lazyval
    def next_amount_loader(self):
        """Next-event value loader for ``dataset.next_amount``."""
        column = self.dataset.next_amount
        return self._next_event_value_loader(column, CASH_AMOUNT_FIELD_NAME)

    @lazyval
    def previous_amount_loader(self):
        """Previous-event value loader for ``dataset.previous_amount``."""
        column = self.dataset.previous_amount
        return self._previous_event_value_loader(
            column, CASH_AMOUNT_FIELD_NAME,
        )

    @lazyval
    def previous_currency_loader(self):
        """Previous-event value loader for ``dataset.previous_currency``."""
        column = self.dataset.previous_currency
        return self._previous_event_value_loader(column, CURRENCY_FIELD_NAME)

    @lazyval
    def next_currency_loader(self):
        """Next-event value loader for ``dataset.next_currency``."""
        column = self.dataset.next_currency
        return self._next_event_value_loader(column, CURRENCY_FIELD_NAME)

    @lazyval
    def previous_type_loader(self):
        """Previous-event value loader for ``dataset.previous_type``."""
        column = self.dataset.previous_type
        return self._previous_event_value_loader(
            column, DIVIDEND_TYPE_FIELD_NAME,
        )

    @lazyval
    def next_type_loader(self):
        """Next-event value loader for ``dataset.next_type``."""
        column = self.dataset.next_type
        return self._next_event_value_loader(
            column, DIVIDEND_TYPE_FIELD_NAME,
        )
-32
View File
@@ -1,32 +0,0 @@
"""
Reference implementation for EarningsCalendar loaders.
"""
from ..data import EarningsCalendar
from .events import EventsLoader
from zipline.pipeline.common import ANNOUNCEMENT_FIELD_NAME
from zipline.utils.memoize import lazyval
class EarningsCalendarLoader(EventsLoader):
    """
    Reference loader for the ``EarningsCalendar`` dataset.

    The announcement date is the only expected field, so only date loaders
    are provided.
    """
    event_date_col = ANNOUNCEMENT_FIELD_NAME
    expected_cols = frozenset((ANNOUNCEMENT_FIELD_NAME,))

    def __init__(self,
                 all_dates,
                 events_by_sid,
                 infer_timestamps=False,
                 dataset=EarningsCalendar):
        # Everything is forwarded unchanged to the generic EventsLoader.
        super(EarningsCalendarLoader, self).__init__(
            all_dates,
            events_by_sid,
            infer_timestamps,
            dataset=dataset,
        )

    @lazyval
    def next_announcement_loader(self):
        """Next-event date loader for ``dataset.next_announcement``."""
        column = self.dataset.next_announcement
        return self._next_event_date_loader(column)

    @lazyval
    def previous_announcement_loader(self):
        """Previous-event date loader for
        ``dataset.previous_announcement``."""
        column = self.dataset.previous_announcement
        return self._previous_event_date_loader(column)
+180 -193
View File
@@ -1,231 +1,218 @@
import abc
import numpy as np
import pandas as pd
from six import iteritems
from toolz import merge
from six import viewvalues
from toolz import groupby, merge
from .base import PipelineLoader
from .frame import DataFrameLoader
from .utils import previous_event_frame, next_event_frame
from zipline.pipeline.common import TS_FIELD_NAME
from zipline.utils.numpy_utils import NaTD
from zipline.pipeline.common import (
EVENT_DATE_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.utils import (
next_event_indexer,
previous_event_indexer,
)
WRONG_COLS_ERROR = "Expected columns {expected_columns} for sid {sid} but " \
"got columns {resulting_columns}."
WRONG_SINGLE_COL_DATA_FORMAT_ERROR = ("Data for sid {sid} is expected to have "
"1 column and to be in a DataFrame, "
"Series, or DatetimeIndex.")
def required_event_fields(next_value_columns, previous_value_columns):
    """
    Return the set of event field names needed to serve the given columns.

    Parameters
    ----------
    next_value_columns : dict
        Mapping whose values are event field names.
    previous_value_columns : dict
        Mapping whose values are event field names.

    Returns
    -------
    fields : set
        The timestamp, sid and event date field names, plus every value of
        both mappings.
    """
    # Metadata columns, always required: they are used to align event
    # indexers.
    fields = {TS_FIELD_NAME, SID_FIELD_NAME, EVENT_DATE_FIELD_NAME}
    # Plus every field name that one of our loadable columns is mapped to.
    fields.update(viewvalues(next_value_columns))
    fields.update(viewvalues(previous_value_columns))
    return fields
WRONG_MANY_COL_DATA_FORMAT_ERROR = ("Data for sid {sid} is expected to have "
"more than 1 column and to be in a "
"DataFrame.")
SERIES_NO_DTINDEX_ERROR = ("Got Series for sid {sid}, but index was not "
"DatetimeIndex.")
DTINDEX_NOT_INFER_TS_ERROR = ("Got DatetimeIndex for sid {sid}.\n"
"Pass `infer_timestamps=True` to use the first "
"date in `all_dates` as implicit timestamp.")
DF_NO_TS_NOT_INFER_TS_ERROR = ("Got DataFrame without a '{"
"timestamp_column_name}' column for sid {sid}."
"\nPass `infer_timestamps=True` to use the "
"first date in `all_dates` as implicit "
"timestamp.")
def validate_column_specs(events, next_value_columns, previous_value_columns):
    """
    Verify that the columns of ``events`` can be used by an EventsLoader to
    serve the BoundColumns described by ``next_value_columns`` and
    ``previous_value_columns``.

    Parameters
    ----------
    events : pd.DataFrame
        Frame of events; only its ``columns`` attribute is inspected.
    next_value_columns : dict
        Mapping whose values are event field names.
    previous_value_columns : dict
        Mapping whose values are event field names.

    Raises
    ------
    ValueError
        If any required field is absent from ``events.columns``. The message
        lists the missing, received and expected column names, sorted.
    """
    # Reuse the module's single definition of "required" so this check
    # cannot drift from what the loaders actually request.
    required = required_event_fields(
        next_value_columns,
        previous_value_columns,
    )
    received = set(events.columns)
    missing = required - received
    if missing:
        raise ValueError(
            "EventsLoader missing required columns {missing}.\n"
            "Got Columns: {received}\n"
            "Expected Columns: {required}".format(
                missing=sorted(missing),
                received=sorted(received),
                required=sorted(required),
            )
        )
class EventsLoader(PipelineLoader):
"""
Abstract loader.
Base class for PipelineLoaders that supports loading the next and previous
value of an event field.
Does not currently support adjustments to the dates of known events.
Does not currently support adjustments.
Parameters
----------
all_dates : pd.DatetimeIndex
Index of dates for which we can serve queries.
events_by_sid : dict[int -> pd.DataFrame or pd.Series or pd.DatetimeIndex]
Dict mapping sids to objects representing dates on which earnings
occurred.
events : pd.DataFrame
A DataFrame representing events (e.g. share buybacks or
earnings announcements) associated with particular companies.
If a dict value is a Series, it's interpreted as a mapping from the
date on which we learned an announcement was coming to the date on
which the announcement was made.
``events`` must contain at least three columns::
sid : int64
The asset id associated with each event.
If a dict value is a DatetimeIndex, it's interpreted as just containing
the dates that announcements were made, and we assume we knew about the
announcement on all prior dates. This mode is only supported if
``infer_timestamp`` is explicitly passed as a truthy value.
Dict mapping sids to DataFrames, Series, or DatetimeIndexes.
event_date : datetime64[ns]
The date on which the event occurred.
If the value is a DataFrame, it then represents dates on which events
occurred along with other associated values. If the DataFrame
contains a "timestamp" column, that column is interpreted as the date
on which we learned about the event. If the DataFrames do not contain a
"timestamp" column, we assume we knew about the event on all prior
dates. This mode is only supported if ``infer_timestamp`` is
explicitly passed as a truthy value.
timestamp : datetime64[ns]
The date on which we learned about the event.
infer_timestamps : bool, optional
Whether to allow omitting the "timestamp" column.
dataset : DataSet
The DataSet object for which this loader loads data.
next_value_columns : dict[BoundColumn -> str]
Map from dataset columns to raw field names that should be used when
searching for a next event value.
previous_value_columns : dict[BoundColumn -> str]
Map from dataset columns to raw field names that should be used when
searching for a previous event value.
"""
@abc.abstractproperty
def expected_cols(self):
raise NotImplemented('expected_cols')
@abc.abstractproperty
def event_date_col(self):
raise NotImplemented('event_date_col')
def __init__(self,
all_dates,
events_by_sid,
infer_timestamps=False,
dataset=None):
self.all_dates = all_dates
# Do not modify the original in place, since it may be used for other
# purposes.
self.events_by_sid = (
events_by_sid.copy()
events,
next_value_columns,
previous_value_columns):
validate_column_specs(
events,
next_value_columns,
previous_value_columns,
)
dates = self.all_dates.values
for k, v in iteritems(events_by_sid):
# Already a DataFrame
if isinstance(v, pd.DataFrame):
if TS_FIELD_NAME not in v.columns:
if not infer_timestamps:
raise ValueError(
DF_NO_TS_NOT_INFER_TS_ERROR.format(
timestamp_column_name=TS_FIELD_NAME,
sid=k
)
)
self.events_by_sid[k] = v = v.copy()
v.index = [dates[0]] * len(v)
else:
self.events_by_sid[k] = v.set_index(TS_FIELD_NAME)
# Once data is in a DF, make sure columns are correct.
cols_except_ts = (set(v.columns) -
{TS_FIELD_NAME})
events = events[events[EVENT_DATE_FIELD_NAME].notnull()]
# Check that all columns other than timestamp are as expected.
if cols_except_ts != self.expected_cols:
raise ValueError(
WRONG_COLS_ERROR.format(
expected_columns=list(self.expected_cols),
sid=k,
resulting_columns=v.columns.values
)
)
# Not a DataFrame and we only expect 1 column
elif len(self.expected_cols) == 1:
# First, must convert to DataFrame.
if isinstance(v, pd.Series):
if not isinstance(v.index, pd.DatetimeIndex):
raise ValueError(
SERIES_NO_DTINDEX_ERROR.format(sid=k)
)
self.events_by_sid[k] = pd.DataFrame({
list(self.expected_cols)[0]: v})
elif isinstance(v, pd.DatetimeIndex):
if not infer_timestamps:
raise ValueError(
DTINDEX_NOT_INFER_TS_ERROR.format(sid=k)
)
self.events_by_sid[k] = pd.DataFrame({
list(self.expected_cols)[0]: v
}, index=[dates[0]] * len(v))
else:
# We expect 1 column, but we got something other than a
# Series, DatetimeIndex, or DataFrame.
raise ValueError(
WRONG_SINGLE_COL_DATA_FORMAT_ERROR.format(sid=k)
)
else:
# We expected multiple columns, but we got something other
# than a DataFrame.
raise ValueError(
WRONG_MANY_COL_DATA_FORMAT_ERROR.format(sid=k)
)
self.events_by_sid = {sid: df.dropna(subset=[self.event_date_col]) for
sid, df in self.events_by_sid.items()}
self.dataset = dataset
# We always work with entries from ``events`` directly as numpy arrays,
# so we coerce from a frame here.
self.events = {
name: np.asarray(series)
for name, series in events.sort(EVENT_DATE_FIELD_NAME).iteritems()
}
def get_loader(self, column):
if column in self.dataset.columns:
return getattr(self, "%s_loader" % column.name)
raise ValueError("Don't know how to load column '%s'." % column)
# Columns to load with self.load_next_events.
self.next_value_columns = next_value_columns
def load_adjusted_array(self, columns, dates, assets, mask):
return merge(
self.get_loader(column).load_adjusted_array(
[column], dates, assets, mask
# Columns to load with self.load_previous_events.
self.previous_value_columns = previous_value_columns
def split_next_and_previous_event_columns(self, requested_columns):
"""
Split requested columns into columns that should load the next known
value and columns that should load the previous known value.
Parameters
----------
requested_columns : iterable[BoundColumn]
Returns
-------
next_cols, previous_cols : iterable[BoundColumn], iterable[BoundColumn]
``requested_columns``, partitioned into sub-sequences based on
whether the column should produce values from the next event or the
previous event
"""
def next_or_previous(c):
if c in self.next_value_columns:
return 'next'
elif c in self.previous_value_columns:
return 'previous'
raise ValueError(
"{c} not found in next_value_columns "
"or previous_value_columns".format(c=c)
)
for column in columns
groups = groupby(next_or_previous, requested_columns)
return groups.get('next', ()), groups.get('previous', ())
def next_event_indexer(self, dates, sids):
    """
    Build the next-event indexer for ``dates`` and ``sids``.

    Delegates to the module-level ``next_event_indexer`` function, passing
    this loader's event-date, timestamp and sid arrays from ``self.events``.
    """
    events = self.events
    event_dates = events[EVENT_DATE_FIELD_NAME]
    event_timestamps = events[TS_FIELD_NAME]
    event_sids = events[SID_FIELD_NAME]
    return next_event_indexer(
        dates, sids, event_dates, event_timestamps, event_sids,
    )
def _next_event_date_loader(self, next_date_field):
return DataFrameLoader(
next_date_field,
next_event_frame(
self.events_by_sid,
self.all_dates,
next_date_field.missing_value,
next_date_field.dtype,
self.event_date_col,
self.event_date_col
),
adjustments=None,
def previous_event_indexer(self, dates, sids):
    """
    Build the previous-event indexer for ``dates`` and ``sids``.

    Delegates to the module-level ``previous_event_indexer`` function,
    passing this loader's event-date, timestamp and sid arrays from
    ``self.events``.
    """
    events = self.events
    event_dates = events[EVENT_DATE_FIELD_NAME]
    event_timestamps = events[TS_FIELD_NAME]
    event_sids = events[SID_FIELD_NAME]
    return previous_event_indexer(
        dates, sids, event_dates, event_timestamps, event_sids,
    )
def _next_event_value_loader(self,
next_value_field,
value_field_name):
return DataFrameLoader(
next_value_field,
next_event_frame(
self.events_by_sid,
self.all_dates,
next_value_field.missing_value,
next_value_field.dtype,
self.event_date_col,
value_field_name
),
adjustments=None,
def load_next_events(self, columns, dates, sids, mask):
    """
    Load the requested next-event ``columns``.

    Returns an empty dict when ``columns`` is empty; otherwise forwards to
    ``self._load_events`` using ``self.next_value_columns`` as the name map
    and the next-event indexer for ``dates`` and ``sids``.
    """
    if columns:
        indexer = self.next_event_indexer(dates, sids)
        return self._load_events(
            name_map=self.next_value_columns,
            indexer=indexer,
            columns=columns,
            dates=dates,
            sids=sids,
            mask=mask,
        )
    # Nothing requested: skip computing the indexer entirely.
    return {}
def _previous_event_date_loader(self,
prev_date_field):
return DataFrameLoader(
prev_date_field,
previous_event_frame(
self.events_by_sid,
self.all_dates,
NaTD,
'datetime64[ns]',
self.event_date_col,
self.event_date_col
),
adjustments=None,
def load_previous_events(self, columns, dates, sids, mask):
    """
    Load the requested previous-event ``columns``.

    Returns an empty dict when ``columns`` is empty; otherwise forwards to
    ``self._load_events`` using ``self.previous_value_columns`` as the name
    map and the previous-event indexer for ``dates`` and ``sids``.
    """
    if columns:
        indexer = self.previous_event_indexer(dates, sids)
        return self._load_events(
            name_map=self.previous_value_columns,
            indexer=indexer,
            columns=columns,
            dates=dates,
            sids=sids,
            mask=mask,
        )
    # Nothing requested: skip computing the indexer entirely.
    return {}
def _previous_event_value_loader(self,
previous_value_field,
value_field_name):
return DataFrameLoader(
previous_value_field,
previous_event_frame(
self.events_by_sid,
self.all_dates,
previous_value_field.missing_value,
previous_value_field.dtype,
self.event_date_col,
value_field_name
),
adjustments=None,
def _load_events(self, name_map, indexer, columns, dates, sids, mask):
    """
    Load each of ``columns`` by applying ``indexer`` to the raw event array
    named by ``name_map``.

    Parameters
    ----------
    name_map : dict
        Map from each requested column to a key of ``self.events``.
    indexer : array of int
        Indices into the raw event arrays; negative entries mark locations
        with no known value.
    columns : iterable
        Columns to load; each must be a key of ``name_map``.
    dates, sids, mask
        Forwarded to ``DataFrameLoader.load_adjusted_array``; ``dates`` and
        ``sids`` also label the intermediate DataFrame.

    Returns
    -------
    out : dict
        Map from each column to the value the DataFrameLoader produced for
        it.
    """
    # Locations where the indexer has no event to point at.
    no_event = indexer < 0
    results = {}
    for column in columns:
        values = self.events[name_map[column]][indexer]
        values[no_event] = column.missing_value
        frame = pd.DataFrame(values, index=dates, columns=sids)
        # The actual array formatting is handled by a DataFrameLoader.
        frame_loader = DataFrameLoader(column, frame, adjustments=None)
        loaded = frame_loader.load_adjusted_array([column], dates, sids, mask)
        results[column] = loaded[column]
    return results
def load_adjusted_array(self, columns, dates, sids, mask):
    """
    Load ``columns`` by partitioning them into next-event and previous-event
    groups, loading each group, and merging the two result dicts.
    """
    next_cols, previous_cols = self.split_next_and_previous_event_columns(
        columns,
    )
    next_results = self.load_next_events(next_cols, dates, sids, mask)
    previous_results = self.load_previous_events(
        previous_cols, dates, sids, mask,
    )
    return merge(next_results, previous_results)
+101 -183
View File
@@ -2,150 +2,131 @@ import datetime
import numpy as np
import pandas as pd
from six import iteritems
from six.moves import zip
from zipline.utils.numpy_utils import categorical_dtype, NaTns
from zipline.utils.pandas_utils import mask_between_time
def next_event_frame(events_by_sid,
dates,
missing_value,
field_dtype,
event_date_field_name,
return_field_name):
def is_sorted_ascending(a):
    """
    Check whether a numpy array is sorted in ascending order.

    Every element is compared against the running maximum of the elements
    up to and including it; the array counts as sorted when no element
    falls below that running maximum.
    """
    running_max = np.fmax.accumulate(a)
    never_decreases = running_max <= a
    return never_decreases.all()
def validate_event_metadata(event_dates,
                            event_timestamps,
                            event_sids):
    """
    Sanity-check the metadata arrays that describe a collection of events.

    Asserts that ``event_dates`` is sorted in ascending order and that the
    three arrays all have the same length.

    Raises
    ------
    AssertionError
        If either condition does not hold.
    """
    assert is_sorted_ascending(event_dates), "event dates must be sorted"
    n_sids = len(event_sids)
    n_dates = len(event_dates)
    n_timestamps = len(event_timestamps)
    assert n_sids == n_dates == n_timestamps, \
        "mismatched arrays: %d != %d != %d" % (n_sids, n_dates, n_timestamps)
def next_event_indexer(all_dates,
all_sids,
event_dates,
event_timestamps,
event_sids):
"""
Make a DataFrame representing the simulated next known dates or values
for an event.
Construct an index array that, when applied to an array of values, produces
a 2D array containing the values associated with the next event for each
sid at each moment in time.
Locations where no next event was known will be filled with -1.
Parameters
----------
dates : pd.DatetimeIndex.
The index of the returned DataFrame.
events_by_sid : dict[int -> pd.Series]
Dict mapping sids to a series of dates. Each k:v pair of the series
represents the date we learned of the event mapping to the date the
event will occur.
event_date_field_name : str
The name of the date field that marks when the event occurred.
all_dates : ndarray[datetime64[ns], ndim=1]
Row labels for the target output.
all_sids : ndarray[int, ndim=1]
Column labels for the target output.
event_dates : ndarray[datetime64[ns], ndim=1]
Dates on which each input event occurred/will occur. ``event_dates``
must be in sorted order, and may not contain any NaT values.
event_timestamps : ndarray[datetime64[ns], ndim=1]
Dates on which we learned about each input event.
event_sids : ndarray[int, ndim=1]
Sids associated with each input event.
Returns
-------
next_events: pd.DataFrame
A DataFrame where each column is a security from `events_by_sid` where
the values are the dates of the next known event with the knowledge we
had on the date of the index. Entries falling after the last date will
have `NaT` as the result in the output.
See Also
--------
previous_date_frame
indexer : ndarray[int, ndim=2]
An array of shape (len(all_dates), len(all_sids)) of indices into
``event_{dates,timestamps,sids}``.
"""
date_cols = {
equity: np.full_like(dates, NaTns) for equity in events_by_sid
}
value_cols = {
equity: np.full(len(dates), missing_value, dtype=field_dtype)
for equity in events_by_sid
}
validate_event_metadata(event_dates, event_timestamps, event_sids)
out = np.full((len(all_dates), len(all_sids)), -1, dtype=np.int64)
raw_dates = dates.values
for equity, df in iteritems(events_by_sid):
event_dates = df[event_date_field_name]
values = df[return_field_name]
data = date_cols[equity]
if not event_dates.index.is_monotonic_increasing:
event_dates = event_dates.sort_index()
sid_ixs = all_sids.searchsorted(event_sids)
# side='right' here ensures that we include the event date itself
# if it's in all_dates.
dt_ixs = all_dates.searchsorted(event_dates, side='right')
ts_ixs = all_dates.searchsorted(event_timestamps)
# Iterate over the raw Series values, since we're comparing against
# numpy arrays anyway.
iter_date_vals = zip(event_dates.index.values, event_dates.values,
values)
for knowledge_date, event_date, value in iter_date_vals:
date_mask = (
(knowledge_date <= raw_dates) &
(raw_dates <= event_date)
)
value_mask = (event_date <= data) | (data == NaTns)
data_indices = np.where(date_mask & value_mask)
data[data_indices] = event_date
value_cols[equity][data_indices] = value
return pd.DataFrame(index=dates, data=value_cols)
# Walk backward through the events, writing the index of the event into
# slots ranging from the event's timestamp to its asof. This depends for
# correctness on the fact that event_dates is sorted in ascending order,
# because we need to overwrite later events with earlier ones if their
# eligible windows overlap.
for i in range(len(event_sids) - 1, -1, -1):
start_ix = ts_ixs[i]
end_ix = dt_ixs[i]
out[start_ix:end_ix, sid_ixs[i]] = i
return out
def previous_event_frame(events_by_sid,
date_index,
missing_value,
field_dtype,
event_date_field,
previous_return_field):
def previous_event_indexer(all_dates,
all_sids,
event_dates,
event_timestamps,
event_sids):
"""
Make a DataFrame representing simulated previous dates or values for an
event.
Construct an index array that, when applied to an array of values, produces
a 2D array containing the values associated with the previous event for
each sid at each moment in time.
Locations where no previous event was known will be filled with -1.
Parameters
----------
events_by_sid : dict[int -> DatetimeIndex]
Dict mapping sids to a series of dates. Each k:v pair of the series
represents the date we learned of the event mapping to the date the
event will occur.
date_index : DatetimeIndex.
The index of the returned DataFrame.
missing_value : any
Data which missing values should be filled with.
field_dtype: any
The dtype of the field for which the previous values are being
retrieved.
event_date_field: str
The name of the date field that marks when the event occurred.
return_field: str
The name of the field for which the previous values are being
retrieved.
all_dates : ndarray[datetime64[ns], ndim=1]
Row labels for the target output.
all_sids : ndarray[int, ndim=1]
Column labels for the target output.
event_dates : ndarray[datetime64[ns], ndim=1]
Dates on which each input event occurred/will occur. ``event_dates``
must be in sorted order, and may not contain any NaT values.
event_timestamps : ndarray[datetime64[ns], ndim=1]
Dates on which we learned about each input event.
event_sids : ndarray[int, ndim=1]
Sids associated with each input event.
Returns
-------
previous_events: pd.DataFrame
A DataFrame where each column is a security from `events_by_sid` and
the values are the values for the previous event that occurred on the
date of the index. Entries falling before the first date will have
`missing_value` filled in as the result in the output.
See Also
--------
next_date_frame
indexer : ndarray[int, ndim=2]
An array of shape (len(all_dates), len(all_sids)) of indices into
``event_{dates,timestamps,sids}``.
"""
sids = list(events_by_sid)
populate_value = None if field_dtype == categorical_dtype else \
missing_value
out = np.full(
(len(date_index), len(sids)),
populate_value,
dtype=field_dtype
)
d_n = date_index[-1].asm8
for col_idx, sid in enumerate(sids):
# events_by_sid[sid] is a DataFrame mapping knowledge_date to event
# date and values.
df = events_by_sid[sid]
df = df[df[event_date_field] <= d_n]
event_date_vals = df[event_date_field].values
# Get knowledge dates corresponding to the values in which we are
# interested
kd_vals = df[df[event_date_field] <= d_n].index.values
# The date at which a previous event is first known is the max of the
# kd and the event date.
index_dates = np.maximum(kd_vals, event_date_vals)
out[
date_index.searchsorted(index_dates), col_idx
] = df[previous_return_field]
validate_event_metadata(event_dates, event_timestamps, event_sids)
out = np.full((len(all_dates), len(all_sids)), -1, dtype=np.int64)
frame = pd.DataFrame(out, index=date_index, columns=sids)
frame.ffill(inplace=True)
if field_dtype == categorical_dtype:
frame[frame.isnull()] = missing_value
return frame
eff_dts = np.maximum(event_dates, event_timestamps)
sid_ixs = all_sids.searchsorted(event_sids)
dt_ixs = all_dates.searchsorted(eff_dts)
# Walk backwards through the events, writing the index of the event into
# slots ranging from max(event_date, event_timestamp) to the start of the
# previously-written event. This depends for correctness on the fact that
# event_dates is sorted in ascending order, because we need to have written
# later events so we know where to stop forward-filling earlier events.
last_written = {}
for i in range(len(event_dates) - 1, -1, -1):
sid_ix = sid_ixs[i]
dt_ix = dt_ixs[i]
out[dt_ix:last_written.get(sid_ix, None), sid_ix] = i
last_written[sid_ix] = dt_ix
return out
def normalize_data_query_time(dt, time, tz):
@@ -287,66 +268,3 @@ def check_data_query_args(data_query_time, data_query_tz):
data_query_tz,
),
)
def zip_with_floats(dates, flts):
    """Return a float-dtype Series of ``flts`` indexed by ``dates``."""
    float_series = pd.Series(data=flts, index=dates, dtype='float')
    return float_series
def zip_with_strs(dates, strs):
    """Return an object-dtype Series of ``strs`` indexed by ``dates``."""
    str_series = pd.Series(data=strs, index=dates, dtype='object')
    return str_series
def zip_with_dates(index_dates, dts):
    """
    Return a Series of ``dts`` (converted with ``pd.to_datetime``) indexed
    by ``index_dates``.
    """
    converted = pd.to_datetime(dts)
    return pd.Series(converted, index=index_dates)
def get_values_for_date_ranges(zip_date_index_with_vals,
                               vals_for_date_intervals,
                               starts,
                               ends,
                               date_index):
    """
    Build a date-indexed Series in which each interval's value is repeated
    once for every date of ``date_index`` that the interval covers.

    Parameters
    ----------
    zip_date_index_with_vals : callable
        Called as ``f(dates, values)``; expected to return a pd.Series of
        ``values`` indexed by ``dates``.
    vals_for_date_intervals : list
        One value per interval described by ``starts`` and ``ends``.
    starts : DatetimeIndex
        Start date of each interval.
    ends : DatetimeIndex
        End date of each interval.
    date_index : DatetimeIndex
        All dates for which values were requested.

    Returns
    -------
    date_index_with_vals : pd.Series
        The result of calling ``zip_date_index_with_vals`` with
        ``date_index`` and the repeated values.
    """
    raw_dates = date_index.values
    first_ixs = raw_dates.searchsorted(starts)
    last_ixs = raw_dates.searchsorted(ends)
    interval_lengths = (last_ixs - first_ixs) + 1

    # When an end date is absent from ``date_index``, searchsorted reports
    # the position of the following date, which makes the interval one day
    # too long. Undo that, except for intervals whose computed length is
    # already zero. A missing start date needs no correction: searchsorted
    # advances it to the next date inside the same interval.
    end_missing = ~np.in1d(ends, date_index)
    overcounted = end_missing & (interval_lengths != 0)
    interval_lengths[np.where(overcounted)] -= 1

    repeated_vals = np.repeat(vals_for_date_intervals, interval_lengths)
    return zip_date_index_with_vals(date_index, repeated_vals)
-1
View File
@@ -23,7 +23,6 @@ from .core import ( # noqa
empty_asset_finder,
empty_assets_db,
empty_trading_env,
gen_calendars,
make_test_handler,
make_trade_data_for_asset_info,
parameter_space,
+2 -314
View File
@@ -1,14 +1,10 @@
from abc import ABCMeta, abstractproperty
import sqlite3
from unittest import TestCase
from contextlib2 import ExitStack
from logbook import NullHandler, Logger
from nose_parameterized import parameterized
from pandas.util.testing import assert_series_equal
from six import with_metaclass
from toolz import flip
import numpy as np
import pandas as pd
import responses
@@ -36,17 +32,9 @@ from ..finance.trading import TradingEnvironment
from ..utils import factory
from ..utils.classproperty import classproperty
from ..utils.final import FinalMeta, final
from ..utils.metautils import with_metaclasses
from .core import tmp_asset_finder, make_simple_equity_info, gen_calendars
from zipline.pipeline import Pipeline, SimplePipelineEngine
from .core import tmp_asset_finder, make_simple_equity_info
from zipline.pipeline import SimplePipelineEngine
from zipline.pipeline.loaders.testing import make_seeded_random_loader
from zipline.utils.numpy_utils import make_datetime64D
from zipline.utils.numpy_utils import NaTD
from zipline.pipeline.common import TS_FIELD_NAME
from zipline.pipeline.loaders.utils import (
get_values_for_date_ranges,
zip_with_dates
)
from zipline.utils.calendars import (
get_calendar,
ExchangeTradingSchedule,
@@ -908,182 +896,6 @@ class WithAdjustmentReader(WithBcolzDailyBarReader):
cls.adjustment_reader = SQLiteAdjustmentReader(conn)
class WithPipelineEventDataLoader(
        with_metaclasses((type(ZiplineTestCase), ABCMeta), WithAssetFinder)):
    """
    ZiplineTestCase mixin providing common test methods/behaviors for event
    data loaders.

    Attributes
    ----------
    loader_type : PipelineLoader
        The type of loader to use. This must be overridden by subclasses.

    Methods
    -------
    get_sids() -> iterable[int]
        Class method which returns the sids that need to be available to the
        tests.
    get_dataset() -> dict[int -> pd.DataFrame]
        Class method which returns a mapping from sid to data for that sid.
        By default this is empty for every sid.
    pipeline_event_loader_args(dates: pd.DatetimeIndex) -> tuple[any]
        The arguments to pass to the ``loader_type`` to construct the pipeline
        loader for this test.
    """
    @classmethod
    def get_sids(cls):
        """Return the sids used by the tests (0 through 4 by default)."""
        return range(0, 5)

    @classmethod
    def get_dataset(cls):
        """Return a dict mapping every sid to an empty DataFrame."""
        return {sid: pd.DataFrame() for sid in cls.get_sids()}

    @abstractproperty
    def loader_type(self):
        """The loader class under test; subclasses must override this."""
        raise NotImplementedError('loader_type')

    @classmethod
    def make_equity_info(cls):
        """
        Build equity info for ``get_sids()`` spanning 2013-01-01 through
        2015-01-01 (UTC).
        """
        return make_simple_equity_info(
            cls.get_sids(),
            start_date=pd.Timestamp('2013-01-01', tz='UTC'),
            end_date=pd.Timestamp('2015-01-01', tz='UTC'),
        )

    def pipeline_event_loader_args(self, dates):
        """Construct the base object to pass to the loader.

        Parameters
        ----------
        dates : pd.DatetimeIndex
            The dates we can serve.

        Returns
        -------
        args : tuple[any]
            The arguments to forward to the loader positionally.
        """
        return dates, self.get_dataset()

    def pipeline_event_setup_engine(self, dates):
        """
        Make a Pipeline Engine object based on the given dates.
        """
        loader = self.loader_type(*self.pipeline_event_loader_args(dates))
        # The engine's loader lookup ignores its argument and always returns
        # the single loader built above.
        return SimplePipelineEngine(lambda _: loader, dates, self.asset_finder)

    def get_sids_to_frames(self,
                           zip_date_index_with_vals,
                           vals,
                           date_intervals,
                           dates,
                           dtype_name,
                           missing_dtype):
        """
        Construct a DataFrame that maps sid to the expected values for the
        given dates.

        Parameters
        ----------
        zip_date_index_with_vals: callable
            A function that returns a series of `vals` repeated based on the
            number of days in the date interval for each val, indexed by the
            dates in `dates`.
        vals: iterable
            An iterable with values that correspond to each interval in
            `date_intervals`.
        date_intervals: list
            A list of date intervals for each sid that correspond to values in
            `vals`.
        dates: DatetimeIndex
            The dates which will serve as the index for each Series for each
            sid in the DataFrame.
        dtype_name: str
            The name of the dtype of the values in `vals`.
        missing_dtype: str
            The name of the value that should be used as the missing value
            for the dtype of `vals` - e.g., 'NaN' for floats.
        """
        # Every sid except the last gets values expanded from its intervals;
        # element 0 of each zipped interval pair is the start date and
        # element 1 is the end date.
        frame = pd.DataFrame({sid: get_values_for_date_ranges(
            zip_date_index_with_vals,
            vals[sid],
            pd.DatetimeIndex(list(zip(*date_intervals[sid]))[0]),
            pd.DatetimeIndex(list(zip(*date_intervals[sid]))[1]),
            dates
        ).astype(dtype_name) for sid in self.get_sids()[:-1]})
        # The last sid is filled with the missing value on every date.
        frame[self.get_sids()[-1]] = zip_date_index_with_vals(
            dates, [missing_dtype] * len(dates)
        ).astype(dtype_name)
        return frame

    @staticmethod
    def _compute_busday_offsets(announcement_dates):
        """
        Compute expected business day offsets from a DataFrame of announcement
        dates.
        """
        # Column-vector of dates on which factor `compute` will be called.
        raw_call_dates = announcement_dates.index.values.astype(
            'datetime64[D]'
        )[:, None]
        # 2D array of dates containing the expected next announcement.
        raw_announce_dates = (
            announcement_dates.values.astype('datetime64[D]')
        )
        # Set NaTs to 0 temporarily because busday_count doesn't support NaT.
        # We fill these entries with NaNs later.
        whereNaT = raw_announce_dates == NaTD
        raw_announce_dates[whereNaT] = make_datetime64D(0)
        # The abs call here makes it so that we can use this function to
        # compute offsets for both next and previous earnings (previous
        # earnings offsets come back negative).
        expected = abs(np.busday_count(
            raw_call_dates,
            raw_announce_dates
        ).astype(float))
        expected[whereNaT] = np.nan
        return pd.DataFrame(
            data=expected,
            columns=announcement_dates.columns,
            index=announcement_dates.index,
        )

    @parameterized.expand(gen_calendars(
        '2014-01-01',
        '2014-01-31',
        critical_dates=pd.to_datetime([
            '2014-01-05',
            '2014-01-10',
            '2014-01-15',
            '2014-01-20',
        ], utc=True),
    ))
    def test_compute(self, dates):
        """
        Run a pipeline over ``dates`` and compare every column of the result,
        sid by sid, against the expected frames.
        """
        engine = self.pipeline_event_setup_engine(dates)
        # NOTE(review): ``setup`` and ``pipeline_columns`` are not defined in
        # this class -- presumably supplied by concrete subclasses; confirm.
        cols = self.setup(dates)
        pipe = Pipeline(
            columns=self.pipeline_columns
        )
        result = engine.run_pipeline(
            pipe,
            start_date=dates[0],
            end_date=dates[-1],
        )
        for sid in self.get_sids():
            for col_name in cols.keys():
                assert_series_equal(result[col_name].unstack(1)[sid],
                                    cols[col_name][sid],
                                    check_names=False)
class WithSeededRandomPipelineEngine(WithNYSETradingDays, WithAssetFinder):
"""
ZiplineTestCase mixin providing class-level fixtures for running pipelines
@@ -1239,127 +1051,3 @@ class WithResponses(object):
self.responses = self.enter_instance_context(
responses.RequestsMock(),
)
class WithNextAndPreviousEventDataLoader(WithPipelineEventDataLoader):
    """
    ZiplineTestCase mixin extending common functionality for event data
    loader tests that have both next and previous events.

    `base_cases` should be used as the template to test cases that combine
    knowledge date (timestamp) and some 'other_date' in various ways.

    `next_date_intervals` gives the date intervals for the next event based
    on the dates given in `base_cases`.

    `next_dates` gives the next date from `other_date` which is known about at
    each interval.

    `prev_date_intervals` gives the date intervals for each sid for the
    previous event based on the dates given in `base_cases`.

    `prev_dates` gives the previous date from `other_date` which is known
    about at each interval.

    `get_expected_previous_event_dates` is a convenience function that fills
    a DataFrame with the previously known dates for each sid for the given
    dates.

    `get_expected_next_event_dates` is a convenience function that fills
    a DataFrame with the next known dates for each sid for the given
    dates.
    """
    # One frame per test case. In the labels below, K is a knowledge date
    # (timestamp) and A is the corresponding 'other_date'.
    base_cases = [
        # K1--K2--A1--A2.
        pd.DataFrame({
            TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
            'other_date': pd.to_datetime(['2014-01-15', '2014-01-20']),
        }),
        # K1--K2--A2--A1.
        pd.DataFrame({
            TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-10']),
            'other_date': pd.to_datetime(['2014-01-20', '2014-01-15']),
        }),
        # K1--A1--K2--A2.
        pd.DataFrame({
            TS_FIELD_NAME: pd.to_datetime(['2014-01-05', '2014-01-15']),
            'other_date': pd.to_datetime(['2014-01-10', '2014-01-20']),
        }),
        # K1 == K2.
        pd.DataFrame({
            TS_FIELD_NAME: pd.to_datetime(['2014-01-05'] * 2),
            'other_date': pd.to_datetime(['2014-01-10', '2014-01-15']),
        }),
        # No events at all.
        pd.DataFrame(
            columns=['other_date',
                     TS_FIELD_NAME],
            dtype='datetime64[ns]'
        ),
    ]

    # [start, end] pairs for the first four cases; entry i of each inner list
    # lines up with entry i of the matching list in ``next_dates``.
    next_date_intervals = [
        [['2014-01-01', '2014-01-04'],
         ['2014-01-05', '2014-01-15'],
         ['2014-01-16', '2014-01-20'],
         ['2014-01-21', '2014-01-31']],
        [['2014-01-01', '2014-01-04'],
         ['2014-01-05', '2014-01-09'],
         ['2014-01-10', '2014-01-15'],
         ['2014-01-16', '2014-01-20'],
         ['2014-01-21', '2014-01-31']],
        [['2014-01-01', '2014-01-04'],
         ['2014-01-05', '2014-01-10'],
         ['2014-01-11', '2014-01-14'],
         ['2014-01-15', '2014-01-20'],
         ['2014-01-21', '2014-01-31']],
        [['2014-01-01', '2014-01-04'],
         ['2014-01-05', '2014-01-10'],
         ['2014-01-11', '2014-01-15'],
         ['2014-01-16', '2014-01-31']]
    ]

    # Expected next event date within each interval above; the final
    # single-element list belongs to the empty case.
    next_dates = [
        ['NaT', '2014-01-15', '2014-01-20', 'NaT'],
        ['NaT', '2014-01-20', '2014-01-15', '2014-01-20', 'NaT'],
        ['NaT', '2014-01-10', 'NaT', '2014-01-20', 'NaT'],
        ['NaT', '2014-01-10', '2014-01-15', 'NaT'],
        ['NaT']
    ]

    # [start, end] pairs for the first four cases; entry i of each inner list
    # lines up with entry i of the matching list in ``prev_dates``.
    prev_date_intervals = [
        [['2014-01-01', '2014-01-14'],
         ['2014-01-15', '2014-01-19'],
         ['2014-01-20', '2014-01-31']],
        [['2014-01-01', '2014-01-14'],
         ['2014-01-15', '2014-01-19'],
         ['2014-01-20', '2014-01-31']],
        [['2014-01-01', '2014-01-09'],
         ['2014-01-10', '2014-01-19'],
         ['2014-01-20', '2014-01-31']],
        [['2014-01-01', '2014-01-09'],
         ['2014-01-10', '2014-01-14'],
         ['2014-01-15', '2014-01-31']]
    ]

    # Expected previous event date within each interval above; the final
    # single-element list belongs to the empty case.
    prev_dates = [
        ['NaT', '2014-01-15', '2014-01-20'],
        ['NaT', '2014-01-15', '2014-01-20'],
        ['NaT', '2014-01-10', '2014-01-20'],
        ['NaT', '2014-01-10', '2014-01-15'],
        ['NaT']
    ]

    def get_expected_previous_event_dates(self, dates, dtype_name,
                                          missing_dtype):
        """
        Build the expected previous-event-date frame for ``dates`` from
        ``prev_dates`` and ``prev_date_intervals``.
        """
        return self.get_sids_to_frames(
            zip_with_dates,
            self.prev_dates,
            self.prev_date_intervals,
            dates,
            dtype_name,
            missing_dtype
        )

    def get_expected_next_event_dates(self, dates, dtype_name, missing_dtype):
        """
        Build the expected next-event-date frame for ``dates`` from
        ``next_dates`` and ``next_date_intervals``.
        """
        return self.get_sids_to_frames(
            zip_with_dates,
            self.next_dates,
            self.next_date_intervals,
            dates,
            dtype_name,
            missing_dtype
        )
+41
View File
@@ -1,3 +1,4 @@
import datetime
from functools import partial
import inspect
@@ -339,6 +340,46 @@ def assert_adjustment_equal(result, expected, path=(), **kwargs):
)
@assert_equal.register(
    (datetime.datetime, np.datetime64),
    (datetime.datetime, np.datetime64),
)
def assert_timestamp_and_datetime_equal(result,
                                        expected,
                                        path=(),
                                        msg='',
                                        allow_datetime_coercions=False,
                                        compare_nat_equal=True,
                                        **kwargs):
    """
    Branch for comparing python datetime (which includes pandas Timestamp) and
    np.datetime64 as equal.

    Raises unless the two values have the same type or
    ``allow_datetime_coercions`` is passed as True.

    Parameters
    ----------
    result, expected : datetime.datetime or np.datetime64
        The values to compare. Both are coerced to ``pd.Timestamp`` before
        being compared.
    path : tuple, optional
        Location of the values within an enclosing structure, used when
        formatting failure messages.
    msg : str, optional
        Extra text included in the type-mismatch failure message.
    allow_datetime_coercions : bool, optional
        Whether values of different datetime types may be compared.
    compare_nat_equal : bool, optional
        Whether two null (NaT) values are treated as equal.
    **kwargs
        Forwarded to the ``(object, object)`` implementation of
        ``assert_equal``.

    Raises
    ------
    AssertionError
        If the types differ and ``allow_datetime_coercions`` is falsy.
    """
    assert allow_datetime_coercions or type(result) == type(expected), (
        "%sdatetime types (%s, %s) don't match and "
        "allow_datetime_coercions was not set.\n%s" % (
            _fmt_msg(msg),
            type(result),
            type(expected),
            _fmt_path(path),
        )
    )

    result = pd.Timestamp(result)
    # Coerce ``expected`` from itself; building it from ``result`` would make
    # the comparison below trivially succeed for every pair of inputs.
    expected = pd.Timestamp(expected)
    if compare_nat_equal and pd.isnull(result) and pd.isnull(expected):
        return

    # Hand the coerced values to the generic object comparison.
    assert_equal.dispatch(object, object)(
        result,
        expected,
        path=path,
        **kwargs
    )
try:
# pull the dshape cases in
from datashape.util.testing import assert_dshape_equal