From f8eced41bf6b5e62e85eecf721cbd2a30fe7585f Mon Sep 17 00:00:00 2001 From: Andrew Daniels Date: Mon, 3 Apr 2017 13:01:47 -0400 Subject: [PATCH] PERF: Avoid repeated recursive calls when getting forward-filled close Instead of recursively calling `DailyHistoryAggregator.closes` until we find a non-nan close, we can instead call `load_raw_arrays` once, and find the value from the returned array. --- tests/data/test_resample.py | 104 ++++++++++++++++++++++++++---------- zipline/data/resample.py | 24 +++++++-- 2 files changed, 94 insertions(+), 34 deletions(-) diff --git a/tests/data/test_resample.py b/tests/data/test_resample.py index d9639323..53c5493a 100644 --- a/tests/data/test_resample.py +++ b/tests/data/test_resample.py @@ -30,6 +30,7 @@ from zipline.data.resample import ( ReindexSessionBarReader, ) +from zipline.testing import parameter_space from zipline.testing.fixtures import ( WithEquityMinuteBarData, WithBcolzEquityMinuteBarReader, @@ -253,6 +254,7 @@ EXPECTED_SESSIONS = { class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader, + WithBcolzFutureMinuteBarReader, ZiplineTestCase): # March 2016 @@ -269,7 +271,11 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader, TRADING_ENV_MAX_DATE = END_DATE = pd.Timestamp( '2016-03-31', tz='UTC', ) + + TRADING_CALENDAR_STRS = ('NYSE', 'us_futures') + ASSET_FINDER_EQUITY_SIDS = 1, 2, 3, 4, 5 + ASSET_FINDER_FUTURE_SIDS = 1001, 1002, 1003, 1004 @classmethod def make_equity_info(cls): @@ -285,47 +291,87 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader, frame = EQUITY_CASES[sid] yield sid, frame + @classmethod + def make_futures_info(cls): + future_dict = {} + + for future_sid in cls.ASSET_FINDER_FUTURE_SIDS: + future_dict[future_sid] = { + 'multiplier': 1000, + 'exchange': 'CME', + 'root_symbol': "ABC" + } + + return pd.DataFrame.from_dict(future_dict, orient='index') + + @classmethod + def make_future_minute_bar_data(cls): + for sid in cls.ASSET_FINDER_FUTURE_SIDS: + frame = FUTURE_CASES[sid] + yield sid, frame + def init_instance_fixtures(self): super(MinuteToDailyAggregationTestCase, self).init_instance_fixtures() # Set up a fresh data portal for each test, since order of calling # needs to be tested. self.equity_daily_aggregator = DailyHistoryAggregator( - self.trading_calendar.schedule.market_open, + self.nyse_calendar.schedule.market_open, self.bcolz_equity_minute_bar_reader, - self.trading_calendar + self.nyse_calendar, ) - @parameterized.expand([ - ('open_sid_1', 'open', 1), - ('high_1', 'high', 1), - ('low_1', 'low', 1), - ('close_1', 'close', 1), - ('volume_1', 'volume', 1), - ('open_2', 'open', 2), - ('high_2', 'high', 2), - ('low_2', 'low', 2), - ('close_2', 'close', 2), - ('volume_2', 'volume', 2), - ('open_3', 'open', 3), - ('high_3', 'high', 3), - ('low_3', 'low', 3), - ('close_3', 'close', 3), - ('volume_3', 'volume', 3), - ('open_4', 'open', 4), - ('high_4', 'high', 4), - ('low_4', 'low', 4), - ('close_4', 'close', 4), - ('volume_4', 'volume', 4), - ]) - def test_contiguous_minutes_individual(self, name, field, sid): + self.future_daily_aggregator = DailyHistoryAggregator( + self.us_futures_calendar.schedule.market_open, + self.bcolz_future_minute_bar_reader, + self.us_futures_calendar + ) + + @parameter_space( + field=OHLCV, + sid=ASSET_FINDER_EQUITY_SIDS, + __fail_fast=True, + ) + def test_equity_contiguous_minutes_individual(self, field, sid): + asset = self.asset_finder.retrieve_asset(sid) + minutes = EQUITY_CASES[asset].index + + self._test_contiguous_minutes_individual( + field, + asset, + minutes, + self.equity_daily_aggregator, + ) + + @parameter_space( + field=OHLCV, + sid=ASSET_FINDER_FUTURE_SIDS, + __fail_fast=True, + ) + def test_future_contiguous_minutes_individual(self, field, sid): + asset = self.asset_finder.retrieve_asset(sid) + minutes = FUTURE_CASES[asset].index + + self._test_contiguous_minutes_individual( + field, + asset, + minutes, + self.future_daily_aggregator, + ) + + def _test_contiguous_minutes_individual( + self, + field, + asset, + minutes, + aggregator, + ): # First test each minute in order. method_name = field + 's' results = [] repeat_results = [] - asset = self.asset_finder.retrieve_asset(sid) - minutes = EQUITY_CASES[asset].index + for minute in minutes: - value = getattr(self.equity_daily_aggregator, method_name)( + value = getattr(aggregator, method_name)( [asset], minute)[0] # Prevent regression on building an array when scalar is intended. self.assertIsInstance(value, Real) @@ -334,7 +380,7 @@ class MinuteToDailyAggregationTestCase(WithBcolzEquityMinuteBarReader, # Call a second time with the same dt, to prevent regression # against case where crossed start and end dts caused a crash # instead of the last value. - value = getattr(self.equity_daily_aggregator, method_name)( + value = getattr(aggregator, method_name)( [asset], minute)[0] # Prevent regression on building an array when scalar is intended. self.assertIsInstance(value, Real) diff --git a/zipline/data/resample.py b/zipline/data/resample.py index 538a0c8e..5fe3bdf6 100644 --- a/zipline/data/resample.py +++ b/zipline/data/resample.py @@ -384,6 +384,23 @@ class DailyHistoryAggregator(object): closes = [] session_label = self._trading_calendar.minute_to_session_label(dt) + def _get_filled_close(asset): + """ + Returns the most recent non-nan close for the asset in this + session. If there has been no data in this session on or before the + `dt`, returns `nan` + """ + window = self._minute_reader.load_raw_arrays( + ['close'], + market_open, + dt, + [asset], + )[0] + try: + return window[~np.isnan(window)][-1] + except IndexError: + return np.NaN + for asset in assets: if not asset.is_alive_for_session(session_label): closes.append(np.NaN) @@ -412,9 +429,7 @@ class DailyHistoryAggregator(object): val = self._minute_reader.get_value( asset, dt, 'close') if pd.isnull(val): - val = self.closes( - [asset], - pd.Timestamp(prev_dt, tz='UTC'))[0] + val = _get_filled_close(asset) entries[asset] = (dt_value, val) closes.append(val) continue @@ -422,8 +437,7 @@ class DailyHistoryAggregator(object): val = self._minute_reader.get_value( asset, dt, 'close') if pd.isnull(val): - val = self.closes([asset], - pd.Timestamp(prev_dt, tz='UTC'))[0] + val = _get_filled_close(asset) entries[asset] = (dt_value, val) closes.append(val) continue