TST: add arg to test assertion

This commit is contained in:
Maya Tydykov
2017-01-25 09:52:35 -05:00
parent bd06ff14ed
commit dda5334580
2 changed files with 160 additions and 54 deletions
+141 -43
View File
@@ -1247,12 +1247,14 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
start,
end,
window_length,
compute_fn):
compute_fn,
apply_deltas_adjustments=True):
loader = BlazeLoader()
ds = from_blaze(
expr,
deltas,
checkpoints,
apply_deltas_adjustments=apply_deltas_adjustments,
loader=loader,
no_deltas_rule='raise',
no_checkpoints_rule='ignore',
@@ -1480,7 +1482,7 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
name='delta',
dshape=self.dshape,
)
expected_views = keymap(pd.Timestamp, {
expected_views_all_deltas = keymap(pd.Timestamp, {
'2014-01-03': np.array([[10.0, 11.0, 12.0],
[10.0, 11.0, 12.0],
[10.0, 11.0, 12.0]]),
@@ -1488,14 +1490,47 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
[10.0, 11.0, 12.0],
[11.0, 12.0, 13.0]]),
})
if len(asset_info) == 4:
expected_views = valmap(
lambda view: np.c_[view, [np.nan, np.nan, np.nan]],
# The only novel delta is on 2014-01-05, because it modifies a
# baseline data point that occurred on 2014-01-04, which is on a
# Saturday. The other delta, occurring on 2014-01-02, is seen after
# we already see the baseline data it modifies, and so it is a
# non-novel delta. Thus, the only delta seen in the expected view for
# novel deltas is on 2014-01-06 at (2, 0), (2, 1), and (2, 2).
expected_views_novel_deltas = keymap(pd.Timestamp, {
'2014-01-03': np.array([[0.0, 1.0, 2.0],
[0.0, 1.0, 2.0],
[0.0, 1.0, 2.0]]),
'2014-01-06': np.array([[0.0, 1.0, 2.0],
[0.0, 1.0, 2.0],
[11.0, 12.0, 13.0]]),
})
def get_fourth_asset_view(expected_views, window_length):
return valmap(
lambda view: np.c_[view, [np.nan] * window_length],
expected_views,
)
expected_output_buffer = [10, 11, 12, np.nan, 11, 12, 13, np.nan]
if len(asset_info) == 4:
expected_views_all_deltas = get_fourth_asset_view(
expected_views_all_deltas, window_length=3
)
expected_views_novel_deltas = get_fourth_asset_view(
expected_views_novel_deltas, window_length=3
)
expected_output_buffer_all_deltas = [
10, 11, 12, np.nan, 11, 12, 13, np.nan
]
expected_output_buffer_novel_deltas = [
0, 1, 2, np.nan, 11, 12, 13, np.nan
]
else:
expected_output_buffer = [10, 11, 12, 11, 12, 13]
expected_output_buffer_all_deltas = [
10, 11, 12, 11, 12, 13
]
expected_output_buffer_novel_deltas = [
0, 1, 2, 11, 12, 13
]
cal = pd.DatetimeIndex([
pd.Timestamp('2014-01-01'),
@@ -1506,28 +1541,51 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
])
with tmp_asset_finder(equities=asset_info) as finder:
expected_output = pd.DataFrame(
expected_output_buffer,
expected_output_all_deltas = pd.DataFrame(
expected_output_buffer_all_deltas,
index=pd.MultiIndex.from_product((
sorted(expected_views.keys()),
sorted(expected_views_all_deltas.keys()),
finder.retrieve_all(asset_info.index),
)),
columns=('value',),
)
self._run_pipeline(
expr,
deltas,
None,
expected_views,
expected_output,
finder,
calendar=cal,
start=cal[2],
end=cal[-1],
window_length=3,
compute_fn=op.itemgetter(-1),
expected_output_novel_deltas = pd.DataFrame(
expected_output_buffer_novel_deltas,
index=pd.MultiIndex.from_product((
sorted(expected_views_novel_deltas.keys()),
finder.retrieve_all(asset_info.index),
)),
columns=('value',),
)
it = (
(
True,
expected_views_all_deltas,
expected_output_all_deltas
),
(
False,
expected_views_novel_deltas,
expected_output_novel_deltas
)
)
for apply_deltas_adjs, expected_views, expected_output in it:
self._run_pipeline(
expr,
deltas,
None,
expected_views,
expected_output,
finder,
calendar=cal,
start=cal[2],
end=cal[-1],
window_length=3,
compute_fn=op.itemgetter(-1),
apply_deltas_adjustments=apply_deltas_adjs,
)
def test_novel_deltas_macro(self):
base_dates = pd.DatetimeIndex([
pd.Timestamp('2014-01-01'),
@@ -1547,7 +1605,7 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
)
nassets = len(simple_asset_info)
expected_views = keymap(pd.Timestamp, {
expected_views_all_deltas = keymap(pd.Timestamp, {
'2014-01-03': np.array([[10.0],
[10.0],
[10.0]]),
@@ -1555,6 +1613,20 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
[10.0],
[11.0]]),
})
# The only novel delta is on 2014-01-05, because it modifies a
# baseline data point that occurred on 2014-01-04, which is on a
# Saturday. The other delta, occurring on 2014-01-02, is seen after
# we already see the baseline data it modifies, and so it is a
# non-novel delta. Thus, the only delta seen in the expected view for
# novel deltas is on 2014-01-06 at (2, 0).
expected_views_novel_deltas = keymap(pd.Timestamp, {
'2014-01-03': np.array([[0.0],
[0.0],
[0.0]]),
'2014-01-06': np.array([[0.0],
[0.0],
[11.0]]),
})
cal = pd.DatetimeIndex([
pd.Timestamp('2014-01-01'),
@@ -1563,28 +1635,53 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
# omitting the 4th and 5th to simulate a weekend
pd.Timestamp('2014-01-06'),
])
def get_expected_output(expected_views, values, asset_info):
return pd.DataFrame(
list(concatv(*([value] * nassets for value in values))),
index=pd.MultiIndex.from_product(
(sorted(expected_views.keys()),
finder.retrieve_all(asset_info.index),)
), columns=('value',),
)
with tmp_asset_finder(equities=simple_asset_info) as finder:
expected_output = pd.DataFrame(
list(concatv([10] * nassets, [11] * nassets)),
index=pd.MultiIndex.from_product((
sorted(expected_views.keys()),
finder.retrieve_all(simple_asset_info.index),
)),
columns=('value',),
expected_output_all_deltas = get_expected_output(
expected_views_all_deltas,
[10, 11],
simple_asset_info,
)
self._run_pipeline(
expr,
deltas,
None,
expected_views,
expected_output,
finder,
calendar=cal,
start=cal[2],
end=cal[-1],
window_length=3,
compute_fn=op.itemgetter(-1),
expected_output_novel_deltas = get_expected_output(
expected_views_novel_deltas,
[0, 11],
simple_asset_info,
)
it = (
(
True,
expected_views_all_deltas,
expected_output_all_deltas
),
(
False,
expected_views_novel_deltas,
expected_output_novel_deltas
)
)
for apply_deltas_adjs, expected_views, expected_output in it:
self._run_pipeline(
expr,
deltas,
None,
expected_views,
expected_output,
finder,
calendar=cal,
start=cal[2],
end=cal[-1],
window_length=3,
compute_fn=op.itemgetter(-1),
apply_deltas_adjustments=apply_deltas_adjs,
)
def _test_checkpoints_macro(self, checkpoints, ffilled_value=-1.0):
"""Simple checkpoints test that accepts a checkpoints dataframe and
@@ -1805,7 +1902,8 @@ class MiscTestCase(ZiplineTestCase):
odo_kwargs={'a': 'b'},
)),
"ExprData(expr='expr', deltas='deltas',"
" checkpoints='checkpoints', odo_kwargs={'a': 'b'})",
" checkpoints='checkpoints', odo_kwargs={'a': 'b'}, "
"apply_deltas_adjustments=True)",
)
def test_blaze_loader_repr(self):
+19 -11
View File
@@ -206,12 +206,12 @@ is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types))
get__name__ = op.attrgetter('__name__')
class ExprData(
namedtuple(
'ExprData',
'expr deltas checkpoints odo_kwargs apply_deltas_adjustments'
)
):
_expr_data_base = namedtuple(
'ExprData', 'expr deltas checkpoints odo_kwargs apply_deltas_adjustments'
)
class ExprData(_expr_data_base):
"""A pair of expressions and data resources. The expressions will be
computed using the resources as the starting scope.
@@ -225,6 +225,9 @@ class ExprData(
The forward fill checkpoints for the data.
odo_kwargs : dict, optional
The keyword arguments to forward to the odo calls internally.
apply_deltas_adjustments : bool, optional
Whether or not deltas adjustments should be applied to the baseline
values. If False, only novel deltas will be applied.
"""
def __new__(cls,
expr,
@@ -567,10 +570,6 @@ def from_blaze(expr,
----------
expr : Expr
The blaze expression to use.
apply_deltas_adjustments : bool, optional
Whether or not deltas adjustments should be applied for this dataset.
True by default because not applying deltas adjustments is an exception
rather than the rule.
deltas : Expr, 'auto' or None, optional
The expression to use for the point in time adjustments.
If the string 'auto' is passed, a deltas expr will be looked up
@@ -604,6 +603,10 @@ def from_blaze(expr,
found. 'warn' says to raise a warning but continue.
'raise' says to raise an exception if no deltas can be found.
'ignore' says take no action and proceed with no deltas.
apply_deltas_adjustments : bool, optional
Whether or not deltas adjustments should be applied for this dataset.
True by default because not applying deltas adjustments is an exception
rather than the rule.
Returns
-------
@@ -1114,7 +1117,12 @@ class BlazeLoader(dict):
have_sids=have_sids)
ffill_across_cols(dense_output, columns, {c.name: c.name
for c in columns})
adjustments_from_deltas = lambda *args: {}
# By default, no non-novel deltas are applied.
def no_adjustments_from_deltas(*args):
return {}
adjustments_from_deltas = no_adjustments_from_deltas
if have_sids:
if apply_deltas_adjustments:
adjustments_from_deltas = adjustments_from_deltas_with_sids