ENH: Add adjusted history for continuous futures.

Add `.adj('mul')` and `.adj('add')` methods on ContinuousFuture, which
when used with `history`, will calculate and apply adjustments so that
the values are adjusted to account for discounts and premiums during
rolls.

Example usage in an algo:

```
from zipline.api import continuous_future

def initialize(context):
    context.cl_add = continuous_future('CL', offset=0, roll='calendar').adj('add')
    context.cl_mul = continuous_future('CL', offset=0, roll='calendar').adj('mul')
    context.cl = continuous_future('CL', offset=0, roll='calendar')
    schedule_function(print_history)

def print_history(context, data):
    frame = data.history([context.cl, context.cl_add, context.cl_mul],
                         ['price', 'sid'],
                         20,
                         '1d')
    print 'unadjusted'
    print frame.loc[:, :, context.cl]
    print 'adjusted add'
    print frame.loc[:, :, context.cl_add]
    print 'adjusted mul'
    print frame.loc[:, :, context.cl_mul]
```
This commit is contained in:
Eddie Hebert
2016-10-17 09:39:38 -04:00
parent bf7fdd9685
commit ce37ea64a9
6 changed files with 408 additions and 25 deletions
+189
View File
@@ -483,6 +483,132 @@ def record_current_contract(algo, data):
135441.440,
err_msg="On session after roll, Should be FOJ16's 44th value.")
def test_history_close_session_adjusted(self):
cf = self.data_portal.asset_finder.create_continuous_future(
'FO', 0, 'calendar')
cf_mul = self.data_portal.asset_finder.create_continuous_future(
'FO', 0, 'calendar').adj('mul')
cf_add = self.data_portal.asset_finder.create_continuous_future(
'FO', 0, 'calendar').adj('add')
window = self.data_portal.get_history_window(
[cf, cf_mul, cf_add],
Timestamp('2016-03-06', tz='UTC'), 30, '1d', 'close')
# Unadjusted value is: 115011.44
# Adjustment is based on hop from 115231.44 to 122240.001
# a ratio of ~1.06
assert_almost_equal(
window.loc['2016-01-26', cf_mul],
122006.62,
err_msg="At beginning of window, should be FOG16's first value, "
"adjusted.")
# Difference of 7008.561
assert_almost_equal(
window.loc['2016-01-26', cf_add],
122020.001,
err_msg="At beginning of window, should be FOG16's first value, "
"adjusted.")
assert_almost_equal(
window.loc['2016-02-26', cf_mul],
125241.440,
err_msg="On session with roll, should be FOH16's 24th value, "
"unadjusted.")
assert_almost_equal(
window.loc['2016-02-26', cf_add],
125241.440,
err_msg="On session with roll, should be FOH16's 24th value, "
"unadjusted.")
assert_almost_equal(
window.loc['2016-02-29', cf_mul],
125251.440,
err_msg="After roll, Should be FOH16's 25th value, unadjusted.")
assert_almost_equal(
window.loc['2016-02-29', cf_add],
125251.440,
err_msg="After roll, Should be FOH16's 25th value, unadjusted.")
# Advance the window a month.
window = self.data_portal.get_history_window(
[cf, cf_mul, cf_add],
Timestamp('2016-04-06', tz='UTC'), 30, '1d', 'close')
# Unadjusted value: 115221.44
# Adjustments based on hops:
# 2016-02-25 00:00:00+00:00 115231.440
# 2016-02-26 00:00:00+00:00 122240.001
# ratio: ~1.061
# difference: 7008.561
# and
# 2016-03-23 00:00:00+00:00 125421.440
# 2016-03-24 00:00:00+00:00 132430.001
# ratio: ~1.056
# difference: 7008.56
assert_almost_equal(
window.loc['2016-02-24', cf_mul],
129059.581,
err_msg="At beginning of window, should be FOG16's 22nd value, "
"with two adjustments.")
assert_almost_equal(
window.loc['2016-02-24', cf_add],
129238.561,
err_msg="At beginning of window, should be FOG16's 22nd value, "
"with two adjustments")
# Unadjusted: 125241.44
assert_almost_equal(
window.loc['2016-02-26', cf_mul],
132239.942,
err_msg="On session with roll, should be FOH16's 24th value, "
"with one adjustment.")
assert_almost_equal(
window.loc['2016-02-26', cf_add],
132250.0,
err_msg="On session with roll, should be FOH16's 24th value, "
"with one adjustment.")
# Unadjusted: 125251.44
assert_almost_equal(
window.loc['2016-02-29', cf_mul],
132250.500,
err_msg="On session after roll, should be FOH16's 25th value, "
"with one adjustment.")
assert_almost_equal(
window.loc['2016-02-29', cf_add],
132260.000,
err_msg="On session after roll, should be FOH16's 25th value, "
"unadjusted.")
# Unadjusted: 135431.44
assert_almost_equal(
window.loc['2016-03-24', cf_mul],
135431.44,
err_msg="On session with roll, should be FOJ16's 43rd value, "
"unadjusted.")
assert_almost_equal(
window.loc['2016-03-24', cf_add],
135431.44,
err_msg="On session with roll, should be FOJ16's 43rd value.")
# Unadjusted: 135441.44
assert_almost_equal(
window.loc['2016-03-28', cf_mul],
135441.44,
err_msg="On session after roll, Should be FOJ16's 44th value.")
assert_almost_equal(
window.loc['2016-03-28', cf_add],
135441.44,
err_msg="On session after roll, Should be FOJ16's 44th value.")
def test_history_close_minute(self):
cf = self.data_portal.asset_finder.create_continuous_future(
'FO', 0, 'calendar')
@@ -518,6 +644,69 @@ def record_current_contract(algo, data):
125250.001,
"Should remain FOH16 on next session.")
def test_history_close_minute_adjusted(self):
cf = self.data_portal.asset_finder.create_continuous_future(
'FO', 0, 'calendar')
cf_mul = self.data_portal.asset_finder.create_continuous_future(
'FO', 0, 'calendar').adj('mul')
cf_add = self.data_portal.asset_finder.create_continuous_future(
'FO', 0, 'calendar').adj('add')
window = self.data_portal.get_history_window(
[cf, cf_mul, cf_add],
Timestamp('2016-02-25 18:01', tz='US/Eastern').tz_convert('UTC'),
30, '1m', 'close')
# Unadjusted: 115231.412
# Adjustment based on roll:
# 2016-02-25 23:00:00+00:00 115231.440
# 2016-02-25 23:01:00+00:00 122240.001
# Ratio: ~1.061
# Difference: 7008.561
self.assertEqual(window.loc['2016-02-25 22:32', cf_mul],
122239.971,
"Should be FOG16 at beginning of window. A minute "
"which is in the 02-25 session, before the roll.")
self.assertEqual(window.loc['2016-02-25 22:32', cf_add],
122239.973,
"Should be FOG16 at beginning of window. A minute "
"which is in the 02-25 session, before the roll.")
# Unadjusted: 115231.44
# Should use same ratios as above.
self.assertEqual(window.loc['2016-02-25 23:00', cf_mul],
122240.001,
"Should be FOG16 on on minute before roll minute, "
"adjusted.")
self.assertEqual(window.loc['2016-02-25 23:00', cf_add],
122240.001,
"Should be FOG16 on on minute before roll minute, "
"adjusted.")
self.assertEqual(window.loc['2016-02-25 23:01', cf_mul],
125240.001,
"Should be FOH16 on minute after roll, unadjusted.")
self.assertEqual(window.loc['2016-02-25 23:01', cf_add],
125240.001,
"Should be FOH16 on minute after roll, unadjusted.")
# Advance the window a session.
window = self.data_portal.get_history_window(
[cf, cf_mul, cf_add],
Timestamp('2016-02-28 18:01', tz='US/Eastern').tz_convert('UTC'),
30, '1m', 'close')
# No adjustments in this window.
self.assertEqual(window.loc['2016-02-26 22:32', cf_mul],
125241.412,
"Should be FOH16 at beginning of window.")
self.assertEqual(window.loc['2016-02-28 23:01', cf_mul],
125250.001,
"Should remain FOH16 on next session.")
class OrderedContractsTestCase(ZiplineTestCase):
+47 -6
View File
@@ -132,15 +132,25 @@ CONTINUOUS_FUTURE_ROLL_STYLE_IDS = {
'calendar': 0
}
CONTINUOUS_FUTURE_ADJUSTMENT_STYLE_IDS = {
None: 0,
'div': 1,
'add': 2,
}
def _encode_continuous_future_sid(root_symbol, offset, roll_style):
s = struct.Struct("B 2B B B 3B")
def _encode_continuous_future_sid(root_symbol,
offset,
roll_style,
adjustment_style):
s = struct.Struct("B 2B B B B 2B")
# B - sid type
# 2B - root symbol
# B - offset (could be packed smaller since offsets of greater than 12 are
# probably unneeded.)
# B - roll type
# 3B - empty space left for parameterized roll types
# B - adjustment
# 2B - empty space left for parameterized roll types
# The root symbol currently supports 2 characters. If 3 char root symbols
# are needed, the size of the root symbol does not need to change, however
@@ -153,7 +163,8 @@ def _encode_continuous_future_sid(root_symbol, offset, roll_style):
rs[1],
offset,
CONTINUOUS_FUTURE_ROLL_STYLE_IDS[roll_style],
0, 0, 0,)
CONTINUOUS_FUTURE_ADJUSTMENT_STYLE_IDS[adjustment_style],
0, 0)
s.pack_into(a, 0, *values)
return int(binascii.hexlify(a), 16)
@@ -908,15 +919,45 @@ class AssetFinder(object):
end_date = self.retrieve_asset(oc.contract_sids[-1]).end_date
exchange = self._get_root_symbol_exchange(root_symbol)
sid = _encode_continuous_future_sid(root_symbol, offset, roll_style)
sid = _encode_continuous_future_sid(root_symbol, offset,
roll_style,
None)
mul_sid = _encode_continuous_future_sid(root_symbol, offset,
roll_style,
'div')
add_sid = _encode_continuous_future_sid(root_symbol, offset,
roll_style,
'add')
mul_cf = ContinuousFuture(mul_sid,
root_symbol,
offset,
roll_style,
start_date,
end_date,
exchange,
'mul')
add_cf = ContinuousFuture(add_sid,
root_symbol,
offset,
roll_style,
start_date,
end_date,
exchange,
'add')
cf = ContinuousFuture(sid,
root_symbol,
offset,
roll_style,
start_date,
end_date,
exchange)
exchange,
adjustment_children={
'mul': mul_cf,
'add': add_cf
})
self._asset_cache[cf.sid] = cf
self._asset_cache[add_cf.sid] = add_cf
self._asset_cache[mul_cf.sid] = mul_cf
return cf
def _make_sids(tblattr):
+19 -4
View File
@@ -65,6 +65,9 @@ cdef class ContinuousFuture:
cdef readonly object exchange
cdef readonly object adjustment
cdef readonly object _adjustment_children
_kwargnames = frozenset({
'sid',
'root_symbol',
@@ -81,7 +84,9 @@ cdef class ContinuousFuture:
object roll_style,
object start_date,
object end_date,
object exchange):
object exchange,
object adjustment=None,
dict adjustment_children=None):
self.sid = sid
self.sid_hash = hash(sid)
@@ -91,6 +96,9 @@ cdef class ContinuousFuture:
self.exchange = exchange
self.start_date = start_date
self.end_date = end_date
self.adjustment = adjustment
self._adjustment_children = adjustment_children
def __int__(self):
return self.sid
@@ -138,16 +146,17 @@ cdef class ContinuousFuture:
raise AssertionError('%d is not an operator' % op)
def __str__(self):
return '%s(%d [%s, %s, %s])' % (
return '%s(%d [%s, %s, %s, %s])' % (
type(self).__name__,
self.sid,
self.root_symbol,
self.offset,
self.roll_style
self.roll_style,
self.adjustment,
)
def __repr__(self):
attrs = ('root_symbol', 'offset', 'roll_style')
attrs = ('root_symbol', 'offset', 'roll_style', 'adjustment')
tuples = ((attr, repr(getattr(self, attr, None)))
for attr in attrs)
strings = ('%s=%s' % (t[0], t[1]) for t in tuples)
@@ -226,6 +235,12 @@ cdef class ContinuousFuture:
calendar = get_calendar(self.exchange)
return calendar.is_open_on_minute(dt_minute)
def adj(self, style):
try:
return self._adjustment_children[style]
except KeyError:
return None
cdef class OrderedContracts(object):
"""
-2
View File
@@ -229,8 +229,6 @@ class ContinuousFutureMinuteBarReader(SessionBarReader):
# Get partitions
partitions_by_asset = {}
for asset in assets:
rolls_by_asset[asset] = rf.get_rolls(
asset.root_symbol, start_date, end_date, asset.offset)
partitions = []
partitions_by_asset[asset] = partitions
rolls = rolls_by_asset[asset]
+6 -2
View File
@@ -229,12 +229,16 @@ class DataPortal(object):
self._history_loader = DailyHistoryLoader(
self.trading_calendar,
_dispatch_session_reader,
self._adjustment_reader
self._adjustment_reader,
self.asset_finder,
self._roll_finders,
)
self._minute_history_loader = MinuteHistoryLoader(
self.trading_calendar,
_dispatch_minute_reader,
self._adjustment_reader
self._adjustment_reader,
self.asset_finder,
self._roll_finders,
)
self._first_trading_day = first_trading_day
+147 -11
View File
@@ -17,16 +17,20 @@ from abc import (
abstractmethod,
abstractproperty,
)
from lru import LRU
from lru import LRU
from numpy import around, hstack
from pandas import isnull
from pandas.tslib import normalize_date
from toolz import sliding_window
from six import with_metaclass
from zipline.assets import Equity
from zipline.assets.continuous_futures import ContinuousFuture
from zipline.lib._int64window import AdjustedArrayWindow as Int64Window
from zipline.lib._float64window import AdjustedArrayWindow as Float64Window
from zipline.lib.adjustment import Float64Multiply
from zipline.lib.adjustment import Float64Multiply, Float64Add
from zipline.utils.cache import ExpiringCache
from zipline.utils.memoize import lazyval
from zipline.utils.numpy_utils import float64_dtype
@@ -143,6 +147,113 @@ class HistoryCompatibleUSEquityAdjustmentReader(object):
return adjs
class ContinuousFutureAdjustmentReader(object):
"""
Calculates adjustments for continuous futures, based on the
close and open of the contracts on the either side of each roll.
"""
def __init__(self,
trading_calendar,
asset_finder,
bar_reader,
roll_finders,
frequency):
self._trading_calendar = trading_calendar
self._asset_finder = asset_finder
self._bar_reader = bar_reader
self._roll_finders = roll_finders
self._frequency = frequency
def load_adjustments(self, columns, dts, assets):
"""
Returns
-------
adjustments : list[dict[int -> Adjustment]]
A list, where each element corresponds to the `columns`, of
mappings from index to adjustment objects to apply at that index.
"""
out = [None] * len(columns)
for i, column in enumerate(columns):
adjs = {}
for asset in assets:
adjs.update(self._get_adjustments_in_range(
asset, dts, column))
out[i] = adjs
return out
def _make_adjustment(self,
adjustment_type,
left_close,
right_open,
end_loc):
adj_base = left_close - right_open
if adjustment_type == 'mul':
adj_value = 1.0 - adj_base / left_close
adj_class = Float64Multiply
elif adjustment_type == 'add':
adj_value = -adj_base
adj_class = Float64Add
return adj_class(0,
end_loc,
0,
0,
adj_value)
def _get_adjustments_in_range(self, cf, dts, field):
if field == 'volume' or field == 'sid':
return {}
if cf.adjustment is None:
return {}
rf = self._roll_finders[cf.roll_style]
partitions = []
rolls = rf.get_rolls(cf.root_symbol, dts[0], dts[-1],
cf.offset)
tc = self._trading_calendar
adjs = {}
for left, right in sliding_window(2, rolls):
left_sid, right_dt = left
right_sid = right[0]
left_dt = tc.previous_session_label(right_dt)
if self._frequency == 'minute':
_, left_dt = tc.open_and_close_for_session(left_dt)
right_dt, _ = tc.open_and_close_for_session(right_dt)
partitions.append((left_sid,
right_sid,
left_dt,
right_dt))
for partition in partitions:
left_sid, right_sid, left_dt, right_dt = partition
last_left_dt = self._bar_reader.get_last_traded_dt(
self._asset_finder.retrieve_asset(left_sid),
left_dt)
last_right_dt = self._bar_reader.get_last_traded_dt(
self._asset_finder.retrieve_asset(right_sid),
right_dt)
if isnull(last_left_dt) or isnull(last_right_dt):
continue
left_close = self._bar_reader.get_value(
left_sid, last_left_dt, 'close')
right_open = self._bar_reader.get_value(
right_sid, last_right_dt, 'open')
adj_loc = dts.searchsorted(right_dt)
end_loc = adj_loc - 1
adj = self._make_adjustment(cf.adjustment,
left_close,
right_open,
end_loc)
try:
adjs[adj_loc].append(adj)
except KeyError:
adjs[adj_loc] = [adj]
return adjs
class SlidingWindow(object):
"""
Wrapper around an AdjustedArrayWindow which supports monotonically
@@ -196,20 +307,34 @@ class HistoryLoader(with_metaclass(ABCMeta)):
"""
FIELDS = ('open', 'high', 'low', 'close', 'volume', 'sid')
def __init__(self, trading_calendar, reader, adjustment_reader,
def __init__(self, trading_calendar, reader, equity_adjustment_reader,
asset_finder,
roll_finders=None,
sid_cache_size=1000):
self.trading_calendar = trading_calendar
self._asset_finder = asset_finder
self._reader = reader
if adjustment_reader is not None:
self._adjustments_reader = \
HistoryCompatibleUSEquityAdjustmentReader(adjustment_reader)
else:
self._adjustments_reader = None
self._adjustment_readers = {}
if equity_adjustment_reader is not None:
self._adjustment_readers[Equity] = \
HistoryCompatibleUSEquityAdjustmentReader(
equity_adjustment_reader)
if roll_finders:
self._adjustment_readers[ContinuousFuture] =\
ContinuousFutureAdjustmentReader(trading_calendar,
asset_finder,
reader,
roll_finders,
self._frequency)
self._window_blocks = {
field: ExpiringCache(LRU(sid_cache_size))
for field in self.FIELDS
}
@abstractproperty
def _frequency(self):
pass
@abstractproperty
def _prefetch_length(self):
pass
@@ -257,6 +382,8 @@ class HistoryLoader(with_metaclass(ABCMeta)):
asset_windows = {}
needed_assets = []
assets = self._asset_finder.retrieve_all(assets)
for asset in assets:
try:
asset_windows[asset] = self._window_blocks[field].get(
@@ -288,10 +415,11 @@ class HistoryLoader(with_metaclass(ABCMeta)):
array = array.astype(float64_dtype)
for i, asset in enumerate(needed_assets):
if self._adjustments_reader:
adjs = self._adjustments_reader.load_adjustments(
try:
adj_reader = self._adjustment_readers[type(asset)]
adjs = adj_reader.load_adjustments(
[field], prefetch_dts, [asset])[0]
else:
except KeyError:
adjs = {}
window = window_type(
array[:, i].reshape(prefetch_len, 1),
@@ -395,6 +523,10 @@ class HistoryLoader(with_metaclass(ABCMeta)):
class DailyHistoryLoader(HistoryLoader):
@property
def _frequency(self):
return 'daily'
@property
def _prefetch_length(self):
return 40
@@ -414,6 +546,10 @@ class DailyHistoryLoader(HistoryLoader):
class MinuteHistoryLoader(HistoryLoader):
@property
def _frequency(self):
return 'minute'
@property
def _prefetch_length(self):
return 1560