diff --git a/tests/data/test_us_equity_pricing.py b/tests/data/test_us_equity_pricing.py index b289c494..9391809b 100644 --- a/tests/data/test_us_equity_pricing.py +++ b/tests/data/test_us_equity_pricing.py @@ -12,6 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from sys import maxsize + from nose_parameterized import parameterized from numpy import ( arange, @@ -330,3 +332,21 @@ class BcolzDailyBarTestCase(WithBcolzDailyBarReader, ZiplineTestCase): self.assertEqual(-1, close) finally: reader._spot_col('close')[zero_ix] = old + + +class BcolzDailyBarAlwaysReadAllTestCase(BcolzDailyBarTestCase): + """ + Force tests defined in BcolzDailyBarTestCase to always read the entire + column into memory before selecting desired asset data, when invoking + `load_raw_array`. + """ + BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = 0 + + +class BcolzDailyBarNeverReadAllTestCase(BcolzDailyBarTestCase): + """ + Force tests defined in BcolzDailyBarTestCase to never read the entire + column into memory before selecting desired asset data, when invoking + `load_raw_array`. + """ + BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = maxsize diff --git a/zipline/data/_equities.pyx b/zipline/data/_equities.pyx index 36cce95d..caa8ae69 100644 --- a/zipline/data/_equities.pyx +++ b/zipline/data/_equities.pyx @@ -14,6 +14,7 @@ # limitations under the License. import bcolz cimport cython +from cpython cimport bool from numpy import ( array, @@ -31,6 +32,7 @@ from numpy cimport ( ) from numpy.math cimport NAN +ctypedef object carray_t ctypedef object ctable_t ctypedef object Timestamp_t ctypedef object DatetimeIndex_t @@ -134,7 +136,8 @@ cpdef _read_bcolz_data(ctable_t table, list columns, intp_t[:] first_rows, intp_t[:] last_rows, - intp_t[:] offsets): + intp_t[:] offsets, + bool read_all): """ Load raw bcolz data for the given columns and indices. @@ -151,6 +154,9 @@ cpdef _read_bcolz_data(ctable_t table, last_rows : ndarray[intp] offsets : ndarray[intp Arrays in the format returned by _compute_row_slices. + read_all : bool + Whether to read_all sid data at once, or to read a silce from the + carray for each sid. Returns ------- @@ -160,6 +166,7 @@ cpdef _read_bcolz_data(ctable_t table, cdef: int nassets str column_name + carray_t carray ndarray[dtype=uint32_t, ndim=1] raw_data ndarray[dtype=uint32_t, ndim=2] outbuf ndarray[dtype=uint8_t, ndim=2, cast=True] where_nan @@ -172,20 +179,40 @@ cpdef _read_bcolz_data(ctable_t table, intp_t offset list results = [] + ndays = shape[0] nassets = shape[1] if not nassets== len(first_rows) == len(last_rows) == len(offsets): raise ValueError("Incompatible index arrays.") for column_name in columns: - raw_data = table[column_name][:] outbuf = zeros(shape=shape, dtype=uint32) - for asset in range(nassets): - first_row = first_rows[asset] - last_row = last_rows[asset] - offset = offsets[asset] - for out_idx, raw_idx in enumerate(range(first_row, last_row + 1)): - outbuf[out_idx + offset, asset] = raw_data[raw_idx] + if read_all: + raw_data = table[column_name][:] + for asset in range(nassets): + first_row = first_rows[asset] + last_row = last_rows[asset] + offset = offsets[asset] + if first_row <= last_row: + outbuf[offset:offset + (last_row + 1 - first_row), asset] =\ + raw_data[first_row:last_row + 1] + else: + continue + else: + carray = table[column_name] + + for asset in range(nassets): + first_row = first_rows[asset] + last_row = last_rows[asset] + offset = offsets[asset] + out_start = offset + out_end = (last_row - first_row) + offset + 1 + if first_row <= last_row: + outbuf[offset:offset + (last_row + 1 - first_row), asset] =\ + carray[first_row:last_row + 1] + else: + continue + if column_name in {'open', 'high', 'low', 'close'}: where_nan = (outbuf == 0) outbuf_as_float = outbuf.astype(float64) * .001 diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py index a9b41bb3..d54e209b 100644 --- a/zipline/data/us_equity_pricing.py +++ b/zipline/data/us_equity_pricing.py @@ -417,6 +417,21 @@ class BcolzDailyBarReader(DailyBarReader): When read across the open, high, low, close, and volume with the same index should represent the same asset and day. + Parameters + ---------- + table : bcolz.ctable + The ctable contaning the pricing data, with attrs corresponding to the + Attributes list below. + read_all_threshold : int + The number of equities at which; + below, the data is read by reading a slice from the carray + per asset. + above, the data is read by pulling all of the data for all assets + into memory and then indexing into that array for each day and + asset pair. + Used to tune performance of reads when using a small or large number + of equities. + Attributes ---------- The table with which this loader interacts contains the following @@ -438,7 +453,7 @@ class BcolzDailyBarReader(DailyBarReader): range of queried dates. """ @preprocess(table=coerce_string(open_ctable, mode='r')) - def __init__(self, table): + def __init__(self, table, read_all_threshold=3000): self._table = table # Cache of fully read np.array for the carrays in the daily bar table. @@ -447,6 +462,7 @@ class BcolzDailyBarReader(DailyBarReader): # process first. self._spot_cols = {} self.PRICE_ADJUSTMENT_FACTOR = 0.001 + self._read_all_threshold = read_all_threshold @lazyval def _calendar(self): @@ -545,6 +561,7 @@ class BcolzDailyBarReader(DailyBarReader): end_idx, assets, ) + read_all = len(assets) > self._read_all_threshold return _read_bcolz_data( self._table, (end_idx - start_idx + 1, len(assets)), @@ -552,6 +569,7 @@ class BcolzDailyBarReader(DailyBarReader): first_rows, last_rows, offsets, + read_all, ) def _spot_col(self, colname): diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index 27754226..35eaa406 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -578,6 +578,9 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir): If this flag is set the ``bcolz_daily_bar_days`` will be the full set of trading days from the trading environment. This flag overrides ``BCOLZ_DAILY_BAR_LOOKBACK_DAYS``. + BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD : int + If this flag is set, use the value as the `read_all_threshold` + parameter to BcolzDailyBarReader, otherwise use the default value. Methods ------- @@ -599,6 +602,7 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir): BCOLZ_DAILY_BAR_USE_FULL_CALENDAR = False BCOLZ_DAILY_BAR_START_DATE = alias('START_DATE') BCOLZ_DAILY_BAR_END_DATE = alias('END_DATE') + BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = None # allows WithBcolzDailyBarReaderFromCSVs to call the `write_csvs` method # without needing to reimplement `init_class_fixtures` _write_method_name = 'write' @@ -632,7 +636,11 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir): cls._write_method_name, )(cls.make_daily_bar_data()) - cls.bcolz_daily_bar_reader = BcolzDailyBarReader(t) + if cls.BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD is not None: + cls.bcolz_daily_bar_reader = BcolzDailyBarReader( + t, cls.BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD) + else: + cls.bcolz_daily_bar_reader = BcolzDailyBarReader(t) class WithBcolzDailyBarReaderFromCSVs(WithBcolzDailyBarReader):