From 23ca58813a235a740d09f87822c9a9872f0c6ece Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Fri, 9 Oct 2015 17:33:25 -0400 Subject: [PATCH] PERF: Speed up reading of adjustments. For a pipeline doing simple computations on USEquityPricing data, we were spending ~60% of `run_pipeline` loading adjustments. Almost all of that time was spent in calls to `DatetimeIndex.get_loc` to find the indices of adjustment `eff_date`s. This optimizes the eff_date lookups by pre-populating a cache of seconds-since-epoch timestamps that we expect to see, and falling back to `np.searchsorted` on cache misses. In testing, this reduces the time to compute a 1-year pipeline with 30 and 90 day moving averages from 3.1 seconds to 0.9 seconds. --- zipline/data/_adjustments.pyx | 42 ++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/zipline/data/_adjustments.pyx b/zipline/data/_adjustments.pyx index 9a6a8b42..8f923ee0 100644 --- a/zipline/data/_adjustments.pyx +++ b/zipline/data/_adjustments.pyx @@ -21,6 +21,7 @@ from numpy import ( uint32, zeros, ) +from numpy cimport int64_t, ndarray from pandas import Timestamp ctypedef object Timestamp_t @@ -195,24 +196,30 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection cdef list results = [{} for column in columns] cdef dict asset_ixs = {} # Cache sid lookups here. + cdef dict date_ixs = {} cdef: + int i + int dt int sid double ratio int eff_date int date_loc Py_ssize_t asset_ix - int i dict col_adjustments + cdef ndarray[int64_t, ndim=1] _dates_seconds = \ + dates.values.astype('datetime64[s]').view(int) + + # Pre-populate date index cache. + for i, dt in enumerate(_dates_seconds): + date_ixs[dt] = i + # splits affect prices and volumes, volumes is the inverse for sid, ratio, eff_date in splits: if eff_date < start_date: continue - date_loc = dates.get_loc( - Timestamp(eff_date, unit='s', tz='UTC'), - # Get the first date **on or after** the effective date. - method='bfill', - ) + + date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds) if not PyDict_Contains(asset_ixs, sid): asset_ixs[sid] = assets.get_loc(sid) @@ -239,11 +246,8 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection for sid, ratio, eff_date in mergers: if eff_date < start_date: continue - date_loc = dates.get_loc( - Timestamp(eff_date, unit='s', tz='UTC'), - # Get the first date **on or after** the effective date. - method='bfill', - ) + + date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds) if not PyDict_Contains(asset_ixs, sid): asset_ixs[sid] = assets.get_loc(sid) @@ -262,11 +266,8 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection for sid, ratio, eff_date in dividends: if eff_date < start_date: continue - date_loc = dates.get_loc( - Timestamp(eff_date, unit='s', tz='UTC'), - # Get the first date **on or after** the effective date. - method='bfill', - ) + + date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds) if not PyDict_Contains(asset_ixs, sid): asset_ixs[sid] = assets.get_loc(sid) @@ -282,3 +283,12 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection col_adjustments[date_loc] = [adj] return results + + +cdef _lookup_dt(dict dt_cache, + int dt, + ndarray[int64_t, ndim=1] fallback): + + if not PyDict_Contains(dt_cache, dt): + dt_cache[dt] = fallback.searchsorted(dt, side='right') + return dt_cache[dt]