From 52667b4a90f1cdb93ef7df124ab9609b248ca014 Mon Sep 17 00:00:00 2001 From: Andrew Daniels Date: Wed, 3 May 2017 20:49:22 -0400 Subject: [PATCH] MAINT: Handle gaps in input to daily bars writer (#1778) Previously, a dataframe passed into BcolzDailyBarWriter.write that was missing an expected session between its first and last sessions would be written incorrectly. Upon converting the dataframe to a ctable, the values for all days following the gap would be shifted backwards, and nans would be shifted in at the end. This commit handles the issue by asserting that the number of rows in the input table matches the number of sessions in the calendar between the table's first and last sessions. Also fixes a test that was mistakenly using minutes_in_range where it should have been using sessions_in_range (uncovered by this change). --- tests/data/test_us_equity_pricing.py | 43 +++++++++++++++++++ tests/test_algorithm.py | 2 +- zipline/data/us_equity_pricing.py | 62 ++++++++++++++++++---------- 3 files changed, 84 insertions(+), 23 deletions(-) diff --git a/tests/data/test_us_equity_pricing.py b/tests/data/test_us_equity_pricing.py index 44633d11..12c4ce80 100644 --- a/tests/data/test_us_equity_pricing.py +++ b/tests/data/test_us_equity_pricing.py @@ -31,6 +31,7 @@ from pandas.util.testing import assert_index_equal from zipline.data.us_equity_pricing import ( BcolzDailyBarReader, + BcolzDailyBarWriter, NoDataBeforeDate, NoDataAfterDate, ) @@ -44,7 +45,10 @@ from zipline.pipeline.loaders.synthetic import ( ) from zipline.testing import seconds_to_timestamp from zipline.testing.fixtures import ( + WithAssetFinder, WithBcolzEquityDailyBarReader, + WithTmpDir, + WithTradingCalendars, ZiplineTestCase, ) from zipline.utils.calendars import get_calendar @@ -362,3 +366,42 @@ class BcolzDailyBarNeverReadAllTestCase(BcolzDailyBarTestCase): `load_raw_array`. """ BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = maxsize + + +class BcolzDailyBarWriterMissingDataTestCase(WithAssetFinder, + WithTmpDir, + WithTradingCalendars, + ZiplineTestCase): + # Sid 3 is active from 2015-06-02 to 2015-06-30. + MISSING_DATA_SID = 3 + # Leave out data for a day in the middle of the query range. + MISSING_DATA_DAY = Timestamp('2015-06-15', tz='UTC') + + @classmethod + def make_equity_info(cls): + return EQUITY_INFO.loc[EQUITY_INFO.index == cls.MISSING_DATA_SID] + + def test_missing_values_assertion(self): + sessions = self.trading_calendar.sessions_in_range( + TEST_CALENDAR_START, + TEST_CALENDAR_STOP, + ) + + sessions_with_gap = sessions[sessions != self.MISSING_DATA_DAY] + bar_data = make_bar_data(self.make_equity_info(), sessions_with_gap) + + writer = BcolzDailyBarWriter( + self.tmpdir.path, + self.trading_calendar, + sessions[0], + sessions[-1], + ) + + # There are 21 sessions between the start and end date for this + # asset, and we excluded one. + expected_msg = ( + 'Got 20 rows for daily bars table with first day=2015-06-02, last ' + 'day=2015-06-30, expected 21 rows.' + ) + with self.assertRaisesRegexp(AssertionError, expected_msg): + writer.write(bar_data) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index 31853fd0..1c25fbfb 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -2261,7 +2261,7 @@ class TestCapitalChanges(WithLogger, @classmethod def make_equity_daily_bar_data(cls): - days = cls.trading_calendar.minutes_in_range( + days = cls.trading_calendar.sessions_in_range( pd.Timestamp('2006-01-03', tz='UTC'), pd.Timestamp('2006-01-09', tz='UTC') ) diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py index 1ffe403c..a599b778 100644 --- a/zipline/data/us_equity_pricing.py +++ b/zipline/data/us_equity_pricing.py @@ -50,6 +50,7 @@ from six import ( string_types, viewkeys, ) +from toolz import compose from zipline.data.session_bars import SessionBarReader from zipline.data.bar_reader import ( @@ -162,21 +163,6 @@ def winsorise_uint32(df, invalid_data_behavior, column, *columns): return df -@expect_element(invalid_data_behavior={'warn', 'raise', 'ignore'}) -def to_ctable(raw_data, invalid_data_behavior): - if isinstance(raw_data, ctable): - # we already have a ctable so do nothing - return raw_data - - winsorise_uint32(raw_data, invalid_data_behavior, 'volume', *OHLC) - processed = (raw_data[list(OHLC)] * 1000).astype('uint32') - dates = raw_data.index.values.astype('datetime64[s]') - check_uint32_safe(dates.max().view(np.int64), 'day') - processed['day'] = dates.astype('uint32') - processed['volume'] = raw_data.volume.astype('uint32') - return ctable.fromdataframe(processed) - - class BcolzDailyBarWriter(object): """ Class capable of writing daily OHLCV data to disk in a format that can @@ -257,7 +243,10 @@ class BcolzDailyBarWriter(object): The newly-written table. """ ctx = maybe_show_progress( - ((sid, to_ctable(df, invalid_data_behavior)) for sid, df in data), + ( + (sid, self.to_ctable(df, invalid_data_behavior)) + for sid, df in data + ), show_progress=show_progress, item_show_func=self.progress_bar_item_show_func, label=self.progress_bar_message, @@ -355,15 +344,30 @@ class BcolzDailyBarWriter(object): last_row[asset_key] = total_rows + nrows - 1 total_rows += nrows + table_day_to_session = compose( + self._calendar.minute_to_session_label, + partial(Timestamp, unit='s', tz='UTC'), + ) + asset_first_day = table_day_to_session(table['day'][0]) + asset_last_day = table_day_to_session(table['day'][-1]) + + asset_sessions = sessions[ + sessions.slice_indexer(asset_first_day, asset_last_day) + ] + assert len(table) == len(asset_sessions), ( + 'Got {} rows for daily bars table with first day={}, last ' + 'day={}, expected {} rows.'.format( + len(table), + asset_first_day.date(), + asset_last_day.date(), + len(asset_sessions), + ) + ) + # Calculate the number of trading days between the first date # in the stored data and the first date of **this** asset. This # offset used for output alignment by the reader. - asset_first_day = table['day'][0] - calendar_offset[asset_key] = sessions.get_loc( - self._calendar.minute_to_session_label( - Timestamp(asset_first_day, unit='s', tz='UTC') - ) - ) + calendar_offset[asset_key] = sessions.get_loc(asset_first_day) # This writes the table to disk. full_table = ctable( @@ -389,6 +393,20 @@ class BcolzDailyBarWriter(object): full_table.flush() return full_table + @expect_element(invalid_data_behavior={'warn', 'raise', 'ignore'}) + def to_ctable(self, raw_data, invalid_data_behavior): + if isinstance(raw_data, ctable): + # we already have a ctable so do nothing + return raw_data + + winsorise_uint32(raw_data, invalid_data_behavior, 'volume', *OHLC) + processed = (raw_data[list(OHLC)] * 1000).astype('uint32') + dates = raw_data.index.values.astype('datetime64[s]') + check_uint32_safe(dates.max().view(np.int64), 'day') + processed['day'] = dates.astype('uint32') + processed['volume'] = raw_data.volume.astype('uint32') + return ctable.fromdataframe(processed) + class BcolzDailyBarReader(SessionBarReader): """