PERF: Avoid repeated recursive calls when getting forward-filled close

Instead of recursively calling `DailyHistoryAggregator.closes` until we
find a non-nan close, call `load_raw_arrays` once and take the most
recent non-nan value from the returned array.
This commit is contained in:
Andrew Daniels
2017-04-03 13:01:47 -04:00
parent 1212d2f0a1
commit f8eced41bf
2 changed files with 94 additions and 34 deletions
+75 -29
View File
@@ -30,6 +30,7 @@ from zipline.data.resample import (
ReindexSessionBarReader,
)
from zipline.testing import parameter_space
from zipline.testing.fixtures import (
WithEquityMinuteBarData,
WithBcolzEquityMinuteBarReader,
@@ -253,6 +254,7 @@ EXPECTED_SESSIONS = {
class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
WithBcolzFutureMinuteBarReader,
ZiplineTestCase):
# March 2016
@@ -269,7 +271,11 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
TRADING_ENV_MAX_DATE = END_DATE = pd.Timestamp(
'2016-03-31', tz='UTC',
)
TRADING_CALENDAR_STRS = ('NYSE', 'us_futures')
ASSET_FINDER_EQUITY_SIDS = 1, 2, 3, 4, 5
ASSET_FINDER_FUTURE_SIDS = 1001, 1002, 1003, 1004
@classmethod
def make_equity_info(cls):
@@ -285,47 +291,87 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
frame = EQUITY_CASES[sid]
yield sid, frame
@classmethod
def make_futures_info(cls):
    """Build the futures metadata frame, one row per future sid.

    Every sid in ``cls.ASSET_FINDER_FUTURE_SIDS`` gets the same
    ``multiplier``, ``exchange`` and ``root_symbol`` values. The returned
    DataFrame is indexed by sid.
    """
    info_by_sid = {
        sid: {
            'multiplier': 1000,
            'exchange': 'CME',
            'root_symbol': "ABC"
        }
        for sid in cls.ASSET_FINDER_FUTURE_SIDS
    }
    return pd.DataFrame.from_dict(info_by_sid, orient='index')
@classmethod
def make_future_minute_bar_data(cls):
    """Yield ``(sid, frame)`` pairs of minute-bar data for the futures.

    One pair is produced for each sid in ``cls.ASSET_FINDER_FUTURE_SIDS``,
    with the frame looked up in ``FUTURE_CASES`` by that sid.
    """
    for future_sid in cls.ASSET_FINDER_FUTURE_SIDS:
        yield future_sid, FUTURE_CASES[future_sid]
def init_instance_fixtures(self):
super(MinuteToDailyAggregationTestCase, self).init_instance_fixtures()
# Set up a fresh data portal for each test, since order of calling
# needs to be tested.
self.equity_daily_aggregator = DailyHistoryAggregator(
self.trading_calendar.schedule.market_open,
self.nyse_calendar.schedule.market_open,
self.bcolz_equity_minute_bar_reader,
self.trading_calendar
self.nyse_calendar,
)
@parameterized.expand([
('open_sid_1', 'open', 1),
('high_1', 'high', 1),
('low_1', 'low', 1),
('close_1', 'close', 1),
('volume_1', 'volume', 1),
('open_2', 'open', 2),
('high_2', 'high', 2),
('low_2', 'low', 2),
('close_2', 'close', 2),
('volume_2', 'volume', 2),
('open_3', 'open', 3),
('high_3', 'high', 3),
('low_3', 'low', 3),
('close_3', 'close', 3),
('volume_3', 'volume', 3),
('open_4', 'open', 4),
('high_4', 'high', 4),
('low_4', 'low', 4),
('close_4', 'close', 4),
('volume_4', 'volume', 4),
])
def test_contiguous_minutes_individual(self, name, field, sid):
self.future_daily_aggregator = DailyHistoryAggregator(
self.us_futures_calendar.schedule.market_open,
self.bcolz_future_minute_bar_reader,
self.us_futures_calendar
)
@parameter_space(
    field=OHLCV,
    sid=ASSET_FINDER_EQUITY_SIDS,
    __fail_fast=True,
)
def test_equity_contiguous_minutes_individual(self, field, sid):
    """Run the contiguous-minutes check for one equity sid and field.

    Resolves `sid` to its asset, takes the minute index of that asset's
    entry in ``EQUITY_CASES``, and delegates to the shared helper using
    the equity aggregator.
    """
    equity = self.asset_finder.retrieve_asset(sid)
    equity_minutes = EQUITY_CASES[equity].index
    self._test_contiguous_minutes_individual(
        field=field,
        asset=equity,
        minutes=equity_minutes,
        aggregator=self.equity_daily_aggregator,
    )
@parameter_space(
    field=OHLCV,
    sid=ASSET_FINDER_FUTURE_SIDS,
    __fail_fast=True,
)
def test_future_contiguous_minutes_individual(self, field, sid):
    """Run the contiguous-minutes check for one future sid and field.

    Resolves `sid` to its asset, takes the minute index of that asset's
    entry in ``FUTURE_CASES``, and delegates to the shared helper using
    the future aggregator.
    """
    future = self.asset_finder.retrieve_asset(sid)
    future_minutes = FUTURE_CASES[future].index
    self._test_contiguous_minutes_individual(
        field=field,
        asset=future,
        minutes=future_minutes,
        aggregator=self.future_daily_aggregator,
    )
def _test_contiguous_minutes_individual(
self,
field,
asset,
minutes,
aggregator,
):
# First test each minute in order.
method_name = field + 's'
results = []
repeat_results = []
asset = self.asset_finder.retrieve_asset(sid)
minutes = EQUITY_CASES[asset].index
for minute in minutes:
value = getattr(self.equity_daily_aggregator, method_name)(
value = getattr(aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
@@ -334,7 +380,7 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader,
# Call a second time with the same dt, to prevent regression
# against case where crossed start and end dts caused a crash
# instead of the last value.
value = getattr(self.equity_daily_aggregator, method_name)(
value = getattr(aggregator, method_name)(
[asset], minute)[0]
# Prevent regression on building an array when scalar is intended.
self.assertIsInstance(value, Real)
+19 -5
View File
@@ -384,6 +384,23 @@ class DailyHistoryAggregator(object):
closes = []
session_label = self._trading_calendar.minute_to_session_label(dt)
def _get_filled_close(asset):
    """
    Return the most recent non-nan close for the asset in this session.

    The 'close' values from `market_open` to `dt` are fetched with a
    single `load_raw_arrays` call, and the last value that is not nan is
    returned. If there has been no data in this session on or before the
    `dt`, returns `nan`.
    """
    window = self._minute_reader.load_raw_arrays(
        ['close'],
        market_open,
        dt,
        [asset],
    )[0]
    try:
        # Boolean-mask indexing drops the nan entries; the last remaining
        # value is the forward-filled close.
        return window[~np.isnan(window)][-1]
    except IndexError:
        # Nothing but nans (or no rows at all) in the window. Use
        # `np.nan`: the `np.NaN` alias was removed in NumPy 2.0.
        return np.nan
for asset in assets:
if not asset.is_alive_for_session(session_label):
closes.append(np.NaN)
@@ -412,9 +429,7 @@ class DailyHistoryAggregator(object):
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = self.closes(
[asset],
pd.Timestamp(prev_dt, tz='UTC'))[0]
val = _get_filled_close(asset)
entries[asset] = (dt_value, val)
closes.append(val)
continue
@@ -422,8 +437,7 @@ class DailyHistoryAggregator(object):
val = self._minute_reader.get_value(
asset, dt, 'close')
if pd.isnull(val):
val = self.closes([asset],
pd.Timestamp(prev_dt, tz='UTC'))[0]
val = _get_filled_close(asset)
entries[asset] = (dt_value, val)
closes.append(val)
continue