From ce37ea64a9cec7d1a7c28a462cdad9d5ccb1c202 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Mon, 17 Oct 2016 09:39:38 -0400 Subject: [PATCH] ENH: Add adjusted history for continuous futures. Add `.adj('mul')` and `.adj('add')` methods on ContinuousFuture, which when used with `history`, will calculate and apply adjustments so that the values are adjusted to account for discounts and premiums during rolls. Example usage in an algo: ``` from zipline.api import continuous_future def initialize(context): context.cl_add = continuous_future('CL', offset=0, roll='calendar').adj('add') context.cl_mul = continuous_future('CL', offset=0, roll='calendar').adj('mul') context.cl = continuous_future('CL', offset=0, roll='calendar') schedule_function(print_history) def print_history(context, data): frame = data.history([context.cl, context.cl_add, context.cl_mul], ['price', 'sid'], 20, '1d') print 'unadjusted' print frame.loc[:, :, context.cl] print 'adjusted add' print frame.loc[:, :, context.cl_add] print 'adjusted mul' print frame.loc[:, :, context.cl_mul] ``` --- tests/test_continuous_futures.py | 189 +++++++++++++++++++++++ zipline/assets/assets.py | 53 ++++++- zipline/assets/continuous_futures.pyx | 23 ++- zipline/data/continuous_future_reader.py | 2 - zipline/data/data_portal.py | 8 +- zipline/data/history_loader.py | 158 +++++++++++++++++-- 6 files changed, 408 insertions(+), 25 deletions(-) diff --git a/tests/test_continuous_futures.py b/tests/test_continuous_futures.py index d04dcf39..ff300238 100644 --- a/tests/test_continuous_futures.py +++ b/tests/test_continuous_futures.py @@ -483,6 +483,132 @@ def record_current_contract(algo, data): 135441.440, err_msg="On session after roll, Should be FOJ16's 44th value.") + def test_history_close_session_adjusted(self): + cf = self.data_portal.asset_finder.create_continuous_future( + 'FO', 0, 'calendar') + cf_mul = self.data_portal.asset_finder.create_continuous_future( + 'FO', 0, 'calendar').adj('mul') + cf_add = self.data_portal.asset_finder.create_continuous_future( + 'FO', 0, 'calendar').adj('add') + window = self.data_portal.get_history_window( + [cf, cf_mul, cf_add], + Timestamp('2016-03-06', tz='UTC'), 30, '1d', 'close') + + # Unadjusted value is: 115011.44 + # Adjustment is based on hop from 115231.44 to 122240.001 + # a ratio of ~1.06 + assert_almost_equal( + window.loc['2016-01-26', cf_mul], + 122006.62, + err_msg="At beginning of window, should be FOG16's first value, " + "adjusted.") + + # Difference of 7008.561 + assert_almost_equal( + window.loc['2016-01-26', cf_add], + 122020.001, + err_msg="At beginning of window, should be FOG16's first value, " + "adjusted.") + + assert_almost_equal( + window.loc['2016-02-26', cf_mul], + 125241.440, + err_msg="On session with roll, should be FOH16's 24th value, " + "unadjusted.") + + assert_almost_equal( + window.loc['2016-02-26', cf_add], + 125241.440, + err_msg="On session with roll, should be FOH16's 24th value, " + "unadjusted.") + + assert_almost_equal( + window.loc['2016-02-29', cf_mul], + 125251.440, + err_msg="After roll, Should be FOH16's 25th value, unadjusted.") + + assert_almost_equal( + window.loc['2016-02-29', cf_add], + 125251.440, + err_msg="After roll, Should be FOH16's 25th value, unadjusted.") + + # Advance the window a month. + window = self.data_portal.get_history_window( + [cf, cf_mul, cf_add], + Timestamp('2016-04-06', tz='UTC'), 30, '1d', 'close') + + # Unadjusted value: 115221.44 + # Adjustments based on hops: + # 2016-02-25 00:00:00+00:00 115231.440 + # 2016-02-26 00:00:00+00:00 122240.001 + # ratio: ~1.061 + # difference: 7008.561 + # and + # 2016-03-23 00:00:00+00:00 125421.440 + # 2016-03-24 00:00:00+00:00 132430.001 + # ratio: ~1.056 + # difference: 7008.56 + assert_almost_equal( + window.loc['2016-02-24', cf_mul], + 129059.581, + err_msg="At beginning of window, should be FOG16's 22nd value, " + "with two adjustments.") + + assert_almost_equal( + window.loc['2016-02-24', cf_add], + 129238.561, + err_msg="At beginning of window, should be FOG16's 22nd value, " + "with two adjustments") + + # Unadjusted: 125241.44 + assert_almost_equal( + window.loc['2016-02-26', cf_mul], + 132239.942, + err_msg="On session with roll, should be FOH16's 24th value, " + "with one adjustment.") + + assert_almost_equal( + window.loc['2016-02-26', cf_add], + 132250.0, + err_msg="On session with roll, should be FOH16's 24th value, " + "with one adjustment.") + + # Unadjusted: 125251.44 + assert_almost_equal( + window.loc['2016-02-29', cf_mul], + 132250.500, + err_msg="On session after roll, should be FOH16's 25th value, " + "with one adjustment.") + + assert_almost_equal( + window.loc['2016-02-29', cf_add], + 132260.000, + err_msg="On session after roll, should be FOH16's 25th value, " + "unadjusted.") + + # Unadjusted: 135431.44 + assert_almost_equal( + window.loc['2016-03-24', cf_mul], + 135431.44, + err_msg="On session with roll, should be FOJ16's 43rd value, " + "unadjusted.") + + assert_almost_equal( + window.loc['2016-03-24', cf_add], + 135431.44, + err_msg="On session with roll, should be FOJ16's 43rd value.") + + # Unadjusted: 135441.44 + assert_almost_equal( + window.loc['2016-03-28', cf_mul], + 135441.44, + err_msg="On session after roll, Should be FOJ16's 44th value.") + + assert_almost_equal( + window.loc['2016-03-28', cf_add], + 135441.44, + err_msg="On session after roll, Should be FOJ16's 44th value.") + def test_history_close_minute(self): cf = self.data_portal.asset_finder.create_continuous_future( 'FO', 0, 'calendar') @@ -518,6 +644,69 @@ def record_current_contract(algo, data): 125250.001, "Should remain FOH16 on next session.") + def test_history_close_minute_adjusted(self): + cf = self.data_portal.asset_finder.create_continuous_future( + 'FO', 0, 'calendar') + cf_mul = self.data_portal.asset_finder.create_continuous_future( + 'FO', 0, 'calendar').adj('mul') + cf_add = self.data_portal.asset_finder.create_continuous_future( + 'FO', 0, 'calendar').adj('add') + window = self.data_portal.get_history_window( + [cf, cf_mul, cf_add], + Timestamp('2016-02-25 18:01', tz='US/Eastern').tz_convert('UTC'), + 30, '1m', 'close') + + # Unadjusted: 115231.412 + # Adjustment based on roll: + # 2016-02-25 23:00:00+00:00 115231.440 + # 2016-02-25 23:01:00+00:00 122240.001 + # Ratio: ~1.061 + # Difference: 7008.561 + self.assertEqual(window.loc['2016-02-25 22:32', cf_mul], + 122239.971, + "Should be FOG16 at beginning of window. A minute " + "which is in the 02-25 session, before the roll.") + + self.assertEqual(window.loc['2016-02-25 22:32', cf_add], + 122239.973, + "Should be FOG16 at beginning of window. A minute " + "which is in the 02-25 session, before the roll.") + + # Unadjusted: 115231.44 + # Should use same ratios as above. + self.assertEqual(window.loc['2016-02-25 23:00', cf_mul], + 122240.001, + "Should be FOG16 on on minute before roll minute, " + "adjusted.") + + self.assertEqual(window.loc['2016-02-25 23:00', cf_add], + 122240.001, + "Should be FOG16 on on minute before roll minute, " + "adjusted.") + + self.assertEqual(window.loc['2016-02-25 23:01', cf_mul], + 125240.001, + "Should be FOH16 on minute after roll, unadjusted.") + + self.assertEqual(window.loc['2016-02-25 23:01', cf_add], + 125240.001, + "Should be FOH16 on minute after roll, unadjusted.") + + # Advance the window a session. + window = self.data_portal.get_history_window( + [cf, cf_mul, cf_add], + Timestamp('2016-02-28 18:01', tz='US/Eastern').tz_convert('UTC'), + 30, '1m', 'close') + + # No adjustments in this window. + self.assertEqual(window.loc['2016-02-26 22:32', cf_mul], + 125241.412, + "Should be FOH16 at beginning of window.") + + self.assertEqual(window.loc['2016-02-28 23:01', cf_mul], + 125250.001, + "Should remain FOH16 on next session.") + class OrderedContractsTestCase(ZiplineTestCase): diff --git a/zipline/assets/assets.py b/zipline/assets/assets.py index d6e43e07..9dd928bd 100644 --- a/zipline/assets/assets.py +++ b/zipline/assets/assets.py @@ -132,15 +132,25 @@ CONTINUOUS_FUTURE_ROLL_STYLE_IDS = { 'calendar': 0 } +CONTINUOUS_FUTURE_ADJUSTMENT_STYLE_IDS = { + None: 0, + 'div': 1, + 'add': 2, +} -def _encode_continuous_future_sid(root_symbol, offset, roll_style): - s = struct.Struct("B 2B B B 3B") + +def _encode_continuous_future_sid(root_symbol, + offset, + roll_style, + adjustment_style): + s = struct.Struct("B 2B B B B 2B") # B - sid type # 2B - root symbol # B - offset (could be packed smaller since offsets of greater than 12 are # probably unneeded.) # B - roll type - # 3B - empty space left for parameterized roll types + # B - adjustment + # 2B - empty space left for parameterized roll types # The root symbol currently supports 2 characters. If 3 char root symbols # are needed, the size of the root symbol does not need to change, however @@ -153,7 +163,8 @@ def _encode_continuous_future_sid(root_symbol, offset, roll_style): rs[1], offset, CONTINUOUS_FUTURE_ROLL_STYLE_IDS[roll_style], - 0, 0, 0,) + CONTINUOUS_FUTURE_ADJUSTMENT_STYLE_IDS[adjustment_style], + 0, 0) s.pack_into(a, 0, *values) return int(binascii.hexlify(a), 16) @@ -908,15 +919,45 @@ class AssetFinder(object): end_date = self.retrieve_asset(oc.contract_sids[-1]).end_date exchange = self._get_root_symbol_exchange(root_symbol) - sid = _encode_continuous_future_sid(root_symbol, offset, roll_style) + sid = _encode_continuous_future_sid(root_symbol, offset, + roll_style, + None) + mul_sid = _encode_continuous_future_sid(root_symbol, offset, + roll_style, + 'div') + add_sid = _encode_continuous_future_sid(root_symbol, offset, + roll_style, + 'add') + mul_cf = ContinuousFuture(mul_sid, + root_symbol, + offset, + roll_style, + start_date, + end_date, + exchange, + 'mul') + add_cf = ContinuousFuture(add_sid, + root_symbol, + offset, + roll_style, + start_date, + end_date, + exchange, + 'add') cf = ContinuousFuture(sid, root_symbol, offset, roll_style, start_date, end_date, - exchange) + exchange, + adjustment_children={ + 'mul': mul_cf, + 'add': add_cf + }) self._asset_cache[cf.sid] = cf + self._asset_cache[add_cf.sid] = add_cf + self._asset_cache[mul_cf.sid] = mul_cf return cf def _make_sids(tblattr): diff --git a/zipline/assets/continuous_futures.pyx b/zipline/assets/continuous_futures.pyx index 8c83f1c5..528d1e27 100644 --- a/zipline/assets/continuous_futures.pyx +++ b/zipline/assets/continuous_futures.pyx @@ -65,6 +65,9 @@ cdef class ContinuousFuture: cdef readonly object exchange + cdef readonly object adjustment + cdef readonly object _adjustment_children + _kwargnames = frozenset({ 'sid', 'root_symbol', @@ -81,7 +84,9 @@ cdef class ContinuousFuture: object roll_style, object start_date, object end_date, - object exchange): + object exchange, + object adjustment=None, + dict adjustment_children=None): self.sid = sid self.sid_hash = hash(sid) @@ -91,6 +96,9 @@ cdef class ContinuousFuture: self.exchange = exchange self.start_date = start_date self.end_date = end_date + self.adjustment = adjustment + self._adjustment_children = adjustment_children + def __int__(self): return self.sid @@ -138,16 +146,17 @@ cdef class ContinuousFuture: raise AssertionError('%d is not an operator' % op) def __str__(self): - return '%s(%d [%s, %s, %s])' % ( + return '%s(%d [%s, %s, %s, %s])' % ( type(self).__name__, self.sid, self.root_symbol, self.offset, - self.roll_style + self.roll_style, + self.adjustment, ) def __repr__(self): - attrs = ('root_symbol', 'offset', 'roll_style') + attrs = ('root_symbol', 'offset', 'roll_style', 'adjustment') tuples = ((attr, repr(getattr(self, attr, None))) for attr in attrs) strings = ('%s=%s' % (t[0], t[1]) for t in tuples) @@ -226,6 +235,12 @@ cdef class ContinuousFuture: calendar = get_calendar(self.exchange) return calendar.is_open_on_minute(dt_minute) + def adj(self, style): + try: + return self._adjustment_children[style] + except KeyError: + return None + cdef class OrderedContracts(object): """ diff --git a/zipline/data/continuous_future_reader.py b/zipline/data/continuous_future_reader.py index 0c960f28..ed84f79c 100644 --- a/zipline/data/continuous_future_reader.py +++ b/zipline/data/continuous_future_reader.py @@ -229,8 +229,6 @@ class ContinuousFutureMinuteBarReader(SessionBarReader): # Get partitions partitions_by_asset = {} for asset in assets: - rolls_by_asset[asset] = rf.get_rolls( - asset.root_symbol, start_date, end_date, asset.offset) partitions = [] partitions_by_asset[asset] = partitions rolls = rolls_by_asset[asset] diff --git a/zipline/data/data_portal.py b/zipline/data/data_portal.py index f4f92931..ad2a870d 100644 --- a/zipline/data/data_portal.py +++ b/zipline/data/data_portal.py @@ -229,12 +229,16 @@ class DataPortal(object): self._history_loader = DailyHistoryLoader( self.trading_calendar, _dispatch_session_reader, - self._adjustment_reader + self._adjustment_reader, + self.asset_finder, + self._roll_finders, ) self._minute_history_loader = MinuteHistoryLoader( self.trading_calendar, _dispatch_minute_reader, - self._adjustment_reader + self._adjustment_reader, + self.asset_finder, + self._roll_finders, ) self._first_trading_day = first_trading_day diff --git a/zipline/data/history_loader.py b/zipline/data/history_loader.py index 7867b075..4ae6ebb1 100644 --- a/zipline/data/history_loader.py +++ b/zipline/data/history_loader.py @@ -17,16 +17,20 @@ from abc import ( abstractmethod, abstractproperty, ) -from lru import LRU +from lru import LRU from numpy import around, hstack +from pandas import isnull from pandas.tslib import normalize_date +from toolz import sliding_window from six import with_metaclass +from zipline.assets import Equity +from zipline.assets.continuous_futures import ContinuousFuture from zipline.lib._int64window import AdjustedArrayWindow as Int64Window from zipline.lib._float64window import AdjustedArrayWindow as Float64Window -from zipline.lib.adjustment import Float64Multiply +from zipline.lib.adjustment import Float64Multiply, Float64Add from zipline.utils.cache import ExpiringCache from zipline.utils.memoize import lazyval from zipline.utils.numpy_utils import float64_dtype @@ -143,6 +147,113 @@ class HistoryCompatibleUSEquityAdjustmentReader(object): return adjs +class ContinuousFutureAdjustmentReader(object): + """ + Calculates adjustments for continuous futures, based on the + close and open of the contracts on the either side of each roll. + """ + + def __init__(self, + trading_calendar, + asset_finder, + bar_reader, + roll_finders, + frequency): + self._trading_calendar = trading_calendar + self._asset_finder = asset_finder + self._bar_reader = bar_reader + self._roll_finders = roll_finders + self._frequency = frequency + + def load_adjustments(self, columns, dts, assets): + """ + Returns + ------- + adjustments : list[dict[int -> Adjustment]] + A list, where each element corresponds to the `columns`, of + mappings from index to adjustment objects to apply at that index. + """ + out = [None] * len(columns) + for i, column in enumerate(columns): + adjs = {} + for asset in assets: + adjs.update(self._get_adjustments_in_range( + asset, dts, column)) + out[i] = adjs + return out + + def _make_adjustment(self, + adjustment_type, + left_close, + right_open, + end_loc): + adj_base = left_close - right_open + if adjustment_type == 'mul': + adj_value = 1.0 - adj_base / left_close + adj_class = Float64Multiply + elif adjustment_type == 'add': + adj_value = -adj_base + adj_class = Float64Add + return adj_class(0, + end_loc, + 0, + 0, + adj_value) + + def _get_adjustments_in_range(self, cf, dts, field): + if field == 'volume' or field == 'sid': + return {} + if cf.adjustment is None: + return {} + rf = self._roll_finders[cf.roll_style] + partitions = [] + + rolls = rf.get_rolls(cf.root_symbol, dts[0], dts[-1], + cf.offset) + + tc = self._trading_calendar + + adjs = {} + + for left, right in sliding_window(2, rolls): + left_sid, right_dt = left + right_sid = right[0] + left_dt = tc.previous_session_label(right_dt) + if self._frequency == 'minute': + _, left_dt = tc.open_and_close_for_session(left_dt) + right_dt, _ = tc.open_and_close_for_session(right_dt) + partitions.append((left_sid, + right_sid, + left_dt, + right_dt)) + + for partition in partitions: + left_sid, right_sid, left_dt, right_dt = partition + last_left_dt = self._bar_reader.get_last_traded_dt( + self._asset_finder.retrieve_asset(left_sid), + left_dt) + last_right_dt = self._bar_reader.get_last_traded_dt( + self._asset_finder.retrieve_asset(right_sid), + right_dt) + if isnull(last_left_dt) or isnull(last_right_dt): + continue + left_close = self._bar_reader.get_value( + left_sid, last_left_dt, 'close') + right_open = self._bar_reader.get_value( + right_sid, last_right_dt, 'open') + adj_loc = dts.searchsorted(right_dt) + end_loc = adj_loc - 1 + adj = self._make_adjustment(cf.adjustment, + left_close, + right_open, + end_loc) + try: + adjs[adj_loc].append(adj) + except KeyError: + adjs[adj_loc] = [adj] + return adjs + + class SlidingWindow(object): """ Wrapper around an AdjustedArrayWindow which supports monotonically @@ -196,20 +307,34 @@ class HistoryLoader(with_metaclass(ABCMeta)): """ FIELDS = ('open', 'high', 'low', 'close', 'volume', 'sid') - def __init__(self, trading_calendar, reader, adjustment_reader, + def __init__(self, trading_calendar, reader, equity_adjustment_reader, + asset_finder, + roll_finders=None, sid_cache_size=1000): self.trading_calendar = trading_calendar + self._asset_finder = asset_finder self._reader = reader - if adjustment_reader is not None: - self._adjustments_reader = \ - HistoryCompatibleUSEquityAdjustmentReader(adjustment_reader) - else: - self._adjustments_reader = None + self._adjustment_readers = {} + if equity_adjustment_reader is not None: + self._adjustment_readers[Equity] = \ + HistoryCompatibleUSEquityAdjustmentReader( + equity_adjustment_reader) + if roll_finders: + self._adjustment_readers[ContinuousFuture] =\ + ContinuousFutureAdjustmentReader(trading_calendar, + asset_finder, + reader, + roll_finders, + self._frequency) self._window_blocks = { field: ExpiringCache(LRU(sid_cache_size)) for field in self.FIELDS } + @abstractproperty + def _frequency(self): + pass + @abstractproperty def _prefetch_length(self): pass @@ -257,6 +382,8 @@ class HistoryLoader(with_metaclass(ABCMeta)): asset_windows = {} needed_assets = [] + assets = self._asset_finder.retrieve_all(assets) + for asset in assets: try: asset_windows[asset] = self._window_blocks[field].get( @@ -288,10 +415,11 @@ class HistoryLoader(with_metaclass(ABCMeta)): array = array.astype(float64_dtype) for i, asset in enumerate(needed_assets): - if self._adjustments_reader: - adjs = self._adjustments_reader.load_adjustments( + try: + adj_reader = self._adjustment_readers[type(asset)] + adjs = adj_reader.load_adjustments( [field], prefetch_dts, [asset])[0] - else: + except KeyError: adjs = {} window = window_type( array[:, i].reshape(prefetch_len, 1), @@ -395,6 +523,10 @@ class HistoryLoader(with_metaclass(ABCMeta)): class DailyHistoryLoader(HistoryLoader): + @property + def _frequency(self): + return 'daily' + @property def _prefetch_length(self): return 40 @@ -414,6 +546,10 @@ class DailyHistoryLoader(HistoryLoader): class MinuteHistoryLoader(HistoryLoader): + @property + def _frequency(self): + return 'minute' + @property def _prefetch_length(self): return 1560