mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 09:54:01 +08:00
BUG: fix blaze pipeline queries for asof_date
This commit is contained in:
@@ -857,28 +857,28 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
|
||||
def test_id(self):
|
||||
"""
|
||||
input (self.df):
|
||||
asof_date sid timestamp value
|
||||
0 2014-01-01 65 2014-01-01 0
|
||||
1 2014-01-01 66 2014-01-01 1
|
||||
2 2014-01-01 67 2014-01-01 2
|
||||
3 2014-01-02 65 2014-01-02 1
|
||||
4 2014-01-02 66 2014-01-02 2
|
||||
5 2014-01-02 67 2014-01-02 3
|
||||
6 2014-01-03 65 2014-01-03 2
|
||||
7 2014-01-03 66 2014-01-03 3
|
||||
8 2014-01-03 67 2014-01-03 4
|
||||
asof_date sid timestamp int_value value
|
||||
0 2014-01-01 65 2014-01-01 0 0
|
||||
1 2014-01-01 66 2014-01-01 1 1
|
||||
2 2014-01-01 67 2014-01-01 2 2
|
||||
3 2014-01-02 65 2014-01-02 1 1
|
||||
4 2014-01-02 66 2014-01-02 2 2
|
||||
5 2014-01-02 67 2014-01-02 3 3
|
||||
6 2014-01-03 65 2014-01-03 2 2
|
||||
7 2014-01-03 66 2014-01-03 3 3
|
||||
8 2014-01-03 67 2014-01-03 4 4
|
||||
|
||||
output (expected)
|
||||
value
|
||||
2014-01-01 Equity(65 [A]) 0
|
||||
Equity(66 [B]) 1
|
||||
Equity(67 [C]) 2
|
||||
2014-01-02 Equity(65 [A]) 1
|
||||
Equity(66 [B]) 2
|
||||
Equity(67 [C]) 3
|
||||
2014-01-03 Equity(65 [A]) 2
|
||||
Equity(66 [B]) 3
|
||||
Equity(67 [C]) 4
|
||||
int_value value
|
||||
2014-01-01 Equity(65 [A]) 0 0
|
||||
Equity(66 [B]) 1 1
|
||||
Equity(67 [C]) 2 2
|
||||
2014-01-02 Equity(65 [A]) 1 1
|
||||
Equity(66 [B]) 2 2
|
||||
Equity(67 [C]) 3 3
|
||||
2014-01-03 Equity(65 [A]) 2 2
|
||||
Equity(66 [B]) 3 3
|
||||
Equity(67 [C]) 4 4
|
||||
"""
|
||||
expected = self.df.drop('asof_date', axis=1).set_index(
|
||||
['timestamp', 'sid'],
|
||||
@@ -892,6 +892,44 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
|
||||
('int_value', 'value',)
|
||||
)
|
||||
|
||||
def test_id_with_asof_date(self):
|
||||
"""
|
||||
input (self.df):
|
||||
asof_date sid timestamp int_value value
|
||||
0 2014-01-01 65 2014-01-01 0 0
|
||||
1 2014-01-01 66 2014-01-01 1 1
|
||||
2 2014-01-01 67 2014-01-01 2 2
|
||||
3 2014-01-02 65 2014-01-02 1 1
|
||||
4 2014-01-02 66 2014-01-02 2 2
|
||||
5 2014-01-02 67 2014-01-02 3 3
|
||||
6 2014-01-03 65 2014-01-03 2 2
|
||||
7 2014-01-03 66 2014-01-03 3 3
|
||||
8 2014-01-03 67 2014-01-03 4 4
|
||||
|
||||
output (expected)
|
||||
asof_date
|
||||
2014-01-01 Equity(65 [A]) 2014-01-01
|
||||
Equity(66 [B]) 2014-01-01
|
||||
Equity(67 [C]) 2014-01-01
|
||||
2014-01-02 Equity(65 [A]) 2014-01-02
|
||||
Equity(66 [B]) 2014-01-02
|
||||
Equity(67 [C]) 2014-01-02
|
||||
2014-01-03 Equity(65 [A]) 2014-01-03
|
||||
Equity(66 [B]) 2014-01-03
|
||||
Equity(67 [C]) 2014-01-03
|
||||
"""
|
||||
expected = self.df.drop(['value', 'int_value'], axis=1).set_index(
|
||||
['timestamp', 'sid'],
|
||||
)
|
||||
expected.index = pd.MultiIndex.from_product((
|
||||
expected.index.levels[0],
|
||||
self.asset_finder.retrieve_all(expected.index.levels[1]),
|
||||
))
|
||||
self._test_id(
|
||||
self.df, self.dshape, expected, self.asset_finder,
|
||||
('asof_date',)
|
||||
)
|
||||
|
||||
def test_id_ffill_out_of_window(self):
|
||||
"""
|
||||
input (df):
|
||||
|
||||
@@ -988,10 +988,11 @@ class BlazeLoader(dict):
|
||||
have_sids = (dataset.ndim == 2)
|
||||
asset_idx = pd.Series(index=assets, data=np.arange(len(assets)))
|
||||
assets = list(map(int, assets)) # coerce from numpy.int64
|
||||
added_query_fields = [AD_FIELD_NAME, TS_FIELD_NAME] + (
|
||||
[SID_FIELD_NAME] if have_sids else []
|
||||
added_query_fields = {AD_FIELD_NAME, TS_FIELD_NAME} | (
|
||||
{SID_FIELD_NAME} if have_sids else set()
|
||||
)
|
||||
colnames = added_query_fields + list(map(getname, columns))
|
||||
requested_columns = set(map(getname, columns))
|
||||
colnames = sorted(added_query_fields | requested_columns)
|
||||
|
||||
data_query_time = self._data_query_time
|
||||
data_query_tz = self._data_query_tz
|
||||
@@ -1078,7 +1079,8 @@ class BlazeLoader(dict):
|
||||
materialized_deltas,
|
||||
dates,
|
||||
)
|
||||
sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True)
|
||||
if AD_FIELD_NAME not in requested_columns:
|
||||
sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True)
|
||||
|
||||
sparse_deltas = last_in_date_group(non_novel_deltas,
|
||||
dates,
|
||||
|
||||
Reference in New Issue
Block a user