Files
catalyst/zipline/data/resample.py
T
Eddie Hebert d463a9855b TST/BUG: Cover all reindex session public methods.
Increase coverage on `ReindexSessionBarReader` so that all methods which
are considered part of the interface are covered by `test_resample`.

Fix bug in `get_value`, exposed by increased coverage, where the
`NoDataOnDate` exception was bubbling from the bcolz reader all the way
up when a session which was a holiday on the underlying reader was passed
to the reindex reader. (The reindex reader should return nan/0 in that
case.)

Also, move location of data index exceptions so that they are agnostic
to bcolz/us_equity_pricing; since the exception is now used by the
resample module to fix aforementioned bug.
2016-09-01 11:51:00 -04:00

669 lines
24 KiB
Python

# Copyright 2016 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from abc import ABCMeta, abstractmethod
import numpy as np
from numpy import nan
import pandas as pd
from pandas import DataFrame
from six import with_metaclass
from zipline.data.minute_bars import MinuteBarReader
from zipline.data.us_equity_pricing import NoDataOnDate
from zipline.data.session_bars import SessionBarReader
from zipline.utils.memoize import lazyval
_MINUTE_TO_SESSION_OHCLV_HOW = OrderedDict((
('open', 'first'),
('high', 'max'),
('low', 'min'),
('close', 'last'),
('volume', 'sum'),
))
def minute_to_session(minute_frame, calendar):
    """
    Roll a frame of minute bars up into one bar per session.

    Rows are grouped by the session label that
    ``calendar.minute_to_session_label`` returns for each index value, and
    every column is reduced with the rule registered for it in
    ``_MINUTE_TO_SESSION_OHCLV_HOW`` (first open, max high, min low,
    last close, summed volume).

    Parameters
    ----------
    minute_frame : pd.DataFrame
        Minute bars indexed by minute dt.  Every column must be one of
        `open`, `high`, `low`, `close`, `volume`; any other column name
        raises ``KeyError``.
    calendar : zipline.utils.calendars.trading_calendar.TradingCalendar
        The calendar whose ``minute_to_session_label`` maps each minute to
        the session it belongs to.

    Returns
    -------
    session_frame : pd.DataFrame
        One row per session label, with the same columns (in the same
        order) as ``minute_frame``.
    """
    aggregations = OrderedDict()
    for column in minute_frame.columns:
        aggregations[column] = _MINUTE_TO_SESSION_OHCLV_HOW[column]
    # A callable passed to groupby is applied to each index value.
    by_session = minute_frame.groupby(calendar.minute_to_session_label)
    return by_session.agg(aggregations)
