mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-27 21:51:37 +08:00
Merge pull request #994 from quantopian/blaze-condense-query
condense the blaze query
This commit is contained in:
@@ -25,6 +25,7 @@ test.log
|
||||
|
||||
# Packages
|
||||
*.egg
|
||||
.eggs/*
|
||||
*.egg-info
|
||||
dist
|
||||
build
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
-e git://github.com/quantopian/blaze.git@831116adba808b89f42cef48c7d96cc44603d05a#egg=blaze-dev
|
||||
-e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev
|
||||
-e git://github.com/quantopian/datashape.git@9bd8fb970a0fc55e866a0b46b5101c9aa47e24ed#egg=datashape-dev
|
||||
-e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev
|
||||
-e git://github.com/quantopian/blaze.git@0b6e76122a57c7115f18c6fdbd5fbab5501fd486#egg=blaze-dev
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ from datashape import dshape, var, Record
|
||||
from nose_parameterized import parameterized
|
||||
import numpy as np
|
||||
from numpy.testing.utils import assert_array_almost_equal
|
||||
from odo import odo
|
||||
import pandas as pd
|
||||
from pandas.util.testing import assert_frame_equal
|
||||
from toolz import keymap, valmap, concatv
|
||||
@@ -845,11 +846,18 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
@with_extra_sid
|
||||
def test_deltas(self, asset_info):
|
||||
expr = bz.Data(self.df, name='expr', dshape=self.dshape)
|
||||
deltas = bz.Data(self.df, name='deltas', dshape=self.dshape)
|
||||
deltas = bz.transform(
|
||||
deltas,
|
||||
value=deltas.value + 10,
|
||||
timestamp=deltas.timestamp + timedelta(days=1),
|
||||
deltas = bz.Data(self.df, dshape=self.dshape)
|
||||
deltas = bz.Data(
|
||||
odo(
|
||||
bz.transform(
|
||||
deltas,
|
||||
value=deltas.value + 10,
|
||||
timestamp=deltas.timestamp + timedelta(days=1),
|
||||
),
|
||||
pd.DataFrame,
|
||||
),
|
||||
name='delta',
|
||||
dshape=self.dshape,
|
||||
)
|
||||
|
||||
expected_views = keymap(pd.Timestamp, {
|
||||
@@ -996,16 +1004,23 @@ class BlazeToPipelineTestCase(TestCase):
|
||||
repeated_dates = base_dates.repeat(3)
|
||||
baseline = pd.DataFrame({
|
||||
'sid': self.sids * 2,
|
||||
'value': (0, 1, 2, 1, 2, 3),
|
||||
'value': (0., 1., 2., 1., 2., 3.),
|
||||
'int_value': (0, 1, 2, 1, 2, 3),
|
||||
'asof_date': repeated_dates,
|
||||
'timestamp': repeated_dates,
|
||||
})
|
||||
expr = bz.Data(baseline, name='expr', dshape=self.dshape)
|
||||
deltas = bz.Data(baseline, name='deltas', dshape=self.dshape)
|
||||
deltas = bz.transform(
|
||||
deltas,
|
||||
value=deltas.value + 10,
|
||||
timestamp=deltas.timestamp + timedelta(days=1),
|
||||
deltas = bz.Data(
|
||||
odo(
|
||||
bz.transform(
|
||||
expr,
|
||||
value=expr.value + 10,
|
||||
timestamp=expr.timestamp + timedelta(days=1),
|
||||
),
|
||||
pd.DataFrame,
|
||||
),
|
||||
name='delta',
|
||||
dshape=self.dshape,
|
||||
)
|
||||
expected_views = keymap(pd.Timestamp, {
|
||||
'2014-01-03': np.array([[10.0, 11.0, 12.0],
|
||||
|
||||
@@ -872,50 +872,54 @@ class BlazeLoader(dict):
|
||||
data_query_tz,
|
||||
)
|
||||
|
||||
def where(e, column):
|
||||
def where(e):
|
||||
"""Create the query to run against the resources.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
e : Expr
|
||||
The baseline or deltas expression.
|
||||
column : BoundColumn
|
||||
The column to query for.
|
||||
|
||||
Returns
|
||||
-------
|
||||
q : Expr
|
||||
The query to run for the given column.
|
||||
The query to run.
|
||||
"""
|
||||
colname = column.name
|
||||
pred = e[TS_FIELD_NAME] <= lower_dt
|
||||
schema = e[colname].schema.measure
|
||||
if isinstance(schema, Option):
|
||||
pred &= e[colname].notnull()
|
||||
schema = schema.ty
|
||||
if schema in floating:
|
||||
pred &= ~e[colname].isnan()
|
||||
filtered = e[pred]
|
||||
lower = filtered.timestamp.max()
|
||||
def lower_for_col(column):
|
||||
pred = e[TS_FIELD_NAME] <= lower_dt
|
||||
colname = column.name
|
||||
schema = e[colname].schema.measure
|
||||
if isinstance(schema, Option):
|
||||
pred &= e[colname].notnull()
|
||||
schema = schema.ty
|
||||
if schema in floating:
|
||||
pred &= ~e[colname].isnan()
|
||||
|
||||
if have_sids:
|
||||
# If we have sids, then we need to take the earliest of the
|
||||
# greatest date that has a non-null value by sid.
|
||||
lower = bz.by(
|
||||
filtered[SID_FIELD_NAME],
|
||||
timestamp=lower,
|
||||
).timestamp.min()
|
||||
filtered = e[pred]
|
||||
lower = filtered[TS_FIELD_NAME].max()
|
||||
if have_sids:
|
||||
# If we have sids, then we need to take the earliest of the
|
||||
# greatest date that has a non-null value by sid.
|
||||
lower = bz.by(
|
||||
filtered[SID_FIELD_NAME],
|
||||
timestamp=lower,
|
||||
).timestamp.min()
|
||||
return lower
|
||||
|
||||
lower = odo(lower, pd.Timestamp)
|
||||
lower = odo(
|
||||
reduce(
|
||||
bz.least,
|
||||
map(lower_for_col, columns),
|
||||
),
|
||||
pd.Timestamp,
|
||||
**odo_kwargs
|
||||
)
|
||||
if lower is pd.NaT:
|
||||
# If there is no lower date, just query for data in he date
|
||||
# range. It must all be null anyways.
|
||||
lower = lower_dt
|
||||
|
||||
return e[
|
||||
(e[TS_FIELD_NAME] >= lower) &
|
||||
(e[TS_FIELD_NAME] <= upper_dt)
|
||||
][added_query_fields + [colname]]
|
||||
][added_query_fields + list(map(getname, columns))]
|
||||
|
||||
def collect_expr(e):
|
||||
"""Execute and merge all of the per-column subqueries.
|
||||
@@ -935,13 +939,9 @@ class BlazeLoader(dict):
|
||||
This can return more data than needed. The in memory reindex will
|
||||
handle this.
|
||||
"""
|
||||
return sort_values(reduce(
|
||||
partial(pd.merge, on=added_query_fields, how='outer'),
|
||||
(
|
||||
odo(where(e, column), pd.DataFrame, **odo_kwargs)
|
||||
for column in columns
|
||||
),
|
||||
), TS_FIELD_NAME) # sort for the groupby later
|
||||
df = odo(where(e), pd.DataFrame, **odo_kwargs)
|
||||
df.sort(TS_FIELD_NAME, inplace=True) # sort for the groupby later
|
||||
return df
|
||||
|
||||
materialized_expr = collect_expr(expr)
|
||||
materialized_deltas = (
|
||||
|
||||
Reference in New Issue
Block a user