From 8db59b387bbf3ce72c20fe4dd92e6a866c5fc3fa Mon Sep 17 00:00:00 2001 From: dmichalowicz Date: Wed, 6 Apr 2016 17:20:52 -0400 Subject: [PATCH] TST: Overhaul test case --- tests/pipeline/test_engine.py | 102 +++++++++++++++++++++------------- zipline/pipeline/engine.py | 8 ++- zipline/pipeline/graph.py | 2 +- 3 files changed, 70 insertions(+), 42 deletions(-) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index 9dff54fa..871c7a80 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -10,13 +10,14 @@ from nose_parameterized import parameterized from numpy import ( arange, array, + concatenate, + float32, full, + log, nan, tile, + where, zeros, - float32, - concatenate, - log, ) from numpy.testing import assert_almost_equal from pandas import ( @@ -62,7 +63,6 @@ from zipline.pipeline.factors import ( MaxDrawdown, SimpleMovingAverage, ) -from zipline.pipeline.term import NotSpecified from zipline.testing import ( make_rotating_equity_info, make_simple_equity_info, @@ -96,6 +96,14 @@ class AssetID(CustomFactor): out[:] = assets +class AssetIDPlusDay(CustomFactor): + window_length = 1 + inputs = [USEquityPricing.close] + + def compute(self, today, assets, out, close): + out[:] = assets + today.day + + class OpenPrice(CustomFactor): window_length = 1 inputs = [USEquityPricing.open] @@ -166,7 +174,7 @@ class ConstantInputTestCase(TestCase): USEquityPricing.close: 3, USEquityPricing.high: 4, } - self.asset_ids = [1, 2, 3] + self.asset_ids = [1, 2, 3, 4] self.dates = date_range('2014-01', '2014-03', freq='D', tz='UTC') self.loader = PrecomputedLoader( constants=self.constants, @@ -372,52 +380,68 @@ class ConstantInputTestCase(TestCase): values. """ loader = self.loader - dates = self.dates[5:10] + dates = self.dates[5:8] assets = self.assets asset_ids = self.asset_ids constants = self.constants - num_dates = len(dates) open = USEquityPricing.open + close = USEquityPricing.close engine = SimplePipelineEngine( lambda column: loader, self.dates, self.asset_finder, ) - # These are the expected values for the OpenPrice factor. If we pass - # OpenPrice a mask, any assets that are filtered out should have all - # NaN values. Otherwise, we expect its computed values to be the - # asset's open price. - values = array([constants[open]] * num_dates, dtype=float) - missing_values = array([nan] * num_dates) + factor1_value = constants[open] + factor2_value = 3.0 * (constants[open] - constants[close]) - for asset_id in asset_ids: - mask = AssetID() <= asset_id - factor1 = OpenPrice(mask=mask) + def create_expected_results(expected_value, mask): + expected_values = where(mask, expected_value, nan) + return DataFrame(expected_values, index=dates, columns=assets) - # Test running our pipeline both with and without a second factor. - # We do not explicitly test the resulting values of the second - # factor; we just want to implicitly ensure that the addition of - # another factor to the pipeline term graph does not cause any - # unexpected exceptions when calling `run_pipeline`. - for factor2 in (None, - RollingSumDifference(mask=NotSpecified), - RollingSumDifference(mask=mask)): - if factor2 is None: - columns = {'factor1': factor1} - else: - columns = {'factor1': factor1, 'factor2': factor2} - pipeline = Pipeline(columns=columns) - results = engine.run_pipeline(pipeline, dates[0], dates[-1]) - factor1_results = results['factor1'].unstack() + # Produce a mask that looks like: + # + # Equity(0 [A]) Equity(1 [B]) Equity(2 [C]) Equity(3 [D]) + # Day 1 True True True False + # Day 2 True True False False + # Day 3 True False False False + # + cascading_mask = AssetIDPlusDay() < (asset_ids[-1] + dates[0].day) - expected = { - asset: values if asset.sid <= asset_id else missing_values - for asset in assets - } + # And another one that looks like: + # + # Equity(0 [A]) Equity(1 [B]) Equity(2 [C]) Equity(3 [D]) + # Day 1 False True False True + # Day 2 True False True False + # Day 3 False True False True + # + alternating_mask = (AssetIDPlusDay() % 2).eq(0) - assert_frame_equal( - factor1_results, - DataFrame(expected, index=dates, columns=assets), - ) + for mask in (cascading_mask, alternating_mask): + # Test running a pipeline with a single masked factor. + columns = {'factor1': OpenPrice(mask=mask), 'mask': mask} + pipeline = Pipeline(columns=columns) + results = engine.run_pipeline(pipeline, dates[0], dates[-1]) + mask_results = results['mask'].unstack() + factor1_results = results['factor1'].unstack() + factor1_expected = create_expected_results(factor1_value, + mask_results) + assert_frame_equal(factor1_results, factor1_expected) + + # Test running a pipeline with a second factor. This ensures that + # adding another factor to the pipeline with a different window + # length does not cause any unexpected behavior, especially when + # both factors share the same mask. + columns['factor2'] = RollingSumDifference(mask=mask) + pipeline = Pipeline(columns=columns) + results = engine.run_pipeline(pipeline, dates[0], dates[-1]) + mask_results = results['mask'].unstack() + factor1_results = results['factor1'].unstack() + factor2_results = results['factor2'].unstack() + factor1_expected = create_expected_results(factor1_value, + mask_results) + factor2_expected = create_expected_results(factor2_value, + mask_results) + assert_frame_equal(factor1_results, factor1_expected) + assert_frame_equal(factor2_results, factor2_expected) def test_rolling_and_nonrolling(self): open_ = USEquityPricing.open diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 39a4776c..8a751bc5 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -237,16 +237,20 @@ class SimplePipelineEngine(object): assert shape[0] * shape[1] != 0, 'root mask cannot be empty' return ret - def _mask_and_dates_for_term(self, term, workspace, graph, dates): + def _mask_and_dates_for_term(self, term, workspace, graph, all_dates): """ Load mask and mask row labels for term. """ mask = term.mask mask_offset = graph.extra_rows[mask] - graph.extra_rows[term] + + # This offset is computed against _root_mask_term because that is what + # determines the shape of the top-level dates array. dates_offset = ( graph.extra_rows[self._root_mask_term] - graph.extra_rows[term] ) - return workspace[mask][mask_offset:], dates[dates_offset:] + + return workspace[mask][mask_offset:], all_dates[dates_offset:] @staticmethod def _inputs_for_term(term, workspace, graph): diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 6290c937..529e2459 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -9,7 +9,7 @@ from six import itervalues, iteritems from zipline.utils.memoize import lazyval from zipline.pipeline.visualize import display_graph -from .term import ComputableTerm, LoadableTerm +from .term import LoadableTerm class CyclicDependency(Exception):