diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index d17612a4..03eb46d8 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -904,44 +904,12 @@ class BlazeLoader(dict): q : Expr The query to run. """ - def lower_for_col(column): - pred = e[TS_FIELD_NAME] <= lower_dt - colname = column.name - schema = e[colname].schema.measure - if isinstance(schema, Option): - pred &= e[colname].notnull() - schema = schema.ty - if schema in floating: - pred &= ~e[colname].isnan() - - filtered = e[pred] - lower = filtered[TS_FIELD_NAME].max() - if have_sids: - # If we have sids, then we need to take the earliest of the - # greatest date that has a non-null value by sid. - lower = bz.by( - filtered[SID_FIELD_NAME], - timestamp=lower, - ).timestamp.min() - return lower - - lower = odo( - reduce( - bz.least, - map(lower_for_col, columns), - ), - pd.Timestamp, - **odo_kwargs - ) - if lower is pd.NaT: - lower = lower_dt return e[ - (e[TS_FIELD_NAME] >= lower) & (e[TS_FIELD_NAME] <= upper_dt) ][added_query_fields + list(map(getname, columns))] def collect_expr(e): - """Execute and merge all of the per-column subqueries. + """Materialize the expression as a dataframe. Parameters ----------