ENH: Add a reader writer pair for HDF5 minute bar updates.

This format is intended for storing data for all sids of an asset type,
e.g. equities or futures for a session. bcolz is not used to avoid the overhead
of creating the directories and files for each asset (which numbers around ~8000
for active equities) can be removed since the update is meant to be read at
once, instead of supporting the random access pattern needed by the simulation.

This patch only adds the reader/writer pair, with the management of finding the
paths to delta files and the application of the updates to the bcolz write left
to internal loader code.

Also, the update reader interface is intentionally constrained to the data for
an entire session to allow for an implementation that allows for mid-session updates.
This commit is contained in:
Eddie Hebert
2016-12-21 16:05:48 -05:00
committed by Eddie Hebert
parent 3095f8c573
commit e913519734
3 changed files with 150 additions and 1 deletions
+2
View File
@@ -69,3 +69,5 @@ lru-dict==1.1.4
# For financial risk calculations
empyrical==0.2.1
tables==3.3.0
+55 -1
View File
@@ -42,7 +42,9 @@ from zipline.data.minute_bars import (
BcolzMinuteBarReader,
BcolzMinuteOverlappingData,
US_EQUITIES_MINUTES_PER_DAY,
BcolzMinuteWriterColumnMismatch
BcolzMinuteWriterColumnMismatch,
H5MinuteBarUpdateWriter,
H5MinuteBarUpdateReader,
)
from zipline.testing.fixtures import (
@@ -1146,3 +1148,55 @@ class BcolzMinuteBarTestCase(WithTradingCalendars,
"The last traded dt should be before the early "
"close, even when data is written between the early "
"close and the next open.")
def test_minute_updates(self):
"""
Test minute updates.
"""
start_minute = self.market_opens[TEST_CALENDAR_START]
minutes = [start_minute,
start_minute + Timedelta('1 min'),
start_minute + Timedelta('2 min')]
sids = [1, 2]
data_1 = DataFrame(
data={
'open': [15.0, nan, 15.1],
'high': [17.0, nan, 17.1],
'low': [11.0, nan, 11.1],
'close': [14.0, nan, 14.1],
'volume': [1000, 0, 1001]
},
index=minutes)
data_2 = DataFrame(
data={
'open': [25.0, nan, 25.1],
'high': [27.0, nan, 27.1],
'low': [21.0, nan, 21.1],
'close': [24.0, nan, 24.1],
'volume': [2000, 0, 2001]
},
index=minutes)
frames = {1: data_1, 2: data_2}
update_path = self.instance_tmpdir.getpath('updates.h5')
update_writer = H5MinuteBarUpdateWriter(update_path)
update_writer.write(frames)
update_reader = H5MinuteBarUpdateReader(update_path)
self.writer.write(update_reader.read(minutes, sids))
# Refresh the reader since truncate update the metadata.
reader = BcolzMinuteBarReader(self.dest)
columns = ['open', 'high', 'low', 'close', 'volume']
sids = [sids[0], sids[1]]
arrays = list(map(transpose, reader.load_raw_arrays(
columns, minutes[0], minutes[-1], sids,
)))
data = {sids[0]: data_1, sids[1]: data_2}
for i, col in enumerate(columns):
for j, sid in enumerate(sids):
assert_almost_equal(data[sid][col], arrays[i][j])
+93
View File
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
import json
import os
from glob import glob
@@ -24,6 +25,9 @@ from intervaltree import IntervalTree
import logbook
import numpy as np
import pandas as pd
from pandas import HDFStore
import tables
from six import with_metaclass
from toolz import keymap, valmap
from zipline.data._minute_bar_internal import (
@@ -1242,3 +1246,92 @@ class BcolzMinuteBarReader(MinuteBarReader):
results.append(out)
return results
class MinuteBarUpdateReader(with_metaclass(ABCMeta, object)):
"""
Abstract base class for minute update readers.
"""
@abstractmethod
def read(self, dts, sids):
"""
Read and return pricing update data.
Parameters
----------
dts : DatetimeIndex
The minutes for which to read the pricing updates.
sids : iter[int]
The sids for which to read the pricing updates.
Returns
-------
data : iter[(int, DataFrame)]
Returns an iterable of ``sid`` to the corresponding OHLCV data.
"""
raise NotImplementedError()
class H5MinuteBarUpdateWriter(object):
"""
Writer for files containing minute bar updates for consumption by a writer
for a ``MinuteBarReader`` format.
Parameters
----------
path : str
The destination path.
complevel : int, optional
The HDF5 complevel, defaults to ``5``.
complib : str, optional
The HDF5 complib, defaults to ``zlib``.
"""
FORMAT_VERSION = 0
_COMPLEVEL = 5
_COMPLIB = 'zlib'
def __init__(self, path, complevel=None, complib=None):
self._complevel = complevel if complevel \
is not None else self._COMPLEVEL
self._complib = complib if complib \
is not None else self._COMPLIB
self._path = path
def write(self, frames):
"""
Write the frames to the target HDF5 file, using the format used by
``pd.Panel.to_hdf``
Parameters
----------
frames : iter[(int, DataFrame)] or dict[int -> DataFrame]
An iterable or other mapping of sid to the corresponding OHLCV
pricing data.
"""
with HDFStore(self._path, 'w',
complevel=self._complevel, complib=self._complib) \
as store:
panel = pd.Panel.from_dict(dict(frames))
panel.to_hdf(store, 'updates')
with tables.open_file(self._path, mode='r+') as h5file:
h5file.set_node_attr('/', 'version', 0)
class H5MinuteBarUpdateReader(MinuteBarUpdateReader):
"""
Reader for minute bar updates stored in HDF5 files.
Parameters
----------
path : str
The path of the HDF5 file from which to source data.
"""
def __init__(self, path):
self._panel = pd.read_hdf(path)
def read(self, dts, sids):
panel = self._panel[sids, dts, :]
return panel.iteritems()