Loader return column vector for no sids case

This commit is contained in:
dmichalowicz
2016-06-23 12:14:39 -04:00
parent d6c1c5fce9
commit d8e9fa91bd
4 changed files with 115 additions and 141 deletions
+97 -119
View File
@@ -20,6 +20,7 @@ from toolz import keymap, valmap, concatv
from toolz.curried import operator as op
from zipline.assets.synthetic import make_simple_equity_info
from zipline.errors import UnsupportedPipelineOutput
from zipline.pipeline import Pipeline, CustomFactor
from zipline.pipeline.data import DataSet, BoundColumn, Column
from zipline.pipeline.engine import SimplePipelineEngine
@@ -38,12 +39,9 @@ from zipline.testing import (
tmp_asset_finder,
)
from zipline.testing.fixtures import WithAssetFinder
from zipline.utils.numpy_utils import (
float64_dtype,
int64_dtype,
repeat_last_axis,
)
from zipline.testing.predicates import assert_equal, assert_isidentical
from zipline.utils.numpy_utils import float64_dtype, int64_dtype
nameof = op.attrgetter('name')
dtypeof = op.attrgetter('dtype')
@@ -778,6 +776,44 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
check_dtype=False,
)
def _test_id_macro(self, df, dshape, expected, finder, add):
dates = self.dates
expr = bz.data(df, name='expr', dshape=dshape)
loader = BlazeLoader()
ds = from_blaze(
expr,
loader=loader,
no_deltas_rule='ignore',
missing_values=self.missing_values,
)
p = Pipeline()
macro_inputs = []
for column_name in add:
column = getattr(ds, column_name)
macro_inputs.append(column)
with self.assertRaises(UnsupportedPipelineOutput):
# Single column output terms cannot be added to a pipeline.
p.add(column.latest, column_name)
class UsesMacroInputs(CustomFactor):
inputs = macro_inputs
window_length = 1
def compute(self, today, assets, out, *inputs):
e = expected.loc[today]
for i, input_ in enumerate(inputs):
# Each macro input should only have one column.
assert input_.shape == (self.window_length, 1)
assert_equal(input_[0, 0], e[i])
# Run the pipeline with our custom factor. Assertions about the
# expected macro data are made in the `compute` function of our custom
# factor above.
p.add(UsesMacroInputs(), 'uses_macro_inputs')
engine = SimplePipelineEngine(loader, dates, finder)
engine.run_pipeline(p, dates[0], dates[-1])
def test_custom_query_time_tz(self):
df = self.df.copy()
df['timestamp'] = (
@@ -972,27 +1008,19 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
6 2014-01-03 2014-01-03 2
output (expected):
value
2014-01-01 Equity(65 [A]) 0
Equity(66 [B]) 0
Equity(67 [C]) 0
2014-01-02 Equity(65 [A]) 1
Equity(66 [B]) 1
Equity(67 [C]) 1
2014-01-03 Equity(65 [A]) 2
Equity(66 [B]) 2
Equity(67 [C]) 2
value
2014-01-01 0
2014-01-02 1
2014-01-03 2
"""
nassets = len(simple_asset_info)
expected = pd.DataFrame(
list(concatv([0] * nassets, [1] * nassets, [2] * nassets)),
index=pd.MultiIndex.from_product((
self.macro_df.timestamp,
self.asset_finder.retrieve_all(simple_asset_info.index),
)),
columns=('value',),
data=[[0],
[1],
[2]],
columns=['value'],
index=self.dates,
)
self._test_id(
self._test_id_macro(
self.macro_df,
self.macro_dshape,
expected,
@@ -1009,16 +1037,10 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
2 2013-12-24 2013-12-24 NaN NaN
output (expected):
other value
2014-01-01 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
2014-01-02 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
2014-01-03 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
other value
2014-01-01 1 0
2014-01-02 1 0
2014-01-03 1 0
"""
dates = self.dates - timedelta(days=10)
df = pd.DataFrame({
@@ -1031,23 +1053,13 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
fields['other'] = fields['value']
expected = pd.DataFrame(
np.array([[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1]]),
columns=['value', 'other'],
index=pd.MultiIndex.from_product(
(self.dates, self.asset_finder.retrieve_all(
self.ASSET_FINDER_EQUITY_SIDS
)),
),
).sort_index(axis=1)
self._test_id(
data=[[0, 1],
[0, 1],
[0, 1]],
columns=['other', 'value'],
index=self.dates,
)
self._test_id_macro(
df,
var * Record(fields),
expected,
@@ -1064,16 +1076,10 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
6 2014-01-03 2014-01-03 3 2
output (expected):
other value
2014-01-01 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
2014-01-02 Equity(65 [A]) 2 1
Equity(66 [B]) 2 1
Equity(67 [C]) 2 1
2014-01-03 Equity(65 [A]) 3 2
Equity(66 [B]) 3 2
Equity(67 [C]) 3 2
other value
2014-01-01 1 0
2014-01-02 2 1
2014-01-03 3 2
"""
df = self.macro_df.copy()
df['other'] = df.value + 1
@@ -1082,16 +1088,14 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
with tmp_asset_finder(equities=simple_asset_info) as finder:
expected = pd.DataFrame(
np.array([[0, 1],
[1, 2],
[2, 3]]).repeat(3, axis=0),
index=pd.MultiIndex.from_product((
df.timestamp,
finder.retrieve_all(simple_asset_info.index),
)),
columns=('value', 'other'),
).sort_index(axis=1)
self._test_id(
data=[[0, 1],
[1, 2],
[2, 3]],
columns=['value', 'other'],
index=self.dates,
dtype=np.float64,
)
self._test_id_macro(
df,
var * Record(fields),
expected,
@@ -1158,16 +1162,10 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
"""
output (expected):
other value
2014-01-01 Equity(65 [A]) NaN 1
Equity(66 [B]) NaN 1
Equity(67 [C]) NaN 1
2014-01-02 Equity(65 [A]) 1 2
Equity(66 [B]) 1 2
Equity(67 [C]) 1 2
2014-01-03 Equity(65 [A]) 2 2
Equity(66 [B]) 2 2
Equity(67 [C]) 2 2
other value
2014-01-01 NaN 1
2014-01-02 1 2
2014-01-03 2 2
"""
T = pd.Timestamp
df = pd.DataFrame(
@@ -1185,32 +1183,18 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
fields['other'] = fields['value']
expected = pd.DataFrame(
columns=[
'other', 'value',
],
data=[
[np.nan, 1], # 2014-01-01 Equity(65 [A])
[np.nan, 1], # Equity(66 [B])
[np.nan, 1], # Equity(67 [C])
[1, 2], # 2014-01-02 Equity(65 [A])
[1, 2], # Equity(66 [B])
[1, 2], # Equity(67 [C])
[2, 2], # 2014-01-03 Equity(65 [A])
[2, 2], # Equity(66 [B])
[2, 2], # Equity(67 [C])
],
index=pd.MultiIndex.from_product(
(self.dates, self.asset_finder.retrieve_all(
self.ASSET_FINDER_EQUITY_SIDS
)),
),
data=[[np.nan, 1], # 2014-01-01
[1, 2], # 2014-01-02
[2, 2]], # 2014-01-03
columns=['other', 'value'],
index=self.dates,
)
self._test_id(
self._test_id_macro(
df,
var * Record(fields),
expected,
self.asset_finder,
('value', 'other'),
('other', 'value'),
)
def _run_pipeline(self,
@@ -1400,8 +1384,10 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
nassets = len(simple_asset_info)
expected_views = keymap(pd.Timestamp, {
'2014-01-02': repeat_last_axis(np.array([10.0, 1.0]), nassets),
'2014-01-03': repeat_last_axis(np.array([11.0, 2.0]), nassets),
'2014-01-02': np.array([[10.0],
[1.0]]),
'2014-01-03': np.array([[11.0],
[2.0]]),
})
with tmp_asset_finder(equities=simple_asset_info) as finder:
@@ -1523,14 +1509,12 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
nassets = len(simple_asset_info)
expected_views = keymap(pd.Timestamp, {
'2014-01-03': repeat_last_axis(
np.array([10.0, 10.0, 10.0]),
nassets,
),
'2014-01-06': repeat_last_axis(
np.array([10.0, 10.0, 11.0]),
nassets,
),
'2014-01-03': np.array([[10.0],
[10.0],
[10.0]]),
'2014-01-06': np.array([[10.0],
[10.0],
[11.0]]),
})
cal = pd.DatetimeIndex([
@@ -1586,14 +1570,8 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase):
nassets = len(simple_asset_info)
expected_views = keymap(pd.Timestamp, {
'2014-01-03': repeat_last_axis(
np.array([ffilled_value]),
nassets,
),
'2014-01-04': repeat_last_axis(
np.array([1.0]),
nassets,
),
'2014-01-03': np.array([[ffilled_value]]),
'2014-01-04': np.array([[1.0]]),
})
with tmp_asset_finder(equities=simple_asset_info) as finder:
+3
View File
@@ -123,6 +123,7 @@ class BoundColumn(LoadableTerm):
missing_value=missing_value,
dataset=dataset,
name=name,
ndim=dataset.ndim,
)
def _init(self, dataset, name, *args, **kwargs):
@@ -176,6 +177,7 @@ class BoundColumn(LoadableTerm):
inputs=(self,),
dtype=dtype,
missing_value=self.missing_value,
ndim=self.ndim,
)
def __repr__(self):
@@ -227,3 +229,4 @@ class DataSetMeta(type):
class DataSet(with_metaclass(DataSetMeta, object)):
domain = None
ndim = 2
+13 -22
View File
@@ -139,7 +139,6 @@ from __future__ import division, absolute_import
from abc import ABCMeta, abstractproperty
from collections import namedtuple, defaultdict
from copy import copy
from functools import partial
from itertools import count
import warnings
@@ -187,10 +186,7 @@ from zipline.utils.input_validation import (
ensure_timezone,
optionally,
)
from zipline.utils.numpy_utils import (
categorical_dtype,
repeat_last_axis,
)
from zipline.utils.numpy_utils import bool_dtype, categorical_dtype
from zipline.utils.pandas_utils import sort_values
from zipline.utils.preprocess import preprocess
@@ -337,7 +333,7 @@ def new_dataset(expr, deltas, missing_values):
the same type.
"""
missing_values = dict(missing_values)
columns = {}
class_dict = {'ndim': 2 if SID_FIELD_NAME in expr.fields else 1}
for name, type_ in expr.dshape.measure.fields:
# Don't generate a column for sid or timestamp, since they're
# implicitly the labels if the arrays that will be passed to pipeline
@@ -352,7 +348,7 @@ def new_dataset(expr, deltas, missing_values):
)
else:
col = NonPipelineField(name, type_)
columns[name] = col
class_dict[name] = col
name = expr._name
if name is None:
@@ -363,7 +359,7 @@ def new_dataset(expr, deltas, missing_values):
if PY2 and isinstance(name, unicode): # pragma: no cover # noqa
name = name.encode('utf-8')
return type(name, (DataSet,), columns)
return type(name, (DataSet,), class_dict)
def _check_resources(name, expr, resources):
@@ -850,7 +846,7 @@ def adjustments_from_deltas_no_sids(dense_dates,
The adjustments dictionary to feed to the adjusted array.
"""
ad_series = deltas[AD_FIELD_NAME]
idx = 0, len(asset_idx) - 1
idx = 0, 0
return {
dense_dates.get_loc(kd): overwrite_from_dates(
ad_series.loc[kd],
@@ -966,7 +962,7 @@ class BlazeLoader(dict):
raise AssertionError('all columns must come from the same dataset')
expr, deltas, checkpoints, odo_kwargs = self[dataset]
have_sids = SID_FIELD_NAME in expr.fields
have_sids = (dataset.ndim == 2)
asset_idx = pd.Series(index=assets, data=np.arange(len(assets)))
assets = list(map(int, assets)) # coerce from numpy.int64
added_query_fields = [AD_FIELD_NAME, TS_FIELD_NAME] + (
@@ -1142,19 +1138,14 @@ class BlazeLoader(dict):
adjustments_from_deltas = adjustments_from_deltas_with_sids
column_view = identity
else:
# We use the column view to make an array per asset.
column_view = compose(
# We need to copy this because we need a concrete ndarray.
# The `repeat_last_axis` call will give us a fancy strided
# array which uses a buffer to represent `len(assets)` columns.
# The engine puts nans at the indicies for which we do not have
# sid information so that the nan-aware reductions still work.
# A future change to the engine would be to add first class
# support for macro econimic datasets.
copy,
partial(repeat_last_axis, count=len(assets)),
)
# If we do not have sids, use the column view to make a single
# column vector which is unassociated with any assets.
column_view = op.itemgetter(np.s_[:, np.newaxis])
adjustments_from_deltas = adjustments_from_deltas_no_sids
mask = np.full(
shape=(len(mask), 1), fill_value=True, dtype=bool_dtype,
)
for column_idx, column in enumerate(columns):
column_name = column.name
+2
View File
@@ -94,6 +94,7 @@ class CustomTermMixin(object):
mask=NotSpecified,
dtype=NotSpecified,
missing_value=NotSpecified,
ndim=NotSpecified,
**kwargs):
unexpected_keys = set(kwargs) - set(cls.params)
@@ -114,6 +115,7 @@ class CustomTermMixin(object):
mask=mask,
dtype=dtype,
missing_value=missing_value,
ndim=ndim,
**kwargs
)