From bd06ff14edb19b04dd9b60d8c030ae95ba1dfd77 Mon Sep 17 00:00:00 2001 From: Maya Tydykov Date: Tue, 24 Jan 2017 12:48:56 -0500 Subject: [PATCH] MAINT: optionally apply deltas adjustments based on info from dataset --- zipline/pipeline/loaders/blaze/core.py | 40 ++++++++++++++++++++------ 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/zipline/pipeline/loaders/blaze/core.py b/zipline/pipeline/loaders/blaze/core.py index 00755078..0052029d 100644 --- a/zipline/pipeline/loaders/blaze/core.py +++ b/zipline/pipeline/loaders/blaze/core.py @@ -206,8 +206,13 @@ is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types)) get__name__ = op.attrgetter('__name__') -class ExprData(namedtuple('ExprData', 'expr deltas checkpoints odo_kwargs')): - """A pair of expressions and data resources. The expresions will be +class ExprData( + namedtuple( + 'ExprData', + 'expr deltas checkpoints odo_kwargs apply_deltas_adjustments' + ) +): + """A pair of expressions and data resources. The expressions will be computed using the resources as the starting scope. Parameters @@ -221,13 +226,19 @@ class ExprData(namedtuple('ExprData', 'expr deltas checkpoints odo_kwargs')): odo_kwargs : dict, optional The keyword arguments to forward to the odo calls internally. """ - def __new__(cls, expr, deltas=None, checkpoints=None, odo_kwargs=None): + def __new__(cls, + expr, + deltas=None, + checkpoints=None, + odo_kwargs=None, + apply_deltas_adjustments=True): return super(ExprData, cls).__new__( cls, expr, deltas, checkpoints, odo_kwargs or {}, + apply_deltas_adjustments, ) def __repr__(self): @@ -239,6 +250,7 @@ class ExprData(namedtuple('ExprData', 'expr deltas checkpoints odo_kwargs')): str(self.deltas), str(self.checkpoints), self.odo_kwargs, + self.apply_deltas_adjustments, )) @@ -547,20 +559,25 @@ def from_blaze(expr, odo_kwargs=None, missing_values=None, no_deltas_rule='warn', - no_checkpoints_rule='warn'): + no_checkpoints_rule='warn', + apply_deltas_adjustments=True,): """Create a Pipeline API object from a blaze expression. Parameters ---------- expr : Expr The blaze expression to use. + apply_deltas_adjustments : bool, optional + Whether or not deltas adjustments should be applied for this dataset. + True by default because not applying deltas adjustments is an exception + rather than the rule. deltas : Expr, 'auto' or None, optional The expression to use for the point in time adjustments. If the string 'auto' is passed, a deltas expr will be looked up by stepping up the expression tree and looking for another field with the name of ``expr._name`` + '_deltas'. If None is passed, no deltas will be used. - deltas : Expr, 'auto' or None, optional + checkpoints : Expr, 'auto' or None, optional The expression to use for the forward fill checkpoints. If the string 'auto' is passed, a checkpoints expr will be looked up by stepping up the expression tree and looking for another field @@ -714,6 +731,7 @@ def from_blaze(expr, if checkpoints is not None else None, odo_kwargs=odo_kwargs, + apply_deltas_adjustments=apply_deltas_adjustments ) if single_column is not None: # We were passed a single column, extract and return it. @@ -984,7 +1002,9 @@ class BlazeLoader(dict): except ValueError: raise AssertionError('all columns must come from the same dataset') - expr, deltas, checkpoints, odo_kwargs = self[dataset] + expr, deltas, checkpoints, odo_kwargs, apply_deltas_adjustments = self[ + dataset + ] have_sids = (dataset.ndim == 2) asset_idx = pd.Series(index=assets, data=np.arange(len(assets))) assets = list(map(int, assets)) # coerce from numpy.int64 @@ -1094,15 +1114,17 @@ class BlazeLoader(dict): have_sids=have_sids) ffill_across_cols(dense_output, columns, {c.name: c.name for c in columns}) + adjustments_from_deltas = lambda *args: {} if have_sids: - adjustments_from_deltas = adjustments_from_deltas_with_sids + if apply_deltas_adjustments: + adjustments_from_deltas = adjustments_from_deltas_with_sids column_view = identity else: # If we do not have sids, use the column view to make a single # column vector which is unassociated with any assets. column_view = op.itemgetter(np.s_[:, np.newaxis]) - - adjustments_from_deltas = adjustments_from_deltas_no_sids + if apply_deltas_adjustments: + adjustments_from_deltas = adjustments_from_deltas_no_sids mask = np.full( shape=(len(mask), 1), fill_value=True, dtype=bool_dtype, )