MAINT: Perspective offset for load adjustments.

Add a perspective offset to `AdjustedArrayWindow` and `AdjustedArray`,
so that `HistoryLoader` does not need to twiddle with offsets to support
viewing the data from the bar after end of the window, (Which is the
case when a '1d' history window is retrieved in minute mode, which is
explained in the docstring for `HistoryLoader.history`)

Presently, this simplifies the logic in
`HistoryLoader._get_adjustments_in_range`, and other incoming
AdjustmentReader's, (e.g. the roll based adjustment reader for continous
futures.) This patch should also make it easier for history and pipeline
to converge on a singular `load_adjustments` method.
This commit is contained in:
Eddie Hebert
2016-10-03 14:27:42 -04:00
parent 837d8824de
commit 34d4e4b974
4 changed files with 118 additions and 24 deletions
+96
View File
@@ -202,6 +202,85 @@ def _gen_multiplicative_adjustment_cases(dtype):
)
def _gen_multiplicative_adjustment_cases_with_perpsective_offset(dtype):
"""
Generate expected moving windows on a buffer with adjustments.
We proceed by constructing, at each row, the view of the array we expect in
in all windows anchored on that row.
In general, if we have an adjustment to be applied once we process the row
at index N, should see that adjustment applied to the underlying buffer for
any window containing the row at index N - 1.
We then build all legal windows over these buffers.
"""
adjustment_type = {
float64_dtype: Float64Multiply,
}[dtype]
nrows, ncols = 6, 3
adjustments = {}
buffer_as_of = [None] * 6
baseline = full((nrows, ncols), 1, dtype=dtype)
# Note that row indices are inclusive!
adjustments[1] = [
adjustment_type(0, 0, 0, 0, coerce_to_dtype(dtype, 2)),
]
buffer_as_of[0] = array([[2, 1, 1],
[1, 1, 1],
[1, 1, 1],
[1, 1, 1],
[1, 1, 1],
[1, 1, 1]], dtype=dtype)
# No adjustment at index 2.
buffer_as_of[1] = buffer_as_of[0]
adjustments[3] = [
adjustment_type(1, 2, 1, 1, coerce_to_dtype(dtype, 3)),
adjustment_type(0, 1, 0, 0, coerce_to_dtype(dtype, 4)),
]
buffer_as_of[2] = array([[8, 1, 1],
[4, 3, 1],
[1, 3, 1],
[1, 1, 1],
[1, 1, 1],
[1, 1, 1]], dtype=dtype)
adjustments[4] = [
adjustment_type(0, 3, 2, 2, coerce_to_dtype(dtype, 5))
]
buffer_as_of[3] = array([[8, 1, 5],
[4, 3, 5],
[1, 3, 5],
[1, 1, 5],
[1, 1, 1],
[1, 1, 1]], dtype=dtype)
adjustments[5] = [
adjustment_type(0, 4, 1, 1, coerce_to_dtype(dtype, 6)),
adjustment_type(2, 2, 2, 2, coerce_to_dtype(dtype, 7)),
]
buffer_as_of[4] = array([[8, 6, 5],
[4, 18, 5],
[1, 18, 35],
[1, 6, 5],
[1, 6, 1],
[1, 1, 1]], dtype=dtype)
buffer_as_of[5] = buffer_as_of[4]
return _gen_expectations(
baseline,
default_missing_value_for_dtype(dtype),
adjustments,
buffer_as_of,
nrows,
)
def _gen_overwrite_adjustment_cases(dtype):
"""
Generate test cases for overwrite adjustments.
@@ -527,6 +606,23 @@ class AdjustedArrayTestCase(TestCase):
for yielded, expected_yield in zip_longest(window_iter, expected):
check_arrays(yielded, expected_yield)
@parameterized.expand(
_gen_multiplicative_adjustment_cases_with_perpsective_offset(
float64_dtype))
def test_multiplicative_adjustments_with_perspective_offset(self,
name,
data,
lookback,
adjustments,
missing_value,
expected):
array = AdjustedArray(data, NOMASK, adjustments, missing_value, 1)
for _ in range(2): # Iterate 2x ensure adjusted_arrays are re-usable.
window_iter = array.traverse(lookback)
for yielded, expected_yield in zip_longest(window_iter, expected):
print yielded
check_arrays(yielded, expected_yield)
@parameterized.expand(
chain(
_gen_overwrite_adjustment_cases(float64_dtype),
+4 -19
View File
@@ -107,8 +107,7 @@ class HistoryLoader(with_metaclass(ABCMeta)):
def _array(self, start, end, assets, field):
pass
def _get_adjustments_in_range(self, asset, dts, field,
is_perspective_after):
def _get_adjustments_in_range(self, asset, dts, field):
"""
Get the Float64Multiply objects to pass to an AdjustedArrayWindow.
@@ -154,11 +153,6 @@ class HistoryLoader(with_metaclass(ABCMeta)):
if start < dt <= end:
end_loc = dts.searchsorted(dt)
adj_loc = end_loc
if is_perspective_after:
# Set adjustment pop location so that it applies
# to last value if adjustment occurs immediately after
# the last slot.
adj_loc -= 1
mult = Float64Multiply(0,
end_loc - 1,
0,
@@ -175,11 +169,6 @@ class HistoryLoader(with_metaclass(ABCMeta)):
if start < dt <= end:
end_loc = dts.searchsorted(dt)
adj_loc = end_loc
if is_perspective_after:
# Set adjustment pop location so that it applies
# to last value if adjustment occurs immediately after
# the last slot.
adj_loc -= 1
mult = Float64Multiply(0,
end_loc - 1,
0,
@@ -200,11 +189,6 @@ class HistoryLoader(with_metaclass(ABCMeta)):
ratio = s[1]
end_loc = dts.searchsorted(dt)
adj_loc = end_loc
if is_perspective_after:
# Set adjustment pop location so that it applies
# to last value if adjustment occurs immediately after
# the last slot.
adj_loc -= 1
mult = Float64Multiply(0,
end_loc - 1,
0,
@@ -284,7 +268,7 @@ class HistoryLoader(with_metaclass(ABCMeta)):
for i, asset in enumerate(needed_assets):
if self._adjustments_reader:
adjs = self._get_adjustments_in_range(
asset, prefetch_dts, field, is_perspective_after)
asset, prefetch_dts, field)
else:
adjs = {}
window = window_type(
@@ -292,7 +276,8 @@ class HistoryLoader(with_metaclass(ABCMeta)):
view_kwargs,
adjs,
offset,
size
size,
int(is_perspective_after)
)
sliding_window = SlidingWindow(window, size, start_ix, offset)
asset_windows[asset] = sliding_window
+8 -4
View File
@@ -35,6 +35,7 @@ cdef class AdjustedArrayWindow:
readonly dict view_kwargs
readonly Py_ssize_t window_length
Py_ssize_t anchor, next_anchor, max_anchor, next_adj
Py_ssize_t perspective_offset
dict adjustments
list adjustment_indices
ndarray last_out
@@ -44,14 +45,15 @@ cdef class AdjustedArrayWindow:
dict view_kwargs not None,
dict adjustments not None,
Py_ssize_t offset,
Py_ssize_t window_length):
Py_ssize_t window_length,
Py_ssize_t perspective_offset):
self.data = data
self.view_kwargs = view_kwargs
self.adjustments = adjustments
self.adjustment_indices = sorted(adjustments, reverse=True)
self.window_length = window_length
self.anchor = window_length + offset
self.perspective_offset = perspective_offset
self.next_anchor = self.anchor
self.max_anchor = data.shape[0]
@@ -65,7 +67,7 @@ cdef class AdjustedArrayWindow:
if len(self.adjustment_indices) > 0:
return self.adjustment_indices.pop()
else:
return self.max_anchor
return -1
def __iter__(self):
return self
@@ -84,7 +86,9 @@ cdef class AdjustedArrayWindow:
# Apply any adjustments that occured before our current anchor.
# Equivalently, apply any adjustments known **on or before** the date
# for which we're calculating a window.
while self.next_adj < anchor:
while (self.next_adj != -1
and
self.next_adj - self.perspective_offset < anchor):
for adjustment in self.adjustments[self.next_adj]:
adjustment.mutate(self.data)
+10 -1
View File
@@ -152,16 +152,22 @@ class AdjustedArray(object):
missing_value : object
A value to use to fill missing data in yielded windows.
Should be a value coercible to `data.dtype`.
perspective_offset : int
The number of rows after the current end of the window, from which the
data is being viewed. This value is used so that adjustments that occur
between the end of the window and the vantage point are applied.
"""
__slots__ = (
'_data',
'_view_kwargs',
'adjustments',
'missing_value',
'perspective_offset',
'__weakref__',
)
def __init__(self, data, mask, adjustments, missing_value):
def __init__(self, data, mask, adjustments, missing_value,
perspective_offset=0):
self._data, self._view_kwargs = _normalize_array(data, missing_value)
self.adjustments = adjustments
@@ -177,6 +183,8 @@ class AdjustedArray(object):
)
self._data[~mask] = self.missing_value
self.perspective_offset = perspective_offset
@lazyval
def data(self):
"""
@@ -220,6 +228,7 @@ class AdjustedArray(object):
self.adjustments,
offset,
window_length,
self.perspective_offset
)
def inspect(self):