From e913519734698f42bbec47d256dfab4d7fbf472c Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Wed, 21 Dec 2016 16:05:48 -0500 Subject: [PATCH] ENH: Add a reader writer pair for HDF5 minute bar updates. This format is intended for storing data for all sids of an asset type, e.g. equities or futures, for a session. bcolz is not used, which avoids the overhead of creating the directories and files for each asset (which number around 8000 for active equities); that per-asset layout can be removed since the update is meant to be read at once, instead of supporting the random access pattern needed by the simulation. This patch only adds the reader/writer pair, with the management of finding the paths to delta files and the application of the updates to the bcolz writer left to internal loader code. Also, the update reader interface is intentionally constrained to the data for an entire session, to leave room for an implementation that supports mid-session updates. --- etc/requirements.txt | 2 + tests/data/test_minute_bars.py | 56 +++++++++++++++++++- zipline/data/minute_bars.py | 93 ++++++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+), 1 deletion(-) diff --git a/etc/requirements.txt b/etc/requirements.txt index 9f1b4f77..70b47a50 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -69,3 +69,5 @@ lru-dict==1.1.4 # For financial risk calculations empyrical==0.2.1 + +tables==3.3.0 diff --git a/tests/data/test_minute_bars.py b/tests/data/test_minute_bars.py index 76a12739..f469365a 100644 --- a/tests/data/test_minute_bars.py +++ b/tests/data/test_minute_bars.py @@ -42,7 +42,9 @@ from zipline.data.minute_bars import ( BcolzMinuteBarReader, BcolzMinuteOverlappingData, US_EQUITIES_MINUTES_PER_DAY, - BcolzMinuteWriterColumnMismatch + BcolzMinuteWriterColumnMismatch, + H5MinuteBarUpdateWriter, + H5MinuteBarUpdateReader, ) from zipline.testing.fixtures import ( @@ -1146,3 +1148,55 @@ class BcolzMinuteBarTestCase(WithTradingCalendars, "The last traded dt should be before the early " 
def test_minute_updates(self):
    """
    Round-trip minute bar updates through the HDF5 update writer/reader
    pair into the bcolz minute bar writer, then verify the values that a
    ``BcolzMinuteBarReader`` loads back.
    """
    columns = ['open', 'high', 'low', 'close', 'volume']
    sids = [1, 2]

    first_minute = self.market_opens[TEST_CALENDAR_START]
    minutes = [first_minute] + [
        first_minute + Timedelta(step) for step in ('1 min', '2 min')
    ]

    # The middle minute of every frame is an empty bar: nan prices and
    # zero volume.
    expected = {
        1: DataFrame(
            data={
                'open': [15.0, nan, 15.1],
                'high': [17.0, nan, 17.1],
                'low': [11.0, nan, 11.1],
                'close': [14.0, nan, 14.1],
                'volume': [1000, 0, 1001],
            },
            index=minutes,
        ),
        2: DataFrame(
            data={
                'open': [25.0, nan, 25.1],
                'high': [27.0, nan, 27.1],
                'low': [21.0, nan, 21.1],
                'close': [24.0, nan, 24.1],
                'volume': [2000, 0, 2001],
            },
            index=minutes,
        ),
    }

    # Persist the frames as an update file ...
    update_path = self.instance_tmpdir.getpath('updates.h5')
    H5MinuteBarUpdateWriter(update_path).write(expected)

    # ... and feed what the update reader yields into the bar writer.
    update_reader = H5MinuteBarUpdateReader(update_path)
    self.writer.write(update_reader.read(minutes, sids))

    # Open a fresh reader after the write.
    # NOTE(review): presumably needed because the write changes metadata
    # that an existing reader would have cached -- confirm.
    reader = BcolzMinuteBarReader(self.dest)

    raw_arrays = reader.load_raw_arrays(
        columns, minutes[0], minutes[-1], sids,
    )
    arrays = [transpose(raw) for raw in raw_arrays]

    for col_ix, col in enumerate(columns):
        for sid_ix, sid in enumerate(sids):
            assert_almost_equal(expected[sid][col], arrays[col_ix][sid_ix])
