diff --git a/tests/data/test_dispatch_bar_reader.py b/tests/data/test_dispatch_bar_reader.py new file mode 100644 index 00000000..a034b2fa --- /dev/null +++ b/tests/data/test_dispatch_bar_reader.py @@ -0,0 +1,332 @@ +# Copyright 2016 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from numpy import array, nan +from numpy.testing import assert_almost_equal +from pandas import DataFrame, Timestamp + +from zipline.assets import Equity, Future + +from zipline.data.dispatch_bar_reader import ( + AssetDispatchMinuteBarReader, + AssetDispatchSessionBarReader, +) +from zipline.data.resample import ( + MinuteResampleSessionBarReader, + ReindexMinuteBarReader, + ReindexSessionBarReader, +) +from zipline.testing.fixtures import ( + WithBcolzEquityMinuteBarReader, + WithBcolzEquityDailyBarReader, + WithBcolzFutureMinuteBarReader, + WithTradingSessions, + ZiplineTestCase, +) + +OHLC = ['open', 'high', 'low', 'close'] + + +class AssetDispatchSessionBarTestCase(WithBcolzEquityDailyBarReader, + WithBcolzFutureMinuteBarReader, + WithTradingSessions, + ZiplineTestCase): + + TRADING_CALENDAR_STRS = ('CME', 'NYSE') + TRADING_CALENDAR_PRIMARY_CAL = 'CME' + + ASSET_FINDER_EQUITY_SIDS = 1, 2, 3 + + START_DATE = Timestamp('2016-08-22', tz='UTC') + END_DATE = Timestamp('2016-08-24', tz='UTC') + + @classmethod + def make_future_minute_bar_data(cls): + m_opens = [ + cls.trading_calendar.open_and_close_for_session(session)[0] + for session in cls.trading_sessions['CME']] + yield 10001, DataFrame({ + 'open': [10000.5, 10001.5, nan], + 'high': [10000.9, 10001.9, nan], + 'low': [10000.1, 10001.1, nan], + 'close': [10000.3, 10001.3, nan], + 'volume': [1000, 1001, 0], + }, index=m_opens) + yield 10002, DataFrame({ + 'open': [20000.5, nan, 20002.5], + 'high': [20000.9, nan, 20002.9], + 'low': [20000.1, nan, 20002.1], + 'close': [20000.3, nan, 20002.3], + 'volume': [2000, 0, 2002], + }, index=m_opens) + yield 10003, DataFrame({ + 'open': [nan, 30001.5, 30002.5], + 'high': [nan, 30001.9, 30002.9], + 'low': [nan, 30001.1, 30002.1], + 'close': [nan, 30001.3, 30002.3], + 'volume': [0, 3001, 3002], + }, index=m_opens) + + @classmethod + def make_equity_daily_bar_data(cls): + sessions = cls.trading_sessions['NYSE'] + yield 1, DataFrame({ + 'open': [100.5, 101.5, nan], + 'high': [100.9, 101.9, nan], + 'low': [100.1, 101.1, nan], + 'close': [100.3, 101.3, nan], + 'volume': [1000, 1001, 0], + }, index=sessions) + yield 2, DataFrame({ + 'open': [200.5, nan, 202.5], + 'high': [200.9, nan, 202.9], + 'low': [200.1, nan, 202.1], + 'close': [200.3, nan, 202.3], + 'volume': [2000, 0, 2002], + }, index=sessions) + yield 3, DataFrame({ + 'open': [301.5, 302.5, nan], + 'high': [301.9, 302.9, nan], + 'low': [301.1, 302.1, nan], + 'close': [301.3, 302.3, nan], + 'volume': [3001, 3002, 0], + }, index=sessions) + + @classmethod + def make_futures_info(cls): + return DataFrame({ + 'sid': [10001, 10002, 10003], + 'root_symbol': ['FOO', 'BAR', 'BAZ'], + 'symbol': ['FOOA', 'BARA', 'BAZA'], + 'start_date': [cls.START_DATE] * 3, + 'end_date': [cls.END_DATE] * 3, + # TODO: Make separate from 'end_date' + 'notice_date': [cls.END_DATE] * 3, + 'expiration_date': [cls.END_DATE] * 3, + 'multiplier': [500] * 3, + 'exchange': ['CME'] * 3, + }) + + @classmethod + def init_class_fixtures(cls): + super(AssetDispatchSessionBarTestCase, cls).init_class_fixtures() + + readers = { + Equity: ReindexSessionBarReader( + cls.trading_calendar, + cls.bcolz_equity_daily_bar_reader, + cls.START_DATE, + cls.END_DATE), + Future: MinuteResampleSessionBarReader( + cls.trading_calendar, + cls.bcolz_future_minute_bar_reader, + ) + } + cls.dispatch_reader = AssetDispatchSessionBarReader( + cls.trading_calendar, + cls.asset_finder, + readers + ) + + def test_load_raw_arrays(self): + sessions = self.trading_calendar.sessions_in_range( + self.START_DATE, self.END_DATE) + + results = self.dispatch_reader.load_raw_arrays( + ['high', 'volume'], + sessions[0], sessions[2], [2, 10003, 1, 10001]) + + expected_per_sid = ( + (2, [array([200.9, nan, 202.9]), + array([2000, 0, 2002])], + "sid=2 should have values on the first and third sessions."), + (10003, [array([nan, 30001.9, 30002.9]), + array([0, 3001, 3002])], + "sid=10003 should have values on the second and third sessions."), + (1, [array([100.9, 101.90, nan]), + array([1000, 1001, 0])], + "sid=1 should have values on the first and second sessions."), + (10001, [array([10000.9, 10001.9, nan]), + array([1000, 1001, 0])], + "sid=10001 should have a values on the first and second " + "sessions."), + ) + + for i, (sid, expected, msg) in enumerate(expected_per_sid): + for j, result in enumerate(results): + assert_almost_equal(result[:, i], expected[j], err_msg=msg) + + +class AssetDispatchMinuteBarTestCase(WithBcolzEquityMinuteBarReader, + WithBcolzFutureMinuteBarReader, + ZiplineTestCase): + + TRADING_CALENDAR_STRS = ('CME', 'NYSE') + TRADING_CALENDAR_PRIMARY_CAL = 'CME' + + ASSET_FINDER_EQUITY_SIDS = 1, 2, 3 + + START_DATE = Timestamp('2016-08-24', tz='UTC') + END_DATE = Timestamp('2016-08-24', tz='UTC') + + @classmethod + def make_equity_minute_bar_data(cls): + minutes = cls.trading_calendars[Equity].minutes_for_session( + cls.START_DATE) + yield 1, DataFrame({ + 'open': [100.5, 101.5], + 'high': [100.9, 101.9], + 'low': [100.1, 101.1], + 'close': [100.3, 101.3], + 'volume': [1000, 1001], + }, index=minutes[[0, 1]]) + yield 2, DataFrame({ + 'open': [200.5, 202.5], + 'high': [200.9, 202.9], + 'low': [200.1, 202.1], + 'close': [200.3, 202.3], + 'volume': [2000, 2002], + }, index=minutes[[0, 2]]) + yield 3, DataFrame({ + 'open': [301.5, 302.5], + 'high': [301.9, 302.9], + 'low': [301.1, 302.1], + 'close': [301.3, 302.3], + 'volume': [3001, 3002], + }, index=minutes[[1, 2]]) + + @classmethod + def make_future_minute_bar_data(cls): + e_m = cls.trading_calendars[Equity].minutes_for_session( + cls.START_DATE) + f_m = cls.trading_calendar.minutes_for_session( + cls.START_DATE) + # Equity market open occurs at loc 930 in Future minutes. + minutes = [f_m[0], e_m[0], e_m[1]] + yield 10001, DataFrame({ + 'open': [10000.5, 10930.5, 10931.5], + 'high': [10000.9, 10930.9, 10931.9], + 'low': [10000.1, 10930.1, 10931.1], + 'close': [10000.3, 10930.3, 10931.3], + 'volume': [1000, 1930, 1931], + }, index=minutes) + minutes = [f_m[1], e_m[1], e_m[2]] + yield 10002, DataFrame({ + 'open': [20001.5, 20931.5, 20932.5], + 'high': [20001.9, 20931.9, 20932.9], + 'low': [20001.1, 20931.1, 20932.1], + 'close': [20001.3, 20931.3, 20932.3], + 'volume': [2001, 2931, 2932], + }, index=minutes) + minutes = [f_m[2], e_m[0], e_m[2]] + yield 10003, DataFrame({ + 'open': [30002.5, 30930.5, 30932.5], + 'high': [30002.9, 30930.9, 30932.9], + 'low': [30002.1, 30930.1, 30932.1], + 'close': [30002.3, 30930.3, 30932.3], + 'volume': [3002, 3930, 3932], + }, index=minutes) + + @classmethod + def make_futures_info(cls): + return DataFrame({ + 'sid': [10001, 10002, 10003], + 'root_symbol': ['FOO', 'BAR', 'BAZ'], + 'symbol': ['FOOA', 'BARA', 'BAZA'], + 'start_date': [cls.START_DATE] * 3, + 'end_date': [cls.END_DATE] * 3, + # TODO: Make separate from 'end_date' + 'notice_date': [cls.END_DATE] * 3, + 'expiration_date': [cls.END_DATE] * 3, + 'multiplier': [500] * 3, + 'exchange': ['CME'] * 3, + }) + + @classmethod + def init_class_fixtures(cls): + super(AssetDispatchMinuteBarTestCase, cls).init_class_fixtures() + + readers = { + Equity: ReindexMinuteBarReader( + cls.trading_calendar, + cls.bcolz_equity_minute_bar_reader, + cls.START_DATE, + cls.END_DATE), + Future: cls.bcolz_future_minute_bar_reader + } + cls.dispatch_reader = AssetDispatchMinuteBarReader( + cls.trading_calendar, + cls.asset_finder, + readers + ) + + def test_load_raw_arrays_at_future_session_open(self): + f_minutes = self.trading_calendar.minutes_for_session(self.START_DATE) + + results = self.dispatch_reader.load_raw_arrays( + ['open', 'close'], + f_minutes[0], f_minutes[2], [2, 10003, 1, 10001]) + + expected_per_sid = ( + (2, [array([nan, nan, nan]), + array([nan, nan, nan])], + "Before Equity market open, sid=2 should have no values."), + (10003, [array([nan, nan, 30002.5]), + array([nan, nan, 30002.3])], + "sid=10003 should have a value at the 22:03 occurring " + "before the session label, which will be the third minute."), + (1, [array([nan, nan, nan]), + array([nan, nan, nan])], + "Before Equity market open, sid=1 should have no values."), + (10001, [array([10000.5, nan, nan]), + array([10000.3, nan, nan])], + "sid=10001 should have a value at the market open."), + ) + + for i, (sid, expected, msg) in enumerate(expected_per_sid): + for j, result in enumerate(results): + assert_almost_equal(result[:, i], expected[j], err_msg=msg) + + results = self.dispatch_reader.load_raw_arrays( + ['open'], f_minutes[0], f_minutes[2], [2, 10003, 1, 10001]) + + def test_load_raw_arrays_at_equity_session_open(self): + e_minutes = self.trading_calendars[Equity].minutes_for_session( + self.START_DATE) + + results = self.dispatch_reader.load_raw_arrays( + ['open', 'high'], e_minutes[0], e_minutes[2], + [10002, 1, 3, 10001]) + + expected_per_sid = ( + (10002, [array([nan, 20931.5, 20932.5]), + array([nan, 20931.9, 20932.9])], + "At Equity market open, sid=10002 should have values at the " + "second and third minute."), + (1, [array([100.5, 101.5, nan]), + array([100.9, 101.9, nan])], + "At Equity market open, sid=1 should have values at the first " + "and second minute."), + (3, [array([nan, 301.5, 302.5]), + array([nan, 301.9, 302.9])], + "At Equity market open, sid=3 should have a values at the second " + "and third minute."), + (10001, [array([10930.5, 10931.5, nan]), + array([10930.9, 10931.9, nan])], + "At Equity market open, sid=10001 should have a values at the " + "first and second minute."), + ) + + for i, (sid, expected, msg) in enumerate(expected_per_sid): + for j, result in enumerate(results): + assert_almost_equal(result[:, i], expected[j], err_msg=msg) diff --git a/tests/data/test_resample.py b/tests/data/test_resample.py index 2629009f..c63ed0e1 100644 --- a/tests/data/test_resample.py +++ b/tests/data/test_resample.py @@ -486,13 +486,16 @@ class TestResampleSessionBars(WithBcolzFutureMinuteBarReader, ) for sid in self.ASSET_FINDER_FUTURE_SIDS: case_frame = FUTURE_CASES[sid] - first = case_frame.index[0] - last = case_frame.index[-1] + first = calendar.minute_to_session_label( + case_frame.index[0]) + last = calendar.minute_to_session_label( + case_frame.index[-1]) result = session_bar_reader.load_raw_arrays( - ['open', 'high', 'low', 'close', 'volume'], - first, last, [sid]) - assert_almost_equal(result, EXPECTED_SESSIONS[sid], - err_msg="sid={0}".format(sid)) + OHLCV, first, last, [sid]) + for i, field in enumerate(OHLCV): + assert_almost_equal( + result[i], EXPECTED_SESSIONS[sid][[field]], + err_msg="sid={0} field={1}".format(sid, field)) def test_sessions(self): calendar = self.trading_calendar diff --git a/zipline/data/dispatch_bar_reader.py b/zipline/data/dispatch_bar_reader.py new file mode 100644 index 00000000..aedf808f --- /dev/null +++ b/zipline/data/dispatch_bar_reader.py @@ -0,0 +1,130 @@ +# +# Copyright 2016 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from abc import ABCMeta, abstractmethod + +from numpy import ( + full, + nan, + uint32, + zeros +) +from six import iteritems, with_metaclass + +from zipline.utils.memoize import lazyval + + +class AssetDispatchBarReader(with_metaclass(ABCMeta)): + """ + + Parameters + ---------- + - trading_calendar : zipline.utils.trading_calendar.TradingCalendar + - asset_finder : zipline.assets.AssetFinder + - readers : dict + A dict mapping Asset type to the corresponding + [Minute|Session]BarReader + """ + def __init__(self, trading_calendar, asset_finder, readers): + self._trading_calendar = trading_calendar + self._asset_finder = asset_finder + self._readers = readers + + for t, r in iteritems(self._readers): + assert trading_calendar == r.trading_calendar, \ + "All readers must share target trading_calendar. " \ + "Reader={0} for type={1} uses calendar={2} which does not " \ + "match the desired shared calendar={3} ".format( + r, t, r.trading_calendar, trading_calendar) + + @abstractmethod + def _dt_window_size(self, start_dt, end_dt): + pass + + @property + def _asset_types(self): + return self._readers.keys() + + def _make_raw_array_shape(self, start_dt, end_dt, num_sids): + return self._dt_window_size(start_dt, end_dt), num_sids + + def _make_raw_array_out(self, field, shape): + if field != 'volume': + out = full(shape, nan) + else: + out = zeros(shape, dtype=uint32) + return out + + @property + def trading_calendar(self): + return self._trading_calendar + + @lazyval + def last_available_dt(self): + return min(r.last_available_dt for r in self._readers.values) + + @lazyval + def first_trading_day(self): + return max(r.first_trading_day for r in self._readers.values) + + def get_value(self, sid, dt, field): + asset = self.asset_finder.retrieve_asset(sid) + r = self._readers[type(asset)] + return r.get_value(sid, dt, field) + + def get_last_traded_dt(self, asset, dt): + r = self._readers[type(asset)] + return r.get_value(asset, dt) + + def load_raw_arrays(self, fields, start_dt, end_dt, sids): + asset_types = self._asset_types + sid_groups = {t: [] for t in asset_types} + out_pos = {t: [] for t in asset_types} + + assets = self._asset_finder.retrieve_all(sids) + + for i, asset in enumerate(assets): + t = type(asset) + sid_groups[t].append(asset.sid) + out_pos[t].append(i) + + batched_arrays = { + t: self._readers[t].load_raw_arrays(fields, + start_dt, + end_dt, + sid_groups[t]) + for t in asset_types if sid_groups[t]} + + results = [] + shape = self._make_raw_array_shape(start_dt, end_dt, len(sids)) + + for i, field in enumerate(fields): + out = self._make_raw_array_out(field, shape) + for t, arrays in iteritems(batched_arrays): + out[:, out_pos[t]] = arrays[i] + results.append(out) + + return results + + +class AssetDispatchMinuteBarReader(AssetDispatchBarReader): + + def _dt_window_size(self, start_dt, end_dt): + return len(self.trading_calendar.minutes_in_range(start_dt, end_dt)) + + +class AssetDispatchSessionBarReader(AssetDispatchBarReader): + + def _dt_window_size(self, start_dt, end_dt): + return len(self.trading_calendar.sessions_in_range(start_dt, end_dt)) diff --git a/zipline/data/minute_bars.py b/zipline/data/minute_bars.py index de5de473..25d8f582 100644 --- a/zipline/data/minute_bars.py +++ b/zipline/data/minute_bars.py @@ -764,8 +764,8 @@ class BcolzMinuteBarWriter(object): table = self._ensure_ctable(sid) tds = self._session_labels - input_first_day = pd.Timestamp(dts[0].astype('datetime64[D]'), - tz='UTC') + input_first_day = self._calendar.minute_to_session_label( + pd.Timestamp(dts[0])) last_date = self.last_date_in_output_for_sid(sid) @@ -888,6 +888,10 @@ class BcolzMinuteBarReader(MinuteBarReader): def _get_metadata(self): return BcolzMinuteBarMetadata.read(self._rootdir) + @property + def trading_calendar(self): + return self.calendar + @lazyval def last_available_dt(self): _, close = self.calendar.open_and_close_for_session(self._end_session) diff --git a/zipline/data/resample.py b/zipline/data/resample.py index 06af49db..c9f4605a 100644 --- a/zipline/data/resample.py +++ b/zipline/data/resample.py @@ -468,16 +468,37 @@ class MinuteResampleSessionBarReader(SessionBarReader): minute_data = self._minute_bar_reader.load_raw_arrays( columns, start_dt, end_dt, assets) dts = self._calendar.minutes_in_range(start_dt, end_dt) - minute_frame = DataFrame( - [d.T[0] for d in minute_data], index=columns, columns=dts).T - return minute_to_session(minute_frame, self._calendar) + frames = [] + for i, _ in enumerate(assets): + minute_frame = DataFrame((d.T[i] for d in minute_data), + index=columns, columns=dts).T + df = minute_to_session(minute_frame, self._calendar) + frames.append(df) + return frames @property def trading_calendar(self): return self._calendar - def load_raw_arrays(self, columns, start_dt, end_dt, assets): - return self._get_resampled(columns, start_dt, end_dt, assets).values + def load_raw_arrays(self, columns, start_dt, end_dt, sids): + sessions = self._calendar.sessions_in_range(start_dt, end_dt) + range_open, _ = self._calendar.open_and_close_for_session( + start_dt) + _, range_close = self._calendar.open_and_close_for_session( + end_dt) + shape = len(sessions), len(sids) + results = [] + for col in columns: + if col != 'volume': + out = np.full(shape, np.nan) + else: + out = np.zeros(shape, dtype=np.uint32) + results.append(out) + frames = self._get_resampled(columns, range_open, range_close, sids) + for i, result in enumerate(results): + for j, frame in enumerate(frames): + result[:, j] = frame.values[:, i] + return results def get_value(self, sid, session, colname): # WARNING: This will need caching or other optimization if used in a @@ -485,7 +506,7 @@ class MinuteResampleSessionBarReader(SessionBarReader): # This was developed to complete interface, but has not been tuned # for real world use. start, end = self._calendar.open_and_close_for_session(session) - frame = self._get_resampled([colname], start, end, [sid]) + frame = self._get_resampled([colname], start, end, [sid])[0] return frame.loc[session, colname] @lazyval @@ -590,8 +611,11 @@ class ReindexBarReader(with_metaclass(ABCMeta)): outer_results = [] - inner_results = self._reader.load_raw_arrays( - fields, inner_dts[0], inner_dts[-1], sids) + if len(inner_dts) > 0: + inner_results = self._reader.load_raw_arrays( + fields, inner_dts[0], inner_dts[-1], sids) + else: + inner_results = None for i, field in enumerate(fields): if field != 'volume': @@ -599,7 +623,8 @@ class ReindexBarReader(with_metaclass(ABCMeta)): else: out = np.zeros(shape, dtype=np.uint32) - out[indices] = inner_results[i] + if inner_results is not None: + out[indices] = inner_results[i] outer_results.append(out)