class DailyHistoryAggregator(object):
    """
    Converts minute pricing data into a daily summary, to be used for the
    last slot in a call to history with a frequency of `1d`.

    This summary is the same as a daily bar rollup of minute data, with the
    distinction that the summary is truncated to the `dt` requested.
    i.e. the aggregation slides forward during the course of a simulation
    day.

    Provides aggregation for `open`, `high`, `low`, `close`, and `volume`.
    The aggregation rules for each price type are documented in their
    respective methods.

    Parameters
    ----------
    market_opens : pd.Series-like
        Looked up with ``.loc[session]`` to find the session's first minute.
        The looked-up value is localized with ``tz_localize('UTC')``, so it
        is presumably stored tz-naive -- confirm against the caller.
    minute_reader : object
        Must provide ``get_value(asset, dt, field)`` and
        ``load_raw_arrays(fields, start_dt, end_dt, assets)``.
    trading_calendar : object
        Must provide ``minute_to_session_label(dt)``.
    """

    def __init__(self, market_opens, minute_reader, trading_calendar):
        self._market_opens = market_opens
        self._minute_reader = minute_reader
        self._trading_calendar = trading_calendar

        # The caches are structured as (session, market_open, entries), where
        # entries is a dict of asset -> (last_visited_dt, value)
        #
        # Whenever an aggregation method determines the current value,
        # the entry for the respective asset should be overwritten with a new
        # entry for the current dt.value (int) and aggregation value.
        #
        # When the requested dt's session is different from the cached
        # session the cache is flushed, so that the cache entries do not
        # grow unbounded.
        #
        # Example cache:
        # cache = (pd.Timestamp('2016-03-17', tz='UTC'),
        #          pd.Timestamp('2016-03-17 13:31'),
        #          {
        #              1: (1458221460000000000, np.nan),
        #              2: (1458221460000000000, 42.0),
        #          })
        self._caches = {
            'open': None,
            'high': None,
            'low': None,
            'close': None,
            'volume': None
        }

        # The int value is used for deltas to avoid extra computation from
        # creating new Timestamps.
        self._one_min = pd.Timedelta('1 min').value

    def _prelude(self, dt, field):
        """
        Resolve the per-field cache for the session containing `dt`.

        Returns
        -------
        (market_open, prev_dt, dt_value, entries)
            ``market_open`` is the session's open localized to UTC,
            ``prev_dt`` is the int value of the minute before `dt` (or
            ``None`` when `dt` is the open itself), ``dt_value`` is
            ``dt.value`` and ``entries`` is the asset -> (dt, value) cache.
        """
        session = self._trading_calendar.minute_to_session_label(dt)
        dt_value = dt.value
        cache = self._caches[field]
        if cache is None or cache[0] != session:
            # New session: drop the previous session's entries.
            market_open = self._market_opens.loc[session]
            cache = self._caches[field] = (session, market_open, {})

        _, market_open, entries = cache
        market_open = market_open.tz_localize('UTC')
        if dt != market_open:
            prev_dt = dt_value - self._one_min
        else:
            prev_dt = None
        return market_open, prev_dt, dt_value, entries

    @staticmethod
    def _valid_values(values):
        """Return the non-NaN entries of `values` as a flat array, in order."""
        flat = np.asarray(values, dtype=np.float64).ravel()
        return flat[~np.isnan(flat)]

    def _nan_reduce(self, reducer, values):
        """
        Apply `reducer` to the non-NaN entries of `values`; NaN if there are
        none.  Avoids both the NaN-poisoning of builtin ``min``/``max`` and
        the all-NaN RuntimeWarning of ``np.nanmin``/``np.nanmax``.
        """
        valid = self._valid_values(values)
        if len(valid):
            return reducer(valid)
        return np.nan

    def _load_window(self, field, start_dt, end_dt, asset):
        """Load the raw minute values of `field` for a single asset."""
        return self._minute_reader.load_raw_arrays(
            [field],
            start_dt,
            end_dt,
            [asset],
        )[0]

    def _minute_after(self, last_visited_dt):
        """Timestamp of the minute following the cached int `last_visited_dt`."""
        return pd.Timestamp(last_visited_dt + self._one_min, tz='UTC')

    def opens(self, assets, dt):
        """
        The open field's aggregation returns the first value that occurs
        for the day, if there has been no data on or before the `dt` the open
        is `nan`.

        Once the first non-nan open is seen, that value remains constant per
        asset for the remainder of the day.

        Returns
        -------
        np.array of floats, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'open')

        opens = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                opens.append(np.nan)
                continue

            if prev_dt is None:
                # `dt` is the session open: the open is simply this minute's.
                val = self._minute_reader.get_value(asset, dt, 'open')
                entries[asset] = (dt_value, val)
                opens.append(val)
                continue

            cached = entries.get(asset)
            if cached is None:
                window_start = market_open
            else:
                last_visited_dt, first_open = cached
                if last_visited_dt == dt_value or not pd.isnull(first_open):
                    # Already answered for this dt, or the open is settled.
                    entries[asset] = (dt_value, first_open)
                    opens.append(first_open)
                    continue
                # No open seen yet; only scan the minutes not visited so far.
                window_start = self._minute_after(last_visited_dt)

            window = self._load_window('open', window_start, dt, asset)
            valid = self._valid_values(window)
            val = valid[0] if len(valid) else np.nan
            entries[asset] = (dt_value, val)
            opens.append(val)

        return np.array(opens)

    def highs(self, assets, dt):
        """
        The high field's aggregation returns the largest high seen between
        the market open and the current dt.
        If there has been no data on or before the `dt` the high is `nan`.

        Returns
        -------
        np.array of floats, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'high')

        highs = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                highs.append(np.nan)
                continue

            if prev_dt is None:
                val = self._minute_reader.get_value(asset, dt, 'high')
                entries[asset] = (dt_value, val)
                highs.append(val)
                continue

            cached = entries.get(asset)
            if cached is None:
                window = self._load_window('high', market_open, dt, asset)
                val = self._nan_reduce(np.max, window)
            else:
                last_visited_dt, last_max = cached
                if last_visited_dt == dt_value:
                    highs.append(last_max)
                    continue
                if last_visited_dt == prev_dt:
                    # Consecutive minute: fold in just the current bar.
                    curr_val = self._minute_reader.get_value(
                        asset, dt, 'high')
                    val = self._nan_reduce(np.max, [last_max, curr_val])
                else:
                    # Gap since the last visit: fold in every skipped bar.
                    # The cached max may itself be NaN, so it is combined
                    # NaN-aware rather than with builtin `max`.
                    window = self._load_window(
                        'high',
                        self._minute_after(last_visited_dt),
                        dt,
                        asset,
                    )
                    val = self._nan_reduce(
                        np.max, np.append(window, last_max))

            entries[asset] = (dt_value, val)
            highs.append(val)

        return np.array(highs)

    def lows(self, assets, dt):
        """
        The low field's aggregation returns the smallest low seen between
        the market open and the current dt.
        If there has been no data on or before the `dt` the low is `nan`.

        Returns
        -------
        np.array of floats, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'low')

        lows = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                lows.append(np.nan)
                continue

            if prev_dt is None:
                val = self._minute_reader.get_value(asset, dt, 'low')
                entries[asset] = (dt_value, val)
                lows.append(val)
                continue

            cached = entries.get(asset)
            if cached is None:
                window = self._load_window('low', market_open, dt, asset)
                val = self._nan_reduce(np.min, window)
            else:
                last_visited_dt, last_min = cached
                if last_visited_dt == dt_value:
                    lows.append(last_min)
                    continue
                if last_visited_dt == prev_dt:
                    # Consecutive minute: fold in just the current bar.
                    curr_val = self._minute_reader.get_value(
                        asset, dt, 'low')
                    val = self._nan_reduce(np.min, [last_min, curr_val])
                else:
                    # Gap since the last visit: fold in every skipped bar.
                    # The cached min may itself be NaN, so it is combined
                    # NaN-aware rather than with builtin `min`.
                    window = self._load_window(
                        'low',
                        self._minute_after(last_visited_dt),
                        dt,
                        asset,
                    )
                    val = self._nan_reduce(
                        np.min, np.append(window, last_min))

            entries[asset] = (dt_value, val)
            lows.append(val)

        return np.array(lows)

    def closes(self, assets, dt):
        """
        The close field's aggregation returns the latest close at the given
        dt.
        If the close for the given dt is `nan`, the most recent non-nan
        `close` is used.
        If there has been no data on or before the `dt` the close is `nan`.

        Returns
        -------
        np.array of floats, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'close')

        closes = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        def _get_filled_close(asset):
            # Most recent non-nan close between the open and `dt`, found
            # with one window load instead of recursing minute by minute
            # (which could exhaust the recursion limit on long sessions).
            window = self._load_window('close', market_open, dt, asset)
            valid = self._valid_values(window)
            return valid[-1] if len(valid) else np.nan

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                closes.append(np.nan)
                continue

            if prev_dt is None:
                val = self._minute_reader.get_value(asset, dt, 'close')
                entries[asset] = (dt_value, val)
                closes.append(val)
                continue

            cached = entries.get(asset)
            if cached is not None:
                last_visited_dt, last_close = cached
                if last_visited_dt == dt_value:
                    closes.append(last_close)
                    continue
                if last_visited_dt == prev_dt:
                    # Consecutive minute: carry the cached close forward.
                    val = self._minute_reader.get_value(asset, dt, 'close')
                    if pd.isnull(val):
                        val = last_close
                    entries[asset] = (dt_value, val)
                    closes.append(val)
                    continue

            # Either no cache entry, or a gap since the last visit.
            val = self._minute_reader.get_value(asset, dt, 'close')
            if pd.isnull(val):
                val = _get_filled_close(asset)
            entries[asset] = (dt_value, val)
            closes.append(val)

        return np.array(closes)

    def volumes(self, assets, dt):
        """
        The volume field's aggregation returns the sum of all volumes
        between the market open and the `dt`
        If there has been no data on or before the `dt` the volume is 0.

        Returns
        -------
        np.array of cumulative volumes, in order of assets parameter.
        """
        market_open, prev_dt, dt_value, entries = self._prelude(dt, 'volume')

        volumes = []
        session_label = self._trading_calendar.minute_to_session_label(dt)

        for asset in assets:
            if not asset.is_alive_for_session(session_label):
                volumes.append(0)
                continue

            if prev_dt is None:
                val = self._minute_reader.get_value(asset, dt, 'volume')
                entries[asset] = (dt_value, val)
                volumes.append(val)
                continue

            cached = entries.get(asset)
            if cached is None:
                window = self._load_window('volume', market_open, dt, asset)
                val = np.nansum(window)
            else:
                last_visited_dt, last_total = cached
                if last_visited_dt == dt_value:
                    volumes.append(last_total)
                    continue
                if last_visited_dt == prev_dt:
                    # Consecutive minute: add just the current bar.
                    val = self._minute_reader.get_value(
                        asset, dt, 'volume')
                    val += last_total
                else:
                    # Gap since the last visit: add every skipped bar.
                    window = self._load_window(
                        'volume',
                        self._minute_after(last_visited_dt),
                        dt,
                        asset,
                    )
                    val = np.nansum(window) + last_total

            entries[asset] = (dt_value, val)
            volumes.append(val)

        return np.array(volumes)
