Merge pull request #1031 from quantopian/ignore_missing_deltas_in_pipeline

Ignore missing deltas in pipeline
This commit is contained in:
Maya Tydykov
2016-03-08 18:00:07 -05:00
3 changed files with 54 additions and 11 deletions
+22 -8
View File
@@ -5,6 +5,7 @@ from __future__ import division
from collections import OrderedDict
from datetime import timedelta, time
from itertools import product, chain
from unittest import TestCase
import warnings
@@ -37,8 +38,10 @@ from zipline.utils.numpy_utils import (
int64_dtype,
repeat_last_axis,
)
from zipline.utils.test_utils import tmp_asset_finder, make_simple_equity_info
from zipline.utils.test_utils import (
tmp_asset_finder,
make_simple_equity_info,
)
nameof = op.attrgetter('name')
dtypeof = op.attrgetter('dtype')
@@ -55,6 +58,9 @@ asset_infos = (
),),
)
with_extra_sid = parameterized.expand(asset_infos)
with_ignore_sid = parameterized.expand(
product(chain.from_iterable(asset_infos), [True, False])
)
def _utc_localize_index_level_0(df):
@@ -843,10 +849,20 @@ class BlazeToPipelineTestCase(TestCase):
check_dtype=False,
)
@with_extra_sid
def test_deltas(self, asset_info):
expr = bz.Data(self.df, name='expr', dshape=self.dshape)
deltas = bz.Data(self.df, dshape=self.dshape)
@with_ignore_sid
def test_deltas(self, asset_info, add_extra_sid):
df = self.df.copy()
if add_extra_sid:
extra_sid_df = pd.DataFrame({
'asof_date': self.dates,
'timestamp': self.dates,
'sid': (ord('E'),) * 3,
'value': (3., 4., 5.,),
'int_value': (3, 4, 5),
})
df = df.append(extra_sid_df, ignore_index=True)
expr = bz.Data(df, name='expr', dshape=self.dshape)
deltas = bz.Data(df, dshape=self.dshape)
deltas = bz.Data(
odo(
bz.transform(
@@ -859,7 +875,6 @@ class BlazeToPipelineTestCase(TestCase):
name='delta',
dshape=self.dshape,
)
expected_views = keymap(pd.Timestamp, {
'2014-01-02': np.array([[10.0, 11.0, 12.0],
[1.0, 2.0, 3.0]]),
@@ -875,7 +890,6 @@ class BlazeToPipelineTestCase(TestCase):
lambda view: np.c_[view, [np.nan, np.nan]],
expected_views,
)
with tmp_asset_finder(equities=asset_info) as finder:
expected_output = pd.DataFrame(
list(concatv([12] * nassets, [13] * nassets, [14] * nassets)),
+9 -2
View File
@@ -765,8 +765,8 @@ def adjustments_from_deltas_with_sids(dense_dates,
column_name,
asset_idx,
deltas):
"""Collect all the adjustments that occur in a dataset that does not
have a sid column.
"""Collect all the adjustments that occur in a dataset that has a sid
column.
Parameters
----------
@@ -953,6 +953,13 @@ class BlazeLoader(dict):
)
)
# It's not guaranteed that assets returned by the engine will contain
# all sids from the deltas table; filter out such mismatches here.
if not materialized_deltas.empty and have_sids:
materialized_deltas = materialized_deltas[
materialized_deltas[SID_FIELD_NAME].isin(assets)
]
if data_query_time is not None:
for m in (materialized_expr, materialized_deltas):
m.loc[:, TS_FIELD_NAME] = m.loc[
+23 -1
View File
@@ -12,7 +12,7 @@ import shutil
from string import ascii_uppercase
import tempfile
from logbook import FileHandler
from logbook import FileHandler, TestHandler
from mock import patch
from numpy.testing import assert_allclose, assert_array_equal
import numpy as np
@@ -881,3 +881,25 @@ def parameter_space(**params):
param_sets = product(*(params[name] for name in argnames))
return subtest(param_sets, *argnames)(f)
return decorator
def make_test_handler(testcase, *args, **kwargs):
"""
Returns a TestHandler which will be used by the given testcase. This
handler can be used to test log messages.
Parameters
----------
testcase: unittest.TestCase
The test class in which the log handler will be used.
*args, **kwargs
Forwarded to the new TestHandler object.
Returns
-------
handler: logbook.TestHandler
The handler to use for the test case.
"""
handler = TestHandler(*args, **kwargs)
testcase.addCleanup(handler.close)
return handler