From 5e4b8b8f8f9f61ba79de822eccf6fabb84393fe9 Mon Sep 17 00:00:00 2001 From: llllllllll Date: Mon, 2 Nov 2015 20:55:09 -0500 Subject: [PATCH] TST: tests for new forward fill --- tests/pipeline/test_blaze.py | 320 +++++++++++++++++++++++++++-------- 1 file changed, 251 insertions(+), 69 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index fbad3fda..18404cdc 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -295,8 +295,8 @@ class BlazeToPipelineTestCase(TestCase): loader=self.garbage_loader, ) - def test_id(self): - expr = bz.Data(self.df, name='expr', dshape=self.dshape) + def _test_id(self, df, dshape, expected, finder, add): + expr = bz.Data(df, name='expr', dshape=dshape) loader = BlazeLoader() ds = from_blaze( expr, @@ -304,7 +304,8 @@ class BlazeToPipelineTestCase(TestCase): no_deltas_rule=no_deltas_rules.ignore, ) p = Pipeline() - p.add(ds.value.latest, 'value') + for a in add: + p.add(getattr(ds, a).latest, a) dates = self.dates with tmp_asset_finder() as finder: @@ -314,13 +315,6 @@ class BlazeToPipelineTestCase(TestCase): finder, ).run_pipeline(p, dates[0], dates[-1]) - expected = self.df.drop('asof_date', axis=1).set_index( - ['timestamp', 'sid'], - ) - expected.index = pd.MultiIndex.from_product(( - expected.index.levels[0], - finder.retrieve_all(expected.index.levels[1]), - )) assert_frame_equal(result, expected, check_dtype=False) def test_custom_query_time_tz(self): @@ -360,75 +354,263 @@ class BlazeToPipelineTestCase(TestCase): )) assert_frame_equal(result, expected, check_dtype=False) + def test_id(self): + """ + input (self.df): + asof_date sid timestamp value + 0 2014-01-01 65 2014-01-01 0 + 1 2014-01-01 66 2014-01-01 1 + 2 2014-01-01 67 2014-01-01 2 + 3 2014-01-02 65 2014-01-02 1 + 4 2014-01-02 66 2014-01-02 2 + 5 2014-01-02 67 2014-01-02 3 + 6 2014-01-03 65 2014-01-03 2 + 7 2014-01-03 66 2014-01-03 3 + 8 2014-01-03 67 2014-01-03 4 + + output (expected) + value + 2014-01-01 Equity(65 [A]) 0 + Equity(66 [B]) 1 + Equity(67 [C]) 2 + 2014-01-02 Equity(65 [A]) 1 + Equity(66 [B]) 2 + Equity(67 [C]) 3 + 2014-01-03 Equity(65 [A]) 2 + Equity(66 [B]) 3 + Equity(67 [C]) 4 + """ + with tmp_asset_finder() as finder: + expected = self.df.drop('asof_date', axis=1).set_index( + ['timestamp', 'sid'], + ) + expected.index = pd.MultiIndex.from_product(( + expected.index.levels[0], + finder.retrieve_all(expected.index.levels[1]), + )) + self._test_id(self.df, self.dshape, expected, finder, ('value',)) + + def test_id_ffill_out_of_window(self): + """ + input (df): + + asof_date timestamp sid other value + 0 2013-12-22 2013-12-22 65 0 0 + 1 2013-12-22 2013-12-22 66 NaN 1 + 2 2013-12-22 2013-12-22 67 2 NaN + 3 2013-12-23 2013-12-23 65 NaN 1 + 4 2013-12-23 2013-12-23 66 2 NaN + 5 2013-12-23 2013-12-23 67 3 3 + 6 2013-12-24 2013-12-24 65 2 NaN + 7 2013-12-24 2013-12-24 66 3 3 + 8 2013-12-24 2013-12-24 67 NaN 4 + + output (expected): + other value + 2014-01-01 Equity(65 [A]) 2 1 + Equity(66 [B]) 3 3 + Equity(67 [C]) 3 4 + 2014-01-02 Equity(65 [A]) 2 1 + Equity(66 [B]) 3 3 + Equity(67 [C]) 3 4 + 2014-01-03 Equity(65 [A]) 2 1 + Equity(66 [B]) 3 3 + Equity(67 [C]) 3 4 + """ + dates = self.dates.repeat(3) - timedelta(days=10) + df = pd.DataFrame({ + 'sid': self.sids * 3, + 'value': (0, 1, np.nan, 1, np.nan, 3, np.nan, 3, 4), + 'other': (0, np.nan, 2, np.nan, 2, 3, 2, 3, np.nan), + 'asof_date': dates, + 'timestamp': dates, + }) + fields = OrderedDict(self.dshape.measure.fields) + fields['other'] = fields['value'] + + with tmp_asset_finder() as finder: + expected = pd.DataFrame( + np.array([[2, 1], + [3, 3], + [3, 4], + [2, 1], + [3, 3], + [3, 4], + [2, 1], + [3, 3], + [3, 4]]), + columns=['other', 'value'], + index=pd.MultiIndex.from_product( + (self.dates, finder.retrieve_all(self.sids)), + ), + ) + self._test_id( + df, + var * Record(fields), + expected, + finder, + ('value', 'other'), + ) + def test_id_multiple_columns(self): + """ + input (df): + asof_date sid timestamp value other + 0 2014-01-01 65 2014-01-01 0 1 + 1 2014-01-01 66 2014-01-01 1 2 + 2 2014-01-01 67 2014-01-01 2 3 + 3 2014-01-02 65 2014-01-02 1 2 + 4 2014-01-02 66 2014-01-02 2 3 + 5 2014-01-02 67 2014-01-02 3 4 + 6 2014-01-03 65 2014-01-03 2 3 + 7 2014-01-03 66 2014-01-03 3 4 + 8 2014-01-03 67 2014-01-03 4 5 + + output (expected): + value other + 2014-01-01 Equity(65 [A]) 0 1 + Equity(66 [B]) 1 2 + Equity(67 [C]) 2 3 + 2014-01-02 Equity(65 [A]) 1 2 + Equity(66 [B]) 2 3 + Equity(67 [C]) 3 4 + 2014-01-03 Equity(65 [A]) 2 3 + Equity(66 [B]) 3 4 + Equity(67 [C]) 4 5 + """ df = self.df.copy() df['other'] = df.value + 1 fields = OrderedDict(self.dshape.measure.fields) fields['other'] = fields['value'] - expr = bz.Data(df, name='expr', dshape=var * Record(fields)) - loader = BlazeLoader() - ds = from_blaze( - expr, - loader=loader, - no_deltas_rule='ignore', - ) - p = Pipeline() - p.add(ds.value.latest, 'value') - p.add(ds.other.latest, 'other') - dates = self.dates - with tmp_asset_finder() as finder: - result = SimplePipelineEngine( - loader, - dates, + expected = df.drop('asof_date', axis=1).set_index( + ['timestamp', 'sid'], + ).sort_index(axis=1) + expected.index = pd.MultiIndex.from_product(( + expected.index.levels[0], + finder.retrieve_all(expected.index.levels[1]), + )) + self._test_id( + df, + var * Record(fields), + expected, finder, - ).run_pipeline(p, dates[0], dates[-1]) - - expected = df.drop('asof_date', axis=1).set_index( - ['timestamp', 'sid'], - ).sort_index(axis=1) - expected.index = pd.MultiIndex.from_product(( - expected.index.levels[0], - finder.retrieve_all(expected.index.levels[1]), - )) - assert_frame_equal( - result, - expected.sort_index(axis=1), - check_dtype=False, - ) + ('value', 'other'), + ) def test_id_macro_dataset(self): - expr = bz.Data(self.macro_df, name='expr', dshape=self.macro_dshape) - loader = BlazeLoader() - ds = from_blaze( - expr, - loader=loader, - no_deltas_rule=no_deltas_rules.ignore, - ) - p = Pipeline() - p.add(ds.value.latest, 'value') - dates = self.dates + """ + input (self.macro_df) + asof_date timestamp value + 0 2014-01-01 2014-01-01 0 + 3 2014-01-02 2014-01-02 1 + 6 2014-01-03 2014-01-03 2 + output (expected): + value + 2014-01-01 Equity(65 [A]) 0 + Equity(66 [B]) 0 + Equity(67 [C]) 0 + 2014-01-02 Equity(65 [A]) 1 + Equity(66 [B]) 1 + Equity(67 [C]) 1 + 2014-01-03 Equity(65 [A]) 2 + Equity(66 [B]) 2 + Equity(67 [C]) 2 + """ asset_info = asset_infos[0][0] - with tmp_asset_finder(equities=asset_info) as finder: - result = SimplePipelineEngine( - loader, - dates, - finder, - ).run_pipeline(p, dates[0], dates[-1]) - nassets = len(asset_info) - expected = pd.DataFrame( - list(concatv([0] * nassets, [1] * nassets, [2] * nassets)), - index=pd.MultiIndex.from_product(( - self.macro_df.timestamp, - finder.retrieve_all(asset_info.index), - )), - columns=('value',), - ) - assert_frame_equal(result, expected, check_dtype=False) + with tmp_asset_finder() as finder: + expected = pd.DataFrame( + list(concatv([0] * nassets, [1] * nassets, [2] * nassets)), + index=pd.MultiIndex.from_product(( + self.macro_df.timestamp, + finder.retrieve_all(asset_info.index), + )), + columns=('value',), + ) + self._test_id( + self.macro_df, + self.macro_dshape, + expected, + finder, + ('value',), + ) + + def test_id_ffill_out_of_window_macro_dataset(self): + """ + input (df): + asof_date timestamp other value + 0 2013-12-22 2013-12-22 NaN 0 + 1 2013-12-23 2013-12-23 1 NaN + 2 2013-12-24 2013-12-24 NaN NaN + + output (expected): + other value + 2014-01-01 Equity(65 [A]) 1 0 + Equity(66 [B]) 1 0 + Equity(67 [C]) 1 0 + 2014-01-02 Equity(65 [A]) 1 0 + Equity(66 [B]) 1 0 + Equity(67 [C]) 1 0 + 2014-01-03 Equity(65 [A]) 1 0 + Equity(66 [B]) 1 0 + Equity(67 [C]) 1 0 + """ + dates = self.dates - timedelta(days=10) + df = pd.DataFrame({ + 'value': (0, np.nan, np.nan), + 'other': (np.nan, 1, np.nan), + 'asof_date': dates, + 'timestamp': dates, + }) + fields = OrderedDict(self.macro_dshape.measure.fields) + fields['other'] = fields['value'] + + with tmp_asset_finder() as finder: + expected = pd.DataFrame( + np.array([[0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1]]), + columns=['value', 'other'], + index=pd.MultiIndex.from_product( + (self.dates, finder.retrieve_all(self.sids)), + ), + ).sort_index(axis=1) + self._test_id( + df, + var * Record(fields), + expected, + finder, + ('value', 'other'), + ) def test_id_macro_dataset_multiple_columns(self): + """ + input (df): + asof_date timestamp other value + 0 2014-01-01 2014-01-01 1 0 + 3 2014-01-02 2014-01-02 2 1 + 6 2014-01-03 2014-01-03 3 2 + + output (expected): + other value + 2014-01-01 Equity(65 [A]) 1 0 + Equity(66 [B]) 1 0 + Equity(67 [C]) 1 0 + 2014-01-02 Equity(65 [A]) 2 1 + Equity(66 [B]) 2 1 + Equity(67 [C]) 2 1 + 2014-01-03 Equity(65 [A]) 3 2 + Equity(66 [B]) 3 2 + Equity(67 [C]) 3 2 + """ df = self.macro_df.copy() df['other'] = df.value + 1 fields = OrderedDict(self.macro_dshape.measure.fields) @@ -438,15 +620,15 @@ class BlazeToPipelineTestCase(TestCase): ds = from_blaze( expr, loader=loader, - no_deltas_rule='ignore', + no_deltas_rule=no_deltas_rules.ignore, ) p = Pipeline() p.add(ds.value.latest, 'value') - p.add(ds.other.latest, 'latest') + p.add(ds.other.latest, 'other') dates = self.dates asset_info = asset_infos[0][0] - with tmp_asset_finder(asset_info) as finder: + with tmp_asset_finder(equities=asset_info) as finder: result = SimplePipelineEngine( loader, dates, @@ -461,7 +643,7 @@ class BlazeToPipelineTestCase(TestCase): df.timestamp, finder.retrieve_all(asset_info.index), )), - columns=('value', 'latest'), + columns=('value', 'other'), ).sort_index(axis=1) assert_frame_equal( result,