PERF: Speedup minute to session sampling.

The minute to session sampling reading was creating two DataFrame
objects, the first to hold the minute data, and then a second returned
by the `DataFrame.groupby` to sample down to sessions.

Instead use the arrays returned by the minute readers `load_raw_arrays`
and implement sampling logic which takes advantage that the minutes
being passed start with the first minute of the first session and end
with the last minute of the last session.

On my machine this takes the tests in `test/test_continuous_futures`
from ~4.0 to about ~0.1 seconds.
This commit is contained in:
Eddie Hebert
2016-10-20 13:43:54 -04:00
parent eff2428068
commit e82fef41dd
5 changed files with 188 additions and 34 deletions
+4
View File
@@ -103,6 +103,10 @@ ext_modules = [
'zipline.utils.calendars._calendar_helpers',
['zipline/utils/calendars/_calendar_helpers.pyx']
),
Extension(
'zipline.data._resample',
['zipline/data/_resample.pyx']
),
]
+6 -4
View File
@@ -23,7 +23,7 @@ from six import iteritems
from zipline.data.bar_reader import NoDataOnDate
from zipline.data.resample import (
minute_to_session,
minute_frame_to_session_frame,
DailyHistoryAggregator,
MinuteResampleSessionBarReader,
ReindexMinuteBarReader,
@@ -501,7 +501,7 @@ class TestMinuteToSession(WithEquityMinuteBarData,
for sid in self.ASSET_FINDER_EQUITY_SIDS:
frame = self.equity_frames[sid]
expected = EXPECTED_SESSIONS[sid]
result = minute_to_session(frame, self.nyse_calendar)
result = minute_frame_to_session_frame(frame, self.nyse_calendar)
assert_almost_equal(expected.values,
result.values,
err_msg='sid={0}'.format(sid))
@@ -557,7 +557,8 @@ class TestResampleSessionBars(WithBcolzFutureMinuteBarReader,
OHLCV, first, last, [sid])
for i, field in enumerate(OHLCV):
assert_almost_equal(
result[i], EXPECTED_SESSIONS[sid][[field]],
EXPECTED_SESSIONS[sid][[field]],
result[i],
err_msg="sid={0} field={1}".format(sid, field))
def test_sessions(self):
@@ -588,7 +589,8 @@ class TestResampleSessionBars(WithBcolzFutureMinuteBarReader,
dt = pd.Timestamp(dt_str, tz='UTC')
for col in OHLCV:
result = session_bar_reader.get_value(sid, dt, col)
assert_almost_equal(values[col], result,
assert_almost_equal(result,
values[col],
err_msg="sid={0} col={1} dt={2}".
format(sid, col, dt))
+108
View File
@@ -0,0 +1,108 @@
# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cython cimport boundscheck, wraparound
from numpy import finfo, float64, nan, isnan
from numpy cimport intp_t, float64_t, uint32_t
@boundscheck(False)
@wraparound(False)
cpdef void _minute_to_session_open(intp_t[:] close_locs,
float64_t[:] data,
float64_t[:] out):
cdef intp_t i, close_loc, loc = 0
cdef float64_t val
for i, close_loc in enumerate(close_locs):
val = data[loc]
while isnan(val) and loc <= close_loc:
loc += 1
val = data[loc]
out[i] = val
loc = close_loc + 1
@boundscheck(False)
@wraparound(False)
cpdef void _minute_to_session_high(intp_t[:] close_locs,
float64_t[:] data,
float64_t[:] out):
cdef intp_t i, close_loc, loc = 0
cdef float64_t val
for i, close_loc in enumerate(close_locs):
val = -1
while loc <= close_loc:
val = max(val, data[loc])
loc += 1
if val == -1:
val = nan
out[i] = val
loc = close_loc + 1
@boundscheck(False)
@wraparound(False)
cpdef void _minute_to_session_low(intp_t[:] close_locs,
float64_t[:] data,
float64_t[:] out):
cdef intp_t i, close_loc, loc = 0
cdef float64_t val
cdef float64_t max_float = finfo(float64).max
for i, close_loc in enumerate(close_locs):
val = max_float
while loc <= close_loc:
val = min(val, data[loc])
loc += 1
if val == max_float:
val = nan
out[i] = val
loc = close_loc + 1
@boundscheck(False)
@wraparound(False)
cpdef void _minute_to_session_close(intp_t[:] close_locs,
float64_t[:] data,
float64_t[:] out):
cdef intp_t i, close_loc, loc = 0
cdef float64_t val
loc = len(data) - 1
num_out = len(close_locs)
for j in range(num_out, 0, -1):
i = j - 1
if i > 0:
close_loc = close_locs[i - 1]
else:
close_loc = -1
val = data[loc]
while isnan(val) and loc > close_loc:
loc -= 1
val = data[loc]
out[i] = val
loc = close_loc
@boundscheck(False)
@wraparound(False)
cpdef void _minute_to_session_volume(intp_t[:] close_locs,
uint32_t[:] data,
uint32_t[:] out):
cdef intp_t i, close_loc, loc = 0
cdef uint32_t val
loc = 0
for i, close_loc in enumerate(close_locs):
val = 0
while loc <= close_loc:
val += data[loc]
loc += 1
out[i] = val
loc = close_loc + 1
+66 -27
View File
@@ -16,9 +16,15 @@ from abc import ABCMeta, abstractmethod
import numpy as np
import pandas as pd
from pandas import DataFrame
from six import with_metaclass
from zipline.data._resample import (
_minute_to_session_open,
_minute_to_session_high,
_minute_to_session_low,
_minute_to_session_close,
_minute_to_session_volume,
)
from zipline.data.minute_bars import MinuteBarReader
from zipline.data.session_bars import SessionBarReader
from zipline.utils.memoize import lazyval
@@ -32,7 +38,8 @@ _MINUTE_TO_SESSION_OHCLV_HOW = OrderedDict((
))
def minute_to_session(minute_frame, calendar):
def minute_frame_to_session_frame(minute_frame, calendar):
"""
Resample a DataFrame with minute data into the frame expected by a
BcolzDailyBarWriter.
@@ -57,6 +64,40 @@ def minute_to_session(minute_frame, calendar):
return minute_frame.groupby(calendar.minute_to_session_label).agg(how)
def minute_to_session(column, close_locs, data, out):
"""
Resample an array with minute data into an array with session data.
This function assumes that the minute data is the exact length of all
minutes in the sessions in the output.
Parameters
----------
column : str
The `open`, `high`, `low`, `close`, or `volume` column.
close_locs : array[intp]
The locations in `data` which are the market close minutes.
data : array[float64|uint32]
The minute data to be sampled into session data.
The first value should align with the market open of the first session,
containing values for all minutes for all sessions. With the last value
being the market close of the last session.
out : array[float64|uint32]
The output array into which to write the sampled sessions.
"""
if column == 'open':
_minute_to_session_open(close_locs, data, out)
elif column == 'high':
_minute_to_session_high(close_locs, data, out)
elif column == 'low':
_minute_to_session_low(close_locs, data, out)
elif column == 'close':
_minute_to_session_close(close_locs, data, out)
elif column == 'volume':
_minute_to_session_volume(close_locs, data, out)
return out
class DailyHistoryAggregator(object):
"""
Converts minute pricing data into a daily summary, to be used for the
@@ -462,47 +503,45 @@ class MinuteResampleSessionBarReader(SessionBarReader):
def _get_resampled(self, columns, start_dt, end_dt, assets):
minute_data = self._minute_bar_reader.load_raw_arrays(
columns, start_dt, end_dt, assets)
dts = self._calendar.minutes_in_range(start_dt, end_dt)
frames = []
for i, _ in enumerate(assets):
minute_frame = DataFrame((d.T[i] for d in minute_data),
index=columns, columns=dts).T
df = minute_to_session(minute_frame, self._calendar)
frames.append(df)
return frames
@property
def trading_calendar(self):
return self._calendar
def load_raw_arrays(self, columns, start_dt, end_dt, sids):
dts = self._calendar.minutes_in_range(start_dt, end_dt).values
sessions = self._calendar.sessions_in_range(start_dt, end_dt)
range_open, _ = self._calendar.open_and_close_for_session(
start_dt)
_, range_close = self._calendar.open_and_close_for_session(
end_dt)
shape = len(sessions), len(sids)
m_closes = np.zeros(len(sessions), dtype=np.dtype('datetime64[ns]'))
for i, s in enumerate(sessions):
close = self._calendar.open_and_close_for_session(s)[1]
m_closes[i] = close.value
m_locs = np.searchsorted(dts, m_closes)
results = []
shape = (len(sessions), len(assets))
for col in columns:
if col != 'volume':
out = np.full(shape, np.nan)
else:
out = np.zeros(shape, dtype=np.uint32)
results.append(out)
frames = self._get_resampled(columns, range_open, range_close, sids)
for i, result in enumerate(results):
for j, frame in enumerate(frames):
result[:, j] = frame.values[:, i]
for i in range(len(assets)):
for j, column in enumerate(columns):
data = minute_data[j][:, i]
minute_to_session(column, m_locs, data, results[j][:, i])
return results
@property
def trading_calendar(self):
return self._calendar
def load_raw_arrays(self, columns, start_dt, end_dt, sids):
range_open, _ = self._calendar.open_and_close_for_session(
start_dt)
_, range_close = self._calendar.open_and_close_for_session(
end_dt)
return self._get_resampled(columns, range_open, range_close, sids)
def get_value(self, sid, session, colname):
# WARNING: This will need caching or other optimization if used in a
# tight loop.
# This was developed to complete interface, but has not been tuned
# for real world use.
start, end = self._calendar.open_and_close_for_session(session)
frame = self._get_resampled([colname], start, end, [sid])[0]
return frame.loc[session, colname]
return self._get_resampled([colname], start, end, [sid])[0]
@lazyval
def sessions(self):
+4 -3
View File
@@ -15,7 +15,7 @@ from .core import (
)
from ..data.data_portal import DataPortal
from ..data.resample import (
minute_to_session,
minute_frame_to_session_frame,
MinuteResampleSessionBarReader
)
from ..data.us_equity_pricing import (
@@ -679,8 +679,9 @@ class WithEquityDailyBarData(WithTradingEnvironment):
assets = cls.asset_finder.retrieve_all(cls.asset_finder.equities_sids)
minute_data = dict(cls.make_equity_minute_bar_data())
for asset in assets:
yield asset.sid, minute_to_session(minute_data[asset.sid],
cls.trading_calendars[Equity])
yield asset.sid, minute_frame_to_session_frame(
minute_data[asset.sid],
cls.trading_calendars[Equity])
@classmethod
def make_equity_daily_bar_data(cls):