mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-03 21:11:55 +08:00
BUG: Group and aggregate downsampling by trading day.
Otherwise, out-of-market events would result in a misshapen panel.
This commit is contained in:
@@ -49,7 +49,7 @@ def get_sample_func(item):
|
||||
return 'last'
|
||||
|
||||
|
||||
def downsample_panel(minute_rp, daily_rp, dt):
|
||||
def downsample_panel(minute_rp, daily_rp, mkt_close):
|
||||
"""
|
||||
@minute_rp is a rolling panel, which should have minutely rows
|
||||
@daily_rp is a rolling panel, which should have daily rows
|
||||
@@ -63,14 +63,27 @@ def downsample_panel(minute_rp, daily_rp, dt):
|
||||
cur_panel = minute_rp.get_current()
|
||||
sids = minute_rp.minor_axis
|
||||
day_frame = pd.DataFrame(columns=sids, index=cur_panel.items)
|
||||
dt1 = trading.environment.normalize_date(mkt_close)
|
||||
dt2 = trading.environment.next_trading_day(mkt_close)
|
||||
by_close = functools.partial(get_date, mkt_close, dt1, dt2)
|
||||
for item in minute_rp.items:
|
||||
frame = cur_panel[item]
|
||||
func = get_sample_func(item)
|
||||
dframe = frame.groupby(lambda d: d.date()).resample('1d', how=func)
|
||||
# group by trading day, using the market close of the current
|
||||
# day. If events occurred after the last close (yesterday) but
|
||||
# before today's close, group them into today.
|
||||
dframe = frame.groupby(lambda d: by_close(d)).agg(func)
|
||||
for stock in sids:
|
||||
day_frame[stock][item] = dframe[stock][dframe.index[-1][0]]
|
||||
day_frame[stock][item] = dframe[stock].ix[dt1]
|
||||
# store the frame at midnight instead of the close
|
||||
daily_rp.add_frame(trading.environment.normalize_date(dt), day_frame)
|
||||
daily_rp.add_frame(dt1, day_frame)
|
||||
|
||||
|
||||
def get_date(mkt_close, d1, d2, d):
|
||||
if d > mkt_close:
|
||||
return d2
|
||||
else:
|
||||
return d1
|
||||
|
||||
|
||||
class BatchTransform(object):
|
||||
@@ -276,24 +289,25 @@ class BatchTransform(object):
|
||||
if self.rolling_panel is None:
|
||||
self._init_panels(sids)
|
||||
|
||||
# update trading day counters
|
||||
_, mkt_close = trading.environment.get_open_and_close(event.dt)
|
||||
if self.bars == 'daily':
|
||||
# Daily bars have their dt set to midnight.
|
||||
mkt_close = trading.environment.normalize_date(mkt_close)
|
||||
if event.dt >= mkt_close:
|
||||
if self.downsample:
|
||||
downsample_panel(self.rolling_panel,
|
||||
self.daily_rolling_panel,
|
||||
mkt_close)
|
||||
self.trading_days_total += 1
|
||||
|
||||
# Store event in rolling frame
|
||||
self.rolling_panel.add_frame(event.dt,
|
||||
pd.DataFrame(event.data,
|
||||
index=self.field_names,
|
||||
columns=sids))
|
||||
|
||||
# update trading day counters
|
||||
_, mkt_close = trading.environment.get_open_and_close(event.dt)
|
||||
if self.bars == 'daily':
|
||||
# Daily bars have their dt set to midnight.
|
||||
mkt_close = trading.environment.normalize_date(mkt_close)
|
||||
if event.dt == mkt_close:
|
||||
if self.downsample:
|
||||
downsample_panel(self.rolling_panel,
|
||||
self.daily_rolling_panel,
|
||||
mkt_close
|
||||
)
|
||||
self.trading_days_total += 1
|
||||
|
||||
self.last_dt = event.dt
|
||||
|
||||
if self.trading_days_total >= self.window_length:
|
||||
|
||||
Reference in New Issue
Block a user