diff --git a/tests/pipeline/test_adjusted_array.py b/tests/pipeline/test_adjusted_array.py index 730af952..0327bdd0 100644 --- a/tests/pipeline/test_adjusted_array.py +++ b/tests/pipeline/test_adjusted_array.py @@ -202,6 +202,85 @@ def _gen_multiplicative_adjustment_cases(dtype): ) +def _gen_multiplicative_adjustment_cases_with_perpsective_offset(dtype): + """ + Generate expected moving windows on a buffer with adjustments. + + We proceed by constructing, at each row, the view of the array we expect in + in all windows anchored on that row. + + In general, if we have an adjustment to be applied once we process the row + at index N, should see that adjustment applied to the underlying buffer for + any window containing the row at index N - 1. + + We then build all legal windows over these buffers. + """ + adjustment_type = { + float64_dtype: Float64Multiply, + }[dtype] + + nrows, ncols = 6, 3 + adjustments = {} + buffer_as_of = [None] * 6 + baseline = full((nrows, ncols), 1, dtype=dtype) + + # Note that row indices are inclusive! + adjustments[1] = [ + adjustment_type(0, 0, 0, 0, coerce_to_dtype(dtype, 2)), + ] + buffer_as_of[0] = array([[2, 1, 1], + [1, 1, 1], + [1, 1, 1], + [1, 1, 1], + [1, 1, 1], + [1, 1, 1]], dtype=dtype) + + # No adjustment at index 2. + buffer_as_of[1] = buffer_as_of[0] + + adjustments[3] = [ + adjustment_type(1, 2, 1, 1, coerce_to_dtype(dtype, 3)), + adjustment_type(0, 1, 0, 0, coerce_to_dtype(dtype, 4)), + ] + buffer_as_of[2] = array([[8, 1, 1], + [4, 3, 1], + [1, 3, 1], + [1, 1, 1], + [1, 1, 1], + [1, 1, 1]], dtype=dtype) + + adjustments[4] = [ + adjustment_type(0, 3, 2, 2, coerce_to_dtype(dtype, 5)) + ] + buffer_as_of[3] = array([[8, 1, 5], + [4, 3, 5], + [1, 3, 5], + [1, 1, 5], + [1, 1, 1], + [1, 1, 1]], dtype=dtype) + + adjustments[5] = [ + adjustment_type(0, 4, 1, 1, coerce_to_dtype(dtype, 6)), + adjustment_type(2, 2, 2, 2, coerce_to_dtype(dtype, 7)), + ] + buffer_as_of[4] = array([[8, 6, 5], + [4, 18, 5], + [1, 18, 35], + [1, 6, 5], + [1, 6, 1], + [1, 1, 1]], dtype=dtype) + + buffer_as_of[5] = buffer_as_of[4] + + return _gen_expectations( + baseline, + default_missing_value_for_dtype(dtype), + adjustments, + buffer_as_of, + nrows, + ) + + def _gen_overwrite_adjustment_cases(dtype): """ Generate test cases for overwrite adjustments. @@ -527,6 +606,23 @@ class AdjustedArrayTestCase(TestCase): for yielded, expected_yield in zip_longest(window_iter, expected): check_arrays(yielded, expected_yield) + @parameterized.expand( + _gen_multiplicative_adjustment_cases_with_perpsective_offset( + float64_dtype)) + def test_multiplicative_adjustments_with_perspective_offset(self, + name, + data, + lookback, + adjustments, + missing_value, + expected): + array = AdjustedArray(data, NOMASK, adjustments, missing_value, 1) + for _ in range(2): # Iterate 2x ensure adjusted_arrays are re-usable. + window_iter = array.traverse(lookback) + for yielded, expected_yield in zip_longest(window_iter, expected): + print yielded + check_arrays(yielded, expected_yield) + @parameterized.expand( chain( _gen_overwrite_adjustment_cases(float64_dtype), diff --git a/zipline/data/history_loader.py b/zipline/data/history_loader.py index b53f2ced..df225ccc 100644 --- a/zipline/data/history_loader.py +++ b/zipline/data/history_loader.py @@ -107,8 +107,7 @@ class HistoryLoader(with_metaclass(ABCMeta)): def _array(self, start, end, assets, field): pass - def _get_adjustments_in_range(self, asset, dts, field, - is_perspective_after): + def _get_adjustments_in_range(self, asset, dts, field): """ Get the Float64Multiply objects to pass to an AdjustedArrayWindow. @@ -154,11 +153,6 @@ class HistoryLoader(with_metaclass(ABCMeta)): if start < dt <= end: end_loc = dts.searchsorted(dt) adj_loc = end_loc - if is_perspective_after: - # Set adjustment pop location so that it applies - # to last value if adjustment occurs immediately after - # the last slot. - adj_loc -= 1 mult = Float64Multiply(0, end_loc - 1, 0, @@ -175,11 +169,6 @@ class HistoryLoader(with_metaclass(ABCMeta)): if start < dt <= end: end_loc = dts.searchsorted(dt) adj_loc = end_loc - if is_perspective_after: - # Set adjustment pop location so that it applies - # to last value if adjustment occurs immediately after - # the last slot. - adj_loc -= 1 mult = Float64Multiply(0, end_loc - 1, 0, @@ -200,11 +189,6 @@ class HistoryLoader(with_metaclass(ABCMeta)): ratio = s[1] end_loc = dts.searchsorted(dt) adj_loc = end_loc - if is_perspective_after: - # Set adjustment pop location so that it applies - # to last value if adjustment occurs immediately after - # the last slot. - adj_loc -= 1 mult = Float64Multiply(0, end_loc - 1, 0, @@ -284,7 +268,7 @@ class HistoryLoader(with_metaclass(ABCMeta)): for i, asset in enumerate(needed_assets): if self._adjustments_reader: adjs = self._get_adjustments_in_range( - asset, prefetch_dts, field, is_perspective_after) + asset, prefetch_dts, field) else: adjs = {} window = window_type( @@ -292,7 +276,8 @@ class HistoryLoader(with_metaclass(ABCMeta)): view_kwargs, adjs, offset, - size + size, + int(is_perspective_after) ) sliding_window = SlidingWindow(window, size, start_ix, offset) asset_windows[asset] = sliding_window diff --git a/zipline/lib/_windowtemplate.pxi b/zipline/lib/_windowtemplate.pxi index fb50736d..0dbd141c 100644 --- a/zipline/lib/_windowtemplate.pxi +++ b/zipline/lib/_windowtemplate.pxi @@ -35,6 +35,7 @@ cdef class AdjustedArrayWindow: readonly dict view_kwargs readonly Py_ssize_t window_length Py_ssize_t anchor, next_anchor, max_anchor, next_adj + Py_ssize_t perspective_offset dict adjustments list adjustment_indices ndarray last_out @@ -44,14 +45,15 @@ cdef class AdjustedArrayWindow: dict view_kwargs not None, dict adjustments not None, Py_ssize_t offset, - Py_ssize_t window_length): - + Py_ssize_t window_length, + Py_ssize_t perspective_offset): self.data = data self.view_kwargs = view_kwargs self.adjustments = adjustments self.adjustment_indices = sorted(adjustments, reverse=True) self.window_length = window_length self.anchor = window_length + offset + self.perspective_offset = perspective_offset self.next_anchor = self.anchor self.max_anchor = data.shape[0] @@ -65,7 +67,7 @@ cdef class AdjustedArrayWindow: if len(self.adjustment_indices) > 0: return self.adjustment_indices.pop() else: - return self.max_anchor + return -1 def __iter__(self): return self @@ -84,7 +86,9 @@ cdef class AdjustedArrayWindow: # Apply any adjustments that occured before our current anchor. # Equivalently, apply any adjustments known **on or before** the date # for which we're calculating a window. - while self.next_adj < anchor: + while (self.next_adj != -1 + and + self.next_adj - self.perspective_offset < anchor): for adjustment in self.adjustments[self.next_adj]: adjustment.mutate(self.data) diff --git a/zipline/lib/adjusted_array.py b/zipline/lib/adjusted_array.py index 44645acd..cc4d6407 100644 --- a/zipline/lib/adjusted_array.py +++ b/zipline/lib/adjusted_array.py @@ -152,16 +152,22 @@ class AdjustedArray(object): missing_value : object A value to use to fill missing data in yielded windows. Should be a value coercible to `data.dtype`. + perspective_offset : int + The number of rows after the current end of the window, from which the + data is being viewed. This value is used so that adjustments that occur + between the end of the window and the vantage point are applied. """ __slots__ = ( '_data', '_view_kwargs', 'adjustments', 'missing_value', + 'perspective_offset', '__weakref__', ) - def __init__(self, data, mask, adjustments, missing_value): + def __init__(self, data, mask, adjustments, missing_value, + perspective_offset=0): self._data, self._view_kwargs = _normalize_array(data, missing_value) self.adjustments = adjustments @@ -177,6 +183,8 @@ class AdjustedArray(object): ) self._data[~mask] = self.missing_value + self.perspective_offset = perspective_offset + @lazyval def data(self): """ @@ -220,6 +228,7 @@ class AdjustedArray(object): self.adjustments, offset, window_length, + self.perspective_offset ) def inspect(self):