From 0f8738ef3273dfb87d5903da026f3724051a228f Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Thu, 5 May 2016 10:57:53 -0400 Subject: [PATCH 1/6] MAINT: remove restrictions on strings for generic loader MAINT: remove catch for NonNumpyCompatible since it's a subset of NonPipelineCompatible --- zipline/pipeline/loaders/blaze/core.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 4df69559..f54e0a80 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -137,11 +137,10 @@ from datashape import ( Date, DateTime, Option, - float64, floating, isrecord, isscalar, - promote, + String ) import numpy as np from odo import odo @@ -170,7 +169,7 @@ from zipline.pipeline.loaders.utils import ( normalize_timestamp_to_query_time, ) from zipline.pipeline.term import NotSpecified -from zipline.lib.adjusted_array import AdjustedArray +from zipline.lib.adjusted_array import AdjustedArray, can_represent_dtype from zipline.lib.adjustment import Float64Overwrite from zipline.utils.enum import enum from zipline.utils.input_validation import ( @@ -312,21 +311,17 @@ def new_dataset(expr, deltas, missing_values): if name in (SID_FIELD_NAME, TS_FIELD_NAME): continue try: - # TODO: This should support datetime and bool columns. 
- if promote(type_, float64, promote_option=False) != float64: - raise NotPipelineCompatible() if isinstance(type_, Option): type_ = type_.ty - except NotPipelineCompatible: - col = NonPipelineField(name, type_) - except TypeError: - col = NonNumpyField(name, type_) - else: + type_ = type_.to_numpy_dtype() + if not isinstance(type_, String) and not can_represent_dtype(type_): + raise NotPipelineCompatible() col = Column( - type_.to_numpy_dtype(), + type_, missing_values.get(name, NotSpecified), ) - + except NotPipelineCompatible: + col = NonPipelineField(name, type_) columns[name] = col name = expr._name From 8e630bff77626378e4af0499e61d5a36d529c8b1 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Fri, 6 May 2016 10:20:53 -0400 Subject: [PATCH 2/6] TST: remove obsolete test and update test --- tests/pipeline/test_blaze.py | 60 +++++++++++++++----------- zipline/pipeline/loaders/blaze/core.py | 12 +++++- 2 files changed, 45 insertions(+), 27 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index fdfe495a..105ed219 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -128,7 +128,7 @@ class BlazeToPipelineTestCase(TestCase): with self.assertRaises(AttributeError) as e: getattr(ds, field) self.assertIn("'%s'" % field, str(e.exception)) - self.assertIn("'datetime'", str(e.exception)) + self.assertIn("'datetime64[us]'", str(e.exception)) # test memoization self.assertIs( @@ -254,34 +254,12 @@ class BlazeToPipelineTestCase(TestCase): ) self.assertIn(str(expr), str(e.exception)) - def test_non_numpy_field(self): - expr = bz.data( - [], - dshape=""" - var * { - a: datetime, - asof_date: datetime, - timestamp: datetime, - }""", - ) - ds = from_blaze( - expr, - loader=self.garbage_loader, - no_deltas_rule=no_deltas_rules.ignore, - ) - with self.assertRaises(AttributeError): - ds.a - self.assertIsInstance(object.__getattribute__(ds, 'a'), NonNumpyField) - def test_non_pipeline_field(self): - # NOTE: This test 
will fail if we ever allow string types in - # the Pipeline API. If this happens, change the dtype of the `a` field - # of expr to another type we don't allow. expr = bz.data( [], dshape=""" var * { - a: string, + a: complex, asof_date: datetime, timestamp: datetime, }""", @@ -298,6 +276,38 @@ class BlazeToPipelineTestCase(TestCase): NonPipelineField, ) + def test_cols_with_missing_vals(self): + dates = (pd.Timestamp('2014-01-01'), pd.Timestamp('2014-01-03')) * 3 + df = pd.DataFrame({ + 'sid': self.sids * 2, + 'value': (np.NaN, 0., 1., 2., 3., 2.,), + 'str_value': (None, "b", "c", "d", "e", "f"), + 'asof_date': dates, + 'timestamp': dates, + }) + expr = bz.data( + df, + dshape=""" + var * { + sid: int64, + value: float64, + str_value: string, + asof_date: datetime, + timestamp: datetime, + }""", + ) + fields = OrderedDict(expr.dshape.measure.fields) + + with tmp_asset_finder() as finder: + expected = pd.DataFrame() + self._test_id( + df, + var * Record(fields), + expected, + finder, + ('value', 'str_value'), + ) + def test_complex_expr(self): expr = bz.data(self.df, dshape=self.dshape) # put an Add in the table @@ -359,7 +369,7 @@ class BlazeToPipelineTestCase(TestCase): dates, finder, ).run_pipeline(p, dates[0], dates[-1]) - + import pdb; pdb.set_trace() assert_frame_equal( result, _utc_localize_index_level_0(expected), diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index f54e0a80..fd265691 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -177,7 +177,7 @@ from zipline.utils.input_validation import ( ensure_timezone, optionally, ) -from zipline.utils.numpy_utils import repeat_last_axis +from zipline.utils.numpy_utils import repeat_last_axis, categorical_dtype from zipline.utils.pandas_utils import sort_values from zipline.utils.preprocess import preprocess @@ -1006,7 +1006,15 @@ class BlazeLoader(dict): ) else: last_in_group = last_in_group.reindex(dates) - + import 
pdb; pdb.set_trace() + # str_cols = df.columns[df.dtypes == categorical_dtype] + # + # # Unstack will fill all missing values with NaN; we need to fix + # # this for strings. + # for col in str_cols: + # last_in_group.iloc[ + # :, last_in_group.columns.get_level_values(0) == col + # ].fillna('None') return last_in_group sparse_deltas = last_in_date_group(non_novel_deltas, reindex=False) From c94f3d0c9b7f4ade6231d21caf747d23108c1816 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Mon, 9 May 2016 17:08:03 -0400 Subject: [PATCH 3/6] BUG: fix replacement of NaN with None TST: finish test with expected data STY: alphabetize imports MAINT: simplify condition - remove unnecessary statement --- tests/pipeline/test_blaze.py | 26 +++++++++++++++++++------- zipline/pipeline/loaders/blaze/core.py | 22 +++++++++++----------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index 105ed219..a0faf6be 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -30,7 +30,6 @@ from zipline.pipeline.loaders.blaze import ( NoDeltasWarning, ) from zipline.pipeline.loaders.blaze.core import ( - NonNumpyField, NonPipelineField, no_deltas_rules, ) @@ -277,11 +276,11 @@ class BlazeToPipelineTestCase(TestCase): ) def test_cols_with_missing_vals(self): - dates = (pd.Timestamp('2014-01-01'), pd.Timestamp('2014-01-03')) * 3 + dates = (pd.Timestamp('2014-01-01'), pd.Timestamp('2014-01-03')) df = pd.DataFrame({ - 'sid': self.sids * 2, - 'value': (np.NaN, 0., 1., 2., 3., 2.,), - 'str_value': (None, "b", "c", "d", "e", "f"), + 'sid': self.sids[:-1], + 'value': (0., 1.,), + 'str_value': ("a", "b",), 'asof_date': dates, 'timestamp': dates, }) @@ -299,7 +298,21 @@ class BlazeToPipelineTestCase(TestCase): fields = OrderedDict(expr.dshape.measure.fields) with tmp_asset_finder() as finder: - expected = pd.DataFrame() + expected = pd.DataFrame( + np.array([["a", 0], + [None, np.NaN], + [None, np.NaN], + ["a", 
0], + [None, np.NaN], + [None, np.NaN], + ["a", 0], + ["b", 1], + [None, np.NaN]]), + columns=['str_value', 'value'], + index=pd.MultiIndex.from_product( + (self.dates, finder.retrieve_all(self.sids)) + ) + ) self._test_id( df, var * Record(fields), @@ -369,7 +382,6 @@ class BlazeToPipelineTestCase(TestCase): dates, finder, ).run_pipeline(p, dates[0], dates[-1]) - import pdb; pdb.set_trace() assert_frame_equal( result, _utc_localize_index_level_0(expected), diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index fd265691..53504fe7 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -177,7 +177,7 @@ from zipline.utils.input_validation import ( ensure_timezone, optionally, ) -from zipline.utils.numpy_utils import repeat_last_axis, categorical_dtype +from zipline.utils.numpy_utils import categorical_dtype, repeat_last_axis from zipline.utils.pandas_utils import sort_values from zipline.utils.preprocess import preprocess @@ -314,7 +314,7 @@ def new_dataset(expr, deltas, missing_values): if isinstance(type_, Option): type_ = type_.ty type_ = type_.to_numpy_dtype() - if not isinstance(type_, String) and not can_represent_dtype(type_): + if not can_represent_dtype(type_): raise NotPipelineCompatible() col = Column( type_, @@ -1006,15 +1006,15 @@ class BlazeLoader(dict): ) else: last_in_group = last_in_group.reindex(dates) - import pdb; pdb.set_trace() - # str_cols = df.columns[df.dtypes == categorical_dtype] - # - # # Unstack will fill all missing values with NaN; we need to fix - # # this for strings. - # for col in str_cols: - # last_in_group.iloc[ - # :, last_in_group.columns.get_level_values(0) == col - # ].fillna('None') + # Unstack will fill all missing values with NaN; we need to fix + # this for strings. 
+ if not df.empty: + str_cols = df.columns[df.dtypes == categorical_dtype] + + for col in str_cols: + last_in_group[col] = last_in_group[col].where(pd.notnull( + last_in_group[col]), None) + return last_in_group sparse_deltas = last_in_date_group(non_novel_deltas, reindex=False) From c0eb798cc65d74696116b4a98c3abec14b818972 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Tue, 10 May 2016 12:46:53 -0400 Subject: [PATCH 4/6] TST: modify test class to use WithAssetFinder fixture. BUG: assign result to var TST: remove obsolete assertion STY: fix line length --- tests/pipeline/test_blaze.py | 441 ++++++++++++++----------- zipline/pipeline/loaders/blaze/core.py | 76 ++++- 2 files changed, 304 insertions(+), 213 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index a0faf6be..512be3ee 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -6,7 +6,6 @@ from __future__ import division from collections import OrderedDict from datetime import timedelta, time from itertools import product, chain -from unittest import TestCase import warnings import blaze as bz @@ -33,12 +32,13 @@ from zipline.pipeline.loaders.blaze.core import ( NonPipelineField, no_deltas_rules, ) +from zipline.testing.fixtures import WithAssetFinder from zipline.utils.numpy_utils import ( float64_dtype, int64_dtype, repeat_last_axis, ) -from zipline.testing import tmp_asset_finder +from zipline.testing import tmp_asset_finder, ZiplineTestCase nameof = op.attrgetter('name') dtypeof = op.attrgetter('dtype') @@ -73,14 +73,17 @@ def _utc_localize_index_level_0(df): return df -class BlazeToPipelineTestCase(TestCase): +class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): + START_DATE = pd.Timestamp(0) + END_DATE = pd.Timestamp('2015') + @classmethod - def setUpClass(cls): + def init_class_fixtures(cls): + super(BlazeToPipelineTestCase, cls).init_class_fixtures() cls.dates = dates = pd.date_range('2014-01-01', '2014-01-03') dates = 
cls.dates.repeat(3) - cls.sids = sids = ord('A'), ord('B'), ord('C') cls.df = df = pd.DataFrame({ - 'sid': sids * 3, + 'sid': cls.ASSET_FINDER_EQUITY_SIDS * 3, 'value': (0., 1., 2., 1., 2., 3., 2., 3., 4.), 'int_value': (0, 1, 2, 1, 2, 3, 2, 3, 4), 'asof_date': dates, @@ -121,14 +124,6 @@ class BlazeToPipelineTestCase(TestCase): self.assertTrue(np.isnan(ds.value.missing_value)) self.assertEqual(ds.int_value.missing_value, 0) - invalid_type_fields = ('asof_date',) - - for field in invalid_type_fields: - with self.assertRaises(AttributeError) as e: - getattr(ds, field) - self.assertIn("'%s'" % field, str(e.exception)) - self.assertIn("'datetime64[us]'", str(e.exception)) - # test memoization self.assertIs( from_blaze( @@ -276,11 +271,15 @@ class BlazeToPipelineTestCase(TestCase): ) def test_cols_with_missing_vals(self): - dates = (pd.Timestamp('2014-01-01'), pd.Timestamp('2014-01-03')) + dates = (self.dates[0], self.dates[-1]) df = pd.DataFrame({ - 'sid': self.sids[:-1], - 'value': (0., 1.,), + 'sid': self.ASSET_FINDER_EQUITY_SIDS[:-1], + 'float_value': (0., 1.,), 'str_value': ("a", "b",), + 'int_value': (1, 2), + 'bool_value': (True, True), + 'dt_value': (pd.Timestamp('2011-01-01'), + pd.Timestamp('2011-01-02')), 'asof_date': dates, 'timestamp': dates, }) @@ -289,37 +288,85 @@ class BlazeToPipelineTestCase(TestCase): dshape=""" var * { sid: int64, - value: float64, + float_value: float64, str_value: string, + int_value: int64, + bool_value: bool, + dt_value: datetime, asof_date: datetime, timestamp: datetime, }""", ) fields = OrderedDict(expr.dshape.measure.fields) - with tmp_asset_finder() as finder: - expected = pd.DataFrame( - np.array([["a", 0], - [None, np.NaN], - [None, np.NaN], - ["a", 0], - [None, np.NaN], - [None, np.NaN], - ["a", 0], - ["b", 1], - [None, np.NaN]]), - columns=['str_value', 'value'], - index=pd.MultiIndex.from_product( - (self.dates, finder.retrieve_all(self.sids)) - ) - ) - self._test_id( - df, - var * Record(fields), - expected, - finder, 
- ('value', 'str_value'), + expected = pd.DataFrame({ + "str_value": np.array(["a", + None, + None, + "a", + None, + None, + "a", + "b", + None], + dtype='object'), + "float_value": np.array([0, + np.NaN, + np.NaN, + 0, + np.NaN, + np.NaN, + 0, + 1, + np.NaN], + dtype='float64'), + "int_value": np.array([1, + 0, + 0, + 1, + 0, + 0, + 1, + 2, + 0], + dtype='int64'), + "bool_value": np.array([True, + False, + False, + True, + False, + False, + True, + True, + False], + dtype='bool'), + "dt_value": [pd.Timestamp('2011-01-01'), + pd.NaT, + pd.NaT, + pd.Timestamp('2011-01-01'), + pd.NaT, + pd.NaT, + pd.Timestamp('2011-01-01'), + pd.Timestamp('2011-01-02'), + pd.NaT], + }, + columns=['str_value', 'float_value', 'int_value', 'bool_value', + 'dt_value'], + index=pd.MultiIndex.from_product( + (self.dates, self.asset_finder.retrieve_all( + self.ASSET_FINDER_EQUITY_SIDS + )) ) + ) + + self._test_id( + df, + var * Record(fields), + expected, + self.asset_finder, + ('float_value', 'str_value', 'int_value', 'bool_value', + 'dt_value'), + ) def test_complex_expr(self): expr = bz.data(self.df, dshape=self.dshape) @@ -376,15 +423,14 @@ class BlazeToPipelineTestCase(TestCase): p.add(getattr(ds, a).latest, a) dates = self.dates - with tmp_asset_finder() as finder: - result = SimplePipelineEngine( - loader, - dates, - finder, - ).run_pipeline(p, dates[0], dates[-1]) + result = SimplePipelineEngine( + loader, + dates, + finder, + ).run_pipeline(p, dates[0], dates[-1]) assert_frame_equal( - result, - _utc_localize_index_level_0(expected), + result.sort_index(axis=1), + _utc_localize_index_level_0(expected.sort_index(axis=1)), check_dtype=False, ) @@ -408,12 +454,11 @@ class BlazeToPipelineTestCase(TestCase): p.add(ds.int_value.latest, 'int_value') dates = self.dates - with tmp_asset_finder() as finder: - result = SimplePipelineEngine( - loader, - dates, - finder, - ).run_pipeline(p, dates[0], dates[-1]) + result = SimplePipelineEngine( + loader, + dates, + self.asset_finder, + 
).run_pipeline(p, dates[0], dates[-1]) expected = df.drop('asof_date', axis=1) expected['timestamp'] = expected['timestamp'].dt.normalize().astype( @@ -423,7 +468,7 @@ class BlazeToPipelineTestCase(TestCase): expected.set_index(['timestamp', 'sid'], inplace=True) expected.index = pd.MultiIndex.from_product(( expected.index.levels[0], - finder.retrieve_all(expected.index.levels[1]), + self.asset_finder.retrieve_all(expected.index.levels[1]), )) assert_frame_equal(result, expected, check_dtype=False) @@ -453,17 +498,17 @@ class BlazeToPipelineTestCase(TestCase): Equity(66 [B]) 3 Equity(67 [C]) 4 """ - with tmp_asset_finder() as finder: - expected = self.df.drop('asof_date', axis=1).set_index( - ['timestamp', 'sid'], - ) - expected.index = pd.MultiIndex.from_product(( - expected.index.levels[0], - finder.retrieve_all(expected.index.levels[1]), - )) - self._test_id( - self.df, self.dshape, expected, finder, ('int_value', 'value',) - ) + expected = self.df.drop('asof_date', axis=1).set_index( + ['timestamp', 'sid'], + ) + expected.index = pd.MultiIndex.from_product(( + expected.index.levels[0], + self.asset_finder.retrieve_all(expected.index.levels[1]), + )) + self._test_id( + self.df, self.dshape, expected, self.asset_finder, + ('int_value', 'value',) + ) def test_id_ffill_out_of_window(self): """ @@ -494,7 +539,7 @@ class BlazeToPipelineTestCase(TestCase): """ dates = self.dates.repeat(3) - timedelta(days=10) df = pd.DataFrame({ - 'sid': self.sids * 3, + 'sid': self.ASSET_FINDER_EQUITY_SIDS * 3, 'value': (0, 1, np.nan, 1, np.nan, 3, np.nan, 3, 4), 'other': (0, np.nan, 2, np.nan, 2, 3, 2, 3, np.nan), 'asof_date': dates, @@ -503,29 +548,30 @@ class BlazeToPipelineTestCase(TestCase): fields = OrderedDict(self.dshape.measure.fields) fields['other'] = fields['value'] - with tmp_asset_finder() as finder: - expected = pd.DataFrame( - np.array([[2, 1], - [3, 3], - [3, 4], - [2, 1], - [3, 3], - [3, 4], - [2, 1], - [3, 3], - [3, 4]]), - columns=['other', 'value'], - 
index=pd.MultiIndex.from_product( - (self.dates, finder.retrieve_all(self.sids)), - ), - ) - self._test_id( - df, - var * Record(fields), - expected, - finder, - ('value', 'other'), - ) + expected = pd.DataFrame( + np.array([[2, 1], + [3, 3], + [3, 4], + [2, 1], + [3, 3], + [3, 4], + [2, 1], + [3, 3], + [3, 4]]), + columns=['other', 'value'], + index=pd.MultiIndex.from_product( + (self.dates, self.asset_finder.retrieve_all( + self.ASSET_FINDER_EQUITY_SIDS + )), + ), + ) + self._test_id( + df, + var * Record(fields), + expected, + self.asset_finder, + ('value', 'other'), + ) def test_id_multiple_columns(self): """ @@ -557,21 +603,20 @@ class BlazeToPipelineTestCase(TestCase): df['other'] = df.value + 1 fields = OrderedDict(self.dshape.measure.fields) fields['other'] = fields['value'] - with tmp_asset_finder() as finder: - expected = df.drop('asof_date', axis=1).set_index( - ['timestamp', 'sid'], - ).sort_index(axis=1) - expected.index = pd.MultiIndex.from_product(( - expected.index.levels[0], - finder.retrieve_all(expected.index.levels[1]), - )) - self._test_id( - df, - var * Record(fields), - expected, - finder, - ('value', 'int_value', 'other'), - ) + expected = df.drop('asof_date', axis=1).set_index( + ['timestamp', 'sid'], + ).sort_index(axis=1) + expected.index = pd.MultiIndex.from_product(( + expected.index.levels[0], + self.asset_finder.retrieve_all(expected.index.levels[1]), + )) + self._test_id( + df, + var * Record(fields), + expected, + self.asset_finder, + ('value', 'int_value', 'other'), + ) def test_id_macro_dataset(self): """ @@ -595,22 +640,21 @@ class BlazeToPipelineTestCase(TestCase): """ asset_info = asset_infos[0][0] nassets = len(asset_info) - with tmp_asset_finder() as finder: - expected = pd.DataFrame( - list(concatv([0] * nassets, [1] * nassets, [2] * nassets)), - index=pd.MultiIndex.from_product(( - self.macro_df.timestamp, - finder.retrieve_all(asset_info.index), - )), - columns=('value',), - ) - self._test_id( - self.macro_df, - 
self.macro_dshape, - expected, - finder, - ('value',), - ) + expected = pd.DataFrame( + list(concatv([0] * nassets, [1] * nassets, [2] * nassets)), + index=pd.MultiIndex.from_product(( + self.macro_df.timestamp, + self.asset_finder.retrieve_all(asset_info.index), + )), + columns=('value',), + ) + self._test_id( + self.macro_df, + self.macro_dshape, + expected, + self.asset_finder, + ('value',), + ) def test_id_ffill_out_of_window_macro_dataset(self): """ @@ -642,29 +686,30 @@ class BlazeToPipelineTestCase(TestCase): fields = OrderedDict(self.macro_dshape.measure.fields) fields['other'] = fields['value'] - with tmp_asset_finder() as finder: - expected = pd.DataFrame( - np.array([[0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1], - [0, 1]]), - columns=['value', 'other'], - index=pd.MultiIndex.from_product( - (self.dates, finder.retrieve_all(self.sids)), - ), - ).sort_index(axis=1) - self._test_id( - df, - var * Record(fields), - expected, - finder, - ('value', 'other'), - ) + expected = pd.DataFrame( + np.array([[0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1]]), + columns=['value', 'other'], + index=pd.MultiIndex.from_product( + (self.dates, self.asset_finder.retrieve_all( + self.ASSET_FINDER_EQUITY_SIDS + )), + ), + ).sort_index(axis=1) + self._test_id( + df, + var * Record(fields), + expected, + self.asset_finder, + ('value', 'other'), + ) def test_id_macro_dataset_multiple_columns(self): """ @@ -739,31 +784,32 @@ class BlazeToPipelineTestCase(TestCase): fields = OrderedDict(self.dshape.measure.fields) fields['other'] = fields['value'] - with tmp_asset_finder() as finder: - expected = pd.DataFrame( - columns=['other', 'value'], - data=[ - [1, 0], # 2014-01-01 Equity(65 [A]) - [np.nan, 1], # Equity(66 [B]) - [2, np.nan], # Equity(67 [C]) - [1, 1], # 2014-01-02 Equity(65 [A]) - [2, 1], # Equity(66 [B]) - [3, 3], # Equity(67 [C]) - [2, 1], # 2014-01-03 Equity(65 [A]) - [3, 3], # Equity(66 [B]) - [3, 3], 
# Equity(67 [C]) - ], - index=pd.MultiIndex.from_product( - (self.dates, finder.retrieve_all(self.sids)), - ), - ) - self._test_id( - df, - var * Record(fields), - expected, - finder, - ('value', 'other'), - ) + expected = pd.DataFrame( + columns=['other', 'value'], + data=[ + [1, 0], # 2014-01-01 Equity(65 [A]) + [np.nan, 1], # Equity(66 [B]) + [2, np.nan], # Equity(67 [C]) + [1, 1], # 2014-01-02 Equity(65 [A]) + [2, 1], # Equity(66 [B]) + [3, 3], # Equity(67 [C]) + [2, 1], # 2014-01-03 Equity(65 [A]) + [3, 3], # Equity(66 [B]) + [3, 3], # Equity(67 [C]) + ], + index=pd.MultiIndex.from_product( + (self.dates, self.asset_finder.retrieve_all( + self.ASSET_FINDER_EQUITY_SIDS + )), + ), + ) + self._test_id( + df, + var * Record(fields), + expected, + self.asset_finder, + ('value', 'other'), + ) def test_id_take_last_in_group_macro(self): """ @@ -795,33 +841,34 @@ class BlazeToPipelineTestCase(TestCase): fields = OrderedDict(self.macro_dshape.measure.fields) fields['other'] = fields['value'] - with tmp_asset_finder() as finder: - expected = pd.DataFrame( - columns=[ - 'other', 'value', - ], - data=[ - [np.nan, 1], # 2014-01-01 Equity(65 [A]) - [np.nan, 1], # Equity(66 [B]) - [np.nan, 1], # Equity(67 [C]) - [1, 2], # 2014-01-02 Equity(65 [A]) - [1, 2], # Equity(66 [B]) - [1, 2], # Equity(67 [C]) - [2, 2], # 2014-01-03 Equity(65 [A]) - [2, 2], # Equity(66 [B]) - [2, 2], # Equity(67 [C]) - ], - index=pd.MultiIndex.from_product( - (self.dates, finder.retrieve_all(self.sids)), - ), - ) - self._test_id( - df, - var * Record(fields), - expected, - finder, - ('value', 'other'), - ) + expected = pd.DataFrame( + columns=[ + 'other', 'value', + ], + data=[ + [np.nan, 1], # 2014-01-01 Equity(65 [A]) + [np.nan, 1], # Equity(66 [B]) + [np.nan, 1], # Equity(67 [C]) + [1, 2], # 2014-01-02 Equity(65 [A]) + [1, 2], # Equity(66 [B]) + [1, 2], # Equity(67 [C]) + [2, 2], # 2014-01-03 Equity(65 [A]) + [2, 2], # Equity(66 [B]) + [2, 2], # Equity(67 [C]) + ], + 
index=pd.MultiIndex.from_product( + (self.dates, self.asset_finder.retrieve_all( + self.ASSET_FINDER_EQUITY_SIDS + )), + ), + ) + self._test_id( + df, + var * Record(fields), + expected, + self.asset_finder, + ('value', 'other'), + ) def _run_pipeline(self, expr, @@ -1037,7 +1084,7 @@ class BlazeToPipelineTestCase(TestCase): ]) repeated_dates = base_dates.repeat(3) baseline = pd.DataFrame({ - 'sid': self.sids * 2, + 'sid': self.ASSET_FINDER_EQUITY_SIDS * 2, 'value': (0., 1., 2., 1., 2., 3.), 'int_value': (0, 1, 2, 1, 2, 3), 'asof_date': repeated_dates, diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 53504fe7..f676bcf7 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -140,7 +140,6 @@ from datashape import ( floating, isrecord, isscalar, - String ) import numpy as np from odo import odo @@ -177,7 +176,11 @@ from zipline.utils.input_validation import ( ensure_timezone, optionally, ) -from zipline.utils.numpy_utils import categorical_dtype, repeat_last_axis +from zipline.utils.numpy_utils import ( + categorical_dtype, + repeat_last_axis, + datetime64ns_dtype +) from zipline.utils.pandas_utils import sort_values from zipline.utils.preprocess import preprocess @@ -275,6 +278,31 @@ class NotPipelineCompatible(TypeError): _new_names = ('BlazeDataSet_%d' % n for n in count()) +def datashape_type_to_numpy(type_): + """ + Given a datashape type, return the associated numpy type. Maps + datashape's DateTime type to numpy's `datetime64[ns]` dtype, since the + numpy datetime returned by datashape isn't supported by pipeline. + + Parameters + ---------- + type_: datashape.coretypes.Type + The datashape type. + + Returns + ------- + type_ np.dtype + The numpy dtype. 
+ + """ + if isinstance(type_, Option): + type_ = type_.ty + if isinstance(type_, DateTime): + return np.dtype('datetime64[ns]') + else: + return type_.to_numpy_dtype() + + @memoize def new_dataset(expr, deltas, missing_values): """ @@ -310,17 +338,13 @@ def new_dataset(expr, deltas, missing_values): # Terms. if name in (SID_FIELD_NAME, TS_FIELD_NAME): continue - try: - if isinstance(type_, Option): - type_ = type_.ty - type_ = type_.to_numpy_dtype() - if not can_represent_dtype(type_): - raise NotPipelineCompatible() + type_ = datashape_type_to_numpy(type_) + if can_represent_dtype(type_): col = Column( type_, missing_values.get(name, NotSpecified), ) - except NotPipelineCompatible: + else: col = NonPipelineField(name, type_) columns[name] = col @@ -1007,19 +1031,39 @@ class BlazeLoader(dict): else: last_in_group = last_in_group.reindex(dates) # Unstack will fill all missing values with NaN; we need to fix - # this for strings. + # this for all types that are not float. if not df.empty: - str_cols = df.columns[df.dtypes == categorical_dtype] - - for col in str_cols: - last_in_group[col] = last_in_group[col].where(pd.notnull( - last_in_group[col]), None) + for column in columns: + if df[column.name].dtype == categorical_dtype: + last_in_group[column.name] = last_in_group[ + column.name + ].where(pd.notnull(last_in_group[column.name]), + column.missing_value) + # Need to convert from float col to datetime col + elif df[column.name].dtype == datetime64ns_dtype: + last_in_group[column.name] = last_in_group[ + column.name + ].astype('datetime64[ns]') + else: + last_in_group[column.name] = last_in_group[ + column.name + ].fillna(column.missing_value) return last_in_group sparse_deltas = last_in_date_group(non_novel_deltas, reindex=False) dense_output = last_in_date_group(sparse_output, reindex=True) - dense_output.ffill(inplace=True) + for column in columns: + if have_sids: + dense_output[column.name] = dense_output[ + column.name + ].apply(lambda x: x.replace( + 
to_replace=column.missing_value, method='ffill' + )) + else: + dense_output[column.name] = dense_output[column.name].replace( + to_replace=column.missing_value, method='ffill' + ) if have_sids: adjustments_from_deltas = adjustments_from_deltas_with_sids From 3a3c7db844f251d1578336a1fa2c84c908b50821 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Tue, 17 May 2016 14:44:15 -0400 Subject: [PATCH 5/6] MAINT: remove filling in missing value and ffill before coercing column --- zipline/pipeline/loaders/blaze/core.py | 40 ++++++++++---------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index f676bcf7..28af4faf 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -1030,40 +1030,30 @@ class BlazeLoader(dict): ) else: last_in_group = last_in_group.reindex(dates) - # Unstack will fill all missing values with NaN; we need to fix - # this for all types that are not float. - if not df.empty: - for column in columns: - if df[column.name].dtype == categorical_dtype: - last_in_group[column.name] = last_in_group[ - column.name - ].where(pd.notnull(last_in_group[column.name]), - column.missing_value) - # Need to convert from float col to datetime col - elif df[column.name].dtype == datetime64ns_dtype: - last_in_group[column.name] = last_in_group[ - column.name - ].astype('datetime64[ns]') - else: - last_in_group[column.name] = last_in_group[ - column.name - ].fillna(column.missing_value) return last_in_group sparse_deltas = last_in_date_group(non_novel_deltas, reindex=False) dense_output = last_in_date_group(sparse_output, reindex=True) + dense_output = dense_output.ffill() + + # Unstack will fill all missing values with NaN; we need to fix + # this for all types that are not float. 
for column in columns: - if have_sids: + if column.dtype == categorical_dtype: dense_output[column.name] = dense_output[ column.name - ].apply(lambda x: x.replace( - to_replace=column.missing_value, method='ffill' - )) + ].where(pd.notnull(dense_output[column.name]), + column.missing_value) + # Need to convert from float col to datetime col + elif column.dtype == datetime64ns_dtype: + dense_output[column.name] = dense_output[ + column.name + ].astype('datetime64[ns]') else: - dense_output[column.name] = dense_output[column.name].replace( - to_replace=column.missing_value, method='ffill' - ) + dense_output[column.name] = dense_output[ + column.name + ].fillna(column.missing_value) if have_sids: adjustments_from_deltas = adjustments_from_deltas_with_sids From e5039a43b09bbcdf6bfd20e6f54abd623312ffd1 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Wed, 18 May 2016 15:48:04 -0400 Subject: [PATCH 6/6] TST: add tests to ensure no forward filling of non-missing values STY: fix indentation DOC: add docs to clarify test input/output --- tests/pipeline/test_blaze.py | 209 ++++++++++++++++++++++--- zipline/pipeline/loaders/blaze/core.py | 32 ++-- 2 files changed, 212 insertions(+), 29 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index 512be3ee..742a6b70 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -270,19 +270,190 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): NonPipelineField, ) - def test_cols_with_missing_vals(self): - dates = (self.dates[0], self.dates[-1]) + def test_cols_with_all_missing_vals(self): + """ + Tests that when there is no known data, we get output where the + columns have the right dtypes and the right missing values filled in. 
+ + input (self.df): + Empty DataFrame + Columns: [sid, float_value, str_value, int_value, bool_value, dt_value, + asof_date, timestamp] + Index: [] + + output (expected) + str_value float_value int_value + 2014-01-01 Equity(65 [A]) None NaN 0 + Equity(66 [B]) None NaN 0 + Equity(67 [C]) None NaN 0 + 2014-01-02 Equity(65 [A]) None NaN 0 + Equity(66 [B]) None NaN 0 + Equity(67 [C]) None NaN 0 + 2014-01-03 Equity(65 [A]) None NaN 0 + Equity(66 [B]) None NaN 0 + Equity(67 [C]) None NaN 0 + + dt_value bool_value + 2014-01-01 Equity(65 [A]) NaT False + Equity(66 [B]) NaT False + Equity(67 [C]) NaT False + 2014-01-02 Equity(65 [A]) NaT False + Equity(66 [B]) NaT False + Equity(67 [C]) NaT False + 2014-01-03 Equity(65 [A]) NaT False + Equity(66 [B]) NaT False + Equity(67 [C]) NaT False + """ + df = pd.DataFrame(columns=['sid', 'float_value', 'str_value', + 'int_value', 'bool_value', 'dt_value', + 'asof_date', 'timestamp']) + + expr = bz.data( + df, + dshape=""" + var * { + sid: int64, + float_value: float64, + str_value: string, + int_value: int64, + bool_value: bool, + dt_value: datetime, + asof_date: datetime, + timestamp: datetime, + }""", + ) + fields = OrderedDict(expr.dshape.measure.fields) + + expected = pd.DataFrame({ + "str_value": np.array([None, + None, + None, + None, + None, + None, + None, + None, + None], + dtype='object'), + "float_value": np.array([np.NaN, + np.NaN, + np.NaN, + np.NaN, + np.NaN, + np.NaN, + np.NaN, + np.NaN, + np.NaN], + dtype='float64'), + "int_value": np.array([0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0], + dtype='int64'), + "bool_value": np.array([False, + False, + False, + False, + False, + False, + False, + False, + False], + dtype='bool'), + "dt_value": [pd.NaT, + pd.NaT, + pd.NaT, + pd.NaT, + pd.NaT, + pd.NaT, + pd.NaT, + pd.NaT, + pd.NaT], + }, + columns=['str_value', 'float_value', 'int_value', 'bool_value', + 'dt_value'], + index=pd.MultiIndex.from_product( + (self.dates, self.asset_finder.retrieve_all( + 
self.ASSET_FINDER_EQUITY_SIDS + )) + ) + ) + + self._test_id( + df, + var * Record(fields), + expected, + self.asset_finder, + ('float_value', 'str_value', 'int_value', 'bool_value', + 'dt_value'), + ) + + def test_cols_with_some_missing_vals(self): + """ + Tests the following: + 1) Forward filling replaces missing values correctly for the data + types supported in pipeline. + 2) We don't forward fill when the missing value is the actual value + we got for a date in the case of int/bool columns. + 3) We get the correct type of missing value in the output. + + input (self.df): + asof_date bool_value dt_value float_value int_value sid + 0 2014-01-01 True 2011-01-01 0 1 65 + 1 2014-01-03 True 2011-01-02 1 2 66 + 2 2014-01-01 True 2011-01-03 2 3 67 + 3 2014-01-02 False NaT NaN 0 67 + + str_value timestamp + 0 a 2014-01-01 + 1 b 2014-01-03 + 2 c 2014-01-01 + 3 None 2014-01-02 + + output (expected) + str_value float_value int_value bool_value + 2014-01-01 Equity(65 [A]) a 0 1 True + Equity(66 [B]) None NaN 0 False + Equity(67 [C]) c 2 3 True + 2014-01-02 Equity(65 [A]) a 0 1 True + Equity(66 [B]) None NaN 0 False + Equity(67 [C]) c 2 0 False + 2014-01-03 Equity(65 [A]) a 0 1 True + Equity(66 [B]) b 1 2 True + Equity(67 [C]) c 2 0 False + + dt_value + 2014-01-01 Equity(65 [A]) 2011-01-01 + Equity(66 [B]) NaT + Equity(67 [C]) 2011-01-03 + 2014-01-02 Equity(65 [A]) 2011-01-01 + Equity(66 [B]) NaT + Equity(67 [C]) 2011-01-03 + 2014-01-03 Equity(65 [A]) 2011-01-01 + Equity(66 [B]) 2011-01-02 + Equity(67 [C]) 2011-01-03 + """ + dates = (self.dates[0], self.dates[-1], self.dates[0], self.dates[1]) df = pd.DataFrame({ - 'sid': self.ASSET_FINDER_EQUITY_SIDS[:-1], - 'float_value': (0., 1.,), - 'str_value': ("a", "b",), - 'int_value': (1, 2), - 'bool_value': (True, True), + 'sid': self.ASSET_FINDER_EQUITY_SIDS[:-1] + + (self.ASSET_FINDER_EQUITY_SIDS[-1],)*2, + 'float_value': (0., 1., 2., np.NaN), + 'str_value': ("a", "b", "c", None), + 'int_value': (1, 2, 3, 0), + 'bool_value': 
(True, True, True, False), 'dt_value': (pd.Timestamp('2011-01-01'), - pd.Timestamp('2011-01-02')), + pd.Timestamp('2011-01-02'), + pd.Timestamp('2011-01-03'), + pd.NaT), 'asof_date': dates, 'timestamp': dates, }) + expr = bz.data( df, dshape=""" @@ -302,27 +473,27 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): expected = pd.DataFrame({ "str_value": np.array(["a", None, - None, + "c", "a", None, - None, + "c", "a", "b", - None], + "c"], dtype='object'), "float_value": np.array([0, np.NaN, - np.NaN, + 2, 0, np.NaN, - np.NaN, + 2, 0, 1, - np.NaN], + 2], dtype='float64'), "int_value": np.array([1, 0, - 0, + 3, 1, 0, 0, @@ -332,7 +503,7 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): dtype='int64'), "bool_value": np.array([True, False, - False, + True, True, False, False, @@ -342,13 +513,13 @@ class BlazeToPipelineTestCase(WithAssetFinder, ZiplineTestCase): dtype='bool'), "dt_value": [pd.Timestamp('2011-01-01'), pd.NaT, - pd.NaT, + pd.Timestamp('2011-01-03'), pd.Timestamp('2011-01-01'), pd.NaT, - pd.NaT, + pd.Timestamp('2011-01-03'), pd.Timestamp('2011-01-01'), pd.Timestamp('2011-01-02'), - pd.NaT], + pd.Timestamp('2011-01-03')], }, columns=['str_value', 'float_value', 'int_value', 'bool_value', 'dt_value'], diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 28af4faf..d17612a4 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -179,7 +179,6 @@ from zipline.utils.input_validation import ( from zipline.utils.numpy_utils import ( categorical_dtype, repeat_last_axis, - datetime64ns_dtype ) from zipline.utils.pandas_utils import sort_values from zipline.utils.preprocess import preprocess @@ -1035,25 +1034,38 @@ class BlazeLoader(dict): sparse_deltas = last_in_date_group(non_novel_deltas, reindex=False) dense_output = last_in_date_group(sparse_output, reindex=True) - dense_output = dense_output.ffill() + dense_output.ffill(inplace=True) - # 
Unstack will fill all missing values with NaN; we need to fix - # this for all types that are not float. + # Fill in missing values specified by each column. This is made + # significantly more complex by the fact that we need to work around + # two pandas issues: + + # 1) When we have sids, if there are no records for a given sid for any + # dates, pandas will generate a column full of NaNs for that sid. + # This means that some of the columns in `dense_output` are now + # float instead of the intended dtype, so we have to coerce back to + # our expected type and convert NaNs into the desired missing value. + + # 2) DataFrame.fillna assumes that receiving None as a fill-value means + # that no value was passed. Consequently, there's no way to tell + # pandas to replace NaNs in an object column with None using fillna, + # so we have to roll our own instead using df.where. for column in columns: + # Special logic for strings since `fillna` doesn't work if the + # missing value is `None`. if column.dtype == categorical_dtype: dense_output[column.name] = dense_output[ column.name ].where(pd.notnull(dense_output[column.name]), column.missing_value) - # Need to convert from float col to datetime col - elif column.dtype == datetime64ns_dtype: - dense_output[column.name] = dense_output[ - column.name - ].astype('datetime64[ns]') else: + # We need to execute `fillna` before `astype` in case the + # column contains NaNs and needs to be cast to bool or int. + # This is so that the NaNs are replaced first, since pandas + # can't convert NaNs for those types. dense_output[column.name] = dense_output[ column.name - ].fillna(column.missing_value) + ].fillna(column.missing_value).astype(column.dtype) if have_sids: adjustments_from_deltas = adjustments_from_deltas_with_sids