# Imports required by the minute bar update reader/writer classes below.
from abc import ABCMeta, abstractmethod

import pandas as pd
from pandas import HDFStore
import tables
from six import with_metaclass


class MinuteBarUpdateReader(with_metaclass(ABCMeta, object)):
    """
    Abstract base class for minute update readers.
    """

    @abstractmethod
    def read(self, dts, sids):
        """
        Read and return pricing update data.

        Parameters
        ----------
        dts : DatetimeIndex
            The minutes for which to read the pricing updates.
        sids : iter[int]
            The sids for which to read the pricing updates.

        Returns
        -------
        data : iter[(int, DataFrame)]
            Returns an iterable of ``sid`` to the corresponding OHLCV data.
        """
        raise NotImplementedError()


class H5MinuteBarUpdateWriter(object):
    """
    Writer for files containing minute bar updates for consumption by a writer
    for a ``MinuteBarReader`` format.

    Parameters
    ----------
    path : str
        The destination path.
    complevel : int, optional
        The HDF5 complevel, defaults to ``5``.
    complib : str, optional
        The HDF5 complib, defaults to ``zlib``.
    """

    # Value stored in the ``version`` attribute of the file's root node.
    FORMAT_VERSION = 0

    _COMPLEVEL = 5
    _COMPLIB = 'zlib'

    def __init__(self, path, complevel=None, complib=None):
        # Compare against ``None`` (not truthiness) so that an explicit
        # ``complevel=0`` is honored rather than replaced by the default.
        self._complevel = (
            self._COMPLEVEL if complevel is None else complevel
        )
        self._complib = self._COMPLIB if complib is None else complib
        self._path = path

    def write(self, frames):
        """
        Write the frames to the target HDF5 file, using the format used by
        ``pd.Panel.to_hdf``

        The file at ``path`` is opened in ``'w'`` mode, the frames are
        stored as a single panel under the ``updates`` key, and the root
        node is then tagged with a ``version`` attribute set to
        ``FORMAT_VERSION``.

        Parameters
        ----------
        frames : iter[(int, DataFrame)] or dict[int -> DataFrame]
            An iterable or other mapping of sid to the corresponding OHLCV
            pricing data.
        """
        with HDFStore(self._path, 'w',
                      complevel=self._complevel,
                      complib=self._complib) as store:
            panel = pd.Panel.from_dict(dict(frames))
            panel.to_hdf(store, 'updates')

        # The store is closed at this point; reopen the file with PyTables
        # to stamp the format version.  Use the class constant rather than
        # a literal so the written value follows ``FORMAT_VERSION``.
        with tables.open_file(self._path, mode='r+') as h5file:
            h5file.set_node_attr('/', 'version', self.FORMAT_VERSION)


class H5MinuteBarUpdateReader(MinuteBarUpdateReader):
    """
    Reader for minute bar updates stored in HDF5 files.

    Parameters
    ----------
    path : str
        The path of the HDF5 file from which to source data.
    """
    def __init__(self, path):
        # No key is passed: ``pd.read_hdf`` only allows that when the file
        # holds a single pandas object, which is what
        # ``H5MinuteBarUpdateWriter.write`` produces.
        self._panel = pd.read_hdf(path)

    def read(self, dts, sids):
        """
        Read the pricing updates for the given minutes and sids.

        Parameters
        ----------
        dts : DatetimeIndex
            The minutes for which to read the pricing updates.
        sids : iter[int]
            The sids for which to read the pricing updates.

        Returns
        -------
        data : iter[(int, DataFrame)]
            The result of ``iteritems()`` on the selected sub-panel.
        """
        # Selection order is (sids, dts, all of the remaining axis).
        selected = self._panel[sids, dts, :]
        return selected.iteritems()