diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index ed19f5f4..12016c9a 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -5,6 +5,7 @@ from __future__ import division from collections import OrderedDict from datetime import timedelta, time +from itertools import product, chain from unittest import TestCase import warnings @@ -37,8 +38,10 @@ from zipline.utils.numpy_utils import ( int64_dtype, repeat_last_axis, ) -from zipline.utils.test_utils import tmp_asset_finder, make_simple_equity_info - +from zipline.utils.test_utils import ( + tmp_asset_finder, + make_simple_equity_info, +) nameof = op.attrgetter('name') dtypeof = op.attrgetter('dtype') @@ -55,6 +58,9 @@ asset_infos = ( ),), ) with_extra_sid = parameterized.expand(asset_infos) +with_ignore_sid = parameterized.expand( + product(chain.from_iterable(asset_infos), [True, False]) +) def _utc_localize_index_level_0(df): @@ -843,10 +849,20 @@ class BlazeToPipelineTestCase(TestCase): check_dtype=False, ) - @with_extra_sid - def test_deltas(self, asset_info): - expr = bz.Data(self.df, name='expr', dshape=self.dshape) - deltas = bz.Data(self.df, dshape=self.dshape) + @with_ignore_sid + def test_deltas(self, asset_info, add_extra_sid): + df = self.df.copy() + if add_extra_sid: + extra_sid_df = pd.DataFrame({ + 'asof_date': self.dates, + 'timestamp': self.dates, + 'sid': (ord('E'),) * 3, + 'value': (3., 4., 5.,), + 'int_value': (3, 4, 5), + }) + df = df.append(extra_sid_df, ignore_index=True) + expr = bz.Data(df, name='expr', dshape=self.dshape) + deltas = bz.Data(df, dshape=self.dshape) deltas = bz.Data( odo( bz.transform( @@ -859,7 +875,6 @@ class BlazeToPipelineTestCase(TestCase): name='delta', dshape=self.dshape, ) - expected_views = keymap(pd.Timestamp, { '2014-01-02': np.array([[10.0, 11.0, 12.0], [1.0, 2.0, 3.0]]), @@ -875,7 +890,6 @@ class BlazeToPipelineTestCase(TestCase): lambda view: np.c_[view, [np.nan, np.nan]], expected_views, ) - with 
def make_test_handler(testcase, *args, **kwargs):
    """
    Create a logbook ``TestHandler`` for ``testcase`` so that log messages
    can be checked in tests.

    Parameters
    ----------
    testcase : unittest.TestCase
        The test case that will use the handler. The handler's ``close``
        method is registered with ``testcase.addCleanup``.
    *args, **kwargs
        Passed through unchanged to the ``TestHandler`` constructor.

    Returns
    -------
    handler : logbook.TestHandler
        The newly constructed handler for the test case to use.
    """
    log_handler = TestHandler(*args, **kwargs)
    # Register the close with the test case so callers never have to
    # remember to close the handler themselves.
    testcase.addCleanup(log_handler.close)
    return log_handler