From 4a017ef63b4593c048c78844edbf0979e9a6d4e8 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Wed, 17 Aug 2016 13:06:07 -0400 Subject: [PATCH] ENH: Session bar reader resampled from minute data Implement a `SessionBarReader` which uses a minute bar reader as a backing source, resampling the minute bars into the box around the corresponding session data. Also, add future/CME test cases to resample suite. --- tests/data/test_resample.py | 152 +++++++++++++++++++++++++++++++++++- zipline/data/resample.py | 58 ++++++++++++-- zipline/testing/fixtures.py | 5 +- 3 files changed, 204 insertions(+), 11 deletions(-) diff --git a/tests/data/test_resample.py b/tests/data/test_resample.py index 9326613a..dd351c7d 100644 --- a/tests/data/test_resample.py +++ b/tests/data/test_resample.py @@ -23,12 +23,14 @@ from six import iteritems from zipline.data.resample import ( minute_to_session, - DailyHistoryAggregator + DailyHistoryAggregator, + MinuteResampleSessionBarReader, ) from zipline.testing.fixtures import ( WithEquityMinuteBarData, WithBcolzEquityMinuteBarReader, + WithBcolzFutureMinuteBarReader, ZiplineTestCase, ) @@ -56,6 +58,26 @@ NYSE_MINUTES = OrderedDict(( )) +CME_MINUTES = OrderedDict(( + ('day_0_front', pd.date_range('2016-03-15 18:01', + '2016-03-15 18:03', + freq='min', + tz='US/Eastern').tz_convert('UTC')), + ('day_0_back', pd.date_range('2016-03-16 17:58', + '2016-03-16 18:00', + freq='min', + tz='US/Eastern').tz_convert('UTC')), + ('day_1_front', pd.date_range('2016-03-16 18:01', + '2016-03-16 18:03', + freq='min', + tz='US/Eastern').tz_convert('UTC')), + ('day_1_back', pd.date_range('2016-03-17 17:58', + '2016-03-17 18:00', + freq='min', + tz='US/Eastern').tz_convert('UTC')), +)) + + SCENARIOS = OrderedDict(( ('none_missing', array([ [101.5, 101.9, 101.1, 101.3, 1001], @@ -103,6 +125,24 @@ for sid, combos in _EQUITY_CASES: for s, m in combos] EQUITY_CASES[sid] = pd.concat(frames) +_FUTURE_CASES = ( + (1001, (('none_missing', 'day_0_front'), + ('none_missing', 
'day_0_back'))), + (1002, (('missing_first', 'day_0_front'), + ('none_missing', 'day_0_back'))), + (1003, (('missing_last', 'day_0_back'), + ('missing_first', 'day_1_front'))), +) + +FUTURE_CASES = OrderedDict() + +for sid, combos in _FUTURE_CASES: + frames = [DataFrame(SCENARIOS[s], columns=OHLCV). + set_index(CME_MINUTES[m]) + for s, m in combos] + FUTURE_CASES[sid] = pd.concat(frames) + + EXPECTED_AGGREGATION = { 1: DataFrame({ 'open': [101.5, 101.5, 101.5, 101.5, 101.5, 101.5], @@ -126,18 +166,50 @@ EXPECTED_AGGREGATION = { 'close': [107.3, 108.3, 108.3, nan, 103.3, 102.3], 'volume': [1007, 2015, 2015, 0, 1003, 2005], }, columns=OHLCV), + 1001: DataFrame({ + 'open': [101.5, 101.5, 101.5, 101.5, 101.5, 101.5], + 'high': [101.9, 103.9, 103.9, 103.9, 103.9, 103.9], + 'low': [101.1, 101.1, 101.1, 101.1, 101.1, 101.1], + 'close': [101.3, 103.3, 102.3, 101.3, 103.3, 102.3], + 'volume': [1001, 2004, 3006, 4007, 5010, 6012], + }, columns=OHLCV), + 1002: DataFrame({ + 'open': [nan, 103.5, 103.5, 103.5, 103.5, 103.5], + 'high': [nan, 103.9, 103.9, 103.9, 103.9, 103.9], + 'low': [nan, 103.1, 102.1, 101.1, 101.1, 101.1], + 'close': [nan, 103.3, 102.3, 101.3, 103.3, 102.3], + 'volume': [0, 1003, 2005, 3006, 4009, 5011], + }, columns=OHLCV), + # Future 1003 straddles two sessions.
+ 1003: DataFrame({ + 'open': [107.5, 107.5, 107.5, nan, 103.5, 103.5], + 'high': [107.9, 108.9, 108.9, nan, 103.9, 103.9], + 'low': [107.1, 107.1, 107.1, nan, 103.1, 102.1], + 'close': [107.3, 108.3, 108.3, nan, 103.3, 102.3], + 'volume': [1007, 2015, 2015, 0, 1003, 2005], + }, columns=OHLCV), } EXPECTED_SESSIONS = { 1: DataFrame([EXPECTED_AGGREGATION[1].iloc[-1].values], columns=OHLCV, - index=['2016-03-15']), + index=pd.to_datetime(['2016-03-15'], utc=True)), 2: DataFrame([EXPECTED_AGGREGATION[2].iloc[-1].values], columns=OHLCV, - index=['2016-03-15']), + index=pd.to_datetime(['2016-03-15'], utc=True)), 3: DataFrame(EXPECTED_AGGREGATION[3].iloc[[2, 5]].values, columns=OHLCV, - index=['2016-03-15', '2016-03-16']), + index=pd.to_datetime(['2016-03-15', '2016-03-16'], utc=True)), + 1001: DataFrame([EXPECTED_AGGREGATION[1001].iloc[-1].values], + columns=OHLCV, + index=pd.to_datetime(['2016-03-16'], utc=True)), + 1002: DataFrame([EXPECTED_AGGREGATION[1002].iloc[-1].values], + columns=OHLCV, + index=pd.to_datetime(['2016-03-16'], utc=True)), + 1003: DataFrame(EXPECTED_AGGREGATION[1003].iloc[[2, 5]].values, + columns=OHLCV, + index=pd.to_datetime(['2016-03-16', '2016-03-17'], + utc=True)) } @@ -383,3 +455,75 @@ class TestMinuteToSession(WithEquityMinuteBarData, assert_almost_equal(expected.values, result.values, err_msg='sid={0}'.format(sid)) + + +class TestResampleSessionBars(WithBcolzFutureMinuteBarReader, + ZiplineTestCase): + + TRADING_CALENDAR_STRS = ('CME',) + TRADING_CALENDAR_PRIMARY_CAL = 'CME' + + ASSET_FINDER_FUTURE_SIDS = 1001, 1002, 1003 + + START_DATE = pd.Timestamp('2016-03-16', tz='UTC') + END_DATE = pd.Timestamp('2016-03-17', tz='UTC') + NUM_SESSIONS = 2 + + @classmethod + def make_future_minute_bar_data(cls): + for sid in cls.ASSET_FINDER_FUTURE_SIDS: + frame = FUTURE_CASES[sid] + yield sid, frame + + def test_resample(self): + calendar = self.trading_calendar + session_bar_reader = MinuteResampleSessionBarReader( + calendar, + 
self.bcolz_future_minute_bar_reader + ) + for sid in self.ASSET_FINDER_FUTURE_SIDS: + case_frame = FUTURE_CASES[sid] + first = case_frame.index[0] + last = case_frame.index[-1] + result = session_bar_reader.load_raw_arrays( + ['open', 'high', 'low', 'close', 'volume'], + first, last, [sid]) + assert_almost_equal(result, EXPECTED_SESSIONS[sid], + err_msg="sid={0}".format(sid)) + + def test_sessions(self): + calendar = self.trading_calendar + session_bar_reader = MinuteResampleSessionBarReader( + calendar, + self.bcolz_future_minute_bar_reader + ) + sessions = session_bar_reader.sessions + + self.assertEqual(self.NUM_SESSIONS, len(sessions)) + self.assertEqual(self.START_DATE, sessions[0]) + self.assertEqual(self.END_DATE, sessions[-1]) + + def test_last_available_dt(self): + calendar = self.trading_calendar + session_bar_reader = MinuteResampleSessionBarReader( + calendar, + self.bcolz_future_minute_bar_reader + ) + + self.assertEqual(self.END_DATE, session_bar_reader.last_available_dt) + + def test_spot_price(self): + calendar = self.trading_calendar + session_bar_reader = MinuteResampleSessionBarReader( + calendar, + self.bcolz_future_minute_bar_reader + ) + for sid in self.ASSET_FINDER_FUTURE_SIDS: + expected = EXPECTED_SESSIONS[sid] + for dt_str, values in expected.iterrows(): + dt = pd.Timestamp(dt_str, tz='UTC') + for col in OHLCV: + result = session_bar_reader.spot_price(sid, dt, col) + assert_almost_equal(values[col], result, + err_msg="sid={0} col={1} dt={2}". 
+ format(sid, col, dt)) diff --git a/zipline/data/resample.py b/zipline/data/resample.py index 05d223e5..170616da 100644 --- a/zipline/data/resample.py +++ b/zipline/data/resample.py @@ -15,6 +15,10 @@ from collections import OrderedDict import numpy as np import pandas as pd +from pandas import DataFrame + +from zipline.data.session_bars import SessionBarReader +from zipline.utils.memoize import lazyval _MINUTE_TO_SESSION_OHCLV_HOW = OrderedDict(( ('open', 'first'), @@ -45,11 +49,10 @@ def minute_to_session(minute_frame, calendar): A DataFrame with the columns `open`, `high`, `low`, `close`, `volume`, and `day` (datetime-like). """ - # Group minutes into their respective days. Note that this will - # create groups for all trading days in the desired range, - # including days with no minute data. - return minute_frame.resample(calendar.day, - how=_MINUTE_TO_SESSION_OHCLV_HOW) + how = OrderedDict((c, _MINUTE_TO_SESSION_OHCLV_HOW[c]) + for c in minute_frame.columns) + return minute_frame.groupby(calendar.minute_to_session_label).agg( + how) class DailyHistoryAggregator(object): @@ -450,3 +453,48 @@ class DailyHistoryAggregator(object): volumes.append(val) continue return np.array(volumes) + + +class MinuteResampleSessionBarReader(SessionBarReader): + + def __init__(self, calendar, minute_bar_reader): + self._calendar = calendar + self._minute_bar_reader = minute_bar_reader + + def _get_resampled(self, columns, start_dt, end_dt, assets): + minute_data = self._minute_bar_reader.load_raw_arrays( + columns, start_dt, end_dt, assets) + dts = self._calendar.minutes_in_range(start_dt, end_dt) + minute_frame = DataFrame( + [d.T[0] for d in minute_data], index=columns, columns=dts).T + return minute_to_session(minute_frame, self._calendar) + + @property + def trading_calendar(self): + return self._calendar + + def load_raw_arrays(self, columns, start_dt, end_dt, assets): + return self._get_resampled(columns, start_dt, end_dt, assets).values + + def spot_price(self, sid, 
session, colname): + # WARNING: This will need caching or other optimization if used in a + # tight loop. + # This was developed to complete the interface, but has not been tuned + # for real-world use. + start, end = self._calendar.open_and_close_for_session(session) + frame = self._get_resampled([colname], start, end, [sid]) + return frame.loc[session, colname] + + @lazyval + def sessions(self): + cal = self._calendar + first = self._minute_bar_reader.first_trading_day + last = cal.minute_to_session_label( + self._minute_bar_reader.last_available_dt) + return cal.sessions_in_range(first, last) + + @lazyval + def last_available_dt(self): + return self.trading_calendar.minute_to_session_label( + self._minute_bar_reader.last_available_dt + ) diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index b2d440c3..fbbda278 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -393,11 +393,10 @@ class WithTradingCalendars(object): TRADING_CALENDAR_STRS = ('NYSE',) TRADING_CALENDAR_FOR_ASSET_TYPE = {Equity: 'NYSE'} TRADING_CALENDAR_FOR_EXCHANGE = {} - # For backwards compatibility, exisitng tests and fixtures refer to # `trading_calendar` with the assumption that the value is the NYSE # calendar. - trading_calendar = alias('nyse_calendar') + TRADING_CALENDAR_PRIMARY_CAL = 'NYSE' @classmethod def init_class_fixtures(cls): @@ -418,6 +417,8 @@ class WithTradingCalendars(object): for exchange, cal_str in iteritems(cls.TRADING_CALENDAR_FOR_EXCHANGE): register_calendar(exchange, get_calendar(cal_str)) cls.trading_calendars[exchange] = get_calendar(cal_str) + cls.trading_calendar = cls.trading_calendars[ + cls.TRADING_CALENDAR_PRIMARY_CAL] class WithTradingEnvironment(WithAssetFinder, WithTradingCalendars):