mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-03 09:09:19 +08:00
ENH: preallocate the output dataframe
This commit is contained in:
@@ -2,8 +2,8 @@ from __future__ import division
|
||||
|
||||
from abc import ABCMeta, abstractproperty
|
||||
from collections import namedtuple
|
||||
from datetime import datetime
|
||||
from operator import attrgetter
|
||||
from weakref import WeakKeyDictionary
|
||||
|
||||
import blaze as bz
|
||||
from datashape import (
|
||||
@@ -19,7 +19,6 @@ from logbook import Logger
|
||||
from numpy.lib.stride_tricks import as_strided
|
||||
from odo import odo
|
||||
import pandas as pd
|
||||
from pytz import utc
|
||||
from toolz import flip, memoize, compose, complement, identity
|
||||
from six import with_metaclass
|
||||
|
||||
@@ -195,7 +194,7 @@ def _check_datetime_field(name, measure):
|
||||
"""
|
||||
if not isinstance(measure[name], (Date, DateTime)):
|
||||
raise TypeError(
|
||||
"'{n}' field must be a '{dt}', not: '{dshape}'".format(
|
||||
"'{name}' field must be a '{dt}', not: '{dshape}'".format(
|
||||
name=name,
|
||||
dt=DateTime(),
|
||||
dshape=measure[name],
|
||||
@@ -295,7 +294,7 @@ def pipeline_api_from_blaze(expr,
|
||||
if isrecord(expr.dshape.measure):
|
||||
break
|
||||
else:
|
||||
expr = bz.Data({single_column: col})
|
||||
expr = bz.Data(col, name=single_column)
|
||||
|
||||
deltas = _get_deltas(expr, deltas, no_deltas_rule)
|
||||
if deltas is not None:
|
||||
@@ -460,30 +459,12 @@ def adjustments_from_deltas(dates,
|
||||
}
|
||||
|
||||
|
||||
def to_datetime(dt64, factory=datetime.fromtimestamp, _ns_to_s=1000 ** 3):
|
||||
"""Convert a numpy datetime64 to a datetime object.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dt64 : datetime64
|
||||
The datetime64 to coerce.
|
||||
factory : callable, optional
|
||||
The function to coerce the timestamp as seconds into an object.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dt : datetime
|
||||
The dt64 coerced to a datetime.
|
||||
"""
|
||||
return factory(int(dt64) / _ns_to_s, tz=utc)
|
||||
|
||||
|
||||
class BlazeLoader(dict):
|
||||
def __init__(self, colmap=None):
|
||||
self.update(colmap or {})
|
||||
|
||||
@classmethod
|
||||
@memoize
|
||||
@memoize(cache=WeakKeyDictionary())
|
||||
def global_instance(cls):
|
||||
return cls()
|
||||
|
||||
@@ -518,11 +499,11 @@ class BlazeLoader(dict):
|
||||
# Hack to get the lower bound to query:
|
||||
# This must be strictly executed because the data for `ts` will
|
||||
# be removed from scope too early otherwise.
|
||||
lower = odo(ts[ts <= to_datetime(dates[0])].max(), pd.Timestamp)
|
||||
lower = odo(ts[ts <= dates[0]].max(), pd.Timestamp)
|
||||
return e[
|
||||
e[SID_FIELD_NAME].isin(assets) &
|
||||
(ts >= lower) &
|
||||
(ts < to_datetime(dates[-1]))
|
||||
((ts >= lower) if lower is not pd.NaT else True) &
|
||||
(ts <= dates[-1])
|
||||
][query_fields]
|
||||
|
||||
materialized_expr = odo(
|
||||
@@ -539,21 +520,41 @@ class BlazeLoader(dict):
|
||||
# Inline the deltas that changed our most recently known value.
|
||||
# Also, we reindex by the dates to create a dense representation of
|
||||
# the data.
|
||||
base = inline_novel_deltas(
|
||||
dense_output = inline_novel_deltas(
|
||||
materialized_expr,
|
||||
materialized_deltas,
|
||||
dates,
|
||||
).drop(AD_FIELD_NAME, axis=1).set_index(TS_FIELD_NAME).reindex(
|
||||
dates,
|
||||
method='ffill',
|
||||
)
|
||||
).drop(AD_FIELD_NAME, axis=1).set_index(TS_FIELD_NAME)
|
||||
|
||||
if have_sids:
|
||||
base.index.name = TS_FIELD_NAME
|
||||
# Unstack by the sid so that we get a multi-index on the columns
|
||||
# of datacolumn, sid.
|
||||
base = base.set_index(SID_FIELD_NAME, append=True).unstack()
|
||||
dense_output = dense_output.set_index(
|
||||
SID_FIELD_NAME,
|
||||
append=True,
|
||||
).unstack()
|
||||
|
||||
# Allocate the whole output dataframe at once instead of
|
||||
# reindexing.
|
||||
sparse_output = pd.DataFrame(
|
||||
columns=pd.MultiIndex.from_product(
|
||||
(dense_output.columns.levels[0], assets),
|
||||
names=(
|
||||
dense_output.columns.levels[0].name,
|
||||
SID_FIELD_NAME,
|
||||
),
|
||||
),
|
||||
index=dates,
|
||||
)
|
||||
|
||||
# In place update the output based on the base.
|
||||
sparse_output.update(dense_output)
|
||||
|
||||
column_view = identity
|
||||
else:
|
||||
# We use the column view to make an array per asset.
|
||||
sparse_output = dense_output.reindex(dates)
|
||||
|
||||
def column_view(arr, _shape=(len(dates), len(assets))):
|
||||
"""Return a virtual matrix where we make a view that
|
||||
duplicates a single column for all the assets.
|
||||
@@ -572,9 +573,15 @@ class BlazeLoader(dict):
|
||||
strides=(arr.itemsize, 0),
|
||||
)
|
||||
|
||||
# Walk forward the data after any symbol mapped or non-symbol mapped
|
||||
# specific transforms have been applied.
|
||||
sparse_output = sparse_output.ffill()
|
||||
|
||||
for column_idx, column in enumerate(columns):
|
||||
yield adjusted_array(
|
||||
column_view(base[column.name].values.astype(column.dtype)),
|
||||
column_view(
|
||||
sparse_output[column.name].values.astype(column.dtype),
|
||||
),
|
||||
mask,
|
||||
adjustments_from_deltas(
|
||||
dates,
|
||||
@@ -585,4 +592,10 @@ class BlazeLoader(dict):
|
||||
)
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return '%s(%s)' % (
|
||||
type(self).__name__,
|
||||
super(BlazeLoader, self).__repr__(),
|
||||
)
|
||||
|
||||
global_loader = BlazeLoader.global_instance()
|
||||
|
||||
Reference in New Issue
Block a user