diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index 3a26cb7e..09ae7bce 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -20,6 +20,7 @@ from toolz import keymap, valmap, concatv from toolz.curried import operator as op from zipline.assets.synthetic import make_simple_equity_info +from zipline.errors import UnsupportedPipelineOutput from zipline.pipeline import Pipeline, CustomFactor from zipline.pipeline.data import DataSet, BoundColumn, Column from zipline.pipeline.engine import SimplePipelineEngine @@ -38,12 +39,9 @@ from zipline.testing import ( tmp_asset_finder, ) from zipline.testing.fixtures import WithAssetFinder -from zipline.utils.numpy_utils import ( - float64_dtype, - int64_dtype, - repeat_last_axis, -) from zipline.testing.predicates import assert_equal, assert_isidentical +from zipline.utils.numpy_utils import float64_dtype, int64_dtype + nameof = op.attrgetter('name') dtypeof = op.attrgetter('dtype') @@ -778,6 +776,44 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): check_dtype=False, ) + def _test_id_macro(self, df, dshape, expected, finder, add): + dates = self.dates + expr = bz.data(df, name='expr', dshape=dshape) + loader = BlazeLoader() + ds = from_blaze( + expr, + loader=loader, + no_deltas_rule='ignore', + missing_values=self.missing_values, + ) + + p = Pipeline() + macro_inputs = [] + for column_name in add: + column = getattr(ds, column_name) + macro_inputs.append(column) + with self.assertRaises(UnsupportedPipelineOutput): + # Single column output terms cannot be added to a pipeline. + p.add(column.latest, column_name) + + class UsesMacroInputs(CustomFactor): + inputs = macro_inputs + window_length = 1 + + def compute(self, today, assets, out, *inputs): + e = expected.loc[today] + for i, input_ in enumerate(inputs): + # Each macro input should only have one column. + assert input_.shape == (self.window_length, 1) + assert_equal(input_[0, 0], e[i]) + + # Run the pipeline with our custom factor. Assertions about the + # expected macro data are made in the `compute` function of our custom + # factor above. + p.add(UsesMacroInputs(), 'uses_macro_inputs') + engine = SimplePipelineEngine(loader, dates, finder) + engine.run_pipeline(p, dates[0], dates[-1]) + def test_custom_query_time_tz(self): df = self.df.copy() df['timestamp'] = ( @@ -972,27 +1008,19 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): 6 2014-01-03 2014-01-03 2 output (expected): - value - 2014-01-01 Equity(65 [A]) 0 - Equity(66 [B]) 0 - Equity(67 [C]) 0 - 2014-01-02 Equity(65 [A]) 1 - Equity(66 [B]) 1 - Equity(67 [C]) 1 - 2014-01-03 Equity(65 [A]) 2 - Equity(66 [B]) 2 - Equity(67 [C]) 2 + value + 2014-01-01 0 + 2014-01-02 1 + 2014-01-03 2 """ - nassets = len(simple_asset_info) expected = pd.DataFrame( - list(concatv([0] * nassets, [1] * nassets, [2] * nassets)), - index=pd.MultiIndex.from_product(( - self.macro_df.timestamp, - self.asset_finder.retrieve_all(simple_asset_info.index), - )), - columns=('value',), + data=[[0], + [1], + [2]], + columns=['value'], + index=self.dates, ) - self._test_id( + self._test_id_macro( self.macro_df, self.macro_dshape, expected, @@ -1009,16 +1037,10 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): 2 2013-12-24 2013-12-24 NaN NaN output (expected): - other value - 2014-01-01 Equity(65 [A]) 1 0 - Equity(66 [B]) 1 0 - Equity(67 [C]) 1 0 - 2014-01-02 Equity(65 [A]) 1 0 - Equity(66 [B]) 1 0 - Equity(67 [C]) 1 0 - 2014-01-03 Equity(65 [A]) 1 0 - Equity(66 [B]) 1 0 - Equity(67 [C]) 1 0 + other value + 2014-01-01 1 0 + 2014-01-02 1 0 + 2014-01-03 1 0 """ dates = self.dates - timedelta(days=10) df = pd.DataFrame({ @@ -1031,23 +1053,13 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): fields['other'] = fields['value'] expected = pd.DataFrame( - np.array([[0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1]]), - columns=['value', 'other'], - index=pd.MultiIndex.from_product( - (self.dates, self.asset_finder.retrieve_all( - self.ASSET_FINDER_EQUITY_SIDS - )), - ), - ).sort_index(axis=1) - self._test_id( + data=[[0, 1], + [0, 1], + [0, 1]], + columns=['other', 'value'], + index=self.dates, + ) + self._test_id_macro( df, var * Record(fields), expected, @@ -1064,16 +1076,10 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): 6 2014-01-03 2014-01-03 3 2 output (expected): - other value - 2014-01-01 Equity(65 [A]) 1 0 - Equity(66 [B]) 1 0 - Equity(67 [C]) 1 0 - 2014-01-02 Equity(65 [A]) 2 1 - Equity(66 [B]) 2 1 - Equity(67 [C]) 2 1 - 2014-01-03 Equity(65 [A]) 3 2 - Equity(66 [B]) 3 2 - Equity(67 [C]) 3 2 + other value + 2014-01-01 1 0 + 2014-01-02 2 1 + 2014-01-03 3 2 """ df = self.macro_df.copy() df['other'] = df.value + 1 @@ -1082,16 +1088,14 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): with tmp_asset_finder(equities=simple_asset_info) as finder: expected = pd.DataFrame( - np.array([[0, 1], - [1, 2], - [2, 3]]).repeat(3, axis=0), - index=pd.MultiIndex.from_product(( - df.timestamp, - finder.retrieve_all(simple_asset_info.index), - )), - columns=('value', 'other'), - ).sort_index(axis=1) - self._test_id( + data=[[0, 1], + [1, 2], + [2, 3]], + columns=['value', 'other'], + index=self.dates, + dtype=np.float64, + ) + self._test_id_macro( df, var * Record(fields), expected, @@ -1158,16 +1162,10 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): """ output (expected): - other value - 2014-01-01 Equity(65 [A]) NaN 1 - Equity(66 [B]) NaN 1 - Equity(67 [C]) NaN 1 - 2014-01-02 Equity(65 [A]) 1 2 - Equity(66 [B]) 1 2 - Equity(67 [C]) 1 2 - 2014-01-03 Equity(65 [A]) 2 2 - Equity(66 [B]) 2 2 - Equity(67 [C]) 2 2 + other value + 2014-01-01 NaN 1 + 2014-01-02 1 2 + 2014-01-03 2 2 """ T = pd.Timestamp df = pd.DataFrame( @@ -1185,32 +1183,18 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): fields['other'] = fields['value'] expected = pd.DataFrame( - columns=[ - 'other', 'value', - ], - data=[ - [np.nan, 1], # 2014-01-01 Equity(65 [A]) - [np.nan, 1], # Equity(66 [B]) - [np.nan, 1], # Equity(67 [C]) - [1, 2], # 2014-01-02 Equity(65 [A]) - [1, 2], # Equity(66 [B]) - [1, 2], # Equity(67 [C]) - [2, 2], # 2014-01-03 Equity(65 [A]) - [2, 2], # Equity(66 [B]) - [2, 2], # Equity(67 [C]) - ], - index=pd.MultiIndex.from_product( - (self.dates, self.asset_finder.retrieve_all( - self.ASSET_FINDER_EQUITY_SIDS - )), - ), + data=[[np.nan, 1], # 2014-01-01 + [1, 2], # 2014-01-02 + [2, 2]], # 2014-01-03 + columns=['other', 'value'], + index=self.dates, ) - self._test_id( + self._test_id_macro( df, var * Record(fields), expected, self.asset_finder, - ('value', 'other'), + ('other', 'value'), ) def _run_pipeline(self, @@ -1400,8 +1384,10 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): nassets = len(simple_asset_info) expected_views = keymap(pd.Timestamp, { - '2014-01-02': repeat_last_axis(np.array([10.0, 1.0]), nassets), - '2014-01-03': repeat_last_axis(np.array([11.0, 2.0]), nassets), + '2014-01-02': np.array([[10.0], + [1.0]]), + '2014-01-03': np.array([[11.0], + [2.0]]), }) with tmp_asset_finder(equities=simple_asset_info) as finder: @@ -1523,14 +1509,12 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): nassets = len(simple_asset_info) expected_views = keymap(pd.Timestamp, { - '2014-01-03': repeat_last_axis( - np.array([10.0, 10.0, 10.0]), - nassets, - ), - '2014-01-06': repeat_last_axis( - np.array([10.0, 10.0, 11.0]), - nassets, - ), + '2014-01-03': np.array([[10.0], + [10.0], + [10.0]]), + '2014-01-06': np.array([[10.0], + [10.0], + [11.0]]), }) cal = pd.DatetimeIndex([ @@ -1586,14 +1570,8 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): nassets = len(simple_asset_info) expected_views = keymap(pd.Timestamp, { - '2014-01-03': repeat_last_axis( - np.array([ffilled_value]), - nassets, - ), - '2014-01-04': repeat_last_axis( - np.array([1.0]), - nassets, - ), + '2014-01-03': np.array([[ffilled_value]]), + '2014-01-04': np.array([[1.0]]), }) with tmp_asset_finder(equities=simple_asset_info) as finder: diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index bca7310e..d06b72a8 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -123,6 +123,7 @@ class BoundColumn(LoadableTerm): missing_value=missing_value, dataset=dataset, name=name, + ndim=dataset.ndim, ) def _init(self, dataset, name, *args, **kwargs): @@ -176,6 +177,7 @@ class BoundColumn(LoadableTerm): inputs=(self,), dtype=dtype, missing_value=self.missing_value, + ndim=self.ndim, ) def __repr__(self): @@ -227,3 +229,4 @@ class DataSetMeta(type): class DataSet(with_metaclass(DataSetMeta, object)): domain = None + ndim = 2 diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 89011ad3..edef7c40 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -139,7 +139,6 @@ from __future__ import division, absolute_import from abc import ABCMeta, abstractproperty from collections import namedtuple, defaultdict -from copy import copy from functools import partial from itertools import count import warnings @@ -187,10 +186,7 @@ from zipline.utils.input_validation import ( ensure_timezone, optionally, ) -from zipline.utils.numpy_utils import ( - categorical_dtype, - repeat_last_axis, -) +from zipline.utils.numpy_utils import bool_dtype, categorical_dtype from zipline.utils.pandas_utils import sort_values from zipline.utils.preprocess import preprocess @@ -337,7 +333,7 @@ def new_dataset(expr, deltas, missing_values): the same type. """ missing_values = dict(missing_values) - columns = {} + class_dict = {'ndim': 2 if SID_FIELD_NAME in expr.fields else 1} for name, type_ in expr.dshape.measure.fields: # Don't generate a column for sid or timestamp, since they're # implicitly the labels if the arrays that will be passed to pipeline @@ -352,7 +348,7 @@ def new_dataset(expr, deltas, missing_values): ) else: col = NonPipelineField(name, type_) - columns[name] = col + class_dict[name] = col name = expr._name if name is None: @@ -363,7 +359,7 @@ def new_dataset(expr, deltas, missing_values): if PY2 and isinstance(name, unicode): # pragma: no cover # noqa name = name.encode('utf-8') - return type(name, (DataSet,), columns) + return type(name, (DataSet,), class_dict) def _check_resources(name, expr, resources): @@ -850,7 +846,7 @@ def adjustments_from_deltas_no_sids(dense_dates, The adjustments dictionary to feed to the adjusted array. """ ad_series = deltas[AD_FIELD_NAME] - idx = 0, len(asset_idx) - 1 + idx = 0, 0 return { dense_dates.get_loc(kd): overwrite_from_dates( ad_series.loc[kd], @@ -966,7 +962,7 @@ class BlazeLoader(dict): raise AssertionError('all columns must come from the same dataset') expr, deltas, checkpoints, odo_kwargs = self[dataset] - have_sids = SID_FIELD_NAME in expr.fields + have_sids = (dataset.ndim == 2) asset_idx = pd.Series(index=assets, data=np.arange(len(assets))) assets = list(map(int, assets)) # coerce from numpy.int64 added_query_fields = [AD_FIELD_NAME, TS_FIELD_NAME] + ( @@ -1142,19 +1138,14 @@ class BlazeLoader(dict): adjustments_from_deltas = adjustments_from_deltas_with_sids column_view = identity else: - # We use the column view to make an array per asset. - column_view = compose( - # We need to copy this because we need a concrete ndarray. - # The `repeat_last_axis` call will give us a fancy strided - # array which uses a buffer to represent `len(assets)` columns. - # The engine puts nans at the indicies for which we do not have - # sid information so that the nan-aware reductions still work. - # A future change to the engine would be to add first class - # support for macro econimic datasets. - copy, - partial(repeat_last_axis, count=len(assets)), - ) + # If we do not have sids, use the column view to make a single + # column vector which is unassociated with any assets. + column_view = op.itemgetter(np.s_[:, np.newaxis]) + adjustments_from_deltas = adjustments_from_deltas_no_sids + mask = np.full( + shape=(len(mask), 1), fill_value=True, dtype=bool_dtype, + ) for column_idx, column in enumerate(columns): column_name = column.name diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index a2896267..3eb0cfd4 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -94,6 +94,7 @@ class CustomTermMixin(object): mask=NotSpecified, dtype=NotSpecified, missing_value=NotSpecified, + ndim=NotSpecified, **kwargs): unexpected_keys = set(kwargs) - set(cls.params) @@ -114,6 +115,7 @@ class CustomTermMixin(object): mask=mask, dtype=dtype, missing_value=missing_value, + ndim=ndim, **kwargs )