class MinuteResampleSessionBarReader(SessionBarReader):
    """
    A session bar reader whose bars are computed on the fly by resampling
    the minute bars of a wrapped minute bar reader.

    Parameters
    ----------
    calendar : TradingCalendar
        Used to enumerate sessions and minutes and to map each minute to
        its session label.
    minute_bar_reader : MinuteBarReader
        The source of the minute bars that get rolled up.
    """

    def __init__(self, calendar, minute_bar_reader):
        self._calendar = calendar
        self._minute_bar_reader = minute_bar_reader

    def _get_resampled(self, columns, start_dt, end_dt, assets):
        """
        Load minute data for ``[start_dt, end_dt]`` and return a list with
        one session-resampled DataFrame per asset, in the order of `assets`.
        """
        raw_by_column = self._minute_bar_reader.load_raw_arrays(
            columns, start_dt, end_dt, assets)
        minutes = self._calendar.minutes_in_range(start_dt, end_dt)

        def session_frame(asset_ix):
            # One row per requested column for this asset, transposed so
            # that rows are minutes and columns are the requested fields.
            per_column = [raw.T[asset_ix] for raw in raw_by_column]
            by_minute = DataFrame(
                per_column, index=columns, columns=minutes).T
            return minute_to_session(by_minute, self._calendar)

        return [session_frame(ix) for ix, _ in enumerate(assets)]

    @property
    def trading_calendar(self):
        return self._calendar

    def load_raw_arrays(self, columns, start_dt, end_dt, sids):
        """
        Return one (sessions x sids) array per requested column, holding the
        session bars resampled from minute data between the open of
        `start_dt` and the close of `end_dt`.
        """
        calendar = self._calendar
        sessions = calendar.sessions_in_range(start_dt, end_dt)
        first_open = calendar.open_and_close_for_session(start_dt)[0]
        last_close = calendar.open_and_close_for_session(end_dt)[1]

        out_shape = (len(sessions), len(sids))
        # Prices default to nan, volume to zero.
        arrays = [
            np.zeros(out_shape, dtype=np.uint32) if column == 'volume'
            else np.full(out_shape, np.nan)
            for column in columns
        ]

        per_sid = self._get_resampled(columns, first_open, last_close, sids)
        for column_ix, array in enumerate(arrays):
            for sid_ix, resampled in enumerate(per_sid):
                array[:, sid_ix] = resampled.values[:, column_ix]

        return arrays

    def get_value(self, sid, session, colname):
        """Return the resampled `colname` value of `sid` for `session`."""
        # WARNING: This will need caching or other optimization if used in a
        # tight loop.
        # This was developed to complete interface, but has not been tuned
        # for real world use.
        session_open, session_close = (
            self._calendar.open_and_close_for_session(session))
        resampled = self._get_resampled(
            [colname], session_open, session_close, [sid])
        return resampled[0].loc[session, colname]

    @lazyval
    def sessions(self):
        """Sessions from the minute reader's first day to its last dt."""
        reader = self._minute_bar_reader
        first_session = reader.first_trading_day
        last_session = self._calendar.minute_to_session_label(
            reader.last_available_dt)
        return self._calendar.sessions_in_range(first_session, last_session)

    @lazyval
    def last_available_dt(self):
        """Session label of the minute reader's last available minute."""
        last_minute = self._minute_bar_reader.last_available_dt
        return self.trading_calendar.minute_to_session_label(last_minute)

    @property
    def first_trading_day(self):
        return self._minute_bar_reader.first_trading_day
