mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-02 05:14:14 +08:00
Merge pull request #1157 from quantopian/use-carray-instead-of-read-all-on-small-size
PERF: Improve read time for smaller num of assets.
This commit is contained in:
@@ -12,6 +12,8 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from sys import maxsize
|
||||
|
||||
from nose_parameterized import parameterized
|
||||
from numpy import (
|
||||
arange,
|
||||
@@ -330,3 +332,21 @@ class BcolzDailyBarTestCase(WithBcolzDailyBarReader, ZiplineTestCase):
|
||||
self.assertEqual(-1, close)
|
||||
finally:
|
||||
reader._spot_col('close')[zero_ix] = old
|
||||
|
||||
|
||||
class BcolzDailyBarAlwaysReadAllTestCase(BcolzDailyBarTestCase):
|
||||
"""
|
||||
Force tests defined in BcolzDailyBarTestCase to always read the entire
|
||||
column into memory before selecting desired asset data, when invoking
|
||||
`load_raw_array`.
|
||||
"""
|
||||
BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = 0
|
||||
|
||||
|
||||
class BcolzDailyBarNeverReadAllTestCase(BcolzDailyBarTestCase):
|
||||
"""
|
||||
Force tests defined in BcolzDailyBarTestCase to never read the entire
|
||||
column into memory before selecting desired asset data, when invoking
|
||||
`load_raw_array`.
|
||||
"""
|
||||
BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = maxsize
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
import bcolz
|
||||
cimport cython
|
||||
from cpython cimport bool
|
||||
|
||||
from numpy import (
|
||||
array,
|
||||
@@ -31,6 +32,7 @@ from numpy cimport (
|
||||
)
|
||||
from numpy.math cimport NAN
|
||||
|
||||
ctypedef object carray_t
|
||||
ctypedef object ctable_t
|
||||
ctypedef object Timestamp_t
|
||||
ctypedef object DatetimeIndex_t
|
||||
@@ -134,7 +136,8 @@ cpdef _read_bcolz_data(ctable_t table,
|
||||
list columns,
|
||||
intp_t[:] first_rows,
|
||||
intp_t[:] last_rows,
|
||||
intp_t[:] offsets):
|
||||
intp_t[:] offsets,
|
||||
bool read_all):
|
||||
"""
|
||||
Load raw bcolz data for the given columns and indices.
|
||||
|
||||
@@ -151,6 +154,9 @@ cpdef _read_bcolz_data(ctable_t table,
|
||||
last_rows : ndarray[intp]
|
||||
offsets : ndarray[intp
|
||||
Arrays in the format returned by _compute_row_slices.
|
||||
read_all : bool
|
||||
Whether to read_all sid data at once, or to read a silce from the
|
||||
carray for each sid.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -160,6 +166,7 @@ cpdef _read_bcolz_data(ctable_t table,
|
||||
cdef:
|
||||
int nassets
|
||||
str column_name
|
||||
carray_t carray
|
||||
ndarray[dtype=uint32_t, ndim=1] raw_data
|
||||
ndarray[dtype=uint32_t, ndim=2] outbuf
|
||||
ndarray[dtype=uint8_t, ndim=2, cast=True] where_nan
|
||||
@@ -172,20 +179,40 @@ cpdef _read_bcolz_data(ctable_t table,
|
||||
intp_t offset
|
||||
list results = []
|
||||
|
||||
ndays = shape[0]
|
||||
nassets = shape[1]
|
||||
if not nassets== len(first_rows) == len(last_rows) == len(offsets):
|
||||
raise ValueError("Incompatible index arrays.")
|
||||
|
||||
for column_name in columns:
|
||||
raw_data = table[column_name][:]
|
||||
outbuf = zeros(shape=shape, dtype=uint32)
|
||||
for asset in range(nassets):
|
||||
first_row = first_rows[asset]
|
||||
last_row = last_rows[asset]
|
||||
offset = offsets[asset]
|
||||
for out_idx, raw_idx in enumerate(range(first_row, last_row + 1)):
|
||||
outbuf[out_idx + offset, asset] = raw_data[raw_idx]
|
||||
if read_all:
|
||||
raw_data = table[column_name][:]
|
||||
|
||||
for asset in range(nassets):
|
||||
first_row = first_rows[asset]
|
||||
last_row = last_rows[asset]
|
||||
offset = offsets[asset]
|
||||
if first_row <= last_row:
|
||||
outbuf[offset:offset + (last_row + 1 - first_row), asset] =\
|
||||
raw_data[first_row:last_row + 1]
|
||||
else:
|
||||
continue
|
||||
else:
|
||||
carray = table[column_name]
|
||||
|
||||
for asset in range(nassets):
|
||||
first_row = first_rows[asset]
|
||||
last_row = last_rows[asset]
|
||||
offset = offsets[asset]
|
||||
out_start = offset
|
||||
out_end = (last_row - first_row) + offset + 1
|
||||
if first_row <= last_row:
|
||||
outbuf[offset:offset + (last_row + 1 - first_row), asset] =\
|
||||
carray[first_row:last_row + 1]
|
||||
else:
|
||||
continue
|
||||
|
||||
if column_name in {'open', 'high', 'low', 'close'}:
|
||||
where_nan = (outbuf == 0)
|
||||
outbuf_as_float = outbuf.astype(float64) * .001
|
||||
|
||||
@@ -417,6 +417,21 @@ class BcolzDailyBarReader(DailyBarReader):
|
||||
When read across the open, high, low, close, and volume with the same
|
||||
index should represent the same asset and day.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
table : bcolz.ctable
|
||||
The ctable contaning the pricing data, with attrs corresponding to the
|
||||
Attributes list below.
|
||||
read_all_threshold : int
|
||||
The number of equities at which;
|
||||
below, the data is read by reading a slice from the carray
|
||||
per asset.
|
||||
above, the data is read by pulling all of the data for all assets
|
||||
into memory and then indexing into that array for each day and
|
||||
asset pair.
|
||||
Used to tune performance of reads when using a small or large number
|
||||
of equities.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
The table with which this loader interacts contains the following
|
||||
@@ -438,7 +453,7 @@ class BcolzDailyBarReader(DailyBarReader):
|
||||
range of queried dates.
|
||||
"""
|
||||
@preprocess(table=coerce_string(open_ctable, mode='r'))
|
||||
def __init__(self, table):
|
||||
def __init__(self, table, read_all_threshold=3000):
|
||||
|
||||
self._table = table
|
||||
# Cache of fully read np.array for the carrays in the daily bar table.
|
||||
@@ -447,6 +462,7 @@ class BcolzDailyBarReader(DailyBarReader):
|
||||
# process first.
|
||||
self._spot_cols = {}
|
||||
self.PRICE_ADJUSTMENT_FACTOR = 0.001
|
||||
self._read_all_threshold = read_all_threshold
|
||||
|
||||
@lazyval
|
||||
def _calendar(self):
|
||||
@@ -545,6 +561,7 @@ class BcolzDailyBarReader(DailyBarReader):
|
||||
end_idx,
|
||||
assets,
|
||||
)
|
||||
read_all = len(assets) > self._read_all_threshold
|
||||
return _read_bcolz_data(
|
||||
self._table,
|
||||
(end_idx - start_idx + 1, len(assets)),
|
||||
@@ -552,6 +569,7 @@ class BcolzDailyBarReader(DailyBarReader):
|
||||
first_rows,
|
||||
last_rows,
|
||||
offsets,
|
||||
read_all,
|
||||
)
|
||||
|
||||
def _spot_col(self, colname):
|
||||
|
||||
@@ -578,6 +578,9 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir):
|
||||
If this flag is set the ``bcolz_daily_bar_days`` will be the full
|
||||
set of trading days from the trading environment. This flag overrides
|
||||
``BCOLZ_DAILY_BAR_LOOKBACK_DAYS``.
|
||||
BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD : int
|
||||
If this flag is set, use the value as the `read_all_threshold`
|
||||
parameter to BcolzDailyBarReader, otherwise use the default value.
|
||||
|
||||
Methods
|
||||
-------
|
||||
@@ -599,6 +602,7 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir):
|
||||
BCOLZ_DAILY_BAR_USE_FULL_CALENDAR = False
|
||||
BCOLZ_DAILY_BAR_START_DATE = alias('START_DATE')
|
||||
BCOLZ_DAILY_BAR_END_DATE = alias('END_DATE')
|
||||
BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = None
|
||||
# allows WithBcolzDailyBarReaderFromCSVs to call the `write_csvs` method
|
||||
# without needing to reimplement `init_class_fixtures`
|
||||
_write_method_name = 'write'
|
||||
@@ -632,7 +636,11 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir):
|
||||
cls._write_method_name,
|
||||
)(cls.make_daily_bar_data())
|
||||
|
||||
cls.bcolz_daily_bar_reader = BcolzDailyBarReader(t)
|
||||
if cls.BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD is not None:
|
||||
cls.bcolz_daily_bar_reader = BcolzDailyBarReader(
|
||||
t, cls.BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD)
|
||||
else:
|
||||
cls.bcolz_daily_bar_reader = BcolzDailyBarReader(t)
|
||||
|
||||
|
||||
class WithBcolzDailyBarReaderFromCSVs(WithBcolzDailyBarReader):
|
||||
|
||||
Reference in New Issue
Block a user