mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-02 12:29:43 +08:00
TST: add tests for quarter rotation logic
This commit is contained in:
@@ -1,15 +1,24 @@
|
||||
from itertools import product
|
||||
import itertools
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from pandas.util.testing import assert_series_equal
|
||||
from zipline.pipeline import SimplePipelineEngine, Pipeline
|
||||
|
||||
from zipline.pipeline.data import DataSet, Column
|
||||
from zipline.pipeline.loaders.quarter_estimates import \
|
||||
NextQuartersEstimatesLoader, PreviousQuartersEstimatesLoader
|
||||
from zipline.pipeline.loaders.quarter_estimates import (
|
||||
NextQuartersEstimatesLoader,
|
||||
PreviousQuartersEstimatesLoader
|
||||
)
|
||||
from zipline.pipeline.loaders.quarter_estimates import (
|
||||
calc_forward_shift,
|
||||
calc_backward_shift
|
||||
)
|
||||
from zipline.testing import ZiplineTestCase
|
||||
from zipline.testing.fixtures import WithAssetFinder, WithTradingSessions
|
||||
from zipline.testing.predicates import assert_equal
|
||||
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
|
||||
import line_profiler
|
||||
prof = line_profiler.LineProfiler()
|
||||
|
||||
|
||||
class Estimates(DataSet):
|
||||
@@ -23,70 +32,106 @@ class Estimates(DataSet):
|
||||
def QuartersEstimates(num_qtr):
|
||||
class QtrEstimates(Estimates):
|
||||
num_quarters = num_qtr
|
||||
name=Estimates
|
||||
name = Estimates
|
||||
return QtrEstimates
|
||||
|
||||
# Final release dates never change
|
||||
# Final release dates never change. The quarters have very tight date ranges
|
||||
# in order to reduce the number of dates we need to iterate through when
|
||||
# testing.
|
||||
releases = pd.DataFrame({
|
||||
'sid': [1, 1],
|
||||
'timestamp': [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-4-20')],
|
||||
'event_date': [pd.Timestamp('2015-01-20'), pd.Timestamp('2015-04-20')],
|
||||
'timestamp': [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')],
|
||||
'event_date': [pd.Timestamp('2015-01-15'), pd.Timestamp('2015-01-31')],
|
||||
'estimate': [0.5, 0.8],
|
||||
'value': [0.6, 0.9],
|
||||
'fiscal_quarter': [1, 2],
|
||||
'fiscal_year': [2015, 2015]
|
||||
'fiscal_quarter': [1.0, 2.0],
|
||||
'fiscal_year': [2015.0, 2015.0]
|
||||
})
|
||||
|
||||
q1_knowledge_dates = [pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-04'),
|
||||
pd.Timestamp('2015-01-08'), pd.Timestamp('2015-01-12')]
|
||||
q2_knowledge_dates = [pd.Timestamp('2015-01-16'), pd.Timestamp('2015-01-20'),
|
||||
pd.Timestamp('2015-01-24'), pd.Timestamp('2015-01-28')]
|
||||
# We want to model the possibility of an estimate predicting a release date
|
||||
# that gets shifted forward/backward.
|
||||
q1_release_dates = [pd.Timestamp('2015-01-13'), pd.Timestamp('2015-01-15')]
|
||||
q2_release_dates = [pd.Timestamp('2015-01-28'), pd.Timestamp('2015-01-30')]
|
||||
estimates = pd.DataFrame({
|
||||
'sid': [1, 1, 1, 1],
|
||||
'timestamp': [pd.Timestamp('2015-01-02'),
|
||||
pd.Timestamp('2015-01-10'),
|
||||
pd.Timestamp('2015-04-02'),
|
||||
pd.Timestamp('2015-4-10')],
|
||||
'event_date': [pd.Timestamp('2015-01-20'),
|
||||
pd.Timestamp('2015-01-20'),
|
||||
pd.Timestamp('2015-04-20'),
|
||||
pd.Timestamp('2015-04-20')],
|
||||
'estimate': [.1, .2, .3, .4],
|
||||
'value': [np.NaN, np.NaN, np.NaN, np.NaN],
|
||||
'fiscal_quarter': [1, 1, 2, 2],
|
||||
'fiscal_year': [2015, 2015, 2015, 2015]
|
||||
'fiscal_quarter': [1.0, 1.0, 2.0, 2.0],
|
||||
'fiscal_year': [2015.0, 2015.0, 2015.0, 2015.0]
|
||||
})
|
||||
|
||||
events = pd.concat([releases, estimates])
|
||||
|
||||
def gen_estimates():
|
||||
sid_estimates = []
|
||||
sid_releases = []
|
||||
release_dates = list(itertools.product(q1_release_dates, q2_release_dates))
|
||||
knowledge_permutations = list(itertools.permutations(q1_knowledge_dates +
|
||||
q2_knowledge_dates,
|
||||
4))
|
||||
all_permutations = itertools.product(knowledge_permutations,
|
||||
release_dates)
|
||||
for sid, ((q1e1, q1e2, q2e1, q2e2), (rd1, rd2)) in enumerate(
|
||||
all_permutations):
|
||||
# We're assuming that estimates must come before the relevant release.
|
||||
if q1e1 < q1e2 and q2e1 < q2e2 and q1e1 < rd1 and q1e2 < \
|
||||
rd2:
|
||||
sid_estimate = estimates.copy(True)
|
||||
sid_estimate['timestamp'] = [q1e1, q1e2, q2e1, q2e2]
|
||||
sid_estimate['event_date'] = [rd1]*2 + [rd2] * 2
|
||||
sid_estimate['sid'] = sid
|
||||
sid_estimates += [sid_estimate]
|
||||
sid_release = releases.copy(True)
|
||||
sid_release['sid'] = sid_estimate['sid']
|
||||
sid_releases += [sid_release]
|
||||
|
||||
return pd.concat(sid_estimates + sid_releases).reset_index(drop=True)
|
||||
|
||||
|
||||
class NextEstimateTestCase(WithAssetFinder,
|
||||
WithTradingSessions,
|
||||
ZiplineTestCase):
|
||||
START_DATE = pd.Timestamp('2015-01-01')
|
||||
END_DATE = pd.Timestamp('2015-04-30')
|
||||
class EstimateTestCase(WithAssetFinder,
|
||||
WithTradingSessions,
|
||||
ZiplineTestCase):
|
||||
START_DATE = pd.Timestamp('2014-12-28')
|
||||
END_DATE = pd.Timestamp('2015-02-03')
|
||||
|
||||
@classmethod
|
||||
def make_loader(cls, events, columns):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def init_class_fixtures(cls):
|
||||
cls.events = gen_estimates()
|
||||
cls.sids = cls.events['sid'].unique()
|
||||
cls.columns = {
|
||||
Estimates.estimate: 'estimate',
|
||||
Estimates.event_date: 'event_date',
|
||||
Estimates.fiscal_quarter: 'fiscal_quarter',
|
||||
Estimates.fiscal_year: 'fiscal_year',
|
||||
Estimates.value: 'value',
|
||||
}
|
||||
cls.loader = cls.make_loader(
|
||||
events=cls.events,
|
||||
columns=cls.columns
|
||||
)
|
||||
cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique())
|
||||
cls.ASSET_FINDER_EQUITY_SYMBOLS = [
|
||||
's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
|
||||
]
|
||||
super(EstimateTestCase, cls).init_class_fixtures()
|
||||
|
||||
|
||||
class NextEstimateTestCase(EstimateTestCase):
|
||||
@classmethod
|
||||
def make_loader(cls, events, columns):
|
||||
return NextQuartersEstimatesLoader(events, columns)
|
||||
|
||||
@classmethod
|
||||
def init_class_fixtures(cls):
|
||||
cls.events = events
|
||||
cls.columns = {
|
||||
Estimates.estimate: 'estimate',
|
||||
Estimates.event_date: 'event_date',
|
||||
Estimates.fiscal_quarter: 'fiscal_quarter',
|
||||
Estimates.fiscal_year: 'fiscal_year',
|
||||
Estimates.value: 'value',
|
||||
}
|
||||
cls.loader = cls.make_loader(
|
||||
events=cls.events,
|
||||
columns=cls.columns
|
||||
)
|
||||
cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique())
|
||||
cls.ASSET_FINDER_EQUITY_SYMBOLS = [
|
||||
's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
|
||||
]
|
||||
super(NextEstimateTestCase, cls).init_class_fixtures()
|
||||
|
||||
def test_regular(self):
|
||||
#@profile
|
||||
def test_next_estimates(self):
|
||||
"""
|
||||
The goal of this test is to make sure that we select the right
|
||||
datapoint as our 'next' w.r.t each date.
|
||||
"""
|
||||
dataset = QuartersEstimates(1)
|
||||
engine = SimplePipelineEngine(
|
||||
lambda x: self.loader,
|
||||
@@ -99,55 +144,43 @@ class NextEstimateTestCase(WithAssetFinder,
|
||||
start_date=self.trading_days[0],
|
||||
end_date=self.trading_days[-1],
|
||||
)
|
||||
sid_events = results.xs(1, level=1)
|
||||
ed_sorted_events = self.events.sort(['event_date', 'timestamp'])
|
||||
for i, date in enumerate(sid_events.index):
|
||||
# Get all upcoming events that we know about on 'date'
|
||||
eligible_timestamps = ed_sorted_events[ed_sorted_events['timestamp']
|
||||
<= date]
|
||||
eligible_events = eligible_timestamps[eligible_timestamps['event_date'] >= date]
|
||||
if not eligible_events.empty:
|
||||
smallest_event_date = eligible_events.iloc[0]['event_date']
|
||||
expected_event = eligible_events[eligible_events['event_date'] == smallest_event_date].iloc[-1]
|
||||
for colname in sid_events.columns:
|
||||
expected_value = expected_event[colname]
|
||||
computed_value = sid_events.iloc[i][colname]
|
||||
assert_equal(expected_value, computed_value)
|
||||
else:
|
||||
assert sid_events.iloc[i].isnull().all()
|
||||
for sid in self.sids:
|
||||
sid_events = results.xs(sid, level=1)
|
||||
ed_sorted_events = self.events[
|
||||
self.events['sid'] == sid
|
||||
]
|
||||
ed_sorted_events['key'] = 1
|
||||
all_dates = pd.DataFrame({'all_dates': sid_events.index})
|
||||
all_dates['key'] = 1
|
||||
crossproduct = pd.merge(all_dates, ed_sorted_events, on='key')
|
||||
crossproduct = crossproduct[crossproduct['timestamp'] <=
|
||||
crossproduct['all_dates']]
|
||||
crossproduct = crossproduct[crossproduct['event_date'] >=
|
||||
crossproduct['all_dates']]
|
||||
final = crossproduct.sort_values(by=['all_dates',
|
||||
'event_date',
|
||||
'timestamp'],
|
||||
ascending=[True, True,
|
||||
False]).groupby([
|
||||
'all_dates', 'sid']).first().reset_index()
|
||||
final = pd.merge(final, all_dates,
|
||||
how='right').sort_values(by='all_dates').set_index(
|
||||
'all_dates')
|
||||
final.index.name = None
|
||||
for colname in sid_events.columns:
|
||||
assert_series_equal(final[colname], sid_events[colname])
|
||||
|
||||
|
||||
class PreviousEstimateTestCase(WithAssetFinder,
|
||||
WithTradingSessions,
|
||||
ZiplineTestCase):
|
||||
START_DATE = pd.Timestamp('2015-01-01')
|
||||
END_DATE = pd.Timestamp('2015-04-30')
|
||||
|
||||
class PreviousEstimateTestCase(EstimateTestCase):
|
||||
@classmethod
|
||||
def make_loader(cls, events, columns):
|
||||
return PreviousQuartersEstimatesLoader(events, columns)
|
||||
|
||||
@classmethod
|
||||
def init_class_fixtures(cls):
|
||||
cls.events = events
|
||||
cls.columns = {
|
||||
Estimates.estimate: 'estimate',
|
||||
Estimates.event_date: 'event_date',
|
||||
Estimates.fiscal_quarter: 'fiscal_quarter',
|
||||
Estimates.fiscal_year: 'fiscal_year',
|
||||
Estimates.value: 'value',
|
||||
}
|
||||
cls.loader = cls.make_loader(
|
||||
events=cls.events,
|
||||
columns=cls.columns
|
||||
)
|
||||
cls.ASSET_FINDER_EQUITY_SIDS = list(cls.events['sid'].unique())
|
||||
cls.ASSET_FINDER_EQUITY_SYMBOLS = [
|
||||
's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
|
||||
]
|
||||
super(PreviousEstimateTestCase, cls).init_class_fixtures()
|
||||
|
||||
def test_regular(self):
|
||||
def test_previous_estimates(self):
|
||||
"""
|
||||
The goal of this test is to make sure that we select the right
|
||||
datapoint as our 'previous' w.r.t each date.
|
||||
"""
|
||||
dataset = QuartersEstimates(1)
|
||||
engine = SimplePipelineEngine(
|
||||
lambda x: self.loader,
|
||||
@@ -160,19 +193,53 @@ class PreviousEstimateTestCase(WithAssetFinder,
|
||||
start_date=self.trading_days[0],
|
||||
end_date=self.trading_days[-1],
|
||||
)
|
||||
sid_events = results.xs(1, level=1)
|
||||
ed_sorted_events = self.events.sort(['event_date', 'timestamp'])
|
||||
for i, date in enumerate(sid_events.index):
|
||||
# Filter for events that happened on or before the simulation
|
||||
# date and that we knew about on or before the simulation date.
|
||||
ed_eligible_events = ed_sorted_events[ed_sorted_events['event_date'] <= date]
|
||||
ts_eligible_events = ed_eligible_events[ed_eligible_events['timestamp'] <= date]
|
||||
if not ts_eligible_events.empty:
|
||||
# The expected event is the one we knew about last.
|
||||
expected_event = ts_eligible_events.iloc[-1]
|
||||
for colname in sid_events.columns:
|
||||
expected_value = expected_event[colname]
|
||||
computed_value = sid_events.iloc[i][colname]
|
||||
assert_equal(expected_value, computed_value)
|
||||
else:
|
||||
assert sid_events.iloc[i].isnull().all()
|
||||
for sid in self.sids:
|
||||
sid_events = results.xs(sid, level=1)
|
||||
ed_sorted_events = self.events[
|
||||
self.events['sid'] == sid
|
||||
].sort_values(by=['event_date', 'timestamp'])
|
||||
for i, date in enumerate(sid_events.index):
|
||||
# Filter for events that happened on or before the simulation
|
||||
# date and that we knew about on or before the simulation date.
|
||||
ed_eligible_events = ed_sorted_events[ed_sorted_events['event_date'] <= date]
|
||||
ts_eligible_events = ed_eligible_events[ed_eligible_events['timestamp'] <= date]
|
||||
if not ts_eligible_events.empty:
|
||||
# The expected event is the one we knew about last.
|
||||
expected_event = ts_eligible_events.iloc[-1]
|
||||
for colname in sid_events.columns:
|
||||
expected_value = expected_event[colname]
|
||||
computed_value = sid_events.iloc[i][colname]
|
||||
assert_equal(expected_value, computed_value)
|
||||
else:
|
||||
assert sid_events.iloc[i].isnull().all()
|
||||
|
||||
|
||||
class QuarterShiftTestCase(ZiplineTestCase):
|
||||
"""
|
||||
This tests, in isolation, quarter calculation logic for shifting quarters
|
||||
backwards/forwards from a starting point.
|
||||
"""
|
||||
def test_calc_forward_shift(self):
|
||||
input_yrs = pd.Series([0] * 4)
|
||||
input_qtrs = pd.Series(range(1, 5))
|
||||
expected = pd.DataFrame(([yr, qtr] for yr in range(0, 4) for qtr
|
||||
in range(1, 5)))
|
||||
for i in range(0, 8):
|
||||
years, quarters = calc_forward_shift(input_yrs, input_qtrs, i)
|
||||
# Can't use assert_series_equal here with check_names=False
|
||||
# because that still fails due to name differences.
|
||||
assert years.equals(expected[i:i+4].reset_index(drop=True)[0])
|
||||
assert quarters.equals(expected[i:i+4].reset_index(drop=True)[1])
|
||||
|
||||
|
||||
def test_calc_backward_shift(self):
|
||||
input_yrs = pd.Series([0] * 4)
|
||||
input_qtrs = pd.Series(range(4, 0, -1))
|
||||
expected = pd.DataFrame(([yr, qtr] for yr in range(0, -4, -1) for qtr
|
||||
in range(4, 0, -1)))
|
||||
for i in range(0, 8):
|
||||
years, quarters = calc_backward_shift(input_yrs, input_qtrs, i)
|
||||
# Can't use assert_series_equal here with check_names=False
|
||||
# because that still fails due to name differences.
|
||||
assert years.equals(expected[i:i+4].reset_index(drop=True)[0])
|
||||
assert quarters.equals(expected[i:i+4].reset_index(drop=True)[1])
|
||||
|
||||
@@ -12,6 +12,7 @@ from zipline.pipeline.common import (
|
||||
TS_FIELD_NAME,
|
||||
)
|
||||
from zipline.pipeline.loaders.utils import (
|
||||
choose_rows_by_indexer,
|
||||
next_event_indexer,
|
||||
previous_event_indexer,
|
||||
)
|
||||
@@ -166,7 +167,8 @@ class EventsLoader(PipelineLoader):
|
||||
if not columns:
|
||||
return {}
|
||||
|
||||
return self._load_events(
|
||||
return choose_rows_by_indexer(
|
||||
rows=self.events,
|
||||
name_map=self.next_value_columns,
|
||||
indexer=self.next_event_indexer(dates, sids),
|
||||
columns=columns,
|
||||
@@ -179,7 +181,8 @@ class EventsLoader(PipelineLoader):
|
||||
if not columns:
|
||||
return {}
|
||||
|
||||
return self._load_events(
|
||||
return choose_rows_by_indexer(
|
||||
rows=self.events,
|
||||
name_map=self.previous_value_columns,
|
||||
indexer=self.previous_event_indexer(dates, sids),
|
||||
columns=columns,
|
||||
@@ -188,22 +191,6 @@ class EventsLoader(PipelineLoader):
|
||||
mask=mask,
|
||||
)
|
||||
|
||||
def _load_events(self, name_map, indexer, columns, dates, sids, mask):
|
||||
def to_frame(array):
|
||||
return pd.DataFrame(array, index=dates, columns=sids)
|
||||
|
||||
out = {}
|
||||
for c in columns:
|
||||
raw = self.events[name_map[c]][indexer]
|
||||
# indexer will be -1 for locations where we don't have a known
|
||||
# value.
|
||||
raw[indexer < 0] = c.missing_value
|
||||
|
||||
# Delegate the actual array formatting logic to a DataFrameLoader.
|
||||
loader = DataFrameLoader(c, to_frame(raw), adjustments=None)
|
||||
out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c]
|
||||
return out
|
||||
|
||||
def load_adjusted_array(self, columns, dates, sids, mask):
|
||||
n, p = self.split_next_and_previous_event_columns(columns)
|
||||
return merge(
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from six import viewvalues
|
||||
from toolz import groupby
|
||||
@@ -10,8 +11,84 @@ from zipline.pipeline.common import (
|
||||
)
|
||||
from zipline.pipeline.loaders.base import PipelineLoader
|
||||
from zipline.pipeline.loaders.frame import DataFrameLoader
|
||||
from zipline.pipeline.loaders.utils import calc_backward_shift, \
|
||||
calc_forward_shift
|
||||
|
||||
import line_profiler
|
||||
from zipline.pipeline.loaders.utils import choose_rows_by_indexer
|
||||
|
||||
PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter'
|
||||
|
||||
PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year'
|
||||
|
||||
NEXT_FISCAL_QUARTER = 'next_fiscal_quarter'
|
||||
|
||||
NEXT_FISCAL_YEAR = 'next_fiscal_year'
|
||||
|
||||
FISCAL_QUARTER = 'fiscal_quarter'
|
||||
|
||||
FISCAL_YEAR = 'fiscal_year'
|
||||
|
||||
ALL_DATES = 'dates'
|
||||
|
||||
prof = line_profiler.LineProfiler()
|
||||
|
||||
|
||||
#@profile
|
||||
def calc_forward_shift(yrs, qtrs, num_qtrs_shift):
|
||||
"""
|
||||
Calculate the number of years to shift forward and the new quarter in the
|
||||
shifted year.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
qtr : int
|
||||
The starting quarter.
|
||||
num_qtr_shift : int
|
||||
The number of quarters to shift forward.
|
||||
yr : int
|
||||
The starting year.
|
||||
|
||||
Returns
|
||||
-------
|
||||
s : pd.Series
|
||||
A series containins the new year and quarter.
|
||||
"""
|
||||
|
||||
result_qtrs = (qtrs + num_qtrs_shift) % 4
|
||||
result_years = yrs + (qtrs + num_qtrs_shift) // 4
|
||||
to_adjust = result_qtrs[result_qtrs == 0].index
|
||||
result_years.iloc[to_adjust] -= 1
|
||||
result_qtrs.iloc[to_adjust] = 4
|
||||
return result_years, result_qtrs
|
||||
|
||||
|
||||
#@profile
|
||||
def calc_backward_shift(yrs, qtrs, num_qtrs_shift):
|
||||
"""
|
||||
Calculate the number of years to shift backward and the new quarter in the
|
||||
shifted year.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
qtr : int
|
||||
The starting quarter.
|
||||
num_qtr_shift : int
|
||||
The number of quarters to shift backward.
|
||||
yr : int
|
||||
The starting year.
|
||||
|
||||
Returns
|
||||
-------
|
||||
s : pd.Series
|
||||
A series containins the new year and quarter.
|
||||
"""
|
||||
result_qtrs = 4 - (num_qtrs_shift - qtrs) % 4
|
||||
# Must subtract 1 year since we go backwards at least `qtr` number of
|
||||
# quarters
|
||||
result_years = yrs - (num_qtrs_shift - qtrs) // 4 - 1
|
||||
no_yr_boundary_crossed = qtrs[qtrs > num_qtrs_shift].index
|
||||
result_years.iloc[no_yr_boundary_crossed] = yrs.iloc[no_yr_boundary_crossed]
|
||||
result_qtrs.iloc[no_yr_boundary_crossed] = qtrs.iloc[no_yr_boundary_crossed] - num_qtrs_shift
|
||||
return result_years, result_qtrs
|
||||
|
||||
|
||||
def required_event_fields(columns):
|
||||
@@ -56,35 +133,40 @@ def validate_column_specs(events, columns):
|
||||
|
||||
class QuarterEstimatesLoader(PipelineLoader):
|
||||
def __init__(self,
|
||||
events,
|
||||
columns):
|
||||
estimates,
|
||||
base_column_name_map):
|
||||
validate_column_specs(
|
||||
events,
|
||||
columns
|
||||
estimates,
|
||||
base_column_name_map
|
||||
)
|
||||
|
||||
self.events = events[
|
||||
events[EVENT_DATE_FIELD_NAME].notnull() &
|
||||
events[FISCAL_QUARTER_FIELD_NAME].notnull() &
|
||||
events[FISCAL_YEAR_FIELD_NAME].notnull()
|
||||
self.estimates = estimates[
|
||||
estimates[EVENT_DATE_FIELD_NAME].notnull() &
|
||||
estimates[FISCAL_QUARTER_FIELD_NAME].notnull() &
|
||||
estimates[FISCAL_YEAR_FIELD_NAME].notnull()
|
||||
]
|
||||
|
||||
self.columns = columns
|
||||
self.base_column_name_map = base_column_name_map
|
||||
|
||||
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
|
||||
pass
|
||||
|
||||
#@profile
|
||||
def load_adjusted_array(self, columns, dates, assets, mask):
|
||||
groups = groupby(lambda x: x.dataset.num_quarters, columns)
|
||||
out = {}
|
||||
date_values = pd.DataFrame({'dates': dates})
|
||||
date_values['key'] = 1
|
||||
self.events['key'] = 1
|
||||
merged = pd.merge(date_values, self.events, on='key')
|
||||
asset_df = pd.DataFrame({'sid': assets})
|
||||
self.estimates['key'] = 1
|
||||
merged = pd.merge(date_values, self.estimates, on='key')
|
||||
asset_df = pd.DataFrame({SID_FIELD_NAME: assets})
|
||||
asset_df['key'] = 1
|
||||
dates_sids = pd.merge(date_values, asset_df, on='key')
|
||||
merged.drop('key', axis=1, inplace=True)
|
||||
dates_sids.drop('key', axis=1, inplace=True)
|
||||
for num_quarters in groups:
|
||||
name_map = {c: self.base_column_name_map[getattr(c.dataset.__base__, c.name)] for c in columns}
|
||||
|
||||
columns = groups[num_quarters]
|
||||
# First, group by sid, fiscal year, and fiscal quarter and only
|
||||
# keep the last estimate made.
|
||||
@@ -92,7 +174,7 @@ class QuarterEstimatesLoader(PipelineLoader):
|
||||
merged.dates].sort(
|
||||
['dates', TS_FIELD_NAME]
|
||||
).groupby(
|
||||
['dates', 'sid', 'fiscal_year', 'fiscal_quarter']
|
||||
['dates', SID_FIELD_NAME, FISCAL_YEAR, FISCAL_QUARTER]
|
||||
).last()
|
||||
final_releases_per_qtr = final_releases_per_qtr.reset_index()
|
||||
|
||||
@@ -101,15 +183,14 @@ class QuarterEstimatesLoader(PipelineLoader):
|
||||
final_releases_per_qtr)
|
||||
|
||||
for c in columns:
|
||||
super_col = getattr(c.dataset.__base__, c.name)
|
||||
column_name = self.columns[super_col]
|
||||
column_name = name_map[c]
|
||||
# Need to pass a DataFrame that has dates as the index and
|
||||
# all sids as columns with column values being the value in
|
||||
# 'result' for column c
|
||||
loader = DataFrameLoader(
|
||||
c,
|
||||
result.pivot(index='dates',
|
||||
columns='sid',
|
||||
columns=SID_FIELD_NAME,
|
||||
values=column_name),
|
||||
adjustments=None
|
||||
)
|
||||
@@ -118,11 +199,8 @@ class QuarterEstimatesLoader(PipelineLoader):
|
||||
|
||||
|
||||
class NextQuartersEstimatesLoader(QuarterEstimatesLoader):
|
||||
def __init__(self,
|
||||
events,
|
||||
columns):
|
||||
super(NextQuartersEstimatesLoader, self).__init__(events, columns)
|
||||
|
||||
#@profile
|
||||
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
|
||||
# Filter for releases that are after each simulation date.
|
||||
eligible_next_releases = final_releases_per_qtr[
|
||||
@@ -133,39 +211,34 @@ class NextQuartersEstimatesLoader(QuarterEstimatesLoader):
|
||||
eligible_next_releases.sort(EVENT_DATE_FIELD_NAME)
|
||||
# For each sid, get the upcoming release/year/quarter.
|
||||
next_releases = eligible_next_releases.groupby(
|
||||
['dates', 'sid']
|
||||
).min()
|
||||
['dates', SID_FIELD_NAME]
|
||||
).nth(0).reset_index() # We use nth here to avoid forward filling
|
||||
# NaNs, which `first()` will do.
|
||||
next_releases = next_releases.rename(
|
||||
columns={'fiscal_year': 'next_fiscal_year',
|
||||
'fiscal_quarter': 'next_fiscal_quarter'}
|
||||
columns={FISCAL_YEAR: NEXT_FISCAL_YEAR,
|
||||
FISCAL_QUARTER: NEXT_FISCAL_QUARTER}
|
||||
)
|
||||
# `next_qtr` is already the next quarter over,
|
||||
# so we should offest `num_shifts` by 1.
|
||||
next_releases['fiscal_quarter'] = next_releases.apply(
|
||||
lambda x: calc_forward_shift(x['next_fiscal_quarter'],
|
||||
num_quarters - 1)[1],
|
||||
axis=1
|
||||
)
|
||||
next_releases['fiscal_year'] = next_releases.apply(
|
||||
lambda x:
|
||||
x['next_fiscal_year'] +
|
||||
calc_forward_shift(x['next_fiscal_quarter'],
|
||||
num_quarters - 1)[0],
|
||||
axis=1
|
||||
(next_releases[FISCAL_YEAR],
|
||||
next_releases[FISCAL_QUARTER]) = calc_forward_shift(
|
||||
next_releases[NEXT_FISCAL_YEAR],
|
||||
next_releases[NEXT_FISCAL_QUARTER], (num_quarters - 1)
|
||||
)
|
||||
# Merge to get the rows we care about for each date
|
||||
result = dates_sids.merge(next_releases.reset_index(),
|
||||
on=(['dates', 'sid']),
|
||||
result = dates_sids.merge(next_releases,
|
||||
on=(['dates', SID_FIELD_NAME]),
|
||||
how='left')
|
||||
return result
|
||||
|
||||
|
||||
class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader):
|
||||
def __init__(self,
|
||||
events,
|
||||
estimates,
|
||||
columns):
|
||||
super(PreviousQuartersEstimatesLoader, self).__init__(events, columns)
|
||||
super(PreviousQuartersEstimatesLoader, self).__init__(estimates, columns)
|
||||
|
||||
#@profile
|
||||
def load_quarters(self, num_quarters, dates_sids, final_releases_per_qtr):
|
||||
# Filter for releases that are before each simulation date.
|
||||
eligible_previous_releases = final_releases_per_qtr[
|
||||
@@ -177,26 +250,23 @@ class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader):
|
||||
# For each sid, get the latest release we knew about prior to
|
||||
# each simulation date.
|
||||
previous_releases = eligible_previous_releases.groupby(
|
||||
['dates', 'sid']
|
||||
).max()
|
||||
['dates', SID_FIELD_NAME]
|
||||
).nth(-1).reset_index() # We use nth here to avoid forward filling
|
||||
# NaNs, which `last()` will do.
|
||||
|
||||
previous_releases = previous_releases.rename(columns={
|
||||
'fiscal_year': 'previous_fiscal_year',
|
||||
'fiscal_quarter': 'previous_fiscal_quarter'
|
||||
FISCAL_YEAR: PREVIOUS_FISCAL_YEAR,
|
||||
FISCAL_QUARTER: PREVIOUS_FISCAL_QUARTER
|
||||
})
|
||||
previous_releases['fiscal_quarter'] = previous_releases.apply(
|
||||
lambda x: calc_backward_shift(x['previous_fiscal_quarter'],
|
||||
(num_quarters - 1))[1],
|
||||
axis=1
|
||||
)
|
||||
previous_releases['fiscal_year'] = previous_releases.apply(
|
||||
lambda x:
|
||||
x['previous_fiscal_year'] -
|
||||
calc_backward_shift(x['previous_fiscal_quarter'],
|
||||
(num_quarters - 1))[0],
|
||||
axis=1
|
||||
|
||||
(previous_releases[FISCAL_YEAR],
|
||||
previous_releases[FISCAL_QUARTER]) = \
|
||||
calc_backward_shift(
|
||||
previous_releases[PREVIOUS_FISCAL_YEAR], previous_releases[
|
||||
PREVIOUS_FISCAL_QUARTER], (num_quarters - 1)
|
||||
)
|
||||
# Merge to get the rows we care about for each date
|
||||
result = dates_sids.merge(previous_releases.reset_index(),
|
||||
on=(['dates', 'sid']), how='left')
|
||||
result = dates_sids.merge(previous_releases,
|
||||
on=(['dates', SID_FIELD_NAME]), how='left')
|
||||
return result
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ import datetime
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from zipline.pipeline.loaders.frame import DataFrameLoader
|
||||
from zipline.utils.pandas_utils import mask_between_time
|
||||
|
||||
|
||||
@@ -274,59 +275,18 @@ def check_data_query_args(data_query_time, data_query_tz):
|
||||
)
|
||||
|
||||
|
||||
def calc_forward_shift(qtr, num_qtrs_shift):
|
||||
"""
|
||||
Calculate the number of years to shift forward and the new quarter in the
|
||||
shifted year.
|
||||
def choose_rows_by_indexer(rows, name_map, indexer, columns, dates, sids, mask):
|
||||
def to_frame(array):
|
||||
return pd.DataFrame(array, index=dates, columns=sids)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
qtr : int
|
||||
The starting quarter.
|
||||
num_qtr_shift : int
|
||||
The number of quarters to shift forward.
|
||||
out = {}
|
||||
for c in columns:
|
||||
raw = rows[name_map[c]][indexer]
|
||||
# indexer will be -1 for locations where we don't have a known
|
||||
# value.
|
||||
raw[indexer < 0] = c.missing_value
|
||||
|
||||
Returns
|
||||
-------
|
||||
yrs_to_shift : int
|
||||
The number of years to shift forward.
|
||||
new_qtr : int
|
||||
The quarter number of the new quarter after shifting num_qtrs_shift
|
||||
forward from qtr.
|
||||
"""
|
||||
yrs_to_shift, new_qtr = divmod(qtr + num_qtrs_shift, 4)
|
||||
if new_qtr == 0:
|
||||
yrs_to_shift -= 1
|
||||
new_qtr = 4
|
||||
return yrs_to_shift, new_qtr
|
||||
|
||||
|
||||
def calc_backward_shift(qtr, num_qtrs_shift):
|
||||
"""
|
||||
Calculate the number of years to shift backward and the new quarter in the
|
||||
shifted year.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
qtr : int
|
||||
The starting quarter.
|
||||
num_qtr_shift : int
|
||||
The number of quarters to shift backward.
|
||||
|
||||
Returns
|
||||
-------
|
||||
yrs_to_shift : int
|
||||
The number of years to shift backward.
|
||||
new_qtr : int
|
||||
The quarter number of the new quarter after shifting num_qtrs_shift
|
||||
backward from qtr.
|
||||
"""
|
||||
if qtr > num_qtrs_shift:
|
||||
return 0, qtr - num_qtrs_shift
|
||||
# num_qtrs_shift >= qtr; subtract to offset qtr, then calculate how many
|
||||
# years/quarters to subtract.
|
||||
yrs_to_shift, subtract_qtr = divmod(abs(num_qtrs_shift - qtr), 4)
|
||||
# Must add 1 year since we go backwards at least `qtr` number of quarters
|
||||
yrs_to_shift += 1
|
||||
new_qtr = 4 - subtract_qtr
|
||||
return yrs_to_shift, new_qtr
|
||||
# Delegate the actual array formatting logic to a DataFrameLoader.
|
||||
loader = DataFrameLoader(c, to_frame(raw), adjustments=None)
|
||||
out[c] = loader.load_adjusted_array([c], dates, sids, mask)[c]
|
||||
return out
|
||||
Reference in New Issue
Block a user