From 9e06886794ec5d1e5fb529c9f35c0986736ba68f Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 12 Aug 2016 13:08:40 -0400 Subject: [PATCH 01/25] DOC: Fix typo in docstring. --- zipline/pipeline/term.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 4671daed..4174f495 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -353,7 +353,7 @@ class ComputableTerm(Term): A Term that should be computed from a tuple of inputs. This is the base class for :class:`zipline.pipeline.Factor`, - :class:`zipline.pipeline.Filter`, and :class:`zipline.pipeline.Factor`. + :class:`zipline.pipeline.Filter`, and :class:`zipline.pipeline.Classifier`. """ inputs = NotSpecified outputs = NotSpecified From 670f3c106fec948fa933dd7b8d92c4b98e5421c5 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 12 Aug 2016 13:09:04 -0400 Subject: [PATCH 02/25] DOC: Clarify how AssetExists() is special. --- zipline/pipeline/term.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 4174f495..95b444d2 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -320,6 +320,9 @@ class AssetExists(Term): every asset on every date. We don't subclass Filter, however, because `AssetExists` is computed directly by the PipelineEngine. + This term is guaranteed to be available as an input for any term computed + by SimplePipelineEngine.run_pipeline(). + See Also -------- zipline.assets.AssetFinder.lifetimes From d4a9d286fcc4c2ee8801f5298a0c6040c3895105 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 12 Aug 2016 13:15:31 -0400 Subject: [PATCH 03/25] DOC: Add a docstring for RecarrayField. --- zipline/pipeline/factors/factor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index a9f3faef..3410a60d 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -1468,7 +1468,9 @@ class CustomFactor(PositiveWindowLengthMixin, CustomTermMixin, Factor): class RecarrayField(SingleInputMixin, Factor): - + """ + A single field from a multi-output factor. + """ def __new__(cls, factor, attribute): return super(RecarrayField, cls).__new__( cls, From a81562d5f494003a4f327c382f1ec38e78ab3c7b Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 12 Aug 2016 14:03:15 -0400 Subject: [PATCH 04/25] MAINT: Improve/test errors for insufficient data. --- tests/pipeline/test_engine.py | 23 +++++++++++++++++++++++ zipline/pipeline/engine.py | 8 ++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index 0521174b..0e2d2197 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -206,6 +206,29 @@ class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase): with self.assertRaisesRegexp(ValueError, msg): engine.run_pipeline(p, self.dates[2], self.dates[1]) + def test_fail_usefully_on_insufficient_data(self): + loader = self.loader + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) + + class SomeFactor(CustomFactor): + inputs = [USEquityPricing.close] + window_length = 10 + + def compute(self, today, assets, out, closes): + pass + + p = Pipeline(columns={'t': SomeFactor()}) + + # self.dates[9] is the earliest date we should be able to compute. + engine.run_pipeline(p, self.dates[9], self.dates[9]) + + # We shouldn't be able to compute dates[8], since we only know about 8 + # prior dates, and we need a window length of 10. + with self.assertRaises(NoFurtherDataError) as e: + engine.run_pipeline(p, self.dates[8], self.dates[8]) + def test_same_day_pipeline(self): loader = self.loader engine = SimplePipelineEngine( diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 8be01f4e..ffb5b1c9 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -211,10 +211,10 @@ class SimplePipelineEngine(object): start_idx, end_idx = self._calendar.slice_locs(start_date, end_date) if start_idx < extra_rows: raise NoFurtherDataError( - msg="Insufficient data to compute Pipeline mask: " - "start date was %s, " - "earliest known date was %s, " - "and %d extra rows were requested." % ( + msg="Insufficient data to compute Pipeline.\n\n" + "start date was %s\n" + "earliest known date was %s\n" + "%d extra rows were required" % ( start_date, calendar[0], extra_rows, ), ) From 1d93e1c21fb7836f563de860ca32c0707e538385 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 12 Aug 2016 14:03:49 -0400 Subject: [PATCH 05/25] ENH: Add as_column to numpy_utils. --- zipline/utils/numpy_utils.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/zipline/utils/numpy_utils.py b/zipline/utils/numpy_utils.py index 59a32d20..84fbfd7e 100644 --- a/zipline/utils/numpy_utils.py +++ b/zipline/utils/numpy_utils.py @@ -364,3 +364,36 @@ def vectorized_is_element(array, choices): Array indicating whether each element of ``array`` was in ``choices``. """ return vectorize(choices.__contains__, otypes=[bool])(array) + + +def as_column(a): + """ + Convert an array of shape (N,) into an array of shape (N, 1). + + This is equivalent to `a[:, np.newaxis]`. + + Parameters + ---------- + a : np.ndarray + + Example + ------- + >>> import numpy as np + >>> a = np.arange(5) + >>> a + array([0, 1, 2, 3, 4]) + >>> as_column(a) + array([[0], + [1], + [2], + [3], + [4]]) + >>> as_column(a).shape + (5, 1) + """ + if a.ndim != 1: + raise ValueError( + "as_column expected an 1-dimensional array, " + "but got an array of shape %s" % a.shape + ) + return a[:, None] From d99d993aeafc8efaa8ae9b2a5e20f810e4c03cc0 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 12 Aug 2016 14:05:55 -0400 Subject: [PATCH 06/25] MAINT: Remove empty inputs from BoundColumn. They belong on LoadableTerm instead. --- zipline/pipeline/data/dataset.py | 1 - zipline/pipeline/term.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index d06b72a8..2008170b 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -112,7 +112,6 @@ class BoundColumn(LoadableTerm): The name of this column. """ mask = AssetExists() - inputs = () window_safe = True def __new__(cls, dtype, missing_value, dataset, name): diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 95b444d2..5a5dba5b 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -345,6 +345,7 @@ class LoadableTerm(Term): This is the base class for :class:`zipline.pipeline.data.BoundColumn`. """ windowed = False + inputs = () @lazyval def dependencies(self): From a8b67d352ef9eebb6208930aa25dcb142ce16885 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 15 Aug 2016 13:14:43 -0400 Subject: [PATCH 07/25] MAINT: Refactor in prep for downsampled terms. - Split out extra_rows handling into an `ExecutionPlan` subclass. `ExecutionPlan` now requires the dates and calendar against which a set of terms will be computed, and now defers to a term's `compute_extra_rows` method when deciding how many extra rows are required to compute for that term. This will allow downsampled terms to request enough extra rows to guarantee that we can maintain consistent calculation dates. As a consequence of the above, `TermGraph` now only deals with logical dependencies, not with metadata surrounding extra row calculations. This means that TermGraph can be used to generate dependency visualizations in interactive contexts where we don't yet have a calendar or start/end dates. - Refactored test_{filter,factor,classifier} to use check_terms instead of run_graph. This makes it easier to make changes to TermGraph, since the testing interface is now to simply provide a dict of terms. - Refactored BasePipelineTestCase to use fixtures to create an asset finder. This fixes a potential leak of the test's asset db, which was not being explicitly cleaned up. - Refactored test_technical to use BasePipelineTestCase. - Added a new special term, `InputDates()`, which can be used to request date labels for inputs. Like `AssetExists`, `InputDates` is provided in the initial workspace by default. - Added a default (failing) `_compute` method to `AssetExists` which provides a more useful error than AttributeError. --- tests/pipeline/base.py | 72 ++++--- tests/pipeline/test_engine.py | 29 ++- tests/pipeline/test_factor.py | 122 +++++------- tests/pipeline/test_filter.py | 329 ++++++++++++++----------------- tests/pipeline/test_technical.py | 123 ++++-------- tests/pipeline/test_term.py | 29 ++- zipline/pipeline/__init__.py | 3 +- zipline/pipeline/engine.py | 18 +- zipline/pipeline/graph.py | 234 +++++++++++++++------- zipline/pipeline/pipeline.py | 47 ++++- zipline/pipeline/term.py | 50 ++++- 11 files changed, 600 insertions(+), 456 deletions(-) diff --git a/tests/pipeline/base.py b/tests/pipeline/base.py index a5c8454f..98de5aee 100644 --- a/tests/pipeline/base.py +++ b/tests/pipeline/base.py @@ -1,24 +1,23 @@ """ -Base class for Pipeline API unittests. +Base class for Pipeline API unit tests. """ from functools import wraps import numpy as np from numpy import arange, prod -from pandas import date_range, Int64Index, DataFrame +from pandas import DataFrame, Timestamp from six import iteritems -from zipline.assets.synthetic import make_simple_equity_info from zipline.pipeline.engine import SimplePipelineEngine -from zipline.pipeline import TermGraph -from zipline.pipeline.term import AssetExists +from zipline.pipeline import ExecutionPlan +from zipline.pipeline.term import AssetExists, InputDates from zipline.testing import ( check_arrays, ExplodingObject, - tmp_asset_finder, ) from zipline.testing.fixtures import ( - WithTradingCalendars, + WithAssetFinder, + WithTradingSessions, ZiplineTestCase, ) @@ -53,32 +52,26 @@ def with_defaults(**default_funcs): with_default_shape = with_defaults(shape=lambda self: self.default_shape) -class BasePipelineTestCase(WithTradingCalendars, ZiplineTestCase): +class BasePipelineTestCase(WithTradingSessions, + WithAssetFinder, + ZiplineTestCase): + START_DATE = Timestamp('2014', tz='UTC') + END_DATE = Timestamp('2014-12-31', tz='UTC') + ASSET_FINDER_EQUITY_SIDS = list(range(20)) @classmethod def init_class_fixtures(cls): super(BasePipelineTestCase, cls).init_class_fixtures() - cls.__calendar = date_range('2014', '2015', - freq=cls.trading_calendar.day) - cls.__assets = assets = Int64Index(arange(1, 20)) - cls.__tmp_finder_ctx = tmp_asset_finder( - equities=make_simple_equity_info( - assets, - cls.__calendar[0], - cls.__calendar[-1], - ) - ) - cls.__finder = cls.__tmp_finder_ctx.__enter__() - cls.__mask = cls.__finder.lifetimes( - cls.__calendar[-30:], + cls.default_asset_exists_mask = cls.asset_finder.lifetimes( + cls.nyse_sessions[-30:], include_start_date=False, ) @property def default_shape(self): """Default shape for methods that build test data.""" - return self.__mask.shape + return self.default_asset_exists_mask.shape def run_graph(self, graph, initial_workspace, mask=None): """ @@ -103,14 +96,17 @@ class BasePipelineTestCase(WithTradingCalendars, ZiplineTestCase): """ engine = SimplePipelineEngine( lambda column: ExplodingObject(), - self.__calendar, - self.__finder, + self.nyse_sessions, + self.asset_finder, ) if mask is None: - mask = self.__mask + mask = self.default_asset_exists_mask dates, assets, mask_values = explode(mask) + initial_workspace.setdefault(AssetExists(), mask_values) + initial_workspace.setdefault(InputDates(), dates) + return engine.compute_chunk( graph, dates, @@ -118,15 +114,29 @@ class BasePipelineTestCase(WithTradingCalendars, ZiplineTestCase): initial_workspace, ) - def check_terms(self, terms, expected, initial_workspace, mask): + def check_terms(self, + terms, + expected, + initial_workspace, + mask, + check=check_arrays): """ Compile the given terms into a TermGraph, compute it with initial_workspace, and compare the results with ``expected``. """ - graph = TermGraph(terms) + start_date, end_date = mask.index[[0, -1]] + graph = ExecutionPlan( + terms, + all_dates=self.nyse_sessions, + start_date=start_date, + end_date=end_date, + ) + results = self.run_graph(graph, initial_workspace, mask) for key, (res, exp) in dzip_exact(results, expected).items(): - check_arrays(res, exp) + check(res, exp) + + return results def build_mask(self, array): """ @@ -138,13 +148,13 @@ class BasePipelineTestCase(WithTradingCalendars, ZiplineTestCase): array, # Use the **last** N dates rather than the first N so that we have # space for lookbacks. - index=self.__calendar[-ndates:], - columns=self.__assets[:nassets], + index=self.nyse_sessions[-ndates:], + columns=self.ASSET_FINDER_EQUITY_SIDS[:nassets], dtype=bool, ) @with_default_shape - def arange_data(self, shape, dtype=float): + def arange_data(self, shape, dtype=np.float64): """ Build a block of testing data from numpy.arange. """ diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index 0e2d2197..fecebd93 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -40,6 +40,7 @@ from six import iteritems, itervalues from toolz import merge from zipline.assets.synthetic import make_rotating_equity_info +from zipline.errors import NoFurtherDataError from zipline.lib.adjustment import MULTIPLY from zipline.lib.labelarray import LabelArray from zipline.pipeline import CustomFactor, Pipeline @@ -65,6 +66,7 @@ from zipline.pipeline.loaders.synthetic import ( expected_bar_values_2d, ) from zipline.pipeline.sentinels import NotSpecified +from zipline.pipeline.term import InputDates from zipline.testing import ( AssetID, AssetIDPlusDay, @@ -81,7 +83,7 @@ from zipline.testing.fixtures import ( ZiplineTestCase, ) from zipline.utils.memoize import lazyval -from zipline.utils.numpy_utils import bool_dtype +from zipline.utils.numpy_utils import bool_dtype, datetime64ns_dtype class RollingSumDifference(CustomFactor): @@ -229,6 +231,31 @@ class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase): with self.assertRaises(NoFurtherDataError) as e: engine.run_pipeline(p, self.dates[8], self.dates[8]) + def test_input_dates_provided_by_default(self): + loader = self.loader + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) + + class TestFactor(CustomFactor): + inputs = [InputDates(), USEquityPricing.close] + window_length = 10 + dtype = datetime64ns_dtype + + def compute(self, today, assets, out, dates, closes): + first, last = dates[[0, -1], 0] + assert last == today.asm8 + assert len(dates) == len(closes) == self.window_length + out[:] = first + + p = Pipeline(columns={'t': TestFactor()}) + results = engine.run_pipeline(p, self.dates[9], self.dates[10]) + + # All results are the same, so just grab one column. + column = results.unstack().iloc[:, 0].values + check_arrays(column, self.dates[:2].values) + + def test_same_day_pipeline(self): loader = self.loader engine = SimplePipelineEngine( diff --git a/tests/pipeline/test_factor.py b/tests/pipeline/test_factor.py index 4c63429d..ba50fdae 100644 --- a/tests/pipeline/test_factor.py +++ b/tests/pipeline/test_factor.py @@ -123,20 +123,18 @@ class FactorTestCase(BasePipelineTestCase): data = arange(25).reshape(5, 5) data[eye(5, dtype=bool)] = custom_missing_value - graph = TermGraph( + self.check_terms( { 'isnull': factor.isnull(), 'notnull': factor.notnull(), - } - ) - - results = self.run_graph( - graph, + }, + { + 'isnull': eye(5, dtype=bool), + 'notnull': ~eye(5, dtype=bool), + }, initial_workspace={factor: data}, mask=self.build_mask(ones((5, 5))), ) - check_arrays(results['isnull'], eye(5, dtype=bool)) - check_arrays(results['notnull'], ~eye(5, dtype=bool)) def test_isnull_datetime_dtype(self): class DatetimeFactor(Factor): @@ -149,20 +147,18 @@ class FactorTestCase(BasePipelineTestCase): data = arange(25).reshape(5, 5).astype('datetime64[ns]') data[eye(5, dtype=bool)] = NaTns - graph = TermGraph( + self.check_terms( { 'isnull': factor.isnull(), 'notnull': factor.notnull(), - } - ) - - results = self.run_graph( - graph, + }, + { + 'isnull': eye(5, dtype=bool), + 'notnull': ~eye(5, dtype=bool), + }, initial_workspace={factor: data}, mask=self.build_mask(ones((5, 5))), ) - check_arrays(results['isnull'], eye(5, dtype=bool)) - check_arrays(results['notnull'], ~eye(5, dtype=bool)) @for_each_factor_dtype def test_rank_ascending(self, name, factor_dtype): @@ -206,14 +202,12 @@ class FactorTestCase(BasePipelineTestCase): } def check(terms): - graph = TermGraph(terms) - results = self.run_graph( - graph, + self.check_terms( + terms, + expected={name: expected_ranks[name] for name in terms}, initial_workspace={f: data}, mask=self.build_mask(ones((5, 5))), ) - for method in terms: - check_arrays(results[method], expected_ranks[method]) check({meth: f.rank(method=meth) for meth in expected_ranks}) check({ @@ -265,14 +259,12 @@ class FactorTestCase(BasePipelineTestCase): } def check(terms): - graph = TermGraph(terms) - results = self.run_graph( - graph, + self.check_terms( + terms, + expected={name: expected_ranks[name] for name in terms}, initial_workspace={f: data}, mask=self.build_mask(ones((5, 5))), ) - for method in terms: - check_arrays(results[method], expected_ranks[method]) check({ meth: f.rank(method=meth, ascending=False) @@ -294,14 +286,12 @@ class FactorTestCase(BasePipelineTestCase): mask_data = ~eye(5, dtype=bool) initial_workspace = {f: data, Mask(): mask_data} - graph = TermGraph( - { - "ascending_nomask": f.rank(ascending=True), - "ascending_mask": f.rank(ascending=True, mask=Mask()), - "descending_nomask": f.rank(ascending=False), - "descending_mask": f.rank(ascending=False, mask=Mask()), - } - ) + terms = { + "ascending_nomask": f.rank(ascending=True), + "ascending_mask": f.rank(ascending=True, mask=Mask()), + "descending_nomask": f.rank(ascending=False), + "descending_mask": f.rank(ascending=False, mask=Mask()), + } expected = { "ascending_nomask": array([[1., 3., 4., 5., 2.], @@ -328,13 +318,12 @@ class FactorTestCase(BasePipelineTestCase): [4., 3., 2., 1., nan]]), } - results = self.run_graph( - graph, + self.check_terms( + terms, + expected, initial_workspace, mask=self.build_mask(ones((5, 5))), ) - for method in results: - check_arrays(expected[method], results[method]) @for_each_factor_dtype def test_grouped_rank_ascending(self, name, factor_dtype=float64_dtype): @@ -363,7 +352,7 @@ class FactorTestCase(BasePipelineTestCase): missing_value=None, ) - expected_grouped_ranks = { + expected_ranks = { 'ordinal': array( [[1., 1., 3., 2., 2.], [1., 2., 3., 1., 2.], @@ -402,9 +391,9 @@ class FactorTestCase(BasePipelineTestCase): } def check(terms): - graph = TermGraph(terms) - results = self.run_graph( - graph, + self.check_terms( + terms, + expected={name: expected_ranks[name] for name in terms}, initial_workspace={ f: data, c: classifier_data, @@ -413,25 +402,22 @@ class FactorTestCase(BasePipelineTestCase): mask=self.build_mask(ones((5, 5))), ) - for method in terms: - check_arrays(results[method], expected_grouped_ranks[method]) - # Not specifying the value of ascending param should default to True check({ meth: f.rank(method=meth, groupby=c) - for meth in expected_grouped_ranks + for meth in expected_ranks }) check({ meth: f.rank(method=meth, groupby=str_c) - for meth in expected_grouped_ranks + for meth in expected_ranks }) check({ meth: f.rank(method=meth, groupby=c, ascending=True) - for meth in expected_grouped_ranks + for meth in expected_ranks }) check({ meth: f.rank(method=meth, groupby=str_c, ascending=True) - for meth in expected_grouped_ranks + for meth in expected_ranks }) # Not passing a method should default to ordinal @@ -468,7 +454,7 @@ class FactorTestCase(BasePipelineTestCase): missing_value=None, ) - expected_grouped_ranks = { + expected_ranks = { 'ordinal': array( [[2., 2., 1., 1., 3.], [2., 1., 1., 2., 3.], @@ -507,9 +493,9 @@ class FactorTestCase(BasePipelineTestCase): } def check(terms): - graph = TermGraph(terms) - results = self.run_graph( - graph, + self.check_terms( + terms, + expected={name: expected_ranks[name] for name in terms}, initial_workspace={ f: data, c: classifier_data, @@ -518,16 +504,13 @@ class FactorTestCase(BasePipelineTestCase): mask=self.build_mask(ones((5, 5))), ) - for method in terms: - check_arrays(results[method], expected_grouped_ranks[method]) - check({ meth: f.rank(method=meth, groupby=c, ascending=False) - for meth in expected_grouped_ranks + for meth in expected_ranks }) check({ meth: f.rank(method=meth, groupby=str_c, ascending=False) - for meth in expected_grouped_ranks + for meth in expected_ranks }) # Not passing a method should default to ordinal @@ -707,9 +690,9 @@ class FactorTestCase(BasePipelineTestCase): expected['grouped_str'] = expected['grouped'] expected['grouped_masked_str'] = expected['grouped_masked'] - graph = TermGraph(terms) - results = self.run_graph( - graph, + self.check_terms( + terms, + expected, initial_workspace={ f: factor_data, c: classifier_data, @@ -717,20 +700,13 @@ class FactorTestCase(BasePipelineTestCase): m: filter_data, }, mask=self.build_mask(self.ones_mask(shape=factor_data.shape)), + # The hand-computed values aren't very precise (in particular, + # we truncate repeating decimals at 3 places) This is just + # asserting that the example isn't misleading by being totally + # wrong. + check=partial(check_allclose, atol=0.001), ) - for key, (res, exp) in dzip_exact(results, expected).items(): - check_allclose( - res, - exp, - # The hand-computed values aren't very precise (in particular, - # we truncate repeating decimals at 3 places) This is just - # asserting that the example isn't misleading by being totally - # wrong. - atol=0.001, - err_msg="Mismatch for %r" % key - ) - @parameter_space( seed_value=range(1, 2), normalizer_name_and_func=[ diff --git a/tests/pipeline/test_filter.py b/tests/pipeline/test_filter.py index 1861dee8..958c6e90 100644 --- a/tests/pipeline/test_filter.py +++ b/tests/pipeline/test_filter.py @@ -12,7 +12,6 @@ from numpy import ( array, eye, float64, - full_like, full, inf, isfinite, @@ -127,7 +126,6 @@ class FilterTestCase(BasePipelineTestCase): nan_data[:, 0] = nan mask = Mask() - workspace = {self.f: data, mask: mask_data} methods = ['top', 'bottom'] counts = 2, 3, 10 @@ -136,18 +134,6 @@ class FilterTestCase(BasePipelineTestCase): def termname(method, count, masked): return '_'.join([method, str(count), 'mask' if masked else '']) - # Add a term for each permutation of top/bottom, count, and - # mask/no_mask. - terms = {} - for method, count, masked in term_combos: - kwargs = {'N': count} - if masked: - kwargs['mask'] = mask - term = getattr(self.f, method)(**kwargs) - terms[termname(method, count, masked)] = term - - results = self.run_graph(TermGraph(terms), initial_workspace=workspace) - def expected_result(method, count, masked): # Ranking with a mask is equivalent to ranking with nans applied on # the masked values. @@ -158,72 +144,55 @@ class FilterTestCase(BasePipelineTestCase): elif method == 'bottom': return rowwise_rank(to_rank) < count + # Add a term for each permutation of top/bottom, count, and + # mask/no_mask. + terms = {} + expected = {} for method, count, masked in term_combos: - result = results[termname(method, count, masked)] + kwargs = {'N': count} + if masked: + kwargs['mask'] = mask + term = getattr(self.f, method)(**kwargs) + name = termname(method, count, masked) + terms[name] = term + expected[name] = expected_result(method, count, masked) - # Check that `min(c, num_assets)` assets passed each day. - passed_per_day = result.sum(axis=1) - check_arrays( - passed_per_day, - full_like(passed_per_day, min(count, data.shape[1])), - ) - - expected = expected_result(method, count, masked) - check_arrays(result, expected) - - def test_bottom(self): - counts = 2, 3, 10 - data = self.randn_data(seed=5) # Arbitrary seed choice. - results = self.run_graph( - TermGraph( - {'bottom_' + str(c): self.f.bottom(c) for c in counts} - ), - initial_workspace={self.f: data}, + self.check_terms( + terms, + expected, + initial_workspace={self.f: data, mask: mask_data}, + mask=self.build_mask(self.ones_mask()), ) - for c in counts: - result = results['bottom_' + str(c)] - - # Check that `min(c, num_assets)` assets passed each day. - passed_per_day = result.sum(axis=1) - check_arrays( - passed_per_day, - full_like(passed_per_day, min(c, data.shape[1])), - ) - - # Check that the bottom `c` assets passed. - expected = rowwise_rank(data) < c - check_arrays(result, expected) def test_percentile_between(self): quintiles = range(5) filter_names = ['pct_' + str(q) for q in quintiles] iter_quintiles = zip(filter_names, quintiles) - - graph = TermGraph( - { - name: self.f.percentile_between(q * 20.0, (q + 1) * 20.0) - for name, q in zip(filter_names, quintiles) - } - ) + terms = { + name: self.f.percentile_between(q * 20.0, (q + 1) * 20.0) + for name, q in iter_quintiles + } # Test with 5 columns and no NaNs. eye5 = eye(5, dtype=float64) - results = self.run_graph( - graph, - initial_workspace={self.f: eye5}, - mask=self.build_mask(ones((5, 5))), - ) + expected = {} for name, quintile in iter_quintiles: - result = results[name] if quintile < 4: # There are four 0s and one 1 in each row, so the first 4 # quintiles should be all the locations with zeros in the input # array. - check_arrays(result, ~eye5.astype(bool)) + expected[name] = ~eye5.astype(bool) else: # The top quintile should match the sole 1 in each row. - check_arrays(result, eye5.astype(bool)) + expected[name] = eye5.astype(bool) + + self.check_terms( + terms=terms, + expected=expected, + initial_workspace={self.f: eye5}, + mask=self.build_mask(ones((5, 5))), + ) # Test with 6 columns, no NaNs, and one masked entry per day. eye6 = eye(6, dtype=float64) @@ -233,41 +202,44 @@ class FilterTestCase(BasePipelineTestCase): [1, 1, 0, 1, 1, 1], [1, 1, 1, 0, 1, 1], [1, 1, 1, 1, 0, 1]], dtype=bool) - - results = self.run_graph( - graph, - initial_workspace={self.f: eye6}, - mask=self.build_mask(mask) - ) + expected = {} for name, quintile in iter_quintiles: - result = results[name] if quintile < 4: # Should keep all values that were 0 in the base data and were # 1 in the mask. - check_arrays(result, mask & (~eye6.astype(bool))), + expected[name] = mask & ~eye6.astype(bool) else: - # Should keep all the 1s in the base data. - check_arrays(result, eye6.astype(bool)) + # The top quintile should match the sole 1 in each row. + expected[name] = eye6.astype(bool) + + self.check_terms( + terms=terms, + expected=expected, + initial_workspace={self.f: eye6}, + mask=self.build_mask(mask), + ) # Test with 6 columns, no mask, and one NaN per day. Should have the # same outcome as if we had masked the NaNs. # In particular, the NaNs should never pass any filters. eye6_withnans = eye6.copy() putmask(eye6_withnans, ~mask, nan) - results = self.run_graph( - graph, - initial_workspace={self.f: eye6}, - mask=self.build_mask(mask) - ) + expected = {} for name, quintile in iter_quintiles: - result = results[name] if quintile < 4: # Should keep all values that were 0 in the base data and were # 1 in the mask. - check_arrays(result, mask & (~eye6.astype(bool))), + expected[name] = mask & (~eye6.astype(bool)) else: # Should keep all the 1s in the base data. - check_arrays(result, eye6.astype(bool)) + expected[name] = eye6.astype(bool) + + self.check_terms( + terms, + expected, + initial_workspace={self.f: eye6}, + mask=self.build_mask(mask), + ) def test_percentile_nasty_partitions(self): # Test percentile with nasty partitions: divide up 5 assets into @@ -281,27 +253,26 @@ class FilterTestCase(BasePipelineTestCase): quartiles = range(4) filter_names = ['pct_' + str(q) for q in quartiles] - graph = TermGraph( - { - name: self.f.percentile_between(q * 25.0, (q + 1) * 25.0) - for name, q in zip(filter_names, quartiles) - } - ) - results = self.run_graph( - graph, - initial_workspace={self.f: data}, - mask=self.build_mask(ones((5, 5))), - ) + terms = { + name: self.f.percentile_between(q * 25.0, (q + 1) * 25.0) + for name, q in zip(filter_names, quartiles) + } + expected = {} for name, quartile in zip(filter_names, quartiles): - result = results[name] lower = quartile * 25.0 upper = (quartile + 1) * 25.0 - expected = and_( + expected[name] = and_( nanpercentile(data, lower, axis=1, keepdims=True) <= data, data <= nanpercentile(data, upper, axis=1, keepdims=True), ) - check_arrays(result, expected) + + self.check_terms( + terms, + expected, + initial_workspace={self.f: data}, + mask=self.build_mask(ones((5, 5))), + ) def test_percentile_after_mask(self): f_input = eye(5) @@ -312,77 +283,79 @@ class FilterTestCase(BasePipelineTestCase): without_mask = self.g.percentile_between(80, 100) with_mask = self.g.percentile_between(80, 100, mask=custom_mask) - graph = TermGraph( - { - 'custom_mask': custom_mask, - 'without': without_mask, - 'with': with_mask, - } - ) + terms = { + 'mask': custom_mask, + 'without_mask': without_mask, + 'with_mask': with_mask, + } + expected = { + # Mask that accepts everything except the diagonal. + 'mask': ~eye(5, dtype=bool), + # Second should pass the largest value each day. Each row is + # strictly increasing, so we always select the last value. + 'without_mask': array( + [[0, 0, 0, 0, 1], + [0, 0, 0, 0, 1], + [0, 0, 0, 0, 1], + [0, 0, 0, 0, 1], + [0, 0, 0, 0, 1]], + dtype=bool, + ), + # With a mask, we should remove the diagonal as an option before + # computing percentiles. On the last day, we should get the + # second-largest value, rather than the largest. + 'with_mask': array( + [[0, 0, 0, 0, 1], + [0, 0, 0, 0, 1], + [0, 0, 0, 0, 1], + [0, 0, 0, 0, 1], + [0, 0, 0, 1, 0]], # Different from with! + dtype=bool, + ), + } - results = self.run_graph( - graph, + self.check_terms( + terms, + expected, initial_workspace={self.f: f_input, self.g: g_input}, mask=initial_mask, ) - # First should pass everything but the diagonal. - check_arrays(results['custom_mask'], ~eye(5, dtype=bool)) - - # Second should pass the largest value each day. Each row is strictly - # increasing, so we always select the last value. - expected_without = array( - [[0, 0, 0, 0, 1], - [0, 0, 0, 0, 1], - [0, 0, 0, 0, 1], - [0, 0, 0, 0, 1], - [0, 0, 0, 0, 1]], - dtype=bool, - ) - check_arrays(results['without'], expected_without) - - # When sequencing, we should remove the diagonal as an option before - # computing percentiles. On the last day, we should get the - # second-largest value, rather than the largest. - expected_with = array( - [[0, 0, 0, 0, 1], - [0, 0, 0, 0, 1], - [0, 0, 0, 0, 1], - [0, 0, 0, 0, 1], - [0, 0, 0, 1, 0]], # Different from previous! - dtype=bool, - ) - check_arrays(results['with'], expected_with) - def test_isnan(self): data = self.randn_data(seed=10) diag = eye(*data.shape, dtype=bool) data[diag] = nan - results = self.run_graph( - TermGraph({ + self.check_terms( + terms={ 'isnan': self.f.isnan(), 'isnull': self.f.isnull(), - }), + }, + expected={ + 'isnan': diag, + 'isnull': diag, + }, initial_workspace={self.f: data}, + mask=self.build_mask(self.ones_mask()), ) - check_arrays(results['isnan'], diag) - check_arrays(results['isnull'], diag) def test_notnan(self): data = self.randn_data(seed=10) diag = eye(*data.shape, dtype=bool) data[diag] = nan - results = self.run_graph( - TermGraph({ + self.check_terms( + terms={ 'notnan': self.f.notnan(), 'notnull': self.f.notnull(), - }), + }, + expected={ + 'notnan': ~diag, + 'notnull': ~diag, + }, initial_workspace={self.f: data}, + mask=self.build_mask(self.ones_mask()), ) - check_arrays(results['notnan'], ~diag) - check_arrays(results['notnull'], ~diag) def test_isfinite(self): data = self.randn_data(seed=10) @@ -390,11 +363,12 @@ class FilterTestCase(BasePipelineTestCase): data[:, 2] = inf data[:, 4] = -inf - results = self.run_graph( - TermGraph({'isfinite': self.f.isfinite()}), + self.check_terms( + terms={'isfinite': self.f.isfinite()}, + expected={'isfinite': isfinite(data)}, initial_workspace={self.f: data}, + mask=self.build_mask(self.ones_mask()), ) - check_arrays(results['isfinite'], isfinite(data)) def test_all(self): @@ -427,18 +401,19 @@ class FilterTestCase(BasePipelineTestCase): inputs = () window_length = 0 - results = self.run_graph( - TermGraph({ + self.check_terms( + terms={ '3': All(inputs=[Input()], window_length=3), '4': All(inputs=[Input()], window_length=4), - }), + }, + expected={ + '3': expected_3, + '4': expected_4, + }, initial_workspace={Input(): data}, mask=self.build_mask(ones(shape=data.shape)), ) - check_arrays(results['3'], expected_3) - check_arrays(results['4'], expected_4) - def test_any(self): # FUN FACT: The inputs and outputs here are exactly the negation of @@ -486,18 +461,19 @@ class FilterTestCase(BasePipelineTestCase): inputs = () window_length = 0 - results = self.run_graph( - TermGraph({ + self.check_terms( + terms={ '3': Any(inputs=[Input()], window_length=3), '4': Any(inputs=[Input()], window_length=4), - }), + }, + expected={ + '3': expected_3, + '4': expected_4, + }, initial_workspace={Input(): data}, mask=self.build_mask(ones(shape=data.shape)), ) - check_arrays(results['3'], expected_3) - check_arrays(results['4'], expected_4) - def test_at_least_N(self): # With a window_length of K, AtLeastN should return 1 @@ -553,26 +529,27 @@ class FilterTestCase(BasePipelineTestCase): window_length=4, N=4) - results = self.run_graph( - TermGraph({ + self.check_terms( + terms={ 'AllButOne': all_but_one, 'AllButTwo': all_but_two, 'AnyEquiv': any_equiv, 'AllEquiv': all_equiv, 'Any': Any(inputs=[Input()], window_length=4), 'All': All(inputs=[Input()], window_length=4) - }), + }, + expected={ + 'Any': expected_1, + 'AnyEquiv': expected_1, + 'AllButTwo': expected_2, + 'AllButOne': expected_3, + 'All': expected_4, + 'AllEquiv': expected_4, + }, initial_workspace={Input(): data}, mask=self.build_mask(ones(shape=data.shape)), ) - check_arrays(results['Any'], expected_1) - check_arrays(results['AnyEquiv'], expected_1) - check_arrays(results['AllButTwo'], expected_2) - check_arrays(results['AllButOne'], expected_3) - check_arrays(results['All'], expected_4) - check_arrays(results['AllEquiv'], expected_4) - @parameter_space(factor_len=[2, 3, 4]) def test_window_safe(self, factor_len): # all true data set of (days, securities) @@ -591,19 +568,19 @@ class FilterTestCase(BasePipelineTestCase): # sum for each column out[:] = np_sum(filter_, axis=0) - results = self.run_graph( - TermGraph({'windowsafe': TestFactor()}), - initial_workspace={InputFilter(): data}, - ) - - # number of days in default_shape n = self.default_shape[0] - - # shape of output array output_shape = ((n - factor_len + 1), self.default_shape[1]) - check_arrays( - results['windowsafe'], - full(output_shape, factor_len, dtype=float64) + full(output_shape, factor_len, dtype=float64) + + self.check_terms( + terms={ + 'windowsafe': TestFactor(), + }, + expected={ + 'windowsafe': full(output_shape, factor_len, dtype=float64), + }, + initial_workspace={InputFilter(): data}, + mask=self.build_mask(self.ones_mask()), ) @parameter_space( diff --git a/tests/pipeline/test_technical.py b/tests/pipeline/test_technical.py index 718cc7fd..505e1f38 100644 --- a/tests/pipeline/test_technical.py +++ b/tests/pipeline/test_technical.py @@ -7,10 +7,7 @@ import pandas as pd import talib from zipline.lib.adjusted_array import AdjustedArray -from zipline.pipeline import TermGraph from zipline.pipeline.data import USEquityPricing -from zipline.pipeline.engine import SimplePipelineEngine -from zipline.pipeline.term import AssetExists from zipline.pipeline.factors import ( BollingerBands, Aroon, @@ -20,61 +17,22 @@ from zipline.pipeline.factors import ( RateOfChangePercentage, TrueRange, ) -from zipline.testing import ExplodingObject, parameter_space -from zipline.testing.fixtures import WithAssetFinder, ZiplineTestCase +from zipline.testing import parameter_space +from zipline.testing.fixtures import ZiplineTestCase from zipline.testing.predicates import assert_equal - -class WithTechnicalFactor(WithAssetFinder): - """ZiplineTestCase fixture for testing technical factors. - """ - ASSET_FINDER_EQUITY_SIDS = tuple(range(5)) - START_DATE = pd.Timestamp('2014-01-01', tz='utc') - - @classmethod - def init_class_fixtures(cls): - super(WithTechnicalFactor, cls).init_class_fixtures() - cls.ndays = ndays = 24 - cls.nassets = nassets = len(cls.ASSET_FINDER_EQUITY_SIDS) - cls.dates = dates = pd.date_range(cls.START_DATE, periods=ndays) - cls.assets = pd.Index(cls.asset_finder.sids) - cls.engine = SimplePipelineEngine( - lambda column: ExplodingObject(), - dates, - cls.asset_finder, - ) - cls.asset_exists = exists = np.full((ndays, nassets), True, dtype=bool) - cls.asset_exists_masked = masked = exists.copy() - masked[:, -1] = False - - def run_graph(self, graph, initial_workspace, mask_sid): - initial_workspace.setdefault( - AssetExists(), - self.asset_exists_masked if mask_sid else self.asset_exists, - ) - return self.engine.compute_chunk( - graph, - self.dates, - self.assets, - initial_workspace, - ) +from .base import BasePipelineTestCase -class BollingerBandsTestCase(WithTechnicalFactor, ZiplineTestCase): - @classmethod - def init_class_fixtures(cls): - super(BollingerBandsTestCase, cls).init_class_fixtures() - cls._closes = closes = ( - np.arange(cls.ndays, dtype=float)[:, np.newaxis] + - np.arange(cls.nassets, dtype=float) * 100 - ) - cls._closes_masked = masked = closes.copy() - masked[:, -1] = np.nan +class BollingerBandsTestCase(BasePipelineTestCase): - def closes(self, masked): - return self._closes_masked if masked else self._closes + def closes(self, mask_last_sid): + data = self.arange_data(dtype=np.float64) + if mask_last_sid: + data[:, -1] = np.nan + return data - def expected(self, window_length, k, closes): + def expected_bbands(self, window_length, k, closes): """Compute the expected data (without adjustments) for the given window, k, and closes array. @@ -83,11 +41,14 @@ class BollingerBandsTestCase(WithTechnicalFactor, ZiplineTestCase): lower_cols = [] middle_cols = [] upper_cols = [] - for n in range(self.nassets): + + ndates, nassets = closes.shape + + for n in range(nassets): close_col = closes[:, n] if np.isnan(close_col).all(): # ta-lib doesn't deal well with all nans. - upper, middle, lower = [np.full(self.ndays, np.nan)] * 3 + upper, middle, lower = [np.full(ndates, np.nan)] * 3 else: upper, middle, lower = talib.BBANDS( close_col, @@ -112,38 +73,38 @@ class BollingerBandsTestCase(WithTechnicalFactor, ZiplineTestCase): @parameter_space( window_length={5, 10, 20}, k={1.5, 2, 2.5}, - mask_sid={True, False}, + mask_last_sid={True, False}, + __fail_fast=True, ) - def test_bollinger_bands(self, window_length, k, mask_sid): - closes = self.closes(mask_sid) - result = self.run_graph( - TermGraph({ - 'f': BollingerBands( - window_length=window_length, - k=k, - ), - }), + def test_bollinger_bands(self, window_length, k, mask_last_sid): + closes = self.closes(mask_last_sid=mask_last_sid) + mask = ~np.isnan(closes) + bbands = BollingerBands(window_length=window_length, k=k) + + expected = self.expected_bbands(window_length, k, closes) + + self.check_terms( + terms={ + 'upper': bbands.upper, + 'middle': bbands.middle, + 'lower': bbands.lower, + }, + expected={ + 'upper': expected[0], + 'middle': expected[1], + 'lower': expected[2], + }, initial_workspace={ USEquityPricing.close: AdjustedArray( - closes, - np.full_like(closes, True, dtype=bool), - {}, - np.nan, + data=closes, + mask=mask, + adjustments={}, + missing_value=np.nan, ), }, - mask_sid=mask_sid, - )['f'] - - expected_upper, expected_middle, expected_lower = self.expected( - window_length, - k, - closes, + mask=self.build_mask(mask), ) - assert_equal(result.upper, expected_upper) - assert_equal(result.middle, expected_middle) - assert_equal(result.lower, expected_lower) - def test_bollinger_bands_output_ordering(self): bbands = BollingerBands(window_length=5, k=2) lower, middle, upper = bbands @@ -185,7 +146,7 @@ class AroonTestCase(ZiplineTestCase): assert_equal(out, expected_out) -class TestFastStochasticOscillator(WithTechnicalFactor, ZiplineTestCase): +class TestFastStochasticOscillator(ZiplineTestCase): """ Test the Fast Stochastic Oscillator """ @@ -427,7 +388,7 @@ class TestLinearWeightedMovingAverage(ZiplineTestCase): assert_equal(out, np.array([30., 31., 32., 33., 34.])) -class TestTrueRange(WithTechnicalFactor, ZiplineTestCase): +class TestTrueRange(ZiplineTestCase): def test_tr_basic(self): tr = TrueRange() diff --git a/tests/pipeline/test_term.py b/tests/pipeline/test_term.py index ecf31ce2..c58dd4e0 100644 --- a/tests/pipeline/test_term.py +++ b/tests/pipeline/test_term.py @@ -6,6 +6,7 @@ from itertools import product from unittest import TestCase from toolz import assoc +import pandas as pd from zipline.assets import Asset from zipline.errors import ( @@ -24,7 +25,7 @@ from zipline.pipeline import ( CustomFactor, Factor, Filter, - TermGraph, + ExecutionPlan, ) from zipline.pipeline.data import Column, DataSet from zipline.pipeline.data.testing import TestingDataSet @@ -33,6 +34,7 @@ from zipline.pipeline.factors import RecarrayField from zipline.pipeline.sentinels import NotSpecified from zipline.pipeline.term import AssetExists, Slice from zipline.testing import parameter_space +from zipline.testing.fixtures import WithTradingSessions, ZiplineTestCase from zipline.testing.predicates import ( assert_equal, assert_raises, @@ -152,7 +154,14 @@ def to_dict(l): return dict(zip(map(str, range(len(l))), l)) -class DependencyResolutionTestCase(TestCase): +class DependencyResolutionTestCase(WithTradingSessions, ZiplineTestCase): + + TRADING_CALENDAR_STRS = ('NYSE',) + START_DATE = pd.Timestamp('2014-01-02', tz='UTC') + END_DATE = pd.Timestamp('2014-12-31', tz='UTC') + + execution_plan_start = pd.Timestamp('2014-06-01', tz='UTC') + execution_plan_end = pd.Timestamp('2014-06-30', tz='UTC') def check_dependency_order(self, ordered_terms): seen = set() @@ -163,6 +172,14 @@ class DependencyResolutionTestCase(TestCase): seen.add(term) + def make_execution_plan(self, terms): + return ExecutionPlan( + terms, + self.nyse_sessions, + self.execution_plan_start, + self.execution_plan_end, + ) + def test_single_factor(self): """ Test dependency resolution for a single factor. @@ -182,7 +199,7 @@ class DependencyResolutionTestCase(TestCase): self.assertEqual(graph.node[SomeDataSet.bar]['extra_rows'], 4) for foobar in gen_equivalent_factors(): - check_output(TermGraph(to_dict([foobar]))) + check_output(self.make_execution_plan(to_dict([foobar]))) def test_single_factor_instance_args(self): """ @@ -190,7 +207,9 @@ class DependencyResolutionTestCase(TestCase): the constructor. """ bar, buzz = SomeDataSet.bar, SomeDataSet.buzz - graph = TermGraph(to_dict([SomeFactor([bar, buzz], window_length=5)])) + + factor = SomeFactor([bar, buzz], window_length=5) + graph = self.make_execution_plan(to_dict([factor])) resolution_order = list(graph.ordered()) @@ -214,7 +233,7 @@ class DependencyResolutionTestCase(TestCase): f1 = SomeFactor([SomeDataSet.foo, SomeDataSet.bar]) f2 = SomeOtherFactor([SomeDataSet.bar, SomeDataSet.buzz]) - graph = TermGraph(to_dict([f1, f2])) + graph = self.make_execution_plan(to_dict([f1, f2])) resolution_order = list(graph.ordered()) # bar should only appear once. diff --git a/zipline/pipeline/__init__.py b/zipline/pipeline/__init__.py index de6d142f..8d2a412f 100644 --- a/zipline/pipeline/__init__.py +++ b/zipline/pipeline/__init__.py @@ -6,7 +6,7 @@ from .engine import SimplePipelineEngine from .factors import Factor, CustomFactor from .filters import Filter, CustomFilter from .term import Term -from .graph import TermGraph +from .graph import ExecutionPlan, TermGraph from .pipeline import Pipeline from .loaders import USEquityPricingLoader @@ -56,6 +56,7 @@ __all__ = ( 'CustomFilter', 'CustomClassifier', 'engine_from_files', + 'ExecutionPlan', 'Factor', 'Filter', 'Pipeline', diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index ffb5b1c9..616178c7 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -21,7 +21,7 @@ from zipline.errors import NoFurtherDataError from zipline.utils.numpy_utils import repeat_first_axis, repeat_last_axis from zipline.utils.pandas_utils import explode -from .term import AssetExists, LoadableTerm +from .term import AssetExists, InputDates, LoadableTerm class PipelineEngine(with_metaclass(ABCMeta)): @@ -98,6 +98,7 @@ class SimplePipelineEngine(object): '_calendar', '_finder', '_root_mask_term', + '_root_mask_dates_term', '__weakref__', ) @@ -105,7 +106,9 @@ class SimplePipelineEngine(object): self._get_loader = get_loader self._calendar = calendar self._finder = asset_finder + self._root_mask_term = AssetExists() + self._root_mask_dates_term = InputDates() def run_pipeline(self, pipeline, start_date, end_date): """ @@ -161,7 +164,13 @@ class SimplePipelineEngine(object): ) screen_name = uuid4().hex - graph = pipeline.to_graph(screen_name, self._root_mask_term) + graph = pipeline.to_execution_plan( + screen_name, + self._root_mask_term, + self._calendar, + start_date, + end_date, + ) extra_rows = graph.extra_rows[self._root_mask_term] root_mask = self._compute_root_mask(start_date, end_date, extra_rows) dates, assets, root_mask_values = explode(root_mask) @@ -170,7 +179,10 @@ class SimplePipelineEngine(object): graph, dates, assets, - initial_workspace={self._root_mask_term: root_mask_values}, + initial_workspace={ + self._root_mask_term: root_mask_values, + self._root_mask_dates_term: dates.values[:, None], + }, ) return self._to_narrow( diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 510776ad..eedb8c3c 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -5,7 +5,7 @@ from networkx import ( DiGraph, topological_sort, ) -from six import itervalues, iteritems +from six import iteritems from zipline.utils.memoize import lazyval from zipline.pipeline.visualize import display_graph @@ -18,7 +18,112 @@ class CyclicDependency(Exception): class TermGraph(DiGraph): """ - Graph represention of Pipeline Term dependencies. + An abstract representation of Pipeline Term dependencies. + + This class does not keep any additional metadata about any term relations + other than dependency ordering. As such it is only useful in contexts + where you care exclusively about order properties (for example, when + drawing visualizations of execution order). + + Parameters + ---------- + terms : dict + A dict mapping names to final output terms. + + Attributes + ---------- + outputs + + Methods + ------- + ordered() + Return a topologically-sorted iterator over the terms in self. + + See Also + -------- + ExecutionPlan + """ + def __init__(self, terms): + super(TermGraph, self).__init__() + + self._frozen = False + parents = set() + for name, term in iteritems(terms): + self._add_to_graph(term, parents) + # No parents should be left between top-level terms. + assert not parents + + self._outputs = terms + self._ordered = topological_sort(self) + + # Mark that no more terms should be added to the graph. + self._frozen = True + + def _add_to_graph(self, term, parents): + """ + Add a term and all its children to ``graph``. + + ``parents`` is the set of all the parents of ``term` that we've added + so far. It is only used to detect dependency cycles. + """ + if self._frozen: + raise ValueError( + "Can't mutate %s after construction." % type(self).__name__ + ) + + # If we've seen this node already as a parent of the current traversal, + # it means we have an unsatisifiable dependency. This should only be + # possible if the term's inputs are mutated after construction. + if term in parents: + raise CyclicDependency(term) + + parents.add(term) + + self.add_node(term) + + for dependency in term.dependencies: + self._add_to_graph(dependency, parents) + self.add_edge(dependency, term) + + parents.remove(term) + + @property + def outputs(self): + """ + Dict mapping names to designated output terms. + """ + return self._outputs + + def ordered(self): + """ + Return a topologically-sorted iterator over the terms in `self`. + """ + return iter(self._ordered) + + @lazyval + def loadable_terms(self): + return tuple(term for term in self if isinstance(term, LoadableTerm)) + + @lazyval + def jpeg(self): + return display_graph(self, 'jpeg') + + @lazyval + def png(self): + return display_graph(self, 'png') + + @lazyval + def svg(self): + return display_graph(self, 'svg') + + def _repr_png_(self): + return self.png.data + + +class ExecutionPlan(TermGraph): + """ + Graph represention of Pipeline Term dependencies that includes metadata + about extra rows required to perform computations. Each node in the graph has an `extra_rows` attribute, indicating how many, if any, extra rows we should compute for the node. Extra rows are most @@ -30,6 +135,8 @@ class TermGraph(DiGraph): ---------- terms : dict A dict mapping names to final output terms. + all_dates : pd.DatetimeIndex + The dates fo Attributes ---------- @@ -42,21 +149,58 @@ class TermGraph(DiGraph): ordered() Return a topologically-sorted iterator over the terms in self. """ - def __init__(self, terms): - super(TermGraph, self).__init__(self) + def __init__(self, + terms, + all_dates, + start_date, + end_date, + min_extra_rows=0): + super(ExecutionPlan, self).__init__(terms) - self._frozen = False - parents = set() - for term in itervalues(terms): - self._add_to_graph(term, parents, extra_rows=0) - # No parents should be left between top-level terms. - assert not parents + for term in terms.values(): + self.set_extra_rows( + term, + all_dates, + start_date, + end_date, + min_extra_rows=min_extra_rows, + ) - self._outputs = terms - self._ordered = topological_sort(self) + def set_extra_rows(self, + term, + all_dates, + start_date, + end_date, + min_extra_rows): + """ + Compute ``extra_rows`` for transitive dependencies of ``root_terms`` + """ + # A term can require that additional extra rows beyond the minimum be + # computed. This is most often used with downsampled terms, which need + # to ensure that the first date is a computation date. + extra_rows_for_term = term.compute_extra_rows( + all_dates, + start_date, + end_date, + min_extra_rows, + ) + if extra_rows_for_term < min_extra_rows: + raise ValueError( + "term %s requested fewer rows than the minimum of %d" % ( + term, min_extra_rows, + ) + ) - # Mark that no more terms should be added to the graph. - self._frozen = True + self._ensure_extra_rows(term, extra_rows_for_term) + + for dependency, additional_extra_rows in term.dependencies.items(): + self.set_extra_rows( + dependency, + all_dates, + start_date, + end_date, + min_extra_rows=extra_rows_for_term + additional_extra_rows, + ) @lazyval def offset(self): @@ -175,71 +319,9 @@ class TermGraph(DiGraph): for term, attrs in iteritems(self.node) } - @property - def outputs(self): - """ - Dict mapping names to designated output terms. - """ - return self._outputs - - def ordered(self): - """ - Return a topologically-sorted iterator over the terms in `self`. - """ - return iter(self._ordered) - - @lazyval - def loadable_terms(self): - return tuple(term for term in self if isinstance(term, LoadableTerm)) - - def _add_to_graph(self, term, parents, extra_rows): - """ - Add `term` and all its inputs to the graph. - """ - if self._frozen: - raise ValueError("Can't mutate `TermGraph` after construction.") - # If we've seen this node already as a parent of the current traversal, - # it means we have an unsatisifiable dependency. This should only be - # possible if the term's inputs are mutated after construction. - if term in parents: - raise CyclicDependency(term) - parents.add(term) - - # Idempotent if term is already in the graph. - self.add_node(term) - - # Make sure we're going to compute at least `extra_rows` of `term`. - self._ensure_extra_rows(term, extra_rows) - - # Recursively add dependencies. - for dependency, additional_extra_rows in term.dependencies.items(): - self._add_to_graph( - dependency, - parents, - extra_rows=extra_rows + additional_extra_rows, - ) - self.add_edge(dependency, term) - - parents.remove(term) - def _ensure_extra_rows(self, term, N): """ Ensure that we're going to compute at least N extra rows of `term`. """ attrs = self.node[term] attrs['extra_rows'] = max(N, attrs.get('extra_rows', 0)) - - @lazyval - def jpeg(self): - return display_graph(self, 'jpeg') - - @lazyval - def png(self): - return display_graph(self, 'png') - - @lazyval - def svg(self): - return display_graph(self, 'svg') - - def _repr_png_(self): - return self.png.data diff --git a/zipline/pipeline/pipeline.py b/zipline/pipeline/pipeline.py index 53276b82..631b99d3 100644 --- a/zipline/pipeline/pipeline.py +++ b/zipline/pipeline/pipeline.py @@ -2,9 +2,9 @@ from zipline.errors import UnsupportedPipelineOutput from zipline.utils.input_validation import expect_types, optional -from .term import AssetExists, ComputableTerm, Term +from .graph import ExecutionPlan, TermGraph from .filters import Filter -from .graph import TermGraph +from .term import AssetExists, ComputableTerm, Term class Pipeline(object): @@ -148,9 +148,39 @@ class Pipeline(object): ) self._screen = screen - def to_graph(self, screen_name, default_screen): + def to_execution_plan(self, + screen_name, + default_screen, + all_dates, + start_date, + end_date): """ - Compile into a TermGraph. + Compile into an ExecutionPlan. + + Parameters + ---------- + screen_name : str + Name to supply for self.screen. + default_screen : zipline.pipeline.term.Term + Term to use as a screen if self.screen is None. + all_dates : pd.DatetimeIndex + A calendar of dates to use to calculate starts and ends for each + term. + start_date : pd.Timestamp + The first date of requested output. + end_date : pd.Timestamp + The last date of requested output. + """ + return ExecutionPlan( + self._prepare_graph_terms(screen_name, default_screen), + all_dates, + start_date, + end_date, + ) + + def to_simple_graph(self, screen_name, default_screen): + """ + Compile into a simple TermGraph with no extra row metadata. Parameters ---------- @@ -159,13 +189,16 @@ class Pipeline(object): default_screen : zipline.pipeline.term.Term Term to use as a screen if self.screen is None. """ + return TermGraph(self._prepare_graph_terms()) + + def _prepare_graph_terms(self, screen_name, default_screen): + """Helper for to_graph and to_execution_plan.""" columns = self.columns.copy() screen = self.screen if screen is None: screen = default_screen columns[screen_name] = screen - - return TermGraph(columns) + return columns def show_graph(self, format='svg'): """ @@ -176,7 +209,7 @@ class Pipeline(object): format : {'svg', 'png', 'jpeg'} Image format to render with. Default is 'svg'. """ - g = self.to_graph('', AssetExists()) + g = self.to_simple_graph('', AssetExists()) if format == 'svg': return g.svg elif format == 'png': diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 5a5dba5b..5c7218e2 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -34,10 +34,10 @@ from zipline.utils.memoize import lazyval from zipline.utils.numpy_utils import ( bool_dtype, categorical_dtype, + datetime64ns_dtype, default_missing_value_for_dtype, ) -from .mixins import SingleInputMixin from .sentinels import NotSpecified @@ -279,6 +279,20 @@ class Term(with_metaclass(ABCMeta, object)): # call super(). self._subclass_called_super_validate = True + def compute_extra_rows(self, + all_dates, + start_date, + end_date, + min_extra_rows): + """ + Calculate the number of extra rows needed to compute ``self``. + + Must return at least ``min_extra_rows``, but can optionally require + more. This is used by downsampled terms to ensure that the first date + computed is a recomputation date. + """ + return min_extra_rows + @abstractproperty def inputs(self): """ @@ -337,6 +351,38 @@ class AssetExists(Term): def __repr__(self): return "AssetExists()" + def _compute(self, today, assets, out): + raise NotImplementedError( + "AssetExists cannot be computed directly." + " Check your PipelineEngine configuration." + ) + + +class InputDates(Term): + """ + 1-Dimensional term providing date labels for other term inputs. + + This term is guaranteed to be available as an input for any term computed + by SimplePipelineEngine.run_pipeline(). + """ + ndim = 1 + dataset = None + dtype = datetime64ns_dtype + inputs = () + dependencies = {} + mask = None + windowed = False + window_safe = True + + def __repr__(self): + return "InputDates()" + + def _compute(self, today, assets, out): + raise NotImplementedError( + "InputDates cannot be computed directly." + " Check your PipelineEngine configuration." + ) + class LoadableTerm(Term): """ @@ -530,7 +576,7 @@ class ComputableTerm(Term): ) -class Slice(ComputableTerm, SingleInputMixin): +class Slice(ComputableTerm): """ Term for extracting a single column of a another term's output. From 6ac80464986225c033f83cf67af715949c369d2b Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 15 Aug 2016 17:19:22 -0400 Subject: [PATCH 08/25] MAINT: Add nearest_unequal_elements.. --- tests/utils/test_pandas_utils.py | 79 ++++++++++++++++++++++++++++++++ zipline/utils/pandas_utils.py | 50 ++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 tests/utils/test_pandas_utils.py diff --git a/tests/utils/test_pandas_utils.py b/tests/utils/test_pandas_utils.py new file mode 100644 index 00000000..c3389d61 --- /dev/null +++ b/tests/utils/test_pandas_utils.py @@ -0,0 +1,79 @@ +""" +Tests for zipline/utils/pandas_utils.py +""" +import pandas as pd + +from zipline.testing import parameter_space, ZiplineTestCase +from zipline.utils.pandas_utils import nearest_unequal_elements + + +class TestNearestUnequalElements(ZiplineTestCase): + + @parameter_space(tz=['UTC', 'US/Eastern'], __fail_fast=True) + def test_nearest_unequal_elements(self, tz): + + dts = pd.to_datetime( + ['2014-01-01', '2014-01-05', '2014-01-06', '2014-01-09'], + ).tz_localize(tz) + + t = lambda s: None if s is None else pd.Timestamp(s, tz=tz) + + for dt, before, after in (('2013-12-30', None, '2014-01-01'), + ('2013-12-31', None, '2014-01-01'), + ('2014-01-01', None, '2014-01-05'), + ('2014-01-02', '2014-01-01', '2014-01-05'), + ('2014-01-03', '2014-01-01', '2014-01-05'), + ('2014-01-04', '2014-01-01', '2014-01-05'), + ('2014-01-05', '2014-01-01', '2014-01-06'), + ('2014-01-06', '2014-01-05', '2014-01-09'), + ('2014-01-07', '2014-01-06', '2014-01-09'), + ('2014-01-08', '2014-01-06', '2014-01-09'), + ('2014-01-09', '2014-01-06', None), + ('2014-01-10', '2014-01-09', None), + ('2014-01-11', '2014-01-09', None)): + computed = nearest_unequal_elements(dts, t(dt)) + expected = (t(before), t(after)) + self.assertEqual(computed, expected) + + @parameter_space(tz=['UTC', 'US/Eastern'], __fail_fast=True) + def test_nearest_unequal_elements_short_dts(self, tz): + + # Length 1. + dts = pd.to_datetime(['2014-01-01']).tz_localize(tz) + t = lambda s: None if s is None else pd.Timestamp(s, tz=tz) + + for dt, before, after in (('2013-12-31', None, '2014-01-01'), + ('2014-01-01', None, None), + ('2014-01-02', '2014-01-01', None)): + computed = nearest_unequal_elements(dts, t(dt)) + expected = (t(before), t(after)) + self.assertEqual(computed, expected) + + # Length 0 + dts = pd.to_datetime([]).tz_localize(tz) + for dt, before, after in (('2013-12-31', None, None), + ('2014-01-01', None, None), + ('2014-01-02', None, None)): + computed = nearest_unequal_elements(dts, t(dt)) + expected = (t(before), t(after)) + self.assertEqual(computed, expected) + + def test_nearest_unequal_bad_input(self): + with self.assertRaises(ValueError) as e: + nearest_unequal_elements( + pd.to_datetime(['2014', '2014']), + pd.Timestamp('2014'), + ) + + self.assertEqual(str(e.exception), 'dts must be unique') + + with self.assertRaises(ValueError) as e: + nearest_unequal_elements( + pd.to_datetime(['2014', '2013']), + pd.Timestamp('2014'), + ) + + self.assertEqual( + str(e.exception), + 'dts must be sorted in increasing order', + ) diff --git a/zipline/utils/pandas_utils.py b/zipline/utils/pandas_utils.py index 88fdb662..1f0a9db2 100644 --- a/zipline/utils/pandas_utils.py +++ b/zipline/utils/pandas_utils.py @@ -96,3 +96,53 @@ def mask_between_time(dts, start, end, include_start=True, include_end=True): left_op(start_micros, time_micros), right_op(time_micros, end_micros), ) + + +def nearest_unequal_elements(dts, dt): + """ + Find values in ``dts`` closest but not equal to ``dt``. + + Returns a pair of (last_before, first_after). + + When ``dt`` is less than any element in ``dts``, ``last_before`` is None. + When ``dt`` is greater any element in ``dts``, ``first_after`` is None. + + ``dts`` must be unique and sorted in increasing order. + + Parameters + ---------- + dts : pd.DatetimeIndex + Dates in which to search. + dt : pd.Timestamp + Date for which to find bounds. + """ + if not dts.is_unique: + raise ValueError("dts must be unique") + + if not dts.is_monotonic_increasing: + raise ValueError("dts must be sorted in increasing order") + + if not len(dts): + return None, None + + sortpos = dts.searchsorted(dt, side='left') + try: + sortval = dts[sortpos] + except IndexError: + # dt is greater than any value in the array. + return dts[-1], None + + if dt < sortval: + lower_ix = sortpos - 1 + upper_ix = sortpos + elif dt == sortval: + lower_ix = sortpos - 1 + upper_ix = sortpos + 1 + else: + lower_ix = sortpos + upper_ix = sortpos + 1 + + lower_value = dts[lower_ix] if lower_ix >= 0 else None + upper_value = dts[upper_ix] if upper_ix < len(dts) else None + + return lower_value, upper_value From 7bc8603e22d9b4b12ac27d622f135a18f81b3351 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 15 Aug 2016 22:56:13 -0400 Subject: [PATCH 09/25] MAINT: Remove double import. --- zipline/testing/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/zipline/testing/__init__.py b/zipline/testing/__init__.py index 5772e221..ee2aad9d 100644 --- a/zipline/testing/__init__.py +++ b/zipline/testing/__init__.py @@ -30,7 +30,6 @@ from .core import ( # noqa make_test_handler, make_trade_data_for_asset_info, parameter_space, - parameter_space, patch_os_environment, patch_read_csv, permute_rows, From 141a088f7ffdf6c02f9c770ee696acdec5248823 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 15 Aug 2016 22:56:27 -0400 Subject: [PATCH 10/25] ENH: Add numpy_utils.changed_locations. --- zipline/utils/numpy_utils.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/zipline/utils/numpy_utils.py b/zipline/utils/numpy_utils.py index 84fbfd7e..7a6326cc 100644 --- a/zipline/utils/numpy_utils.py +++ b/zipline/utils/numpy_utils.py @@ -11,8 +11,11 @@ from numpy import ( broadcast, busday_count, datetime64, + diff, dtype, empty, + flatnonzero, + hstack, nan, vectorize, where @@ -397,3 +400,33 @@ def as_column(a): "but got an array of shape %s" % a.shape ) return a[:, None] + + +def changed_locations(a, include_first): + """ + Compute indices of values in ``a`` that differ from the previous value. + + Parameters + ---------- + a : np.ndarray + The array on which to indices of change. + include_first : bool + Whether or not to consider the first index of the array as "changed". + + Example + ------- + >>> import numpy as np + >>> changed_locations(np.array([0, 0, 5, 5, 1, 1]), include_first=False) + array([2, 4]) + + >>> changed_locations(np.array([0, 0, 5, 5, 1, 1]), include_first=True) + array([0, 2, 4]) + """ + if a.ndim > 1: + raise ValueError("indices_of_changed_values only supports 1D arrays.") + indices = flatnonzero(diff(a)) + 1 + + if not include_first: + return indices + + return hstack([[0], indices]) From c0005e22ad3309578e7b4f87e13148347f3afa8e Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 15 Aug 2016 22:56:45 -0400 Subject: [PATCH 11/25] MAINT: Clean up WithTradingSessions. - Add WithDefaultDateBounds, since we use alias('START_DATE') and alias ('END_DATE'). - Fix copypasta in assertion. --- zipline/testing/fixtures.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index b2d440c3..88af7f1d 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -527,7 +527,7 @@ class WithSimParams(WithTradingEnvironment): cls.sim_params = cls.make_simparams() -class WithTradingSessions(WithTradingCalendars): +class WithTradingSessions(WithTradingCalendars, WithDefaultDateBounds): """ ZiplineTestCase mixin providing cls.trading_days, cls.all_trading_sessions as a class-level fixture. @@ -1212,7 +1212,7 @@ class WithSeededRandomPipelineEngine(WithTradingSessions, WithAssetFinder): if start_date not in self.trading_days: raise AssertionError("Start date not in calendar: %s" % start_date) if end_date not in self.trading_days: - raise AssertionError("Start date not in calendar: %s" % start_date) + raise AssertionError("End date not in calendar: %s" % end_date) return self.seeded_random_engine.run_pipeline( pipeline, start_date, From 790ee594554a82a245601eaca1f32a8c4730f1a0 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 15 Aug 2016 22:58:04 -0400 Subject: [PATCH 12/25] MAINT: Add alt-constructor for NoFurtherDataError. --- zipline/errors.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/zipline/errors.py b/zipline/errors.py index 84db348f..c602c099 100644 --- a/zipline/errors.py +++ b/zipline/errors.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from textwrap import dedent from zipline.utils.memoize import lazyval @@ -583,6 +584,29 @@ class NoFurtherDataError(ZiplineError): # that can be usefully templated. msg = '{msg}' + @classmethod + def from_lookback_window(cls, + initial_message, + first_date, + lookback_start, + lookback_length): + return cls( + msg=dedent( + """ + {initial_message} + + lookback window started at {lookback_start} + earliest known date was {first_date} + {lookback_length} extra rows of data were required + """ + ).format( + initial_message=initial_message, + first_date=first_date, + lookback_start=lookback_start, + lookback_length=lookback_length, + ) + ) + class UnsupportedDatetimeFormat(ZiplineError): """ From b40ebdcfce7b6e3fc18735505cc0f26a8b71bbdf Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 15 Aug 2016 22:58:40 -0400 Subject: [PATCH 13/25] ENH: Add support for downsampling. Adds a new ``downsample`` method to all computable terms. Computable terms (Filters, Factors, and Classifiers) can be downsampled to yearly, quarterly, monthly, or weekly frequency. The result of ``term.downsample`` is a new term of the same family (Filter/Factor/Classifier) as ``term``. The downsampled term computes by delegating to the original term; repeatedly calling its ``compute`` method with length-1 date ranges. Downsampled terms take advantage of a new ``compute_extra_rows`` Term method, which allows terms to dynamically request that additional extra rows of themselves be computed based on the dates for which they're being computed. This ensures, for example, that a monthly-downsampled term always computes at the start of a month, even when a naively-calculated pipeline window would end in the middle of the month. --- tests/pipeline/test_downsampling.py | 615 +++++++++++++++++++++ zipline/pipeline/classifiers/classifier.py | 17 + zipline/pipeline/engine.py | 12 +- zipline/pipeline/factors/factor.py | 16 + zipline/pipeline/filters/filter.py | 16 + zipline/pipeline/mixins.py | 221 +++++++- zipline/pipeline/term.py | 56 +- 7 files changed, 942 insertions(+), 11 deletions(-) create mode 100644 tests/pipeline/test_downsampling.py diff --git a/tests/pipeline/test_downsampling.py b/tests/pipeline/test_downsampling.py new file mode 100644 index 00000000..47f35eee --- /dev/null +++ b/tests/pipeline/test_downsampling.py @@ -0,0 +1,615 @@ +""" +Tests for Downsampled Filters/Factors/Classifiers +""" +import pandas as pd +from pandas.util.testing import assert_frame_equal + +from zipline.pipeline import ( + Pipeline, + CustomFactor, + CustomFilter, + CustomClassifier, +) +from zipline.pipeline.data.testing import TestingDataSet +from zipline.pipeline.factors import SimpleMovingAverage +from zipline.testing import ZiplineTestCase, parameter_space +from zipline.testing.fixtures import ( + WithTradingSessions, + WithSeededRandomPipelineEngine, +) + + +class NDaysAgoFactor(CustomFactor): + inputs = [TestingDataSet.float_col] + + def compute(self, today, assets, out, floats): + out[:] = floats[0] + + +class NDaysAgoFilter(CustomFilter): + inputs = [TestingDataSet.bool_col] + + def compute(self, today, assets, out, bools): + out[:] = bools[0] + + +class NDaysAgoClassifier(CustomClassifier): + inputs = [TestingDataSet.categorical_col] + dtype = TestingDataSet.categorical_col.dtype + + def compute(self, today, assets, out, cats): + out[:] = cats[0] + + +class ComputeExtraRowsTestcase(WithTradingSessions, ZiplineTestCase): + + DATA_MIN_DAY = pd.Timestamp('2012-06', tz='UTC') + DATA_MAX_DAY = pd.Timestamp('2015', tz='UTC') + TRADING_CALENDAR_STRS = ('NYSE',) + + # Test with different window_lengths to ensure that window length is not + # used when calculating exra rows for the top-level term. + factor1 = TestingDataSet.float_col.latest + factor11 = NDaysAgoFactor(window_length=11) + factor91 = NDaysAgoFactor(window_length=91) + + filter1 = TestingDataSet.bool_col.latest + filter11 = NDaysAgoFilter(window_length=11) + filter91 = NDaysAgoFilter(window_length=91) + + classifier1 = TestingDataSet.categorical_col.latest + classifier11 = NDaysAgoClassifier(window_length=11) + classifier91 = NDaysAgoClassifier(window_length=91) + + all_terms = [ + factor1, + factor11, + factor91, + filter1, + filter11, + filter91, + classifier1, + classifier11, + classifier91, + ] + + @parameter_space( + calendar_name=TRADING_CALENDAR_STRS, + base_terms=[ + (factor1, factor11, factor91), + (filter1, filter11, filter91), + (classifier1, classifier11, classifier91), + ], + __fail_fast=True + ) + def test_yearly(self, base_terms, calendar_name): + downsampled_terms = tuple(t.downsample('Y') for t in base_terms) + all_terms = base_terms + downsampled_terms + + all_sessions = self.trading_sessions[calendar_name] + end_session = all_sessions[-1] + + years = all_sessions.year + sessions_in_2012 = all_sessions[years == 2012] + sessions_in_2013 = all_sessions[years == 2013] + sessions_in_2014 = all_sessions[years == 2014] + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the first date in 2014. We shouldn't request any + # additional rows for the regular terms or the downsampled terms. + for i in range(0, 30, 5): + start_session = sessions_in_2014[i] + self.check_extra_row_calculations( + all_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i, + ) + + # Simulate requesting computation where the unaltered lookback would + # land on the second date in 2014. We should request one more extra + # row in the downsampled terms to push us back to the first date in + # 2014. + for i in range(0, 30, 5): + start_session = sessions_in_2014[i + 1] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i + 1, + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i, + ) + + # Simulate requesting computation where the unaltered lookback would + # land on the last date of 2013. The downsampled terms should request + # enough extra rows to push us back to the start of 2013. + for i in range(0, 30, 5): + start_session = sessions_in_2014[i] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + len(sessions_in_2013), + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + 1, + ) + + # Simulate requesting computation where the unaltered lookback would + # land on the last date of 2012. The downsampled terms should request + # enough extra rows to push us back to the first known date, which is + # in the middle of 2012 + for i in range(0, 30, 5): + start_session = sessions_in_2013[i] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + len(sessions_in_2012), + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + 1, + ) + + @parameter_space( + calendar_name=TRADING_CALENDAR_STRS, + base_terms=[ + (factor1, factor11, factor91), + (filter1, filter11, filter91), + (classifier1, classifier11, classifier91), + ], + __fail_fast=True + ) + def test_quarterly(self, calendar_name, base_terms): + downsampled_terms = tuple(t.downsample('Q') for t in base_terms) + all_terms = base_terms + downsampled_terms + + # This region intersects with Q4 2013, Q1 2014, and Q2 2014. + tmp = self.trading_sessions[calendar_name] + all_sessions = tmp[tmp.slice_indexer('2013-12-15', '2014-04-30')] + end_session = all_sessions[-1] + + months = all_sessions.month + Q4_2013 = all_sessions[months == 12] + Q1_2014 = all_sessions[(months == 1) | (months == 2) | (months == 3)] + Q2_2014 = all_sessions[months == 4] + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the first date in Q2 2014. We shouldn't request any + # additional rows for the regular terms or the downsampled terms. + for i in range(0, 15, 5): + start_session = Q2_2014[i] + self.check_extra_row_calculations( + all_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i, + ) + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the second date in Q2 2014. + # The downsampled terms should request one more extra row. + for i in range(0, 15, 5): + start_session = Q2_2014[i + 1] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i + 1, + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i, + ) + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the last date in Q1 2014. The downsampled terms + # should request enough extra rows to push us back to the first date of + # Q1 2014. + for i in range(0, 15, 5): + start_session = Q2_2014[i] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + len(Q1_2014), + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + 1, + ) + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the last date in Q4 2013. The downsampled terms + # should request enough extra rows to push us back to the first known + # date, which is in the middle of december 2013. + for i in range(0, 15, 5): + start_session = Q1_2014[i] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + len(Q4_2013), + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + 1, + ) + + @parameter_space( + calendar_name=TRADING_CALENDAR_STRS, + base_terms=[ + (factor1, factor11, factor91), + (filter1, filter11, filter91), + (classifier1, classifier11, classifier91), + ], + __fail_fast=True + ) + def test_monthly(self, calendar_name, base_terms): + downsampled_terms = tuple(t.downsample('M') for t in base_terms) + all_terms = base_terms + downsampled_terms + + # This region intersects with Dec 2013, Jan 2014, and Feb 2014. + tmp = self.trading_sessions[calendar_name] + all_sessions = tmp[tmp.slice_indexer('2013-12-15', '2014-02-28')] + end_session = all_sessions[-1] + + months = all_sessions.month + dec2013 = all_sessions[months == 12] + jan2014 = all_sessions[months == 1] + feb2014 = all_sessions[months == 2] + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the first date in feb 2014. We shouldn't request any + # additional rows for the regular terms or the downsampled terms. + for i in range(0, 10, 2): + start_session = feb2014[i] + self.check_extra_row_calculations( + all_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i, + ) + + # Simulate requesting computation where the unaltered lookback would + # land on the second date in feb 2014. We should request one more + # extra row in the downsampled terms to push us back to the first date + # in 2014. + for i in range(0, 10, 2): + start_session = feb2014[i + 1] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i + 1, + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i, + ) + + # Simulate requesting computation where the unaltered lookback would + # land on the last date of jan 2014. The downsampled terms should + # request enough extra rows to push us back to the start of jan 2014. + for i in range(0, 10, 2): + start_session = feb2014[i] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + len(jan2014), + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + 1, + ) + + # Simulate requesting computation where the unaltered lookback would + # land on the last date of dec 2013. The downsampled terms should + # request enough extra rows to push us back to the first known date, + # which is in the middle of december 2013. + for i in range(0, 10, 2): + start_session = jan2014[i] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + len(dec2013), + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + 1, + ) + + @parameter_space( + calendar_name=TRADING_CALENDAR_STRS, + base_terms=[ + (factor1, factor11, factor91), + (filter1, filter11, filter91), + (classifier1, classifier11, classifier91), + ], + __fail_fast=True + ) + def test_weekly(self, calendar_name, base_terms): + downsampled_terms = tuple(t.downsample('W') for t in base_terms) + all_terms = base_terms + downsampled_terms + + # December 2013 + # Mo Tu We Th Fr Sa Su + # 1 + # 2 3 4 5 6 7 8 + # 9 10 11 12 13 14 15 + # 16 17 18 19 20 21 22 + # 23 24 25 26 27 28 29 + # 30 31 + + # January 2014 + # Mo Tu We Th Fr Sa Su + # 1 2 3 4 5 + # 6 7 8 9 10 11 12 + # 13 14 15 16 17 18 19 + # 20 21 22 23 24 25 26 + # 27 28 29 30 31 + + # This region intersects with the last full week of 2013, the week + # shared by 2013 and 2014, and the first full week of 2014. + tmp = self.trading_sessions[calendar_name] + all_sessions = tmp[tmp.slice_indexer('2013-12-27', '2014-01-12')] + end_session = all_sessions[-1] + + week0 = all_sessions[ + all_sessions.slice_indexer('2013-12-27', '2013-12-29') + ] + week1 = all_sessions[ + all_sessions.slice_indexer('2013-12-30', '2014-01-05') + ] + week2 = all_sessions[ + all_sessions.slice_indexer('2014-01-06', '2014-01-12') + ] + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the first date in week 2. We shouldn't request any + # additional rows for the regular terms or the downsampled terms. + for i in range(3): + start_session = week2[i] + self.check_extra_row_calculations( + all_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i, + ) + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the second date in week 2. The downsampled terms + # should request one more extra row. + for i in range(3): + start_session = week2[i + 1] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i + 1, + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i, + expected_extra_rows=i, + ) + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the last date in week 1. The downsampled terms + # should request enough extra rows to push us back to the first date of + # week 1. + for i in range(3): + start_session = week2[i] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + len(week1), + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + 1, + ) + + # Simulate requesting computation where the unaltered lookback would + # land exactly on the last date in week0. The downsampled terms + # should request enough extra rows to push us back to the first known + # date, which is in the middle of december 2013. + for i in range(3): + start_session = week1[i] + self.check_extra_row_calculations( + downsampled_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + len(week0), + ) + self.check_extra_row_calculations( + base_terms, + all_sessions, + start_session, + end_session, + min_extra_rows=i + 1, + expected_extra_rows=i + 1, + ) + + def check_extra_row_calculations(self, + terms, + all_sessions, + start_session, + end_session, + min_extra_rows, + expected_extra_rows): + """ + Check that each term in ``terms`` computes an expected number of extra + rows for the given parameters. + """ + for term in terms: + result = term.compute_extra_rows( + all_sessions, + start_session, + end_session, + min_extra_rows, + ) + self.assertEqual( + result, + expected_extra_rows, + "Expected {} extra_rows from {}, but got {}.".format( + expected_extra_rows, + term, + result, + ) + ) + + +class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine, + ZiplineTestCase): + + # Extend into the last few days of 2013 to test year/quarter boundaries. + START_DATE = pd.Timestamp('2013-12-15', tz='UTC') + + # Extend into the first few days of 2015 to test year/quarter boundaries. + END_DATE = pd.Timestamp('2015-01-06', tz='UTC') + + def test_downsample_windowed_factor(self): + + f = SimpleMovingAverage( + inputs=[TestingDataSet.float_col], + window_length=5, + ) + + # June 2014 + # Mo Tu We Th Fr Sa Su + # 1 + # 2 3 4 5 6 7 8 + # 9 10 11 12 13 14 15 + # 16 17 18 19 20 21 22 + # 23 24 25 26 27 28 29 + # 30 + all_sessions = self.nyse_sessions + compute_dates = all_sessions[ + all_sessions.slice_indexer('2014-06-05', '2015-01-06') + ] + start_date, end_date = compute_dates[[0, -1]] + + pipe = Pipeline({ + 'year': f.downsample(frequency='Y'), + 'quarter': f.downsample(frequency='Q'), + 'month': f.downsample(frequency='M'), + 'week': f.downsample(frequency='W'), + }) + + # Raw values for f, computed each day from 2014 to the end of the + # target period. + raw_f_results = self.run_pipeline( + Pipeline({'f': f}), + start_date=pd.Timestamp('2014-01-02', tz='UTC'), + end_date=pd.Timestamp('2015-01-06', tz='UTC'), + )['f'].unstack() + + expected_results = { + 'year': (raw_f_results + .groupby(pd.TimeGrouper('AS')) + .first() + .reindex(compute_dates, method='ffill')), + 'quarter': (raw_f_results + .groupby(pd.TimeGrouper('QS')) + .first() + .reindex(compute_dates, method='ffill')), + 'month': (raw_f_results + .groupby(pd.TimeGrouper('MS')) + .first() + .reindex(compute_dates, method='ffill')), + 'week': (raw_f_results + .groupby(pd.TimeGrouper('W', label='left')) + .first() + .reindex(compute_dates, method='ffill')), + } + + results = self.run_pipeline(pipe, start_date, end_date) + + for frequency in expected_results: + result = results[frequency].unstack() + expected = expected_results[frequency] + assert_frame_equal(result, expected) diff --git a/zipline/pipeline/classifiers/classifier.py b/zipline/pipeline/classifiers/classifier.py index 1ae66dbc..12b3df6b 100644 --- a/zipline/pipeline/classifiers/classifier.py +++ b/zipline/pipeline/classifiers/classifier.py @@ -23,6 +23,7 @@ from zipline.utils.numpy_utils import ( from ..filters import ArrayPredicate, NotNullFilter, NullFilter, NumExprFilter from ..mixins import ( CustomTermMixin, + DownsampledMixin, LatestMixin, PositiveWindowLengthMixin, RestrictedDTypeMixin, @@ -301,6 +302,10 @@ class Classifier(RestrictedDTypeMixin, ComputableTerm): raise AssertionError("Expected a LabelArray, got %s." % type(data)) return data.as_categorical() + @property + def _downsampled_type(self): + return DownsampledClassifier + class Everything(Classifier): """ @@ -386,6 +391,18 @@ class Latest(LatestMixin, CustomClassifier): pass +class DownsampledClassifier(DownsampledMixin, Classifier): + """ + A Classifier that defers to another Classifier at lower-than-daily + frequency. + + Parameters + ---------- + term : zipline.Classifier + freq : {'Y', 'Q', 'M', 'W'} + """ + + class InvalidClassifierComparison(TypeError): def __init__(self, classifier, compval): super(InvalidClassifierComparison, self).__init__( diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 616178c7..dcda762a 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -222,13 +222,11 @@ class SimplePipelineEngine(object): finder = self._finder start_idx, end_idx = self._calendar.slice_locs(start_date, end_date) if start_idx < extra_rows: - raise NoFurtherDataError( - msg="Insufficient data to compute Pipeline.\n\n" - "start date was %s\n" - "earliest known date was %s\n" - "%d extra rows were required" % ( - start_date, calendar[0], extra_rows, - ), + raise NoFurtherDataError.from_lookback_window( + initial_message="Insufficient data to compute Pipeline:", + first_date=calendar[0], + lookback_start=start_date, + lookback_length=extra_rows, ) # Build lifetimes matrix reaching back to `extra_rows` days before diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index 3410a60d..c78c074c 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -33,6 +33,7 @@ from zipline.pipeline.filters import ( ) from zipline.pipeline.mixins import ( CustomTermMixin, + DownsampledMixin, LatestMixin, PositiveWindowLengthMixin, RestrictedDTypeMixin, @@ -1071,6 +1072,10 @@ class Factor(RestrictedDTypeMixin, ComputableTerm): """ return (-inf < self) & (self < inf) + @property + def _downsampled_type(self): + return DownsampledFactor + class NumExprFactor(NumericalExpression, Factor): """ @@ -1510,6 +1515,17 @@ class Latest(LatestMixin, CustomFactor): out[:] = data[-1] +class DownsampledFactor(DownsampledMixin, Factor): + """ + A Factor that defers to another Factor at lower-than-daily frequency. + + Parameters + ---------- + term : zipline.pipeline.Factor + freq : {'Y', 'Q', 'M', 'W'} + """ + + # Functions to be passed to GroupedRowTransform. These aren't defined inline # because the transformation function is part of the instance hash key. def demean(row): diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py index 502fd031..c600fc15 100644 --- a/zipline/pipeline/filters/filter.py +++ b/zipline/pipeline/filters/filter.py @@ -25,6 +25,7 @@ from zipline.pipeline.expression import ( ) from zipline.pipeline.mixins import ( CustomTermMixin, + DownsampledMixin, LatestMixin, PositiveWindowLengthMixin, RestrictedDTypeMixin, @@ -201,6 +202,10 @@ class Filter(RestrictedDTypeMixin, ComputableTerm): ) return retval + @property + def _downsampled_type(self): + return DownsampledFilter + class NumExprFilter(NumericalExpression, Filter): """ @@ -458,6 +463,17 @@ class Latest(LatestMixin, CustomFilter): pass +class DownsampledFilter(DownsampledMixin, Filter): + """ + A Filter that defers to another Filter at lower-than-daily frequency. + + Parameters + ---------- + term : zipline.pipeline.Filter + freq : {'Y', 'Q', 'M', 'W'} + """ + + class SingleAsset(Filter): """ A Filter that computes to True only for the given asset. diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index 3eb0cfd4..73b6d111 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -1,16 +1,28 @@ """ Mixins classes for use with Filters and Factors. """ +from operator import attrgetter + from numpy import ( array, full, recarray, + vstack, ) +from pandas import NaT as pd_NaT +from zipline.errors import ( + WindowLengthNotPositive, + UnsupportedDataType, + NoFurtherDataError, +) from zipline.utils.control_flow import nullctx -from zipline.errors import WindowLengthNotPositive, UnsupportedDataType +from zipline.utils.input_validation import expect_element, expect_types +from zipline.utils.numpy_utils import changed_locations +from zipline.utils.pandas_utils import nearest_unequal_elements from .sentinels import NotSpecified +from .term import Term class PositiveWindowLengthMixin(object): @@ -218,3 +230,210 @@ class LatestMixin(SingleInputMixin): actual=self.inputs[0].dtype, ) ) + + +_dt_to_period = { + 'Y': attrgetter('year'), + 'Q': attrgetter('quarter'), + 'M': attrgetter('month'), + 'W': attrgetter('week'), +} + + +def select_sampling_indices(dates, frequency): + """ + Choose entries from ``dates`` to use for downsampling at ``frequency``. + + Parameters + ---------- + dates : pd.DatetimeIndex + Dates from which to select sample choices. + frequency : {'Y', 'Q', 'M', 'W'} + Frequency at which samples are to be taken. + + Returns + ------- + indices : np.array[int64] + An array condtaining indices of dates on which samples should be taken. + + The resulting index will always include 0 as a sample index, and it + will include the first date of each subsequent year/quarter/month/week, + as determined by ``frequency``. + + Notes + ----- + This function assumes that ``dates`` does not have large gaps. + + In particular, it assumes that the maximum distance between any two entries + in ``dates`` is never greater than a year, which we rely on because we use + ``np.diff(dates.{quarter,month,week})`` to find dates where the sampling + period has changed. + """ + return changed_locations( + _dt_to_period[frequency](dates), + include_first=True + ) + + +class DownsampledMixin(StandardOutputs): + """ + Mixin for behavior shared by Downsampled{Factor,Filter,Classifier} + + A downsampled term is a wrapper around the "real" term that performs actual + computation. The downsampler is responsible for calling the real term's + `compute` method at selected intervals and forward-filling the computed + values. + + Downsampling is not currently supported for terms with multiple outputs. + """ + # There's no reason to take a window of a downsampled term. The whole + # point is that you're re-using the same result multiple times. + window_safe = False + + @expect_types(term=Term) + @expect_element(frequency=frozenset(_dt_to_period)) + def __new__(cls, term, frequency): + return super(DownsampledMixin, cls).__new__( + cls, + inputs=term.inputs, + outputs=term.outputs, + window_length=term.window_length, + mask=term.mask, + frequency=frequency, + wrapped_term=term, + dtype=term.dtype, + missing_value=term.missing_value, + ndim=term.ndim, + ) + + def _init(self, frequency, wrapped_term, *args, **kwargs): + self._frequency = frequency + self._wrapped_term = wrapped_term + return super(DownsampledMixin, self)._init(*args, **kwargs) + + @classmethod + def _static_identity(cls, frequency, wrapped_term, *args, **kwargs): + return ( + super(DownsampledMixin, cls)._static_identity(*args, **kwargs), + frequency, + wrapped_term, + ) + + def compute_extra_rows(self, + all_dates, + start_date, + end_date, + min_extra_rows): + """ + Ensure that min_extra_rows pushes us back to a computation date. + + Parameters + ---------- + all_dates : pd.DatetimeIndex + The trading sessions against which ``self`` will be computed. + start_date : pd.Timestamp + The first date for which final output is requested. + end_date : pd.Timestamp + The last date for which final output is requested. + min_extra_rows : int + The minimum number of extra rows required of ``self``, as + determined by other terms that depend on ``self``. + + Returns + ------- + extra_rows : int + The number of extra rows to compute. This will be the minimum + number of rows required to make our computed start_date fall on a + recomputation date. + """ + try: + current_start_pos = all_dates.get_loc(start_date) - min_extra_rows + if current_start_pos < 0: + raise NoFurtherDataError( + initial_message="Insufficient data to compute Pipeline:", + first_date=all_dates[0], + lookback_start=start_date, + lookback_length=min_extra_rows, + ) + except KeyError: + before, after = nearest_unequal_elements(all_dates, start_date) + raise ValueError( + "Pipeline start_date {start_date} is not in calendar.\n" + "Latest date before start_date is {before}.\n" + "Earliest date after start_date is {after}.".format( + start_date=start_date, + before=before, + after=after, + ) + ) + + # Our possible target dates are all the dates on or before the current + # starting position. + # TODO: Consider bounding this below by self.window_length + candidates = all_dates[:current_start_pos + 1] + + # Choose the latest date in the candidates that is the start of a new + # period at our frequency. + choices = select_sampling_indices(candidates, self._frequency) + + # If we have choices, the last choice is the first date if the + # period containing current_start_date. Choose it. + new_start_date = candidates[choices[-1]] + + # Add the difference between the new and old start dates to get the + # number of rows for the new start_date. + new_start_pos = all_dates.get_loc(new_start_date) + assert new_start_pos <= current_start_pos, \ + "Computed negative extra rows!" + + return min_extra_rows + (current_start_pos - new_start_pos) + + def _compute(self, windows, dates, assets, mask): + """ + Compute by delegating to self._wrapped_term._compute on sample dates. + + On non-sample dates, forward-fill from previously-computed samples. + """ + to_sample = dates[select_sampling_indices(dates, self._frequency)] + assert to_sample[0] == dates[0], \ + "Misaligned sampling dates in %s." % type(self).__name__ + + real_compute = self._wrapped_term._compute + + results = [] + samples = iter(to_sample) + next_sample = next(samples) + for i, compute_date in enumerate(dates): + if next_sample == compute_date: + results.append( + real_compute( + windows, + dates[i:i + 1], + assets, + mask[i:i + 1], + ) + ) + try: + next_sample = next(samples) + except StopIteration: + # No more samples to take. Set next_sample to Nat, which + # compares False with any other datetime. + next_sample = pd_NaT + else: + # Copy results from previous sample period. + results.append(results[-1]) + + # Force adjusted arrays forward one tick. + for w in windows: + next(w) + + # We should have exhausted our sample dates. + try: + next_sample = next(samples) + except StopIteration: + pass + else: + raise AssertionError("Unconsumed sample date: %s" % next_sample) + + # Concatenate stored results. + return vstack(results) diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 5c7218e2..98941fe8 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -287,9 +287,28 @@ class Term(with_metaclass(ABCMeta, object)): """ Calculate the number of extra rows needed to compute ``self``. - Must return at least ``min_extra_rows``, but can optionally require - more. This is used by downsampled terms to ensure that the first date - computed is a recomputation date. + Must return at least ``min_extra_rows``, and the default implementation + is to just return ``min_extra_rows``. This is overridden by + downsampled terms to ensure that the first date computed is a + recomputation date. + + Parameters + ---------- + all_dates : pd.DatetimeIndex + The trading sessions against which ``self`` will be computed. + start_date : pd.Timestamp + The first date for which final output is requested. + end_date : pd.Timestamp + The last date for which final output is requested. + min_extra_rows : int + The minimum number of extra rows required of ``self``, as + determined by other terms that depend on ``self``. + + Returns + ------- + extra_rows : int + The number of extra rows to compute. Must be at least + ``min_extra_rows``. """ return min_extra_rows @@ -566,6 +585,31 @@ class ComputableTerm(Term): """ return data + def _downsampled_type(self): + """ + The expression type to return from self.downsample(). + """ + raise NotImplementedError( + "downsampling is not yet implemented " + "for instances of %s." % type(self).__name__ + ) + + def downsample(self, frequency): + """ + Make a term that computes from ``self`` at lower-than-daily frequency. + + Parameters + ---------- + frequency : str, {'Y', 'Q', 'M', 'W'} + A string indicating the desired sampling rate. + 'Y' -> sample on the first trading day of each calendar year + 'Q' -> sample on the first trading day of + January, April, July, and October + 'M' -> sample on the first trading day of each month + 'W' -> sample on the first trading day of each week + """ + return self._downsampled_type(term=self, frequency=frequency) + def __repr__(self): return ( "{type}({inputs}, window_length={window_length})" @@ -632,6 +676,12 @@ class Slice(ComputableTerm): # column. return windows[0][:, [asset_column]] + @property + def _downsampled_type(self): + raise NotImplementedError( + 'downsampling of slices is not yet supported' + ) + def validate_dtype(termname, dtype, missing_value): """ From 221ec2073fa6e72036de6c5513273b06965d2650 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Tue, 16 Aug 2016 14:05:27 -0400 Subject: [PATCH 14/25] STY: Flake8 cleanup. --- tests/pipeline/test_engine.py | 3 +-- tests/pipeline/test_factor.py | 3 +-- tests/pipeline/test_filter.py | 4 ++-- zipline/pipeline/graph.py | 4 ++-- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index fecebd93..e656cbae 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -228,7 +228,7 @@ class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase): # We shouldn't be able to compute dates[8], since we only know about 8 # prior dates, and we need a window length of 10. - with self.assertRaises(NoFurtherDataError) as e: + with self.assertRaises(NoFurtherDataError): engine.run_pipeline(p, self.dates[8], self.dates[8]) def test_input_dates_provided_by_default(self): @@ -255,7 +255,6 @@ class ConstantInputTestCase(WithTradingEnvironment, ZiplineTestCase): column = results.unstack().iloc[:, 0].values check_arrays(column, self.dates[:2].values) - def test_same_day_pipeline(self): loader = self.loader engine = SimplePipelineEngine( diff --git a/tests/pipeline/test_factor.py b/tests/pipeline/test_factor.py index ba50fdae..91acbe35 100644 --- a/tests/pipeline/test_factor.py +++ b/tests/pipeline/test_factor.py @@ -26,7 +26,7 @@ from zipline.errors import UnknownRankMethod from zipline.lib.labelarray import LabelArray from zipline.lib.rank import masked_rankdata_2d from zipline.lib.normalize import naive_grouped_rowwise_apply as grouped_apply -from zipline.pipeline import Classifier, Factor, Filter, TermGraph +from zipline.pipeline import Classifier, Factor, Filter from zipline.pipeline.factors import ( Returns, RSI, @@ -37,7 +37,6 @@ from zipline.testing import ( parameter_space, permute_rows, ) -from zipline.utils.functional import dzip_exact from zipline.utils.numpy_utils import ( categorical_dtype, datetime64ns_dtype, diff --git a/tests/pipeline/test_filter.py b/tests/pipeline/test_filter.py index 958c6e90..62cc187d 100644 --- a/tests/pipeline/test_filter.py +++ b/tests/pipeline/test_filter.py @@ -26,11 +26,11 @@ from numpy import ( from numpy.random import randn, seed as random_seed from zipline.errors import BadPercentileBounds -from zipline.pipeline import Filter, Factor, TermGraph +from zipline.pipeline import Filter, Factor from zipline.pipeline.classifiers import Classifier from zipline.pipeline.factors import CustomFactor from zipline.pipeline.filters import All, Any, AtLeastN -from zipline.testing import check_arrays, parameter_space, permute_rows +from zipline.testing import parameter_space, permute_rows from zipline.utils.numpy_utils import float64_dtype, int64_dtype from .base import BasePipelineTestCase, with_default_shape diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index eedb8c3c..e18c3602 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -5,7 +5,7 @@ from networkx import ( DiGraph, topological_sort, ) -from six import iteritems +from six import iteritems, itervalues from zipline.utils.memoize import lazyval from zipline.pipeline.visualize import display_graph @@ -48,7 +48,7 @@ class TermGraph(DiGraph): self._frozen = False parents = set() - for name, term in iteritems(terms): + for term in itervalues(terms): self._add_to_graph(term, parents) # No parents should be left between top-level terms. assert not parents From 72a31ceb7d37a4030a208b3d2fcc62e36ee3207f Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Tue, 16 Aug 2016 14:34:57 -0400 Subject: [PATCH 15/25] DOC: Docstring cleanups. --- zipline/pipeline/graph.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index e18c3602..9ff09d45 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -136,7 +136,12 @@ class ExecutionPlan(TermGraph): terms : dict A dict mapping names to final output terms. all_dates : pd.DatetimeIndex - The dates fo + An index of all known trading days for which ``terms`` will be + computed. + start_date : pd.Timestamp + The first date for which output is requested for ``terms``. + end_date : pd.Timestamp + The last date for which output is requested for ``terms``. Attributes ---------- @@ -282,9 +287,6 @@ class ExecutionPlan(TermGraph): """ A dict mapping `term` -> `# of extra rows to load/compute of `term`. - This is always the maximum number of extra **input** rows required by - any Filter/Factor for which `term` is an input. - Notes ---- This value depends on the other terms in the graph that require `term` From 20e48cf8266bd8650d68473782608729d92ca678 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Tue, 16 Aug 2016 19:40:20 -0400 Subject: [PATCH 16/25] ENH: Add non-windowed downsampling. --- tests/pipeline/test_downsampling.py | 83 +++++++++++++++++++++++------ zipline/pipeline/mixins.py | 30 ++++++++--- 2 files changed, 90 insertions(+), 23 deletions(-) diff --git a/tests/pipeline/test_downsampling.py b/tests/pipeline/test_downsampling.py index 47f35eee..e57beed6 100644 --- a/tests/pipeline/test_downsampling.py +++ b/tests/pipeline/test_downsampling.py @@ -12,11 +12,13 @@ from zipline.pipeline import ( ) from zipline.pipeline.data.testing import TestingDataSet from zipline.pipeline.factors import SimpleMovingAverage +from zipline.pipeline.filters.smoothing import All from zipline.testing import ZiplineTestCase, parameter_space from zipline.testing.fixtures import ( WithTradingSessions, WithSeededRandomPipelineEngine, ) +from zipline.utils.numpy_utils import int64_dtype class NDaysAgoFactor(CustomFactor): @@ -552,12 +554,9 @@ class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine, # Extend into the first few days of 2015 to test year/quarter boundaries. END_DATE = pd.Timestamp('2015-01-06', tz='UTC') - def test_downsample_windowed_factor(self): + ASSET_FINDER_EQUITY_SIDS = tuple(range(10)) - f = SimpleMovingAverage( - inputs=[TestingDataSet.float_col], - window_length=5, - ) + def check_downsampled_term(self, term): # June 2014 # Mo Tu We Th Fr Sa Su @@ -574,34 +573,34 @@ class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine, start_date, end_date = compute_dates[[0, -1]] pipe = Pipeline({ - 'year': f.downsample(frequency='Y'), - 'quarter': f.downsample(frequency='Q'), - 'month': f.downsample(frequency='M'), - 'week': f.downsample(frequency='W'), + 'year': term.downsample(frequency='Y'), + 'quarter': term.downsample(frequency='Q'), + 'month': term.downsample(frequency='M'), + 'week': term.downsample(frequency='W'), }) - # Raw values for f, computed each day from 2014 to the end of the + # Raw values for term, computed each day from 2014 to the end of the # target period. - raw_f_results = self.run_pipeline( - Pipeline({'f': f}), + raw_term_results = self.run_pipeline( + Pipeline({'term': term}), start_date=pd.Timestamp('2014-01-02', tz='UTC'), end_date=pd.Timestamp('2015-01-06', tz='UTC'), - )['f'].unstack() + )['term'].unstack() expected_results = { - 'year': (raw_f_results + 'year': (raw_term_results .groupby(pd.TimeGrouper('AS')) .first() .reindex(compute_dates, method='ffill')), - 'quarter': (raw_f_results + 'quarter': (raw_term_results .groupby(pd.TimeGrouper('QS')) .first() .reindex(compute_dates, method='ffill')), - 'month': (raw_f_results + 'month': (raw_term_results .groupby(pd.TimeGrouper('MS')) .first() .reindex(compute_dates, method='ffill')), - 'week': (raw_f_results + 'week': (raw_term_results .groupby(pd.TimeGrouper('W', label='left')) .first() .reindex(compute_dates, method='ffill')), @@ -613,3 +612,53 @@ class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine, result = results[frequency].unstack() expected = expected_results[frequency] assert_frame_equal(result, expected) + + def test_downsample_windowed_factor(self): + self.check_downsampled_term( + SimpleMovingAverage( + inputs=[TestingDataSet.float_col], + window_length=5, + ) + ) + + def test_downsample_non_windowed_factor(self): + sma = SimpleMovingAverage( + inputs=[TestingDataSet.float_col], + window_length=5, + ) + + self.check_downsampled_term(((sma + sma) / 2).rank()) + + def test_downsample_windowed_filter(self): + sma = SimpleMovingAverage( + inputs=[TestingDataSet.float_col], + window_length=5, + ) + self.check_downsampled_term(All(inputs=[sma.top(4)], window_length=5)) + + def test_downsample_nonwindowed_filter(self): + sma = SimpleMovingAverage( + inputs=[TestingDataSet.float_col], + window_length=5, + ) + self.check_downsampled_term(sma > 5) + + def test_downsample_windowed_classifier(self): + + class IntSumClassifier(CustomClassifier): + inputs = [TestingDataSet.float_col] + window_length = 8 + dtype = int64_dtype + missing_value = -1 + + def compute(self, today, assets, out, floats): + out[:] = floats.sum(axis=0).astype(int) % 4 + + self.check_downsampled_term(IntSumClassifier()) + + def test_downsample_nonwindowed_classifier(self): + sma = SimpleMovingAverage( + inputs=[TestingDataSet.float_col], + window_length=5, + ) + self.check_downsampled_term(sma.quantiles(5)) diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index 73b6d111..9fa0ad6a 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -388,7 +388,7 @@ class DownsampledMixin(StandardOutputs): return min_extra_rows + (current_start_pos - new_start_pos) - def _compute(self, windows, dates, assets, mask): + def _compute(self, inputs, dates, assets, mask): """ Compute by delegating to self._wrapped_term._compute on sample dates. @@ -400,6 +400,27 @@ class DownsampledMixin(StandardOutputs): real_compute = self._wrapped_term._compute + if self.windowed: + # If we're windowed, inputs are stateful AdjustedArrays. We don't + # need to do any preparation before forwarding to real_compute, but + # we need to call `next` on them if we want to skip an iteration. + def prepare_inputs(): + return inputs + + def skip_this_input(): + for w in inputs: + next(w) + else: + # If we're not windowed, inputs are just ndarrays. We need to + # slice off one row when forwarding to real_compute, but we don't + # need to do anything to skip an input. + def prepare_inputs(): + # i is the loop iteration variable below. + return [a[[i]] for a in inputs] + + def skip_this_input(): + pass + results = [] samples = iter(to_sample) next_sample = next(samples) @@ -407,7 +428,7 @@ class DownsampledMixin(StandardOutputs): if next_sample == compute_date: results.append( real_compute( - windows, + prepare_inputs(), dates[i:i + 1], assets, mask[i:i + 1], @@ -420,13 +441,10 @@ class DownsampledMixin(StandardOutputs): # compares False with any other datetime. next_sample = pd_NaT else: + skip_this_input() # Copy results from previous sample period. results.append(results[-1]) - # Force adjusted arrays forward one tick. - for w in windows: - next(w) - # We should have exhausted our sample dates. try: next_sample = next(samples) From 7fc0dd6d97025602950d23b3870dbeaa326a2955 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Tue, 16 Aug 2016 19:51:54 -0400 Subject: [PATCH 17/25] MAINT: Use `numpy_utils.as_column` in more places. --- zipline/assets/assets.py | 3 ++- zipline/pipeline/engine.py | 8 ++++++-- zipline/pipeline/loaders/frame.py | 3 ++- zipline/testing/core.py | 10 ++++++---- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/zipline/assets/assets.py b/zipline/assets/assets.py index f8620ddc..8ca34629 100644 --- a/zipline/assets/assets.py +++ b/zipline/assets/assets.py @@ -59,6 +59,7 @@ from .asset_db_schema import ( ) from zipline.utils.control_flow import invert from zipline.utils.memoize import lazyval +from zipline.utils.numpy_utils import as_column from zipline.utils.sqlite_utils import group_into_chunks log = Logger('assets.py') @@ -1096,7 +1097,7 @@ class AssetFinder(object): self._asset_lifetimes = self._compute_asset_lifetimes() lifetimes = self._asset_lifetimes - raw_dates = dates.asi8[:, None] + raw_dates = as_column(dates.asi8) if include_start_date: mask = lifetimes.start <= raw_dates else: diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index dcda762a..2fc54c5a 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -18,7 +18,11 @@ from toolz.curried.operator import getitem from zipline.lib.adjusted_array import ensure_adjusted_array, ensure_ndarray from zipline.errors import NoFurtherDataError -from zipline.utils.numpy_utils import repeat_first_axis, repeat_last_axis +from zipline.utils.numpy_utils import ( + as_column, + repeat_first_axis, + repeat_last_axis, +) from zipline.utils.pandas_utils import explode from .term import AssetExists, InputDates, LoadableTerm @@ -181,7 +185,7 @@ class SimplePipelineEngine(object): assets, initial_workspace={ self._root_mask_term: root_mask_values, - self._root_mask_dates_term: dates.values[:, None], + self._root_mask_dates_term: as_column(dates.values) }, ) diff --git a/zipline/pipeline/loaders/frame.py b/zipline/pipeline/loaders/frame.py index 77d4bd91..ba7d9383 100644 --- a/zipline/pipeline/loaders/frame.py +++ b/zipline/pipeline/loaders/frame.py @@ -15,6 +15,7 @@ from pandas import ( ) from zipline.lib.adjusted_array import AdjustedArray from zipline.lib.adjustment import make_adjustment_from_labels +from zipline.utils.numpy_utils import as_column from zipline.utils.pandas_utils import sort_values from .base import PipelineLoader @@ -169,7 +170,7 @@ class DataFrameLoader(PipelineLoader): # Pull out requested columns/rows from our baseline data. data=self.baseline[ix_(date_indexer, assets_indexer)], # Mask out requested columns/rows that didnt match. - mask=(good_assets & good_dates[:, None]) & mask, + mask=(good_assets & as_column(good_dates)) & mask, adjustments=self.format_adjustments(dates, assets), missing_value=column.missing_value, ), diff --git a/zipline/testing/core.py b/zipline/testing/core.py index 9701fc30..65a6caa0 100644 --- a/zipline/testing/core.py +++ b/zipline/testing/core.py @@ -47,9 +47,11 @@ from zipline.pipeline.engine import SimplePipelineEngine from zipline.pipeline.factors import CustomFactor from zipline.pipeline.loaders.testing import make_seeded_random_loader from zipline.utils import security_list -from zipline.utils.input_validation import expect_dimensions -from zipline.utils.sentinel import sentinel from zipline.utils.calendars import get_calendar +from zipline.utils.input_validation import expect_dimensions +from zipline.utils.numpy_utils import as_column +from zipline.utils.sentinel import sentinel + import numpy as np from numpy import float64 @@ -308,11 +310,11 @@ def make_trade_data_for_asset_info(dates, price_sid_deltas = np.arange(len(sids), dtype=float64) * price_step_by_sid price_date_deltas = (np.arange(len(dates), dtype=float64) * price_step_by_date) - prices = (price_sid_deltas + price_date_deltas[:, None]) + price_start + prices = (price_sid_deltas + as_column(price_date_deltas)) + price_start volume_sid_deltas = np.arange(len(sids)) * volume_step_by_sid volume_date_deltas = np.arange(len(dates)) * volume_step_by_date - volumes = (volume_sid_deltas + volume_date_deltas[:, None]) + volume_start + volumes = volume_sid_deltas + as_column(volume_date_deltas) + volume_start for j, sid in enumerate(sids): start_date, end_date = asset_info.loc[sid, ['start_date', 'end_date']] From 19963f5b022accf63b21f6e8cfe51000ca956744 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 17 Aug 2016 14:30:09 -0400 Subject: [PATCH 18/25] MAINT: Clean up downsampling boilerplate. Consolidate docs and mixin applications into one place. --- tests/pipeline/test_downsampling.py | 37 ++++++-- tests/utils/test_preprocess.py | 6 +- zipline/pipeline/classifiers/classifier.py | 17 +--- zipline/pipeline/downsample_helpers.py | 61 ++++++++++++ zipline/pipeline/factors/factor.py | 16 +--- zipline/pipeline/filters/filter.py | 16 +--- zipline/pipeline/mixins.py | 102 +++++++++++---------- zipline/pipeline/term.py | 15 +-- zipline/utils/input_validation.py | 9 +- zipline/utils/sharedoc.py | 90 ++++++++++++++++++ 10 files changed, 263 insertions(+), 106 deletions(-) create mode 100644 zipline/pipeline/downsample_helpers.py create mode 100644 zipline/utils/sharedoc.py diff --git a/tests/pipeline/test_downsampling.py b/tests/pipeline/test_downsampling.py index e57beed6..357c62f0 100644 --- a/tests/pipeline/test_downsampling.py +++ b/tests/pipeline/test_downsampling.py @@ -85,7 +85,9 @@ class ComputeExtraRowsTestcase(WithTradingSessions, ZiplineTestCase): __fail_fast=True ) def test_yearly(self, base_terms, calendar_name): - downsampled_terms = tuple(t.downsample('Y') for t in base_terms) + downsampled_terms = tuple( + t.downsample('year_start') for t in base_terms + ) all_terms = base_terms + downsampled_terms all_sessions = self.trading_sessions[calendar_name] @@ -188,7 +190,9 @@ class ComputeExtraRowsTestcase(WithTradingSessions, ZiplineTestCase): __fail_fast=True ) def test_quarterly(self, calendar_name, base_terms): - downsampled_terms = tuple(t.downsample('Q') for t in base_terms) + downsampled_terms = tuple( + t.downsample('quarter_start') for t in base_terms + ) all_terms = base_terms + downsampled_terms # This region intersects with Q4 2013, Q1 2014, and Q2 2014. @@ -293,7 +297,9 @@ class ComputeExtraRowsTestcase(WithTradingSessions, ZiplineTestCase): __fail_fast=True ) def test_monthly(self, calendar_name, base_terms): - downsampled_terms = tuple(t.downsample('M') for t in base_terms) + downsampled_terms = tuple( + t.downsample('month_start') for t in base_terms + ) all_terms = base_terms + downsampled_terms # This region intersects with Dec 2013, Jan 2014, and Feb 2014. @@ -398,7 +404,9 @@ class ComputeExtraRowsTestcase(WithTradingSessions, ZiplineTestCase): __fail_fast=True ) def test_weekly(self, calendar_name, base_terms): - downsampled_terms = tuple(t.downsample('W') for t in base_terms) + downsampled_terms = tuple( + t.downsample('week_start') for t in base_terms + ) all_terms = base_terms + downsampled_terms # December 2013 @@ -573,10 +581,10 @@ class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine, start_date, end_date = compute_dates[[0, -1]] pipe = Pipeline({ - 'year': term.downsample(frequency='Y'), - 'quarter': term.downsample(frequency='Q'), - 'month': term.downsample(frequency='M'), - 'week': term.downsample(frequency='W'), + 'year': term.downsample(frequency='year_start'), + 'quarter': term.downsample(frequency='quarter_start'), + 'month': term.downsample(frequency='month_start'), + 'week': term.downsample(frequency='week_start'), }) # Raw values for term, computed each day from 2014 to the end of the @@ -662,3 +670,16 @@ class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine, window_length=5, ) self.check_downsampled_term(sma.quantiles(5)) + + def test_errors_on_bad_downsample_frequency(self): + + f = NDaysAgoFactor(window_length=3) + with self.assertRaises(ValueError) as e: + f.downsample('bad') + + expected = ( + "zipline.pipeline.term.downsample() expected a value in " + "('month_start', 'quarter_start', 'week_start', 'year_start') " + "for argument 'frequency', but got 'bad' instead." + ) + self.assertEqual(str(e.exception), expected) diff --git a/tests/utils/test_preprocess.py b/tests/utils/test_preprocess.py index 343966ab..b9e0162a 100644 --- a/tests/utils/test_preprocess.py +++ b/tests/utils/test_preprocess.py @@ -262,7 +262,11 @@ class PreprocessTestCase(TestCase): expected_message = ( "{qualname}() expected a value in {set_!r}" " for argument 'a', but got 'c' instead." - ).format(set_=set_, qualname=qualname(f)) + ).format( + # We special-case set to show a tuple instead of the set repr. + set_=tuple(set_), + qualname=qualname(f), + ) self.assertEqual(e.exception.args[0], expected_message) def test_expect_dtypes(self): diff --git a/zipline/pipeline/classifiers/classifier.py b/zipline/pipeline/classifiers/classifier.py index 12b3df6b..73bf5a31 100644 --- a/zipline/pipeline/classifiers/classifier.py +++ b/zipline/pipeline/classifiers/classifier.py @@ -14,6 +14,7 @@ from zipline.pipeline.sentinels import NotSpecified from zipline.pipeline.term import ComputableTerm from zipline.utils.compat import unicode from zipline.utils.input_validation import expect_types +from zipline.utils.memoize import classlazyval from zipline.utils.numpy_utils import ( categorical_dtype, int64_dtype, @@ -302,9 +303,9 @@ class Classifier(RestrictedDTypeMixin, ComputableTerm): raise AssertionError("Expected a LabelArray, got %s." % type(data)) return data.as_categorical() - @property + @classlazyval def _downsampled_type(self): - return DownsampledClassifier + return DownsampledMixin.make_downsampled_type(Classifier) class Everything(Classifier): @@ -391,18 +392,6 @@ class Latest(LatestMixin, CustomClassifier): pass -class DownsampledClassifier(DownsampledMixin, Classifier): - """ - A Classifier that defers to another Classifier at lower-than-daily - frequency. - - Parameters - ---------- - term : zipline.Classifier - freq : {'Y', 'Q', 'M', 'W'} - """ - - class InvalidClassifierComparison(TypeError): def __init__(self, classifier, compval): super(InvalidClassifierComparison, self).__init__( diff --git a/zipline/pipeline/downsample_helpers.py b/zipline/pipeline/downsample_helpers.py new file mode 100644 index 00000000..d514fd4f --- /dev/null +++ b/zipline/pipeline/downsample_helpers.py @@ -0,0 +1,61 @@ +""" +Helpers for downsampling code. +""" +from operator import attrgetter + +from zipline.utils.input_validation import expect_element +from zipline.utils.numpy_utils import changed_locations +from zipline.utils.sharedoc import ( + templated_docstring, + PIPELINE_DOWNSAMPLING_FREQUENCY_DOC, +) + +_dt_to_period = { + 'year_start': attrgetter('year'), + 'quarter_start': attrgetter('quarter'), + 'month_start': attrgetter('month'), + 'week_start': attrgetter('week'), +} + +SUPPORTED_DOWNSAMPLE_FREQUENCIES = frozenset(_dt_to_period) + + +expect_downsample_frequency = expect_element( + frequency=SUPPORTED_DOWNSAMPLE_FREQUENCIES, +) + + +@expect_downsample_frequency +@templated_docstring(frequency=PIPELINE_DOWNSAMPLING_FREQUENCY_DOC) +def select_sampling_indices(dates, frequency): + """ + Choose entries from ``dates`` to use for downsampling at ``frequency``. + + Parameters + ---------- + dates : pd.DatetimeIndex + Dates from which to select sample choices. + {frequency} + + Returns + ------- + indices : np.array[int64] + An array condtaining indices of dates on which samples should be taken. + + The resulting index will always include 0 as a sample index, and it + will include the first date of each subsequent year/quarter/month/week, + as determined by ``frequency``. + + Notes + ----- + This function assumes that ``dates`` does not have large gaps. + + In particular, it assumes that the maximum distance between any two entries + in ``dates`` is never greater than a year, which we rely on because we use + ``np.diff(dates.)`` to find dates where the sampling + period has changed. + """ + return changed_locations( + _dt_to_period[frequency](dates), + include_first=True + ) diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index c78c074c..436ae981 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -44,6 +44,7 @@ from zipline.pipeline.term import ComputableTerm, Term from zipline.utils.functional import with_doc, with_name from zipline.utils.input_validation import expect_types from zipline.utils.math_utils import nanmean, nanstd +from zipline.utils.memoize import classlazyval from zipline.utils.numpy_utils import ( bool_dtype, categorical_dtype, @@ -1072,9 +1073,9 @@ class Factor(RestrictedDTypeMixin, ComputableTerm): """ return (-inf < self) & (self < inf) - @property + @classlazyval def _downsampled_type(self): - return DownsampledFactor + return DownsampledMixin.make_downsampled_type(Factor) class NumExprFactor(NumericalExpression, Factor): @@ -1515,17 +1516,6 @@ class Latest(LatestMixin, CustomFactor): out[:] = data[-1] -class DownsampledFactor(DownsampledMixin, Factor): - """ - A Factor that defers to another Factor at lower-than-daily frequency. - - Parameters - ---------- - term : zipline.pipeline.Factor - freq : {'Y', 'Q', 'M', 'W'} - """ - - # Functions to be passed to GroupedRowTransform. These aren't defined inline # because the transformation function is part of the instance hash key. def demean(row): diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py index c600fc15..13380dcd 100644 --- a/zipline/pipeline/filters/filter.py +++ b/zipline/pipeline/filters/filter.py @@ -33,6 +33,7 @@ from zipline.pipeline.mixins import ( ) from zipline.pipeline.term import ComputableTerm, Term from zipline.utils.input_validation import expect_types +from zipline.utils.memoize import classlazyval from zipline.utils.numpy_utils import bool_dtype, repeat_first_axis @@ -202,9 +203,9 @@ class Filter(RestrictedDTypeMixin, ComputableTerm): ) return retval - @property + @classlazyval def _downsampled_type(self): - return DownsampledFilter + return DownsampledMixin.make_downsampled_type(Filter) class NumExprFilter(NumericalExpression, Filter): @@ -463,17 +464,6 @@ class Latest(LatestMixin, CustomFilter): pass -class DownsampledFilter(DownsampledMixin, Filter): - """ - A Filter that defers to another Filter at lower-than-daily frequency. - - Parameters - ---------- - term : zipline.pipeline.Filter - freq : {'Y', 'Q', 'M', 'W'} - """ - - class SingleAsset(Filter): """ A Filter that computes to True only for the given asset. diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index 9fa0ad6a..7302f7cc 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -1,7 +1,7 @@ """ Mixins classes for use with Filters and Factors. """ -from operator import attrgetter +from textwrap import dedent from numpy import ( array, @@ -17,10 +17,18 @@ from zipline.errors import ( NoFurtherDataError, ) from zipline.utils.control_flow import nullctx -from zipline.utils.input_validation import expect_element, expect_types -from zipline.utils.numpy_utils import changed_locations +from zipline.utils.input_validation import expect_types +from zipline.utils.sharedoc import ( + format_docstring, + PIPELINE_DOWNSAMPLING_FREQUENCY_DOC, +) from zipline.utils.pandas_utils import nearest_unequal_elements + +from .downsample_helpers import ( + select_sampling_indices, + expect_downsample_frequency, +) from .sentinels import NotSpecified from .term import Term @@ -232,49 +240,6 @@ class LatestMixin(SingleInputMixin): ) -_dt_to_period = { - 'Y': attrgetter('year'), - 'Q': attrgetter('quarter'), - 'M': attrgetter('month'), - 'W': attrgetter('week'), -} - - -def select_sampling_indices(dates, frequency): - """ - Choose entries from ``dates`` to use for downsampling at ``frequency``. - - Parameters - ---------- - dates : pd.DatetimeIndex - Dates from which to select sample choices. - frequency : {'Y', 'Q', 'M', 'W'} - Frequency at which samples are to be taken. - - Returns - ------- - indices : np.array[int64] - An array condtaining indices of dates on which samples should be taken. - - The resulting index will always include 0 as a sample index, and it - will include the first date of each subsequent year/quarter/month/week, - as determined by ``frequency``. - - Notes - ----- - This function assumes that ``dates`` does not have large gaps. - - In particular, it assumes that the maximum distance between any two entries - in ``dates`` is never greater than a year, which we rely on because we use - ``np.diff(dates.{quarter,month,week})`` to find dates where the sampling - period has changed. - """ - return changed_locations( - _dt_to_period[frequency](dates), - include_first=True - ) - - class DownsampledMixin(StandardOutputs): """ Mixin for behavior shared by Downsampled{Factor,Filter,Classifier} @@ -291,7 +256,7 @@ class DownsampledMixin(StandardOutputs): window_safe = False @expect_types(term=Term) - @expect_element(frequency=frozenset(_dt_to_period)) + @expect_downsample_frequency def __new__(cls, term, frequency): return super(DownsampledMixin, cls).__new__( cls, @@ -400,6 +365,17 @@ class DownsampledMixin(StandardOutputs): real_compute = self._wrapped_term._compute + # Inputs will contain different kinds of values depending on whether or + # not we're a windowed computation. + + # If we're windowed, then `inputs` is a list of iterators of ndarrays. + # If we're not windowed, then `inputs` is just a list of ndarrays. + # There are two things we care about doing with the input: + # 1. Preparing an input to be passed to our wrapped term. + # 2. Skipping an input if we're going to use an already-computed row. + # We perform these actions differently based on the expected kind of + # input, and we encapsulate these actions with closures so that we + # don't clutter the code below with lots of branching. if self.windowed: # If we're windowed, inputs are stateful AdjustedArrays. We don't # need to do any preparation before forwarding to real_compute, but @@ -412,8 +388,8 @@ class DownsampledMixin(StandardOutputs): next(w) else: # If we're not windowed, inputs are just ndarrays. We need to - # slice off one row when forwarding to real_compute, but we don't - # need to do anything to skip an input. + # slice out a single row when forwarding to real_compute, but we + # don't need to do anything to skip an input. def prepare_inputs(): # i is the loop iteration variable below. return [a[[i]] for a in inputs] @@ -455,3 +431,31 @@ class DownsampledMixin(StandardOutputs): # Concatenate stored results. return vstack(results) + + @classmethod + def make_downsampled_type(cls, other_base): + """ + Factory for making Downsampled{Filter,Factor,Classifier}. + """ + docstring = dedent( + """ + A {t} that defers to another {t} at lower-than-daily frequency. + + Parameters + ---------- + term : {t} + {{frequency}} + """ + ).format(t=other_base.__name__) + + doc = format_docstring( + owner_name=other_base.__name__, + docstring=docstring, + formatters={'frequency': PIPELINE_DOWNSAMPLING_FREQUENCY_DOC}, + ) + + return type( + 'Downsampled' + other_base.__name__, + (cls, other_base,), + {'__doc__': doc}, + ) diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 98941fe8..6cecd147 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -37,7 +37,12 @@ from zipline.utils.numpy_utils import ( datetime64ns_dtype, default_missing_value_for_dtype, ) +from zipline.utils.sharedoc import ( + templated_docstring, + PIPELINE_DOWNSAMPLING_FREQUENCY_DOC, +) +from .downsample_helpers import expect_downsample_frequency from .sentinels import NotSpecified @@ -594,19 +599,15 @@ class ComputableTerm(Term): "for instances of %s." % type(self).__name__ ) + @expect_downsample_frequency + @templated_docstring(frequency=PIPELINE_DOWNSAMPLING_FREQUENCY_DOC) def downsample(self, frequency): """ Make a term that computes from ``self`` at lower-than-daily frequency. Parameters ---------- - frequency : str, {'Y', 'Q', 'M', 'W'} - A string indicating the desired sampling rate. - 'Y' -> sample on the first trading day of each calendar year - 'Q' -> sample on the first trading day of - January, April, July, and October - 'M' -> sample on the first trading day of each month - 'W' -> sample on the first trading day of each week + {frequency} """ return self._downsampled_type(term=self, frequency=frequency) diff --git a/zipline/utils/input_validation.py b/zipline/utils/input_validation.py index 6c5298a4..c8347cd3 100644 --- a/zipline/utils/input_validation.py +++ b/zipline/utils/input_validation.py @@ -483,10 +483,17 @@ def expect_element(*_pos, **named): raise TypeError("expect_element() only takes keyword arguments.") def _expect_element(collection): + if isinstance(collection, (set, frozenset)): + # Special case the error message for set and frozen set to make it + # less verbose. + collection_for_error_message = tuple(sorted(collection)) + else: + collection_for_error_message = collection + template = ( "%(funcname)s() expected a value in {collection} " "for argument '%(argname)s', but got %(actual)s instead." - ).format(collection=collection) + ).format(collection=collection_for_error_message) return make_check( ValueError, template, diff --git a/zipline/utils/sharedoc.py b/zipline/utils/sharedoc.py new file mode 100644 index 00000000..8ba028c6 --- /dev/null +++ b/zipline/utils/sharedoc.py @@ -0,0 +1,90 @@ +""" +Shared docstrings for parameters that should be documented identically +across different functions. +""" +import re +from six import iteritems +from textwrap import dedent + +PIPELINE_DOWNSAMPLING_FREQUENCY_DOC = dedent( + """\ + frequency : {'year_start', 'quarter_start', 'month_start', 'week_start'} + A string indicating desired sampling dates: + + 'year_start' -> first trading day of each year + 'quarter_start' -> first trading day of January, April, July, October + 'month_start' -> first trading day of each month + 'week_start' -> first trading_day of each week + """ +) + + +def pad_lines(prefix, s): + """Apply a prefix to each line in s.""" + return '\n'.join(prefix + line for line in s.splitlines()) + + +def format_docstring(owner_name, docstring, formatters): + """ + Template ``formatters`` into ``docstring``. + + Parameters + ---------- + owner_name : str + The name of the function or class whose docstring is being templated. + Only used for error messages. + docstring : str + The docstring to template. + formatters : dict[str -> str] + Parameters for a a str.format() call on ``docstring``. + + Multi-line values in ``formatters`` will have leading whitespace padded + to match the leading whitespace of the substitution string. + """ + # Build a dict of parameters to a vanilla format() call by searching for + # each entry in **formatters and applying any leading whitespace to each + # line in the desired substitution. + format_params = {} + for target, doc_for_target in iteritems(formatters): + # Search for '{name}', with optional leading whitespace. + regex = re.compile('^(\s*)' + '({' + target + '})$', re.MULTILINE) + matches = regex.findall(docstring) + if not matches: + raise ValueError( + "Couldn't find template for parameter {!r} in docstring " + "for {}." + "\nParameter name must be alone on a line surrounded by " + "braces.".format(target, owner_name), + ) + elif len(matches) > 1: + raise ValueError( + "Couldn't found multiple templates for parameter {!r}" + "in docstring for {}." + "\nParameter should only appear once.".format( + target, owner_name + ) + ) + + (leading_whitespace, _) = matches[0] + format_params[target] = pad_lines(leading_whitespace, doc_for_target) + + return docstring.format(**format_params) + + +def templated_docstring(**docs): + """ + Decorator allowing the use of templated docstrings. + + Usage + ----- + >>> @templated_docstring(foo='bar') + ... def my_func(self, foo): + ... '''{foo}''' + ... + >>> my_func.__doc__ + 'bar' + """ + def decorator(f): + f.__doc__ = format_docstring(f.__name__, f.__doc__, docs) + return f + return decorator From 5e7e56176f13c480012a1bae975813f9babaa3a5 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 17 Aug 2016 14:30:41 -0400 Subject: [PATCH 19/25] BUG: Fix broken fixture resolution. Make sure that WithDefaultDateBounds is last in everyone's mro(). --- zipline/testing/fixtures.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index 88af7f1d..88d31f4d 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -235,6 +235,9 @@ class WithDefaultDateBounds(object): ZiplineTestCase mixin which makes it possible to synchronize date bounds across fixtures. + This fixture should always be the last fixture in bases of any fixture or + test case that uses it. + Attributes ---------- START_DATE : datetime @@ -420,7 +423,9 @@ class WithTradingCalendars(object): cls.trading_calendars[exchange] = get_calendar(cal_str) -class WithTradingEnvironment(WithAssetFinder, WithTradingCalendars): +class WithTradingEnvironment(WithAssetFinder, + WithTradingCalendars, + WithDefaultDateBounds): """ ZiplineTestCase mixin providing cls.env as a class-level fixture. From d82e38e73b4c76534ffcadd6a1ec02df9154e3e8 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 17 Aug 2016 14:58:35 -0400 Subject: [PATCH 20/25] BUG/TEST: Fix test assertion in py3. --- tests/pipeline/test_downsampling.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/pipeline/test_downsampling.py b/tests/pipeline/test_downsampling.py index 357c62f0..56344051 100644 --- a/tests/pipeline/test_downsampling.py +++ b/tests/pipeline/test_downsampling.py @@ -18,6 +18,7 @@ from zipline.testing.fixtures import ( WithTradingSessions, WithSeededRandomPipelineEngine, ) +from zipline.utils.input_validation import _qualified_name from zipline.utils.numpy_utils import int64_dtype @@ -678,8 +679,8 @@ class DownsampledPipelineTestCase(WithSeededRandomPipelineEngine, f.downsample('bad') expected = ( - "zipline.pipeline.term.downsample() expected a value in " + "{}() expected a value in " "('month_start', 'quarter_start', 'week_start', 'year_start') " "for argument 'frequency', but got 'bad' instead." - ) + ).format(_qualified_name(f.downsample)) self.assertEqual(str(e.exception), expected) From 659ba57d4b7505ae5d9d3224434d7a6d3763165f Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 17 Aug 2016 15:00:13 -0400 Subject: [PATCH 21/25] BUG: Force iterator for py3. --- tests/pipeline/test_filter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pipeline/test_filter.py b/tests/pipeline/test_filter.py index 62cc187d..c4562afe 100644 --- a/tests/pipeline/test_filter.py +++ b/tests/pipeline/test_filter.py @@ -168,7 +168,7 @@ class FilterTestCase(BasePipelineTestCase): quintiles = range(5) filter_names = ['pct_' + str(q) for q in quintiles] - iter_quintiles = zip(filter_names, quintiles) + iter_quintiles = list(zip(filter_names, quintiles)) terms = { name: self.f.percentile_between(q * 20.0, (q + 1) * 20.0) for name, q in iter_quintiles From 6006aedeef9e5782f564270ac9c26712704a1e76 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 17 Aug 2016 18:24:30 -0400 Subject: [PATCH 22/25] BLD: Ignore numpy_utils.py doctests on windows. They fail because of 32/64-bit issues. --- appveyor.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appveyor.yml b/appveyor.yml index c3ba6ea9..1e4fce96 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -101,7 +101,7 @@ install: - pip freeze | sort test_script: - - nosetests + - nosetests -e zipline.utils.numpy_utils - flake8 zipline tests branches: From 8cc8814b5faa2f610ca089e02c6a11f347ce1565 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 17 Aug 2016 19:29:57 -0400 Subject: [PATCH 23/25] BUG: Fix nondeterministic failure from sorting. --- tests/utils/test_preprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils/test_preprocess.py b/tests/utils/test_preprocess.py index b9e0162a..c3ad15df 100644 --- a/tests/utils/test_preprocess.py +++ b/tests/utils/test_preprocess.py @@ -264,7 +264,7 @@ class PreprocessTestCase(TestCase): " for argument 'a', but got 'c' instead." ).format( # We special-case set to show a tuple instead of the set repr. - set_=tuple(set_), + set_=tuple(sorted(set_)), qualname=qualname(f), ) self.assertEqual(e.exception.args[0], expected_message) From b1894e0d54c319bdf0f6290986cf0af060a183f6 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 17 Aug 2016 19:30:08 -0400 Subject: [PATCH 24/25] BUG: Supply a module for Downsampled terms. --- zipline/pipeline/mixins.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index 7302f7cc..a1825101 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -457,5 +457,6 @@ class DownsampledMixin(StandardOutputs): return type( 'Downsampled' + other_base.__name__, (cls, other_base,), - {'__doc__': doc}, + {'__doc__': doc, + '__module__': other_base.__module__}, ) From 5a5353beadb788a85dc99438886bc63e81c6832a Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Thu, 18 Aug 2016 11:07:17 -0400 Subject: [PATCH 25/25] BUG: Fix broken graph visualizations. --- tests/pipeline/test_pipeline.py | 58 +++++++++++++++++++++++++++++++++ zipline/pipeline/pipeline.py | 15 +++++++-- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 9eb686d1..30d9014c 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1,10 +1,14 @@ """ Tests for zipline.pipeline.Pipeline """ +import inspect from unittest import TestCase +from mock import patch + from zipline.pipeline import Factor, Filter, Pipeline from zipline.pipeline.data import USEquityPricing +from zipline.pipeline.graph import display_graph from zipline.utils.numpy_utils import float64_dtype @@ -137,3 +141,57 @@ class PipelineTestCase(TestCase): "expected a value of type bool or int for argument 'overwrite'", message, ) + + def test_show_graph(self): + f = SomeFactor() + p = Pipeline(columns={'f': SomeFactor()}) + + # The real display_graph call shells out to GraphViz, which isn't a + # requirement, so patch it out for testing. + + def mock_display_graph(g, format='svg', include_asset_exists=False): + return (g, format, include_asset_exists) + + self.assertEqual( + inspect.getargspec(display_graph), + inspect.getargspec(mock_display_graph), + msg="Mock signature doesn't match signature for display_graph." + ) + + patch_display_graph = patch( + 'zipline.pipeline.graph.display_graph', + mock_display_graph, + ) + + with patch_display_graph: + graph, format, include_asset_exists = p.show_graph() + self.assertIs(graph.outputs['f'], f) + # '' is a sentinel used for screen if it's not supplied. + self.assertEqual(sorted(graph.outputs.keys()), ['', 'f']) + self.assertEqual(format, 'svg') + self.assertEqual(include_asset_exists, False) + + with patch_display_graph: + graph, format, include_asset_exists = p.show_graph(format='png') + self.assertIs(graph.outputs['f'], f) + # '' is a sentinel used for screen if it's not supplied. + self.assertEqual(sorted(graph.outputs.keys()), ['', 'f']) + self.assertEqual(format, 'png') + self.assertEqual(include_asset_exists, False) + + with patch_display_graph: + graph, format, include_asset_exists = p.show_graph(format='jpeg') + self.assertIs(graph.outputs['f'], f) + # '' is a sentinel used for screen if it's not supplied. + self.assertEqual(sorted(graph.outputs.keys()), ['', 'f']) + self.assertEqual(format, 'jpeg') + self.assertEqual(include_asset_exists, False) + + expected = ( + r".*\.show_graph\(\) expected a value in " + r"\('svg', 'png', 'jpeg'\) for argument 'format', " + r"but got 'fizzbuzz' instead." + ) + + with self.assertRaisesRegexp(ValueError, expected): + p.show_graph(format='fizzbuzz') diff --git a/zipline/pipeline/pipeline.py b/zipline/pipeline/pipeline.py index 631b99d3..78ee5ed5 100644 --- a/zipline/pipeline/pipeline.py +++ b/zipline/pipeline/pipeline.py @@ -1,6 +1,10 @@ from zipline.errors import UnsupportedPipelineOutput -from zipline.utils.input_validation import expect_types, optional +from zipline.utils.input_validation import ( + expect_element, + expect_types, + optional, +) from .graph import ExecutionPlan, TermGraph from .filters import Filter @@ -189,7 +193,9 @@ class Pipeline(object): default_screen : zipline.pipeline.term.Term Term to use as a screen if self.screen is None. """ - return TermGraph(self._prepare_graph_terms()) + return TermGraph( + self._prepare_graph_terms(screen_name, default_screen) + ) def _prepare_graph_terms(self, screen_name, default_screen): """Helper for to_graph and to_execution_plan.""" @@ -200,6 +206,7 @@ class Pipeline(object): columns[screen_name] = screen return columns + @expect_element(format=('svg', 'png', 'jpeg')) def show_graph(self, format='svg'): """ Render this Pipeline as a DAG. @@ -217,7 +224,9 @@ class Pipeline(object): elif format == 'jpeg': return g.jpeg else: - raise ValueError("Unknown graph format %r." % format) + # We should never get here because of the expect_element decorator + # above. + raise AssertionError("Unknown graph format %r." % format) @staticmethod def validate_column(column_name, term):