diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 2fc54c5a..4bb6f6b0 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -344,6 +344,8 @@ class SimplePipelineEngine(object): loader_group_key = juxt(get_loader, getitem(graph.extra_rows)) loader_groups = groupby(loader_group_key, graph.loadable_terms) + refcounts = graph.initial_refcounts(workspace) + for term in graph.ordered(): # `term` may have been supplied in `initial_workspace`, and in the # future we may pre-compute loadable terms coming from the same @@ -380,6 +382,11 @@ class SimplePipelineEngine(object): else: assert workspace[term].shape == (mask.shape[0], 1) + # Decref dependencies of ``term``, and clear any terms whose + # refcounts hit 0. + for garbage_term in graph.decref_dependencies(term, refcounts): + del workspace[garbage_term] + out = {} graph_extra_rows = graph.extra_rows for name, term in iteritems(graph.outputs): diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 9ff09d45..667c3be9 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -119,6 +119,54 @@ class TermGraph(DiGraph): def _repr_png_(self): return self.png.data + def initial_refcounts(self, initial_terms): + """ + Calculate initial refcounts for execution of this graph. + + Parameters + ---------- + initial_terms : iterable[Term] + An iterable of terms that were pre-computed before graph execution. + + Each node starts with a refcount equal to its outdegree, and output + nodes get one extra reference to ensure that they're still in the graph + at the end of execution. + """ + refcounts = self.out_degree() + for t in self.outputs.values(): + refcounts[t] += 1 + + for t in initial_terms: + self.decref_dependencies(t, refcounts) + + return refcounts + + def decref_dependencies(self, term, refcounts): + """ + Decrement in-edges for ``term`` after computation. + + Parameters + ---------- + term : zipline.pipeline.Term + The term whose parents should be decref'ed. + refcounts : dict[Term -> int] + Dictionary of refcounts. + + Return + ------ + garbage : set[Term] + Terms whose refcounts hit zero after decrefing. + """ + garbage = set() + # Edges are tuple of (from, to). + for parent, _ in self.in_edges([term]): + refcounts[parent] -= 1 + # No one else depends on this term. Remove it from the + # workspace to conserve memory. + if refcounts[parent] == 0: + garbage.add(parent) + return garbage + class ExecutionPlan(TermGraph): """