From e82fef41dd714341bc90c465bbf3c10060d32d55 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Thu, 20 Oct 2016 13:43:54 -0400 Subject: [PATCH] PERF: Speedup minute to session sampling. The minute to session sampling reading was creating two DataFrame objects, the first to hold the minute data, and then a second, returned by `DataFrame.groupby`, to sample down to sessions. Instead, use the arrays returned by the minute reader's `load_raw_arrays` and implement sampling logic which takes advantage of the fact that the minutes being passed start with the first minute of the first session and end with the last minute of the last session. On my machine this takes the tests in `test/test_continuous_futures` from ~4.0 to ~0.1 seconds. --- setup.py | 4 ++ tests/data/test_resample.py | 10 ++-- zipline/data/_resample.pyx | 108 ++++++++++++++++++++++++++++++++++++ zipline/data/resample.py | 93 ++++++++++++++++++++++--------- zipline/testing/fixtures.py | 7 ++- 5 files changed, 188 insertions(+), 34 deletions(-) create mode 100644 zipline/data/_resample.pyx diff --git a/setup.py b/setup.py index 9cf59e00..cd3b79cc 100644 --- a/setup.py +++ b/setup.py @@ -103,6 +103,10 @@ ext_modules = [ 'zipline.utils.calendars._calendar_helpers', ['zipline/utils/calendars/_calendar_helpers.pyx'] ), + Extension( + 'zipline.data._resample', + ['zipline/data/_resample.pyx'] + ), ] diff --git a/tests/data/test_resample.py b/tests/data/test_resample.py index debd2625..db8d2102 100644 --- a/tests/data/test_resample.py +++ b/tests/data/test_resample.py @@ -23,7 +23,7 @@ from six import iteritems from zipline.data.bar_reader import NoDataOnDate from zipline.data.resample import ( - minute_to_session, + minute_frame_to_session_frame, DailyHistoryAggregator, MinuteResampleSessionBarReader, ReindexMinuteBarReader, @@ -501,7 +501,7 @@ class TestMinuteToSession(WithEquityMinuteBarData, for sid in self.ASSET_FINDER_EQUITY_SIDS: frame = self.equity_frames[sid] expected = EXPECTED_SESSIONS[sid] - result = 
minute_to_session(frame, self.nyse_calendar) + result = minute_frame_to_session_frame(frame, self.nyse_calendar) assert_almost_equal(expected.values, result.values, err_msg='sid={0}'.format(sid)) @@ -557,7 +557,8 @@ class TestResampleSessionBars(WithBcolzFutureMinuteBarReader, OHLCV, first, last, [sid]) for i, field in enumerate(OHLCV): assert_almost_equal( - result[i], EXPECTED_SESSIONS[sid][[field]], + EXPECTED_SESSIONS[sid][[field]], + result[i], err_msg="sid={0} field={1}".format(sid, field)) def test_sessions(self): @@ -588,7 +589,8 @@ class TestResampleSessionBars(WithBcolzFutureMinuteBarReader, dt = pd.Timestamp(dt_str, tz='UTC') for col in OHLCV: result = session_bar_reader.get_value(sid, dt, col) - assert_almost_equal(values[col], result, + assert_almost_equal(result, + values[col], err_msg="sid={0} col={1} dt={2}". format(sid, col, dt)) diff --git a/zipline/data/_resample.pyx b/zipline/data/_resample.pyx new file mode 100644 index 00000000..ce87e9d6 --- /dev/null +++ b/zipline/data/_resample.pyx @@ -0,0 +1,108 @@ +# Copyright 2016 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from cython cimport boundscheck, wraparound +from numpy import finfo, float64, nan, isnan +from numpy cimport intp_t, float64_t, uint32_t + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_open(intp_t[:] close_locs, + float64_t[:] data, + float64_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef float64_t val + for i, close_loc in enumerate(close_locs): + val = data[loc] + while isnan(val) and loc <= close_loc: + loc += 1 + val = data[loc] + out[i] = val + loc = close_loc + 1 + + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_high(intp_t[:] close_locs, + float64_t[:] data, + float64_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef float64_t val + for i, close_loc in enumerate(close_locs): + val = -1 + while loc <= close_loc: + val = max(val, data[loc]) + loc += 1 + if val == -1: + val = nan + out[i] = val + loc = close_loc + 1 + + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_low(intp_t[:] close_locs, + float64_t[:] data, + float64_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef float64_t val + cdef float64_t max_float = finfo(float64).max + for i, close_loc in enumerate(close_locs): + val = max_float + while loc <= close_loc: + val = min(val, data[loc]) + loc += 1 + if val == max_float: + val = nan + out[i] = val + loc = close_loc + 1 + + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_close(intp_t[:] close_locs, + float64_t[:] data, + float64_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef float64_t val + loc = len(data) - 1 + num_out = len(close_locs) + for j in range(num_out, 0, -1): + i = j - 1 + if i > 0: + close_loc = close_locs[i - 1] + else: + close_loc = -1 + val = data[loc] + while isnan(val) and loc > close_loc: + loc -= 1 + val = data[loc] + out[i] = val + loc = close_loc + + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_volume(intp_t[:] close_locs, + uint32_t[:] data, + uint32_t[:] out): + cdef intp_t i, 
close_loc, loc = 0 + cdef uint32_t val + loc = 0 + for i, close_loc in enumerate(close_locs): + val = 0 + while loc <= close_loc: + val += data[loc] + loc += 1 + out[i] = val + loc = close_loc + 1 diff --git a/zipline/data/resample.py b/zipline/data/resample.py index 38e4cb1d..aa87e594 100644 --- a/zipline/data/resample.py +++ b/zipline/data/resample.py @@ -16,9 +16,15 @@ from abc import ABCMeta, abstractmethod import numpy as np import pandas as pd -from pandas import DataFrame from six import with_metaclass +from zipline.data._resample import ( + _minute_to_session_open, + _minute_to_session_high, + _minute_to_session_low, + _minute_to_session_close, + _minute_to_session_volume, +) from zipline.data.minute_bars import MinuteBarReader from zipline.data.session_bars import SessionBarReader from zipline.utils.memoize import lazyval @@ -32,7 +38,8 @@ _MINUTE_TO_SESSION_OHCLV_HOW = OrderedDict(( )) -def minute_to_session(minute_frame, calendar): +def minute_frame_to_session_frame(minute_frame, calendar): + """ Resample a DataFrame with minute data into the frame expected by a BcolzDailyBarWriter. @@ -57,6 +64,40 @@ def minute_to_session(minute_frame, calendar): return minute_frame.groupby(calendar.minute_to_session_label).agg(how) +def minute_to_session(column, close_locs, data, out): + """ + Resample an array with minute data into an array with session data. + + This function assumes that the minute data is the exact length of all + minutes in the sessions in the output. + + Parameters + ---------- + column : str + The `open`, `high`, `low`, `close`, or `volume` column. + close_locs : array[intp] + The locations in `data` which are the market close minutes. + data : array[float64|uint32] + The minute data to be sampled into session data. + The first value should align with the market open of the first session, + containing values for all minutes for all sessions. With the last value + being the market close of the last session. 
+ out : array[float64|uint32] + The output array into which to write the sampled sessions. + """ + if column == 'open': + _minute_to_session_open(close_locs, data, out) + elif column == 'high': + _minute_to_session_high(close_locs, data, out) + elif column == 'low': + _minute_to_session_low(close_locs, data, out) + elif column == 'close': + _minute_to_session_close(close_locs, data, out) + elif column == 'volume': + _minute_to_session_volume(close_locs, data, out) + return out + + class DailyHistoryAggregator(object): """ Converts minute pricing data into a daily summary, to be used for the @@ -462,47 +503,45 @@ class MinuteResampleSessionBarReader(SessionBarReader): def _get_resampled(self, columns, start_dt, end_dt, assets): minute_data = self._minute_bar_reader.load_raw_arrays( columns, start_dt, end_dt, assets) - dts = self._calendar.minutes_in_range(start_dt, end_dt) - frames = [] - for i, _ in enumerate(assets): - minute_frame = DataFrame((d.T[i] for d in minute_data), - index=columns, columns=dts).T - df = minute_to_session(minute_frame, self._calendar) - frames.append(df) - return frames - - @property - def trading_calendar(self): - return self._calendar - - def load_raw_arrays(self, columns, start_dt, end_dt, sids): + dts = self._calendar.minutes_in_range(start_dt, end_dt).values sessions = self._calendar.sessions_in_range(start_dt, end_dt) - range_open, _ = self._calendar.open_and_close_for_session( - start_dt) - _, range_close = self._calendar.open_and_close_for_session( - end_dt) - shape = len(sessions), len(sids) + m_closes = np.zeros(len(sessions), dtype=np.dtype('datetime64[ns]')) + for i, s in enumerate(sessions): + close = self._calendar.open_and_close_for_session(s)[1] + m_closes[i] = close.value + m_locs = np.searchsorted(dts, m_closes) results = [] + shape = (len(sessions), len(assets)) for col in columns: if col != 'volume': out = np.full(shape, np.nan) else: out = np.zeros(shape, dtype=np.uint32) results.append(out) - frames = 
self._get_resampled(columns, range_open, range_close, sids) - for i, result in enumerate(results): - for j, frame in enumerate(frames): - result[:, j] = frame.values[:, i] + for i in range(len(assets)): + for j, column in enumerate(columns): + data = minute_data[j][:, i] + minute_to_session(column, m_locs, data, results[j][:, i]) return results + @property + def trading_calendar(self): + return self._calendar + + def load_raw_arrays(self, columns, start_dt, end_dt, sids): + range_open, _ = self._calendar.open_and_close_for_session( + start_dt) + _, range_close = self._calendar.open_and_close_for_session( + end_dt) + return self._get_resampled(columns, range_open, range_close, sids) + def get_value(self, sid, session, colname): # WARNING: This will need caching or other optimization if used in a # tight loop. # This was developed to complete interface, but has not been tuned # for real world use. start, end = self._calendar.open_and_close_for_session(session) - frame = self._get_resampled([colname], start, end, [sid])[0] - return frame.loc[session, colname] + return self._get_resampled([colname], start, end, [sid])[0] @lazyval def sessions(self): diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index ed5ad62f..b7435bbe 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -15,7 +15,7 @@ from .core import ( ) from ..data.data_portal import DataPortal from ..data.resample import ( - minute_to_session, + minute_frame_to_session_frame, MinuteResampleSessionBarReader ) from ..data.us_equity_pricing import ( @@ -679,8 +679,9 @@ class WithEquityDailyBarData(WithTradingEnvironment): assets = cls.asset_finder.retrieve_all(cls.asset_finder.equities_sids) minute_data = dict(cls.make_equity_minute_bar_data()) for asset in assets: - yield asset.sid, minute_to_session(minute_data[asset.sid], - cls.trading_calendars[Equity]) + yield asset.sid, minute_frame_to_session_frame( + minute_data[asset.sid], + cls.trading_calendars[Equity]) 
@classmethod def make_equity_daily_bar_data(cls):