From 2036f3d389ed272a35982ee417dfe1525ca109fb Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Fri, 12 Feb 2016 00:27:25 -0500 Subject: [PATCH 1/2] ENH: condense the blaze query --- etc/requirements_blaze.txt | 5 +- tests/pipeline/test_blaze.py | 37 ++++++++++----- zipline/pipeline/loaders/blaze/core.py | 66 +++++++++++++------------- 3 files changed, 62 insertions(+), 46 deletions(-) diff --git a/etc/requirements_blaze.txt b/etc/requirements_blaze.txt index 09e9e14f..8de69a32 100644 --- a/etc/requirements_blaze.txt +++ b/etc/requirements_blaze.txt @@ -1,3 +1,4 @@ --e git://github.com/quantopian/blaze.git@831116adba808b89f42cef48c7d96cc44603d05a#egg=blaze-dev --e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev -e git://github.com/quantopian/datashape.git@9bd8fb970a0fc55e866a0b46b5101c9aa47e24ed#egg=datashape-dev +-e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev +-e git://github.com/quantopian/blaze.git@0b6e76122a57c7115f18c6fdbd5fbab5501fd486#egg=blaze-dev + diff --git a/tests/pipeline/test_blaze.py b/tests/pipeline/test_blaze.py index 5527b963..ed19f5f4 100644 --- a/tests/pipeline/test_blaze.py +++ b/tests/pipeline/test_blaze.py @@ -13,6 +13,7 @@ from datashape import dshape, var, Record from nose_parameterized import parameterized import numpy as np from numpy.testing.utils import assert_array_almost_equal +from odo import odo import pandas as pd from pandas.util.testing import assert_frame_equal from toolz import keymap, valmap, concatv @@ -845,11 +846,18 @@ class BlazeToPipelineTestCase(TestCase): @with_extra_sid def test_deltas(self, asset_info): expr = bz.Data(self.df, name='expr', dshape=self.dshape) - deltas = bz.Data(self.df, name='deltas', dshape=self.dshape) - deltas = bz.transform( - deltas, - value=deltas.value + 10, - timestamp=deltas.timestamp + timedelta(days=1), + deltas = bz.Data(self.df, dshape=self.dshape) + deltas = bz.Data( + odo( + bz.transform( + deltas, + value=deltas.value + 10, + timestamp=deltas.timestamp + timedelta(days=1), + ), + pd.DataFrame, + ), + name='delta', + dshape=self.dshape, ) expected_views = keymap(pd.Timestamp, { @@ -996,16 +1004,23 @@ class BlazeToPipelineTestCase(TestCase): repeated_dates = base_dates.repeat(3) baseline = pd.DataFrame({ 'sid': self.sids * 2, - 'value': (0, 1, 2, 1, 2, 3), + 'value': (0., 1., 2., 1., 2., 3.), + 'int_value': (0, 1, 2, 1, 2, 3), 'asof_date': repeated_dates, 'timestamp': repeated_dates, }) expr = bz.Data(baseline, name='expr', dshape=self.dshape) - deltas = bz.Data(baseline, name='deltas', dshape=self.dshape) - deltas = bz.transform( - deltas, - value=deltas.value + 10, - timestamp=deltas.timestamp + timedelta(days=1), + deltas = bz.Data( + odo( + bz.transform( + expr, + value=expr.value + 10, + timestamp=expr.timestamp + timedelta(days=1), + ), + pd.DataFrame, + ), + name='delta', + dshape=self.dshape, ) expected_views = keymap(pd.Timestamp, { '2014-01-03': np.array([[10.0, 11.0, 12.0], diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index c7ee00c4..0fcd52e0 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -872,50 +872,54 @@ class BlazeLoader(dict): data_query_tz, ) - def where(e, column): + def where(e): """Create the query to run against the resources. Parameters ---------- e : Expr The baseline or deltas expression. - column : BoundColumn - The column to query for. Returns ------- q : Expr - The query to run for the given column. + The query to run. """ - colname = column.name - pred = e[TS_FIELD_NAME] <= lower_dt - schema = e[colname].schema.measure - if isinstance(schema, Option): - pred &= e[colname].notnull() - schema = schema.ty - if schema in floating: - pred &= ~e[colname].isnan() - filtered = e[pred] - lower = filtered.timestamp.max() + def lower_for_col(column): + pred = e[TS_FIELD_NAME] <= lower_dt + colname = column.name + schema = e[colname].schema.measure + if isinstance(schema, Option): + pred &= e[colname].notnull() + schema = schema.ty + if schema in floating: + pred &= ~e[colname].isnan() - if have_sids: - # If we have sids, then we need to take the earliest of the - # greatest date that has a non-null value by sid. - lower = bz.by( - filtered[SID_FIELD_NAME], - timestamp=lower, - ).timestamp.min() + filtered = e[pred] + lower = filtered[TS_FIELD_NAME].max() + if have_sids: + # If we have sids, then we need to take the earliest of the + # greatest date that has a non-null value by sid. + lower = bz.by( + filtered[SID_FIELD_NAME], + timestamp=lower, + ).timestamp.min() + return lower - lower = odo(lower, pd.Timestamp) + lower = odo( + reduce( + bz.least, + map(lower_for_col, columns), + ), + pd.Timestamp, + **odo_kwargs + ) if lower is pd.NaT: - # If there is no lower date, just query for data in he date - # range. It must all be null anyways. lower = lower_dt - return e[ (e[TS_FIELD_NAME] >= lower) & (e[TS_FIELD_NAME] <= upper_dt) - ][added_query_fields + [colname]] + ][added_query_fields + list(map(getname, columns))] def collect_expr(e): """Execute and merge all of the per-column subqueries. @@ -935,13 +939,9 @@ class BlazeLoader(dict): This can return more data than needed. The in memory reindex will handle this. """ - return sort_values(reduce( - partial(pd.merge, on=added_query_fields, how='outer'), - ( - odo(where(e, column), pd.DataFrame, **odo_kwargs) - for column in columns - ), - ), TS_FIELD_NAME) # sort for the groupby later + df = odo(where(e), pd.DataFrame, **odo_kwargs) + df.sort(TS_FIELD_NAME, inplace=True) # sort for the groupby later + return df materialized_expr = collect_expr(expr) materialized_deltas = ( From ef29de222a7295c8fdc24ff588cc28fdaa186a99 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Wed, 17 Feb 2016 20:23:29 -0500 Subject: [PATCH 2/2] MAINT: add .eggs/* to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 5ff97ba4..305dc8ef 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ test.log # Packages *.egg +.eggs/* *.egg-info dist build