From e880fa3e3453719338a4115d3787a7f88f047dfa Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Thu, 10 Sep 2015 15:34:24 -0400 Subject: [PATCH 01/11] PERF: Batch load atomic terms by dataset Added CompositeTerm and now we dispatch more generally on atomic --- tests/pipeline/test_term.py | 57 +++----- zipline/pipeline/classifier.py | 4 +- zipline/pipeline/data/dataset.py | 6 +- zipline/pipeline/engine.py | 33 +++-- zipline/pipeline/expression.py | 4 +- zipline/pipeline/factors/factor.py | 4 +- zipline/pipeline/filters/filter.py | 4 +- zipline/pipeline/graph.py | 43 +++--- zipline/pipeline/term.py | 225 +++++++++++++++++------------ zipline/pipeline/visualize.py | 2 +- 10 files changed, 213 insertions(+), 169 deletions(-) diff --git a/tests/pipeline/test_term.py b/tests/pipeline/test_term.py index 9fc9fac8..c10bf16b 100644 --- a/tests/pipeline/test_term.py +++ b/tests/pipeline/test_term.py @@ -82,11 +82,16 @@ def to_dict(l): class DependencyResolutionTestCase(TestCase): - def setup(self): - pass + def check_dependency_order(self, ordered_terms): + seen = set() - def teardown(self): - pass + for term in ordered_terms: + if not term.atomic: + for input_ in term.inputs: + self.assertIn(input_, seen) + self.assertIn(term.mask, seen) + + seen.add(term) def test_single_factor(self): """ @@ -97,12 +102,12 @@ class DependencyResolutionTestCase(TestCase): resolution_order = list(graph.ordered()) self.assertEqual(len(resolution_order), 4) - self.assertIs(resolution_order[0], AssetExists()) - self.assertEqual( - set([resolution_order[1], resolution_order[2]]), - set([SomeDataSet.foo, SomeDataSet.bar]), - ) - self.assertEqual(resolution_order[-1], SomeFactor()) + self.check_dependency_order(resolution_order) + self.assertIn(AssetExists(), resolution_order) + self.assertIn(SomeDataSet.foo, resolution_order) + self.assertIn(SomeDataSet.bar, resolution_order) + self.assertIn(SomeFactor(), resolution_order) + self.assertEqual(graph.node[SomeDataSet.foo]['extra_rows'], 4) self.assertEqual(graph.node[SomeDataSet.bar]['extra_rows'], 4) @@ -121,18 +126,14 @@ class DependencyResolutionTestCase(TestCase): # SomeFactor, its inputs, and AssetExists() self.assertEqual(len(resolution_order), 4) - - self.assertIs(resolution_order[0], AssetExists()) + self.check_dependency_order(resolution_order) + self.assertIn(AssetExists(), resolution_order) self.assertEqual(graph.extra_rows[AssetExists()], 4) - self.assertEqual( - set([resolution_order[1], resolution_order[2]]), - set([bar, buzz]), - ) - self.assertEqual( - resolution_order[-1], - SomeFactor([bar, buzz], window_length=5), - ) + self.assertIn(bar, resolution_order) + self.assertIn(buzz, resolution_order) + self.assertIn(SomeFactor([bar, buzz], window_length=5), + resolution_order) self.assertEqual(graph.extra_rows[bar], 4) self.assertEqual(graph.extra_rows[buzz], 4) @@ -148,20 +149,8 @@ class DependencyResolutionTestCase(TestCase): # bar should only appear once. self.assertEqual(len(resolution_order), 6) - indices = { - term: resolution_order.index(term) - for term in resolution_order - } - - self.assertEqual(indices[AssetExists()], 0) - - # Verify that f1's dependencies will be computed before f1. - self.assertLess(indices[SomeDataSet.foo], indices[f1]) - self.assertLess(indices[SomeDataSet.bar], indices[f1]) - - # Verify that f2's dependencies will be computed before f2. - self.assertLess(indices[SomeDataSet.bar], indices[f2]) - self.assertLess(indices[SomeDataSet.buzz], indices[f2]) + self.assertEqual(len(set(resolution_order)), 6) + self.check_dependency_order(resolution_order) def test_disallow_recursive_lookback(self): diff --git a/zipline/pipeline/classifier.py b/zipline/pipeline/classifier.py index aa1263ad..16219d58 100644 --- a/zipline/pipeline/classifier.py +++ b/zipline/pipeline/classifier.py @@ -2,8 +2,8 @@ classifier.py """ -from zipline.pipeline.term import Term +from zipline.pipeline.term import CompositeTerm -class Classifier(Term): +class Classifier(CompositeTerm): pass diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index 916c352d..211d03b0 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -6,7 +6,7 @@ from six import ( with_metaclass, ) -from zipline.pipeline.term import Term +from zipline.pipeline.term import AtomicTerm from zipline.pipeline.factors import Latest @@ -25,7 +25,7 @@ class Column(object): return BoundColumn(dtype=self.dtype, dataset=dataset, name=name) -class BoundColumn(Term): +class BoundColumn(AtomicTerm): """ A Column of data that's been concretely bound to a particular dataset. """ @@ -33,8 +33,6 @@ class BoundColumn(Term): def __new__(cls, dtype, dataset, name): return super(BoundColumn, cls).__new__( cls, - inputs=(), - window_length=0, domain=dataset.domain, dtype=dtype, dataset=dataset, diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index d11746f0..d1d421d3 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -240,7 +240,15 @@ class SimplePipelineEngine(object): offset = graph.extra_rows[mask] - graph.extra_rows[term] return workspace[mask][offset:], dates[offset:] - def _inputs_for_term(self, term, workspace, graph): + def _mask_and_dates_for_atomic_terms(self, terms, workspace, graph, dates): + max_extra_rows = max(graph.extra_rows[term] for term in terms) + + mask = self._root_mask_term + offset = graph.extra_rows[mask] - max_extra_rows + return workspace[mask][offset:], dates[offset:] + + @staticmethod + def _inputs_for_term(term, workspace, graph): """ Compute inputs for the given term. @@ -273,6 +281,12 @@ class SimplePipelineEngine(object): out.append(input_data) return out + @staticmethod + def _atomic_dataset_terms(graph, match): + for term in graph.atomic_terms: + if term.dataset == match.dataset: + yield term + def compute_chunk(self, graph, dates, assets, initial_workspace): """ Compute the Pipeline terms in the graph for the requested start and end @@ -310,15 +324,11 @@ class SimplePipelineEngine(object): if term in workspace: continue - # Asset labels are always the same, but date labels vary by how - # many extra rows are needed. - mask, mask_dates = self._mask_and_dates_for_term( - term, workspace, graph, dates - ) if term.atomic: - # FUTURE OPTIMIZATION: Scan the resolution order for terms in - # the same dataset and load them here as well. - to_load = [term] + to_load = list(self._atomic_dataset_terms(graph, term)) + mask, mask_dates = self._mask_and_dates_for_atomic_terms( + to_load, workspace, graph, dates, + ) loaded = loader.load_adjusted_array( to_load, mask_dates, assets, mask, ) @@ -326,6 +336,11 @@ class SimplePipelineEngine(object): for loaded_term, adj_array in zip_longest(to_load, loaded): workspace[loaded_term] = adj_array else: + # Asset labels are always the same, but date labels vary by how + # many extra rows are needed. + mask, mask_dates = self._mask_and_dates_for_term( + term, workspace, graph, dates + ) workspace[term] = term._compute( self._inputs_for_term(term, workspace, graph), mask_dates, diff --git a/zipline/pipeline/expression.py b/zipline/pipeline/expression.py index 8ec5200e..b4d100b0 100644 --- a/zipline/pipeline/expression.py +++ b/zipline/pipeline/expression.py @@ -12,7 +12,7 @@ from numpy import ( find_common_type, ) -from zipline.pipeline.term import Term, NotSpecified +from zipline.pipeline.term import Term, NotSpecified, CompositeTerm _VARIABLE_NAME_RE = re.compile("^(x_)([0-9]+)$") @@ -154,7 +154,7 @@ def is_comparison(op): return op in COMPARISONS -class NumericalExpression(Term): +class NumericalExpression(CompositeTerm): """ Term binding to a numexpr expression. diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index bc05fcb2..5855777d 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -21,7 +21,7 @@ from zipline.pipeline.term import ( NotSpecified, RequiredWindowLengthMixin, SingleInputMixin, - Term, + CompositeTerm, ) from zipline.pipeline.expression import ( BadBinaryOperator, @@ -184,7 +184,7 @@ def function_application(func): return mathfunc -class Factor(Term): +class Factor(CompositeTerm): """ Pipeline API expression producing numerically-valued outputs. """ diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py index 43575750..9869e910 100644 --- a/zipline/pipeline/filters/filter.py +++ b/zipline/pipeline/filters/filter.py @@ -15,7 +15,7 @@ from zipline.errors import ( ) from zipline.pipeline.term import ( SingleInputMixin, - Term, + CompositeTerm, ) from zipline.pipeline.expression import ( BadBinaryOperator, @@ -83,7 +83,7 @@ def binary_operator(op): return binary_operator -class Filter(Term): +class Filter(CompositeTerm): """ Pipeline API expression producing boolean-valued outputs. """ diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 428b5424..3a91a37c 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -104,11 +104,12 @@ class TermGraph(DiGraph): """ out = {} for term in self: - extra_input_rows = term.extra_input_rows - for input_ in term.inputs: - out[term, input_] = self.extra_rows[input_] - extra_input_rows - mask = term.mask - if term.mask is not None: + if not term.atomic: + extra_input_rows = term.extra_input_rows + for input_ in term.inputs: + out[term, input_] = (self.extra_rows[input_] + - extra_input_rows) + mask = term.mask out[term, mask] = self.extra_rows[mask] - extra_input_rows return out @@ -168,6 +169,10 @@ class TermGraph(DiGraph): """ return iter(self._ordered) + @lazyval + def atomic_terms(self): + return tuple(term for term in self if term.atomic) + def _add_to_graph(self, term, parents, extra_rows): """ Add `term` and all its inputs to the graph. @@ -187,21 +192,23 @@ class TermGraph(DiGraph): # Make sure we're going to compute at least `extra_rows` of `term`. self._ensure_extra_rows(term, extra_rows) - # Number of extra rows we need to compute for this term's dependencies. - dependency_extra_rows = extra_rows + term.extra_input_rows + if not term.atomic: + # Number of extra rows we need to compute for this term's + # dependencies. + dependency_extra_rows = extra_rows + term.extra_input_rows - # Recursively add dependencies. - for dependency in term.inputs: - self._add_to_graph( - dependency, - parents, - extra_rows=dependency_extra_rows, - ) - self.add_edge(dependency, term) + # Recursively add dependencies. + for dependency in term.inputs: + self._add_to_graph( + dependency, + parents, + extra_rows=dependency_extra_rows, + ) + self.add_edge(dependency, term) - # Add term's mask, which is really just a specially-enumerated input. - mask = term.mask - if mask is not None: + # Add term's mask, which is really just a specially-enumerated + # input. + mask = term.mask self._add_to_graph(mask, parents, extra_rows=dependency_extra_rows) self.add_edge(mask, term) diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 56855253..1dc3974a 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -43,18 +43,12 @@ class Term(object): Base class for terms in a Pipeline API compute graph. """ # These are NotSpecified because a subclass is required to provide them. - inputs = NotSpecified - window_length = NotSpecified dtype = NotSpecified - mask = NotSpecified domain = NotSpecified _term_cache = WeakValueDictionary() def __new__(cls, - inputs=NotSpecified, - mask=NotSpecified, - window_length=NotSpecified, domain=NotSpecified, dtype=NotSpecified, *args, @@ -72,23 +66,6 @@ class Term(object): # Class-level attributes can be used to provide defaults for Term # subclasses. - if inputs is NotSpecified: - inputs = cls.inputs - # Having inputs = NotSpecified is an error, but we handle it later - # in self._validate rather than here. - if inputs is not NotSpecified: - # Allow users to specify lists as class-level defaults, but - # normalize to a tuple so that inputs is hashable. - inputs = tuple(inputs) - - if mask is NotSpecified: - mask = cls.mask - if mask is NotSpecified: - mask = AssetExists() - - if window_length is NotSpecified: - window_length = cls.window_length - if domain is NotSpecified: domain = cls.domain @@ -96,9 +73,6 @@ class Term(object): dtype = cls.dtype identity = cls.static_identity( - inputs=inputs, - mask=mask, - window_length=window_length, domain=domain, dtype=dtype, *args, **kwargs @@ -109,9 +83,6 @@ class Term(object): except KeyError: new_instance = cls._term_cache[identity] = \ super(Term, cls).__new__(cls)._init( - inputs=inputs, - mask=mask, - window_length=window_length, domain=domain, dtype=dtype, *args, **kwargs @@ -134,10 +105,7 @@ class Term(object): """ pass - def _init(self, inputs, mask, window_length, domain, dtype): - self.inputs = inputs - self.mask = mask - self.window_length = window_length + def _init(self, domain, dtype): self.domain = domain self.dtype = dtype @@ -145,7 +113,7 @@ class Term(object): return self @classmethod - def static_identity(cls, inputs, mask, window_length, domain, dtype): + def static_identity(cls, domain, dtype): """ Return the identity of the Term that would be constructed from the given arguments. @@ -157,80 +125,25 @@ class Term(object): This is a classmethod so that it can be called from Term.__new__ to determine whether to produce a new instance. """ - return (cls, inputs, mask, window_length, domain, dtype) + return (cls, domain, dtype) def _validate(self): """ Assert that this term is well-formed. This should be called exactly once, at the end of Term._init(). """ - if self.inputs is NotSpecified: - raise TermInputsNotSpecified(termname=type(self).__name__) - if self.window_length is NotSpecified: - raise WindowLengthNotSpecified(termname=type(self).__name__) if self.dtype is NotSpecified: raise DTypeNotSpecified(termname=type(self).__name__) - if self.mask is NotSpecified and not self.atomic: - # This isn't user error, this is a bug in our code. - raise AssertionError("{term} has no mask".format(term=self)) - if self.window_length: - for child in self.inputs: - if not child.atomic: - raise InputTermNotAtomic(parent=self, child=child) - - @lazyval + @property def atomic(self): """ Whether or not this term has dependencies. If term.atomic is truthy, it should have dataset and dtype attributes. """ - return len(self.inputs) == 0 - - @lazyval - def windowed(self): - """ - Whether or not this term represents a trailing window computation. - - If term.windowed is truthy, its compute_from_windows method will be - called with instances of AdjustedArray as inputs. - - If term.windowed is falsey, its compute_from_baseline will be called - with instances of np.ndarray as inputs. - """ - return ( - self.window_length is not NotSpecified - and self.window_length > 0 - ) - - @lazyval - def extra_input_rows(self): - """ - The number of extra rows needed for each of our inputs to compute this - term. - """ - return max(0, self.window_length - 1) - - def _compute(self, inputs, dates, assets, mask): - """ - Subclasses should implement this to perform actual computation. - - This is `_compute` rather than just `compute` because `compute` is - reserved for user-supplied functions in CustomFactor. - """ raise NotImplementedError() - def __repr__(self): - return ( - "{type}({inputs}, window_length={window_length})" - ).format( - type=type(self).__name__, - inputs=self.inputs, - window_length=self.window_length, - mask=self.mask, - ) - # TODO: Move mixins to a separate file? class SingleInputMixin(object): @@ -307,7 +220,128 @@ class CustomTermMixin(object): return out -class AssetExists(Term): +class AtomicTerm(Term): + + @property + def atomic(self): + return True + + @property + def dataset(self): + raise NotImplementedError() + + +class CompositeTerm(Term): + inputs = NotSpecified + window_length = NotSpecified + mask = NotSpecified + + def __new__(cls, inputs=NotSpecified, window_length=NotSpecified, + mask=NotSpecified, *args, **kwargs): + + if inputs is NotSpecified: + inputs = cls.inputs + # Having inputs = NotSpecified is an error, but we handle it later + # in self._validate rather than here. + if inputs is not NotSpecified: + # Allow users to specify lists as class-level defaults, but + # normalize to a tuple so that inputs is hashable. + inputs = tuple(inputs) + + if mask is NotSpecified: + mask = cls.mask + if mask is NotSpecified: + mask = AssetExists() + + if window_length is NotSpecified: + window_length = cls.window_length + + return super(CompositeTerm, cls).__new__(cls, inputs=inputs, mask=mask, + window_length=window_length, + *args, **kwargs) + + def _init(self, inputs, window_length, mask, *args, **kwargs): + self.inputs = inputs + self.window_length = window_length + self.mask = mask + return super(CompositeTerm, self)._init(*args, **kwargs) + + @classmethod + def static_identity(cls, inputs, window_length, mask, *args, **kwargs): + return ( + super(CompositeTerm, cls).static_identity(*args, **kwargs), + inputs, + window_length, + mask, + ) + + def _validate(self): + """ + Assert that this term is well-formed. This should be called exactly + once, at the end of Term._init(). + """ + if self.inputs is NotSpecified: + raise TermInputsNotSpecified(termname=type(self).__name__) + if self.window_length is NotSpecified: + raise WindowLengthNotSpecified(termname=type(self).__name__) + if self.mask is NotSpecified: + # This isn't user error, this is a bug in our code. + raise AssertionError("{term} has no mask".format(term=self)) + + if self.window_length: + for child in self.inputs: + if not child.atomic: + raise InputTermNotAtomic(parent=self, child=child) + + return super(CompositeTerm, self)._validate() + + @property + def atomic(self): + return False + + def _compute(self, inputs, dates, assets, mask): + """ + Subclasses should implement this to perform actual computation. + This is `_compute` rather than just `compute` because `compute` is + reserved for user-supplied functions in CustomFactor. + """ + raise NotImplementedError() + + @lazyval + def windowed(self): + """ + Whether or not this term represents a trailing window computation. + + If term.windowed is truthy, its compute_from_windows method will be + called with instances of AdjustedArray as inputs. + + If term.windowed is falsey, its compute_from_baseline will be called + with instances of np.ndarray as inputs. + """ + return ( + self.window_length is not NotSpecified + and self.window_length > 0 + ) + + @lazyval + def extra_input_rows(self): + """ + The number of extra rows needed for each of our inputs to compute this + term. + """ + return max(0, self.window_length - 1) + + def __repr__(self): + return ( + "{type}({inputs}, window_length={window_length})" + ).format( + type=type(self).__name__, + inputs=self.inputs, + window_length=self.window_length, + ) + + +class AssetExists(AtomicTerm): """ Pseudo-filter describing whether or not an asset existed on a given day. This is the default mask for all terms that haven't been passed a mask @@ -321,10 +355,8 @@ class AssetExists(Term): -------- zipline.assets.AssetFinder.lifetimes """ - inputs = () dtype = bool_ - window_length = 0 - mask = None + dataset = None def _compute(self, *args, **kwargs): # TODO: Consider moving the bulk of the logic from @@ -332,3 +364,6 @@ class AssetExists(Term): raise NotImplementedError( "Direct computation of AssetExists is not supported!" ) + + def __repr__(self): + return "AssetExists()" diff --git a/zipline/pipeline/visualize.py b/zipline/pipeline/visualize.py index 03297a27..44ffe530 100644 --- a/zipline/pipeline/visualize.py +++ b/zipline/pipeline/visualize.py @@ -92,7 +92,7 @@ def _render(g, out, format_, include_asset_exists=False): graph_attrs = {'rankdir': 'TB', 'splines': 'ortho'} cluster_attrs = {'style': 'filled', 'color': 'lightgoldenrod1'} - in_nodes = list(node for node in g if node.atomic) + in_nodes = g.atomic_terms out_nodes = list(g.outputs.values()) f = BytesIO() From 83bd1310d9cfbdcde6c0b7a623714a825af59104 Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Fri, 11 Sep 2015 10:19:21 -0400 Subject: [PATCH 02/11] PERF: Using pipeline_loader_dispatch to group by loader instead of dataset --- tests/pipeline/base.py | 2 +- tests/pipeline/test_engine.py | 25 ++++++++++++++++--------- tests/pipeline/test_pipeline_algo.py | 12 ++++++------ zipline/algorithm.py | 8 ++++---- zipline/pipeline/data/dataset.py | 5 +++++ zipline/pipeline/engine.py | 27 +++++++++++++++++++-------- 6 files changed, 51 insertions(+), 28 deletions(-) diff --git a/tests/pipeline/base.py b/tests/pipeline/base.py index 06ddae44..17556087 100644 --- a/tests/pipeline/base.py +++ b/tests/pipeline/base.py @@ -93,7 +93,7 @@ class BasePipelineTestCase(TestCase): Mapping from termname -> computed result. """ engine = SimplePipelineEngine( - ExplodingObject(), + lambda column: ExplodingObject(), self.__calendar, self.__finder, ) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index db89e26a..776881b2 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -115,7 +115,8 @@ class ConstantInputTestCase(TestCase): def test_bad_dates(self): loader = self.loader - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine(lambda column: loader, + self.dates, self.asset_finder) p = Pipeline() @@ -129,7 +130,8 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = array(self.assets) - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine(lambda column: loader, + self.dates, self.asset_finder) num_dates = 5 dates = self.dates[10:10 + num_dates] @@ -152,7 +154,8 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = self.assets - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine(lambda column: loader, + self.dates, self.asset_finder) result_shape = (num_dates, num_assets) = (5, len(assets)) dates = self.dates[10:10 + num_dates] @@ -185,7 +188,8 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = self.assets - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine(lambda column: loader, + self.dates, self.asset_finder) shape = num_dates, num_assets = (5, len(assets)) dates = self.dates[10:10 + num_dates] @@ -228,7 +232,8 @@ class ConstantInputTestCase(TestCase): def test_numeric_factor(self): constants = self.constants loader = self.loader - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine(lambda column: loader, + self.dates, self.asset_finder) num_dates = 5 dates = self.dates[10:10 + num_dates] high, low = USEquityPricing.high, USEquityPricing.low @@ -355,7 +360,8 @@ class FrameInputTestCase(TestCase): high_loader = DataFrameLoader(high, high_base, adjustments) loader = MultiColumnLoader({low: low_loader, high: high_loader}) - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine(lambda column: loader, + self.dates, self.asset_finder) for window_length in range(1, 4): low_mavg = SimpleMovingAverage( @@ -465,7 +471,7 @@ class SyntheticBcolzTestCase(TestCase): def test_SMA(self): engine = SimplePipelineEngine( - self.pipeline_loader, + lambda column: self.pipeline_loader, self.env.trading_days, self.finder, ) @@ -517,7 +523,7 @@ class SyntheticBcolzTestCase(TestCase): # or zero, but verifying we correctly handle those corner cases is # valuable. engine = SimplePipelineEngine( - self.pipeline_loader, + lambda column: self.pipeline_loader, self.env.trading_days, self.finder, ) @@ -584,7 +590,8 @@ class MultiColumnLoaderTestCase(TestCase): dates=self.dates, assets=self.assets, ) - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine(lambda column: loader, + self.dates, self.asset_finder) sumdiff = RollingSumDifference() diff --git a/tests/pipeline/test_pipeline_algo.py b/tests/pipeline/test_pipeline_algo.py index 40fef5f5..09ac4aed 100644 --- a/tests/pipeline/test_pipeline_algo.py +++ b/tests/pipeline/test_pipeline_algo.py @@ -182,7 +182,7 @@ class ClosesOnly(TestCase): initialize=initialize, handle_data=late_attach, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + pipeline_loader_dispatch=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -199,7 +199,7 @@ class ClosesOnly(TestCase): before_trading_start=late_attach, handle_data=barf, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + pipeline_loader_dispatch=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -228,7 +228,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + pipeline_loader_dispatch=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -256,7 +256,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + pipeline_loader_dispatch=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -294,7 +294,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + pipeline_loader_dispatch=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -524,7 +524,7 @@ class PipelineAlgorithmTestCase(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + pipeline_loader_dispatch=lambda column: self.pipeline_loader, start=self.dates[max(window_lengths)], end=self.dates[-1], env=self.env, diff --git a/zipline/algorithm.py b/zipline/algorithm.py index c6e967a3..83685601 100644 --- a/zipline/algorithm.py +++ b/zipline/algorithm.py @@ -232,7 +232,7 @@ class TradingAlgorithm(object): self.asset_finder = self.trading_environment.asset_finder # Initialize Pipeline API data. - self.init_engine(kwargs.pop('pipeline_loader', None)) + self.init_engine(kwargs.pop('pipeline_loader_dispatch', None)) self._pipelines = {} # Create an always-expired cache so that we compute the first time data # is requested. @@ -323,15 +323,15 @@ class TradingAlgorithm(object): self.initialize_args = args self.initialize_kwargs = kwargs - def init_engine(self, loader): + def init_engine(self, loader_dispatch): """ Construct and store a PipelineEngine from loader. If loader is None, constructs a NoOpPipelineEngine. """ - if loader is not None: + if loader_dispatch is not None: self.engine = SimplePipelineEngine( - loader, + loader_dispatch, self.trading_environment.trading_days, self.asset_finder, ) diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index 211d03b0..dd64fd50 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -1,6 +1,7 @@ """ dataset.py """ +from functools import total_ordering from six import ( iteritems, with_metaclass, @@ -84,6 +85,7 @@ class BoundColumn(AtomicTerm): return self.qualname +@total_ordering class DataSetMeta(type): """ Metaclass for DataSets @@ -107,6 +109,9 @@ class DataSetMeta(type): def columns(self): return self._columns + def __lt__(self, other): + return id(self) < id(other) + class DataSet(with_metaclass(DataSetMeta)): domain = None diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index d1d421d3..ba5485bc 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -92,15 +92,15 @@ class SimplePipelineEngine(object): which assets are in the top-level universe at any point in time. """ __slots__ = [ - '_loader', + '_loader_dispatch', '_calendar', '_finder', '_root_mask_term', '__weakref__', ] - def __init__(self, loader, calendar, asset_finder): - self._loader = loader + def __init__(self, loader_dispatch, calendar, asset_finder): + self._loader_dispatch = loader_dispatch self._calendar = calendar self._finder = asset_finder self._root_mask_term = AssetExists() @@ -281,12 +281,21 @@ class SimplePipelineEngine(object): out.append(input_data) return out - @staticmethod - def _atomic_dataset_terms(graph, match): + def _atomic_terms_for_loader(self, graph, loader): + loader_dispatch = self.loader_dispatch for term in graph.atomic_terms: - if term.dataset == match.dataset: + if loader_dispatch(term) == loader: yield term + def loader_dispatch(self, term): + if term is AssetExists(): + return None + + loader = self._loader_dispatch(term) + if loader is None: + raise ValueError("Couldn't find loader for %s" % term) + return loader + def compute_chunk(self, graph, dates, assets, initial_workspace): """ Compute the Pipeline terms in the graph for the requested start and end @@ -311,7 +320,7 @@ class SimplePipelineEngine(object): Dictionary mapping requested results to outputs. """ self._validate_compute_chunk_params(dates, assets, initial_workspace) - loader = self._loader + loader_dispatch = self.loader_dispatch # Copy the supplied initial workspace so we don't mutate it in place. workspace = initial_workspace.copy() @@ -325,7 +334,9 @@ class SimplePipelineEngine(object): continue if term.atomic: - to_load = list(self._atomic_dataset_terms(graph, term)) + loader = loader_dispatch(term) + to_load = sorted(self._atomic_terms_for_loader(graph, loader), + key=lambda t: t.dataset) mask, mask_dates = self._mask_and_dates_for_atomic_terms( to_load, workspace, graph, dates, ) From 143f780036e15f5759f2ea9eaf241075bd2e1548 Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Fri, 18 Sep 2015 11:48:15 -0400 Subject: [PATCH 03/11] MAINT: Fixed base class signature --- zipline/pipeline/loaders/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/pipeline/loaders/base.py b/zipline/pipeline/loaders/base.py index 5e8b84b7..0f5dcedb 100644 --- a/zipline/pipeline/loaders/base.py +++ b/zipline/pipeline/loaders/base.py @@ -17,5 +17,5 @@ class PipelineLoader(with_metaclass(ABCMeta)): TODO: DOCUMENT THIS MORE! """ @abstractmethod - def load_adjusted_array(self, columns, mask): + def load_adjusted_array(self, columns, dates, assets, mask): pass From ba0542a641a9b28b032787aa15503977b9550040 Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Mon, 5 Oct 2015 12:34:43 -0400 Subject: [PATCH 04/11] MAINT: Removed MultiColumnLoader since we can use pipeline_loader_dispatch instead --- tests/pipeline/test_engine.py | 129 ++++++++++++-------------- zipline/pipeline/loaders/synthetic.py | 43 +++------ 2 files changed, 72 insertions(+), 100 deletions(-) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index 776881b2..f2258358 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -26,7 +26,6 @@ from testfixtures import TempDirectory from zipline.pipeline.loaders.synthetic import ( ConstantLoader, - MultiColumnLoader, NullAdjustmentReader, SyntheticDailyBarWriter, ) @@ -97,7 +96,7 @@ class ConstantInputTestCase(TestCase): USEquityPricing.high: 4, } self.assets = [1, 2, 3] - self.dates = date_range('2014-01-01', '2014-02-01', freq='D', tz='UTC') + self.dates = date_range('2014-01', '2014-03', freq='D', tz='UTC') self.loader = ConstantLoader( constants=self.constants, dates=self.dates, @@ -276,6 +275,57 @@ class ConstantInputTestCase(TestCase): DataFrame(expected_avg, index=dates, columns=self.assets), ) + def test_rolling_and_nonrolling(self): + open_ = USEquityPricing.open + close = USEquityPricing.close + volume = USEquityPricing.volume + + # Test for thirty days up to the last day that we think all + # the assets existed. + dates_to_test = self.dates[-30:] + + constants = {open_: 1, close: 2, volume: 3} + loader = ConstantLoader( + constants=constants, + dates=self.dates, + assets=self.assets, + ) + engine = SimplePipelineEngine(lambda column: loader, + self.dates, self.asset_finder) + + sumdiff = RollingSumDifference() + + result = engine.run_pipeline( + Pipeline( + columns={ + 'sumdiff': sumdiff, + 'open': open_.latest, + 'close': close.latest, + 'volume': volume.latest, + }, + ), + dates_to_test[0], + dates_to_test[-1] + ) + self.assertIsNotNone(result) + self.assertEqual( + {'sumdiff', 'open', 'close', 'volume'}, + set(result.columns) + ) + + result_index = self.assets * len(dates_to_test) + result_shape = (len(result_index),) + check_arrays( + result['sumdiff'], + Series(index=result_index, data=full(result_shape, -3)), + ) + + for name, const in [('open', 1), ('close', 2), ('volume', 3)]: + check_arrays( + result[name], + Series(index=result_index, data=full(result_shape, const)), + ) + class FrameInputTestCase(TestCase): @@ -358,10 +408,12 @@ class FrameInputTestCase(TestCase): high_base.iloc[:apply_idxs[2], 1] /= 5.0 high_loader = DataFrameLoader(high, high_base, adjustments) - loader = MultiColumnLoader({low: low_loader, high: high_loader}) - engine = SimplePipelineEngine(lambda column: loader, - self.dates, self.asset_finder) + engine = SimplePipelineEngine( + {low: low_loader, high: high_loader}.__getitem__, + self.dates, + self.asset_finder, + ) for window_length in range(1, 4): low_mavg = SimpleMovingAverage( @@ -558,70 +610,3 @@ class SyntheticBcolzTestCase(TestCase): result = results['drawdown'].unstack() assert_frame_equal(expected, result) - - -class MultiColumnLoaderTestCase(TestCase): - def setUp(self): - self.assets = [1, 2, 3] - self.dates = date_range('2014-01', '2014-03', freq='D', tz='UTC') - - asset_info = make_simple_asset_info( - self.assets, - start_date=self.dates[0], - end_date=self.dates[-1], - ) - env = TradingEnvironment() - env.write_data(equities_df=asset_info) - self.asset_finder = env.asset_finder - - def test_engine_with_multicolumn_loader(self): - open_ = USEquityPricing.open - close = USEquityPricing.close - volume = USEquityPricing.volume - - # Test for thirty days up to the second to last day that we think all - # the assets existed. If we test the last day of our calendar, no - # assets will be in our output, because their end dates are all - dates_to_test = self.dates[-32:-2] - - constants = {open_: 1, close: 2, volume: 3} - loader = ConstantLoader( - constants=constants, - dates=self.dates, - assets=self.assets, - ) - engine = SimplePipelineEngine(lambda column: loader, - self.dates, self.asset_finder) - - sumdiff = RollingSumDifference() - - result = engine.run_pipeline( - Pipeline( - columns={ - 'sumdiff': sumdiff, - 'open': open_.latest, - 'close': close.latest, - 'volume': volume.latest, - }, - ), - dates_to_test[0], - dates_to_test[-1] - ) - self.assertIsNotNone(result) - self.assertEqual( - {'sumdiff', 'open', 'close', 'volume'}, - set(result.columns) - ) - - result_index = self.assets * len(dates_to_test) - result_shape = (len(result_index),) - check_arrays( - result['sumdiff'], - Series(index=result_index, data=full(result_shape, -3)), - ) - - for name, const in [('open', 1), ('close', 2), ('volume', 3)]: - check_arrays( - result[name], - Series(index=result_index, data=full(result_shape, const)), - ) diff --git a/zipline/pipeline/loaders/synthetic.py b/zipline/pipeline/loaders/synthetic.py index 2aff90de..92c2a6e4 100644 --- a/zipline/pipeline/loaders/synthetic.py +++ b/zipline/pipeline/loaders/synthetic.py @@ -32,33 +32,7 @@ def nanos_to_seconds(nanos): return nanos / (1000 * 1000 * 1000) -class MultiColumnLoader(PipelineLoader): - """ - PipelineLoader that can delegate to sub-loaders. - - Parameters - ---------- - loaders : dict - Dictionary mapping columns -> loader - """ - def __init__(self, loaders): - self._loaders = loaders - - def load_adjusted_array(self, columns, dates, assets, mask): - """ - Load by delegating to sub-loaders. - """ - out = [] - for col in columns: - try: - loader = self._loaders[col] - except KeyError: - raise ValueError("Couldn't find loader for %s" % col) - out.extend(loader.load_adjusted_array([col], dates, assets, mask)) - return out - - -class ConstantLoader(MultiColumnLoader): +class ConstantLoader(PipelineLoader): """ Synthetic PipelineLoader that returns a constant value for each column. @@ -91,7 +65,20 @@ class ConstantLoader(MultiColumnLoader): adjustments=None, ) - super(ConstantLoader, self).__init__(loaders) + self._loaders = loaders + + def load_adjusted_array(self, columns, dates, assets, mask): + """ + Load by delegating to sub-loaders. + """ + out = [] + for col in columns: + try: + loader = self._loaders[col] + except KeyError: + raise ValueError("Couldn't find loader for %s" % col) + out.extend(loader.load_adjusted_array([col], dates, assets, mask)) + return out class SyntheticDailyBarWriter(BcolzDailyBarWriter): From 7bd6b69a89311baca1df3f142d5604f698cbeb6d Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Tue, 6 Oct 2015 17:53:39 -0400 Subject: [PATCH 05/11] MAINT: Group by loader and extra_rows so that the mask and dates are the same for all the columns the loader is loading at a time. --- zipline/pipeline/engine.py | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index ba5485bc..a8347776 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -236,17 +236,10 @@ class SimplePipelineEngine(object): """ Load mask and mask row labels for term. """ - mask = term.mask + mask = term.mask if not term.atomic else self._root_mask_term offset = graph.extra_rows[mask] - graph.extra_rows[term] return workspace[mask][offset:], dates[offset:] - def _mask_and_dates_for_atomic_terms(self, terms, workspace, graph, dates): - max_extra_rows = max(graph.extra_rows[term] for term in terms) - - mask = self._root_mask_term - offset = graph.extra_rows[mask] - max_extra_rows - return workspace[mask][offset:], dates[offset:] - @staticmethod def _inputs_for_term(term, workspace, graph): """ @@ -281,10 +274,14 @@ class SimplePipelineEngine(object): out.append(input_data) return out - def _atomic_terms_for_loader(self, graph, loader): + def _similar_atomic_terms(self, graph, atomic_term): loader_dispatch = self.loader_dispatch + loader = loader_dispatch(atomic_term) + extra_rows = graph.extra_rows[atomic_term] + for term in graph.atomic_terms: - if loader_dispatch(term) == loader: + if (loader_dispatch(term) == loader + and graph.extra_rows[term] == extra_rows): yield term def loader_dispatch(self, term): @@ -333,13 +330,18 @@ class SimplePipelineEngine(object): if term in workspace: continue + # Asset labels are always the same, but date labels vary by how + # many extra rows are needed. + mask, mask_dates = self._mask_and_dates_for_term( + term, workspace, graph, dates + ) + if term.atomic: - loader = loader_dispatch(term) - to_load = sorted(self._atomic_terms_for_loader(graph, loader), - key=lambda t: t.dataset) - mask, mask_dates = self._mask_and_dates_for_atomic_terms( - to_load, workspace, graph, dates, + to_load = sorted( + self._similar_atomic_terms(graph, term), + key=lambda t: t.dataset ) + loader = loader_dispatch(term) loaded = loader.load_adjusted_array( to_load, mask_dates, assets, mask, ) @@ -347,11 +349,6 @@ class SimplePipelineEngine(object): for loaded_term, adj_array in zip_longest(to_load, loaded): workspace[loaded_term] = adj_array else: - # Asset labels are always the same, but date labels vary by how - # many extra rows are needed. - mask, mask_dates = self._mask_and_dates_for_term( - term, workspace, graph, dates - ) workspace[term] = term._compute( self._inputs_for_term(term, workspace, graph), mask_dates, From 940831e1cfc43a509e9e27eb164336eb8e6c2ee0 Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Wed, 7 Oct 2015 14:18:41 -0400 Subject: [PATCH 06/11] TST: Added test that columns are batched when they share the same loader and extra_rows --- tests/pipeline/test_engine.py | 138 +++++++++++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 1 deletion(-) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index f2258358..148c6a8b 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -2,6 +2,7 @@ Tests for SimplePipelineEngine """ from __future__ import division +from collections import OrderedDict from unittest import TestCase from itertools import product @@ -11,6 +12,8 @@ from numpy import ( nan, tile, zeros, + float32, + concatenate, ) from pandas import ( DataFrame, @@ -21,7 +24,9 @@ from pandas import ( Series, Timestamp, ) +from pandas.compat.chainmap import ChainMap from pandas.util.testing import assert_frame_equal +from six import iteritems, itervalues from testfixtures import TempDirectory from zipline.pipeline.loaders.synthetic import ( @@ -32,7 +37,7 @@ from zipline.pipeline.loaders.synthetic import ( from zipline.data.us_equity_pricing import BcolzDailyBarReader from zipline.finance.trading import TradingEnvironment from zipline.pipeline import Pipeline -from zipline.pipeline.data import USEquityPricing +from zipline.pipeline.data import USEquityPricing, DataSet, Column from zipline.pipeline.loaders.frame import DataFrameLoader, MULTIPLY from zipline.pipeline.loaders.equity_pricing_loader import ( USEquityPricingLoader, @@ -84,6 +89,49 @@ def assert_multi_index_is_product(testcase, index, *levels): testcase.assertEqual(set(index), set(product(*levels))) +class ColumnArgs(tuple): + """A tuple of Columns that defines equivalence based on the order of the + columns' DataSets, instead of the columns themselves. This is used when + comparing the columns passed to a loader's load_adjusted_array method, + since we want to assert that they are ordered by DataSet. + """ + def __new__(cls, *cols): + return super(ColumnArgs, cls).__new__(cls, cols) + + @classmethod + def sorted_by_ds(cls, *cols): + return cls(*sorted(cols, key=lambda col: col.dataset)) + + def by_ds(self): + return tuple(col.dataset for col in self) + + def __eq__(self, other): + return set(self) == set(other) and self.by_ds() == other.by_ds() + + def __hash__(self): + return hash(frozenset(self)) + + +class RecordingConstantLoader(ConstantLoader): + def __init__(self, *args, **kwargs): + super(RecordingConstantLoader, self).__init__(*args, **kwargs) + + self.load_calls = [] + + def load_adjusted_array(self, columns, dates, assets, mask): + self.load_calls.append(ColumnArgs(*columns)) + + return super(RecordingConstantLoader, self).load_adjusted_array( + columns, dates, assets, mask, + ) + + +class RollingSumSum(CustomFactor): + def compute(self, today, assets, out, *inputs): + assert len(self.inputs) == len(inputs) + out[:] = sum(inputs).sum(axis=0) + + class ConstantInputTestCase(TestCase): def setUp(self): @@ -326,6 +374,94 @@ class ConstantInputTestCase(TestCase): Series(index=result_index, data=full(result_shape, const)), ) + def test_loader_given_multiple_columns(self): + + class Loader1DataSet1(DataSet): + col1 = Column(float32) + col2 = Column(float32) + + class Loader1DataSet2(DataSet): + col1 = Column(float32) + col2 = Column(float32) + + class Loader2DataSet(DataSet): + col1 = Column(float32) + col2 = Column(float32) + + constants1 = {Loader1DataSet1.col1: 1, + Loader1DataSet1.col2: 2, + Loader1DataSet2.col1: 3, + Loader1DataSet2.col2: 4} + loader1 = RecordingConstantLoader(constants=constants1, + dates=self.dates, + assets=self.assets) + constants2 = {Loader2DataSet.col1: 5, + Loader2DataSet.col2: 6} + loader2 = RecordingConstantLoader(constants=constants2, + dates=self.dates, + assets=self.assets) + + engine = SimplePipelineEngine(lambda column: loader2 + if column.dataset == Loader2DataSet + else loader1, + self.dates, self.asset_finder) + + pipe_col1 = RollingSumSum(inputs=[Loader1DataSet1.col1, + Loader1DataSet2.col1, + Loader2DataSet.col1], + window_length=2) + + pipe_col2 = RollingSumSum(inputs=[Loader1DataSet1.col2, + Loader1DataSet2.col2, + Loader2DataSet.col2], + window_length=3) + + pipe_col3 = RollingSumSum(inputs=[Loader2DataSet.col1], + window_length=3) + + columns = OrderedDict([ + ('pipe_col1', pipe_col1), + ('pipe_col2', pipe_col2), + ('pipe_col3', pipe_col3), + ]) + result = engine.run_pipeline( + Pipeline(columns=columns), + self.dates[2], # index is >= the largest window length - 1 + self.dates[-1] + ) + min_window = min(pip_col.window_length + for pip_col in itervalues(columns)) + col_to_val = ChainMap(constants1, constants2) + vals = {name: (sum(col_to_val[col] for col in pipe_col.inputs) + * pipe_col.window_length) + for name, pipe_col in iteritems(columns)} + + index = MultiIndex.from_product([self.dates[2:], self.assets]) + expected = DataFrame( + data={col: + concatenate(( + full((columns[col].window_length - min_window) + * index.levshape[1], + nan), + full((index.levshape[0] + - (columns[col].window_length - min_window)) + * index.levshape[1], + val))) + for col, val in iteritems(vals)}, + index=index, + columns=columns) + + assert_frame_equal(result, expected) + + self.assertEqual(set(loader1.load_calls), + {ColumnArgs.sorted_by_ds(Loader1DataSet1.col1, + Loader1DataSet2.col1), + ColumnArgs.sorted_by_ds(Loader1DataSet1.col2, + Loader1DataSet2.col2)}) + self.assertEqual(set(loader2.load_calls), + {ColumnArgs.sorted_by_ds(Loader2DataSet.col1, + Loader2DataSet.col2)}) + class FrameInputTestCase(TestCase): From 2dabda6b76caa7e2437863b5908c5bea87f8a7ec Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Fri, 9 Oct 2015 17:32:22 -0400 Subject: [PATCH 07/11] MAINT: Reworked Term atomicity --- tests/pipeline/test_term.py | 6 +- zipline/pipeline/data/dataset.py | 7 ++- zipline/pipeline/engine.py | 2 +- zipline/pipeline/graph.py | 41 +++++--------- zipline/pipeline/term.py | 97 ++++++++++++++++---------------- 5 files changed, 69 insertions(+), 84 deletions(-) diff --git a/tests/pipeline/test_term.py b/tests/pipeline/test_term.py index c10bf16b..ffa2df7b 100644 --- a/tests/pipeline/test_term.py +++ b/tests/pipeline/test_term.py @@ -86,10 +86,8 @@ class DependencyResolutionTestCase(TestCase): seen = set() for term in ordered_terms: - if not term.atomic: - for input_ in term.inputs: - self.assertIn(input_, seen) - self.assertIn(term.mask, seen) + for dep in term.dependencies: + self.assertIn(dep, seen) seen.add(term) diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index dd64fd50..fe8be41a 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -7,7 +7,7 @@ from six import ( with_metaclass, ) -from zipline.pipeline.term import AtomicTerm +from zipline.pipeline.term import Term, AssetExists from zipline.pipeline.factors import Latest @@ -26,10 +26,13 @@ class Column(object): return BoundColumn(dtype=self.dtype, dataset=dataset, name=name) -class BoundColumn(AtomicTerm): +class BoundColumn(Term): """ A Column of data that's been concretely bound to a particular dataset. """ + mask = AssetExists() + extra_input_rows = 0 + inputs = () def __new__(cls, dtype, dataset, name): return super(BoundColumn, cls).__new__( diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index a8347776..8abccb83 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -236,7 +236,7 @@ class SimplePipelineEngine(object): """ Load mask and mask row labels for term. """ - mask = term.mask if not term.atomic else self._root_mask_term + mask = term.mask offset = graph.extra_rows[mask] - graph.extra_rows[term] return workspace[mask][offset:], dates[offset:] diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 3a91a37c..af4a8326 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -102,16 +102,9 @@ class TermGraph(DiGraph): zipline.pipeline.engine.SimplePipelineEngine._inputs_for_term zipline.pipeline.engine.SimplePipelineEngine._mask_and_dates_for_term """ - out = {} - for term in self: - if not term.atomic: - extra_input_rows = term.extra_input_rows - for input_ in term.inputs: - out[term, input_] = (self.extra_rows[input_] - - extra_input_rows) - mask = term.mask - out[term, mask] = self.extra_rows[mask] - extra_input_rows - return out + return {(term, dep): self.extra_rows[dep] - term.extra_input_rows + for term in self + for dep in term.dependencies} @lazyval def extra_rows(self): @@ -192,25 +185,17 @@ class TermGraph(DiGraph): # Make sure we're going to compute at least `extra_rows` of `term`. self._ensure_extra_rows(term, extra_rows) - if not term.atomic: - # Number of extra rows we need to compute for this term's - # dependencies. - dependency_extra_rows = extra_rows + term.extra_input_rows + # Number of extra rows we need to compute for this term's dependencies. + dependency_extra_rows = extra_rows + term.extra_input_rows - # Recursively add dependencies. - for dependency in term.inputs: - self._add_to_graph( - dependency, - parents, - extra_rows=dependency_extra_rows, - ) - self.add_edge(dependency, term) - - # Add term's mask, which is really just a specially-enumerated - # input. - mask = term.mask - self._add_to_graph(mask, parents, extra_rows=dependency_extra_rows) - self.add_edge(mask, term) + # Recursively add dependencies. + for dependency in term.dependencies: + self._add_to_graph( + dependency, + parents, + extra_rows=dependency_extra_rows, + ) + self.add_edge(dependency, term) parents.remove(term) diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 1dc3974a..31301053 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -1,9 +1,11 @@ """ Base class for Filters, Factors and Classifiers """ +from abc import ABCMeta, abstractproperty from weakref import WeakValueDictionary from numpy import bool_, full, nan +from six import with_metaclass from zipline.errors import ( DTypeNotSpecified, @@ -38,7 +40,7 @@ class NotSpecified(object): return self -class Term(object): +class Term(with_metaclass(ABCMeta, object)): """ Base class for terms in a Pipeline API compute graph. """ @@ -135,15 +137,55 @@ class Term(object): if self.dtype is NotSpecified: raise DTypeNotSpecified(termname=type(self).__name__) - @property - def atomic(self): + @abstractproperty + def inputs(self): """ - Whether or not this term has dependencies. - - If term.atomic is truthy, it should have dataset and dtype attributes. + A tuple of other Terms that this Term requires for computation. """ raise NotImplementedError() + @abstractproperty + def mask(self): + """ + A 2D Filter representing asset/date pairs to include while + computing this Term. (True means include; False means exclude.) + """ + raise NotImplementedError() + + @lazyval + def dependencies(self): + return self.inputs + (self.mask,) + + @lazyval + def atomic(self): + return not any(dep for dep in self.dependencies + if dep is not AssetExists()) + + +class AssetExists(Term): + """ + Pseudo-filter describing whether or not an asset existed on a given day. + This is the default mask for all terms that haven't been passed a mask + explicitly. + + This is morally a Filter, in the sense that it produces a boolean value for + every asset on every date. We don't subclass Filter, however, because + `AssetExists` is computed directly by the PipelineEngine. + + See Also + -------- + zipline.assets.AssetFinder.lifetimes + """ + dtype = bool_ + dataset = None + extra_input_rows = 0 + inputs = () + dependencies = () + mask = None + + def __repr__(self): + return "AssetExists()" + # TODO: Move mixins to a separate file? class SingleInputMixin(object): @@ -220,17 +262,6 @@ class CustomTermMixin(object): return out -class AtomicTerm(Term): - - @property - def atomic(self): - return True - - @property - def dataset(self): - raise NotImplementedError() - - class CompositeTerm(Term): inputs = NotSpecified window_length = NotSpecified @@ -295,10 +326,6 @@ class CompositeTerm(Term): return super(CompositeTerm, self)._validate() - @property - def atomic(self): - return False - def _compute(self, inputs, dates, assets, mask): """ Subclasses should implement this to perform actual computation. @@ -339,31 +366,3 @@ class CompositeTerm(Term): inputs=self.inputs, window_length=self.window_length, ) - - -class AssetExists(AtomicTerm): - """ - Pseudo-filter describing whether or not an asset existed on a given day. - This is the default mask for all terms that haven't been passed a mask - explicitly. - - This is morally a Filter, in the sense that it produces a boolean value for - every asset on every date. We don't subclass Filter, however, because - `AssetExists` is computed directly by the PipelineEngine. - - See Also - -------- - zipline.assets.AssetFinder.lifetimes - """ - dtype = bool_ - dataset = None - - def _compute(self, *args, **kwargs): - # TODO: Consider moving the bulk of the logic from - # SimplePipelineEngine._compute_root_mask here. - raise NotImplementedError( - "Direct computation of AssetExists is not supported!" - ) - - def __repr__(self): - return "AssetExists()" From 99de89c8172b9cc068588525e367a1ae3bec045d Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Fri, 9 Oct 2015 18:17:11 -0400 Subject: [PATCH 08/11] PERF: Don't recalc similar atomic terms --- etc/requirements.txt | 2 +- zipline/pipeline/engine.py | 23 +++++++++++------------ 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/etc/requirements.txt b/etc/requirements.txt index 7125a63f..5c00af0a 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -47,7 +47,7 @@ bcolz==0.10.0 click==4.0.0 # FUNctional programming utilities -toolz==0.7.2 +toolz==0.7.4 # Asset writer and finder sqlalchemy==1.0.8 diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 8abccb83..64cc1306 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -13,12 +13,13 @@ from six import ( ) from six.moves import zip_longest from numpy import array - from pandas import ( DataFrame, date_range, MultiIndex, ) +from toolz import groupby, juxt +from toolz.curried.operator import getitem from zipline.lib.adjusted_array import ensure_ndarray from zipline.errors import NoFurtherDataError @@ -274,17 +275,10 @@ class SimplePipelineEngine(object): out.append(input_data) return out - def _similar_atomic_terms(self, graph, atomic_term): - loader_dispatch = self.loader_dispatch - loader = loader_dispatch(atomic_term) - extra_rows = graph.extra_rows[atomic_term] - - for term in graph.atomic_terms: - if (loader_dispatch(term) == loader - and graph.extra_rows[term] == extra_rows): - yield term - def loader_dispatch(self, term): + # AssetExists is one of the atomic terms in the graph, so we look up + # a loader here when grouping by loader, but since it's already in the + # workspace, we don't actually use that group. if term is AssetExists(): return None @@ -322,6 +316,11 @@ class SimplePipelineEngine(object): # Copy the supplied initial workspace so we don't mutate it in place. workspace = initial_workspace.copy() + # If atomic terms share the same loader and extra_rows, load them all + # together. + atomic_group_key = juxt(loader_dispatch, getitem(graph.extra_rows)) + atomic_groups = groupby(atomic_group_key, graph.atomic_terms) + for term in graph.ordered(): # `term` may have been supplied in `initial_workspace`, and in the # future we may pre-compute atomic terms coming from the same @@ -338,7 +337,7 @@ class SimplePipelineEngine(object): if term.atomic: to_load = sorted( - self._similar_atomic_terms(graph, term), + atomic_groups[atomic_group_key(term)], key=lambda t: t.dataset ) loader = loader_dispatch(term) From ee26a218554ff2df27fa129ac12e7b012b81c8e6 Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Fri, 9 Oct 2015 23:55:15 -0400 Subject: [PATCH 09/11] MAINT: Renamed loader_dispatch to get_loader Now it raises a KeyError instead of returning None, if loader not found. --- tests/pipeline/test_pipeline_algo.py | 12 ++++++------ zipline/algorithm.py | 10 +++++----- zipline/pipeline/engine.py | 24 +++++++++++------------- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/tests/pipeline/test_pipeline_algo.py b/tests/pipeline/test_pipeline_algo.py index 09ac4aed..f947d25b 100644 --- a/tests/pipeline/test_pipeline_algo.py +++ b/tests/pipeline/test_pipeline_algo.py @@ -182,7 +182,7 @@ class ClosesOnly(TestCase): initialize=initialize, handle_data=late_attach, data_frequency='daily', - pipeline_loader_dispatch=lambda column: self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -199,7 +199,7 @@ class ClosesOnly(TestCase): before_trading_start=late_attach, handle_data=barf, data_frequency='daily', - pipeline_loader_dispatch=lambda column: self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -228,7 +228,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader_dispatch=lambda column: self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -256,7 +256,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader_dispatch=lambda column: self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -294,7 +294,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader_dispatch=lambda column: self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -524,7 +524,7 @@ class PipelineAlgorithmTestCase(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader_dispatch=lambda column: self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.dates[max(window_lengths)], end=self.dates[-1], env=self.env, diff --git a/zipline/algorithm.py b/zipline/algorithm.py index 83685601..9c43d4df 100644 --- a/zipline/algorithm.py +++ b/zipline/algorithm.py @@ -232,7 +232,7 @@ class TradingAlgorithm(object): self.asset_finder = self.trading_environment.asset_finder # Initialize Pipeline API data. - self.init_engine(kwargs.pop('pipeline_loader_dispatch', None)) + self.init_engine(kwargs.pop('get_pipeline_loader', None)) self._pipelines = {} # Create an always-expired cache so that we compute the first time data # is requested. @@ -323,15 +323,15 @@ class TradingAlgorithm(object): self.initialize_args = args self.initialize_kwargs = kwargs - def init_engine(self, loader_dispatch): + def init_engine(self, get_loader): """ Construct and store a PipelineEngine from loader. - If loader is None, constructs a NoOpPipelineEngine. + If get_loader is None, constructs a NoOpPipelineEngine. """ - if loader_dispatch is not None: + if get_loader is not None: self.engine = SimplePipelineEngine( - loader_dispatch, + get_loader, self.trading_environment.trading_days, self.asset_finder, ) diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 64cc1306..2e4f2e0f 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -83,8 +83,9 @@ class SimplePipelineEngine(object): Parameters ---------- - loader : PipelineLoader - A loader to use to retrieve raw data for atomic terms. + get_loader : callable + A function that is given an atomic term and returns a PipelineLoader + to use to retrieve raw data for that term. calendar : DatetimeIndex Array of dates to consider as trading days when computing a range between a fixed start and end. @@ -93,15 +94,15 @@ class SimplePipelineEngine(object): which assets are in the top-level universe at any point in time. """ __slots__ = [ - '_loader_dispatch', + '_get_loader', '_calendar', '_finder', '_root_mask_term', '__weakref__', ] - def __init__(self, loader_dispatch, calendar, asset_finder): - self._loader_dispatch = loader_dispatch + def __init__(self, get_loader, calendar, asset_finder): + self._get_loader = get_loader self._calendar = calendar self._finder = asset_finder self._root_mask_term = AssetExists() @@ -275,17 +276,14 @@ class SimplePipelineEngine(object): out.append(input_data) return out - def loader_dispatch(self, term): + def get_loader(self, term): # AssetExists is one of the atomic terms in the graph, so we look up # a loader here when grouping by loader, but since it's already in the # workspace, we don't actually use that group. if term is AssetExists(): return None - loader = self._loader_dispatch(term) - if loader is None: - raise ValueError("Couldn't find loader for %s" % term) - return loader + return self._get_loader(term) def compute_chunk(self, graph, dates, assets, initial_workspace): """ @@ -311,14 +309,14 @@ class SimplePipelineEngine(object): Dictionary mapping requested results to outputs. """ self._validate_compute_chunk_params(dates, assets, initial_workspace) - loader_dispatch = self.loader_dispatch + get_loader = self.get_loader # Copy the supplied initial workspace so we don't mutate it in place. workspace = initial_workspace.copy() # If atomic terms share the same loader and extra_rows, load them all # together. - atomic_group_key = juxt(loader_dispatch, getitem(graph.extra_rows)) + atomic_group_key = juxt(get_loader, getitem(graph.extra_rows)) atomic_groups = groupby(atomic_group_key, graph.atomic_terms) for term in graph.ordered(): @@ -340,7 +338,7 @@ class SimplePipelineEngine(object): atomic_groups[atomic_group_key(term)], key=lambda t: t.dataset ) - loader = loader_dispatch(term) + loader = get_loader(term) loaded = loader.load_adjusted_array( to_load, mask_dates, assets, mask, ) From 7a638e4580f89b210dc0cbbb806c59fb30061dd2 Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Sat, 10 Oct 2015 00:07:05 -0400 Subject: [PATCH 10/11] STY: Moving args to new line --- tests/pipeline/test_engine.py | 39 +++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index 148c6a8b..eda1d237 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -162,8 +162,9 @@ class ConstantInputTestCase(TestCase): def test_bad_dates(self): loader = self.loader - engine = SimplePipelineEngine(lambda column: loader, - self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) p = Pipeline() @@ -177,8 +178,9 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = array(self.assets) - engine = SimplePipelineEngine(lambda column: loader, - self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) num_dates = 5 dates = self.dates[10:10 + num_dates] @@ -201,8 +203,9 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = self.assets - engine = SimplePipelineEngine(lambda column: loader, - self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) result_shape = (num_dates, num_assets) = (5, len(assets)) dates = self.dates[10:10 + num_dates] @@ -235,8 +238,9 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = self.assets - engine = SimplePipelineEngine(lambda column: loader, - self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) shape = num_dates, num_assets = (5, len(assets)) dates = self.dates[10:10 + num_dates] @@ -279,8 +283,9 @@ class ConstantInputTestCase(TestCase): def test_numeric_factor(self): constants = self.constants loader = self.loader - engine = SimplePipelineEngine(lambda column: loader, - self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) num_dates = 5 dates = self.dates[10:10 + num_dates] high, low = USEquityPricing.high, USEquityPricing.low @@ -338,8 +343,9 @@ class ConstantInputTestCase(TestCase): dates=self.dates, assets=self.assets, ) - engine = SimplePipelineEngine(lambda column: loader, - self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) sumdiff = RollingSumDifference() @@ -401,10 +407,11 @@ class ConstantInputTestCase(TestCase): dates=self.dates, assets=self.assets) - engine = SimplePipelineEngine(lambda column: loader2 - if column.dataset == Loader2DataSet - else loader1, - self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: + loader2 if column.dataset == Loader2DataSet else loader1, + self.dates, self.asset_finder, + ) pipe_col1 = RollingSumSum(inputs=[Loader1DataSet1.col1, Loader1DataSet2.col1, From 952da68610d1cd075273beedaace964e9d50fb60 Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Mon, 12 Oct 2015 09:11:36 -0400 Subject: [PATCH 11/11] DOC: Fixed docstring --- zipline/pipeline/factors/factor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index 5855777d..4c22d1ef 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -450,7 +450,7 @@ class CustomFactor(RequiredWindowLengthMixin, CustomTermMixin, Factor): class-level attribute named `inputs`. window_length : int, optional Number of rows of rows to pass for each input. If this - argument is passed to the CustomFactor constructor, we look for a + argument is not passed to the CustomFactor constructor, we look for a class-level attribute named `window_length`. Notes