From 8217cdb1bd6c6555d28f29c124110220c9787ba3 Mon Sep 17 00:00:00 2001 From: Stewart Douglas Date: Wed, 11 May 2016 17:55:45 -0400 Subject: [PATCH] ENH: Allow BcolzMinuteBarWriter to append to most recent day Minutely data can now be appended to bcolz files even when minutes in the same day have already been written. For example, previously attempting to write data for the minute 2016-05-11 16:30 would raise an exception if any OHLCV data for 2016-05-11 had been written to the same file. Trying to overwrite existing minutes still raises a BcolzMinuteOverlappingData exception. Note that previously all sids' bcolz files ended at the same time. This is no longer necessarily the case. The last record in each sid's bcolz file now corresponds to the latest minute for which OHLCV data is provided to the writer. --- tests/data/test_minute_bars.py | 61 ++++++++++++++++++++++++++++++++++ zipline/data/minute_bars.py | 55 ++++++++++++++++++++++-------- 2 files changed, 102 insertions(+), 14 deletions(-) diff --git a/tests/data/test_minute_bars.py b/tests/data/test_minute_bars.py index edddf060..824e9341 100644 --- a/tests/data/test_minute_bars.py +++ b/tests/data/test_minute_bars.py @@ -331,6 +331,48 @@ class BcolzMinuteBarTestCase(TestCase): with self.assertRaises(BcolzMinuteOverlappingData): self.writer.write_sid(sid, data) + def test_append_to_same_day(self): + """ + Test writing data with the same date as existing data in our file. 
+ """ + sid = 1 + + first_minute = self.market_opens[TEST_CALENDAR_START] + data = DataFrame( + data={ + 'open': [10.0], + 'high': [20.0], + 'low': [30.0], + 'close': [40.0], + 'volume': [50.0] + }, + index=[first_minute]) + self.writer.write_sid(sid, data) + + # Write data in the same day as the previous minute + second_minute = first_minute + Timedelta(minutes=1) + new_data = DataFrame( + data={ + 'open': [5.0], + 'high': [10.0], + 'low': [3.0], + 'close': [7.0], + 'volume': [10.0] + }, + index=[second_minute]) + self.writer.write_sid(sid, new_data) + + open_price = self.reader.get_value(sid, second_minute, 'open') + self.assertEquals(5.0, open_price) + high_price = self.reader.get_value(sid, second_minute, 'high') + self.assertEquals(10.0, high_price) + low_price = self.reader.get_value(sid, second_minute, 'low') + self.assertEquals(3.0, low_price) + close_price = self.reader.get_value(sid, second_minute, 'close') + self.assertEquals(7.0, close_price) + volume_price = self.reader.get_value(sid, second_minute, 'volume') + self.assertEquals(10.0, volume_price) + def test_write_multiple_sids(self): """ Test writing multiple sids. @@ -806,3 +848,22 @@ class BcolzMinuteBarTestCase(TestCase): Timestamp('2015-11-30 21:01:00', tz='UTC'), 'open'), 600) + + def test_set_sid_attrs(self): + """Confirm that we can set the attributes of a sid's file correctly. 
+ """ + + sid = 1 + start_day = Timestamp('2015-11-27', tz='UTC') + end_day = Timestamp('2015-06-02', tz='UTC') + attrs = { + 'start_day': start_day.value / int(1e9), + 'end_day': end_day.value / int(1e9), + 'factor': 100, + } + + # Write the attributes + self.writer.set_sid_attrs(sid, **attrs) + # Read the attributes + for k, v in attrs.items(): + self.assertEqual(self.reader.get_sid_attr(sid, k), v) diff --git a/zipline/data/minute_bars.py b/zipline/data/minute_bars.py index 9574156a..5ba4c140 100644 --- a/zipline/data/minute_bars.py +++ b/zipline/data/minute_bars.py @@ -441,6 +441,13 @@ class BcolzMinuteBarWriter(object): assert new_last_date == date, "new_last_date={0} != date={1}".format( new_last_date, date) + def set_sid_attrs(self, sid, **kwargs): + """Write all the supplied kwargs as attributes of the sid's file. + """ + table = self._ensure_ctable(sid) + for k, v in kwargs.items(): + table.attrs[k] = v + def write(self, data, show_progress=False): """Write a stream of minute data. 
@@ -559,29 +566,37 @@ class BcolzMinuteBarWriter(object): tds = self._trading_days input_first_day = pd.Timestamp(dts[0].astype('datetime64[D]'), tz='UTC') - input_last_day = pd.Timestamp(dts[-1].astype('datetime64[D]'), - tz='UTC') last_date = self.last_date_in_output_for_sid(sid) - if last_date >= input_first_day: - raise BcolzMinuteOverlappingData(dedent(""" - Data with last_date={0} already includes input start={1} for - sid={2}""".strip()).format(last_date, input_first_day, sid)) - day_before_input = input_first_day - tds.freq self.pad(sid, day_before_input) table = self._ensure_ctable(sid) - days_to_write = tds[tds.slice_indexer(start=input_first_day, - end=input_last_day)] - - minutes_count = len(days_to_write) * self._minutes_per_day + # Get the number of minutes already recorded in this sid's ctable + num_rec_mins = table.size all_minutes = self._minute_index - indexer = all_minutes.slice_indexer(start=days_to_write[0]) - all_minutes_in_window = all_minutes[indexer] + # Get the latest minute we wish to write to the ctable + last_minute_to_write = dts[-1] + + # In the event that we've already written some minutely data to the + # ctable, guard against overwriting that data. 
+ if num_rec_mins > 0: + last_recorded_minute = np.datetime64(all_minutes[num_rec_mins - 1]) + if last_minute_to_write <= last_recorded_minute: + raise BcolzMinuteOverlappingData(dedent(""" + Data with last_date={0} already includes input start={1} for + sid={2}""".strip()).format(last_date, input_first_day, sid)) + + latest_min_count = all_minutes.get_loc(last_minute_to_write) + + # Get all the minutes we wish to write (all market minutes after the + # latest currently written, up to and including last_minute_to_write) + all_minutes_in_window = all_minutes[num_rec_mins:latest_min_count + 1] + + minutes_count = all_minutes_in_window.size open_col = np.zeros(minutes_count, dtype=np.uint32) high_col = np.zeros(minutes_count, dtype=np.uint32) @@ -756,6 +771,15 @@ class BcolzMinuteBarReader(object): return carray + def get_sid_attr(self, sid, name): + sid_subdir = _sid_subdir_path(sid) + sid_path = os.path.join(self._rootdir, sid_subdir) + attrs = bcolz.attrs.attrs(sid_path, 'r') + try: + return attrs[name] + except KeyError: + return None + def get_value(self, sid, dt, field): """ Retrieve the pricing info for the given sid, dt, and field. @@ -791,7 +815,10 @@ class BcolzMinuteBarReader(object): self._last_get_value_dt_value = dt.value self._last_get_value_dt_position = minute_pos - value = self._open_minute_file(field, sid)[minute_pos] + try: + value = self._open_minute_file(field, sid)[minute_pos] + except IndexError: + value = 0 if value == 0: if field == 'volume': return 0