Merge pull request #1433 from quantopian/dispatch-bar-reader

MAINT: Add a reader which dispatches on asset type
This commit is contained in:
Eddie Hebert
2016-08-26 08:44:20 -04:00
committed by GitHub
5 changed files with 511 additions and 17 deletions
+332
View File
@@ -0,0 +1,332 @@
# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from numpy import array, nan
from numpy.testing import assert_almost_equal
from pandas import DataFrame, Timestamp
from zipline.assets import Equity, Future
from zipline.data.dispatch_bar_reader import (
AssetDispatchMinuteBarReader,
AssetDispatchSessionBarReader,
)
from zipline.data.resample import (
MinuteResampleSessionBarReader,
ReindexMinuteBarReader,
ReindexSessionBarReader,
)
from zipline.testing.fixtures import (
WithBcolzEquityMinuteBarReader,
WithBcolzEquityDailyBarReader,
WithBcolzFutureMinuteBarReader,
WithTradingSessions,
ZiplineTestCase,
)
# Names of the four price (non-volume) bar fields.
OHLC = ['open', 'high', 'low', 'close']
class AssetDispatchSessionBarTestCase(WithBcolzEquityDailyBarReader,
                                      WithBcolzFutureMinuteBarReader,
                                      WithTradingSessions,
                                      ZiplineTestCase):
    """
    Session-level checks for ``AssetDispatchSessionBarReader``.

    Equity daily bars (wrapped in a ``ReindexSessionBarReader``) and future
    minute bars (wrapped in a ``MinuteResampleSessionBarReader``) are served
    through one dispatching reader built on ``cls.trading_calendar``; the
    primary calendar is configured as 'CME'.
    """

    TRADING_CALENDAR_STRS = ('CME', 'NYSE')
    TRADING_CALENDAR_PRIMARY_CAL = 'CME'

    ASSET_FINDER_EQUITY_SIDS = (1, 2, 3)
    START_DATE = Timestamp('2016-08-22', tz='UTC')
    END_DATE = Timestamp('2016-08-24', tz='UTC')

    @classmethod
    def make_future_minute_bar_data(cls):
        """Yield ``(sid, frame)`` pairs with one bar at each CME open."""
        calendar = cls.trading_calendar
        session_opens = []
        for label in cls.trading_sessions['CME']:
            market_open, _ = calendar.open_and_close_for_session(label)
            session_opens.append(market_open)

        # Traded on the first and second sessions.
        yield 10001, DataFrame({
            'open': [10000.5, 10001.5, nan],
            'high': [10000.9, 10001.9, nan],
            'low': [10000.1, 10001.1, nan],
            'close': [10000.3, 10001.3, nan],
            'volume': [1000, 1001, 0],
        }, index=session_opens)

        # Traded on the first and third sessions.
        yield 10002, DataFrame({
            'open': [20000.5, nan, 20002.5],
            'high': [20000.9, nan, 20002.9],
            'low': [20000.1, nan, 20002.1],
            'close': [20000.3, nan, 20002.3],
            'volume': [2000, 0, 2002],
        }, index=session_opens)

        # Traded on the second and third sessions.
        yield 10003, DataFrame({
            'open': [nan, 30001.5, 30002.5],
            'high': [nan, 30001.9, 30002.9],
            'low': [nan, 30001.1, 30002.1],
            'close': [nan, 30001.3, 30002.3],
            'volume': [0, 3001, 3002],
        }, index=session_opens)

    @classmethod
    def make_equity_daily_bar_data(cls):
        """Yield ``(sid, frame)`` pairs indexed by the NYSE sessions."""
        nyse_sessions = cls.trading_sessions['NYSE']

        yield 1, DataFrame({
            'open': [100.5, 101.5, nan],
            'high': [100.9, 101.9, nan],
            'low': [100.1, 101.1, nan],
            'close': [100.3, 101.3, nan],
            'volume': [1000, 1001, 0],
        }, index=nyse_sessions)

        yield 2, DataFrame({
            'open': [200.5, nan, 202.5],
            'high': [200.9, nan, 202.9],
            'low': [200.1, nan, 202.1],
            'close': [200.3, nan, 202.3],
            'volume': [2000, 0, 2002],
        }, index=nyse_sessions)

        yield 3, DataFrame({
            'open': [301.5, 302.5, nan],
            'high': [301.9, 302.9, nan],
            'low': [301.1, 302.1, nan],
            'close': [301.3, 302.3, nan],
            'volume': [3001, 3002, 0],
        }, index=nyse_sessions)

    @classmethod
    def make_futures_info(cls):
        """Describe the three futures, all listed on 'CME'."""
        future_sids = [10001, 10002, 10003]
        count = len(future_sids)
        first_day, last_day = cls.START_DATE, cls.END_DATE
        return DataFrame({
            'sid': future_sids,
            'root_symbol': ['FOO', 'BAR', 'BAZ'],
            'symbol': ['FOOA', 'BARA', 'BAZA'],
            'start_date': [first_day] * count,
            'end_date': [last_day] * count,
            # TODO: Make separate from 'end_date'
            'notice_date': [last_day] * count,
            'expiration_date': [last_day] * count,
            'multiplier': [500] * count,
            'exchange': ['CME'] * count,
        })

    @classmethod
    def init_class_fixtures(cls):
        """Build ``cls.dispatch_reader`` on top of the parent fixtures."""
        super(AssetDispatchSessionBarTestCase, cls).init_class_fixtures()

        calendar = cls.trading_calendar
        equity_reader = ReindexSessionBarReader(
            calendar,
            cls.bcolz_equity_daily_bar_reader,
            cls.START_DATE,
            cls.END_DATE)
        future_reader = MinuteResampleSessionBarReader(
            calendar,
            cls.bcolz_future_minute_bar_reader,
        )
        cls.dispatch_reader = AssetDispatchSessionBarReader(
            calendar,
            cls.asset_finder,
            {Equity: equity_reader, Future: future_reader},
        )

    def test_load_raw_arrays(self):
        """Mixed equity/future sids come back in the requested order."""
        sessions = self.trading_calendar.sessions_in_range(
            self.START_DATE, self.END_DATE)
        fields = ['high', 'volume']
        requested_sids = [2, 10003, 1, 10001]

        results = self.dispatch_reader.load_raw_arrays(
            fields, sessions[0], sessions[2], requested_sids)

        # One entry per requested sid, in request order; the inner list
        # holds the expected column for each requested field.
        expected_per_sid = (
            (2, [array([200.9, nan, 202.9]),
                 array([2000, 0, 2002])],
             "sid=2 should have values on the first and third sessions."),
            (10003, [array([nan, 30001.9, 30002.9]),
                     array([0, 3001, 3002])],
             "sid=10003 should have values on the second and third sessions."),
            (1, [array([100.9, 101.90, nan]),
                 array([1000, 1001, 0])],
             "sid=1 should have values on the first and second sessions."),
            (10001, [array([10000.9, 10001.9, nan]),
                     array([1000, 1001, 0])],
             "sid=10001 should have a values on the first and second "
             "sessions."),
        )

        for column, case in enumerate(expected_per_sid):
            _sid, expected_fields, message = case
            for field_idx, field_values in enumerate(results):
                assert_almost_equal(field_values[:, column],
                                    expected_fields[field_idx],
                                    err_msg=message)
