TST: add cases for shifting release dates

BUG: fix bugs in blaze loader

BUG: call correct method

MAINT: explicitly cast dates column

MAINT: modify code to comply with pandas 0.16.1
This commit is contained in:
Maya Tydykov
2016-08-16 08:28:50 -04:00
parent f1c07708cd
commit ef350f3889
8 changed files with 372 additions and 323 deletions
+176 -81
View File
@@ -1,24 +1,30 @@
import blaze as bz
import itertools
import numpy as np
import pandas as pd
from pandas.util.testing import assert_series_equal
from zipline.pipeline import SimplePipelineEngine, Pipeline
from zipline.pipeline import SimplePipelineEngine, Pipeline
from zipline.pipeline.common import (
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.data import DataSet, Column
from zipline.pipeline.loaders.blaze.estimates import (
BlazeNextEstimatesLoader,
BlazePreviousEstimatesLoader
)
from zipline.pipeline.loaders.quarter_estimates import (
NextQuartersEstimatesLoader,
PreviousQuartersEstimatesLoader
)
from zipline.pipeline.loaders.quarter_estimates import (
calc_forward_shift,
calc_backward_shift
)
from zipline.pipeline.loaders.quarter_estimates import shift_quarters
from zipline.testing import ZiplineTestCase
from zipline.testing.fixtures import WithAssetFinder, WithTradingSessions
from zipline.testing.predicates import assert_equal
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
import line_profiler
prof = line_profiler.LineProfiler()
class Estimates(DataSet):
@@ -39,12 +45,13 @@ def QuartersEstimates(num_qtr):
# in order to reduce the number of dates we need to iterate through when
# testing.
releases = pd.DataFrame({
'timestamp': [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')],
'event_date': [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')],
TS_FIELD_NAME: [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')],
EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-15'),
pd.Timestamp('2015-01-31')],
'estimate': [0.5, 0.8],
'value': [0.6, 0.9],
'fiscal_quarter': [1.0, 2.0],
'fiscal_year': [2015.0, 2015.0]
FISCAL_QUARTER_FIELD_NAME: [1.0, 2.0],
FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0]
})
q1_knowledge_dates = [pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-04'),
@@ -52,40 +59,43 @@ q1_knowledge_dates = [pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-04'),
q2_knowledge_dates = [pd.Timestamp('2015-01-16'), pd.Timestamp('2015-01-20'),
pd.Timestamp('2015-01-24'), pd.Timestamp('2015-01-28')]
# We want to model the possibility of an estimate predicting a release date
# that gets shifted forward/backward.
q1_release_dates = [pd.Timestamp('2015-01-13'), pd.Timestamp('2015-01-15')]
q2_release_dates = [pd.Timestamp('2015-01-28'), pd.Timestamp('2015-01-30')]
# that doesn't match the actual release. This could be done by dynamically
# generating more combinations with different release dates, but that
# significantly increases the amount of time it takes to run the tests. These
# hard-coded cases are sufficient to know that we can update our beliefs when
# we get new information.
q1_release_dates = [pd.Timestamp('2015-01-15'),
pd.Timestamp('2015-01-16')] # One day late
q2_release_dates = [pd.Timestamp('2015-01-30'), # One day early
pd.Timestamp('2015-01-31')]
estimates = pd.DataFrame({
EVENT_DATE_FIELD_NAME: q1_release_dates + q2_release_dates,
'estimate': [.1, .2, .3, .4],
'value': [np.NaN, np.NaN, np.NaN, np.NaN],
'fiscal_quarter': [1.0, 1.0, 2.0, 2.0],
'fiscal_year': [2015.0, 2015.0, 2015.0, 2015.0]
FISCAL_QUARTER_FIELD_NAME: [1.0, 1.0, 2.0, 2.0],
FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0, 2015.0, 2015.0]
})
def gen_estimates():
sid_estimates = []
sid_releases = []
release_dates = list(itertools.product(q1_release_dates, q2_release_dates))
knowledge_permutations = list(itertools.permutations(q1_knowledge_dates +
q2_knowledge_dates,
4))
all_permutations = itertools.product(knowledge_permutations,
release_dates)
for sid, ((q1e1, q1e2, q2e1, q2e2), (rd1, rd2)) in enumerate(
all_permutations):
for sid, (q1e1, q1e2, q2e1, q2e2) in enumerate(
itertools.permutations(q1_knowledge_dates + q2_knowledge_dates,
4)
):
# We're assuming that estimates must come before the relevant release.
if q1e1 < q1e2 and q2e1 < q2e2 and q1e1 < rd1 and q1e2 < \
rd2:
if (q1e1 < q1e2 and
q2e1 < q2e2 and
q1e1 < q1_release_dates[0] and
q1e2 < q1_release_dates[1]):
sid_estimate = estimates.copy(True)
sid_estimate['timestamp'] = [q1e1, q1e2, q2e1, q2e2]
sid_estimate['event_date'] = [rd1]*2 + [rd2] * 2
sid_estimate['sid'] = sid
sid_estimate[TS_FIELD_NAME] = [q1e1, q1e2, q2e1, q2e2]
sid_estimate[SID_FIELD_NAME] = sid
sid_estimates += [sid_estimate]
sid_release = releases.copy(True)
sid_release['sid'] = sid_estimate['sid']
sid_release[SID_FIELD_NAME] = sid_estimate[SID_FIELD_NAME]
sid_releases += [sid_release]
return pd.concat(sid_estimates + sid_releases).reset_index(drop=True)
@@ -105,28 +115,44 @@ class EstimateTestCase(WithAssetFinder,
cls.sids = cls.events['sid'].unique()
cls.columns = {
Estimates.estimate: 'estimate',
Estimates.event_date: 'event_date',
Estimates.fiscal_quarter: 'fiscal_quarter',
Estimates.fiscal_year: 'fiscal_year',
Estimates.event_date: EVENT_DATE_FIELD_NAME,
Estimates.fiscal_quarter: FISCAL_QUARTER_FIELD_NAME,
Estimates.fiscal_year: FISCAL_YEAR_FIELD_NAME,
Estimates.value: 'value',
}
cls.loader = cls.make_loader(
events=cls.events,
columns=cls.columns
)
cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique())
cls.ASSET_FINDER_EQUITY_SIDS = list(
cls.events[SID_FIELD_NAME].unique()
)
cls.ASSET_FINDER_EQUITY_SYMBOLS = [
's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
]
super(EstimateTestCase, cls).init_class_fixtures()
def _test_wrong_num_quarters_passed(self):
with self.assertRaises(ValueError):
dataset = QuartersEstimates(-1)
engine = SimplePipelineEngine(
lambda x: self.loader,
self.trading_days,
self.asset_finder,
)
engine.run_pipeline(
Pipeline({c.name: c.latest for c in dataset.columns}),
start_date=self.trading_days[0],
end_date=self.trading_days[-1],
)
class NextEstimateTestCase(EstimateTestCase):
@classmethod
def make_loader(cls, events, columns):
return NextQuartersEstimatesLoader(events, columns)
#@profile
def test_next_estimates(self):
"""
The goal of this test is to make sure that we select the right
@@ -145,30 +171,62 @@ class NextEstimateTestCase(EstimateTestCase):
end_date=self.trading_days[-1],
)
for sid in self.sids:
sid_events = results.xs(sid, level=1)
ed_sorted_events = self.events[
self.events['sid'] == sid
]
ed_sorted_events['key'] = 1
all_dates = pd.DataFrame({'all_dates': sid_events.index})
all_dates['key'] = 1
crossproduct = pd.merge(all_dates, ed_sorted_events, on='key')
crossproduct = crossproduct[crossproduct['timestamp'] <=
crossproduct['all_dates']]
crossproduct = crossproduct[crossproduct['event_date'] >=
crossproduct['all_dates']]
final = crossproduct.sort_values(by=['all_dates',
'event_date',
'timestamp'],
ascending=[True, True,
False]).groupby([
'all_dates', 'sid']).first().reset_index()
final = pd.merge(final, all_dates,
how='right').sort_values(by='all_dates').set_index(
'all_dates')
final.index.name = None
for colname in sid_events.columns:
assert_series_equal(final[colname], sid_events[colname])
sid_estimates = results.xs(sid, level=1)
ts_sorted_estimates = self.events[
self.events[SID_FIELD_NAME] == sid
].sort(TS_FIELD_NAME)
for i, date in enumerate(sid_estimates.index):
comparable_date = date.tz_localize(None)
# Filter out estimates we don't know about yet.
ts_eligible_estimates = ts_sorted_estimates[
ts_sorted_estimates[TS_FIELD_NAME] <= comparable_date
]
expected_estimate = pd.DataFrame()
if not ts_eligible_estimates.empty:
q1_knowledge = ts_eligible_estimates[
ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 1
]
q2_knowledge = ts_eligible_estimates[
ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 2
]
# If our latest knowledge of q1 is that the release is
# happening on this simulation date or later, then that's
# the estimate we want to use.
if (not q1_knowledge.empty and
q1_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] >=
comparable_date):
expected_estimate = q1_knowledge.iloc[-1]
# If q1 has already happened or we don't know about it
# yet and our latest knowledge indicates that q2 hasn't
# happend yet, then that's the estimate we want to use.
elif (not q2_knowledge.empty and
q2_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] >=
comparable_date):
expected_estimate = q2_knowledge.iloc[-1]
if not expected_estimate.empty:
for colname in sid_estimates.columns:
expected_value = expected_estimate[colname]
computed_value = sid_estimates.iloc[i][colname]
assert_equal(expected_value, computed_value)
else:
assert sid_estimates.iloc[i].isnull().all()
def test_wrong_num_quarters_passed(self):
self._test_wrong_num_quarters_passed()
class BlazeNextEstimateLoaderTestCase(NextEstimateTestCase):
"""
Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader.
"""
@classmethod
def make_loader(cls, events, columns):
return BlazeNextEstimatesLoader(
bz.data(events),
columns,
)
class PreviousEstimateTestCase(EstimateTestCase):
@@ -194,24 +252,62 @@ class PreviousEstimateTestCase(EstimateTestCase):
end_date=self.trading_days[-1],
)
for sid in self.sids:
sid_events = results.xs(sid, level=1)
ed_sorted_events = self.events[
self.events['sid'] == sid
].sort_values(by=['event_date', 'timestamp'])
for i, date in enumerate(sid_events.index):
# Filter for events that happened on or before the simulation
# date and that we knew about on or before the simulation date.
ed_eligible_events = ed_sorted_events[ed_sorted_events['event_date'] <= date]
ts_eligible_events = ed_eligible_events[ed_eligible_events['timestamp'] <= date]
if not ts_eligible_events.empty:
# The expected event is the one we knew about last.
expected_event = ts_eligible_events.iloc[-1]
for colname in sid_events.columns:
expected_value = expected_event[colname]
computed_value = sid_events.iloc[i][colname]
sid_estimates = results.xs(sid, level=1)
ts_sorted_estimates = self.events[
self.events[SID_FIELD_NAME] == sid
].sort(TS_FIELD_NAME)
for i, date in enumerate(sid_estimates.index):
comparable_date = date.tz_localize(None)
# Filter out estimates we don't know about yet.
ts_eligible_estimates = ts_sorted_estimates[
ts_sorted_estimates[TS_FIELD_NAME] <= comparable_date
]
expected_estimate = pd.DataFrame()
if not ts_eligible_estimates.empty:
# Determine the last piece of information we know about
# for q1 and q2. This takes advantage of the fact that we
# only have 2 quarters in the test data.
q1_knowledge = ts_eligible_estimates[
ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 1
]
q2_knowledge = ts_eligible_estimates[
ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 2
]
# The expected estimate will be for q2 if the last thing
# we've seen is that the release date already happened.
# Otherwise, it'll be for q1, as long as the release date
# for q1 has already happened.
if (not q2_knowledge.empty and
q2_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] <=
comparable_date):
expected_estimate = q2_knowledge.iloc[-1]
elif (not q1_knowledge.empty and
q1_knowledge.iloc[-1][EVENT_DATE_FIELD_NAME] <=
comparable_date):
expected_estimate = q1_knowledge.iloc[-1]
if not expected_estimate.empty:
for colname in sid_estimates.columns:
expected_value = expected_estimate[colname]
computed_value = sid_estimates.iloc[i][colname]
assert_equal(expected_value, computed_value)
else:
assert sid_events.iloc[i].isnull().all()
assert sid_estimates.iloc[i].isnull().all()
def test_wrong_num_quarters_passed(self):
self._test_wrong_num_quarters_passed()
class BlazePreviousEstimateLoaderTestCase(PreviousEstimateTestCase):
"""
Run the same tests as EventsLoaderTestCase, but using a BlazeEventsLoader.
"""
@classmethod
def make_loader(cls, events, columns):
return BlazePreviousEstimatesLoader(
bz.data(events),
columns,
)
class QuarterShiftTestCase(ZiplineTestCase):
@@ -225,20 +321,19 @@ class QuarterShiftTestCase(ZiplineTestCase):
expected = pd.DataFrame(([yr, qtr] for yr in range(0, 4) for qtr
in range(1, 5)))
for i in range(0, 8):
years, quarters = calc_forward_shift(input_yrs, input_qtrs, i)
years, quarters = shift_quarters(i, input_yrs, input_qtrs)
# Can't use assert_series_equal here with check_names=False
# because that still fails due to name differences.
assert years.equals(expected[i:i+4].reset_index(drop=True)[0])
assert quarters.equals(expected[i:i+4].reset_index(drop=True)[1])
def test_calc_backward_shift(self):
input_yrs = pd.Series([0] * 4)
input_qtrs = pd.Series(range(4, 0, -1))
expected = pd.DataFrame(([yr, qtr] for yr in range(0, -4, -1) for qtr
in range(4, 0, -1)))
for i in range(0, 8):
years, quarters = calc_backward_shift(input_yrs, input_qtrs, i)
for i in range(0, 8, 1):
years, quarters = shift_quarters(-i, input_yrs, input_qtrs)
# Can't use assert_series_equal here with check_names=False
# because that still fails due to name differences.
assert years.equals(expected[i:i+4].reset_index(drop=True)[0])
+1 -31
View File
@@ -1219,36 +1219,6 @@ def bind_expression_to_resources(expr, resources):
})
def load_raw_data(assets, dates, data_query_time, data_query_tz, expr,
odo_kwargs):
lower_dt, upper_dt = normalize_data_query_bounds(
dates[0],
dates[-1],
data_query_time,
data_query_tz,
)
raw = ffill_query_in_range(
expr,
lower_dt,
upper_dt,
odo_kwargs,
)
sids = raw.loc[:, SID_FIELD_NAME]
raw.drop(
sids[~sids.isin(assets)].index,
inplace=True
)
if data_query_time is not None:
normalize_timestamp_to_query_time(
raw,
data_query_time,
data_query_tz,
inplace=True,
ts_field=TS_FIELD_NAME,
)
return raw
def ffill_query_in_range(expr,
lower,
upper,
@@ -1303,4 +1273,4 @@ def ffill_query_in_range(expr,
**odo_kwargs
)
raw.loc[:, ts_field] = raw.loc[:, ts_field].astype('datetime64[ns]')
return raw
return raw
+16 -46
View File
@@ -2,14 +2,14 @@ from datashape import istabular
from .core import (
bind_expression_to_resources,
load_raw_data,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.events import (
required_event_fields,
from zipline.pipeline.loaders.blaze.utils import load_raw_data
from zipline.pipeline.loaders.quarter_estimates import (
NextQuartersEstimatesLoader,
PreviousQuartersEstimatesLoader,
required_estimates_fields,
)
from zipline.pipeline.loaders.quarter_estimates import \
NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader
from zipline.pipeline.loaders.utils import (
check_data_query_args,
)
@@ -47,7 +47,7 @@ class BlazeEstimatesLoader(PipelineLoader):
And other dataset-specific fields, where each row of the table is a
record including the sid to identify the company, the timestamp where we
learned about the announcement, and the date when the earnings will be z
learned about the announcement, and the date when the earnings will be
announced.
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
@@ -61,8 +61,7 @@ class BlazeEstimatesLoader(PipelineLoader):
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
loader=None):
data_query_tz=None):
dshape = expr.dshape
if not istabular(dshape):
@@ -71,7 +70,7 @@ class BlazeEstimatesLoader(PipelineLoader):
)
required_cols = list(
required_event_fields(columns)
required_estimates_fields(columns)
)
self._expr = bind_expression_to_resources(
expr[required_cols],
@@ -82,15 +81,18 @@ class BlazeEstimatesLoader(PipelineLoader):
check_data_query_args(data_query_time, data_query_tz)
self._data_query_time = data_query_time
self._data_query_tz = data_query_tz
self.loader = loader
def load_adjusted_array(self, columns, dates, assets, mask):
raw = load_raw_data(assets, dates, self._data_query_time,
self._data_query_tz, self._exp, self._odo_kwargs)
raw = load_raw_data(assets,
dates,
self._data_query_time,
self._data_query_tz,
self._expr,
self._odo_kwargs)
return self.loader(
events=raw,
next_value_columns=self._columns,
raw,
self._columns,
).load_adjusted_array(
columns,
dates,
@@ -102,38 +104,6 @@ class BlazeEstimatesLoader(PipelineLoader):
class BlazeNextEstimatesLoader(BlazeEstimatesLoader):
loader = NextQuartersEstimatesLoader
def __init__(self,
expr,
columns,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
loader=None):
super(BlazeNextEstimatesLoader).__init__(expr,
columns,
resources,
odo_kwargs,
data_query_time,
data_query_tz,
loader)
class BlazePreviousEstimatesLoader(BlazeEstimatesLoader):
loader = PreviousQuartersEstimatesLoader
def __init__(self,
expr,
columns,
resources=None,
odo_kwargs=None,
data_query_time=None,
data_query_tz=None,
loader=None):
super(BlazeNextEstimatesLoader).__init__(expr,
columns,
resources,
odo_kwargs,
data_query_time,
data_query_tz,
loader)
+5 -3
View File
@@ -2,14 +2,16 @@ from datashape import istabular
from .core import (
bind_expression_to_resources,
load_raw_data,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.blaze.utils import load_raw_data
from zipline.pipeline.loaders.events import (
EventsLoader,
required_event_fields,
)
from zipline.pipeline.loaders.utils import check_data_query_args
from zipline.pipeline.loaders.utils import (
check_data_query_args,
)
from zipline.utils.input_validation import ensure_timezone, optionally
from zipline.utils.preprocess import preprocess
@@ -29,7 +31,7 @@ class BlazeEventsLoader(PipelineLoader):
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
The timezone to use for the data query cutoff.
dataset : DataSet
The DataSet object for which this loader loads data.
+61
View File
@@ -0,0 +1,61 @@
from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME
from zipline.pipeline.loaders.blaze.core import ffill_query_in_range
from zipline.pipeline.loaders.utils import (
normalize_data_query_bounds,
normalize_timestamp_to_query_time,
)
def load_raw_data(assets, dates, data_query_time, data_query_tz, expr,
odo_kwargs):
"""
given an expression representing data to load, perform normalization and
forward-filling and return the data, materialized.
parameters
----------
assets : pd.int64index
the assets to load data for.
dates : pd.datetimeindex
the simulation dates to load data for.
data_query_time : datetime.time
the time used as cutoff for new information.
data_query_tz : tzinfo
the timezone to normalize your dates to before comparing against
`time`.
expr : expr
the expression representing the data to load.
odo_kwargs : dict, optional
extra keyword arguments to pass to odo when executing the expression.
returns
-------
raw : pd.dataframe
the data symbolized by `expr` materialized in a dataframe.
"""
lower_dt, upper_dt = normalize_data_query_bounds(
dates[0],
dates[-1],
data_query_time,
data_query_tz,
)
raw = ffill_query_in_range(
expr,
lower_dt,
upper_dt,
odo_kwargs,
)
sids = raw.loc[:, SID_FIELD_NAME]
raw.drop(
sids[~sids.isin(assets)].index,
inplace=True
)
if data_query_time is not None:
normalize_timestamp_to_query_time(
raw,
data_query_time,
data_query_tz,
inplace=True,
ts_field=TS_FIELD_NAME,
)
return raw
+19 -4
View File
@@ -5,14 +5,13 @@ from six import viewvalues
from toolz import groupby, merge
from .base import PipelineLoader
from .frame import DataFrameLoader
from zipline.pipeline.common import (
EVENT_DATE_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.frame import DataFrameLoader
from zipline.pipeline.loaders.utils import (
choose_rows_by_indexer,
next_event_indexer,
previous_event_indexer,
)
@@ -167,7 +166,7 @@ class EventsLoader(PipelineLoader):
if not columns:
return {}
return choose_rows_by_indexer(
return self._load_events(
rows=self.events,
name_map=self.next_value_columns,
indexer=self.next_event_indexer(dates, sids),
@@ -181,7 +180,7 @@ class EventsLoader(PipelineLoader):
if not columns:
return {}
return choose_rows_by_indexer(
return self._load_events(
rows=self.events,
name_map=self.previous_value_columns,
indexer=self.previous_event_indexer(dates, sids),
@@ -191,6 +190,22 @@ class EventsLoader(PipelineLoader):
mask=mask,
)
def _load_events(self, name_map, indexer, columns, dates, sids, mask):
def to_frame(array):
return pd.DataFrame(array, index=dates, columns=sids)
out = {}
for c in columns:
raw = self.events[name_map[c]][indexer]
# indexer will be -1 for locations where we don't have a known
# value.
raw[indexer < 0] = c.missing_value
# Delegate the actual array formatting logic to a DataFrameLoader.
loader = DataFrameLoader(c, to_frame(raw), adjustments=None)
out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c]
return out
def load_adjusted_array(self, columns, dates, sids, mask):
n, p = self.split_next_and_previous_event_columns(columns)
return merge(
+94 -140
View File
@@ -1,7 +1,8 @@
import numpy as np
from abc import abstractmethod
import pandas as pd
from six import viewvalues
from toolz import groupby
from zipline.pipeline.common import (
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
@@ -11,90 +12,33 @@ from zipline.pipeline.common import (
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.frame import DataFrameLoader
import line_profiler
from zipline.pipeline.loaders.utils import choose_rows_by_indexer
PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter'
PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year'
from zipline.utils.pandas_utils import cross_product
NEXT_FISCAL_QUARTER = 'next_fiscal_quarter'
NEXT_FISCAL_YEAR = 'next_fiscal_year'
FISCAL_QUARTER = 'fiscal_quarter'
FISCAL_YEAR = 'fiscal_year'
ALL_DATES = 'dates'
prof = line_profiler.LineProfiler()
PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter'
PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year'
SIMULTATION_DATES = 'dates'
#@profile
def calc_forward_shift(yrs, qtrs, num_qtrs_shift):
"""
Calculate the number of years to shift forward and the new quarter in the
shifted year.
Parameters
----------
qtr : int
The starting quarter.
num_qtr_shift : int
The number of quarters to shift forward.
yr : int
The starting year.
Returns
-------
s : pd.Series
A series containins the new year and quarter.
"""
result_qtrs = (qtrs + num_qtrs_shift) % 4
result_years = yrs + (qtrs + num_qtrs_shift) // 4
to_adjust = result_qtrs[result_qtrs == 0].index
result_years.iloc[to_adjust] -= 1
result_qtrs.iloc[to_adjust] = 4
return result_years, result_qtrs
def normalize_quarters(years, quarters):
return years * 4 + quarters - 1
#@profile
def calc_backward_shift(yrs, qtrs, num_qtrs_shift):
"""
Calculate the number of years to shift backward and the new quarter in the
shifted year.
Parameters
----------
qtr : int
The starting quarter.
num_qtr_shift : int
The number of quarters to shift backward.
yr : int
The starting year.
Returns
-------
s : pd.Series
A series containins the new year and quarter.
"""
result_qtrs = 4 - (num_qtrs_shift - qtrs) % 4
# Must subtract 1 year since we go backwards at least `qtr` number of
# quarters
result_years = yrs - (num_qtrs_shift - qtrs) // 4 - 1
no_yr_boundary_crossed = qtrs[qtrs > num_qtrs_shift].index
result_years.iloc[no_yr_boundary_crossed] = yrs.iloc[no_yr_boundary_crossed]
result_qtrs.iloc[no_yr_boundary_crossed] = qtrs.iloc[no_yr_boundary_crossed] - num_qtrs_shift
return result_years, result_qtrs
def split_normalized_quarters(normalized_quarters):
years = normalized_quarters // 4
quarters = normalized_quarters % 4
return years, quarters + 1
def required_event_fields(columns):
def shift_quarters(by, years, quarters):
return split_normalized_quarters(normalize_quarters(years, quarters) + by)
def required_estimates_fields(columns):
"""
Compute the set of resource columns required to serve
``next_value_columns`` and ``previous_value_columns``.
`columns`.
"""
# These metadata columns are used to align event indexers.
return {
@@ -112,16 +56,16 @@ def required_event_fields(columns):
def validate_column_specs(events, columns):
"""
Verify that the columns of ``events`` can be used by an EventsLoader to
serve the BoundColumns described by ``next_value_columns`` and
``previous_value_columns``.
Verify that the columns of ``events`` can be used by a
QuarterEstimatesLoader to serve the BoundColumns described by
`columns`.
"""
required = required_event_fields(columns)
required = required_estimates_fields(columns)
received = set(events.columns)
missing = required - received
if missing:
raise ValueError(
"EventsLoader missing required columns {missing}.\n"
"QuarterEstimatesLoader missing required columns {missing}.\n"
"Got Columns: {received}\n"
"Expected Columns: {required}".format(
missing=sorted(missing),
@@ -148,35 +92,45 @@ class QuarterEstimatesLoader(PipelineLoader):
self.base_column_name_map = base_column_name_map
@abstractmethod
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
pass
#@profile
def load_adjusted_array(self, columns, dates, assets, mask):
# TODO: how can we enforce that datasets have the num_quarters
# attribute, given that they're created dynamically?
groups = groupby(lambda x: x.dataset.num_quarters, columns)
groups_columns = dict(groups)
if (pd.Series(groups_columns.keys()) < 0).any():
raise ValueError("Must pass a number of quarters >= 0")
out = {}
date_values = pd.DataFrame({'dates': dates})
date_values['key'] = 1
self.estimates['key'] = 1
merged = pd.merge(date_values, self.estimates, on='key')
date_values = pd.DataFrame({SIMULTATION_DATES: dates})
# dates column must be of type datetime64[ns] in order for subsequent
# comparisons to work correctly.
date_values[SIMULTATION_DATES] = date_values[
SIMULTATION_DATES
].astype('datetime64[ns]')
estimates_all_dates = cross_product(date_values, self.estimates)
asset_df = pd.DataFrame({SID_FIELD_NAME: assets})
asset_df['key'] = 1
dates_sids = pd.merge(date_values, asset_df, on='key')
merged.drop('key', axis=1, inplace=True)
dates_sids.drop('key', axis=1, inplace=True)
for num_quarters in groups:
name_map = {c: self.base_column_name_map[getattr(c.dataset.__base__, c.name)] for c in columns}
dates_sids = cross_product(date_values, asset_df)
for num_quarters, columns in groups_columns.iteritems():
name_map = {c:
self.base_column_name_map[
getattr(c.dataset.__base__, c.name)
] for c in columns}
columns = groups[num_quarters]
# First, group by sid, fiscal year, and fiscal quarter and only
# keep the last estimate made.
final_releases_per_qtr = merged[merged[TS_FIELD_NAME] <=
merged.dates].sort(
['dates', TS_FIELD_NAME]
).groupby(
['dates', SID_FIELD_NAME, FISCAL_YEAR, FISCAL_QUARTER]
).last()
final_releases_per_qtr = final_releases_per_qtr.reset_index()
# First, determine which estimates we would have known about on
# each date. Then, Sort by timestamp and group to find the latest
# estimate for each quarter.
final_releases_per_qtr = estimates_all_dates[
estimates_all_dates[TS_FIELD_NAME] <=
estimates_all_dates.dates
].sort([TS_FIELD_NAME]).groupby(
[SIMULTATION_DATES,
SID_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME]
).nth(-1).reset_index()
result = self.load_quarters(num_quarters,
dates_sids,
@@ -184,50 +138,51 @@ class QuarterEstimatesLoader(PipelineLoader):
for c in columns:
column_name = name_map[c]
# Need to pass a DataFrame that has dates as the index and
# all sids as columns with column values being the value in
# 'result' for column c
# Pivot to get a DataFrame with dates as the index and
# sids as the columns.
loader = DataFrameLoader(
c,
result.pivot(index='dates',
result.pivot(index=SIMULTATION_DATES,
columns=SID_FIELD_NAME,
values=column_name),
adjustments=None
)
out[c] = loader.load_adjusted_array([c], dates, assets, mask)[c]
out[c] = loader.load_adjusted_array([c],
dates,
assets,
mask)[c]
return out
class NextQuartersEstimatesLoader(QuarterEstimatesLoader):
#@profile
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
# Filter for releases that are after each simulation date.
# Filter for releases that are on or after each simulation date.
eligible_next_releases = final_releases_per_qtr[
final_releases_per_qtr[EVENT_DATE_FIELD_NAME] >=
final_releases_per_qtr['dates']
final_releases_per_qtr[SIMULTATION_DATES]
]
# For each sid, get the upcoming release.
eligible_next_releases.sort(EVENT_DATE_FIELD_NAME)
# For each sid, get the upcoming release/year/quarter.
next_releases = eligible_next_releases.groupby(
['dates', SID_FIELD_NAME]
[SIMULTATION_DATES, SID_FIELD_NAME]
).nth(0).reset_index() # We use nth here to avoid forward filling
# NaNs, which `first()` will do.
next_releases = next_releases.rename(
columns={FISCAL_YEAR: NEXT_FISCAL_YEAR,
FISCAL_QUARTER: NEXT_FISCAL_QUARTER}
columns={FISCAL_YEAR_FIELD_NAME: NEXT_FISCAL_YEAR,
FISCAL_QUARTER_FIELD_NAME: NEXT_FISCAL_QUARTER}
)
# `next_qtr` is already the next quarter over,
# so we should offest `num_shifts` by 1.
(next_releases[FISCAL_YEAR],
next_releases[FISCAL_QUARTER]) = calc_forward_shift(
# The next fiscal quarter is already our starting point,
# so we should offset `num_quarters` by 1.
(next_releases[FISCAL_YEAR_FIELD_NAME],
next_releases[FISCAL_QUARTER_FIELD_NAME]) = shift_quarters(
(num_quarters - 1),
next_releases[NEXT_FISCAL_YEAR],
next_releases[NEXT_FISCAL_QUARTER], (num_quarters - 1)
next_releases[NEXT_FISCAL_QUARTER],
)
# Merge to get the rows we care about for each date
# Do a left merge to get values for each date.
result = dates_sids.merge(next_releases,
on=(['dates', SID_FIELD_NAME]),
on=([SIMULTATION_DATES, SID_FIELD_NAME]),
how='left')
return result
@@ -236,37 +191,36 @@ class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader):
def __init__(self,
estimates,
columns):
super(PreviousQuartersEstimatesLoader, self).__init__(estimates, columns)
super(PreviousQuartersEstimatesLoader, self).__init__(estimates,
columns)
#@profile
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
# Filter for releases that are before each simulation date.
# Filter for releases that are on or before each simulation date.
eligible_previous_releases = final_releases_per_qtr[
final_releases_per_qtr[EVENT_DATE_FIELD_NAME] <=
final_releases_per_qtr['dates']
final_releases_per_qtr[SIMULTATION_DATES]
]
# For each sid, get the latest release.
eligible_previous_releases.sort(EVENT_DATE_FIELD_NAME)
# For each sid, get the latest release we knew about prior to
# each simulation date.
previous_releases = eligible_previous_releases.groupby(
['dates', SID_FIELD_NAME]
[SIMULTATION_DATES, SID_FIELD_NAME]
).nth(-1).reset_index() # We use nth here to avoid forward filling
# NaNs, which `last()` will do.
previous_releases = previous_releases.rename(columns={
FISCAL_YEAR: PREVIOUS_FISCAL_YEAR,
FISCAL_QUARTER: PREVIOUS_FISCAL_QUARTER
FISCAL_YEAR_FIELD_NAME: PREVIOUS_FISCAL_YEAR,
FISCAL_QUARTER_FIELD_NAME: PREVIOUS_FISCAL_QUARTER
})
(previous_releases[FISCAL_YEAR],
previous_releases[FISCAL_QUARTER]) = \
calc_backward_shift(
previous_releases[PREVIOUS_FISCAL_YEAR], previous_releases[
PREVIOUS_FISCAL_QUARTER], (num_quarters - 1)
# The previous fiscal quarter is already our starting point,
# so we should offset `num_quarters` by 1.
(previous_releases[FISCAL_YEAR_FIELD_NAME],
previous_releases[FISCAL_QUARTER_FIELD_NAME]) = shift_quarters(
-(num_quarters - 1),
previous_releases[PREVIOUS_FISCAL_YEAR],
previous_releases[PREVIOUS_FISCAL_QUARTER],
)
# Merge to get the rows we care about for each date
# Do a left merge to get values for each date.
result = dates_sids.merge(previous_releases,
on=(['dates', SID_FIELD_NAME]), how='left')
on=([SIMULTATION_DATES,
SID_FIELD_NAME]),
how='left')
return result
-18
View File
@@ -2,7 +2,6 @@ import datetime
import numpy as np
import pandas as pd
from zipline.pipeline.loaders.frame import DataFrameLoader
from zipline.utils.pandas_utils import mask_between_time
@@ -273,20 +272,3 @@ def check_data_query_args(data_query_time, data_query_tz):
data_query_tz,
),
)
def choose_rows_by_indexer(rows, name_map, indexer, columns, dates, sids, mask):
def to_frame(array):
return pd.DataFrame(array, index=dates, columns=sids)
out = {}
for c in columns:
raw = rows[name_map[c]][indexer]
# indexer will be -1 for locations where we don't have a known
# value.
raw[indexer < 0] = c.missing_value
# Delegate the actual array formatting logic to a DataFrameLoader.
loader = DataFrameLoader(c, to_frame(raw), adjustments=None)
out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c]
return out