From 97298d1ad45a4d323c1732c0c2fc2cc37f7a2126 Mon Sep 17 00:00:00 2001 From: llllllllll Date: Mon, 2 Nov 2015 16:32:30 -0500 Subject: [PATCH 1/6] ENH: upgrade ffill logic to look back as far as needed --- tests/pipeline/test_blaze.py | 4 +- zipline/pipeline/loaders/blaze/core.py | 118 +++++++++++++++++-------- 2 files changed, 84 insertions(+), 38 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index 11d741b2..a8b57e8c 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -328,8 +328,8 @@ class BlazeToPipelineTestCase(TestCase): df['timestamp'] = ( pd.DatetimeIndex(df['timestamp'], tz='EST') + timedelta(hours=8, minutes=44) - ).tz_convert('utc') - df.ix[3:5, 'timestamp'] = pd.Timestamp('2014-01-01 13:45', tz='utc') + ).tz_convert('utc').tz_localize(None) + df.ix[3:5, 'timestamp'] = pd.Timestamp('2014-01-01 13:45') expr = bz.Data(df, name='expr', dshape=self.dshape) loader = BlazeLoader(data_query_time=time(8, 45), data_query_tz='EST') ds = from_blaze( diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 75e365da..fb79db8d 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -127,7 +127,7 @@ from __future__ import division, absolute_import from abc import ABCMeta, abstractproperty from collections import namedtuple, defaultdict from copy import copy -from functools import partial +from functools import partial, reduce from itertools import count import warnings from weakref import WeakKeyDictionary @@ -188,7 +188,7 @@ traversable_nodes = ( bz.expr.Label, ) is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types)) -getname = op.attrgetter('__name__') +get__name__ = op.attrgetter('__name__') class _ExprRepr(object): @@ -523,8 +523,10 @@ def from_blaze(expr, raise TypeError( 'expression with deltas may only contain (%s) nodes,' " found: %s" % ( - ', '.join(map(getname, valid_deltas_node_types)), - ', '.join(set(map(compose(getname, type), invalid_nodes))), + ', '.join(map(get__name__, valid_deltas_node_types)), + ', '.join( + set(map(compose(get__name__, type), invalid_nodes)), + ), ), ) @@ -602,7 +604,7 @@ def from_blaze(expr, getdataset = op.attrgetter('dataset') -dataset_name = op.attrgetter('name') +getname = op.attrgetter('name') def overwrite_novel_deltas(baseline, deltas, dates): @@ -778,8 +780,10 @@ class BlazeLoader(dict): Parameters ---------- - colmap : mapping[BoundColumn -> tuple[Expr, Expr, any]], optional - The initial column mapping to use. + dsmap : mapping, optional + An initial mapping of datasets to ``ExprData`` objects. + NOTE: Further mutations to this map will not be reflected by this + object. data_query_time : time, optional The time to use for the data query cutoff. data_query_tz : tzinfo or str @@ -787,11 +791,10 @@ class BlazeLoader(dict): """ @preprocess(data_query_tz=optionally(ensure_timezone)) def __init__(self, - colmap=None, + dsmap=None, data_query_time=None, data_query_tz=None): - self.update(colmap or {}) - + self.update(dsmap or {}) check_data_query_args(data_query_time, data_query_tz) self._data_query_time = data_query_time self._data_query_tz = data_query_tz @@ -826,8 +829,7 @@ class BlazeLoader(dict): expr, deltas, resources = self[dataset] have_sids = SID_FIELD_NAME in expr.fields assets = list(map(int, assets)) # coerce from numpy.int64 - fields = list(map(dataset_name, columns)) - query_fields = fields + [AD_FIELD_NAME, TS_FIELD_NAME] + ( + added_query_fields = [AD_FIELD_NAME, TS_FIELD_NAME] + ( [SID_FIELD_NAME] if have_sids else [] ) @@ -840,9 +842,47 @@ class BlazeLoader(dict): data_query_tz, ) - def where(e): + def where(e, column): """Create the query to run against the resources. + Parameters + ---------- + e : Expr + The baseline or deltas expression. + column : BoundColumn + The column to query for. + + Returns + ------- + q : Expr + The query to run for the given column. + """ + colname = column.name + filtered = e[e[colname].notnull() & (e[TS_FIELD_NAME] <= lower_dt)] + lower = filtered.timestamp.max() + + if have_sids: + # If we have sids, then we need to take the earliest of the + # greatest date that has a non-null value by sid. + lower = bz.by( + filtered[SID_FIELD_NAME], + timestamp=lower, + ).timestamp.min() + + lower = odo(lower, pd.Timestamp) + if lower is pd.NaT: + # If there is no lower date, just query for data in he date + # range. It must all be null anyways. + lower = lower_dt + + return e[ + (e[TS_FIELD_NAME] >= lower) & + (e[TS_FIELD_NAME] <= upper_dt) + ][added_query_fields + [colname]] + + def collect_expr(e, _kwargs={'d': resources} if resources else {}): + """Execute and merge all of the per-column subqueries. + Parameters ---------- e : Expr @@ -850,28 +890,29 @@ class BlazeLoader(dict): Returns ------- - q : Expr - The query to run. + result : pd.DataFrame + The resulting dataframe. + + Notes + ----- + This can return more data than needed. The in memory reindex will + handle this. """ - ts = e[TS_FIELD_NAME] - # Hack to get the lower bound to query: - # This must be strictly executed because the data for `ts` will - # be removed from scope too early otherwise. - lower = odo(ts[ts <= lower_dt].max(), pd.Timestamp) - selection = ts <= upper_dt - if have_sids: - selection &= e[SID_FIELD_NAME].isin(assets) - if lower is not pd.NaT: - selection &= ts >= lower + return reduce( + partial(pd.merge, on=added_query_fields, how='outer'), + ( + odo(where(e, column), pd.DataFrame, **_kwargs) + for column in columns + ), + ) - return e[selection][query_fields] - - extra_kwargs = {'d': resources} if resources else {} - materialized_expr = odo(where(expr), pd.DataFrame, **extra_kwargs) + materialized_expr = collect_expr(expr) materialized_deltas = ( - odo(where(deltas), pd.DataFrame, **extra_kwargs) + collect_expr(deltas) if deltas is not None else - pd.DataFrame(columns=query_fields) + pd.DataFrame( + columns=added_query_fields + list(map(getname, columns)), + ) ) if data_query_time is not None: @@ -907,10 +948,11 @@ class BlazeLoader(dict): sparse_deltas = non_novel_deltas.set_index( [TS_FIELD_NAME, SID_FIELD_NAME], ).unstack() - - dense_output = sparse_output.reindex(dates, method='ffill') - cols = dense_output.columns - dense_output = dense_output.reindex( + cols = sparse_output.columns + dense_output = sparse_output.groupby( + dates[dates.searchsorted(sparse_output.index)], + ).last().reindex( + index=dates, columns=pd.MultiIndex.from_product( (cols.levels[0], assets), names=cols.names, @@ -933,10 +975,14 @@ class BlazeLoader(dict): partial(repeat_last_axis, count=len(assets)), ) sparse_output = sparse_output.set_index(TS_FIELD_NAME) - dense_output = sparse_output.reindex(dates, method='ffill') + dense_output = sparse_output.groupby( + dates[dates.searchsorted(sparse_output.index)], + ).last().reindex(dates) sparse_deltas = non_novel_deltas.set_index(TS_FIELD_NAME) adjustments_from_deltas = adjustments_from_deltas_no_sids + dense_output.ffill(inplace=True) + for column_idx, column in enumerate(columns): column_name = column.name yield column, AdjustedArray( From 517ad7a5f1b88ec408eef63e10c6d2fe9c6110d2 Mon Sep 17 00:00:00 2001 From: llllllllll Date: Mon, 2 Nov 2015 16:32:50 -0500 Subject: [PATCH 2/6] TST: Adds tests for running multiple columns in one query --- tests/pipeline/test_blaze.py | 78 ++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index a8b57e8c..fbad3fda 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -360,6 +360,43 @@ class BlazeToPipelineTestCase(TestCase): )) assert_frame_equal(result, expected, check_dtype=False) + def test_id_multiple_columns(self): + df = self.df.copy() + df['other'] = df.value + 1 + fields = OrderedDict(self.dshape.measure.fields) + fields['other'] = fields['value'] + expr = bz.Data(df, name='expr', dshape=var * Record(fields)) + loader = BlazeLoader() + ds = from_blaze( + expr, + loader=loader, + no_deltas_rule='ignore', + ) + p = Pipeline() + p.add(ds.value.latest, 'value') + p.add(ds.other.latest, 'other') + dates = self.dates + + with tmp_asset_finder() as finder: + result = SimplePipelineEngine( + loader, + dates, + finder, + ).run_pipeline(p, dates[0], dates[-1]) + + expected = df.drop('asof_date', axis=1).set_index( + ['timestamp', 'sid'], + ).sort_index(axis=1) + expected.index = pd.MultiIndex.from_product(( + expected.index.levels[0], + finder.retrieve_all(expected.index.levels[1]), + )) + assert_frame_equal( + result, + expected.sort_index(axis=1), + check_dtype=False, + ) + def test_id_macro_dataset(self): expr = bz.Data(self.macro_df, name='expr', dshape=self.macro_dshape) loader = BlazeLoader() @@ -391,6 +428,47 @@ class BlazeToPipelineTestCase(TestCase): ) assert_frame_equal(result, expected, check_dtype=False) + def test_id_macro_dataset_multiple_columns(self): + df = self.macro_df.copy() + df['other'] = df.value + 1 + fields = OrderedDict(self.macro_dshape.measure.fields) + fields['other'] = fields['value'] + expr = bz.Data(df, name='expr', dshape=var * Record(fields)) + loader = BlazeLoader() + ds = from_blaze( + expr, + loader=loader, + no_deltas_rule='ignore', + ) + p = Pipeline() + p.add(ds.value.latest, 'value') + p.add(ds.other.latest, 'latest') + dates = self.dates + + asset_info = asset_infos[0][0] + with tmp_asset_finder(asset_info) as finder: + result = SimplePipelineEngine( + loader, + dates, + finder, + ).run_pipeline(p, dates[0], dates[-1]) + + expected = pd.DataFrame( + np.array([[0, 1], + [1, 2], + [2, 3]]).repeat(3, axis=0), + index=pd.MultiIndex.from_product(( + df.timestamp, + finder.retrieve_all(asset_info.index), + )), + columns=('value', 'latest'), + ).sort_index(axis=1) + assert_frame_equal( + result, + expected.sort_index(axis=1), + check_dtype=False, + ) + def _run_pipeline(self, expr, deltas, From 5e4b8b8f8f9f61ba79de822eccf6fabb84393fe9 Mon Sep 17 00:00:00 2001 From: llllllllll Date: Mon, 2 Nov 2015 20:55:09 -0500 Subject: [PATCH 3/6] TST: tests for new forward fill --- tests/pipeline/test_blaze.py | 320 +++++++++++++++++++++++++++-------- 1 file changed, 251 insertions(+), 69 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index fbad3fda..18404cdc 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -295,8 +295,8 @@ class BlazeToPipelineTestCase(TestCase): loader=self.garbage_loader, ) - def test_id(self): - expr = bz.Data(self.df, name='expr', dshape=self.dshape) + def _test_id(self, df, dshape, expected, finder, add): + expr = bz.Data(df, name='expr', dshape=dshape) loader = BlazeLoader() ds = from_blaze( expr, @@ -304,7 +304,8 @@ class BlazeToPipelineTestCase(TestCase): no_deltas_rule=no_deltas_rules.ignore, ) p = Pipeline() - p.add(ds.value.latest, 'value') + for a in add: + p.add(getattr(ds, a).latest, a) dates = self.dates with tmp_asset_finder() as finder: @@ -314,13 +315,6 @@ class BlazeToPipelineTestCase(TestCase): finder, ).run_pipeline(p, dates[0], dates[-1]) - expected = self.df.drop('asof_date', axis=1).set_index( - ['timestamp', 'sid'], - ) - expected.index = pd.MultiIndex.from_product(( - expected.index.levels[0], - finder.retrieve_all(expected.index.levels[1]), - )) assert_frame_equal(result, expected, check_dtype=False) def test_custom_query_time_tz(self): @@ -360,75 +354,263 @@ class BlazeToPipelineTestCase(TestCase): )) assert_frame_equal(result, expected, check_dtype=False) + def test_id(self): + """ + input (self.df): + asof_date sid timestamp value + 0 2014-01-01 65 2014-01-01 0 + 1 2014-01-01 66 2014-01-01 1 + 2 2014-01-01 67 2014-01-01 2 + 3 2014-01-02 65 2014-01-02 1 + 4 2014-01-02 66 2014-01-02 2 + 5 2014-01-02 67 2014-01-02 3 + 6 2014-01-03 65 2014-01-03 2 + 7 2014-01-03 66 2014-01-03 3 + 8 2014-01-03 67 2014-01-03 4 + + output (expected) + value + 2014-01-01 Equity(65 [A]) 0 + Equity(66 [B]) 1 + Equity(67 [C]) 2 + 2014-01-02 Equity(65 [A]) 1 + Equity(66 [B]) 2 + Equity(67 [C]) 3 + 2014-01-03 Equity(65 [A]) 2 + Equity(66 [B]) 3 + Equity(67 [C]) 4 + """ + with tmp_asset_finder() as finder: + expected = self.df.drop('asof_date', axis=1).set_index( + ['timestamp', 'sid'], + ) + expected.index = pd.MultiIndex.from_product(( + expected.index.levels[0], + finder.retrieve_all(expected.index.levels[1]), + )) + self._test_id(self.df, self.dshape, expected, finder, ('value',)) + + def test_id_ffill_out_of_window(self): + """ + input (df): + + asof_date timestamp sid other value + 0 2013-12-22 2013-12-22 65 0 0 + 1 2013-12-22 2013-12-22 66 NaN 1 + 2 2013-12-22 2013-12-22 67 2 NaN + 3 2013-12-23 2013-12-23 65 NaN 1 + 4 2013-12-23 2013-12-23 66 2 NaN + 5 2013-12-23 2013-12-23 67 3 3 + 6 2013-12-24 2013-12-24 65 2 NaN + 7 2013-12-24 2013-12-24 66 3 3 + 8 2013-12-24 2013-12-24 67 NaN 4 + + output (expected): + other value + 2014-01-01 Equity(65 [A]) 2 1 + Equity(66 [B]) 3 3 + Equity(67 [C]) 3 4 + 2014-01-02 Equity(65 [A]) 2 1 + Equity(66 [B]) 3 3 + Equity(67 [C]) 3 4 + 2014-01-03 Equity(65 [A]) 2 1 + Equity(66 [B]) 3 3 + Equity(67 [C]) 3 4 + """ + dates = self.dates.repeat(3) - timedelta(days=10) + df = pd.DataFrame({ + 'sid': self.sids * 3, + 'value': (0, 1, np.nan, 1, np.nan, 3, np.nan, 3, 4), + 'other': (0, np.nan, 2, np.nan, 2, 3, 2, 3, np.nan), + 'asof_date': dates, + 'timestamp': dates, + }) + fields = OrderedDict(self.dshape.measure.fields) + fields['other'] = fields['value'] + + with tmp_asset_finder() as finder: + expected = pd.DataFrame( + np.array([[2, 1], + [3, 3], + [3, 4], + [2, 1], + [3, 3], + [3, 4], + [2, 1], + [3, 3], + [3, 4]]), + columns=['other', 'value'], + index=pd.MultiIndex.from_product( + (self.dates, finder.retrieve_all(self.sids)), + ), + ) + self._test_id( + df, + var * Record(fields), + expected, + finder, + ('value', 'other'), + ) + def test_id_multiple_columns(self): + """ + input (df): + asof_date sid timestamp value other + 0 2014-01-01 65 2014-01-01 0 1 + 1 2014-01-01 66 2014-01-01 1 2 + 2 2014-01-01 67 2014-01-01 2 3 + 3 2014-01-02 65 2014-01-02 1 2 + 4 2014-01-02 66 2014-01-02 2 3 + 5 2014-01-02 67 2014-01-02 3 4 + 6 2014-01-03 65 2014-01-03 2 3 + 7 2014-01-03 66 2014-01-03 3 4 + 8 2014-01-03 67 2014-01-03 4 5 + + output (expected): + value other + 2014-01-01 Equity(65 [A]) 0 1 + Equity(66 [B]) 1 2 + Equity(67 [C]) 2 3 + 2014-01-02 Equity(65 [A]) 1 2 + Equity(66 [B]) 2 3 + Equity(67 [C]) 3 4 + 2014-01-03 Equity(65 [A]) 2 3 + Equity(66 [B]) 3 4 + Equity(67 [C]) 4 5 + """ df = self.df.copy() df['other'] = df.value + 1 fields = OrderedDict(self.dshape.measure.fields) fields['other'] = fields['value'] - expr = bz.Data(df, name='expr', dshape=var * Record(fields)) - loader = BlazeLoader() - ds = from_blaze( - expr, - loader=loader, - no_deltas_rule='ignore', - ) - p = Pipeline() - p.add(ds.value.latest, 'value') - p.add(ds.other.latest, 'other') - dates = self.dates - with tmp_asset_finder() as finder: - result = SimplePipelineEngine( - loader, - dates, + expected = df.drop('asof_date', axis=1).set_index( + ['timestamp', 'sid'], + ).sort_index(axis=1) + expected.index = pd.MultiIndex.from_product(( + expected.index.levels[0], + finder.retrieve_all(expected.index.levels[1]), + )) + self._test_id( + df, + var * Record(fields), + expected, finder, - ).run_pipeline(p, dates[0], dates[-1]) - - expected = df.drop('asof_date', axis=1).set_index( - ['timestamp', 'sid'], - ).sort_index(axis=1) - expected.index = pd.MultiIndex.from_product(( - expected.index.levels[0], - finder.retrieve_all(expected.index.levels[1]), - )) - assert_frame_equal( - result, - expected.sort_index(axis=1), - check_dtype=False, - ) + ('value', 'other'), + ) def test_id_macro_dataset(self): - expr = bz.Data(self.macro_df, name='expr', dshape=self.macro_dshape) - loader = BlazeLoader() - ds = from_blaze( - expr, - loader=loader, - no_deltas_rule=no_deltas_rules.ignore, - ) - p = Pipeline() - p.add(ds.value.latest, 'value') - dates = self.dates + """ + input (self.macro_df) + asof_date timestamp value + 0 2014-01-01 2014-01-01 0 + 3 2014-01-02 2014-01-02 1 + 6 2014-01-03 2014-01-03 2 + output (expected): + value + 2014-01-01 Equity(65 [A]) 0 + Equity(66 [B]) 0 + Equity(67 [C]) 0 + 2014-01-02 Equity(65 [A]) 1 + Equity(66 [B]) 1 + Equity(67 [C]) 1 + 2014-01-03 Equity(65 [A]) 2 + Equity(66 [B]) 2 + Equity(67 [C]) 2 + """ asset_info = asset_infos[0][0] - with tmp_asset_finder(equities=asset_info) as finder: - result = SimplePipelineEngine( - loader, - dates, - finder, - ).run_pipeline(p, dates[0], dates[-1]) - nassets = len(asset_info) - expected = pd.DataFrame( - list(concatv([0] * nassets, [1] * nassets, [2] * nassets)), - index=pd.MultiIndex.from_product(( - self.macro_df.timestamp, - finder.retrieve_all(asset_info.index), - )), - columns=('value',), - ) - assert_frame_equal(result, expected, check_dtype=False) + with tmp_asset_finder() as finder: + expected = pd.DataFrame( + list(concatv([0] * nassets, [1] * nassets, [2] * nassets)), + index=pd.MultiIndex.from_product(( + self.macro_df.timestamp, + finder.retrieve_all(asset_info.index), + )), + columns=('value',), + ) + self._test_id( + self.macro_df, + self.macro_dshape, + expected, + finder, + ('value',), + ) + + def test_id_ffill_out_of_window_macro_dataset(self): + """ + input (df): + asof_date timestamp other value + 0 2013-12-22 2013-12-22 NaN 0 + 1 2013-12-23 2013-12-23 1 NaN + 2 2013-12-24 2013-12-24 NaN NaN + + output (expected): + other value + 2014-01-01 Equity(65 [A]) 1 0 + Equity(66 [B]) 1 0 + Equity(67 [C]) 1 0 + 2014-01-02 Equity(65 [A]) 1 0 + Equity(66 [B]) 1 0 + Equity(67 [C]) 1 0 + 2014-01-03 Equity(65 [A]) 1 0 + Equity(66 [B]) 1 0 + Equity(67 [C]) 1 0 + """ + dates = self.dates - timedelta(days=10) + df = pd.DataFrame({ + 'value': (0, np.nan, np.nan), + 'other': (np.nan, 1, np.nan), + 'asof_date': dates, + 'timestamp': dates, + }) + fields = OrderedDict(self.macro_dshape.measure.fields) + fields['other'] = fields['value'] + + with tmp_asset_finder() as finder: + expected = pd.DataFrame( + np.array([[0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1], + [0, 1]]), + columns=['value', 'other'], + index=pd.MultiIndex.from_product( + (self.dates, finder.retrieve_all(self.sids)), + ), + ).sort_index(axis=1) + self._test_id( + df, + var * Record(fields), + expected, + finder, + ('value', 'other'), + ) def test_id_macro_dataset_multiple_columns(self): + """ + input (df): + asof_date timestamp other value + 0 2014-01-01 2014-01-01 1 0 + 3 2014-01-02 2014-01-02 2 1 + 6 2014-01-03 2014-01-03 3 2 + + output (expected): + other value + 2014-01-01 Equity(65 [A]) 1 0 + Equity(66 [B]) 1 0 + Equity(67 [C]) 1 0 + 2014-01-02 Equity(65 [A]) 2 1 + Equity(66 [B]) 2 1 + Equity(67 [C]) 2 1 + 2014-01-03 Equity(65 [A]) 3 2 + Equity(66 [B]) 3 2 + Equity(67 [C]) 3 2 + """ df = self.macro_df.copy() df['other'] = df.value + 1 fields = OrderedDict(self.macro_dshape.measure.fields) @@ -438,15 +620,15 @@ class BlazeToPipelineTestCase(TestCase): ds = from_blaze( expr, loader=loader, - no_deltas_rule='ignore', + no_deltas_rule=no_deltas_rules.ignore, ) p = Pipeline() p.add(ds.value.latest, 'value') - p.add(ds.other.latest, 'latest') + p.add(ds.other.latest, 'other') dates = self.dates asset_info = asset_infos[0][0] - with tmp_asset_finder(asset_info) as finder: + with tmp_asset_finder(equities=asset_info) as finder: result = SimplePipelineEngine( loader, dates, @@ -461,7 +643,7 @@ class BlazeToPipelineTestCase(TestCase): df.timestamp, finder.retrieve_all(asset_info.index), )), - columns=('value', 'latest'), + columns=('value', 'other'), ).sort_index(axis=1) assert_frame_equal( result, From 33db0e330ec7806464064f8f8b359d04874a0600 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Thu, 14 Jan 2016 20:06:59 -0500 Subject: [PATCH 4/6] BLD: update blaze --- etc/requirements_blaze.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/requirements_blaze.txt b/etc/requirements_blaze.txt index 36bd76cb..113ed569 100644 --- a/etc/requirements_blaze.txt +++ b/etc/requirements_blaze.txt @@ -1,3 +1,3 @@ --e git://github.com/quantopian/blaze.git@43d2f7e00a228106cea038a53322497831539559#egg=blaze-dev +-e git://github.com/quantopian/blaze.git@c785f07f83b591a8d53db3a5013423f6b0f7ffd3#egg=blaze-dev -e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev -e git://github.com/quantopian/datashape.git@9bd8fb970a0fc55e866a0b46b5101c9aa47e24ed#egg=datashape-dev From 9adef3737317a5de471f8759880ab39c228fe3e6 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Tue, 19 Jan 2016 22:09:03 -0500 Subject: [PATCH 5/6] BUG: Fix the case where multiple values happen on the same day --- tests/pipeline/test_blaze.py | 112 +++++++++++++++++++++++++ zipline/pipeline/loaders/blaze/core.py | 90 +++++++++++--------- 2 files changed, 161 insertions(+), 41 deletions(-) diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index 18404cdc..56a06e35 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -651,6 +651,118 @@ class BlazeToPipelineTestCase(TestCase): check_dtype=False, ) + def test_id_take_last_in_group(self): + T = pd.Timestamp + df = pd.DataFrame( + columns=['asof_date', 'timestamp', 'sid', 'other', 'value'], + data=[ + [T('2014-01-01'), T('2014-01-01 00'), 65, 0, 0], + [T('2014-01-01'), T('2014-01-01 01'), 65, 1, np.nan], + [T('2014-01-01'), T('2014-01-01 00'), 66, np.nan, np.nan], + [T('2014-01-01'), T('2014-01-01 01'), 66, np.nan, 1], + [T('2014-01-01'), T('2014-01-01 00'), 67, 2, np.nan], + [T('2014-01-01'), T('2014-01-01 01'), 67, np.nan, np.nan], + [T('2014-01-02'), T('2014-01-02 00'), 65, np.nan, np.nan], + [T('2014-01-02'), T('2014-01-02 01'), 65, np.nan, 1], + [T('2014-01-02'), T('2014-01-02 00'), 66, np.nan, np.nan], + [T('2014-01-02'), T('2014-01-02 01'), 66, 2, np.nan], + [T('2014-01-02'), T('2014-01-02 00'), 67, 3, 3], + [T('2014-01-02'), T('2014-01-02 01'), 67, 3, 3], + [T('2014-01-03'), T('2014-01-03 00'), 65, 2, np.nan], + [T('2014-01-03'), T('2014-01-03 01'), 65, 2, np.nan], + [T('2014-01-03'), T('2014-01-03 00'), 66, 3, 3], + [T('2014-01-03'), T('2014-01-03 01'), 66, np.nan, np.nan], + [T('2014-01-03'), T('2014-01-03 00'), 67, np.nan, np.nan], + [T('2014-01-03'), T('2014-01-03 01'), 67, np.nan, 4], + ], + ) + fields = OrderedDict(self.dshape.measure.fields) + fields['other'] = fields['value'] + + with tmp_asset_finder() as finder: + expected = pd.DataFrame( + columns=['other', 'value'], + data=[ + [1, 0], # 2014-01-01 Equity(65 [A]) + [np.nan, 1], # Equity(66 [B]) + [2, np.nan], # Equity(67 [C]) + [1, 1], # 2014-01-02 Equity(65 [A]) + [2, 1], # Equity(66 [B]) + [3, 3], # Equity(67 [C]) + [2, 1], # 2014-01-03 Equity(65 [A]) + [3, 3], # Equity(66 [B]) + [3, 3], # Equity(67 [C]) + ], + index=pd.MultiIndex.from_product( + (self.dates, finder.retrieve_all(self.sids)), + ), + ) + self._test_id( + df, + var * Record(fields), + expected, + finder, + ('value', 'other'), + ) + + def test_id_take_last_in_group_macro(self): + """ + output (expected): + + other value + 2014-01-01 Equity(65 [A]) NaN 1 + Equity(66 [B]) NaN 1 + Equity(67 [C]) NaN 1 + 2014-01-02 Equity(65 [A]) 1 2 + Equity(66 [B]) 1 2 + Equity(67 [C]) 1 2 + 2014-01-03 Equity(65 [A]) 2 2 + Equity(66 [B]) 2 2 + Equity(67 [C]) 2 2 + """ + T = pd.Timestamp + df = pd.DataFrame( + columns=['asof_date', 'timestamp', 'other', 'value'], + data=[ + [T('2014-01-01'), T('2014-01-01 00'), np.nan, 1], + [T('2014-01-01'), T('2014-01-01 01'), np.nan, np.nan], + [T('2014-01-02'), T('2014-01-02 00'), 1, np.nan], + [T('2014-01-02'), T('2014-01-02 01'), np.nan, 2], + [T('2014-01-03'), T('2014-01-03 00'), 2, np.nan], + [T('2014-01-03'), T('2014-01-03 01'), 3, 3], + ], + ) + fields = OrderedDict(self.macro_dshape.measure.fields) + fields['other'] = fields['value'] + + with tmp_asset_finder() as finder: + expected = pd.DataFrame( + columns=[ + 'other', 'value', + ], + data=[ + [np.nan, 1], # 2014-01-01 Equity(65 [A]) + [np.nan, 1], # Equity(66 [B]) + [np.nan, 1], # Equity(67 [C]) + [1, 2], # 2014-01-02 Equity(65 [A]) + [1, 2], # Equity(66 [B]) + [1, 2], # Equity(67 [C]) + [2, 2], # 2014-01-03 Equity(65 [A]) + [2, 2], # Equity(66 [B]) + [2, 2], # Equity(67 [C]) + ], + index=pd.MultiIndex.from_product( + (self.dates, finder.retrieve_all(self.sids)), + ), + ) + self._test_id( + df, + var * Record(fields), + expected, + finder, + ('value', 'other'), + ) + def _run_pipeline(self, expr, deltas, diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index fb79db8d..18b9faee 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -675,7 +675,7 @@ def overwrite_from_dates(asof, dense_dates, sparse_dates, asset_idx, value): Then the overwrite will apply to indexes: 1, 2, 3, 4 """ first_row = dense_dates.searchsorted(asof) - next_idx = sparse_dates.searchsorted(asof, 'right') + next_idx = sparse_dates.searchsorted(asof.asm8, 'right') if next_idx == len(sparse_dates): # There is no next date in the sparse, this overwrite should apply # through the end of the dense dates. @@ -692,8 +692,8 @@ def overwrite_from_dates(asof, dense_dates, sparse_dates, asset_idx, value): yield Float64Overwrite(first_row, last_row, first, last, value) -def adjustments_from_deltas_no_sids(dates, - dense_dates, +def adjustments_from_deltas_no_sids(dense_dates, + sparse_dates, column_idx, column_name, assets, @@ -703,10 +703,10 @@ def adjustments_from_deltas_no_sids(dates, Parameters ---------- - dates : pd.DatetimeIndex - The dates requested by the loader. dense_dates : pd.DatetimeIndex - The dates that were in the dense data. + The dates requested by the loader. + sparse_dates : pd.DatetimeIndex + The dates that were in the raw data. column_idx : int The index of the column in the dataset. column_name : str @@ -722,18 +722,18 @@ def adjustments_from_deltas_no_sids(dates, ad_series = deltas[AD_FIELD_NAME] asset_idx = 0, len(assets) - 1 return { - dates.get_loc(kd): overwrite_from_dates( + dense_dates.get_loc(kd): overwrite_from_dates( ad_series.loc[kd], - dates, dense_dates, + sparse_dates, asset_idx, v, ) for kd, v in deltas[column_name].iteritems() } -def adjustments_from_deltas_with_sids(dates, - dense_dates, +def adjustments_from_deltas_with_sids(dense_dates, + sparse_dates, column_idx, column_name, assets, @@ -746,7 +746,7 @@ def adjustments_from_deltas_with_sids(dates, dates : pd.DatetimeIndex The dates requested by the loader. dense_dates : pd.DatetimeIndex - The dates that were in the dense data. + The dates that were in the raw data. column_idx : int The index of the column in the dataset. column_name : str @@ -763,11 +763,11 @@ def adjustments_from_deltas_with_sids(dates, adjustments = defaultdict(list) for sid_idx, (sid, per_sid) in enumerate(deltas[column_name].iteritems()): for kd, v in per_sid.iteritems(): - adjustments[dates.searchsorted(kd)].extend( + adjustments[dense_dates.searchsorted(kd)].extend( overwrite_from_dates( ad_series.loc[kd, sid], - dates, dense_dates, + sparse_dates, (sid_idx, sid_idx), v, ), @@ -904,7 +904,7 @@ class BlazeLoader(dict): odo(where(e, column), pd.DataFrame, **_kwargs) for column in columns ), - ) + ).sort(TS_FIELD_NAME) # sort for the groupby later materialized_expr = collect_expr(expr) materialized_deltas = ( @@ -939,26 +939,41 @@ class BlazeLoader(dict): ) sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True) - if have_sids: - # Unstack by the sid so that we get a multi-index on the columns - # of datacolumn, sid. - sparse_output = sparse_output.set_index( - [TS_FIELD_NAME, SID_FIELD_NAME], - ).unstack() - sparse_deltas = non_novel_deltas.set_index( - [TS_FIELD_NAME, SID_FIELD_NAME], - ).unstack() - cols = sparse_output.columns - dense_output = sparse_output.groupby( - dates[dates.searchsorted(sparse_output.index)], - ).last().reindex( - index=dates, - columns=pd.MultiIndex.from_product( - (cols.levels[0], assets), - names=cols.names, - ), - ) + def last_in_date_group(df, reindex, have_sids=have_sids): + idx = dates[dates.searchsorted( + df[TS_FIELD_NAME].values.astype('datetime64[D]') + )] + if have_sids: + idx = [idx, SID_FIELD_NAME] + last_in_group = df.drop(TS_FIELD_NAME, axis=1).groupby( + idx, + sort=False, + ).last() + + if have_sids: + last_in_group = last_in_group.unstack() + + if reindex: + if have_sids: + cols = last_in_group.columns + last_in_group = last_in_group.reindex( + index=dates, + columns=pd.MultiIndex.from_product( + (cols.levels[0], assets), + names=cols.names, + ), + ) + else: + last_in_group = last_in_group.reindex(dates) + + return last_in_group + + sparse_deltas = last_in_date_group(non_novel_deltas, reindex=False) + dense_output = last_in_date_group(sparse_output, reindex=True) + dense_output.ffill(inplace=True) + + if have_sids: adjustments_from_deltas = adjustments_from_deltas_with_sids column_view = identity else: @@ -974,15 +989,8 @@ class BlazeLoader(dict): copy, partial(repeat_last_axis, count=len(assets)), ) - sparse_output = sparse_output.set_index(TS_FIELD_NAME) - dense_output = sparse_output.groupby( - dates[dates.searchsorted(sparse_output.index)], - ).last().reindex(dates) - sparse_deltas = non_novel_deltas.set_index(TS_FIELD_NAME) adjustments_from_deltas = adjustments_from_deltas_no_sids - dense_output.ffill(inplace=True) - for column_idx, column in enumerate(columns): column_name = column.name yield column, AdjustedArray( @@ -992,7 +1000,7 @@ class BlazeLoader(dict): mask, adjustments_from_deltas( dates, - sparse_output.index, + sparse_output[TS_FIELD_NAME].values, column_idx, column_name, assets, From 0f43b6b371cde022e2fb577de0e18b8db5a24e54 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Thu, 21 Jan 2016 13:30:16 -0500 Subject: [PATCH 6/6] MAINT: fix blaze commit --- etc/requirements_blaze.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/requirements_blaze.txt b/etc/requirements_blaze.txt index 113ed569..dff430ea 100644 --- a/etc/requirements_blaze.txt +++ b/etc/requirements_blaze.txt @@ -1,3 +1,3 @@ --e git://github.com/quantopian/blaze.git@c785f07f83b591a8d53db3a5013423f6b0f7ffd3#egg=blaze-dev +-e git://github.com/quantopian/blaze.git@2e3d4d5d99588105304fdf226348425bbca73539#egg=blaze-dev -e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev -e git://github.com/quantopian/datashape.git@9bd8fb970a0fc55e866a0b46b5101c9aa47e24ed#egg=datashape-dev