class AssetDispatchMinuteBarTestCase(WithBcolzEquityMinuteBarReader,
                                     WithBcolzFutureMinuteBarReader,
                                     ZiplineTestCase):
    """
    Minute-level checks for ``AssetDispatchMinuteBarReader``.

    Equity minute bars are wrapped in a ``ReindexMinuteBarReader`` while the
    future minute bar reader is used directly; both are served through one
    dispatching reader built on ``cls.trading_calendar`` (primary calendar
    configured as 'CME').
    """

    TRADING_CALENDAR_STRS = ('CME', 'NYSE')
    TRADING_CALENDAR_PRIMARY_CAL = 'CME'

    ASSET_FINDER_EQUITY_SIDS = (1, 2, 3)
    START_DATE = Timestamp('2016-08-24', tz='UTC')
    END_DATE = Timestamp('2016-08-24', tz='UTC')

    @classmethod
    def make_equity_minute_bar_data(cls):
        """Yield ``(sid, frame)`` pairs on the first equity minutes."""
        equity_calendar = cls.trading_calendars[Equity]
        minutes = equity_calendar.minutes_for_session(cls.START_DATE)

        # First and second minute.
        yield 1, DataFrame({
            'open': [100.5, 101.5],
            'high': [100.9, 101.9],
            'low': [100.1, 101.1],
            'close': [100.3, 101.3],
            'volume': [1000, 1001],
        }, index=minutes[[0, 1]])

        # First and third minute.
        yield 2, DataFrame({
            'open': [200.5, 202.5],
            'high': [200.9, 202.9],
            'low': [200.1, 202.1],
            'close': [200.3, 202.3],
            'volume': [2000, 2002],
        }, index=minutes[[0, 2]])

        # Second and third minute.
        yield 3, DataFrame({
            'open': [301.5, 302.5],
            'high': [301.9, 302.9],
            'low': [301.1, 302.1],
            'close': [301.3, 302.3],
            'volume': [3001, 3002],
        }, index=minutes[[1, 2]])

    @classmethod
    def make_future_minute_bar_data(cls):
        """Yield ``(sid, frame)`` pairs mixing future and equity minutes."""
        equity_minutes = cls.trading_calendars[Equity].minutes_for_session(
            cls.START_DATE)
        future_minutes = cls.trading_calendar.minutes_for_session(
            cls.START_DATE)

        # Equity market open occurs at loc 930 in Future minutes.
        yield 10001, DataFrame({
            'open': [10000.5, 10930.5, 10931.5],
            'high': [10000.9, 10930.9, 10931.9],
            'low': [10000.1, 10930.1, 10931.1],
            'close': [10000.3, 10930.3, 10931.3],
            'volume': [1000, 1930, 1931],
        }, index=[future_minutes[0], equity_minutes[0], equity_minutes[1]])

        yield 10002, DataFrame({
            'open': [20001.5, 20931.5, 20932.5],
            'high': [20001.9, 20931.9, 20932.9],
            'low': [20001.1, 20931.1, 20932.1],
            'close': [20001.3, 20931.3, 20932.3],
            'volume': [2001, 2931, 2932],
        }, index=[future_minutes[1], equity_minutes[1], equity_minutes[2]])

        yield 10003, DataFrame({
            'open': [30002.5, 30930.5, 30932.5],
            'high': [30002.9, 30930.9, 30932.9],
            'low': [30002.1, 30930.1, 30932.1],
            'close': [30002.3, 30930.3, 30932.3],
            'volume': [3002, 3930, 3932],
        }, index=[future_minutes[2], equity_minutes[0], equity_minutes[2]])

    @classmethod
    def make_futures_info(cls):
        """Describe the three futures, all listed on 'CME'."""
        future_sids = [10001, 10002, 10003]
        count = len(future_sids)
        first_day, last_day = cls.START_DATE, cls.END_DATE
        return DataFrame({
            'sid': future_sids,
            'root_symbol': ['FOO', 'BAR', 'BAZ'],
            'symbol': ['FOOA', 'BARA', 'BAZA'],
            'start_date': [first_day] * count,
            'end_date': [last_day] * count,
            # TODO: Make separate from 'end_date'
            'notice_date': [last_day] * count,
            'expiration_date': [last_day] * count,
            'multiplier': [500] * count,
            'exchange': ['CME'] * count,
        })

    @classmethod
    def init_class_fixtures(cls):
        """Build ``cls.dispatch_reader`` on top of the parent fixtures."""
        super(AssetDispatchMinuteBarTestCase, cls).init_class_fixtures()

        calendar = cls.trading_calendar
        equity_reader = ReindexMinuteBarReader(
            calendar,
            cls.bcolz_equity_minute_bar_reader,
            cls.START_DATE,
            cls.END_DATE)
        cls.dispatch_reader = AssetDispatchMinuteBarReader(
            calendar,
            cls.asset_finder,
            {Equity: equity_reader,
             Future: cls.bcolz_future_minute_bar_reader},
        )

    def test_load_raw_arrays_at_future_session_open(self):
        """Only futures have data in the first three future minutes."""
        f_minutes = self.trading_calendar.minutes_for_session(self.START_DATE)
        window_start, window_end = f_minutes[0], f_minutes[2]
        requested_sids = [2, 10003, 1, 10001]

        results = self.dispatch_reader.load_raw_arrays(
            ['open', 'close'], window_start, window_end, requested_sids)

        expected_per_sid = (
            (2, [array([nan, nan, nan]),
                 array([nan, nan, nan])],
             "Before Equity market open, sid=2 should have no values."),
            (10003, [array([nan, nan, 30002.5]),
                     array([nan, nan, 30002.3])],
             "sid=10003 should have a value at the 22:03 occurring "
             "before the session label, which will be the third minute."),
            (1, [array([nan, nan, nan]),
                 array([nan, nan, nan])],
             "Before Equity market open, sid=1 should have no values."),
            (10001, [array([10000.5, nan, nan]),
                     array([10000.3, nan, nan])],
             "sid=10001 should have a value at the market open."),
        )

        for column, case in enumerate(expected_per_sid):
            _sid, expected_fields, message = case
            for field_idx, field_values in enumerate(results):
                assert_almost_equal(field_values[:, column],
                                    expected_fields[field_idx],
                                    err_msg=message)

        # NOTE(review): the result of this single-field load is never
        # asserted on; presumably it only checks that the call does not
        # raise -- confirm, or add assertions.
        results = self.dispatch_reader.load_raw_arrays(
            ['open'], window_start, window_end, requested_sids)

    def test_load_raw_arrays_at_equity_session_open(self):
        """Equities and futures both have data from the equity open."""
        e_minutes = self.trading_calendars[Equity].minutes_for_session(
            self.START_DATE)
        requested_sids = [10002, 1, 3, 10001]

        results = self.dispatch_reader.load_raw_arrays(
            ['open', 'high'], e_minutes[0], e_minutes[2], requested_sids)

        expected_per_sid = (
            (10002, [array([nan, 20931.5, 20932.5]),
                     array([nan, 20931.9, 20932.9])],
             "At Equity market open, sid=10002 should have values at the "
             "second and third minute."),
            (1, [array([100.5, 101.5, nan]),
                 array([100.9, 101.9, nan])],
             "At Equity market open, sid=1 should have values at the first "
             "and second minute."),
            (3, [array([nan, 301.5, 302.5]),
                 array([nan, 301.9, 302.9])],
             "At Equity market open, sid=3 should have a values at the second "
             "and third minute."),
            (10001, [array([10930.5, 10931.5, nan]),
                     array([10930.9, 10931.9, nan])],
             "At Equity market open, sid=10001 should have a values at the "
             "first and second minute."),
        )

        for column, case in enumerate(expected_per_sid):
            _sid, expected_fields, message = case
            for field_idx, field_values in enumerate(results):
                assert_almost_equal(field_values[:, column],
                                    expected_fields[field_idx],
                                    err_msg=message)
