diff --git a/etc/requirements.txt b/etc/requirements.txt
index 9f1b4f77..70b47a50 100644
--- a/etc/requirements.txt
+++ b/etc/requirements.txt
@@ -69,3 +69,6 @@ lru-dict==1.1.4
 
 # For financial risk calculations
 empyrical==0.2.1
+
+# For HDF5 minute bar update files
+tables==3.3.0
diff --git a/tests/data/test_minute_bars.py b/tests/data/test_minute_bars.py
index 76a12739..f469365a 100644
--- a/tests/data/test_minute_bars.py
+++ b/tests/data/test_minute_bars.py
@@ -42,7 +42,9 @@ from zipline.data.minute_bars import (
     BcolzMinuteBarReader,
     BcolzMinuteOverlappingData,
     US_EQUITIES_MINUTES_PER_DAY,
-    BcolzMinuteWriterColumnMismatch
+    BcolzMinuteWriterColumnMismatch,
+    H5MinuteBarUpdateWriter,
+    H5MinuteBarUpdateReader,
 )
 
 from zipline.testing.fixtures import (
@@ -1146,3 +1148,54 @@ class BcolzMinuteBarTestCase(WithTradingCalendars,
             "The last traded dt should be before the early "
             "close, even when data is written between the early "
             "close and the next open.")
+
+    def test_minute_updates(self):
+        """
+        Test minute updates.
+        """
+        start_minute = self.market_opens[TEST_CALENDAR_START]
+        minutes = [start_minute,
+                   start_minute + Timedelta('1 min'),
+                   start_minute + Timedelta('2 min')]
+        sids = [1, 2]
+        data_1 = DataFrame(
+            data={
+                'open': [15.0, nan, 15.1],
+                'high': [17.0, nan, 17.1],
+                'low': [11.0, nan, 11.1],
+                'close': [14.0, nan, 14.1],
+                'volume': [1000, 0, 1001]
+            },
+            index=minutes)
+
+        data_2 = DataFrame(
+            data={
+                'open': [25.0, nan, 25.1],
+                'high': [27.0, nan, 27.1],
+                'low': [21.0, nan, 21.1],
+                'close': [24.0, nan, 24.1],
+                'volume': [2000, 0, 2001]
+            },
+            index=minutes)
+
+        frames = {1: data_1, 2: data_2}
+        update_path = self.instance_tmpdir.getpath('updates.h5')
+        update_writer = H5MinuteBarUpdateWriter(update_path)
+        update_writer.write(frames)
+
+        update_reader = H5MinuteBarUpdateReader(update_path)
+        self.writer.write(update_reader.read(minutes, sids))
+
+        # Create the reader only after the update data has been written.
+        reader = BcolzMinuteBarReader(self.dest)
+
+        columns = ['open', 'high', 'low', 'close', 'volume']
+        arrays = list(map(transpose, reader.load_raw_arrays(
+            columns, minutes[0], minutes[-1], sids,
+        )))
+
+        data = {sids[0]: data_1, sids[1]: data_2}
+
+        for i, col in enumerate(columns):
+            for j, sid in enumerate(sids):
+                assert_almost_equal(data[sid][col], arrays[i][j])
diff --git a/zipline/data/minute_bars.py b/zipline/data/minute_bars.py
index c2d44c45..c5141172 100644
--- a/zipline/data/minute_bars.py
+++ b/zipline/data/minute_bars.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from abc import ABCMeta, abstractmethod
 import json
 import os
 from glob import glob
@@ -24,6 +25,9 @@ from intervaltree import IntervalTree
 import logbook
 import numpy as np
 import pandas as pd
+from pandas import HDFStore
+import tables
+from six import with_metaclass
 from toolz import keymap, valmap
 
 from zipline.data._minute_bar_internal import (
@@ -1242,3 +1246,99 @@ class BcolzMinuteBarReader(MinuteBarReader):
 
             results.append(out)
         return results
+
+
+class MinuteBarUpdateReader(with_metaclass(ABCMeta, object)):
+    """
+    Abstract base class for minute update readers.
+    """
+
+    @abstractmethod
+    def read(self, dts, sids):
+        """
+        Read and return pricing update data.
+
+        Parameters
+        ----------
+        dts : DatetimeIndex
+            The minutes for which to read the pricing updates.
+        sids : iter[int]
+            The sids for which to read the pricing updates.
+
+        Returns
+        -------
+        data : iter[(int, DataFrame)]
+            Returns an iterable of ``sid`` to the corresponding OHLCV data.
+        """
+        raise NotImplementedError()
+
+
+class H5MinuteBarUpdateWriter(object):
+    """
+    Writer for files containing minute bar updates for consumption by a writer
+    for a ``MinuteBarReader`` format.
+
+    Parameters
+    ----------
+    path : str
+        The destination path.
+    complevel : int, optional
+        The HDF5 complevel, defaults to ``5``.
+    complib : str, optional
+        The HDF5 complib, defaults to ``zlib``.
+    """
+
+    FORMAT_VERSION = 0
+
+    _COMPLEVEL = 5
+    _COMPLIB = 'zlib'
+
+    def __init__(self, path, complevel=None, complib=None):
+        self._complevel = complevel if complevel \
+            is not None else self._COMPLEVEL
+        self._complib = complib if complib \
+            is not None else self._COMPLIB
+        self._path = path
+
+    def write(self, frames):
+        """
+        Write the frames to the target HDF5 file, using the format used by
+        ``pd.Panel.to_hdf``
+
+        Parameters
+        ----------
+        frames : iter[(int, DataFrame)] or dict[int -> DataFrame]
+            An iterable or other mapping of sid to the corresponding OHLCV
+            pricing data.
+        """
+        with HDFStore(self._path, 'w',
+                      complevel=self._complevel, complib=self._complib) \
+                as store:
+            panel = pd.Panel.from_dict(dict(frames))
+            panel.to_hdf(store, 'updates')
+        # Stamp the format version on the root node after the store closes.
+        with tables.open_file(self._path, mode='r+') as h5file:
+            h5file.set_node_attr('/', 'version', self.FORMAT_VERSION)
+
+
+class H5MinuteBarUpdateReader(MinuteBarUpdateReader):
+    """
+    Reader for minute bar updates stored in HDF5 files.
+
+    Parameters
+    ----------
+    path : str
+        The path of the HDF5 file from which to source data.
+    """
+    def __init__(self, path):
+        self._panel = pd.read_hdf(path)
+
+    def read(self, dts, sids):
+        """
+        Read the update data for the requested minutes and sids.
+
+        See ``MinuteBarUpdateReader.read`` for the parameters and return
+        value.
+        """
+        panel = self._panel[sids, dts, :]
+        return panel.iteritems()