mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-06 05:14:38 +08:00
MAINT: reshuffle logic in from_blaze to make the control flow easier
This commit is contained in:
@@ -428,6 +428,42 @@ def _get_deltas(expr, deltas, no_deltas_rule):
|
||||
return None
|
||||
|
||||
|
||||
def _ensure_timestamp_field(dataset_expr, deltas):
|
||||
"""Verify that the baseline and deltas expressions have a timestamp field.
|
||||
|
||||
If there is not a ``TS_FIELD_NAME`` on either of the expressions, it will
|
||||
be copied from the ``AD_FIELD_NAME``. If one is provided, then we will
|
||||
verify that it is the correct dshape.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dataset_expr : Expr
|
||||
The baseline expression.
|
||||
deltas : Expr or None
|
||||
The deltas expression if any was provided.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dataset_expr, deltas : Expr
|
||||
The new baseline and deltas expressions to use.
|
||||
"""
|
||||
measure = dataset_expr.dshape.measure
|
||||
if TS_FIELD_NAME not in measure.names:
|
||||
dataset_expr = bz.transform(
|
||||
dataset_expr,
|
||||
**{TS_FIELD_NAME: dataset_expr[AD_FIELD_NAME]}
|
||||
)
|
||||
if deltas is not None:
|
||||
deltas = bz.transform(
|
||||
deltas,
|
||||
**{TS_FIELD_NAME: deltas[AD_FIELD_NAME]}
|
||||
)
|
||||
else:
|
||||
_check_datetime_field(TS_FIELD_NAME, measure)
|
||||
|
||||
return dataset_expr, deltas
|
||||
|
||||
|
||||
@expect_element(no_deltas_rule=_valid_no_deltas_rules)
|
||||
def from_blaze(expr,
|
||||
deltas='auto',
|
||||
@@ -508,9 +544,11 @@ def from_blaze(expr,
|
||||
break
|
||||
rename = expr._name
|
||||
expr = expr._child
|
||||
expr = expr.relabel({rename: single_column})
|
||||
dataset_expr = expr.relabel({rename: single_column})
|
||||
else:
|
||||
dataset_expr = expr
|
||||
|
||||
measure = expr.dshape.measure
|
||||
measure = dataset_expr.dshape.measure
|
||||
if not isrecord(measure) or AD_FIELD_NAME not in measure.names:
|
||||
raise TypeError(
|
||||
"The dataset must be a collection of records with at least an"
|
||||
@@ -521,16 +559,7 @@ def from_blaze(expr,
|
||||
),
|
||||
)
|
||||
_check_datetime_field(AD_FIELD_NAME, measure)
|
||||
|
||||
if TS_FIELD_NAME not in measure.names:
|
||||
expr = bz.transform(expr, **{TS_FIELD_NAME: expr[AD_FIELD_NAME]})
|
||||
if deltas is not None:
|
||||
deltas = bz.transform(
|
||||
deltas,
|
||||
**{TS_FIELD_NAME: deltas[AD_FIELD_NAME]}
|
||||
)
|
||||
else:
|
||||
_check_datetime_field(TS_FIELD_NAME, measure)
|
||||
dataset_expr, deltas = _ensure_timestamp_field(dataset_expr, deltas)
|
||||
|
||||
if deltas is not None and (sorted(deltas.dshape.measure.fields) !=
|
||||
sorted(measure.fields)):
|
||||
@@ -542,14 +571,14 @@ def from_blaze(expr,
|
||||
)
|
||||
|
||||
# Ensure that we have a data resource to execute the query against.
|
||||
_check_resources('expr', expr, resources)
|
||||
_check_resources('dataset_expr', dataset_expr, resources)
|
||||
_check_resources('deltas', deltas, resources)
|
||||
|
||||
# Create or retrieve the Pipeline API dataset.
|
||||
ds = new_dataset(expr, deltas)
|
||||
ds = new_dataset(dataset_expr, deltas)
|
||||
# Register our new dataset with the loader.
|
||||
(loader if loader is not None else global_loader)[ds] = ExprData(
|
||||
expr,
|
||||
dataset_expr,
|
||||
deltas,
|
||||
resources,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user