mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-01 09:25:40 +08:00
BUG: make the events loader handle empty raw data
TST: add test case for empty raw events data. BUG: update for python compatibility. MAINT: Simplify assertion for empty events case. DOC: Add comments on indexer unpacking. MAINT: move some config to test method.
This commit is contained in:
@@ -267,6 +267,88 @@ class EventIndexerTestCase(ZiplineTestCase):
|
||||
self.assertEqual(computed_index, -1)
|
||||
|
||||
|
||||
class EventsLoaderEmptyTestCase(WithAssetFinder,
                                WithTradingSessions,
                                ZiplineTestCase):
    """
    Exercise ``EventsLoader`` when the raw events frame has no rows.
    """
    START_DATE = pd.Timestamp('2014-01-01')
    END_DATE = pd.Timestamp('2014-01-30')

    @classmethod
    def init_class_fixtures(cls):
        # These are assigned before delegating to the parent fixtures;
        # presumably the asset finder fixture reads them -- confirm against
        # WithAssetFinder.
        cls.ASSET_FINDER_EQUITY_SIDS = [0, 1]
        cls.ASSET_FINDER_EQUITY_SYMBOLS = ['A', 'B']
        super(EventsLoaderEmptyTestCase, cls).init_class_fixtures()

    def frame_containing_all_missing_values(self, index, columns):
        """
        Build a frame in which every cell holds its column's missing value.

        Parameters
        ----------
        index : pd.Index
            Row index of the resulting frame.
        columns : iterable
            Columns to include. Each must expose ``name``, ``missing_value``
            and ``dtype``.

        Returns
        -------
        frame : pd.DataFrame
            One column per entry of ``columns``, keyed by ``name``.
        """
        # Build from the ``columns`` argument (not a hard-coded dataset) so
        # that the frame's columns always agree with the loop below.
        frame = pd.DataFrame(
            index=index,
            data={c.name: c.missing_value for c in columns},
        )
        for c in columns:
            # The construction above produces columns of dtype `object` when
            # the missing value is string, but we expect categoricals in the
            # final result.
            if c.dtype == categorical_dtype:
                frame[c.name] = frame[c.name].astype('category')
        return frame

    def test_load_empty(self):
        """
        For the case where raw data is empty, make sure we have a result for
        all sids, that the dimensions are correct, and that we have the
        correct missing value.
        """
        # A frame with the expected raw-event columns but zero rows.
        raw_events = pd.DataFrame(
            columns=["sid",
                     "timestamp",
                     "event_date",
                     "float",
                     "int",
                     "datetime",
                     "string"]
        )
        # Map each pipeline column to the raw column it is loaded from.
        next_value_columns = {
            EventDataSet.next_datetime: 'datetime',
            EventDataSet.next_event_date: 'event_date',
            EventDataSet.next_float: 'float',
            EventDataSet.next_int: 'int',
            EventDataSet.next_string: 'string',
            EventDataSet.next_string_custom_missing: 'string'
        }
        previous_value_columns = {
            EventDataSet.previous_datetime: 'datetime',
            EventDataSet.previous_event_date: 'event_date',
            EventDataSet.previous_float: 'float',
            EventDataSet.previous_int: 'int',
            EventDataSet.previous_string: 'string',
            EventDataSet.previous_string_custom_missing: 'string'
        }
        loader = EventsLoader(
            raw_events, next_value_columns, previous_value_columns
        )
        # Every column is served by the same loader.
        engine = SimplePipelineEngine(
            lambda x: loader,
            self.trading_days,
            self.asset_finder,
        )

        results = engine.run_pipeline(
            Pipeline({c.name: c.latest for c in EventDataSet.columns}),
            start_date=self.trading_days[0],
            end_date=self.trading_days[-1],
        )

        assets = self.asset_finder.retrieve_all(self.ASSET_FINDER_EQUITY_SIDS)
        dates = self.trading_days

        # With no events at all, every (date, asset) cell should hold the
        # column's missing value.
        expected = self.frame_containing_all_missing_values(
            index=pd.MultiIndex.from_product([dates, assets]),
            columns=EventDataSet.columns,
        )

        assert_equal(results, expected)
|
||||
|
||||
|
||||
class EventsLoaderTestCase(WithAssetFinder,
|
||||
WithTradingSessions,
|
||||
ZiplineTestCase):
|
||||
|
||||
@@ -192,12 +192,31 @@ class EventsLoader(PipelineLoader):
|
||||
def to_frame(array):
|
||||
return pd.DataFrame(array, index=dates, columns=sids)
|
||||
|
||||
assert indexer.shape == (len(dates), len(sids))
|
||||
|
||||
out = {}
|
||||
for c in columns:
|
||||
raw = self.events[name_map[c]][indexer]
|
||||
# indexer will be -1 for locations where we don't have a known
|
||||
# value.
|
||||
raw[indexer < 0] = c.missing_value
|
||||
# Array holding the value for column `c` for every event we have.
|
||||
col_array = self.events[name_map[c]]
|
||||
|
||||
if not len(col_array):
|
||||
# We don't have **any** events, so return col.missing_value
|
||||
# every day for every sid. We have to special case empty events
|
||||
# because in the normal branch we depend on being able to index
|
||||
# with -1 for missing values, which fails if there are no
|
||||
# events at all.
|
||||
raw = np.full(
|
||||
(len(dates), len(sids)), c.missing_value, dtype=c.dtype
|
||||
)
|
||||
else:
|
||||
# Slot event values into sid/date locations using `indexer`.
|
||||
# This produces a 2D array of the same shape as `indexer`,
|
||||
# which must be `(len(dates), len(sids))`.
|
||||
raw = col_array[indexer]
|
||||
|
||||
# indexer will be -1 for locations where we don't have a known
|
||||
# value. Overwrite those locations with c.missing_value.
|
||||
raw[indexer < 0] = c.missing_value
|
||||
|
||||
# Delegate the actual array formatting logic to a DataFrameLoader.
|
||||
loader = DataFrameLoader(c, to_frame(raw), adjustments=None)
|
||||
|
||||
Reference in New Issue
Block a user