TST: add tests for quarter estimates

MAINT: modify algorithm for calculating previous releases

BUG: fix quarter calculation logic
This commit is contained in:
Maya Tydykov
2016-08-09 09:52:09 -04:00
parent 1c375806e7
commit afc5297fe3
6 changed files with 326 additions and 140 deletions
+177 -13
View File
@@ -1,14 +1,178 @@
def test_shift_quarters_forward():
quarters = list(range(1, 5))
shifts = list(range(5))
expected = [(x, i) for ]
expected = ((0, 1), (0, 2), (0, 3), (0, 4), (1, 1),
(0, 2), (0, 3), (0, 4), (1, 1), (1, 2))
for quarter in quarters:
for shift in shifts:
yrs_to_shift, new_qtr = EstimizeLoader.calc_forward_shift(quarter,
shift)
if quarter + shift <= 4:
assert yrs_to_shift == 0
assert new_qtr == quarter + shift
from itertools import product
import numpy as np
import pandas as pd
from zipline.pipeline import SimplePipelineEngine, Pipeline
from zipline.pipeline.data import DataSet, Column
from zipline.pipeline.loaders.quarter_estimates import \
NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader
from zipline.testing import ZiplineTestCase
from zipline.testing.fixtures import WithAssetFinder, WithTradingSessions
from zipline.testing.predicates import assert_equal
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
class Estimates(DataSet):
event_date = Column(dtype=datetime64ns_dtype)
fiscal_quarter = Column(dtype=float64_dtype)
fiscal_year = Column(dtype=float64_dtype)
estimate = Column(dtype=float64_dtype)
value = Column(dtype=float64_dtype)
def QuartersEstimates(num_qtr):
class QtrEstimates(Estimates):
num_quarters = num_qtr
name=Estimates
return QtrEstimates
# Final release dates never change
releases = pd.DataFrame({
'sid': [1, 1],
'timestamp': [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-4-20')],
'event_date': [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-04-20')],
'estimate': [0.5, 0.8],
'value': [0.6, 0.9],
'fiscal_quarter': [1, 2],
'fiscal_year': [2015, 2015]
})
estimates = pd.DataFrame({
'sid': [1, 1, 1, 1],
'timestamp': [pd.Timestamp('2015-01-02'),
pd.Timestamp('2015-01-10'),
pd.Timestamp('2015-04-02'),
pd.Timestamp('2015-4-10')],
'event_date': [pd.Timestamp('2015-01-20'),
pd.Timestamp('2015-01-20'),
pd.Timestamp('2015-04-20'),
pd.Timestamp('2015-04-20')],
'estimate': [.1, .2, .3, .4],
'value': [np.NaN, np.NaN, np.NaN, np.NaN],
'fiscal_quarter': [1, 1, 2, 2],
'fiscal_year': [2015, 2015, 2015, 2015]
})
events = pd.concat([releases, estimates])
class NextEstimateTestCase(WithAssetFinder,
WithTradingSessions,
ZiplineTestCase):
START_DATE = pd.Timestamp('2015-01-01')
END_DATE = pd.Timestamp('2015-04-30')
@classmethod
def make_loader(cls, events, columns):
return NextQuartersEstimatesLoader(events, columns)
@classmethod
def init_class_fixtures(cls):
cls.events = events
cls.columns = {
Estimates.estimate: 'estimate',
Estimates.event_date: 'event_date',
Estimates.fiscal_quarter: 'fiscal_quarter',
Estimates.fiscal_year: 'fiscal_year',
Estimates.value: 'value',
}
cls.loader = cls.make_loader(
events=cls.events,
columns=cls.columns
)
cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique())
cls.ASSET_FINDER_EQUITY_SYMBOLS = [
's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
]
super(NextEstimateTestCase, cls).init_class_fixtures()
def test_regular(self):
dataset = QuartersEstimates(1)
engine = SimplePipelineEngine(
lambda x: self.loader,
self.trading_days,
self.asset_finder,
)
results = engine.run_pipeline(
Pipeline({c.name: c.latest for c in dataset.columns}),
start_date=self.trading_days[0],
end_date=self.trading_days[-1],
)
sid_events = results.xs(1, level=1)
ed_sorted_events = self.events.sort(['event_date', 'timestamp'])
for i, date in enumerate(sid_events.index):
# Get all upcoming events that we know about on 'date'
eligible_timestamps = ed_sorted_events[ed_sorted_events['timestamp']
<= date]
eligible_events = eligible_timestamps[eligible_timestamps['event_date'] >= date]
if not eligible_events.empty:
smallest_event_date = eligible_events.iloc[0]['event_date']
expected_event = eligible_events[eligible_events['event_date'] == smallest_event_date].iloc[-1]
for colname in sid_events.columns:
expected_value = expected_event[colname]
computed_value = sid_events.iloc[i][colname]
assert_equal(expected_value, computed_value)
else:
assert sid_events.iloc[i].isnull().all()
class PreviousEstimateTestCase(WithAssetFinder,
WithTradingSessions,
ZiplineTestCase):
START_DATE = pd.Timestamp('2015-01-01')
END_DATE = pd.Timestamp('2015-04-30')
@classmethod
def make_loader(cls, events, columns):
return PreviousQuartersEstimatesLoader(events, columns)
@classmethod
def init_class_fixtures(cls):
cls.events = events
cls.columns = {
Estimates.estimate: 'estimate',
Estimates.event_date: 'event_date',
Estimates.fiscal_quarter: 'fiscal_quarter',
Estimates.fiscal_year: 'fiscal_year',
Estimates.value: 'value',
}
cls.loader = cls.make_loader(
events=cls.events,
columns=cls.columns
)
cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique())
cls.ASSET_FINDER_EQUITY_SYMBOLS = [
's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
]
super(PreviousEstimateTestCase, cls).init_class_fixtures()
def test_regular(self):
dataset = QuartersEstimates(1)
engine = SimplePipelineEngine(
lambda x: self.loader,
self.trading_days,
self.asset_finder,
)
results = engine.run_pipeline(
Pipeline({c.name: c.latest for c in dataset.columns}),
start_date=self.trading_days[0],
end_date=self.trading_days[-1],
)
sid_events = results.xs(1, level=1)
ed_sorted_events = self.events.sort(['event_date', 'timestamp'])
for i, date in enumerate(sid_events.index):
# Filter for events that happened on or before the simulation
# date and that we knew about on or before the simulation date.
ed_eligible_events = ed_sorted_events[ed_sorted_events['event_date'] <= date]
ts_eligible_events = ed_eligible_events[ed_eligible_events['timestamp'] <= date]
if not ts_eligible_events.empty:
# The expected event is the one we knew about last.
expected_event = ts_eligible_events.iloc[-1]
for colname in sid_events.columns:
expected_value = expected_event[colname]
computed_value = sid_events.iloc[i][colname]
assert_equal(expected_value, computed_value)
else:
assert sid_events.iloc[i].isnull().all()
+31 -1
View File
@@ -1219,6 +1219,36 @@ def bind_expression_to_resources(expr, resources):
})
def load_raw_data(assets, dates, data_query_time, data_query_tz, expr,
odo_kwargs):
lower_dt, upper_dt = normalize_data_query_bounds(
dates[0],
dates[-1],
data_query_time,
data_query_tz,
)
raw = ffill_query_in_range(
expr,
lower_dt,
upper_dt,
odo_kwargs,
)
sids = raw.loc[:, SID_FIELD_NAME]
raw.drop(
sids[~sids.isin(assets)].index,
inplace=True
)
if data_query_time is not None:
normalize_timestamp_to_query_time(
raw,
data_query_time,
data_query_tz,
inplace=True,
ts_field=TS_FIELD_NAME,
)
return raw
def ffill_query_in_range(expr,
lower,
upper,
@@ -1273,4 +1303,4 @@ def ffill_query_in_range(expr,
**odo_kwargs
)
raw.loc[:, ts_field] = raw.loc[:, ts_field].astype('datetime64[ns]')
return raw
return raw
+2 -9
View File
@@ -2,24 +2,17 @@ from datashape import istabular
from .core import (
bind_expression_to_resources,
ffill_query_in_range,
load_raw_data,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.events import (
EventsLoader,
required_event_fields,
)
from zipline.pipeline.common import (
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.quarter_estimates import \
NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader
from zipline.pipeline.loaders.utils import (
check_data_query_args,
normalize_data_query_bounds,
normalize_timestamp_to_query_time,
load_raw_data)
)
from zipline.utils.input_validation import ensure_timezone, optionally
from zipline.utils.preprocess import preprocess
+2 -10
View File
@@ -2,22 +2,14 @@ from datashape import istabular
from .core import (
bind_expression_to_resources,
ffill_query_in_range,
load_raw_data,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.events import (
EventsLoader,
required_event_fields,
)
from zipline.pipeline.common import (
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.utils import (
check_data_query_args,
normalize_data_query_bounds,
normalize_timestamp_to_query_time,
load_raw_data)
from zipline.pipeline.loaders.utils import check_data_query_args
from zipline.utils.input_validation import ensure_timezone, optionally
from zipline.utils.preprocess import preprocess
+58 -77
View File
@@ -1,11 +1,17 @@
from itertools import groupby
import numpy as np
import pandas as pd
from six import viewvalues
from zipline.pipeline.common import AD_FIELD_NAME, SID_FIELD_NAME, \
EVENT_DATE_FIELD_NAME, FISCAL_QUARTER_FIELD_NAME, FISCAL_YEAR_FIELD_NAME
from toolz import groupby
from zipline.pipeline.common import (
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.frame import DataFrameLoader
from zipline.pipeline.loaders.utils import calc_backward_shift, \
calc_forward_shift
def required_event_fields(columns):
@@ -15,7 +21,7 @@ def required_event_fields(columns):
"""
# These metadata columns are used to align event indexers.
return {
AD_FIELD_NAME,
TS_FIELD_NAME,
SID_FIELD_NAME,
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
@@ -48,23 +54,6 @@ def validate_column_specs(events, columns):
)
def calc_forward_shift(qtr, num_shifts):
yrs_to_shift, new_qtr = divmod(qtr + num_shifts, 4)
if yrs_to_shift == 1 and new_qtr == 0:
yrs_to_shift = 0
new_qtr = 4
return yrs_to_shift, new_qtr
def calc_backward_shift(qtr, num_shifts):
yrs_to_shift, new_qtr = divmod(abs(num_shifts - qtr), 4)
if yrs_to_shift == 0 and new_qtr == 0:
yrs_to_shift = 1
new_qtr = 4
yrs_to_shift = -yrs_to_shift
return yrs_to_shift, new_qtr
class QuarterEstimatesLoader(PipelineLoader):
def __init__(self,
events,
@@ -75,60 +64,45 @@ class QuarterEstimatesLoader(PipelineLoader):
)
self.events = events[
events[EVENT_DATE_FIELD_NAME].notnull() and
events[FISCAL_QUARTER_FIELD_NAME].notnull() and
events[EVENT_DATE_FIELD_NAME].notnull() &
events[FISCAL_QUARTER_FIELD_NAME].notnull() &
events[FISCAL_YEAR_FIELD_NAME].notnull()
]
self.columns = columns
def load_quarters(self, next_releases, num_quarters, dates_sids, gb):
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
pass
def load_adjusted_array(self, columns, dates, assets, mask):
groups = groupby(lambda x: x.dataset.num_quarters, columns)
out = {}
date_values = pd.DataFrame(dates, columns=['dates'])
date_values = pd.DataFrame({'dates': dates})
date_values['key'] = 1
self.events['key'] = 1
merged = pd.merge(date_values, self.events, on='key')
asset_df = pd.DataFrame(assets, columns=['sid'])
asset_df = pd.DataFrame({'sid': assets})
asset_df['key'] = 1
dates_sids = pd.merge(date_values, asset_df, on='key')
for num_quarters in groups:
columns = groups[num_quarters]
# First, group by sid, fiscal year, and fiscal quarter and only
# keep the last estimate made.
final_releases_per_qtr = merged[merged.asof_date <=
final_releases_per_qtr = merged[merged[TS_FIELD_NAME] <=
merged.dates].sort(
['dates', 'asof_date']
['dates', TS_FIELD_NAME]
).groupby(
['dates', 'sid', 'fiscal_year', 'fiscal_quarter']
).last()
gb = final_releases_per_qtr.reset_index().groupby(['dates', 'sid'])
# Split the date-sid combinations into ones with a next release
# and ones without
eligible_next_releases = pd.concat([group[1] for group in gb if (
group[1][EVENT_DATE_FIELD_NAME] >= group[1]['dates']
).any()])
final_releases_per_qtr = final_releases_per_qtr.reset_index()
eligible_next_releases.sort(EVENT_DATE_FIELD_NAME)
# For each sid, get the next release/year/quarter that we care
# about.
next_releases = eligible_next_releases.groupby(
['dates', 'sid']
).min()
next_releases = next_releases.rename(
columns={'fiscal_year': 'next_fiscal_year',
'fiscal_quarter': 'next_fiscal_quarter'}
)
result = self.load_quarters(next_releases,
num_quarters,
dates_sids)
result = self.load_quarters(num_quarters,
dates_sids,
final_releases_per_qtr)
for c in columns:
column_name = self.columns[c.name]
super_col = getattr(c.dataset.__base__, c.name)
column_name = self.columns[super_col]
# Need to pass a DataFrame that has dates as the index and
# all sids as columns with column values being the value in
# 'result' for column c
@@ -147,9 +121,24 @@ class NextQuartersEstimatesLoader(QuarterEstimatesLoader):
def __init__(self,
events,
columns):
super(NextQuartersEstimatesLoader).__init__(events, columns)
super(NextQuartersEstimatesLoader, self).__init__(events, columns)
def load_quarters(self, next_releases, num_quarters, dates_sids, gb):
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
# Filter for releases that are after each simulation date.
eligible_next_releases = final_releases_per_qtr[
final_releases_per_qtr[EVENT_DATE_FIELD_NAME] >=
final_releases_per_qtr['dates']
]
eligible_next_releases.sort(EVENT_DATE_FIELD_NAME)
# For each sid, get the upcoming release/year/quarter.
next_releases = eligible_next_releases.groupby(
['dates', 'sid']
).min()
next_releases = next_releases.rename(
columns={'fiscal_year': 'next_fiscal_year',
'fiscal_quarter': 'next_fiscal_quarter'}
)
# `next_qtr` is already the next quarter over,
# so we should offest `num_shifts` by 1.
next_releases['fiscal_quarter'] = next_releases.apply(
@@ -175,47 +164,39 @@ class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader):
def __init__(self,
events,
columns):
super(PreviousQuartersEstimatesLoader).__init__(events, columns)
super(PreviousQuartersEstimatesLoader, self).__init__(events, columns)
def load_quarters(self, next_releases, num_quarters, dates_sids, gb):
next_releases['fiscal_quarter'] = next_releases.apply(
lambda x: calc_backward_shift(x['next_fiscal_quarter'],
num_quarters)[1],
axis=1
)
next_releases['fiscal_year'] = next_releases.apply(
lambda x:
x['next_fiscal_year'] +
calc_backward_shift(x['next_fiscal_quarter'],
num_quarters)[0],
axis=1
)
only_previous_releases = pd.concat([group[1] for group in gb if (
group[1][EVENT_DATE_FIELD_NAME] < group[1]['dates']
).all()])
only_previous_releases.sort(EVENT_DATE_FIELD_NAME)
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
# Filter for releases that are before each simulation date.
eligible_previous_releases = final_releases_per_qtr[
final_releases_per_qtr[EVENT_DATE_FIELD_NAME] <=
final_releases_per_qtr['dates']
]
eligible_previous_releases.sort(EVENT_DATE_FIELD_NAME)
# For each sid, get the latest release we knew about prior to
# each simulation date.
previous_releases = only_previous_releases.groupby(['dates',
'sid']).max()
previous_releases = eligible_previous_releases.groupby(
['dates', 'sid']
).max()
previous_releases = previous_releases.rename(columns={
'fiscal_year': 'previous_fiscal_year',
'fiscal_quarter': 'previous_fiscal_quarter'
})
previous_releases['fiscal_quarter'] = previous_releases.apply(
lambda x: calc_backward_shift(x['previous_fiscal_quarter'],
num_quarters)[1],
(num_quarters - 1))[1],
axis=1
)
previous_releases['fiscal_year'] = previous_releases.apply(
lambda x:
x['previous_fiscal_year'] +
x['previous_fiscal_year'] -
calc_backward_shift(x['previous_fiscal_quarter'],
num_quarters)[0],
(num_quarters - 1))[0],
axis=1
)
all_releases = pd.concat([next_releases, previous_releases])
# Merge to get the rows we care about for each date
result = dates_sids.merge(all_releases.reset_index(),
result = dates_sids.merge(previous_releases.reset_index(),
on=(['dates', 'sid']), how='left')
return result
+56 -30
View File
@@ -2,8 +2,6 @@ import datetime
import numpy as np
import pandas as pd
from zipline.pipeline.common import TS_FIELD_NAME, SID_FIELD_NAME
from zipline.pipeline.loaders.blaze.core import ffill_query_in_range
from zipline.utils.pandas_utils import mask_between_time
@@ -276,31 +274,59 @@ def check_data_query_args(data_query_time, data_query_tz):
)
def load_raw_data(assets, dates, data_query_time, data_query_tz, expr,
odo_kwargs):
lower_dt, upper_dt = normalize_data_query_bounds(
dates[0],
dates[-1],
data_query_time,
data_query_tz,
)
raw = ffill_query_in_range(
expr,
lower_dt,
upper_dt,
odo_kwargs,
)
sids = raw.loc[:, SID_FIELD_NAME]
raw.drop(
sids[~sids.isin(assets)].index,
inplace=True
)
if data_query_time is not None:
normalize_timestamp_to_query_time(
raw,
data_query_time,
data_query_tz,
inplace=True,
ts_field=TS_FIELD_NAME,
)
return raw
def calc_forward_shift(qtr, num_qtrs_shift):
"""
Calculate the number of years to shift forward and the new quarter in the
shifted year.
Parameters
----------
qtr : int
The starting quarter.
num_qtr_shift : int
The number of quarters to shift forward.
Returns
-------
yrs_to_shift : int
The number of years to shift forward.
new_qtr : int
The quarter number of the new quarter after shifting num_qtrs_shift
forward from qtr.
"""
yrs_to_shift, new_qtr = divmod(qtr + num_qtrs_shift, 4)
if new_qtr == 0:
yrs_to_shift -= 1
new_qtr = 4
return yrs_to_shift, new_qtr
def calc_backward_shift(qtr, num_qtrs_shift):
"""
Calculate the number of years to shift backward and the new quarter in the
shifted year.
Parameters
----------
qtr : int
The starting quarter.
num_qtr_shift : int
The number of quarters to shift backward.
Returns
-------
yrs_to_shift : int
The number of years to shift backward.
new_qtr : int
The quarter number of the new quarter after shifting num_qtrs_shift
backward from qtr.
"""
if qtr > num_qtrs_shift:
return 0, qtr - num_qtrs_shift
# num_qtrs_shift >= qtr; subtract to offset qtr, then calculate how many
# years/quarters to subtract.
yrs_to_shift, subtract_qtr = divmod(abs(num_qtrs_shift - qtr), 4)
# Must add 1 year since we go backwards at least `qtr` number of quarters
yrs_to_shift += 1
new_qtr = 4 - subtract_qtr
return yrs_to_shift, new_qtr