class ReindexBarReader(with_metaclass(ABCMeta)):
    """
    A base class for readers which reindexes results, filling in the additional
    indices with empty data.

    Used to align the reading assets which trade on different calendars.

    Currently only supports a ``trading_calendar`` which is a superset of the
    ``reader``'s calendar.

    ``load_raw_arrays`` reindexes the wrapped reader's results onto the
    target calendar.  ``get_value`` returns an empty value (``nan``, or ``0``
    for volume) when the wrapped reader raises ``NoDataOnDate``.

    Parameters
    ----------

    - trading_calendar : zipline.utils.trading_calendar.TradingCalendar
       The calendar to use when indexing results from the reader.
    - reader : MinuteBarReader|SessionBarReader
       The reader which has a calendar that is a subset of the desired
       ``trading_calendar``.
    - first_trading_session : pd.Timestamp
       The first trading session the reader should provide. Must be specified,
       since the ``reader``'s first session may not exactly align with the
       desired calendar. Specifically, in the case where the first session
       on the target calendar is a holiday on the ``reader``'s calendar.
    - last_trading_session : pd.Timestamp
       The last trading session the reader should provide. Must be specified,
       since the ``reader``'s last session may not exactly align with the
       desired calendar. Specifically, in the case where the last session
       on the target calendar is a holiday on the ``reader``'s calendar.
    """

    def __init__(self,
                 trading_calendar,
                 reader,
                 first_trading_session,
                 last_trading_session):
        self._trading_calendar = trading_calendar
        self._reader = reader
        self._first_trading_session = first_trading_session
        self._last_trading_session = last_trading_session

    @property
    def last_available_dt(self):
        return self._reader.last_available_dt

    def get_last_traded_dt(self, sid, dt):
        return self._reader.get_last_traded_dt(sid, dt)

    @property
    def first_trading_day(self):
        return self._reader.first_trading_day

    def get_value(self, sid, dt, field):
        """
        Delegate to the wrapped reader, substituting an empty value (0 for
        volume, nan otherwise) when it has no data on `dt`.
        """
        try:
            return self._reader.get_value(sid, dt, field)
        except NoDataOnDate:
            return 0 if field == 'volume' else nan

    @abstractmethod
    def _outer_dts(self, start_dt, end_dt):
        """The dts of the target calendar between `start_dt` and `end_dt`."""
        raise NotImplementedError

    @abstractmethod
    def _inner_dts(self, start_dt, end_dt):
        """The dts of the wrapped reader between `start_dt` and `end_dt`."""
        raise NotImplementedError

    @property
    def trading_calendar(self):
        return self._trading_calendar

    @lazyval
    def sessions(self):
        calendar = self.trading_calendar
        return calendar.sessions_in_range(
            self._first_trading_session,
            self._last_trading_session
        )

    def load_raw_arrays(self, fields, start_dt, end_dt, sids):
        """
        Load `fields` from the wrapped reader and place the rows at their
        positions on the target calendar; rows with no counterpart in the
        wrapped reader stay empty (0 for volume, nan otherwise).
        """
        target_dts = self._outer_dts(start_dt, end_dt)
        source_dts = self._inner_dts(start_dt, end_dt)

        # Row position of each source dt within the target index.
        positions = target_dts.searchsorted(source_dts)
        out_shape = (len(target_dts), len(sids))

        source_arrays = None
        if len(source_dts) > 0:
            source_arrays = self._reader.load_raw_arrays(
                fields, source_dts[0], source_dts[-1], sids)

        reindexed = []
        for field_ix, field in enumerate(fields):
            if field == 'volume':
                filled = np.zeros(out_shape, dtype=np.uint32)
            else:
                filled = np.full(out_shape, np.nan)

            if source_arrays is not None:
                filled[positions] = source_arrays[field_ix]

            reindexed.append(filled)

        return reindexed
