mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 07:51:59 +08:00
MAINT: Handle gaps in input to daily bars writer (#1778)
Previously, a DataFrame passed into BcolzDailyBarWriter.write that was missing an expected session between its first and last sessions would be written incorrectly. When the DataFrame was converted to a ctable, the values for all days following the gap were shifted backwards, and NaNs were shifted in at the end. This commit handles the issue by asserting that the number of rows in the input table matches the number of sessions in the calendar between the table's first and last sessions. It also fixes a test that was mistakenly using minutes_in_range where it should have been using sessions_in_range (a mistake uncovered by this change).
This commit is contained in:
@@ -31,6 +31,7 @@ from pandas.util.testing import assert_index_equal
|
||||
|
||||
from zipline.data.us_equity_pricing import (
|
||||
BcolzDailyBarReader,
|
||||
BcolzDailyBarWriter,
|
||||
NoDataBeforeDate,
|
||||
NoDataAfterDate,
|
||||
)
|
||||
@@ -44,7 +45,10 @@ from zipline.pipeline.loaders.synthetic import (
|
||||
)
|
||||
from zipline.testing import seconds_to_timestamp
|
||||
from zipline.testing.fixtures import (
|
||||
WithAssetFinder,
|
||||
WithBcolzEquityDailyBarReader,
|
||||
WithTmpDir,
|
||||
WithTradingCalendars,
|
||||
ZiplineTestCase,
|
||||
)
|
||||
from zipline.utils.calendars import get_calendar
|
||||
@@ -362,3 +366,42 @@ class BcolzDailyBarNeverReadAllTestCase(BcolzDailyBarTestCase):
|
||||
`load_raw_array`.
|
||||
"""
|
||||
BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = maxsize
|
||||
|
||||
|
||||
class BcolzDailyBarWriterMissingDataTestCase(WithAssetFinder,
                                             WithTmpDir,
                                             WithTradingCalendars,
                                             ZiplineTestCase):
    """
    Check that the daily bar writer refuses input with a missing session.

    Bar data is generated for a single sid over every calendar session
    except one in the middle of the range; writing it is expected to trip
    the writer's row-count assertion.
    """
    # Sid 3 is active from 2015-06-02 to 2015-06-30.
    MISSING_DATA_SID = 3
    # Leave out data for a day in the middle of the query range.
    MISSING_DATA_DAY = Timestamp('2015-06-15', tz='UTC')

    @classmethod
    def make_equity_info(cls):
        """Return the rows of EQUITY_INFO belonging to MISSING_DATA_SID."""
        is_target_sid = EQUITY_INFO.index == cls.MISSING_DATA_SID
        return EQUITY_INFO.loc[is_target_sid]

    def test_missing_values_assertion(self):
        """Writing bars with one session removed raises AssertionError."""
        calendar = self.trading_calendar
        all_sessions = calendar.sessions_in_range(
            TEST_CALENDAR_START,
            TEST_CALENDAR_STOP,
        )

        # Drop the one session the writer should notice is absent.
        keep = all_sessions != self.MISSING_DATA_DAY
        gapped_bars = make_bar_data(
            self.make_equity_info(),
            all_sessions[keep],
        )

        first_session, last_session = all_sessions[0], all_sessions[-1]
        bar_writer = BcolzDailyBarWriter(
            self.tmpdir.path,
            calendar,
            first_session,
            last_session,
        )

        # There are 21 sessions between the start and end date for this
        # asset, and we excluded one.
        expected_msg = (
            'Got 20 rows for daily bars table with first day=2015-06-02, last '
            'day=2015-06-30, expected 21 rows.'
        )
        with self.assertRaisesRegexp(AssertionError, expected_msg):
            bar_writer.write(gapped_bars)
|
||||
|
||||
@@ -2261,7 +2261,7 @@ class TestCapitalChanges(WithLogger,
|
||||
|
||||
@classmethod
|
||||
def make_equity_daily_bar_data(cls):
|
||||
days = cls.trading_calendar.minutes_in_range(
|
||||
days = cls.trading_calendar.sessions_in_range(
|
||||
pd.Timestamp('2006-01-03', tz='UTC'),
|
||||
pd.Timestamp('2006-01-09', tz='UTC')
|
||||
)
|
||||
|
||||
@@ -50,6 +50,7 @@ from six import (
|
||||
string_types,
|
||||
viewkeys,
|
||||
)
|
||||
from toolz import compose
|
||||
|
||||
from zipline.data.session_bars import SessionBarReader
|
||||
from zipline.data.bar_reader import (
|
||||
@@ -162,21 +163,6 @@ def winsorise_uint32(df, invalid_data_behavior, column, *columns):
|
||||
return df
|
||||
|
||||
|
||||
@expect_element(invalid_data_behavior={'warn', 'raise', 'ignore'})
def to_ctable(raw_data, invalid_data_behavior):
    """
    Convert a frame of daily bars into a ``ctable`` of uint32 columns.

    Parameters
    ----------
    raw_data : ctable or DataFrame-like
        If already a ``ctable`` it is returned as-is. Otherwise it must
        expose the ``OHLC`` columns, a ``volume`` column and an index whose
        values can be cast to ``datetime64[s]``.
    invalid_data_behavior : {'warn', 'raise', 'ignore'}
        Forwarded to ``winsorise_uint32``.

    Returns
    -------
    ctable
        OHLC values multiplied by 1000, plus ``day`` (index as seconds)
        and ``volume`` columns, all cast to uint32.
    """
    if not isinstance(raw_data, ctable):
        # NOTE(review): winsorise_uint32 is defined elsewhere in this
        # module; presumably it clips or rejects values that do not fit in
        # a uint32 -- confirm against its definition.
        winsorise_uint32(raw_data, invalid_data_behavior, 'volume', *OHLC)

        ohlc_columns = list(OHLC)
        scaled = (raw_data[ohlc_columns] * 1000).astype('uint32')

        # Index values at one-second resolution; the latest one is handed
        # to check_uint32_safe before the cast to uint32.
        day_seconds = raw_data.index.values.astype('datetime64[s]')
        latest_day = day_seconds.max().view(np.int64)
        check_uint32_safe(latest_day, 'day')

        scaled['day'] = day_seconds.astype('uint32')
        scaled['volume'] = raw_data.volume.astype('uint32')
        return ctable.fromdataframe(scaled)

    # Input is already a ctable, so there is nothing to convert.
    return raw_data
|
||||
|
||||
|
||||
class BcolzDailyBarWriter(object):
|
||||
"""
|
||||
Class capable of writing daily OHLCV data to disk in a format that can
|
||||
@@ -257,7 +243,10 @@ class BcolzDailyBarWriter(object):
|
||||
The newly-written table.
|
||||
"""
|
||||
ctx = maybe_show_progress(
|
||||
((sid, to_ctable(df, invalid_data_behavior)) for sid, df in data),
|
||||
(
|
||||
(sid, self.to_ctable(df, invalid_data_behavior))
|
||||
for sid, df in data
|
||||
),
|
||||
show_progress=show_progress,
|
||||
item_show_func=self.progress_bar_item_show_func,
|
||||
label=self.progress_bar_message,
|
||||
@@ -355,15 +344,30 @@ class BcolzDailyBarWriter(object):
|
||||
last_row[asset_key] = total_rows + nrows - 1
|
||||
total_rows += nrows
|
||||
|
||||
table_day_to_session = compose(
|
||||
self._calendar.minute_to_session_label,
|
||||
partial(Timestamp, unit='s', tz='UTC'),
|
||||
)
|
||||
asset_first_day = table_day_to_session(table['day'][0])
|
||||
asset_last_day = table_day_to_session(table['day'][-1])
|
||||
|
||||
asset_sessions = sessions[
|
||||
sessions.slice_indexer(asset_first_day, asset_last_day)
|
||||
]
|
||||
assert len(table) == len(asset_sessions), (
|
||||
'Got {} rows for daily bars table with first day={}, last '
|
||||
'day={}, expected {} rows.'.format(
|
||||
len(table),
|
||||
asset_first_day.date(),
|
||||
asset_last_day.date(),
|
||||
len(asset_sessions),
|
||||
)
|
||||
)
|
||||
|
||||
# Calculate the number of trading days between the first date
|
||||
# in the stored data and the first date of **this** asset. This
|
||||
# offset used for output alignment by the reader.
|
||||
asset_first_day = table['day'][0]
|
||||
calendar_offset[asset_key] = sessions.get_loc(
|
||||
self._calendar.minute_to_session_label(
|
||||
Timestamp(asset_first_day, unit='s', tz='UTC')
|
||||
)
|
||||
)
|
||||
calendar_offset[asset_key] = sessions.get_loc(asset_first_day)
|
||||
|
||||
# This writes the table to disk.
|
||||
full_table = ctable(
|
||||
@@ -389,6 +393,20 @@ class BcolzDailyBarWriter(object):
|
||||
full_table.flush()
|
||||
return full_table
|
||||
|
||||
@expect_element(invalid_data_behavior={'warn', 'raise', 'ignore'})
def to_ctable(self, raw_data, invalid_data_behavior):
    """
    Turn one asset's daily bar data into a uint32 ``ctable``.

    ``self`` is not read by this method.

    Parameters
    ----------
    raw_data : ctable or DataFrame-like
        A ``ctable`` is passed straight through. Anything else must have
        the ``OHLC`` columns, a ``volume`` column and an index castable to
        ``datetime64[s]``.
    invalid_data_behavior : {'warn', 'raise', 'ignore'}
        Passed on to ``winsorise_uint32``.

    Returns
    -------
    ctable
        The input itself when it was already a ``ctable``; otherwise a new
        table holding OHLC * 1000, ``day`` (seconds) and ``volume``, each
        stored as uint32.
    """
    already_converted = isinstance(raw_data, ctable)
    if already_converted:
        # we already have a ctable so do nothing
        return raw_data

    # NOTE(review): winsorise_uint32 lives elsewhere in this module and
    # appears to operate on raw_data in place -- confirm.
    winsorise_uint32(raw_data, invalid_data_behavior, 'volume', *OHLC)

    prices = raw_data[list(OHLC)]
    frame = (prices * 1000).astype('uint32')

    # Express the index in whole seconds and check the largest value with
    # check_uint32_safe before narrowing to uint32.
    seconds = raw_data.index.values.astype('datetime64[s]')
    check_uint32_safe(seconds.max().view(np.int64), 'day')

    frame['day'] = seconds.astype('uint32')
    frame['volume'] = raw_data.volume.astype('uint32')

    return ctable.fromdataframe(frame)
|
||||
|
||||
|
||||
class BcolzDailyBarReader(SessionBarReader):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user