From 97f6bbc60c7cd779448079bf765e9c4911d27b5d Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Sat, 1 Oct 2016 02:29:37 -0400 Subject: [PATCH] MAINT: Begin making a common adjustment interface. Start making the equity adjustments calculations for the history loader conform to the same method signature as `load_adjustments` provided by `SQLiteAdjustmentReader, so that an `AdjustmentReader` interface can begin to take form. This prepares for creating a `DispatchAdjustmentReader` which will route adjustment calculations for equities to the `HistoryCompatibleUSEquityAdjustmentReader` and continuous futures to a not yet implemented adjustment reader. All of these readers will share the `load_adjustments` method. --- zipline/data/history_loader.py | 180 ++++++++++++++++++--------------- 1 file changed, 101 insertions(+), 79 deletions(-) diff --git a/zipline/data/history_loader.py b/zipline/data/history_loader.py index df225ccc..e2f90cc0 100644 --- a/zipline/data/history_loader.py +++ b/zipline/data/history_loader.py @@ -32,80 +32,27 @@ from zipline.utils.memoize import lazyval from zipline.utils.numpy_utils import float64_dtype -class SlidingWindow(object): - """ - Wrapper around an AdjustedArrayWindow which supports monotonically - increasing (by datetime) requests for a sized window of data. +class HistoryCompatibleUSEquityAdjustmentReader(object): - Parameters - ---------- - window : AdjustedArrayWindow - Window of pricing data with prefetched values beyond the current - simulation dt. - cal_start : int - Index in the overall calendar at which the window starts. - """ + def __init__(self, adjustment_reader): + self._adjustments_reader = adjustment_reader - def __init__(self, window, size, cal_start, offset): - self.window = window - self.cal_start = cal_start - self.current = around(next(window), 3) - self.offset = offset - self.most_recent_ix = self.cal_start + size - - def get(self, end_ix): + def load_adjustments(self, columns, dts, assets, output_offset=0): """ Returns ------- - out : A np.ndarray of the equity pricing up to end_ix after adjustments - and rounding have been applied. + adjustments : list[dict[int -> Adjustment]] + A list, where each element corresponds to the `columns`, of + mappings from index to adjustment objects to apply at that index. """ - if self.most_recent_ix == end_ix: - return self.current - - target = end_ix - self.cal_start - self.offset + 1 - self.current = around(self.window.seek(target), 3) - - self.most_recent_ix = end_ix - return self.current - - -class HistoryLoader(with_metaclass(ABCMeta)): - """ - Loader for sliding history windows, with support for adjustments. - - Parameters - ---------- - trading_calendar: TradingCalendar - Contains the grouping logic needed to assign minutes to periods. - reader : DailyBarReader, MinuteBarReader - Reader for pricing bars. - adjustment_reader : SQLiteAdjustmentReader - Reader for adjustment data. - """ - FIELDS = ('open', 'high', 'low', 'close', 'volume', 'sid') - - def __init__(self, trading_calendar, reader, adjustment_reader, - sid_cache_size=1000): - self.trading_calendar = trading_calendar - self._reader = reader - self._adjustments_reader = adjustment_reader - self._window_blocks = { - field: ExpiringCache(LRU(sid_cache_size)) - for field in self.FIELDS - } - - @abstractproperty - def _prefetch_length(self): - pass - - @abstractproperty - def _calendar(self): - pass - - @abstractmethod - def _array(self, start, end, assets, field): - pass + out = [None] * len(columns) + for i, column in enumerate(columns): + adjs = {} + for asset in assets: + adjs.update(self._get_adjustments_in_range( + asset, dts, column)) + out[i] = adjs + return out def _get_adjustments_in_range(self, asset, dts, field): """ @@ -126,20 +73,15 @@ class HistoryLoader(with_metaclass(ABCMeta)): ---------- asset : Asset The assets for which to get adjustments. - days : iterable of datetime64-like - The days for which adjustment data is needed. + dts : iterable of datetime64-like + The dts for which adjustment data is needed. field : str OHLCV field for which to get the adjustments. - is_perspective_after : bool - see: `PricingHistoryLoader.history` - If True, the index at which the Multiply object is registered to - be popped is calculated so that it applies to the last slot in the - sliding window when the adjustment occurs immediately after the dt - that slot represents. Returns ------- - out : The adjustments as a dict of loc -> Float64Multiply + out : dict[loc -> Float64Multiply] + The adjustments as a dict of loc -> Float64Multiply """ sid = int(asset) start = normalize_date(dts[0]) @@ -200,6 +142,86 @@ class HistoryLoader(with_metaclass(ABCMeta)): adjs[adj_loc] = [mult] return adjs + +class SlidingWindow(object): + """ + Wrapper around an AdjustedArrayWindow which supports monotonically + increasing (by datetime) requests for a sized window of data. + + Parameters + ---------- + window : AdjustedArrayWindow + Window of pricing data with prefetched values beyond the current + simulation dt. + cal_start : int + Index in the overall calendar at which the window starts. + """ + + def __init__(self, window, size, cal_start, offset): + self.window = window + self.cal_start = cal_start + self.current = around(next(window), 3) + self.offset = offset + self.most_recent_ix = self.cal_start + size + + def get(self, end_ix): + """ + Returns + ------- + out : A np.ndarray of the equity pricing up to end_ix after adjustments + and rounding have been applied. + """ + if self.most_recent_ix == end_ix: + return self.current + + target = end_ix - self.cal_start - self.offset + 1 + self.current = around(self.window.seek(target), 3) + + self.most_recent_ix = end_ix + return self.current + + +class HistoryLoader(with_metaclass(ABCMeta)): + """ + Loader for sliding history windows, with support for adjustments. + + Parameters + ---------- + trading_calendar: TradingCalendar + Contains the grouping logic needed to assign minutes to periods. + reader : DailyBarReader, MinuteBarReader + Reader for pricing bars. + adjustment_reader : SQLiteAdjustmentReader + Reader for adjustment data. + """ + FIELDS = ('open', 'high', 'low', 'close', 'volume', 'sid') + + def __init__(self, trading_calendar, reader, adjustment_reader, + sid_cache_size=1000): + self.trading_calendar = trading_calendar + self._reader = reader + if adjustment_reader is not None: + self._adjustments_reader = \ + HistoryCompatibleUSEquityAdjustmentReader(adjustment_reader) + else: + self._adjustments_reader = None + self._window_blocks = { + field: ExpiringCache(LRU(sid_cache_size)) + for field in self.FIELDS + } + + @abstractproperty + def _prefetch_length(self): + pass + + @abstractproperty + def _calendar(self): + pass + + @abstractmethod + def _array(self, start, end, assets, field): + pass + def _ensure_sliding_windows(self, assets, dts, field, is_perspective_after): """ @@ -267,8 +289,8 @@ class HistoryLoader(with_metaclass(ABCMeta)): for i, asset in enumerate(needed_assets): if self._adjustments_reader: - adjs = self._get_adjustments_in_range( - asset, prefetch_dts, field) + adjs = self._adjustments_reader.load_adjustments( + [field], prefetch_dts, [asset])[0] else: adjs = {} window = window_type(