mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-04 21:19:50 +08:00
MAINT: Begin making a common adjustment interface.
Start making the equity adjustments calculations for the history loader conform to the same method signature as `load_adjustments` provided by `SQLiteAdjustmentReader, so that an `AdjustmentReader` interface can begin to take form. This prepares for creating a `DispatchAdjustmentReader` which will route adjustment calculations for equities to the `HistoryCompatibleUSEquityAdjustmentReader` and continuous futures to a not yet implemented adjustment reader. All of these readers will share the `load_adjustments` method.
This commit is contained in:
+101
-79
@@ -32,80 +32,27 @@ from zipline.utils.memoize import lazyval
|
||||
from zipline.utils.numpy_utils import float64_dtype
|
||||
|
||||
|
||||
class SlidingWindow(object):
|
||||
"""
|
||||
Wrapper around an AdjustedArrayWindow which supports monotonically
|
||||
increasing (by datetime) requests for a sized window of data.
|
||||
class HistoryCompatibleUSEquityAdjustmentReader(object):
|
||||
|
||||
Parameters
|
||||
----------
|
||||
window : AdjustedArrayWindow
|
||||
Window of pricing data with prefetched values beyond the current
|
||||
simulation dt.
|
||||
cal_start : int
|
||||
Index in the overall calendar at which the window starts.
|
||||
"""
|
||||
def __init__(self, adjustment_reader):
|
||||
self._adjustments_reader = adjustment_reader
|
||||
|
||||
def __init__(self, window, size, cal_start, offset):
|
||||
self.window = window
|
||||
self.cal_start = cal_start
|
||||
self.current = around(next(window), 3)
|
||||
self.offset = offset
|
||||
self.most_recent_ix = self.cal_start + size
|
||||
|
||||
def get(self, end_ix):
|
||||
def load_adjustments(self, columns, dts, assets, output_offset=0):
|
||||
"""
|
||||
Returns
|
||||
-------
|
||||
out : A np.ndarray of the equity pricing up to end_ix after adjustments
|
||||
and rounding have been applied.
|
||||
adjustments : list[dict[int -> Adjustment]]
|
||||
A list, where each element corresponds to the `columns`, of
|
||||
mappings from index to adjustment objects to apply at that index.
|
||||
"""
|
||||
if self.most_recent_ix == end_ix:
|
||||
return self.current
|
||||
|
||||
target = end_ix - self.cal_start - self.offset + 1
|
||||
self.current = around(self.window.seek(target), 3)
|
||||
|
||||
self.most_recent_ix = end_ix
|
||||
return self.current
|
||||
|
||||
|
||||
class HistoryLoader(with_metaclass(ABCMeta)):
|
||||
"""
|
||||
Loader for sliding history windows, with support for adjustments.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trading_calendar: TradingCalendar
|
||||
Contains the grouping logic needed to assign minutes to periods.
|
||||
reader : DailyBarReader, MinuteBarReader
|
||||
Reader for pricing bars.
|
||||
adjustment_reader : SQLiteAdjustmentReader
|
||||
Reader for adjustment data.
|
||||
"""
|
||||
FIELDS = ('open', 'high', 'low', 'close', 'volume', 'sid')
|
||||
|
||||
def __init__(self, trading_calendar, reader, adjustment_reader,
|
||||
sid_cache_size=1000):
|
||||
self.trading_calendar = trading_calendar
|
||||
self._reader = reader
|
||||
self._adjustments_reader = adjustment_reader
|
||||
self._window_blocks = {
|
||||
field: ExpiringCache(LRU(sid_cache_size))
|
||||
for field in self.FIELDS
|
||||
}
|
||||
|
||||
@abstractproperty
|
||||
def _prefetch_length(self):
|
||||
pass
|
||||
|
||||
@abstractproperty
|
||||
def _calendar(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _array(self, start, end, assets, field):
|
||||
pass
|
||||
out = [None] * len(columns)
|
||||
for i, column in enumerate(columns):
|
||||
adjs = {}
|
||||
for asset in assets:
|
||||
adjs.update(self._get_adjustments_in_range(
|
||||
asset, dts, column))
|
||||
out[i] = adjs
|
||||
return out
|
||||
|
||||
def _get_adjustments_in_range(self, asset, dts, field):
|
||||
"""
|
||||
@@ -126,20 +73,15 @@ class HistoryLoader(with_metaclass(ABCMeta)):
|
||||
----------
|
||||
asset : Asset
|
||||
The assets for which to get adjustments.
|
||||
days : iterable of datetime64-like
|
||||
The days for which adjustment data is needed.
|
||||
dts : iterable of datetime64-like
|
||||
The dts for which adjustment data is needed.
|
||||
field : str
|
||||
OHLCV field for which to get the adjustments.
|
||||
is_perspective_after : bool
|
||||
see: `PricingHistoryLoader.history`
|
||||
If True, the index at which the Multiply object is registered to
|
||||
be popped is calculated so that it applies to the last slot in the
|
||||
sliding window when the adjustment occurs immediately after the dt
|
||||
that slot represents.
|
||||
|
||||
Returns
|
||||
-------
|
||||
out : The adjustments as a dict of loc -> Float64Multiply
|
||||
out : dict[loc -> Float64Multiply]
|
||||
The adjustments as a dict of loc -> Float64Multiply
|
||||
"""
|
||||
sid = int(asset)
|
||||
start = normalize_date(dts[0])
|
||||
@@ -200,6 +142,86 @@ class HistoryLoader(with_metaclass(ABCMeta)):
|
||||
adjs[adj_loc] = [mult]
|
||||
return adjs
|
||||
|
||||
|
||||
class SlidingWindow(object):
|
||||
"""
|
||||
Wrapper around an AdjustedArrayWindow which supports monotonically
|
||||
increasing (by datetime) requests for a sized window of data.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
window : AdjustedArrayWindow
|
||||
Window of pricing data with prefetched values beyond the current
|
||||
simulation dt.
|
||||
cal_start : int
|
||||
Index in the overall calendar at which the window starts.
|
||||
"""
|
||||
|
||||
def __init__(self, window, size, cal_start, offset):
|
||||
self.window = window
|
||||
self.cal_start = cal_start
|
||||
self.current = around(next(window), 3)
|
||||
self.offset = offset
|
||||
self.most_recent_ix = self.cal_start + size
|
||||
|
||||
def get(self, end_ix):
|
||||
"""
|
||||
Returns
|
||||
-------
|
||||
out : A np.ndarray of the equity pricing up to end_ix after adjustments
|
||||
and rounding have been applied.
|
||||
"""
|
||||
if self.most_recent_ix == end_ix:
|
||||
return self.current
|
||||
|
||||
target = end_ix - self.cal_start - self.offset + 1
|
||||
self.current = around(self.window.seek(target), 3)
|
||||
|
||||
self.most_recent_ix = end_ix
|
||||
return self.current
|
||||
|
||||
|
||||
class HistoryLoader(with_metaclass(ABCMeta)):
|
||||
"""
|
||||
Loader for sliding history windows, with support for adjustments.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trading_calendar: TradingCalendar
|
||||
Contains the grouping logic needed to assign minutes to periods.
|
||||
reader : DailyBarReader, MinuteBarReader
|
||||
Reader for pricing bars.
|
||||
adjustment_reader : SQLiteAdjustmentReader
|
||||
Reader for adjustment data.
|
||||
"""
|
||||
FIELDS = ('open', 'high', 'low', 'close', 'volume', 'sid')
|
||||
|
||||
def __init__(self, trading_calendar, reader, adjustment_reader,
|
||||
sid_cache_size=1000):
|
||||
self.trading_calendar = trading_calendar
|
||||
self._reader = reader
|
||||
if adjustment_reader is not None:
|
||||
self._adjustments_reader = \
|
||||
HistoryCompatibleUSEquityAdjustmentReader(adjustment_reader)
|
||||
else:
|
||||
self._adjustments_reader = None
|
||||
self._window_blocks = {
|
||||
field: ExpiringCache(LRU(sid_cache_size))
|
||||
for field in self.FIELDS
|
||||
}
|
||||
|
||||
@abstractproperty
|
||||
def _prefetch_length(self):
|
||||
pass
|
||||
|
||||
@abstractproperty
|
||||
def _calendar(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _array(self, start, end, assets, field):
|
||||
pass
|
||||
|
||||
def _ensure_sliding_windows(self, assets, dts, field,
|
||||
is_perspective_after):
|
||||
"""
|
||||
@@ -267,8 +289,8 @@ class HistoryLoader(with_metaclass(ABCMeta)):
|
||||
|
||||
for i, asset in enumerate(needed_assets):
|
||||
if self._adjustments_reader:
|
||||
adjs = self._get_adjustments_in_range(
|
||||
asset, prefetch_dts, field)
|
||||
adjs = self._adjustments_reader.load_adjustments(
|
||||
[field], prefetch_dts, [asset])[0]
|
||||
else:
|
||||
adjs = {}
|
||||
window = window_type(
|
||||
|
||||
Reference in New Issue
Block a user