mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-28 20:05:52 +08:00
Merge pull request #1735 from quantopian/speedup-daily-history-aggregator-closes
PERF: Avoid repeated recursive calls when getting forward-filled close
This commit is contained in:
+75
-29
@@ -30,6 +30,7 @@ from zipline.data.resample import (
|
||||
ReindexSessionBarReader,
|
||||
)
|
||||
|
||||
from zipline.testing import parameter_space
|
||||
from zipline.testing.fixtures import (
|
||||
WithEquityMinuteBarData,
|
||||
WithBcolzEquityMinuteBarReader,
|
||||
@@ -253,6 +254,7 @@ EXPECTED_SESSIONS = {
|
||||
|
||||
|
||||
class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
|
||||
WithBcolzFutureMinuteBarReader,
|
||||
ZiplineTestCase):
|
||||
|
||||
# March 2016
|
||||
@@ -269,7 +271,11 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
|
||||
TRADING_ENV_MAX_DATE = END_DATE = pd.Timestamp(
|
||||
'2016-03-31', tz='UTC',
|
||||
)
|
||||
|
||||
TRADING_CALENDAR_STRS = ('NYSE', 'us_futures')
|
||||
|
||||
ASSET_FINDER_EQUITY_SIDS = 1, 2, 3, 4, 5
|
||||
ASSET_FINDER_FUTURE_SIDS = 1001, 1002, 1003, 1004
|
||||
|
||||
@classmethod
|
||||
def make_equity_info(cls):
|
||||
@@ -285,47 +291,87 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
|
||||
frame = EQUITY_CASES[sid]
|
||||
yield sid, frame
|
||||
|
||||
@classmethod
|
||||
def make_futures_info(cls):
|
||||
future_dict = {}
|
||||
|
||||
for future_sid in cls.ASSET_FINDER_FUTURE_SIDS:
|
||||
future_dict[future_sid] = {
|
||||
'multiplier': 1000,
|
||||
'exchange': 'CME',
|
||||
'root_symbol': "ABC"
|
||||
}
|
||||
|
||||
return pd.DataFrame.from_dict(future_dict, orient='index')
|
||||
|
||||
@classmethod
|
||||
def make_future_minute_bar_data(cls):
|
||||
for sid in cls.ASSET_FINDER_FUTURE_SIDS:
|
||||
frame = FUTURE_CASES[sid]
|
||||
yield sid, frame
|
||||
|
||||
def init_instance_fixtures(self):
|
||||
super(MinuteToDailyAggregationTestCase, self).init_instance_fixtures()
|
||||
# Set up a fresh data portal for each test, since order of calling
|
||||
# needs to be tested.
|
||||
self.equity_daily_aggregator = DailyHistoryAggregator(
|
||||
self.trading_calendar.schedule.market_open,
|
||||
self.nyse_calendar.schedule.market_open,
|
||||
self.bcolz_equity_minute_bar_reader,
|
||||
self.trading_calendar
|
||||
self.nyse_calendar,
|
||||
)
|
||||
|
||||
@parameterized.expand([
|
||||
('open_sid_1', 'open', 1),
|
||||
('high_1', 'high', 1),
|
||||
('low_1', 'low', 1),
|
||||
('close_1', 'close', 1),
|
||||
('volume_1', 'volume', 1),
|
||||
('open_2', 'open', 2),
|
||||
('high_2', 'high', 2),
|
||||
('low_2', 'low', 2),
|
||||
('close_2', 'close', 2),
|
||||
('volume_2', 'volume', 2),
|
||||
('open_3', 'open', 3),
|
||||
('high_3', 'high', 3),
|
||||
('low_3', 'low', 3),
|
||||
('close_3', 'close', 3),
|
||||
('volume_3', 'volume', 3),
|
||||
('open_4', 'open', 4),
|
||||
('high_4', 'high', 4),
|
||||
('low_4', 'low', 4),
|
||||
('close_4', 'close', 4),
|
||||
('volume_4', 'volume', 4),
|
||||
])
|
||||
def test_contiguous_minutes_individual(self, name, field, sid):
|
||||
self.future_daily_aggregator = DailyHistoryAggregator(
|
||||
self.us_futures_calendar.schedule.market_open,
|
||||
self.bcolz_future_minute_bar_reader,
|
||||
self.us_futures_calendar
|
||||
)
|
||||
|
||||
@parameter_space(
|
||||
field=OHLCV,
|
||||
sid=ASSET_FINDER_EQUITY_SIDS,
|
||||
__fail_fast=True,
|
||||
)
|
||||
def test_equity_contiguous_minutes_individual(self, field, sid):
|
||||
asset = self.asset_finder.retrieve_asset(sid)
|
||||
minutes = EQUITY_CASES[asset].index
|
||||
|
||||
self._test_contiguous_minutes_individual(
|
||||
field,
|
||||
asset,
|
||||
minutes,
|
||||
self.equity_daily_aggregator,
|
||||
)
|
||||
|
||||
@parameter_space(
|
||||
field=OHLCV,
|
||||
sid=ASSET_FINDER_FUTURE_SIDS,
|
||||
__fail_fast=True,
|
||||
)
|
||||
def test_future_contiguous_minutes_individual(self, field, sid):
|
||||
asset = self.asset_finder.retrieve_asset(sid)
|
||||
minutes = FUTURE_CASES[asset].index
|
||||
|
||||
self._test_contiguous_minutes_individual(
|
||||
field,
|
||||
asset,
|
||||
minutes,
|
||||
self.future_daily_aggregator,
|
||||
)
|
||||
|
||||
def _test_contiguous_minutes_individual(
|
||||
self,
|
||||
field,
|
||||
asset,
|
||||
minutes,
|
||||
aggregator,
|
||||
):
|
||||
# First test each minute in order.
|
||||
method_name = field + 's'
|
||||
results = []
|
||||
repeat_results = []
|
||||
asset = self.asset_finder.retrieve_asset(sid)
|
||||
minutes = EQUITY_CASES[asset].index
|
||||
|
||||
for minute in minutes:
|
||||
value = getattr(self.equity_daily_aggregator, method_name)(
|
||||
value = getattr(aggregator, method_name)(
|
||||
[asset], minute)[0]
|
||||
# Prevent regression on building an array when scalar is intended.
|
||||
self.assertIsInstance(value, Real)
|
||||
@@ -334,7 +380,7 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
|
||||
# Call a second time with the same dt, to prevent regression
|
||||
# against case where crossed start and end dts caused a crash
|
||||
# instead of the last value.
|
||||
value = getattr(self.equity_daily_aggregator, method_name)(
|
||||
value = getattr(aggregator, method_name)(
|
||||
[asset], minute)[0]
|
||||
# Prevent regression on building an array when scalar is intended.
|
||||
self.assertIsInstance(value, Real)
|
||||
|
||||
@@ -384,6 +384,23 @@ class DailyHistoryAggregator(object):
|
||||
closes = []
|
||||
session_label = self._trading_calendar.minute_to_session_label(dt)
|
||||
|
||||
def _get_filled_close(asset):
|
||||
"""
|
||||
Returns the most recent non-nan close for the asset in this
|
||||
session. If there has been no data in this session on or before the
|
||||
`dt`, returns `nan`
|
||||
"""
|
||||
window = self._minute_reader.load_raw_arrays(
|
||||
['close'],
|
||||
market_open,
|
||||
dt,
|
||||
[asset],
|
||||
)[0]
|
||||
try:
|
||||
return window[~np.isnan(window)][-1]
|
||||
except IndexError:
|
||||
return np.NaN
|
||||
|
||||
for asset in assets:
|
||||
if not asset.is_alive_for_session(session_label):
|
||||
closes.append(np.NaN)
|
||||
@@ -412,9 +429,7 @@ class DailyHistoryAggregator(object):
|
||||
val = self._minute_reader.get_value(
|
||||
asset, dt, 'close')
|
||||
if pd.isnull(val):
|
||||
val = self.closes(
|
||||
[asset],
|
||||
pd.Timestamp(prev_dt, tz='UTC'))[0]
|
||||
val = _get_filled_close(asset)
|
||||
entries[asset] = (dt_value, val)
|
||||
closes.append(val)
|
||||
continue
|
||||
@@ -422,8 +437,7 @@ class DailyHistoryAggregator(object):
|
||||
val = self._minute_reader.get_value(
|
||||
asset, dt, 'close')
|
||||
if pd.isnull(val):
|
||||
val = self.closes([asset],
|
||||
pd.Timestamp(prev_dt, tz='UTC'))[0]
|
||||
val = _get_filled_close(asset)
|
||||
entries[asset] = (dt_value, val)
|
||||
closes.append(val)
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user