mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-03 18:21:25 +08:00
Merge pull request #762 from quantopian/adjustment-perf
PERF: Speed up reading of adjustments.
This commit is contained in:
@@ -21,6 +21,7 @@ from numpy import (
|
||||
uint32,
|
||||
zeros,
|
||||
)
|
||||
from numpy cimport int64_t, ndarray
|
||||
from pandas import Timestamp
|
||||
|
||||
ctypedef object Timestamp_t
|
||||
@@ -195,24 +196,30 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection
|
||||
|
||||
cdef list results = [{} for column in columns]
|
||||
cdef dict asset_ixs = {} # Cache sid lookups here.
|
||||
cdef dict date_ixs = {}
|
||||
cdef:
|
||||
int i
|
||||
int dt
|
||||
int sid
|
||||
double ratio
|
||||
int eff_date
|
||||
int date_loc
|
||||
Py_ssize_t asset_ix
|
||||
int i
|
||||
dict col_adjustments
|
||||
|
||||
cdef ndarray[int64_t, ndim=1] _dates_seconds = \
|
||||
dates.values.astype('datetime64[s]').view(int)
|
||||
|
||||
# Pre-populate date index cache.
|
||||
for i, dt in enumerate(_dates_seconds):
|
||||
date_ixs[dt] = i
|
||||
|
||||
# splits affect prices and volumes, volumes is the inverse
|
||||
for sid, ratio, eff_date in splits:
|
||||
if eff_date < start_date:
|
||||
continue
|
||||
date_loc = dates.get_loc(
|
||||
Timestamp(eff_date, unit='s', tz='UTC'),
|
||||
# Get the first date **on or after** the effective date.
|
||||
method='bfill',
|
||||
)
|
||||
|
||||
date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds)
|
||||
|
||||
if not PyDict_Contains(asset_ixs, sid):
|
||||
asset_ixs[sid] = assets.get_loc(sid)
|
||||
@@ -239,11 +246,8 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection
|
||||
for sid, ratio, eff_date in mergers:
|
||||
if eff_date < start_date:
|
||||
continue
|
||||
date_loc = dates.get_loc(
|
||||
Timestamp(eff_date, unit='s', tz='UTC'),
|
||||
# Get the first date **on or after** the effective date.
|
||||
method='bfill',
|
||||
)
|
||||
|
||||
date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds)
|
||||
|
||||
if not PyDict_Contains(asset_ixs, sid):
|
||||
asset_ixs[sid] = assets.get_loc(sid)
|
||||
@@ -262,11 +266,8 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection
|
||||
for sid, ratio, eff_date in dividends:
|
||||
if eff_date < start_date:
|
||||
continue
|
||||
date_loc = dates.get_loc(
|
||||
Timestamp(eff_date, unit='s', tz='UTC'),
|
||||
# Get the first date **on or after** the effective date.
|
||||
method='bfill',
|
||||
)
|
||||
|
||||
date_loc = _lookup_dt(date_ixs, eff_date, _dates_seconds)
|
||||
|
||||
if not PyDict_Contains(asset_ixs, sid):
|
||||
asset_ixs[sid] = assets.get_loc(sid)
|
||||
@@ -282,3 +283,12 @@ cpdef load_adjustments_from_sqlite(object adjustments_db, # sqlite3.Connection
|
||||
col_adjustments[date_loc] = [adj]
|
||||
|
||||
return results
|
||||
|
||||
|
||||
cdef _lookup_dt(dict dt_cache,
                int64_t dt,
                ndarray[int64_t, ndim=1] fallback):
    """
    Return the index for ``dt``, memoizing the answer in ``dt_cache``.

    Parameters
    ----------
    dt_cache : dict
        Maps a timestamp to an index.  It is mutated in place: a value
        computed on a cache miss is stored so that later lookups of the
        same ``dt`` skip the search.
    dt : int64_t
        Timestamp to look up, in the same units as ``fallback``.  Declared
        as ``int64_t`` rather than ``int`` because a C ``int`` is normally
        32 bits wide and cannot hold epoch seconds past 2038-01-19, while
        ``fallback`` stores 64-bit values.
    fallback : ndarray[int64_t, ndim=1]
        Array searched when ``dt`` is not cached.
        NOTE(review): ``searchsorted`` only gives a meaningful answer on
        ascending-sorted input -- confirm that callers guarantee this.

    Returns
    -------
    The cached index for ``dt`` when present; otherwise the result of
    ``fallback.searchsorted(dt, side='right')``.

    Notes
    -----
    ``side='right'`` yields the position of the first element strictly
    greater than ``dt``.  For a ``dt`` that does not occur in ``fallback``
    that is also the first element on or after ``dt``.  A ``dt`` that does
    occur in ``fallback`` only resolves to its own position if the caller
    has already stored it in ``dt_cache``.  A ``dt`` greater than every
    element yields ``len(fallback)``, which is not a valid position in
    ``fallback``.
    """
    if not PyDict_Contains(dt_cache, dt):
        # Cache miss: search once, remember the answer, and return it
        # directly instead of reading it back out of the dict.
        loc = fallback.searchsorted(dt, side='right')
        dt_cache[dt] = loc
        return loc
    return dt_cache[dt]
|
||||
|
||||
Reference in New Issue
Block a user