From 488721e8055594b024749dc20c4769807c193446 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Sun, 31 Jan 2016 18:39:39 -0500 Subject: [PATCH] ENH: Add padding method to minute bars writer. So that consumers can write empty days worth of data, without needing to construct a DataFrame with zero data force a write. The internal loader uses `last_date_in_output_for_sid` to signify that data has been attempted to be retrieved for all dates up until that, so that when resuming a job those retrieval of data for those dates are not re-attempted. Also, used to make the write logic cleaneer, by making it only necessary to create an array large enough for the given df. --- tests/data/test_minute_bars.py | 47 ++++++++++++++++++ zipline/data/minute_bars.py | 87 ++++++++++++++++++++++------------ 2 files changed, 105 insertions(+), 29 deletions(-) diff --git a/tests/data/test_minute_bars.py b/tests/data/test_minute_bars.py index d7570b3a..1496f605 100644 --- a/tests/data/test_minute_bars.py +++ b/tests/data/test_minute_bars.py @@ -23,6 +23,7 @@ from pandas import ( DataFrame, DatetimeIndex, Timestamp, + NaT ) from testfixtures import TempDirectory @@ -399,3 +400,49 @@ class BcolzMinuteBarTestCase(TestCase): volume_price = self.reader.get_value(sid, minute, 'volume') self.assertEquals(200.0, volume_price) + + def test_pad_data(self): + """ + Test writing empty data. + """ + sid = 1 + last_date = self.writer.last_date_in_output_for_sid(sid) + self.assertIs(last_date, NaT) + + self.writer.pad(sid, TEST_CALENDAR_START) + + last_date = self.writer.last_date_in_output_for_sid(sid) + self.assertEqual(last_date, TEST_CALENDAR_START) + + freq = self.market_opens.index.freq + minute = self.market_opens[TEST_CALENDAR_START + freq] + data = DataFrame( + data={ + 'open': [15.0], + 'high': [17.0], + 'low': [11.0], + 'close': [15.0], + 'volume': [100.0] + }, + index=[minute]) + self.writer.write(sid, data) + + open_price = self.reader.get_value(sid, minute, 'open') + + self.assertEquals(15.0, open_price) + + high_price = self.reader.get_value(sid, minute, 'high') + + self.assertEquals(17.0, high_price) + + low_price = self.reader.get_value(sid, minute, 'low') + + self.assertEquals(11.0, low_price) + + close_price = self.reader.get_value(sid, minute, 'close') + + self.assertEquals(15.0, close_price) + + volume_price = self.reader.get_value(sid, minute, 'volume') + + self.assertEquals(100.0, volume_price) diff --git a/zipline/data/minute_bars.py b/zipline/data/minute_bars.py index f1f18603..dc604dcd 100644 --- a/zipline/data/minute_bars.py +++ b/zipline/data/minute_bars.py @@ -282,6 +282,8 @@ class BcolzMinuteBarWriter(object): given sid. """ sizes_path = "{0}/close/meta/sizes".format(self.sidpath(sid)) + if not os.path.exists(sizes_path): + return pd.NaT with open(sizes_path, mode='r') as f: sizes = f.read() data = json.loads(sizes) @@ -344,6 +346,48 @@ class BcolzMinuteBarWriter(object): table.append([prepend_array] * 5) table.flush() + def pad(self, sid, date): + """ + Fill sid container with empty data through the specified date. + + e.g. if the date is two days after the last date in the sid's existing + output, 2 x `minute_per_day` worth of zeros will be added to the + output. + + Parameters: + ----------- + sid : int + The asset identifier for the data being written. + date : datetime-like + The date used to calculate how many slots to be pad. + The padding is done through the date, i.e. after the padding is + done the `last_date_in_output_for_sid` will be equal to `date` + """ + table = self._ensure_ctable(sid) + + last_date = self.last_date_in_output_for_sid(sid) + + tds = self._trading_days + + if date <= last_date or date < tds[0]: + # No need to pad. + return + + if last_date == pd.NaT: + # If there is no data, determine how many days to add so that + # desired days are written to the correct slots. + days_to_zerofill = tds[tds.slice_indexer(end=date)] + else: + days_to_zerofill = tds[tds.slice_indexer( + start=last_date + tds.freq, + end=date)] + + self._zerofill(table, len(days_to_zerofill)) + + new_last_date = self.last_date_in_output_for_sid(sid) + assert new_last_date == date, "new_last_date={0} != date={1}".format( + new_last_date, date) + def write(self, sid, df): """ Write the OHLCV data for the given sid. @@ -373,40 +417,25 @@ class BcolzMinuteBarWriter(object): """ table = self._ensure_ctable(sid) - last_date = self.last_date_in_output_for_sid(sid) tds = self._trading_days - days = tds[tds.slice_indexer(start=normalize_date(df.index[0]), - end=normalize_date(df.index[-1]))] - input_first_day = days[0] + input_first_day = normalize_date(df.index[0]) + input_last_day = normalize_date(df.index[-1]) - if last_date is pd.NaT: - # If there is no data, determine how many days to add so that - # desired days are written to the correct slots. - days_to_zerofill = tds[tds.slice_indexer(end=input_first_day)] - # Chop off the input first day. - days_to_zerofill = days_to_zerofill[:-1] - else: - next_date = last_date + 1 - if next_date < input_first_day: - # If last_date and input_first_day are not adjacent need to - # fill in between. - days_to_zerofill = tds[tds.slice_indexer( - start=last_date + 1, - end=input_first_day)] - # Chop off the input first day. - days_to_zerofill = days_to_zerofill[:-1] - elif next_date > input_first_day: - raise BcolzMinuteOverlappingData(dedent(""" - window start={0} is before expected write date={1} for - sid={2}""".strip()).format(days[0], input_first_day, sid)) - else: - days_to_zerofill = None + last_date = self.last_date_in_output_for_sid(sid) - if days_to_zerofill is not None and len(days_to_zerofill): - self._zerofill(table, len(days_to_zerofill)) + if last_date >= input_first_day: + raise BcolzMinuteOverlappingData(dedent(""" + Data with last_date={0} already includes input start={1} for + sid={2}""".strip()).format(last_date, input_first_day, sid)) + + day_before_input = input_first_day - tds.freq + + self.pad(sid, day_before_input) + table = self._ensure_ctable(sid) days_to_write = tds[tds.slice_indexer(start=input_first_day, - end=days[-1])] + end=input_last_day)] + minutes_count = len(days_to_write) * self._minutes_per_day all_minutes = self._minute_index