From 6aa885dbebd21dd04389a155ce1c0f5a9c46d2e0 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Tue, 13 Sep 2016 23:28:25 -0400 Subject: [PATCH 1/3] PERF: Release unneeded pipeline terms. Refcount pipeline terms during execution and release terms once they're no longer needed. This dramatically reduces memory usage on large pipelines. --- zipline/pipeline/engine.py | 10 ++++++++++ zipline/pipeline/graph.py | 13 +++++++++++++ 2 files changed, 23 insertions(+) diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 2fc54c5a..93d296ed 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -344,6 +344,8 @@ class SimplePipelineEngine(object): loader_group_key = juxt(get_loader, getitem(graph.extra_rows)) loader_groups = groupby(loader_group_key, graph.loadable_terms) + refcounts = graph.initial_refcounts() + for term in graph.ordered(): # `term` may have been supplied in `initial_workspace`, and in the # future we may pre-compute loadable terms coming from the same @@ -380,6 +382,14 @@ class SimplePipelineEngine(object): else: assert workspace[term].shape == (mask.shape[0], 1) + # Decref any term we depended on. + for (parent, _) in graph.in_edges(term): + refcounts[parent] -= 1 + # No one else depends on this term. Remove it from the + # workspace to conserve memory. + if refcounts[parent] == 0: + del workspace[parent] + out = {} graph_extra_rows = graph.extra_rows for name, term in iteritems(graph.outputs): diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 9ff09d45..9078c7ea 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -119,6 +119,19 @@ class TermGraph(DiGraph): def _repr_png_(self): return self.png.data + def initial_refcounts(self): + """ + Calculate initial refcounts for execution of this graph. + + Each node starts with a refcount equal to its outdegree, and output + nodes get one extra reference to ensure that they're still in the graph + at the end of execution. 
+ """ + refcounts = self.out_degree() + for t in self.outputs.values(): + refcounts[t] += 1 + return refcounts + class ExecutionPlan(TermGraph): """ From a0e1b881aa07aa5c20419bc8e89e9c60428badf3 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 14 Sep 2016 11:16:40 -0400 Subject: [PATCH 2/3] MAINT: Move refcount management into TermGraph. --- zipline/pipeline/engine.py | 12 ++++-------- zipline/pipeline/graph.py | 32 +++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index 93d296ed..e17331a3 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -344,7 +344,7 @@ class SimplePipelineEngine(object): loader_group_key = juxt(get_loader, getitem(graph.extra_rows)) loader_groups = groupby(loader_group_key, graph.loadable_terms) - refcounts = graph.initial_refcounts() + refcounts = graph.initial_refcounts(workspace) for term in graph.ordered(): # `term` may have been supplied in `initial_workspace`, and in the @@ -382,13 +382,9 @@ class SimplePipelineEngine(object): else: assert workspace[term].shape == (mask.shape[0], 1) - # Decref any term we depended on. - for (parent, _) in graph.in_edges(term): - refcounts[parent] -= 1 - # No one else depends on this term. Remove it from the - # workspace to conserve memory. - if refcounts[parent] == 0: - del workspace[parent] + # Decref dependencies of ``term``, and clear any terms whose refcounts hit 0. 
+ for garbage_term in graph.decref_dependencies(term, refcounts): + del workspace[garbage_term] out = {} graph_extra_rows = graph.extra_rows diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 9078c7ea..78dffc14 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -119,7 +119,7 @@ class TermGraph(DiGraph): def _repr_png_(self): return self.png.data - def initial_refcounts(self): + def initial_refcounts(self, initial_workspace): """ Calculate initial refcounts for execution of this graph. @@ -130,8 +130,38 @@ class TermGraph(DiGraph): refcounts = self.out_degree() for t in self.outputs.values(): refcounts[t] += 1 + + for t in initial_workspace: + self.decref_dependencies(t, refcounts) + return refcounts + def decref_dependencies(self, term, refcounts): + """ + Decrement in-edges for ``term`` after computation. + + Parameters + ---------- + term : zipline.pipeline.Term + The term whose parents should be decref'ed. + refcounts : dict[Term -> int] + Dictionary of refcounts. + + Returns + ------- + garbage : set[Term] + Terms whose refcounts hit zero after decrefing. + """ + garbage = set() + # Edges are tuples of (from, to). + for parent, _ in self.in_edges([term]): + refcounts[parent] -= 1 + # No one else depends on this term. Report it as garbage so the + # caller can remove it from the workspace to conserve memory. + if refcounts[parent] == 0: + garbage.add(parent) + return garbage + class ExecutionPlan(TermGraph): """ From 7c58bf1e5b014d5bedf16de1abd605d90f726992 Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Wed, 14 Sep 2016 14:45:00 -0400 Subject: [PATCH 3/3] STY: Flake8 and parameter rename. 
--- zipline/pipeline/engine.py | 3 ++- zipline/pipeline/graph.py | 9 +++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index e17331a3..4bb6f6b0 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -382,7 +382,8 @@ class SimplePipelineEngine(object): else: assert workspace[term].shape == (mask.shape[0], 1) - # Decref dependencies of ``term``, and clear any terms whose refcounts hit 0. + # Decref dependencies of ``term``, and clear any terms whose + # refcounts hit 0. for garbage_term in graph.decref_dependencies(term, refcounts): del workspace[garbage_term] diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 78dffc14..667c3be9 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -119,10 +119,15 @@ class TermGraph(DiGraph): def _repr_png_(self): return self.png.data - def initial_refcounts(self, initial_workspace): + def initial_refcounts(self, initial_terms): """ Calculate initial refcounts for execution of this graph. + Parameters + ---------- + initial_terms : iterable[Term] + An iterable of terms that were pre-computed before graph execution. + Each node starts with a refcount equal to its outdegree, and output nodes get one extra reference to ensure that they're still in the graph at the end of execution. @@ -131,7 +136,7 @@ class TermGraph(DiGraph): for t in self.outputs.values(): refcounts[t] += 1 - for t in initial_workspace: + for t in initial_terms: self.decref_dependencies(t, refcounts) return refcounts