From dda53345803affb308a75ffaf8ec01753165693b Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Wed, 25 Jan 2017 09:52:35 -0500 Subject: [PATCH] TST: add arg to test assertion --- tests/pipeline/test_blaze.py | 184 +++++++++++++++++++------ zipline/pipeline/loaders/blaze/core.py | 30 ++-- 2 files changed, 160 insertions(+), 54 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index 68843a33..eec8fc97 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -1247,12 +1247,14 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): start, end, window_length, - compute_fn): + compute_fn, + apply_deltas_adjustments=True): loader = BlazeLoader() ds = from_blaze( expr, deltas, checkpoints, + apply_deltas_adjustments=apply_deltas_adjustments, loader=loader, no_deltas_rule='raise', no_checkpoints_rule='ignore', @@ -1480,7 +1482,7 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): name='delta', dshape=self.dshape, ) - expected_views = keymap(pd.Timestamp, { + expected_views_all_deltas = keymap(pd.Timestamp, { '2014-01-03': np.array([[10.0, 11.0, 12.0], [10.0, 11.0, 12.0], [10.0, 11.0, 12.0]]), @@ -1488,14 +1490,47 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): [10.0, 11.0, 12.0], [11.0, 12.0, 13.0]]), }) - if len(asset_info) == 4: - expected_views = valmap( - lambda view: np.c_[view, [np.nan, np.nan, np.nan]], + # The only novel delta is on 2014-01-05, because it modifies a + # baseline data point that occurred on 2014-01-04, which is on a + # Saturday. The other delta, occurring on 2014-01-02, is seen after + # we already see the baseline data it modifies, and so it is a + # non-novel delta. Thus, the only delta seen in the expected view for + # novel deltas is on 2014-01-06 at (2, 0), (2, 1), and (2, 2). + expected_views_novel_deltas = keymap(pd.Timestamp, { + '2014-01-03': np.array([[0.0, 1.0, 2.0], + [0.0, 1.0, 2.0], + [0.0, 1.0, 2.0]]), + '2014-01-06': np.array([[0.0, 1.0, 2.0], + [0.0, 1.0, 2.0], + [11.0, 12.0, 13.0]]), + }) + + def get_fourth_asset_view(expected_views, window_length): + return valmap( + lambda view: np.c_[view, [np.nan] * window_length], expected_views, ) - expected_output_buffer = [10, 11, 12, np.nan, 11, 12, 13, np.nan] + + if len(asset_info) == 4: + expected_views_all_deltas = get_fourth_asset_view( + expected_views_all_deltas, window_length=3 + ) + expected_views_novel_deltas = get_fourth_asset_view( + expected_views_novel_deltas, window_length=3 + ) + expected_output_buffer_all_deltas = [ + 10, 11, 12, np.nan, 11, 12, 13, np.nan + ] + expected_output_buffer_novel_deltas = [ + 0, 1, 2, np.nan, 11, 12, 13, np.nan + ] else: - expected_output_buffer = [10, 11, 12, 11, 12, 13] + expected_output_buffer_all_deltas = [ + 10, 11, 12, 11, 12, 13 + ] + expected_output_buffer_novel_deltas = [ + 0, 1, 2, 11, 12, 13 + ] cal = pd.DatetimeIndex([ pd.Timestamp('2014-01-01'), @@ -1506,28 +1541,51 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): ]) with tmp_asset_finder(equities=asset_info) as finder: - expected_output = pd.DataFrame( - expected_output_buffer, + expected_output_all_deltas = pd.DataFrame( + expected_output_buffer_all_deltas, index=pd.MultiIndex.from_product(( - sorted(expected_views.keys()), + sorted(expected_views_all_deltas.keys()), finder.retrieve_all(asset_info.index), )), columns=('value',), ) - self._run_pipeline( - expr, - deltas, - None, - expected_views, - expected_output, - finder, - calendar=cal, - start=cal[2], - end=cal[-1], - window_length=3, - compute_fn=op.itemgetter(-1), + expected_output_novel_deltas = pd.DataFrame( + expected_output_buffer_novel_deltas, + index=pd.MultiIndex.from_product(( + sorted(expected_views_novel_deltas.keys()), + finder.retrieve_all(asset_info.index), + )), + columns=('value',), ) + it = ( + ( + True, + expected_views_all_deltas, + expected_output_all_deltas + ), + ( + False, + expected_views_novel_deltas, + expected_output_novel_deltas + ) + ) + for apply_deltas_adjs, expected_views, expected_output in it: + self._run_pipeline( + expr, + deltas, + None, + expected_views, + expected_output, + finder, + calendar=cal, + start=cal[2], + end=cal[-1], + window_length=3, + compute_fn=op.itemgetter(-1), + apply_deltas_adjustments=apply_deltas_adjs, + ) + def test_novel_deltas_macro(self): base_dates = pd.DatetimeIndex([ pd.Timestamp('2014-01-01'), @@ -1547,7 +1605,7 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): ) nassets = len(simple_asset_info) - expected_views = keymap(pd.Timestamp, { + expected_views_all_deltas = keymap(pd.Timestamp, { '2014-01-03': np.array([[10.0], [10.0], [10.0]]), @@ -1555,6 +1613,20 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): [10.0], [11.0]]), }) + # The only novel delta is on 2014-01-05, because it modifies a + # baseline data point that occurred on 2014-01-04, which is on a + # Saturday. The other delta, occurring on 2014-01-02, is seen after + # we already see the baseline data it modifies, and so it is a + # non-novel delta. Thus, the only delta seen in the expected view for + # novel deltas is on 2014-01-06 at (2, 0). + expected_views_novel_deltas = keymap(pd.Timestamp, { + '2014-01-03': np.array([[0.0], + [0.0], + [0.0]]), + '2014-01-06': np.array([[0.0], + [0.0], + [11.0]]), + }) cal = pd.DatetimeIndex([ pd.Timestamp('2014-01-01'), @@ -1563,28 +1635,53 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): # omitting the 4th and 5th to simulate a weekend pd.Timestamp('2014-01-06'), ]) + + def get_expected_output(expected_views, values, asset_info): + return pd.DataFrame( + list(concatv(*([value] * nassets for value in values))), + index=pd.MultiIndex.from_product( + (sorted(expected_views.keys()), + finder.retrieve_all(asset_info.index),) + ), columns=('value',), + ) with tmp_asset_finder(equities=simple_asset_info) as finder: - expected_output = pd.DataFrame( - list(concatv([10] * nassets, [11] * nassets)), - index=pd.MultiIndex.from_product(( - sorted(expected_views.keys()), - finder.retrieve_all(simple_asset_info.index), - )), - columns=('value',), + expected_output_all_deltas = get_expected_output( + expected_views_all_deltas, + [10, 11], + simple_asset_info, ) - self._run_pipeline( - expr, - deltas, - None, - expected_views, - expected_output, - finder, - calendar=cal, - start=cal[2], - end=cal[-1], - window_length=3, - compute_fn=op.itemgetter(-1), + expected_output_novel_deltas = get_expected_output( + expected_views_novel_deltas, + [0, 11], + simple_asset_info, ) + it = ( + ( + True, + expected_views_all_deltas, + expected_output_all_deltas + ), + ( + False, + expected_views_novel_deltas, + expected_output_novel_deltas + ) + ) + for apply_deltas_adjs, expected_views, expected_output in it: + self._run_pipeline( + expr, + deltas, + None, + expected_views, + expected_output, + finder, + calendar=cal, + start=cal[2], + end=cal[-1], + window_length=3, + compute_fn=op.itemgetter(-1), + apply_deltas_adjustments=apply_deltas_adjs, + ) def _test_checkpoints_macro(self, checkpoints, ffilled_value=-1.0): """Simple checkpoints test that accepts a checkpoints dataframe and @@ -1805,7 +1902,8 @@ class MiscTestCase(ZiplineTestCase): odo_kwargs={'a': 'b'}, )), "ExprData(expr='expr', deltas='deltas'," - " checkpoints='checkpoints', odo_kwargs={'a': 'b'})", + " checkpoints='checkpoints', odo_kwargs={'a': 'b'}, " + "apply_deltas_adjustments=True)", ) def test_blaze_loader_repr(self): diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 0052029d..009e2ea6 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -206,12 +206,12 @@ is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types)) get__name__ = op.attrgetter('__name__') -class ExprData( - namedtuple( - 'ExprData', - 'expr deltas checkpoints odo_kwargs apply_deltas_adjustments' - ) -): +_expr_data_base = namedtuple( + 'ExprData', 'expr deltas checkpoints odo_kwargs apply_deltas_adjustments' +) + + +class ExprData(_expr_data_base): """A pair of expressions and data resources. The expressions will be computed using the resources as the starting scope. @@ -225,6 +225,9 @@ class ExprData( The forward fill checkpoints for the data. odo_kwargs : dict, optional The keyword arguments to forward to the odo calls internally. + apply_deltas_adjustments : bool, optional + Whether or not deltas adjustments should be applied to the baseline + values. If False, only novel deltas will be applied. """ def __new__(cls, expr, @@ -567,10 +570,6 @@ def from_blaze(expr, ---------- expr : Expr The blaze expression to use. - apply_deltas_adjustments : bool, optional - Whether or not deltas adjustments should be applied for this dataset. - True by default because not applying deltas adjustments is an exception - rather than the rule. deltas : Expr, 'auto' or None, optional The expression to use for the point in time adjustments. If the string 'auto' is passed, a deltas expr will be looked up @@ -604,6 +603,10 @@ def from_blaze(expr, found. 'warn' says to raise a warning but continue. 'raise' says to raise an exception if no deltas can be found. 'ignore' says take no action and proceed with no deltas. + apply_deltas_adjustments : bool, optional + Whether or not deltas adjustments should be applied for this dataset. + True by default because not applying deltas adjustments is an exception + rather than the rule. Returns ------- @@ -1114,7 +1117,12 @@ class BlazeLoader(dict): have_sids=have_sids) ffill_across_cols(dense_output, columns, {c.name: c.name for c in columns}) - adjustments_from_deltas = lambda *args: {} + + # By default, no non-novel deltas are applied. + def no_adjustments_from_deltas(*args): + return {} + + adjustments_from_deltas = no_adjustments_from_deltas if have_sids: if apply_deltas_adjustments: adjustments_from_deltas = adjustments_from_deltas_with_sids