Merge pull request #1484 from quantopian/refcount-pipelines

PERF: Release unneeded pipeline terms.
This commit is contained in:
Scott Sanderson
2016-09-14 20:24:10 -04:00
committed by GitHub
2 changed files with 55 additions and 0 deletions
+7
View File
@@ -344,6 +344,8 @@ class SimplePipelineEngine(object):
loader_group_key = juxt(get_loader, getitem(graph.extra_rows))
loader_groups = groupby(loader_group_key, graph.loadable_terms)
refcounts = graph.initial_refcounts(workspace)
for term in graph.ordered():
# `term` may have been supplied in `initial_workspace`, and in the
# future we may pre-compute loadable terms coming from the same
@@ -380,6 +382,11 @@ class SimplePipelineEngine(object):
else:
assert workspace[term].shape == (mask.shape[0], 1)
# Decref dependencies of ``term``, and clear any terms whose
# refcounts hit 0.
for garbage_term in graph.decref_dependencies(term, refcounts):
del workspace[garbage_term]
out = {}
graph_extra_rows = graph.extra_rows
for name, term in iteritems(graph.outputs):
+48
View File
@@ -119,6 +119,54 @@ class TermGraph(DiGraph):
def _repr_png_(self):
return self.png.data
def initial_refcounts(self, initial_terms):
"""
Calculate initial refcounts for execution of this graph.
Parameters
----------
initial_terms : iterable[Term]
An iterable of terms that were pre-computed before graph execution.
Each node starts with a refcount equal to its outdegree, and output
nodes get one extra reference to ensure that they're still in the graph
at the end of execution.
"""
refcounts = self.out_degree()
for t in self.outputs.values():
refcounts[t] += 1
for t in initial_terms:
self.decref_dependencies(t, refcounts)
return refcounts
def decref_dependencies(self, term, refcounts):
"""
Decrement in-edges for ``term`` after computation.
Parameters
----------
term : zipline.pipeline.Term
The term whose parents should be decref'ed.
refcounts : dict[Term -> int]
Dictionary of refcounts.
Return
------
garbage : set[Term]
Terms whose refcounts hit zero after decrefing.
"""
garbage = set()
# Edges are tuple of (from, to).
for parent, _ in self.in_edges([term]):
refcounts[parent] -= 1
# No one else depends on this term. Remove it from the
# workspace to conserve memory.
if refcounts[parent] == 0:
garbage.add(parent)
return garbage
class ExecutionPlan(TermGraph):
"""