+9 -6
View File
@@ -486,13 +486,16 @@ class TestResampleSessionBars(WithBcolzFutureMinuteBarReader,
)
for sid in self.ASSET_FINDER_FUTURE_SIDS:
case_frame = FUTURE_CASES[sid]
first = case_frame.index[0]
last = case_frame.index[-1]
first = calendar.minute_to_session_label(
case_frame.index[0])
last = calendar.minute_to_session_label(
case_frame.index[-1])
result = session_bar_reader.load_raw_arrays(
['open', 'high', 'low', 'close', 'volume'],
first, last, [sid])
assert_almost_equal(result, EXPECTED_SESSIONS[sid],
err_msg="sid={0}".format(sid))
OHLCV, first, last, [sid])
for i, field in enumerate(OHLCV):
assert_almost_equal(
result[i], EXPECTED_SESSIONS[sid][[field]],
err_msg="sid={0} field={1}".format(sid, field))
def test_sessions(self):
calendar = self.trading_calendar
+130
View File
@@ -0,0 +1,130 @@
#
# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
from numpy import (
full,
nan,
uint32,
zeros
)
from six import iteritems, with_metaclass
from zipline.utils.memoize import lazyval
class AssetDispatchBarReader(with_metaclass(ABCMeta)):
    """
    Bar reader which forwards each request to a per-asset-type reader.

    Subclasses supply ``_dt_window_size`` to define how many rows a
    ``[start_dt, end_dt]`` window spans (minutes or sessions).

    Parameters
    ----------
    trading_calendar : zipline.utils.trading_calendar.TradingCalendar
        Calendar that every reader in ``readers`` must also use.
    asset_finder : zipline.assets.AssetFinder
        Used to resolve sids into assets so their type can be inspected.
    readers : dict
        A dict mapping Asset type to the corresponding
        [Minute|Session]BarReader
    """
    def __init__(self, trading_calendar, asset_finder, readers):
        self._trading_calendar = trading_calendar
        self._asset_finder = asset_finder
        self._readers = readers

        # Results from the individual readers are written into one shared
        # output array, so they must all be aligned to the same calendar.
        for t, r in iteritems(self._readers):
            assert trading_calendar == r.trading_calendar, \
                "All readers must share target trading_calendar. " \
                "Reader={0} for type={1} uses calendar={2} which does not " \
                "match the desired shared calendar={3} ".format(
                    r, t, r.trading_calendar, trading_calendar)

    @abstractmethod
    def _dt_window_size(self, start_dt, end_dt):
        """Return the number of rows between start_dt and end_dt."""
        pass

    @property
    def _asset_types(self):
        """The asset types for which a reader has been registered."""
        return self._readers.keys()

    def _make_raw_array_shape(self, start_dt, end_dt, num_sids):
        """Return the ``(rows, columns)`` shape of one output array."""
        return self._dt_window_size(start_dt, end_dt), num_sids

    def _make_raw_array_out(self, field, shape):
        """Allocate an output array pre-filled with the "no data" value.

        'volume' uses ``uint32`` zeros; every other field uses NaN.
        """
        if field != 'volume':
            out = full(shape, nan)
        else:
            out = zeros(shape, dtype=uint32)
        return out

    @property
    def trading_calendar(self):
        """The calendar shared by this reader and its sub-readers."""
        return self._trading_calendar

    @lazyval
    def last_available_dt(self):
        """The earliest ``last_available_dt`` among the sub-readers."""
        # ``values`` must be *called*; iterating the bound method itself
        # raises TypeError.
        return min(r.last_available_dt for r in self._readers.values())

    @lazyval
    def first_trading_day(self):
        """The latest ``first_trading_day`` among the sub-readers."""
        return max(r.first_trading_day for r in self._readers.values())

    def get_value(self, sid, dt, field):
        """Return ``field`` for ``sid`` at ``dt`` from the matching reader.

        Raises
        ------
        KeyError
            If no reader is registered for the asset's type.
        """
        # The finder is stored as ``_asset_finder``; there is no public
        # ``asset_finder`` attribute on this class.
        asset = self._asset_finder.retrieve_asset(sid)
        r = self._readers[type(asset)]
        return r.get_value(sid, dt, field)

    def get_last_traded_dt(self, asset, dt):
        """Return the last traded dt for ``asset`` as of ``dt``.

        Delegates to ``get_last_traded_dt`` of the reader registered for
        the asset's type.

        Raises
        ------
        KeyError
            If no reader is registered for the asset's type.
        """
        r = self._readers[type(asset)]
        return r.get_last_traded_dt(asset, dt)

    def load_raw_arrays(self, fields, start_dt, end_dt, sids):
        """Load a window of data for ``sids``, which may mix asset types.

        Parameters
        ----------
        fields : list of str
            Field names to load, e.g. 'open' or 'volume'.
        start_dt, end_dt
            Bounds of the window, passed unchanged to the sub-readers.
        sids : list of int
            Asset identifiers to load.

        Returns
        -------
        list of np.ndarray
            One array per entry in ``fields``, with one column per entry
            in ``sids``. Column positions follow the order of the assets
            returned by ``asset_finder.retrieve_all(sids)`` (assumed to
            match the order of ``sids``).

        Raises
        ------
        KeyError
            If an asset's type has no registered reader.
        """
        asset_types = self._asset_types
        sid_groups = {t: [] for t in asset_types}
        out_pos = {t: [] for t in asset_types}

        # Split the request by asset type, remembering which output column
        # each sid belongs to.
        assets = self._asset_finder.retrieve_all(sids)
        for i, asset in enumerate(assets):
            t = type(asset)
            sid_groups[t].append(asset.sid)
            out_pos[t].append(i)

        # Only query readers which actually have sids to serve.
        batched_arrays = {
            t: self._readers[t].load_raw_arrays(fields,
                                                start_dt,
                                                end_dt,
                                                sid_groups[t])
            for t in asset_types if sid_groups[t]}

        results = []
        shape = self._make_raw_array_shape(start_dt, end_dt, len(sids))

        for i, field in enumerate(fields):
            out = self._make_raw_array_out(field, shape)
            # Scatter each reader's columns back to the requested positions.
            for t, arrays in iteritems(batched_arrays):
                out[:, out_pos[t]] = arrays[i]
            results.append(out)

        return results
