mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 09:02:58 +08:00
PERF: Deterministically GC pipeline results.
Any DataFrame that's had `.loc` or `.iloc `called on it participates in a cycle, which means they're not immediately garbage collected when they go out of scope. This matters for pipeline results because they consume multiple megabytes per column, which means that a pipeline result with many columns can hold take up over 100MB. By manually breaking DataFrame cycles, we can ensure that we never hold multiple pipeline results in memory at once.
This commit is contained in:
@@ -22,6 +22,7 @@ import pandas as pd
|
||||
from contextlib2 import ExitStack
|
||||
from pandas.tseries.tools import normalize_date
|
||||
import numpy as np
|
||||
import sys
|
||||
|
||||
from itertools import chain, repeat
|
||||
from numbers import Integral
|
||||
@@ -125,6 +126,7 @@ from zipline.utils.math_utils import (
|
||||
tolerant_equals,
|
||||
round_if_near_integer
|
||||
)
|
||||
from zipline.utils.pandas_utils import clear_dataframe_indexer_caches
|
||||
from zipline.utils.preprocess import preprocess
|
||||
from zipline.utils.security_list import SecurityList
|
||||
|
||||
@@ -2344,6 +2346,30 @@ class TradingAlgorithm(object):
|
||||
try:
|
||||
data = self._pipeline_cache.unwrap(today)
|
||||
except Expired:
|
||||
# Try to deterministically garbage collect the previous result by
|
||||
# removing any references to it. There are at least three sources
|
||||
# of references:
|
||||
|
||||
# 1. self._pipeline_cache holds a reference.
|
||||
# 2. The dataframe itself holds a reference via cached .iloc/.loc
|
||||
# accessors.
|
||||
# 3. The traceback held in sys.exc_info includes stack frames in
|
||||
# which self._pipeline_cache is a local variable.
|
||||
|
||||
# We remove the above sources of references in reverse order:
|
||||
|
||||
# 3. Clear the traceback.
|
||||
sys.exc_clear()
|
||||
|
||||
# 2. Clear the .loc/.iloc caches.
|
||||
clear_dataframe_indexer_caches(
|
||||
self._pipeline_cache._unsafe_get_value()
|
||||
)
|
||||
|
||||
# 1. Clear the reference to self._pipeline_cache.
|
||||
self._pipeline_cache = None
|
||||
|
||||
# Calculate the next block.
|
||||
data, valid_until = self._run_pipeline(
|
||||
pipeline, today, next(chunks),
|
||||
)
|
||||
|
||||
+12
-5
@@ -1,7 +1,7 @@
|
||||
"""
|
||||
Caching utilities for zipline
|
||||
"""
|
||||
from collections import namedtuple, MutableMapping
|
||||
from collections import MutableMapping
|
||||
import errno
|
||||
import os
|
||||
import pickle
|
||||
@@ -20,7 +20,7 @@ class Expired(Exception):
|
||||
"""
|
||||
|
||||
|
||||
class CachedObject(namedtuple("_CachedObject", "value expires")):
|
||||
class CachedObject(object):
|
||||
"""
|
||||
A simple struct for maintaining a cached object with an expiration date.
|
||||
|
||||
@@ -47,6 +47,9 @@ class CachedObject(namedtuple("_CachedObject", "value expires")):
|
||||
...
|
||||
Expired: 2014-01-01 00:00:00+00:00
|
||||
"""
|
||||
def __init__(self, value, expires):
|
||||
self._value = value
|
||||
self._expires = expires
|
||||
|
||||
def unwrap(self, dt):
|
||||
"""
|
||||
@@ -62,9 +65,13 @@ class CachedObject(namedtuple("_CachedObject", "value expires")):
|
||||
Expired
|
||||
Raised when `dt` is greater than self.expires.
|
||||
"""
|
||||
if dt > self.expires:
|
||||
raise Expired(self.expires)
|
||||
return self.value
|
||||
if dt > self._expires:
|
||||
raise Expired(self._expires)
|
||||
return self._value
|
||||
|
||||
def _unsafe_get_value(self):
|
||||
"""You almost certainly shouldn't use this."""
|
||||
return self._value
|
||||
|
||||
|
||||
class ExpiringCache(object):
|
||||
|
||||
@@ -166,3 +166,22 @@ def ignore_pandas_nan_categorical_warning():
|
||||
category=FutureWarning,
|
||||
)
|
||||
yield
|
||||
|
||||
|
||||
def clear_dataframe_indexer_caches(df):
|
||||
"""
|
||||
Clear cached attributes from a pandas DataFrame.
|
||||
|
||||
By default pandas memoizes `iloc`, `loc` objects on DataFrames, resulting
|
||||
in refcycles that can lead to unexpectedly long-lived DataFrames. This
|
||||
function attempts to clear those cycles.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
df : pd.DataFrame
|
||||
"""
|
||||
for attr in ('_loc', '_iloc'):
|
||||
try:
|
||||
delattr(df, attr)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user