MAINT: optionally apply deltas adjustments based on info from dataset

This commit is contained in:
Maya Tydykov
2017-01-24 12:48:56 -05:00
parent b080d337f5
commit bd06ff14ed
+31 -9
View File
@@ -206,8 +206,13 @@ is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types))
get__name__ = op.attrgetter('__name__')
class ExprData(namedtuple('ExprData', 'expr deltas checkpoints odo_kwargs')):
"""A pair of expressions and data resources. The expresions will be
class ExprData(
namedtuple(
'ExprData',
'expr deltas checkpoints odo_kwargs apply_deltas_adjustments'
)
):
"""A pair of expressions and data resources. The expressions will be
computed using the resources as the starting scope.
Parameters
@@ -221,13 +226,19 @@ class ExprData(namedtuple('ExprData', 'expr deltas checkpoints odo_kwargs')):
odo_kwargs : dict, optional
The keyword arguments to forward to the odo calls internally.
"""
def __new__(cls, expr, deltas=None, checkpoints=None, odo_kwargs=None):
def __new__(cls,
expr,
deltas=None,
checkpoints=None,
odo_kwargs=None,
apply_deltas_adjustments=True):
return super(ExprData, cls).__new__(
cls,
expr,
deltas,
checkpoints,
odo_kwargs or {},
apply_deltas_adjustments,
)
def __repr__(self):
@@ -239,6 +250,7 @@ class ExprData(namedtuple('ExprData', 'expr deltas checkpoints odo_kwargs')):
str(self.deltas),
str(self.checkpoints),
self.odo_kwargs,
self.apply_deltas_adjustments,
))
@@ -547,20 +559,25 @@ def from_blaze(expr,
odo_kwargs=None,
missing_values=None,
no_deltas_rule='warn',
no_checkpoints_rule='warn'):
no_checkpoints_rule='warn',
apply_deltas_adjustments=True,):
"""Create a Pipeline API object from a blaze expression.
Parameters
----------
expr : Expr
The blaze expression to use.
apply_deltas_adjustments : bool, optional
Whether or not deltas adjustments should be applied for this dataset.
True by default because not applying deltas adjustments is an exception
rather than the rule.
deltas : Expr, 'auto' or None, optional
The expression to use for the point in time adjustments.
If the string 'auto' is passed, a deltas expr will be looked up
by stepping up the expression tree and looking for another field
with the name of ``expr._name`` + '_deltas'. If None is passed, no
deltas will be used.
deltas : Expr, 'auto' or None, optional
checkpoints : Expr, 'auto' or None, optional
The expression to use for the forward fill checkpoints.
If the string 'auto' is passed, a checkpoints expr will be looked up
by stepping up the expression tree and looking for another field
@@ -714,6 +731,7 @@ def from_blaze(expr,
if checkpoints is not None else
None,
odo_kwargs=odo_kwargs,
apply_deltas_adjustments=apply_deltas_adjustments
)
if single_column is not None:
# We were passed a single column, extract and return it.
@@ -984,7 +1002,9 @@ class BlazeLoader(dict):
except ValueError:
raise AssertionError('all columns must come from the same dataset')
expr, deltas, checkpoints, odo_kwargs = self[dataset]
expr, deltas, checkpoints, odo_kwargs, apply_deltas_adjustments = self[
dataset
]
have_sids = (dataset.ndim == 2)
asset_idx = pd.Series(index=assets, data=np.arange(len(assets)))
assets = list(map(int, assets)) # coerce from numpy.int64
@@ -1094,15 +1114,17 @@ class BlazeLoader(dict):
have_sids=have_sids)
ffill_across_cols(dense_output, columns, {c.name: c.name
for c in columns})
adjustments_from_deltas = lambda *args: {}
if have_sids:
adjustments_from_deltas = adjustments_from_deltas_with_sids
if apply_deltas_adjustments:
adjustments_from_deltas = adjustments_from_deltas_with_sids
column_view = identity
else:
# If we do not have sids, use the column view to make a single
# column vector which is unassociated with any assets.
column_view = op.itemgetter(np.s_[:, np.newaxis])
adjustments_from_deltas = adjustments_from_deltas_no_sids
if apply_deltas_adjustments:
adjustments_from_deltas = adjustments_from_deltas_no_sids
mask = np.full(
shape=(len(mask), 1), fill_value=True, dtype=bool_dtype,
)