From 66d05aa563c250e5b9926f1c7306f08e9386ff26 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Tue, 19 Apr 2016 16:00:49 -0400 Subject: [PATCH] PERF: Improve read time for smaller num of assets. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The BcolzDailyBarReader was optimized for the pipeline case of reading all assets at once. Now that the reader is also used to support daily history the case of reading a data for a small number of assets is more common, particularly in algorithms that use the history API which have a high rotation of assets (e.g. an algorithm which pipeline uses to set the active universe) Remove the bottleneck in reading a small number of assets by conditionally reading the slice for each asset from the carray, instead of reading the data for all equities and then indexing into that full array. On a certain number of assets, it is still better to read all the data at once. On the Quantopian dataset, which holds data for 20000 about for the last 10 years of equity data (where not all equities trade over the full range), stored in 118 blosc blp files per column, the tipping point where the 'read all' mode wins out between 3000-4000 assets. That number was tested by trying to exercise a worst case scenario where the equities were spread out evenly across the blp files, by stepping along a sorted list of assets that were alive over a query range which spanned 70 trading days. ``` size = 3000 sids = [assets[i] for i in range(0, len(assets), len(assets) / size)][:size] ``` Also, add parameter to WithBcolzDailyBarReader fixture which allows the test to specify what the threshold count for reading all data should be, so that the test_us_equity_pricing can be forced into either mode to make sure that both branches in logic are covered by all test cases. On local dev machine this patch improves the read time of `load_raw_array` for one asset from 100 ms to 96.5 µs. (10^5 improvement.) With reading only asset per call a being an observed common case when populating the non-cached values in USEquityHistoryLoader. --- tests/data/test_us_equity_pricing.py | 20 +++++++++++++ zipline/data/_equities.pyx | 43 ++++++++++++++++++++++------ zipline/data/us_equity_pricing.py | 20 ++++++++++++- zipline/testing/fixtures.py | 10 ++++++- 4 files changed, 83 insertions(+), 10 deletions(-) diff --git a/tests/data/test_us_equity_pricing.py b/tests/data/test_us_equity_pricing.py index b289c494..9391809b 100644 --- a/tests/data/test_us_equity_pricing.py +++ b/tests/data/test_us_equity_pricing.py @@ -12,6 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from sys import maxsize + from nose_parameterized import parameterized from numpy import ( arange, @@ -330,3 +332,21 @@ class BcolzDailyBarTestCase(WithBcolzDailyBarReader, ZiplineTestCase): self.assertEqual(-1, close) finally: reader._spot_col('close')[zero_ix] = old + + +class BcolzDailyBarAlwaysReadAllTestCase(BcolzDailyBarTestCase): + """ + Force tests defined in BcolzDailyBarTestCase to always read the entire + column into memory before selecting desired asset data, when invoking + `load_raw_array`. + """ + BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = 0 + + +class BcolzDailyBarNeverReadAllTestCase(BcolzDailyBarTestCase): + """ + Force tests defined in BcolzDailyBarTestCase to never read the entire + column into memory before selecting desired asset data, when invoking + `load_raw_array`. + """ + BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = maxsize diff --git a/zipline/data/_equities.pyx b/zipline/data/_equities.pyx index 36cce95d..caa8ae69 100644 --- a/zipline/data/_equities.pyx +++ b/zipline/data/_equities.pyx @@ -14,6 +14,7 @@ # limitations under the License. import bcolz cimport cython +from cpython cimport bool from numpy import ( array, @@ -31,6 +32,7 @@ from numpy cimport ( ) from numpy.math cimport NAN +ctypedef object carray_t ctypedef object ctable_t ctypedef object Timestamp_t ctypedef object DatetimeIndex_t @@ -134,7 +136,8 @@ cpdef _read_bcolz_data(ctable_t table, list columns, intp_t[:] first_rows, intp_t[:] last_rows, - intp_t[:] offsets): + intp_t[:] offsets, + bool read_all): """ Load raw bcolz data for the given columns and indices. @@ -151,6 +154,9 @@ cpdef _read_bcolz_data(ctable_t table, last_rows : ndarray[intp] offsets : ndarray[intp Arrays in the format returned by _compute_row_slices. + read_all : bool + Whether to read_all sid data at once, or to read a silce from the + carray for each sid. Returns ------- @@ -160,6 +166,7 @@ cpdef _read_bcolz_data(ctable_t table, cdef: int nassets str column_name + carray_t carray ndarray[dtype=uint32_t, ndim=1] raw_data ndarray[dtype=uint32_t, ndim=2] outbuf ndarray[dtype=uint8_t, ndim=2, cast=True] where_nan @@ -172,20 +179,40 @@ cpdef _read_bcolz_data(ctable_t table, intp_t offset list results = [] + ndays = shape[0] nassets = shape[1] if not nassets== len(first_rows) == len(last_rows) == len(offsets): raise ValueError("Incompatible index arrays.") for column_name in columns: - raw_data = table[column_name][:] outbuf = zeros(shape=shape, dtype=uint32) - for asset in range(nassets): - first_row = first_rows[asset] - last_row = last_rows[asset] - offset = offsets[asset] - for out_idx, raw_idx in enumerate(range(first_row, last_row + 1)): - outbuf[out_idx + offset, asset] = raw_data[raw_idx] + if read_all: + raw_data = table[column_name][:] + for asset in range(nassets): + first_row = first_rows[asset] + last_row = last_rows[asset] + offset = offsets[asset] + if first_row <= last_row: + outbuf[offset:offset + (last_row + 1 - first_row), asset] =\ + raw_data[first_row:last_row + 1] + else: + continue + else: + carray = table[column_name] + + for asset in range(nassets): + first_row = first_rows[asset] + last_row = last_rows[asset] + offset = offsets[asset] + out_start = offset + out_end = (last_row - first_row) + offset + 1 + if first_row <= last_row: + outbuf[offset:offset + (last_row + 1 - first_row), asset] =\ + carray[first_row:last_row + 1] + else: + continue + if column_name in {'open', 'high', 'low', 'close'}: where_nan = (outbuf == 0) outbuf_as_float = outbuf.astype(float64) * .001 diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py index a9b41bb3..d54e209b 100644 --- a/zipline/data/us_equity_pricing.py +++ b/zipline/data/us_equity_pricing.py @@ -417,6 +417,21 @@ class BcolzDailyBarReader(DailyBarReader): When read across the open, high, low, close, and volume with the same index should represent the same asset and day. + Parameters + ---------- + table : bcolz.ctable + The ctable contaning the pricing data, with attrs corresponding to the + Attributes list below. + read_all_threshold : int + The number of equities at which; + below, the data is read by reading a slice from the carray + per asset. + above, the data is read by pulling all of the data for all assets + into memory and then indexing into that array for each day and + asset pair. + Used to tune performance of reads when using a small or large number + of equities. + Attributes ---------- The table with which this loader interacts contains the following @@ -438,7 +453,7 @@ class BcolzDailyBarReader(DailyBarReader): range of queried dates. """ @preprocess(table=coerce_string(open_ctable, mode='r')) - def __init__(self, table): + def __init__(self, table, read_all_threshold=3000): self._table = table # Cache of fully read np.array for the carrays in the daily bar table. @@ -447,6 +462,7 @@ class BcolzDailyBarReader(DailyBarReader): # process first. self._spot_cols = {} self.PRICE_ADJUSTMENT_FACTOR = 0.001 + self._read_all_threshold = read_all_threshold @lazyval def _calendar(self): @@ -545,6 +561,7 @@ class BcolzDailyBarReader(DailyBarReader): end_idx, assets, ) + read_all = len(assets) > self._read_all_threshold return _read_bcolz_data( self._table, (end_idx - start_idx + 1, len(assets)), @@ -552,6 +569,7 @@ class BcolzDailyBarReader(DailyBarReader): first_rows, last_rows, offsets, + read_all, ) def _spot_col(self, colname): diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index 9748f0bb..144286c4 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -597,6 +597,9 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir): If this flag is set the ``bcolz_daily_bar_days`` will be the full set of trading days from the trading environment. This flag overrides ``BCOLZ_DAILY_BAR_LOOKBACK_DAYS``. + BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD : int + If this flag is set, use the value as the `read_all_threshold` + parameter to BcolzDailyBarReader, otherwise use the default value. Methods ------- @@ -618,6 +621,7 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir): BCOLZ_DAILY_BAR_USE_FULL_CALENDAR = False BCOLZ_DAILY_BAR_START_DATE = alias('START_DATE') BCOLZ_DAILY_BAR_END_DATE = alias('END_DATE') + BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = None # allows WithBcolzDailyBarReaderFromCSVs to call the `write_csvs` method # without needing to reimplement `init_class_fixtures` _write_method_name = 'write' @@ -651,7 +655,11 @@ class WithBcolzDailyBarReader(WithTradingEnvironment, WithTmpDir): cls._write_method_name, )(cls.make_daily_bar_data()) - cls.bcolz_daily_bar_reader = BcolzDailyBarReader(t) + if cls.BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD is not None: + cls.bcolz_daily_bar_reader = BcolzDailyBarReader( + t, cls.BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD) + else: + cls.bcolz_daily_bar_reader = BcolzDailyBarReader(t) class WithBcolzDailyBarReaderFromCSVs(WithBcolzDailyBarReader):