TST: add test for sid with no data

MAINT: optimization - only look at assets appearing in data

TST: simplify test

DOC: add documentation for checkpoints

MAINT: explicitly cast event date field to datetime

MAINT: add back import

TST: fix indexing to remove setting wtih copy warning
This commit is contained in:
Maya Tydykov
2016-09-14 10:26:29 -04:00
parent 8d8f057120
commit cbd9bd068c
9 changed files with 776 additions and 642 deletions
+96 -96
View File
@@ -4,7 +4,6 @@ from nose.tools import assert_true
from nose_parameterized import parameterized
import numpy as np
import pandas as pd
from pandas.util.testing import assert_frame_equal, assert_series_equal
from toolz import merge
from zipline.pipeline import SimplePipelineEngine, Pipeline, CustomFactor
@@ -21,11 +20,11 @@ from zipline.pipeline.loaders.blaze.estimates import (
BlazeNextEstimatesLoader,
BlazePreviousEstimatesLoader
)
from zipline.pipeline.loaders.quarter_estimates import (
from zipline.pipeline.loaders.earnings_estimates import (
INVALID_NUM_QTRS_MESSAGE,
NextQuartersEstimatesLoader,
NextEarningsEstimatesLoader,
normalize_quarters,
PreviousQuartersEstimatesLoader,
PreviousEarningsEstimatesLoader,
split_normalized_quarters,
)
from zipline.testing.fixtures import (
@@ -78,7 +77,7 @@ class WithEstimates(WithTradingSessions, WithAssetFinder):
# Short window defined in order for test to run faster.
START_DATE = pd.Timestamp('2014-12-28')
END_DATE = pd.Timestamp('2015-02-03')
END_DATE = pd.Timestamp('2015-02-04')
@classmethod
def make_loader(cls, events, columns):
@@ -88,10 +87,14 @@ class WithEstimates(WithTradingSessions, WithAssetFinder):
def make_events(cls):
raise NotImplementedError('make_events')
@classmethod
def get_sids(cls):
return cls.events[SID_FIELD_NAME].unique()
@classmethod
def init_class_fixtures(cls):
cls.events = cls.make_events()
cls.sids = cls.events[SID_FIELD_NAME].unique()
cls.ASSET_FINDER_EQUITY_SIDS = cls.get_sids()
cls.columns = {
Estimates.event_date: 'event_date',
Estimates.fiscal_quarter: 'fiscal_quarter',
@@ -101,9 +104,6 @@ class WithEstimates(WithTradingSessions, WithAssetFinder):
cls.loader = cls.make_loader(cls.events, {column.name: val for
column, val in
cls.columns.items()})
cls.ASSET_FINDER_EQUITY_SIDS = list(
cls.events[SID_FIELD_NAME].unique()
)
cls.ASSET_FINDER_EQUITY_SYMBOLS = [
's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
]
@@ -190,7 +190,7 @@ class PreviousWithWrongNumQuarters(WithWrongLoaderDefinition,
"""
@classmethod
def make_loader(cls, events, columns):
return PreviousQuartersEstimatesLoader(events, columns)
return PreviousEarningsEstimatesLoader(events, columns)
class NextWithWrongNumQuarters(WithWrongLoaderDefinition,
@@ -201,7 +201,7 @@ class NextWithWrongNumQuarters(WithWrongLoaderDefinition,
"""
@classmethod
def make_loader(cls, events, columns):
return NextQuartersEstimatesLoader(events, columns)
return NextEarningsEstimatesLoader(events, columns)
class WithEstimatesTimeZero(WithEstimates):
@@ -234,24 +234,27 @@ class WithEstimatesTimeZero(WithEstimates):
Tests that we get the right 'time zero' value on each day for each
sid and for each column.
"""
# Shorter date range for performance
END_DATE = pd.Timestamp('2015-01-28')
q1_knowledge_dates = [pd.Timestamp('2015-01-01'),
pd.Timestamp('2015-01-04'),
pd.Timestamp('2015-01-08'),
pd.Timestamp('2015-01-12')]
q2_knowledge_dates = [pd.Timestamp('2015-01-16'),
pd.Timestamp('2015-01-07'),
pd.Timestamp('2015-01-11')]
q2_knowledge_dates = [pd.Timestamp('2015-01-14'),
pd.Timestamp('2015-01-17'),
pd.Timestamp('2015-01-20'),
pd.Timestamp('2015-01-24'),
pd.Timestamp('2015-01-28')]
pd.Timestamp('2015-01-23')]
# We want to model the possibility of an estimate predicting a release date
# that doesn't match the actual release. This could be done by dynamically
# generating more combinations with different release dates, but that
# significantly increases the amount of time it takes to run the tests.
# These hard-coded cases are sufficient to know that we can update our
# beliefs when we get new information.
q1_release_dates = [pd.Timestamp('2015-01-15'),
pd.Timestamp('2015-01-16')] # One day late
q2_release_dates = [pd.Timestamp('2015-01-30'), # One day early
pd.Timestamp('2015-01-31')]
q1_release_dates = [pd.Timestamp('2015-01-13'),
pd.Timestamp('2015-01-14')] # One day late
q2_release_dates = [pd.Timestamp('2015-01-25'), # One day early
pd.Timestamp('2015-01-26')]
@classmethod
def make_events(cls):
@@ -300,8 +303,15 @@ class WithEstimatesTimeZero(WithEstimates):
q2e2,
sid))
sid_releases.append(cls.create_releases_df(sid))
return pd.concat(sid_estimates +
sid_releases).reset_index(drop=True)
return pd.concat(sid_estimates + sid_releases).reset_index(drop=True)
@classmethod
def get_sids(cls):
sids = cls.events[SID_FIELD_NAME].unique()
# Tack on an extra sid to make sure that sids with no data are
# included but have all-null columns.
return list(sids) + [max(sids) + 1]
@classmethod
def create_releases_df(cls, sid):
@@ -309,10 +319,10 @@ class WithEstimatesTimeZero(WithEstimates):
# ranges in order to reduce the number of dates we need to iterate
# through when testing.
return pd.DataFrame({
TS_FIELD_NAME: [pd.Timestamp('2015-01-15'),
pd.Timestamp('2015-01-31')],
EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-15'),
pd.Timestamp('2015-01-31')],
TS_FIELD_NAME: [pd.Timestamp('2015-01-13'),
pd.Timestamp('2015-01-26')],
EVENT_DATE_FIELD_NAME: [pd.Timestamp('2015-01-13'),
pd.Timestamp('2015-01-26')],
'estimate': [0.5, 0.8],
FISCAL_QUARTER_FIELD_NAME: [1.0, 2.0],
FISCAL_YEAR_FIELD_NAME: [2015.0, 2015.0],
@@ -337,8 +347,6 @@ class WithEstimatesTimeZero(WithEstimates):
@classmethod
def init_class_fixtures(cls):
# Must be generated before call to super since super uses `events`.
cls.events = cls.make_events()
super(WithEstimatesTimeZero, cls).init_class_fixtures()
def get_expected_estimate(self,
@@ -356,58 +364,42 @@ class WithEstimatesTimeZero(WithEstimates):
)
results = engine.run_pipeline(
Pipeline({c.name: c.latest for c in dataset.columns}),
start_date=self.trading_days[0],
end_date=self.trading_days[-1],
start_date=self.trading_days[1],
end_date=self.trading_days[-2],
)
for sid in self.sids:
for sid in self.ASSET_FINDER_EQUITY_SIDS:
sid_estimates = results.xs(sid, level=1)
ts_sorted_estimates = self.events[
self.events[SID_FIELD_NAME] == sid
].sort(TS_FIELD_NAME)
for i, date in enumerate(sid_estimates.index):
comparable_date = date.tz_localize(None)
# Filter out estimates we don't know about yet.
ts_eligible_estimates = ts_sorted_estimates[
ts_sorted_estimates[TS_FIELD_NAME] <= comparable_date
# Separate assertion for all-null DataFrame to avoid setting
# column dtypes on `all_expected`.
if sid == max(self.ASSET_FINDER_EQUITY_SIDS):
assert_true(sid_estimates.isnull().all().all())
else:
ts_sorted_estimates = self.events[
self.events[SID_FIELD_NAME] == sid
].sort(TS_FIELD_NAME)
q1_knowledge = ts_sorted_estimates[
ts_sorted_estimates[FISCAL_QUARTER_FIELD_NAME] == 1
]
# If there are estimates we know about:
if not ts_eligible_estimates.empty:
# Determine the last piece of information we know about
# for q1 and q2. This takes advantage of the fact that we
# only have 2 quarters in the test data.
q1_knowledge = ts_eligible_estimates[
ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 1
]
q2_knowledge = ts_eligible_estimates[
ts_eligible_estimates[FISCAL_QUARTER_FIELD_NAME] == 2
]
expected_estimate = self.get_expected_estimate(
q1_knowledge,
q2_knowledge,
comparable_date,
)
# Have to explicitly check for None because
# `expected_estimate` might be a DataFrame.
if expected_estimate is not None:
assert_series_equal(
sid_estimates.iloc[i],
expected_estimate[sid_estimates.columns],
check_names=False
)
else:
# There are no eligible 'next'/'previous' estimates on
# this day; everything should be null.
assert_true(sid_estimates.iloc[i].isnull().all())
else:
# We don't know about any estimates on this day;
# everything should be null.
assert_true(sid_estimates.iloc[i].isnull().all())
q2_knowledge = ts_sorted_estimates[
ts_sorted_estimates[FISCAL_QUARTER_FIELD_NAME] == 2
]
all_expected = pd.concat(
[self.get_expected_estimate(
q1_knowledge[q1_knowledge[TS_FIELD_NAME] <=
date.tz_localize(None)],
q2_knowledge[q2_knowledge[TS_FIELD_NAME] <=
date.tz_localize(None)],
date.tz_localize(None),
).set_index([[date]]) for date in sid_estimates.index],
axis=0)
assert_equal(all_expected[sid_estimates.columns],
sid_estimates)
class NextEstimate(WithEstimatesTimeZero, ZiplineTestCase):
@classmethod
def make_loader(cls, events, columns):
return NextQuartersEstimatesLoader(events, columns)
return NextEarningsEstimatesLoader(events, columns)
def get_expected_estimate(self,
q1_knowledge,
@@ -419,15 +411,16 @@ class NextEstimate(WithEstimatesTimeZero, ZiplineTestCase):
if (not q1_knowledge.empty and
q1_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] >=
comparable_date):
return q1_knowledge.iloc[-1]
return q1_knowledge.iloc[-1:]
# If q1 has already happened or we don't know about it
# yet and our latest knowledge indicates that q2 hasn't
# happened yet, then that's the estimate we want to use.
elif (not q2_knowledge.empty and
q2_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] >=
comparable_date):
return q2_knowledge.iloc[-1]
return None
return q2_knowledge.iloc[-1:]
return pd.DataFrame(columns=q1_knowledge.columns,
index=[comparable_date])
class BlazeNextEstimateLoaderTestCase(NextEstimate):
@@ -446,7 +439,7 @@ class BlazeNextEstimateLoaderTestCase(NextEstimate):
class PreviousEstimate(WithEstimatesTimeZero, ZiplineTestCase):
@classmethod
def make_loader(cls, events, columns):
return PreviousQuartersEstimatesLoader(events, columns)
return PreviousEarningsEstimatesLoader(events, columns)
def get_expected_estimate(self,
q1_knowledge,
@@ -460,12 +453,13 @@ class PreviousEstimate(WithEstimatesTimeZero, ZiplineTestCase):
if (not q2_knowledge.empty and
q2_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] <=
comparable_date):
return q2_knowledge.iloc[-1]
return q2_knowledge.iloc[-1:]
elif (not q1_knowledge.empty and
q1_knowledge[EVENT_DATE_FIELD_NAME].iloc[-1] <=
comparable_date):
return q1_knowledge.iloc[-1]
return None
return q1_knowledge.iloc[-1:]
return pd.DataFrame(columns=q1_knowledge.columns,
index=[comparable_date])
class BlazePreviousEstimateLoaderTestCase(PreviousEstimate):
@@ -572,8 +566,8 @@ class WithEstimateMultipleQuarters(WithEstimates):
# quarters out for each of the dataset columns.
assert_equal(sorted(np.array(q1_columns + q2_columns)),
sorted(results.columns.values))
assert_frame_equal(self.expected_out.sort(axis=1),
results.xs(0, level=1).sort(axis=1))
assert_equal(self.expected_out.sort(axis=1),
results.xs(0, level=1).sort(axis=1))
class NextEstimateMultipleQuarters(
@@ -581,17 +575,19 @@ class NextEstimateMultipleQuarters(
):
@classmethod
def make_loader(cls, events, columns):
return NextQuartersEstimatesLoader(events, columns)
return NextEarningsEstimatesLoader(events, columns)
@classmethod
def fill_expected_out(cls, expected):
# Fill columns for 1 Q out
for raw_name in cls.columns.values():
expected[raw_name + '1'].loc[
pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-11')
expected.loc[
pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-11'),
raw_name + '1'
] = cls.events[raw_name].iloc[0]
expected[raw_name + '1'].loc[
pd.Timestamp('2015-01-11'):pd.Timestamp('2015-01-20')
expected.loc[
pd.Timestamp('2015-01-11'):pd.Timestamp('2015-01-20'),
raw_name + '1'
] = cls.events[raw_name].iloc[1]
# Fill columns for 2 Q out
@@ -599,19 +595,23 @@ class NextEstimateMultipleQuarters(
# Q1's event happens; after Q1's event, we know 1 Q out but not 2 Qs
# out.
for col_name in ['estimate', 'event_date']:
expected[col_name + '2'].loc[
pd.Timestamp('2015-01-06'):pd.Timestamp('2015-01-10')
expected.loc[
pd.Timestamp('2015-01-06'):pd.Timestamp('2015-01-10'),
col_name + '2'
] = cls.events[col_name].iloc[1]
# But we know what FQ and FY we'd need in both Q1 and Q2
# because we know which FQ is next and can calculate from there
expected[FISCAL_QUARTER_FIELD_NAME + '2'].loc[
pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-09')
expected.loc[
pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-09'),
FISCAL_QUARTER_FIELD_NAME + '2'
] = 2
expected[FISCAL_QUARTER_FIELD_NAME + '2'].loc[
pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20')
expected.loc[
pd.Timestamp('2015-01-12'):pd.Timestamp('2015-01-20'),
FISCAL_QUARTER_FIELD_NAME + '2'
] = 3
expected[FISCAL_YEAR_FIELD_NAME + '2'].loc[
pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-20')
expected.loc[
pd.Timestamp('2015-01-01'):pd.Timestamp('2015-01-20'),
FISCAL_YEAR_FIELD_NAME + '2'
] = 2015
return expected
@@ -624,7 +624,7 @@ class PreviousEstimateMultipleQuarters(
@classmethod
def make_loader(cls, events, columns):
return PreviousQuartersEstimatesLoader(events, columns)
return PreviousEarningsEstimatesLoader(events, columns)
@classmethod
def fill_expected_out(cls, expected):
@@ -804,7 +804,7 @@ class WithEstimateWindows(WithEstimates):
class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase):
@classmethod
def make_loader(cls, events, columns):
return PreviousQuartersEstimatesLoader(events, columns)
return PreviousEarningsEstimatesLoader(events, columns)
@classmethod
def make_expected_timelines(cls):
@@ -867,7 +867,7 @@ class PreviousEstimateWindows(WithEstimateWindows, ZiplineTestCase):
class NextEstimateWindows(WithEstimateWindows, ZiplineTestCase):
@classmethod
def make_loader(cls, events, columns):
return NextQuartersEstimatesLoader(events, columns)
return NextEarningsEstimatesLoader(events, columns)
@classmethod
def make_expected_timelines(cls):
+19 -3
View File
@@ -1028,7 +1028,6 @@ class BlazeLoader(dict):
return odo(e[predicate][colnames], pd.DataFrame, **odo_kwargs)
lower, materialized_checkpoints = get_materialized_checkpoints(
checkpoints, colnames, lower_dt, odo_kwargs
)
@@ -1158,6 +1157,22 @@ def bind_expression_to_resources(expr, resources):
def get_materialized_checkpoints(checkpoints, colnames, lower_dt, odo_kwargs):
"""
Computes a lower bound and a DataFrame checkpoints.
Parameters
----------
checkpoints : Expr
Bound blaze expression for a checkpoints table from which to get a
computed lower bound.
colnames : iterable of str
The names of the columns for which checkpoints should be computed.
lower_dt : pd.Timestamp
The lower date being queried for that serves as an upper bound for
checkpoints.
odo_kwargs : dict, optional
The extra keyword arguments to pass to ``odo``.
"""
if checkpoints is not None:
ts = checkpoints[TS_FIELD_NAME]
checkpoints_ts = odo(ts[ts <= lower_dt].max(), pd.Timestamp)
@@ -1194,12 +1209,13 @@ def ffill_query_in_range(expr,
The lower date to query for.
upper : datetime
The upper date to query for.
checkpoints : Expr, optional
Bound blaze expression for a checkpoints table from which to get a
computed lower bound.
odo_kwargs : dict, optional
The extra keyword arguments to pass to ``odo``.
ts_field : str, optional
The name of the timestamp field in the given blaze expression.
sid_field : str, optional
The name of the sid field in the given blaze expression.
Returns
-------
+22 -5
View File
@@ -3,11 +3,18 @@ from datashape import istabular
from .core import (
bind_expression_to_resources,
)
from zipline.pipeline.common import (
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.blaze.utils import load_raw_data
from zipline.pipeline.loaders.quarter_estimates import (
NextQuartersEstimatesLoader,
PreviousQuartersEstimatesLoader,
from zipline.pipeline.loaders.earnings_estimates import (
NextEarningsEstimatesLoader,
PreviousEarningsEstimatesLoader,
required_estimates_fields,
)
from zipline.pipeline.loaders.utils import (
@@ -35,6 +42,9 @@ class BlazeEstimatesLoader(PipelineLoader):
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
The timezeone to use for the data query cutoff.
checkpoints : Expr, optional
The expression representing checkpointed data to be used for faster
forward-filling of data from `expr`.
Notes
-----
@@ -55,6 +65,13 @@ class BlazeEstimatesLoader(PipelineLoader):
If the '{TS_FIELD_NAME}' field is not included it is assumed that we
start the backtest with knowledge of all announcements.
"""
__doc__ = __doc__.format(
SID_FIELD_NAME=SID_FIELD_NAME,
TS_FIELD_NAME=TS_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME=FISCAL_YEAR_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME=FISCAL_QUARTER_FIELD_NAME,
EVENT_DATE_FIELD_NAME=EVENT_DATE_FIELD_NAME,
)
@preprocess(data_query_tz=optionally(ensure_timezone))
def __init__(self,
@@ -107,8 +124,8 @@ class BlazeEstimatesLoader(PipelineLoader):
class BlazeNextEstimatesLoader(BlazeEstimatesLoader):
loader = NextQuartersEstimatesLoader
loader = NextEarningsEstimatesLoader
class BlazePreviousEstimatesLoader(BlazeEstimatesLoader):
loader = PreviousQuartersEstimatesLoader
loader = PreviousEarningsEstimatesLoader
+6
View File
@@ -3,6 +3,8 @@ from datashape import istabular
from .core import (
bind_expression_to_resources,
)
from zipline.pipeline.common import SID_FIELD_NAME, TS_FIELD_NAME, \
EVENT_DATE_FIELD_NAME
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.pipeline.loaders.blaze.utils import load_raw_data
from zipline.pipeline.loaders.events import (
@@ -55,6 +57,10 @@ class BlazeEventsLoader(PipelineLoader):
start the backtest with knowledge of all announcements.
"""
__doc__ == __doc__.format(SID_FIELD_NAME=SID_FIELD_NAME,
TS_FIELD_NAME=TS_FIELD_NAME,
EVENT_DATE_FIELD_NAME=EVENT_DATE_FIELD_NAME)
@preprocess(data_query_tz=optionally(ensure_timezone))
def __init__(self,
expr,
+7 -4
View File
@@ -14,10 +14,11 @@ def load_raw_data(assets,
odo_kwargs,
checkpoints=None):
"""
given an expression representing data to load, perform normalization and
forward-filling and return the data, materialized.
Given an expression representing data to load, perform normalization and
forward-filling and return the data, materialized. Only accepts data with a
`sid` field.
parameters
Parameters
----------
assets : pd.int64index
the assets to load data for.
@@ -32,8 +33,10 @@ def load_raw_data(assets,
the expression representing the data to load.
odo_kwargs : dict
extra keyword arguments to pass to odo when executing the expression.
checkpoints : expr, optional
the expression representing the checkpointed data for `expr`.
returns
Returns
-------
raw : pd.dataframe
The result of computing expr and materializing the result as a
@@ -0,0 +1,619 @@
from collections import defaultdict
from abc import abstractmethod, abstractproperty
import pandas as pd
from six import viewvalues
from toolz import groupby
from zipline.lib.adjusted_array import AdjustedArray
from zipline.lib.adjustment import (
Datetime641DArrayOverwrite,
Datetime64Overwrite,
Float641DArrayOverwrite,
Float64Overwrite,
)
from zipline.pipeline.common import (
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.utils.numpy_utils import datetime64ns_dtype, float64_dtype
from zipline.pipeline.loaders.utils import (
ffill_across_cols,
last_in_date_group
)
INVALID_NUM_QTRS_MESSAGE = "Passed invalid number of quarters %s; " \
"must pass a number of quarters >= 0"
NEXT_FISCAL_QUARTER = 'next_fiscal_quarter'
NEXT_FISCAL_YEAR = 'next_fiscal_year'
NORMALIZED_QUARTERS = 'normalized_quarters'
PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter'
PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year'
SHIFTED_NORMALIZED_QTRS = 'shifted_normalized_quarters'
SIMULTATION_DATES = 'dates'
def normalize_quarters(years, quarters):
return years * 4 + quarters - 1
def split_normalized_quarters(normalized_quarters):
years = normalized_quarters // 4
quarters = normalized_quarters % 4
return years, quarters + 1
def required_estimates_fields(columns):
"""
Compute the set of resource columns required to serve
`columns`.
"""
# These metadata columns are used to align event indexers.
return {
TS_FIELD_NAME,
SID_FIELD_NAME,
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME
}.union(
# We also expect any of the field names that our loadable columns
# are mapped to.
viewvalues(columns),
)
def validate_column_specs(events, columns):
"""
Verify that the columns of ``events`` can be used by a
EarningsEstimatesLoader to serve the BoundColumns described by
`columns`.
"""
required = required_estimates_fields(columns)
received = set(events.columns)
missing = required - received
if missing:
raise ValueError(
"EarningsEstimatesLoader missing required columns {missing}.\n"
"Got Columns: {received}\n"
"Expected Columns: {required}".format(
missing=sorted(missing),
received=sorted(received),
required=sorted(required),
)
)
class EarningsEstimatesLoader(PipelineLoader):
"""
An abstract pipeline loader for estimates data that can load data a
variable number of quarters forwards/backwards from calendar dates
depending on the `num_quarters` attribute of the columns' dataset.
Parameters
----------
estimates : pd.DataFrame
The raw estimates data.
``estimates`` must contain at least 5 columns:
sid : int64
The asset id associated with each estimate.
event_date : datetime64[ns]
The date on which the event that the estimate is for will/has
occurred..
timestamp : datetime64[ns]
The date on which we learned about the estimate.
fiscal_quarter : int64
The quarter during which the event has/will occur.
fiscal_year : int64
The year during which the event has/will occur.
name_map : dict[str -> str]
A map of names of BoundColumns that this loader will load to the
names of the corresponding columns in `events`.
"""
def __init__(self,
estimates,
name_map):
validate_column_specs(
estimates,
name_map
)
self.estimates = estimates[
estimates[EVENT_DATE_FIELD_NAME].notnull() &
estimates[FISCAL_QUARTER_FIELD_NAME].notnull() &
estimates[FISCAL_YEAR_FIELD_NAME].notnull()
]
self.estimates[NORMALIZED_QUARTERS] = normalize_quarters(
self.estimates[FISCAL_YEAR_FIELD_NAME],
self.estimates[FISCAL_QUARTER_FIELD_NAME],
)
self.array_overwrites_dict = {datetime64ns_dtype:
Datetime641DArrayOverwrite,
float64_dtype: Float641DArrayOverwrite}
self.scalar_overwrites_dict = {datetime64ns_dtype: Datetime64Overwrite,
float64_dtype: Float64Overwrite}
self.name_map = name_map
@abstractmethod
def get_zeroth_quarter_idx(self, num_quarters, last, dates):
raise NotImplementedError('get_zeroth_quarter_idx')
@abstractmethod
def get_shifted_qtrs(self, zero_qtrs, num_quarters):
raise NotImplementedError('get_shifted_qtrs')
@abstractmethod
def create_overwrite_for_estimate(self,
column,
column_name,
last_per_qtr,
next_qtr_start_idx,
requested_quarter,
sid,
sid_idx):
raise NotImplementedError('create_overwrite_for_estimate')
@abstractproperty
def searchsorted_side(self):
return NotImplementedError('searchsorted_side')
def get_requested_quarter_data(self, stacked_last_per_qtr, idx, dates):
"""
Selects the requested data for each date.
Parameters
----------
stacked_last_per_qtr : pd.DataFrame
The latest estimate known with the dates, normalized quarter, and
sid as the index.
idx : pd.MultiIndex
The index of the row of the requested quarter from each date for
each sid.
dates : pd.DatetimeIndex
The calendar dates for which estimates data is requested.
Returns
--------
requested_qtr_data : pd.DataFrame
The DataFrame with the latest values for the requested quarter
for all columns; `dates` are the index and columns are a MultiIndex
with sids at the top level and the dataset columns on the bottom.
"""
requested_qtr_data = stacked_last_per_qtr.loc[idx]
# We've lost the index names when doing `loc`, so set them here.
requested_qtr_data.index = requested_qtr_data.index.set_names(
idx.names
)
requested_qtr_data = requested_qtr_data.reset_index(
SHIFTED_NORMALIZED_QTRS
)
# Calculate the actual year/quarter being requested and add those in
# as columns.
(requested_qtr_data[FISCAL_YEAR_FIELD_NAME],
requested_qtr_data[FISCAL_QUARTER_FIELD_NAME]) = \
split_normalized_quarters(
requested_qtr_data[SHIFTED_NORMALIZED_QTRS]
)
# Once we're left with just dates as the index, we can reindex by all
# dates so that we have a value for each calendar date.
return requested_qtr_data.unstack(SID_FIELD_NAME).reindex(dates)
def get_adjustments(self,
zero_qtr_data,
requested_qtr_data,
last_per_qtr,
dates,
assets,
columns):
"""
Creates an AdjustedArray from the given estimates data for the given
dates.
Parameters
----------
zero_qtr_data : pd.DataFrame
The 'time zero' data for each calendar date per sid.
requested_qtr_data : pd.DataFrame
The requested quarter data for each calendar date per sid.
last_per_qtr : pd.DataFrame
A DataFrame with a column MultiIndex of [self.estimates.columns,
normalized_quarters, sid] that allows easily getting the timeline
of estimates for a particular sid for a particular quarter.
dates : pd.DatetimeIndex
The calendar dates for which estimates data is requested.
assets : pd.Int64Index
An index of all the assets from the raw data.
columns : list of BoundColumn
The columns for which adjustments need to be calculated.
Returns
-------
adjusted_array : AdjustedArray
The array of data and overwrites for the given column.
"""
col_to_overwrites = defaultdict(dict)
# We no longer need NORMALIZED_QUARTERS in the index, but we do need it
# as a column to calculate adjustments.
zero_qtr_data = zero_qtr_data.reset_index(NORMALIZED_QUARTERS)
for sid_idx, sid in enumerate(assets):
zero_qtr_sid_data = zero_qtr_data[
zero_qtr_data.index.get_level_values(SID_FIELD_NAME) == sid
]
# Determine where quarters are changing for this sid.
qtr_shifts = zero_qtr_sid_data[
zero_qtr_sid_data[NORMALIZED_QUARTERS] !=
zero_qtr_sid_data[NORMALIZED_QUARTERS].shift(1)
]
# On dates where we don't have any information about quarters,
# we will get nulls, and each of these will be interpreted as
# quarter shifts. We need to remove these here.
qtr_shifts = qtr_shifts[
qtr_shifts[NORMALIZED_QUARTERS].notnull()
]
# For the given sid, determine which quarters we have estimates
# for.
qtrs_with_estimates_for_sid = last_per_qtr.xs(
sid, axis=1, level=SID_FIELD_NAME
).groupby(axis=1, level=1).first().columns.values
for row_indexer in list(qtr_shifts.index):
# Find the starting index of the quarter that comes right
# after this row. This isn't the starting index of the
# requested quarter, but simply the date we cross over into a
# new quarter.
next_qtr_start_idx = dates.searchsorted(
zero_qtr_data.loc[
row_indexer
][EVENT_DATE_FIELD_NAME],
side=self.searchsorted_side
)
# Only add adjustments if the next quarter starts somewhere in
# our date index for this sid. Our 'next' quarter can never
# start at index 0; a starting index of 0 means that the next
# quarter's event date was NaT.
if 0 < next_qtr_start_idx < len(dates):
self.create_overwrite_for_quarter(
col_to_overwrites,
next_qtr_start_idx,
last_per_qtr,
qtrs_with_estimates_for_sid,
requested_qtr_data,
sid,
sid_idx,
columns,
)
return col_to_overwrites
def create_overwrite_for_quarter(self,
col_to_overwrites,
next_qtr_start_idx,
last_per_qtr,
quarters_with_estimates_for_sid,
requested_qtr_data,
sid,
sid_idx,
columns):
"""
Add entries to the dictionary of columns to adjustments for the given
sid and the given quarter.
Parameters
----------
col_to_overwrites : dict [column_name -> list of ArrayAdjustment]
A dictionary mapping column names to all overwrites for those
columns.
next_qtr_start_idx : int
The index of the first day of the next quarter in the calendar
dates.
last_per_qtr : pd.DataFrame
A DataFrame with a column MultiIndex of [self.estimates.columns,
normalized_quarters, sid] that allows easily getting the timeline
of estimates for a particular sid for a particular quarter; this
is particularly useful for getting adjustments for 'next'
estimates.
quarters_with_estimates_for_sid : np.array
An array of all quarters for which there are estimates for the
given sid.
sid : int
The sid for which to create overwrites.
sid_idx : int
The index of the sid in `assets`.
columns : list of BoundColumn
The columns for which to create overwrites.
"""
# Find the quarter being requested in the quarter we're
# crossing into.
requested_quarter = requested_qtr_data[
SHIFTED_NORMALIZED_QTRS
][sid].iloc[next_qtr_start_idx]
for col in columns:
column_name = self.name_map[col.name]
# If there are estimates for the requested quarter,
# overwrite all values going up to the starting index of
# that quarter with estimates for that quarter.
if requested_quarter in quarters_with_estimates_for_sid:
col_to_overwrites[column_name][next_qtr_start_idx] = \
[self.create_overwrite_for_estimate(
col,
column_name,
last_per_qtr,
next_qtr_start_idx,
requested_quarter,
sid,
sid_idx
)]
# There are no estimates for the quarter. Overwrite all
# values going up to the starting index of that quarter
# with the missing value for this column.
else:
col_to_overwrites[column_name][next_qtr_start_idx] =\
[self.overwrite_with_null(
col,
last_per_qtr.index,
next_qtr_start_idx,
sid_idx
)]
def overwrite_with_null(self,
column,
dates,
next_qtr_start_idx,
sid_idx):
return self.scalar_overwrites_dict[column.dtype](
0,
next_qtr_start_idx - 1,
sid_idx,
sid_idx,
column.missing_value
)
def load_adjusted_array(self, columns, dates, assets, mask):
# Separate out getting the columns' datasets and the datasets'
# num_quarters attributes to ensure that we're catching the right
# AttributeError.
col_to_datasets = {col: col.dataset for col in columns}
try:
groups = groupby(lambda col: col_to_datasets[col].num_quarters,
col_to_datasets)
except AttributeError:
raise AttributeError("Datasets loaded via the "
"EarningsEstimatesLoader must define a "
"`num_quarters` attribute that defines how "
"many quarters out the loader should load "
"the data relative to `dates`.")
if any(num_qtr < 0 for num_qtr in groups):
raise ValueError(
INVALID_NUM_QTRS_MESSAGE % ','.join(
str(qtr) for qtr in groups if qtr < 0
)
)
out = {}
# To optimize performance, only work below on assets that are
# actually in the raw data.
assets_with_data = set(assets) & set(self.estimates[SID_FIELD_NAME])
for num_quarters, columns in groups.items():
last_per_qtr, stacked_last_per_qtr = self.get_last_data_per_qtr(
assets_with_data, columns, dates
)
# Determine which quarter is immediately next/previous for each
# date.
zeroth_quarter_idx = self.get_zeroth_quarter_idx(
num_quarters, stacked_last_per_qtr
)
zero_qtr_data = stacked_last_per_qtr.loc[zeroth_quarter_idx]
# Doing it this way because creating a MultiIndex from scratch
# results in being unable to unstack sids because of duplicate
# values, even though the MultiIndex is created with the same
# exact values as below - possible pandas bug.
requested_qtr_idx = zero_qtr_data.reset_index(
NORMALIZED_QUARTERS
).set_index(
self.get_shifted_qtrs(
zeroth_quarter_idx.get_level_values(NORMALIZED_QUARTERS),
num_quarters
),
append=True
).index
requested_qtr_idx = requested_qtr_idx.rename(
SHIFTED_NORMALIZED_QTRS, -1
)
requested_qtr_data = self.get_requested_quarter_data(
stacked_last_per_qtr, requested_qtr_idx, dates
)
# Calculate all adjustments for the given quarter and accumulate
# them for each column.
col_to_adjustments = self.get_adjustments(zero_qtr_data,
requested_qtr_data,
last_per_qtr,
dates,
assets_with_data,
columns)
for col in columns:
column_name = self.name_map[col.name]
# We may have dropped assets if they never have any data for
# the requested quarter.
df = pd.DataFrame(data=requested_qtr_data[column_name],
index=dates,
columns=assets,
dtype=col.dtype)
out[col] = AdjustedArray(
df.values.astype(col.dtype),
mask,
dict(col_to_adjustments[column_name]),
col.missing_value,
)
return out
def get_last_data_per_qtr(self, assets_with_data, columns, dates):
"""
Determine the last piece of information we know for each column on each
date in the index for each sid and quarter.
Parameters
----------
assets_with_data : pd.Index
Index of all assets that appear in the raw data given to the
loader.
columns : iterable of BoundColumn
The columns that need to be loaded from the raw data.
dates : pd.DatetimeIndex
The calendar of dates for which data should be loaded.
Returns
-------
stacked_last_per_qtr : pd.DataFrame
A DataFrame indexed by [dates, sid, normalized_quarters] that has
the latest information for each row of the index, sorted by event
date.
last_per_qtr : pd.DataFrame
A DataFrame with columns that are a MultiIndex of [
self.estimates.columns, normalized_quarters, sid].
"""
# Get a DataFrame indexed by date with a MultiIndex of columns of [
# self.estimates.columns, normalized_quarters, sid], where each cell
# contains the latest data for that day.
last_per_qtr = last_in_date_group(
self.estimates, dates, assets_with_data, reindex=True,
extra_groupers=[NORMALIZED_QUARTERS]
)
# Forward fill values for each quarter/sid/dataset column.
ffill_across_cols(last_per_qtr, columns, self.name_map)
# Stack quarter and sid into the index.
stacked_last_per_qtr = last_per_qtr.stack([SID_FIELD_NAME,
NORMALIZED_QUARTERS])
# Set date index name for ease of reference
stacked_last_per_qtr.index.set_names(SIMULTATION_DATES,
level=0,
inplace=True)
stacked_last_per_qtr = stacked_last_per_qtr.sort(
EVENT_DATE_FIELD_NAME
)
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] = pd.to_datetime(
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME]
)
return last_per_qtr, stacked_last_per_qtr
class NextEarningsEstimatesLoader(EarningsEstimatesLoader):
@property
def searchsorted_side(self):
return 'right'
def create_overwrite_for_estimate(self,
column,
column_name,
last_per_qtr,
next_qtr_start_idx,
requested_quarter,
sid,
sid_idx):
return self.array_overwrites_dict[column.dtype](
0,
# overwrite thru last qtr
next_qtr_start_idx - 1,
sid_idx,
sid_idx,
last_per_qtr[
column_name,
requested_quarter,
sid
][:next_qtr_start_idx].values)
def get_shifted_qtrs(self, zero_qtrs, num_quarters):
return zero_qtrs + (num_quarters - 1)
def get_zeroth_quarter_idx(self, num_quarters, stacked_last_per_qtr):
"""
Filters for releases that are on or after each simulation date and
determines the next quarter by picking out the upcoming release for
each date in the index.
Parameters
----------
num_quarters : int
Number of quarters to go out in the future.
stacked_last_per_qtr : pd.DataFrame
A DataFrame with index of calendar dates, sid, and normalized
quarters with each row being the latest estimate for the row's
index values, sorted by event date.
Returns
-------
next_releases_per_date_index : pd.MultiIndex
An index of calendar dates, sid, and normalized quarters, for only
the rows that have a next event.
"""
next_releases_per_date = stacked_last_per_qtr.loc[
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >=
stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES)
].groupby(
level=[SIMULTATION_DATES, SID_FIELD_NAME], as_index=False
).nth(0)
return next_releases_per_date.index
class PreviousEarningsEstimatesLoader(EarningsEstimatesLoader):
@property
def searchsorted_side(self):
return 'left'
def create_overwrite_for_estimate(self,
column,
column_name,
dates,
next_qtr_start_idx,
requested_quarter,
sid,
sid_idx):
return self.overwrite_with_null(column,
dates,
next_qtr_start_idx,
sid_idx)
def get_shifted_qtrs(self, zero_qtrs, num_quarters):
return zero_qtrs - (num_quarters - 1)
def get_zeroth_quarter_idx(self, num_quarters, stacked_last_per_qtr):
"""
Filters for releases that are on or after each simulation date and
determines the previous quarter by picking out the most recent
release relative to each date in the index.
Parameters
----------
num_quarters : int
Number of quarters to go out in the past.
stacked_last_per_qtr : pd.DataFrame
A DataFrame with index of calendar dates, sid, and normalized
quarters with each row being the latest estimate for the row's
index values, sorted by event date.
Returns
-------
previous_releases_per_date_index : pd.MultiIndex
An index of calendar dates, sid, and normalized quarters, for only
the rows that have a previous event.
"""
previous_releases_per_date = stacked_last_per_qtr.loc[
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <=
stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES)
].groupby(
level=[SIMULTATION_DATES, SID_FIELD_NAME], as_index=False
).nth(-1)
return previous_releases_per_date.index
@@ -1,529 +0,0 @@
from collections import defaultdict
from abc import abstractmethod
import numpy as np
import pandas as pd
from six import viewvalues
from toolz import groupby
from zipline.lib.adjusted_array import AdjustedArray
from zipline.lib.adjustment import (Datetime641DArrayOverwrite,
Float641DArrayOverwrite)
from zipline.pipeline.common import (
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME,
SID_FIELD_NAME,
TS_FIELD_NAME,
)
from zipline.pipeline.loaders.base import PipelineLoader
from zipline.utils.numpy_utils import datetime64ns_dtype
from zipline.pipeline.loaders.utils import (
ffill_across_cols,
last_in_date_group
)
INVALID_NUM_QTRS_MESSAGE = "Passed invalid number of quarters %s; " \
"must pass a number of quarters >= 0"
NEXT_FISCAL_QUARTER = 'next_fiscal_quarter'
NEXT_FISCAL_YEAR = 'next_fiscal_year'
NORMALIZED_QUARTERS = 'normalized_quarters'
PREVIOUS_FISCAL_QUARTER = 'previous_fiscal_quarter'
PREVIOUS_FISCAL_YEAR = 'previous_fiscal_year'
SHIFTED_NORMALIZED_QTRS = 'shifted_normalized_quarters'
SIMULTATION_DATES = 'dates'
def normalize_quarters(years, quarters):
return years * 4 + quarters - 1
def split_normalized_quarters(normalized_quarters):
years = normalized_quarters // 4
quarters = normalized_quarters % 4
return years, quarters + 1
def required_estimates_fields(columns):
"""
Compute the set of resource columns required to serve
`columns`.
"""
# These metadata columns are used to align event indexers.
return {
TS_FIELD_NAME,
SID_FIELD_NAME,
EVENT_DATE_FIELD_NAME,
FISCAL_QUARTER_FIELD_NAME,
FISCAL_YEAR_FIELD_NAME
}.union(
# We also expect any of the field names that our loadable columns
# are mapped to.
viewvalues(columns),
)
def validate_column_specs(events, columns):
"""
Verify that the columns of ``events`` can be used by a
QuarterEstimatesLoader to serve the BoundColumns described by
`columns`.
"""
required = required_estimates_fields(columns)
received = set(events.columns)
missing = required - received
if missing:
raise ValueError(
"QuarterEstimatesLoader missing required columns {missing}.\n"
"Got Columns: {received}\n"
"Expected Columns: {required}".format(
missing=sorted(missing),
received=sorted(received),
required=sorted(required),
)
)
class QuarterEstimatesLoader(PipelineLoader):
def __init__(self,
estimates,
name_map):
validate_column_specs(
estimates,
name_map
)
self.estimates = estimates[
estimates[EVENT_DATE_FIELD_NAME].notnull() &
estimates[FISCAL_QUARTER_FIELD_NAME].notnull() &
estimates[FISCAL_YEAR_FIELD_NAME].notnull()
]
self.estimates[NORMALIZED_QUARTERS] = normalize_quarters(
self.estimates[FISCAL_YEAR_FIELD_NAME],
self.estimates[FISCAL_QUARTER_FIELD_NAME],
)
self.name_map = name_map
@abstractmethod
def load_quarters(self, num_quarters, last, dates):
raise NotImplementedError('load_quarters')
def get_requested_data_for_col(self, stacked_last_per_qtr, idx, dates):
"""
Selects the requested data for each date.
Parameters
----------
stacked_last_per_qtr : pd.DataFrame
The latest estimate known with the dates, normalized quarter, and
sid as the index.
idx : pd.MultiIndex
The index of the row of the requested quarter from each date for
each sid.
dates : pd.DatetimeIndex
The calendar dates for which estimates data is requested.
Returns
--------
requested_qtr_data : pd.DataFrame
The DataFrame with the latest values for the requested quarter
for all columns; `dates` are the index and columns are a MultiIndex
with sids at the top level and the dataset columns on the bottom.
"""
requested_qtr_data = stacked_last_per_qtr.loc[idx]
# We no longer need the shifted normalized quarters in the index, but
# we do need it as a column to calculate adjustments.
requested_qtr_data = requested_qtr_data.reset_index(
SHIFTED_NORMALIZED_QTRS
)
# Calculate the actual year/quarter being requested and add those in
# as columns.
(requested_qtr_data[FISCAL_YEAR_FIELD_NAME],
requested_qtr_data[FISCAL_QUARTER_FIELD_NAME]) = \
split_normalized_quarters(
requested_qtr_data[SHIFTED_NORMALIZED_QTRS]
)
# Move sids into the columns. Once we're left with just dates
# as the index, we can reindex by all dates so that we have a
# value for each calendar date.
requested_qtr_data = requested_qtr_data.unstack(
SID_FIELD_NAME
).reindex(dates)
return requested_qtr_data
def get_adjustments(self,
zero_qtr_data,
requested_qtr_data,
last_per_qtr,
dates,
assets,
columns):
"""
Creates an AdjustedArray from the given estimates data for the given
dates.
Parameters
----------
zero_qtr_data : pd.DataFrame
The 'time zero' data for each date/sid.
zero_qtr_data : pd.DataFrame
The data for the requested quarter.
last_per_qtr : pd.DataFrame
The latest estimate known per sid per date per quarter with
dates as the index and normalized quarter and sid in the columns
MultiIndex; allows easy access to the timeline of estimates
across all dates for a sid for a particular quarter.
dates : pd.DatetimeIndex
The calendar dates for which estimates data is requested.
column_name : string
The name of the column for which the AdjustedArray is being
computed.
column : BoundColumn
The column for which the AdjustedArray is being computed.
mask : np.array
Mask array of dimensions len(dates) X len(assets).
assets : pd.Int64Index
An index of all the assets from the raw data.
Returns
-------
adjusted_array : AdjustedArray
The array of data and overwrites for the given column.
"""
col_to_adjustments = defaultdict(dict)
# We no longer need this in the index, but we do need it as a column
# to calculate adjustments.
zero_qtr_data = zero_qtr_data.reset_index(NORMALIZED_QUARTERS)
for sid_idx, sid in enumerate(assets):
zero_qtr_sid_data = zero_qtr_data[
zero_qtr_data.index.get_level_values(SID_FIELD_NAME) == sid
]
# Determine where quarters are changing for this sid.
qtr_shifts = zero_qtr_sid_data[
zero_qtr_sid_data[NORMALIZED_QUARTERS] !=
zero_qtr_sid_data[NORMALIZED_QUARTERS].shift(1)
]
# On dates where we don't have any information about quarters,
# we will get nulls, and each of these will be interpreted as
# quarter shifts. We need to remove these here.
qtr_shifts = qtr_shifts[
qtr_shifts[NORMALIZED_QUARTERS].notnull()
]
# For the given sid, determine which quarters we have estimates
# for.
qtrs_with_estimates_for_sid = last_per_qtr.xs(
sid, axis=1, level=SID_FIELD_NAME
).groupby(axis=1, level=1).first().columns.values
for row_indexer in list(qtr_shifts.index):
# Find the starting index of the quarter that comes right
# after this row. This isn't the starting index of the
# requested quarter, but simply the date we cross over into a
# new quarter.
next_qtr_start_idx = dates.searchsorted(
zero_qtr_data.loc[
row_indexer
][EVENT_DATE_FIELD_NAME],
side='left'
if isinstance(self, PreviousQuartersEstimatesLoader)
else 'right'
)
self.create_overwrite_for_quarter(
col_to_adjustments,
next_qtr_start_idx,
dates,
last_per_qtr,
qtrs_with_estimates_for_sid,
requested_qtr_data,
sid,
sid_idx,
columns,
)
return col_to_adjustments
def create_overwrite_for_quarter(self,
col_to_adjustments,
next_qtr_start_idx,
dates,
last_per_qtr,
quarters_with_estimates_for_sid,
requested_qtr_data,
sid,
sid_idx,
columns):
overwrites_dict = {}
for col in columns:
if col.dtype == datetime64ns_dtype:
overwrites_dict[col] = Datetime641DArrayOverwrite
else:
overwrites_dict[col] = Float641DArrayOverwrite
# Only add adjustments if the next quarter starts somewhere in
# our date index for this sid. Our 'next' quarter can never
# start at index 0; a starting index of 0 means that the next
# quarter's event date was NaT.
if 0 < next_qtr_start_idx < len(dates):
# Find the quarter being requested in the quarter we're
# crossing into.
requested_quarter = requested_qtr_data[
SHIFTED_NORMALIZED_QTRS
][sid].iloc[next_qtr_start_idx]
for col in columns:
column_name = self.name_map[col.name]
# If there are estimates for the requested quarter,
# overwrite all values going up to the starting index of
# that quarter with estimates for that quarter.
if requested_quarter in quarters_with_estimates_for_sid:
col_to_adjustments[column_name][next_qtr_start_idx] = \
self.create_overwrite_for_estimate(
col,
column_name,
last_per_qtr,
next_qtr_start_idx,
overwrites_dict[col],
requested_quarter,
sid,
sid_idx
)
# There are no estimates for the quarter. Overwrite all
# values going up to the starting index of that quarter
# with the missing value for this column.
else:
col_to_adjustments[column_name][next_qtr_start_idx] =\
self.overwrite_with_null(
col,
last_per_qtr,
next_qtr_start_idx,
overwrites_dict[col],
sid_idx
)
def overwrite_with_null(self,
column,
last_per_qtr,
next_qtr_start_idx,
overwrite,
sid_idx):
return [overwrite(
0,
next_qtr_start_idx - 1,
sid_idx,
sid_idx,
np.full(
len(
last_per_qtr.index[:next_qtr_start_idx]
),
column.missing_value,
dtype=column.dtype
))]
def load_adjusted_array(self, columns, dates, assets, mask):
# Separate out getting the columns' datasets and the datasets'
# num_quarters attributes to ensure that we're catching the right
# AttributeError.
col_to_datasets = {col: col.dataset for col in columns}
try:
groups = groupby(lambda col: col_to_datasets[col].num_quarters,
col_to_datasets)
except AttributeError:
raise AttributeError("Datasets loaded via the "
"QuarterEstimatesLoader must define a "
"`num_quarters` attribute that defines how "
"many quarters out the loader should load "
"the data relative to `dates`.")
if any(num_qtr < 0 for num_qtr in groups):
raise ValueError(
INVALID_NUM_QTRS_MESSAGE % ','.join(
str(qtr) for qtr in groups if qtr < 0
)
)
out = {}
for num_quarters, columns in groups.items():
# Determine the last piece of information we know for each column
# on each date in the index for each sid and quarter.
last_per_qtr = last_in_date_group(
self.estimates, dates, assets, reindex=True,
extra_groupers=[NORMALIZED_QUARTERS]
)
# Forward fill values for each quarter/sid/dataset column.
ffill_across_cols(last_per_qtr, columns, self.name_map)
# Stack quarter and sid into the index.
stacked_last_per_qtr = last_per_qtr.stack([SID_FIELD_NAME,
NORMALIZED_QUARTERS])
# Set date index name for ease of reference
stacked_last_per_qtr.index.set_names(SIMULTATION_DATES,
level=0,
inplace=True)
# We want to know the most recent/next event relative to each date.
stacked_last_per_qtr = stacked_last_per_qtr.sort(
EVENT_DATE_FIELD_NAME
)
# Determine which quarter is next/previous for each date.
shifted_qtr_data = self.load_quarters(num_quarters,
stacked_last_per_qtr)
zero_qtr_idx = shifted_qtr_data.index
requested_qtr_idx = shifted_qtr_data.set_index([
shifted_qtr_data.index.get_level_values(
SIMULTATION_DATES
),
shifted_qtr_data.index.get_level_values(
SID_FIELD_NAME
),
shifted_qtr_data[SHIFTED_NORMALIZED_QTRS]
]).index
requested_qtr_data = self.get_requested_data_for_col(
stacked_last_per_qtr, requested_qtr_idx, dates
)
zero_qtr_data = stacked_last_per_qtr.loc[zero_qtr_idx]
col_to_adjustments = self.get_adjustments(zero_qtr_data,
requested_qtr_data,
last_per_qtr,
dates,
assets,
columns)
for col in columns:
column_name = self.name_map[col.name]
# We may have dropped assets if they never have any data for the
# requested quarter.
df = pd.DataFrame(data=requested_qtr_data[column_name],
index=dates,
columns=assets,
dtype=col.dtype)
out[col] = AdjustedArray(
df.values.astype(col.dtype),
mask,
dict(col_to_adjustments[column_name]),
col.missing_value,
)
return out
class NextQuartersEstimatesLoader(QuarterEstimatesLoader):
def create_overwrite_for_estimate(self,
column,
column_name,
last_per_qtr,
next_qtr_start_idx,
overwrite,
requested_quarter,
sid,
sid_idx):
return [overwrite(
0,
# overwrite thru last qtr
next_qtr_start_idx - 1,
sid_idx,
sid_idx,
last_per_qtr[
column_name,
requested_quarter,
sid
][0:next_qtr_start_idx].values)]
def load_quarters(self, num_quarters, stacked_last_per_qtr):
"""
Filters for releases that are on or after each simulation date and
determines the next quarter by picking out the upcoming release for
each date in the index. Adda a SHIFTED_NORMALIZED_QTRS column which
contains the requested next quarter for each calendar date and sid.
Parameters
----------
num_quarters : int
Number of quarters to go out in the future.
stacked_last_per_qtr : pd.DataFrame
A DataFrame with index of calendar dates, sid, and normalized
quarters with each row being the latest estimate for the row's
index values, sorted by event date.
Returns
-------
next_releases_per_date : pd.DataFrame
A DataFrame with index of calendar dates, sid, and normalized
quarters, keeping only rows with next event information relative to
the index values and with an added column for
SHIFTED_NORMALIZED_QTRS, which contains the requested quarter for
each row.
"""
# We reset the index here because in pandas3, a groupby on the index
# will set the index to just the items in the groupby, so we will lose
# the normalized quarters.
next_releases_per_date = stacked_last_per_qtr.loc[
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] >=
stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES)
].reset_index(NORMALIZED_QUARTERS).groupby(
level=[SIMULTATION_DATES, SID_FIELD_NAME]
).nth(0).set_index(NORMALIZED_QUARTERS, append=True)
next_releases_per_date[
SHIFTED_NORMALIZED_QTRS
] = next_releases_per_date.index.get_level_values(
NORMALIZED_QUARTERS
) + (num_quarters - 1)
return next_releases_per_date
class PreviousQuartersEstimatesLoader(QuarterEstimatesLoader):
def create_overwrite_for_estimate(self,
column,
column_name,
last_per_qtr,
next_qtr_start_idx,
overwrite,
requested_quarter,
sid,
sid_idx):
return self.overwrite_with_null(column,
last_per_qtr,
next_qtr_start_idx,
overwrite,
sid_idx)
def load_quarters(self, num_quarters, stacked_last_per_qtr):
"""
Filters for releases that are on or after each simulation date and
determines the previous quarter by picking out the most recent
release relative to each date in the index. Adds a
SHIFTED_NORMALIZED_QTRS column which contains the requested previous
quarter for each calendar date and sid.
Parameters
----------
num_quarters : int
Number of quarters to go out in the past.
stacked_last_per_qtr : pd.DataFrame
A DataFrame with index of calendar dates, sid, and normalized
quarters with each row being the latest estimate for the row's
index values, sorted by event date.
Returns
-------
next_releases_per_date : pd.DataFrame
A DataFrame with index of calendar dates, sid, and normalized
quarters, keeping only rows with have a previous event relative
to the index values and with an added column for
SHIFTED_NORMALIZED_QTRS, which contains the requested quarter for
each row.
"""
# We reset the index here because in pandas3, a groupby on the index
# will set the index to just the items in the groupby, so we will lose
# the normalized quarters.
previous_releases_per_date = stacked_last_per_qtr.loc[
stacked_last_per_qtr[EVENT_DATE_FIELD_NAME] <=
stacked_last_per_qtr.index.get_level_values(SIMULTATION_DATES)
].reset_index(NORMALIZED_QUARTERS).groupby(
level=[SIMULTATION_DATES, SID_FIELD_NAME]
).nth(-1).set_index(NORMALIZED_QUARTERS, append=True)
previous_releases_per_date[
SHIFTED_NORMALIZED_QTRS
] = previous_releases_per_date.index.get_level_values(
NORMALIZED_QUARTERS
) - (num_quarters - 1)
return previous_releases_per_date
+5 -2
View File
@@ -320,8 +320,11 @@ def last_in_date_group(df, dates, assets, reindex=True, have_sids=True,
).last()
# For the number of things that we're grouping by (except TS), unstack
# the df
last_in_group = last_in_group.unstack(list(range(-1, -len(idx), -1)))
# the df. Done this way because of an unresolved pandas bug whereby
# passing a list of levels with mixed dtypes to unstack causes the
# resulting DataFrame to have all object-type columns.
for _ in range(len(idx) - 1):
last_in_group = last_in_group.unstack(-1)
if reindex:
if have_sids:
+2 -3
View File
@@ -34,14 +34,13 @@ from ..finance.trading import TradingEnvironment
from ..utils import factory
from ..utils.classproperty import classproperty
from ..utils.final import FinalMeta, final
from .core import (tmp_asset_finder, make_simple_equity_info)
from .core import tmp_asset_finder, make_simple_equity_info
from zipline.assets import Equity, Future
from zipline.pipeline import SimplePipelineEngine
from zipline.pipeline.loaders.testing import make_seeded_random_loader
from zipline.utils.calendars import (
get_calendar,
register_calendar
)
register_calendar)
class ZiplineTestCase(with_metaclass(FinalMeta, TestCase)):