diff --git a/tests/data/test_us_equity_pricing.py b/tests/data/test_us_equity_pricing.py index 44633d11..12c4ce80 100644 --- a/tests/data/test_us_equity_pricing.py +++ b/tests/data/test_us_equity_pricing.py @@ -31,6 +31,7 @@ from pandas.util.testing import assert_index_equal from zipline.data.us_equity_pricing import ( BcolzDailyBarReader, + BcolzDailyBarWriter, NoDataBeforeDate, NoDataAfterDate, ) @@ -44,7 +45,10 @@ from zipline.pipeline.loaders.synthetic import ( ) from zipline.testing import seconds_to_timestamp from zipline.testing.fixtures import ( + WithAssetFinder, WithBcolzEquityDailyBarReader, + WithTmpDir, + WithTradingCalendars, ZiplineTestCase, ) from zipline.utils.calendars import get_calendar @@ -362,3 +366,42 @@ class BcolzDailyBarNeverReadAllTestCase(BcolzDailyBarTestCase): `load_raw_array`. """ BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = maxsize + + +class BcolzDailyBarWriterMissingDataTestCase(WithAssetFinder, + WithTmpDir, + WithTradingCalendars, + ZiplineTestCase): + # Sid 3 is active from 2015-06-02 to 2015-06-30. + MISSING_DATA_SID = 3 + # Leave out data for a day in the middle of the query range. + MISSING_DATA_DAY = Timestamp('2015-06-15', tz='UTC') + + @classmethod + def make_equity_info(cls): + return EQUITY_INFO.loc[EQUITY_INFO.index == cls.MISSING_DATA_SID] + + def test_missing_values_assertion(self): + sessions = self.trading_calendar.sessions_in_range( + TEST_CALENDAR_START, + TEST_CALENDAR_STOP, + ) + + sessions_with_gap = sessions[sessions != self.MISSING_DATA_DAY] + bar_data = make_bar_data(self.make_equity_info(), sessions_with_gap) + + writer = BcolzDailyBarWriter( + self.tmpdir.path, + self.trading_calendar, + sessions[0], + sessions[-1], + ) + + # There are 21 sessions between the start and end date for this + # asset, and we excluded one. + expected_msg = ( + 'Got 20 rows for daily bars table with first day=2015-06-02, last ' + 'day=2015-06-30, expected 21 rows.' + ) + with self.assertRaisesRegexp(AssertionError, expected_msg): + writer.write(bar_data) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index 31853fd0..1c25fbfb 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -2261,7 +2261,7 @@ class TestCapitalChanges(WithLogger, @classmethod def make_equity_daily_bar_data(cls): - days = cls.trading_calendar.minutes_in_range( + days = cls.trading_calendar.sessions_in_range( pd.Timestamp('2006-01-03', tz='UTC'), pd.Timestamp('2006-01-09', tz='UTC') ) diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py index 1ffe403c..a599b778 100644 --- a/zipline/data/us_equity_pricing.py +++ b/zipline/data/us_equity_pricing.py @@ -50,6 +50,7 @@ from six import ( string_types, viewkeys, ) +from toolz import compose from zipline.data.session_bars import SessionBarReader from zipline.data.bar_reader import ( @@ -162,21 +163,6 @@ def winsorise_uint32(df, invalid_data_behavior, column, *columns): return df -@expect_element(invalid_data_behavior={'warn', 'raise', 'ignore'}) -def to_ctable(raw_data, invalid_data_behavior): - if isinstance(raw_data, ctable): - # we already have a ctable so do nothing - return raw_data - - winsorise_uint32(raw_data, invalid_data_behavior, 'volume', *OHLC) - processed = (raw_data[list(OHLC)] * 1000).astype('uint32') - dates = raw_data.index.values.astype('datetime64[s]') - check_uint32_safe(dates.max().view(np.int64), 'day') - processed['day'] = dates.astype('uint32') - processed['volume'] = raw_data.volume.astype('uint32') - return ctable.fromdataframe(processed) - - class BcolzDailyBarWriter(object): """ Class capable of writing daily OHLCV data to disk in a format that can @@ -257,7 +243,10 @@ class BcolzDailyBarWriter(object): The newly-written table. """ ctx = maybe_show_progress( - ((sid, to_ctable(df, invalid_data_behavior)) for sid, df in data), + ( + (sid, self.to_ctable(df, invalid_data_behavior)) + for sid, df in data + ), show_progress=show_progress, item_show_func=self.progress_bar_item_show_func, label=self.progress_bar_message, @@ -355,15 +344,30 @@ class BcolzDailyBarWriter(object): last_row[asset_key] = total_rows + nrows - 1 total_rows += nrows + table_day_to_session = compose( + self._calendar.minute_to_session_label, + partial(Timestamp, unit='s', tz='UTC'), + ) + asset_first_day = table_day_to_session(table['day'][0]) + asset_last_day = table_day_to_session(table['day'][-1]) + + asset_sessions = sessions[ + sessions.slice_indexer(asset_first_day, asset_last_day) + ] + assert len(table) == len(asset_sessions), ( + 'Got {} rows for daily bars table with first day={}, last ' + 'day={}, expected {} rows.'.format( + len(table), + asset_first_day.date(), + asset_last_day.date(), + len(asset_sessions), + ) + ) + # Calculate the number of trading days between the first date # in the stored data and the first date of **this** asset. This # offset used for output alignment by the reader. - asset_first_day = table['day'][0] - calendar_offset[asset_key] = sessions.get_loc( - self._calendar.minute_to_session_label( - Timestamp(asset_first_day, unit='s', tz='UTC') - ) - ) + calendar_offset[asset_key] = sessions.get_loc(asset_first_day) # This writes the table to disk. full_table = ctable( @@ -389,6 +393,20 @@ class BcolzDailyBarWriter(object): full_table.flush() return full_table + @expect_element(invalid_data_behavior={'warn', 'raise', 'ignore'}) + def to_ctable(self, raw_data, invalid_data_behavior): + if isinstance(raw_data, ctable): + # we already have a ctable so do nothing + return raw_data + + winsorise_uint32(raw_data, invalid_data_behavior, 'volume', *OHLC) + processed = (raw_data[list(OHLC)] * 1000).astype('uint32') + dates = raw_data.index.values.astype('datetime64[s]') + check_uint32_safe(dates.max().view(np.int64), 'day') + processed['day'] = dates.astype('uint32') + processed['volume'] = raw_data.volume.astype('uint32') + return ctable.fromdataframe(processed) + class BcolzDailyBarReader(SessionBarReader): """