Files
catalyst/zipline/data/resample.py
T
Eddie Hebert 4a017ef63b ENH: Session bar reader resampled from minute data
Implement a `SessionBarReader` which uses a minute bar reader as a
backing source, resampling the minute bars into the box around the
corresponding session data.

Also, add future/CME test cases to resample suite.
2016-08-18 11:37:42 -04:00

501 lines
19 KiB
Python

# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
import numpy as np
import pandas as pd
from pandas import DataFrame
from zipline.data.session_bars import SessionBarReader
from zipline.utils.memoize import lazyval
# Aggregation rule applied to each minute-bar column when rolling minute
# data up into a session bar.  Insertion order (open, high, low, close,
# volume) is preserved by the OrderedDict.
_MINUTE_TO_SESSION_OHCLV_HOW = OrderedDict(zip(
    ('open', 'high', 'low', 'close', 'volume'),
    ('first', 'max', 'min', 'last', 'sum'),
))
def minute_to_session(minute_frame, calendar):
    """
    Roll a DataFrame of minute bars up into one row per session.

    Rows are grouped by applying ``calendar.minute_to_session_label`` to the
    frame's index, and each column is reduced with the rule registered for
    it in ``_MINUTE_TO_SESSION_OHCLV_HOW``.

    Parameters
    ----------
    minute_frame : pd.DataFrame
        Minute data whose columns are drawn from `open`, `high`, `low`,
        `close` and `volume`.  A column without a registered aggregation
        rule raises ``KeyError``.  The index is what gets passed to the
        calendar (presumably minute timestamps).
    calendar : zipline.utils.calendars.trading_calendar.TradingCalendar
        A TradingCalendar whose ``minute_to_session_label`` maps each index
        value to the session it belongs to.

    Returns
    -------
    session_frame : pd.DataFrame
        One row per session label, with the same columns as
        ``minute_frame``.
    """
    aggregations = OrderedDict()
    for column in minute_frame.columns:
        aggregations[column] = _MINUTE_TO_SESSION_OHCLV_HOW[column]

    by_session = minute_frame.groupby(calendar.minute_to_session_label)
    return by_session.agg(aggregations)