class AssetDispatchMinuteBarReader(AssetDispatchBarReader):
    """Dispatching reader whose windows are counted in calendar minutes."""

    def _dt_window_size(self, start_dt, end_dt):
        """Return how many calendar minutes lie in the given range."""
        minutes = self.trading_calendar.minutes_in_range(start_dt, end_dt)
        return len(minutes)
class AssetDispatchSessionBarReader(AssetDispatchBarReader):
    """Dispatching reader whose windows are counted in calendar sessions."""

    def _dt_window_size(self, start_dt, end_dt):
        """Return how many calendar sessions lie in the given range."""
        sessions = self.trading_calendar.sessions_in_range(start_dt, end_dt)
        return len(sessions)
+6 -2
View File
@@ -764,8 +764,8 @@ class BcolzMinuteBarWriter(object):
table = self._ensure_ctable(sid)
tds = self._session_labels
input_first_day = pd.Timestamp(dts[0].astype('datetime64[D]'),
tz='UTC')
input_first_day = self._calendar.minute_to_session_label(
pd.Timestamp(dts[0]))
last_date = self.last_date_in_output_for_sid(sid)
@@ -888,6 +888,10 @@ class BcolzMinuteBarReader(MinuteBarReader):
def _get_metadata(self):
    """Load ``BcolzMinuteBarMetadata`` from ``self._rootdir``."""
    return BcolzMinuteBarMetadata.read(self._rootdir)
