From 6976f7e45915f3de8d6003eae6694f809ad7b252 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Thu, 3 Mar 2016 13:23:39 -0500 Subject: [PATCH 1/5] # This is a combination of 8 commits. # The first commit's message is: BUG: ignore sids in deltas missing from asset index. # This is the 2nd commit message: MAINT: use correct debugger. # This is the 3rd commit message: MAINT: fix set add. # This is the 4th commit message: WIP: move sid filtering. # This is the 5th commit message: WIP: move filtering logic. # This is the 6th commit message: WIP: working test. # This is the 7th commit message: TST: clean up test. # This is the 8th commit message: STY: fix flake8. --- tests/pipeline/test_blaze.py | 86 +++++++++++++++++++++++++- zipline/pipeline/loaders/blaze/core.py | 18 +++++- zipline/utils/test_utils.py | 8 ++- 3 files changed, 106 insertions(+), 6 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index ed19f5f4..e7582030 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -37,8 +37,11 @@ from zipline.utils.numpy_utils import ( int64_dtype, repeat_last_axis, ) -from zipline.utils.test_utils import tmp_asset_finder, make_simple_equity_info - +from zipline.utils.test_utils import ( + tmp_asset_finder, + make_simple_equity_info, + make_test_handler +) nameof = op.attrgetter('name') dtypeof = op.attrgetter('dtype') @@ -1128,3 +1131,82 @@ class BlazeToPipelineTestCase(TestCase): window_length=3, compute_fn=op.itemgetter(-1), ) + + def test_ignore_nonexistent_delta(self): + missing_sid = ord('E') + handler = make_test_handler(self) + asset_info = asset_infos[0][0] + base_dates = pd.DatetimeIndex([ + pd.Timestamp('2014-01-01'), + pd.Timestamp('2014-01-04') + ]) + repeated_dates = base_dates.repeat(4) + baseline = pd.DataFrame({ + 'sid': (self.sids + (missing_sid,)) * 2, + 'value': (0., 1., 2., 3., 1., 2., 3., 4.), + 'int_value': (0, 1, 2, 3, 1, 2, 3, 4.), + 'asof_date': repeated_dates, + 'timestamp': repeated_dates + }) + expr = bz.Data(baseline, name='expr', dshape=self.dshape) + deltas = bz.Data( + odo( + bz.transform( + expr, + value=expr.value + 10, + timestamp=expr.timestamp + timedelta(days=1), + ), + pd.DataFrame, + ), + name='delta', + dshape=self.dshape, + ) + expected_views = keymap(pd.Timestamp, { + '2014-01-03': np.array([[10.0, 11.0, 12.0], + [10.0, 11.0, 12.0], + [10.0, 11.0, 12.0]]), + '2014-01-06': np.array([[10.0, 11.0, 12.0], + [10.0, 11.0, 12.0], + [11.0, 12.0, 13.0]]), + }) + if len(asset_info) == 4: + expected_views = valmap( + lambda view: np.c_[view, [np.nan, np.nan, np.nan]], + expected_views, + ) + expected_output_buffer = [10, 11, 12, np.nan, 11, 12, 13, np.nan] + else: + expected_output_buffer = [10, 11, 12, 11, 12, 13] + + cal = pd.DatetimeIndex([ + pd.Timestamp('2014-01-01'), + pd.Timestamp('2014-01-02'), + pd.Timestamp('2014-01-03'), + # omitting the 4th and 5th to simulate a weekend + pd.Timestamp('2014-01-06'), + ]) + with handler.applicationbound(): + with tmp_asset_finder(equities=asset_info) as finder: + expected_output = pd.DataFrame( + expected_output_buffer, + index=pd.MultiIndex.from_product(( + sorted(expected_views.keys()), + finder.retrieve_all(asset_info.index), + )), + columns=('value',), + ) + self._run_pipeline( + expr, + deltas, + expected_views, + expected_output, + finder, + calendar=cal, + start=cal[2], + end=cal[-1], + window_length=3, + compute_fn=op.itemgetter(-1), + ) + message = handler.records[0].message + exp_msg = "Didn't find the following sids in asset index: {}" + self.assertIn(exp_msg.format(set([missing_sid])), message) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 904300a6..c32674fa 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -129,6 +129,7 @@ from collections import namedtuple, defaultdict from copy import copy from functools import partial, reduce from itertools import count +import logbook import warnings from weakref import WeakKeyDictionary @@ -194,7 +195,7 @@ traversable_nodes = ( ) is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types)) get__name__ = op.attrgetter('__name__') - +log = logbook.Logger('BlazeLoader') class ExprData(namedtuple('ExprData', 'expr deltas odo_kwargs')): """A pair of expressions and data resources. The expresions will be @@ -765,8 +766,8 @@ def adjustments_from_deltas_with_sids(dense_dates, column_name, asset_idx, deltas): - """Collect all the adjustments that occur in a dataset that does not - have a sid column. + """Collect all the adjustments that occur in a dataset that has a sid + column. Parameters ---------- @@ -952,6 +953,17 @@ class BlazeLoader(dict): columns=added_query_fields + list(map(getname, columns)), ) ) + if not materialized_deltas.empty and have_sids: + missing_sids = materialized_deltas[ + ~materialized_deltas.sid.isin(assets) + ] + log.debug( + "Didn't find the following sids in asset index: {}".format( + set(missing_sids.sid)) + ) + materialized_deltas = materialized_deltas[ + materialized_deltas.sid.isin(assets) + ] if data_query_time is not None: for m in (materialized_expr, materialized_deltas): diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index 0d51a6ca..d08b0280 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -12,7 +12,7 @@ import shutil from string import ascii_uppercase import tempfile -from logbook import FileHandler +from logbook import FileHandler, TestHandler from mock import patch from numpy.testing import assert_allclose, assert_array_equal import numpy as np @@ -881,3 +881,9 @@ def parameter_space(**params): param_sets = product(*(params[name] for name in argnames)) return subtest(param_sets, *argnames)(f) return decorator + + +def make_test_handler(testcase, *args, **kwargs): + handler = TestHandler(*args, **kwargs) + testcase.addCleanup(handler.close) + return handler From ac82aa7d95436fb923ea810ca27c5b588e637224 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Thu, 3 Mar 2016 13:23:39 -0500 Subject: [PATCH 2/5] BUG: ignore sids in deltas missing from asset index. MAINT: use correct debugger. MAINT: fix set add. WIP: move sid filtering. WIP: move filtering logic. WIP: working test. TST: clean up test. STY: fix flake8. STY: fix flake8. --- zipline/pipeline/loaders/blaze/core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index c32674fa..74dfedc8 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -197,6 +197,7 @@ is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types)) get__name__ = op.attrgetter('__name__') log = logbook.Logger('BlazeLoader') + class ExprData(namedtuple('ExprData', 'expr deltas odo_kwargs')): """A pair of expressions and data resources. The expresions will be computed using the resources as the starting scope. From ecb493e1a1bebe49b273e094d419c14538165eba Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Tue, 8 Mar 2016 12:17:31 -0500 Subject: [PATCH 3/5] MAINT: remove logging. --- zipline/pipeline/loaders/blaze/core.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 74dfedc8..091599c2 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -129,7 +129,6 @@ from collections import namedtuple, defaultdict from copy import copy from functools import partial, reduce from itertools import count -import logbook import warnings from weakref import WeakKeyDictionary @@ -195,7 +194,6 @@ traversable_nodes = ( ) is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types)) get__name__ = op.attrgetter('__name__') -log = logbook.Logger('BlazeLoader') class ExprData(namedtuple('ExprData', 'expr deltas odo_kwargs')): @@ -954,14 +952,10 @@ class BlazeLoader(dict): columns=added_query_fields + list(map(getname, columns)), ) ) + + # It's not guaranteed that assets returned by the engine will contain + # all sids from the deltas table; filter out such mismatches here. if not materialized_deltas.empty and have_sids: - missing_sids = materialized_deltas[ - ~materialized_deltas.sid.isin(assets) - ] - log.debug( - "Didn't find the following sids in asset index: {}".format( - set(missing_sids.sid)) - ) materialized_deltas = materialized_deltas[ materialized_deltas.sid.isin(assets) ] From c87293942f30eb23b4628ef5aa3e2faca3eef030 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Tue, 8 Mar 2016 12:18:04 -0500 Subject: [PATCH 4/5] TST: remove logging test and move missing sid test. DOC: add docstring. TST: reduce test data redundancy. MAINT: use string constant. --- tests/pipeline/test_blaze.py | 104 +++++-------------------- zipline/pipeline/loaders/blaze/core.py | 2 +- zipline/utils/test_utils.py | 7 ++ 3 files changed, 26 insertions(+), 87 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index e7582030..12016c9a 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -5,6 +5,7 @@ from __future__ import division from collections import OrderedDict from datetime import timedelta, time +from itertools import product, chain from unittest import TestCase import warnings @@ -40,7 +41,6 @@ from zipline.utils.numpy_utils import ( from zipline.utils.test_utils import ( tmp_asset_finder, make_simple_equity_info, - make_test_handler ) nameof = op.attrgetter('name') @@ -58,6 +58,9 @@ asset_infos = ( ),), ) with_extra_sid = parameterized.expand(asset_infos) +with_ignore_sid = parameterized.expand( + product(chain.from_iterable(asset_infos), [True, False]) +) def _utc_localize_index_level_0(df): @@ -846,10 +849,20 @@ class BlazeToPipelineTestCase(TestCase): check_dtype=False, ) - @with_extra_sid - def test_deltas(self, asset_info): - expr = bz.Data(self.df, name='expr', dshape=self.dshape) - deltas = bz.Data(self.df, dshape=self.dshape) + @with_ignore_sid + def test_deltas(self, asset_info, add_extra_sid): + df = self.df.copy() + if add_extra_sid: + extra_sid_df = pd.DataFrame({ + 'asof_date': self.dates, + 'timestamp': self.dates, + 'sid': (ord('E'),) * 3, + 'value': (3., 4., 5.,), + 'int_value': (3, 4, 5), + }) + df = df.append(extra_sid_df, ignore_index=True) + expr = bz.Data(df, name='expr', dshape=self.dshape) + deltas = bz.Data(df, dshape=self.dshape) deltas = bz.Data( odo( bz.transform( @@ -862,7 +875,6 @@ class BlazeToPipelineTestCase(TestCase): name='delta', dshape=self.dshape, ) - expected_views = keymap(pd.Timestamp, { '2014-01-02': np.array([[10.0, 11.0, 12.0], [1.0, 2.0, 3.0]]), @@ -878,7 +890,6 @@ class BlazeToPipelineTestCase(TestCase): lambda view: np.c_[view, [np.nan, np.nan]], expected_views, ) - with tmp_asset_finder(equities=asset_info) as finder: expected_output = pd.DataFrame( list(concatv([12] * nassets, [13] * nassets, [14] * nassets)), @@ -1131,82 +1142,3 @@ class BlazeToPipelineTestCase(TestCase): window_length=3, compute_fn=op.itemgetter(-1), ) - - def test_ignore_nonexistent_delta(self): - missing_sid = ord('E') - handler = make_test_handler(self) - asset_info = asset_infos[0][0] - base_dates = pd.DatetimeIndex([ - pd.Timestamp('2014-01-01'), - pd.Timestamp('2014-01-04') - ]) - repeated_dates = base_dates.repeat(4) - baseline = pd.DataFrame({ - 'sid': (self.sids + (missing_sid,)) * 2, - 'value': (0., 1., 2., 3., 1., 2., 3., 4.), - 'int_value': (0, 1, 2, 3, 1, 2, 3, 4.), - 'asof_date': repeated_dates, - 'timestamp': repeated_dates - }) - expr = bz.Data(baseline, name='expr', dshape=self.dshape) - deltas = bz.Data( - odo( - bz.transform( - expr, - value=expr.value + 10, - timestamp=expr.timestamp + timedelta(days=1), - ), - pd.DataFrame, - ), - name='delta', - dshape=self.dshape, - ) - expected_views = keymap(pd.Timestamp, { - '2014-01-03': np.array([[10.0, 11.0, 12.0], - [10.0, 11.0, 12.0], - [10.0, 11.0, 12.0]]), - '2014-01-06': np.array([[10.0, 11.0, 12.0], - [10.0, 11.0, 12.0], - [11.0, 12.0, 13.0]]), - }) - if len(asset_info) == 4: - expected_views = valmap( - lambda view: np.c_[view, [np.nan, np.nan, np.nan]], - expected_views, - ) - expected_output_buffer = [10, 11, 12, np.nan, 11, 12, 13, np.nan] - else: - expected_output_buffer = [10, 11, 12, 11, 12, 13] - - cal = pd.DatetimeIndex([ - pd.Timestamp('2014-01-01'), - pd.Timestamp('2014-01-02'), - pd.Timestamp('2014-01-03'), - # omitting the 4th and 5th to simulate a weekend - pd.Timestamp('2014-01-06'), - ]) - with handler.applicationbound(): - with tmp_asset_finder(equities=asset_info) as finder: - expected_output = pd.DataFrame( - expected_output_buffer, - index=pd.MultiIndex.from_product(( - sorted(expected_views.keys()), - finder.retrieve_all(asset_info.index), - )), - columns=('value',), - ) - self._run_pipeline( - expr, - deltas, - expected_views, - expected_output, - finder, - calendar=cal, - start=cal[2], - end=cal[-1], - window_length=3, - compute_fn=op.itemgetter(-1), - ) - message = handler.records[0].message - exp_msg = "Didn't find the following sids in asset index: {}" - self.assertIn(exp_msg.format(set([missing_sid])), message) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 091599c2..b471d2bf 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -957,7 +957,7 @@ class BlazeLoader(dict): # all sids from the deltas table; filter out such mismatches here. if not materialized_deltas.empty and have_sids: materialized_deltas = materialized_deltas[ - materialized_deltas.sid.isin(assets) + materialized_deltas[SID_FIELD_NAME].isin(assets) ] if data_query_time is not None: diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index d08b0280..652258fa 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -884,6 +884,13 @@ def parameter_space(**params): def make_test_handler(testcase, *args, **kwargs): + """ + Returns a TestHandler which will be used by the given testcase. This + handler can be used to test log messages. + + testcase: unittest.TestCase + The test class in which the log handler will be used. + """ handler = TestHandler(*args, **kwargs) testcase.addCleanup(handler.close) return handler From 195c2fffb694e7e6ca8381fb486268ebf254fa45 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Tue, 8 Mar 2016 15:48:37 -0500 Subject: [PATCH 5/5] DOC: update doc. DOC: update docs. --- zipline/utils/test_utils.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index 652258fa..9db9e3a3 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -888,8 +888,17 @@ def make_test_handler(testcase, *args, **kwargs): Returns a TestHandler which will be used by the given testcase. This handler can be used to test log messages. + Parameters + ---------- testcase: unittest.TestCase The test class in which the log handler will be used. + *args, **kwargs + Forwarded to the new TestHandler object. + + Returns + ------- + handler: logbook.TestHandler + The handler to use for the test case. """ handler = TestHandler(*args, **kwargs) testcase.addCleanup(handler.close)