diff --git a/setup.py b/setup.py index 9cf59e00..cd3b79cc 100644 --- a/setup.py +++ b/setup.py @@ -103,6 +103,10 @@ ext_modules = [ 'zipline.utils.calendars._calendar_helpers', ['zipline/utils/calendars/_calendar_helpers.pyx'] ), + Extension( + 'zipline.data._resample', + ['zipline/data/_resample.pyx'] + ), ] diff --git a/tests/data/test_resample.py b/tests/data/test_resample.py index debd2625..db8d2102 100644 --- a/tests/data/test_resample.py +++ b/tests/data/test_resample.py @@ -23,7 +23,7 @@ from six import iteritems from zipline.data.bar_reader import NoDataOnDate from zipline.data.resample import ( - minute_to_session, + minute_frame_to_session_frame, DailyHistoryAggregator, MinuteResampleSessionBarReader, ReindexMinuteBarReader, @@ -501,7 +501,7 @@ class TestMinuteToSession(WithEquityMinuteBarData, for sid in self.ASSET_FINDER_EQUITY_SIDS: frame = self.equity_frames[sid] expected = EXPECTED_SESSIONS[sid] - result = minute_to_session(frame, self.nyse_calendar) + result = minute_frame_to_session_frame(frame, self.nyse_calendar) assert_almost_equal(expected.values, result.values, err_msg='sid={0}'.format(sid)) @@ -557,7 +557,8 @@ class TestResampleSessionBars(WithBcolzFutureMinuteBarReader, OHLCV, first, last, [sid]) for i, field in enumerate(OHLCV): assert_almost_equal( - result[i], EXPECTED_SESSIONS[sid][[field]], + EXPECTED_SESSIONS[sid][[field]], + result[i], err_msg="sid={0} field={1}".format(sid, field)) def test_sessions(self): @@ -588,7 +589,8 @@ class TestResampleSessionBars(WithBcolzFutureMinuteBarReader, dt = pd.Timestamp(dt_str, tz='UTC') for col in OHLCV: result = session_bar_reader.get_value(sid, dt, col) - assert_almost_equal(values[col], result, + assert_almost_equal(result, + values[col], err_msg="sid={0} col={1} dt={2}". format(sid, col, dt)) diff --git a/zipline/data/_resample.pyx b/zipline/data/_resample.pyx new file mode 100644 index 00000000..ce87e9d6 --- /dev/null +++ b/zipline/data/_resample.pyx @@ -0,0 +1,108 @@ +# Copyright 2016 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from cython cimport boundscheck, wraparound +from numpy import finfo, float64, nan, isnan +from numpy cimport intp_t, float64_t, uint32_t + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_open(intp_t[:] close_locs, + float64_t[:] data, + float64_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef float64_t val + for i, close_loc in enumerate(close_locs): + val = data[loc] + while isnan(val) and loc <= close_loc: + loc += 1 + val = data[loc] + out[i] = val + loc = close_loc + 1 + + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_high(intp_t[:] close_locs, + float64_t[:] data, + float64_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef float64_t val + for i, close_loc in enumerate(close_locs): + val = -1 + while loc <= close_loc: + val = max(val, data[loc]) + loc += 1 + if val == -1: + val = nan + out[i] = val + loc = close_loc + 1 + + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_low(intp_t[:] close_locs, + float64_t[:] data, + float64_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef float64_t val + cdef float64_t max_float = finfo(float64).max + for i, close_loc in enumerate(close_locs): + val = max_float + while loc <= close_loc: + val = min(val, data[loc]) + loc += 1 + if val == max_float: + val = nan + out[i] = val + loc = close_loc + 1 + + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_close(intp_t[:] close_locs, + float64_t[:] data, + float64_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef float64_t val + loc = len(data) - 1 + num_out = len(close_locs) + for j in range(num_out, 0, -1): + i = j - 1 + if i > 0: + close_loc = close_locs[i - 1] + else: + close_loc = -1 + val = data[loc] + while isnan(val) and loc > close_loc: + loc -= 1 + val = data[loc] + out[i] = val + loc = close_loc + + +@boundscheck(False) +@wraparound(False) +cpdef void _minute_to_session_volume(intp_t[:] close_locs, + uint32_t[:] data, + uint32_t[:] out): + cdef intp_t i, close_loc, loc = 0 + cdef uint32_t val + loc = 0 + for i, close_loc in enumerate(close_locs): + val = 0 + while loc <= close_loc: + val += data[loc] + loc += 1 + out[i] = val + loc = close_loc + 1 diff --git a/zipline/data/resample.py b/zipline/data/resample.py index 38e4cb1d..aa87e594 100644 --- a/zipline/data/resample.py +++ b/zipline/data/resample.py @@ -16,9 +16,15 @@ from abc import ABCMeta, abstractmethod import numpy as np import pandas as pd -from pandas import DataFrame from six import with_metaclass +from zipline.data._resample import ( + _minute_to_session_open, + _minute_to_session_high, + _minute_to_session_low, + _minute_to_session_close, + _minute_to_session_volume, +) from zipline.data.minute_bars import MinuteBarReader from zipline.data.session_bars import SessionBarReader from zipline.utils.memoize import lazyval @@ -32,7 +38,8 @@ _MINUTE_TO_SESSION_OHCLV_HOW = OrderedDict(( )) -def minute_to_session(minute_frame, calendar): +def minute_frame_to_session_frame(minute_frame, calendar): + """ Resample a DataFrame with minute data into the frame expected by a BcolzDailyBarWriter. @@ -57,6 +64,40 @@ def minute_to_session(minute_frame, calendar): return minute_frame.groupby(calendar.minute_to_session_label).agg(how) +def minute_to_session(column, close_locs, data, out): + """ + Resample an array with minute data into an array with session data. + + This function assumes that the minute data is the exact length of all + minutes in the sessions in the output. + + Parameters + ---------- + column : str + The `open`, `high`, `low`, `close`, or `volume` column. + close_locs : array[intp] + The locations in `data` which are the market close minutes. + data : array[float64|uint32] + The minute data to be sampled into session data. + The first value should align with the market open of the first session, + containing values for all minutes for all sessions. With the last value + being the market close of the last session. + out : array[float64|uint32] + The output array into which to write the sampled sessions. + """ + if column == 'open': + _minute_to_session_open(close_locs, data, out) + elif column == 'high': + _minute_to_session_high(close_locs, data, out) + elif column == 'low': + _minute_to_session_low(close_locs, data, out) + elif column == 'close': + _minute_to_session_close(close_locs, data, out) + elif column == 'volume': + _minute_to_session_volume(close_locs, data, out) + return out + + class DailyHistoryAggregator(object): """ Converts minute pricing data into a daily summary, to be used for the @@ -462,47 +503,45 @@ class MinuteResampleSessionBarReader(SessionBarReader): def _get_resampled(self, columns, start_dt, end_dt, assets): minute_data = self._minute_bar_reader.load_raw_arrays( columns, start_dt, end_dt, assets) - dts = self._calendar.minutes_in_range(start_dt, end_dt) - frames = [] - for i, _ in enumerate(assets): - minute_frame = DataFrame((d.T[i] for d in minute_data), - index=columns, columns=dts).T - df = minute_to_session(minute_frame, self._calendar) - frames.append(df) - return frames - - @property - def trading_calendar(self): - return self._calendar - - def load_raw_arrays(self, columns, start_dt, end_dt, sids): + dts = self._calendar.minutes_in_range(start_dt, end_dt).values sessions = self._calendar.sessions_in_range(start_dt, end_dt) - range_open, _ = self._calendar.open_and_close_for_session( - start_dt) - _, range_close = self._calendar.open_and_close_for_session( - end_dt) - shape = len(sessions), len(sids) + m_closes = np.zeros(len(sessions), dtype=np.dtype('datetime64[ns]')) + for i, s in enumerate(sessions): + close = self._calendar.open_and_close_for_session(s)[1] + m_closes[i] = close.value + m_locs = np.searchsorted(dts, m_closes) results = [] + shape = (len(sessions), len(assets)) for col in columns: if col != 'volume': out = np.full(shape, np.nan) else: out = np.zeros(shape, dtype=np.uint32) results.append(out) - frames = self._get_resampled(columns, range_open, range_close, sids) - for i, result in enumerate(results): - for j, frame in enumerate(frames): - result[:, j] = frame.values[:, i] + for i in range(len(assets)): + for j, column in enumerate(columns): + data = minute_data[j][:, i] + minute_to_session(column, m_locs, data, results[j][:, i]) return results + @property + def trading_calendar(self): + return self._calendar + + def load_raw_arrays(self, columns, start_dt, end_dt, sids): + range_open, _ = self._calendar.open_and_close_for_session( + start_dt) + _, range_close = self._calendar.open_and_close_for_session( + end_dt) + return self._get_resampled(columns, range_open, range_close, sids) + def get_value(self, sid, session, colname): # WARNING: This will need caching or other optimization if used in a # tight loop. # This was developed to complete interface, but has not been tuned # for real world use. start, end = self._calendar.open_and_close_for_session(session) - frame = self._get_resampled([colname], start, end, [sid])[0] - return frame.loc[session, colname] + return self._get_resampled([colname], start, end, [sid])[0] @lazyval def sessions(self): diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index ed5ad62f..b7435bbe 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -15,7 +15,7 @@ from .core import ( ) from ..data.data_portal import DataPortal from ..data.resample import ( - minute_to_session, + minute_frame_to_session_frame, MinuteResampleSessionBarReader ) from ..data.us_equity_pricing import ( @@ -679,8 +679,9 @@ class WithEquityDailyBarData(WithTradingEnvironment): assets = cls.asset_finder.retrieve_all(cls.asset_finder.equities_sids) minute_data = dict(cls.make_equity_minute_bar_data()) for asset in assets: - yield asset.sid, minute_to_session(minute_data[asset.sid], - cls.trading_calendars[Equity]) + yield asset.sid, minute_frame_to_session_frame( + minute_data[asset.sid], + cls.trading_calendars[Equity]) @classmethod def make_equity_daily_bar_data(cls):