diff --git a/zipline/pipeline/loaders/blaze.py b/zipline/pipeline/loaders/blaze.py index 2c800934..d727e972 100644 --- a/zipline/pipeline/loaders/blaze.py +++ b/zipline/pipeline/loaders/blaze.py @@ -2,8 +2,8 @@ from __future__ import division from abc import ABCMeta, abstractproperty from collections import namedtuple -from datetime import datetime from operator import attrgetter +from weakref import WeakKeyDictionary import blaze as bz from datashape import ( @@ -19,7 +19,6 @@ from logbook import Logger from numpy.lib.stride_tricks import as_strided from odo import odo import pandas as pd -from pytz import utc from toolz import flip, memoize, compose, complement, identity from six import with_metaclass @@ -195,7 +194,7 @@ def _check_datetime_field(name, measure): """ if not isinstance(measure[name], (Date, DateTime)): raise TypeError( - "'{n}' field must be a '{dt}', not: '{dshape}'".format( + "'{name}' field must be a '{dt}', not: '{dshape}'".format( name=name, dt=DateTime(), dshape=measure[name], @@ -295,7 +294,7 @@ def pipeline_api_from_blaze(expr, if isrecord(expr.dshape.measure): break else: - expr = bz.Data({single_column: col}) + expr = bz.Data(col, name=single_column) deltas = _get_deltas(expr, deltas, no_deltas_rule) if deltas is not None: @@ -460,30 +459,12 @@ def adjustments_from_deltas(dates, } -def to_datetime(dt64, factory=datetime.fromtimestamp, _ns_to_s=1000 ** 3): - """Convert a numpy datetime64 to a datetime object. - - Parameters - ---------- - dt64 : datetime64 - The datetime64 to coerce. - factory : callable, optional - The function to coerce the timestamp as seconds into an object. - - Returns - ------- - dt : datetime - The dt64 coerced to a datetime. - """ - return factory(int(dt64) / _ns_to_s, tz=utc) - - class BlazeLoader(dict): def __init__(self, colmap=None): self.update(colmap or {}) @classmethod - @memoize + @memoize(cache=WeakKeyDictionary()) def global_instance(cls): return cls() @@ -518,11 +499,11 @@ class BlazeLoader(dict): # Hack to get the lower bound to query: # This must be strictly executed because the data for `ts` will # be removed from scope too early otherwise. - lower = odo(ts[ts <= to_datetime(dates[0])].max(), pd.Timestamp) + lower = odo(ts[ts <= dates[0]].max(), pd.Timestamp) return e[ e[SID_FIELD_NAME].isin(assets) & - (ts >= lower) & - (ts < to_datetime(dates[-1])) + ((ts >= lower) if lower is not pd.NaT else True) & + (ts <= dates[-1]) ][query_fields] materialized_expr = odo( @@ -539,21 +520,41 @@ class BlazeLoader(dict): # Inline the deltas that changed our most recently known value. # Also, we reindex by the dates to create a dense representation of # the data. - base = inline_novel_deltas( + dense_output = inline_novel_deltas( materialized_expr, materialized_deltas, dates, - ).drop(AD_FIELD_NAME, axis=1).set_index(TS_FIELD_NAME).reindex( - dates, - method='ffill', - ) + ).drop(AD_FIELD_NAME, axis=1).set_index(TS_FIELD_NAME) + if have_sids: - base.index.name = TS_FIELD_NAME # Unstack by the sid so that we get a multi-index on the columns # of datacolumn, sid. - base = base.set_index(SID_FIELD_NAME, append=True).unstack() + dense_output = dense_output.set_index( + SID_FIELD_NAME, + append=True, + ).unstack() + + # Allocate the whole output dataframe at once instead of + # reindexing. + sparse_output = pd.DataFrame( + columns=pd.MultiIndex.from_product( + (dense_output.columns.levels[0], assets), + names=( + dense_output.columns.levels[0].name, + SID_FIELD_NAME, + ), + ), + index=dates, + ) + + # In place update the output based on the base. + sparse_output.update(dense_output) + column_view = identity else: + # We use the column view to make an array per asset. + sparse_output = dense_output.reindex(dates) + def column_view(arr, _shape=(len(dates), len(assets))): """Return a virtual matrix where we make a view that duplicates a single column for all the assets. @@ -572,9 +573,15 @@ class BlazeLoader(dict): strides=(arr.itemsize, 0), ) + # Walk forward the data after any symbol mapped or non-symbol mapped + # specific transforms have been applied. + sparse_output = sparse_output.ffill() + for column_idx, column in enumerate(columns): yield adjusted_array( - column_view(base[column.name].values.astype(column.dtype)), + column_view( + sparse_output[column.name].values.astype(column.dtype), + ), mask, adjustments_from_deltas( dates, @@ -585,4 +592,10 @@ class BlazeLoader(dict): ) ) + def __repr__(self): + return '%s(%s)' % ( + type(self).__name__, + super(BlazeLoader, self).__repr__(), + ) + global_loader = BlazeLoader.global_instance()