@property
def trading_calendar(self):
    """Expose ``self.calendar`` under the ``trading_calendar`` name."""
    return self.calendar
@lazyval
def last_available_dt(self):
_, close = self.calendar.open_and_close_for_session(self._end_session)
+34 -9
View File
@@ -468,16 +468,37 @@ class MinuteResampleSessionBarReader(SessionBarReader):
minute_data = self._minute_bar_reader.load_raw_arrays(
columns, start_dt, end_dt, assets)
dts = self._calendar.minutes_in_range(start_dt, end_dt)
minute_frame = DataFrame(
[d.T[0] for d in minute_data], index=columns, columns=dts).T
return minute_to_session(minute_frame, self._calendar)
frames = []
for i, _ in enumerate(assets):
minute_frame = DataFrame((d.T[i] for d in minute_data),
index=columns, columns=dts).T
df = minute_to_session(minute_frame, self._calendar)
frames.append(df)
return frames
@property
def trading_calendar(self):
    """Expose ``self._calendar`` under the ``trading_calendar`` name."""
    return self._calendar
def load_raw_arrays(self, columns, start_dt, end_dt, assets):
return self._get_resampled(columns, start_dt, end_dt, assets).values
def load_raw_arrays(self, columns, start_dt, end_dt, sids):
    """
    Load session bars for ``sids`` by resampling minute data.

    Parameters
    ----------
    columns : list of str
        Field names to load; 'volume' is returned as ``uint32``, all other
        fields as floats.
    start_dt, end_dt
        First and last session of the window.
    sids : list of int
        Asset identifiers to load.

    Returns
    -------
    list of np.ndarray
        One ``(sessions, sids)`` array per entry in ``columns``.
    """
    calendar = self._calendar
    num_sessions = len(calendar.sessions_in_range(start_dt, end_dt))

    # Resampling needs the whole minute span: from the open of the first
    # session through the close of the last one.
    window_open = calendar.open_and_close_for_session(start_dt)[0]
    window_close = calendar.open_and_close_for_session(end_dt)[1]

    shape = (num_sessions, len(sids))
    results = [
        np.zeros(shape, dtype=np.uint32) if name == 'volume'
        else np.full(shape, np.nan)
        for name in columns
    ]

    # One resampled frame per sid; frame columns follow ``columns``.
    frames = self._get_resampled(columns, window_open, window_close, sids)
    for sid_idx, frame in enumerate(frames):
        session_values = frame.values
        for col_idx, out in enumerate(results):
            out[:, sid_idx] = session_values[:, col_idx]

    return results
def get_value(self, sid, session, colname):
# WARNING: This will need caching or other optimization if used in a
@@ -485,7 +506,7 @@ class MinuteResampleSessionBarReader(SessionBarReader):
# This was developed to complete interface, but has not been tuned
# for real world use.
start, end = self._calendar.open_and_close_for_session(session)
frame = self._get_resampled([colname], start, end, [sid])
frame = self._get_resampled([colname], start, end, [sid])[0]
return frame.loc[session, colname]
@lazyval
@@ -590,8 +611,11 @@ class ReindexBarReader(with_metaclass(ABCMeta)):
outer_results = []
inner_results = self._reader.load_raw_arrays(
fields, inner_dts[0], inner_dts[-1], sids)
if len(inner_dts) > 0:
inner_results = self._reader.load_raw_arrays(
fields, inner_dts[0], inner_dts[-1], sids)
else:
inner_results = None
for i, field in enumerate(fields):
if field != 'volume':
@@ -599,7 +623,8 @@ class ReindexBarReader(with_metaclass(ABCMeta)):
else:
out = np.zeros(shape, dtype=np.uint32)
out[indices] = inner_results[i]
if inner_results is not None:
out[indices] = inner_results[i]
outer_results.append(out)