class DailyHistoryAggregator(object):
    """
    Converts minute pricing data into a daily summary, to be used for the
    last slot in a call to history with a frequency of `1d`.

    This summary is the same as a daily bar rollup of minute data, with the
    distinction that the summary is truncated to the `dt` requested.
    i.e. the aggregation slides forward during the course of a simulation
    day.

    Provides aggregation for `open`, `high`, `low`, `close`, and `volume`.
    The aggregation rules for each price type are documented in their
    respective methods.
    """

    def __init__(self, market_opens, minute_reader, trading_calendar):
        self._market_opens = market_opens
        self._minute_reader = minute_reader
        self._trading_calendar = trading_calendar

        # The caches are structured as (date, market_open, entries), where
        # entries is a dict of asset -> (last_visited_dt, value)
        #
        # Whenever an aggregation method determines the current value,
        # the entry for the respective asset should be overwritten with a new
        # entry for the current dt.value (int) and aggregation value.
        #
        # When the requested dt's date is different from date the cache is
        # flushed, so that the cache entries do not grow unbounded.
        #
        # Example cache:
        # cache = (date(2016, 3, 17),
        #          pd.Timestamp('2016-03-17 13:31', tz='UTC'),
        #          {
        #              1: (1458221460000000000, np.nan),
        #              2: (1458221460000000000, 42.0),
        #          })
        self._caches = {
            'open': None,
            'high': None,
            'low': None,
            'close': None,
            'volume': None
        }

        # The int value is used for deltas to avoid extra computation from
        # creating new Timestamps.
        self._one_min = pd.Timedelta('1 min').value

    def _prelude(self, dt, field):
        """
        Fetch (resetting if the date changed) the cache for `field`.

        Returns
        -------
        (market_open, prev_dt, dt_value, entries)
            market_open : the day's open from `market_opens`, localized
                to UTC.
            prev_dt : int value of the minute before `dt`, or None when
                `dt` is the market open itself.
            dt_value : `dt.value`.
            entries : the per-asset cache dict for `field`.
        """
        date = dt.date()
        dt_value = dt.value
        cache = self._caches[field]
        if cache is None or cache[0] != date:
            # New day (or first use): start from an empty entries dict.
            market_open = self._market_opens.loc[date]
            cache = self._caches[field] = (date, market_open, {})

        _, market_open, entries = cache
        market_open = market_open.tz_localize('UTC')
        if dt != market_open:
            prev_dt = dt_value - self._one_min
        else:
            prev_dt = None
        return market_open, prev_dt, dt_value, entries

    def opens(self, assets, dt):
        """
        The open field's aggregation returns the first value that occurs
        for the day, if there has been no data on or before the `dt` the open
        is `nan`.

        Once the first non-nan open is seen, that value remains constant per
        asset for the remainder of the day.

        Returns
        -------
        np.array with dtype=float64, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'open')

        opens = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                opens.append(np.nan)
                continue

            if prev_dt is None:
                # `dt` is the market open: the minute's own value is the
                # day's open so far.
                val = self._minute_reader.get_value(asset, dt, 'open')
                entries[asset] = (dt_value, val)
                opens.append(val)
                continue
            else:
                try:
                    last_visited_dt, first_open = entries[asset]
                    if last_visited_dt == dt_value:
                        opens.append(first_open)
                        continue
                    elif not pd.isnull(first_open):
                        # The open is already known; it never changes.
                        opens.append(first_open)
                        entries[asset] = (dt_value, first_open)
                        continue
                    else:
                        # No open seen yet: scan only the minutes since
                        # the last visit.
                        after_last = pd.Timestamp(
                            last_visited_dt + self._one_min, tz='UTC')
                        window = self._minute_reader.load_raw_arrays(
                            ['open'],
                            after_last,
                            dt,
                            [asset],
                        )[0]
                        nonnan = window[~pd.isnull(window)]
                        if len(nonnan):
                            val = nonnan[0]
                        else:
                            val = np.nan
                        entries[asset] = (dt_value, val)
                        opens.append(val)
                        continue
                except KeyError:
                    # Asset not cached today: scan from the market open.
                    window = self._minute_reader.load_raw_arrays(
                        ['open'],
                        market_open,
                        dt,
                        [asset],
                    )[0]
                    nonnan = window[~pd.isnull(window)]
                    if len(nonnan):
                        val = nonnan[0]
                    else:
                        val = np.nan
                    entries[asset] = (dt_value, val)
                    opens.append(val)
                    continue
        return np.array(opens)

    def highs(self, assets, dt):
        """
        The high field's aggregation returns the largest high seen between
        the market open and the current dt.
        If there has been no data on or before the `dt` the high is `nan`.

        Returns
        -------
        np.array with dtype=float64, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'high')

        highs = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                highs.append(np.nan)
                continue

            if prev_dt is None:
                val = self._minute_reader.get_value(asset, dt, 'high')
                entries[asset] = (dt_value, val)
                highs.append(val)
                continue
            else:
                try:
                    last_visited_dt, last_max = entries[asset]
                    if last_visited_dt == dt_value:
                        highs.append(last_max)
                        continue
                    elif last_visited_dt == prev_dt:
                        # Advanced exactly one minute: fold in one value.
                        curr_val = self._minute_reader.get_value(
                            asset, dt, 'high')
                        if pd.isnull(curr_val):
                            val = last_max
                        elif pd.isnull(last_max):
                            val = curr_val
                        else:
                            val = max(last_max, curr_val)
                        entries[asset] = (dt_value, val)
                        highs.append(val)
                        continue
                    else:
                        after_last = pd.Timestamp(
                            last_visited_dt + self._one_min, tz='UTC')
                        window = self._minute_reader.load_raw_arrays(
                            ['high'],
                            after_last,
                            dt,
                            [asset],
                        )[0].T
                        # Reduce the window together with the cached max
                        # using nanmax: the builtin max(nan, x) returns
                        # nan, which would discard real highs whenever the
                        # cached value is still nan.
                        val = np.nanmax(np.append(window, last_max))
                        entries[asset] = (dt_value, val)
                        highs.append(val)
                        continue
                except KeyError:
                    # Asset not cached today: scan from the market open.
                    window = self._minute_reader.load_raw_arrays(
                        ['high'],
                        market_open,
                        dt,
                        [asset],
                    )[0].T
                    val = np.nanmax(window)
                    entries[asset] = (dt_value, val)
                    highs.append(val)
                    continue
        return np.array(highs)

    def lows(self, assets, dt):
        """
        The low field's aggregation returns the smallest low seen between
        the market open and the current dt.
        If there has been no data on or before the `dt` the low is `nan`.

        Returns
        -------
        np.array with dtype=float64, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'low')

        lows = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                lows.append(np.nan)
                continue

            if prev_dt is None:
                val = self._minute_reader.get_value(asset, dt, 'low')
                entries[asset] = (dt_value, val)
                lows.append(val)
                continue
            else:
                try:
                    last_visited_dt, last_min = entries[asset]
                    if last_visited_dt == dt_value:
                        lows.append(last_min)
                        continue
                    elif last_visited_dt == prev_dt:
                        # Advanced exactly one minute: fold in one value.
                        curr_val = self._minute_reader.get_value(
                            asset, dt, 'low')
                        val = np.nanmin([last_min, curr_val])
                        entries[asset] = (dt_value, val)
                        lows.append(val)
                        continue
                    else:
                        after_last = pd.Timestamp(
                            last_visited_dt + self._one_min, tz='UTC')
                        window = self._minute_reader.load_raw_arrays(
                            ['low'],
                            after_last,
                            dt,
                            [asset],
                        )[0].T
                        # Reduce the window together with the cached min
                        # using nanmin: the builtin min(nan, x) returns
                        # nan, which would discard real lows whenever the
                        # cached value is still nan.
                        val = np.nanmin(np.append(window, last_min))
                        entries[asset] = (dt_value, val)
                        lows.append(val)
                        continue
                except KeyError:
                    # Asset not cached today: scan from the market open.
                    window = self._minute_reader.load_raw_arrays(
                        ['low'],
                        market_open,
                        dt,
                        [asset],
                    )[0].T
                    val = np.nanmin(window)
                    entries[asset] = (dt_value, val)
                    lows.append(val)
                    continue
        return np.array(lows)

    def closes(self, assets, dt):
        """
        The close field's aggregation returns the latest close at the given
        dt.
        If the close for the given dt is `nan`, the most recent non-nan
        `close` is used.
        If there has been no data on or before the `dt` the close is `nan`.

        Returns
        -------
        np.array with dtype=float64, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'close')

        closes = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                closes.append(np.nan)
                continue

            if prev_dt is None:
                val = self._minute_reader.get_value(asset, dt, 'close')
                entries[asset] = (dt_value, val)
                closes.append(val)
                continue
            else:
                try:
                    last_visited_dt, last_close = entries[asset]
                    if last_visited_dt == dt_value:
                        closes.append(last_close)
                        continue
                    elif last_visited_dt == prev_dt:
                        val = self._minute_reader.get_value(
                            asset, dt, 'close')
                        if pd.isnull(val):
                            val = last_close
                        entries[asset] = (dt_value, val)
                        closes.append(val)
                        continue
                    else:
                        val = self._minute_reader.get_value(
                            asset, dt, 'close')
                        if pd.isnull(val):
                            # Walk back one minute at a time (recursively)
                            # until a close or a cached value is found.
                            val = self.closes(
                                [asset],
                                pd.Timestamp(prev_dt, tz='UTC'))[0]
                        entries[asset] = (dt_value, val)
                        closes.append(val)
                        continue
                except KeyError:
                    val = self._minute_reader.get_value(
                        asset, dt, 'close')
                    if pd.isnull(val):
                        val = self.closes([asset],
                                          pd.Timestamp(prev_dt, tz='UTC'))[0]
                    entries[asset] = (dt_value, val)
                    closes.append(val)
                    continue
        return np.array(closes)

    def volumes(self, assets, dt):
        """
        The volume field's aggregation returns the sum of all volumes
        between the market open and the `dt`
        If there has been no data on or before the `dt` the volume is 0.

        Returns
        -------
        np.array with dtype=int64, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'volume')

        volumes = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                volumes.append(0)
                continue

            if prev_dt is None:
                val = self._minute_reader.get_value(asset, dt, 'volume')
                entries[asset] = (dt_value, val)
                volumes.append(val)
                continue
            else:
                try:
                    last_visited_dt, last_total = entries[asset]
                    if last_visited_dt == dt_value:
                        volumes.append(last_total)
                        continue
                    elif last_visited_dt == prev_dt:
                        # Advanced exactly one minute: add one value.
                        val = self._minute_reader.get_value(
                            asset, dt, 'volume')
                        val += last_total
                        entries[asset] = (dt_value, val)
                        volumes.append(val)
                        continue
                    else:
                        after_last = pd.Timestamp(
                            last_visited_dt + self._one_min, tz='UTC')
                        window = self._minute_reader.load_raw_arrays(
                            ['volume'],
                            after_last,
                            dt,
                            [asset],
                        )[0]
                        val = np.nansum(window) + last_total
                        entries[asset] = (dt_value, val)
                        volumes.append(val)
                        continue
                except KeyError:
                    # Asset not cached today: sum from the market open.
                    window = self._minute_reader.load_raw_arrays(
                        ['volume'],
                        market_open,
                        dt,
                        [asset],
                    )[0]
                    val = np.nansum(window)
                    entries[asset] = (dt_value, val)
                    volumes.append(val)
                    continue
        return np.array(volumes)
class MinuteResampleSessionBarReader(SessionBarReader):
    """
    A SessionBarReader that derives session bars on demand by resampling
    the output of a backing minute bar reader.
    """

    def __init__(self, calendar, minute_bar_reader):
        self._calendar = calendar
        self._minute_bar_reader = minute_bar_reader

    def _get_resampled(self, columns, start_dt, end_dt, assets):
        """
        Load minute data for the range and roll it up to session bars.

        Returns the DataFrame produced by ``minute_to_session``: one column
        per requested field.
        """
        raw_arrays = self._minute_bar_reader.load_raw_arrays(
            columns, start_dt, end_dt, assets)
        minutes = self._calendar.minutes_in_range(start_dt, end_dt)
        # Row 0 of each transposed array is used, so only one series per
        # field ends up in the frame -- presumably the first asset's
        # (assumes arrays are shaped (minutes, assets); TODO confirm).
        per_field = [field_data.T[0] for field_data in raw_arrays]
        field_by_minute = DataFrame(
            per_field, index=columns, columns=minutes)
        return minute_to_session(field_by_minute.T, self._calendar)

    @property
    def trading_calendar(self):
        """The calendar this reader was constructed with."""
        return self._calendar

    def load_raw_arrays(self, columns, start_dt, end_dt, assets):
        """Return the resampled session frame's underlying ``.values``."""
        resampled = self._get_resampled(columns, start_dt, end_dt, assets)
        return resampled.values

    def spot_price(self, sid, session, colname):
        """
        Return the `colname` value of `sid` for `session`, resampled from
        that session's minute data.
        """
        # WARNING: This will need caching or other optimization if used in a
        # tight loop.
        # This was developed to complete interface, but has not been tuned
        # for real world use.
        bounds = self._calendar.open_and_close_for_session(session)
        session_open, session_close = bounds
        resampled = self._get_resampled(
            [colname], session_open, session_close, [sid])
        return resampled.loc[session, colname]

    @lazyval
    def sessions(self):
        """
        Sessions from the minute reader's first trading day through the
        session containing its last available minute.
        """
        calendar = self._calendar
        minute_reader = self._minute_bar_reader
        first_session = minute_reader.first_trading_day
        last_session = calendar.minute_to_session_label(
            minute_reader.last_available_dt)
        return calendar.sessions_in_range(first_session, last_session)

    @lazyval
    def last_available_dt(self):
        """
        Session label of the minute reader's last available minute.
        """
        last_minute = self._minute_bar_reader.last_available_dt
        return self.trading_calendar.minute_to_session_label(last_minute)