Merge pull request #810 from quantopian/ffill

Blaze loader forward fill fix
This commit is contained in:
Joe Jevnik
2016-01-21 13:57:21 -05:00
3 changed files with 511 additions and 85 deletions
+1 -1
View File
@@ -1,3 +1,3 @@
-e git://github.com/quantopian/blaze.git@43d2f7e00a228106cea038a53322497831539559#egg=blaze-dev
-e git://github.com/quantopian/blaze.git@2e3d4d5d99588105304fdf226348425bbca73539#egg=blaze-dev
-e git://github.com/quantopian/odo.git@4f7f45fb039d89ea101803b95da21fc055901d66#egg=odo-dev
-e git://github.com/quantopian/datashape.git@9bd8fb970a0fc55e866a0b46b5101c9aa47e24ed#egg=datashape-dev
+390 -18
View File
@@ -295,8 +295,8 @@ class BlazeToPipelineTestCase(TestCase):
loader=self.garbage_loader,
)
def test_id(self):
expr = bz.Data(self.df, name='expr', dshape=self.dshape)
def _test_id(self, df, dshape, expected, finder, add):
expr = bz.Data(df, name='expr', dshape=dshape)
loader = BlazeLoader()
ds = from_blaze(
expr,
@@ -304,7 +304,8 @@ class BlazeToPipelineTestCase(TestCase):
no_deltas_rule=no_deltas_rules.ignore,
)
p = Pipeline()
p.add(ds.value.latest, 'value')
for a in add:
p.add(getattr(ds, a).latest, a)
dates = self.dates
with tmp_asset_finder() as finder:
@@ -314,13 +315,6 @@ class BlazeToPipelineTestCase(TestCase):
finder,
).run_pipeline(p, dates[0], dates[-1])
expected = self.df.drop('asof_date', axis=1).set_index(
['timestamp', 'sid'],
)
expected.index = pd.MultiIndex.from_product((
expected.index.levels[0],
finder.retrieve_all(expected.index.levels[1]),
))
assert_frame_equal(result, expected, check_dtype=False)
def test_custom_query_time_tz(self):
@@ -328,8 +322,8 @@ class BlazeToPipelineTestCase(TestCase):
df['timestamp'] = (
pd.DatetimeIndex(df['timestamp'], tz='EST') +
timedelta(hours=8, minutes=44)
).tz_convert('utc')
df.ix[3:5, 'timestamp'] = pd.Timestamp('2014-01-01 13:45', tz='utc')
).tz_convert('utc').tz_localize(None)
df.ix[3:5, 'timestamp'] = pd.Timestamp('2014-01-01 13:45')
expr = bz.Data(df, name='expr', dshape=self.dshape)
loader = BlazeLoader(data_query_time=time(8, 45), data_query_tz='EST')
ds = from_blaze(
@@ -360,8 +354,268 @@ class BlazeToPipelineTestCase(TestCase):
))
assert_frame_equal(result, expected, check_dtype=False)
def test_id(self):
"""
input (self.df):
asof_date sid timestamp value
0 2014-01-01 65 2014-01-01 0
1 2014-01-01 66 2014-01-01 1
2 2014-01-01 67 2014-01-01 2
3 2014-01-02 65 2014-01-02 1
4 2014-01-02 66 2014-01-02 2
5 2014-01-02 67 2014-01-02 3
6 2014-01-03 65 2014-01-03 2
7 2014-01-03 66 2014-01-03 3
8 2014-01-03 67 2014-01-03 4
output (expected)
value
2014-01-01 Equity(65 [A]) 0
Equity(66 [B]) 1
Equity(67 [C]) 2
2014-01-02 Equity(65 [A]) 1
Equity(66 [B]) 2
Equity(67 [C]) 3
2014-01-03 Equity(65 [A]) 2
Equity(66 [B]) 3
Equity(67 [C]) 4
"""
with tmp_asset_finder() as finder:
expected = self.df.drop('asof_date', axis=1).set_index(
['timestamp', 'sid'],
)
expected.index = pd.MultiIndex.from_product((
expected.index.levels[0],
finder.retrieve_all(expected.index.levels[1]),
))
self._test_id(self.df, self.dshape, expected, finder, ('value',))
def test_id_ffill_out_of_window(self):
"""
input (df):
asof_date timestamp sid other value
0 2013-12-22 2013-12-22 65 0 0
1 2013-12-22 2013-12-22 66 NaN 1
2 2013-12-22 2013-12-22 67 2 NaN
3 2013-12-23 2013-12-23 65 NaN 1
4 2013-12-23 2013-12-23 66 2 NaN
5 2013-12-23 2013-12-23 67 3 3
6 2013-12-24 2013-12-24 65 2 NaN
7 2013-12-24 2013-12-24 66 3 3
8 2013-12-24 2013-12-24 67 NaN 4
output (expected):
other value
2014-01-01 Equity(65 [A]) 2 1
Equity(66 [B]) 3 3
Equity(67 [C]) 3 4
2014-01-02 Equity(65 [A]) 2 1
Equity(66 [B]) 3 3
Equity(67 [C]) 3 4
2014-01-03 Equity(65 [A]) 2 1
Equity(66 [B]) 3 3
Equity(67 [C]) 3 4
"""
dates = self.dates.repeat(3) - timedelta(days=10)
df = pd.DataFrame({
'sid': self.sids * 3,
'value': (0, 1, np.nan, 1, np.nan, 3, np.nan, 3, 4),
'other': (0, np.nan, 2, np.nan, 2, 3, 2, 3, np.nan),
'asof_date': dates,
'timestamp': dates,
})
fields = OrderedDict(self.dshape.measure.fields)
fields['other'] = fields['value']
with tmp_asset_finder() as finder:
expected = pd.DataFrame(
np.array([[2, 1],
[3, 3],
[3, 4],
[2, 1],
[3, 3],
[3, 4],
[2, 1],
[3, 3],
[3, 4]]),
columns=['other', 'value'],
index=pd.MultiIndex.from_product(
(self.dates, finder.retrieve_all(self.sids)),
),
)
self._test_id(
df,
var * Record(fields),
expected,
finder,
('value', 'other'),
)
def test_id_multiple_columns(self):
"""
input (df):
asof_date sid timestamp value other
0 2014-01-01 65 2014-01-01 0 1
1 2014-01-01 66 2014-01-01 1 2
2 2014-01-01 67 2014-01-01 2 3
3 2014-01-02 65 2014-01-02 1 2
4 2014-01-02 66 2014-01-02 2 3
5 2014-01-02 67 2014-01-02 3 4
6 2014-01-03 65 2014-01-03 2 3
7 2014-01-03 66 2014-01-03 3 4
8 2014-01-03 67 2014-01-03 4 5
output (expected):
value other
2014-01-01 Equity(65 [A]) 0 1
Equity(66 [B]) 1 2
Equity(67 [C]) 2 3
2014-01-02 Equity(65 [A]) 1 2
Equity(66 [B]) 2 3
Equity(67 [C]) 3 4
2014-01-03 Equity(65 [A]) 2 3
Equity(66 [B]) 3 4
Equity(67 [C]) 4 5
"""
df = self.df.copy()
df['other'] = df.value + 1
fields = OrderedDict(self.dshape.measure.fields)
fields['other'] = fields['value']
with tmp_asset_finder() as finder:
expected = df.drop('asof_date', axis=1).set_index(
['timestamp', 'sid'],
).sort_index(axis=1)
expected.index = pd.MultiIndex.from_product((
expected.index.levels[0],
finder.retrieve_all(expected.index.levels[1]),
))
self._test_id(
df,
var * Record(fields),
expected,
finder,
('value', 'other'),
)
def test_id_macro_dataset(self):
expr = bz.Data(self.macro_df, name='expr', dshape=self.macro_dshape)
"""
input (self.macro_df)
asof_date timestamp value
0 2014-01-01 2014-01-01 0
3 2014-01-02 2014-01-02 1
6 2014-01-03 2014-01-03 2
output (expected):
value
2014-01-01 Equity(65 [A]) 0
Equity(66 [B]) 0
Equity(67 [C]) 0
2014-01-02 Equity(65 [A]) 1
Equity(66 [B]) 1
Equity(67 [C]) 1
2014-01-03 Equity(65 [A]) 2
Equity(66 [B]) 2
Equity(67 [C]) 2
"""
asset_info = asset_infos[0][0]
nassets = len(asset_info)
with tmp_asset_finder() as finder:
expected = pd.DataFrame(
list(concatv([0] * nassets, [1] * nassets, [2] * nassets)),
index=pd.MultiIndex.from_product((
self.macro_df.timestamp,
finder.retrieve_all(asset_info.index),
)),
columns=('value',),
)
self._test_id(
self.macro_df,
self.macro_dshape,
expected,
finder,
('value',),
)
def test_id_ffill_out_of_window_macro_dataset(self):
"""
input (df):
asof_date timestamp other value
0 2013-12-22 2013-12-22 NaN 0
1 2013-12-23 2013-12-23 1 NaN
2 2013-12-24 2013-12-24 NaN NaN
output (expected):
other value
2014-01-01 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
2014-01-02 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
2014-01-03 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
"""
dates = self.dates - timedelta(days=10)
df = pd.DataFrame({
'value': (0, np.nan, np.nan),
'other': (np.nan, 1, np.nan),
'asof_date': dates,
'timestamp': dates,
})
fields = OrderedDict(self.macro_dshape.measure.fields)
fields['other'] = fields['value']
with tmp_asset_finder() as finder:
expected = pd.DataFrame(
np.array([[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1],
[0, 1]]),
columns=['value', 'other'],
index=pd.MultiIndex.from_product(
(self.dates, finder.retrieve_all(self.sids)),
),
).sort_index(axis=1)
self._test_id(
df,
var * Record(fields),
expected,
finder,
('value', 'other'),
)
def test_id_macro_dataset_multiple_columns(self):
"""
input (df):
asof_date timestamp other value
0 2014-01-01 2014-01-01 1 0
3 2014-01-02 2014-01-02 2 1
6 2014-01-03 2014-01-03 3 2
output (expected):
other value
2014-01-01 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
2014-01-02 Equity(65 [A]) 2 1
Equity(66 [B]) 2 1
Equity(67 [C]) 2 1
2014-01-03 Equity(65 [A]) 3 2
Equity(66 [B]) 3 2
Equity(67 [C]) 3 2
"""
df = self.macro_df.copy()
df['other'] = df.value + 1
fields = OrderedDict(self.macro_dshape.measure.fields)
fields['other'] = fields['value']
expr = bz.Data(df, name='expr', dshape=var * Record(fields))
loader = BlazeLoader()
ds = from_blaze(
expr,
@@ -370,6 +624,7 @@ class BlazeToPipelineTestCase(TestCase):
)
p = Pipeline()
p.add(ds.value.latest, 'value')
p.add(ds.other.latest, 'other')
dates = self.dates
asset_info = asset_infos[0][0]
@@ -380,16 +635,133 @@ class BlazeToPipelineTestCase(TestCase):
finder,
).run_pipeline(p, dates[0], dates[-1])
nassets = len(asset_info)
expected = pd.DataFrame(
list(concatv([0] * nassets, [1] * nassets, [2] * nassets)),
np.array([[0, 1],
[1, 2],
[2, 3]]).repeat(3, axis=0),
index=pd.MultiIndex.from_product((
self.macro_df.timestamp,
df.timestamp,
finder.retrieve_all(asset_info.index),
)),
columns=('value',),
columns=('value', 'other'),
).sort_index(axis=1)
assert_frame_equal(
result,
expected.sort_index(axis=1),
check_dtype=False,
)
assert_frame_equal(result, expected, check_dtype=False)
def test_id_take_last_in_group(self):
T = pd.Timestamp
df = pd.DataFrame(
columns=['asof_date', 'timestamp', 'sid', 'other', 'value'],
data=[
[T('2014-01-01'), T('2014-01-01 00'), 65, 0, 0],
[T('2014-01-01'), T('2014-01-01 01'), 65, 1, np.nan],
[T('2014-01-01'), T('2014-01-01 00'), 66, np.nan, np.nan],
[T('2014-01-01'), T('2014-01-01 01'), 66, np.nan, 1],
[T('2014-01-01'), T('2014-01-01 00'), 67, 2, np.nan],
[T('2014-01-01'), T('2014-01-01 01'), 67, np.nan, np.nan],
[T('2014-01-02'), T('2014-01-02 00'), 65, np.nan, np.nan],
[T('2014-01-02'), T('2014-01-02 01'), 65, np.nan, 1],
[T('2014-01-02'), T('2014-01-02 00'), 66, np.nan, np.nan],
[T('2014-01-02'), T('2014-01-02 01'), 66, 2, np.nan],
[T('2014-01-02'), T('2014-01-02 00'), 67, 3, 3],
[T('2014-01-02'), T('2014-01-02 01'), 67, 3, 3],
[T('2014-01-03'), T('2014-01-03 00'), 65, 2, np.nan],
[T('2014-01-03'), T('2014-01-03 01'), 65, 2, np.nan],
[T('2014-01-03'), T('2014-01-03 00'), 66, 3, 3],
[T('2014-01-03'), T('2014-01-03 01'), 66, np.nan, np.nan],
[T('2014-01-03'), T('2014-01-03 00'), 67, np.nan, np.nan],
[T('2014-01-03'), T('2014-01-03 01'), 67, np.nan, 4],
],
)
fields = OrderedDict(self.dshape.measure.fields)
fields['other'] = fields['value']
with tmp_asset_finder() as finder:
expected = pd.DataFrame(
columns=['other', 'value'],
data=[
[1, 0], # 2014-01-01 Equity(65 [A])
[np.nan, 1], # Equity(66 [B])
[2, np.nan], # Equity(67 [C])
[1, 1], # 2014-01-02 Equity(65 [A])
[2, 1], # Equity(66 [B])
[3, 3], # Equity(67 [C])
[2, 1], # 2014-01-03 Equity(65 [A])
[3, 3], # Equity(66 [B])
[3, 3], # Equity(67 [C])
],
index=pd.MultiIndex.from_product(
(self.dates, finder.retrieve_all(self.sids)),
),
)
self._test_id(
df,
var * Record(fields),
expected,
finder,
('value', 'other'),
)
def test_id_take_last_in_group_macro(self):
"""
output (expected):
other value
2014-01-01 Equity(65 [A]) NaN 1
Equity(66 [B]) NaN 1
Equity(67 [C]) NaN 1
2014-01-02 Equity(65 [A]) 1 2
Equity(66 [B]) 1 2
Equity(67 [C]) 1 2
2014-01-03 Equity(65 [A]) 2 2
Equity(66 [B]) 2 2
Equity(67 [C]) 2 2
"""
T = pd.Timestamp
df = pd.DataFrame(
columns=['asof_date', 'timestamp', 'other', 'value'],
data=[
[T('2014-01-01'), T('2014-01-01 00'), np.nan, 1],
[T('2014-01-01'), T('2014-01-01 01'), np.nan, np.nan],
[T('2014-01-02'), T('2014-01-02 00'), 1, np.nan],
[T('2014-01-02'), T('2014-01-02 01'), np.nan, 2],
[T('2014-01-03'), T('2014-01-03 00'), 2, np.nan],
[T('2014-01-03'), T('2014-01-03 01'), 3, 3],
],
)
fields = OrderedDict(self.macro_dshape.measure.fields)
fields['other'] = fields['value']
with tmp_asset_finder() as finder:
expected = pd.DataFrame(
columns=[
'other', 'value',
],
data=[
[np.nan, 1], # 2014-01-01 Equity(65 [A])
[np.nan, 1], # Equity(66 [B])
[np.nan, 1], # Equity(67 [C])
[1, 2], # 2014-01-02 Equity(65 [A])
[1, 2], # Equity(66 [B])
[1, 2], # Equity(67 [C])
[2, 2], # 2014-01-03 Equity(65 [A])
[2, 2], # Equity(66 [B])
[2, 2], # Equity(67 [C])
],
index=pd.MultiIndex.from_product(
(self.dates, finder.retrieve_all(self.sids)),
),
)
self._test_id(
df,
var * Record(fields),
expected,
finder,
('value', 'other'),
)
def _run_pipeline(self,
expr,
+120 -66
View File
@@ -127,7 +127,7 @@ from __future__ import division, absolute_import
from abc import ABCMeta, abstractproperty
from collections import namedtuple, defaultdict
from copy import copy
from functools import partial
from functools import partial, reduce
from itertools import count
import warnings
from weakref import WeakKeyDictionary
@@ -188,7 +188,7 @@ traversable_nodes = (
bz.expr.Label,
)
is_invalid_deltas_node = complement(flip(isinstance, valid_deltas_node_types))
getname = op.attrgetter('__name__')
get__name__ = op.attrgetter('__name__')
class _ExprRepr(object):
@@ -523,8 +523,10 @@ def from_blaze(expr,
raise TypeError(
'expression with deltas may only contain (%s) nodes,'
" found: %s" % (
', '.join(map(getname, valid_deltas_node_types)),
', '.join(set(map(compose(getname, type), invalid_nodes))),
', '.join(map(get__name__, valid_deltas_node_types)),
', '.join(
set(map(compose(get__name__, type), invalid_nodes)),
),
),
)
@@ -602,7 +604,7 @@ def from_blaze(expr,
getdataset = op.attrgetter('dataset')
dataset_name = op.attrgetter('name')
getname = op.attrgetter('name')
def overwrite_novel_deltas(baseline, deltas, dates):
@@ -673,7 +675,7 @@ def overwrite_from_dates(asof, dense_dates, sparse_dates, asset_idx, value):
Then the overwrite will apply to indexes: 1, 2, 3, 4
"""
first_row = dense_dates.searchsorted(asof)
next_idx = sparse_dates.searchsorted(asof, 'right')
next_idx = sparse_dates.searchsorted(asof.asm8, 'right')
if next_idx == len(sparse_dates):
# There is no next date in the sparse, this overwrite should apply
# through the end of the dense dates.
@@ -690,8 +692,8 @@ def overwrite_from_dates(asof, dense_dates, sparse_dates, asset_idx, value):
yield Float64Overwrite(first_row, last_row, first, last, value)
def adjustments_from_deltas_no_sids(dates,
dense_dates,
def adjustments_from_deltas_no_sids(dense_dates,
sparse_dates,
column_idx,
column_name,
assets,
@@ -701,10 +703,10 @@ def adjustments_from_deltas_no_sids(dates,
Parameters
----------
dates : pd.DatetimeIndex
The dates requested by the loader.
dense_dates : pd.DatetimeIndex
The dates that were in the dense data.
The dates requested by the loader.
sparse_dates : pd.DatetimeIndex
The dates that were in the raw data.
column_idx : int
The index of the column in the dataset.
column_name : str
@@ -720,18 +722,18 @@ def adjustments_from_deltas_no_sids(dates,
ad_series = deltas[AD_FIELD_NAME]
asset_idx = 0, len(assets) - 1
return {
dates.get_loc(kd): overwrite_from_dates(
dense_dates.get_loc(kd): overwrite_from_dates(
ad_series.loc[kd],
dates,
dense_dates,
sparse_dates,
asset_idx,
v,
) for kd, v in deltas[column_name].iteritems()
}
def adjustments_from_deltas_with_sids(dates,
dense_dates,
def adjustments_from_deltas_with_sids(dense_dates,
sparse_dates,
column_idx,
column_name,
assets,
@@ -744,7 +746,7 @@ def adjustments_from_deltas_with_sids(dates,
dates : pd.DatetimeIndex
The dates requested by the loader.
dense_dates : pd.DatetimeIndex
The dates that were in the dense data.
The dates that were in the raw data.
column_idx : int
The index of the column in the dataset.
column_name : str
@@ -761,11 +763,11 @@ def adjustments_from_deltas_with_sids(dates,
adjustments = defaultdict(list)
for sid_idx, (sid, per_sid) in enumerate(deltas[column_name].iteritems()):
for kd, v in per_sid.iteritems():
adjustments[dates.searchsorted(kd)].extend(
adjustments[dense_dates.searchsorted(kd)].extend(
overwrite_from_dates(
ad_series.loc[kd, sid],
dates,
dense_dates,
sparse_dates,
(sid_idx, sid_idx),
v,
),
@@ -778,8 +780,10 @@ class BlazeLoader(dict):
Parameters
----------
colmap : mapping[BoundColumn -> tuple[Expr, Expr, any]], optional
The initial column mapping to use.
dsmap : mapping, optional
An initial mapping of datasets to ``ExprData`` objects.
NOTE: Further mutations to this map will not be reflected by this
object.
data_query_time : time, optional
The time to use for the data query cutoff.
data_query_tz : tzinfo or str
@@ -787,11 +791,10 @@ class BlazeLoader(dict):
"""
@preprocess(data_query_tz=optionally(ensure_timezone))
def __init__(self,
colmap=None,
dsmap=None,
data_query_time=None,
data_query_tz=None):
self.update(colmap or {})
self.update(dsmap or {})
check_data_query_args(data_query_time, data_query_tz)
self._data_query_time = data_query_time
self._data_query_tz = data_query_tz
@@ -826,8 +829,7 @@ class BlazeLoader(dict):
expr, deltas, resources = self[dataset]
have_sids = SID_FIELD_NAME in expr.fields
assets = list(map(int, assets)) # coerce from numpy.int64
fields = list(map(dataset_name, columns))
query_fields = fields + [AD_FIELD_NAME, TS_FIELD_NAME] + (
added_query_fields = [AD_FIELD_NAME, TS_FIELD_NAME] + (
[SID_FIELD_NAME] if have_sids else []
)
@@ -840,9 +842,47 @@ class BlazeLoader(dict):
data_query_tz,
)
def where(e):
def where(e, column):
"""Create the query to run against the resources.
Parameters
----------
e : Expr
The baseline or deltas expression.
column : BoundColumn
The column to query for.
Returns
-------
q : Expr
The query to run for the given column.
"""
colname = column.name
filtered = e[e[colname].notnull() & (e[TS_FIELD_NAME] <= lower_dt)]
lower = filtered.timestamp.max()
if have_sids:
# If we have sids, then we need to take the earliest of the
# greatest date that has a non-null value by sid.
lower = bz.by(
filtered[SID_FIELD_NAME],
timestamp=lower,
).timestamp.min()
lower = odo(lower, pd.Timestamp)
if lower is pd.NaT:
# If there is no lower date, just query for data in he date
# range. It must all be null anyways.
lower = lower_dt
return e[
(e[TS_FIELD_NAME] >= lower) &
(e[TS_FIELD_NAME] <= upper_dt)
][added_query_fields + [colname]]
def collect_expr(e, _kwargs={'d': resources} if resources else {}):
"""Execute and merge all of the per-column subqueries.
Parameters
----------
e : Expr
@@ -850,28 +890,29 @@ class BlazeLoader(dict):
Returns
-------
q : Expr
The query to run.
result : pd.DataFrame
The resulting dataframe.
Notes
-----
This can return more data than needed. The in memory reindex will
handle this.
"""
ts = e[TS_FIELD_NAME]
# Hack to get the lower bound to query:
# This must be strictly executed because the data for `ts` will
# be removed from scope too early otherwise.
lower = odo(ts[ts <= lower_dt].max(), pd.Timestamp)
selection = ts <= upper_dt
if have_sids:
selection &= e[SID_FIELD_NAME].isin(assets)
if lower is not pd.NaT:
selection &= ts >= lower
return reduce(
partial(pd.merge, on=added_query_fields, how='outer'),
(
odo(where(e, column), pd.DataFrame, **_kwargs)
for column in columns
),
).sort(TS_FIELD_NAME) # sort for the groupby later
return e[selection][query_fields]
extra_kwargs = {'d': resources} if resources else {}
materialized_expr = odo(where(expr), pd.DataFrame, **extra_kwargs)
materialized_expr = collect_expr(expr)
materialized_deltas = (
odo(where(deltas), pd.DataFrame, **extra_kwargs)
collect_expr(deltas)
if deltas is not None else
pd.DataFrame(columns=query_fields)
pd.DataFrame(
columns=added_query_fields + list(map(getname, columns)),
)
)
if data_query_time is not None:
@@ -898,25 +939,41 @@ class BlazeLoader(dict):
)
sparse_output.drop(AD_FIELD_NAME, axis=1, inplace=True)
def last_in_date_group(df, reindex, have_sids=have_sids):
idx = dates[dates.searchsorted(
df[TS_FIELD_NAME].values.astype('datetime64[D]')
)]
if have_sids:
idx = [idx, SID_FIELD_NAME]
last_in_group = df.drop(TS_FIELD_NAME, axis=1).groupby(
idx,
sort=False,
).last()
if have_sids:
last_in_group = last_in_group.unstack()
if reindex:
if have_sids:
cols = last_in_group.columns
last_in_group = last_in_group.reindex(
index=dates,
columns=pd.MultiIndex.from_product(
(cols.levels[0], assets),
names=cols.names,
),
)
else:
last_in_group = last_in_group.reindex(dates)
return last_in_group
sparse_deltas = last_in_date_group(non_novel_deltas, reindex=False)
dense_output = last_in_date_group(sparse_output, reindex=True)
dense_output.ffill(inplace=True)
if have_sids:
# Unstack by the sid so that we get a multi-index on the columns
# of datacolumn, sid.
sparse_output = sparse_output.set_index(
[TS_FIELD_NAME, SID_FIELD_NAME],
).unstack()
sparse_deltas = non_novel_deltas.set_index(
[TS_FIELD_NAME, SID_FIELD_NAME],
).unstack()
dense_output = sparse_output.reindex(dates, method='ffill')
cols = dense_output.columns
dense_output = dense_output.reindex(
columns=pd.MultiIndex.from_product(
(cols.levels[0], assets),
names=cols.names,
),
)
adjustments_from_deltas = adjustments_from_deltas_with_sids
column_view = identity
else:
@@ -932,9 +989,6 @@ class BlazeLoader(dict):
copy,
partial(repeat_last_axis, count=len(assets)),
)
sparse_output = sparse_output.set_index(TS_FIELD_NAME)
dense_output = sparse_output.reindex(dates, method='ffill')
sparse_deltas = non_novel_deltas.set_index(TS_FIELD_NAME)
adjustments_from_deltas = adjustments_from_deltas_no_sids
for column_idx, column in enumerate(columns):
@@ -946,7 +1000,7 @@ class BlazeLoader(dict):
mask,
adjustments_from_deltas(
dates,
sparse_output.index,
sparse_output[TS_FIELD_NAME].values,
column_idx,
column_name,
assets,