mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 07:39:11 +08:00
Merge pull request #1186 from quantopian/remove_type_restrictions_for_generic_loader
Remove type restrictions for generic loader
This commit is contained in:
+436
-196
@@ -6,7 +6,6 @@ from __future__ import division
|
||||
from collections import OrderedDict
|
||||
from datetime import timedelta, time
|
||||
from itertools import product, chain
|
||||
from unittest import TestCase
|
||||
import warnings
|
||||
|
||||
import blaze as bz
|
||||
@@ -30,16 +29,16 @@ from zipline.pipeline.loaders.blaze import (
|
||||
NoDeltasWarning,
|
||||
)
|
||||
from zipline.pipeline.loaders.blaze.core import (
|
||||
NonNumpyField,
|
||||
NonPipelineField,
|
||||
no_deltas_rules,
|
||||
)
|
||||
from zipline.testing.fixtures import WithAssetFinder
|
||||
from zipline.utils.numpy_utils import (
|
||||
float64_dtype,
|
||||
int64_dtype,
|
||||
repeat_last_axis,
|
||||
)
|
||||
from zipline.testing import tmp_asset_finder
|
||||
from zipline.testing import tmp_asset_finder, ZiplineTestCase
|
||||
|
||||
nameof = op.attrgetter('name')
|
||||
dtypeof = op.attrgetter('dtype')
|
||||
@@ -74,14 +73,17 @@ def _utc_localize_index_level_0(df):
|
||||
return df
|
||||
|
||||
|
||||
class BlazeToPipelineTestCase(TestCase):
|
||||
class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
|
||||
START_DATE = pd.Timestamp(0)
|
||||
END_DATE = pd.Timestamp('2015')
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
def init_class_fixtures(cls):
|
||||
super(BlazeToPipelineTestCase, cls).init_class_fixtures()
|
||||
cls.dates = dates = pd.date_range('2014-01-01', '2014-01-03')
|
||||
dates = cls.dates.repeat(3)
|
||||
cls.sids = sids = ord('A'), ord('B'), ord('C')
|
||||
cls.df = df = pd.DataFrame({
|
||||
'sid': sids * 3,
|
||||
'sid': cls.ASSET_FINDER_EQUITY_SIDS * 3,
|
||||
'value': (0., 1., 2., 1., 2., 3., 2., 3., 4.),
|
||||
'int_value': (0, 1, 2, 1, 2, 3, 2, 3, 4),
|
||||
'asof_date': dates,
|
||||
@@ -122,14 +124,6 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
self.assertTrue(np.isnan(ds.value.missing_value))
|
||||
self.assertEqual(ds.int_value.missing_value, 0)
|
||||
|
||||
invalid_type_fields = ('asof_date',)
|
||||
|
||||
for field in invalid_type_fields:
|
||||
with self.assertRaises(AttributeError) as e:
|
||||
getattr(ds, field)
|
||||
self.assertIn("'%s'" % field, str(e.exception))
|
||||
self.assertIn("'datetime'", str(e.exception))
|
||||
|
||||
# test memoization
|
||||
self.assertIs(
|
||||
from_blaze(
|
||||
@@ -254,34 +248,12 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
)
|
||||
self.assertIn(str(expr), str(e.exception))
|
||||
|
||||
def test_non_numpy_field(self):
|
||||
expr = bz.data(
|
||||
[],
|
||||
dshape="""
|
||||
var * {
|
||||
a: datetime,
|
||||
asof_date: datetime,
|
||||
timestamp: datetime,
|
||||
}""",
|
||||
)
|
||||
ds = from_blaze(
|
||||
expr,
|
||||
loader=self.garbage_loader,
|
||||
no_deltas_rule=no_deltas_rules.ignore,
|
||||
)
|
||||
with self.assertRaises(AttributeError):
|
||||
ds.a
|
||||
self.assertIsInstance(object.__getattribute__(ds, 'a'), NonNumpyField)
|
||||
|
||||
def test_non_pipeline_field(self):
|
||||
# NOTE: This test will fail if we ever allow string types in
|
||||
# the Pipeline API. If this happens, change the dtype of the `a` field
|
||||
# of expr to another type we don't allow.
|
||||
expr = bz.data(
|
||||
[],
|
||||
dshape="""
|
||||
var * {
|
||||
a: string,
|
||||
a: complex,
|
||||
asof_date: datetime,
|
||||
timestamp: datetime,
|
||||
}""",
|
||||
@@ -298,6 +270,275 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
NonPipelineField,
|
||||
)
|
||||
|
||||
def test_cols_with_all_missing_vals(self):
|
||||
"""
|
||||
Tests that when there is no known data, we get output where the
|
||||
columns have the right dtypes and the right missing values filled in.
|
||||
|
||||
input (self.df):
|
||||
Empty DataFrame
|
||||
Columns: [sid, float_value, str_value, int_value, bool_value, dt_value,
|
||||
asof_date, timestamp]
|
||||
Index: []
|
||||
|
||||
output (expected)
|
||||
str_value float_value int_value
|
||||
2014-01-01 Equity(65 [A]) None NaN 0
|
||||
Equity(66 [B]) None NaN 0
|
||||
Equity(67 [C]) None NaN 0
|
||||
2014-01-02 Equity(65 [A]) None NaN 0
|
||||
Equity(66 [B]) None NaN 0
|
||||
Equity(67 [C]) None NaN 0
|
||||
2014-01-03 Equity(65 [A]) None NaN 0
|
||||
Equity(66 [B]) None NaN 0
|
||||
Equity(67 [C]) None NaN 0
|
||||
|
||||
dt_value bool_value
|
||||
2014-01-01 Equity(65 [A]) NaT False
|
||||
Equity(66 [B]) NaT False
|
||||
Equity(67 [C]) NaT False
|
||||
2014-01-02 Equity(65 [A]) NaT False
|
||||
Equity(66 [B]) NaT False
|
||||
Equity(67 [C]) NaT False
|
||||
2014-01-03 Equity(65 [A]) NaT False
|
||||
Equity(66 [B]) NaT False
|
||||
Equity(67 [C]) NaT False
|
||||
"""
|
||||
df = pd.DataFrame(columns=['sid', 'float_value', 'str_value',
|
||||
'int_value', 'bool_value', 'dt_value',
|
||||
'asof_date', 'timestamp'])
|
||||
|
||||
expr = bz.data(
|
||||
df,
|
||||
dshape="""
|
||||
var * {
|
||||
sid: int64,
|
||||
float_value: float64,
|
||||
str_value: string,
|
||||
int_value: int64,
|
||||
bool_value: bool,
|
||||
dt_value: datetime,
|
||||
asof_date: datetime,
|
||||
timestamp: datetime,
|
||||
}""",
|
||||
)
|
||||
fields = OrderedDict(expr.dshape.measure.fields)
|
||||
|
||||
expected = pd.DataFrame({
|
||||
"str_value": np.array([None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None],
|
||||
dtype='object'),
|
||||
"float_value": np.array([np.NaN,
|
||||
np.NaN,
|
||||
np.NaN,
|
||||
np.NaN,
|
||||
np.NaN,
|
||||
np.NaN,
|
||||
np.NaN,
|
||||
np.NaN,
|
||||
np.NaN],
|
||||
dtype='float64'),
|
||||
"int_value": np.array([0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0],
|
||||
dtype='int64'),
|
||||
"bool_value": np.array([False,
|
||||
False,
|
||||
False,
|
||||
False,
|
||||
False,
|
||||
False,
|
||||
False,
|
||||
False,
|
||||
False],
|
||||
dtype='bool'),
|
||||
"dt_value": [pd.NaT,
|
||||
pd.NaT,
|
||||
pd.NaT,
|
||||
pd.NaT,
|
||||
pd.NaT,
|
||||
pd.NaT,
|
||||
pd.NaT,
|
||||
pd.NaT,
|
||||
pd.NaT],
|
||||
},
|
||||
columns=['str_value', 'float_value', 'int_value', 'bool_value',
|
||||
'dt_value'],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, self.asset_finder.retrieve_all(
|
||||
self.ASSET_FINDER_EQUITY_SIDS
|
||||
))
|
||||
)
|
||||
)
|
||||
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
self.asset_finder,
|
||||
('float_value', 'str_value', 'int_value', 'bool_value',
|
||||
'dt_value'),
|
||||
)
|
||||
|
||||
def test_cols_with_some_missing_vals(self):
|
||||
"""
|
||||
Tests the following:
|
||||
1) Forward filling replaces missing values correctly for the data
|
||||
types supported in pipeline.
|
||||
2) We don't forward fill when the missing value is the actual value
|
||||
we got for a date in the case of int/bool columns.
|
||||
3) We get the correct type of missing value in the output.
|
||||
|
||||
input (self.df):
|
||||
asof_date bool_value dt_value float_value int_value sid
|
||||
0 2014-01-01 True 2011-01-01 0 1 65
|
||||
1 2014-01-03 True 2011-01-02 1 2 66
|
||||
2 2014-01-01 True 2011-01-03 2 3 67
|
||||
3 2014-01-02 False NaT NaN 0 67
|
||||
|
||||
str_value timestamp
|
||||
0 a 2014-01-01
|
||||
1 b 2014-01-03
|
||||
2 c 2014-01-01
|
||||
3 None 2014-01-02
|
||||
|
||||
output (expected)
|
||||
str_value float_value int_value bool_value
|
||||
2014-01-01 Equity(65 [A]) a 0 1 True
|
||||
Equity(66 [B]) None NaN 0 False
|
||||
Equity(67 [C]) c 2 3 True
|
||||
2014-01-02 Equity(65 [A]) a 0 1 True
|
||||
Equity(66 [B]) None NaN 0 False
|
||||
Equity(67 [C]) c 2 0 False
|
||||
2014-01-03 Equity(65 [A]) a 0 1 True
|
||||
Equity(66 [B]) b 1 2 True
|
||||
Equity(67 [C]) c 2 0 False
|
||||
|
||||
dt_value
|
||||
2014-01-01 Equity(65 [A]) 2011-01-01
|
||||
Equity(66 [B]) NaT
|
||||
Equity(67 [C]) 2011-01-03
|
||||
2014-01-02 Equity(65 [A]) 2011-01-01
|
||||
Equity(66 [B]) NaT
|
||||
Equity(67 [C]) 2011-01-03
|
||||
2014-01-03 Equity(65 [A]) 2011-01-01
|
||||
Equity(66 [B]) 2011-01-02
|
||||
Equity(67 [C]) 2011-01-03
|
||||
"""
|
||||
dates = (self.dates[0], self.dates[-1], self.dates[0], self.dates[1])
|
||||
df = pd.DataFrame({
|
||||
'sid': self.ASSET_FINDER_EQUITY_SIDS[:-1] +
|
||||
(self.ASSET_FINDER_EQUITY_SIDS[-1],)*2,
|
||||
'float_value': (0., 1., 2., np.NaN),
|
||||
'str_value': ("a", "b", "c", None),
|
||||
'int_value': (1, 2, 3, 0),
|
||||
'bool_value': (True, True, True, False),
|
||||
'dt_value': (pd.Timestamp('2011-01-01'),
|
||||
pd.Timestamp('2011-01-02'),
|
||||
pd.Timestamp('2011-01-03'),
|
||||
pd.NaT),
|
||||
'asof_date': dates,
|
||||
'timestamp': dates,
|
||||
})
|
||||
|
||||
expr = bz.data(
|
||||
df,
|
||||
dshape="""
|
||||
var * {
|
||||
sid: int64,
|
||||
float_value: float64,
|
||||
str_value: string,
|
||||
int_value: int64,
|
||||
bool_value: bool,
|
||||
dt_value: datetime,
|
||||
asof_date: datetime,
|
||||
timestamp: datetime,
|
||||
}""",
|
||||
)
|
||||
fields = OrderedDict(expr.dshape.measure.fields)
|
||||
|
||||
expected = pd.DataFrame({
|
||||
"str_value": np.array(["a",
|
||||
None,
|
||||
"c",
|
||||
"a",
|
||||
None,
|
||||
"c",
|
||||
"a",
|
||||
"b",
|
||||
"c"],
|
||||
dtype='object'),
|
||||
"float_value": np.array([0,
|
||||
np.NaN,
|
||||
2,
|
||||
0,
|
||||
np.NaN,
|
||||
2,
|
||||
0,
|
||||
1,
|
||||
2],
|
||||
dtype='float64'),
|
||||
"int_value": np.array([1,
|
||||
0,
|
||||
3,
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
0],
|
||||
dtype='int64'),
|
||||
"bool_value": np.array([True,
|
||||
False,
|
||||
True,
|
||||
True,
|
||||
False,
|
||||
False,
|
||||
True,
|
||||
True,
|
||||
False],
|
||||
dtype='bool'),
|
||||
"dt_value": [pd.Timestamp('2011-01-01'),
|
||||
pd.NaT,
|
||||
pd.Timestamp('2011-01-03'),
|
||||
pd.Timestamp('2011-01-01'),
|
||||
pd.NaT,
|
||||
pd.Timestamp('2011-01-03'),
|
||||
pd.Timestamp('2011-01-01'),
|
||||
pd.Timestamp('2011-01-02'),
|
||||
pd.Timestamp('2011-01-03')],
|
||||
},
|
||||
columns=['str_value', 'float_value', 'int_value', 'bool_value',
|
||||
'dt_value'],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, self.asset_finder.retrieve_all(
|
||||
self.ASSET_FINDER_EQUITY_SIDS
|
||||
))
|
||||
)
|
||||
)
|
||||
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
self.asset_finder,
|
||||
('float_value', 'str_value', 'int_value', 'bool_value',
|
||||
'dt_value'),
|
||||
)
|
||||
|
||||
def test_complex_expr(self):
|
||||
expr = bz.data(self.df, dshape=self.dshape)
|
||||
# put an Add in the table
|
||||
@@ -353,16 +594,14 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
p.add(getattr(ds, a).latest, a)
|
||||
dates = self.dates
|
||||
|
||||
with tmp_asset_finder() as finder:
|
||||
result = SimplePipelineEngine(
|
||||
loader,
|
||||
dates,
|
||||
finder,
|
||||
).run_pipeline(p, dates[0], dates[-1])
|
||||
|
||||
result = SimplePipelineEngine(
|
||||
loader,
|
||||
dates,
|
||||
finder,
|
||||
).run_pipeline(p, dates[0], dates[-1])
|
||||
assert_frame_equal(
|
||||
result,
|
||||
_utc_localize_index_level_0(expected),
|
||||
result.sort_index(axis=1),
|
||||
_utc_localize_index_level_0(expected.sort_index(axis=1)),
|
||||
check_dtype=False,
|
||||
)
|
||||
|
||||
@@ -386,12 +625,11 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
p.add(ds.int_value.latest, 'int_value')
|
||||
dates = self.dates
|
||||
|
||||
with tmp_asset_finder() as finder:
|
||||
result = SimplePipelineEngine(
|
||||
loader,
|
||||
dates,
|
||||
finder,
|
||||
).run_pipeline(p, dates[0], dates[-1])
|
||||
result = SimplePipelineEngine(
|
||||
loader,
|
||||
dates,
|
||||
self.asset_finder,
|
||||
).run_pipeline(p, dates[0], dates[-1])
|
||||
|
||||
expected = df.drop('asof_date', axis=1)
|
||||
expected['timestamp'] = expected['timestamp'].dt.normalize().astype(
|
||||
@@ -401,7 +639,7 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
expected.set_index(['timestamp', 'sid'], inplace=True)
|
||||
expected.index = pd.MultiIndex.from_product((
|
||||
expected.index.levels[0],
|
||||
finder.retrieve_all(expected.index.levels[1]),
|
||||
self.asset_finder.retrieve_all(expected.index.levels[1]),
|
||||
))
|
||||
assert_frame_equal(result, expected, check_dtype=False)
|
||||
|
||||
@@ -431,17 +669,17 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
Equity(66 [B]) 3
|
||||
Equity(67 [C]) 4
|
||||
"""
|
||||
with tmp_asset_finder() as finder:
|
||||
expected = self.df.drop('asof_date', axis=1).set_index(
|
||||
['timestamp', 'sid'],
|
||||
)
|
||||
expected.index = pd.MultiIndex.from_product((
|
||||
expected.index.levels[0],
|
||||
finder.retrieve_all(expected.index.levels[1]),
|
||||
))
|
||||
self._test_id(
|
||||
self.df, self.dshape, expected, finder, ('int_value', 'value',)
|
||||
)
|
||||
expected = self.df.drop('asof_date', axis=1).set_index(
|
||||
['timestamp', 'sid'],
|
||||
)
|
||||
expected.index = pd.MultiIndex.from_product((
|
||||
expected.index.levels[0],
|
||||
self.asset_finder.retrieve_all(expected.index.levels[1]),
|
||||
))
|
||||
self._test_id(
|
||||
self.df, self.dshape, expected, self.asset_finder,
|
||||
('int_value', 'value',)
|
||||
)
|
||||
|
||||
def test_id_ffill_out_of_window(self):
|
||||
"""
|
||||
@@ -472,7 +710,7 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
"""
|
||||
dates = self.dates.repeat(3) - timedelta(days=10)
|
||||
df = pd.DataFrame({
|
||||
'sid': self.sids * 3,
|
||||
'sid': self.ASSET_FINDER_EQUITY_SIDS * 3,
|
||||
'value': (0, 1, np.nan, 1, np.nan, 3, np.nan, 3, 4),
|
||||
'other': (0, np.nan, 2, np.nan, 2, 3, 2, 3, np.nan),
|
||||
'asof_date': dates,
|
||||
@@ -481,29 +719,30 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
fields = OrderedDict(self.dshape.measure.fields)
|
||||
fields['other'] = fields['value']
|
||||
|
||||
with tmp_asset_finder() as finder:
|
||||
expected = pd.DataFrame(
|
||||
np.array([[2, 1],
|
||||
[3, 3],
|
||||
[3, 4],
|
||||
[2, 1],
|
||||
[3, 3],
|
||||
[3, 4],
|
||||
[2, 1],
|
||||
[3, 3],
|
||||
[3, 4]]),
|
||||
columns=['other', 'value'],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, finder.retrieve_all(self.sids)),
|
||||
),
|
||||
)
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
finder,
|
||||
('value', 'other'),
|
||||
)
|
||||
expected = pd.DataFrame(
|
||||
np.array([[2, 1],
|
||||
[3, 3],
|
||||
[3, 4],
|
||||
[2, 1],
|
||||
[3, 3],
|
||||
[3, 4],
|
||||
[2, 1],
|
||||
[3, 3],
|
||||
[3, 4]]),
|
||||
columns=['other', 'value'],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, self.asset_finder.retrieve_all(
|
||||
self.ASSET_FINDER_EQUITY_SIDS
|
||||
)),
|
||||
),
|
||||
)
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
self.asset_finder,
|
||||
('value', 'other'),
|
||||
)
|
||||
|
||||
def test_id_multiple_columns(self):
|
||||
"""
|
||||
@@ -535,21 +774,20 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
df['other'] = df.value + 1
|
||||
fields = OrderedDict(self.dshape.measure.fields)
|
||||
fields['other'] = fields['value']
|
||||
with tmp_asset_finder() as finder:
|
||||
expected = df.drop('asof_date', axis=1).set_index(
|
||||
['timestamp', 'sid'],
|
||||
).sort_index(axis=1)
|
||||
expected.index = pd.MultiIndex.from_product((
|
||||
expected.index.levels[0],
|
||||
finder.retrieve_all(expected.index.levels[1]),
|
||||
))
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
finder,
|
||||
('value', 'int_value', 'other'),
|
||||
)
|
||||
expected = df.drop('asof_date', axis=1).set_index(
|
||||
['timestamp', 'sid'],
|
||||
).sort_index(axis=1)
|
||||
expected.index = pd.MultiIndex.from_product((
|
||||
expected.index.levels[0],
|
||||
self.asset_finder.retrieve_all(expected.index.levels[1]),
|
||||
))
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
self.asset_finder,
|
||||
('value', 'int_value', 'other'),
|
||||
)
|
||||
|
||||
def test_id_macro_dataset(self):
|
||||
"""
|
||||
@@ -573,22 +811,21 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
"""
|
||||
asset_info = asset_infos[0][0]
|
||||
nassets = len(asset_info)
|
||||
with tmp_asset_finder() as finder:
|
||||
expected = pd.DataFrame(
|
||||
list(concatv([0] * nassets, [1] * nassets, [2] * nassets)),
|
||||
index=pd.MultiIndex.from_product((
|
||||
self.macro_df.timestamp,
|
||||
finder.retrieve_all(asset_info.index),
|
||||
)),
|
||||
columns=('value',),
|
||||
)
|
||||
self._test_id(
|
||||
self.macro_df,
|
||||
self.macro_dshape,
|
||||
expected,
|
||||
finder,
|
||||
('value',),
|
||||
)
|
||||
expected = pd.DataFrame(
|
||||
list(concatv([0] * nassets, [1] * nassets, [2] * nassets)),
|
||||
index=pd.MultiIndex.from_product((
|
||||
self.macro_df.timestamp,
|
||||
self.asset_finder.retrieve_all(asset_info.index),
|
||||
)),
|
||||
columns=('value',),
|
||||
)
|
||||
self._test_id(
|
||||
self.macro_df,
|
||||
self.macro_dshape,
|
||||
expected,
|
||||
self.asset_finder,
|
||||
('value',),
|
||||
)
|
||||
|
||||
def test_id_ffill_out_of_window_macro_dataset(self):
|
||||
"""
|
||||
@@ -620,29 +857,30 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
fields = OrderedDict(self.macro_dshape.measure.fields)
|
||||
fields['other'] = fields['value']
|
||||
|
||||
with tmp_asset_finder() as finder:
|
||||
expected = pd.DataFrame(
|
||||
np.array([[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1]]),
|
||||
columns=['value', 'other'],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, finder.retrieve_all(self.sids)),
|
||||
),
|
||||
).sort_index(axis=1)
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
finder,
|
||||
('value', 'other'),
|
||||
)
|
||||
expected = pd.DataFrame(
|
||||
np.array([[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1],
|
||||
[0, 1]]),
|
||||
columns=['value', 'other'],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, self.asset_finder.retrieve_all(
|
||||
self.ASSET_FINDER_EQUITY_SIDS
|
||||
)),
|
||||
),
|
||||
).sort_index(axis=1)
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
self.asset_finder,
|
||||
('value', 'other'),
|
||||
)
|
||||
|
||||
def test_id_macro_dataset_multiple_columns(self):
|
||||
"""
|
||||
@@ -717,31 +955,32 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
fields = OrderedDict(self.dshape.measure.fields)
|
||||
fields['other'] = fields['value']
|
||||
|
||||
with tmp_asset_finder() as finder:
|
||||
expected = pd.DataFrame(
|
||||
columns=['other', 'value'],
|
||||
data=[
|
||||
[1, 0], # 2014-01-01 Equity(65 [A])
|
||||
[np.nan, 1], # Equity(66 [B])
|
||||
[2, np.nan], # Equity(67 [C])
|
||||
[1, 1], # 2014-01-02 Equity(65 [A])
|
||||
[2, 1], # Equity(66 [B])
|
||||
[3, 3], # Equity(67 [C])
|
||||
[2, 1], # 2014-01-03 Equity(65 [A])
|
||||
[3, 3], # Equity(66 [B])
|
||||
[3, 3], # Equity(67 [C])
|
||||
],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, finder.retrieve_all(self.sids)),
|
||||
),
|
||||
)
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
finder,
|
||||
('value', 'other'),
|
||||
)
|
||||
expected = pd.DataFrame(
|
||||
columns=['other', 'value'],
|
||||
data=[
|
||||
[1, 0], # 2014-01-01 Equity(65 [A])
|
||||
[np.nan, 1], # Equity(66 [B])
|
||||
[2, np.nan], # Equity(67 [C])
|
||||
[1, 1], # 2014-01-02 Equity(65 [A])
|
||||
[2, 1], # Equity(66 [B])
|
||||
[3, 3], # Equity(67 [C])
|
||||
[2, 1], # 2014-01-03 Equity(65 [A])
|
||||
[3, 3], # Equity(66 [B])
|
||||
[3, 3], # Equity(67 [C])
|
||||
],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, self.asset_finder.retrieve_all(
|
||||
self.ASSET_FINDER_EQUITY_SIDS
|
||||
)),
|
||||
),
|
||||
)
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
self.asset_finder,
|
||||
('value', 'other'),
|
||||
)
|
||||
|
||||
def test_id_take_last_in_group_macro(self):
|
||||
"""
|
||||
@@ -773,33 +1012,34 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
fields = OrderedDict(self.macro_dshape.measure.fields)
|
||||
fields['other'] = fields['value']
|
||||
|
||||
with tmp_asset_finder() as finder:
|
||||
expected = pd.DataFrame(
|
||||
columns=[
|
||||
'other', 'value',
|
||||
],
|
||||
data=[
|
||||
[np.nan, 1], # 2014-01-01 Equity(65 [A])
|
||||
[np.nan, 1], # Equity(66 [B])
|
||||
[np.nan, 1], # Equity(67 [C])
|
||||
[1, 2], # 2014-01-02 Equity(65 [A])
|
||||
[1, 2], # Equity(66 [B])
|
||||
[1, 2], # Equity(67 [C])
|
||||
[2, 2], # 2014-01-03 Equity(65 [A])
|
||||
[2, 2], # Equity(66 [B])
|
||||
[2, 2], # Equity(67 [C])
|
||||
],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, finder.retrieve_all(self.sids)),
|
||||
),
|
||||
)
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
finder,
|
||||
('value', 'other'),
|
||||
)
|
||||
expected = pd.DataFrame(
|
||||
columns=[
|
||||
'other', 'value',
|
||||
],
|
||||
data=[
|
||||
[np.nan, 1], # 2014-01-01 Equity(65 [A])
|
||||
[np.nan, 1], # Equity(66 [B])
|
||||
[np.nan, 1], # Equity(67 [C])
|
||||
[1, 2], # 2014-01-02 Equity(65 [A])
|
||||
[1, 2], # Equity(66 [B])
|
||||
[1, 2], # Equity(67 [C])
|
||||
[2, 2], # 2014-01-03 Equity(65 [A])
|
||||
[2, 2], # Equity(66 [B])
|
||||
[2, 2], # Equity(67 [C])
|
||||
],
|
||||
index=pd.MultiIndex.from_product(
|
||||
(self.dates, self.asset_finder.retrieve_all(
|
||||
self.ASSET_FINDER_EQUITY_SIDS
|
||||
)),
|
||||
),
|
||||
)
|
||||
self._test_id(
|
||||
df,
|
||||
var * Record(fields),
|
||||
expected,
|
||||
self.asset_finder,
|
||||
('value', 'other'),
|
||||
)
|
||||
|
||||
def _run_pipeline(self,
|
||||
expr,
|
||||
@@ -1015,7 +1255,7 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
])
|
||||
repeated_dates = base_dates.repeat(3)
|
||||
baseline = pd.DataFrame({
|
||||
'sid': self.sids * 2,
|
||||
'sid': self.ASSET_FINDER_EQUITY_SIDS * 2,
|
||||
'value': (0., 1., 2., 1., 2., 3.),
|
||||
'int_value': (0, 1, 2, 1, 2, 3),
|
||||
'asof_date': repeated_dates,
|
||||
|
||||
@@ -137,11 +137,9 @@ from datashape import (
|
||||
Date,
|
||||
DateTime,
|
||||
Option,
|
||||
float64,
|
||||
floating,
|
||||
isrecord,
|
||||
isscalar,
|
||||
promote,
|
||||
)
|
||||
import numpy as np
|
||||
from odo import odo
|
||||
@@ -170,7 +168,7 @@ from zipline.pipeline.loaders.utils import (
|
||||
normalize_timestamp_to_query_time,
|
||||
)
|
||||
from zipline.pipeline.term import NotSpecified
|
||||
from zipline.lib.adjusted_array import AdjustedArray
|
||||
from zipline.lib.adjusted_array import AdjustedArray, can_represent_dtype
|
||||
from zipline.lib.adjustment import Float64Overwrite
|
||||
from zipline.utils.enum import enum
|
||||
from zipline.utils.input_validation import (
|
||||
@@ -178,7 +176,10 @@ from zipline.utils.input_validation import (
|
||||
ensure_timezone,
|
||||
optionally,
|
||||
)
|
||||
from zipline.utils.numpy_utils import repeat_last_axis
|
||||
from zipline.utils.numpy_utils import (
|
||||
categorical_dtype,
|
||||
repeat_last_axis,
|
||||
)
|
||||
from zipline.utils.pandas_utils import sort_values
|
||||
from zipline.utils.preprocess import preprocess
|
||||
|
||||
@@ -276,6 +277,31 @@ class NotPipelineCompatible(TypeError):
|
||||
_new_names = ('BlazeDataSet_%d' % n for n in count())
|
||||
|
||||
|
||||
def datashape_type_to_numpy(type_):
|
||||
"""
|
||||
Given a datashape type, return the associated numpy type. Maps
|
||||
datashape's DateTime type to numpy's `datetime64[ns]` dtype, since the
|
||||
numpy datetime returned by datashape isn't supported by pipeline.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
type_: datashape.coretypes.Type
|
||||
The datashape type.
|
||||
|
||||
Returns
|
||||
-------
|
||||
type_ np.dtype
|
||||
The numpy dtype.
|
||||
|
||||
"""
|
||||
if isinstance(type_, Option):
|
||||
type_ = type_.ty
|
||||
if isinstance(type_, DateTime):
|
||||
return np.dtype('datetime64[ns]')
|
||||
else:
|
||||
return type_.to_numpy_dtype()
|
||||
|
||||
|
||||
@memoize
|
||||
def new_dataset(expr, deltas, missing_values):
|
||||
"""
|
||||
@@ -311,22 +337,14 @@ def new_dataset(expr, deltas, missing_values):
|
||||
# Terms.
|
||||
if name in (SID_FIELD_NAME, TS_FIELD_NAME):
|
||||
continue
|
||||
try:
|
||||
# TODO: This should support datetime and bool columns.
|
||||
if promote(type_, float64, promote_option=False) != float64:
|
||||
raise NotPipelineCompatible()
|
||||
if isinstance(type_, Option):
|
||||
type_ = type_.ty
|
||||
except NotPipelineCompatible:
|
||||
col = NonPipelineField(name, type_)
|
||||
except TypeError:
|
||||
col = NonNumpyField(name, type_)
|
||||
else:
|
||||
type_ = datashape_type_to_numpy(type_)
|
||||
if can_represent_dtype(type_):
|
||||
col = Column(
|
||||
type_.to_numpy_dtype(),
|
||||
type_,
|
||||
missing_values.get(name, NotSpecified),
|
||||
)
|
||||
|
||||
else:
|
||||
col = NonPipelineField(name, type_)
|
||||
columns[name] = col
|
||||
|
||||
name = expr._name
|
||||
@@ -1018,6 +1036,37 @@ class BlazeLoader(dict):
|
||||
dense_output = last_in_date_group(sparse_output, reindex=True)
|
||||
dense_output.ffill(inplace=True)
|
||||
|
||||
# Fill in missing values specified by each column. This is made
|
||||
# significantly more complex by the fact that we need to work around
|
||||
# two pandas issues:
|
||||
|
||||
# 1) When we have sids, if there are no records for a given sid for any
|
||||
# dates, pandas will generate a column full of NaNs for that sid.
|
||||
# This means that some of the columns in `dense_output` are now
|
||||
# float instead of the intended dtype, so we have to coerce back to
|
||||
# our expected type and convert NaNs into the desired missing value.
|
||||
|
||||
# 2) DataFrame.ffill assumes that receiving None as a fill-value means
|
||||
# that no value was passed. Consequently, there's no way to tell
|
||||
# pandas to replace NaNs in an object column with None using fillna,
|
||||
# so we have to roll our own instead using df.where.
|
||||
for column in columns:
|
||||
# Special logic for strings since `fillna` doesn't work if the
|
||||
# missing value is `None`.
|
||||
if column.dtype == categorical_dtype:
|
||||
dense_output[column.name] = dense_output[
|
||||
column.name
|
||||
].where(pd.notnull(dense_output[column.name]),
|
||||
column.missing_value)
|
||||
else:
|
||||
# We need to execute `fillna` before `astype` in case the
|
||||
# column contains NaNs and needs to be cast to bool or int.
|
||||
# This is so that the NaNs are replaced first, since pandas
|
||||
# can't convert NaNs for those types.
|
||||
dense_output[column.name] = dense_output[
|
||||
column.name
|
||||
].fillna(column.missing_value).astype(column.dtype)
|
||||
|
||||
if have_sids:
|
||||
adjustments_from_deltas = adjustments_from_deltas_with_sids
|
||||
column_view = identity
|
||||
|
||||
Reference in New Issue
Block a user