class ReindexMinuteBarReader(ReindexBarReader, MinuteBarReader):
    """
    Minute-frequency reindexing reader.

    See: ``ReindexBarReader``
    """

    def _outer_dts(self, start_dt, end_dt):
        """Minutes of the target calendar in ``[start_dt, end_dt]``."""
        target_calendar = self._trading_calendar
        return target_calendar.minutes_in_range(start_dt, end_dt)

    def _inner_dts(self, start_dt, end_dt):
        """Minutes of the wrapped reader's calendar in the same range."""
        # NOTE(review): this reads the wrapped reader's `calendar` attribute,
        # whereas ReindexSessionBarReader reads `trading_calendar` -- confirm
        # the wrapped minute reader exposes `calendar`.
        source_calendar = self._reader.calendar
        return source_calendar.minutes_in_range(start_dt, end_dt)
class ReindexSessionBarReader(ReindexBarReader, SessionBarReader):
    """
    Session-frequency reindexing reader.

    See: ``ReindexBarReader``
    """

    def _outer_dts(self, start_dt, end_dt):
        """Sessions of the target calendar in ``[start_dt, end_dt]``."""
        target_calendar = self.trading_calendar
        return target_calendar.sessions_in_range(start_dt, end_dt)

    def _inner_dts(self, start_dt, end_dt):
        """Sessions of the wrapped reader's calendar in the same range."""
        source_calendar = self._reader.trading_calendar
        return source_calendar.sessions_in_range(start_dt, end_dt)